#include "RdbMerge.h" #include "Rdb.h" #include "Process.h" #include "Spider.h" RdbMerge::RdbMerge () {} RdbMerge::~RdbMerge () {} void RdbMerge::reset () { m_isMerging = false; m_isSuspended = false; } // . buffer is used for reading and writing // . return false if blocked, true otherwise // . sets g_errno on error // . if niceness is 0 merge will block, otherwise will not block // . we now use niceness of 1 which should spawn threads that don't allow // niceness 2 threads to launch while they're running // . spider process now uses mostly niceness 2 // . we need the merge to take priority over spider processes on disk otherwise // there's too much contention from spider lookups on disk for the merge // to finish in a decent amount of time and we end up getting too many files! bool RdbMerge::merge(rdbid_t rdbId, collnum_t collnum, BigFile *target, RdbMap *targetMap, RdbIndex *targetIndex, int32_t startFileNum, int32_t numFiles, int32_t niceness, char keySize) { // reset ourselves reset(); // set it m_rdbId = rdbId; Rdb *rdb = getRdbFromId(rdbId); // get base, returns NULL and sets g_errno to ENOCOLLREC on error RdbBase *base = getRdbBase(m_rdbId, collnum); if (!base) { return true; } m_collnum = collnum; if (rdb->isCollectionless()) { m_collnum = 0; } m_target = target; m_targetMap = targetMap; m_targetIndex = targetIndex; m_startFileNum = startFileNum; m_numFiles = numFiles; m_fixedDataSize = base->getFixedDataSize(); m_niceness = niceness; m_doneMerging = false; m_ks = keySize; // . set the key range we want to retrieve from the files // . just get from the files, not tree (not cache?) KEYMIN(m_startKey,m_ks); KEYMAX(m_endKey,m_ks); // if we're resuming a killed merge, set m_startKey to last // key the map knows about. // the dump will start dumping at the end of the targetMap's data file. if ( m_targetMap->getNumRecs() > 0 ) { log(LOG_INIT,"db: Resuming a killed merge."); //m_startKey = m_targetMap->getLastKey(); m_targetMap->getLastKey(m_startKey); //m_startKey += (uint32_t) 1; KEYINC(m_startKey,m_ks); } return gotLock(); } // . returns false if blocked, true otherwise // . sets g_errno on error bool RdbMerge::gotLock() { // . get last mapped offset // . this may actually be smaller than the file's actual size // but the excess is not in the map, so we need to do it again int64_t startOffset = m_targetMap->getFileSize(); // if startOffset is > 0 use the last key as RdbDump:m_prevLastKey // so it can compress the next key it dumps providee m_useHalfKeys // is true (key compression) and the next key has the same top 6 bytes // as m_prevLastKey char prevLastKey[MAX_KEY_BYTES]; if (startOffset > 0) { m_targetMap->getLastKey(prevLastKey); } else { KEYMIN(prevLastKey, m_ks); } // get base, returns NULL and sets g_errno to ENOCOLLREC on error RdbBase *base = getRdbBase(m_rdbId, m_collnum); if (!base) { return true; } // . set up a a file to dump the records into // . returns false and sets g_errno on error // . this will open m_target as O_RDWR | O_NONBLOCK | O_ASYNC ... m_dump.set(m_collnum, m_target, NULL, // buckets to dump is NULL, we call dumpList NULL, // tree to dump is NULL, we call dumpList NULL, m_targetMap, m_targetIndex, 0, // m_maxBufSize. not needed if no tree! m_niceness, // niceness of dump this, // state dumpListWrapper, base->useHalfKeys(), startOffset, prevLastKey, m_ks, NULL); // what kind of error? 
// . returns false if blocked, true otherwise
// . sets g_errno on error
bool RdbMerge::gotLock() {
	// . get the last mapped offset
	// . this may actually be smaller than the file's actual size,
	//   but the excess is not in the map, so we need to do it again
	int64_t startOffset = m_targetMap->getFileSize();

	// if startOffset is > 0, use the last key as RdbDump::m_prevLastKey
	// so it can compress the next key it dumps, provided m_useHalfKeys
	// is true (key compression) and the next key has the same top 6
	// bytes as m_prevLastKey
	char prevLastKey[MAX_KEY_BYTES];
	if (startOffset > 0) {
		m_targetMap->getLastKey(prevLastKey);
	} else {
		KEYMIN(prevLastKey, m_ks);
	}

	// get base; returns NULL and sets g_errno to ENOCOLLREC on error
	RdbBase *base = getRdbBase(m_rdbId, m_collnum);
	if (!base) {
		return true;
	}

	// . set up a file to dump the records into
	// . returns false and sets g_errno on error
	// . this will open m_target as O_RDWR | O_NONBLOCK | O_ASYNC ...
	m_dump.set(m_collnum,
	           m_target,
	           NULL,            // buckets to dump is NULL, we call dumpList
	           NULL,            // tree to dump is NULL, we call dumpList
	           NULL,
	           m_targetMap,
	           m_targetIndex,
	           0,               // m_maxBufSize. not needed if no tree!
	           m_niceness,      // niceness of dump
	           this,            // state
	           dumpListWrapper,
	           base->useHalfKeys(),
	           startOffset,
	           prevLastKey,
	           m_ks,
	           NULL);

	// what kind of error?
	if (g_errno) {
		log(LOG_WARN, "db: gotLock: %s.", mstrerror(g_errno));
		return true;
	}

	// we're now merging since the dump was set up successfully
	m_isMerging = true;

	// make it suspended for now
	m_isSuspended = true;

	// . this unsuspends it
	// . this returns false on error and sets g_errno
	// . it returns true if blocked or the merge completed successfully
	return resumeMerge();
}

void RdbMerge::suspendMerge() {
	if (!m_isMerging) {
		return;
	}

	// do not reset m_isReadyToSave...
	if (m_isSuspended) {
		return;
	}

	m_isSuspended = true;

	// we are really waiting for the suspension to kick in
	m_isReadyToSave = false;

	// . we don't want the dump writing to an RdbMap that has been deleted
	// . this can happen if the close is delayed because we are dumping
	//   a tree to disk
	m_dump.setSuspended();
}

void RdbMerge::doSleep() {
	log(LOG_WARN, "db: Merge had error: %s. Sleeping and retrying.", mstrerror(g_errno));
	g_errno = 0;
	g_loop.registerSleepCallback(1000, this, tryAgainWrapper);
}

// . return false if blocked, true otherwise
// . sets g_errno on error
bool RdbMerge::resumeMerge() {
	// return true if not suspended
	if (!m_isSuspended) {
		return true;
	}

	// turn off the suspension so getNextList() will work
	m_isSuspended = false;

	// the usual loop
	for (;;) {
		// . this returns false if blocked, true otherwise
		// . sets g_errno on error
		// . we return false if it blocked
		if (!getNextList()) {
			return false;
		}

		// if g_errno is out of memory then msg5 wasn't able to get
		// the lists, so we should sleep and retry
		if (g_errno == ENOMEM) {
			doSleep();
			return false;
		}

		// if the list is empty or we had an error then we're done
		if (g_errno || m_doneMerging) {
			doneMerging();
			return true;
		}

		// . otherwise, dump the list we read to our target file
		// . this returns false if blocked, true otherwise
		if (!dumpList()) {
			return false;
		}
	}
}

// . return false if blocked, true otherwise
// . sets g_errno on error
bool RdbMerge::getNextList() {
	// return true if g_errno is set
	if (g_errno || m_doneMerging) {
		return true;
	}

	// it's suspended, so we count this as blocking
	if (m_isSuspended) {
		m_isReadyToSave = true;
		return false;
	}

	// if the power is off, suspend the merging
	if (!g_process.m_powerIsOn) {
		m_isReadyToSave = true;
		doSleep();
		return false;
	}

	// no chop threads
	m_numThreads = 0;

	// get base; returns NULL and sets g_errno to ENOCOLLREC on error
	RdbBase *base = getRdbBase(m_rdbId, m_collnum);
	if (!base) {
		// hmmm, it doesn't set g_errno, so we set it here now;
		// otherwise we sometimes loop forever when the collection
		// rec has been deleted for this collnum
		g_errno = ENOCOLLREC;
		return true;
	}

	/// @todo ALC we should not chop the head here once we start using an additional disk for merging
	// . if a contributor has just surpassed a "part" in his BigFile
	//   then we can delete that part from the BigFile and the map
	for (int32_t i = m_startFileNum; i < m_startFileNum + m_numFiles; ++i) {
		RdbMap *map = base->getMap(i);
		int32_t page = map->getPage(m_startKey);
		int64_t offset = map->getAbsoluteOffset(page);
		BigFile *file = base->getFile(i);
		int32_t part = file->getPartNum(offset);

		if (part == 0) {
			continue;
		}

		// i've seen this bug happen if we chop a part off on our
		// last dump and the merge never completes for some reason...
		// so if we're in the last part, don't chop the part before us
		if (part >= file->m_maxParts - 1) {
			continue;
		}

		// if we already unlinked part # (part-1) then continue
		if (!file->doesPartExist(part - 1)) {
			continue;
		}

		// . otherwise, excise from the map
		// . we must be able to chop the mapped segments corresponding
		//   EXACTLY to the part file
		// . therefore, PAGES_PER_SEGMENT #define'd in RdbMap.h must
		//   evenly divide MAX_PART_SIZE in BigFile.h
		// . i do this check in RdbMap.cpp
		if (!map->chopHead(MAX_PART_SIZE)) {
			// we had an error!
			log(LOG_WARN, "db: Failed to remove data from map for %s.part%" PRId32".",
			    file->getFilename(), part);
			return true;
		}

		// . also, unlink any part files BELOW part # "part"
		// . this returns false if it blocked, true otherwise
		// . this sets g_errno on error
		// . now we just unlink part file #(part-1) explicitly
		if (!file->unlinkPart(part - 1, unlinkPartWrapper, this)) {
			m_numThreads++;
		}

		if (!g_errno) {
			continue;
		}

		log(LOG_WARN, "db: Failed to unlink file %s.part%" PRId32".",
		    file->getFilename(), part);
		return true;
	}

	// wait for the file to be unlinked before getting the list
	if (m_numThreads > 0) {
		return false;
	}

	// otherwise, get it now
	return getAnotherList();
}
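// A rough illustration of the chop logic above, assuming for the sake of
// example a MAX_PART_SIZE of 1GB (the real value is defined in BigFile.h,
// and getPartNum() is assumed here to be offset / MAX_PART_SIZE):
//
//   m_startKey currently maps to offset 2.5GB in a source BigFile
//     -> getPartNum(offset) = 2
//     -> part 2 is still being read, but part 1 exists and is not the
//        last part, so part file #1 is unlinked and MAX_PART_SIZE bytes
//        are chopped off the head of the RdbMap
//
// this frees disk space from already-consumed source data while the merge
// is still running.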
void RdbMerge::unlinkPartWrapper(void *state) {
	RdbMerge *THIS = (RdbMerge *)state;

	// wait for all threads to complete
	if (--THIS->m_numThreads > 0) {
		return;
	}

	// return if this blocks
	if (!THIS->getAnotherList()) {
		return;
	}

	// otherwise, continue the merge loop
	THIS->resumeMerge();
}

bool RdbMerge::getAnotherList() {
	log(LOG_DEBUG, "db: Getting another list for merge.");

	// clear it in case it was already set
	g_errno = 0;

	// get base; returns NULL and sets g_errno to ENOCOLLREC on error
	RdbBase *base = getRdbBase(m_rdbId, m_collnum);
	if (!base) {
		return true;
	}

	// if merging titledb files, we must adjust m_endKey so we do
	// not have to read a huge 200MB+ tfndb list
	char newEndKey[MAX_KEY_BYTES];
	KEYSET(newEndKey, m_endKey, m_ks);

	// . this returns false if blocked, true otherwise
	// . sets g_errno on error
	// . we return false if it blocked
	// . m_maxBufSize may be exceeded by a rec, it's just a target size
	// . niceness is usually MAX_NICENESS, but reindex.cpp sets it to 0
	// . this was a call to Msg3, but i made it call Msg5 since
	//   we now do the merging in Msg5, not in Msg3 anymore
	// . this will now handle truncation, dup and neg rec removal
	// . it remembers the last termId and count so it can truncate even
	//   when an IndexList is split between successive reads
	// . IMPORTANT: when merging titledb we could be merging about 255
	//   files, so if we are limited to only X fds it can have a cascade
	//   effect where reading from one file closes the fd of another file
	//   in the read (since we call open before spawning the read thread)
	//   and can therefore take 255 retries for the Msg3 to complete
	//   because each read gives an EFILECLOSED error.
	//   so to fix it we allow one retry for each file in the read plus
	//   extra slack of 75
	int32_t nn = base->getNumFiles();
	if (m_numFiles > 0 && m_numFiles < nn) {
		nn = m_numFiles;
	}

	// for now force to 100k
	int32_t bufSize = 100000; // g_conf.m_mergeBufSize

	// get it
	return m_msg5.getList(m_rdbId,
	                      m_collnum,
	                      &m_list,
	                      m_startKey,
	                      newEndKey,      // usually is maxed!
	                      bufSize,
	                      false,          // includeTree?
	                      0,              // max cache age for lookup
	                      m_startFileNum, // startFileNum
	                      m_numFiles,
	                      this,           // state
	                      gotListWrapper, // callback
	                      m_niceness,     // niceness
	                      true,           // do error correction?
	                      NULL,           // cache key ptr
	                      0,              // retry #
	                      nn + 75,        // max retries (make it high)
	                      false,          // compensate for merge?
	                      -1LL,           // sync point
	                      true,           // isRealMerge? absolutely!
	                      false);
}
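// Worked example of the retry cap passed above: when merging 255 titledb
// files, nn = 255, so maxRetries = nn + 75 = 330. that covers the worst
// case described in the comment, where each read can close the fd of
// another file in the read set (one EFILECLOSED retry per file), plus slack.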
void RdbMerge::gotListWrapper(void *state, RdbList *list, Msg5 *msg5) {
	// get a ptr to ourselves
	RdbMerge *THIS = (RdbMerge *)state;

	for (;;) {
		// if g_errno is out of memory then msg5 wasn't able to get
		// the lists, so we should sleep and retry
		if (g_errno == ENOMEM) {
			THIS->doSleep();
			return;
		}

		// if g_errno is set, we're done
		if (g_errno || THIS->m_doneMerging) {
			THIS->doneMerging();
			return;
		}

		// return if this blocked
		if (!THIS->dumpList()) {
			return;
		}

		// return if this blocked
		if (!THIS->getNextList()) {
			return;
		}

		// otherwise, keep on trucking
	}
}

// called after sleeping for 1 sec because of ENOMEM
void RdbMerge::tryAgainWrapper(int fd, void *state) {
	// if the power is still off, keep things suspended
	if (!g_process.m_powerIsOn) {
		return;
	}

	// get a ptr to ourselves
	RdbMerge *THIS = (RdbMerge *)state;

	// unregister the sleep callback
	g_loop.unregisterSleepCallback(THIS, tryAgainWrapper);

	// clear this
	g_errno = 0;

	// return if this blocked
	if (!THIS->getNextList()) {
		return;
	}

	// if this didn't block, do the loop
	gotListWrapper(THIS, NULL, NULL);
}

// similar to gotListWrapper but we call getNextList() before dumpList()
void RdbMerge::dumpListWrapper(void *state) {
	// debug msg
	log(LOG_DEBUG, "db: Dump of list completed: %s.", mstrerror(g_errno));

	// get a ptr to ourselves
	RdbMerge *THIS = (RdbMerge *)state;

	for (;;) {
		// collection reset or deleted while RdbDump.cpp was writing out?
		if (g_errno == ENOCOLLREC) {
			THIS->doneMerging();
			return;
		}

		// return if this blocked
		if (!THIS->getNextList()) {
			return;
		}

		// if g_errno is out of memory then msg5 wasn't able to get
		// the lists, so we should sleep and retry
		if (g_errno == ENOMEM) {
			// if the dump failed, it should reset m_dump.m_offset
			// of the file to what it was originally (in case it
			// failed in adding the list to the map). we do not need
			// to set m_startKey back to the start key of this list,
			// because it is *now* only advanced on a successful
			// dump!!
			THIS->doSleep();
			return;
		}

		// . if g_errno is set, we're done
		// . if the list is empty, we're done
		if (g_errno || THIS->m_doneMerging) {
			THIS->doneMerging();
			return;
		}

		// return if this blocked
		if (!THIS->dumpList()) {
			return;
		}

		// otherwise, keep on trucking
	}
}

// . return false if blocked, true otherwise
// . sets g_errno on error
// . the list should be truncated, possibly have all negative keys removed,
//   and be de-duped thanks to RdbList::indexMerge_r() and RdbList::merge_r()
bool RdbMerge::dumpList() {
	// return true on g_errno
	if (g_errno) {
		return true;
	}

	// . it's suspended, so we count this as blocking
	// . resumeMerge() will call getNextList() again, not dumpList(), so
	//   don't advance m_startKey
	if (m_isSuspended) {
		m_isReadyToSave = true;
		return false;
	}

	// if we use getLastKey() for this, the merge completes but then
	// tries to merge two empty lists and cores in the merge function
	// because of that. i guess it relies on endkey rollover only and
	// not on reading less than minRecSizes to determine when to stop
	// doing the merge.
	m_list.getEndKey(m_startKey);
	KEYINC(m_startKey, m_ks);

	/////
	//
	// dedup for spiderdb before we dump it. try to save disk space.
	//
	/////
	if (m_rdbId == RDB_SPIDERDB) {
		dedupSpiderdbList(&m_list);
	}

	// if the startKey rolled over, we're done
	if (KEYCMP(m_startKey, KEYMIN(), m_ks) == 0) {
		m_doneMerging = true;
	}

	log(LOG_DEBUG, "db: Dumping list.");

	// . send the whole list to the dump
	// . it returns false if blocked, true otherwise
	// . it sets g_errno on error
	// . it calls dumpListWrapper when done dumping
	// . return true if m_dump had an error or it did not block
	// . if it gets an EFILECLOSED error it will keep retrying forever
	return m_dump.dumpList(&m_list, m_niceness, false);
}
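// Illustration of the rollover test in dumpList() above, treating a
// hypothetical 12-byte key as one big unsigned integer shown as hex:
//
//   end key of the final list:   ff ff ff ff ff ff ff ff ff ff ff ff
//   after KEYINC(m_startKey):    00 00 00 00 00 00 00 00 00 00 00 00
//
// the increment wraps around to KEYMIN(), and that wrap is the signal
// that the key space is exhausted and m_doneMerging should be set.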
void RdbMerge::doneMerging() {
	// save this
	int32_t saved = g_errno;

	// let RdbDump free its m_verifyBuf buffer if it existed
	m_dump.reset();

	// . free the list's memory, reset() doesn't do it
	// . when merging titledb i'm still seeing 200MB allocs to read from tfndb
	m_list.freeList();

	log(LOG_INFO, "db: Merge status: %s.", mstrerror(g_errno));

	// . reset our class
	// . this will free its cutoff keys buffer, trash buffer and treelist
	// . TODO: should we not reset, to keep the mem handy for next time
	//   and help avoid out-of-memory errors?
	m_msg5.reset();

	// . do we really need these anymore?
	// . turn these off before calling incorporateMerge() since it
	//   will call attemptMerge() on all the other dbs
	m_isMerging = false;
	m_isSuspended = false;

	// if the collection rec was deleted while merging files for it,
	// then the rdbbase should be NULL, i guess
	if (saved == ENOCOLLREC) {
		return;
	}

	// if we are exiting, don't bother renaming the files around now.
	// this prevents a core in RdbBase::incorporateMerge()
	if (g_process.m_mode == EXIT_MODE) {
		log(LOG_INFO, "merge: exiting. not ending merge.");
		return;
	}

	// get base; returns NULL and sets g_errno to ENOCOLLREC on error
	RdbBase *base = getRdbBase(m_rdbId, m_collnum);
	if (!base) {
		return;
	}

	// pass g_errno on to incorporateMerge() so the merged file can be unlinked
	base->incorporateMerge();
}
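// For reference, a sketch of the asynchronous control flow implemented by
// the functions above (each step either completes synchronously and the
// loop continues, or blocks and is resumed from its callback):
//
//   merge() -> gotLock() -> resumeMerge()
//     loop: getNextList() -> dumpList()
//       getNextList() blocks -> resumed via unlinkPartWrapper() or
//                               gotListWrapper()
//       dumpList()    blocks -> resumed via dumpListWrapper()
//       ENOMEM               -> doSleep() -> tryAgainWrapper()
//     done (g_errno set or key rollover) -> doneMerging()
//                                            -> RdbBase::incorporateMerge()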