#include "SpiderLoop.h"
#include "Spider.h"
#include "SpiderColl.h"
#include "SpiderCache.h"
#include "Doledb.h"
#include "UdpSlot.h"
#include "UdpServer.h"
#include "Collectiondb.h"
#include "SafeBuf.h"
#include "Repair.h"
#include "DailyMerge.h"
#include "Process.h"
#include "XmlDoc.h"
#include "HttpServer.h"
#include "Pages.h"
#include "Parms.h"
#include "ip.h"
#include "Conf.h"
#include "Mem.h"
#include "ScopedLock.h"
// . this was 10 but cpu is getting pegged, so i set to 45
// . we consider the collection done spidering when no urls to spider
// for this many seconds
// . i'd like to set back to 10 for speed... maybe even 5 or less
// . back to 30 from 20 to try to fix crawls thinking they are done
// maybe because of the empty doledb logic taking too long?
//#define SPIDER_DONE_TIMER 30
// try 45 to prevent false revivals
//#define SPIDER_DONE_TIMER 45
// try 30 again since we have new localcrawlinfo update logic much faster
//#define SPIDER_DONE_TIMER 30
// neo under heavy load go to 60
//#define SPIDER_DONE_TIMER 60
// super overloaded
//#define SPIDER_DONE_TIMER 90
#define SPIDER_DONE_TIMER 20
/////////////////////////
///////////////////////// SPIDERLOOP
/////////////////////////
// a global class extern'd in .h file
SpiderLoop g_spiderLoop;
SpiderLoop::SpiderLoop ( ) {
m_crx = NULL;
// clear array of ptrs to Doc's
memset ( m_docs , 0 , sizeof(XmlDoc *) * MAX_SPIDERS );
// Coverity
m_numSpidersOut = 0;
m_launches = 0;
m_maxUsed = 0;
m_sc = NULL;
m_gettingDoledbList = false;
m_activeList = NULL;
m_bookmark = NULL;
m_activeListValid = false;
m_activeListCount = 0;
m_recalcTime = 0;
m_recalcTimeValid = false;
m_doleStart = 0;
}
SpiderLoop::~SpiderLoop ( ) {
reset();
}
// free all doc's
void SpiderLoop::reset() {
// delete all doc's in use
for ( int32_t i = 0 ; i < MAX_SPIDERS ; i++ ) {
if ( m_docs[i] ) {
mdelete ( m_docs[i] , sizeof(XmlDoc) , "Doc" );
delete (m_docs[i]);
}
m_docs[i] = NULL;
}
m_list.freeList();
m_lockTable.reset();
m_winnerListCache.reset();
}
void SpiderLoop::init() {
logTrace( g_conf.m_logTraceSpider, "BEGIN" );
m_crx = NULL;
m_activeListValid = false;
m_activeList = NULL;
m_recalcTime = 0;
m_recalcTimeValid = false;
// we aren't in the middle of waiting to get a list of SpiderRequests
m_gettingDoledbList = false;
// clear array of ptrs to Doc's
memset ( m_docs , 0 , sizeof(XmlDoc *) * MAX_SPIDERS );
// . m_maxUsed is the largest i such that m_docs[i] is in use
// . -1 means no m_docs are in use
m_maxUsed = -1;
m_numSpidersOut = 0;
// for locking. key size is 8 for easier debugging
m_lockTable.set ( 8,sizeof(UrlLock),0,NULL,0,false, "splocks", true ); // useKeyMagic? yes.
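// lock keys come from makeLockTableKey(), which appears to be based
// on the url's 48-bit hash (see lockKeyUh48 in spiderUrl() below)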
if ( ! m_winnerListCache.init ( 20000000 , // maxcachemem, 20MB
-1 , // fixedatasize
false , // supportlists?
10000 , // maxcachenodes
false , // use half keys
"winnerspidercache", // dbname
false ) )
log(LOG_WARN, "spider: failed to init winnerlist cache. slows down.");
// don't register callbacks when we're not using it
if (!g_hostdb.getMyHost()->m_spiderEnabled) {
logTrace(g_conf.m_logTraceSpider, "END");
return;
}
// wake up every 50ms
if (!g_loop.registerSleepCallback(50, this, doneSleepingWrapperSL, "SpiderLoop::doneSleepingWrapperSL")) {
log(LOG_ERROR, "build: Failed to register timer callback. Spidering is permanently disabled. Restart to fix.");
}
logTrace( g_conf.m_logTraceSpider, "END" );
}
// called every 50ms; tries to spider urls and populate doledb
// from the waiting tree
void SpiderLoop::doneSleepingWrapperSL ( int fd , void *state ) {
// if spidering disabled then do not do this crap
if ( ! g_conf.m_spideringEnabled ) return;
if ( ! g_hostdb.getMyHost( )->m_spiderEnabled ) return;
// or if trying to exit
if (g_process.isShuttingDown()) return;
// skip if udp table is full
if ( g_udpServer.getNumUsedSlotsIncoming() >= MAXUDPSLOTS ) return;
int32_t now = getTimeLocal();
// point to head of active linked list of collection recs
CollectionRec *nextActive = g_spiderLoop.getActiveList();
collnum_t nextActiveCollnum = nextActive ? nextActive->m_collnum : static_cast<collnum_t>( -1 );
for ( ; nextActive ; ) {
// before we assign crp to nextActive, ensure that it did not get deleted on us.
// if the next collrec got deleted, tr will be NULL
CollectionRec *tr = g_collectiondb.getRec( nextActiveCollnum );
// if it got deleted or restarted then it will not
// match most likely
if ( tr != nextActive ) {
// this shouldn't happen much so log it
log("spider: collnum %" PRId32" got deleted. rebuilding active list", (int32_t)nextActiveCollnum);
// rebuild the active list now
nextActive = g_spiderLoop.getActiveList();
nextActiveCollnum = nextActive ? nextActive->m_collnum : static_cast<collnum_t>( -1 );
continue;
}
// now we become him
CollectionRec *crp = nextActive;
// update these two vars for next iteration
nextActive = crp->m_nextActive;
nextActiveCollnum = nextActive ? nextActive->m_collnum : static_cast<collnum_t>( -1 );
// skip if not enabled
if ( ! crp->m_spideringEnabled ) {
continue;
}
// get it
SpiderColl *sc = g_spiderCache.getSpiderColl(crp->m_collnum);
// skip if none
if ( ! sc ) {
continue;
}
// always do a scan at startup & every 24 hrs
// AND at process startup!!!
if ( ! sc->m_waitingTreeNeedsRebuild && now - sc->getLastScanTime() > 24*3600 ) {
// if a scan is ongoing, this will re-set it
sc->resetWaitingTreeNextKey();
sc->m_waitingTreeNeedsRebuild = true;
log( LOG_INFO, "spider: hit spider queue rebuild timeout for %s (%" PRId32")",
crp->m_coll, (int32_t)crp->m_collnum );
}
// e.g. URL Filter config has changed
if (sc->m_waitingTreeNeedsRebuild) {
// re-entry is false because we are entering for the first time
logTrace(g_conf.m_logTraceSpider, "Calling populateWaitingTreeFromSpiderdb");
sc->populateWaitingTreeFromSpiderdb(false);
}
logTrace( g_conf.m_logTraceSpider, "Calling populateDoledbFromWaitingTree" );
sc->populateDoledbFromWaitingTree();
}
// if we have a ton of collections, reduce cpu load from calling
// spiderDoledUrls()
static uint64_t s_skipCount = 0;
s_skipCount++;
// so instead of every 50ms make it every 200ms if we got 100+ collections in use.
g_spiderLoop.getActiveList();
int32_t activeListCount = g_spiderLoop.m_activeListCount;
if ( ! g_spiderLoop.m_activeListValid ) {
activeListCount = 0;
}
int32_t skip = 1;
if ( activeListCount >= 200 ) {
skip = 8;
} else if ( activeListCount >= 100 ) {
skip = 4;
} else if ( activeListCount >= 50 ) {
skip = 2;
}
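// e.g. with 100-199 active collections skip is 4, so given the 50ms
// sleep callback above, spiderDoledUrls() runs at most every 200ms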
if ( ( s_skipCount % skip ) != 0 ) {
return;
}
// spider some urls that were doled to us
logTrace( g_conf.m_logTraceSpider, "Calling spiderDoledUrls" );
g_spiderLoop.spiderDoledUrls( );
}
void SpiderLoop::gotDoledbListWrapper2 ( void *state , RdbList *list , Msg5 *msg5 ) {
// process the doledb list
g_spiderLoop.gotDoledbList2();
}
//////////////////////////
//////////////////////////
//
// The second KEYSTONE function.
//
// Scans doledb and spiders the doledb records.
//
// Doledb records contain SpiderRequests ready for spidering NOW.
//
// 1. gets all locks from all hosts in the shard
// 2. sends confirm msg to all hosts if lock acquired:
// - each host will remove from doledb then
// - assigned host will also add new "0" entry to waiting tree if need be
// - calling addToWaitingTree() will trigger populateDoledbFromWaitingTree()
// to add a new entry into waiting tree, not the one just locked.
// 3. makes a new xmldoc class for that url and calls indexDoc() on it
//
//////////////////////////
//////////////////////////
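//
// A positive doledb record, as read below, looks like this (a sketch;
// see Doledb.h for the authoritative key layout):
//
//   key96_t key       - encodes the priority and scheduled spider time
//   int32_t dataSize  - size of the data that follows
//   SpiderRequest     - the url plus its metadata, ready to spider
//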
// now check our RDB_DOLEDB for SpiderRequests to spider!
void SpiderLoop::spiderDoledUrls ( ) {
logTrace( g_conf.m_logTraceSpider, "BEGIN" );
collLoop:
// start again at head if this is NULL
if ( ! m_crx ) m_crx = getActiveList();
bool firstTime = true;
// detect overlap
m_bookmark = m_crx;
// get this
m_sc = NULL;
// set this in the loop
CollectionRec *cr = NULL;
uint32_t nowGlobal = 0;
m_launches = 0;
subloop:
// must be spidering to dole out
if ( ! g_conf.m_spideringEnabled ) {
logTrace( g_conf.m_logTraceSpider, "END, spidering disabled" );
return;
}
if ( ! g_hostdb.getMyHost( )->m_spiderEnabled ) {
logTrace( g_conf.m_logTraceSpider, "END, spidering disabled (2)" );
return;
}
// or if trying to exit
if (g_process.isShuttingDown()) {
logTrace( g_conf.m_logTraceSpider, "END, shutting down" );
return;
}
// don't spider if we have dead host
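// (the result of hasDeadHost() is cached below and only refreshed
// every m_spiderDeadHostCheckInterval seconds, since we pass through
// here constantly)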
time_t now = time(NULL);
static bool s_hasDeadHost = g_hostdb.hasDeadHost();
static time_t s_updatedTime = now;
if ((now - s_updatedTime) >= g_conf.m_spiderDeadHostCheckInterval) {
s_updatedTime = now;
s_hasDeadHost = g_hostdb.hasDeadHost();
}
if (s_hasDeadHost) {
logTrace(g_conf.m_logTraceSpider, "END, has dead host");
return;
}
// if we do not overlap ourselves
if ( m_gettingDoledbList ) {
logTrace( g_conf.m_logTraceSpider, "END, already getting DoledbList" );
return;
}
// bail instantly if in read-only mode (no RdbTrees!)
if ( g_conf.m_readOnlyMode ) {
logTrace( g_conf.m_logTraceSpider, "END, in read-only mode" );
return;
}
// or if doing a daily merge
if ( g_dailyMerge.m_mergeMode ) {
logTrace( g_conf.m_logTraceSpider, "END, doing daily merge" );
return;
}
// skip if too many udp slots being used
if ( g_udpServer.getNumUsedSlotsIncoming() >= MAXUDPSLOTS ) {
logTrace( g_conf.m_logTraceSpider, "END, using max UDP slots" );
return;
}
// stop if too many out. this is now 50 down from 500.
if ( m_numSpidersOut >= MAX_SPIDERS ) {
logTrace( g_conf.m_logTraceSpider, "END, reached max spiders" );
return;
}
// a new global conf rule
if ( m_numSpidersOut >= g_conf.m_maxTotalSpiders ) {
logTrace( g_conf.m_logTraceSpider, "END, reached max total spiders" );
return;
}
// bail if no collections
if ( g_collectiondb.getNumRecs() <= 0 ) {
logTrace( g_conf.m_logTraceSpider, "END, no collections" );
return;
}
// not while repairing
if ( g_repairMode ) {
logTrace( g_conf.m_logTraceSpider, "END, in repair mode" );
return;
}
// do not spider until collections/parms in sync with host #0
if ( ! g_parms.inSyncWithHost0() ) {
logTrace( g_conf.m_logTraceSpider, "END, not in sync with host#0" );
return;
}
// don't spider if not all hosts are up, or they do not all
// have the same hosts.conf.
if ( ! g_hostdb.hostsConfInAgreement() ) {
logTrace( g_conf.m_logTraceSpider, "END, host config disagreement" );
return;
}
// if nothing is in the active list then return as well
if ( ! m_activeList ) {
logTrace( g_conf.m_logTraceSpider, "END, nothing in active list" );
return;
}
// if we hit the end of the list, wrap it around
if ( ! m_crx ) m_crx = m_activeList;
// we use m_bookmark to determine when we've done a round over all
// the collections. but it will be set to null sometimes when we
// are in this loop because the active list gets recomputed. so
// if we lost it because our bookmarked collection is no longer
// 'active' then just set it to the list head i guess
if ( ! m_bookmark || ! m_bookmark->m_isActive )
m_bookmark = m_activeList;
// i guess return at the end of the linked list if no collection
// launched a spider... otherwise do another cycle to launch another
// spider. i could see a single collection dominating all the spider
// slots in some scenarios with this approach unfortunately.
if ( m_crx == m_bookmark && ! firstTime && m_launches == 0 ) {
logTrace( g_conf.m_logTraceSpider, "END, end of list?" );
return;
}
// reset # launches after doing a round and having launched > 0
if ( m_crx == m_bookmark && ! firstTime )
m_launches = 0;
firstTime = false;
// if a collection got deleted re-calc the active list so
// we don't core trying to access a deleted collectionrec.
// i'm not sure if this can happen here but i put this in as a
// precaution.
if ( ! m_activeListValid ) {
m_crx = NULL;
goto collLoop;
}
// return now if list is just empty
if ( ! m_activeList ) {
logTrace( g_conf.m_logTraceSpider, "END, active list empty" );
return;
}
cr = m_crx;
// Fix to shut up STACK
if( !m_crx ) {
goto collLoop;
}
// advance for next time we call goto subloop;
m_crx = m_crx->m_nextActive;
// get the spider collection for this collnum
m_sc = g_spiderCache.getSpiderColl(cr->m_collnum);
// skip if none
if ( ! m_sc ) {
logTrace( g_conf.m_logTraceSpider, "Loop, no spider cache for this collection" );
goto subloop;
}
// always reset priority to max at start
m_sc->setPriority ( MAX_SPIDER_PRIORITIES - 1 );
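// priorities are scanned from the top (MAX_SPIDER_PRIORITIES - 1)
// down to 0. devancePriority() steps down one priority, and once
// m_pri2 goes negative this collection's priorities are exhausted
// and we move on to the next collection (see the loop below)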
subloopNextPriority:
// skip if gone
if ( ! cr ) goto subloop;
// stop if not enabled
if ( ! cr->m_spideringEnabled ) goto subloop;
// set current time, synced with host #0
nowGlobal = (uint32_t)getTimeGlobal();
// get max spiders
int32_t maxSpiders = cr->m_maxNumSpiders;
logTrace( g_conf.m_logTraceSpider, "maxSpiders: %" PRId32 , maxSpiders );
// obey max spiders per collection too
if ( m_sc->m_spidersOut >= maxSpiders ) {
logTrace( g_conf.m_logTraceSpider, "Loop, Too many spiders active for collection" );
goto subloop;
}
// shortcut
SpiderColl *sc = cr->m_spiderColl;
if ( sc && sc->isDoledbIpTableEmpty() ) {
logTrace( g_conf.m_logTraceSpider, "Loop, doleIpTable is empty" );
goto subloop;
}
// sanity check
if ( nowGlobal == 0 ) { g_process.shutdownAbort(true); }
// need this for msg5 call
key96_t endKey;
endKey.setMax();
for ( ; ; ) {
// reset priority when it goes bogus
if ( m_sc->m_pri2 < 0 ) {
// reset for next coll
m_sc->setPriority( MAX_SPIDER_PRIORITIES - 1 );
logTrace( g_conf.m_logTraceSpider, "Loop, pri2 < 0" );
goto subloop;
}
// sanity
if ( cr != m_sc->getCollectionRec() ) {
g_process.shutdownAbort(true);
}
// skip the priority if we already have enough spiders on it
int32_t out = m_sc->m_outstandingSpiders[ m_sc->m_pri2 ];
// how many spiders can we have out?
int32_t max = 0;
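// the cap for this priority is the largest per-rule cap among the
// url filter rules that dole into this priority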
for ( int32_t i = 0; i < cr->m_numRegExs; i++ ) {
if ( cr->m_spiderPriorities[ i ] != m_sc->m_pri2 ) {
continue;
}
if ( cr->m_maxSpidersPerRule[ i ] > max ) {
max = cr->m_maxSpidersPerRule[ i ];
}
}
// always allow at least 1, they can disable spidering otherwise
// no, we use this to disable spiders... if ( max <= 0 ) max = 1;
// skip?
if ( out >= max ) {
// try the priority below us
m_sc->devancePriority();
// and try again
logTrace( g_conf.m_logTraceSpider, "Loop, trying previous priority" );
continue;
}
break;
}
// we only launch one spider at a time... so lock it up
m_gettingDoledbList = true;
// log this now
if ( g_conf.m_logDebugSpider ) {
m_doleStart = gettimeofdayInMilliseconds();
if ( m_sc->m_msg5StartKey != m_sc->m_nextDoledbKey ) {
log( "spider: msg5startKey differs from nextdoledbkey" );
}
}
// seems like we need this reset here... strange
m_list.reset();
logTrace( g_conf.m_logTraceSpider, "Getting list (msg5)" );
// get a spider rec for us to spider from doledb (mdw)
if ( ! m_msg5.getList ( RDB_DOLEDB ,
cr->m_collnum, // coll ,
&m_list ,
&m_sc->m_msg5StartKey,//m_sc->m_nextDoledbKey,
&endKey ,
// need to make this big because we don't
// want to end up getting just a negative key
//1 , // minRecSizes (~ 7000)
// we need to read in a lot because we call
// "goto listLoop" below if the url we want
// to dole is locked.
// seems like a ton of negative recs
// MDW: let's now read in 50k, not 2k, of doledb
// spiderrequests because often the first one
// has an ip already in use and then we'd
// just give up on the whole PRIORITY! which
// really freezes the spiders up.
// Also, if a spider request is corrupt in
// doledb it would cork us up too!
50000 , // minRecSizes
true , // includeTree
0 , // startFileNum
-1 , // numFiles (all)
this , // state
gotDoledbListWrapper2 ,
MAX_NICENESS , // niceness
true, // do err correction
-1, // maxRetries
false)) // isRealMerge
{
// return if it blocked
logTrace( g_conf.m_logTraceSpider, "END, getList blocked" );
return;
}
int32_t saved = m_launches;
// . add urls in list to cache
// . returns true if we should read another list
// . will set startKey to next key to start at
bool status = gotDoledbList2 ( );
logTrace( g_conf.m_logTraceSpider, "Back from gotDoledList2. Get more? %s", status ? "true" : "false" );
// if we did not launch anything, then decrement priority and
// try again. but if priority hits -1 then subloop2 will just go to
// the next collection.
if ( saved == m_launches ) {
m_sc->devancePriority();
logTrace( g_conf.m_logTraceSpider, "Loop, get next priority" );
goto subloopNextPriority;
}
logTrace( g_conf.m_logTraceSpider, "END, loop" );
// try another read
// now advance to next coll, launch one spider per coll
goto subloop;
}
// spider the spider rec in this list from doledb
// returns false if it would block indexing a doc, returns true if it would not,
// and returns true and sets g_errno on error
bool SpiderLoop::gotDoledbList2 ( ) {
// unlock
m_gettingDoledbList = false;
// shortcuts
CollectionRec *cr = m_sc->getCollectionRec();
// update m_msg5StartKey for next read
if ( m_list.getListSize() > 0 ) {
// what is m_list.m_ks ?
m_list.getLastKey((char *)&m_sc->m_msg5StartKey);
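// advance one past the last key we read so the next read does not
// re-fetch the same record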
m_sc->m_msg5StartKey += 1;
}
// log this now
if ( g_conf.m_logDebugSpider ) {
int64_t now = gettimeofdayInMilliseconds();
int64_t took = now - m_doleStart;
if ( took > 2 )
logf(LOG_DEBUG,"spider: GOT list from doledb in "
"%" PRId64"ms "
"size=%" PRId32" bytes",
took,m_list.getListSize());
}
bool bail = false;
// bail instantly if in read-only mode (no RdbTrees!)
if ( g_conf.m_readOnlyMode ) bail = true;
// or if doing a daily merge
if ( g_dailyMerge.m_mergeMode ) bail = true;
// skip if too many udp slots being used
if (g_udpServer.getNumUsedSlotsIncoming() >= MAXUDPSLOTS ) bail =true;
// stop if too many out
if ( m_numSpidersOut >= MAX_SPIDERS ) bail = true;
if ( bail ) {
// return false to indicate to try another
return false;
}
// bail if list is empty
if ( m_list.getListSize() <= 0 ) {
return true;
}
time_t nowGlobal = getTimeGlobal();
// reset ptr to point to first rec in list
m_list.resetListPtr();
listLoop:
// get the current rec from list ptr
char *rec = (char *)m_list.getCurrentRec();
// the doledbkey
key96_t *doledbKey = (key96_t *)rec;
// get record after it next time
m_sc->m_nextDoledbKey = *doledbKey ;
// sanity check -- wrap watch -- how can this really happen?
if ( m_sc->m_nextDoledbKey.n1 == 0xffffffff &&
m_sc->m_nextDoledbKey.n0 == 0xffffffffffffffffLL ) {
g_process.shutdownAbort(true);
}
// if it's negative then inc by two! this fixes the bug where the
// list consisted only of one negative key and was spinning forever
if ( (m_sc->m_nextDoledbKey & 0x01) == 0x00 )
m_sc->m_nextDoledbKey += 2;
// did it hit zero? that means it wrapped around!
if ( m_sc->m_nextDoledbKey.n1 == 0x0 &&
m_sc->m_nextDoledbKey.n0 == 0x0 ) {
// TODO: work this out
g_process.shutdownAbort(true);
}
// get priority from doledb key
int32_t pri = Doledb::getPriority ( doledbKey );
// if the key went out of its priority because its priority had no
// spider requests then it will bleed over into another priority so
// in that case reset it to the top of its priority for next time
int32_t pri3 = Doledb::getPriority ( &m_sc->m_nextDoledbKey );
if ( pri3 != m_sc->m_pri2 ) {
m_sc->m_nextDoledbKey = Doledb::makeFirstKey2 ( m_sc->m_pri2);
}
if ( g_conf.m_logDebugSpider ) {
int32_t pri4 = Doledb::getPriority ( &m_sc->m_nextDoledbKey );
log( LOG_DEBUG, "spider: setting pri2=%" PRId32" queue doledb nextkey to %s (pri=%" PRId32")",
m_sc->m_pri2, KEYSTR(&m_sc->m_nextDoledbKey,12), pri4 );
}
// update next doledbkey for this priority to avoid having to
// process excessive positive/negative key annihilations (mdw)
m_sc->m_nextKeys [ m_sc->m_pri2 ] = m_sc->m_nextDoledbKey;
// sanity
if ( pri < 0 || pri >= MAX_SPIDER_PRIORITIES ) { g_process.shutdownAbort(true); }
// skip the priority if we already have enough spiders on it
int32_t out = m_sc->m_outstandingSpiders[pri];
// how many spiders can we have out?
int32_t max = 0;
// max spiders out per IP. only applies to parent urls, not child
// docs like robots.txt, iframe contents, etc.
int32_t maxSpidersOutPerIp = 1;
for ( int32_t i = 0 ; i < cr->m_numRegExs ; i++ ) {
if ( cr->m_spiderPriorities[i] != pri ) {
continue;
}
if ( cr->m_maxSpidersPerRule[i] > max ) {
max = cr->m_maxSpidersPerRule[i];
}
if ( cr->m_spiderIpMaxSpiders[i] > maxSpidersOutPerIp ) {
maxSpidersOutPerIp = cr->m_spiderIpMaxSpiders[i];
}
}
// skip? and re-get another doledb list from next priority...
if ( out >= max ) {
return true;
}
// no negatives - wtf?
// if only the tree has doledb recs, Msg5.cpp does not remove
// the negative recs... it doesn't bother to merge.
if ( (doledbKey->n0 & 0x01) == 0 ) {
// just increment then i guess
m_list.skipCurrentRecord();
// if exhausted -- try another load with m_nextKey set
if ( m_list.isExhausted() ) return true;
// otherwise, try the next doledb rec in this list
goto listLoop;
}
// what is this? a dataless positive key?
if ( m_list.getCurrentRecSize() <= 16 ) { g_process.shutdownAbort(true); }
int32_t ipOut = 0;
int32_t globalOut = 0;
// get the "spider rec" (SpiderRequest) (embedded in the doledb rec)
SpiderRequest *sreq = (SpiderRequest *)(rec + sizeof(key96_t)+4);
// sanity check. check for http(s)://
// might be a docid from a pagereindex.cpp
if ( sreq->m_url[0] != 'h' && ! is_digit(sreq->m_url[0]) ) {
log(LOG_WARN, "spider: got corrupt doledb record. ignoring. pls fix!!!" );
goto skipDoledbRec;
}
// . how many spiders out for this ip now?
// . TODO: count locks in case twin is spidering... but it did not seem
// to work right for some reason
for ( int32_t i = 0 ; i <= m_maxUsed ; i++ ) {
// get it
XmlDoc *xd = m_docs[i];
if ( ! xd ) continue;
if ( ! xd->m_sreqValid ) continue;
// to prevent one collection from hogging all the urls for
// particular IP and starving other collections, let's make
// this a per collection count.
// then allow msg13.cpp to handle the throttling on its end.
// also do a global count over all collections now
if ( xd->m_sreq.m_firstIp == sreq->m_firstIp ) globalOut++;
// only count for our same collection otherwise another
// collection can starve us out
if ( xd->m_collnum != cr->m_collnum ) continue;
if ( xd->m_sreq.m_firstIp == sreq->m_firstIp ) ipOut++;
}
// don't give up on this priority, just try the next rec in the list.
// we now read 50k instead of 2k from doledb in order to keep
// one ip from corking up the whole priority!!
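// e.g. with the default maxSpidersOutPerIp of 1 from above, a second
// url from the same firstIp is skipped until the first one finishes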
if ( ipOut >= maxSpidersOutPerIp ) {
skipDoledbRec:
// skip
m_list.skipCurrentRecord();
// if not exhausted try the next doledb rec in this list
if ( ! m_list.isExhausted() ) {
goto listLoop;
}
// print a log msg if we corked things up even
// though we read 50k from doledb
if ( m_list.getListSize() > 50000 ) {
log("spider: 50k not big enough");
}
// list is exhausted...
return true;
}
// but if the global count is high, only allow one out per coll so at
// least we don't starve and we don't build up a huge line of queued
// results just sitting there taking up mem and spider slots so the
// crawlbot hourly can't pass.
if ( globalOut >= maxSpidersOutPerIp && ipOut >= 1 ) {
goto skipDoledbRec;
}
char ipbuf[16];
logDebug( g_conf.m_logDebugSpider, "spider: %" PRId32" spiders out for %s for %s", ipOut, iptoa(sreq->m_firstIp,ipbuf), sreq->m_url );
// sometimes we have it locked, but it is still in doledb i guess.
// seems like we might have given the lock to someone else and
// their confirmation has not come through yet, so it's still
// in doledb.
{
ScopedLock sl(m_lockTableMtx);
// get the lock... only avoid if confirmed!
int64_t lockKey = makeLockTableKey(sreq);
int32_t slot = m_lockTable.getSlot(&lockKey);
if (slot >= 0) {
// get the corresponding lock then if there
UrlLock *lock = (UrlLock *)m_lockTable.getValueFromSlot(slot);
// if there and confirmed, why still in doledb?
if (lock) {
// fight log spam
static int32_t s_lastTime = 0;
if (nowGlobal - s_lastTime >= 2) {
// why is it not getting unlocked!?!?!
log("spider: spider request locked but still in doledb. uh48=%" PRId64" firstip=%s %s",
sreq->getUrlHash48(), iptoa(sreq->m_firstIp,ipbuf), sreq->m_url);
s_lastTime = nowGlobal;
}
// just increment then i guess
m_list.skipCurrentRecord();
// let's return false here to avoid an infinite loop
// since we are not advancing nextkey and m_pri is not
// being changed, that is what happens!
if (m_list.isExhausted()) {
// crap. but then we never make it to lower priorities.
// since we are returning false. so let's try the
// next priority in line.
// try returning true now that we skipped to
// the next priority level to avoid the infinite
// loop as described above.
return true;
}
// try the next record in this list
goto listLoop;
}
}
}
// log this now
if ( g_conf.m_logDebugSpider ) {
logf( LOG_DEBUG, "spider: trying to spider url %s", sreq->m_url );
}
// reset reason why crawl is not running, because we basically are now
cr->m_spiderStatus = SP_INPROGRESS; // this is 7
// be sure to save state so we do not re-send emails
cr->setNeedsSave();
// sometimes the spider coll is reset/deleted while we are
// trying to get the lock in spiderUrl() so let's use collnum
collnum_t collnum = m_sc->getCollectionRec()->m_collnum;
// . spider that. we don't care whether it blocks or not
// . crap, it will need to block to get the locks!
// . so at least wait for that!!!
// . but if we end up launching the spider then this should NOT
// return false! only return false if we should hold up the doledb
// scan
// . this returns true right away if it failed to get the lock...
// which means the url is already locked by someone else...
// . it might also return true if we are already spidering the url
bool status = spiderUrl(sreq, doledbKey, collnum);
// just increment then i guess
m_list.skipCurrentRecord();
// if it blocked, wait for it to return to resume the doledb list
// processing because the msg12 is out and we gotta wait for it to
// come back. when lock reply comes back it tries to spider the url
// then it tries to call spiderDoledUrls() to keep the spider queue
// spidering fully.
if ( ! status ) {
return false;
}
// if exhausted -- try another load with m_nextKey set
if ( m_list.isExhausted() ) {
// if no more in list, fix the next doledbkey,
// m_sc->m_nextDoledbKey
log ( LOG_DEBUG, "spider: list exhausted." );
return true;
}
// otherwise, it might have been in the lock cache and quickly
// rejected, or rejected for some other reason, so try the next
// doledb rec in this list
goto listLoop;
}
// . spider the next url that needs it the most
// . returns false if blocked on a spider launch, otherwise true.
// . returns false if your callback will be called
// . returns true and sets g_errno on error
bool SpiderLoop::spiderUrl(SpiderRequest *sreq, key96_t *doledbKey, collnum_t collnum) {
// sanity
if ( ! m_sc ) { g_process.shutdownAbort(true); }
// wait until our clock is synced with host #0 before spidering since
// we store time stamps in the domain and ip wait tables in
// SpiderCache.cpp. We don't want to freeze domain for a long time
// because we think we have to wait until tomorrow before we can
// spider it.
// turned off?
if ( ( (! g_conf.m_spideringEnabled ||
// or if trying to exit
g_process.isShuttingDown()
) && ! sreq->m_isInjecting ) ||
// repairing the collection's rdbs?
g_repairMode ) {
// try to cancel outstanding spiders, ignore injects
for ( int32_t i = 0 ; i <= m_maxUsed ; i++ ) {
// get it
XmlDoc *xd = m_docs[i];
if ( ! xd ) continue;
// let everyone know, TcpServer::cancel() uses this in
// destroySocket()
g_errno = ECANCELLED;
// cancel the socket trans who has "xd" as its state.
// this will cause XmlDoc::gotDocWrapper() to be called
// now, on this call stack with g_errno set to
// ECANCELLED. But if Msg16 was not in the middle of
// HttpServer::getDoc() then this will have no effect.
g_httpServer.cancel ( xd );//, g_msg13RobotsWrapper );
// cancel any Msg13 that xd might have been waiting for
g_udpServer.cancel ( &xd->m_msg13 , msg_type_13 );
}
return true;
}
// do not launch any new spiders if in repair mode
if ( g_repairMode ) {
g_conf.m_spideringEnabled = false;
return true;
}
// do not launch another spider if less than 25MB of memory is
// available. otherwise we can deadlock when spiders use up all the
// mem, and the file merge operation can not get any, and spiders
// need to add to titledb but can not until the merge completes!!
int64_t freeMem = g_mem.getFreeMem();
if (freeMem < 25*1024*1024 ) {
static int32_t s_lastTime = 0;
static int32_t s_missed = 0;
s_missed++;
int32_t now = getTime();
// don't spam the log, but let people know about it
if ( now - s_lastTime > 10 ) {
log("spider: Need 25MB of free mem to launch spider, "
"only have %" PRId64". Failed to launch %" PRId32" times so "
"far.", freeMem , s_missed );
s_lastTime = now;
}
// bail now so we do not actually launch the spider
return true;
}
// . now that we have to use msg12 to see if the thing is locked
// to avoid spidering it.. (see comment in above function)
// we often try to spider something we are already spidering. that
// is why we have an rdbcache, m_lockCache, to make these lock
// lookups quick, now that the locking group is usually different
// than our own!
// . we have to check this now because removeAllLocks() below will
// remove a lock that one of our spiders might have. it is only
// sensitive to our hostid, not "spider id"
// sometimes we exhaust the doledb and m_nextDoledbKey gets reset
// to zero, we do a re-scan and get a doledbkey that is currently
// being spidered or is waiting for its negative doledb key to
// get into our doledb tree
for ( int32_t i = 0 ; i <= m_maxUsed ; i++ ) {
// get it
XmlDoc *xd = m_docs[i];
if ( ! xd ) continue;
// jenkins was coring spidering the same url in different
// collections at the same time
if ( ! xd->m_collnumValid ) continue;
if ( xd->m_collnum != collnum ) continue;
// . problem if it has our doledb key!
// . this happens if we removed the lock above before the
// spider returned!! that's why you need to set
// MAX_LOCK_AGE to like an hour or so
// . i've also seen this happen because we got stuck looking
// up like 80,000 places and it was taking more than an
// hour. it had only reached about 30,000 after an hour.
// so at this point just set the lock timeout to
// 4 hours i guess.
// . i am seeing this again and we are trying over and over
// again to spider the same url and hogging the cpu so
// we need to keep this sanity check in here for times
// like this
if ( xd->m_doledbKey == *doledbKey ) {
// just note it for now
log("spider: spidering same url %s twice. "
"different firstips?",
xd->m_firstUrl.getUrl());
//g_process.shutdownAbort(true);
}
// keep chugging
continue;
}
// reset g_errno
g_errno = 0;
logDebug(g_conf.m_logDebugSpider, "spider: deleting doledb tree key=%s", KEYSTR(doledbKey, sizeof(*doledbKey)));
// now we just take it out of doledb instantly
bool deleted = g_doledb.getRdb()->deleteTreeNode(collnum, (const char *)doledbKey);
// if url filters rebuilt then doledb gets reset and i've seen us hit
// this node == -1 condition here... so maybe ignore it... just log
// what happened? i think we did a quickpoll somewhere between here
// and the call to spiderDoledUrls() and the url filters changed
// so it reset doledb's tree. so in that case we should bail on this
// url.
if (!deleted) {
g_errno = EADMININTERFERENCE;
log("spider: lost url about to spider from url filters "
"and doledb tree reset. %s",mstrerror(g_errno));
return true;
}
// now remove from doleiptable since we removed from doledb
m_sc->removeFromDoledbIpTable(sreq->m_firstIp);
// DO NOT add back to waiting tree if max spiders
// out per ip was 1 OR there was a crawldelay. but better
// yet, take care of that in the winReq code above.
// . now add to waiting tree so we add another spiderdb
// record for this firstip to doledb
// . true = callForScan
// . do not add to waiting tree if we have enough outstanding
// spiders for this ip. we will add to waiting tree when
// we receive a SpiderReply in addSpiderReply()
if (
// this will just return true if we are not the
// responsible host for this firstip
! m_sc->addToWaitingTree(sreq->m_firstIp) &&
// must be an error...
g_errno ) {
const char *msg = "FAILED TO ADD TO WAITING TREE";
log("spider: %s %s",msg,mstrerror(g_errno));
//us->sendErrorReply ( udpSlot , g_errno );
//return;
}
int64_t lockKeyUh48 = makeLockTableKey ( sreq );
logDebug(g_conf.m_logDebugSpider, "spider: adding lock uh48=%" PRId64" lockkey=%" PRId64,
sreq->getUrlHash48(),lockKeyUh48);
// . add it to lock table to avoid respider, removing from doledb
// is not enough because we re-add to doledb right away
// . return true on error here
UrlLock tmp;
tmp.m_firstIp = sreq->m_firstIp;
tmp.m_spiderOutstanding = 0;
tmp.m_collnum = collnum;
if (!addLock(lockKeyUh48, &tmp)) {
return true;
}
// now do it. this returns false if it would block, returns true if it
// would not block. sets g_errno on error. it spiders m_sreq.
return spiderUrl2(sreq, doledbKey, collnum);
}
bool SpiderLoop::spiderUrl2(SpiderRequest *sreq, key96_t *doledbKey, collnum_t collnum) {
logTrace( g_conf.m_logTraceSpider, "BEGIN" );
// . find an available doc slot
// . we can have up to MAX_SPIDERS spiders (300)
int32_t i;
for ( i=0 ; i<MAX_SPIDERS ; i++ ) if (! m_docs[i]) break;
// should never happen since spiderDoledUrls() stops launching
// once m_numSpidersOut hits MAX_SPIDERS; abort if it does
if ( i >= MAX_SPIDERS ) {
log(LOG_DEBUG,"build: Already have %" PRId32" outstanding spiders.",
(int32_t)MAX_SPIDERS);
g_process.shutdownAbort(true);
}
XmlDoc *xd;
// otherwise, make a new one if we have to
try { xd = new (XmlDoc); }
// bail on failure, sleep and try again
catch(std::bad_alloc&) {
g_errno = ENOMEM;
log("build: Could not allocate %" PRId32" bytes to spider "
"the url %s. Will retry later.",
(int32_t)sizeof(XmlDoc), sreq->m_url );
logTrace( g_conf.m_logTraceSpider, "END, new XmlDoc failed" );
return true;
}
// register its mem usage with Mem.cpp class
mnew ( xd , sizeof(XmlDoc) , "XmlDoc" );
// add to the array
m_docs [ i ] = xd;
CollectionRec *cr = g_collectiondb.getRec(collnum);
const char *coll = "collnumwasinvalid";
if ( cr ) coll = cr->m_coll;
if ( g_conf.m_logDebugSpider ) {
char ipbuf[16];
logf(LOG_DEBUG,"spider: spidering firstip9=%s(%" PRIu32") "
"uh48=%" PRIu64" prntdocid=%" PRIu64" k.n1=%" PRIu64" k.n0=%" PRIu64,
iptoa(sreq->m_firstIp,ipbuf),
(uint32_t)sreq->m_firstIp,
sreq->getUrlHash48(),
sreq->getParentDocId() ,
sreq->m_key.n1,
sreq->m_key.n0);
}
// this returns false and sets g_errno on error
if (!xd->set4(sreq, doledbKey, coll, NULL, MAX_NICENESS)) {
// i guess m_coll is no longer valid?
mdelete ( m_docs[i] , sizeof(XmlDoc) , "Doc" );
delete (m_docs[i]);
m_docs[i] = NULL;
// error, g_errno should be set!
logTrace( g_conf.m_logTraceSpider, "END, xd->set4 returned false" );
return true;
}
// call this after doc gets indexed
xd->setCallback ( xd , indexedDocWrapper );
// increase m_maxUsed if we have to
if ( i > m_maxUsed ) m_maxUsed = i;
// count it
m_numSpidersOut++;
// count this
m_sc->m_spidersOut++;
m_launches++;
// sanity check
if (sreq->m_priority <= -1 ) {
log("spider: fixing bogus spider req priority of %i for "
"url %s",
(int)sreq->m_priority,sreq->m_url);
sreq->m_priority = 0;
//g_process.shutdownAbort(true);
}
// count this launch against its priority; indexedDoc() decrements it
m_sc->m_outstandingSpiders[(unsigned char)sreq->m_priority]++;
if ( g_conf.m_logDebugSpider )
log(LOG_DEBUG,"spider: sc_out=%" PRId32" waiting=%" PRId32" url=%s",
m_sc->m_spidersOut,
m_sc->m_waitingTree.getNumUsedNodes(),
sreq->m_url);
// . return if this blocked
// . no, launch another spider!
logTrace( g_conf.m_logTraceSpider, "calling xd->indexDoc" );
bool status = xd->indexDoc();
logTrace( g_conf.m_logTraceSpider, "indexDoc status [%s]" , status?"true":"false");
// if we were injecting and it blocked... return false
if ( ! status ) {
logTrace( g_conf.m_logTraceSpider, "END, indexDoc blocked" );
return false;
}
// deal with this error
indexedDoc ( xd );
// "callback" will not be called cuz it should be NULL
logTrace( g_conf.m_logTraceSpider, "END, return true" );
return true;
}
void SpiderLoop::indexedDocWrapper ( void *state ) {
// . process the results
// . return if this blocks
if ( ! g_spiderLoop.indexedDoc ( (XmlDoc *)state ) ) return;
}
// . this will delete m_docs[i]
// . returns false if blocked, true otherwise
// . sets g_errno on error
bool SpiderLoop::indexedDoc ( XmlDoc *xd ) {
logTrace( g_conf.m_logTraceSpider, "BEGIN" );
// get our doc #, i
int32_t i = 0;
for ( ; i < MAX_SPIDERS ; i++ ) if ( m_docs[i] == xd) break;
// sanity check
if ( i >= MAX_SPIDERS ) { g_process.shutdownAbort(true); }
// . decrease m_maxUsed if we need to
// . we can decrease all the way to -1, which means no spiders going on
if ( m_maxUsed == i ) {
m_maxUsed--;
while ( m_maxUsed >= 0 && ! m_docs[m_maxUsed] ) m_maxUsed--;
}
// count it
m_numSpidersOut--;
// get coll
collnum_t collnum = xd->m_collnum;
// if coll was deleted while spidering, sc will be NULL
SpiderColl *sc = g_spiderCache.getSpiderColl(collnum);
// decrement this
if ( sc ) sc->m_spidersOut--;
// get the original request from xmldoc
SpiderRequest *sreq = &xd->m_sreq;
// decrement the per-priority count that spiderUrl2() incremented
if ( sc ) sc->m_outstandingSpiders[(unsigned char)sreq->m_priority]--;
// note it
// this should not happen any more since indexDoc() will take
// care of g_errno now by clearing it and adding an error spider
// reply to release the lock!!
if ( g_errno ) {
log("spider: spidering %s has error: %s. uh48=%" PRId64". "
"cn=%" PRId32,
xd->m_firstUrl.getUrl(),
mstrerror(g_errno),
xd->getFirstUrlHash48(),
(int32_t)collnum);
// don't release the lock on it right now. just let the
// lock expire on it after MAX_LOCK_AGE seconds. then it will
// be retried. we need to debug gb so these things never
// happen...
}
// we don't need this g_errno past this point
g_errno = 0;
// we are responsible for deleting doc now
mdelete ( m_docs[i] , sizeof(XmlDoc) , "Doc" );
delete (m_docs[i]);
m_docs[i] = NULL;
// we did not block, so return true
logTrace( g_conf.m_logTraceSpider, "END" );
return true;
}
// use -1 for any collnum
int32_t SpiderLoop::getNumSpidersOutPerIp(int32_t firstIp, collnum_t collnum) {
ScopedLock sl(m_lockTableMtx);
int32_t count = 0;
// scan the slots
for (int32_t i = 0; i < m_lockTable.getNumSlots(); i++) {
// skip if empty
if (!m_lockTable.m_flags[i]) {
continue;
}
// cast lock
UrlLock *lock = (UrlLock *)m_lockTable.getValueFromSlot(i);
// skip if not outstanding. such locks are just in a 5-second
// expiration wait after the spiderReply returns, so that a lock
// request for the same url that was in progress will be denied.
if (!lock->m_spiderOutstanding) {
continue;
}
// correct collnum?
if (lock->m_collnum != collnum && collnum != -1) {
continue;
}
// count it if it matches the firstIp we were asked about
if (lock->m_firstIp == firstIp) {
count++;
}
}
return count;
}
CollectionRec *SpiderLoop::getActiveList() {
uint32_t nowGlobal = (uint32_t)getTimeGlobal();
if ( nowGlobal >= m_recalcTime && m_recalcTimeValid )
m_activeListValid = false;
// we set m_activeListValid to false when enabling/disabling spiders,
// when rebuilding url filters in Collectiondb.cpp rebuildUrlFilters()
// and when updating the site list in updateSiteList(). all of these
// could possibly make an inactive collection active again, or vice
// versa. also when deleting a collection in Collectiondb.cpp. this
// keeps the below loop fast when we have thousands of collections
// and most are inactive or empty/deleted.
if (!m_activeListValid) {
buildActiveList();
//m_crx = m_activeList;
// recompute every 3 seconds, it seems kinda buggy!!
m_recalcTime = nowGlobal + 3;
m_recalcTimeValid = true;
}
return m_activeList;
}
void SpiderLoop::buildActiveList ( ) {
logTrace( g_conf.m_logTraceSpider, "BEGIN" );
// when do we need to rebuild the active list again?
m_recalcTimeValid = false;
m_activeListValid = true;
m_activeListCount = 0;
// reset the linked list of active collections
m_activeList = NULL;
bool found = false;
CollectionRec *tail = NULL;
for ( int32_t i = 0 ; i < g_collectiondb.getNumRecs(); i++ ) {
// get rec
CollectionRec *cr = g_collectiondb.getRec(i);
// skip if gone
if ( ! cr ) continue;
// stop if not enabled
bool active = true;
if ( ! cr->m_spideringEnabled ) active = false;
// we are at the tail of the linked list OR not in the list
cr->m_nextActive = NULL;
cr->m_isActive = false;
if ( ! active ) continue;
cr->m_isActive = true;
m_activeListCount++;
if ( cr == m_crx ) found = true;
// if first one, set it to head
if ( ! tail ) {
m_activeList = cr;
tail = cr;
continue;
}
// if not first one, add it to end of tail
tail->m_nextActive = cr;
tail = cr;
}
// we use m_bookmark so we do not get into an infinite loop
// in spider urls logic above
if ( ! found ) {
m_bookmark = NULL;
m_crx = NULL;
}
logTrace( g_conf.m_logTraceSpider, "END" );
}
bool SpiderLoop::isLocked(int64_t key) const {
ScopedLock sl(m_lockTableMtx);
return m_lockTable.isInTable(&key);
}
int32_t SpiderLoop::getLockCount() const {
ScopedLock sl(m_lockTableMtx);
return m_lockTable.getNumUsedSlots();
}
bool SpiderLoop::addLock(int64_t key, const UrlLock *lock) {
ScopedLock sl(m_lockTableMtx);
return m_lockTable.addKey(&key, lock);
}
void SpiderLoop::removeLock(int64_t key) {
ScopedLock sl(m_lockTableMtx);
m_lockTable.removeKey(&key);
}
void SpiderLoop::clearLocks(collnum_t collnum) {
ScopedLock sl(m_lockTableMtx);
// remove locks from locktable for all spiders out
for (;;) {
bool restart = false;
// scan the slots
for (int32_t i = 0; i < m_lockTable.getNumSlots(); i++) {
// skip if empty
if (!m_lockTable.m_flags[i]) {
continue;
}
UrlLock *lock = (UrlLock *)m_lockTable.getValueFromSlot(i);
// skip if not our collnum
if (lock->m_collnum != collnum) {
continue;
}
// nuke it!
m_lockTable.removeSlot(i);
// restart since cells may have shifted
restart = true;
}
if (!restart) {
break;
}
}
}