#include "gb-include.h"

#include "Rdb.h"
//#include "Msg35.h"
//#include "Tfndb.h"
//#include "Checksumdb.h"
#include "Clusterdb.h"
#include "Hostdb.h"
#include "Tagdb.h"
#include "Catdb.h"
#include "Posdb.h"
#include "Cachedb.h"
#include "Monitordb.h"
#include "Datedb.h"
#include "Titledb.h"
#include "Sections.h"
#include "Spider.h"
#include "Statsdb.h"
#include "Linkdb.h"
#include "Syncdb.h"
#include "Collectiondb.h"
//#include "CollectionRec.h"
#include "Repair.h"
#include "Rebalance.h"
//#include "Msg3.h" // debug include

// how many rdbs are in "urgent merge" mode?
int32_t g_numUrgentMerges = 0;

int32_t g_numThreads = 0;

char g_dumpMode = 0;

//static void doneDumpingWrapper  ( void *state );
//static void attemptMergeWrapper ( int fd , void *state ) ;

// since we only do one merge at a time, keep this class static
class RdbMerge g_merge;

// this one is used exclusively by tfndb so he can merge when titledb is
// merging since titledb adds a lot of tfndb records
class RdbMerge g_merge2;

RdbBase::RdbBase ( ) {
	m_numFiles  = 0;
	m_rdb = NULL;
	m_nextMergeForced = false;
	m_dbname[0] = '\0';
	m_dbnameLen = 0;
	// use bogus collnum just in case
	m_collnum = -1;
	//m_dummy = NULL;
	reset();
}

void RdbBase::reset ( ) {

	char *db = "";
	if ( m_rdb  ) db = m_dbname;
	//log("debug: base resetting db=%s collnum=%"INT32"",db,(int32_t)m_collnum);

	for ( int32_t i = 0 ; i < m_numFiles ; i++ ) {
		mdelete ( m_files[i] , sizeof(BigFile),"RdbBFile");
		delete (m_files[i]);
		mdelete ( m_maps[i] , sizeof(RdbMap),"RdbBMap");
		delete (m_maps[i]);
	}
	m_numFiles  = 0;
	m_files [ m_numFiles ] = NULL;
	//m_isClosing = false;
	//m_isClosed  = false;
	//m_isSaving  = false;
	// reset tree and cache
	//m_tree.reset();
	//m_cache.reset();
	// free the dummy record used for delete recs (negative recs)
	//if ( m_dummy ) mfree ( m_dummy , m_fixedDataSize ,"Rdb");
	//m_dummy     = NULL;
	// reset stats
	//m_numSeeks   = 0 ;
	//m_numRead    = 0 ;
	// reset net stats
	//m_numReqsGet    = 0 ;
	//m_numNetReadGet = 0 ;
	//m_numRepliesGet = 0 ; 
	//m_numNetSentGet = 0 ;
	//m_numReqsAdd    = 0 ;
	//m_numNetReadAdd = 0 ;
	//m_numRepliesAdd = 0 ; 
	//m_numNetSentAdd = 0 ;
	// we no longer need to be saved
	//m_needsSave = false;
	//m_inWaiting = false;
	// we're not in urgent merge mode yet
	m_mergeUrgent = false;
	//m_waitingForTokenForDump  = false;
	m_waitingForTokenForMerge = false;
	m_isMerging = false;
	m_hasMergeFile = false;
	m_isUnlinking  = false;
	m_numThreads = 0;
	m_checkedForMerge = false;
}

RdbBase::~RdbBase ( ) {
	//close ( NULL , NULL );
	reset();
}

bool RdbBase::init ( char  *dir            ,
		     char  *dbname         ,
		     bool   dedup          ,
		     int32_t   fixedDataSize  ,
		     //uint32_t groupMask      ,
		     //uint32_t groupId        ,
		     int32_t   minToMergeArg     ,
		     //int32_t   maxTreeMem     ,
		     //int32_t   maxTreeNodes   ,
		     //bool   isTreeBalanced ,
		     //int32_t   maxCacheMem    ,
		     //int32_t   maxCacheNodes  ,
		     bool   useHalfKeys    ,
		     char           keySize ,
		     int32_t           pageSize,
		     char          *coll    ,
		     collnum_t      collnum ,
		     RdbTree       *tree    ,
		     RdbBuckets          *buckets ,
		     RdbDump       *dump    ,
		     class Rdb     *rdb    ,
		     void *pc , // DiskPageCache *pc      ,
		     bool           isTitledb            ,
		     bool           preloadDiskPageCache ,
		     bool           biasDiskPageCache    ) {


	m_didRepair = false;
 top:
	// reset all
	reset();
	// sanity
	if ( ! dir ) { char *xx=NULL;*xx=0; }
	// set all our contained classes
	//m_dir.set ( dir );
	// set all our contained classes
	// . "tmp" is bogus
	// . /home/mwells/github/coll.john-test1113.654coll.john-test1113.655
	char tmp[1024];
	sprintf ( tmp , "%scoll.%s.%"INT32"" , dir , coll , (int32_t)collnum );

	// logDebugAdmin
	log(LOG_DEBUG,"db: "
	    "adding new base for dir=%s coll=%s collnum=%"INT32" db=%s",
	    dir,coll,(int32_t)collnum,dbname);

	// catdb is collection independent

	// make a special subdir to store the map and data files in if
	// the db is not associated with a collection. /statsdb /accessdb
	// /facebookdb etc.
	if ( rdb->m_isCollectionLess ) {
		if ( collnum != (collnum_t) 0 ) {
			log("db: collnum not zero for catdb.");
			char *xx = NULL; *xx = 0;
		}
		// make a special "cat" dir for it if we need to
		sprintf ( tmp , "%s%s" , dir , dbname );
		int32_t status = ::mkdir ( tmp , getDirCreationFlags() );
			       // S_IRUSR | S_IWUSR | S_IXUSR | 
			       // S_IRGRP | S_IWGRP | S_IXGRP | 
			       // 		S_IROTH | S_IXOTH );
	        if ( status == -1 && errno != EEXIST && errno )
			return log("db: Failed to make directory %s: %s.",
				   tmp,mstrerror(errno));
	}

	/*
	if ( strcmp ( dbname , "catdb" ) == 0 ) {
	// if ( strcmp ( dbname , "catdb1" ) == 0 ||
	//      strcmp ( dbname , "catdb2" ) == 0 ) {
		// sanity check, ensure we're zero
		if ( collnum != (collnum_t) 0 ) {
			log("db: collnum not zero for catdb.");
			char *xx = NULL; *xx = 0;
		}
		// make a special "cat" dir for it if we need to
		sprintf ( tmp , "%scat" , dir );
		if ( ::mkdir ( tmp ,
			       // S_IRUSR | S_IWUSR | S_IXUSR | 
			       // S_IRGRP | S_IWGRP | S_IXGRP | 
			       // S_IROTH | S_IXOTH ) == -1 && errno != EEXIST )
			return log("db: Failed to make directory %s: %s.",
				   tmp,mstrerror(errno));
	}
	// statsdb is collection independent
	else if ( strcmp ( dbname , "statsdb" ) == 0 ) {
		// sanity check, statsdb should always be zero
		if ( collnum != (collnum_t) 0 ) {
			log ( "db: collnum not zero for statsdb." );
			char *xx = NULL; *xx = 0;
		}
		// make a special "stats" dir for it if necessary
		sprintf ( tmp , "%sstats" , dir );
		if ( ::mkdir ( tmp ,
			       // S_IRUSR | S_IWUSR | S_IXUSR | 
			       // S_IRGRP | S_IWGRP | S_IXGRP | 
			       // S_IROTH | S_IXOTH ) == -1 && errno != EEXIST )
			return log( "db: Failed to make directory %s: %s.",
				    tmp, mstrerror( errno ) );
	}
	// accessdb is collection independent
	else if ( strcmp ( dbname , "accessdb" ) == 0 ) {
		// sanity check, accessdb should always be zero
		if ( collnum != (collnum_t) 0 ) {
			log ( "db: collnum not zero for accessdb." );
			char *xx = NULL; *xx = 0;
		}
		// make a special "stats" dir for it if necessary
		sprintf ( tmp , "%saccess" , dir );
		if ( ::mkdir ( tmp ,
			       // S_IRUSR | S_IWUSR | S_IXUSR | 
			       // S_IRGRP | S_IWGRP | S_IXGRP | 
			       // S_IROTH | S_IXOTH ) == -1 && errno != EEXIST )
			return log( "db: Failed to make directory %s: %s.",
				    tmp, mstrerror( errno ) );
	}
	// syncdb is collection independent
	else if ( strcmp ( dbname , "syncdb" ) == 0 ) {
		// sanity check, syncdb should always be zero
		if ( collnum != (collnum_t) 0 ) {
			log ( "db: collnum not zero for syncdb." );
			char *xx = NULL; *xx = 0;
		}
		// make a special "stats" dir for it if necessary
		sprintf ( tmp , "%ssyncdb" , dir );
		if ( ::mkdir ( tmp ,
			       // S_IRUSR | S_IWUSR | S_IXUSR | 
			       // S_IRGRP | S_IWGRP | S_IXGRP | 
			       // S_IROTH | S_IXOTH ) == -1 && errno != EEXIST )
			return log( "db: Failed to make directory %s: %s.",
				    tmp, mstrerror( errno ) );
	}
	*/

	//m_dir.set ( dir , coll );
	m_dir.set ( tmp );
	m_coll    = coll;
	m_collnum = collnum;
	m_tree    = tree;
	m_buckets = buckets;
	m_dump    = dump;
	m_rdb     = rdb;

	// save the dbname NULL terminated into m_dbname/m_dbnameLen
	m_dbnameLen = gbstrlen ( dbname );
	gbmemcpy ( m_dbname , dbname , m_dbnameLen );
	m_dbname [ m_dbnameLen ] = '\0';
	// set up the dummy file
	//char filename[256];
	//sprintf(filename,"%s-saved.dat",m_dbname);
	//m_dummyFile.set ( getDir() , filename );
	// store the other parameters
	m_dedup            = dedup;
	m_fixedDataSize    = fixedDataSize;
	//m_maxTreeMem       = maxTreeMem;
	m_useHalfKeys      = useHalfKeys;
	m_ks               = keySize;
	m_pageSize         = pageSize;
	//m_pc               = pc;
	m_isTitledb        = isTitledb;
	// we haven't done a dump yet
	//m_lastWrite        = gettimeofdayInMilliseconds();
	//m_groupMask        = groupMask;
	//m_groupId          = groupId;
	// . set up our cache
	// . we could be adding lists so keep fixedDataSize -1 for cache
	//if ( ! m_cache.init ( maxCacheMem   , 
	//		      fixedDataSize , 
	//		      true          , // support lists
	//		      maxCacheNodes ,
	//		      m_useHalfKeys ,
	//		      m_dbname      ,
	//		      loadCacheFromDisk  ) )
	//	return false;
	// we can't merge more than MAX_RDB_FILES files at a time
	if ( minToMergeArg > MAX_RDB_FILES ) minToMergeArg = MAX_RDB_FILES;
	m_minToMergeArg = minToMergeArg;
	// . set our m_files array
	// . m_dir is bogus causing this to fail
	if ( ! setFiles () ) {
		// try again if we did a repair
		if ( m_didRepair ) goto top;
		// if no repair, give up
		return false;
	}
	//int32_t dataMem;
	// if we're in read only mode, don't bother with *ANY* trees
	//if ( g_conf.m_readOnlyMode ) goto preload;
	// . if maxTreeNodes is -1, means auto compute it
	// . set tree to use our fixed data size
	// . returns false and sets g_errno on error
	//if ( ! m_tree.set ( fixedDataSize  , 
	//		    maxTreeNodes   , // max # nodes in tree
	//		    isTreeBalanced , 
	//		    maxTreeMem     ,
	//		    false          , // own data?
	//		    false          , // dataInPtrs?
	//		    m_dbname       ) )
	//	return false;
	// now get how much mem the tree is using (not including stored recs)
	//dataMem = maxTreeMem - m_tree.getTreeOverhead();
	//if ( fixedDataSize != 0 && ! m_mem.init ( &m_dump , dataMem ) ) 
	//	return log("db: Failed to initialize memory: %s.", 
	//		   mstrerror(g_errno));
	// . scan data in memory every minute to prevent swap-out
	// . the wait here is in ms, MIN_WAIT is in seconds
	// . call this every now and then to quickly scan our memory to
	//   prevent swap out
	/*
	static bool s_firstTime = true;
	if ( !g_loop.registerSleepCallback(MIN_WAIT*1000,this,scanMemWrapper)){
		log("RdbMem::init: register sleep callback for scan failed. "
		     "Warning. Mem may be swapped out.");
		g_errno = 0;
	}
	else if ( s_firstTime ) {
		s_firstTime = false;
		log("RdbMem::init: scanning mem every %"INT32" secs with "
		    "PAGE_SIZE = %"INT32"" , MIN_WAIT , PAGE_SIZE );
	}	
	*/
	// create a dummy data string for deleting nodes
	//if ( m_fixedDataSize > 0 ) {
	//	m_dummy = (char *) mmalloc ( m_fixedDataSize ,"RdbDummy" );
	//	m_dummySize = sizeof(key_t);
	//	if ( ! m_dummy ) return false;
	//}
	//m_delRecSize = sizeof(key_t);
	//if ( m_fixedDataSize == -1 ) m_delRecSize += m_dummySize + 4;
	//if ( m_fixedDataSize  >  0 ) m_delRecSize += m_dummySize;
	// load any saved tree
	//if ( ! loadTree ( ) ) return false;

	// now diskpagecache is much simpler, just basically rdbcache...
	return true;

	/*

	// . init BigFile::m_fileSize and m_lastModifiedTime
	// . m_lastModifiedTime is now used by the merge to select older
	//   titledb files to merge
	int64_t bigFileSize[MAX_RDB_FILES];
	for ( int32_t i = 0 ; i < m_numFiles ; i++ ) {
		BigFile *f = m_files[i];
		bigFileSize[i] = f->getFileSize();
		f->getLastModifiedTime();
	}

	// if minimizingDiskSeeks, we need to know what files are going to be
	// stored in the cache
	if ( m_pc && m_numFiles > 0 && m_pc->m_minimizeDiskSeeks ){
		int32_t maxMem = m_pc->getMemMax();
		while ( maxMem > 0 ){
			int32_t minIndex = 0;
			int64_t minFileSize = -1;
			for ( int32_t i = 0; i < m_numFiles; i++ ){
				if ( bigFileSize[i] < 0 ) continue;
				if ( minFileSize < 0 || 
				     minFileSize > bigFileSize[i] ){
					minIndex = i;
					minFileSize = bigFileSize[i];
					// don't use it again
					bigFileSize[i] = -1;
				}
			}
			if ( minFileSize == -1 ) break;
			// mark in that bigFile that it can use diskcache
			BigFile *f = m_files[minIndex];
			f->m_vfdAllowed = true;
			maxMem -= minFileSize;
		}
	}

	// sanity check
	if ( m_pc && m_pc->m_diskPageSize!=m_pageSize) { char *xx=NULL;*xx=0; }
	// now fill up the page cache
	// preload:
	if ( ! preloadDiskPageCache ) return true;
	if ( ! m_pc ) return true;
	char buf [ 512000 ];
	int32_t total = m_pc->getMemMax();
	log(LOG_DEBUG,"db: %s: Preloading page cache. Total mem to use =%"UINT32"",
	     m_dbname,total);
	//log("max=%"INT32"",total);
	for ( int32_t i = 0 ; i < m_numFiles ; i++ ) {
		if ( total <= 0 ) break;
		BigFile *f = m_files[i];
		f->setBlocking();
		int64_t fsize  = f->getFileSize();
		int64_t off    = 0;
		// for biasing, only read part of the file
		if ( biasDiskPageCache ) {
			int32_t numTwins = g_hostdb.getNumHostsPerShard();
			int32_t thisTwin = g_hostdb.m_hostId/g_hostdb.m_numShards;
			off   = (fsize/numTwins) * thisTwin;
			if ( thisTwin < numTwins-1 )
				fsize = (fsize/numTwins) * (thisTwin+1);
		}
		// read the file
		for ( ; off < fsize ; off += 256000 ) {
			if ( total <= 0 ) break;
			int64_t toread = fsize - off;
			if ( toread > 256000 ) toread = 256000;
			f->read ( buf , toread , off );
			total -= toread;
		}
		f->setNonBlocking();
	}
	// reset m_hits/m_misses
	//m_pc->resetStats();

	// now m_minToMerge might have changed so try to do a merge
	//attemptMerge ( );
	// debug test
	//BigFile f;
	//f.set ( "/gigablast3" , "tmp" );
	//f.open ( O_RDWR | O_CREAT );
	//char buf[128*1024*5+10];
	//int32_t n = f.write ( buf , 128*1024*5+10 , 0 );
	//fprintf(stderr,"n=%"INT32"\n",n);
	return true;
	*/
}

// . move all files into trash subdir
// . change name
// . this is part of PageRepair's repair algorithm. all this stuff blocks.
bool RdbBase::moveToTrash ( char *dstDir ) {
	// get current time as part of filename
	//uint32_t t = (uint32_t)getTime();
	// loop over all files
	for ( int32_t i = 0 ; i < m_numFiles ; i++ ) {
		// . rename the map file
		// . get the "base" filename, does not include directory
		BigFile *f ;
		//char dstDir      [1024];
		//sprintf ( dstDir      , "%s" , subdir );
		char dstFilename [1024];
		f = m_maps[i]->getFile();
		sprintf ( dstFilename , "%s" , f->getFilename());
		// ALWAYS log what we are doing
		logf(LOG_INFO,"repair: Renaming %s to %s%s",
		     f->getFilename(),dstDir,dstFilename);
		if ( ! f->rename ( dstFilename , dstDir ) )
			return log("repair: Moving file had error: %s.",
				   mstrerror(errno));
		// move the data file
		f = m_files[i];
		sprintf ( dstFilename , "%s" , f->getFilename());
		// ALWAYS log what we are doing
		logf(LOG_INFO,"repair: Renaming %s to %s%s",
		     f->getFilename(),dstDir,dstFilename);
		if ( ! f->rename ( dstFilename, dstDir  ) )
			return log("repair: Moving file had error: %s.",
				   mstrerror(errno));
	}
	// now just reset the files so we are empty, we should have our
	// setFiles() called again once the newly rebuilt rdb files are
	// renamed, when RdbBase::rename() is called below
	reset();
	// success
	return true;
}

// . newly rebuilt rdb gets renamed to the original, after we call 
//   RdbBase::trash() on the original.
// . this is part of PageRepair's repair algorithm. all this stuff blocks.
bool RdbBase::removeRebuildFromFilenames ( ) {
	// loop over all files
	for ( int32_t i = 0 ; i < m_numFiles ; i++ ) {
		// . rename the map file
		// . get the "base" filename, does not include directory
		BigFile *f = m_files[i];
		// return false if it fails
		//if ( ! removeRebuildFromFilename(f) ) return false;
		// DON'T STOP IF ONE FAILS
		removeRebuildFromFilename(f);
		// rename the map file now too!
		f = m_maps[i]->getFile();
		// return false if it fails
		//if ( ! removeRebuildFromFilename(f) ) return false;
		// DON'T STOP IF ONE FAILS
		removeRebuildFromFilename(f);
	}
	// reset all now
	reset();
	// now PageRepair should reload the original
	return true;
}

bool RdbBase::removeRebuildFromFilename ( BigFile *f ) {
	// get the filename
	char *ff = f->getFilename();
	// copy it
	char buf[1024];
	strcpy ( buf , ff );
	// remove "Rebuild" from it
	char *p = strstr ( buf , "Rebuild" );
	if ( ! p ) return log("repair: Rebuild not found in filename=%s",
			      buf);
	// bury it
	int32_t  rlen = gbstrlen("Rebuild");
	char *end  = buf + gbstrlen(buf);
	int32_t  size = end - (p + rlen);
	// +1 to include the ending \0
	memmove ( p , p + rlen , size + 1 );
	// now rename this file
	logf(LOG_INFO,"repair: Renaming %s to %s",
	     f->getFilename(),buf);
	if ( ! f->rename ( buf ) )
		return log("repair: Rename to %s failed",buf);
	return true;
}

// . this is called to open the initial rdb data and map files we have
// . first file is always the merge file (may be empty)
// . returns false on error
bool RdbBase::setFiles ( ) {
	// set our directory class
	if ( ! m_dir.open ( ) )
		// we are getting this from a bogus m_dir
		return log("db: Had error opening directory %s", getDir());
	// note it
	log(LOG_DEBUG,"db: Loading files for %s coll=%s (%"INT32").",
	     m_dbname,m_coll,(int32_t)m_collnum );
	// . set our m_files array
	// . addFile() will return -1 and set g_errno on error
	// . the lower the fileId the older the data 
	//   (but not necessarily the file)
	// . we now put a '*' at end of "*.dat*" since we may be reading in
	//   some headless BigFiles left over from a killed merge
	char *filename;
	// did we rename any files for titledb?
	bool converting = false;
	// getNextFilename() writes into this
	char pattern[8]; strcpy ( pattern , "*.dat*" );
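	// illustrative examples of names this loop accepts (they mirror
	// the snprintf formats in addFile() below):
	//   posdb0003.dat            - fileId 3 (odd): a finished file
	//   posdb0002.003.dat        - fileId 2 (even): a merge of 3 files
	//   titledb0005-001.dat      - titledb carries a secondary id (id2)
	//   titledb0004-002.003.dat  - a titledb merge file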
	while ( ( filename = m_dir.getNextFilename ( pattern ) ) ) {
		// filename must be a certain length
		int32_t filenameLen = gbstrlen(filename);
		// we need at least "indexdb0000.dat"
		if ( filenameLen < m_dbnameLen + 8 ) continue;
		// ensure filename starts w/ our m_dbname
		if ( strncmp ( filename , m_dbname , m_dbnameLen ) != 0 )
			continue;
		// then a 4 digit number should follow
		char *s = filename + m_dbnameLen;
		if ( ! isdigit(*(s+0)) ) continue;
		if ( ! isdigit(*(s+1)) ) continue;
		if ( ! isdigit(*(s+2)) ) continue;
		if ( ! isdigit(*(s+3)) ) continue;
		// optional 5th digit
		int32_t len = 4;
		if (   isdigit(*(s+4)) ) len = 5;
		// . read that id
		// . older files have lower numbers
		int32_t id = atol2 ( s , len );
		// skip it
		s += len;
		// if we are titledb, we got the secondary id
		int32_t id2 = -1;
		// . if we are titledb we should have a -xxx after
		// . if none is there it needs to be converted!
		if ( m_isTitledb && *s != '-' ) {
			// critical
			log("gb: bad title filename of %s. Halting.",filename);
			g_errno = EBADENGINEER;
			return false;
			// NOTE: the conversion path below is unreachable
			// now that we halt above; kept for reference
			// flag it
			//converting = true;
			// for now, just use the primary id as the secondary id
			//id2 = id;
		}
		else if ( m_isTitledb ) { id2 = atol2 ( s + 1 , 3 ); s += 4; }
		// don't add if already in there
		int32_t i ;
		for ( i = 0 ; i < m_numFiles ; i++ ) 
			if ( m_fileIds[i] >= id ) break;
		if ( i < m_numFiles && m_fileIds[i] == id ) continue;
		// assume no mergeNum
		int32_t mergeNum = -1;
		// if file id is even, we need the # of files being merged
		// otherwise, if it is odd, we do not
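		// (e.g. "posdb0007.dat" has odd id 7, so no suffix to parse;
		// "posdb0006.003.dat" has even id 6 and was merging 3 files)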
		if ( (id & 0x01) == 0x01 ) goto addIt;
		if ( *s != '.' ) continue; 
		s++;
		if ( ! isdigit(*(s+0)) ) continue;
		if ( ! isdigit(*(s+1)) ) continue;
		if ( ! isdigit(*(s+2)) ) continue;
		mergeNum = atol2 ( s , 3 );
 addIt:

		// sometimes an unlink() does not complete properly and we
		// end up with remnant files that are 0 bytes. so let's skip
		// those guys.
		File ff;
		ff.set ( m_dir.getDir() , filename );
		// does this file exist?
		int32_t exists = ff.doesExist() ;
		// core if does not exist (sanity check)
		if ( exists == 0 ) 
			return log("db: File %s does not exist.",filename);
		// bail on error calling ff.doesExist()
		if ( exists == -1 ) return false;
		// skip if 0 bytes or had error calling ff.getFileSize()
		if ( ff.getFileSize() == 0 ) {
			// actually, we have to move to trash because
			// if we leave it there and we start writing
			// to that file id, exit, then restart, it
			// causes problems...
			char src[1024];
			char dst[1024];
			sprintf ( src , "%s/%s",m_dir.getDir(),filename);
			sprintf ( dst , "%s/trash/%s",
				  g_hostdb.m_dir,filename);
			log("db: Moving file %s/%s of 0 bytes into trash "
			    "subdir. rename %s to %s", m_dir.getDir(),filename,
			    src,dst);
			if ( ::rename ( src , dst ) ) 
				return log("db: Moving file had error: %s.",
					   mstrerror(errno));
			continue;
		}

		// . put this file into our array of files/maps for this db
		// . MUST be in order of fileId for merging purposes
		// . we assume older files come first so newer can override
		//   in RdbList::merge() routine
		if (addFile(id,false/*newFile?*/,mergeNum,id2,converting) < 0) 
			return false;
	}

	// everyone should start with file 0001.dat or 0000.dat
	if ( m_numFiles > 0 && m_fileIds[0] > 1 && m_rdb->m_rdbId == RDB_SPIDERDB ) {
		log("db: missing file id 0001.dat for %s in coll %s. "
		    "Fix this or it'll core later. Just rename the next file "
		    "in line to 0001.dat/map. We probably cored at a "
		    "really bad time during the end of a merge process.",
		    m_dbname, m_coll );

		// do not re-do repair! hmmm
		if ( m_didRepair ) return false;

		// just fix it for them
		BigFile bf;
		SafeBuf oldName;
		oldName.safePrintf("%s%04"INT32".dat",m_dbname,m_fileIds[0]);
		bf.set ( m_dir.getDir() , oldName.getBufStart() );

		// rename it to like "spiderdb.0001.dat"
		SafeBuf newName;
		newName.safePrintf("%s/%s0001.dat",m_dir.getDir(),m_dbname);
		bf.rename ( newName.getBufStart() );

		// and delete the old map
		SafeBuf oldMap;
		oldMap.safePrintf("%s/%s0001.map",m_dir.getDir(),m_dbname);
		File omf;
		omf.set ( oldMap.getBufStart() );
		omf.unlink();

		// get the map file name we want to move to 0001.map
		BigFile cmf;
		SafeBuf curMap;
		curMap.safePrintf("%s%04"INT32".map",m_dbname,m_fileIds[0]);
		cmf.set ( m_dir.getDir(), curMap.getBufStart());

		// rename e.g. spiderdb0081.map to spiderdb0001.map
		cmf.rename ( oldMap.getBufStart() );

		// replace that first file then. return false so init()
		// sees the failure, checks m_didRepair and retries setFiles()
		m_didRepair = true;
		return false;
		//char *xx=NULL; *xx=0;
	}


	m_dir.close();

	// ensure files are sharded correctly
	verifyFileSharding();

	if ( ! converting ) return true;

	// now if we are converting old titledb names to new...
	for ( int32_t i = 0 ; i < m_numFiles ; i++ ) {
		// . get the actual secondary to correspond with tfndb
		// . generate secondary id from primary id
		m_fileIds2[i] = (m_fileIds[i] - 1)/ 2;
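		// (e.g. primary id 3 -> secondary id 1, id 5 -> 2)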
		// . titledbRebuild? use m_dbname
		// . rename it
		char buf1[1024];
		sprintf ( buf1, "%s%04"INT32".dat", m_dbname,m_fileIds[i] );
		BigFile f;
		f.set ( g_hostdb.m_dir , buf1 );
		char buf2[1024];
		sprintf ( buf2 , "%s%04"INT32"-%03"INT32".dat",
			  m_dbname, m_fileIds[i] , m_fileIds2[i] );
		// log it
		log(LOG_INFO,"db: Renaming %s to %s",buf1,buf2);
		if ( ! f.rename ( buf2 ) )
			return log("db: Rename had error: %s.",
				   mstrerror(g_errno));
	}
	// otherwise, success
	return log("db: Rename succeeded.");
}

// return the fileNum we added it to in the array
// return -1 and set g_errno on error
int32_t RdbBase::addFile ( int32_t id , bool isNew , int32_t mergeNum , 
			   int32_t id2 , bool converting ) {

	int32_t n = m_numFiles;
	// can't exceed this
	if ( n >= MAX_RDB_FILES ) { 
		g_errno = ETOOMANYFILES; 
		log(LOG_LOGIC,
		    "db: Can not have more than %"INT32" files. File add "
		    "failed.",(int32_t)MAX_RDB_FILES);
		return -1;
	}

	// HACK: skip to avoid an OOM lockup. if RdbBase cannot dump
	// its data to disk it can backlog everyone and memory will
	// never get freed up.
	int64_t mm = g_conf.m_maxMem;
	g_conf.m_maxMem = 0x0fffffffffffffffLL;
	BigFile *f ;

 tryAgain:

	try { f = new (BigFile); }
	catch ( ...  ) { 
		g_conf.m_maxMem = mm;
		g_errno = ENOMEM;
		log("RdbBase: new(%i): %s", 
		    (int)sizeof(BigFile),mstrerror(g_errno));
		return -1; 
	}
	mnew ( f , sizeof(BigFile) , "RdbBFile" );

	// set the data file's filename
	char name[512];
	if      ( mergeNum <= 0 && m_isTitledb )
		snprintf(name,511,"%s%04"INT32"-%03"INT32".dat",
			 m_dbname,id,id2 );
	else if ( mergeNum <= 0 )
		snprintf ( name ,511,"%s%04"INT32".dat"      , m_dbname, id );
	else if ( m_isTitledb )
		snprintf ( name ,511,"%s%04"INT32"-%03"INT32".%03"INT32".dat",
			  m_dbname, id , id2, mergeNum );
	else
		snprintf(name,511,"%s%04"INT32".%03"INT32".dat",
			m_dbname,id,mergeNum);

	f->set ( getDir() , name , NULL ); // getStripeDir() );

	// if new ensure does not exist
	if ( isNew && f->doesExist() ) {
		log("rdb: creating NEW file %s/%s which already exists!",
		    f->getDir(),
		    f->getFilename());
		// if length is NOT 0 then that sucks, just return
		bool isEmpty = ( f->getFileSize() == 0 );
		// move to trash if empty
		if ( isEmpty ) {
			// otherwise, move it to the trash
			SafeBuf cmd;
			cmd.safePrintf("mkdir %s/trash ; mv %s/%s %s/trash/",
				       g_hostdb.m_dir,
				       f->getDir(),
				       f->getFilename(),
				       g_hostdb.m_dir);
			log("rdb: %s",cmd.getBufStart() );
			gbsystem ( cmd.getBufStart() );
		}
		// nuke it either way
		mdelete ( f , sizeof(BigFile),"RdbBFile");
		delete (f); 
		// if it was not empty, we are done. i guess merges
		// will stockpile, that sucks... things will slow down
		if ( ! isEmpty ) {
			// reinstate the memory limit before bailing
			g_conf.m_maxMem = mm;
			return -1;
		}
		// ok, now try again since bogus file is gone
		goto tryAgain;
	}


	RdbMap  *m ;
	try { m = new (RdbMap); }
	catch ( ... ) { 
		g_conf.m_maxMem = mm;
		g_errno = ENOMEM;
		log("RdbBase: new(%i): %s", 
		    (int)sizeof(RdbMap),mstrerror(g_errno));
		mdelete ( f , sizeof(BigFile),"RdbBFile");
		delete (f); 
		return -1; 
	}
	mnew ( m , sizeof(RdbMap) , "RdbBMap" );
	// reinstate the memory limit
	g_conf.m_maxMem = mm;
	// sanity check
	if ( id2 < 0 && m_isTitledb ) { char *xx = NULL; *xx = 0; }

	CollectionRec *cr = NULL;

	// if we're converting, just add to m_fileIds and m_fileIds2
	if ( converting ) {
	       log("*-*-*-* Converting titledb files to new file name format");
	       goto skip;
	}


	// debug help
	if ( isNew )
		log("rdb: adding new file %s/%s",// m_numFiles=%"INT32"",
		    f->getDir(),f->getFilename());//,m_numFiles);

	// rename bug fix?
	/*
	if ( ! isNew && id != 1 && m_isTitledb && m_rdb->m_rdbId == RDB_TITLEDB ) {
		char newf[10245];
		sprintf(newf,"%s%04"INT32"-%03"INT32".dat",
			"titledbRebuild",id,id2);
		log("cmd: f->rename(%s%04"INT32"-%03"INT32".dat,%s)",m_dbname,id,id2,newf);
		g_conf.m_useThreads = 0;
		f->rename ( newf );
		g_conf.m_useThreads = 1;
		exit(-1);
	}
	*/

	// if not a new file sanity check it
	for ( int32_t j = 0 ; ! isNew && j < f->m_maxParts - 1 ; j++ ) {
		// might be headless
		File *ff = f->getFile2(j);//m_files[j];
		if ( ! ff ) continue;
		if ( ff->getFileSize() == MAX_PART_SIZE ) continue;
		log ( "db: File %s/%s has length %"INT64", but it should be %"INT64". "
		      "You should move it to a temporary directory "
		      "and restart. It probably happened when the power went "
		      "out and a file delete operation failed to complete.",
		      f->getDir(),
		      ff->getFilename() ,
		      (int64_t)ff->getFileSize(),
		      (int64_t)MAX_PART_SIZE);
		exit(0);
		return -1;
	}

	// set the map file's  filename
	sprintf ( name , "%s%04"INT32".map", m_dbname, id );
	m->set ( getDir() , name , m_fixedDataSize , m_useHalfKeys , m_ks ,
		 m_pageSize );
	if ( ! isNew && ! m->readMap ( f ) ) { 
		// if out of memory, do not try to regen for that
		if ( g_errno == ENOMEM ) return -1;
		g_errno = 0;
		log("db: Could not read map file %s",name);
		// if 'gb dump X collname' was called, bail, we do not
		// want to write any data
		if ( g_dumpMode ) return -1; 
		log("db: Attempting to generate map file for data "
		    "file %s* of %"INT64" bytes. May take a while.",
		    f->getFilename(), f->getFileSize() );
		// debug hack for now
		//exit(-1);
		// this returns false and sets g_errno on error
		if ( ! m->generateMap ( f ) ) {
			log("db: Map generation failed.");
			log("db: Moving .dat and .map file to trash dir");
			SafeBuf tmp;
			tmp.safePrintf("%s",f->getFilename());
			// take off .dat and make it * so we can move map file
			int32_t len = tmp.getLength();
			char *str = tmp.getBufStart();
			str[len-3] = '*';
			str[len-2] = '\0';
			SafeBuf cmd;
			cmd.safePrintf("mv %s/%s %s/trash/",
				       m_dir.getDir(),
				       str,
				       g_hostdb.m_dir);
			log("db: %s",cmd.getBufStart() );
			gbsystem ( cmd.getBufStart() );
			exit(0);
			mdelete ( f , sizeof(BigFile),"RdbBase");
			delete (f); 
			mdelete ( m , sizeof(RdbMap),"RdbBase");
			delete (m); 
			return -1; 
		}
		log("db: Map generation succeeded.");
		// . save it
		// . if we're an even #'d file a merge will follow
		//   when main.cpp calls attemptMerge()
		log("db: Saving generated map file to disk.");
		// turn off statsdb so it does not try to add records for 
		// these writes because it is not initialized yet and will
		// cause this write to fail!
		g_statsdb.m_disabled = true;
		// true = alldone
		bool status = m->writeMap( true );
		g_statsdb.m_disabled = false;
		if ( ! status ) return log("db: Save failed.");
	}

	if ( ! isNew ) log(LOG_DEBUG,"db: Added %s for collnum=%"INT32" pages=%"INT32"",
			    name ,(int32_t)m_collnum,m->getNumPages());

	// open this big data file for reading only
	if ( ! isNew ) {
		if ( mergeNum < 0 ) 
			f->open ( O_RDONLY | O_NONBLOCK | O_ASYNC , NULL );
		// otherwise, merge will have to be resumed so this file
		// should be writable
		else
			f->open ( O_RDWR | O_NONBLOCK | O_ASYNC , NULL );//pc
	}
 skip:
	// find the position to add so we maintain order by fileId
	int32_t i ;
	for ( i = 0 ; i < m_numFiles ; i++ ) if ( m_fileIds[i] >= id ) break;
	// cannot collide here
	if ( i < m_numFiles && m_fileIds[i] == id ) { 
		log(LOG_LOGIC,"db: addFile: fileId collided."); return -1; }
	// shift everyone up if we need to fit this file in the middle somewhere
	if ( i < m_numFiles ) {
		int nn = m_numFiles-i;
		memmove ( &m_files  [i+1] , &m_files[i],nn*sizeof(BigFile *));
		memmove ( &m_fileIds[i+1] , &m_fileIds[i],nn*sizeof(int32_t));
		memmove ( &m_fileIds2[i+1], &m_fileIds2[i],nn*sizeof(int32_t));
		memmove ( &m_maps   [i+1] , &m_maps   [i],nn*sizeof(RdbMap *));
	}
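	// (e.g. inserting id 5 into fileIds {1,3,7}: i=2, so the {7}
	// entry shifts up and we end up with {1,3,5,7})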

	// insert this file into position #i
	m_fileIds  [i] = id;
	m_fileIds2 [i] = id2;
	m_files    [i] = f;
	m_maps     [i] = m;

	// debug point
	//log("map #0 is %s ptr=%llx (nf=%i)",
	//    m_maps[0]->getFilename(),(long long)m_maps[0],m_numFiles);

	// to free up mem for diffbot's many collections...
	cr = g_collectiondb.getRec ( m_collnum );
	if ( ! isNew && cr && cr->m_isCustomCrawl )
		m->reduceMemFootPrint();

	// are we resuming a killed merge?
	if ( g_conf.m_readOnlyMode && ((id & 0x01)==0) ) {
		log("db: Cannot start in read only mode with an incomplete "
		    "merge, because we might be a temporary cluster and "
		    "the merge might be active.");
		exit(-1);
	}

	// inc # of files we have
	m_numFiles++;
	// debug note
	//log("rdb: numFiles=%"INT32" for collnum=%"INT32" db=%s",
	//    m_numFiles,(int32_t)m_collnum,m_dbname);
	// keep it NULL terminated
	m_files [ m_numFiles ] = NULL;
	// if we added a merge file, mark it
	if ( mergeNum >= 0 ) {
		m_hasMergeFile      = true;
		m_mergeStartFileNum = i + 1 ; //merge was starting w/ this file
	}
	return i;
}

/*
// this is obsolete now
// returns -1 and sets g_errno if none available
int32_t RdbBase::getAvailId2 ( ) {
	// . otherwise we're titledb, find an available second id
	// . store id2s here as we find them
	char f[MAX_RDB_FILES]; 
	memset ( f , 0 , MAX_RDB_FILES );
	int32_t i;
	for ( i = 0 ; i < m_numFiles ; i++ ) {
		// sanity check
		if ( m_fileIds2[i] < 0 || m_fileIds2[i] >= MAX_RDB_FILES ) {
			char *xx = NULL; *xx = 0; }
		// flag it as used
		f [ m_fileIds2[i] ] = 1;
	}
	// find the first available one
	for ( i = 0 ; i < MAX_RDB_FILES ; i++ ) if ( f[i] == 0 ) break;
	// . error if none! return -1 on error
	// . 255 is reserved in tfndb for urls just in spiderdb or in 
	//   titledb tree
	if ( i >= MAX_RDB_FILES || i >= 255 ) {
		log(LOG_LOGIC,"db: No more secondary ids available for "
		    "new titledb file. All in use. This should never happen. "
		    "Are your hard drives extremely slow?");
		return -1;
	}
	return i;
}
*/

int32_t RdbBase::addNewFile ( int32_t id2 ) {

	int32_t fileId = 0;
	for ( int32_t i = 0 ; i < m_numFiles ; i++ )
		if ( m_fileIds[i] >= fileId ) fileId = m_fileIds[i] + 1;
	// . if not odd number then add one
	// . we like to keep even #'s for merge file names
	if ( (fileId & 0x01) == 0 ) fileId++;
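	// (e.g. with fileIds {1,3,5} the candidate id is 6, bumped to 7;
	// even ids stay reserved for merge files)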
	// otherwise, set it
	return addFile(fileId,true/*a new file?*/,-1 /*mergeNum*/,id2 );
}

static void doneWrapper ( void *state ) ;

// . called after the merge has successfully completed
// . the final merge file is always file #0 (i.e. "indexdb0000.dat/map")
bool RdbBase::incorporateMerge ( ) {
	// some shorthand variable notation
	int32_t a = m_mergeStartFileNum;
	int32_t b = m_mergeStartFileNum + m_numFilesToMerge;
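	// layout: [ ... , x = a-1 (the merge file) , a .. b-1 (the source
	// files that were merged into it) , ... ]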
	// shouldn't be called if no files merged
	if ( a == b ) {
		// unless resuming after a merge completed and we exited
		// but forgot to finish renaming the final file!!!!
		log("merge: renaming final file");
		// decrement this count
		if ( m_isMerging ) m_rdb->m_numMergesOut--;
		// exit merge mode
		m_isMerging = false;
		// return the merge token, no need for a callback
		//g_msg35.releaseToken ( );
		//return true; 
	}
	// file #x is the merge file
	int32_t x = a - 1; 
	// . we can't just unlink the merge file on error anymore
	// . it may have some data that was deleted from the original file
	if ( g_errno ) {
		log("db: Merge failed for %s, Exiting.", m_dbname);
		// print mem table
		//g_mem.printMem();
		// we don't have a recovery system in place, so save state
	        // and dump core
		char *p = NULL;
		*p = 0;
		//m_isMerging = false;
		// return the merge token, no need for a callback
		//g_msg35.releaseToken ( );
		return true;
	}
	// note
	log(LOG_INFO,"db: Writing map %s.",m_maps[x]->getFilename());
	// . ensure we can save the map before deleting other files
	// . sets g_errno and return false on error
	// . allDone = true
	m_maps[x]->writeMap( true );

	// tfndb has his own merge class since titledb merges write tfndb recs
	RdbMerge *m = &g_merge;
	//if ( m_rdb == g_tfndb.getRdb() ) m = &g_merge2;

	// print out info of newly merged file
	int64_t tp = m_maps[x]->getNumPositiveRecs();
	int64_t tn = m_maps[x]->getNumNegativeRecs();
	log(LOG_INFO,
	    "merge: Merge succeeded. %s (#%"INT32") has %"INT64" positive "
	     "and %"INT64" negative recs.", m_files[x]->getFilename(), x, tp, tn);
	if ( m_rdb == g_posdb.getRdb() ) // || m_rdb == g_tfndb.getRdb() )
		log(LOG_INFO,"merge: Removed %"INT64" dup keys.",
		     m->getDupsRemoved() );
	// . bitch if bad news
	// . sanity checks to make sure we didn't mess up our data from merging
	// . these cause a seg fault on problem, seg fault should save and
	//   dump core. sometimes we'll have a chance to re-merge the 
	//   candidates to see what caused the problem.
	// . only do these checks for indexdb, titledb can have key overwrites
	//   and we can lose positives due to overwrites
	// . i just re-added some partially indexed urls so indexdb will
	//   have dup overwrites now too!!
	if ( tp > m_numPos ) {
		log(LOG_INFO,"merge: %s gained %"INT64" positives.",
		     m_dbname , tp - m_numPos );
			//char *xx = NULL; *xx = 0;
	}
	if ( tp < m_numPos - m_numNeg ) {
		log(LOG_INFO,"merge: %s: lost %"INT64" positives",
		     m_dbname , m_numPos - tp );
		//char *xx = NULL; *xx = 0;
	}
	if ( tn > m_numNeg ) {
		log(LOG_INFO,"merge: %s: gained %"INT64" negatives.",
		     m_dbname , tn - m_numNeg );
		//char *xx = NULL; *xx = 0;
	}
	if ( tn < m_numNeg - m_numPos ) {
		log(LOG_INFO,"merge: %s: lost %"INT64" negatives.",
		     m_dbname , m_numNeg - tn );
		//char *xx = NULL; *xx = 0;
	}

	// assume no unlinks blocked
	m_numThreads = 0;

	// . before unlinking the files, ensure merged file is the right size!!
	// . this will save us some anguish
	m_files[x]->m_fileSize = -1;
	int64_t fs = m_files[x]->getFileSize();
	// get file size from map
	int64_t fs2 = m_maps[x]->getFileSize();
	// compare; if it is off by only a key, allow that. that is an
	// artifact of generating a map for a file screwed up by a power
	// outage. it will end on a non-key boundary.
	if ( fs != fs2 ) {
		log("build: Map file size does not agree with actual file "
		    "size for %s. Map says it should be %"INT64" bytes but it "
		    "is %"INT64" bytes.", 
		    m_files[x]->getFilename(), fs2 , fs );
		if ( fs2-fs > 12 || fs-fs2 > 12 ) { char *xx = NULL; *xx = 0; }
		// now print the exception
		log("build: continuing since difference is less than 12 "
		    "bytes. Most likely a discrepancy caused by a power "
		    "outage and the generated map file is off a bit.");
	}

	// on success unlink the files we merged and free them
	for ( int32_t i = a ; i < b ; i++ ) {
		// in case we are starting with just the
		// linkdb0001.003.dat file and not the stuff we merged
		if ( ! m_files[i] ) continue;
		// debug msg
		log(LOG_INFO,"merge: Unlinking merged file %s/%s (#%"INT32").",
		    m_files[i]->getDir(),m_files[i]->getFilename(),i);
		// . append it to "sync" state we have in memory
		// . when host #0 sends a OP_SYNCTIME signal we dump to disk
		//g_sync.addOp ( OP_UNLINK , m_files[i] , 0 );
		// . these links will be done in a thread
		// . they will save the filename before spawning so we can
		//   delete the m_files[i] now
		if ( ! m_files[i]->unlink ( doneWrapper , this ) ) {
			m_numThreads++; g_numThreads++; }
		// debug msg
		// MDW this cores if file is bad... if collection
		// got deleted from under us i guess!!
		else log(LOG_INFO,"merge: Unlinked %s (#%"INT32").",
			 m_files[i]->getFilename(),i);
		// debug msg
		log(LOG_INFO,"merge: Unlinking map file %s (#%"INT32").",
		     m_maps[i]->getFilename(),i);
		if ( ! m_maps[i]->unlink  ( doneWrapper , this ) ) {
			m_numThreads++; g_numThreads++; }
			// debug msg
		else log(LOG_INFO,"merge: Unlinked %s (#%"INT32").",
			 m_maps[i]->getFilename(),i);
		//if ( i == a ) continue;
		//delete (m_files[i]);
		//delete (m_maps[i]);
	}

	// save for re-use
	m_x = x;
	m_a = a;
	// save the merge file name so we can unlink from sync table later
	strncpy ( m_oldname , m_files[x]->getFilename() , 254 );
	// . let sync table know we closed the old file we merged to
	// . no, keep it around until after the rename finishes
	//g_sync.addOp ( OP_CLOSE , m_files[x] , 0 );	

	// wait for the above unlinks to finish before we do this rename
	// otherwise, we might end up doing this rename first and deleting
	// it!
	
	// if we blocked on all, keep going
	if ( m_numThreads == 0 ) { doneWrapper2 ( ); return true; }
	// . otherwise we blocked
	// . we are now unlinking
	// . this is so Msg3.cpp can avoid reading the [a,b) files
	m_isUnlinking = true;

	return true;
}

void doneWrapper ( void *state ) {
	RdbBase *THIS = (RdbBase *)state;
	log("merge: done unlinking file. #threads=%"INT32"",THIS->m_numThreads);
	THIS->doneWrapper2 ( );
}

static void doneWrapper3 ( void *state ) ;

void RdbBase::doneWrapper2 ( ) {
	// bail if waiting for more to come back
	if ( m_numThreads > 0 ) {
		g_numThreads--;
		if ( --m_numThreads > 0 ) return;
	}

	// debug msg
	log (LOG_INFO,"merge: Done unlinking all files.");

	// could be negative if all did not block
	m_numThreads = 0;

	int32_t x = m_x;
	int32_t a = m_a;
	// . the fileId of the merge file becomes that of a
	// . but secondary id should remain the same
	m_fileIds [ x ] = m_fileIds [ a ];
	if ( ! m_maps [x]->rename(m_maps[a]->getFilename(),doneWrapper3,this)){
		m_numThreads++; g_numThreads++; }

	// sanity check
	m_files[x]->m_fileSize = -1;
	int64_t fs = m_files[x]->getFileSize();
	// get file size from map
	int64_t fs2 = m_maps[x]->getFileSize();
	// compare
	if ( fs != fs2 ) {
		log("build: Map file size does not agree with actual file "
		    "size");
		char *xx = NULL; *xx = 0;
	}

	if ( ! m_isTitledb ) {
		// we are kind of opening this up now for writing
		//g_sync.addOp ( OP_OPEN  , m_files[a]->getFilename() , 0 );
		// debug statement
		log(LOG_INFO,"db: Renaming %s of size %"INT64" to %s",
		    m_files[x]->getFilename(),fs , m_files[a]->getFilename());
		// rename it, this may block
		if ( ! m_files[x]->rename ( m_files[a]->getFilename() ,
					    doneWrapper3 , this ) ) {
			m_numThreads++; g_numThreads++; }
	}
	else {
		// rename to this (titledb%04"INT32"-%03"INT32".dat)
		char buf [ 1024 ];
		// use m_dbname in case its titledbRebuild
		sprintf ( buf , "%s%04"INT32"-%03"INT32".dat" , 
			  m_dbname, m_fileIds[a], m_fileIds2[x] );
		// we are kind of opening this up now for writing
		//g_sync.addOp ( OP_OPEN  , buf , 0 );
		// rename it, this may block
		if ( ! m_files   [ x ]->rename ( buf , doneWrapper3, this ) ) {
			m_numThreads++; g_numThreads++; }
	}
	// if we blocked on all, keep going
	if ( m_numThreads == 0 ) { doneWrapper4 ( ); return ; }
	// . otherwise we blocked
	// . we are now unlinking
	// . this is so Msg3.cpp can avoid reading the [a,b) files
	m_isUnlinking = true;
}


void doneWrapper3 ( void *state ) {
	RdbBase *THIS = (RdbBase *)state;
	log("rdb: thread completed rename operation for collnum=%"INT32" "
	    "#thisbaserenamethreads=%"INT32"",
	    (int32_t)THIS->m_collnum,THIS->m_numThreads-1);
	THIS->doneWrapper4 ( );
}

static void checkThreadsAgainWrapper ( int fd , void *state ) {
	RdbBase *THIS = (RdbBase *)state;
	g_loop.unregisterSleepCallback ( state,checkThreadsAgainWrapper);
	THIS->doneWrapper4 ( );
}

void RdbBase::doneWrapper4 ( ) {
	// bail if waiting for more to come back
	if ( m_numThreads > 0 ) {
		g_numThreads--;
		if ( --m_numThreads > 0 ) return;
	}

	// some shorthand variable notation
	int32_t a = m_mergeStartFileNum;
	int32_t b = m_mergeStartFileNum + m_numFilesToMerge;

	//
	// wait for all threads accessing this bigfile to go bye-bye
	//
	log("db: checking for outstanding read threads on unlinked files");
	bool wait = false;
	for ( int32_t i = a ; i < b ; i++ ) {
		BigFile *bf = m_files[i];
		if ( g_threads.isHittingFile(bf) ) wait = true;
	}
	if ( wait ) {
		log("db: waiting for read thread to exit on unlinked file");
		if (!g_loop.registerSleepCallback(100,this,
						  checkThreadsAgainWrapper)){
			char *xx=NULL;*xx=0; }
		return;
	}


	// . we are no longer unlinking
	// . this is so Msg3.cpp can avoid reading the [a,b) files
	m_isUnlinking = false;
	// file #x is the merge file
	//int32_t x = a - 1; 
	// rid ourselves of these files
	buryFiles ( a , b );
	// sanity check
	if ( m_numFilesToMerge != (b-a) ) {
		log(LOG_LOGIC,"db: Bury oops."); char *xx = NULL; *xx = 0; }
	// we no longer have a merge file
	m_hasMergeFile = false;
	// now unset m_mergeUrgent if we're close to our limit
	if ( m_mergeUrgent && m_numFiles - 14 < m_minToMerge ) {
		m_mergeUrgent = false;
		if ( g_numUrgentMerges > 0 ) g_numUrgentMerges--;
		if ( g_numUrgentMerges == 0 )
			log(LOG_INFO,"merge: Exiting urgent "
			    "merge mode for %s.",m_dbname);
	}
	// decrement this count
	if ( m_isMerging ) m_rdb->m_numMergesOut--;
	// exit merge mode
	m_isMerging = false;
	// return the merge token, no need for a callback
	//g_msg35.releaseToken ( );
	// the rename has completed at this point, so tell sync table in mem
	//g_sync.addOp ( OP_CLOSE , m_files[x] , 0 );
	// unlink old merge filename from sync table
	//g_sync.addOp ( OP_UNLINK , m_oldname , 0 );
	// . now in case dump dumped many files while we were merging
	//   we should see if they need to be merged now
	// . Msg35 may bitch if our get request arrives before our release
	//   request! he may think we're a dup request but that should not
	//   happen since we don't allow getToken() to be called if we are
	//   merging or have already made a request for it.
	//attemptMerge ( 1/*niceness*/ , false /*don't force it*/ ) ;
	// try all in case they were waiting (and not using tokens)
	//g_tfndb.getRdb()->attemptMerge      ( 1 , false );
	/*
	g_clusterdb.getRdb()->attemptMerge  ( 1 , false );
	g_linkdb.getRdb()->attemptMerge     ( 1 , false );
	//g_sectiondb.getRdb()->attemptMerge  ( 1 , false );
	g_tagdb.getRdb()->attemptMerge      ( 1 , false );
	//g_checksumdb.getRdb()->attemptMerge ( 1 , false );
	g_titledb.getRdb()->attemptMerge    ( 1 , false );
	g_doledb.getRdb()->attemptMerge     ( 1 , false );
	g_catdb.getRdb()->attemptMerge      ( 1 , false );
	//g_clusterdb.getRdb()->attemptMerge  ( 1 , false );
	g_statsdb.getRdb()->attemptMerge    ( 1 , false );
	g_syncdb.getRdb()->attemptMerge     ( 1 , false );
	g_cachedb.getRdb()->attemptMerge     ( 1 , false );
	g_serpdb.getRdb()->attemptMerge     ( 1 , false );
	g_monitordb.getRdb()->attemptMerge     ( 1 , false );
	//g_linkdb.getRdb()->attemptMerge     ( 1 , false );
	//g_indexdb.getRdb()->attemptMerge    ( 1 , false );
	g_posdb.getRdb()->attemptMerge    ( 1 , false );
	//g_datedb.getRdb()->attemptMerge     ( 1 , false );
	g_spiderdb.getRdb()->attemptMerge   ( 1 , false );
	*/

	// try to merge more when we are done
	attemptMergeAll2 ( );
}

void RdbBase::buryFiles ( int32_t a , int32_t b ) {
	// on success unlink the files we merged and free them
	for ( int32_t i = a ; i < b ; i++ ) {
		mdelete ( m_files[i] , sizeof(BigFile),"RdbBase");
		delete (m_files[i]);
		mdelete ( m_maps[i] , sizeof(RdbMap),"RdbBase");
		delete (m_maps [i]);
	}
	// bury the merged files
	int32_t n = m_numFiles - b;
	gbmemcpy (&m_files   [a], &m_files   [b], n*sizeof(BigFile *));
	gbmemcpy (&m_maps    [a], &m_maps    [b], n*sizeof(RdbMap  *));
	gbmemcpy (&m_fileIds [a], &m_fileIds [b], n*sizeof(int32_t     ));
	gbmemcpy (&m_fileIds2[a], &m_fileIds2[b], n*sizeof(int32_t     ));
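	// (e.g. a=1, b=4, m_numFiles=6: {F0,F1,F2,F3,F4,F5} -> {F0,F4,F5})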
	// decrement the file count appropriately
	m_numFiles -= (b-a);
	// sanity
	log("rdb: bury files: numFiles now %"INT32" (b=%"INT32" a=%"INT32" collnum=%"INT32")",
	    m_numFiles,b,a,(int32_t)m_collnum);
	// ensure last file is NULL (so BigFile knows the end of m_files)
	m_files [ m_numFiles ] = NULL;
}

/*
void attemptMergeWrapper ( int fd , void *state ) {
	Rdb *THIS = (Rdb *) state;
	// keep sleeping if someone else still merging
	if ( g_merge.isMerging() ) return;
	// if no one else merging, stop the sleep-retry cycle
	//g_loop.unregisterSleepCallback ( state , attemptMergeWrapper );
	// now it's our turn
	THIS->attemptMerge ( 1, THIS->m_nextMergeForced );// 1=niceness
}
*/

//static void gotTokenForMergeWrapper ( void *state ) ;

// . the DailyMerge.cpp will set minToMergeOverride for titledb, and this
//   overrides "forceMergeAll" which is the same as setting 
//   "minToMergeOverride" to "2". (i.e. perform a merge if you got 2 or more 
//   files)
// . now return true if we started a merge, false otherwise
// . TODO: fix Rdb::attemptMergeAll() to not remove from linked list if
//   we had an error in addNewFile() or rdbmerge.cpp's call to rdbbase::addFile
bool RdbBase::attemptMerge ( int32_t niceness, bool forceMergeAll, bool doLog ,
			     int32_t minToMergeOverride ) {

	// don't do merge if we're in read only mode
	if ( g_conf.m_readOnlyMode ) return false;
	// or if we are copying our files to a new host
	//if ( g_hostdb.m_syncHost == g_hostdb.m_myHost ) return;
	// nor if EITHER of the merge classes are suspended
	if ( g_merge.m_isSuspended  ) return false;
	if ( g_merge2.m_isSuspended ) return false;

	// shutting down? do not start another merge then
	if ( g_process.m_mode == EXIT_MODE ) return false;

	// sanity checks
	if (   g_loop.m_inQuickPoll ) { 
		log("rdb: cant attempt merge in quickpoll");
		return false;
	}

	if (   niceness == 0 ) { char *xx=NULL;*xx=0; }

	if ( forceMergeAll ) m_nextMergeForced = true;

	if ( m_nextMergeForced ) forceMergeAll = true;

	if ( forceMergeAll )
		log(LOG_INFO,"merge: forcing merge for "
		    "for %s. (collnum=%"INT32")",m_dbname,(int32_t)m_collnum);

	// if we are trying to merge titledb but a titledb dump is going on
	// then do not do the merge, we do not want to overwrite tfndb via
	// RdbDump::updateTfndbLoop() 
	char rdbId = getIdFromRdb ( m_rdb );
	if ( rdbId == RDB_TITLEDB && g_titledb.m_rdb.m_dump.isDumping() ) {
		if ( doLog ) 
			log(LOG_INFO,"db: Can not merge titledb while it "
			    "is dumping.");
		return false;
	}

	// or if in repair mode, do not mess with any files in any coll
	// unless they are secondary rdbs... allow tagdb to merge, too.
	// secondary rdbs are like g_indexdb2, and should be merged. usually
	// when adding to g_indexdb, it is in a different collection for a 
	// full rebuild and we are not actively searching it so it doesn't
	// need to be merged.
	/*
	if ( g_repair.isRepairActive() && 
	     //! g_conf.m_fullRebuild &&
	     //! g_conf.m_removeBadPages &&
	     ! isSecondaryRdb ( (uint8_t)rdbId ) && 
	     rdbId != RDB_TAGDB )
		return;
	*/

	// if a dump is happening it will always be the last file, do not
	// include it in the merge
	int32_t numFiles = m_numFiles;
	if ( numFiles > 0 && m_dump->isDumping() ) numFiles--;
	// need at least 1 file to merge, stupid (& don't forget: u r stooopid)
	// crap, we need to recover those tagdb0000.002.dat files, so
	// comment this out so we can do a resume on it
	//if ( numFiles <= 1 ) return;
	// . wait for all unlinking and renaming activity to flush out
	// . otherwise, a rename or unlink might still be waiting to happen
	//   and it will mess up our merge
	// . right after a merge we get a few of these printed out...
	if ( m_numThreads > 0 ) {
		if ( doLog )
			log(LOG_INFO,"merge: Waiting for unlink/rename "
			    "operations to finish before attempting merge "
			    "for %s. (collnum=%"INT32")",m_dbname,(int32_t)m_collnum);
		return false;
	}

	if ( g_numThreads > 0 ) {
		// prevent log spam
		static int32_t s_lastTime = 0;
		int32_t now = getTimeLocal();
		if ( now - s_lastTime > 0 && doLog )
			log(LOG_INFO,"merge: Waiting for another "
			    "collection's unlink/rename "
			    "operations to finish before attempting merge "
			    "for %s (collnum=%"INT32").",
			    m_dbname,(int32_t)m_collnum);
		s_lastTime = now;
		return false;
	}


	// set m_minToMerge from coll rec if we're indexdb
	CollectionRec *cr = g_collectiondb.m_recs [ m_collnum ];
	// now see if collection rec is there to override us
	//if ( ! cr ) {
	if ( ! cr && ! m_rdb->m_isCollectionLess ) {
		g_errno = 0;
		log("merge: Could not find coll rec for %s.",m_coll);
	}
	// set the max files to merge in a single merge operation
	m_absMaxFiles = 50;
	/*
	if ( cr && m_rdb == g_indexdb.getRdb() ) {
		m_absMaxFiles = cr->m_indexdbMinTotalFilesToMerge;
		// watch out for bad values
		if ( m_absMaxFiles < 2 ) m_absMaxFiles = 2;
	}
	if ( cr && m_rdb == g_datedb.getRdb() ) {
		m_absMaxFiles = cr->m_indexdbMinTotalFilesToMerge;
		// watch out for bad values
		if ( m_absMaxFiles < 2 ) m_absMaxFiles = 2;
	}
	*/
	// m_minToMerge is -1 if we should let cr override but if m_minToMerge
	// is actually valid at this point, use it as is, therefore, just set
	// cr to NULL
	m_minToMerge = m_minToMergeArg;
	if ( cr && m_minToMerge > 0 ) cr = NULL;
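	// net effect: a minToMergeOverride >= 2 (checked below) always
	// wins; otherwise a positive m_minToMergeArg beats the per-rdb
	// collection rec values below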
	// if cr is non-NULL use its value now
	if ( cr && m_rdb == g_posdb.getRdb() ) 
		m_minToMerge = cr->m_posdbMinFilesToMerge;
	if ( cr && m_rdb == g_titledb.getRdb() ) 
		m_minToMerge = cr->m_titledbMinFilesToMerge;
	if ( cr && m_rdb == g_spiderdb.getRdb() ) 
		m_minToMerge = cr->m_spiderdbMinFilesToMerge;
	//if ( cr && m_rdb == g_spiderdb.getRdb() ) 
	//	m_minToMerge = cr->m_spiderdbMinFilesToMerge;
	//if ( cr && m_rdb == g_sectiondb.getRdb() ) 
	//	m_minToMerge = cr->m_sectiondbMinFilesToMerge;
	//if ( cr && m_rdb == g_sectiondb.getRdb() ) 
	//	m_minToMerge = cr->m_sectiondbMinFilesToMerge;
	//if ( cr && m_rdb == g_checksumdb.getRdb() ) 
	//	m_minToMerge = cr->m_checksumdbMinFilesToMerge;
	//if ( cr && m_rdb == g_clusterdb.getRdb() ) 
	//	m_minToMerge = cr->m_clusterdbMinFilesToMerge;
	//if ( cr && m_rdb == g_datedb.getRdb() ) 
	//	m_minToMerge = cr->m_datedbMinFilesToMerge;
	//if ( cr && m_rdb == g_statsdb.getRdb() )
	//if ( m_rdb == g_statsdb.getRdb() )
	//	m_minToMerge = g_conf.m_statsdbMinFilesToMerge;
	if ( m_rdb == g_syncdb.getRdb() )
		m_minToMerge = g_syncdb.m_rdb.m_minToMerge;
	if ( cr && m_rdb == g_linkdb.getRdb() )
		m_minToMerge = cr->m_linkdbMinFilesToMerge;
	if ( cr && m_rdb == g_cachedb.getRdb() )
		m_minToMerge = 4;
	if ( cr && m_rdb == g_serpdb.getRdb() )
		m_minToMerge = 4;
	if ( cr && m_rdb == g_monitordb.getRdb() )
		m_minToMerge = 4;
	if ( cr && m_rdb == g_tagdb.getRdb() )
		m_minToMerge = cr->m_tagdbMinFilesToMerge;

	// if we are rebalancing this coll then keep merges tight so all
	// the negative recs annihilate with the positive recs to free
	// up disk space, since we could be short on disk space.
	//if ( g_rebalance.m_isScanning &&
	//     // if might have moved on if not able to merge because
	//     // another was merging... so do this anyway...
	//     g_rebalance.m_collnum == m_collnum )
	//	m_minToMerge = 2;
	

	// secondary rdbs are used for rebuilding, so keep their limits high
	//if ( m_rdb == g_indexdb2.getRdb    () ) m_minToMerge = 50;
	// TODO: make this 200!!!
	//if ( m_rdb == g_posdb2.getRdb    () ) m_minToMerge = 10;
	//if ( m_rdb == g_spiderdb2.getRdb   () )	m_minToMerge = 20;
	//if ( m_rdb == g_sectiondb2.getRdb  () )	m_minToMerge = 200;
	//if ( m_rdb == g_checksumdb2.getRdb () )	m_minToMerge = 20;
	//if ( m_rdb == g_clusterdb2.getRdb  () )	m_minToMerge = 20;
	//if ( m_rdb == g_datedb2.getRdb     () )	m_minToMerge = 20;
	//if ( m_rdb == g_tfndb2.getRdb      () )	m_minToMerge = 2;
	//if ( m_rdb == g_tagdb2.getRdb     () )	m_minToMerge = 20;
	//if ( m_rdb == g_statsdb2.getRdb    () ) m_minToMerge = 20;

	// always obey the override
	if ( minToMergeOverride >= 2 ) {
		log("merge: Overriding min files to merge of %"INT32" with %"INT32"",
		    m_minToMerge,minToMergeOverride );
		m_minToMerge = minToMergeOverride;
	}

	// if still -1 that is a problem
	if ( m_minToMerge <= 0 ) {
		log("Got bad minToMerge of %"INT32" for %s. Set its default to "
		    "something besides -1 in Parms.cpp or add it to "
		    "CollectionRec.h.",
		    m_minToMerge,m_dbname);
		//m_minToMerge = 2;
		char *xx = NULL; *xx = 0;
	}
	// mdw: comment this out to reduce log spam when we have 800 colls!
	// print it
	//if ( doLog ) 
	//	log(LOG_INFO,"merge: Attempting to merge %"INT32" %s files on disk."
	//	    " %"INT32" files needed to trigger a merge.",
	//	    numFiles,m_dbname,m_minToMerge);
	// . even though another merge may be going on, we can speed it up
	//   by entering urgent merge mode. this will prefer the merge disk
	//   ops over dump disk ops... essentially starving the dumps and
	//   spider reads
	// . if we are significantly over our m_minToMerge limit
	//   then set m_mergeUrgent to true so merge disk operations will
	//   starve any spider disk reads (see Threads.cpp for that)
	// . TODO: fix this: if already merging we'll have an extra file
	// . i changed the 4 to a 14 so spider is not slowed down so much
	//   when getting link info... but real time queries will suffer!
	if ( ! m_mergeUrgent && numFiles - 14 >= m_minToMerge ) {
		m_mergeUrgent = true;
		if ( doLog ) 
		log(LOG_INFO,"merge: Entering urgent merge mode for %s "
		    "coll=%s.", m_dbname,m_coll);
		g_numUrgentMerges++;
	}
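	// (e.g. with m_minToMerge = 6 urgent mode kicks in once
	// 6 + 14 = 20 files have accumulated on disk)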


	// tfndb has his own merge class since titledb merges write tfndb recs
	RdbMerge *m = &g_merge;
	if ( m->isMerging() )
		return false;

	// if we are tfndb and someone else is merging, do not merge unless
	// we have 3 or more files
	int32_t minToMerge = m_minToMerge;
	//if (g_tfndb.getRdb()==m_rdb&& g_merge.isMerging() && minToMerge <=2 )
	//	minToMerge = 3;
	// do not start a tfndb merge while someone is dumping because the
	// dump starves the tfndb merge and we clog up adding links. i think
	// this is mainly just indexdb dumps, but we'll see.
	//if(g_tfndb.getRdb() == m_rdb && g_indexdb.m_rdb.isDumping() ) return;

	// are we resuming a killed merge?
	bool resuming = false;
	for ( int32_t j = 0 ; j < numFiles ; j++ ) {
		// skip odd numbered files
		if ( m_fileIds[j] & 0x01 ) continue;
		// yes we are resuming a merge
		resuming = true;
		break;
	}
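	// (an even fileId on disk, e.g. posdb0004.003.dat, can only be a
	// merge file, so finding one means a merge was interrupted)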

	// this triggers the negative rec concentration msg below and
	// tries to merge on one file...
	if ( ! resuming && m_numFiles <= 1 ) {
		m_nextMergeForced = false;
		return false;
	}

	// what percent of recs in the collections' rdb are negative?
	// the rdbmaps hold this info
	int64_t totalRecs = 0LL;
	float percentNegativeRecs = getPercentNegativeRecsOnDisk ( &totalRecs);
	bool doNegCheck = false;
	// 1. if disk space is tight and >20% negative recs, force it
	if ( doNegCheck &&
	     g_process.m_diskAvail >= 0 && 
	     g_process.m_diskAvail < 10000000000LL && // 10GB
	     percentNegativeRecs > .20 ) {
		m_nextMergeForced = true;
		forceMergeAll = true;
		log("rdb: hit negative rec concentration of %f "
		    "(total=%"INT64") for "
		    "collnum %"INT32" on db %s when diskAvail=%"INT64" bytes",
		    percentNegativeRecs,totalRecs,(int32_t)m_collnum,
		    m_rdb->m_dbname,g_process.m_diskAvail);
	}
	// 2. if >40% negative recs force it
	if ( doNegCheck && 
	     percentNegativeRecs > .40 ) {
		m_nextMergeForced = true;
		forceMergeAll = true;
		log("rdb: hit negative rec concentration of %f "
		    "(total=%"INT64") for "
		    "collnum %"INT32" on db %s",
		    percentNegativeRecs,totalRecs,(int32_t)m_collnum,
		    m_rdb->m_dbname);
	}


	// . don't merge if we don't have the min # of files
	// . but skip this check if there is a merge to be resumed from b4
	if ( ! resuming && ! forceMergeAll && numFiles < minToMerge ) {
		// now we no longer have to check this collection rdb for
		// merging. this will save a lot of cpu time when we have
		// 20,000+ collections. if we dump a file to disk for it
		// then we set this flag back to false in Rdb.cpp.
		m_checkedForMerge = true;
		return false;
	}

	// bail if already merging THIS class
	if ( m_isMerging ) {
		if ( doLog ) 
			log(LOG_INFO,
			    "merge: Waiting for other merge to complete "
			    "before merging %s.",m_dbname);
		return false;
	}
	// bail if already waiting for it
	if ( m_waitingForTokenForMerge ) {
		if ( doLog ) 
			log(LOG_INFO,"merge: Already requested token. "
			    "Request for %s pending.",m_dbname);
		return false;
	}
	// score it
	m_waitingForTokenForMerge = true;
	// log a note
	//log(0,"RdbBase::attemptMerge: attempting merge for %s",m_dbname );
	// this merge forced?
	//m_nextMergeForced = forceMergeAll;
	// . bail if already merging
	// . no, RdbMerge will sleep in 5 sec cycles until they're done
	// . we may have multiple hosts running on the same cpu/hardDrive
	// . therefore, to maximize disk space, we should only have 1 merge
	//   at a time going on between these hosts
	// . we cannot merge files that are being dumped either because we'll
	//   lose data!!
	// . obsolete this shit for the token stuff
	/*
	if ( g_merge.isMerging() ) {
		// if we've been here once already, return now
		if ( m_inWaiting ) return;
		// otherwise let everyone know we're waiting
		log("RdbBase::attemptMerge: waiting for another merge to finish.");
		// set a flag so we don't keep printing the above msg
		m_inWaiting = true;
		// if it fails then sleep until it works
		g_loop.registerSleepCallback (5000,this,attemptMergeWrapper);
		return;
	}
	if ( m_dump.isDumping() ) {
		// bail if we've been here
		if ( m_inWaiting  ) return;
		// otherwise let everyone know we're waiting
		log("RdbBase::attemptMerge: waiting for dump to finish.");
		// set a flag so we don't keep printing the above msg
		m_inWaiting = true;
		// . if it fails then sleep until it works
		// . wait a shorter time to try and sneak a merge in if our
		//   buffer is so small that we're always dumping!
		//   should the next merge we do be forced?
		g_loop.registerSleepCallback (5000,this,attemptMergeWrapper);
		return;
	}
	*/
	// remember niceness for calling g_merge.merge()
	m_niceness = niceness;
	// debug msg
	//log (0,"RdbBase::attemptMerge: %s: getting token for merge", m_dbname);
	// . get token before merging
	// . returns true and sets g_errno on error
	// . returns true if we always have the token (just one host in group)
	// . returns false if blocks (the usual case)
	// . higher priority requests always supersede lower ones
	// . ensure we only call this once per dump we need, otherwise
	//   gotTokenForMergeWrapper() may be called multiple times
	// . if a host is always in urgent mode he may starve another host
	//   who is too, but whose old request has a low priority.
	//int32_t priority = 0;
	// save this so gotTokenForMerge() can use it
	m_doLog = doLog;
	//if ( m_mergeUrgent ) priority = 2;
	//else                 priority = 0;
	// tfndb doesn't need token, since titledb merge writes tfndb recs
	// if ( //m_rdb != g_tfndb.getRdb() &&
	//      ! g_msg35.getToken ( this , gotTokenForMergeWrapper, priority))
	// 	return ;
	// bitch if we got the token but there was an error somewhere
	if ( g_errno ) {
		log(LOG_LOGIC,"merge: attemptMerge: %s failed: %s",
		    m_dbname,mstrerror(g_errno));
		g_errno = 0 ;
		log(LOG_LOGIC,"merge: attemptMerge: %s: uh oh...",m_dbname);
		// undo request
		m_waitingForTokenForMerge = false;		 
		// we don't have the token, so we're fucked...
		return false;
	}
	// debug msg
	//if ( doLog )
	//log(LOG_INFO,"merge: Got merge token for %s without blocking.",
	//    m_dbname);
	// if did not block
/*
	gotTokenForMerge ( );
}

void gotTokenForMergeWrapper ( void *state ) {
	RdbBase *THIS = (RdbBase *)state;
	THIS->gotTokenForMerge();
}

void RdbBase::gotTokenForMerge ( ) {
*/
	// debug msg
	//log("RdbBase::gotTokenForMerge: for %s",m_dbname);
	// don't repeat
	m_waitingForTokenForMerge = false;
	// if a dump is happening it will always be the last file, do not
	// include it in the merge
	//int32_t numFiles = m_numFiles;
	//if ( numFiles > 0 && m_dump->isDumping() ) numFiles--;

	// . if we are significantly over our m_minToMerge limit
	//   then set m_mergeUrgent to true so merge disk operations will
	//   starve any spider disk reads (see Threads.cpp for that)
	// . TODO: fix this: if already merging we'll have an extra file
	// . i changed the 4 to a 14 so spider is not slowed down so much
	//   when getting link info... but real time queries will suffer!
	if ( ! m_mergeUrgent && numFiles - 14 >= m_minToMerge ) {
		m_mergeUrgent = true;
		if ( m_doLog )
		log(LOG_INFO,
		    "merge: Entering urgent merge mode (2) for %s coll=%s.", 
		    m_dbname,m_coll);
		g_numUrgentMerges++;
	}
	// tfndb has his own merge class since titledb merges write tfndb recs
	//RdbMerge *m = &g_merge;
	//if ( m_rdb == g_tfndb.getRdb() ) m = &g_merge2;
	// sanity check
	if ( m_isMerging || m->isMerging() ) {
		//if ( m_doLog )
			//log(LOG_INFO,
			//"merge: Someone already merging. Waiting for "
			//"merge token "
			//"in order to merge %s.",m_dbname);
		return false;
	}

	// or if # threads out is positive
	if ( m_numThreads > 0 ) return false;

	// clear for take-off
	//m_inWaiting = false;
	//	log(0,"RdbBase::attemptMerge: someone else merging"); return; }
	// . i used to just merge all the files into 1
	// . but it may be more efficient to merge just enough files as
	//   to put m_numFiles below m_minToMerge
	// . if we have the files : A B C D E F and m_minToMerge is 6
	//   then merge F and E, but if D < E then merge D too, etc...
	// . this merge algorithm is definitely better than merging everything
	//   if we don't do much reading to the db, only writing
	int32_t      n    = 0;
	int32_t      mergeFileId;
	int32_t      mergeFileNum;
	float     minr ;
	int64_t mint ;
	int32_t      mini ;
	bool      minOld ;
	int32_t      id2  = -1;
	//int32_t      minToMerge;
	bool      overide = false;
	//int32_t      smini = - 1;
	//int32_t      sn ;
	//int64_t tfndbSize = 0;
	int32_t      nowLocal = getTimeLocal();

	// but if niceness is 0 merge ALL files
	//if ( niceness == 0 ) {
	//	n = m_numFiles;
	//	i = -1;
	//	goto skip;
	//}

	//char rdbId = getIdFromRdb ( m_rdb );

	// if one file is even #'ed then we were merging into that, but
	// got interrupted and restarted. maybe the power went off or maybe
	// gb saved and exited w/o finishing the merge.
	for ( int32_t j = 0 ; j < numFiles ; j++ ) {
		// skip odd numbered files
		if ( m_fileIds[j] & 0x01 ) continue;
		// hey, we got a file that was being merged into
		mergeFileId = m_fileIds[j];
		// store the merged data into this file #
		mergeFileNum = j ;
		// files being merged into have a filename like
		// indexdb0000.003.dat where the 003 indicates how many files
		// it is merging, in case we have to resume the merge after a
		// power loss or whatever
		char *s = m_files[j]->getFilename();
		// skip "indexdb0001."
		s += gbstrlen ( m_dbname ) + 5;
		// if titledb we got a "-023" part now
		if ( m_isTitledb ) {
			id2 = atol2 ( s , 3 );
			if ( id2 < 0 ) { char *xx = NULL; *xx =0; }
			s += 4;
		}
		// get the "003" part
		n = atol2 ( s , 3 );
		// sanity check
		if ( n <= 1 ) {
			log(LOG_LOGIC,"merge: attemptMerge: Resuming. bad "
			    "engineer for %s coll=%s",m_dbname,m_coll);
			//g_msg35.releaseToken();
			if ( m_mergeUrgent ) {
				log("merge: leaving urgent merge mode");
				g_numUrgentMerges--;
				m_mergeUrgent = false;
			}
			return false;
		}
		// make a log note
		log(LOG_INFO,"merge: Resuming killed merge for %s coll=%s.",
		    m_dbname,m_coll);
		// compute the total size of merged file
		mint = 0;
		int32_t mm = 0;
		for ( int32_t i = mergeFileNum ; i <= mergeFileNum + n ; i++ ) {
			if ( i >= m_numFiles ) {
				log("merge: Number of files to merge has "
				    "shrunk from %"INT32" to %"INT32" since time of "
				    "last merge. Probably because those files "
				    "were deleted because they were "
				    "exhausted and had no recs to offer."
				    ,n,m_numFiles);
				//char *xx=NULL;*xx=0;
				break;
			}
			if ( ! m_files[i] ) {
				log("merge: File #%"INT32" is NULL, skipping.",i);
				continue;
			}
			// only count files AFTER the file being merged to
			if ( i > j ) mm++;
			mint += m_files[i]->getFileSize();
		}
		if ( mm != n ) {
			log("merge: Only merging %"INT32" instead of the "
			    "original %"INT32" files.",mm,n);
			// cause the "if (mm==0)" to kick in below
			if ( mm == 1 || mm == 0 ) {
				mm = 0;
				// fix renaming final merged file tagdb-001.dat
				mergeFileId = 2;
				m_fileIds[j] = 1;
			}
		}
		// how many files to merge?
		n = mm;
		// allow a single file to continue merging if the other
		// file got merged out already
		if ( mm > 0 ) overide = true;

	// if we've already merged and already unlinked, then the
	// process exited; now we restart with just the final
	// merged file and we need to do the rename
		if ( mm == 0 && rdbId != RDB_TITLEDB ) {
			m_isMerging = false;
			// make a fake file before us that we were merging
			// since it got nuked on disk
			//incorporateMerge();
			char fbuf[256];
			sprintf(fbuf,"%s%04"INT32".dat",m_dbname,mergeFileId-1);
			if ( m_isTitledb )
				sprintf(fbuf,"%s%04"INT32"-%03"INT32".dat",
					m_dbname,mergeFileId-1,id2);
			log("merge: renaming final merged file %s",fbuf);
			// this does not use a thread...
			m_files[j]->rename(fbuf);
			sprintf(fbuf,"%s%04"INT32".map",m_dbname,mergeFileId-1);
			//File *mf = m_maps[j]->getFile();
			m_maps[j]->rename(fbuf);
			log("merge: renaming final merged map %s",fbuf);
			return false;
		}

		// resume the merging
		goto startMerge;
	}

	minToMerge = m_minToMerge;


	// if we are rebalancing this coll then keep merges tight so all
	// the negative recs annihilate with the positive recs to free
	// up disk space since we could be short on disk space.
	//if ( g_rebalance.m_isScanning &&
	//     // if might have moved on if not able to merge because
	//     // another was merging... so do this anyway...
	//     g_rebalance.m_collnum == m_collnum )
	//	minToMerge = 2;


	//if (m_rdb==g_tfndb.getRdb()&& g_merge.isMerging() && minToMerge <=2 )
	//	minToMerge = 3;

	// look at this merge:
	// indexdb0003.dat.part1
	// indexdb0003.dat.part2
	// indexdb0003.dat.part3
	// indexdb0003.dat.part4
	// indexdb0003.dat.part5
	// indexdb0003.dat.part6
	// indexdb0003.dat.part7
	// indexdb0039.dat
	// indexdb0039.dat.part1
	// indexdb0045.dat
	// indexdb0047.dat
	// indexdb0002.002.dat
	// indexdb0002.002.dat.part1
	// it should have merged 45 and 47 since they are so much smaller
	// even though the ratio between 3 and 39 is lower. we did not compute
	// our dtotal correctly...

	// . use greedy method
	// . just merge the minimum # of files to stay under m_minToMerge
	// . files must be consecutive, however
	// . but ALWAYS make sure file i-1 is bigger than file i
	n = numFiles - minToMerge + 2 ;
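	// . merging n files into 1 leaves numFiles - n + 1 files, and we
	//   want that to be minToMerge - 1, hence n = numFiles-minToMerge+2
	// . e.g. numFiles=8 and minToMerge=6 gives n=4, leaving 5 files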
	// . limit for posdb since more than about 8 gets abnormally slow
	// . no this was a ruse, the spider was evaluating firstips in the
	//   bg slowing the posdb merge down.
	//if ( m_rdb && m_rdb->m_rdbId == RDB_POSDB && n > 8 )
	//	n = 8;
	// titledb should always merge at least 50 files no matter what though
	// cuz i don't want it merging its huge root file and just one
	// other file... i've seen that happen... but don't know why it didn't
	// merge two small files! i guess because the root file was the
	// oldest file! (38.80 days old)???
	if ( m_isTitledb && n < 50 && minToMerge > 200 ) {
		// force it to 50 files to merge
		n = 50;
		// but must not exceed numFiles!
		if ( n > numFiles ) n = numFiles;
	}
	// NEVER merge more than this many files, our current merge routine
	// does not scale well to many files
	if ( m_absMaxFiles > 0 && n > m_absMaxFiles ) n = m_absMaxFiles;
	//smini = -1;
	// but if we are forcing then merge ALL, except one being dumped
	if ( m_nextMergeForced ) n = numFiles;
	// or if doing a rebalance, merge them all. tight merge.
	//if ( g_rebalance.m_isScanning && g_rebalance.m_collnum == m_collnum) 
	//	n = numFiles;
	//else if ( m_isTitledb ) {
	//	RdbBase *base = g_tfndb.getRdb()->m_bases[m_collnum];
	//	tfndbSize = base->getDiskSpaceUsed();
	//}
	//tryAgain:
	minr = 99999999999.0;
	mint = 0x7fffffffffffffffLL ;
	mini = -1;
	minOld = false;
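	// slide a window of n consecutive files across the file list and
	// score each one; the best window so far is tracked in
	// mini/mint/minr/minOld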
	for ( int32_t i = 0 ; i + n <= numFiles ; i++ ) {
		// newest file's mtime seen in this window
		time_t date = -1;
		// add up the sizes of the files in this window
		int64_t total = 0;
		for ( int32_t j = i ; j < i + n ; j++ ) {
			total += m_files[j]->getFileSize();
			time_t mtime = m_files[j]->getLastModifiedTime();
			// skip on error
			if ( mtime < 0 ) continue;
			if ( mtime > date ) date = mtime;
		}

		// is even the newest file in this window over 30 days old?
		bool old = ( date < nowLocal - 30*24*3600 );
		// not old if error (date will be -1)
		if ( date < 0 ) old = false;
		// if it does, and current winner does not, force ourselves!
		if ( old && ! minOld ) mint = 0x7fffffffffffffffLL ;
		// and if we are not old and the min is, do not consider
		if ( ! old && minOld ) continue;
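		// e.g. the first window whose newest file is over 30 days old
		// resets mint so it can take over, and after that any window
		// containing a fresh file is skipped outright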

		// if merging titledb, just pick by the lowest total
		if ( m_isTitledb ) {
			if ( total < mint ) {
				mini   = i;
				mint   = total;
				minOld = old;
				log(LOG_INFO,"merge: titledb i=%"INT32" n=%"INT32" "
				    "mint=%"INT64" mini=%"INT32" "
				    "oldestfile=%.02fdays",
				    i,n,mint,mini,
				    ((float)nowLocal-date)/(24*3600.0) );
			}
			continue;
		}
		// . get the average ratio between mergees
		// . ratio in [1.0,inf)
		// . prefer the lowest average ratio
		double ratio = 0.0;
		for ( int32_t j = i ; j < i + n - 1 ; j++ ) {
			int64_t s1 = m_files[j  ]->getFileSize();
			int64_t s2 = m_files[j+1]->getFileSize();
			int64_t tmp;
			if ( s2 == 0 ) continue;
			if ( s1 < s2 ) { tmp = s1; s1 = s2 ; s2 = tmp; }
			ratio += (double)s1 / (double)s2 ;
		}
		if ( n >= 2 ) ratio /= (double)(n-1);
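		// e.g. sizes 100,50,25 give pairwise ratios 2.0 and 2.0 for
		// an average of 2.0; equal-sized files give the ideal 1.0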
		// sanity check
		if ( ratio < 0.0 ) {
			logf(LOG_LOGIC,"merge: ratio is negative %.02f",ratio);
			char *xx = NULL; *xx = 0; 
		}
		// the adjusted ratio
		double adjratio = ratio;
		// . adjust ratio based on file size of current winner
		// . if winner is ratio of 1:1 and we are 10:1 but winner
		//   is 10 times bigger than us, then we have a tie. 
		// . i think if we are 10:1 and winner is 3 times bigger
		//   we should have a tie
		if ( mini >= 0 && total > 0 && mint > 0 ) {
			double sratio = (double)total/(double)mint;
			//if ( mint>total ) sratio = (float)mint/(float)total;
			//else              sratio = (float)total/(float)mint;
			adjratio *= sratio;
		}
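		// note: with the greedy total-size test below, adjratio only
		// shows up in the debug log; e.g. a 10.0 ratio whose total is
		// a tenth of the winner's (sratio=0.1) logs adjratio=1.0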


		// debug the merge selection
		char      tooBig   = 0;
		int64_t prevSize = 0;
		if ( i > 0 ) prevSize = m_files[i-1]->getFileSize();
		if ( i > 0 && prevSize < total/4 ) tooBig = 1;
		log(LOG_INFO,"merge: i=%"INT32" n=%"INT32" ratio=%.2f adjratio=%.2f "
		    "minr=%.2f mint=%"INT64" mini=%"INT32" prevFileSize=%"INT64" "
		    "mergeFileSize=%"INT64" tooBig=%"INT32" oldestfile=%.02fdays "
		    "collnum=%"INT32"",
		    i,n,ratio,adjratio,minr,mint,mini,
		    prevSize , total ,(int32_t)tooBig,
		    ((float)nowLocal-date)/(24*3600.0) ,
		    (int32_t)m_collnum);

		// . if we are merging the last file, penalize it
		// . otherwise, we dump little files out and almost always
		//   merge them into the next file right away, but if we
		//   merged two in the back instead, our next little dump would
		//   merge with our previous little dump in no time. so we
		//   have to look into the future a step...
		// . count the next to last guy twice
		//if ( n == 2 && i + n == numFiles && i + n - 2 >= 0 ) 
		//	total += m_files[i+n-2]->getFileSize();
		// bring back the greedy merge
		if ( total >= mint ) continue;
		//if ( adjratio > minr && mini >= 0 ) continue;
		//if ( ratio > minr && mini >= 0 ) continue;
		// . don't get TOO lopsided on me now
		// . allow it for now! this is the true greedy method... no!
		// . an older small file can be cut off early on by a merge
		//   of middle files. the little guy can end up never getting
		//   merged unless we have this.
		// . allow a file to be 4x bigger than the one before it, this
		//   allows a little bit of lopsidedness.
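		// . e.g. prev file is 100MB and window total is 500MB:
		//   100 < 500/4, so this window is skipped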
		if (i > 0  && m_files[i-1]->getFileSize() < total/4 ) continue;
		//min  = total;
		minr   = ratio;
		mint   = total;
		mini   = i;
		minOld = old;
	}
	// . if titledb, merge at least enough titledb recs to equal the size
	//   of tfndb times three if we can, without going too far overboard
	//   because we have to read the *entire* tfndb just to merge two
	//   titledb files... for now at least
	// . save the settings that work in case we have to pop them
	// . times 6 is better than times 3 because reading from tfndb can
	//   be cpu intensive if we have to merge in the list from the rdb tree
	//if ( m_isTitledb && mint<6*tfndbSize && mini>=0 && n<numFiles ) {
	//	smini = mini;
	//	sn    = n;
	//	// but try increasing n until we bust
	//	n++;
	//	goto tryAgain;
	//}
	// bring back saved if we should
	//if ( smini >= 0 ) {
	//	mini = smini;
	//	n    = sn;
	//}
	// if no valid range, bail
	if ( mini == -1 ) { 
		log(LOG_LOGIC,"merge: gotTokenForMerge: Bad engineer. mini "
		    "is -1.");
		//g_msg35.releaseToken(); 
		return false; 
	}
	// . merge from file #mini through file #(mini+n)
	// . these files should all have ODD fileIds so we can sneak a new
	//   mergeFileId in there
	mergeFileId = m_fileIds[mini] - 1;
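	// e.g. if file #mini has fileId 0005 the merge target gets even id
	// 0004, which slots in just before it since ids on disk are odd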
	// get new id, -1 on error
	id2 = -1;
	if ( m_isTitledb ) {
		//id2 = getAvailId2 ( );
		// this is obsolete, make 000 for now
		id2 = 000;
		if ( id2 < 0 ) {
			log(LOG_LOGIC,"merge: attemptMerge: could not add "
			    "new file for titledb. No avail ids."); 
			g_errno = 0;
			//g_msg35.releaseToken();
			return false; 
		}
	}
	// . make a filename for the merge
	// . always starts with file #0
	// . merge targets are named like "indexdb0000.dat.002"
	// . for titledb is "titledb0000-023.dat.003" (23 is id2)
	// . this now also sets m_mergeStartFileNum for us... but we override
	//   below anyway. we have to set it there in case we startup and
	//   are resuming a merge.
	mergeFileNum = addFile ( mergeFileId , true/*new file?*/ , n , id2 ) ;
	if ( mergeFileNum < 0 ) {
		log(LOG_LOGIC,"merge: attemptMerge: Could not add new file."); 
		g_errno = 0;
		//g_msg35.releaseToken();
		return false; 
	}
	// we just opened a new file
	//g_sync.addOp ( OP_OPEN , m_files[mergeFileNum] , 0 );
	
	// is it a force?
	if ( m_nextMergeForced ) log(LOG_INFO,
				     "merge: Force merging all %s "
				     "files, except those being dumped now.",
				     m_dbname);
	// clear after each call to attemptMerge()
	m_nextMergeForced = false;

 startMerge:
	// sanity check
	if ( n <= 1 && ! overide ) {
		log(LOG_LOGIC,"merge: gotTokenForMerge: Not merging "
		    "%"INT32" files.", n);
		//g_msg35.releaseToken();
		return false;
	}

	// . save the # of files we're merging for the cleanup process
	// . don't include the first file, which is now file #0
	m_numFilesToMerge   = n  ; // numFiles - 1;
	m_mergeStartFileNum = mergeFileNum + 1; // 1
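	// the even-id merge target occupies slot mergeFileNum itself, so the
	// n files being merged are the n slots immediately after it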

	//CollectionRec *cr = g_collectiondb.getRec ( m_collnum );
	char *coll = "";
	if ( cr ) coll = cr->m_coll;

	// log merge parms
	log(LOG_INFO,"merge: Merging %"INT32" %s files to file id %"INT32" now. "
	    "collnum=%"INT32" coll=%s",
	    n,m_dbname,mergeFileId,(int32_t)m_collnum,coll);

	// print out file info
	m_numPos = 0;
	m_numNeg = 0;
	for ( int32_t i = m_mergeStartFileNum ; 
	      i < m_mergeStartFileNum + m_numFilesToMerge ; i++ ) {
		m_numPos += m_maps[i]->getNumPositiveRecs();		
		m_numNeg += m_maps[i]->getNumNegativeRecs();
		log(LOG_INFO,"merge: %s (#%"INT32") has %"INT64" positive "
		     "and %"INT64" negative records." , 
		     m_files[i]->getFilename() ,
		     i , 
		     m_maps[i]->getNumPositiveRecs(),
		     m_maps[i]->getNumNegativeRecs() );
	}
	log(LOG_INFO,"merge: Total positive = %"INT64" Total negative = %"INT64".",
	     m_numPos,m_numNeg);

	// assume we are now officially merging
	m_isMerging = true;

	m_rdb->m_numMergesOut++;

	//char rdbId = getIdFromRdb ( m_rdb );

	// sanity check
	if ( m_niceness == 0 ) { char *xx=NULL;*xx=0 ; }

	// . start the merge
	// . returns false if blocked, true otherwise & sets g_errno
	if ( ! m->merge ( rdbId  ,
			  m_collnum ,
			  m_files[mergeFileNum] ,
			  m_maps [mergeFileNum] ,
			  id2                   ,
			  m_mergeStartFileNum   ,
			  m_numFilesToMerge     ,
			  m_niceness            ,
			  NULL,//m_pc                  ,
			  mint /*maxTargetFileSize*/ ,
			  m_ks                  ) ) 
		// we started the merge so return true here
		return true;
	// hey, we're no longer merging i guess
	m_isMerging = false;
	// decrement this count
	m_rdb->m_numMergesOut--;
	// . if we have no g_errno that is bad!!!
	// . we should dump core here or something cuz we have to remove the
	//   merge file still to be correct
	//if ( ! g_errno )
	//	log(LOG_INFO,"merge: Got token without blocking.");
	// we now set this in init() by calling m_merge.init() so it
	// can pre-alloc its lists in its s_msg3 class
	//		       g_conf.m_mergeMaxBufSize ) ) return ;
	// bitch on g_errno then clear it
	if ( g_errno ) 
		log("merge: Had error getting merge token for %s: %s.",
		    m_dbname,mstrerror(g_errno));
	g_errno = 0;
	// give token back
	//g_msg35.releaseToken();
	// try again
	//m_rdb->attemptMerge( m_niceness, false , true );
	// how did this happen?
	log("merge: did not block for some reason.");
	return true;
}

// . use the maps and tree to estimate the size of this list w/o hitting disk
// . used by Indexdb.cpp to get the size of a list for IDF weighting purposes
//int32_t RdbBase::getListSize ( key_t startKey , key_t endKey , key_t *max ,
int64_t RdbBase::getListSize ( char *startKey , char *endKey , char *max ,
			         int64_t oldTruncationLimit ) {
	// . reset this to low points
	// . this is on
	//*max = endKey;
	KEYSET(max,endKey,m_ks);
	bool first = true;
	// do some looping
	//key_t newGuy;
	char newGuy[MAX_KEY_BYTES];
	int64_t totalBytes = 0;
	for ( int32_t i = 0 ; i < m_numFiles ; i++ ) {
		// the start and end pages for a page range
		int32_t pg1 , pg2;
		// get the start and end pages for this startKey/endKey
		m_maps[i]->getPageRange ( startKey , 
					  endKey   , 
					  &pg1     , 
					  &pg2     ,
					  //&newGuy  ,
					  newGuy   ,
					  oldTruncationLimit );
		// . get the range size add it to count
		// . some of these records are negative recs (deletes) so
		//   our count may be small
		// . also, count may be a bit small because getRecSizes() may
		//   not recognize some recs on the page boundaries as being
		//   in [startKey,endKey]
		// . this can now return negative sizes
		// . the "true" means to subtract page sizes that begin with
		//   delete keys (the key's low bit is clear)
		// . get the minKey and maxKey in this range
		// . minKey2 may be bigger than the actual minKey for this
		//   range, likewise, maxKey2 may be smaller than the actual
		//   maxKey, but should be good estimates
		int64_t maxBytes = m_maps[i]->getMaxRecSizes ( pg1     ,
							    pg2     ,
							    startKey,
							    endKey  ,
							    true    );//subtrct
		// get the min as well
		int64_t minBytes = m_maps[i]->getMinRecSizes ( pg1     ,
							    pg2     ,
							    startKey,
							    endKey  ,
							    true    );//subtrct
		int64_t avg = (maxBytes + minBytes) / 2LL;
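		// e.g. if the map bounds this file's contribution to
		// [8000,12000] bytes we estimate the midpoint, 10000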
		// use that
		totalBytes += avg;
		// if not too many pages then don't even bother setting "max"
		// since it is used only for interpolating if this term is
		// truncated. if only a few pages then it might be way too
		// small.
		if ( pg1 + 5 > pg2 ) continue;
		// replace *max automatically if this is our first time
		//if ( first ) { *max = newGuy; first = false; continue; }
		if ( first ) { 
			KEYSET(max,newGuy,m_ks); first = false; continue; }
		// . get the SMALLEST max key
		// . this is used for estimating what the size of the list
		//   would be without truncation
		//if ( newGuy > *max ) *max = newGuy;
		if ( KEYCMP(newGuy,max,m_ks)>0 ) KEYSET(max,newGuy,m_ks);
	}

	// debug
	// log("PHASE1: sk=%s ek=%s bytes=%i"
	//     ,KEYSTR(startKey,m_ks)
	//     ,KEYSTR(endKey,m_ks)
	//     ,(int)totalBytes);

	// TODO: now get from the btree!
	// before getting from the map (on disk IndexLists) get upper bound
	// from the in memory b-tree
	//int32_t n=getTree()->getListSize (startKey, endKey, &minKey2, &maxKey2);
	int64_t n;
	if(m_tree) n = m_tree->getListSize ( m_collnum ,
					     startKey , endKey , NULL , NULL );
	else n = m_buckets->getListSize ( m_collnum ,
					  startKey , endKey , NULL , NULL );

	// debug
	// RdbList list;
	// m_buckets->getList ( m_collnum,
	// 		     startKey,
	// 		     endKey,
	// 		     10000000,
	// 		     &list ,
	// 		     NULL,
	// 		     NULL,
	// 		     true);
	// g_posdb.printList ( list );


	totalBytes += n;
	//if ( minKey2 < *minKey ) *minKey = minKey2;
	//if ( maxKey2 > *maxKey ) *maxKey = maxKey2;	
	// ensure totalBytes >= 0
	if ( totalBytes < 0 ) totalBytes = 0;

	// debug
	// log("PHASE2: sk=%s ek=%s bytes=%i"
	//     ,KEYSTR(startKey,m_ks)
	//     ,KEYSTR(endKey,m_ks)
	//     ,(int)totalBytes);

	return totalBytes;
}

int64_t RdbBase::getNumGlobalRecs ( ) {
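	// assume recs are spread roughly evenly across the shards, so the
	// global count is about our local count times the number of shards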
	return getNumTotalRecs() * g_hostdb.m_numShards;
}

// . return number of positive records - negative records
int64_t RdbBase::getNumTotalRecs ( ) {
	int64_t numPositiveRecs = 0;
	int64_t numNegativeRecs = 0;
	for ( int32_t i = 0 ; i < m_numFiles ; i++ ) {
		// skip even #'d files -- those are merge files
		if ( (m_fileIds[i] & 0x01) == 0 ) continue;
		numPositiveRecs += m_maps[i]->getNumPositiveRecs();
		numNegativeRecs += m_maps[i]->getNumNegativeRecs();
	}
	// . add in the btree
	// . TODO: count negative and positive recs in the b-tree
	// . assume all positive for now
	// . for now let Rdb add the tree in RdbBase::getNumTotalRecs()
	if(m_tree) {
		numPositiveRecs += m_tree->getNumPositiveKeys(m_collnum);
		numNegativeRecs += m_tree->getNumNegativeKeys(m_collnum);
	}
	else {
		// i've seen this happen when adding a new coll i guess
		if ( ! m_buckets ) return 0;
		//these routines are slow because they count every time.
		numPositiveRecs += m_buckets->getNumKeys(m_collnum);
		//numPositiveRecs += m_buckets->getNumPositiveKeys(m_collnum);
		//numNegativeRecs += m_buckets->getNumNegativeKeys(m_collnum);
	}
	//int64_t total = numPositiveRecs - numNegativeRecs;
	//if ( total < 0 ) return 0LL;
	//return total;
	return numPositiveRecs - numNegativeRecs;
}

// . how much mem is alloced for all of our maps?
// . we have one map per file
int64_t RdbBase::getMapMemAlloced () {
	int64_t alloced = 0;
	for ( int32_t i = 0 ; i < m_numFiles ; i++ ) 
		alloced += m_maps[i]->getMemAlloced();
	return alloced;
}

// sum of all parts of all big files
int32_t RdbBase::getNumSmallFiles ( ) {
	int32_t count = 0;
	for ( int32_t i = 0 ; i < m_numFiles ; i++ ) 
		count += m_files[i]->getNumParts();
	return count;
}

int64_t RdbBase::getDiskSpaceUsed ( ) {
	int64_t count = 0;
	for ( int32_t i = 0 ; i < m_numFiles ; i++ ) 
		count += m_files[i]->getFileSize();
	return count;
}

// . convert a tfn (title file num)(encoded in tfndb recs) to a real file num
// . used by Msg22
// . tfndb maps a docid to a tfn (tfndb is like an RdbMap but fatter)
// . if id2 is not present, it is because of our old bug: some middle files
//   got merged and the file nums changed, so all the old tfns are mapping to
//   an older file than they should!  so if 2 merges happened, if the merge 
//   above the other merge
// . IMPORTANT: de-compensate for merge file inserted before us
int32_t RdbBase::getFileNumFromId2 ( int32_t id2 ) {
	int32_t prev = -1;
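	// the merge target sits at slot m_mergeStartFileNum - 1, so any
	// file at or after m_mergeStartFileNum was shifted up one slot by
	// its insertion; subtract one to recover the pre-merge file num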
	for ( int32_t i = 0 ; i < m_numFiles ; i++ ) {
		if ( m_fileIds2[i] == id2 ) {
			if ( m_hasMergeFile && i >= m_mergeStartFileNum ) i--;
			return i;
		}
		if ( m_fileIds2[i] < id2 ) prev = i;
	}
	// if not there, must have been merged below, use prev
	if ( m_hasMergeFile && prev >= m_mergeStartFileNum ) prev--;
	// debug msg
	//log("Rdb:getFileNumFromId2: id2 of %"INT32" is invalid. returning "
	//    "startFileNum of %"INT32".",id2,prev,id2);
	log("db: titledb*-%"INT32".dat file in collection \"%s\" "
	    "is referenced but no longer exists. "
	    "To fix this do a tight merge on titledb; you may have to delete "
	    "tfndb* and regenerate it using the 'gb gendbs' command after the "
	    "tight merge completes if the document is indeed missing. Cause "
	    "may have been an improper shutdown, or not saving tfndb or "
	    "titledb, or a missing document in titledb.",id2,m_coll);
	//log("DISK: titledb*-%"INT32".dat file is referenced but no longer exists."
	//  " See section on Database Repair in overview.html to fix it.",id2);
	return -1; // prev;
}

// . used by Sync.cpp to convert a sync point to a list of file nums
//   to read from for syncing
// . if it is m_savedFile then return -2
// . if it is not found   then return -1
int32_t RdbBase::getFileNumFromName ( char *fname ) {
	// this is a special case
	if ( strstr ( fname , "-sav" ) ) return -2;
	// scan our current files
	for ( int32_t i = 0 ; i < m_numFiles ; i++ ) {
		char *ff = m_files[i]->getFilename();
		if ( strcmp ( ff , fname ) == 0 ) return i;
	}
	return -1;
}

void RdbBase::closeMaps ( bool urgent ) {
	for ( int32_t i = 0 ; i < m_numFiles ; i++ )
		m_maps[i]->close ( urgent );
}

void RdbBase::saveMaps ( bool useThread ) {
	for ( int32_t i = 0 ; i < m_numFiles ; i++ ) {
		if ( ! m_maps[i] ) {
			log("base: map for file #%"INT32" is null",i);
			continue;
		}
		m_maps[i]->writeMap ( false );
	}
}

void RdbBase::verifyDiskPageCache ( ) {
	//if ( !m_pc ) return;
	// disable for now
	return;
	// for ( int32_t i = 0; i < m_numFiles; i++ ){
	// 	BigFile *f = m_files[i];
	// 	m_pc->verifyData(f);
	// }
}

bool RdbBase::verifyFileSharding ( ) {

	if ( m_rdb->m_isCollectionLess ) return true;

	// if swapping in from CollectionRec::getBase() then do
	// not re-verify file sharding! only do at startup
	if ( g_loop.m_isDoingLoop ) return true;

	// skip for now to speed up startup
	static int32_t s_count = 0;
	s_count++;
	if ( s_count == 50 )
		log("db: skipping shard verification for remaining files");
	if ( s_count >= 50 ) 
		return true;

	g_threads.disableThreads();

	Msg5 msg5;
	//Msg5 msg5b;
	RdbList list;
	char startKey[MAX_KEY_BYTES];
	char endKey[MAX_KEY_BYTES];
	KEYMIN(startKey,MAX_KEY_BYTES);
	KEYMAX(endKey,MAX_KEY_BYTES);
	int32_t minRecSizes = 64000;
	char rdbId = m_rdb->m_rdbId;
	if ( rdbId == RDB_TITLEDB ) minRecSizes = 640000;
	
	log ( "db: Verifying shard parity for %s of %"INT32" bytes "
	      "for coll %s (collnum=%"INT32")...", 
	      m_dbname , 
	      minRecSizes,
	      m_coll , (int32_t)m_collnum );

	if ( ! msg5.getList ( m_rdb->m_rdbId, //RDB_POSDB   ,
			      m_collnum       ,
			      &list         ,
			      startKey      ,
			      endKey        ,
			      minRecSizes   ,
			      true          , // includeTree   ,
			      false         , // add to cache?
			      0             , // max cache age
			      0             , // startFileNum  ,
			      -1            , // numFiles      ,
			      NULL          , // state
			      NULL          , // callback
			      0             , // niceness
			      false         , // err correction?
			      NULL          ,
			      0             ,
			      -1            ,
			      true          ,
			      -1LL          ,
			      NULL          , // &msg5b        ,
			      true          )) {
		g_threads.enableThreads();
		return log("db: HEY! it did not block");
	}

	int32_t count = 0;
	int32_t got   = 0;
	int32_t printed = 0;
	char k[MAX_KEY_BYTES];

	for ( list.resetListPtr() ; ! list.isExhausted() ;
	      list.skipCurrentRecord() ) {
		//key144_t k;
		list.getCurrentKey(k);

		// skip negative keys
		if ( (k[0] & 0x01) == 0x00 ) continue;

		count++;
		//uint32_t groupId = k.n1 & g_hostdb.m_groupMask;
		//uint32_t groupId = getGroupId ( RDB_POSDB , &k );
		//if ( groupId == g_hostdb.m_groupId ) got++;
		uint32_t shardNum = getShardNum( rdbId , k );

		if ( shardNum == getMyShardNum() ) {
			got++;
			continue;
		}

		if ( ++printed > 100 ) continue;

		// avoid log spam... comment this out. nah print out 1st 100.
		log ( "db: Found bad key in list belongs to shard %"INT32"",
		      shardNum);
	}

	g_threads.enableThreads();

	//if ( got ) 
	//	log("db: verified %"INT32" recs for %s in coll %s",
	//	    got,m_dbname,m_coll);
       
	if ( got == count ) {
		//log("db: passed on %"INT32" recs (count=%"INT32")",got,count);
		return true;
	}

	// tally it up
	g_rebalance.m_numForeignRecs += count - got;
	log ("db: Out of first %"INT32" records in %s for %s.%"INT32", only %"INT32" belong "
	     "to our group.",count,m_dbname,m_coll,(int32_t)m_collnum,got);
	// exit if NONE, we probably got the wrong data
	//if ( got == 0 ) log("db: Are you sure you have the "
	//		    "right data in the right directory? ");

	//log ( "db: Exiting due to Posdb inconsistency." );
	// threads were already re-enabled above
	return true;//g_conf.m_bypassValidation;

	//log(LOG_DEBUG, "db: Posdb passed verification successfully for %"INT32" "
	//		"recs.", count );
	// DONE
	//return true;
}

float RdbBase::getPercentNegativeRecsOnDisk ( int64_t *totalArg ) {
	// scan the maps
	int64_t numPos = 0LL;
	int64_t numNeg = 0LL;
	for ( int32_t i = 0 ; i < m_numFiles ; i++ ) {
		numPos += m_maps[i]->getNumPositiveRecs();
		numNeg += m_maps[i]->getNumNegativeRecs();
	}
	int64_t total = numPos + numNeg;
	*totalArg = total;
	// guard against dividing by zero when there are no recs on disk
	if ( total <= 0 ) return 0.0;
	float percent = (float)numNeg / (float)total;
	return percent;
}