forked from Mirrors/privacore-open-source-search-engine
		
	
		
			
				
	
	
		
			799 lines
		
	
	
		
			24 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			799 lines
		
	
	
		
			24 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| #include "JobScheduler.h"
 | |
| #include "ScopedLock.h"
 | |
| #include "BigFile.h" //for FileState definition
 | |
| #include "Errno.h"
 | |
| #include <pthread.h>
 | |
| #include <vector>
 | |
| #include <list>
 | |
| #include <algorithm>
 | |
| #include <stdexcept>
 | |
| #include <assert.h>
 | |
| #include <sys/time.h>
 | |
| 
 | |
| 
 | |
| #include <stdio.h>
 | |
| 
 | |
| namespace {
 | |
| 
 | |
| 
 | |
//Return the current wall-clock time expressed as milliseconds since 1970.
//Both fields are widened to 64 bits before scaling so that the
//multiplication cannot overflow on platforms where time_t is 32 bits wide.
static uint64_t now_ms() {
	struct timeval tv;
	gettimeofday(&tv,0);
	return static_cast<uint64_t>(tv.tv_sec)*1000 + static_cast<uint64_t>(tv.tv_usec)/1000;
}
 | |
| 
 | |
| 
 | |
| 
 | |
| 
 | |
| //A job (queued, running or finished)
 | |
| struct JobEntry {
 | |
| 	start_routine_t   start_routine;
 | |
| 	finish_routine_t  finish_callback;
 | |
| 	void             *state;
 | |
| 	
 | |
| 	bool              is_io_job;
 | |
| 	
 | |
| 	//for scheduling:
 | |
| 	uint64_t          start_deadline;     //latest time when this job must be started
 | |
| 	bool              is_io_write_job;    //valid for I/O jobs: mostly read or mostly write?
 | |
| 	int               initial_priority;   //priority when queued
 | |
| 	
 | |
| 	//for statistics:
 | |
| 	thread_type_t     thread_type;
 | |
| 	uint64_t          queue_enter_time;   //when this job was queued
 | |
| 	uint64_t          start_time;	      //when this job started running
 | |
| 	uint64_t          stop_time;	      //when this job stopped running
 | |
| 	uint64_t          finish_time;        //when the finish callback was called
 | |
| 	uint64_t          exit_time;	      //when this job was finished, including finish-callback
 | |
| };
 | |
| 
 | |
| 
 | |
| 
 | |
| 
 | |
| //a set of jobs, prioritized
 | |
| class JobQueue : public std::vector<JobEntry> {
 | |
| public:
 | |
| 	pthread_cond_t cond_not_empty;
 | |
| 	unsigned potential_worker_threads;
 | |
| 
 | |
| 	JobQueue()
 | |
| 	  : vector(),
 | |
| 	    cond_not_empty PTHREAD_COND_INITIALIZER,
 | |
| 	    potential_worker_threads(0)
 | |
| 	{
 | |
| 	}
 | |
| 	
 | |
| 	~JobQueue() {
 | |
| 		pthread_cond_destroy(&cond_not_empty);
 | |
| 	}
 | |
| 	
 | |
| 	void add(const JobEntry &e) {
 | |
| 		push_back(e);
 | |
| 		pthread_cond_signal(&cond_not_empty);
 | |
| 	}
 | |
| 	
 | |
| 	JobEntry pop_top_priority();
 | |
| 	
 | |
| 	
 | |
| };
 | |
| 
 | |
| 
 | |
| JobEntry JobQueue::pop_top_priority()
 | |
| {
 | |
| 	assert(!empty());
 | |
| 	//todo: age old entries
 | |
| 	std::vector<JobEntry>::iterator best_iter = begin();
 | |
| 	std::vector<JobEntry>::iterator iter = best_iter;
 | |
| 	++iter;
 | |
| 	for( ; iter!=end(); ++iter)
 | |
| 		if(iter->initial_priority<best_iter->initial_priority)
 | |
| 			best_iter = iter;
 | |
| 	JobEntry tmp = *best_iter;
 | |
| 	erase(best_iter);
 | |
| 	return tmp;
 | |
| }
 | |
| 
 | |
| 
 | |
| 
 | |
| typedef std::vector<std::pair<JobEntry,job_exit_t>> ExitSet;
 | |
| typedef std::list<JobEntry> RunningSet;
 | |
| 
 | |
| 
 | |
| //parameters given to a pool thread
 | |
| struct PoolThreadParameters {
 | |
| 	JobQueue          *job_queue;                 //queue to fetch jobs from
 | |
| 	RunningSet        *running_set;               //set to store the job in while executing it
 | |
| 	ExitSet           *exit_set;                  //set to store the finished job+exit-cause in
 | |
| 	unsigned          *num_io_write_jobs_running; //global counter for scheduling
 | |
| 	pthread_mutex_t   *mtx;                       //mutex covering above 3 containers
 | |
| 	job_done_notify_t job_done_notify;            //notifycation callback whenever a job returns
 | |
| 	bool              stop;
 | |
| };
 | |
| 
 | |
| 
 | |
| extern "C" {
 | |
| static void *job_pool_thread_function(void *pv) {
 | |
| 	PoolThreadParameters *ptp= static_cast<PoolThreadParameters*>(pv);
 | |
| 	pthread_mutex_lock(ptp->mtx);
 | |
| 	while(!ptp->stop) {
 | |
| 		if(ptp->job_queue->empty())
 | |
| 			pthread_cond_wait(&ptp->job_queue->cond_not_empty,ptp->mtx);
 | |
| 		if(ptp->stop)
 | |
| 			break;
 | |
| 		if(ptp->job_queue->empty()) //spurious wakeup
 | |
| 			continue;
 | |
| 		
 | |
| 		//take the top-priority job and move it into the running set
 | |
| 		RunningSet::iterator iter = ptp->running_set->insert(ptp->running_set->begin(),ptp->job_queue->pop_top_priority());
 | |
| 		if(iter->is_io_job && iter->is_io_write_job)
 | |
| 			++*(ptp->num_io_write_jobs_running);
 | |
| 		pthread_mutex_unlock(ptp->mtx);
 | |
| 		
 | |
| 		job_exit_t job_exit;
 | |
| 		uint64_t now = now_ms();
 | |
| 		iter->start_time = now;
 | |
| 		if(iter->start_deadline==0 || iter->start_deadline>now) {
 | |
| 			// Clear g_errno so the thread/job starts with a clean slate
 | |
| 			g_errno = 0;
 | |
| 			iter->start_routine(iter->state);
 | |
| 			iter->stop_time = now_ms();
 | |
| 			job_exit = job_exit_normal;
 | |
| 		} else {
 | |
| 			job_exit = job_exit_deadline;
 | |
| 		}
 | |
| 		
 | |
| 		pthread_mutex_lock(ptp->mtx);
 | |
| 		if(iter->is_io_job && iter->is_io_write_job)
 | |
| 			--*(ptp->num_io_write_jobs_running);
 | |
| 		//copy+delete it into the exit queue
 | |
| 		ptp->exit_set->push_back(std::make_pair(*iter,job_exit));
 | |
| 		ptp->running_set->erase(iter);
 | |
| 		pthread_mutex_unlock(ptp->mtx);
 | |
| 		
 | |
| 		(ptp->job_done_notify)();
 | |
| 		
 | |
| 		pthread_mutex_lock(ptp->mtx);
 | |
| 	}
 | |
| 	
 | |
| 	pthread_mutex_unlock(ptp->mtx);
 | |
| 	return 0;
 | |
| }
 | |
| }
 | |
| 
 | |
| 
 | |
//Do-nothing notification, used when the pool owner supplied no callback.
static void job_done_notify_noop()
{
}
 | |
| 
 | |
| 
 | |
| class ThreadPool {
 | |
| public:
 | |
| 	ThreadPool(const char *thread_name_prefix,
 | |
| 	           unsigned num_threads,
 | |
| 	           JobQueue *job_queue, RunningSet *running_set, ExitSet *exit_set,
 | |
| 		   unsigned *num_io_write_jobs_running,
 | |
| 		   pthread_mutex_t *mtx,
 | |
| 		   job_done_notify_t job_done_notify_);
 | |
| 	void initiate_stop();
 | |
| 	void join_all();
 | |
| 	~ThreadPool();
 | |
| 	
 | |
| private:
 | |
| 	PoolThreadParameters ptp;
 | |
| 	std::vector<pthread_t> tid;
 | |
| };
 | |
| 
 | |
| 
 | |
| ThreadPool::ThreadPool(const char *thread_name_prefix,
 | |
|                        unsigned num_threads,
 | |
|                        JobQueue *job_queue, RunningSet *running_set, ExitSet *exit_set,
 | |
| 		       unsigned *num_io_write_jobs_running,
 | |
| 		       pthread_mutex_t *mtx,
 | |
|                        job_done_notify_t job_done_notify_)
 | |
|   : tid(num_threads)
 | |
| {
 | |
| 	job_queue->potential_worker_threads += num_threads;
 | |
| 	ptp.job_queue = job_queue;
 | |
| 	ptp.running_set = running_set;
 | |
| 	ptp.exit_set = exit_set;
 | |
| 	ptp.num_io_write_jobs_running = num_io_write_jobs_running;
 | |
| 	ptp.mtx = mtx;
 | |
| 	ptp.job_done_notify = job_done_notify_?job_done_notify_:job_done_notify_noop;
 | |
| 	ptp.stop = false;
 | |
| 	for(unsigned i=0; i<tid.size(); i++) {
 | |
| 		int rc = pthread_create(&tid[i], NULL, job_pool_thread_function, &ptp);
 | |
| 		if(rc!=0)
 | |
| 			throw std::runtime_error("pthread_create() failed");
 | |
| 		char thread_name[16]; //hard limit
 | |
| 		snprintf(thread_name,sizeof(thread_name),"%s %u", thread_name_prefix,i);
 | |
| 		rc = pthread_setname_np(tid[i],thread_name);
 | |
| 		if(rc!=0)
 | |
| 			throw std::runtime_error("pthread_setname_np() failed");
 | |
| 	}
 | |
| }
 | |
| 
 | |
| 
 | |
| void ThreadPool::initiate_stop()
 | |
| {
 | |
| 	for(unsigned i=0; i<tid.size(); i++)
 | |
| 		ptp.stop = true;
 | |
| }
 | |
| 
 | |
| void ThreadPool::join_all()
 | |
| {
 | |
| 	for(unsigned i=0; i<tid.size(); i++)
 | |
| 		pthread_join(tid[i],NULL);
 | |
| 	tid.clear();
 | |
| }
 | |
| 
 | |
| ThreadPool::~ThreadPool()
 | |
| {
 | |
| 	join_all();
 | |
| }
 | |
| 
 | |
| } //anonymous namespace
 | |
| 
 | |
| 
 | |
| 
 | |
| ////////////////////////////////////////////////////////////////////////////////
 | |
| // JobScheduler implementation
 | |
| 
 | |
| class JobScheduler_impl {
 | |
| 	mutable pthread_mutex_t mtx;
 | |
| 	
 | |
| 	JobQueue   coordinator_job_queue;
 | |
| 	JobQueue   cpu_job_queue;
 | |
| 	JobQueue   summary_job_queue;
 | |
| 	JobQueue   io_job_queue;
 | |
| 	JobQueue   external_job_queue;
 | |
| 	JobQueue   file_meta_job_queue;
 | |
| 	JobQueue   merge_job_queue;
 | |
| 	
 | |
| 	RunningSet running_set;
 | |
| 	
 | |
| 	ExitSet    exit_set;
 | |
| 	
 | |
| 	unsigned   num_io_write_jobs_running;
 | |
| 	
 | |
| 	ThreadPool coordinator_thread_pool;
 | |
| 	ThreadPool cpu_thread_pool;
 | |
| 	ThreadPool summary_thread_pool;
 | |
| 	ThreadPool io_thread_pool;
 | |
| 	ThreadPool external_thread_pool;
 | |
| 	ThreadPool file_meta_thread_pool;
 | |
| 	ThreadPool merge_thread_pool;
 | |
| 	
 | |
| 	bool no_threads;
 | |
| 	bool new_jobs_allowed;
 | |
| 	
 | |
| 	std::map<thread_type_t,JobTypeStatistics> job_statistics;
 | |
| 	
 | |
| 	bool submit(thread_type_t thread_type, JobEntry &e);
 | |
| 	
 | |
| 	void cancel_queued_jobs(JobQueue &jq, job_exit_t job_exit);
 | |
| public:
 | |
| 	JobScheduler_impl(unsigned num_coordinator_threads, unsigned num_cpu_threads, unsigned num_summary_threads, unsigned num_io_threads, unsigned num_external_threads, unsigned num_file_meta_threads, unsigned num_merge_threads, job_done_notify_t job_done_notify)
 | |
| 	  : mtx PTHREAD_MUTEX_INITIALIZER,
 | |
| 	    coordinator_job_queue(),
 | |
| 	    cpu_job_queue(),
 | |
| 	    summary_job_queue(),
 | |
| 	    io_job_queue(),
 | |
| 	    external_job_queue(),
 | |
| 	    file_meta_job_queue(),
 | |
| 	    merge_job_queue(),
 | |
| 	    running_set(),
 | |
| 	    exit_set(),
 | |
| 	    num_io_write_jobs_running(0),
 | |
| 	    coordinator_thread_pool("coord",num_coordinator_threads,&coordinator_job_queue,&running_set,&exit_set,&num_io_write_jobs_running,&mtx,job_done_notify),
 | |
| 	    cpu_thread_pool("cpu",num_cpu_threads,&cpu_job_queue,&running_set,&exit_set,&num_io_write_jobs_running,&mtx,job_done_notify),
 | |
| 	    summary_thread_pool("summary",num_summary_threads,&summary_job_queue,&running_set,&exit_set,&num_io_write_jobs_running,&mtx,job_done_notify),
 | |
| 	    io_thread_pool("io",num_io_threads,&io_job_queue,&running_set,&exit_set,&num_io_write_jobs_running,&mtx,job_done_notify),
 | |
| 	    external_thread_pool("ext",num_external_threads,&external_job_queue,&running_set,&exit_set,&num_io_write_jobs_running,&mtx,job_done_notify),
 | |
| 	    file_meta_thread_pool("file",num_file_meta_threads,&file_meta_job_queue,&running_set,&exit_set,&num_io_write_jobs_running,&mtx,job_done_notify),
 | |
| 	    merge_thread_pool("merge",num_merge_threads,&merge_job_queue,&running_set,&exit_set,&num_io_write_jobs_running,&mtx,job_done_notify),
 | |
| 	    no_threads(num_cpu_threads==0 && num_summary_threads==0 && num_io_threads==0 && num_external_threads==0 && num_file_meta_threads==0),
 | |
| 	    new_jobs_allowed(true)
 | |
| 	{
 | |
| 	}
 | |
| 	
 | |
| 	~JobScheduler_impl();
 | |
| 	
 | |
| 	bool submit(start_routine_t   start_routine,
 | |
| 	            finish_routine_t  finish_callback,
 | |
| 		    void             *state,
 | |
| 		    thread_type_t     thread_type,
 | |
| 		    int               priority,
 | |
| 		    uint64_t          start_deadline=0);
 | |
| 	bool submit_io(start_routine_t   start_routine,
 | |
| 	               finish_routine_t  finish_callback,
 | |
| 		       FileState        *fstate,
 | |
| 		       thread_type_t     thread_type,
 | |
| 		       int               priority,
 | |
| 		       bool              is_write_job,
 | |
| 		       uint64_t          start_deadline=0);
 | |
| 	
 | |
| 	bool are_io_write_jobs_running() const;
 | |
| 	void cancel_file_read_jobs(const BigFile *bf);
 | |
| 	//void nice page for html and administation()
 | |
| 	bool is_reading_file(const BigFile *bf);
 | |
| 	
 | |
| 	void allow_new_jobs() {
 | |
| 		new_jobs_allowed = true;
 | |
| 	}
 | |
| 	void disallow_new_jobs() {
 | |
| 		new_jobs_allowed = false;
 | |
| 	}
 | |
| 	bool are_new_jobs_allowed() const {
 | |
| 		return new_jobs_allowed && !no_threads;
 | |
| 	}
 | |
| 	void cancel_all_jobs_for_shutdown();
 | |
| 	
 | |
| 	unsigned num_queued_jobs() const;
 | |
| 	
 | |
| 	void cleanup_finished_jobs();
 | |
| 	
 | |
| 	std::vector<JobDigest> query_job_digests() const;
 | |
| 	std::map<thread_type_t,JobTypeStatistics> query_job_statistics(bool clear);
 | |
| };
 | |
| 
 | |
| 
 | |
| 
 | |
| JobScheduler_impl::~JobScheduler_impl() {
 | |
| 	//First prevent new jobs from being submitted
 | |
| 	new_jobs_allowed = false;
 | |
| 
 | |
| 	//Then tell the worker threads to stop executing more jobs
 | |
| 	coordinator_thread_pool.initiate_stop();
 | |
| 	cpu_thread_pool.initiate_stop();
 | |
| 	summary_thread_pool.initiate_stop();
 | |
| 	io_thread_pool.initiate_stop();
 | |
| 	external_thread_pool.initiate_stop();
 | |
| 	file_meta_thread_pool.initiate_stop();
 | |
| 	merge_thread_pool.initiate_stop();
 | |
| 
 | |
| 	//Then wake them if they are sleeping
 | |
| 	pthread_cond_broadcast(&coordinator_job_queue.cond_not_empty);
 | |
| 	pthread_cond_broadcast(&cpu_job_queue.cond_not_empty);
 | |
| 	pthread_cond_broadcast(&summary_job_queue.cond_not_empty);
 | |
| 	pthread_cond_broadcast(&io_job_queue.cond_not_empty);
 | |
| 	pthread_cond_broadcast(&external_job_queue.cond_not_empty);
 | |
| 	pthread_cond_broadcast(&file_meta_job_queue.cond_not_empty);
 | |
| 	pthread_cond_broadcast(&merge_job_queue.cond_not_empty);
 | |
| 
 | |
| 	//Then cancel all outstanding non-started jobs by moving them from the pending queues to the exit-set
 | |
| 	{
 | |
| 		ScopedLock sl(mtx);
 | |
| 		cancel_queued_jobs(coordinator_job_queue,job_exit_program_exit);
 | |
| 		cancel_queued_jobs(cpu_job_queue,job_exit_program_exit);
 | |
| 		cancel_queued_jobs(summary_job_queue,job_exit_program_exit);
 | |
| 		cancel_queued_jobs(io_job_queue,job_exit_program_exit);
 | |
| 		cancel_queued_jobs(external_job_queue,job_exit_program_exit);
 | |
| 		cancel_queued_jobs(file_meta_job_queue,job_exit_program_exit);
 | |
| 		cancel_queued_jobs(merge_job_queue,job_exit_program_exit);
 | |
| 	}
 | |
| 
 | |
| 	//Call finish-callbacks for all the exited / cancelled threads
 | |
| 	//We "know" that this function (finalize) is only called from the main thread, *cough* *cough*
 | |
| 	cleanup_finished_jobs();
 | |
| 	
 | |
| 	//Then wait  for worker threads to finished
 | |
| 	coordinator_thread_pool.join_all();
 | |
| 	cpu_thread_pool.join_all();
 | |
| 	summary_thread_pool.join_all();
 | |
| 	io_thread_pool.join_all();
 | |
| 	file_meta_thread_pool.join_all();
 | |
| 	merge_thread_pool.join_all();
 | |
| 	external_thread_pool.join_all();
 | |
| 
 | |
| 	pthread_mutex_destroy(&mtx);
 | |
| }
 | |
| 
 | |
| 
 | |
| bool JobScheduler_impl::submit(thread_type_t thread_type, JobEntry &e)
 | |
| {
 | |
| 	if(!new_jobs_allowed) //note: unprotected read
 | |
| 		return false;
 | |
| 	e.thread_type = thread_type;
 | |
| 	
 | |
| 	//Determine which queue to put the job into
 | |
| 	//i/o jobs should have the is_io_job=true, but if they don't we will
 | |
| 	//just treat them as CPU-bound. All this looks over-engineered but we
 | |
| 	//need some flexibility to make experiments.
 | |
| 	JobQueue *job_queue;
 | |
| 	if(e.is_io_job)
 | |
| 		job_queue = &io_job_queue;
 | |
| 	else {
 | |
| 		switch(thread_type) {
 | |
| 			case thread_type_query_coordinator:  job_queue = &coordinator_job_queue; break;
 | |
| 			case thread_type_query_read:         job_queue = &cpu_job_queue;      break;
 | |
| 			case thread_type_query_constrain:    job_queue = &cpu_job_queue;      break;
 | |
| 			case thread_type_query_merge:        job_queue = &cpu_job_queue;      break;
 | |
| 			case thread_type_query_intersect:    job_queue = &cpu_job_queue;      break;
 | |
| 			case thread_type_query_summary:      job_queue = &summary_job_queue;  break;
 | |
| 			case thread_type_spider_read:        job_queue = &cpu_job_queue;      break;
 | |
| 			case thread_type_spider_write:       job_queue = &cpu_job_queue;      break;
 | |
| 			case thread_type_spider_filter:      job_queue = &external_job_queue; break;
 | |
| 			case thread_type_spider_query:       job_queue = &cpu_job_queue;      break;
 | |
| 			case thread_type_spider_index:       job_queue = &cpu_job_queue;      break;
 | |
| 			case thread_type_merge_filter:       job_queue = &merge_job_queue;      break;
 | |
| 			case thread_type_replicate_write:    job_queue = &cpu_job_queue;      break;
 | |
| 			case thread_type_replicate_read:     job_queue = &cpu_job_queue;      break;
 | |
| 			case thread_type_file_merge:         job_queue = &merge_job_queue;    break;
 | |
| 			case thread_type_file_meta_data:     job_queue = &file_meta_job_queue;break;
 | |
| 			case thread_type_index_merge:        job_queue = &cpu_job_queue;      break;
 | |
| 			case thread_type_index_generate:     job_queue = &merge_job_queue;    break;
 | |
| 			case thread_type_verify_data:        job_queue = &cpu_job_queue;      break;
 | |
| 			case thread_type_statistics:         job_queue = &cpu_job_queue;      break;
 | |
| 			case thread_type_unspecified_io:     job_queue = &cpu_job_queue;      break;
 | |
| 			case thread_type_generate_thumbnail: job_queue = &external_job_queue; break;
 | |
| 			case thread_type_config_load:        job_queue = &cpu_job_queue;      break;
 | |
| 			case thread_type_page_process:       job_queue = &cpu_job_queue;      break;
 | |
| 			default:
 | |
| 				assert(false);
 | |
| 
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	if(job_queue->potential_worker_threads==0)
 | |
| 		return false;
 | |
| 	
 | |
| 	e.queue_enter_time = now_ms();
 | |
| 	ScopedLock sl(mtx);
 | |
| 	job_queue->add(e);
 | |
| 	return true;
 | |
| }
 | |
| 
 | |
| 
 | |
| void JobScheduler_impl::cancel_queued_jobs(JobQueue &jq, job_exit_t job_exit) {
 | |
| 	while(!jq.empty()) {
 | |
| 		exit_set.push_back(std::make_pair(jq.back(),job_exit));
 | |
| 		jq.pop_back();
 | |
| 	}
 | |
| }
 | |
| 
 | |
| 
 | |
| bool JobScheduler_impl::submit(start_routine_t   start_routine,
 | |
|                                finish_routine_t  finish_callback,
 | |
|                                void             *state,
 | |
|                                thread_type_t     thread_type,
 | |
|                                int               priority,
 | |
|                                uint64_t          start_deadline)
 | |
| {
 | |
| 	JobEntry e;
 | |
| 	
 | |
| 	memset(&e, 0, sizeof(JobEntry));
 | |
| 	e.start_routine = start_routine;
 | |
| 	e.finish_callback = finish_callback;
 | |
| 	e.state = state;
 | |
| 	e.is_io_job = false;
 | |
| 	e.start_deadline = start_deadline;
 | |
| 	e.is_io_write_job = false;
 | |
| 	e.initial_priority = priority;
 | |
| 	return submit(thread_type,e);
 | |
| }
 | |
| 
 | |
| bool JobScheduler_impl::submit_io(start_routine_t   start_routine,
 | |
|                                   finish_routine_t  finish_callback,
 | |
|                                   FileState        *fstate,
 | |
|                                   thread_type_t     thread_type,
 | |
|                                   int               priority,
 | |
|                                   bool              is_write_job,
 | |
|                                   uint64_t          start_deadline)
 | |
| {
 | |
| 	JobEntry e;
 | |
| 	
 | |
| 	memset(&e, 0, sizeof(JobEntry));
 | |
| 	e.start_routine = start_routine;
 | |
| 	e.finish_callback = finish_callback;
 | |
| 	e.state = fstate;
 | |
| 	e.is_io_job = true;
 | |
| 	e.start_deadline = start_deadline;
 | |
| 	e.is_io_write_job = is_write_job;
 | |
| 	e.initial_priority = priority;
 | |
| 	return submit(thread_type,e);
 | |
| }
 | |
| 
 | |
| 
 | |
| 
 | |
| 
 | |
| bool JobScheduler_impl::are_io_write_jobs_running() const
 | |
| {
 | |
| 	ScopedLock sl(mtx);
 | |
| 	return num_io_write_jobs_running != 0;
 | |
| }
 | |
| 
 | |
| 
 | |
| 
 | |
| void JobScheduler_impl::cancel_file_read_jobs(const BigFile *bf)
 | |
| {
 | |
| 	ScopedLock sl(mtx);
 | |
| 	for(JobQueue::iterator iter = io_job_queue.begin(); iter!=io_job_queue.end(); ) {
 | |
| 		if(iter->is_io_job && !iter->is_io_write_job) {
 | |
| 			const FileState *fstate = reinterpret_cast<const FileState*>(iter->state);
 | |
| 			if(fstate->m_bigfile==bf) {
 | |
| 				exit_set.push_back(std::make_pair(*iter,job_exit_cancelled));
 | |
| 				iter = io_job_queue.erase(iter);
 | |
| 				continue;
 | |
| 			}
 | |
| 		}
 | |
| 		++iter;
 | |
| 	}
 | |
| }
 | |
| 
 | |
| 
 | |
| bool JobScheduler_impl::is_reading_file(const BigFile *bf)
 | |
| {
 | |
| 	//The old thread stuff tested explicitly if the start_routine was
 | |
| 	//readwriteWrapper_r() in BigFile.cpp but that is fragile. Besides,
 | |
| 	//we have the 'is_io_write_job' field.
 | |
| 	ScopedLock sl(mtx);
 | |
| 	for(const auto &e : io_job_queue) {
 | |
| 		if(e.is_io_job && !e.is_io_write_job) {
 | |
| 			const FileState *fstate = reinterpret_cast<const FileState*>(e.state);
 | |
| 			if(fstate->m_bigfile==bf)
 | |
| 				return true;
 | |
| 		}
 | |
| 	}
 | |
| 	for(const auto &e : running_set) {
 | |
| 		if(e.is_io_job && !e.is_io_write_job) {
 | |
| 			const FileState *fstate = reinterpret_cast<const FileState*>(e.state);
 | |
| 			if(fstate->m_bigfile==bf)
 | |
| 				return true;
 | |
| 		}
 | |
| 	}
 | |
| 	return false;
 | |
| }
 | |
| 
 | |
| 
 | |
| void JobScheduler_impl::cancel_all_jobs_for_shutdown() {
 | |
| 	ScopedLock sl(mtx);
 | |
| 	cancel_queued_jobs(coordinator_job_queue,job_exit_program_exit);
 | |
| 	cancel_queued_jobs(cpu_job_queue,job_exit_program_exit);
 | |
| 	cancel_queued_jobs(summary_job_queue,job_exit_program_exit);
 | |
| 	cancel_queued_jobs(io_job_queue,job_exit_program_exit);
 | |
| 	cancel_queued_jobs(external_job_queue,job_exit_program_exit);
 | |
| 	cancel_queued_jobs(file_meta_job_queue,job_exit_program_exit);
 | |
| 	cancel_queued_jobs(merge_job_queue,job_exit_program_exit);
 | |
| }
 | |
| 
 | |
| 
 | |
| 
 | |
| unsigned JobScheduler_impl::num_queued_jobs() const
 | |
| {
 | |
| 	ScopedLock sl(mtx);
 | |
| 	return cpu_job_queue.size() + summary_job_queue.size() + io_job_queue.size() + external_job_queue.size() + file_meta_job_queue.size() + merge_job_queue.size();
 | |
| }
 | |
| 
 | |
| void JobScheduler_impl::cleanup_finished_jobs()
 | |
| {
 | |
| 	ExitSet es;
 | |
| 	ScopedLock sl(mtx);
 | |
| 	es.swap(exit_set);
 | |
| 	sl.unlock();
 | |
| 	
 | |
| 	for(auto e : es) {
 | |
| 		e.first.finish_time = now_ms();
 | |
| 		if(e.first.finish_callback)
 | |
| 			e.first.finish_callback(e.first.state,e.second);
 | |
| 		e.first.exit_time = now_ms();
 | |
| 		
 | |
| 		JobTypeStatistics &s = job_statistics[e.first.thread_type];
 | |
| 		s.job_count++;
 | |
| 		s.queue_time += e.first.start_time - e.first.queue_enter_time;
 | |
| 		s.running_time += e.first.stop_time - e.first.start_time;
 | |
| 		s.done_time += e.first.finish_time - e.first.stop_time;
 | |
| 		s.cleanup_time += e.first.exit_time - e.first.finish_time;
 | |
| 	}
 | |
| }
 | |
| 
 | |
| 
 | |
| static JobDigest job_entry_to_job_digest(const JobEntry& je, JobDigest::job_state_t job_state)
 | |
| {
 | |
| 	JobDigest jd;
 | |
| 	jd.thread_type = je.thread_type;
 | |
| 	jd.start_routine = je.start_routine;
 | |
| 	jd.finish_callback = je.finish_callback;
 | |
| 	jd.job_state = job_state;
 | |
| 	jd.start_deadline = je.start_deadline;
 | |
| 	jd.queue_enter_time = je.queue_enter_time;
 | |
| 	jd.start_time = je.start_time;
 | |
| 	jd.stop_time = je.stop_time;
 | |
| 	return jd;
 | |
| }
 | |
| 
 | |
| 
 | |
| std::vector<JobDigest> JobScheduler_impl::query_job_digests() const
 | |
| {
 | |
| 	std::vector<JobDigest> v;
 | |
| 	ScopedLock sl(mtx);
 | |
| 	//v.reserve(cpu_job_queue.size() + io_job_queue.size() + external_job_queue.size() + file_meta_job_queue.size() + running_set().size() + exit_set().size());
 | |
| 	for(const auto &je : coordinator_job_queue)
 | |
| 		v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_queued));
 | |
| 	for(const auto &je : cpu_job_queue)
 | |
| 		v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_queued));
 | |
| 	for(const auto &je : summary_job_queue)
 | |
| 		v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_queued));
 | |
| 	for(const auto &je : io_job_queue)
 | |
| 		v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_queued));
 | |
| 	for(const auto &je : external_job_queue)
 | |
| 		v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_queued));
 | |
| 	for(const auto &je : file_meta_job_queue)
 | |
| 		v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_queued));
 | |
| 	for(const auto &je : merge_job_queue)
 | |
| 		v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_queued));
 | |
| 	for(const auto &je : running_set)
 | |
| 		v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_running));
 | |
| 	for(const auto &je : exit_set)
 | |
| 		v.push_back(job_entry_to_job_digest(je.first,JobDigest::job_state_stopped));
 | |
| 	return v;
 | |
| 	
 | |
| }
 | |
| 
 | |
| 
 | |
| std::map<thread_type_t,JobTypeStatistics> JobScheduler_impl::query_job_statistics(bool clear)
 | |
| {
 | |
| 	if(clear) {
 | |
| 		std::map<thread_type_t,JobTypeStatistics> tmp;
 | |
| 		tmp.swap(job_statistics);
 | |
| 		return tmp;
 | |
| 	} else
 | |
| 		return job_statistics;
 | |
| }
 | |
| 
 | |
| 
 | |
| 
 | |
| 
 | |
| ////////////////////////////////////////////////////////////////////////////////
 | |
| // bridge
 | |
| 
 | |
| JobScheduler::JobScheduler()
 | |
|   : impl(0)
 | |
| {
 | |
| }
 | |
| 
 | |
| 
 | |
| JobScheduler::~JobScheduler()
 | |
| {
 | |
| 	delete impl;
 | |
| 	impl = 0;
 | |
| }
 | |
| 
 | |
| 
 | |
| bool JobScheduler::initialize(unsigned num_coordinator_threads, unsigned num_cpu_threads, unsigned num_summary_threads, unsigned num_io_threads, unsigned num_external_threads, unsigned num_file_meta_threads, unsigned num_merge_threads, job_done_notify_t job_done_notify)
 | |
| {
 | |
| 	assert(!impl);
 | |
| 	impl = new JobScheduler_impl(num_coordinator_threads,num_cpu_threads,num_summary_threads,num_io_threads,num_external_threads,num_file_meta_threads,num_merge_threads,job_done_notify);
 | |
| 	return true;
 | |
| }
 | |
| 
 | |
| 
 | |
| void JobScheduler::finalize()
 | |
| {
 | |
| 	assert(impl);
 | |
| 	delete impl;
 | |
| 	impl = 0;
 | |
| }
 | |
| 
 | |
| 
 | |
| bool JobScheduler::submit(start_routine_t   start_routine,
 | |
|                           finish_routine_t  finish_callback,
 | |
|                           void             *state,
 | |
|                           thread_type_t     thread_type,
 | |
|                           int               priority,
 | |
|                           uint64_t          start_deadline)
 | |
| {
 | |
| 	if(impl)
 | |
| 		return impl->submit(start_routine,finish_callback,state,thread_type,priority,start_deadline);
 | |
| 	else
 | |
| 		return false;
 | |
| }
 | |
| 
 | |
| bool JobScheduler::submit_io(start_routine_t   start_routine,
 | |
|                           finish_routine_t  finish_callback,
 | |
|                           FileState        *fstate,
 | |
|                           thread_type_t     thread_type,
 | |
|                           int               priority,
 | |
|                           bool              is_write_job,
 | |
|                           uint64_t          start_deadline)
 | |
| {
 | |
| 	if(impl)
 | |
| 		return impl->submit_io(start_routine,finish_callback,fstate,thread_type,priority,is_write_job,start_deadline);
 | |
| 	else
 | |
| 		return false;
 | |
| }
 | |
| 
 | |
| 
 | |
| bool JobScheduler::are_io_write_jobs_running() const
 | |
| {
 | |
| 	if(impl)
 | |
| 		return impl->are_io_write_jobs_running();
 | |
| 	else
 | |
| 		return false;
 | |
| }
 | |
| 
 | |
| 
 | |
| void JobScheduler::cancel_file_read_jobs(const BigFile *bf)
 | |
| {
 | |
| 	if(impl)
 | |
| 		impl->cancel_file_read_jobs(bf);
 | |
| }
 | |
| 
 | |
| 
 | |
| //void nice page for html and administration()
 | |
| 
 | |
| 
 | |
| bool JobScheduler::is_reading_file(const BigFile *bf)
 | |
| {
 | |
| 	if(impl)
 | |
| 		return impl->is_reading_file(bf);
 | |
| 	else
 | |
| 		return false;
 | |
| }
 | |
| 
 | |
| 
 | |
| void JobScheduler::allow_new_jobs()
 | |
| {
 | |
| 	if(impl)
 | |
| 		impl->allow_new_jobs();
 | |
| }
 | |
| 
 | |
| void JobScheduler::disallow_new_jobs()
 | |
| {
 | |
| 	if(impl)
 | |
| 		impl->disallow_new_jobs();
 | |
| }
 | |
| 
 | |
| bool JobScheduler::are_new_jobs_allowed() const
 | |
| {
 | |
| 	if(impl)
 | |
| 		return impl->are_new_jobs_allowed();
 | |
| 	else
 | |
| 		return false;
 | |
| }
 | |
| 
 | |
| 
 | |
| void JobScheduler::cancel_all_jobs_for_shutdown() {
 | |
| 	if(impl)
 | |
| 		impl->cancel_all_jobs_for_shutdown();
 | |
| }
 | |
| 
 | |
| 
 | |
| unsigned JobScheduler::num_queued_jobs() const
 | |
| {
 | |
| 	if(impl)
 | |
| 		return impl->num_queued_jobs();
 | |
| 	else
 | |
| 		return 0;
 | |
| }
 | |
| 
 | |
| 
 | |
| void JobScheduler::cleanup_finished_jobs()
 | |
| {
 | |
| 	if(impl)
 | |
| 		impl->cleanup_finished_jobs();
 | |
| }
 | |
| 
 | |
| 
 | |
| std::vector<JobDigest> JobScheduler::query_job_digests() const
 | |
| {
 | |
| 	if(impl)
 | |
| 		return impl->query_job_digests();
 | |
| 	else
 | |
| 		return std::vector<JobDigest>();
 | |
| }
 | |
| 
 | |
| 
 | |
| std::map<thread_type_t,JobTypeStatistics> JobScheduler::query_job_statistics(bool clear)
 | |
| {
 | |
| 	if(impl)
 | |
| 		return impl->query_job_statistics(clear);
 | |
| 	else
 | |
| 		return std::map<thread_type_t,JobTypeStatistics>();
 | |
| }
 | |
| 
 | |
| 
 | |
| ////////////////////////////////////////////////////////////////////////////////
 | |
| // The global one-and-only scheduler
 | |
| 
 | |
//Global scheduler object. It is constructed without an implementation object; initialize() creates one.
| JobScheduler g_jobScheduler;
 |