250 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			250 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| // Matt Wells, Copyright Apr 2001
 | |
| 
 | |
| // . core class for handling interupts-based i/o on non-blocking descriptors
 | |
| // . when an fd/state/callback is registered for reading we call your callback //   when fd has a read event (same for write and sleeping)
 | |
| 
 | |
| #ifndef _LOOP_H_
 | |
| #define _LOOP_H_
 | |
| 
 | |
| #include <sys/time.h>
 | |
| #include <sys/types.h>
 | |
| #include <signal.h>
 | |
| #include <fcntl.h>      // fcntl()
 | |
| #include <sys/poll.h>   // POLLIN, POLLPRI, ...
 | |
| #ifndef F_SETSIG
 | |
| #define F_SETSIG 10     // F_SETSIG
 | |
| #endif
 | |
| #include "Mem.h"        // mmalloc, mfree
 | |
| #define QUERYPRIORITYWEIGHT 16
 | |
| #define QUICKPOLL_INTERVAL 10
 | |
| 
 | |
| int gbsystem(char *cmd);
 | |
| FILE* gbpopen(char* cmd);
 | |
| 
 | |
| 
 | |
| #define sleep(a) { char *xx=NULL;*xx=0; }
 | |
| //#define sleep(a) logf(LOG_INFO,"sleep: sleep"); 
 | |
| 
 | |
| // we have 2 arrays of slots, m_readSlots and m_writeSlots
 | |
| class Slot {
 | |
|  public:
 | |
| 	void   *m_state;
 | |
| 	void  (* m_callback)(int fd, void *state);
 | |
| 	// the next Slot that's registerd on this fd
 | |
| 	Slot   *m_next;
 | |
| 	// save niceness level for doPoll() to segregate
 | |
| 	int32_t    m_niceness;
 | |
| 	// last time we called m_callback for this fd
 | |
| 	//	time_t  m_lastActivity;
 | |
| 	// . when should this fd timeout and we call the callback with
 | |
| 	//   errno set to ETIMEDOUT
 | |
| 	// . set to -1 for never timeout
 | |
| 	// . m_timeout is in seconds
 | |
| 	//	int32_t    m_timeout;     
 | |
| 	// this callback should be called every X milliseconds
 | |
| 	int32_t      m_tick;
 | |
| 	// when we were last called in ms time (only valid for sleep callbacks)
 | |
| 	int64_t m_lastCall;
 | |
| 	// linked list of available slots
 | |
| 	Slot     *m_nextAvail;
 | |
| };
 | |
| 
 | |
| #define  sleep(a) { char *xx=NULL;*xx=0; }
 | |
| #define usleep(a) { char *xx=NULL;*xx=0; }
 | |
| //#define sleep(a) logf(LOG_INFO,"sleep: sleep"); 
 | |
| 
 | |
| // linux 2.2 kernel has this limitation
 | |
| #define MAX_NUM_FDS 1024
 | |
| 
 | |
| 
 | |
| // . niceness can only be 0, 1 or 2
 | |
| // . we use 0 for query traffic
 | |
| // . we use 1 for merge disk threads
 | |
| // . we use 2 for indexing/spidering
 | |
| // . 0  will use the high priority udp server, g_udpServer2
 | |
| // . 1+ will use the low  priority udp server, g_udpServer
 | |
| // . 0  niceness for disk threads will cancel other threads before launching
 | |
| // . 1+ niceness threads will be set to lowest priority using setpriority()
 | |
| // . 1  niceness disk thread, when running, will not allow niceness 2 to launch
 | |
| //#define MAX_NICENESS 2
 | |
| 
 | |
| // are we using async signal handlers?
 | |
| extern bool g_isHot;
 | |
| 
 | |
| // extern variable that let's us know if we're in a sig handler
 | |
| extern bool g_inSigHandler;
 | |
| 
 | |
| // a debugging tool really
 | |
| extern bool g_interruptsOn ;
 | |
| 
 | |
| // are there pending signals for which we should call g_udpServer.makeCallbacks
 | |
| extern bool g_someAreQueued;
 | |
| 
 | |
| // . so sig handler can get an approx time of day
 | |
| // . gettimeofday() is not async signal safe
 | |
| // . this is now the "local" time, not synced
 | |
| extern int64_t g_now;
 | |
| 
 | |
| // . this is now the time synced with host #0
 | |
| //extern int64_t g_nowGlobal;
 | |
| 
 | |
| //this is the approximate time generated by the timer.
 | |
| extern int64_t g_nowApprox;
 | |
| 
 | |
| // count of how many SIGVTALRM signals we had so far
 | |
| extern int32_t g_numAlarms;
 | |
| extern int32_t g_numVTAlarms;
 | |
| extern int32_t g_numQuickPolls;
 | |
| 
 | |
| extern int32_t g_numSigChlds;
 | |
| extern int32_t g_numSigQueues;
 | |
| extern int32_t g_numSigPipes;
 | |
| extern int32_t g_numSigIOs;
 | |
| extern int32_t g_numSigOthers;
 | |
| 
 | |
| 
 | |
| extern char g_niceness ;
 | |
| 
 | |
| // we make sure the same callback/handler is not hogging the cpu when it is
 | |
| // niceness 0 and we do not interrupt it, so this is a critical check
 | |
| extern class UdpSlot *g_callSlot;
 | |
| 
 | |
| class Loop {
 | |
| 
 | |
|  public:
 | |
| 
 | |
| 	// constructor and stuff
 | |
| 	Loop();
 | |
| 	~Loop();
 | |
| 
 | |
| 	// free up all our mem
 | |
| 	void reset();
 | |
| 
 | |
| 	// set up the signal handlers or block the signals for queueing
 | |
| 	bool init();
 | |
| 	
 | |
| 	// . call this to begin polling/selecting of all registered fds
 | |
| 	// . returns false on error
 | |
| 	bool runLoop();
 | |
| 
 | |
| 	// . register this "fd" with "callback"
 | |
| 	// . "callback" will be called when fd is ready for reading
 | |
| 	// . "timeout" is -1 if this never timesout
 | |
| 	bool registerReadCallback  ( int   fd    ,
 | |
| 				     void *state , 
 | |
| 				     void (* callback)(int fd,void *state ) ,
 | |
| 				     int32_t  niceness );//= MAX_NICENESS ) ;
 | |
| 
 | |
| 	// . register this "fd" with "callback"
 | |
| 	// . "callback" will be called when fd is ready for reading
 | |
| 	// . "callback" will be called when there is an error on fd
 | |
| 	bool registerWriteCallback ( int   fd    ,
 | |
| 				     void *state ,
 | |
| 				     void (* callback)(int fd, void *state ) , 
 | |
| 	 			     int32_t   niceness ); 
 | |
| 
 | |
| 	// . register this callback to be called every second
 | |
| 	// . TODO: implement "seconds" parameter
 | |
| 	bool registerSleepCallback ( int32_t milliseconds ,
 | |
| 				     void *state, 
 | |
| 				     void (* callback)(int fd,void *state ) ,
 | |
| 				     int32_t niceness = 1 );
 | |
| 
 | |
| 	// unregister call back for reading, writing or sleeping
 | |
| 	void unregisterReadCallback  ( int fd, void *state ,
 | |
| 				       void (* callback)(int fd,void *state),
 | |
| 				       bool silent = false );
 | |
| 	void unregisterWriteCallback ( int fd, void *state ,
 | |
| 	 			       void (* callback)(int fd,void *state)); 
 | |
| 	void unregisterSleepCallback ( void *state ,
 | |
| 				       void (* callback)(int fd,void *state));
 | |
| 
 | |
| 	// sets up for signal capture by us, g_loop
 | |
| 	bool setNonBlocking ( int fd , int32_t niceness ) ;
 | |
| 
 | |
| 	// . keep this public so sighandler() can call it
 | |
| 	// . we also call it from HttpServer::getMsgPieceWrapper() to
 | |
| 	//   notify a socket that it's m_sendBuf got some new data to send
 | |
| 	void callCallbacks_ass (bool forReading, int fd, int64_t now = 0LL,
 | |
| 				int32_t niceness = -1 );
 | |
| 
 | |
| 	// set to true by sigioHandler() so doPoll() will be called
 | |
| 	bool m_needToPoll;
 | |
| 
 | |
| 	
 | |
| 	int64_t   m_lastPollTime;
 | |
| 	bool        m_inQuickPoll;
 | |
| 	bool        m_needsToQuickPoll;
 | |
| 	bool        m_canQuickPoll;
 | |
| 	itimerval   m_quickInterrupt;
 | |
| 	itimerval   m_realInterrupt;
 | |
| 	itimerval   m_noInterrupt;
 | |
| 	bool        m_isDoingLoop;
 | |
| 	// call this when you don't want to be interrupted
 | |
| 	void interruptsOff ( ) ;
 | |
| 	// and this to resume being interrupted
 | |
| 	void interruptsOn ( ) ;
 | |
| 
 | |
| 	// the sighupHandler() will set this to 1 when we receive
 | |
| 	// a SIGHUP, 2 if a thread crashed, 3 if we got a SIGPWR
 | |
| 	char m_shutdown;
 | |
| 
 | |
| 	void startBlockedCpuTimer();
 | |
| 	void canQuickPoll(int32_t niceness);
 | |
| 	void setitimerInterval(int32_t niceness);
 | |
| 
 | |
| 	void disableTimer();
 | |
| 	void enableTimer();
 | |
| 
 | |
| 	void quickPoll(int32_t niceness, const char* caller = NULL, int32_t lineno = 0);
 | |
| 
 | |
| 	// called when sigqueue overflows and we gotta do a select() or poll()
 | |
| 	void doPoll ( );
 | |
|  private:
 | |
| 
 | |
| 
 | |
| 	void unregisterCallback ( Slot **slots , int fd , void *state ,
 | |
| 				  void (* callback)(int fd,void *state) ,
 | |
| 				  bool silent , // = false );
 | |
| 				  bool forReading );
 | |
| 
 | |
| 	bool addSlot ( bool forReading , int fd , void *state , 
 | |
| 		       void (* callback)(int fd , void *state ) ,
 | |
| 		       int32_t niceness , int32_t tick = 0x7fffffff ) ;
 | |
| 
 | |
| 	// set how long to pause waiting for signals (in milliseconds)
 | |
| 	void setSigWaitTime ( int32_t ms ) ;
 | |
| 
 | |
| 	// now we use a linked list of pre-allocated slots to avoid a malloc
 | |
| 	// failure which can cause the merge to dump with "URGENT MERGE FAILED"
 | |
| 	// message becaise it could not register the sleep wrapper to wait
 | |
| 	Slot *getEmptySlot (         ) ;
 | |
| 	void  returnSlot   ( Slot *s ) ;
 | |
| 
 | |
| 	// . these arrays map an fd to a Slot (see above for Slot definition)
 | |
| 	// . that slot may chain to other slots if more than one procedure
 | |
| 	//   is waiting on a file to become available for reading/writing
 | |
| 	// . these fd's are real, not virtual
 | |
| 	// . m_read/writeFds[i] is NULL if no one is waiting on fd #i
 | |
| 	// . fd of MAX_NUM_FDS   is used for sleep callbacks
 | |
| 	// . fd of MAX_NUM_FDS+1 is used for thread exit callbacks
 | |
| 	Slot *m_readSlots  [MAX_NUM_FDS+2];
 | |
| 	Slot *m_writeSlots [MAX_NUM_FDS+2];
 | |
| 
 | |
| 	// the minimal tick time in milliseconds (ms)
 | |
| 	int32_t m_minTick;
 | |
| 
 | |
| 	// now we pre-allocate our slots to prevent nasty coredumps from merge
 | |
| 	// because it could not register a sleep callback with us
 | |
| 	Slot *m_slots;
 | |
| 	Slot *m_head;
 | |
| 	Slot *m_tail;
 | |
| };
 | |
| 
 | |
| extern class Loop g_loop;
 | |
| 
 | |
| //#define QUICKPOLL(a) if(g_loop.m_canQuickPoll && g_loop.m_needsToQuickPoll) g_loop.quickPoll(a, __PRETTY_FUNCTION__, __LINE__)
 | |
| #define QUICKPOLL(a) if(g_niceness && g_loop.m_needsToQuickPoll) g_loop.quickPoll(a, __PRETTY_FUNCTION__, __LINE__)
 | |
| 
 | |
| #endif
 |