3368 lines
106 KiB
C++
3368 lines
106 KiB
C++
#include "gb-include.h"
|
|
|
|
#include "BigFile.h"
|
|
#include "Threads.h"
|
|
#include "Errno.h"
|
|
#include "Loop.h"
|
|
#include <sys/time.h>
|
|
#include <sys/resource.h>
|
|
#include <sys/types.h> // getuid()/pid_t/getpid()
|
|
#include <sys/wait.h> // waitpid()
|
|
#include "Rdb.h" // g_mergeUrgent
|
|
#include <sched.h> // clone()
|
|
//#include "Msg16.h" // g_pid g_ticker
|
|
#include "XmlDoc.h" // g_pid g_ticker
|
|
#include "Profiler.h"
|
|
#include "Stats.h"
|
|
#include "Process.h"
|
|
|
|
// try using pthreads again
|
|
//#define PTHREADS
|
|
|
|
// Only PTHREADS builds pull in <pthread.h>. Otherwise these no-op stand-ins
// for the mutex entry points (which always report success, 0) are defined
// so that libplotter.a still links.
#ifndef PTHREADS
int pthread_mutex_lock   ( pthread_mutex_t *mutex ) { return 0; }
int pthread_mutex_unlock ( pthread_mutex_t *mutex ) { return 0; }
#else
#include <pthread.h>
#endif
|
|
|
|
// Identifier of the main process (or of the main thread under PTHREADS).
// It is stored as a pthread_t because on 64-bit targets pthread_t is 64 bits
// while pid_t is only 32. The value (pthread_t)-1 means "not recorded yet".
static pthread_t s_pid = (pthread_t) -1;

// Return an identifier for the calling execution context.
// . non-PTHREADS builds: the process id, widened to pthread_t
// . PTHREADS builds: gettid() is too new for our libc, so the pthread id
//   from pthread_self() stands in. The main process has its own distinct
//   thread id, which keeps Threads::amThread() working.
pthread_t getpidtid() {
#ifndef PTHREADS
	return (pthread_t)getpid();
#else
	return pthread_self();
#endif
}
|
|
|
|
|
|
|
|
// BIG PROBLEM:
|
|
// When doing pthread_join it doesn't always ensure a thread doesn't go
|
|
// zombie. It seems like when SIGIOs are generated by sigqueue() because
|
|
// of a full signal queue, threads start going zombie on me.
|
|
|
|
|
|
// PROBLEM #1: using the "state" pointer to identify a thread in the queue.
// Often a caller makes a disk access using a certain "state" var.
// He gets control back when cleanUp() calls t->m_callback, and he launches
// another read thread in there, which calls Threads::exit(state) before
// the original thread entry's m_isOccupied is set to false. So
// Threads::exit(state) mistakenly picks the thread entry of the former
// thread, because it has the same state as its own thread!
|
|
|
|
// PROBLEM #2: technically, a thread in Threads::exit() can set its m_done
|
|
// bit then send the signal. When the Loop sig handler sees the done bit is
|
|
// set it decrements the global thread count even though the thread may
|
|
// not have actually exited yet!! I spawned like 1,000 threads this way!!!!!
|
|
|
|
// a global class extern'd in .h file
|
|
Threads g_threads;
|
|
|
|
|
|
// . call this before calling a ThreadEntry's m_startRoutine
|
|
// . allows us to set the priority of the thread
|
|
int startUp ( void *state ) ;
|
|
void *startUp2 ( void *state ) ;
|
|
|
|
// JAB: warning abatement
|
|
//static void launchThreadsWrapper ( int fd , void *state );
|
|
static void killStalledFiltersWrapper ( int fd , void *state );
|
|
|
|
static void makeCallback ( ThreadEntry *t ) ;
|
|
|
|
// is the caller a thread?
|
|
bool Threads::amThread () {
|
|
if ( s_pid == (pthread_t)-1 ) return false;
|
|
#ifdef PTHREADS
|
|
// gettid() is a bit newer so not in our libc32...
|
|
// so kinda fake it. return the "thread" id, not process id.
|
|
// Threads::amThread() should still work based on thread ids because
|
|
// the main process has a unique thread id as well.
|
|
return (pthread_self() != s_pid);
|
|
#else
|
|
return ( getpidtid() != s_pid );
|
|
#endif
|
|
}
|
|
|
|
#ifndef PTHREADS
// how many __errno_location() calls came from a pid too large for s_errnos[]
static int32_t s_bad = 0;
// the most recent such pid
static int32_t s_badPid = -1;
#endif

// largest pid that gets its own errno slot
#define MAX_PID 32767

#ifndef PTHREADS

// errno shared by every pid above MAX_PID
static int s_errno ;
// one errno slot per pid in [0, MAX_PID]
static int s_errnos [ MAX_PID + 1 ];

// Per-process errno lookup, improvised from linuxthreads/errno.c.
// Pids up to MAX_PID each get their own slot. Larger pids all share s_errno
// (so their errno values can clobber each other) and are tallied in
// s_bad / s_badPid.
// WARNING: you MUST compile with -DREENTRANT for this to work
int *__errno_location (void) {
	int32_t myPid = (int32_t) getpid();
	if ( myPid > (int32_t)MAX_PID ) {
		s_bad++;
		s_badPid = myPid;
		return &s_errno;
	}
	return &s_errnos[myPid];
}

#endif
|
|
|
|
// stack must be page aligned for mprotect
|
|
#define THRPAGESIZE 8192
|
|
|
|
// how much of stack to use as guard space
|
|
#define GUARDSIZE (32*1024)
|
|
|
|
// . crashed in saving with 800k, so try 1M
|
|
// . must be multiple of THRPAGESIZE
|
|
//#define STACK_SIZE ((512+256) * 1024)
|
|
// pthread_create() cores in calloc() if we don't make STACK_SIZE bigger:
|
|
#define STACK_SIZE ((512+256+1024) * 1024)
|
|
|
|
// jeta was having some problems, but i don't think they were related to
|
|
// this stack size of 512k, but i will try boosting to 800k anyway.
|
|
//#define STACK_SIZE (512 * 1024)
|
|
|
|
// 256k was not enough, try 512k
|
|
//#define STACK_SIZE (256 * 1024)
|
|
|
|
// at 128k (and even 200k) some threads do not return! why?? there's
|
|
// obviously some stack overwriting going on!
|
|
//#define STACK_SIZE (128 * 1024)
|
|
|
|
static char *s_stackAlloc = NULL;
|
|
static int32_t s_stackAllocSize;
|
|
|
|
//#ifndef PTHREADS
|
|
static char *s_stack = NULL;
|
|
static int32_t s_stackSize;
|
|
static char *s_stackPtrs [ MAX_STACKS ];
|
|
//#endif
|
|
|
|
static int32_t s_next [ MAX_STACKS ];
|
|
static int32_t s_head ;
|
|
|
|
// returns NULL if none left
|
|
int32_t Threads::getStack ( ) {
|
|
if ( s_head == -1 ) return -1;
|
|
int32_t i = s_head;
|
|
s_head = s_next [ s_head ];
|
|
return i;
|
|
}
|
|
|
|
// Push stack slot "si" back onto the head of the free list.
void Threads::returnStack ( int32_t si ) {
	// an empty list has s_head == -1, which correctly terminates it
	s_next [ si ] = s_head;
	s_head = si;
}
|
|
|
|
void Threads::reset ( ) {
|
|
if ( s_stackAlloc ) {
|
|
mprotect(s_stackAlloc, s_stackAllocSize, PROT_READ|PROT_WRITE);
|
|
mfree ( s_stackAlloc , s_stackAllocSize,
|
|
"ThreadStack");
|
|
}
|
|
s_stackAlloc = NULL;
|
|
for ( int32_t i = 0 ; i < MAX_THREAD_QUEUES ; i++ )
|
|
m_threadQueues[i].reset();
|
|
}
|
|
|
|
// No queues exist until registerType() is called; init() runs lazily.
Threads::Threads ( ) {
	m_initialized = false;
	m_numQueues   = 0;
}
|
|
|
|
void Threads::setPid ( ) {
|
|
// set s_pid to the main process id
|
|
#ifdef PTHREADS
|
|
// on 64bit arch pid is 32 bit and pthread_t is 64 bit
|
|
s_pid = (pthread_t)pthread_self();
|
|
//log(LOG_INFO,
|
|
// "threads: main process THREAD id = %" UINT32 "",(int32_t unsigned)s_pid);
|
|
pthread_t tid = pthread_self();
|
|
sched_param param;
|
|
int policy;
|
|
// scheduling parameters of target thread
|
|
pthread_getschedparam ( tid, &policy, ¶m);
|
|
//log(LOG_INFO,
|
|
// "threads: min/max thread priority settings = %" INT32 "/%" INT32 " (policy=%" INT32 ")",
|
|
// (int32_t)sched_get_priority_min(policy),
|
|
// (int32_t)sched_get_priority_max(policy),
|
|
// (int32_t)policy);
|
|
#else
|
|
s_pid = getpid();
|
|
#endif
|
|
}
|
|
|
|
bool Threads::init ( ) {
|
|
|
|
if ( m_initialized ) return true;
|
|
m_initialized = true;
|
|
|
|
m_needsCleanup = false;
|
|
//m_needBottom = false;
|
|
|
|
// sanity check
|
|
//if ( sizeof(pthread_t) > sizeof(pid_t) ) { char *xx=NULL;*xx=0; }
|
|
if ( sizeof(pid_t) > sizeof(pthread_t) ) { char *xx=NULL;*xx=0; }
|
|
|
|
setPid();
|
|
|
|
#ifdef _STACK_GROWS_UP
|
|
return log("thread: Stack growing up not supported.");
|
|
#endif
|
|
|
|
//g_conf.m_logDebugThread = true;
|
|
|
|
// . damn, this only applies to fork() calls, i guess
|
|
// . a quick a dirty way to restrict # of threads so we don't explode
|
|
/*
|
|
struct rlimit lim;
|
|
lim.rlim_max = 100;
|
|
if ( setrlimit(RLIMIT_NPROC,&lim) )
|
|
log("thread::init: setrlimit: %s", mstrerror(errno) );
|
|
else
|
|
log("thread::init: set max number of processes to 100");
|
|
*/
|
|
|
|
// allow threads until disabled
|
|
m_disabled = false;
|
|
/*
|
|
// # of low priority threads launched and returned
|
|
m_hiLaunched = 0;
|
|
m_hiReturned = 0;
|
|
m_loLaunched = 0;
|
|
m_loReturned = 0;
|
|
*/
|
|
//int32_t m_queryMaxBigDiskThreads ; // > 1M read
|
|
//int32_t m_queryMaxMedDiskThreads ; // 100k - 1M read
|
|
//int32_t m_queryMaxSmaDiskThreads ; // < 100k per read
|
|
// categorize the disk read sizes by these here
|
|
// g_conf.m_bigReadSize = 0x7fffffff;
|
|
// g_conf.m_medReadSize = 1000000;
|
|
// g_conf.m_smaReadSize = 100000;
|
|
|
|
// . register a sleep wrapper to launch threads every 30ms
|
|
// . sometimes a bunch of threads mark themselves as done and the
|
|
// cleanUp() handler sees them as all still launched so it doesn't
|
|
// launch any new ones
|
|
|
|
//if ( ! g_loop.registerSleepCallback(30,NULL,launchThreadsWrapper))
|
|
// return log("thread: Failed to initialize timer callback.");
|
|
|
|
if ( ! g_loop.registerSleepCallback(1000,NULL,
|
|
killStalledFiltersWrapper,0))
|
|
return log("thread: Failed to initialize timer callback2.");
|
|
|
|
// debug
|
|
//log("thread: main process has pid=%" INT32 "",(int32_t)s_pid);
|
|
|
|
// . set priority of the main process to 0
|
|
// . setpriority() only applies to SCHED_OTHER threads
|
|
// . priority of threads with niceness 0 will be 0
|
|
// . priority of threads with niceness 1 will be 10
|
|
// . priority of threads with niceness 2 will be 20
|
|
// . see 'man sched_setscheduler' for detail scheduling info
|
|
// . no need to call getpid(), 0 for pid means the current process
|
|
#ifndef PTHREADS
|
|
if ( setpriority ( PRIO_PROCESS, getpid() , 0 ) < 0 )
|
|
log("thread: Call to setpriority failed: %s.",
|
|
mstrerror(errno));
|
|
#endif
|
|
|
|
// make multiplier higher for raids, can do more seeks
|
|
//int32_t m = 1;
|
|
//#ifdef _LARS_
|
|
//m = 3;
|
|
//#endif
|
|
// register the types of threads here instead of in main.cpp
|
|
//if ( ! g_threads.registerType ( DISK_THREAD ,m*20/*maxThreads*/))
|
|
// try running blaster with 5 threads and you'll
|
|
// . see like a double degrade in performance for some reason!!
|
|
// . TODO: why?
|
|
// . well, this should be controlled g_conf.m_maxSpiderDiskThreads
|
|
// for niceness 1+ threads, and g_conf.m_maxPriorityDiskThreads for
|
|
// niceness 0 and below disk threads
|
|
// . 100 maxThreads out at a time, 32000 can be queued
|
|
if ( ! g_threads.registerType ( DISK_THREAD ,100/*maxThreads*/,32000))
|
|
return log("thread: Failed to register thread type." );
|
|
// . these are used by Msg5 to merge what it reads from disk
|
|
// . i raised it from 1 to 2 and got better response time from Msg10
|
|
// . i leave one open in case one is used for doing a big merge
|
|
// with high niceness cuz it would hold up high priority ones!
|
|
// . TODO: is there a better way? cancel it when UdpServer calls
|
|
// Threads::suspendLowPriorityThreads() ?
|
|
// . this used to be 2 but now defaults to 10 in Parms.cpp. i found
|
|
// i have less int32_t gray lines in the performance graph when i
|
|
// did that on trinity.
|
|
int32_t max2 = g_conf.m_maxCpuMergeThreads;
|
|
if ( max2 < 1 ) max2 = 1;
|
|
if ( ! g_threads.registerType ( MERGE_THREAD , max2,1000) )
|
|
return log("thread: Failed to register thread type." );
|
|
// will raising this from 1 to 2 make it faster too?
|
|
// i raised since global specs new servers have 2 (hyperthreaded?) cpus
|
|
int32_t max = g_conf.m_maxCpuThreads;
|
|
if ( max < 1 ) max = 1;
|
|
if ( ! g_threads.registerType ( INTERSECT_THREAD,max,10) )
|
|
return log("thread: Failed to register thread type." );
|
|
// filter thread spawned to call popen() to filter an http reply
|
|
if ( ! g_threads.registerType ( FILTER_THREAD, 2/*maxThreads*/,300) )
|
|
return log("thread: Failed to register thread type." );
|
|
// RdbTree uses this to save itself
|
|
if ( ! g_threads.registerType ( SAVETREE_THREAD,1/*maxThreads*/,100) )
|
|
return log("thread: Failed to register thread type." );
|
|
// . File.cpp spawns a rename thread for doing renames and unlinks
|
|
// . doing a tight merge on titldb can be ~250 unlinks
|
|
// . MDW up from 1 to 30 max, after doing a ddump on 3000+ collections
|
|
// it was taking forever to go one at a time through the unlink
|
|
// thread queue. seemed like a 1 second space between unlinks.
|
|
// 1/23/1014
|
|
if ( ! g_threads.registerType ( UNLINK_THREAD,5/*maxThreads*/,3000) )
|
|
return log("thread: Failed to register thread type." );
|
|
// generic multipurpose
|
|
if ( ! g_threads.registerType (GENERIC_THREAD,20/*maxThreads*/,100) )
|
|
return log("thread: Failed to register thread type." );
|
|
// for call SSL_accept() which blocks for 10ms even when socket
|
|
// is non-blocking...
|
|
//if (!g_threads.registerType (SSLACCEPT_THREAD,20/*maxThreads*/,100))
|
|
// return log("thread: Failed to register thread type." );
|
|
|
|
//#ifndef PTHREADS
|
|
|
|
// sanity check
|
|
if ( GUARDSIZE >= STACK_SIZE )
|
|
return log("thread: Stack guard size too big.");
|
|
// not more than this outstanding
|
|
int32_t maxThreads = 0;
|
|
for ( int32_t i = 0 ; i < m_numQueues ; i++ )
|
|
maxThreads += m_threadQueues[i].m_maxLaunched;
|
|
// limit to stack we got
|
|
if ( maxThreads > MAX_STACKS ) maxThreads = MAX_STACKS;
|
|
// allocate the stack space
|
|
s_stackAllocSize = STACK_SIZE * maxThreads + THRPAGESIZE ;
|
|
// clear stack to help check for overwrites
|
|
s_stackAlloc = (char *) mcalloc ( s_stackAllocSize , "ThreadStack" );
|
|
if ( ! s_stackAlloc )
|
|
return log("thread: Unable to allocate %" INT32 " bytes for thread "
|
|
"stacks.", s_stackAllocSize);
|
|
log(LOG_INIT,"thread: Using %" INT32 " bytes for %" INT32 " thread stacks.",
|
|
s_stackAllocSize,maxThreads);
|
|
// align
|
|
s_stack = (char *)(((uint64_t)s_stackAlloc+THRPAGESIZE-1)&~(THRPAGESIZE-1));
|
|
// new size
|
|
s_stackSize = s_stackAllocSize - (s_stack - s_stackAlloc);
|
|
// protect the whole stack while not in use
|
|
if ( mprotect ( s_stack , s_stackSize , PROT_NONE ) )
|
|
log("thread: Call to mprotect failed: %s.",mstrerror(errno));
|
|
// test
|
|
//s_stack[0] = 1;
|
|
// init the linked list
|
|
for ( int32_t i = 0 ; i < MAX_STACKS ; i++ ) {
|
|
if ( i == MAX_STACKS - 1 ) s_next[i] = -1;
|
|
else s_next[i] = i + 1;
|
|
s_stackPtrs[i] = s_stack + STACK_SIZE * i;
|
|
}
|
|
s_head = 0;
|
|
|
|
// don't do real time stuff for now
|
|
return true;
|
|
|
|
//#else
|
|
|
|
// . keep stack size small, 128k
|
|
// . if we get problems, we'll increase this to 256k
|
|
// . seems like it grows dynamically from 4K to up to 2M as needed
|
|
//if ( pthread_attr_setstacksize ( &s_attr , (size_t)128*1024 ) )
|
|
// return log("thread: init: pthread_attr_setstacksize: %s",
|
|
// mstrerror(errno));
|
|
//pthread_attr_setschedparam ( &s_attr , PTHREAD_EXPLICIT_SCHED );
|
|
//pthread_attr_setscope ( &s_attr , PTHREAD_SCOPE_SYSTEM );
|
|
// return true;
|
|
|
|
//#endif
|
|
|
|
}
|
|
|
|
// all types should be registered in main.cpp before any threads launch
|
|
bool Threads::registerType ( char type , int32_t maxThreads , int32_t maxEntries ) {
|
|
// return false and set g_errno if no more room
|
|
if ( m_numQueues >= MAX_THREAD_QUEUES ) {
|
|
g_errno = EBUFTOOSMALL;
|
|
return log(LOG_LOGIC,"thread: registerType: Too many thread "
|
|
"queues");
|
|
}
|
|
// initialize the ThreadQueue class for this type
|
|
if ( ! m_threadQueues[m_numQueues].init(type,maxThreads,maxEntries))
|
|
return false;
|
|
// we got one more queue now
|
|
m_numQueues++;
|
|
return true;
|
|
}
|
|
|
|
int32_t Threads::getNumThreadsOutOrQueued() {
|
|
int32_t n = 0;
|
|
for ( int32_t i = 0 ; i < m_numQueues ; i++ ) {
|
|
// skip INTERSECT_THREAD, used to intersect termlists to
|
|
// resolve a query. i've seen these get stuck in an infinite
|
|
// loop sometimes.
|
|
if ( i == INTERSECT_THREAD ) continue;
|
|
// skip filter threads, those get stuck sometimes
|
|
if ( i == FILTER_THREAD ) continue;
|
|
// tally up all other threads
|
|
n += m_threadQueues[i].getNumThreadsOutOrQueued();
|
|
}
|
|
return n;
|
|
}
|
|
|
|
int32_t Threads::getNumWriteThreadsOut() {
|
|
return m_threadQueues[DISK_THREAD].getNumWriteThreadsOut();
|
|
}
|
|
|
|
int32_t Threads::getNumActiveWriteUnlinkRenameThreadsOut() {
|
|
// these do not countthreads that are done, and just awaiting join
|
|
int32_t n = m_threadQueues[DISK_THREAD].getNumWriteThreadsOut();
|
|
n += m_threadQueues[UNLINK_THREAD].getNumActiveThreadsOut();
|
|
return n;
|
|
}
|
|
|
|
// . returns false (and may set errno) if failed to launch a thread
|
|
// . returns true if thread added to queue successfully
|
|
// . may be launched instantly or later depending on # of threads in the queue
|
|
bool Threads::call ( char type ,
|
|
int32_t niceness ,
|
|
void *state ,
|
|
void (* callback )(void *state,ThreadEntry *t) ,
|
|
void *(* startRoutine)(void *state,ThreadEntry *t) ) {
|
|
// debug
|
|
//return false;
|
|
#ifdef _VALGRIND_
|
|
return false;
|
|
#endif
|
|
|
|
g_errno = 0;
|
|
|
|
// don't spawn any if disabled
|
|
if ( m_disabled ) return false;
|
|
if ( ! g_conf.m_useThreads ) return false;
|
|
|
|
if ( type == DISK_THREAD && ! g_conf.m_useThreadsForDisk )
|
|
return false;
|
|
if ( type == MERGE_THREAD && ! g_conf.m_useThreadsForIndexOps )
|
|
return false;
|
|
if ( type == INTERSECT_THREAD && ! g_conf.m_useThreadsForIndexOps )
|
|
return false;
|
|
if ( type == FILTER_THREAD && ! g_conf.m_useThreadsForSystemCalls )
|
|
return false;
|
|
|
|
if ( ! m_initialized && ! init() )
|
|
return log("db: Threads init failed." );
|
|
|
|
// . sanity check
|
|
// . a thread can NOT call this
|
|
//if ( getpid() != s_pid ) {
|
|
// fprintf(stderr,"thread: call: bad engineer\n");
|
|
// ::exit(-1);
|
|
//}
|
|
// don't launch for now
|
|
//return false;
|
|
// . sanity check
|
|
// . ensure top 4 bytes of state is the callback
|
|
//if ( *(int32_t *)state != (int32_t)callback ) {
|
|
// g_errno = EBADENGINEER;
|
|
// sleep(50000);
|
|
// return log("thread: call: top 4 bytes of state != callback");
|
|
//}
|
|
// debug msg
|
|
//log("adding thread to queue, type=%" INT32 "",(int32_t)type);
|
|
// find the type
|
|
int32_t i;
|
|
for ( i = 0 ; i < m_numQueues ; i++ )
|
|
if ( m_threadQueues[i].m_threadType == type ) break;
|
|
// bitch if type not added via registerType() call
|
|
if ( i == m_numQueues ) {
|
|
g_errno = EBADENGINEER;
|
|
return log(LOG_LOGIC,"thread: addtoqueue: Unregistered "
|
|
"thread type %" INT32 "",(int32_t)type);
|
|
}
|
|
// debug msg
|
|
//log("thread: call: adding entry for thread");
|
|
// . add to this queue
|
|
// . returns NULL and sets g_errno on error
|
|
ThreadEntry *t = m_threadQueues[i].addEntry(niceness,state,
|
|
callback,startRoutine);
|
|
if ( ! t ) return log("thread: Failed to add entry to thread pool: "
|
|
"%s.",mstrerror(g_errno));
|
|
// debug msg
|
|
//log("added");
|
|
// clear g_errno
|
|
//g_errno = 0;
|
|
// . try to launch as many threads as we can
|
|
// . this sets g_errno on error
|
|
// . if it has an error, just ignore it, our thread is queued
|
|
launchLoop:
|
|
if ( m_threadQueues[i].launchThread2 ( ) )
|
|
goto launchLoop;
|
|
//if ( ! m_threadQueues[i].launchThread2 ( t ) && g_errno ) {
|
|
// log("thread: failed thread launch: %s",mstrerror(g_errno));
|
|
// return false;
|
|
//}
|
|
|
|
// return false if there was an error launching the thread
|
|
//if ( g_errno ) return false;
|
|
// clear g_errno
|
|
g_errno = 0;
|
|
// success
|
|
return true;
|
|
}
|
|
|
|
// static void launchThreadsWrapper ( int fd , void *state ) {
|
|
// // debug msg
|
|
// //if ( g_conf.m_timingDebugEnabled )
|
|
// // log("thread: launchThreadsWrapper: entered");
|
|
// // clean up
|
|
// g_threads.cleanUp(NULL,1000);
|
|
// // and launch
|
|
// g_threads.launchThreads();
|
|
// }
|
|
|
|
static void killStalledFiltersWrapper ( int fd , void *state ) {
|
|
// bail if no pid
|
|
if ( g_pid == -1 ) return;
|
|
// . only kill after ticker reaches a count of 30
|
|
// . we are called once every second, so inc it each time
|
|
int32_t timeout = g_filterTimeout;
|
|
if ( timeout <= 0 ) timeout = 30;
|
|
if ( g_ticker++ < timeout ) return;
|
|
// debug
|
|
log("threads: killing stalled filter process of age %" INT32 " "
|
|
"seconds and pid=%" INT32 ".",g_ticker,(int32_t)g_pid);
|
|
// kill him
|
|
int err = kill ( g_pid , 9 );
|
|
// don't kill again
|
|
g_pid = -1;
|
|
if ( err != 0 ) log("threads: kill filter: %s", mstrerror(err) );
|
|
}
|
|
|
|
// . called by g_loop in Loop.cpp after getting a SI_QUEUE signal that it
|
|
// is from when a thread exited
|
|
// . we put that signal there using sigqeueue() in Threads::exit()
|
|
// . this way another thread can be launched right away
|
|
int32_t Threads::launchThreads ( ) {
|
|
|
|
// stop launching threads if trying to exit.
|
|
// only launch save tree threads. so if in the middle of saving
|
|
// we allow it to complete?
|
|
if ( g_process.m_mode == EXIT_MODE )
|
|
return 0;
|
|
|
|
// try launching from each queue
|
|
int32_t numLaunched = 0;
|
|
// try to launch DISK threads last so cpu-based threads get precedence
|
|
for ( int32_t i = m_numQueues - 1 ; i >= 0 ; i-- ) {
|
|
|
|
// clear g_errno
|
|
g_errno = 0;
|
|
// launch as many threads as we can from queue #i
|
|
while ( m_threadQueues[i].launchThread2( ) )
|
|
numLaunched++;
|
|
// continue if no g_errno set
|
|
if ( ! g_errno ) continue;
|
|
// otherwise bitch about it
|
|
log("thread: Failed to launch thread: %s.",mstrerror(g_errno));
|
|
}
|
|
// clear g_errno
|
|
g_errno = 0;
|
|
return numLaunched;
|
|
}
|
|
|
|
// . will cancel currently running low priority threads
|
|
// . will prevent any low priority threads from launching
|
|
// . will only cancel disk threads for now
|
|
void Threads::suspendLowPriorityThreads() {
|
|
// debug msg
|
|
if ( g_conf.m_logDebugThread )
|
|
log(LOG_DEBUG,"thread: SUSPENDING LOW-PRIORITY THREADS.");
|
|
// just cancel disk threads for now
|
|
for ( int32_t i = 0 ; i < m_numQueues; i++ )
|
|
m_threadQueues[i].suspendLowPriorityThreads();
|
|
}
|
|
|
|
void Threads::resumeLowPriorityThreads() {
|
|
// debug msg
|
|
if ( g_conf.m_logDebugThread )
|
|
log(LOG_DEBUG,"thread: RESUMING LOW-PRIORITY THREADS.");
|
|
for ( int32_t i = 0 ; i < m_numQueues; i++ )
|
|
m_threadQueues[i].resumeLowPriorityThreads();
|
|
}
|
|
|
|
//void Threads::cancelLowPriorityThreads () {
|
|
// for ( int32_t i = 0 ; i < m_numQueues; i++ )
|
|
// m_threadQueues[i].cancelLowPriorityThreads();
|
|
//}
|
|
|
|
// Debug consistency check of a doubly linked ThreadEntry list. It
// deliberately crashes (NULL write) on the first broken invariant.
// . headPtr: address of the list head
// . tailPtr: address of the list tail, or NULL for head-only lists
void checkList ( ThreadEntry **headPtr , ThreadEntry **tailPtr ) {
	ThreadEntry *head = *headPtr;
	// head and tail must be both set or both empty
	if ( tailPtr && ( head != NULL ) != ( *tailPtr != NULL ) ) {
		char *xx=NULL;*xx=0; }
	if ( ! head ) return;

	// forward walk: the head has no prev, and every prev/next pair agrees
	ThreadEntry *prev = NULL;
	for ( ThreadEntry *e = head ; e ; prev = e , e = e->m_nextLink ) {
		if ( e->m_prevLink != prev ) { char *xx=NULL;*xx=0; }
		if ( prev && prev->m_nextLink != e ) { char *xx=NULL;*xx=0; }
	}

	if ( ! tailPtr ) return;

	// backward walk: the tail has no next, and every next/prev pair agrees
	ThreadEntry *next = NULL;
	for ( ThreadEntry *e = *tailPtr ; e ; next = e , e = e->m_prevLink ) {
		if ( e->m_nextLink != next ) { char *xx=NULL;*xx=0; }
		if ( next && next->m_prevLink != e ) { char *xx=NULL;*xx=0; }
	}
}
|
|
|
|
|
|
// remove from a head-only linked list
|
|
void removeLink ( ThreadEntry **headPtr , ThreadEntry *t ) {
|
|
// if ( g_conf.m_logDebugThread )
|
|
// logf(LOG_DEBUG,"thread: removeLink [t=0x%" PTRFMT "]",
|
|
// (PTRTYPE)t);
|
|
if ( g_conf.m_logDebugThread )
|
|
checkList ( headPtr , NULL );
|
|
if ( t->m_prevLink ) t->m_prevLink->m_nextLink = t->m_nextLink;
|
|
else *headPtr = t->m_nextLink;
|
|
if ( t->m_nextLink ) t->m_nextLink->m_prevLink = t->m_prevLink;
|
|
t->m_nextLink = NULL;
|
|
t->m_prevLink = NULL;
|
|
if ( g_conf.m_logDebugThread )
|
|
checkList ( headPtr , NULL );
|
|
}
|
|
// MDW: verify this::::: and the one above!!!!!!!!!!!!!!!!
|
|
void removeLink2 ( ThreadEntry **headPtr ,
|
|
ThreadEntry **tailPtr ,
|
|
ThreadEntry *t ) {
|
|
// if ( g_conf.m_logDebugThread )
|
|
// logf(LOG_DEBUG,"thread: removeLink2 [t=0x%" PTRFMT "]",
|
|
// (PTRTYPE)t);
|
|
if ( g_conf.m_logDebugThread )
|
|
checkList ( headPtr , tailPtr );
|
|
if ( t->m_prevLink ) t->m_prevLink->m_nextLink = t->m_nextLink;
|
|
else *headPtr = t->m_nextLink;
|
|
if ( t->m_nextLink ) t->m_nextLink->m_prevLink = t->m_prevLink;
|
|
else *tailPtr = t->m_prevLink;
|
|
t->m_nextLink = NULL;
|
|
t->m_prevLink = NULL;
|
|
if ( g_conf.m_logDebugThread )
|
|
checkList ( headPtr , tailPtr );
|
|
}
|
|
|
|
// add to a head/tail linked list's tail
|
|
void addLinkToTail ( ThreadEntry **headPtr ,
|
|
ThreadEntry **tailPtr ,
|
|
ThreadEntry *t ) {
|
|
// if ( g_conf.m_logDebugThread )
|
|
// logf(LOG_DEBUG,"thread: addLinkToTail [t=0x%" PTRFMT "]",
|
|
// (PTRTYPE)t);
|
|
if ( g_conf.m_logDebugThread )
|
|
checkList ( headPtr , tailPtr );
|
|
if ( *tailPtr ) {
|
|
(*tailPtr)->m_nextLink = t;
|
|
t->m_nextLink = NULL;
|
|
t->m_prevLink = *tailPtr;
|
|
*tailPtr = t; // t is the new tail
|
|
}
|
|
else {
|
|
*headPtr = t;
|
|
*tailPtr = t;
|
|
t->m_nextLink = NULL;
|
|
t->m_prevLink = NULL;
|
|
}
|
|
if ( g_conf.m_logDebugThread )
|
|
checkList ( headPtr , tailPtr );
|
|
}
|
|
|
|
// add to a head/tail linked list's head
|
|
void addLinkToHead ( ThreadEntry **headPtr ,
|
|
ThreadEntry **tailPtr ,
|
|
ThreadEntry *t ) {
|
|
// if ( g_conf.m_logDebugThread )
|
|
// logf(LOG_DEBUG,"thread: addLinkToHead [t=0x%" PTRFMT "]",
|
|
// (PTRTYPE)t);
|
|
if ( g_conf.m_logDebugThread )
|
|
checkList ( headPtr , tailPtr );
|
|
if ( *headPtr ) {
|
|
(*headPtr)->m_prevLink = t;
|
|
t->m_prevLink = NULL;
|
|
t->m_nextLink = *headPtr;
|
|
*headPtr = t; // t is the new head
|
|
}
|
|
else {
|
|
*headPtr = t;
|
|
*tailPtr = t;
|
|
t->m_nextLink = NULL;
|
|
t->m_prevLink = NULL;
|
|
}
|
|
if ( g_conf.m_logDebugThread )
|
|
checkList ( headPtr , tailPtr );
|
|
}
|
|
|
|
// add to a head-only linked list
|
|
void addLink ( ThreadEntry **headPtr ,
|
|
ThreadEntry *t ) {
|
|
// if ( g_conf.m_logDebugThread )
|
|
// logf(LOG_DEBUG,"thread: addLink [t=0x%" PTRFMT "]",
|
|
// (PTRTYPE)t);
|
|
if ( g_conf.m_logDebugThread )
|
|
checkList ( headPtr , NULL );
|
|
if ( *headPtr ) {
|
|
(*headPtr)->m_prevLink = t;
|
|
t->m_prevLink = NULL;
|
|
t->m_nextLink = *headPtr;
|
|
*headPtr = t; // t is the new head
|
|
}
|
|
else {
|
|
*headPtr = t;
|
|
t->m_nextLink = NULL;
|
|
t->m_prevLink = NULL;
|
|
}
|
|
if ( g_conf.m_logDebugThread )
|
|
checkList ( headPtr , NULL );
|
|
}
|
|
|
|
|
|
///////////////////////////////////////////////////////////////////////
|
|
// functions for ThreadQueue
|
|
///////////////////////////////////////////////////////////////////////
|
|
|
|
ThreadQueue::ThreadQueue ( ) {
|
|
m_entries = NULL;
|
|
m_entriesSize = 0;
|
|
}
|
|
|
|
void ThreadQueue::reset ( ) {
|
|
if ( m_entries ) mfree ( m_entries , m_entriesSize , "Threads" );
|
|
m_entries = NULL;
|
|
m_top = 0;
|
|
}
|
|
|
|
bool ThreadQueue::init ( char threadType, int32_t maxThreads, int32_t maxEntries ) {
|
|
m_threadType = threadType;
|
|
m_launched = 0;
|
|
m_returned = 0;
|
|
m_maxLaunched = maxThreads;
|
|
// # of low priority threads launched and returned
|
|
/*
|
|
m_hiLaunched = 0;
|
|
m_hiReturned = 0;
|
|
m_mdLaunched = 0;
|
|
m_mdReturned = 0;
|
|
m_loLaunched = 0;
|
|
m_loReturned = 0;
|
|
// we now count write threads so we can limit that
|
|
m_writesLaunched = 0;
|
|
m_writesReturned = 0;
|
|
// these are for disk threads, which we now limit based on read sizes
|
|
m_hiLaunchedBig = 0;
|
|
m_hiReturnedBig = 0;
|
|
m_mdLaunchedBig = 0;
|
|
m_mdReturnedBig = 0;
|
|
m_loLaunchedBig = 0;
|
|
m_loReturnedBig = 0;
|
|
m_hiLaunchedMed = 0;
|
|
m_hiReturnedMed = 0;
|
|
m_mdLaunchedMed = 0;
|
|
m_mdReturnedMed = 0;
|
|
m_loLaunchedMed = 0;
|
|
m_loReturnedMed = 0;
|
|
m_hiLaunchedSma = 0;
|
|
m_hiReturnedSma = 0;
|
|
m_mdLaunchedSma = 0;
|
|
m_mdReturnedSma = 0;
|
|
m_loLaunchedSma = 0;
|
|
m_loReturnedSma = 0;
|
|
*/
|
|
|
|
//m_entriesUsed = 0;
|
|
//m_top = 0;
|
|
m_isLowPrioritySuspended = false;
|
|
// alloc space for entries
|
|
m_maxEntries = maxEntries;
|
|
m_entriesSize = sizeof(ThreadEntry)*m_maxEntries;
|
|
m_entries = (ThreadEntry *)mmalloc ( m_entriesSize , "Threads" );
|
|
if ( ! m_entries ) return log("thread: Failed to allocate %" INT32 " bytes "
|
|
"for thread queue.",m_entriesSize);
|
|
// debug msg
|
|
//log("INIT CALLED. setting all m_isDone to 1.");
|
|
// clear m_isOccupied et al for new guys
|
|
//for ( int32_t i = 0 ; i < MAX_THREAD_ENTRIES ; i++ ) {
|
|
for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) {
|
|
m_entries[i].m_isOccupied = false;
|
|
m_entries[i].m_isLaunched = false;
|
|
m_entries[i].m_isDone = true;
|
|
m_entries[i].m_qnum = threadType;
|
|
m_entries[i].m_stack = NULL;
|
|
}
|
|
|
|
m_emptyHead = NULL;
|
|
m_waitHead0 = NULL;
|
|
m_waitHead1 = NULL;
|
|
m_waitHead2 = NULL;
|
|
m_waitHead3 = NULL;
|
|
m_waitHead4 = NULL;
|
|
m_waitHead5 = NULL;
|
|
m_waitHead6 = NULL;
|
|
|
|
m_waitTail0 = NULL;
|
|
m_waitTail1 = NULL;
|
|
m_waitTail2 = NULL;
|
|
m_waitTail3 = NULL;
|
|
m_waitTail4 = NULL;
|
|
m_waitTail5 = NULL;
|
|
m_waitTail6 = NULL;
|
|
|
|
m_launchedHead = NULL;
|
|
|
|
// do not spam the log with log debug msgs even if it is on
|
|
char debug = g_conf.m_logDebugThread;
|
|
g_conf.m_logDebugThread = 0;
|
|
|
|
for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) {
|
|
ThreadEntry *t = &m_entries[i];
|
|
t->m_prevLink = NULL;
|
|
t->m_nextLink = NULL;
|
|
addLink ( &m_emptyHead , t );
|
|
}
|
|
|
|
g_conf.m_logDebugThread = debug;
|
|
|
|
return true;
|
|
}
|
|
|
|
int32_t ThreadQueue::getNumActiveThreadsOut() {
|
|
int32_t n = 0;
|
|
for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) {
|
|
ThreadEntry *e = &m_entries[i];
|
|
if ( ! e->m_isOccupied ) continue;
|
|
if ( ! e->m_isLaunched ) continue;
|
|
// if it is done and just waiting for a join, do not count
|
|
if ( e->m_isDone ) continue;
|
|
n++;
|
|
}
|
|
return n;
|
|
}
|
|
|
|
int32_t ThreadQueue::getNumThreadsOutOrQueued() {
|
|
// MDW: we also need to count threads that are returned but need their
|
|
// callback called so, in the case of RdbDump, the rdblist that was written
|
|
// to disk can update the rdbmap before it gets saved, so it doesn't get
|
|
// out of sync. Process.cpp calls .suspendMerge() to make sure that all
|
|
// merge operations are suspended as well.
|
|
int32_t n = 0;
|
|
for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) {
|
|
ThreadEntry *e = &m_entries[i];
|
|
if ( e->m_isOccupied ) n++;
|
|
}
|
|
return n;
|
|
/*
|
|
int32_t n = m_launched - m_returned;
|
|
for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) {
|
|
ThreadEntry *e = &m_entries[i];
|
|
if ( ! e->m_isOccupied ) continue;
|
|
if ( e->m_isLaunched ) continue;
|
|
if ( e->m_isDone ) continue;
|
|
// do not count "reads", only count writes
|
|
//if ( m_threadType == DISK_THREAD && e->m_state ) {
|
|
// FileState *fs = (FileState *)e->m_state;
|
|
// if ( fs->m_doWrite ) continue;
|
|
//}
|
|
}
|
|
return n;
|
|
*/
|
|
}
|
|
|
|
int32_t ThreadQueue::getNumWriteThreadsOut () {
|
|
// only consider disk threads
|
|
if ( m_threadType != DISK_THREAD ) return 0;
|
|
int32_t n = 0;
|
|
for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) {
|
|
ThreadEntry *e = &m_entries[i];
|
|
if ( ! e->m_isOccupied ) continue;
|
|
if ( ! e->m_isLaunched ) continue;
|
|
if ( e->m_isDone ) continue;
|
|
FileState *fs = (FileState *)e->m_state;
|
|
if ( ! fs ) continue;
|
|
if ( ! fs->m_doWrite ) continue;
|
|
n++;
|
|
}
|
|
return n;
|
|
}
|
|
|
|
|
|
|
|
// return NULL and set g_errno on error
|
|
ThreadEntry *ThreadQueue::addEntry ( int32_t niceness ,
|
|
void *state ,
|
|
void (* callback )(void *state,
|
|
ThreadEntry *t) ,
|
|
void *(* startRoutine)(void *state,
|
|
ThreadEntry *t) ) {
|
|
|
|
// if we are 90% full and niceness is > 0, knock it off
|
|
// int32_t max = m_maxEntries;
|
|
// if ( m_threadType == DISK_THREAD && niceness > 0 ) {
|
|
// max = (m_maxEntries * 90) / 100;
|
|
// if ( max <= 0 ) max = 1;
|
|
// }
|
|
|
|
ThreadEntry *t = m_emptyHead;
|
|
if ( ! t ) {
|
|
g_errno = ENOTHREADSLOTS;
|
|
static time_t s_time = 0;
|
|
time_t now = getTime();
|
|
if ( now - s_time > 5 ) {
|
|
log("thread: Could not add thread to queue. Already "
|
|
"have %" INT32 " entries.",m_maxEntries);
|
|
s_time = now;
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
// sanity
|
|
if ( t->m_isOccupied ) { char *xx=NULL;*xx=0; }
|
|
|
|
|
|
// debug test
|
|
//if ( rand() %10 == 1 ) { g_errno = ENOTHREADSLOTS; return NULL; }
|
|
// get first available entry, not in use
|
|
//int32_t i;
|
|
//for ( i = 0 ; i < MAX_THREAD_ENTRIES ; i++ )
|
|
// for ( i = 0 ; i < max ; i++ )
|
|
// if ( ! m_entries[i].m_isOccupied ) break;
|
|
// caution
|
|
// //if ( i >= MAX_THREAD_ENTRIES ) {
|
|
// if ( i >= max ) {
|
|
// g_errno = ENOTHREADSLOTS;
|
|
// static time_t s_time = 0;
|
|
// time_t now = getTime();
|
|
// if ( now - s_time > 5 ) {
|
|
// log("thread: Could not add thread to queue. Already "
|
|
// "have %" INT32 " entries.",max);
|
|
// s_time = now;
|
|
// }
|
|
// return NULL;
|
|
// }
|
|
// debug msg
|
|
//fprintf(stderr,"addEntry my pid=%" UINT32 "\n", (int32_t)getpid() );
|
|
// get an available entry
|
|
//ThreadEntry *t = &m_entries [ i ];
|
|
// debug msg
|
|
//log("claiming entry state=%" UINT32 ", occupied=%" INT32 "",(int32_t)t->m_state,
|
|
// (int32_t)t->m_isOccupied);
|
|
// stick it in
|
|
t->m_niceness = niceness;
|
|
t->m_state = state;
|
|
t->m_callback = callback;
|
|
t->m_startRoutine = startRoutine;
|
|
t->m_isOccupied = true;
|
|
t->m_isCancelled = false;
|
|
t->m_stack = NULL;
|
|
// debug msg
|
|
//log("setting t=%" UINT32 " m_isDone to 0", (int32_t)t );
|
|
t->m_isDone = false;
|
|
t->m_isLaunched = false;
|
|
t->m_queuedTime = gettimeofdayInMilliseconds();
|
|
t->m_readyForBail = false;
|
|
t->m_allocBuf = NULL;
|
|
t->m_allocSize = 0;
|
|
t->m_errno = 0;
|
|
|
|
t->m_bestHeadPtr = NULL;
|
|
t->m_bestTailPtr = NULL;
|
|
|
|
// and when the ohcrap callback gets called and the thread
|
|
// is cleaned up it will check the FileState readsize and
|
|
// m_isWrite to see which launch counts to decrement, so
|
|
// since FileState will be corrupted, we need to store
|
|
// this info directly into the thread entry.
|
|
if ( m_threadType == DISK_THREAD && t->m_state ) {
|
|
FileState *fs = (FileState *)t->m_state;
|
|
t->m_bytesToGo = fs->m_bytesToGo;
|
|
t->m_doWrite = fs->m_doWrite ;
|
|
}
|
|
else {
|
|
t->m_bytesToGo = 0;
|
|
t->m_doWrite = false;
|
|
}
|
|
|
|
// inc the used count
|
|
//m_entriesUsed++;
|
|
// debug msg
|
|
//log("m_entriesUsed now %" INT32 "",m_entriesUsed);
|
|
// might have to inc top as well
|
|
//if ( i == m_top ) m_top++;
|
|
|
|
// there's only two linked lists we can wait in if we are not disk
|
|
if ( m_threadType != DISK_THREAD ) {
|
|
ThreadEntry **bestHeadPtr = NULL;
|
|
ThreadEntry **bestTailPtr = NULL;
|
|
if ( niceness <= 0 ) {
|
|
bestHeadPtr = &m_waitHead0;
|
|
bestTailPtr = &m_waitTail0;
|
|
}
|
|
// 'merge' threads from disk merge ops have niceness 1
|
|
else if ( niceness == 1 ) {
|
|
bestHeadPtr = &m_waitHead1;
|
|
bestTailPtr = &m_waitTail1;
|
|
}
|
|
// niceness is 2? MAX_NICENESS
|
|
else {
|
|
bestHeadPtr = &m_waitHead2;
|
|
bestTailPtr = &m_waitTail2;
|
|
}
|
|
// remove from empty list
|
|
removeLink ( &m_emptyHead , t );
|
|
// add to waiting list at the end
|
|
addLinkToTail ( bestHeadPtr , bestTailPtr , t );
|
|
// debug msg
|
|
if ( g_conf.m_logDebugThread )
|
|
log(LOG_DEBUG,"thread: [t=0x%" PTRFMT "] "
|
|
"queued %s thread for launch1. "
|
|
"niceness=%" INT32 ". ", (PTRTYPE)t,
|
|
getThreadType(), (int32_t)niceness );
|
|
return t;
|
|
}
|
|
|
|
//
|
|
// otherwise we are a disk thread, there's 7 linked lists to use
|
|
//
|
|
|
|
// shortcuts
|
|
//int32_t rs = t->m_bytesToGo;
|
|
char nice = t->m_niceness;
|
|
|
|
// get best thread candidate from best linked list of candidates
|
|
ThreadEntry **bestHeadPtr = NULL;
|
|
ThreadEntry **bestTailPtr = NULL;
|
|
|
|
// short/med/long high priority (niceness 0) disk reads in head0/1/2
|
|
// but we can't launch one more if already at our quota.
|
|
if ( ! bestHeadPtr && nice == 0 ) { //&& rs <= g_conf.m_smaReadSize ) {
|
|
bestHeadPtr = &m_waitHead0;
|
|
bestTailPtr = &m_waitTail0;
|
|
}
|
|
// if ( ! bestHeadPtr && nice == 0 && rs <= g_conf.m_medReadSize ) {
|
|
// bestHeadPtr = &m_waitHead1;
|
|
// bestTailPtr = &m_waitTail1;
|
|
// }
|
|
// if ( ! bestHeadPtr && nice == 0 ) {
|
|
// bestHeadPtr = &m_waitHead2;
|
|
// bestTailPtr = &m_waitTail2;
|
|
// }
|
|
// low priority (merge or dump) disk WRITES go in waithead4
|
|
if ( ! bestHeadPtr && t->m_doWrite ) {
|
|
bestHeadPtr = &m_waitHead4;
|
|
bestTailPtr = &m_waitTail4;
|
|
}
|
|
|
|
// niceness 1 read threads here
|
|
if ( ! bestHeadPtr && nice == 1 ) {
|
|
bestHeadPtr = &m_waitHead5;
|
|
bestTailPtr = &m_waitTail5;
|
|
}
|
|
|
|
// niceness 2 read threads here
|
|
if ( ! bestHeadPtr ) {
|
|
bestHeadPtr = &m_waitHead6;
|
|
bestTailPtr = &m_waitTail6;
|
|
}
|
|
|
|
|
|
if ( g_conf.m_logDebugThread )
|
|
log(LOG_DEBUG,"thread: [t=0x%" PTRFMT "] "
|
|
"remove from empty list, add to wait list",
|
|
(PTRTYPE)t);
|
|
|
|
// remove from empty list
|
|
removeLink ( &m_emptyHead , t );
|
|
// sanity
|
|
if ( m_emptyHead == t ) { char *xx=NULL;*xx=0; }
|
|
// add to the new waiting list at the end
|
|
addLinkToTail ( bestHeadPtr , bestTailPtr , t );
|
|
|
|
// debug msg
|
|
if ( g_conf.m_logDebugThread )
|
|
log(LOG_DEBUG,"thread: [t=0x%" PTRFMT "] "
|
|
"queued %s thread for launch2. "
|
|
"niceness=%" INT32 ". ", (PTRTYPE)t,
|
|
getThreadType(), (int32_t)niceness );
|
|
// success
|
|
return t;
|
|
}
|
|
|
|
int32_t Threads::timedCleanUp (int32_t maxTime, int32_t niceness) {
|
|
|
|
// skip it if exiting
|
|
if ( g_process.m_mode == EXIT_MODE )
|
|
return 0;
|
|
|
|
if ( ! m_needsCleanup ) return 0;
|
|
|
|
//if ( g_inSigHandler ) return 0;
|
|
int64_t startTime = gettimeofdayInMillisecondsLocal();
|
|
int64_t took = 0;
|
|
|
|
if ( niceness >= MAX_NICENESS ) m_needsCleanup = false;
|
|
|
|
//for ( int32_t i = -1 ; i <= niceness ; i++ ) {
|
|
for ( int32_t i = 0 ; i <= niceness ; i++ ) {
|
|
|
|
for ( int32_t j = 0 ; j < m_numQueues ; j++ )
|
|
m_threadQueues[j].timedCleanUp ( niceness );
|
|
|
|
launchThreads();
|
|
|
|
if ( maxTime < 0 ) continue;
|
|
|
|
took = startTime - gettimeofdayInMillisecondsLocal();
|
|
if ( took <= maxTime ) continue;
|
|
// ok, we have to cut if short...
|
|
m_needsCleanup = true;
|
|
break;
|
|
}
|
|
return took;
|
|
}
|
|
|
|
// Is any entry in the disk thread queue reading or writing 'bf'?
// Delegates to ThreadQueue::isHittingFile() on the DISK_THREAD queue.
bool Threads::isHittingFile ( BigFile *bf ) {
	ThreadQueue &diskQueue = m_threadQueues[DISK_THREAD];
	return diskQueue.isHittingFile ( bf );
}
|
|
|
|
// . returns true if any occupied, not-yet-done entry of this queue runs
//   BigFile's readwriteWrapper_r() on a FileState whose m_this is 'bf'
// . queued (not yet launched) entries count as well
// . scans all m_maxEntries slots, like getNumThreadsOutOrQueued(); the
//   m_top high-water mark is not maintained by addEntry()/cleanup any more
bool ThreadQueue::isHittingFile ( BigFile *bf ) {
	for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) {
		ThreadEntry *t = &m_entries[i];
		// must be occupied and not done
		if ( ! t->m_isOccupied ) continue;
		if ( t->m_isDone ) continue;
		// must be a BigFile read/write
		if ( t->m_startRoutine != readwriteWrapper_r ) continue;
		FileState *fs = (FileState *)t->m_state;
		// bailOnReads() sets m_state to NULL on a read it bypassed,
		// while that thread may still be running. We can no longer tell
		// which file it is on, so skip it rather than dereference NULL.
		if ( ! fs ) continue;
		if ( fs->m_this == bf ) return true;
	}
	return false;
}
|
|
|
|
void Threads::bailOnReads ( ) {
|
|
m_threadQueues[DISK_THREAD].bailOnReads();
|
|
}
|
|
|
|
// Process.cpp calls these callbacks before their time in order to
|
|
// set EDISKSTUCK
|
|
void ThreadQueue::bailOnReads ( ) {
|
|
// note it
|
|
log("threads: bypassing read threads");
|
|
ThreadEntry *t = m_launchedHead;
|
|
ThreadEntry *nextLink = NULL;
|
|
// loop through candidates
|
|
//for ( int32_t i = 0 ; i < m_top; i++ ) {
|
|
for ( ; t ; t = nextLink ) {
|
|
// do it here in case we modify the linked list below
|
|
nextLink = t->m_nextLink;
|
|
// must be occupied to be done (sanity check)
|
|
if ( ! t->m_isOccupied ) continue;
|
|
// skip if not launched yet
|
|
//if ( ! t->m_isLaunched ) continue;
|
|
// must be niceness 0
|
|
if ( t->m_niceness != 0 ) continue;
|
|
// must not be done
|
|
if ( t->m_isDone ) continue;
|
|
// must not have already called callback
|
|
if ( t->m_callback == ohcrap ) continue;
|
|
// must be a read
|
|
if ( t->m_startRoutine != readwriteWrapper_r ) continue;
|
|
// int16_tcut
|
|
FileState *fs = (FileState *)t->m_state;
|
|
// do not stop writes
|
|
if ( fs->m_doWrite ) continue;
|
|
// must be niceness 0 too!
|
|
if ( fs->m_niceness != 0 ) continue;
|
|
// what is this? unlaunched...
|
|
//if ( t->m_pid == 0 ) continue;
|
|
// can only bail on a thread after it copies its FileState
|
|
// class into its stack so we can bypass it and free the
|
|
// original FileState without causing a core. if thread
|
|
// is not yet launched we have to call the callback here too
|
|
// otherwise it never gets launched until the disk is unstuck!
|
|
if ( ! t->m_readyForBail && t->m_isLaunched ) continue;
|
|
// set error
|
|
t->m_errno = EDISKSTUCK;
|
|
// set this too
|
|
g_errno = EDISKSTUCK;
|
|
// do not allow caller to free the alloc'd buf in case
|
|
// its read finally comes through!
|
|
t->m_allocBuf = fs->m_allocBuf;
|
|
t->m_allocSize = fs->m_allocSize;
|
|
fs->m_allocBuf = NULL;
|
|
fs->m_allocSize = 0;
|
|
// call it
|
|
t->m_callback ( t->m_state , t );
|
|
// do not re-call it...
|
|
t->m_callback = ohcrap;
|
|
// invalidate state (FileState usually)
|
|
t->m_state = NULL;
|
|
// do not launch if not yet launched
|
|
if ( t->m_isLaunched ) continue;
|
|
// delete him if not yet launched, otherwise we try to
|
|
// launch it later with a corrupted/unstable FileState...
|
|
// and that causes our launch counts to get out of whack i
|
|
// think...
|
|
t->m_isOccupied = false;
|
|
// note it
|
|
log("threads: bailing unlaunched thread");
|
|
// remove from linked list then
|
|
removeLink ( &m_launchedHead , t );
|
|
addLink ( &m_emptyHead , t );
|
|
// do we have to decrement top
|
|
// if ( m_top == i + 1 )
|
|
// while (m_top > 0 && ! m_entries[m_top-1].m_isOccupied)
|
|
// m_top--;
|
|
}
|
|
}
|
|
|
|
// BigFile.cpp's readwriteWrapper_r() ThreadEntry::m_callback gets set to
|
|
// ohcrap() because it was taking too long to do its read and we prematurely
|
|
// called its callback above in bailOnReads(). In that case we still have to
|
|
// free the disk read buffer which was never used. And doneWrapper() in
|
|
// BigFile.cpp is never called.
|
|
void ohcrap ( void *state , ThreadEntry *t ) {
|
|
// free the read buffer here then
|
|
if ( t->m_allocBuf )
|
|
mfree ( t->m_allocBuf , t->m_allocSize , "RdbScan" );
|
|
log("threads: got one");
|
|
}
|
|
|
|
// . cleans up any threads that have exited
|
|
// . their m_isDone should be set to true
|
|
// . don't process threads whose niceness is > maxNiceness
|
|
// . return true if we cleaned one up
|
|
bool ThreadQueue::timedCleanUp ( int32_t maxNiceness ) {
|
|
|
|
// top:
|
|
int32_t numCallbacks = 0;
|
|
ThreadEntry *t = m_launchedHead;
|
|
ThreadEntry *nextLink = NULL;
|
|
// loop through candidates
|
|
for ( ; t ; t = nextLink ) {
|
|
// get it here in case we remove 't' from the linked list below
|
|
nextLink = t->m_nextLink;
|
|
// point to it
|
|
//ThreadEntry *t = &m_entries[i];
|
|
// skip if not qualified
|
|
if ( t->m_niceness > maxNiceness ) continue;
|
|
// must be occupied to be done (sanity check)
|
|
if ( ! t->m_isOccupied ) continue;
|
|
// skip if not launched yet
|
|
if ( ! t->m_isLaunched ) continue;
|
|
// . we were segfaulting right here before because the thread
|
|
// was setting t->m_pid and at this point it was not
|
|
// set so t->m_pid was a bogus value
|
|
// . thread may have seg faulted, in which case sigbadhandler()
|
|
// in Loop.cpp will get it and set errno to 0x7fffffff
|
|
#ifndef PTHREADS
|
|
// MDW: i hafta take this out because the errno_location thing
|
|
// is not working on the newer gcc
|
|
if ( ! t->m_isDone && t->m_pid >= 0 &&
|
|
s_errnos [t->m_pid] == 0x7fffffff ) {
|
|
log("thread: Got abnormal thread termination. Seems "
|
|
"like the thread might have cored.");
|
|
s_errnos[t->m_pid] = 0;
|
|
goto again;
|
|
}
|
|
#endif
|
|
// skip if not done yet
|
|
if ( ! t->m_isDone ) continue;
|
|
|
|
#ifdef PTHREADS
|
|
|
|
|
|
// if pthread_create() failed it returns the errno and we
|
|
// needsJoin is false, so do not try to join
|
|
// to a thread if we did not create it, lest pthread_join()
|
|
// cores
|
|
if ( t->m_needsJoin ) {
|
|
// . join up with that thread
|
|
// . damn, sometimes he can block forever on his
|
|
// call to sigqueue(),
|
|
int64_t startTime = gettimeofdayInMillisecondsLocal();
|
|
int64_t took;
|
|
int32_t status = pthread_join ( t->m_joinTid , NULL );
|
|
took = startTime - gettimeofdayInMillisecondsLocal();
|
|
if ( took > 50 ) {
|
|
log("threads: pthread_join took %i ms",
|
|
(int)took);
|
|
}
|
|
|
|
if ( status != 0 ) {
|
|
log("threads: pthread_join %" INT64 " = %s (%" INT32 ")",
|
|
(int64_t)t->m_joinTid,mstrerror(status),
|
|
status);
|
|
}
|
|
// debug msg
|
|
if ( g_conf.m_logDebugThread )
|
|
log(LOG_DEBUG,"thread: joined1 with "
|
|
"t=0x%" PTRFMT " "
|
|
"jointid=0x%" XINT64 ".",
|
|
(PTRTYPE)t,(int64_t)t->m_joinTid);
|
|
|
|
// re-protect this stack
|
|
mprotect ( t->m_stack + GUARDSIZE ,
|
|
STACK_SIZE - GUARDSIZE,
|
|
PROT_NONE );
|
|
g_threads.returnStack ( t->m_si );
|
|
t->m_stack = NULL;
|
|
|
|
}
|
|
|
|
#else
|
|
|
|
again:
|
|
int status ;
|
|
pid_t pid = waitpid ( t->m_pid , &status , 0 );
|
|
int err = errno;
|
|
// debug the waitpid
|
|
if ( g_conf.m_logDebugThread || g_process.m_exiting )
|
|
log(LOG_DEBUG,"thread: Waiting for t=0x%" PTRFMT " "
|
|
"pid=%" INT32 ".",
|
|
(PTRTYPE)t,(int32_t)t->m_pid);
|
|
// bitch and continue if join failed
|
|
if ( pid != t->m_pid ) {
|
|
// waitpid() gets interrupted by various signals so
|
|
// we need to repeat (SIGCHLD?)
|
|
if ( err == EINTR ) goto again;
|
|
log("thread: Call to waitpid(%" INT32 ") returned %" INT32 ": %s.",
|
|
(int32_t)t->m_pid,(int32_t)pid,mstrerror(err));
|
|
continue;
|
|
}
|
|
// if status not 0 then process got abnormal termination
|
|
if ( ! WIFEXITED(status) ) {
|
|
if ( WIFSIGNALED(status) )
|
|
log("thread: Child process (pid=%i) exited "
|
|
"from unhandled signal number %" INT32 ".",
|
|
pid,(int32_t)WTERMSIG(status));
|
|
else
|
|
log("thread: Child process (pid=%i) exited "
|
|
"for unknown reason." , pid );
|
|
}
|
|
//mfree ( t->m_stack , STACK_SIZE , "Threads" );
|
|
// re-protect this stack
|
|
mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE,
|
|
PROT_NONE );
|
|
g_threads.returnStack ( t->m_si );
|
|
t->m_stack = NULL;
|
|
// debug msg
|
|
if ( g_conf.m_logDebugThread )
|
|
log(LOG_DEBUG,"thread: joined with pid=%" INT32 " pid=%" INT32 ".",
|
|
(int32_t)t->m_pid,(int32_t)t->m_pid);
|
|
|
|
|
|
#endif
|
|
|
|
// we may get cleaned up and re-used and our niceness reassignd
|
|
// right after set m_isDone to true, so save niceness
|
|
//int32_t niceness = t->m_niceness;
|
|
char qnum = t->m_qnum;
|
|
ThreadQueue *tq = &g_threads.m_threadQueues[(int)qnum];
|
|
if ( tq != this ) { char *xx = NULL; *xx = 0; }
|
|
|
|
// get read size before cleaning it up -- it could get nuked
|
|
//int32_t rs = 0;
|
|
bool isWrite = false;
|
|
if ( tq->m_threadType == DISK_THREAD ) { // && t->m_state ) {
|
|
//FileState *fs = (FileState *)t->m_state;
|
|
//rs = t->m_bytesToGo;
|
|
isWrite = t->m_doWrite ;
|
|
}
|
|
|
|
/*
|
|
if ( niceness <= 0) tq->m_hiReturned++;
|
|
else if ( niceness == 1) tq->m_mdReturned++;
|
|
else if ( niceness >= 2) tq->m_loReturned++;
|
|
// deal with the tiers for disk threads based on read sizes
|
|
if ( tq->m_threadType == DISK_THREAD ) {
|
|
// writes are special cases
|
|
if ( isWrite ) m_writesReturned++;
|
|
if ( rs >= 0 && niceness >= 2 ) {
|
|
if ( rs > g_conf.m_medReadSize )
|
|
tq->m_loReturnedBig++;
|
|
else if ( rs > g_conf.m_smaReadSize )
|
|
tq->m_loReturnedMed++;
|
|
else
|
|
tq->m_loReturnedSma++;
|
|
}
|
|
else if ( rs >= 0 && niceness >= 1 ) {
|
|
if ( rs > g_conf.m_medReadSize )
|
|
tq->m_mdReturnedBig++;
|
|
else if ( rs > g_conf.m_smaReadSize )
|
|
tq->m_mdReturnedMed++;
|
|
else
|
|
tq->m_mdReturnedSma++;
|
|
}
|
|
else if ( rs >= 0 ) {
|
|
if ( rs > g_conf.m_medReadSize )
|
|
tq->m_hiReturnedBig++;
|
|
else if ( rs > g_conf.m_smaReadSize )
|
|
tq->m_hiReturnedMed++;
|
|
else
|
|
tq->m_hiReturnedSma++;
|
|
}
|
|
}
|
|
*/
|
|
|
|
// now count him as returned
|
|
m_returned++;
|
|
|
|
// prepare for relaunch if we were cancelled
|
|
if ( t->m_isCancelled ) {
|
|
t->m_isCancelled = false;
|
|
t->m_isLaunched = false;
|
|
t->m_isDone = false;
|
|
// take out of launched linked list
|
|
removeLink ( &m_launchedHead , t );
|
|
// get the waiting linked list from whence we came
|
|
ThreadEntry **bestHeadPtr = t->m_bestHeadPtr;
|
|
ThreadEntry **bestTailPtr = t->m_bestTailPtr;
|
|
// put BACK into the waiting linked list at the head
|
|
addLinkToHead ( bestHeadPtr , bestTailPtr , t );
|
|
log("thread: Thread cancelled. Preparing thread "
|
|
"for relaunch");
|
|
continue;
|
|
}
|
|
numCallbacks++;
|
|
|
|
// not running any more
|
|
t->m_isLaunched = false;
|
|
// not occupied any more
|
|
t->m_isOccupied = false;
|
|
// do we have to decrement top
|
|
// if ( m_top == i + 1 )
|
|
// while (m_top > 0 && ! m_entries[m_top-1].m_isOccupied)
|
|
// m_top--;
|
|
|
|
if ( g_conf.m_logDebugThread )
|
|
log(LOG_DEBUG,"thread: [t=0x%" PTRFMT "] "
|
|
"remove from launched list, add to empty list",
|
|
(PTRTYPE)t);
|
|
|
|
// now move it to the empty list
|
|
removeLink ( &m_launchedHead , t );
|
|
addLink ( &m_emptyHead , t );
|
|
// send a cancel sig to the thread in case it's still there
|
|
//int err = pthread_cancel ( t->m_tid );
|
|
//if ( err != 0 ) log("thread: cleanUp: pthread_cancel: %s",
|
|
// mstrerror(err) );
|
|
// one less entry occupied
|
|
//m_entriesUsed--;
|
|
// debug msg
|
|
//log("m_entriesUsed now %" INT32 "",m_entriesUsed);
|
|
// one more returned
|
|
//m_returned++;
|
|
// clear the g_errno in case set by a previous callback
|
|
//g_errno = 0;
|
|
// launch as many threads as we can before calling the
|
|
// callback since this may hog the CPU like Msg20 does
|
|
//g_threads.launchThreads();
|
|
|
|
|
|
g_errno = 0;
|
|
//g_loop.startBlockedCpuTimer();
|
|
//only allow a quickpoll if we are nice.
|
|
//g_loop.canQuickPoll(t->m_niceness);
|
|
|
|
makeCallback ( t );
|
|
|
|
//int64_t took = gettimeofdayInMilliseconds()-startTime;
|
|
//if(took > 8 && maxNiceness > 0) {
|
|
// if(g_conf.m_sequentialProfiling)
|
|
// log(LOG_TIMING,
|
|
// "admin: Threads spent %" INT64 " ms to callback "
|
|
// "%" INT32 " callbacks, nice: %" INT32 "",
|
|
// took, numCallbacks, maxNiceness);
|
|
// g_threads.m_needBottom = true;
|
|
// maxNiceness = 0;
|
|
//}
|
|
|
|
// clear errno again
|
|
g_errno = 0;
|
|
|
|
if ( g_conf.m_logDebugThread ) {
|
|
int64_t now = gettimeofdayInMilliseconds();
|
|
log(LOG_DEBUG,"thread: [t=0x%" PTRFMT "] %s done1. "
|
|
"active=%" INT32 " "
|
|
"time since queued = %" UINT64 " ms "
|
|
"time since launch = %" UINT64 " ms "
|
|
"time since pre-exit = %" UINT64 " ms "
|
|
"time since exit = %" UINT64 " ms",
|
|
(PTRTYPE)t,
|
|
getThreadType() ,
|
|
(int32_t)(m_launched - m_returned) ,
|
|
(uint64_t)(now - t->m_queuedTime),
|
|
(uint64_t)(now - t->m_launchedTime),
|
|
(uint64_t)(now - t->m_preExitTime) ,
|
|
(uint64_t)(now - t->m_exitTime) );
|
|
}
|
|
|
|
}
|
|
|
|
//since we need finer grained control in loop, we no longer collect
|
|
//the callbacks, sort, then call them. we now call them right away
|
|
//that way we can break out if we start taking too long and
|
|
//give control back to udpserver.
|
|
return numCallbacks != 0;
|
|
}
|
|
|
|
// . calls a finished thread's callback with g_niceness set from the entry
//   (1 if its niceness is >= 1, else 0) and restores the caller's
//   g_niceness afterwards
// . if g_conf.m_maxCallbackDelay >= 0, logs callbacks that take at least
//   that many milliseconds
// . only called from sleep wrappers originating in Loop.cpp, so there is
//   no check against the current g_niceness
// . the cleanup loops in this file put 't' on the empty list before calling
//   this, so the callback may re-claim and overwrite 't' via addEntry();
//   everything needed after the call is read beforehand
void makeCallback ( ThreadEntry *t ) {

	// the caller's niceness, restored at the end
	int32_t saved = g_niceness;
	// the callback's niceness, read before 't' can be reused
	int32_t niceness = (int32_t)t->m_niceness;

	if ( g_conf.m_logDebugLoop || g_conf.m_logDebugThread )
		log(LOG_DEBUG,"thread: enter thread callback t=0x%" PTRFMT " "
		    "state=0x%" PTRFMT " "
		    "nice=%" INT32 "",
		    (PTRTYPE)t,
		    (PTRTYPE)t->m_state,
		    niceness);

	// decide once whether to time it: the callback may change
	// g_conf.m_maxCallbackDelay, and 'start' must never be read unset
	bool timeIt = ( g_conf.m_maxCallbackDelay >= 0 );
	int64_t start = 0;
	if ( timeIt ) start = gettimeofdayInMillisecondsLocal();

	if ( niceness >= 1 ) g_niceness = 1;
	else                 g_niceness = 0;

	t->m_callback ( t->m_state , t );

	if ( timeIt ) {
		int64_t elapsed = gettimeofdayInMillisecondsLocal() - start;
		if ( elapsed >= g_conf.m_maxCallbackDelay )
			log("threads: Took %" INT64 " ms to call "
			    "thread callback niceness=%" INT32 "",
			    elapsed,niceness);
	}

	if ( g_conf.m_logDebugLoop || g_conf.m_logDebugThread )
		log(LOG_DEBUG,"loop: exit thread callback t=0x%" PTRFMT " "
		    "nice=%" INT32 "",
		    (PTRTYPE)t,
		    niceness);

	// restore global niceness
	g_niceness = saved;
}
|
|
|
|
// . runs ThreadQueue::cleanUp() on every queue, repeating the sweep for as
//   long as m_needsCleanup gets set again while we work
// . a thread sets m_needsCleanup when it is about to exit, and waitpid() may
//   be interrupted by a SIGCHLD and miss a pid, hence the repeat
// . returns true if any queue's cleanUp() returned true
bool Threads::cleanUp ( ThreadEntry *t , int32_t maxNiceness ) {
	bool didSomething = false;
	do {
		// assume no more cleanup is needed after this sweep
		m_needsCleanup = false;
		for ( int32_t q = 0 ; q < m_numQueues ; q++ ) {
			if ( m_threadQueues[q].cleanUp ( t , maxNiceness ) )
				didSomething = true;
		}
	} while ( m_needsCleanup );
	return didSomething;
}
|
|
|
|
// . cleans up any threads that have exited
|
|
// . their m_isDone should be set to true
|
|
// . don't process threads whose niceness is > maxNiceness
|
|
bool ThreadQueue::cleanUp ( ThreadEntry *tt , int32_t maxNiceness ) {
|
|
// call all callbacks after all threads are cleaned up
|
|
void (* callbacks[64])(void *state,ThreadEntry *);
|
|
void *states [64];
|
|
int64_t times [64];
|
|
int64_t times2 [64];
|
|
int64_t times3 [64];
|
|
int64_t times4 [64];
|
|
ThreadEntry *tids [64];
|
|
int64_t startTime = gettimeofdayInMilliseconds();
|
|
|
|
// top:
|
|
ThreadEntry *t = m_launchedHead;
|
|
ThreadEntry *nextLink = NULL;
|
|
int32_t numCallbacks = 0;
|
|
// loop through candidates
|
|
//for ( int32_t i = 0 ; i < m_top && numCallbacks < 64 ; i++ ) {
|
|
for ( ; t && numCallbacks < 64 ; t = nextLink ) {
|
|
// do it here in case we modify the linked list below
|
|
nextLink = t->m_nextLink;
|
|
// point to it
|
|
//ThreadEntry *t = &m_entries[i];
|
|
// skip if not qualified
|
|
if ( t->m_niceness > maxNiceness ) {
|
|
//if(t->m_isDone) {
|
|
// g_threads.m_needBottom = true;
|
|
// //g_threads.m_needsCleanup = true;
|
|
//}
|
|
continue;
|
|
}
|
|
// must be occupied to be done (sanity check)
|
|
if ( ! t->m_isOccupied ) continue;
|
|
// skip if not launched yet
|
|
if ( ! t->m_isLaunched ) continue;
|
|
// . we were segfaulting right here before because the thread
|
|
// was setting t->m_pid and at this point it was not
|
|
// set so t->m_pid was a bogus value
|
|
// . thread may have seg faulted, in which case sigbadhandler()
|
|
// in Loop.cpp will get it and set errno to 0x7fffffff
|
|
#ifndef PTHREADS
|
|
// MDW: i hafta take this out because the errno_location thing
|
|
// is not working on the newer gcc
|
|
if ( ! t->m_isDone && t->m_pid >= 0 &&
|
|
s_errnos [t->m_pid] == 0x7fffffff ) {
|
|
log("thread: Got abnormal thread termination. Seems "
|
|
"like the thread might have cored.");
|
|
s_errnos[t->m_pid] = 0;
|
|
goto again;
|
|
}
|
|
#endif
|
|
// skip if not done yet
|
|
if ( ! t->m_isDone ) continue;
|
|
|
|
|
|
#ifdef PTHREADS
|
|
|
|
if ( t->m_needsJoin ) {
|
|
// . join up with that thread
|
|
// . damn, sometimes he can block forever on his
|
|
// call to sigqueue(),
|
|
int32_t status = pthread_join ( t->m_joinTid , NULL );
|
|
if ( status != 0 ) {
|
|
log("threads: "
|
|
"pthread_join2 %" INT64 " = %s (%" INT32 ")",
|
|
(int64_t)t->m_joinTid,mstrerror(status),
|
|
status);
|
|
}
|
|
// debug msg
|
|
if ( g_conf.m_logDebugThread )
|
|
log(LOG_DEBUG,"thread: joined2 with "
|
|
"t=0x%" PTRFMT " "
|
|
"jointid=0x%" XINT64 ".",
|
|
(PTRTYPE)t,(int64_t)t->m_joinTid);
|
|
|
|
// re-protect this stack
|
|
mprotect ( t->m_stack + GUARDSIZE ,
|
|
STACK_SIZE - GUARDSIZE,
|
|
PROT_NONE );
|
|
g_threads.returnStack ( t->m_si );
|
|
t->m_stack = NULL;
|
|
|
|
}
|
|
#else
|
|
|
|
again:
|
|
int status ;
|
|
pid_t pid = waitpid ( t->m_pid , &status , 0 );
|
|
int err = errno;
|
|
// debug the waitpid
|
|
if ( g_conf.m_logDebugThread )
|
|
log(LOG_DEBUG,"thread: Waiting for "
|
|
"t=0x%" PTRFMT " pid=%" INT32 ".",
|
|
(PTRTYPE)t,(int32_t)t->m_pid);
|
|
// bitch and continue if join failed
|
|
if ( pid != t->m_pid ) {
|
|
// waitpid() gets interrupted by various signals so
|
|
// we need to repeat (SIGCHLD?)
|
|
if ( err == EINTR ) goto again;
|
|
log("thread: Call to waitpid(%" INT32 ") returned %" INT32 ": %s.",
|
|
(int32_t)t->m_pid,(int32_t)pid,mstrerror(err));
|
|
continue;
|
|
}
|
|
// if status not 0 then process got abnormal termination
|
|
if ( ! WIFEXITED(status) ) {
|
|
if ( WIFSIGNALED(status) )
|
|
log("thread: Child process (pid=%i) exited "
|
|
"from unhandled signal number %" INT32 ".",
|
|
pid,(int32_t)WTERMSIG(status));
|
|
else
|
|
log("thread: Child process (pid=%i) exited "
|
|
"for unknown reason." , pid );
|
|
}
|
|
//mfree ( t->m_stack , STACK_SIZE , "Threads" );
|
|
// re-protect this stack
|
|
mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE,
|
|
PROT_NONE );
|
|
g_threads.returnStack ( t->m_si );
|
|
t->m_stack = NULL;
|
|
|
|
#endif
|
|
|
|
// debug msg
|
|
if ( g_conf.m_logDebugThread )
|
|
log(LOG_DEBUG,"thread: joined with pid=%" INT32 " pid=%" INT32 ".",
|
|
(int32_t)t->m_pid,(int32_t)t->m_pid);
|
|
|
|
|
|
// we may get cleaned up and re-used and our niceness reassignd
|
|
// right after set m_isDone to true, so save niceness
|
|
//int32_t niceness = t->m_niceness;
|
|
char qnum = t->m_qnum;
|
|
ThreadQueue *tq = &g_threads.m_threadQueues[(int)qnum];
|
|
if ( tq != this ) { char *xx = NULL; *xx = 0; }
|
|
|
|
/*
|
|
// get read size before cleaning it up -- it could get nuked
|
|
int32_t rs = 0;
|
|
bool isWrite = false;
|
|
if ( tq->m_threadType == DISK_THREAD ) { // && t->m_state ) {
|
|
//FileState *fs = (FileState *)t->m_state;
|
|
rs = t->m_bytesToGo;
|
|
isWrite = t->m_doWrite ;
|
|
}
|
|
|
|
if ( niceness <= 0) tq->m_hiReturned++;
|
|
else if ( niceness == 1) tq->m_mdReturned++;
|
|
else if ( niceness >= 2) tq->m_loReturned++;
|
|
// deal with the tiers for disk threads based on read sizes
|
|
if ( tq->m_threadType == DISK_THREAD ) {
|
|
// writes are special cases
|
|
if ( isWrite ) m_writesReturned++;
|
|
if ( rs >= 0 && niceness >= 2 ) {
|
|
if ( rs > g_conf.m_medReadSize )
|
|
tq->m_loReturnedBig++;
|
|
else if ( rs > g_conf.m_smaReadSize )
|
|
tq->m_loReturnedMed++;
|
|
else
|
|
tq->m_loReturnedSma++;
|
|
}
|
|
else if ( rs >= 0 && niceness >= 1 ) {
|
|
if ( rs > g_conf.m_medReadSize )
|
|
tq->m_mdReturnedBig++;
|
|
else if ( rs > g_conf.m_smaReadSize )
|
|
tq->m_mdReturnedMed++;
|
|
else
|
|
tq->m_mdReturnedSma++;
|
|
}
|
|
else if ( rs >= 0 ) {
|
|
if ( rs > g_conf.m_medReadSize )
|
|
tq->m_hiReturnedBig++;
|
|
else if ( rs > g_conf.m_smaReadSize )
|
|
tq->m_hiReturnedMed++;
|
|
else
|
|
tq->m_hiReturnedSma++;
|
|
}
|
|
}
|
|
*/
|
|
|
|
// . we should count down here, not in the master thread
|
|
// . solves Problem #2 ?
|
|
// . TODO: this is not necessaruly atomic we should set
|
|
// t->m_aboutToExit to true so cleanUp can periodically set
|
|
// m_returned to what it should be!!!
|
|
//g_threads.m_threadQueues[qnum].m_returned++;
|
|
|
|
// now count him as returned
|
|
m_returned++;
|
|
|
|
// prepare for relaunch if we were cancelled
|
|
if ( t->m_isCancelled ) {
|
|
t->m_isCancelled = false;
|
|
t->m_isLaunched = false;
|
|
t->m_isDone = false;
|
|
// take out of the linked list of launched threads
|
|
removeLink ( &m_launchedHead , t );
|
|
// what waiting linked list did it come from?
|
|
// get the waiting linked list from whence we came
|
|
ThreadEntry **bestHeadPtr = t->m_bestHeadPtr;
|
|
ThreadEntry **bestTailPtr = t->m_bestTailPtr;
|
|
// and put it back at the head
|
|
addLinkToHead ( bestHeadPtr , bestTailPtr , t );
|
|
log("thread: Thread cancelled. Preparing thread "
|
|
"for relaunch");
|
|
continue;
|
|
}
|
|
// debug msg
|
|
//log("[%" UINT32 "] CLEANING UP THREAD type=%" INT32 ", numLaunched=%" INT32 "",
|
|
// m_entries[i].m_tid , m_threadType , m_launched );
|
|
// remove it
|
|
// debug msg
|
|
//log("CLN TID=%" UINT32 " t=%" UINT32 "",(int32_t)t->m_tid , (int32_t)t);
|
|
//log("thread callback for tid=%" UINT32 "",(int32_t)t->m_tid );
|
|
// . save important stuff before freeing up the ThreadEntry
|
|
// for possible take over.
|
|
// . calling the callback may launch a thread which may
|
|
// claim THIS thread entry, t
|
|
//void (* callback)(void *state);
|
|
//callback = t->m_callback;
|
|
//void *state = t->m_state;
|
|
|
|
callbacks [ numCallbacks ] = t->m_callback;
|
|
states [ numCallbacks ] = t->m_state;
|
|
times [ numCallbacks ] = t->m_queuedTime;
|
|
times2 [ numCallbacks ] = t->m_launchedTime;
|
|
times3 [ numCallbacks ] = t->m_preExitTime;
|
|
times4 [ numCallbacks ] = t->m_exitTime;
|
|
tids [ numCallbacks ] = t;
|
|
numCallbacks++;
|
|
|
|
// SOLUTION: before calling the callback which may launch
|
|
// another thread with this same tid, thus causing an error,
|
|
// we should set these to false first:
|
|
// not running any more
|
|
t->m_isLaunched = false;
|
|
// not occupied any more
|
|
t->m_isOccupied = false;
|
|
// do we have to decrement top
|
|
//if ( m_top == i + 1 )
|
|
// while (m_top > 0 && ! m_entries[m_top-1].m_isOccupied)
|
|
// m_top--;
|
|
|
|
if ( g_conf.m_logDebugThread )
|
|
log(LOG_DEBUG,"thread: [t=0x%" PTRFMT "] "
|
|
"remove from launched list, add to empty list",
|
|
(PTRTYPE)t);
|
|
|
|
// now move it to the empty list
|
|
removeLink ( &m_launchedHead , t );
|
|
addLink ( &m_emptyHead , t );
|
|
// send a cancel sig to the thread in case it's still there
|
|
//int err = pthread_cancel ( t->m_tid );
|
|
//if ( err != 0 ) log("thread: cleanUp: pthread_cancel: %s",
|
|
// mstrerror(err) );
|
|
// one less entry occupied
|
|
//m_entriesUsed--;
|
|
// debug msg
|
|
//log("m_entriesUsed now %" INT32 "",m_entriesUsed);
|
|
// one more returned
|
|
//m_returned++;
|
|
// clear the g_errno in case set by a previous callback
|
|
//g_errno = 0;
|
|
// launch as many threads as we can before calling the
|
|
// callback since this may hog the CPU like Msg20 does
|
|
//g_threads.launchThreads();
|
|
|
|
|
|
g_errno = 0;
|
|
|
|
makeCallback ( t );
|
|
|
|
// int64_t took = gettimeofdayInMilliseconds()-startTime;
|
|
// if(took > 8 && maxNiceness > 0) {
|
|
// if(g_conf.m_sequentialProfiling)
|
|
// log(LOG_TIMING,
|
|
// "admin: Threads spent %" INT64 " ms to callback "
|
|
// "%" INT32 " callbacks, nice: %" INT32 "",
|
|
// took, numCallbacks, maxNiceness);
|
|
// g_threads.m_needBottom = true;
|
|
// maxNiceness = 0;
|
|
// }
|
|
|
|
// clear errno again
|
|
g_errno = 0;
|
|
|
|
if ( g_conf.m_logDebugThread ) {
|
|
int64_t now = gettimeofdayInMilliseconds();
|
|
log(LOG_DEBUG,"thread: [t=0x%" PTRFMT "] %s done2. "
|
|
"active=%" INT32 " "
|
|
"time since queued = %" UINT64 " ms "
|
|
"time since launch = %" UINT64 " ms "
|
|
"time since pre-exit = %" UINT64 " ms "
|
|
"time since exit = %" UINT64 " ms",
|
|
(PTRTYPE)t,
|
|
getThreadType() ,
|
|
(int32_t)(m_launched - m_returned) ,
|
|
(uint64_t)(now - t->m_queuedTime),
|
|
(uint64_t)(now - t->m_launchedTime),
|
|
(uint64_t)(now - t->m_preExitTime) ,
|
|
(uint64_t)(now - t->m_exitTime) );
|
|
}
|
|
|
|
|
|
|
|
|
|
// calling thread callback
|
|
//log("calling thread id %" INT32 " callback", (int32_t)(t->m_tid));
|
|
// first call it's callback
|
|
//callback ( state );
|
|
// clear after just in case
|
|
//g_errno = 0;
|
|
// debug msg
|
|
//log("CLN2 TID=%" UINT32 " t=%" INT32 "",(int32_t)t->m_tid ,(int32_t)t);
|
|
// return now if tt was specified
|
|
//if ( tt ) return;
|
|
}
|
|
|
|
int64_t took2 = gettimeofdayInMilliseconds()-startTime;
|
|
if(numCallbacks > 0 && took2 > 5)
|
|
log(LOG_DEBUG, "threads: took %" INT64 " ms to callback %" INT32 " "
|
|
"callbacks, nice: %" INT32 "", took2, numCallbacks, maxNiceness);
|
|
|
|
//since we need finer grained control in loop, we no longer collect
|
|
//the callbacks, sort, then call them. we now call them right away
|
|
//that way we can break out if we start taking too long and
|
|
//give control back to udpserver.
|
|
return numCallbacks != 0;
|
|
|
|
/*
|
|
// print out that we got them
|
|
if ( g_conf.m_logDebugThread ) {
|
|
int64_t now = gettimeofdayInMilliseconds();
|
|
for ( int32_t i = 0 ; i < numCallbacks ; i++ )
|
|
log(LOG_DEBUG,"thread: [tid=%" PTRFMT "] %s done3. "
|
|
"active=%" INT32 " "
|
|
"time since queued = %" UINT64 " ms "
|
|
"time since launch = %" UINT64 " ms "
|
|
"time since pre-exit = %" UINT64 " ms "
|
|
"time since exit = %" UINT64 " ms",
|
|
(PTRTYPE)tids[i],
|
|
getThreadType() ,
|
|
(int32_t)(m_launched - m_returned) ,
|
|
(uint64_t)(now - times [i]),
|
|
(uint64_t)(now - times2[i]) ,
|
|
(uint64_t)(now - times3[i]) ,
|
|
(uint64_t)(now - times4[i]) );
|
|
}
|
|
|
|
// . before calling callbacks, launch any other threads waiting in
|
|
// this queue
|
|
// . TODO: break into parts, cleanup, launch, call callbacks
|
|
//while ( launchThread() );
|
|
|
|
// . sort callbacks by queued time
|
|
// . do bubble sort cuz usually it's not too many threads we cleaned up
|
|
bool flag ;
|
|
void (* tmpCallback)(void *state,ThreadEntry *t);
|
|
int64_t tmpTime;
|
|
void *tmpState;
|
|
int64_t now = gettimeofdayInMilliseconds();
|
|
|
|
bubble:
|
|
flag = false;
|
|
for ( int32_t i = 1 ; i < numCallbacks ; i++ ) {
|
|
if ( times[i] >= times[i-1] ) continue;
|
|
tmpTime = times [i ];
|
|
tmpState = states [i ];
|
|
tmpCallback = callbacks[i ];
|
|
times [i ] = times [i-1];
|
|
states [i ] = states [i-1];
|
|
tids [i ] = tids [i-1];
|
|
callbacks [i ] = callbacks[i-1];
|
|
times [i-1] = tmpTime;
|
|
states [i-1] = tmpState;
|
|
callbacks [i-1] = tmpCallback;
|
|
flag = true;
|
|
}
|
|
if ( flag ) goto bubble;
|
|
|
|
// call the callbacks now in order of oldest queued time first
|
|
for ( int32_t i = 0 ; i < numCallbacks ; i++ ) {
|
|
g_errno = 0;
|
|
callbacks[i] ( states[i] , NULL );
|
|
}
|
|
|
|
int64_t took = gettimeofdayInMilliseconds()-now;
|
|
if(numCallbacks > 0 && took > 5)
|
|
log(LOG_TIMING, "admin: took %" INT64 " ms to callback %" INT32 " "
|
|
"callbacks, nice: %" INT32 "", took, numCallbacks, maxNiceness);
|
|
|
|
#if 0 //I think this is more efficient, for now we'll just use the old way
|
|
//theres no reason to sort these, lets just find the top one
|
|
//and call it. do it again it until we've called all of them.
|
|
int32_t newNumCallbacks = numCallbacks;
|
|
for ( int32_t i = 0 ; i < numCallbacks ; i++ ) {
|
|
int64_t maxTime = 0;
|
|
int32_t maxNdx = 0;
|
|
for ( int32_t j = 0 ; j < newNumCallbacks ; j++ ) {
|
|
if(maxTime >= times[i]) {
|
|
maxTime = times[i];
|
|
maxNdx = i;
|
|
}
|
|
}
|
|
g_errno = 0;
|
|
callbacks[maxNdx] ( states[maxNdx] );
|
|
//copy last one into called slot and decrement
|
|
newNumCallbacks--;
|
|
times [maxNdx] = times [newNumCallbacks];
|
|
states [maxNdx] = states [newNumCallbacks];
|
|
callbacks [maxNdx] = callbacks[newNumCallbacks];
|
|
}
|
|
#endif
|
|
g_errno = 0;
|
|
|
|
// close more threads if we were limited by callbacks[] size
|
|
if ( numCallbacks >= 64 ) goto top;
|
|
|
|
//return true if we called something back;
|
|
//returns wrong value if numCallbacks was exactly 64, not too serious...
|
|
return numCallbacks != 0;
|
|
*/
|
|
}
|
|
|
|
// used by UdpServer to see if it should call a low priority callback
|
|
bool Threads::hasHighPriorityCpuThreads() {
|
|
ThreadQueue *q ;
|
|
//int32_t hiActive = 0 ;
|
|
ThreadEntry *t ;
|
|
q = &g_threads.m_threadQueues[INTERSECT_THREAD];
|
|
t = q->m_launchedHead;
|
|
for ( ; t ; t = t->m_nextLink )
|
|
if ( t->m_niceness == 0 ) return true;
|
|
//hiActive += q->m_hiLaunched - q->m_hiReturned;
|
|
q = &g_threads.m_threadQueues[MERGE_THREAD];
|
|
t = q->m_launchedHead;
|
|
for ( ; t ; t = t->m_nextLink )
|
|
if ( t->m_niceness == 0 ) return true;
|
|
//hiActive += q->m_hiLaunched - q->m_hiReturned;
|
|
return false;
|
|
}
|
|
|
|
// used by UdpServer to see if it should call a low priority callback
|
|
int32_t Threads::getNumActiveHighPriorityThreads() {
|
|
ThreadQueue *q ;
|
|
ThreadEntry *t;
|
|
int32_t hiActive = 0 ;
|
|
q = &g_threads.m_threadQueues[DISK_THREAD];
|
|
t = q->m_launchedHead;
|
|
for ( ; t ; t = t->m_nextLink )
|
|
if ( t->m_niceness == 0 ) hiActive++;
|
|
//hiActive += q->m_hiLaunched - q->m_hiReturned;
|
|
q = &g_threads.m_threadQueues[INTERSECT_THREAD];
|
|
t = q->m_launchedHead;
|
|
for ( ; t ; t = t->m_nextLink )
|
|
if ( t->m_niceness == 0 ) hiActive++;
|
|
//hiActive += q->m_hiLaunched - q->m_hiReturned;
|
|
q = &g_threads.m_threadQueues[MERGE_THREAD];
|
|
t = q->m_launchedHead;
|
|
for ( ; t ; t = t->m_nextLink )
|
|
if ( t->m_niceness == 0 ) hiActive++;
|
|
//hiActive += q->m_hiLaunched - q->m_hiReturned;
|
|
return hiActive;
|
|
}
|
|
|
|
// . returns false if no thread launched
|
|
// . returns true if thread was launched
|
|
// . sets g_errno on error
|
|
// . don't launch a low priority thread if a high priority thread is running
|
|
// . i.e. don't launch a high niceness thread if a low niceness is running
|
|
bool ThreadQueue::launchThread2 ( ) {
|
|
|
|
// or if no stacks left, don't even try
|
|
if ( s_head == -1 ) return false;
|
|
// . how many threads are active now?
|
|
// . NOTE: not perfectly thread safe here
|
|
int64_t active = m_launched - m_returned ;
|
|
// debug msg
|
|
if ( g_conf.m_logDebugThread && m_threadType == DISK_THREAD )
|
|
log(LOG_DEBUG,"thread: q=%s launchThread: active=%" INT64 " "
|
|
"max=%" INT32 ".",getThreadType(), active,m_maxLaunched);
|
|
// return if the max is already launched for this thread queue
|
|
if ( active >= m_maxLaunched ) return false;
|
|
|
|
|
|
if ( m_threadType != DISK_THREAD ) {
|
|
// if one thread of this type is already out, forget it
|
|
// then we can't have 100 GENERIC THREADS!!! with this...
|
|
//if ( m_launchedHead ) return false;
|
|
// first try niceness 0 queue
|
|
ThreadEntry **bestHeadPtr = &m_waitHead0;
|
|
ThreadEntry **bestTailPtr = &m_waitTail0;
|
|
// if empty, try niceness 1
|
|
if ( ! *bestHeadPtr ) {
|
|
bestHeadPtr = &m_waitHead1;
|
|
bestTailPtr = &m_waitTail1;
|
|
}
|
|
// then niceness 2
|
|
if ( ! *bestHeadPtr ) {
|
|
bestHeadPtr = &m_waitHead2;
|
|
bestTailPtr = &m_waitTail2;
|
|
}
|
|
// if bother empty, that was easy
|
|
if ( ! *bestHeadPtr ) return false;
|
|
// do not launch a low priority merge, intersect or filter
|
|
// thread if we/ have high priority cpu threads already going
|
|
// on. this way a low priority spider thread will not launch
|
|
// if a high priority cpu-based thread of any kind (right now
|
|
// just MERGE or INTERSECT) is already running.
|
|
if ( (*bestHeadPtr)->m_niceness > 0 &&
|
|
g_threads.hasHighPriorityCpuThreads ( ) )
|
|
return false;
|
|
// otherwise launch the next one
|
|
// MERGE_THREAD, INTERSECT, FILTER, UNLINK, SAVETREE
|
|
return launchThreadForReals ( bestHeadPtr , bestTailPtr );
|
|
}
|
|
|
|
// disk thread is much more complicated
|
|
// use several queues
|
|
|
|
// debug msg
|
|
//log("trying to launch for type=%" INT32 "",(int32_t)m_threadType);
|
|
// clean up any threads that have exited
|
|
//cleanUp ();
|
|
|
|
// if no entries then nothing to launch
|
|
//if ( m_top <= 0 ) return false;
|
|
|
|
|
|
|
|
// return log("MAX. %" INT32 " are launched. %" INT32 " now in queue.",
|
|
// active , m_entriesUsed );
|
|
// . sanity check
|
|
// . a thread can NOT call this
|
|
//if ( getpid() != s_pid ) {
|
|
// fprintf(stderr,"thread: launchThread: bad engineer\n");
|
|
// ::exit(-1);
|
|
//}
|
|
//int64_t now = gettimeofdayInMilliseconds();
|
|
/*
|
|
int64_t now = -1LL;
|
|
// pick thread with lowest niceness first
|
|
int32_t minNiceness = 0x7fffffff;
|
|
int64_t maxWait = -1;
|
|
//int32_t mini = -1;
|
|
//bool minIsWrite = false;
|
|
int32_t lowest = 0x7fffffff;
|
|
int32_t highest = 0;
|
|
// . now base our active thread counts on niceness AND read sizes
|
|
// . this is only used for DISK_THREADs
|
|
// . loActive* includes niceness >= 1
|
|
int32_t loActiveBig = m_loLaunchedBig - m_loReturnedBig;
|
|
int32_t loActiveMed = m_loLaunchedMed - m_loReturnedMed;
|
|
int32_t loActiveSma = m_loLaunchedSma - m_loReturnedSma;
|
|
int32_t mdActiveBig = m_mdLaunchedBig - m_mdReturnedBig;
|
|
int32_t mdActiveMed = m_mdLaunchedMed - m_mdReturnedMed;
|
|
int32_t mdActiveSma = m_mdLaunchedSma - m_mdReturnedSma;
|
|
int32_t hiActiveBig = m_hiLaunchedBig - m_hiReturnedBig;
|
|
int32_t hiActiveMed = m_hiLaunchedMed - m_hiReturnedMed;
|
|
int32_t hiActiveSma = m_hiLaunchedSma - m_hiReturnedSma;
|
|
int32_t activeWrites = m_writesLaunched - m_writesReturned;
|
|
// how many niceness=2 threads are currently running now?
|
|
int64_t loActive = m_loLaunched - m_loReturned;
|
|
int64_t mdActive = m_mdLaunched - m_mdReturned;
|
|
//int64_t hiActive = m_hiLaunched - m_hiReturned;
|
|
int32_t total = loActive + mdActive;
|
|
*/
|
|
// hi priority max
|
|
// JAB: warning abatement
|
|
//int64_t hiActive = m_hiLaunched - m_hiReturned;
|
|
|
|
// get lowest niceness level of launched threads
|
|
ThreadEntry *t = m_launchedHead;
|
|
bool hasLowNicenessOut = false;
|
|
int32_t activeCount = 0;
|
|
int32_t spiderCount = 0;
|
|
// int32_t launchedBig0 = 0 , launchedMed0 = 0 , launchedSma0 = 0;
|
|
// int32_t launchedBig1 = 0 , launchedMed1 = 0 , launchedSma1 = 0;
|
|
for ( ; t ; t = t->m_nextLink ) {
|
|
// if he's done, skip him. maybe he hasn't been cleaned up yet
|
|
if ( t->m_isDone ) continue;
|
|
// count active out
|
|
activeCount++;
|
|
// get the highest niceness for all that are launched
|
|
//if ( niceness > highest ) highest = niceness;
|
|
//int32_t rs = t->m_bytesToGo;
|
|
// is the launched thread niceness 0? (i.e. high priority)
|
|
if ( t->m_niceness == 0 ) {
|
|
// set a flag
|
|
hasLowNicenessOut = true;
|
|
// count read sizes
|
|
// if (rs > g_conf.m_medReadSize ) launchedBig0++;
|
|
// else if (rs > g_conf.m_smaReadSize ) launchedMed0++;
|
|
// else launchedSma0++;
|
|
}
|
|
else {
|
|
spiderCount++;
|
|
}
|
|
// if ( rs > g_conf.m_medReadSize ) launchedBig1++;
|
|
// else if ( rs > g_conf.m_smaReadSize ) launchedMed1++;
|
|
// else launchedSma1++;
|
|
}
|
|
|
|
// int32_t max = g_conf.m_spiderMaxDiskThreads;
|
|
// if ( max <= 0 ) max = 1;
|
|
// if ( activeCount >= max )
|
|
// return false;
|
|
|
|
// get best thread candidate from best linked list of candidates
|
|
ThreadEntry **bestHeadPtr = NULL;
|
|
ThreadEntry **bestTailPtr = NULL;
|
|
// short/med/long high priority (niceness 0) disk reads in head0/1/2
|
|
// but we can't launch one more if already at our quota.
|
|
if ( ! bestHeadPtr && m_waitHead0 ) {
|
|
//launchedSma0 < g_conf.m_queryMaxSmaDiskThreads ) {
|
|
bestHeadPtr = &m_waitHead0;
|
|
bestTailPtr = &m_waitTail0;
|
|
}
|
|
// if ( ! bestHeadPtr &&
|
|
// m_waitHead1 &&
|
|
// launchedMed0 < g_conf.m_queryMaxMedDiskThreads ) {
|
|
// bestHeadPtr = &m_waitHead1;
|
|
// bestTailPtr = &m_waitTail1;
|
|
// }
|
|
// if ( ! bestHeadPtr &&
|
|
// m_waitHead2 &&
|
|
// launchedBig0 < g_conf.m_queryMaxBigDiskThreads ) {
|
|
// bestHeadPtr = &m_waitHead2;
|
|
// bestTailPtr = &m_waitTail2;
|
|
// }
|
|
|
|
// if we have a niceness 0 disk read/write outstandind and we are
|
|
// 1 or 2, do not launch! we do not want low priority disk reads
|
|
// having to contend with high priority ones.
|
|
// now we only do this if the 'separate disk reads' parms is true.
|
|
if ( g_conf.m_separateDiskReads && hasLowNicenessOut && ! bestHeadPtr )
|
|
return false;
|
|
|
|
// threads to save conf and tree/bucket files to disk go in waitHead3
|
|
// no these use SAVETREE Thread type
|
|
// if ( ! bestHead ) {
|
|
// bestHead = m_waitHead3;
|
|
// bestTail = m_waitTail3;
|
|
// }
|
|
|
|
// do not allow too high niceness read threads out
|
|
if ( spiderCount >= g_conf.m_spiderMaxDiskThreads )
|
|
return false;
|
|
|
|
// low priority (merge or dump) disk WRITES go in waithead4
|
|
if ( ! bestHeadPtr && m_waitHead4 ) {
|
|
bestHeadPtr = &m_waitHead4;
|
|
bestTailPtr = &m_waitTail4;
|
|
}
|
|
|
|
// niceness 1. for merge reads so they superscede regular spider reads
|
|
|
|
// niceness 1 read threads:
|
|
if ( ! bestHeadPtr && m_waitHead5 ) {
|
|
bestHeadPtr = &m_waitHead5;
|
|
bestTailPtr = &m_waitTail5;
|
|
}
|
|
|
|
// niceness 2 read threads:
|
|
if ( ! bestHeadPtr && m_waitHead6 ) {
|
|
bestHeadPtr = &m_waitHead6;
|
|
bestTailPtr = &m_waitTail6;
|
|
}
|
|
|
|
// if nobody waiting, return false
|
|
if ( ! bestHeadPtr ) return false;
|
|
|
|
// i dunno what the point of this was... so i commented it out
|
|
//int32_t max2 = g_conf.m_queryMaxDiskThreads ;
|
|
//if ( max2 <= 0 ) max2 = 1;
|
|
// only do this check if we're a addlists/instersect thread queue
|
|
//if (m_threadType == INTERSECT_THREAD&& hiActive >= max2)return false;
|
|
|
|
// point to entry in the best linked list to launch from
|
|
return launchThreadForReals ( bestHeadPtr , bestTailPtr );
|
|
|
|
/*
|
|
// loop through candidates
|
|
for ( int32_t i = 0 ; i < m_top ; i++ ) {
|
|
// skip if not occupied
|
|
if ( ! m_entries[i].m_isOccupied ) continue;
|
|
int32_t niceness = m_entries[i].m_niceness;
|
|
// get lowest niceness level of launched threads
|
|
if ( m_entries[i].m_isLaunched ) {
|
|
// if he's done, skip him
|
|
if ( m_entries[i].m_isDone ) continue;
|
|
// get the highest niceness for all that are launched
|
|
if ( niceness > highest ) highest = niceness;
|
|
// get the lowest niceness for all that are launched
|
|
if ( niceness < lowest ) lowest = niceness;
|
|
// continue now since it's already launched
|
|
continue;
|
|
}
|
|
|
|
// . these filters really make it so the spider does not
|
|
// impact query response time
|
|
|
|
//if ( niceness >= 1 && hiActive > 0 ) continue;
|
|
|
|
// don't consider any lows if one already running
|
|
//if ( niceness >= 2 && loActive > 0 ) continue;
|
|
// don't consider any lows if a hi already running
|
|
//if ( niceness >= 2 && hiActive > 0 ) continue;
|
|
// don't consider any mediums if one already running
|
|
//if ( niceness == 1 && mdActive > 0 ) continue;
|
|
// don't consider any mediums if a hi already running
|
|
//if ( niceness == 1 && hiActive > 0 ) continue;
|
|
|
|
//if ( m_threadType == DISK_THREAD ) {
|
|
// if ( niceness >= 1 && hiActive > 0 ) continue;
|
|
// if ( niceness >= 2 && loActive >= max ) continue;
|
|
// if ( niceness == 1 && mdActive >= max ) continue;
|
|
//}
|
|
|
|
// treat niceness 1 as niceness 2 for ranking purposes
|
|
// IFF we're not too backlogged with file merges. i.e.
|
|
// IFF we're merging faster than we're dumping.
|
|
// only merges and dumps have niceness 1 really.
|
|
// spider disk reads are all niceness 2.
|
|
// Now Rdb::addList just refuses to add data if we have too
|
|
// many unmerged files on disk!
|
|
// now we use niceness 1 for "real merges" so those reads take
|
|
// priority over spider build reads. (8/14/12)
|
|
//if(niceness == 1 ) niceness = 2;
|
|
|
|
// if he doesn't beat or tie us, skip him
|
|
if ( niceness > minNiceness ) continue;
|
|
|
|
// no more than "max" medium and low priority threads should
|
|
// be active/launched at any one time
|
|
if ( niceness >= 1 && total >= max ) continue;
|
|
|
|
// int16_tcut
|
|
ThreadEntry *t = &m_entries[i];
|
|
|
|
// what is this guy's read size?
|
|
// the filestate provided could have been
|
|
//FileState *fs ;
|
|
int32_t readSize = 0 ;
|
|
bool isWrite = false;
|
|
if ( m_threadType == DISK_THREAD ){//&&m_entries[i].m_state ) {
|
|
//fs = (FileState *)m_entries[i].m_state;
|
|
readSize = t->m_bytesToGo;
|
|
isWrite = t->m_doWrite ;
|
|
}
|
|
if ( isWrite && activeWrites > g_conf.m_maxWriteThreads )
|
|
continue;
|
|
|
|
if ( m_threadType == MERGE_THREAD ||
|
|
m_threadType == INTERSECT_THREAD ||
|
|
m_threadType == FILTER_THREAD )
|
|
if ( niceness > 0 && hiActive2 > 0 )
|
|
continue;
|
|
|
|
// how many threads can be launched for this readSize/niceness?
|
|
if ( niceness >= 1 && m_threadType == DISK_THREAD ) {
|
|
if ( readSize > g_conf.m_medReadSize ) {
|
|
if ( loActiveBig + mdActiveBig >=
|
|
g_conf.m_spiderMaxBigDiskThreads )
|
|
continue;
|
|
}
|
|
else if ( readSize > g_conf.m_smaReadSize ) {
|
|
if ( loActiveMed + mdActiveMed >=
|
|
g_conf.m_spiderMaxMedDiskThreads )
|
|
continue;
|
|
}
|
|
else if ( loActiveSma + mdActiveSma >=
|
|
g_conf.m_spiderMaxSmaDiskThreads )
|
|
continue;
|
|
}
|
|
else if ( niceness < 1 && m_threadType == DISK_THREAD ) {
|
|
if ( readSize > g_conf.m_medReadSize ) {
|
|
if ( hiActiveBig >=
|
|
g_conf.m_queryMaxBigDiskThreads )
|
|
continue;
|
|
}
|
|
else if ( readSize > g_conf.m_smaReadSize ) {
|
|
if ( hiActiveMed >=
|
|
g_conf.m_queryMaxMedDiskThreads )
|
|
continue;
|
|
}
|
|
else if ( hiActiveSma >=
|
|
g_conf.m_queryMaxSmaDiskThreads )
|
|
continue;
|
|
}
|
|
// be lazy with this since it uses a significant amount of cpu
|
|
if ( now == -1LL ) now = gettimeofdayInMilliseconds();
|
|
// how long has this entry been waiting in the queue to launch?
|
|
int64_t waited = now - m_entries[i].m_queuedTime ;
|
|
// adjust "waited" if it originally had a niceness of 1
|
|
if ( m_entries[i].m_niceness >= 1 ) {
|
|
// save threads gain precedence
|
|
if ( m_threadType == SAVETREE_THREAD )
|
|
waited += 49999999;
|
|
else if ( m_threadType == UNLINK_THREAD )
|
|
waited += 39999999;
|
|
else if ( m_threadType == MERGE_THREAD )
|
|
waited += 29999999;
|
|
else if ( m_threadType == INTERSECT_THREAD )
|
|
waited += 29999999;
|
|
else if ( m_threadType == FILTER_THREAD )
|
|
waited += 9999999;
|
|
// if its a write thread... do it quick
|
|
else if ( isWrite )
|
|
waited += 19999999;
|
|
// . if it has waited more than 500 ms it needs
|
|
// to launch now...
|
|
// . these values seem to be VERY well tuned on
|
|
// my production machines, but they may have to
|
|
// be tuned for other machines? TODO.
|
|
// . they should auto-tune so when merge is more
|
|
// important it should starve the other spider reads
|
|
else if ( waited >= 500 )
|
|
waited += 9999999;
|
|
// it hurts for these guys to wait that int32_t
|
|
else waited *= 4;
|
|
}
|
|
// is real merge?
|
|
if ( m_entries[i].m_niceness == 1 )
|
|
waited += 19999999;
|
|
// watch out for clock skew
|
|
if ( waited < 0 ) waited = 0;
|
|
// if tied, the lowest time wins
|
|
if ( niceness == minNiceness && waited <= maxWait ) continue;
|
|
// we got a new winner
|
|
mini = i;
|
|
minNiceness = niceness;
|
|
maxWait = waited;
|
|
minIsWrite = isWrite;
|
|
}
|
|
// if no candidate, bail honorably
|
|
if ( mini == -1 ) return false;
|
|
// . if we've got an urgent thread (niceness=1) going on, do not allow
|
|
// niceness=2 threads to launch
|
|
// . actually, don't launch *ANY* if doing an urgent merge even if
|
|
// no niceness=1 thread currently launched
|
|
// . CAUTION! when titledb is merging it dumps tfndb and if tfndb
|
|
// goes urgent,make sure it dumps out with niceness 1, otherwise
|
|
// this will freeze us!! since titledb won't be able to continue
|
|
// merging until tfndb finishes dumping...
|
|
// . Now Rdb::addList just refuses to add data if we have too
|
|
// many unmerged files on disk! Let others still read from us,
|
|
// just not add to us...
|
|
//if ( minNiceness >= 2 && g_numUrgentMerges > 0 ) // && lowest <= 1 )
|
|
// return false;
|
|
// if we're urgent, don't launch a niceness 2 unless he's waited
|
|
// at least 200ms in the queue
|
|
//if ( minNiceness >= 2 && g_numUrgentMerges > 0 && maxWait < 200 )
|
|
// return false;
|
|
// . if the thread to launch has niceness > lowest launched then bail
|
|
// . i.e. don't launch a low-priority thread if we have highs running
|
|
// . we no longer let a niceness of 1 prevent a niceness of 2 from
|
|
// launching, this way we can launch merge threads at a niceness
|
|
// of 1 w/o hurting the spidering too much, but still giving the
|
|
// merge some preferential treatment over the disk so we don't
|
|
// RdbDump data faster than we can finish the merge.
|
|
if ( minNiceness > lowest && lowest < 1 ) return false;
|
|
// . don't launch a low priority thread if there's a high launched
|
|
// from any ThreadQueue
|
|
// . hi priority is niceness <= 0, med/lo priority is niceness >= 1
|
|
//int64_t hiActive = g_threads.m_hiLaunched - g_threads.m_hiReturned;
|
|
// now just limit it to this queue... so you can launch a low
|
|
// merge thread even if there's a high disk read going on
|
|
//if ( minNiceness >= 1 && hiActive > 0 ) return false;
|
|
// if we're low priority and suspended is true then bail
|
|
//if ( minNiceness > 0 && m_isLowPrioritySuspended ) return false;
|
|
// if we're going to launch a high priority thread then CANCEL
|
|
// any low priority disk-read threads already in progress
|
|
//if ( minNiceness <= 0 && highest > 0 ) cancelLowPriorityThreads ();
|
|
// . let's cancel ALL low priority threads if we're launching a high
|
|
// . hi priority is niceness <= 0, low priority is niceness >= 1
|
|
//int64_t loActive = g_threads.m_loLaunched - g_threads.m_loReturned;
|
|
int32_t realNiceness = m_entries[mini].m_niceness;
|
|
//if ( realNiceness <= 0 && (loActive + mdActive) > 0 )
|
|
// // this actually cancels medium priority threads, too
|
|
// g_threads.cancelLowPriorityThreads();
|
|
// point to winning entry
|
|
ThreadEntry *t = &m_entries[mini];
|
|
*/
|
|
}
|
|
|
|
bool ThreadQueue::launchThreadForReals ( ThreadEntry **headPtr ,
|
|
ThreadEntry **tailPtr ) {
|
|
|
|
ThreadEntry *t = *headPtr;
|
|
|
|
// if descriptor was closed, just return error now, we
|
|
// cannot try to re-open because the file might have been
|
|
// unlinked. Sync.cpp does a DISK_THREAD but does not pass in a valid
|
|
// FileState ptr because it does its own saving, so check for NULLs.
|
|
FileState *fs = (FileState *)t->m_state;
|
|
bool allocated = false;
|
|
if ( m_threadType == DISK_THREAD && fs && ! fs->m_doWrite ) {
|
|
// allocate the read buffer here!
|
|
if ( ! fs->m_doWrite && ! fs->m_buf && t->m_bytesToGo > 0 ) {
|
|
int32_t need = t->m_bytesToGo + fs->m_allocOff;
|
|
char *p = (char *) mmalloc ( need , "ThreadReadBuf" );
|
|
if ( p ) {
|
|
fs->m_buf = p + fs->m_allocOff;
|
|
fs->m_allocBuf = p;
|
|
fs->m_allocSize = need;
|
|
allocated = true;
|
|
}
|
|
else
|
|
log("thread: read buf alloc failed "
|
|
"for %" INT32 " bytes.",need);
|
|
// just let the BigFile::readWrite_r() handle the
|
|
// error for the NULL read buf
|
|
}
|
|
// . otherwise, they are intact, so get the real fds
|
|
// . we know the stored File is still around because of that
|
|
bool doWrite = fs->m_doWrite;
|
|
BigFile *bb = fs->m_this;
|
|
fs->m_fd1 = bb->getfd (fs->m_filenum1,!doWrite);//&fs->m_vfd1);
|
|
fs->m_fd2 = bb->getfd (fs->m_filenum2,!doWrite);//&fs->m_vfd2);
|
|
// is this bad?
|
|
if ( fs->m_fd1 < 0 ) log("disk: fd1 is %i for %s",
|
|
fs->m_fd1,bb->getFilename());
|
|
if ( fs->m_fd2 < 0 ) log("disk: fd2 is %i for %s.",
|
|
fs->m_fd2,bb->getFilename());
|
|
fs->m_closeCount1 = getCloseCount_r ( fs->m_fd1 );
|
|
fs->m_closeCount2 = getCloseCount_r ( fs->m_fd2 );
|
|
}
|
|
|
|
// count it as launched now, before we actually launch it
|
|
m_launched++;
|
|
/*
|
|
// priority-based GLOBAL & LOCAL launch count
|
|
if ( realNiceness <= 0 ) m_hiLaunched++;
|
|
else if ( realNiceness == 1 ) m_mdLaunched++;
|
|
else if ( realNiceness >= 2 ) m_loLaunched++;
|
|
// deal with the tiers for disk threads based on read sizes
|
|
if ( m_threadType == DISK_THREAD ) {
|
|
// writes are special cases
|
|
if ( t->m_doWrite ) m_writesLaunched++;
|
|
//FileState *fs = (FileState *)m_entries[mini].m_state;
|
|
int32_t rs = t->m_bytesToGo; // 0;
|
|
//if ( fs ) rs = fs->m_bytesToGo;
|
|
if ( realNiceness >= 2 ) {
|
|
if ( rs > g_conf.m_medReadSize )
|
|
m_loLaunchedBig++;
|
|
else if ( rs > g_conf.m_smaReadSize )
|
|
m_loLaunchedMed++;
|
|
else
|
|
m_loLaunchedSma++;
|
|
}
|
|
else if ( realNiceness >= 1 ) {
|
|
if ( rs > g_conf.m_medReadSize )
|
|
m_mdLaunchedBig++;
|
|
else if ( rs > g_conf.m_smaReadSize )
|
|
m_mdLaunchedMed++;
|
|
else
|
|
m_mdLaunchedSma++;
|
|
}
|
|
else {
|
|
if ( rs > g_conf.m_medReadSize )
|
|
m_hiLaunchedBig++;
|
|
else if ( rs > g_conf.m_smaReadSize )
|
|
m_hiLaunchedMed++;
|
|
else
|
|
m_hiLaunchedSma++;
|
|
}
|
|
}
|
|
*/
|
|
// debug msg
|
|
//if ( m_threadType == 0 )
|
|
// log("creating thread, t=%" UINT32 " state=%" UINT32 "
|
|
//launched = %" INT32 "",
|
|
// t , (int32_t)t->m_state , m_launched );
|
|
// and set the flag
|
|
t->m_isLaunched = true;
|
|
// . launch it
|
|
// . this sets the pthread_t ptr for identificatoin
|
|
// . returns false on error
|
|
//pthread_t tmp;
|
|
loop:
|
|
// debug msg
|
|
if ( g_conf.m_logDebugThread ) {
|
|
int32_t active = m_launched - m_returned ;
|
|
int64_t now = gettimeofdayInMilliseconds();
|
|
log(LOG_DEBUG,"thread: [t=0x%" PTRFMT "] launched %s thread. "
|
|
"active=%" INT32 " "
|
|
"niceness=%" INT32 ". waited %" UINT64 " ms in queue.",
|
|
(PTRTYPE)t, getThreadType(), active, t->m_niceness ,
|
|
now - t->m_queuedTime);
|
|
}
|
|
// be lazy with this since it uses a significant amount of cpu
|
|
//if ( now == -1LL ) now = gettimeofdayInMilliseconds();
|
|
int64_t now = gettimeofdayInMilliseconds();
|
|
//t->m_launchedTime = g_now;
|
|
t->m_launchedTime = now;
|
|
// loop2:
|
|
// spawn the thread
|
|
int32_t count = 0;
|
|
pid_t pid;
|
|
|
|
#ifndef PTHREADS
|
|
|
|
//int status;
|
|
//int ret;
|
|
// random failure test
|
|
//if ( rand() %10 == 1 ) { err = ENOMEM; goto hadError; }
|
|
// malloc twice the size
|
|
t->m_stackSize = STACK_SIZE;
|
|
//t->m_stack = (char *)mmalloc ( t->m_stackSize , "Threads" );
|
|
int32_t si = g_threads.getStack ( );
|
|
if ( si < 0 ) {
|
|
log(LOG_LOGIC,"thread: Unable to get stack. Bad engineer.");
|
|
goto hadError;
|
|
}
|
|
t->m_si = si;
|
|
t->m_stack = s_stackPtrs [ si ];
|
|
// UNprotect the whole stack so we can use it
|
|
mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE ,
|
|
PROT_READ | PROT_WRITE );
|
|
// clear g_errno
|
|
g_errno = 0;
|
|
// . make another process
|
|
// . do not use sig handlers, so if a child process gets any unhandled
|
|
// signal (like SEGV) it will just exit
|
|
pid = clone ( startUp , t->m_stack + t->m_stackSize ,
|
|
CLONE_FS | CLONE_FILES | CLONE_VM | //CLONE_SIGHAND |
|
|
SIGCHLD ,
|
|
(void *)t );
|
|
// . we set the pid because we are the one that checks it!
|
|
// . if we just let him do it, when we check in cleanup routine
|
|
// we can get an uninitialized pid
|
|
t->m_pid = pid;
|
|
// might as well bitch if we should here
|
|
if ( s_bad ) {
|
|
log(LOG_LOGIC,"thread: PID received: %" INT32 " > %" INT32 ". Bad.",
|
|
s_badPid, (int32_t)MAX_PID);
|
|
//char *xx = NULL; *xx = 0;
|
|
}
|
|
// wait for him
|
|
//ret = waitpid ( -1*pid , &status , 0 );
|
|
//if ( ret != pid )
|
|
// log("waitpid(pid=%" INT32 "): ret=%" INT32 " err=%s",
|
|
// (int32_t)pid,(int32_t)ret,mstrerror(errno));
|
|
// check if he's done
|
|
//if ( ! t->m_isDone ) log("NOT DONE");
|
|
// set the pid
|
|
//t->m_pid = pid;
|
|
// error?
|
|
if ( pid == (pid_t)-1 ) g_errno = errno;
|
|
|
|
//
|
|
// now use pthreads again... are they stable yet?
|
|
//
|
|
#else
|
|
|
|
// assume it does not go through
|
|
t->m_needsJoin = false;
|
|
|
|
// pthread inherits our sigmask, so don't let it handle sigalrm
|
|
// signals in Loop.cpp, it'll screw things up. that handler
|
|
// is only meant to be called by the main process. if we end up
|
|
// double calling it, this thread may think g_callback is non-null
|
|
// then it gets set to NULL, then the thread cores! seen it...
|
|
// sigset_t sigs;
|
|
// sigemptyset ( &sigs );
|
|
// sigaddset ( &sigs , SIGALRM );
|
|
// sigaddset ( &sigs , SIGVTALRM );
|
|
// if ( sigprocmask ( SIG_BLOCK , &sigs , NULL ) < 0 )
|
|
// log("threads: failed to block sig");
|
|
|
|
|
|
// supply our own stack to make pthread_create() fast otherwise
|
|
// it has slowness issues with mmap()
|
|
// http://www.gossamer-threads.com/lists/linux/kernel/960227
|
|
t->m_stackSize = STACK_SIZE;
|
|
//t->m_stack = (char *)mmalloc ( t->m_stackSize , "Threads" );
|
|
int32_t si = g_threads.getStack ( );
|
|
if ( si < 0 ) {
|
|
log(LOG_LOGIC,"thread: Unable to get stack. Bad engineer.");
|
|
goto hadError;
|
|
}
|
|
t->m_si = si;
|
|
t->m_stack = s_stackPtrs [ si ];
|
|
// check it's aligned
|
|
if ( (uint64_t)(t->m_stack) & (THRPAGESIZE-1) ) {
|
|
char *xx=NULL;*xx=0; }
|
|
// UNprotect the whole stack so we can use it
|
|
mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE ,
|
|
PROT_READ | PROT_WRITE );
|
|
|
|
pthread_attr_t attr;
|
|
pthread_attr_init ( &attr );
|
|
pthread_attr_setstack ( &attr , t->m_stack , t->m_stackSize );
|
|
|
|
if ( g_conf.m_logDebugThread )
|
|
log(LOG_DEBUG,"thread: [t=0x%" PTRFMT "] "
|
|
"remove from wait list, add to launch list",
|
|
(PTRTYPE)t);
|
|
|
|
// remove from waiting linked list, whichever one it was in
|
|
removeLink2 ( headPtr , tailPtr , t );
|
|
// add to 'launched' linked list
|
|
addLink ( &m_launchedHead , t );
|
|
|
|
// save the waiting linked list we came from in case we get cancelled
|
|
// and we have to put it back into it
|
|
t->m_bestHeadPtr = headPtr;
|
|
t->m_bestTailPtr = tailPtr;
|
|
|
|
// debug
|
|
if ( g_conf.m_logDebugThread )
|
|
log("threads: pthread_create: "
|
|
"stack=%" PTRFMT " stacksize=%" INT64 ""
|
|
, (PTRTYPE)t->m_stack
|
|
, (int64_t)t->m_stackSize );
|
|
|
|
// this returns 0 on success, or the errno otherwise
|
|
g_errno = pthread_create ( &t->m_joinTid , &attr, startUp2 , t) ;
|
|
|
|
// if ( sigprocmask ( SIG_UNBLOCK , &sigs , NULL ) < 0 )
|
|
// log("threads: failed to unblock sig");
|
|
|
|
|
|
#endif
|
|
|
|
// we're back from pthread_create
|
|
if ( g_conf.m_logDebugThread )
|
|
log(LOG_DEBUG,"thread: Back from clone "
|
|
"t=0x%" PTRFMT " pid=%" INT32 ".",
|
|
(PTRTYPE)t,(int32_t)pid);
|
|
|
|
|
|
// return true on successful creation of the thread
|
|
if ( g_errno == 0 ) {
|
|
// good stuff, the thread needs a join now
|
|
t->m_needsJoin = true;
|
|
if ( count > 0 )
|
|
log("thread: Call to clone looped %" INT32 " times.",
|
|
count);
|
|
return true;
|
|
}
|
|
|
|
//#undef usleep
|
|
|
|
// forever loop
|
|
if ( g_errno == EAGAIN ) {
|
|
if ( count++ == 0 )
|
|
log("thread: Call to clone had error: %s.",
|
|
mstrerror(g_errno));
|
|
//usleep(1);
|
|
//goto loop2;
|
|
goto hadError;
|
|
}
|
|
// debug msg
|
|
// log("created tid=%" INT32 "",(int32_t)t->m_tid);
|
|
// return true;
|
|
//}
|
|
// do again if g_errno is AGAIN
|
|
if ( g_errno == EINTR ) {
|
|
log("thread: Call to clone was interrupted. Trying again.");
|
|
goto loop;
|
|
}
|
|
//#ifndef _PTHREADS_
|
|
hadError:
|
|
//#endif
|
|
|
|
if ( g_errno )
|
|
log("thread: pthread_create had error = %s",
|
|
mstrerror(g_errno));
|
|
|
|
// it didn't launch, did it? dec the count.
|
|
m_launched--;
|
|
// re-protect this stack
|
|
mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE,
|
|
PROT_NONE );
|
|
// RETURN THE STACK
|
|
g_threads.returnStack ( t->m_si );
|
|
t->m_stack = NULL;
|
|
|
|
/*
|
|
// priority-based LOCAL & GLOBAL launch counts
|
|
if ( realNiceness <= 0 ) m_hiLaunched--;
|
|
else if ( realNiceness == 1 ) m_mdLaunched--;
|
|
else if ( realNiceness >= 2 ) m_loLaunched--;
|
|
// . deal with the tiers for disk threads based on read sizes
|
|
// . WARNING: we cannot easily change tiers dynamically
|
|
// because it will throw these counts off
|
|
if ( m_threadType == DISK_THREAD ) {
|
|
// writes are special cases
|
|
if ( t->m_doWrite ) m_writesLaunched--;
|
|
//FileState *fs = (FileState *)m_entries[mini].m_state;
|
|
int32_t rs = t->m_bytesToGo; // 0;
|
|
//if ( fs ) rs = fs->m_bytesToGo;
|
|
if ( realNiceness >= 2 ) {
|
|
if ( rs > g_conf.m_medReadSize )
|
|
m_loLaunchedBig--;
|
|
else if ( rs > g_conf.m_smaReadSize )
|
|
m_loLaunchedMed--;
|
|
else
|
|
m_loLaunchedSma--;
|
|
}
|
|
else if ( realNiceness >= 1 ) {
|
|
if ( rs > g_conf.m_medReadSize )
|
|
m_mdLaunchedBig--;
|
|
else if ( rs > g_conf.m_smaReadSize )
|
|
m_mdLaunchedMed--;
|
|
else
|
|
m_mdLaunchedSma--;
|
|
}
|
|
else {
|
|
if ( rs > g_conf.m_medReadSize )
|
|
m_hiLaunchedBig--;
|
|
else if ( rs > g_conf.m_smaReadSize )
|
|
m_hiLaunchedMed--;
|
|
else
|
|
m_hiLaunchedSma--;
|
|
}
|
|
}
|
|
*/
|
|
// unset the flag
|
|
t->m_isLaunched = false;
|
|
// bail on other errors
|
|
log("thread: Call to clone had error: %s.", mstrerror(g_errno));
|
|
// correction on this error
|
|
log("thread: Try not using so much memory. "
|
|
"memused now =%" INT64 ".",g_mem.getUsedMem());
|
|
// free allocated buffer
|
|
if ( allocated ) {
|
|
mfree ( fs->m_allocBuf , fs->m_allocSize , "ThreadReadBuf" );
|
|
fs->m_buf = NULL;
|
|
}
|
|
|
|
if ( g_conf.m_logDebugThread )
|
|
log(LOG_DEBUG,"thread: [t=0x%" PTRFMT "] "
|
|
"remove from launched list, RE-add to wait list",
|
|
(PTRTYPE)t);
|
|
|
|
// remove from launched linked list
|
|
removeLink ( &m_launchedHead , t );
|
|
// back into the queue waiting to launch
|
|
addLinkToHead ( headPtr , tailPtr , t );
|
|
|
|
// i'm not sure return value matters at this point? the thread
|
|
// is queued and hopefully will launch at some point
|
|
return false;
|
|
|
|
/*
|
|
// if this is the direct thread request do not call callback, just
|
|
// return false, otherwise we get into an unexpected loop thingy
|
|
if ( t == te )
|
|
return log("thread: Returning false.");
|
|
// do it blocking
|
|
log("thread: Calling without thread. This will crash many times. "
|
|
"Please fix it.");
|
|
// return false so caller will re-do without thread!
|
|
// so BigFile::readwrite() will retry without thread and we won't
|
|
// get into a weird loop thingy
|
|
if ( te ) return false;
|
|
|
|
// uint64_t profilerStart,profilerEnd;
|
|
// uint64_t statStart,statEnd;
|
|
|
|
//if (g_conf.m_profilingEnabled){
|
|
// address=(int32_t)t->m_startRoutine;
|
|
// g_profiler.startTimer(address, __PRETTY_FUNCTION__);
|
|
//}
|
|
t->m_startRoutine ( t->m_state , t );
|
|
//if (g_conf.m_profilingEnabled) {
|
|
// if(!g_profiler.endTimer(address, __PRETTY_FUNCTION__))
|
|
// log(LOG_WARN,"admin: Couldn't add the fn %" INT32 "",
|
|
// (int32_t)address);
|
|
//}
|
|
t->m_exitTime = gettimeofdayInMilliseconds();
|
|
// flag it for cleanup
|
|
t->m_isDone = true;
|
|
t->m_isLaunched = true;
|
|
// clean it up
|
|
cleanUp ( t , 200);//maxNiceness thread can have to be cleaned up
|
|
// ignore error
|
|
g_errno = 0;
|
|
// we kinda launched one, so say true here
|
|
return true; // false;
|
|
*/
|
|
}
|
|
|
|
#ifndef PTHREADS
|
|
static bool s_firstTime = true;
|
|
#endif
|
|
|
|
// threads start up with cacnellation deferred until pthreads_testcancel()
|
|
// is called, but we never call that
|
|
int startUp ( void *state ) {
|
|
// get thread entry
|
|
ThreadEntry *t = (ThreadEntry *)state;
|
|
|
|
// no! now parent does since he is the one that needs to check it
|
|
// in the cleanup routine
|
|
// remember the pid
|
|
//t->m_pid = getpid();
|
|
// . sanity check
|
|
// . a thread can NOT call this
|
|
#ifndef PTHREADS
|
|
if ( getpid() == s_pid )
|
|
log("thread: Thread has same pid %i as main process.",s_pid);
|
|
#endif
|
|
// the cleanup handler
|
|
//pthread_cleanup_push ( exitWrapper , t ) ; // t->m_state );
|
|
// our signal set
|
|
sigset_t set;
|
|
sigemptyset(&set);
|
|
//sigaddset(&set, SIGHUP);
|
|
// we need this here so if we break the gb process with gdb it
|
|
// does not kill the child processes when it sends out the SIGINT.
|
|
sigaddset(&set, SIGINT);
|
|
// ignore the real time signal, man...
|
|
//sigaddset(&set, GB_SIGRTMIN);
|
|
//pthread_sigmask(SIG_BLOCK, &set, NULL);
|
|
#ifndef PTHREADS
|
|
sigprocmask(SIG_BLOCK, &set, NULL);
|
|
#else
|
|
// turn these off in the thread
|
|
sigaddset ( &set , SIGALRM );
|
|
sigaddset ( &set , SIGVTALRM );
|
|
pthread_sigmask(SIG_BLOCK,&set,NULL);
|
|
#endif
|
|
// . what this lwp's priority be?
|
|
// . can range from -20 to +20
|
|
// . the lower p, the more cpu time it gets
|
|
// . this is really the niceness, not the priority
|
|
int p ;
|
|
// currently our niceness ranges from -1 to 2 for us
|
|
if ( t->m_niceness == 2 ) p = 19 ;
|
|
else if ( t->m_niceness == 1 ) p = 10 ;
|
|
else p = 0 ;
|
|
// remember the tid
|
|
//t->m_tid = pthread_self();
|
|
// debug
|
|
if ( g_conf.m_logDebugThread )
|
|
log(LOG_DEBUG,"thread: [t=0x%" PTRFMT "] "
|
|
"in startup pid=%" INT64 " pppid=%" INT32 "",
|
|
(PTRTYPE)t,(int64_t)getpidtid(),(int32_t)getppid());
|
|
// debug msg
|
|
//fprintf(stderr,"new thread tid=%" INT32 " pid=%" INT32 "\n",
|
|
// (int32_t)t->m_tid,(int32_t)t->m_pid);
|
|
// . set this process's priority
|
|
// . setpriority() is only used for SCHED_OTHER threads
|
|
//if ( pthread_setschedprio ( getpidtid() , p ) ) {
|
|
#ifndef PTHREADS
|
|
if ( setpriority ( PRIO_PROCESS, getpidtid() , p ) < 0 ) {
|
|
// do we even support logging from a thread?
|
|
if ( s_firstTime ) {
|
|
log("thread: Call to "
|
|
"setpriority(%" UINT32 ",%i) in thread "
|
|
"failed: %s. This "
|
|
"message will not be repeated.",
|
|
(uint32_t)getpidtid(),p,mstrerror(errno));
|
|
s_firstTime = false;
|
|
}
|
|
errno = 0;
|
|
}
|
|
#endif
|
|
/*
|
|
sched_param sp;
|
|
int pp;
|
|
int err = pthread_getschedparam ( t->m_tid , &pp , &sp ) ;
|
|
if ( err ) log("thread: startUp: pthread_getschedparam: %s",
|
|
mstrerror(err));
|
|
// adjust the priority
|
|
p = 1;
|
|
sp.sched_priority = p;
|
|
// and reassign
|
|
err = pthread_setschedparam ( t->m_tid , SCHED_OTHER , &sp ) ;
|
|
log("thread: startUp: pthread_setschedparam(%" INT32 "): %s",
|
|
p,mstrerror(err));
|
|
*/
|
|
// somehow, it works ok when we have this print statement delay!!!
|
|
//fprintf(stderr,"thread pid = %" INT32 "\n",(int32_t)getpid());
|
|
// . call the startRoutine
|
|
// . IMPORTANT: this can NEVER do non-blocking stuff
|
|
t->m_startRoutine ( t->m_state , t );
|
|
// pop it off
|
|
//pthread_cleanup_pop ( 1 /*execute handler?*/ );
|
|
|
|
// . now throw it on g_loop's sigqueue
|
|
// . the first 4 bytes of t->m_state should be t->m_callback
|
|
// . no! just use 1 to tell Loop to call g_threads.cleanUp()
|
|
// . TODO: pass in a ptr to cleanUpWrapper() instead of "t"
|
|
// . sival_int is only 4 bytes on a 64 bit arch...
|
|
sigval_t svt;
|
|
svt.sival_int = 1;//(int64_t)t ; //(int)(t->m_state); // fd;
|
|
|
|
|
|
// set exit time
|
|
int64_t now = gettimeofdayInMilliseconds();
|
|
t->m_preExitTime = now;
|
|
t->m_exitTime = now;
|
|
if ( g_conf.m_logDebugThread ) {
|
|
|
|
log(LOG_DEBUG,"thread: [t=0x%" PTRFMT "] "
|
|
"done with startup pid=%" INT64 "",
|
|
(PTRTYPE)t,(int64_t)getpidtid());
|
|
}
|
|
|
|
// . now mark thread as ready for removal
|
|
// . do this BEFORE queueing the signal since we're still a thread!!!
|
|
// . cleanUp() will take care of the rest
|
|
// . cleanUp() will call pthread_join on us!
|
|
t->m_isDone = true;
|
|
|
|
// let Loop.cpp's sigHandler_r call g_thread.cleanUp()
|
|
g_threads.m_needsCleanup = true;
|
|
|
|
//if(t->m_niceness > 0) g_threads.m_needBottom = true;
|
|
|
|
// . send the signal
|
|
// . if queue is full g_loop will get a SIGIO and call
|
|
// g_threads.cleanUp()/launchThreads() in it's doPoll() routine
|
|
// . we reserve GB_SIGRTMIN itself for unblocked interrupts for
|
|
// UdpServer
|
|
// . damn, it seems that if the queue is full pthread_join is unable
|
|
// to detach threads.... so sleep until it clears up
|
|
// . HEY! process is supposed to send us an ECHLD signal? right?
|
|
//sigqueue ( s_pid, GB_SIGRTMIN + 1 + t->m_niceness, svt ) ;
|
|
|
|
// . it does not send us a signal automatically, so we must do it!
|
|
// . i noticed during the linkdb rebuild we were not getting the signal
|
|
//sigqueue ( s_pid, GB_SIGRTMIN + 1 + t->m_niceness, svt ) ;
|
|
|
|
// i verified this breaks select() in Loop.cpp out of it's sleep
|
|
//fprintf(stderr,"threads sending SIGCHLD\n");
|
|
|
|
// try a sigchld now! doesn't it already do this? no...
|
|
// on 64bit arch pthread_t is 64bit and pid_t is 32bit
|
|
// i dont think this makes sense with pthreads any more, they don't
|
|
// use pid_t they use pthread_t
|
|
#ifndef PTHREADS
|
|
sigqueue ( (pid_t)(int64_t)s_pid, SIGCHLD, svt ) ;
|
|
#else
|
|
pthread_kill ( s_pid , SIGCHLD );
|
|
#endif
|
|
|
|
|
|
return 0;
|
|
}
|
|
|
|
// pthread_create uses this one
|
|
void *startUp2 ( void *state ) {
|
|
startUp ( state );
|
|
return NULL;
|
|
}
|
|
|
|
// watch out, UdpServer::getEmptySlot() calls UdpServer::suspend() and we
|
|
// could be in a signal handler here
|
|
void ThreadQueue::suspendLowPriorityThreads() {
|
|
// disable for now
|
|
//return;
|
|
// just return if already suspended
|
|
if ( m_isLowPrioritySuspended ) return;
|
|
// log it
|
|
//log("ThreadQueue:: suspending low priority threads!!!!!!");
|
|
// set the flag so low priority threads won't be launched
|
|
m_isLowPrioritySuspended = true;
|
|
// . cancel any outstanding low priority threads that are running
|
|
// . no, only cancel if we run another disk thread
|
|
// . this will happen above in ThreadQueue::launchThread()
|
|
//cancelLowPriorityThreads();
|
|
}
|
|
|
|
// this is called by UdpServer::destroySlot() and should NOT be in a sig handlr
|
|
void ThreadQueue::resumeLowPriorityThreads() {
|
|
// disable for now
|
|
//return;
|
|
// bail if no need
|
|
if ( ! m_isLowPrioritySuspended ) return;
|
|
// turn em back on
|
|
m_isLowPrioritySuspended = false;
|
|
// just in case, though
|
|
//if ( g_inSigHandler ) {
|
|
// log(LOG_LOGIC,"thread: resumeLowPriorityThreads: In sig "
|
|
// "handler.");
|
|
// return;
|
|
//}
|
|
// try to start up some threads then
|
|
g_threads.launchThreads();
|
|
}
|
|
|
|
void ThreadQueue::print ( ) {
|
|
// loop through candidates
|
|
for ( int32_t i = 0 ; i < m_top ; i++ ) {
|
|
ThreadEntry *t = &m_entries[i];
|
|
// print it
|
|
log(LOG_INIT,"thread: address=%" PTRFMT " "
|
|
"pid=%u state=%" PTRFMT " "
|
|
"occ=%i done=%i lnch=%i",
|
|
(PTRTYPE)t , t->m_pid ,
|
|
(PTRTYPE)t->m_state , t->m_isOccupied , t->m_isDone ,
|
|
t->m_isLaunched );
|
|
}
|
|
}
|
|
|
|
const char *ThreadQueue::getThreadType ( ) {
|
|
const char *s = "unknown";
|
|
if ( m_threadType == DISK_THREAD ) s = "disk";
|
|
if ( m_threadType == MERGE_THREAD ) s = "merge";
|
|
if ( m_threadType == INTERSECT_THREAD ) s = "intersectlists";
|
|
if ( m_threadType == FILTER_THREAD ) s = "filter";
|
|
if ( m_threadType == SAVETREE_THREAD ) s = "savetree";
|
|
if ( m_threadType == UNLINK_THREAD ) s = "unlink";
|
|
if ( m_threadType == GENERIC_THREAD ) s = "generic";
|
|
return s;
|
|
}
|
|
|
|
#include "BigFile.h" // FileState class
|
|
/*
|
|
MDW: this is unused
|
|
int32_t Threads::getDiskThreadLoad ( int32_t maxNiceness , int32_t *totalToRead ) {
|
|
ThreadQueue *q = &m_threadQueues[DISK_THREAD];
|
|
ThreadEntry *e = q->m_entries;
|
|
int32_t top = q->m_top;
|
|
*totalToRead = 0;
|
|
int32_t n = 0;
|
|
// we really can't suspend threads cuz they might have the
|
|
// mutex lock so we just cancel the disk threads here then
|
|
for ( int32_t i = 0 ; i < top ; i++ ) {
|
|
// get entry
|
|
ThreadEntry *t = &e[i];
|
|
// skip if not occupied
|
|
if ( ! t->m_isOccupied ) continue;
|
|
// skip if it's nicer than what we want
|
|
if (t->m_niceness > maxNiceness && ! t->m_isLaunched) continue;
|
|
// skip if already done
|
|
if ( t->m_isDone ) continue;
|
|
// cast state data
|
|
FileState *fs = (FileState *) t->m_state;
|
|
// sometimes NULL, like from Sync.cpp's call
|
|
if ( ! fs ) continue;
|
|
// only remove read operations, since write operations get
|
|
// the fd up front
|
|
if ( t->m_doWrite ) continue;
|
|
// how many byte to do
|
|
//int32_t todo = fs->m_bytesToGo - fs->m_bytesDone;
|
|
int32_t todo = t->m_bytesToGo;
|
|
// multiply by 2 if a write
|
|
if ( t->m_doWrite ) todo *= 2;
|
|
// add to total bytes to read
|
|
*totalToRead += todo;
|
|
// count the thread
|
|
n++;
|
|
}
|
|
return n;
|
|
}
|
|
*/
|
|
|
|
// when a BigFile is removed, much like we remove its pages from DiskPageCache
|
|
// we also remove any unlaunched reads/writes on it from the thread queue.
|
|
void ThreadQueue::removeThreads ( BigFile *bf ) {
|
|
// did the BigFile get hosed? that means our BigFile was
|
|
// unlinked or closed before we got a chance to launch the
|
|
// thread.
|
|
//int32_t maxi = -1;
|
|
//ThreadEntry *head ;
|
|
removeThreads2 ( &m_waitHead0 , &m_waitTail0 , bf );
|
|
removeThreads2 ( &m_waitHead1 , &m_waitTail1 , bf );
|
|
removeThreads2 ( &m_waitHead2 , &m_waitTail2 , bf );
|
|
removeThreads2 ( &m_waitHead3 , &m_waitTail3 , bf );
|
|
removeThreads2 ( &m_waitHead4 , &m_waitTail4 , bf );
|
|
removeThreads2 ( &m_waitHead5 , &m_waitTail5 , bf );
|
|
removeThreads2 ( &m_waitHead6 , &m_waitTail6 , bf );
|
|
}
|
|
|
|
// . walk the wait list *headPtr/*tailPtr and deal with every entry whose
//   FileState belongs to "bf":
//   - writes are never touched (they may still need their buffer; freeing
//     it while exiting could core the thread)
//   - every matching read gets fs->m_errno2 = EFILECLOSED so doneWrapper()
//     can tell the file went away
//   - matching reads that are neither done nor launched are additionally
//     failed with fs->m_errno = g_errno = EFILECLOSED, unlinked from the
//     wait list, put on the empty list and their callback is invoked
// . g_errno is restored to its value on entry before returning
void ThreadQueue::removeThreads2 ( ThreadEntry **headPtr ,
				   ThreadEntry **tailPtr ,
				   BigFile *bf ) {
	// keep the caller's g_errno: XmlDoc::~XmlDoc() -> BigFile::~BigFile()
	// -> removeThreads() -> removeThreads2() used to lose it
	int32_t saved = g_errno;

	ThreadEntry *nextLink = NULL;
	for ( ThreadEntry *t = *headPtr ; t ; t = nextLink ) {
		// grab the successor now since t may be unlinked below
		nextLink = t->m_nextLink;

		FileState *fs = (FileState *)t->m_state;
		if ( ! fs ) continue;
		if ( fs->m_this != (void *)bf ) continue;

		// let writes finish; see header comment
		if ( fs->m_doWrite ) {
			log(LOG_INFO,"disk: Not removing write thread.");
			continue;
		}

		// . if the file was renamed, its read offsets are stale and
		//   the fd may have been reused by the renamed file, so even
		//   launched/done reads must fail up to Msg5
		// . at least set the error flag for doneWrapper()
		fs->m_errno2 = EFILECLOSED;

		logf(LOG_INFO,"disk: Removing/flagging operation in thread "
		     "queue. fs=0x%" PTRFMT "", (PTRTYPE)fs);

		// only unlaunched, unfinished ops can actually be removed
		if ( t->m_isDone ) continue;
		if ( t->m_isLaunched ) continue;

		// the two checks above guarantee it was NOT launched; this
		// message used to say "is launched", which was backwards
		log(LOG_INFO,"disk: Thread is not launched.");

		// tell donewrapper what happened
		fs->m_errno = EFILECLOSED;
		g_errno = EFILECLOSED;

		// retire the entry
		t->m_isDone = true;
		t->m_isLaunched = false;
		t->m_isOccupied = false;

		// move it from this wait list onto the 'empty' list
		removeLink2 ( headPtr , tailPtr , t );
		addLink ( &m_emptyHead , t );

		makeCallback ( t );
	}

	g_errno = saved;
}
|
|
|
|
|
|
void Threads::printState() {
|
|
int64_t now = gettimeofdayInMilliseconds();
|
|
|
|
|
|
for ( int32_t i = 0 ; i < m_numQueues; i++ ) {
|
|
ThreadQueue *q = &m_threadQueues[i];
|
|
|
|
// int32_t loActive = q->m_loLaunched - q->m_loReturned;
|
|
// int32_t mdActive = q->m_mdLaunched - q->m_mdReturned;
|
|
// int32_t hiActive = q->m_hiLaunched - q->m_hiReturned;
|
|
// int32_t total = loActive + mdActive + hiActive;
|
|
|
|
//if( total == 0) continue;
|
|
// log(LOG_TIMING,
|
|
// "admin: Thread counts: type:%s "
|
|
// "%" INT32 ":low %" INT32 ":med%" INT32 ":high %" INT32 ":total",
|
|
// q->getThreadType(),loActive, mdActive, hiActive, total);
|
|
|
|
for ( int32_t j = 0 ; j < q->m_top ; j++ ) {
|
|
ThreadEntry *t = &q->m_entries[j];
|
|
|
|
if(!t->m_isOccupied) continue;
|
|
if(t->m_isDone) {
|
|
log(LOG_TIMING,
|
|
"admin: Thread -done- "
|
|
"nice: %" INT32 " "
|
|
"totalTime: %" INT64 " (ms) "
|
|
"queuedTime: %" INT64 "(ms) "
|
|
"runTime: %" INT64 "(ms) "
|
|
"cleanup: %" INT64 "(ms) "
|
|
"callback:%s",
|
|
t->m_niceness,
|
|
now - t->m_queuedTime,
|
|
t->m_launchedTime - t->m_queuedTime,
|
|
t->m_exitTime - t->m_launchedTime,
|
|
now - t->m_exitTime,
|
|
g_profiler.
|
|
getFnName((PTRTYPE)t->m_callback));
|
|
continue;
|
|
}
|
|
if(t->m_isLaunched) {
|
|
log(LOG_TIMING,
|
|
"admin: Thread -launched- "
|
|
"nice: %" INT32 " "
|
|
"totalTime: %" INT64 "(ms) "
|
|
"queuedTime: %" INT64 "(ms) "
|
|
"runTime: %" INT64 "(ms) "
|
|
"callback:%s",
|
|
t->m_niceness,
|
|
now - t->m_queuedTime,
|
|
t->m_launchedTime - t->m_queuedTime,
|
|
now - t->m_launchedTime,
|
|
g_profiler.
|
|
getFnName((PTRTYPE)t->m_callback));
|
|
continue;
|
|
}
|
|
|
|
log(LOG_TIMING,
|
|
"admin: Thread -queued- "
|
|
"nice: %" INT32 " "
|
|
"queueTime: %" INT64 "(ms) "
|
|
"callback:%s",
|
|
t->m_niceness,
|
|
now - t->m_queuedTime,
|
|
g_profiler.getFnName((PTRTYPE)t->m_callback));
|
|
|
|
}
|
|
}
|
|
}
|
|
|
|
void ThreadQueue::killAllThreads ( ) {
|
|
for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) {
|
|
ThreadEntry *e = &m_entries[i];
|
|
if ( ! e->m_isOccupied ) continue;
|
|
if ( ! e->m_isLaunched ) continue;
|
|
log("threads: killling thread id %i",(int)e->m_joinTid);
|
|
pthread_kill ( e->m_joinTid , SIGKILL );
|
|
log("threads: joining with thread id %i",(int)e->m_joinTid);
|
|
pthread_join ( e->m_joinTid , NULL );
|
|
}
|
|
}
|
|
|
|
void Threads::killAllThreads ( ) {
|
|
log("threads: killing all threads");
|
|
for ( int32_t j = 0 ; j < m_numQueues ; j++ ) {
|
|
ThreadQueue *tq = &m_threadQueues[j];
|
|
tq->killAllThreads();
|
|
}
|
|
}
|