This commit is contained in:
Zak Betz
2015-09-03 22:17:48 -06:00
13 changed files with 806 additions and 160 deletions

@ -852,7 +852,8 @@ bool BigFile::readwrite ( void *buf ,
int32_t rate = 100000;
if ( took > 500 ) rate = fstate->m_bytesDone / took ;
if ( rate < 8000 && fstate->m_niceness <= 0 ) {
log(LOG_INFO,"disk: Read %"INT32" bytes in %"INT64" ms (%"INT32"MB/s).",
log(LOG_INFO,"disk: Read %"INT32" bytes in %"INT64" "
"ms (%"INT32"KB/s).",
fstate->m_bytesDone,took,rate);
g_stats.m_slowDiskReads++;
}
@ -952,7 +953,8 @@ void doneWrapper ( void *state , ThreadEntry *t ) {
if ( fstate->m_errno == EDISKSTUCK ) slow = true;
if ( slow && fstate->m_niceness <= 0 ) {
if ( fstate->m_errno != EDISKSTUCK )
log(LOG_INFO, "disk: Read %"INT32" bytes in %"INT64" ms (%"INT32"MB/s).",
log(LOG_INFO, "disk: Read %"INT32" bytes in %"INT64" "
"ms (%"INT32"KB/s).",
fstate->m_bytesDone,took,rate);
g_stats.m_slowDiskReads++;
}
@ -1273,9 +1275,9 @@ bool readwrite_r ( FileState *fstate , ThreadEntry *t ) {
if ( t && t->m_callback == ohcrap ) return false;
// only set this now if we are the first one
if ( g_threads.m_threadQueues[DISK_THREAD].m_hiReturned ==
g_threads.m_threadQueues[DISK_THREAD].m_hiLaunched )
g_lastDiskReadStarted = fstate->m_startTime;
// if ( g_threads.m_threadQueues[DISK_THREAD].m_hiReturned ==
// g_threads.m_threadQueues[DISK_THREAD].m_hiLaunched )
// g_lastDiskReadStarted = fstate->m_startTime;
// fake it out
//static int32_t s_poo = 0;

@ -329,6 +329,12 @@ bool Collectiondb::addExistingColl ( char *coll, collnum_t collnum ) {
if ( cr->m_isCustomCrawl )
cr->m_indexSpiderReplies = true;
// and don't do link voting, will help speed up
if ( cr->m_isCustomCrawl ) {
cr->m_getLinkInfo = false;
cr->m_computeSiteNumInlinks = false;
}
// we need to compile the regular expressions or update the url
// filters with new logic that maps crawlbot parms to url filters
return cr->rebuildUrlFilters ( );

@ -2634,8 +2634,6 @@ bool Hostdb::createHostsConf( char *cwd ) {
sb.safePrintf("# if you want more.\n");
sb.safePrintf("#\n");
/*
sb.safePrintf("# Format:\n");
sb.safePrintf("#\n");
sb.safePrintf("# first column: hostID (starts at 0 and increments from there)\n");
@ -2644,9 +2642,12 @@ bool Hostdb::createHostsConf( char *cwd ) {
sb.safePrintf("# fourth column: port that HTTP listens on\n");
sb.safePrintf("# fifth column: port that udp server listens on\n");
sb.safePrintf("# sixth column: IP address or hostname that has an IP address in /etc/hosts\n");
sb.safePrintf("# seventh column: like sixth column but for secondary ethernet port. (optional)\n");
sb.safePrintf("# seventh column: like sixth column but for secondary ethernet port. Can be the same as the sixth column.\n");
sb.safePrintf("# eigth column: An optional text note that will "
"display in the hosts table for this host.\n");
sb.safePrintf("\n");
sb.safePrintf("\n");
/*
sb.safePrintf("# This file consists of a list of lines like this:\n");
sb.safePrintf("#\n");
sb.safePrintf("# <ClientDnsPort> <HttpsPort> <HttpPort> <UdpPort> <IP1> <IP2> <Path>\n");

@ -2017,12 +2017,12 @@ void Loop::doPoll ( ) {
// if shutting down was it a sigterm ?
if ( m_shutdown ) goto again;
// handle returned threads for niceness 0
if ( g_threads.m_needsCleanup )
g_threads.timedCleanUp(-3,0); // 3 ms
//if ( g_threads.m_needsCleanup )
g_threads.timedCleanUp(-3,0); // 3 ms
if ( m_inQuickPoll ) goto again;
// high niceness threads
if ( g_threads.m_needsCleanup )
g_threads.timedCleanUp(-4,MAX_NICENESS); //3 ms
//if ( g_threads.m_needsCleanup )
g_threads.timedCleanUp(-4,MAX_NICENESS); //3 ms
goto again;
}

@ -566,7 +566,8 @@ skipReplaceHost:
h->m_pingInfo.m_udpSlotsInUseIncoming ) {
char *f1 = "";
char *f2 = "";
if ( h->m_pingInfo.m_udpSlotsInUseIncoming >= 200 ) {
// MAXUDPSLOTS in Spider.cpp is 300 right now
if ( h->m_pingInfo.m_udpSlotsInUseIncoming >= 300 ) {
f1 = "<b>";
f2 = "</b>";
}

@ -29,24 +29,31 @@ bool sendPageThreads ( TcpSocket *s , HttpRequest *r ) {
int32_t loActive = q->m_loLaunched - q->m_loReturned;
int32_t mdActive = q->m_mdLaunched - q->m_mdReturned;
int32_t hiActive = q->m_hiLaunched - q->m_hiReturned;
int32_t total = loActive + mdActive + hiActive;
// int32_t loActive = q->m_loLaunched - q->m_loReturned;
// int32_t mdActive = q->m_mdLaunched - q->m_mdReturned;
// int32_t hiActive = q->m_hiLaunched - q->m_hiReturned;
// int32_t total = loActive + mdActive + hiActive;
int32_t total = q->m_launched - q->m_returned;
p.safePrintf ( "<table %s>"
"<tr class=hdrow><td colspan=\"11\">"
//"<center>"
//"<font size=+1>"
"<b>Thread Type: %s"
" (low: %"INT32""
" med: %"INT32""
" high: %"INT32""
" total: %"INT32")</td></tr>",
// " (low: %"INT32""
// " med: %"INT32""
// " high: %"INT32""
" (launched: %"INT32" "
"returned: %"INT32" "
"total: %"INT32")</td></tr>",
TABLE_STYLE,
q->getThreadType(),
loActive, mdActive,
hiActive, total);
// loActive, mdActive,
// hiActive,
(int32_t)q->m_launched,
(int32_t)q->m_returned,
total);
p.safePrintf ("<tr bgcolor=#%s>"
@ -59,19 +66,20 @@ bool sendPageThreads ( TcpSocket *s , HttpRequest *r ) {
"<td><b>Callback</b></td>"
"<td><b>Routine</b></td>"
"<td><b>Bytes Done</b></td>"
"<td><b>KBytes/Sec</b></td>"
"<td><b>Megabytes/Sec</b></td>"
"<td><b>Read|Write</b></td>"
"</tr>"
, LIGHT_BLUE
);
for ( int32_t j = 0 ; j < q->m_top ; j++ ) {
for ( int32_t j = 0 ; j < q->m_maxEntries ; j++ ) {
ThreadEntry *t = &q->m_entries[j];
if(!t->m_isOccupied) continue;
FileState *fs = (FileState *)t->m_state;
bool diskThread = false;
if(q->m_threadType == DISK_THREAD && fs) diskThread = true;
if(q->m_threadType == DISK_THREAD && fs)
diskThread = true;
// might have got pre-called from EDISKSTUCK
if ( ! t->m_callback ) fs = NULL;
@ -81,18 +89,29 @@ bool sendPageThreads ( TcpSocket *s , HttpRequest *r ) {
if(t->m_isDone) {
p.safePrintf("<td><font color='red'><b>done</b></font></td>");
p.safePrintf("<td>%"INT32"</td>", t->m_niceness);
p.safePrintf("<td>%"INT64"</td>", t->m_launchedTime - t->m_queuedTime); //queued
p.safePrintf("<td>%"INT64"</td>", t->m_exitTime - t->m_launchedTime); //run time
p.safePrintf("<td>%"INT64"</td>", now - t->m_exitTime); //cleanup
p.safePrintf("<td>%"INT64"</td>", now - t->m_queuedTime); //total
p.safePrintf("<td>%"INT64"ms</td>", t->m_launchedTime - t->m_queuedTime); //queued
p.safePrintf("<td>%"INT64"ms</td>", t->m_exitTime - t->m_launchedTime); //run time
p.safePrintf("<td>%"INT64"ms</td>", now - t->m_exitTime); //cleanup
p.safePrintf("<td>%"INT64"ms</td>", now - t->m_queuedTime); //total
p.safePrintf("<td>%s</td>", g_profiler.getFnName((PTRTYPE)t->m_callback));
p.safePrintf("<td>%s</td>", g_profiler.getFnName((PTRTYPE)t->m_startRoutine));
if(diskThread && fs) {
int64_t took = (t->m_exitTime - t->m_launchedTime);
if(took <= 0) took = 1;
p.safePrintf("<td>%"INT32"/%"INT32"</td>", t->m_bytesToGo, t->m_bytesToGo);
p.safePrintf("<td>%.2f kbps</td>", (float)t->m_bytesToGo/took);
p.safePrintf("<td>%s</td>",t->m_doWrite? "Write":"Read");
char *sign = "";
if(took <= 0) {sign=">";took = 1;}
p.safePrintf("<td>%"INT32"/%"INT32""
"</td>",
t->m_bytesToGo,
t->m_bytesToGo);
p.safePrintf("<td>%s%.2f MB/s</td>",
sign,
(float)t->m_bytesToGo/
(1024.0*1024.0)/
((float)took/1000.0));
p.safePrintf("<td>%s</td>",
t->m_doWrite?
"<font color=red>"
"Write</font>":"Read");
}
else {
p.safePrintf("<td>--</td>");
@ -113,7 +132,7 @@ bool sendPageThreads ( TcpSocket *s , HttpRequest *r ) {
int64_t took = (now - t->m_launchedTime);
if(took <= 0) took = 1;
p.safePrintf("<td>%c%c%c/%"INT32"</td>", '?','?','?',t->m_bytesToGo);
p.safePrintf("<td>%.2f kbps</td>", 0.0);//(float)fs->m_bytesDone/took);
p.safePrintf("<td>%.2f MB/s</td>", 0.0);//(float)fs->m_bytesDone/took);
p.safePrintf("<td>%s</td>",t->m_doWrite? "Write":"Read");
}
else {
@ -151,7 +170,7 @@ bool sendPageThreads ( TcpSocket *s , HttpRequest *r ) {
}
/*
int32_t loActiveBig = disk->m_loLaunchedBig - disk->m_loReturnedBig;
int32_t loActiveMed = disk->m_loLaunchedMed - disk->m_loReturnedMed;
int32_t loActiveSma = disk->m_loLaunchedSma - disk->m_loReturnedSma;
@ -208,7 +227,7 @@ bool sendPageThreads ( TcpSocket *s , HttpRequest *r ) {
"<td><b>Active Write Threads</b></td><td>%"INT32"</td>"
"</tr></table>",
activeWrites);
*/
return g_httpServer.sendDynamicPage ( s , (char*) p.getBufStart() ,
p.length() );

@ -1463,6 +1463,9 @@ Profiler::getStackFrame(int sig) {
// somewhere. but for now just ignore.
if ( g_inMemcpy ) return;
// likewise, not if in system malloc since backtrace() mallocs
if ( g_inMemFunction ) return;
//void *trace[32];
// the innermost line number

@ -1797,8 +1797,11 @@ void attemptMergeAll2 ( ) {
tryLoop:
// if a collection got deleted, reset this to 0
if ( s_lastCollnum >= g_collectiondb.m_numRecs )
if ( s_lastCollnum >= g_collectiondb.m_numRecs ) {
s_lastCollnum = 0;
// and return so we don't spin 1000 times over a single coll.
return;
}
// limit to 1000 checks to save the cpu since we call this once
// every 2 seconds.

@ -2835,6 +2835,9 @@ static void gotSpiderdbListWrapper2( void *state , RdbList *list,Msg5 *msg5) {
THIS->populateWaitingTreeFromSpiderdb ( true );
}
// lower from 1300 to 300
#define MAXUDPSLOTS 300
//////////////////
//////////////////
//
@ -2867,6 +2870,8 @@ void SpiderColl::populateWaitingTreeFromSpiderdb ( bool reentry ) {
if ( m_deleteMyself ) { char *xx=NULL;*xx=0; }
// skip if spiders off
if ( ! m_cr->m_spideringEnabled ) return;
// skip if udp table is full
if ( g_udpServer.getNumUsedSlotsIncoming() >= MAXUDPSLOTS ) return;
// if entering for the first time, we need to read list from spiderdb
if ( ! reentry ) {
// just return if we should not be doing this yet
@ -3117,6 +3122,9 @@ void SpiderColl::populateDoledbFromWaitingTree ( ) { // bool reentry ) {
// this.
if ( ! g_conf.m_spideringEnabled ) return;
// skip if udp table is full
if ( g_udpServer.getNumUsedSlotsIncoming() >= MAXUDPSLOTS ) return;
// try skipping!!!!!!!!!!!
// yeah, this makes us scream. in addition to calling
// Doledb::m_rdb::addRecord() below
@ -5945,9 +5953,6 @@ void SpiderLoop::startLoop ( ) {
log("build: failed to register updatecrawlinfowrapper");
}
// lower from 1300 to 200
#define MAXUDPSLOTS 200
// call this every 50ms it seems to try to spider urls and populate doledb
// from the waiting tree
void doneSleepingWrapperSL ( int fd , void *state ) {
@ -13675,6 +13680,14 @@ void handleRequestc1 ( UdpSlot *slot , int32_t niceness ) {
char *req = slot->m_readBuf;
if ( ! slot->m_host ) {
log("handc1: no slot->m_host from ip=%s udpport=%i",
iptoa(slot->m_ip),(int)slot->m_port);
g_errno = ENOHOSTS;
g_udpServer.sendErrorReply ( slot , g_errno );
return;
}
//if ( ! isClockSynced() ) {
//}
@ -14075,7 +14088,7 @@ bool getSpiderStatusMsg ( CollectionRec *cx , SafeBuf *msg , int32_t *status ) {
// out CollectionRec::m_globalCrawlInfo counts do not have a dead
// host's counts tallied into it, which could make a difference on
// whether we have exceeded a maxtocrawl limit or some such, so wait...
if ( ! s_countsAreValid ) {
if ( ! s_countsAreValid && g_hostdb.hasDeadHost() ) {
*status = SP_ADMIN_PAUSED;
return msg->safePrintf("All crawling temporarily paused "
"because a shard is down.");

@ -248,6 +248,9 @@ void Statsdb::addDocsIndexed ( ) {
if ( g_hostdb.hasDeadHost() ) return;
// only host #0 needs this
if ( g_hostdb.m_hostId != 0 ) return;
// only once per five seconds
int32_t now = getTimeLocal();
static int32_t s_lastTime = 0;

File diff suppressed because it is too large Load Diff

@ -59,6 +59,13 @@ class ThreadEntry {
bool m_needsJoin;
pthread_t m_joinTid;
class ThreadEntry *m_nextLink;
class ThreadEntry *m_prevLink;
// the waiting linked list we came from
ThreadEntry **m_bestHeadPtr;
ThreadEntry **m_bestTailPtr;
};
//#define MAX_THREAD_ENTRIES 1024
@ -85,6 +92,33 @@ class ThreadQueue {
int32_t m_entriesSize;
int32_t m_maxEntries;
// linked list head for launched thread entries
ThreadEntry *m_launchedHead;
// linked list head for empty thread entries
ThreadEntry *m_emptyHead;
// 8 heads/tails for linked lists of thread entries waiting to launch
ThreadEntry *m_waitHead0;
ThreadEntry *m_waitHead1;
ThreadEntry *m_waitHead2;
ThreadEntry *m_waitHead3;
ThreadEntry *m_waitHead4;
ThreadEntry *m_waitHead5;
ThreadEntry *m_waitHead6;
ThreadEntry *m_waitHead7;
ThreadEntry *m_waitTail0;
ThreadEntry *m_waitTail1;
ThreadEntry *m_waitTail2;
ThreadEntry *m_waitTail3;
ThreadEntry *m_waitTail4;
ThreadEntry *m_waitTail5;
ThreadEntry *m_waitTail6;
ThreadEntry *m_waitTail7;
/*
// counts the high/low priority (niceness <= 0) threads
int64_t m_hiLaunched;
int64_t m_hiReturned;
@ -114,6 +148,7 @@ class ThreadQueue {
int64_t m_mdReturnedSma;
int64_t m_loLaunchedSma;
int64_t m_loReturnedSma;
*/
// init
bool init (char threadType, int32_t maxThreads, int32_t maxEntries);
@ -143,6 +178,13 @@ class ThreadQueue {
// . returns false and sets errno on error
bool launchThread2 ( ThreadEntry *te );
bool launchThreadForReals ( ThreadEntry **headPtr ,
ThreadEntry **tailPtr ) ;
void removeThreads2 ( ThreadEntry **headPtr ,
ThreadEntry **tailPtr ,
class BigFile *bf ) ;
void print ( ) ;
// these are called by g_udpServer2, the high priority udp server
@ -245,10 +287,12 @@ class Threads {
int32_t getNumThreadQueues() { return m_numQueues; }
// used by UdpServer to see if it should call a low priority callback
int32_t getNumActiveHighPriorityCpuThreads() ;
//int32_t getNumActiveHighPriorityCpuThreads() ;
// all high priority threads...
int32_t getNumActiveHighPriorityThreads() ;
bool hasHighPriorityCpuThreads() ;
int32_t getNumThreadsOutOrQueued();
// counts the high/low priority (niceness <= 0) threads

@ -10056,6 +10056,10 @@ char *XmlDoc::getIsDup ( ) {
// sanity. must be posdb list.
if ( ! list->isEmpty() && list->m_ks != 18 ) { char *xx=NULL;*xx=0;}
// so getSiteRank() does not core
int32_t *sni = getSiteNumInlinks();
if ( ! sni || sni == (int32_t *)-1 ) return (char *)sni;
// . see if there are any pages that seem like they are dups of us
// . they must also have a HIGHER score than us, for us to be
// considered the dup
@ -13806,7 +13810,7 @@ int32_t *XmlDoc::getSiteNumInlinks ( ) {
// hacks of speed. computeSiteNumInlinks is true by default
// but if the user turns it off the just use sitelinks.txt
if ( ! cr->m_computeSiteNumInlinks ) {
if ( cr && ! cr->m_computeSiteNumInlinks ) {
int32_t hostHash32 = getHostHash32a();
int32_t min = g_tagdb.getMinSiteInlinks ( hostHash32 );
// try with www if not there