#include "SpiderColl.h"
#include "Spider.h"
#include "SpiderLoop.h"
#include "Doledb.h"
#include "Collectiondb.h"
#include "UdpServer.h"
#include "Stats.h"
#include "SafeBuf.h"
#include "Repair.h" //g_repairMode
#include "Process.h"
#include "JobScheduler.h"
#include "XmlDoc.h"
#include "ip.h"
#include "Conf.h"
#include "Mem.h"
#include "SpiderdbRdbSqliteBridge.h"
#include "SpiderdbUtil.h"
#include "UrlBlockCheck.h"
#include "ScopedLock.h"
#include "Sanity.h"
#include "Errno.h"


#define OVERFLOWLISTSIZE 200

// How large chunks of spiderdb to load in for each read
#define SR_READ_SIZE (512*1024)

// Hint for size allocation to m_winnerTree
#define MAX_REQUEST_SIZE (sizeof(SpiderRequest)+MAX_URL_LEN+1)
#define MAX_SP_REPLY_SIZE sizeof(SpiderReply)

static key96_t makeWaitingTreeKey ( uint64_t spiderTimeMS , int32_t firstIp ) {
	// sanity
	if ( ((int64_t)spiderTimeMS) < 0 ) gbshutdownAbort(true);
	// make the wait tree key
	key96_t wk;
	wk.n1 = (spiderTimeMS>>32);
	wk.n0 = (spiderTimeMS&0xffffffff);
	wk.n0 <<= 32;
	wk.n0 |= (uint32_t)firstIp;
	// sanity
	if ( wk.n1 & 0x8000000000000000LL ) gbshutdownAbort(true);
	return wk;
}
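
// Editor's note (illustrative sketch, derived from the packing above rather
// than a separate spec): the 96-bit waiting-tree key lays out as
//
//   wk.n1 = spiderTimeMS >> 32                        // upper 32 bits of time
//   wk.n0 = (spiderTimeMS << 32) | (uint32_t)firstIp  // lower 32 bits of time, then ip
//
// so keys sort by earliest spider time first and firstIp second, and the
// decode used by makeWaitingTable()/printWaitingTree() below is simply:
//
//   uint64_t spiderTimeMS = ((uint64_t)wk.n1 << 32) | (wk.n0 >> 32);
//   int32_t  firstIp      = (int32_t)(wk.n0 & 0xffffffff);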

/////////////////////////
///////////////////////// SpiderColl
/////////////////////////

void SpiderColl::setCollectionRec ( CollectionRec *cr ) {
	m_cr = cr;
}

CollectionRec *SpiderColl::getCollectionRec() {
	return m_cr;
}

const CollectionRec *SpiderColl::getCollectionRec() const {
	return m_cr;
}

SpiderColl::SpiderColl(CollectionRec *cr) {
	m_overflowList = NULL;
	m_lastOverflowFirstIp = 0;
	m_deleteMyself = false;
	m_isLoading = false;
	m_gettingWaitingTreeList = false;
	m_lastScanTime = 0;
	m_isPopulatingDoledb = false;
	m_numAdded = 0;
	m_numBytesScanned = 0;
	m_lastPrintCount = 0;
	m_siteListIsEmptyValid = false;
	m_cr = NULL;
	// re-set this to min and set m_waitingTreeNeedsRebuild to true
	// when the admin updates the url filters page
	m_waitingTreeNeedsRebuild = false;
	m_waitingTreeNextKey.setMin();
	m_spidersOut = 0;
	m_coll[0] = '\0';

	// PVS-Studio
	m_lastReplyValid = false;
	memset(m_lastReplyBuf, 0, sizeof(m_lastReplyBuf));
	m_didRead = false;
	m_siteListIsEmpty = false;
	m_tailIp = 0;
	m_tailPriority = 0;
	m_tailTimeMS = 0;
	m_tailUh48 = 0;
	m_minFutureTimeMS = 0;
	m_gettingWaitingTreeList = false;
	m_lastScanTime = 0;
	m_waitingTreeNeedsRebuild = false;
	m_numAdded = 0;
	m_numBytesScanned = 0;
	m_collnum = -1;
	m_lastReindexTimeMS = 0;
	m_countingPagesIndexed = false;
	m_lastReqUh48a = 0;
	m_lastReqUh48b = 0;
	m_lastRepUh48 = 0;
	m_waitingTreeKeyValid = false;
	m_scanningIp = 0;
	m_gotNewDataForScanningIp = 0;
	m_lastListSize = 0;
	m_lastScanningIp = 0;
	m_totalBytesScanned = 0;
	m_deleteMyself = false;
	m_pri2 = 0;
	memset(m_outstandingSpiders, 0, sizeof(m_outstandingSpiders));
	m_overflowList = NULL;
	m_totalNewSpiderRequests = 0;
	m_lastSreqUh48 = 0;
	memset(m_cblocks, 0, sizeof(m_cblocks));
	m_pageNumInlinks = 0;
	m_lastCBlockIp = 0;
	m_lastOverflowFirstIp = 0;

	reset();

	// reset this
	memset ( m_outstandingSpiders , 0 , 4 * MAX_SPIDER_PRIORITIES );

	m_collnum = cr->m_collnum;
	strcpy(m_coll, cr->m_coll);
	m_cr = cr;

	// set first doledb scan key
	m_nextDoledbKey.setMin();

	// mark it as loading so it can't be deleted while loading
	m_isLoading = true;

	// . load its tables from disk
	// . careful: i think this might call quickpoll and we could get a parm
	//   update to delete this spider coll!
	load();

	m_isLoading = false;
}

// load the tables that we set when m_doInitialScan is true
bool SpiderColl::load ( ) {
	// error?
	int32_t err = 0;
	// make the dir
	const char *coll = g_collectiondb.getCollName(m_collnum);
	// sanity check
	if ( ! coll || coll[0]=='\0' ) {
		log(LOG_ERROR,"spider: bad collnum of %" PRId32,(int32_t)m_collnum);
		g_errno = ENOCOLLREC;
		return false;
		//gbshutdownAbort(true);
	}

	// reset this once
	m_isPopulatingDoledb = false;

	// keep it kinda low if we got a ton of collections
	int32_t maxMem = 15000;
	int32_t maxNodes = 500;
	if ( g_collectiondb.getNumRecsUsed() > 500 ) {
		maxNodes = 100;
		maxMem = maxNodes * 20;
	}

	if ( ! m_lastDownloadCache.init ( maxMem ,      // maxcachemem
	                                  8 ,           // fixed data size (MS)
	                                  maxNodes ,    // max nodes
	                                  "downcache",  // dbname
	                                  false ,       // load from disk?
	                                  12 ,          // key size (firstip)
	                                  -1 )) {       // numPtrsMax
		log(LOG_WARN, "spider: dcache init failed");
		return false;
	}

	// this has a quickpoll in it, so that quickpoll processes
	// a restart request from crawlbottesting for this collnum which
	// calls Collectiondb::resetColl2() which calls deleteSpiderColl()
	// on THIS spidercoll, but our m_isLoading flag is set
	if (!m_sniTable.set ( 4,8,0,NULL,0,false,"snitbl") )
		return false;
	if (!m_cdTable.set (4,4,0,NULL,0,false,"cdtbl"))
		return false;

	// doledb seems to have like 32000 entries in it
	int32_t numSlots = 0; // was 128000
	if(!m_doledbIpTable.set(4,4,numSlots,NULL,0,false,"doleip"))
		return false;

	// this should grow dynamically...
	if (!m_waitingTable.set (4,8,16,NULL,0,false,"waittbl"))
		return false;

	// . a tree of keys, key is earliestSpiderTime|ip (key=12 bytes)
	// . earliestSpiderTime is 0 if unknown
	// . max nodes is 1M but we should grow dynamically! TODO
	// . let's up this to 5M because we are hitting the limit in some
	//   test runs...
	// . try going to 20M now since we hit it again...
	// . start off at just 10 nodes since we grow dynamically now
	if (!m_waitingTree.set(0, 10, -1, true, "waittree2", "waitingtree", sizeof(key96_t))) {
		return false;
	}
	m_waitingTreeKeyValid = false;
	m_scanningIp = 0;

	// make dir
	char dir[500];
	sprintf(dir,"%scoll.%s.%" PRId32,g_hostdb.m_dir,coll,(int32_t)m_collnum);

	// load in the waiting tree, IPs waiting to get into doledb
	BigFile file;
	file.set ( dir , "waitingtree-saved.dat");
	bool treeExists = file.doesExist();

	// load the tree from the file named "THISDIR/waitingtree-saved.dat"
	if ( treeExists && !m_waitingTree.fastLoad(&file, &m_waitingMem) )
		err = g_errno;

	// init wait table. scan wait tree and add the ips into table.
	if ( ! makeWaitingTable() ) err = g_errno;
	// save it
	g_errno = err;
	// return false on error
	if ( g_errno ) {
		// note it
		log(LOG_WARN,"spider: had error loading initial table: %s", mstrerror(g_errno));
		return false;
	}

	// . do this now just to keep everything somewhat in sync
	// . we lost dmoz.org and could not get it back in because it was
	//   in the doleip table but NOT in doledb!!!
	return makeDoledbIPTable();
}
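
// Editor's note (illustrative, assuming g_hostdb.m_dir ends in a slash as the
// sprintf above implies): for a collection named "main" with collnum 0 the
// waiting tree is loaded from
//
//   <g_hostdb.m_dir>coll.main.0/waitingtree-saved.dat
//
// and load() then rebuilds m_waitingTable and m_doledbIpTable from that tree
// and from doledb so the in-memory tables start out consistent.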

// . scan all spiderRequests in doledb at startup and add them to our tables
// . then, when we scan spiderdb and add to orderTree/urlhashtable it will
//   see that the request is in doledb and set m_doled...
// . to initialize the dole table we quickly scan doledb and add the doledb
//   records to our trees and tables. that way if we receive a SpiderReply()
//   then addSpiderReply() will be able to find the associated SpiderRequest.
//   MAKE SURE to put each spiderrequest into m_doleTable... and into
//   maybe m_urlHashTable too???
// . this should block since we are at startup...
bool SpiderColl::makeDoledbIPTable() {
	log(LOG_DEBUG,"spider: making dole ip table for %s",m_coll);

	key96_t startKey ; startKey.setMin();
	key96_t endKey   ; endKey.setMax();
	key96_t lastKey  ; lastKey.setMin();
	// get a meg at a time
	int32_t minRecSizes = 1024*1024;
	Msg5 msg5;
	RdbList list;

	for (;;) {
		// use msg5 to get the list, should ALWAYS block since no threads
		if (!msg5.getList(RDB_DOLEDB,
		                  m_collnum,
		                  &list,
		                  &startKey,
		                  &endKey,
		                  minRecSizes,
		                  true,   // includeTree?
		                  0,      // startFileNum
		                  -1,     // numFiles
		                  NULL,   // state
		                  NULL,   // callback
		                  0,      // niceness
		                  false,  // err correction?
		                  -1,     // maxRetries
		                  false)) { // isRealMerge
			log(LOG_LOGIC, "spider: getList did not block.");
			return false;
		}

		// shortcut
		int32_t minSize = (int32_t)(sizeof(SpiderRequest) + sizeof(key96_t) + 4 - MAX_URL_LEN);
		// all done if empty
		if (list.isEmpty()) {
			log(LOG_DEBUG,"spider: making dole ip table done.");
			return true;
		}

		// loop over entries in list
		for (list.resetListPtr(); !list.isExhausted(); list.skipCurrentRecord()) {
			// get rec
			const char *rec = list.getCurrentRec();
			// get key
			key96_t k = list.getCurrentKey();

			// skip deletes -- how did this happen?
			if ((k.n0 & 0x01) == 0) {
				continue;
			}

			// check this out
			int32_t recSize = list.getCurrentRecSize();
			// zero?
			if (recSize <= 0) gbshutdownCorrupted();

			// 16 is bad too... wtf is this?
			if (recSize <= 16) {
				continue;
			}

			// crazy?
			if (recSize <= minSize) gbshutdownAbort(true);
			// . doledb key is 12 bytes, followed by a 4 byte datasize
			// . so skip that key and dataSize to point to spider request
			const SpiderRequest *sreq = (const SpiderRequest *)(rec + sizeof(key96_t) + 4);
			// add to dole tables
			if (!addToDoledbIpTable(sreq)) {
				// return false with g_errno set on error
				return false;
			}
		}
		startKey = *(key96_t *)list.getLastKey();
		startKey++;

		// watch out for wrap around
		if (startKey < *(key96_t *)list.getLastKey()) {
			log(LOG_DEBUG,"spider: making dole ip table done.");
			return true;
		}
	}
}
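
// Editor's note (illustrative, derived from the parsing code above, not a new
// on-disk format): a positive doledb record is laid out as
//
//   offset  0 : key96_t  (12 bytes, low bit of n0 set for positive records)
//   offset 12 : int32_t  dataSize
//   offset 16 : SpiderRequest (with its NUL-terminated url)
//
// which is why the scan skips sizeof(key96_t)+4 bytes before casting to
// SpiderRequest, and why recSize must exceed the minSize computed above.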

CollectionRec *SpiderColl::getCollRec() {
	CollectionRec *cr = g_collectiondb.getRec(m_collnum);
	if ( ! cr ) log(LOG_WARN,"spider: lost coll rec");
	return cr;
}

const CollectionRec *SpiderColl::getCollRec() const {
	const CollectionRec *cr = g_collectiondb.getRec(m_collnum);
	if ( ! cr ) log(LOG_WARN,"spider: lost coll rec");
	return cr;
}

const char *SpiderColl::getCollName() const {
	const CollectionRec *cr = getCollRec();
	if ( ! cr ) return "lostcollection";
	return cr->m_coll;
}

bool SpiderColl::makeWaitingTable ( ) {
	ScopedLock sl(m_waitingTree.getLock());

	log(LOG_DEBUG,"spider: making waiting table for %s.",m_coll);

	for (int32_t node = m_waitingTree.getFirstNode_unlocked(); node >= 0;
	     node = m_waitingTree.getNextNode_unlocked(node)) {
		// get key
		const key96_t *key = reinterpret_cast<const key96_t*>(m_waitingTree.getKey_unlocked(node));
		// get ip from that
		int32_t ip = (key->n0) & 0xffffffff;
		// spider time is up top
		uint64_t spiderTimeMS = (key->n1);
		spiderTimeMS <<= 32;
		spiderTimeMS |= ((key->n0) >> 32);
		// store in waiting table
		if (!addToWaitingTable(ip, spiderTimeMS)) return false;
	}
	log(LOG_DEBUG,"spider: making waiting table done.");
	return true;
}

SpiderColl::~SpiderColl () {
	reset();
}

// we call this now instead of reset() when Collectiondb::resetColl() is used
void SpiderColl::clearLocks ( ) {
	g_spiderLoop.clearLocks(m_collnum);
}

void SpiderColl::reset ( ) {
	// reset these for SpiderLoop
	m_nextDoledbKey.setMin();

	// set this to -1 here, when we enter spiderDoledUrls() it will
	// see that its -1 and set the m_msg5StartKey
	m_pri2 = -1; // MAX_SPIDER_PRIORITIES - 1;

	m_isPopulatingDoledb = false;

	const char *coll = "unknown";
	if ( m_coll[0] ) coll = m_coll;
	log(LOG_DEBUG,"spider: resetting spider cache coll=%s",coll);

	m_doledbIpTable.reset();
	m_cdTable      .reset();
	m_sniTable     .reset();
	m_waitingTable .reset();
	m_waitingTree  .reset();
	m_waitingMem   .reset();
	m_winnerTree   .reset();
	m_winnerTable  .reset();
	m_dupCache     .reset();

	if ( m_overflowList ) {
		mfree ( m_overflowList , OVERFLOWLISTSIZE * 4 ,"olist" );
		m_overflowList = NULL;
	}

	// each spider priority in the collection has essentially a cursor
	// that references the next spider rec in doledb to spider. it is
	// used as a performance hack to avoid the massive positive/negative
	// key annihilations related to starting at the top of the priority
	// queue every time we scan it, which causes us to do upwards of
	// 300 re-reads!
	for ( int32_t i = 0 ; i < MAX_SPIDER_PRIORITIES ; i++ ) {
		m_nextKeys[i] = Doledb::makeFirstKey2 ( i );
	}
}

bool SpiderColl::updateSiteNumInlinksTable(int32_t siteHash32, int32_t sni, time_t timestamp) {
	// do not update if invalid
	if ( sni == -1 ) return true;

	ScopedLock sl(m_sniTableMtx);

	// . get entry for siteNumInlinks table
	// . use 32-bit key specialized lookup for speed
	uint64_t *val = (uint64_t *)m_sniTable.getValue32(siteHash32);
	// bail if the stored timestamp is newer than ours
	if ( val && ((*val)&0xffffffff) > (uint32_t)timestamp ) return true;
	// . make new data for this key
	// . lower 32 bits is the addedTime
	// . upper 32 bits is the siteNumInlinks
	uint64_t nv = (uint32_t)sni;
	// shift up
	nv <<= 32;
	// or in time
	nv |= (uint32_t)timestamp; //sreq->m_addedTime
	// a direct update is faster
	if ( val ) *val = nv;
	// store it anew otherwise
	else if ( ! m_sniTable.addKey(&siteHash32,&nv) )
		// return false with g_errno set on error
		return false;
	// success
	return true;
}
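
// Editor's note (illustrative sketch of the value packing above): each
// m_sniTable entry is a single uint64_t,
//
//   value = ((uint64_t)(uint32_t)siteNumInlinks << 32) | (uint32_t)timestamp;
//
// so the "((*val)&0xffffffff) > (uint32_t)timestamp" check simply refuses to
// overwrite a newer sample with an older one.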

// . we call this when we receive a spider reply in Rdb.cpp
// . returns false and sets g_errno on error
// . xmldoc.cpp adds the reply AFTER the negative doledb rec since we decrement
//   the count in m_doledbIpTable here
bool SpiderColl::addSpiderReply(const SpiderReply *srep) {
	////
	//
	// skip if not assigned to us for doling
	//
	////
	if ( ! isAssignedToUs ( srep->m_firstIp ) )
		return true;

	/////////
	//
	// remove the lock here
	//
	//////
	int64_t lockKey = makeLockTableKey ( srep );

	logDebug(g_conf.m_logDebugSpider, "spider: removing lock uh48=%" PRId64" lockKey=%" PRIu64, srep->getUrlHash48(), lockKey);

	/////
	//
	// but do note that its spider has returned for populating the
	// waiting tree. addToWaitingTree should not add an entry if
	// a spiderReply is still pending according to the lock table,
	// UNLESS maxSpidersPerIP is more than what the lock table says
	// is currently being spidered.
	//
	/////

	// now just remove it since we only spider our own urls
	// and doledb is in memory
	g_spiderLoop.removeLock(lockKey);

	// update the latest siteNumInlinks count for this "site" (repeated below)
	updateSiteNumInlinksTable ( srep->m_siteHash32, srep->m_siteNumInlinks, srep->m_spideredTime );

	// clear error for this
	g_errno = 0;

	// no update if injecting or from pagereindex (docid based spider request)
	if (!srep->m_fromInjectionRequest) {
		ScopedLock sl(m_cdTableMtx);

		// use the domain hash for this guy! since its from robots.txt
		const int32_t *cdp = (const int32_t *)m_cdTable.getValue32(srep->m_domHash32);

		// update it only if better or empty
		if (!cdp) {
			// update m_sniTable if we should
			// . make new data for this key
			// . lower 32 bits is the spideredTime
			// . upper 32 bits is the crawldelay
			int32_t nv = (int32_t)(srep->m_crawlDelayMS);
			if (!m_cdTable.addKey(&srep->m_domHash32, &nv)) {
				char ipbuf[16];
				log(LOG_WARN, "spider: failed to add crawl delay for firstip=%s", iptoa(srep->m_firstIp,ipbuf));

				// just ignore
				g_errno = 0;
			}
		}
	}

	// . anytime we add a reply we must update this downloadTable with the
	//   reply's SpiderReply::m_downloadEndTime so we can obey sameIpWait
	// . that is the earliest that this url can be respidered, but we
	//   also have a sameIpWait constraint we have to consider...
	// . we alone are responsible for adding doledb recs from this ip so
	//   this is easy to throttle...
	// . and make sure to only add to this download time hash table if
	//   SpiderReply::m_downloadEndTime is non-zero, because zero means
	//   no download happened. (TODO: check this)
	// . TODO: consult crawldelay table here too! use that value if it is
	//   less than our sameIpWait
	// . make m_lastDownloadTable an rdbcache ...
	// . this is 0 for pagereindex docid-based replies
	if (srep->m_downloadEndTime) {
		RdbCacheLock rcl(m_lastDownloadCache);
		m_lastDownloadCache.addLongLong(m_collnum, srep->m_firstIp, srep->m_downloadEndTime);

		// ignore errors from that, it's just a cache
		g_errno = 0;
	}

	char ipbuf[16];
	logDebug(g_conf.m_logDebugSpider, "spider: adding spider reply, download end time %" PRId64" for "
	         "ip=%s(%" PRIu32") uh48=%" PRIu64" indexcode=\"%s\" coll=%" PRId32" "
	         "k.n1=%" PRIu64" k.n0=%" PRIu64,
	         //"to SpiderColl::m_lastDownloadCache",
	         srep->m_downloadEndTime,
	         iptoa(srep->m_firstIp,ipbuf),
	         (uint32_t)srep->m_firstIp,
	         srep->getUrlHash48(),
	         mstrerror(srep->m_errCode),
	         (int32_t)m_collnum,
	         srep->m_key.n1,
	         srep->m_key.n0);

	// . add to wait tree and let it populate doledb on its batch run
	// . use a spiderTime of 0 which means unknown and that it needs to
	//   scan spiderdb to get that
	// . returns false if it did not add to the waiting tree
	// . returns false and sets g_errno on error
	addToWaitingTree(srep->m_firstIp);

	// ignore errors i guess
	g_errno = 0;

	return true;
}
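
// Editor's note (summary of the reply path above, not new behavior): a
// SpiderReply releases the url lock, refreshes the per-site inlink count and
// the per-domain crawl-delay table, records m_downloadEndTime in
// m_lastDownloadCache so sameIpWait can be enforced, and finally re-adds the
// firstIp to the waiting tree with time 0 so evalIpLoop() will rescan
// spiderdb and pick the next winner for that IP.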

bool SpiderColl::isInDupCache(const SpiderRequest *sreq, bool addToCache) {
	// init dup cache?
	if ( ! m_dupCache.isInitialized() )
		// use 50k i guess of 64bit numbers and linked list info
		m_dupCache.init ( 90000,
		                  4 ,         // fixeddatasize (don't really need this)
		                  5000,       // maxcachenodes
		                  "urldups",  // dbname
		                  false,      // loadfromdisk
		                  12,         // cachekeysize
		                  -1 );       // numptrsmax

	// don't add dups over and over again...
	int64_t dupKey64 = sreq->getUrlHash48();
	int64_t org_dupKey64 = dupKey64;

	// . these flags make a big difference in the url filters
	// . NOTE: if you see a url that should be getting spidered but is not,
	//   it might be because we are not incorporating other flags here...
	if ( sreq->m_fakeFirstIp   ) dupKey64 ^= 12345;
	if ( sreq->m_isAddUrl      ) dupKey64 ^= 49387333;
	if ( sreq->m_isInjecting   ) dupKey64 ^= 3276404;
	if ( sreq->m_isPageReindex ) dupKey64 ^= 32999604;
	if ( sreq->m_forceDelete   ) dupKey64 ^= 29386239;
	if ( sreq->m_hadReply      ) dupKey64 ^= 293294099;

	// . maxage=86400,promoteRec=yes. returns -1 if not in there
	RdbCacheLock rcl(m_dupCache);

	if (m_dupCache.getLong(0, dupKey64, 86400, true) != -1) {
		logDebug(g_conf.m_logDebugSpider, "spider: skipping dup request. url=%s uh48=%" PRIu64 ", dupkey=%" PRIu64 ", org_dupkey=%" PRIu64 ", %s%s%s%s%s%s", sreq->m_url, sreq->getUrlHash48(), dupKey64, org_dupKey64,
		         sreq->m_fakeFirstIp?"fakeFirstIp ":"",sreq->m_isAddUrl?"isAddUrl ":"",sreq->m_isInjecting?"isInjecting ":"",sreq->m_isPageReindex?"isPageReindex ":"",sreq->m_forceDelete?"forceDelete ":"",sreq->m_hadReply?"hadReply":"");
		return true;
	}

	if (addToCache) {
		logDebug(g_conf.m_logDebugSpider, "spider: Adding to dup cache. url=%s uh48=%" PRIu64 ", dupkey=%" PRIu64 ", org_dupkey=%" PRIu64 ", %s%s%s%s%s%s", sreq->m_url, sreq->getUrlHash48(), dupKey64, org_dupKey64,
		         sreq->m_fakeFirstIp?"fakeFirstIp ":"",sreq->m_isAddUrl?"isAddUrl ":"",sreq->m_isInjecting?"isInjecting ":"",sreq->m_isPageReindex?"isPageReindex ":"",sreq->m_forceDelete?"forceDelete ":"",sreq->m_hadReply?"hadReply":"");

		// add it
		m_dupCache.addLong(0, dupKey64, 1);
	}

	return false;
}
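
// Editor's note (illustrative, based on the XOR perturbation above): two
// requests for the same url hash collide in the dup cache only if they also
// agree on all six flags, e.g.
//
//   uh48                     -> dupKey64 = uh48             (plain outlink)
//   uh48 with m_forceDelete  -> dupKey64 = uh48 ^ 29386239  (delete request)
//
// so a delete or reindex request is never suppressed just because the plain
// request for that url was seen within the 86400-second cache window.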

// . Rdb.cpp calls SpiderColl::addSpiderRequest/Reply() for every positive
//   spiderdb record it adds to spiderdb. that way our cache is kept
//   up to date incrementally
// . returns false and sets g_errno on error
// . if the spiderTime appears to be AFTER m_nextReloadTime then we should
//   not add this spider request to keep the cache trimmed!!! (MDW: TODO)
// . BUT! if we have 150,000 urls that is going to take a long time to
//   spider, so it should have a high reload rate!
bool SpiderColl::addSpiderRequest(const SpiderRequest *sreq) {
	int64_t nowGlobalMS = gettimeofdayInMilliseconds();
	// don't add negative keys or dataless records
	if ( sreq->m_dataSize <= 0 ) {
		log( "spider: add spider request is dataless for uh48=%" PRIu64, sreq->getUrlHash48() );
		return true;
	}

	// . are we already more or less in spiderdb? true = addToCache
	// . put this above isAssignedToUs() so we *try* to keep twins in sync because
	//   Rdb.cpp won't add the spiderrequest if its in this dup cache, and we add
	//   it to the dupcache here...
	if ( isInDupCache ( sreq , true ) ) {
		logDebug( g_conf.m_logDebugSpider, "spider: skipping dup request url=%s uh48=%" PRIu64,
		          sreq->m_url, sreq->getUrlHash48() );
		return true;
	}

	// skip if not assigned to us for doling
	if ( ! isAssignedToUs ( sreq->m_firstIp ) ) {
		logDebug( g_conf.m_logDebugSpider, "spider: spider request not assigned to us. skipping." );
		return true;
	}

	if ( sreq->isCorrupt() ) {
		log( LOG_WARN, "spider: not adding corrupt spider req to doledb");
		return true;
	}

	// . get the url's length contained in this record
	// . it should be NULL terminated
	// . we set the ip here too
	int32_t ulen = sreq->getUrlLen();
	// watch out for corruption
	if ( sreq->m_firstIp == 0 || sreq->m_firstIp == -1 || ulen <= 0 ) {
		log(LOG_ERROR,"spider: Corrupt spider req with url length of "
		    "%" PRId32" <= 0 u=%s. dataSize=%" PRId32" firstip=%" PRId32" uh48=%" PRIu64". Skipping.",
		    ulen,sreq->m_url,
		    sreq->m_dataSize,sreq->m_firstIp,sreq->getUrlHash48());
		return true;
	}

	// . we can't do this because we do not have the spiderReply!!!???
	// . MDW: no, we have to do it because tradesy.com has links to twitter
	//   on every page and twitter is not allowed so we continually
	//   re-scan a big spiderdb list for twitter's firstip. major performance
	//   degradation. so try to get the ufn without a reply. if we need
	//   a reply to get the ufn then this function should return -1 which
	//   means an unknown ufn and we'll add to waiting tree.
	// get ufn/priority, because if filtered we do not want to add to doledb
	// HACK: set isOutlink to true here since we don't know if we have a reply
	int32_t ufn = ::getUrlFilterNum(sreq, NULL, nowGlobalMS, false, m_cr, true, -1);
	if (ufn >= 0) {
		// spiders disabled for this row in url filters?
		if (m_cr->m_maxSpidersPerRule[ufn] == 0) {
			logDebug(g_conf.m_logDebugSpider, "spider: request spidersoff ufn=%d url=%s", ufn, sreq->m_url);
			return true;
		}

		// do not add to doledb if bad
		if (m_cr->m_forceDelete[ufn]) {
			logDebug(g_conf.m_logDebugSpider, "spider: request %s is filtered ufn=%d", sreq->m_url, ufn);
			return true;
		}
	}

	// once in the waiting tree, we will scan the waiting tree and then look up
	// each firstIp in the waiting tree in spiderdb to get the best
	// SpiderRequest for that firstIp, then we can add it to doledb
	// as long as it can be spidered now
	bool added = addToWaitingTree(sreq->m_firstIp);

	// if already doled and we beat the priority/spidertime of what
	// was doled then we should probably delete the old doledb key
	// and add the new one. hmm, the waitingtree scan code ...

	// update the latest siteNumInlinks count for this "site"
	if (sreq->m_siteNumInlinksValid) {
		// updates m_siteNumInlinksTable
		updateSiteNumInlinksTable(sreq->m_siteHash32, sreq->m_siteNumInlinks, (time_t) sreq->m_addedTime);
		// clear error for this if there was any
		g_errno = 0;
	}

	// log it
	char ipbuf[16];
	logDebug( g_conf.m_logDebugSpider, "spider: %s request to waiting tree %s"
	          " uh48=%" PRIu64
	          " firstIp=%s "
	          " pageNumInlinks=%" PRIu32
	          " parentdocid=%" PRIu64
	          " isinjecting=%" PRId32
	          " ispagereindex=%" PRId32
	          " ufn=%" PRId32
	          " priority=%" PRId32
	          " addedtime=%" PRIu32,
	          added ? "ADDED" : "DIDNOTADD",
	          sreq->m_url,
	          sreq->getUrlHash48(),
	          iptoa(sreq->m_firstIp,ipbuf),
	          (uint32_t)sreq->m_pageNumInlinks,
	          sreq->getParentDocId(),
	          (int32_t)(bool)sreq->m_isInjecting,
	          (int32_t)(bool)sreq->m_isPageReindex,
	          (int32_t)sreq->m_ufn,
	          (int32_t)sreq->m_priority,
	          (uint32_t)sreq->m_addedTime );

	return true;
}
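
// Editor's note (summary, not new behavior): addSpiderRequest() deliberately
// stores nothing but the firstIp in the waiting tree; the request itself stays
// in spiderdb, and evalIpLoop() later re-reads all spiderdb records for that
// firstIp to pick the winner. That keeps this hot path cheap at the cost of a
// later scan per IP.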

bool SpiderColl::printWaitingTree() {
	ScopedLock sl(m_waitingTree.getLock());

	for (int32_t node = m_waitingTree.getFirstNode_unlocked(); node >= 0;
	     node = m_waitingTree.getNextNode_unlocked(node)) {
		const key96_t *wk = reinterpret_cast<const key96_t*>(m_waitingTree.getKey_unlocked(node));
		// spider time is up top
		uint64_t spiderTimeMS = (wk->n1);
		spiderTimeMS <<= 32;
		spiderTimeMS |= ((wk->n0) >> 32);
		// then ip
		int32_t firstIp = wk->n0 & 0xffffffff;
		// show it
		char ipbuf[16];

		// for readable timestamp..
		time_t now_t = (time_t)(spiderTimeMS / 1000);
		struct tm tm_buf;
		struct tm *stm = gmtime_r(&now_t,&tm_buf);

		log(LOG_INFO,"dump: time=%" PRId64 " (%04d%02d%02d-%02d%02d%02d-%03d) firstip=%s",spiderTimeMS, stm->tm_year+1900,stm->tm_mon+1,stm->tm_mday,stm->tm_hour,stm->tm_min,stm->tm_sec,(int)(spiderTimeMS%1000), iptoa(firstIp,ipbuf));
	}
	return true;
}

//////
//
// . 1. called by addSpiderReply(). it should have the sameIpWait available
//      or at least that will be in the crawldelay cache table,
//      SpiderReply::m_crawlDelayMS. Unfortunately, no maxSpidersPerIP!!!
//      we just add a "0" in the waiting tree which means evalIpLoop() will
//      be called and can get the maxSpidersPerIP from the winning candidate
//      and add to the waiting tree based on that.
// . 2. called by addSpiderRequest(). It SHOULD maybe just add a "0" as well
//      to offload the logic. try that.
// . 3. called by populateWaitingTreeFromSpiderdb(). it just adds "0" as well,
//      if not doled
// . 4. UPDATED in evalIpLoop(): if the best SpiderRequest for a firstIp is
//      in the future, this is the only time we will add a waiting tree key
//      whose spider time is non-zero. that is where we also take
//      sameIpWait and maxSpidersPerIP into consideration. evalIpLoop()
//      will actually REMOVE the entry from the waiting tree if that IP
//      already has the max spiders outstanding per IP. when a spiderReply
//      is received it will populate the waiting tree again with a "0" entry
//      and evalIpLoop() will re-do its check.
//
//////

// . returns true if we added to the waiting tree, false if not
// . if one of these adds fails consider increasing mem used by tree/table
// . if we lose an ip that sux because it won't be gotten again unless
//   we somehow add another request/reply to spiderdb in the future
bool SpiderColl::addToWaitingTree(int32_t firstIp) {
	char ipbuf[16];
	logDebug( g_conf.m_logDebugSpider, "spider: addtowaitingtree ip=%s", iptoa(firstIp,ipbuf) );

	// we are currently reading spiderdb for this ip and trying to find
	// a best SpiderRequest or requests to add to doledb. so if this
	// happens, let the scan know that more replies or requests came in
	// while we were scanning so that it should not delete the rec from
	// the waiting tree and not add to doledb, else we'd lose it forever or
	// until the next waitingtree rebuild was triggered in time.
	//
	// Before, i was only setting this in addSpiderRequest(), so if a new
	// reply came in it was not setting m_gotNewDataForScanningIp and
	// we ended up losing the IP from the waiting tree forever (or until
	// the next timed rebuild). putting it here seems to fix that.
	/// @todo ALC verify that we won't lose IP from waiting tree. Do we need to lock the whole evalIpLoop?
	if ( firstIp == m_scanningIp ) {
		m_gotNewDataForScanningIp = m_scanningIp.load();
		//log(LOG_DEBUG,"spider: got new data for %s",iptoa(firstIp));
		//return true;
	}

	// . this can now be only 0
	// . only evalIpLoop() will add a waiting tree key with a non-zero
	//   value after it figures out the EARLIEST time that a
	//   SpiderRequest from this firstIp can be spidered.
	uint64_t spiderTimeMS = 0;

	// don't write to tree if we're shutting down
	if (g_process.isShuttingDown()) {
		log(LOG_WARN, "spider: addtowaitingtree: failed. shutting down");
		return false;
	}

	// only if we are the responsible host in the shard
	if ( ! isAssignedToUs ( firstIp ) )
		return false;

	// . do not add to waiting tree if already in doledb
	// . an ip should not exist in both doledb and the waiting tree.
	// . the waiting tree is meant to be a signal that we need to add
	//   a spiderrequest from that ip into doledb where it can be picked
	//   up for immediate spidering
	if (isInDoledbIpTable(firstIp)) {
		logDebug( g_conf.m_logDebugSpider, "spider: not adding to waiting tree, already in doleip table" );
		return false;
	}

	ScopedLock sl(m_waitingTree.getLock());

	// see if in tree already, so we can delete it and replace it below
	// . this is true if already in tree
	// . if spiderTimeMS is a sooner time than what this firstIp already
	//   has as its earliest time, then we will override it and have to
	//   update both m_waitingTree and m_waitingTable, however
	//   IF the spiderTimeMS is a later time, then we bail without doing
	//   anything at this point.
	int64_t sms;
	if (getFromWaitingTable(firstIp, &sms)) {
		// not only must we be a sooner time, but we must be 5 seconds
		// sooner than the time currently in there to avoid thrashing
		// when we had a ton of outlinks with this first ip within a
		// 5-second interval.
		//
		// i'm not so sure what i was doing here before, but i don't
		// want to starve the spiders, so make this 100ms not 5000ms
		if ( (int64_t)spiderTimeMS > sms - 100 ) {
			logDebug( g_conf.m_logDebugSpider, "spider: skip updating waiting tree" );
			return false;
		}

		// make the key then
		key96_t wk = makeWaitingTreeKey ( sms, firstIp );

		// must be there
		if (!m_waitingTree.deleteNode_unlocked(0, (char*)&wk, false)) {
			// sanity check. ensure waitingTable and waitingTree in sync
			gbshutdownLogicError();
		}

		// log the replacement
		logDebug( g_conf.m_logDebugSpider, "spider: replacing waitingtree key oldtime=%" PRIu32" newtime=%" PRIu32" firstip=%s",
		          (uint32_t)(sms/1000LL),
		          (uint32_t)(spiderTimeMS/1000LL),
		          iptoa(firstIp,ipbuf) );
	} else {
		// a time of 0 means we got the reply for something we spidered
		// in doledb so we will need to recompute the best spider
		// requests for this first ip

		// log the addition
		logDebug( g_conf.m_logDebugSpcache, "spider: adding new key to waitingtree newtime=%" PRIu32"%s firstip=%s",
		          (uint32_t)(spiderTimeMS/1000LL),
		          ( spiderTimeMS == 0 ) ? "(replyreset)" : "",
		          iptoa(firstIp,ipbuf) );
	}

	// what is this?
	if ( firstIp == 0 || firstIp == -1 ) {
		log(LOG_WARN, "spider: got ip of %s. cn=%" PRId32" "
		    "wtf? failed to add to "
		    "waiting tree, but return true anyway.",
		    iptoa(firstIp,ipbuf) ,
		    (int32_t)m_collnum);
		// don't return false lest m_waitingTreeNextKey never gets updated
		// and we end up in an infinite loop doing
		// populateWaitingTreeFromSpiderdb()
		return true;
	}

	// grow the tree if too small!
	int32_t used = m_waitingTree.getNumUsedNodes_unlocked();
	int32_t max  = m_waitingTree.getNumTotalNodes_unlocked();

	if ( used + 1 > max ) {
		int32_t more = (((int64_t)used) * 15) / 10;
		if ( more < 10 ) more = 10;
		if ( more > 100000 ) more = 100000;
		int32_t newNum = max + more;
		log(LOG_DEBUG, "spider: growing waiting tree from %" PRId32" to %" PRId32" nodes for collnum %" PRId32,
		    max , newNum , (int32_t)m_collnum );
		if (!m_waitingTree.growTree_unlocked(newNum)) {
			log(LOG_ERROR, "Failed to grow waiting tree to add firstip %s", iptoa(firstIp,ipbuf));
			return false;
		}
		if (!setWaitingTableSize(newNum)) {
			log(LOG_ERROR, "Failed to grow waiting table to add firstip %s", iptoa(firstIp,ipbuf));
			return false;
		}
	}

	key96_t wk = makeWaitingTreeKey(spiderTimeMS, firstIp);

	// add that
	int32_t wn;
	if ((wn = m_waitingTree.addKey_unlocked(&wk)) < 0) {
		log(LOG_ERROR, "waitingtree add failed for ip=%s. increase max nodes lest we lose this IP forever. err=%s",
		    iptoa(firstIp,ipbuf), mstrerror(g_errno));
		return false;
	}

	// note it
	logDebug(g_conf.m_logDebugSpider, "spider: added time=%" PRId64" ip=%s to waiting tree node=%" PRId32,
	         spiderTimeMS, iptoa(firstIp,ipbuf), wn);

	// add to table now since its in the tree
	if (!addToWaitingTable(firstIp, spiderTimeMS)) {
		// remove from tree then
		m_waitingTree.deleteNode_unlocked(wn, false);

		log(LOG_ERROR, "waitingtable add failed for ip=%s. increase max nodes lest we lose this IP forever. err=%s",
		    iptoa(firstIp,ipbuf), mstrerror(g_errno));
		return false;
	}

	// tell caller there was no error
	return true;
}
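
// Editor's note (worked example of the growth policy above, illustrative
// numbers): with used=1000 nodes and max=1000, more = (1000*15)/10 = 1500,
// clamped to [10,100000], so the tree grows to 2500 nodes; the waiting table
// is then resized to match so the two structures stay in sync.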

// . this scan is started anytime we call addSpiderRequest() or addSpiderReply()
// . if nothing is in the tree it quickly exits
// . otherwise it scans the entries in the tree
// . each entry is a key with spiderTime/firstIp
// . if spiderTime > now it stops the scan
// . if the firstIp is already in doledb (m_doledbIpTable) then it removes
//   it from the waitingtree and waitingtable. how did that happen?
// . otherwise, it looks up that firstIp in spiderdb to get a list of all
//   the spiderdb recs from that firstIp
// . then it selects the "best" one and adds it to doledb. once added to
//   doledb it adds it to doleIpTable, and removes it from waitingtree and
//   waitingtable
// . returns the next due firstIp, or 0 if there is none
int32_t SpiderColl::getNextIpFromWaitingTree() {
	// reset first key to get first rec in waiting tree
	m_waitingTreeKey.setMin();

	// current time on host #0
	uint64_t nowMS = gettimeofdayInMilliseconds();

	for (;;) {
		ScopedLock sl(m_waitingTree.getLock());

		// we might have deleted the only node below...
		if (m_waitingTree.isEmpty_unlocked()) {
			return 0;
		}

		// assume none
		int32_t firstIp = 0;
		// set node from wait tree key. this way we can resume from a prev key
		int32_t node = m_waitingTree.getNextNode_unlocked(0, (char *)&m_waitingTreeKey);
		// if empty, stop
		if (node < 0) {
			return 0;
		}

		// get the key
		const key96_t *k = reinterpret_cast<const key96_t *>(m_waitingTree.getKey_unlocked(node));

		// ok, we got one
		firstIp = (k->n0) & 0xffffffff;

		// sometimes we take over for a dead host, but if he's no longer
		// dead then we can remove his keys. but first make sure we have had
		// at least one ping from him so we do not remove at startup.
		// if it is in doledb or in the middle of being added to doledb
		// via msg4, nuke it as well!
		if (firstIp == 0 || firstIp == -1 || !isAssignedToUs(firstIp) || isInDoledbIpTable(firstIp)) {
			if (firstIp == 0 || firstIp == -1) {
				log(LOG_WARN, "spider: removing corrupt spiderreq firstip of %" PRId32" from waiting tree collnum=%i",
				    firstIp, (int)m_collnum);
			}

			// these operations should fail if writes have been disabled
			// and because the trees/tables for the spidercache are saving
			// in Process.cpp's g_spiderCache::save() call
			m_waitingTree.deleteNode_unlocked(node, true);

			char ipbuf[16];
			logDebug(g_conf.m_logDebugSpider, "spider: removed ip=%s from waiting tree. nn=%" PRId32,
			         iptoa(firstIp,ipbuf), m_waitingTree.getNumUsedNodes_unlocked());

			logDebug(g_conf.m_logDebugSpcache, "spider: erasing waitingtree key firstip=%s", iptoa(firstIp,ipbuf));

			// remove from table too!
			removeFromWaitingTable(firstIp);
			continue;
		}

		// spider time is up top
		uint64_t spiderTimeMS = (k->n1);
		spiderTimeMS <<= 32;
		spiderTimeMS |= ((k->n0) >> 32);

		// stop if we need to wait for this one
		if (spiderTimeMS > nowMS) {
			return 0;
		}

		// save key for deleting when done
		m_waitingTreeKey.n1 = k->n1;
		m_waitingTreeKey.n0 = k->n0;
		m_waitingTreeKeyValid = true;
		m_scanningIp = firstIp;

		// compute the best request from the spiderdb list, not valid yet
		m_lastReplyValid = false;

		// start reading spiderdb here
		m_nextKey = Spiderdb::makeFirstKey(firstIp);
		m_endKey  = Spiderdb::makeLastKey(firstIp);

		// all done
		return firstIp;
	}
}
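
// Editor's note (clarifying the contract above, derived from the code): on
// success this returns the firstIp whose waiting-tree entry is due now, and as
// side effects it sets m_waitingTreeKey (so the entry can be deleted once the
// IP is doled), m_scanningIp, and the m_nextKey/m_endKey spiderdb read range
// for that IP. It returns 0 when the tree is empty or the earliest entry is
// still in the future.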

void SpiderColl::getSpiderdbWaitingTreeListWrapper(void *state) {
	SpiderColl *sc = static_cast<SpiderColl*>(state);

	if (!SpiderdbRdbSqliteBridge::getFirstIps(sc->m_cr->m_collnum,
	                                          &sc->m_waitingTreeList,
	                                          Spiderdb::getFirstIp(&sc->m_waitingTreeNextKey),
	                                          -1,
	                                          SR_READ_SIZE)) {
		if (!g_errno) {
			g_errno = EIO; //imprecise
			logTrace(g_conf.m_logTraceSpider, "END, got io-error from sqlite");
			return;
		}
	}
}

void SpiderColl::gotSpiderdbWaitingTreeListWrapper(void *state, job_exit_t exit_type) {
	SpiderColl *THIS = (SpiderColl *)state;

	// did our collection rec get deleted? since we were doing a read
	// the SpiderColl will have been preserved in that case but its
	// m_deleteMyself flag will have been set.
	if (tryToDeleteSpiderColl(THIS, "2")) {
		return;
	}

	THIS->m_gettingWaitingTreeList = false;

	THIS->populateWaitingTreeFromSpiderdb(true);
}

//////////////////
//////////////////
//
// THE BACKGROUND FUNCTION
//
// when the user changes the ufn table the waiting tree is flushed
// and repopulated from spiderdb with this. also used for repairs.
//
//////////////////
//////////////////

// . this stores an ip into the waiting tree with a spidertime of "0" so
//   it will be evaluated properly by populateDoledbFromWaitingTree()
//
// @@@ BR: "it seems they fall out over time" - wtf?
// . scan spiderdb to make sure each firstip represented in spiderdb is
//   in the waiting tree. it seems they fall out over time. we need to fix
//   that but in the meantime this should do a bg repair. and is nice to have
//
// . the waiting tree key is really just a spidertime and a firstip. so we will
//   still need populateDoledbFromWaitingTree() to periodically scan firstips
//   that are already in doledb to see if there is a higher-priority request
//   for that firstip, in which case it can add that to doledb too, but then
//   we have to be sure to only grant one lock per firstip to avoid hammering
//   that firstip
//
// . this should be called from a sleepwrapper; the same sleep wrapper we
//   call populateDoledbFromWaitingTree() from should be fine
void SpiderColl::populateWaitingTreeFromSpiderdb ( bool reentry ) {
	logTrace( g_conf.m_logTraceSpider, "BEGIN" );

	// skip if in repair mode
	if ( g_repairMode )
	{
		logTrace( g_conf.m_logTraceSpider, "END, in repair mode" );
		return;
	}

	// sanity
	if ( m_deleteMyself ) gbshutdownLogicError();
	// skip if spiders off
	if ( ! m_cr->m_spideringEnabled )
	{
		logTrace( g_conf.m_logTraceSpider, "END, spiders disabled" );
		return;
	}

	if ( ! g_hostdb.getMyHost( )->m_spiderEnabled )
	{
		logTrace( g_conf.m_logTraceSpider, "END, spiders disabled (2)" );
		return;
	}

	// skip if udp table is full
	if ( g_udpServer.getNumUsedSlotsIncoming() >= MAXUDPSLOTS )
	{
		logTrace( g_conf.m_logTraceSpider, "END, UDP table full" );
		return;
	}

	// if entering for the first time, we need to read a list from spiderdb
	if ( ! reentry ) {
		// just return if we should not be doing this yet
		if ( ! m_waitingTreeNeedsRebuild )
		{
			logTrace( g_conf.m_logTraceSpider, "END, !m_waitingTreeNeedsRebuild" );
			return;
		}

		// a double call? can happen if the list read is slow...
		if ( m_gettingWaitingTreeList )
		{
			logTrace( g_conf.m_logTraceSpider, "END, double call" );
			return;
		}

		// . borrow a msg5
		// . if none available just return, we will be called again
		//   by the sleep/timer function

		// . read in a replacement SpiderRequest to add to doledb from
		//   this ip
		// . get the list of spiderdb records
		// . do not include cache, those results are old and will mess
		//   us up
		char ipbuf[16];
		char keystrbuf[MAX_KEYSTR_BYTES];
		log(LOG_DEBUG,"spider: populateWaitingTree: calling msg5: startKey=0x%s firstip=%s",
		    KEYSTR(&m_waitingTreeNextKey,sizeof(m_waitingTreeNextKey),keystrbuf), iptoa(Spiderdb::getFirstIp(&m_waitingTreeNextKey),ipbuf));

		// flag it
		m_gettingWaitingTreeList = true;

		// read the list from local disk
		if (g_jobScheduler.submit(getSpiderdbWaitingTreeListWrapper, gotSpiderdbWaitingTreeListWrapper, this, thread_type_spider_read, 0)) {
			return;
		}

		// unable to submit job
		getSpiderdbWaitingTreeListWrapper(this);

		if (g_errno) {
			return;
		}
	}

	// show list stats
	logDebug( g_conf.m_logDebugSpider, "spider: populateWaitingTree: got list of size %" PRId32, m_waitingTreeList.getListSize() );

	// unflag it
	m_gettingWaitingTreeList = false;

	// don't proceed if we're shutting down
	if (g_process.isShuttingDown()) {
		logTrace( g_conf.m_logTraceSpider, "END, process is shutting down" );
		return;
	}

	// ensure we point to the top of the list
	m_waitingTreeList.resetListPtr();
	// bail on error
	if ( g_errno ) {
		log(LOG_ERROR,"spider: Had error getting list of urls from spiderdb2: %s.", mstrerror(g_errno));
		//m_isReadDone2 = true;
		logTrace( g_conf.m_logTraceSpider, "END" );
		return;
	}

	int32_t lastOne = 0;
	// loop over all serialized spiderdb records in the list
	for ( ; ! m_waitingTreeList.isExhausted() ; ) {
		// get spiderdb rec in its serialized form
		const char *rec = m_waitingTreeList.getCurrentRec();
		// skip to next guy
		m_waitingTreeList.skipCurrentRecord();
		// negative? wtf?
		if ( (rec[0] & 0x01) == 0x00 ) {
			//logf(LOG_DEBUG,"spider: got negative spider rec");
			continue;
		}
		// if its a SpiderReply skip it
		if ( ! Spiderdb::isSpiderRequest ( (key128_t *)rec))
		{
			continue;
		}

		// cast it
		const SpiderRequest *sreq = reinterpret_cast<const SpiderRequest *>(rec);
		// get first ip
		int32_t firstIp = Spiderdb::getFirstIp(&sreq->m_key);

		// if same as last, skip it
		if ( firstIp == lastOne )
		{
			char ipbuf[16];
			logTrace( g_conf.m_logTraceSpider, "Skipping, IP [%s] same as last" , iptoa(firstIp,ipbuf));
			continue;
		}

		// set this lastOne for speed
		lastOne = firstIp;

		// if firstip already in waiting tree, skip it
		if (isInWaitingTable(firstIp)) {
			char ipbuf[16];
			logTrace( g_conf.m_logTraceSpider, "Skipping, IP [%s] already in waiting tree" , iptoa(firstIp,ipbuf));
			continue;
		}

		// skip if only our twin should add it to waitingtree/doledb
		if ( ! isAssignedToUs ( firstIp ) )
		{
			char ipbuf[16];
			logTrace( g_conf.m_logTraceSpider, "Skipping, IP [%s] not assigned to us" , iptoa(firstIp,ipbuf));
			continue;
		}

		// skip if ip already represented in doledb i guess otherwise
		// the populatedoledb scan will nuke it!!
		if (isInDoledbIpTable(firstIp)) {
			char ipbuf[16];
			logTrace( g_conf.m_logTraceSpider, "Skipping, IP [%s] already in doledb" , iptoa(firstIp,ipbuf));
			continue;
		}

		// not currently spidering either. when they got their
		// lock they called confirmLockAcquisition() which will
		// have added an entry to the waiting table. sometimes the
		// lock still exists but the spider is done, because the
		// lock persists for 5 seconds afterwards in case there was
		// a lock request for that url in progress, so it will be
		// denied.

		// . this is starving other collections, should be
		//   added to waiting tree anyway! otherwise it won't get
		//   added!!!
		// . so now i made this collection specific, not global
		if ( g_spiderLoop.getNumSpidersOutPerIp (firstIp,m_collnum)>0)
		{
			char ipbuf[16];
			logTrace( g_conf.m_logTraceSpider, "Skipping, IP [%s] is already being spidered" , iptoa(firstIp,ipbuf));
			continue;
		}

		// otherwise, we want to add it with 0 time so the doledb
		// scan will evaluate it properly
		// this will return false if we are saving the tree i guess
		if (!addToWaitingTree(firstIp)) {
			char ipbuf[16];
			log(LOG_INFO, "spider: failed to add ip %s to waiting tree. "
			    "ip will not get spidered then and our population of waiting tree will repeat until this add happens.",
			    iptoa(firstIp,ipbuf) );
			logTrace( g_conf.m_logTraceSpider, "END, addToWaitingTree for IP [%s] failed" , iptoa(firstIp,ipbuf));
			return;
		}
		else
		{
			char ipbuf[16];
			logTrace( g_conf.m_logTraceSpider, "IP [%s] added to waiting tree" , iptoa(firstIp,ipbuf));
		}

		// count it
		m_numAdded++;
		// ignore errors for this
		g_errno = 0;
	}

	// are we the final list in the scan?
	bool shortRead = ( m_waitingTreeList.getListSize() <= 0);

	m_numBytesScanned += m_waitingTreeList.getListSize();

	// reset? still left over from our first scan?
	if ( m_lastPrintCount > m_numBytesScanned )
		m_lastPrintCount = 0;

	// announce every 10MB
	if ( m_numBytesScanned - m_lastPrintCount > 10000000 ) {
		log(LOG_INFO, "spider: %" PRIu64" spiderdb bytes scanned for waiting tree re-population for cn=%" PRId32,m_numBytesScanned,
		    (int32_t)m_collnum);
		m_lastPrintCount = m_numBytesScanned;
	}

	// debug info
	log(LOG_DEBUG,"spider: Read2 %" PRId32" spiderdb bytes.",m_waitingTreeList.getListSize());
	// reset any errno cuz we're just a cache
	g_errno = 0;

	// if not done, keep going
	if ( ! shortRead ) {
		// . inc it here
		// . it can also be reset on a collection rec update
		key128_t lastKey = *(key128_t *)m_waitingTreeList.getLastKey();

		if ( lastKey < m_waitingTreeNextKey ) {
			log(LOG_WARN, "spider: got corruption 9. spiderdb keys out of order for collnum=%" PRId32, (int32_t)m_collnum);

			// this should result in an empty list read for
			// our next scan of spiderdb. unfortunately we could
			// miss a lot of spider requests then
			KEYMAX((char*)&m_waitingTreeNextKey,sizeof(m_waitingTreeNextKey));
		}
		else {
			m_waitingTreeNextKey = lastKey;
			m_waitingTreeNextKey++;
		}

		// watch out for wrap around
		if ( m_waitingTreeNextKey < lastKey ) shortRead = true;
		// nah, advance the firstip, should be a lot faster when
		// there are only a few firstips...
		if ( lastOne && lastOne != -1 ) { // && ! gotCorruption ) {
			key128_t cand = Spiderdb::makeFirstKey(lastOne+1);
			// corruption still seems to happen, so only
			// do this part if it increases the key to avoid
			// putting us into an infinite loop.
			if ( cand > m_waitingTreeNextKey ) m_waitingTreeNextKey = cand;
		}
	}

	if ( shortRead ) {
		// mark when the scan completed so we can do another one
		// like 24 hrs from that...
		m_lastScanTime = getTime();

		log(LOG_DEBUG, "spider: WaitingTree rebuild complete for %s. Added %" PRId32" recs to waiting tree, scanned %" PRId64" bytes of spiderdb.",
		    m_coll, m_numAdded, m_numBytesScanned);
		//printWaitingTree();

		// reset the count for next scan
		m_numAdded = 0 ;
		m_numBytesScanned = 0;
		// reset for next scan
		m_waitingTreeNextKey.setMin();
		// no longer need rebuild
		m_waitingTreeNeedsRebuild = false;
	}

	// free the list to save memory
	m_waitingTreeList.freeList();
	// wait for the sleepwrapper to call us again with our updated m_waitingTreeNextKey
	logTrace( g_conf.m_logTraceSpider, "END, done" );
	return;
}
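
// Editor's note (illustrative, restating the cursor logic above): after each
// chunk the scan cursor normally becomes lastKey+1, but it then tries to jump
// straight to Spiderdb::makeFirstKey(lastOne+1), the first key of the next
// firstIp after the last one seen; the jump is only taken when it actually
// moves the cursor forward, to avoid looping on corrupt, out-of-order keys.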

//static bool s_ufnTreeSet = false;
//static RdbTree s_ufnTree;
//static time_t s_lastUfnTreeFlushTime = 0;

//////////////////////////
//////////////////////////
//
// The first KEYSTONE function.
//
// CALL THIS ANYTIME to load up doledb from waiting tree entries
//
// This is a key function.
//
// It is called from two places:
//
// 1) the sleep callback
//
// 2) addToWaitingTree(), which is called from addSpiderRequest() anytime a
//    SpiderRequest is added to spiderdb (or from addSpiderReply())
//
// It can only be entered once, so it will just return if we are already
// scanning spiderdb.
//
//////////////////////////
//////////////////////////

// . for each IP in the waiting tree, scan all its SpiderRequests and determine
//   which one should be the next to be spidered, and put that one in doledb.
// . we call this a lot; e.g. if the admin changes the url filters table
//   we have to re-scan basically all of spiderdb and re-do doledb
void SpiderColl::populateDoledbFromWaitingTree ( ) { // bool reentry ) {
	logTrace( g_conf.m_logTraceSpider, "BEGIN" );

	// only one loop can run at a time!
	if ( m_isPopulatingDoledb ) {
		logTrace( g_conf.m_logTraceSpider, "END, already populating doledb" );
		return;
	}

	// skip if in repair mode
	if ( g_repairMode ) {
		logTrace( g_conf.m_logTraceSpider, "END, in repair mode" );
		return;
	}

	// let's skip if spiders are off so we can inject/populate the index quickly,
	// since addSpiderRequest() calls addToWaitingTree() which then calls
	// this.
	if ( ! g_conf.m_spideringEnabled ) {
		logTrace( g_conf.m_logTraceSpider, "END, spidering not enabled" );
		return;
	}

	if ( ! g_hostdb.getMyHost( )->m_spiderEnabled ) {
		logTrace( g_conf.m_logTraceSpider, "END, spidering not enabled (2)" );
		return;
	}

	// skip if udp table is full
	if ( g_udpServer.getNumUsedSlotsIncoming() >= MAXUDPSLOTS ) {
		logTrace( g_conf.m_logTraceSpider, "END, no more UDP slots" );
		return;
	}

	// set this flag so we are not re-entered
	m_isPopulatingDoledb = true;

	for(;;) {
		// are we trying to exit? some firstip lists can be quite long, so
		// terminate here so all threads can return and we can exit properly
		if (g_process.isShuttingDown()) {
			m_isPopulatingDoledb = false;
			logTrace( g_conf.m_logTraceSpider, "END, shutting down" );
			return;
		}

		// . get the next IP that is due to be spidered from the waiting tree
		// . also sets m_waitingTreeKey so we can delete it easily!
		int32_t ip = getNextIpFromWaitingTree();

		// . return if none. all done. unset populating flag.
		// . it returns 0 if the next firstip has a spidertime in the future
		if ( ip == 0 ) {
			m_isPopulatingDoledb = false;
			return;
		}

		// set read range for scanning spiderdb
		m_nextKey = Spiderdb::makeFirstKey(ip);
		m_endKey  = Spiderdb::makeLastKey (ip);

		char ipbuf[16];
		logDebug( g_conf.m_logDebugSpider, "spider: for cn=%i nextip=%s nextkey=%s",
		          (int)m_collnum, iptoa(ip,ipbuf), KEYSTR( &m_nextKey, sizeof( key128_t ) ) );

		//////
		//
		// do TWO PASSES, one to count pages, the other to get the best url!!
		//
		//////
		// assume we don't have to do two passes
		m_countingPagesIndexed = false;

		// get the collectionrec
		const CollectionRec *cr = g_collectiondb.getRec ( m_collnum );
		// but if we have quota based url filters we do have to count
		if ( cr && cr->m_urlFiltersHavePageCounts ) {
			// tell evalIpLoop() to count first
			m_countingPagesIndexed = true;
			// reset this stuff used for counting UNIQUE votes
			m_lastReqUh48a = 0LL;
			m_lastReqUh48b = 0LL;
			m_lastRepUh48 = 0LL;
			// and setup the LOCAL counting table if not initialized
			if (!m_siteIndexedDocumentCount.isInitialized()) {
				m_siteIndexedDocumentCount.set(4, 4, 0, NULL, 0, false, "ltpct");
			}
			// otherwise, just reset it so we can repopulate it
			else m_siteIndexedDocumentCount.reset();
		}

		logDebug( g_conf.m_logDebugSpider, "spider: evalIpLoop: waitingtree nextip=%s numUsedNodes=%" PRId32,
		          iptoa(ip,ipbuf), m_waitingTree.getNumUsedNodes() );

		//@@@@@@ BR: THIS SHOULD BE DEBUGGED AND ENABLED

		/*
		// assume using tree
		m_useTree = true;

		// . flush the tree every 12 hours
		// . i guess we could add incoming requests to the ufntree if
		//   they strictly beat the ufn tree tail node, HOWEVER, we
		//   still have the problem that if a url we spidered is due
		//   to be respidered very soon we will miss it, as only the reply
		//   is added back into spiderdb, not a new request.
		int32_t nowLocal = getTime();
		// make it one hour so we don't block a new high priority
		// request that just got added... crap, what if its an addurl
		// or something like that????
		if ( nowLocal - s_lastUfnTreeFlushTime > 3600 ) {
			s_ufnTree.clear();
			s_lastUfnTreeFlushTime = nowLocal;
		}

		int64_t uh48;

		//
		// s_ufnTree tries to cache the top X spiderrequests for an IP
		// that should be spidered next so we do not have to scan like
		// a million spiderrequests in spiderdb to find the best one.
		//

		// if we have a specific uh48 targeted in s_ufnTree then that
		// saves a ton of time!
		// key format for s_ufnTree:
		// iiiiiiii iiiiiiii iiiiiii iiiiiii  i = firstip
		// PPPPPPPP tttttttt ttttttt ttttttt  P = priority
		// tttttttt tttttttt hhhhhhh hhhhhhh  t = spiderTimeMS (40 bits)
		// hhhhhhhh hhhhhhhh hhhhhhh hhhhhhh  h = urlhash48
		key128_t key;
		key.n1 = ip;
		key.n1 <<= 32;
		key.n0 = 0LL;
		int32_t node = s_ufnTree.getNextNode_unlocked(0,(char *)&key);
		// cancel node if not from our ip
		if ( node >= 0 ) {
			key128_t *rk = (key128_t *)s_ufnTree.getKey ( node );
			if ( (rk->n1 >> 32) != (uint32_t)ip ) node = -1;
		}
		if ( node >= 0 ) {
			// get the key
			key128_t *nk = (key128_t *)s_ufnTree.getKey ( node );
			// parse out uh48
			uh48 = nk->n0;
			// mask out spidertimems
			uh48 &= 0x0000ffffffffffffLL;
			// use that to refine the key range immensely!
			m_nextKey = g_spiderdb.makeFirstKey2 (ip, uh48);
			m_endKey  = g_spiderdb.makeLastKey2  (ip, uh48);
			// do not add the recs to the tree!
			m_useTree = false;
		}
		*/

		// so we know if we are the first read or not...
		m_firstKey = m_nextKey;

		// . initialize this before scanning the spiderdb recs of an ip
		// . it lets us know if we received new spider requests for m_scanningIp
		//   while we were doing the scan
		m_gotNewDataForScanningIp = 0;

		m_lastListSize = -1;

		// let evalIpLoop() know it has not yet tried to read from spiderdb
		m_didRead = false;

		// reset this
		int32_t maxWinners = (int32_t)MAX_WINNER_NODES;

		if (m_winnerTree.getNumNodes() == 0 &&
		    !m_winnerTree.set(-1, maxWinners, maxWinners * MAX_REQUEST_SIZE, true, "wintree", NULL,
		                      sizeof(key192_t), -1)) {
			m_isPopulatingDoledb = false;
			log(LOG_ERROR, "Could not initialize m_winnerTree: %s",mstrerror(g_errno));
			logTrace( g_conf.m_logTraceSpider, "END, after winnerTree.set" );
			return;
		}

		if ( ! m_winnerTable.isInitialized() &&
		     ! m_winnerTable.set ( 8 ,                // uh48 is key
		                           sizeof(key192_t) , // winnertree key is data
		                           64 ,               // 64 slots initially
		                           NULL ,
		                           0 ,
		                           false ,            // allow dups?
		                           "wtdedup" ) ) {
			m_isPopulatingDoledb = false;
			log(LOG_ERROR, "Could not initialize m_winnerTable: %s",mstrerror(g_errno));
			logTrace( g_conf.m_logTraceSpider, "END, after winnerTable.set" );
			return;
		}

		// clear it before evaluating this ip so it is empty
		m_winnerTree.clear();

		// and the table as well now
		m_winnerTable.clear();

		// reset these as well
		m_minFutureTimeMS = 0LL;
		m_totalBytesScanned = 0LL;
		m_totalNewSpiderRequests = 0LL;
		m_lastOverflowFirstIp = 0;

		// . look up in spiderdb otherwise and add the best req to doledb from this ip
		// . if it blocks it ultimately calls gotSpiderdbListWrapper() which
		//   calls this function again with re-entry set to true
		if ( ! evalIpLoop () ) {
			logTrace( g_conf.m_logTraceSpider, "END, after evalIpLoop" );
			return ;
		}

		// oom error? i've seen this happen and we end up locking up!
		if ( g_errno ) {
			log( "spider: evalIpLoop: %s", mstrerror(g_errno) );
			m_isPopulatingDoledb = false;
			logTrace( g_conf.m_logTraceSpider, "END, error after evalIpLoop" );
			return;
		}
	}
}
|
|
|
|
///////////////////
|
|
//
|
|
// KEYSTONE FUNCTION
|
|
//
|
|
// . READ ALL spiderdb recs for IP of m_scanningIp
|
|
// . add winner to doledb
|
|
// . called ONLY by populateDoledbFromWaitingTree()
|
|
//
|
|
// . continually scan spiderdb requests for a particular ip, m_scanningIp
|
|
// . compute the best spider request to spider next
|
|
// . add it to doledb
|
|
// . getNextIpFromWaitingTree() must have been called to set m_scanningIp
|
|
// otherwise m_bestRequestValid might not have been reset to false
|
|
//
|
|
///////////////////
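// A rough sketch of the control flow, as derived from the functions below:
//   populateDoledbFromWaitingTree() -> evalIpLoop()
//     -> readListFromSpiderdb()   reads a chunk of spiderdb for m_scanningIp
//                                 (may block; gotSpiderdbListWrapper() re-enters)
//     -> scanListForWinners()     picks the best requests into m_winnerTree
//     -> addWinnersIntoDoledb()   serializes the winners into a doleBuf
//     -> addDoleBufIntoDoledb()   doles out the first rec, caches the rest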
|
|
|
|
bool SpiderColl::evalIpLoop ( ) {
|
|
logTrace( g_conf.m_logTraceSpider, "BEGIN" );
|
|
//testWinnerTreeKey ( );
|
|
|
|
// sanity
|
|
if ( m_scanningIp == 0 || m_scanningIp == -1 ) gbshutdownLogicError();
|
|
|
|
// are we trying to exit? some firstip lists can be quite long, so
|
|
// terminate here so all threads can return and we can exit properly
|
|
if (g_process.isShuttingDown()) {
|
|
logTrace( g_conf.m_logTraceSpider, "END, shutting down" );
|
|
return true;
|
|
}
|
|
|
|
bool useCache = true;
|
|
const CollectionRec *cr = g_collectiondb.getRec ( m_collnum );
|
|
|
|
// did our collection rec get deleted? since we were doing a read
|
|
// the SpiderColl will have been preserved in that case but its
|
|
// m_deleteMyself flag will have been set.
|
|
if ( tryToDeleteSpiderColl ( this ,"6" ) ) return false;
|
|
|
|
// if doing site or page quotas for the sitepages or domainpages
|
|
// url filter expressions, we can't muck with the cache because
|
|
// we end up skipping the counting part.
|
|
if ( ! cr )
|
|
useCache = false;
|
|
if ( cr && cr->m_urlFiltersHavePageCounts )
|
|
useCache = false;
|
|
if ( m_countingPagesIndexed )
|
|
useCache = false;
|
|
// assume not from cache
|
|
if ( useCache ) {
|
|
// if this ip is in the winnerlistcache use that. it saves us a lot of time.
|
|
key96_t cacheKey;
|
|
cacheKey.n0 = m_scanningIp;
|
|
cacheKey.n1 = 0;
|
|
char *doleBuf = NULL;
|
|
size_t doleBufSize;
|
|
//g_spiderLoop.m_winnerListCache.verify();
|
|
FxBlobCacheLock<int32_t> rcl(g_spiderLoop.m_winnerListCache);
|
|
bool inCache = g_spiderLoop.m_winnerListCache.lookup(m_scanningIp, (void**)&doleBuf, &doleBufSize);
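// the cached blob is the same doleBuf layout that addWinnersIntoDoledb()
// builds below: a 4-byte "next record" offset followed by serialized
// (doledbKey, recSize, SpiderRequest) entries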
|
|
if ( inCache ) {
|
|
int32_t crc = hash32 ( doleBuf + 4 , doleBufSize - 4 );
|
|
|
|
char ipbuf[16];
|
|
logDebug( g_conf.m_logDebugSpider, "spider: GOT %zu bytes of SpiderRequests "
|
|
"from winnerlistcache for ip %s ptr=%p crc=%" PRIu32,
|
|
doleBufSize,
|
|
iptoa(m_scanningIp,ipbuf),
|
|
doleBuf,
|
|
crc);
|
|
|
|
// copy doleBuf out from the cache so we can release the lock
|
|
SafeBuf sb;
|
|
sb.safeMemcpy(doleBuf,doleBufSize);
|
|
|
|
rcl.unlock();
|
|
|
|
// now add the first rec of the doleBuf into doledb's tree
|
|
// and re-add the rest back to the cache with the same key.
|
|
bool rc = addDoleBufIntoDoledb(&sb,true);
|
|
|
|
logTrace( g_conf.m_logTraceSpider, "END, after addDoleBufIntoDoledb. returning %s", rc ? "true" : "false" );
|
|
return rc;
|
|
}
|
|
}
|
|
|
|
top:
|
|
|
|
// did our collection rec get deleted? since we were doing a read
|
|
// the SpiderColl will have been preserved in that case but its
|
|
// m_deleteMyself flag will have been set.
|
|
if ( tryToDeleteSpiderColl ( this, "4" ) ) {
|
|
logTrace( g_conf.m_logTraceSpider, "END, after tryToDeleteSpiderColl (4)" );
|
|
return false;
|
|
}
|
|
|
|
// if first time here, let's do a read first
|
|
if ( ! m_didRead ) {
|
|
// reset list size to 0
|
|
m_list.reset();
|
|
// assume we did a read now
|
|
m_didRead = true;
|
|
// reset some stuff
|
|
m_lastScanningIp = 0;
|
|
|
|
// reset these that need to keep track of requests for
|
|
// the same url that might span two spiderdb lists or more
|
|
m_lastSreqUh48 = 0LL;
|
|
|
|
// do a read. if it blocks it will recall this loop
|
|
if ( ! readListFromSpiderdb () ) {
|
|
logTrace( g_conf.m_logTraceSpider, "END, readListFromSpiderdb returned false" );
|
|
return false;
|
|
}
|
|
}
|
|
|
|
for(;;) {
|
|
// did our collection rec get deleted? since we were doing a read
|
|
// the SpiderColl will have been preserved in that case but its
|
|
// m_deleteMyself flag will have been set.
|
|
if ( tryToDeleteSpiderColl ( this, "5" ) ) {
|
|
// pretend to block since we got deleted!!!
|
|
logTrace( g_conf.m_logTraceSpider, "END, after tryToDeleteSpiderColl (5)" );
|
|
return false;
|
|
}
|
|
|
|
// . did reading the list from spiderdb have an error?
|
|
// . i guess we don't add to doledb then
|
|
if ( g_errno ) {
|
|
log(LOG_ERROR,"spider: Had error getting list of urls from spiderdb: %s.",mstrerror(g_errno));
|
|
|
|
// save mem
|
|
m_list.freeList();
|
|
|
|
logTrace( g_conf.m_logTraceSpider, "END, g_errno %" PRId32, g_errno );
|
|
return true;
|
|
}
|
|
|
|
|
|
// if we started reading, then assume we got a fresh list here
|
|
logDebug( g_conf.m_logDebugSpider, "spider: back from msg5 spiderdb read2 of %" PRId32" bytes (cn=%" PRId32")",
|
|
m_list.getListSize(), (int32_t)m_collnum );
|
|
|
|
// . set the winning request for all lists we read so far
|
|
// . if m_countingPagesIndexed is true this will just fill in
|
|
// quota info into m_localTable...
|
|
scanListForWinners();
|
|
|
|
// if the list is empty we are done reading; otherwise keep going
|
|
if(m_list.isEmpty())
|
|
break;
|
|
|
|
// update m_nextKey for successive reads of spiderdb by
|
|
// calling readListFromSpiderdb()
|
|
key128_t lastKey = *(key128_t *)m_list.getLastKey();
|
|
|
|
// we're already done with this ip
|
|
int64_t lastUh48 = Spiderdb::getUrlHash48(&lastKey);
|
|
if (lastUh48 == 0xffffffffffffLL) {
|
|
break;
|
|
}
|
|
|
|
// crazy corruption?
|
|
if ( lastKey < m_nextKey ) {
|
|
char ipbuf[16];
|
|
log(LOG_WARN, "spider: got corruption. spiderdb keys out of order for "
|
|
"collnum=%" PRId32" for evaluation of firstip=%s so terminating evaluation of that firstip." ,
|
|
(int32_t)m_collnum, iptoa(m_scanningIp,ipbuf));
|
|
|
|
// this should result in an empty list read for
|
|
// m_scanningIp in spiderdb
|
|
m_nextKey = m_endKey;
|
|
}
|
|
else {
|
|
m_nextKey = Spiderdb::makeLastKey(Spiderdb::getFirstIp(&lastKey), ++lastUh48);
|
|
}
|
|
// . watch out for wrap around
|
|
// . normally i would go by this to indicate that we are
|
|
// done reading, but there's some bugs... so we go
|
|
// by whether our list is empty or not for now
|
|
if(m_nextKey < lastKey)
|
|
m_nextKey = lastKey;
|
|
// reset list to save mem
|
|
m_list.reset();
|
|
// read more! return if it blocked
|
|
if(!readListFromSpiderdb())
|
|
return false;
|
|
// we got a list without blocking
|
|
}
|
|
|
|
|
|
// . we are all done if last list read was empty
|
|
// . if we were just counting pages for quota, do a 2nd pass!
|
|
if ( m_countingPagesIndexed ) {
|
|
// do not do again.
|
|
m_countingPagesIndexed = false;
|
|
// start at the top again
|
|
m_nextKey = Spiderdb::makeFirstKey(m_scanningIp);
|
|
// this time m_localTable should have the quota info in it so
|
|
// getUrlFilterNum() can use that
|
|
m_didRead = false;
|
|
// do the 2nd pass. read list from the very top.
|
|
goto top;
|
|
}
|
|
|
|
// free list to save memory
|
|
m_list.freeList();
|
|
|
|
// . add all winners if we can in m_winnerTree into doledb
|
|
// . if list was empty, then reading is all done so take the winner we
|
|
// got from all the lists we did read for this IP and add him
|
|
// to doledb
|
|
// . if no winner exists, then remove m_scanningIp from m_waitingTree
|
|
// so we do not waste our time again. if url filters change then
|
|
// waiting tree will be rebuilt and we'll try again... or if
|
|
// a new spider request or reply for this ip comes in we'll try
|
|
// again as well...
|
|
// . this returns false if blocked adding to doledb using msg1
|
|
if ( ! addWinnersIntoDoledb() ) {
|
|
logTrace( g_conf.m_logTraceSpider, "END, returning false. After addWinnersIntoDoledb" );
|
|
return false;
|
|
}
|
|
|
|
// we are done...
|
|
logTrace( g_conf.m_logTraceSpider, "END, all done" );
|
|
return true;
|
|
}
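// getSpiderdbListWrapper() runs in a job-scheduler worker thread and does the
// actual sqlite read into sc->m_list; gotSpiderdbListWrapper() is the
// completion callback that re-enters evalIpLoop() once the list is ready
// (see the g_jobScheduler.submit() call in readListFromSpiderdb() below).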
|
|
|
|
|
|
void SpiderColl::getSpiderdbListWrapper(void *state) {
|
|
SpiderColl *sc = static_cast<SpiderColl*>(state);
|
|
|
|
if(!SpiderdbRdbSqliteBridge::getList(sc->m_cr->m_collnum,
|
|
&sc->m_list,
|
|
sc->m_nextKey,
|
|
sc->m_endKey,
|
|
SR_READ_SIZE)) {
|
|
if(!g_errno) {
|
|
g_errno = EIO; //imprecise
|
|
}
|
|
logTrace( g_conf.m_logTraceSpider, "END, got io-error from sqlite" );
|
|
return;
|
|
}
|
|
}
|
|
|
|
void SpiderColl::gotSpiderdbListWrapper(void *state, job_exit_t exit_type) {
|
|
SpiderColl *THIS = (SpiderColl *)state;
|
|
|
|
// are we trying to exit? some firstip lists can be quite long, so
|
|
// terminate here so all threads can return and we can exit properly
|
|
if (g_process.isShuttingDown()) {
|
|
return;
|
|
}
|
|
|
|
// return if that blocked
|
|
if (!THIS->evalIpLoop()) {
|
|
return;
|
|
}
|
|
|
|
// we are done; re-enter populateDoledbFromWaitingTree()
|
|
THIS->m_isPopulatingDoledb = false;
|
|
|
|
// m_isPopulatingDoledb must be false here or populateDoledbFromWaitingTree() bails out early
|
|
THIS->populateDoledbFromWaitingTree();
|
|
}
|
|
|
|
|
|
// . this is ONLY CALLED from evalIpLoop() above
|
|
// . returns false if blocked, true otherwise
|
|
// . returns true and sets g_errno on error
|
|
bool SpiderColl::readListFromSpiderdb ( ) {
|
|
logTrace( g_conf.m_logTraceSpider, "BEGIN" );
|
|
|
|
if ( ! m_waitingTreeKeyValid ) gbshutdownLogicError();
|
|
if ( ! m_scanningIp ) gbshutdownLogicError();
|
|
|
|
const CollectionRec *cr = g_collectiondb.getRec ( m_collnum );
|
|
if ( ! cr ) {
|
|
log(LOG_ERROR,"spider: lost collnum %" PRId32,(int32_t)m_collnum);
|
|
g_errno = ENOCOLLREC;
|
|
|
|
logTrace( g_conf.m_logTraceSpider, "END, ENOCOLLREC" );
|
|
return true;
|
|
}
|
|
|
|
// i guess we are always restricted to an ip, because
|
|
// populateWaitingTreeFromSpiderdb calls its own msg5.
|
|
int32_t firstIp0 = Spiderdb::getFirstIp(&m_nextKey);
|
|
// sanity
|
|
if ( m_scanningIp != firstIp0 ) gbshutdownLogicError();
|
|
// sometimes we already have this ip in doledb/doleiptable
|
|
// and somehow we try to scan spiderdb for it anyway
|
|
if (isInDoledbIpTable(firstIp0)) gbshutdownLogicError();
|
|
|
|
// if it got zapped from the waiting tree by the time we read the list
|
|
if (!isInWaitingTable(m_scanningIp)) {
|
|
logTrace( g_conf.m_logTraceSpider, "END, IP no longer in waitingTree" );
|
|
return true;
|
|
}
|
|
|
|
// sanity check
|
|
if (!m_waitingTree.getNode(0, (char *)&m_waitingTreeKey)) {
|
|
// it gets removed because addSpiderReply() calls addToWaitingTree
|
|
// and replaces the node we are scanning with one that has a better
|
|
// time, an earlier time, even though that time may have come and
|
|
// we are scanning it now. perhaps addToWaitingTree() should ignore
|
|
// the ip if it equals m_scanningIp?
|
|
log(LOG_WARN, "spider: waiting tree key removed while reading list for %s (%" PRId32")", cr->m_coll,(int32_t)m_collnum);
|
|
logTrace( g_conf.m_logTraceSpider, "END, waitingTree node was removed" );
|
|
return true;
|
|
}
|
|
|
|
// . read in a replacement SpiderRequest to add to doledb from
|
|
// this ip
|
|
// . get the list of spiderdb records
|
|
// . do not include cache, those results are old and will mess
|
|
// us up
|
|
if (g_conf.m_logDebugSpider ) {
|
|
// gotta print each one out individually because KEYSTR
|
|
// uses a static buffer to store the string
|
|
SafeBuf tmp;
|
|
char ipbuf[16];
|
|
tmp.safePrintf("spider: readListFromSpiderdb: calling msg5: ");
|
|
tmp.safePrintf("firstKey=%s ", KEYSTR(&m_firstKey,sizeof(key128_t)));
|
|
tmp.safePrintf("endKey=%s ", KEYSTR(&m_endKey,sizeof(key128_t)));
|
|
tmp.safePrintf("nextKey=%s ", KEYSTR(&m_nextKey,sizeof(key128_t)));
|
|
tmp.safePrintf("firstip=%s ", iptoa(m_scanningIp,ipbuf));
|
|
tmp.safePrintf("(cn=%" PRId32")",(int32_t)m_collnum);
|
|
log(LOG_DEBUG,"%s",tmp.getBufStart());
|
|
}
|
|
|
|
// log this better
|
|
char ipbuf[16];
|
|
logDebug(g_conf.m_logDebugSpider, "spider: readListFromSpiderdb: firstip=%s key=%s",
|
|
iptoa(m_scanningIp,ipbuf), KEYSTR( &m_nextKey, sizeof( key128_t ) ) );
|
|
|
|
// . read the list from local disk
|
|
// . if a niceness 0 intersect thread is taking a LONG time
|
|
// then this will not complete in a long time and we
|
|
// end up timing out the round. so try checking for
|
|
// m_gettingList in spiderDoledUrls() and setting
|
|
// m_lastSpiderCouldLaunch
|
|
if (g_jobScheduler.submit(getSpiderdbListWrapper, gotSpiderdbListWrapper, this, thread_type_spider_read, 0)) {
|
|
return false;
|
|
}
|
|
|
|
// could not submit the job to a worker thread, so do the read synchronously
|
|
getSpiderdbListWrapper(this);
|
|
|
|
// note its return
|
|
logDebug( g_conf.m_logDebugSpider, "spider: back from msg5 spiderdb read of %" PRId32" bytes",m_list.getListSize());
|
|
|
|
// got it without blocking. maybe all in tree or in cache
|
|
logTrace( g_conf.m_logTraceSpider, "END, didn't block" );
|
|
return true;
|
|
}
|
|
|
|
|
|
|
|
static int32_t s_lastIn = 0;
|
|
static int32_t s_lastOut = 0;
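// s_lastIn / s_lastOut memoize the last firstIp that was found / not found in
// the overflow list so back-to-back lookups for the same ip skip the scan;
// scanListForWinners() resets them whenever the list contents change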
|
|
|
|
bool SpiderColl::isFirstIpInOverflowList(int32_t firstIp) const {
|
|
if ( ! m_overflowList ) return false;
|
|
if ( firstIp == 0 || firstIp == -1 ) return false;
|
|
if ( firstIp == s_lastIn ) return true;
|
|
if ( firstIp == s_lastOut ) return false;
|
|
for ( int32_t oi = 0 ; ; oi++ ) {
|
|
// stop at end
|
|
if ( ! m_overflowList[oi] ) break;
|
|
// an ip of zero is end of the list
|
|
if ( m_overflowList[oi] == firstIp ) {
|
|
s_lastIn = firstIp;
|
|
return true;
|
|
}
|
|
}
|
|
s_lastOut = firstIp;
|
|
return false;
|
|
}
|
|
|
|
|
|
|
|
// . ADDS top X winners to m_winnerTree
|
|
// . this is ONLY CALLED from evalIpLoop() above
|
|
// . scan m_list that we read from spiderdb for m_scanningIp IP
|
|
// . set m_bestRequest if a request in m_list is better than what is
|
|
// in m_bestRequest from previous lists for this IP
|
|
bool SpiderColl::scanListForWinners ( ) {
|
|
// if list is empty why are we here?
|
|
if ( m_list.isEmpty() ) return true;
|
|
|
|
// don't proceed if we're shutting down
|
|
if (g_process.isShuttingDown()) {
|
|
return true;
|
|
}
|
|
|
|
// ensure we point to the top of the list
|
|
m_list.resetListPtr();
|
|
|
|
// get this
|
|
int64_t nowGlobalMS = gettimeofdayInMilliseconds();//Local();
|
|
uint32_t nowGlobal = nowGlobalMS / 1000;
|
|
|
|
const SpiderReply *srep = NULL;
|
|
int64_t srepUh48 = 0;
|
|
|
|
// if we are continuing from another list...
|
|
if ( m_lastReplyValid ) {
|
|
srep = (SpiderReply *)m_lastReplyBuf;
|
|
srepUh48 = srep->getUrlHash48();
|
|
}
|
|
|
|
// show list stats
|
|
char ipbuf[16];
|
|
logDebug( g_conf.m_logDebugSpider, "spider: readListFromSpiderdb: got list of size %" PRId32" for firstip=%s",
|
|
m_list.getListSize(), iptoa(m_scanningIp,ipbuf) );
|
|
|
|
|
|
// if we don't read minRecSizes worth of data that MUST indicate
|
|
// there is no more data to read. put this theory to the test
|
|
// before we use it to indicate an end-of-list condition.
|
|
if ( m_list.getListSize() > 0 &&
|
|
m_lastScanningIp == m_scanningIp &&
|
|
m_lastListSize < (int32_t)SR_READ_SIZE &&
|
|
m_lastListSize >= 0 ) {
|
|
log(LOG_ERROR,"spider: shucks. spiderdb reads not full.");
|
|
}
|
|
|
|
m_lastListSize = m_list.getListSize();
|
|
m_lastScanningIp = m_scanningIp;
|
|
|
|
m_totalBytesScanned += m_list.getListSize();
|
|
|
|
if ( m_list.isEmpty() ) {
|
|
logDebug( g_conf.m_logDebugSpider, "spider: failed to get rec for ip=%s", iptoa(m_scanningIp,ipbuf) );
|
|
}
|
|
|
|
|
|
int32_t firstIp = m_waitingTreeKey.n0 & 0xffffffff;
|
|
|
|
key128_t finalKey;
|
|
int32_t recCount = 0;
|
|
|
|
// loop over all serialized spiderdb records in the list
|
|
for ( ; ! m_list.isExhausted() ; ) {
|
|
// stop coring on empty lists
|
|
if ( m_list.isEmpty() ) break;
|
|
// get spiderdb rec in its serialized form
|
|
char *rec = m_list.getCurrentRec();
|
|
// count it
|
|
recCount++;
|
|
// sanity
|
|
memcpy ( (char *)&finalKey , rec , sizeof(key128_t) );
|
|
// skip to next guy
|
|
m_list.skipCurrentRecord();
|
|
// negative? wtf?
|
|
if ( (rec[0] & 0x01) == 0x00 ) {
|
|
logf(LOG_DEBUG,"spider: got negative spider rec");
|
|
continue;
|
|
}
|
|
// if its a SpiderReply set it for an upcoming requests
|
|
if ( ! Spiderdb::isSpiderRequest ( (key128_t *)rec ) ) {
|
|
|
|
// see if this is the most recent one
|
|
const SpiderReply *tmp = (SpiderReply *)rec;
|
|
|
|
// . MDW: we have to detect corrupt replies up here so
|
|
// they do not become the winning reply because
|
|
// their date is in the future!!
|
|
|
|
if ( tmp->m_spideredTime > nowGlobal + 1 ) {
|
|
if ( m_cr->m_spiderCorruptCount == 0 ) {
|
|
log( LOG_WARN, "spider: got corrupt time spiderReply in scan uh48=%" PRId64" httpstatus=%" PRId32" datasize=%" PRId32" (cn=%" PRId32") ip=%s",
|
|
tmp->getUrlHash48(),
|
|
(int32_t)tmp->m_httpStatus,
|
|
tmp->m_dataSize,
|
|
(int32_t)m_collnum,
|
|
iptoa(m_scanningIp,ipbuf));
|
|
}
|
|
m_cr->m_spiderCorruptCount++;
|
|
// don't nuke it just for that...
|
|
//srep = NULL;
|
|
continue;
|
|
}
|
|
|
|
// . this is -1 on corruption
|
|
// . i've seen -31757, 21... etc for bad http replies
|
|
// in the qatest123 doc cache... so turn off for that
|
|
if ( tmp->m_httpStatus >= 1000 ) {
|
|
if ( m_cr->m_spiderCorruptCount == 0 ) {
|
|
log(LOG_WARN, "spider: got corrupt 3 spiderReply in scan uh48=%" PRId64" httpstatus=%" PRId32" datasize=%" PRId32" (cn=%" PRId32") ip=%s",
|
|
tmp->getUrlHash48(),
|
|
(int32_t)tmp->m_httpStatus,
|
|
tmp->m_dataSize,
|
|
(int32_t)m_collnum,
|
|
iptoa(m_scanningIp,ipbuf));
|
|
}
|
|
m_cr->m_spiderCorruptCount++;
|
|
// don't nuke it just for that...
|
|
//srep = NULL;
|
|
continue;
|
|
}
|
|
|
|
// bad langid?
|
|
if ( ! getLanguageAbbr (tmp->m_langId) ) {
|
|
log(LOG_WARN, "spider: got corrupt 4 spiderReply in scan uh48=%" PRId64" langid=%" PRId32" (cn=%" PRId32") ip=%s",
|
|
tmp->getUrlHash48(),
|
|
(int32_t)tmp->m_langId,
|
|
(int32_t)m_collnum,
|
|
iptoa(m_scanningIp,ipbuf));
|
|
m_cr->m_spiderCorruptCount++;
|
|
//srep = NULL;
|
|
continue;
|
|
}
|
|
|
|
// if we are corrupt, skip us
|
|
if ( tmp->getRecSize() > (int32_t)MAX_SP_REPLY_SIZE )
|
|
continue;
|
|
|
|
// if we have a more recent reply already, skip this
|
|
if ( srep &&
|
|
srep->getUrlHash48() == tmp->getUrlHash48() &&
|
|
srep->m_spideredTime >= tmp->m_spideredTime )
|
|
continue;
|
|
// otherwise, assign it
|
|
srep = tmp;
|
|
srepUh48 = srep->getUrlHash48();
|
|
continue;
|
|
}
|
|
// cast it
|
|
SpiderRequest *sreq = (SpiderRequest *)rec;
|
|
|
|
// skip if our twin or another shard should handle it
|
|
if ( ! isAssignedToUs ( sreq->m_firstIp ) ) {
|
|
continue;
|
|
}
|
|
|
|
int64_t uh48 = sreq->getUrlHash48();
|
|
|
|
// null out srep if no match
|
|
if ( srep && srepUh48 != uh48 ) {
|
|
srep = NULL;
|
|
}
|
|
|
|
// . ignore docid-based requests if spidered the url afterwards
|
|
// . these are one-hit wonders
|
|
// . once done they can be deleted
|
|
if ( sreq->m_isPageReindex && srep && srep->m_spideredTime > sreq->m_addedTime ) {
|
|
logDebug( g_conf.m_logDebugSpider, "spider: skipping9 %s", sreq->m_url );
|
|
continue;
|
|
}
|
|
|
|
// if it is a reply-less new url spiderrequest, count it
|
|
// avoid counting query reindex requests
|
|
if ( ! srep && m_lastSreqUh48 != uh48 && ! sreq->m_fakeFirstIp ) {
|
|
m_totalNewSpiderRequests++;
|
|
}
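// m_cblocks[] remembers the distinct ipdom() values (class-C style blocks)
// seen so far among this url's requests; m_pageNumInlinks counts the distinct
// entries, capped at 20, and is stored into sreq->m_pageNumInlinks below as
// a cheap inlink estimate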
|
|
|
|
int32_t cblock = ipdom ( sreq->m_firstIp );
|
|
|
|
bool countIt = true;
|
|
|
|
// reset page inlink count on url request change
|
|
if ( m_lastSreqUh48 != uh48 ) {
|
|
m_pageNumInlinks = 0;
|
|
m_lastCBlockIp = 0;
|
|
}
|
|
|
|
|
|
if ( cblock == m_lastCBlockIp ||
|
|
// do not count manually added spider requests
|
|
sreq->m_isAddUrl || sreq->m_isInjecting ||
|
|
// 20 is good enough
|
|
m_pageNumInlinks >= 20 ) {
|
|
countIt = false;
|
|
}
|
|
|
|
if ( countIt ) {
|
|
int32_t ca;
|
|
for ( ca = 0 ; ca < m_pageNumInlinks ; ca++ ) {
|
|
if ( m_cblocks[ ca ] == cblock ) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
// if found in our list, do not count it, already did
|
|
if ( ca < m_pageNumInlinks ) {
|
|
countIt = false;
|
|
}
|
|
}
|
|
|
|
if ( countIt ) {
|
|
m_cblocks[m_pageNumInlinks] = cblock;
|
|
m_pageNumInlinks++;
|
|
if ( m_pageNumInlinks > 20 ) gbshutdownAbort(true);
|
|
}
|
|
|
|
// set this now. it does increase with each request. so
|
|
// initial requests will not see the full # of inlinks.
|
|
sreq->m_pageNumInlinks = (uint8_t)m_pageNumInlinks;
|
|
|
|
m_lastSreqUh48 = uh48;
|
|
m_lastCBlockIp = cblock;
|
|
|
|
// only add firstip if manually added and not fake
|
|
|
|
//
|
|
// just calculating page counts? if the url filters are based
|
|
// on the # of pages indexed per ip or subdomain/site then
|
|
// we have to maintain a page count table. sitepages.
|
|
//
|
|
if ( m_countingPagesIndexed ) {
|
|
// skip requests with a fake firstIp here so dom/site hash seeds are not double counted
|
|
if ( sreq->m_fakeFirstIp ) continue;
|
|
// count the manual additions separately. mangle their
|
|
// hash with 0x123456 so they are separate.
|
|
if ( (sreq->m_isAddUrl || sreq->m_isInjecting) &&
|
|
// unique votes per seed
|
|
uh48 != m_lastReqUh48a ) {
|
|
// do not repeat count the same url
|
|
m_lastReqUh48a = uh48;
|
|
// sanity
|
|
if ( ! sreq->m_siteHash32) gbshutdownAbort(true); //isj: this abort is questionable
|
|
// do a little magic because we count
|
|
// seeds as "manual adds" as well as normal pg
|
|
int32_t h32;
|
|
h32 = sreq->m_siteHash32 ^ 0x123456;
|
|
m_siteIndexedDocumentCount.addScore(h32);
|
|
}
|
|
// unique votes per (non-seed) url for quota
|
|
if ( uh48 == m_lastReqUh48b ) continue;
|
|
// update this to ensure unique voting
|
|
m_lastReqUh48b = uh48;
|
|
// now count pages indexed below here
|
|
if ( ! srep ) continue;
|
|
if ( srepUh48 == m_lastRepUh48 ) continue;
|
|
m_lastRepUh48 = srepUh48;
|
|
//if ( ! srep ) continue;
|
|
// TODO: what if srep->m_isIndexedINValid is set????
|
|
if ( ! srep->m_isIndexed ) continue;
|
|
// keep count per site and firstip
|
|
m_siteIndexedDocumentCount.addScore(sreq->m_siteHash32,1);
|
|
|
|
const int32_t *tmpNum = (const int32_t *)m_siteIndexedDocumentCount.getValue( &( sreq->m_siteHash32 ) );
|
|
logDebug( g_conf.m_logDebugSpider, "spider: sitequota: got %" PRId32" indexed docs for site from "
|
|
"firstip of %s from url %s", tmpNum ? *tmpNum : -1,
|
|
iptoa(sreq->m_firstIp,ipbuf),
|
|
sreq->m_url );
|
|
continue;
|
|
}
|
|
|
|
|
|
// if the spiderrequest has a fake firstip that means it
|
|
// was injected without doing a proper ip lookup for speed.
|
|
// xmldoc.cpp will check for m_fakeFirstIp and if that is
|
|
// set in the spiderrequest it will simply add a new request
|
|
// with the correct firstip. it will be a completely different
|
|
// spiderrequest key then. so no need to keep the "fakes".
|
|
// it will log the EFAKEFIRSTIP error msg.
|
|
if ( sreq->m_fakeFirstIp && srep && srep->m_spideredTime > sreq->m_addedTime ) {
|
|
logDebug( g_conf.m_logDebugSpider, "spider: skipping6 %s", sreq->m_url );
|
|
continue;
|
|
}
|
|
|
|
if (!sreq->m_urlIsDocId) {
|
|
//delete request if it is an url we don't want to index
|
|
Url url;
|
|
url.set(sreq->m_url);
|
|
if (url.hasNonIndexableExtension(TITLEREC_CURRENT_VERSION) || isUrlBlocked(url,NULL)) {
|
|
if (srep && !srep->m_isIndexedINValid && srep->m_isIndexed) {
|
|
log(LOG_DEBUG, "Found unwanted/non-indexable URL '%s' in spiderdb. Force deleting it", sreq->m_url);
|
|
sreq->m_forceDelete = true;
|
|
} else {
|
|
log(LOG_DEBUG, "Found unwanted/non-indexable URL '%s' in spiderdb. Deleting it", sreq->m_url);
|
|
SpiderdbUtil::deleteRecord(m_collnum, sreq->m_firstIp, uh48);
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
|
|
if ( sreq->isCorrupt() ) {
|
|
if ( m_cr->m_spiderCorruptCount == 0 )
|
|
log( LOG_WARN, "spider: got corrupt xx spiderRequest in scan because url is %s (cn=%" PRId32")",
|
|
sreq->m_url,(int32_t)m_collnum);
|
|
m_cr->m_spiderCorruptCount++;
|
|
continue;
|
|
}
|
|
|
|
if ( sreq->m_dataSize > (int32_t)sizeof(SpiderRequest) ) {
|
|
if ( m_cr->m_spiderCorruptCount == 0 )
|
|
log( LOG_WARN, "spider: got corrupt 11 spiderRequest in scan because rectoobig u=%s (cn=%" PRId32")",
|
|
sreq->m_url,(int32_t)m_collnum);
|
|
m_cr->m_spiderCorruptCount++;
|
|
continue;
|
|
}
|
|
|
|
int32_t delta = sreq->m_addedTime - nowGlobal;
|
|
if ( delta > 86400 ) {
|
|
static bool s_first = true;
|
|
if ( m_cr->m_spiderCorruptCount == 0 || s_first ) {
|
|
s_first = false;
|
|
log( LOG_WARN, "spider: got corrupt 6 spiderRequest in scan because added time is %" PRId32" (delta=%" PRId32" which is well into the future. url=%s (cn=%i)",
|
|
(int32_t)sreq->m_addedTime, delta, sreq->m_url, (int)m_collnum);
|
|
}
|
|
m_cr->m_spiderCorruptCount++;
|
|
continue;
|
|
}
|
|
|
|
|
|
// update SpiderRequest::m_siteNumInlinks to most recent value
|
|
int32_t sni = sreq->m_siteNumInlinks;
|
|
{
|
|
ScopedLock sl(m_sniTableMtx);
|
|
|
|
// get the # of inlinks to the site from our table
|
|
const uint64_t *val = (const uint64_t *)m_sniTable.getValue32(sreq->m_siteHash32);
|
|
// use the most recent sni from this table
|
|
if (val)
|
|
sni = (int32_t)((*val) >> 32);
|
|
// if SpiderRequest is forced then m_siteHash32 is 0!
|
|
else if (srep && srep->m_spideredTime >= sreq->m_addedTime)
|
|
sni = srep->m_siteNumInlinks;
|
|
}
|
|
// assign
|
|
sreq->m_siteNumInlinks = sni;
|
|
|
|
// carry state from the latest reply into the request (content hash and the
// had-reply flag) so xmldoc knows what happened last time and can roll it
// into the spiderreply it writes if it gets another error
|
|
if ( srep ) {
|
|
// . assign this too from latest reply - smart compress
|
|
// . this WAS SpiderReply::m_pubdate so it might be
|
|
// set to a non-zero value that is wrong now... but
|
|
// not a big deal!
|
|
sreq->m_contentHash32 = srep->m_contentHash32;
|
|
// if we tried it before
|
|
sreq->m_hadReply = true;
|
|
}
|
|
|
|
// . get the url filter we match
|
|
// . if this is slow see the TODO below in dedupSpiderdbList()
|
|
// which can pre-store these values assuming url filters do
|
|
// not change and siteNumInlinks is about the same.
|
|
int32_t ufn = ::getUrlFilterNum(sreq, srep, nowGlobal, false, m_cr, false, -1);
|
|
// sanity check
|
|
if ( ufn == -1 ) {
|
|
log( LOG_WARN, "failed to match url filter for url='%s' coll='%s'", sreq->m_url, m_cr->m_coll );
|
|
g_errno = EBADENGINEER;
|
|
return true;
|
|
}
|
|
// set the priority (might be the same as old)
|
|
int32_t priority = m_cr->m_spiderPriorities[ufn];
|
|
// now get rid of negative priorities since we added a
|
|
// separate force delete checkbox in the url filters
|
|
if ( priority < 0 ) priority = 0;
|
|
if ( priority >= MAX_SPIDER_PRIORITIES) gbshutdownLogicError();
|
|
|
|
logDebug( g_conf.m_logDebugSpider, "spider: got ufn=%" PRId32" for %s (%" PRId64")",
|
|
ufn, sreq->m_url, sreq->getUrlHash48() );
|
|
|
|
if ( srep )
|
|
logDebug(g_conf.m_logDebugSpider, "spider: lastspidered=%" PRIu32, srep->m_spideredTime );
|
|
|
|
// spiders disabled for this row in url filteres?
|
|
if ( m_cr->m_maxSpidersPerRule[ufn] <= 0 ) {
|
|
continue;
|
|
}
|
|
|
|
// skip if banned (unless need to delete from index)
|
|
if (m_cr->m_forceDelete[ufn]) {
|
|
// but if it is currently indexed we have to delete it
|
|
if (!(srep && srep->m_isIndexed)) {
|
|
// so only skip if it's not indexed
|
|
continue;
|
|
}
|
|
}
|
|
|
|
if ( m_cr->m_forceDelete[ufn] ) {
|
|
logDebug( g_conf.m_logDebugSpider, "spider: force delete ufn=%" PRId32" url='%s'", ufn, sreq->m_url );
|
|
// force it to a delete
|
|
sreq->m_forceDelete = true;
|
|
}
|
|
|
|
int64_t spiderTimeMS = getSpiderTimeMS(sreq, ufn, srep, nowGlobalMS);
|
|
|
|
// sanity
|
|
if ( (int64_t)spiderTimeMS < 0 ) {
|
|
log( LOG_WARN, "spider: got corrupt 2 spiderRequest in scan (cn=%" PRId32")",
|
|
(int32_t)m_collnum);
|
|
continue;
|
|
}
|
|
|
|
// save this shit for storing in doledb
|
|
sreq->m_ufn = ufn;
|
|
sreq->m_priority = priority;
|
|
|
|
// if it is in future, skip it and just set m_futureTime and
|
|
// and we will update the waiting tree
|
|
// with an entry based on that future time if the winnerTree
|
|
// turns out to be empty after we've completed our scan
|
|
if ( spiderTimeMS > nowGlobalMS ) {
|
|
// if futuretime is zero set it to this time
|
|
if ( ! m_minFutureTimeMS )
|
|
m_minFutureTimeMS = spiderTimeMS;
|
|
// otherwise we get the MIN of all future times
|
|
else if ( spiderTimeMS < m_minFutureTimeMS )
|
|
m_minFutureTimeMS = spiderTimeMS;
|
|
if ( g_conf.m_logDebugSpider )
|
|
log(LOG_DEBUG,"spider: skippingx %s",sreq->m_url);
|
|
continue;
|
|
}
|
|
|
|
|
|
// we can't have negative priorities at this point because
|
|
// the s_ufnTree uses priority as part of the key so it
|
|
// can get the top 100 or so urls for a firstip to avoid
|
|
// having to hit spiderdb for every one!
|
|
//if ( priority < 0 ) gbshutdownLogicError();
|
|
|
|
// bail if it is locked! we now call
|
|
// msg12::confirmLockAcquisition() after we get the lock,
|
|
// which deletes the doledb record from doledb and doleiptable
|
|
// rightaway and adds a "0" entry into the waiting tree so
|
|
// that evalIpLoop() repopulates doledb again with that
|
|
// "firstIp". this way we can spider multiple urls from the
|
|
// same ip at the same time.
|
|
int64_t key = makeLockTableKey ( sreq );
|
|
|
|
logDebug( g_conf.m_logDebugSpider, "spider: checking uh48=%" PRId64" lockkey=%" PRId64" used=%" PRId32,
|
|
uh48, key, g_spiderLoop.getLockCount() );
|
|
|
|
// MDW
|
|
if (g_spiderLoop.isLocked(key)) {
|
|
logDebug( g_conf.m_logDebugSpider, "spider: skipping url lockkey=%" PRId64" in lock table sreq.url=%s",
|
|
key, sreq->m_url );
|
|
continue;
|
|
}
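// the winnerTree key packs firstIp, priority, spiderTimeMS and uh48 (see
// makeWinnerTreeKey()/parseWinnerTreeKey()); keys are ordered so that a
// SMALLER key means a more desirable request, which is what the KEYCMP
// tie-breaking below relies on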
|
|
|
|
// make key
|
|
key192_t wk = makeWinnerTreeKey( firstIp ,
|
|
priority ,
|
|
spiderTimeMS ,
|
|
uh48 );
|
|
|
|
// assume our added time is the first time this url was added
|
|
sreq->m_discoveryTime = sreq->m_addedTime;
|
|
|
|
// if this url is already in the winnerTree then either we
|
|
// replace it or we skip ourselves.
|
|
//
|
|
// watch out for dups in winner tree, the same url can have
|
|
// multiple spiderTimeMses somehow...
|
|
// as well, resulting in different priorities...
|
|
// actually the dedup table could map to a priority and a node
|
|
// so we can kick out a lower priority version of the same url.
|
|
int32_t winSlot = m_winnerTable.getSlot ( &uh48 );
|
|
if ( winSlot >= 0 ) {
|
|
const key192_t *oldwk = (const key192_t *)m_winnerTable.getValueFromSlot ( winSlot );
|
|
|
|
SpiderRequest *wsreq = (SpiderRequest *)m_winnerTree.getData(0,(const char *)oldwk);
|
|
|
|
if ( wsreq ) {
|
|
// and the min added time as well!
|
|
// get the oldest timestamp so
|
|
// gbssDiscoveryTime will be accurate.
|
|
if ( sreq->m_discoveryTime < wsreq->m_discoveryTime )
|
|
wsreq->m_discoveryTime = sreq->m_discoveryTime;
|
|
|
|
if ( wsreq->m_discoveryTime < sreq->m_discoveryTime )
|
|
sreq->m_discoveryTime = wsreq->m_discoveryTime;
|
|
}
|
|
|
|
|
|
|
|
// are we lower priority? (or equal)
|
|
// smaller keys are HIGHER priority.
|
|
if(KEYCMP( (const char *)&wk, (const char *)oldwk, sizeof(key192_t)) >= 0)
|
|
{
|
|
continue;
|
|
}
|
|
|
|
// from table too. no it's a dup uh48!
|
|
//m_winnerTable.deleteKey ( &uh48 );
|
|
// otherwise we supplant it. remove old key from tree.
|
|
m_winnerTree.deleteNode ( 0 , (char *)oldwk , false);
|
|
// supplant in table and tree... just add below...
|
|
}
|
|
|
|
// get the top 100 spider requests by priority/time/etc.
|
|
int32_t maxWinners = (int32_t)MAX_WINNER_NODES; // 40
|
|
|
|
|
|
//@todo BR: Why max winners based on bytes scanned??
|
|
// if less than 10MB of spiderdb requests limit to 400
|
|
if ( m_totalBytesScanned < 10000000 ) maxWinners = 400;
|
|
|
|
// only put one doledb record into winner tree if
|
|
// the list is pretty short. otherwise, we end up caching
|
|
// too much. granted, we only cache for about 2 mins.
|
|
// mdw: for testing take this out!
|
|
if ( m_totalBytesScanned < 25000 ) maxWinners = 1;
|
|
|
|
// sanity. make sure read is somewhat hefty for our maxWinners=1 thing
|
|
static_assert(SR_READ_SIZE >= 500000, "ensure read size is big enough");
|
|
|
|
{
|
|
ScopedLock sl(m_winnerTree.getLock());
|
|
// only compare to min winner in tree if tree is full
|
|
if (m_winnerTree.getNumUsedNodes_unlocked() >= maxWinners) {
|
|
// get this candidate's spider time
|
|
int64_t tm1 = spiderTimeMS;
|
|
// get the spider time of lowest scoring req in tree
|
|
int64_t tm2 = m_tailTimeMS;
|
|
// if they are both overdue, make them the same
|
|
if (tm1 < nowGlobalMS) tm1 = 1;
|
|
if (tm2 < nowGlobalMS) tm2 = 1;
|
|
// skip spider request if its time is past winner's
|
|
if (tm1 > tm2)
|
|
continue;
|
|
if (tm1 < tm2)
|
|
goto gotNewWinner;
|
|
// if tied, use priority
|
|
if (priority < m_tailPriority)
|
|
continue;
|
|
if (priority > m_tailPriority)
|
|
goto gotNewWinner;
|
|
// if tied, use actual times. assuming both<nowGlobalMS
|
|
if (spiderTimeMS > m_tailTimeMS)
|
|
continue;
|
|
if (spiderTimeMS < m_tailTimeMS)
|
|
goto gotNewWinner;
|
|
// all tied, keep it the same i guess
|
|
continue;
|
|
// otherwise, add the new winner in and remove the old
|
|
gotNewWinner:
|
|
// get lowest scoring node in tree
|
|
int32_t tailNode = m_winnerTree.getLastNode_unlocked();
|
|
// from table too
|
|
m_winnerTable.removeKey(&m_tailUh48);
|
|
// delete the tail so new spiderrequest can enter
|
|
m_winnerTree.deleteNode_unlocked(tailNode, true);
|
|
}
|
|
}
|
|
|
|
// sometimes the firstip in its key does not match the
|
|
// firstip in the record!
|
|
if ( sreq->m_firstIp != firstIp ) {
|
|
log(LOG_ERROR,"spider: request %s firstip does not match "
|
|
"firstip in key collnum=%i",sreq->m_url,
|
|
(int)m_collnum);
|
|
log(LOG_ERROR,"spider: ip1=%s",iptoa(sreq->m_firstIp,ipbuf));
|
|
log(LOG_ERROR,"spider: ip2=%s",iptoa(firstIp,ipbuf));
|
|
continue;
|
|
}
|
|
|
|
|
|
// . add to table which allows us to ensure same url not
|
|
// repeated in tree
|
|
// . just skip if fail to add...
|
|
if ( ! m_winnerTable.addKey ( &uh48 , &wk ) ) {
|
|
log(LOG_WARN,"spider: skipping. could not add to winnerTable. %s. ip=%s", sreq->m_url,iptoa(m_scanningIp,ipbuf) );
|
|
continue;
|
|
}
|
|
|
|
// use an individually allocated buffer for each spiderrequest
|
|
// so if it gets removed from tree the memory can be freed by
|
|
// the tree which "owns" the data because m_winnerTree.set()
|
|
// above set ownsData to true.
|
|
int32_t need = sreq->getRecSize();
|
|
char *newMem = (char *)mdup ( sreq , need , "sreqbuf" );
|
|
if ( ! newMem ) {
|
|
log(LOG_WARN,"spider: skipping. could not alloc newMem. %s. ip=%s", sreq->m_url,iptoa(m_scanningIp,ipbuf) );
|
|
continue;
|
|
}
|
|
|
|
{
|
|
ScopedLock sl(m_winnerTree.getLock());
|
|
// add it to the tree of the top urls to spider
|
|
m_winnerTree.addNode_unlocked(0, (char *)&wk, (char *)newMem, need);
|
|
|
|
// set new tail priority and time for next compare
|
|
if (m_winnerTree.getNumUsedNodes_unlocked() >= maxWinners) {
|
|
// for the worst node in the tree...
|
|
int32_t tailNode = m_winnerTree.getLastNode_unlocked();
|
|
if (tailNode < 0) gbshutdownAbort(true);
|
|
// set new tail parms
|
|
const key192_t *tailKey = reinterpret_cast<const key192_t *>(m_winnerTree.getKey_unlocked(tailNode));
|
|
// parse the tail key back into ip/priority/time/uh48
|
|
parseWinnerTreeKey(tailKey, &m_tailIp, &m_tailPriority, &m_tailTimeMS, &m_tailUh48);
|
|
|
|
// sanity
|
|
if (m_tailIp != firstIp) {
|
|
gbshutdownAbort(true);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// if no spiderreply for the current url, invalidate this
|
|
m_lastReplyValid = false;
|
|
// if read is not yet done, save the reply in case next list needs it
|
|
if ( srep ) { // && ! m_isReadDone ) {
|
|
int32_t rsize = srep->getRecSize();
|
|
if((size_t)rsize > sizeof(m_lastReplyBuf))
|
|
gbshutdownAbort(true);
|
|
memmove ( m_lastReplyBuf, srep, rsize );
|
|
m_lastReplyValid = true;
|
|
}
|
|
|
|
logDebug(g_conf.m_logDebugSpider, "spider: Checked list of %" PRId32" spiderdb bytes (%" PRId32" recs) "
|
|
"for winners for firstip=%s. winnerTreeUsedNodes=%" PRId32" #newreqs=%" PRId64,
|
|
m_list.getListSize(), recCount,
|
|
iptoa(m_scanningIp,ipbuf), m_winnerTree.getNumUsedNodes(), m_totalNewSpiderRequests );
|
|
|
|
// reset any errno cuz we're just a cache
|
|
g_errno = 0;
|
|
|
|
|
|
/////
|
|
//
|
|
// BEGIN maintain firstip overflow list
|
|
//
|
|
/////
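// m_overflowList is a flat array of up to OVERFLOWLISTSIZE firstIps:
// a 0 entry terminates the list and a -1 entry marks a reusable empty slot
// (isFirstIpInOverflowList() above scans it the same way)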
|
|
bool overflow = false;
|
|
// don't add any more outlinks to this firstip after we
|
|
// have 10M spider requests for it.
|
|
// lower for testing
|
|
//if ( m_totalNewSpiderRequests > 1 )
|
|
// @todo BR: Another hardcoded limit..
|
|
if ( m_totalNewSpiderRequests > 10000000 )
|
|
overflow = true;
|
|
|
|
// need space
if ( overflow && ! m_overflowList ) {
int32_t need = OVERFLOWLISTSIZE*4;
m_overflowList = (int32_t *)mmalloc(need,"list");
// guard against a failed allocation before writing the list terminator
if ( m_overflowList ) m_overflowList[0] = 0;
else log(LOG_ERROR,"spider: could not allocate overflow list");
}
|
|
//
|
|
// ensure firstip is in the overflow list if we overflowed
|
|
int32_t emptySlot = -1;
|
|
bool found = false;
|
|
int32_t oi;
|
|
// if we dealt with this last round, we're done
|
|
if ( m_lastOverflowFirstIp == firstIp )
|
|
return true;
|
|
m_lastOverflowFirstIp = firstIp;
|
|
if ( overflow ) {
|
|
logDebug( g_conf.m_logDebugSpider, "spider: firstip %s overflowing with %" PRId32" new reqs",
|
|
iptoa(firstIp,ipbuf), (int32_t)m_totalNewSpiderRequests );
|
|
}
|
|
|
|
for ( oi = 0 ; ; oi++ ) {
|
|
// sanity
|
|
if ( ! m_overflowList ) break;
|
|
// an ip of zero is end of the list
|
|
if ( ! m_overflowList[oi] ) break;
|
|
// if already in there, we are done
|
|
if ( m_overflowList[oi] == firstIp ) {
|
|
found = true;
|
|
break;
|
|
}
|
|
// -1 means empty slot
|
|
if ( m_overflowList[oi] == -1 ) emptySlot = oi;
|
|
}
|
|
// if we need to add it...
|
|
if ( overflow && ! found && m_overflowList ) {
|
|
log(LOG_DEBUG,"spider: adding %s to overflow list",iptoa(firstIp,ipbuf));
|
|
// reset this little cache thingy
|
|
s_lastOut = 0;
|
|
// take the empty slot if there is one
|
|
if ( emptySlot >= 0 )
|
|
m_overflowList[emptySlot] = firstIp;
|
|
// or add to new slot. this is #defined to 200 last check
|
|
else if ( oi+1 < OVERFLOWLISTSIZE ) {
|
|
m_overflowList[oi] = firstIp;
|
|
m_overflowList[oi+1] = 0;
|
|
}
|
|
else
|
|
log(LOG_ERROR,"spider: could not add firstip %s to "
|
|
"overflow list, full.", iptoa(firstIp,ipbuf));
|
|
}
|
|
// ensure firstip is NOT in the overflow list if we are ok
|
|
for ( int32_t oi2 = 0 ; ! overflow ; oi2++ ) {
|
|
// sanity
|
|
if ( ! m_overflowList ) break;
|
|
// an ip of zero is end of the list
|
|
if ( ! m_overflowList[oi2] ) break;
|
|
// skip if not a match
|
|
if ( m_overflowList[oi2] != firstIp ) continue;
|
|
// take it out of list
|
|
m_overflowList[oi2] = -1;
|
|
log(LOG_DEBUG, "spider: removing %s from overflow list",iptoa(firstIp,ipbuf));
|
|
// reset this little cache thingy
|
|
s_lastIn = 0;
|
|
break;
|
|
}
|
|
/////
|
|
//
|
|
// END maintain firstip overflow list
|
|
//
|
|
/////
|
|
|
|
// ok we've updated m_winnerTree!!!
|
|
return true;
|
|
}
|
|
|
|
|
|
|
|
// . this is ONLY CALLED from evalIpLoop() above
|
|
// . add another 0 entry into waiting tree, unless we had no winner
|
|
// . add winner in here into doledb
|
|
// . returns false if blocked and doledWrapper() will be called
|
|
// . returns true and sets g_errno on error
|
|
bool SpiderColl::addWinnersIntoDoledb ( ) {
|
|
|
|
if ( g_errno ) {
|
|
log(LOG_ERROR,"spider: got error when trying to add winner to doledb: "
|
|
"%s",mstrerror(g_errno));
|
|
return true;
|
|
}
|
|
|
|
// gotta check this again since we might have done a QUICKPOLL() above
|
|
// to call g_process.shutdown() so now tree might be unwritable
|
|
if (g_process.isShuttingDown()) {
|
|
return true;
|
|
}
|
|
|
|
// ok, all done if nothing to add to doledb. i guess we were misled
|
|
// that firstIp had something ready for us. maybe the url filters
|
|
// table changed to filter/ban them all. if a new request/reply comes
|
|
// in for this firstIp then it will re-add an entry to waitingtree and
|
|
// we will re-scan spiderdb. if we had something to spider but it was
|
|
// in the future the m_minFutureTimeMS will be non-zero, and we deal
|
|
// with that below...
|
|
if (m_winnerTree.isEmpty() && ! m_minFutureTimeMS ) {
|
|
// if we received new incoming requests while we were
|
|
// scanning, which is happening for some crawls, then do
|
|
// not nuke! just repeat later in populateDoledbFromWaitingTree
|
|
if ( m_gotNewDataForScanningIp ) {
|
|
if ( g_conf.m_logDebugSpider )
|
|
log(LOG_DEBUG, "spider: received new requests, not "
|
|
"nuking misleading key");
|
|
return true;
|
|
}
|
|
// note it - this can happen if no more to spider right now!
|
|
char ipbuf[16];
|
|
if ( g_conf.m_logDebugSpider )
|
|
log(LOG_WARN, "spider: nuking misleading waitingtree key "
|
|
"firstIp=%s", iptoa(m_scanningIp,ipbuf));
|
|
|
|
ScopedLock sl(m_waitingTree.getLock());
|
|
|
|
m_waitingTree.deleteNode_unlocked(0, (char *)&m_waitingTreeKey, true);
|
|
m_waitingTreeKeyValid = false;
|
|
|
|
// note it
|
|
uint64_t timestamp64 = m_waitingTreeKey.n1;
|
|
timestamp64 <<= 32;
|
|
timestamp64 |= m_waitingTreeKey.n0 >> 32;
|
|
int32_t firstIp = m_waitingTreeKey.n0 & 0xffffffff;
|
|
if ( g_conf.m_logDebugSpider )
|
|
log(LOG_DEBUG,"spider: removed2 time=%" PRId64" ip=%s from "
|
|
"waiting tree. nn=%" PRId32".",
|
|
timestamp64, iptoa(firstIp,ipbuf),
|
|
m_waitingTree.getNumUsedNodes_unlocked());
|
|
|
|
removeFromWaitingTable(firstIp);
|
|
return true;
|
|
}
|
|
|
|
// i've seen this happen, wtf?
|
|
if (m_winnerTree.isEmpty() && m_minFutureTimeMS ) {
|
|
// this will update the waiting tree key with minFutureTimeMS
|
|
addDoleBufIntoDoledb ( NULL , false );
|
|
return true;
|
|
}
|
|
|
|
///////////
|
|
//
|
|
// make winner tree into doledb list to add
|
|
//
|
|
///////////
|
|
// first 4 bytes is offset of next doledb record to add to doledb
|
|
// so we do not have to re-add the dolebuf to the cache and make it
|
|
// churn. it is really inefficient.
|
|
SafeBuf doleBuf;
|
|
doleBuf.pushLong(4);
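// resulting doleBuf layout, as consumed by addDoleBufIntoDoledb() and
// validateDoleBuf() below:
//   int32_t jump   offset of the next record to dole out (starts at 4)
//   then per winner: key96_t doledbKey, int32_t recSize, SpiderRequest (recSize bytes)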
|
|
|
|
// i am seeing dup uh48's in the m_winnerTree
|
|
int32_t firstIp = m_waitingTreeKey.n0 & 0xffffffff;
|
|
|
|
{
|
|
ScopedLock sl(m_winnerTree.getLock());
|
|
|
|
int32_t ntn = m_winnerTree.getNumNodes_unlocked();
|
|
|
|
HashTableX dedup;
|
|
dedup.set(8, 0, (int32_t)2 * ntn, NULL, 0, false, "windt");
|
|
|
|
int32_t added = 0;
|
|
for (int32_t node = m_winnerTree.getFirstNode_unlocked();
|
|
node >= 0; node = m_winnerTree.getNextNode_unlocked(node)) {
|
|
// get data for that
|
|
const SpiderRequest *sreq2 = reinterpret_cast<const SpiderRequest *>(m_winnerTree.getData_unlocked(node));
|
|
|
|
// sanity
|
|
if (sreq2->m_firstIp != firstIp) gbshutdownAbort(true);
|
|
//if ( sreq2->m_spiderTimeMS < 0 ) gbshutdownAbort(true);
|
|
if (sreq2->m_ufn < 0) gbshutdownAbort(true);
|
|
if (sreq2->m_priority == -1) gbshutdownAbort(true);
|
|
// check for errors
|
|
bool hadError = false;
|
|
// parse it up
|
|
int32_t winIp;
|
|
int32_t winPriority;
|
|
int64_t winSpiderTimeMS;
|
|
int64_t winUh48;
|
|
const key192_t *winKey = reinterpret_cast<const key192_t *>(m_winnerTree.getKey_unlocked(node));
|
|
parseWinnerTreeKey(winKey, &winIp, &winPriority, &winSpiderTimeMS, &winUh48);
|
|
|
|
// sanity
|
|
if (winIp != firstIp) gbshutdownAbort(true);
|
|
if (winUh48 != sreq2->getUrlHash48()) gbshutdownAbort(true);
|
|
|
|
// make the doledb key
|
|
key96_t doleKey = Doledb::makeKey(winPriority, winSpiderTimeMS / 1000, winUh48, false);
|
|
|
|
// dedup. if we add dups the problem is that they
|
|
// overwrite the key in doledb yet the doleiptable count
|
|
// remains undecremented and doledb is empty and never
|
|
// replenished because the firstip can not be added to
|
|
// waitingTree because doleiptable count is > 0. this was
|
|
// causing spiders to hang for collections. i am not sure
|
|
// why we should be getting dups in winnertree because they
|
|
// have the same uh48 and that is the key in the tree.
|
|
if (dedup.isInTable(&winUh48)) {
|
|
log(LOG_WARN, "spider: got dup uh48=%" PRIu64" dammit", winUh48);
|
|
continue;
|
|
}
|
|
// count it
|
|
added++;
|
|
// do not allow dups
|
|
dedup.addKey(&winUh48);
|
|
// store doledb key first
|
|
if (!doleBuf.safeMemcpy(&doleKey, sizeof(key96_t)))
|
|
hadError = true;
|
|
// then size of spiderrequest
|
|
if (!doleBuf.pushLong(sreq2->getRecSize()))
|
|
hadError = true;
|
|
// then the spiderrequest encapsulated
|
|
if (!doleBuf.safeMemcpy(sreq2, sreq2->getRecSize()))
|
|
hadError = true;
|
|
// note and error
|
|
if (hadError) {
|
|
log(LOG_ERROR,"spider: error making doledb list: %s",
|
|
mstrerror(g_errno));
|
|
return true;
|
|
}
|
|
}
|
|
}
|
|
|
|
return addDoleBufIntoDoledb ( &doleBuf , false );
|
|
}
|
|
|
|
|
|
|
|
bool SpiderColl::validateDoleBuf(const SafeBuf *doleBuf) {
|
|
const char *doleBufEnd = doleBuf->getBufPtr();
|
|
// get offset
|
|
const char *pstart = doleBuf->getBufStart();
|
|
const char *p = pstart;
|
|
int32_t jump = *(int32_t *)p;
|
|
p += 4;
|
|
// sanity
|
|
if ( jump < 4 || jump > doleBuf->length() )
|
|
gbshutdownCorrupted();
|
|
bool gotIt = false;
|
|
for ( ; p < doleBuf->getBufPtr() ; ) {
|
|
if ( p == pstart + jump )
|
|
gotIt = true;
|
|
// first is doledbkey
|
|
p += sizeof(key96_t);
|
|
// then size of spider request
|
|
int32_t recSize = *(int32_t *)p;
|
|
p += 4;
|
|
// the spider request encapsulated
|
|
SpiderRequest *sreq3;
|
|
sreq3 = (SpiderRequest *)p;
|
|
// point "p" to next spiderrequest
|
|
if ( recSize != sreq3->getRecSize() ) gbshutdownCorrupted();
|
|
p += recSize;//sreq3->getRecSize();
|
|
// sanity
|
|
if ( p > doleBufEnd ) gbshutdownCorrupted();
|
|
if ( p < pstart ) gbshutdownCorrupted();
|
|
}
|
|
if ( ! gotIt ) gbshutdownCorrupted();
|
|
return true;
|
|
}
|
|
|
|
|
|
|
|
bool SpiderColl::addDoleBufIntoDoledb ( SafeBuf *doleBuf, bool isFromCache ) {
|
|
////////////////////
|
|
//
|
|
// UPDATE WAITING TREE ENTRY
|
|
//
|
|
// Normally the "spidertime" is 0 for a firstIp. This will make it
|
|
// a future time if it is not yet due for spidering.
|
|
//
|
|
////////////////////
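// waiting tree key layout, as decoded here and in the logging below:
//   n1              = high 32 bits of spiderTimeMS
//   n0 >> 32        = low 32 bits of spiderTimeMS
//   n0 & 0xffffffff = firstIp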
|
|
|
|
int32_t firstIp = m_waitingTreeKey.n0 & 0xffffffff;
|
|
|
|
char ipbuf[16]; //for various log purposes
|
|
{
|
|
ScopedLock sl(m_waitingTree.getLock());
|
|
|
|
/// @todo ALC could we avoid locking winnerTree?
|
|
ScopedLock sl2(m_winnerTree.getLock());
|
|
|
|
// sanity check. how did this happen? it messes up our crawl!
|
|
// maybe a doledb add went through? so we should add again?
|
|
int32_t wn = m_waitingTree.getNode_unlocked(0, (char *)&m_waitingTreeKey);
|
|
if (wn < 0) {
|
|
log(LOG_ERROR,"spider: waiting tree key removed while reading list for "
|
|
"%s (%" PRId32")", m_coll, (int32_t)m_collnum);
|
|
return true;
|
|
}
|
|
|
|
// if best request has a future spiderTime, at least update
|
|
// the wait tree with that since we will not be doling this request
|
|
// right now.
|
|
if (m_winnerTree.isEmpty_unlocked() && m_minFutureTimeMS && !isFromCache) {
|
|
// save memory
|
|
m_winnerTree.reset_unlocked();
|
|
m_winnerTable.reset();
|
|
|
|
// if in the process of being added to doledb or in doledb...
|
|
if (isInDoledbIpTable(firstIp)) {
|
|
// sanity i guess. remove this line if it hits this!
|
|
log(LOG_ERROR, "spider: wtf????");
|
|
//gbshutdownLogicError();
|
|
return true;
|
|
}
|
|
|
|
// before you set a time too far into the future, if we
|
|
// did receive new spider requests, entertain those
|
|
if (m_gotNewDataForScanningIp &&
|
|
// we had twitter.com with a future spider date
|
|
// on the pe2 cluster but we kept hitting this, so
|
|
// don't do this anymore if we scanned a ton of bytes
|
|
// like we did for twitter.com because it uses all the
|
|
// resources when we can like 150MB of spider requests
|
|
// for a single firstip
|
|
m_totalBytesScanned < 30000) {
|
|
logDebug(g_conf.m_logDebugSpider, "spider: received new requests, not updating waiting tree with future time");
|
|
return true;
|
|
}
|
|
|
|
// get old time
|
|
uint64_t oldSpiderTimeMS = m_waitingTreeKey.n1;
|
|
oldSpiderTimeMS <<= 32;
|
|
oldSpiderTimeMS |= (m_waitingTreeKey.n0 >> 32);
|
|
// delete old node
|
|
if (wn >= 0) {
|
|
m_waitingTree.deleteNode_unlocked(wn, false);
|
|
}
|
|
|
|
// invalidate
|
|
m_waitingTreeKeyValid = false;
|
|
key96_t wk2 = makeWaitingTreeKey(m_minFutureTimeMS, firstIp);
|
|
|
|
logDebug(g_conf.m_logDebugSpider, "spider: scan replacing waitingtree key oldtime=%" PRIu32" newtime=%" PRIu32" firstip=%s",
|
|
(uint32_t)(oldSpiderTimeMS / 1000LL), (uint32_t)(m_minFutureTimeMS / 1000LL), iptoa(firstIp,ipbuf));
|
|
|
|
// this should never fail since we deleted one above
|
|
m_waitingTree.addKey_unlocked(&wk2);
|
|
|
|
logDebug(g_conf.m_logDebugSpider, "spider: RE-added time=%" PRId64" ip=%s to waiting tree node",
|
|
m_minFutureTimeMS, iptoa(firstIp,ipbuf));
|
|
|
|
// keep the table in sync now with the time
|
|
addToWaitingTable(firstIp, m_minFutureTimeMS);
|
|
return true;
|
|
}
|
|
}
|
|
|
|
char *doleBufEnd = doleBuf->getBufPtr();
|
|
|
|
// add it to doledb ip table now so that waiting tree does not
|
|
// immediately get another spider request from this same ip added
|
|
// to it while the msg4 is out. but if the add fails we totally bail
|
|
// with g_errno set
|
|
|
|
// . MDW: now we have a list of doledb records in a SafeBuf:
|
|
// . scan the requests in safebuf
|
|
|
|
// get offset
|
|
char *p = doleBuf->getBufStart();
|
|
int32_t jump = *(int32_t *)p;
|
|
// sanity
|
|
if ( jump < 4 || jump > doleBuf->length() ) {
|
|
gbshutdownCorrupted(); }
|
|
// the jump includes itself
|
|
p += jump;
|
|
//for ( ; p < m_doleBuf.getBuf() ; ) {
|
|
// save it
|
|
char *doledbRec = p;
|
|
// first is doledbkey
|
|
p += sizeof(key96_t);
|
|
// then size of spider request
|
|
p += 4;
|
|
// the spider request encapsulated
|
|
SpiderRequest *sreq3;
|
|
sreq3 = (SpiderRequest *)p;
|
|
// point "p" to next spiderrequest
|
|
p += sreq3->getRecSize();
|
|
|
|
// sanity
|
|
if ( p > doleBufEnd ) gbshutdownCorrupted();
|
|
|
|
// for caching logic below, set this
|
|
int32_t doledbRecSize = sizeof(key96_t) + 4 + sreq3->getRecSize();
|
|
// process sreq3 my incrementing the firstip count in
|
|
// m_doledbIpTable
|
|
if ( !addToDoledbIpTable(sreq3) ) return true;
|
|
|
|
// now cache the REST of the spider requests to speed up scanning.
|
|
// better than adding 400 recs per firstip to doledb because
|
|
// msg5's call to RdbTree::getList() is way faster.
|
|
// even if m_doleBuf is from the cache, re-add it to lose the
|
|
// top rec.
|
|
// allow this to add a 0 length record otherwise we keep the same
|
|
// old url in here and keep spidering it over and over again!
|
|
|
|
// remove from cache? if we added the last spider request in the
|
|
// cached dolebuf to doledb then remove it from cache so it's not
|
|
// a cached empty dolebuf and we recompute it not using the cache.
|
|
if ( p >= doleBufEnd ) {
|
|
FxBlobCacheLock<int32_t> rcl(g_spiderLoop.m_winnerListCache);
|
|
g_spiderLoop.m_winnerListCache.remove(firstIp);
|
|
} else {
|
|
// insert (or replace) the list in the cache
|
|
char *x = doleBuf->getBufStart();
|
|
// the new offset is the next record after the one we
|
|
// just added to doledb
|
|
int32_t newJump = (int32_t)(p - x);
|
|
int32_t oldJump = *(int32_t *)x;
|
|
// NO! we do a copy in rdbcache and copy the thing over
|
|
// since we promote it. so this won't work...
|
|
*(int32_t *)x = newJump;
|
|
if ( newJump >= doleBuf->length() ) gbshutdownCorrupted();
|
|
if ( newJump < 4 ) gbshutdownCorrupted();
|
|
logDebug(g_conf.m_logDebugSpider, "spider: rdbcache: updating %" PRId32" bytes of SpiderRequests "
|
|
"to winnerlistcache for ip %s oldjump=%" PRId32" newJump=%" PRId32" ptr=%p",
|
|
doleBuf->length(),iptoa(firstIp,ipbuf),oldJump, newJump, x);
|
|
//validateDoleBuf ( doleBuf );
|
|
// inherit timestamp. if 0, RdbCache will set to current time
|
|
// don't re-add just use the same modified buffer so we
|
|
// don't churn the cache.
|
|
// but do add it to cache if not already in there yet.
|
|
FxBlobCacheLock<int32_t> rcl(g_spiderLoop.m_winnerListCache);
|
|
g_spiderLoop.m_winnerListCache.insert(firstIp, doleBuf->getBufStart(), doleBuf->length());
|
|
}
|
|
|
|
// keep it on stack now that doledb is tree-only
|
|
RdbList tmpList;
|
|
tmpList.setFromPtr ( doledbRec , doledbRecSize , RDB_DOLEDB );
|
|
|
|
// now that doledb is tree-only and never dumps to disk, just
|
|
// add it directly
|
|
g_doledb.getRdb()->addList(m_collnum, &tmpList);
|
|
|
|
logDebug(g_conf.m_logDebugSpider, "spider: adding doledb tree node size=%" PRId32, doledbRecSize);
|
|
|
|
int32_t storedFirstIp = (m_waitingTreeKey.n0) & 0xffffffff;
|
|
|
|
// log it
|
|
if ( g_conf.m_logDebugSpcache ) {
|
|
uint64_t spiderTimeMS = m_waitingTreeKey.n1;
|
|
spiderTimeMS <<= 32;
|
|
spiderTimeMS |= (m_waitingTreeKey.n0 >> 32);
|
|
logf(LOG_DEBUG,"spider: removing doled waitingtree key"
|
|
" spidertime=%" PRIu64" firstIp=%s "
|
|
,spiderTimeMS,
|
|
iptoa(storedFirstIp,ipbuf)
|
|
);
|
|
}
|
|
|
|
{
|
|
ScopedLock sl(m_waitingTree.getLock());
|
|
|
|
// before adding to doledb remove from waiting tree so we do not try
|
|
// to readd to doledb...
|
|
m_waitingTree.deleteNode_unlocked(0, (char *)&m_waitingTreeKey, true);
|
|
removeFromWaitingTable(storedFirstIp);
|
|
}
|
|
|
|
// invalidate
|
|
m_waitingTreeKeyValid = false;
|
|
|
|
// note that ip as being in dole table
|
|
if ( g_conf.m_logDebugSpider )
|
|
log(LOG_WARN, "spider: added best sreq for ip=%s to doletable AND "
|
|
"removed from waiting table",
|
|
iptoa(firstIp,ipbuf));
|
|
|
|
// save memory
|
|
m_winnerTree.reset();
|
|
m_winnerTable.reset();
|
|
|
|
//validateDoleBuf( doleBuf );
|
|
|
|
// add did not block
|
|
return true;
|
|
}
|
|
|
|
|
|
uint64_t SpiderColl::getSpiderTimeMS(const SpiderRequest *sreq, int32_t ufn, const SpiderReply *srep, int64_t nowMS) {
|
|
// . get the scheduled spiderTime for it
|
|
// . assume this SpiderRequest has never been successfully spidered
|
|
int64_t spiderTimeMS = ((uint64_t)sreq->m_addedTime) * 1000LL;
|
|
|
|
// if injecting for first time, use that!
|
|
if (!srep && sreq->m_isInjecting) {
|
|
return spiderTimeMS;
|
|
}
|
|
|
|
if (sreq->m_isPageReindex) {
|
|
int64_t nextReindexTimeMS = m_lastReindexTimeMS + m_cr->m_spiderReindexDelayMS;
|
|
if (nextReindexTimeMS > nowMS) {
|
|
return nextReindexTimeMS;
|
|
}
|
|
|
|
m_lastReindexTimeMS = nowMS;
|
|
|
|
return nextReindexTimeMS;
|
|
}
|
|
|
|
// to avoid hammering an ip, get last time we spidered it...
|
|
RdbCacheLock rcl(m_lastDownloadCache);
|
|
int64_t lastMS = m_lastDownloadCache.getLongLong(m_collnum, sreq->m_firstIp, -1, true);
|
|
rcl.unlock();
|
|
|
|
// -1 means not found
|
|
if ((int64_t)lastMS == -1) {
|
|
lastMS = 0;
|
|
}
|
|
|
|
// sanity
|
|
if ((int64_t)lastMS < -1) {
|
|
log(LOG_ERROR,"spider: corrupt last time in download cache. nuking.");
|
|
lastMS = 0;
|
|
}
|
|
|
|
// min time we can spider it
|
|
int64_t minSpiderTimeMS1 = lastMS + m_cr->m_spiderIpWaits[ufn];
|
|
|
|
/////////////////////////////////////////////////
|
|
/////////////////////////////////////////////////
|
|
// crawldelay table check!!!!
|
|
/////////////////////////////////////////////////
|
|
/////////////////////////////////////////////////
|
|
|
|
int64_t minSpiderTimeMS2 = 0;
|
|
|
|
{
|
|
ScopedLock sl(m_cdTableMtx);
|
|
int32_t *cdp = (int32_t *)m_cdTable.getValue(&sreq->m_domHash32);
|
|
if (cdp && *cdp >= 0) minSpiderTimeMS2 = lastMS + *cdp;
|
|
}
|
|
|
|
// ensure min
|
|
if ( spiderTimeMS < minSpiderTimeMS1 ) spiderTimeMS = minSpiderTimeMS1;
|
|
if ( spiderTimeMS < minSpiderTimeMS2 ) spiderTimeMS = minSpiderTimeMS2;
|
|
|
|
// if no reply, use that
|
|
if (!srep) {
|
|
return spiderTimeMS;
|
|
}
|
|
|
|
// if this is not the first try, then re-compute the spiderTime
|
|
// based on that last time
|
|
// sanity check
|
|
if ( srep->m_spideredTime <= 0 ) {
|
|
// a lot of times these are corrupt! wtf???
|
|
//spiderTimeMS = minSpiderTimeMS;
|
|
return spiderTimeMS;
|
|
//gbshutdownAbort(true);
|
|
}
|
|
// compute new spiderTime for this guy, in seconds
|
|
int64_t waitInSecs = (uint64_t)(m_cr->m_spiderFreqs[ufn]*3600*24.0);
|
|
|
|
// when it was spidered
|
|
int64_t lastSpideredMS = ((uint64_t)srep->m_spideredTime) * 1000;
|
|
// . when we last attempted to spider it... (base time)
|
|
// . use a lastAttempt of 0 to indicate never!
|
|
// (first time)
|
|
int64_t minSpiderTimeMS3 = lastSpideredMS + (waitInSecs * 1000LL);
|
|
// ensure min
|
|
if ( spiderTimeMS < minSpiderTimeMS3 ) spiderTimeMS = minSpiderTimeMS3;
|
|
// sanity
|
|
if ( (int64_t)spiderTimeMS < 0 ) { gbshutdownAbort(true); }
|
|
|
|
return spiderTimeMS;
|
|
}

// . decrement the current doledb priority
// . will also set m_sc->m_nextDoledbKey
// . will also set m_sc->m_msg5StartKey
void SpiderColl::devancePriority() {
	// try the next (lower) priority
	m_pri2 = m_pri2 - 1;
	// how can this happen?
	if ( m_pri2 < -1 ) m_pri2 = -1;
	// bogus?
	if ( m_pri2 < 0 ) return;
	// set to next priority otherwise
	m_nextDoledbKey = m_nextKeys [m_pri2];
	// and the read key
	m_msg5StartKey = m_nextDoledbKey;
}
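
// set the current doledb scan priority directly and reset the read keys
// (m_nextDoledbKey and m_msg5StartKey) for that priority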
void SpiderColl::setPriority(int32_t pri) {
	m_pri2 = pri;
	m_nextDoledbKey = m_nextKeys [ m_pri2 ];
	m_msg5StartKey = m_nextDoledbKey;
}
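
// . returns false if "sc" is not marked for deletion (m_deleteMyself)
// . otherwise returns true; if "sc" is still loading we only log and defer
//   the actual delete (presumably retried on a later call), else delete now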
bool SpiderColl::tryToDeleteSpiderColl ( SpiderColl *sc , const char *msg ) {
	// if not being deleted return false
	if ( ! sc->m_deleteMyself ) return false;
	// otherwise always return true
	if ( sc->m_isLoading ) {
		log(LOG_INFO, "spider: deleting sc=%p for collnum=%" PRId32" "
		    "waiting3",
		    sc,(int32_t)sc->m_collnum);
		return true;
	}
	// if ( sc->m_gettingWaitingTreeList ) {
	// 	log(LOG_INFO, "spider: deleting sc=%p for collnum=%" PRId32"
	//	    "waiting6",
	// 	    (int32_t)sc,(int32_t)sc->m_collnum);
	// 	return true;
	// }
	// there's still a core of someone trying to write to something
	// in "sc" so we have to try to fix that. somewhere in XmlDoc.cpp
	// or Spider.cpp. everyone should get sc from cr every time, I'd think
	log(LOG_INFO, "spider: deleting sc=%p for collnum=%" PRId32" (msg=%s)",
	    sc,(int32_t)sc->m_collnum,msg);
	// . make sure nobody has it
	// . cr might be NULL because Collectiondb.cpp::deleteRec2() might
	//   have nuked it
	//CollectionRec *cr = sc->m_cr;
	// use fake ptrs for easier debugging
	//if ( cr ) cr->m_spiderColl = (SpiderColl *)0x987654;//NULL;
	mdelete ( sc , sizeof(SpiderColl),"postdel1");
	delete ( sc );
	return true;
}

// . returns false with g_errno set on error
// . Rdb.cpp should call this when it receives a doledb key
// . when trying to add a SpiderRequest to the waiting tree we first check
//   the doledb table to see if doledb already has an sreq from this firstIp
// . therefore, we should add the ip to the dole table before we launch the
//   Msg4 request to add it to doledb, that way we don't add a bunch from the
//   same firstIp to doledb
bool SpiderColl::addToDoledbIpTable(const SpiderRequest *sreq) {
	ScopedLock sl(m_doledbIpTableMtx);

	// update how many per ip we got doled
	int32_t *score = (int32_t *)m_doledbIpTable.getValue32 ( sreq->m_firstIp );
	// debug point (spammy; only logged when spider debug logging is on)
	if ( g_conf.m_logDebugSpider ) {
		int64_t uh48 = sreq->getUrlHash48();
		int64_t pdocid = sreq->getParentDocId();
		int32_t ss = 1;
		if ( score ) ss = *score + 1;
		// if for some reason this collides with another key
		// already in doledb then our counts are off
		char ipbuf[16];
		log(LOG_DEBUG, "spider: added to doletbl uh48=%" PRIu64" parentdocid=%" PRIu64" "
		    "ipdolecount=%" PRId32" ufn=%" PRId32" priority=%" PRId32" firstip=%s",
		    uh48,pdocid,ss,(int32_t)sreq->m_ufn,(int32_t)sreq->m_priority,
		    iptoa(sreq->m_firstIp,ipbuf));
	}

	// we had a score there already, so inc it
	if ( score ) {
		// inc it
		*score = *score + 1;
		// sanity check
		if ( *score <= 0 ) { gbshutdownAbort(true); }
		// only one per ip!
		// not any more! we allow MAX_WINNER_NODES per ip!
		char ipbuf[16];
		if ( *score > MAX_WINNER_NODES )
			log(LOG_ERROR,"spider: crap. had %" PRId32" recs in doledb for %s "
			    "from %s. "
			    "how did this happen?",
			    (int32_t)*score,m_coll,iptoa(sreq->m_firstIp,ipbuf));
		// now we log it too
		if ( g_conf.m_logDebugSpider )
			log(LOG_DEBUG,"spider: added ip=%s to doleiptable "
			    "(score=%" PRId32")",
			    iptoa(sreq->m_firstIp,ipbuf),*score);
	}
	else {
		// ok, add new slot
		int32_t val = 1;
		char ipbuf[16];
		if ( ! m_doledbIpTable.addKey ( &sreq->m_firstIp , &val ) ) {
			// log it, this is bad
			log(LOG_ERROR,"spider: failed to add ip %s to dole ip tbl",
			    iptoa(sreq->m_firstIp,ipbuf));
			// return false with g_errno set on error
			return false;
		}
		// now we log it too
		if ( g_conf.m_logDebugSpider )
			log(LOG_DEBUG,"spider: added ip=%s to doleiptable "
			    "(score=1)",iptoa(sreq->m_firstIp,ipbuf));
	}

	// . these priority slots in doledb are not empty
	// . unmark individual priority buckets
	// . do not skip them when scanning for urls to spider
	int32_t pri = sreq->m_priority;

	// reset scan for this priority in doledb
	m_nextKeys[pri] = Doledb::makeFirstKey2 ( pri );

	return true;
}
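
// . decrement the per-ip count in the dole ip table when a doled record for
//   firstIp comes back (e.g. its spider reply is received)
// . when the count reaches zero the ip is removed from the table entirely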
void SpiderColl::removeFromDoledbIpTable(int32_t firstIp) {
	ScopedLock sl(m_doledbIpTableMtx);

	// . decrement doledb table ip count for firstIp
	// . update how many per ip we got doled
	int32_t *score = (int32_t *)m_doledbIpTable.getValue32 ( firstIp );

	// wtf! how did this spider without being doled?
	if ( ! score ) {
		//if ( ! srep->m_fromInjectionRequest )
		char ipbuf[16];
		log(LOG_ERROR, "spider: corruption. received spider reply whose "
		    "ip has no entry in dole ip table. firstip=%s",
		    iptoa(firstIp,ipbuf));
		return;
	}

	// reduce it
	*score = *score - 1;

	// wtf!
	if ( *score < 0 ) { gbshutdownAbort(true); }

	// now we log it too
	if ( g_conf.m_logDebugSpider ) {
		char ipbuf[16];
		log(LOG_DEBUG,"spider: removed ip=%s from doleiptable (newcount=%" PRId32")",
		    iptoa(firstIp,ipbuf), *score);
	}

	// all done?
	if ( g_conf.m_logDebugSpider ) {
		// log that too!
		char ipbuf[16];
		logf(LOG_DEBUG,"spider: discounting firstip=%s to %" PRId32,
		     iptoa(firstIp,ipbuf),*score);
	}

	// . remove if zero. do this last, since "score" points into the table
	//   and must not be dereferenced after the key is removed.
	// . this can fail if writes are disabled on this hashtablex
	//   because it is saving
	if ( *score == 0 ) {
		m_doledbIpTable.removeKey ( &firstIp );
	}
}

int32_t SpiderColl::getDoledbIpTableCount() const {
	ScopedLock sl(m_doledbIpTableMtx);
	return m_doledbIpTable.getNumUsedSlots();
}

bool SpiderColl::isInDoledbIpTable(int32_t firstIp) const {
	ScopedLock sl(m_doledbIpTableMtx);
	return m_doledbIpTable.isInTable(&firstIp);
}

bool SpiderColl::isDoledbIpTableEmpty() const {
	ScopedLock sl(m_doledbIpTableMtx);
	return m_doledbIpTable.isEmpty();
}

void SpiderColl::clearDoledbIpTable() {
	ScopedLock sl(m_doledbIpTableMtx);
	m_doledbIpTable.clear();
}
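
// returns a snapshot of the firstIps currently present in the dole ip table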
std::vector<uint32_t> SpiderColl::getDoledbIpTable() const {
	ScopedLock sl(m_doledbIpTableMtx);
	std::vector<uint32_t> r;
	r.reserve(m_doledbIpTable.getNumUsedSlots());
	for(int slotNumber=0; slotNumber<m_doledbIpTable.getNumSlots(); slotNumber++) {
		if(!m_doledbIpTable.isEmpty(slotNumber)) {
			r.push_back(*(const uint32_t*)m_doledbIpTable.getKeyFromSlot(slotNumber));
		}
	}
	return r;
}
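
// . the waiting table maps a firstIp to the spider time (in ms) it is
//   scheduled for; the value is stored as the slot's 64-bit score
// . it appears to be kept in sync with the waiting tree (see the paired
//   removal above when a winner is moved into doledb)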
bool SpiderColl::addToWaitingTable(int32_t firstIp, int64_t timeMs) {
	ScopedLock sl(m_waitingTableMtx);
	return m_waitingTable.addKey(&firstIp, &timeMs);
}

bool SpiderColl::getFromWaitingTable(int32_t firstIp, int64_t *timeMs) {
	ScopedLock sl(m_waitingTableMtx);

	int32_t ws = m_waitingTable.getSlot(&firstIp);
	if (ws < 0) {
		return false;
	}

	*timeMs = m_waitingTable.getScore64FromSlot(ws);
	return true;
}

void SpiderColl::removeFromWaitingTable(int32_t firstIp) {
	ScopedLock sl(m_waitingTableMtx);
	m_waitingTable.removeKey(&firstIp);
}

int32_t SpiderColl::getWaitingTableCount() const {
	ScopedLock sl(m_waitingTableMtx);
	return m_waitingTable.getNumUsedSlots();
}

bool SpiderColl::isInWaitingTable(int32_t firstIp) const {
	ScopedLock sl(m_waitingTableMtx);
	return m_waitingTable.isInTable(&firstIp);
}

bool SpiderColl::setWaitingTableSize(int32_t numSlots) {
	ScopedLock sl(m_waitingTableMtx);
	return m_waitingTable.setTableSize(numSlots, NULL, 0);
}

void SpiderColl::clearWaitingTable() {
	ScopedLock sl(m_waitingTableMtx);
	m_waitingTable.clear();
}