// privacore-open-source-searc.../SpiderColl.cpp
//
// e515e92dae (Ivan Skytte Jørgensen, 2018-08-07 14:38:37 +02:00):
// Removed local/global time distinction. It has always been local time
// since ... forever. We rely on NTP doing its job.

#include "SpiderColl.h"
#include "Spider.h"
#include "SpiderLoop.h"
#include "Doledb.h"
#include "Collectiondb.h"
#include "UdpServer.h"
#include "Stats.h"
#include "SafeBuf.h"
#include "Repair.h" //g_repairMode
#include "Process.h"
#include "JobScheduler.h"
#include "XmlDoc.h"
#include "ip.h"
#include "Conf.h"
#include "Mem.h"
#include "SpiderdbRdbSqliteBridge.h"
#include "SpiderdbUtil.h"
#include "UrlBlockCheck.h"
#include "ScopedLock.h"
#include "Sanity.h"
#include "Errno.h"
#define OVERFLOWLISTSIZE 200
// How large chunks of spiderdb to load in for each read
#define SR_READ_SIZE (512*1024)
//Hint for size allocation to m_winnerTree
#define MAX_REQUEST_SIZE (sizeof(SpiderRequest)+MAX_URL_LEN+1)
#define MAX_SP_REPLY_SIZE sizeof(SpiderReply)
static key96_t makeWaitingTreeKey ( uint64_t spiderTimeMS , int32_t firstIp ) {
// sanity
if ( ((int64_t)spiderTimeMS) < 0 ) gbshutdownAbort(true);
// make the wait tree key
key96_t wk;
wk.n1 = (spiderTimeMS>>32);
wk.n0 = (spiderTimeMS&0xffffffff);
wk.n0 <<= 32;
wk.n0 |= (uint32_t)firstIp;
// sanity
if ( wk.n1 & 0x8000000000000000LL ) gbshutdownAbort(true);
return wk;
}
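// For reference, the inverse decoding (used by makeWaitingTable() and
// printWaitingTree() below) unpacks the key like so:
//   uint64_t spiderTimeMS = ((uint64_t)wk.n1 << 32) | (wk.n0 >> 32);
//   int32_t  firstIp      = (int32_t)(wk.n0 & 0xffffffff);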
/////////////////////////
///////////////////////// SpiderColl
/////////////////////////
void SpiderColl::setCollectionRec ( CollectionRec *cr ) {
m_cr = cr;
}
CollectionRec *SpiderColl::getCollectionRec() {
return m_cr;
}
const CollectionRec *SpiderColl::getCollectionRec() const {
return m_cr;
}
SpiderColl::SpiderColl(CollectionRec *cr) {
m_overflowList = NULL;
m_lastOverflowFirstIp = 0;
m_deleteMyself = false;
m_isLoading = false;
m_gettingWaitingTreeList = false;
m_lastScanTime = 0;
m_isPopulatingDoledb = false;
m_numAdded = 0;
m_numBytesScanned = 0;
m_lastPrintCount = 0;
m_siteListIsEmptyValid = false;
m_cr = NULL;
// re-set this to min and set m_waitingTreeNeedsRebuild to true
// when the admin updates the url filters page
m_waitingTreeNeedsRebuild = false;
m_waitingTreeNextKey.setMin();
m_spidersOut = 0;
m_coll[0] = '\0';
// PVS-Studio
m_lastReplyValid = false;
memset(m_lastReplyBuf, 0, sizeof(m_lastReplyBuf));
m_didRead = false;
m_siteListIsEmpty = false;
m_tailIp = 0;
m_tailPriority = 0;
m_tailTimeMS = 0;
m_tailUh48 = 0;
m_minFutureTimeMS = 0;
m_collnum = -1;
m_lastReindexTimeMS = 0;
m_countingPagesIndexed = false;
m_lastReqUh48a = 0;
m_lastReqUh48b = 0;
m_lastRepUh48 = 0;
m_waitingTreeKeyValid = false;
m_scanningIp = 0;
m_gotNewDataForScanningIp = 0;
m_lastListSize = 0;
m_lastScanningIp = 0;
m_totalBytesScanned = 0;
m_pri2 = 0;
memset(m_outstandingSpiders, 0, sizeof(m_outstandingSpiders));
m_totalNewSpiderRequests = 0;
m_lastSreqUh48 = 0;
memset(m_cblocks, 0, sizeof(m_cblocks));
m_pageNumInlinks = 0;
m_lastCBlockIp = 0;
reset();
m_collnum = cr->m_collnum;
strcpy(m_coll, cr->m_coll);
m_cr = cr;
// set first doledb scan key
m_nextDoledbKey.setMin();
// mark it as loading so it can't be deleted while loading
m_isLoading = true;
// . load its tables from disk
// . crap i think this might call quickpoll and we get a parm
// update to delete this spider coll!
load();
m_isLoading = false;
}
// load the in-memory caches, tables and trees from disk at startup
bool SpiderColl::load ( ) {
// error?
int32_t err = 0;
// make the dir
const char *coll = g_collectiondb.getCollName(m_collnum);
// sanity check
if ( ! coll || coll[0]=='\0' ) {
log(LOG_ERROR,"spider: bad collnum of %" PRId32,(int32_t)m_collnum);
g_errno = ENOCOLLREC;
return false;
//gbshutdownAbort(true);
}
// reset this once
m_isPopulatingDoledb = false;
// keep it kinda low if we got a ton of collections
int32_t maxMem = 15000;
int32_t maxNodes = 500;
if ( g_collectiondb.getNumRecsUsed() > 500 ) {
maxNodes = 100;
maxMem = maxNodes * 20;
}
if ( ! m_lastDownloadCache.init ( maxMem , // maxcachemem,
8 , // fixed data size (MS)
maxNodes , // max nodes
"downcache", // dbname
false , // load from disk?
12 , // key size (firstip)
-1 )) {// numPtrsMax
log(LOG_WARN, "spider: dcache init failed");
return false;
}
// this has a quickpoll in it, so that quickpoll processes
// a restart request from crawlbottesting for this collnum which
// calls Collectiondb::resetColl2() which calls deleteSpiderColl()
// on THIS spidercoll, but our m_isLoading flag is set
if (!m_sniTable.set ( 4,8,0,NULL,0,false,"snitbl") )
return false;
if (!m_cdTable.set (4,4,0,NULL,0,false,"cdtbl"))
return false;
// doledb seems to have like 32000 entries in it
int32_t numSlots = 0; // was 128000
if(!m_doledbIpTable.set(4,4,numSlots,NULL,0,false,"doleip"))
return false;
// this should grow dynamically...
if (!m_waitingTable.set (4,8,16,NULL,0,false,"waittbl"))
return false;
// . a tree of keys, key is earliestSpiderTime|ip (key=12 bytes)
// . earliestSpiderTime is 0 if unknown
// . max nodes is 1M but we should grow dynamically! TODO
// . let's up this to 5M because we are hitting the limit in some
// test runs...
// . try going to 20M now since we hit it again...
// . start off at just 10 nodes since we grow dynamically now
if (!m_waitingTree.set(0, 10, -1, true, "waittree2", "waitingtree", sizeof(key96_t))) {
return false;
}
m_waitingTreeKeyValid = false;
m_scanningIp = 0;
// make dir
char dir[500];
sprintf(dir,"%scoll.%s.%" PRId32,g_hostdb.m_dir,coll,(int32_t)m_collnum);
// load in the waiting tree, IPs waiting to get into doledb
BigFile file;
file.set ( dir , "waitingtree-saved.dat");
bool treeExists = file.doesExist();
// load the table with file named "THISDIR/saved"
if ( treeExists && !m_waitingTree.fastLoad(&file, &m_waitingMem) )
err = g_errno;
// init wait table. scan wait tree and add the ips into table.
if ( ! makeWaitingTable() ) err = g_errno;
// save it
g_errno = err;
// return false on error
if ( g_errno ) {
// note it
log(LOG_WARN,"spider: had error loading initial table: %s", mstrerror(g_errno));
return false;
}
// . do this now just to keep everything somewhat in sync
// . we lost dmoz.org and could not get it back in because it was
// in the doleip table but NOT in doledb!!!
return makeDoledbIPTable();
}
// . scan all spiderRequests in doledb at startup and add them to our tables
// . then, when we scan spiderdb and add to orderTree/urlhashtable it will
// see that the request is in doledb and set m_doled...
// . initialize the dole table for that then
// quickly scan doledb and add the doledb records to our trees and
// tables. that way if we receive a SpiderReply() then addSpiderReply()
// will be able to find the associated SpiderRequest.
// MAKE SURE to put each spiderrequest into m_doleTable... and into
// maybe m_urlHashTable too???
// this should block since we are at startup...
bool SpiderColl::makeDoledbIPTable() {
log(LOG_DEBUG,"spider: making dole ip table for %s",m_coll);
key96_t startKey ; startKey.setMin();
key96_t endKey ; endKey.setMax();
key96_t lastKey ; lastKey.setMin();
// get a meg at a time
int32_t minRecSizes = 1024*1024;
Msg5 msg5;
RdbList list;
for (;;) {
// use msg5 to get the list, should ALWAYS block since no threads
if (!msg5.getList(RDB_DOLEDB,
m_collnum,
&list,
&startKey,
&endKey,
minRecSizes,
true, // includeTree?
0, // startFileNum
-1, // numFiles
NULL, // state
NULL, // callback
0, // niceness
false, // err correction?
-1, // maxRetries
false)) { // isRealMerge
log(LOG_LOGIC, "spider: getList did not block.");
return false;
}
// smallest sane rec size: 12-byte key + 4-byte dataSize + a
// SpiderRequest with an empty url buffer
int32_t minSize = (int32_t)(sizeof(SpiderRequest) + sizeof(key96_t) + 4 - MAX_URL_LEN);
// all done if empty
if (list.isEmpty()) {
log(LOG_DEBUG,"spider: making dole ip table done.");
return true;
}
// loop over entries in list
for (list.resetListPtr(); !list.isExhausted(); list.skipCurrentRecord()) {
// get rec
const char *rec = list.getCurrentRec();
// get key
key96_t k = list.getCurrentKey();
// skip deletes -- how did this happen?
if ((k.n0 & 0x01) == 0) {
continue;
}
// check this out
int32_t recSize = list.getCurrentRecSize();
// zero?
if (recSize <= 0) gbshutdownCorrupted();
// 16 is bad too... wtf is this?
if (recSize <= 16) {
continue;
}
// crazy?
if (recSize <= minSize) gbshutdownAbort(true);
// . doledb key is 12 bytes, followed by a 4 byte datasize
// . so skip that key and dataSize to point to spider request
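// layout: [ key96_t (12 bytes) ][ dataSize (4 bytes) ][ SpiderRequest ... ]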
const SpiderRequest *sreq = (const SpiderRequest *)(rec + sizeof(key96_t) + 4);
// add to dole tables
if (!addToDoledbIpTable(sreq)) {
// return false with g_errno set on error
return false;
}
}
startKey = *(key96_t *)list.getLastKey();
startKey++;
// watch out for wrap around
if (startKey < *(key96_t *)list.getLastKey()) {
log(LOG_DEBUG,"spider: making dole ip table done.");
return true;
}
}
}
CollectionRec *SpiderColl::getCollRec() {
CollectionRec *cr = g_collectiondb.getRec(m_collnum);
if ( ! cr ) log(LOG_WARN,"spider: lost coll rec");
return cr;
}
const CollectionRec *SpiderColl::getCollRec() const {
const CollectionRec *cr = g_collectiondb.getRec(m_collnum);
if ( ! cr ) log(LOG_WARN,"spider: lost coll rec");
return cr;
}
const char *SpiderColl::getCollName() const {
const CollectionRec *cr = getCollRec();
if ( ! cr ) return "lostcollection";
return cr->m_coll;
}
bool SpiderColl::makeWaitingTable ( ) {
ScopedLock sl(m_waitingTree.getLock());
log(LOG_DEBUG,"spider: making waiting table for %s.",m_coll);
for (int32_t node = m_waitingTree.getFirstNode_unlocked(); node >= 0;
node = m_waitingTree.getNextNode_unlocked(node)) {
// get key
const key96_t *key = reinterpret_cast<const key96_t*>(m_waitingTree.getKey_unlocked(node));
// get ip from that
int32_t ip = (key->n0) & 0xffffffff;
// spider time is up top
uint64_t spiderTimeMS = (key->n1);
spiderTimeMS <<= 32;
spiderTimeMS |= ((key->n0) >> 32);
// store in waiting table
if (!addToWaitingTable(ip, spiderTimeMS)) return false;
}
log(LOG_DEBUG,"spider: making waiting table done.");
return true;
}
SpiderColl::~SpiderColl () {
reset();
}
// we call this now instead of reset when Collectiondb::resetColl() is used
void SpiderColl::clearLocks ( ) {
g_spiderLoop.clearLocks(m_collnum);
}
void SpiderColl::reset ( ) {
// reset these for SpiderLoop;
m_nextDoledbKey.setMin();
// set this to -1 here, when we enter spiderDoledUrls() it will
// see that its -1 and set the m_msg5StartKey
m_pri2 = -1; // MAX_SPIDER_PRIORITIES - 1;
m_isPopulatingDoledb = false;
const char *coll = "unknown";
if ( m_coll[0] ) coll = m_coll;
log(LOG_DEBUG,"spider: resetting spider cache coll=%s",coll);
m_doledbIpTable .reset();
m_cdTable .reset();
m_sniTable .reset();
m_waitingTable.reset();
m_waitingTree.reset();
m_waitingMem .reset();
m_winnerTree.reset();
m_winnerTable .reset();
m_dupCache .reset();
if ( m_overflowList ) {
mfree ( m_overflowList , OVERFLOWLISTSIZE * 4 ,"olist" );
m_overflowList = NULL;
}
// each spider priority in the collection has essentially a cursor
// that references the next spider rec in doledb to spider. it is
// used as a performance hack to avoid the massive positive/negative
// key annihilations related to starting at the top of the priority
// queue every time we scan it, which causes us to do upwards of
// 300 re-reads!
for ( int32_t i = 0 ; i < MAX_SPIDER_PRIORITIES ; i++ ) {
m_nextKeys[i] = Doledb::makeFirstKey2 ( i );
}
}
bool SpiderColl::updateSiteNumInlinksTable(int32_t siteHash32, int32_t sni, time_t timestamp) {
// do not update if invalid
if ( sni == -1 ) return true;
ScopedLock sl(m_sniTableMtx);
// . get entry for siteNumInlinks table
// . use 32-bit key specialized lookup for speed
uint64_t *val = (uint64_t *)m_sniTable.getValue32(siteHash32);
// bail?
if ( val && ((*val)&0xffffffff) > (uint32_t)timestamp ) return true;
// . make new data for this key
// . lower 32 bits is the addedTime
// . upper 32 bits is the siteNumInlinks
uint64_t nv = (uint32_t)sni;
// shift up
nv <<= 32;
// or in time
nv |= (uint32_t)timestamp;//sreq->m_addedTime;
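// for reference, decoding mirrors the timestamp check above:
//   int32_t storedTime = (int32_t)(nv & 0xffffffff); // lower 32 bits
//   int32_t storedSni  = (int32_t)(nv >> 32);        // upper 32 bits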
// just direct update if faster
if ( val ) *val = nv;
// store it anew otherwise
else if ( ! m_sniTable.addKey(&siteHash32,&nv) )
// return false with g_errno set on error
return false;
// success
return true;
}
// . we call this when we receive a spider reply in Rdb.cpp
// . returns false and sets g_errno on error
// . xmldoc.cpp adds the reply AFTER the negative doledb rec since we decrement
// the count in m_doledbIpTable here
bool SpiderColl::addSpiderReply(const SpiderReply *srep) {
////
//
// skip if not assigned to us for doling
//
////
if ( ! isAssignedToUs ( srep->m_firstIp ) )
return true;
/////////
//
// remove the lock here
//
//////
int64_t lockKey = makeLockTableKey ( srep );
logDebug(g_conf.m_logDebugSpider, "spider: removing lock uh48=%" PRId64" lockKey=%" PRIu64, srep->getUrlHash48(), lockKey);
/////
//
// but do note that its spider has returned for populating the
// waiting tree. addToWaitingTree should not add an entry if
// a spiderReply is still pending according to the lock table,
// UNLESS, maxSpidersPerIP is more than what the lock table says
// is currently being spidered.
//
/////
// now just remove it since we only spider our own urls
// and doledb is in memory
g_spiderLoop.removeLock(lockKey);
// update the latest siteNumInlinks count for this "site" (repeatbelow)
updateSiteNumInlinksTable ( srep->m_siteHash32, srep->m_siteNumInlinks, srep->m_spideredTime );
// clear error for this
g_errno = 0;
// no update if injecting or from pagereindex (docid based spider request)
if (!srep->m_fromInjectionRequest) {
ScopedLock sl(m_cdTableMtx);
// use the domain hash for this guy! since its from robots.txt
const int32_t *cdp = (const int32_t *)m_cdTable.getValue32(srep->m_domHash32);
// update it only if better or empty
if (!cdp) {
// . make new data for this key
// . m_cdTable maps the 32-bit domain hash to the 32-bit
//   crawl delay in milliseconds from robots.txt
int32_t nv = (int32_t)(srep->m_crawlDelayMS);
if (!m_cdTable.addKey(&srep->m_domHash32, &nv)) {
char ipbuf[16];
log(LOG_WARN, "spider: failed to add crawl delay for firstip=%s", iptoa(srep->m_firstIp,ipbuf));
// just ignore
g_errno = 0;
}
}
}
// . anytime we add a reply then
// we must update this downloadTable with the replies
// SpiderReply::m_downloadEndTime so we can obey sameIpWait
// . that is the earliest that this url can be respidered, but we
// also have a sameIpWait constraint we have to consider...
// . we alone are responsible for adding doledb recs from this ip so
// this is easy to throttle...
// . and make sure to only add to this download time hash table if
// SpiderReply::m_downloadEndTime is non-zero, because zero means
// no download happened. (TODO: check this)
// . TODO: consult crawldelay table here too! use that value if is
// less than our sameIpWait
// . make m_lastDownloadTable an rdbcache ...
// . this is 0 for pagereindex docid-based replies
if (srep->m_downloadEndTime) {
RdbCacheLock rcl(m_lastDownloadCache);
m_lastDownloadCache.addLongLong(m_collnum, srep->m_firstIp, srep->m_downloadEndTime);
// ignore errors from that, it's just a cache
g_errno = 0;
}
char ipbuf[16];
logDebug(g_conf.m_logDebugSpider, "spider: adding spider reply, download end time %" PRId64" for "
"ip=%s(%" PRIu32") uh48=%" PRIu64" indexcode=\"%s\" coll=%" PRId32" "
"k.n1=%" PRIu64" k.n0=%" PRIu64,
//"to SpiderColl::m_lastDownloadCache",
srep->m_downloadEndTime,
iptoa(srep->m_firstIp,ipbuf),
(uint32_t)srep->m_firstIp,
srep->getUrlHash48(),
mstrerror(srep->m_errCode),
(int32_t)m_collnum,
srep->m_key.n1,
srep->m_key.n0);
// . add to wait tree and let it populate doledb on its batch run
// . use a spiderTime of 0 which means unknown and that it needs to
// scan spiderdb to get that
// . returns false if did not add to waiting tree
// . returns false sets g_errno on error
addToWaitingTree(srep->m_firstIp);
// ignore errors i guess
g_errno = 0;
return true;
}
bool SpiderColl::isInDupCache(const SpiderRequest *sreq, bool addToCache) {
// init dup cache?
if ( ! m_dupCache.isInitialized() )
// use 50k i guess of 64bit numbers and linked list info
m_dupCache.init ( 90000,
4 , // fixeddatasize (don't really need this)
5000, // maxcachenodes
"urldups", // dbname
false, // loadfromdisk
12, // cachekeysize
-1 ); // numptrsmax
// quit adding dups over and over again...
int64_t dupKey64 = sreq->getUrlHash48();
int64_t org_dupKey64 = dupKey64;
// . these flags make big difference in url filters
// . NOTE: if you see a url that is not getting spidered that should be it might
// be because we are not incorporating other flags here...
if ( sreq->m_fakeFirstIp ) dupKey64 ^= 12345;
if ( sreq->m_isAddUrl ) dupKey64 ^= 49387333;
if ( sreq->m_isInjecting ) dupKey64 ^= 3276404;
if ( sreq->m_isPageReindex) dupKey64 ^= 32999604;
if ( sreq->m_forceDelete ) dupKey64 ^= 29386239;
if ( sreq->m_hadReply ) dupKey64 ^= 293294099;
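// (the XOR constants above are arbitrary salts; they just need to be
// distinct so each flag combination maps to a different dup key)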
// . maxage=86400,promoteRec=yes. returns -1 if not in there
RdbCacheLock rcl(m_dupCache);
if (m_dupCache.getLong(0, dupKey64, 86400, true) != -1) {
logDebug(g_conf.m_logDebugSpider, "spider: skipping dup request. url=%s uh48=%" PRIu64 ", dupkey=%" PRIu64 ", org_dupkey=%" PRIu64 ", %s%s%s%s%s%s", sreq->m_url, sreq->getUrlHash48(), dupKey64, org_dupKey64,
sreq->m_fakeFirstIp?"fakeFirstIp ":"",sreq->m_isAddUrl?"isAddUrl ":"",sreq->m_isInjecting?"isInjecting ":"",sreq->m_isPageReindex?"isPageReindex ":"",sreq->m_forceDelete?"forceDelete ":"",sreq->m_hadReply?"hadReply":"");
return true;
}
if (addToCache) {
logDebug(g_conf.m_logDebugSpider, "spider: Adding to dup cache. url=%s uh48=%" PRIu64 ", dupkey=%" PRIu64 ", org_dupkey=%" PRIu64 ", %s%s%s%s%s%s", sreq->m_url, sreq->getUrlHash48(), dupKey64, org_dupKey64,
sreq->m_fakeFirstIp?"fakeFirstIp ":"",sreq->m_isAddUrl?"isAddUrl ":"",sreq->m_isInjecting?"isInjecting ":"",sreq->m_isPageReindex?"isPageReindex ":"",sreq->m_forceDelete?"forceDelete ":"",sreq->m_hadReply?"hadReply":"");
// add it
m_dupCache.addLong(0, dupKey64, 1);
}
return false;
}
// . Rdb.cpp calls SpiderColl::addSpiderRequest/Reply() for every positive
// spiderdb record it adds to spiderdb. that way our cache is kept
// uptodate incrementally
// . returns false and sets g_errno on error
// . if the spiderTime appears to be AFTER m_nextReloadTime then we should
// not add this spider request to keep the cache trimmed!!! (MDW: TODO)
// . BUT! if we have 150,000 urls that is going to take a long time to
// spider, so it should have a high reload rate!
bool SpiderColl::addSpiderRequest(const SpiderRequest *sreq) {
int64_t nowGlobalMS = gettimeofdayInMilliseconds();
// don't add negative keys or dataless things
if ( sreq->m_dataSize <= 0 ) {
log( "spider: add spider request is dataless for uh48=%" PRIu64, sreq->getUrlHash48() );
return true;
}
// . are we already more or less in spiderdb? true = addToCache
// . put this above isAssignedToUs() so we *try* to keep twins in sync because
// Rdb.cpp won't add the spiderrequest if it's in this dup cache, and we add
// it to the dupcache here...
if ( isInDupCache ( sreq , true ) ) {
logDebug( g_conf.m_logDebugSpider, "spider: skipping dup request url=%s uh48=%" PRIu64,
sreq->m_url, sreq->getUrlHash48() );
return true;
}
// skip if not assigned to us for doling
if ( ! isAssignedToUs ( sreq->m_firstIp ) ) {
logDebug( g_conf.m_logDebugSpider, "spider: spider request not assigned to us. skipping." );
return true;
}
if ( sreq->isCorrupt() ) {
log( LOG_WARN, "spider: not adding corrupt spider req to doledb");
return true;
}
// . get the url's length contained in this record
// . it should be NULL terminated
// . we set the ip here too
int32_t ulen = sreq->getUrlLen();
// watch out for corruption
if ( sreq->m_firstIp == 0 || sreq->m_firstIp == -1 || ulen <= 0 ) {
log(LOG_ERROR,"spider: Corrupt spider req with url length of "
"%" PRId32" <= 0 u=%s. dataSize=%" PRId32" firstip=%" PRId32" uh48=%" PRIu64". Skipping.",
ulen,sreq->m_url,
sreq->m_dataSize,sreq->m_firstIp,sreq->getUrlHash48());
return true;
}
// . we can't do this because we do not have the spiderReply!!!???
// . MDW: no, we have to do it because tradesy.com has links to twitter
// on every page and twitter is not allowed so we continually
// re-scan a big spiderdb list for twitter's firstip. major performance
// degradation. so try to get ufn without reply. if we need
// a reply to get the ufn then this function should return -1 which
// means an unknown ufn and we'll add to waiting tree.
// get ufn/priority,because if filtered we do not want to add to doledb
// HACK: set isOutlink to true here since we don't have a SpiderReply
int32_t ufn = ::getUrlFilterNum(sreq, NULL, nowGlobalMS, false, m_cr, true, -1);
if (ufn >= 0) {
// spiders disabled for this row in url filters?
if (m_cr->m_maxSpidersPerRule[ufn] == 0) {
logDebug(g_conf.m_logDebugSpider, "spider: request spidersoff ufn=%d url=%s", ufn, sreq->m_url);
return true;
}
// do not add to doledb if bad
if (m_cr->m_forceDelete[ufn]) {
logDebug(g_conf.m_logDebugSpider, "spider: request %s is filtered ufn=%d", sreq->m_url, ufn);
return true;
}
}
// once in waiting tree, we will scan waiting tree and then lookup
// each firstIp in waiting tree in spiderdb to get the best
// SpiderRequest for that firstIp, then we can add it to doledb
// as long as it can be spidered now
bool added = addToWaitingTree(sreq->m_firstIp);
// if already doled and we beat the priority/spidertime of what
// was doled then we should probably delete the old doledb key
// and add the new one. hmm, the waitingtree scan code ...
// update the latest siteNumInlinks count for this "site"
if (sreq->m_siteNumInlinksValid) {
// updates m_siteNumInlinksTable
updateSiteNumInlinksTable(sreq->m_siteHash32, sreq->m_siteNumInlinks, (time_t) sreq->m_addedTime);
// clear error for this if there was any
g_errno = 0;
}
// log it
char ipbuf[16];
logDebug( g_conf.m_logDebugSpider, "spider: %s request to waiting tree %s"
" uh48=%" PRIu64
" firstIp=%s "
" pageNumInlinks=%" PRIu32
" parentdocid=%" PRIu64
" isinjecting=%" PRId32
" ispagereindex=%" PRId32
" ufn=%" PRId32
" priority=%" PRId32
" addedtime=%" PRIu32,
added ? "ADDED" : "DIDNOTADD",
sreq->m_url,
sreq->getUrlHash48(),
iptoa(sreq->m_firstIp,ipbuf),
(uint32_t)sreq->m_pageNumInlinks,
sreq->getParentDocId(),
(int32_t)(bool)sreq->m_isInjecting,
(int32_t)(bool)sreq->m_isPageReindex,
(int32_t)sreq->m_ufn,
(int32_t)sreq->m_priority,
(uint32_t)sreq->m_addedTime );
return true;
}
bool SpiderColl::printWaitingTree() {
ScopedLock sl(m_waitingTree.getLock());
for (int32_t node = m_waitingTree.getFirstNode_unlocked(); node >= 0;
node = m_waitingTree.getNextNode_unlocked(node)) {
const key96_t *wk = reinterpret_cast<const key96_t*>(m_waitingTree.getKey_unlocked(node));
// spider time is up top
uint64_t spiderTimeMS = (wk->n1);
spiderTimeMS <<= 32;
spiderTimeMS |= ((wk->n0) >> 32);
// then ip
int32_t firstIp = wk->n0 & 0xffffffff;
// show it
char ipbuf[16];
// for readable timestamp..
time_t now_t = (time_t)(spiderTimeMS / 1000);
struct tm tm_buf;
struct tm *stm = gmtime_r(&now_t,&tm_buf);
log(LOG_INFO,"dump: time=%" PRId64 " (%04d%02d%02d-%02d%02d%02d-%03d) firstip=%s",spiderTimeMS, stm->tm_year+1900,stm->tm_mon+1,stm->tm_mday,stm->tm_hour,stm->tm_min,stm->tm_sec,(int)(spiderTimeMS%1000), iptoa(firstIp,ipbuf));
}
return true;
}
//////
//
// . 1. called by addSpiderReply(). it should have the sameIpWait available
// or at least that will be in the crawldelay cache table.
// SpiderReply::m_crawlDelayMS. Unfortunately, no maxSpidersPerIP!!!
// we just add a "0" in the waiting tree which means evalIpLoop() will
// be called and can get the maxSpidersPerIP from the winning candidate
// and add to the waiting tree based on that.
// . 2. called by addSpiderRequests(). It SHOULD maybe just add a "0" as well
// to offload the logic. try that.
// . 3. called by populateWaitingTreeFromSpiderdb(). it just adds "0" as well,
// if not doled
// . 4. UPDATED in evalIpLoop() if the best SpiderRequest for a firstIp is
// in the future, this is the only time we will add a waiting tree key
// whose spider time is non-zero. that is where we also take
// sameIpWait and maxSpidersPerIP into consideration. evalIpLoop()
// will actually REMOVE the entry from the waiting tree if that IP
// already has the max spiders outstanding per IP. when a spiderReply
// is received it will populate the waiting tree again with a "0" entry
// and evalIpLoop() will re-do its check.
//
//////
// . returns true if we added to waiting tree, false if not
// . if one of these add fails consider increasing mem used by tree/table
// . if we lose an ip that sux because it won't be gotten again unless
// we somehow add another request/reply to spiderdb in the future
bool SpiderColl::addToWaitingTree(int32_t firstIp) {
char ipbuf[16];
logDebug( g_conf.m_logDebugSpider, "spider: addtowaitingtree ip=%s", iptoa(firstIp,ipbuf) );
// we are currently reading spiderdb for this ip and trying to find
// a best SpiderRequest or requests to add to doledb. so if this
// happens, let the scan know that more replies or requests came in
// while we were scanning, so that it does not delete the rec from
// the waiting tree without adding to doledb; otherwise we'd lose it
// forever, or until the next waitingtree rebuild was triggered in time.
//
// Before i was only setting this in addSpiderRequest() so if a new
// reply came in it was not setting m_gotNewDataForScanningIp and
// we ended up losing the IP from the waiting tree forever (or until
// the next timed rebuild). putting it here seems to fix that.
/// @todo ALC verify that we won't lose IP from waiting tree. Do we need to lock the whole evalIpLoop?
if ( firstIp == m_scanningIp ) {
m_gotNewDataForScanningIp = m_scanningIp.load();
//log(LOG_DEBUG,"spider: got new data for %s",iptoa(firstIp));
//return true;
}
// . this can now be only 0
// . only evalIpLoop() will add a waiting tree key with a non-zero
// value after it figures out the EARLIEST time that a
// SpiderRequest from this firstIp can be spidered.
uint64_t spiderTimeMS = 0;
// don't write to tree if we're shutting down
if (g_process.isShuttingDown()) {
log(LOG_WARN, "spider: addtowaitingtree: failed. shutting down");
return false;
}
// only if we are the responsible host in the shard
if ( ! isAssignedToUs ( firstIp ) )
return false;
// . do not add to waiting tree if already in doledb
// . an ip should not exist in both doledb and waiting tree.
// . waiting tree is meant to be a signal that we need to add
// a spiderrequest from that ip into doledb where it can be picked
// up for immediate spidering
if (isInDoledbIpTable(firstIp)) {
logDebug( g_conf.m_logDebugSpider, "spider: not adding to waiting tree, already in doleip table" );
return false;
}
ScopedLock sl(m_waitingTree.getLock());
// see if in tree already, so we can delete it and replace it below
// . this is true if already in tree
// . if spiderTimeMS is a sooner time than what this firstIp already
// has as its earliest time, then we will override it and have to
// update both m_waitingTree and m_waitingTable, however
// IF the spiderTimeMS is a later time, then we bail without doing
// anything at this point.
int64_t sms;
if (getFromWaitingTable(firstIp, &sms)) {
// not only must we be a sooner time, but we must be 5-seconds
// sooner than the time currently in there to avoid thrashing
// when we had a ton of outlinks with this first ip within an
// 5-second interval.
//
// i'm not so sure what i was doing here before, but i don't
// want to starve the spiders, so make this 100ms not 5000ms
if ( (int64_t)spiderTimeMS > sms - 100 ) {
logDebug( g_conf.m_logDebugSpider, "spider: skip updating waiting tree" );
return false;
}
// make the key then
key96_t wk = makeWaitingTreeKey ( sms, firstIp );
// must be there
if (!m_waitingTree.deleteNode_unlocked(0, (char*)&wk, false)) {
// sanity check. ensure waitingTable and waitingTree in sync
gbshutdownLogicError();
}
// log the replacement
logDebug( g_conf.m_logDebugSpider, "spider: replacing waitingtree key oldtime=%" PRIu32" newtime=%" PRIu32" firstip=%s",
(uint32_t)(sms/1000LL),
(uint32_t)(spiderTimeMS/1000LL),
iptoa(firstIp,ipbuf) );
} else {
// time of 0 means we got the reply for something we spidered
// in doledb so we will need to recompute the best spider
// requests for this first ip
// log the addition
logDebug( g_conf.m_logDebugSpcache, "spider: adding new key to waitingtree newtime=%" PRIu32"%s firstip=%s",
(uint32_t)(spiderTimeMS/1000LL),
( spiderTimeMS == 0 ) ? "(replyreset)" : "",
iptoa(firstIp,ipbuf) );
}
// what is this?
if ( firstIp == 0 || firstIp == -1 ) {
log(LOG_WARN, "spider: got ip of %s. cn=%" PRId32" "
"wtf? failed to add to "
"waiting tree, but return true anyway.",
iptoa(firstIp,ipbuf) ,
(int32_t)m_collnum);
// don't return false lest m_waitingTreeNextKey never gets updated
// and we end up in an infinite loop doing
// populateWaitingTreeFromSpiderdb()
return true;
}
// grow the tree if too small!
int32_t used = m_waitingTree.getNumUsedNodes_unlocked();
int32_t max = m_waitingTree.getNumTotalNodes_unlocked();
if ( used + 1 > max ) {
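// grow by ~50% of the used node count, clamped to [10, 100000]
// extra nodes per growth step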
int32_t more = (((int64_t)used) * 15) / 10;
if ( more < 10 ) more = 10;
if ( more > 100000 ) more = 100000;
int32_t newNum = max + more;
log(LOG_DEBUG, "spider: growing waiting tree to from %" PRId32" to %" PRId32" nodes for collnum %" PRId32,
max , newNum , (int32_t)m_collnum );
if (!m_waitingTree.growTree_unlocked(newNum)) {
log(LOG_ERROR, "Failed to grow waiting tree to add firstip %s", iptoa(firstIp,ipbuf));
return false;
}
if (!setWaitingTableSize(newNum)) {
log(LOG_ERROR, "Failed to grow waiting table to add firstip %s", iptoa(firstIp,ipbuf));
return false;
}
}
key96_t wk = makeWaitingTreeKey(spiderTimeMS, firstIp);
// add that
int32_t wn;
if ((wn = m_waitingTree.addKey_unlocked(&wk)) < 0) {
log(LOG_ERROR, "waitingtree add failed for ip=%s. increase max nodes lest we lose this IP forever. err=%s",
iptoa(firstIp,ipbuf), mstrerror(g_errno));
return false;
}
// note it
logDebug(g_conf.m_logDebugSpider, "spider: added time=%" PRId64" ip=%s to waiting tree node=%" PRId32,
spiderTimeMS, iptoa(firstIp,ipbuf), wn);
// add to table now since its in the tree
if (!addToWaitingTable(firstIp, spiderTimeMS)) {
// remove from tree then
m_waitingTree.deleteNode_unlocked(wn, false);
log(LOG_ERROR, "waitingtable add failed for ip=%s. increase max nodes lest we lose this IP forever. err=%s",
iptoa(firstIp,ipbuf), mstrerror(g_errno));
return false;
}
// tell caller there was no error
return true;
}
// . this scan is started anytime we call addSpiderRequest() or addSpiderReply
// . if nothing is in tree it quickly exits
// . otherwise it scan the entries in the tree
// . each entry is a key with spiderTime/firstIp
// . if spiderTime > now it stops the scan
// . if the firstIp is already in doledb (m_doledbIpTable) then it removes
// it from the waitingtree and waitingtable. how did that happen?
// . otherwise, it looks up that firstIp in spiderdb to get a list of all
// the spiderdb recs from that firstIp
// . then it selects the "best" one and adds it to doledb. once added to
// doledb it adds it to doleIpTable, and remove from waitingtree and
// waitingtable
// . returns the next firstIp due for spidering, or 0 if none
int32_t SpiderColl::getNextIpFromWaitingTree() {
// reset first key to get first rec in waiting tree
m_waitingTreeKey.setMin();
// current time
uint64_t nowMS = gettimeofdayInMilliseconds();
for (;;) {
ScopedLock sl(m_waitingTree.getLock());
// we might have deleted the only node below...
if (m_waitingTree.isEmpty_unlocked()) {
return 0;
}
// assume none
int32_t firstIp = 0;
// set node from wait tree key. this way we can resume from a prev key
int32_t node = m_waitingTree.getNextNode_unlocked(0, (char *)&m_waitingTreeKey);
// if empty, stop
if (node < 0) {
return 0;
}
// get the key
const key96_t *k = reinterpret_cast<const key96_t *>(m_waitingTree.getKey_unlocked(node));
// ok, we got one
firstIp = (k->n0) & 0xffffffff;
// sometimes we take over for a dead host, but if he's no longer
// dead then we can remove his keys. but first make sure we have had
// at least one ping from him so we do not remove at startup.
// if it is in doledb or in the middle of being added to doledb
// via msg4, nuke it as well!
if (firstIp == 0 || firstIp == -1 || !isAssignedToUs(firstIp) || isInDoledbIpTable(firstIp)) {
if (firstIp == 0 || firstIp == -1) {
log(LOG_WARN, "spider: removing corrupt spiderreq firstip of %" PRId32"from waiting tree collnum=%i",
firstIp, (int)m_collnum);
}
// these operations should fail if writes have been disabled
// because the trees/tables for the spider cache are being saved
// in Process.cpp's g_spiderCache::save() call
m_waitingTree.deleteNode_unlocked(node, true);
char ipbuf[16];
logDebug(g_conf.m_logDebugSpider, "spider: removed ip=%s from waiting tree. nn=%" PRId32,
iptoa(firstIp,ipbuf), m_waitingTree.getNumUsedNodes_unlocked());
logDebug(g_conf.m_logDebugSpcache, "spider: erasing waitingtree key firstip=%s", iptoa(firstIp,ipbuf));
// remove from table too!
removeFromWaitingTable(firstIp);
continue;
}
// spider time is up top
uint64_t spiderTimeMS = (k->n1);
spiderTimeMS <<= 32;
spiderTimeMS |= ((k->n0) >> 32);
// stop if need to wait for this one
if (spiderTimeMS > nowMS) {
return 0;
}
// save key for deleting when done
m_waitingTreeKey.n1 = k->n1;
m_waitingTreeKey.n0 = k->n0;
m_waitingTreeKeyValid = true;
m_scanningIp = firstIp;
// compute the best request from spiderdb list, not valid yet
m_lastReplyValid = false;
// start reading spiderdb here
m_nextKey = Spiderdb::makeFirstKey(firstIp);
m_endKey = Spiderdb::makeLastKey(firstIp);
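// this [makeFirstKey(ip), makeLastKey(ip)] range brackets every
// spiderdb rec whose firstIp matches, so the scan reads only this IP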
// all done
return firstIp;
}
}
void SpiderColl::getSpiderdbWaitingTreeListWrapper(void *state) {
SpiderColl *sc = static_cast<SpiderColl*>(state);
if (!SpiderdbRdbSqliteBridge::getFirstIps(sc->m_cr->m_collnum,
&sc->m_waitingTreeList,
Spiderdb::getFirstIp(&sc->m_waitingTreeNextKey),
-1,
SR_READ_SIZE)) {
if (!g_errno) {
g_errno = EIO; //imprecise
}
logTrace(g_conf.m_logTraceSpider, "END, got io-error from sqlite");
return;
}
}
void SpiderColl::gotSpiderdbWaitingTreeListWrapper(void *state, job_exit_t exit_type) {
SpiderColl *THIS = (SpiderColl *)state;
// did our collection rec get deleted? since we were doing a read
// the SpiderColl will have been preserved in that case but its
// m_deleteMyself flag will have been set.
if (tryToDeleteSpiderColl(THIS, "2")) {
return;
}
THIS->m_gettingWaitingTreeList = false;
THIS->populateWaitingTreeFromSpiderdb(true);
}
//////////////////
//////////////////
//
// THE BACKGROUND FUNCTION
//
// when the user changes the ufn table the waiting tree is flushed
// and repopulated from spiderdb with this. also used for repairs.
//
//////////////////
//////////////////
// . this stores an ip into the waiting tree with a spidertime of "0" so
// it will be evaluated properly by populateDoledbFromWaitingTree()
//
// @@@ BR: "it seems they fall out over time" - wtf?
// . scan spiderdb to make sure each firstip represented in spiderdb is
// in the waiting tree. it seems they fall out over time. we need to fix
// that but in the meantime this should do a bg repair. and is nice to have
//
// . the waiting tree key is really just a spidertime and a firstip. so we will
// still need populatedoledbfromwaitingtree to periodically scan firstips
// that are already in doledb to see if it has a higher-priority request
// for that firstip. in which case it can add that to doledb too, but then
// we have to be sure to only grant one lock for a firstip to avoid hammering
// that firstip
//
// . this should be called from a sleepwrapper, the same sleep wrapper we
// call populateDoledbFromWaitingTree() from should be fine
void SpiderColl::populateWaitingTreeFromSpiderdb ( bool reentry ) {
logTrace( g_conf.m_logTraceSpider, "BEGIN" );
// skip if in repair mode
if ( g_repairMode )
{
logTrace( g_conf.m_logTraceSpider, "END, in repair mode" );
return;
}
// sanity
if ( m_deleteMyself ) gbshutdownLogicError();
// skip if spiders off
if ( ! m_cr->m_spideringEnabled )
{
logTrace( g_conf.m_logTraceSpider, "END, spiders disabled" );
return;
}
if ( ! g_hostdb.getMyHost( )->m_spiderEnabled )
{
logTrace( g_conf.m_logTraceSpider, "END, spiders disabled (2)" );
return;
}
// skip if udp table is full
if ( g_udpServer.getNumUsedSlotsIncoming() >= MAXUDPSLOTS )
{
logTrace( g_conf.m_logTraceSpider, "END, UDP table full" );
return;
}
// if entering for the first time, we need to read list from spiderdb
if ( ! reentry ) {
// just return if we should not be doing this yet
if ( ! m_waitingTreeNeedsRebuild )
{
logTrace( g_conf.m_logTraceSpider, "END, !m_waitingTreeNeedsRebuild" );
return;
}
// a double call? can happen if list read is slow...
if ( m_gettingWaitingTreeList )
{
logTrace( g_conf.m_logTraceSpider, "END, double call" );
return;
}
// . borrow a msg5
// . if none available just return, we will be called again
// by the sleep/timer function
// . read in a replacement SpiderRequest to add to doledb from
// this ip
// . get the list of spiderdb records
// . do not include cache, those results are old and will mess
// us up
char ipbuf[16];
char keystrbuf[MAX_KEYSTR_BYTES];
log(LOG_DEBUG,"spider: populateWaitingTree: calling msg5: startKey=0x%s firstip=%s",
KEYSTR(&m_waitingTreeNextKey,sizeof(m_waitingTreeNextKey),keystrbuf), iptoa(Spiderdb::getFirstIp(&m_waitingTreeNextKey),ipbuf));
// flag it
m_gettingWaitingTreeList = true;
// read the list from local disk
if (g_jobScheduler.submit(getSpiderdbWaitingTreeListWrapper, gotSpiderdbWaitingTreeListWrapper, this, thread_type_spider_read, 0)) {
return;
}
// unable to submit job
getSpiderdbWaitingTreeListWrapper(this);
if (g_errno) {
// clear the flag so the rebuild can be retried
m_gettingWaitingTreeList = false;
return;
}
}
// show list stats
logDebug( g_conf.m_logDebugSpider, "spider: populateWaitingTree: got list of size %" PRId32, m_waitingTreeList.getListSize() );
// unflag it
m_gettingWaitingTreeList = false;
// don't proceed if we're shutting down
if (g_process.isShuttingDown()) {
logTrace( g_conf.m_logTraceSpider, "END, process is shutting down" );
return;
}
// ensure we point to the top of the list
m_waitingTreeList.resetListPtr();
// bail on error
if ( g_errno ) {
log(LOG_ERROR,"spider: Had error getting list of urls from spiderdb2: %s.", mstrerror(g_errno));
//m_isReadDone2 = true;
logTrace( g_conf.m_logTraceSpider, "END" );
return;
}
int32_t lastOne = 0;
// loop over all serialized spiderdb records in the list
for ( ; ! m_waitingTreeList.isExhausted() ; ) {
// get spiderdb rec in its serialized form
const char *rec = m_waitingTreeList.getCurrentRec();
// skip to next guy
m_waitingTreeList.skipCurrentRecord();
// negative? wtf?
if ( (rec[0] & 0x01) == 0x00 ) {
//logf(LOG_DEBUG,"spider: got negative spider rec");
continue;
}
// if its a SpiderReply skip it
if ( ! Spiderdb::isSpiderRequest ( (key128_t *)rec))
{
continue;
}
// cast it
const SpiderRequest *sreq = reinterpret_cast<const SpiderRequest *>(rec);
// get first ip
int32_t firstIp = Spiderdb::getFirstIp(&sreq->m_key);
// if same as last, skip it
if ( firstIp == lastOne )
{
char ipbuf[16];
logTrace( g_conf.m_logTraceSpider, "Skipping, IP [%s] same as last" , iptoa(firstIp,ipbuf));
continue;
}
// set this lastOne for speed
lastOne = firstIp;
// if firstip already in waiting tree, skip it
if (isInWaitingTable(firstIp)) {
char ipbuf[16];
logTrace( g_conf.m_logTraceSpider, "Skipping, IP [%s] already in waiting tree" , iptoa(firstIp,ipbuf));
continue;
}
// skip if only our twin should add it to waitingtree/doledb
if ( ! isAssignedToUs ( firstIp ) )
{
char ipbuf[16];
logTrace( g_conf.m_logTraceSpider, "Skipping, IP [%s] not assigned to us" , iptoa(firstIp,ipbuf));
continue;
}
// skip if ip already represented in doledb i guess otherwise
// the populatedoledb scan will nuke it!!
if (isInDoledbIpTable(firstIp)) {
char ipbuf[16];
logTrace( g_conf.m_logTraceSpider, "Skipping, IP [%s] already in doledb" , iptoa(firstIp,ipbuf));
continue;
}
// not currently spidering either. when they got their
// lock they called confirmLockAcquisition() which will
// have added an entry to the waiting table. sometimes the
// lock still exists but the spider is done. because the
// lock persists for 5 seconds afterwards in case there was
// a lock request for that url in progress, so it will be
// denied.
// . this is starving other collections, should be
// added to waiting tree anyway! otherwise it won't get
// added!!!
// . so now i made this collection specific, not global
if ( g_spiderLoop.getNumSpidersOutPerIp (firstIp,m_collnum)>0)
{
char ipbuf[16];
logTrace( g_conf.m_logTraceSpider, "Skipping, IP [%s] is already being spidered" , iptoa(firstIp,ipbuf));
continue;
}
// otherwise, we want to add it with 0 time so the doledb
// scan will evaluate it properly
// this will return false if we are saving the tree i guess
if (!addToWaitingTree(firstIp)) {
char ipbuf[16];
log(LOG_INFO, "spider: failed to add ip %s to waiting tree. "
"ip will not get spidered then and our population of waiting tree will repeat until this add happens.",
iptoa(firstIp,ipbuf) );
logTrace( g_conf.m_logTraceSpider, "END, addToWaitingTree for IP [%s] failed" , iptoa(firstIp,ipbuf));
return;
}
else
{
char ipbuf[16];
logTrace( g_conf.m_logTraceSpider, "IP [%s] added to waiting tree" , iptoa(firstIp,ipbuf));
}
// count it
m_numAdded++;
// ignore errors for this
g_errno = 0;
}
// are we the final list in the scan?
bool shortRead = ( m_waitingTreeList.getListSize() <= 0);
m_numBytesScanned += m_waitingTreeList.getListSize();
// reset? still left over from our first scan?
if ( m_lastPrintCount > m_numBytesScanned )
m_lastPrintCount = 0;
// announce every 10MB
if ( m_numBytesScanned - m_lastPrintCount > 10000000 ) {
log(LOG_INFO, "spider: %" PRIu64" spiderdb bytes scanned for waiting tree re-population for cn=%" PRId32,m_numBytesScanned,
(int32_t)m_collnum);
m_lastPrintCount = m_numBytesScanned;
}
// debug info
log(LOG_DEBUG,"spider: Read2 %" PRId32" spiderdb bytes.",m_waitingTreeList.getListSize());
// reset any errno cuz we're just a cache
g_errno = 0;
// if not done, keep going
if ( ! shortRead ) {
// . inc it here
// . it can also be reset on a collection rec update
key128_t lastKey = *(key128_t *)m_waitingTreeList.getLastKey();
if ( lastKey < m_waitingTreeNextKey ) {
log(LOG_WARN, "spider: got corruption 9. spiderdb keys out of order for collnum=%" PRId32, (int32_t)m_collnum);
// this should result in an empty list read for
// our next scan of spiderdb. unfortunately we could
// miss a lot of spider requests then
KEYMAX((char*)&m_waitingTreeNextKey,sizeof(m_waitingTreeNextKey));
}
else {
m_waitingTreeNextKey = lastKey;
m_waitingTreeNextKey++;
}
// watch out for wrap around
if ( m_waitingTreeNextKey < lastKey ) shortRead = true;
// nah, advance the firstip, should be a lot faster when
// there are only a few firstips...
if ( lastOne && lastOne != -1 ) { // && ! gotCorruption ) {
key128_t cand = Spiderdb::makeFirstKey(lastOne+1);
// corruption still seems to happen, so only
// do this part if it increases the key to avoid
// putting us into an infinite loop.
if ( cand > m_waitingTreeNextKey ) m_waitingTreeNextKey = cand;
}
}
if ( shortRead ) {
// mark when the scan completed so we can do another one
// like 24 hrs from that...
m_lastScanTime = getTime();
log(LOG_DEBUG, "spider: WaitingTree rebuild complete for %s. Added %" PRId32" recs to waiting tree, scanned %" PRId64" bytes of spiderdb.",
m_coll, m_numAdded, m_numBytesScanned);
//printWaitingTree();
// reset the count for next scan
m_numAdded = 0 ;
m_numBytesScanned = 0;
// reset for next scan
m_waitingTreeNextKey.setMin();
// no longer need rebuild
m_waitingTreeNeedsRebuild = false;
}
// free list to save memory
m_waitingTreeList.freeList();
// wait for sleepwrapper to call us again with our updated m_waitingTreeNextKey
logTrace( g_conf.m_logTraceSpider, "END, done" );
return;
}
//static bool s_ufnTreeSet = false;
//static RdbTree s_ufnTree;
//static time_t s_lastUfnTreeFlushTime = 0;
//////////////////////////
//////////////////////////
//
// The first KEYSTONE function.
//
// CALL THIS ANYTIME to load up doledb from waiting tree entries
//
// This is a key function.
//
// It is called from two places:
//
// 1) sleep callback
//
// 2) addToWaitingTree()
// is called from addSpiderRequest() anytime a SpiderRequest
// is added to spiderdb (or from addSpiderReply())
//
// It can only be entered once so will just return if already scanning
// spiderdb.
//
//////////////////////////
//////////////////////////
// . for each IP in the waiting tree, scan all its SpiderRequests and determine
// which one should be the next to be spidered. and put that one in doledb.
// . we call this a lot, like if the admin changes the url filters table
// we have to re-scan all of spiderdb basically and re-do doledb
void SpiderColl::populateDoledbFromWaitingTree ( ) { // bool reentry ) {
logTrace( g_conf.m_logTraceSpider, "BEGIN" );
// only one loop can run at a time!
if ( m_isPopulatingDoledb ) {
logTrace( g_conf.m_logTraceSpider, "END, already populating doledb" );
return;
}
// skip if in repair mode
if ( g_repairMode ) {
logTrace( g_conf.m_logTraceSpider, "END, in repair mode" );
return;
}
// let's skip if spiders off so we can inject/populate the index quickly
// since addSpiderRequest() calls addToWaitingTree() which then calls
// this.
if ( ! g_conf.m_spideringEnabled ) {
logTrace( g_conf.m_logTraceSpider, "END, spidering not enabled" );
return;
}
if ( ! g_hostdb.getMyHost( )->m_spiderEnabled ) {
logTrace( g_conf.m_logTraceSpider, "END, spidering not enabled (2)" );
return;
}
// skip if udp table is full
if ( g_udpServer.getNumUsedSlotsIncoming() >= MAXUDPSLOTS ) {
logTrace( g_conf.m_logTraceSpider, "END, no more UDP slots" );
return;
}
// set this flag so we are not re-entered
m_isPopulatingDoledb = true;
for(;;) {
// are we trying to exit? some firstip lists can be quite long, so
// terminate here so all threads can return and we can exit properly
if (g_process.isShuttingDown()) {
m_isPopulatingDoledb = false;
logTrace( g_conf.m_logTraceSpider, "END, shutting down" );
return;
}
// . get next IP that is due to be spidered from
// . also sets m_waitingTreeKey so we can delete it easily!
int32_t ip = getNextIpFromWaitingTree();
// . return if none. all done. unset populating flag.
// . it returns 0 if the next firstip has a spidertime in the future
if ( ip == 0 ) {
m_isPopulatingDoledb = false;
return;
}
// set read range for scanning spiderdb
m_nextKey = Spiderdb::makeFirstKey(ip);
m_endKey = Spiderdb::makeLastKey (ip);
char ipbuf[16];
char keystrbuf[MAX_KEYSTR_BYTES];
logDebug( g_conf.m_logDebugSpider, "spider: for cn=%i nextip=%s nextkey=%s",
(int)m_collnum, iptoa(ip,ipbuf), KEYSTR( &m_nextKey, sizeof(key128_t), keystrbuf ) );
//////
//
// do TWO PASSES, one to count pages, the other to get the best url!!
//
//////
// assume we don't have to do two passes
m_countingPagesIndexed = false;
// get the collectionrec
const CollectionRec *cr = g_collectiondb.getRec ( m_collnum );
// but if we have quota based url filters we do have to count
if ( cr && cr->m_urlFiltersHavePageCounts ) {
// tell evalIpLoop() to count first
m_countingPagesIndexed = true;
// reset this stuff used for counting UNIQUE votes
m_lastReqUh48a = 0LL;
m_lastReqUh48b = 0LL;
m_lastRepUh48 = 0LL;
// and setup the LOCAL counting table if not initialized
if (!m_siteIndexedDocumentCount.isInitialized()) {
m_siteIndexedDocumentCount.set(4, 4, 0, NULL, 0, false, "ltpct");
}
// otherwise, just reset it so we can repopulate it
else m_siteIndexedDocumentCount.reset();
}
logDebug( g_conf.m_logDebugSpider, "spider: evalIpLoop: waitingtree nextip=%s numUsedNodes=%" PRId32,
iptoa(ip,ipbuf), m_waitingTree.getNumUsedNodes() );
//@@@@@@ BR: THIS SHOULD BE DEBUGGED AND ENABLED
/*
// assume using tree
m_useTree = true;
// . flush the tree every 12 hours
// . i guess we could add incoming requests to the ufntree if
// they strictly beat the ufn tree tail node, HOWEVER, we
// still have the problem of that if a url we spidered is due
// to be respidered very soon we will miss it, as only the reply
// is added back into spiderdb, not a new request.
int32_t nowLocal = getTime();
// make it one hour so we don't cock-block a new high priority
// request that just got added... crap, what if its an addurl
// or something like that????
if ( nowLocal - s_lastUfnTreeFlushTime > 3600 ) {
s_ufnTree.clear();
s_lastUfnTreeFlushTime = nowLocal;
}
int64_t uh48;
//
// s_ufnTree tries to cache the top X spiderrequests for an IP
// that should be spidered next so we do not have to scan like
// a million spiderrequests in spiderdb to find the best one.
//
// if we have a specific uh48 targetted in s_ufnTree then that
// saves a ton of time!
// key format for s_ufnTree:
// iiiiiiii iiiiiiii iiiiiii iiiiiii i = firstip
// PPPPPPPP tttttttt ttttttt ttttttt P = priority
// tttttttt tttttttt hhhhhhh hhhhhhh t = spiderTimeMS (40 bits)
// hhhhhhhh hhhhhhhh hhhhhhh hhhhhhh h = urlhash48
key128_t key;
key.n1 = ip;
key.n1 <<= 32;
key.n0 = 0LL;
int32_t node = s_ufnTree.getNextNode_unlocked(0,(char *)&key);
// cancel node if not from our ip
if ( node >= 0 ) {
key128_t *rk = (key128_t *)s_ufnTree.getKey ( node );
if ( (rk->n1 >> 32) != (uint32_t)ip ) node = -1;
}
if ( node >= 0 ) {
// get the key
key128_t *nk = (key128_t *)s_ufnTree.getKey ( node );
// parse out uh48
uh48 = nk->n0;
// mask out spidertimems
uh48 &= 0x0000ffffffffffffLL;
// use that to refine the key range immensley!
m_nextKey = g_spiderdb.makeFirstKey2 (ip, uh48);
m_endKey = g_spiderdb.makeLastKey2 (ip, uh48);
// do not add the recs to the tree!
m_useTree = false;
}
*/
// so we know if we are the first read or not...
m_firstKey = m_nextKey;
// . initialize this before scanning the spiderdb recs of an ip
// . it lets us know if we recvd new spider requests for m_scanningIp
// while we were doing the scan
m_gotNewDataForScanningIp = 0;
m_lastListSize = -1;
// let evalIpLoop() know it has not yet tried to read from spiderdb
m_didRead = false;
// reset this
int32_t maxWinners = (int32_t)MAX_WINNER_NODES;
if (m_winnerTree.getNumNodes() == 0 &&
!m_winnerTree.set(-1, maxWinners, maxWinners * MAX_REQUEST_SIZE, true, "wintree", NULL,
sizeof(key192_t), -1)) {
m_isPopulatingDoledb = false;
log(LOG_ERROR, "Could not initialize m_winnerTree: %s",mstrerror(g_errno));
logTrace( g_conf.m_logTraceSpider, "END, after winnerTree.set" );
return;
}
if ( ! m_winnerTable.isInitialized() &&
! m_winnerTable.set ( 8 , // uh48 is key
sizeof(key192_t) , // winnertree key is data
64 , // 64 slots initially
NULL ,
0 ,
false , // allow dups?
"wtdedup" ) ) {
m_isPopulatingDoledb = false;
log(LOG_ERROR, "Could not initialize m_winnerTable: %s",mstrerror(g_errno));
logTrace( g_conf.m_logTraceSpider, "END, after winnerTable.set" );
return;
}
// clear it before evaluating this ip so it is empty
m_winnerTree.clear();
// and table as well now
m_winnerTable.clear();
// reset this as well
m_minFutureTimeMS = 0LL;
m_totalBytesScanned = 0LL;
m_totalNewSpiderRequests = 0LL;
m_lastOverflowFirstIp = 0;
// . look up in spiderdb otherwise and add best req to doledb from ip
// . if it blocks ultimately it calls gotSpiderdbListWrapper() which
// calls this function again with re-entry set to true
if ( ! evalIpLoop () ) {
logTrace( g_conf.m_logTraceSpider, "END, after evalIpLoop" );
return ;
}
// oom error? i've seen this happen and we end up locking up!
if ( g_errno ) {
log( "spider: evalIpLoop: %s", mstrerror(g_errno) );
m_isPopulatingDoledb = false;
logTrace( g_conf.m_logTraceSpider, "END, error after evalIpLoop" );
return;
}
}
}
///////////////////
//
// KEYSTONE FUNCTION
//
// . READ ALL spiderdb recs for IP of m_scanningIp
// . add winner to doledb
// . called ONLY by populateDoledbFromWaitingTree()
//
// . continually scan spiderdb requests for a particular ip, m_scanningIp
// . compute the best spider request to spider next
// . add it to doledb
// . getNextIpFromWaitingTree() must have been called to set m_scanningIp
// otherwise m_bestRequestValid might not have been reset to false
//
///////////////////
bool SpiderColl::evalIpLoop ( ) {
logTrace( g_conf.m_logTraceSpider, "BEGIN" );
//testWinnerTreeKey ( );
// sanity
if ( m_scanningIp == 0 || m_scanningIp == -1 ) gbshutdownLogicError();
// are we trying to exit? some firstip lists can be quite long, so
// terminate here so all threads can return and we can exit properly
if (g_process.isShuttingDown()) {
logTrace( g_conf.m_logTraceSpider, "END, shutting down" );
return true;
}
bool useCache = true;
const CollectionRec *cr = g_collectiondb.getRec ( m_collnum );
// did our collection rec get deleted? since we were doing a read
// the SpiderColl will have been preserved in that case but its
// m_deleteMyself flag will have been set.
if ( tryToDeleteSpiderColl ( this ,"6" ) ) return false;
// if doing site or page quotes for the sitepages or domainpages
// url filter expressions, we can't muck with the cache because
// we end up skipping the counting part.
if ( ! cr )
useCache = false;
if ( cr && cr->m_urlFiltersHavePageCounts )
useCache = false;
if ( m_countingPagesIndexed )
useCache = false;
// assume not from cache
if ( useCache ) {
// if this ip is in the winnerlistcache use that. it saves us a lot of time.
key96_t cacheKey;
cacheKey.n0 = m_scanningIp;
cacheKey.n1 = 0;
char *doleBuf = NULL;
size_t doleBufSize;
//g_spiderLoop.m_winnerListCache.verify();
FxBlobCacheLock<int32_t> rcl(g_spiderLoop.m_winnerListCache);
bool inCache = g_spiderLoop.m_winnerListCache.lookup(m_scanningIp, (void**)&doleBuf, &doleBufSize);
if ( inCache ) {
int32_t crc = hash32 ( doleBuf + 4 , doleBufSize - 4 );
char ipbuf[16];
logDebug( g_conf.m_logDebugSpider, "spider: GOT %zu bytes of SpiderRequests "
"from winnerlistcache for ip %s ptr=%p crc=%" PRIu32,
doleBufSize,
iptoa(m_scanningIp,ipbuf),
doleBuf,
crc);
//copy doleBuf out from cache so we can release the lock
SafeBuf sb;
sb.safeMemcpy(doleBuf,doleBufSize);
rcl.unlock();
// now add the first rec m_doleBuf into doledb's tree
// and re-add the rest back to the cache with the same key.
bool rc = addDoleBufIntoDoledb(&sb,true);
logTrace( g_conf.m_logTraceSpider, "END, after addDoleBufIntoDoledb. returning %s", rc ? "true" : "false" );
return rc;
}
}
top:
// did our collection rec get deleted? since we were doing a read
// the SpiderColl will have been preserved in that case but its
// m_deleteMyself flag will have been set.
if ( tryToDeleteSpiderColl ( this, "4" ) ) {
logTrace( g_conf.m_logTraceSpider, "END, after tryToDeleteSpiderColl (4)" );
return false;
}
// if first time here, let's do a read first
if ( ! m_didRead ) {
// reset list size to 0
m_list.reset();
// assume we did a read now
m_didRead = true;
// reset some stuff
m_lastScanningIp = 0;
// reset these that need to keep track of requests for
// the same url that might span two spiderdb lists or more
m_lastSreqUh48 = 0LL;
// do a read. if it blocks it will recall this loop
if ( ! readListFromSpiderdb () ) {
logTrace( g_conf.m_logTraceSpider, "END, readListFromSpiderdb returned false" );
return false;
}
}
for(;;) {
// did our collection rec get deleted? since we were doing a read
// the SpiderColl will have been preserved in that case but its
// m_deleteMyself flag will have been set.
if ( tryToDeleteSpiderColl ( this, "5" ) ) {
// pretend to block since we got deleted!!!
logTrace( g_conf.m_logTraceSpider, "END, after tryToDeleteSpiderColl (5)" );
return false;
}
// . did reading the list from spiderdb have an error?
// . i guess we don't add to doledb then
if ( g_errno ) {
log(LOG_ERROR,"spider: Had error getting list of urls from spiderdb: %s.",mstrerror(g_errno));
// save mem
m_list.freeList();
logTrace( g_conf.m_logTraceSpider, "END, g_errno %" PRId32, g_errno );
return true;
}
// if we started reading, then assume we got a fresh list here
logDebug( g_conf.m_logDebugSpider, "spider: back from msg5 spiderdb read2 of %" PRId32" bytes (cn=%" PRId32")",
m_list.getListSize(), (int32_t)m_collnum );
// . set the winning request for all lists we read so far
// . if m_countingPagesIndexed is true this will just fill in
// quota info into m_localTable...
scanListForWinners();
// if list not empty, keep reading!
if(m_list.isEmpty())
break;
// update m_nextKey for successive reads of spiderdb by
// calling readListFromSpiderdb()
key128_t lastKey = *(key128_t *)m_list.getLastKey();
// we're already done with this ip
int64_t lastUh48 = Spiderdb::getUrlHash48(&lastKey);
if (lastUh48 == 0xffffffffffffLL) {
break;
}
// crazy corruption?
if ( lastKey < m_nextKey ) {
char ipbuf[16];
log(LOG_WARN, "spider: got corruption. spiderdb keys out of order for "
"collnum=%" PRId32" for evaluation of firstip=%s so terminating evaluation of that firstip." ,
(int32_t)m_collnum, iptoa(m_scanningIp,ipbuf));
// this should result in an empty list read for
// m_scanningIp in spiderdb
m_nextKey = m_endKey;
}
else {
m_nextKey = Spiderdb::makeLastKey(Spiderdb::getFirstIp(&lastKey), ++lastUh48);
}
// . watch out for wrap around
// . normally i would go by this to indicate that we are
// done reading, but there's some bugs... so we go
// by whether our list is empty or not for now
if(m_nextKey < lastKey)
m_nextKey = lastKey;
// reset list to save mem
m_list.reset();
// read more! return if it blocked
if(!readListFromSpiderdb())
return false;
// we got a list without blocking
}
// . we are all done if last list read was empty
// . if we were just counting pages for quota, do a 2nd pass!
if ( m_countingPagesIndexed ) {
// do not do again.
m_countingPagesIndexed = false;
// start at the top again
m_nextKey = Spiderdb::makeFirstKey(m_scanningIp);
// this time m_localTable should have the quota info in it so
// getUrlFilterNum() can use that
m_didRead = false;
// do the 2nd pass. read list from the very top.
goto top;
}
// free list to save memory
m_list.freeList();
// . add all winners if we can in m_winnerTree into doledb
// . if list was empty, then reading is all done so take the winner we
// got from all the lists we did read for this IP and add him
// to doledb
// . if no winner exists, then remove m_scanningIp from m_waitingTree
// so we do not waste our time again. if url filters change then
// waiting tree will be rebuilt and we'll try again... or if
// a new spider request or reply for this ip comes in we'll try
// again as well...
// . this returns false if blocked adding to doledb using msg1
if ( ! addWinnersIntoDoledb() ) {
logTrace( g_conf.m_logTraceSpider, "END, returning false. After addWinnersIntoDoledb" );
return false;
}
// we are done...
logTrace( g_conf.m_logTraceSpider, "END, all done" );
return true;
}
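// runs in a job-scheduler thread: reads the next chunk (up to
// SR_READ_SIZE bytes) of spiderdb records in [m_nextKey,m_endKey]
// from sqlite into m_list. errors are conveyed via g_errno.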
void SpiderColl::getSpiderdbListWrapper(void *state) {
SpiderColl *sc = static_cast<SpiderColl*>(state);
if(!SpiderdbRdbSqliteBridge::getList(sc->m_cr->m_collnum,
&sc->m_list,
sc->m_nextKey,
sc->m_endKey,
SR_READ_SIZE)) {
if(!g_errno) {
g_errno = EIO; //imprecise
}
logTrace( g_conf.m_logTraceSpider, "END, got io-error from sqlite" );
return;
}
}
void SpiderColl::gotSpiderdbListWrapper(void *state, job_exit_t exit_type) {
SpiderColl *THIS = (SpiderColl *)state;
// are we trying to exit? some firstip lists can be quite long, so
// terminate here so all threads can return and we can exit properly
if (g_process.isShuttingDown()) {
return;
}
// return if that blocked
if (!THIS->evalIpLoop()) {
return;
}
// we are done, re-enter populateDoledbFromWaitingTree()
// gotta set m_isPopulatingDoledb to false first or the re-entry
// will be a no-op
THIS->m_isPopulatingDoledb = false;
THIS->populateDoledbFromWaitingTree();
}
// . this is ONLY CALLED from evalIpLoop() above
// . returns false if blocked, true otherwise
// . returns true and sets g_errno on error
bool SpiderColl::readListFromSpiderdb ( ) {
logTrace( g_conf.m_logTraceSpider, "BEGIN" );
if ( ! m_waitingTreeKeyValid ) gbshutdownLogicError();
if ( ! m_scanningIp ) gbshutdownLogicError();
const CollectionRec *cr = g_collectiondb.getRec ( m_collnum );
if ( ! cr ) {
log(LOG_ERROR,"spider: lost collnum %" PRId32,(int32_t)m_collnum);
g_errno = ENOCOLLREC;
logTrace( g_conf.m_logTraceSpider, "END, ENOCOLLREC" );
return true;
}
// i guess we are always restricted to an ip, because
// populateWaitingTreeFromSpiderdb calls its own msg5.
int32_t firstIp0 = Spiderdb::getFirstIp(&m_nextKey);
// sanity
if ( m_scanningIp != firstIp0 ) gbshutdownLogicError();
// sometimes we already have this ip in doledb/doleiptable
// already and somehow we try to scan spiderdb for it anyway
if (isInDoledbIpTable(firstIp0)) gbshutdownLogicError();
// if it got zapped from the waiting tree by the time we read the list
if (!isInWaitingTable(m_scanningIp)) {
logTrace( g_conf.m_logTraceSpider, "END, IP no longer in waitingTree" );
return true;
}
// sanity check
if (!m_waitingTree.getNode(0, (char *)&m_waitingTreeKey)) {
// it gets removed because addSpiderReply() calls addToWaitingTree
// and replaces the node we are scanning with one that has a better
// time, an earlier time, even though that time may have come and
// we are scanning it now. perhaps addToWaitingTree() should ignore
// the ip if it equals m_scanningIp?
log(LOG_WARN, "spider: waiting tree key removed while reading list for %s (%" PRId32")", cr->m_coll,(int32_t)m_collnum);
logTrace( g_conf.m_logTraceSpider, "END, waitingTree node was removed" );
return true;
}
// . read in a replacement SpiderRequest to add to doledb from
// this ip
// . get the list of spiderdb records
// . do not include cache, those results are old and will mess
// us up
if (g_conf.m_logDebugSpider ) {
// gotta print each one individually because KEYSTR
// uses a static buffer to store the string
SafeBuf tmp;
char ipbuf[16];
tmp.safePrintf("spider: readListFromSpiderdb: calling msg5: ");
tmp.safePrintf("firstKey=%s ", KEYSTR(&m_firstKey,sizeof(key128_t)));
tmp.safePrintf("endKey=%s ", KEYSTR(&m_endKey,sizeof(key128_t)));
tmp.safePrintf("nextKey=%s ", KEYSTR(&m_nextKey,sizeof(key128_t)));
tmp.safePrintf("firstip=%s ", iptoa(m_scanningIp,ipbuf));
tmp.safePrintf("(cn=%" PRId32")",(int32_t)m_collnum);
log(LOG_DEBUG,"%s",tmp.getBufStart());
}
// log this better
char ipbuf[16];
logDebug(g_conf.m_logDebugSpider, "spider: readListFromSpiderdb: firstip=%s key=%s",
iptoa(m_scanningIp,ipbuf), KEYSTR( &m_nextKey, sizeof( key128_t ) ) );
// . read the list from the local sqlite db in a worker thread
// . if the read takes a LONG time then this will not complete
//   for a while and we can end up timing out the round. so try
//   checking for m_gettingList in spiderDoledUrls() and setting
//   m_lastSpiderCouldLaunch
if (g_jobScheduler.submit(getSpiderdbListWrapper, gotSpiderdbListWrapper, this, thread_type_spider_read, 0)) {
return false;
}
// unable to submit job
getSpiderdbListWrapper(this);
// note its return
logDebug( g_conf.m_logDebugSpider, "spider: back from msg5 spiderdb read of %" PRId32" bytes",m_list.getListSize());
// got it without blocking. maybe all in tree or in cache
logTrace( g_conf.m_logTraceSpider, "END, didn't block" );
return true;
}
static int32_t s_lastIn = 0;
static int32_t s_lastOut = 0;
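// the two statics above are a one-entry hit/miss cache for
// isFirstIpInOverflowList(); scanListForWinners() resets them
// to 0 whenever the overflow list changes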
bool SpiderColl::isFirstIpInOverflowList(int32_t firstIp) const {
if ( ! m_overflowList ) return false;
if ( firstIp == 0 || firstIp == -1 ) return false;
if ( firstIp == s_lastIn ) return true;
if ( firstIp == s_lastOut ) return false;
for ( int32_t oi = 0 ; ; oi++ ) {
// stop at end
if ( ! m_overflowList[oi] ) break;
// an ip of zero is end of the list
if ( m_overflowList[oi] == firstIp ) {
s_lastIn = firstIp;
return true;
}
}
s_lastOut = firstIp;
return false;
}
// . ADDS top X winners to m_winnerTree
// . this is ONLY CALLED from evalIpLoop() above
// . scan m_list that we read from spiderdb for m_scanningIp IP
// . adds a request to m_winnerTree if it is better than what is
//   already in there from previous lists for this IP
bool SpiderColl::scanListForWinners ( ) {
// if list is empty why are we here?
if ( m_list.isEmpty() ) return true;
// don't proceed if we're shutting down
if (g_process.isShuttingDown()) {
return true;
}
// ensure we point to the top of the list
m_list.resetListPtr();
// get this
int64_t nowGlobalMS = gettimeofdayInMilliseconds();//Local();
uint32_t nowGlobal = nowGlobalMS / 1000;
const SpiderReply *srep = NULL;
int64_t srepUh48 = 0;
// if we are continuing from another list...
if ( m_lastReplyValid ) {
srep = (SpiderReply *)m_lastReplyBuf;
srepUh48 = srep->getUrlHash48();
}
// show list stats
char ipbuf[16];
logDebug( g_conf.m_logDebugSpider, "spider: readListFromSpiderdb: got list of size %" PRId32" for firstip=%s",
m_list.getListSize(), iptoa(m_scanningIp,ipbuf) );
// if we don't read minRecSizes worth of data that MUST indicate
// there is no more data to read. put this theory to the test
// before we use it to indicate an end-of-list condition.
if ( m_list.getListSize() > 0 &&
m_lastScanningIp == m_scanningIp &&
m_lastListSize < (int32_t)SR_READ_SIZE &&
m_lastListSize >= 0 ) {
log(LOG_ERROR,"spider: shucks. spiderdb reads not full.");
}
m_lastListSize = m_list.getListSize();
m_lastScanningIp = m_scanningIp;
m_totalBytesScanned += m_list.getListSize();
if ( m_list.isEmpty() ) {
logDebug( g_conf.m_logDebugSpider, "spider: failed to get rec for ip=%s", iptoa(m_scanningIp,ipbuf) );
}
int32_t firstIp = m_waitingTreeKey.n0 & 0xffffffff;
key128_t finalKey;
int32_t recCount = 0;
// loop over all serialized spiderdb records in the list
for ( ; ! m_list.isExhausted() ; ) {
// stop coring on empty lists
if ( m_list.isEmpty() ) break;
// get spiderdb rec in its serialized form
char *rec = m_list.getCurrentRec();
// count it
recCount++;
// sanity
memcpy ( (char *)&finalKey , rec , sizeof(key128_t) );
// skip to next guy
m_list.skipCurrentRecord();
// skip negative (delete) records; their low key bit is clear
if ( (rec[0] & 0x01) == 0x00 ) {
logf(LOG_DEBUG,"spider: got negative spider rec");
continue;
}
// if it's a SpiderReply, save it for the upcoming requests
if ( ! Spiderdb::isSpiderRequest ( (key128_t *)rec ) ) {
// see if this is the most recent one
const SpiderReply *tmp = (SpiderReply *)rec;
// . MDW: we have to detect corrupt replies up here so
// they do not become the winning reply because
// their date is in the future!!
if ( tmp->m_spideredTime > nowGlobal + 1 ) {
if ( m_cr->m_spiderCorruptCount == 0 ) {
log( LOG_WARN, "spider: got corrupt time spiderReply in scan uh48=%" PRId64" httpstatus=%" PRId32" datasize=%" PRId32" (cn=%" PRId32") ip=%s",
tmp->getUrlHash48(),
(int32_t)tmp->m_httpStatus,
tmp->m_dataSize,
(int32_t)m_collnum,
iptoa(m_scanningIp,ipbuf));
}
m_cr->m_spiderCorruptCount++;
// don't nuke it just for that...
//srep = NULL;
continue;
}
// . this is -1 on corruption
// . i've seen -31757, 21... etc for bad http replies
// in the qatest123 doc cache... so turn off for that
if ( tmp->m_httpStatus >= 1000 ) {
if ( m_cr->m_spiderCorruptCount == 0 ) {
log(LOG_WARN, "spider: got corrupt 3 spiderReply in scan uh48=%" PRId64" httpstatus=%" PRId32" datasize=%" PRId32" (cn=%" PRId32") ip=%s",
tmp->getUrlHash48(),
(int32_t)tmp->m_httpStatus,
tmp->m_dataSize,
(int32_t)m_collnum,
iptoa(m_scanningIp,ipbuf));
}
m_cr->m_spiderCorruptCount++;
// don't nuke it just for that...
//srep = NULL;
continue;
}
// bad langid?
if ( ! getLanguageAbbr (tmp->m_langId) ) {
log(LOG_WARN, "spider: got corrupt 4 spiderReply in scan uh48=%" PRId64" langid=%" PRId32" (cn=%" PRId32") ip=%s",
tmp->getUrlHash48(),
(int32_t)tmp->m_langId,
(int32_t)m_collnum,
iptoa(m_scanningIp,ipbuf));
m_cr->m_spiderCorruptCount++;
//srep = NULL;
continue;
}
// if we are corrupt, skip us
if ( tmp->getRecSize() > (int32_t)MAX_SP_REPLY_SIZE )
continue;
// if we have a more recent reply already, skip this
if ( srep &&
srep->getUrlHash48() == tmp->getUrlHash48() &&
srep->m_spideredTime >= tmp->m_spideredTime )
continue;
// otherwise, assign it
srep = tmp;
srepUh48 = srep->getUrlHash48();
continue;
}
// cast it
SpiderRequest *sreq = (SpiderRequest *)rec;
// skip if our twin or another shard should handle it
if ( ! isAssignedToUs ( sreq->m_firstIp ) ) {
continue;
}
int64_t uh48 = sreq->getUrlHash48();
// null out srep if no match
if ( srep && srepUh48 != uh48 ) {
srep = NULL;
}
// . ignore docid-based requests if spidered the url afterwards
// . these are one-hit wonders
// . once done they can be deleted
if ( sreq->m_isPageReindex && srep && srep->m_spideredTime > sreq->m_addedTime ) {
logDebug( g_conf.m_logDebugSpider, "spider: skipping9 %s", sreq->m_url );
continue;
}
// if it's a reply-less new url spiderrequest, count it.
// avoid counting query reindex requests (fake firstip).
if ( ! srep && m_lastSreqUh48 != uh48 && ! sreq->m_fakeFirstIp ) {
m_totalNewSpiderRequests++;
}
int32_t cblock = ipdom ( sreq->m_firstIp );
bool countIt = true;
// reset page inlink count on url request change
if ( m_lastSreqUh48 != uh48 ) {
m_pageNumInlinks = 0;
m_lastCBlockIp = 0;
}
if ( cblock == m_lastCBlockIp ||
// do not count manually added spider requests
sreq->m_isAddUrl || sreq->m_isInjecting ||
// 20 is good enough
m_pageNumInlinks >= 20 ) {
countIt = false;
}
if ( countIt ) {
int32_t ca;
for ( ca = 0 ; ca < m_pageNumInlinks ; ca++ ) {
if ( m_cblocks[ ca ] == cblock ) {
break;
}
}
// if found in our list, do not count it, already did
if ( ca < m_pageNumInlinks ) {
countIt = false;
}
}
if ( countIt ) {
m_cblocks[m_pageNumInlinks] = cblock;
m_pageNumInlinks++;
if ( m_pageNumInlinks > 20 ) gbshutdownAbort(true);
}
// set this now. it does increase with each request. so
// initial requests will not see the full # of inlinks.
sreq->m_pageNumInlinks = (uint8_t)m_pageNumInlinks;
m_lastSreqUh48 = uh48;
m_lastCBlockIp = cblock;
// only add firstip if manually added and not fake
//
// just calculating page counts? if the url filters are based
// on the # of pages indexed per ip or subdomain/site then
// we have to maintain a page count table. sitepages.
//
if ( m_countingPagesIndexed ) {
// only add dom/site hash seeds if it is
// a fake firstIp to avoid double counting seeds
if ( sreq->m_fakeFirstIp ) continue;
// count the manual additions separately. mangle their
// hash with 0x123456 so they are separate.
if ( (sreq->m_isAddUrl || sreq->m_isInjecting) &&
// unique votes per seed
uh48 != m_lastReqUh48a ) {
// do not repeat count the same url
m_lastReqUh48a = uh48;
// sanity
if ( ! sreq->m_siteHash32) gbshutdownAbort(true); //isj: this abort is questionable
// do a little magic because we count
// seeds as "manual adds" as well as normal pg
int32_t h32;
h32 = sreq->m_siteHash32 ^ 0x123456;
m_siteIndexedDocumentCount.addScore(h32);
}
// unique votes per url for quota
if ( uh48 == m_lastReqUh48b ) continue;
// update this to ensure unique voting
m_lastReqUh48b = uh48;
// now count pages indexed below here
if ( ! srep ) continue;
if ( srepUh48 == m_lastRepUh48 ) continue;
m_lastRepUh48 = srepUh48;
//if ( ! srep ) continue;
// TODO: what if srep->m_isIndexedINValid is set????
if ( ! srep->m_isIndexed ) continue;
// keep count per site and firstip
m_siteIndexedDocumentCount.addScore(sreq->m_siteHash32,1);
const int32_t *tmpNum = (const int32_t *)m_siteIndexedDocumentCount.getValue( &( sreq->m_siteHash32 ) );
logDebug( g_conf.m_logDebugSpider, "spider: sitequota: got %" PRId32" indexed docs for site from "
"firstip of %s from url %s", tmpNum ? *tmpNum : -1,
iptoa(sreq->m_firstIp,ipbuf),
sreq->m_url );
continue;
}
// if the spiderrequest has a fake firstip that means it
// was injected without doing a proper ip lookup for speed.
// xmldoc.cpp will check for m_fakeFirstIp and it that is
// set in the spiderrequest it will simply add a new request
// with the correct firstip. it will be a completely different
// spiderrequest key then. so no need to keep the "fakes".
// it will log the EFAKEFIRSTIP error msg.
if ( sreq->m_fakeFirstIp && srep && srep->m_spideredTime > sreq->m_addedTime ) {
logDebug( g_conf.m_logDebugSpider, "spider: skipping6 %s", sreq->m_url );
continue;
}
if (!sreq->m_urlIsDocId) {
//delete request if it is an url we don't want to index
Url url;
url.set(sreq->m_url);
if (url.hasNonIndexableExtension(TITLEREC_CURRENT_VERSION) || isUrlBlocked(url,NULL)) {
if (srep && !srep->m_isIndexedINValid && srep->m_isIndexed) {
log(LOG_DEBUG, "Found unwanted/non-indexable URL '%s' in spiderdb. Force deleting it", sreq->m_url);
sreq->m_forceDelete = true;
} else {
log(LOG_DEBUG, "Found unwanted/non-indexable URL '%s' in spiderdb. Deleting it", sreq->m_url);
SpiderdbUtil::deleteRecord(m_collnum, sreq->m_firstIp, uh48);
continue;
}
}
}
if ( sreq->isCorrupt() ) {
if ( m_cr->m_spiderCorruptCount == 0 )
log( LOG_WARN, "spider: got corrupt xx spiderRequest in scan because url is %s (cn=%" PRId32")",
sreq->m_url,(int32_t)m_collnum);
m_cr->m_spiderCorruptCount++;
continue;
}
if ( sreq->m_dataSize > (int32_t)sizeof(SpiderRequest) ) {
if ( m_cr->m_spiderCorruptCount == 0 )
log( LOG_WARN, "spider: got corrupt 11 spiderRequest in scan because rectoobig u=%s (cn=%" PRId32")",
sreq->m_url,(int32_t)m_collnum);
m_cr->m_spiderCorruptCount++;
continue;
}
int32_t delta = sreq->m_addedTime - nowGlobal;
if ( delta > 86400 ) {
static bool s_first = true;
if ( m_cr->m_spiderCorruptCount == 0 || s_first ) {
s_first = false;
log( LOG_WARN, "spider: got corrupt 6 spiderRequest in scan because added time is %" PRId32" (delta=%" PRId32" which is well into the future. url=%s (cn=%i)",
(int32_t)sreq->m_addedTime, delta, sreq->m_url, (int)m_collnum);
}
m_cr->m_spiderCorruptCount++;
continue;
}
// update SpiderRequest::m_siteNumInlinks to most recent value
int32_t sni = sreq->m_siteNumInlinks;
{
ScopedLock sl(m_sniTableMtx);
// get the # of inlinks to the site from our table
const uint64_t *val = (const uint64_t *)m_sniTable.getValue32(sreq->m_siteHash32);
// use the most recent sni from this table
if (val)
sni = (int32_t)((*val) >> 32);
// if SpiderRequest is forced then m_siteHash32 is 0!
else if (srep && srep->m_spideredTime >= sreq->m_addedTime)
sni = srep->m_siteNumInlinks;
}
// assign
sreq->m_siteNumInlinks = sni;
// store error count in request so xmldoc knows what it is
// and can increment it and re-add it to its spiderreply if
// it gets another error
if ( srep ) {
// . assign this too from latest reply - smart compress
// . this WAS SpiderReply::m_pubdate so it might be
// set to a non-zero value that is wrong now... but
// not a big deal!
sreq->m_contentHash32 = srep->m_contentHash32;
// if we tried it before
sreq->m_hadReply = true;
}
// . get the url filter we match
// . if this is slow see the TODO below in dedupSpiderdbList()
// which can pre-store these values assuming url filters do
// not change and siteNumInlinks is about the same.
int32_t ufn = ::getUrlFilterNum(sreq, srep, nowGlobal, false, m_cr, false, -1);
// sanity check
if ( ufn == -1 ) {
log( LOG_WARN, "failed to match url filter for url='%s' coll='%s'", sreq->m_url, m_cr->m_coll );
g_errno = EBADENGINEER;
return true;
}
// set the priority (might be the same as old)
int32_t priority = m_cr->m_spiderPriorities[ufn];
// now get rid of negative priorities since we added a
// separate force delete checkbox in the url filters
if ( priority < 0 ) priority = 0;
if ( priority >= MAX_SPIDER_PRIORITIES) gbshutdownLogicError();
logDebug( g_conf.m_logDebugSpider, "spider: got ufn=%" PRId32" for %s (%" PRId64")",
ufn, sreq->m_url, sreq->getUrlHash48() );
if ( srep )
logDebug(g_conf.m_logDebugSpider, "spider: lastspidered=%" PRIu32, srep->m_spideredTime );
// spiders disabled for this row in url filters?
if ( m_cr->m_maxSpidersPerRule[ufn] <= 0 ) {
continue;
}
// skip if banned (unless we need to delete it from the index)
if (m_cr->m_forceDelete[ufn]) {
// but if it is currently indexed we have to delete it,
// so only skip if it's not indexed
if (!(srep && srep->m_isIndexed)) {
continue;
}
logDebug( g_conf.m_logDebugSpider, "spider: force delete ufn=%" PRId32" url='%s'", ufn, sreq->m_url );
// force it to a delete
sreq->m_forceDelete = true;
}
int64_t spiderTimeMS = getSpiderTimeMS(sreq, ufn, srep, nowGlobalMS);
// sanity
if ( (int64_t)spiderTimeMS < 0 ) {
log( LOG_WARN, "spider: got corrupt 2 spiderRequest in scan (cn=%" PRId32")",
(int32_t)m_collnum);
continue;
}
// save this shit for storing in doledb
sreq->m_ufn = ufn;
sreq->m_priority = priority;
// if it is in future, skip it and just set m_futureTime and
// and we will update the waiting tree
// with an entry based on that future time if the winnerTree
// turns out to be empty after we've completed our scan
if ( spiderTimeMS > nowGlobalMS ) {
// if futuretime is zero set it to this time
if ( ! m_minFutureTimeMS )
m_minFutureTimeMS = spiderTimeMS;
// otherwise we get the MIN of all future times
else if ( spiderTimeMS < m_minFutureTimeMS )
m_minFutureTimeMS = spiderTimeMS;
if ( g_conf.m_logDebugSpider )
log(LOG_DEBUG,"spider: skippingx %s",sreq->m_url);
continue;
}
// we can't have negative priorities at this point because
// the s_ufnTree uses priority as part of the key so it
// can get the top 100 or so urls for a firstip to avoid
// having to hit spiderdb for every one!
//if ( priority < 0 ) gbshutdownLogicError();
// bail if it is locked! we now call
// msg12::confirmLockAcquisition() after we get the lock,
// which deletes the doledb record from doledb and doleiptable
// rightaway and adds a "0" entry into the waiting tree so
// that evalIpLoop() repopulates doledb again with that
// "firstIp". this way we can spider multiple urls from the
// same ip at the same time.
int64_t key = makeLockTableKey ( sreq );
logDebug( g_conf.m_logDebugSpider, "spider: checking uh48=%" PRId64" lockkey=%" PRId64" used=%" PRId32,
uh48, key, g_spiderLoop.getLockCount() );
// MDW
if (g_spiderLoop.isLocked(key)) {
logDebug( g_conf.m_logDebugSpider, "spider: skipping url lockkey=%" PRId64" in lock table sreq.url=%s",
key, sreq->m_url );
continue;
}
// make key
key192_t wk = makeWinnerTreeKey( firstIp ,
priority ,
spiderTimeMS ,
uh48 );
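// the winner-tree key packs (firstIp, priority, spiderTimeMS, uh48),
// in that order (see parseWinnerTreeKey() usage below); per the
// comparison below, smaller keys rank as HIGHER-priority winners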
// assume our added time is the first time this url was added
sreq->m_discoveryTime = sreq->m_addedTime;
// if this url is already in the winnerTree then either we
// replace it or we skip ourselves.
//
// watch out for dups in winner tree, the same url can have
// multiple spiderTimeMses somehow...
// as well, resulting in different priorities...
// actually the dedup table could map to a priority and a node
// so we can kick out a lower priority version of the same url.
int32_t winSlot = m_winnerTable.getSlot ( &uh48 );
if ( winSlot >= 0 ) {
const key192_t *oldwk = (const key192_t *)m_winnerTable.getValueFromSlot ( winSlot );
SpiderRequest *wsreq = (SpiderRequest *)m_winnerTree.getData(0,(const char *)oldwk);
if ( wsreq ) {
// and the min added time as well!
// get the oldest timestamp so
// gbssDiscoveryTime will be accurate.
if ( sreq->m_discoveryTime < wsreq->m_discoveryTime )
wsreq->m_discoveryTime = sreq->m_discoveryTime;
if ( wsreq->m_discoveryTime < sreq->m_discoveryTime )
sreq->m_discoveryTime = wsreq->m_discoveryTime;
}
// are we lower priority? (or equal)
// smaller keys are HIGHER priority.
if(KEYCMP( (const char *)&wk, (const char *)oldwk, sizeof(key192_t)) >= 0)
{
continue;
}
// no need to remove it from m_winnerTable: it's keyed on the
// same uh48 and gets overwritten when we add below
// otherwise we supplant it. remove old key from tree.
m_winnerTree.deleteNode ( 0 , (char *)oldwk , false);
// supplant in table and tree... just add below...
}
// get the top MAX_WINNER_NODES spider requests by priority/time/etc.
int32_t maxWinners = (int32_t)MAX_WINNER_NODES; // 40
//@todo BR: Why max winners based on bytes scanned??
// if we've scanned less than 10MB of spiderdb requests, raise to 400
if ( m_totalBytesScanned < 10000000 ) maxWinners = 400;
// only put one doledb record into winner tree if
// the list is pretty short. otherwise, we end up caching
// too much. granted, we only cache for about 2 mins.
// mdw: for testing take this out!
if ( m_totalBytesScanned < 25000 ) maxWinners = 1;
// sanity. make sure read is somewhat hefty for our maxWinners=1 thing
static_assert(SR_READ_SIZE >= 500000, "ensure read size is big enough");
{
ScopedLock sl(m_winnerTree.getLock());
// only compare to min winner in tree if tree is full
if (m_winnerTree.getNumUsedNodes_unlocked() >= maxWinners) {
// get that key
int64_t tm1 = spiderTimeMS;
// get the spider time of lowest scoring req in tree
int64_t tm2 = m_tailTimeMS;
// if they are both overdue, make them the same
if (tm1 < nowGlobalMS) tm1 = 1;
if (tm2 < nowGlobalMS) tm2 = 1;
// skip spider request if its time is past winner's
if (tm1 > tm2)
continue;
if (tm1 < tm2)
goto gotNewWinner;
// if tied, use priority
if (priority < m_tailPriority)
continue;
if (priority > m_tailPriority)
goto gotNewWinner;
// if tied, use actual times. assuming both<nowGlobalMS
if (spiderTimeMS > m_tailTimeMS)
continue;
if (spiderTimeMS < m_tailTimeMS)
goto gotNewWinner;
// all tied, keep it the same i guess
continue;
// otherwise, add the new winner in and remove the old
gotNewWinner:
// get lowest scoring node in tree
int32_t tailNode = m_winnerTree.getLastNode_unlocked();
// from table too
m_winnerTable.removeKey(&m_tailUh48);
// delete the tail so new spiderrequest can enter
m_winnerTree.deleteNode_unlocked(tailNode, true);
}
}
// sometimes the firstip in its key does not match the
// firstip in the record!
if ( sreq->m_firstIp != firstIp ) {
log(LOG_ERROR,"spider: request %s firstip does not match "
"firstip in key collnum=%i",sreq->m_url,
(int)m_collnum);
log(LOG_ERROR,"spider: ip1=%s",iptoa(sreq->m_firstIp,ipbuf));
log(LOG_ERROR,"spider: ip2=%s",iptoa(firstIp,ipbuf));
continue;
}
// . add to table which allows us to ensure same url not
// repeated in tree
// . just skip if fail to add...
if ( ! m_winnerTable.addKey ( &uh48 , &wk ) ) {
log(LOG_WARN,"spider: skipping. could not add to winnerTable. %s. ip=%s", sreq->m_url,iptoa(m_scanningIp,ipbuf) );
continue;
}
// use an individually allocated buffer for each spiderrequest
// so if it gets removed from the tree the memory can be freed by
// the tree, which "owns" the data because m_winnerTree.set()
// set ownsData to true
int32_t need = sreq->getRecSize();
char *newMem = (char *)mdup ( sreq , need , "sreqbuf" );
if ( ! newMem ) {
log(LOG_WARN,"spider: skipping. could not alloc newMem. %s. ip=%s", sreq->m_url,iptoa(m_scanningIp,ipbuf) );
continue;
}
{
ScopedLock sl(m_winnerTree.getLock());
// add it to the tree of the top urls to spider
m_winnerTree.addNode_unlocked(0, (char *)&wk, (char *)newMem, need);
// set new tail priority and time for next compare
if (m_winnerTree.getNumUsedNodes_unlocked() >= maxWinners) {
// for the worst node in the tree...
int32_t tailNode = m_winnerTree.getLastNode_unlocked();
if (tailNode < 0) gbshutdownAbort(true);
// set new tail parms
const key192_t *tailKey = reinterpret_cast<const key192_t *>(m_winnerTree.getKey_unlocked(tailNode));
// parse out the tail's ip/priority/time/uh48 for future compares
parseWinnerTreeKey(tailKey, &m_tailIp, &m_tailPriority, &m_tailTimeMS, &m_tailUh48);
// sanity
if (m_tailIp != firstIp) {
gbshutdownAbort(true);
}
}
}
}
// if no spiderreply for the current url, invalidate this
m_lastReplyValid = false;
// if read is not yet done, save the reply in case next list needs it
if ( srep ) { // && ! m_isReadDone ) {
int32_t rsize = srep->getRecSize();
if((size_t)rsize > sizeof(m_lastReplyBuf))
gbshutdownAbort(true);
memmove ( m_lastReplyBuf, srep, rsize );
m_lastReplyValid = true;
}
logDebug(g_conf.m_logDebugSpider, "spider: Checked list of %" PRId32" spiderdb bytes (%" PRId32" recs) "
"for winners for firstip=%s. winnerTreeUsedNodes=%" PRId32" #newreqs=%" PRId64,
m_list.getListSize(), recCount,
iptoa(m_scanningIp,ipbuf), m_winnerTree.getNumUsedNodes(), m_totalNewSpiderRequests );
// reset any errno cuz we're just a cache
g_errno = 0;
/////
//
// BEGIN maintain firstip overflow list
//
/////
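// the overflow list is a flat int32 array using two sentinels:
// a 0 entry terminates the list and a -1 entry marks a reusable
// empty slot (see the add/remove loops below)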
bool overflow = false;
// don't add any more outlinks to this firstip after we
// have 10M spider requests for it.
// lower for testing
//if ( m_totalNewSpiderRequests > 1 )
// @todo BR: Another hardcoded limit..
if ( m_totalNewSpiderRequests > 10000000 )
overflow = true;
// need space
if ( overflow && ! m_overflowList ) {
int32_t need = OVERFLOWLISTSIZE*4;
m_overflowList = (int32_t *)mmalloc(need,"list");
// don't crash if the alloc failed; the loops below treat a
// NULL list as empty
if ( m_overflowList ) m_overflowList[0] = 0;
}
//
// ensure firstip is in the overflow list if we overflowed
int32_t emptySlot = -1;
bool found = false;
int32_t oi;
// if we dealt with this last round, we're done
if ( m_lastOverflowFirstIp == firstIp )
return true;
m_lastOverflowFirstIp = firstIp;
if ( overflow ) {
logDebug( g_conf.m_logDebugSpider, "spider: firstip %s overflowing with %" PRId32" new reqs",
iptoa(firstIp,ipbuf), (int32_t)m_totalNewSpiderRequests );
}
for ( oi = 0 ; ; oi++ ) {
// sanity
if ( ! m_overflowList ) break;
// an ip of zero is end of the list
if ( ! m_overflowList[oi] ) break;
// if already in there, we are done
if ( m_overflowList[oi] == firstIp ) {
found = true;
break;
}
// -1 means empty slot
if ( m_overflowList[oi] == -1 ) emptySlot = oi;
}
// if we need to add it...
if ( overflow && ! found && m_overflowList ) {
log(LOG_DEBUG,"spider: adding %s to overflow list",iptoa(firstIp,ipbuf));
// reset this little cache thingy
s_lastOut = 0;
// take the empty slot if there is one
if ( emptySlot >= 0 )
m_overflowList[emptySlot] = firstIp;
// or add to new slot. this is #defined to 200 last check
else if ( oi+1 < OVERFLOWLISTSIZE ) {
m_overflowList[oi] = firstIp;
m_overflowList[oi+1] = 0;
}
else
log(LOG_ERROR,"spider: could not add firstip %s to "
"overflow list, full.", iptoa(firstIp,ipbuf));
}
// ensure firstip is NOT in the overflow list if we are ok
for ( int32_t oi2 = 0 ; ! overflow ; oi2++ ) {
// sanity
if ( ! m_overflowList ) break;
// an ip of zero is end of the list
if ( ! m_overflowList[oi2] ) break;
// skip if not a match
if ( m_overflowList[oi2] != firstIp ) continue;
// take it out of list
m_overflowList[oi2] = -1;
log(LOG_DEBUG, "spider: removing %s from overflow list",iptoa(firstIp,ipbuf));
// reset this little cache thingy
s_lastIn = 0;
break;
}
/////
//
// END maintain firstip overflow list
//
/////
// ok, m_winnerTree now holds the best requests for this firstip!
return true;
}
// . this is ONLY CALLED from evalIpLoop() above
// . add another 0 entry into waiting tree, unless we had no winner
// . add winner in here into doledb
// . returns false if blocked and doledWrapper() will be called
// . returns true and sets g_errno on error
bool SpiderColl::addWinnersIntoDoledb ( ) {
if ( g_errno ) {
log(LOG_ERROR,"spider: got error when trying to add winner to doledb: "
"%s",mstrerror(g_errno));
return true;
}
// gotta check this again since shutdown may have been initiated
// while we were off scanning, so now the tree might be unwritable
if (g_process.isShuttingDown()) {
return true;
}
// ok, all done if nothing to add to doledb. i guess we were misled
// that firstIp had something ready for us. maybe the url filters
// table changed to filter/ban them all. if a new request/reply comes
// in for this firstIp then it will re-add an entry to waitingtree and
// we will re-scan spiderdb. if we had something to spider but it was
// in the future the m_minFutureTimeMS will be non-zero, and we deal
// with that below...
if (m_winnerTree.isEmpty() && ! m_minFutureTimeMS ) {
// if we received new incoming requests while we were
// scanning, which is happening for some crawls, then do
// not nuke! just repeat later in populateDoledbFromWaitingTree
if ( m_gotNewDataForScanningIp ) {
if ( g_conf.m_logDebugSpider )
log(LOG_DEBUG, "spider: received new requests, not "
"nuking misleading key");
return true;
}
// note it - this can happen if no more to spider right now!
char ipbuf[16];
if ( g_conf.m_logDebugSpider )
log(LOG_WARN, "spider: nuking misleading waitingtree key "
"firstIp=%s", iptoa(m_scanningIp,ipbuf));
ScopedLock sl(m_waitingTree.getLock());
m_waitingTree.deleteNode_unlocked(0, (char *)&m_waitingTreeKey, true);
m_waitingTreeKeyValid = false;
// note it
uint64_t timestamp64 = m_waitingTreeKey.n1;
timestamp64 <<= 32;
timestamp64 |= m_waitingTreeKey.n0 >> 32;
int32_t firstIp = m_waitingTreeKey.n0 & 0xffffffff;
if ( g_conf.m_logDebugSpider )
log(LOG_DEBUG,"spider: removed2 time=%" PRId64" ip=%s from "
"waiting tree. nn=%" PRId32".",
timestamp64, iptoa(firstIp,ipbuf),
m_waitingTree.getNumUsedNodes_unlocked());
removeFromWaitingTable(firstIp);
return true;
}
// i've seen this happen, wtf?
if (m_winnerTree.isEmpty() && m_minFutureTimeMS ) {
// this will update the waiting tree key with minFutureTimeMS
addDoleBufIntoDoledb ( NULL , false );
return true;
}
///////////
//
// make winner tree into doledb list to add
//
///////////
// first 4 bytes is the offset of the next doledb record to add to
// doledb, so that we do not have to re-add the whole dolebuf to
// the cache each time, which would churn it inefficiently
SafeBuf doleBuf;
doleBuf.pushLong(4);
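// dole buffer layout (see validateDoleBuf() below):
//   [int32 jump] offset of the next record to pop; starts at 4
// then, for each winner:
//   [key96_t doleKey][int32 recSize][SpiderRequest, recSize bytes]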
// i am seeing dup uh48's in the m_winnerTree
int32_t firstIp = m_waitingTreeKey.n0 & 0xffffffff;
{
ScopedLock sl(m_winnerTree.getLock());
int32_t ntn = m_winnerTree.getNumNodes_unlocked();
HashTableX dedup;
dedup.set(8, 0, (int32_t)2 * ntn, NULL, 0, false, "windt");
int32_t added = 0;
for (int32_t node = m_winnerTree.getFirstNode_unlocked();
node >= 0; node = m_winnerTree.getNextNode_unlocked(node)) {
// get data for that
const SpiderRequest *sreq2 = reinterpret_cast<const SpiderRequest *>(m_winnerTree.getData_unlocked(node));
// sanity
if (sreq2->m_firstIp != firstIp) gbshutdownAbort(true);
//if ( sreq2->m_spiderTimeMS < 0 ) gbshutdownAbort(true);
if (sreq2->m_ufn < 0) gbshutdownAbort(true);
if (sreq2->m_priority == -1) gbshutdownAbort(true);
// check for errors
bool hadError = false;
// parse it up
int32_t winIp;
int32_t winPriority;
int64_t winSpiderTimeMS;
int64_t winUh48;
const key192_t *winKey = reinterpret_cast<const key192_t *>(m_winnerTree.getKey_unlocked(node));
parseWinnerTreeKey(winKey, &winIp, &winPriority, &winSpiderTimeMS, &winUh48);
// sanity
if (winIp != firstIp) gbshutdownAbort(true);
if (winUh48 != sreq2->getUrlHash48()) gbshutdownAbort(true);
// make the doledb key
key96_t doleKey = Doledb::makeKey(winPriority, winSpiderTimeMS / 1000, winUh48, false);
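// the doledb key orders by priority, then spider time (seconds),
// then uh48; the trailing false makes it a positive (non-delete) key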
// dedup. if we add dups the problem is is that they
// overwrite the key in doledb yet the doleiptable count
// remains undecremented and doledb is empty and never
// replenished because the firstip can not be added to
// waitingTree because doleiptable count is > 0. this was
// causing spiders to hang for collections. i am not sure
// why we should be getting dups in winnertree because they
// have the same uh48 and that is the key in the tree.
if (dedup.isInTable(&winUh48)) {
log(LOG_WARN, "spider: got dup uh48=%" PRIu64" dammit", winUh48);
continue;
}
// count it
added++;
// do not allow dups
dedup.addKey(&winUh48);
// store doledb key first
if (!doleBuf.safeMemcpy(&doleKey, sizeof(key96_t)))
hadError = true;
// then size of spiderrequest
if (!doleBuf.pushLong(sreq2->getRecSize()))
hadError = true;
// then the spiderrequest encapsulated
if (!doleBuf.safeMemcpy(sreq2, sreq2->getRecSize()))
hadError = true;
// note any error
if (hadError) {
log(LOG_ERROR,"spider: error making doledb list: %s",
mstrerror(g_errno));
return true;
}
}
}
return addDoleBufIntoDoledb ( &doleBuf , false );
}
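// sanity-walk a dole buffer: the leading jump offset must land
// exactly on a record boundary and every encapsulated request's
// size field must match its record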
bool SpiderColl::validateDoleBuf(const SafeBuf *doleBuf) {
const char *doleBufEnd = doleBuf->getBufPtr();
// get offset
const char *pstart = doleBuf->getBufStart();
const char *p = pstart;
int32_t jump = *(int32_t *)p;
p += 4;
// sanity
if ( jump < 4 || jump > doleBuf->length() )
gbshutdownCorrupted();
bool gotIt = false;
for ( ; p < doleBuf->getBufPtr() ; ) {
if ( p == pstart + jump )
gotIt = true;
// first is doledbkey
p += sizeof(key96_t);
// then size of spider request
int32_t recSize = *(int32_t *)p;
p += 4;
// the spider request encapsulated
SpiderRequest *sreq3;
sreq3 = (SpiderRequest *)p;
// point "p" to next spiderrequest
if ( recSize != sreq3->getRecSize() ) gbshutdownCorrupted();
p += recSize;//sreq3->getRecSize();
// sanity
if ( p > doleBufEnd ) gbshutdownCorrupted();
if ( p < pstart ) gbshutdownCorrupted();
}
if ( ! gotIt ) gbshutdownCorrupted();
return true;
}
bool SpiderColl::addDoleBufIntoDoledb ( SafeBuf *doleBuf, bool isFromCache ) {
////////////////////
//
// UPDATE WAITING TREE ENTRY
//
// Normally the "spidertime" is 0 for a firstIp. This will make it
// a future time if it is not yet due for spidering.
//
////////////////////
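// the waiting-tree key packs the scheduled spider time (ms) and
// the firstIp (see how it is decomposed below):
//   spiderTimeMS = (n1 << 32) | (n0 >> 32)
//   firstIp      = n0 & 0xffffffff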
int32_t firstIp = m_waitingTreeKey.n0 & 0xffffffff;
char ipbuf[16]; //for various log purposes
{
ScopedLock sl(m_waitingTree.getLock());
/// @todo ALC could we avoid locking winnerTree?
ScopedLock sl2(m_winnerTree.getLock());
// sanity check. how did this happen? it messes up our crawl!
// maybe a doledb add went through? so we should add again?
int32_t wn = m_waitingTree.getNode_unlocked(0, (char *)&m_waitingTreeKey);
if (wn < 0) {
log(LOG_ERROR,"spider: waiting tree key removed while reading list for "
"%s (%" PRId32")", m_coll, (int32_t)m_collnum);
return true;
}
// if best request has a future spiderTime, at least update
// the wait tree with that since we will not be doling this request
// right now.
if (m_winnerTree.isEmpty_unlocked() && m_minFutureTimeMS && !isFromCache) {
// save memory
m_winnerTree.reset_unlocked();
m_winnerTable.reset();
// if in the process of being added to doledb or in doledb...
if (isInDoledbIpTable(firstIp)) {
// sanity i guess. remove this line if it hits this!
log(LOG_ERROR, "spider: wtf????");
//gbshutdownLogicError();
return true;
}
// before you set a time too far into the future, if we
// did receive new spider requests, entertain those
if (m_gotNewDataForScanningIp &&
// we had twitter.com with a future spider date
// on the pe2 cluster but we kept hitting this, so
// don't do this anymore if we scanned a ton of bytes
// like we did for twitter.com because it uses all the
// resources when we can like 150MB of spider requests
// for a single firstip
m_totalBytesScanned < 30000) {
logDebug(g_conf.m_logDebugSpider, "spider: received new requests, not updating waiting tree with future time");
return true;
}
// get old time
uint64_t oldSpiderTimeMS = m_waitingTreeKey.n1;
oldSpiderTimeMS <<= 32;
oldSpiderTimeMS |= (m_waitingTreeKey.n0 >> 32);
// delete old node
if (wn >= 0) {
m_waitingTree.deleteNode_unlocked(wn, false);
}
// invalidate
m_waitingTreeKeyValid = false;
key96_t wk2 = makeWaitingTreeKey(m_minFutureTimeMS, firstIp);
logDebug(g_conf.m_logDebugSpider, "spider: scan replacing waitingtree key oldtime=%" PRIu32" newtime=%" PRIu32" firstip=%s",
(uint32_t)(oldSpiderTimeMS / 1000LL), (uint32_t)(m_minFutureTimeMS / 1000LL), iptoa(firstIp,ipbuf));
// this should never fail since we deleted one above
m_waitingTree.addKey_unlocked(&wk2);
logDebug(g_conf.m_logDebugSpider, "spider: RE-added time=%" PRId64" ip=%s to waiting tree node",
m_minFutureTimeMS, iptoa(firstIp,ipbuf));
// keep the table in sync now with the time
addToWaitingTable(firstIp, m_minFutureTimeMS);
return true;
}
}
char *doleBufEnd = doleBuf->getBufPtr();
// add it to doledb ip table now so that waiting tree does not
// immediately get another spider request from this same ip added
// to it while the msg4 is out. but if the add fails we totally bail
// with g_errno set
// . MDW: now we have a list of doledb records in a SafeBuf:
// . scan the requests in safebuf
// get offset
char *p = doleBuf->getBufStart();
int32_t jump = *(int32_t *)p;
// sanity
if ( jump < 4 || jump > doleBuf->length() ) {
gbshutdownCorrupted(); }
// the jump includes itself
p += jump;
//for ( ; p < m_doleBuf.getBuf() ; ) {
// save it
char *doledbRec = p;
// first is doledbkey
p += sizeof(key96_t);
// then size of spider request
p += 4;
// the spider request encapsulated
SpiderRequest *sreq3;
sreq3 = (SpiderRequest *)p;
// point "p" to next spiderrequest
p += sreq3->getRecSize();
// sanity
if ( p > doleBufEnd ) gbshutdownCorrupted();
// for caching logic below, set this
int32_t doledbRecSize = sizeof(key96_t) + 4 + sreq3->getRecSize();
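// e.g. a 100-byte SpiderRequest yields a 116-byte doledb record:
// 12-byte key96_t + 4-byte size field + the request itself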
// process sreq3 my incrementing the firstip count in
// m_doledbIpTable
if ( !addToDoledbIpTable(sreq3) ) return true;
// now cache the REST of the spider requests to speed up scanning.
// better than adding 400 recs per firstip to doledb because
// msg5's call to RdbTree::getList() is way faster.
// even if m_doleBuf is from the cache, re-add it to lose the
// top rec.
// allow this to add a 0 length record otherwise we keep the same
// old url in here and keep spidering it over and over again!
// remove from cache? if we added the last spider request in the
// cached dolebuf to doledb then remove it from cache so it's not
// a cached empty dolebuf and we recompute it not using the cache.
if ( p >= doleBufEnd ) {
FxBlobCacheLock<int32_t> rcl(g_spiderLoop.m_winnerListCache);
g_spiderLoop.m_winnerListCache.remove(firstIp);
} else {
// insert (or replace) the list in the cache
char *x = doleBuf->getBufStart();
// the new offset is the next record after the one we
// just added to doledb
int32_t newJump = (int32_t)(p - x);
int32_t oldJump = *(int32_t *)x;
// update the jump in place; the cache makes its own copy of the
// buffer when we insert below, so modifying it here is safe
*(int32_t *)x = newJump;
if ( newJump >= doleBuf->length() ) gbshutdownCorrupted();
if ( newJump < 4 ) gbshutdownCorrupted();
logDebug(g_conf.m_logDebugSpider, "spider: rdbcache: updating %" PRId32" bytes of SpiderRequests "
"to winnerlistcache for ip %s oldjump=%" PRId32" newJump=%" PRId32" ptr=%p",
doleBuf->length(),iptoa(firstIp,ipbuf),oldJump, newJump, x);
//validateDoleBuf ( doleBuf );
// re-insert the modified buffer so the cache reflects the new
// jump offset; insert() replaces any entry already present for
// this firstIp
FxBlobCacheLock<int32_t> rcl(g_spiderLoop.m_winnerListCache);
g_spiderLoop.m_winnerListCache.insert(firstIp, doleBuf->getBufStart(), doleBuf->length());
}
// keep it on stack now that doledb is tree-only
RdbList tmpList;
tmpList.setFromPtr ( doledbRec , doledbRecSize , RDB_DOLEDB );
// now that doledb is tree-only and never dumps to disk, just
// add it directly
g_doledb.getRdb()->addList(m_collnum, &tmpList);
logDebug(g_conf.m_logDebugSpider, "spider: adding doledb tree node size=%" PRId32, doledbRecSize);
int32_t storedFirstIp = (m_waitingTreeKey.n0) & 0xffffffff;
// log it
if ( g_conf.m_logDebugSpcache ) {
uint64_t spiderTimeMS = m_waitingTreeKey.n1;
spiderTimeMS <<= 32;
spiderTimeMS |= (m_waitingTreeKey.n0 >> 32);
logf(LOG_DEBUG,"spider: removing doled waitingtree key"
" spidertime=%" PRIu64" firstIp=%s "
,spiderTimeMS,
iptoa(storedFirstIp,ipbuf)
);
}
{
ScopedLock sl(m_waitingTree.getLock());
// before adding to doledb remove from waiting tree so we do not try
// to readd to doledb...
m_waitingTree.deleteNode_unlocked(0, (char *)&m_waitingTreeKey, true);
removeFromWaitingTable(storedFirstIp);
}
// invalidate
m_waitingTreeKeyValid = false;
// note that ip as being in dole table
if ( g_conf.m_logDebugSpider )
log(LOG_WARN, "spider: added best sreq for ip=%s to doletable AND "
"removed from waiting table",
iptoa(firstIp,ipbuf));
// save memory
m_winnerTree.reset();
m_winnerTable.reset();
//validateDoleBuf( doleBuf );
// add did not block
return true;
}
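// . compute the earliest time (in ms) at which this request may be
//   spidered: the max of the request's added time, the last download
//   from this firstIp plus the per-rule ip-wait, that same download
//   time plus any crawl-delay known for the domain, and, if we have
//   a reply, the last spidered time plus the per-rule respider
//   frequency (m_spiderFreqs, in days)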
uint64_t SpiderColl::getSpiderTimeMS(const SpiderRequest *sreq, int32_t ufn, const SpiderReply *srep, int64_t nowMS) {
// . get the scheduled spiderTime for it
// . assume this SpiderRequest never been successfully spidered
int64_t spiderTimeMS = ((uint64_t)sreq->m_addedTime) * 1000LL;
// if injecting for first time, use that!
if (!srep && sreq->m_isInjecting) {
return spiderTimeMS;
}
if (sreq->m_isPageReindex) {
int64_t nextReindexTimeMS = m_lastReindexTimeMS + m_cr->m_spiderReindexDelayMS;
if (nextReindexTimeMS > nowMS) {
return nextReindexTimeMS;
}
m_lastReindexTimeMS = nowMS;
return nextReindexTimeMS;
}
// to avoid hammering an ip, get last time we spidered it...
RdbCacheLock rcl(m_lastDownloadCache);
int64_t lastMS = m_lastDownloadCache.getLongLong(m_collnum, sreq->m_firstIp, -1, true);
rcl.unlock();
// -1 means not found
if ((int64_t)lastMS == -1) {
lastMS = 0;
}
// sanity
if ((int64_t)lastMS < -1) {
log(LOG_ERROR,"spider: corrupt last time in download cache. nuking.");
lastMS = 0;
}
// min time we can spider it
int64_t minSpiderTimeMS1 = lastMS + m_cr->m_spiderIpWaits[ufn];
/////////////////////////////////////////////////
/////////////////////////////////////////////////
// crawldelay table check!!!!
/////////////////////////////////////////////////
/////////////////////////////////////////////////
int64_t minSpiderTimeMS2 = 0;
{
ScopedLock sl(m_cdTableMtx);
int32_t *cdp = (int32_t *)m_cdTable.getValue(&sreq->m_domHash32);
if (cdp && *cdp >= 0) minSpiderTimeMS2 = lastMS + *cdp;
}
// ensure min
if ( spiderTimeMS < minSpiderTimeMS1 ) spiderTimeMS = minSpiderTimeMS1;
if ( spiderTimeMS < minSpiderTimeMS2 ) spiderTimeMS = minSpiderTimeMS2;
// if no reply, use that
if (!srep) {
return spiderTimeMS;
}
// if this is not the first try, then re-compute the spiderTime
// based on that last time
// sanity check
if ( srep->m_spideredTime <= 0 ) {
// a lot of times these are corrupt! wtf???
//spiderTimeMS = minSpiderTimeMS;
return spiderTimeMS;
//gbshutdownAbort(true);
}
// compute new spiderTime for this guy, in seconds
int64_t waitInSecs = (uint64_t)(m_cr->m_spiderFreqs[ufn]*3600*24.0);
// when it was spidered
int64_t lastSpideredMS = ((uint64_t)srep->m_spideredTime) * 1000;
// . when we last attempted to spider it... (base time)
// . a lastSpideredMS of 0 indicates never spidered (first time)
int64_t minSpiderTimeMS3 = lastSpideredMS + (waitInSecs * 1000LL);
// ensure min
if ( spiderTimeMS < minSpiderTimeMS3 ) spiderTimeMS = minSpiderTimeMS3;
// sanity
if ( (int64_t)spiderTimeMS < 0 ) { gbshutdownAbort(true); }
return spiderTimeMS;
}
// . decrement priority
// . will also set m_sc->m_nextDoledbKey
// . will also set m_sc->m_msg5StartKey
void SpiderColl::devancePriority() {
// try next
m_pri2 = m_pri2 - 1;
// how can this happen?
if ( m_pri2 < -1 ) m_pri2 = -1;
// bogus?
if ( m_pri2 < 0 ) return;
// set to next priority otherwise
m_nextDoledbKey = m_nextKeys [m_pri2];
// and the read key
m_msg5StartKey = m_nextDoledbKey;
}
void SpiderColl::setPriority(int32_t pri) {
m_pri2 = pri;
m_nextDoledbKey = m_nextKeys [ m_pri2 ];
m_msg5StartKey = m_nextDoledbKey;
}
bool SpiderColl::tryToDeleteSpiderColl ( SpiderColl *sc , const char *msg ) {
// if not being deleted return false
if ( ! sc->m_deleteMyself ) return false;
// otherwise always return true
if ( sc->m_isLoading ) {
log(LOG_INFO, "spider: deleting sc=%p for collnum=%" PRId32" "
"waiting3",
sc,(int32_t)sc->m_collnum);
return true;
}
// if ( sc->m_gettingWaitingTreeList ) {
// log(LOG_INFO, "spider: deleting sc=%p for collnum=%" PRId32"
//"waiting6",
// (int32_t)sc,(int32_t)sc->m_collnum);
// return true;
// }
// there's still a core of someone trying to write to something
// in "sc" so we have to try to fix that. somewhere in xmldoc.cpp
// or spider.cpp. everyone should get sc from cr every time i'd think
log(LOG_INFO, "spider: deleting sc=%p for collnum=%" PRId32" (msg=%s)",
sc,(int32_t)sc->m_collnum,msg);
// . make sure nobody has it
// . cr might be NULL because Collectiondb.cpp::deleteRec2() might
// have nuked it
//CollectionRec *cr = sc->m_cr;
// use fake ptrs for easier debugging
//if ( cr ) cr->m_spiderColl = (SpiderColl *)0x987654;//NULL;
mdelete ( sc , sizeof(SpiderColl),"postdel1");
delete ( sc );
return true;
}
// . returns false with g_errno set on error
// . Rdb.cpp should call this when it receives a doledb key
// . when trying to add a SpiderRequest to the waiting tree we first check
// the doledb table to see if doledb already has an sreq from this firstIp
// . therefore, we should add the ip to the dole table before we launch the
// Msg4 request to add it to doledb, that way we don't add a bunch from the
// same firstIP to doledb
bool SpiderColl::addToDoledbIpTable(const SpiderRequest *sreq) {
ScopedLock sl(m_doledbIpTableMtx);
// update how many per ip we got doled
int32_t *score = (int32_t *)m_doledbIpTable.getValue32 ( sreq->m_firstIp );
// debug point
if ( g_conf.m_logDebugSpider ){//&&1==2 ) { // disable for now, spammy
int64_t uh48 = sreq->getUrlHash48();
int64_t pdocid = sreq->getParentDocId();
int32_t ss = 1;
if ( score ) ss = *score + 1;
// if for some reason this collides with another key
// already in doledb then our counts are off
char ipbuf[16];
log(LOG_DEBUG, "spider: added to doletbl uh48=%" PRIu64" parentdocid=%" PRIu64" "
"ipdolecount=%" PRId32" ufn=%" PRId32" priority=%" PRId32" firstip=%s",
uh48,pdocid,ss,(int32_t)sreq->m_ufn,(int32_t)sreq->m_priority,
iptoa(sreq->m_firstIp,ipbuf));
}
// we had a score there already, so inc it
if ( score ) {
// inc it
*score = *score + 1;
// sanity check
if ( *score <= 0 ) { gbshutdownAbort(true); }
// only one per ip!
// not any more! we allow MAX_WINNER_NODES per ip!
char ipbuf[16];
if ( *score > MAX_WINNER_NODES )
log(LOG_ERROR,"spider: crap. had %" PRId32" recs in doledb for %s "
"from %s."
"how did this happen?",
(int32_t)*score,m_coll,iptoa(sreq->m_firstIp,ipbuf));
// now we log it too
if ( g_conf.m_logDebugSpider )
log(LOG_DEBUG,"spider: added ip=%s to doleiptable "
"(score=%" PRId32")",
iptoa(sreq->m_firstIp,ipbuf),*score);
}
else {
// ok, add new slot
int32_t val = 1;
char ipbuf[16];
if ( ! m_doledbIpTable.addKey ( &sreq->m_firstIp , &val ) ) {
// log it, this is bad
log(LOG_ERROR,"spider: failed to add ip %s to dole ip tbl",
iptoa(sreq->m_firstIp,ipbuf));
// return false with g_errno set on error
return false;
}
// now we log it too
if ( g_conf.m_logDebugSpider )
log(LOG_DEBUG,"spider: added ip=%s to doleiptable "
"(score=1)",iptoa(sreq->m_firstIp,ipbuf));
}
// . these priority slots in doledb are not empty
// . unmark individual priority buckets
// . do not skip them when scanning for urls to spider
int32_t pri = sreq->m_priority;
// reset scan for this priority in doledb
m_nextKeys[pri] = Doledb::makeFirstKey2 ( pri );
return true;
}
void SpiderColl::removeFromDoledbIpTable(int32_t firstIp) {
ScopedLock sl(m_doledbIpTableMtx);
// . decrement doledb table ip count for firstIp
// . update how many per ip we got doled
int32_t *score = (int32_t *)m_doledbIpTable.getValue32 ( firstIp );
// wtf! how did this spider without being doled?
if ( ! score ) {
//if ( ! srep->m_fromInjectionRequest )
char ipbuf[16];
log(LOG_ERROR, "spider: corruption. received spider reply whose "
"ip has no entry in dole ip table. firstip=%s",
iptoa(firstIp,ipbuf));
return;
}
// reduce it
*score = *score - 1;
// now we log it too
if ( g_conf.m_logDebugSpider ) {
char ipbuf[16];
log(LOG_DEBUG,"spider: removed ip=%s from doleiptable (newcount=%" PRId32")",
iptoa(firstIp,ipbuf), *score);
}
// wtf!
if ( *score < 0 ) { gbshutdownAbort(true); }
// all done?
if ( g_conf.m_logDebugSpider ) {
// log that too!
char ipbuf[16];
logf(LOG_DEBUG,"spider: discounting firstip=%s to %" PRId32,
iptoa(firstIp,ipbuf),*score);
}
// remove if zero. do this last so we don't dereference *score
// after its slot has been removed from the table.
if ( *score == 0 ) {
// this can fail if writes are disabled on this hashtablex
// because it is saving
m_doledbIpTable.removeKey ( &firstIp );
}
}
int32_t SpiderColl::getDoledbIpTableCount() const {
ScopedLock sl(m_doledbIpTableMtx);
return m_doledbIpTable.getNumUsedSlots();
}
bool SpiderColl::isInDoledbIpTable(int32_t firstIp) const {
ScopedLock sl(m_doledbIpTableMtx);
return m_doledbIpTable.isInTable(&firstIp);
}
bool SpiderColl::isDoledbIpTableEmpty() const {
ScopedLock sl(m_doledbIpTableMtx);
return m_doledbIpTable.isEmpty();
}
void SpiderColl::clearDoledbIpTable() {
ScopedLock sl(m_doledbIpTableMtx);
m_doledbIpTable.clear();
}
std::vector<uint32_t> SpiderColl::getDoledbIpTable() const {
ScopedLock sl(m_doledbIpTableMtx);
std::vector<uint32_t> r;
r.reserve(m_doledbIpTable.getNumUsedSlots());
for(int slotNumber=0; slotNumber<m_doledbIpTable.getNumSlots(); slotNumber++) {
if(!m_doledbIpTable.isEmpty(slotNumber)) {
r.push_back(*(const uint32_t*)m_doledbIpTable.getKeyFromSlot(slotNumber));
}
}
return r;
}
bool SpiderColl::addToWaitingTable(int32_t firstIp, int64_t timeMs) {
ScopedLock sl(m_waitingTableMtx);
return m_waitingTable.addKey(&firstIp, &timeMs);
}
bool SpiderColl::getFromWaitingTable(int32_t firstIp, int64_t *timeMs) {
ScopedLock sl(m_waitingTableMtx);
int32_t ws = m_waitingTable.getSlot(&firstIp);
if (ws < 0) {
return false;
}
*timeMs = m_waitingTable.getScore64FromSlot(ws);
return true;
}
void SpiderColl::removeFromWaitingTable(int32_t firstIp) {
ScopedLock sl(m_waitingTableMtx);
m_waitingTable.removeKey(&firstIp);
}
int32_t SpiderColl::getWaitingTableCount() const {
ScopedLock sl(m_waitingTableMtx);
return m_waitingTable.getNumUsedSlots();
}
bool SpiderColl::isInWaitingTable(int32_t firstIp) const {
ScopedLock sl(m_waitingTableMtx);
return m_waitingTable.isInTable(&firstIp);
}
bool SpiderColl::setWaitingTableSize(int32_t numSlots) {
ScopedLock sl(m_waitingTableMtx);
return m_waitingTable.setTableSize(numSlots, NULL, 0);
}
void SpiderColl::clearWaitingTable() {
ScopedLock sl(m_waitingTableMtx);
m_waitingTable.clear();
}