privacore-open-source-searc.../Rebalance.cpp
2018-07-20 16:05:57 +02:00

524 lines
15 KiB
C++

// Matt Wells, 2014
// TODO: if a host is being removed put # removed after it like we do # retired
// and we can at least load it up and it will move its records to the
// new guys.
#include "Rebalance.h"
#include "Rdb.h"
#include "Spider.h"
#include "Collectiondb.h"
#include "Pages.h"
#include "Spider.h"
#include "Process.h"
#include "Parms.h"
#include "max_niceness.h"
#include "Conf.h"
// The single Rebalance instance for this process. The static callback
// wrappers further down in this file (gotListWrapper, sleepWrapper,
// doneAddingMetaWrapper) all operate on this object.
Rebalance g_rebalance;
// Puts the rebalancer into its idle state: nothing decided yet, nothing
// approved by the user, no scan in flight, and the scan cursor parked at
// the smallest key of the first rdb of the first collection.
Rebalance::Rebalance ( ) {
	// decision state used by getNeedsRebalance()
	m_needsRebalanceValid = false;
	m_needsRebalance      = 0;
	m_numForeignRecs      = 0;

	// user interaction / scan state used by rebalanceLoop()
	m_warnedUser   = false;
	m_userApproved = false;
	m_isScanning   = false;

	// rebalance.txt must not be written until getNeedsRebalance() allows it
	m_allowSave = false;

	// sleep callback bookkeeping and outstanding msg4 count
	m_registered = false;
	m_blocked    = 0;

	// per-rdb statistics and progress logging
	m_rebalanceCount = 0;
	m_scannedCount   = 0;
	m_lastPercent    = -1;
	m_lastCollnum    = -1;
	m_lastRdb        = NULL;

	// scan cursor: collection, rdb and the [m_nextKey,m_endKey] key range
	m_collnum = 0;
	m_rdbNum  = 0;
	KEYMAX ( m_endKey  , MAX_KEY_BYTES );
	KEYMIN ( m_nextKey , MAX_KEY_BYTES );
}
// . returns NULL if we don't know yet if we need to rebalance
// . otherwise returns ptr to the bool we want
const char *Rebalance::getNeedsRebalance ( ) {
if ( m_needsRebalanceValid )
return &m_needsRebalance;
// wait for collections and parms to be in sync. new hosts won't
// have the collection recs and subdirs...
if ( ! g_parms.inSyncWithHost0() ) return NULL;
// wait for all hosts to agree
if ( ! g_hostdb.hostsConfInAgreement() ) return NULL;
// for simplicty, only gb shards on stripe 0 should run this i guess
if ( g_hostdb.m_myHost->m_stripe != 0 ) {
m_needsRebalanceValid = true;
m_needsRebalance = (char)false;
return &m_needsRebalance;
}
// allow it to save to file now that we have almost had a chance to
// load in case it cores at startup and overwrites the file!!
m_allowSave = true;
// the last time we loaded hosts.conf we saved the checksum.
// if it changed we should think about auto-scaling. spidering
// can only occur once g_hostdb.m_hostsConfInAgreement is true.
SafeBuf sb;
sb.load ( g_hostdb.m_dir , "rebalance.txt");
// if we did not note any foreign recs, and file not there, save it
// so next time we startup we can tell if hosts.conf changed
if ( sb.length() <= 1 && m_numForeignRecs == 0 ) {
// save it!
saveRebalanceFile();
// assume we do not need a rebalance
m_needsRebalanceValid = true;
m_needsRebalance = (char)false;
return &m_needsRebalance;
}
// if does not exist, do the scan if we noted some foreign recs
if ( sb.length() <= 10 ) {
// we need it!
m_needsRebalanceValid = true;
m_needsRebalance = (char)true;
return &m_needsRebalance;
}
// otherwise, get data from the file if it is there
// we are shard #x of a total of y. if that changed or
// crc-saved.dat does not exist we have to scan. we can
// periodically save our scan progress in case we get shutdown
// mid-stream i guess.
int32_t x = 0;
int32_t y = 0;
int32_t z = 0;
int32_t rebalancing = 0;
int32_t cn;
// parse the file
char keyStr[128];
sscanf(sb.getBufStart(),
"myshard: %" PRId32"\n"
"numshards: %" PRId32"\n"
"numhostspershard: %" PRId32"\n"
"rebalancing: %" PRId32"\n"
"collnum: %" PRId32"\n"
"rdbnum: %" PRId32"\n"
"nextkey: %s\n",
&x,
&y,
&z,
// were we rebalancing last time?
&rebalancing,
// how far did we get?
&cn,
&m_rdbNum,
keyStr
);
// convert m_nextKey into an ascii string and store into keyStr
hexToBin(keyStr,strlen(keyStr), (char *)m_nextKey);
m_collnum = cn;
//m_collnum = 4695; //debug skip
//m_collnum = 18101; // just global index for now
// we are valid now either way
m_needsRebalanceValid = true;
// assume ok
m_needsRebalance = (char)false;
// if hosts.conf is different and we are part of a different
// shard then we must auto scale
if ( x != (int32_t)g_hostdb.m_myHost->m_shardNum) m_needsRebalance = (char)true;
if ( y != g_hostdb.m_numShards ) m_needsRebalance = (char)true;
if ( z != g_hostdb.getNumHostsPerShard()) m_needsRebalance = (char)true;
if ( rebalancing ) m_needsRebalance = (char)true;
// how can this be?
if ( m_numForeignRecs ) m_needsRebalance = (char)true;
// and we don't need user consent, they already did last time
if ( rebalancing ) {
// this was causing a core from starting too early!
//m_warnedUser = true;
//m_userApproved = true;
}
return &m_needsRebalance;
}
// . periodic entry point; per the original note it is invoked every 500ms
//   from Process.cpp
// . does nothing until getNeedsRebalance() has an answer, and nothing at
//   all if that answer is "no"
// . otherwise logs a one-time notice, waits for m_userApproved, and then
//   kicks off scanLoop() exactly once (guarded by m_isScanning)
void Rebalance::rebalanceLoop ( ) {
	// cheap once decided: the getter caches its answer
	const char *needs = getNeedsRebalance();
	// NULL = not known yet, 0 = known and no rebalance required
	if ( needs == NULL || *needs == 0 ) return;

	// nag the operator once
	if ( ! m_warnedUser ) {
		m_warnedUser = true;
		log("db: CRITICAL. please click on the rebalance "
		    "link in master controls");
	}

	// . a rebalance is needed, but only start once the user approved it
	// . and never start a second scan while one is already running,
	//   since we get called over and over
	if ( ! m_userApproved || m_isScanning ) return;

	// officially scanning from here on
	m_isScanning = true;
	scanLoop();
}
void Rebalance::scanLoop ( ) {
// scan all rdbs in each coll
for ( ; m_collnum < g_collectiondb.getNumRecs(); m_collnum++ ) {
// get collrec i guess
CollectionRec *cr = g_collectiondb.getRec(m_collnum);
// skip if none... like statsdb, i guess don't rebalance!!
if ( ! cr ) continue;
// only global index for now
//if ( m_collnum != 18101 ) continue;
// new?
//if ( m_lastCollnum != m_collnum ) {
// log("rebal: rebalancing %s", cr->m_coll);
// m_lastCollnum = m_collnum;
//}
// scan all rdbs in that collection
for ( ; m_rdbNum < g_process.m_numRdbs ; m_rdbNum++ ) {
// skip if not good
Rdb *rdb = g_process.m_rdbs[m_rdbNum];
// not an RDB2
if ( rdb->isSecondaryRdb() ) continue;
// or if uninitialized
if ( ! rdb->isInitialized() ) continue;
// log it as well
if ( m_lastRdb != rdb ) {
log("rebal: scanning %s (%" PRId32") [%s]",
cr->m_coll,(int32_t)cr->m_collnum,
rdb->getDbname());
// only do this once per rdb/coll
m_lastRdb = rdb;
// reset key cursor as well!!!
KEYMIN ( m_nextKey , MAX_KEY_BYTES );
// This logic now in RdbBase.cpp.
// let's keep posdb and titledb tight-merged so
// we do not run out of disk space because we
// will be dumping tons of negative recs
//RdbBase *base = rdb->getBase(m_collnum);
//base->m_savedMin = base->m_minFilesToMerge;
//base->m_minFilesToMerge = 2;
}
// percent update?
int32_t percent = (unsigned char)m_nextKey[rdb->getKeySize()-1];
percent *= 100;
percent /= 256;
if ( percent != m_lastPercent && percent ) {
log("rebal: %" PRId32"%% complete",percent);
m_lastPercent = percent;
}
// scan it. returns true if done, false if blocked
if ( ! scanRdb ( ) ) return;
// note it
log("rebal: moved %" PRId64" of %" PRId64" recs scanned in "
"%s for coll.%s.%" PRId32,
m_rebalanceCount,m_scannedCount,
rdb->getDbname(),cr->m_coll,(int32_t)cr->m_collnum);
m_rebalanceCount = 0;
m_scannedCount = 0;
m_lastPercent = -1;
// This logic now in RdbBase.cpp.
// go back to normal merge threshold
//RdbBase *base = rdb->getBase(m_collnum);
//base->m_minFilesToMerge = base->m_savedMin;
}
// reset it for next colls
m_rdbNum = 0;
}
// done:
// all done
m_isScanning = false;
m_needsRebalance = (char)false;
// get rid of the 'F' flag in PageHosts.cpp
m_numForeignRecs = 0;
// save the file then, but with these stats:
m_collnum = 0;
m_rdbNum = 0;
KEYMIN(m_nextKey,MAX_KEY_BYTES);
log("rebal: done rebalancing all collections. "
"Saving rebalance.txt.");
saveRebalanceFile();
}
bool Rebalance::saveRebalanceFile ( ) {
if ( ! m_allowSave ) return true;
char keyStr[128];
// convert m_nextKey
binToHex ( (unsigned char *)m_nextKey , MAX_KEY_BYTES , keyStr );
//log("db: saving rebalance.txt");
StackBuf<3000> sb;
sb.safePrintf (
"myshard: %" PRId32"\n"
"numshards: %" PRId32"\n"
"numhostspershard: %" PRId32"\n"
"rebalancing: %" PRId32"\n"
"collnum: %" PRId32"\n"
"rdbnum: %" PRId32"\n"
"nextkey: %s\n",
(int32_t)g_hostdb.m_myHost->m_shardNum,
(int32_t)g_hostdb.m_numShards,
(int32_t)g_hostdb.getNumHostsPerShard(),
// were we rebalancing last time?
(int32_t)m_isScanning,
// how far did we get?
(int32_t)m_collnum,
(int32_t)m_rdbNum,
keyStr
);
return sb.save ( g_hostdb.m_dir , "rebalance.txt" );
}
// Callback handed to Msg5::getList() by scanRdb(). None of the arguments
// are used; everything goes through the global g_rebalance.
static void gotListWrapper ( void *state , RdbList *list, Msg5 *msg5 ) {
	// . process the list that just arrived
	// . gotList() returns false when one of its msg4 adds blocked; in
	//   that case the msg4 callback re-enters scanLoop(), not us
	const bool listDone = g_rebalance.gotList();
	// nothing blocked, so start the next rdb scan pass right away
	if ( listDone ) g_rebalance.scanLoop();
}
// Sleep callback that scanRdb() registers while it waits for a merge to
// finish. Neither argument is used. Re-entering scanLoop() leads back into
// scanRdb(), which unregisters this callback and re-checks the merge state.
static void sleepWrapper ( int fd , void *state ) {
// try a re-call since we were merging last time
g_rebalance.scanLoop();
}
// . scans the rdb at g_process.m_rdbs[m_rdbNum] for collection m_collnum,
//   one list at a time, starting at m_nextKey
// . returns true once a read comes back empty (this rdb is finished)
// . returns false if we are shutting down, if we are waiting on a merge
//   (a 1000ms sleep callback is registered to retry), if msg5 blocked, or
//   if gotList() blocked on a msg4 add
bool Rebalance::scanRdb ( ) {
	Rdb *rdb = g_process.m_rdbs[m_rdbNum];

	// we may have been woken by our own sleep callback; drop it
	if ( m_registered ) {
		g_loop.unregisterSleepCallback ( NULL,sleepWrapper );
		m_registered = false;
	}

	if (g_process.isShuttingDown()) return false;

	// . if this rdb is merging wait until merge is done
	// . we will be dumping out a lot of negative recs and if we are
	//   short on disk space we need them merged in promptly so they
	//   annihilate with the positive keys and free up disk
	// . base is NULL for like monitordb...
	RdbBase *base = rdb->getBase ( m_collnum );
	bool mustWait = false;
	if ( base && base->isMerging() ) {
		log("rebal: waiting for merge on %s for coll #%" PRId32 " to complete",
		    rdb->getDbname(),(int32_t)m_collnum);
		mustWait = true;
	}
	// or really if any merging is going on in this rdb at all
	else if ( rdb->isMerging() ) {
		log("rebal: waiting for merge on %s for coll ??? to complete",
		    rdb->getDbname());
		mustWait = true;
	}
	if ( mustWait ) {
		// poll again later through sleepWrapper
		g_loop.registerSleepCallback(1000, NULL, sleepWrapper, "Rebalance::sleepWrapper", 1);
		m_registered = true;
		// we blocked, return false
		return false;
	}

	// keep reading lists for as long as nothing blocks
	while ( true ) {
		if (g_process.isShuttingDown()) return false;

		const bool readDone =
			m_msg5.getList ( rdb->getRdbId() ,
					 m_collnum      , // coll
					 &m_list        ,
					 m_nextKey      ,
					 m_endKey       , // should be maxed!
					 100024         , // min rec sizes
					 true           , // include tree?
					 0              , // startFileNum
					 -1             , // m_numFiles
					 this           , // state
					 gotListWrapper , // callback
					 MAX_NICENESS   , // niceness
					 true           , // do error correction?
					 -1             , // maxRetries
					 false          ); // isRealMerge
		// blocked on i/o; gotListWrapper takes over when it completes
		if ( ! readDone ) return false;

		// an empty list means this rdb has nothing left to scan
		if ( m_list.isEmpty() ) return true;

		// process that list. return false if blocked.
		if ( ! gotList() ) return false;
	}
}
// Completion callback for the two msg4 adds issued by gotList(). The state
// argument is unused. Each call retires one outstanding add; the scan is
// resumed only when the last one has finished.
static void doneAddingMetaWrapper ( void *state ) {
	// sanity: being called with no add outstanding is fatal
	if ( g_rebalance.m_blocked <= 0 ) { g_process.shutdownAbort(true); }
	// one less outstanding; if the other msg4 add is still pending, wait
	if ( --g_rebalance.m_blocked > 0 ) return;
	// all outstanding msg4 adds are done, resume
	g_rebalance.scanLoop();
}
// scan that list
bool Rebalance::gotList ( ) {
if ( m_blocked ) { g_process.shutdownAbort(true); }
Rdb *rdb = g_process.m_rdbs[m_rdbNum];
rdbid_t rdbId = rdb->getRdbId();
int32_t ks = rdb->getKeySize();
int32_t myShard = g_hostdb.m_myHost->m_shardNum;
m_list.resetListPtr();
//log("rebal: got list of %" PRId32" bytes",m_list.getListSize());
m_posMetaList.reset();
m_negMetaList.reset();
if ( m_list.isEmpty() ) {
KEYSET ( m_nextKey , m_endKey , ks );
return true;
}
//char *last = NULL;
for ( ; ! m_list.isExhausted() ; m_list.skipCurrentRecord() ) {
// get tht rec
//char *rec = m_list.getCurrentRec();
// get it
m_list.getCurrentKey ( m_nextKey );
// skip if negative... wtf?
if ( KEYNEG(m_nextKey) ) continue;
// get shard
int32_t shard = getShardNum ( rdbId , m_nextKey );
// save last ptr
//last = rec;
// debug!
//log("rebal: checking key %s",KEYSTR(m_nextKey,ks));
// count as scanned
m_scannedCount++;
// skip it if it belongs with us
if ( shard == myShard ) continue;
// note it
//log("rebal: shard is %" PRId32,shard);
// count it
m_rebalanceCount++;
// otherwise, it does not!
//int32_t recSize = m_list.getCurrentRecSize();
// copy the full key into "key" buf because might be compressed
char key[MAX_KEY_BYTES];
m_list.getCurrentKey ( key );
// store rdbid, no! we supply rdbid below to msg4
//m_posMetaList.pushChar ( rdbId );
// first key
m_posMetaList.safeMemcpy ( key , ks );
// then record
int32_t dataSize = rdb->getFixedDataSize();
if ( dataSize == -1 ) {
dataSize = m_list.getCurrentDataSize();
m_posMetaList.pushLong ( dataSize );
}
// then data
if ( dataSize ) {
char *data = m_list.getCurrentData();
m_posMetaList.safeMemcpy ( data , dataSize );
}
//
// NOW DELETE FROM OUR SHARD!
//
// store rdbid, no! we supply rdbid below to msg4
//m_negMetaList.pushChar ( rdbId );
// make key a delete
key[0] &= 0xfe;
// for debug...
//log("rebal: rm key %s",KEYSTR(key,ks));
// and store that negative key
m_negMetaList.safeMemcpy ( key , ks );
}
// update nextkey
//if ( last ) {
if ( ! m_list.isEmpty() ) {
// get the last key we scanned, all "ks" bytes of it.
// because some keys are compressed and we take the
// more significant compressed out bytes from m_list.m_*
// member vars
//m_list.getKey ( last , m_nextKey );
// if it is not maxxed out, then incremenet it for the
// next scan round
if ( KEYCMP ( m_nextKey , KEYMAX() , ks ) != 0 )
KEYINC ( m_nextKey , ks );
}
if (!m_msg4a.addMetaList(&m_posMetaList, m_collnum, this, doneAddingMetaWrapper, rdb->getRdbId(), -1)) { // shard override, not!
++m_blocked;
}
if (!m_msg4b.addMetaList(&m_negMetaList, m_collnum, this, doneAddingMetaWrapper, rdb->getRdbId(), myShard)) { // shard override, not!
++m_blocked;
}
return ( m_blocked == 0 );
}