// Copyright 2008, Gigablast Inc.
#include "DailyMerge.h"
#include "Hostdb.h"
#include "HostFlags.h"
#include "Repair.h"
#include "Rdb.h"
#include "Process.h" // g_process.m_processStartTime
#include "Spider.h"
#include "SpiderColl.h"
#include "SpiderLoop.h"
#include "Proxy.h"
#include "Linkdb.h"
#include "Conf.h"
#include "Collectiondb.h"

static void dailyMergeWrapper ( int fd , void *state ) ;

// the global class
DailyMerge g_dailyMerge;

// main.cpp calls g_dailyMerge.init()
bool DailyMerge::init ( ) {
	// reset these
	m_cr        = NULL;
	m_mergeMode = 0;
	m_didDaily  = false;
	// check every 10 seconds
	if (!g_loop.registerSleepCallback(10 * 1000, NULL, dailyMergeWrapper, "DailyMerge::dailyMergeWrapper")) {
		log( LOG_WARN, "repair: Failed register callback.");
		return false;
	}
	return true;
}

// . call this once every second 
// . this is responsible for advancing from one g_repairMode to the next
void dailyMergeWrapper ( int fd , void *state ) {
	g_dailyMerge.dailyMergeLoop();
}

void DailyMerge::dailyMergeLoop ( ) {
	// if in repair mode, do not do daily merge
	if ( g_repairMode ) {
		return;
	}

	// or if in read only mode
	if ( g_conf.m_readOnlyMode ) {
		return;
	}

	// skip if proxy, a proxy can be hostid 0!
	if ( g_proxy.isProxy() ) {
		return;
	}

	// get local time
	int64_t nowLocalMS = gettimeofdayInMilliseconds();
	// get our hostid
	int32_t hid = g_hostdb.m_myHost->m_hostId;
	// if process only recently started (1 min ago or less)
	// then do not immediately do this...
	if (hid==0 && nowLocalMS - g_process.m_processStartTime < 1*60*1000)
		return;
	// wait until the right time (this is in UTC)
	time_t nowSynced = getTimeSynced();

	// get time since midnight
	struct tm tm_buf;
	struct tm *tt  = gmtime_r(&nowSynced,&tm_buf);
	// how many MINUTES into the day are we? (in UTC)
	int32_t elapsedMins = tt->tm_hour * 60 + tt->tm_min ;

	// . if we are not 0, just use host #0's collnum
	// . an error here will screw up the whole daily merge process
	if ( hid != 0 && m_mergeMode == 0 ) {
		// get host #0
		Host *h = &g_hostdb.m_hosts[0];
		// must have gotten an update from him
		if ( ! h->m_runtimeInformation.m_valid ) return;
		// hostid #0 must NOT be in mode 0
		if ( h->m_runtimeInformation.m_flags & PFLAG_MERGEMODE0 ) return;
		// get the collnum that host #0 is currently daily merging
		collnum_t i = g_hostdb.m_hosts[0].m_runtimeInformation.m_dailyMergeCollnum;
		// this means host #0 is not daily merging a collnum now
		if ( i < 0 ) return;
		// if it is valid, the CollectionRec MUST be there
		CollectionRec *cr = g_collectiondb.getRec ( i );
		if ( ! cr ) { 
			log("daily: host #0 bad collnum %" PRId32,(int32_t)i);return;}
		// if valid, use it
		m_cr = cr;
		// we set m_cr, go to next mode
		m_mergeMode = 1;
		// set the start time here, but don't commit to m_cr just yet
		m_savedStartTime = nowSynced;
	}

	// . only host #0 should do this loop!!!
	if(hid==0) {
		// . loop through each collection to check the time
		for (collnum_t i=0; m_mergeMode==0 && i<g_collectiondb.getNumRecs(); i++) {
			// get collection rec for collnum #i
			CollectionRec *cr = g_collectiondb.getRec ( i );
			// skip if empty, it was deleted at some point
			if ( ! cr ) continue;
			// skip if daily merge trigger is < 0 (do not do dailies)
			if ( cr->m_dailyMergeTrigger < 0 ) continue;
			// . skip if not time yet
			// . !!!!!THIS IS IN MINUTES!!!!!!!!
			if ( (int32_t)elapsedMins < (int32_t)cr->m_dailyMergeTrigger ) 
				continue;
			// do not start more than 15 mins after the trigger time,
			// if we miss that cuz we are down, then too bad
			if ( (int32_t)elapsedMins > (int32_t)cr->m_dailyMergeTrigger + 15 )
				continue;
 			// . how long has it been (in seconds)
			// . !!!!!THIS IS IN SECONDS!!!!!!!!
			int32_t diff = nowSynced - cr->m_dailyMergeStarted;
			// crazy?
			if ( diff < 0 ) continue;
			// if less than 24 hours ago, we already did it
			if ( diff < 24*3600 ) continue;
			// . we must now match the day of week
			// . use <= 0 to do it every day
			// . 0 = sunday ... 6 = saturday
			// . comma separated list is ok ("0,1, 6")
			// . leave blank or at least no numbers to do every day
			char *s = cr->m_dailyMergeDOWList;
			char dowCounts[8];
			memset(dowCounts,0,8);
			for ( ; *s ; s++ ) {
				if ( ! is_digit(*s) ) continue;
				int32_t num = atoi(s);
				if ( num < 0 ) continue;
				if ( num > 6 ) continue;
				dowCounts[num]++;
			}
			// get our dow
			int32_t todayDOW = tt->tm_wday + 1;
			// make sure 1 to 7
			if ( todayDOW < 0 || todayDOW > 6 ) { 
				log(LOG_WARN, "merge: bad today dow of %i for coll %s",
				    (int)todayDOW,cr->m_coll);
				return;
			}
			//if ( todayDOW > 6 ) { g_process.shutdownAbort(true); }
			// skip if not a dayofweek to merge on
			if ( dowCounts [ todayDOW ] == 0 ) continue;

			// set the start time here, but don't commit to m_cr just yet
			m_savedStartTime = nowSynced;
			// . wait for everyone to be in mode #0 in case they just
			//   finished another daily merge. only host #0 does this loop.
			// . PROBLEM: if host #0 crashes before everyone can get into 
			//   mode 1+ and then host #0 is brought back up, then 
			//   obviously, we will not be able to meet this condition,
			//   therefore only check to see if this condition is 
			//   satisfied our "second time around" (so we must complete
			//   one daily merge before checking this again). that is why
			//   i added "m_didDaily". -- MDW
			for ( int32_t i = 0 ; m_didDaily && i<g_hostdb.getNumHosts() ; i++){
				// skip ourselves, obviously we are in merge mode 2
				if ( &g_hostdb.m_hosts[i] == g_hostdb.m_myHost )
					continue;
				// that's good if he is in mode 0
				if ( g_hostdb.m_hosts[i].m_runtimeInformation.m_flags & PFLAG_MERGEMODE0 )
					continue;
				// oops, someone is not mode 0
				return;
			}
			// got one, save it
			m_cr = cr;
			// if we were hostid 0, go into merge mode 1 now
			m_mergeMode = 1;
			// bust out of loop
			break;
		}
	}

	// can we advance to merge mode 1?
	if ( m_mergeMode == 1 ) {
		// no candidates, go back to mode 0 now, we are done
		if ( ! m_cr ) {
			log("daily: Could not get coll rec.");
			m_mergeMode = 0; return; 
		}
		// ok, we got a collection that needs it so turn off spiders
		m_mergeMode = 2;
		// turn spiders off to keep query latency down
		m_spideringEnabled = g_conf.m_spideringEnabled;
		//m_injectionEnabled = g_conf.m_injectionEnabled;
		g_conf.m_spideringEnabled = false;
		//g_conf.m_injectionEnabled = false;
		// log it
		log("daily: Starting daily merge for %s.",m_cr->m_coll);
		log("daily: Waiting for other hosts to enter merge mode.");
	}

	// wait for everyone to make it to mode 1+ before going on
	if ( m_mergeMode == 2 ) {
		// check the ping packet flags
		for ( int32_t i = 0 ; i < g_hostdb.getNumHosts() ; i++ ) {
			// get the host
			Host *h = &g_hostdb.m_hosts[i];
			// skip ourselves, obviously we are in merge mode 2
			if ( h == g_hostdb.m_myHost ) 
				continue;
			// skip dead hosts
			if ( g_hostdb.isDead(h) )
				continue;
			// return if a host still in merge mode 0. wait for it.
			if ( h->m_runtimeInformation.m_flags & PFLAG_MERGEMODE0 )
				return;
		}
		// ok, everyone is out of mode 0 now
		m_mergeMode = 3;
		// log it
		log("daily: Waiting for all hosts to have 0 "
		    "spiders out.");
	}

	// wait for ALL spiders in network to clear
	if ( m_mergeMode == 3 ) {
		// return if we got spiders out!
		if ( g_spiderLoop.getNumSpidersOut() > 0 )
			return;
		// check the ping packet flags
		for ( int32_t i = 0 ; i < g_hostdb.getNumHosts() ; i++ ) {
			// skip ourselves, obviously we are in merge mode 2
			if ( &g_hostdb.m_hosts[i] == g_hostdb.m_myHost )
				continue;
			// if host still has spiders out, we can't go to mode 4
			if ( g_hostdb.m_hosts[i].m_runtimeInformation.m_flags & PFLAG_HASSPIDERS )
				return;
		}
		// ok, nobody has spiders now
		m_mergeMode = 4;
		// log it
		log("daily: Dumping trees.");
	}

	// start the dumps
	if ( m_mergeMode == 4 ) {
		// . set when we did it last, save that to disk to avoid thrash
		// . TODO: BUT do not allow it to be set in the spider 
		//   controls!
		// . THIS IS IN SECONDS!!!!!!!
		// . use the time we started, otherwise the merge time keeps
		//   getting pushed back.
		m_cr->m_dailyMergeStarted = m_savedStartTime; // nowSynced;
		// tell it to save, otherwise this might not get saved
		m_cr->setNeedsSave();
		// initiate dumps
		g_spiderdb.getRdb ()->submitRdbDumpJob(true);
		g_linkdb.getRdb   ()->submitRdbDumpJob(true);
		// if neither has recs in tree, go to next mode
		if(g_spiderdb.getRdb()->getNumUsedNodes()>0) return;
		if(g_linkdb  .getRdb()->getNumUsedNodes()>0) return;
		// ok, all trees are clear and dumped
		m_mergeMode = 5;
		// log it
		log("daily: Merging indexdb files.");
	}

	// start the merge
	if ( m_mergeMode == 5 ) {
		// kick off the merges if not already going

		if(g_spiderdb.getRdb()->getBase(m_cr->m_collnum)->attemptMerge(1,true,2))
			return;

		if(g_linkdb.getRdb()->getBase(m_cr->m_collnum)->attemptMerge(1,true,2))
			return;

		// . minimize titledb merging at spider time, too
		// . will perform a merge IFF there are 200 or more titledb 
		//   files present, otherwise, it will not. will do the merge
		//   such that LESS THAN 200 titledb files will be present
		//   AFTER the merge is completed.
		// . do NOT force merge ALL files on this one, we just want
		//   to make sure there are not 200+ titledb files
		// we seem to dump about 70 per day at a decent spider rate
		// so merge enough so that we don't have to merge while 
		// spidering
		if(g_titledb.getRdb()->getBase(m_cr->m_collnum)->attemptMerge(1,false,230-70))
			return;

		// set m_cr to NULL up here, so that the last guy to
		// complete the daily merge, does not "cycle back" and
		// try to re-daily merge the same collection!
		m_cr = NULL;
		// ok, merges are done
		m_mergeMode = 6;
		// log it
		log("daily: Waiting for all hosts to finish merging.");
	}

	// wait for all to finish before re-enabling spiders
	if ( m_mergeMode == 6 ) {
		// check the ping packet flags
		for ( int32_t i = 0 ; i < g_hostdb.getNumHosts() ; i++ ) {
			// skip ourselves, obviously we are ok
			if ( &g_hostdb.m_hosts[i] == g_hostdb.m_myHost )
				continue;
			// if host in mode 6 or 0, that's good
			if ( g_hostdb.m_hosts[i].m_runtimeInformation.m_flags & PFLAG_MERGEMODE0OR6)
				continue;
			// otherwise, wait for it to be in 6 or 0
			return;
		}
		// ok, nobody has spiders now, everyone is 6 or 0
		m_mergeMode = 0;
		// no coll rec now
		m_cr = NULL;
		// spiders back on
		g_conf.m_spideringEnabled = m_spideringEnabled;
		//g_conf.m_injectionEnabled = m_injectionEnabled;
		// log it
		log("daily: Daily merge completed.");
		// now the next time we do a daily we must make sure all hosts
		// are in merge mode #0 before we start
		m_didDaily  = true;
	}		
}