#include "gb-include.h" #include "Msg3.h" #include "Rdb.h" #include "Stats.h" // for timing and graphing merge time #include "RdbCache.h" #include "Process.h" #include "GbMutex.h" #include "ScopedLock.h" #include "Sanity.h" #include "Conf.h" #include "Mem.h" #include <new> static const int signature_init = 0x1f2b3a4c; int32_t g_numIOErrors = 0; Msg3::Scan::Scan() : m_scan(), m_startpg(0), m_endpg(0), m_hintOffset(0), m_fileId(0), m_inPageCache(false), m_shiftCount(0), m_list() { memset(m_hintKey,0,sizeof(m_hintKey)); } Msg3::Msg3() : m_scan(NULL), m_numScansStarted(0), m_numScansCompleted(0), m_scansBeingSubmitted(false) { // log(LOG_TRACE,"Msg3(%p)::Msg3()",this); set_signature(); memset(m_constrainKey, 0, sizeof(m_constrainKey)); memset(m_startKey, 0, sizeof(m_startKey)); memset(m_endKey, 0, sizeof(m_endKey)); memset(m_endKeyOrig, 0, sizeof(m_endKeyOrig)); memset(m_hintKey, 0, sizeof(m_hintKey)); reset(); } Msg3::~Msg3() { verify_signature(); // log(LOG_TRACE,"Msg3(%p)::~Msg3()",this); reset(); clear_signature(); } void Msg3::reset() { verify_signature(); // log(LOG_TRACE,"Msg3(%p)::reset()",this); if ( !areAllScansCompleted() ) { g_process.shutdownAbort(true); } m_hadCorruption = false; // reset # of lists to 0 m_numScansCompleted = 0; m_numScansStarted = 0; if ( m_scan ) { //Mem.cpp has bad logic concerning arrays //mdelete(m_scan, -1, "Msg3:scan"); delete[] m_scan; m_scan = NULL; } // Coverity m_rdbId = RDB_NONE; m_collnum = 0; m_validateCache = false; m_startFileNum = 0; m_numFiles = 0; m_numFileNums = 0; m_fileStartKey = NULL; m_minRecSizes = 0; m_minRecSizesOrig = 0; m_niceness = 0; m_errno = 0; m_retryNum = 0; m_maxRetries = 0; m_startTime = 0; m_hintOffset = 0; m_numChunks = 0; m_ks = 0; m_listsChecked = false; m_hadCorruption = false; m_state = NULL; m_callback = NULL; verify_signature(); } void Msg3::incrementScansStarted() { ScopedLock sl(m_mtxScanCounters); m_numScansStarted++; if(m_numScansCompleted>=m_numScansStarted) gbshutdownLogicError(); } bool Msg3::incrementScansCompleted() { ScopedLock sl(m_mtxScanCounters); m_numScansCompleted++; if(m_numScansCompleted>m_numScansStarted) gbshutdownLogicError(); return m_numScansCompleted==m_numScansStarted && !m_scansBeingSubmitted; } bool Msg3::areAllScansCompleted() const { ScopedLock sl(const_cast<GbMutex&>(m_mtxScanCounters)); return (!m_scansBeingSubmitted) && (m_numScansCompleted==m_numScansStarted); } static key192_t makeCacheKey(int64_t vfd, int64_t offset, int64_t readSize) { key192_t k; k.n2 = vfd; k.n1 = readSize; k.n0 = offset; return k; } static RdbCache g_rdbCaches[5]; static GbMutex s_rdbcacheMutex; //protects g_rdbCaches class RdbCache *getDiskPageCache ( rdbid_t rdbId ) { RdbCache *rpc = NULL; int64_t maxMem; int64_t maxRecs; const char *dbname; switch(rdbId) { case RDB_POSDB: rpc = &g_rdbCaches[0]; maxMem = g_conf.m_posdbFileCacheSize; maxRecs = maxMem / 5000; dbname = "posdbcache"; break; case RDB_TAGDB: rpc = &g_rdbCaches[1]; maxMem = g_conf.m_tagdbFileCacheSize; maxRecs = maxMem / 200; dbname = "tagdbcache"; break; case RDB_CLUSTERDB: rpc = &g_rdbCaches[2]; maxMem = g_conf.m_clusterdbFileCacheSize; maxRecs = maxMem / 32; dbname = "clustcache"; break; case RDB_TITLEDB: rpc = &g_rdbCaches[3]; maxMem = g_conf.m_titledbFileCacheSize; maxRecs = maxMem / 3000; dbname = "titdbcache"; break; case RDB_SPIDERDB: rpc = &g_rdbCaches[4]; maxMem = g_conf.m_spiderdbFileCacheSize; maxRecs = maxMem / 3000; dbname = "spdbcache"; break; default: return NULL; } if ( maxMem < 0 ) maxMem = 0; ScopedLock sl(s_rdbcacheMutex); // did size change? if not, return it if ( rpc->getMaxMem() == maxMem ) return rpc; // re-init or init for the first time here if ( ! rpc->init ( maxMem , -1 , // fixedDataSize. -1 since we are lists false , // support lists? maxRecs , false , // use half keys? dbname , false , // loadfromdisk sizeof(key192_t), // cache key size 0 , // data key size -1 ) ) // numptrsmax return NULL; return rpc; } // . return false if blocked, true otherwise // . set g_errno on error // . read list of keys in [startKey,endKey] range // . read at least "minRecSizes" bytes of keys in that range // . the "m_endKey" of resulting, merged list may have a smaller endKey // than the argument, "endKey" due to limitation by "minRecSizes" // . resulting list will contain ALL keys between ITS [m_startKey,m_endKey] // . final merged list "should" try to have a size of at least "minRecSizes" // but due to negative/postive rec elimination may be less // . the endKey of the lists we read may be <= "endKey" provided // . we try to shrink the endKey if minRecSizes is >= 0 in order to // avoid excessive reading // . by shrinking the endKey we cannot take into account the size of deleted // records, so therefore we may fall short of "minRecSizes" in actuality, // in fact, the returned list may even be empty with a shrunken endKey // . we merge all lists read from disk into the provided "list" // . caller should call Msg3.getList(int32_t i) and Msg3:getNumLists() to retrieve // . this makes the query engine faster since we don't need to merge the docIds // and can just send them across the network separately and they will be // hashed into IndexTable's table w/o having to do time-wasting merging. // . caller can specify array of filenums to read from so incremental syncing // in Sync class can just read from titledb*.dat files that were formed // since the last sync point. bool Msg3::readList ( rdbid_t rdbId, collnum_t collnum , const char *startKeyArg , const char *endKeyArg , int32_t minRecSizes , // max size of scan int32_t startFileNum , // first file to scan int32_t numFiles , // rel. to startFileNum void *state , // for callback void (* callback ) ( void *state ) , int32_t niceness , int32_t retryNum , int32_t maxRetries , bool justGetEndKey) { verify_signature(); // reset m_alloc and data in all lists in case we are a re-call reset(); // set this to true to validate m_validateCache = false; // clear, this MUST be done so if we return true g_errno is correct g_errno = 0; // assume lists are not checked for corruption m_listsChecked = false; // warn if ( minRecSizes < -1 ) { log(LOG_LOGIC,"db: Msg3 got minRecSizes of %" PRId32", changing to -1.",minRecSizes); minRecSizes = -1; } // warning if ( collnum < 0 ) { log(LOG_LOGIC,"net: NULL collection. msg3."); } // remember the callback m_rdbId = rdbId; m_collnum = collnum; m_callback = callback; m_state = state; m_niceness = niceness; m_numScansCompleted = 0; m_retryNum = retryNum; m_maxRetries = maxRetries; m_hadCorruption = false; // get keySize of rdb m_ks = getKeySizeFromRdbId ( m_rdbId ); // reset the group error m_errno = 0; // . ensure startKey last bit clear, endKey last bit set // . no! this warning is now only in Msg5 // . if RdbMerge is merging some files, not involving the root // file, then we can expect to get a lot of unmatched negative recs. // . as a consequence, our endKeys may often be negative. This means // it may not annihilate with the positive key, but we should only // miss like this at the boundaries of the lists we fetch. // . so in that case RdbList::merge will stop merging once the // minRecSizes limit is reached even if it means ending on a negative // rec key if ( !KEYNEG(startKeyArg) ) log(LOG_REMIND,"net: msg3: StartKey lastbit set."); if ( KEYNEG(endKeyArg) ) log(LOG_REMIND,"net: msg3: EndKey lastbit clear."); // get base, returns NULL and sets g_errno to ENOCOLLREC on error RdbBase *base = getRdbBase( m_rdbId, m_collnum ); if ( ! base ) { return true; } // save startFileNum here, just for recall m_startFileNum = startFileNum; m_numFiles = numFiles; if ( g_conf.m_logDebugQuery ) log(LOG_DEBUG, "net: msg3: " "sfn=%" PRId32" nf=%" PRId32" db=%s.", (int32_t)startFileNum, (int32_t)numFiles,base->getDbName()); // If we have a merge going on then a tmp.merge.file exist in the files of RdbBase. // The tmp-merge.file or the source files are marked unreadable (because they are // unfinished to about to be deleted). // The input parameters startFileNum and numFiles are very general, but in reality // they are either (0,-1) or (x,1), that is: read all files or a single one. So that // is what we will support as best we can. The non-happening corner-cases about // overlapping ranges with the merge merge etc are on a best-effort basis, viz. result // may be incomplete. if(numFiles==-1) { //all files m_numChunks = base->getNumFiles() - startFileNum; } else { //a specific range (typically just a single file) m_numChunks = numFiles; if(startFileNum+m_numChunks > base->getNumFiles()) m_numChunks = base->getNumFiles() - startFileNum; } if(m_numChunks<0) { //can happen if a merge finishes and deletes files between the upper-logic //iteration over files and this calculation. m_numChunks = 0; } try { m_scan = new Scan[m_numChunks]; } catch(std::bad_alloc&) { log(LOG_WARN, "disk: Could not allocate %d 'Scan' structures to read %s.",m_numChunks,base->getDbName()); g_errno = ENOMEM; return true; } // store the file numbers in the scan array, these are the files we read m_numFileNums = 0; for (int32_t i = startFileNum; i < startFileNum + m_numChunks; i++) { if (base->isReadable(i)) { m_scan[m_numFileNums++].m_fileId = base->getFileId(i); } } // remember the file range we should scan m_numScansStarted = 0; m_numScansCompleted = 0; KEYSET(m_startKey,startKeyArg,m_ks); KEYSET(m_endKey,endKeyArg,m_ks); KEYSET(m_constrainKey,endKeyArg,m_ks);//set incase justGetEndKey istrue m_minRecSizes = minRecSizes; // bail if 0 files to scan -- no! need to set startKey/endKey if ( numFiles == 0 ) return true; // don't read anything if endKey < startKey if ( KEYCMP(m_startKey,m_endKey,m_ks)>0 ) return true; // keep the original in tact in case g_errno == ETRYAGAIN KEYSET(m_endKeyOrig,endKeyArg,m_ks); m_minRecSizesOrig = minRecSizes; // start reading at this key m_fileStartKey = startKeyArg; // start the timer, keep it fast for clusterdb though if ( g_conf.m_logTimingDb ) m_startTime = gettimeofdayInMilliseconds(); // . we now boost m_minRecSizes to account for negative recs // . but not if only reading one list, cuz it won't get merged and // it will be too big to send back if ( m_numFileNums > 1 ) compensateForNegativeRecs ( base ); verify_signature(); // . often endKey is too big for an efficient read of minRecSizes bytes // because we end up reading too much from all the files // . this will set m_startpg[i], m_endpg[i] for each RdbScan/RdbFile // to ensure we read "minRecSizes" worth of records, not much more // . returns the new endKey for all ranges // . now this just overwrites m_endKey setPageRanges(base); // . NEVER let m_endKey be a negative key, because it will // always be unmatched, since delbit is cleared // . adjusting it here ensures our generated hints are valid // . we will use this key to call constrain() with //m_constrainKey = m_endKey; //if ( ( m_constrainKey.n0 & 0x01) == 0x00 ) // m_constrainKey -= (uint32_t)1; KEYSET(m_constrainKey,m_endKey,m_ks); if ( KEYNEG(m_constrainKey) ) KEYDEC(m_constrainKey,m_ks); // Msg5 likes to get the endkey for getting the list from the tree if ( justGetEndKey ) return true; Rdb *rdb = getRdbFromId(m_rdbId); { ScopedLock sl(m_mtxScanCounters); m_scansBeingSubmitted = true; } // . now start reading/scanning the files // . our m_scans array starts at 0 for ( int32_t i = 0 ; i < m_numFileNums ; i++ ) { // get the page range // sanity check if (i > 0 && m_scan[i - 1].m_fileId >= m_scan[i].m_fileId) { log(LOG_LOGIC, "net: msg3: files must be read in order from oldest to newest so RdbList::indexMerge_r " "works properly. Otherwise, corruption will result."); g_process.shutdownAbort(true); } RdbMap *map = base->getMapById(m_scan[i].m_fileId); // this can happen somehow! if (!map) { logError("net: msg3: getMapById with fileId=%" PRId32" returns NULL. Bad engineer.", m_scan[i].m_fileId); continue; } // . sanity check? // . no, we must get again since we turn on endKey's last bit int32_t p1 , p2; map->getPageRange(m_fileStartKey, m_endKey, &p1, &p2, NULL); // now get some read info int64_t offset = map->getAbsoluteOffset ( p1 ); int64_t bytesToRead = map->getRecSizes ( p1, p2, false); incrementScansStarted(); // . keep stats on our disk accesses // . count disk seeks (assuming no fragmentation) // . count disk bytes read if ( bytesToRead > 0 ) { rdb->didSeek ( ); rdb->didRead ( bytesToRead ); } // . the startKey may be different for each RdbScan class // . RdbLists must have all keys within their [startKey,endKey] // . therefore set startKey individually from first page in map // . this endKey must be >= m_endKey // . this startKey must be < m_startKey char startKey2 [ MAX_KEY_BYTES ]; char endKey2 [ MAX_KEY_BYTES ]; map->getKey(p1, startKey2); map->getKey(p2, endKey2); // store in here m_scan[i].m_startpg = p1; m_scan[i].m_endpg = p2; // . we read UP TO that endKey, so reduce by 1 // . but iff p2 is NOT the last page in the map/file // . maps[fn]->getKey(lastPage) will return the LAST KEY // and maps[fn]->getOffset(lastPage) the length of the file if(map->getNumPages() != p2) { //only decrease endKey2 if it larger than startKey2 if(KEYCMP(startKey2,endKey2,m_ks)<0) KEYDEC(endKey2,m_ks); } else { // otherwise, if we're reading all pages, then force the // endKey to virtual inifinite KEYMAX(endKey2,m_ks); } // . set up the hints // . these are only used if we are only reading from 1 file // . these are used to call constrain() so we can constrain // the end of the list w/o looping through all the recs // in the list int32_t h2 = p2 ; // decrease by one page if we're on the last page if ( h2 > p1 && map->getNumPages() == h2 ) h2--; // . decrease hint page until key is <= endKey on that page // AND offset is NOT -1 because the old way would give // us hints passed the endkey // . also decrease so we can constrain on minRecSizes in // case we're the only list being read // . use >= m_minRecSizes instead of >, otherwise we may // never be able to set "size" in RdbList::constrain() // because "p" could equal "maxPtr" right away while ( h2 > p1 && (KEYCMP(map->getKeyPtr(h2),m_constrainKey,m_ks)>0 || map->getOffset(h2) == -1 || map->getAbsoluteOffset(h2) - offset >= m_minRecSizes ) ) { h2--; } // now set the hint m_scan[i].m_hintOffset = map->getAbsoluteOffset(h2) - map->getAbsoluteOffset(p1); KEYSET(m_scan[i].m_hintKey, map->getKeyPtr(h2), m_ks); // reset g_errno before calling setRead() g_errno = 0; // timing debug if ( g_conf.m_logTimingDb ) log(LOG_TIMING, "net: msg: reading %" PRId64" bytes from %s file #%" PRId32" " "(niceness=%" PRId32")", bytesToRead,base->getDbName(),i,m_niceness); // log huge reads, those hurt us if ( bytesToRead > 150000000 ) { logf(LOG_INFO,"disk: Reading %" PRId64" bytes at offset %" PRId64" " "from %s.", bytesToRead,offset,base->getDbName()); } if(bytesToRead > 10000000 && bytesToRead / 2 > m_minRecSizes && base->getFixedDataSize() >= 0) { // if any keys in the map are the same report corruption char tmpKey [MAX_KEY_BYTES]; char lastTmpKey[MAX_KEY_BYTES]; int32_t ccount = 0; for(int32_t pn = p1; pn <= p2; pn++) { map->getKey ( pn , tmpKey ); if(pn!=p1 && KEYCMP(tmpKey,lastTmpKey,m_ks) == 0) ccount++; memcpy(lastTmpKey,tmpKey,sizeof(tmpKey)); } if(ccount > 10) { logf(LOG_INFO,"disk: Reading %" PRId32" bytes from %s fileId=" "%" PRId32" when min " "required is %" PRId32". Map is corrupt and has %" PRId32" " "identical consecutive page keys because the " "map was \"repaired\" because out of order keys " "in the index.", (int32_t)bytesToRead, base->getDbName(), m_scan[i].m_fileId, (int32_t)m_minRecSizes, (int32_t)ccount); incrementScansCompleted(); m_errno = ECORRUPTDATA; m_hadCorruption = true; //m_maxRetries = 0; break; } } //////// // // try to get from PAGE CACHE // //////// m_scan[i].m_inPageCache = false; BigFile *ff = base->getFileById(m_scan[i].m_fileId); if (!ff) { logError("net: msg3: getFileById with fileId=%" PRId32" returns NULL. Bad engineer.", m_scan[i].m_fileId); continue; } RdbCache *rpc = getDiskPageCache ( m_rdbId ); if(rpc) { // . vfd is unique 64 bit file id // . if file is opened vfd is -1, only set in call to open() int64_t vfd = ff->getVfd(); key192_t ck = makeCacheKey ( vfd , offset, bytesToRead); char *rec; int32_t recSize; bool inCache = false; RdbCacheLock rcl(*rpc); if ( vfd != -1 && ! m_validateCache ) inCache = rpc->getRecord ( (collnum_t)0 , // collnum (char *)&ck , &rec , &recSize , true , // copy? -1 , // maxAge, none true ); // inccounts? if ( inCache ) { m_scan[i].m_inPageCache = true; incrementScansCompleted(); // now we have to store this value, 6 or 12 so // we can modify the hint appropriately m_scan[i].m_shiftCount = *rec; m_scan[i].m_list.set ( rec +1, recSize-1 , rec , // alloc recSize , // allocSize startKey2 , endKey2 , base->getFixedDataSize() , true , // owndata base->useHalfKeys() , getKeySizeFromRdbId ( m_rdbId ) ); continue; } } // . do the scan/read of file #i // . this returns false if blocked, true otherwise // . this will set g_errno on error bool done = m_scan[i].m_scan.setRead(ff, base->getFixedDataSize(), offset, bytesToRead, startKey2, endKey2, m_ks, &m_scan[i].m_list, callback ? this : NULL, callback ? &doneScanningWrapper0 : NULL, base->useHalfKeys(), m_rdbId, m_niceness, true); // if it did not block then it completed, so count it if (done) { incrementScansCompleted(); } // break on an error, and remember g_errno in case we block if ( g_errno ) { int32_t tt = LOG_WARN; if ( g_errno == EFILECLOSED ) tt = LOG_INFO; log(tt,"disk: Reading %s had error: %s.", base->getDbName(), mstrerror(g_errno)); m_errno = g_errno; break; } } { ScopedLock sl(m_mtxScanCounters); m_scansBeingSubmitted = false; if(m_numScansStarted!=m_numScansCompleted) return false; //not completed yet } // . if all scans completed without blocking then wrap it up & ret true // . doneScanning may now block if it finds data corruption and must // get the list remotely verify_signature(); return doneScanning(); } void Msg3::doneScanningWrapper0(void *state) { Msg3 *THIS = (Msg3 *) state; THIS->doneScanningWrapper(); } void Msg3::doneScanningWrapper() { verify_signature(); // log(LOG_TRACE,"Msg3(%p)::doneScqanningWrapper()",THIS); bool done = incrementScansCompleted(); // if we had an error, remember it if ( g_errno ) { // get base, returns NULL and sets g_errno to ENOCOLLREC on err RdbBase *base = getRdbBase( m_rdbId, m_collnum ); const char *dbname = "NOT FOUND"; if ( base ) { dbname = base->getDbName(); } int32_t tt = LOG_WARN; if ( g_errno == EFILECLOSED ) { tt = LOG_INFO; } log(tt,"net: Reading %s had error: %s.", dbname,mstrerror(g_errno)); m_errno = g_errno; g_errno = 0; } // return now if we're awaiting more scan completions if ( !done ) { return; } // . give control to doneScanning // . return if it blocks if ( !doneScanning() ) { return; } // if one of our lists was *huge* and could not alloc mem, it was // due to corruption if ( m_hadCorruption ) { g_errno = ECORRUPTDATA; } // if it doesn't block call the callback, g_errno may be set verify_signature(); m_callback ( m_state ); } // . but now that we may get a list remotely to fix data corruption, // this may indeed block bool Msg3::doneScanning ( ) { verify_signature(); // . did we have any error on any scan? // . if so, repeat ALL of the scans g_errno = m_errno; // 2 retry is the default // int32_t max = 2; // see if explicitly provided by the caller // if ( m_maxRetries >= 0 ) max = m_maxRetries; // now use -1 (no max) as the default no matter what int32_t max = -1; // ENOMEM is particulary contagious, so watch out with it... if ( g_errno == ENOMEM && m_maxRetries == -1 ) max = 0; // msg0 sets maxRetries to 2, don't let max stay set to -1 if ( g_errno == ENOMEM && m_maxRetries != -1 ) max = m_maxRetries; // when thread cannot alloc enough read buf it keeps the read buf // set to NULL and BigFile.cpp sets g_errno to EBUFTOOSMALL if ( g_errno == EBUFTOOSMALL && m_maxRetries == -1 ) max = 0; // msg0 sets maxRetries to 2, don't let max stay set to -1 if ( g_errno == EBUFTOOSMALL && m_maxRetries != -1 ) max = m_maxRetries; // this is set above if the map has the same consecutive key repeated // and the read is enormous if ( g_errno == ECORRUPTDATA ) max = 0; // usually bad disk failures, don't retry those forever //if ( g_errno == EIO ) max = 3; // no, now our hitachis return these even when they're good so // we have to keep retrying forever if ( g_errno == EIO ) max = -1; // count these so we do not take drives offline just because // kernel ring buffer complains... if ( g_errno == EIO ) g_numIOErrors++; // bail early on high priority reads for these errors if ( g_errno == EIO && m_niceness == 0 ) max = 0; // on I/O, give up at call it corrupt after a while. some hitachis // have I/O errros on little spots, like gk88, maybe we can fix him if ( g_errno == EIO && m_retryNum >= 5 ) { m_errno = ECORRUPTDATA; m_hadCorruption = true; // do not do any retries any more max = 0; } verify_signature(); // convert m_errno to ECORRUPTDATA if it is EBUFTOOSMALL and the // max of the bytesToRead are over 500MB. // if bytesToRead was ludicrous, then assume that the data file // was corrupted, the map was regenerated and it patched // over the corrupted bits which were 500MB or more in size. // we cannot practically allocate that much, so let's just // give back an empty buffer. treat it like corruption... // the way it patches is to store the same key over all the corrupted // pages, which can get pretty big. so if you read a range with that // key you will be hurting!! // this may be the same scenario as when the rdbmap has consecutive // same keys. see above where we set m_errno to ECORRUPTDATA... if ( g_errno == EBUFTOOSMALL ) { int32_t biggest = 0; for ( int32_t i = 0 ; i < m_numFileNums ; i++ ) { if ( m_scan[i].m_scan.getBytesToRead() < biggest ) continue; biggest = m_scan[i].m_scan.getBytesToRead(); } if ( biggest > 500000000 ) { log(LOG_WARN,"db: Max read size was %" PRId32" > 500000000. Assuming " "corrupt data in data file.",biggest); m_errno = ECORRUPTDATA; m_hadCorruption = true; // do not do any retries on this, the read was > 500MB max = 0; } } // if shutting down gb then limit to 20 so we can shutdown because // it can't shutdown until all threads are out of the queue i think if (g_process.isShuttingDown() && max < 0) { //log("msg3: forcing retries to 0 because shutting down"); max = 0; } // get base, returns NULL and sets g_errno to ENOCOLLREC on error RdbBase *base = getRdbBase( m_rdbId, m_collnum ); if ( ! base ) { return true; } // this really slows things down because it blocks the cpu so // leave it out for now #ifdef GBSANITYCHECK // check for corruption here, do not do it again in Msg5 if we pass if ( ! g_errno ) { // && g_conf.m_doErrorCorrection ) { int32_t i; for ( i = 0 ; i < m_numFileNums ; i++ ) if ( ! m_scan[i].m_list.checkList_r ( false ) ) break; if ( i < m_numFileNums ) { g_errno = ECORRUPTDATA; m_errno = ECORRUPTDATA; max = g_conf.m_corruptRetries; // try 100 times log(LOG_WARN,"db: Encountered corrupt list in file %s.", base->getFile(m_scan[i].m_fileNum)->getFilename()); } else m_listsChecked = true; } #endif verify_signature(); // try to fix this error i've seen if ( g_errno == EBADENGINEER && max == -1 ) max = 100; // . if we had a ETRYAGAIN error, then try again now // . it usually means the whole file or a part of it was deleted // before we could finish reading it, so we should re-read all now // . RdbMerge deletes BigFiles after it merges them // . now that we have threads i'd imagine we'd get EBADFD or something // . i've also seen "illegal seek" as well if ( m_errno && (m_retryNum < max || max < 0) ) { // print the error static time_t s_time = 0; time_t now = getTime(); if ( now - s_time > 5 ) { log(LOG_WARN, "net: Had error reading %s: %s. Retrying. (retry #%" PRId32")", base->getDbName(),mstrerror(m_errno) , m_retryNum ); s_time = now; } // send email alert if in an infinite loop, but don't send // more than once every 2 hours static int32_t s_lastSendTime = 0; if ( m_retryNum == 100 && getTime() - s_lastSendTime > 3600*2){ // remove this for now it is going off all the time //g_pingServer.sendEmail(NULL,//g_hostdb.getMyHost(), // "100 read retries",true); s_lastSendTime = getTime(); } // clear g_errno cuz we should for call to readList() g_errno = 0; // free the list buffer since if we have 1000 Msg3s retrying // it will totally use all of our memory for ( int32_t i = 0 ; i < m_numChunks ; i++ ) m_scan[i].m_list.destructor(); // count retries m_retryNum++; // backoff scheme, wait 200ms more each time int32_t wait ; if ( m_retryNum == 1 ) { wait = 10; } else { wait = 200 * m_retryNum; } // . don't wait more than 10 secs between tries // . i've seen gf0 and gf16 get mega saturated if ( wait > 10000 ) { wait = 10000; } // wait if (g_loop.registerSleepCallback(wait, this, doneSleepingWrapper3, "Msg3::doneSleepingWrapper3", m_niceness)) { return false; } // otherwise, registration failed log(LOG_ERROR, "net: Failed to register sleep callback for retry. " "Abandoning read. This is bad."); // return, g_errno should be set g_errno = EBUFTOOSMALL; m_errno = EBUFTOOSMALL; return true; } verify_signature(); // if we got an error and should not retry any more then give up if ( g_errno ) { log(LOG_ERROR, "net: Had error reading %s: %s. Giving up after %" PRId32" " "retries.", base->getDbName(),mstrerror(g_errno) , m_retryNum ); return true; } // note it if the retry finally worked if ( m_retryNum > 0 ) log(LOG_INFO,"disk: Read succeeded after retrying %" PRId32" times.", (int32_t)m_retryNum); // count total bytes for logging int32_t count = 0; // . constrain all lists to make merging easier // . if we have only one list, then that's nice cuz the constrain // will allow us to send it right away w/ zero copying // . if we have only 1 list, it won't be merged into a final list, // that is, we'll just set m_list = &m_scan[i].m_list for ( int32_t i = 0 ; i < m_numFileNums ; i++ ) { // count total bytes for logging count += m_scan[i].m_list.getListSize(); if(!m_scan[i].m_inPageCache) m_scan[i].m_shiftCount = m_scan[i].m_scan.shiftCount(); // . hint offset is relative to the offset of first key we read // . if that key was only 6 bytes RdbScan shift the list buf // down 6 bytes to make the first key 12 bytes... a // requirement for all RdbLists // . don't inc it, though, if it was 0, pointing to the start // of the list because our shift won't affect that if ( m_scan[i].m_shiftCount == 6 && m_scan[i].m_hintOffset > 0 ) m_scan[i].m_hintOffset += 6; // posdb double compression if ( m_scan[i].m_shiftCount == 12 && m_scan[i].m_hintOffset > 0 ) m_scan[i].m_hintOffset += 12; // . don't constrain on minRecSizes here because it may // make our endKey smaller, which will cause problems // when Msg5 merges these lists. // . If all lists have different endKeys RdbList's merge // chooses the min and will merge in recs beyond that // causing a bad list BECAUSE we don't check to make // sure that recs we are adding are below the endKey // . if we only read from one file then constrain based // on minRecSizes so we can send the list back w/o merging // OR if just merging with RdbTree's list int32_t mrs ; // . constrain to m_minRecSizesOrig, not m_minRecSizes cuz // that could be adjusted by compensateForNegativeRecs() // . but, really, they should be the same if we only read from // the root file if ( m_numFileNums == 1 ) mrs = m_minRecSizesOrig; else mrs = -1; // . this returns false and sets g_errno on error // . like if data is corrupt BigFile *ff = base->getFileById(m_scan[i].m_fileId); // if we did a merge really quick and delete one of the // files we were reading, i've seen 'ff' be NULL const char *filename = "lostfilename"; if ( ff ) filename = ff->getFilename(); // compute cache info RdbCache *rpc = getDiskPageCache ( m_rdbId ); int64_t vfd ; if ( ff ) vfd = ff->getVfd(); key192_t ck ; if ( ff ) ck = makeCacheKey ( vfd , m_scan[i].m_scan.getOffset(), m_scan[i].m_scan.getBytesToRead() ); if ( m_validateCache && ff && rpc && vfd != -1 ) { bool inCache; char *rec; int32_t recSize; RdbCacheLock rcl(*rpc); inCache = rpc->getRecord ( (collnum_t)0 , // collnum (char *)&ck , &rec , &recSize , true , // copy? -1 , // maxAge, none true ); // inccounts? if ( inCache && // 1st byte is RdbScan::m_shifted ( m_scan[i].m_list.getListSize() != recSize-1 || memcmp ( m_scan[i].m_list.getList() , rec+1,recSize-1) != 0 || *rec != m_scan[i].m_scan.shiftCount() ) ) { log(LOG_ERROR, "msg3: cache did not validate"); g_process.shutdownAbort(true); } mfree ( rec , recSize , "vca" ); } /////// // // STORE IN PAGE CACHE // /////// // store what we read in the cache. don't bother storing // if it was a retry, just in case something strange happened. // store pre-constrain call is more efficient. if ( m_retryNum<=0 && ff && rpc && vfd != -1 && ! m_scan[i].m_inPageCache ) { RdbCacheLock rcl(*rpc); char tmpShiftCount = m_scan[i].m_scan.shiftCount(); rpc->addRecord ( (collnum_t)0 , // collnum (char *)&ck , // rec1 is this little thingy &tmpShiftCount, 1, // rec2 m_scan[i].m_list.getList() , m_scan[i].m_list.getListSize() , 0 ); // timestamp. 0 = now } if (!m_scan[i].m_list.constrain(m_startKey, m_constrainKey, mrs, m_scan[i].m_hintOffset, m_scan[i].m_hintKey, m_rdbId, filename)) { log(LOG_WARN, "net: Had error while constraining list read from %s: %s/%s. vfd=%" PRId32" parts=%" PRId32". " "This is likely caused by corrupted data on disk.", mstrerror(g_errno), ff->getDir(), ff->getFilename(), ff->getVfd(), (int32_t)ff->getNumParts() ); continue; } } // print the time if ( g_conf.m_logTimingDb ) { int64_t now = gettimeofdayInMilliseconds(); int64_t took = now - m_startTime; log(LOG_TIMING, "net: Took %" PRId64" ms to read %" PRId32" lists of %" PRId32" bytes total" " from %s (niceness=%" PRId32").", took,m_numFileNums,count,base->getDbName(),m_niceness); } verify_signature(); return true; } void Msg3::doneSleepingWrapper3 ( int fd , void *state ) { Msg3 *THIS = (Msg3 *)state; THIS->doneSleepingWrapper3(); } void Msg3::doneSleepingWrapper3() { verify_signature(); // now try reading again if ( ! doneSleeping ( ) ) return; // if it doesn't block call the callback, g_errno may be set m_callback ( m_state ); } bool Msg3::doneSleeping ( ) { verify_signature(); // unregister g_loop.unregisterSleepCallback(this,doneSleepingWrapper3); // read again if ( ! readList ( m_rdbId , m_collnum , m_startKey , m_endKeyOrig , m_minRecSizesOrig , m_startFileNum , m_numFiles , m_state , m_callback , m_niceness , m_retryNum , m_maxRetries , false ) ) return false; return true; } // . returns a new, smaller endKey // . shrinks endKey while still preserving the minRecSizes requirement // . this is the most confusing subroutine in the project // . this now OVERWRITES endKey with the new one void Msg3::setPageRanges(RdbBase *base) { verify_signature(); // sanity check //if ( m_ks != 12 && m_ks != 16 ) { g_process.shutdownAbort(true); } // . initialize the startpg/endpg for each file // . we read from the first offset on m_startpg to offset on m_endpg // . since we set them equal that means an empty range for each file for ( int32_t i = 0 ; i < m_numFileNums ; i++ ) { RdbMap *map = base->getMapById(m_scan[i].m_fileId); if (!map) { gbshutdownLogicError(); } m_scan[i].m_startpg = map->getPage(m_fileStartKey); m_scan[i].m_endpg = m_scan[i].m_startpg; } // just return if minRecSizes 0 (no reading needed) if ( m_minRecSizes <= 0 ) return; // calculate minKey minus one char lastMinKey[MAX_KEY_BYTES]; bool lastMinKeyIsValid = false; // loop until we find the page ranges that barely satisfy "minRecSizes" for(;;) { // find the map whose next page has the lowest key int32_t minpg = -1; char minKey[MAX_KEY_BYTES]; for (int32_t i = 0; i < m_numFileNums; i++) { RdbMap *map = base->getMapById(m_scan[i].m_fileId); if (!map) { gbshutdownLogicError(); } // this guy is out of race if his end key > "endKey" already if (KEYCMP(map->getKeyPtr(m_scan[i].m_endpg), m_endKey, m_ks) > 0) { continue; } // get the next page after m_scan[i].m_endpg int32_t nextpg = m_scan[i].m_endpg + 1; // if endpg[i]+1 == m_numPages then we maxed out this range if (nextpg > map->getNumPages()) { continue; } // . but this may have an offset of -1 // . which means the page has no key starting on it and // it's occupied by a rec which starts on a previous page while (nextpg < map->getNumPages() && map->getOffset(nextpg) == -1) { nextpg++; } // . continue if his next page doesn't have the minimum key // . if nextpg == getNumPages() then it returns the LAST KEY // contained in the corresponding RdbFile if (minpg != -1 && KEYCMP(map->getKeyPtr(nextpg), minKey, m_ks) > 0) { continue; } // . we got a winner, his next page has the current min key // . if m_scan[i].m_endpg+1 == getNumPages() then getKey() returns the // last key in the mapped file // . minKey should never equal the key on m_scan[i].m_endpg UNLESS // it's on page #m_numPages KEYSET(minKey,map->getKeyPtr(nextpg),m_ks); minpg = i; // if minKey is same as the current key on this endpg, inc it // so we cause some advancement, otherwise, we'll loop forever if (KEYCMP(minKey, map->getKeyPtr(m_scan[i].m_endpg), m_ks) != 0) { continue; } KEYINC(minKey,m_ks); } // . we're done if we hit the end of all maps in the race if ( minpg == -1 ) { return; } // sanity check if (lastMinKeyIsValid && KEYCMP(minKey, lastMinKey, m_ks) <= 0) { g_errno = ECORRUPTDATA; log(LOG_ERROR, "db: Got corrupted map in memory for %s. This is almost " "always because of bad memory. Please replace your RAM.", base->getDbName()); gbshutdownCorrupted(); } // don't let minKey exceed endKey, however if (KEYCMP(minKey, m_endKey, m_ks) > 0) { KEYSET(minKey, m_endKey, m_ks); KEYINC(minKey, m_ks); KEYSET(lastMinKey, m_endKey, m_ks); } else { KEYSET(lastMinKey, minKey, m_ks); KEYDEC(lastMinKey, m_ks); } // it is now valid lastMinKeyIsValid = true; // . advance m_scan[i].m_endpg so that next page < minKey // . we want to read UP TO the first key on m_scan[i].m_endpg for ( int32_t i = 0 ; i < m_numFileNums ; i++ ) { RdbMap *map = base->getMapById(m_scan[i].m_fileId); if (!map) { gbshutdownLogicError(); } m_scan[i].m_endpg = map->getEndPage(m_scan[i].m_endpg, lastMinKey); } // . if the minKey is BIGGER than the provided endKey we're done // . we don't necessarily include records whose key is "minKey" if (KEYCMP(minKey, m_endKey, m_ks) > 0) { return; } // . calculate recSizes per page within [startKey,minKey-1] // . compute bytes of records in [startKey,minKey-1] for each map // . this includes negative records so we may have annihilations // when merging into "diskList" and get less than what we wanted // but endKey should be shortened, so our caller will know to call // again if he wants more int32_t recSizes = 0; for ( int32_t i = 0 ; i < m_numFileNums ; i++ ) { RdbMap *map = base->getMapById(m_scan[i].m_fileId); if (!map) { gbshutdownLogicError(); } recSizes += map->getMinRecSizes(m_scan[i].m_startpg, m_scan[i].m_endpg, m_fileStartKey, lastMinKey, false); } // if we hit it then return minKey -1 so we only read UP TO "minKey" // not including "minKey" if ( recSizes >= m_minRecSizes ) { // . sanity check // . this sanity check fails sometimes, but leave it // out for now... causes the Illegal endkey msgs in // RdbList::indexMerge_r() //if ( KEYNEG(lastMinKey) ) { g_process.shutdownAbort(true); } KEYSET(m_endKey,lastMinKey,m_ks); //return lastMinKey; return; } } } // . we now boost m_minRecSizes to account for negative recs in certain files // . TODO: use floats for averages, not ints void Msg3::compensateForNegativeRecs ( RdbBase *base ) { verify_signature(); // add up counts from each map int64_t totalNegatives = 0; int64_t totalPositives = 0; int64_t totalFileSize = 0; for (int32_t i = 0 ; i < m_numFileNums ; i++) { int32_t fileId = m_scan[i].m_fileId; RdbMap *map = base->getMapById(fileId); if (!map) { log(LOG_LOGIC,"net: msg3: getMapById with fileId=%" PRId32" returns NULL. bad engineer.", fileId); continue; } totalNegatives += map->getNumNegativeRecs(); totalPositives += map->getNumPositiveRecs(); totalFileSize += map->getFileSize(); } // add em all up int64_t totalNumRecs = totalNegatives + totalPositives; // if we have no records on disk, why are we reading from disk? if ( totalNumRecs == 0 ) return ; // what is the size of a negative record? int32_t negRecSize = m_ks; if ( base->getFixedDataSize() == -1 ) negRecSize += 4; // what is the size of all positive recs combined? int64_t posFileSize = totalFileSize - negRecSize * totalNegatives; // . we often overestimate the size of the negative recs for indexdb // because it uses half keys... // . this can make posFileSize go negative and ultimately result in // a newMin of 0x7fffffff which really fucks us up if ( posFileSize < 0 ) posFileSize = 0; // what is the average size of a positive record? int32_t posRecSize = 0; if ( totalPositives > 0 ) posRecSize = posFileSize / totalPositives; // we annihilate the negative recs and their positive pairs int64_t loss = totalNegatives * (negRecSize + posRecSize); // what is the percentage lost? int64_t lostPercent = (100LL * loss) / totalFileSize; // how much more should we read to compensate? int32_t newMin = ((int64_t)m_minRecSizes * (lostPercent + 100LL))/100LL; // newMin will never be smaller unless it overflows if ( newMin < m_minRecSizes ) newMin = 0x7fffffff; // print msg if we changed m_minRecSizes //if ( newMin != m_minRecSizes ) // log("Msg3::compensated from minRecSizes from %" PRId32" to %" PRId32, // m_minRecSizes, newMin ); // set the new min m_minRecSizes = newMin; }