privacore-open-source-searc.../Loop.cpp
Ivan Skytte Jørgensen beeddcf35d Got rid of gb-include.h
2018-07-26 17:29:51 +02:00

881 lines
25 KiB
C++

#include "Loop.h"
#include "JobScheduler.h"
#include "UdpServer.h"
#include "HttpServer.h" // g_httpServer.m_tcp.m_numQueued
#include "Profiler.h"
#include "Process.h"
#include "PageParser.h"
#include "Conf.h"
#include "ScopedLock.h"
#include "Mem.h"
#include "InstanceInfoExchange.h"
#include "Stats.h"
#include "GbDns.h"
#include "Errno.h"
#include <execinfo.h>
#include <sys/auxv.h>
#include <stdio.h>
#include <sys/time.h>
#include <sys/types.h>
#include <signal.h>
#include <fcntl.h> // fcntl()
#include <unistd.h>
#include <sys/poll.h> // POLLIN, POLLPRI, ...
// raised from 5000 to 10000 because we have more UdpSlots now and Multicast
// will call g_loop.registerSleepCallback() if it fails to get a UdpSlot to
// send on.
#define MAX_SLOTS 10000
// TODO: . if signal queue overflows another signal is sent
// . capture that signal and use poll or something???
// Tricky Gotchas:
// TODO: if an event happens on a TCP fd/socket before we fully accept it
// we should just register it then call the read callback in case
// we just missed a ready for reading signal!!!!!
// TODO: signals can be gotten off the queue after we've closed an fd
// in which case the handler should be removed from Loop's registry
// BEFORE being closed... so the handler will be NULL... ???
// NOTE: keep in mind that the signals might be delayed or be really fast!
// TODO: don't mask signals, catch them as they arrive? (like in phhttpd)
class Slot {
public:
void *m_state;
void (* m_callback)(int fd, void *state);
// the next Slot thats registerd on this fd
Slot *m_next;
// save niceness level for doPoll() to segregate
int32_t m_niceness;
// this callback should be called every X milliseconds
int32_t m_tick;
// when we were last called in ms time (only valid for sleep callbacks)
int64_t m_lastCall;
const char *m_description;
// linked list of available slots
Slot *m_nextAvail;
};
// a global class extern'd in .h file
Loop g_loop;
// free up all our mem
void Loop::reset() {
if ( m_slots ) {
log(LOG_DEBUG,"db: resetting loop");
mfree ( m_slots , MAX_SLOTS * sizeof(Slot) , "Loop" );
}
m_slots = NULL;
}
static void sigpwrHandler ( int x , siginfo_t *info , void *y ) ;
static void sighupHandler ( int x , siginfo_t *info , void *y ) ;
static void sigprofHandler(int signo, siginfo_t *info, void *context);
void Loop::unregisterReadCallback ( int fd, void *state , void (* callback)(int fd,void *state) ){
if ( fd < 0 ) return;
// from reading
unregisterCallback ( m_readSlots,fd, state , callback, true);
}
void Loop::unregisterWriteCallback ( int fd, void *state , void (* callback)(int fd,void *state)){
// from writing
unregisterCallback ( m_writeSlots , fd , state,callback,false);
}
void Loop::unregisterSleepCallback ( void *state , void (* callback)(int fd,void *state)){
unregisterCallback (m_readSlots,MAX_NUM_FDS,state,callback,true);
}
static fd_set s_selectMaskRead;
static fd_set s_selectMaskWrite;
static int s_readFds[MAX_NUM_FDS];
static int32_t s_numReadFds = 0;
static int s_writeFds[MAX_NUM_FDS];
static int32_t s_numWriteFds = 0;
void Loop::unregisterCallback(Slot **slots, int fd, void *state, void (* callback)(int fd,void *state), bool forReading) {
// bad fd
if(fd<0) {
log(LOG_LOGIC, "loop: fd to unregister is negative.");
return;
}
// . keep track of new min tick for sleep callbacks
// . sleep a min of 40ms so g_now is somewhat up to date
int32_t min = 40; // 0x7fffffff;
ScopedLock sl(m_slotMutex);
// chase through all callbacks registered with this fd
Slot *prevSlot = NULL;
for(Slot *s = slots[fd]; s; ) {
// get the next slot (NULL if no more)
Slot *next = s->m_next;
if(s->m_callback == callback &&
s->m_state == state) {
// free this slot since it callback matches "callback"
returnSlot ( s );
// if the last one, then remove the FD from s_fdList
// so and clear a bit so doPoll() function is fast
if(slots[fd] == s && s->m_next == NULL) {
if(forReading) {
for(int32_t i = 0; i < s_numReadFds; i++ ) {
if (s_readFds[i] == fd) {
s_readFds[i] = s_readFds[s_numReadFds-1];
s_numReadFds--;
// remove from select mask too
FD_CLR(fd,&s_selectMaskRead );
if(g_conf.m_logDebugLoop || g_conf.m_logDebugTcp) {
log( "loop: unregistering read callback for fd=%i", fd );
}
break;
}
}
} else {
for(int32_t i = 0; i < s_numWriteFds; i++ ) {
if(s_writeFds[i] == fd) {
s_writeFds[i] = s_writeFds[s_numWriteFds-1];
s_numWriteFds--;
// remove from select mask too
FD_CLR(fd,&s_selectMaskWrite);
if(g_conf.m_logDebugLoop || g_conf.m_logDebugTcp) {
log( LOG_DEBUG, "loop: unregistering write callback for fd=%" PRId32" from write #wrts=%" PRId32,
( int32_t ) fd, ( int32_t ) s_numWriteFds );
}
break;
}
}
}
}
// excise the previous slot from linked list
if(prevSlot)
prevSlot->m_next = next;
else
slots[fd] = next;
// watch out if we're in the previous callback, we need to
// fix the linked list in callCallbacks_ass
if(m_callbacksNext == s)
m_callbacksNext = next;
} else {
prevSlot = s;
// if we're unregistering a sleep callback
// we might have to recalculate m_minTick
if(s->m_tick < min)
min = s->m_tick;
}
// advance to the next slot
s = next;
}
// set our new minTick if we were unregistering a sleep callback
if ( fd == MAX_NUM_FDS ) {
m_minTick = min;
}
return;
}
bool Loop::registerReadCallback(int fd, void *state, void (*callback)(int fd, void *state),
const char *description, int32_t niceness) {
// the "true" answers the question "for reading?"
if (addSlot(true, fd, state, callback, niceness, description)) {
return true;
}
log( LOG_WARN, "loop: Unable to register read callback." );
return false;
}
bool Loop::registerWriteCallback(int fd, void *state, void (*callback)(int fd, void *state),
const char *description, int32_t niceness) {
// the "false" answers the question "for reading?"
if (addSlot(false, fd, state, callback, niceness, description)) {
return true;
}
log( LOG_WARN, "loop: Unable to register write callback.");
return false;
}
// tick is in milliseconds
bool Loop::registerSleepCallback(int32_t tick, void *state, void (*callback)(int fd, void *state),
const char *description, int32_t niceness, bool immediate) {
if (!addSlot(true, MAX_NUM_FDS, state, callback, niceness, description, tick, immediate)) {
log( LOG_WARN, "loop: Unable to register sleep callback" );
return false;
}
ScopedLock sl(m_slotMutex);
if ( tick < m_minTick ) {
m_minTick = tick;
}
return true;
}
// . returns false and sets g_errno on error
bool Loop::addSlot(bool forReading, int fd, void *state, void (*callback)(int fd, void *state),
int32_t niceness, const char *description, int32_t tick, bool immediate) {
// ensure fd is >= 0
if ( fd < 0 ) {
g_errno = EBADENGINEER;
log(LOG_LOGIC,"loop: fd to register is negative.");
return false;
}
// sanity
if ( fd > MAX_NUM_FDS ) {
log(LOG_ERROR, "loop: bad fd of %" PRId32,(int32_t)fd);
g_process.shutdownAbort(true);
}
if ( g_conf.m_logDebugLoop || g_conf.m_logDebugTcp ) {
log( LOG_DEBUG, "loop: registering %s callback sd=%i", forReading ? "read" : "write", fd);
}
ScopedLock sl(m_slotMutex);
// . ensure fd not already registered with this callback/state
// . prevent dups so you can keep calling register w/o fear
Slot *s;
if ( forReading ) {
s = m_readSlots [ fd ];
} else {
s = m_writeSlots [ fd ];
}
while ( s ) {
if ( s->m_callback == callback &&
s->m_state == state ) {
// don't set g_errno for this anymore, just bitch
//g_errno = EBADENGINEER;
log(LOG_LOGIC,"loop: fd=%i is already registered.",fd);
return true;
}
s = s->m_next;
}
// . make a new slot
// . TODO: implement mprimealloc() to pre-alloc slots for us for speed
//s = (Slot *) mmalloc ( sizeof(Slot ) ,"Loop");
s = getEmptySlot ( );
if ( ! s ) return false;
// for pointing to slot already in position for fd
Slot *next ;
// store ourselves in the slot for this fd
if ( forReading ) {
next = m_readSlots [ fd ];
m_readSlots [ fd ] = s;
// if not already registered, add to list
if ( fd < MAX_NUM_FDS && ! FD_ISSET( fd,&s_selectMaskRead ) ) {
// sanity
if ( s_numReadFds >= MAX_NUM_FDS){
g_process.shutdownAbort(true);
}
s_readFds[s_numReadFds++] = fd;
FD_SET ( fd,&s_selectMaskRead );
}
}
else {
next = m_writeSlots [ fd ];
m_writeSlots [ fd ] = s;
//FD_SET ( fd , &m_writefds );
// if not already registered, add to list
if ( fd<MAX_NUM_FDS && ! FD_ISSET ( fd,&s_selectMaskWrite ) ) {
// sanity
if ( s_numWriteFds>=MAX_NUM_FDS){
g_process.shutdownAbort(true);
}
s_writeFds[s_numWriteFds++] = fd;
FD_SET ( fd,&s_selectMaskWrite );
}
}
// set our callback and state
s->m_callback = callback;
s->m_state = state;
s->m_description = description;
// point to the guy that was registered for fd before us
s->m_next = next;
// save our niceness for doPoll()
s->m_niceness = niceness;
// store the tick for sleep wrappers (should be max for others)
s->m_tick = tick;
// the last called time
s->m_lastCall = immediate ? 0 : gettimeofdayInMilliseconds();
// if fd == MAX_NUM_FDS if it's a sleep callback
if ( fd == MAX_NUM_FDS ) {
return true;
}
// watch out for big bogus fds used for thread exit callbacks
if ( fd > MAX_NUM_FDS ) {
return true;
}
// set fd non-blocking
return setNonBlocking(fd);
}
// . now make sure we're listening for an interrupt on this fd
// . set it non-blocing and enable signal catching for it
// . listen for an interrupt for this fd
bool Loop::setNonBlocking(int fd) {
int flags = fcntl ( fd , F_GETFL ) ;
if ( flags < 0 ) {
g_errno = errno;
log( LOG_WARN, "loop: fcntl(F_GETFL): %s.",strerror(errno));
return false;
}
if ( fcntl ( fd, F_SETFL, flags|O_NONBLOCK) < 0 ) {
g_errno = errno;
log( LOG_WARN, "loop: fcntl(NONBLOCK): %s.",strerror(errno));
return false;
}
// we use select()/poll now so skip stuff below
return true;
}
// . if "forReading" is true call callbacks registered for reading on "fd"
// . if "forReading" is false call callbacks registered for writing on "fd"
// . if fd is MAX_NUM_FDS and "forReading" is true call all sleepy callbacks
void Loop::callCallbacks_ass ( bool forReading , int fd , int64_t now , int32_t niceness ) {
// save the g_errno to send to all callbacks
int saved_errno = g_errno;
ScopedLock sl(m_slotMutex);
// get the first Slot in the chain that is waiting on this fd
Slot *s ;
if ( forReading ) s = m_readSlots [ fd ];
else s = m_writeSlots [ fd ];
//s = m_readSlots [ fd ];
// ensure we called something
int32_t numCalled = 0;
// . now call all the callbacks
// . most will re-register themselves (i.e. call registerCallback...()
while ( s ) {
// skip this slot if he has no callback
if ( ! s->m_callback ) {
continue;
}
// watch out if clock was set back
if ( s->m_lastCall > now ) {
s->m_lastCall = now;
}
// if we're a sleep callback, check to make sure not premature
if ( fd == MAX_NUM_FDS && s->m_lastCall + s->m_tick > now ) {
s = s->m_next;
continue;
}
// skip if not a niceness match
if ( niceness == 0 && s->m_niceness != 0 ) {
s = s->m_next;
continue;
}
// update the lastCall timestamp for this slot
if ( fd == MAX_NUM_FDS ) {
s->m_lastCall = now;
}
// do the callback
// NOTE: callback can unregister fd for Slot s, so get next
m_callbacksNext = s->m_next;
logDebug(g_conf.m_logDebugLoop, "loop: enter fd callback '%s' fd=%d nice=%" PRId32,
s->m_description, fd, s->m_niceness);
// sanity check. -1 no longer supported
if (s->m_niceness < 0) {
g_process.shutdownAbort(true);
}
int64_t took = 0;
m_slotMutex.unlock();
{
int64_t start = gettimeofdayInMilliseconds();
s->m_callback(fd, s->m_state);
took = gettimeofdayInMilliseconds() - start;
}
m_slotMutex.lock();
if (took > g_conf.m_logLoopTimeThreshold) {
log(LOG_WARN, "loop: %s took %" PRId64"ms", s->m_description, took);
}
logDebug(g_conf.m_logDebugLoop, "loop: exit fd callback '%s' fd=%d nice=%" PRId32,
s->m_description, fd, s->m_niceness);
// inc the flag
numCalled++;
// reset g_errno so all callbacks for this fd get same g_errno
g_errno = saved_errno;
// get the next n (will be -1 if no slot after it)
s = m_callbacksNext;
}
m_callbacksNext = NULL;
}
Loop::Loop()
: m_callbacksNext(NULL),
m_slotMutex(),
m_lastKeepaliveTimestamp(0)
{
m_isDoingLoop = false;
// set all callbacks to NULL so we know they're empty
for ( int32_t i = 0 ; i < MAX_NUM_FDS+2 ; i++ ) {
m_readSlots [i] = NULL;
m_writeSlots[i] = NULL;
}
// the extra sleep slots
//m_readSlots [ MAX_NUM_FDS ] = NULL;
m_slots = NULL;
m_pipeFd[0] = -1;
m_pipeFd[1] = -1;
m_shutdown = 0;
m_minTick = 40;
m_head = NULL;
m_tail = NULL;
}
// free all slots from addSlots
Loop::~Loop ( ) {
reset();
if(m_pipeFd[0]>=0) {
close(m_pipeFd[0]);
m_pipeFd[0] = -1;
}
if(m_pipeFd[1]>=0) {
close(m_pipeFd[1]);
m_pipeFd[1] = -1;
}
}
// returns NULL and sets g_errno if none are left
Slot *Loop::getEmptySlot ( ) {
Slot *s = m_head;
if ( ! s ) {
g_errno = EBUFTOOSMALL;
log(LOG_WARN, "loop: No empty slots available. Increase #define MAX_SLOTS.");
return NULL;
}
m_head = s->m_nextAvail;
return s;
}
void Loop::returnSlot ( Slot *s ) {
s->m_nextAvail = m_head;
m_head = s;
}
bool Loop::init ( ) {
// clear this up here before using in doPoll()
FD_ZERO(&s_selectMaskRead);
FD_ZERO(&s_selectMaskWrite);
// set-up wakeup pipe
if(pipe(m_pipeFd)!=0) {
log(LOG_ERROR,"pipe() failed with errno=%d",errno);
return false;
}
setNonBlocking(m_pipeFd[0]);
setNonBlocking(m_pipeFd[1]);
FD_SET(m_pipeFd[0],&s_selectMaskRead);
// sighupHandler() will set this to true so we know when to shutdown
m_shutdown = 0;
// . reset this cuz we have no sleep callbacks right now
// . sleep a min of 40ms so g_now is somewhat up to date
m_minTick = 40; //0x7fffffff;
// make slots
m_slots = (Slot *) mmalloc ( MAX_SLOTS * (int32_t)sizeof(Slot) , "Loop" );
if ( ! m_slots ) return false;
// log it
log(LOG_DEBUG,"loop: Allocated %" PRId32" bytes for %" PRId32" callbacks.",
MAX_SLOTS * (int32_t)sizeof(Slot),(int32_t)MAX_SLOTS);
// init link list ptr
for ( int32_t i = 0 ; i < MAX_SLOTS - 1 ; i++ ) {
m_slots[i].m_nextAvail = &m_slots[i+1];
}
m_slots[MAX_SLOTS - 1].m_nextAvail = NULL;
m_head = &m_slots[0];
m_tail = &m_slots[MAX_SLOTS - 1];
// an innocent log msg
//log ( 0 , "Loop: starting the i/o loop");
// . when using threads GB_SIGRTMIN becomes 35, not 32 anymore
// since threads use these signals to reactivate suspended threads
// . debug msg
//log("admin: GB_SIGRTMIN=%" PRId32, (int32_t)GB_SIGRTMIN );
// . block the GB_SIGRTMIN signal
// . anytime this is raised it goes onto the signal queue
// . we use sigtimedwait() to get signals off the queue
// . sigtimedwait() selects the lowest signo first for handling
// . therefore, GB_SIGRTMIN is higher priority than (GB_SIGRTMIN + 1)
//sigfillset ( &sigs );
struct sigaction actSigPipe;
//Ignore SIGPIPE. We want a plain error return instead from system calls,.
actSigPipe.sa_handler = SIG_IGN;
sigemptyset(&actSigPipe.sa_mask);
actSigPipe.sa_flags = 0;
sigaction(SIGPIPE,&actSigPipe,NULL);
// handle SIGHUP and SIGTERM signals gracefully by saving and shutting down
struct sigaction saShutdown;
sigemptyset(&saShutdown.sa_mask);
saShutdown.sa_flags = SA_SIGINFO | SA_RESTART;
saShutdown.sa_sigaction = sighupHandler;
sigaction(SIGHUP, &saShutdown, NULL);
sigaction(SIGTERM, &saShutdown, NULL);
// if the UPS is about to go off it sends a SIGPWR
struct sigaction saPower;
sigemptyset(&saPower.sa_mask);
saPower.sa_flags = SA_SIGINFO | SA_RESTART;
saPower.sa_sigaction = sigpwrHandler;
sigaction(SIGPWR, &saPower, NULL);
//SIGPROF is used by the profiler
struct sigaction saProfile;
sigemptyset(&saProfile.sa_mask);
saProfile.sa_flags = SA_SIGINFO | SA_RESTART;
saProfile.sa_sigaction = sigprofHandler;
sigaction(SIGPROF, &saProfile, NULL);
// setitimer(ITIMER_PROF...) is called when profiling is enabled/disabled
// it has noticeable overhead so it must not be enabled by default.
// success
return true;
}
// TODO: if we get a segfault while saving, what then?
void sigpwrHandler ( int x , siginfo_t *info , void *y ) {
// let main process know to shutdown
g_loop.m_shutdown = 3;
}
void printStackTrace (bool print_location) {
logf(LOG_ERROR, "gb: Printing stack trace");
static void *s_bt[200];
size_t sz = backtrace(s_bt, 200);
// find ourself
const char* process = (const char*)getauxval(AT_EXECFN);
for( size_t i = 0; i < sz; ++i ) {
char cmd[256];
sprintf(cmd,"addr2line -e %s 0x%" PRIx64, process, (uint64_t)s_bt[i]);
logf(LOG_ERROR, "%s", cmd);
}
}
static void sigprofHandler(int signo, siginfo_t *info, void *context)
{
//This is called on SIGPROF meaning that profiling is enabled
g_profiler.getStackFrame();
}
// shit, we can't make this realtime!! RdbClose() cannot be called by a
// real time sig handler
void sighupHandler ( int x , siginfo_t *info , void *y ) {
// let main process know to shutdown
g_loop.m_shutdown = 1;
}
// . keep a timestamp for the last time we called the sleep callbacks
// . we have to call those every 1 second
static int64_t s_lastTime = 0;
void Loop::runLoop ( ) {
s_lastTime = 0;
m_isDoingLoop = true;
// . now loop forever waiting for signals
// . but every second check for timer-based events
for (;;) {
g_errno = 0;
if ( m_shutdown ) {
// a msg
if (m_shutdown == 1) {
log(LOG_INIT,"loop: got SIGHUP or SIGTERM.");
} else if (m_shutdown == 2) {
log(LOG_INIT,"loop: got SIGBAD in thread.");
} else {
log(LOG_INIT,"loop: got SIGPWR.");
}
// . turn off interrupts here because it doesn't help to do
// it in the thread
// . TODO: turn off signals for sigbadhandler()
// if thread got the signal, just wait for him to save all
// Rdbs and then dump core
if ( m_shutdown == 2 ) {
//log(0,"Thread is saving & shutting down urgently.");
//log("loop: Resuming despite thread crash.");
//m_shutdown = 0;
//goto BIGLOOP;
}
// otherwise, thread did not save, so we must do it
log ( LOG_INIT ,"loop: Saving and shutting down urgently.");
g_process.shutdown ( true );
}
//
//
// THE HEART OF GB. process events/signals on FDs.
//
//
doPoll();
}
}
static void cleanupFinishedJobs() {
if(g_conf.m_maxJobCleanupTime<=0)
g_jobScheduler.cleanup_finished_jobs();
else {
struct timespec ts_start;
clock_gettime(CLOCK_MONOTONIC,&ts_start);
g_jobScheduler.cleanup_finished_jobs();
struct timespec ts_end;
clock_gettime(CLOCK_MONOTONIC,&ts_end);
int msecs = (ts_end.tv_sec - ts_start.tv_sec)*1000
+ (ts_end.tv_nsec - ts_start.tv_nsec)/1000000;
if(msecs>g_conf.m_maxJobCleanupTime) {
log(LOG_LOGIC,"Cleaning up job(s) took %dms hwich is more than allowed. Dumping core",msecs);
g_process.shutdownAbort();
}
}
}
//--- TODO: flush the signal queue after polling until done
//--- are we getting stale signals resolved by flush so we get
//--- read event on a socket that isnt in read mode???
// TODO: set signal handler to SIG_DFL to prevent signals from queuing up now
// . this handles high priority fds first (lowest niceness)
void Loop::doPoll ( ) {
// set time
//g_now = gettimeofdayInMilliseconds();
logDebug( g_conf.m_logDebugLoop, "loop: Entered doPoll." );
if(g_udpServer.needBottom()) {
g_udpServer.makeCallbacks(1);
}
if(m_lastKeepaliveTimestamp + g_conf.m_vagusKeepaliveSendInterval <= gettimeofdayInMilliseconds()) {
m_lastKeepaliveTimestamp = gettimeofdayInMilliseconds();
InstanceInfoExchange::weAreAlive();
}
GbDns::makeCallbacks();
int32_t n;
timeval v;
v.tv_sec = 0;
// 10ms for sleepcallbacks so they can be called...
// and we need this to be the same as sigalrmhandler() since we
// keep track of cpu usage here too, since sigalrmhandler is "VT"
// based it only goes off when that much "cpu time" has elapsed.
v.tv_usec = 10 * 1000;
// gotta copy to our own since bits get cleared by select() function
fd_set readfds = s_selectMaskRead;
fd_set writefds = s_selectMaskWrite;
logDebug( g_conf.m_logDebugLoop, "loop: in select" );
// . poll the fd's searching for socket closes
// . the sigalrms and sigvtalrms and SIGCHLDs knock us out of this
// select() with n < 0 and errno equal to EINTR.
// . crap the sigalarms kick us out here every 1ms. i noticed
// then when running disableTimer() above and we don't get
// any EINTRs... can we mask those out here? it only seems to be
// the SIGALRMs not the SIGVTALRMs that interrupt us.
n = select (MAX_NUM_FDS,
&readfds,
&writefds,
NULL,//&exceptfds,
&v );
if(n<0) {
g_errno = errno;
log( LOG_WARN, "loop: select: %s.", strerror( g_errno ) );
return;
}
errno = 0;
logDebug( g_conf.m_logDebugLoop, "loop: select() returned %d", n);
if (g_conf.m_logDebugLoop || g_conf.m_logDebugTcp) {
for ( int32_t i = 0; i < MAX_NUM_FDS; i++) {
// continue if not set for reading
if ( FD_ISSET ( i, &readfds ) ) {
log( LOG_DEBUG, "loop: fd=%" PRId32" is on for read", i);
}
if ( FD_ISSET ( i, &writefds ) ) {
log( LOG_DEBUG, "loop: fd=%" PRId32" is on for write", i);
}
// if niceness is not -1, handle it below
}
}
cleanupFinishedJobs();
const int64_t now = gettimeofdayInMilliseconds();
if( n > 0 && FD_ISSET( m_pipeFd[0], &readfds ) ) {
//drain the wakeup pipe
char buf[32];
ssize_t ignored __attribute__((unused)) = read( m_pipeFd[0], buf, sizeof(buf) ); // shut up gcc warning: ignoring return value
n--;
FD_CLR( m_pipeFd[0], &readfds );
}
// now keep this fast, too. just check fds we need to.
for ( int32_t i = 0 ; i < s_numReadFds ; i++ ) {
if ( n == 0 ) break;
int fd = s_readFds[i];
Slot *s = m_readSlots [ fd ];
// if niceness is not 0, handle it below
if ( s && s->m_niceness > 0 ) continue;
// must be set
if ( ! FD_ISSET ( fd , &readfds ) ) continue;
if ( g_conf.m_logDebugLoop || g_conf.m_logDebugTcp ) {
log( LOG_DEBUG, "loop: calling cback0 niceness=%" PRId32" fd=%i", s ? s->m_niceness : -1, fd );
}
callCallbacks_ass (true,fd, now,0);//read?
}
for ( int32_t i = 0 ; i < s_numWriteFds ; i++ ) {
if ( n == 0 ) break;
int fd = s_writeFds[i];
Slot *s = m_writeSlots [ fd ];
// if niceness is not 0, handle it below
if ( s && s->m_niceness > 0 ) continue;
// fds are always ready for writing so take this out.
if ( ! FD_ISSET ( fd , &writefds ) ) continue;
if ( g_conf.m_logDebugLoop || g_conf.m_logDebugTcp ) {
log( LOG_DEBUG, "loop: calling wcback0 niceness=%" PRId32" fd=%i", s ? s->m_niceness : -1, fd );
}
callCallbacks_ass (false,fd, now,0);//false=forRead?
}
cleanupFinishedJobs();
// now for lower priority fds
for ( int32_t i = 0 ; i < s_numReadFds ; i++ ) {
if ( n == 0 ) break;
int fd = s_readFds[i];
Slot *s = m_readSlots [ fd ];
// if niceness is <= 0 we did it above
if ( s && s->m_niceness <= 0 ) continue;
// must be set
if ( ! FD_ISSET ( fd , &readfds ) ) continue;
if ( g_conf.m_logDebugLoop || g_conf.m_logDebugTcp ) {
log( LOG_DEBUG, "loop: calling cback1 fd=%i", fd );
}
callCallbacks_ass (true,fd, now,1);//read?
}
for ( int32_t i = 0 ; i < s_numWriteFds ; i++ ) {
if ( n == 0 ) {
break;
}
int fd = s_writeFds[i];
Slot *s = m_writeSlots[fd];
// if niceness is <= 0 we did it above
if ( s && s->m_niceness <= 0 ) {
continue;
}
// must be set
if ( ! FD_ISSET ( fd , &writefds ) ) {
continue;
}
if ( g_conf.m_logDebugLoop || g_conf.m_logDebugTcp ) {
if( s ) {
log( LOG_DEBUG, "loop: calling wcback1 niceness=%" PRId32" fd=%i", s->m_niceness, fd );
}
else {
log( LOG_WARN, "loop: calling wcback1. Slot not found! fd=%i", fd );
}
}
callCallbacks_ass(false, fd, now, 1);//forread?
}
cleanupFinishedJobs();
// call sleepers if they need it
// call this every (about) 1 second
int32_t elapsed = gettimeofdayInMilliseconds() - s_lastTime;
// if someone changed the system clock on us, this could be negative
// so fix it! otherwise, times may NEVER get called in our lifetime
if ( elapsed < 0 ) {
elapsed = m_minTick;
}
if ( elapsed >= m_minTick ) {
// MAX_NUM_FDS is the fd for sleep callbacks
callCallbacks_ass ( true , MAX_NUM_FDS , gettimeofdayInMilliseconds() );
// note the last time we called them
s_lastTime = gettimeofdayInMilliseconds();
cleanupFinishedJobs();
}
logDebug( g_conf.m_logDebugLoop, "loop: Exited doPoll.");
}
void Loop::wakeupPollLoop() {
char dummy='d';
ssize_t ignored __attribute__((unused)) = write(m_pipeFd[1],&dummy,1);; // shut up gcc warning: ignoring return value
}
int gbsystem(const char *cmd ) {
log("gb: running system(\"%s\")", cmd);
return system(cmd);
}