// mirror of https://github.com/privacore/open-source-search-engine.git
// synced 2025-02-02 03:38:43 -05:00
// (1930 lines, 59 KiB, C++)
#include "Msg40.h"
|
|
#include "Stats.h" // for timing and graphing time to get all summaries
|
|
#include "Collectiondb.h"
|
|
#include "sort.h"
|
|
#include "matches2.h"
|
|
#include "XmlDoc.h" // computeSimilarity()
|
|
#include "Speller.h"
|
|
#include "Wiki.h"
|
|
#include "HttpServer.h"
|
|
#include "PageResults.h"
|
|
#include "HashTable.h"
|
|
//#include "AdultCheck.h"
|
|
#include "Process.h"
|
|
#include "UrlRealtimeClassification.h"
|
|
#include "UdpServer.h"
|
|
#include "Conf.h"
|
|
#include "GbMutex.h"
|
|
#include "ScopedLock.h"
|
|
#include "Mem.h"
|
|
#include "ScopedLock.h"
|
|
#include "Errno.h"
|
|
#include <new>
|
|
|
|
|
|
// increasing this doesn't seem to improve performance any on a single
|
|
// node cluster....
|
|
#define MAX_OUTSTANDING_MSG20S 200
|
|
|
|
static void gotDocIdsWrapper ( void *state );
|
|
static bool gotSummaryWrapper ( void *state );
|
|
|
|
static bool isVariantLikeSubDomain(const char *s, int32_t len);
|
|
|
|
// Construct an idle Msg40. The mem-initializer list covers the members with
// non-trivial types; everything else is zeroed/cleared in the body so the
// destructor and the reset paths are safe even if getResults() never runs.
Msg40::Msg40()
	: m_deadline(0),
	  m_numRealtimeClassificationsStarted(0),
	  m_numRealtimeClassificationsCompleted(0),
	  m_mtxRealtimeClassificationsCounters(),
	  m_realtimeClassificationsSubmitted(false)
{
	// buffers and Msg20 bookkeeping
	m_socketHadError   = 0;
	m_buf              = NULL;
	m_buf2             = NULL;
	m_msg20            = NULL;
	m_numMsg20s        = 0;
	m_msg20StartBuf    = NULL;
	m_numToFree        = 0;

	// state used while streaming results to the client
	m_numPrinted       = 0;
	m_printedHeader    = false;
	m_printedTail      = false;
	m_sendsOut         = 0;
	m_sendsIn          = 0;
	m_printi           = 0;
	m_numDisplayed     = 0;
	m_numPrintedSoFar  = 0;
	m_didSummarySkip   = false;
	m_omitCount        = 0;
	m_printCount       = 0;
	m_numCollsToSearch = 0;
	m_numMsg20sIn      = 0;
	m_numMsg20sOut     = 0;

	// remaining members zeroed to keep static analysis (Coverity) happy
	m_msg3aRecallCnt   = 0;
	m_docsToGet        = 0;
	m_docsToGetVisible = 0;
	m_state            = NULL;
	m_callback         = NULL;
	m_numRequests      = 0;
	m_numReplies       = 0;
	m_moreToCome       = false;
	m_lastProcessedi   = 0;
	m_startTime        = 0;
	m_cachedTime       = 0;
	m_tasksRemaining   = 0;
	m_bufMaxSize       = 0;
	m_bufMaxSize2      = 0;
	m_errno            = 0;
	m_si               = NULL;
	m_msg3aPtrs        = NULL;
	m_num3aRequests    = 0;
	m_num3aReplies     = 0;
	m_firstCollnum     = 0;
}
|
|
|
|
void Msg40::resetBuf2 ( ) {
|
|
// remember num to free in reset() function
|
|
char *p = m_msg20StartBuf;
|
|
// msg20 destructors
|
|
for ( int32_t i = 0 ; i < m_numToFree ; i++ ) {
|
|
// cast it
|
|
Msg20 *m = (Msg20 *)p;
|
|
// free its stuff
|
|
m->~Msg20();
|
|
// advance
|
|
p += sizeof(Msg20);
|
|
}
|
|
// now free the msg20 ptrs and buffer space
|
|
if ( m_buf2 ) mfree ( m_buf2 , m_bufMaxSize2 , "Msg40b" );
|
|
m_buf2 = NULL;
|
|
}
|
|
|
|
// Free everything this Msg40 owns: heap-allocated per-collection Msg3a
// objects (the embedded m_msg3a cleans itself up), the main buffer m_buf,
// and the Msg20 buffer via resetBuf2().
Msg40::~Msg40() {
	// free any temporary msg3as created for multi-collection searches
	for ( int32_t i = 0 ; i < m_numCollsToSearch ; i++ ) {
		Msg3a *mp = m_msg3aPtrs[i];
		// skip slots that were never allocated
		if ( ! mp ) continue;
		// never delete the embedded member
		if ( mp == &m_msg3a ) continue;
		mdelete ( mp , sizeof(Msg3a), "tmsg3a");
		delete ( mp );
		m_msg3aPtrs[i] = NULL;
	}
	// main buffer
	if ( m_buf ) {
		mfree ( m_buf , m_bufMaxSize , "Msg40" );
		m_buf = NULL;
	}
	// Msg20 ptr array + objects
	resetBuf2();
}
|
|
|
|
// . entry point for executing a search: fetch docids, then summaries
// . returns false if blocked (callback will be called later), true otherwise
// . sets g_errno on error
// . uses Msg3a to get docIds
// . uses many msg20s to get title/summary/url/docLen for each docId
// . "forward" (proxy-style request forwarding) is unsupported and aborts
bool Msg40::getResults ( SearchInput *si ,
			 bool forward ,
			 void *state ,
			 void (* callback) ( void *state ) ) {

	log(LOG_INFO, "query: Msg40 start: query_id='%s' query='%s'", si->m_queryId, si->m_query);
	m_omitCount = 0;

	// compute an absolute deadline for the docid-gathering phase from the
	// configured msg39 timeout (0/negative means no deadline)
	if(g_conf.m_msg40_msg39_timeout>0) {
		m_deadline = gettimeofdayInMilliseconds() + g_conf.m_msg40_msg39_timeout;
	}

	// warning: an empty collnum buffer means no collection was resolved
	//if ( ! si->m_coll2 ) log(LOG_LOGIC,"net: NULL collection. msg40.");
	if ( si->m_collnumBuf.length() < (int32_t)sizeof(collnum_t) )
		log(LOG_LOGIC,"net: NULL collection. msg40.");

	// -1 means launchMsg20s() has not processed any docid yet
	m_lastProcessedi = -1;
	m_didSummarySkip = false;

	// remember caller state/callback for asynchronous completion
	m_si = si;
	m_state = state;
	m_callback = callback;
	m_msg3aRecallCnt = 0;
	// we haven't allocated any Msg20s yet
	m_numMsg20s = 0;
	// reset our error keeper
	m_errno = 0;

	// take search parms i guess from first collnum
	const collnum_t *cp = (const collnum_t *)m_si->m_collnumBuf.getBufStart();

	// get the collection rec
	CollectionRec *cr =g_collectiondb.getRec( cp[0] );

	// g_errno should be set if not found
	if ( ! cr ) { g_errno = ENOCOLLREC; return true; }

	// save that
	m_firstCollnum = cr->m_collnum;

	// . reset these
	// . Next X Results links? yes or no?
	m_moreToCome = false;
	// set this to zero -- assume not in cache
	m_cachedTime = 0;

	// bail now if 0 requested!
	if ( m_si->m_docsWanted == 0 ) {
		log("msg40: n=0.");
		return true;
	}

	// or if no query terms
	if ( m_si->m_q.m_numTerms <= 0 ) {
		log("msg40: numTerms=0.");
		return true;
	}

	// enforce hard limits from the configuration so a malicious or buggy
	// client cannot request an unbounded amount of work
	if(m_si->m_docsWanted > g_conf.m_maxDocsWanted) {
		log(LOG_DEBUG,"msg40: limiting docs-wanted from %d to %d", m_si->m_docsWanted, g_conf.m_maxDocsWanted);
		m_si->m_docsWanted = g_conf.m_maxDocsWanted;
	}
	if(m_si->m_firstResultNum > g_conf.m_maxFirstResultNum) {
		log(LOG_DEBUG,"msg40: limiting docs-offset from %d to %d", m_si->m_firstResultNum, g_conf.m_maxFirstResultNum);
		m_si->m_firstResultNum = g_conf.m_maxFirstResultNum;
	}
	// how many docids do we need to get? (offset + page size)
	int32_t get = m_si->m_docsWanted + m_si->m_firstResultNum ;
	// we get one extra for so we can set m_moreToFollow so we know
	// if more docids can be gotten (i.e. show a "Next 10" link)
	get++;

	// ok, need some sane limit though to prevent malloc from
	// trying to get 7800003 docids and going ENOMEM
	if ( get > MAXDOCIDSTOCOMPUTE ) {
		log("msg40: asking for too many docids. reducing to %" PRId32,
		    (int32_t)MAXDOCIDSTOCOMPUTE);
		get = MAXDOCIDSTOCOMPUTE;
	}
	// this is how many visible results we need, after filtering/clustering
	m_docsToGetVisible = get;

	// . get a little more (20%) since this usually doesn't remove many
	//   docIds
	// . deduping is now done in Msg40.cpp once the summaries are gotten
	if ( m_si->m_doDupContentRemoval ) get = (get*120LL)/100LL;

	// . ALWAYS get at least this many
	// . this allows Msg3a to allow higher scoring docids in tier #1 to
	//   outrank lower-scoring docids in tier #0, even if such docids have
	//   all the query terms explicitly. and we can guarantee consistency
	//   as long as we only allow for this outranking within the first
	//   MIN_DOCS_TO_GET docids.
	if ( get < MIN_DOCS_TO_GET ) get = MIN_DOCS_TO_GET;
	// this is how many docids to get total, assuming that some will be
	// filtered out for being dups, etc. and that we will have at least
	// m_docsToGetVisible leftover that are unfiltered and visible. so
	// we tell each msg39 split to get more docids than we actually want
	// in anticipation some will be filtered out in this class.
	m_docsToGet = get;

	// debug msg
	if ( m_si->m_debug )
		logf(LOG_DEBUG,"query: msg40 mapped %" PRId32" wanted to %" PRId32" to get",
		     m_docsToGetVisible,m_docsToGet );

	// let's try using msg 0xfd like Proxy.cpp uses to forward an http
	// request! then we just need specify the ip of the proxy and we
	// do not need hosts2.conf!
	// (forwarding is not implemented; abort hard if requested)
	if ( forward ) { g_process.shutdownAbort(true); }

	// time the cache lookup
	if ( g_conf.m_logTimingQuery || m_si->m_debug || g_conf.m_logDebugQuery)
		m_startTime = gettimeofdayInMilliseconds();

	// keep going: next phase gathers the docids via Msg3a
	bool status = prepareToGetDocIds ( );

	return status;
}
|
|
|
|
// . logs timing for the (removed) result-cache lookup, then proceeds to
//   fetching the docids
// . returns false if blocked, true otherwise (g_errno set on error)
bool Msg40::prepareToGetDocIds ( ) {

	// log the time it took for cache lookup and restart the clock for
	// the docid-gathering phase
	if ( g_conf.m_logTimingQuery || m_si->m_debug || g_conf.m_logDebugQuery) {
		int64_t now = gettimeofdayInMilliseconds();
		int64_t took = now - m_startTime;
		logf(LOG_TIMING,"query: [%p] Not found in cache. Lookup took %" PRId64" ms.",this,took);
		m_startTime = now;
		logf(LOG_TIMING,"query: msg40: [%p] Getting up to %" PRId32" (docToGet=%" PRId32") docids", this,
		     m_docsToGetVisible, m_docsToGet);
	}

	// . if query has dirty words and family filter is on, set
	//   number of results to 0, and set the m_queryClen flag to true
	// . m_qbuf1 should be the advanced/composite query
	// . disabled: the AdultCheck/getAdultPoints code is not currently
	//   compiled in (see the commented-out #include "AdultCheck.h" above)
#if 0
	//@@@ TODO
	if ( m_si->m_familyFilter &&
	     getAdultPoints ( m_si->m_sbuf1.getBufStart() ,
			      m_si->m_sbuf1.length() ,
			      NULL ) ) {
		// make sure the m_numDocIds gets set to 0
		m_msg3a.reset();
		return true;
	}
#endif

	// false = this is not a recall
	return getDocIds( false );
}
|
|
|
|
// . returns false if blocked, true otherwise
|
|
// . sets g_errno on error
|
|
bool Msg40::getDocIds ( bool recall ) {
|
|
|
|
////
|
|
//
|
|
// NEW CODE FOR LAUNCHING one MSG3a per collnum to search a token
|
|
//
|
|
////
|
|
m_num3aReplies = 0;
|
|
m_num3aRequests = 0;
|
|
|
|
// how many are we searching? usually just one.
|
|
m_numCollsToSearch = m_si->m_collnumBuf.length() /sizeof(collnum_t);
|
|
|
|
// make enough for ptrs
|
|
int32_t need = sizeof(Msg3a *) * m_numCollsToSearch;
|
|
if ( ! m_msg3aPtrBuf.reserve ( need ) ) return true;
|
|
// cast the mem buffer
|
|
m_msg3aPtrs = (Msg3a **)m_msg3aPtrBuf.getBufStart();
|
|
|
|
// clear these out so we do not free them when destructing
|
|
for ( int32_t i = 0 ; i < m_numCollsToSearch ;i++ )
|
|
m_msg3aPtrs[i] = NULL;
|
|
|
|
// use first guy in case only one coll we are searching, the std case
|
|
if ( m_numCollsToSearch <= 1 )
|
|
m_msg3aPtrs[0] = &m_msg3a;
|
|
|
|
return federatedLoop();
|
|
}
|
|
|
|
// . builds one Msg39Request shared by all collections being searched, then
//   launches one Msg3a per collection (at most one outstanding at a time)
// . re-entered from gotDocIdsWrapper() until every collection is launched
// . returns false if blocked, true otherwise (g_errno/m_errno set on error)
bool Msg40::federatedLoop ( ) {

	// search the provided collnums (collections)
	const collnum_t *cp = (const collnum_t *)m_si->m_collnumBuf.getBufStart();

	// we modified m_rcache above to be true if we should read from cache
	int32_t maxAge = 0 ;
	if ( m_si->m_rcache ) maxAge = g_conf.m_indexdbMaxIndexListAge;

	if(g_conf.m_logTraceQuery || m_si->m_debug)
		m_si->m_baseScoringParameters.traceToLog("query:msg40");

	// reset it
	Msg39Request mr;
	mr.reset();

	// copy the relevant query/scoring parameters from SearchInput into
	// the request that each Msg3a will carry
	mr.m_maxAge = maxAge;
	mr.m_addToCache = m_si->m_wcache;
	mr.m_docsToGet = m_docsToGet;
	mr.m_niceness = m_si->m_niceness;
	mr.m_debug = m_si->m_debug ;
	mr.m_getDocIdScoringInfo = m_si->m_getDocIdScoringInfo;
	mr.m_doSiteClustering = m_si->m_doSiteClustering ;
	mr.m_hideAllClustered = m_si->m_hideAllClustered;
	mr.m_familyFilter = m_si->m_familyFilter;
	mr.m_doMaxScoreAlgo = m_si->m_doMaxScoreAlgo;
	mr.m_modifyQuery = true; //we are a user-specified query so modifying it is ok. todo/hack until msg39 can carry the full query information
	mr.m_baseScoringParameters = m_si->m_baseScoringParameters;
	mr.m_doDupContentRemoval = m_si->m_doDupContentRemoval ;
	mr.m_word_variations_config = m_si->m_word_variations_config;
	// NOTE(review): m_familyFilter is assigned twice (also a few lines
	// above) -- harmless duplicate
	mr.m_familyFilter = m_si->m_familyFilter ;
	mr.m_allowHighFrequencyTermCache = m_si->m_allowHighFrequencyTermCache;
	mr.m_language = (unsigned char)m_si->m_queryLangId;
	mr.ptr_query = const_cast<char*>(m_si->m_q.originalQuery());
	mr.size_query = strlen(m_si->m_q.originalQuery())+1;
	// whitelist of sites to restrict the search to (may be NULL)
	int32_t slen = 0; if ( m_si->m_sites ) slen=strlen(m_si->m_sites)+1;
	mr.ptr_whiteList = const_cast<char*>(m_si->m_sites);
	mr.size_whiteList = slen;
	mr.m_timeout = g_conf.m_msg40_msg39_timeout;
	mr.m_realMaxTop = m_si->m_realMaxTop;

	mr.m_minSerpDocId = m_si->m_minSerpDocId;
	mr.m_maxSerpScore = m_si->m_maxSerpScore;
	memcpy(mr.m_queryId, m_si->m_queryId, sizeof(m_si->m_queryId));

	// never use a timeout smaller than the configured per-query minimum
	if ( mr.m_timeout < m_si->m_minMsg3aTimeout )
		mr.m_timeout = m_si->m_minMsg3aTimeout;

	//
	// how many docid splits should we do to avoid going OOM?
	//
	const CollectionRec *cr = g_collectiondb.getRec(m_firstCollnum);
	RdbBase *base = NULL;
	if ( cr ) g_titledb.getRdb()->getBase(cr->m_collnum);
	//NOTE: the above line is a bug, but the obvious fix causes numDocIdSplits to become huge (eg 200)
	// (the getBase() result is discarded, so "base" stays NULL and
	// numDocs stays 0 -- kept deliberately, see the note above)
	int64_t numDocs = 0;
	if ( base ) numDocs = base->getNumTotalRecs();
	// for every 5M docids per host, lets split up the docid range
	// to avoid going OOM
	int32_t mult = numDocs / 5000000;
	if ( mult <= 0 ) mult = 1;

	// also scale the split count with the number of query terms
	int32_t nt = m_si->m_q.getNumTerms();
	int32_t numDocIdSplits = nt / 2; // ;/// 2;
	if ( numDocIdSplits <= 0 ) numDocIdSplits = 1;
	// and mult based on index size
	numDocIdSplits *= mult;

	if ( cr ) mr.m_maxQueryTerms = cr->m_maxQueryTerms;
	else mr.m_maxQueryTerms = 100;

	// limit numDocIdSplits to the range specified in the configuration
	if ( numDocIdSplits < g_conf.min_docid_splits )
		numDocIdSplits = g_conf.min_docid_splits;
	if ( numDocIdSplits > g_conf.max_docid_splits )
		numDocIdSplits = g_conf.max_docid_splits;
	log(LOG_DEBUG,"query: Msg40::federatedLoop: numDocIdSplits=%d", numDocIdSplits);
	// store it in the reuquest now
	mr.m_numDocIdSplits = numDocIdSplits;

	if(m_si->m_doSiteClustering && cr) {
		//Make a temporary query instance so we can calculate if site clustering should be turned off. We cannot use m_si->m_q because that could affect word highlighting
		Query tmpQuery;
		const CollectionRec *cr = g_collectiondb.getRec(m_firstCollnum);
		tmpQuery.set(m_si->m_query,
			     m_si->m_queryLangId,
			     1.0, 1.0, NULL,
			     true,
			     m_si->m_allowHighFrequencyTermCache,
			     cr->m_maxQueryTerms);

		DerivedScoringWeights dsw; //don't care about the values.
		// modifyQuery() may flip m_doSiteClustering off; propagate
		// the possibly-updated value into the request
		tmpQuery.modifyQuery(&dsw, *cr, &m_si->m_doSiteClustering);
		mr.m_doSiteClustering = m_si->m_doSiteClustering;
	}

	// only one Msg3a in flight at a time
	int32_t maxOutMsg3as = 1;

	// create new ones if searching more than 1 coll
	for ( int32_t i = m_num3aRequests ; i < m_numCollsToSearch ; i++ ) {

		// do not have more than this many outstanding
		if ( m_num3aRequests - m_num3aReplies >= maxOutMsg3as )
			// wait for it to return before launching another
			return false;

		// get it
		Msg3a *mp = m_msg3aPtrs[i];
		// heap-allocate a Msg3a unless one is already assigned
		// (slot 0 points at the embedded m_msg3a for the
		// single-collection case)
		if ( ! mp ) {
			try { mp = new ( Msg3a); }
			catch(std::bad_alloc&) {
				g_errno = ENOMEM;
				return true;
			}
			mnew(mp,sizeof(Msg3a),"tm3ap");
		}
		// assign it
		m_msg3aPtrs[i] = mp;
		// assign the request for it
		memcpy ( &mp->m_msg39req , &mr , sizeof(Msg39Request) );
		// then customize it to just search this collnum
		mp->m_msg39req.m_collnum = cp[i];

		// launch a search request
		m_num3aRequests++;
		// this returns false if it would block and will call callback
		// m_si is actually contained in State0 in PageResults.cpp
		// and Msg40::m_si points to that. so State0's destructor
		// should call SearchInput's destructor which calls
		// Query's destructor to destroy &m_si->m_q here when done.
		if(!mp->getDocIds(m_si,&m_si->m_q,this,gotDocIdsWrapper))
			continue;
		// completed synchronously: record any error and count reply
		if ( g_errno && ! m_errno )
			m_errno = g_errno;
		m_num3aReplies++;
	}

	// call again w/o parameters now
	return gotDocIds ( );
}
|
|
|
|
// . uses parameters assigned to local member vars above
|
|
// . returns false if blocked, true otherwise
|
|
// . sets g_errno on error
|
|
void gotDocIdsWrapper ( void *state ) {
|
|
Msg40 *THIS = (Msg40 *) state;
|
|
// if this blocked, it returns false
|
|
//if ( ! checkTurnOffRAT ( state ) ) return;
|
|
THIS->m_num3aReplies++;
|
|
// try to launch more if there are more colls left to search
|
|
if ( THIS->m_num3aRequests < THIS->m_numCollsToSearch ) {
|
|
THIS->federatedLoop ( );
|
|
return;
|
|
}
|
|
// return if this blocked
|
|
if ( ! THIS->gotDocIds() ) return;
|
|
// now call callback, we're done
|
|
log(LOG_INFO, "query: Msg40 end: query_id='%s' query='%s', results=%d", THIS->m_si->m_queryId, THIS->m_si->m_query, THIS->getNumResults());
|
|
THIS->m_callback ( THIS->m_state );
|
|
}
|
|
|
|
// . called once per msg3a reply; does the real work only after the last one
// . merges per-collection docids, applies flag-based re-ranking, then starts
//   the summary (Msg20) phase
// . return false if blocked, true otherwise
// . sets g_errno on error
bool Msg40::gotDocIds ( ) {

	// return now if still waiting for a msg3a reply to get in
	if ( m_num3aReplies < m_num3aRequests ) return false;

	// if searching over multiple collections let's merge their docids
	// into m_msg3a now before we go forward
	// this will set g_errno on error, like oom
	// NOTE(review): mergeDocIdsIntoBaseMsg3a() currently always returns
	// true, so this log branch looks unreachable -- confirm
	if ( ! mergeDocIdsIntoBaseMsg3a() )
		log("msg40: error: %s",mstrerror(g_errno));

	// re-rank results according to per-flag adjustments
	adjustRankingBasedOnFlags();

	// log the time it took for cache lookup
	int64_t now = gettimeofdayInMilliseconds();

	if ( g_conf.m_logTimingQuery || m_si->m_debug||g_conf.m_logDebugQuery){
		int64_t took = now - m_startTime;
		logf(LOG_DEBUG,"query: msg40: [%p] Got %" PRId32" docids in %" PRId64" ms",
		     this,m_msg3a.getNumDocIds(),took);
		logf(LOG_DEBUG,"query: msg40: [%p] Getting up to %" PRId32" summaries",
		     this,m_docsToGetVisible);
	}

	// save any covered up error
	if ( ! m_errno && m_msg3a.m_errno ) m_errno = m_msg3a.m_errno;
	//sanity check. we might not have allocated due to out of memory
	if ( g_errno ) { m_errno = g_errno; return true; }

	// time this (the summary-gathering phase)
	m_startTime = gettimeofdayInMilliseconds();

	// we haven't got any Msg20 responses as of yet or sent any requests
	m_numRequests = 0;
	m_numReplies = 0;

	// size the url-dedup hash table to twice the docid count
	// NOTE(review): the log text says we continue without deduping, but
	// the code aborts the query by returning true with m_errno set --
	// confirm which behavior is intended
	if ( ! m_urlTable.set ( m_msg3a.getNumDocIds() * 2 ) ) {
		m_errno = g_errno;
		log("query: Failed to allocate memory for url deduping. Not deduping search results.");
		return true;
	}

	// if only getting docids, skip summaries, and references
	if ( m_si->m_docIdsOnly ) return true;

	// . alloc buf to hold all m_msg20[i] ptrs and the Msg20s they point to
	// . returns false and sets g_errno/m_errno on error
	// . salvage any Msg20s that we can if we are being re-called
	if ( ! reallocMsg20Buf() ) return true;

	// . launch a bunch of task that depend on the docids we got
	// . keep track of how many are out
	m_tasksRemaining = 0;

	// debug msg
	if ( m_si->m_debug || g_conf.m_logDebugQuery )
		logf(LOG_DEBUG,"query: [%p] Getting reference pages and dir pages.",
		     this);

	return launchMsg20s ( false );
}
|
|
|
|
bool Msg40::mergeDocIdsIntoBaseMsg3a() {
|
|
|
|
// only do this if we were searching multiple collections, otherwise
|
|
// all the docids are already in m_msg3a
|
|
if ( m_numCollsToSearch <= 1 ) return true;
|
|
|
|
// free any mem in use
|
|
m_msg3a.reset();
|
|
|
|
// count total docids into "td"
|
|
int32_t td = 0LL;
|
|
for ( int32_t i = 0 ; i < m_numCollsToSearch ; i++ ) {
|
|
Msg3a *mp = m_msg3aPtrs[i];
|
|
td += mp->getNumDocIds();
|
|
// reset cursor for list of docids from this collection
|
|
mp->m_cursor = 0;
|
|
// add up here too
|
|
m_msg3a.m_numTotalEstimatedHits += mp->m_numTotalEstimatedHits;
|
|
}
|
|
|
|
// setup to to merge all msg3as into our one m_msg3a
|
|
int32_t need = 0;
|
|
need += td * 8;
|
|
need += td * sizeof(double);
|
|
need += td * sizeof(unsigned);
|
|
need += td * sizeof(key96_t);
|
|
need += td * 1;
|
|
need += td * sizeof(collnum_t);
|
|
// make room for the merged docids
|
|
m_msg3a.m_finalBuf = (char *)mmalloc ( need , "finalBuf" );
|
|
m_msg3a.m_finalBufSize = need;
|
|
// return true with g_errno set
|
|
if ( ! m_msg3a.m_finalBuf ) return true;
|
|
// parse the memory up into arrays
|
|
char *p = m_msg3a.m_finalBuf;
|
|
m_msg3a.m_docIds = (int64_t *)p; p += td * 8;
|
|
m_msg3a.m_scores = (double *)p; p += td * sizeof(double);
|
|
m_msg3a.m_flags = (unsigned*)p; p += td * sizeof(unsigned);
|
|
m_msg3a.m_clusterRecs = (key96_t *)p; p += td * sizeof(key96_t);
|
|
m_msg3a.m_clusterLevels = (char *)p; p += td * 1;
|
|
m_msg3a.m_scoreInfos = NULL;
|
|
m_msg3a.m_collnums = (collnum_t *)p; p += td * sizeof(collnum_t);
|
|
if ( p - m_msg3a.m_finalBuf != need ) { g_process.shutdownAbort(true); }
|
|
|
|
m_msg3a.m_numDocIds = td;
|
|
|
|
//
|
|
// begin the collection merge
|
|
//
|
|
|
|
for(int32_t next = 0; next<td; next++) {
|
|
// get next biggest score in the msg3as
|
|
double max = -1000000000.0;
|
|
Msg3a *maxmp = NULL;
|
|
for ( int32_t i = 0 ; i < m_numCollsToSearch ; i++ ) {
|
|
// shortcut
|
|
Msg3a *mp = m_msg3aPtrs[i];
|
|
// get cursor
|
|
int32_t cursor = mp->m_cursor;
|
|
// skip if exhausted
|
|
if ( cursor >= mp->m_numDocIds ) continue;
|
|
// get his next score
|
|
double score = mp->m_scores[ cursor ];
|
|
if ( score <= max ) continue;
|
|
// got a new winner
|
|
max = score;
|
|
maxmp = mp;
|
|
}
|
|
|
|
if(!maxmp )
|
|
break; //done
|
|
// store him
|
|
m_msg3a.m_docIds [next] = maxmp->m_docIds[maxmp->m_cursor];
|
|
m_msg3a.m_scores [next] = maxmp->m_scores[maxmp->m_cursor];
|
|
m_msg3a.m_flags [next] = maxmp->m_flags[maxmp->m_cursor];
|
|
m_msg3a.m_collnums[next] = maxmp->m_msg39req.m_collnum;
|
|
m_msg3a.m_clusterLevels[next] = CR_OK;
|
|
maxmp->m_cursor++;
|
|
}
|
|
|
|
// free tmp msg3as now
|
|
for ( int32_t i = 0 ; i < m_numCollsToSearch ; i++ ) {
|
|
if ( m_msg3aPtrs[i] == &m_msg3a ) continue;
|
|
mdelete ( m_msg3aPtrs[i] , sizeof(Msg3a), "tmsg3a");
|
|
delete ( m_msg3aPtrs[i] );
|
|
m_msg3aPtrs[i] = NULL;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
|
|
//adjust the order of the results based on the flags of the documents
// . each result's rank (its current index) is shifted by the per-flag-bit
//   values configured in m_baseScoringParameters.m_flagRankAdjustment[],
//   then the parallel docid/score/flag/collnum/clusterLevel arrays are
//   re-sorted by adjusted rank, keeping all arrays in lockstep
// . allocation failure is tolerated: the ordering is simply left unchanged
void Msg40::adjustRankingBasedOnFlags() {
	// scratch array holding the adjusted rank of each result
	int *rank = (int*)mmalloc(m_msg3a.m_numDocIds * sizeof(int), "ranksort");
	if(!rank)
		return; //not a serious problem

	// compute adjusted rank: index plus the sum of the adjustments for
	// every flag bit that is set on the document
	for(int i=0; i<m_msg3a.m_numDocIds; i++) {
		unsigned flags = m_msg3a.m_flags[i];
		int adjustment = 0;
		if(flags) {
			// only the low 26 flag bits carry adjustments
			for(int bit=0; bit<26; bit++)
				if((1<<bit)&flags)
					adjustment += m_si->m_baseScoringParameters.m_flagRankAdjustment[bit];
			rank[i] = i + adjustment;
		} else
			rank[i] = i;
	}

	//There are no library functions to sort multiple side-by-side arrays. Fortunately, the positions are almost sorted, so a plain insertion sort is the ideal algorithm
	// every swap below must happen on ALL parallel arrays to keep them
	// in lockstep; the sort is stable for equal ranks
	for(int i=1; i<m_msg3a.m_numDocIds; i++) {
		for(int j=i; j>0 && rank[j-1] > rank[j]; j--) {
			std::swap(m_msg3a.m_docIds[j],m_msg3a.m_docIds[j-1]);
			std::swap(m_msg3a.m_scores[j],m_msg3a.m_scores[j-1]);
			std::swap(m_msg3a.m_flags[j],m_msg3a.m_flags[j-1]);
			// m_collnums is only set for multi-collection searches
			if(m_msg3a.m_collnums)
				std::swap(m_msg3a.m_collnums[j],m_msg3a.m_collnums[j-1]);
			std::swap(m_msg3a.m_clusterLevels[j],m_msg3a.m_clusterLevels[j-1]);
			std::swap(rank[j],rank[j-1]);
		}
	}

	mfree(rank,m_msg3a.m_numDocIds * sizeof(int),"ranksort");
}
|
|
|
|
|
|
// . returns false and sets g_errno/m_errno on error
// . makes m_msg3a.m_numDocIds ptrs to Msg20s.
// . does not allocate a Msg20 in the buffer if the m_msg3a.m_clusterLevels[i]
//   is something other than CR_OK
// . layout of m_buf2: [Msg20* ptr array][placement-new'd Msg20 objects]
// . when re-called (msg3a recall), already-fetched summaries are salvaged
//   by moving them from the old buffer into the new one by docid match
bool Msg40::reallocMsg20Buf ( ) {

	// if the user only requested docids, we have no summaries
	if ( m_si->m_docIdsOnly ) return true;

	// . allocate m_buf2 to hold all our Msg20 pointers and Msg20 classes
	// . how much mem do we need?
	// . need space for the msg20 ptrs
	int64_t need = m_msg3a.m_numDocIds * sizeof(Msg20 *);
	// need space for the classes themselves, only if "visible" though
	for ( int32_t i = 0 ; i < m_msg3a.m_numDocIds ; i++ )
		if ( m_msg3a.m_clusterLevels[i] == CR_OK )
			need += sizeof(Msg20);

	// MDW: try to preserve the old Msg20s if we are being re-called
	if ( m_buf2 ) {
		// make new buf
		char *newBuf = (char *)mmalloc(need,"Msg40d");
		// return false if it fails
		if ( ! newBuf ) { m_errno = g_errno; return false; }
		// fill it up
		char *p = newBuf;
		// point to our new array of Msg20 ptrs
		Msg20 **tmp = (Msg20 **)p;
		// skip over pointer array
		p += m_msg3a.m_numDocIds * sizeof(Msg20 *);
		// record start to set to m_msg20StartBuf
		char *pstart = p;
		// and count for m_numToFree
		int32_t pcount = 0;
		// fill in the actual Msg20s from the old buffer
		for ( int32_t i = 0 ; i < m_msg3a.m_numDocIds ; i++ ) {
			// assume empty, because clustered, filtered, etc.
			tmp[i] = NULL;
			// if clustered, keep it as a NULL ptr
			if ( m_msg3a.m_clusterLevels[i] != CR_OK ) continue;
			// point it to its memory
			tmp[i] = (Msg20 *)p;
			// point to the next Msg20
			p += sizeof(Msg20);
			// init it (placement new -- destroyed in resetBuf2())
			new (tmp[i]) Msg20();
			// count it
			pcount++;
			// skip it if it is a new docid, we do not have a Msg20
			// for it from the previous tier. IF it is from
			// the current tier, THEN it is new.
			//if ( m_msg3a.m_tiers[i] == m_msg3a.m_tier ) continue;
			// see if we can find this docid from the old list!
			int32_t k = 0;
			for ( ; k < m_numMsg20s ; k++ ) {
				// skip if NULL
				if ( ! m_msg20[k] ) continue;
				// if it never gave us a reply then skip it
				if ( ! m_msg20[k]->m_gotReply ) continue;
				//or if it had an error
				if ( m_msg20[k]->m_errno ) continue;
				// skip if no match
				if ( m_msg3a .m_docIds[i] !=
				     m_msg20[k]->m_r->m_docId )//getDocId() )
					continue;
				// we got a match, grab its Msg20
				break;
			}
			// . skip if we could not match it... strange...
			// . no, because it may have been in the prev tier,
			//   from a split, but it was not in msg3a's final
			//   merged list made in Msg3a::mergeLists(), but now
			//   it is in there, with the previous tier, because
			//   we asked for more docids from msg3a.
			// . NO! why did we go to the next tier unnecessarily
			//   THEN? no again, because we did a msg3a recall
			//   and asked for more docids which required us
			//   going to the next tier, even though some (but
			//   not enough) docids remained in the previous tier.
			if ( k >= m_numMsg20s ) {
				/*
				logf(LOG_DEBUG,"query: msg40: could not match "
				     "docid %" PRId64" (max=%" PRId32") "
				     "to msg20. newBitScore=0x%hhx q=%s",
				     m_msg3a.m_docIds[i],
				     (char)m_msg3a.m_bitScores[i],
				     m_msg3a.m_q->m_orig);
				*/
				continue;
			}
			// it is from an older tier but never got the msg20
			// for it? what happened? it got unclustered??
			if ( ! m_msg20[k] ) continue;

			// . otherwise copy the memory if available
			// . if m_msg20[i]->m_docId is set this will save us
			//   repeating a summary lookup
			tmp[i]->moveFrom(m_msg20[k]);
		}
		// sanity check: the carve-up must consume exactly "need"
		if ( p - (char *)tmp != need ) { g_process.shutdownAbort(true); }

		// destroy the old Msg20s and free the old buffer
		resetBuf2();

		// the new buf2 stuff
		m_numToFree = pcount;
		m_msg20StartBuf = pstart;

		// re-assign the msg20 ptr to the ptrs
		m_msg20 = tmp;
		// update new count
		m_numMsg20s = m_msg3a.m_numDocIds;

		// assign to new mem
		m_buf2 = newBuf;
		m_bufMaxSize2 = need;

		// all done
		return true;
	}

	// first-time path: no old buffer to salvage from
	m_numMsg20s = m_msg3a.m_numDocIds;

	m_buf2 = NULL;
	m_bufMaxSize2 = need;

	// do the alloc
	if ( need ) m_buf2 = (char *)mmalloc ( need ,"Msg40msg20");
	if ( need && ! m_buf2 ) { m_errno = g_errno; return false; }
	// point to the mem
	char *p = m_buf2;
	// point to the array, then make p point to the Msg20 buffer space
	m_msg20 = (Msg20 **)p;
	p += m_numMsg20s * sizeof(Msg20 *);
	// start free here
	m_msg20StartBuf = p;
	// set the m_msg20[] array to use this memory, m_buf20
	for ( int32_t i = 0 ; i < m_numMsg20s ; i++ ) {
		// assume empty
		m_msg20[i] = NULL;
		// if clustered, do a NULL ptr
		if ( m_msg3a.m_clusterLevels[i] != CR_OK ) continue;
		// point it to its memory
		m_msg20[i] = (Msg20 *)p;
		// call its constructor (placement new)
		new (m_msg20[i]) Msg20();
		// point to the next Msg20
		p += sizeof(Msg20);
		// remember num to free in reset() function
		m_numToFree++;
	}

	return true;
}
|
|
|
|
bool Msg40::launchMsg20s(bool recalled) {
|
|
|
|
if( !m_si ) {
|
|
logError("cannot use this function when m_si is not set");
|
|
gbshutdownLogicError();
|
|
}
|
|
|
|
// don't launch any more if client browser closed socket
|
|
if ( m_socketHadError ) {
|
|
g_process.shutdownAbort(true);
|
|
}
|
|
|
|
// these are just like for passing to Msg39 above
|
|
int64_t maxCacheAge = 0 ;
|
|
// may it somewhat jive with the search results caching, otherwise
|
|
// it will tell me a search result was indexed like 3 days ago
|
|
// when it was just indexed 10 minutes ago because the
|
|
// titledbMaxCacheAge was set way too high
|
|
if ( m_si->m_rcache )
|
|
maxCacheAge = g_conf.m_docSummaryWithDescriptionMaxCacheAge;
|
|
|
|
int32_t maxOut = (int32_t)MAX_OUTSTANDING_MSG20S;
|
|
if ( g_udpServer.getNumUsedSlots() > 500 ) maxOut = 10;
|
|
if ( g_udpServer.getNumUsedSlots() > 800 ) maxOut = 1;
|
|
|
|
// if not deduping or site clustering, then
|
|
// just skip over docids for speed.
|
|
// don't bother with summaries we do not need
|
|
if ( ! m_si->m_doDupContentRemoval &&
|
|
! m_si->m_doSiteClustering &&
|
|
m_lastProcessedi == -1 ) {
|
|
// start getting summaries with the result # they want
|
|
m_lastProcessedi = m_si->m_firstResultNum-1;
|
|
// assume we printed the summaries before
|
|
m_printi = m_si->m_firstResultNum;
|
|
m_numDisplayed = m_si->m_firstResultNum;
|
|
// fake this so Msg40::gotSummary() can let us finish
|
|
// because it checks m_numRequests < m_msg3a.m_numDocIds
|
|
m_numRequests = m_si->m_firstResultNum;
|
|
m_numReplies = m_si->m_firstResultNum;
|
|
m_didSummarySkip = true;
|
|
log("query: skipping summary generation of first %" PRId32" docs",
|
|
m_si->m_firstResultNum);
|
|
}
|
|
|
|
// . launch a msg20 getSummary() for each docid
|
|
// . m_numContiguous should preceed any gap, see below
|
|
for ( int32_t i = m_lastProcessedi+1 ; i < m_msg3a.m_numDocIds ;i++ ) {
|
|
// if the user only requested docids, do not get the summaries
|
|
if ( m_si->m_docIdsOnly ) break;
|
|
// hard limit
|
|
if ( m_numRequests-m_numReplies >= maxOut ) break;
|
|
|
|
// do not repeat for this i
|
|
m_lastProcessedi = i;
|
|
|
|
// start up a Msg20 to get the summary
|
|
Msg20 *m = m_msg20[i];
|
|
|
|
// if to a dead host, skip it
|
|
int64_t docId = m_msg3a.m_docIds[i];
|
|
uint32_t shardNum = g_hostdb.getShardNumFromDocId ( docId );
|
|
// get the collection rec
|
|
const CollectionRec *cr = g_collectiondb.getRec(m_firstCollnum);
|
|
// if shard is dead then do not send to it if not crawlbot
|
|
if ( g_hostdb.isShardDead ( shardNum ) && cr ) {
|
|
log("msg40: skipping summary lookup #%" PRId32" of docid %" PRId64" for dead shard #%" PRId32
|
|
, i
|
|
, docId
|
|
, shardNum );
|
|
m_numRequests++;
|
|
m_numReplies++;
|
|
continue;
|
|
}
|
|
|
|
|
|
// if msg20 ptr null that means the cluster level is not CR_OK
|
|
if ( ! m ) {
|
|
m_numRequests++;
|
|
m_numReplies++;
|
|
continue;
|
|
}
|
|
// . did we already TRY to get the summary for this docid?
|
|
// . we might be re-called from the refilter: below
|
|
// . if already did it, skip it
|
|
// . Msg20::getSummary() sets m_docId, first thing
|
|
if ( m_msg3a.m_docIds[i] == m->getRequestDocId() ) {
|
|
m_numRequests++;
|
|
m_numReplies++;
|
|
continue;
|
|
}
|
|
|
|
// assume no error
|
|
g_errno = 0;
|
|
// debug msg
|
|
if ( m_si->m_debug || g_conf.m_logDebugQuery )
|
|
logf(LOG_DEBUG,"query: msg40: [%p] Getting summary #%" PRId32" for docId=%" PRId64,
|
|
this,i,m_msg3a.m_docIds[i]);
|
|
// launch it
|
|
m_numRequests++;
|
|
|
|
if ( ! cr ) {
|
|
log("msg40: missing coll");
|
|
g_errno = ENOCOLLREC;
|
|
if ( m_numReplies < m_numRequests ) return false;
|
|
return true;
|
|
}
|
|
|
|
// set the summary request then get it!
|
|
Msg20Request req;
|
|
Query *q = &m_si->m_q;
|
|
req.ptr_qbuf = const_cast<char*>(q->getQuery());
|
|
req.size_qbuf = q->getQueryLen()+1;
|
|
req.m_langId = m_si->m_queryLangId;
|
|
req.m_prefferedResultLangId = getLangIdFromAbbr(m_si->getPreferredResultLanguage().c_str());
|
|
|
|
req.m_highlightQueryTerms = m_si->m_doQueryHighlighting;
|
|
|
|
req.m_isDebug = (bool)m_si->m_debug;
|
|
|
|
if ( m_si->m_displayMetas && m_si->m_displayMetas[0] ) {
|
|
int32_t dlen = strlen(m_si->m_displayMetas);
|
|
req.ptr_displayMetas = const_cast<char *>(m_si->m_displayMetas);
|
|
req.size_displayMetas = dlen+1;
|
|
}
|
|
|
|
req.m_docId = m_msg3a.m_docIds[i];
|
|
|
|
// if the msg3a was merged from other msg3as because we
|
|
// were searching multiple collections...
|
|
if ( m_msg3a.m_collnums )
|
|
req.m_collnum = m_msg3a.m_collnums[i];
|
|
// otherwise, just one collection
|
|
else
|
|
req.m_collnum = m_msg3a.m_msg39req.m_collnum;
|
|
|
|
req.m_numSummaryLines = m_si->m_numLinesInSummary;
|
|
req.m_maxCacheAge = maxCacheAge;
|
|
req.m_state = this;
|
|
req.m_callback = gotSummaryWrapper;
|
|
req.m_niceness = m_si->m_niceness;
|
|
req.m_showBanned = m_si->m_showBanned;
|
|
req.m_includeCachedCopy = m_si->m_includeCachedCopy;
|
|
req.m_getSummaryVector = true;
|
|
req.m_titleMaxLen = m_si->m_titleMaxLen;
|
|
req.m_summaryMaxLen = cr->m_summaryMaxLen;
|
|
req.m_word_variations_config = m_si->m_word_variations_config;
|
|
req.m_useQueryStopWords = m_si->m_word_variations_config.m_wiktionaryWordVariations; //SearchInput doesn't have a m_useQueryStopWords, but if they wanted synonyms (m_queryExpansion) then they probably also want stop words
|
|
req.m_allowHighFrequencyTermCache = m_si->m_allowHighFrequencyTermCache;
|
|
|
|
// Line means excerpt
|
|
req.m_summaryMaxNumCharsPerLine = m_si->m_summaryMaxNumCharsPerLine;
|
|
|
|
// a special undocumented thing for getting <h1> tag
|
|
req.m_getHeaderTag = m_si->m_hr.getLong("geth1tag",0);
|
|
// let "ns" parm override
|
|
req.m_numSummaryLines = m_si->m_numLinesInSummary;
|
|
|
|
// . buzz likes to do the &inlinks=1 parm to get inlinks
|
|
// . use "&inlinks=1" for realtime inlink info, use
|
|
// "&inlinks=2" to just get it from the title rec, which is
|
|
// more stale, but does not take extra time or resources
|
|
// . we "default" to the realtime stuff... i.e. since buzz
|
|
// is already using "&inlinks=1"
|
|
if ( m_si->m_displayInlinks == 2 )
|
|
req.m_getLinkInfo = true;
|
|
|
|
// it copies this using a serialize() function
|
|
if ( ! m->getSummary ( &req ) ) continue;
|
|
|
|
// got reply
|
|
m_numReplies++;
|
|
// . otherwise we got summary without blocking
|
|
// . deal with an error
|
|
if ( ! g_errno ) continue;
|
|
// log it
|
|
log("query: Had error getting summary: %s.",
|
|
mstrerror(g_errno));
|
|
// record g_errno
|
|
if ( ! m_errno ) m_errno = g_errno;
|
|
// reset g_errno
|
|
g_errno = 0;
|
|
}
|
|
// return false if still waiting on replies
|
|
if ( m_numReplies < m_numRequests ) return false;
|
|
// do not re-call gotSummary() to avoid a possible recursive stack
|
|
// explosion. this is only true if we are being called from
|
|
// gotSummary() already, so do not call it again!!
|
|
if ( recalled )
|
|
return true;
|
|
// if we got nothing, that's it
|
|
if ( m_msg3a.m_numDocIds <= 0 ) {
|
|
// otherwise, we're done
|
|
return true;
|
|
}
|
|
|
|
// . i guess crash here for now
|
|
// . seems like we can call reallocMsg20Buf() and the first 50
|
|
// can already be set, so we drop down to here... so don't core
|
|
logf(LOG_DEBUG,"query: Had all msg20s already.");
|
|
// . otherwise, we got everyone, so go right to the merge routine
|
|
// . returns false if not all replies have been received
|
|
// . returns true if done
|
|
// . sets g_errno on error
|
|
return gotSummary ( );
|
|
}
|
|
|
|
// Return the first Msg20 slot that is not currently launched, i.e. free for
// reuse. Aborts the process if every slot is busy; callers must respect the
// outstanding-request limit so that never happens.
Msg20 *Msg40::getAvailMsg20() {
	// m_launched is cleared right before Msg20 invokes its callback
	// (gotSummaryWrapper()), so a non-launched slot is safe to hand out.
	int32_t idx = 0;
	while ( idx < m_numMsg20s ) {
		Msg20 *candidate = m_msg20[idx];
		if ( ! candidate->m_launched )
			return candidate;
		idx++;
	}
	// no free slot: should be impossible, but it has happened in the past
	g_process.shutdownAbort(true);
	return NULL;
}
|
|
|
|
// Find the Msg20 whose result index (m_ii) equals 'ix'. Returns NULL when no
// such Msg20 exists or when its summary lookup is still in progress.
Msg20 *Msg40::getCompletedSummary(int32_t ix) {
	for ( int32_t k = 0 ; k < m_numMsg20s ; k++ ) {
		Msg20 *candidate = m_msg20[k];
		// m_numMsg20s can exceed m_numRequests on multi-collection
		// federated searches, so some entries may be NULL
		if ( ! candidate )
			continue;
		if ( candidate->m_ii == ix )
			return candidate->m_inProgress ? NULL : candidate;
	}
	return NULL;
}
|
|
|
|
bool gotSummaryWrapper ( void *state ) {
|
|
Msg40 *THIS = (Msg40 *)state;
|
|
// inc it here
|
|
THIS->m_numReplies++;
|
|
|
|
if ( (THIS->m_numReplies % 10) == 0 ) {
|
|
log( "msg40: got %" PRId32 " summaries out of %" PRId32 "",
|
|
THIS->m_numReplies,
|
|
THIS->m_msg3a.m_numDocIds );
|
|
}
|
|
|
|
// it returns false if we're still awaiting replies
|
|
if ( !THIS->gotSummary() ) {
|
|
return false;
|
|
}
|
|
|
|
// now call callback, we're done
|
|
log(LOG_INFO, "query: Msg40 end: query_id='%s' query='%s', results=%d", THIS->m_si->m_queryId, THIS->m_si->m_query, THIS->getNumResults());
|
|
THIS->m_callback ( THIS->m_state );
|
|
|
|
return true;
|
|
}
|
|
|
|
static void doneSendingWrapper9(void *state, TcpSocket *sock) {
|
|
Msg40 *THIS = (Msg40 *)state;
|
|
|
|
// the send completed, count it
|
|
THIS->m_sendsIn++;
|
|
|
|
// error?
|
|
if ( THIS->m_sendsIn > THIS->m_sendsOut ) {
|
|
log("msg40: sendsin > sendsout. bailing!!!");
|
|
// try to prevent a core i haven't fixed right yet!!!
|
|
// seems like a reply coming back after we've destroyed the
|
|
// state!!!
|
|
return;
|
|
}
|
|
// socket error? if client closes the socket midstream we get one.
|
|
if ( g_errno ) {
|
|
THIS->m_socketHadError = g_errno;
|
|
log("msg40: streaming socket had error: %s",
|
|
mstrerror(g_errno));
|
|
// i guess destroy the socket here so we don't get called again?
|
|
}
|
|
|
|
// clear it so we don't think it was a msg20 error below
|
|
g_errno = 0;
|
|
|
|
// try to send more... returns false if blocked on something
|
|
if ( ! THIS->gotSummary() ) return;
|
|
|
|
// all done!!!???
|
|
log(LOG_INFO, "query: Msg40 end: query_id='%s' query='%s', results=%d", THIS->m_si->m_queryId, THIS->m_si->m_query, THIS->getNumResults());
|
|
THIS->m_callback ( THIS->m_state );
|
|
}
|
|
|
|
// . returns false if not all replies have been received (or timed/erroredout)
|
|
// . returns true if done (or an error finished us)
|
|
// . sets g_errno on error
|
|
// . called every time a Msg20 summary reply arrives (and when a streamed
//   chunk finishes sending) to flush output and launch more summary lookups
// . returns false if not all replies have been received (or timed/erroredout)
// . returns true if done (or an error finished us)
// . sets g_errno on error
bool Msg40::gotSummary ( ) {
	// now m_linkInfo[i] (for some i, i dunno which) is filled
	if ( m_si->m_debug || g_conf.m_logDebugQuery )
		logf(LOG_DEBUG,"query: msg40: [%p] Got summary. Total got=#%" PRId32".",
		     this,m_numReplies);

	// did we have a problem getting this summary? record it but keep
	// going -- one bad summary should not abort the whole result set
	if ( g_errno ) {
		// save it
		m_errno = g_errno;
		// log it
		log("query: msg40: Got error getting summary: %s.", mstrerror(g_errno));
		// reset g_errno
		g_errno = 0;
	}

	// initialize dedup table if we haven't already (4-byte keys, no data,
	// 64 initial slots, label "srdt")
	if ( ! m_dedupTable.isInitialized() &&
	     ! m_dedupTable.set (4,0,64,NULL,0,false,"srdt") )
		log("query: error initializing dedup table: %s",
		    mstrerror(g_errno));

	State0 *st = (State0 *)m_state;

 doAgain:

	// NOTE(review): the buffer is reset here yet its length is tested just
	// below -- presumably other code refills st->m_sb between iterations;
	// confirm before simplifying.
	st->m_sb.reset();

	// do we still own this socket? i am thinking it got closed somewhere
	// and the socket descriptor was re-assigned to another socket
	// getting a diffbot reply from XmLDoc::getDiffbotReply()
	if ( st->m_socket &&
	     st->m_socket->m_startTime != st->m_socketStartTimeHack ) {
		log("msg40: lost control of socket. sd=%i. the socket descriptor closed on us and got re-used by someone else.",
		    (int)st->m_socket->m_sd);
		// if there wasn't already an error like 'broken pipe' then
		// set it here so we stop getting summaries if streaming.
		if ( ! m_socketHadError ) m_socketHadError = EBADENGINEER;
		// make it NULL to avoid us from doing anything to it
		// since sommeone else is using it now.
		st->m_socket = NULL;
	}

	// . transmit the chunk in sb if non-zero length
	// . steals the allocated buffer from sb and stores in the
	//   TcpSocket::m_sendBuf, which it frees when socket is
	//   ultimately destroyed or we call sendChunk() again.
	// . when TcpServer is done transmitting, it does not close the
	//   socket but rather calls doneSendingWrapper9() which can call
	//   this function again to send another chunk
	if ( st->m_sb.length() &&
	     // did client browser close the socket on us midstream?
	     ! m_socketHadError &&
	     st->m_socket)
	{
		if( ! g_httpServer.m_tcp.sendChunk ( st->m_socket,
						     &st->m_sb,
						     this,
						     doneSendingWrapper9 ) )
			// if it blocked, inc this count. we'll only call
			// m_callback when m_sendsIn equals m_sendsOut...
			// and m_numReplies == m_numRequests
			m_sendsOut++;

		// writing on closed socket?
		if ( g_errno ) {
			if ( ! m_socketHadError ) m_socketHadError = g_errno;
			log("msg40: got tcp error : %s",mstrerror(g_errno));
			// disown it here so we do not damage in case it gets
			// reopened by someone else
			st->m_socket = NULL;
		}
	}

	// do we need to launch another batch of summary requests?
	if ( m_numRequests < m_msg3a.m_numDocIds && ! m_socketHadError ) {
		// . pass "true" (recalled) so launchMsg20s() does not call us
		//   back and cause a recursive stack explosion
		// . this returns false if still waiting on more to come back
		if ( ! launchMsg20s ( true ) ) return false;

		// it returned true, so m_numRequests == m_numReplies and
		// we don't need to launch any more. the launch can fail
		// because of oom and end up returning true here even with
		// full requests/replies for m_msg3a.m_numDocIds
		log("msg40: got all replies i guess");
		goto doAgain;
	}

	// . ok, now i wait for all msg20s (getsummary) to come back in.
	// . TODO: evaluate if this hurts us
	if ( m_numReplies < m_numRequests )
		return false;

	// all replies in: filter, dedup and classify them
	return gotSummaries();
}
|
|
|
|
|
|
//We got the replies we initially requested. Now set cluster levels, and filter on clustering, family filter,
|
|
//error/show-errors, etc. After that, the summaries are ready for realtime-classification.
|
|
bool Msg40::gotSummaries() {
|
|
int64_t startTime = gettimeofdayInMilliseconds();
|
|
|
|
// loop over each clusterLevel and set it
|
|
for ( int32_t i = 0 ; i < m_numReplies ; i++ ) {
|
|
// did we skip the first X summaries because we were
|
|
// not deduping/siteclustering/gettingGigabits?
|
|
if ( m_didSummarySkip && i < m_si->m_firstResultNum )
|
|
continue;
|
|
// get current cluster level
|
|
char *level = &m_msg3a.m_clusterLevels[i];
|
|
// sanity check -- this is a transistional value msg3a should
|
|
// set it to something else!
|
|
if ( *level == CR_GOT_REC ) { g_process.shutdownAbort(true); }
|
|
if ( *level == CR_ERROR_CLUSTERDB ) { g_process.shutdownAbort(true); }
|
|
// skip if already "bad"
|
|
if ( *level != CR_OK ) continue;
|
|
// if the user only requested docids, we have no summaries
|
|
if ( m_si->m_docIdsOnly ) break;
|
|
// convenient var
|
|
Msg20 *m = m_msg20[i];
|
|
// get the Msg20 reply
|
|
const Msg20Reply *mr = m->m_r;
|
|
// if no reply, all hosts must have been dead i guess so
|
|
// filter out this guy
|
|
if ( ! mr && ! m->m_errno ) {
|
|
logf(LOG_DEBUG,"query: msg 20 reply was null.");
|
|
m->m_errno = ENOHOSTS;
|
|
}
|
|
|
|
if ( m_si->m_familyFilter && mr && mr->m_isAdult) {
|
|
logf(LOG_DEBUG,"query: msg20.is_adult and family filter is on.");
|
|
m->m_errno = EDOCADULT;
|
|
}
|
|
// if any msg20 has m_errno set, then set ours so at least the
|
|
// xml feed will know there was a problem even though it may
|
|
// have gotten search results.
|
|
if ( m->m_errno ) {
|
|
if ( m_si->m_debug || g_conf.m_logDebugQuery ) {
|
|
logf( LOG_DEBUG, "query: result %" PRId32 " (docid=%" PRId64 ") had an error (%s) and will not be shown.",
|
|
i, m_msg3a.m_docIds[i], mstrerror( m->m_errno ) );
|
|
}
|
|
|
|
// update our m_errno while here
|
|
if ( ! m_errno ) {
|
|
m_errno = m->m_errno;
|
|
}
|
|
|
|
if ( ! m_si->m_showErrors ) {
|
|
*level = CR_ERROR_SUMMARY;
|
|
continue;
|
|
}
|
|
}
|
|
|
|
// a special case
|
|
if ( mr && ( mr->m_errno == CR_RULESET_FILTERED || mr->m_errno == EDOCFILTERED ) ) {
|
|
*level = CR_RULESET_FILTERED;
|
|
continue;
|
|
}
|
|
|
|
if ( ! m_si->m_showBanned && mr && mr->m_isBanned ) {
|
|
if ( m_si->m_debug || g_conf.m_logDebugQuery )
|
|
logf( LOG_DEBUG, "query: result %" PRId32 " (docid=%" PRId64 ") is banned and will not be shown.",
|
|
i, m_msg3a.m_docIds[i] );
|
|
*level = CR_BANNED_URL;
|
|
continue;
|
|
}
|
|
|
|
// corruption?
|
|
if ( mr && !mr->ptr_ubuf ) {
|
|
log( "msg40: got corrupt msg20 reply for docid %" PRId64, mr->m_docId );
|
|
*level = CR_BAD_URL;
|
|
continue;
|
|
}
|
|
|
|
// don't filter out disallowed root doc
|
|
if (mr && mr->m_indexCode == EDOCDISALLOWEDROOT) {
|
|
continue;
|
|
}
|
|
|
|
// temporarily disabled: if titledb has old records with content and redirect then this ends up filtering out most results and the whole query will be very slow
|
|
// // filter simplified redirection/non-caconical document
|
|
// if (mr && mr->size_rubuf > 1 && (mr->m_contentLen <= 0 || mr->m_httpStatus != 200 ||
|
|
// mr->m_indexCode == EDOCNONCANONICAL || mr->m_indexCode == EDOCSIMPLIFIEDREDIR)) {
|
|
// if (!m_si->m_showErrors) {
|
|
// *level = CR_EMPTY_REDIRECTION_PAGE;
|
|
// continue;
|
|
// }
|
|
// }
|
|
|
|
// filter empty title & summaries
|
|
if ( mr && mr->size_tbuf <= 1 && mr->size_displaySum <= 1 ) {
|
|
if ( ! m_si->m_showErrors ) {
|
|
*level = CR_EMPTY_TITLE_SUMMARY;
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
|
|
// what is the deduping threshhold? 0 means do not do deuping
|
|
int32_t dedupPercent = 0;
|
|
if ( m_si->m_doDupContentRemoval && m_si->m_percentSimilarSummary )
|
|
dedupPercent = m_si->m_percentSimilarSummary;
|
|
// icc=1 turns this off too i think
|
|
if ( m_si->m_includeCachedCopy ) dedupPercent = 0;
|
|
// if the user only requested docids, we have no summaries
|
|
if ( m_si->m_docIdsOnly ) dedupPercent = 0;
|
|
|
|
// filter out duplicate/similar summaries
|
|
for ( int32_t i = 0 ; dedupPercent && i < m_numReplies ; i++ ) {
|
|
// skip if already invisible
|
|
if ( m_msg3a.m_clusterLevels[i] != CR_OK ) continue;
|
|
// Skip if invalid
|
|
if ( m_msg20[i]->m_errno ) continue;
|
|
|
|
// get it
|
|
const Msg20Reply *mri = m_msg20[i]->m_r;
|
|
|
|
// see if any result lower-scoring than #i is a dup of #i
|
|
for( int32_t m = i+1 ; m < m_numReplies ; m++ ) {
|
|
// get current cluster level
|
|
char *level = &m_msg3a.m_clusterLevels[m];
|
|
// skip if already invisible
|
|
if ( *level != CR_OK ) continue;
|
|
// get it
|
|
if ( m_msg20[m]->m_errno ) continue;
|
|
|
|
const Msg20Reply *mrm = m_msg20[m]->m_r;
|
|
|
|
// use gigabit vector to do topic clustering, etc.
|
|
const int32_t *vi = (int32_t *)mri->ptr_vbuf;
|
|
const int32_t *vm = (int32_t *)mrm->ptr_vbuf;
|
|
float s ;
|
|
s = computeSimilarity(vi,vm,NULL,NULL,NULL);
|
|
// skip if not similar
|
|
if ( (int32_t)s < dedupPercent ) continue;
|
|
// otherwise mark it as a summary dup
|
|
if ( m_si->m_debug || g_conf.m_logDebugQuery )
|
|
logf( LOG_DEBUG, "query: result #%" PRId32" (docid=%" PRId64") is %.02f%% similar-summary of #%" PRId32" (docid=%" PRId64")",
|
|
m, m_msg3a.m_docIds[m] ,
|
|
s, i, m_msg3a.m_docIds[i] );
|
|
*level = CR_DUP_SUMMARY;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
//
|
|
// BEGIN URL NORMALIZE AND COMPARE
|
|
//
|
|
|
|
// . ONLY DEDUP URL if it explicitly enabled AND we are not performing
|
|
// a site: or suburl: query.
|
|
if(m_si->m_dedupURL &&
|
|
!m_si->m_q.m_hasPositiveSiteField &&
|
|
!m_si->m_q.m_hasSubUrlField) {
|
|
for(int32_t i = 0 ; i < m_msg3a.m_numDocIds ; i++) {
|
|
// skip if already invisible
|
|
if(m_msg3a.m_clusterLevels[i] != CR_OK) continue;
|
|
|
|
// get it
|
|
const Msg20Reply *mr = m_msg20[i]->m_r;
|
|
|
|
// hash the URL all in lower case to catch wiki dups
|
|
const char *url = mr-> ptr_ubuf;
|
|
int32_t ulen = mr->size_ubuf - 1;
|
|
|
|
// since the redirect url is a more accurate
|
|
// representation of the conent do that if it exists.
|
|
if ( mr->ptr_rubuf ) {
|
|
url = mr-> ptr_rubuf;
|
|
ulen = mr->size_rubuf - 1;
|
|
}
|
|
|
|
// fix for directories, sometimes they are indexed
|
|
// without a trailing slash, so let's normalize to
|
|
// this standard.
|
|
if(url[ulen-1] == '/')
|
|
ulen--;
|
|
|
|
Url u;
|
|
u.set( url, ulen );
|
|
url = u.getHost();
|
|
|
|
if(u.getPathLen() > 1) {
|
|
// . remove sub-domain to fix conflicts with
|
|
// sites having www,us,en,fr,de,uk,etc AND
|
|
// it redirects to the same page.
|
|
const char *host = u.getHost();
|
|
const char *mdom = u.getMidDomain();
|
|
if(mdom && host) {
|
|
int32_t hlen = mdom - host;
|
|
if (isVariantLikeSubDomain(host, hlen-1))
|
|
url = mdom;
|
|
}
|
|
}
|
|
|
|
// adjust url string length
|
|
ulen -= url - u.getUrl();
|
|
|
|
uint64_t h = hash64Lower_a(url, ulen);
|
|
int32_t slot = m_urlTable.getSlot(h);
|
|
// if there is no slot,this url doesn't exist => add it
|
|
if(slot == -1) {
|
|
m_urlTable.addKey(h,mr->m_docId);
|
|
} else {
|
|
// If there was a slot, denote with the
|
|
// cluster level URL already exited previously
|
|
char *level = &m_msg3a.m_clusterLevels[i];
|
|
if(m_si->m_debug || g_conf.m_logDebugQuery)
|
|
logf(LOG_DEBUG, "query: result #%" PRId32" (docid=%" PRId64") is the same URL as (docid=%" PRId64")",
|
|
i,m_msg3a.m_docIds[i],
|
|
m_urlTable.getValueFromSlot(slot));
|
|
*level = CR_DUP_URL;
|
|
}
|
|
}
|
|
}
|
|
|
|
//
|
|
// END URL NORMALIZE AND COMPARE
|
|
//
|
|
|
|
// show time
|
|
int64_t took = gettimeofdayInMilliseconds() - startTime;
|
|
if ( took > 3 )
|
|
log(LOG_INFO,"query: Took %" PRId64" ms to do clustering and dup removal.",took);
|
|
|
|
if(!submitUrlRealtimeClassification())
|
|
return false;
|
|
else
|
|
return gotEnoughSummaries();
|
|
}
|
|
|
|
|
|
struct UrlClassificationContext {
|
|
Msg40 *msg40;
|
|
int i;
|
|
UrlClassificationContext(Msg40 *msg40_, int i_) : msg40(msg40_), i(i_) {}
|
|
};
|
|
|
|
//start classifying the URLs of the results
|
|
bool Msg40::submitUrlRealtimeClassification() {
|
|
if(!g_urlRealtimeClassification.realtimeUrlClassificationWorks()) {
|
|
log(LOG_DEBUG,"Bypassing URL realtime classification because it is diabled or not working");
|
|
return true; //done
|
|
}
|
|
|
|
{
|
|
ScopedLock sl(m_mtxRealtimeClassificationsCounters);
|
|
m_realtimeClassificationsSubmitted = true;
|
|
}
|
|
|
|
int num_started = 0;
|
|
int num_wanted_to_start = 0;
|
|
for(int i=0; i<m_numReplies; i++) {
|
|
if(m_msg3a.m_clusterLevels[i]==CR_OK) {
|
|
num_wanted_to_start++;
|
|
Msg20 *m = m_msg20[i];
|
|
std::string url(m->m_r->ptr_ubuf,m->m_r->size_ubuf);
|
|
UrlClassificationContext *ucc = new UrlClassificationContext(this,i);
|
|
incrementRealtimeClassificationsStarted();
|
|
if(g_urlRealtimeClassification.classifyUrl(url.c_str(),&urlClassificationCallback0,ucc)) {
|
|
logTrace(g_conf.m_logTraceUrlClassification, "URL classification of '%s' started", url.c_str());
|
|
num_started++;
|
|
} else {
|
|
logTrace(g_conf.m_logTraceUrlClassification, "URL classification of '%s' NOT started", url.c_str());
|
|
incrementRealtimeClassificationsCompleted();
|
|
delete ucc;
|
|
}
|
|
}
|
|
}
|
|
|
|
log(LOG_DEBUG,"msg40: Started URL classification on %d out of %d URLs (wanted %d)", num_started, m_numReplies, num_wanted_to_start);
|
|
|
|
bool done;
|
|
{
|
|
ScopedLock sl(m_mtxRealtimeClassificationsCounters);
|
|
m_realtimeClassificationsSubmitted = false;
|
|
done = (m_numRealtimeClassificationsCompleted == m_numRealtimeClassificationsStarted);
|
|
}
|
|
|
|
return done;
|
|
}
|
|
|
|
|
|
void Msg40::urlClassificationCallback0(void *context, uint32_t classification) {
|
|
UrlClassificationContext *ucc = reinterpret_cast<UrlClassificationContext*>(context);
|
|
ucc->msg40->urlClassificationCallback1(ucc->i,classification);
|
|
delete ucc;
|
|
}
|
|
|
|
void Msg40::urlClassificationCallback1(int i, uint32_t classification) {
|
|
if(classification&URL_CLASSIFICATION_MALICIOUS) {
|
|
m_msg3a.m_clusterLevels[i] = CR_MALICIOUS;
|
|
log(LOG_DEBUG,"URL '%*.*s' classified as malicous. Filtering it out",
|
|
(int)m_msg20[i]->m_r->size_ubuf, (int)m_msg20[i]->m_r->size_ubuf, m_msg20[i]->m_r->ptr_ubuf);
|
|
}
|
|
if(incrementRealtimeClassificationsCompleted()) {
|
|
log(LOG_TRACE,"msg40: all URL classifications completed");
|
|
if(gotEnoughSummaries()) {
|
|
log(LOG_INFO, "query: Msg40 end: query_id='%s' query='%s', results=%d", m_si->m_queryId, m_si->m_query, getNumResults());
|
|
m_callback(m_state);
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
// All summaries (and realtime classifications) are in. Count the visible
// results; if too few are visible, re-issue the docid fetch for a bigger
// window (unless the deadline passed). Otherwise compact the visible docids
// into the front of the msg3a arrays ready for display.
// Returns false while more work is pending, true when results are final.
bool Msg40::gotEnoughSummaries() {
	m_omitCount = 0;

	// count how many are visible!
	int32_t visible = 0;
	// loop over each clusterLevel and set it
	for ( int32_t i = 0 ; i < m_numReplies ; i++ ) {
		// get current cluster level
		const char *level = &m_msg3a.m_clusterLevels[i];
		// on CR_OK
		if ( *level == CR_OK ) visible++;
		// otherwise count as ommitted
		else m_omitCount++;
	}

	// . let's wait for the tasks to complete before even trying to launch
	//   more than the first MAX_OUTSTANDING msg20s
	// . the msg3a re-call will end up re-doing our tasks as well! so we
	//   have to make sure they complete at this point
	if ( m_tasksRemaining > 0 ) return false;

	// debug dump of every docid's final cluster level
	bool debug = (m_si->m_debug || g_conf.m_logDebugQuery);
	for ( int32_t i = 0 ; debug && i < m_msg3a.m_numDocIds ; i++ ) {
		int32_t cn = (int32_t)m_msg3a.m_clusterLevels[i];
		if ( cn < 0 || cn >= CR_END ) { g_process.shutdownAbort(true); }
		const char *s = g_crStrings[cn];
		if ( ! s ) { g_process.shutdownAbort(true); }
		logf(LOG_DEBUG, "query: msg40 final hit #%" PRId32") d=%" PRIu64" cl=%" PRId32" (%s)",
		     i,m_msg3a.m_docIds[i],(int32_t)m_msg3a.m_clusterLevels[i],s);
	}

	if ( debug )
		logf (LOG_DEBUG,"query: msg40: firstResult=%" PRId32", "
		      "totalDocIds=%" PRId32", resultsWanted=%" PRId32" "
		      "visible=%" PRId32" toGet=%" PRId32" recallCnt=%" PRId32,
		      m_si->m_firstResultNum, m_msg3a.m_numDocIds ,
		      m_docsToGetVisible, visible,
		      m_docsToGet , m_msg3aRecallCnt);

	// if we do not have enough visible, try to get more
	if ( visible < m_docsToGetVisible && m_msg3a.m_moreDocIdsAvail &&
	     // do not spin too long in this!
	     // TODO: fix this better somehow later
	     m_docsToGet <= 1000 &&
	     // doesn't work on multi-coll just yet, it cores
	     m_numCollsToSearch == 1 ) {
		// only redo if the deadline (if any) has not passed yet
		if(m_deadline>0 && m_deadline>gettimeofdayInMilliseconds()) {
			// grow the window: +20 then +25 percent
			int32_t need = m_docsToGet + 20;
			need *= 1.25;
			// note it
			log("msg40: too many summaries invisible. getting more docids from msg3a merge and getting summaries. "
			    "%" PRId32" are visible, need %" PRId32". %" PRId32" to %" PRId32". numReplies=%" PRId32" numRequests=%" PRId32,
			    visible, m_docsToGetVisible,
			    m_msg3a.m_docsToGet, need,
			    m_numReplies, m_numRequests);

			// get more!
			m_docsToGet = need;
			// reset these before launch so the counters restart
			m_numReplies = 0;
			m_numRequests = 0;
			// reprocess all!
			m_lastProcessedi = -1;
			// let's do it all from the top!
			log(LOG_INFO, "query: Msg40 redo: query_id='%s' query='%s', visible=%d", m_si->m_queryId, m_si->m_query, visible);
			return getDocIds ( true ) ;
		} else {
			log("msg40: many summaries invisible but deadline has been passed. %d are visible, wanted %d",
			    visible, m_docsToGetVisible);
		}
	}

	// get time now
	int64_t now = gettimeofdayInMilliseconds();
	// . add the stat for how long to get all the summaries
	// . use purple for time to get all summaries
	// . THIS INCLUDES Msg3a/Msg39 RECALLS!!!
	// . can we subtract that?
	//"get_all_summaries"
	g_stats.addStat_r ( 0, m_startTime, now, 0x008220ff );

	// timestamp log
	if ( g_conf.m_logTimingQuery || m_si->m_debug )
		logf(LOG_DEBUG,"query: msg40: [%p] Got %" PRId32" summaries in %" PRId64" ms",
		     this ,
		     visible, // m_visibleContiguous,
		     now - m_startTime );

	// set m_moreToCome, if true, we print a "Next 10" link
	m_moreToCome = (visible > m_si->m_docsWanted+m_si->m_firstResultNum);
	if ( m_si->m_debug || g_conf.m_logDebugQuery ) {
		logf( LOG_DEBUG, "query: msg40: more? %d", m_moreToCome );
	}

	// alloc m_buf, which should be NULL
	if ( m_buf ) { g_process.shutdownAbort(true); }

	//
	// HACK TIME
	//
	// . bury filtered/clustered docids from m_msg3a.m_docIds[]
	// . also remove results not in the request window specified by &s=X&n=Y
	//   where "s" is m_si->m_firstResultNum (which starts at 0) and "n"
	//   is the number of results requested, m_si->m_docsWanted
	// . this is a bit of a hack (MDW): we compact in place, so after this
	//   the msg3a arrays hold only the docids to display
	int32_t c = 0;
	int32_t v = 0;
	for ( int32_t i = 0 ; i < m_msg3a.m_numDocIds ; i++ ) {
		// must have a cluster level of CR_OK (visible); v counts the
		// visible ones so the first m_firstResultNum are skipped.
		// note: v++ only evaluates when the level is CR_OK (short
		// circuit), which is intentional
		if ( ( m_msg3a.m_clusterLevels[i] != CR_OK ) || ( v++ < m_si->m_firstResultNum ) ) {
			// skip
			continue;
		}

		// we got a winner, save it
		m_msg3a.m_docIds [c] = m_msg3a.m_docIds [i];
		m_msg3a.m_scores [c] = m_msg3a.m_scores [i];
		m_msg3a.m_flags [c] = m_msg3a.m_flags [i];
		m_msg3a.m_clusterLevels [c] = m_msg3a.m_clusterLevels [i];
		m_msg20 [c] = m_msg20 [i];

		if ( m_msg3a.m_scoreInfos ) {
			m_msg3a.m_scoreInfos [c] = m_msg3a.m_scoreInfos [i];
		}

		int32_t need = m_si->m_docsWanted;

		// if done, bail
		if ( ++c >= need ) {
			break;
		}
	}
	// reset the # of docids we got to how many we kept!
	m_msg3a.m_numDocIds = c;

	// debug
	for ( int32_t i = 0 ; debug && i < m_msg3a.m_numDocIds ; i++ )
		logf(LOG_DEBUG, "query: msg40 clipped hit #%" PRId32") d=%" PRIu64" cl=%" PRId32" (%s)",
		     i,m_msg3a.m_docIds[i],(int32_t)m_msg3a.m_clusterLevels[i],
		     g_crStrings[(int32_t)m_msg3a.m_clusterLevels[i]]);

	//
	// END HACK
	//

	// (the old "store msg40+results in cache" logic used to live here;
	// the cache was removed long ago, only the decision comments remained)

	// ignore errors
	g_errno = 0;
	return true;
}
|
|
|
|
//For the purpose of clustering and result suppression these hosts are considered the same:
|
|
// example.com
|
|
// www.example.com
|
|
// fr.example.com
|
|
// pl.example.com
|
|
//etc.
|
|
static const char * const s_variantLikeSubDomains[] = {
	// Language sub-domains. Basically ISO 639-1 language codes, unknown
	// revision (two-letter codes). Not perfect, e.g. we don't detect
	// nynorsk vs. bokmål because those have ISO 639-2 three-letter codes.
	"aa","ab","af","am","ar","as","ay","az","ba","be",
	"bg","bh","bi","bn","bo","br","ca","co","cs","cy",
	"da","de","dz","el","en","eo","es","et","eu","fa",
	"fi","fj","fo","fr","fy","ga","gd","gl","gn","gu",
	"ha","he","hi","hr","hu","hy","ia","id","ie","ik",
	"is","it","iu","ja","jw","ka","kk","kl","km","kn",
	"ko","ks","ku","ky","la","ln","lo","lt","lv","mg",
	"mi","mk","ml","mn","mo","mr","ms","mt","my","na",
	"ne","nl","no","oc","om","or","pa","pl","ps","pt",
	"qu","rm","rn","ro","ru","rw","sa","sd","sg","sh",
	"si","sk","sl","sm","sn","so","sq","sr","ss","st",
	"su","sv","sw","ta","te","tg","th","ti","tk","tl",
	"tn","to","tr","ts","tt","tw","ug","uk","ur","uz",
	"vi","vo","wo","xh","yi","yo","za","zh","zu",
	// Common country sub-domains.
	// ("uk" was listed here too, but it already appears above as a
	// language code; the duplicate made initVariantLikeSubDomainTable()
	// log "Sub-domain table has duplicates" on every startup.)
	"us" ,
	// Common web sub-domains
	"www"
};
|
|
static HashTable s_variantLikeSubDomainTable;
|
|
static bool s_variantLikeSubDomainInitialized = false;
|
|
static GbMutex s_variantLikeSubDomainMutex;
|
|
|
|
// Populate 'table' with the lowercase hashes of the 'words' array.
// 'size' is the BYTE size of the array (callers pass sizeof(...)), so the
// element count is derived first. Returns false if the table could not be
// allocated; logs (but tolerates) duplicate entries.
static bool initVariantLikeSubDomainTable(HashTable *table, const char * const words[], int32_t size ){
	// derive the element count from the byte size BEFORE sizing the table
	// (the original sized it by bytes, over-allocating ~8x)
	int32_t n = (int32_t)(size / sizeof(char *));
	// set up the hash table at roughly 50% load factor
	if ( ! table->set ( n * 2 ) ) {
		log(LOG_INIT, "build: Could not init sub-domain table.");
		return false;
	}
	// now add in all the sub-domain words
	for ( int32_t i = 0 ; i < n ; i++ ) {
		const char *sw = words[i];
		int32_t swlen = strlen ( sw );
		int32_t h = hash32Lower_a(sw, swlen);
		int32_t slot = table->getSlot(h);
		// if there is no slot, this word doesn't exist yet => add it
		if(slot == -1)
			table->addKey(h,0);
		else
			log(LOG_INIT,"build: Sub-domain table has duplicates");
	}
	return true;
}
|
|
|
|
static bool isVariantLikeSubDomain(const char *s , int32_t len) {
|
|
ScopedLock sl(s_variantLikeSubDomainMutex);
|
|
if ( ! s_variantLikeSubDomainInitialized ) {
|
|
s_variantLikeSubDomainInitialized = initVariantLikeSubDomainTable(&s_variantLikeSubDomainTable, s_variantLikeSubDomains, sizeof(s_variantLikeSubDomains));
|
|
if (!s_variantLikeSubDomainInitialized)
|
|
return false;
|
|
}
|
|
sl.unlock();
|
|
|
|
// get from table
|
|
int32_t h = hash32Lower_a(s, len);
|
|
if(s_variantLikeSubDomainTable.getSlot(h) == -1)
|
|
return false;
|
|
return true;
|
|
}
|
|
|
|
|
|
void Msg40::incrementRealtimeClassificationsStarted() {
|
|
ScopedLock sl(m_mtxRealtimeClassificationsCounters);
|
|
m_numRealtimeClassificationsStarted++;
|
|
if(m_numRealtimeClassificationsCompleted>=m_numRealtimeClassificationsStarted) gbshutdownLogicError();
|
|
}
|
|
|
|
bool Msg40::incrementRealtimeClassificationsCompleted() {
|
|
ScopedLock sl(m_mtxRealtimeClassificationsCounters);
|
|
m_numRealtimeClassificationsCompleted++;
|
|
if(m_numRealtimeClassificationsCompleted>m_numRealtimeClassificationsStarted) gbshutdownLogicError();
|
|
return m_numRealtimeClassificationsCompleted==m_numRealtimeClassificationsStarted && !m_realtimeClassificationsSubmitted;
|
|
}
|
|
|
|
bool Msg40::areAllRealtimeClassificationsCompleted() const {
|
|
ScopedLock sl(const_cast<GbMutex&>(m_mtxRealtimeClassificationsCounters));
|
|
return (!m_realtimeClassificationsSubmitted) && (m_numRealtimeClassificationsCompleted==m_numRealtimeClassificationsStarted);
|
|
}
|