Files
privacore-open-source-searc…/Loop.cpp
2016-08-30 14:28:33 +02:00

934 lines
28 KiB
C++

#include "gb-include.h"
#include "Loop.h"
#include "JobScheduler.h"
#include "UdpServer.h"
#include "HttpServer.h" // g_httpServer.m_tcp.m_numQueued
#include "Profiler.h"
#include "Process.h"
#include "PageParser.h"
#include "Conf.h"
#include "Stats.h"
#include <execinfo.h>
#include <sys/auxv.h>
#include <stdio.h>
#include <sys/time.h>
#include <sys/types.h>
#include <signal.h>
#include <fcntl.h> // fcntl()
#include <sys/poll.h> // POLLIN, POLLPRI, ...
// raised from 5000 to 10000 because we have more UdpSlots now and Multicast
// will call g_loop.registerSleepCallback() if it fails to get a UdpSlot to
// send on.
#define MAX_SLOTS 10000
// TODO: . if signal queue overflows another signal is sent
// . capture that signal and use poll or something???
// Tricky Gotchas:
// TODO: if an event happens on a TCP fd/socket before we fully accept it
// we should just register it then call the read callback in case
// we just missed a ready for reading signal!!!!!
// TODO: signals can be gotten off the queue after we've closed an fd
// in which case the handler should be removed from Loop's registry
// BEFORE being closed... so the handler will be NULL... ???
// NOTE: keep in mind that the signals might be delayed or be really fast!
// TODO: don't mask signals, catch them as they arrive? (like in phhttpd)
// The single event-loop instance used throughout this file (the signal
// handlers below reach it as g_loop); extern'd in the .h file.
Loop g_loop;
// The global niceness. callCallbacks_ass() temporarily sets it to the
// niceness of the slot whose callback is running (clamped to 0 or 1) and
// restores it afterwards.
char g_niceness = 0;
// Successor of the slot whose callback is currently being run by
// callCallbacks_ass(). unregisterCallback() advances it when the slot it
// points at is removed, so the iteration never follows a released slot.
static Slot *s_callbacksNext;
// free up all our mem
void Loop::reset() {
if ( m_slots ) {
log(LOG_DEBUG,"db: resetting loop");
mfree ( m_slots , MAX_SLOTS * sizeof(Slot) , "Loop" );
}
m_slots = NULL;
}
// Forward declarations of the signal handlers that Loop::init() installs
// with sigaction(); their definitions appear further down in this file.
static void sigbadHandler ( int x , siginfo_t *info , void *y ) ;
static void sigpwrHandler ( int x , siginfo_t *info , void *y ) ;
static void sighupHandler ( int x , siginfo_t *info , void *y ) ;
static void sigprofHandler(int signo, siginfo_t *info, void *context);
void Loop::unregisterReadCallback ( int fd, void *state , void (* callback)(int fd,void *state) ){
	// negative descriptors are ignored quietly (nothing is logged here)
	if ( fd >= 0 ) {
		// last argument: we are removing from the read table
		unregisterCallback ( m_readSlots , fd , state , callback , true );
	}
}
void Loop::unregisterWriteCallback ( int fd, void *state , void (* callback)(int fd,void *state)){
	// remove the (state,callback) pair from the write table; a negative
	// fd is rejected (and logged) inside unregisterCallback()
	const bool forReading = false;
	unregisterCallback ( m_writeSlots , fd , state , callback , forReading );
}
void Loop::unregisterSleepCallback ( void *state , void (* callback)(int fd,void *state)){
	// sleep callbacks are kept in the read table under the pseudo
	// descriptor MAX_NUM_FDS
	const int sleepFd = MAX_NUM_FDS;
	unregisterCallback ( m_readSlots , sleepFd , state , callback , true );
}
// Master fd sets for select(). doPoll() copies them on every pass because
// select() overwrites the sets it is given. addSlot() sets a bit when the
// first callback is registered for an fd; unregisterCallback() clears it
// when the last one is removed.
static fd_set s_selectMaskRead;
static fd_set s_selectMaskWrite;
// Unordered list of the fds that have read callbacks, so doPoll() only
// needs to inspect these instead of every possible fd.
static int s_readFds[MAX_NUM_FDS];
static int32_t s_numReadFds = 0;
// Same as above, for fds that have write callbacks.
static int s_writeFds[MAX_NUM_FDS];
static int32_t s_numWriteFds = 0;
// Remove every slot registered on "fd" whose callback AND state both match.
// . "slots" is m_readSlots or m_writeSlots; "forReading" says which, so the
//   matching fd list and select mask can be updated
// . when the last slot of an fd goes away the fd is dropped from the fd
//   list and from the select mask
// . for the sleep pseudo-fd (MAX_NUM_FDS) m_minTick is recomputed from the
//   slots that remain, capped at 40ms
void Loop::unregisterCallback ( Slot **slots , int fd , void *state , void (* callback)(int fd,void *state) ,
				bool forReading ) {
	// bad fd
	if ( fd < 0 ) {
		log(LOG_LOGIC, "loop: fd to unregister is negative.");
		return;
	}
	// addSlot() never registers anything above MAX_NUM_FDS, so there is
	// nothing to remove and indexing the table would be out of range
	if ( fd > MAX_NUM_FDS ) {
		log(LOG_LOGIC, "loop: fd to unregister is too big.");
		return;
	}

	// . smallest tick among the slots we KEEP
	// . sleep a min of 40ms so g_now is somewhat up to date
	int32_t min = 40;

	// last slot we kept, needed to splice removed slots out of the chain
	Slot *lastSlot = NULL;

	// chain through all callbacks registered with this fd
	Slot *s = slots [ fd ];
	while ( s ) {
		// grab the successor first, "s" may be released below
		Slot *next = s->m_next;

		// keep this slot unless both callback and state match
		if ( s->m_callback != callback || s->m_state != state ) {
			// only surviving slots may contribute to the new min tick
			if ( s->m_tick < min ) min = s->m_tick;
			lastSlot = s;
			s = next;
			continue;
		}

		// hand the slot back to the free list
		returnSlot ( s );

		// if it was the only slot left on this fd, remove the fd from
		// the fd list and clear its bit so doPoll() stays fast
		if ( slots[fd] == s && next == NULL ) {
			if ( forReading ) {
				for ( int32_t i = 0 ; i < s_numReadFds ; i++ ) {
					if ( s_readFds[i] != fd ) continue;
					// fill the hole with the last entry
					s_readFds[i] = s_readFds[s_numReadFds-1];
					s_numReadFds--;
					// remove from select mask too
					FD_CLR(fd,&s_selectMaskRead );
					if ( g_conf.m_logDebugLoop || g_conf.m_logDebugTcp ) {
						log( "loop: unregistering read callback for fd=%i", fd );
					}
					break;
				}
			}
			else {
				for ( int32_t i = 0 ; i < s_numWriteFds ; i++ ) {
					if ( s_writeFds[i] != fd ) continue;
					// fill the hole with the last entry
					s_writeFds[i] = s_writeFds[s_numWriteFds-1];
					s_numWriteFds--;
					// remove from select mask too
					FD_CLR(fd,&s_selectMaskWrite);
					if ( g_conf.m_logDebugLoop || g_conf.m_logDebugTcp ) {
						log( LOG_DEBUG, "loop: unregistering write callback for fd=%" PRId32" from write #wrts=%" PRId32,
						     ( int32_t ) fd, ( int32_t ) s_numWriteFds );
					}
					break;
				}
			}
		}

		// excise the slot from the linked list
		if ( lastSlot ) lastSlot->m_next = next;
		else            slots[fd]        = next;

		// if callCallbacks_ass() was about to visit this slot, make it
		// continue with our successor instead
		if ( s_callbacksNext == s ) s_callbacksNext = next;

		// advance to the next slot
		s = next;
	}

	// set our new minTick if we were unregistering a sleep callback
	if ( fd == MAX_NUM_FDS ) {
		m_minTick = min;
	}
}
bool Loop::registerReadCallback ( int fd, void *state, void (* callback)(int fd,void *state ) , int32_t niceness ) {
	// first argument of addSlot() is "forReading"
	const bool registered = addSlot ( true, fd, state, callback, niceness );
	if ( ! registered ) {
		log( LOG_WARN, "loop: Unable to register read callback." );
	}
	return registered;
}
bool Loop::registerWriteCallback ( int fd, void *state, void (* callback)(int fd, void *state ) , int32_t niceness ) {
	// first argument of addSlot() is "forReading", so false == write
	const bool registered = addSlot ( false, fd, state, callback, niceness );
	if ( ! registered ) {
		log( LOG_WARN, "loop: Unable to register write callback.");
	}
	return registered;
}
// . tick is in milliseconds
// . sleep callbacks are stored as read slots on the pseudo-fd MAX_NUM_FDS
bool Loop::registerSleepCallback ( int32_t tick, void *state, void (* callback)(int fd,void *state ),
				   int32_t niceness, bool immediate ) {
	const bool added = addSlot ( true, MAX_NUM_FDS, state, callback, niceness, tick, immediate );
	if ( added ) {
		// remember the shortest tick registered so far
		if ( m_minTick > tick ) {
			m_minTick = tick;
		}
		return true;
	}
	log( LOG_WARN, "loop: Unable to register sleep callback" );
	return false;
}
// . returns false and sets g_errno on error
bool Loop::addSlot ( bool forReading , int fd, void *state, void (* callback)(int fd, void *state),
int32_t niceness , int32_t tick, bool immediate ) {
// ensure fd is >= 0
if ( fd < 0 ) {
g_errno = EBADENGINEER;
log(LOG_LOGIC,"loop: fd to register is negative.");
return false;
}
// sanity
if ( fd > MAX_NUM_FDS ) {
log("loop: bad fd of %" PRId32,(int32_t)fd);
g_process.shutdownAbort(true);
}
if ( g_conf.m_logDebugLoop || g_conf.m_logDebugTcp ) {
log( LOG_DEBUG, "loop: registering %s callback sd=%i", forReading ? "read" : "write", fd);
}
// . ensure fd not already registered with this callback/state
// . prevent dups so you can keep calling register w/o fear
Slot *s;
if ( forReading ) {
s = m_readSlots [ fd ];
} else {
s = m_writeSlots [ fd ];
}
while ( s ) {
if ( s->m_callback == callback &&
s->m_state == state ) {
// don't set g_errno for this anymore, just bitch
//g_errno = EBADENGINEER;
log(LOG_LOGIC,"loop: fd=%i is already registered.",fd);
return true;
}
s = s->m_next;
}
// . make a new slot
// . TODO: implement mprimealloc() to pre-alloc slots for us for speed
//s = (Slot *) mmalloc ( sizeof(Slot ) ,"Loop");
s = getEmptySlot ( );
if ( ! s ) return false;
// for pointing to slot already in position for fd
Slot *next ;
// store ourselves in the slot for this fd
if ( forReading ) {
next = m_readSlots [ fd ];
m_readSlots [ fd ] = s;
// if not already registered, add to list
if ( fd < MAX_NUM_FDS && ! FD_ISSET( fd,&s_selectMaskRead ) ) {
// sanity
if ( s_numReadFds >= MAX_NUM_FDS){
g_process.shutdownAbort(true);
}
s_readFds[s_numReadFds++] = fd;
FD_SET ( fd,&s_selectMaskRead );
}
// fd == MAX_NUM_FDS if it's a sleep callback
//if ( fd < MAX_NUM_FDS ) {
//FD_SET ( fd , &m_readfds );
//FD_SET ( fd , &m_exceptfds );
//}
}
else {
next = m_writeSlots [ fd ];
m_writeSlots [ fd ] = s;
//FD_SET ( fd , &m_writefds );
// if not already registered, add to list
if ( fd<MAX_NUM_FDS && ! FD_ISSET ( fd,&s_selectMaskWrite ) ) {
// sanity
if ( s_numWriteFds>=MAX_NUM_FDS){
g_process.shutdownAbort(true);
}
s_writeFds[s_numWriteFds++] = fd;
FD_SET ( fd,&s_selectMaskWrite );
}
}
// set our callback and state
s->m_callback = callback;
s->m_state = state;
// point to the guy that was registered for fd before us
s->m_next = next;
// save our niceness for doPoll()
s->m_niceness = niceness;
// store the tick for sleep wrappers (should be max for others)
s->m_tick = tick;
// the last called time
s->m_lastCall = immediate ? 0 : gettimeofdayInMilliseconds();
// debug msg
//log("Loop::registered fd=%i state=%" PRIu32,fd,state);
// if fd == MAX_NUM_FDS if it's a sleep callback
if ( fd == MAX_NUM_FDS ) {
return true;
}
// watch out for big bogus fds used for thread exit callbacks
if ( fd > MAX_NUM_FDS ) {
return true;
}
// set fd non-blocking
return setNonBlocking ( fd , niceness ) ;
}
// . now make sure we're listening for an interrupt on this fd
// . set it non-blocking and enable signal catching for it
// . listen for an interrupt for this fd
bool Loop::setNonBlocking ( int fd , int32_t niceness ) {
	// fetch the current file status flags, restarting if interrupted
	int flags;
	do {
		flags = fcntl ( fd , F_GETFL ) ;
	} while ( flags < 0 && errno == EINTR );

	if ( flags < 0 ) {
		g_errno = errno;
		log( LOG_WARN, "loop: fcntl(F_GETFL): %s.",strerror(errno));
		return false;
	}

	// add O_NONBLOCK and O_ASYNC to whatever was set already, again
	// restarting if interrupted
	int rc;
	do {
		rc = fcntl ( fd, F_SETFL, flags|O_NONBLOCK|O_ASYNC);
	} while ( rc < 0 && errno == EINTR );

	if ( rc < 0 ) {
		g_errno = errno;
		log( LOG_WARN, "loop: fcntl(NONBLOCK): %s.",strerror(errno));
		return false;
	}

	// we use select()/poll now so nothing more to set up
	return true;
}
// . if "forReading" is true call callbacks registered for reading on "fd"
// . if "forReading" is false call callbacks registered for writing on "fd"
// . if fd is MAX_NUM_FDS and "forReading" is true call all sleepy callbacks
//   whose tick has elapsed as of "now" (milliseconds)
// . if "niceness" is 0 only niceness-0 slots are called
// . every callback sees the g_errno that was set when we were entered, and
//   g_niceness is switched to the slot's niceness while it runs
void Loop::callCallbacks_ass ( bool forReading , int fd , int64_t now , int32_t niceness ) {
	// save the g_errno to send to all callbacks
	const int saved_errno = g_errno;

	// get the first Slot in the chain that is waiting on this fd
	Slot *s = forReading ? m_readSlots [ fd ] : m_writeSlots [ fd ];

	// . now call all the callbacks
	// . most will re-register themselves (i.e. call registerCallback...()
	while ( s ) {
		// NOTE: the callback can unregister slots on this fd, including
		// the one we would visit next. unregisterCallback() keeps
		// s_callbacksNext valid for us in that case.
		s_callbacksNext = s->m_next;

		// skip this slot if he has no callback. we must still advance,
		// a bare "continue" here would spin on the same slot forever.
		if ( ! s->m_callback ) {
			s = s_callbacksNext;
			continue;
		}

		// watch out if clock was set back
		if ( s->m_lastCall > now ) {
			s->m_lastCall = now;
		}

		// if we're a sleep callback, check to make sure not premature
		if ( fd == MAX_NUM_FDS && s->m_lastCall + s->m_tick > now ) {
			s = s_callbacksNext;
			continue;
		}

		// skip if not a niceness match
		if ( niceness == 0 && s->m_niceness != 0 ) {
			s = s_callbacksNext;
			continue;
		}

		// update the lastCall timestamp for this slot
		if ( fd == MAX_NUM_FDS ) {
			s->m_lastCall = now;
		}

		logDebug( g_conf.m_logDebugLoop, "loop: enter fd callback fd=%d nice=%" PRId32, fd, s->m_niceness );

		// sanity check. -1 no longer supported
		if ( s->m_niceness < 0 ) {
			g_process.shutdownAbort(true);
		}

		// remember the niceness for the exit log: once the callback has
		// run the slot may have been unregistered and reused
		const int32_t slotNiceness = (int32_t)s->m_niceness;

		// Temporarily (for the duration of the callback call) switch
		// niceness to the niceness of the slot
		int32_t saved_niceness = g_niceness;
		g_niceness = s->m_niceness;
		// make sure not 2
		if ( g_niceness >= 2 ) {
			g_niceness = 1;
		}

		s->m_callback ( fd , s->m_state );

		// restore niceness
		g_niceness = saved_niceness;

		logDebug( g_conf.m_logDebugLoop, "loop: exit fd callback fd=%" PRId32" nice=%" PRId32,
			  (int32_t)fd,slotNiceness );

		// reset g_errno so all callbacks for this fd get same g_errno
		g_errno = saved_errno;

		// move on (NULL if there is no slot after this one)
		s = s_callbacksNext;
	}

	s_callbacksNext = NULL;
}
// Put the loop into a well-defined "not yet initialized" state. The slot
// pool and the wakeup pipe are created later by init().
Loop::Loop ( ) {
	m_isDoingLoop = false;
	// set all callbacks to NULL so we know they're empty. the entry at
	// index MAX_NUM_FDS holds the sleep callbacks.
	for ( int32_t i = 0 ; i < MAX_NUM_FDS+2 ; i++ ) {
		m_readSlots [i] = NULL;
		m_writeSlots[i] = NULL;
	}
	// no slot pool yet, so the free list must be empty too. that way
	// getEmptySlot() fails cleanly if somebody registers before init().
	m_slots = NULL;
	m_head  = NULL;
	m_tail  = NULL;
	// same starting values init() uses
	m_shutdown = 0;
	m_minTick  = 40;
	// wakeup pipe not open yet
	m_pipeFd[0] = -1;
	m_pipeFd[1] = -1;
}
// release the slot pool and close both ends of the wakeup pipe
Loop::~Loop ( ) {
	reset();
	// index 0 is the read end, index 1 the write end
	for ( int end = 0 ; end < 2 ; end++ ) {
		if ( m_pipeFd[end] < 0 ) continue;
		close(m_pipeFd[end]);
		m_pipeFd[end] = -1;
	}
}
// . pop a slot off the head of the free list
// . returns NULL and sets g_errno if none are left
Slot *Loop::getEmptySlot ( ) {
	Slot *slot = m_head;
	if ( slot ) {
		// unlink it, the next free slot becomes the new head
		m_head = slot->m_nextAvail;
		return slot;
	}
	g_errno = EBUFTOOSMALL;
	log("loop: No empty slots available. "
	    "Increase #define MAX_SLOTS.");
	return NULL;
}
void Loop::returnSlot ( Slot *s ) {
	// push the slot back onto the front of the free list
	Slot *oldHead = m_head;
	m_head = s;
	s->m_nextAvail = oldHead;
}
// Install "handler" (SA_SIGINFO | SA_RESTART, empty signal mask) for each
// of the "count" signal numbers in "signums", in order.
static void installSiginfoHandler ( void (*handler)(int, siginfo_t *, void *) ,
				    const int *signums , int count ) {
	struct sigaction sa;
	sigemptyset(&sa.sa_mask);
	sa.sa_flags = SA_SIGINFO | SA_RESTART;
	sa.sa_sigaction = handler;
	for ( int i = 0 ; i < count ; i++ ) {
		sigaction(signums[i], &sa, NULL);
	}
}

// . one-time setup: select masks, wakeup pipe, slot pool, signal handlers
// . returns false if the pipe or the slot pool could not be created
bool Loop::init ( ) {
	// clear this up here before using in doPoll()
	FD_ZERO(&s_selectMaskRead);
	FD_ZERO(&s_selectMaskWrite);

	// set-up wakeup pipe. the read end is selected on in doPoll(), the
	// write end is poked by wakeupPollLoop().
	if(pipe(m_pipeFd)!=0) {
		log(LOG_ERROR,"pipe() failed with errno=%d",errno);
		return false;
	}
	setNonBlocking(m_pipeFd[0],0);
	setNonBlocking(m_pipeFd[1],0);
	FD_SET(m_pipeFd[0],&s_selectMaskRead);

	// the signal handlers set this to non-zero so we know when to shutdown
	m_shutdown = 0;

	// . reset this cuz we have no sleep callbacks right now
	// . sleep a min of 40ms so g_now is somewhat up to date
	m_minTick = 40;

	// allocate the slot pool in one go
	m_slots = (Slot *) mmalloc ( MAX_SLOTS * (int32_t)sizeof(Slot) , "Loop" );
	if ( ! m_slots ) return false;

	log(LOG_DEBUG,"loop: Allocated %" PRId32" bytes for %" PRId32" callbacks.",
	    MAX_SLOTS * (int32_t)sizeof(Slot),(int32_t)MAX_SLOTS);

	// thread the free list through the pool: each slot points at its
	// right-hand neighbour, the last one terminates the list
	for ( int32_t i = 0 ; i < MAX_SLOTS ; i++ ) {
		m_slots[i].m_nextAvail = ( i + 1 < MAX_SLOTS ) ? &m_slots[i+1] : NULL;
	}
	m_head = &m_slots[0];
	m_tail = &m_slots[MAX_SLOTS - 1];

	// Ignore SIGPIPE. We want a plain error return from system calls
	// instead.
	struct sigaction ignorePipe;
	ignorePipe.sa_handler = SIG_IGN;
	sigemptyset(&ignorePipe.sa_mask);
	ignorePipe.sa_flags = 0;
	sigaction(SIGPIPE,&ignorePipe,NULL);

	// handle SIGHUP and SIGTERM signals gracefully by saving and
	// shutting down
	const int shutdownSignals[] = { SIGHUP, SIGTERM };
	installSiginfoHandler ( sighupHandler , shutdownSignals , 2 );

	// we should save our data on segv, sigill, sigfpe, sigbus
	const int fatalSignals[] = { SIGSEGV, SIGILL, SIGFPE, SIGBUS };
	installSiginfoHandler ( sigbadHandler , fatalSignals , 4 );

	// if the UPS is about to go off it sends a SIGPWR
	const int powerSignals[] = { SIGPWR };
	installSiginfoHandler ( sigpwrHandler , powerSignals , 1 );

	// . SIGPROF is used by the profiler
	// . setitimer(ITIMER_PROF...) is called when profiling is
	//   enabled/disabled. it has noticeable overhead so it must not be
	//   enabled by default.
	const int profileSignals[] = { SIGPROF };
	installSiginfoHandler ( sigprofHandler , profileSignals , 1 );

	// success
	return true;
}
// TODO: if we get a segfault while saving, what then?
void sigpwrHandler ( int /*signo*/ , siginfo_t * /*info*/ , void * /*context*/ ) {
	// 3 == power failure; runLoop() picks the flag up on its next pass
	// and performs the shutdown
	g_loop.m_shutdown = 3;
}
// Log the current call stack as one ready-to-run "addr2line" command per
// frame (at most 200 frames), all at LOG_ERROR level.
// . "print_location" is currently not used by this function
void printStackTrace (bool print_location) {
	logf(LOG_ERROR, "gb: Printing stack trace");

	static void *s_bt[200];
	const int depth = backtrace(s_bt, 200);

	// path we were exec'ed with, so addr2line can find our binary.
	// getauxval() returns 0 if the entry is missing, never hand a NULL
	// to %s.
	const char *process = (const char*)getauxval(AT_EXECFN);
	if ( ! process ) {
		process = "(unknown)";
	}

	for ( int i = 0 ; i < depth ; ++i ) {
		// format straight into the logger. the path can be far longer
		// than a small fixed-size stack buffer, so do not sprintf()
		// into one.
		logf(LOG_ERROR, "addr2line -e %s 0x%" PRIx64, process, (uint64_t)s_bt[i]);
	}
}
// TODO: if we get a segfault while saving, what then?
void sigbadHandler ( int x , siginfo_t *info , void *y ) {
	log("loop: sigbadhandler. disabling handler from recall.");

	// . don't allow this handler to be called again: install a NULL
	//   action for every fatal signal
	// . does this work if we're in a thread?
	struct sigaction sa;
	sigemptyset (&sa.sa_mask);
	sa.sa_flags = SA_SIGINFO ;
	sa.sa_sigaction = NULL;
	const int fatalSignals[] = { SIGSEGV, SIGILL, SIGFPE, SIGBUS, SIGQUIT, SIGSYS };
	const int numFatal = (int)( sizeof(fatalSignals) / sizeof(fatalSignals[0]) );
	for ( int i = 0 ; i < numFatal ; i++ ) {
		sigaction(fatalSignals[i], &sa, NULL);
	}

	// only the first bad signal does the work below
	if ( ! g_loop.m_shutdown ) {
		// unwind
		printStackTrace();
		// 2 == bad signal; let the main loop know to shutdown
		g_loop.m_shutdown = 2;
		log("loop: sigbadhandler. trying to save now. mode=%" PRId32, (int32_t)g_process.m_mode);
		// . this will save all Rdb's
		// . if "urgent" is true it will dump core
		// . if "urgent" is true it won't broadcast its shutdown to
		//   all hosts
		g_process.shutdown ( true );
		return;
	}

	// we've already been here, or don't need to be, so bail
	log("loop: sigbadhandler. shutdown already called.");
}
static void sigprofHandler(int /*signo*/, siginfo_t * /*info*/, void * /*context*/) {
	// SIGPROF only arrives while profiling is enabled; let the profiler
	// record where we are right now
	g_profiler.getStackFrame();
}
// NOTE: we can't make this realtime! RdbClose() cannot be called by a
// real-time signal handler.
void sighupHandler ( int /*signo*/ , siginfo_t * /*info*/ , void * /*context*/ ) {
	// 1 == SIGHUP/SIGTERM; runLoop() picks the flag up on its next pass
	// and performs the shutdown
	g_loop.m_shutdown = 1;
}
// . timestamp (milliseconds) of the last time doPoll() called the sleep
//   callbacks; reset to 0 when runLoop() starts
// . doPoll() calls them again once at least m_minTick ms have passed since
//   this timestamp
static int64_t s_lastTime = 0;
void Loop::runLoop ( ) {
	// . build a signal set holding SIGCHLD and SIGIO
	// . NOTE: nothing in this function hands the set to the kernel, it
	//   is only populated here
	sigset_t sigs0;
	sigemptyset ( &sigs0 );
	sigaddset ( &sigs0, SIGCHLD );
	sigaddset ( &sigs0, SIGIO );

	// make doPoll() run the sleep callbacks on its first pass
	s_lastTime = 0;

	m_isDoingLoop = true;

	// . poll forever
	// . each pass first checks whether a signal handler asked us to
	//   shut down
	while ( true ) {
		g_errno = 0;

		if ( m_shutdown ) {
			// say which handler set the flag
			switch ( m_shutdown ) {
			case 1:
				log(LOG_INIT,"loop: got SIGHUP or SIGTERM.");
				break;
			case 2:
				log(LOG_INIT,"loop: got SIGBAD in thread.");
				break;
			default:
				log(LOG_INIT,"loop: got SIGPWR.");
				break;
			}
			// . TODO: turn off signals for sigbadhandler()
			// . whatever the reason was, save and shut down
			log ( LOG_INIT ,"loop: Saving and shutting down urgently.");
			g_process.shutdown ( true );
		}

		//
		// THE HEART OF GB. process events/signals on FDs.
		//
		doPoll();
	}
}
//--- TODO: flush the signal queue after polling until done
//--- are we getting stale signals resolved by flush so we get
//--- read event on a socket that isnt in read mode???
// TODO: set signal handler to SIG_DFL to prevent signals from queuing up now
// . this handles high priority fds first (lowest niceness)
// One pass of the event loop:
// . select() on all registered fds, waiting at most QUICKPOLL_INTERVAL ms
// . call read, then write callbacks of fds whose head slot has niceness 0
// . then the same for fds whose head slot has a higher niceness
// . finally call the sleep callbacks if at least m_minTick ms have passed
//   since they were last called
// . finished jobs are cleaned up between the phases
// . on a select() error other than EINTR, g_errno is set and we return
void Loop::doPoll ( ) {
	logDebug( g_conf.m_logDebugLoop, "loop: Entered doPoll." );

	if(g_udpServer.needBottom()) {
		g_udpServer.makeCallbacks(1);
	}

	int32_t n;

	// upper bound on how long select() may block, so the sleep
	// callbacks get their chance to be called
	timeval v;
	v.tv_sec  = 0;
	v.tv_usec = QUICKPOLL_INTERVAL * 1000;

 again:
	// gotta copy to our own since bits get cleared by select() function
	fd_set readfds  = s_selectMaskRead;
	fd_set writefds = s_selectMaskWrite;

	logDebug( g_conf.m_logDebugLoop, "loop: in select" );

	// . wait for activity on the fds
	// . a signal arriving meanwhile knocks us out of select() with
	//   n < 0 and errno equal to EINTR
	n = select (MAX_NUM_FDS,
		    &readfds,
		    &writefds,
		    NULL,//&exceptfds,
		    &v );

	if ( n >= 0 ) errno = 0;

	logDebug( g_conf.m_logDebugLoop, "loop: out select n=%" PRId32" errno=%" PRId32" errnomsg=%s ms_wait=%i",
		  (int32_t)n,(int32_t)errno,mstrerror(errno), (int)v.tv_sec*1000);

	if ( n < 0 ) {
		// interrupted by a signal
		if ( errno == EINTR ) {
			// if shutting down was it a sigterm ?
			if ( m_shutdown ) goto again;
			// handle returned threads for niceness 0
			g_jobScheduler.cleanup_finished_jobs();
			// high niceness threads
			g_jobScheduler.cleanup_finished_jobs();
			goto again;
		}
		g_errno = errno;
		log( LOG_WARN, "loop: select: %s.", strerror( g_errno ) );
		return;
	}

	logDebug( g_conf.m_logDebugLoop, "loop: Got %" PRId32" fds waiting.", n );

	if (g_conf.m_logDebugLoop || g_conf.m_logDebugTcp) {
		for ( int32_t i = 0; i < MAX_NUM_FDS; i++) {
			if ( FD_ISSET ( i, &readfds ) ) {
				log( LOG_DEBUG, "loop: fd=%" PRId32" is on for read", i);
			}
			if ( FD_ISSET ( i, &writefds ) ) {
				log( LOG_DEBUG, "loop: fd=%" PRId32" is on for write", i);
			}
		}
	}

	// handle returned threads for niceness 0
	g_jobScheduler.cleanup_finished_jobs();

	const int64_t now = gettimeofdayInMilliseconds();

	if( n > 0 && FD_ISSET( m_pipeFd[0], &readfds ) ) {
		// drain the wakeup pipe. it has no slot, it only exists to
		// knock us out of select().
		char buf[32];
		(void)read( m_pipeFd[0], buf, sizeof(buf) );
		n--;
		FD_CLR( m_pipeFd[0], &readfds );
	}

	// now keep this fast, too. just check fds we need to.
	// NOTE: in the debug logs below the head slot is only dereferenced
	// if it is non-NULL, -1 is logged otherwise.
	for ( int32_t i = 0 ; i < s_numReadFds ; i++ ) {
		if ( n == 0 ) break;
		int fd = s_readFds[i];
		Slot *s = m_readSlots [ fd ];
		// if niceness is not 0, handle it below
		if ( s && s->m_niceness > 0 ) continue;
		// must be set
		if ( ! FD_ISSET ( fd , &readfds ) ) continue;
		if ( g_conf.m_logDebugLoop || g_conf.m_logDebugTcp ) {
			log( LOG_DEBUG, "loop: calling cback0 niceness=%" PRId32" fd=%i",
			     s ? (int32_t)s->m_niceness : (int32_t)-1, fd );
		}
		callCallbacks_ass (true,fd, now,0);//read?
	}

	for ( int32_t i = 0 ; i < s_numWriteFds ; i++ ) {
		if ( n == 0 ) break;
		int fd = s_writeFds[i];
		Slot *s = m_writeSlots [ fd ];
		// if niceness is not 0, handle it below
		if ( s && s->m_niceness > 0 ) continue;
		// must be set
		if ( ! FD_ISSET ( fd , &writefds ) ) continue;
		if ( g_conf.m_logDebugLoop || g_conf.m_logDebugTcp ) {
			log( LOG_DEBUG, "loop: calling wcback0 niceness=%" PRId32" fd=%i",
			     s ? (int32_t)s->m_niceness : (int32_t)-1, fd );
		}
		callCallbacks_ass (false,fd, now,0);//false=forRead?
	}

	// handle returned threads for niceness 0
	g_jobScheduler.cleanup_finished_jobs();

	// now for lower priority fds
	for ( int32_t i = 0 ; i < s_numReadFds ; i++ ) {
		if ( n == 0 ) break;
		int fd = s_readFds[i];
		Slot *s = m_readSlots [ fd ];
		// if niceness is <= 0 we did it above
		if ( s && s->m_niceness <= 0 ) continue;
		// must be set
		if ( ! FD_ISSET ( fd , &readfds ) ) continue;
		if ( g_conf.m_logDebugLoop || g_conf.m_logDebugTcp ) {
			log( LOG_DEBUG, "loop: calling cback1 niceness=%" PRId32" fd=%i",
			     s ? (int32_t)s->m_niceness : (int32_t)-1, fd );
		}
		callCallbacks_ass (true,fd, now,1);//read?
	}

	for ( int32_t i = 0 ; i < s_numWriteFds ; i++ ) {
		if ( n == 0 ) break;
		int fd = s_writeFds[i];
		Slot *s = m_writeSlots [ fd ];
		// if niceness is <= 0 we did it above
		if ( s && s->m_niceness <= 0 ) continue;
		// must be set
		if ( ! FD_ISSET ( fd , &writefds ) ) continue;
		if ( g_conf.m_logDebugLoop || g_conf.m_logDebugTcp ) {
			log( LOG_DEBUG, "loop: calling wcback1 niceness=%" PRId32" fd=%i",
			     s ? (int32_t)s->m_niceness : (int32_t)-1, fd );
		}
		callCallbacks_ass (false,fd, now,1);//forread?
	}

	// handle returned threads for all other nicenesses
	g_jobScheduler.cleanup_finished_jobs();

	// . call sleepers if they need it
	// . keep the difference in 64 bits: s_lastTime starts out as 0, so
	//   the first difference is a full wall-clock timestamp that does
	//   not fit into 32 bits
	int64_t elapsed = gettimeofdayInMilliseconds() - s_lastTime;
	// if someone changed the system clock on us, this could be negative
	// so fix it! otherwise, times may NEVER get called in our lifetime
	if ( elapsed < 0 ) {
		elapsed = m_minTick;
	}

	if ( elapsed >= m_minTick ) {
		// MAX_NUM_FDS is the fd for sleep callbacks
		callCallbacks_ass ( true , MAX_NUM_FDS , gettimeofdayInMilliseconds() );
		// note the last time we called them
		s_lastTime = gettimeofdayInMilliseconds();
		// handle returned threads for all other nicenesses
		g_jobScheduler.cleanup_finished_jobs();
	}

	logDebug( g_conf.m_logDebugLoop, "loop: Exited doPoll.");
}
void Loop::wakeupPollLoop() {
	// one byte on the write end makes the read end readable, which
	// knocks doPoll() out of select(). the value itself is irrelevant
	// and the result of write() is deliberately ignored.
	const char wakeByte = 'd';
	(void)write(m_pipeFd[1],&wakeByte,1);
}
int gbsystem(const char *cmd ) {
	// log the command first, then hand it to system() and pass the
	// result through untouched
	log("gb: running system(\"%s\")",cmd);
	return system(cmd);
}