// Matt Wells, 2014

// TODO: if a host is being removed, put "# removed" after it, like we do with
//       "# retired", so that we can at least load it up and it will move its
//       records to the new hosts.


#include "Rebalance.h"

#include "gb-include.h"

#include "Rdb.h"
#include "Spider.h"
#include "Collectiondb.h"
#include "Pages.h"
#include "Spider.h"
#include "Process.h"
#include "Parms.h"
#include "max_niceness.h"
#include "Conf.h"

// The global Rebalance instance. The static callback wrappers further down
// in this file (gotListWrapper, sleepWrapper, doneAddingMetaWrapper) all
// operate on this object directly.
Rebalance g_rebalance;

// Puts the rebalancer into its idle state: no verdict yet on whether a
// rebalance is needed, no scan in flight, cursor at the very first key of
// the first rdb of the first collection.
Rebalance::Rebalance ( ) {
	// the "do we need to rebalance?" verdict has not been computed yet
	m_needsRebalanceValid = false;
	m_needsRebalance      = (char)false;
	m_numForeignRecs      = 0;

	// operator interaction: not warned, not approved
	m_warnedUser   = false;
	m_userApproved = false;

	// saveRebalanceFile() does nothing until getNeedsRebalance() sets this
	m_allowSave = false;

	// nothing in flight
	m_isScanning = false;
	m_registered = false;
	m_blocked    = 0;

	// scan cursor: first collection, first rdb, full key range
	m_collnum = 0;
	m_rdbNum  = 0;
	KEYMIN ( m_nextKey , MAX_KEY_BYTES );
	KEYMAX ( m_endKey , MAX_KEY_BYTES );

	// progress/log bookkeeping
	m_lastCollnum    = -1;
	m_lastRdb        = NULL;
	m_lastPercent    = -1;
	m_rebalanceCount = 0LL;
	m_scannedCount   = 0LL;
}

// . returns NULL if we don't know yet if we need to rebalance
// . otherwise returns ptr to the bool we want
const char *Rebalance::getNeedsRebalance ( ) {

	if ( m_needsRebalanceValid )
		return &m_needsRebalance;

	// wait for collections and parms to be in sync. new hosts won't
	// have the collection recs and subdirs...
	if ( ! g_parms.inSyncWithHost0() ) return NULL;

	// wait for all hosts to agree
	if ( ! g_hostdb.hostsConfInAgreement() ) return NULL;

	// for simplicty,  only gb shards on stripe 0 should run this i guess
	if ( g_hostdb.m_myHost->m_stripe != 0 ) {
		m_needsRebalanceValid = true;
		m_needsRebalance = (char)false;
		return &m_needsRebalance;
	}

	// allow it to save to file now that we have almost had a chance to
	// load in case it cores at startup and overwrites the file!!
	m_allowSave = true;

	// the last time we loaded hosts.conf we saved the checksum.
	// if it changed we should think about auto-scaling. spidering
	// can only occur once g_hostdb.m_hostsConfInAgreement is true.
	SafeBuf sb;
	sb.load ( g_hostdb.m_dir , "rebalance.txt");

	// if we did not note any foreign recs, and file not there, save it
	// so next time we startup we can tell if hosts.conf changed
	if ( sb.length() <= 1 && m_numForeignRecs == 0 ) {
		// save it!
		saveRebalanceFile();
		// assume we do not need a rebalance
		m_needsRebalanceValid = true;
		m_needsRebalance = (char)false;
		return &m_needsRebalance;
	}

	// if does not exist, do the scan if we noted some foreign recs
	if ( sb.length() <= 10 ) {
		// we need it!
		m_needsRebalanceValid = true;
		m_needsRebalance = (char)true;
		return &m_needsRebalance;
	}

	// otherwise, get data from the file if it is there
	// we are shard #x of a total of y. if that changed or 
	// crc-saved.dat does not exist we have to scan. we can 
	// periodically save our scan progress in case we get shutdown
	// mid-stream i guess.
	int32_t x = 0;
	int32_t y = 0;
	int32_t z = 0;
	int32_t rebalancing = 0;
	int32_t cn;
	// parse the file
	char keyStr[128];
	sscanf(sb.getBufStart(),
	       "myshard: %" PRId32"\n"
	       "numshards: %" PRId32"\n"
	       "numhostspershard: %" PRId32"\n"
	       "rebalancing: %" PRId32"\n"
	       "collnum: %" PRId32"\n"
	       "rdbnum: %" PRId32"\n"
	       "nextkey: %s\n",
	       &x,
	       &y,
	       &z,
	       // were we rebalancing last time?
	       &rebalancing,
	       // how far did we get?
	       &cn,
	       &m_rdbNum,
	       keyStr
	       );

	// convert m_nextKey into an ascii string and store into keyStr
	hexToBin(keyStr,strlen(keyStr), (char *)m_nextKey);

	m_collnum = cn;
	//m_collnum = 4695; //debug skip

	//m_collnum = 18101; // just global index for now

	// we are valid now either way
	m_needsRebalanceValid = true;
	// assume ok
	m_needsRebalance = (char)false;
	// if hosts.conf is different and we are part of a different 
	// shard then we must auto scale
	if ( x != (int32_t)g_hostdb.m_myHost->m_shardNum) m_needsRebalance = (char)true;
	if ( y != g_hostdb.m_numShards          ) m_needsRebalance = (char)true;
	if ( z != g_hostdb.getNumHostsPerShard()) m_needsRebalance = (char)true;
	if ( rebalancing                        ) m_needsRebalance = (char)true;

	// how can this be?
	if ( m_numForeignRecs ) m_needsRebalance = (char)true;

	// and we don't need user consent, they already did last time
	if ( rebalancing ) {
		// this was causing a core from starting too early!
		//m_warnedUser   = true;
		//m_userApproved = true;
	}

	return &m_needsRebalance;
}

// . polled from Process.cpp (every 500ms per the original note)
// . does nothing until getNeedsRebalance() has an answer, and nothing if
//   that answer is "no"
// . otherwise logs a one-time request for the operator to approve the
//   rebalance, and once m_userApproved is set kicks off scanLoop() exactly
//   once (m_isScanning guards against re-entry from later polls)
void Rebalance::rebalanceLoop ( ) {

	// NULL means "not known yet", a zero flag means "not needed"
	const char *needed = getNeedsRebalance();
	if ( ! needed || ! *needed ) return;

	// ask the operator for consent, but only nag once
	if ( ! m_warnedUser ) {
		log("db: CRITICAL. please click on the rebalance "
		    "link in master controls");
		m_warnedUser = true;
	}

	// hold off until approved, and never start a second scan while one
	// is already running
	if ( ! m_userApproved || m_isScanning ) return;

	// officially scanning from here on
	m_isScanning = true;
	scanLoop();
}

void Rebalance::scanLoop ( ) {

	// scan all rdbs in each coll
	for ( ; m_collnum < g_collectiondb.getNumRecs(); m_collnum++ ) {
		// get collrec i guess
		CollectionRec *cr = g_collectiondb.getRec(m_collnum);
		// skip if none... like statsdb, i guess don't rebalance!!
		if ( ! cr ) continue;

		// only global index for now
		//if ( m_collnum != 18101 ) continue;

		// new?
		//if ( m_lastCollnum != m_collnum ) {
		//	log("rebal: rebalancing %s", cr->m_coll);
		//	m_lastCollnum = m_collnum;
		//}

		// scan all rdbs in that collection
		for ( ; m_rdbNum < g_process.m_numRdbs ; m_rdbNum++ ) {
			// skip if not good
			Rdb *rdb = g_process.m_rdbs[m_rdbNum];
			// not an RDB2
			if ( rdb->isSecondaryRdb() ) continue;
			// or if uninitialized
			if ( ! rdb->isInitialized() ) continue;
			// log it as well
			if ( m_lastRdb != rdb ) {
				log("rebal: scanning %s (%" PRId32") [%s]",
				    cr->m_coll,(int32_t)cr->m_collnum,
				    rdb->getDbname());
				// only do this once per rdb/coll
				m_lastRdb = rdb;
				// reset key cursor as well!!!
				KEYMIN ( m_nextKey , MAX_KEY_BYTES );

				// This logic now in RdbBase.cpp.
				// let's keep posdb and titledb tight-merged so
				// we do not run out of disk space because we 
				// will be dumping tons of negative recs
				//RdbBase *base = rdb->getBase(m_collnum);
				//base->m_savedMin = base->m_minFilesToMerge;
				//base->m_minFilesToMerge = 2;

			}
			// percent update?
			int32_t percent = (unsigned char)m_nextKey[rdb->getKeySize()-1];
			percent *= 100;
			percent /= 256;
			if ( percent != m_lastPercent && percent ) {
				log("rebal: %" PRId32"%% complete",percent);
				m_lastPercent = percent;
			}
			// scan it. returns true if done, false if blocked
			if ( ! scanRdb ( ) ) return;
			// note it
			log("rebal: moved %" PRId64" of %" PRId64" recs scanned in "
			    "%s for coll.%s.%" PRId32,
			    m_rebalanceCount,m_scannedCount,
			    rdb->getDbname(),cr->m_coll,(int32_t)cr->m_collnum);
			m_rebalanceCount = 0;
			m_scannedCount = 0;
			m_lastPercent = -1;

			// This logic now in RdbBase.cpp.
			// go back to normal merge threshold
			//RdbBase *base = rdb->getBase(m_collnum);
			//base->m_minFilesToMerge = base->m_savedMin;

		}
		// reset it for next colls
		m_rdbNum = 0;
	}

	// done:
	// all done
	m_isScanning     = false;
	m_needsRebalance = (char)false;

	// get rid of the 'F' flag in PageHosts.cpp
	m_numForeignRecs = 0;

	// save the file then, but with these stats:
	m_collnum = 0;
	m_rdbNum = 0;
	KEYMIN(m_nextKey,MAX_KEY_BYTES);

	log("rebal: done rebalancing all collections. "
	    "Saving rebalance.txt.");

	saveRebalanceFile();
}

bool Rebalance::saveRebalanceFile ( ) {

	if ( ! m_allowSave ) return true;

	char keyStr[128];
	// convert m_nextKey 
	binToHex ( (unsigned char *)m_nextKey , MAX_KEY_BYTES , keyStr );

	//log("db: saving rebalance.txt");
	StackBuf<3000> sb;
	sb.safePrintf (
		       "myshard: %" PRId32"\n"
		       "numshards: %" PRId32"\n"
		       "numhostspershard: %" PRId32"\n"
		       "rebalancing: %" PRId32"\n"
		       "collnum: %" PRId32"\n"
		       "rdbnum: %" PRId32"\n"
		       "nextkey: %s\n",
		       (int32_t)g_hostdb.m_myHost->m_shardNum,
		       (int32_t)g_hostdb.m_numShards,
		       (int32_t)g_hostdb.getNumHostsPerShard(),
		       // were we rebalancing last time?
		       (int32_t)m_isScanning,
		       // how far did we get?
		       (int32_t)m_collnum,
		       (int32_t)m_rdbNum,
		       keyStr
		       );

	return sb.save ( g_hostdb.m_dir , "rebalance.txt" );
}

// Callback handed to m_msg5.getList() in scanRdb(). Processes the list that
// just arrived and, unless gotList() blocked on a msg4 add (in which case
// doneAddingMetaWrapper() resumes the scan), continues the scan right away.
static void gotListWrapper ( void *state , RdbList *list, Msg5 *msg5 ) {
	// everything we need lives in g_rebalance
	(void)state;
	(void)list;
	(void)msg5;
	const bool batchDone = g_rebalance.gotList();
	if ( batchDone ) g_rebalance.scanLoop();
}

// Sleep callback registered by scanRdb() while it waits for a merge to
// finish; simply retries the scan.
static void sleepWrapper ( int fd , void *state ) {
	// neither argument is used
	(void)fd;
	(void)state;
	g_rebalance.scanLoop();
}

// . scans the rdb at m_rdbNum for collection m_collnum, starting at
//   m_nextKey, reading one list at a time through m_msg5 and handing each
//   to gotList()
// . returns true once an empty list comes back (this rdb is finished)
// . returns false if we blocked (merge in progress, msg5 i/o, msg4 add) or
//   if the process is shutting down
bool Rebalance::scanRdb ( ) {

	Rdb *rdb = g_process.m_rdbs[m_rdbNum];

	// drop the merge-wait timer if one is still registered
	if ( m_registered ) {
		g_loop.unregisterSleepCallback ( NULL,sleepWrapper );
		m_registered = false;
	}

	if (g_process.isShuttingDown()) return false;

	// . hold off while a merge is running
	// . we will be dumping out a lot of negative recs and if we are
	//   short on disk space they need to get merged in with our data
	//   quickly so they annihilate with the positive keys
	// . base is NULL for like monitordb...
	RdbBase *base = rdb->getBase ( m_collnum );
	bool mergeInProgress = false;
	if ( base && base->isMerging() ) {
		log("rebal: waiting for merge on %s for coll #%" PRId32" to complete",
		    rdb->getDbname(),(int32_t)m_collnum);
		mergeInProgress = true;
	}
	// or really if any merging is going on in this rdb at all
	else if ( rdb->isMerging() ) {
		log("rebal: waiting for merge on %s for coll ??? to complete",
		    rdb->getDbname());
		mergeInProgress = true;
	}
	if ( mergeInProgress ) {
		// retry from sleepWrapper; report that we blocked
		g_loop.registerSleepCallback(1000, NULL, sleepWrapper, "Rebalance::sleepWrapper", 1);
		m_registered = true;
		return false;
	}

	// keep pulling lists until one is empty, something blocks, or we
	// are asked to shut down
	while ( ! g_process.isShuttingDown() ) {

		const bool listReady =
			m_msg5.getList ( rdb->getRdbId()     ,
					 m_collnum, // coll             ,
					 &m_list          ,
					 m_nextKey        ,
					 m_endKey         , // should be maxed!
					 100024             , // min rec sizes
					 true             , // include tree?
					 0                , // startFileNum
					 -1               , // m_numFiles   
					 this             , // state 
					 gotListWrapper   , // callback
					 MAX_NICENESS     , // niceness
					 true             , // do error correction?
					 -1               , // maxRetries
					 false);            // isRealMerge

		// msg5 blocked on i/o; gotListWrapper() takes it from here
		if ( ! listReady ) return false;

		// nothing left in this rdb
		if ( m_list.isEmpty() ) return true;

		// process that list; bail if a msg4 add blocked
		if ( ! gotList() ) return false;
	}

	// shutting down
	return false;
}

// Completion callback for the two msg4 adds issued by gotList(). Each call
// accounts for one finished add; the scan resumes once none are pending.
static void doneAddingMetaWrapper ( void *state ) {
	(void)state;
	// being called with nothing outstanding is a bug
	if ( g_rebalance.m_blocked <= 0 ) { g_process.shutdownAbort(true); }
	// one fewer add in flight; keep waiting if another is still pending
	if ( --g_rebalance.m_blocked > 0 ) return;
	// all adds are done, resume
	g_rebalance.scanLoop();
}

// scan that list
bool Rebalance::gotList ( ) {

	if ( m_blocked ) { g_process.shutdownAbort(true); }

	Rdb *rdb = g_process.m_rdbs[m_rdbNum];

	rdbid_t rdbId = rdb->getRdbId();

	int32_t ks = rdb->getKeySize();

	int32_t myShard = g_hostdb.m_myHost->m_shardNum;

	m_list.resetListPtr();

	//log("rebal: got list of %" PRId32" bytes",m_list.getListSize());

	m_posMetaList.reset();
	m_negMetaList.reset();

	if ( m_list.isEmpty() ) {
	        KEYSET ( m_nextKey , m_endKey , ks );
		return true;
	}

	//char *last = NULL;

	for ( ; ! m_list.isExhausted() ; m_list.skipCurrentRecord() ) {
		// get tht rec
		//char *rec = m_list.getCurrentRec();
		// get it
		m_list.getCurrentKey  ( m_nextKey );
		// skip if negative... wtf?
		if ( KEYNEG(m_nextKey) ) continue;
		// get shard
		int32_t shard = getShardNum ( rdbId , m_nextKey );
		// save last ptr
		//last = rec;
		// debug!
		//log("rebal: checking key %s",KEYSTR(m_nextKey,ks));
		// count as scanned
		m_scannedCount++;
		// skip it if it belongs with us
		if ( shard == myShard ) continue;
		// note it
		//log("rebal: shard is %" PRId32,shard);
		// count it
		m_rebalanceCount++;
		// otherwise, it does not!
		//int32_t recSize = m_list.getCurrentRecSize();
		// copy the full key into "key" buf because might be compressed
		char key[MAX_KEY_BYTES];
		m_list.getCurrentKey ( key );
		// store rdbid, no! we supply rdbid below to msg4
		//m_posMetaList.pushChar ( rdbId );
		// first key
		m_posMetaList.safeMemcpy ( key , ks );
		// then record
		int32_t dataSize = rdb->getFixedDataSize();
		if ( dataSize == -1 ) {
			dataSize = m_list.getCurrentDataSize();
			m_posMetaList.pushLong ( dataSize );
		}
		// then data
		if ( dataSize ) {
			char *data = m_list.getCurrentData();
			m_posMetaList.safeMemcpy ( data , dataSize );
		}
		//
		// NOW DELETE FROM OUR SHARD!
		//
		// store rdbid, no! we supply rdbid below to msg4
		//m_negMetaList.pushChar ( rdbId );
		// make key a delete
		key[0] &= 0xfe;
		// for debug...
		//log("rebal: rm key %s",KEYSTR(key,ks));
		// and store that negative key
		m_negMetaList.safeMemcpy ( key , ks );
	}

	//  update nextkey
	//if ( last ) {
	if ( ! m_list.isEmpty() ) {
		// get the last key we scanned, all "ks" bytes of it.
		// because some keys are compressed and we take the
		// more significant compressed out bytes from m_list.m_*
		// member vars
		//m_list.getKey  ( last , m_nextKey );
		// if it is not maxxed out, then incremenet it for the
		// next scan round
		if ( KEYCMP ( m_nextKey , KEYMAX() , ks ) != 0 )
			KEYINC ( m_nextKey , ks );
	}

	if (!m_msg4a.addMetaList(&m_posMetaList, m_collnum, this, doneAddingMetaWrapper, rdb->getRdbId(), -1)) { // shard override, not!
		++m_blocked;
	}

	if (!m_msg4b.addMetaList(&m_negMetaList, m_collnum, this, doneAddingMetaWrapper, rdb->getRdbId(), myShard)) { // shard override, not!
		++m_blocked;
	}

	return ( m_blocked == 0 );
}