#include "RdbMerge.h"
#include "Rdb.h"
#include "Process.h"
#include "Spider.h"

RdbMerge::RdbMerge () {}

RdbMerge::~RdbMerge () {}

void RdbMerge::reset() {
	m_isMerging = false;
	m_isSuspended = false;
}

// . the buffer is used for both reading and writing
// . returns false if blocked, true otherwise
// . sets g_errno on error
// . if niceness is 0 the merge will block, otherwise it will not
// . we now use a niceness of 1, which should spawn threads that don't allow
//   niceness 2 threads to launch while they're running
// . the spider process now uses mostly niceness 2
// . we need the merge to take priority over spider processes on disk,
//   otherwise there's too much contention from spider disk lookups for the
//   merge to finish in a decent amount of time and we end up with too many
//   files!
bool RdbMerge::merge(rdbid_t rdbId,
                     collnum_t collnum,
                     BigFile *target,
                     RdbMap *targetMap,
                     RdbIndex *targetIndex,
                     int32_t startFileNum,
                     int32_t numFiles,
                     int32_t niceness,
                     char keySize) {
	// reset ourselves
	reset();

	// set it
	m_rdbId = rdbId;
	Rdb *rdb = getRdbFromId(rdbId);

	// get base; returns NULL and sets g_errno to ENOCOLLREC on error
	RdbBase *base = getRdbBase(m_rdbId, collnum);
	if (!base) {
		return true;
	}

	m_collnum = collnum;
	if (rdb->isCollectionless()) {
		m_collnum = 0;
	}

	m_target = target;
	m_targetMap = targetMap;
	m_targetIndex = targetIndex;
	m_startFileNum = startFileNum;
	m_numFiles = numFiles;
	m_fixedDataSize = base->getFixedDataSize();
	m_niceness = niceness;
	m_doneMerging = false;
	m_ks = keySize;
	// . set the key range we want to retrieve from the files
	// . just get from the files, not tree (not cache?)
	KEYMIN(m_startKey,m_ks);
	KEYMAX(m_endKey,m_ks);
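	// m_startKey/m_endKey now span the entire keyspace (all 0x00 bytes
	// up to all 0xff bytes), so by default the merge visits every record
	// in the selected files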

	// if we're resuming a killed merge, set m_startKey to the last
	// key the map knows about; the dump will resume writing at the end
	// of the targetMap's data file
	if ( m_targetMap->getNumRecs() > 0 ) {
		log(LOG_INIT,"db: Resuming a killed merge.");
		m_targetMap->getLastKey(m_startKey);
		// advance one past the last key already merged
		KEYINC(m_startKey,m_ks);
	}
	return gotLock();
}

// . returns false if blocked, true otherwise
// . sets g_errno on error
bool RdbMerge::gotLock() {
	// . get the last mapped offset
	// . this may actually be smaller than the file's actual size,
	//   but the excess is not in the map, so we need to do it again
	int64_t startOffset = m_targetMap->getFileSize();

	// if startOffset is > 0, use the last key as RdbDump::m_prevLastKey
	// so it can compress the next key it dumps, provided m_useHalfKeys
	// is true (key compression) and the next key has the same top 6 bytes
	// as m_prevLastKey
	char prevLastKey[MAX_KEY_BYTES];
	if (startOffset > 0) {
		m_targetMap->getLastKey(prevLastKey);
	} else {
		KEYMIN(prevLastKey, m_ks);
	}
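	// (half-key compression: when useHalfKeys is on, a key whose top
	// 6 bytes match the previous key on the page is written with those
	// top bytes omitted, roughly halving key storage for dense lists;
	// that is why prevLastKey must seed the first key written)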

	// get base; returns NULL and sets g_errno to ENOCOLLREC on error
	RdbBase *base = getRdbBase(m_rdbId, m_collnum);
	if (!base) {
		return true;
	}

	// . set up a file to dump the records into
	// . returns false and sets g_errno on error
	// . this will open m_target as O_RDWR | O_NONBLOCK | O_ASYNC ...
	m_dump.set(m_collnum,
	           m_target,
	           NULL,            // no buckets to dump; we call dumpList() ourselves
	           NULL,            // no tree to dump; we call dumpList() ourselves
	           NULL,
	           m_targetMap,
	           m_targetIndex,
	           0,               // m_maxBufSize; not needed if no tree
	           m_niceness,      // niceness of dump
	           this,            // state
	           dumpListWrapper,
	           base->useHalfKeys(),
	           startOffset,
	           prevLastKey,
	           m_ks,
	           NULL);

	// what kind of error?
	if ( g_errno ) {
		log( LOG_WARN, "db: gotLock: %s.", mstrerror(g_errno) );
		return true;
	}

	// we're now merging since the dump was set up successfully
	m_isMerging = true;

	// mark it suspended for now
	m_isSuspended = true;

	// . this unsuspends it
	// . returns false if blocked, true otherwise
	// . sets g_errno on error
	return resumeMerge();
}

void RdbMerge::suspendMerge ( ) {
	if (!m_isMerging) {
		return;
	}

	// do not reset m_isReadyToSave...
	if (m_isSuspended) {
		return;
	}
	m_isSuspended = true;

	// we are really just waiting for the suspension to kick in
	m_isReadyToSave = false;

	// . we don't want the dump writing to an RdbMap that has been deleted
	// . this can happen if the close is delayed because we are dumping
	//   a tree to disk
	m_dump.setSuspended();
}

void RdbMerge::doSleep() {
	log(LOG_WARN, "db: Merge had error: %s. Sleeping and retrying.", mstrerror(g_errno));
	g_errno = 0;
	g_loop.registerSleepCallback(1000, this, tryAgainWrapper);
}
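
// note: the actual retry is driven by tryAgainWrapper() below; after one
// second it unregisters the sleep callback, clears g_errno and re-enters
// the read loop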

// . returns false if blocked, true otherwise
// . sets g_errno on error
bool RdbMerge::resumeMerge() {
	// nothing to do if not suspended
	if (!m_isSuspended) {
		return true;
	}

	// turn off the suspension so getNextList() will work
	m_isSuspended = false;

	// the usual loop
	for (;;) {
		// . this returns false if blocked, true otherwise
		// . sets g_errno on error
		// . we return false if it blocked
		if (!getNextList()) {
			return false;
		}

		// if g_errno is ENOMEM then Msg5 wasn't able to get the lists,
		// so we should sleep and retry
		if (g_errno == ENOMEM) {
			doSleep();
			return false;
		}

		// if we had an error or the merge is complete, wrap it up
		if (g_errno || m_doneMerging) {
			doneMerging();
			return true;
		}

		// . otherwise dump the list we read to our target file
		// . this returns false if blocked, true otherwise
		if (!dumpList()) {
			return false;
		}
	}
}

// . returns false if blocked, true otherwise
// . sets g_errno on error
bool RdbMerge::getNextList() {
	// return true if g_errno is set
	if (g_errno || m_doneMerging) {
		return true;
	}

	// it's suspended, so we count this as blocking
	if (m_isSuspended) {
		m_isReadyToSave = true;
		return false;
	}

	// if the power is off, suspend the merging
	if (!g_process.m_powerIsOn) {
		m_isReadyToSave = true;
		doSleep();
		return false;
	}

	// no chop threads outstanding yet
	m_numThreads = 0;

	// get the base for this collection; returns NULL if the collection
	// rec no longer exists
	RdbBase *base = getRdbBase(m_rdbId, m_collnum);
	if (!base) {
		// in case getRdbBase() didn't set g_errno, set it here
		// explicitly; otherwise we sometimes loop forever when the
		// collection rec was deleted for this collnum
		g_errno = ENOCOLLREC;
		return true;
	}

	/// @todo ALC we should not chop the head here once we start using an additional disk for merging
	// . if a contributor has just surpassed a "part" in his BigFile,
	//   then we can delete that part from the BigFile and the map
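	// (a BigFile is stored on disk as a series of part files of up to
	// MAX_PART_SIZE bytes each; once the merge's read cursor has moved
	// past an entire part, that part can be unlinked to reclaim disk
	// space while the merge is still running)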
	for (int32_t i = m_startFileNum; i < m_startFileNum + m_numFiles; ++i) {
		RdbMap *map = base->getMap(i);
		int32_t page = map->getPage(m_startKey);
		int64_t offset = map->getAbsoluteOffset(page);
		BigFile *file = base->getFile(i);
		int32_t part = file->getPartNum(offset);
		if (part == 0) {
			continue;
		}

		// i've seen this bug happen if we chop a part off on our
		// last dump and the merge never completes for some reason...
		// so if we're in the last part, don't chop the part before us
		if (part >= file->m_maxParts - 1) {
			continue;
		}

		// if we already unlinked part # (part-1) then continue
		if (!file->doesPartExist(part - 1)) {
			continue;
		}

		// . otherwise, excise from the map
		// . we must be able to chop the mapped segments corresponding
		//   EXACTLY to the part file
		// . therefore, PAGES_PER_SEGMENT define'd in RdbMap.h must
		//   evenly divide MAX_PART_SIZE in BigFile.h
		// . i do this check in RdbMap.cpp
		if (!map->chopHead(MAX_PART_SIZE)) {
			// we had an error!
			log(LOG_WARN, "db: Failed to remove data from map for %s.part%" PRId32".", file->getFilename(), part);
			return true;
		}

		// . also, unlink any part files BELOW part # "part"
		// . this returns false if it blocked, true otherwise
		// . this sets g_errno on error
		// . now we just unlink part file #(part-1) explicitly
		if (!file->unlinkPart(part - 1, unlinkPartWrapper, this)) {
			// the unlink spawned a thread; count it so we wait for it
			m_numThreads++;
		}
		if (!g_errno) {
			continue;
		}

		log(LOG_WARN, "db: Failed to unlink file %s.part%" PRId32".", file->getFilename(), part);
		return true;
	}

	// wait for the part file(s) to be unlinked before getting the list
	if (m_numThreads > 0) {
		return false;
	}

	// otherwise, get it now
	return getAnotherList();
}

void RdbMerge::unlinkPartWrapper(void *state) {
	RdbMerge *THIS = (RdbMerge *)state;

	// wait for all unlink threads to complete
	if (--THIS->m_numThreads > 0) {
		return;
	}

	// return if this blocks
	if (!THIS->getAnotherList()) {
		return;
	}

	// otherwise, continue the merge loop
	THIS->resumeMerge();
}

bool RdbMerge::getAnotherList() {
	log(LOG_DEBUG,"db: Getting another list for merge.");

	// clear it in case it was already set
	g_errno = 0;

	// get base; returns NULL and sets g_errno to ENOCOLLREC on error
	RdbBase *base = getRdbBase(m_rdbId, m_collnum);
	if (!base) {
		return true;
	}

	// if merging titledb files, we must adjust m_endKey so we do
	// not have to read a huge 200MB+ tfndb list
	char newEndKey[MAX_KEY_BYTES];
	KEYSET(newEndKey,m_endKey,m_ks);
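	// (note: newEndKey is currently just a copy of m_endKey; the
	// titledb/tfndb clamping the comment above refers to appears to
	// have been removed, so the variable name is historical)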

	// . this returns false if blocked, true otherwise
	// . sets g_errno on error
	// . we return false if it blocked
	// . m_maxBufSize may be exceeded by a rec; it's just a target size
	// . niceness is usually MAX_NICENESS, but reindex.cpp sets it to 0
	// . this was a call to Msg3, but i made it call Msg5 since
	//   we now do the merging in Msg5, not in Msg3 anymore
	// . this will now handle truncation, dup and neg rec removal
	// . it remembers the last termId and count so it can truncate even when
	//   an IndexList is split between successive reads
	// . IMPORTANT: when merging titledb we could be merging about 255
	//   files, so if we are limited to only X fds it can have a cascade
	//   effect where reading from one file closes the fd of another file
	//   in the read (since we call open before spawning the read thread)
	//   and can therefore take 255 retries for the Msg3 to complete,
	//   because each read gives an EFILECLOSED error.
	//   so to fix it we allow one retry for each file in the read plus
	//   a fixed base of 75 retries
	int32_t nn = base->getNumFiles();
	if ( m_numFiles > 0 && m_numFiles < nn ) {
		nn = m_numFiles;
	}

	// for now force the read buffer to 100k
	int32_t bufSize = 100000; // g_conf.m_mergeBufSize

	// get it
	return m_msg5.getList(m_rdbId,
	                      m_collnum,
	                      &m_list,
	                      m_startKey,
	                      newEndKey,       // usually maxed out!
	                      bufSize,
	                      false,           // includeTree?
	                      0,               // max cache age for lookup
	                      m_startFileNum,  // startFileNum
	                      m_numFiles,
	                      this,            // state
	                      gotListWrapper,  // callback
	                      m_niceness,      // niceness
	                      true,            // do error correction?
	                      NULL,            // cache key ptr
	                      0,               // retry #
	                      nn + 75,         // max retries (make it high)
	                      false,           // compensate for merge?
	                      -1LL,            // sync point
	                      true,            // isRealMerge? absolutely!
	                      false);
}
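
// the callbacks below loop instead of recursing: when a read or a dump
// completes without blocking, we fall through to the next step inside the
// same for(;;) loop, so a long run of non-blocking operations doesn't grow
// the stack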
void RdbMerge::gotListWrapper(void *state, RdbList *list, Msg5 *msg5) {
	// get a ptr to ourselves
	RdbMerge *THIS = (RdbMerge *)state;
	for (;;) {
		// if g_errno is ENOMEM then Msg5 wasn't able to get the
		// lists, so we should sleep and retry
		if (g_errno == ENOMEM) {
			THIS->doSleep();
			return;
		}

		// if we had an error or the merge is complete, wrap it up
		if (g_errno || THIS->m_doneMerging) {
			THIS->doneMerging();
			return;
		}

		// return if this blocked
		if (!THIS->dumpList()) {
			return;
		}

		// return if this blocked
		if (!THIS->getNextList()) {
			return;
		}
		// otherwise, keep on trucking
	}
}

// called after sleeping for 1 sec because of an error (usually ENOMEM);
// see doSleep()
void RdbMerge::tryAgainWrapper(int fd, void *state) {
	// if power is still off, keep things suspended; the callback stays
	// registered, so we re-check in another second
	if (!g_process.m_powerIsOn) {
		return;
	}

	// get a ptr to ourselves
	RdbMerge *THIS = (RdbMerge *)state;

	// unregister the sleep callback
	g_loop.unregisterSleepCallback(THIS, tryAgainWrapper);

	// clear this
	g_errno = 0;

	// return if this blocked
	if (!THIS->getNextList()) {
		return;
	}

	// if it didn't block, continue the loop
	gotListWrapper(THIS, NULL, NULL);
}

// . called by RdbDump when a dump of a list completes
// . similar to gotListWrapper(), but we call getNextList() before dumpList()
void RdbMerge::dumpListWrapper(void *state) {
	// debug msg
	log(LOG_DEBUG,"db: Dump of list completed: %s.",mstrerror(g_errno));

	// get a ptr to ourselves
	RdbMerge *THIS = (RdbMerge *)state;
	for (;;) {
		// collection reset or deleted while RdbDump.cpp was writing out?
		if (g_errno == ENOCOLLREC) {
			THIS->doneMerging();
			return;
		}

		// return if this blocked
		if (!THIS->getNextList()) {
			return;
		}

		// if g_errno is ENOMEM then Msg5 wasn't able to get the
		// lists, so we should sleep and retry
		if (g_errno == ENOMEM) {
			// if the dump failed, it should reset m_dump.m_offset of
			// the file to what it was originally (in case it failed
			// in adding the list to the map). we do not need to set
			// m_startKey back to the startkey of this list, because
			// it is *now* only advanced on a successful dump!!
			THIS->doSleep();
			return;
		}

		// . if g_errno we're done
		// . if the merge is complete we're done
		if (g_errno || THIS->m_doneMerging) {
			THIS->doneMerging();
			return;
		}

		// return if this blocked
		if (!THIS->dumpList()) {
			return;
		}
		// otherwise, keep on trucking
	}
}

// . returns false if blocked, true otherwise
// . sets g_errno on error
// . the list should be truncated, possibly have all negative keys removed,
//   and be de-duped, thanks to RdbList::indexMerge_r() and RdbList::merge_r()
bool RdbMerge::dumpList() {
	// return true on g_errno
	if (g_errno) {
		return true;
	}

	// . it's suspended, so we count this as blocking
	// . resumeMerge() will call getNextList() again, not dumpList(), so
	//   don't advance m_startKey
	if (m_isSuspended) {
		m_isReadyToSave = true;
		return false;
	}

	// if we used getLastKey() for this, the merge would complete but then
	// try to merge two empty lists and core in the merge function.
	// i guess it relies on endkey rollover only, and not on reading less
	// than minRecSizes, to determine when to stop the merge.
	m_list.getEndKey(m_startKey);
	KEYINC(m_startKey,m_ks);
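	// m_startKey is now one past the end key of the list we just read,
	// so the next read resumes exactly where this one left off; if that
	// end key was the maximum key, KEYINC wraps m_startKey around to
	// zero, which the rollover check below uses to detect completion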

	/////
	//
	// dedup spiderdb before we dump it, to try to save disk space
	//
	/////
	if (m_rdbId == RDB_SPIDERDB) {
		dedupSpiderdbList( &m_list );
	}
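	// (dedupSpiderdbList() presumably collapses redundant spider
	// requests/replies for the same URL before the list hits disk;
	// see the Spider code for the exact rules)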

	// if the startKey rolled over, we're done
	if (KEYCMP(m_startKey, KEYMIN(), m_ks) == 0) {
		m_doneMerging = true;
	}

	log(LOG_DEBUG,"db: Dumping list.");

	// . send the whole list to the dump
	// . it returns false if blocked, true otherwise
	// . it sets g_errno on error
	// . it calls dumpListWrapper when done dumping
	// . returns true if m_dump had an error or it did not block
	// . if it gets an EFILECLOSED error it will keep retrying forever
	return m_dump.dumpList(&m_list, m_niceness, false);
}

void RdbMerge::doneMerging() {
	// save the error in case the resets below clobber g_errno
	int32_t saved = g_errno;

	// let RdbDump free its m_verifyBuf buffer if it existed
	m_dump.reset();

	// . free the list's memory; reset() doesn't do it
	// . when merging titledb i'm still seeing 200MB allocs to read from tfndb
	m_list.freeList();

	log(LOG_INFO,"db: Merge status: %s.",mstrerror(saved));

	// . reset our class
	// . this will free its cutoff keys buffer, trash buffer and treelist
	// . TODO: should we not reset, to keep the mem handy for next time
	//   and help avoid out-of-memory errors?
	m_msg5.reset();

	// . do we really need these anymore?
	// . turn these off before calling incorporateMerge() since it
	//   will call attemptMerge() on all the other dbs
	m_isMerging = false;
	m_isSuspended = false;

	// if the collection rec was deleted while merging files for it,
	// then the rdbbase should be NULL i guess
	if (saved == ENOCOLLREC) {
		return;
	}

	// if we are exiting, don't bother renaming the files around now.
	// this prevents a core in RdbBase::incorporateMerge()
	if (g_process.m_mode == EXIT_MODE) {
		log(LOG_INFO, "merge: exiting. not ending merge.");
		return;
	}

	// get base; returns NULL and sets g_errno to ENOCOLLREC on error
	RdbBase *base = getRdbBase(m_rdbId, m_collnum);
	if (!base) {
		return;
	}

	// pass g_errno on to incorporateMerge() so the merged file can be unlinked
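	// (incorporateMerge() finalizes things on the RdbBase side: on
	// success it swaps the merged file in for the source files; on
	// error it presumably discards the partial merge file instead)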
base->incorporateMerge();
}