privacore-open-source-searc.../Msg5.cpp

1541 lines
52 KiB
C++

#include "Msg5.h"
#include "RdbBase.h"
#include "Rdb.h"
#include "Stats.h"
#include "JobScheduler.h"
#include "Msg0.h"
#include "Process.h"
#include "ip.h"
#include "Sanity.h"
#include "Conf.h"
#include "Mem.h"
#include "Errno.h"
#include "fctypes.h"
#include <cerrno>
// Uncomment to enable the GBSANITYCHECK code paths in this file (extra
// key logging in getList(), checkList_r(true) on every list in gotList2()
// and repairLists()).
//#define GBSANITYCHECK
// NOTE(review): not referenced explicitly in the visible part of this file;
// presumably the value used by the set_signature()/verify_signature()
// helpers from Sanity.h -- confirm.
static const int signature_init = 0x2c3a4f5d;
// Global corruption counter.
// NOTE(review): not referenced in the visible part of this file.
int32_t g_numCorrupt = 0;
// Construct an idle Msg5.
// Every member gets a defined value up front (static analysers such as
// Coverity and PVS-Studio flagged the uninitialised ones). reset() then
// clears the per-request state: the Msg3 reader, the previous-key buffer,
// the list-pointer count and the tree list.
Msg5::Msg5()
	: m_rdbId(RDB_NONE),
	  m_isSingleUnmergedListGet(false)
{
	m_waitingForList = false;
	m_numListPtrs = 0;
	set_signature();
	// Coverity
	m_list = NULL;
	memset(m_startKey, 0, sizeof(m_startKey));
	memset(m_endKey, 0, sizeof(m_endKey));
	m_callback = NULL;
	m_state = NULL;
	m_calledCallback = 0;
	m_includeTree = false;
	m_numFiles = 0;
	m_startFileNum = 0;
	m_minRecSizes = 0;
	// (m_rdbId is already RDB_NONE from the initializer list)
	m_newMinRecSizes = 0;
	m_round = 0;
	m_totalSize = 0;
	m_readAbsolutelyNothing = false;
	m_niceness = 0;
	m_doErrorCorrection = false;
	m_hadCorruption = false;
	m_msg0 = NULL;
	m_startTime = 0;
	memset(m_listPtrs, 0, sizeof(m_listPtrs));
	m_removeNegRecs = false;
	m_oldListSize = 0;
	m_maxRetries = 0;
	m_isRealMerge = false;
	m_ks = 0;
	m_collnum = 0;
	m_errno = 0;
	// readList() only assigns this for rdbs that use half keys, so give it
	// a defined starting value like every other member
	m_numSources = 0;
	// PVS-Studio
	memset(m_fileStartKey, 0, sizeof(m_fileStartKey));
	memset(m_minEndKey, 0, sizeof(m_minEndKey));
	reset();
}
// Destructor: check the object is intact, release per-request state via
// reset(), then clear the signature. Presumably clearing it means a later
// use of the destroyed object is caught by verify_signature() -- confirm in
// Sanity.h.
Msg5::~Msg5() {
verify_signature();
reset();
clear_signature();
}
// Release per-request state: the Msg3 reader, the remembered previous key,
// the number of merge-input list pointers and the memory of m_treeList.
// Called from both the constructor and the destructor.
void Msg5::reset() {
	verify_signature();
	const bool replyOutstanding = m_waitingForList;
	if ( replyOutstanding ) {
		// Deliberately not fatal: an urgent exit (mainShutdown(1) or
		// g_process.shutdown()) may reset us mid-request, so only log.
		log("disk: Trying to reset a class waiting for a reply.");
	}
	m_msg3.reset();
	// m_ks may not be valid yet, so clear the whole key buffer
	KEYMIN(m_prevKey, MAX_KEY_BYTES);
	m_numListPtrs = 0;
	// free (not just empty) the tree list's memory
	m_treeList.freeList();
	verify_signature();
}
// Read the records in [startKey,endKey] from exactly one file (fileNum).
// Sets m_isSingleUnmergedListGet so gotList2() skips
// RdbList::prepareForMerge(), then forwards to getList() with numFiles=1,
// no error correction, the default retry count and isRealMerge=false.
// Returns false if the read blocked (callback is called later), true
// otherwise; g_errno is set on error.
// NOTE(review): includeTree is passed as true, so tree records in the same
// key range can be added as a second input list -- confirm that is intended
// for a "single unmerged" read.
// NOTE(review): m_isSingleUnmergedListGet is never cleared in the visible
// code, so a later plain getList() on the same Msg5 keeps the flag set.
bool Msg5::getSingleUnmergedList(rdbid_t rdbId,
                                 collnum_t collnum,
                                 RdbList *list,
                                 const void *startKey,
                                 const void *endKey,
                                 int32_t recSizes, // requested scan size(-1 all)
                                 int32_t fileNum, // file to scan
                                 void *state, // for callback
                                 void (*callback)(void *state, RdbList *list, Msg5 *msg5),
                                 int32_t niceness)
{
	m_isSingleUnmergedListGet = true;
	return getList(rdbId, collnum, list,
	               startKey, endKey,
	               recSizes,
	               true,    // includeTree
	               fileNum, // startFileNum
	               1,       // numFiles
	               state, callback,
	               niceness,
	               false,   // doErrorCorrection
	               -1,      // maxRetries (-1 = default)
	               false);  // isRealMerge (was written "-false"; same value)
}
// Convenience overload: read the in-memory tree records of rdbId/collnum in
// [startKey,endKey] into result and discard the statistics.
// Side effects: stores rdbId/collnum in this object and sets
// m_newMinRecSizes to -1 (NOTE(review): presumably "no size limit" for
// Rdb::getTreeList() -- confirm) before delegating to the worker overload.
bool Msg5::getTreeList(RdbList *result, rdbid_t rdbId, collnum_t collnum, const void *startKey, const void *endKey) {
	m_rdbId = rdbId;
	m_collnum = collnum;
	m_newMinRecSizes = -1;
	// positive recs, negative recs, tree memory used, used nodes -- unused here
	int32_t ignoredStats[4];
	return getTreeList(result, startKey, endKey,
	                   &ignoredStats[0], &ignoredStats[1],
	                   &ignoredStats[2], &ignoredStats[3]);
}
// Fetch the in-memory tree records of m_rdbId/m_collnum in
// [startKey,endKey] into result, limited by m_newMinRecSizes.
// Rdb::getTreeList() fills in the counters: positive recs, negative recs,
// tree memory used, used nodes.
// Returns Rdb::getTreeList()'s result. Returns false with g_errno set when
// m_rdbId does not map to an Rdb (the convenience overload accepts any
// rdbId from its caller).
bool Msg5::getTreeList(RdbList *result, const void *startKey, const void *endKey, int32_t *numPositiveRecs,
                       int32_t *numNegativeRecs, int32_t *memUsedByTree, int32_t *numUsedNodes) {
	Rdb *rdb = getRdbFromId(m_rdbId);
	if ( ! rdb ) {
		// never dereference a missing Rdb
		log(LOG_LOGIC, "db: msg5: getTreeList: no rdb for rdbid=%" PRId32, (int32_t)m_rdbId);
		g_errno = EINVAL;
		return false;
	}
	return rdb->getTreeList(result, m_collnum, startKey, endKey, m_newMinRecSizes, numPositiveRecs, numNegativeRecs, memUsedByTree, numUsedNodes);
}
// Fetch the records of rdb "rdbId" in collection "collnum" whose keys lie
// in [startKey,endKey]. Reads the in-memory tree (if includeTree) and
// numFiles files beginning at startFileNum, and merges them into "list".
// . return false if blocked (callback is called later), true otherwise
// . set g_errno on error
// . fills "list" with the requested list
// . we want at least "minRecSizes" bytes of records, but not much more
// . we want all our records to have keys in the [startKey,endKey] range
// . final merged list should try to have a size of at least "minRecSizes"
// . it may fall short if not enough records were in [startKey,endKey] range
// . endKey of list will be set so that all records from startKey to that
//   endKey are in the list
// . a minRecSizes of 0x7fffffff means virtual infinity, but it also has
//   another special meaning. it tells msg5 to tell RdbTree's getList() to
//   pre-allocate the list size by counting the recs ahead of time.
// . a negative minRecSizes is replaced by 0x7fffffff
// . after the collnum check, returns true immediately with "list" reset to
//   empty when startKey > endKey or minRecSizes is 0
bool Msg5::getList ( rdbid_t rdbId,
collnum_t collnum ,
RdbList *list ,
const void *startKey_ ,
const void *endKey_ ,
int32_t minRecSizes , // requested scan size(-1 none)
bool includeTree ,
int32_t startFileNum , // first file to scan
int32_t numFiles , // rel. to startFileNum,-1 all
void *state , // for callback
void (* callback ) ( void *state ,
RdbList *list ,
Msg5 *msg5 ) ,
int32_t niceness ,
bool doErrorCorrection ,
int32_t maxRetries ,
bool isRealMerge) {
verify_signature();
const char *startKey = static_cast<const char*>(startKey_);
const char *endKey = static_cast<const char*>(endKey_);
// local copy used if endKey has to be fixed up below; the caller's key
// is never modified
char fixedEndKey[MAX_KEY_BYTES];
// make sure we are not being re-used prematurely
// (the log text says "reset", but this is a new request arriving while a
// previous one is still outstanding)
if ( m_waitingForList ) {
log(LOG_LOGIC,"disk: Trying to reset a class waiting for a reply.");
g_process.shutdownAbort(true);
}
if ( collnum < 0 ) {
log(LOG_WARN,"msg5: called with bad collnum=%" PRId32,(int32_t)collnum);
g_errno = ENOCOLLREC;
return true;
}
// assume no error
g_errno = 0;
// sanity
if ( ! list ) gbshutdownLogicError();
// . reset the provided list
// . this will not free any mem it may have alloc'd but it will set
// m_listSize to 0 so list->isEmpty() will return true
list->reset();
// key size set
m_ks = getKeySizeFromRdbId(rdbId);
// . complain if endKey < startKey
// . no because IndexReadInfo does this to prevent us from reading
// a list
// . note: this returns before any member below is stored
if ( KEYCMP(startKey,endKey,m_ks)>0 ) return true;
// log("Msg5::readList: startKey > endKey warning");
// we no longer allow negative minRecSizes
if ( minRecSizes < 0 ) {
if ( g_conf.m_logDebugDb )
log(LOG_LOGIC,"net: msg5: MinRecSizes < 0, using 2GB.");
minRecSizes = 0x7fffffff;
//g_process.shutdownAbort(true);
}
// ensure startKey last bit clear, endKey last bit set
// (KEYNEG() is true for a key whose last bit is clear, as the two log
// messages below show)
if ( !KEYNEG(startKey) )
log(LOG_REMIND,"net: msg5: StartKey lastbit set.");
// fix endkey
if ( KEYNEG(endKey) ) {
log(LOG_REMIND,"net: msg5: EndKey lastbit clear. Fixing.");
//Previously it was fixed by setting the LSB in the endKey
//input parameter. Code review showed that it should only
//happend when called from doInject in main.cpp due to a bug.
//Still, it could happen due to damaged network packets.
KEYSET(fixedEndKey,endKey,m_ks);
fixedEndKey[0] |= 0x01;
endKey = fixedEndKey;
}
// remember stuff
m_rdbId = rdbId;
m_collnum = collnum;
m_list = list;
KEYSET(m_startKey,startKey,m_ks);
KEYSET(m_endKey,endKey,m_ks);
m_minRecSizes = minRecSizes;
m_includeTree = includeTree;
m_startFileNum = startFileNum;
m_numFiles = numFiles;
m_state = state;
m_callback = callback;
m_calledCallback= 0;
m_niceness = niceness;
m_maxRetries = maxRetries;
m_oldListSize = 0;
m_isRealMerge = isRealMerge;
// get base, returns NULL and sets g_errno to ENOCOLLREC on error
RdbBase *base = getRdbBase( m_rdbId, m_collnum );
if ( ! base ) {
return true;
}
// when true, repairLists() checks the read lists and removes bad data
m_doErrorCorrection = doErrorCorrection;
// these get changed both by cache and gotList()
m_newMinRecSizes = minRecSizes;
m_round = 0;
m_readAbsolutelyNothing = false;
// the next disk read starts at m_startKey; gotList() advances it
KEYSET(m_fileStartKey,m_startKey,m_ks);
#ifdef GBSANITYCHECK
log("msg5: sk=%s", KEYSTR(m_startKey,m_ks));
log("msg5: ek=%s", KEYSTR(m_endKey,m_ks));
#endif
// . make sure we set base above so Msg0.cpp:268 doesn't freak out
// . if startKey is > endKey list is empty
// . (already rejected near the top; kept as a guard)
if ( KEYCMP(m_startKey,m_endKey,m_ks)>0 ) return true;
// same if minRecSizes is 0
if ( m_minRecSizes == 0 ) return true;
// tell Spider.cpp not to nuke us until we get back!!!
m_waitingForList = true;
// timing debug
//log("Msg5:getting list startKey.n1=%" PRIu32,m_startKey.n1);
// start the read loop - hopefully, will only loop once
if ( readList ( ) ) {
m_waitingForList = false;
return true;
}
// we blocked!!! must call m_callback
return false;
}
// . returns false if blocked, true otherwise
// . sets g_errno on error
// . reads from cache, tree and files
// . calls gotList() to do the merge if we need to
// . loops until m_minRecSizes is satisfied OR m_endKey is reached
bool Msg5::readList ( ) {
verify_signature();
// get base, returns NULL and sets g_errno to ENOCOLLREC on error
Rdb *rdb = getRdbFromId(m_rdbId);
RdbBase *base = getRdbBase( m_rdbId, m_collnum );
if ( ! base ) {
return true;
}
do { //until no more read is needed
// . reset our tree list
// . sets fixedDataSize here in case m_includeTree is false because
// we don't want merge to have incompatible lists
m_treeList.reset();
m_treeList.setFixedDataSize ( base->getFixedDataSize() );
m_treeList.setKeySize(m_ks);
// reset Msg3 in case gotList() is called without calling
// Msg3::readList() first
m_msg3.reset();
// assume lists have no errors in them
m_hadCorruption = false;
// . restrict tree's endkey by calling msg3 now...
// . this saves us from spending 1000ms to read 100k of negative
// spiderdb recs from the tree only to have most of the for naught
// . this call will ONLY set m_msg3.m_endKey
// . but only do this if dealing with spiderdb really
// . also now for tfndb, since we scan that in RdbDump.cpp to dedup
// the spiderdb list we are dumping to disk. it is really for any
// time when the endKey is unbounded, so check that now
const char *treeEndKey = m_endKey;
bool compute = true;
if ( ! m_includeTree ) compute = false;
// if endKey is "unbounded" then bound it...
char max[MAX_KEY_BYTES]; KEYMAX(max,m_ks);
if ( KEYCMP(m_endKey,max,m_ks) != 0 ) compute = false;
// BUT don't bother if a small list, probably faster just to get it
if ( m_newMinRecSizes < 1024 ) compute = false;
// try to make merge read threads higher priority than
// regular spider read threads
int32_t niceness = m_niceness;
if ( niceness > 0 ) niceness = 2;
if ( m_isRealMerge ) niceness = 1;
if ( compute ) {
m_msg3.readList ( m_rdbId ,
m_collnum ,
m_fileStartKey , // modified by gotList()
m_endKey ,
m_newMinRecSizes , // modified by gotList()
m_startFileNum ,
m_numFiles ,
NULL , // state
NULL , // callback
niceness ,
0 , // retry num
m_maxRetries , // -1=def
true); // just get endKey?
if ( g_errno ) {
log(LOG_ERROR,"db: Msg5: getting endKey: %s",mstrerror(g_errno));
return true;
}
treeEndKey = m_msg3.m_constrainKey;
}
// . get the list from our tree
// . set g_errno and return true on error
// . it is crucial that we get tree list before spawning a thread
// because Msg22 will assume that if the TitleRec is in the tree
// now we'll get it, because we need to have the latest version
// of a particular document and this guarantees it. Otherwise, if
// the doc is not in the tree then tfndb must tell its file number.
// I just don't want to think its in the tree then have it get
// dumped out right before we read it, then we end up getting the
// older version rather than the new one in the tree which tfndb
// does not know about until it is dumped out. so we could lose
// recs between the in-memory and on-disk phases this way.
// . however, if we are getting a titlerec, we first read the tfndb
// list from the tree then disk. if the merge replaces the tfndb rec
// we want with another while we are reading the tfndb list from
// disk, then the tfndb rec we got from the tree was overwritten!
// so then we'd read from the wrong title file number (tfn) and
// not find the rec because the merge just removed it. so keeping
// the tfndb recs truly in sync with the titledb recs requires
// some dancing. the simplest way is to just scan all titleRecs
// in the event of a disagreement... so turn on m_scanAllIfNotFound,
// which essentially disregards tfndb and searches all the titledb
// files for the titleRec.
if ( m_includeTree ) {
int32_t numNegativeRecs = 0;
int32_t numPositiveRecs = 0;
int32_t memUsedByTree = 0;
int32_t numRecs = 0;
if(!getTreeList(&m_treeList, m_fileStartKey, treeEndKey, &numPositiveRecs, &numNegativeRecs, &memUsedByTree, &numRecs)) {
return true;
}
// if our recSize is fixed we can boost m_minRecSizes to
// compensate for these deletes when we call m_msg3.readList()
int32_t rs = base->getRecSize() ;
// . use an avg. rec size for variable-length records
// . just use tree to estimate avg. rec size
if ( rs == -1) {
// what is the minimal record size?
int32_t minrs = rdb->getKeySize() + 4;
// get avg record size
if(numRecs > 0)
rs = memUsedByTree / numRecs;
// add 10% for deviations
rs = (rs * 110) / 100;
// ensure a minimal record size
if(rs < minrs)
rs = minrs;
}
// . TODO: get avg recSize in this rdb (avgRecSize*numNeg..)
// . don't do this if we're not merging because it makes
// it harder to compute the # of bytes to read used to
// pre-allocate a reply buf for Msg0 when !m_doMerge
// . we set endKey for spiderdb when reading from tree above
// based on the current minRecSizes so do not mess with it
// in that case.
if ( m_rdbId != RDB_SPIDERDB_DEPRECATED ) {
//m_newMinRecSizes += rs * numNegativeRecs;
int32_t nn = m_newMinRecSizes + rs * numNegativeRecs;
if ( rs > 0 && nn < m_newMinRecSizes ) nn = 0x7fffffff;
m_newMinRecSizes = nn;
}
// . if m_endKey = m_startKey + 1 and our list has a rec
// then no need to check the disk, it was in the tree
// . it could be a negative or positive record
// . tree can contain both negative/positive recs for the key
// so we should do the final merge in gotList()
// . that can happen because we don't do an annihilation
// because the positive key may be being dumped out to disk
// but it really wasn't and we get stuck with it
char kk[MAX_KEY_BYTES];
KEYSET(kk,m_startKey,m_ks);
KEYINC(kk,m_ks);
if ( KEYCMP(m_endKey,kk,m_ks)==0 && ! m_treeList.isEmpty() ) {
return gotList();
}
}
// if we don't use the tree then at least set the key bounds cuz we
// pick the min endKey between diskList and treeList below
else {
m_treeList.set ( m_fileStartKey , m_endKey );
}
// . if we're reading indexlists from 2 or more sources then some
// will probably be compressed from 12 byte keys to 6 byte keys
// . it is typically only about 1% when files are small,
// and smaller than that when a file is large
// . but just to be save reading an extra 2% won't hurt too much
if ( base->useHalfKeys() ) {
m_numSources = m_numFiles;
if (m_numSources == -1) {
m_numSources = base->getNumFiles();
}
// if tree is empty, don't count it
if (m_includeTree && !m_treeList.isEmpty()) {
m_numSources++;
}
// . if we don't do a merge then we return the list directly
// (see condition where m_numListPtrs == 1 below)
// from Msg3 (or tree) and we must hit minRecSizes as
// close as possible for Msg3's call to constrain() so
// we don't overflow the UdpSlot's TMPBUFSIZE buffer
// . if we just arbitrarily boost m_newMinRecSizes then
// the single list we get back from Msg3 will not have
// been constrained with m_minRecSizes, but constrained
// with m_newMinRecSizes (x2%) and be too big for our UdpSlot
if ( m_numSources >= 2 ) {
int64_t newmin = (int64_t)m_newMinRecSizes ;
newmin = (newmin * 50LL) / 49LL ;
// watch out for wrap around
if ( (int32_t)newmin < m_newMinRecSizes )
m_newMinRecSizes = 0x7fffffff;
else m_newMinRecSizes = (int32_t)newmin;
}
}
// limit to 20MB so we don't go OOM!
if ( m_newMinRecSizes > 2 * m_minRecSizes &&
m_newMinRecSizes > 20000000 )
m_newMinRecSizes = 20000000;
const char *diskEndKey = m_treeList.getEndKey();
// sanity check
if ( m_treeList.getKeySize() != m_ks ) { g_process.shutdownAbort(true); }
// clear just in case
g_errno = 0;
// . now get from disk
// . use the cache-modified constraints to reduce reading time
// . return false if it blocked
if ( ! m_msg3.readList ( m_rdbId ,
m_collnum ,
m_fileStartKey , // modified by gotList()
diskEndKey ,
m_newMinRecSizes , // modified by gotList()
m_startFileNum ,
m_numFiles ,
m_callback ? this : NULL,
m_callback ? &gotListWrapper0 : NULL,
niceness ,
0 , // retry num
m_maxRetries , // max retries (-1=def)
false))
return false;
// . this returns false if blocked, true otherwise
// . sets g_errno on error
// . updates m_newMinRecSizes
// . updates m_fileStartKey to the endKey of m_list + 1
if ( ! gotList () ) return false;
// bail on error from gotList() or Msg3::readList()
if ( g_errno ) return true;
// we may need to re-call getList
verify_signature();
} while(needsRecall());
// we did not block
return true;
}
// Decide whether the read loop must go around once more.
// Returns true (after bumping m_round and calling rdb->didReSeek()) when
// all of these hold:
//   - g_errno is clear;
//   - m_newMinRecSizes is still positive;
//   - the last pass read something;
//   - m_list's endKey is still below m_endKey.
// Otherwise it rewinds m_list's read pointer and returns false.
// If the collection vanished (no base, no g_errno), it sets g_errno to
// ENOCOLLREC and returns false without touching m_list.
bool Msg5::needsRecall() {
	verify_signature();
	Rdb *rdb = getRdbFromId(m_rdbId);
	RdbBase *base = getRdbBase ( m_rdbId , m_collnum );
	// the collection was deleted from under us
	if ( ! base && ! g_errno ) {
		log(LOG_WARN,"msg5: base lost for rdbid=%" PRId32" collnum %" PRId32,
		    (int32_t)m_rdbId,(int32_t)m_collnum);
		g_errno = ENOCOLLREC;
		return false;
	}
	// Msg3 may shorten the endKey to meet m_minRecSizes; annihilation of
	// positive/negative variable-length records can then leave us short.
	bool readAgain = ( ! g_errno && m_newMinRecSizes > 0 );
	if ( readAgain && m_readAbsolutelyNothing ) {
		// an empty pass would otherwise loop forever; doledb and spiderdb
		// stop here silently, everything else logs it
		const bool silentStop = ( m_rdbId == RDB_DOLEDB || m_rdbId == RDB_SPIDERDB_DEPRECATED );
		if ( ! silentStop )
			log(LOG_WARN, "rdb: read absolutely nothing more for dbname=%s on cn=%" PRId32, base->getDbName(),(int32_t)m_collnum);
		readAgain = false;
	}
	if ( readAgain && KEYCMP(m_list->getEndKey(), m_endKey, m_ks ) >= 0 )
		readAgain = false;
	if ( ! readAgain ) {
		if ( m_list )
			m_list->resetListPtr();
		return false;
	}
	// re-reads are worth knowing about (disk abuse), but the expected
	// cases stay quiet: the first re-read, doledb's first few rounds,
	// and all but every 1000th round after round 100
	const bool quiet = ( m_round == 0 ) ||
	                   ( m_round > 100 && ( m_round % 1000 ) != 0 ) ||
	                   ( m_rdbId == RDB_DOLEDB && m_round < 10 );
	if ( ! quiet ) {
		log(LOG_WARN,"db: Reading %" PRId32" again from %s (need %" PRId32" total "
		    "got %" PRId32" totalListSizes=%" PRId32" sk=%s) "
		    "cn=%" PRId32" this=%p round=%" PRId32".",
		    m_newMinRecSizes , base->getDbName() , m_minRecSizes,
		    m_list->getListSize(),
		    m_totalSize,
		    KEYSTR(m_startKey,m_ks),
		    (int32_t)m_collnum,this, m_round );
	}
	m_round++;
	// record how many re-reads we had so we know if it hurts performance
	rdb->didReSeek();
	return true;
}
void Msg5::gotListWrapper0(void *state) {
Msg5 *that = static_cast<Msg5*>(state);
that->gotListWrapper();
}
// Continuation after an asynchronous Msg3 read finishes:
// merge via gotList(), keep reading while needsRecall() asks for more, and
// finally invoke the user callback exactly once.
void Msg5::gotListWrapper() {
	verify_signature();
	if ( m_calledCallback ) gbshutdownLogicError();
	if ( ! m_msg3.areAllScansCompleted() ) gbshutdownLogicError();
	// gotList() returns false when the merge was handed to another thread;
	// that thread's completion path continues from there
	const bool mergedInline = gotList();
	if ( ! mergedInline ) return;
	if ( ! m_msg3.areAllScansCompleted() ) gbshutdownLogicError();
	// go round the read loop again if needed; readList() returns false
	// when it blocked, in which case we will be called back later
	if ( needsRecall() ) {
		const bool readFinished = readList();
		if ( ! readFinished ) return;
	}
	// sanity check
	if ( m_calledCallback ) abort();
	m_calledCallback = 1;
	m_waitingForList = false;
	m_callback ( m_state, m_list, this );
}
// Entry point after the tree/disk reads for one pass are done.
// . returns false if blocked, true otherwise
// . sets g_errno on error
// . any g_errno other than ECORRUPTDATA ends the pass immediately;
//   ECORRUPTDATA is passed on to gotList2(), which handles it
bool Msg5::gotList ( ) {
	verify_signature();
	if ( ! m_msg3.areAllScansCompleted() ) gbshutdownLogicError();
	const bool hardError = ( g_errno && g_errno != ECORRUPTDATA );
	return hardError ? true : gotList2();
}
// Gather the inputs of one pass into m_listPtrs:
//   - Msg3's file lists;
//   - the constrained tree list;
//   - for a titledb real merge with one list, an empty dummy list.
// Then produce m_list in one of three ways:
//   - nothing to merge: set m_list to an empty list;
//   - linkdb/doledb/single-file posdb with one list: steal that list
//     directly;
//   - otherwise merge: via the job scheduler when a callback is set
//     (mergeListsWrapper/mergeDoneWrapper), else inline.
// . returns false if blocked, true otherwise
// . sets g_errno on error
bool Msg5::gotList2 ( ) {
verify_signature();
// reset this
m_startTime = 0LL;
// return if g_errno is set
if ( g_errno && g_errno != ECORRUPTDATA ) return true;
// put all the lists in an array of list ptrs
int32_t n = 0;
// all the disk lists
// NOTE(review): n is capped at MAX_RDB_FILES here, yet the tree list and
// the titledb dummy list can still be appended below -- confirm
// m_listPtrs has room for MAX_RDB_FILES+2 entries.
for ( int32_t i = 0 ; n < MAX_RDB_FILES && i < m_msg3.getNumLists(); ++i ) {
RdbList *list = m_msg3.getList(i);
if(list==NULL) gbshutdownLogicError();
// . skip list if empty
// . was this causing problems?
if ( ! m_isRealMerge ) {
if ( list->isEmpty() ) continue;
}
m_listPtrs [ n++ ] = list;
}
// sanity check.
if ( m_msg3.getNumLists() > MAX_RDB_FILES )
log(LOG_LOGIC,"db: Msg3 had more than %" PRId32" lists.",
(int32_t)MAX_RDB_FILES);
// . get smallest endKey from all the lists
// . all lists from Msg3 should have the same endKey, but
// m_treeList::m_endKey may differ
// . m_treeList::m_endKey should ALWAYS be >= that of the files
// . constrain m_treeList to the startKey/endKey of the files
//m_minEndKey = m_endKey;
KEYSET(m_minEndKey,m_endKey,m_ks);
for ( int32_t i = 0 ; i < n ; i++ ) {
//if ( m_listPtrs[i]->getEndKey() < m_minEndKey )
// m_minEndKey = m_listPtrs[i]->getEndKey();
// sanity check
//if ( KEYNEG(m_listPtrs[i]->getEndKey()) ) {
// g_process.shutdownAbort(true); }
if ( KEYCMP(m_listPtrs[i]->getEndKey(),m_minEndKey,m_ks)<0 ) {
KEYSET(m_minEndKey,m_listPtrs[i]->getEndKey(),m_ks);
// crap, if list is all negative keys, then the
// end key seems negative too! however in this
// case RdbScan::m_endKey seems positive so
// maybe we got a negative endkey in constrain?
//if (! (m_minEndKey[0] & 0x01) )
// log("msg5: list had bad endkey");
}
}
// sanity check
//if ( KEYNEG( m_minEndKey) ) {g_process.shutdownAbort(true); }
// . is treeList included?
// . constrain treelist for the merge
// . if used, m_listPtrs [ m_numListPtrs - 1 ] MUST equal &m_treeList
// since newer lists are listed last so their records override older
if ( m_includeTree && ! m_treeList.isEmpty() ) {
// only constrain if we are NOT the sole list because the
// constrain routine sets our endKey to virtual infinity it
// seems like and that makes SpiderCache think that spiderdb
// is exhausted when it is only in the tree. so i added the
// if ( n > 0 ) condition here.
if ( n > 0 ) {
char k[MAX_KEY_BYTES];
m_treeList.getCurrentKey(k);
m_treeList.constrain(m_startKey, m_minEndKey, -1, 0, k, m_rdbId, "tree");
}
m_listPtrs [ n++ ] = &m_treeList;
}
// get base, returns NULL and sets g_errno to ENOCOLLREC on error
RdbBase *base = getRdbBase( m_rdbId, m_collnum );
if ( ! base ) {
return true;
}
// if not enough lists, use a dummy list to trigger merge so tfndb
// filter happens and we have a chance to weed out old titleRecs
if ( m_rdbId == RDB_TITLEDB && m_numFiles != 1 && n == 1 && m_isRealMerge ) {
//log(LOG_LOGIC,"db: Adding dummy list.");
m_dummy.set ( NULL , // list data
0 , // list data size
NULL , // alloc
0 , // alloc size
m_startKey ,
m_minEndKey ,
base->getFixedDataSize() ,
true , // own data?
base->useHalfKeys() ,
m_ks );
m_listPtrs [ n++ ] = &m_dummy;
}
// only logged; the lists gathered above are still used as-is
if ( n >= MAX_RDB_FILES ) {
log( LOG_LOGIC, "net: msg5: Too many lists (%" PRId32" | %" PRId32").", m_msg3.getNumLists(), n );
}
// store # of lists here for use by the call to merge_r()
m_numListPtrs = n;
// count the sizes
m_totalSize = 0;
for ( int32_t i = 0 ; i < m_numListPtrs ; i++ ) {
m_totalSize += m_listPtrs[ i ]->getListSize();
}
// . but don't breach minRecSizes
// . this totalSize is just to see if we should spawn a thread, really
//if ( totalSize > m_minRecSizes ) m_totalSize = m_minRecSizes;
#ifdef GBSANITYCHECK
// who uses this now?
//log("Msg5:: who is merging?????");
// timing debug
// m_startKey.n1,
// gettimeofdayInMilliseconds()-m_startTime ,
// m_diskList.getListSize());
// ensure both lists are legit
// there may be negative keys in the tree
// diskList may now also have negative recs since Msg3 no longer
// removes them for fears of delayed positive keys not finding their
// negative key because it was merged out by RdbMerge
for ( int32_t i = 0 ; i < m_numListPtrs ; i++ )
m_listPtrs[i]->checkList_r(true);
#endif
// . if no lists we're done
// . if we were a recall, then list may not be empty
if ( m_numListPtrs == 0 && m_list->isEmpty() ) {
// just copy ptrs from this list into m_list
m_list->set ( NULL , // list data
0 , // list data size
NULL , // alloc
0 , // alloc size
m_startKey ,
m_endKey ,
base->getFixedDataSize() ,
true , // own data?
base->useHalfKeys() ,
m_ks );
// . add m_list to our cache if we should
// . this returns false if blocked, true otherwise
// . sets g_errno on error
// . only blocks if calls msg0 to patch a corrupted list
// . it will handle calling callback if that happens
return doneMerging();
}
// nothing was read this pass; needsRecall() uses this to stop looping
if ( m_numListPtrs == 0 ) {
m_readAbsolutelyNothing = true;
}
// if all lists from msg3 were 0... tree still might have something
if ( m_totalSize == 0 && m_treeList.isEmpty() ) {
m_readAbsolutelyNothing = true;
}
// if msg3 had corruption in a list which was detected in contrain_r()
if ( g_errno == ECORRUPTDATA ) {
// if we only had one list, we were not doing a merge
// so return g_errno to the requested so he tries from the twin
if ( m_numListPtrs == 1 ) {
return true;
}
// assume nothing is wrong
g_errno = 0;
// if m_doErrorCorrection is true, repairLists() should fix
}
// . should we remove negative recs from final merged list?
// . if we're reading from root and tmp merge file of root
// . should we keep this out of the thread in case a file created?
m_removeNegRecs = base->isRootFile(m_startFileNum);
// . if we only have one list, just use it
// . Msg3 should have called constrain() on it so it's m_list so
// m_listEnd and m_listSize all fit m_startKey/m_endKey/m_minRecSizes
// to a tee
// . if it's a tree list it already fits to a tee
// . same with cache list?? better be...
// . if we're only reading one list it should always be empty right?
// . i was getting negative keys in my RDB_DOLEDB list which caused
// Spider.cpp to core, so i add the "! m_removeNegRecs" constraint
// here...
// . TODO: add some code to just filter out the negative recs
// super quick just for this purpose
// . crap, rather than do that just deal with the negative recs
// in the caller code... in this case Spider.cpp::gotDoledbList2()
if ( m_numListPtrs == 1 && m_list->isEmpty() &&
// just do this logic for doledb now, it was causing us to
// return search results whose keys were negative indexdb keys.
// or later we can just write some code to remove the neg
// recs from the single list!
( m_rdbId == RDB_LINKDB || m_rdbId == RDB_DOLEDB ||
// this speeds up our queryloop querylog parsing in seo.cpp quite a bit
( m_rdbId == RDB_POSDB && m_numFiles == 1 ) ) ) {
// log any problems
if ( m_listPtrs[0]->getOwnData() ) {
// . bitch if not empty
// . NO! might be our second time around if we had key
// annihilations between file #0 and the tree, and now
// we only have 1 non-empty list ptr, either from the tree
// or from the file
//if ( ! m_list->isEmpty() )
// log("Msg5::gotList: why is it not empty? size=%" PRId32,
// m_list->getListSize() );
// just copy ptrs from this list into m_list
m_list->set ( m_listPtrs[0]->getList () ,
m_listPtrs[0]->getListSize () ,
m_listPtrs[0]->getAlloc () ,
m_listPtrs[0]->getAllocSize () ,
m_listPtrs[0]->getStartKey () ,
m_listPtrs[0]->getEndKey () ,
m_listPtrs[0]->getFixedDataSize () ,
true , // own data?
m_listPtrs[0]->getUseHalfKeys () ,
m_ks );
// ensure we don't free it when we loop on freeLists() below
m_listPtrs[0]->setOwnData ( false );
// gotta set this too!
if ( m_listPtrs[0]->isLastKeyValid() ) {
m_list->setLastKey ( m_listPtrs[0]->getLastKey() );
}
// . remove titleRecs that shouldn't be there
// . if the tfn of the file we read the titlerec from does not
// match the one in m_tfndbList, then remove it
// . but if we're not merging lists, why remove it?
//if ( m_rdbId == RDB_TITLEDB && m_msg3.m_numFileNums > 1 )
// stripTitleRecs ( m_list , m_tfns[0] , m_tfndbList );
// . add m_list to our cache if we should
// . this returns false if blocked, true otherwise
// . sets g_errno on error
// . only blocks if calls msg0 to patch a corrupted list
// . it will handle calling callback if that happens
return doneMerging();
} else {
log(LOG_LOGIC,"db: Msg5: list does not own data.");
}
}
// time the perparation and merge
m_startTime = gettimeofdayInMilliseconds();
// . merge the lists
// . the startKey of the finalList is m_startKey, the first time
// . but after that, we're adding diskLists, so us m_fileStartKey
// . we're called multiple times for the same look-up in case of
// delete records in a variable rec-length db cause some recs in our
// disk lookups to be wiped out, thereby falling below minRecSizes
// . this will set g_errno and return false on error (ENOMEM,...)
// . older list goes first so newer list can override
// . remove all negative-keyed recs since Msg5 is a high level msg call
if(!m_isSingleUnmergedListGet) {
// . prepare for the merge, grows the buffer
// . this returns false and sets g_errno on error
// . should not affect the current list in m_list, only build on top
if ( ! m_list->prepareForMerge ( m_listPtrs, m_numListPtrs, m_minRecSizes ) ) {
log( LOG_WARN, "net: Had error preparing to merge lists from %s: %s", base->getDbName(),mstrerror(g_errno));
return true;
}
} else {
//no need to prepare for merge. We'll just steal the one-and-only list in mergeLists()
}
// asynchronous callers get the merge on a job-scheduler thread;
// mergeDoneWrapper() continues from there
if(m_callback) {
if (g_jobScheduler.submit(mergeListsWrapper, mergeDoneWrapper, this, m_isRealMerge ? thread_type_file_merge : thread_type_query_merge, m_niceness)) {
return false;
}
// thread creation failed
if ( g_jobScheduler.are_new_jobs_allowed() )
log(LOG_INFO,
"net: Failed to create thread to merge lists. Doing "
"blocking merge. (%s)",mstrerror(g_errno));
}
// clear g_errno because it really isn't a problem, we just block
g_errno = 0;
// repair any corruption
repairLists();
// do it
mergeLists();
// . add m_list to our cache if we should
// . this returns false if blocked, true otherwise
// . sets g_errno on error
// . only blocks if calls msg0 to patch a corrupted list
// . it will handle calling callback if that happens
return doneMerging();
}
// thread will run this first
void Msg5::mergeListsWrapper(void *state) {
// we're in a thread now!
Msg5 *that = static_cast<Msg5*>(state);
verify_signature_at(that->signature);
// assume no error since we're at the start of thread call
that->m_errno = 0;
// repair any corruption
that->repairLists();
verify_signature_at(that->signature);
// do the merge
that->mergeLists();
verify_signature_at(that->signature);
if (g_errno && !that->m_errno) {
that->m_errno = g_errno;
}
}
// . now we're done merging
// . when the thread is done we get control back here, in the main process
// Use of ThreadEntry parameter is NOT thread safe
void Msg5::mergeDoneWrapper(void *state, job_exit_t exit_type) {
Msg5 *that = static_cast<Msg5 *>(state);
verify_signature_at(that->signature);
g_errno = that->m_errno;
that->mergeDone(exit_type);
}
// Continuation after a threaded merge:
// finish via doneMerging(), re-enter the read loop while needsRecall()
// asks for more, then call the user callback exactly once.
// The repeated m_calledCallback checks abort if the callback already ran.
void Msg5::mergeDone(job_exit_t /*exit_type*/) {
	verify_signature();
	if ( m_calledCallback ) gbshutdownCorrupted();
	// doneMerging() returns false when it blocked; it then handles
	// calling the callback itself
	const bool mergeFinished = doneMerging();
	if ( ! mergeFinished ) return;
	if ( m_calledCallback ) gbshutdownCorrupted();
	const bool readMore = needsRecall();
	if ( readMore ) {
		if ( m_calledCallback ) gbshutdownCorrupted();
		// readList() returns true only when COMPLETELY done
		const bool readFinished = readList();
		if ( ! readFinished ) return;
		if ( m_calledCallback ) gbshutdownCorrupted();
	}
	if ( m_calledCallback ) gbshutdownCorrupted();
	m_waitingForList = false;
	m_calledCallback = 3;
	verify_signature();
	m_callback ( m_state, m_list, this );
}
// . check the lists read from disk for corruption (runs in the merge thread)
// . sets m_hadCorruption if msg3 reported corruption or if any list fails
//   checkList_r(); bad records are removed from that list in place via
//   removeBadData_r()
// . does nothing unless m_doErrorCorrection is set and msg3 has not
//   already checked the lists
void Msg5::repairLists() {
	verify_signature();
	// assume none
	m_hadCorruption = false;
	// return if no need to
	if ( ! m_doErrorCorrection ) return;
	// or if msg3 already checked them and they were ok
	if ( m_msg3.isListChecked() ) return;
	// if msg3 said they were corrupt... this happens when the map
	// is generated over a bad data file and ends up writing the same key
	// on more than 500MB worth of data. so when we try to read a list
	// that has the startkey/endkey covering that key, the read size
	// is too big to ever happen...
	if ( m_msg3.listHadCorruption() ) m_hadCorruption = true;
	for ( int32_t i = 0 ; i < m_numListPtrs ; i++ ) {
		// . did it breach our minRecSizes? (posdb only, debug logging)
		// . this can legitimately happen after merging: if we had a
		//   bunch of negative recs we over-read anticipating that some
		//   recs will be deleted, so only log it
		if ( g_conf.m_logDebugDb &&
		     m_rdbId == RDB_POSDB &&
		     m_listPtrs[i]->getListSize() > m_minRecSizes + 12 )
			log(LOG_DEBUG,"db: Index list size is %" PRId32" but "
			    "minRecSizes is %" PRId32".",
			    m_listPtrs[i]->getListSize() ,
			    m_minRecSizes );
#ifdef GBSANITYCHECK
		// core dump on corruption
		bool status = m_listPtrs[i]->checkList_r(true);
#else
		// this took like 50ms (-O3) on lenny on a 4meg list
		bool status = m_listPtrs[i]->checkList_r(false);
#endif
		// if no errors, check the next list
		if ( status ) continue;
		// . show the culprit file
		// . logging the key ranges gives us an idea of how long
		//   it will take to patch the bad data
		int32_t nn = m_msg3.getFileNums();
		// TODO: fix this. can't call Collectiondb::getBase from within a thread!
		RdbBase *base = getRdbBase ( m_rdbId , m_collnum );
		if ( i < nn && base ) {
			BigFile *bf = base->getFileById(m_msg3.getFileId(i));
			// getFileById() may return NULL (e.g. the file is gone);
			// don't dereference it just to log
			log( LOG_WARN, "db: Corrupt filename is %s in collnum %" PRId32".",
			     bf ? bf->getFilename() : "<unknown>", (int32_t)m_collnum );
			log( LOG_WARN, "db: startKey=%s endKey=%s",
			     KEYSTR( m_listPtrs[i]->getStartKey(), m_ks ),
			     KEYSTR( m_listPtrs[i]->getEndKey(), m_ks ) );
		}
		// . remove the bad eggs from the list
		// . TODO: support non-fixed data sizes
		m_listPtrs[i]->removeBadData_r();
		// otherwise we have a patchable error
		m_hadCorruption = true;
	}
}
// . merge the lists in m_listPtrs into m_list
// . does nothing if repairLists() flagged corruption; the caller first
//   tries to fetch a replacement list remotely and calls us again with
//   m_hadCorruption cleared
// . for a single unmerged list get there is nothing to merge: the one
//   list is moved into m_list as-is
// . otherwise records the merged list's last key in m_prevKey
void Msg5::mergeLists() {
	verify_signature();
	if ( m_hadCorruption ) {
		return;
	}
	if ( m_isSingleUnmergedListGet ) {
		// sanity: zero or one file and a list to take over
		if ( m_numFiles > 1 ) gbshutdownLogicError();
		if ( m_numFiles < 0 ) gbshutdownLogicError();
		if ( ! m_listPtrs[0] ) gbshutdownLogicError();
		m_list->stealFromOtherList ( m_listPtrs[0] );
		return;
	}
	// . merge notes:
	// . merge_r() may shrink the end key if m_minRecSizes constrained
	//   us, or decrement it by one if it is a negative key
	// . we aren't allowed to allocate in a thread; merge_r() should not
	//   grow the list since prepareForMerge() was called beforehand
	// . the merge must start at m_list's current position because this
	//   may not be the first round, due to negative rec annihilation
	// . the tree may contain un-annihilated key pairs (one of them was
	//   probably in the dump queue when Rdb::addRecord() skipped the
	//   annihilation), so merging also takes care of those
	// . TODO: only merge the recs not cached, [m_fileStartKey, endKey]
	m_list->merge_r ( m_listPtrs, m_numListPtrs, m_startKey, m_minEndKey,
	                  m_minRecSizes, m_removeNegRecs, m_rdbId, m_collnum,
	                  m_numSources, m_startFileNum, m_isRealMerge );
	if ( ! m_list->isLastKeyValid() ) {
		// . the last key should be valid whenever the list is non-empty
		// . we need it for de-duping dup recs that fall on our read
		//   boundaries
		if ( m_list->getListSize() > 0 )
			log(LOG_LOGIC,"db: Msg5. Last key invalid.");
		return;
	}
	// maintain this info for truncation purposes
	KEYSET ( m_prevKey, m_list->getLastKey(), m_ks );
}
// . called once all lists have been read by m_msg3 and merged into m_list
// . this returns false if blocked, true otherwise
// . sets g_errno on error
// . only blocks if it calls msg0 to patch a corrupted list; in that case
//   the msg0 completion path (gotRemoteListWrapper) calls the callback
// . on success frees the source lists and computes m_newMinRecSizes and
//   m_fileStartKey for a possible follow-up read
// . problem this addresses: say maxRecSizes is 1200 (1000 keys) and there
//   are 10000 keys in the [startKey,endKey] range. we read the 1st 1000
//   recs from the tree (m_treeList) and the 1st 1000 recs from disk. if
//   all recs in the tree are negative they annihilate the 1000 recs from
//   disk and we are left with an empty list, so we must read again.
bool Msg5::doneMerging ( ) {
	verify_signature();
	// get base, returns NULL and sets g_errno to ENOCOLLREC on error
	RdbBase *base = getRdbBase( m_rdbId, m_collnum );
	if ( ! base ) {
		return true;
	}
	// . if there was a merge error, bitch about it
	// . Thread class should propagate g_errno when it was set in a thread
	if ( g_errno ) {
		log( LOG_WARN, "net: Had error merging lists from %s: %s.",
		     base->getDbName(),mstrerror(g_errno));
		return true;
	}
	// . was a list corrupted?
	// . if so, we did not even begin the merge yet
	// . try to get the list from a remote brother
	// . if that fails we have already removed the bad data, so begin
	//   our first merge
	if ( m_hadCorruption ) {
		// log it here, cuz logging in thread doesn't work too well
		log( LOG_WARN, "net: Encountered a corrupt list in rdb=%s collnum=%" PRId32,
		     base->getDbName(),(int32_t)m_collnum);
		// remove error condition, we removed the bad data in thread
		m_hadCorruption = false;
		g_numCorrupt++;
		if ( m_callback ) {
			// try to get the list from remote host
			if ( ! getRemoteList() ) return false;
			// getting the remote list is expected to block
			if ( ! g_errno ) {
				log("net: got remote list without blocking");
				g_process.shutdownAbort(true);
			}
		} else {
			//not allowed to get remote list (and return false).
			g_errno = ESHARDDOWN;
		}
		// if it set g_errno, it could not get a remote list
		// so try to make due with what we have
		if ( g_errno ) {
			// log a msg, we actually already removed it in thread
			log("net: Removed corrupted data.");
			g_errno = 0;
			// . merge the modified lists again
			// . this is not in a thread and should not block
			mergeLists();
		}
	}
	if ( m_isRealMerge )
		log(LOG_DEBUG,"db: merged list is %" PRId32" bytes long.",
		    m_list->getListSize());
	// only time it if we actually did a merge, check m_startTime
	int64_t now = m_startTime ? gettimeofdayInMilliseconds() : 0;
	int64_t took = now - m_startTime ;
	if ( g_conf.m_logTimingNet && took > 5 ) {
		// took is signed, so print it with PRId64 (was PRIu64)
		log(LOG_INFO,
		    "net: Took %" PRId64" ms to do merge. %" PRId32" lists merged "
		    "into one list of %" PRId32" bytes.",
		    took , m_numListPtrs , m_list->getListSize() );
	}
	// . add the stat (turquoise = time to merge the disk lists)
	// . these clog up the graph, so only add if took more than 1 ms
	if ( took > 1 && m_startTime ) {
		//"rdb_list_merge"
		g_stats.addStat_r( m_minRecSizes, m_startTime, now, 0x0000ffff );
	}
	// . scan merged list for problems
	// . this caught an incorrectly set m_list->m_lastKey before
#ifdef GBSANITYCHECK
	m_list->checkList_r(true, m_rdbId);
#endif
	// . free all lists we used
	// . some of these may be from Msg3, some from cache, some from tree
	for ( int32_t i = 0 ; i < m_numListPtrs ; i++ ) {
		if ( m_listPtrs[i] ) {
			m_listPtrs[i]->freeList();
			m_listPtrs[i] = NULL;
		}
	}
	// and the tree list
	m_treeList.freeList();
	// . update our m_newMinRecSizes
	// . NOTE: this now ignores the negative records in the tree
	int64_t newListSize = m_list->getListSize();
	// scale proportionally based on how many got removed during the merge
	int64_t percent;
	int64_t net = newListSize - m_oldListSize;
	// add 5% for inconsistencies
	if ( net > 0 ) percent = (((int64_t)m_newMinRecSizes*100LL)/net)+5LL;
	else           percent = 200;
	if ( percent <= 0 ) percent = 1;
	// set old list size in case we get called again
	m_oldListSize = newListSize;
	// how many recs do we have left to read?
	m_newMinRecSizes = m_minRecSizes - (int32_t)newListSize;
	// return now if we met our minRecSizes quota
	if ( m_newMinRecSizes <= 0 ) return true;
	// . scale the remainder proportionately and, to get past runs of
	//   negative recs, increase it by 10 percent for every round
	// . done in floating point and clamped: the old int32_t arithmetic
	//   could overflow, its overflow test misfired whenever the scaled
	//   value equalled the remainder (small remainders with percent just
	//   over 100 jumped to 0x7fffffff), and casting the round factor to
	//   int32_t truncated it to 1 for the first ten rounds
	const double maxMinRecSizes = 1000000000.0;
	double scaled = (double)m_newMinRecSizes * (double)percent / 100.0;
	scaled *= 1.0 + ( m_round * .10 );
	if ( scaled < 0.0 || scaled > maxMinRecSizes ) scaled = maxMinRecSizes;
	m_newMinRecSizes = (int32_t)scaled;
	// keep this above a certain point because if we didn't make it now
	// we got negative records messing with us
	if ( m_rdbId != RDB_DOLEDB &&
	     m_newMinRecSizes < 128000 ) m_newMinRecSizes = 128000;
	// . update startKey in case we need to read more
	// . we'll need to read more if endKey < m_endKey && m_newMinRecSizes
	//   is positive
	// . we read more from files AND from tree
	KEYSET(m_fileStartKey,m_list->getEndKey(),m_ks);
	KEYINC(m_fileStartKey,m_ks);
	return true;
}
// . true when dumping a local rdb from the command line (e.g.
//   'gb dump p main 0 -1 1'); presumably set by the command-line dump
//   code elsewhere -- TODO confirm
// . when set, getRemoteList() does not ask a twin for a replacement of a
//   corrupted list; it just sets g_errno and returns
bool g_isDumpingRdbFromMain = 0;
// . if we discover one of the lists we read from a file is corrupt we go here
// . uses Msg5 to try to get list remotely
bool Msg5::getRemoteList ( ) {
verify_signature();
// skip this part if doing a cmd line 'gb dump p main 0 -1 1' cmd or
// similar to dump out a local rdb.
if ( g_isDumpingRdbFromMain ) {
g_errno = 1;
return true;
}
// . this returns false if blocked, true otherwise
// . this sets g_errno on error
// . get list from ALL files, not just m_startFileNum/m_numFiles
// since our files may not be the same
// . if doRemotely parm is not supplied replying hostId is unspecified
// get our twin host, or a redundant host in our group
//Host *group = g_hostdb.getGroup ( g_hostdb.m_groupId );
Host *group = g_hostdb.getMyShard();
int32_t n = g_hostdb.getNumHostsPerShard();
// . if we only have 1 host per group, data is unpatchable
// . we should not have been called if this is the case!!
if ( n == 1 ) {
g_errno = EBADENGINEER;
//log("Msg5::gotRemoteList: no twins. data unpatchable.");
return true;
}
// tell them about
log("net: Getting remote list from twin instead.");
// make a new Msg0 for getting remote list
try { m_msg0 = new ( Msg0 ); }
// g_errno should be set if this is NULL
catch(std::bad_alloc&) {
g_errno = ENOMEM;
log("net: Could not allocate memory to get from twin.");
return true;
}
mnew ( m_msg0 , sizeof(Msg0) , "Msg5" );
// select our twin
int32_t i;
for ( i = 0 ; i < n ; i++ )
if ( group[i].m_hostId != g_hostdb.m_myHostId ) break;
Host *h = &group[i];
// get our groupnum. the column #
int32_t forceParitySplit = h->m_shardNum;//group;
// translate base to an id, for the sake of m_msg0
//char rdbId = getIdFromRdb ( base->m_rdb );
// . this returns false if blocked, true otherwise
// . this sets g_errno on error
// . get list from ALL files, not just m_startFileNum/m_numFiles
// since our files may not be the same
// . if doRemotely parm is not supplied replying hostId is unspecified
// . make minRecSizes as big as possible because it gets from ALL
// files and from tree!
// . just make it 256k for now lest, msg0 bitch about it being too big
// if rdbId == RDB_INDEXDB passed the truncation limit
// . wait forever for this host to reply... well, at least a day that
// way if he's dead we'll wait for him to come back up to save our
// data
verify_signature();
if ( ! m_msg0->getList ( h->m_hostId ,
m_rdbId , // rdbId
m_collnum ,
m_list ,
m_startKey ,
m_endKey ,
m_minRecSizes , // was 256k minRecSizes
this ,
gotRemoteListWrapper ,
m_niceness ,
false , // do error correction?
true , // include tree?
-1 , // first hostid
0 , // startFileNum
-1 , // numFiles (-1=all)
60*60*24*1000 , // timeout
m_isRealMerge , // merging files?
false , // doIndexdbSplit
// "forceParitySplit" is a group #
// (the groupId is a mask)
forceParitySplit ))
return false;
// this is strange
log("msg5: call to msg0 did not block");
// . if we did not block then call this directly
// . return false if it blocks
verify_signature();
return gotRemoteList ( ) ;
}
void Msg5::gotRemoteListWrapper( void *state ) {
Msg5 *THIS = (Msg5 *)state;
// return if this blocks
if ( ! THIS->gotRemoteList() ) return;
// sanity check
if ( THIS->m_calledCallback ) { g_process.shutdownAbort(true); }
// we are no longer waiting for the list
THIS->m_waitingForList = false;
// set it now
THIS->m_calledCallback = 4;
// if it doesn't block call the callback, g_errno may be set
THIS->m_callback ( THIS->m_state , THIS->m_list , THIS );
}
// returns false if it blocks
bool Msg5::gotRemoteList ( ) {
verify_signature();
// free the Msg0
mdelete ( m_msg0 , sizeof(Msg0) , "Msg5" );
delete ( m_msg0 );
// return true now if everything ok
if ( ! g_errno ) {
// . i modified checkList to set m_lastKey if it is not set
// . we need it for the big merge for getting next key in
// RdbDump.cpp
// . if it too is invalid, we are fucked
if ( ! m_list->checkList_r ( false ) ) {
log("net: Received bad list from twin.");
g_errno = ECORRUPTDATA;
goto badList;
}
// . success messages
// . logging the key ranges gives us an idea of how long
// it will take to patch the bad data
const char *sk = m_list->getStartKey();
const char *ek = m_list->getEndKey ();
log("net: Received good list from twin. Requested %" PRId32" bytes "
"and got %" PRId32". "
"startKey=%s endKey=%s",
m_minRecSizes , m_list->getListSize() ,
KEYSTR(sk,m_ks),KEYSTR(ek,m_ks));
// . HACK: fix it so end key is right
// . TODO: fix this in Msg0::gotReply()
// . if it is empty, then there must be nothing else left
// since the endKey was maxed in call to Msg0::getList()
if ( ! m_list->isEmpty() )
m_list->setEndKey ( m_list->getLastKey() );
const char *k = m_list->getStartKey();
log(LOG_DEBUG,
//"net: Received list skey.n1=%08" PRIx32" skey.n0=%016" PRIx64"." ,
// k.n1 , k.n0 );
"net: Received list skey=%s." ,
KEYSTR(k,m_ks) );
k = m_list->getEndKey();
log(LOG_DEBUG,
//"net: Received list ekey.n1=%08" PRIx32" ekey.n0=%016" PRIx64"." ,
// k.n1 , k.n0 );
"net: Received list ekey=%s",
KEYSTR(k,m_ks) );
if ( ! m_list->isEmpty() ) {
k = m_list->getLastKey();
//log(LOG_DEBUG,"net: Received list Lkey.n1=%08" PRIx32" "
// "Lkey.n0=%016" PRIx64 , k.n1 , k.n0 );
log(LOG_DEBUG,"net: Received list Lkey=%s",
KEYSTR(k,m_ks) );
}
//log("Msg5::gotRemoteList: received list is good.");
return true;
}
badList:
// it points to a corrupted list from twin, so reset
m_list->reset();
// because we passed m_list to Msg0, it called m_list->reset()
// which set our m_mergeMinListSize to -1, so we have to call
// the prepareForMerge() again
if ( ! m_list->prepareForMerge ( m_listPtrs ,
m_numListPtrs ,
m_minRecSizes ) ) {
log("net: Had error preparing for merge: %s.",
mstrerror(g_errno));
return true;
}
// . if g_errno is timed out we couldn't get a patch list
// . turn off error correction and try again
log("net: Had error getting remote list: %s.", mstrerror(g_errno) );
log("net: Merging repaired lists.");
// clear g_errno so RdbMerge doesn't freak out
g_errno = 0;
// . we have the lists ready to merge
// . hadCorruption should be false at this point
mergeLists();
// process the result
return doneMerging();
}