Separate job queue for file rename/unlink

This commit is contained in:
Ivan Skytte Jørgensen
2016-11-10 14:47:34 +01:00
parent 323602f314
commit e32919794a
5 changed files with 38 additions and 11 deletions

1
Conf.h

@ -150,6 +150,7 @@ class Conf {
int32_t m_maxCpuThreads;
int32_t m_maxIOThreads;
int32_t m_maxExternalThreads;
int32_t m_maxFileMetaThreads;
int32_t m_deadHostTimeout;
int32_t m_sendEmailTimeout;

@ -244,6 +244,7 @@ class JobScheduler_impl {
JobQueue cpu_job_queue;
JobQueue io_job_queue;
JobQueue external_job_queue;
JobQueue file_meta_job_queue;
RunningSet running_set;
@ -254,6 +255,7 @@ class JobScheduler_impl {
ThreadPool cpu_thread_pool;
ThreadPool io_thread_pool;
ThreadPool external_thread_pool;
ThreadPool file_meta_thread_pool;
bool no_threads;
bool new_jobs_allowed;
@ -262,17 +264,19 @@ class JobScheduler_impl {
bool submit(thread_type_t thread_type, JobEntry &e);
public:
JobScheduler_impl(unsigned num_cpu_threads, unsigned num_io_threads, unsigned num_external_threads, job_done_notify_t job_done_notify)
JobScheduler_impl(unsigned num_cpu_threads, unsigned num_io_threads, unsigned num_external_threads, unsigned num_file_meta_threads, job_done_notify_t job_done_notify)
: mtx PTHREAD_MUTEX_INITIALIZER,
cpu_job_queue(),
io_job_queue(),
external_job_queue(),
file_meta_job_queue(),
running_set(),
exit_set(),
num_io_write_jobs_running(0),
cpu_thread_pool("cpu",num_cpu_threads,&cpu_job_queue,&running_set,&exit_set,&num_io_write_jobs_running,&mtx,job_done_notify),
io_thread_pool("io",num_io_threads,&io_job_queue,&running_set,&exit_set,&num_io_write_jobs_running,&mtx,job_done_notify),
external_thread_pool("ext",num_external_threads,&external_job_queue,&running_set,&exit_set,&num_io_write_jobs_running,&mtx,job_done_notify),
file_meta_thread_pool("file",num_file_meta_threads,&file_meta_job_queue,&running_set,&exit_set,&num_io_write_jobs_running,&mtx,job_done_notify),
no_threads(num_cpu_threads==0 && num_io_threads==0 && num_external_threads==0),
new_jobs_allowed(true)
{
@ -332,6 +336,7 @@ JobScheduler_impl::~JobScheduler_impl() {
pthread_cond_broadcast(&cpu_job_queue.cond_not_empty);
pthread_cond_broadcast(&io_job_queue.cond_not_empty);
pthread_cond_broadcast(&external_job_queue.cond_not_empty);
pthread_cond_broadcast(&file_meta_job_queue.cond_not_empty);
//Then cancel all outstanding non-started jobs by moving them from the pending queues to the exit-set
{
@ -348,6 +353,10 @@ JobScheduler_impl::~JobScheduler_impl() {
exit_set.push_back(std::make_pair(external_job_queue.back(),job_exit_cancelled));
external_job_queue.pop_back();
}
while(!file_meta_job_queue.empty()) {
exit_set.push_back(std::make_pair(file_meta_job_queue.back(),job_exit_cancelled));
file_meta_job_queue.pop_back();
}
}
//Call finish-callbacks for all the exited / cancelled threads
@ -390,7 +399,7 @@ bool JobScheduler_impl::submit(thread_type_t thread_type, JobEntry &e)
case thread_type_replicate_write: job_queue = &cpu_job_queue; break;
case thread_type_replicate_read: job_queue = &cpu_job_queue; break;
case thread_type_file_merge: job_queue = &cpu_job_queue; break;
case thread_type_file_meta_data: job_queue = &cpu_job_queue; break;
case thread_type_file_meta_data: job_queue = &file_meta_job_queue;break;
case thread_type_statistics: job_queue = &cpu_job_queue; break;
case thread_type_unspecified_io: job_queue = &cpu_job_queue; break;
case thread_type_generate_thumbnail: job_queue = &external_job_queue; break;
@ -507,7 +516,7 @@ bool JobScheduler_impl::is_reading_file(const BigFile *bf)
unsigned JobScheduler_impl::num_queued_jobs() const
{
ScopedLock sl(mtx);
return cpu_job_queue.size() + io_job_queue.size() + external_job_queue.size();
return cpu_job_queue.size() + io_job_queue.size() + external_job_queue.size() + file_meta_job_queue.size();
}
void JobScheduler_impl::cleanup_finished_jobs()
@ -552,13 +561,15 @@ std::vector<JobDigest> JobScheduler_impl::query_job_digests() const
{
std::vector<JobDigest> v;
ScopedLock sl(mtx);
//v.reserve(cpu_job_queue.size() + io_job_queue.size() + external_job_queue.size() + running_set().size() + exit_set().size());
//v.reserve(cpu_job_queue.size() + io_job_queue.size() + external_job_queue.size() + file_meta_job_queue.size() + running_set().size() + exit_set().size());
for(const auto &je : cpu_job_queue)
v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_queued));
for(const auto &je : io_job_queue)
v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_queued));
for(const auto &je : external_job_queue)
v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_queued));
for(const auto &je : file_meta_job_queue)
v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_queued));
for(const auto &je : running_set)
v.push_back(job_entry_to_job_digest(je,JobDigest::job_state_running));
for(const auto &je : exit_set)
@ -597,10 +608,10 @@ JobScheduler::~JobScheduler()
}
bool JobScheduler::initialize(unsigned num_cpu_threads, unsigned num_io_threads, unsigned num_external_threads, job_done_notify_t job_done_notify)
bool JobScheduler::initialize(unsigned num_cpu_threads, unsigned num_io_threads, unsigned num_external_threads, unsigned num_file_meta_threads, job_done_notify_t job_done_notify)
{
assert(!impl);
impl = new JobScheduler_impl(num_cpu_threads,num_io_threads,num_external_threads,job_done_notify);
impl = new JobScheduler_impl(num_cpu_threads,num_io_threads,num_external_threads,num_file_meta_threads,job_done_notify);
return true;
}

@ -81,7 +81,7 @@ public:
JobScheduler();
~JobScheduler();
bool initialize(unsigned num_cpu_threads, unsigned num_io_threads, unsigned num_external_threads, job_done_notify_t job_done_notify=0);
bool initialize(unsigned num_cpu_threads, unsigned num_io_threads, unsigned num_external_threads, unsigned num_file_meta_threads, job_done_notify_t job_done_notify=0);
void finalize();
bool submit(start_routine_t start_routine,

@ -6632,7 +6632,7 @@ void Parms::init ( ) {
m->m_title = "max external threads";
m->m_desc = "Maximum number of threads to use per Gigablast process "
"for doing external calss with system() or similar..";
m->m_cgi = "max_ext_threads";
m->m_cgi = "max_file_meta_threads";
m->m_off = offsetof(Conf,m_maxExternalThreads);
m->m_type = TYPE_LONG;
m->m_def = "2";
@ -6644,6 +6644,21 @@ void Parms::init ( ) {
m->m_group = false;
m++;
m->m_title = "max file meta threads";
m->m_desc = "Maximum number of threads to use per Gigablast process "
"for doing file unlinks and renames";
m->m_cgi = "max_ext_threads";
m->m_off = offsetof(Conf,m_maxFileMetaThreads);
m->m_type = TYPE_LONG;
m->m_def = "2";
m->m_units = "threads";
m->m_min = 0;
m->m_flags = 0;
m->m_page = PAGE_MASTER;
m->m_obj = OBJ_CONF;
m->m_group = false;
m++;
m->m_title = "flush disk writes";
m->m_desc = "If enabled then all writes will be flushed to disk. "

@ -1482,7 +1482,7 @@ int main2 ( int argc , char *argv[] ) {
return 1;
}
if ( ! g_jobScheduler.initialize(g_conf.m_maxCpuThreads, g_conf.m_maxIOThreads, g_conf.m_maxExternalThreads, wakeupPollLoop)) {
if ( ! g_jobScheduler.initialize(g_conf.m_maxCpuThreads, g_conf.m_maxIOThreads, g_conf.m_maxExternalThreads, g_conf.m_maxFileMetaThreads, wakeupPollLoop)) {
log( LOG_ERROR, "db: JobScheduler init failed." );
return 1;
}
@ -4005,7 +4005,7 @@ static int64_t s_startTime = 0;
void seektest ( const char *testdir, int32_t numThreads, int32_t maxReadSize , const char *filename ) {
g_loop.init();
g_jobScheduler.initialize(numThreads,numThreads,numThreads);
g_jobScheduler.initialize(numThreads,numThreads,numThreads,numThreads);
s_numThreads = numThreads;
s_maxReadSize = maxReadSize;
if ( s_maxReadSize <= 0 ) s_maxReadSize = 1;
@ -5409,7 +5409,7 @@ int injectFile ( const char *filename , char *ips , const char *coll ) {
if ( ! g_loop.init() ) {
log("db: Loop init failed." ); exit(0); }
// set up the threads, might need g_conf
if ( ! g_jobScheduler.initialize(2,4,2) ) {
if ( ! g_jobScheduler.initialize(2,4,2,1) ) {
log("db: Threads init failed." ); exit(0); }
s_injectTitledb = true;
s_titledbKey.setMin();