overhauled the main loop. (BIGLOOP) in Loop.cpp.
sigtimedwait() was cutting it, it was queueing up too many DUPLICATE signals and overflowing the rt signal queue. now gb has its own real-time signal queueing logic that just sets the bit of the FDs that need attention. i think threaded reads/writes are better now too but the performance graph is broken so i need to fix that first. the threads page looks good though. overhaul this hopefully is a massive and stable performance improvement.
This commit is contained in:
667
Loop.cpp
667
Loop.cpp
@ -122,6 +122,9 @@ static void sighupHandler ( int x , siginfo_t *info , void *y ) ;
|
||||
static void sigioHandler ( int x , siginfo_t *info , void *y ) ;
|
||||
static void sigalrmHandler( int x , siginfo_t *info , void *y ) ;
|
||||
|
||||
long g_fdWriteBits[MAX_NUM_FDS/32];
|
||||
long g_fdReadBits [MAX_NUM_FDS/32];
|
||||
|
||||
void Loop::unregisterReadCallback ( int fd, void *state ,
|
||||
void (* callback)(int fd,void *state),
|
||||
bool silent ){
|
||||
@ -547,6 +550,101 @@ void Loop::returnSlot ( Slot *s ) {
|
||||
m_head = s;
|
||||
}
|
||||
|
||||
|
||||
// . come here when we get a GB_SIGRTMIN+X signal etc.
|
||||
// . do not call anything from here because the purpose of this is to just
|
||||
// queue the signals up and DO DEDUPING which linux does not do causing
|
||||
// the sigqueue to overflow.
|
||||
// . we should break out of the sleep loop after the signal is handled
|
||||
// so we can handle/process the queued signals properly. 'man sleep'
|
||||
// states "sleep() makes the calling process sleep until seconds
|
||||
// seconds have elapsed or a signal arrives which is not ignored."
|
||||
void sigHandlerQueue_r ( int x , siginfo_t *info , void *v ) {
|
||||
|
||||
// if we just needed to cleanup a thread
|
||||
if ( info->si_signo == SIGCHLD ) {
|
||||
// this has no fd really, Threads.cpp just sends it when
|
||||
// the thread is done
|
||||
g_threads.m_needsCleanup = true;
|
||||
return;
|
||||
}
|
||||
|
||||
if ( info->si_code == SI_QUEUE ) {
|
||||
//log("admin: got sigqueue");
|
||||
// the thread is done
|
||||
g_threads.m_needsCleanup = true;
|
||||
return;
|
||||
}
|
||||
|
||||
// extract the file descriptor that needs attention
|
||||
int fd = info->si_fd;
|
||||
|
||||
if ( fd >= MAX_NUM_FDS ) {
|
||||
log("loop: CRITICAL ERROR. fd=%i > %i",fd,(int)MAX_NUM_FDS);
|
||||
return;
|
||||
}
|
||||
|
||||
// set the right callback
|
||||
|
||||
// info->si_band values:
|
||||
//#define POLLIN 0x0001 /* There is data to read */
|
||||
//#define POLLPRI 0x0002 /* There is urgent data to read */
|
||||
//#define POLLOUT 0x0004 /* Writing now will not block */
|
||||
//#define POLLERR 0x0008 /* Error condition */
|
||||
//#define POLLHUP 0x0010 /* Hung up */
|
||||
//#define POLLNVAL 0x0020 /* Invalid request: fd not open */
|
||||
int band = info->si_band;
|
||||
// translate SIGPIPE's to band of POLLHUP
|
||||
if ( info->si_signo == SIGPIPE ) {
|
||||
band = POLLHUP;
|
||||
//log("loop: Received SIGPIPE signal. Broken pipe.");
|
||||
}
|
||||
|
||||
// . call the appropriate handler(s)
|
||||
// . TODO: bitch if no callback to handle the read!!!!!!!
|
||||
// . NOTE: when it's connected it sets both POLLIN and POLLOUT
|
||||
// . NOTE: or when a socket is trying to connect to it if it's listener
|
||||
//if ( band & (POLLIN | POLLOUT) == (POLLIN | POLLOUT) )
|
||||
// g_loop.callCallbacks_ass ( true/*forReading?*/ , fd );
|
||||
if ( band & POLLIN ) {
|
||||
// keep stats on this now since some linuxes dont work right
|
||||
g_stats.m_readSignals++;
|
||||
//log("Loop: read %lli fd=%i",gettimeofdayInMilliseconds(),fd);
|
||||
//g_loop.callCallbacks_ass ( true , fd );
|
||||
g_fdReadBits[fd/32] = 1<<(fd%32);
|
||||
}
|
||||
else if ( band & POLLPRI ) {
|
||||
// keep stats on this now since some linuxes dont work right
|
||||
g_stats.m_readSignals++;
|
||||
//log("Loop: read %lli fd=%i",gettimeofdayInMilliseconds(),fd);
|
||||
//g_loop.callCallbacks_ass ( true , fd ) ;
|
||||
g_fdReadBits[fd/32] = 1<<(fd%32);
|
||||
}
|
||||
else if ( band & POLLOUT ) {
|
||||
// keep stats on this now since some linuxes dont work right
|
||||
g_stats.m_writeSignals++;
|
||||
//log("Loop: write %lli fd=%i",gettimeofdayInMilliseconds(),fd)
|
||||
//g_loop.callCallbacks_ass ( false , fd );
|
||||
g_fdWriteBits[fd/32] = 1<<(fd%32);
|
||||
}
|
||||
// fix qainject1() test with this
|
||||
else if ( band & POLLERR ) {
|
||||
//log(LOG_INFO,"loop: got POLLERR on fd=%i.",fd);
|
||||
}
|
||||
//g_loop.callCallbacks_ass ( false , fd );
|
||||
// this happens if the socket closes abruptly
|
||||
// or out of band data, etc... see "man 2 poll" for more info
|
||||
else if ( band & POLLHUP ) {
|
||||
// i see these all the time for fd == 0, so don't print it
|
||||
//if ( fd != 0 )
|
||||
// log(LOG_INFO,"loop: Received hangup on fd=%i.",fd);
|
||||
// it is ready for writing i guess
|
||||
g_fdWriteBits[fd/32] = 1<<(fd%32);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
bool Loop::init ( ) {
|
||||
// redhat 9's NPTL doesn't like our async signals
|
||||
if ( ! g_conf.m_allowAsyncSignals ) g_isHot = false;
|
||||
@ -611,10 +709,43 @@ bool Loop::init ( ) {
|
||||
// . block on any signals in this set (in addition to current sigs)
|
||||
// . use SIG_UNBLOCK to remove signals from block list
|
||||
// . this returns -1 and sets g_errno on error
|
||||
if ( sigprocmask ( SIG_BLOCK, &sigs, 0 ) < 0 ) {
|
||||
g_errno = errno;
|
||||
return log("loop: sigprocmask: %s.",strerror(g_errno));
|
||||
}
|
||||
// . we block a signal so it does not interrupt us, then we can
|
||||
// take it off using our call to sigtimedwait()
|
||||
// . allow it to interrupt us now and we will queue it ourselves
|
||||
// to prevent the linux queue from overflowing
|
||||
// . see 'cat /proc/<pid>/status | grep SigQ' output to see if
|
||||
// overflow occurs. linux does not dedup the signals so when a
|
||||
// host cpu usage hits 100% it seems to miss a ton of signals.
|
||||
// i suspect the culprit is pthread_create() so we need to get
|
||||
// thread pools out soon.
|
||||
// . now we are handling the signals and queueing them ourselves
|
||||
// so comment out this sigprocmask() call
|
||||
// if ( sigprocmask ( SIG_BLOCK, &sigs, 0 ) < 0 ) {
|
||||
// g_errno = errno;
|
||||
// return log("loop: sigprocmask: %s.",strerror(g_errno));
|
||||
// }
|
||||
struct sigaction sa2;
|
||||
// . sa_mask is the set of signals that should be blocked when
|
||||
// we're handling the GB_SIGRTMIN, make this empty
|
||||
// . GB_SIGRTMIN signals will be automatically blocked while we're
|
||||
// handling a SIGIO signal, so don't worry about that
|
||||
// . what sigs should be blocked when in our handler? the same
|
||||
// sigs we are handling i guess
|
||||
memcpy ( &sa2.sa_mask , &sigs , sizeof(sigs) );
|
||||
sa2.sa_flags = SA_SIGINFO ; //| SA_ONESHOT;
|
||||
// call this function
|
||||
sa2.sa_sigaction = sigHandlerQueue_r;
|
||||
g_errno = 0;
|
||||
if ( sigaction ( SIGPIPE, &sa2, 0 ) < 0 ) g_errno = errno;
|
||||
if ( sigaction ( GB_SIGRTMIN , &sa2, 0 ) < 0 ) g_errno = errno;
|
||||
if ( sigaction ( GB_SIGRTMIN + 1, &sa2, 0 ) < 0 ) g_errno = errno;
|
||||
if ( sigaction ( GB_SIGRTMIN + 2, &sa2, 0 ) < 0 ) g_errno = errno;
|
||||
if ( sigaction ( GB_SIGRTMIN + 3, &sa2, 0 ) < 0 ) g_errno = errno;
|
||||
if ( sigaction ( SIGCHLD, &sa2, 0 ) < 0 ) g_errno = errno;
|
||||
if ( sigaction ( SIGIO, &sa2, 0 ) < 0 ) g_errno = errno;
|
||||
if ( g_errno ) log("loop: sigaction(): %s.", mstrerror(g_errno) );
|
||||
|
||||
|
||||
// . we turn this signal on/off to turn interrupts off/on
|
||||
// . clear all signals from the set
|
||||
//sigemptyset ( &m_sigrtmin );
|
||||
@ -644,7 +775,7 @@ bool Loop::init ( ) {
|
||||
// now when we got an unblocked GB_SIGRTMIN signal go here right away
|
||||
#ifndef _VALGRIND_
|
||||
if ( sigaction ( GB_SIGRTMIN, &sa, 0 ) < 0 ) g_errno = errno;
|
||||
if ( g_errno ) log("loop: sigaction GB_SIGRTMIN: %s.", mstrerror(errno));
|
||||
if ( g_errno)log("loop: sigaction GB_SIGRTMIN: %s.", mstrerror(errno));
|
||||
#endif
|
||||
|
||||
// set it this way for SIGIO's
|
||||
@ -885,6 +1016,38 @@ void sigalrmHandler ( int x , siginfo_t *info , void *y ) {
|
||||
//logf(LOG_DEBUG, "xxx now: %lli! approx: %lli", g_now, g_nowApprox);
|
||||
}
|
||||
|
||||
static sigset_t s_rtmin;
|
||||
|
||||
void maskSignals() {
|
||||
|
||||
static bool s_init = false;
|
||||
if ( ! s_init ) {
|
||||
s_init = true;
|
||||
sigemptyset ( &s_rtmin );
|
||||
sigaddset ( &s_rtmin, GB_SIGRTMIN );
|
||||
sigaddset ( &s_rtmin, GB_SIGRTMIN + 1 );
|
||||
sigaddset ( &s_rtmin, GB_SIGRTMIN + 2 );
|
||||
sigaddset ( &s_rtmin, GB_SIGRTMIN + 3 );
|
||||
sigaddset ( &s_rtmin, SIGCHLD );
|
||||
sigaddset ( &s_rtmin, SIGIO );
|
||||
sigaddset ( &s_rtmin, SIGPIPE );
|
||||
}
|
||||
|
||||
// block it
|
||||
if ( sigprocmask ( SIG_BLOCK , &s_rtmin, 0 ) < 0 ) {
|
||||
log("loop: maskSignals: sigprocmask: %s.", strerror(errno));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
void unmaskSignals() {
|
||||
// unblock it
|
||||
if ( sigprocmask ( SIG_UNBLOCK , &s_rtmin, 0 ) < 0 ) {
|
||||
log("loop: unmaskSignals: sigprocmask: %s.", strerror(errno));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// shit, we can't make this realtime!! RdbClose() cannot be called by a
|
||||
// real time sig handler
|
||||
@ -929,8 +1092,9 @@ bool Loop::runLoop ( ) {
|
||||
//struct timespec t = { 0 /*seconds*/, 10000000 /*nanoseconds*/};
|
||||
|
||||
// grab any high priority sig first
|
||||
siginfo_t info ;
|
||||
long sigNum ; //= sigwaitinfo ( &sigs1, &info );
|
||||
//siginfo_t info ;
|
||||
//long sigNum ; //= sigwaitinfo ( &sigs1, &info );
|
||||
|
||||
#endif
|
||||
s_lastTime = 0;
|
||||
|
||||
@ -941,231 +1105,328 @@ bool Loop::runLoop ( ) {
|
||||
|
||||
enableTimer();
|
||||
|
||||
long long elapsed;
|
||||
|
||||
// . now loop forever waiting for signals
|
||||
// . but every second check for timer-based events
|
||||
do {
|
||||
|
||||
g_now = gettimeofdayInMilliseconds();
|
||||
BIGLOOP:
|
||||
|
||||
g_now = gettimeofdayInMilliseconds();
|
||||
|
||||
//set the time back to its exact value and reset
|
||||
//the timer.
|
||||
g_nowApprox = g_now;
|
||||
// MDW: won't this hog cpu? just don't disable it in
|
||||
// Process::save2() any more and it should be ok
|
||||
//enableTimer();
|
||||
m_lastPollTime = g_now;
|
||||
m_needsToQuickPoll = false;
|
||||
//set the time back to its exact value and reset
|
||||
//the timer.
|
||||
g_nowApprox = g_now;
|
||||
// MDW: won't this hog cpu? just don't disable it in
|
||||
// Process::save2() any more and it should be ok
|
||||
//enableTimer();
|
||||
m_lastPollTime = g_now;
|
||||
m_needsToQuickPoll = false;
|
||||
|
||||
|
||||
/*
|
||||
// test the heartbeat core...
|
||||
if ( g_numAlarms > 100 ) {
|
||||
/*
|
||||
// test the heartbeat core...
|
||||
if ( g_numAlarms > 100 ) {
|
||||
goo:
|
||||
long j;
|
||||
for ( long k = 0 ; k < 2000000000 ; k++ ) {
|
||||
j=k *5;
|
||||
long j;
|
||||
for ( long k = 0 ; k < 2000000000 ; k++ ) {
|
||||
j=k *5;
|
||||
}
|
||||
goto goo;
|
||||
}
|
||||
*/
|
||||
|
||||
g_errno = 0;
|
||||
|
||||
if ( m_shutdown ) {
|
||||
// a msg
|
||||
if (m_shutdown==1)
|
||||
log(LOG_INIT,"loop: got SIGHUP or SIGTERM.");
|
||||
else if (m_shutdown==2)
|
||||
log(LOG_INIT,"loop: got SIGBAD in thread.");
|
||||
else
|
||||
log(LOG_INIT,"loop: got SIGPWR.");
|
||||
// . turn off interrupts here because it doesn't help to do
|
||||
// it in the thread
|
||||
// . TODO: turn off signals for sigbadhandler()
|
||||
interruptsOff();
|
||||
// if thread got the signal, just wait for him to save all
|
||||
// Rdbs and then dump core
|
||||
if ( m_shutdown == 2 ) {
|
||||
//log(0,"Thread is saving & shutting down urgently.");
|
||||
//while ( 1 == 1 ) sleep (50000);
|
||||
log("loop: Resuming despite thread crash.");
|
||||
m_shutdown = 0;
|
||||
goto BIGLOOP;
|
||||
}
|
||||
goto goo;
|
||||
}
|
||||
*/
|
||||
// otherwise, thread did not save, so we must do it
|
||||
log ( LOG_INIT ,"loop: Saving and shutting down urgently.");
|
||||
// . this will save all Rdb's and dump core
|
||||
// . since "urgent" is true it won't broadcast its shutdown
|
||||
// to all hosts
|
||||
//#ifndef NO_MAIN
|
||||
//mainShutdown( true ); // urgent?
|
||||
//#endif
|
||||
g_process.shutdown ( true );
|
||||
}
|
||||
|
||||
g_errno = 0;
|
||||
//g_udpServer2.sendPoll_ass(true,g_now);
|
||||
//g_udpServer2.process_ass ( g_now );
|
||||
// MDW: see if this works without this junk, if not then
|
||||
// put it back in
|
||||
// seems like never getting signals on redhat 16-core box.
|
||||
// we always process dgrams through this... wtf? try taking
|
||||
// it out and seeing what's happening
|
||||
//g_udpServer.sendPoll_ass (true,g_now);
|
||||
//g_udpServer.process_ass ( g_now );
|
||||
// and dns now too
|
||||
//g_dns.m_udpServer.sendPoll_ass(true,g_now);
|
||||
//g_dns.m_udpServer.process_ass ( g_now );
|
||||
|
||||
if ( m_shutdown ) {
|
||||
// a msg
|
||||
if (m_shutdown==1)
|
||||
log(LOG_INIT,"loop: got SIGHUP or SIGTERM.");
|
||||
else if (m_shutdown==2)
|
||||
log(LOG_INIT,"loop: got SIGBAD in thread.");
|
||||
else
|
||||
log(LOG_INIT,"loop: got SIGPWR.");
|
||||
// . turn off interrupts here because it doesn't help to do
|
||||
// it in the thread
|
||||
// . TODO: turn off signals for sigbadhandler()
|
||||
interruptsOff();
|
||||
// if thread got the signal, just wait for him to save all
|
||||
// Rdbs and then dump core
|
||||
if ( m_shutdown == 2 ) {
|
||||
//log (0,"Thread is saving and shutting down urgently.");
|
||||
//while ( 1 == 1 ) sleep (50000);
|
||||
log("loop: Resuming despite thread crash.");
|
||||
m_shutdown = 0;
|
||||
continue;
|
||||
}
|
||||
// otherwise, thread did not save, so we must do it
|
||||
log ( LOG_INIT ,"loop: Saving and shutting down urgently.");
|
||||
// . this will save all Rdb's and dump core
|
||||
// . since "urgent" is true it won't broadcast its shutdown
|
||||
// to all hosts
|
||||
//#ifndef NO_MAIN
|
||||
//mainShutdown( true ); // urgent?
|
||||
//#endif
|
||||
g_process.shutdown ( true );
|
||||
}
|
||||
// if there was a high niceness http request within a
|
||||
// quickpoll, we stored it and now we'll call it here.
|
||||
//g_httpServer.callQueuedPages();
|
||||
|
||||
//g_udpServer2.sendPoll_ass(true,g_now);
|
||||
//g_udpServer2.process_ass ( g_now );
|
||||
// MDW: see if this works without this junk, if not then
|
||||
// put it back in
|
||||
// seems like never getting signals on redhat 16-core box.
|
||||
// we always process dgrams through this... wtf? try taking
|
||||
// it out and seeing what's happening
|
||||
//g_udpServer.sendPoll_ass (true,g_now);
|
||||
//g_udpServer.process_ass ( g_now );
|
||||
// and dns now too
|
||||
//g_dns.m_udpServer.sendPoll_ass(true,g_now);
|
||||
//g_dns.m_udpServer.process_ass ( g_now );
|
||||
//g_udpServer.printState ( );
|
||||
|
||||
// if there was a high niceness http request within a
|
||||
// quickpoll, we stored it and now we'll call it here.
|
||||
//g_httpServer.callQueuedPages();
|
||||
// if ( g_someAreQueued ) {
|
||||
// // assume none are queued now, we may get interrupted
|
||||
// // and it may get set back to true
|
||||
// g_someAreQueued = false;
|
||||
// //g_udpServer2.makeCallbacks_ass ( 0 );
|
||||
// //g_udpServer2.makeCallbacks_ass ( 1 );
|
||||
// }
|
||||
|
||||
//g_udpServer.printState ( );
|
||||
|
||||
if ( g_someAreQueued ) {
|
||||
// assume none are queued now, we may get interrupted
|
||||
// and it may get set back to true
|
||||
g_someAreQueued = false;
|
||||
//g_udpServer2.makeCallbacks_ass ( 0 );
|
||||
//g_udpServer2.makeCallbacks_ass ( 1 );
|
||||
}
|
||||
|
||||
// if ( g_threads.m_needsCleanup ) {
|
||||
// // bitch about
|
||||
// static bool s_bitched = false;
|
||||
// if ( ! s_bitched ) {
|
||||
// log(LOG_REMIND,"loop: Lost thread signal.");
|
||||
// s_bitched = true;
|
||||
// }
|
||||
// if ( g_threads.m_needsCleanup ) {
|
||||
// // bitch about
|
||||
// static bool s_bitched = false;
|
||||
// if ( ! s_bitched ) {
|
||||
// log(LOG_REMIND,"loop: Lost thread signal.");
|
||||
// s_bitched = true;
|
||||
// }
|
||||
|
||||
|
||||
// }
|
||||
//cleanup and launch threads:
|
||||
//g_threads.printState();
|
||||
g_threads.timedCleanUp(4, MAX_NICENESS ) ; // 4 ms
|
||||
// }
|
||||
//cleanup and launch threads:
|
||||
//g_threads.printState();
|
||||
//g_threads.timedCleanUp(4, MAX_NICENESS ) ; // 4 ms
|
||||
|
||||
// do it anyway
|
||||
// take this out as well to see if signals are coming in
|
||||
//doPoll();
|
||||
// do it anyway
|
||||
// take this out as well to see if signals are coming in
|
||||
//doPoll();
|
||||
|
||||
while ( m_needToPoll ) doPoll();
|
||||
|
||||
long elapsed = g_now - s_lastTime;
|
||||
// if someone changed the system clock on us, this could be negative
|
||||
// so fix it! otherwise, times may NEVER get called in our lifetime
|
||||
if ( elapsed < 0 ) elapsed = m_minTick;
|
||||
// call this every (about) 1 second
|
||||
if ( elapsed >= m_minTick ) {
|
||||
// MAX_NUM_FDS is the fd for sleep callbacks
|
||||
callCallbacks_ass ( true , MAX_NUM_FDS , g_now );
|
||||
// note the last time we called them
|
||||
//g_now = gettimeofdayInMilliseconds();
|
||||
s_lastTime = g_now;
|
||||
}
|
||||
//while ( m_needToPoll ) doPoll();
|
||||
|
||||
#ifndef _POLLONLY_
|
||||
|
||||
// hack
|
||||
//char buffer[100];
|
||||
//if ( recv(27,buffer,99,MSG_PEEK|MSG_DONTWAIT) == 0 ) {
|
||||
// logf(LOG_DEBUG,"CLOSED CLOSED!!");
|
||||
//}
|
||||
//g_errno = 0;
|
||||
// hack
|
||||
//char buffer[100];
|
||||
//if ( recv(27,buffer,99,MSG_PEEK|MSG_DONTWAIT) == 0 ) {
|
||||
// logf(LOG_DEBUG,"CLOSED CLOSED!!");
|
||||
//}
|
||||
//g_errno = 0;
|
||||
|
||||
//check for pending signals, return right away if none.
|
||||
//then we'll do the low priority stuff while we were
|
||||
//supposed to be sleeping.
|
||||
g_inWaitState = true;
|
||||
sigNum = sigtimedwait (&sigs0, &info, s_sigWaitTimePtr ) ;
|
||||
//check for pending signals, return right away if none.
|
||||
//then we'll do the low priority stuff while we were
|
||||
//supposed to be sleeping.
|
||||
g_inWaitState = true;
|
||||
|
||||
// if no signal, we just waited 20 ms and nothing happened
|
||||
if ( sigNum == -1 )
|
||||
sigalrmHandler( 0,&info,NULL);
|
||||
//logf(LOG_DEBUG,"loop: sigNum=%li signo=%li alrm=%li",
|
||||
// (long)sigNum,info.si_signo,(long)SIGVTALRM);
|
||||
// no longer in a wait state...
|
||||
g_inWaitState = false;
|
||||
//sigNum = sigtimedwait (&sigs0, &info, s_sigWaitTimePtr ) ;
|
||||
|
||||
if ( sigNum < 0 ) {
|
||||
if ( errno == EAGAIN || errno == EINTR ||
|
||||
errno == EILSEQ || errno == 0 ) {
|
||||
sigNum = 0;
|
||||
errno = 0;
|
||||
}
|
||||
else if ( errno != ENOMEM ) {
|
||||
log("loop: sigtimedwait(): %s.",
|
||||
strerror(errno));
|
||||
continue;
|
||||
}
|
||||
#undef usleep
|
||||
|
||||
// now we just usleep(). an arriving signal will call
|
||||
// sigHandlerQueue_r() then break us out of this sleep.
|
||||
// 10000 microseconds is 10 milliseconds. it should break out
|
||||
// when a signal comes in just like the sleep() function.
|
||||
usleep(1000 * 10);
|
||||
|
||||
// reinstate the thing that prevents us from non-chalantly adding
|
||||
// usleeps() which could degrade performance
|
||||
#define usleep(a) { char *xx=NULL;*xx=0; }
|
||||
|
||||
// if no signal, we just waited 20 ms and nothing happened
|
||||
// why do we need this now? MDW
|
||||
//if ( sigNum == -1 )
|
||||
// sigalrmHandler( 0,&info,NULL);
|
||||
|
||||
//logf(LOG_DEBUG,"loop: sigNum=%li signo=%li alrm=%li",
|
||||
// (long)sigNum,info.si_signo,(long)SIGVTALRM);
|
||||
// no longer in a wait state...
|
||||
g_inWaitState = false;
|
||||
|
||||
|
||||
long n = MAX_NUM_FDS / 32;
|
||||
|
||||
// process file descriptor callbacks for file descriptors
|
||||
// we queued in sigHandlerQueue_r() function above.
|
||||
// we use an array of 1024 bits like the poll function i guess.
|
||||
for ( long i = 0 ; i < n ; i++ ) {
|
||||
// this is a 32-bit number
|
||||
if ( ! g_fdReadBits[i] ) continue;
|
||||
// scan the individual bits now
|
||||
for ( long j = 0 ; j < 32 ; j++ ) {
|
||||
// mask mask
|
||||
unsigned long mask = 1 << j;
|
||||
// skip jth bit if not on
|
||||
if ( ! g_fdReadBits[i] & mask ) continue;
|
||||
// block signals for just a sec so we can
|
||||
// clear it now that we've handled it
|
||||
//maskSignals();
|
||||
// clear it
|
||||
g_fdReadBits[i] &= ~mask;
|
||||
// reinstate signals
|
||||
//unmaskSignals();
|
||||
// construct the file descriptor
|
||||
long fd = i*32 + j;
|
||||
// . call all callbacks registered on this fd
|
||||
// . forReading = true
|
||||
callCallbacks_ass ( true , fd , g_now );
|
||||
}
|
||||
if ( sigNum == 0 ) {
|
||||
//no signals pending, try to take care of anything
|
||||
// left undone:
|
||||
}
|
||||
|
||||
long long startTime =gettimeofdayInMillisecondsLocal();
|
||||
if(g_now & 1) {
|
||||
if(g_udpServer.needBottom())
|
||||
g_udpServer.makeCallbacks_ass ( 2 );
|
||||
//if(g_udpServer2.needBottom())
|
||||
// g_udpServer2.makeCallbacks_ass ( 2 );
|
||||
// do the same thing but for writing now
|
||||
for ( long i = 0 ; i < n ; i++ ) {
|
||||
// this is a 32-bit number
|
||||
if ( ! g_fdWriteBits[i] ) continue;
|
||||
// scan the individual bits now
|
||||
for ( long j = 0 ; j < 32 ; j++ ) {
|
||||
// mask mask
|
||||
unsigned long mask = 1 << j;
|
||||
// skip jth bit if not on
|
||||
if ( ! g_fdWriteBits[i] & mask ) continue;
|
||||
// block signals for just a sec so we can
|
||||
// clear it now that we've handled it
|
||||
//maskSignals();
|
||||
// clear it
|
||||
g_fdWriteBits[i] &= ~mask;
|
||||
// reinstate signals
|
||||
//unmaskSignals();
|
||||
// construct the file descriptor
|
||||
long fd = i*32 + j;
|
||||
// . call all callbacks registered on this fd.
|
||||
// . forReading = false.
|
||||
callCallbacks_ass ( false , fd , g_now );
|
||||
}
|
||||
}
|
||||
|
||||
if(gettimeofdayInMillisecondsLocal() -
|
||||
startTime > 10)
|
||||
goto notime;
|
||||
long long elapsed = g_now - s_lastTime;
|
||||
// if someone changed the system clock on us, this could be negative
|
||||
// so fix it! otherwise, times may NEVER get called in our lifetime
|
||||
if ( elapsed < 0 ) elapsed = m_minTick;
|
||||
// call this every (about) m_minTicks milliseconds
|
||||
if ( elapsed >= m_minTick ) {
|
||||
// MAX_NUM_FDS is the fd for sleep callbacks
|
||||
callCallbacks_ass ( true , MAX_NUM_FDS , g_now );
|
||||
// note the last time we called them
|
||||
//g_now = gettimeofdayInMilliseconds();
|
||||
s_lastTime = g_now;
|
||||
}
|
||||
|
||||
// call remaining callbacks for udp msgs
|
||||
if ( g_udpServer.needBottom() )
|
||||
g_udpServer.makeCallbacks_ass ( 2 );
|
||||
|
||||
//if(g_udpServer2.needBottom())
|
||||
// g_udpServer2.makeCallbacks_ass ( 2 );
|
||||
|
||||
// if(gettimeofdayInMillisecondsLocal() -
|
||||
// startTime > 10)
|
||||
// goto notime;
|
||||
|
||||
if(g_conf.m_sequentialProfiling)
|
||||
g_threads.printState();
|
||||
if(g_threads.m_needsCleanup)
|
||||
g_threads.timedCleanUp(4 , // ms
|
||||
MAX_NICENESS);
|
||||
}
|
||||
else {
|
||||
if(g_conf.m_sequentialProfiling)
|
||||
g_threads.printState();
|
||||
if(g_threads.m_needsCleanup)
|
||||
g_threads.timedCleanUp(4 , // ms
|
||||
MAX_NICENESS);
|
||||
if ( g_conf.m_sequentialProfiling )
|
||||
g_threads.printState();
|
||||
|
||||
if(gettimeofdayInMillisecondsLocal() -
|
||||
startTime > 10)
|
||||
goto notime;
|
||||
if ( g_threads.m_needsCleanup )
|
||||
// limit to 4ms. cleanup any niceness thread.
|
||||
g_threads.timedCleanUp(4 ,MAX_NICENESS);
|
||||
|
||||
if(g_udpServer.needBottom())
|
||||
g_udpServer.makeCallbacks_ass ( 2 );
|
||||
//if(g_udpServer2.needBottom())
|
||||
// g_udpServer2.makeCallbacks_ass ( 2 );
|
||||
}
|
||||
|
||||
notime:
|
||||
//if we still didn't get all of them cleaned up set
|
||||
//sleep time to none.
|
||||
if(g_udpServer.needBottom() ) {
|
||||
//g_udpServer2.needBottom()) {
|
||||
s_sigWaitTimePtr = &s_sigWaitTime2;
|
||||
}
|
||||
else {
|
||||
//otherwise set it to minTick
|
||||
s_sigWaitTimePtr = &s_sigWaitTime;
|
||||
}
|
||||
}
|
||||
//
|
||||
// we got a signal, process it
|
||||
//
|
||||
else {
|
||||
if ( info.si_code == SIGIO ) {
|
||||
log("loop: got sigio");
|
||||
m_needToPoll = true;
|
||||
}
|
||||
// handle the signal
|
||||
else sigHandler_r ( 0 , &info , NULL );
|
||||
}
|
||||
#endif
|
||||
} while (1);
|
||||
|
||||
goto BIGLOOP;
|
||||
|
||||
// make compiler happy
|
||||
return 0;
|
||||
|
||||
|
||||
/*
|
||||
|
||||
if ( sigNum < 0 ) {
|
||||
if ( errno == EAGAIN || errno == EINTR ||
|
||||
errno == EILSEQ || errno == 0 ) {
|
||||
sigNum = 0;
|
||||
errno = 0;
|
||||
}
|
||||
else if ( errno != ENOMEM ) {
|
||||
log("loop: sigtimedwait(): %s.",
|
||||
strerror(errno));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if ( sigNum == 0 ) {
|
||||
//no signals pending, try to take care of anything
|
||||
// left undone:
|
||||
|
||||
long long startTime =gettimeofdayInMillisecondsLocal();
|
||||
if(g_now & 1) {
|
||||
if(g_udpServer.needBottom())
|
||||
g_udpServer.makeCallbacks_ass ( 2 );
|
||||
//if(g_udpServer2.needBottom())
|
||||
// g_udpServer2.makeCallbacks_ass ( 2 );
|
||||
|
||||
if(gettimeofdayInMillisecondsLocal() -
|
||||
startTime > 10)
|
||||
goto notime;
|
||||
|
||||
if(g_conf.m_sequentialProfiling)
|
||||
g_threads.printState();
|
||||
if(g_threads.m_needsCleanup)
|
||||
g_threads.timedCleanUp(4 , // ms
|
||||
MAX_NICENESS);
|
||||
}
|
||||
else {
|
||||
if(g_conf.m_sequentialProfiling)
|
||||
g_threads.printState();
|
||||
if(g_threads.m_needsCleanup)
|
||||
g_threads.timedCleanUp(4 , // ms
|
||||
MAX_NICENESS);
|
||||
|
||||
if(gettimeofdayInMillisecondsLocal() -
|
||||
startTime > 10)
|
||||
goto notime;
|
||||
|
||||
if(g_udpServer.needBottom())
|
||||
g_udpServer.makeCallbacks_ass ( 2 );
|
||||
//if(g_udpServer2.needBottom())
|
||||
// g_udpServer2.makeCallbacks_ass ( 2 );
|
||||
}
|
||||
|
||||
notime:
|
||||
//if we still didn't get all of them cleaned up set
|
||||
//sleep time to none.
|
||||
if(g_udpServer.needBottom() ) {
|
||||
//g_udpServer2.needBottom()) {
|
||||
s_sigWaitTimePtr = &s_sigWaitTime2;
|
||||
}
|
||||
else {
|
||||
//otherwise set it to minTick
|
||||
s_sigWaitTimePtr = &s_sigWaitTime;
|
||||
}
|
||||
}
|
||||
//
|
||||
// we got a signal, process it
|
||||
//
|
||||
else {
|
||||
if ( info.si_code == SIGIO ) {
|
||||
log("loop: got sigio");
|
||||
m_needToPoll = true;
|
||||
}
|
||||
// handle the signal
|
||||
else sigHandler_r ( 0 , &info , NULL );
|
||||
}
|
||||
*/
|
||||
|
||||
// } while(1)
|
||||
|
||||
/*
|
||||
loop:
|
||||
g_now = gettimeofdayInMilliseconds();
|
||||
g_errno = 0;
|
||||
@ -1262,10 +1523,10 @@ bool Loop::runLoop ( ) {
|
||||
s_bitched = true;
|
||||
}
|
||||
// assume not any more
|
||||
g_threads.m_needsCleanup = false;
|
||||
//g_threads.m_needsCleanup = false;
|
||||
// check thread queue for any threads that completed
|
||||
// so we can call their callbacks and remove them
|
||||
g_threads.cleanUp ( 0 , 1000/*max niceness*/);
|
||||
g_threads.cleanUp ( 0 , 1000) ; // max niceness
|
||||
// launch any threads in waiting since this sig was
|
||||
// from a terminating one
|
||||
g_threads.launchThreads();
|
||||
@ -1347,8 +1608,8 @@ bool Loop::runLoop ( ) {
|
||||
//goto subloop;
|
||||
// we need to make g_now as accurate as possible for hot UdpServer...
|
||||
goto loop;
|
||||
// make compiler happy
|
||||
return 0;
|
||||
*/
|
||||
|
||||
}
|
||||
|
||||
// . the kernel sends a SIGIO signal when the sig queue overflows
|
||||
|
5
Msg5.cpp
5
Msg5.cpp
@ -149,6 +149,11 @@ bool Msg5::getList ( char rdbId ,
|
||||
log("disk: Trying to reset a class waiting for a reply.");
|
||||
char *xx = NULL; *xx = 0;
|
||||
}
|
||||
if ( collnum < 0 ) {
|
||||
log("msg5: called with bad collnum=%li",(long)collnum);
|
||||
g_errno = ENOCOLLREC;
|
||||
return true;
|
||||
}
|
||||
// sanity check. we no longer have record caches!
|
||||
// now we do again for posdb gbdocid:xxx| restricted queries
|
||||
//if ( addToCache || maxCacheAge ) {char *xx=NULL;*xx=0; }
|
||||
|
@ -136,13 +136,16 @@ skipReplaceHost:
|
||||
//char *shotcol = "";
|
||||
char shotcol[1024];
|
||||
shotcol[0] = '\0';
|
||||
char *cs = coll;
|
||||
if ( ! cs ) cs = "";
|
||||
|
||||
if ( g_conf.m_useShotgun && format == FORMAT_HTML ) {
|
||||
colspan = "31";
|
||||
//shotcol = "<td><b>ip2</b></td>";
|
||||
sprintf ( shotcol, "<td><a href=\"/admin/hosts?c=%s"
|
||||
"&sort=2\">"
|
||||
"<b>ping2</b></td></a>",
|
||||
coll);
|
||||
cs);
|
||||
}
|
||||
|
||||
// print host table
|
||||
@ -253,24 +256,24 @@ skipReplaceHost:
|
||||
TABLE_STYLE ,
|
||||
colspan ,
|
||||
|
||||
coll, sort,
|
||||
cs, sort,
|
||||
DARK_BLUE ,
|
||||
|
||||
coll,
|
||||
coll,
|
||||
coll,
|
||||
coll,
|
||||
coll,
|
||||
coll,
|
||||
coll,
|
||||
coll,
|
||||
coll,
|
||||
coll,
|
||||
coll,
|
||||
coll,
|
||||
coll,
|
||||
coll,
|
||||
coll,
|
||||
cs,
|
||||
cs,
|
||||
cs,
|
||||
cs,
|
||||
cs,
|
||||
cs,
|
||||
cs,
|
||||
cs,
|
||||
cs,
|
||||
cs,
|
||||
cs,
|
||||
cs,
|
||||
cs,
|
||||
cs,
|
||||
cs,
|
||||
shotcol );
|
||||
|
||||
// loop through each host we know and print it's stats
|
||||
@ -750,7 +753,7 @@ skipReplaceHost:
|
||||
"</tr>" ,
|
||||
bg,//LIGHT_BLUE ,
|
||||
ipbuf3, h->m_httpPort,
|
||||
coll, sort,
|
||||
cs, sort,
|
||||
i ,
|
||||
h->m_hostname,
|
||||
h->m_shardNum,//group,
|
||||
@ -986,7 +989,7 @@ skipReplaceHost:
|
||||
LIGHT_BLUE,
|
||||
ipbuf3,
|
||||
h->m_httpPort,
|
||||
coll,
|
||||
cs,
|
||||
i ,
|
||||
|
||||
type,
|
||||
|
Reference in New Issue
Block a user