#include "Msg3.h"
|
|
#include "Rdb.h"
|
|
#include "Stats.h" // for timing and graphing merge time
|
|
#include "RdbCache.h"
|
|
#include "Process.h"
|
|
#include "GbMutex.h"
|
|
#include "ScopedLock.h"
|
|
#include "Sanity.h"
|
|
#include "Conf.h"
|
|
#include "Mem.h"
|
|
#include "Errno.h"
|
|
#include <new>
|
|
#include <cerrno>
|
|
|
|
static const int signature_init = 0x1f2b3a4c;

int32_t g_numIOErrors = 0;


Msg3::Scan::Scan()
	: m_scan(),
	  m_startpg(0), m_endpg(0),
	  m_hintOffset(0), m_fileId(0),
	  m_inPageCache(false),
	  m_shiftCount(0),
	  m_list()
{
	memset(m_hintKey,0,sizeof(m_hintKey));
}


Msg3::Msg3()
	: m_scan(NULL),
	  m_numScansStarted(0),
	  m_numScansCompleted(0),
	  m_scansBeingSubmitted(false)
{
	// log(LOG_TRACE,"Msg3(%p)::Msg3()",this);
	set_signature();
	memset(m_constrainKey, 0, sizeof(m_constrainKey));
	memset(m_startKey, 0, sizeof(m_startKey));
	memset(m_endKey, 0, sizeof(m_endKey));
	memset(m_endKeyOrig, 0, sizeof(m_endKeyOrig));
	memset(m_hintKey, 0, sizeof(m_hintKey));
	reset();
}

Msg3::~Msg3() {
	verify_signature();
	// log(LOG_TRACE,"Msg3(%p)::~Msg3()",this);
	reset();
	clear_signature();
}


void Msg3::reset() {
	verify_signature();
	// log(LOG_TRACE,"Msg3(%p)::reset()",this);
	if ( !areAllScansCompleted() ) {
		g_process.shutdownAbort(true);
	}
	m_hadCorruption = false;
	// reset # of lists to 0
	m_numScansCompleted = 0;
	m_numScansStarted = 0;
	if ( m_scan ) {
		// Mem.cpp has bad logic concerning arrays
		//mdelete(m_scan, -1, "Msg3:scan");
		delete[] m_scan;
		m_scan = NULL;
	}

	// Coverity
	m_rdbId = RDB_NONE;
	m_collnum = 0;
	m_validateCache = false;
	m_startFileNum = 0;
	m_numFiles = 0;
	m_numFileNums = 0;
	m_fileStartKey = NULL;
	m_minRecSizes = 0;
	m_minRecSizesOrig = 0;
	m_niceness = 0;
	m_errno = 0;
	m_retryNum = 0;
	m_maxRetries = 0;
	m_startTime = 0;
	m_hintOffset = 0;
	m_numChunks = 0;
	m_ks = 0;
	m_listsChecked = false;
	m_hadCorruption = false;
	m_state = NULL;
	m_callback = NULL;
	verify_signature();
}

void Msg3::incrementScansStarted() {
	ScopedLock sl(m_mtxScanCounters);
	m_numScansStarted++;
	if(m_numScansCompleted>=m_numScansStarted) gbshutdownLogicError();
}

bool Msg3::incrementScansCompleted() {
	ScopedLock sl(m_mtxScanCounters);
	m_numScansCompleted++;
	if(m_numScansCompleted>m_numScansStarted) gbshutdownLogicError();
	return m_numScansCompleted==m_numScansStarted && !m_scansBeingSubmitted;
}

bool Msg3::areAllScansCompleted() const {
	ScopedLock sl(const_cast<GbMutex&>(m_mtxScanCounters));
	return (!m_scansBeingSubmitted) && (m_numScansCompleted==m_numScansStarted);
}
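
// Why m_scansBeingSubmitted exists: scans submitted early in readList() can
// complete (synchronously, or on an I/O thread) while later scans are still
// being submitted. Without the flag, started == completed could transiently
// hold mid-submission and fire the "all done" path too early. A sketch of
// the pattern readList() follows below (this is the existing logic, not an
// extra API):
//
//   { ScopedLock sl(m_mtxScanCounters); m_scansBeingSubmitted = true; }
//   for each readable file:
//       incrementScansStarted();
//       submit the RdbScan (may complete immediately)
//   { ScopedLock sl(m_mtxScanCounters); m_scansBeingSubmitted = false; }
//
// incrementScansCompleted() only reports "all done" once the flag is clear.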

static key192_t makeCacheKey(int64_t vfd,
                             int64_t offset,
                             int64_t readSize) {
	key192_t k;
	k.n2 = vfd;
	k.n1 = readSize;
	k.n0 = offset;
	return k;
}
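
// The cache key packs the unique file id (vfd), the read size and the file
// offset into one 192-bit key, so a cached entry matches only the exact same
// read of the exact same file. For example (illustrative values only):
//   key192_t ck = makeCacheKey(7, 4096, 65536);
//   // ck.n2 = 7 (vfd), ck.n1 = 65536 (readSize), ck.n0 = 4096 (offset)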

static RdbCache g_rdbCaches[5];
static GbMutex s_rdbcacheMutex; // protects g_rdbCaches

class RdbCache *getDiskPageCache ( rdbid_t rdbId ) {
	RdbCache *rpc = NULL;
	int64_t maxMem;
	int64_t maxRecs;
	const char *dbname;
	switch(rdbId) {
		case RDB_POSDB:
			rpc = &g_rdbCaches[0];
			maxMem = g_conf.m_posdbFileCacheSize;
			maxRecs = maxMem / 5000;
			dbname = "posdbcache";
			break;
		case RDB_TAGDB:
			rpc = &g_rdbCaches[1];
			maxMem = g_conf.m_tagdbFileCacheSize;
			maxRecs = maxMem / 200;
			dbname = "tagdbcache";
			break;
		case RDB_CLUSTERDB:
			rpc = &g_rdbCaches[2];
			maxMem = g_conf.m_clusterdbFileCacheSize;
			maxRecs = maxMem / 32;
			dbname = "clustcache";
			break;
		case RDB_TITLEDB:
			rpc = &g_rdbCaches[3];
			maxMem = g_conf.m_titledbFileCacheSize;
			maxRecs = maxMem / 3000;
			dbname = "titdbcache";
			break;
		case RDB_SPIDERDB_DEPRECATED:
			rpc = &g_rdbCaches[4];
			maxMem = g_conf.m_spiderdbFileCacheSize;
			maxRecs = maxMem / 3000;
			dbname = "spdbcache";
			break;
		default:
			return NULL;
	}

	if ( maxMem < 0 ) maxMem = 0;

	ScopedLock sl(s_rdbcacheMutex);
	// did size change? if not, return it
	if ( rpc->getMaxMem() == maxMem )
		return rpc;

	// re-init or init for the first time here
	if ( ! rpc->init ( maxMem ,
	                   -1 , // fixedDataSize. -1 since we are lists
	                   maxRecs ,
	                   dbname ,
	                   false , // loadfromdisk
	                   sizeof(key192_t), // cache key size
	                   -1 ) ) // numptrsmax
		return NULL;

	return rpc;
}
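
// The maxMem / N divisors above are rough average record sizes per database,
// used to derive how many cache slots to allocate. A worked example, assuming
// a 50 MB posdb file cache configured in g_conf.m_posdbFileCacheSize:
//   maxMem  = 50,000,000
//   maxRecs = 50,000,000 / 5000 = 10,000 cached reads
// The cache is lazily (re)initialized whenever the configured size changes,
// so shrinking or growing the setting at runtime takes effect on the next call.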

// . return false if blocked, true otherwise
// . set g_errno on error
// . read list of keys in [startKey,endKey] range
// . read at least "minRecSizes" bytes of keys in that range
// . the "m_endKey" of the resulting, merged list may have a smaller endKey
//   than the argument, "endKey", due to limitation by "minRecSizes"
// . resulting list will contain ALL keys between ITS [m_startKey,m_endKey]
// . final merged list "should" try to have a size of at least "minRecSizes"
//   but due to negative/positive rec elimination may be less
// . the endKey of the lists we read may be <= "endKey" provided
// . we try to shrink the endKey if minRecSizes is >= 0 in order to
//   avoid excessive reading
// . by shrinking the endKey we cannot take into account the size of deleted
//   records, so we may fall short of "minRecSizes" in actuality;
//   in fact, the returned list may even be empty with a shrunken endKey
// . we merge all lists read from disk into the provided "list"
// . caller should call Msg3::getList(int32_t i) and Msg3::getNumLists() to retrieve
// . this makes the query engine faster since we don't need to merge the docIds
//   and can just send them across the network separately and they will be
//   hashed into IndexTable's table w/o having to do time-wasting merging
// . caller can specify an array of filenums to read from so incremental syncing
//   in the Sync class can just read from titledb*.dat files that were formed
//   since the last sync point
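// A hedged usage sketch (hypothetical caller; "state" and "doneWrapper" are
// not part of this file): read everything in posdb across all files:
//
//   Msg3 msg3;
//   char startKey[MAX_KEY_BYTES]; memset(startKey, 0, sizeof(startKey));
//   char endKey  [MAX_KEY_BYTES]; KEYMAX(endKey, getKeySizeFromRdbId(RDB_POSDB));
//   if ( msg3.readList ( RDB_POSDB, collnum,
//                        startKey, endKey,
//                        5000000,      // minRecSizes
//                        0, -1,        // startFileNum=0, numFiles=-1 (all)
//                        state, doneWrapper,
//                        0,            // niceness
//                        0, -1,        // retryNum, maxRetries (-1 = default)
//                        false ) )     // justGetEndKey
//       // completed without blocking; lists are ready now
//
// readList() returns false when the disk scans are still in flight, in which
// case doneWrapper(state) is called when they finish.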
bool Msg3::readList ( rdbid_t rdbId,
                      collnum_t collnum ,
                      const char *startKeyArg ,
                      const char *endKeyArg ,
                      int32_t minRecSizes , // max size of scan
                      int32_t startFileNum , // first file to scan
                      int32_t numFiles , // rel. to startFileNum
                      void *state , // for callback
                      void (* callback ) ( void *state ) ,
                      int32_t niceness ,
                      int32_t retryNum ,
                      int32_t maxRetries ,
                      bool justGetEndKey) {
	verify_signature();

	// reset m_alloc and data in all lists in case we are a re-call
	reset();

	// set this to true to validate
	m_validateCache = false;

	// clear, this MUST be done so if we return true g_errno is correct
	g_errno = 0;

	// assume lists are not checked for corruption
	m_listsChecked = false;
	// warn
	if ( minRecSizes < -1 ) {
		log(LOG_LOGIC,"db: Msg3 got minRecSizes of %" PRId32", changing to -1.",minRecSizes);
		minRecSizes = -1;
	}

	// warning
	if ( collnum < 0 ) {
		log(LOG_LOGIC,"net: NULL collection. msg3.");
	}

	// remember the callback
	m_rdbId = rdbId;
	m_collnum = collnum;
	m_callback = callback;
	m_state = state;
	m_niceness = niceness;
	m_numScansCompleted = 0;
	m_retryNum = retryNum;
	m_maxRetries = maxRetries;
	m_hadCorruption = false;
	// get keySize of rdb
	m_ks = getKeySizeFromRdbId ( m_rdbId );
	// reset the group error
	m_errno = 0;
	// . ensure startKey last bit clear, endKey last bit set
	// . no! this warning is now only in Msg5
	// . if RdbMerge is merging some files, not involving the root
	//   file, then we can expect to get a lot of unmatched negative recs.
	// . as a consequence, our endKeys may often be negative. This means
	//   it may not annihilate with the positive key, but we should only
	//   miss like this at the boundaries of the lists we fetch.
	// . so in that case RdbList::merge will stop merging once the
	//   minRecSizes limit is reached even if it means ending on a negative
	//   rec key
	if ( !KEYNEG(startKeyArg) )
		log(LOG_REMIND,"net: msg3: StartKey lastbit set.");
	if ( KEYNEG(endKeyArg) )
		log(LOG_REMIND,"net: msg3: EndKey lastbit clear.");

	// get base, returns NULL and sets g_errno to ENOCOLLREC on error
	RdbBase *base = getRdbBase( m_rdbId, m_collnum );
	if ( ! base ) {
		return true;
	}

	// save startFileNum here, just for recall
	m_startFileNum = startFileNum;
	m_numFiles = numFiles;

	if ( g_conf.m_logDebugQuery )
		log(LOG_DEBUG,
		    "net: msg3: "
		    "sfn=%" PRId32" nf=%" PRId32" db=%s.",
		    (int32_t)startFileNum,
		    (int32_t)numFiles,base->getDbName());

	// If we have a merge going on then a tmp.merge.file exists in the files of
	// RdbBase. The tmp-merge file or the source files are marked unreadable
	// (because they are unfinished or about to be deleted).
	// The input parameters startFileNum and numFiles are very general, but in reality
	// they are either (0,-1) or (x,1), that is: read all files or a single one. So that
	// is what we will support as best we can. The non-happening corner cases about
	// overlapping ranges with the merge etc. are handled on a best-effort basis,
	// viz. the result may be incomplete.
	if(numFiles==-1) {
		// all files
		m_numChunks = base->getNumFiles() - startFileNum;
	} else {
		// a specific range (typically just a single file)
		m_numChunks = numFiles;
		if(startFileNum+m_numChunks > base->getNumFiles())
			m_numChunks = base->getNumFiles() - startFileNum;
	}
	if(m_numChunks<0) {
		// can happen if a merge finishes and deletes files between the upper-logic
		// iteration over files and this calculation.
		m_numChunks = 0;
	}

	try {
		m_scan = new Scan[m_numChunks];
	} catch(std::bad_alloc&) {
		log(LOG_WARN, "disk: Could not allocate %d 'Scan' structures to read %s.",m_numChunks,base->getDbName());
		g_errno = ENOMEM;
		return true;
	}

	// store the file numbers in the scan array, these are the files we read
	m_numFileNums = 0;
	for (int32_t i = startFileNum; i < startFileNum + m_numChunks; i++) {
		if (base->isReadable(i)) {
			m_scan[m_numFileNums++].m_fileId = base->getFileId(i);
		}
	}

	// remember the file range we should scan
	m_numScansStarted = 0;
	m_numScansCompleted = 0;
	KEYSET(m_startKey,startKeyArg,m_ks);
	KEYSET(m_endKey,endKeyArg,m_ks);
	KEYSET(m_constrainKey,endKeyArg,m_ks); // set in case justGetEndKey is true
	m_minRecSizes = minRecSizes;

	// bail if 0 files to scan -- no! need to set startKey/endKey
	if ( numFiles == 0 ) return true;
	// don't read anything if endKey < startKey
	if ( KEYCMP(m_startKey,m_endKey,m_ks)>0 ) return true;
	// keep the original intact in case g_errno == ETRYAGAIN
	KEYSET(m_endKeyOrig,endKeyArg,m_ks);
	m_minRecSizesOrig = minRecSizes;
	// start reading at this key
	m_fileStartKey = startKeyArg;
	// start the timer, keep it fast for clusterdb though
	if ( g_conf.m_logTimingDb ) m_startTime = gettimeofdayInMilliseconds();
	// . we now boost m_minRecSizes to account for negative recs
	// . but not if only reading one list, cuz it won't get merged and
	//   it will be too big to send back
	if ( m_numFileNums > 1 ) compensateForNegativeRecs ( base );
	verify_signature();

	// . often endKey is too big for an efficient read of minRecSizes bytes
	//   because we end up reading too much from all the files
	// . this will set m_startpg[i], m_endpg[i] for each RdbScan/RdbFile
	//   to ensure we read "minRecSizes" worth of records, not much more
	// . returns the new endKey for all ranges
	// . now this just overwrites m_endKey
	setPageRanges(base);

	// . NEVER let m_endKey be a negative key, because it will
	//   always be unmatched, since delbit is cleared
	// . adjusting it here ensures our generated hints are valid
	// . we will use this key to call constrain() with
	//m_constrainKey = m_endKey;
	//if ( ( m_constrainKey.n0 & 0x01) == 0x00 )
	//	m_constrainKey -= (uint32_t)1;
	KEYSET(m_constrainKey,m_endKey,m_ks);
	if ( KEYNEG(m_constrainKey) )
		KEYDEC(m_constrainKey,m_ks);

	// Msg5 likes to get the endkey for getting the list from the tree
	if ( justGetEndKey ) return true;

	Rdb *rdb = getRdbFromId(m_rdbId);
	{
		ScopedLock sl(m_mtxScanCounters);
		m_scansBeingSubmitted = true;
	}

	// . now start reading/scanning the files
	// . our m_scans array starts at 0
	for ( int32_t i = 0 ; i < m_numFileNums ; i++ ) {
		// get the page range

		// sanity check
		if (i > 0 && m_scan[i - 1].m_fileId >= m_scan[i].m_fileId) {
			log(LOG_LOGIC, "net: msg3: files must be read in order from oldest to newest so RdbList::indexMerge_r "
			    "works properly. Otherwise, corruption will result.");
			g_process.shutdownAbort(true);
		}

		RdbMap *map = base->getMapById(m_scan[i].m_fileId);
		// this can happen somehow!
		if (!map) {
			logError("net: msg3: getMapById with fileId=%" PRId32" returns NULL. Bad engineer.", m_scan[i].m_fileId);
			continue;
		}

		// . sanity check?
		// . no, we must get again since we turn on endKey's last bit
		int32_t p1 , p2;
		map->getPageRange(m_fileStartKey, m_endKey, &p1, &p2, NULL);
		// now get some read info
		int64_t offset = map->getAbsoluteOffset ( p1 );
		int64_t bytesToRead = map->getRecSizes ( p1, p2, false);

		incrementScansStarted();
		// . keep stats on our disk accesses
		// . count disk seeks (assuming no fragmentation)
		// . count disk bytes read
		if ( bytesToRead > 0 ) {
			rdb->didSeek ( );
			rdb->didRead ( bytesToRead );
		}

		// . the startKey may be different for each RdbScan class
		// . RdbLists must have all keys within their [startKey,endKey]
		// . therefore set startKey individually from first page in map
		// . this endKey must be >= m_endKey
		// . this startKey must be < m_startKey
		char startKey2 [ MAX_KEY_BYTES ];
		char endKey2   [ MAX_KEY_BYTES ];

		map->getKey(p1, startKey2);
		map->getKey(p2, endKey2);

		// store in here
		m_scan[i].m_startpg = p1;
		m_scan[i].m_endpg = p2;

		// . we read UP TO that endKey, so reduce by 1
		// . but only if p2 is NOT the last page in the map/file
		// . maps[fn]->getKey(lastPage) will return the LAST KEY
		//   and maps[fn]->getOffset(lastPage) the length of the file
		if(map->getNumPages() != p2) {
			// only decrease endKey2 if it is larger than startKey2
			if(KEYCMP(startKey2,endKey2,m_ks)<0)
				KEYDEC(endKey2,m_ks);
		} else {
			// otherwise, if we're reading all pages, then force the
			// endKey to virtual infinity
			KEYMAX(endKey2,m_ks);
		}

		// . set up the hints
		// . these are only used if we are only reading from 1 file
		// . these are used to call constrain() so we can constrain
		//   the end of the list w/o looping through all the recs
		//   in the list
		int32_t h2 = p2 ;
		// decrease by one page if we're on the last page
		if ( h2 > p1 && map->getNumPages() == h2 ) h2--;
		// . decrease hint page until key is <= endKey on that page
		//   AND offset is NOT -1, because the old way would give
		//   us hints past the endKey
		// . also decrease so we can constrain on minRecSizes in
		//   case we're the only list being read
		// . use >= m_minRecSizes instead of >, otherwise we may
		//   never be able to set "size" in RdbList::constrain()
		//   because "p" could equal "maxPtr" right away
		while ( h2 > p1 &&
		        (KEYCMP(map->getKeyPtr(h2),m_constrainKey,m_ks)>0 ||
		         map->getOffset(h2) == -1 ||
		         map->getAbsoluteOffset(h2) - offset >= m_minRecSizes ) )
		{
			h2--;
		}

		// now set the hint
		m_scan[i].m_hintOffset = map->getAbsoluteOffset(h2) - map->getAbsoluteOffset(p1);
		KEYSET(m_scan[i].m_hintKey, map->getKeyPtr(h2), m_ks);
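
		// A worked example of the hint, under illustrative numbers:
		// suppose p1=10, p2=50, and the while loop above settles on
		// h2=48 because page 49's key exceeds m_constrainKey. Then
		//   m_hintOffset = getAbsoluteOffset(48) - getAbsoluteOffset(10)
		// i.e. the byte offset of page 48's first key relative to the
		// start of this read, and m_hintKey is that key. constrain()
		// can then seek directly to the hint and walk only the tail of
		// the list instead of parsing every record from the beginning.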

		// reset g_errno before calling setRead()
		g_errno = 0;

		// timing debug
		if ( g_conf.m_logTimingDb )
			log(LOG_TIMING,
			    "net: msg: reading %" PRId64" bytes from %s file #%" PRId32" "
			    "(niceness=%" PRId32")",
			    bytesToRead,base->getDbName(),i,m_niceness);

		// log huge reads, those hurt us
		if ( bytesToRead > 150000000 ) {
			logf(LOG_INFO,"disk: Reading %" PRId64" bytes at offset %" PRId64" "
			     "from %s.",
			     bytesToRead,offset,base->getDbName());
		}

		if(bytesToRead > 10000000 &&
		   bytesToRead / 2 > m_minRecSizes &&
		   base->getFixedDataSize() >= 0)
		{
			// if any keys in the map are the same, report corruption
			char tmpKey    [MAX_KEY_BYTES];
			char lastTmpKey[MAX_KEY_BYTES];
			int32_t ccount = 0;
			for(int32_t pn = p1; pn <= p2; pn++) {
				map->getKey ( pn , tmpKey );
				if(pn!=p1 && KEYCMP(tmpKey,lastTmpKey,m_ks) == 0)
					ccount++;
				memcpy(lastTmpKey,tmpKey,sizeof(tmpKey));
			}
			if(ccount > 10) {
				logf(LOG_INFO,"disk: Reading %" PRId32" bytes from %s fileId="
				     "%" PRId32" when min "
				     "required is %" PRId32". Map is corrupt and has %" PRId32" "
				     "identical consecutive page keys because the "
				     "map was \"repaired\" because of out-of-order keys "
				     "in the index.",
				     (int32_t)bytesToRead,
				     base->getDbName(), m_scan[i].m_fileId,
				     (int32_t)m_minRecSizes,
				     (int32_t)ccount);
				incrementScansCompleted();
				m_errno = ECORRUPTDATA;
				m_hadCorruption = true;
				//m_maxRetries = 0;
				break;
			}
		}

		////////
		//
		// try to get from PAGE CACHE
		//
		////////
		m_scan[i].m_inPageCache = false;
		BigFile *ff = base->getFileById(m_scan[i].m_fileId);
		if (!ff) {
			logError("net: msg3: getFileById with fileId=%" PRId32" returns NULL. Bad engineer.", m_scan[i].m_fileId);
			continue;
		}

		RdbCache *rpc = getDiskPageCache ( m_rdbId );
		if(rpc) {
			// . vfd is a unique 64-bit file id
			// . if the file is not open vfd is -1; it is only set in the call to open()
			int64_t vfd = ff->getVfd();
			key192_t ck = makeCacheKey ( vfd , offset, bytesToRead);
			char *rec; int32_t recSize;
			bool inCache = false;
			RdbCacheLock rcl(*rpc);
			if ( vfd != -1 && ! m_validateCache )
				inCache = rpc->getRecord ( (collnum_t)0 , // collnum
				                           (char *)&ck ,
				                           &rec ,
				                           &recSize ,
				                           true , // copy?
				                           -1 , // maxAge, none
				                           true ); // inccounts?
			if ( inCache ) {
				m_scan[i].m_inPageCache = true;
				incrementScansCompleted();
				// now we have to store this value, 6 or 12, so
				// we can modify the hint appropriately
				m_scan[i].m_shiftCount = *rec;
				m_scan[i].m_list.set ( rec +1,
				                       recSize-1 ,
				                       rec , // alloc
				                       recSize , // allocSize
				                       startKey2 ,
				                       endKey2 ,
				                       base->getFixedDataSize() ,
				                       true , // owndata
				                       base->useHalfKeys() ,
				                       getKeySizeFromRdbId ( m_rdbId ) );
				continue;
			}
		}

		// . do the scan/read of file #i
		// . this returns false if blocked, true otherwise
		// . this will set g_errno on error
		bool done = m_scan[i].m_scan.setRead(ff, base->getFixedDataSize(), offset, bytesToRead,
		                                     startKey2, endKey2, m_ks, &m_scan[i].m_list,
		                                     callback ? this : NULL,
		                                     callback ? &doneScanningWrapper0 : NULL,
		                                     base->useHalfKeys(), m_rdbId, m_niceness, true);

		// if it did not block then it completed, so count it
		if (done) {
			incrementScansCompleted();
		}

		// break on an error, and remember g_errno in case we block
		if ( g_errno ) {
			int32_t tt = LOG_WARN;
			if ( g_errno == EFILECLOSED ) tt = LOG_INFO;
			log(tt,"disk: Reading %s had error: %s.",
			    base->getDbName(), mstrerror(g_errno));
			m_errno = g_errno;
			break;
		}
	}

	{
		ScopedLock sl(m_mtxScanCounters);
		m_scansBeingSubmitted = false;

		if(m_numScansStarted!=m_numScansCompleted)
			return false; // not completed yet
	}

	// . if all scans completed without blocking then wrap it up & ret true
	// . doneScanning may now block if it finds data corruption and must
	//   get the list remotely
	verify_signature();
	return doneScanning();
}

void Msg3::doneScanningWrapper0(void *state) {
	Msg3 *THIS = (Msg3 *) state;
	THIS->doneScanningWrapper();
}

void Msg3::doneScanningWrapper() {
	verify_signature();
	// log(LOG_TRACE,"Msg3(%p)::doneScanningWrapper()",this);

	bool done = incrementScansCompleted();

	// if we had an error, remember it
	if ( g_errno ) {
		// get base, returns NULL and sets g_errno to ENOCOLLREC on err
		RdbBase *base = getRdbBase( m_rdbId, m_collnum );
		const char *dbname = "NOT FOUND";
		if ( base ) {
			dbname = base->getDbName();
		}

		int32_t tt = LOG_WARN;
		if ( g_errno == EFILECLOSED ) {
			tt = LOG_INFO;
		}
		log(tt,"net: Reading %s had error: %s.", dbname,mstrerror(g_errno));
		m_errno = g_errno;
		g_errno = 0;
	}

	// return now if we're awaiting more scan completions
	if ( !done ) {
		return;
	}

	// . give control to doneScanning
	// . return if it blocks
	if ( !doneScanning() ) {
		return;
	}

	// if one of our lists was *huge* and could not alloc mem, it was
	// due to corruption
	if ( m_hadCorruption ) {
		g_errno = ECORRUPTDATA;
	}

	// if it doesn't block call the callback, g_errno may be set
	verify_signature();
	m_callback ( m_state );
}

// . but now that we may get a list remotely to fix data corruption,
//   this may indeed block
bool Msg3::doneScanning ( ) {
	verify_signature();
	// . did we have any error on any scan?
	// . if so, repeat ALL of the scans
	g_errno = m_errno;
	// 2 retries used to be the default
	// int32_t max = 2;
	// see if explicitly provided by the caller
	// if ( m_maxRetries >= 0 ) max = m_maxRetries;
	// now use -1 (no max) as the default no matter what
	int32_t max = -1;
	// ENOMEM is particularly contagious, so watch out with it...
	if ( g_errno == ENOMEM && m_maxRetries == -1 ) max = 0;
	// msg0 sets maxRetries to 2, don't let max stay set to -1
	if ( g_errno == ENOMEM && m_maxRetries != -1 ) max = m_maxRetries;
	// when a thread cannot alloc enough read buf it keeps the read buf
	// set to NULL and BigFile.cpp sets g_errno to EBUFTOOSMALL
	if ( g_errno == EBUFTOOSMALL && m_maxRetries == -1 ) max = 0;
	// msg0 sets maxRetries to 2, don't let max stay set to -1
	if ( g_errno == EBUFTOOSMALL && m_maxRetries != -1 ) max = m_maxRetries;
	// this is set above if the map has the same consecutive key repeated
	// and the read is enormous
	if ( g_errno == ECORRUPTDATA ) max = 0;
	// usually bad disk failures, don't retry those forever
	//if ( g_errno == EIO ) max = 3;
	// no, now our hitachis return these even when they're good so
	// we have to keep retrying forever
	if ( g_errno == EIO ) max = -1;
	// count these so we do not take drives offline just because the
	// kernel ring buffer complains...
	if ( g_errno == EIO ) g_numIOErrors++;
	// bail early on high priority reads for these errors
	if ( g_errno == EIO && m_niceness == 0 ) max = 0;

	// on I/O errors, give up and call it corrupt after a while. some
	// hitachis have I/O errors on little spots, like gk88; maybe we can
	// fix that one
	if ( g_errno == EIO && m_retryNum >= 5 ) {
		m_errno = ECORRUPTDATA;
		m_hadCorruption = true;
		// do not do any retries any more
		max = 0;
	}

	verify_signature();
	// convert m_errno to ECORRUPTDATA if it is EBUFTOOSMALL and the
	// max of the bytesToRead is over 500MB.
	// if bytesToRead was ludicrous, then assume that the data file
	// was corrupted, the map was regenerated and it patched
	// over the corrupted bits which were 500MB or more in size.
	// we cannot practically allocate that much, so let's just
	// give back an empty buffer. treat it like corruption...
	// the way it patches is to store the same key over all the corrupted
	// pages, which can get pretty big. so if you read a range with that
	// key you will be hurting!!
	// this may be the same scenario as when the rdbmap has consecutive
	// same keys. see above where we set m_errno to ECORRUPTDATA...
	if ( g_errno == EBUFTOOSMALL ) {
		int32_t biggest = 0;
		for ( int32_t i = 0 ; i < m_numFileNums ; i++ ) {
			if ( m_scan[i].m_scan.getBytesToRead() < biggest ) continue;
			biggest = m_scan[i].m_scan.getBytesToRead();
		}
		if ( biggest > 500000000 ) {
			log(LOG_WARN,"db: Max read size was %" PRId32" > 500000000. Assuming "
			    "corrupt data in data file.",biggest);
			m_errno = ECORRUPTDATA;
			m_hadCorruption = true;
			// do not do any retries on this, the read was > 500MB
			max = 0;
		}
	}

	// if shutting down gb then force 0 retries so we can shut down,
	// because it can't shut down until all threads are out of the queue
	if (g_process.isShuttingDown() && max < 0) {
		//log("msg3: forcing retries to 0 because shutting down");
		max = 0;
	}

	// get base, returns NULL and sets g_errno to ENOCOLLREC on error
	RdbBase *base = getRdbBase( m_rdbId, m_collnum );
	if ( ! base ) {
		return true;
	}

	// this really slows things down because it blocks the cpu so
	// leave it out for now
#ifdef GBSANITYCHECK
	// check for corruption here, do not do it again in Msg5 if we pass
	if ( ! g_errno ) { // && g_conf.m_doErrorCorrection ) {
		int32_t i;
		for ( i = 0 ; i < m_numFileNums ; i++ )
			if ( ! m_scan[i].m_list.checkList_r ( false ) ) break;
		if ( i < m_numFileNums ) {
			g_errno = ECORRUPTDATA;
			m_errno = ECORRUPTDATA;
			max = g_conf.m_corruptRetries; // try 100 times
			log(LOG_WARN,"db: Encountered corrupt list in file %s.",
			    base->getFile(m_scan[i].m_fileNum)->getFilename());
		} else
			m_listsChecked = true;
	}
#endif

	verify_signature();
	// try to fix this error i've seen
	if ( g_errno == EBADENGINEER && max == -1 )
		max = 100;

	// . if we had an ETRYAGAIN error, then try again now
	// . it usually means the whole file or a part of it was deleted
	//   before we could finish reading it, so we should re-read all now
	// . RdbMerge deletes BigFiles after it merges them
	// . now that we have threads i'd imagine we'd get EBADFD or something
	// . i've also seen "illegal seek" as well
	if ( m_errno && (m_retryNum < max || max < 0) ) {
		// print the error
		static time_t s_time = 0;
		time_t now = getTime();
		if ( now - s_time > 5 ) {
			log(LOG_WARN, "net: Had error reading %s: %s. Retrying. (retry #%" PRId32")",
			    base->getDbName(),mstrerror(m_errno) , m_retryNum );
			s_time = now;
		}
		// send email alert if in an infinite loop, but don't send
		// more than once every 2 hours
		static int32_t s_lastSendTime = 0;
		if ( m_retryNum == 100 && getTime() - s_lastSendTime > 3600*2){
			// remove this for now, it is going off all the time
			//g_pingServer.sendEmail(NULL,//g_hostdb.getMyHost(),
			//	"100 read retries",true);
			s_lastSendTime = getTime();
		}
		// clear g_errno, as we should for the call to readList()
		g_errno = 0;
		// free the list buffer since if we have 1000 Msg3s retrying
		// it will totally use all of our memory
		for ( int32_t i = 0 ; i < m_numChunks ; i++ )
			m_scan[i].m_list.destructor();
		// count retries
		m_retryNum++;

		// backoff scheme, wait 200ms more each time
		int32_t wait ;
		if ( m_retryNum == 1 ) {
			wait = 10;
		} else {
			wait = 200 * m_retryNum;
		}

		// . don't wait more than 10 secs between tries
		// . i've seen gf0 and gf16 get mega saturated
		if ( wait > 10000 ) {
			wait = 10000;
		}
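
		// The resulting backoff schedule, for illustration:
		//   retry #1 ->   10 ms
		//   retry #2 ->  400 ms
		//   retry #3 ->  600 ms
		//   retry #4 ->  800 ms
		//   ...
		//   retry #50 and beyond -> capped at 10000 ms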

		// wait
		if (g_loop.registerSleepCallback(wait, this, doneSleepingWrapper3, "Msg3::doneSleepingWrapper3", m_niceness)) {
			return false;
		}

		// otherwise, registration failed
		log(LOG_ERROR,
		    "net: Failed to register sleep callback for retry. "
		    "Abandoning read. This is bad.");
		// return, g_errno should be set
		g_errno = EBUFTOOSMALL;
		m_errno = EBUFTOOSMALL;
		return true;
	}

	verify_signature();
	// if we got an error and should not retry any more then give up
	if ( g_errno ) {
		log(LOG_ERROR,
		    "net: Had error reading %s: %s. Giving up after %" PRId32" "
		    "retries.",
		    base->getDbName(),mstrerror(g_errno) , m_retryNum );
		return true;
	}

	// note it if the retry finally worked
	if ( m_retryNum > 0 )
		log(LOG_INFO,"disk: Read succeeded after retrying %" PRId32" times.",
		    (int32_t)m_retryNum);

	// count total bytes for logging
	int32_t count = 0;
	// . constrain all lists to make merging easier
	// . if we have only one list, then that's nice cuz the constrain
	//   will allow us to send it right away w/ zero copying
	// . if we have only 1 list, it won't be merged into a final list,
	//   that is, we'll just set m_list = &m_scan[i].m_list
	for ( int32_t i = 0 ; i < m_numFileNums ; i++ ) {
		// count total bytes for logging
		count += m_scan[i].m_list.getListSize();
		if(!m_scan[i].m_inPageCache)
			m_scan[i].m_shiftCount = m_scan[i].m_scan.shiftCount();
		// . hint offset is relative to the offset of the first key we read
		// . if that key was only 6 bytes, RdbScan shifted the list buf
		//   down 6 bytes to make the first key 12 bytes... a
		//   requirement for all RdbLists
		// . don't inc it, though, if it was 0, pointing to the start
		//   of the list, because our shift won't affect that
		if ( m_scan[i].m_shiftCount == 6 && m_scan[i].m_hintOffset > 0 )
			m_scan[i].m_hintOffset += 6;
		// posdb double compression
		if ( m_scan[i].m_shiftCount == 12 && m_scan[i].m_hintOffset > 0 )
			m_scan[i].m_hintOffset += 12;
		// . don't constrain on minRecSizes here because it may
		//   make our endKey smaller, which will cause problems
		//   when Msg5 merges these lists.
		// . If all lists have different endKeys RdbList's merge
		//   chooses the min and will merge in recs beyond that,
		//   causing a bad list BECAUSE we don't check to make
		//   sure that recs we are adding are below the endKey
		// . if we only read from one file then constrain based
		//   on minRecSizes so we can send the list back w/o merging,
		//   OR if just merging with RdbTree's list
		int32_t mrs ;
		// . constrain to m_minRecSizesOrig, not m_minRecSizes cuz
		//   that could be adjusted by compensateForNegativeRecs()
		// . but, really, they should be the same if we only read from
		//   the root file
		if ( m_numFileNums == 1 ) mrs = m_minRecSizesOrig;
		else                      mrs = -1;
		// . this returns false and sets g_errno on error
		// . like if data is corrupt
		BigFile *ff = base->getFileById(m_scan[i].m_fileId);
		// if we did a merge really quick and deleted one of the
		// files we were reading, i've seen 'ff' be NULL
		const char *filename = "lostfilename";
		if ( ff ) filename = ff->getFilename();

		// compute cache info
		RdbCache *rpc = getDiskPageCache ( m_rdbId );
		int64_t vfd ;
		if ( ff ) vfd = ff->getVfd();
		key192_t ck ;
		if ( ff )
			ck = makeCacheKey ( vfd ,
			                    m_scan[i].m_scan.getOffset(),
			                    m_scan[i].m_scan.getBytesToRead() );
		if ( m_validateCache && ff && rpc && vfd != -1 ) {
			bool inCache;
			char *rec; int32_t recSize;
			RdbCacheLock rcl(*rpc);
			inCache = rpc->getRecord ( (collnum_t)0 , // collnum
			                           (char *)&ck ,
			                           &rec ,
			                           &recSize ,
			                           true , // copy?
			                           -1 , // maxAge, none
			                           true ); // inccounts?
			if ( inCache &&
			     // 1st byte is RdbScan::m_shifted
			     ( m_scan[i].m_list.getListSize() != recSize-1 ||
			       memcmp ( m_scan[i].m_list.getList() , rec+1,recSize-1) != 0 ||
			       *rec != m_scan[i].m_scan.shiftCount() ) ) {
				log(LOG_ERROR, "msg3: cache did not validate");
				g_process.shutdownAbort(true);
			}
			mfree ( rec , recSize , "vca" );
		}

		///////
		//
		// STORE IN PAGE CACHE
		//
		///////
		// store what we read in the cache. don't bother storing
		// if it was a retry, just in case something strange happened.
		// storing before the constrain call is more efficient.
		if ( m_retryNum<=0 && ff && rpc && vfd != -1 &&
		     ! m_scan[i].m_inPageCache )
		{
			RdbCacheLock rcl(*rpc);
			char tmpShiftCount = m_scan[i].m_scan.shiftCount();
			rpc->addRecord ( (collnum_t)0 , // collnum
			                 (char *)&ck ,
			                 // rec1 is this little thingy
			                 &tmpShiftCount,
			                 1,
			                 // rec2
			                 m_scan[i].m_list.getList() ,
			                 m_scan[i].m_list.getListSize() ,
			                 0 ); // timestamp. 0 = now
		}

		if (!m_scan[i].m_list.constrain(m_startKey, m_constrainKey, mrs, m_scan[i].m_hintOffset, m_scan[i].m_hintKey, m_rdbId, filename)) {
			// guard the ff dereferences: 'ff' can be NULL here (see above)
			log(LOG_WARN, "net: Had error while constraining list read from %s: %s/%s. vfd=%" PRId32" parts=%" PRId32". "
			    "This is likely caused by corrupted data on disk.",
			    mstrerror(g_errno),
			    ff ? ff->getDir() : "?", filename,
			    ff ? ff->getVfd() : -1,
			    ff ? (int32_t)ff->getNumParts() : 0 );
			continue;
		}
	}

	// print the time
	if ( g_conf.m_logTimingDb ) {
		int64_t now = gettimeofdayInMilliseconds();
		int64_t took = now - m_startTime;
		log(LOG_TIMING,
		    "net: Took %" PRId64" ms to read %" PRId32" lists of %" PRId32" bytes total"
		    " from %s (niceness=%" PRId32").",
		    took,m_numFileNums,count,base->getDbName(),m_niceness);
	}
	verify_signature();
	return true;
}

void Msg3::doneSleepingWrapper3 ( int fd , void *state ) {
	Msg3 *THIS = (Msg3 *)state;
	THIS->doneSleepingWrapper3();
}

void Msg3::doneSleepingWrapper3() {
	verify_signature();
	// now try reading again
	if ( ! doneSleeping ( ) ) return;
	// if it doesn't block call the callback, g_errno may be set
	m_callback ( m_state );
}

bool Msg3::doneSleeping ( ) {
	verify_signature();
	// unregister
	g_loop.unregisterSleepCallback(this,doneSleepingWrapper3);
	// read again
	if ( ! readList ( m_rdbId ,
	                  m_collnum ,
	                  m_startKey ,
	                  m_endKeyOrig ,
	                  m_minRecSizesOrig ,
	                  m_startFileNum ,
	                  m_numFiles ,
	                  m_state ,
	                  m_callback ,
	                  m_niceness ,
	                  m_retryNum ,
	                  m_maxRetries ,
	                  false ) ) return false;
	return true;
}

// . returns a new, smaller endKey
// . shrinks endKey while still preserving the minRecSizes requirement
// . this is the most confusing subroutine in the project
// . this now OVERWRITES endKey with the new one
void Msg3::setPageRanges(RdbBase *base) {
	verify_signature();
	// sanity check
	//if ( m_ks != 12 && m_ks != 16 ) { g_process.shutdownAbort(true); }
	// . initialize the startpg/endpg for each file
	// . we read from the first offset on m_startpg to the offset on m_endpg
	// . since we set them equal that means an empty range for each file
	for ( int32_t i = 0 ; i < m_numFileNums ; i++ ) {
		RdbMap *map = base->getMapById(m_scan[i].m_fileId);
		if(!map && (m_scan[i].m_fileId%2)==0) {
			// Maybe we refer to an in-progress merge file, e.g. 0002, which has
			// been finished and renamed in the meantime to 0003.
			// See RdbBase::unlinksDone() for details.
			map = base->getMapById(m_scan[i].m_fileId+1);
		}
		if (!map) {
			gbshutdownLogicError();
		}

		m_scan[i].m_startpg = map->getPage(m_fileStartKey);
		m_scan[i].m_endpg = m_scan[i].m_startpg;
	}

	// just return if minRecSizes 0 (no reading needed)
	if ( m_minRecSizes <= 0 ) return;
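
	// What follows is a k-way advance over the per-file page maps. A small
	// worked example under illustrative numbers: two files A and B, where
	// A's next page starts at key 40 and B's next page starts at key 25.
	// B wins the round, so B's end page advances and lastMinKey becomes 24
	// (minKey minus one). Rounds repeat, always advancing the file whose
	// next page has the lowest key, until the bytes mapped between
	// [startKey,lastMinKey] across all files reach m_minRecSizes; then
	// m_endKey is overwritten with lastMinKey and we stop. This shrinks the
	// read range to roughly minRecSizes bytes instead of reading every file
	// all the way to the original endKey.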

	// calculate minKey minus one
	char lastMinKey[MAX_KEY_BYTES];
	bool lastMinKeyIsValid = false;
	// loop until we find the page ranges that barely satisfy "minRecSizes"
	for(;;) {
		// find the map whose next page has the lowest key
		int32_t minpg = -1;
		char minKey[MAX_KEY_BYTES];
		for (int32_t i = 0; i < m_numFileNums; i++) {
			RdbMap *map = base->getMapById(m_scan[i].m_fileId);
			if (!map) {
				gbshutdownLogicError();
			}

			// this guy is out of the race if his end key > "endKey" already
			if (KEYCMP(map->getKeyPtr(m_scan[i].m_endpg), m_endKey, m_ks) > 0) {
				continue;
			}

			// get the next page after m_scan[i].m_endpg
			int32_t nextpg = m_scan[i].m_endpg + 1;

			// if endpg[i]+1 == m_numPages then we maxed out this range
			if (nextpg > map->getNumPages()) {
				continue;
			}

			// . but this may have an offset of -1
			// . which means the page has no key starting on it and
			//   it's occupied by a rec which starts on a previous page
			while (nextpg < map->getNumPages() && map->getOffset(nextpg) == -1) {
				nextpg++;
			}

			// . continue if his next page doesn't have the minimum key
			// . if nextpg == getNumPages() then it returns the LAST KEY
			//   contained in the corresponding RdbFile
			if (minpg != -1 && KEYCMP(map->getKeyPtr(nextpg), minKey, m_ks) > 0) {
				continue;
			}

			// . we got a winner, his next page has the current min key
			// . if m_scan[i].m_endpg+1 == getNumPages() then getKey() returns the
			//   last key in the mapped file
			// . minKey should never equal the key on m_scan[i].m_endpg UNLESS
			//   it's on page #m_numPages
			KEYSET(minKey,map->getKeyPtr(nextpg),m_ks);
			minpg = i;

			// if minKey is the same as the current key on this endpg, inc it
			// so we cause some advancement; otherwise, we'll loop forever
			if (KEYCMP(minKey, map->getKeyPtr(m_scan[i].m_endpg), m_ks) != 0) {
				continue;
			}

			KEYINC(minKey,m_ks);
		}

		// . we're done if we hit the end of all maps in the race
		if ( minpg == -1 ) {
			return;
		}

		// sanity check
		if (lastMinKeyIsValid && KEYCMP(minKey, lastMinKey, m_ks) <= 0) {
			g_errno = ECORRUPTDATA;
			log(LOG_ERROR, "db: Got corrupted map in memory for %s. This is almost "
			    "always because of bad memory. Please replace your RAM.", base->getDbName());
			gbshutdownCorrupted();
		}

		// don't let minKey exceed endKey, however
		if (KEYCMP(minKey, m_endKey, m_ks) > 0) {
			KEYSET(minKey, m_endKey, m_ks);
			KEYINC(minKey, m_ks);
			KEYSET(lastMinKey, m_endKey, m_ks);
		} else {
			KEYSET(lastMinKey, minKey, m_ks);
			KEYDEC(lastMinKey, m_ks);
		}

		// it is now valid
		lastMinKeyIsValid = true;

		// . advance m_scan[i].m_endpg so that next page < minKey
		// . we want to read UP TO the first key on m_scan[i].m_endpg
		for ( int32_t i = 0 ; i < m_numFileNums ; i++ ) {
			RdbMap *map = base->getMapById(m_scan[i].m_fileId);
			if (!map) {
				gbshutdownLogicError();
			}

			m_scan[i].m_endpg = map->getEndPage(m_scan[i].m_endpg, lastMinKey);
		}

		// . if the minKey is BIGGER than the provided endKey we're done
		// . we don't necessarily include records whose key is "minKey"
		if (KEYCMP(minKey, m_endKey, m_ks) > 0) {
			return;
		}

		// . calculate recSizes per page within [startKey,minKey-1]
		// . compute bytes of records in [startKey,minKey-1] for each map
		// . this includes negative records so we may have annihilations
		//   when merging into "diskList" and get less than what we wanted,
		//   but endKey should be shortened, so our caller will know to call
		//   again if he wants more
		int32_t recSizes = 0;
		for ( int32_t i = 0 ; i < m_numFileNums ; i++ ) {
			RdbMap *map = base->getMapById(m_scan[i].m_fileId);
			if (!map) {
				gbshutdownLogicError();
			}

			recSizes += map->getMinRecSizes(m_scan[i].m_startpg, m_scan[i].m_endpg, m_fileStartKey, lastMinKey, false);
		}

		// if we hit it then return minKey-1 so we only read UP TO "minKey",
		// not including "minKey"
		if ( recSizes >= m_minRecSizes ) {
			// . sanity check
			// . this sanity check fails sometimes, but leave it
			//   out for now... causes the Illegal endkey msgs in
			//   RdbList::indexMerge_r()
			//if ( KEYNEG(lastMinKey) ) { g_process.shutdownAbort(true); }
			KEYSET(m_endKey,lastMinKey,m_ks);
			//return lastMinKey;
			return;
		}
	}
}

// . we now boost m_minRecSizes to account for negative recs in certain files
// . TODO: use floats for averages, not ints
void Msg3::compensateForNegativeRecs ( RdbBase *base ) {
	verify_signature();
	// add up counts from each map
	int64_t totalNegatives = 0;
	int64_t totalPositives = 0;
	int64_t totalFileSize = 0;
	for (int32_t i = 0 ; i < m_numFileNums ; i++) {
		int32_t fileId = m_scan[i].m_fileId;

		RdbMap *map = base->getMapById(fileId);
		if (!map) {
			log(LOG_LOGIC,"net: msg3: getMapById with fileId=%" PRId32" returns NULL. bad engineer.", fileId);
			continue;
		}

		totalNegatives += map->getNumNegativeRecs();
		totalPositives += map->getNumPositiveRecs();
		totalFileSize += map->getFileSize();
	}
	// add em all up
	int64_t totalNumRecs = totalNegatives + totalPositives;
	// if we have no records on disk, why are we reading from disk?
	if ( totalNumRecs == 0 ) return ;
	// what is the size of a negative record?
	int32_t negRecSize = m_ks;
	if ( base->getFixedDataSize() == -1 ) negRecSize += 4;
	// what is the size of all positive recs combined?
	int64_t posFileSize = totalFileSize - negRecSize * totalNegatives;
	// . we often overestimate the size of the negative recs for indexdb
	//   because it uses half keys...
	// . this can make posFileSize go negative and ultimately result in
	//   a newMin of 0x7fffffff which really fucks us up
	if ( posFileSize < 0 ) posFileSize = 0;
	// what is the average size of a positive record?
	int32_t posRecSize = 0;
	if ( totalPositives > 0 ) posRecSize = posFileSize / totalPositives;
	// we annihilate the negative recs and their positive pairs
	int64_t loss = totalNegatives * (negRecSize + posRecSize);
	// what is the percentage lost?
	int64_t lostPercent = (100LL * loss) / totalFileSize;
	// how much more should we read to compensate?
	int32_t newMin = ((int64_t)m_minRecSizes * (lostPercent + 100LL))/100LL;
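
	// A worked example with illustrative numbers: suppose the maps report
	// totalNegatives = 1M, totalPositives = 9M, totalFileSize = 198MB and
	// negRecSize = 18 (an 18-byte key, fixed data size 0). Then
	//   posFileSize = 198MB - 18 * 1M          = 180MB
	//   posRecSize  = 180MB / 9M               = 20 bytes
	//   loss        = 1M * (18 + 20)           = 38MB
	//   lostPercent = 100 * 38MB / 198MB       = 19 (integer division)
	//   newMin      = m_minRecSizes * 119/100
	// so a request for 1,000,000 bytes is boosted to 1,190,000 bytes to
	// cover the records expected to annihilate during the merge.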
	// newMin will never be smaller unless it overflows
	if ( newMin < m_minRecSizes ) newMin = 0x7fffffff;
	// print msg if we changed m_minRecSizes
	//if ( newMin != m_minRecSizes )
	//	log("Msg3::compensated minRecSizes from %" PRId32" to %" PRId32,
	//	    m_minRecSizes, newMin );
	// set the new min
	m_minRecSizes = newMin;
}