#include "RdbTree.h"
#include "Collectiondb.h"
#include "JobScheduler.h"
#include "Mem.h"
#include "RdbMem.h"
#include "BigFile.h"
#include "RdbList.h"
#include "Spider.h"
#include "Process.h"
#include "Conf.h"
#include "ScopedWriteLock.h"
#include "ScopedReadLock.h"
#include <fcntl.h>


RdbTree::RdbTree () {
	m_collnums= NULL;
	m_keys    = NULL;
	m_data    = NULL;
	m_sizes   = NULL;
	m_left    = NULL;
	m_right   = NULL;
	m_parents = NULL;
	m_depth   = NULL;
	m_headNode      = -1;
	m_numNodes      =  0;
	m_numUsedNodes  =  0;
	m_memAllocated  =  0;
	m_memOccupied   =  0;
	m_nextNode      =  0;
	m_minUnusedNode =  0; 
	m_fixedDataSize = -1; // variable dataSize, depends on individual node
	m_needsSave     = false;
	m_pickRight     = false;

	// PVS-Studio
	memset(m_dir, 0, sizeof(m_dir));
	memset(m_memTag, 0, sizeof(m_memTag));
	m_state = NULL;
	m_callback = NULL;
	m_ownData = false;
	m_overhead = 0;
	m_maxMem = 0;
	m_allocName = NULL;
	m_bytesWritten = 0;
	m_bytesRead = 0;
	m_errno = 0;
	m_ks = 0;
	m_corrupt = 0;
	
	// before resetting... we have to set this so clear() won't breach buffers
	m_rdbId = -1;

	reset_unlocked();
}

RdbTree::~RdbTree ( ) {
	reset_unlocked();
}


// "memMax" includes records plus the overhead
bool RdbTree::set(int32_t fixedDataSize, int32_t maxNumNodes, int32_t memMax, bool ownData,
                  const char *allocName, const char *dbname, char keySize, char rdbId) {
	ScopedWriteLock sl(m_rwlock);

	reset_unlocked();
	m_fixedDataSize   = fixedDataSize; 
	m_maxMem          = memMax;
	m_ownData         = ownData;
	m_allocName       = allocName;
	m_ks              = keySize;

	m_needsSave       = false;

	m_dbname[0] = '\0';
	if ( dbname ) {
		int32_t dlen = strlen(dbname);
		if ( dlen > 30 ) dlen = 30;
		gbmemcpy(m_dbname,dbname,dlen);
		m_dbname[dlen] = '\0';
	}

	// a malloc tag, must be LESS THAN 16 bytes including the NULL
	char *p = m_memTag;
	gbmemcpy  ( p , "RdbTree" , 7 ); p += 7;
	if ( dbname ) strncpy ( p , dbname    , 8 );
	p += 8;

	*p++ = '\0';
	// set rdbid
	m_rdbId = rdbId; // -1;
	// sanity
	if ( rdbId < -1       ) { g_process.shutdownAbort(true); }
	if ( rdbId >= RDB_END ) { g_process.shutdownAbort(true); }

	// adjust m_maxMem to virtual infinity if it was -1
	if ( m_maxMem < 0 ) m_maxMem = 0x7fffffff;
	// . compute each node's memory overhead
	// . size of a key/left/right/parent
	m_overhead = (m_ks + 4*3 );
	// include collection number, currently an uint16_t
	m_overhead += sizeof(collnum_t);
	// if we're a non-zero data length include a dataptr (-1 means variabl)
	if ( m_fixedDataSize !=  0 ) m_overhead += 4;
	// include dataSize if our dataSize is variable (-1)
	if ( m_fixedDataSize == -1 ) m_overhead += 4;
	// if we're balanced include 1 byte per node for the depth
	m_overhead += 1;
	if( maxNumNodes == -1) {
		maxNumNodes = m_maxMem / m_overhead;
		if(maxNumNodes > 10000000) maxNumNodes = 10000000;
	}
	// allocate the nodes
	return growTree_unlocked(maxNumNodes);
}

void RdbTree::reset() {
	ScopedWriteLock sl(m_rwlock);
	reset_unlocked();
}

void RdbTree::reset_unlocked() {
	// make sure string is NULL temrinated. this strlen() should 
	if ( m_numNodes > 0 && 
	     m_dbname[0] &&
	     // don't be spammy we can have thousands of these, one per coll
	     strcmp(m_dbname,"waitingtree") != 0 )
		log(LOG_INFO,"db: Resetting tree for %s.",m_dbname);

	// liberate all the nodes
	clear_unlocked();

	// do not require saving after a reset
	m_needsSave = false;
	// now free all the overhead structures of this tree
	int32_t n = m_numNodes;
	// free array of collectio numbers (shorts for now)
	if ( m_collnums) mfree ( m_collnums, sizeof(collnum_t) *n,m_allocName);
	// free the array of keys
	if ( m_keys  ) mfree ( m_keys  , m_ks      * n , m_allocName ); 
	// free the data ptrs
	if ( m_data  ) mfree ( m_data  , sizeof(char *) * n , m_allocName ); 
	// free the array of dataSizes
	if ( m_sizes ) mfree ( m_sizes , n * 4              , m_allocName ); 
	// free the sorted node #'s
	if ( m_left    ) mfree ( m_left    , n * 4 ,m_allocName);
	if ( m_right   ) mfree ( m_right   , n * 4 ,m_allocName);
	if ( m_parents ) mfree ( m_parents , n * 4 ,m_allocName);
	if ( m_depth   ) mfree ( m_depth   , n     ,m_allocName);
	m_collnums      = NULL; 
	m_keys          = NULL; 
	m_data          = NULL;
	m_sizes         = NULL;
	m_left          = NULL;
	m_right         = NULL;
	m_parents       = NULL;
	m_depth         = NULL;
	// tree description vars
	m_headNode      = -1;
	m_numNodes      =  0;
	m_numUsedNodes  =  0;
	m_memAllocated  =  0;
	m_memOccupied   =  0;
	m_nextNode      =  0;
	m_minUnusedNode =  0; 
	m_fixedDataSize = -1; // variable dataSize, depends on individual node
	// clear counts
	m_numNegativeKeys = 0;
	m_numPositiveKeys = 0;
	m_isSaving        = false;
}

void RdbTree::delColl ( collnum_t collnum ) {
	ScopedWriteLock sl(m_rwlock);

	m_needsSave = true;
	const char *startKey = KEYMIN();
	const char *endKey   = KEYMAX();
	deleteNodes_unlocked(collnum, startKey, endKey, true/*freeData*/) ;
}

int32_t RdbTree::clear() {
	ScopedWriteLock sl(m_rwlock);
	return clear_unlocked();
}

// . this just makes all the nodes available for occupation (liberates them)
// . it does not free this tree's control structures
// . returns # of occupied nodes we liberated
int32_t RdbTree::clear_unlocked( ) {
	if ( m_numUsedNodes > 0 ) m_needsSave = true;
	// the liberation count
	int32_t count = 0;
	// liberate all of our nodes
	int32_t dataSize = m_fixedDataSize;
	for ( int32_t i = 0 ; i < m_minUnusedNode ; i++ ) {
		// skip node if parents is -2 (unoccupied)
		if ( m_parents[i] == -2 ) continue;
		// we no longer count the overhead of this node as occupied
		m_memOccupied -= m_overhead;
		// make the ith node available for occupation
		m_parents[i] = -2;
		// keep count
		count++;
		// continue if we have no data to free
		if ( ! m_data ) continue;
		// read dataSize from m_sizes[i] if it's not fixed
		if ( m_fixedDataSize == -1 ) dataSize = m_sizes[i];
		// free the data being pointed to
		if ( m_ownData ) mfree ( m_data[i] , dataSize ,m_allocName);
		// adjust our reported memory usage
		m_memAllocated -= dataSize;
		m_memOccupied -= dataSize;
	}
	// reset all these
	m_headNode      = -1;
	m_numUsedNodes  =  0;
	m_nextNode      =  0;
	m_minUnusedNode =  0; 
	// clear counts
	m_numNegativeKeys = 0;
	m_numPositiveKeys = 0;


	// clear tree counts for all collections!
	int32_t nc = g_collectiondb.getNumRecs();
	// BUT only if we are an Rdb::m_tree!!!
	if ( m_rdbId == -1 ) nc = 0;
	// otherwise, we overwrite stuff in CollectionRec we shouldn't
	for ( int32_t i = 0 ; i < nc ; i++ ) {
		CollectionRec *cr = g_collectiondb.getRec(i);
		if ( ! cr ) continue;
		if ( m_rdbId < 0 ) continue;
		//if (((unsigned char)m_rdbId)>=RDB_END){g_process.shutdownAbort(true); }
		cr->m_numNegKeysInTree[(unsigned char)m_rdbId] = 0;
		cr->m_numPosKeysInTree[(unsigned char)m_rdbId] = 0;
	}

	return count;
}

bool RdbTree::getNode(collnum_t collnum, const char *key) const {
	ScopedReadLock sl(m_rwlock);
	return (getNode_unlocked(collnum, key) >= 0);
}

// . used by cache 
// . wrapper for getNode_unlocked()
int32_t RdbTree::getNode_unlocked(collnum_t collnum, const char *key) const {
	m_rwlock.verify_is_locked();

	int32_t i = m_headNode;

	// get the node (about 4 cycles per loop, 80cycles for 1 million items)
	while (i != -1) {
		if (collnum < m_collnums[i]) {
			i = m_left[i];
			continue;
		}

		if (collnum > m_collnums[i]) {
			i = m_right[i];
			continue;
		}

		int cmp = KEYCMP(key, 0, m_keys, i, m_ks);
		if (cmp < 0) {
			i = m_left[i];
			continue;
		}

		if (cmp > 0) {
			i = m_right[i];
			continue;
		}

		return i;
	}

	return -1;
}

// . returns node # whose key is >= "key"
// . returns -1 if none
// . used by RdbTree::getList()
// . TODO: spiderdb stores records by time so our unbalanced tree really hurts
//         us for that.
// . TODO: keep a m_lastStartNode and start from that since it tends to only
//         increase startKey via Msg3. if the key at m_lastStartNode is <=
//         the provided key then we did well.
int32_t RdbTree::getNextNode_unlocked(collnum_t collnum, const char *key) const {
	m_rwlock.verify_is_locked();

	// return -1 if no non-empty nodes in the tree
	if ( m_headNode < 0 ) return -1;
	// get the node (about 4 cycles per loop, 80cycles for 1 million items)
	int32_t parent;
	int32_t i = m_headNode ;

	while ( i != -1 ) {
		parent = i;
		if ( collnum < m_collnums[i] ) { i = m_left [i]; continue;}
		if ( collnum > m_collnums[i] ) { i = m_right[i]; continue;}
		int cmp = KEYCMP(key,0,m_keys,i,m_ks);
		if (cmp<0) { i = m_left [i]; continue;}
		if (cmp>0) { i = m_right[i]; continue;}
		return i;
        }
	if ( m_collnums [ parent ] >  collnum ) return parent;
	if ( m_collnums [ parent ] == collnum && //m_keys [ parent ] > key ) 
	     KEYCMP(m_keys,parent,key,0,m_ks)>0 )
		return parent;
	return getNextNode_unlocked(parent);
}

int32_t RdbTree::getFirstNode_unlocked() const {
	m_rwlock.verify_is_locked();

	const char *k = KEYMIN();
	return getNextNode_unlocked(0, k);
}

int32_t RdbTree::getLastNode_unlocked() const {
	m_rwlock.verify_is_locked();

	const char *k = KEYMAX();
	return getPrevNode_unlocked((collnum_t)0x7fff, k);
}

// . get the node whose key is <= "key"
// . returns -1 if none
int32_t RdbTree::getPrevNode_unlocked(collnum_t collnum, const char *key) const {
	m_rwlock.verify_is_locked();

	// return -1 if no non-empty nodes in the tree
	if ( m_headNode < 0  ) return -1;
	// get the node (about 4 cycles per loop, 80cycles for 1 million items)
	int32_t parent;
	int32_t i = m_headNode ;
	while ( i != -1 ) {
		parent = i;
		if ( collnum < m_collnums[i] ) { i = m_left [i]; continue;}
		if ( collnum > m_collnums[i] ) { i = m_right[i]; continue;}
		int cmp = KEYCMP(key,0,m_keys,i,m_ks);
		if ( cmp<0) {i=m_left [i];continue;}
		if ( cmp>0) {i=m_right[i];continue;}
		return i;
        }
	if ( m_collnums [ parent ] <  collnum ) return parent;
	if ( m_collnums [ parent ] == collnum && //m_keys [ parent ] < key ) 
	     KEYCMP(m_keys,parent,key,0,m_ks) < 0 ) return parent;
	return getPrevNode_unlocked(parent);
}

const char* RdbTree::getKey_unlocked(int32_t node) const {
	m_rwlock.verify_is_locked();

	return &m_keys[node*m_ks];
}

const char* RdbTree::getData(collnum_t collnum, const char *key) const {
	ScopedReadLock sl(m_rwlock);
	int32_t n = getNode_unlocked(collnum, key);
	if (n < 0) return NULL;
	return m_data[n];
};

int32_t RdbTree::getDataSize_unlocked(int32_t node) const {
	m_rwlock.verify_is_locked();

	if (m_fixedDataSize == -1) {
		return m_sizes[node];
	}
	return m_fixedDataSize;
}

// . "i" is the previous node number
// . we could eliminate m_parents[] array if we limited tree depth!
// . 24 cycles to get the first kid
// . averages around 50 cycles per call probably
// . 8 cycles are spent entering/exiting this subroutine (inline it? TODO)
int32_t RdbTree::getNextNode_unlocked(int32_t i) const {
	m_rwlock.verify_is_locked();

	// cruise the kids if we have a right one
	if ( m_right[i] >= 0 ) {
		// go to the right kid
		i = m_right [ i ];
		// now go left as much as we can
		while ( m_left [ i ] >= 0 ) i = m_left [ i ];
		// return that node (it's a leaf or has one right kid)
		return i;
	}
	// now keep getting parents until one has a key bigger than i's key
	int32_t p = m_parents[i];
	// if parent is negative we're done
	if ( p < 0 ) return -1;
	// if we're the left kid of the parent, then the parent is the
	// next biggest node
	if ( m_left[p] == i ) return p;
	// otherwise keep getting the parent until it has a bigger key
	// or until we're the LEFT kid of the parent. that's better
	// cuz comparing keys takes longer. loop is 6 cycles per iteration.
	while ( p >= 0  &&  (m_collnums[p] < m_collnums[i] ||
			     ( m_collnums[p] == m_collnums[i] && 
			       KEYCMP(m_keys,p,m_keys,i,m_ks) < 0 )) )
		p = m_parents[p];
	// p will be -1 if none are left
	return p;
}

// . "i" is the next node number
int32_t RdbTree::getPrevNode_unlocked(int32_t i) const {
	m_rwlock.verify_is_locked();

	// cruise the kids if we have a left one
	if ( m_left[i] >= 0 ) {
		// go to the left kid
		i = m_left [ i ];
		// now go right as much as we can
		while ( m_right [ i ] >= 0 ) i = m_right [ i ];
		// return that node (it's a leaf or has one left kid)
		return i;
	}
	// now keep getting parents until one has a key bigger than i's key
	int32_t p = m_parents[i];
	// if we're the right kid of the parent, then the parent is the
	// next least node
	if ( m_right[p] == i ) return p;
	// keep getting the parent until it has a bigger key
	// or until we're the RIGHT kid of the parent. that's better
	// cuz comparing keys takes longer. loop is 6 cycles per iteration.
	while ( p >= 0  &&  (m_collnums[p] > m_collnums[i] ||
			     ( m_collnums[p] == m_collnums[i] && 
			       KEYCMP(m_keys,p,m_keys,i,m_ks) > 0 )) )
		p = m_parents[p];
	// p will be -1 if none are left
	return p;
}

bool RdbTree::addKey(const void *key) {
	ScopedWriteLock sl(m_rwlock);
	return (addKey_unlocked(key) >= 0);
}

int32_t RdbTree::addKey_unlocked(const void *key) {
	m_rwlock.verify_is_write_locked();

	return addNode_unlocked ( 0,(const char *)key,NULL,0);
}

bool RdbTree::addNode(collnum_t collnum, const char *key, char *data, int32_t dataSize) {
	ScopedWriteLock sl(m_rwlock);
	return (addNode_unlocked(collnum, key, data, dataSize) >= 0);
}

// . returns -1 if we coulnd't allocate the new space and sets g_errno to ENOMEM
//   or ETREENOGROW, ...
// . returns node # we added it to on success
// . this will replace any current node with the same key
// . sets retNode to the node we added the data to (used internally)
// . negative dataSizes should be interpreted as 0
// . probably about 120 cycles per add means we can add 2 million per sec
// . NOTE: does not check to see if it will exceed m_maxMem
int32_t RdbTree::addNode_unlocked ( collnum_t collnum , const char *key , char *data , int32_t dataSize ) {
	m_rwlock.verify_is_write_locked();

	// if there's no more available nodes, error out
	if ( m_numUsedNodes >= m_numNodes) { g_errno = ENOMEM; return -1; }
	// we need to be saved now
	m_needsSave = true;

	// sanity check - no empty positive keys for doledb
	if ( m_rdbId == RDB_DOLEDB && dataSize == 0 && (key[0]&0x01) == 0x01){
		g_process.shutdownAbort(true); }

	// for posdb
	if ( m_ks == 18 &&(key[0] & 0x06) ) {g_process.shutdownAbort(true); }

	// set up vars
	int32_t iparent ;
	int32_t rightGuy;
	// this is -1 iff there are no nodes used in the tree
	int32_t i = m_headNode;
	// . find the parent of node i and call it "iparent"
	// . if a node exists with our key then replace it
	while ( i != -1 ) {
		iparent = i;
		if ( collnum < m_collnums[i] ) { i = m_left [i]; continue;}
		if ( collnum > m_collnums[i] ) { i = m_right[i]; continue;}
		int cmp = KEYCMP(key, 0, m_keys, i, m_ks);
		if (cmp < 0) {
			i = m_left[i];
		} else if (cmp > 0) {
			i = m_right[i];
		} else {
			goto replaceIt;
		}
	}

	// . this overhead is key/left/right/parent
	// . we inc it by the data and sizes array if we need to below
	m_memOccupied += m_overhead;
	// point i to the next available node
	i = m_nextNode;
	// debug msg
	//if ( m_dbname && m_dbname[0]=='t' && dataSize >= 4 )
	//	logf(LOG_DEBUG,
	//	     "adding node #%" PRId32" with data ptr at %" PRIx32" "
	//	     "and data size of %" PRId32" into a list.",
	//	     i,data,dataSize);
	// if we're the first node we become the head node and our parent is -1
	if ( m_numUsedNodes == 0 ) {
		m_headNode =  i;
		iparent    = -1;
		// ensure these are right
		m_numNegativeKeys = 0;
		m_numPositiveKeys = 0;
		// we only use these stats for Rdb::m_trees for a 
		// PER COLLECTION count, since there can be multiple 
		// collections using the same Rdb::m_tree!
		// crap, when fixing a tree this will segfault because
		// m_recs[collnum] is NULL.
		if ( m_rdbId >= 0 && g_collectiondb.getRec(collnum) ) {
			//if( ((unsigned char)m_rdbId)>=RDB_END){g_process.shutdownAbort(true); }
			g_collectiondb.getRec(collnum)->
				m_numNegKeysInTree[(unsigned char)m_rdbId] =0;
			g_collectiondb.getRec(collnum)->
				m_numPosKeysInTree[(unsigned char)m_rdbId] =0;
		}
	}

	// stick ourselves in the next available node, "m_nextNode"
	//m_keys    [ i ] = key;
	KEYSET ( &m_keys[i*m_ks] , key , m_ks );
	m_parents [ i ] = iparent;
	// save collection number now, too
	m_collnums [ i ] = collnum;
	// add the key
	// set the data and size only if we need to
	if ( m_fixedDataSize != 0 ) {
		m_data [ i ] = data;
		// ack used and occupied mem
		if ( m_fixedDataSize >= 0 ) {
			m_memAllocated += m_fixedDataSize;
			m_memOccupied += m_fixedDataSize;
		}
		else {
			m_memAllocated += dataSize ; 
			m_memOccupied += dataSize ;
		}
		// we may have a variable size of data as well
		if ( m_fixedDataSize == -1 ) m_sizes [ i ] = dataSize;
	}
	// make our parent, if any, point to us
	if ( iparent >= 0 ) {
		if      ( collnum < m_collnums[iparent] ) 
			m_left [iparent] = i;
		else if (collnum==m_collnums[iparent]&&//key<m_keys[iparent] ) 
			 KEYCMP(key,0,m_keys,iparent,m_ks)<0 )
			m_left [iparent] = i;
		else    
			m_right[iparent] = i;
	}
	// . the right kid of an empty node is used as a linked list of
	//   empty nodes formed by deleting nodes
	// . we keep the linked list so we can re-used these vacated nodes
	rightGuy = m_right [ i ];
	// our kids are -1 (none)
	m_left  [ i ] = -1;
	m_right [ i ] = -1;
	// . if we weren't recycling a node then advance to next
	// . m_minUnusedNode is the lowest node number that was never filled
	//   at any one time in the past
	// . you might call it the brand new housing district
	if ( m_nextNode == m_minUnusedNode ) {m_nextNode++; m_minUnusedNode++;}
	// . otherwise, we're in a linked list of vacated used houses
	// . we have a linked list in the right kid
	// . make sure the new head doesn't have a left
	else {
		// point m_nextNode to the next available used house, if any
		if ( rightGuy >= 0 ) m_nextNode = rightGuy;
		// otherwise point it to the next brand new house (TODO:REMOVE)
		// this is an error, try to fix the tree
		else {
			log(LOG_WARN, "db: Encountered corruption in tree while trying to add a record. "
				"You should replace your memory sticks.");
			if ( !fixTree_unlocked() ) {
				g_process.shutdownAbort(true);
			}
		}
	}
	// we have one more used node
	increaseNodeCount_unlocked(collnum, key);

	// our depth is now 1 since we're a leaf node
	// (we include ourself)
	m_depth [ i ] = 1;

	// . reset depths starting at i's parent and ascending the tree
	// . will balance if child depths differ by 2 or more
	setDepths_unlocked(iparent);

	// return the node number of the node we occupied
	return i; 

	// come here to replace node i with the new data/dataSize
 replaceIt:
	// debug msg
	//fprintf(stderr,"replaced it!\n");
	// if we don't support any data then we're done
	if ( m_fixedDataSize == 0 ) {
		return i;
	}
	// get dataSize
	int32_t oldDataSize = m_fixedDataSize;
	// if datasize was 0 cuz it was a negative key, fix that for
	// calculating m_memOccupied
	if ( m_fixedDataSize >= 0 ) dataSize = m_fixedDataSize;
	if ( m_fixedDataSize < 0  ) oldDataSize = m_sizes[i];
	// free i's data
	if ( m_data[i] && m_ownData ) 
		mfree ( m_data[i] , oldDataSize ,m_allocName);
	// decrease mem occupied and increase by new size
	m_memOccupied -= oldDataSize;
	m_memOccupied += dataSize;
	m_memAllocated -= oldDataSize;
	m_memAllocated += dataSize;
	// otherwise set the data
	m_data [ i ] = data;
	// set the size if we need to as well
	if ( m_fixedDataSize < 0 ) m_sizes [ i ] = dataSize;
	return i;
}

bool RdbTree::deleteNode(collnum_t collnum, const char *key, bool freeData) {
	ScopedWriteLock sl(m_rwlock);
	return deleteNode_unlocked(collnum, key, freeData);
}

bool RdbTree::deleteNode_unlocked(collnum_t collnum, const char *key, bool freeData) {
	int32_t node = getNode_unlocked(collnum, key);
	if (node == -1) {
		return false;
	}

	return deleteNode_unlocked(node, freeData);
}

// delete all nodes with keys in [startKey,endKey]
void RdbTree::deleteNodes_unlocked(collnum_t collnum, const char *startKey, const char *endKey, bool freeData) {
	m_rwlock.verify_is_write_locked();

	int32_t node = getNextNode_unlocked(collnum, startKey);
	while ( node >= 0 ) {
		//int32_t next = getNextNode_unlocked ( node );
		if ( m_collnums[node] != collnum ) break;
		//if ( m_keys    [node] > endKey   ) return;
		if ( KEYCMP(m_keys,node,endKey,0,m_ks) > 0 ) break;
		deleteNode_unlocked(node, freeData);
		// rotation in setDepths_unlocked() will cause him to be replaced
		// with one of his kids, unless he's a leaf node
		//node = next;
		node = getNextNode_unlocked(collnum, startKey);
	}
}

void RdbTree::increaseNodeCount_unlocked(collnum_t collNum, const char *key) {
	m_rwlock.verify_is_locked();

	m_numUsedNodes++;

	if (KEYNEG(key)) {
		m_numNegativeKeys++;
		if (m_rdbId >= 0) {
			CollectionRec *cr = g_collectiondb.getRec(collNum);
			if (cr) {
				cr->m_numNegKeysInTree[(unsigned char)m_rdbId]++;
			}
		}
	} else {
		m_numPositiveKeys++;
		if (m_rdbId >= 0) {
			CollectionRec *cr = g_collectiondb.getRec(collNum);
			if (cr) {
				cr->m_numPosKeysInTree[(unsigned char)m_rdbId]++;
			}
		}
	}
}

void RdbTree::decreaseNodeCount_unlocked(collnum_t collNum, const char *key) {
	m_rwlock.verify_is_write_locked();

	// we have one less used node
	m_numUsedNodes--;

	// update sign counts
	if (KEYNEG(key)) {
		m_numNegativeKeys--;
		if (m_rdbId >= 0) {
			CollectionRec *cr = g_collectiondb.getRec(collNum);
			if (cr) {
				cr->m_numNegKeysInTree[(unsigned char)m_rdbId]--;
			}
		}
	} else {
		m_numPositiveKeys--;
		if (m_rdbId >= 0) {
			CollectionRec *cr = g_collectiondb.getRec(collNum);
			if (cr) {
				cr->m_numPosKeysInTree[(unsigned char)m_rdbId]--;
			}
		}
	}
}

// . now replace node #i with node #j
// . i should not equal j at this point
bool RdbTree::replaceNode_unlocked(int32_t i, int32_t j) {
	m_rwlock.verify_is_locked();

	// . j's parent should take j's one kid
	// . that child should likewise point to j's parent
	// . j should only have <= 1 kid now because of our algorithm above
	// . if j's parent is i then j keeps his kid
	int32_t jparent = m_parents[j];
	if ( jparent != i ) {
		// parent:    if j is my left  kid, then i take j's right kid
		// otherwise, if j is my right kid, then i take j's left kid
		if ( m_left [ jparent ] == j ) {
			m_left  [ jparent ] = m_right [ j ];
			if (m_right[j]>=0) m_parents [ m_right[j] ] = jparent;
		}
		else {
			m_right [ jparent ] = m_left   [ j ];
			if (m_left [j]>=0) m_parents [ m_left[j] ] = jparent;
		}
	}

	// . j inherits i's children (providing i's child is not j)
	// . those children's parent should likewise point to j
	if ( m_left [i] != j ) {
		m_left [j] = m_left [i];
		if ( m_left[j] >= 0 ) m_parents[m_left [j]] = j;
	}
	if ( m_right[i] != j ) {
		m_right[j] = m_right[i];
		if ( m_right[j] >= 0 ) m_parents[m_right[j]] = j;
	}
	// j becomes the kid of i's parent, if any
	int32_t iparent = m_parents[i];
	if ( iparent >= 0 ) {
		if   ( m_left[iparent] == i ) m_left [iparent] = j;
		else                          m_right[iparent] = j;
	}
	// iparent may be -1
	m_parents[j] = iparent;

	// if i was the head node now j becomes the head node
	if ( m_headNode == i ) m_headNode = j;

	// . i joins the linked list of available used homes
	// . put it at the head of the list
	// . "m_nextNode" is the head node of the linked list
	m_right[i]   = m_nextNode;
	m_nextNode   = i;
	// . i's parent should be -2 so we know it's unused in case we're
	//   stepping through the nodes linearly for dumping in RdbDump
	// . used in getListUnordered()
	m_parents[i] = -2;
	// we have one less used node
	decreaseNodeCount_unlocked(m_collnums[i], getKey_unlocked(i));

	// our depth becomes that of the node we replaced, unless moving j
	// up to i decreases the total depth, in which case setDepths_unlocked() fixes
	m_depth [ j ] = m_depth [ i ];
	// . recalculate depths starting at old parent of j
	// . stops at the first node to have the correct depth
	// . will balance at pivot nodes that need it
	if ( jparent != i ) setDepths_unlocked(jparent);
	else setDepths_unlocked(j);
	// TODO: register growTree with g_mem to free on demand

	return true;
}

// . deletes node i from the tree
// . i's parent should point to i's left or right kid
// . if i has no parent then his left or right kid becomes the new top node
bool RdbTree::deleteNode_unlocked(int32_t i, bool freeData) {
	m_rwlock.verify_is_write_locked();

	// watch out for double deletes
	if ( m_parents[i] == -2 ) {
		log(LOG_LOGIC,"db: Caught double delete.");
		return false;
	}

	// we need to be saved now
	m_needsSave = true;

	// we have one less occupied node
	m_memOccupied -= m_overhead;
	// . free it now iff "freeIt" is true (default is true)
	// . m_data can be a NULL array if m_fixedDataSize is fixed to 0
	if ( /*freeData &&*/ m_data ) {
		int32_t dataSize = m_fixedDataSize;
		if ( dataSize == -1 ) dataSize = m_sizes[i];
		if ( m_ownData ) mfree ( m_data [i] , dataSize ,m_allocName);
		m_memAllocated -= dataSize;
		m_memOccupied -= dataSize;
	}

	// j will be the node that replace node #i
	int32_t j = i;
	// . now find a node to replace node #i
	// . get a node whose key is just to the right or left of i's key
	// . get i's right kid
	// . then get that kid's LEFT MOST leaf-node descendant
	// . this little routine is stolen from getNextNode_unlocked(i)
	// . try to pick a kid from the right the same % of time as from left
	if ( ( m_pickRight     && m_right[j] >= 0 ) || 
	     ( m_left[j]   < 0 && m_right[j] >= 0 )  ) {
		// try to pick a left kid next time
		m_pickRight = false;
		// go to the right kid
		j = m_right [ j ];
		// now go left as much as we can
		while ( m_left [ j ] >= 0 ) j = m_left [ j ];
		// use node j (it's a leaf or has a right kid)
		return replaceNode_unlocked(i, j);
	}
	// . now get the previous node if i has no right kid
	// . this little routine is stolen from getPrevNode(i)
	if ( m_left[j] >= 0 ) {
		// try to pick a right kid next time
		m_pickRight = true;
		// go to the left kid
		j = m_left [ j ];
		// now go right as much as we can
		while ( m_right [ j ] >= 0 ) j = m_right [ j ];
		// use node j (it's a leaf or has a left kid)
		return replaceNode_unlocked(i, j);
	}

	// . come here if i did not have any kids (i's a leaf node)
	// . get i's parent
	int32_t iparent = m_parents[i];

	// make i's parent, if any, disown him
	if ( iparent >= 0 ) {
		if   ( m_left[iparent] == i ) m_left [iparent] = -1;
		else                          m_right[iparent] = -1;
	}
	// node i now goes to the top of the list of vacated, available homes
	m_right[i] = m_nextNode;
	// m_nextNode now points to i
	m_nextNode = i;
	// his parent is -2 (god) cuz he's dead and available
	m_parents[i] = -2;

	// . if we were the head node then, since we didn't have any kids,
	//   the tree must be empty
	// . one less node in the tree
	decreaseNodeCount_unlocked(m_collnums[i], getKey_unlocked(i));

	// . reset the depths starting at iparent and going up until unchanged
	// . will balance at pivot nodes that need it
	setDepths_unlocked(iparent);

	// tree must be empty
	if (m_numUsedNodes <= 0) {
		m_headNode = -1;
		// this will nullify our linked list of vacated, used homes
		m_nextNode = 0;
		m_minUnusedNode = 0;
		// ensure these are right
		m_numNegativeKeys = 0;
		m_numPositiveKeys = 0;
		if (m_rdbId >= 0) {
			CollectionRec *cr;
			cr = g_collectiondb.getRec(m_collnums[i]);
			if (cr) {
				cr->m_numNegKeysInTree[(unsigned char)m_rdbId] = 0;
				cr->m_numPosKeysInTree[(unsigned char)m_rdbId] = 0;
			}
		}
	}

	return true;
}

bool RdbTree::fixTree() {
	ScopedWriteLock sl(m_rwlock);
	return fixTree_unlocked();
}

// . this fixes the tree
// returns false if could not fix tree and sets g_errno, otherwise true
bool RdbTree::fixTree_unlocked() {
	m_rwlock.verify_is_write_locked();

	// on error, fix the linked list
	log(LOG_WARN, "db: Trying to fix tree for %s.", m_dbname);
	log(LOG_WARN, "db: %" PRId32" occupied nodes and %" PRId32" empty of top %" PRId32" nodes.",
	    m_numUsedNodes, m_minUnusedNode - m_numUsedNodes, m_minUnusedNode);

	// loop through our nodes
	int32_t n = m_minUnusedNode;
	int32_t count = 0;
	// "clear" the tree as far as addNode() is concerned
	m_headNode      = -1;
	m_numUsedNodes  =  0;
	m_memAllocated  =  0;
	m_memOccupied   =  0;
	m_nextNode      =  0;
	m_minUnusedNode =  0; 
	int32_t           max  = g_collectiondb.getNumRecs();
	log("db: Valid collection numbers range from 0 to %" PRId32".",max);

	/// @todo ALC should we check repair RDB as well?
	bool isTitledb = (m_rdbId == RDB_TITLEDB || m_rdbId == RDB2_TITLEDB2);
	bool isSpiderdb = (m_rdbId == RDB_SPIDERDB || m_rdbId == RDB2_SPIDERDB2);

	// now re-add the old nods to the tree, they should not be overwritten
	// by addNode()
	for ( int32_t i = 0 ; i < n ; i++ ) {
		// speed update
		if ( (i % 100000) == 0 ) 
			log("db: Fixing node #%" PRId32" of %" PRId32".",i,n);
		// skip if empty
		if ( m_parents[i] <= -2 ) continue;
			
		if ( isTitledb && m_data[i] ) {
			char *data = m_data[i];
			int32_t ucompSize = *(int32_t *)data;
			if ( ucompSize < 0 || ucompSize > 100000000 ) {
				log("db: removing titlerec with uncompressed "
				     "size of %i from tree",(int)ucompSize);
				continue;
			}
		}

		char *key = &m_keys[i*m_ks];
		if (isSpiderdb && m_data[i] && Spiderdb::isSpiderRequest((spiderdbkey_t *)key)) {
			char *data = m_data[i];
			data -= sizeof(spiderdbkey_t);
			data -= 4;
			SpiderRequest *sreq ;
			sreq =(SpiderRequest *)data;
			if ( strncmp(sreq->m_url,"http",4) != 0 ) {
				log("db: removing spiderrequest bad url "
					"%s from tree",sreq->m_url);
				//return false;
				continue;
			}
		}
			
			
		collnum_t cn = m_collnums[i];
		// verify collnum
		if ( cn <  0   ) continue;
		if ( cn >= max ) continue;
		// collnum of non-existent coll
		if ( m_rdbId>=0 && ! g_collectiondb.getRec(cn) )
			continue;
		// now add just to set m_right/m_left/m_parent
		if ( m_fixedDataSize == 0 )
			addNode_unlocked(cn,&m_keys[i*m_ks], NULL, 0 );
		else if ( m_fixedDataSize == -1 )
			addNode_unlocked(cn,&m_keys[i*m_ks],m_data[i],m_sizes[i] );
		else 
			addNode_unlocked(cn,&m_keys[i*m_ks],m_data[i], m_fixedDataSize);
		// count em
		count++;
	}

	log("db: Fix tree removed %" PRId32" nodes for %s.",n - count,m_dbname);
	// esure it is still good
	if ( !checkTree_unlocked(false, true) ) {
		log(LOG_WARN, "db: Fix tree failed.");
		return false;
	}
	log("db: Fix tree succeeded for %s.",m_dbname);
	return true;
}

void RdbTree::verifyIntegrity() const {
	ScopedReadLock sl(m_rwlock);

	if (!checkTree_unlocked(false, true)) {
		gbshutdownCorrupted();
	}
}

// returns false if tree had problem, true otherwise
bool RdbTree::checkTree_unlocked(bool printMsgs, bool doChainTest) const {
	m_rwlock.verify_is_locked();

	int32_t hkp = 0;
	bool useHalfKeys = false;

	if (m_rdbId == RDB_LINKDB || m_rdbId == RDB2_LINKDB2) {
		useHalfKeys = true;
	}

	/// @todo ALC should we check repair RDB as well?
	bool isTitledb = (m_rdbId == RDB_TITLEDB || m_rdbId == RDB2_TITLEDB2);
	bool isSpiderdb = (m_rdbId == RDB_SPIDERDB || m_rdbId == RDB2_SPIDERDB2);

	// now check parent kid correlations
	for ( int32_t i = 0 ; i < m_minUnusedNode ; i++ ) {
		// skip node if parents is -2 (unoccupied)
		if ( m_parents[i] == -2 ) continue;
		// all half key bits must be off in here
		if ( useHalfKeys && (m_keys[i*m_ks] & 0x02) ) {
			hkp++;
			// turn it off
			m_keys[i*m_ks] &= 0xfd;
		}
		// for posdb
		if ( m_ks == 18 &&(m_keys[i*m_ks] & 0x06) ) {
			g_process.shutdownAbort(true); }

		if ( isTitledb && m_data[i] ) {
			char *data = m_data[i];
			int32_t ucompSize = *(int32_t *)data;
			if ( ucompSize < 0 || ucompSize > 100000000 ) {
				log("db: found titlerec with uncompressed "
					"size of %i from tree",(int)ucompSize);
				return false;
			}
		}

		char *key = &m_keys[i*m_ks];
		if ( isSpiderdb && m_data[i] &&
				Spiderdb::isSpiderRequest ( (spiderdbkey_t *)key ) ) {
				char *data = m_data[i];
				data -= sizeof(spiderdbkey_t);
				data -= 4;
				SpiderRequest *sreq ;
				sreq =(SpiderRequest *)data;
				if ( strncmp(sreq->m_url,"http",4) != 0 ) {
					log("db: spiderrequest bad url "
						"%s",sreq->m_url);
					return false;
			}
		}

		// bad collnum?
		collnum_t cn = m_collnums[i];
		if ( m_rdbId>=0 && (cn >= g_collectiondb.getNumRecs() || cn < 0) ) {
			log(LOG_WARN, "db: bad collnum in tree");
			return false;
		}
		if ( m_rdbId>=0 && ! g_collectiondb.getRec(cn) ) {
			log(LOG_WARN, "db: collnum is obsolete in tree");
			return false;
		}

		// if no left/right kid it MUST be -1
		if ( m_left[i] < -1 ) {
			log(LOG_WARN, "db: Tree left kid < -1.");
			return false;
		}
		if ( m_left[i] >= m_numNodes ) {
			log(LOG_WARN, "db: Tree left kid is %" PRId32" >= %" PRId32".", m_left[i], m_numNodes);
			return false;
		}
		if ( m_right[i] < -1 ) {
			log(LOG_WARN, "db: Tree right kid < -1.");
			return false;
		}
		if ( m_right[i] >= m_numNodes ) {
			log(LOG_WARN, "db: Tree left kid is %" PRId32" >= %" PRId32".", m_right[i], m_numNodes);
			return false;
		}
		// check left kid
		if ( m_left[i] >= 0 && m_parents[m_left[i]] != i ) {
			log(LOG_WARN, "db: Tree left kid and parent disagree.");
			return false;
		}
		// then right kid
		if ( m_right[i] >= 0 && m_parents[m_right[i]] != i ) {
			log(LOG_WARN, "db: Tree right kid and parent disagree.");
			return false;
		}
		// MDW: why did i comment out the order checking?
		// check order
		if ( m_left[i] >= 0 &&
		     m_collnums[i] == m_collnums[m_left[i]] ) {
			char *key = &m_keys[i*m_ks];
			char *left = &m_keys[m_left[i]*m_ks];
			if ( KEYCMP(key,left,m_ks)<0) {
				log(LOG_WARN, "db: Tree left kid > parent %i", i);
				return false;
			}
			
		}
		if ( m_right[i] >= 0 &&
		     m_collnums[i] == m_collnums[m_right[i]] ) {
			char *key = &m_keys[i*m_ks];
			char *right = &m_keys[m_right[i]*m_ks];
			if ( KEYCMP(key,right,m_ks)>0) {
				log(LOG_WARN, "db: Tree right kid < parent %i %s < %s", i, KEYSTR(right, m_ks), KEYSTR(key, m_ks));
				return false;
			}
		}
	}

	if ( hkp > 0 ) {
		log( LOG_WARN, "db: Had %" PRId32" half key bits on for %s.", hkp, m_dbname );
		return false;
	}

	// now return if we aren't doing active balancing
	if ( ! m_depth ) return true;
	// debug -- just always return now
	if ( printMsgs )logf(LOG_DEBUG,"***m_headNode=%" PRId32", m_numUsedNodes=%" PRId32,
			      m_headNode,m_numUsedNodes);
	int32_t           max  = g_collectiondb.getNumRecs();
	// verify that parent links correspond to kids
	for ( int32_t i = 0 ; i < m_minUnusedNode ; i++ ) {
		// verify collnum
		collnum_t cn = m_collnums[i];
		if ( cn < 0 ) {
			log( LOG_WARN, "db: Got bad collnum in tree, %i.", cn );
			return false;
		}
		if ( cn > max ) {
			log( LOG_WARN, "db: Got too big collnum in tree. %i.", cn );
			return false;
		}

		int32_t P = m_parents [i];
		if ( P == -2 ) continue; // deleted node

		if ( P == -1 && i != m_headNode ) {
			log( LOG_WARN, "db: Tree node %" PRId32" has no parent.", i );
			return false;
		}

		// check kids
		if ( P>=0 && m_left[P] != i && m_right[P] != i ) {
			log( LOG_WARN, "db: Tree kids of node # %" PRId32" disowned him.", i );
			return false;
		}

		// speedy tests continue
		if ( ! doChainTest ) continue;
		// ensure i goes back to head node
		int32_t j = i;
		int32_t loopCount = 0;
		while ( j >= 0 ) { 
			if ( j == m_headNode ) {
				break;
			}
			// sanity -- loop check
			if ( ++loopCount > 10000 ) {
				log( LOG_WARN, "db: tree had loop" );
				return false;
			}
			j = m_parents[j];
		}

		if ( j != m_headNode ) {
			log( LOG_WARN, "db: Node # %" PRId32" does not lead back to head node.", i );
			return false;
		}

		if ( printMsgs ) {
		        char *k = &m_keys[i*m_ks];
			logf(LOG_DEBUG,"***node=%" PRId32" left=%" PRId32" rght=%" PRId32" "
			    "prnt=%" PRId32", depth=%" PRId32" c=%" PRId32" key=%s",
			    i,m_left[i],m_right[i],m_parents[i],
			    (int32_t)m_depth[i],(int32_t)m_collnums[i],
			     KEYSTR(k,m_ks));
		}
		//ensure depth
		int32_t newDepth = computeDepth_unlocked(i);
		if ( m_depth[i] != newDepth ) {
			log( LOG_WARN, "db: Tree node # %" PRId32"'s depth should be %" PRId32".", i, newDepth );
			return false;
		}
	}
	if ( printMsgs ) logf(LOG_DEBUG,"---------------");
	// no problems found
	return true;
}

// . grow tree to "n" nodes
// . this will now actually grow from a current size to a new one
bool RdbTree::growTree_unlocked(int32_t nn) {
	m_rwlock.verify_is_write_locked();

	// if we're that size, bail
	if ( m_numNodes == nn ) return true;

	// old number of nodes
	int32_t on = m_numNodes;
	// some quick type info
	int32_t   k = m_ks;
	int32_t   d = sizeof(char *);

	char  *kp = NULL;
	int32_t  *lp = NULL;
	int32_t  *rp = NULL;
	int32_t  *pp = NULL;
	char **dp = NULL;
	int32_t  *sp = NULL;
	char  *tp = NULL;
	collnum_t *cp = NULL;

	// do the reallocs
	int32_t cs = sizeof(collnum_t);
	cp =(collnum_t *)mrealloc (m_collnums, on*cs,nn*cs,m_allocName);
	if ( ! cp ) {
		goto error;
	}

	kp = (char  *) mrealloc ( m_keys    , on*k , nn*k , m_allocName );
	if ( ! kp ) {
		goto error;
	}

	lp = (int32_t  *) mrealloc ( m_left    , on*4 , nn*4 , m_allocName );
	if ( ! lp ) {
		goto error;
	}

	rp = (int32_t  *) mrealloc ( m_right   , on*4 , nn*4 , m_allocName );
	if ( ! rp ) {
		goto error;
	}

	pp = (int32_t  *) mrealloc ( m_parents , on*4 , nn*4 , m_allocName );
	if ( ! pp ) {
		goto error;
	}

	// deal with data, sizes and depth arrays on a basis of need
	if ( m_fixedDataSize !=  0 ) {
		dp =(char **)mrealloc (m_data  , on*d,nn*d,m_allocName);
		if ( ! dp ) {
			goto error;
		}
	}
	if ( m_fixedDataSize == -1 ) {
		sp =(int32_t  *)mrealloc (m_sizes , on*4,nn*4,m_allocName);
		if ( ! sp ) {
			goto error;
		}
	}
	tp =(char  *)mrealloc (m_depth , on  ,nn  ,m_allocName);
	if ( ! tp ) {
		goto error;
	}

	// re-assign
	m_collnums= cp;
	m_keys    = kp;
	m_left    = lp;
	m_right   = rp;
	m_parents = pp;
	m_data    = dp;
	m_sizes   = sp;
	m_depth   = tp;

	// adjust memory usage
	m_memAllocated -= m_overhead * on;
	m_memAllocated += m_overhead * nn;
	// bitch an exit if too much
	if ( m_memAllocated > m_maxMem ) {
		log( LOG_ERROR, "db: Trying to grow tree for %s to %" PRId32", but max is %" PRId32". Consider changing gb.conf.",
		     m_dbname, m_memAllocated, m_maxMem );
		return false;
	}

	// and the new # of nodes we have
	m_numNodes = nn;

	return true;

 error:

	// . realloc back down if we need to
	// . downsizing should NEVER fail!
	if ( cp ) {
		collnum_t *ss = (collnum_t *)mrealloc ( cp , nn*cs , on*cs , m_allocName);
		if ( ! ss ) { g_process.shutdownAbort(true); }
		m_collnums = ss;
	}
	if ( kp ) {
		char  *kk = (char *)mrealloc ( kp, nn*k, on*k, m_allocName );
		if ( ! kk ) { g_process.shutdownAbort(true); }
		m_keys = kk;
	}
	if ( lp ) {
		int32_t  *x = (int32_t *)mrealloc ( lp , nn*4 , on*4 , m_allocName );
		if ( ! x ) { g_process.shutdownAbort(true); }
		m_left = x;
	}
	if ( rp ) {
		int32_t  *x = (int32_t *)mrealloc ( rp , nn*4 , on*4 , m_allocName );
		if ( ! x ) { g_process.shutdownAbort(true); }
		m_right = x;
	}
	if ( pp ) {
		int32_t  *x = (int32_t *)mrealloc ( pp , nn*4 , on*4 , m_allocName );
		if ( ! x ) { g_process.shutdownAbort(true); }
		m_parents = x;
	}
	if ( dp && m_fixedDataSize != 0 ) {
		char **p = (char **)mrealloc ( dp , nn*d , on*d , m_allocName );
		if ( ! p ) { g_process.shutdownAbort(true); }
		m_data = p;
	}
	if ( sp && m_fixedDataSize == -1 ) {
		int32_t  *x = (int32_t *)mrealloc ( sp , nn*4 , on*4 , m_allocName );
		if ( ! x ) { g_process.shutdownAbort(true); }
		m_sizes = x;
	}
	/// @note ALC the following code will be used if we add something else below the mrealloc of m_depth above
	//if ( tp ) {
	//	char *s = (char *)mrealloc ( tp , nn   , on   , m_allocName );
	//	if ( ! s ) { g_process.shutdownAbort(true); }
	//	m_depth = s;
	//}

	log( LOG_ERROR, "db: Failed to grow tree for %s from %" PRId32" to %" PRId32" bytes: %s.",
	     m_dbname, on, nn, mstrerror(g_errno) );
	return false;
}


int32_t RdbTree::getMemOccupiedForList_unlocked(collnum_t collnum, const char *startKey, const char *endKey, int32_t minRecSizes) const {
	m_rwlock.verify_is_locked();

	int32_t ne = 0;
	int32_t size = 0;
	int32_t i = getNextNode_unlocked(collnum, startKey) ;
	while ( i  >= 0 ) {
		// break out if we should
		if ( KEYCMP(m_keys,i,endKey,0,m_ks) > 0 ) break;
		if ( m_collnums[i] != collnum ) break;
		if ( size >= minRecSizes      ) break;
		// num elements
		ne++;
		// do we got data?
		if ( m_data ) {
			// is size fixed?
			if ( m_fixedDataSize >= 0 ) size += m_fixedDataSize;
			else                        size += m_sizes[i];
		}
		// add in key overhead
		size += m_ks;
		// add in dataSize overhead (-1 means variable data size)
		if ( m_fixedDataSize < 0 ) size += 4;
		// advance
		i = getNextNode_unlocked(i);
	}
	// that's it
	return size;
}

int32_t RdbTree::getMemOccupiedForList() const {
	ScopedReadLock sl(m_rwlock);
	return getMemOccupiedForList_unlocked();
}

int32_t RdbTree::getMemOccupiedForList_unlocked() const {
	m_rwlock.verify_is_locked();

	int32_t mem = 0;
	if ( m_fixedDataSize >= 0 ) {
		mem += m_numUsedNodes * m_ks;
		mem += m_numUsedNodes * m_fixedDataSize;
		return mem;
	}
	// get total mem used by occupied nodes
	mem  = m_memOccupied;
	// remove left/right/parent for each used node (3 int32_ts)
	mem -= m_overhead * m_numUsedNodes;
	// but do include the key in the list, even though it's in the overhead
	mem += m_ks * m_numUsedNodes;
	// but don't include the dataSize in the overhead -- that's in list too
	mem -= 4 * m_numUsedNodes;

	return mem;
}

// . returns false and sets g_errno on error
// . throw all the records in this range into this list
// . probably about 24-50 cycles per key we add
// . if this turns out to be bottleneck we can use hardcore RdbGet later
// . RdbDump should use this
bool RdbTree::getList(collnum_t collnum, const char *startKey, const char *endKey, int32_t minRecSizes,
                      RdbList *list, int32_t *numPosRecs, int32_t *numNegRecs, bool useHalfKeys) const {
	ScopedReadLock sl(m_rwlock);

	// reset the counts of positive and negative recs
	int32_t numNeg = 0;
	int32_t numPos = 0;
	if ( numNegRecs ) *numNegRecs = 0;
	if ( numPosRecs ) *numPosRecs = 0;

	// . set the start and end keys of this list
	// . set lists's m_ownData member to true
	list->reset();
	// got set m_ks first so the set ( startKey, endKey ) works!
	list->setKeySize(m_ks);
	list->set              ( startKey , endKey );
	list->setFixedDataSize ( m_fixedDataSize   );
	list->setUseHalfKeys   ( useHalfKeys       );
	// bitch if list does not own his own data
	if ( ! list->getOwnData() ) {
		g_errno = EBADENGINEER;
		log(LOG_LOGIC,"db: rdbtree: getList: List does not own data");
		return false;
	}
	// bail if minRecSizes is 0
	if ( minRecSizes == 0 ) return true;
	// return true if no non-empty nodes in the tree
	if ( m_numUsedNodes == 0 ) return true;

	// get first node >= startKey
	int32_t node = getNextNode_unlocked(collnum, startKey);
	if ( node < 0 ) return true;
	// if it's already beyond endKey, give up
	if ( KEYCMP ( m_keys,node,endKey,0,m_ks) > 0 ) return true;
	// or if we hit a different collection number
	if ( m_collnums [ node ] > collnum ) return true;
	// save lastNode for setting *lastKey
	int32_t lastNode = -1;
	// . how much space would whole tree take if we stored it in a list?
	// . this includes records that are deletes
	// . caller will often say give me 500MB for a fixeddatasize list
	//   that is heavily constrained by keys...
	int32_t growth = getMemOccupiedForList_unlocked();

	// do not allocate whole tree's worth of space if we have a fixed
	// data size and a finite minRecSizes
	if ( m_fixedDataSize >= 0 && minRecSizes >= 0 ) {
		// only assign if we require less than minRecSizes of growth
		// because some callers set minRecSizes to 500MB!!
		int32_t ng = minRecSizes + m_fixedDataSize + m_ks ;
		if ( ng < growth && ng > minRecSizes ) growth = ng;
	}

	// raise to virtual inifinite if not constraining us
	if ( minRecSizes < 0 ) minRecSizes = 0x7fffffff;

	// . nail it down if titledb because metalincs was getting
	//   out of memory errors when getting a bunch of titleRecs
	// . only do this for titledb/spiderdb lookups since it can be slow
	//   to go through a million indexdb nodes.
	// . this was because minRecSizes was way too big... 16MB i think
	// . if they pass us a size-unbounded request for a fixed data size
	//   list then we should call this as well... as in Msg22.cpp's
	//   call to msg5::getList for tfndb.
	//
	// ^^^^^ Partially obsolete rambling above ^^^^^
	// Until we have verified that no call uses some silly huge minRecSizes we keep the
	// counting threshold but have it a 2MB which is more suitable for modern hardware.
	// The problem is that the getMemOccupiedForList_unlocked() method has to traverse all the
	// relevant nodes to calculate the total size and doing so is not cheap.
	if ( m_fixedDataSize < 0 || minRecSizes >= 2*1024*1024 )
		growth = getMemOccupiedForList_unlocked(collnum, startKey, endKey, minRecSizes);

	// grow the list now
	if ( ! list->growList ( growth ) ) {
		log(LOG_WARN, "db: Failed to grow list to %" PRId32" bytes for storing records from tree: %s.",
		    growth, mstrerror(g_errno));
		return false;
	}

	// stop when we've hit or just exceed minRecSizes
	// or we're out of nodes
	for ( ; node >= 0 && list->getListSize() < minRecSizes ; node = getNextNode_unlocked(node) ) {
		// stop before exceeding endKey
		if ( KEYCMP (m_keys,node,endKey,0,m_ks) > 0 ) break;
		// or if we hit a different collection number
		if ( m_collnums [ node ] != collnum ) break;
		// if more recs were added to tree since we initialized the
		// list then grow the list to compensate so we do not end up
		// reallocating one key at a time.

		// add record to our list
		if ( m_fixedDataSize == 0 ) {
			if (!list->addRecord(&m_keys[node * m_ks], 0, NULL)) {
				log(LOG_WARN, "db: Failed to add record to tree list for %s: %s. Fix the growList algo.",
				    m_dbname,mstrerror(g_errno));
				return false;
			}
		}
		else {
			int32_t dataSize = (m_fixedDataSize == -1) ? m_sizes[node] : m_fixedDataSize;

			// point to key
			char *key = &m_keys[node*m_ks];

			// do not allow negative keys to have data, or
			// at least ignore it! let's RdbList::addRecord()
			// core dump on us!
			if (KEYNEG(key)) {
				dataSize = 0;
			}

			// add the key and data
			if (!list->addRecord(key, dataSize, m_data[node])) {
				log(LOG_WARN, "db: Failed to add record to tree list for %s: %s. Fix the growList algo.",
				    m_dbname,mstrerror(g_errno));
				return false;
			}
		}

		// we are little endian
		if ( KEYNEG(m_keys,node,m_ks) ) numNeg++;
		else                            numPos++;
		// save lastNode for setting *lastKey
		lastNode = node;
	}

	// set counts to pass back
	if ( numNegRecs ) *numNegRecs = numNeg;
	if ( numPosRecs ) *numPosRecs = numPos;
	// . we broke out of the loop because either:
	// . 1. we surpassed endKey OR
	// . 2. we hit or surpassed minRecSizes
	// . constrain the endKey of the list to the key of "node" minus 1
	// . "node" should be the next node we would have added to this list
	// . if "node" is < 0 then we can keep endKey set high the way it is

	// record the last key inserted into the list
	if ( lastNode >= 0 ) 
		list->setLastKey ( &m_keys[lastNode*m_ks] );
	// reset the list's endKey if we hit the minRecSizes barrier cuz
	// there may be more records before endKey than we put in "list"
	if ( list->getListSize() >= minRecSizes && lastNode >= 0 ) {
		// use the last key we read as the new endKey
		char newEndKey[MAX_KEY_BYTES];
		KEYSET(newEndKey,&m_keys[lastNode*m_ks],m_ks);
		// . if he's negative, boost new endKey by 1 because endKey's
		//   aren't allowed to be negative
		// . we're assured there's no positive counterpart to him 
		//   since Rdb::addRecord() doesn't allow both to exist in
		//   the tree at the same time
		// . if by some chance his positive counterpart is in the
		//   tree, then it's ok because we'd annihilate him anyway,
		//   so we might as well ignore him
		// we are little endian
		if ( KEYNEG(newEndKey,0,m_ks) ) KEYINC(newEndKey,m_ks);
		// if we're using half keys set his half key bit
		if ( useHalfKeys ) KEYOR(newEndKey,0x02);
		// tell list his new endKey now
		list->set ( startKey , newEndKey );
	}
	// reset list ptr to point to first record
	list->resetListPtr();

	// success
	return true;
}

// . this just estimates the size of the list 
// . the more balanced the tree the better the accuracy
// . this now returns total recSizes not # of NODES like it used to
//   in [startKey, endKey] in this tree
// . if the count is < 200 it returns an EXACT count
// . right now it only works for dataless nodes (keys only)
int32_t RdbTree::estimateListSize(collnum_t collnum, const char *startKey, const char *endKey, char *minKey, char *maxKey) const {
	ScopedReadLock sl(m_rwlock);

	// make these as benign as possible
	if ( minKey ) KEYSET ( minKey , endKey   , m_ks );
	if ( maxKey ) KEYSET ( maxKey , startKey , m_ks );
	// get order of a key as close to startKey as possible
	int32_t order1 = getOrderOfKey_unlocked(collnum, startKey, minKey);
	// get order of a key as close to endKey as possible
	int32_t order2 = getOrderOfKey_unlocked(collnum, endKey, maxKey);
	// how many recs?
	int32_t size = order2 - order1;
	// . if enough, return
	// . NOTE minKey/maxKey may be < or > startKey/endKey
	// . return an estimated list size
	if ( size > 200 ) return size * m_ks;
	// . otherwise, count exactly
	// . reset size and get the initial node
	size = 0;
	int32_t n = getPrevNode_unlocked(collnum, startKey);
	// return 0 if no nodes in that key range

	if( n < 0 ) {
		return 0;
	}

	// skip to next node if this one is < startKey
	if( KEYCMP(m_keys, n, startKey, 0, m_ks) < 0 ) {
		n = getNextNode_unlocked(n);
	}

	if( n < 0 ) {
		return 0;
	}
	
	// or collnum
	if ( m_collnums[n] < collnum ) {
		n = getNextNode_unlocked(n);
	}

	// loop until we run out of nodes or one breeches endKey
	while( n > 0 && KEYCMP(m_keys,n,endKey,0,m_ks) <= 0 && m_collnums[n]==collnum ) {
		size++;
		n = getNextNode_unlocked(n);
	}
	// this should be an exact list size (actually # of nodes)
	return size * m_ks;
}


bool RdbTree::collExists(collnum_t coll) const {
	ScopedReadLock sl(m_rwlock);
	int32_t nn = getNextNode_unlocked(coll, KEYMIN());
	if(nn < 0)
		return false;
	if(getCollnum_unlocked(nn) != coll)
		return false;
	return true;
}

// we don't lock because variable is already atomic
bool RdbTree::isSaving() const {
	return m_isSaving;
}

bool RdbTree::needsSave() const {
	ScopedReadLock sl(m_rwlock);
	return m_needsSave;
}

// . returns a number from 0 to m_numUsedNodes-1
// . represents the ordering of this key in that range
// . *retKey is the key that has the returned order
// . *retKey gets as close to "key" as it can
// . returns # of NODES
int32_t RdbTree::getOrderOfKey_unlocked(collnum_t collnum, const char *key, char *retKey) const {
	m_rwlock.verify_is_locked();

	if ( m_numUsedNodes <= 0 ) return 0;
	int32_t i     = m_headNode;
	// estimate the depth of tree if not balanced
	int32_t d     = getTreeDepth_unlocked();
	// TODO: WARNING: ensure d-1 not >= 32 !!!!!!!!!!!!!!!!!
	int32_t step  = 1 << (d-1);
	int32_t order = step;
	while ( i != -1 ) {
		//if ( retKey ) *retKey = m_keys[i];
		if ( retKey ) KEYSET ( retKey , &m_keys[i*m_ks] , m_ks );
		step /= 2;
		if ( collnum < m_collnums[i] ||
		     (collnum==m_collnums[i] &&KEYCMP(key,0,m_keys,i,m_ks)<0)){
			i = m_left [i]; 
			if ( i >= 0 ) order -= step;
			continue;
		}
		if ( collnum > m_collnums[i] ||
		     (collnum==m_collnums[i] &&KEYCMP(key,0,m_keys,i,m_ks)>0)){
			i = m_right[i]; 
			if ( i >= 0 ) order += step;
			continue;
		}
		break;
        }
	// normalize order since tree probably has less then 2^d nodes
	int64_t normOrder = (int64_t)order * (int64_t)m_numUsedNodes / ( ((int64_t)1 << d)-1);
	return (int32_t) normOrder;
}

int32_t RdbTree::getTreeDepth_unlocked() const {
	m_rwlock.verify_is_locked();
	return m_depth[m_headNode];
}


// . recompute depths of nodes starting at i and ascending the tree
// . call rotateRight/Left() when depth of children differs by 2 or more
void RdbTree::setDepths_unlocked(int32_t i) {
	m_rwlock.verify_is_write_locked();

	// inc the depth of all parents if it changes for them
	while ( i >= 0 ) {
		// . compute the new depth for node i
		// . get depth of left kid
		// . left/rightDepth is depth of subtree on left/right
		int32_t leftDepth  = 0;
		int32_t rightDepth = 0;
		if ( m_left [i] >= 0 ) leftDepth  = m_depth [ m_left [i] ] ;
		if ( m_right[i] >= 0 ) rightDepth = m_depth [ m_right[i] ] ;
		// . get the new depth for node i
		// . add 1 cuz we include ourself in our m_depth
		int32_t newDepth ;
		if ( leftDepth > rightDepth ) newDepth = leftDepth  + 1;
		else                          newDepth = rightDepth + 1;
		// if the depth did not change for i then we're done
		int32_t oldDepth = m_depth[i] ;
		// set our new depth
		m_depth[i] = newDepth;
		// diff can be -2, -1, 0, +1 or +2
		int32_t diff = leftDepth - rightDepth;
		// . if it's -1, 0 or 1 then we don't need to balance
		// . if rightside is deeper rotate left, i is the pivot
		// . otherwise, rotate left
		// . these should set the m_depth[*] for all nodes needing it
		if      ( diff == -2 ) i = rotateLeft_unlocked(i);
		else if ( diff ==  2 ) i = rotateRight_unlocked(i);
		// . return if our depth was ultimately unchanged
		// . i may have change if we rotated, but same logic applies
		if ( m_depth[i] == oldDepth ) break;
		// debug msg
		//fprintf (stderr,"changed node %" PRId32"'s depth from %" PRId32" to %" PRId32"\n",
		//i,oldDepth,newDepth);
		// get his parent to continue the ascension
		i = m_parents [ i ];
	}
	// debug msg
	//printTree();
}

/*
// W , X and B are SUBTREES.
// B's subtree was 1 less in depth than W or X, then a new node was added to
// W or X triggering the imbalance.
// However, if B gets deleted W and X can be the same size.
//
// Right rotation if W subtree depth is >= X subtree depth:
//
//          A                N
//         / \              / \
//        /   \            /   \
//       N     B   --->   W     A
//      / \                    / \
//     W   X                  X   B
//
// Right rotation if W subtree depth is <  X subtree depth:
//          A                X
//         / \              / \
//        /   \            /   \
//       N     B   --->   N     A
//      / \              / \   / \
//     W   X            W   Q T   B
//        / \                 
//       Q   T               
*/
// . we come here when A's left subtree is deeper than it's right subtree by 2
// . this rotation operation causes left to lose 1 depth and right to gain one
// . the type of rotation depends on which subtree is deeper, W or X
// . W or X must deeper by the other by exactly one
// . if they were equal depth then how did adding a node inc the depth?
// . if their depths differ by 2 then N would have been rotated first!
// . the parameter "i" is the node # for A in the illustration above
// . return the node # that replaced A so the balance() routine can continue
// . TODO: check our depth modifications below
int32_t RdbTree::rotateRight_unlocked(int32_t i) {
	m_rwlock.verify_is_write_locked();
	return rotate_unlocked(i, m_left, m_right);
}

// . i just swapped left with m_right
int32_t RdbTree::rotateLeft_unlocked(int32_t i) {
	m_rwlock.verify_is_write_locked();
	return rotate_unlocked(i, m_right, m_left);
}

int32_t RdbTree::rotate_unlocked(int32_t i, int32_t *left, int32_t *right) {
	m_rwlock.verify_is_write_locked();

	// i's left kid's right kid takes his place
	int32_t A = i;
	int32_t N = left  [ A ];
	int32_t W = left  [ N ];
	int32_t X = right [ N ];
	int32_t Q = -1;
	int32_t T = -1;
	if ( X >= 0 ) {
		Q = left  [ X ];
		T = right [ X ];
	}
	// let AP be A's parent
	int32_t AP = m_parents [ A ];
	// whose the bigger subtree, W or X? (depth includes W or X itself)
	int32_t Wdepth = 0;
	int32_t Xdepth = 0;
	if ( W >= 0 ) Wdepth = m_depth[W];
	if ( X >= 0 ) Xdepth = m_depth[X];

	// goto Xdeeper if X is deeper
	if ( Wdepth < Xdepth ) goto Xdeeper;
	// N's parent becomes A's parent
	m_parents [ N ] = AP;
	// A's parent becomes N
	m_parents [ A ] = N;
	// X's parent becomes A
	if ( X >= 0 ) m_parents [ X ] = A;
	// A's parents kid becomes N
	if ( AP >= 0 ) {
		if ( left [ AP ] == A ) left  [ AP ] = N;
		else                    right [ AP ] = N;
	}
	// if A had no parent, it was the headNode
	else {
		//fprintf(stderr,"changing head node from %" PRId32" to %" PRId32"\n",
		//m_headNode,N);
		m_headNode = N;
	}
	// N's right kid becomes A
	right [ N ] = A;
	// A's left  kid becomes X		
	left  [ A ] = X;
	// . compute A's depth from it's X and B kids
	// . it should be one less if Xdepth smaller than Wdepth
	// . might set m_depth[A] to computeDepth_unlocked(A) if we have problems
	if ( Xdepth < Wdepth ) m_depth [ A ] -= 2;
	else                   m_depth [ A ] -= 1;
	// N gains a depth iff W and X were of equal depth
	if ( Wdepth == Xdepth ) m_depth [ N ] += 1;
	// now we're done, return the new pivot that replaced A
	return N;
	// come here if X is deeper
 Xdeeper:
	// X's parent becomes A's parent
	m_parents [ X ] = AP;
	// A's parent becomes X
	m_parents [ A ] = X;
	// N's parent becomes X
	m_parents [ N ] = X;
	// Q's parent becomes N
	if ( Q >= 0 ) m_parents [ Q ] = N;
	// T's parent becomes A
	if ( T >= 0 ) m_parents [ T ] = A;
	// A's parent's kid becomes X
	if ( AP >= 0 ) {
		if ( left [ AP ] == A ) left  [ AP ] = X;
		else	                right [ AP ] = X;
	}
	// if A had no parent, it was the headNode
	else {
		//fprintf(stderr,"changing head node2 from %" PRId32" to %" PRId32"\n",
		//m_headNode,X);
		m_headNode = X;
	}
	// A's left     kid becomes T
	left  [ A ] = T;
	// N's right    kid becomes Q
	right [ N ] = Q;
	// X's left     kid becomes N
	left  [ X ] = N;
	// X's right    kid becomes A
	right [ X ] = A;
	// X's depth increases by 1 since it gained 1 level of 2 new kids
	m_depth [ X ] += 1;
	// N's depth decreases by 1
	m_depth [ N ] -= 1;
	// A's depth decreases by 2
	m_depth [ A ] -= 2; 
	// now we're done, return the new pivot that replaced A
	return X;
}

// . depth of subtree with i as the head node
// . includes i, so minimal depth is 1
int32_t RdbTree::computeDepth_unlocked(int32_t i) const {
	m_rwlock.verify_is_locked();

	int32_t leftDepth  = 0;
	int32_t rightDepth = 0;
	if ( m_left [i] >= 0 ) leftDepth  = m_depth [ m_left [i] ] ;
	if ( m_right[i] >= 0 ) rightDepth = m_depth [ m_right[i] ] ;
	// . get the new depth for node i
	// . add 1 cuz we include ourself in our m_depth
	if ( leftDepth > rightDepth ) return leftDepth  + 1;
	else                          return rightDepth + 1;  
}

int32_t RdbTree::getMemAllocated() const {
	ScopedReadLock sl(m_rwlock);
	return m_memAllocated;
}

int32_t RdbTree::getMemOccupied() const {
	ScopedReadLock sl(m_rwlock);
	return m_memOccupied;
}

bool RdbTree::is90PercentFull() const {
	ScopedReadLock sl(m_rwlock);

	// . m_memOccupied is amount of alloc'd mem that data occupies
	// . now we /90 and /100 since multiplying overflowed
	return ( m_numUsedNodes/90 >= m_numNodes/100 );
}

#define BLOCK_SIZE 10000

// . caller should call f->set() himself
// . we'll open it here
// . returns false if blocked, true otherwise
// . sets g_errno on error
bool RdbTree::fastSave(const char *dir, bool useThread, void *state, void (*callback)(void *)) {
	ScopedReadLock sl(m_rwlock);

	logTrace(g_conf.m_logTraceRdbTree, "BEGIN. dir=%s", dir);

	if (g_conf.m_readOnlyMode) {
		logTrace(g_conf.m_logTraceRdbTree, "END. Read only mode. Returning true.");
		return true;
	}

	// we do not need a save
	if (!m_needsSave) {
		logTrace(g_conf.m_logTraceRdbTree, "END. Don't need to save. Returning true.");
		return true;
	}

	// return true if already in the middle of saving
	bool isSaving = m_isSaving.exchange(true);
	if (isSaving) {
		logTrace(g_conf.m_logTraceRdbTree, "END. Is already saving. Returning false.");
		return false;
	}

	// note it
	logf(LOG_INFO, "db: Saving %s%s-saved.dat", dir, m_dbname);

	// save parms
	strncpy(m_dir, dir, sizeof(m_dir)-1);
	m_dir[sizeof(m_dir) - 1] = '\0';

	m_state    = state;
	m_callback = callback;

	// assume no error
	m_errno = 0;

	if (useThread) {
		// make this a thread now
		if (g_jobScheduler.submit(saveWrapper, saveDoneWrapper, this, thread_type_unspecified_io, 1/*niceness*/)) {
			return false;
		}

		// if it failed
		if (g_jobScheduler.are_new_jobs_allowed()) {
			log(LOG_WARN, "db: Thread creation failed. Blocking while saving tree. Hurts performance.");
		}
	}

	sl.unlock();

	// no threads
	saveWrapper(this);
	saveDoneWrapper(this, job_exit_normal);

	logTrace(g_conf.m_logTraceRdbTree, "END. Returning true.");

	// we did not block
	return true;
}

void RdbTree::saveWrapper ( void *state ) {
	logTrace(g_conf.m_logTraceRdbTree, "BEGIN");

	// get this class
	RdbTree *that = (RdbTree *)state;

	ScopedReadLock sl(that->getLock());

	// assume no error since we're at the start of thread call
	that->m_errno = 0;

	// this returns false and sets g_errno on error
	that->fastSave_unlocked();

	if (g_errno && !that->m_errno) {
		that->m_errno = g_errno;
	}

	if (that->m_errno) {
		log(LOG_ERROR, "db: Had error saving tree to disk for %s: %s.", that->m_dbname, mstrerror(that->m_errno));
	} else {
		log(LOG_INFO, "db: Done saving %s with %" PRId32" keys (%" PRId64" bytes)",
		    that->m_dbname, that->m_numUsedNodes, that->m_bytesWritten);
	}

	// . resume adding to the tree
	// . this will also allow other threads to be queued
	// . if we did this at the end of the thread we could end up with
	//   an overflow of queued SAVETHREADs
	that->m_isSaving = false;

	// we do not need to be saved now?
	that->m_needsSave = false;

	logTrace(g_conf.m_logTraceRdbTree, "END");
}

/// @todo ALC cater for when exit_type != job_exit_normal
// we come here after thread exits
void RdbTree::saveDoneWrapper(void *state, job_exit_t exit_type) {
	logTrace(g_conf.m_logTraceRdbTree, "BEGIN");

	// get this class
	RdbTree *that = (RdbTree *)state;

	// store save error into g_errno
	g_errno = that->m_errno;

	// . call callback
	if ( that->m_callback ) {
		that->m_callback ( that->m_state );
	}

	logTrace(g_conf.m_logTraceRdbTree, "END");
}

// . returns false and sets g_errno on error
// . NO USING g_errno IN A DAMN THREAD!!!!!!!!!!!!!!!!!!!!!!!!!
bool RdbTree::fastSave_unlocked() {
	m_rwlock.verify_is_locked();

	if ( g_conf.m_readOnlyMode ) return true;

	/// @todo ALC we should probably use BigFile now. don't think g_errno being messed up is a good reason
	// cannot use the BigFile class, since we may be in a thread and it messes with g_errno
	char s[1024];
	sprintf ( s , "%s/%s-saving.dat", m_dir , m_dbname );
	int fd = ::open ( s , O_RDWR | O_CREAT | O_TRUNC , getFileCreationFlags() );
	if ( fd < 0 ) {
		m_errno = errno;
		log( LOG_ERROR, "db: Could not open %s for writing: %s.", s, mstrerror( errno ) );
		return false;
	}

	// verify the tree
	if (g_conf.m_verifyWrites) {
		for (;;) {
			log("db: verify writes is enabled, checking tree before saving.");
			if (!checkTree_unlocked(false, true)) {
				log(LOG_WARN, "db: fixing tree and re-checking");
				fixTree_unlocked();
				continue;
			}
			break;
		}
	}


	// clear our own errno
	errno = 0;
	// . save the header
	// . force file head to the 0 byte in case offset was elsewhere
	int64_t offset = 0;
	int64_t br = 0;
	static const bool doBalancing = true;
	br += pwrite ( fd , &m_numNodes       , 4 , offset ); offset += 4;
	br += pwrite ( fd , &m_fixedDataSize  , 4 , offset ); offset += 4;
	br += pwrite ( fd , &m_numUsedNodes   , 4 , offset ); offset += 4;
	br += pwrite ( fd , &m_headNode       , 4 , offset ); offset += 4;
	br += pwrite ( fd , &m_nextNode       , 4 , offset ); offset += 4;
	br += pwrite ( fd , &m_minUnusedNode  , 4 , offset ); offset += 4;
	br += pwrite ( fd , &doBalancing   , sizeof(doBalancing) , offset);
	offset += sizeof(doBalancing);
	br += pwrite ( fd , &m_ownData       , sizeof(m_ownData)     , offset);
	offset += sizeof(m_ownData);
	// bitch on error
	if ( br != offset ) {
		m_errno = errno;
		close ( fd );
		log(LOG_WARN, "db: Failed to save tree1 for %s: %s.", m_dbname,mstrerror(errno));
		return false;
	}
	// position to store into m_keys, ...
	int32_t start = 0;
	// save tree in block units
	while ( start < m_minUnusedNode ) {
		// . returns number of nodes, starting at node #i, saved
		// . returns -1 and sets errno on error
		int32_t bytesWritten = fastSaveBlock_unlocked(fd, start, offset) ;
		// returns -1 on error
		if ( bytesWritten < 0 ) {
			close ( fd );
			m_errno = errno;
			log(LOG_WARN, "db: Failed to save tree2 for %s: %s.", m_dbname,mstrerror(errno));
			return false;
		}
		// point to next block to save to
		start += BLOCK_SIZE;
		// and advance the file offset
		offset += bytesWritten;
	}
	// remember total bytes written
	m_bytesWritten = offset;
	// close it up
	close ( fd );

	// now rename it
	char s2[1024];
	sprintf ( s2 , "%s/%s-saved.dat", m_dir , m_dbname );
	if( ::rename(s, s2) == -1 ) {
		logError("Error renaming file [%s] to [%s] (%d: %s)", s, s2, errno, mstrerror(errno));
	}

	return true;
}

// return bytes written
int32_t RdbTree::fastSaveBlock_unlocked(int fd, int32_t start, int64_t offset) {
	m_rwlock.verify_is_locked();

	// save offset
	int64_t oldOffset = offset;
	// . just save each one right out, even if empty
	//   because the empty's have a linked list in m_right[]
	// . set # n
	int32_t n = BLOCK_SIZE;
	// don't over do it
	if ( start + n > m_minUnusedNode ) n = m_minUnusedNode - start;

	errno = 0;
	int64_t br = 0;
	// write the block
	br += pwrite ( fd,&m_collnums[start], n * sizeof(collnum_t) , offset );
	offset += n * sizeof(collnum_t);
	br += pwrite ( fd , &m_keys   [start*m_ks] , n * m_ks , offset );
	offset += n * m_ks;
	br += pwrite(fd, &m_left   [start] , n * 4 , offset ); offset += n * 4;
	br += pwrite(fd, &m_right  [start] , n * 4 , offset ); offset += n * 4;
	br += pwrite(fd, &m_parents[start] , n * 4 , offset ); offset += n * 4;
	br += pwrite ( fd , &m_depth[start] , n  , offset ); offset += n  ;
	if ( m_fixedDataSize == -1 ) {
	  br += pwrite ( fd , &m_sizes[start] , n*4, offset ); offset += n*4; }
	// bitch on error
	if ( br != offset - oldOffset ) {
		log(LOG_WARN, "db: Failed to save tree3 for %s (%" PRId64"!=%" PRId64"): %s.",
		    m_dbname, br, offset, mstrerror(errno));
		return -1;
	}

	// if no data to write then return bytes written this call
	if ( m_fixedDataSize == 0 ) return offset - oldOffset ;

	// debug count
	//int32_t count = 0;
	// define ending node for all loops
	int32_t end = start + n ;
	// now we have to dump out all the records
	for ( int32_t i = start ; i < end ; i++ ) {
		// skip if empty
		if ( m_parents[i] == -2 ) continue;
		// write variable sized nodes
		if ( m_fixedDataSize == -1 ) {
			if ( m_sizes[i] <= 0 ) continue;
			pwrite ( fd , m_data[i] , m_sizes[i] , offset );
			offset += m_sizes[i];
			continue;
		}
		// write fixed sized nodes
		pwrite (  fd , m_data[i] , m_fixedDataSize , offset );
		offset += m_fixedDataSize;
	}

	// return bytes written
	return offset - oldOffset;
}

// . caller should call f->set() himself
// . we'll open it here
// . returns false and sets g_errno on error (sometimes g_errno not set)
bool RdbTree::fastLoad(BigFile *f, RdbMem *stack) {
	ScopedWriteLock sl(m_rwlock);

	log( LOG_INIT, "db: Loading %s.", f->getFilename() );

	// open it up
	if ( ! f->open ( O_RDONLY ) ) {
		log( LOG_ERROR, "db: open failed" );
		return false;
	}

	int32_t fsize = f->getFileSize();
	// init offset
	int64_t offset = 0;
	// 16 byte header
	int32_t header = 4*6 + 1 + sizeof(m_ownData);
	// file size must be a min of "header"
	if ( fsize < header ) {
		f->close();
		g_errno=EBADFILE;
		log( LOG_ERROR, "db: file size smaller than header" );
		return false;
	}

	// get # of nodes in the tree
	int32_t n , fixedDataSize , numUsedNodes ;
	bool doBalancing , ownData ;
	int32_t headNode , nextNode , minUnusedNode;
	// force file head to the 0 byte in case offset was elsewhere
	f->read  ( &n              , 4 , offset ); offset += 4;
	f->read  ( &fixedDataSize  , 4 , offset ); offset += 4;
	f->read  ( &numUsedNodes   , 4 , offset ); offset += 4;
	f->read  ( &headNode       , 4 , offset ); offset += 4;
	f->read  ( &nextNode       , 4 , offset ); offset += 4;
	f->read  ( &minUnusedNode  , 4 , offset ); offset += 4;
	f->read  ( &doBalancing    , sizeof(doBalancing) , offset );
	offset += sizeof(doBalancing);
	f->read  ( &ownData        , sizeof(m_ownData    ) , offset ) ; 
	offset += sizeof(m_ownData);
	// return false on read error
	if ( g_errno ) {
		f->close();
		log( LOG_ERROR, "db: read error: %s", mstrerror( g_errno ) );
		return false;
	}
	// parms check
	if ( m_fixedDataSize != fixedDataSize || 
	     !doBalancing   ||
	     m_ownData       != ownData        ) {
		f->close();
		log(LOG_LOGIC,"db: rdbtree: fastload: Bad parms. File "
			   "may be corrupt or a key attribute was changed in "
			   "the code and is not reflected in this file.");
		return false;
	}
	// make sure size it right again
	int32_t nodeSize    = (sizeof(collnum_t)+m_ks+4+4+4);
	int32_t minFileSize = header + minUnusedNode * nodeSize;
	minFileSize += minUnusedNode     ;
	if ( fixedDataSize == -1 ) minFileSize += minUnusedNode * 4 ;
	//if ( fixedDataSize > 0 ) minFileSize += minUnusedNode *fixedDataSize;
	// if no data, sizes much match exactly
	if ( fixedDataSize == 0 && fsize != minFileSize ) {
		g_errno = EBADFILE;
		log( LOG_ERROR, "db: File size of %s is %" PRId32", should be %" PRId32". File may be corrupted.",
		     f->getFilename(),fsize,minFileSize);
		f->close();
		return false;
	}
	// does it fit?
	if ( fsize < minFileSize ) {
		g_errno = EBADFILE;
		log( LOG_ERROR, "db: File size of %s is %" PRId32", should >= %" PRId32". File may be corrupted.",
		     f->getFilename(),fsize,minFileSize);
		f->close();
		return false;
	}

	// make room if we don't have any
	if ( m_numNodes < minUnusedNode ) {
		log( LOG_INIT, "db: Growing tree to make room for %s", f->getFilename() );
		if (!growTree_unlocked(minUnusedNode)) {
			f->close();
			log( LOG_ERROR, "db: Failed to grow tree" );
			return false;
		}
	}
	// we'll read this many
	int32_t start = 0;

	// reset corruption count
	m_corrupt = 0;

	// read block by block
	while ( start < minUnusedNode ) {
		// . returns next place to start scan
		// . incs m_numPositive/NegativeKeys and m_numUsedNodes 
		// . incs m_memAllocated and m_memOccupied
		int32_t bytesRead = fastLoadBlock_unlocked(f, start, minUnusedNode, stack, offset) ;
		if ( bytesRead < 0 ) {
			f->close();
			g_errno = errno;
			return false;
		}
		// inc the start
		start += BLOCK_SIZE;
		// and the offset
		offset += bytesRead;
	}

	// print corruption
	if ( m_corrupt ) {
		log( LOG_WARN, "admin: Loaded %" PRId32" corrupted recs in tree for %s.", m_corrupt, m_dbname );
	}

	// remember total bytes read
	m_bytesRead = offset;
	// set these
	m_headNode      = headNode;
	m_nextNode      = nextNode;
	m_minUnusedNode = minUnusedNode;

	// check it
	if ( !checkTree_unlocked(false, true) ) {
		return fixTree_unlocked();
	}

	// no longer needs save
	m_needsSave = false;

	return true;
}

// . return bytes loaded
// . returns -1 and sets g_errno on error
int32_t RdbTree::fastLoadBlock_unlocked(BigFile *f, int32_t start, int32_t totalNodes, RdbMem *stack, int64_t offset) {
	m_rwlock.verify_is_write_locked();

	// set # ndoes to read
	int32_t n = totalNodes - start;
	if ( n > BLOCK_SIZE ) n = BLOCK_SIZE;
	// debug msg
	//log("reading block at %" PRId64", %" PRId32" nodes",
	//     f->m_currentOffset, n );
	int64_t oldOffset = offset;
	// . copy them in
	// . start reading at beginning of file
	f->read ( &m_collnums[start], n * sizeof(collnum_t) , offset ); 
	offset += n * sizeof(collnum_t);
	f->read ( &m_keys   [start*m_ks] , n * m_ks , offset ); 
	offset += n * m_ks;
	f->read ( &m_left   [start] , n * 4 , offset ); offset += n * 4;
	f->read ( &m_right  [start] , n * 4 , offset ); offset += n * 4;
	f->read ( &m_parents[start] , n * 4 , offset ); offset += n * 4;
	f->read ( &m_depth[start] , n , offset    ); offset += n;
	if ( m_fixedDataSize == -1 ) {
		f->read ( &m_sizes[start] , n * 4 , offset); offset += n * 4; }
	// return false on read error
	if ( g_errno ) {
		log( LOG_ERROR, "db: Failed to read %s: %s.", f->getFilename(), mstrerror(g_errno) );
		return -1;
	}
	// get valid collnum ranges
	int32_t max  = g_collectiondb.getNumRecs();
	// sanity check
	//if ( max >= MAX_COLLS ) { g_process.shutdownAbort(true); }
	// define ending node for all loops
	int32_t end = start + n ;
	// store into tree in the appropriate nodes
	for ( int32_t i = start ; i < end ; i++ ) {
		// skip if empty
		if ( m_parents[i] == -2 ) continue;
		// watch out for bad collnums... corruption...
		collnum_t c = m_collnums[i];
		if ( c < 0 || c >= max ) {
			m_corrupt++;
			continue;
		}
		// must have rec as well. unless it its statsdb tree
		// or m_waitingTree which are collection-less and always use
		// 0 for their collnum. if collection-less m_rdbId==-1.
		if ( ! g_collectiondb.getRec(c) && m_rdbId >= 0 ) {
			m_corrupt++;
			continue;
		}
		// keep a tally on all this
		m_memOccupied += m_overhead;
		increaseNodeCount_unlocked(c, getKey_unlocked(i));
	}
	// bail now if we can 
	if ( m_fixedDataSize == 0 ) return offset - oldOffset ;
	// how much should we read?
	int32_t bufSize = 0;
	if ( m_fixedDataSize == -1 ) {
		for ( int32_t i = start ; i < end ; i++ ) 
			if ( m_parents[i] != -2 ) bufSize += m_sizes[i];
	}
	else if ( m_fixedDataSize > 0 ) {
		for ( int32_t i = start ; i < end ; i++ ) 
			if ( m_parents[i] != -2 ) bufSize += m_fixedDataSize;
	}
	// get space
	char *buf = (char *) stack->allocData(bufSize);
	if ( ! buf ) {
	        log( LOG_ERROR, "db: Failed to allocate %" PRId32" bytes to read %s. Increase tree size for it in gb.conf.",
	             bufSize,f->getFilename());
		return -1;
	}

	// debug
	//log("reading %" PRId32" bytes of raw rec data", bufSize );
	// establish end point
	char *bufEnd = buf + bufSize;
	// . read all into that buf
	// . this should block since callback is NULL
	f->read ( buf , bufSize , offset ) ;
	// return false on read error
	if ( g_errno ) return -1;
	// advance file offset
	offset += bufSize;
	// part it out now
	int32_t  size = m_fixedDataSize;
	for ( int32_t i = start ; i < end ; i++ ) {
		// skip unused
		if ( m_parents[i] == -2 ) continue;
		// get size of his data if it's variable
		if ( m_fixedDataSize == -1 ) size = m_sizes[i];
		// ensure we have the room
		if ( buf + size > bufEnd ) {
			g_errno = EBADFILE;
			log( LOG_ERROR, "db: Encountered record with corrupted size parameter of %" PRId32" in %s.",
			     size, f->getFilename() );
			return -1;
		}
		m_data[i]  = buf;
		buf       += size;
		// update these
		m_memAllocated  += size;
		m_memOccupied += size;
	}
	return offset - oldOffset ;
}
// . caller should call f->set() himself
// . we'll open it here
// . returns false and sets g_errno on error (sometimes g_errno not set)


void RdbTree::cleanTree() {
	ScopedWriteLock sl(m_rwlock);

	// some trees always use 0 for all node collnum_t's like
	// statsdb, waiting tree etc.
	if ( m_rdbId < 0 ) return;

	// the liberation count
	int32_t count = 0;
	collnum_t collnum;
	int32_t max = g_collectiondb.getNumRecs();

	for ( int32_t i = 0 ; i < m_minUnusedNode ; i++ ) {
		// skip node if parents is -2 (unoccupied)
		if ( m_parents[i] == -2 ) continue;
		// is collnum valid?
		if ( m_collnums[i] >= 0   &&
		     m_collnums[i] <  max &&
		     g_collectiondb.getRec(m_collnums[i]) ) continue;
		// if it is negtiave, remove it, that is wierd corruption
		if ( m_collnums[i] < 0 )
			deleteNode_unlocked(i, true);
		// remove it otherwise
		// don't actually remove it!!!! in case collection gets
		// moved accidentally.
		// no... otherwise it can clog up the tree forever!!!!
		deleteNode_unlocked(i, true);
		count++;
		// save it
		collnum = m_collnums[i];
	}

	// print it
	if ( count == 0 ) return;
	log(LOG_LOGIC,"db: Removed %" PRId32" records from %s tree for invalid "
	    "collection number %i.",count,m_dbname,collnum);
	//log(LOG_LOGIC,"db: Records not actually removed for safety. Except "
	//    "for those with negative colnums.");
	static bool s_print = true;
	if ( ! s_print ) return;
	s_print = false;
	log (LOG_LOGIC,"db: This is bad. Did you remove a collection "
	     "subdirectory? Don't do that, you should use the \"delete "
	     "collections\" interface because it also removes records from "
	     "memory, too.");
}

bool RdbTree::isEmpty() const {
	ScopedReadLock sl(m_rwlock);
	return isEmpty_unlocked();
}

bool RdbTree::isEmpty_unlocked() const {
	m_rwlock.verify_is_locked();
	return (m_numUsedNodes == 0);
}

int32_t RdbTree::getNumNodes() const {
	ScopedReadLock sl(m_rwlock);
	return getNumNodes_unlocked();
}

int32_t RdbTree::getNumNodes_unlocked() const {
	m_rwlock.verify_is_locked();
	return m_minUnusedNode;
}

int32_t RdbTree::getNumUsedNodes() const {
	ScopedReadLock sl(m_rwlock);
	return getNumUsedNodes_unlocked();
}

int32_t RdbTree::getNumUsedNodes_unlocked() const {
	m_rwlock.verify_is_locked();
	return m_numUsedNodes;
}

int32_t  RdbTree::getNumAvailNodes() const {
	ScopedReadLock sl(m_rwlock);
	return m_numNodes - m_numUsedNodes;
}

int32_t RdbTree::getNumNegativeKeys() const {
	ScopedReadLock sl(m_rwlock);
	return m_numNegativeKeys;
}

int32_t RdbTree::getNumPositiveKeys() const {
	ScopedReadLock sl(m_rwlock);
	return m_numPositiveKeys;
}

int32_t RdbTree::getNumNegativeKeys(collnum_t collnum) const {
	ScopedReadLock sl(m_rwlock);
	// fix for collectionless rdbs
	if ( m_rdbId < 0 ) return m_numNegativeKeys;

	/// @todo ALC thread safety?
	CollectionRec *cr = g_collectiondb.getRec(collnum);
	if ( ! cr ) return 0;
	return cr->m_numNegKeysInTree[(unsigned char)m_rdbId]; 
}

int32_t RdbTree::getNumPositiveKeys(collnum_t collnum) const {
	ScopedReadLock sl(m_rwlock);
	// fix for collectionless rdbs
	if ( m_rdbId < 0 ) return m_numPositiveKeys;

	/// @todo ALC thread safety?
	CollectionRec *cr = g_collectiondb.getRec(collnum);
	if ( ! cr ) return 0;
	return cr->m_numPosKeysInTree[(unsigned char)m_rdbId]; 
}