#include "RdbMerge.h"
#include "Rdb.h"
#include "Process.h"
#include "Spider.h" //dedupSpiderdbList()
#include "MergeSpaceCoordinator.h"
#include "Conf.h"


RdbMerge g_merge;


RdbMerge::RdbMerge()
  : m_mergeSpaceCoordinator(NULL),
	m_isAcquireLockJobSubmited(false),
	m_isLockAquired(false),
    m_doneMerging(false),
    m_getListOutstanding(false),
    m_startFileNum(0),
    m_numFiles(0),
    m_fixedDataSize(0),
    m_targetFile(NULL),
    m_targetMap(NULL),
    m_targetIndex(NULL),
	m_doneRegenerateFiles(false),
    m_isMerging(false),
    m_isHalted(false),
    m_dump(),
    m_msg5(),
    m_list(),
    m_niceness(0),
    m_rdbId(RDB_NONE),
    m_collnum(0),
    m_ks(0)
{
	memset(m_startKey, 0, sizeof(m_startKey));
}

RdbMerge::~RdbMerge() {
	delete m_mergeSpaceCoordinator;
}



// . return false if blocked, true otherwise
// . sets g_errno on error
// . if niceness is 0 merge will block, otherwise will not block
// . we now use niceness of 1 which should spawn threads that don't allow
//   niceness 2 threads to launch while they're running
// . spider process now uses mostly niceness 2 
// . we need the merge to take priority over spider processes on disk otherwise
//   there's too much contention from spider lookups on disk for the merge
//   to finish in a decent amount of time and we end up getting too many files!
bool RdbMerge::merge(rdbid_t rdbId,
                     collnum_t collnum,
                     BigFile *targetFile,
                     RdbMap *targetMap,
                     RdbIndex *targetIndex,
                     int32_t startFileNum,
                     int32_t numFiles,
                     int32_t niceness)
{
	if(m_isHalted) {
		logTrace(g_conf.m_logTraceRdbMerge, "END, merging is halted");
		return true;
	}
	if(m_isMerging) {
		logTrace(g_conf.m_logTraceRdbMerge, "END, already merging");
		return true;
	}

	if(!m_mergeSpaceCoordinator) {
		const char *mergeSpaceDir = strlen(g_hostdb.m_myHost->m_mergeDir) > 0 ? g_hostdb.m_myHost->m_mergeDir : g_conf.m_mergespaceDirectory;
		const char *mergeSpaceLockDir = strlen(g_hostdb.m_myHost->m_mergeLockDir) > 0 ? g_hostdb.m_myHost->m_mergeLockDir : g_conf.m_mergespaceLockDirectory;

		m_mergeSpaceCoordinator = new MergeSpaceCoordinator(mergeSpaceLockDir, g_conf.m_mergespaceMinLockFiles, mergeSpaceDir);
	}

	// get base, returns NULL and sets g_errno to ENOCOLLREC on error
	RdbBase *base = getRdbBase(rdbId, collnum);
	if (!base) {
		return true;
	}

	Rdb *rdb = getRdbFromId(rdbId);

	m_rdbId           = rdbId;
	m_collnum         = collnum;
	m_targetFile      = targetFile;
	m_targetMap       = targetMap;
	m_targetIndex     = targetIndex;
	m_startFileNum    = startFileNum;
	m_numFiles        = numFiles;
	m_fixedDataSize   = base->getFixedDataSize();
	m_niceness        = niceness;
	m_doneRegenerateFiles = false;
	m_doneMerging     = false;
	m_ks              = rdb->getKeySize();

	// . set the key range we want to retrieve from the files
	// . just get from the files, not tree (not cache?)
	KEYMIN(m_startKey,m_ks);

	//calculate how much space we need for resulting merged file
	m_spaceNeededForMerge = base->getSpaceNeededForMerge(m_startFileNum,m_numFiles);


	if(!g_loop.registerSleepCallback(5000, this, getLockWrapper, "RdbMerge::getLockWrapper", 0, true))
		return true;

	// we're now merging since we accepted to try
	m_isMerging = true;

	return false;
}

void RdbMerge::acquireLockWrapper(void *state) {
	RdbMerge *that = static_cast<RdbMerge*>(state);

	if(that->m_mergeSpaceCoordinator->acquire(that->m_spaceNeededForMerge)) {
		log(LOG_INFO,"Rdbmerge(%p)::getLock(), m_rdbId=%d: got lock for %" PRIu64 " bytes",
		    that, (int)that->m_rdbId, that->m_spaceNeededForMerge);
		g_loop.unregisterSleepCallback(that, getLockWrapper);
		that->m_isLockAquired = true;
	} else {
		log(LOG_INFO, "Rdbmerge(%p)::getLock(), m_rdbId=%d: Didn't get lock for %" PRIu64 " bytes; retrying in a bit...",
		    that, (int)that->m_rdbId, that->m_spaceNeededForMerge);
	}
}

void RdbMerge::acquireLockDoneWrapper(void *state, job_exit_t exit_type) {
	RdbMerge *that = static_cast<RdbMerge*>(state);

	that->m_isAcquireLockJobSubmited = false;

	if (exit_type != job_exit_normal) {
		return;
	}

	if (that->m_isLockAquired) {
		that->gotLock();
	}
}

void RdbMerge::getLockWrapper(int /*fd*/, void *state) {
	logTrace(g_conf.m_logTraceRdbMerge, "RdbMerge::getLockWrapper(%p)", state);
	RdbMerge *that = static_cast<RdbMerge*>(state);
	that->getLock();
}

void RdbMerge::getLock() {
	logDebug(g_conf.m_logDebugMerge, "Rdbmerge(%p)::getLock(), m_rdbId=%d",this,(int)m_rdbId);
	bool isAcquireLockJobSubmited = m_isAcquireLockJobSubmited.exchange(true);
	if (isAcquireLockJobSubmited) {
		return;
	}

	if (g_jobScheduler.submit(acquireLockWrapper, acquireLockDoneWrapper, this, thread_type_file_merge, 0)) {
		return;
	}

	log(LOG_WARN, "db: merge: Unable to submit acquire lock job. Running on main thread!");
	m_isAcquireLockJobSubmited = false;
	acquireLockWrapper(this);
	acquireLockDoneWrapper(this, job_exit_normal);
}

void RdbMerge::gotLockWrapper(int /*fd*/, void *state) {
	logTrace(g_conf.m_logTraceRdbMerge, "RdbMerge::gotLockWrapper(%p)", state);
	RdbMerge *that = static_cast<RdbMerge*>(state);
	g_loop.unregisterSleepCallback(state, gotLockWrapper);

	that->gotLock();
}

void RdbMerge::regenerateFilesWrapper(void *state) {
	RdbMerge *that = static_cast<RdbMerge*>(state);
	if (that->m_targetMap->getFileSize() == 0) {
		log( LOG_INFO, "db: merge: Attempting to generate map file for data file %s* of %" PRId64" bytes. May take a while.",
		     that->m_targetFile->getFilename(), that->m_targetFile->getFileSize() );

		// this returns false and sets g_errno on error
		if (!that->m_targetMap->generateMap(that->m_targetFile)) {
			log(LOG_ERROR, "db: merge: Map generation failed.");
			gbshutdownCorrupted();
		}

		log(LOG_INFO, "db: merge: Map generation succeeded.");
	}

	if (that->m_targetIndex && that->m_targetIndex->getFileSize() == 0) {
		log(LOG_INFO, "db: merge: Attempting to generate index file for data file %s* of %" PRId64" bytes. May take a while.",
		    that->m_targetFile->getFilename(), that->m_targetFile->getFileSize() );

		// this returns false and sets g_errno on error
		if (!that->m_targetIndex->generateIndex(that->m_targetFile)) {
			logError("db: merge: Index generation failed for %s.", that->m_targetFile->getFilename());
			gbshutdownCorrupted();
		}

		log(LOG_INFO, "db: merge: Index generation succeeded.");
	}
}

void RdbMerge::regenerateFilesDoneWrapper(void *state, job_exit_t exit_type) {
	RdbMerge *that = static_cast<RdbMerge*>(state);
	that->m_doneRegenerateFiles = true;
	that->gotLock();
}

// . returns false if blocked, true otherwise
// . sets g_errno on error
bool RdbMerge::gotLock() {
	// regenerate map/index if needed
	if (!m_doneRegenerateFiles &&
		m_targetFile->getFileSize() > 0 &&
		((m_targetIndex && m_targetIndex->getFileSize() == 0) || m_targetMap->getFileSize() == 0)) {
		log(LOG_WARN, "db: merge: Regenerating map/index from a killed merge.");

		if (g_jobScheduler.submit(regenerateFilesWrapper, regenerateFilesDoneWrapper, this, thread_type_file_merge, 0)) {
			return true;
		}

		log(LOG_WARN, "db: merge: Unable to submit regenerate files job. Running on main thread!");
		regenerateFilesWrapper(this);
	}

	// if we're resuming a killed merge, set m_startKey to last
	// key the map knows about.
	// the dump will start dumping at the end of the targetMap's data file.
	if (m_targetMap->getNumRecs() > 0) {
		log(LOG_INIT,"db: Resuming a killed merge.");
		m_targetMap->getLastKey(m_startKey);
		KEYINC(m_startKey,m_ks);
	}

	// . get last mapped offset
	// . this may actually be smaller than the file's actual size
	//   but the excess is not in the map, so we need to do it again
	int64_t startOffset = m_targetMap->getFileSize();

	// if startOffset is > 0 use the last key as RdbDump:m_prevLastKey
	// so it can compress the next key it dumps providee m_useHalfKeys
	// is true (key compression) and the next key has the same top 6 bytes
	// as m_prevLastKey
	char prevLastKey[MAX_KEY_BYTES];
	if (startOffset > 0) {
		m_targetMap->getLastKey(prevLastKey);
	} else {
		KEYMIN(prevLastKey, m_ks);
	}

	// get base, returns NULL and sets g_errno to ENOCOLLREC on error
	RdbBase *base = getRdbBase(m_rdbId, m_collnum);
	if (!base) {
		relinquishMergespaceLock();
		m_isMerging = false;
		//no need for calling incorporateMerge() because the base/collection is cone
		return true;
	}

	/// @note ALC currently this working because only posdb is using index
	/// if we ever enable more rdb to use indexes, we could potentially have a lot more
	/// global index jobs. means this mechanism will have to be changed.
	if (base->hasPendingGlobalIndexJob()) {
		// wait until no more pending global index job
		g_loop.registerSleepCallback(1000, this, gotLockWrapper, "RdbMerge::gotLockWrapper");
		log(LOG_INFO, "db: merge: Waiting for global index job to complete");
		return true;
	}

	// . set up a a file to dump the records into
	// . returns false and sets g_errno on error
	// . this will open m_target as O_RDWR ...

	m_dump.set(m_collnum,
	           m_targetFile,
	           NULL, // buckets to dump is NULL, we call dumpList
	           NULL, // tree to dump is NULL, we call dumpList
	           m_targetMap,
	           m_targetIndex,
	           0, // m_maxBufSize. not needed if no tree!
	           m_niceness, // niceness of dump
	           this, // state
	           dumpListWrapper,
	           base->useHalfKeys(),
	           startOffset,
	           prevLastKey,
	           m_ks,
	           m_rdbId);
	// what kind of error?
	if ( g_errno ) {
		log(LOG_WARN, "db: gotLock: merge.set: %s.", mstrerror(g_errno));
		relinquishMergespaceLock();
		m_isMerging = false;
		base->incorporateMerge();
		return true;
	}

	// . this returns false on error and sets g_errno
	// . it returns true if blocked or merge completed successfully
	return resumeMerge ( );
}

void RdbMerge::haltMerge() {
	if(m_isHalted) {
		return;
	}

	m_isHalted = true;

	// . we don't want the dump writing to an RdbMap that has been deleted
	// . this can happen if the close is delayed because we are dumping
	//   a tree to disk
	m_dump.setSuspended();
}

void RdbMerge::doSleep() {
	log(LOG_WARN, "db: Merge had error: %s. Sleeping and retrying.", mstrerror(g_errno));
	g_errno = 0;
	g_loop.registerSleepCallback(1000, this, tryAgainWrapper, "RdbMerge::tryAgainWrapper");
}

// . return false if blocked, otherwise true
// . sets g_errno on error
bool RdbMerge::resumeMerge() {
	if(m_isHalted) {
		return true;
	}

	// the usual loop
	for (;;) {
		// . this returns false if blocked, true otherwise
		// . sets g_errno on error
		// . we return true if it blocked
		if (!getNextList()) {
			logTrace(g_conf.m_logTraceRdbMerge, "END. getNextList blocked. list=%p", &m_list);
			return false;
		}

		// if g_errno is out of memory then msg3 wasn't able to get the lists
		// so we should sleep and retry...
		if (g_errno == ENOMEM) {
			doSleep();
			logTrace(g_conf.m_logTraceRdbMerge, "END. out of memory. list=%p", &m_list);
			return false;
		}

		// if list is empty or we had an error then we're done
		if (g_errno || m_doneMerging) {
			doneMerging();
			logTrace(g_conf.m_logTraceRdbMerge, "END. error/done merging. list=%p", &m_list);
			return true;
		}

		// return if this blocked
		if (!filterList()) {
			logTrace(g_conf.m_logTraceRdbMerge, "END. filterList blocked. list=%p", &m_list);
			return false;
		}

		// . otherwise dump the list we read to our target file
		// . this returns false if blocked, true otherwise
		if (!dumpList()) {
			logTrace(g_conf.m_logTraceRdbMerge, "END. dumpList blocked. list=%p", &m_list);
			return false;
		}
	}
}

// . return false if blocked, true otherwise
// . sets g_errno on error
bool RdbMerge::getNextList() {
	// return true if g_errno is set
	if (g_errno || m_doneMerging) {
		return true;
	}

	// it's suspended so we count this as blocking
	if(m_isHalted) {
		return false;
	}

	// get base, returns NULL and sets g_errno to ENOCOLLREC on error
	RdbBase *base = getRdbBase(m_rdbId, m_collnum);
	if (!base) {
		// hmmm it doesn't set g_errno so we set it here now
		// otherwise we do an infinite loop sometimes if a collection
		// rec is deleted for the collnum
		g_errno = ENOCOLLREC;
		return true;
	}

	// otherwise, get it now
	return getAnotherList();
}

bool RdbMerge::getAnotherList() {
	logDebug(g_conf.m_logDebugMerge, "db: Getting another list for merge.");

	// clear it up in case it was already set
	g_errno = 0;

	// get base, returns NULL and sets g_errno to ENOCOLLREC on error
	RdbBase *base = getRdbBase(m_rdbId, m_collnum);
	if (!base) {
		return true;
	}

	logTrace(g_conf.m_logTraceRdbMerge, "list=%p startKey=%s",
	         &m_list, KEYSTR(m_startKey, m_ks));

	// . this returns false if blocked, true otherwise
	// . sets g_errno on error
	// . we return false if it blocked
	// . m_maxBufSize may be exceeded by a rec, it's just a target size
	// . niceness is usually MAX_NICENESS, but reindex.cpp sets to 0
	// . this was a call to Msg3, but i made it call Msg5 since
	//   we now do the merging in Msg5, not in msg3 anymore
	// . this will now handle truncation, dup and neg rec removal
	// . it remembers last termId and count so it can truncate even when
	//   IndexList is split between successive reads
	// . IMPORTANT: when merging titledb we could be merging about 255
	//   files, so if we are limited to only X fds it can have a cascade
	//   affect where reading from one file closes the fd of another file
	//   in the read (since we call open before spawning the read thread)
	//   and can therefore take 255 retries for the Msg3 to complete 
	//   because each read gives a EFILCLOSED error.
	//   so to fix it we allow one retry for each file in the read plus
	//   the original retry of 25
	int32_t nn = base->getNumFiles();
	if ( m_numFiles > 0 && m_numFiles < nn ) nn = m_numFiles;

	int32_t bufSize = g_conf.m_mergeBufSize;
	// get it
	m_getListOutstanding = true;
	bool rc = m_msg5.getList(m_rdbId,
				 m_collnum,
				 &m_list,
				 m_startKey,
				 KEYMAX(),        // usually is maxed!
				 bufSize,
				 false,           // includeTree?
				 m_startFileNum,  // startFileNum
				 m_numFiles,
				 this,            // state
				 gotListWrapper,  // callback
				 m_niceness,      // niceness
				 true,            // do error correction?
				 nn + 75,         // max retries (mk it high)
				 true);            // isRealMerge? absolutely!
	if(rc)
		m_getListOutstanding = false;
	return rc;
	
}

void RdbMerge::gotListWrapper(void *state, RdbList * /*list*/, Msg5 * /*msg5*/) {
	// get a ptr to ourselves
	RdbMerge *THIS = (RdbMerge *)state;

	logTrace(g_conf.m_logTraceRdbMerge, "list=%p startKey=%s",
	         &(THIS->m_list), KEYSTR(THIS->m_startKey, THIS->m_ks));

	THIS->m_getListOutstanding = false;

	for (;;) {
		// if g_errno is out of memory then msg3 wasn't able to get the lists
		// so we should sleep and retry
		if (g_errno == ENOMEM) {
			THIS->doSleep();
			logTrace(g_conf.m_logTraceRdbMerge, "END. out of memory. list=%p", &THIS->m_list);
			return;
		}

		// if g_errno we're done
		if (g_errno || THIS->m_doneMerging) {
			THIS->doneMerging();
			logTrace(g_conf.m_logTraceRdbMerge, "END. error/done merging. list=%p", &THIS->m_list);
			return;
		}

		// return if this blocked
		if (!THIS->filterList()) {
			logTrace(g_conf.m_logTraceRdbMerge, "END. filterList blocked. list=%p", &THIS->m_list);
			return;
		}

		// return if this blocked
		if (!THIS->dumpList()) {
			logTrace(g_conf.m_logTraceRdbMerge, "END. dumpList blocked. list=%p", &THIS->m_list);
			return;
		}

		// return if this blocked
		if (!THIS->getNextList()) {
			logTrace(g_conf.m_logTraceRdbMerge, "END. getNextList blocked. list=%p", &THIS->m_list);
			return;
		}

		// otherwise, keep on trucking
	}
}

// called after sleeping for 1 sec because of ENOMEM
void RdbMerge::tryAgainWrapper(int /*fd*/, void *state) {
	// get a ptr to ourselves
	RdbMerge *THIS = (RdbMerge *)state;

	// unregister the sleep callback
	g_loop.unregisterSleepCallback(THIS, tryAgainWrapper);

	// clear this
	g_errno = 0;

	// return if this blocked
	if (!THIS->getNextList()) {
		logTrace(g_conf.m_logTraceRdbMerge, "END. getNextList blocked. list=%p", &THIS->m_list);
		return;
	}

	// if this didn't block do the loop
	gotListWrapper(THIS, NULL, NULL);
}

void RdbMerge::filterListWrapper(void *state) {
	RdbMerge *THIS = (RdbMerge *)state;

	logTrace(g_conf.m_logTraceRdbMerge, "BEGIN. list=%p m_startKey=%s", &THIS->m_list, KEYSTR(THIS->m_startKey, THIS->m_ks));

	if (THIS->m_rdbId == RDB_SPIDERDB) {
		dedupSpiderdbList(&(THIS->m_list));
	} else if (THIS->m_rdbId == RDB_TITLEDB) {
//		filterTitledbList(&(THIS->m_list));
	}

	logTrace(g_conf.m_logTraceRdbMerge, "END. list=%p", &THIS->m_list);
}

// similar to gotListWrapper but we call dumpList() before dedupList()
void RdbMerge::filterDoneWrapper(void *state, job_exit_t exit_type) {
	// get a ptr to ourselves
	RdbMerge *THIS = (RdbMerge *)state;

	logTrace(g_conf.m_logTraceRdbMerge, "BEGIN. list=%p m_startKey=%s", &THIS->m_list, KEYSTR(THIS->m_startKey, THIS->m_ks));

	for (;;) {
		// return if this blocked
		if (!THIS->dumpList()) {
			logTrace(g_conf.m_logTraceRdbMerge, "END. dumpList blocked. list=%p", &THIS->m_list);
			return;
		}

		// return if this blocked
		if (!THIS->getNextList()) {
			logTrace(g_conf.m_logTraceRdbMerge, "END. getNextList blocked. list=%p", &THIS->m_list);
			return;
		}

		// if g_errno is out of memory then msg3 wasn't able to get the lists
		// so we should sleep and retry
		if (g_errno == ENOMEM) {
			THIS->doSleep();
			logTrace(g_conf.m_logTraceRdbMerge, "END. out of memory. list=%p", &THIS->m_list);
			return;
		}

		// if g_errno we're done
		if (g_errno || THIS->m_doneMerging) {
			THIS->doneMerging();
			logTrace(g_conf.m_logTraceRdbMerge, "END. error/done merging. list=%p", &THIS->m_list);
			return;
		}

		// return if this blocked
		if (!THIS->filterList()) {
			logTrace(g_conf.m_logTraceRdbMerge, "END. filterList blocked. list=%p", &THIS->m_list);
			return;
		}

		// otherwise, keep on trucking
	}
}

bool RdbMerge::filterList() {
	// return true on g_errno
	if (g_errno) {
		return true;
	}

	// . it's suspended so we count this as blocking
	// . resumeMerge() will call getNextList() again, not dumpList() so
	//   don't advance m_startKey
	if(m_isHalted) {
		return false;
	}

	// if we use getLastKey() for this the merge completes but then
	// tries to merge two empty lists and cores in the merge function
	// because of that. i guess it relies on endkey rollover only and
	// not on reading less than minRecSizes to determine when to stop
	// doing the merge.
	m_list.getEndKey(m_startKey) ;
	KEYINC(m_startKey,m_ks);

	logTrace(g_conf.m_logTraceRdbMerge, "listEndKey=%s startKey=%s",
	         KEYSTR(m_list.getEndKey(), m_list.getKeySize()), KEYSTR(m_startKey, m_ks));

	/////
	//
	// dedup for spiderdb before we dump it. try to save disk space.
	//
	/////
	if (m_rdbId == RDB_SPIDERDB || m_rdbId == RDB_TITLEDB) {
		if (g_jobScheduler.submit(filterListWrapper, filterDoneWrapper, this, thread_type_merge_filter, 0)) {
			return false;
		}

		log(LOG_WARN, "db: Unable to submit job for merge filter. Will run in main thread");

		// fall back to filter without thread
		if (m_rdbId == RDB_SPIDERDB) {
			dedupSpiderdbList(&m_list);
		} else {
//			filterTitledbList(&m_list);
		}
	}

	return true;
}

// similar to gotListWrapper but we call getNextList() before dumpList()
void RdbMerge::dumpListWrapper(void *state) {
	// debug msg
	logDebug(g_conf.m_logDebugMerge, "db: Dump of list completed: %s.",mstrerror(g_errno));

	// get a ptr to ourselves
	RdbMerge *THIS = (RdbMerge *)state;

	logTrace(g_conf.m_logTraceRdbMerge, "list=%p startKey=%s",
	         &(THIS->m_list), KEYSTR(THIS->m_startKey, THIS->m_ks));

	for (;;) {
		// collection reset or deleted while RdbDump.cpp was writing out?
		if (g_errno == ENOCOLLREC) {
			THIS->doneMerging();
			logTrace(g_conf.m_logTraceRdbMerge, "END. error/done merging. list=%p", &THIS->m_list);
			return;
		}
		// return if this blocked
		if (!THIS->getNextList()) {
			logTrace(g_conf.m_logTraceRdbMerge, "END. getNextList blocked. list=%p", &THIS->m_list);
			return;
		}

		// if g_errno is out of memory then msg3 wasn't able to get the lists
		// so we should sleep and retry
		if (g_errno == ENOMEM) {
			// if the dump failed, it should reset m_dump.m_offset of
			// the file to what it was originally (in case it failed
			// in adding the list to the map). we do not need to set
			// m_startKey back to the startkey of this list, because
			// it is *now* only advanced on successful dump!!
			THIS->doSleep();
			logTrace(g_conf.m_logTraceRdbMerge, "END. out of memory. list=%p", &THIS->m_list);
			return;
		}

		// . if g_errno we're done
		// . if list is empty we're done
		if (g_errno || THIS->m_doneMerging) {
			THIS->doneMerging();
			logTrace(g_conf.m_logTraceRdbMerge, "END. error/done merging. list=%p", &THIS->m_list);
			return;
		}

		// return if this blocked
		if (!THIS->filterList()) {
			logTrace(g_conf.m_logTraceRdbMerge, "END. filterList blocked. list=%p", &THIS->m_list);
			return;
		}

		// return if this blocked
		if (!THIS->dumpList()) {
			logTrace(g_conf.m_logTraceRdbMerge, "END. dumpList blocked. list=%p", &THIS->m_list);
			return;
		}

		// otherwise, keep on trucking
	}
}

// . return false if blocked, true otherwise
// . set g_errno on error
// . list should be truncated, possible have all negative keys removed,
//   and de-duped thanks to RdbList::indexMerge_r() and RdbList::merge_r()
bool RdbMerge::dumpList() {
	// if the startKey rolled over we're done
	if (KEYCMP(m_startKey, KEYMIN(), m_ks) == 0) {
		m_doneMerging = true;
	}

	logDebug(g_conf.m_logDebugMerge, "db: Dumping list.");

	logTrace(g_conf.m_logTraceRdbMerge, "list=%p startKey=%s",
	         &m_list, KEYSTR(m_startKey, m_ks));

	// . send the whole list to the dump
	// . it returns false if blocked, true otherwise
	// . it sets g_errno on error
	// . it calls dumpListWrapper when done dumping
	// . return true if m_dump had an error or it did not block
	// . if it gets a EFILECLOSED error it will keep retrying forever
	return m_dump.dumpList(&m_list);
}

void RdbMerge::doneMerging() {
	// save this
	int32_t saved_errno = g_errno;

	// let RdbDump free its m_verifyBuf buffer if it existed
	m_dump.reset();

	// . free the list's memory, reset() doesn't do it
	// . when merging titledb i'm still seeing 200MB allocs to read from tfndb.
	m_list.freeList();

	log(LOG_INFO,"db: Merge status: %s.",mstrerror(g_errno));

	// . reset our class
	// . this will free it's cutoff keys buffer, trash buffer, treelist
	// . TODO: should we not reset to keep the mem handy for next time
	//   to help avoid out of mem errors?
	m_msg5.reset();

	// . do we really need these anymore?
	m_isMerging     = false;

	// if collection rec was deleted while merging files for it
	// then the rdbbase should be NULL i guess.
	if (saved_errno == ENOCOLLREC) {
		return;
	}

	// if we are exiting then dont bother renaming the files around now.
	// this prevents a core in RdbBase::incorporateMerge()
	if (g_process.isShuttingDown()) {
		log(LOG_INFO, "merge: exiting. not ending merge.");
		return;
	}

	// get base, returns NULL and sets g_errno to ENOCOLLREC on error
	RdbBase *base = getRdbBase(m_rdbId, m_collnum);
	if (!base) {
		return;
	}

	// pass g_errno on to incorporate merge so merged file can be unlinked
	base->incorporateMerge();
}


void RdbMerge::relinquishMergespaceLock() {
	if(m_mergeSpaceCoordinator) {
		m_mergeSpaceCoordinator->relinquish();
		m_isLockAquired = false;
	}
}


void RdbMerge::mergeIncorporated(const RdbBase * /*currently irrelevant*/) {
	relinquishMergespaceLock();
}