#undef _XOPEN_SOURCE
#define _XOPEN_SOURCE 500
#include "RdbBuckets.h"
#include "gb-include.h"
#include "sort.h"
#include "SafeBuf.h"
#include "Threads.h"
#include <unistd.h>
#include <sys/stat.h>
#include "Loop.h"
#include "Rdb.h"

//#define BUCKET_SIZE 64
//#define BUCKET_SIZE 1024
//#define BUCKET_SIZE 2048
//#define BUCKET_SIZE 4096
#define BUCKET_SIZE 8192
//#define BUCKET_SIZE 16384
//#define BUCKET_SIZE 1638400
#define INIT_SIZE 4096
#define SAVE_VERSION 0


inline int KEYCMP12 ( const void *a, const void *b ) {
	char* k1 = (char*)a;
	char* k2 = (char*)b;
	if ( (*(uint64_t *)(k1+4)) <
	     (*(uint64_t *)(k2+4)) ) return -1;
	if ( (*(uint64_t *)(k1+4)) > 
	     (*(uint64_t *)(k2+4)) ) return  1;
	uint32_t k1n0 = ((*(uint32_t*)(k1)) & ~0x01UL);
	uint32_t k2n0 = ((*(uint32_t*)(k2)) & ~0x01UL);
	if ( k1n0 < k2n0 ) return -1;
	if ( k1n0 > k2n0 ) return  1;
	return 0;
}



inline int KEYCMP16 ( const void *a, const void *b ) {
	char* k1 = (char*)a;
	char* k2 = (char*)b;
	if ( (*(uint64_t *)(k1+8)) <
	     (*(uint64_t *)(k2+8)) ) return -1;
	if ( (*(uint64_t *)(k1+8)) >
	     (*(uint64_t *)(k2+8)) ) return  1;
	uint64_t k1n0 = ((*(uint64_t *)(k1)) & ~0x01ULL);
	uint64_t k2n0 = ((*(uint64_t *)(k2)) & ~0x01ULL);
	if ( k1n0 < k2n0 ) return -1;
	if ( k1n0 > k2n0 ) return  1;
	return 0;
}

inline int KEYCMP18 ( const void *a, const void *b ) {
	char* k1 = (char*)a;
	char* k2 = (char*)b;
	if ( (*(uint64_t *)(k1+10)) <
	     (*(uint64_t *)(k2+10)) ) return -1;
	if ( (*(uint64_t *)(k1+10)) >
	     (*(uint64_t *)(k2+10)) ) return  1;
	if ( (*(uint64_t *)(k1+2)) <
	     (*(uint64_t *)(k2+2)) ) return -1;
	if ( (*(uint64_t *)(k1+2)) >
	     (*(uint64_t *)(k2+2)) ) return  1;
	uint16_t k1n0 = ((*(uint16_t *)(k1)) & 0xfffe);
	uint16_t k2n0 = ((*(uint16_t *)(k2)) & 0xfffe);
	if ( k1n0 < k2n0 ) return -1;
	if ( k1n0 > k2n0 ) return  1;
	return 0;
}

inline int KEYCMP24 ( const void *a, const void *b ) {
	char* k1 = (char*)a;
	char* k2 = (char*)b;
	if ( (*(uint64_t *)(k1+16)) <
	     (*(uint64_t *)(k2+16)) ) return -1;
	if ( (*(uint64_t *)(k1+16)) >
	     (*(uint64_t *)(k2+16)) ) return  1;
	if ( (*(uint64_t *)(k1+8)) <
	     (*(uint64_t *)(k2+8)) ) return -1;
	if ( (*(uint64_t *)(k1+8)) >
	     (*(uint64_t *)(k2+8)) ) return  1;
	uint64_t k1n0 = ((*(uint64_t *)(k1)) & ~0x01ULL);
	uint64_t k2n0 = ((*(uint64_t *)(k2)) & ~0x01ULL);
	if ( k1n0 < k2n0 ) return -1;
	if ( k1n0 > k2n0 ) return  1;
	return 0;
}

inline int KEYCMP6 ( const void *a, const void *b ) {
	char* k1 = (char*)a;
	char* k2 = (char*)b;
	if ( (*(uint32_t  *)(k1+2)) <
	     (*(uint32_t  *)(k2+2)) ) return -1;
	if ( (*(uint32_t  *)(k1+2)) >
	     (*(uint32_t  *)(k2+2)) ) return  1;
	if ( (*(uint16_t *)(k1+0)) <
	     (*(uint16_t *)(k2+0)) ) return -1;
	if ( (*(uint16_t *)(k1+0)) >
	     (*(uint16_t *)(k2+0)) ) return  1;
	return 0;
}






bool RdbBucket::set(RdbBuckets* parent, char* newbuf) {
	m_endKey = NULL;
	m_parent = parent;
	m_lastSorted = 0;
	m_numKeys = 0;
	m_keys = newbuf;
	return true;
}


void RdbBucket::reBuf(char* newbuf) {
	if(!m_keys) {
		m_keys = newbuf;
		return;
	}
	gbmemcpy(newbuf, m_keys, m_numKeys * m_parent->getRecSize());
	if(m_endKey) m_endKey = newbuf + (m_endKey - m_keys);
	m_keys = newbuf;
}




RdbBucket::~RdbBucket() {
	reset();
}


void RdbBucket::reset() {
	//m_keys = NULL;
	m_numKeys = 0;
	m_lastSorted = 0;
	m_endKey = NULL;
}

int32_t RdbBuckets::getMemAlloced () {
	int32_t alloced = sizeof(RdbBuckets) + m_masterSize + m_dataMemOccupied;
	return alloced;
}

//includes data in the data ptrs
int32_t RdbBuckets::getMemOccupied() {
	return (m_numKeysApprox * m_recSize) + m_dataMemOccupied +
		sizeof(RdbBuckets) + 
		m_sortBufSize + 
		BUCKET_SIZE * m_recSize; //swapbuf
}


int32_t RdbBuckets::getMemAvailable() {
	return m_maxMem - getMemOccupied();
}


bool RdbBuckets::is90PercentFull() {
	return getMemOccupied () > m_maxMem * .9;
}

bool RdbBuckets::needsDump() {
	if(m_numBuckets + 1 < m_maxBuckets) return false;
	if(m_maxBuckets == m_maxBucketsCapacity) return true;
	return false;
}

//be very conservative with this because if we say we can fit it
//and we can't then we'll get a partial list added and we will
//add the whole list again.
bool RdbBuckets::hasRoom ( int32_t numRecs ) {
	int32_t numBucketsRequired = (((numRecs / BUCKET_SIZE)+1) * 2);
	if(m_maxBucketsCapacity - m_numBuckets < numBucketsRequired) 
		return false;
	return true;
}



bool RdbBucket::sort() {

	//m_lastSorted = 0;//for debugging
	if(m_lastSorted == m_numKeys) return true;


	if(m_numKeys < 2) {
		m_lastSorted = m_numKeys;
		return true;		
	}

	uint8_t ks = m_parent->getKeySize();
	int32_t recSize = m_parent->getRecSize();
	int32_t fixedDataSize = m_parent->getFixedDataSize();
	int (*cmpfn) (const void*, const void *) = NULL;
	if ( ks == 18 ) cmpfn = KEYCMP18;
	else if ( ks == 24 ) cmpfn = KEYCMP24;
	else if ( ks == 12 ) cmpfn = KEYCMP12;
	else if ( ks == 16 ) cmpfn = KEYCMP16;
	else if ( ks ==  6 ) cmpfn = KEYCMP6;
	else { char *xx=NULL;*xx=0; }

	char* mergeBuf  = m_parent->getSwapBuf();
	if(!mergeBuf) {	char* xx = NULL; *xx = 0; }

	int32_t numUnsorted = m_numKeys - m_lastSorted;
	char *list1  = m_keys;
	char *list2  = m_keys + (recSize*m_lastSorted);
	char *list1end  = list2;
	char *list2end  = list2 + (recSize * numUnsorted);
	//turn quickpoll off while we sort,
	//because we do not know what sort of indeterminate state
	//we will be in while sorting
	// MDW: this no longer disables it since it is based on g_niceness
	// now, but what is the point, does it use static vars or what?
	//bool canQuickpoll = g_loop.m_canQuickPoll;
	//g_loop.m_canQuickPoll = false;
	//sort the unsorted portion
	// turn off this way
	int32_t saved = g_niceness;
	g_niceness = 0;
	// . use merge sort because it is stable, and we need to always keep
	// . the identical keys that were added last
	// . now we pass in a buffer to merge into, otherwise one is malloced,
	// . which can fail.  It falls back on qsort which is not stable.
	if(!m_parent->getSortBuf()) {char *xx = NULL; *xx = 0;}
	gbmergesort (list2, numUnsorted , recSize , cmpfn, 0,
		     m_parent->getSortBuf(), m_parent->getSortBufSize()); 
	
	//g_loop.m_canQuickPoll = canQuickpoll;
	g_niceness = saved;

	char *p  = mergeBuf;
	char v;
	char *lastKey = NULL;
	int32_t br = 0; //bytesRemoved (abbreviated for column width)
	int32_t dso = ks + sizeof(char*);//datasize offset
	int32_t numNeg = 0;

	while(1) {
		if(list1 >= list1end) {
			// . just copy into place, deduping as we go
			while(list2 < list2end) {
				if(lastKey && KEYCMPNEGEQ(list2,lastKey,ks) == 0) {
					//this is a dup, we are removing data
					if(fixedDataSize != 0) {
					      if(fixedDataSize == -1) 
					           br += *(int32_t*)(lastKey+dso);
					      else br += fixedDataSize;
					}
					if ( KEYNEG(lastKey) ) numNeg++;
					p = lastKey;
				}
				gbmemcpy(p, list2, recSize);
				lastKey = p;
				p += recSize;
				list2 += recSize;
			}

			break;
		}
		if(list2 >= list2end) {
			// . if all that is left is list 1 just copy it into 
			// . place, since it is already deduped
			gbmemcpy(p, list1, list1end - list1);
			p += list1end - list1;
			break;
		}
		v = KEYCMPNEGEQ(list1, list2, ks); 
		if(v < 0) {
			//never overwrite the merged list from list1 because
			//it is always older and it is already deduped
			if(lastKey && KEYCMPNEGEQ(list1, lastKey, ks) == 0) {
				if ( KEYNEG(lastKey) ) numNeg++;
				list1 += recSize;
				continue;
			}
			gbmemcpy(p, list1, recSize);
			lastKey = p;
			p += recSize;
			list1 += recSize;
		}
		else if(v > 0) {
			//copy it over the one we just copied in
			if(lastKey && KEYCMPNEGEQ(list2, lastKey, ks) == 0) {
				//this is a dup, we are removing data
				if(fixedDataSize != 0) {
					if(fixedDataSize == -1) 
						br += *(int32_t*)(lastKey+dso);
					else br += fixedDataSize;
				}
				if ( KEYNEG(lastKey) ) numNeg++;
				p = lastKey;
			}

			gbmemcpy(p, list2, recSize);
			lastKey = p;
			p += recSize;
			list2 += recSize;
		}
		else {
			if(lastKey && KEYCMPNEGEQ(list2, lastKey, ks) == 0) {
				if(fixedDataSize != 0) {
					if(fixedDataSize == -1) 
						br += *(int32_t*)(lastKey+dso);
					else br += fixedDataSize;
				}
				if ( KEYNEG(lastKey) ) numNeg++;
				p = lastKey;
			}

			//found dup, take list2's
 			gbmemcpy(p, list2, recSize);
			lastKey = p;
 			p += recSize;
 			list2 += recSize;
 			list1 += recSize; //fuggedaboutit!
		}
	}

	//we compacted out the dups, so reflect that here
	int32_t newNumKeys = (p - mergeBuf) / recSize;
	m_parent->updateNumRecs(newNumKeys - m_numKeys , - br, -numNeg);
	m_numKeys = newNumKeys;

	if(m_keys != mergeBuf) m_parent->setSwapBuf(m_keys);
	m_keys = mergeBuf;
	m_lastSorted = m_numKeys;
	m_endKey = m_keys + ((m_numKeys - 1) * recSize);
	return true;
	

}


//make 2 half full buckets,
//addKey assumes that the *this bucket retains the lower half of the keys
//returns a new bucket with the remaining upper half.
RdbBucket* RdbBucket::split(RdbBucket* newBucket) {

	//	log(LOG_WARN, "splitting bucket");
	int32_t b1NumKeys = m_numKeys >> 1; //m_numkeys / 2
	int32_t b2NumKeys = m_numKeys - b1NumKeys;
	int32_t recSize = m_parent->getRecSize();
	//configure the new bucket
	gbmemcpy(newBucket->m_keys, 
	       m_keys + (b1NumKeys*recSize),
	       b2NumKeys * recSize);
	newBucket->m_numKeys = b2NumKeys;
	newBucket->m_lastSorted = b2NumKeys;
	newBucket->m_endKey = newBucket->m_keys + ((b2NumKeys - 1) * recSize);
	
	//reconfigure the old bucket
	m_numKeys = b1NumKeys;
	m_lastSorted = b1NumKeys;
	m_endKey = m_keys + ((b1NumKeys - 1) * recSize);

	//add it to our parent
	return newBucket;
}


bool RdbBucket::addKey(char *key , char *data , int32_t dataSize) {

	uint8_t ks = m_parent->getKeySize();
	int32_t recSize = m_parent->getRecSize();
	bool isNeg = KEYNEG(key);

	char *newLoc = m_keys + (recSize * m_numKeys);
	gbmemcpy(newLoc, key, ks);
	
	if(data) {
		*(char**)(newLoc + ks) = data;
		if(m_parent->getFixedDataSize() == -1) {
			*(int32_t*)(newLoc + ks + sizeof(char*)) = (int32_t)dataSize;
		}
	}
	
	if(m_endKey == NULL) { //are we the first key?
		if(m_numKeys > 0) {char* xx = NULL; *xx = 0;}
		m_endKey = newLoc;
		m_lastSorted = 1;
	}
	else {
		// . minor optimization: if we are almost sorted, then
		// . see if we can't maintain that state.
		char v = KEYCMPNEGEQ(key, m_endKey, ks); 
		//char v = KEYCMP(key, m_endKey, ks);
		if(v == 0) {
			// . just replace the old key if we were the same,
			// . don't inc num keys
			gbmemcpy(m_endKey, newLoc, recSize);
			if(KEYNEG(m_endKey)) {
				if(isNeg) return true;
				else m_parent->updateNumRecs(0, 0, -1);
			}
			else if(isNeg) m_parent->updateNumRecs(0, 0, 1);;
			return true;
		}
		else if(v > 0) {
			// . if we were greater than the old key, 
			// . we can assume we are still sorted, which
			// . really helps us for adds which are in order
			if(m_lastSorted == m_numKeys) m_lastSorted++;
			m_endKey = newLoc;
		}
	}
	m_numKeys++;
	m_parent->updateNumRecs(1 , dataSize, isNeg?1:0);
	return true;
}

char* RdbBucket::getKeyVal ( char *key , char **data , int32_t* dataSize ) {

	sort();
	int32_t i = getKeyNumExact(key);
	if(i < 0) return NULL;

	int32_t recSize = m_parent->getRecSize();
	uint8_t ks = m_parent->getKeySize();
	char* rec = m_keys + (recSize * i);

	if(data) {
		*data = rec + ks;
		if(dataSize)
			*dataSize = *(int32_t*)*data + sizeof(char*);
	}
	return rec;
}

int32_t RdbBucket::getKeyNumExact(char* key) {

	uint8_t ks = m_parent->getKeySize();
	int32_t recSize = m_parent->getRecSize();
	int32_t i = 0;
	char v;
	char* kk;
	int32_t low = 0;
	int32_t high = m_numKeys - 1;
	while(low <= high) {
		int32_t delta = high - low;
		i = low + (delta >> 1);
		kk = m_keys + (recSize * i);
		v = KEYCMP(key,kk,ks);
		if(v < 0) {
			high = i - 1;
			continue;
		}
		else if(v > 0) {
			low = i + 1;
			continue;
		}
		else return i;
	}
	return -1;
}





bool RdbBucket::selfTest (char* prevKey) {
	sort();
	char* last = NULL;
	char* kk = m_keys;
	int32_t recSize = m_parent->getRecSize();
	int32_t ks = m_parent->getKeySize();

	//ensure our first key is > the last guy's end key
	if(prevKey != NULL && m_numKeys > 0) {
		if(KEYCMP(prevKey, m_keys,ks) > 0) {
			log(LOG_WARN, "db: bucket's first key: %016"XINT64"%08"XINT32" "
			    "is less than last bucket's end key: "
			    "%016"XINT64"%08"XINT32"!!!!!",
			    *(int64_t*)(m_keys+(sizeof(int32_t))), 
			    *(int32_t*)m_keys,
			    *(int64_t*)(prevKey+(sizeof(int32_t))), 
			    *(int32_t*)prevKey);
			//printBucket();
			return false;
			//char* xx = NULL; *xx = 0;
		}
	}

 	for(int32_t i = 0; i < m_numKeys; i++) {
//log(LOG_WARN, "rdbbuckets last key: ""%016"XINT64"%08"XINT32" num keys: %"INT32"",
//   *(int64_t*)(kk+(sizeof(int32_t))), *(int32_t*)kk, m_numKeys);
		if(i > 0 && KEYCMP(last, kk, ks) > 0) {
			log(LOG_WARN, "db: bucket's last key was out "
			    "of order!!!!!"
			    "key was: %016"XINT64"%08"XINT32" vs prev: %016"XINT64"%08"XINT32""
			    " num keys: %"INT32""
			    " ks=%"INT32" bucketNum=%"INT32"",
			    *(int64_t*)(kk+(sizeof(int32_t))), *(int32_t*)kk, 
			    *(int64_t*)(last+(sizeof(int32_t))), *(int32_t*)last, 
			    m_numKeys, ks, i);
			return false;
			//char* xx = NULL; *xx = 0;
		}
		last = kk;
		kk += recSize;
	}
	return true;
}

void RdbBuckets::printBuckets() {
 	for(int32_t i = 0; i < m_numBuckets; i++) {
		m_buckets[i]->printBucket();
	}
}


void RdbBucket::printBucket() {
	char* kk = m_keys;
	int32_t recSize = m_parent->getRecSize();
 	for(int32_t i = 0; i < m_numKeys;i++) {
 		log(LOG_WARN, "rdbbuckets last key: ""%016"XINT64"%08"XINT32" num "
		    "keys: %"INT32"",
 		    *(int64_t*)(kk+(sizeof(int32_t))), *(int32_t*)kk, m_numKeys);
		kk += recSize;
	}
}


RdbBuckets::RdbBuckets() {
	m_numBuckets = 0; 
	m_masterPtr = NULL;
	m_buckets = NULL;
	m_swapBuf = NULL;
	m_sortBuf = NULL;
	m_isWritable = true;
	m_isSaving = false;
	m_dataMemOccupied = 0;
	m_needsSave = false;
	m_repairMode = false;
}



bool RdbBuckets::set ( int32_t fixedDataSize , int32_t maxMem, 
		       bool ownData ,
		       char *allocName ,
		       char rdbId,
		       bool dataInPtrs  ,
		       char *dbname ,
		       char keySize ,
		       bool useProtection ) {
	m_numBuckets = 0;
	m_ks = keySize;
	m_rdbId = rdbId;
	m_allocName = allocName;
	m_fixedDataSize = fixedDataSize;
	m_recSize = m_ks;
	if(m_fixedDataSize != 0) {
		m_recSize += sizeof(char*);
		if(m_fixedDataSize == -1) m_recSize += sizeof(int32_t);
	}
	m_numKeysApprox = 0;
	m_numNegKeys = 0;
	m_dbname = dbname;
	m_swapBuf = NULL;
 	m_sortBuf = NULL;
	//taken from sort.cpp, this is to prevent mergesort from mallocing
	m_sortBufSize = BUCKET_SIZE * m_recSize + sizeof(char*);
	if(m_buckets) {char *xx = NULL; *xx = 0;}
	m_maxBuckets = 0;
	m_masterSize = 0;
	m_masterPtr =  NULL;
	m_maxMem = maxMem;

	int32_t perBucket = sizeof(RdbBucket*) + 
		sizeof(RdbBucket)
		+ BUCKET_SIZE * m_recSize;
	int32_t overhead = m_sortBufSize +
		BUCKET_SIZE * m_recSize + //swapbuf
		sizeof(RdbBuckets); //thats us, silly
	int32_t avail = m_maxMem - overhead;

	m_maxBucketsCapacity = avail / perBucket;
	if(m_maxBucketsCapacity <= 0) {
		log("db: max memory for %s's buckets is way too small to"
		    " accomodate even 1 bucket, reduce bucket size(%"INT32") "
		    "or increase max mem(%"INT32")",
		    m_dbname, (int32_t)BUCKET_SIZE, m_maxMem);
		char *xx = NULL; *xx = 0;
	}

	if(!resizeTable(INIT_SIZE)) {
		g_errno = ENOMEM;
		return false;
	}
	
	// log("init: Successfully initialized buckets for %s, "
	//     "keysize is %"INT32", max mem is %"INT32", datasize is %"INT32"",
	//     m_dbname, (int32_t)m_ks, m_maxMem, m_fixedDataSize);


	/*
	RdbBuckets b;
	b.set ( 0, // fixedDataSize, 
		50000000 , // maxTreeMem,
		false, //own data
		"tbuck", // m_treeName, // allocName
		false, //data in ptrs
		"tbuck",//m_dbname,
		16, // m_ks,
		false);
	collnum_t cn = 1;
	key128_t k;
	k.n1 = 12;
	k.n0 = 11;
	b.addNode ( cn , (char *)&k, NULL, 0 );
	// negate it
	k.n0 = 10;
	b.addNode ( cn , (char *)&k, NULL, 0 );

	// try that
	key128_t k1;
	key128_t k2;
	k1.setMin();
	k2.setMax();
	RdbList list;
	int32_t np,nn;
	b.getList ( cn,(char *)&k1,(char *)&k2,1000,&list,&np,&nn,false);
	if ( np != 0 || nn != 1 ) { char *xx=NULL;*xx=0; }
	// must be empty
	if ( b.getNumKeys() != 0 ) { char *xx=NULL;*xx=0; }
	*/

	return true;
}


RdbBuckets::~RdbBuckets( ) {
	reset();
}


void RdbBuckets::setNeedsSave(bool s) {
	m_needsSave = s;
}


void RdbBuckets::reset() {
	for(int32_t j = 0; j < m_numBuckets; j++) {
		m_buckets[j]->reset();
	}
	if(m_masterPtr) mfree(m_masterPtr, m_masterSize, m_allocName );
	m_masterPtr = NULL;
	m_buckets = NULL;
	m_bucketsSpace = NULL;
	m_numBuckets = 0;
	m_maxBuckets = 0;
	m_dataMemOccupied = 0;
	m_firstOpenSlot = 0;
	m_numKeysApprox = 0;
	m_numNegKeys = 0;
	m_sortBuf = NULL;
	m_swapBuf = NULL;
}


void RdbBuckets::clear() {
	for(int32_t j = 0; j < m_numBuckets; j++) {
		m_buckets[j]->reset();
	}
	m_numBuckets = 0;
	m_firstOpenSlot = 0;
	m_dataMemOccupied = 0;
	m_numKeysApprox = 0;
	m_numNegKeys = 0;
	m_needsSave = true;
}




RdbBucket* RdbBuckets::bucketFactory() {

	if(m_numBuckets == m_maxBuckets - 1) {
		if(!resizeTable(m_maxBuckets * 2)) return NULL;
	}
	
	RdbBucket *b;
	if(m_firstOpenSlot > m_numBuckets) {
		int32_t i = 0;
		for(; i < m_numBuckets; i++) {
			if(m_bucketsSpace[i].getNumKeys() == 0) break;
		}
		b = &m_bucketsSpace[i];
	}
	else {
		b = &m_bucketsSpace[m_firstOpenSlot];
		m_firstOpenSlot++;
	}
	return b;
}




bool RdbBuckets::resizeTable(int32_t numNeeded) {
	if(numNeeded == m_maxBuckets) return true;

	if(numNeeded < INIT_SIZE) numNeeded = INIT_SIZE;

	if(numNeeded > m_maxBucketsCapacity) {
		if(m_maxBucketsCapacity <= m_maxBuckets) {
			log(LOG_INFO,
			    "db: could not resize buckets currently have %"INT32" "
			    "buckets, asked for %"INT32", max number of buckets"
			    " for %"INT32" bytes with keysize %"INT32" is %"INT32"",
			    m_maxBuckets, numNeeded, m_maxMem, (int32_t)m_ks,
			    m_maxBucketsCapacity);
			g_errno = ENOMEM;
			return false;
		}
		// log(LOG_INFO,
		//     "db: scaling down request for buckets.  "
		//     "Currently have %"INT32" "
		//     "buckets, asked for %"INT32", max number of buckets"
		//     " for %"INT32" bytes is %"INT32".",
		//     m_maxBuckets, numNeeded, m_maxMem, m_maxBucketsCapacity);

		numNeeded = m_maxBucketsCapacity;
	}

	int32_t perBucket = sizeof(RdbBucket*) + 
		sizeof(RdbBucket)
		+ BUCKET_SIZE * m_recSize;

	int32_t tmpMaxBuckets = numNeeded;
	int32_t newMasterSize = tmpMaxBuckets * perBucket +
		(BUCKET_SIZE * m_recSize) + /*swap buf*/
		m_sortBufSize;         /*sort buf*/

	if(newMasterSize > m_maxMem) {
		log(LOG_WARN,
		    "db: Buckets oops, trying to malloc more(%"INT32") that max "
		    "mem(%"INT32"), should've caught this earlier.",
		    newMasterSize, m_maxMem);
		char* xx = NULL; *xx = 0;
	}

	char *tmpMasterPtr = (char*)mmalloc(newMasterSize, m_allocName);
	if(!tmpMasterPtr) {
		g_errno = ENOMEM;
		return false;
	}
	char* p = tmpMasterPtr;
	char* bucketMemPtr = p;
	p += (BUCKET_SIZE * m_recSize) * tmpMaxBuckets;
	m_swapBuf = p;
	p += (BUCKET_SIZE * m_recSize);
	m_sortBuf = p;
	p += m_sortBufSize;

	RdbBucket** tmpBucketPtrs = (RdbBucket**)p;
	p += tmpMaxBuckets * sizeof(RdbBucket*);
	RdbBucket* tmpBucketSpace = (RdbBucket*)p;
	p += tmpMaxBuckets * sizeof(RdbBucket);
	if(p - tmpMasterPtr != newMasterSize) {char* xx = NULL; *xx = 0;}

	for(int32_t i = 0; i < m_numBuckets; i++) {
		//copy them over one at a time so they
		//will now be contiguous and consistent
		//with the ptrs array.
		tmpBucketPtrs[i] = &tmpBucketSpace[i];
		gbmemcpy(&tmpBucketSpace[i],
		       m_buckets[i],
		       sizeof(RdbBucket));
		tmpBucketSpace[i].reBuf(bucketMemPtr);
		bucketMemPtr += (BUCKET_SIZE * m_recSize);
	}
	//now do the rest
	for(int32_t i = m_numBuckets; i < tmpMaxBuckets; i++) {
		tmpBucketSpace[i].set(this, bucketMemPtr);
		bucketMemPtr += (BUCKET_SIZE * m_recSize);
	}
	if(bucketMemPtr != m_swapBuf) {char* xx = NULL; *xx = 0;}

// 	log(LOG_WARN, "new size = %"INT32", old size = %"INT32", newMemUsed = %"INT32" "
// 	    "oldMemUsed = %"INT32"", 
// 	    numNeeded, m_maxBuckets, newMasterSize, m_masterSize);

	if(m_masterPtr) mfree(m_masterPtr, m_masterSize, m_allocName);
	m_masterPtr = tmpMasterPtr;
	m_masterSize = newMasterSize;
	m_buckets = tmpBucketPtrs;
	m_bucketsSpace = tmpBucketSpace;
	m_maxBuckets = tmpMaxBuckets;
	m_firstOpenSlot = m_numBuckets;
	return true;
}




int32_t RdbBuckets::addNode (collnum_t collnum,
			  char *key,
			  char *data , int32_t dataSize ) {

	if(!m_isWritable || m_isSaving ) {
		g_errno = EAGAIN;
		return false;
	}

	m_needsSave = true;

	int32_t i;

	i = getBucketNum(key, collnum);
	if(i == m_numBuckets  ||
	   m_buckets[i]->getCollnum() != collnum) {
		int32_t bucketsCutoff = (BUCKET_SIZE>>1);
		//when repairing the keys are added in order,
		//so fill them up all of the way before moving
		//on to the next one.
		if(m_repairMode) bucketsCutoff = BUCKET_SIZE;

  		if(i != 0 && 
		   m_buckets[i-1]->getCollnum() == collnum &&
		   m_buckets[i-1]->getNumKeys() < bucketsCutoff) {
  			i--;
  		}
		else if(i == m_numBuckets) {
			m_buckets[i] = bucketFactory();
			if(m_buckets[i] == NULL) {
				g_errno = ENOMEM;
				return -1;
			}
			m_buckets[i]->setCollnum(collnum);
			m_numBuckets++;
		}
		else { //m_buckets[i]->getCollnum() != collnum
			RdbBucket* newBucket = bucketFactory();
			if(m_buckets[i] == NULL) {//can't really happen here..
				g_errno = ENOMEM;
				return -1;
			}
			newBucket->setCollnum(collnum);
			addBucket(newBucket, i);
		}

	}
	//check if we are full
	if(m_buckets[i]->getNumKeys() == BUCKET_SIZE) {
		//split the bucket
		int64_t t = gettimeofdayInMilliseconds();
		m_buckets[i]->sort();
		RdbBucket* newBucket = bucketFactory();
		if(newBucket == NULL ) {
			g_errno = ENOMEM;
			return -1;
		}
		newBucket->setCollnum(collnum);
		m_buckets[i]->split(newBucket);
		addBucket(newBucket, i+1);
		if(bucketCmp(key, collnum, m_buckets[i]) > 0) i++;
		
		int64_t took = gettimeofdayInMilliseconds() - t;
		if(took > 10) log(LOG_WARN, 
				  "db: split bucket in %"INT64" ms for %s",took, 
				  m_dbname);
	}

	m_buckets[i]->addKey(key, data, dataSize);
	//if(rand() % 100 == 0) 	selfTest(true, true);
	return 0;
}


bool RdbBuckets::addBucket (RdbBucket* newBucket, int32_t i) {

	//int32_t i = getBucketNum(newBucket->getEndKey(), newBucket->getCollnum());
	m_numBuckets++;
	int32_t moveSize = (m_numBuckets - i)*sizeof(RdbBuckets*);
	if(moveSize > 0)
		memmove(&m_buckets[i+1], &m_buckets[i], moveSize);
	m_buckets[i] = newBucket;
	return true;
}

// void RdbBuckets::deleteBucket ( int32_t i ) {
// 	int32_t moveSize = (m_numBuckets - i)*sizeof(RdbBuckets*);
// 	if(moveSize > 0)
// 		memmove(&m_buckets[i+1], &m_buckets[i], moveSize);
// 	m_numBuckets--;
// }

bool RdbBuckets::getList ( collnum_t collnum ,
			   char *startKey, char *endKey, int32_t minRecSizes ,
			   RdbList *list , int32_t *numPosRecs , 
			   int32_t *numNegRecs,
			   bool useHalfKeys ) {

	if ( numNegRecs ) *numNegRecs = 0;
	if ( numPosRecs ) *numPosRecs = 0;
	// set *lastKey in case we have no nodes in the list
	//if ( lastKey ) *lastKey = endKey;
	// . set the start and end keys of this list
	// . set lists's m_ownData member to true
	list->reset();
	// got set m_ks first so the set ( startKey, endKey ) works!
	list->m_ks = m_ks;
	list->set              ( startKey , endKey );
	list->setFixedDataSize ( m_fixedDataSize   );
	list->setUseHalfKeys   ( useHalfKeys       );
	// bitch if list does not own his own data
	if ( ! list->getOwnData() ) {
		g_errno = EBADENGINEER;
		return log(LOG_LOGIC,"db: rdbbuckets: getList: List does not "
			   "own data");
	}
	// bail if minRecSizes is 0
	if ( minRecSizes == 0 ) return true;
	if ( minRecSizes < 0 ) minRecSizes = 0x7fffffff;//LONG_MAX;

	int32_t startBucket = getBucketNum(startKey, collnum);
	if(startBucket > 0 && 
	   bucketCmp(startKey, collnum, m_buckets[startBucket-1]) < 0)
		startBucket--;

	// if the startKey is past our last bucket, then nothing
	// to return
	if(startBucket == m_numBuckets ||
	   m_buckets[startBucket]->getCollnum() != collnum) return true;


	int32_t endBucket;
	if(bucketCmp(endKey, collnum, m_buckets[startBucket]) <= 0)
		endBucket = startBucket;
	else endBucket = getBucketNum(endKey, collnum);
	if(endBucket == m_numBuckets ||
	   m_buckets[endBucket]->getCollnum() != collnum) endBucket--;
	//log(LOG_WARN, "db numBuckets %"INT32" start %"INT32" end %"INT32"", 
	//m_numBuckets, startBucket, endBucket);
	if(m_buckets[endBucket]->getCollnum() != collnum) {
		char* xx = NULL; *xx = 0;
	}

	int32_t growth = 0;

	if(startBucket == endBucket) {
		growth = m_buckets[startBucket]->getNumKeys() * m_recSize;
		if(growth > minRecSizes) growth = minRecSizes + m_recSize;
		if(!list->growList(growth))
			return log("db: Failed to grow list to %"INT32" bytes "
				   "for storing "
				   "records from buckets: %s.",
				   growth,mstrerror(g_errno));

		if(!m_buckets[startBucket]->getList(list, 
						    startKey, 
						    endKey,
						    minRecSizes,
						    numPosRecs, 
						    numNegRecs, 
						    useHalfKeys))
			return false;
		return true;
	}

	//reserve some space, it is an upper bound
	for(int32_t i = startBucket; i <= endBucket; i++) 
		growth += m_buckets[i]->getNumKeys() * m_recSize;

	if(growth > minRecSizes) growth = minRecSizes + m_recSize;
	if(!list->growList(growth))
		return log("db: Failed to grow list to %"INT32" bytes for storing "
			   "records from buckets: %s.",
			   growth, mstrerror(g_errno));

	// separate into 3 different calls so we don't have 
	// to search for the start and end keys within the buckets
	// unnecessarily.
	if(!m_buckets[startBucket]->getList(list, 
					    startKey, 
					    NULL,
					    minRecSizes,
					    numPosRecs, 
					    numNegRecs, 
					    useHalfKeys))
		return false;

	int32_t i = startBucket + 1;
	for(; i < endBucket && list->getListSize() < minRecSizes; i++) {
		if(!m_buckets[i]->getList(list, 
					  NULL, 
					  NULL,
					  minRecSizes,
					  numPosRecs, 
					  numNegRecs, 
					  useHalfKeys))
			return false;
	}
	
	if(list->getListSize() < minRecSizes)
		if(!m_buckets[i]->getList(list, 
					  NULL, 
					  endKey,
					  minRecSizes,
					  numPosRecs, 
					  numNegRecs, 
					  useHalfKeys))
			return false;

	return true;

}


int RdbBuckets::getListSizeExact ( collnum_t collnum ,
				   char *startKey, 
				   char *endKey ) {

	int numBytes = 0;

	int32_t startBucket = getBucketNum(startKey, collnum);

	// does this mean empty?
	if(startBucket > 0 && 
	   bucketCmp(startKey, collnum, m_buckets[startBucket-1]) < 0)
		startBucket--;

	if(startBucket == m_numBuckets ||
	   m_buckets[startBucket]->getCollnum() != collnum) 
		return 0;

	int32_t endBucket;
	if(bucketCmp(endKey, collnum, m_buckets[startBucket]) <= 0)
		endBucket = startBucket;
	else 
		endBucket = getBucketNum(endKey, collnum);

	if(endBucket == m_numBuckets ||
	   m_buckets[endBucket]->getCollnum() != collnum) endBucket--;

	//log(LOG_WARN, "db numBuckets %"INT32" start %"INT32" end %"INT32"", 
	//m_numBuckets, startBucket, endBucket);
	if(m_buckets[endBucket]->getCollnum() != collnum) {
		char* xx = NULL; *xx = 0; }

	for( int32_t i = startBucket ; i <= endBucket ; i++)
		numBytes += m_buckets[i]->getListSizeExact(startKey,endKey);
	
	return numBytes;
}

bool RdbBuckets::testAndRepair() {
	if(!selfTest(true/*thorough*/, 
		     false/*core on error*/)) {
		if(!repair()) return false;
		m_needsSave = true;
	}
	return true;
}


bool RdbBuckets::repair() {
	if(m_numBuckets == 0 && 
	   (m_numKeysApprox != 0 || m_numNegKeys != 0)) {
		   m_numKeysApprox = 0;
		   m_numNegKeys = 0;
		   log("db: RdbBuckets repaired approx key count to reflect "
		       "true number of keys.");
	}

	//int32_t tmpMaxBuckets = m_maxBuckets;
	int32_t tmpMasterSize = m_masterSize;
	char *tmpMasterPtr = m_masterPtr;
	RdbBucket **tmpBucketPtrs = m_buckets;
	int32_t tmpNumBuckets = m_numBuckets;
	m_masterPtr = NULL;
	m_masterSize = 0;
	m_numBuckets = 0;
	reset();
	if(!resizeTable(INIT_SIZE)) {
		log("db: RdbBuckets could not alloc enough memory to repair "
		    "corruption.");
		g_errno = ENOMEM;
		return false;
	}

	m_repairMode = true;

 	for(int32_t j = 0; j < tmpNumBuckets; j++) {
		collnum_t collnum = tmpBucketPtrs[j]->getCollnum();
 		for(int32_t i = 0; i < tmpBucketPtrs[j]->getNumKeys(); i++) {
 			char* currRec = tmpBucketPtrs[j]->getKeys() + 
				m_recSize * i;
 			char* data = NULL;
 			int32_t dataSize = m_fixedDataSize;
			
 			if(m_fixedDataSize != 0) {
 				data = currRec + m_ks;
 				if(m_fixedDataSize == -1)
 					dataSize = *(int32_t*)(data + sizeof(char*));
 			}
 			if(addNode(collnum, currRec, data, dataSize) < 0) {
 				log(LOG_WARN, "db: got unrepairable error in "
 				    "RdbBuckets, could not re-add data");
				return false;
			}
 		}
 	}

	m_repairMode = false;

	if(tmpMasterPtr) mfree(tmpMasterPtr, tmpMasterSize, m_allocName);

 	log("db: RdbBuckets repair for %"INT32" keys complete", m_numKeysApprox);
	return true;
}



bool RdbBuckets::selfTest(bool thorough, bool core) {
	if(m_numBuckets == 0 && m_numKeysApprox != 0) return false;
	int32_t totalNumKeys = 0;
	char* last = NULL;
	collnum_t lastcoll = -1;
	int32_t numColls = 0;

	for(int32_t i = 0; i < m_numBuckets; i++) {
		RdbBucket* b = m_buckets[i];
		if(lastcoll != b->getCollnum()) {
			last = NULL;
			numColls++;
		}
		if(thorough) {
			if(!b->selfTest (last)) {
				if(!core) return false;
				char* xx = NULL; *xx = 0;
			}
		}


		totalNumKeys += b->getNumKeys();
		char* kk = b->getEndKey();
		//log(LOG_WARN, "rdbbuckets last key: ""%016"XINT64"%08"XINT32" "
		//"num keys: %"INT32"",
		//*(int64_t*)(kk+(sizeof(int32_t))),*(int32_t*)kk,b->getNumKeys());
		if(i > 0 &&
		   lastcoll == b->getCollnum() && 
		   KEYCMPNEGEQ(last, kk,m_ks) >= 0) {
			log(LOG_WARN, "rdbbuckets last key: "
			    "%016"XINT64"%08"XINT32" num keys: %"INT32"",
			    *(int64_t*)(kk+(sizeof(int32_t))),
			    *(int32_t*)kk, b->getNumKeys());
			log(LOG_WARN, "rdbbuckets last key was out "
			    "of order!!!!!");
			if(!core) return false;
			char* xx = NULL; *xx = 0;
		}
		last = kk;
		lastcoll = b->getCollnum();
	}
	if ( totalNumKeys != m_numKeysApprox )
	log(LOG_WARN, "db have %"INT32" keys,  should have %"INT32". "
	    "%"INT32" buckets in %"INT32" colls for db %s", 
	    totalNumKeys, m_numKeysApprox, m_numBuckets, 
	    numColls, m_dbname);

	if(thorough && totalNumKeys != m_numKeysApprox) {
		return false;
	}
	return true;
}


char RdbBuckets::bucketCmp(char *akey, collnum_t acoll, 
			   char *bkey, collnum_t bcoll) {
	if (acoll == bcoll) return KEYCMPNEGEQ(akey, bkey, m_ks);
	if (acoll <  bcoll) return -1;
	return 1;
}

char RdbBuckets::bucketCmp(char *akey, collnum_t acoll, 
			   RdbBucket* b) {
	if (acoll == b->getCollnum())
		return KEYCMPNEGEQ(akey, b->getEndKey(), m_ks);
	if (acoll < b->getCollnum()) return -1;
	return 1;
}



int32_t RdbBuckets::getBucketNum(char* key, collnum_t collnum) {

	if(m_numBuckets < 10) {
		int32_t i = 0;
		for(; i < m_numBuckets; i++) {
			RdbBucket* b = m_buckets[i];
			char v = bucketCmp(key, collnum, b);
			if(v > 0) continue;
			if(v < 0) {break;}
			else break;
		}
		return i;
	}
	int32_t i = 0;
	char v;
	RdbBucket* b = NULL;
	int32_t low = 0;
	int32_t high = m_numBuckets - 1;
	while(low <= high) {
		int32_t delta = high - low;
		i = low + (delta >> 1);
		b = m_buckets[i];
		char v = bucketCmp(key, collnum, b);
		if(v < 0) {
			high = i - 1;
			continue;
		}
		else if(v > 0) {
			low = i + 1;
			continue;
		}
		else return i;
	}
	
	//now fine tune:
  	v = bucketCmp(key, collnum, b);
	if(v > 0) i++;
	return i;
}


bool RdbBuckets::collExists(collnum_t collnum) {
	for(int32_t i = 0; i < m_numBuckets; i++) {
		if(m_buckets[i]->getCollnum() == collnum)return true;
		if(m_buckets[i]->getCollnum() > collnum)  break;
	}
	return false;
}

int32_t RdbBuckets::getNumKeys(collnum_t collnum) {
	int32_t numKeys = 0;
	for(int32_t i = 0; i < m_numBuckets; i++) {
		if(m_buckets[i]->getCollnum() == collnum) 
			numKeys += m_buckets[i]->getNumKeys();
		if(m_buckets[i]->getCollnum() > collnum)  break;
	}
	return numKeys;
}

	
int32_t RdbBuckets::getNumKeys() {
	return m_numKeysApprox;
}


// int32_t RdbBuckets::getNumNegativeKeys ( collnum_t collnum ) {
//  	return m_numNegKeys;
// }


// int32_t RdbBuckets::getNumPositiveKeys ( collnum_t collnum ) {
// 	return getNumKeys(collnum) - getNumNegativeKeys(collnum); 
// }


int32_t RdbBuckets::getNumNegativeKeys ( ) {
	return m_numNegKeys;
}


int32_t  RdbBuckets::getNumPositiveKeys ( ) {
	return getNumKeys() - getNumNegativeKeys ( );
}


char* RdbBuckets::getKeyVal ( collnum_t collnum , char *key , 
			      char **data , int32_t* dataSize ) {

	int32_t i = getBucketNum(key, collnum);
	if(i == m_numBuckets ||
	   m_buckets[i]->getCollnum() != collnum ) return NULL;
	return m_buckets[i]->getKeyVal(key, data, dataSize);
}


void RdbBuckets::updateNumRecs(int32_t n, int32_t bytes, int32_t numNeg) {
	m_numKeysApprox += n;
	m_dataMemOccupied += bytes;
	m_numNegKeys += numNeg;
}


char *RdbBucket::getFirstKey() {
	sort();
	return m_keys;
}

int32_t RdbBucket::getNumNegativeKeys ( ) {
	int32_t numNeg = 0;
	int32_t recSize = m_parent->getRecSize();
	char *currKey = m_keys;

	char *lastKey = m_keys + (m_numKeys * recSize);

	while(currKey < lastKey) {  //&& !list->isExhausted()
		if ( KEYNEG(currKey) ) numNeg++;
		currKey += recSize;
	}
	return numNeg;
}


bool RdbBucket::getList(RdbList* list, 
			char* startKey, 
			char* endKey,
			int32_t minRecSizes,
			int32_t *numPosRecs, 
			int32_t *numNegRecs, 
			bool useHalfKeys) {

	sort();
	//get our bounds within the bucket:
	uint8_t ks = m_parent->getKeySize();
	int32_t recSize = m_parent->getRecSize();
	int32_t start = 0;
	int32_t end = m_numKeys - 1;
	char v;
	char* kk = NULL;
	if(startKey) {
		int32_t low = 0;
		int32_t high = m_numKeys - 1;
		while(low <= high) {
			int32_t delta = high - low;
			start = low + (delta >> 1);
			kk = m_keys + (recSize * start);
			v = KEYCMP(startKey,kk,ks);
			if(v < 0) {
				high = start - 1;
				continue;
			}
			else if(v > 0) {
				low = start + 1;
				continue;
			}
			else break;

		}
		//now back up or move forward s.t. startKey
		//is <= start
		while(start < m_numKeys) {
			kk = m_keys + (recSize * start);
			v = KEYCMP(startKey, kk, ks);
			if(v > 0) start++;
			else break;
		}
	}
	else start = 0;


	if(endKey) {
		int32_t low = start;
		int32_t high = m_numKeys - 1;
		while(low <= high) {
			int32_t delta = high - low;
			end = low + (delta >> 1);
			kk = m_keys + (recSize * end);
			v = KEYCMP(endKey,kk,ks);
			if(v < 0) {
				high = end - 1;
				continue;
			}
			else if(v > 0) {
				low = end + 1;
				continue;
			}
			else break;

		}
		while(end > 0) {
			kk = m_keys + (recSize * end);
			v = KEYCMP(endKey, kk, ks);
			if(v < 0) end--;
			else break;
		}

	}
	else end = m_numKeys - 1;

	//log(LOG_WARN, "numKeys %"INT32" start %"INT32" end %"INT32"", 
	//m_numKeys, start, end);
	
	//keep track of our negative a positive recs
	int32_t numNeg = 0;
	int32_t numPos = 0;

	int32_t fixedDataSize = m_parent->getFixedDataSize();

	char* currKey = m_keys + (start * recSize);

	//bail now if there is only one key and it is out of range.
	if(start == end && 
	   ((startKey && KEYCMP(currKey, startKey, ks) < 0) ||
	    (endKey   && KEYCMP(currKey, endKey, ks) > 0))) {
		return true;
	}
	// 	//set our real start key
	// 	if(startKey != NULL) list->setStartKey(currKey);

	char* lastKey = NULL;
	for(int32_t i = start; 
	    i <= end && list->getListSize() < minRecSizes; 
	    i++, currKey += recSize) {
		if ( fixedDataSize == 0 ) {
			if ( ! list->addRecord(currKey, 0, NULL))
				return log("db: Failed to add record "
					   "to list for %s: %s. "
					   "Fix the growList algo.",
					   m_parent->getDbname(),
					   mstrerror(g_errno));
		} 
		else {
			int32_t dataSize = fixedDataSize;
			if ( fixedDataSize == -1 ) 
				dataSize = *(int32_t*)(currKey + 
						    ks + sizeof(char*));
			if ( ! list->addRecord ( currKey ,
						 dataSize,
						 currKey + ks) )
				return log("db: Failed to add record "
					   "to list for %s: %s. "
					   "Fix the growList algo.",
					   m_parent->getDbname(),
					   mstrerror(g_errno));
		}
		if ( KEYNEG(currKey) ) numNeg++;
		else                   numPos++;
		lastKey = currKey;

#ifdef GBSANITYCHECK
		//sanity, remove for production
		if(startKey && KEYCMP(currKey, startKey, ks) < 0) {
			log("db: key is outside the "
			    "keyrange given for getList."
			    "  it is < startkey."
			    "  %016"XINT64"%08"XINT32" %016"XINT64"%08"XINT32"."
			    "  getting keys %"INT32" to %"INT32" for list"
			    "bounded by %016"XINT64"%08"XINT32" %016"XINT64"%08"XINT32"",
			    *(int64_t*)(startKey+(sizeof(int32_t))),
			    *(int32_t*)startKey,
			    *(int64_t*)(currKey+(sizeof(int32_t))),
			    *(int32_t*)currKey,
			    start, end,
 			    *(int64_t*)(startKey+(sizeof(int32_t))),
			    *(int32_t*)startKey,
 			    *(int64_t*)(endKey+(sizeof(int32_t))),
			    *(int32_t*)endKey);

			printBucket();
			char* xx=NULL; *xx=0;
		}
		if(endKey &&   KEYCMP(currKey, endKey, ks) > 0) {
			log("db: key is outside the "
			    "keyrange given for getList."
			    "  it is > endkey"
			    "  %016"XINT64"%08"XINT32" %016"XINT64"%08"XINT32"."
			    "  getting keys %"INT32" to %"INT32" for list"
			    "bounded by %016"XINT64"%08"XINT32" %016"XINT64"%08"XINT32"",
			    *(int64_t*)(currKey+(sizeof(int32_t))),
			    *(int32_t*)currKey,
			    *(int64_t*)(endKey+(sizeof(int32_t))),
			    *(int32_t*)endKey,
			    start, end,
 			    *(int64_t*)(startKey+(sizeof(int32_t))),
			    *(int32_t*)startKey,
 			    *(int64_t*)(endKey+(sizeof(int32_t))),
			    *(int32_t*)endKey);

			printBucket();
			char* xx=NULL; *xx=0;
		}
#endif
	}

	// set counts to pass back, we may be accumulating over multiple
	// buckets so add it to the count
	if ( numNegRecs ) *numNegRecs += numNeg;
	if ( numPosRecs ) *numPosRecs += numPos;

	//if we don't have an end key, we were not the last bucket, so don't 
	//finalize the list... yes do, because we might've hit min rec sizes
	if(endKey == NULL && list->getListSize() < minRecSizes) return true;

	if ( lastKey != NULL ) list->setLastKey ( lastKey );
	
	// reset the list's endKey if we hit the minRecSizes barrier cuz
	// there may be more records before endKey than we put in "list"
	if ( list->getListSize() >= minRecSizes && lastKey != NULL ) {
		// use the last key we read as the new endKey
		//key_t newEndKey = m_keys[lastNode];
		char newEndKey[MAX_KEY_BYTES];
		KEYSET(newEndKey, lastKey, ks);
		// . if he's negative, boost new endKey by 1 because endKey's
		//   aren't allowed to be negative
		// . we're assured there's no positive counterpart to him 
		//   since Rdb::addRecord() doesn't allow both to exist in
		//   the tree at the same time
		// . if by some chance his positive counterpart is in the
		//   tree, then it's ok because we'd annihilate him anyway,
		//   so we might as well ignore him
		// we are little endian
		if ( KEYNEG(newEndKey,0,ks) ) KEYADD(newEndKey,1,ks);
		// if we're using half keys set his half key bit
		if ( useHalfKeys ) KEYOR(newEndKey,0x02);
		if ( m_parent->m_rdbId == RDB_POSDB ||
		     m_parent->m_rdbId == RDB2_POSDB2 ) 
			newEndKey[0] |= 0x04;
		// tell list his new endKey now
		list->setEndKey ( newEndKey );
	}
	// reset list ptr to point to first record
	list->resetListPtr();

	// success
	return true;
}



int RdbBucket::getListSizeExact (char* startKey, char* endKey ) {

	int32_t numRecs = 0;

	sort();
	//get our bounds within the bucket:
	uint8_t ks = m_parent->getKeySize();
	int32_t recSize = m_parent->getRecSize();
	int32_t start = 0;
	int32_t end = m_numKeys - 1;
	char v;
	char* kk = NULL;

	int32_t low = 0;
	int32_t high = m_numKeys - 1;
	while(low <= high) {
		int32_t delta = high - low;
		start = low + (delta >> 1);
		kk = m_keys + (recSize * start);
		v = KEYCMP(startKey,kk,ks);
		if(v < 0) {
			high = start - 1;
			continue;
		}
		else if(v > 0) {
			low = start + 1;
			continue;
		}
		else break;

	}
	//now back up or move forward s.t. startKey
	//is <= start
	while(start < m_numKeys) {
		kk = m_keys + (recSize * start);
		v = KEYCMP(startKey, kk, ks);
		if(v > 0) start++;
		else break;
	}




	low = start;
	high = m_numKeys - 1;
	while(low <= high) {
		int32_t delta = high - low;
		end = low + (delta >> 1);
		kk = m_keys + (recSize * end);
		v = KEYCMP(endKey,kk,ks);
		if(v < 0) {
			high = end - 1;
			continue;
		}
		else if(v > 0) {
			low = end + 1;
			continue;
		}
		else break;

	}
	while(end > 0) {
		kk = m_keys + (recSize * end);
		v = KEYCMP(endKey, kk, ks);
		if(v < 0) end--;
		else break;
	}


	//keep track of our negative a positive recs
	//int32_t numNeg = 0;
	//int32_t numPos = 0;

	int32_t fixedDataSize = m_parent->getFixedDataSize();

	char* currKey = m_keys + (start * recSize);

	//bail now if there is only one key and it is out of range.
	if(start == end && 
	   ((startKey && KEYCMP(currKey, startKey, ks) < 0) ||
	    (endKey   && KEYCMP(currKey, endKey, ks) > 0))) {
		return 0;
	}

	// MDW: are none negatives?
	if ( fixedDataSize == 0 ) {
		numRecs = (end - start) * ks;
		return numRecs;
	}

	char* lastKey = NULL;
	for(int32_t i = start; 
	    i <= end ; //&& list->getListSize() < minRecSizes; 
	    i++, currKey += recSize) {
		// if ( fixedDataSize == 0 ) {
		// 	numRecs++;
		// } 
		// else {
		int32_t dataSize = fixedDataSize;
		if ( fixedDataSize == -1 ) 
			dataSize = *(int32_t*)(currKey+ks+sizeof(char*));
		numRecs++;
		//}
		lastKey = currKey;
	}

	// success
	return numRecs * ks ;
}


bool RdbBuckets::deleteList(collnum_t collnum, RdbList *list) {
	if(list->getListSize() == 0) return true;

	if(!m_isWritable || m_isSaving ) {
		g_errno = EAGAIN;
		return false;
	}

	// . set this right away because the head bucket needs to know if we
	// . need to save
	m_needsSave = true;

	char startKey [ MAX_KEY_BYTES ];
	char endKey   [ MAX_KEY_BYTES ];
	list->getStartKey ( startKey );
	list->getEndKey   ( endKey   );

	int32_t startBucket = getBucketNum(startKey, collnum);
	if(startBucket > 0 && 
	   bucketCmp(startKey, collnum, m_buckets[startBucket-1]) < 0)
		startBucket--;

	// if the startKey is past our last bucket, then nothing
	// to delete
	if(startBucket == m_numBuckets ||
	   m_buckets[startBucket]->getCollnum() != collnum) return true;


	int32_t endBucket = getBucketNum(endKey, collnum);
	if(endBucket == m_numBuckets ||
	   m_buckets[endBucket]->getCollnum() != collnum) endBucket--;
	//log(LOG_WARN, "db numBuckets %"INT32" start %"INT32" end %"INT32"", 
	//  m_numBuckets, startBucket, endBucket);

	list->resetListPtr();
	for(int32_t i= startBucket; i <= endBucket && !list->isExhausted(); i++) {
		if(!m_buckets[i]->deleteList(list)) {
			m_buckets[i]->reset();
			m_buckets[i] = NULL;
		}
	}
	int32_t j = 0;
	for(int32_t i = 0; i < m_numBuckets; i++) 
		if(m_buckets[i]) m_buckets[j++] = m_buckets[i];
	m_numBuckets = j;

	//did we delete the whole darn thing?  
	if(m_numBuckets == 0) {
		if(m_numKeysApprox != 0) {
			log("db: bucket's number of keys is getting off by %"INT32""
			    " after deleting a list", m_numKeysApprox);
			char *xx = NULL; *xx = 0;
		}
		m_firstOpenSlot = 0;
	}
	return true;
}


bool RdbBucket::deleteList(RdbList *list) {

	sort();
	uint8_t ks = m_parent->getKeySize();
	int32_t recSize = m_parent->getRecSize();
	int32_t fixedDataSize = m_parent->getFixedDataSize();
	char v;

	char *currKey = m_keys;
	char *p = currKey;
	char listkey[MAX_KEY_BYTES]; 

	char *lastKey = m_keys + (m_numKeys * recSize);
	int32_t br = 0; //bytes removed
	int32_t dso = ks+sizeof(char*);//datasize offset
	int32_t numNeg = 0;

	list->getCurrentKey(listkey);
	while(currKey < lastKey) {  //&& !list->isExhausted()

		v = KEYCMP(currKey, listkey, ks);
		if(v == 0) {
			if(fixedDataSize != 0) {
				if(fixedDataSize == -1) 
					br += *(int32_t*)(currKey+dso);
				else br += fixedDataSize;
			}
			if(KEYNEG(currKey)) numNeg++;

			// . forget it exists by advancing read ptr without 
			// . advancing the write ptr
			currKey += recSize;
			if(!list->skipCurrentRecord()) break;
			list->getCurrentKey(listkey);
			continue;
		} 
		else if (v < 0) {
			// . copy this key into place, it was not in the 
			// . delete list
			if(p != currKey) gbmemcpy(p, currKey, recSize);
			p += recSize;
			currKey += recSize;
		} 
		else { //list key > current key
			// . otherwise advance the delete list until 
			//listKey is <= currKey
			if(!list->skipCurrentRecord()) break;
			list->getCurrentKey(listkey);
		}
	}

	// . do we need to finish copying our list down to the 
	// . vacated mem?
	if(currKey < lastKey) {
		int32_t tmpSize = lastKey - currKey;
		gbmemcpy(p, currKey, tmpSize);
		p += tmpSize;
	}

	if(p > m_keys) {	//do we have anything left?
		int32_t newNumKeys = (p - m_keys) / recSize;
		m_parent->updateNumRecs(newNumKeys - m_numKeys, - br, -numNeg);
		m_numKeys = newNumKeys;
		m_lastSorted = m_numKeys;
		m_endKey = m_keys + ((m_numKeys - 1) * recSize);
		return true;

	}
	else {
		//we deleted the entire bucket, let our parent know to free us
		m_parent->updateNumRecs( - m_numKeys, - br, -numNeg);
		return false;
	}
	// success
	return true;
}

// remove keys from any non-existent collection
void RdbBuckets::cleanBuckets ( ) {

	// what buckets have -1 rdbid???
	if ( m_rdbId < 0 ) return;

	// the liberation count
	int32_t count = 0;

	/*
	char buf[50000];
	RdbList list;
	list.set ( NULL,
		   0,
		   buf,
		   50000,
		   0, // fixeddatasize
		   false, // own data? should rdblist free it
		   false, // usehalfkeys
		   m_ks);
	*/

 top:

	for ( int32_t i = 0; i < m_numBuckets; i++ ) {
		RdbBucket *b = m_buckets[i];
		collnum_t collnum = b->getCollnum();
		CollectionRec *cr = NULL;
		if ( collnum < g_collectiondb.m_numRecs ) 
			cr = g_collectiondb.m_recs[collnum];
		if ( cr ) continue;
		// count # deleted
		count += b->getNumKeys();
		// delete that coll
		delColl ( collnum );
		// restart
		goto top;
		/*
		int32_t nk = b->getNumKeys();
		for (int32_t j = 0 ; j < nk ; j++ ) {
			char *kp = b->m_keys + j*m_ks;
			// add into list. should just be a gbmemcpy()
			list.addKey ( kp , 0 , NULL );
		*/
		//deleteBucket ( i );
	}

	// print it
	if ( count == 0 ) return;
	log(LOG_LOGIC,"db: Removed %"INT32" records from %s buckets "
	    "for invalid collection numbers.",count,m_dbname);
	//log(LOG_LOGIC,"db: Records not actually removed for safety. Except "
	//    "for those with negative colnums.");
	// static bool s_print = true;
	// if ( ! s_print ) return;
	// s_print = false;
	// log (LOG_LOGIC,"db: This is bad. Did you remove a collection "
	//      "subdirectory? Don't do that, you should use the \"delete "
	//      "collections\" interface because it also removes records from "
	//      "memory, too.");
}


bool RdbBuckets::delColl(collnum_t collnum) {

	m_needsSave = true;
	RdbList list;
	int32_t minRecSizes = 1024*1024;
	int32_t numPosRecs  = 0;
	int32_t numNegRecs = 0;
	while (1) {
		if(!getList(collnum, KEYMIN(), KEYMAX(), minRecSizes ,
			    &list , &numPosRecs , &numNegRecs, false )) {
			if(g_errno == ENOMEM && minRecSizes > 1024) {
				minRecSizes /= 2;
				continue;
			} else {
				log("db: buckets could not delete "
				    "collection: %s.",
				    mstrerror(errno));
				return false;
			}
		} 
		if(list.isEmpty()) break;
		deleteList(collnum, &list);
	}

	log("buckets: deleted all keys for collnum %"INT32,(int32_t)collnum);
	return true;
}

int32_t RdbBuckets::addTree(RdbTree* rt) {

	int32_t n = rt->getFirstNode();
	int32_t count = 0;
	char* data = NULL;

	int32_t dataSize = m_fixedDataSize;

	while ( n >= 0 ) {
		if(m_fixedDataSize != 0) {
			data = rt->getData ( n );
			if(m_fixedDataSize == -1) 
				dataSize = rt->getDataSize(n);
		}

		if(addNode ( rt->getCollnum (n), 
			     rt->getKey ( n ) , 
			     data , dataSize) < 0) 
			break;
		n = rt->getNextNode ( n );
		count++;
	}
	log("db: added %"INT32" keys from tree to buckets for %s.",count, m_dbname);
	return count;
}


//this could be sped up a lot, but it is only called from repair at
//the moment.
bool RdbBuckets::addList(RdbList* list, collnum_t collnum) {
	char listKey[MAX_KEY_BYTES]; 

	for( list->resetListPtr(); 
	     !list->isExhausted(); 
	     list->skipCurrentRecord()) {

		list->getCurrentKey(listKey);

		if(addNode(collnum,  
			   listKey , 
			   list->getCurrentData() , 
			   list->getCurrentDataSize()) < 0)
			return false;
	}

	return true;
}




//return the total bytes of the list bookended by startKey and endKey
int64_t RdbBuckets::getListSize ( collnum_t collnum,
				    char *startKey , char *endKey , 
				    char *minKey , char *maxKey ) {

	if ( minKey ) KEYSET ( minKey , endKey   , m_ks );
	if ( maxKey ) KEYSET ( maxKey , startKey , m_ks );

	int32_t startBucket = getBucketNum(startKey, collnum);
	if(startBucket > 0 && 
	   bucketCmp(startKey, collnum, m_buckets[startBucket-1]) < 0)
		startBucket--;

	if(startBucket == m_numBuckets ||
	   m_buckets[startBucket]->getCollnum() != collnum) return 0;


	int32_t endBucket = getBucketNum(endKey, collnum);

	// not sure if i should have added this: MDW
	//if(bucketCmp(endKey, collnum, m_buckets[startBucket]) <= 0)
	//	endBucket = startBucket;

	if(endBucket == m_numBuckets ||
	   m_buckets[endBucket]->getCollnum() != collnum) endBucket--;
	//log(LOG_WARN, "db numBuckets %"INT32" start %"INT32" end %"INT32"", 
	//m_numBuckets, startBucket, endBucket);

	int64_t retval = 0;
	for(int32_t i = startBucket; i <= endBucket; i++) {
		retval += m_buckets[i]->getNumKeys();
	}
	return retval * m_recSize;
}

void *saveBucketsWrapper      ( void *state , ThreadEntry *t ) ;
void threadDoneBucketsWrapper ( void *state , ThreadEntry *t ) ;

// . caller should call f->set() himself
// . we'll open it here
// . returns false if blocked, true otherwise
// . sets g_errno on error
bool RdbBuckets::fastSave ( char    *dir       ,
			    bool     useThread ,
			    void    *state     ,
			    void    (* callback) (void *state) ) {
	if ( g_conf.m_readOnlyMode ) return true;
	// we do not need a save
	if ( ! m_needsSave ) return true;
	// return true if already in the middle of saving
	if ( m_isSaving ) return false;

	// do not use thread for now!! test it to make sure that was
	// not the problem
	//useThread = false;

	// save parms
	//m_saveFile = f;
	m_dir      = dir;
	m_state    = state;
	m_callback = callback;
	// assume no error
	m_saveErrno = 0;
	// no adding to the tree now
	m_isSaving = true;

	/*
// skip thread call if we should
	if ( ! useThread ) goto skip;
	// make this a thread now
	if ( g_threads.call ( SAVETREE_THREAD   , // threadType
			      1                 , // niceness
			      this              , 
			      threadDoneBucketsWrapper ,
			      saveBucketsWrapper) ) return false;
	// if it failed
	if ( ! g_threads.m_disabled ) 
		log("db: Thread creation failed. Blocking while "
		    "saving buckets. Hurts performance.");
 skip:
	*/

	// . this returns false and sets g_errno on error
	// . must now lock for each bucket when saving that bucket, but
	//   release lock to breathe between buckets
	fastSave_r ();

	// store save error into g_errno
	g_errno = m_saveErrno;
	// resume adding to the tree
	m_isSaving = false;
	// we do not need to be saved now?
	m_needsSave = false;
	// we did not block
	return true;
}

void *saveBucketsWrapper ( void *state , ThreadEntry *t ) {
	// get this class
	RdbBuckets *THIS = (RdbBuckets *)state;
	// this returns false and sets g_errno on error
	THIS->fastSave_r();
	// now exit the thread, bogus return
	return NULL;
}

// we come here after thread exits
void threadDoneBucketsWrapper ( void *state , ThreadEntry *t ) {
	// get this class
	RdbBuckets *THIS = (RdbBuckets *)state;
	// store save error into g_errno
	g_errno = THIS->m_saveErrno;
	// . resume adding to the tree
	// . this will also allow other threads to be queued
	// . if we did this at the end of the thread we could end up with
	//   an overflow of queued SAVETHREADs
	THIS->m_isSaving = false;
	// we do not need to be saved now?
	THIS->m_needsSave = false;
	// g_errno should be preserved from the thread so if fastSave_r()
	// had an error it will be set
	if ( g_errno )
		log("db: Had error saving tree to disk for %s: %s.",
		    THIS->m_dbname,mstrerror(g_errno));
	// . call callback
	if ( THIS->m_callback ) THIS->m_callback ( THIS->m_state );
}

// . returns false and sets g_errno on error
// . NO USING g_errno IN A DAMN THREAD!!!!!!!!!!!!!!!!!!!!!!!!!
bool RdbBuckets::fastSave_r() {
	if ( g_conf.m_readOnlyMode ) return true;
	// recover the file
	//BigFile *f = m_saveFile;
	// open it up
	//if ( ! f->open ( O_RDWR | O_CREAT ) ) 
	//	return log("RdbTree::fastSave_r: %s",mstrerror(g_errno));
	// cannot use the BigFile class, since we may be in a thread and it 
	// messes with g_errno
	//char *s = m_saveFile->getFilename();
	char s[1024];
	sprintf ( s , "%s/%s-buckets-saving.dat", m_dir , m_dbname );
	int fd = ::open ( s , 
			  O_RDWR | O_CREAT | O_TRUNC ,
			  getFileCreationFlags() );
			  // S_IRUSR | S_IWUSR | 
			  // S_IRGRP | S_IWGRP | S_IROTH);
	if ( fd < 0 ) {
		m_saveErrno = errno;
		return log("db: Could not open %s for writing: %s.",
			   s,mstrerror(errno));
	}
	// clear our own errno
	errno = 0;
	// . save the header
	// . force file head to the 0 byte in case offset was elsewhere
	int64_t offset = 0;

	offset = fastSaveColl_r(fd, offset);
	
	// close it up
	close ( fd );
	// now fucking rename it
	char s2[1024];
	sprintf ( s2 , "%s/%s-buckets-saved.dat", m_dir , m_dbname );
	::rename ( s , s2 ) ; //fuck yeah!
	// info
	log("db RdbBuckets saved %"INT32" keys, %"INT64" bytes for %s",
	    getNumKeys(), offset, m_dbname);

	return offset >= 0;
}

int64_t RdbBuckets::fastSaveColl_r(int fd, int64_t offset) {
	if(m_numKeysApprox == 0) return offset;
	int32_t version = SAVE_VERSION;
	int32_t err = 0;
	if ( pwrite ( fd , &version, sizeof(int32_t) , offset ) != 4 ) err=errno;
	offset += sizeof(int32_t);

	if ( pwrite ( fd , &m_numBuckets, sizeof(int32_t) , offset)!=4)err=errno; 
	offset += sizeof(int32_t);
	if ( pwrite ( fd , &m_maxBuckets, sizeof(int32_t) , offset)!=4)err=errno; 
	offset += sizeof(int32_t);

	if ( pwrite ( fd , &m_ks, sizeof(uint8_t) , offset ) != 1) err=errno; 
	offset += sizeof(uint8_t);
	if ( pwrite ( fd , &m_fixedDataSize,sizeof(int32_t),offset)!=4) err=errno;
	offset += sizeof(int32_t);
	if ( pwrite ( fd , &m_recSize, sizeof(int32_t) , offset ) != 4) err=errno;
	offset += sizeof(int32_t);
	if ( pwrite ( fd , &m_numKeysApprox,sizeof(int32_t),offset) !=4)err=errno;
	offset += sizeof(int32_t);
	if ( pwrite ( fd , &m_numNegKeys,sizeof(int32_t),offset) != 4 ) err=errno;
	offset += sizeof(int32_t);

	if ( pwrite ( fd,&m_dataMemOccupied,sizeof(int32_t),offset)!=4)err=errno;
	offset += sizeof(int32_t);

	int32_t tmp = BUCKET_SIZE;
	if ( pwrite ( fd , &tmp, sizeof(int32_t) , offset ) != 4 ) err=errno;
	offset += sizeof(int32_t);

	// 	int32_t len = gbstrlen(m_dbname) + 1;
	// 	pwrite ( fd , &m_dbname, len , offset ); 
	// 	offset += len;

	// set it
	if ( err ) errno = err;

	// bitch on error
	if ( errno ) {
		m_saveErrno = errno;
		close ( fd );
		log("db: Failed to save buckets for %s: %s.",
		    m_dbname,mstrerror(errno));
		return -1;
	}
	// position to store into m_keys, ...
	for (int32_t i = 0; i < m_numBuckets; i++ ) {
		offset = m_buckets[i]->fastSave_r(fd, offset);
		// returns -1 on error
		if ( offset < 0 ) {
			close ( fd );
			m_saveErrno = errno;
			log("db: Failed to save buckets for %s: %s.",
			    m_dbname,mstrerror(errno));
			return -1;
		}
	}
	return offset;
}


bool RdbBuckets::loadBuckets ( char* dbname) {
	char filename[256];
	sprintf(filename,"%s-buckets-saved.dat",dbname);
	// set this to false
	// msg
	//log (0,"Rdb::loadTree: loading %s",filename);
	// set a BigFile to this filename
	BigFile file;//g_hostdb.m_dir
	char *dir = g_hostdb.m_dir;
	if( *dir == '\0') dir = ".";
	file.set ( dir , filename , NULL ); 
	if ( file.doesExist() <= 0 ) return true;
	// load the table with file named "THISDIR/saved"
	bool status = false ;
	status = fastLoad ( &file , dbname ) ;
	file.close();
	return status;
}

bool RdbBuckets::fastLoad ( BigFile *f , char* dbname) {
	// msg
	log(LOG_INIT,"db: Loading %s.",f->getFilename());
	// open it up
	if ( ! f->open ( O_RDONLY ) ) return false;
	int32_t fsize = f->getFileSize();
	if ( fsize == 0 ) return true;

	// init offset
	int64_t offset = 0;

	offset = fastLoadColl(f, dbname, offset);

	if ( offset < 0 ) {
		log("db: Failed to load buckets for %s: %s.",
		    m_dbname,mstrerror(g_errno));
		return false;
	}
	return true;
}


int64_t RdbBuckets::fastLoadColl( BigFile *f,
				    char *dbname,
				    int64_t offset ) {
	int32_t maxBuckets;
	int32_t numBuckets;
	int32_t version;

	f->read  ( &version,sizeof(int32_t), offset ); 
	offset += sizeof(int32_t);
	if(version > SAVE_VERSION) {
		log("db: Failed to load buckets for %s: "
		    "saved version is in the future or is corrupt, "
		    "please restart old executable and do a ddump.",
		    m_dbname);
		return -1;
	}

	f->read  ( &numBuckets,sizeof(int32_t), offset ); 
	offset += sizeof(int32_t);

	f->read  ( &maxBuckets,sizeof(int32_t), offset ); 
	offset += sizeof(int32_t);

	f->read  ( &m_ks,sizeof(uint8_t), offset ); 
	offset += sizeof(uint8_t);

	f->read  ( &m_fixedDataSize,sizeof(int32_t), offset ); 
	offset += sizeof(int32_t);

	f->read  ( &m_recSize,sizeof(int32_t), offset ); 
	offset += sizeof(int32_t);

	f->read  ( &m_numKeysApprox,sizeof(int32_t), offset ); 
	offset += sizeof(int32_t);

	f->read  ( &m_numNegKeys,sizeof(int32_t), offset ); 
	offset += sizeof(int32_t);

	f->read  ( &m_dataMemOccupied, sizeof(int32_t), offset ); 
	offset += sizeof(int32_t);

	int32_t bucketSize;
	f->read  ( &bucketSize, sizeof(int32_t), offset ); 
	offset += sizeof(int32_t);


	if(bucketSize != BUCKET_SIZE) {
		log("db: It appears you have changed the bucket size "
		    "please restart the old executable and dump "
		    "buckets to disk. old=%"INT32" new=%"INT32"", 
		    bucketSize, (int32_t)BUCKET_SIZE);
		char *xx = NULL; *xx = 0;
	}

	m_dbname = dbname;

	if ( g_errno ) 
		return -1;

	for (int32_t i = 0; i < numBuckets; i++ ) {
		m_buckets[i] = bucketFactory();
		if(m_buckets[i] == NULL) return -1;
		offset = m_buckets[i]->fastLoad(f, offset);
		// returns -1 on error
		if ( offset < 0 ) 
			return -1;
		m_numBuckets++;
	}
	return offset;
}

// max key size -- posdb, 18 bytes, so use 18 here
#define BTMP_SIZE (BUCKET_SIZE*18+1000)

int64_t RdbBucket::fastSave_r(int fd, int64_t offset) {

	// first copy to a buf before saving so we can unlock!
	char tmp[BTMP_SIZE];
	char *p = tmp;

	gbmemcpy ( p , &m_collnum, sizeof(collnum_t) );
	p += sizeof(collnum_t);
	//pwrite ( fd , &m_collnum, sizeof(collnum_t) , offset ); 
	//offset += sizeof(collnum_t);

	gbmemcpy ( p , &m_numKeys, sizeof(int32_t) );
	p += sizeof(m_numKeys);
	//pwrite ( fd , &m_numKeys, sizeof(int32_t) , offset ); 
	//offset += sizeof(m_numKeys);

	gbmemcpy ( p , &m_lastSorted, sizeof(int32_t) ); 
	p += sizeof(m_lastSorted);
	//pwrite ( fd , &m_lastSorted, sizeof(int32_t) , offset ); 
	//offset += sizeof(m_lastSorted);

	int32_t endKeyOffset = m_endKey - m_keys;
	gbmemcpy ( p , &endKeyOffset, sizeof(int32_t) ); 
	p += sizeof(int32_t);
	//pwrite ( fd , &endKeyOffset, sizeof(int32_t) , offset ); 
	//offset += sizeof(int32_t);
	
	int32_t recSize = m_parent->getRecSize();
	
	gbmemcpy ( p , m_keys, recSize*m_numKeys ); 
	p += recSize*m_numKeys;
	//pwrite ( fd , m_keys, recSize*m_numKeys , offset ); 
	//offset += recSize*m_numKeys;

	int32_t size = p - tmp;
	if ( size > BTMP_SIZE ) { 
		log("buckets: btmp_size too small. keysize>18 bytes?");
		char *xx=NULL;*xx=0; 
	}

	// now we can save it without fear of being interrupted and having
	// the bucket altered
	errno = 0;
	if ( pwrite ( fd , tmp , size , offset ) != size ) {
		log("db:fastSave_r: %s.",mstrerror(errno));
		return -1;
	}
	
	return offset + size;
}

int64_t RdbBucket::fastLoad(BigFile *f, int64_t offset) {
	//errno = 0;

	f->read  ( &m_collnum,sizeof(collnum_t), offset ); 
	offset += sizeof(collnum_t);

	f->read  ( &m_numKeys,sizeof(int32_t), offset ); 
	offset += sizeof(int32_t);

	f->read  ( &m_lastSorted,sizeof(int32_t), offset ); 
	offset += sizeof(int32_t);

	int32_t endKeyOffset;
	f->read  ( &endKeyOffset,sizeof(int32_t), offset ); 
	offset += sizeof(int32_t);

	int32_t recSize = m_parent->getRecSize();
	
	f->read  ( m_keys,recSize*m_numKeys, offset ); 
	offset += recSize*m_numKeys;

	m_endKey = m_keys + endKeyOffset;
	if ( g_errno ) {
		log("bucket: fastload %s",mstrerror(g_errno));
		return -1;
	}

	return offset;
}