forked from Mirrors/privacore-open-source-search-engine
More Threads cleanup
This commit is contained in:
12
BigFile.cpp
12
BigFile.cpp
@ -842,7 +842,7 @@ bool BigFile::readwrite ( void *buf ,
|
||||
// . this returns false and sets g_errno on error, true on success
|
||||
// . we should return false cuz we blocked
|
||||
// . thread will add signal to g_loop on completion to call
|
||||
if ( g_threads.call ( DISK_THREAD/*threadType*/, niceness , fstate ,
|
||||
if ( g_threads.call ( THREAD_TYPE_DISK, niceness , fstate ,
|
||||
doneWrapper , readwriteWrapper_r) ) return false;
|
||||
|
||||
saved = g_errno;
|
||||
@ -1468,8 +1468,8 @@ bool readwrite_r ( FileState *fstate , ThreadEntry *t ) {
|
||||
if ( t && t->m_callback == ohcrap ) return false;
|
||||
|
||||
// only set this now if we are the first one
|
||||
// if ( g_threads.m_threadQueues[DISK_THREAD].m_hiReturned ==
|
||||
// g_threads.m_threadQueues[DISK_THREAD].m_hiLaunched )
|
||||
// if ( g_threads.m_threadQueues[THREAD_TYPE_DISK].m_hiReturned ==
|
||||
// g_threads.m_threadQueues[THREAD_TYPE_DISK].m_hiLaunched )
|
||||
// g_lastDiskReadStarted = fstate->m_startTime;
|
||||
|
||||
// fake it out
|
||||
@ -1874,7 +1874,7 @@ bool BigFile::unlinkRename ( // non-NULL for renames, NULL for unlinks
|
||||
// . returns true on successful spawning
|
||||
// . we can't make a disk thread cuz Threads.cpp checks its
|
||||
// FileState member for readSize for thread throttling
|
||||
if ( g_threads.call (UNLINK_THREAD/*threadType*/,1/*niceness*/,
|
||||
if ( g_threads.call (THREAD_TYPE_UNLINK,1/*niceness*/,
|
||||
f , doneRoutine , startRoutine ) )
|
||||
{
|
||||
if( g_conf.m_logTraceBigFile ) {
|
||||
@ -1921,7 +1921,7 @@ bool BigFile::unlinkRename ( // non-NULL for renames, NULL for unlinks
|
||||
//if ( m_pc ) m_pc->rmVfd ( m_vfd );
|
||||
// remove all queued threads that point to us that have not
|
||||
// yet been launched
|
||||
g_threads.m_threadQueues[DISK_THREAD].removeThreads(this);
|
||||
g_threads.m_threadQueues[THREAD_TYPE_DISK].removeThreads(this);
|
||||
}
|
||||
// close em up
|
||||
//close();
|
||||
@ -2284,7 +2284,7 @@ bool BigFile::close ( ) {
|
||||
|
||||
// remove all queued threads that point to us that have not
|
||||
// yet been launched
|
||||
g_threads.m_threadQueues[DISK_THREAD].removeThreads(this);
|
||||
g_threads.m_threadQueues[THREAD_TYPE_DISK].removeThreads(this);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -597,7 +597,7 @@ bool HashTableX::fastSave ( bool useThread ,
|
||||
// skip thread call if we should
|
||||
if ( ! useThread ) goto skip;
|
||||
// make this a thread now
|
||||
if ( g_threads.call ( SAVETREE_THREAD , // threadType
|
||||
if ( g_threads.call ( THREAD_TYPE_SAVETREE , // threadType
|
||||
1 , // niceness
|
||||
this , // top 4 bytes must be cback
|
||||
threadDoneWrapper ,
|
||||
|
@ -1552,7 +1552,7 @@ bool Hostdb::syncHost ( int32_t syncHostId, bool useSecondaryIps ) {
|
||||
m_syncSecondaryIps = useSecondaryIps;
|
||||
h->m_doingSync = 1;
|
||||
// start the sync in a thread, complete when it's done
|
||||
if ( g_threads.call ( GENERIC_THREAD ,
|
||||
if ( g_threads.call ( THREAD_TYPE_GENERIC ,
|
||||
MAX_NICENESS ,
|
||||
this ,
|
||||
syncDoneWrapper ,
|
||||
|
@ -849,7 +849,7 @@ bool Images::makeThumb ( ) {
|
||||
// reset this since filterStart_r() will set it on error
|
||||
m_errno = 0;
|
||||
// callThread returns true on success, in which case we block
|
||||
if ( g_threads.call ( FILTER_THREAD ,
|
||||
if ( g_threads.call ( THREAD_TYPE_FILTER,
|
||||
MAX_NICENESS ,
|
||||
this ,
|
||||
makeThumbWrapper ,
|
||||
|
@ -781,7 +781,7 @@ bool Msg39::intersectLists ( ) { // bool updateReadInfo ) {
|
||||
// . create the thread
|
||||
// . only one of these type of threads should be launched at a time
|
||||
if ( ! m_debug &&
|
||||
g_threads.call ( INTERSECT_THREAD , // threadType
|
||||
g_threads.call ( THREAD_TYPE_INTERSECT, // threadType
|
||||
m_r->m_niceness ,
|
||||
this , // top 4 bytes must be cback
|
||||
Msg39_controlLoopWrapper2,
|
||||
|
2
Msg5.cpp
2
Msg5.cpp
@ -1115,7 +1115,7 @@ bool Msg5::gotList2 ( ) {
|
||||
// . if size is big, make a thread
|
||||
// . let's always make niceness 0 since it wasn't being very
|
||||
// aggressive before
|
||||
if ( g_threads.call ( MERGE_THREAD , // threadType
|
||||
if ( g_threads.call ( THREAD_TYPE_MERGE , // threadType
|
||||
m_niceness , // m_niceness ,
|
||||
this , // state data for callback
|
||||
threadDoneWrapper ,
|
||||
|
@ -79,7 +79,7 @@ bool sendPageThreads ( TcpSocket *s , HttpRequest *r ) {
|
||||
|
||||
FileState *fs = (FileState *)t->m_state;
|
||||
bool diskThread = false;
|
||||
if(q->m_threadType == DISK_THREAD && fs)
|
||||
if(q->m_threadType == THREAD_TYPE_DISK && fs)
|
||||
diskThread = true;
|
||||
|
||||
// might have got pre-called from EDISKSTUCK
|
||||
@ -167,7 +167,7 @@ bool sendPageThreads ( TcpSocket *s , HttpRequest *r ) {
|
||||
p.safePrintf("</table><br><br>");
|
||||
|
||||
|
||||
if(q->m_threadType == DISK_THREAD) disk = q;
|
||||
if(q->m_threadType == THREAD_TYPE_DISK) disk = q;
|
||||
|
||||
}
|
||||
|
||||
|
12
Process.cpp
12
Process.cpp
@ -364,7 +364,7 @@ void hdtempWrapper ( int fd , void *state ) {
|
||||
g_process.m_threadOut = true;
|
||||
// . call thread to call popen
|
||||
// . callThread returns true on success, in which case we block
|
||||
if ( g_threads.call ( FILTER_THREAD ,
|
||||
if ( g_threads.call ( THREAD_TYPE_FILTER,
|
||||
MAX_NICENESS ,
|
||||
NULL , // this
|
||||
hdtempDoneWrapper ,
|
||||
@ -884,7 +884,6 @@ bool Process::shutdown2 ( ) {
|
||||
static bool s_printed = false;
|
||||
|
||||
// wait for all threads to return
|
||||
//int32_t n = g_threads.getNumThreadsOutOrQueued() ;
|
||||
int32_t n = g_threads.getNumWriteThreadsOut();
|
||||
if ( n != 0 && ! m_urgent ) {
|
||||
log(LOG_INFO,"gb: Has %"INT32" write threads out. Waiting for them to finish.", n);
|
||||
@ -1037,18 +1036,9 @@ bool Process::shutdown2 ( ) {
|
||||
}
|
||||
|
||||
|
||||
|
||||
// cleanup threads, this also launches them too
|
||||
g_threads.timedCleanUp(0x7fffffff,MAX_NICENESS);
|
||||
|
||||
// wait for all threads to complete...
|
||||
//int32_t n = g_threads.getNumThreadsOutOrQueued() ;
|
||||
//if ( n > 0 )
|
||||
// return log(LOG_INFO,
|
||||
// "gb: Waiting for %"INT32" threads to complete.",n);
|
||||
|
||||
//log(LOG_INFO,"gb: Has %"INT32" threads out.",n);
|
||||
|
||||
|
||||
//ok, resetAll will close httpServer's socket so now is the time to
|
||||
//call the callback.
|
||||
|
@ -1355,7 +1355,7 @@ bool RdbCache::save ( bool useThreads ) {
|
||||
m_isSaving = true;
|
||||
// make a thread. returns true on success, in which case
|
||||
// we return false to indicate we blocked.
|
||||
if ( g_threads.call ( SAVETREE_THREAD ,
|
||||
if ( g_threads.call ( THREAD_TYPE_SAVETREE ,
|
||||
1 , // niceness
|
||||
this , // state
|
||||
threadDoneWrapper , // callback
|
||||
|
@ -2441,7 +2441,7 @@ bool RdbTree::fastSave ( char *dir ,
|
||||
// skip thread call if we should
|
||||
if ( ! useThread ) goto skip;
|
||||
// make this a thread now
|
||||
if ( g_threads.call ( SAVETREE_THREAD , // threadType
|
||||
if ( g_threads.call ( THREAD_TYPE_SAVETREE , // threadType
|
||||
1 , // niceness
|
||||
this , // top 4 bytes must be cback
|
||||
threadDoneWrapper ,
|
||||
|
1272
Threads.cpp
1272
Threads.cpp
File diff suppressed because it is too large
Load Diff
92
Threads.h
92
Threads.h
@ -6,7 +6,6 @@
|
||||
#ifndef GB_THREADS_H
|
||||
#define GB_THREADS_H
|
||||
|
||||
#define MAX_THREAD_QUEUES 7
|
||||
|
||||
#include <sys/types.h> // pid_t
|
||||
|
||||
@ -16,33 +15,37 @@
|
||||
pthread_t getpidtid();
|
||||
|
||||
// user-defined thread types
|
||||
#define DISK_THREAD 0
|
||||
#define MERGE_THREAD 1
|
||||
#define INTERSECT_THREAD 2
|
||||
#define FILTER_THREAD 3
|
||||
#define SAVETREE_THREAD 4
|
||||
#define UNLINK_THREAD 5
|
||||
#define GENERIC_THREAD 6
|
||||
enum
|
||||
{
|
||||
THREAD_TYPE_DISK,
|
||||
THREAD_TYPE_MERGE,
|
||||
THREAD_TYPE_INTERSECT,
|
||||
THREAD_TYPE_FILTER,
|
||||
THREAD_TYPE_SAVETREE,
|
||||
THREAD_TYPE_UNLINK,
|
||||
THREAD_TYPE_GENERIC,
|
||||
THREAD_TYPE_MAX
|
||||
};
|
||||
|
||||
|
||||
#define MAX_NICENESS 2
|
||||
|
||||
// . a ThreadQueue has a list of thread entries
|
||||
// . each thread entry represents a thread in progress or waiting to be created
|
||||
class ThreadEntry {
|
||||
|
||||
public:
|
||||
public:
|
||||
int32_t m_niceness ;
|
||||
void (* m_callback)(void *state,class ThreadEntry *) ;
|
||||
void *m_state ;
|
||||
// returns a void * :
|
||||
void *(* m_startRoutine)(void *,class ThreadEntry *) ;
|
||||
void *(* m_startRoutine)(void *,class ThreadEntry *) ;
|
||||
pid_t m_pid ; // process id for waitpid()
|
||||
bool m_isOccupied ; // is thread waiting/going?
|
||||
bool m_isLaunched ; // has it been launched?
|
||||
bool m_isDone ; // is it done running?
|
||||
bool m_readyForBail ; // BigFile.cpp stuck reads
|
||||
char *m_allocBuf ; // BigFile.cpp stuck reads
|
||||
int32_t m_allocSize ; // BigFile.cpp stuck reads
|
||||
int32_t m_allocSize ; // BigFile.cpp stuck reads
|
||||
int32_t m_errno ; // BigFile.cpp stuck reads
|
||||
int32_t m_bytesToGo ; // BigFile.cpp stuck reads
|
||||
int64_t m_queuedTime ; // when call() was called
|
||||
@ -68,12 +71,11 @@ class ThreadEntry {
|
||||
ThreadEntry **m_bestTailPtr;
|
||||
};
|
||||
|
||||
//#define MAX_THREAD_ENTRIES 1024
|
||||
|
||||
|
||||
// our Thread class has one ThreadQueue per thread type
|
||||
class ThreadQueue {
|
||||
|
||||
public:
|
||||
public:
|
||||
// what type of threads are in this queue (used-defined)?
|
||||
char m_threadType;
|
||||
// how many threads have been launched total over time?
|
||||
@ -98,11 +100,11 @@ class ThreadQueue {
|
||||
// linked list head for empty thread entries
|
||||
ThreadEntry *m_emptyHead;
|
||||
|
||||
// 8 heads/tails for linked lists of thread entries waiting to launch
|
||||
// heads/tails for linked lists of thread entries waiting to launch
|
||||
ThreadEntry *m_waitHead0;
|
||||
ThreadEntry *m_waitHead1;
|
||||
ThreadEntry *m_waitHead2;
|
||||
ThreadEntry *m_waitHead3;
|
||||
// ThreadEntry *m_waitHead3;
|
||||
ThreadEntry *m_waitHead4;
|
||||
ThreadEntry *m_waitHead5;
|
||||
ThreadEntry *m_waitHead6;
|
||||
@ -110,7 +112,7 @@ class ThreadQueue {
|
||||
ThreadEntry *m_waitTail0;
|
||||
ThreadEntry *m_waitTail1;
|
||||
ThreadEntry *m_waitTail2;
|
||||
ThreadEntry *m_waitTail3;
|
||||
// ThreadEntry *m_waitTail3;
|
||||
ThreadEntry *m_waitTail4;
|
||||
ThreadEntry *m_waitTail5;
|
||||
ThreadEntry *m_waitTail6;
|
||||
@ -121,14 +123,13 @@ class ThreadQueue {
|
||||
ThreadQueue();
|
||||
void reset();
|
||||
|
||||
int32_t getNumThreadsOutOrQueued();
|
||||
int32_t getNumWriteThreadsOut() ;
|
||||
|
||||
|
||||
// . for adding an entry
|
||||
// . returns false and sets errno on error
|
||||
ThreadEntry *addEntry ( int32_t niceness,
|
||||
void *state ,
|
||||
void *state ,
|
||||
void (* callback )(void *state,
|
||||
class ThreadEntry *t) ,
|
||||
void *(* startRoutine)(void *state,
|
||||
@ -154,17 +155,9 @@ class ThreadQueue {
|
||||
|
||||
void print ( ) ;
|
||||
|
||||
// these are called by g_udpServer2, the high priority udp server
|
||||
void suspendLowPriorityThreads();
|
||||
void resumeLowPriorityThreads();
|
||||
|
||||
// this is true if low priority threads are temporarily suspended
|
||||
bool m_isLowPrioritySuspended ;
|
||||
|
||||
// . cancel running low priority threads
|
||||
// . called by suspendLowPriorityThreads() when first called
|
||||
//void cancelLowPriorityThreads();
|
||||
|
||||
// return m_threadType as a NULL-terminated string
|
||||
const char *getThreadType () ;
|
||||
|
||||
@ -175,9 +168,7 @@ class ThreadQueue {
|
||||
|
||||
// this Threads class has a list of ThreadQueues, 1 per thread type
|
||||
class Threads {
|
||||
|
||||
public:
|
||||
|
||||
public:
|
||||
Threads();
|
||||
|
||||
// returns false and sets errno on error, true otherwise
|
||||
@ -202,7 +193,6 @@ class Threads {
|
||||
void printQueue ( int32_t q ) { m_threadQueues[q].print(); };
|
||||
void printState();
|
||||
|
||||
|
||||
// disable all threads... no more will be created, those in queues
|
||||
// will never be spawned
|
||||
void disableThreads () { m_disabled = true; };
|
||||
@ -216,13 +206,11 @@ class Threads {
|
||||
// sigqueue to call "callback" with "state" as the parameter
|
||||
// . niceness deteremines the niceness of this signal as well as
|
||||
// the thread's priority
|
||||
bool call ( char type ,
|
||||
int32_t niceness ,
|
||||
void *state ,
|
||||
void (* threadDoneCallback)(void *state,
|
||||
class ThreadEntry *t) ,
|
||||
void *(* startRoutine )(void *state,
|
||||
class ThreadEntry *t) );
|
||||
bool call ( char type,
|
||||
int32_t niceness,
|
||||
void *state,
|
||||
void (* threadDoneCallback)(void *state, class ThreadEntry *t) ,
|
||||
void *(* startRoutine )(void *state, class ThreadEntry *t) );
|
||||
|
||||
// try to launch threads waiting to be launched in any queue
|
||||
int32_t launchThreads ();
|
||||
@ -236,13 +224,6 @@ class Threads {
|
||||
//calls callbacks and launches all threads
|
||||
int32_t timedCleanUp (int32_t maxTime, int32_t niceness );//= MAX_NICENESS);
|
||||
|
||||
// these are called by g_udpServer2, the high priority udp server
|
||||
void suspendLowPriorityThreads();
|
||||
void resumeLowPriorityThreads();
|
||||
|
||||
// cancels low priority threads running in each ThreadQueue
|
||||
//void cancelLowPriorityThreads();
|
||||
|
||||
// . gets the number of disk threads (seeks) and total bytes to read
|
||||
// . ignores disk threads that are too nice (over maxNiceness)
|
||||
int32_t getDiskThreadLoad ( int32_t maxNiceness , int32_t *totalToRead ) ;
|
||||
@ -250,35 +231,22 @@ class Threads {
|
||||
ThreadQueue* getThreadQueues() { return &m_threadQueues[0];}
|
||||
int32_t getNumThreadQueues() { return m_numQueues; }
|
||||
|
||||
// used by UdpServer to see if it should call a low priority callback
|
||||
//int32_t getNumActiveHighPriorityCpuThreads() ;
|
||||
// all high priority threads...
|
||||
int32_t getNumActiveHighPriorityThreads() ;
|
||||
|
||||
bool hasHighPriorityCpuThreads() ;
|
||||
|
||||
int32_t getNumThreadsOutOrQueued();
|
||||
int32_t getNumWriteThreadsOut() ;
|
||||
|
||||
// counts the high/low priority (niceness <= 0) threads
|
||||
//int64_t m_hiLaunched;
|
||||
//int64_t m_hiReturned;
|
||||
//int64_t m_loLaunched;
|
||||
//int64_t m_loReturned;
|
||||
|
||||
bool m_needsCleanup;
|
||||
//bool m_needBottom;
|
||||
|
||||
bool m_initialized;
|
||||
|
||||
// private:
|
||||
|
||||
// . allow up to MAX_THREAD_QUEUES different thread types for now
|
||||
// . allow up to THREAD_TYPE_MAX different thread types for now
|
||||
// . types are user-defined numbers
|
||||
// . each type has a corresponding thread queue
|
||||
// . when a thread is done we place a signal on g_loop's sigqueue so
|
||||
// that it will call m_callback w/ m_state
|
||||
ThreadQueue m_threadQueues [ MAX_THREAD_QUEUES ];
|
||||
// that it will call m_callback w/ m_state
|
||||
ThreadQueue m_threadQueues [ THREAD_TYPE_MAX ];
|
||||
int32_t m_numQueues;
|
||||
|
||||
bool m_disabled;
|
||||
|
@ -10860,7 +10860,7 @@ char **XmlDoc::getFilteredContent ( ) {
|
||||
if ( ! m_mimeValid ) { char *xx=NULL;*xx=0; }
|
||||
|
||||
// do it
|
||||
if ( g_threads.call ( FILTER_THREAD ,
|
||||
if ( g_threads.call ( THREAD_TYPE_FILTER,
|
||||
MAX_NICENESS ,
|
||||
this ,
|
||||
filterDoneWrapper ,
|
||||
|
2
main.cpp
2
main.cpp
@ -4874,7 +4874,7 @@ skip:
|
||||
//int pid;
|
||||
for ( int32_t i = 0 ; i < s_numThreads ; i++ ) {
|
||||
//int err = pthread_create ( &tid1,&s_attr,startUp,(void *)i) ;
|
||||
if (!g_threads.call(GENERIC_THREAD,0,
|
||||
if (!g_threads.call(THREAD_TYPE_GENERIC,0,
|
||||
(void *)(PTRTYPE)i,NULL,startUp)){
|
||||
log("test: Thread launch failed."); return; }
|
||||
log(LOG_INIT,"test: Launched thread #%"INT32".",i);
|
||||
|
Reference in New Issue
Block a user