forked from Mirrors/privacore-open-source-search-engine
934 lines
28 KiB
C++
934 lines
28 KiB
C++
#include "gb-include.h"
|
|
|
|
#include "Loop.h"
|
|
#include "JobScheduler.h"
|
|
#include "UdpServer.h"
|
|
#include "HttpServer.h" // g_httpServer.m_tcp.m_numQueued
|
|
#include "Profiler.h"
|
|
#include "Process.h"
|
|
#include "PageParser.h"
|
|
#include "Conf.h"
|
|
|
|
#include "Stats.h"
|
|
|
|
#include <execinfo.h>
|
|
#include <sys/auxv.h>
|
|
#include <stdio.h>
|
|
#include <sys/time.h>
|
|
#include <sys/types.h>
|
|
#include <signal.h>
|
|
#include <fcntl.h> // fcntl()
|
|
#include <sys/poll.h> // POLLIN, POLLPRI, ...
|
|
|
|
// raised from 5000 to 10000 because we have more UdpSlots now and Multicast
|
|
// will call g_loop.registerSleepCallback() if it fails to get a UdpSlot to
|
|
// send on.
|
|
#define MAX_SLOTS 10000
|
|
|
|
|
|
// TODO: . if signal queue overflows another signal is sent
|
|
// . capture that signal and use poll or something???
|
|
|
|
// Tricky Gotchas:
|
|
// TODO: if an event happens on a TCP fd/socket before we fully accept it
|
|
// we should just register it then call the read callback in case
|
|
// we just missed a ready for reading signal!!!!!
|
|
// TODO: signals can be gotten off the queue after we've closed an fd
|
|
// in which case the handler should be removed from Loop's registry
|
|
// BEFORE being closed... so the handler will be NULL... ???
|
|
// NOTE: keep in mind that the signals might be delayed or be really fast!
|
|
|
|
// TODO: don't mask signals, catch them as they arrive? (like in phhttpd)
|
|
|
|
|
|
// a global class extern'd in .h file
|
|
Loop g_loop;
|
|
|
|
// the global niceness
|
|
char g_niceness = 0;
|
|
|
|
// use this in case we unregister the "next" callback
|
|
static Slot *s_callbacksNext;
|
|
|
|
// free up all our mem
|
|
void Loop::reset() {
|
|
if ( m_slots ) {
|
|
log(LOG_DEBUG,"db: resetting loop");
|
|
mfree ( m_slots , MAX_SLOTS * sizeof(Slot) , "Loop" );
|
|
}
|
|
m_slots = NULL;
|
|
}
|
|
|
|
static void sigbadHandler ( int x , siginfo_t *info , void *y ) ;
|
|
static void sigpwrHandler ( int x , siginfo_t *info , void *y ) ;
|
|
static void sighupHandler ( int x , siginfo_t *info , void *y ) ;
|
|
static void sigprofHandler(int signo, siginfo_t *info, void *context);
|
|
|
|
void Loop::unregisterReadCallback ( int fd, void *state , void (* callback)(int fd,void *state) ){
|
|
if ( fd < 0 ) return;
|
|
// from reading
|
|
unregisterCallback ( m_readSlots,fd, state , callback, true);
|
|
}
|
|
|
|
void Loop::unregisterWriteCallback ( int fd, void *state , void (* callback)(int fd,void *state)){
|
|
// from writing
|
|
unregisterCallback ( m_writeSlots , fd , state,callback,false);
|
|
}
|
|
|
|
void Loop::unregisterSleepCallback ( void *state , void (* callback)(int fd,void *state)){
|
|
unregisterCallback (m_readSlots,MAX_NUM_FDS,state,callback,true);
|
|
}
|
|
|
|
static fd_set s_selectMaskRead;
|
|
static fd_set s_selectMaskWrite;
|
|
|
|
static int s_readFds[MAX_NUM_FDS];
|
|
static int32_t s_numReadFds = 0;
|
|
static int s_writeFds[MAX_NUM_FDS];
|
|
static int32_t s_numWriteFds = 0;
|
|
|
|
void Loop::unregisterCallback ( Slot **slots , int fd , void *state , void (* callback)(int fd,void *state) ,
|
|
bool forReading ) {
|
|
// bad fd
|
|
if ( fd < 0 ) {log(LOG_LOGIC,
|
|
"loop: fd to unregister is negative.");return;}
|
|
// set a flag if we found it
|
|
bool found = false;
|
|
// slots is m_readSlots OR m_writeSlots
|
|
Slot *s = slots [ fd ];
|
|
Slot *lastSlot = NULL;
|
|
// . keep track of new min tick for sleep callbacks
|
|
// . sleep a min of 40ms so g_now is somewhat up to date
|
|
int32_t min = 40; // 0x7fffffff;
|
|
int32_t lastMin = min;
|
|
|
|
// chain through all callbacks registerd with this fd
|
|
while ( s ) {
|
|
// get the next slot (NULL if no more)
|
|
Slot *next = s->m_next;
|
|
// if we're unregistering a sleep callback
|
|
// we might have to recalculate m_minTick
|
|
if ( s->m_tick < min ) { lastMin = min; min = s->m_tick; }
|
|
// skip this slot if callbacks don't match
|
|
if ( s->m_callback != callback ) { lastSlot = s; goto skip; }
|
|
// skip this slot if states don't match
|
|
if ( s->m_state != state ) { lastSlot = s; goto skip; }
|
|
// free this slot since it callback matches "callback"
|
|
//mfree ( s , sizeof(Slot) , "Loop" );
|
|
returnSlot ( s );
|
|
found = true;
|
|
// if the last one, then remove the FD from s_fdList
|
|
// so and clear a bit so doPoll() function is fast
|
|
if ( slots[fd] == s && s->m_next == NULL ) {
|
|
for (int32_t i = 0; i < s_numReadFds ; i++ ) {
|
|
if ( ! forReading ) break;
|
|
if ( s_readFds[i] != fd ) continue;
|
|
s_readFds[i] = s_readFds[s_numReadFds-1];
|
|
s_numReadFds--;
|
|
// remove from select mask too
|
|
FD_CLR(fd,&s_selectMaskRead );
|
|
if ( g_conf.m_logDebugLoop || g_conf.m_logDebugTcp ) {
|
|
log( "loop: unregistering read callback for fd=%i", fd );
|
|
}
|
|
break;
|
|
}
|
|
for (int32_t i = 0; i < s_numWriteFds ; i++ ) {
|
|
if ( forReading ) break;
|
|
if ( s_writeFds[i] != fd ) continue;
|
|
s_writeFds[i] = s_writeFds[s_numWriteFds-1];
|
|
s_numWriteFds--;
|
|
// remove from select mask too
|
|
FD_CLR(fd,&s_selectMaskWrite);
|
|
if ( g_conf.m_logDebugLoop || g_conf.m_logDebugTcp ) {
|
|
log( LOG_DEBUG, "loop: unregistering write callback for fd=%" PRId32" from write #wrts=%" PRId32,
|
|
( int32_t ) fd, ( int32_t ) s_numWriteFds );
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
// debug msg
|
|
//log("Loop::unregistered fd=%" PRId32" state=%" PRIu32, fd, (int32_t)state );
|
|
// revert back to old min if this is the Slot we're removing
|
|
min = lastMin;
|
|
// excise the previous slot from linked list
|
|
if ( lastSlot ) lastSlot->m_next = next;
|
|
else slots[fd] = next;
|
|
// watch out if we're in the previous callback, we need to
|
|
// fix the linked list in callCallbacks_ass
|
|
if ( s_callbacksNext == s ) s_callbacksNext = next;
|
|
skip:
|
|
// advance to the next slot
|
|
s = next;
|
|
}
|
|
// set our new minTick if we were unregistering a sleep callback
|
|
if ( fd == MAX_NUM_FDS ) {
|
|
m_minTick = min;
|
|
}
|
|
|
|
return;
|
|
}
|
|
|
|
bool Loop::registerReadCallback ( int fd, void *state, void (* callback)(int fd,void *state ) , int32_t niceness ) {
|
|
// the "true" answers the question "for reading?"
|
|
if ( addSlot ( true, fd, state, callback, niceness ) ) {
|
|
return true;
|
|
}
|
|
|
|
log( LOG_WARN, "loop: Unable to register read callback." );
|
|
return false;
|
|
}
|
|
|
|
|
|
bool Loop::registerWriteCallback ( int fd, void *state, void (* callback)(int fd, void *state ) , int32_t niceness ) {
|
|
// the "false" answers the question "for reading?"
|
|
if ( addSlot ( false, fd, state, callback, niceness ) ) {
|
|
return true;
|
|
}
|
|
|
|
log( LOG_WARN, "loop: Unable to register write callback.");
|
|
return false;
|
|
}
|
|
|
|
// tick is in milliseconds
|
|
bool Loop::registerSleepCallback ( int32_t tick, void *state, void (* callback)(int fd,void *state ),
|
|
int32_t niceness, bool immediate ) {
|
|
if ( ! addSlot ( true, MAX_NUM_FDS, state, callback, niceness, tick, immediate ) ) {
|
|
log( LOG_WARN, "loop: Unable to register sleep callback" );
|
|
return false;
|
|
}
|
|
|
|
if ( tick < m_minTick ) {
|
|
m_minTick = tick;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
// . returns false and sets g_errno on error
|
|
bool Loop::addSlot ( bool forReading , int fd, void *state, void (* callback)(int fd, void *state),
|
|
int32_t niceness , int32_t tick, bool immediate ) {
|
|
// ensure fd is >= 0
|
|
if ( fd < 0 ) {
|
|
g_errno = EBADENGINEER;
|
|
log(LOG_LOGIC,"loop: fd to register is negative.");
|
|
return false;
|
|
}
|
|
// sanity
|
|
if ( fd > MAX_NUM_FDS ) {
|
|
log("loop: bad fd of %" PRId32,(int32_t)fd);
|
|
g_process.shutdownAbort(true);
|
|
}
|
|
|
|
if ( g_conf.m_logDebugLoop || g_conf.m_logDebugTcp ) {
|
|
log( LOG_DEBUG, "loop: registering %s callback sd=%i", forReading ? "read" : "write", fd);
|
|
}
|
|
|
|
// . ensure fd not already registered with this callback/state
|
|
// . prevent dups so you can keep calling register w/o fear
|
|
Slot *s;
|
|
if ( forReading ) {
|
|
s = m_readSlots [ fd ];
|
|
} else {
|
|
s = m_writeSlots [ fd ];
|
|
}
|
|
|
|
while ( s ) {
|
|
if ( s->m_callback == callback &&
|
|
s->m_state == state ) {
|
|
// don't set g_errno for this anymore, just bitch
|
|
//g_errno = EBADENGINEER;
|
|
log(LOG_LOGIC,"loop: fd=%i is already registered.",fd);
|
|
return true;
|
|
}
|
|
s = s->m_next;
|
|
}
|
|
// . make a new slot
|
|
// . TODO: implement mprimealloc() to pre-alloc slots for us for speed
|
|
//s = (Slot *) mmalloc ( sizeof(Slot ) ,"Loop");
|
|
s = getEmptySlot ( );
|
|
if ( ! s ) return false;
|
|
// for pointing to slot already in position for fd
|
|
Slot *next ;
|
|
// store ourselves in the slot for this fd
|
|
if ( forReading ) {
|
|
next = m_readSlots [ fd ];
|
|
m_readSlots [ fd ] = s;
|
|
// if not already registered, add to list
|
|
if ( fd < MAX_NUM_FDS && ! FD_ISSET( fd,&s_selectMaskRead ) ) {
|
|
// sanity
|
|
if ( s_numReadFds >= MAX_NUM_FDS){
|
|
g_process.shutdownAbort(true);
|
|
}
|
|
|
|
s_readFds[s_numReadFds++] = fd;
|
|
FD_SET ( fd,&s_selectMaskRead );
|
|
}
|
|
// fd == MAX_NUM_FDS if it's a sleep callback
|
|
//if ( fd < MAX_NUM_FDS ) {
|
|
//FD_SET ( fd , &m_readfds );
|
|
//FD_SET ( fd , &m_exceptfds );
|
|
//}
|
|
}
|
|
else {
|
|
next = m_writeSlots [ fd ];
|
|
m_writeSlots [ fd ] = s;
|
|
//FD_SET ( fd , &m_writefds );
|
|
// if not already registered, add to list
|
|
if ( fd<MAX_NUM_FDS && ! FD_ISSET ( fd,&s_selectMaskWrite ) ) {
|
|
// sanity
|
|
if ( s_numWriteFds>=MAX_NUM_FDS){
|
|
g_process.shutdownAbort(true);
|
|
}
|
|
|
|
s_writeFds[s_numWriteFds++] = fd;
|
|
FD_SET ( fd,&s_selectMaskWrite );
|
|
}
|
|
}
|
|
// set our callback and state
|
|
s->m_callback = callback;
|
|
s->m_state = state;
|
|
|
|
// point to the guy that was registered for fd before us
|
|
s->m_next = next;
|
|
|
|
// save our niceness for doPoll()
|
|
s->m_niceness = niceness;
|
|
|
|
// store the tick for sleep wrappers (should be max for others)
|
|
s->m_tick = tick;
|
|
|
|
// the last called time
|
|
s->m_lastCall = immediate ? 0 : gettimeofdayInMilliseconds();
|
|
|
|
// debug msg
|
|
//log("Loop::registered fd=%i state=%" PRIu32,fd,state);
|
|
|
|
// if fd == MAX_NUM_FDS if it's a sleep callback
|
|
if ( fd == MAX_NUM_FDS ) {
|
|
return true;
|
|
}
|
|
|
|
// watch out for big bogus fds used for thread exit callbacks
|
|
if ( fd > MAX_NUM_FDS ) {
|
|
return true;
|
|
}
|
|
|
|
// set fd non-blocking
|
|
return setNonBlocking ( fd , niceness ) ;
|
|
}
|
|
|
|
// . now make sure we're listening for an interrupt on this fd
|
|
// . set it non-blocing and enable signal catching for it
|
|
// . listen for an interrupt for this fd
|
|
bool Loop::setNonBlocking ( int fd , int32_t niceness ) {
|
|
retry:
|
|
int flags = fcntl ( fd , F_GETFL ) ;
|
|
if ( flags < 0 ) {
|
|
// valgrind
|
|
if ( errno == EINTR ) goto retry;
|
|
g_errno = errno;
|
|
log( LOG_WARN, "loop: fcntl(F_GETFL): %s.",strerror(errno));
|
|
return false;
|
|
}
|
|
|
|
retry9:
|
|
if ( fcntl ( fd, F_SETFL, flags|O_NONBLOCK|O_ASYNC) < 0 ) {
|
|
// valgrind
|
|
if ( errno == EINTR ) goto retry9;
|
|
g_errno = errno;
|
|
log( LOG_WARN, "loop: fcntl(NONBLOCK): %s.",strerror(errno));
|
|
return false;
|
|
}
|
|
|
|
// we use select()/poll now so skip stuff below
|
|
return true;
|
|
}
|
|
|
|
// . if "forReading" is true call callbacks registered for reading on "fd"
|
|
// . if "forReading" is false call callbacks registered for writing on "fd"
|
|
// . if fd is MAX_NUM_FDS and "forReading" is true call all sleepy callbacks
|
|
void Loop::callCallbacks_ass ( bool forReading , int fd , int64_t now , int32_t niceness ) {
|
|
// save the g_errno to send to all callbacks
|
|
int saved_errno = g_errno;
|
|
|
|
// get the first Slot in the chain that is waiting on this fd
|
|
Slot *s ;
|
|
if ( forReading ) s = m_readSlots [ fd ];
|
|
else s = m_writeSlots [ fd ];
|
|
//s = m_readSlots [ fd ];
|
|
// ensure we called something
|
|
int32_t numCalled = 0;
|
|
|
|
// . now call all the callbacks
|
|
// . most will re-register themselves (i.e. call registerCallback...()
|
|
while ( s ) {
|
|
// skip this slot if he has no callback
|
|
if ( ! s->m_callback ) {
|
|
continue;
|
|
}
|
|
|
|
// NOTE: callback can unregister fd for Slot s, so get next
|
|
//Slot *next = s->m_next;
|
|
s_callbacksNext = s->m_next;
|
|
|
|
// watch out if clock was set back
|
|
if ( s->m_lastCall > now ) {
|
|
s->m_lastCall = now;
|
|
}
|
|
|
|
// if we're a sleep callback, check to make sure not premature
|
|
if ( fd == MAX_NUM_FDS && s->m_lastCall + s->m_tick > now ) {
|
|
s = s_callbacksNext;
|
|
continue;
|
|
}
|
|
|
|
// skip if not a niceness match
|
|
if ( niceness == 0 && s->m_niceness != 0 ) {
|
|
s = s_callbacksNext;
|
|
continue;
|
|
}
|
|
|
|
// update the lastCall timestamp for this slot
|
|
if ( fd == MAX_NUM_FDS ) {
|
|
s->m_lastCall = now;
|
|
}
|
|
|
|
// do the callback
|
|
|
|
logDebug( g_conf.m_logDebugLoop, "loop: enter fd callback fd=%d nice=%" PRId32, fd, s->m_niceness );
|
|
|
|
// sanity check. -1 no longer supported
|
|
if ( s->m_niceness < 0 ) {
|
|
g_process.shutdownAbort(true);
|
|
}
|
|
|
|
// Temporarily (for the duration of the callback call) switch
|
|
// niceness to the niceness of the slot
|
|
int32_t saved_niceness = g_niceness;
|
|
g_niceness = s->m_niceness;
|
|
|
|
// make sure not 2
|
|
if ( g_niceness >= 2 ) {
|
|
g_niceness = 1;
|
|
}
|
|
|
|
s->m_callback ( fd , s->m_state );
|
|
|
|
// restore niceness
|
|
g_niceness = saved_niceness;
|
|
|
|
logDebug( g_conf.m_logDebugLoop, "loop: exit fd callback fd=%" PRId32" nice=%" PRId32,
|
|
(int32_t)fd,(int32_t)s->m_niceness );
|
|
|
|
// inc the flag
|
|
numCalled++;
|
|
// reset g_errno so all callbacks for this fd get same g_errno
|
|
g_errno = saved_errno;
|
|
// get the next n (will be -1 if no slot after it)
|
|
s = s_callbacksNext;
|
|
}
|
|
|
|
s_callbacksNext = NULL;
|
|
}
|
|
|
|
Loop::Loop ( ) {
|
|
m_isDoingLoop = false;
|
|
|
|
// set all callbacks to NULL so we know they're empty
|
|
for ( int32_t i = 0 ; i < MAX_NUM_FDS+2 ; i++ ) {
|
|
m_readSlots [i] = NULL;
|
|
m_writeSlots[i] = NULL;
|
|
}
|
|
// the extra sleep slots
|
|
//m_readSlots [ MAX_NUM_FDS ] = NULL;
|
|
m_slots = NULL;
|
|
m_pipeFd[0] = -1;
|
|
m_pipeFd[1] = -1;
|
|
}
|
|
|
|
// free all slots from addSlots
|
|
Loop::~Loop ( ) {
|
|
reset();
|
|
if(m_pipeFd[0]>=0) {
|
|
close(m_pipeFd[0]);
|
|
m_pipeFd[0] = -1;
|
|
}
|
|
if(m_pipeFd[1]>=0) {
|
|
close(m_pipeFd[1]);
|
|
m_pipeFd[1] = -1;
|
|
}
|
|
}
|
|
|
|
// returns NULL and sets g_errno if none are left
|
|
Slot *Loop::getEmptySlot ( ) {
|
|
Slot *s = m_head;
|
|
if ( ! s ) {
|
|
g_errno = EBUFTOOSMALL;
|
|
log("loop: No empty slots available. "
|
|
"Increase #define MAX_SLOTS.");
|
|
return NULL;
|
|
}
|
|
m_head = s->m_nextAvail;
|
|
return s;
|
|
}
|
|
|
|
void Loop::returnSlot ( Slot *s ) {
|
|
s->m_nextAvail = m_head;
|
|
m_head = s;
|
|
}
|
|
|
|
|
|
bool Loop::init ( ) {
|
|
|
|
// clear this up here before using in doPoll()
|
|
FD_ZERO(&s_selectMaskRead);
|
|
FD_ZERO(&s_selectMaskWrite);
|
|
|
|
// set-up wakeup pipe
|
|
if(pipe(m_pipeFd)!=0) {
|
|
log(LOG_ERROR,"pipe() failed with errno=%d",errno);
|
|
return false;
|
|
}
|
|
setNonBlocking(m_pipeFd[0],0);
|
|
setNonBlocking(m_pipeFd[1],0);
|
|
FD_SET(m_pipeFd[0],&s_selectMaskRead);
|
|
|
|
// sighupHandler() will set this to true so we know when to shutdown
|
|
m_shutdown = 0;
|
|
// . reset this cuz we have no sleep callbacks right now
|
|
// . sleep a min of 40ms so g_now is somewhat up to date
|
|
m_minTick = 40; //0x7fffffff;
|
|
// make slots
|
|
m_slots = (Slot *) mmalloc ( MAX_SLOTS * (int32_t)sizeof(Slot) , "Loop" );
|
|
if ( ! m_slots ) return false;
|
|
// log it
|
|
log(LOG_DEBUG,"loop: Allocated %" PRId32" bytes for %" PRId32" callbacks.",
|
|
MAX_SLOTS * (int32_t)sizeof(Slot),(int32_t)MAX_SLOTS);
|
|
// init link list ptr
|
|
for ( int32_t i = 0 ; i < MAX_SLOTS - 1 ; i++ ) {
|
|
m_slots[i].m_nextAvail = &m_slots[i+1];
|
|
}
|
|
m_slots[MAX_SLOTS - 1].m_nextAvail = NULL;
|
|
m_head = &m_slots[0];
|
|
m_tail = &m_slots[MAX_SLOTS - 1];
|
|
// an innocent log msg
|
|
//log ( 0 , "Loop: starting the i/o loop");
|
|
// . when using threads GB_SIGRTMIN becomes 35, not 32 anymore
|
|
// since threads use these signals to reactivate suspended threads
|
|
// . debug msg
|
|
//log("admin: GB_SIGRTMIN=%" PRId32, (int32_t)GB_SIGRTMIN );
|
|
// . block the GB_SIGRTMIN signal
|
|
// . anytime this is raised it goes onto the signal queue
|
|
// . we use sigtimedwait() to get signals off the queue
|
|
// . sigtimedwait() selects the lowest signo first for handling
|
|
// . therefore, GB_SIGRTMIN is higher priority than (GB_SIGRTMIN + 1)
|
|
//sigfillset ( &sigs );
|
|
|
|
struct sigaction actSigPipe;
|
|
//Ignore SIGPIPE. We want a plain error return instead from system calls,.
|
|
actSigPipe.sa_handler = SIG_IGN;
|
|
sigemptyset(&actSigPipe.sa_mask);
|
|
actSigPipe.sa_flags = 0;
|
|
sigaction(SIGPIPE,&actSigPipe,NULL);
|
|
|
|
// handle SIGHUP and SIGTERM signals gracefully by saving and shutting down
|
|
struct sigaction saShutdown;
|
|
sigemptyset(&saShutdown.sa_mask);
|
|
saShutdown.sa_flags = SA_SIGINFO | SA_RESTART;
|
|
saShutdown.sa_sigaction = sighupHandler;
|
|
sigaction(SIGHUP, &saShutdown, NULL);
|
|
sigaction(SIGTERM, &saShutdown, NULL);
|
|
//sigaction(SIGABRT, &sa, NULL);
|
|
|
|
// we should save our data on segv, sigill, sigfpe, sigbus
|
|
struct sigaction saBad;
|
|
sigemptyset(&saBad.sa_mask);
|
|
saBad.sa_flags = SA_SIGINFO | SA_RESTART;
|
|
saBad.sa_sigaction = sigbadHandler;
|
|
sigaction(SIGSEGV, &saBad, NULL);
|
|
sigaction(SIGILL, &saBad, NULL);
|
|
sigaction(SIGFPE, &saBad, NULL);
|
|
sigaction(SIGBUS, &saBad, NULL);
|
|
|
|
// if the UPS is about to go off it sends a SIGPWR
|
|
struct sigaction saPower;
|
|
sigemptyset(&saPower.sa_mask);
|
|
saPower.sa_flags = SA_SIGINFO | SA_RESTART;
|
|
saPower.sa_sigaction = sigpwrHandler;
|
|
sigaction(SIGPWR, &saPower, NULL);
|
|
|
|
//SIGPROF is used by the profiler
|
|
struct sigaction saProfile;
|
|
sigemptyset(&saProfile.sa_mask);
|
|
saProfile.sa_flags = SA_SIGINFO | SA_RESTART;
|
|
saProfile.sa_sigaction = sigprofHandler;
|
|
sigaction(SIGPROF, &saProfile, NULL);
|
|
// setitimer(ITIMER_PROF...) is called when profiling is enabled/disabled
|
|
// it has noticeable overhead so it must not be enabled by default.
|
|
|
|
// success
|
|
return true;
|
|
}
|
|
|
|
// TODO: if we get a segfault while saving, what then?
|
|
void sigpwrHandler ( int x , siginfo_t *info , void *y ) {
|
|
// let main process know to shutdown
|
|
g_loop.m_shutdown = 3;
|
|
}
|
|
|
|
void printStackTrace (bool print_location) {
|
|
logf(LOG_ERROR, "gb: Printing stack trace");
|
|
|
|
static void *s_bt[200];
|
|
size_t sz = backtrace(s_bt, 200);
|
|
|
|
// find ourself
|
|
const char* process = (const char*)getauxval(AT_EXECFN);
|
|
|
|
for( size_t i = 0; i < sz; ++i ) {
|
|
char cmd[256];
|
|
sprintf(cmd,"addr2line -e %s 0x%" PRIx64, process, (uint64_t)s_bt[i]);
|
|
logf(LOG_ERROR, "%s", cmd);
|
|
}
|
|
}
|
|
|
|
|
|
// TODO: if we get a segfault while saving, what then?
|
|
void sigbadHandler ( int x , siginfo_t *info , void *y ) {
|
|
|
|
log("loop: sigbadhandler. disabling handler from recall.");
|
|
// . don't allow this handler to be called again
|
|
// . does this work if we're in a thread?
|
|
struct sigaction sa;
|
|
sigemptyset (&sa.sa_mask);
|
|
sa.sa_flags = SA_SIGINFO ; //| SA_ONESHOT;
|
|
sa.sa_sigaction = NULL;
|
|
sigaction(SIGSEGV, &sa, NULL);
|
|
sigaction(SIGILL, &sa, NULL);
|
|
sigaction(SIGFPE, &sa, NULL);
|
|
sigaction(SIGBUS, &sa, NULL);
|
|
sigaction(SIGQUIT, &sa, NULL);
|
|
sigaction(SIGSYS, &sa, NULL);
|
|
// if we've already been here, or don't need to be, then bail
|
|
if ( g_loop.m_shutdown ) {
|
|
log("loop: sigbadhandler. shutdown already called.");
|
|
return;
|
|
}
|
|
|
|
// unwind
|
|
printStackTrace();
|
|
|
|
// if we're a thread, let main process know to shutdown
|
|
g_loop.m_shutdown = 2;
|
|
log("loop: sigbadhandler. trying to save now. mode=%" PRId32, (int32_t)g_process.m_mode);
|
|
|
|
// . this will save all Rdb's
|
|
// . if "urgent" is true it will dump core
|
|
// . if "urgent" is true it won't broadcast its shutdown to all hosts
|
|
g_process.shutdown ( true );
|
|
}
|
|
|
|
static void sigprofHandler(int signo, siginfo_t *info, void *context)
|
|
{
|
|
//This is called on SIGPROF meaning that profiling is enabled
|
|
g_profiler.getStackFrame();
|
|
}
|
|
|
|
// shit, we can't make this realtime!! RdbClose() cannot be called by a
|
|
// real time sig handler
|
|
void sighupHandler ( int x , siginfo_t *info , void *y ) {
|
|
// let main process know to shutdown
|
|
g_loop.m_shutdown = 1;
|
|
}
|
|
|
|
// . keep a timestamp for the last time we called the sleep callbacks
|
|
// . we have to call those every 1 second
|
|
static int64_t s_lastTime = 0;
|
|
|
|
void Loop::runLoop ( ) {
|
|
|
|
// set of signals to watch for
|
|
sigset_t sigs0;
|
|
|
|
// clear all signals from the set
|
|
sigemptyset ( &sigs0 );
|
|
|
|
// . set sigs on which sigtimedwait() listens for
|
|
// . add this signal to our set of signals to watch (currently NONE)
|
|
sigaddset ( &sigs0, SIGCHLD );
|
|
// . TODO: do we need to mask SIGIO too? (sig queue overflow?)
|
|
// . i would think so, because what if we tried to queue an important
|
|
// handler to be called in the high priority UdpServer but the queue
|
|
// was full? Then we would finish processing the signals on the queue
|
|
// before we would address the excluded high priority signals by
|
|
// calling doPoll()
|
|
sigaddset ( &sigs0, SIGIO );
|
|
|
|
s_lastTime = 0;
|
|
|
|
m_isDoingLoop = true;
|
|
|
|
// . now loop forever waiting for signals
|
|
// . but every second check for timer-based events
|
|
for (;;) {
|
|
g_errno = 0;
|
|
|
|
if ( m_shutdown ) {
|
|
// a msg
|
|
if (m_shutdown == 1) {
|
|
log(LOG_INIT,"loop: got SIGHUP or SIGTERM.");
|
|
} else if (m_shutdown == 2) {
|
|
log(LOG_INIT,"loop: got SIGBAD in thread.");
|
|
} else {
|
|
log(LOG_INIT,"loop: got SIGPWR.");
|
|
}
|
|
|
|
// . turn off interrupts here because it doesn't help to do
|
|
// it in the thread
|
|
// . TODO: turn off signals for sigbadhandler()
|
|
// if thread got the signal, just wait for him to save all
|
|
// Rdbs and then dump core
|
|
if ( m_shutdown == 2 ) {
|
|
//log(0,"Thread is saving & shutting down urgently.");
|
|
//log("loop: Resuming despite thread crash.");
|
|
//m_shutdown = 0;
|
|
//goto BIGLOOP;
|
|
}
|
|
|
|
// otherwise, thread did not save, so we must do it
|
|
log ( LOG_INIT ,"loop: Saving and shutting down urgently.");
|
|
|
|
g_process.shutdown ( true );
|
|
}
|
|
|
|
//
|
|
//
|
|
// THE HEART OF GB. process events/signals on FDs.
|
|
//
|
|
//
|
|
doPoll();
|
|
}
|
|
}
|
|
|
|
//--- TODO: flush the signal queue after polling until done
|
|
//--- are we getting stale signals resolved by flush so we get
|
|
//--- read event on a socket that isnt in read mode???
|
|
// TODO: set signal handler to SIG_DFL to prevent signals from queuing up now
|
|
// . this handles high priority fds first (lowest niceness)
|
|
void Loop::doPoll ( ) {
|
|
// set time
|
|
//g_now = gettimeofdayInMilliseconds();
|
|
|
|
logDebug( g_conf.m_logDebugLoop, "loop: Entered doPoll." );
|
|
|
|
if(g_udpServer.needBottom()) {
|
|
g_udpServer.makeCallbacks(1);
|
|
}
|
|
|
|
int32_t n;
|
|
|
|
timeval v;
|
|
v.tv_sec = 0;
|
|
// 10ms for sleepcallbacks so they can be called...
|
|
// and we need this to be the same as sigalrmhandler() since we
|
|
// keep track of cpu usage here too, since sigalrmhandler is "VT"
|
|
// based it only goes off when that much "cpu time" has elapsed.
|
|
v.tv_usec = QUICKPOLL_INTERVAL * 1000;
|
|
|
|
again:
|
|
|
|
// gotta copy to our own since bits get cleared by select() function
|
|
fd_set readfds = s_selectMaskRead;
|
|
fd_set writefds = s_selectMaskWrite;
|
|
|
|
logDebug( g_conf.m_logDebugLoop, "loop: in select" );
|
|
|
|
// . poll the fd's searching for socket closes
|
|
// . the sigalrms and sigvtalrms and SIGCHLDs knock us out of this
|
|
// select() with n < 0 and errno equal to EINTR.
|
|
// . crap the sigalarms kick us out here every 1ms. i noticed
|
|
// then when running disableTimer() above and we don't get
|
|
// any EINTRs... can we mask those out here? it only seems to be
|
|
// the SIGALRMs not the SIGVTALRMs that interrupt us.
|
|
n = select (MAX_NUM_FDS,
|
|
&readfds,
|
|
&writefds,
|
|
NULL,//&exceptfds,
|
|
&v );
|
|
|
|
if ( n >= 0 ) errno = 0;
|
|
|
|
logDebug( g_conf.m_logDebugLoop, "loop: out select n=%" PRId32" errno=%" PRId32" errnomsg=%s ms_wait=%i",
|
|
(int32_t)n,(int32_t)errno,mstrerror(errno), (int)v.tv_sec*1000);
|
|
|
|
if ( n < 0 ) {
|
|
// valgrind
|
|
if ( errno == EINTR ) {
|
|
// got it. if we get a sig alarm or vt alarm or
|
|
// SIGCHLD (from Threads.cpp) we end up here.
|
|
//log("loop: got errno=%" PRId32,(int32_t)errno);
|
|
|
|
// if not linux we have to decrease this by 1ms
|
|
//count -= 1000;
|
|
|
|
// and re-assign to wait less time. we are
|
|
// assuming SIGALRM goes off once per ms and if
|
|
// that is not what interrupted us we may end
|
|
// up exiting early
|
|
//if ( count <= 0 && m_shutdown ) return;
|
|
|
|
// wait less this time around
|
|
//v.tv_usec = count;
|
|
|
|
// if shutting down was it a sigterm ?
|
|
if ( m_shutdown ) goto again;
|
|
|
|
// handle returned threads for niceness 0
|
|
g_jobScheduler.cleanup_finished_jobs();
|
|
|
|
// high niceness threads
|
|
g_jobScheduler.cleanup_finished_jobs();
|
|
|
|
goto again;
|
|
}
|
|
g_errno = errno;
|
|
log( LOG_WARN, "loop: select: %s.", strerror( g_errno ) );
|
|
return;
|
|
}
|
|
|
|
// if we wait for 10ms with nothing happening, fix cpu usage here too
|
|
// if ( n == 0 ) {
|
|
// Host *h = g_hostdb.m_myHost;
|
|
// h->m_cpuUsage = .99 * h->m_cpuUsage + .01 * 000;
|
|
// }
|
|
|
|
logDebug( g_conf.m_logDebugLoop, "loop: Got %" PRId32" fds waiting.", n );
|
|
|
|
if (g_conf.m_logDebugLoop || g_conf.m_logDebugTcp) {
|
|
for ( int32_t i = 0; i < MAX_NUM_FDS; i++) {
|
|
// continue if not set for reading
|
|
if ( FD_ISSET ( i, &readfds ) ) {
|
|
log( LOG_DEBUG, "loop: fd=%" PRId32" is on for read", i);
|
|
}
|
|
if ( FD_ISSET ( i, &writefds ) ) {
|
|
log( LOG_DEBUG, "loop: fd=%" PRId32" is on for write", i);
|
|
}
|
|
// if niceness is not -1, handle it below
|
|
}
|
|
}
|
|
|
|
// handle returned threads for niceness 0
|
|
g_jobScheduler.cleanup_finished_jobs();
|
|
|
|
bool calledOne = false;
|
|
const int64_t now = gettimeofdayInMilliseconds();
|
|
|
|
if( n > 0 && FD_ISSET( m_pipeFd[0], &readfds ) ) {
|
|
//drain the wakeup pipe
|
|
char buf[32];
|
|
(void)read( m_pipeFd[0], buf, sizeof(buf) );
|
|
n--;
|
|
FD_CLR( m_pipeFd[0], &readfds );
|
|
}
|
|
|
|
// now keep this fast, too. just check fds we need to.
|
|
for ( int32_t i = 0 ; i < s_numReadFds ; i++ ) {
|
|
if ( n == 0 ) break;
|
|
int fd = s_readFds[i];
|
|
Slot *s = m_readSlots [ fd ];
|
|
// if niceness is not 0, handle it below
|
|
if ( s && s->m_niceness > 0 ) continue;
|
|
// must be set
|
|
if ( ! FD_ISSET ( fd , &readfds ) ) continue;
|
|
if ( g_conf.m_logDebugLoop || g_conf.m_logDebugTcp ) {
|
|
log( LOG_DEBUG, "loop: calling cback0 niceness=%" PRId32" fd=%i", s->m_niceness, fd );
|
|
}
|
|
calledOne = true;
|
|
callCallbacks_ass (true,fd, now,0);//read?
|
|
}
|
|
for ( int32_t i = 0 ; i < s_numWriteFds ; i++ ) {
|
|
if ( n == 0 ) break;
|
|
int fd = s_writeFds[i];
|
|
Slot *s = m_writeSlots [ fd ];
|
|
// if niceness is not 0, handle it below
|
|
if ( s && s->m_niceness > 0 ) continue;
|
|
// fds are always ready for writing so take this out.
|
|
if ( ! FD_ISSET ( fd , &writefds ) ) continue;
|
|
if ( g_conf.m_logDebugLoop || g_conf.m_logDebugTcp ) {
|
|
log( LOG_DEBUG, "loop: calling wcback0 niceness=%" PRId32" fd=%i", s->m_niceness, fd );
|
|
}
|
|
calledOne = true;
|
|
callCallbacks_ass (false,fd, now,0);//false=forRead?
|
|
}
|
|
|
|
// handle returned threads for niceness 0
|
|
g_jobScheduler.cleanup_finished_jobs();
|
|
|
|
// now for lower priority fds
|
|
for ( int32_t i = 0 ; i < s_numReadFds ; i++ ) {
|
|
if ( n == 0 ) break;
|
|
int fd = s_readFds[i];
|
|
Slot *s = m_readSlots [ fd ];
|
|
// if niceness is <= 0 we did it above
|
|
if ( s && s->m_niceness <= 0 ) continue;
|
|
// must be set
|
|
if ( ! FD_ISSET ( fd , &readfds ) ) continue;
|
|
if ( g_conf.m_logDebugLoop || g_conf.m_logDebugTcp ) {
|
|
log( LOG_DEBUG, "loop: calling cback1 niceness=%" PRId32" fd=%i", s->m_niceness, fd );
|
|
}
|
|
calledOne = true;
|
|
callCallbacks_ass (true,fd, now,1);//read?
|
|
}
|
|
|
|
for ( int32_t i = 0 ; i < s_numWriteFds ; i++ ) {
|
|
if ( n == 0 ) break;
|
|
int fd = s_writeFds[i];
|
|
Slot *s = m_writeSlots [ fd ];
|
|
// if niceness is <= 0 we did it above
|
|
if ( s && s->m_niceness <= 0 ) continue;
|
|
// must be set
|
|
if ( ! FD_ISSET ( fd , &writefds ) ) continue;
|
|
if ( g_conf.m_logDebugLoop || g_conf.m_logDebugTcp ) {
|
|
log( LOG_DEBUG, "loop: calling wcback1 niceness=%" PRId32" fd=%i", s->m_niceness, fd );
|
|
}
|
|
calledOne = true;
|
|
callCallbacks_ass (false,fd, now,1);//forread?
|
|
}
|
|
|
|
// handle returned threads for all other nicenesses
|
|
g_jobScheduler.cleanup_finished_jobs();
|
|
|
|
// call sleepers if they need it
|
|
// call this every (about) 1 second
|
|
int32_t elapsed = gettimeofdayInMilliseconds() - s_lastTime;
|
|
// if someone changed the system clock on us, this could be negative
|
|
// so fix it! otherwise, times may NEVER get called in our lifetime
|
|
if ( elapsed < 0 ) {
|
|
elapsed = m_minTick;
|
|
}
|
|
|
|
if ( elapsed >= m_minTick ) {
|
|
// MAX_NUM_FDS is the fd for sleep callbacks
|
|
callCallbacks_ass ( true , MAX_NUM_FDS , gettimeofdayInMilliseconds() );
|
|
// note the last time we called them
|
|
s_lastTime = gettimeofdayInMilliseconds();
|
|
// handle returned threads for all other nicenesses
|
|
g_jobScheduler.cleanup_finished_jobs();
|
|
}
|
|
|
|
logDebug( g_conf.m_logDebugLoop, "loop: Exited doPoll.");
|
|
}
|
|
|
|
|
|
void Loop::wakeupPollLoop() {
|
|
char dummy='d';
|
|
(void)write(m_pipeFd[1],&dummy,1);
|
|
}
|
|
|
|
|
|
int gbsystem(const char *cmd ) {
|
|
log("gb: running system(\"%s\")",cmd);
|
|
int ret = system(cmd);
|
|
return ret;
|
|
}
|