Merge branch 'testing'

This commit is contained in:
mwells
2014-08-27 20:00:49 -06:00
18 changed files with 692 additions and 316 deletions

@ -6,7 +6,7 @@
#include "DiskPageCache.h"
#include "RdbMap.h" // GB_PAGE_SIZE
#include "Indexdb.h"
#include "Profiler.h"
// types.h uses key_t type that shmget uses
#undef key_t
@ -512,6 +512,8 @@ void DiskPageCache::addPages ( long vfd,
if ( m_switch && ! *m_switch ) return;
// sometimes the file got unlinked on us
if ( ! m_memOff[vfd] ) return;
// for some reason profiler cores all the time in here
if ( g_profiler.m_realTimeProfilerRunning ) return;
// what is the page range?
long long sp = offset / m_pageSize ;
// point to it
@ -539,6 +541,11 @@ void DiskPageCache::addPages ( long vfd,
char *DiskPageCache::getMemPtrFromOff ( long off ) {
if ( off < 0 ) return NULL; // NULL means not in DiskPageCache
// for some reason profiler cores all the time in here
// and m_numPageSets is 0 like we got reset
if ( g_profiler.m_realTimeProfilerRunning ) return NULL;
// get set number
long sn = off / m_maxPageSetSize ;
// get offset from within the chunk of memory (within the set)

677
Loop.cpp

@ -122,6 +122,9 @@ static void sighupHandler ( int x , siginfo_t *info , void *y ) ;
static void sigioHandler ( int x , siginfo_t *info , void *y ) ;
static void sigalrmHandler( int x , siginfo_t *info , void *y ) ;
long g_fdWriteBits[MAX_NUM_FDS/32];
long g_fdReadBits [MAX_NUM_FDS/32];
void Loop::unregisterReadCallback ( int fd, void *state ,
void (* callback)(int fd,void *state),
bool silent ){
@ -344,7 +347,7 @@ bool Loop::setNonBlocking ( int fd , long niceness ) {
#ifdef _POLLONLY_
return true;
#endif
// truncate nicess cuz we only get GB_SIGRTMIN+1 to GB_SIGRTMIN+2 signals
// trunc nicess cuz we only get GB_SIGRTMIN+1 to GB_SIGRTMIN+2 signals
if ( niceness < -1 ) niceness = -1;
if ( niceness > MAX_NICENESS ) niceness = MAX_NICENESS;
// debug msg
@ -547,6 +550,101 @@ void Loop::returnSlot ( Slot *s ) {
m_head = s;
}
// . mark "fd" as needing attention in a g_fdReadBits/g_fdWriteBits style array
// . each array word holds the bits for 32 consecutive fds, so we must OR the
//   bit in. a plain assignment would wipe out the other fds that share the
//   word and their queued events would be lost.
// . shift an unsigned value so setting bit 31 does not overflow a signed int
static inline void queueFdBit ( long *bits , int fd ) {
	bits[fd/32] |= 1UL << (fd%32);
}

// . come here when we get a GB_SIGRTMIN+X signal etc.
// . do not call anything from here because the purpose of this is to just
//   queue the signals up and DO DEDUPING which linux does not do causing
//   the sigqueue to overflow.
// . we should break out of the sleep loop after the signal is handled
//   so we can handle/process the queued signals properly. 'man sleep'
//   states "sleep() makes the calling process sleep until seconds
//   seconds have elapsed or a signal arrives which is not ignored."
// . "x" and "v" are the unused signal number and context arguments of a
//   SA_SIGINFO style handler; everything we need is read from "info"
// . the queued bits are stored in g_fdReadBits[] and g_fdWriteBits[]
void sigHandlerQueue_r ( int x , siginfo_t *info , void *v ) {

	// if we just needed to cleanup a thread
	if ( info->si_signo == SIGCHLD ) {
		// this has no fd really, Threads.cpp just sends it when
		// the thread is done
		g_threads.m_needsCleanup = true;
		return;
	}

	// a sigqueue() call carries no fd either
	if ( info->si_code == SI_QUEUE ) {
		//log("admin: got sigqueue");
		// the thread is done
		g_threads.m_needsCleanup = true;
		return;
	}

	// extract the file descriptor that needs attention
	int fd = info->si_fd;

	// too big to fit in our bit arrays
	if ( fd >= MAX_NUM_FDS ) {
		log("loop: CRITICAL ERROR. fd=%i > %i",fd,(int)MAX_NUM_FDS);
		return;
	}

	// . a negative fd would index before the start of the bit arrays
	// . NOTE(review): si_fd is only documented as filled in for
	//   SIGIO/SIGPOLL style signals, so it may be garbage for SIGPIPE
	if ( fd < 0 ) return;

	// set the right callback

	// info->si_band values:
	//#define POLLIN      0x0001    /* There is data to read */
	//#define POLLPRI     0x0002    /* There is urgent data to read */
	//#define POLLOUT     0x0004    /* Writing now will not block */
	//#define POLLERR     0x0008    /* Error condition */
	//#define POLLHUP     0x0010    /* Hung up */
	//#define POLLNVAL    0x0020    /* Invalid request: fd not open */
	int band = info->si_band;

	// translate SIGPIPE's to band of POLLHUP
	if ( info->si_signo == SIGPIPE ) {
		band = POLLHUP;
		//log("loop: Received SIGPIPE signal. Broken pipe.");
	}

	// . queue the fd for the appropriate handler(s)
	// . TODO: bitch if no callback to handle the read!!!!!!!
	// . NOTE: when it's connected it sets both POLLIN and POLLOUT
	// . NOTE: or when a socket is trying to connect to it if it's listener
	// . only the first matching condition below is acted on

	// normal or urgent data to read
	if ( band & (POLLIN | POLLPRI) ) {
		// keep stats on this now since some linuxes dont work right
		g_stats.m_readSignals++;
		//log("Loop: read %lli fd=%i",gettimeofdayInMilliseconds(),fd);
		queueFdBit ( g_fdReadBits , fd );
	}
	else if ( band & POLLOUT ) {
		// keep stats on this now since some linuxes dont work right
		g_stats.m_writeSignals++;
		//log("Loop: write %lli fd=%i",gettimeofdayInMilliseconds(),fd)
		queueFdBit ( g_fdWriteBits , fd );
	}
	// fix qainject1() test with this
	else if ( band & POLLERR ) {
		// deliberately queue nothing for an error condition
		//log(LOG_INFO,"loop: got POLLERR on fd=%i.",fd);
	}
	// this happens if the socket closes abruptly
	// or out of band data, etc... see "man 2 poll" for more info
	else if ( band & POLLHUP ) {
		// i see these all the time for fd == 0, so don't print it
		//if ( fd != 0 )
		//	log(LOG_INFO,"loop: Received hangup on fd=%i.",fd);
		// it is ready for writing i guess
		queueFdBit ( g_fdWriteBits , fd );
	}
}
bool Loop::init ( ) {
// redhat 9's NPTL doesn't like our async signals
if ( ! g_conf.m_allowAsyncSignals ) g_isHot = false;
@ -581,7 +679,7 @@ bool Loop::init ( ) {
// . when using threads GB_SIGRTMIN becomes 35, not 32 anymore
// since threads use these signals to reactivate suspended threads
// . debug msg
//log("GB_SIGRTMIN=%li", GB_SIGRTMIN );
//log("admin: GB_SIGRTMIN=%li", (long)GB_SIGRTMIN );
// . block the GB_SIGRTMIN signal
// . anytime this is raised it goes onto the signal queue
// . we use sigtimedwait() to get signals off the queue
@ -611,10 +709,43 @@ bool Loop::init ( ) {
// . block on any signals in this set (in addition to current sigs)
// . use SIG_UNBLOCK to remove signals from block list
// . this returns -1 and sets g_errno on error
if ( sigprocmask ( SIG_BLOCK, &sigs, 0 ) < 0 ) {
g_errno = errno;
return log("loop: sigprocmask: %s.",strerror(g_errno));
}
// . we block a signal so it does not interrupt us, then we can
// take it off using our call to sigtimedwait()
// . allow it to interrupt us now and we will queue it ourselves
// to prevent the linux queue from overflowing
// . see 'cat /proc/<pid>/status | grep SigQ' output to see if
// overflow occurs. linux does not dedup the signals so when a
// host cpu usage hits 100% it seems to miss a ton of signals.
// i suspect the culprit is pthread_create() so we need to get
// thread pools out soon.
// . now we are handling the signals and queueing them ourselves
// so comment out this sigprocmask() call
// if ( sigprocmask ( SIG_BLOCK, &sigs, 0 ) < 0 ) {
// g_errno = errno;
// return log("loop: sigprocmask: %s.",strerror(g_errno));
// }
struct sigaction sa2;
// . sa_mask is the set of signals that should be blocked when
// we're handling the GB_SIGRTMIN, make this empty
// . GB_SIGRTMIN signals will be automatically blocked while we're
// handling a SIGIO signal, so don't worry about that
// . what sigs should be blocked when in our handler? the same
// sigs we are handling i guess
memcpy ( &sa2.sa_mask , &sigs , sizeof(sigs) );
sa2.sa_flags = SA_SIGINFO ; //| SA_ONESHOT;
// call this function
sa2.sa_sigaction = sigHandlerQueue_r;
g_errno = 0;
if ( sigaction ( SIGPIPE, &sa2, 0 ) < 0 ) g_errno = errno;
if ( sigaction ( GB_SIGRTMIN , &sa2, 0 ) < 0 ) g_errno = errno;
if ( sigaction ( GB_SIGRTMIN + 1, &sa2, 0 ) < 0 ) g_errno = errno;
if ( sigaction ( GB_SIGRTMIN + 2, &sa2, 0 ) < 0 ) g_errno = errno;
if ( sigaction ( GB_SIGRTMIN + 3, &sa2, 0 ) < 0 ) g_errno = errno;
if ( sigaction ( SIGCHLD, &sa2, 0 ) < 0 ) g_errno = errno;
if ( sigaction ( SIGIO, &sa2, 0 ) < 0 ) g_errno = errno;
if ( g_errno ) log("loop: sigaction(): %s.", mstrerror(g_errno) );
// . we turn this signal on/off to turn interrupts off/on
// . clear all signals from the set
//sigemptyset ( &m_sigrtmin );
@ -644,7 +775,7 @@ bool Loop::init ( ) {
// now when we got an unblocked GB_SIGRTMIN signal go here right away
#ifndef _VALGRIND_
if ( sigaction ( GB_SIGRTMIN, &sa, 0 ) < 0 ) g_errno = errno;
if ( g_errno ) log("loop: sigaction GB_SIGRTMIN: %s.", mstrerror(errno));
if ( g_errno)log("loop: sigaction GB_SIGRTMIN: %s.", mstrerror(errno));
#endif
// set it this way for SIGIO's
@ -711,6 +842,7 @@ bool Loop::init ( ) {
//setitimer(ITIMER_VIRTUAL, &m_quickInterrupt, NULL);
sa.sa_sigaction = sigalrmHandler;
// it's gotta be real time, not virtual cpu time now
//if ( sigaction ( SIGALRM, &sa, 0 ) < 0 ) g_errno = errno;
if ( sigaction ( SIGVTALRM, &sa, 0 ) < 0 ) g_errno = errno;
if ( g_errno ) log("loop: sigaction SIGBUS: %s.", mstrerror(errno));
@ -884,6 +1016,38 @@ void sigalrmHandler ( int x , siginfo_t *info , void *y ) {
//logf(LOG_DEBUG, "xxx now: %lli! approx: %lli", g_now, g_nowApprox);
}
// . the set of signals blocked by maskSignals() and released by
//   unmaskSignals()
// . only valid once initMaskedSignalSet() has run
static sigset_t s_rtmin;

// . fill in s_rtmin the first time either function below needs it
// . a sigset_t has to go through sigemptyset() before it is handed to
//   sigprocmask(), so both maskSignals() and unmaskSignals() call this.
//   before, unmaskSignals() would use a never-initialized set if it was
//   called ahead of maskSignals().
static void initMaskedSignalSet ( ) {
	static bool s_init = false;
	if ( s_init ) return;
	s_init = true;
	sigemptyset ( &s_rtmin );
	sigaddset   ( &s_rtmin, GB_SIGRTMIN     );
	sigaddset   ( &s_rtmin, GB_SIGRTMIN + 1 );
	sigaddset   ( &s_rtmin, GB_SIGRTMIN + 2 );
	sigaddset   ( &s_rtmin, GB_SIGRTMIN + 3 );
	sigaddset   ( &s_rtmin, SIGCHLD );
	sigaddset   ( &s_rtmin, SIGIO   );
	sigaddset   ( &s_rtmin, SIGPIPE );
}

// . block delivery of every signal in s_rtmin
// . logs and carries on if sigprocmask() fails
void maskSignals() {
	initMaskedSignalSet();
	// block it
	if ( sigprocmask ( SIG_BLOCK , &s_rtmin, 0 ) < 0 )
		log("loop: maskSignals: sigprocmask: %s.", strerror(errno));
}

// . undo maskSignals(), allowing the signals in s_rtmin to be delivered again
// . logs and carries on if sigprocmask() fails
void unmaskSignals() {
	initMaskedSignalSet();
	// unblock it
	if ( sigprocmask ( SIG_UNBLOCK , &s_rtmin, 0 ) < 0 )
		log("loop: unmaskSignals: sigprocmask: %s.", strerror(errno));
}
// shit, we can't make this realtime!! RdbClose() cannot be called by a
// real time sig handler
@ -928,8 +1092,9 @@ bool Loop::runLoop ( ) {
//struct timespec t = { 0 /*seconds*/, 10000000 /*nanoseconds*/};
// grab any high priority sig first
siginfo_t info ;
long sigNum ; //= sigwaitinfo ( &sigs1, &info );
//siginfo_t info ;
//long sigNum ; //= sigwaitinfo ( &sigs1, &info );
#endif
s_lastTime = 0;
@ -940,222 +1105,328 @@ bool Loop::runLoop ( ) {
enableTimer();
long long elapsed;
// . now loop forever waiting for signals
// . but every second check for timer-based events
do {
g_now = gettimeofdayInMilliseconds();
BIGLOOP:
g_now = gettimeofdayInMilliseconds();
//set the time back to its exact value and reset
//the timer.
g_nowApprox = g_now;
// MDW: won't this hog cpu? just don't disable it in
// Process::save2() any more and it should be ok
//enableTimer();
m_lastPollTime = g_now;
m_needsToQuickPoll = false;
//set the time back to its exact value and reset
//the timer.
g_nowApprox = g_now;
// MDW: won't this hog cpu? just don't disable it in
// Process::save2() any more and it should be ok
//enableTimer();
m_lastPollTime = g_now;
m_needsToQuickPoll = false;
/*
// test the heartbeat core...
if ( g_numAlarms > 100 ) {
/*
// test the heartbeat core...
if ( g_numAlarms > 100 ) {
goo:
long j;
for ( long k = 0 ; k < 2000000000 ; k++ ) {
j=k *5;
long j;
for ( long k = 0 ; k < 2000000000 ; k++ ) {
j=k *5;
}
goto goo;
}
*/
g_errno = 0;
if ( m_shutdown ) {
// a msg
if (m_shutdown==1)
log(LOG_INIT,"loop: got SIGHUP or SIGTERM.");
else if (m_shutdown==2)
log(LOG_INIT,"loop: got SIGBAD in thread.");
else
log(LOG_INIT,"loop: got SIGPWR.");
// . turn off interrupts here because it doesn't help to do
// it in the thread
// . TODO: turn off signals for sigbadhandler()
interruptsOff();
// if thread got the signal, just wait for him to save all
// Rdbs and then dump core
if ( m_shutdown == 2 ) {
//log(0,"Thread is saving & shutting down urgently.");
//while ( 1 == 1 ) sleep (50000);
log("loop: Resuming despite thread crash.");
m_shutdown = 0;
goto BIGLOOP;
}
goto goo;
}
*/
// otherwise, thread did not save, so we must do it
log ( LOG_INIT ,"loop: Saving and shutting down urgently.");
// . this will save all Rdb's and dump core
// . since "urgent" is true it won't broadcast its shutdown
// to all hosts
//#ifndef NO_MAIN
//mainShutdown( true ); // urgent?
//#endif
g_process.shutdown ( true );
}
g_errno = 0;
//g_udpServer2.sendPoll_ass(true,g_now);
//g_udpServer2.process_ass ( g_now );
// MDW: see if this works without this junk, if not then
// put it back in
// seems like never getting signals on redhat 16-core box.
// we always process dgrams through this... wtf? try taking
// it out and seeing what's happening
//g_udpServer.sendPoll_ass (true,g_now);
//g_udpServer.process_ass ( g_now );
// and dns now too
//g_dns.m_udpServer.sendPoll_ass(true,g_now);
//g_dns.m_udpServer.process_ass ( g_now );
if ( m_shutdown ) {
// a msg
if (m_shutdown==1)
log(LOG_INIT,"loop: got SIGHUP or SIGTERM.");
else if (m_shutdown==2)
log(LOG_INIT,"loop: got SIGBAD in thread.");
else
log(LOG_INIT,"loop: got SIGPWR.");
// . turn off interrupts here because it doesn't help to do
// it in the thread
// . TODO: turn off signals for sigbadhandler()
interruptsOff();
// if thread got the signal, just wait for him to save all
// Rdbs and then dump core
if ( m_shutdown == 2 ) {
//log (0,"Thread is saving and shutting down urgently.");
//while ( 1 == 1 ) sleep (50000);
log("loop: Resuming despite thread crash.");
m_shutdown = 0;
continue;
}
// otherwise, thread did not save, so we must do it
log ( LOG_INIT ,"loop: Saving and shutting down urgently.");
// . this will save all Rdb's and dump core
// . since "urgent" is true it won't broadcast its shutdown
// to all hosts
//#ifndef NO_MAIN
//mainShutdown( true ); // urgent?
//#endif
g_process.shutdown ( true );
}
// if there was a high niceness http request within a
// quickpoll, we stored it and now we'll call it here.
//g_httpServer.callQueuedPages();
//g_udpServer2.sendPoll_ass(true,g_now);
//g_udpServer2.process_ass ( g_now );
// MDW: see if this works without this junk, if not then
// put it back in
g_udpServer.sendPoll_ass (true,g_now);
g_udpServer.process_ass ( g_now );
// and dns now too
g_dns.m_udpServer.sendPoll_ass(true,g_now);
g_dns.m_udpServer.process_ass ( g_now );
//g_udpServer.printState ( );
// if there was a high niceness http request within a
// quickpoll, we stored it and now we'll call it here.
//g_httpServer.callQueuedPages();
// if ( g_someAreQueued ) {
// // assume none are queued now, we may get interrupted
// // and it may get set back to true
// g_someAreQueued = false;
// //g_udpServer2.makeCallbacks_ass ( 0 );
// //g_udpServer2.makeCallbacks_ass ( 1 );
// }
//g_udpServer.printState ( );
if ( g_someAreQueued ) {
// assume none are queued now, we may get interrupted
// and it may get set back to true
g_someAreQueued = false;
//g_udpServer2.makeCallbacks_ass ( 0 );
//g_udpServer2.makeCallbacks_ass ( 1 );
}
// if ( g_threads.m_needsCleanup ) {
// // bitch about
// static bool s_bitched = false;
// if ( ! s_bitched ) {
// log(LOG_REMIND,"loop: Lost thread signal.");
// s_bitched = true;
// }
// if ( g_threads.m_needsCleanup ) {
// // bitch about
// static bool s_bitched = false;
// if ( ! s_bitched ) {
// log(LOG_REMIND,"loop: Lost thread signal.");
// s_bitched = true;
// }
// }
//cleanup and launch threads:
//g_threads.printState();
g_threads.timedCleanUp(4, MAX_NICENESS ) ; // 4 ms
// }
//cleanup and launch threads:
//g_threads.printState();
//g_threads.timedCleanUp(4, MAX_NICENESS ) ; // 4 ms
// do it anyway
doPoll();
// do it anyway
// take this out as well to see if signals are coming in
//doPoll();
while ( m_needToPoll ) doPoll();
long elapsed = g_now - s_lastTime;
// if someone changed the system clock on us, this could be negative
// so fix it! otherwise, times may NEVER get called in our lifetime
if ( elapsed < 0 ) elapsed = m_minTick;
// call this every (about) 1 second
if ( elapsed >= m_minTick ) {
// MAX_NUM_FDS is the fd for sleep callbacks
callCallbacks_ass ( true , MAX_NUM_FDS , g_now );
// note the last time we called them
//g_now = gettimeofdayInMilliseconds();
s_lastTime = g_now;
}
//while ( m_needToPoll ) doPoll();
#ifndef _POLLONLY_
// hack
//char buffer[100];
//if ( recv(27,buffer,99,MSG_PEEK|MSG_DONTWAIT) == 0 ) {
// logf(LOG_DEBUG,"CLOSED CLOSED!!");
//}
//g_errno = 0;
// hack
//char buffer[100];
//if ( recv(27,buffer,99,MSG_PEEK|MSG_DONTWAIT) == 0 ) {
// logf(LOG_DEBUG,"CLOSED CLOSED!!");
//}
//g_errno = 0;
//check for pending signals, return right away if none.
//then we'll do the low priority stuff while we were
//supposed to be sleeping.
g_inWaitState = true;
sigNum = sigtimedwait (&sigs0, &info, s_sigWaitTimePtr ) ;
//check for pending signals, return right away if none.
//then we'll do the low priority stuff while we were
//supposed to be sleeping.
g_inWaitState = true;
// if no signal, we just waited 20 ms and nothing happened
if ( sigNum == -1 )
sigalrmHandler( 0,&info,NULL);
//logf(LOG_DEBUG,"loop: sigNum=%li signo=%li alrm=%li",
// (long)sigNum,info.si_signo,(long)SIGVTALRM);
// no longer in a wait state...
g_inWaitState = false;
//sigNum = sigtimedwait (&sigs0, &info, s_sigWaitTimePtr ) ;
if ( sigNum < 0 ) {
if ( errno == EAGAIN || errno == EINTR ||
errno == EILSEQ || errno == 0 ) {
sigNum = 0;
errno = 0;
}
else if ( errno != ENOMEM ) {
log("loop: sigtimedwait(): %s.",strerror(errno));
continue;
}
#undef usleep
// now we just usleep(). an arriving signal will call
// sigHandlerQueue_r() then break us out of this sleep.
// 10000 microseconds is 10 milliseconds. it should break out
// when a signal comes in just like the sleep() function.
usleep(1000 * 10);
// reinstate the thing that prevents us from non-chalantly adding
// usleeps() which could degrade performance
#define usleep(a) { char *xx=NULL;*xx=0; }
// if no signal, we just waited 20 ms and nothing happened
// why do we need this now? MDW
//if ( sigNum == -1 )
// sigalrmHandler( 0,&info,NULL);
//logf(LOG_DEBUG,"loop: sigNum=%li signo=%li alrm=%li",
// (long)sigNum,info.si_signo,(long)SIGVTALRM);
// no longer in a wait state...
g_inWaitState = false;
long n = MAX_NUM_FDS / 32;
// process file descriptor callbacks for file descriptors
// we queued in sigHandlerQueue_r() function above.
// we use an array of 1024 bits like the poll function i guess.
for ( long i = 0 ; i < n ; i++ ) {
// this is a 32-bit number
if ( ! g_fdReadBits[i] ) continue;
// scan the individual bits now
for ( long j = 0 ; j < 32 ; j++ ) {
// mask mask
unsigned long mask = 1 << j;
// skip jth bit if not on
if ( ! g_fdReadBits[i] & mask ) continue;
// block signals for just a sec so we can
// clear it now that we've handled it
//maskSignals();
// clear it
g_fdReadBits[i] &= ~mask;
// reinstate signals
//unmaskSignals();
// construct the file descriptor
long fd = i*32 + j;
// . call all callbacks registered on this fd
// . forReading = true
callCallbacks_ass ( true , fd , g_now );
}
if ( sigNum == 0 ) {
//no signals pending, try to take care of anything left undone:
}
long long startTime = gettimeofdayInMillisecondsLocal();
if(g_now & 1) {
if(g_udpServer.needBottom())
g_udpServer.makeCallbacks_ass ( 2 );
//if(g_udpServer2.needBottom())
// g_udpServer2.makeCallbacks_ass ( 2 );
// do the same thing but for writing now
for ( long i = 0 ; i < n ; i++ ) {
// this is a 32-bit number
if ( ! g_fdWriteBits[i] ) continue;
// scan the individual bits now
for ( long j = 0 ; j < 32 ; j++ ) {
// mask mask
unsigned long mask = 1 << j;
// skip jth bit if not on
if ( ! g_fdWriteBits[i] & mask ) continue;
// block signals for just a sec so we can
// clear it now that we've handled it
//maskSignals();
// clear it
g_fdWriteBits[i] &= ~mask;
// reinstate signals
//unmaskSignals();
// construct the file descriptor
long fd = i*32 + j;
// . call all callbacks registered on this fd.
// . forReading = false.
callCallbacks_ass ( false , fd , g_now );
}
}
if(gettimeofdayInMillisecondsLocal() -
startTime > 10)
goto notime;
long long elapsed = g_now - s_lastTime;
// if someone changed the system clock on us, this could be negative
// so fix it! otherwise, times may NEVER get called in our lifetime
if ( elapsed < 0 ) elapsed = m_minTick;
// call this every (about) m_minTicks milliseconds
if ( elapsed >= m_minTick ) {
// MAX_NUM_FDS is the fd for sleep callbacks
callCallbacks_ass ( true , MAX_NUM_FDS , g_now );
// note the last time we called them
//g_now = gettimeofdayInMilliseconds();
s_lastTime = g_now;
}
// call remaining callbacks for udp msgs
if ( g_udpServer.needBottom() )
g_udpServer.makeCallbacks_ass ( 2 );
//if(g_udpServer2.needBottom())
// g_udpServer2.makeCallbacks_ass ( 2 );
// if(gettimeofdayInMillisecondsLocal() -
// startTime > 10)
// goto notime;
if(g_conf.m_sequentialProfiling)
g_threads.printState();
if(g_threads.m_needsCleanup)
g_threads.timedCleanUp(4 , // ms
MAX_NICENESS);
}
else {
if(g_conf.m_sequentialProfiling)
g_threads.printState();
if(g_threads.m_needsCleanup)
g_threads.timedCleanUp(4 , // ms
MAX_NICENESS);
if ( g_conf.m_sequentialProfiling )
g_threads.printState();
if(gettimeofdayInMillisecondsLocal() -
startTime > 10)
goto notime;
if ( g_threads.m_needsCleanup )
// limit to 4ms. cleanup any niceness thread.
g_threads.timedCleanUp(4 ,MAX_NICENESS);
if(g_udpServer.needBottom())
g_udpServer.makeCallbacks_ass ( 2 );
//if(g_udpServer2.needBottom())
// g_udpServer2.makeCallbacks_ass ( 2 );
}
notime:
//if we still didn't get all of them cleaned up set
//sleep time to none.
if(g_udpServer.needBottom() ) {
//g_udpServer2.needBottom()) {
s_sigWaitTimePtr = &s_sigWaitTime2;
}
else {
//otherwise set it to minTick
s_sigWaitTimePtr = &s_sigWaitTime;
}
}
else {
if ( info.si_code == SIGIO ) {
log("loop: got sigio");
m_needToPoll = true;
}
// handle the signal
else sigHandler_r ( 0 , &info , NULL );
}
#endif
} while (1);
goto BIGLOOP;
// make compiler happy
return 0;
/*
if ( sigNum < 0 ) {
if ( errno == EAGAIN || errno == EINTR ||
errno == EILSEQ || errno == 0 ) {
sigNum = 0;
errno = 0;
}
else if ( errno != ENOMEM ) {
log("loop: sigtimedwait(): %s.",
strerror(errno));
continue;
}
}
if ( sigNum == 0 ) {
//no signals pending, try to take care of anything
// left undone:
long long startTime =gettimeofdayInMillisecondsLocal();
if(g_now & 1) {
if(g_udpServer.needBottom())
g_udpServer.makeCallbacks_ass ( 2 );
//if(g_udpServer2.needBottom())
// g_udpServer2.makeCallbacks_ass ( 2 );
if(gettimeofdayInMillisecondsLocal() -
startTime > 10)
goto notime;
if(g_conf.m_sequentialProfiling)
g_threads.printState();
if(g_threads.m_needsCleanup)
g_threads.timedCleanUp(4 , // ms
MAX_NICENESS);
}
else {
if(g_conf.m_sequentialProfiling)
g_threads.printState();
if(g_threads.m_needsCleanup)
g_threads.timedCleanUp(4 , // ms
MAX_NICENESS);
if(gettimeofdayInMillisecondsLocal() -
startTime > 10)
goto notime;
if(g_udpServer.needBottom())
g_udpServer.makeCallbacks_ass ( 2 );
//if(g_udpServer2.needBottom())
// g_udpServer2.makeCallbacks_ass ( 2 );
}
notime:
//if we still didn't get all of them cleaned up set
//sleep time to none.
if(g_udpServer.needBottom() ) {
//g_udpServer2.needBottom()) {
s_sigWaitTimePtr = &s_sigWaitTime2;
}
else {
//otherwise set it to minTick
s_sigWaitTimePtr = &s_sigWaitTime;
}
}
//
// we got a signal, process it
//
else {
if ( info.si_code == SIGIO ) {
log("loop: got sigio");
m_needToPoll = true;
}
// handle the signal
else sigHandler_r ( 0 , &info , NULL );
}
*/
// } while(1)
/*
loop:
g_now = gettimeofdayInMilliseconds();
g_errno = 0;
@ -1252,10 +1523,10 @@ bool Loop::runLoop ( ) {
s_bitched = true;
}
// assume not any more
g_threads.m_needsCleanup = false;
//g_threads.m_needsCleanup = false;
// check thread queue for any threads that completed
// so we can call their callbacks and remove them
g_threads.cleanUp ( 0 , 1000/*max niceness*/);
g_threads.cleanUp ( 0 , 1000) ; // max niceness
// launch any threads in waiting since this sig was
// from a terminating one
g_threads.launchThreads();
@ -1337,8 +1608,8 @@ bool Loop::runLoop ( ) {
//goto subloop;
// we need to make g_now as accurate as possible for hot UdpServer...
goto loop;
// make compiler happy
return 0;
*/
}
// . the kernel sends a SIGIO signal when the sig queue overflows
@ -1706,7 +1977,7 @@ void sigHandler_r ( int x , siginfo_t *info , void *v ) {
// g_threads.launchThreads();
}
// if we just needed a cleanup
// if we just needed to cleanup a thread
if ( info->si_signo == SIGCHLD ) return;
// if we don't got a signal for an fd, just a sigqueue() call, bail now
@ -1719,15 +1990,21 @@ void sigHandler_r ( int x , siginfo_t *info , void *v ) {
//if ( band & (POLLIN | POLLOUT) == (POLLIN | POLLOUT) )
// g_loop.callCallbacks_ass ( true/*forReading?*/ , fd );
if ( band & POLLIN ) {
//log("Loop: read %lli",gettimeofdayInMilliseconds());
// keep stats on this now since some linuxes dont work right
g_stats.m_readSignals++;
//log("Loop: read %lli fd=%i",gettimeofdayInMilliseconds(),fd);
g_loop.callCallbacks_ass ( true , fd );
}
else if ( band & POLLPRI ) {
//log("Loop: read %lli",gettimeofdayInMilliseconds());
// keep stats on this now since some linuxes dont work right
g_stats.m_readSignals++;
//log("Loop: read %lli fd=%i",gettimeofdayInMilliseconds(),fd);
g_loop.callCallbacks_ass ( true , fd ) ;
}
else if ( band & POLLOUT ) {
//log("Loop: read %lli",gettimeofdayInMilliseconds());
// keep stats on this now since some linuxes dont work right
g_stats.m_writeSignals++;
//log("Loop: write %lli fd=%i",gettimeofdayInMilliseconds(),fd)
g_loop.callCallbacks_ass ( false , fd );
}
// fix qainject1() test with this

@ -2850,6 +2850,13 @@ void scanHammerQueue ( int fd , void *state ) {
long long waited = -1LL;
Msg13Request *nextLink = NULL;
//bool useProxies = true;
// user can turn off proxy use with this switch
//if ( ! g_conf.m_useProxyIps ) useProxies = false;
// we gotta have some proxy ips that we can use
//if ( ! g_conf.m_proxyIps.hasDigits() ) useProxies = false;
// scan down the linked list of queued of msg13 requests
for ( ; r ; prev = r , r = nextLink ) {
@ -2868,7 +2875,9 @@ void scanHammerQueue ( int fd , void *state ) {
// . try to be more sensitive for more sensitive website policy
// . we don't know why this proxy was banned, or if we were
// responsible, or who banned it, but be more sensitive
if ( r->m_hammerCallback == downloadTheDocForReals3b )
if ( //useProxies &&
r->m_numBannedProxies &&
r->m_hammerCallback == downloadTheDocForReals3b )
crawlDelayMS = r->m_numBannedProxies * 2000;
// download finished?
@ -2891,6 +2900,15 @@ void scanHammerQueue ( int fd , void *state ) {
// callback can now be either downloadTheDocForReals(r)
// or downloadTheDocForReals3b(r) if it is waiting after
// getting a ProxyReply that had a m_proxyBackoff set
if ( g_conf.m_logDebugSpider )
log(LOG_DEBUG,"spider: calling hammer callback for "
"%s (timestamp=%lli,waited=%lli,crawlDelayMS=%li)",
r->ptr_url,
last,
waited,
crawlDelayMS);
//
// it should also add the current time to the hammer cache
// for r->m_firstIp

@ -149,6 +149,11 @@ bool Msg5::getList ( char rdbId ,
log("disk: Trying to reset a class waiting for a reply.");
char *xx = NULL; *xx = 0;
}
if ( collnum < 0 ) {
log("msg5: called with bad collnum=%li",(long)collnum);
g_errno = ENOCOLLREC;
return true;
}
// sanity check. we no longer have record caches!
// now we do again for posdb gbdocid:xxx| restricted queries
//if ( addToCache || maxCacheAge ) {char *xx=NULL;*xx=0; }

@ -136,13 +136,16 @@ skipReplaceHost:
//char *shotcol = "";
char shotcol[1024];
shotcol[0] = '\0';
char *cs = coll;
if ( ! cs ) cs = "";
if ( g_conf.m_useShotgun && format == FORMAT_HTML ) {
colspan = "31";
//shotcol = "<td><b>ip2</b></td>";
sprintf ( shotcol, "<td><a href=\"/admin/hosts?c=%s"
"&sort=2\">"
"<b>ping2</b></td></a>",
coll);
cs);
}
// print host table
@ -253,24 +256,24 @@ skipReplaceHost:
TABLE_STYLE ,
colspan ,
coll, sort,
cs, sort,
DARK_BLUE ,
coll,
coll,
coll,
coll,
coll,
coll,
coll,
coll,
coll,
coll,
coll,
coll,
coll,
coll,
coll,
cs,
cs,
cs,
cs,
cs,
cs,
cs,
cs,
cs,
cs,
cs,
cs,
cs,
cs,
cs,
shotcol );
// loop through each host we know and print it's stats
@ -750,7 +753,7 @@ skipReplaceHost:
"</tr>" ,
bg,//LIGHT_BLUE ,
ipbuf3, h->m_httpPort,
coll, sort,
cs, sort,
i ,
h->m_hostname,
h->m_shardNum,//group,
@ -986,7 +989,7 @@ skipReplaceHost:
LIGHT_BLUE,
ipbuf3,
h->m_httpPort,
coll,
cs,
i ,
type,

@ -1523,6 +1523,7 @@ bool printLeftNavColumn ( SafeBuf &sb, State0 *st ) {
SearchInput *si = &st->m_si;
Msg40 *msg40 = &st->m_msg40;
CollectionRec *cr = si->m_cr;
char format = si->m_format;
@ -1599,7 +1600,7 @@ bool printLeftNavColumn ( SafeBuf &sb, State0 *st ) {
"<br>"
"<center>"
"<a href=/>"
"<a href=/?c=%s>"
"<div style=\""
"background-color:white;"
"padding:10px;"
@ -1619,6 +1620,7 @@ bool printLeftNavColumn ( SafeBuf &sb, State0 *st ) {
"<br>"
"<br>"
,cr->m_coll
);
}

@ -803,8 +803,11 @@ bool sendPageStats ( TcpSocket *s , HttpRequest *r ) {
"<td colspan=2>"
"<center><b>General Info</b></td></tr>\n"
"<tr class=poo><td><b>Uptime</b></td><td>%s</td></tr>\n"
"<tr class=poo><td><b>Process ID</b></td><td>%lu</td></tr>\n"
"<tr class=poo><td><b>Corrupted Disk Reads</b></td><td>%li</td></tr>\n"
"<tr class=poo><td><b>SIGVTALRMS</b></td><td>%li</td></tr>\n"
"<tr class=poo><td><b>read signals</b></td><td>%lli</td></tr>\n"
"<tr class=poo><td><b>write signals</b></td><td>%lli</td></tr>\n"
"<tr class=poo><td><b>quickpolls</b></td><td>%li</td></tr>\n"
"<tr class=poo><td><b>Kernel Version</b></td><td>%s</td></tr>\n"
//"<tr class=poo><td><b>Gigablast Version</b></td><td>%s %s</td></tr>\n"
@ -815,11 +818,15 @@ bool sendPageStats ( TcpSocket *s , HttpRequest *r ) {
//"<tr class=poo><td><b>Tfndb Extension Bits</b></td><td>%li</td>\n"
"</tr>\n"
"<tr class=poo><td><b>Spider Locks</b></td><td>%li</td></tr>\n"
"<tr class=poo><td><b>Local Time</b></td><td>%s (%li)</td></tr>\n",
"<tr class=poo><td><b>Local Time</b></td><td>%s (%li)</td></tr>\n"
,
TABLE_STYLE ,
ubuf.getBufStart(),
(unsigned long)getpid(),
g_numCorrupt,
g_numAlarms,
g_stats.m_readSignals,
g_stats.m_writeSignals,
g_numQuickPolls,
kv ,
//GBPROJECTNAME,

@ -1196,7 +1196,7 @@ bool Pages::printAdminTop (SafeBuf *sb ,
"<br>"
"<center>"
"<a href=/>"
"<a href=/?c=%s>"
"<div style=\""
"background-color:white;"
"padding:10px;"
@ -1215,6 +1215,7 @@ bool Pages::printAdminTop (SafeBuf *sb ,
"<br>"
"<br>"
,coll
);

@ -11837,14 +11837,14 @@ void Parms::init ( ) {
m->m_desc = "Maximum number of threads to use per Gigablast process "
"for accessing the disk "
"for index-building purposes. Keep low to reduce impact "
"on query response time. Increase for RAID systems or when "
"initially building an index.";
"on query response time. Increase for fast disks or when "
"preferring build speed over lower query latencies";
m->m_cgi = "smdt";
m->m_off = (char *)&g_conf.m_spiderMaxDiskThreads - g;
m->m_type = TYPE_LONG;
m->m_def = "7";
m->m_def = "30";
m->m_units = "threads";
m->m_flags = PF_HIDDEN | PF_NOSAVE;
m->m_flags = 0;//PF_HIDDEN | PF_NOSAVE;
m->m_page = PAGE_MASTER;
m->m_obj = OBJ_CONF;
m++;
@ -11854,10 +11854,10 @@ void Parms::init ( ) {
m->m_cgi = "smbdt";
m->m_off = (char *)&g_conf.m_spiderMaxBigDiskThreads - g;
m->m_type = TYPE_LONG;
m->m_def = "3"; // 1
m->m_def = "8"; // 1
m->m_units = "threads";
m->m_group = 0;
m->m_flags = PF_HIDDEN | PF_NOSAVE;
m->m_flags = 0;//PF_HIDDEN | PF_NOSAVE;
m->m_page = PAGE_MASTER;
m->m_obj = OBJ_CONF;
m++;
@ -11867,10 +11867,10 @@ void Parms::init ( ) {
m->m_cgi = "smmdt";
m->m_off = (char *)&g_conf.m_spiderMaxMedDiskThreads - g;
m->m_type = TYPE_LONG;
m->m_def = "4"; // 3
m->m_def = "19"; // 3
m->m_units = "threads";
m->m_group = 0;
m->m_flags = PF_HIDDEN | PF_NOSAVE;
m->m_flags = 0;//PF_HIDDEN | PF_NOSAVE;
m->m_page = PAGE_MASTER;
m->m_obj = OBJ_CONF;
m++;
@ -11880,10 +11880,10 @@ void Parms::init ( ) {
m->m_cgi = "smsdt";
m->m_off = (char *)&g_conf.m_spiderMaxSmaDiskThreads - g;
m->m_type = TYPE_LONG;
m->m_def = "5";
m->m_def = "20";
m->m_units = "threads";
m->m_group = 0;
m->m_flags = PF_HIDDEN | PF_NOSAVE;
m->m_flags = 0;//PF_HIDDEN | PF_NOSAVE;
m->m_page = PAGE_MASTER;
m->m_obj = OBJ_CONF;
m++;
@ -11891,15 +11891,16 @@ void Parms::init ( ) {
m->m_title = "max query read threads";
m->m_desc = "Maximum number of threads to use per Gigablast process "
"for accessing the disk "
"for querying purposes. IDE systems tend to be more "
"responsive when this is low. Increase for SCSI or RAID "
"systems.";
"for querying purposes.";
//IDE systems tend to be more "
// "responsive when this is low. Increase for SCSI or RAID "
// "systems.";
m->m_cgi = "qmdt";
m->m_off = (char *)&g_conf.m_queryMaxDiskThreads - g;
m->m_type = TYPE_LONG;
m->m_def = "20";
m->m_def = "50";
m->m_units = "threads";
m->m_flags = PF_HIDDEN | PF_NOSAVE;
m->m_flags = 0;//PF_HIDDEN | PF_NOSAVE;
m->m_page = PAGE_MASTER;
m->m_obj = OBJ_CONF;
m++;
@ -11909,10 +11910,10 @@ void Parms::init ( ) {
m->m_cgi = "qmbdt";
m->m_off = (char *)&g_conf.m_queryMaxBigDiskThreads - g;
m->m_type = TYPE_LONG;
m->m_def = "20"; // 1
m->m_def = "30"; // 1
m->m_units = "threads";
m->m_group = 0;
m->m_flags = PF_HIDDEN | PF_NOSAVE;
m->m_flags = 0;//PF_HIDDEN | PF_NOSAVE;
m->m_page = PAGE_MASTER;
m->m_obj = OBJ_CONF;
m++;
@ -11922,10 +11923,10 @@ void Parms::init ( ) {
m->m_cgi = "qmmdt";
m->m_off = (char *)&g_conf.m_queryMaxMedDiskThreads - g;
m->m_type = TYPE_LONG;
m->m_def = "20"; // 3
m->m_def = "30"; // 3
m->m_units = "threads";
m->m_group = 0;
m->m_flags = PF_HIDDEN | PF_NOSAVE;
m->m_flags = 0;//PF_HIDDEN | PF_NOSAVE;
m->m_page = PAGE_MASTER;
m->m_obj = OBJ_CONF;
m++;
@ -11935,10 +11936,10 @@ void Parms::init ( ) {
m->m_cgi = "qmsdt";
m->m_off = (char *)&g_conf.m_queryMaxSmaDiskThreads - g;
m->m_type = TYPE_LONG;
m->m_def = "20";
m->m_def = "40";
m->m_units = "threads";
m->m_group = 0;
m->m_flags = PF_HIDDEN | PF_NOSAVE;
m->m_flags = 0;//PF_HIDDEN | PF_NOSAVE;
m->m_page = PAGE_MASTER;
m->m_obj = OBJ_CONF;
m++;

@ -868,6 +868,10 @@ void hdtempWrapper ( int fd , void *state ) {
long now = getTime();
// or if haven't waited long enough
if ( now < s_nextTime ) return;
// ignore this for now, ssds don't have temp monitoring
return;
// set it
g_process.m_threadOut = true;
// . call thread to call popen
@ -1005,7 +1009,7 @@ void *hdtempStartWrapper_r ( void *state , ThreadEntry *t ) {
// ignore temps now. ssds don't have it
return NULL;
//return NULL;
static char *s_parm = "ata";
@ -1151,6 +1155,10 @@ void heartbeatWrapper ( int fd , void *state ) {
if ( elapsed > 200 )
// now we print the # of elapsed alarms. that way we will
// know if the alarms were going off or not...
// this happens if the rt sig queue is overflowed.
// check the "cat /proc/<pid>/status | grep SigQ" output
// to see if it has overflowed. hopefully i will fix this by
// queueing the signals myself in Loop.cpp.
log("db: missed heartbeat by %lli ms. Num elapsed alarms = "
"%li", elapsed-100,(long)(g_numAlarms - s_lastNumAlarms));
s_last = now;

@ -18,6 +18,8 @@ Stats::Stats ( ) {
//m_minWindowStartTime = 0;
memset ( m_pts , 0 , sizeof(StatPoint)*MAX_POINTS );
m_readSignals = 0;
m_writeSignals = 0;
m_slowDiskReads = 0;
m_queryTimes = 0;
m_numQueries = 0;
@ -500,7 +502,7 @@ void drawLine2 ( SafeBuf &sb ,
"left:%li;"
"top:%li;"
"background-color:#%06lx;"
"z-index:-5;"
"z-index:5;"
"min-height:%lipx;"
"min-width:%lipx;\"></div>\n"
, x1
@ -564,7 +566,7 @@ void Stats::printGraphInHtml ( SafeBuf &sb ) {
//"overflow-y:hidden;"
"overflow-x:hidden;"
"z-index:-10;"
//"z-index:-10;"
// the tick marks we print below are based on it
// being a window of the last 20 seconds... and using
// DX pixels

@ -110,6 +110,8 @@ class Stats {
long m_totalNumFails;
float m_avgQueryTime;
float m_successRate;
long long m_readSignals;
long long m_writeSignals;
// set in BigFile.cpp
long m_slowDiskReads;

@ -2065,6 +2065,7 @@ bool ThreadQueue::launchThread2 ( ThreadEntry *te ) {
sigset_t sigs;
sigemptyset ( &sigs );
sigaddset ( &sigs , SIGALRM );
//sigaddset ( &sigs , SIGVTALRM );
if ( sigprocmask ( SIG_BLOCK , &sigs , NULL ) < 0 )
log("threads: failed to block sig");

@ -454,8 +454,9 @@ bool UdpServer::init ( unsigned short port, UdpProtocol *proto, long niceness,
// log an innocent msg
//log ( 0, "udp: listening on port %hu with sd=%li and "
// "niceness=%li", m_port, m_sock, m_niceness );
log ( LOG_INIT, "udp: Listening on UDP port %hu with niceness=%li.",
m_port, niceness );
log ( LOG_INIT, "udp: Listening on UDP port %hu with niceness=%li "
"and fd=%i.",
m_port, niceness , m_sock );
// print dgram sizes
//log("udp: using max dgram size of %li bytes", DGRAM_SIZE );
return true;

@ -160,7 +160,12 @@ void UdpSlot::connect ( UdpProtocol *proto ,
// . make async signal safe
// . TODO: this is slow to clear all those m_*bits, we got 1.7k of slot
//long size = (unsigned long)m_tmpBuf - (unsigned long)this ;
long size = (unsigned long)&m_next - (unsigned long)this ;
//long size = (unsigned long)&m_next - (unsigned long)this ;
//memset_ass ( (char *)this , 0 , size );
// avoid that heavy memset_ass() call using this logic.
// we will clear on demand using m_numBitsInitialized logic
// in UdpSlot.h
long size = (unsigned long)&m_sentBits2 - (unsigned long)this ;
memset_ass ( (char *)this , 0 , size );
// store this info
m_proto = proto ;
@ -248,12 +253,13 @@ void UdpSlot::resetConnect ( ) {
m_sentAckBitsOn = 0;
m_nextToSend = 0;
m_firstUnlitSentAckBit = 0;
for ( long b = 0; b < m_dgramsToSend; b++ ) {
clrBit(b, m_sentBits);
clrBit(b, m_readBits);
clrBit(b, m_sentAckBits);
clrBit(b, m_readAckBits);
}
m_numBitsInitialized = 0;
// for ( long b = 0; b < m_dgramsToSend; b++ ) {
// clrBit(b, m_sentBits2);
// clrBit(b, m_readBits2);
// clrBit(b, m_sentAckBits2);
// clrBit(b, m_readAckBits2);
// }
// . set m_dgramsToSend
// . similar to UdpProtocol::getNumDgrams(char *dgram,long dgramSize)
long dataSpace = m_maxDgramSize ;
@ -373,7 +379,7 @@ void UdpSlot::prepareForResend ( long long now , bool resendAll ) {
// clear all if reset is true
if ( resendAll ) {
for ( long i = 0 ; i < m_dgramsToSend ; i++ )
clrBit ( i , m_readAckBits );
clrBit ( i , m_readAckBits2 );
m_readAckBitsOn = 0;
}
// how many sentBits we cleared
@ -381,11 +387,11 @@ void UdpSlot::prepareForResend ( long long now , bool resendAll ) {
// clear each sent bit if it hasn't gotten an ACK
for ( long i = 0 ; i < m_dgramsToSend ; i++ ) {
// continue if we already have an ack for this one
if ( isOn ( i , m_readAckBits ) ) continue;
if ( isOn ( i , m_readAckBits2 ) ) continue;
// continue if it's already cleared
if ( ! isOn ( i , m_sentBits ) ) continue;
if ( ! isOn ( i , m_sentBits2 ) ) continue;
// mark dgram #i as unsent since we don't have ACK for it yet
clrBit ( i , m_sentBits );
clrBit ( i , m_sentBits2 );
// reduce the lit bit count
m_sentBitsOn--;
// may have to adjust m_nextToSend
@ -617,7 +623,7 @@ long UdpSlot::sendDatagramOrAck ( int sock, bool allowResends, long long now ){
// we may have received an ack for an implied resend (from ack gap)
// so we clear some bits, but then got an ACK back later
while ( m_nextToSend < m_dgramsToSend &&
isOn ( m_nextToSend , m_sentBits ) )
isOn ( m_nextToSend , m_sentBits2 ) )
m_nextToSend++;
// if we've sent it all return -2
if ( m_sentBitsOn >= m_dgramsToSend ) return -2;
@ -847,13 +853,13 @@ long UdpSlot::sendDatagramOrAck ( int sock, bool allowResends, long long now ){
// . sendSetup() will set m_firstSendTime to -1
if (m_sentBitsOn == 0 && m_firstSendTime == -1) m_firstSendTime =now;
// mark this dgram as sent
setBit ( dgramNum , m_sentBits );
setBit ( dgramNum , m_sentBits2 );
// count the bit we lit
m_sentBitsOn++;
// update last send time stamp even if we're a resend
m_lastSendTime = now;
// update m_nextToSend
m_nextToSend = getNextUnlitBit ( dgramNum, m_sentBits,m_dgramsToSend);
m_nextToSend = getNextUnlitBit ( dgramNum, m_sentBits2,m_dgramsToSend);
// log network info
if ( g_conf.m_logDebugUdp ) {
//long shotgun = 0;
@ -923,8 +929,8 @@ long UdpSlot::sendDatagramOrAck ( int sock, bool allowResends, long long now ){
return 1;
}
// assume m_readBits, m_sendBits, m_sentAckBits and m_readAckBits are correct
// and update m_firstUnlitSentAckBit, m_sentAckBitsOn, m_readBitsOn,
// assume m_readBits2, m_sentBits2, m_sentAckBits2 and m_readAckBits2 are
// correct and update m_firstUnlitSentAckBit, m_sentAckBitsOn, m_readBitsOn,
// m_readAckBitsOn and m_sentBitsOn
void UdpSlot::fixSlot ( ) {
// log it
@ -948,19 +954,19 @@ void UdpSlot::fixSlot ( ) {
m_sentBitsOn = 0;
m_sentAckBitsOn = 0;
for ( long i = 0 ; i < m_dgramsToRead ; i++ ) {
if ( isOn ( i , m_readBits ) ) m_readBitsOn++;
if ( isOn ( i , m_readBits2 ) ) m_readBitsOn++;
// we send back an ack for every dgram read
if ( isOn ( i , m_sentAckBits ) ) m_sentAckBitsOn++;
if ( isOn ( i , m_sentAckBits2 ) ) m_sentAckBitsOn++;
}
for ( long i = 0 ; i < m_dgramsToSend ; i++ ) {
if ( isOn ( i , m_sentBits ) ) m_sentBitsOn++;
if ( isOn ( i , m_sentBits2 ) ) m_sentBitsOn++;
// we must read an ack for every dgram sent
if ( isOn ( i , m_readAckBits ) ) m_readAckBitsOn++;
if ( isOn ( i , m_readAckBits2 ) ) m_readAckBitsOn++;
}
// start at bit #0 so this doesn't loop forever
m_firstUnlitSentAckBit=getNextUnlitBit(-1,m_sentAckBits,m_dgramsToRead);
m_nextToSend =getNextUnlitBit(-1,m_sentBits ,m_dgramsToSend);
m_firstUnlitSentAckBit=getNextUnlitBit(-1,m_sentAckBits2,m_dgramsToRead);
m_nextToSend =getNextUnlitBit(-1,m_sentBits2 ,m_dgramsToSend);
log(LOG_LOGIC,
"udp: after fixSlot(): "
@ -1007,12 +1013,12 @@ long UdpSlot::sendAck ( int sock , long long now ,
char dgram[DGRAM_SIZE_CEILING];
// . if dgramNum is -1, send the next ack in line
// . it's the first bit in m_sentAckBits that is 0 while being
// . it's the first bit in m_sentAckBits2 that is 0 while being
// lit in m_readBits2
if ( dgramNum == -1 ) {
// m_firstUnlitSentAckBit is the first clr bit in m_sentAckBits2
dgramNum = m_firstUnlitSentAckBit;
// . now find the first bit in m_sentAckBits that is off
// . now find the first bit in m_sentAckBits2 that is off
// yet on in m_readBits2
// . the OLD statement below didn't check to see if dgramNum is
// then off in m_sentAckBits2!!!
@ -1020,10 +1026,10 @@ long UdpSlot::sendAck ( int sock , long long now ,
// . we know that m_sentAckBitsOn < m_readBitsOn so the
// we must find a bit with these properties
for ( ; dgramNum < m_dgramsToRead ; dgramNum++ ) {
// if bit off in m_readBits, it's not an ACK candidate
if(!isOn(dgramNum,m_readBits))continue;
// if bit is off in m_sentAckBits, that's the one!
if(!isOn(dgramNum,m_sentAckBits))break;
// if bit off in m_readBits2, it's not an ACK candidate
if(!isOn(dgramNum,m_readBits2))continue;
// if bit is off in m_sentAckBits2, that's the one!
if(!isOn(dgramNum,m_sentAckBits2))break;
}
// if we had no match, that's an error!
if ( dgramNum >= m_dgramsToRead ) {
@ -1123,9 +1129,9 @@ long UdpSlot::sendAck ( int sock , long long now ,
if ( m_host ) m_host->m_dgramsTo++;
// sending to loopback, 127.0.0.1?
else if ( ip == 0x0100007f ) g_hostdb.getMyHost()->m_dgramsTo++;
if ( ! isOn ( dgramNum , m_sentAckBits ) ) {
if ( ! isOn ( dgramNum , m_sentAckBits2 ) ) {
// mark this ack as sent
setBit ( dgramNum , m_sentAckBits );
setBit ( dgramNum , m_sentAckBits2 );
// count the bit we lit
m_sentAckBitsOn++;
}
@ -1149,7 +1155,7 @@ long UdpSlot::sendAck ( int sock , long long now ,
// . otherwise, we had a read hole so we had to skip dgramNum around
if (dgramNum <= m_firstUnlitSentAckBit)
m_firstUnlitSentAckBit = getNextUnlitBit(dgramNum,
m_sentAckBits,
m_sentAckBits2,
m_dgramsToRead);
// log msg
if ( g_conf.m_logDebugUdp ) { // || cancelTrans ) {
@ -1190,7 +1196,7 @@ long UdpSlot::sendAck ( int sock , long long now ,
// . if the read dgram had an error code we set g_errno to that and ret false
// . anyone calling this should call sendDatagramOrAck() immediately afterwards
// in case the send was blocking on receiving an ACK or we should send an ACK
// . updates: m_readBits, m_readBitsOn, m_sentAckBits, m_sentAckBitsOn
// . updates: m_readBits2, m_readBitsOn, m_sentAckBits2, m_sentAckBitsOn
// m_firstUnlitSentAckBit
bool UdpSlot::readDatagramOrAck ( int sock ,
char *peek ,
@ -1324,7 +1330,7 @@ bool UdpSlot::readDatagramOrAck ( int sock ,
if ( dgramNum != 0 )
log(LOG_LOGIC,"udp: Error dgram is not dgram #0.");
// it's new to us, set the read bits
setBit ( dgramNum, m_readBits );
setBit ( dgramNum, m_readBits2 );
// we read one dgram
m_readBitsOn = 1;
// only one dgram to read
@ -1345,7 +1351,7 @@ bool UdpSlot::readDatagramOrAck ( int sock ,
if ( m_callback && m_readAckBitsOn != m_dgramsToSend ) {
// catch em all up
for ( long i = 0 ; i < m_dgramsToSend ; i++ )
setBit ( i , m_readAckBits );
setBit ( i , m_readAckBits2 );
m_readAckBitsOn = m_dgramsToSend;
if ( g_conf.m_logDebugUdp )
log(LOG_DEBUG,"udp: Cramming ACKs "
@ -1360,11 +1366,11 @@ bool UdpSlot::readDatagramOrAck ( int sock ,
// . makeReadBuf will init m_firstReadTime to -1
//if (m_readBitsOn == 0 && m_firstReadTime == -1) m_firstReadTime =now;
// did we already receive this dgram?
if ( isOn(dgramNum,m_readBits) ) {
if ( isOn(dgramNum,m_readBits2) ) {
// did we already send the ack for it?
if ( isOn(dgramNum,m_sentAckBits) ) {
if ( isOn(dgramNum,m_sentAckBits2) ) {
// clear the ack we sent for this so we send it again
clrBit ( dgramNum , m_sentAckBits );
clrBit ( dgramNum , m_sentAckBits2 );
// reduce lit bit count of sent acks
m_sentAckBitsOn--;
// update the next ack to send
@ -1579,7 +1585,7 @@ bool UdpSlot::readDatagramOrAck ( int sock ,
// keep track of dgrams sent outside of our cluster
//else g_stats.m_dgramsFromStrangers++;
// it's new to us, set the read bits
setBit ( dgramNum, m_readBits );
setBit ( dgramNum, m_readBits2 );
// inc the lit bit count
m_readBitsOn++;
// if our proto doesn't use acks, treat this as an ACK as well
@ -1631,7 +1637,7 @@ bool UdpSlot::readDatagramOrAck ( int sock ,
// bounce it back into m_readBuf
memcpy_ass ( dest , src , len );
// it's new to us, set the read bits
setBit ( dgramNum, m_readBits );
setBit ( dgramNum, m_readBits2 );
// inc the lit bit count
m_readBitsOn++;
// if our proto doesn't use acks, treat this as an ACK as well
@ -1661,20 +1667,20 @@ void UdpSlot::readAck ( int sock , long dgramNum , long long now ) {
// reset the resendTime to the starting point before back-off scheme
setResendTime();
// if this is a dup ack, ignore it
if ( isOn ( dgramNum , m_readAckBits ) ) return;
if ( isOn ( dgramNum , m_readAckBits2 ) ) return;
// mark this ack as read
setBit ( dgramNum , m_readAckBits );
setBit ( dgramNum , m_readAckBits2 );
// update lit bit count
m_readAckBitsOn++;
// if it was marked as unsent, fix that
if ( ! isOn ( dgramNum , m_sentBits ) ) {
if ( ! isOn ( dgramNum , m_sentBits2 ) ) {
// bitch if we do not even have a send buffer. why is he acking
// something we haven't even had to a chance to generate let
// alone send? network error?
if ( ! m_sendBufAlloc || m_dgramsToSend <= 0 )
log("udp: Read ack but send buf is empty.");
// mark this dgram as sent
setBit ( dgramNum , m_sentBits );
setBit ( dgramNum , m_sentBits2 );
// update lit bit count
m_sentBitsOn++;
}
@ -1686,11 +1692,11 @@ void UdpSlot::readAck ( int sock , long dgramNum , long long now ) {
// on our right as having sent bits of 0, until we hit a lit ack bit
for ( long i = dgramNum - 1 ; i >= 0 ; i-- ) {
// stop after hitting a lit bit
if ( isOn ( i , m_readAckBits ) ) break;
if ( isOn ( i , m_readAckBits2 ) ) break;
// mark as unsent iff it's marked as sent
if ( ! isOn ( i , m_sentBits ) ) continue;
if ( ! isOn ( i , m_sentBits2 ) ) continue;
// mark as unsent
clrBit ( i , m_sentBits );
clrBit ( i , m_sentBits2 );
// reduce the lit bit count
m_sentBitsOn--;
// update m_nextToSend

@ -220,13 +220,40 @@ class UdpSlot {
// . for internal use
// . set a window bit
void setBit ( long dgramNum , unsigned char *bits ) {
bits [ dgramNum >> 3 ] |= (1 << (dgramNum & 0x07)); };
// lazy initialize,since initializing all bits is too expensive
if ( dgramNum >= m_numBitsInitialized ) {
m_sentBits2 [dgramNum>>3] = 0;
m_readBits2 [dgramNum>>3] = 0;
m_sentAckBits2[dgramNum>>3] = 0;
m_readAckBits2[dgramNum>>3] = 0;
m_numBitsInitialized += 8;
}
bits [ dgramNum >> 3 ] |= (1 << (dgramNum & 0x07));
};
// clear a window bit
void clrBit ( long dgramNum , unsigned char *bits ) {
bits [ dgramNum >> 3 ] &= ~(1 << (dgramNum & 0x07)); };
// lazy initialize,since initializing all bits is too expensive
if ( dgramNum >= m_numBitsInitialized ) {
m_sentBits2 [dgramNum>>3] = 0;
m_readBits2 [dgramNum>>3] = 0;
m_sentAckBits2[dgramNum>>3] = 0;
m_readAckBits2[dgramNum>>3] = 0;
m_numBitsInitialized += 8;
}
bits [ dgramNum >> 3 ] &= ~(1 << (dgramNum & 0x07));
};
// get value of a window bit
bool isOn ( long dgramNum , unsigned char *bits ) {
return bits [ dgramNum >> 3 ] & (1 << (dgramNum & 0x07)); };
// lazy initialize,since initializing all bits is too expensive
if ( dgramNum >= m_numBitsInitialized ) {
m_sentBits2 [dgramNum>>3] = 0;
m_readBits2 [dgramNum>>3] = 0;
m_sentAckBits2[dgramNum>>3] = 0;
m_readAckBits2[dgramNum>>3] = 0;
m_numBitsInitialized += 8;
}
return bits [ dgramNum >> 3 ] & (1 << (dgramNum & 0x07));
};
// clear all the bits
//void clrAllBits ( unsigned char *bits , long numBits ) {
@ -380,14 +407,16 @@ class UdpSlot {
// don't wait longer than this, however
short m_maxWait;
// save cpu by not having to call memset() on m_sentBits2 et al
long m_numBitsInitialized;
// . i've discarded the window since msg size is limited
// . this way is faster
// . these bits determine what dgrams we've sent/read/sentAck/readAck
unsigned char m_sentBits [ (MAX_DGRAMS / 8) + 1 ];
unsigned char m_readBits [ (MAX_DGRAMS / 8) + 1 ];
unsigned char m_sentAckBits [ (MAX_DGRAMS / 8) + 1 ];
unsigned char m_readAckBits [ (MAX_DGRAMS / 8) + 1 ];
unsigned char m_sentBits2 [ (MAX_DGRAMS / 8) + 1 ];
unsigned char m_readBits2 [ (MAX_DGRAMS / 8) + 1 ];
unsigned char m_sentAckBits2 [ (MAX_DGRAMS / 8) + 1 ];
unsigned char m_readAckBits2 [ (MAX_DGRAMS / 8) + 1 ];
// we keep the unused slots in a linked list in UdpServer
class UdpSlot *m_next;

6
html/dotgdbinit Normal file

@ -0,0 +1,6 @@
handle SIG38 nostop noprint pass
handle SIG39 nostop noprint pass
handle SIG40 nostop noprint pass
handle SIG41 nostop noprint pass
set print thread-events 0

@ -4742,7 +4742,7 @@ int install ( install_flag_konst_t installFlag , long hostId , char *dir ,
SafeBuf tmpBuf;
tmpBuf.safePrintf(
"rcp -r %s %s:%s"
"scp -r %s %s:%s"
, fileListBuf.getBufStart()
, iptoa(h2->m_ip)
, h2->m_dir
@ -4825,7 +4825,7 @@ int install ( install_flag_konst_t installFlag , long hostId , char *dir ,
if ( ! f.doesExist() ) target = "gb";
sprintf(tmp,
"rcp "
"scp "
"%s%s "
"%s:%s/gb.installed%s",
dir,
@ -4840,7 +4840,7 @@ int install ( install_flag_konst_t installFlag , long hostId , char *dir ,
// don't copy to ourselves
//if ( h2->m_hostId == h->m_hostId ) continue;
sprintf(tmp,
"rcp "
"scp "
"%sgb.new "
"%s:%s/tmpgb.installed &",
dir,
@ -5139,7 +5139,7 @@ int install ( install_flag_konst_t installFlag , long hostId , char *dir ,
continue;
}
sprintf(tmp,
"rcp "
"scp "
"%scatdb/content.rdf.u8 "
"%s:%scatdb/content.rdf.u8",
dir,
@ -5148,7 +5148,7 @@ int install ( install_flag_konst_t installFlag , long hostId , char *dir ,
log(LOG_INIT,"admin: %s", tmp);
system ( tmp );
sprintf(tmp,
"rcp "
"scp "
"%scatdb/structure.rdf.u8 "
"%s:%scatdb/structure.rdf.u8",
dir,
@ -5157,7 +5157,7 @@ int install ( install_flag_konst_t installFlag , long hostId , char *dir ,
log(LOG_INIT,"admin: %s", tmp);
system ( tmp );
sprintf(tmp,
"rcp "
"scp "
"%scatdb/gbdmoz.structure.dat "
"%s:%scatdb/gbdmoz.structure.dat",
dir,
@ -5166,7 +5166,7 @@ int install ( install_flag_konst_t installFlag , long hostId , char *dir ,
log(LOG_INIT,"admin: %s", tmp);
system ( tmp );
sprintf(tmp,
"rcp "
"scp "
"%scatdb/gbdmoz.content.dat "
"%s:%scatdb/gbdmoz.content.dat",
dir,
@ -5175,7 +5175,7 @@ int install ( install_flag_konst_t installFlag , long hostId , char *dir ,
log(LOG_INIT,"admin: %s", tmp);
//system ( tmp );
//sprintf(tmp,
// "rcp "
// "scp "
// "%scatdb/gbdmoz.content.dat.diff "
// "%s:%scatdb/gbdmoz.content.dat.diff",
// dir,
@ -5189,7 +5189,7 @@ int install ( install_flag_konst_t installFlag , long hostId , char *dir ,
// don't copy to ourselves
if ( h2->m_hostId == 0 ) continue;
sprintf(tmp,
"rcp "
"scp "
"%scatdb/content.rdf.u8.new "
"%s:%scatdb/content.rdf.u8.new",
dir,
@ -5198,7 +5198,7 @@ int install ( install_flag_konst_t installFlag , long hostId , char *dir ,
log(LOG_INIT,"admin: %s", tmp);
system ( tmp );
sprintf(tmp,
"rcp "
"scp "
"%scatdb/structure.rdf.u8.new "
"%s:%scatdb/structure.rdf.u8.new",
dir,
@ -5207,7 +5207,7 @@ int install ( install_flag_konst_t installFlag , long hostId , char *dir ,
log(LOG_INIT,"admin: %s", tmp);
system ( tmp );
sprintf(tmp,
"rcp "
"scp "
"%scatdb/gbdmoz.structure.dat.new "
"%s:%scatdb/gbdmoz.structure.dat.new",
dir,
@ -5216,7 +5216,7 @@ int install ( install_flag_konst_t installFlag , long hostId , char *dir ,
log(LOG_INIT,"admin: %s", tmp);
system ( tmp );
sprintf(tmp,
"rcp "
"scp "
"%scatdb/gbdmoz.content.dat.new "
"%s:%scatdb/gbdmoz.content.dat.new",
dir,
@ -5225,7 +5225,7 @@ int install ( install_flag_konst_t installFlag , long hostId , char *dir ,
log(LOG_INIT,"admin: %s", tmp);
system ( tmp );
sprintf(tmp,
"rcp "
"scp "
"%scatdb/gbdmoz.content.dat.new.diff "
"%s:%scatdb/gbdmoz.content.dat.new.diff",
dir,