privacore-open-source-searc.../JobScheduler.cpp
Ivan Skytte Jørgensen beeddcf35d Got rid of gb-include.h
2018-07-26 17:29:51 +02:00

799 lines
24 KiB
C++

#include "JobScheduler.h"
#include "ScopedLock.h"
#include "BigFile.h" //for FileState definition
#include "Errno.h"
#include <pthread.h>
#include <vector>
#include <list>
#include <algorithm>
#include <stdexcept>
#include <assert.h>
#include <sys/time.h>
#include <stdio.h>
namespace {
//Return the current wall-clock time expressed as milliseconds since 1970.
//Both fields are widened to 64 bits before the arithmetic so that the
//multiplication by 1000 cannot overflow on platforms where time_t is only
//32 bits wide.
static uint64_t now_ms() {
    struct timeval tv;
    gettimeofday(&tv,0);
    return static_cast<uint64_t>(tv.tv_sec)*1000 + static_cast<uint64_t>(tv.tv_usec)/1000;
}
//A job (queued, running or finished).
//The same entry is copied from a pending queue to the running set and finally
//to the exit set as the job progresses. All timestamps are milliseconds as
//returned by now_ms(); a timestamp is 0 until the corresponding stage is reached
//(entries are zero-filled when they are created by submit()/submit_io()).
struct JobEntry {
start_routine_t start_routine; //executed by a worker thread with 'state' as argument
finish_routine_t finish_callback; //optional; called from cleanup_finished_jobs() with 'state' and the exit cause
void *state; //opaque to the scheduler; for I/O jobs it is the FileState pointer given to submit_io()
bool is_io_job; //true when the job was submitted through submit_io()
//for scheduling:
uint64_t start_deadline; //latest time when this job must be started. 0 means no deadline
bool is_io_write_job; //valid for I/O jobs: mostly read or mostly write?
int initial_priority; //priority when queued. The numerically lowest value is picked first
//for statistics:
thread_type_t thread_type;
uint64_t queue_enter_time; //when this job was queued
uint64_t start_time; //when this job started running
uint64_t stop_time; //when this job stopped running
uint64_t finish_time; //when the finish callback was called
uint64_t exit_time; //when this job was finished, including finish-callback
};
//A set of pending jobs, prioritized.
//The class does no locking of its own. Within this file the queue contents are
//accessed with the scheduler mutex held, and worker threads wait on
//'cond_not_empty' using that same mutex.
class JobQueue : public std::vector<JobEntry> {
public:
pthread_cond_t cond_not_empty; //signalled by add(); worker threads sleep on it while the queue is empty
unsigned potential_worker_threads; //number of pool threads serving this queue; increased by the ThreadPool constructor
JobQueue()
: vector(),
cond_not_empty PTHREAD_COND_INITIALIZER,
potential_worker_threads(0)
{
}
~JobQueue() {
pthread_cond_destroy(&cond_not_empty);
}
//Append a job and wake one thread waiting for work.
void add(const JobEntry &e) {
push_back(e);
pthread_cond_signal(&cond_not_empty);
}
//Remove and return the entry with the lowest initial_priority value. The queue must not be empty.
JobEntry pop_top_priority();
};
//Remove and return the queued job with the numerically lowest
//initial_priority. When several jobs share that value the one closest to the
//front of the queue is taken. The queue must not be empty.
JobEntry JobQueue::pop_top_priority()
{
    assert(!empty());
    //todo: age old entries
    iterator winner = std::min_element(begin(), end(),
        [](const JobEntry &lhs, const JobEntry &rhs) -> bool {
            return lhs.initial_priority<rhs.initial_priority;
        });
    JobEntry picked = *winner;
    erase(winner);
    return picked;
}
//Jobs that have left the queue/running set, paired with the reason they ended.
//Drained by JobScheduler_impl::cleanup_finished_jobs().
typedef std::vector<std::pair<JobEntry,job_exit_t>> ExitSet;
//Jobs currently being executed by a worker thread. A std::list is used, so an
//iterator to one entry stays valid while other entries are inserted or erased.
typedef std::list<JobEntry> RunningSet;
//Parameters given to a pool thread.
//One instance is owned by each ThreadPool and its address is handed to every
//thread of that pool, so all threads of a pool share the same instance.
struct PoolThreadParameters {
JobQueue *job_queue; //queue to fetch jobs from
RunningSet *running_set; //set to store the job in while executing it
ExitSet *exit_set; //set to store the finished job+exit-cause in
unsigned *num_io_write_jobs_running; //global counter for scheduling
pthread_mutex_t *mtx; //mutex covering the three containers and the counter above
job_done_notify_t job_done_notify; //notification callback invoked whenever a job returns
bool stop; //set to true to make the pool threads leave their main loop
};
extern "C" {
//Main function of every pool thread.
//'pv' points to the PoolThreadParameters of the owning ThreadPool. The thread
//repeatedly takes the top-priority job from the queue, runs it with the mutex
//released, and hands the result over to the exit set. It returns 0 once
//'stop' has been observed.
//Locking: the mutex is held at the top of every loop iteration and is only
//released while a job is running and while the done-notification is called.
static void *job_pool_thread_function(void *pv) {
PoolThreadParameters *ptp= static_cast<PoolThreadParameters*>(pv);
pthread_mutex_lock(ptp->mtx);
while(!ptp->stop) {
//Sleep until there is (possibly) something to do. pthread_cond_wait()
//releases the mutex while sleeping and re-acquires it before returning.
if(ptp->job_queue->empty())
pthread_cond_wait(&ptp->job_queue->cond_not_empty,ptp->mtx);
if(ptp->stop)
break;
if(ptp->job_queue->empty()) //spurious wakeup
continue;
//take the top-priority job and move it into the running set
RunningSet::iterator iter = ptp->running_set->insert(ptp->running_set->begin(),ptp->job_queue->pop_top_priority());
if(iter->is_io_job && iter->is_io_write_job)
++*(ptp->num_io_write_jobs_running);
//Run the job without holding the mutex. 'iter' stays usable because
//RunningSet is a std::list and only this thread erases this entry.
pthread_mutex_unlock(ptp->mtx);
job_exit_t job_exit;
uint64_t now = now_ms();
iter->start_time = now;
//A start_deadline of 0 means "no deadline". Otherwise the job is only
//started if the deadline has not been reached yet.
if(iter->start_deadline==0 || iter->start_deadline>now) {
// Clear g_errno so the thread/job starts with a clean slate
g_errno = 0;
iter->start_routine(iter->state);
iter->stop_time = now_ms();
job_exit = job_exit_normal;
} else {
//NOTE(review): stop_time is not assigned on this path, so it keeps
//whatever value the entry was queued with.
job_exit = job_exit_deadline;
}
pthread_mutex_lock(ptp->mtx);
if(iter->is_io_job && iter->is_io_write_job)
--*(ptp->num_io_write_jobs_running);
//copy+delete it into the exit queue
ptp->exit_set->push_back(std::make_pair(*iter,job_exit));
ptp->running_set->erase(iter);
//Tell the owner that a job has ended. This is done without the mutex held;
//the mutex is re-acquired before the next loop iteration.
pthread_mutex_unlock(ptp->mtx);
(ptp->job_done_notify)();
pthread_mutex_lock(ptp->mtx);
}
pthread_mutex_unlock(ptp->mtx);
return 0;
}
}
//Do-nothing notification, used by ThreadPool when no job_done_notify callback
//was supplied so that the pool threads can call the callback unconditionally.
static void job_done_notify_noop() {
}
//A fixed-size group of worker threads that all fetch jobs from one JobQueue.
//The pool does not own the queue, the running/exit sets, the counter or the
//mutex; it only stores pointers to them and passes them on to its threads.
class ThreadPool {
public:
//Start 'num_threads' threads named "<thread_name_prefix> <index>".
//Throws std::runtime_error if a thread cannot be created or named.
ThreadPool(const char *thread_name_prefix,
unsigned num_threads,
JobQueue *job_queue, RunningSet *running_set, ExitSet *exit_set,
unsigned *num_io_write_jobs_running,
pthread_mutex_t *mtx,
job_done_notify_t job_done_notify_);
//Set the stop flag the pool threads test in their main loop. Does not wake sleeping threads.
void initiate_stop();
//Wait for all threads of the pool to terminate.
void join_all();
~ThreadPool();
private:
PoolThreadParameters ptp; //shared by all threads of this pool
std::vector<pthread_t> tid; //ids of the threads that have not been joined yet
};
//Start 'num_threads' worker threads that all serve 'job_queue'.
//  thread_name_prefix         prefix of the thread names; " <index>" is appended
//  num_threads                number of threads to start (may be 0)
//  job_queue                  queue the threads fetch jobs from
//  running_set, exit_set      shared containers for running / finished jobs
//  num_io_write_jobs_running  shared counter maintained by the threads
//  mtx                        mutex protecting all of the above
//  job_done_notify_           called after each job has ended; may be null
//Throws std::runtime_error if a thread cannot be created or named. Before
//throwing, the threads that were already started are stopped and joined, so
//no thread is left running with a pointer to the parameters of a pool that is
//about to be destroyed.
ThreadPool::ThreadPool(const char *thread_name_prefix,
                       unsigned num_threads,
                       JobQueue *job_queue, RunningSet *running_set, ExitSet *exit_set,
                       unsigned *num_io_write_jobs_running,
                       pthread_mutex_t *mtx,
                       job_done_notify_t job_done_notify_)
  : tid(num_threads)
{
    job_queue->potential_worker_threads += num_threads;
    ptp.job_queue = job_queue;
    ptp.running_set = running_set;
    ptp.exit_set = exit_set;
    ptp.num_io_write_jobs_running = num_io_write_jobs_running;
    ptp.mtx = mtx;
    ptp.job_done_notify = job_done_notify_?job_done_notify_:job_done_notify_noop;
    ptp.stop = false;

    //Roll back a partially completed startup: stop and join the first
    //'num_started' threads and give back the worker count claimed above.
    //The stop flag is set with the mutex held because the threads test it
    //(and go to sleep) under that mutex; otherwise the wake-up could be lost.
    auto abort_startup = [&](unsigned num_started) {
        pthread_mutex_lock(mtx);
        ptp.stop = true;
        pthread_cond_broadcast(&job_queue->cond_not_empty);
        pthread_mutex_unlock(mtx);
        for(unsigned j=0; j<num_started; j++)
            pthread_join(tid[j],NULL);
        job_queue->potential_worker_threads -= num_threads;
    };

    for(unsigned i=0; i<tid.size(); i++) {
        int rc = pthread_create(&tid[i], NULL, job_pool_thread_function, &ptp);
        if(rc!=0) {
            abort_startup(i); //thread i was never started
            throw std::runtime_error("pthread_create() failed");
        }
        char thread_name[16]; //hard limit
        snprintf(thread_name,sizeof(thread_name),"%s %u", thread_name_prefix,i);
        rc = pthread_setname_np(tid[i],thread_name);
        if(rc!=0) {
            abort_startup(i+1); //thread i is running and must be joined too
            throw std::runtime_error("pthread_setname_np() failed");
        }
    }
}
//Ask the worker threads of this pool to leave their main loop. A thread that
//is executing a job finishes that job first. Sleeping threads are not woken
//here; the caller has to signal the queue's condition variable afterwards.
//
//The flag is set with the scheduler mutex held. A worker tests 'stop' and
//enters pthread_cond_wait() while holding that same mutex, so the store can
//not slip in between the worker's test and its wait. Without the lock the
//caller's subsequent wake-up could arrive before the worker started waiting,
//leaving it asleep forever and making join_all() hang.
//The caller must not hold the mutex when calling this function.
void ThreadPool::initiate_stop()
{
    pthread_mutex_lock(ptp.mtx);
    ptp.stop = true; //one flag shared by all threads of the pool
    pthread_mutex_unlock(ptp.mtx);
}
//Block until every thread of the pool has terminated, then forget the thread
//ids so that a later call (e.g. from the destructor) has nothing left to join.
void ThreadPool::join_all()
{
    for(pthread_t worker : tid)
        pthread_join(worker,NULL);
    tid.clear();
}
//Stop and join any worker threads that are still around.
//If the threads were already joined with join_all() there is nothing to do
//and the mutex is deliberately not touched, because the owner may have
//destroyed it by then. Otherwise (e.g. the pool is destroyed during stack
//unwinding because a later pool failed to start) the threads are told to stop
//and woken first; merely joining them would block forever since they would
//still be waiting for jobs.
ThreadPool::~ThreadPool()
{
    if(!tid.empty()) {
        pthread_mutex_lock(ptp.mtx);
        ptp.stop = true;
        pthread_cond_broadcast(&ptp.job_queue->cond_not_empty);
        pthread_mutex_unlock(ptp.mtx);
    }
    join_all();
}
} //anonymous namespace
////////////////////////////////////////////////////////////////////////////////
// JobScheduler implementation
//The actual scheduler: seven job queues, each served by its own thread pool,
//plus the shared running set and exit set. 'mtx' protects the queues, the
//running set, the exit set and num_io_write_jobs_running.
class JobScheduler_impl {
mutable pthread_mutex_t mtx;
//Pending jobs, one queue per thread pool
JobQueue coordinator_job_queue;
JobQueue cpu_job_queue;
JobQueue summary_job_queue;
JobQueue io_job_queue;
JobQueue external_job_queue;
JobQueue file_meta_job_queue;
JobQueue merge_job_queue;
RunningSet running_set; //jobs currently being executed by any pool
ExitSet exit_set; //ended or cancelled jobs waiting for cleanup_finished_jobs()
unsigned num_io_write_jobs_running;
//The pools are declared after the mutex, queues and sets they point to, so
//those are constructed before the threads start.
ThreadPool coordinator_thread_pool;
ThreadPool cpu_thread_pool;
ThreadPool summary_thread_pool;
ThreadPool io_thread_pool;
ThreadPool external_thread_pool;
ThreadPool file_meta_thread_pool;
ThreadPool merge_thread_pool;
bool no_threads; //true when the cpu, summary, io, external and file-meta pools all have 0 threads
bool new_jobs_allowed; //read and written without holding 'mtx'
std::map<thread_type_t,JobTypeStatistics> job_statistics; //updated by cleanup_finished_jobs(); not protected by 'mtx'
//Common back end of the public submit()/submit_io(): picks the queue and enqueues 'e'.
bool submit(thread_type_t thread_type, JobEntry &e);
//Move every job in 'jq' to the exit set with exit cause 'job_exit'. Caller must hold 'mtx'.
void cancel_queued_jobs(JobQueue &jq, job_exit_t job_exit);
public:
//Create the queues and start all thread pools. 'job_done_notify' is passed
//on to every pool.
//NOTE(review): 'no_threads' does not look at num_coordinator_threads or
//num_merge_threads - confirm that this is intended.
JobScheduler_impl(unsigned num_coordinator_threads, unsigned num_cpu_threads, unsigned num_summary_threads, unsigned num_io_threads, unsigned num_external_threads, unsigned num_file_meta_threads, unsigned num_merge_threads, job_done_notify_t job_done_notify)
: mtx PTHREAD_MUTEX_INITIALIZER,
coordinator_job_queue(),
cpu_job_queue(),
summary_job_queue(),
io_job_queue(),
external_job_queue(),
file_meta_job_queue(),
merge_job_queue(),
running_set(),
exit_set(),
num_io_write_jobs_running(0),
coordinator_thread_pool("coord",num_coordinator_threads,&coordinator_job_queue,&running_set,&exit_set,&num_io_write_jobs_running,&mtx,job_done_notify),
cpu_thread_pool("cpu",num_cpu_threads,&cpu_job_queue,&running_set,&exit_set,&num_io_write_jobs_running,&mtx,job_done_notify),
summary_thread_pool("summary",num_summary_threads,&summary_job_queue,&running_set,&exit_set,&num_io_write_jobs_running,&mtx,job_done_notify),
io_thread_pool("io",num_io_threads,&io_job_queue,&running_set,&exit_set,&num_io_write_jobs_running,&mtx,job_done_notify),
external_thread_pool("ext",num_external_threads,&external_job_queue,&running_set,&exit_set,&num_io_write_jobs_running,&mtx,job_done_notify),
file_meta_thread_pool("file",num_file_meta_threads,&file_meta_job_queue,&running_set,&exit_set,&num_io_write_jobs_running,&mtx,job_done_notify),
merge_thread_pool("merge",num_merge_threads,&merge_job_queue,&running_set,&exit_set,&num_io_write_jobs_running,&mtx,job_done_notify),
no_threads(num_cpu_threads==0 && num_summary_threads==0 && num_io_threads==0 && num_external_threads==0 && num_file_meta_threads==0),
new_jobs_allowed(true)
{
}
//Stops the pools, cancels the jobs still queued, runs the finish callbacks
//of the jobs in the exit set and joins all threads.
~JobScheduler_impl();
//Queue a non-I/O job. Returns false if the job was not queued.
bool submit(start_routine_t start_routine,
finish_routine_t finish_callback,
void *state,
thread_type_t thread_type,
int priority,
uint64_t start_deadline=0);
//Queue an I/O job; 'fstate' becomes the job's state. Returns false if the job was not queued.
bool submit_io(start_routine_t start_routine,
finish_routine_t finish_callback,
FileState *fstate,
thread_type_t thread_type,
int priority,
bool is_write_job,
uint64_t start_deadline=0);
//True if at least one I/O write job is currently being executed.
bool are_io_write_jobs_running() const;
//Move the queued (not yet started) read jobs on 'bf' to the exit set as cancelled.
void cancel_file_read_jobs(const BigFile *bf);
//void nice page for html and administration()
//True if a read job on 'bf' is queued in the I/O queue or currently running.
bool is_reading_file(const BigFile *bf);
void allow_new_jobs() {
new_jobs_allowed = true;
}
void disallow_new_jobs() {
new_jobs_allowed = false;
}
bool are_new_jobs_allowed() const {
return new_jobs_allowed && !no_threads;
}
//Move all queued jobs of all queues to the exit set with cause job_exit_program_exit.
void cancel_all_jobs_for_shutdown();
unsigned num_queued_jobs() const;
//Call the finish callbacks of all jobs in the exit set and update the statistics.
void cleanup_finished_jobs();
//Snapshot of all queued, running and stopped-but-not-cleaned-up jobs.
std::vector<JobDigest> query_job_digests() const;
//Return the per-thread-type statistics; when 'clear' is true they are reset as well.
std::map<thread_type_t,JobTypeStatistics> query_job_statistics(bool clear);
};
//Shut the scheduler down: refuse new jobs, stop and wake the workers, cancel
//what is still queued, run the pending finish callbacks and finally join all
//worker threads.
JobScheduler_impl::~JobScheduler_impl() {
    ThreadPool *const stop_order[] = {
        &coordinator_thread_pool, &cpu_thread_pool, &summary_thread_pool, &io_thread_pool,
        &external_thread_pool, &file_meta_thread_pool, &merge_thread_pool
    };
    JobQueue *const all_queues[] = {
        &coordinator_job_queue, &cpu_job_queue, &summary_job_queue, &io_job_queue,
        &external_job_queue, &file_meta_job_queue, &merge_job_queue
    };
    ThreadPool *const join_order[] = {
        &coordinator_thread_pool, &cpu_thread_pool, &summary_thread_pool, &io_thread_pool,
        &file_meta_thread_pool, &merge_thread_pool, &external_thread_pool
    };

    //First prevent new jobs from being submitted
    new_jobs_allowed = false;

    //Then tell the worker threads to stop executing more jobs
    for(ThreadPool *pool : stop_order)
        pool->initiate_stop();

    //Then wake them if they are sleeping
    for(JobQueue *queue : all_queues)
        pthread_cond_broadcast(&queue->cond_not_empty);

    //Then cancel all outstanding non-started jobs by moving them from the pending queues to the exit-set
    {
        ScopedLock sl(mtx);
        for(JobQueue *queue : all_queues)
            cancel_queued_jobs(*queue,job_exit_program_exit);
    }

    //Call finish-callbacks for all the exited / cancelled threads
    //We "know" that this function (finalize) is only called from the main thread, *cough* *cough*
    cleanup_finished_jobs();

    //Then wait for the worker threads to finish
    for(ThreadPool *pool : join_order)
        pool->join_all();

    pthread_mutex_destroy(&mtx);
}
//Common back end of the public submit() and submit_io().
//Stamps 'e' with its thread type and queue-enter time and appends it to the
//queue that serves that kind of job.
//Returns true if the job was queued. Returns false if new jobs are currently
//not allowed, if the selected queue has no worker threads, or if
//'thread_type' is not a known value (debug builds assert in that case).
bool JobScheduler_impl::submit(thread_type_t thread_type, JobEntry &e)
{
    if(!new_jobs_allowed) //note: unprotected read
        return false;

    e.thread_type = thread_type;

    //Determine which queue to put the job into
    //i/o jobs should have the is_io_job=true, but if they don't we will
    //just treat them as CPU-bound. All this looks over-engineered but we
    //need some flexibility to make experiments.
    JobQueue *job_queue = NULL;
    if(e.is_io_job)
        job_queue = &io_job_queue;
    else {
        switch(thread_type) {
            case thread_type_query_coordinator:  job_queue = &coordinator_job_queue; break;
            case thread_type_query_read:         job_queue = &cpu_job_queue;         break;
            case thread_type_query_constrain:    job_queue = &cpu_job_queue;         break;
            case thread_type_query_merge:        job_queue = &cpu_job_queue;         break;
            case thread_type_query_intersect:    job_queue = &cpu_job_queue;         break;
            case thread_type_query_summary:      job_queue = &summary_job_queue;     break;
            case thread_type_spider_read:        job_queue = &cpu_job_queue;         break;
            case thread_type_spider_write:       job_queue = &cpu_job_queue;         break;
            case thread_type_spider_filter:      job_queue = &external_job_queue;    break;
            case thread_type_spider_query:       job_queue = &cpu_job_queue;         break;
            case thread_type_spider_index:       job_queue = &cpu_job_queue;         break;
            case thread_type_merge_filter:       job_queue = &merge_job_queue;       break;
            case thread_type_replicate_write:    job_queue = &cpu_job_queue;         break;
            case thread_type_replicate_read:     job_queue = &cpu_job_queue;         break;
            case thread_type_file_merge:         job_queue = &merge_job_queue;       break;
            case thread_type_file_meta_data:     job_queue = &file_meta_job_queue;   break;
            case thread_type_index_merge:        job_queue = &cpu_job_queue;         break;
            case thread_type_index_generate:     job_queue = &merge_job_queue;       break;
            case thread_type_verify_data:        job_queue = &cpu_job_queue;         break;
            case thread_type_statistics:         job_queue = &cpu_job_queue;         break;
            case thread_type_unspecified_io:     job_queue = &cpu_job_queue;         break;
            case thread_type_generate_thumbnail: job_queue = &external_job_queue;    break;
            case thread_type_config_load:        job_queue = &cpu_job_queue;         break;
            case thread_type_page_process:       job_queue = &cpu_job_queue;         break;
            default:
                assert(false);
                //With assertions compiled out we must not fall through and
                //dereference an unset queue pointer; reject the job instead.
                return false;
        }
    }

    //No thread would ever pick the job up
    if(job_queue->potential_worker_threads==0)
        return false;

    e.queue_enter_time = now_ms();

    ScopedLock sl(mtx);
    job_queue->add(e);
    return true;
}
//Move every job in 'jq' to the exit set, tagged with exit cause 'job_exit'.
//The jobs are transferred back-to-front, i.e. the most recently queued job
//ends up first. The caller is expected to hold 'mtx'.
void JobScheduler_impl::cancel_queued_jobs(JobQueue &jq, job_exit_t job_exit) {
    for(JobQueue::reverse_iterator pending = jq.rbegin(); pending!=jq.rend(); ++pending)
        exit_set.push_back(std::make_pair(*pending,job_exit));
    jq.clear();
}
//Queue a non-I/O job.
//  start_routine    run by a worker thread with 'state' as argument
//  finish_callback  called later from cleanup_finished_jobs(); may be null
//  thread_type      selects the queue and the statistics bucket
//  priority         lower values are picked first
//  start_deadline   0 = none, otherwise the job is not started at or after this time (ms)
//Returns true if the job was queued.
bool JobScheduler_impl::submit(start_routine_t start_routine,
                               finish_routine_t finish_callback,
                               void *state,
                               thread_type_t thread_type,
                               int priority,
                               uint64_t start_deadline)
{
    //Value-initialization zeroes every field, including the timestamps
    JobEntry job = JobEntry();
    job.start_routine = start_routine;
    job.finish_callback = finish_callback;
    job.state = state;
    job.initial_priority = priority;
    job.start_deadline = start_deadline;
    job.is_io_job = false;
    job.is_io_write_job = false;
    return submit(thread_type,job);
}
//Queue an I/O job.
//  start_routine    run by a worker thread with 'fstate' as argument
//  finish_callback  called later from cleanup_finished_jobs(); may be null
//  fstate           file state of the operation; stored as the job's state
//  thread_type      recorded for statistics (I/O jobs always go to the I/O queue)
//  priority         lower values are picked first
//  is_write_job     true if the job mostly writes
//  start_deadline   0 = none, otherwise the job is not started at or after this time (ms)
//Returns true if the job was queued.
bool JobScheduler_impl::submit_io(start_routine_t start_routine,
                                  finish_routine_t finish_callback,
                                  FileState *fstate,
                                  thread_type_t thread_type,
                                  int priority,
                                  bool is_write_job,
                                  uint64_t start_deadline)
{
    //Value-initialization zeroes every field, including the timestamps
    JobEntry job = JobEntry();
    job.start_routine = start_routine;
    job.finish_callback = finish_callback;
    job.state = fstate;
    job.initial_priority = priority;
    job.start_deadline = start_deadline;
    job.is_io_job = true;
    job.is_io_write_job = is_write_job;
    return submit(thread_type,job);
}
//Tell whether any I/O write job is being executed right now.
//The counter is maintained by the worker threads, so it is read under the mutex.
bool JobScheduler_impl::are_io_write_jobs_running() const
{
    ScopedLock sl(mtx);
    const bool any_writer = num_io_write_jobs_running > 0;
    return any_writer;
}
void JobScheduler_impl::cancel_file_read_jobs(const BigFile *bf)
{
ScopedLock sl(mtx);
for(JobQueue::iterator iter = io_job_queue.begin(); iter!=io_job_queue.end(); ) {
if(iter->is_io_job && !iter->is_io_write_job) {
const FileState *fstate = reinterpret_cast<const FileState*>(iter->state);
if(fstate->m_bigfile==bf) {
exit_set.push_back(std::make_pair(*iter,job_exit_cancelled));
iter = io_job_queue.erase(iter);
continue;
}
}
++iter;
}
}
bool JobScheduler_impl::is_reading_file(const BigFile *bf)
{
//The old thread stuff tested explicitly if the start_routine was
//readwriteWrapper_r() in BigFile.cpp but that is fragile. Besides,
//we have the 'is_io_write_job' field.
ScopedLock sl(mtx);
for(const auto &e : io_job_queue) {
if(e.is_io_job && !e.is_io_write_job) {
const FileState *fstate = reinterpret_cast<const FileState*>(e.state);
if(fstate->m_bigfile==bf)
return true;
}
}
for(const auto &e : running_set) {
if(e.is_io_job && !e.is_io_write_job) {
const FileState *fstate = reinterpret_cast<const FileState*>(e.state);
if(fstate->m_bigfile==bf)
return true;
}
}
return false;
}
void JobScheduler_impl::cancel_all_jobs_for_shutdown() {
ScopedLock sl(mtx);
cancel_queued_jobs(coordinator_job_queue,job_exit_program_exit);
cancel_queued_jobs(cpu_job_queue,job_exit_program_exit);
cancel_queued_jobs(summary_job_queue,job_exit_program_exit);
cancel_queued_jobs(io_job_queue,job_exit_program_exit);
cancel_queued_jobs(external_job_queue,job_exit_program_exit);
cancel_queued_jobs(file_meta_job_queue,job_exit_program_exit);
cancel_queued_jobs(merge_job_queue,job_exit_program_exit);
}
unsigned JobScheduler_impl::num_queued_jobs() const
{
ScopedLock sl(mtx);
return cpu_job_queue.size() + summary_job_queue.size() + io_job_queue.size() + external_job_queue.size() + file_meta_job_queue.size() + merge_job_queue.size();
}
void JobScheduler_impl::cleanup_finished_jobs()
{
ExitSet es;
ScopedLock sl(mtx);
es.swap(exit_set);
sl.unlock();
for(auto e : es) {
e.first.finish_time = now_ms();
if(e.first.finish_callback)
e.first.finish_callback(e.first.state,e.second);
e.first.exit_time = now_ms();
JobTypeStatistics &s = job_statistics[e.first.thread_type];
s.job_count++;
s.queue_time += e.first.start_time - e.first.queue_enter_time;
s.running_time += e.first.stop_time - e.first.start_time;
s.done_time += e.first.finish_time - e.first.stop_time;
s.cleanup_time += e.first.exit_time - e.first.finish_time;
}
}
//Build the externally visible digest of a job: what it runs, which state the
//caller says it is in, and the timestamps recorded so far.
static JobDigest job_entry_to_job_digest(const JobEntry& je, JobDigest::job_state_t job_state)
{
    JobDigest digest;
    //what
    digest.thread_type = je.thread_type;
    digest.start_routine = je.start_routine;
    digest.finish_callback = je.finish_callback;
    //where in its life cycle
    digest.job_state = job_state;
    //when
    digest.start_deadline = je.start_deadline;
    digest.queue_enter_time = je.queue_enter_time;
    digest.start_time = je.start_time;
    digest.stop_time = je.stop_time;
    return digest;
}
std::vector<JobDigest> JobScheduler_impl::query_job_digests() const
{
std::vector<JobDigest> v;
ScopedLock sl(mtx);
//v.reserve(cpu_job_queue.size() + io_job_queue.size() + external_job_queue.size() + file_meta_job_queue.size() + running_set().size() + exit_set().size());
for(const auto &je : coordinator_job_queue)
v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_queued));
for(const auto &je : cpu_job_queue)
v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_queued));
for(const auto &je : summary_job_queue)
v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_queued));
for(const auto &je : io_job_queue)
v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_queued));
for(const auto &je : external_job_queue)
v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_queued));
for(const auto &je : file_meta_job_queue)
v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_queued));
for(const auto &je : merge_job_queue)
v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_queued));
for(const auto &je : running_set)
v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_running));
for(const auto &je : exit_set)
v.push_back(job_entry_to_job_digest(je.first,JobDigest::job_state_stopped));
return v;
}
std::map<thread_type_t,JobTypeStatistics> JobScheduler_impl::query_job_statistics(bool clear)
{
if(clear) {
std::map<thread_type_t,JobTypeStatistics> tmp;
tmp.swap(job_statistics);
return tmp;
} else
return job_statistics;
}
////////////////////////////////////////////////////////////////////////////////
// bridge
//Create an uninitialized scheduler. No threads exist until initialize() is
//called; until then every forwarding method reports failure / does nothing.
JobScheduler::JobScheduler()
  : impl(NULL)
{
}
//Destroy the implementation if finalize() has not already done so.
JobScheduler::~JobScheduler()
{
    delete impl; //deleting a null pointer is a no-op
    impl = NULL;
}
//Create the implementation and start the thread pools with the given sizes.
//'job_done_notify' is passed on to the implementation. Must not be called on
//an already initialized scheduler. Always returns true.
bool JobScheduler::initialize(unsigned num_coordinator_threads, unsigned num_cpu_threads, unsigned num_summary_threads, unsigned num_io_threads, unsigned num_external_threads, unsigned num_file_meta_threads, unsigned num_merge_threads, job_done_notify_t job_done_notify)
{
    assert(!impl);
    impl = new JobScheduler_impl(num_coordinator_threads,
                                 num_cpu_threads,
                                 num_summary_threads,
                                 num_io_threads,
                                 num_external_threads,
                                 num_file_meta_threads,
                                 num_merge_threads,
                                 job_done_notify);
    return true;
}
//Shut down and destroy the implementation. The scheduler must have been
//initialized. Afterwards the scheduler is back in its uninitialized state.
void JobScheduler::finalize()
{
    assert(impl);
    delete impl;
    impl = NULL;
}
//Forward to JobScheduler_impl::submit(). Returns false if the scheduler is
//not initialized or the job was not queued.
bool JobScheduler::submit(start_routine_t start_routine,
                          finish_routine_t finish_callback,
                          void *state,
                          thread_type_t thread_type,
                          int priority,
                          uint64_t start_deadline)
{
    if(!impl)
        return false;
    return impl->submit(start_routine,finish_callback,state,thread_type,priority,start_deadline);
}
//Forward to JobScheduler_impl::submit_io(). Returns false if the scheduler is
//not initialized or the job was not queued.
bool JobScheduler::submit_io(start_routine_t start_routine,
                             finish_routine_t finish_callback,
                             FileState *fstate,
                             thread_type_t thread_type,
                             int priority,
                             bool is_write_job,
                             uint64_t start_deadline)
{
    if(!impl)
        return false;
    return impl->submit_io(start_routine,finish_callback,fstate,thread_type,priority,is_write_job,start_deadline);
}
//True if an I/O write job is currently running; false when the scheduler is
//not initialized.
bool JobScheduler::are_io_write_jobs_running() const
{
    return impl ? impl->are_io_write_jobs_running() : false;
}
//Cancel the queued read jobs on 'bf'. Does nothing when the scheduler is not
//initialized.
void JobScheduler::cancel_file_read_jobs(const BigFile *bf)
{
    if(!impl)
        return;
    impl->cancel_file_read_jobs(bf);
}
//void nice page for html and administration()
//True if a read job on 'bf' is queued or running; false when the scheduler is
//not initialized.
bool JobScheduler::is_reading_file(const BigFile *bf)
{
    return impl ? impl->is_reading_file(bf) : false;
}
//Let submit()/submit_io() accept jobs again. Does nothing when the scheduler
//is not initialized.
void JobScheduler::allow_new_jobs()
{
    if(!impl)
        return;
    impl->allow_new_jobs();
}
//Make submit()/submit_io() reject jobs from now on. Does nothing when the
//scheduler is not initialized.
void JobScheduler::disallow_new_jobs()
{
    if(!impl)
        return;
    impl->disallow_new_jobs();
}
//True if the implementation currently accepts new jobs; false when the
//scheduler is not initialized.
bool JobScheduler::are_new_jobs_allowed() const
{
    return impl ? impl->are_new_jobs_allowed() : false;
}
//Cancel all jobs that have not started yet. Does nothing when the scheduler
//is not initialized.
void JobScheduler::cancel_all_jobs_for_shutdown() {
    if(!impl)
        return;
    impl->cancel_all_jobs_for_shutdown();
}
//Number of queued jobs as reported by the implementation; 0 when the
//scheduler is not initialized.
unsigned JobScheduler::num_queued_jobs() const
{
    return impl ? impl->num_queued_jobs() : 0;
}
//Run the finish callbacks of all ended/cancelled jobs. Does nothing when the
//scheduler is not initialized.
void JobScheduler::cleanup_finished_jobs()
{
    if(!impl)
        return;
    impl->cleanup_finished_jobs();
}
//Snapshot of all jobs known to the scheduler; empty when the scheduler is not
//initialized.
std::vector<JobDigest> JobScheduler::query_job_digests() const
{
    if(!impl)
        return std::vector<JobDigest>();
    return impl->query_job_digests();
}
//Per-thread-type job statistics, optionally resetting them; empty when the
//scheduler is not initialized.
std::map<thread_type_t,JobTypeStatistics> JobScheduler::query_job_statistics(bool clear)
{
    if(!impl)
        return std::map<thread_type_t,JobTypeStatistics>();
    return impl->query_job_statistics(clear);
}
////////////////////////////////////////////////////////////////////////////////
// The global one-and-only scheduler.
// It is constructed without an implementation; initialize() must be called
// before jobs can be submitted, and finalize() tears the implementation down.
JobScheduler g_jobScheduler;