privacore-open-source-searc.../JobScheduler.h

133 lines
4.0 KiB
C++

#ifndef GB_JOBSCHEDULER_H
#define GB_JOBSCHEDULER_H
#include <inttypes.h>
#include <vector>
#include <map>
class BigFile;
class FileState;
//exit codes/reasons when the finish callback is called
enum job_exit_t {
job_exit_normal, //job ran to completion (possibly with errors)
job_exit_cancelled, //job was cancelled (typically due to file being removed)
job_exit_deadline, //job was cancelled due to deadline
job_exit_program_exit //job was cancelled because program is shutting down
};
typedef void (*start_routine_t)(void *state);
typedef void (*finish_routine_t)(void *state, job_exit_t exit_type);
enum thread_type_t {
thread_type_query_coordinator,
thread_type_query_read,
thread_type_query_constrain,
thread_type_query_merge,
thread_type_query_intersect,
thread_type_query_summary,
thread_type_spider_read,
thread_type_spider_write,
thread_type_spider_filter, //pdf2html/doc2html/...
thread_type_spider_query, //?
thread_type_spider_index,
thread_type_merge_filter,
thread_type_replicate_write,
thread_type_replicate_read,
thread_type_file_merge,
thread_type_file_meta_data, //unlink/rename
thread_type_index_merge,
thread_type_index_generate,
thread_type_verify_data, //mostly CPU
thread_type_statistics, //mostly i/o
thread_type_unspecified_io, //until we can be more specific
thread_type_generate_thumbnail,
thread_type_config_load,
thread_type_page_process,
};
//A digest of a job in the scheduler. For statistics and display purposes
struct JobDigest {
thread_type_t thread_type;
start_routine_t start_routine;
finish_routine_t finish_callback;
enum job_state_t {
job_state_queued,
job_state_running,
job_state_stopped
} job_state;
uint64_t start_deadline; //latest time when this job must be started
uint64_t queue_enter_time; //when this job was queued
uint64_t start_time; //when this job started running
uint64_t stop_time; //when this job stopped running
};
//statistics for queue and execution time per thread type
struct JobTypeStatistics {
uint64_t job_count;
uint64_t queue_time; //time from enque to start, in nsecs
uint64_t running_time; //time from start to stop, in nsecs
uint64_t done_time; //time from stop to cleanup
uint64_t cleanup_time; //time from in cleanup
};
class JobScheduler_impl;
typedef void (*job_done_notify_t)();
class JobScheduler {
JobScheduler(const JobScheduler&);
JobScheduler& operator=(const JobScheduler&);
public:
JobScheduler();
~JobScheduler();
bool initialize(unsigned num_coordinator_threads, unsigned num_cpu_threads, unsigned num_summary_threads, unsigned num_io_threads, unsigned num_external_threads, unsigned num_file_meta_threads, unsigned num_merge_threads, job_done_notify_t job_done_notify=0);
void finalize();
bool submit(start_routine_t start_routine,
finish_routine_t finish_callback,
void *state,
thread_type_t thread_type,
int priority,
uint64_t start_deadline=0);
bool submit_io(start_routine_t start_routine,
finish_routine_t finish_callback,
FileState *fstate,
thread_type_t thread_type,
int priority,
bool is_write_job,
uint64_t start_deadline=0);
bool are_io_write_jobs_running() const;
void cancel_file_read_jobs(const BigFile *bf);
bool is_reading_file(const BigFile *bf);
void allow_new_jobs();
void disallow_new_jobs();
bool are_new_jobs_allowed() const;
void cancel_all_jobs_for_shutdown();
unsigned num_queued_jobs() const;
void cleanup_finished_jobs();
std::vector<JobDigest> query_job_digests() const;
std::map<thread_type_t,JobTypeStatistics> query_job_statistics(bool clear=true);
private:
JobScheduler_impl *impl;
};
extern JobScheduler g_jobScheduler;
#endif // GB_JOBSCHEDULER_H