#include "JobScheduler.h"
#include "ScopedLock.h"
#include "BigFile.h"   //for FileState definition
#include <pthread.h>
#include <vector>
#include <list>
#include <map>         //for the per-thread-type statistics map
#include <algorithm>
#include <stdexcept>
#include <assert.h>
#include <string.h>    //for memset()
#include <stdint.h>
#include <sys/time.h>
#include <stdio.h>

namespace {

//return current time expressed as milliseconds since 1970
static uint64_t now_ms() {
	struct timeval tv;
	gettimeofday(&tv,0);
	return (uint64_t)tv.tv_sec*1000 + tv.tv_usec/1000;
}

//A job (queued, running or finished)
struct JobEntry {
	start_routine_t  start_routine;
	finish_routine_t finish_callback;
	void             *state;
	bool             is_io_job;
	//for scheduling:
	uint64_t         start_deadline;   //latest time when this job must be started
	bool             is_io_write_job;  //valid for I/O jobs: mostly read or mostly write?
	int              initial_priority; //priority when queued
	//for statistics:
	thread_type_t    thread_type;
	uint64_t         queue_enter_time; //when this job was queued
	uint64_t         start_time;       //when this job started running
	uint64_t         stop_time;        //when this job stopped running
	uint64_t         finish_time;      //when the finish callback was called
	uint64_t         exit_time;        //when this job was finished, including finish-callback
};

//a set of jobs, prioritized
class JobQueue : public std::vector<JobEntry> {
public:
	pthread_cond_t cond_not_empty;
	unsigned potential_worker_threads;

	JobQueue()
	  : vector(),
	    //PTHREAD_COND_INITIALIZER expands to a braced-init-list, so this is
	    //plain C++11 member list-initialization
	    cond_not_empty PTHREAD_COND_INITIALIZER,
	    potential_worker_threads(0)
	{
	}

	~JobQueue() {
		pthread_cond_destroy(&cond_not_empty);
	}

	void add(const JobEntry &e) {
		push_back(e);
		pthread_cond_signal(&cond_not_empty);
	}

	JobEntry pop_top_priority();
};

JobEntry JobQueue::pop_top_priority() {
	assert(!empty());
	//todo: age old entries
	//linear scan: the entry with the lowest initial_priority value is treated as top priority
	std::vector<JobEntry>::iterator best_iter = begin();
	std::vector<JobEntry>::iterator iter = best_iter;
	++iter;
	for( ; iter!=end(); ++iter)
		if(iter->initial_priority < best_iter->initial_priority)
			best_iter = iter;
	JobEntry tmp = *best_iter;
	erase(best_iter);
	return tmp;
}

typedef std::vector<std::pair<JobEntry,job_exit_t>> ExitSet;
typedef std::list<JobEntry> RunningSet;

//parameters given to a pool thread
struct PoolThreadParameters {
	JobQueue          *job_queue;                 //queue to fetch jobs from
	RunningSet        *running_set;               //set to store the job in while executing it
	ExitSet           *exit_set;                  //set to store the finished job+exit-cause in
	unsigned          *num_io_write_jobs_running; //global counter for scheduling
	pthread_mutex_t   *mtx;                       //mutex covering above 3 containers
	job_done_notify_t job_done_notify;            //notification callback whenever a job returns
	bool              stop;
};
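//Worker-thread main loop. Each pool thread sleeps on its queue's condition
//variable, pops the top-priority entry, moves it into the shared running set,
//and runs it with the scheduler mutex released. The result is then recorded
//in the exit set; the finish callback is deliberately not invoked here but
//later, from cleanup_finished_jobs(). Jobs whose start deadline has already
//passed are not run at all and are reported with job_exit_deadline.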
extern "C" {

static void *job_pool_thread_function(void *pv) {
	PoolThreadParameters *ptp = static_cast<PoolThreadParameters*>(pv);
	pthread_mutex_lock(ptp->mtx);
	while(!ptp->stop) {
		if(ptp->job_queue->empty())
			pthread_cond_wait(&ptp->job_queue->cond_not_empty,ptp->mtx);
		if(ptp->stop)
			break;
		if(ptp->job_queue->empty()) //spurious wakeup
			continue;
		//take the top-priority job and move it into the running set
		RunningSet::iterator iter = ptp->running_set->insert(ptp->running_set->begin(), ptp->job_queue->pop_top_priority());
		if(iter->is_io_job && iter->is_io_write_job)
			++*(ptp->num_io_write_jobs_running);
		pthread_mutex_unlock(ptp->mtx);

		job_exit_t job_exit;
		uint64_t now = now_ms();
		iter->start_time = now;
		if(iter->start_deadline==0 || iter->start_deadline>now) {
			// Clear g_errno so the thread/job starts with a clean slate
			g_errno = 0;
			iter->start_routine(iter->state);
			iter->stop_time = now_ms();
			job_exit = job_exit_normal;
		} else {
			//the start deadline has already passed: don't run the job at all.
			//Record a stop time anyway so the statistics computed in
			//cleanup_finished_jobs() don't underflow.
			iter->stop_time = now;
			job_exit = job_exit_deadline;
		}

		pthread_mutex_lock(ptp->mtx);
		if(iter->is_io_job && iter->is_io_write_job)
			--*(ptp->num_io_write_jobs_running);
		//copy+delete it into the exit queue
		ptp->exit_set->push_back(std::make_pair(*iter,job_exit));
		ptp->running_set->erase(iter);
		pthread_mutex_unlock(ptp->mtx);
		(ptp->job_done_notify)();
		pthread_mutex_lock(ptp->mtx);
	}
	pthread_mutex_unlock(ptp->mtx);
	return 0;
}

} //extern "C"


static void job_done_notify_noop() {
}


class ThreadPool {
public:
	ThreadPool(const char *thread_name_prefix,
	           unsigned num_threads,
	           JobQueue *job_queue,
	           RunningSet *running_set,
	           ExitSet *exit_set,
	           unsigned *num_io_write_jobs_running,
	           pthread_mutex_t *mtx,
	           job_done_notify_t job_done_notify_);
	void initiate_stop();
	void join_all();
	~ThreadPool();
private:
	PoolThreadParameters ptp;
	std::vector<pthread_t> tid;
};

ThreadPool::ThreadPool(const char *thread_name_prefix,
                       unsigned num_threads,
                       JobQueue *job_queue,
                       RunningSet *running_set,
                       ExitSet *exit_set,
                       unsigned *num_io_write_jobs_running,
                       pthread_mutex_t *mtx,
                       job_done_notify_t job_done_notify_)
  : tid(num_threads)
{
	job_queue->potential_worker_threads += num_threads;
	ptp.job_queue = job_queue;
	ptp.running_set = running_set;
	ptp.exit_set = exit_set;
	ptp.num_io_write_jobs_running = num_io_write_jobs_running;
	ptp.mtx = mtx;
	ptp.job_done_notify = job_done_notify_ ? job_done_notify_ : job_done_notify_noop;
	ptp.stop = false;
	for(unsigned i=0; i<tid.size(); i++) {
		int rc = pthread_create(&tid[i], NULL, job_pool_thread_function, &ptp);
		if(rc!=0)
			throw std::runtime_error("pthread_create() failed");
		char thread_name[16]; //hard limit
		snprintf(thread_name,sizeof(thread_name),"%s %u", thread_name_prefix,i);
		rc = pthread_setname_np(tid[i],thread_name);
		if(rc!=0)
			throw std::runtime_error("pthread_setname_np() failed");
	}
}

void ThreadPool::initiate_stop() {
	//tell the worker threads of this pool to exit their loop
	ptp.stop = true;
}

void ThreadPool::join_all() {
	for(unsigned i=0; i<tid.size(); i++)
		pthread_join(tid[i],NULL);
	tid.clear();
}

ThreadPool::~ThreadPool() {
	join_all();
}

} //anonymous namespace


////////////////////////////////////////////////////////////////////////////////
// JobScheduler implementation
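//Locking: the single mutex 'mtx' below protects all job queues, the running
//set, the exit set and the num_io_write_jobs_running counter. Worker threads
//hold it only while moving jobs between those containers; the jobs themselves
//run with the mutex released.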
class JobScheduler_impl {
	mutable pthread_mutex_t mtx;
	JobQueue coordinator_job_queue;
	JobQueue cpu_job_queue;
	JobQueue summary_job_queue;
	JobQueue io_job_queue;
	JobQueue external_job_queue;
	JobQueue file_meta_job_queue;
	JobQueue merge_job_queue;
	RunningSet running_set;
	ExitSet exit_set;
	unsigned num_io_write_jobs_running;
	ThreadPool coordinator_thread_pool;
	ThreadPool cpu_thread_pool;
	ThreadPool summary_thread_pool;
	ThreadPool io_thread_pool;
	ThreadPool external_thread_pool;
	ThreadPool file_meta_thread_pool;
	ThreadPool merge_thread_pool;
	bool no_threads;
	bool new_jobs_allowed;
	std::map<thread_type_t,JobTypeStatistics> job_statistics;

	bool submit(thread_type_t thread_type, JobEntry &e);
	void cancel_queued_jobs(JobQueue &jq, job_exit_t job_exit);

public:
	JobScheduler_impl(unsigned num_coordinator_threads,
	                  unsigned num_cpu_threads,
	                  unsigned num_summary_threads,
	                  unsigned num_io_threads,
	                  unsigned num_external_threads,
	                  unsigned num_file_meta_threads,
	                  unsigned num_merge_threads,
	                  job_done_notify_t job_done_notify)
	  : mtx PTHREAD_MUTEX_INITIALIZER, //the macro expands to a braced-init-list
	    coordinator_job_queue(),
	    cpu_job_queue(),
	    summary_job_queue(),
	    io_job_queue(),
	    external_job_queue(),
	    file_meta_job_queue(),
	    merge_job_queue(),
	    running_set(),
	    exit_set(),
	    num_io_write_jobs_running(0),
	    coordinator_thread_pool("coord",num_coordinator_threads,&coordinator_job_queue,&running_set,&exit_set,&num_io_write_jobs_running,&mtx,job_done_notify),
	    cpu_thread_pool("cpu",num_cpu_threads,&cpu_job_queue,&running_set,&exit_set,&num_io_write_jobs_running,&mtx,job_done_notify),
	    summary_thread_pool("summary",num_summary_threads,&summary_job_queue,&running_set,&exit_set,&num_io_write_jobs_running,&mtx,job_done_notify),
	    io_thread_pool("io",num_io_threads,&io_job_queue,&running_set,&exit_set,&num_io_write_jobs_running,&mtx,job_done_notify),
	    external_thread_pool("ext",num_external_threads,&external_job_queue,&running_set,&exit_set,&num_io_write_jobs_running,&mtx,job_done_notify),
	    file_meta_thread_pool("file",num_file_meta_threads,&file_meta_job_queue,&running_set,&exit_set,&num_io_write_jobs_running,&mtx,job_done_notify),
	    merge_thread_pool("merge",num_merge_threads,&merge_job_queue,&running_set,&exit_set,&num_io_write_jobs_running,&mtx,job_done_notify),
	    no_threads(num_cpu_threads==0 && num_summary_threads==0 && num_io_threads==0 && num_external_threads==0 && num_file_meta_threads==0), //note: coordinator and merge threads are not considered here
	    new_jobs_allowed(true)
	{
	}
	~JobScheduler_impl();

	bool submit(start_routine_t start_routine, finish_routine_t finish_callback, void *state, thread_type_t thread_type, int priority, uint64_t start_deadline=0);
	bool submit_io(start_routine_t start_routine, finish_routine_t finish_callback, FileState *fstate, thread_type_t thread_type, int priority, bool is_write_job, uint64_t start_deadline=0);

	bool are_io_write_jobs_running() const;
	void cancel_file_read_jobs(const BigFile *bf);
	//void nice page for html and administration()
	bool is_reading_file(const BigFile *bf);
	void allow_new_jobs() { new_jobs_allowed = true; }
	void disallow_new_jobs() { new_jobs_allowed = false; }
	bool are_new_jobs_allowed() const { return new_jobs_allowed && !no_threads; }
	void cancel_all_jobs_for_shutdown();
	unsigned num_queued_jobs() const;
	void cleanup_finished_jobs();
	std::vector<JobDigest> query_job_digests() const;
	std::map<thread_type_t,JobTypeStatistics> query_job_statistics(bool clear);
};


JobScheduler_impl::~JobScheduler_impl() {
	//First prevent new jobs from being submitted
	new_jobs_allowed = false;
	{
		//Set the stop flags and wake the workers while holding the mutex, so a
		//worker cannot test its stop flag, miss the broadcast and then sleep
		//forever in pthread_cond_wait().
		ScopedLock sl(mtx);
		//Then tell the worker threads to stop executing more jobs
		coordinator_thread_pool.initiate_stop();
		cpu_thread_pool.initiate_stop();
		summary_thread_pool.initiate_stop();
		io_thread_pool.initiate_stop();
		external_thread_pool.initiate_stop();
		file_meta_thread_pool.initiate_stop();
		merge_thread_pool.initiate_stop();
		//Then wake them if they are sleeping
		pthread_cond_broadcast(&coordinator_job_queue.cond_not_empty);
		pthread_cond_broadcast(&cpu_job_queue.cond_not_empty);
		pthread_cond_broadcast(&summary_job_queue.cond_not_empty);
		pthread_cond_broadcast(&io_job_queue.cond_not_empty);
		pthread_cond_broadcast(&external_job_queue.cond_not_empty);
		pthread_cond_broadcast(&file_meta_job_queue.cond_not_empty);
		pthread_cond_broadcast(&merge_job_queue.cond_not_empty);
	}
	//Then cancel all outstanding non-started jobs by moving them from the pending queues to the exit-set
	{
		ScopedLock sl(mtx);
		cancel_queued_jobs(coordinator_job_queue,job_exit_program_exit);
		cancel_queued_jobs(cpu_job_queue,job_exit_program_exit);
		cancel_queued_jobs(summary_job_queue,job_exit_program_exit);
		cancel_queued_jobs(io_job_queue,job_exit_program_exit);
		cancel_queued_jobs(external_job_queue,job_exit_program_exit);
		cancel_queued_jobs(file_meta_job_queue,job_exit_program_exit);
		cancel_queued_jobs(merge_job_queue,job_exit_program_exit);
	}
	//Call finish-callbacks for all the exited / cancelled threads
	//We "know" that this function (finalize) is only called from the main thread, *cough* *cough*
	cleanup_finished_jobs();
	//Then wait for the worker threads to finish
	coordinator_thread_pool.join_all();
	cpu_thread_pool.join_all();
	summary_thread_pool.join_all();
	io_thread_pool.join_all();
	file_meta_thread_pool.join_all();
	merge_thread_pool.join_all();
	external_thread_pool.join_all();

	pthread_mutex_destroy(&mtx);
}


bool JobScheduler_impl::submit(thread_type_t thread_type, JobEntry &e) {
	if(!new_jobs_allowed) //note: unprotected read
		return false;
	e.thread_type = thread_type;
	//Determine which queue to put the job into
	//i/o jobs should have the is_io_job=true, but if they don't we will
	//just treat them as CPU-bound. All this looks over-engineered but we
	//need some flexibility to make experiments.
	JobQueue *job_queue;
	if(e.is_io_job)
		job_queue = &io_job_queue;
	else {
		switch(thread_type) {
			case thread_type_query_coordinator:  job_queue = &coordinator_job_queue; break;
			case thread_type_query_read:         job_queue = &cpu_job_queue;         break;
			case thread_type_query_constrain:    job_queue = &cpu_job_queue;         break;
			case thread_type_query_merge:        job_queue = &cpu_job_queue;         break;
			case thread_type_query_intersect:    job_queue = &cpu_job_queue;         break;
			case thread_type_query_summary:      job_queue = &summary_job_queue;     break;
			case thread_type_spider_read:        job_queue = &cpu_job_queue;         break;
			case thread_type_spider_write:       job_queue = &cpu_job_queue;         break;
			case thread_type_spider_filter:      job_queue = &external_job_queue;    break;
			case thread_type_spider_query:       job_queue = &cpu_job_queue;         break;
			case thread_type_spider_index:       job_queue = &cpu_job_queue;         break;
			case thread_type_merge_filter:       job_queue = &merge_job_queue;       break;
			case thread_type_replicate_write:    job_queue = &cpu_job_queue;         break;
			case thread_type_replicate_read:     job_queue = &cpu_job_queue;         break;
			case thread_type_file_merge:         job_queue = &merge_job_queue;       break;
			case thread_type_file_meta_data:     job_queue = &file_meta_job_queue;   break;
			case thread_type_index_merge:        job_queue = &cpu_job_queue;         break;
			case thread_type_index_generate:     job_queue = &merge_job_queue;       break;
			case thread_type_verify_data:        job_queue = &cpu_job_queue;         break;
			case thread_type_statistics:         job_queue = &cpu_job_queue;         break;
			case thread_type_unspecified_io:     job_queue = &cpu_job_queue;         break;
			case thread_type_generate_thumbnail: job_queue = &external_job_queue;    break;
			default:
				assert(false);
		}
	}
	if(job_queue->potential_worker_threads==0)
		return false;
	e.queue_enter_time = now_ms();
	ScopedLock sl(mtx);
	job_queue->add(e);
	return true;
}


void JobScheduler_impl::cancel_queued_jobs(JobQueue &jq, job_exit_t job_exit) {
	while(!jq.empty()) {
		exit_set.push_back(std::make_pair(jq.back(),job_exit));
		jq.pop_back();
	}
}


bool JobScheduler_impl::submit(start_routine_t start_routine, finish_routine_t finish_callback, void *state, thread_type_t thread_type, int priority, uint64_t start_deadline) {
	JobEntry e;
	memset(&e, 0, sizeof(JobEntry));
	e.start_routine = start_routine;
	e.finish_callback = finish_callback;
	e.state = state;
	e.is_io_job = false;
	e.start_deadline = start_deadline;
	e.is_io_write_job = false;
	e.initial_priority = priority;
	return submit(thread_type,e);
}


bool JobScheduler_impl::submit_io(start_routine_t start_routine, finish_routine_t finish_callback, FileState *fstate, thread_type_t thread_type, int priority, bool is_write_job, uint64_t start_deadline) {
	JobEntry e;
	memset(&e, 0, sizeof(JobEntry));
	e.start_routine = start_routine;
	e.finish_callback = finish_callback;
	e.state = fstate;
	e.is_io_job = true;
	e.start_deadline = start_deadline;
	e.is_io_write_job = is_write_job;
	e.initial_priority = priority;
	return submit(thread_type,e);
}
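//For I/O jobs the opaque 'state' pointer is the FileState passed to
//submit_io(); cancel_file_read_jobs() and is_reading_file() below rely on
//that, casting the state back to a FileState and matching on fstate->m_bigfile.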
bool JobScheduler_impl::are_io_write_jobs_running() const {
	ScopedLock sl(mtx);
	return num_io_write_jobs_running != 0;
}


void JobScheduler_impl::cancel_file_read_jobs(const BigFile *bf) {
	ScopedLock sl(mtx);
	for(JobQueue::iterator iter = io_job_queue.begin(); iter!=io_job_queue.end(); ) {
		if(iter->is_io_job && !iter->is_io_write_job) {
			const FileState *fstate = reinterpret_cast<const FileState*>(iter->state);
			if(fstate->m_bigfile==bf) {
				exit_set.push_back(std::make_pair(*iter,job_exit_cancelled));
				iter = io_job_queue.erase(iter);
				continue;
			}
		}
		++iter;
	}
}


bool JobScheduler_impl::is_reading_file(const BigFile *bf) {
	//The old thread stuff tested explicitly if the start_routine was
	//readwriteWrapper_r() in BigFile.cpp but that is fragile. Besides,
	//we have the 'is_io_write_job' field.
	ScopedLock sl(mtx);
	for(const auto &e : io_job_queue) {
		if(e.is_io_job && !e.is_io_write_job) {
			const FileState *fstate = reinterpret_cast<const FileState*>(e.state);
			if(fstate->m_bigfile==bf)
				return true;
		}
	}
	for(const auto &e : running_set) {
		if(e.is_io_job && !e.is_io_write_job) {
			const FileState *fstate = reinterpret_cast<const FileState*>(e.state);
			if(fstate->m_bigfile==bf)
				return true;
		}
	}
	return false;
}


void JobScheduler_impl::cancel_all_jobs_for_shutdown() {
	ScopedLock sl(mtx);
	cancel_queued_jobs(coordinator_job_queue,job_exit_program_exit);
	cancel_queued_jobs(cpu_job_queue,job_exit_program_exit);
	cancel_queued_jobs(summary_job_queue,job_exit_program_exit);
	cancel_queued_jobs(io_job_queue,job_exit_program_exit);
	cancel_queued_jobs(external_job_queue,job_exit_program_exit);
	cancel_queued_jobs(file_meta_job_queue,job_exit_program_exit);
	cancel_queued_jobs(merge_job_queue,job_exit_program_exit);
}


unsigned JobScheduler_impl::num_queued_jobs() const {
	ScopedLock sl(mtx);
	return cpu_job_queue.size()
	     + summary_job_queue.size()
	     + io_job_queue.size()
	     + external_job_queue.size()
	     + file_meta_job_queue.size()
	     + merge_job_queue.size();
}
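//A job carries five timestamps: queue_enter_time (set by submit), start_time
//and stop_time (set by the worker thread), and finish_time/exit_time (set
//below, around the finish callback). cleanup_finished_jobs() detaches the
//exit set under the mutex, then runs the finish callbacks and accumulates
//the per-thread-type statistics with the mutex released.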
void JobScheduler_impl::cleanup_finished_jobs() {
	ExitSet es;
	ScopedLock sl(mtx);
	es.swap(exit_set);
	sl.unlock();

	for(auto e : es) {
		e.first.finish_time = now_ms();
		if(e.first.finish_callback)
			e.first.finish_callback(e.first.state,e.second);
		e.first.exit_time = now_ms();
		JobTypeStatistics &s = job_statistics[e.first.thread_type];
		s.job_count++;
		s.queue_time   += e.first.start_time  - e.first.queue_enter_time;
		s.running_time += e.first.stop_time   - e.first.start_time;
		s.done_time    += e.first.finish_time - e.first.stop_time;
		s.cleanup_time += e.first.exit_time   - e.first.finish_time;
	}
}


static JobDigest job_entry_to_job_digest(const JobEntry& je, JobDigest::job_state_t job_state) {
	JobDigest jd;
	jd.thread_type = je.thread_type;
	jd.start_routine = je.start_routine;
	jd.finish_callback = je.finish_callback;
	jd.job_state = job_state;
	jd.start_deadline = je.start_deadline;
	jd.queue_enter_time = je.queue_enter_time;
	jd.start_time = je.start_time;
	jd.stop_time = je.stop_time;
	return jd;
}


std::vector<JobDigest> JobScheduler_impl::query_job_digests() const {
	std::vector<JobDigest> v;
	ScopedLock sl(mtx);
	//v.reserve(cpu_job_queue.size() + io_job_queue.size() + external_job_queue.size() + file_meta_job_queue.size() + running_set.size() + exit_set.size());
	for(const auto &je : coordinator_job_queue)
		v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_queued));
	for(const auto &je : cpu_job_queue)
		v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_queued));
	for(const auto &je : summary_job_queue)
		v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_queued));
	for(const auto &je : io_job_queue)
		v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_queued));
	for(const auto &je : external_job_queue)
		v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_queued));
	for(const auto &je : file_meta_job_queue)
		v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_queued));
	for(const auto &je : merge_job_queue)
		v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_queued));
	for(const auto &je : running_set)
		v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_running));
	for(const auto &je : exit_set)
		v.push_back(job_entry_to_job_digest(je.first,JobDigest::job_state_stopped));
	return v;
}


std::map<thread_type_t,JobTypeStatistics> JobScheduler_impl::query_job_statistics(bool clear) {
	if(clear) {
		std::map<thread_type_t,JobTypeStatistics> tmp;
		tmp.swap(job_statistics);
		return tmp;
	} else
		return job_statistics;
}


////////////////////////////////////////////////////////////////////////////////
// bridge

JobScheduler::JobScheduler()
  : impl(0)
{
}


JobScheduler::~JobScheduler() {
	delete impl;
	impl = 0;
}


bool JobScheduler::initialize(unsigned num_coordinator_threads, unsigned num_cpu_threads, unsigned num_summary_threads, unsigned num_io_threads, unsigned num_external_threads, unsigned num_file_meta_threads, unsigned num_merge_threads, job_done_notify_t job_done_notify) {
	assert(!impl);
	impl = new JobScheduler_impl(num_coordinator_threads,num_cpu_threads,num_summary_threads,num_io_threads,num_external_threads,num_file_meta_threads,num_merge_threads,job_done_notify);
	return true;
}


void JobScheduler::finalize() {
	assert(impl);
	delete impl;
	impl = 0;
}


bool JobScheduler::submit(start_routine_t start_routine, finish_routine_t finish_callback, void *state, thread_type_t thread_type, int priority, uint64_t start_deadline) {
	if(impl)
		return impl->submit(start_routine,finish_callback,state,thread_type,priority,start_deadline);
	else
		return false;
}


bool JobScheduler::submit_io(start_routine_t start_routine, finish_routine_t finish_callback, FileState *fstate, thread_type_t thread_type, int priority, bool is_write_job, uint64_t start_deadline) {
	if(impl)
		return impl->submit_io(start_routine,finish_callback,fstate,thread_type,priority,is_write_job,start_deadline);
	else
		return false;
}


bool JobScheduler::are_io_write_jobs_running() const {
	if(impl)
		return impl->are_io_write_jobs_running();
	else
		return false;
}


void JobScheduler::cancel_file_read_jobs(const BigFile *bf) {
	if(impl)
		impl->cancel_file_read_jobs(bf);
}


//void nice page for html and administration()


bool JobScheduler::is_reading_file(const BigFile *bf) {
	if(impl)
		return impl->is_reading_file(bf);
	else
		return false;
}


void JobScheduler::allow_new_jobs() {
	if(impl)
		impl->allow_new_jobs();
}


void JobScheduler::disallow_new_jobs() {
	if(impl)
		impl->disallow_new_jobs();
}


bool JobScheduler::are_new_jobs_allowed() const {
	if(impl)
		return impl->are_new_jobs_allowed();
	else
		return false;
}


void JobScheduler::cancel_all_jobs_for_shutdown() {
	if(impl)
		impl->cancel_all_jobs_for_shutdown();
}


unsigned JobScheduler::num_queued_jobs() const {
	if(impl)
		return impl->num_queued_jobs();
	else
		return 0;
}


void JobScheduler::cleanup_finished_jobs() {
	if(impl)
		impl->cleanup_finished_jobs();
}
std::vector<JobDigest> JobScheduler::query_job_digests() const {
	if(impl)
		return impl->query_job_digests();
	else
		return std::vector<JobDigest>();
}


std::map<thread_type_t,JobTypeStatistics> JobScheduler::query_job_statistics(bool clear) {
	if(impl)
		return impl->query_job_statistics(clear);
	else
		return std::map<thread_type_t,JobTypeStatistics>();
}


////////////////////////////////////////////////////////////////////////////////
// The global one-and-only scheduler
JobScheduler g_jobScheduler;
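//Illustrative usage sketch (comments only, not part of this file): callers go
//through the global instance, e.g.
//
//	static void my_start(void *state) {
//		//runs on a worker thread of the selected pool
//	}
//	static void my_finish(void *state, job_exit_t exit_cause) {
//		//runs later, when cleanup_finished_jobs() is called
//	}
//	...
//	g_jobScheduler.submit(my_start, my_finish, some_state, thread_type_statistics, 0);
//
//'my_start', 'my_finish' and 'some_state' are hypothetical names; the
//signatures follow start_routine_t and finish_routine_t as used above.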