// Moved these functions out of Titledb: they didn't have anything to do with
// Titledb directly, and moving them out makes the static / dynamic
// domain-list easier to implement.
// TODO: if the first 20 or so do NOT have the same hostname, then stop
|
|
// and set all clusterRecs to CR_OK
|
|
|
|
#include "Msg51.h"
|
|
|
|
#include "Clusterdb.h"
|
|
#include "Stats.h"
|
|
#include "HashTableT.h"
|
|
#include "HashTableX.h"
|
|
#include "RdbCache.h"
|
|
#include "ScopedLock.h"
|
|
#include "Sanity.h"
|
|
#include "Titledb.h"
|
|
#include "Collectiondb.h"
|
|
#include "UdpServer.h"
|
|
#include "Conf.h"
|
|
#include "Errno.h"
|
|
#include "Docid.h"
|
|
|
|
|
|
// how many Msg0 requests can we launch at the same time?
|
|
#define MSG51_MAX_REQUESTS 60
|
|
|
|
// magic value for the set_signature()/verify_signature() object-validity
// checks, so stray callbacks on a dead/corrupted Msg51 can be detected
static const int signature_init = 0xe1d3c5b7;

// . these must be 1-1 with the enums above
// . used for titling the counts of g_stats.m_filterStats[]
const char * const g_crStrings[] = {
	"cluster rec not found" , // 0
	"uninitialized" ,
	"got clusterdb record" ,
	"has adult bit" ,
	"has wrong language" ,
	"clustered" ,
	"malformed url" ,
	"banned url" ,
	"empty title and summary",
	"summary error" ,
	"duplicate" ,
	"clusterdb error (subcount of visible)" ,
	"duplicate url",
	"empty redirection page" ,
	"visible" ,
	"blacklisted" ,
	"ruleset filtered" ,
	"malicious",
	"end -- do not use"
};

// process-wide cache of recently fetched cluster recs, shared by all Msg51
// instances; probed in sendRequests_unlocked() and filled in gotClusterRec()
RdbCache s_clusterdbQuickCache;
// set once s_clusterdbQuickCache.init() has succeeded (lazy, first-use init)
static bool s_cacheInit = false;
|
|
|
|
|
|
// construct an idle Msg51; all real work starts in getClusterRecs()
Msg51::Msg51() : m_slot(NULL), m_numSlots(0)
{
	// caller-owned output arrays are unknown until getClusterRecs()
	m_clusterLevels = NULL;
	m_clusterRecs   = NULL;
	// guards request/reply bookkeeping against udp reply callbacks
	pthread_mutex_init ( &m_mtx , NULL );
	// mark this object as live for verify_signature()
	set_signature();
	// put every remaining member into its idle state
	reset();
}
|
|
|
|
// free per-request state, then invalidate the signature so any late
// callback on this destroyed object trips verify_signature()
Msg51::~Msg51 ( ) {
	reset();
	clear_signature();
}
|
|
|
|
void Msg51::reset ( ) {
|
|
m_clusterRecs = NULL;
|
|
m_clusterLevels = NULL;
|
|
m_numSlots = 0;
|
|
if( m_slot ) {
|
|
delete[] m_slot;
|
|
m_slot = NULL;
|
|
}
|
|
|
|
// Coverity
|
|
m_docIds = NULL;
|
|
m_numDocIds = 0;
|
|
m_callback = NULL;
|
|
m_state = NULL;
|
|
m_nexti = 0;
|
|
m_numRequests = 0;
|
|
m_numReplies = 0;
|
|
m_errno = 0;
|
|
m_niceness = 0;
|
|
m_collnum = 0;
|
|
m_isDebug = false;
|
|
}
|
|
|
|
|
|
// . returns false if blocked, true otherwise
|
|
// . sets g_errno on error
|
|
bool Msg51::getClusterRecs ( const int64_t *docIds,
|
|
char *clusterLevels ,
|
|
key96_t *clusterRecs ,
|
|
int32_t numDocIds ,
|
|
collnum_t collnum ,
|
|
void *state ,
|
|
void (* callback)( void *state ) ,
|
|
int32_t niceness ,
|
|
// output
|
|
bool isDebug )
|
|
{
|
|
verify_signature();
|
|
// warning
|
|
if ( collnum < 0 ) log(LOG_LOGIC,"net: NULL collection. msg51.");
|
|
|
|
// reset this msg
|
|
reset();
|
|
|
|
// get the collection rec
|
|
CollectionRec *cr = g_collectiondb.getRec ( collnum );
|
|
if ( ! cr ) {
|
|
log("db: msg51. Collection rec null for collnum %" PRId32".",
|
|
(int32_t)collnum);
|
|
g_errno = EBADENGINEER;
|
|
gbshutdownLogicError();
|
|
}
|
|
// keep a pointer for the caller
|
|
m_state = state;
|
|
m_callback = callback;
|
|
m_collnum = collnum;
|
|
// these are storage for the requester
|
|
m_docIds = docIds;
|
|
m_clusterLevels = clusterLevels;
|
|
m_clusterRecs = clusterRecs;
|
|
m_numDocIds = numDocIds;
|
|
m_isDebug = isDebug;
|
|
|
|
// bail if none to do
|
|
if ( m_numDocIds <= 0 ) return true;
|
|
|
|
m_nexti = 0;
|
|
// for i/o mostly
|
|
m_niceness = niceness;
|
|
m_errno = 0;
|
|
|
|
// reset these
|
|
m_numRequests = 0;
|
|
m_numReplies = 0;
|
|
// clear/initialize these
|
|
if(m_numDocIds<MSG51_MAX_REQUESTS)
|
|
m_numSlots = m_numDocIds;
|
|
else
|
|
m_numSlots = MSG51_MAX_REQUESTS;
|
|
m_slot = new Slot[m_numSlots];
|
|
for ( int32_t i = 0 ; i < m_numSlots ; i++ ) {
|
|
m_slot[i].m_msg51 = this;
|
|
m_slot[i].m_inUse = false;
|
|
}
|
|
// . do gathering
|
|
// . returns false if blocked, true otherwise
|
|
// . send up to MSG51_MAX_REQUESTS requests at the same time
|
|
return sendRequests ( -1 );
|
|
}
|
|
|
|
|
|
// . locking wrapper around sendRequests_unlocked()
// . the udp reply callback (gotClusterRecWrapper51) takes the same mutex,
//   so the request/reply counters stay consistent while we launch
// . returns false if blocked, true otherwise
bool Msg51::sendRequests(int32_t k) {
	verify_signature();
	ScopedLock sl(m_mtx);
	return sendRequests_unlocked(k);
}
|
|
|
|
// . returns false if blocked, true otherwise
|
|
// . sets g_errno on error (and m_errno)
|
|
// . k is a hint of which msg0 to use
|
|
// . if k is -1 we do a complete scan to find available m_msg0[x]
|
|
// . returns false if blocked, true otherwise
// . sets g_errno on error (and m_errno)
// . k is a hint of which slot to use
// . if k is -1 we do a complete scan to find an available slot
// . caller must hold m_mtx
bool Msg51::sendRequests_unlocked(int32_t k) {
	verify_signature();

	// set once any request goes async; then we must report "blocked"
	// even after all docids are dispatched, since a reply is pending
	bool anyAsyncRequests = false;
 sendLoop:

	// bail if no slots available
	if ( m_numRequests - m_numReplies >= m_numSlots ) return false;

	// any requests left to send?
	if ( m_nexti >= m_numDocIds ) {
		if ( anyAsyncRequests ) //we started an async request
			return false;
		if ( m_numRequests > m_numReplies ) //still waiting for replies
			return false;
		// we are done!
		return true;
	}

	// sanity check: every level must be a valid CR_* value
	if ( m_clusterLevels[m_nexti] < 0 ||
	     m_clusterLevels[m_nexti] >= CR_END ) {
		gbshutdownLogicError(); }

	// skip if we already got the rec for this docid
	if ( m_clusterLevels[m_nexti] != CR_UNINIT ) {
		m_nexti++;
		goto sendLoop;
	}

	// . check our quick local cache to see if we got it
	// . use a max age of 1 hour
	// . this cache is primarily meant to avoid repetitive lookups
	//   when going to the next tier in Msg3a and re-requesting cluster
	//   recs for the same docids we did a second ago
	if(s_cacheInit) {
		int32_t crecSize;
		char *crecPtr = NULL;
		// the docid doubles as the cache key
		key96_t ckey = (key96_t)m_docIds[m_nexti];

		RdbCacheLock rcl(s_clusterdbQuickCache);
		bool found = s_clusterdbQuickCache.getRecord(m_collnum,
							     ckey , // cache key
							     &crecPtr , // pointer to it
							     &crecSize,
							     false , // do copy?
							     3600 , // max age in secs
							     true , // inc counts?
							     NULL );// cachedTime
		if ( found ) {
			// a cached cluster rec is always a full key96_t
			if ( crecSize != sizeof(key96_t) ) gbshutdownLogicError();
			m_clusterRecs[m_nexti] = *(key96_t *)crecPtr;
			// it is no longer CR_UNINIT, we got the rec now
			m_clusterLevels[m_nexti] = CR_GOT_REC;
			//logf(LOG_DEBUG,"query: msg51 getRec k.n0=%" PRIu64" rec.n0=%" PRIu64, ckey.n0,m_clusterRecs[m_nexti].n0);
			m_nexti++;
			goto sendLoop;
		}
	}

	// . do not hog all the udpserver's slots!
	// . must have at least one outstanding reply so we can process
	//   his reply and come back here...
	if ( g_udpServer.getNumUsedSlots() > 1000 &&
	     m_numRequests > m_numReplies ) return false;

	// find empty slot
	int32_t slot ;

	// ignore bogus hints
	if ( k >= m_numSlots ) k = -1;

	// if hint was provided use that
	if ( k >= 0 && ! m_slot[k].m_inUse )
		slot = k;
	// otherwise, do a scan for the empty slot
	else {
		for ( slot = 0 ; slot < m_numSlots ; slot++ )
			// break out if available
			if(!m_slot[slot].m_inUse)
				break;
	}

	// sanity check -- must have one, the counters above guarantee it
	if ( slot >= m_numSlots ) gbshutdownLogicError();

	// send it; returns false if it blocked (went async)
	if( !sendRequest(slot) )
		anyAsyncRequests = true;

	// update any hint to make our loop more efficient
	if ( k >= 0 ) k++;

	goto sendLoop;
}
|
|
|
|
// . send using m_msg0s[i] class
|
|
// . send a clusterdb lookup for the next docid using slot #i's Msg0
// . returns false if the request blocked (reply will arrive via
//   gotClusterRecWrapper51), true if it completed inline and was processed
bool Msg51::sendRequest ( int32_t i ) {
	// the docid to look up
	int64_t d;

	// remember which index in the caller's arrays this slot serves
	int32_t ci = m_nexti;
	// what's the docid?
	d = m_docIds[m_nexti];
	// advance so we do not do this docid again
	m_nexti++;

	m_slot[i].m_ci = ci;
	m_slot[i].m_inUse = true;
	// count it
	m_numRequests++;
	// lookup in clusterdb, need a start and endkey covering this docid
	key96_t startKey = Clusterdb::makeFirstClusterRecKey ( d );
	key96_t endKey   = Clusterdb::makeLastClusterRecKey ( d );

	// . send the request for the cluster rec, use Msg0
	// . returns false and sets g_errno on error
	// . otherwise, it blocks and returns true
	bool s = m_slot[i].m_msg0.getList( -1 , // hostid
					   RDB_CLUSTERDB ,
					   m_collnum ,
					   &m_slot[i].m_list,
					   (char *)&startKey ,
					   (char *)&endKey ,
					   36 , // minRecSizes
					   &m_slot[i], // state
					   gotClusterRecWrapper51 ,
					   m_niceness ,
					   true , // doErrorCorrection
					   true , // includeTree
					   -1 , // firstHostId
					   0 , // startFileNum
					   -1 , // numFiles
					   30000 , // timeout
					   false , // isRealMerge?
					   false , // noSplit?
					   -1 );// forceParitySplit

	// loop for more if blocked, slot #i is used, block it
	//if ( ! s ) { i++; continue; }
	if ( ! s ) {
		// only wanted this for faster disk page cache hitting so make
		// sure it is not "double used" by another msg0
		//m_msg0[i].m_msg5 = NULL;
		return false;
	}
	// otherwise, the reply is already in: process it synchronously
	gotClusterRec ( &m_slot[i] );
	return true;
}
|
|
|
|
// . static callback handed to Msg0::getList(); state is the Slot the
//   request was sent on
// . processes the reply, then tries to launch more requests on this slot
void Msg51::gotClusterRecWrapper51(void *state) {
	Slot *slot = static_cast<Slot*>(state);
	Msg51 *THIS = slot->m_msg51;
	verify_signature_at(THIS->signature);
	{
		ScopedLock sl(THIS->m_mtx);
		// process it
		THIS->gotClusterRec(slot);
		// get slot number for re-send on this slot
		int32_t k = (int32_t)(slot-THIS->m_slot);
		// . if not all done, launch the next one
		// . this returns false if blocks, true otherwise
		if ( ! THIS->sendRequests_unlocked(k) ) return;
	}
	// all replies are in; invoke the caller's callback OUTSIDE the scoped
	// lock above -- NOTE(review): presumably because the callback may
	// reuse or destroy this Msg51; confirm before reordering
	THIS->m_callback ( THIS->m_state );
	return;
}
|
|
|
|
// . sets m_errno to g_errno if not already set
|
|
// . process one clusterdb reply for the docid tracked by slot->m_ci
// . stores the rec into m_clusterRecs[ci], sets m_clusterLevels[ci] to
//   CR_OK on success or CR_ERROR_CLUSTERDB on any failure
// . sets m_errno to g_errno if not already set
// . caller must hold m_mtx (called from the wrapper or inline send path)
void Msg51::gotClusterRec(Slot *slot) {
	verify_signature();

	// count it
	m_numReplies++;

	// free up the slot so sendRequests_unlocked() can reuse it
	slot->m_inUse = false;

	RdbList *list = &slot->m_list;

	// index into the caller's docid/level/rec arrays
	int32_t ci = slot->m_ci;
	int64_t docId = m_docIds[ci];

	// update m_errno if we had an error
	if ( ! m_errno ) m_errno = g_errno;

	if ( g_errno )
		// print error
		// (fixed: docId is int64_t, so use PRId64 -- "%ld" is wrong
		//  on LP32 and Windows LLP64)
		log(LOG_DEBUG,
		    "query: Had error getting cluster info for docId %" PRId64": %s",
		    docId,
		    mstrerror(g_errno));

	// assume error!
	m_clusterLevels[ci] = CR_ERROR_CLUSTERDB;

	// bail on error or a short list (a cluster rec key is 12 bytes)
	if ( g_errno || list->getListSize() < 12 ) {
		//log(LOG_DEBUG,
		//    "build: clusterdb rec for d=%" PRId64" dptr=%" PRIu32" "
		//    "not found. where is it?", docId, (int32_t)ci);
		g_errno = 0;
		return;
	}

	// . steal rec from this list
	// . point to the caller's storage for this cluster rec
	key96_t *rec = &m_clusterRecs[ci];

	// store the cluster rec itself
	*rec = *(key96_t *)(list->getList());
	// debug note
	log(LOG_DEBUG,
	    "query: had clusterdb SUCCESS for docid=%12" PRId64" ci=%3d "
	    "rec.n1=%08x,%016" PRIx64" sitehash26=0x%x",
	    (int64_t)docId, ci,
	    rec->n1,rec->n0,
	    Clusterdb::getSiteHash26(rec));

	// check for docid mismatch (fixed format specifiers: PRId64)
	int64_t docId2 = Clusterdb::getDocId ( rec );
	if ( docId != docId2 ) {
		logf(LOG_DEBUG,"query: docid mismatch in clusterdb (%" PRId64"!=%" PRId64")", docId, docId2);
		return;
	}

	// it is legit, set to CR_OK
	m_clusterLevels[ci] = CR_OK;

	RdbCacheLock rcl(s_clusterdbQuickCache);
	// . init the quick cache lazily on first successful lookup
	if(!s_cacheInit &&
	   s_clusterdbQuickCache.init(g_conf.m_clusterdbQuickCacheMem,
				      sizeof(key96_t), // fixedDataSize (clusterdb rec)
				      g_conf.m_clusterdbQuickCacheMem/sizeof(key96_t)/2,
				      "clusterdbQuickCache" ,
				      false, // load from disk?
				      sizeof(key96_t), // cache key size
				      -1)) // numPtrsMax
		// only init once if successful
		s_cacheInit = true;

	// . add the record to our quick cache
	// . ignore any error
	if(s_cacheInit)
		s_clusterdbQuickCache.addRecord(m_collnum,
						(key96_t)docId, // docid is key
						(char *)rec,
						sizeof(key96_t), // recSize
						0);

	// clear it in case the cache set it, we don't care
	g_errno = 0;
}
|
|
|
|
// . cluster the docids based on the clusterRecs
|
|
// . returns false and sets g_errno on error
|
|
// . if maxDocIdsPerHostname is -1 do not do hostname clsutering
|
|
bool setClusterLevels ( const key96_t *clusterRecs,
|
|
const int64_t *docIds,
|
|
int32_t numRecs ,
|
|
int32_t maxDocIdsPerHostname ,
|
|
bool doHostnameClustering ,
|
|
bool familyFilter ,
|
|
bool isDebug ,
|
|
// output to clusterLevels[]
|
|
char *clusterLevels ) {
|
|
|
|
if ( numRecs <= 0 ) return true;
|
|
|
|
// skip if not clustering on anything
|
|
//if ( ! doHostnameClustering && ! familyFilter ) {
|
|
// memset ( clusterLevels, CR_OK, numRecs );
|
|
// return true;
|
|
//}
|
|
|
|
HashTableX ctab;
|
|
// init to 2*numRecs for speed. use 0 for niceness!
|
|
if ( ! ctab.set ( 8 , 4 , numRecs * 2,NULL,0,false,"clustertab" ) )
|
|
return false;
|
|
|
|
// time it
|
|
u_int64_t startTime = gettimeofdayInMilliseconds();
|
|
|
|
for(int32_t i=0; i<numRecs; i++) {
|
|
// . set this cluster level
|
|
// . right now will be CR_ERROR_CLUSTERDB or CR_OK...
|
|
char *level = &clusterLevels[i];
|
|
|
|
// sanity check
|
|
if ( *level == CR_UNINIT ) gbshutdownLogicError();
|
|
// and the adult bit, for cleaning the results
|
|
if ( familyFilter && Clusterdb::hasAdultContent ( &clusterRecs[i] ) ) {
|
|
*level = CR_DIRTY;
|
|
continue;
|
|
}
|
|
// if error looking up in clusterdb, use a 8 bit domainhash from docid
|
|
bool fakeIt = (*level==CR_ERROR_CLUSTERDB);
|
|
// assume ok, show it, it is visible
|
|
*level = CR_OK;
|
|
// site hash comes next
|
|
if(!doHostnameClustering)
|
|
continue;
|
|
|
|
// . get the site hash
|
|
// . these are only 32 bits!
|
|
int64_t h;
|
|
if(fakeIt)
|
|
h = Docid::getDomHash8FromDocId(docIds[i]);
|
|
else
|
|
h = Clusterdb::getSiteHash26 ( &clusterRecs[i] );
|
|
|
|
// inc this count!
|
|
if ( fakeIt ) {
|
|
g_stats.m_filterStats[CR_ERROR_CLUSTERDB]++;
|
|
}
|
|
|
|
// if it matches a siteid on our black list
|
|
//if ( checkNegative && sht.getSlot((int64_t)h) > 0 ) {
|
|
// *level = CR_BLACKLISTED_SITE; goto loop; }
|
|
// look it up
|
|
uint32_t score = ctab.getScore(h) ;
|
|
// if still visible, just continue
|
|
if ( score < (uint32_t)maxDocIdsPerHostname ) {
|
|
if ( ! ctab.addTerm(h))
|
|
return false;
|
|
continue;
|
|
}
|
|
// otherwise, no lonegr visible
|
|
*level = CR_CLUSTERED;
|
|
}
|
|
|
|
|
|
// debug
|
|
for ( int32_t i = 0 ; i < numRecs && isDebug ; i++ ) {
|
|
uint32_t siteHash26 = Clusterdb::getSiteHash26(&clusterRecs[i]);
|
|
logf(LOG_DEBUG,"query: msg51: hit #%2d) sitehash26=0x%x "
|
|
"rec.n0=%" PRIx64" docid=%" PRId64" cl=%" PRId32" (%s)",
|
|
(int32_t)i,
|
|
(int32_t)siteHash26,
|
|
clusterRecs[i].n0,
|
|
(int64_t)docIds[i],
|
|
(int32_t)clusterLevels[i],
|
|
g_crStrings[(int32_t)clusterLevels[i]] );
|
|
}
|
|
|
|
|
|
//log(LOG_DEBUG,"build: numVisible=%" PRId32" numClustered=%" PRId32" numErrors=%" PRId32,
|
|
// *numVisible,*numClustered,*numErrors);
|
|
// show time
|
|
uint64_t took = gettimeofdayInMilliseconds() - startTime;
|
|
if ( took > 3 )
|
|
log(LOG_INFO,"query: Took %" PRId64" ms to do clustering.",took);
|
|
|
|
// we are all done
|
|
return true;
|
|
}
|