#include "gb-include.h" #include "Rdb.h" #include "Clusterdb.h" #include "Hostdb.h" #include "Tagdb.h" #include "Posdb.h" #include "Titledb.h" #include "Spider.h" #include "Spider.h" #include "Repair.h" #include "Process.h" #include "Statsdb.h" #include "Sections.h" #include "Spider.h" #include "SpiderColl.h" #include "Doledb.h" #include "Linkdb.h" #include "hash.h" #include "JobScheduler.h" #include "Stats.h" #include "max_niceness.h" #include <sys/stat.h> //mdir() Rdb::Rdb ( ) { m_lastReclaim = -1; m_cacheLastTime = 0; m_cacheLastTotal = 0LL; //m_numBases = 0; m_inAddList = false; m_collectionlessBase = NULL; m_initialized = false; m_numMergesOut = 0; reset(); } void Rdb::reset ( ) { if (m_collectionlessBase) { RdbBase *base = m_collectionlessBase; mdelete (base, sizeof(RdbBase), "Rdb Coll"); delete (base); m_collectionlessBase = NULL; } // reset tree and cache m_tree.reset(); m_buckets.reset(); m_mem.reset(); //m_cache.reset(); m_lastWrite = 0LL; m_isClosing = false; m_isClosed = false; m_isSaving = false; m_isReallyClosing = false; m_registered = false; m_lastTime = 0LL; } Rdb::~Rdb ( ) { reset(); } RdbBase *Rdb::getBase ( collnum_t collnum ) { if ( m_isCollectionLess ) return m_collectionlessBase; // RdbBase for statsdb, etc. resides in collrec #0 i guess CollectionRec *cr = g_collectiondb.m_recs[collnum]; if ( ! cr ) return NULL; // this might load the rdbbase on demand now return cr->getBase ( m_rdbId ); // m_bases[(unsigned char)m_rdbId]; } // used by Rdb::addBase1() void Rdb::addBase ( collnum_t collnum , RdbBase *base ) { // if we are collectionless, like g_statsdb.m_rdb etc.. shared by all collections essentially. if ( m_isCollectionLess ) { m_collectionlessBase = base; return; } CollectionRec *cr = g_collectiondb.m_recs[collnum]; if ( ! cr ) return; //if ( cr->m_bases[(unsigned char)m_rdbId] ) { g_process.shutdownAbort(true); } RdbBase *oldBase = cr->getBasePtr ( m_rdbId ); if ( oldBase ) { g_process.shutdownAbort(true); } //cr->m_bases[(unsigned char)m_rdbId] = base; cr->setBasePtr ( m_rdbId , base ); log ( LOG_DEBUG,"db: added base to collrec " "for rdb=%s rdbid=%" PRId32" coll=%s collnum=%" PRId32" " "base=0x%" PTRFMT"", m_dbname,(int32_t)m_rdbId,cr->m_coll,(int32_t)collnum, (PTRTYPE)base); } bool Rdb::init ( const char *dir , const char *dbname , int32_t fixedDataSize , int32_t minToMerge , int32_t maxTreeMem , int32_t maxTreeNodes , bool useHalfKeys , bool isTitledb , char keySize , bool isCollectionLess, bool useIndexFile ) { // reset all reset(); // sanity if ( ! dir ) { g_process.shutdownAbort(true); } // statsdb m_isCollectionLess = isCollectionLess; // save the dbname NULL terminated into m_dbname/m_dbnameLen m_dbnameLen = strlen ( dbname ); gbmemcpy ( m_dbname , dbname , m_dbnameLen ); m_dbname [ m_dbnameLen ] = '\0'; // store the other parameters for initializing each Rdb m_fixedDataSize = fixedDataSize; m_maxTreeMem = maxTreeMem; m_useHalfKeys = useHalfKeys; m_isTitledb = isTitledb; m_ks = keySize; m_inDumpLoop = false; // set our id m_rdbId = getIdFromRdb(this); if (m_rdbId <= 0) { log( LOG_LOGIC, "db: dbname of %s is invalid.", dbname ); return false; } // sanity check if (m_ks != getKeySizeFromRdbId(m_rdbId)) { g_process.shutdownAbort(true); } if (m_rdbId == RDB_POSDB || m_rdbId == RDB2_POSDB2) { m_useIndexFile = g_conf.m_noInMemoryPosdbMerge ? 
bool Rdb::init ( const char *dir ,
		 const char *dbname ,
		 int32_t fixedDataSize ,
		 int32_t minToMerge ,
		 int32_t maxTreeMem ,
		 int32_t maxTreeNodes ,
		 bool useHalfKeys ,
		 bool isTitledb ,
		 char keySize ,
		 bool isCollectionLess,
		 bool useIndexFile ) {
	// reset all
	reset();
	// sanity
	if ( ! dir ) { g_process.shutdownAbort(true); }
	// statsdb
	m_isCollectionLess = isCollectionLess;
	// save the dbname NULL terminated into m_dbname/m_dbnameLen
	m_dbnameLen = strlen ( dbname );
	gbmemcpy ( m_dbname , dbname , m_dbnameLen );
	m_dbname [ m_dbnameLen ] = '\0';
	// store the other parameters for initializing each Rdb
	m_fixedDataSize = fixedDataSize;
	m_maxTreeMem = maxTreeMem;
	m_useHalfKeys = useHalfKeys;
	m_isTitledb = isTitledb;
	m_ks = keySize;
	m_inDumpLoop = false;
	// set our id
	m_rdbId = getIdFromRdb(this);
	if (m_rdbId <= 0) {
		log( LOG_LOGIC, "db: dbname of %s is invalid.", dbname );
		return false;
	}
	// sanity check
	if (m_ks != getKeySizeFromRdbId(m_rdbId)) {
		g_process.shutdownAbort(true);
	}

	if (m_rdbId == RDB_POSDB || m_rdbId == RDB2_POSDB2) {
		m_useIndexFile = g_conf.m_noInMemoryPosdbMerge ?
			useIndexFile : false;
	} else {
		m_useIndexFile = useIndexFile;
	}

	// get page size
	m_pageSize = GB_TFNDB_PAGE_SIZE;
	if ( m_rdbId == RDB_POSDB      ) m_pageSize = GB_INDEXDB_PAGE_SIZE;
	if ( m_rdbId == RDB2_POSDB2    ) m_pageSize = GB_INDEXDB_PAGE_SIZE;
	if ( m_rdbId == RDB_TITLEDB    ) m_pageSize = GB_INDEXDB_PAGE_SIZE;
	if ( m_rdbId == RDB2_TITLEDB2  ) m_pageSize = GB_INDEXDB_PAGE_SIZE;
	if ( m_rdbId == RDB_SPIDERDB   ) m_pageSize = GB_INDEXDB_PAGE_SIZE;
	if ( m_rdbId == RDB_DOLEDB     ) m_pageSize = GB_INDEXDB_PAGE_SIZE;
	if ( m_rdbId == RDB2_SPIDERDB2 ) m_pageSize = GB_INDEXDB_PAGE_SIZE;
	if ( m_rdbId == RDB_LINKDB     ) m_pageSize = GB_INDEXDB_PAGE_SIZE;
	if ( m_rdbId == RDB2_LINKDB2   ) m_pageSize = GB_INDEXDB_PAGE_SIZE;

	// we can't merge more than MAX_RDB_FILES files at a time
	if ( minToMerge > MAX_RDB_FILES ) minToMerge = MAX_RDB_FILES;
	m_minToMerge = minToMerge;

	m_useTree = true;
	if ( m_rdbId == RDB_POSDB || m_rdbId == RDB2_POSDB2 ) {
		m_useTree = false;
	}

	sprintf(m_treeName,"tree-%s",m_dbname);

	// . if maxTreeNodes is -1, means auto compute it
	// . set tree to use our fixed data size
	// . returns false and sets g_errno on error
	if(m_useTree) {
		int32_t rdbId = m_rdbId;
		// statsdb is collectionless really so pass on to tree
		if ( rdbId == RDB_STATSDB ) {
			rdbId = -1;
		}
		if (!m_tree.set(fixedDataSize, maxTreeNodes, maxTreeMem, false,
		                m_treeName, false, m_dbname, m_ks, rdbId)) {
			log( LOG_ERROR, "db: Failed to set tree." );
			return false;
		}
	} else {
		if(treeFileExists()) {
			m_tree.set(fixedDataSize, maxTreeNodes, maxTreeMem,
			           false, m_treeName, false, m_dbname, m_ks,
			           m_rdbId);
		}
		// set this then
		sprintf(m_treeName,"buckets-%s",m_dbname);
		if (!m_buckets.set(fixedDataSize, maxTreeMem, m_treeName,
		                   m_rdbId, m_dbname, m_ks)) {
			log( LOG_ERROR, "db: Failed to set buckets." );
			return false;
		}
	}

	// now get how much mem the tree is using (not including stored recs)
	int32_t dataMem;
	if (m_useTree) dataMem = maxTreeMem - m_tree.getTreeOverhead();
	else           dataMem = maxTreeMem - m_buckets.getMemOccupied( );

	sprintf(m_memName,"mem-%s",m_dbname);
	if ( fixedDataSize != 0 &&
	     ! m_mem.init ( this , dataMem , m_ks , m_memName ) ) {
		log( LOG_ERROR, "db: Failed to initialize memory: %s.",
		     mstrerror( g_errno ) );
		return false;
	}

	// load any saved tree
	if ( ! loadTree ( ) ) {
		log( LOG_ERROR, "db: Failed to load tree." );
		return false;
	}

	m_initialized = true;
	// success
	return true;
}
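// ------------------------------------------------------------------------
// Illustrative sketch (added commentary, not original code; argument values
// are hypothetical): a collectionless rdb such as statsdb would be
// initialized by its owner roughly like
//
//     if ( ! m_rdb.init ( g_hostdb.m_dir , // working dir
//                         "statsdb" ,      // dbname
//                         -1 ,             // fixedDataSize: variable
//                         2 ,              // minToMerge
//                         maxTreeMem ,     // maxTreeMem
//                         maxTreeNodes ,   // -1 = auto compute
//                         false ,          // useHalfKeys
//                         false ,          // isTitledb
//                         12 ,             // keySize, must match rdbId
//                         true ,           // isCollectionLess
//                         false ) )        // useIndexFile
//         return false;
//
// The real per-database values live in the owning g_*db classes.
// ------------------------------------------------------------------------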
// . when the PageRepair.cpp rebuilds our rdb for a particular collection
//   we clear out the old data just for that collection and point to the
//   newly rebuilt data
// . rdb2 is the rebuilt/secondary rdb we want to set this primary rdb to
// . rename, for safe keeping purposes, current old files to :
//   trash/coll.mycoll.timestamp.indexdb0001.dat.part30 and
//   trash/timestamp.indexdb-saved.dat
// . rename newly rebuilt files from indexdbRebuild0001.dat.part30 to
//   indexdb0001.dat.part30 (just remove the "Rebuild" from the filename)
// . remove all recs for that coll from the tree AND cache because the
//   rebuilt rdb is replacing the primary rdb for this collection
// . the rebuilt secondary tree should be empty! (force dumped)
// . reload the maps/files in the primary rdb after we remove "Rebuild" from
//   their filenames
// . returns false and sets g_errno on error
bool Rdb::updateToRebuildFiles ( Rdb *rdb2 , char *coll ) {
	// how come not in repair mode?
	if ( ! g_repairMode ) { g_process.shutdownAbort(true); }

	// make a dir in the trash subfolder to hold them
	uint32_t t = (uint32_t)getTime();
	char dstDir[256];
	// make the trash dir if not there
	sprintf ( dstDir , "%s/trash/" , g_hostdb.m_dir );
	int32_t status = ::mkdir ( dstDir , getDirCreationFlags() );
	// we have to create it
	sprintf ( dstDir , "%s/trash/rebuilt%" PRIu32"/" , g_hostdb.m_dir , t );
	status = ::mkdir ( dstDir , getDirCreationFlags() );
	if ( status && errno != EEXIST ) {
		g_errno = errno;
		log(LOG_WARN, "repair: Could not mkdir(%s): %s",dstDir,
		    mstrerror(errno));
		return false;
	}
	// clear it in case it existed
	g_errno = 0;

	// if some things need to be saved, how did that happen?
	// we saved everything before we entered repair mode and did not
	// allow anything more to be added... and we do not allow any
	// collections to be deleted via Collectiondb::deleteRec() when
	// in repair mode... how could this happen?
	//if ( m_needsSave ) { g_process.shutdownAbort(true); }

	// delete old collection recs
	CollectionRec *cr = g_collectiondb.getRec ( coll );
	if ( ! cr ) {
		log(LOG_WARN, "db: Exchange could not find coll, %s.",coll);
		return false;
	}
	collnum_t collnum = cr->m_collnum;
	RdbBase *base = getBase ( collnum );
	if ( ! base ) {
		log(LOG_WARN, "repair: Could not find old base for %s.", coll);
		return false;
	}
	RdbBase *base2 = rdb2->getBase ( collnum );
	if ( ! base2 ) {
		log(LOG_WARN, "repair: Could not find new base for %s.", coll);
		return false;
	}
	if ( rdb2->getNumUsedNodes() != 0 ) {
		log(LOG_WARN, "repair: Recs present in rebuilt tree for db %s "
		    "and collection %s.", m_dbname, coll);
		return false;
	}

	logf(LOG_INFO,"repair: Updating rdb %s for collection %s.",
	     m_dbname,coll);

	// now MOVE the tree file on disk
	char src[1024];
	char dst[1024];
	if(m_useTree) {
		sprintf ( src , "%s/%s-saved.dat" , g_hostdb.m_dir , m_dbname );
		sprintf ( dst , "%s/%s-saved.dat" , dstDir , m_dbname );
	} else {
		sprintf ( src , "%s/%s-buckets-saved.dat",
			  g_hostdb.m_dir , m_dbname );
		sprintf ( dst , "%s/%s-buckets-saved.dat",
			  dstDir , m_dbname );
	}
	const char *structName = m_useTree ? "tree" : "buckets";
	char cmd[2048+32];
	sprintf ( cmd , "mv %s %s",src,dst);
	logf(LOG_INFO,"repair: Moving *-saved.dat %s. %s", structName, cmd);
	errno = 0;
	if ( gbsystem ( cmd ) == -1 ) {
		log( LOG_ERROR, "repair: Moving saved %s had error: %s.",
		     structName, mstrerror( errno ) );
		return false;
	}
	log("repair: Moving saved %s: %s",structName, mstrerror(errno));

	// now move our map and data files to the "trash" subdir, "dstDir"
	logf(LOG_INFO,"repair: Moving old data and map files to trash.");
	if ( ! base->moveToTrash(dstDir) ) {
		log(LOG_WARN, "repair: Trashing new rdb for %s failed.", coll);
		return false;
	}

	// . now rename the newly rebuilt files to our filenames
	// . just removes the "Rebuild" from their filenames
	logf(LOG_INFO,"repair: Renaming new data and map files.");
	if ( ! base2->removeRebuildFromFilenames() ) {
		log(LOG_WARN, "repair: Renaming old rdb for %s failed.", coll);
		return false;
	}

	// reset the rdb bases (clears out files and maps from mem)
	base->reset ();
	base2->reset();

	// reload the newly rebuilt files into the primary rdb
	logf(LOG_INFO,"repair: Loading new data and map files.");
	if ( ! base->setFiles() ) {
		log(LOG_WARN, "repair: Failed to set new files for %s.", coll);
		return false;
	}

	// allow rdb2->reset() to succeed without dumping core
	rdb2->m_tree.setNeedsSave(false);
	rdb2->m_buckets.setNeedsSave(false);

	// . make rdb2, the secondary rdb used for rebuilding, give up its mem
	// . if we do another rebuild its ::init() will be called by PageRepair
	rdb2->reset();

	// clean out tree, newly rebuilt rdb does not have any data in tree
	if ( m_useTree ) m_tree.delColl ( collnum );
	else             m_buckets.delColl( collnum );

	// reset our cache
	//m_cache.clear ( collnum );

	// Success
	return true;
}

// . returns false and sets g_errno on error, returns true on success
// . if this rdb is collectionless we set m_collectionlessBase in addBase()
bool Rdb::addRdbBase1 ( const char *coll ) {
	collnum_t collnum = g_collectiondb.getCollnum ( coll );
	return addRdbBase2 ( collnum );
}

bool Rdb::addRdbBase2 ( collnum_t collnum ) { // addColl2()
	if ( ! m_initialized ) {
		g_errno = EBADENGINEER;
		log(LOG_WARN, "db: adding coll to uninitialized rdb!");
		return false;
	}

	// catdb, statsdb, accessdb, facebookdb, syncdb
	if ( m_isCollectionLess ) collnum = (collnum_t)0;

	// ensure no max breach
	if ( collnum < (collnum_t) 0 ) {
		g_errno = ENOBUFS;
		int64_t maxColls = 1LL << (sizeof(collnum_t)*8);
		log(LOG_WARN, "db: %s: Failed to add collection #%i. Would "
		    "breach maximum number of collections, %" PRId64".",
		    m_dbname,collnum,maxColls);
		return false;
	}

	CollectionRec *cr = NULL;
	const char *coll = NULL;
	if ( ! m_isCollectionLess ) cr = g_collectiondb.m_recs[collnum];
	if ( cr ) coll = cr->m_coll;
	if ( m_isCollectionLess ) coll = "collectionless";

	// . ensure no previous one exists
	// . well it will be there but will be uninitialized, m_rdb will be NULL
	RdbBase *base = NULL;
	if ( cr ) base = cr->getBasePtr ( m_rdbId );
	if ( base ) { // m_bases [ collnum ] ) {
		g_errno = EBADENGINEER;
		log(LOG_WARN, "db: Rdb for db \"%s\" and collection \"%s\" "
		    "(collnum %" PRId32") exists.",
		    m_dbname,coll,(int32_t)collnum);
		return false;
	}

	// make a new one
	RdbBase *newColl = NULL;
	try { newColl = new(RdbBase); }
	catch(...) {
		g_errno = ENOMEM;
		log(LOG_WARN, "db: %s: Failed to allocate %" PRId32" bytes "
		    "for collection \"%s\".",
		    m_dbname,(int32_t)sizeof(RdbBase),coll);
		return false;
	}
	mnew(newColl, sizeof(RdbBase), "Rdb Coll");
	//m_bases [ collnum ] = newColl;
	base = newColl;

	// add it to CollectionRec::m_bases[] base ptrs array
	addBase ( collnum , newColl );

	// . set CollectionRec::m_numPos/NegKeysInTree[rdbId]
	// . these counts are now stored in the CollectionRec and not
	//   in RdbTree since the # of collections can be huge!
	if ( m_useTree ) {
		m_tree.setNumKeys ( cr );
	}

	RdbTree    *tree    = NULL;
	RdbBuckets *buckets = NULL;
	if(m_useTree) tree    = &m_tree;
	else          buckets = &m_buckets;

	// . init it
	// . g_hostdb.m_dir should end in /
	if ( ! base->init ( g_hostdb.m_dir,
			    m_dbname ,
			    m_fixedDataSize ,
			    m_minToMerge ,
			    m_useHalfKeys ,
			    m_ks ,
			    m_pageSize ,
			    coll ,
			    collnum ,
			    tree ,
			    buckets ,
			    &m_dump ,
			    this ,
			    m_isTitledb ,
			    m_useIndexFile ) ) {
		logf(LOG_INFO,"db: %s: Failed to initialize db for "
		     "collection \"%s\".", m_dbname,coll);
		//exit(-1);
		return false;
	}

	//if ( (int32_t)collnum >= m_numBases ) m_numBases = (int32_t)collnum + 1;

	// Success
	return true;
}
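// ------------------------------------------------------------------------
// Flow sketch (added commentary, not original code): a new collection gets
// its per-rdb storage roughly like
//
//     addRdbBase1("mycoll")
//       -> collnum = g_collectiondb.getCollnum("mycoll")
//       -> addRdbBase2(collnum)
//            -> new RdbBase, addBase(collnum, base)  // hook into collrec
//            -> base->init(...)                      // open files/maps
//
// so RdbBase creation is driven by collection creation, not by Rdb::init().
// ------------------------------------------------------------------------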
bool Rdb::resetBase ( collnum_t collnum ) {
	CollectionRec *cr = g_collectiondb.getRec(collnum);
	if ( ! cr ) return true;
	//RdbBase *base = cr->m_bases[(unsigned char)m_rdbId];
	// get the ptr, don't use CollectionRec::getBase() so we do not swapin
	RdbBase *base = cr->getBasePtr (m_rdbId);
	if ( ! base ) return true;
	base->reset();
	return true;
}

bool Rdb::deleteAllRecs ( collnum_t collnum ) {
	// remove from tree
	if(m_useTree) m_tree.delColl ( collnum );
	else          m_buckets.delColl ( collnum );

	// only for doledb now, because we unlink we do not move the files
	// into the trash subdir and doledb is easily regenerated. i don't
	// want to take the risk with other files.
	if ( m_rdbId != RDB_DOLEDB ) { g_process.shutdownAbort(true); }

	CollectionRec *cr = g_collectiondb.getRec ( collnum );
	// deleted from under us?
	if ( ! cr ) {
		log("rdb: deleteallrecs: cr is NULL");
		return true;
	}

	//RdbBase *base = cr->m_bases[(unsigned char)m_rdbId];
	RdbBase *base = cr->getBase(m_rdbId);
	if ( ! base ) return true;

	// scan files in there
	for ( int32_t i = 0 ; i < base->getNumFiles() ; i++ ) {
		BigFile *f = base->getFile(i);
		// move to trash
		char newdir[1024];
		sprintf(newdir, "%strash/",g_hostdb.m_dir);
		f->move ( newdir );
	}

	// nuke all the files
	base->reset();

	// reset rec counts
	cr->m_numNegKeysInTree[RDB_DOLEDB] = 0;
	cr->m_numPosKeysInTree[RDB_DOLEDB] = 0;

	return true;
}

bool makeTrashDir() {
	char trash[1024];
	sprintf(trash, "%strash/",g_hostdb.m_dir);
	if ( ::mkdir ( trash , getDirCreationFlags() ) ) {
		if ( errno != EEXIST ) {
			log("dir: mkdir %s had error: %s",
			    trash,mstrerror(errno));
			return false;
		}
		// clear it
		errno = 0;
	}
	return true;
}

bool Rdb::deleteColl ( collnum_t collnum , collnum_t newCollnum ) {
	// remove these collnums from tree
	if(m_useTree) m_tree.delColl ( collnum );
	else          m_buckets.delColl ( collnum );

	// . close all files, set m_numFiles to 0 in RdbBase
	// . TODO: what about outstanding merge or dump operations?
	// . it seems like we can't really recycle this too easily
	//   because reset is not resetting filenames or directory name?
	//   just nuke it and rebuild using addRdbBase2()...
	RdbBase *oldBase = getBase ( collnum );
	mdelete (oldBase, sizeof(RdbBase), "Rdb Coll");
	delete (oldBase);

	// NULL it out...
	CollectionRec *oldcr = g_collectiondb.getRec(collnum);
	oldcr->setBasePtr ( m_rdbId , NULL );
	char *coll = oldcr->m_coll;
	const char *msg = "deleted";

	// if just resetting, recycle the base
	if (collnum != newCollnum) {
		addRdbBase2(newCollnum);
		msg = "moved";
	}

	log(LOG_DEBUG,"db: %s base from collrec "
	    "rdb=%s rdbid=%" PRId32" coll=%s collnum=%" PRId32
	    " newcollnum=%" PRId32,
	    msg,m_dbname,(int32_t)m_rdbId,coll,(int32_t)collnum,
	    (int32_t)newCollnum);

	// move the files into trash
	// nuke it on disk
	char oldname[1024];
	sprintf(oldname, "%scoll.%s.%" PRId32"/",g_hostdb.m_dir,coll,
		(int32_t)collnum);
	char newname[1024];
	sprintf(newname, "%strash/coll.%s.%" PRId32".%" PRId64"/",
		g_hostdb.m_dir,coll,
		(int32_t)collnum,gettimeofdayInMilliseconds());
	//Dir d; d.set ( dname );
	// ensure ./trash dir is there
	makeTrashDir();
	// move into that dir
	::rename ( oldname , newname );

	log ( LOG_DEBUG, "db: cleared data for coll \"%s\" (%" PRId32") "
	      "rdb=%s.", coll,(int32_t)collnum ,getDbnameFromId(m_rdbId));

	return true;
}

// returns false and sets g_errno on error, returns true on success
bool Rdb::delColl ( const char *coll ) {
	collnum_t collnum = g_collectiondb.getCollnum ( coll );
	RdbBase *base = getBase ( collnum );
	// ensure its there
	if (collnum < (collnum_t)0 || !base) {
		g_errno = EBADENGINEER;
		log(LOG_WARN, "db: %s: Failed to delete collection #%i. "
		    "Does not exist.", m_dbname,collnum);
		return false;
	}
	// move all files to trash and clear the tree/buckets
	deleteColl(collnum, collnum);
	return true;
}
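// ------------------------------------------------------------------------
// Note (added commentary, not original code): deleteColl() does not unlink
// collection data; it renames the whole collection directory into the
// trash, e.g. "coll.mycoll.3/" becomes "trash/coll.mycoll.3.1456789012345/"
// where the trailing number is the current time in milliseconds.
// ------------------------------------------------------------------------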
// . returns false if blocked, true otherwise
// . sets g_errno on error
// . CAUTION: only set urgent to true if we got a SIGSEGV or SIGPWR...
bool Rdb::close ( void *state ,
		  void (* callback)(void *state ) ,
		  bool urgent ,
		  bool isReallyClosing ) {
	// unregister in case already registered
	if ( m_registered )
		g_loop.unregisterSleepCallback (this,closeSleepWrapper);
	// reset g_errno
	g_errno = 0;
	// return true if no RdbBases in m_bases[] to close
	if ( getNumBases() <= 0 ) return true;
	// return true if already closed
	if ( m_isClosed ) return true;
	// don't call more than once
	if ( m_isSaving ) return true;
	// update last write time so main.cpp doesn't keep calling us
	m_lastWrite = gettimeofdayInMilliseconds();
	// set the m_isClosing flag in case we're waiting for a dump.
	// then, when the dump is done, it will come here again
	m_closeState = state;
	m_closeCallback = callback;
	m_urgent = urgent;
	m_isReallyClosing = isReallyClosing;
	if ( m_isReallyClosing ) m_isClosing = true;
	// . don't call more than once
	// . really only for when isReallyClosing is false... just a quick save
	m_isSaving = true;
	// suspend any merge permanently (not just for this rdb), we're exiting
	if ( m_isReallyClosing ) {
		g_merge.suspendMerge();
		g_merge2.suspendMerge();
	}
	// . allow dumps to complete unless we're urgent
	// . if we're urgent, we'll end up with a half dumped file, which
	//   is ok now, since it should get its RdbMap auto-generated for it
	//   when we come back up again
	if ( ! m_urgent && m_inDumpLoop ) { // m_dump.isDumping() ) {
		m_isSaving = false;
		const char *tt = "save";
		if ( m_isReallyClosing ) tt = "close";
		log(LOG_INFO, "db: Cannot %s %s until dump finishes.",
		    tt, m_dbname);
		return false;
	}
	// if a write thread is outstanding, and we exit now, we can end up
	// freeing the buffer it is writing and it will core... and things
	// won't be in sync with the map when it is saved below...
	if ( m_isReallyClosing &&
	     g_merge.isMerging() &&
	     // if we cored, we are urgent and need to make sure we save even
	     // if we are merging this rdb...
	     ! m_urgent &&
	     g_merge.getRdbId() == m_rdbId &&
	     ( g_merge.getNumThreads() || g_merge.isDumping() ) ) {
		// do not spam this message
		int64_t now = gettimeofdayInMilliseconds();
		if ( now - m_lastTime >= 500 ) {
			log(LOG_INFO,"db: Waiting for merge to finish last "
			    "write for %s.",m_dbname);
			m_lastTime = now;
		}
		g_loop.registerSleepCallback (500,this,closeSleepWrapper);
		m_registered = true;
		// allow to be called again
		m_isSaving = false;
		return false;
	}
	if ( m_isReallyClosing &&
	     g_merge2.isMerging() &&
	     // if we cored, we are urgent and need to make sure we save even
	     // if we are merging this rdb...
	     ! m_urgent &&
	     g_merge2.getRdbId() == m_rdbId &&
	     ( g_merge2.getNumThreads() || g_merge2.isDumping() ) ) {
		// do not spam this message
		int64_t now = gettimeofdayInMilliseconds();
		if ( now - m_lastTime >= 500 ) {
			log(LOG_INFO,"db: Waiting for merge to finish last "
			    "write for %s.",m_dbname);
			m_lastTime = now;
		}
		g_loop.registerSleepCallback (500,this,closeSleepWrapper);
		m_registered = true;
		// allow to be called again
		m_isSaving = false;
		return false;
	}
	// if we were merging to a file and are being closed urgently
	// save the map! Also save the maps of the files we were merging
	// in case they got their heads chopped (RdbMap::chopHead()) which
	// we do to save disk space while merging.
	// try to save the cache, may not save
	if ( m_isReallyClosing ) {
		// now loop over bases
		for ( int32_t i = 0 ; i < g_collectiondb.m_numRecs ; i++ ) {
			// shut it down
			RdbBase *base = getBase ( i );
			if ( base ) {
				base->closeMaps ( m_urgent );
			}
		}
	}
	// save it using a thread?
	bool useThread = !(m_urgent || m_isReallyClosing);
	// . returns false if blocked, true otherwise
	// . sets g_errno on error
	if(m_useTree) {
		if (!m_tree.fastSave(getDir(), m_dbname, useThread, this,
				     doneSavingWrapper)) {
			return false;
		}
	} else {
		if (!m_buckets.fastSave(getDir(), useThread, this,
					doneSavingWrapper)) {
			return false;
		}
	}
	// we saved it w/o blocking OR we had a g_errno
	doneSaving();
	return true;
}
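// ------------------------------------------------------------------------
// Protocol sketch (added commentary, not original code): close() follows
// the usual blocking convention in this codebase. A caller might do
//
//     if ( ! rdb->close ( state , doneClosingWrapper , false , true ) )
//         return; // blocked; doneClosingWrapper(state) fires when done
//     // returned true: either closed right away or g_errno is set
//
// where doneClosingWrapper is a hypothetical caller-supplied callback.
// ------------------------------------------------------------------------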
void Rdb::closeSleepWrapper ( int fd , void *state ) {
	Rdb *THIS = (Rdb *)state;
	// sanity check
	if ( ! THIS->m_isClosing ) { g_process.shutdownAbort(true); }
	// continue closing, this returns false if blocked
	if (!THIS->close(THIS->m_closeState, THIS->m_closeCallback,
			 false, true)) {
		return;
	}
	// otherwise, we call the callback
	THIS->m_closeCallback ( THIS->m_closeState );
}

void Rdb::doneSavingWrapper ( void *state ) {
	Rdb *THIS = (Rdb *)state;
	THIS->doneSaving();
	// . call the callback if any
	// . this lets PageMaster.cpp know when we're closed
	if (THIS->m_closeCallback) THIS->m_closeCallback(THIS->m_closeState);
}

void Rdb::doneSaving ( ) {
	// bail if g_errno was set
	if ( g_errno ) {
		log(LOG_WARN, "db: Had error saving %s-saved.dat: %s.",
		    m_dbname,mstrerror(g_errno));
		g_errno = 0;
		//m_needsSave = true;
		m_isSaving = false;
		return;
	}
	// sanity
	if ( m_dbname == NULL || m_dbname[0]=='\0' ) {
		g_process.shutdownAbort(true);
	}
	// display any error, if any, otherwise prints "Success"
	logf(LOG_INFO,"db: Successfully saved %s-saved.dat.", m_dbname);
	// mdw ---> file doesn't save right, seems like it keeps the same
	// length as the old file...
	// . we're now closed
	// . keep m_isClosing set to true so no one can add data
	if ( m_isReallyClosing ) m_isClosed = true;
	// call it again now
	m_isSaving = false;
}

bool Rdb::isSavingTree ( ) {
	if ( m_useTree ) return m_tree.isSaving();
	return m_buckets.isSaving();
}

bool Rdb::saveTree ( bool useThread ) {
	const char *dbn = m_dbname;
	if ( ! dbn || ! dbn[0] ) {
		dbn = "unknown";
	}
	// . if RdbTree::m_needsSave is false this will return true
	// . if RdbTree::m_isSaving is true this will return false
	// . returns false if blocked, true otherwise
	// . sets g_errno on error
	if (m_useTree) {
		if (m_tree.needsSave()) {
			log( LOG_DEBUG, "db: saving tree %s", dbn );
		}
		return m_tree.fastSave ( getDir(), m_dbname, useThread,
					 NULL, NULL );
	} else {
		if (m_buckets.needsSave()) {
			log( LOG_DEBUG, "db: saving buckets %s", dbn );
		}
		return m_buckets.fastSave ( getDir(), useThread, NULL, NULL );
	}
}

bool Rdb::saveTreeIndex(bool /* useThread */) {
	if( !m_useIndexFile ) {
		return true;
	}
	// now loop over bases
	for ( int32_t i = 0 ; i < getNumBases() ; i++ ) {
		CollectionRec *cr = g_collectiondb.m_recs[i];
		if ( ! cr ) {
			continue;
		}
		// if swapped out, this will be NULL, so skip it
		RdbBase *base = cr->getBasePtr(m_rdbId);
		if (base) {
			base->saveTreeIndex();
		}
	}
	return true;
}

bool Rdb::saveIndexes() {
	// now loop over bases
	for (int32_t i = 0; i < getNumBases(); i++) {
		CollectionRec *cr = g_collectiondb.m_recs[i];
		if (!cr) {
			continue;
		}
		// if swapped out, this will be NULL, so skip it
		RdbBase *base = cr->getBasePtr(m_rdbId);
		if (base) {
			base->saveIndexes();
		}
	}
	return true;
}
bool Rdb::saveMaps () {
	// now loop over bases
	for ( int32_t i = 0 ; i < getNumBases() ; i++ ) {
		CollectionRec *cr = g_collectiondb.m_recs[i];
		if ( ! cr ) {
			continue;
		}
		// if swapped out, this will be NULL, so skip it
		RdbBase *base = cr->getBasePtr(m_rdbId);
		if ( base ) {
			base->saveMaps();
		}
	}
	return true;
}

bool Rdb::treeFileExists ( ) {
	char filename[256];
	sprintf(filename,"%s-saved.dat",m_dbname);
	BigFile file;
	file.set ( getDir() , filename , NULL ); // g_conf.m_stripeDir );
	return file.doesExist() > 0;
}

// returns false and sets g_errno on error
bool Rdb::loadTree ( ) {
	// get the filename of the saved tree
	char filename[256];
	sprintf(filename,"%s-saved.dat",m_dbname);
	//log (0,"Rdb::loadTree: loading %s",filename);

	// set a BigFile to this filename
	BigFile file;
	file.set ( getDir(), filename , NULL ); // g_conf.m_stripeDir );
	bool treeExists = file.doesExist();
	bool status = false ;
	if ( treeExists ) {
		// load the table with file named "THISDIR/saved"
		status = m_tree.fastLoad ( &file , &m_mem ) ;
		// we close it now instead of him
	}

	if ( m_useTree ) {
		file.close();
		if ( !status && treeExists ) {
			log( LOG_ERROR, "db: Could not load saved tree." );
			return false;
		}
	} else {
		if ( !m_buckets.loadBuckets( m_dbname ) ) {
			log( LOG_ERROR, "db: Could not load saved buckets." );
			return false;
		}
		int32_t numKeys = m_buckets.getNumKeys();
		// log("db: Loaded %" PRId32" recs from %s's buckets on disk.",
		//     numKeys, m_dbname);
		if(!m_buckets.testAndRepair()) {
			log( LOG_ERROR, "db: unrepairable buckets, remove "
			     "and restart." );
			g_process.shutdownAbort(true);
		}
		if(treeExists) {
			m_buckets.addTree( &m_tree );
			if ( m_buckets.getNumKeys() - numKeys > 0 ) {
				log( LOG_ERROR, "db: Imported %" PRId32" recs "
				     "from %s's tree to buckets.",
				     m_buckets.getNumKeys()-numKeys, m_dbname);
			}
			if ( g_conf.m_readOnlyMode ) {
				m_buckets.setNeedsSave(false);
			} else {
				char newFilename[256];
				sprintf(newFilename,"%s-%" PRId32".old",
					filename, (int32_t)getTime());
				bool usingThreads =
					g_jobScheduler.are_new_jobs_allowed();
				g_jobScheduler.disallow_new_jobs();
				file.rename(newFilename);
				if ( usingThreads ) {
					g_jobScheduler.allow_new_jobs();
				}
				m_tree.reset();
			}
			file.close();
		}
	}
	return true;
}
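// ------------------------------------------------------------------------
// Note (added commentary, not original code): for bucket-based rdbs,
// loadTree() doubles as a one-way upgrade path. If an old tree save file
// ("<dbname>-saved.dat") still exists, its records are imported into the
// buckets and the file is renamed to "<dbname>-saved.dat-<time>.old" so
// the import only happens once.
// ------------------------------------------------------------------------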
static time_t s_lastTryTime = 0;

// . start dumping the tree
// . returns false and sets g_errno on error
bool Rdb::dumpTree ( int32_t niceness ) {
	logTrace( g_conf.m_logTraceRdb, "BEGIN %s", m_dbname );

	if (getNumUsedNodes() <= 0) {
		logTrace( g_conf.m_logTraceRdb, "END. %s: No used nodes/keys. "
			  "Returning true", m_dbname );
		return true;
	}

	// never dump doledb any more. it's rdbtree only.
	if ( m_rdbId == RDB_DOLEDB ) {
		logTrace( g_conf.m_logTraceRdb, "END. %s: Rdb is doledb. "
			  "Returning true", m_dbname );
		return true;
	}

	// bail if already dumping
	//if ( m_dump.isDumping() ) return true;
	if ( m_inDumpLoop ) {
		logTrace( g_conf.m_logTraceRdb, "END. %s: Already dumping. "
			  "Returning true", m_dbname );
		return true;
	}

	// . if tree is saving do not dump it, that removes things from tree
	// . i think this caused a problem messing up RdbMem before when
	//   both happened at once
	if (isSavingTree()) {
		logTrace( g_conf.m_logTraceRdb, "END. %s: Rdb tree/bucket is "
			  "saving. Returning true", m_dbname );
		return true;
	}

	// if Process is saving, don't start a dump
	if ( g_process.m_mode == SAVE_MODE ) {
		logTrace( g_conf.m_logTraceRdb, "END. %s: Process is in save "
			  "mode. Returning true", m_dbname );
		return true;
	}

	// if it has been less than 3 seconds since our last failed attempt
	// do not try again to avoid flooding our log
	if ( getTime() - s_lastTryTime < 3 ) {
		logTrace( g_conf.m_logTraceRdb, "END. %s: Less than 3 seconds "
			  "since last attempt. Returning true", m_dbname );
		return true;
	}

	// don't dump if not 90% full
	if ( ! needsDump() ) {
		log(LOG_INFO, "db: %s tree not 90 percent full but dumping.",
		    m_dbname);
		//return true;
	}

	// reset g_errno -- don't forget!
	g_errno = 0;

	// . wait for all unlinking and renaming activity to flush out
	// . we do not want to dump to a filename in the middle of being
	//   unlinked
	if ( g_errno || g_numThreads > 0 ) {
		// update this so we don't try too much and flood the log
		// with error messages from RdbDump.cpp calling log() and
		// quickly kicking the log file over 2G which seems to
		// get the process killed
		s_lastTryTime = getTime();
		// now log a message
		if ( g_numThreads > 0 ) {
			log( LOG_INFO, "db: Waiting for previous unlink/rename "
			     "operations to finish before dumping %s.",
			     m_dbname );
		} else {
			log( LOG_WARN, "db: Failed to dump %s: %s.",
			     m_dbname, mstrerror( g_errno ) );
		}
		logTrace( g_conf.m_logTraceRdb, "END. %s: g_error=%s or "
			  "g_numThreads=%d. Returning false",
			  m_dbname, mstrerror( g_errno), g_numThreads );
		return false;
	}

	// remember niceness for calling setDump()
	m_niceness = niceness;

	// debug msg
	log(LOG_INFO,"db: Dumping %s to disk. nice=%" PRId32,
	    m_dbname,niceness);

	// record last dump time so main.cpp will not save us this period
	m_lastWrite = gettimeofdayInMilliseconds();

	// only try to fix once per dump session
	int64_t start = m_lastWrite; //gettimeofdayInMilliseconds();

	// do not do chain testing because that is too slow
	if ( m_useTree &&
	     ! m_tree.checkTree ( false /* printMsgs?*/, false/*chain?*/) ) {
		log( LOG_ERROR, "db: %s tree was corrupted in memory. Trying "
		     "to fix. Your memory is probably bad. Please replace it.",
		     m_dbname);
		// if fix failed why even try to dump?
		if ( ! m_tree.fixTree() ) {
			// only try to dump every 3 seconds
			s_lastTryTime = getTime();
			log( LOG_ERROR, "db: Could not fix in memory data for "
			     "%s. Abandoning dump.", m_dbname );
			logTrace( g_conf.m_logTraceRdb, "END. %s: Unable to "
				  "fix tree. Returning false", m_dbname );
			return false;
		}
	}

	log( LOG_INFO, "db: Checking validity of in memory data of %s before "
	     "dumping, took %" PRId64" ms.",
	     m_dbname,gettimeofdayInMilliseconds()-start );

	////
	//
	// see what collnums are in the tree and just try those
	//
	////
	CollectionRec *cr = NULL;
	for ( int32_t i = 0 ; i < g_collectiondb.m_numRecs ; i++ ) {
		cr = g_collectiondb.m_recs[i];
		if ( ! cr ) continue;
		// reset his tree count flag thing
		cr->m_treeCount = 0;
	}
	if ( m_useTree ) {
		// now scan the rdbtree and inc treecount where appropriate
		for ( int32_t i = 0 ; i < m_tree.getMinUnusedNode() ; i++ ) {
			// skip node if parents is -2 (unoccupied)
			if ( m_tree.isEmpty(i) ) {
				continue;
			}
			// get rec from tree collnum
			cr = g_collectiondb.m_recs[m_tree.getCollnum(i)];
			if ( cr ) {
				cr->m_treeCount++;
			}
		}
	} else {
		for(int32_t i = 0; i < m_buckets.getNumBuckets(); i++) {
			const RdbBucket *b = m_buckets.getBucket(i);
			collnum_t cn = b->getCollnum();
			int32_t nk = b->getNumKeys();
			cr = g_collectiondb.m_recs[cn];
			if ( cr ) {
				cr->m_treeCount += nk;
			}
		}
	}

	// loop through collections, dump each one
	m_dumpCollnum = (collnum_t)-1;
	// clear this for dumpCollLoop()
	g_errno = 0;
	m_dumpErrno = 0;
	m_fn = -1000;

	// this returns false if blocked, which means we're ok, so we ret true
	if ( ! dumpCollLoop ( ) ) {
		logTrace( g_conf.m_logTraceRdb, "END. %s: dumpCollLoop "
			  "blocked. Returning true", m_dbname );
		return true;
	}

	// if it returns true with g_errno set, there was an error
	if ( g_errno ) {
		logTrace( g_conf.m_logTraceRdb, "END. %s: dumpCollLoop "
			  "g_error=%s. Returning false",
			  m_dbname, mstrerror( g_errno) );
		return false;
	}

	// otherwise, it completed without blocking
	doneDumping();
	logTrace( g_conf.m_logTraceRdb, "END. %s: Done dumping. "
		  "Returning true", m_dbname );
	return true;
}
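// ------------------------------------------------------------------------
// Summary (added commentary, not original code): dumps are normally
// triggered from addList()/addRecord() once needsDump() reports the
// in-memory structure is roughly 90% full. Note the early return above:
// doledb is tree-only and is never dumped to disk. dumpCollLoop() below
// then walks every collection that has records in the tree.
// ------------------------------------------------------------------------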
Returning false", m_dbname, mstrerror( g_errno) ); return false; } // otherwise, it completed without blocking doneDumping(); logTrace( g_conf.m_logTraceRdb, "END. %s: Done dumping. Returning true", m_dbname ); return true; } // returns false if blocked, true otherwise bool Rdb::dumpCollLoop ( ) { logTrace( g_conf.m_logTraceRdb, "BEGIN %s", m_dbname ); loop: // if no more, we're done... if ( m_dumpCollnum >= getNumBases() ) { logTrace( g_conf.m_logTraceRdb, "END. %s: No more. Returning true", m_dbname ); return true; } // the only was g_errno can be set here is from a previous dump // error? if ( g_errno ) { hadError: // if swapped out, this will be NULL, so skip it RdbBase *base = NULL; if ( m_dumpCollnum >= 0 ) { CollectionRec *cr = g_collectiondb.m_recs[m_dumpCollnum]; if ( cr ) { base = cr->getBasePtr( m_rdbId ); } } log( LOG_ERROR, "build: Error dumping collection: %s.",mstrerror(g_errno)); // . if we wrote nothing, remove the file // . if coll was deleted under us, base will be NULL! if ( base && (! base->getFile(m_fn)->doesExist() || base->getFile(m_fn)->getFileSize() <= 0) ) { log("build: File %s is zero bytes, removing from memory.",base->getFile(m_fn)->getFilename()); base->buryFiles ( m_fn , m_fn+1 ); } // game over, man doneDumping(); // update this so we don't try too much and flood the log // with error messages s_lastTryTime = getTime(); logTrace( g_conf.m_logTraceRdb, "END. %s: Done dumping with g_errno=%s. Returning true", m_dbname, mstrerror( g_errno ) ); return true; } // advance for next round m_dumpCollnum++; // don't bother getting the base for all collections because // we end up swapping them in for ( ; m_dumpCollnum < getNumBases() ; m_dumpCollnum++ ) { // collection rdbs like statsdb are ok to process if ( m_isCollectionLess ) { break; } // otherwise get the coll rec now if ( !g_collectiondb.m_recs[m_dumpCollnum] ) { // skip if empty continue; } // ok, it's good to dump break; } // if no more, we're done... if ( m_dumpCollnum >= getNumBases() ) { return true; } // swap it in for dumping purposes if we have to // "cr" is NULL potentially for collectionless rdbs, like statsdb, // do we can't involve that... RdbBase *base = getBase(m_dumpCollnum); // hwo can this happen? error swappingin? if ( ! base ) { log( LOG_WARN, "rdb: dumpcollloop base was null for cn=%" PRId32, (int32_t)m_dumpCollnum); goto hadError; } // before we create the file, see if tree has anything for this coll if(m_useTree) { const char *k = KEYMIN(); int32_t nn = m_tree.getNextNode ( m_dumpCollnum , k ); if ( nn < 0 ) goto loop; if ( m_tree.getCollnum(nn) != m_dumpCollnum ) goto loop; } else { if(!m_buckets.collExists(m_dumpCollnum)) goto loop; } // . MDW ADDING A NEW FILE SHOULD BE IN RDBDUMP.CPP NOW... NO! // . get the biggest fileId int32_t id2 = m_isTitledb ? 0 : -1; // if we add to many files then we can not merge, because merge op // needs to add a file and it calls addNewFile() too static int32_t s_flag = 0; if ( base->getNumFiles() + 1 >= MAX_RDB_FILES ) { if ( s_flag < 10 ) log( LOG_WARN, "db: could not dump tree to disk for cn=" "%i %s because it has %" PRId32" files on disk. 
" "Need to wait for merge operation.", (int)m_dumpCollnum,m_dbname,base->getNumFiles()); s_flag++; goto loop; } // this file must not exist already, we are dumping the tree into it m_fn = base->addNewFile ( id2 ) ; if ( m_fn < 0 ) { log( LOG_LOGIC, "db: rdb: Failed to add new file to dump %s: %s.", m_dbname, mstrerror( g_errno ) ); return false; } log(LOG_INFO,"build: Dumping to %s/%s for coll \"%s\".", base->getFile(m_fn)->getDir(), base->getFile(m_fn)->getFilename() , g_collectiondb.getCollName ( m_dumpCollnum ) ); // what is the avg rec size? int32_t numRecs = getNumUsedNodes(); int32_t avgSize; if(m_useTree) { if ( numRecs <= 0 ) numRecs = 1; avgSize = m_tree.getMemOccupiedForList() / numRecs; } else { avgSize = m_buckets.getRecSize(); } // . don't get more than 3000 recs from the tree because it gets slow // . we'd like to write as much out as possible to reduce possible // file interlacing when synchronous writes are enabled. RdbTree:: // getList() should really be sped up by doing the neighbor node // thing. would help for adding lists, too, maybe. int32_t bufSize = 300 * 1024; int32_t bufSize2 = 3000 * avgSize; if (bufSize2 < bufSize) { bufSize = bufSize2; } if (!m_useTree) { //buckets are much faster at getting lists bufSize *= 4; } RdbBuckets *buckets = NULL; RdbTree *tree = NULL; if (m_useTree) { tree = &m_tree; } else { buckets = &m_buckets; } // . RdbDump will set the filename of the map we pass to this // . RdbMap should dump itself out CLOSE! // . it returns false if blocked, true otherwise & sets g_errno on err // . but we only return false on error here if (!m_dump.set(base->m_collnum, base->getFile(m_fn), buckets, tree, base->getTreeIndex(), base->getMap(m_fn), base->getIndex(m_fn), bufSize, // write buf size m_niceness, // niceness of 1 will NOT block this, // state doneDumpingCollWrapper, m_useHalfKeys, 0LL, // dst start offset KEYMIN(), // prev last key m_ks, // keySize this)) {// for setting m_needsToSave logTrace( g_conf.m_logTraceRdb, "END. %s: RdbDump blocked. Returning false", m_dbname ); return false; } // error? if ( g_errno ) { log("rdb: error dumping = %s . coll deleted from under us?", mstrerror(g_errno)); // shit, what to do here? this is causing our RdbMem // to get corrupted! // because if we end up continuing it calls doneDumping() // and updates RdbMem! maybe set a permanent error then! // and if that is there do not clear RdbMem! m_dumpErrno = g_errno; // for now core out //g_process.shutdownAbort(true); } // loop back up since we did not block goto loop; } static CollectionRec *s_mergeHead = NULL; static CollectionRec *s_mergeTail = NULL; void addCollnumToLinkedListOfMergeCandidates ( collnum_t dumpCollnum ) { // add this collection to the linked list of merge candidates CollectionRec *cr = g_collectiondb.getRec ( dumpCollnum ); if ( ! 
static CollectionRec *s_mergeHead = NULL;
static CollectionRec *s_mergeTail = NULL;

void addCollnumToLinkedListOfMergeCandidates ( collnum_t dumpCollnum ) {
	// add this collection to the linked list of merge candidates
	CollectionRec *cr = g_collectiondb.getRec ( dumpCollnum );
	if ( ! cr ) return;
	// do not double add it, if already there just return
	if ( cr->m_nextLink ) return;
	if ( cr->m_prevLink ) return;
	if ( s_mergeTail && cr ) {
		s_mergeTail->m_nextLink = cr;
		cr ->m_nextLink = NULL;
		cr ->m_prevLink = s_mergeTail;
		s_mergeTail = cr;
	} else if ( cr ) {
		cr->m_prevLink = NULL;
		cr->m_nextLink = NULL;
		s_mergeHead = cr;
		s_mergeTail = cr;
	}
}

// this is also called in Collectiondb::deleteRec2()
void removeFromMergeLinkedList ( CollectionRec *cr ) {
	CollectionRec *prev = cr->m_prevLink;
	CollectionRec *next = cr->m_nextLink;
	cr->m_prevLink = NULL;
	cr->m_nextLink = NULL;
	if ( prev ) prev->m_nextLink = next;
	if ( next ) next->m_prevLink = prev;
	if ( s_mergeTail == cr ) s_mergeTail = prev;
	if ( s_mergeHead == cr ) s_mergeHead = next;
}

void Rdb::doneDumpingCollWrapper ( void *state ) {
	Rdb *THIS = (Rdb *)state;
	// we just finished dumping to a file,
	// so allow it to try to merge again.
	//RdbBase *base = THIS->getBase(THIS->m_dumpCollnum);
	//if ( base ) base->m_checkedForMerge = false;
	logTrace( g_conf.m_logTraceRdb, "%s", THIS->m_dbname );
	// return if the loop blocked
	if ( ! THIS->dumpCollLoop() ) {
		return;
	}
	// otherwise, call big wrapper
	THIS->doneDumping();
}

// Moved a lot of the logic originally here in Rdb::doneDumping into
// RdbDump.cpp::dumpTree()
void Rdb::doneDumping ( ) {
	// msg
	//log(LOG_INFO,"db: Done dumping %s to %s (#%" PRId32"): %s.",
	//    m_dbname,m_files[n]->getFilename(),n,mstrerror(g_errno));
	log(LOG_INFO,"db: Done dumping %s: %s.",m_dbname,
	    mstrerror(m_dumpErrno));

	// free mem in the primary buffer
	if ( ! m_dumpErrno ) {
		m_mem.freeDumpedMem( &m_tree );
	}

	// . tell RdbDump it is done
	// . we have to set this here otherwise RdbMem's memory ring buffer
	//   will think the dumping is no longer going on and use the primary
	//   memory for allocating new titleRecs and such and that is not good!
	m_inDumpLoop = false;

	// . on g_errno the dumped file will be removed from "sync" file and
	//   from m_files and m_maps
	// . TODO: move this logic into RdbDump.cpp
	//for ( int32_t i = 0 ; i < getNumBases() ; i++ ) {
	//	if ( m_bases[i] ) m_bases[i]->doneDumping();
	//}

	// if we're closing shop then return
	if ( m_isClosing ) {
		// continue closing, this returns false if blocked
		if (!close(m_closeState, m_closeCallback, false, true)) {
			return;
		}
		// otherwise, we call the callback
		m_closeCallback ( m_closeState );
		return;
	}

	// try merge for all, first one that needs it will do it, preventing
	// the rest from doing it
	// don't attempt merge if we're niceness 0
	if ( !m_niceness ) return;
	//attemptMerge ( 1 , false );
	attemptMergeAllCallback(0,NULL);
}

void forceMergeAll(rdbid_t rdbId) {
	// set flag on all RdbBases
	for ( int32_t i = 0 ; i < g_collectiondb.m_numRecs ; i++ ) {
		CollectionRec *cr = g_collectiondb.m_recs[i];
		if ( ! cr ) {
			log(LOG_INFO,"%s:%s:%d: coll %" PRId32" - could not "
			    "get CollectionRec",
			    __FILE__,__func__,__LINE__,i);
			continue;
		}
		RdbBase *base = cr->getBase ( rdbId );
		if ( ! base ) {
			log(LOG_INFO,"%s:%s:%d: coll %" PRId32" - could not "
			    "get RdbBase", __FILE__,__func__,__LINE__,i);
			continue;
		}
		log(LOG_INFO,"%s:%s:%d: coll %" PRId32" - Set next merge to "
		    "Forced", __FILE__,__func__,__LINE__,i);
		base->m_nextMergeForced = true;
	}
	// and try to merge now
	attemptMergeAll();
}

// this should be called every few seconds by the sleep callback, too
void attemptMergeAllCallback ( int fd , void *state ) {
	attemptMergeAll();
}
// called by main.cpp
// . TODO: if rdbbase::attemptMerge() needs to launch a merge but can't
//   then do NOT remove from linked list. maybe set a flag like 'needsMerge'
void attemptMergeAll() {
	// wait for any current merge to stop!
	if ( g_merge.isMerging() ) {
		log(LOG_INFO,"Attempted merge, but merge already running");
		return;
	}

	int32_t niceness = MAX_NICENESS;
	static collnum_t s_lastCollnum = 0;
	int32_t count = 0;

 tryLoop:
	// if a collection got deleted, reset this to 0
	if ( s_lastCollnum >= g_collectiondb.m_numRecs ) {
		s_lastCollnum = 0;
		// and return so we don't spin 1000 times over a single coll.
		return;
	}

	// limit to 1000 checks to save the cpu since we call this once
	// every 2 seconds.
	if ( ++count >= 1000 ) return;

	CollectionRec *cr = g_collectiondb.m_recs[s_lastCollnum];
	if ( ! cr ) {
		s_lastCollnum++;
		goto tryLoop;
	}

	bool force = false;
	RdbBase *base ;

	// args = niceness, forceMergeAll, doLog, minToMergeOverride
	// if RdbBase::attemptMerge() returns true that means it
	// launched a merge and it will call attemptMergeAll2() when
	// the merge completes.
	base = cr->getBasePtr(RDB_POSDB);
	if ( base && base->attemptMerge(niceness,force,true) ) return;
	base = cr->getBasePtr(RDB_TITLEDB);
	if ( base && base->attemptMerge(niceness,force,true) ) return;
	base = cr->getBasePtr(RDB_TAGDB);
	if ( base && base->attemptMerge(niceness,force,true) ) return;
	base = cr->getBasePtr(RDB_LINKDB);
	if ( base && base->attemptMerge(niceness,force,true) ) return;
	base = cr->getBasePtr(RDB_SPIDERDB);
	if ( base && base->attemptMerge(niceness,force,true) ) return;
	base = cr->getBasePtr(RDB_CLUSTERDB);
	if ( base && base->attemptMerge(niceness,force,true) ) return;

	// also try to merge on rdbs being rebuilt
	base = cr->getBasePtr(RDB2_POSDB2);
	if ( base && base->attemptMerge(niceness,force,true) ) return;
	base = cr->getBasePtr(RDB2_TITLEDB2);
	if ( base && base->attemptMerge(niceness,force,true) ) return;
	base = cr->getBasePtr(RDB2_TAGDB2);
	if ( base && base->attemptMerge(niceness,force,true) ) return;
	base = cr->getBasePtr(RDB2_LINKDB2);
	if ( base && base->attemptMerge(niceness,force,true) ) return;
	base = cr->getBasePtr(RDB2_SPIDERDB2);
	if ( base && base->attemptMerge(niceness,force,true) ) return;
	base = cr->getBasePtr(RDB2_CLUSTERDB2);
	if ( base && base->attemptMerge(niceness,force,true) ) return;

	// try next collection
	s_lastCollnum++;
	goto tryLoop;
}
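// ------------------------------------------------------------------------
// Usage sketch (added commentary, not original code): addList() below
// follows the ETRYAGAIN convention, so a caller is expected to retry, e.g.
//
//     if ( ! rdb->addList ( collnum , &list , MAX_NICENESS ) ) {
//         if ( g_errno == ETRYAGAIN ) {
//             // a dump was likely initiated to free memory; re-queue
//             // this list and try again in a second or so
//         }
//     }
//
// records are added one at a time, so a false return can leave the list
// partially added.
// ------------------------------------------------------------------------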
// . return false and set g_errno on error
// . TODO: speedup with m_tree.addSortedKeys() already partially written
bool Rdb::addList ( collnum_t collnum , RdbList *list, int32_t niceness ) {
	// pick it
	if ( collnum < 0 || collnum > getNumBases() || ! getBase(collnum) ) {
		g_errno = ENOCOLLREC;
		log(LOG_WARN, "db: %s bad collnum of %i.",m_dbname,collnum);
		return false;
	}

	// make sure list is reset
	list->resetListPtr();

	// if nothing then just return true
	if ( list->isExhausted() ) return true;

	// sanity check
	if ( list->m_ks != m_ks ) { g_process.shutdownAbort(true); }

	// we now call getTimeGlobal() so we need to be in sync with host #0
	if ( ! isClockInSync () ) {
		// log("rdb: can not add data because clock not in sync with "
		//     "host #0. issuing try again reply.");
		g_errno = ETRYAGAIN;
		return false;
	}

	// if we are well into repair mode, level 2, do not add anything
	// to spiderdb or titledb... that can mess up our titledb scan.
	// we always rebuild tfndb, clusterdb and spiderdb
	// but we often just repair titledb, indexdb and datedb because
	// they are bigger. it may add to indexdb/datedb
	if ( g_repair.isRepairActive() &&
	     // but only check for collection we are repairing/rebuilding
	     collnum == g_repair.m_collnum &&
	     // exception, spider status docs can be deleted from titledb
	     // if user turns off 'index spider replies' before doing
	     // the rebuild, when not rebuilding titledb.
	     ((m_rdbId == RDB_TITLEDB && list->m_listSize != 12 ) ||
	      m_rdbId == RDB_POSDB ||
	      m_rdbId == RDB_CLUSTERDB ||
	      m_rdbId == RDB_LINKDB ||
	      m_rdbId == RDB_DOLEDB ||
	      m_rdbId == RDB_SPIDERDB ) ) {
		// allow banning of sites still
		log("db: How did an add come in while in repair mode? "
		    "rdbId=%" PRId32,(int32_t)m_rdbId);
		g_errno = EREPAIRING;
		return false;
	}

	// if we are currently in a quickpoll, make sure we are not in
	// RdbTree::getList(), because we could mess that loop up by adding
	// or deleting a record into/from the tree now
	if ( m_tree.isGettingList() ) {
		g_errno = ETRYAGAIN;
		return false;
	}

	// prevent double entries
	if ( m_inAddList ) {
		// i guess the msg1 handler makes it this far!
		//log("db: msg1 add in an add.");
		g_errno = ETRYAGAIN;
		return false;
	}

	// lock it
	m_inAddList = true;

	// . if we don't have enough room to store list, initiate a dump and
	//   return g_errno of ETRYAGAIN
	// . otherwise, we're guaranteed to have room for this list
	if ( ! hasRoom(list,niceness) ) {
		// stop it
		m_inAddList = false;
		// if tree is empty, list will never fit!!!
		if ( m_useTree && m_tree.getNumUsedNodes() <= 0 ) {
			g_errno = ELISTTOOBIG;
			log( LOG_WARN, "db: Tried to add a record that is "
			     "simply too big (%" PRId32" bytes) to ever fit "
			     "in the memory space for %s. Please increase "
			     "the max memory for %s in gb.conf.",
			     list->m_listSize, m_dbname, m_dbname );
			return false;
		}
		// force initiate the dump now, but not if we are niceness 0
		// because then we can't be interrupted with quickpoll!
		if ( niceness != 0 ) {
			logTrace( g_conf.m_logTraceRdb, "%s: Not enough room. "
				  "Calling dumpTree", m_dbname );
			dumpTree( 1/*niceness*/ );
		}
		// set g_errno after initiating the dump!
		g_errno = ETRYAGAIN;
		// return false since we didn't add the list
		return false;
	}

	// otherwise, add one record at a time
	// unprotect tree from writes
	if ( m_tree.m_useProtection ) {
		m_tree.unprotect ( );
	}

	do {
		char key[MAX_KEY_BYTES];
		list->getCurrentKey(key);
		int32_t dataSize;
		char *data;
		// negative keys have no data
		if ( ! KEYNEG(key) ) {
			dataSize = list->getCurrentDataSize();
			data = list->getCurrentData();
		} else {
			dataSize = 0;
			data = NULL;
		}
		if ( ! addRecord ( collnum , key , data , dataSize ) ) {
			// bitch
			static int32_t s_last = 0;
			int32_t now = time(NULL);
			// . do not log this more than once per second to
			//   stop log spam
			// . i think this can really lockup the cpu, too
			if ( now - s_last != 0 ) {
				log( LOG_INFO, "db: Had error adding data "
				     "to %s: %s.",
				     m_dbname, mstrerror( g_errno ));
			}
			s_last = now;
			// force initiate the dump now if addRecord failed
			// for no mem
			if ( g_errno == ENOMEM ) {
				// start dumping the tree to disk so we have
				// room 4 add
				if ( niceness != 0 ) {
					logTrace( g_conf.m_logTraceRdb,
						  "%s: Not enough memory. "
						  "Calling dumpTree",
						  m_dbname );
					dumpTree( 1/*niceness*/ );
				}
				// tell caller to try again later (1 second
				// or so)
				g_errno = ETRYAGAIN;
			}
			// reprotect tree from writes
			if ( m_tree.m_useProtection ) m_tree.protect ( );
			// stop it
			m_inAddList = false;
			// discontinue adding any more of the list
			return false;
		}
		// skip to next record, returns false on end of list
	} while ( list->skipCurrentRecord() );

	// reprotect tree from writes
	if ( m_tree.m_useProtection ) m_tree.protect ( );

	// stop it
	m_inAddList = false;

	// if tree is >= 90% full dump it
	if ( m_dump.isDumping() ) {
		logTrace( g_conf.m_logTraceRdb, "END. %s: is already dumping. "
			  "Returning true", m_dbname );
		return true;
	}

	// return true if not ready for dump yet
	if ( ! needsDump () ) {
		//logTrace( g_conf.m_logTraceRdb, "END. %s: doesn't need "
		//	  "dump. Returning true", m_dbname );
		return true;
	}

	// if dump started ok, return true
	if ( niceness != 0 ) {
		if ( dumpTree( 1/*niceness*/ ) ) {
			logTrace( g_conf.m_logTraceRdb, "END. %s: dumped "
				  "tree. Returning true", m_dbname );
			return true;
		}
	}

	// technically, since we added the record, it is not an error
	g_errno = 0;

	// . otherwise, bitch and return false with g_errno set
	// . usually this is because it is waiting for an unlink/rename
	//   operation to complete... so make it LOG_INFO
	log(LOG_INFO,"db: Failed to dump data to disk for %s.",m_dbname);
	return true;
}
Calling dumpTree", m_dbname ); dumpTree( 1/*niceness*/ ); } // tell caller to try again later (1 second or so) g_errno = ETRYAGAIN; } // reprotect tree from writes if ( m_tree.m_useProtection ) m_tree.protect ( ); // stop it m_inAddList = false; // discontinue adding any more of the list return false; } } while ( list->skipCurrentRecord() ); // skip to next record, returns false on end of list // reprotect tree from writes if ( m_tree.m_useProtection ) m_tree.protect ( ); // stop it m_inAddList = false; // if tree is >= 90% full dump it if ( m_dump.isDumping() ) { logTrace( g_conf.m_logTraceRdb, "END. %s: is already dumping. Returning true", m_dbname ); return true; } // return true if not ready for dump yet if ( ! needsDump () ) { //logTrace( g_conf.m_logTraceRdb, "END. %s: doesn't need dump. Returning true", m_dbname ); return true; } // if dump started ok, return true if ( niceness != 0 ) { if ( dumpTree( 1/*niceness*/ ) ) { logTrace( g_conf.m_logTraceRdb, "END. %s: dumped tree. Returning true", m_dbname ); return true; } } // technically, since we added the record, it is not an error g_errno = 0; // . otherwise, bitch and return false with g_errno set // . usually this is because it is waiting for an unlink/rename // operation to complete... so make it LOG_INFO log(LOG_INFO,"db: Failed to dump data to disk for %s.",m_dbname); return true; } bool Rdb::needsDump ( ) const { if ( m_mem.is90PercentFull () ) { return true; } if ( m_useTree ) { if ( m_tree.is90PercentFull() ) { return true; } } else { if ( m_buckets.needsDump() ) { return true; } } // if adding to doledb and it has been > 1 day then force a dump // so that all the negative keys in the tree annihilate with the // keys on disk to make it easier to read a doledb list if ( m_rdbId != RDB_DOLEDB ) { return false; } // or dump doledb if a ton of negative recs... // otherwise, no need to dump doledb just yet return ( m_tree.getNumNegativeKeys() > 50000 ); } bool Rdb::hasRoom ( RdbList *list , int32_t niceness ) { // how many nodes will tree need? int32_t numNodes = list->getNumRecs( ); if ( !m_useTree && !m_buckets.hasRoom(numNodes)) return false; // how many nodes will tree need? // how much space will RdbMem, m_mem, need? //int32_t overhead = sizeof(key_t); int32_t overhead = m_ks; if ( list->getFixedDataSize() == -1 ) overhead += 4; // how much mem will the data use? int64_t dataSpace = list->getListSize() - (numNodes * overhead); // does tree have room for these nodes? if ( m_useTree && m_tree.getNumAvailNodes() < numNodes ) return false; // if we are doledb, we are a tree-only rdb, so try to reclaim // memory from deleted nodes. works by condesing the used memory. if ( m_rdbId == RDB_DOLEDB && // if there is no room left in m_mem (RdbMem class)... ( m_mem.m_ptr2 - m_mem.m_ptr1 < dataSpace||g_conf.m_forceIt) && //m_mem.m_ptr1 - m_mem.m_mem > 1024 ) { // and last time we tried this, if any, it reclaimed 1MB+ (m_lastReclaim>1024*1024||m_lastReclaim==-1||g_conf.m_forceIt)){ // reclaim the memory now. returns -1 and sets g_errno on error int32_t reclaimed = reclaimMemFromDeletedTreeNodes(niceness); // reset force flag g_conf.m_forceIt = false; // ignore errors for now g_errno = 0; // how much did we free up? if ( reclaimed >= 0 ) m_lastReclaim = reclaimed; } // does m_mem have room for "dataSpace"? if ( (int64_t)m_mem.getAvailMem() < dataSpace ) return false; // otherwise, we do have room return true; } // . NOTE: low bit should be set , only antiKeys (deletes) have low bit clear // . 
// . NOTE: low bit should be set, only antiKeys (deletes) have low bit clear
// . returns false and sets g_errno on error, true otherwise
// . if RdbMem, m_mem, has no mem, sets g_errno to ETRYAGAIN and returns
//   false because dump should complete soon and free up some mem
// . this overwrites dups
bool Rdb::addRecord ( collnum_t collnum,
		      char *key , char *data , int32_t dataSize) {
	if ( ! getBase(collnum) ) {
		g_errno = EBADENGINEER;
		log(LOG_LOGIC,"db: addRecord: collection #%i is gone.",
		    collnum);
		return false;
	}

	// skip if tree not writable
	if ( ! g_process.m_powerIsOn ) {
		// log it every 3 seconds
		static int32_t s_last = 0;
		int32_t now = getTime();
		if ( now - s_last > 3 ) {
			s_last = now;
			log("db: addRecord: power is off. try again.");
		}
		g_errno = ETRYAGAIN;
		return false;
	}

	// we can also use this logic to avoid adding to the waiting tree
	// because Process.cpp locks all the trees up at once and unlocks
	// them all at once as well. so since SpiderRequests are added to
	// spiderdb and then alter the waiting tree, this statement should
	// protect us.
	if (!isWritable()) {
		g_errno = ETRYAGAIN;
		return false;
	}

	// bail if we're closing
	if ( m_isClosing ) {
		g_errno = ECLOSING;
		return false;
	}

	// sanity check
	if ( KEYNEG(key) ) {
		if ( (dataSize > 0 && data) ) {
			log( LOG_WARN, "db: Got data for a negative key." );
			g_process.shutdownAbort(true);
		}
	}
	// sanity check
	else if ( m_fixedDataSize >= 0 && dataSize != m_fixedDataSize ) {
		g_errno = EBADENGINEER;
		log(LOG_LOGIC,"db: addRecord: DataSize is %" PRId32" should "
		    "be %" PRId32, dataSize,m_fixedDataSize );
		g_process.shutdownAbort(true);
	}

	// do not add if range being dumped at all because when the
	// dump completes it calls deleteList() and removes the nodes from
	// the tree, so if you were overriding a node currently being dumped
	// we would lose it.
	if ( m_dump.isDumping() &&
	     //oppKey >= m_dump.getFirstKeyInQueue() &&
	     // ensure the dump is dumping the collnum of this key
	     m_dump.getCollNum() == collnum &&
	     m_dump.getLastKeyInQueue() &&
	     // the dump should not split positive/negative keys, so our
	     // positive/negative twin should either be in the dump with us
	     // or not in the dump with us, so any positive/negative
	     // annihilation below should be ok and we should be safe
	     // to call deleteNode() below
	     KEYCMP(key,m_dump.getFirstKeyInQueue(),m_ks)>=0 &&
	     //oppKey <= m_dump.getLastKeyInQueue () ) goto addIt;
	     KEYCMP(key,m_dump.getLastKeyInQueue (),m_ks)<=0 ) {
		// tell caller to wait and try again later
		g_errno = ETRYAGAIN;
		return false;
	}

	// save orig
	char *orig = NULL;

	// copy the data before adding if we don't already own it
	if ( data ) {
		// save orig
		orig = data;
		// sanity check
		if ( m_fixedDataSize == 0 && dataSize > 0 ) {
			g_errno = EBADENGINEER;
			log(LOG_LOGIC,"db: addRecord: Data is present. "
			    "Should not be");
			return false;
		}
		data = (char *) m_mem.dupData ( key, data, dataSize, collnum);
		if ( ! data ) {
			g_errno = ETRYAGAIN;
			log(LOG_WARN, "db: Could not allocate %" PRId32" "
			    "bytes to add data to %s. Retrying.",
			    dataSize,m_dbname);
			return false;
		}
	}

	// . TODO: save this tree-walking state for adding the node!!!
	// . TODO: use somethin like getNode(key,&lastNode)
	//   then addNode (lastNode,key,data,dataSize)
	//   int32_t lastNode;
	// . #1) if we're adding a positive key, replace negative counterpart
	//       in the tree, because we'll override the positive rec it was
	//       deleting
	// . #2) if we're adding a negative key, replace positive counterpart
	//       in the tree, but we must keep negative rec in tree in case
	//       the positive counterpart was overriding one on disk (as in #1)
	//key_t oppKey = key ;
	char oppKey[MAX_KEY_BYTES];
	int32_t n = -1;
	if ( m_useTree ) {
		// make the opposite key of "key"
		KEYSET(oppKey,key,m_ks);
		KEYXOR(oppKey,0x01);
		// look it up
		n = m_tree.getNode ( collnum , oppKey );
	}

	if ( m_rdbId == RDB_DOLEDB && g_conf.m_logDebugSpider ) {
		// must be 96 bits
		if ( m_ks != 12 ) { g_process.shutdownAbort(true); }
		// set this
		key_t doleKey = *(key_t *)key;
		// remove from g_spiderLoop.m_lockTable too!
		if ( KEYNEG(key) ) {
			// log debug
			logf(LOG_DEBUG,"spider: removed doledb key "
			     "for pri=%" PRId32" time=%" PRIu32
			     " uh48=%" PRIu64,
			     (int32_t)g_doledb.getPriority(&doleKey),
			     (uint32_t)g_doledb.getSpiderTime(&doleKey),
			     g_doledb.getUrlHash48(&doleKey));
		} else {
			// do not overflow!
			// log debug
			SpiderRequest *sreq = (SpiderRequest *)data;
			logf(LOG_DEBUG,"spider: added doledb key "
			     "for pri=%" PRId32" time=%" PRIu32" "
			     "uh48=%" PRIu64" " //"docid=%" PRId64" "
			     "u=%s",
			     (int32_t)g_doledb.getPriority(&doleKey),
			     (uint32_t)g_doledb.getSpiderTime(&doleKey),
			     g_doledb.getUrlHash48(&doleKey),
			     //sreq->m_probDocId,
			     sreq->m_url);
		}
	}

	// if it exists then annihilate it
	if ( n >= 0 ) {
		// CAUTION: we should not annihilate with oppKey if oppKey may
		// be in the process of being dumped to disk! This would
		// render our annihilation useless and make undeletable data
		/*
		if ( m_dump.isDumping() &&
		     //oppKey >= m_dump.getFirstKeyInQueue() &&
		     m_dump.m_lastKeyInQueue &&
		     KEYCMP(oppKey,m_dump.getFirstKeyInQueue(),m_ks)>=0 &&
		     //oppKey <= m_dump.getLastKeyInQueue () ) goto addIt;
		     KEYCMP(oppKey,m_dump.getLastKeyInQueue (),m_ks)<=0 )
			goto addIt;
		*/
		// . otherwise, we can REPLACE oppKey
		// . we NO LONGER annihilate with him. why?
		// . freeData should be true, the tree doesn't own the data
		//   so it shouldn't free it really
		m_tree.deleteNode3 ( n , true ); // false =freeData?);
		// mark as changed
		//if ( ! m_needsSave ) {
		//	m_needsSave = true;
		//}
	}

	// if we have no files on disk for this db, don't bother
	// preserving a negative rec, it just wastes tree space
	if ( KEYNEG(key) && m_useTree ) {
		// return if all data is in the tree
		if ( getBase(collnum)->getNumFiles() == 0 ) {
			return true;
		}
		// . otherwise, assume we match a positive...
	}

	//
	// Add data record to the current index file for the -saved.dat file.
	// This index is stored in the Rdb; the individual part-file indexes
	// are in RdbBase and are read-only except when merging.
	//
	RdbIndex *index = getBase(collnum)->getTreeIndex();
	if (index) {
		index->addRecord(key);
	}

	// . TODO: add using "lastNode" as a start node for the insertion point
	// . should set g_errno if failed
	// . caller should retry on g_errno of ETRYAGAIN or ENOMEM
	if ( !m_useTree ) {
		// debug indexdb
		if ( m_buckets.addNode ( collnum , key , data , dataSize )>=0){
			return true;
		}
	}

	// . cancel any spider request that is a dup in the dupcache to save
	//   disk space
	// . twins might have different dupcaches so they might have different
	//   dups, but it shouldn't be a big deal because they are dups!
	if ( m_rdbId == RDB_SPIDERDB && ! KEYNEG(key) ) {
		// . this will create it if spiders are on and its NULL
		// . even if spiders are off we need to create it so
		//   that the request can add its ip to the waitingTree
		SpiderColl *sc = g_spiderCache.getSpiderColl(collnum);
		// skip if not there
		if ( ! sc ) {
			return true;
		}

		SpiderRequest *sreq =
			(SpiderRequest *)( orig - 4 - sizeof(key128_t) );

		// is it really a request and not a SpiderReply?
		bool isReq = g_spiderdb.isSpiderRequest ( &( sreq->m_key ) );

		// skip if in dup cache. do NOT add to cache since
		// addToWaitingTree() in Spider.cpp will do that when called
		// from addSpiderRequest() below
		if ( isReq && sc->isInDupCache ( sreq , false ) ) {
			logDebug( g_conf.m_logDebugSpider,
				  "spider: adding spider req %s is dup. "
				  "skipping.", sreq->m_url );
			return true;
		}

		// if we are overflowing...
		if ( isReq &&
		     ! sreq->m_isAddUrl &&
		     ! sreq->m_isPageReindex &&
		     ! sreq->m_urlIsDocId &&
		     ! sreq->m_forceDelete &&
		     sc->isFirstIpInOverflowList ( sreq->m_firstIp ) ) {
			logDebug( g_conf.m_logDebugSpider,
				  "spider: skipping for overflow url %s ",
				  sreq->m_url );
			g_stats.m_totalOverflows++;
			return true;
		}
	}

	int32_t tn;
	if ( m_useTree && (tn=m_tree.addNode(collnum,key,data,dataSize))>=0) {
		// if adding to spiderdb, add to cache, too
		if ( m_rdbId != RDB_SPIDERDB && m_rdbId != RDB_DOLEDB )
			return true;
		// or if negative key
		if ( KEYNEG(key) ) return true;

		// . this will create it if spiders are on and its NULL
		// . even if spiders are off we need to create it so
		//   that the request can add its ip to the waitingTree
		SpiderColl *sc = g_spiderCache.getSpiderColl(collnum);
		// skip if not there
		if ( ! sc ) return true;

		// if doing doledb...
		if ( m_rdbId == RDB_DOLEDB ) {
			int32_t pri = g_doledb.getPriority((key_t *)key);
			// skip over corruption
			if ( pri < 0 || pri >= MAX_SPIDER_PRIORITIES )
				return true;
			// if added positive key is before cursor, update
			// cursor
			if ( KEYCMP((char *)key,
				    (char *)&sc->m_nextKeys[pri],
				    sizeof(key_t)) < 0 ) {
				KEYSET((char *)&sc->m_nextKeys[pri],
				       (char *)key, sizeof(key_t) );
				// debug log
				if ( g_conf.m_logDebugSpider )
					log("spider: cursor reset "
					    "pri=%" PRId32" to %s",
					    pri,KEYSTR(key,12));
			}
			// that's it for doledb mods
			return true;
		}

		// . ok, now add that reply to the cache
		// . g_now is in milliseconds!
		//int32_t nowGlobal = localToGlobalTimeSeconds ( g_now/1000 );
		//int32_t nowGlobal = getTimeGlobal();

		// assume this is the rec (4 byte dataSize,spiderdb key is
		// now 16 bytes)
		SpiderRequest *sreq =
			(SpiderRequest *)(orig-4-sizeof(key128_t));

		// is it really a request and not a SpiderReply?
		char isReq = g_spiderdb.isSpiderRequest ( &sreq->m_key );

		// add the request
		if ( isReq ) {
			// log that. why isn't this undoling always
			if ( g_conf.m_logDebugSpider )
				logf(LOG_DEBUG,"spider: rdb: added spider "
				     "request to spiderdb rdb tree "
				     "addnode=%" PRId32" "
				     "request for uh48=%" PRIu64
				     " prntdocid=%" PRIu64" "
				     "firstIp=%s spiderdbkey=%s",
				     tn,
				     sreq->getUrlHash48(),
				     sreq->getParentDocId(),
				     iptoa(sreq->m_firstIp),
				     KEYSTR((char *)&sreq->m_key,
					    sizeof(key128_t)));
			// false means to NOT call evaluateAllRequests()
			// because we call it below. the reason we do this
			// is because it does not always get called
			// in addSpiderRequest(), like if its a dup and
			// gets "nuked". (removed callEval arg since not
			// really needed)
			sc->addSpiderRequest ( sreq,
					       gettimeofdayInMilliseconds() );
		}
		// otherwise reply
		else {
			// shortcut - cast it to reply
			SpiderReply *rr = (SpiderReply *)sreq;
			// log that. why isn't this undoling always
			if ( g_conf.m_logDebugSpider )
				logf(LOG_DEBUG,"rdb: rdb: got spider reply"
				     " for uh48=%" PRIu64,rr->getUrlHash48());
			// add the reply
			sc->addSpiderReply(rr);
			// don't actually add it if "fake". i.e. if it
			// was an internal error of some sort... this will
			// make it try over and over again i guess...
			// no because we need some kinda reply so that gb knows
			// the pagereindex docid-based spider requests are done,
			// at least for now, because the replies were not being
			// added for now. just for internal errors at least...
			// we were not adding spider replies to the page
			// reindexes as they completed and when i tried to
			// rerun it the title recs were not found since they
			// were deleted, so we gotta add the replies now.
			int32_t indexCode = rr->m_errCode;
			if ( //indexCode == EINTERNALERROR ||
			     indexCode == EABANDONED ) {
				log("rdb: not adding spiderreply to rdb "
				    "because it was an internal error for "
				    "uh48=%" PRIu64" errCode = %s",
				    rr->getUrlHash48(),
				    mstrerror(indexCode));
				m_tree.deleteNode3(tn,false);
			}
		}
		// clear errors from adding to SpiderCache
		g_errno = 0;
		// all done
		return true;
	}

	// enhance the error message
	const char *ss ="";
	if ( m_tree.isSaving() ) ss = " Tree is saving.";
	if ( !m_useTree && m_buckets.isSaving() ) ss = " Buckets are saving.";
	log(LOG_INFO, "db: Had error adding data to %s: %s. %s",
	    m_dbname, mstrerror(g_errno), ss);
	return false;
}
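// ------------------------------------------------------------------------
// Note (added commentary, not original code): record deletion in this
// scheme is done with "negative" keys. The low bit of a key is the
// positive/delete flag (set = positive, clear = negative/delete), so a
// record's opposite twin is computed above with KEYXOR(oppKey,0x01), i.e.
//
//     oppKey = key XOR 0x01   // flip only the low (delete) bit
//
// and adding one twin annihilates the other in the tree. A negative key is
// kept only while files on disk might still hold the positive record.
// ------------------------------------------------------------------------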
// . return number of positive records minus negative records
int64_t Rdb::getNumTotalRecs ( bool useCache ) {
	// are we catdb or statsdb? then we have no associated collections
	// because we are used globally, by all collections
	if ( m_isCollectionLess )
		return m_collectionlessBase->getNumTotalRecs();
	// this gets slammed w/ too many collections so use a cache...
	int32_t now = 0;
	if ( useCache ) {
		now = getTimeLocal();
		// return the cached total if computed within the last second
		if ( now - m_cacheLastTime == 0 )
			return m_cacheLastTotal;
	}
	// same as num recs
	int32_t nb = getNumBases();
	int64_t total = 0LL;
	for ( int32_t i = 0 ; i < nb ; i++ ) {
		CollectionRec *cr = g_collectiondb.m_recs[i];
		if ( ! cr ) continue;
		// if swapped out, this will be NULL, so skip it
		RdbBase *base = cr->getBasePtr(m_rdbId);
		if ( ! base ) continue;
		total += base->getNumTotalRecs();
	}
	// . TODO: add in the btree; count negative and positive recs
	//   in the b-tree
	//total += m_tree.getNumPositiveKeys();
	//total -= m_tree.getNumNegativeKeys();
	if ( now ) {
		m_cacheLastTime  = now;
		m_cacheLastTotal = total;
	}
	return total;
}

int64_t Rdb::getCollNumTotalRecs ( collnum_t collnum ) {
	if ( collnum < 0 ) return 0;
	CollectionRec *cr = g_collectiondb.m_recs[collnum];
	if ( ! cr ) return 0;
	// if swapped out, this will be NULL, so skip it
	RdbBase *base = cr->getBasePtr(m_rdbId);
	if ( ! base ) {
		log("rdb: getcollnumtotalrecs: base swapped out");
		return 0;
	}
	return base->getNumTotalRecs();
}
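// The caching in getNumTotalRecs() above amounts to a one-second memoization
// keyed on the local clock. A minimal generic sketch of the same pattern
// (hypothetical names, not part of the original code); getTimeLocal() is the
// real clock call used above:
static int64_t cachedCountExample ( int32_t *lastTime , int64_t *lastTotal ,
				    int64_t (*compute)() ) {
	int32_t now = getTimeLocal();
	// reuse the cached total if it was computed within the same second
	if ( now - *lastTime == 0 ) return *lastTotal;
	*lastTime  = now;
	*lastTotal = compute();
	return *lastTotal;
}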
// . how much mem is alloced for all of our maps?
// . we have one map per file
int64_t Rdb::getMapMemAlloced () {
	int64_t total = 0;
	for ( int32_t i = 0 ; i < getNumBases() ; i++ ) {
		// skip null collrec
		CollectionRec *cr = g_collectiondb.m_recs[i];
		if ( ! cr ) continue;
		// if swapped out, this will be NULL, so skip it
		RdbBase *base = cr->getBasePtr(m_rdbId);
		if ( ! base ) continue;
		total += base->getMapMemAlloced();
	}
	return total;
}

// sum of all parts of all big files
int32_t Rdb::getNumSmallFiles ( ) {
	int32_t total = 0;
	for ( int32_t i = 0 ; i < getNumBases() ; i++ ) {
		// skip null collrec
		CollectionRec *cr = g_collectiondb.m_recs[i];
		if ( ! cr ) continue;
		// if swapped out, this will be NULL, so skip it
		RdbBase *base = cr->getBasePtr(m_rdbId);
		if ( ! base ) continue;
		total += base->getNumSmallFiles();
	}
	return total;
}

// number of files (BigFiles) across all collections
int32_t Rdb::getNumFiles ( ) {
	int32_t total = 0;
	for ( int32_t i = 0 ; i < getNumBases() ; i++ ) {
		CollectionRec *cr = g_collectiondb.m_recs[i];
		if ( ! cr ) continue;
		// if swapped out, this will be NULL, so skip it
		RdbBase *base = cr->getBasePtr(m_rdbId);
		if ( ! base ) continue;
		total += base->getNumFiles();
	}
	return total;
}

int64_t Rdb::getDiskSpaceUsed ( ) {
	int64_t total = 0;
	for ( int32_t i = 0 ; i < getNumBases() ; i++ ) {
		CollectionRec *cr = g_collectiondb.m_recs[i];
		if ( ! cr ) continue;
		// if swapped out, this will be NULL, so skip it
		RdbBase *base = cr->getBasePtr(m_rdbId);
		if ( ! base ) continue;
		total += base->getDiskSpaceUsed();
	}
	return total;
}

bool Rdb::isMerging() const {
	// use this counter for speed
	return (bool)m_numMergesOut;
}

// maps an rdbId to an Rdb
static Rdb *s_table9 [ RDB_END ];

Rdb *getRdbFromId ( rdbid_t rdbId ) {
	static bool s_init = false;
	if ( ! s_init ) {
		s_init = true;
		memset ( s_table9, 0, sizeof(s_table9) );
		s_table9 [ RDB_TAGDB       ] = g_tagdb.getRdb();
		s_table9 [ RDB_POSDB       ] = g_posdb.getRdb();
		s_table9 [ RDB_TITLEDB     ] = g_titledb.getRdb();
		s_table9 [ RDB_SPIDERDB    ] = g_spiderdb.getRdb();
		s_table9 [ RDB_DOLEDB      ] = g_doledb.getRdb();
		s_table9 [ RDB_CLUSTERDB   ] = g_clusterdb.getRdb();
		s_table9 [ RDB_LINKDB      ] = g_linkdb.getRdb();
		s_table9 [ RDB_STATSDB     ] = g_statsdb.getRdb();
		s_table9 [ RDB2_POSDB2     ] = g_posdb2.getRdb();
		s_table9 [ RDB2_TITLEDB2   ] = g_titledb2.getRdb();
		s_table9 [ RDB2_SPIDERDB2  ] = g_spiderdb2.getRdb();
		s_table9 [ RDB2_CLUSTERDB2 ] = g_clusterdb2.getRdb();
		s_table9 [ RDB2_LINKDB2    ] = g_linkdb2.getRdb();
		s_table9 [ RDB2_TAGDB2     ] = g_tagdb2.getRdb();
	}
	if ( rdbId >= RDB_END ) return NULL;
	return s_table9 [ rdbId ];
}

// the opposite of the above
rdbid_t getIdFromRdb ( Rdb *rdb ) {
	if ( rdb == g_tagdb.getRdb      () ) return RDB_TAGDB;
	if ( rdb == g_posdb.getRdb      () ) return RDB_POSDB;
	if ( rdb == g_titledb.getRdb    () ) return RDB_TITLEDB;
	if ( rdb == g_spiderdb.getRdb   () ) return RDB_SPIDERDB;
	if ( rdb == g_doledb.getRdb     () ) return RDB_DOLEDB;
	if ( rdb == g_clusterdb.getRdb  () ) return RDB_CLUSTERDB;
	if ( rdb == g_statsdb.getRdb    () ) return RDB_STATSDB;
	if ( rdb == g_linkdb.getRdb     () ) return RDB_LINKDB;
	if ( rdb == g_posdb2.getRdb     () ) return RDB2_POSDB2;
	if ( rdb == g_tagdb2.getRdb     () ) return RDB2_TAGDB2;
	if ( rdb == g_titledb2.getRdb   () ) return RDB2_TITLEDB2;
	if ( rdb == g_spiderdb2.getRdb  () ) return RDB2_SPIDERDB2;
	if ( rdb == g_clusterdb2.getRdb () ) return RDB2_CLUSTERDB2;
	if ( rdb == g_linkdb2.getRdb    () ) return RDB2_LINKDB2;
	log(LOG_LOGIC,"db: getIdFromRdb: no rdbId for %s.",rdb->getDbname());
	return RDB_NONE;
}
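// Hypothetical sanity-check sketch (not in the original source) showing how
// the two mappings above are inverses of each other for valid rdb ids:
static bool rdbIdRoundTripExample ( rdbid_t rdbId ) {
	Rdb *rdb = getRdbFromId ( rdbId );
	// unknown ids map to NULL
	if ( ! rdb ) return false;
	// mapping the Rdb back should yield the id we started with
	return getIdFromRdb ( rdb ) == rdbId;
}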
bool isSecondaryRdb ( rdbid_t rdbId ) {
	switch ( rdbId ) {
		case RDB2_POSDB2     : return true;
		case RDB2_TAGDB2     : return true;
		case RDB2_TITLEDB2   : return true;
		case RDB2_SPIDERDB2  : return true;
		case RDB2_CLUSTERDB2 : return true;
		case RDB2_LINKDB2    : return true;
		default              : return false;
	}
}

// use a quick lookup table
char getKeySizeFromRdbId(rdbid_t rdbId) {
	static bool s_flag = true;
	static char s_table1[RDB_END];
	if ( s_flag ) {
		// only stock the table once
		s_flag = false;
		// . loop over all possible rdbIds
		// . RDB_NONE is 0!
		for (int32_t i = 1; i < RDB_END; ++i) {
			// assume 12 bytes
			int32_t ks = 12;
			if (i == RDB_SPIDERDB || i == RDB2_SPIDERDB2 ||
			    i == RDB_TAGDB    || i == RDB2_TAGDB2) {
				ks = 16;
			} else if (i == RDB_POSDB || i == RDB2_POSDB2) {
				ks = sizeof(key144_t);
			} else if (i == RDB_LINKDB || i == RDB2_LINKDB2) {
				ks = sizeof(key224_t);
			}
			// set the table
			s_table1[i] = ks;
		}
	}
	// sanity check (also guard against an out-of-range index)
	if (rdbId >= RDB_END || s_table1[rdbId] == 0) {
		log(LOG_ERROR, "rdb: bad lookup rdbid of %i", (int)rdbId);
		g_process.shutdownAbort(true);
	}
	return s_table1[rdbId];
}

// returns -1 if dataSize is variable
int32_t getDataSizeFromRdbId ( uint8_t rdbId ) {
	static bool s_flag = true;
	static int32_t s_table2[80];
	if ( s_flag ) {
		// only stock the table once
		s_flag = false;
		// sanity check
		if ( RDB_END >= 80 ) { g_process.shutdownAbort(true); }
		// loop over all possible rdbIds
		for ( int32_t i = 1 ; i < RDB_END ; i++ ) {
			// assume no data, just keys
			int32_t ds = 0;
			// these are pure-key rdbs with no data
			if ( i == RDB_POSDB || i == RDB_CLUSTERDB ||
			     i == RDB_LINKDB )
				ds = 0;
			// these have variable-size data
			else if ( i == RDB_TITLEDB || i == RDB_TAGDB ||
				  i == RDB_SPIDERDB || i == RDB_DOLEDB )
				ds = -1;
			else if ( i == RDB_STATSDB )
				ds = sizeof(StatData);
			else if ( i == RDB2_POSDB2 || i == RDB2_CLUSTERDB2 ||
				  i == RDB2_LINKDB2 )
				ds = 0;
			else if ( i == RDB2_TITLEDB2 || i == RDB2_TAGDB2 ||
				  i == RDB2_SPIDERDB2 )
				ds = -1;
			else
				continue;
			// set the table
			s_table2[i] = ds;
		}
	}
	// guard against an out-of-range index
	if ( rdbId >= RDB_END ) return 0;
	return s_table2[rdbId];
}

// get the dbname
const char *getDbnameFromId ( uint8_t rdbId ) {
	Rdb *rdb = getRdbFromId ( (rdbid_t)rdbId );
	if ( rdb ) return rdb->getDbname();
	log(LOG_LOGIC,"db: rdbId of %" PRId32" is invalid.",(int32_t)rdbId);
	return "INVALID";
}
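// A hedged sketch (hypothetical helper, not in the original source) that
// combines the two lookups above to compute a record's on-disk size. The
// "+ 4" for variable-size rdbs reflects the 4-byte dataSize field stored
// after the key, as seen in the "orig - 4 - sizeof(key128_t)" arithmetic
// earlier in this file.
static int32_t recSizeExample ( rdbid_t rdbId , int32_t dataSize ) {
	int32_t ks = getKeySizeFromRdbId ( rdbId );
	int32_t ds = getDataSizeFromRdbId ( rdbId );
	if ( ds == 0  ) return ks;                 // key-only rdb
	if ( ds == -1 ) return ks + 4 + dataSize;  // variable-size data
	return ks + ds;                            // fixed-size data
}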
// get the RdbBase class for an rdbId and collection name
RdbBase *getRdbBase(rdbid_t rdbId, const char *coll) {
	Rdb *rdb = getRdbFromId ( rdbId );
	if ( ! rdb ) {
		log("db: no rdb for rdbId %" PRId32" (coll \"%s\").",
		    (int32_t)rdbId,coll);
		return NULL;
	}
	// collectionless rdbs (catdb, statsdb, etc.) are a special case
	collnum_t collnum;
	if ( rdb->isCollectionless() ) collnum = (collnum_t) 0;
	else collnum = g_collectiondb.getCollnum ( coll );
	if ( collnum == -1 ) {
		g_errno = ENOCOLLREC;
		return NULL;
	}
	return rdb->getBase(collnum);
}

// get the RdbBase class for an rdbId and collection number
RdbBase *getRdbBase(rdbid_t rdbId, collnum_t collnum) {
	Rdb *rdb = getRdbFromId ( rdbId );
	if ( ! rdb ) {
		log("db: no rdb for rdbId %" PRId32" (collnum %" PRId32").",
		    (int32_t)rdbId,(int32_t)collnum);
		return NULL;
	}
	if ( rdb->isCollectionless() ) collnum = (collnum_t) 0;
	return rdb->getBase(collnum);
}

// calls addList above
bool Rdb::addList ( const char *coll , RdbList *list, int32_t niceness ) {
	// catdb has no collection per se
	if ( m_isCollectionLess )
		return addList ( (collnum_t)0, list, niceness );
	collnum_t collnum = g_collectiondb.getCollnum ( coll );
	if ( collnum < (collnum_t) 0 ) {
		g_errno = ENOCOLLREC;
		log(LOG_WARN, "db: Could not add list because collection "
		    "\"%s\" does not exist.",coll);
		return false;
	}
	return addList ( collnum , list, niceness );
}

int32_t Rdb::getNumUsedNodes ( ) const {
	if ( m_useTree ) return m_tree.getNumUsedNodes();
	return m_buckets.getNumKeys();
}

int32_t Rdb::getMaxTreeMem() const {
	if ( m_useTree ) return m_tree.getMaxMem();
	return m_buckets.getMaxMem();
}

int32_t Rdb::getNumNegativeKeys() const {
	if ( m_useTree ) return m_tree.getNumNegativeKeys();
	return m_buckets.getNumNegativeKeys();
}

int32_t Rdb::getTreeMemOccupied() const {
	if ( m_useTree ) return m_tree.getMemOccupied();
	return m_buckets.getMemOccupied();
}

int32_t Rdb::getTreeMemAlloced () const {
	if ( m_useTree ) return m_tree.getMemAlloced();
	return m_buckets.getMemAlloced();
}

void Rdb::disableWrites () {
	if ( m_useTree ) m_tree.disableWrites();
	else m_buckets.disableWrites();
}

void Rdb::enableWrites () {
	if ( m_useTree ) m_tree.enableWrites();
	else m_buckets.enableWrites();
}

bool Rdb::isWritable ( ) const {
	if ( m_useTree ) return m_tree.isWritable();
	return m_buckets.isWritable();
}

bool Rdb::needsSave() const {
	if ( m_useTree ) return m_tree.needsSave();
	return m_buckets.needsSave();
}

void Rdb::cleanTree() {
	if ( m_useTree ) m_tree.cleanTree();
	else m_buckets.cleanBuckets();
}
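// The accessors above all follow the same facade pattern: Rdb hides whether
// records live in an RdbTree or in RdbBuckets behind m_useTree, so callers
// never care which backend is active. A minimal sketch of the idiom (shown
// as a comment since a hypothetical method would need a declaration in
// Rdb.h to compile):
//
//	int32_t Rdb::getSomething() const {
//		if ( m_useTree ) return m_tree.getSomething();
//		return m_buckets.getSomething();
//	}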
// if we are doledb, we are a tree-only rdb, so try to reclaim
// memory from deleted nodes. works by condensing the used memory.
// returns how much we reclaimed.
int32_t Rdb::reclaimMemFromDeletedTreeNodes( int32_t niceness ) {
	log("rdb: reclaiming tree mem for doledb");
	// this only works for non-dumped RdbMem right now, i.e. doledb only
	if ( m_rdbId != RDB_DOLEDB ) { g_process.shutdownAbort(true); }
	// start scanning the mem pool
	char *p      = m_mem.m_mem;
	char *pend   = m_mem.m_ptr1;
	char *memEnd = m_mem.m_mem + m_mem.m_memSize;
	char *dst    = p;
	int32_t inUseOld = pend - p;
	char *pstart = p;
	int32_t marked   = 0;
	int32_t occupied = 0;
	// maps an in-use data offset to its tree node number (and, after the
	// compaction scan below, to its new offset)
	HashTableX ht;
	if ( ! ht.set ( 4, 4, m_tree.getNumUsedNodes()*2,
			NULL , 0 , false , niceness ,
			"trectbl", true ) ) // useMagic? yes
		return -1;
	int32_t dups = 0;
	// record the data offset of every occupied node
	int32_t nn = m_tree.getMinUnusedNode();
	for ( int32_t i = 0 ; i < nn ; i++ ) {
		// skip empty nodes in tree
		if ( m_tree.isEmpty(i) ) { marked++; continue; }
		// get data ptr
		char *data = m_tree.getData(i);
		// sanity, ensure legit
		if ( data < pstart ) { g_process.shutdownAbort(true); }
		// offset
		int32_t doff = (int32_t)(data - pstart);
		// a dup? sanity check
		if ( ht.isInTable ( &doff ) ) {
			int32_t *vp = (int32_t *)ht.getValue ( &doff );
			log("rdb: reclaim got dup oldi=0x%" PTRFMT" "
			    "newi=%" PRId32" dataoff=%" PRId32"."
			    ,(PTRTYPE)vp,i,doff);
			dups++;
			continue;
		}
		// indicate it is legit
		int32_t val = i;
		ht.addKey ( &doff , &val );
		occupied++;
	}
	if ( occupied + dups != m_tree.getNumUsedNodes() )
		log("rdb: reclaim mismatch1");
	if ( ht.getNumSlotsUsed() + dups != m_tree.getNumUsedNodes() )
		log("rdb: reclaim mismatch2");
	int32_t skipped = 0;
	// the spider requests should be laid out linearly in there, so we can
	// scan them. then put each old offset into a map that maps it to the
	// new offset after doing the memmove().
	for ( ; p < pend ; ) {
		SpiderRequest *sreq = (SpiderRequest *)p;
		int32_t oldOffset = p - pstart;
		int32_t recSize = sreq->getRecSize();
		// negative key? this shouldn't happen
		if ( (sreq->m_key.n0 & 0x01) == 0x00 ) {
			log("rdb: reclaim got negative doledb key in scan");
			p += sizeof(key_t);
			skipped++;
			continue;
		}
		// if not in the hash table it was deleted from the tree
		if ( ! ht.isInTable ( &oldOffset ) ) {
			p += recSize;
			skipped++;
			continue;
		}
		// corrupted? or breach of mem buf?
		if ( sreq->isCorrupt() || dst + recSize > memEnd ) {
			log( LOG_WARN, "rdb: encountered corrupted doledb rec "
			     "in reclaim scan. aborting.");
			g_process.shutdownAbort(true);
		}
		// otherwise, copy it over since it is still in the tree
		gbmemcpy ( dst , p , recSize );
		int32_t newOffset = dst - pstart;
		// store in map, overwriting the old value (the node number)
		ht.addKey ( &oldOffset , &newOffset );
		dst += recSize;
		p   += recSize;
	}
	int32_t inUseNew = dst - pstart;
	// update the mem class as well
	m_mem.m_ptr1 = dst;
	// how much did we reclaim?
	int32_t reclaimed = inUseOld - inUseNew;
	if ( reclaimed < 0 )               { g_process.shutdownAbort(true); }
	if ( inUseNew  < 0 )               { g_process.shutdownAbort(true); }
	if ( inUseNew  > m_mem.m_memSize ) { g_process.shutdownAbort(true); }
	// now update the data ptrs in the tree, m_data[]
	for ( int32_t i = 0 ; i < nn ; i++ ) {
		// skip empty nodes in tree
		if ( m_tree.isEmpty(i) ) continue;
		// update the data otherwise
		char *data = m_tree.getData(i);
		// sanity, ensure legit
		if ( data < pstart ) { g_process.shutdownAbort(true); }
		int32_t offset = data - pstart;
		int32_t *newOffsetPtr = (int32_t *)ht.getValue ( &offset );
		if ( ! newOffsetPtr ) { g_process.shutdownAbort(true); }
		char *newData = pstart + *newOffsetPtr;
		m_tree.setData(i, newData);
	}
	log("rdb: reclaimed %" PRId32" bytes after scanning %" PRId32" "
	    "undeleted nodes and %" PRId32" deleted nodes for doledb"
	    ,reclaimed,nn,marked);
	// return # of bytes of mem we reclaimed
	return reclaimed;
}
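// For reference, a condensed sketch of the compaction technique that
// reclaimMemFromDeletedTreeNodes() implements above. This is a hypothetical,
// simplified version over a flat array of fixed-size records; the real code
// walks variable-size SpiderRequests and remaps old offsets to new ones
// through a HashTableX before fixing up the tree's data pointers. Assumes
// memmove() is available via gb-include.h.
static int32_t compactExample ( char *buf , int32_t *used ,
				int32_t recSize ,
				bool (*isLive)(const char *rec) ) {
	char *dst = buf;
	for ( char *p = buf ; p < buf + *used ; p += recSize ) {
		if ( ! isLive ( p ) ) continue;        // drop deleted recs
		if ( dst != p ) memmove ( dst , p , recSize );
		dst += recSize;                        // slide live recs down
	}
	int32_t reclaimed = *used - (int32_t)(dst - buf);
	*used = (int32_t)(dst - buf);                  // shrink in-use size
	return reclaimed;                              // bytes reclaimed
}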