fix collection swap logic a bunch. seems to work now.

This commit is contained in:
mwells
2014-09-29 13:05:20 -07:00
parent 257a7e3c10
commit bca24fb0e6
6 changed files with 99 additions and 22 deletions

@ -588,18 +588,45 @@ bool Collectiondb::addNewColl ( char *coll ,
return true;
}
void CollectionRec::setBasePtr ( char rdbId , class RdbBase *base ) {
// if in the process of swapping in, this will be false...
//if ( m_swappedOut ) { char *xx=NULL;*xx=0; }
if ( rdbId < 0 || rdbId >= RDB_END ) { char *xx=NULL;*xx=0; }
if ( m_bases[ (unsigned char)rdbId ]){ char *xx=NULL;*xx=0; }
m_bases [ (unsigned char)rdbId ] = base;
}
RdbBase *CollectionRec::getBasePtr ( char rdbId ) {
if ( rdbId < 0 || rdbId >= RDB_END ) { char *xx=NULL;*xx=0; }
return m_bases [ (unsigned char)rdbId ];
}
static bool s_inside = false;
// returns NULL w/ g_errno set on error.
RdbBase *CollectionRec::getBase ( char rdbId ) {
if ( s_inside ) { char *xx=NULL;*xx=0; }
if ( ! m_swappedOut ) return m_bases[(unsigned char)rdbId];
log("cdb: swapin collnum=%li",(long)m_collnum);
s_inside = true;
// load them back in. return NULL w/ g_errno set on error.
if ( ! g_collectiondb.addRdbBasesForCollRec ( this ) ) return NULL;
if ( ! g_collectiondb.addRdbBasesForCollRec ( this ) ) {
log("coll: error swapin: %s",mstrerror(g_errno));
return NULL;
}
s_inside = false;
g_collectiondb.m_numCollsSwappedOut--;
m_swappedOut = false;
log("coll: swapin was successful for collnum=%li",(long)m_collnum);
return m_bases[(unsigned char)rdbId];
}
@ -607,6 +634,8 @@ bool CollectionRec::swapOut ( ) {
if ( m_swappedOut ) return true;
log("cdb: swapout collnum=%li",(long)m_collnum);
// free all RdbBases in each rdb
for ( long i = 0 ; i < g_process.m_numRdbs ; i++ ) {
Rdb *rdb = g_process.m_rdbs[i];
@ -639,10 +668,13 @@ bool Collectiondb::registerCollRec ( CollectionRec *cr , bool isNew ) {
return true;
}
// swap it in
bool Collectiondb::addRdbBaseToAllRdbsForEachCollRec ( ) {
for ( long i = 0 ; i < m_numRecs ; i++ ) {
CollectionRec *cr = m_recs[i];
if ( ! cr ) continue;
// skip if swapped out
if ( cr->m_swappedOut ) continue;
// add rdb base files etc. for it
addRdbBasesForCollRec ( cr );
}

@ -1277,12 +1277,8 @@ class CollectionRec {
class RdbBase *getBase ( char rdbId );
// Rdb.cpp uses this after deleting an RdbBase and adding new one
void setBasePtr ( char rdbId , class RdbBase *base ) {
m_bases [ (unsigned char)rdbId ] = base;
};
class RdbBase *getBasePtr ( char rdbId ) {
return m_bases [ (unsigned char)rdbId ];
};
void setBasePtr ( char rdbId , class RdbBase *base ) ;
class RdbBase *getBasePtr ( char rdbId ) ;
private:
// . now chuck this into CollectionRec instead of having a fixed

@ -584,7 +584,7 @@ Loop::Loop ( ) {
m_inQuickPoll = false;
m_needsToQuickPoll = false;
m_canQuickPoll = false;
m_isDoingLoop = false;
// set all callbacks to NULL so we know they're empty
for ( long i = 0 ; i < MAX_NUM_FDS+2 ; i++ ) {
@ -1220,6 +1220,8 @@ bool Loop::runLoop ( ) {
// . makes g_udpServer2 quite jumpy
g_loop.interruptsOn();
m_isDoingLoop = true;
//mdw:enableTimer();
// . now loop forever waiting for signals

1
Loop.h

@ -175,6 +175,7 @@ class Loop {
itimerval m_quickInterrupt;
itimerval m_realInterrupt;
itimerval m_noInterrupt;
bool m_isDoingLoop;
// call this when you don't want to be interrupted
void interruptsOff ( ) ;
// and this to resume being interrupted

68
Rdb.cpp

@ -515,7 +515,7 @@ bool Rdb::addRdbBase2 ( collnum_t collnum ) { // addColl2()
// . ensure no previous one exists
// . well it will be there but will be uninitialized, m_rdb will b NULL
RdbBase *base = NULL;
if ( cr ) base = cr->getBasePtr ( collnum );
if ( cr ) base = cr->getBasePtr ( m_rdbId );
if ( base ) { // m_bases [ collnum ] ) {
g_errno = EBADENGINEER;
return log("db: Rdb for db \"%s\" and "
@ -1050,8 +1050,12 @@ bool Rdb::saveMaps ( bool useThread ) {
// if ( m_bases[i] ) m_bases[i]->saveMaps ( useThread );
// now loop over bases
for ( long i = 0 ; i < getNumBases() ; i++ ) {
CollectionRec *cr = g_collectiondb.m_recs[i];
if ( ! cr ) continue;
// if swapped out, this will be NULL, so skip it
RdbBase *base = cr->getBasePtr(m_rdbId);
// shut it down
RdbBase *base = getBase(i);
//RdbBase *base = getBase(i);
//if ( m_bases[i] ) m_bases[i]->closeMaps ( m_urgent );
//if ( base ) base->closeMaps ( m_urgent );
if ( base ) base->saveMaps ( useThread );
@ -1253,7 +1257,11 @@ bool Rdb::dumpTree ( long niceness ) {
// . keep the number of files down
// . dont dump all the way up to the max, leave one open for merging
for ( long i = 0 ; i < getNumBases() ; i++ ) {
RdbBase *base = getBase(i);
CollectionRec *cr = g_collectiondb.m_recs[i];
if ( ! cr ) continue;
// if swapped out, this will be NULL, so skip it
RdbBase *base = cr->getBasePtr(m_rdbId);
//RdbBase *base = getBase(i);
if ( base && base->m_numFiles >= max ) {
base->attemptMerge (1,false);//niceness,forced?
g_errno = ETOOMANYFILES;
@ -1395,10 +1403,14 @@ bool Rdb::gotTokenForDump ( ) {
bool Rdb::dumpCollLoop ( ) {
loop:
CollectionRec *cr = g_collectiondb.m_recs[m_dumpCollnum];
if ( ! cr ) return true;
// the only was g_errno can be set here is from a previous dump
// error?
if ( g_errno ) {
RdbBase *base = getBase(m_dumpCollnum);
// if swapped out, this will be NULL, so skip it
RdbBase *base = cr->getBasePtr(m_rdbId);
//RdbBase *base = getBase(m_dumpCollnum);
log("build: Error dumping collection: %s.",mstrerror(g_errno));
// . if we wrote nothing, remove the file
// . if coll was deleted under us, base will be NULL!
@ -1418,12 +1430,13 @@ bool Rdb::dumpCollLoop ( ) {
// advance
m_dumpCollnum++;
// advance m_dumpCollnum until we have a non-null RdbBase
while ( m_dumpCollnum < getNumBases() && ! getBase(m_dumpCollnum) )
while ( m_dumpCollnum < getNumBases() &&
! cr->getBasePtr (m_rdbId) )
m_dumpCollnum++;
// if no more, we're done...
if ( m_dumpCollnum >= getNumBases() ) return true;
RdbBase *base = getBase(m_dumpCollnum);
RdbBase *base = cr->getBasePtr(m_rdbId);//m_dumpCollnum);
// before we create the file, see if tree has anything for this coll
//key_t k; k.setMin();
@ -1670,8 +1683,14 @@ void attemptMergeAll ( int fd , void *state ) {
// called by main.cpp
void Rdb::attemptMerge ( long niceness , bool forced , bool doLog ) {
for ( long i = 0 ; i < getNumBases() ; i++ ) {
RdbBase *base = getBase(i);
CollectionRec *cr = g_collectiondb.m_recs[i];
if ( ! cr ) continue;
// if swapped out, this will be NULL, so skip it
RdbBase *base = cr->getBasePtr(m_rdbId);
//RdbBase *base = getBase(i);
if ( ! base ) continue;
base->attemptMerge(niceness,forced,doLog);
// stop if we got unlink/rename threads out from a merge
@ -2640,7 +2659,10 @@ long long Rdb::getNumTotalRecs ( bool useCache ) {
//return 0; // too many collections!!
for ( long i = 0 ; i < nb ; i++ ) {
RdbBase *base = getBase(i);
CollectionRec *cr = g_collectiondb.m_recs[i];
if ( ! cr ) continue;
// if swapped out, this will be NULL, so skip it
RdbBase *base = cr->getBasePtr(m_rdbId);
if ( ! base ) continue;
total += base->getNumTotalRecs();
}
@ -2661,7 +2683,11 @@ long long Rdb::getNumTotalRecs ( bool useCache ) {
long long Rdb::getMapMemAlloced () {
long long total = 0;
for ( long i = 0 ; i < getNumBases() ; i++ ) {
RdbBase *base = getBase(i);
// skip null base if swapped out
CollectionRec *cr = g_collectiondb.m_recs[m_dumpCollnum];
if ( ! cr ) return true;
RdbBase *base = cr->getBasePtr(m_rdbId);
//RdbBase *base = getBase(i);
if ( ! base ) continue;
total += base->getMapMemAlloced();
}
@ -2672,7 +2698,11 @@ long long Rdb::getMapMemAlloced () {
long Rdb::getNumSmallFiles ( ) {
long total = 0;
for ( long i = 0 ; i < getNumBases() ; i++ ) {
RdbBase *base = getBase(i);
// skip null base if swapped out
CollectionRec *cr = g_collectiondb.m_recs[m_dumpCollnum];
if ( ! cr ) return true;
RdbBase *base = cr->getBasePtr(m_rdbId);
//RdbBase *base = getBase(i);
if ( ! base ) continue;
total += base->getNumSmallFiles();
}
@ -2683,7 +2713,11 @@ long Rdb::getNumSmallFiles ( ) {
long Rdb::getNumFiles ( ) {
long total = 0;
for ( long i = 0 ; i < getNumBases() ; i++ ) {
RdbBase *base = getBase(i);
CollectionRec *cr = g_collectiondb.m_recs[i];
if ( ! cr ) continue;
// if swapped out, this will be NULL, so skip it
RdbBase *base = cr->getBasePtr(m_rdbId);
//RdbBase *base = getBase(i);
if ( ! base ) continue;
total += base->getNumFiles();
}
@ -2693,7 +2727,11 @@ long Rdb::getNumFiles ( ) {
long long Rdb::getDiskSpaceUsed ( ) {
long long total = 0;
for ( long i = 0 ; i < getNumBases() ; i++ ) {
RdbBase *base = getBase(i);
CollectionRec *cr = g_collectiondb.m_recs[i];
if ( ! cr ) continue;
// if swapped out, this will be NULL, so skip it
RdbBase *base = cr->getBasePtr(m_rdbId);
//RdbBase *base = getBase(i);
if ( ! base ) continue;
total += base->getDiskSpaceUsed();
}
@ -2705,7 +2743,11 @@ bool Rdb::isMerging ( ) {
return (bool)m_numMergesOut;
for ( long i = 0 ; i < getNumBases() ; i++ ) {
RdbBase *base = getBase(i);
CollectionRec *cr = g_collectiondb.m_recs[i];
if ( ! cr ) continue;
// if swapped out, this will be NULL, so skip it
RdbBase *base = cr->getBasePtr(m_rdbId);
//RdbBase *base = getBase(i);
if ( ! base ) continue;
if ( base->isMerging() ) return true;
}

@ -2425,6 +2425,10 @@ bool RdbBase::verifyFileSharding ( ) {
if ( m_rdb->m_isCollectionLess ) return true;
// if swapping in from CollectionRec::getBase() then do
// not re-verify file sharding! only do at startup
if ( g_loop.m_isDoingLoop ) return true;
g_threads.disableThreads();
Msg5 msg5;