Preparation for running RdbDump in a thread.

Modify all Rdb::dumpTree calls to Rdb::submitDumpJob.
Add GbThreadQueue for dump thread. Call initialization & finalize functions.
This commit is contained in:
Ai Lin Chia
2017-04-28 10:45:12 +02:00
parent 9f3c3ca616
commit 9eb5adf398
11 changed files with 106 additions and 54 deletions

@ -254,8 +254,8 @@ void DailyMerge::dailyMergeLoop ( ) {
// tell it to save, otherwise this might not get saved
m_cr->setNeedsSave();
// initiate dumps
g_spiderdb.getRdb ()->dumpTree();
g_linkdb.getRdb ()->dumpTree();
g_spiderdb.getRdb ()->submitRdbDumpJob(true);
g_linkdb.getRdb ()->submitRdbDumpJob(true);
// if neither has recs in tree, go to next mode
if(g_spiderdb.getRdb()->getNumUsedNodes()>0) return;
if(g_linkdb .getRdb()->getNumUsedNodes()>0) return;

@ -305,7 +305,7 @@ static bool Msg4In::addMetaList(const char *p, UdpSlot *slot) {
if (rdb->isDumping()) {
anyDumping = true;
} else if (!rdb->hasRoom(rdbItem.second.m_numRecs, rdbItem.second.m_dataSizes)) {
rdb->dumpTree();
rdb->submitRdbDumpJob(true);
hasRoom = false;
}
}
@ -392,11 +392,7 @@ static bool Msg4In::addMetaList(const char *p, UdpSlot *slot) {
// Initiate dumps for any Rdbs wanting it
for (auto const &rdbItem : rdbItems) {
Rdb *rdb = getRdbFromId(rdbItem.first);
if (rdb->needsDump()) {
logDebug(g_conf.m_logDebugSpider, "Rdb %s needs dumping", getDbnameFromId(rdbItem.first));
rdb->dumpTree();
// we ignore the return value because we have processed the list/msg4
}
rdb->submitRdbDumpJob(false);
}
// success

@ -683,12 +683,12 @@ static bool CommandForceIt(const char *rec) {
}
static bool CommandDiskDump(const char *rec) {
g_clusterdb.getRdb()->dumpTree();
g_tagdb.getRdb()->dumpTree();
g_spiderdb.getRdb()->dumpTree();
g_posdb.getRdb()->dumpTree();
g_titledb.getRdb()->dumpTree();
g_linkdb.getRdb()->dumpTree();
g_clusterdb.getRdb()->submitRdbDumpJob(true);
g_tagdb.getRdb()->submitRdbDumpJob(true);
g_spiderdb.getRdb()->submitRdbDumpJob(true);
g_posdb.getRdb()->submitRdbDumpJob(true);
g_titledb.getRdb()->submitRdbDumpJob(true);
g_linkdb.getRdb()->submitRdbDumpJob(true);
g_errno = 0;
return true;
}

@ -653,6 +653,8 @@ bool Process::shutdown2() {
RdbBase::finalizeGlobalIndexThread();
Msg4In::finalizeIncomingThread();
Rdb::finalizeRdbDumpThread();
g_jobScheduler.cancel_all_jobs_for_shutdown();
static bool s_printed = false;

106
Rdb.cpp

@ -713,39 +713,77 @@ bool Rdb::loadTree ( ) {
return true;
}
/// @todo ALC consider if we need one per rdb
static GbThreadQueue s_rdbDumpThreadQueue;
static time_t s_lastTryTime = 0;
void Rdb::submitRdbDumpJob(bool forceDump) {
logTrace(g_conf.m_logTraceRdb, "BEGIN %s", m_dbname);
if (getNumUsedNodes() <= 0) {
logTrace(g_conf.m_logTraceRdb, "END. %s: No used nodes/keys. Returning", m_dbname);
return;
}
// never dump doledb any more. it's rdbtree only.
if (m_rdbId == RDB_DOLEDB) {
logTrace(g_conf.m_logTraceRdb, "END. %s: Rdb is doledb. Returning", m_dbname);
return;
}
// if it has been less than 3 seconds since our last failed attempt
// do not try again to avoid flooding our log
if (getTime() - s_lastTryTime < 3) {
logTrace(g_conf.m_logTraceRdb, "END. %s: Less than 3 seconds since last attempt. Returning", m_dbname);
return;
}
// don't dump if not 90% full
if (!forceDump && !needsDump()) {
logTrace(g_conf.m_logTraceRdb, "END. %s: Tree not 90 percent full and not force dump. Returning", m_dbname);
return;
}
// bail if already dumping
{
ScopedLock sl(m_isDumpingMtx);
if (m_isDumping) {
logTrace(g_conf.m_logTraceRdb, "END. %s: Already dumping. Returning", m_dbname);
return;
}
m_isDumping = true;
}
/// @todo ALC enable threading when we have made dependency thread-safe
//s_rdbDumpThreadQueue.addItem(this);
dumpRdb(this);
log(LOG_INFO, "db: Submitted job %p to dump tree for %s", this, getDbname());
}
void Rdb::dumpRdb(void *item) {
Rdb *rdb = static_cast<Rdb*>(item);
log(LOG_INFO, "db: Processing job %p to dump tree", item);
rdb->dumpTree();
log(LOG_INFO, "db: Processed job %p to dump tree", item);
}
bool Rdb::initializeRdbDumpThread() {
return s_rdbDumpThreadQueue.initialize(dumpRdb, "dump-rdb");
}
void Rdb::finalizeRdbDumpThread() {
s_rdbDumpThreadQueue.finalize();
}
// . start dumping the tree
// . returns false and sets g_errno on error
bool Rdb::dumpTree() {
logTrace( g_conf.m_logTraceRdb, "BEGIN %s", m_dbname );
if (getNumUsedNodes() <= 0) {
logTrace( g_conf.m_logTraceRdb, "END. %s: No used nodes/keys. Returning true", m_dbname );
return true;
}
// never dump doledb any more. it's rdbtree only.
if ( m_rdbId == RDB_DOLEDB ) {
logTrace( g_conf.m_logTraceRdb, "END. %s: Rdb is doledb. Returning true", m_dbname );
return true;
}
// bail if already dumping
if ( m_isDumping ) {
logTrace( g_conf.m_logTraceRdb, "END. %s: Already dumping. Returning true", m_dbname );
return true;
}
// if it has been less than 3 seconds since our last failed attempt
// do not try again to avoid flooding our log
if ( getTime() - s_lastTryTime < 3 ) {
logTrace( g_conf.m_logTraceRdb, "END. %s: Less than 3 seconds since last attempt. Returning true", m_dbname );
return true;
}
// don't dump if not 90% full
if ( ! needsDump() ) {
if (!needsDump()) {
log(LOG_INFO, "db: %s tree not 90 percent full but dumping.",m_dbname);
}
@ -793,6 +831,7 @@ bool Rdb::dumpTree() {
// clear this for dumpCollLoop()
g_errno = 0;
m_dumpErrno = 0;
for (int collnum = 0; collnum < getNumBases(); collnum++) {
RdbBase *base = getBase(collnum);
if (base) {
@ -801,11 +840,6 @@ bool Rdb::dumpTree() {
}
}
// we have our own flag here since m_dump::m_isDumping gets
// set to true between collection dumps, RdbMem.cpp needs
// a flag that doesn't do that... see RdbDump.cpp.
m_isDumping = true;
// this returns false if blocked, which means we're ok, so we ret true
if ( ! dumpCollLoop ( ) ) {
logTrace( g_conf.m_logTraceRdb, "END. %s: dumpCollLoop blocked. Returning true", m_dbname );
@ -814,6 +848,7 @@ bool Rdb::dumpTree() {
// if it returns true with g_errno set, there was an error
if ( g_errno ) {
ScopedLock sl(m_isDumpingMtx);
logTrace( g_conf.m_logTraceRdb, "END. %s: dumpCollLoop g_error=%s. Returning false", m_dbname, mstrerror( g_errno) );
m_isDumping = false;
return false;
@ -1039,7 +1074,10 @@ void Rdb::doneDumping ( ) {
// . we have to set this here otherwise RdbMem's memory ring buffer
// will think the dumping is no longer going on and use the primary
// memory for allocating new titleRecs and such and that is not good!
m_isDumping = false;
{
ScopedLock sl(m_isDumpingMtx);
m_isDumping = false;
}
// try merge for all, first one that needs it will do it, preventing
// the rest from doing it
@ -1208,7 +1246,7 @@ bool Rdb::addList(collnum_t collnum, RdbList *list, bool checkForRoom) {
}
logTrace( g_conf.m_logTraceRdb, "%s: Not enough room. Calling dumpTree", m_dbname );
dumpTree();
submitRdbDumpJob(true);
// set g_errno after intiating the dump!
g_errno = ETRYAGAIN;
@ -1250,7 +1288,7 @@ bool Rdb::addList(collnum_t collnum, RdbList *list, bool checkForRoom) {
if ( g_errno == ENOMEM ) {
// start dumping the tree to disk so we have room 4 add
logTrace( g_conf.m_logTraceRdb, "%s: Not enough memory. Calling dumpTree", m_dbname );
dumpTree();
submitRdbDumpJob(true);
// tell caller to try again later (1 second or so)
g_errno = ETRYAGAIN;
}

13
Rdb.h

@ -208,9 +208,10 @@ public:
// tree is not balanced
bool loadTree ( ) ;
// . write out tree to a file with keys in order
// . only shift.cpp/reindex.cpp programs set niceness to 0
bool dumpTree();
static bool initializeRdbDumpThread();
static void finalizeRdbDumpThread();
void submitRdbDumpJob(bool forceDump);
bool needsDump() const;
@ -225,6 +226,7 @@ public:
bool updateToRebuildFiles ( Rdb *rdb2 , char *coll ) ;
static void doneDumpingCollWrapper(void *state);
GbMutex m_isDumpingMtx;
private:
bool addRdbBase2 ( collnum_t collnum );
@ -233,8 +235,13 @@ private:
// returns false if no room in tree or m_mem for a list to add
bool hasRoom(RdbList *list);
static void dumpRdb(void *item);
bool getTreeCollExist(collnum_t collnum) const;
// . write out tree to a file with keys in order
bool dumpTree();
bool addList(collnum_t collnum, RdbList *list, bool checkForRoom);
// get the directory name where this rdb stores its files
const char *getDir() const { return g_hostdb.m_dir; }

@ -933,7 +933,7 @@ bool Repair::dumpLoop ( ) {
Rdb **rdbs = getSecondaryRdbs ( &nsr );
for ( int32_t i = 0 ; i < nsr ; i++ ) {
Rdb *rdb = rdbs[i];
rdb->dumpTree();
rdb->submitRdbDumpJob(true);
}
g_errno = 0;
// . register sleep wrapper to check when dumping is done

@ -1645,6 +1645,11 @@ int main2 ( int argc , char *argv[] ) {
if ( g_conf.m_readOnlyMode )
log("db: -- Read Only Mode Set. Can Not Add New Data. --");
if (!Rdb::initializeRdbDumpThread()) {
logError("Unable to initialize rdb dump thread");
return 1;
}
// . collectiondb, does not use rdb, loads directly from disk
// . do this up here so RdbTree::fixTree_unlocked() can fix RdbTree::m_collnums
// . this is a fake init, cuz we pass in "true"

@ -47,6 +47,8 @@ static void deleteRdbFiles() {
void GbTest::initializeRdbs() {
ASSERT_TRUE(g_loop.init());
ASSERT_TRUE(Rdb::initializeRdbDumpThread());
ASSERT_TRUE(g_collectiondb.loadAllCollRecs());
ASSERT_TRUE(g_posdb.init());
@ -75,6 +77,8 @@ void GbTest::resetRdbs() {
g_collectiondb.reset();
Rdb::finalizeRdbDumpThread();
g_loop.reset();
new(&g_loop) Loop(); // some variables are not Loop::reset. Call the constructor to re-initialize them
}

@ -16,7 +16,7 @@ static void saveAndReloadPosdbBucket() {
}
static void dumpPosdb() {
g_posdb.getRdb()->dumpTree();
g_posdb.getRdb()->submitRdbDumpJob(true);
g_posdb.getRdb()->getBase(0)->markNewFileReadable();
g_posdb.getRdb()->getBase(0)->generateGlobalIndex();
}

@ -408,7 +408,7 @@ bool RdbListNoMergeTest::m_savedMergeConf = g_conf.m_noInMemoryPosdbMerge;
static void addListToTree(rdbid_t rdbId, collnum_t collNum, RdbList *list) {
Rdb *rdb = getRdbFromId(rdbId);
rdb->addList(collNum, list);
rdb->dumpTree();
rdb->submitRdbDumpJob(true);
rdb->getBase(0)->markNewFileReadable();
rdb->getBase(0)->generateGlobalIndex();
}