Merge branch 'master' into nomerge2

This commit is contained in:
Ivan Skytte Jørgensen
2017-03-31 12:39:05 +02:00

@ -268,6 +268,8 @@ class JobScheduler_impl {
std::map<thread_type_t,JobTypeStatistics> job_statistics;
bool submit(thread_type_t thread_type, JobEntry &e);
void cancel_queued_jobs(JobQueue &jq, job_exit_t job_exit);
public:
JobScheduler_impl(unsigned num_coordinator_threads, unsigned num_cpu_threads, unsigned num_summary_threads, unsigned num_io_threads, unsigned num_external_threads, unsigned num_file_meta_threads, unsigned num_merge_threads, job_done_notify_t job_done_notify)
: mtx PTHREAD_MUTEX_INITIALIZER,
@ -360,34 +362,13 @@ JobScheduler_impl::~JobScheduler_impl() {
//Then cancel all outstanding non-started jobs by moving them from the pending queues to the exit-set
{
ScopedLock sl(mtx);
while(!coordinator_job_queue.empty()) {
exit_set.push_back(std::make_pair(coordinator_job_queue.back(),job_exit_program_exit));
coordinator_job_queue.pop_back();
}
while(!cpu_job_queue.empty()) {
exit_set.push_back(std::make_pair(cpu_job_queue.back(),job_exit_cancelled));
cpu_job_queue.pop_back();
}
while(!summary_job_queue.empty()) {
exit_set.push_back(std::make_pair(summary_job_queue.back(),job_exit_program_exit));
summary_job_queue.pop_back();
}
while(!io_job_queue.empty()) {
exit_set.push_back(std::make_pair(io_job_queue.back(),job_exit_program_exit));
io_job_queue.pop_back();
}
while(!external_job_queue.empty()) {
exit_set.push_back(std::make_pair(external_job_queue.back(),job_exit_program_exit));
external_job_queue.pop_back();
}
while(!file_meta_job_queue.empty()) {
exit_set.push_back(std::make_pair(file_meta_job_queue.back(),job_exit_program_exit));
file_meta_job_queue.pop_back();
}
while(!merge_job_queue.empty()) {
exit_set.push_back(std::make_pair(merge_job_queue.back(),job_exit_program_exit));
merge_job_queue.pop_back();
}
cancel_queued_jobs(coordinator_job_queue,job_exit_program_exit);
cancel_queued_jobs(cpu_job_queue,job_exit_program_exit);
cancel_queued_jobs(summary_job_queue,job_exit_program_exit);
cancel_queued_jobs(io_job_queue,job_exit_program_exit);
cancel_queued_jobs(external_job_queue,job_exit_program_exit);
cancel_queued_jobs(file_meta_job_queue,job_exit_program_exit);
cancel_queued_jobs(merge_job_queue,job_exit_program_exit);
}
//Call finish-callbacks for all the exited / cancelled threads
@ -460,6 +441,14 @@ bool JobScheduler_impl::submit(thread_type_t thread_type, JobEntry &e)
}
void JobScheduler_impl::cancel_queued_jobs(JobQueue &jq, job_exit_t job_exit) {
while(!jq.empty()) {
exit_set.push_back(std::make_pair(jq.back(),job_exit));
jq.pop_back();
}
}
bool JobScheduler_impl::submit(start_routine_t start_routine,
finish_routine_t finish_callback,
void *state,
@ -555,34 +544,13 @@ bool JobScheduler_impl::is_reading_file(const BigFile *bf)
void JobScheduler_impl::cancel_all_jobs_for_shutdown() {
ScopedLock sl(mtx);
while(!coordinator_job_queue.empty()) {
exit_set.push_back(std::make_pair(coordinator_job_queue.back(),job_exit_program_exit));
coordinator_job_queue.pop_back();
}
while(!cpu_job_queue.empty()) {
exit_set.push_back(std::make_pair(cpu_job_queue.back(),job_exit_program_exit));
cpu_job_queue.pop_back();
}
while(!summary_job_queue.empty()) {
exit_set.push_back(std::make_pair(summary_job_queue.back(),job_exit_program_exit));
summary_job_queue.pop_back();
}
while(!io_job_queue.empty()) {
exit_set.push_back(std::make_pair(io_job_queue.back(),job_exit_program_exit));
io_job_queue.pop_back();
}
while(!external_job_queue.empty()) {
exit_set.push_back(std::make_pair(external_job_queue.back(),job_exit_program_exit));
external_job_queue.pop_back();
}
while(!file_meta_job_queue.empty()) {
exit_set.push_back(std::make_pair(file_meta_job_queue.back(),job_exit_program_exit));
file_meta_job_queue.pop_back();
}
while(!merge_job_queue.empty()) {
exit_set.push_back(std::make_pair(merge_job_queue.back(),job_exit_program_exit));
merge_job_queue.pop_back();
}
cancel_queued_jobs(coordinator_job_queue,job_exit_program_exit);
cancel_queued_jobs(cpu_job_queue,job_exit_program_exit);
cancel_queued_jobs(summary_job_queue,job_exit_program_exit);
cancel_queued_jobs(io_job_queue,job_exit_program_exit);
cancel_queued_jobs(external_job_queue,job_exit_program_exit);
cancel_queued_jobs(file_meta_job_queue,job_exit_program_exit);
cancel_queued_jobs(merge_job_queue,job_exit_program_exit);
}