// Matt Wells, Copyright Apr 2001
|
|
|
|
// . core class for handling interrupt-based i/o on non-blocking descriptors
// . when an fd/state/callback is registered for reading we call your callback
//   when fd has a read event (same for write and sleeping)
|
|
|
|
#ifndef _LOOP_H_
|
|
#define _LOOP_H_
|
|
|
|
#include <sys/time.h>
|
|
#include <sys/types.h>
|
|
#include <signal.h>
|
|
#include <fcntl.h> // fcntl()
|
|
#include <sys/poll.h> // POLLIN, POLLPRI, ...
|
|
// . fallback for builds where <fcntl.h> does not expose F_SETSIG
// . 10 is the value Linux uses for this fcntl() command
#ifndef F_SETSIG
#define F_SETSIG 10
#endif
|
|
#include "Mem.h" // mmalloc, mfree
|
|
// . tuning constants
// . NOTE(review): neither is used in this header, so their units and exact
//   meaning must be confirmed against the .cpp files that reference them
#define QUERYPRIORITYWEIGHT 16
#define QUICKPOLL_INTERVAL 10
|
|
|
|
// . run the command line "cmd"
// . NOTE(review): presumably project replacements for system() and popen();
//   the definitions are not in this header, so confirm there
int gbsystem(char *cmd);
FILE* gbpopen(char* cmd);
|
|
|
|
|
|
// . any use of sleep() expands to a write through a NULL pointer
// . NOTE(review): presumably a deliberate trap so nothing blocks the loop by
//   sleeping; register a sleep callback with the Loop class instead
#define sleep(a) { char *xx=NULL;*xx=0; }
//#define sleep(a) logf(LOG_INFO,"sleep: sleep");
|
|
|
|
// . one registered callback for an fd
// . we have 2 arrays of slots, m_readSlots and m_writeSlots
// . slots registered on the same fd are chained through m_next
// . unused slots are chained through m_nextAvail
class Slot {
 public:
    // opaque pointer handed back to m_callback as its "state" argument
    void *m_state;
    // the function to call for this registration
    void (* m_callback)(int fd, void *state);
    // the next Slot that's registered on this fd
    Slot *m_next;
    // save niceness level for doPoll() to segregate
    int32_t m_niceness;
    // last time we called m_callback for this fd
    // time_t m_lastActivity;
    // . when should this fd timeout and we call the callback with
    //   errno set to ETIMEDOUT
    // . set to -1 for never timeout
    // . m_timeout is in seconds
    // int32_t m_timeout;
    // this callback should be called every X milliseconds
    int32_t m_tick;
    // when we were last called in ms time (only valid for sleep callbacks)
    int64_t m_lastCall;
    // linked list of available slots
    Slot *m_nextAvail;
};
|
|
|
|
// . any use of sleep() or usleep() expands to a write through a NULL pointer
// . the sleep() definition repeats, token for token, the one earlier in this
//   file, which keeps the redefinition legal
// . NOTE(review): presumably a deliberate trap so nothing blocks the loop by
//   sleeping; confirm before changing either macro
#define sleep(a) { char *xx=NULL;*xx=0; }
#define usleep(a) { char *xx=NULL;*xx=0; }
//#define sleep(a) logf(LOG_INFO,"sleep: sleep");
|
|
|
|
// . linux 2.2 kernel has this limitation
// . the Loop class sizes its read/write slot arrays as MAX_NUM_FDS+2
#define MAX_NUM_FDS 1024
|
|
|
|
|
|
// . niceness can only be 0, 1 or 2
// . we use 0 for query traffic
// . we use 1 for merge disk threads
// . we use 2 for indexing/spidering
// . 0 will use the high priority udp server, g_udpServer2
// . 1+ will use the low priority udp server, g_udpServer
// . 0 niceness for disk threads will cancel other threads before launching
// . 1+ niceness threads will be set to lowest priority using setpriority()
// . 1 niceness disk thread, when running, will not allow niceness 2 to launch
//#define MAX_NICENESS 2
|
|
|
|
// . globals shared with the signal handlers and the rest of the program
// . they are only declared here

// are we using async signal handlers?
extern bool g_isHot;

// lets us know if we're in a sig handler
extern bool g_inSigHandler;

// a debugging tool really
extern bool g_interruptsOn;

// are there pending signals for which we should call g_udpServer.makeCallbacks
extern bool g_someAreQueued;

// . so sig handler can get an approx time of day
// . gettimeofday() is not async signal safe
// . this is now the "local" time, not synced
extern int64_t g_now;

// . this is now the time synced with host #0
//extern int64_t g_nowGlobal;

// this is the approximate time generated by the timer
extern int64_t g_nowApprox;

// . count of how many SIGVTALRM signals we had so far
// . NOTE(review): that comment names SIGVTALRM but sits above g_numAlarms;
//   confirm which of these counters it actually describes
extern int32_t g_numAlarms;
extern int32_t g_numVTAlarms;
extern int32_t g_numQuickPolls;

// NOTE(review): judging by the names, per-signal counters; confirm where set
extern int32_t g_numSigChlds;
extern int32_t g_numSigQueues;
extern int32_t g_numSigPipes;
extern int32_t g_numSigIOs;
extern int32_t g_numSigOthers;

// the QUICKPOLL() macro at the bottom of this file does nothing when this is 0
extern char g_niceness;

// we make sure the same callback/handler is not hogging the cpu when it is
// niceness 0 and we do not interrupt it, so this is a critical check
extern class UdpSlot *g_callSlot;
|
|
|
|
class Loop {
|
|
|
|
public:
|
|
|
|
// constructor and stuff
|
|
Loop();
|
|
~Loop();
|
|
|
|
// free up all our mem
|
|
void reset();
|
|
|
|
// set up the signal handlers or block the signals for queueing
|
|
bool init();
|
|
|
|
// . call this to begin polling/selecting of all registered fds
|
|
// . returns false on error
|
|
bool runLoop();
|
|
|
|
// . register this "fd" with "callback"
|
|
// . "callback" will be called when fd is ready for reading
|
|
// . "timeout" is -1 if this never timesout
|
|
bool registerReadCallback ( int fd ,
|
|
void *state ,
|
|
void (* callback)(int fd,void *state ) ,
|
|
int32_t niceness );//= MAX_NICENESS ) ;
|
|
|
|
// . register this "fd" with "callback"
|
|
// . "callback" will be called when fd is ready for reading
|
|
// . "callback" will be called when there is an error on fd
|
|
bool registerWriteCallback ( int fd ,
|
|
void *state ,
|
|
void (* callback)(int fd, void *state ) ,
|
|
int32_t niceness );
|
|
|
|
// . register this callback to be called every second
|
|
// . TODO: implement "seconds" parameter
|
|
bool registerSleepCallback ( int32_t milliseconds ,
|
|
void *state,
|
|
void (* callback)(int fd,void *state ) ,
|
|
int32_t niceness = 1 );
|
|
|
|
// unregister call back for reading, writing or sleeping
|
|
void unregisterReadCallback ( int fd, void *state ,
|
|
void (* callback)(int fd,void *state),
|
|
bool silent = false );
|
|
void unregisterWriteCallback ( int fd, void *state ,
|
|
void (* callback)(int fd,void *state));
|
|
void unregisterSleepCallback ( void *state ,
|
|
void (* callback)(int fd,void *state));
|
|
|
|
// sets up for signal capture by us, g_loop
|
|
bool setNonBlocking ( int fd , int32_t niceness ) ;
|
|
|
|
// . keep this public so sighandler() can call it
|
|
// . we also call it from HttpServer::getMsgPieceWrapper() to
|
|
// notify a socket that it's m_sendBuf got some new data to send
|
|
void callCallbacks_ass (bool forReading, int fd, int64_t now = 0LL,
|
|
int32_t niceness = -1 );
|
|
|
|
// set to true by sigioHandler() so doPoll() will be called
|
|
bool m_needToPoll;
|
|
|
|
|
|
int64_t m_lastPollTime;
|
|
bool m_inQuickPoll;
|
|
bool m_needsToQuickPoll;
|
|
bool m_canQuickPoll;
|
|
itimerval m_quickInterrupt;
|
|
itimerval m_realInterrupt;
|
|
itimerval m_noInterrupt;
|
|
bool m_isDoingLoop;
|
|
// call this when you don't want to be interrupted
|
|
void interruptsOff ( ) ;
|
|
// and this to resume being interrupted
|
|
void interruptsOn ( ) ;
|
|
|
|
// the sighupHandler() will set this to 1 when we receive
|
|
// a SIGHUP, 2 if a thread crashed, 3 if we got a SIGPWR
|
|
char m_shutdown;
|
|
|
|
void startBlockedCpuTimer();
|
|
void canQuickPoll(int32_t niceness);
|
|
void setitimerInterval(int32_t niceness);
|
|
|
|
void disableTimer();
|
|
void enableTimer();
|
|
|
|
void quickPoll(int32_t niceness, const char* caller = NULL, int32_t lineno = 0);
|
|
|
|
// called when sigqueue overflows and we gotta do a select() or poll()
|
|
void doPoll ( );
|
|
private:
|
|
|
|
|
|
void unregisterCallback ( Slot **slots , int fd , void *state ,
|
|
void (* callback)(int fd,void *state) ,
|
|
bool silent , // = false );
|
|
bool forReading );
|
|
|
|
bool addSlot ( bool forReading , int fd , void *state ,
|
|
void (* callback)(int fd , void *state ) ,
|
|
int32_t niceness , int32_t tick = 0x7fffffff ) ;
|
|
|
|
// set how long to pause waiting for signals (in milliseconds)
|
|
void setSigWaitTime ( int32_t ms ) ;
|
|
|
|
// now we use a linked list of pre-allocated slots to avoid a malloc
|
|
// failure which can cause the merge to dump with "URGENT MERGE FAILED"
|
|
// message becaise it could not register the sleep wrapper to wait
|
|
Slot *getEmptySlot ( ) ;
|
|
void returnSlot ( Slot *s ) ;
|
|
|
|
// . these arrays map an fd to a Slot (see above for Slot definition)
|
|
// . that slot may chain to other slots if more than one procedure
|
|
// is waiting on a file to become available for reading/writing
|
|
// . these fd's are real, not virtual
|
|
// . m_read/writeFds[i] is NULL if no one is waiting on fd #i
|
|
// . fd of MAX_NUM_FDS is used for sleep callbacks
|
|
// . fd of MAX_NUM_FDS+1 is used for thread exit callbacks
|
|
Slot *m_readSlots [MAX_NUM_FDS+2];
|
|
Slot *m_writeSlots [MAX_NUM_FDS+2];
|
|
|
|
// the minimal tick time in milliseconds (ms)
|
|
int32_t m_minTick;
|
|
|
|
// now we pre-allocate our slots to prevent nasty coredumps from merge
|
|
// because it could not register a sleep callback with us
|
|
Slot *m_slots;
|
|
Slot *m_head;
|
|
Slot *m_tail;
|
|
};
|
|
|
|
// the one global event loop instance (defined elsewhere)
extern class Loop g_loop;

//#define QUICKPOLL(a) if(g_loop.m_canQuickPoll && g_loop.m_needsToQuickPoll) g_loop.quickPoll(a, __PRETTY_FUNCTION__, __LINE__)

// . calls g_loop.quickPoll() when g_niceness is non-zero and
//   g_loop.m_needsToQuickPoll is set, passing the caller's function name
//   and line number
// . wrapped in do { } while (0) so the expansion is a single statement: an
//   "else" written after "if (x) QUICKPOLL(n);" now binds to the caller's
//   "if" instead of to the "if" inside this macro
// . use it as a statement, followed by a semicolon
#define QUICKPOLL(a) do { if(g_niceness && g_loop.m_needsToQuickPoll) g_loop.quickPoll((a), __PRETTY_FUNCTION__, __LINE__); } while (0)
|
|
|
|
#endif
|