// TODO: if the first 20 or so do NOT have the same hostname, then stop // and set all clusterRecs to CR_OK #include "Msg51.h" #include "gb-include.h" #include "Clusterdb.h" #include "Stats.h" #include "HashTableT.h" #include "HashTableX.h" #include "RdbCache.h" #include "ScopedLock.h" #include "Sanity.h" #include "Titledb.h" #include "Collectiondb.h" #include "UdpServer.h" // how many Msg0 requests can we launch at the same time? #define MSG51_MAX_REQUESTS 60 static const int signature_init = 0xe1d3c5b7; // . these must be 1-1 with the enums above // . used for titling the counts of g_stats.m_filterStats[] const char * const g_crStrings[] = { "cluster rec not found" , // 0 "uninitialized" , "got clusterdb record" , "has adult bit" , "has wrong language" , "clustered" , "malformed url" , "banned url" , "empty title and summary", "summary error" , "duplicate" , "clusterdb error (subcount of visible)" , "duplicate url", "empty redirection page" , "visible" , "blacklisted" , "ruleset filtered" , "malicious", "end -- do not use" }; RdbCache s_clusterdbQuickCache; static bool s_cacheInit = false; Msg51::Msg51() : m_slot(NULL), m_numSlots(0) { m_clusterRecs = NULL; m_clusterLevels = NULL; pthread_mutex_init(&m_mtx,NULL); set_signature(); reset(); } Msg51::~Msg51 ( ) { reset(); clear_signature(); } void Msg51::reset ( ) { m_clusterRecs = NULL; m_clusterLevels = NULL; m_numSlots = 0; if( m_slot ) { delete[] m_slot; m_slot = NULL; } // Coverity m_docIds = NULL; m_numDocIds = 0; m_callback = NULL; m_state = NULL; m_nexti = 0; m_numRequests = 0; m_numReplies = 0; m_errno = 0; m_niceness = 0; m_collnum = 0; m_isDebug = false; } // . returns false if blocked, true otherwise // . sets g_errno on error bool Msg51::getClusterRecs ( const int64_t *docIds, char *clusterLevels , key96_t *clusterRecs , int32_t numDocIds , collnum_t collnum , void *state , void (* callback)( void *state ) , int32_t niceness , // output bool isDebug ) { verify_signature(); // warning if ( collnum < 0 ) log(LOG_LOGIC,"net: NULL collection. msg51."); // reset this msg reset(); // get the collection rec CollectionRec *cr = g_collectiondb.getRec ( collnum ); if ( ! cr ) { log("db: msg51. Collection rec null for collnum %" PRId32".", (int32_t)collnum); g_errno = EBADENGINEER; gbshutdownLogicError(); } // keep a pointer for the caller m_state = state; m_callback = callback; m_collnum = collnum; // these are storage for the requester m_docIds = docIds; m_clusterLevels = clusterLevels; m_clusterRecs = clusterRecs; m_numDocIds = numDocIds; m_isDebug = isDebug; // bail if none to do if ( m_numDocIds <= 0 ) return true; m_nexti = 0; // for i/o mostly m_niceness = niceness; m_errno = 0; // reset these m_numRequests = 0; m_numReplies = 0; // clear/initialize these if(m_numDocIds<MSG51_MAX_REQUESTS) m_numSlots = m_numDocIds; else m_numSlots = MSG51_MAX_REQUESTS; m_slot = new Slot[m_numSlots]; for ( int32_t i = 0 ; i < m_numSlots ; i++ ) { m_slot[i].m_msg51 = this; m_slot[i].m_inUse = false; } // . do gathering // . returns false if blocked, true otherwise // . send up to MSG51_MAX_REQUESTS requests at the same time return sendRequests ( -1 ); } bool Msg51::sendRequests(int32_t k) { verify_signature(); ScopedLock sl(m_mtx); return sendRequests_unlocked(k); } // . returns false if blocked, true otherwise // . sets g_errno on error (and m_errno) // . k is a hint of which msg0 to use // . if k is -1 we do a complete scan to find available m_msg0[x] bool Msg51::sendRequests_unlocked(int32_t k) { verify_signature(); bool anyAsyncRequests = false; sendLoop: // bail if no slots available if ( m_numRequests - m_numReplies >= m_numSlots ) return false; // any requests left to send? if ( m_nexti >= m_numDocIds ) { if ( anyAsyncRequests ) //we started an async request return false; if ( m_numRequests > m_numReplies ) //still waiting for replies return false; // we are done! return true; } // sanity check if ( m_clusterLevels[m_nexti] < 0 || m_clusterLevels[m_nexti] >= CR_END ) { gbshutdownLogicError(); } // skip if we already got the rec for this guy! if ( m_clusterLevels[m_nexti] != CR_UNINIT ) { m_nexti++; goto sendLoop; } // . check our quick local cache to see if we got it // . use a max age of 1 hour // . this cache is primarly meant to avoid repetetive lookups // when going to the next tier in Msg3a and re-requesting cluster // recs for the same docids we did a second ago RdbCache *c = &s_clusterdbQuickCache; if ( ! s_cacheInit ) c = NULL; int32_t crecSize; char *crecPtr = NULL; key96_t ckey = (key96_t)m_docIds[m_nexti]; if ( c ) { RdbCacheLock rcl(*c); bool found = c->getRecord ( m_collnum , ckey , // cache key &crecPtr , // pointer to it &crecSize , false , // do copy? 3600 , // max age in secs true , // inc counts? NULL );// cachedTime if ( found ) { // sanity check if ( crecSize != sizeof(key96_t) ) gbshutdownLogicError(); m_clusterRecs[m_nexti] = *(key96_t *)crecPtr; // it is no longer CR_UNINIT, we got the rec now m_clusterLevels[m_nexti] = CR_GOT_REC; // debug msg //logf(LOG_DEBUG,"query: msg51 getRec k.n0=%" PRIu64" rec.n0=%" PRIu64, // ckey.n0,m_clusterRecs[m_nexti].n0); m_nexti++; goto sendLoop; } } // . do not hog all the udpserver's slots! // . must have at least one outstanding reply so we can process // his reply and come back here... if ( g_udpServer.getNumUsedSlots() > 1000 && m_numRequests > m_numReplies ) return false; // find empty slot int32_t slot ; // ignore bogus hints if ( k >= m_numSlots ) k = -1; // if hint was provided use that if ( k >= 0 && ! m_slot[k].m_inUse ) slot = k; // otherwise, do a scan for the empty slot else { for ( slot = 0 ; slot < m_numSlots ; slot++ ) // break out if available if(!m_slot[slot].m_inUse) break; } // sanity check -- must have one!! if ( slot >= m_numSlots ) gbshutdownLogicError(); // send it, returns false if blocked, true otherwise if( !sendRequest(slot) ) anyAsyncRequests = true; // update any hint to make our loop more efficient if ( k >= 0 ) k++; goto sendLoop; } // . send using m_msg0s[i] class bool Msg51::sendRequest ( int32_t i ) { // what is the docid? int64_t d; // save it int32_t ci = m_nexti; // what's the docid? d = m_docIds[m_nexti]; // advance so we do not do this docid again m_nexti++; m_slot[i].m_ci = ci; m_slot[i].m_inUse = true; // count it m_numRequests++; // lookup in clusterdb, need a start and endkey key96_t startKey = Clusterdb::makeFirstClusterRecKey ( d ); key96_t endKey = Clusterdb::makeLastClusterRecKey ( d ); // . send the request for the cluster rec, use Msg0 // . returns false and sets g_errno on error // . otherwise, it blocks and returns true bool s = m_slot[i].m_msg0.getList( -1 , // hostid RDB_CLUSTERDB , m_collnum , &m_slot[i].m_list, (char *)&startKey , (char *)&endKey , 36 , // minRecSizes &m_slot[i], // state gotClusterRecWrapper51 , m_niceness , true , // doErrorCorrection true , // includeTree -1 , // firstHostId 0 , // startFileNum -1 , // numFiles 30000 , // timeout &m_slot[i].m_msg5, // use for local reads false , // isRealMerge? false , // noSplit? -1 );// forceParitySplit // loop for more if blocked, slot #i is used, block it //if ( ! s ) { i++; continue; } if ( ! s ) { // only wanted this for faster disk page cache hitting so make // sure it is not "double used" by another msg0 //m_msg0[i].m_msg5 = NULL; return false; } // otherwise, process the response gotClusterRec ( &m_slot[i] ); return true; } void Msg51::gotClusterRecWrapper51(void *state) { Slot *slot = static_cast<Slot*>(state); Msg51 *THIS = slot->m_msg51; verify_signature_at(THIS->signature); { ScopedLock sl(THIS->m_mtx); // process it THIS->gotClusterRec(slot); // get slot number for re-send on this slot int32_t k = (int32_t)(slot-THIS->m_slot); // . if not all done, launch the next one // . this returns false if blocks, true otherwise if ( ! THIS->sendRequests_unlocked(k) ) return; } // we don't need to go on if we're not doing deduping THIS->m_callback ( THIS->m_state ); return; } // . sets m_errno to g_errno if not already set void Msg51::gotClusterRec(Slot *slot) { verify_signature(); // count it m_numReplies++; // free up slot->m_inUse = false; RdbList *list = slot->m_msg0.m_list; // update m_errno if we had an error if ( ! m_errno ) m_errno = g_errno; if ( g_errno ) // print error log(LOG_DEBUG, "query: Had error getting cluster info got docId=d: " "%s.",mstrerror(g_errno)); // this doubles as a ptr to a cluster rec int32_t ci = slot->m_ci; // get docid int64_t docId = m_docIds[ci]; // assume error! m_clusterLevels[ci] = CR_ERROR_CLUSTERDB; // bail on error if ( g_errno || list->getListSize() < 12 ) { //log(LOG_DEBUG, // "build: clusterdb rec for d=%" PRId64" dptr=%" PRIu32" " // "not found. where is it?", docId, (int32_t)ci); g_errno = 0; return; } // . steal rec from this multicast // . point to cluster rec, a int32_t key96_t *rec = &m_clusterRecs[ci]; // store the cluster rec itself *rec = *(key96_t *)(list->getList()); // debug note log(LOG_DEBUG, "build: had clusterdb SUCCESS for d=%" PRId64" dptr=%" PRIu32" " "rec.n1=%" PRIx32",%016" PRIx64" sitehash26=0x%" PRIx32".", (int64_t)docId, (int32_t)ci, rec->n1,rec->n0, Clusterdb::getSiteHash26((char *)rec)); // check for docid mismatch int64_t docId2 = Clusterdb::getDocId ( rec ); if ( docId != docId2 ) { logf(LOG_DEBUG,"query: docid mismatch in clusterdb."); return; } // it is legit, set to CR_OK m_clusterLevels[ci] = CR_OK; RdbCacheLock rcl(s_clusterdbQuickCache); // . init the quick cache if(!s_cacheInit && s_clusterdbQuickCache.init(200*1024, // maxMem sizeof(key96_t), // fixedDataSize (clusterdb rec) false, // support lists 10000, // max recs false, // use half keys? "clusterdbQuickCache" , false, // load from disk? sizeof(key96_t), // cache key size sizeof(key96_t))) // cache data size // only init once if successful s_cacheInit = true; // . add the record to our quick cache as a int64_t // . ignore any error if(s_cacheInit) s_clusterdbQuickCache.addRecord(m_collnum, (key96_t)docId, // docid is key (char *)rec, sizeof(key96_t), // recSize 0); // clear it in case the cache set it, we don't care g_errno = 0; } // . cluster the docids based on the clusterRecs // . returns false and sets g_errno on error // . if maxDocIdsPerHostname is -1 do not do hostname clsutering bool setClusterLevels ( const key96_t *clusterRecs, const int64_t *docIds, int32_t numRecs , int32_t maxDocIdsPerHostname , bool doHostnameClustering , bool familyFilter , bool isDebug , // output to clusterLevels[] char *clusterLevels ) { if ( numRecs <= 0 ) return true; // skip if not clustering on anything //if ( ! doHostnameClustering && ! familyFilter ) { // memset ( clusterLevels, CR_OK, numRecs ); // return true; //} HashTableX ctab; // init to 2*numRecs for speed. use 0 for niceness! if ( ! ctab.set ( 8 , 4 , numRecs * 2,NULL,0,false,"clustertab" ) ) return false; // time it u_int64_t startTime = gettimeofdayInMilliseconds(); for(int32_t i=0; i<numRecs; i++) { char *crec = (char *)&clusterRecs[i]; // . set this cluster level // . right now will be CR_ERROR_CLUSTERDB or CR_OK... char *level = &clusterLevels[i]; // sanity check if ( *level == CR_UNINIT ) gbshutdownLogicError(); // and the adult bit, for cleaning the results if ( familyFilter && Clusterdb::hasAdultContent ( crec ) ) { *level = CR_DIRTY; continue; } // if error looking up in clusterdb, use a 8 bit domainhash from docid bool fakeIt = (*level==CR_ERROR_CLUSTERDB); // assume ok, show it, it is visible *level = CR_OK; // site hash comes next if(!doHostnameClustering) continue; // . get the site hash // . these are only 32 bits! int64_t h; if(fakeIt) h = Titledb::getDomHash8FromDocId(docIds[i]); else h = Clusterdb::getSiteHash26 ( crec ); // inc this count! if ( fakeIt ) { g_stats.m_filterStats[CR_ERROR_CLUSTERDB]++; } // if it matches a siteid on our black list //if ( checkNegative && sht.getSlot((int64_t)h) > 0 ) { // *level = CR_BLACKLISTED_SITE; goto loop; } // look it up uint32_t score = ctab.getScore(h) ; // if still visible, just continue if ( score < (uint32_t)maxDocIdsPerHostname ) { if ( ! ctab.addTerm(h)) return false; continue; } // otherwise, no lonegr visible *level = CR_CLUSTERED; } // debug for ( int32_t i = 0 ; i < numRecs && isDebug ; i++ ) { char *crec = (char *)&clusterRecs[i]; uint32_t siteHash26 = Clusterdb::getSiteHash26(crec); logf(LOG_DEBUG,"query: msg51: hit #%" PRId32") sitehash26=%" PRIu32" " "rec.n0=%" PRIx64" docid=%" PRId64" cl=%" PRId32" (%s)", (int32_t)i, (int32_t)siteHash26, clusterRecs[i].n0, (int64_t)docIds[i], (int32_t)clusterLevels[i], g_crStrings[(int32_t)clusterLevels[i]] ); } //log(LOG_DEBUG,"build: numVisible=%" PRId32" numClustered=%" PRId32" numErrors=%" PRId32, // *numVisible,*numClustered,*numErrors); // show time uint64_t took = gettimeofdayInMilliseconds() - startTime; if ( took > 3 ) log(LOG_INFO,"build: Took %" PRId64" ms to do clustering.",took); // we are all done return true; }