Removed quickpoll functionality from Loop

All heavy tasks should be offloaded to threads so quickpolls (which we disabled many moons ago) are unnecessary.
This commit is contained in:
Ivan Skytte Jørgensen
2016-08-11 18:38:04 +02:00
parent c2a7663763
commit 5ba9dbc4fd
9 changed files with 10 additions and 284 deletions

143
Loop.cpp

@ -368,11 +368,6 @@ void Loop::callCallbacks_ass ( bool forReading , int fd , int64_t now , int32_t
// ensure we called something
int32_t numCalled = 0;
// a hack fix
if ( niceness == -1 && m_inQuickPoll ) {
niceness = 0;
}
// . now call all the callbacks
// . most will re-register themselves (i.e. call registerCallback...()
while ( s ) {
@ -446,9 +441,6 @@ void Loop::callCallbacks_ass ( bool forReading , int fd , int64_t now , int32_t
}
Loop::Loop ( ) {
m_inQuickPoll = false;
m_needsToQuickPoll = false;
m_canQuickPoll = false;
m_isDoingLoop = false;
// set all callbacks to NULL so we know they're empty
@ -515,8 +507,6 @@ bool Loop::init ( ) {
// . reset this cuz we have no sleep callbacks right now
// . sleep a min of 40ms so g_now is somewhat up to date
m_minTick = 40; //0x7fffffff;
// reset the need to poll flag
m_needToPoll = false;
// make slots
m_slots = (Slot *) mmalloc ( MAX_SLOTS * (int32_t)sizeof(Slot) , "Loop" );
if ( ! m_slots ) return false;
@ -615,9 +605,6 @@ void printStackTrace (bool print_location) {
// TODO: if we get a segfault while saving, what then?
void sigbadHandler ( int x , siginfo_t *info , void *y ) {
// turn off sigalarms
g_loop.disableQuickpollTimer();
log("loop: sigbadhandler. disabling handler from recall.");
// . don't allow this handler to be called again
// . does this work if we're in a thread?
@ -693,9 +680,6 @@ void Loop::runLoop ( ) {
// . now loop forever waiting for signals
// . but every second check for timer-based events
for (;;) {
m_lastPollTime = gettimeofdayInMilliseconds();
m_needsToQuickPoll = false;
g_errno = 0;
if ( m_shutdown ) {
@ -744,12 +728,6 @@ void Loop::doPoll ( ) {
// set time
//g_now = gettimeofdayInMilliseconds();
// . turn it off here so it can be turned on again after we've
// called select() so we don't lose any fd events through the cracks
// . some callbacks we call make trigger another SIGIO, but if they
// fail they should set Loop::g_needToPoll to true
m_needToPoll = false;
logDebug( g_conf.m_logDebugLoop, "loop: Entered doPoll." );
if(g_udpServer.needBottom()) {
@ -760,15 +738,11 @@ void Loop::doPoll ( ) {
timeval v;
v.tv_sec = 0;
if ( m_inQuickPoll ) {
v.tv_usec = 0;
} else {
// 10ms for sleepcallbacks so they can be called...
// and we need this to be the same as sigalrmhandler() since we
// keep track of cpu usage here too, since sigalrmhandler is "VT"
// based it only goes off when that much "cpu time" has elapsed.
v.tv_usec = QUICKPOLL_INTERVAL * 1000;
}
// 10ms for sleepcallbacks so they can be called...
// and we need this to be the same as sigalrmhandler() since we
// keep track of cpu usage here too, since sigalrmhandler is "VT"
// based it only goes off when that much "cpu time" has elapsed.
v.tv_usec = QUICKPOLL_INTERVAL * 1000;
again:
@ -821,8 +795,6 @@ void Loop::doPoll ( ) {
// handle returned threads for niceness 0
g_jobScheduler.cleanup_finished_jobs();
if ( m_inQuickPoll ) goto again;
// high niceness threads
g_jobScheduler.cleanup_finished_jobs();
@ -845,10 +817,10 @@ void Loop::doPoll ( ) {
for ( int32_t i = 0; i < MAX_NUM_FDS; i++) {
// continue if not set for reading
if ( FD_ISSET ( i, &readfds ) ) {
log( LOG_DEBUG, "loop: fd=%" PRId32" is on for read qp=%i", i, ( int ) m_inQuickPoll );
log( LOG_DEBUG, "loop: fd=%" PRId32" is on for read", i);
}
if ( FD_ISSET ( i, &writefds ) ) {
log( LOG_DEBUG, "loop: fd=%" PRId32" is on for write qp=%i", i, ( int ) m_inQuickPoll );
log( LOG_DEBUG, "loop: fd=%" PRId32" is on for write", i);
}
// if niceness is not -1, handle it below
}
@ -901,13 +873,6 @@ void Loop::doPoll ( ) {
// handle returned threads for niceness 0
g_jobScheduler.cleanup_finished_jobs();
//
// the stuff below is not super urgent, do not do if in quickpoll
//
if ( m_inQuickPoll ) {
return;
}
// now for lower priority fds
for ( int32_t i = 0 ; i < s_numReadFds ; i++ ) {
if ( n == 0 ) break;
@ -964,98 +929,6 @@ void Loop::doPoll ( ) {
}
void Loop::quickPoll(int32_t niceness, const char* caller, int32_t lineno) {
if ( ! g_conf.m_useQuickpoll ) return;
// convert
//if ( niceness > 1 ) niceness = 1;
// sometimes we init HashTableX's with a niceness of 0 even though
// g_niceness is 1. so prevent a core below.
if ( niceness == 0 ) return;
// sanity check
if ( g_niceness > niceness ) {
log(LOG_WARN,"loop: niceness mismatch!");
//g_process.shutdownAbort(true); }
}
// sanity -- temporary -- no quickpoll in a thread!!!
//if ( g_threads.amThread() ) { g_process.shutdownAbort(true); }
// if we are niceness 1 and not in a handler, make it niceness 2
// so the handlers can be answered and we don't slow other
// spiders down and we don't slow turks' injections down as much
if ( ! g_inHandler && niceness == 1 ) niceness = 2;
// reset this
g_missedQuickPolls = 0;
if(m_inQuickPoll) {
log(LOG_WARN,
"admin: tried to quickpoll from inside quickpoll");
// this happens when handleRequest3f is called from
// a quickpoll and it deletes a collection and BigFile::close
// calls ThreadQueue::removeThreads and Msg3::doneScanning()
// has niceness 2 and calls quickpoll again!
return;
}
int64_t now = gettimeofdayInMilliseconds();
//int64_t took = now - m_lastPollTime;
int64_t now2 = gettimeofdayInMilliseconds();
int32_t gerrno = g_errno;
m_inQuickPoll = true;
// doPoll() will since we are in quickpoll and only call niceness 0
// callbacks for all the fds. and it will set the timer to 0.
doPoll ();
// reset this again
g_missedQuickPolls = 0;
// . avoid quickpolls within a quickpoll
// . was causing seg fault in diskHeartbeatWrapper()
// which call Threads::bailOnReads()
m_canQuickPoll = false;
// . call sleepcallbacks, like the heartbeat in Process.cpp
// . MAX_NUM_FDS is the fd for sleep callbacks
// . specify a niceness of 0 so only niceness 0 sleep callbacks
// will be called
callCallbacks_ass ( true , MAX_NUM_FDS , now , 0 );
// sanity check
if ( g_niceness > niceness ) {
log("loop: niceness mismatch");
}
//log(LOG_WARN, "xx quickpolled took %" PRId64", waited %" PRId64" from %s",
// now2 - now, now - m_lastPollTime, caller);
m_lastPollTime = now2;
m_inQuickPoll = false;
m_needsToQuickPoll = false;
m_canQuickPoll = true;
g_errno = gerrno;
// reset this again
g_missedQuickPolls = 0;
}
void Loop::canQuickPoll(int32_t niceness) {
if(niceness && !m_shutdown) m_canQuickPoll = true;
else m_canQuickPoll = false;
}
void Loop::enableQuickpollTimer() {
m_canQuickPoll = true;
}
void Loop::disableQuickpollTimer() {
m_canQuickPoll = false;
}
void Loop::wakeupPollLoop() {
char dummy='d';
(void)write(m_pipeFd[1],&dummy,1);
@ -1063,9 +936,7 @@ void Loop::wakeupPollLoop() {
int gbsystem(const char *cmd ) {
g_loop.disableQuickpollTimer();
log("gb: running system(\"%s\")",cmd);
int ret = system(cmd);
g_loop.enableQuickpollTimer();
return ret;
}

16
Loop.h

@ -122,27 +122,13 @@ class Loop {
void wakeupPollLoop();
// set to true by sigioHandler() so doPoll() will be called
bool m_needToPoll;
int64_t m_lastPollTime;
bool m_inQuickPoll;
bool m_needsToQuickPoll;
bool m_canQuickPoll;
bool m_isDoingLoop;
// the sighupHandler() will set this to 1 when we receive
// a SIGHUP, 2 if a thread crashed, 3 if we got a SIGPWR
char m_shutdown;
void canQuickPoll(int32_t niceness);
void enableQuickpollTimer();
void disableQuickpollTimer();
void quickPoll(int32_t niceness, const char* caller = NULL, int32_t lineno = 0);
// called when sigqueue overflows and we gotta do a select() or poll()
void doPoll ( );
private:
@ -189,8 +175,6 @@ class Loop {
extern class Loop g_loop;
//#define QUICKPOLL(a) if(g_loop.m_canQuickPoll && g_loop.m_needsToQuickPoll) g_loop.quickPoll(a, __PRETTY_FUNCTION__, __LINE__)
//#define QUICKPOLL(a) if(g_niceness && g_loop.m_needsToQuickPoll) g_loop.quickPoll(a, __PRETTY_FUNCTION__, __LINE__)
#define QUICKPOLL(a)
#endif // GB_LOOP_H

@ -657,7 +657,6 @@ bool Process::save2 ( ) {
// . return false if blocked/waiting
// . this is the SAVE BEFORE EXITING
bool Process::shutdown2() {
g_loop.disableQuickpollTimer();
// only the main process can call this
if ( pthread_self() != s_mainThreadTid ) {

@ -914,26 +914,6 @@ Profiler::checkMissedQuickPoll( FrameTrace *frame,
const uint32_t stackPtr,
uint32_t *ptr)
{
if(g_niceness == 0 || // !g_loop.m_canQuickPoll ||
!g_loop.m_needsToQuickPoll ||
g_loop.m_inQuickPoll) return;
// Get the time difference from when the function we last saw was
// different. The odds are very good that this time represents the time
// which has been spent in this function.
// const uint64_t time = gettimeofdayInMilliseconds();
// const uint64_t delta = time - g_profiler.m_lastDeltaAddressTime;
// const uint32_t maxDelta = QUICKPOLL_INTERVAL;
//log("delta %i", delta);
// if(delta <= maxDelta) return;
// if(time - g_loop.m_lastPollTime <= maxDelta) return;
// If it got here then there is a good change a quick poll call needs to
// be added.
if(frame) ++frame->missQuickPoll;
//if(!ptr) ptr =(uint32_t *)realTimeProfilerData.getValuePointer(
// uint32_t(stackPtr));
if(!ptr) ptr = (uint32_t *)realTimeProfilerData.getValue((uint32_t *)
&stackPtr);
if(ptr) ++ptr[1];
}
@ -984,12 +964,6 @@ Profiler::getStackFrame() {
continue;
}
// a secret # to indicate missed quickpoll
if ( g_niceness != 0 &&
g_loop.m_needsToQuickPoll &&
! g_loop.m_inQuickPoll )
g_profiler.m_ipBuf.pushLongLong(0x123456789LL);
// indicate end of call stack path
g_profiler.m_ipBuf.pushLongLong(0LL);//addr);

@ -1047,14 +1047,6 @@ bool Rdb::dumpTree ( int32_t niceness ) {
return true;
}
// if we are in a quickpoll do not initiate dump.
// we might have been called by handleRequest4 with a niceness of 0
// which was niceness converted from 1
if ( g_loop.m_inQuickPoll ) {
logTrace( g_conf.m_logTraceRdb, "END. %s: In quick poll. Returning true", m_dbname );
return true;
}
// bail if already dumping
//if ( m_dump.isDumping() ) return true;
if ( m_inDumpLoop ) {

@ -1457,13 +1457,6 @@ bool RdbBase::attemptMerge( int32_t niceness, bool forceMergeAll, bool doLog , i
}
// sanity checks
if ( g_loop.m_inQuickPoll ) {
logTrace( g_conf.m_logTraceRdbBase, "END, in QuickPoll (?)" );
log("rdb: cant attempt merge in quickpoll");
return false;
}
if ( niceness == 0 ) { g_process.shutdownAbort(true); }
if ( forceMergeAll ) m_nextMergeForced = true;

@ -1681,7 +1681,6 @@ void indexedDocWrapper ( void *state ) {
//run at niceness 0 but most of the spider pipeline
//cannot. we should really just make injection run at
//MAX_NICENESS. OK, done! mdw
//if ( g_loop.m_inQuickPoll ) return;
// . continue gettings Spider recs to spider
// . if it's already waiting for a list it'll just return
// . mdw: keep your eye on this, it was commented out

@ -2352,7 +2352,7 @@ void TcpServer::readTimeoutPoll ( ) {
// if in a high niceness callback we can only serve
// low niceness (0) sockets at this point. because we might
// do a double callback on a socket that have niceness 1...
if ( g_loop.m_inQuickPoll && s->m_niceness != 0 ) continue;
// close if need be. we added this delayed closing logic because
// the transmission was getting truncated somehow, and i even tried
// the SO_LINGER crap to no avail. so this is basically our own linger

@ -857,7 +857,6 @@ void UdpServer::process(int64_t now, int32_t maxNiceness) {
}
else {
m_needBottom = true;
//g_loop.m_needToPoll = true;
}
}
@ -1325,7 +1324,7 @@ bool UdpServer::makeCallbacks(int32_t niceness) {
return false;
}
logDebug(g_conf.m_logDebugUdp, "udp: makeCallbacks: start. nice=%" PRId32" inquickpoll=%" PRId32, niceness,(int32_t)g_loop.m_inQuickPoll);
logDebug(g_conf.m_logDebugUdp, "udp: makeCallbacks: start. nice=%" PRId32, niceness);
// assume noone called
int32_t numCalled = 0;
@ -1362,15 +1361,6 @@ bool UdpServer::makeCallbacks(int32_t niceness) {
// and if they do not get called right away can cause this host
// to bottleneck many hosts
if ( pass == 0 ) {
// never call any high niceness handler/callback when
// we are already in quickpoll
if ( g_loop.m_inQuickPoll &&
slot->getNiceness() != 0 ) continue;
// never call a msg4 handler even if it has niceness 0
// if we are in quickpoll, because we might be in
// the Msg4.cpp code already!
if ( g_loop.m_inQuickPoll &&
slot->getMsgType() == msg_type_4 ) continue;
// only call handlers in pass 0, not reply callbacks
if ( slot->hasCallback() ) continue;
// only call certain msg handlers...
@ -1398,77 +1388,7 @@ bool UdpServer::makeCallbacks(int32_t niceness) {
continue;
}
}
// if slot niceness is 1 and we are in a quickpoll, then
// change niceness to 0 if its a 0x2c or a get taglist handler
// request. this makes it so a spider that is deep into
// parsing sections or whatever will still handle some
// popular niceness 1 requests and not hold all the other
// spiders up.
if ( g_loop.m_inQuickPoll &&
! slot->hasCallback() && // must be a handler request
// must have been sitting there for 500ms+
// also consider using slot->m_lastActionTime
startTime - slot->getStartTime() > 500 &&
// only do it for these guys now to make sure it
// doesn't hurt the queries coming in
(slot->getMsgType() == msg_type_13 ||
slot->getMsgType() == msg_type_c) &&
this != &g_dns.m_udpServer &&
slot->getNiceness() == 1 &&
! slot->m_convertedNiceness &&
// can't be in a quickpoll in its own handler!!!
// we now set this to true BEFORE calling the handler
// so if we are in the handler now but in a quickpoll
// then we do not re-enter the handler!!
! slot->hasCalledHandler() &&
slot->m_readBufSize > 0 &&
slot->m_sendBufSize == 0 &&
doNicenessConversion &&
m_outstandingConverts < 20 ) {
logDebug(g_conf.m_logDebugUdp, "udp: converting slot from niceness 1 to 0. slot=%p mmsgtype=0x%02x",
slot, slot->getMsgType());
// convert the niceness
slot->m_niceness = 0;
// count it
m_outstandingConverts++;
// flag it somehow so we can decrement
// m_outstandingConverts after we call the handler
// and send back the reply
slot->m_convertedNiceness = 1;
}
// . conversion for dns callbacks
// . usually the callback is gotIpWrapper() in MsgC.cpp i guess
if ( g_loop.m_inQuickPoll &&
! slot->m_convertedNiceness &&
this == &g_dns.m_udpServer &&
slot->hasCallback() &&
slot->getNiceness() == 1 &&
// can't be in a quickpoll in its own handler!!!
// we now set this to true BEFORE calling the handler
// so if we are in the handler now but in a quickpoll
// then we do not re-enter the handler!!
! slot->hasCalledCallback() &&
slot->m_readBufSize > 0 &&
slot->m_sendBufSize > 0 &&
doNicenessConversion &&
m_outstandingConverts < 20 ) {
logDebug(g_conf.m_logDebugUdp, "udp: converting slot2 from niceness 1 to 0. slot=%p mmsgtype=0x%02x",
slot, slot->getMsgType());
// convert the niceness
slot->m_niceness = 0;
// count it
m_outstandingConverts++;
// flag it somehow so we can decrement
// m_outstandingConverts after we call the handler
// and send back the reply
slot->m_convertedNiceness = 1;
}
// never call any high niceness handler/callback when
// we are already in quickpoll
if ( g_loop.m_inQuickPoll && slot->getNiceness() != 0 ) continue;
// skip if not level we want
if ( niceness <= 0 && slot->getNiceness() > 0 && pass>0) continue;
// set g_errno before calling
@ -1534,7 +1454,6 @@ bool UdpServer::makeCallbacks(int32_t niceness) {
// elapsed, numCalled,
// g_profiler.getFnName(cbAddr));
//}
//g_loop.m_needToPoll = true;
m_needBottom = true;
// now we just finish out the list with a
// lower niceness
@ -1550,7 +1469,7 @@ bool UdpServer::makeCallbacks(int32_t niceness) {
// from within a quickpoll. so if we are not in a
// quickpoll, we have to reset the linked list scan after
// calling makeCallback(slot) below.
if ( ! g_loop.m_inQuickPoll ) goto fullRestart;
goto fullRestart;
}
// clear
g_errno = 0;
@ -1585,9 +1504,6 @@ bool UdpServer::makeCallback(UdpSlot *slot) {
// return false;
//}
//only allow a quickpoll if we are nice.
//g_loop.canQuickPoll(slot->m_niceness);
// for timing callbacks and handlers
int64_t start = 0;
int64_t now ;
@ -2041,8 +1957,6 @@ bool UdpServer::readTimeoutPoll ( int64_t now ) {
for ( UdpSlot *slot = m_activeListHead ; slot ; slot = slot->m_activeListNext ) {
// clear g_errno
g_errno = 0;
// only deal with niceness 0 slots when in a quickpoll
if ( g_loop.m_inQuickPoll && slot->getNiceness() != 0 ) continue;
// debug msg
if ( g_conf.m_logDebugUdp ) {
log(LOG_DEBUG,