333 lines
7.8 KiB
C++
333 lines
7.8 KiB
C++
#include "Msg6b.h"
|
|
#include "Tagdb.h"
|
|
|
|
void gotDocIdListWrapper(void* state);
|
|
void gotMsg22Wrapper(void *state);
|
|
void gotMsg8aWrapper(void *state);
|
|
|
|
#define MAX_OUTSTANDING_MSG6B 20
|
|
|
|
|
|
|
|
Msg6b::Msg6b() {
|
|
m_msg22s = NULL;
|
|
m_msg8as = NULL;
|
|
//m_siteRecs = NULL;
|
|
m_tagRecs = NULL;
|
|
m_numMsg22s = 0;
|
|
// reset();
|
|
}
|
|
|
|
|
|
void Msg6b::reset() {
|
|
m_numToGet = 0;
|
|
m_numToKeep = 0;
|
|
m_docIdPtr = NULL;
|
|
m_lastDocIdPtr = NULL;
|
|
if(m_numMsg22s <= 0) return;
|
|
if(m_msg22s) {
|
|
delete [] (m_msg22s);
|
|
mdelete(m_msg22s,
|
|
m_numMsg22s * sizeof(Msg22),
|
|
"Msg6bMsg22s");
|
|
}
|
|
m_msg22s = NULL;
|
|
|
|
if(m_msg8as) {
|
|
delete [] (m_msg8as);
|
|
mdelete( m_msg8as, m_numMsg22s * sizeof(Msg8a),
|
|
"Msg6bMsg8as" );
|
|
}
|
|
m_msg8as = NULL;
|
|
|
|
//if(m_siteRecs) {
|
|
// delete [] (m_siteRecs);
|
|
// mdelete(m_siteRecs, m_numMsg22s * sizeof(SiteRec),
|
|
// "Msg6bSiteRecs" );
|
|
//}
|
|
//m_siteRecs = NULL;
|
|
if ( m_tagRecs ) {
|
|
delete [] (m_tagRecs);
|
|
mdelete ( m_tagRecs , m_numMsg22s * sizeof(TagRec) , "msg6btb");
|
|
m_tagRecs = NULL;
|
|
}
|
|
|
|
m_numMsg22s = 0;
|
|
}
|
|
|
|
|
|
Msg6b::~Msg6b() { reset(); }
|
|
|
|
|
|
|
|
bool Msg6b::getTitlerecSample(char* query,
|
|
char* coll,
|
|
int32_t collLen,
|
|
int32_t numSamples,
|
|
int32_t numToKeep,
|
|
void *state,
|
|
bool (*recordCallback) (void *state,
|
|
TitleRec* tr,
|
|
TagRec *tagRec),
|
|
void (*endCallback) (void *state),
|
|
bool doSiteClustering,
|
|
bool doIpClustering,
|
|
bool getTagRecs,
|
|
int32_t niceness) {
|
|
//clear it out;
|
|
reset();
|
|
strcpy(m_coll, coll);
|
|
m_collLen = collLen;
|
|
m_callbackState = state;
|
|
m_recordCallback = recordCallback;
|
|
m_endCallback = endCallback;
|
|
|
|
CollectionRec* cr = g_collectiondb.getRec ( m_coll );
|
|
if ( ! cr ) {
|
|
g_errno = ENOCOLLREC;
|
|
log("admin: no collection record found ");
|
|
return true;
|
|
}
|
|
|
|
|
|
m_query.set ( query , gbstrlen(query) , m_coll , m_collLen , 0 /*boolFlag*/ );
|
|
|
|
//log(LOG_WARN, "Msg6b got %s query.",query);
|
|
|
|
m_numToGet = numSamples;
|
|
m_numToKeep = numToKeep;
|
|
if(m_numToGet < m_numToKeep) m_numToGet = m_numToKeep;
|
|
|
|
//get twice as many docids as they want to account for errors
|
|
int32_t docsWanted = m_numToGet * 2;
|
|
m_niceness = niceness;
|
|
m_getTagRecs = getTagRecs;
|
|
|
|
int32_t tierStage0 = cr->m_tierStage0;
|
|
int32_t tierStage1 = cr->m_tierStage1;
|
|
int32_t tierStage2 = cr->m_tierStage2;
|
|
|
|
// set our request
|
|
Msg39Request req;
|
|
req.ptr_coll = m_coll;
|
|
req.size_coll = m_collLen+1;
|
|
req.m_docsToGet = docsWanted;
|
|
req.m_niceness = m_niceness;
|
|
req.m_doSiteClustering = doSiteClustering;
|
|
req.m_doIpClustering = doIpClustering;
|
|
req.m_doDupContentRemoval = false;
|
|
req.m_tierSize0 = tierStage0 ;
|
|
req.m_tierSize1 = tierStage1 ;
|
|
req.m_tierSize2 = tierStage2 ;
|
|
req.ptr_query = m_query.m_orig;
|
|
req.size_query = m_query.m_origLen+1;
|
|
req.m_timeout = 100000; // very high
|
|
|
|
g_errno = 0;
|
|
// . get the docIds
|
|
// . this sets m_msg3a.m_clusterLevels[] for us
|
|
if ( ! m_msg3a.getDocIds ( &req ,
|
|
&m_query ,
|
|
this ,
|
|
gotDocIdListWrapper ))
|
|
return false;
|
|
|
|
return gotDocIdList();
|
|
}
|
|
|
|
void gotDocIdListWrapper(void* state) {
|
|
Msg6b* THIS = (Msg6b*)state;
|
|
if(THIS->gotDocIdList()) {
|
|
THIS->callbackIfDone();
|
|
}
|
|
}
|
|
|
|
|
|
bool Msg6b::gotDocIdList() {
|
|
m_docIdPtr = m_msg3a.getDocIds();
|
|
m_lastDocIdPtr = m_docIdPtr + m_msg3a.getNumDocIds();
|
|
|
|
//log(LOG_WARN, "Msg6b got %"INT32" docids.",m_msg3a.getNumDocIds());
|
|
if(m_docIdPtr == m_lastDocIdPtr) return true;
|
|
|
|
|
|
if(m_numToKeep < MAX_OUTSTANDING_MSG6B)
|
|
m_numToKeep = MAX_OUTSTANDING_MSG6B;
|
|
|
|
m_numMsg22s = m_numToKeep;
|
|
//sanity check:
|
|
if (m_msg22s || m_msg8as || m_tagRecs ) {
|
|
log(LOG_WARN, "admin: trying to reuse msg6b object "
|
|
"without freeing");
|
|
char *xx = NULL; *xx = 0;
|
|
}
|
|
|
|
try { m_msg22s = new Msg22[m_numMsg22s]; }
|
|
catch ( ... ) {
|
|
log("admin: Msg6b could not malloc enough memory for TitleRec sample.");
|
|
return true;
|
|
}
|
|
mnew(m_msg22s, sizeof(Msg22)*m_numMsg22s, "Msg6bMsg22s");
|
|
|
|
if(m_getTagRecs ){
|
|
try { m_msg8as = new Msg8a[m_numMsg22s]; }
|
|
catch ( ... ) {
|
|
log("admin: Msg6b could not malloc enough memory.");
|
|
return true;
|
|
}
|
|
mnew( m_msg8as, m_numMsg22s * sizeof(Msg8a), "Msg6bMsg8as" );
|
|
//try { m_siteRecs = new SiteRec[m_numMsg22s]; }
|
|
//catch ( ... ) {
|
|
// log("admin: Msg6b could not malloc enough memory.");
|
|
// return true;
|
|
//}
|
|
//mnew( m_siteRecs, m_numMsg22s * sizeof(SiteRec), "Msg6bSiteRecs" );
|
|
try { m_tagRecs = new TagRec[m_numMsg22s]; }
|
|
catch ( ... ) {
|
|
log("admin: Msg6b could not malloc enough memory.");
|
|
return true;
|
|
}
|
|
mnew( m_tagRecs, m_numMsg22s * sizeof(TagRec), "Msg6bTagRecs" );
|
|
|
|
}
|
|
else {
|
|
m_msg8as = NULL;
|
|
//m_siteRecs = NULL;
|
|
m_tagRecs = NULL;
|
|
}
|
|
|
|
for(int32_t i = 0; i < m_numMsg22s; i++) {
|
|
m_msg22s[i].m_slot = i;
|
|
m_msg22s[i].m_parent = this;
|
|
if(m_getTagRecs ) {//SiteRecs) {
|
|
m_msg8as[i].m_slotNum = i;
|
|
m_msg8as[i].m_parent = this;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
m_numGotten = 0;
|
|
m_lastSlotUsed = 0;
|
|
|
|
bool noBlock = true;
|
|
for(int32_t i = 0; i < m_numToKeep; i++) {
|
|
noBlock &= getMsg22s(i);
|
|
m_lastSlotUsed++;
|
|
}
|
|
return noBlock;
|
|
}
|
|
|
|
|
|
bool Msg6b::getMsg22s(int32_t sampleNum) {
|
|
//just send them over the network now, matt seems to think
|
|
//that it will not slow things down much
|
|
int64_t goodDocId = -1;
|
|
if(m_docIdPtr < m_lastDocIdPtr &&
|
|
m_numOutstanding < m_numToKeep &&
|
|
m_numGotten < m_numToGet ) {
|
|
m_numOutstanding++;
|
|
goodDocId = *m_docIdPtr++;
|
|
} else {
|
|
return true;
|
|
}
|
|
|
|
if ( ! m_msg22s[sampleNum].getTitleRec ( NULL ,
|
|
goodDocId ,
|
|
true ,
|
|
m_coll ,
|
|
m_collLen ,
|
|
NULL ,
|
|
false ,
|
|
false ,
|
|
false ,
|
|
&m_msg22s[sampleNum] ,
|
|
gotMsg22Wrapper,
|
|
m_niceness ,//niceness
|
|
false ,
|
|
false ,
|
|
60*60*24 , //maxcacheage
|
|
60 )) {
|
|
return false;
|
|
}
|
|
return gotMsg22s(sampleNum);
|
|
}
|
|
|
|
|
|
void gotMsg22Wrapper(void *state) {
|
|
Msg22* m22 = (Msg22*)state;
|
|
Msg6b* THIS = (Msg6b*)m22->m_parent;
|
|
if(THIS->gotMsg22s(m22->m_slot)) {
|
|
THIS->callbackIfDone();
|
|
}
|
|
}
|
|
|
|
bool Msg6b::gotMsg22s(int32_t sampleNum) {
|
|
if(!m_getTagRecs) return gotMsg8as(sampleNum);
|
|
|
|
TitleRec* tr = m_msg22s[sampleNum].getTitleRec();
|
|
if(tr->isEmpty()) return getMsg22s(sampleNum);
|
|
if(m_msg22s[sampleNum].m_errno != 0) return getMsg22s(sampleNum);
|
|
|
|
|
|
if ( ! m_msg8as[sampleNum].getTagRec ( tr->getUrl(),
|
|
m_coll ,
|
|
m_collLen ,
|
|
true , // useCanonicalName?
|
|
m_niceness , // niceness
|
|
&m_msg8as[sampleNum] ,
|
|
gotMsg8aWrapper ,
|
|
&m_tagRecs[sampleNum] ))
|
|
return false;
|
|
return gotMsg8as(sampleNum);
|
|
}
|
|
|
|
|
|
void gotMsg8aWrapper(void *state){
|
|
Msg8a* m8a = (Msg8a*)state;
|
|
Msg6b* THIS = (Msg6b*)m8a->m_parent;
|
|
if(THIS->gotMsg8as(m8a->m_slotNum)) {
|
|
THIS->callbackIfDone();
|
|
}
|
|
}
|
|
|
|
//note: we also come here if we were not getting siterecs
|
|
bool Msg6b::gotMsg8as(int32_t sampleNum) {
|
|
m_numOutstanding--;
|
|
if(m_msg22s[sampleNum].m_errno != 0) return getMsg22s(sampleNum);
|
|
|
|
TitleRec* tr = m_msg22s[sampleNum].getTitleRec();
|
|
if(tr->isEmpty()) return getMsg22s(sampleNum);
|
|
|
|
m_numGotten++;
|
|
|
|
//we call their callback, if they return true we keep this slot
|
|
//for them, i.e. we can't relaunch a request reusing the titlerec.
|
|
bool keepSlot = false;
|
|
if(m_recordCallback) {
|
|
//SiteRec* sr = NULL;
|
|
TagRec *tagRec = NULL;
|
|
if(m_getTagRecs) tagRec =&m_tagRecs[sampleNum];
|
|
keepSlot = m_recordCallback(m_callbackState, tr, tagRec);
|
|
}
|
|
|
|
|
|
if(m_numGotten < m_numToGet) {
|
|
if(keepSlot) {
|
|
m_lastSlotUsed++;
|
|
if(m_lastSlotUsed < m_numToKeep)
|
|
return getMsg22s(m_lastSlotUsed);
|
|
}
|
|
return getMsg22s(sampleNum);
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
|
|
bool Msg6b::callbackIfDone() {
|
|
if(m_numOutstanding != 0) return false;
|
|
m_endCallback(m_callbackState);
|
|
return true;
|
|
}
|