#include "Msg39.h"
#include "UdpSlot.h"
#include "Serialize.h"
#include "Stats.h"
#include "JobScheduler.h"
#include "UdpServer.h"
#include "RdbList.h"
#include "Collectiondb.h"
#include "DocumentIndexChecker.h"
#include "Sanity.h"
#include "Posdb.h"
#include "Conf.h"
#include "ScoringWeights.h"
#include "Mem.h"
#include "Errno.h"
#include "GbSignature.h"
#include <new>
#include "ScopedLock.h"
#include <pthread.h>
#include <assert.h>
#ifdef _VALGRIND_
#include <valgrind/memcheck.h>
#endif
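// GbSignature.h's declare_signature/set_signature/verify_signature macros
// (used by JobState below) stamp a magic value into an object and re-check it
// later: a cheap canary against use-after-free and stray writes. Presumably
// the macros reference this file-local signature_init as the magic value.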
static const int signature_init = 0x2c9a3f0e;
// called to send back the reply
static void sendReply ( UdpSlot *slot ,
Msg39 *msg39 ,
char *reply ,
int32_t replySize ,
int32_t replyMaxSize ,
bool hadError );
namespace {
//a structure for keeping state while some sub-task is handled by a different thread/job
class JobState {
public:
declare_signature
Msg39 *msg39;
bool result_ready;
pthread_mutex_t mtx;
pthread_cond_t cond;
JobState(Msg39 *msg39_)
: msg39(msg39_),
result_ready(false)
{
pthread_mutex_init(&mtx,NULL);
pthread_cond_init(&cond,NULL);
set_signature();
}
~JobState() {
verify_signature();
pthread_mutex_destroy(&mtx);
pthread_cond_destroy(&cond);
clear_signature();
}
void wait_for_finish() {
verify_signature();
ScopedLock sl(mtx);
while(!result_ready)
pthread_cond_wait(&cond,&mtx);
verify_signature();
}
};
//a simple function that just signals that the job has finished
static void JobFinishedCallback(void *state) {
JobState *js = static_cast<JobState*>(state);
verify_signature_at(js->signature);
ScopedLock sl(js->mtx);
assert(!js->result_ready); //can only be called once
js->result_ready = true;
int rc = pthread_cond_signal(&js->cond);
assert(rc==0);
}
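// Typical usage of the pair above (a sketch; getLists() below does exactly
// this): construct a JobState on the stack, hand &JobFinishedCallback to an
// async call, and block until the callback fires:
//
//   JobState jobState(this);
//   if(!m_msg2.getLists(..., &jobState, &JobFinishedCallback, ...))
//       jobState.wait_for_finish(); // blocked; wait for the callback
//
// The mutex/condvar pair makes this safe even if the callback runs before
// wait_for_finish() is entered, because result_ready is checked under the lock.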
} //anonymous namespace
void Msg39Request::reset() {
memset(this, 0, sizeof(*this));
m_docsToGet = 10;
m_niceness = MAX_NICENESS;
m_maxAge = 0;
m_maxQueryTerms = 9999;
m_language = 0;
m_word_variations_config = WordVariationsConfig();
m_debug = false;
m_getDocIdScoringInfo = true;
m_doSiteClustering = true;
m_hideAllClustered = false;
m_doDupContentRemoval = true;
m_addToCache = false;
m_familyFilter = false;
m_timeout = -1; // -1 means auto-compute
m_collnum = -1;
m_useQueryStopWords = true;
m_doMaxScoreAlgo = true;
m_modifyQuery = false; //interim solution until we get msg39 to carry the whole query information
m_baseScoringParameters.clear();
ptr_query = NULL; // in utf8?
ptr_whiteList = NULL;
size_query = 0;
size_whiteList = 0;
// -1 means no docid range restriction
m_minDocId = -1LL;
m_maxDocId = -1LL;
m_numDocIdSplits = 1;
// for widget, to only get results to append to last docid
m_maxSerpScore = 0.0;
m_minSerpDocId = 0LL;
// . search results knobs
// . accumulate the top MAX_TOP term pairs from inlink text; this can
// be lowered from that default here.
m_realMaxTop = MAX_TOP;
}
bool Msg39::registerHandler ( ) {
// . register ourselves with the udp server
// . it calls our callback when it receives a msg of type 0x39
if ( ! g_udpServer.registerHandler ( msg_type_39, &handleRequest39 ))
return false;
return true;
}
Msg39::Msg39 ()
: m_lists(NULL),
m_clusterBuf(NULL)
{
m_inUse = false;
reset();
}
Msg39::~Msg39 () {
reset();
}
void Msg39::reset() {
if ( m_inUse ) gbshutdownLogicError();
//m_numDocIdSplits = 1;
m_query.reset();
m_numTotalHits = 0;
m_gotClusterRecs = 0;
reset2();
if(m_clusterBuf) {
mfree ( m_clusterBuf, m_clusterBufSize, "Msg39cluster");
m_clusterBuf = NULL;
}
// Coverity
m_slot = NULL;
m_msg39req = NULL;
m_startTime = 0;
m_startTimeQuery = 0;
m_errno = 0;
m_clusterBufSize = 0;
m_clusterDocIds = NULL;
m_clusterLevels = NULL;
m_clusterRecs = NULL;
m_numClusterDocIds = 0;
m_numVisible = 0;
m_debug = false;
}
void Msg39::reset2() {
delete[] m_lists;
m_lists = NULL;
m_msg2.reset();
m_posdbTable.reset();
}
// . handle a request to get the search results, a list of docids only
// . sometimes sets g_errno on error
void Msg39::handleRequest39(UdpSlot *slot, int32_t netnice) {
// use Msg39 to get the lists and intersect them
try {
Msg39 *that = new Msg39;
//register msg39 memory
mnew ( that, sizeof(Msg39) , "Msg39" );
// clear it
g_errno = 0;
// . get the resulting docIds, usually blocks
// . sets g_errno on error
that->getDocIds ( slot ) ;
} catch(std::bad_alloc&) {
g_errno = ENOMEM;
log("msg39: new(%" PRId32"): %s",
(int32_t)sizeof(Msg39),mstrerror(g_errno));
sendReply ( slot , NULL , NULL , 0 , 0 ,true);
}
}
// this must always be called sometime AFTER handleRequest() is called
static void sendReply ( UdpSlot *slot , Msg39 *msg39 , char *reply , int32_t replyLen ,
int32_t replyMaxSize , bool hadError ) {
// debug msg
if ( g_conf.m_logDebugQuery || (msg39&&msg39->m_debug) )
logf(LOG_DEBUG,"query: msg39: [%p] "
"Sending reply len=%" PRId32".",
msg39,replyLen);
// sanity
if ( hadError && ! g_errno ) gbshutdownLogicError();
// no longer in use. msg39 will be NULL if ENOMEM or something
if ( msg39 ) msg39->m_inUse = false;
// i guess clear this
int32_t err = g_errno;
g_errno = 0;
// send an error reply if g_errno is set
if ( err ) {
log(LOG_ERROR,"%s:%s:%d: call sendErrorReply. error=%s", __FILE__, __func__, __LINE__, mstrerror(err));
g_udpServer.sendErrorReply( slot, err );
} else {
g_udpServer.sendReply(reply, replyLen, reply, replyMaxSize, slot);
}
// always delete ourselves when done handling the request
if ( msg39 ) {
mdelete ( msg39 , sizeof(Msg39) , "Msg39" );
delete (msg39);
//msg39->~Msg39();
//memset(msg39,-3,sizeof(*msg39));
//::operator delete((void*)msg39);
}
}
// . sets g_errno on error
// . calls gotDocIds to send a reply
void Msg39::getDocIds ( UdpSlot *slot ) {
// remember the slot
m_slot = slot;
// reset this
m_errno = 0;
// get the request
m_msg39req = reinterpret_cast<Msg39Request*>(m_slot->m_readBuf);
int32_t requestSize = m_slot->m_readBufSize;
// ensure its size is ok
if ( (unsigned)requestSize < sizeof(Msg39Request) ) {
g_errno = EBADREQUESTSIZE;
log(LOG_ERROR,"query: msg39: getDocIds: msg39request is too small (%d bytes): %s.",
requestSize, mstrerror(g_errno) );
sendReply ( m_slot , this , NULL , 0 , 0 , true );
return;
}
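// Wire format of the request: the fixed-size Msg39Request struct followed by
// its variable-length buffers appended back-to-back, in the order of the
// struct's size_* members (size_termFreqWeights first, size_whiteList last).
// deserializeMsg() re-points the ptr_* members, which were serialized as
// offsets, back into that trailing area, and returns the total size it
// accounted for so we can cross-check it against requestSize below.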
// deserialize it before we do anything else
int32_t finalSize = deserializeMsg ( sizeof(Msg39Request),
&m_msg39req->size_termFreqWeights,
&m_msg39req->size_whiteList,
&m_msg39req->ptr_termFreqWeights,
((char*)m_msg39req) + sizeof(*m_msg39req) );
// sanity check
if ( finalSize != requestSize ) {
log("msg39: sending bad request.");
g_errno = EBADREQUESTSIZE;
log(LOG_ERROR,"query: msg39: getDocIds: msg39request deserialization size mismatch (%d != %d): %s.",
finalSize, requestSize, mstrerror(g_errno) );
sendReply ( m_slot , this , NULL , 0 , 0 , true );
return;
}
log(LOG_DEBUG,"query: msg39: processing query_id='%s' query='%.*s', this=%p", m_msg39req->m_queryId, (int)m_msg39req->size_query, m_msg39req->ptr_query, this);
// OK, we have deserialized and checked the msg39request and we can now process
// it by shoveling it into the job queue. That means the main thread (or whoever
// called us) is freed up and can do other stuff.
if(!g_jobScheduler.submit(&coordinatorThreadFunc,
NULL, //finish-callback. We don't care
this,
thread_type_query_coordinator,
m_msg39req->m_niceness))
{
log(LOG_ERROR,"Could not add query-coordinator job. Doing it in foreground");
getDocIds2();
}
}
void Msg39::coordinatorThreadFunc(void *state) {
Msg39 *that = static_cast<Msg39*>(state);
log(LOG_DEBUG, "query: msg39: in coordinatorThreadFunc: this=%p", that);
that->getDocIds2();
}
// . the main function to get the docids for the query in m_msg39req
// . it always blocks i guess
void Msg39::getDocIds2() {
// flag it as in use
m_inUse = true;
//record start time of the query
m_startTimeQuery = gettimeofdayInMilliseconds();
// a handy thing
m_debug = false;
if ( m_msg39req->m_debug ) m_debug = true;
if ( g_conf.m_logDebugQuery ) m_debug = true;
if ( g_conf.m_logTimingQuery ) m_debug = true;
CollectionRec *cr = g_collectiondb.getRec ( m_msg39req->m_collnum );
if ( ! cr ) {
g_errno = ENOCOLLREC;
log(LOG_LOGIC,"query: msg39: getDocIds: %s." ,
mstrerror(g_errno) );
sendReply ( m_slot , this , NULL , 0 , 0 , true );
return ;
}
// . set our m_query instance
if ( !m_query.set(m_msg39req->ptr_query,
(lang_t)m_msg39req->m_language,
m_msg39req->m_baseScoringParameters.m_bigramWeight,
m_msg39req->m_baseScoringParameters.m_synonymWeight,
&m_msg39req->m_word_variations_config,
m_msg39req->m_useQueryStopWords,
m_msg39req->m_allowHighFrequencyTermCache,
m_msg39req->m_maxQueryTerms) ) {
log("query: msg39: setQuery: %s." ,
mstrerror(g_errno) );
sendReply ( m_slot , this , NULL , 0 , 0 , true );
return ;
}
if(m_debug)
m_query.dumpToLog();
// wtf?
if ( g_errno ) gbshutdownLogicError();
if(m_msg39req->m_modifyQuery) {
bool dont_care; //artifact because queries are parsed both at sender and on each shard.
DerivedScoringWeights dsw;
dsw.init(m_msg39req->m_baseScoringParameters);
m_query.modifyQuery(&dsw, *cr, &dont_care);
}
// set m_errno
if ( m_query.m_truncated ) m_errno = EQUERYTRUNCATED;
// ensure matches with the msg3a sending us this request
if ( m_query.getNumTerms() != m_msg39req->m_nqt ) {
g_errno = EBADENGINEER;
log(LOG_ERROR, "query: Query parsing inconsistency for q='%s'. %i != %i. "
"langid=%d. Check langids and m_queryExpansion parms "
"which are the only parms that could be different in "
"Query::set2(). You probably have different mysynoyms.txt "
"files on two different hosts! check that!!"
,m_query.originalQuery()
,(int)m_query.getNumTerms()
,(int)m_msg39req->m_nqt
,(int)m_msg39req->m_language
);
sendReply ( m_slot , this , NULL , 0 , 0 , true );
return ;
}
//set term frequency weights based on msg39req
for(int i=0; i<m_query.getNumTerms(); i++)
m_query.m_qterms[i].m_termFreqWeight = ((float *)m_msg39req->ptr_termFreqWeights)[i];
// debug
if ( m_debug )
logf(LOG_DEBUG,"query: msg39: [%p] Got request "
"for q=%s", this,m_query.originalQuery());
// reset this
m_toptree.reset();
controlLoop();
}
// . the main search loop over posdb files and docid ranges:
// 1. read all termlists for docid range
// 2. intersect termlists to get the intersecting docids
// 3. increment docid ranges and keep going
// 4. when done return the top docids
void Msg39::controlLoop ( ) {
RdbBase *base = getRdbBase(RDB_POSDB,m_msg39req->m_collnum);
if(base==NULL) {
log(LOG_ERROR,"query: Collection %d disappeared", m_msg39req->m_collnum);
g_errno = ENOCOLLREC;
//can't goto hadError label due to local variable initialization
sendReply ( m_slot, this, NULL, 0, 0, true );
return;
}
DocumentIndexChecker documentIndexChecker(base);
const int numFiles = base->getNumFiles(); //todo: this can vary if a merge finishes during the query
//todo: choose docid splits based on expected largest rdblist / most common term
//when fixing/re-implementing the docidsplit mechanism fix PosdbTable::allocateTopTree() too
int numDocIdSplits = 1;
const int totalChunks = (numFiles+1)*numDocIdSplits;
int chunksSearched = 0;
if(g_errno) //ugly logic due to C++ prohibited jump over local variable initialization
goto hadError;
for(int fileNum = 0; fileNum<numFiles+1; fileNum++) {
if(fileNum<numFiles && !base->isReadable(fileNum)) {
log(LOG_DEBUG,"posdb file #%d is not currently readable. Skipping", fileNum);
continue;
}
int64_t docidRangeStart = 0;
const int64_t docidRangeDelta = MAX_DOCID / (int64_t)numDocIdSplits;
for(int docIdSplitNumber = 0; docIdSplitNumber < numDocIdSplits; docIdSplitNumber++) {
if(docIdSplitNumber!=0) {
//Estimate if we can do this and next ranges within the deadline
int64_t now = gettimeofdayInMilliseconds();
int64_t time_spent_so_far = now - m_startTimeQuery;
int64_t time_per_range = time_spent_so_far / docIdSplitNumber;
int64_t estimated_this_range_finish_time = now + time_per_range;
int64_t deadline = m_startTimeQuery + m_msg39req->m_timeout;
log(LOG_DEBUG,"query: Msg39::controlLoop(): now=%" PRId64" time_spent_so_far=%" PRId64" time_per_range=%" PRId64" estimated_this_range_finish_time=%" PRId64" deadline=%" PRId64,
now, time_spent_so_far, time_per_range, estimated_this_range_finish_time, deadline);
if(estimated_this_range_finish_time > deadline) {
//estimated completion time crosses the deadline.
log(LOG_INFO,"Msg39::controlLoop(): range %d/%d would cross deadline. Skipping", docIdSplitNumber, numDocIdSplits);
goto skipRest;
}
}
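// Worked example with hypothetical numbers: m_timeout=5000ms and the
// first range took 3000ms. Then time_per_range=3000 and the estimated
// finish of range #1 is start+6000ms, which crosses the start+5000ms
// deadline, so the remaining ranges are skipped and the reply reports
// only a partial pctSearched.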
if(fileNum<numFiles && !base->isReadable(fileNum)) {
log(LOG_DEBUG,"posdb file #%d is not currently readable. Skipping", fileNum);
//todo: if a file suddenly becomes unreadable then it means that a merge has started or finished and we should really redo the whole query
break;
}
// Reset ourselves, partially, anyway, not m_query etc.
reset2();
// Calculate docid range and fetch lists
int64_t d0 = docidRangeStart;
docidRangeStart += docidRangeDelta;
if(docIdSplitNumber+1 == numDocIdSplits)
docidRangeStart = MAX_DOCID;
else if(docidRangeStart + 20 > MAX_DOCID)
docidRangeStart = MAX_DOCID;
int64_t d1 = docidRangeStart;
if(fileNum!=numFiles)
getLists(fileNum,d0,d1);
else
getLists(-1,d0,d1);
if ( g_errno ) {
log(LOG_ERROR,"Msg39::controlLoop: got error %d after getLists()", g_errno);
goto hadError;
}
// Intersect the lists we loaded (using a thread)
documentIndexChecker.setFileNum(fileNum);
intersectLists(documentIndexChecker);
if ( g_errno ) {
log(LOG_ERROR,"Msg39::controlLoop: got error %d after intersectLists()", g_errno);
goto hadError;
}
// Sum up stats
if ( m_posdbTable.m_t1 ) {
// . measure time to add the lists in bright green
// . use darker green if rat is false (default OR)
g_stats.addStat_r ( 0, m_posdbTable.m_t1, m_posdbTable.m_t2, 0x0000ff00 );
}
// accumulate total hits count over each docid split
m_numTotalHits += m_posdbTable.getTotalHits();
//obsolete comment: minus the shit we filtered out because of gbminint/gbmaxint/gbmin/gbmax/gbsortby/gbrevsortby/gbsortbyint/gbrevsortbyint
m_numTotalHits -= m_posdbTable.getFilteredCount();
chunksSearched++;
}
}
skipRest:
if(m_debug) {
log(LOG_DEBUG,"msg39::controlloop: dumping %d top nodes (before clustering)", m_toptree.getNumUsedNodes());
for(int ti = m_toptree.getHighNode(); ti >= 0; ti = m_toptree.getPrev(ti)) {
const TopNode *t = m_toptree.getNode(ti);
log(LOG_INFO," docid=%15ld score=%f", t->m_docId, t->m_score);
}
}
// ok, we are done, get cluster recs of the winning docids
// . this loads them using msg51 from clusterdb
// . if m_msg39req->m_doSiteClustering is false it does nothing
// . this sets m_gotClusterRecs to true if we get them
getClusterRecs();
// error setting clusterrecs?
if ( g_errno ) {
log(LOG_ERROR,"Msg39::controlLoop: got error %d after getClusterRecs()", g_errno);
goto hadError;
}
// process the cluster recs if we got them
if ( m_gotClusterRecs && ! gotClusterRecs() ) {
log(LOG_ERROR,"Msg39::controlLoop: got error after gotClusterRecs()");
goto hadError;
}
// . all done! set stats and send back reply
// . only sends back the cluster recs if m_gotClusterRecs is true
estimateHitsAndSendReply(chunksSearched/(double)totalChunks);
return;
hadError:
log(LOG_LOGIC,"query: msg39: controlLoop: got error: %s.", mstrerror(g_errno) );
sendReply ( m_slot, this, NULL, 0, 0, true );
}
// . sets g_errno on error
// . called from controlLoop() for each file/docid-range chunk
// . blocks (via JobState) until the termlists have been read
void Msg39::getLists(int fileNum, int64_t docIdStart, int64_t docIdEnd) {
log(LOG_DEBUG, "query: msg39(this=%p)::getLists()",this);
if ( m_debug ) m_startTime = gettimeofdayInMilliseconds();
// . ask Posdb for the termlists we need for these termIds
// . each rec in a termlist is keyed on termId and docId
// . restrict to this docid?
// . will really make gbdocid:| searches much faster!
int64_t dr = m_query.m_docIdRestriction;
if ( dr ) {
docIdStart = dr;
docIdEnd = dr + 1;
}
// if we have twins, then make sure the twins read different
// pieces of the same docid range to make things 2x faster
int32_t numStripes = g_hostdb.getNumStripes();
int64_t delta2 = ( docIdEnd - docIdStart ) / numStripes;
int32_t stripe = g_hostdb.getMyHost()->m_stripe;
docIdStart += delta2 * stripe; // is this right? // BR 20160313: Doubt it..
docIdEnd = docIdStart + delta2;
// add 1 to be safe so we don't lose a docid
docIdEnd++;
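// Worked example with hypothetical numbers: 2 stripes and a docid range
// of [0,1000) gives delta2=500, so stripe 0 reads docids [0,501) and
// stripe 1 reads [500,1001); the one-docid overlap is the +1 safety
// margin added above.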
// TODO: add triplet support later for this to split the
// read 3 ways. 4 ways for quads, etc.
//if ( g_hostdb.getNumStripes() >= 3 ) gbshutdownLogicError();
// do not go over MAX_DOCID because it gets masked and
// ends up being 0!!! and we get empty lists
if ( docIdEnd > MAX_DOCID ) docIdEnd = MAX_DOCID;
if ( g_conf.m_logDebugQuery )
{
log(LOG_DEBUG,"query: docId start %" PRId64, docIdStart);
log(LOG_DEBUG,"query: docId end %" PRId64, docIdEnd);
}
//
// set startkey/endkey for each term/termlist
//
for ( int32_t i = 0 ; i < m_query.getNumTerms() ; i++ ) {
// get the term id
int64_t tid = m_query.getTermId(i);
// debug
if ( m_debug )
log("query: setting sk/ek for docids %" PRId64
" to %" PRId64" for termid=%" PRId64
, docIdStart
, docIdEnd
, tid
);
// store now in qterm
Posdb::makeStartKey ( m_query.m_qterms[i].m_startKey, tid, docIdStart );
Posdb::makeEndKey ( m_query.m_qterms[i].m_endKey, tid, docIdEnd );
}
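// With these keys each termlist read below covers exactly one termid
// restricted to [docIdStart, docIdEnd], which is what lets the control
// loop work through posdb one file/docid-range chunk at a time.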
// debug msg
if ( m_debug || g_conf.m_logDebugQuery ) {
for ( int32_t i = 0 ; i < m_query.getNumTerms() ; i++ ) {
// get the term in utf8
//char bb[256];
const QueryTerm *qt = &m_query.m_qterms[i];
//utf16ToUtf8(bb, 256, qt->m_term, qt->m_termLen);
//char *tpc = qt->m_term + qt->m_termLen;
char sign = qt->m_termSign;
if ( sign == 0 ) sign = '0';
const QueryWord *qw = qt->m_qword;
int32_t wikiPhrId = qw->m_wikiPhraseId;
if ( m_query.isPhrase(i) ) wikiPhrId = 0;
char leftwikibigram = 0;
char rightwikibigram = 0;
if ( qt->m_leftPhraseTerm &&
qt->m_leftPhraseTerm->m_isWikiHalfStopBigram )
leftwikibigram = 1;
if ( qt->m_rightPhraseTerm &&
qt->m_rightPhraseTerm->m_isWikiHalfStopBigram )
rightwikibigram = 1;
const QueryTerm *synterm = qt->m_synonymOf;
bool isSynonym = synterm!=NULL;
SafeBuf sb;
sb.safePrintf(
"query: msg39: [%p] "
"query term #%" PRId32" \"%*.*s\" "
"phr=%" PRId32" termId=%" PRIu64" rawTermId=%" PRIu64" "
"tfweight=%.02f "
"sign=%c "
"required=%" PRId32" "
"fieldcode=%" PRId32" "
"wikiphrid=%" PRId32" "
"leftwikibigram=%" PRId32" "
"rightwikibigram=%" PRId32" "
"otermLen=%" PRId32" "
"isSynonym=%s"
"querylangid=%" PRId32" " ,
this ,
i ,
(int)qt->m_termLen, (int)qt->m_termLen, qt->m_term,
(int32_t)m_query.isPhrase(i) ,
m_query.getTermId(i) ,
m_query.getRawTermId(i) ,
((float *)m_msg39req->ptr_termFreqWeights)[i] ,
sign , //c
(int32_t)qt->m_isRequired,
(int32_t)qt->m_fieldCode,
wikiPhrId,
(int32_t)leftwikibigram,
(int32_t)rightwikibigram,
(int32_t)m_query.getTermLen(i) ,
(isSynonym ? "true" : "false"),
(int32_t)m_query.m_langId );
if ( synterm ) {
unsigned stnum = (unsigned)(synterm - m_query.m_qterms);
sb.safePrintf("synofterm#=%u",stnum);
//sb.safeMemcpy(st->m_term,st->m_termLen);
sb.pushChar(' ');
sb.safePrintf("synwid0=%" PRId64" ",qt->m_synWids0);
sb.safePrintf("synwid1=%" PRId64" ",qt->m_synWids1);
sb.safePrintf("synalnumwords=%d ", qt->m_numAlnumWordsInSynonym);
sb.safePrintf("synterm=\"%*.*s\" ", (int)synterm->m_termLen, (int)synterm->m_termLen, synterm->m_term);
}
logf(LOG_DEBUG,"%s",sb.getBufStart());
}
}
// timestamp log
if ( m_debug )
log(LOG_DEBUG,"query: msg39: [%p] "
"Getting %" PRId32" index lists ",
this,m_query.getNumTerms());
// . now get the index lists themselves
// . if Msg2 blocks, we wait on jobState below until its callback fires
// . not doing a merge (last parm) means that the lists we receive
// will be an appending of a bunch of lists so keys won't be in order
// . merging is unnecessary for us here because we hash the keys anyway
// . and merging takes up valuable cpu time
// . caution: the index lists returned from Msg2 are now compressed
// . now i'm merging because 1) it's 10 times faster than hashing anyway,
// 2) the reply buf should now always be <= minRecSizes so we can
// pre-allocate one better, and 3) this should fix the yahoo.com
// reindex bug
int32_t nqt = m_query.getNumTerms();
try {
m_lists = new RdbList[nqt];
} catch(std::bad_alloc&) {
log(LOG_ERROR,"new[%d] RdbList failed", nqt);
g_errno = ENOMEM;
return;
}
JobState jobState(this);
// call msg2
if ( ! m_msg2.getLists ( m_msg39req->m_collnum,
m_msg39req->m_addToCache,
m_query.m_qterms,
m_query.getNumTerms(),
m_msg39req->ptr_whiteList,
// we need to restrict docid range for
// whitelist as well! this is from
// doDocIdSplitLoop()
fileNum,
docIdStart,
docIdEnd,
//m_query.getNumTerms(),
// 1-1 with query terms
m_lists ,
&jobState, //state
&JobFinishedCallback, //callback
m_msg39req->m_allowHighFrequencyTermCache,
m_msg39req->m_niceness,
m_debug )) {
log(LOG_DEBUG,"m_msg2.getLists returned false - waiting for job to finish");
jobState.wait_for_finish();
} else
log(LOG_DEBUG,"m_msg2.getLists returned true. Must be done");
}
// . now come here when we got the necessary index lists
// . runs the intersection in a job thread when possible and waits for it
// . sets g_errno on error
void Msg39::intersectLists(const DocumentIndexChecker &documentIndexChecker) {
log(LOG_DEBUG, "query: msg39(this=%p)::intersectLists()",this);
// timestamp log
if ( m_debug ) {
log(LOG_DEBUG,"query: msg39: [%p] "
"Got %" PRId32" lists in %" PRId64" ms"
, this,m_query.getNumTerms(),
gettimeofdayInMilliseconds() - m_startTime);
m_startTime = gettimeofdayInMilliseconds();
}
// ensure collection not deleted from under us
if ( ! g_collectiondb.getRec ( m_msg39req->m_collnum ) ) {
g_errno = ENOCOLLREC;
log("msg39: Had error getting termlists: %s.",
mstrerror(g_errno));
return;
}
// . set the PosdbTable so it can set its score weights from the
// termFreqs of each termId in the query
// . this now takes into account the special termIds used for sorting
// by date (0xdadadada and 0xdadadad2 & TERMID_MASK)
// . it should weight them so much so that the summation of scores
// from other query terms cannot make up for a lower date score
// . this will actually calculate the top
// . this might also change m_query.m_termSigns
// . this won't do anything if it was already called
m_posdbTable.init ( &m_query, m_debug, &m_toptree, documentIndexChecker, &m_msg2, m_msg39req);
// if msg2 had ALL empty lists we can cut it short
//todo: check if msg2 lists are all null or empty. If so then bail out
//estimateHitsAndSendReply ( );
// print query term bit numbers here
for ( int32_t i = 0 ; m_debug && i < m_query.getNumTerms() ; i++ ) {
const QueryTerm *qt = &m_query.m_qterms[i];
SafeBuf sb;
sb.safePrintf("query: msg39: BITNUM query term #%" PRId32" \"%*.*s\" "
"termid=%" PRId64" bitnum=%" PRId32" ",
i,
(int)qt->m_termLen, (int)qt->m_termLen, qt->m_term,
qt->m_termId, qt->m_bitNum );
// put it back
logf(LOG_DEBUG,"%s",sb.getBufStart());
}
// timestamp log
if ( m_debug ) {
log(LOG_DEBUG,"query: msg39: [%p] "
"Preparing to intersect "
"took %" PRId64" ms",
this,
gettimeofdayInMilliseconds() - m_startTime );
m_startTime = gettimeofdayInMilliseconds();
}
JobState jobState(this);
// time it
int64_t start = gettimeofdayInMilliseconds();
// . submit the intersection job
// . only one thread of this type should be launched at a time
if ( g_jobScheduler.submit(&intersectListsThreadFunction,
0, //no finish callback
&jobState,
thread_type_query_intersect,
m_msg39req->m_niceness) ) {
jobState.wait_for_finish();
} else
m_posdbTable.intersectLists();
// time it
int64_t diff = gettimeofdayInMilliseconds() - start;
if ( diff > 10 ) log("query: Intersection job took %" PRId64" ms",diff);
log(LOG_DEBUG, "query: msg39(this=%p)::intersectLists() finished",this);
}
// thread entry point for the intersection job; signals completion itself
void Msg39::intersectListsThreadFunction ( void *state ) {
JobState *js = static_cast<JobState*>(state);
Msg39 *that = js->msg39;
// assume no error since we're at the start of thread call
that->m_errno = 0;
// . do the intersection; intersectLists() sets g_errno on error
// . hash the lists into our table
// . Msg2 always compresses the lists so be aware that the termId
// has been discarded
that->m_posdbTable.intersectLists();
// record any error so the reply can report it
if (g_errno && !that->m_errno) {
that->m_errno = g_errno;
}
//signal completion directly instead of going via the jobscheduler+main thread
JobFinishedCallback(state);
}
// . set the clusterdb recs in the top tree
// . no-op if m_msg39req->m_doSiteClustering is false
// . blocks (via JobState) until msg51 has fetched the recs
// . on alloc failure it sends the error reply itself
void Msg39::getClusterRecs ( ) {
if ( ! m_msg39req->m_doSiteClustering )
return; //nothing to do
// make buf for arrays of the docids, cluster levels and cluster recs
int32_t nodeSize = 8 + 1 + 12;
int32_t numDocIds = m_toptree.getNumUsedNodes();
m_clusterBufSize = numDocIds * nodeSize;
m_clusterBuf = (char *)mmalloc(m_clusterBufSize, "Msg39cluster");
// on alloc error, send the error reply ourselves; g_errno should be set
if ( m_clusterBufSize>0 && ! m_clusterBuf ) {
log("query: msg39: Failed to alloc buf for clustering.");
sendReply(m_slot,this,NULL,0,0,true);
return;
}
// assume we got them
m_gotClusterRecs = true;
// parse out the buf
char *p = m_clusterBuf;
// docIds
m_clusterDocIds = (int64_t *)p; p += numDocIds * 8;
m_clusterLevels = (char *)p; p += numDocIds * 1;
m_clusterRecs = (key96_t *)p; p += numDocIds * 12;
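// Layout of m_clusterBuf after the carve-up above (nodeSize = 8+1+12):
//   [numDocIds * 8  bytes] int64_t docids   (m_clusterDocIds)
//   [numDocIds * 1  bytes] cluster levels   (m_clusterLevels)
//   [numDocIds * 12 bytes] key96_t recs     (m_clusterRecs)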
// sanity check
if ( p > m_clusterBuf + m_clusterBufSize ) gbshutdownLogicError();
// loop over all results
int32_t nd = 0;
for ( int32_t ti = m_toptree.getHighNode();
ti >= 0 ;
ti = m_toptree.getPrev(ti) , nd++ ) {
// get the guy
TopNode *t = m_toptree.getNode(ti);
// get the docid
//int64_t docId = getDocIdFromPtr(t->m_docIdPtr);
// store in array
m_clusterDocIds[nd] = t->m_docId;
// assume not gotten
m_clusterLevels[nd] = CR_UNINIT;
// assume not found, make the whole thing 0
m_clusterRecs[nd].n1 = 0;
m_clusterRecs[nd].n0 = 0LL;
}
// store number
m_numClusterDocIds = nd;
// sanity check
if ( nd != m_toptree.getNumUsedNodes() ) gbshutdownLogicError();
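// Note: getHighNode()/getPrev() walk the top tree from best score downward;
// gotClusterRecs() below must use the same iteration order because it
// matches levels and recs back up to docids purely by position (nd).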
JobState jobState(this);
// . ask msg51 to get us the cluster recs
// . it should read it all from the local drives
// . "maxAge" of 0 means to not get from cache (does not include disk)
if ( ! m_msg51.getClusterRecs ( m_clusterDocIds ,
m_clusterLevels ,
m_clusterRecs ,
m_numClusterDocIds ,
m_msg39req->m_collnum,
&jobState, //state
&JobFinishedCallback, //callback
m_msg39req->m_niceness,
m_debug ) )
{
jobState.wait_for_finish();
}
}
// return false and set g_errno on error
bool Msg39::gotClusterRecs() {
if(!m_gotClusterRecs)
return true; //nothing to do
if(!setClusterLevels(m_clusterRecs,
m_clusterDocIds,
m_numClusterDocIds,
2, // maxdocidsperhostname (todo: configurable)
m_msg39req->m_doSiteClustering,
m_msg39req->m_familyFilter,
m_debug,
m_clusterLevels))
{
m_errno = g_errno;
return false;
}
m_numVisible = 0;
// now put the info back into the top tree
int32_t nd = 0;
for(int32_t ti = m_toptree.getHighNode();
ti >= 0;
ti = m_toptree.getPrev(ti) , nd++ ) {
// get the guy
TopNode *t = m_toptree.getNode(ti);
// sanity check
if(t->m_docId!=m_clusterDocIds[nd]) gbshutdownLogicError();
// set it
t->m_clusterLevel = m_clusterLevels[nd];
t->m_clusterRec = m_clusterRecs [nd];
// visible?
if(t->m_clusterLevel==CR_OK)
m_numVisible++;
}
log(LOG_DEBUG,"query: msg39: %" PRId32" docids out of %" PRId32" are visible",
m_numVisible,nd);
// we don't need this anymore
mfree ( m_clusterBuf, m_clusterBufSize, "Msg39cluster");
m_clusterBuf = NULL;
return true;
}
void Msg39::estimateHitsAndSendReply(double pctSearched) {
// no longer in use
m_inUse = false;
// numDocIds counts docs in all tiers when using toptree.
int32_t numDocIds = m_toptree.getNumUsedNodes();
// if we got clusterdb recs in here, use 'em
if(m_gotClusterRecs)
numDocIds = m_numVisible;
// don't send more than the docs that are asked for
if(numDocIds>m_msg39req->m_docsToGet)
numDocIds = m_msg39req->m_docsToGet;
// # of QueryTerms in query
int32_t nqt = m_query.m_numTerms;
// make the reply
Msg39Reply mr;
mr.reset();
mr.m_numDocIds = numDocIds;
// . total estimated hits
mr.m_estimatedHits = m_numTotalHits; //this is now an EXACT count
mr.m_pctSearched = pctSearched;
// sanity check
mr.m_nqt = nqt;
// the m_errno if any
mr.m_errno = m_errno;
// the score info, in no particular order right now
mr.ptr_scoreInfo = m_posdbTable.m_scoreInfoBuf.getBufStart();
mr.size_scoreInfo = m_posdbTable.m_scoreInfoBuf.length();
// that has offset references into posdbtable::m_pairScoreBuf
// and m_singleScoreBuf, so we need those too now
mr.ptr_pairScoreBuf = m_posdbTable.m_pairScoreBuf.getBufStart();
mr.size_pairScoreBuf = m_posdbTable.m_pairScoreBuf.length();
mr.ptr_singleScoreBuf = m_posdbTable.m_singleScoreBuf.getBufStart();
mr.size_singleScoreBuf = m_posdbTable.m_singleScoreBuf.length();
// reserve space for these guys, we fill them in below
mr.ptr_docIds = NULL;
mr.ptr_scores = NULL;
mr.ptr_flags = NULL;
mr.ptr_clusterRecs = NULL;
// this is how much space to reserve
mr.size_docIds = sizeof(int64_t) * numDocIds;
mr.size_scores = sizeof(double) * numDocIds;
mr.size_flags = sizeof(unsigned) * numDocIds;
// if not doing site clustering, we won't have these perhaps...
if(m_gotClusterRecs)
mr.size_clusterRecs = sizeof(key96_t) *numDocIds;
else
mr.size_clusterRecs = 0;
// . that is pretty much it,so serialize it into buffer,"reply"
// . mr.ptr_docIds, etc., will point into the buffer so we can
// re-serialize into it below from the tree
// . returns NULL and sets g_errno on error
// . "true" means we should make mr.ptr_* reference into the
// newly serialized buffer.
int32_t replySize;
char *reply = serializeMsg(sizeof(Msg39Reply), // baseSize
&mr.size_docIds, // firstSizeParm
&mr.size_clusterRecs, // lastSizePrm
&mr.ptr_docIds, // firstStrPtr
&mr, // thisPtr
&replySize,
NULL,
0,
true);
if(!reply){
log("query: Could not allocated memory "
"to hold reply of docids to send back.");
sendReply(m_slot,this,NULL,0,0,true);
return;
}
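// serializeMsg() (with "true" above) re-pointed mr.ptr_docIds, ptr_scores,
// ptr_flags and ptr_clusterRecs into the reserved space inside "reply", so
// the loop below writes the winners straight into the wire buffer.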
int64_t *topDocIds = (int64_t*)mr.ptr_docIds;
double *topScores = (double*) mr.ptr_scores;
unsigned *topFlags = (unsigned*)mr.ptr_flags;
key96_t *topRecs = (key96_t*) mr.ptr_clusterRecs;
// sanity
if(nqt!=m_msg2.getNumLists())
log("query: nqt mismatch for q=%s",m_query.originalQuery());
int32_t docCount = 0;
// loop over all results in the TopTree
for(int32_t ti = m_toptree.getHighNode();
ti >= 0;
ti = m_toptree.getPrev(ti))
{
// get the guy
TopNode *t = m_toptree.getNode(ti);
// skip if clusterLevel is bad!
if(m_gotClusterRecs && t->m_clusterLevel!=CR_OK)
continue;
// sanity check
if(t->m_docId<0)
gbshutdownLogicError();
//add it to the reply
topDocIds[docCount] = t->m_docId;
topScores[docCount] = t->m_score;
topFlags[docCount] = t->m_flags;
// supply clusterdb rec? only for full splits
if(m_gotClusterRecs)
topRecs [docCount] = t->m_clusterRec;
docCount++;
if(m_debug) {
logf(LOG_DEBUG,"query: msg39: [%p] "
"%03" PRId32") docId=%012" PRIu64" sum=%.02f",
this, docCount,
t->m_docId,t->m_score);
}
//don't send more than the docs that are wanted
if(docCount>=numDocIds)
break;
}
if(docCount>300 && m_debug)
log("query: Had %" PRId32" nodes in top tree",docCount);
// this is sensitive info
if(m_debug) {
log(LOG_DEBUG,
"query: msg39: [%p] "
"Intersected lists took %" PRId64" (%" PRId64") "
"ms "
"docIdsToGet=%" PRId32" docIdsGot=%" PRId32" "
"q=%s",
this ,
m_posdbTable.m_addListsTime ,
gettimeofdayInMilliseconds() - m_startTime ,
m_msg39req->m_docsToGet ,
numDocIds ,
m_query.getQuery());
}
// now send back the reply
#ifdef _VALGRIND_
VALGRIND_CHECK_MEM_IS_DEFINED(reply,replySize);
#endif
sendReply(m_slot,this,reply,replySize,replySize,false);
return;
}