#include "RdbDump.h"
#include "Rdb.h"
#include "RdbCache.h"
#include "Collectiondb.h"
#include "Conf.h"
#include "Mem.h"
#include "Errno.h"
#include <fcntl.h>
RdbDump::RdbDump() {
// Coverity
m_tree = NULL;
m_buckets = NULL;
m_map = NULL;
m_index = NULL;
m_maxBufSize = 0;
m_state = NULL;
m_callback = NULL;
m_offset = 0;
m_file = NULL;
m_list = NULL;
m_buf = NULL;
m_verifyBuf = NULL;
m_verifyBufSize = 0;
m_bytesToWrite = 0;
m_bytesWritten = 0;
memset(m_prevLastKey, 0, sizeof(m_prevLastKey));
m_nextNode = 0;
memset(m_nextKey, 0, sizeof(m_nextKey));
m_rolledOver = false;
memset(&m_fstate, 0, sizeof(m_fstate));
m_niceness = 0;
m_useHalfKeys = false;
m_hacked = false;
m_hacked12 = false;
m_totalPosDumped = 0;
m_totalNegDumped = 0;
m_getListStartTimeMS = 0;
m_numPosRecs = 0;
m_numNegRecs = 0;
m_collnum = 0;
m_rdbId = RDB_NONE;
m_isSuspended = false;
m_ks = 0;
}
// . return false if blocked, true otherwise
// . sets g_errno on error
bool RdbDump::set(collnum_t collnum,
BigFile *file,
RdbBuckets *buckets, // optional buckets to dump
RdbTree *tree, // optional tree to dump
RdbMap *map,
RdbIndex *index,
int32_t maxBufSize,
int32_t niceness,
void *state,
void (*callback)(void *state),
bool useHalfKeys,
int64_t startOffset,
const char *prevLastKey,
char keySize,
rdbid_t rdbId) {
m_collnum = collnum;
m_file = file;
m_buckets = buckets;
m_tree = tree;
m_map = map;
m_index = index;
m_state = state;
m_callback = callback;
m_list = NULL;
m_niceness = niceness;
m_isSuspended = false;
m_ks = keySize;
m_buf = NULL;
m_verifyBuf = NULL;
m_maxBufSize = maxBufSize;
m_offset = startOffset ;
m_rolledOver = false; // true if m_nextKey rolls over back to 0
KEYMIN(m_nextKey,m_ks);
m_nextNode = 0 ; // used in dumpTree()
// if we're dumping indexdb, allow half keys
m_useHalfKeys = useHalfKeys;
//m_prevLastKey = prevLastKey;
KEYSET(m_prevLastKey,prevLastKey,m_ks);
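// m_prevLastKey is the last key already dumped before startOffset; dumpList2()
// compares the first key of each new list against it to decide whether that
// key can be written to disk in compressed (half-key / 6-byte) form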
m_rdbId = rdbId;
// . don't dump to a pre-existing file
// . seems like Rdb.cpp makes a new BigFile before calling this
// . now we can resume merges, so we can indeed dump to the END
// of a pre-existing file, but not when dumping a tree!
if ((m_tree || m_buckets) && m_file->getFileSize() > 0) {
g_errno = EEXIST;
log(LOG_WARN, "db: Could not dump to %s. File exists.", m_file->getFilename());
return true;
}
// . open the file nonblocking, sync with disk, read/write
// . NOTE: O_SYNC doesn't work too well over NFS
// . we need O_SYNC when dumping trees only because we delete the
// nodes/records as we dump them
// . ensure this sets g_errno for us
if (!m_file->open(O_RDWR | O_CREAT)) {
return true;
}
m_file->setFlushingIsApplicable(); //we want to flush rdb files if enabled
// The open() above doesn't actually open any file part; a real file is
// only opened/created when an fd is requested. Do that now so we catch
// file-creation errors early.
// . get the file descriptor of the first real file in the BigFile
// . we should only dump to the first file in the BigFile; otherwise
// we'd have to juggle fd registration
int fd = m_file->getfd(0, false);
if( fd<0 ) {
log(LOG_LOGIC, "db: dump: Bad fd of first file in BigFile.");
return true;
}
// . if no tree was provided to dump it must be RdbMerge calling us
// . he'll want to call dumpList() on his own
if (!m_tree && !m_buckets) {
return true;
}
// how many recs in tree?
int32_t nr;
const char *structureName;
if (m_tree) {
nr = m_tree->getNumUsedNodes();
structureName = "tree";
} else {
nr = m_buckets->getNumKeys();
structureName = "buckets";
}
log(LOG_INFO,"db: Dumping %" PRId32" recs from %s to files.", nr, structureName);
// keep a total count for reporting when done
m_totalPosDumped = 0;
m_totalNegDumped = 0;
// . start dumping the tree
// . return false if it blocked
if (!dumpTree(false)) {
return false;
}
// no longer dumping
doneDumping();
// return true since we didn't block
return true;
}
void RdbDump::reset ( ) {
// free verify buf if there
if (m_verifyBuf) {
mfree(m_verifyBuf, m_verifyBufSize, "RdbDump4");
m_verifyBuf = NULL;
}
}
void RdbDump::doneDumping() {
int32_t saved = g_errno;
// print stats
log(LOG_INFO,
"db: Dumped %" PRId32" positive and %" PRId32" negative recs. "
"Total = %" PRId32".",
m_totalPosDumped , m_totalNegDumped ,
m_totalPosDumped + m_totalNegDumped );
// . map verify
// . if continueDumping called us with no collectionrec, it got
// deleted so RdbBase::m_map is nuked too i guess
if (saved != ENOCOLLREC && m_map) {
log(LOG_INFO, "db: map # pos=%" PRId64" neg=%" PRId64, m_map->getNumPositiveRecs(), m_map->getNumNegativeRecs());
}
// free the list's memory
if (m_list) {
m_list->freeList();
}
// reset verify buffer
reset();
// did collection get deleted/reset from under us?
if (saved == ENOCOLLREC) {
return;
}
// check dumped file size
if (m_file) {
m_file->invalidateFileSize();
if (m_file->getFileSize() == 0) {
logError("dumped zero byte file");
gbshutdownCorrupted();
}
}
// save the map to disk. true = allDone
if (m_map) {
m_map->writeMap(true);
}
if (m_index) {
m_index->writeIndex(true);
}
#ifdef GBSANITYCHECK
// sanity check
log("DOING SANITY CHECK FOR MAP -- REMOVE ME");
if ( m_map && ! m_map->verifyMap ( m_file ) ) {
g_process.shutdownAbort(true);
}
#endif
}
void RdbDump::tryAgainWrapper2 ( int fd , void *state ) {
// debug msg
log(LOG_INFO,"db: Trying to get data again.");
// stop waiting
g_loop.unregisterSleepCallback ( state , tryAgainWrapper2 );
// bitch about errors
if (g_errno) log("db: Had error: %s.",mstrerror(g_errno));
// get THIS ptr from state
RdbDump *THIS = (RdbDump *)state;
// continue dumping the tree or give control back to caller
THIS->continueDumping ( );
}
// . returns false if blocked, true otherwise
// . sets g_errno on error
// . dumps the RdbTree, m_tree, into m_file
// . also sets and writes the RdbMap for m_file
// . we methodically get RdbLists from the RdbTree
// . dumped recs are ordered by key if "orderedDump" was true in call to set()
// otherwise, lists are ordered by node #
// . we write each list of recs to the file until the whole tree has been done
// . we delete all records in list from the tree after we've written the list
// . if a cache was provided we incorporate the list into the cache before
// deleting it from the tree to keep the cache in sync. NO we do NOT!
// . called again by writeBuf() when it's done writing the whole list
bool RdbDump::dumpTree(bool recall) {
if (g_conf.m_logTraceRdbDump) {
logTrace(g_conf.m_logTraceRdbDump, "BEGIN");
logTrace(g_conf.m_logTraceRdbDump, "recall.: %s", recall ? "true" : "false");
logTrace(g_conf.m_logTraceRdbDump, "m_rdbId: %02x", m_rdbId);
logTrace(g_conf.m_logTraceRdbDump, "name...: [%s]", getDbnameFromId(m_rdbId) );
}
// set up some vars
char maxEndKey[MAX_KEY_BYTES];
KEYMAX(maxEndKey,m_ks);
// this list will hold the list of nodes/recs from m_tree
m_list = &m_ourList;
for (;;) {
// if the lastKey was the max end key last time then we're done
if (m_rolledOver) {
logTrace(g_conf.m_logTraceRdbDump, "END - m_rolledOver, returning true");
return true;
}
// this is set to -1 when we're done with our unordered dump
if (m_nextNode == -1) {
logTrace(g_conf.m_logTraceRdbDump, "END - m_nextNode, returning true");
return true;
}
// . NOTE: list's buffer space should be re-used!! (TODO)
// . "lastNode" is set to the last node # in the list
if (!recall) {
bool status = true;
m_getListStartTimeMS = gettimeofdayInMilliseconds();
if (m_tree) {
logTrace(g_conf.m_logTraceRdbDump, "m_tree");
status = m_tree->getList(m_collnum, m_nextKey, maxEndKey, m_maxBufSize, m_list,
&m_numPosRecs, &m_numNegRecs, m_useHalfKeys);
} else if (m_buckets) {
logTrace(g_conf.m_logTraceRdbDump, "m_buckets");
status = m_buckets->getList(m_collnum, m_nextKey, maxEndKey, m_maxBufSize, m_list,
&m_numPosRecs, &m_numNegRecs, m_useHalfKeys);
}
logTrace(g_conf.m_logTraceRdbDump, "status: %s", status ? "true" : "false");
// if error getting list (out of memory?)
if (!status) {
logError("db: Had error getting data for dump: %s. Retrying.", mstrerror(g_errno));
// register a sleep callback and retry in one second
if (!g_loop.registerSleepCallback(1000, this, tryAgainWrapper2, "RdbDump::tryAgainWrapper2")) {
log(LOG_WARN, "db: Retry failed. Could not register callback.");
logTrace(g_conf.m_logTraceRdbDump, "END - retry failed, returning true");
return true;
}
logTrace(g_conf.m_logTraceRdbDump, "END - returning false");
// wait for sleep
return false;
}
}
int64_t t2 = gettimeofdayInMilliseconds();
log(LOG_INFO, "db: Get list took %" PRId64" ms. %" PRId32" positive. %" PRId32" negative.",
t2 - m_getListStartTimeMS, m_numPosRecs, m_numNegRecs);
// keep a total count for reporting when done
m_totalPosDumped += m_numPosRecs;
m_totalNegDumped += m_numNegRecs;
// . check the list we got from the tree for problems
// . ensures keys are ordered from lowest to highest as well
if (g_conf.m_verifyWrites || g_conf.m_verifyDumpedLists) {
log(LOG_INFO, "dump: verifying list before dumping (rdb=%s collnum=%i)", getDbnameFromId(m_rdbId), (int)m_collnum);
m_list->checkList_r(true, m_rdbId);
}
// if list is empty, we're done!
if (m_list->isEmpty()) {
return true;
}
// get the last key of the list
const char *lastKey = m_list->getLastKey();
// advance m_nextKey
KEYSET(m_nextKey, lastKey, m_ks);
KEYINC(m_nextKey, m_ks);
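// if m_nextKey was the max key, KEYINC() wraps it around to zero; note the
// wrap so the next pass through the loop ends the dump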
if (KEYCMP(m_nextKey, lastKey, m_ks) < 0) {
m_rolledOver = true;
}
// if list is empty, we're done!
if (m_list->isEmpty()) {
logTrace(g_conf.m_logTraceRdbDump, "END - list empty, returning true");
return true;
}
// ensure we are getting the first key of the list
m_list->resetListPtr();
// . write this list to disk
// . returns false if blocked, true otherwise
// . sets g_errno on error
// . if this blocks it should call us (dumpTree() back)
if (!dumpList(m_list, false)) {
logTrace(g_conf.m_logTraceRdbDump, "END - after dumpList, returning false");
return false;
}
// close up shop on a write/dumpList error
if (g_errno) {
logTrace(g_conf.m_logTraceRdbDump, "END - g_errno set [%" PRId32"], returning true", g_errno);
return true;
}
// . if dumpList() did not block then keep on truckin'
// . otherwise, wait for callback of dumpTree()
}
}
// . return false if blocked, true otherwise
// . sets g_errno on error
// . this one is also called by RdbMerge to dump lists
bool RdbDump::dumpList(RdbList *list, bool recall) {
logTrace(g_conf.m_logTraceRdbDump, "BEGIN. list=%p recall=%d", list, recall);
// save ptr to list
m_list = list;
// nothing to do if list is empty
if (m_list->isEmpty()) {
logTrace(g_conf.m_logTraceRdbDump, "END. Empty list.");
return true;
}
// if we had a write error and are being recalled, rewind the offset
// so the retried write lands at the same file position
if (recall) {
m_offset -= m_bytesToWrite;
} else {
// assume we don't hack the list
m_hacked = false;
m_hacked12 = false;
if (g_conf.m_verifyDumpedLists) {
// only run in a thread if callback is set
if (m_callback && g_jobScheduler.submit(&checkList, &checkedList, this, thread_type_verify_data, m_niceness)) {
logTrace(g_conf.m_logTraceRdbDump, "END. Submitted checkList job.");
return false;
}
m_list->checkList_r(true, m_rdbId);
}
}
return dumpList2(recall);
}
void RdbDump::checkList(void *state) {
RdbDump *that = reinterpret_cast<RdbDump*>(state);
logTrace(g_conf.m_logTraceRdbDump, "BEGIN. list=%p", that->m_list);
that->m_list->checkList_r(true, that->m_rdbId);
logTrace(g_conf.m_logTraceRdbDump, "END. list=%p", that->m_list);
}
void RdbDump::checkedList(void *state, job_exit_t /*exit_type*/) {
RdbDump *that = reinterpret_cast<RdbDump*>(state);
logTrace(g_conf.m_logTraceRdbDump, "BEGIN. list=%p", that->m_list);
that->dumpList2(false);
logTrace(g_conf.m_logTraceRdbDump, "END. list=%p", that->m_list);
}
bool RdbDump::dumpList2(bool recall) {
if(!recall) {
m_list->resetListPtr();
// . SANITY CHECK
// . ensure first key is >= last key added to the map map
if (m_offset > 0 && m_map) {
char k[MAX_KEY_BYTES*2+1];
m_list->getCurrentKey(k);
char lastKey[MAX_KEY_BYTES*2+1];
m_map->getLastKey(lastKey);
if (KEYCMP(k, lastKey, m_ks) <= 0) {
log(LOG_LOGIC, "db: Dumping list key out of order. "
"lastKey=%s k=%s",
KEYSTR(lastKey, m_ks),
KEYSTR(k, m_ks));
g_errno = EBADENGINEER;
m_map->printMap();
gbshutdownLogicError();
}
}
// HACK! POSDB
if (m_ks == 18 && m_offset > 0) {
char k[MAX_KEY_BYTES];
m_list->getCurrentKey(k);
// . same top 12 bytes as the last key we added?
// . if so, we should only add the low 6 bytes of this key, not all 18,
// so on disk it is compressed consistently
if (memcmp((k) + (m_ks - 12), (m_prevLastKey) + (m_ks - 12), 12) == 0) {
char tmp[MAX_KEY_BYTES];
char *p = m_list->getList();
// swap high 12 bytes with low 6 bytes for first key
memcpy (tmp, p, m_ks - 12);
memmove (p, p + (m_ks - 12), 12);
memcpy (p + 12, tmp, m_ks - 12);
// big hack here
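// after setList(p + 12) the list starts at the low 6 bytes of the first key
// (now a 6-byte compressed key); the top 12 bytes moved to p..p+11 sit just
// before the list start so the lo/hi pointers below can still reconstruct
// the full 18-byte key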
m_list->setList(p + 12);
m_list->setListPtr(p + 12);
m_list->setListPtrLo(p);
m_list->setListPtrHi(p + 6);
m_list->setListSize(m_list->getListSize() - 12);
// turn on both bits to indicate double compression
*(p + 12) |= 0x06;
m_hacked12 = true;
}
}
// . HACK
// . if we're doing an ordered dump then hack the list's first 12-byte
// key down to a 6-byte key iff the last key we dumped last time
// shares the same top 6 bytes as the first key of this list
// . this way we maintain compression consistency on the disk
// so IndexTable.cpp can expect all 6 byte keys for the same termid
// and RdbList::checkList_r() can expect the half bits to always be
// on when they can be on
// . IMPORTANT: calling m_list->resetListPtr() will mess this HACK up!!
if (m_useHalfKeys && m_offset > 0 && !m_hacked12) {
char k[MAX_KEY_BYTES];
m_list->getCurrentKey(k);
// . same top 6 bytes as last key we added?
// . if so, we should only add 6 bytes from this key, not 12
// so on disk it is compressed consistently
if (memcmp((k) + (m_ks - 6), (m_prevLastKey) + (m_ks - 6), 6) == 0) {
m_hacked = true;
char tmp[MAX_KEY_BYTES];
char *p = m_list->getList();
memcpy (tmp, p, m_ks - 6);
memmove (p, p + (m_ks - 6), 6);
memcpy (p + 6, tmp, m_ks - 6);
// big hack here
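// after setList(p + 6) the list starts at the remaining m_ks-6 bytes of the
// first key (a half key, with the 0x02 bit set below); the top 6 bytes parked
// at p stay reachable through the hi pointer for decompression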
m_list->setList(p + 6);
// make this work for POSDB, too
m_list->setListPtr(p + 6);
m_list->setListPtrLo(p + 6 + 6);
m_list->setListPtrHi(p);
m_list->setListSize(m_list->getListSize() - 6);
// hack on the half bit, too
*(p + 6) |= 0x02;
}
}
// update old last key
m_list->getLastKey(m_prevLastKey);
// now write it to disk
m_buf = m_list->getList();
m_bytesToWrite = m_list->getListSize();
}
// make sure we have enough mem to add the list to the map after a
// successful dump. otherwise, if we write it but fail to add it to
// the map, the map is out of sync if we core thereafter
if (m_map && !m_map->prealloc(m_list)) {
log(LOG_ERROR, "db: Failed to prealloc list into map: %s.", mstrerror(g_errno));
// g_errno should be set to something if that failed
if (g_errno == 0) {
gbshutdownLogicError();
}
return true;
}
// save the current offset for this write
int64_t offset = m_offset;
// might as well update the offset now, even before write is done
m_offset += m_bytesToWrite ;
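// (if the write fails, dumpList(list, true) rewinds m_offset by
// m_bytesToWrite before retrying)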
// . if we're called by RdbMerge directly use m_callback/m_state
// . otherwise, use doneWritingWrapper() which will call dumpTree()
// . BigFile::write() returns false if it blocked, true otherwise
// . it also sets g_errno on error
bool isDone = m_file->write(m_buf, m_bytesToWrite, offset, &m_fstate, m_callback ? this : NULL,
m_callback ? &doneWritingWrapper : NULL, m_niceness);
// return false if it blocked
if (!isDone) {
return false;
}
// return true on error
if (g_errno) {
return true;
}
// . delete list from tree, incorporate list into cache, add to map
// . returns false if blocked, true otherwise, sets g_errno on error
// . may block on the write-verify read or on a retried dumpList()
return doneDumpingList();
}
// . delete list from tree, incorporate list into cache, add to map
// . returns false if blocked, true otherwise, sets g_errno on error
bool RdbDump::doneDumpingList() {
logTrace(g_conf.m_logTraceRdbDump, "BEGIN");
// . if error was EFILECLOSE (file got closed before we wrote to it)
// then try again. file can close because fd pool needed more fds
// . we cannot do this retry in BigFile.cpp because the BigFile
// may have been deleted/unlinked from a merge, but we could move
// this check to Msg3... and do it for writes, too...
// . seem to be getting EBADFD errors now, too (what code is it?)
// i don't remember, just do it on *all* errors for now!
if (g_errno) {
if (m_isSuspended) {
// bail on error
logError("db: Had error dumping data: %s.", mstrerror(g_errno));
logTrace( g_conf.m_logTraceRdbDump, "END - returning true" );
return true;
} else {
logError("db: Had error dumping data: %s. Retrying.", mstrerror(g_errno));
bool rc = dumpList(m_list, true);
logTrace(g_conf.m_logTraceRdbDump, "END. Returning %s", rc ? "true" : "false");
return rc;
}
}
// should we verify what we wrote? useful for preventing disk
// corruption from those pesky Western Digitals and Maxtors?
if (g_conf.m_verifyWrites) {
// a debug message, if log disk debug messages is enabled
log(LOG_DEBUG,"disk: Verifying %" PRId32" bytes written.", m_bytesToWrite);
// make a read buf
if (m_verifyBuf && m_verifyBufSize < m_bytesToWrite) {
mfree(m_verifyBuf, m_verifyBufSize, "RdbDump3");
m_verifyBuf = NULL;
m_verifyBufSize = 0;
}
if (!m_verifyBuf) {
m_verifyBuf = (char *)mmalloc(m_bytesToWrite, "RdbDump3");
m_verifyBufSize = m_bytesToWrite;
}
// out of mem? if so, skip the write verify
if (!m_verifyBuf) {
return doneReadingForVerify();
}
// read what we wrote
bool isDone = m_file->read(m_verifyBuf, m_bytesToWrite, m_offset - m_bytesToWrite, &m_fstate, m_callback ? this : NULL,
m_callback ? &doneReadingForVerifyWrapper : NULL, m_niceness);
// return false if it blocked
if (!isDone) {
logTrace(g_conf.m_logTraceRdbDump, "END - isDone is false. returning false");
return false;
}
}
bool rc = doneReadingForVerify();
logTrace(g_conf.m_logTraceRdbDump, "END - after doneReadingForVerify. Returning %s", rc ? "true" : "false");
return rc;
}
void RdbDump::addListToRdbMapRdbIndex(void *state) {
RdbDump *that = static_cast<RdbDump*>(state);
// we're using the list multiple times. if hi/lo are 'hacked', then we need to save it when using it the second time.
const char *savedListPtrHi = that->m_list->getListPtrHi();
const char *savedListPtrLo = that->m_list->getListPtrLo();
// . register this with the map now
// . only register AFTER it's ALL on disk so we don't get partial
// record reads and we don't read stuff on disk that's also in tree
// . add the list to the rdb map if we have one
// . we don't have maps when we do unordered dumps
// . careful, map is NULL if we're doing unordered dump
if (that->m_map) {
int64_t t1 = gettimeofdayInMilliseconds();
bool triedToFix = false;
while (!that->m_map->addList(that->m_list)) {
// keys out of order in list from tree?
if (g_errno == ECORRUPTDATA) {
logError("m_map->addList resulted in ECORRUPTDATA");
if (that->m_tree) {
logError("trying to fix tree");
that->m_tree->fixTree();
}
if (that->m_buckets) {
logError("Contains buckets, cannot fix this yet");
that->m_list->printList(); //@@@@@@ EXCESSIVE
gbshutdownCorrupted();
}
if (triedToFix) {
logError("already tried to fix, exiting hard");
gbshutdownCorrupted();
}
triedToFix = true;
continue;
}
g_errno = ENOMEM;
// this should never happen now since we call prealloc() above
logError("Failed to add data to map, exiting hard");
gbshutdownCorrupted();
}
int64_t took = gettimeofdayInMilliseconds() - t1;
if (took > g_conf.m_logRdbMapAddListTimeThreshold) {
log(LOG_WARN, "db: adding to map took %" PRIu64" ms", took);
} else {
log(LOG_TIMING, "db: adding to map took %" PRIu64" ms", took);
}
}
if (that->m_index) {
// restore hi/lo ptr which was reset after generating map
that->m_list->setListPtrHi(savedListPtrHi);
that->m_list->setListPtrLo(savedListPtrLo);
int64_t t1 = gettimeofdayInMilliseconds();
that->m_index->addList(that->m_list);
int64_t took = gettimeofdayInMilliseconds() - t1;
if (took > g_conf.m_logRdbIndexAddListTimeThreshold) {
log(LOG_WARN, "db: adding to index took %" PRIu64" ms", took);
} else {
log(LOG_TIMING, "db: adding to index took %" PRIu64" ms", took);
}
}
// . HACK: fix hacked lists before deleting from tree
// . iff the first key has the half bit set
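// (dumpList2() shifted the list start forward by 6 bytes (m_hacked) or 12
// bytes (m_hacked12) so the first key went to disk in compressed form; undo
// that shift and restore the full first key before the list is used again,
// e.g. to delete these records from the tree)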
if (that->m_hacked) {
char tmp[MAX_KEY_BYTES];
char *p = that->m_list->getList() - 6 ;
memcpy (tmp, p, 6);
memmove (p, p + 6, that->m_ks - 6);
memcpy (p + (that->m_ks - 6), tmp, 6);
// undo the big hack
that->m_list->setList(p);
// make this work for POSDB...
that->m_list->setListPtr(p);
that->m_list->setListPtrLo(p + that->m_ks - 12);
that->m_list->setListPtrHi(p + that->m_ks - 6);
that->m_list->setListSize(that->m_list->getListSize() + 6);
// clear the half bit; the first key is back to full size
*p &= 0xfd;
// turn it off again just in case
that->m_hacked = false;
}
if (that->m_hacked12) {
char tmp[MAX_KEY_BYTES];
char *p = that->m_list->getList() - 12 ;
// swap high 12 bytes with low 6 bytes for first key
memcpy (tmp, p, 12);
memmove (p, p + 12, 6);
memcpy (p + 6, tmp, 12);
// big hack here
that->m_list->setList(p);
that->m_list->setListPtr(p);
that->m_list->setListPtrLo(p + 6);
that->m_list->setListPtrHi(p + 12);
that->m_list->setListSize(that->m_list->getListSize() + 12);
// clear both compression bits; the first key is a full 18-byte key again
*p &= 0xf9;
that->m_hacked12 = false;
}
}
void RdbDump::addedListToRdbMapRdbIndex(void *state, job_exit_t exit_type) {
RdbDump *that = static_cast<RdbDump*>(state);
that->continueDumping();
}
void RdbDump::doneReadingForVerifyWrapper ( void *state ) {
RdbDump *THIS = (RdbDump *)state;
// return if this blocks
if (!THIS->doneReadingForVerify()) {
return;
}
// continue
THIS->continueDumping();
}
bool RdbDump::doneReadingForVerify ( ) {
logTrace( g_conf.m_logTraceRdbDump, "BEGIN" );
// if someone reset/deleted the collection we were dumping...
CollectionRec *cr = g_collectiondb.getRec(m_collnum);
if (!cr) {
g_errno = ENOCOLLREC;
logError("db: lost collection while dumping to disk. making map null so we can stop.");
m_map = NULL;
// m_file is probably invalid too since it is stored in cr->m_bases[i]->m_files[j]
m_file = NULL;
m_index = NULL;
}
// see if what we wrote is the same as what we read back
if (m_verifyBuf && g_conf.m_verifyWrites && memcmp(m_verifyBuf, m_buf, m_bytesToWrite) != 0 && !g_errno) {
logError("disk: Write verification of %" PRId32" bytes to file %s failed at offset=%" PRId64". Retrying.",
m_bytesToWrite, m_file->getFilename(), m_offset - m_bytesToWrite);
// try writing again
bool rc = dumpList(m_list, true);
logTrace( g_conf.m_logTraceRdbDump, "END - after retrying dumpList. Returning %s", rc ? "true" : "false" );
return rc;
}
// sanity check
if (m_list->getKeySize() != m_ks) {
logError("Sanity check failed. m_list->m_ks [%02x]!= m_ks [%02x]", m_list->getKeySize(), m_ks);
gbshutdownCorrupted();
}
if (m_callback && g_jobScheduler.submit(addListToRdbMapRdbIndex, addedListToRdbMapRdbIndex, this, thread_type_file_merge, 0)) {
logTrace(g_conf.m_logTraceRdbDump, "END - submited addListToRdbMapRdbIndex job, returning false" );
return false;
}
// running in main thread
addListToRdbMapRdbIndex(this);
// no need to call addedListToRdbMapRdbIndex() here; it just continues
// the dump, which the caller does when we return true
logTrace( g_conf.m_logTraceRdbDump, "END - OK, returning true" );
return true;
}
// continue dumping the tree
void RdbDump::doneWritingWrapper(void *state) {
// get THIS ptr from state
RdbDump *THIS = (RdbDump *)state;
// bitch about errors
if (g_errno && THIS->m_file) {
log( LOG_WARN, "db: Dump to %s had write error: %s.", THIS->m_file->getFilename(), mstrerror( g_errno ) );
}
// delete list from tree, incorporate list into cache, add to map
if (!THIS->doneDumpingList()) {
return;
}
// continue
THIS->continueDumping();
}
void RdbDump::continueDumping() {
// if someone reset/deleted the collection we were dumping...
CollectionRec *cr = g_collectiondb.getRec ( m_collnum );
if (!cr) {
g_errno = ENOCOLLREC;
// m_file is probably invalid too: when the collrec got nuked, so did the
// RdbBase that owns the files (cr->m_bases[i]->m_files[j])
m_file = NULL;
log(LOG_WARN, "db: continue dumping lost collection");
} else if (g_errno) {
// bitch about errors, but i guess if we lost our collection
// then the m_file could be invalid since that was probably stored
// in the CollectionRec::RdbBase::m_files[] array of BigFile ptrs
// so we can't say m_file->getFilename()
log(LOG_WARN, "db: Dump to %s had error writing: %s.", m_file->getFilename(),mstrerror(g_errno));
}
// go back now if we were NOT dumping a tree
if (!(m_tree || m_buckets)) {
if (m_callback) {
m_callback(m_state);
}
return;
}
// . continue dumping the tree
// . return if this blocks
// . if the collrec was deleted or reset then g_errno will be
// ENOCOLLREC and we want to skip the call to dumpTree()
if (g_errno != ENOCOLLREC && !dumpTree(false)) {
return;
}
// close it up
doneDumping();
if (m_callback) {
m_callback(m_state);
}
}