more spider fixes. right after getting lock,
use msg12 to remove rec from doledb/doleiptable and add 0 entry to waiting table so doledb is again immediately repopulated with that firstIp so we can spider multiple urls from the same ip at the same time.
This commit is contained in:
1
Conf.h
1
Conf.h
@ -619,6 +619,7 @@ class Conf {
|
||||
bool m_logDebugQuota ;
|
||||
bool m_logDebugRobots ;
|
||||
bool m_logDebugSpcache ; // SpiderCache.cpp debug
|
||||
bool m_logDebugSpiderFlow;
|
||||
bool m_logDebugSpeller ;
|
||||
bool m_logDebugTagdb ;
|
||||
bool m_logDebugSections;
|
||||
|
@ -1437,7 +1437,7 @@ unsigned long Hostdb::makeGroupMask ( long numGroups ) {
|
||||
// return first alive host in a group/shard
|
||||
Host *Hostdb::getLiveHostInGroup ( long groupId ) {
|
||||
Host *group = getGroup ( groupId );
|
||||
Host *live = NULL;
|
||||
//Host *live = NULL;
|
||||
for ( long i = 0 ; i < m_numHostsPerGroup ; i++ ) {
|
||||
// get it
|
||||
Host *h = &group[i];
|
||||
|
29
Msg4.cpp
29
Msg4.cpp
@ -1592,28 +1592,14 @@ bool loadAddsInProgress ( char *prefix ) {
|
||||
}
|
||||
|
||||
|
||||
//
|
||||
// right now the FAKEDB record is a signal to remove the spider lock
|
||||
// from the lock table because we are done spidering it.
|
||||
//
|
||||
void processSpecialSignal ( collnum_t collnum , char *p ) {
|
||||
|
||||
key_t *fake = (key_t *)p;
|
||||
|
||||
// this fake key means to add to waiting tree if we
|
||||
// are the assigned spider host!!
|
||||
if ( fake->n1 ) {
|
||||
//char *xx=NULL;*xx=0;
|
||||
// get firstip
|
||||
long firstIp = fake->n1;
|
||||
// when it should be spidered, might be 0
|
||||
long long timestamp = fake->n0;
|
||||
// add it
|
||||
SpiderColl *sc = g_spiderCache.getSpiderColl(collnum);
|
||||
sc->addToWaitingTree ( timestamp , firstIp , true );
|
||||
// that is it, not an actual rdb add
|
||||
return;
|
||||
}
|
||||
|
||||
// now this fake titledb key is used to remove spider lock
|
||||
if ( fake->n1 ) { char *xx=NULL;*xx=0; }
|
||||
|
||||
// use a uh48 of 0 to signify an unlock operation
|
||||
//g_titledb.getUrlHash48 ( (key_t *)key ) == 0LL ) {
|
||||
// must be 96 bits
|
||||
@ -1630,13 +1616,18 @@ void processSpecialSignal ( collnum_t collnum , char *p ) {
|
||||
// log debug msg
|
||||
if ( g_conf.m_logDebugSpider)
|
||||
// log debug
|
||||
logf(LOG_DEBUG,"spider: rdb: got fake titledb "
|
||||
logf(LOG_DEBUG,"msg4: got FAKE titledb "
|
||||
"key for lockkey=%llu - removing spider lock",
|
||||
lockKey);
|
||||
// shortcut
|
||||
HashTableX *ht = &g_spiderLoop.m_lockTable;
|
||||
UrlLock *lock = (UrlLock *)ht->getValue ( &lockKey );
|
||||
time_t nowGlobal = getTimeGlobal();
|
||||
|
||||
if ( g_conf.m_logDebugSpiderFlow )
|
||||
logf(LOG_DEBUG,"spflow: removed lock for "
|
||||
"docid=lockkey=%llu", lockKey);
|
||||
|
||||
// test it
|
||||
//if ( m_nowGlobal == 0 && lock )
|
||||
// m_nowGlobal = getTimeGlobal();
|
||||
|
@ -7625,6 +7625,14 @@ void Parms::init ( ) {
|
||||
m->m_priv = 1;
|
||||
m++;
|
||||
|
||||
m->m_title = "log debug spider flow messages";
|
||||
m->m_cgi = "ldspf";
|
||||
m->m_off = (char *)&g_conf.m_logDebugSpiderFlow - g;
|
||||
m->m_type = TYPE_BOOL;
|
||||
m->m_def = "0";
|
||||
m->m_priv = 1;
|
||||
m++;
|
||||
|
||||
m->m_title = "log debug speller messages";
|
||||
m->m_cgi = "ldsp";
|
||||
m->m_off = (char *)&g_conf.m_logDebugSpeller - g;
|
||||
|
16
Rdb.cpp
16
Rdb.cpp
@ -1950,9 +1950,17 @@ bool Rdb::addRecord ( collnum_t collnum,
|
||||
// remove from g_spiderLoop.m_lockTable too!
|
||||
if ( KEYNEG(key) ) {
|
||||
// log debug
|
||||
logf(LOG_DEBUG,"spider: rdb: "
|
||||
"got negative doledb "
|
||||
"key for uh48=%llu",
|
||||
logf(LOG_DEBUG,"spflow: removed doledb key "
|
||||
"for uh48=%llu",
|
||||
g_doledb.getUrlHash48(&doleKey));
|
||||
}
|
||||
else {
|
||||
// what collection?
|
||||
//SpiderColl *sc = g_spiderCache.getSpiderColl(collnum)
|
||||
// do not overflow!
|
||||
// log debug
|
||||
logf(LOG_DEBUG,"spflow: added doledb key "
|
||||
"for uh48=%llu",
|
||||
g_doledb.getUrlHash48(&doleKey));
|
||||
}
|
||||
}
|
||||
@ -2209,6 +2217,7 @@ bool Rdb::addRecord ( collnum_t collnum,
|
||||
// add the request
|
||||
if ( isReq ) {
|
||||
// log that. why isn't this undoling always
|
||||
/*
|
||||
if ( g_conf.m_logDebugSpider )
|
||||
logf(LOG_DEBUG,"spider: rdb: got spider "
|
||||
"request for uh48=%llu prntdocid=%llu "
|
||||
@ -2216,6 +2225,7 @@ bool Rdb::addRecord ( collnum_t collnum,
|
||||
sreq->getUrlHash48(),
|
||||
sreq->getParentDocId(),
|
||||
iptoa(sreq->m_firstIp));
|
||||
*/
|
||||
// false means to NOT call evaluateAllRequests()
|
||||
// because we call it below. the reason we do this
|
||||
// is because it does not always get called
|
||||
|
323
Spider.cpp
323
Spider.cpp
@ -1268,6 +1268,9 @@ bool SpiderColl::makeWaitingTree ( ) {
|
||||
log("spider: makeWaitTree: %s",mstrerror(g_errno));
|
||||
return false;
|
||||
}
|
||||
// note it
|
||||
log(LOG_DEBUG,"spflow: added time=1 ip=%s to waiting tree",
|
||||
iptoa(firstIp));
|
||||
// a tmp var
|
||||
long long fakeone = 1LL;
|
||||
// add to table now since its in the tree
|
||||
@ -1499,26 +1502,48 @@ bool SpiderColl::addSpiderReply ( SpiderReply *srep ) {
|
||||
// if ( last != srep->m_downloadEndTime ) { char *xx=NULL;*xx=0;}
|
||||
//}
|
||||
|
||||
|
||||
// skip:
|
||||
|
||||
// . add to wait tree and let it populate doledb on its batch run
|
||||
// . use a spiderTime of 0 which means unknown and that it needs to
|
||||
// scan spiderdb to get that
|
||||
// . returns false and sets g_errno on error
|
||||
return addToWaitingTree ( 0LL, srep->m_firstIp , true );
|
||||
}
|
||||
|
||||
|
||||
void SpiderColl::removeFromDoledbTable ( long firstIp ) {
|
||||
|
||||
// . decrement doledb table ip count for firstIp
|
||||
// . update how many per ip we got doled
|
||||
long *score = (long *)m_doleIpTable.getValue32 ( srep->m_firstIp );
|
||||
long *score = (long *)m_doleIpTable.getValue32 ( firstIp );
|
||||
|
||||
// wtf! how did this spider without being doled?
|
||||
if ( ! score ) {
|
||||
if ( ! srep->m_fromInjectionRequest )
|
||||
log("spider: corruption. received spider reply whose "
|
||||
"ip has no entry in dole ip table. firstip=%s",
|
||||
iptoa(srep->m_firstIp));
|
||||
goto skip;
|
||||
//if ( ! srep->m_fromInjectionRequest )
|
||||
log("spider: corruption. received spider reply whose "
|
||||
"ip has no entry in dole ip table. firstip=%s",
|
||||
iptoa(firstIp));
|
||||
return;
|
||||
}
|
||||
|
||||
// reduce it
|
||||
*score = *score - 1;
|
||||
|
||||
// now we log it too
|
||||
log(LOG_DEBUG,"spflow: removed ip=%s to doleiptable (count=%li)",
|
||||
iptoa(firstIp),*score);
|
||||
|
||||
|
||||
// remove if zero
|
||||
if ( *score == 0 ) {
|
||||
// note it
|
||||
log(LOG_DEBUG,"spflow: removing ip=%s from doledbtable",
|
||||
iptoa(firstIp));
|
||||
// this can file if writes are disabled on this hashtablex
|
||||
// because it is saving
|
||||
m_doleIpTable.removeKey ( &srep->m_firstIp );
|
||||
m_doleIpTable.removeKey ( &firstIp );
|
||||
// sanity check
|
||||
//if ( ! m_doleIpTable.m_isWritable ) { char *xx=NULL;*xx=0; }
|
||||
}
|
||||
@ -1527,20 +1552,12 @@ bool SpiderColl::addSpiderReply ( SpiderReply *srep ) {
|
||||
// all done?
|
||||
if ( g_conf.m_logDebugSpider ) {
|
||||
// log that too!
|
||||
logf(LOG_DEBUG,"spider: undoling srep uh48=%llu "
|
||||
"parentdocid=%llu ipdolecount=%li",
|
||||
srep->getUrlHash48(),srep->getParentDocId(),*score);
|
||||
logf(LOG_DEBUG,"spider: discounting firstip=%s to %li",
|
||||
iptoa(firstIp),*score);
|
||||
}
|
||||
|
||||
skip:
|
||||
|
||||
// . add to wait tree and let it populate doledb on its batch run
|
||||
// . use a spiderTime of 0 which means unknown and that it needs to
|
||||
// scan spiderdb to get that
|
||||
// . returns false and sets g_errno on error
|
||||
return addToWaitingTree ( 0LL, srep->m_firstIp , true );
|
||||
}
|
||||
|
||||
|
||||
// . Rdb.cpp calls SpiderColl::addSpiderRequest/Reply() for every positive
|
||||
// spiderdb record it adds to spiderdb. that way our cache is kept
|
||||
// uptodate incrementally
|
||||
@ -1590,12 +1607,12 @@ bool SpiderColl::addSpiderRequest ( SpiderRequest *sreq ,
|
||||
// data value part of the doleiptable...? therefore we should
|
||||
// probably move this check down below after we get the priority
|
||||
// of the spider request.
|
||||
char *val = (char *)m_doleIpTable.getValue ( &sreq->m_firstIp );
|
||||
if ( val && *val > 0 ) {
|
||||
if ( g_conf.m_logDebugSpider )
|
||||
log("spider: request already in dole table");
|
||||
return true;
|
||||
}
|
||||
//char *val = (char *)m_doleIpTable.getValue ( &sreq->m_firstIp );
|
||||
//if ( val && *val > 0 ) {
|
||||
// if ( g_conf.m_logDebugSpider )
|
||||
// log("spider: request IP already in dole table");
|
||||
// return true;
|
||||
//}
|
||||
|
||||
// . skip if already in wait tree
|
||||
// . no, no. what if the current url for this firstip is not due to
|
||||
@ -1675,7 +1692,7 @@ bool SpiderColl::addSpiderRequest ( SpiderRequest *sreq ,
|
||||
|
||||
// log it
|
||||
logf(LOG_DEBUG,
|
||||
"spider: added %s request to wait tree "
|
||||
"spider: spiderdb added %s request to wait tree "
|
||||
"uh48=%llu "
|
||||
"firstIp=%s "
|
||||
"parentFirstIp=%lu "
|
||||
@ -1789,6 +1806,11 @@ bool SpiderColl::addToWaitingTree ( uint64_t spiderTimeMS , long firstIp ,
|
||||
//char *xx=NULL; *xx=0;
|
||||
return false;
|
||||
}
|
||||
|
||||
// note it
|
||||
log(LOG_DEBUG,"spflow: added time=%lli ip=%s to waiting tree",
|
||||
spiderTimeMS , iptoa(firstIp));
|
||||
|
||||
// add to table now since its in the tree
|
||||
if ( ! m_waitingTable.addKey ( &firstIp , &spiderTimeMS ) ) {
|
||||
// remove from tree then
|
||||
@ -1852,7 +1874,7 @@ long SpiderColl::getNextIpFromWaitingTree ( ) {
|
||||
// at least one ping from him so we do not remove at startup.
|
||||
// if it is in doledb or in the middle of being added to doledb
|
||||
// via msg4, nuke it as well!
|
||||
if ( ! isAssignedToUs (firstIp) || m_doleIpTable.isInTable(&firstIp)) {
|
||||
if ( ! isAssignedToUs (firstIp) && m_doleIpTable.isInTable(&firstIp)) {
|
||||
// only delete if this host is alive and has sent us a ping
|
||||
// before so we know he was up at one time. this way we do not
|
||||
// remove all his keys just because we restarted and think he
|
||||
@ -1862,6 +1884,12 @@ long SpiderColl::getNextIpFromWaitingTree ( ) {
|
||||
// and becase the trees/tables for spidercache are saving
|
||||
// in Process.cpp's g_spiderCache::save() call
|
||||
m_waitingTree.deleteNode ( node , true );
|
||||
|
||||
// note it
|
||||
log(LOG_DEBUG,"spflow: removed1 ip=%s from waiting tree. "
|
||||
"nn=%li",
|
||||
iptoa(firstIp),m_waitingTree.m_numUsedNodes);
|
||||
|
||||
// log it
|
||||
if ( g_conf.m_logDebugSpcache )
|
||||
log("spider: erasing waitingtree key firstip=%s",
|
||||
@ -2618,6 +2646,16 @@ bool SpiderColl::scanSpiderdb ( bool needList ) {
|
||||
if ( tm1 == tm2 && priority == winPriority &&
|
||||
spiderTimeMS > winTimeMS )
|
||||
continue;
|
||||
|
||||
// bail if it is locked! we now call
|
||||
// msg12::confirmLockAcquisition() after we get the lock,
|
||||
// which deletes the doledb record from doledb and doleiptable
|
||||
// rightaway and adds a "0" entry into the waiting tree so
|
||||
// that scanSpiderdb() repopulates doledb again with that
|
||||
// "firstIp". this way we can spider multiple urls from the
|
||||
// same ip at the same time.
|
||||
if ( g_spiderLoop.m_lockTable.isInTable(&sreq->m_probDocId) )
|
||||
continue;
|
||||
|
||||
// ok, we got a new winner
|
||||
winPriority = priority;
|
||||
@ -2721,6 +2759,16 @@ bool SpiderColl::scanSpiderdb ( bool needList ) {
|
||||
log("spider: nuking misleading waitingtree key "
|
||||
"firstIp=%s", iptoa(firstIp));
|
||||
m_waitingTree.deleteNode ( 0,(char *)&m_waitingTreeKey,true);
|
||||
|
||||
// note it
|
||||
unsigned long long timestamp64 = m_waitingTreeKey.n1;
|
||||
timestamp64 <<= 32;
|
||||
timestamp64 |= m_waitingTreeKey.n0 >> 32;
|
||||
long firstIp = m_waitingTreeKey.n0 &= 0xffffffff;
|
||||
log(LOG_DEBUG,"spflow: removed2 time=%lli ip=%s from "
|
||||
"waiting tree. nn=%li.",
|
||||
timestamp64, iptoa(firstIp),m_waitingTree.m_numUsedNodes);
|
||||
|
||||
m_waitingTable.removeKey ( &firstIp );
|
||||
// sanity check
|
||||
if ( ! m_waitingTable.m_isWritable ) { char *xx=NULL;*xx=0;}
|
||||
@ -2774,6 +2822,11 @@ bool SpiderColl::scanSpiderdb ( bool needList ) {
|
||||
m_bestRequest->m_url);
|
||||
// this should never fail since we deleted one above
|
||||
m_waitingTree.addKey ( &wk2 );
|
||||
|
||||
// note it
|
||||
log(LOG_DEBUG,"spflow: added time=%lli ip=%s to waiting tree",
|
||||
m_bestSpiderTimeMS , iptoa(fip));
|
||||
|
||||
// keep the table in sync now with the time
|
||||
m_waitingTable.addKey( &fip, &m_bestSpiderTimeMS );
|
||||
// sanity check
|
||||
@ -2971,6 +3024,9 @@ bool SpiderColl::addToDoleTable ( SpiderRequest *sreq ) {
|
||||
*score = *score + 1;
|
||||
// sanity check
|
||||
if ( *score <= 0 ) { char *xx=NULL;*xx=0; }
|
||||
// now we log it too
|
||||
log(LOG_DEBUG,"spflow: added ip=%s to doleiptable (score=%li)",
|
||||
iptoa(sreq->m_firstIp),*score);
|
||||
}
|
||||
else {
|
||||
// ok, add new slot
|
||||
@ -2982,6 +3038,9 @@ bool SpiderColl::addToDoleTable ( SpiderRequest *sreq ) {
|
||||
// return true with g_errno set on error
|
||||
return false;
|
||||
}
|
||||
// now we log it too
|
||||
log(LOG_DEBUG,"spflow: added ip=%s to doleiptable (score=1)",
|
||||
iptoa(sreq->m_firstIp));
|
||||
// sanity check
|
||||
//if ( ! m_doleIpTable.m_isWritable ) { char *xx=NULL;*xx=0;}
|
||||
}
|
||||
@ -3272,8 +3331,8 @@ void SpiderLoop::spiderDoledUrls ( ) {
|
||||
if ( g_conf.m_testSpiderEnabled ) maxSpiders = 6;
|
||||
}
|
||||
// debug log
|
||||
if ( g_conf.m_logDebugSpider )
|
||||
log("spider: has %li spiders out",m_sc->m_spidersOut);
|
||||
//if ( g_conf.m_logDebugSpider )
|
||||
// log("spider: has %li spiders out",m_sc->m_spidersOut);
|
||||
// obey max spiders per collection too
|
||||
if ( m_sc->m_spidersOut >= maxSpiders ) continue;
|
||||
// ok, we are good to launch a spider for coll m_cri
|
||||
@ -3378,11 +3437,11 @@ void SpiderLoop::spiderDoledUrls ( ) {
|
||||
if ( g_conf.m_logDebugSpider ) {
|
||||
m_doleStart = gettimeofdayInMillisecondsLocal();
|
||||
// 12 byte doledb keys
|
||||
long pri = g_doledb.getPriority(&m_sc->m_nextDoledbKey);
|
||||
logf(LOG_DEBUG,"spider: loading list from doledb startkey=%s "
|
||||
"pri=%li",
|
||||
KEYSTR(&m_sc->m_nextDoledbKey,12),
|
||||
pri);
|
||||
//long pri = g_doledb.getPriority(&m_sc->m_nextDoledbKey);
|
||||
//logf(LOG_DEBUG,"spider: loading list from doledb startkey=%s"
|
||||
// " pri=%li",
|
||||
// KEYSTR(&m_sc->m_nextDoledbKey,12),
|
||||
// pri);
|
||||
}
|
||||
|
||||
// get a spider rec for us to spider from doledb
|
||||
@ -3411,8 +3470,8 @@ void SpiderLoop::spiderDoledUrls ( ) {
|
||||
// return if it blocked
|
||||
return ;
|
||||
// debug
|
||||
log(LOG_DEBUG,"spider: read list of %li bytes from spiderdb for "
|
||||
"pri=%li+",m_list.m_listSize,(long)m_sc->m_pri);
|
||||
//log(LOG_DEBUG,"spider: read list of %li bytes from spiderdb for "
|
||||
// "pri=%li+",m_list.m_listSize,(long)m_sc->m_pri);
|
||||
// breathe
|
||||
QUICKPOLL ( MAX_NICENESS );
|
||||
// . add urls in list to cache
|
||||
@ -3459,9 +3518,11 @@ bool SpiderLoop::gotDoledbList2 ( ) {
|
||||
if ( g_conf.m_logDebugSpider ) {
|
||||
long long now = gettimeofdayInMillisecondsLocal();
|
||||
long long took = now - m_doleStart;
|
||||
logf(LOG_DEBUG,"spider: GOT list from doledb in %llims "
|
||||
"size=%li bytes",
|
||||
took,m_list.getListSize());
|
||||
if ( took > 1 )
|
||||
logf(LOG_DEBUG,"spider: GOT list from doledb in "
|
||||
"%llims "
|
||||
"size=%li bytes",
|
||||
took,m_list.getListSize());
|
||||
}
|
||||
|
||||
// bail instantly if in read-only mode (no RdbTrees!)
|
||||
@ -3960,8 +4021,8 @@ bool SpiderLoop::spiderUrl9 ( SpiderRequest *sreq ,
|
||||
collnum,
|
||||
sameIpWaitTime,
|
||||
m_sreq->m_firstIp,
|
||||
NULL ,
|
||||
NULL ) )
|
||||
NULL , // state
|
||||
NULL ) ) // callback
|
||||
return false;
|
||||
// no go
|
||||
m_msg12.m_gettingLocks = false;
|
||||
@ -4092,6 +4153,10 @@ bool SpiderLoop::spiderUrl2 ( ) {
|
||||
// update this
|
||||
m_sc->m_outstandingSpiders[(unsigned char)m_sreq->m_priority]++;
|
||||
|
||||
if ( g_conf.m_logDebugSpiderFlow )
|
||||
log(LOG_DEBUG,"spflow: sc_out=%li url=%s",
|
||||
m_sc->m_spidersOut,m_sreq->m_url);
|
||||
|
||||
// . return if this blocked
|
||||
// . no, launch another spider!
|
||||
bool status = xd->indexDoc();
|
||||
@ -4374,6 +4439,7 @@ bool Msg12::getLocks ( long long probDocId ,
|
||||
m_numReplies = 0;
|
||||
m_grants = 0;
|
||||
m_removing = false;
|
||||
m_confirming = false;
|
||||
// make sure is really docid
|
||||
if ( probDocId & ~DOCID_MASK ) { char *xx=NULL;*xx=0; }
|
||||
// . mask out the lower bits that may change if there is a collision
|
||||
@ -4392,7 +4458,6 @@ bool Msg12::getLocks ( long long probDocId ,
|
||||
m_callback = callback;
|
||||
m_state = state;
|
||||
m_hasLock = false;
|
||||
|
||||
// support ability to spider multiple urls from same ip
|
||||
m_doledbKey = *doledbKey;
|
||||
m_collnum = collnum;
|
||||
@ -4427,9 +4492,9 @@ bool Msg12::getLocks ( long long probDocId ,
|
||||
"lockkey=%llu", m_url,m_lockKey);
|
||||
|
||||
// now the locking group is based on the probable docid
|
||||
unsigned long groupId = g_hostdb.getGroupIdFromDocId(m_lockKey);
|
||||
m_lockGroupId = g_hostdb.getGroupIdFromDocId(m_lockKey);
|
||||
// ptr to list of hosts in the group
|
||||
Host *hosts = g_hostdb.getGroup ( groupId );
|
||||
Host *hosts = g_hostdb.getGroup ( m_lockGroupId );
|
||||
|
||||
// short cut
|
||||
UdpServer *us = &g_udpServer;
|
||||
@ -4530,20 +4595,29 @@ bool Msg12::gotLockReply ( UdpSlot *slot ) {
|
||||
m_gettingLocks = false;
|
||||
return true;
|
||||
}
|
||||
// all done if we were confirming
|
||||
if ( m_confirming ) {
|
||||
// note it
|
||||
if ( g_conf.m_logDebugSpider )
|
||||
logf(LOG_DEBUG,"spider: done confirming all locks "
|
||||
"for %s",m_url);//m_sreq->m_url);
|
||||
// we are done
|
||||
m_gettingLocks = false;
|
||||
return true;
|
||||
}
|
||||
|
||||
// if got ALL locks, spider it
|
||||
if ( m_grants == m_numReplies ) {
|
||||
// note it
|
||||
if ( g_conf.m_logDebugSpider )
|
||||
logf(LOG_DEBUG,"spider: got lock for %s lockkey=%llu",
|
||||
m_url,m_lockKey);//m_sreq->getUrlHash48());
|
||||
if ( g_conf.m_logDebugSpiderFlow )
|
||||
logf(LOG_DEBUG,"spflow: got lock for docid=lockkey=%llu",
|
||||
m_lockKey);
|
||||
// flag this
|
||||
m_hasLock = true;
|
||||
// we are done
|
||||
m_gettingLocks = false;
|
||||
//m_gettingLocks = false;
|
||||
|
||||
|
||||
char *coll = g_collectiondb.getCollName ( m_collnum );
|
||||
|
||||
///////
|
||||
//
|
||||
// now tell our group (shard) to remove from doledb
|
||||
@ -4551,44 +4625,19 @@ bool Msg12::gotLockReply ( UdpSlot *slot ) {
|
||||
// should skip this probable docid because it is in the
|
||||
// LOCK TABLE!
|
||||
//
|
||||
// This logic should allow us to spider multiple urls
|
||||
// from the same IP at the same time.
|
||||
//
|
||||
///////
|
||||
char listBuf[128];
|
||||
char *p = listBuf;
|
||||
// doledb
|
||||
*p++ = RDB_DOLEDB;
|
||||
// make it negative
|
||||
m_doledbKey.n0 &= 0xfffffffffffffffeLL;
|
||||
// add the negative doledb key to delete it
|
||||
memcpy ( p , &m_doledbKey , sizeof(DOLEDBKEY) );
|
||||
p += sizeof(DOLEDBKEY);
|
||||
// now add the fake titledb key
|
||||
*p++ = RDB_FAKEDB;
|
||||
// this can't be zero!! because Rdb.cpp checks it for zero
|
||||
// as meaning to remove the spider lock
|
||||
if ( ! m_firstIp ) { char *xx=NULL;*xx=0; }
|
||||
long long timestamp64 = gettimeofdayInMillisecondsGlobal();
|
||||
// when to spider this
|
||||
timestamp64 += m_sameIpWaitTime;
|
||||
// embed the firstip and spiderIpWaitTime into this key
|
||||
key_t k;
|
||||
k.n1 = m_firstIp;
|
||||
k.n0 = timestamp64;
|
||||
memcpy ( p , &k , sizeof(key_t) );
|
||||
p += sizeof(key_t);
|
||||
// tell Rdb.cpp on each host in our group (shard) that they
|
||||
// should add this new entry into their waiting table. really
|
||||
// only the assigned host in the group (shard) should do this.
|
||||
if ( ! m_msg4.addMetaList ( listBuf ,
|
||||
p - listBuf ,
|
||||
coll ,
|
||||
this ,
|
||||
rejuvenateIPWrapper ,
|
||||
1 ) ) // niceness
|
||||
return false;
|
||||
// ok, they are all back, resume loop
|
||||
|
||||
// returns false if would block
|
||||
if ( ! confirmLockAcquisition ( ) ) return false;
|
||||
// . we did it without blocking, maybe cuz we are a single node
|
||||
// . ok, they are all back, resume loop
|
||||
if ( ! m_callback ) g_spiderLoop.spiderUrl2 ( );
|
||||
// all done
|
||||
return true;
|
||||
|
||||
}
|
||||
// note it
|
||||
if ( g_conf.m_logDebugSpider )
|
||||
@ -4684,6 +4733,71 @@ bool Msg12::removeAllLocks ( ) {
|
||||
return true;
|
||||
}
|
||||
|
||||
class ConfirmRequest {
|
||||
public:
|
||||
collnum_t m_collnum;
|
||||
key_t m_doledbKey;
|
||||
long m_firstIp;
|
||||
};
|
||||
|
||||
|
||||
bool Msg12::confirmLockAcquisition ( ) {
|
||||
|
||||
// we are now removing
|
||||
m_confirming = true;
|
||||
|
||||
// make that the request
|
||||
// . point to start of the 12 byte request buffer
|
||||
// . m_lockSequence should still be valid
|
||||
ConfirmRequest cq;
|
||||
char *request = (char *)&cq;
|
||||
long requestSize = sizeof(ConfirmRequest);
|
||||
// sanity
|
||||
if ( requestSize != sizeof(ConfirmRequest) ) { char *xx=NULL;*xx=0; }
|
||||
// set it
|
||||
cq.m_collnum = m_collnum;
|
||||
cq.m_doledbKey = m_doledbKey;
|
||||
cq.m_firstIp = m_firstIp;
|
||||
// . use the locking group from when we sent the lock request
|
||||
// . get ptr to list of hosts in the group
|
||||
Host *hosts = g_hostdb.getGroup ( m_lockGroupId );
|
||||
// this must select the same group that is going to spider it!
|
||||
// i.e. our group! because we check our local lock table to see
|
||||
// if a doled url is locked before spidering it ourselves.
|
||||
//Host *hosts = g_hostdb.getMyGroup();
|
||||
// short cut
|
||||
UdpServer *us = &g_udpServer;
|
||||
// get # of hosts in each mirror group
|
||||
long hpg = g_hostdb.getNumHostsPerGroup();
|
||||
// loop over hosts in that group
|
||||
for ( long i = 0 ; i < hpg ; i++ ) {
|
||||
// get a host
|
||||
Host *h = &hosts[i];
|
||||
// skip if dead! no need to get a reply from dead guys
|
||||
if ( g_hostdb.isDead ( h ) ) continue;
|
||||
// send request to him
|
||||
if ( ! us->sendRequest ( request ,
|
||||
// a size of 2 should mean confirm
|
||||
requestSize ,
|
||||
0x12 , // msgType
|
||||
h->m_ip ,
|
||||
h->m_port ,
|
||||
h->m_hostId ,
|
||||
NULL , // retSlotPtrPtr
|
||||
this , // state data
|
||||
gotLockReplyWrapper ,
|
||||
60*60*24*365 ) )
|
||||
// udpserver returns false and sets g_errno on error
|
||||
return true;
|
||||
// count them
|
||||
m_numRequests++;
|
||||
}
|
||||
// block?
|
||||
if ( m_numRequests > 0 ) return false;
|
||||
// did not block
|
||||
return true;
|
||||
}
|
||||
|
||||
void handleRequest12 ( UdpSlot *udpSlot , long niceness ) {
|
||||
// get request
|
||||
char *request = udpSlot->m_readBuf;
|
||||
@ -4692,6 +4806,49 @@ void handleRequest12 ( UdpSlot *udpSlot , long niceness ) {
|
||||
UdpServer *us = &g_udpServer;
|
||||
// breathe
|
||||
QUICKPOLL ( niceness );
|
||||
|
||||
//
|
||||
// . is it confirming that he got all the locks?
|
||||
// . if so, remove the doledb record and dock the doleiptable count
|
||||
// before adding a waiting tree entry to re-pop the doledb record
|
||||
//
|
||||
if ( reqSize == sizeof(ConfirmRequest) ) {
|
||||
char *msg = NULL;
|
||||
ConfirmRequest *cq = (ConfirmRequest *)request;
|
||||
// get it
|
||||
SpiderColl *sc = g_spiderCache.getSpiderColl(cq->m_collnum);
|
||||
// make it negative
|
||||
cq->m_doledbKey.n0 &= 0xfffffffffffffffeLL;
|
||||
// and add it to doledb
|
||||
Rdb *rdb = &g_doledb.m_rdb;
|
||||
if ( ! rdb->addRecord ( cq->m_collnum,
|
||||
(char *)&cq->m_doledbKey,
|
||||
NULL , // data
|
||||
0 , //dataSize
|
||||
1 )){ // niceness
|
||||
// tree is dumping or something, probably ETRYAGAIN
|
||||
msg = "error adding neg rec to doledb";
|
||||
log("spider: %s %s",msg,mstrerror(g_errno));
|
||||
us->sendErrorReply ( udpSlot , g_errno );
|
||||
return;
|
||||
}
|
||||
// now remove from doleiptable since we removed from doledb
|
||||
sc->removeFromDoledbTable ( cq->m_firstIp );
|
||||
// . now add to waiting tree so we add another spiderdb
|
||||
// record for this firstip to doledb
|
||||
// . true = callForScan
|
||||
if ( ! sc->addToWaitingTree ( 0 , cq->m_firstIp, true ) ) {
|
||||
msg = "FAILED TO ADD TO WAITING TREE";
|
||||
log("spider: %s %s",msg,mstrerror(g_errno));
|
||||
us->sendErrorReply ( udpSlot , g_errno );
|
||||
return;
|
||||
}
|
||||
// success!!
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
// sanity check
|
||||
if ( reqSize != 12 ) {
|
||||
log("spider: bad msg12 request size");
|
||||
|
8
Spider.h
8
Spider.h
@ -985,6 +985,8 @@ class SpiderColl {
|
||||
bool addSpiderReply ( SpiderReply *srep );
|
||||
bool addSpiderRequest ( SpiderRequest *sreq , long long nowGlobalMS );
|
||||
|
||||
void removeFromDoledbTable ( long firstIp );
|
||||
|
||||
bool addToDoleTable ( SpiderRequest *sreq ) ;
|
||||
|
||||
|
||||
@ -1128,6 +1130,11 @@ public:
|
||||
|
||||
class Msg12 {
|
||||
public:
|
||||
|
||||
bool confirmLockAcquisition ( ) ;
|
||||
|
||||
unsigned long m_lockGroupId;
|
||||
|
||||
// stuff for getting the msg12 lock for spidering a url
|
||||
bool getLocks ( long long probDocId,
|
||||
char *url ,
|
||||
@ -1148,6 +1155,7 @@ class Msg12 {
|
||||
long m_numRequests;
|
||||
long m_grants;
|
||||
bool m_removing;
|
||||
bool m_confirming;
|
||||
char *m_url; // for debugging
|
||||
void *m_state;
|
||||
void (*m_callback)(void *state);
|
||||
|
@ -17247,10 +17247,10 @@ char *XmlDoc::getMetaList ( bool forDelete ) {
|
||||
// how much we need
|
||||
long needx = sizeof(SpiderReply) + 1;
|
||||
// doledb key?
|
||||
if ( m_doledbKey.n0 || m_doledbKey.n1 )
|
||||
needx += 1 + sizeof(key_t); // + 4;
|
||||
// the titledb unlock key for msg12 in spider.cpp
|
||||
needx += 1 + sizeof(key_t);
|
||||
//if ( m_doledbKey.n0 || m_doledbKey.n1 )
|
||||
// needx += 1 + sizeof(key_t); // + 4;
|
||||
// the FAKEDB unlock key for msg12 in spider.cpp
|
||||
needx += 1 + sizeof(key_t); // FAKEDB
|
||||
// make the buffer
|
||||
m_metaList = (char *)mmalloc ( needx , "metalist");
|
||||
if ( ! m_metaList ) return NULL;
|
||||
|
5
gb.conf
5
gb.conf
@ -73,10 +73,10 @@
|
||||
<qaSearchTestEnabled>1</>
|
||||
|
||||
# Enable spidering on all hosts
|
||||
<allSpidersOn>1</>
|
||||
<allSpidersOn>0</>
|
||||
|
||||
# Disable spidering on all hosts
|
||||
<allSpidersOff>1</>
|
||||
<allSpidersOff>0</>
|
||||
|
||||
# Serves ads unless pure=1 is in cgi parms.
|
||||
<adFeedEnabled>0</>
|
||||
@ -588,6 +588,7 @@
|
||||
<logDebugQuotaMessages>0</>
|
||||
<logDebugRobotsMessages>0</>
|
||||
<logDebugSpiderCacheMessages>0</>
|
||||
<logDebugSpiderFlowMessages>1</>
|
||||
<logDebugSpellerMessages>0</>
|
||||
<logDebugSectionsMessages>0</>
|
||||
<logDebugSeoInsertMessages>1</>
|
||||
|
Reference in New Issue
Block a user