// Source listing metadata: 2349 lines, 60 KiB, C++.
#undef _XOPEN_SOURCE
#define _XOPEN_SOURCE 500
#include "RdbBuckets.h"
#include "gb-include.h"
#include "sort.h"
#include "SafeBuf.h"
#include "Threads.h"
#include <unistd.h>
#include <sys/stat.h>
#include <string.h>
#include "Loop.h"
#include "Rdb.h"
|
|
|
|
//#define BUCKET_SIZE 64
|
|
//#define BUCKET_SIZE 1024
|
|
//#define BUCKET_SIZE 2048
|
|
//#define BUCKET_SIZE 4096
|
|
#define BUCKET_SIZE 8192
|
|
//#define BUCKET_SIZE 16384
|
|
//#define BUCKET_SIZE 1638400
|
|
#define INIT_SIZE 4096
|
|
#define SAVE_VERSION 0
|
|
|
|
|
|
// Three-way compare of two 12-byte keys for gbmergesort.
// The 8 bytes at offset 4 are compared first, then the 4 bytes at offset 0
// with bit 0 masked off, so keys differing only in bit 0 compare equal
// (presumably the positive/negative flag; RdbBucket::sort() dedups with
// KEYCMPNEGEQ, which needs equal keys adjacent in insertion order).
// Loads go through memcpy: the keys may be unaligned, and dereferencing
// them through uint64_t*/uint32_t* casts is undefined behaviour.
// Returns -1, 0 or 1.
inline int KEYCMP12 ( const void *a, const void *b ) {
	const char *k1 = (const char *)a;
	const char *k2 = (const char *)b;
	uint64_t hi1, hi2;
	memcpy ( &hi1 , k1 + 4 , sizeof(hi1) );
	memcpy ( &hi2 , k2 + 4 , sizeof(hi2) );
	if ( hi1 < hi2 ) return -1;
	if ( hi1 > hi2 ) return  1;
	uint32_t lo1, lo2;
	memcpy ( &lo1 , k1 , sizeof(lo1) );
	memcpy ( &lo2 , k2 , sizeof(lo2) );
	lo1 &= ~(uint32_t)0x01;
	lo2 &= ~(uint32_t)0x01;
	if ( lo1 < lo2 ) return -1;
	if ( lo1 > lo2 ) return  1;
	return 0;
}
|
|
|
|
|
|
|
|
// Three-way compare of two 16-byte keys for gbmergesort.
// The 8 bytes at offset 8 are compared first, then the 8 bytes at offset 0
// with bit 0 masked off, so keys differing only in bit 0 compare equal
// (presumably the positive/negative flag, matching KEYCMPNEGEQ).
// memcpy loads avoid the undefined behaviour of dereferencing possibly
// unaligned type-punned pointers. Returns -1, 0 or 1.
inline int KEYCMP16 ( const void *a, const void *b ) {
	const char *k1 = (const char *)a;
	const char *k2 = (const char *)b;
	uint64_t hi1, hi2;
	memcpy ( &hi1 , k1 + 8 , sizeof(hi1) );
	memcpy ( &hi2 , k2 + 8 , sizeof(hi2) );
	if ( hi1 < hi2 ) return -1;
	if ( hi1 > hi2 ) return  1;
	uint64_t lo1, lo2;
	memcpy ( &lo1 , k1 , sizeof(lo1) );
	memcpy ( &lo2 , k2 , sizeof(lo2) );
	lo1 &= ~0x01ULL;
	lo2 &= ~0x01ULL;
	if ( lo1 < lo2 ) return -1;
	if ( lo1 > lo2 ) return  1;
	return 0;
}
|
|
|
|
// Three-way compare of two 18-byte keys for gbmergesort.
// Compares the 8 bytes at offset 10, then the 8 bytes at offset 2, then the
// 2 bytes at offset 0 with bit 0 masked off, so keys differing only in bit 0
// compare equal (presumably the positive/negative flag, matching
// KEYCMPNEGEQ). The offsets 2 and 10 are never 8-byte aligned, so loads go
// through memcpy instead of undefined type-punned dereferences.
// Returns -1, 0 or 1.
inline int KEYCMP18 ( const void *a, const void *b ) {
	const char *k1 = (const char *)a;
	const char *k2 = (const char *)b;
	uint64_t top1, top2;
	memcpy ( &top1 , k1 + 10 , sizeof(top1) );
	memcpy ( &top2 , k2 + 10 , sizeof(top2) );
	if ( top1 < top2 ) return -1;
	if ( top1 > top2 ) return  1;
	uint64_t mid1, mid2;
	memcpy ( &mid1 , k1 + 2 , sizeof(mid1) );
	memcpy ( &mid2 , k2 + 2 , sizeof(mid2) );
	if ( mid1 < mid2 ) return -1;
	if ( mid1 > mid2 ) return  1;
	uint16_t lo1, lo2;
	memcpy ( &lo1 , k1 , sizeof(lo1) );
	memcpy ( &lo2 , k2 , sizeof(lo2) );
	lo1 = (uint16_t)(lo1 & 0xfffe);
	lo2 = (uint16_t)(lo2 & 0xfffe);
	if ( lo1 < lo2 ) return -1;
	if ( lo1 > lo2 ) return  1;
	return 0;
}
|
|
|
|
// Three-way compare of two 24-byte keys for gbmergesort.
// Compares the 8-byte words at offsets 16, 8 and 0 in that order; bit 0 of
// the last word is masked off so keys differing only in that bit compare
// equal (presumably the positive/negative flag, matching KEYCMPNEGEQ).
// memcpy loads avoid undefined behaviour from unaligned type-punned reads.
// Returns -1, 0 or 1.
inline int KEYCMP24 ( const void *a, const void *b ) {
	const char *k1 = (const char *)a;
	const char *k2 = (const char *)b;
	uint64_t w1, w2;
	memcpy ( &w1 , k1 + 16 , sizeof(w1) );
	memcpy ( &w2 , k2 + 16 , sizeof(w2) );
	if ( w1 < w2 ) return -1;
	if ( w1 > w2 ) return  1;
	memcpy ( &w1 , k1 + 8 , sizeof(w1) );
	memcpy ( &w2 , k2 + 8 , sizeof(w2) );
	if ( w1 < w2 ) return -1;
	if ( w1 > w2 ) return  1;
	memcpy ( &w1 , k1 , sizeof(w1) );
	memcpy ( &w2 , k2 , sizeof(w2) );
	w1 &= ~0x01ULL;
	w2 &= ~0x01ULL;
	if ( w1 < w2 ) return -1;
	if ( w1 > w2 ) return  1;
	return 0;
}
|
|
|
|
// Three-way compare of two 6-byte keys for gbmergesort.
// Compares the 4 bytes at offset 2, then the 2 bytes at offset 0 with bit 0
// masked off. The mask was missing here although KEYCMP12/16/18/24 all apply
// it. RdbBucket::sort() relies on a stable sort in which keys equal under
// KEYCMPNEGEQ stay in insertion order, so the most recently added copy
// survives dedup. Without the mask, a key and its bit-0 twin were ordered by
// that bit instead. memcpy loads avoid undefined behaviour from unaligned
// type-punned reads. Returns -1, 0 or 1.
inline int KEYCMP6 ( const void *a, const void *b ) {
	const char *k1 = (const char *)a;
	const char *k2 = (const char *)b;
	uint32_t hi1, hi2;
	memcpy ( &hi1 , k1 + 2 , sizeof(hi1) );
	memcpy ( &hi2 , k2 + 2 , sizeof(hi2) );
	if ( hi1 < hi2 ) return -1;
	if ( hi1 > hi2 ) return  1;
	uint16_t lo1, lo2;
	memcpy ( &lo1 , k1 , sizeof(lo1) );
	memcpy ( &lo2 , k2 , sizeof(lo2) );
	lo1 = (uint16_t)(lo1 & 0xfffe);
	lo2 = (uint16_t)(lo2 & 0xfffe);
	if ( lo1 < lo2 ) return -1;
	if ( lo1 > lo2 ) return  1;
	return 0;
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Bind this bucket to its owning table and to the record storage at
// |newbuf|, starting out with no records. Always returns true.
bool RdbBucket::set(RdbBuckets* parent, char* newbuf) {
	m_parent     = parent;
	m_keys       = newbuf;
	m_numKeys    = 0;
	m_lastSorted = 0;
	m_endKey     = NULL;
	return true;
}
|
|
|
|
|
|
void RdbBucket::reBuf(char* newbuf) {
|
|
if(!m_keys) {
|
|
m_keys = newbuf;
|
|
return;
|
|
}
|
|
gbmemcpy(newbuf, m_keys, m_numKeys * m_parent->getRecSize());
|
|
if(m_endKey) m_endKey = newbuf + (m_endKey - m_keys);
|
|
m_keys = newbuf;
|
|
}
|
|
|
|
|
|
|
|
|
|
// Destructor: only clears the bookkeeping. m_keys points into storage that
// the parent table hands out (see RdbBuckets::resizeTable), so nothing is
// freed here.
RdbBucket::~RdbBucket() {
	this->reset();
}
|
|
|
|
|
|
// Forget every record; the storage pointer m_keys is kept for reuse.
void RdbBucket::reset() {
	m_endKey     = NULL;
	m_lastSorted = 0;
	m_numKeys    = 0;
}
|
|
|
|
int32_t RdbBuckets::getMemAlloced () {
|
|
int32_t alloced = sizeof(RdbBuckets) + m_masterSize + m_dataMemOccupied;
|
|
return alloced;
|
|
}
|
|
|
|
//includes data in the data ptrs
|
|
int32_t RdbBuckets::getMemOccupied() {
|
|
return (m_numKeysApprox * m_recSize) + m_dataMemOccupied +
|
|
sizeof(RdbBuckets) +
|
|
m_sortBufSize +
|
|
BUCKET_SIZE * m_recSize; //swapbuf
|
|
}
|
|
|
|
|
|
int32_t RdbBuckets::getMemAvailable() {
|
|
return m_maxMem - getMemOccupied();
|
|
}
|
|
|
|
|
|
bool RdbBuckets::is90PercentFull() {
|
|
return getMemOccupied () > m_maxMem * .9;
|
|
}
|
|
|
|
bool RdbBuckets::needsDump() {
|
|
if(m_numBuckets + 1 < m_maxBuckets) return false;
|
|
if(m_maxBuckets == m_maxBucketsCapacity) return true;
|
|
return false;
|
|
}
|
|
|
|
//be very conservative with this because if we say we can fit it
|
|
//and we can't then we'll get a partial list added and we will
|
|
//add the whole list again.
|
|
bool RdbBuckets::hasRoom ( int32_t numRecs ) {
|
|
int32_t numBucketsRequired = (((numRecs / BUCKET_SIZE)+1) * 2);
|
|
if(m_maxBucketsCapacity - m_numBuckets < numBucketsRequired)
|
|
return false;
|
|
return true;
|
|
}
|
|
|
|
|
|
|
|
bool RdbBucket::sort() {
|
|
|
|
//m_lastSorted = 0;//for debugging
|
|
if(m_lastSorted == m_numKeys) return true;
|
|
|
|
|
|
if(m_numKeys < 2) {
|
|
m_lastSorted = m_numKeys;
|
|
return true;
|
|
}
|
|
|
|
uint8_t ks = m_parent->getKeySize();
|
|
int32_t recSize = m_parent->getRecSize();
|
|
int32_t fixedDataSize = m_parent->getFixedDataSize();
|
|
int (*cmpfn) (const void*, const void *) = NULL;
|
|
if ( ks == 18 ) cmpfn = KEYCMP18;
|
|
else if ( ks == 24 ) cmpfn = KEYCMP24;
|
|
else if ( ks == 12 ) cmpfn = KEYCMP12;
|
|
else if ( ks == 16 ) cmpfn = KEYCMP16;
|
|
else if ( ks == 6 ) cmpfn = KEYCMP6;
|
|
else { char *xx=NULL;*xx=0; }
|
|
|
|
char* mergeBuf = m_parent->getSwapBuf();
|
|
if(!mergeBuf) { char* xx = NULL; *xx = 0; }
|
|
|
|
int32_t numUnsorted = m_numKeys - m_lastSorted;
|
|
char *list1 = m_keys;
|
|
char *list2 = m_keys + (recSize*m_lastSorted);
|
|
char *list1end = list2;
|
|
char *list2end = list2 + (recSize * numUnsorted);
|
|
//turn quickpoll off while we sort,
|
|
//because we do not know what sort of indeterminate state
|
|
//we will be in while sorting
|
|
// MDW: this no longer disables it since it is based on g_niceness
|
|
// now, but what is the point, does it use static vars or what?
|
|
//bool canQuickpoll = g_loop.m_canQuickPoll;
|
|
//g_loop.m_canQuickPoll = false;
|
|
//sort the unsorted portion
|
|
// turn off this way
|
|
int32_t saved = g_niceness;
|
|
g_niceness = 0;
|
|
// . use merge sort because it is stable, and we need to always keep
|
|
// . the identical keys that were added last
|
|
// . now we pass in a buffer to merge into, otherwise one is malloced,
|
|
// . which can fail. It falls back on qsort which is not stable.
|
|
if(!m_parent->getSortBuf()) {char *xx = NULL; *xx = 0;}
|
|
gbmergesort (list2, numUnsorted , recSize , cmpfn, 0,
|
|
m_parent->getSortBuf(), m_parent->getSortBufSize());
|
|
|
|
//g_loop.m_canQuickPoll = canQuickpoll;
|
|
g_niceness = saved;
|
|
|
|
char *p = mergeBuf;
|
|
char v;
|
|
char *lastKey = NULL;
|
|
int32_t br = 0; //bytesRemoved (abbreviated for column width)
|
|
int32_t dso = ks + sizeof(char*);//datasize offset
|
|
int32_t numNeg = 0;
|
|
|
|
while(1) {
|
|
if(list1 >= list1end) {
|
|
// . just copy into place, deduping as we go
|
|
while(list2 < list2end) {
|
|
if(lastKey && KEYCMPNEGEQ(list2,lastKey,ks) == 0) {
|
|
//this is a dup, we are removing data
|
|
if(fixedDataSize != 0) {
|
|
if(fixedDataSize == -1)
|
|
br += *(int32_t*)(lastKey+dso);
|
|
else br += fixedDataSize;
|
|
}
|
|
if ( KEYNEG(lastKey) ) numNeg++;
|
|
p = lastKey;
|
|
}
|
|
gbmemcpy(p, list2, recSize);
|
|
lastKey = p;
|
|
p += recSize;
|
|
list2 += recSize;
|
|
}
|
|
|
|
break;
|
|
}
|
|
if(list2 >= list2end) {
|
|
// . if all that is left is list 1 just copy it into
|
|
// . place, since it is already deduped
|
|
gbmemcpy(p, list1, list1end - list1);
|
|
p += list1end - list1;
|
|
break;
|
|
}
|
|
v = KEYCMPNEGEQ(list1, list2, ks);
|
|
if(v < 0) {
|
|
//never overwrite the merged list from list1 because
|
|
//it is always older and it is already deduped
|
|
if(lastKey && KEYCMPNEGEQ(list1, lastKey, ks) == 0) {
|
|
if ( KEYNEG(lastKey) ) numNeg++;
|
|
list1 += recSize;
|
|
continue;
|
|
}
|
|
gbmemcpy(p, list1, recSize);
|
|
lastKey = p;
|
|
p += recSize;
|
|
list1 += recSize;
|
|
}
|
|
else if(v > 0) {
|
|
//copy it over the one we just copied in
|
|
if(lastKey && KEYCMPNEGEQ(list2, lastKey, ks) == 0) {
|
|
//this is a dup, we are removing data
|
|
if(fixedDataSize != 0) {
|
|
if(fixedDataSize == -1)
|
|
br += *(int32_t*)(lastKey+dso);
|
|
else br += fixedDataSize;
|
|
}
|
|
if ( KEYNEG(lastKey) ) numNeg++;
|
|
p = lastKey;
|
|
}
|
|
|
|
gbmemcpy(p, list2, recSize);
|
|
lastKey = p;
|
|
p += recSize;
|
|
list2 += recSize;
|
|
}
|
|
else {
|
|
if(lastKey && KEYCMPNEGEQ(list2, lastKey, ks) == 0) {
|
|
if(fixedDataSize != 0) {
|
|
if(fixedDataSize == -1)
|
|
br += *(int32_t*)(lastKey+dso);
|
|
else br += fixedDataSize;
|
|
}
|
|
if ( KEYNEG(lastKey) ) numNeg++;
|
|
p = lastKey;
|
|
}
|
|
|
|
//found dup, take list2's
|
|
gbmemcpy(p, list2, recSize);
|
|
lastKey = p;
|
|
p += recSize;
|
|
list2 += recSize;
|
|
list1 += recSize; //fuggedaboutit!
|
|
}
|
|
}
|
|
|
|
//we compacted out the dups, so reflect that here
|
|
int32_t newNumKeys = (p - mergeBuf) / recSize;
|
|
m_parent->updateNumRecs(newNumKeys - m_numKeys , - br, -numNeg);
|
|
m_numKeys = newNumKeys;
|
|
|
|
if(m_keys != mergeBuf) m_parent->setSwapBuf(m_keys);
|
|
m_keys = mergeBuf;
|
|
m_lastSorted = m_numKeys;
|
|
m_endKey = m_keys + ((m_numKeys - 1) * recSize);
|
|
return true;
|
|
|
|
|
|
}
|
|
|
|
|
|
//make 2 half full buckets,
|
|
//addKey assumes that the *this bucket retains the lower half of the keys
|
|
//returns a new bucket with the remaining upper half.
|
|
RdbBucket* RdbBucket::split(RdbBucket* newBucket) {
|
|
|
|
// log(LOG_WARN, "splitting bucket");
|
|
int32_t b1NumKeys = m_numKeys >> 1; //m_numkeys / 2
|
|
int32_t b2NumKeys = m_numKeys - b1NumKeys;
|
|
int32_t recSize = m_parent->getRecSize();
|
|
//configure the new bucket
|
|
gbmemcpy(newBucket->m_keys,
|
|
m_keys + (b1NumKeys*recSize),
|
|
b2NumKeys * recSize);
|
|
newBucket->m_numKeys = b2NumKeys;
|
|
newBucket->m_lastSorted = b2NumKeys;
|
|
newBucket->m_endKey = newBucket->m_keys + ((b2NumKeys - 1) * recSize);
|
|
|
|
//reconfigure the old bucket
|
|
m_numKeys = b1NumKeys;
|
|
m_lastSorted = b1NumKeys;
|
|
m_endKey = m_keys + ((b1NumKeys - 1) * recSize);
|
|
|
|
//add it to our parent
|
|
return newBucket;
|
|
}
|
|
|
|
|
|
bool RdbBucket::addKey(char *key , char *data , int32_t dataSize) {
|
|
|
|
uint8_t ks = m_parent->getKeySize();
|
|
int32_t recSize = m_parent->getRecSize();
|
|
bool isNeg = KEYNEG(key);
|
|
|
|
char *newLoc = m_keys + (recSize * m_numKeys);
|
|
gbmemcpy(newLoc, key, ks);
|
|
|
|
if(data) {
|
|
*(char**)(newLoc + ks) = data;
|
|
if(m_parent->getFixedDataSize() == -1) {
|
|
*(int32_t*)(newLoc + ks + sizeof(char*)) = (int32_t)dataSize;
|
|
}
|
|
}
|
|
|
|
if(m_endKey == NULL) { //are we the first key?
|
|
if(m_numKeys > 0) {char* xx = NULL; *xx = 0;}
|
|
m_endKey = newLoc;
|
|
m_lastSorted = 1;
|
|
}
|
|
else {
|
|
// . minor optimization: if we are almost sorted, then
|
|
// . see if we can't maintain that state.
|
|
char v = KEYCMPNEGEQ(key, m_endKey, ks);
|
|
//char v = KEYCMP(key, m_endKey, ks);
|
|
if(v == 0) {
|
|
// . just replace the old key if we were the same,
|
|
// . don't inc num keys
|
|
gbmemcpy(m_endKey, newLoc, recSize);
|
|
if(KEYNEG(m_endKey)) {
|
|
if(isNeg) return true;
|
|
else m_parent->updateNumRecs(0, 0, -1);
|
|
}
|
|
else if(isNeg) m_parent->updateNumRecs(0, 0, 1);;
|
|
return true;
|
|
}
|
|
else if(v > 0) {
|
|
// . if we were greater than the old key,
|
|
// . we can assume we are still sorted, which
|
|
// . really helps us for adds which are in order
|
|
if(m_lastSorted == m_numKeys) m_lastSorted++;
|
|
m_endKey = newLoc;
|
|
}
|
|
}
|
|
m_numKeys++;
|
|
m_parent->updateNumRecs(1 , dataSize, isNeg?1:0);
|
|
return true;
|
|
}
|
|
|
|
char* RdbBucket::getKeyVal ( char *key , char **data , int32_t* dataSize ) {
|
|
|
|
sort();
|
|
int32_t i = getKeyNumExact(key);
|
|
if(i < 0) return NULL;
|
|
|
|
int32_t recSize = m_parent->getRecSize();
|
|
uint8_t ks = m_parent->getKeySize();
|
|
char* rec = m_keys + (recSize * i);
|
|
|
|
if(data) {
|
|
*data = rec + ks;
|
|
if(dataSize)
|
|
*dataSize = *(int32_t*)*data + sizeof(char*);
|
|
}
|
|
return rec;
|
|
}
|
|
|
|
int32_t RdbBucket::getKeyNumExact(char* key) {
|
|
|
|
uint8_t ks = m_parent->getKeySize();
|
|
int32_t recSize = m_parent->getRecSize();
|
|
int32_t i = 0;
|
|
char v;
|
|
char* kk;
|
|
int32_t low = 0;
|
|
int32_t high = m_numKeys - 1;
|
|
while(low <= high) {
|
|
int32_t delta = high - low;
|
|
i = low + (delta >> 1);
|
|
kk = m_keys + (recSize * i);
|
|
v = KEYCMP(key,kk,ks);
|
|
if(v < 0) {
|
|
high = i - 1;
|
|
continue;
|
|
}
|
|
else if(v > 0) {
|
|
low = i + 1;
|
|
continue;
|
|
}
|
|
else return i;
|
|
}
|
|
return -1;
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
bool RdbBucket::selfTest (char* prevKey) {
|
|
sort();
|
|
char* last = NULL;
|
|
char* kk = m_keys;
|
|
int32_t recSize = m_parent->getRecSize();
|
|
int32_t ks = m_parent->getKeySize();
|
|
|
|
//ensure our first key is > the last guy's end key
|
|
if(prevKey != NULL && m_numKeys > 0) {
|
|
if(KEYCMP(prevKey, m_keys,ks) > 0) {
|
|
log(LOG_WARN, "db: bucket's first key: %016" XINT64 "%08" XINT32 " "
|
|
"is less than last bucket's end key: "
|
|
"%016" XINT64 "%08" XINT32 "!!!!!",
|
|
*(int64_t*)(m_keys+(sizeof(int32_t))),
|
|
*(int32_t*)m_keys,
|
|
*(int64_t*)(prevKey+(sizeof(int32_t))),
|
|
*(int32_t*)prevKey);
|
|
//printBucket();
|
|
return false;
|
|
//char* xx = NULL; *xx = 0;
|
|
}
|
|
}
|
|
|
|
for(int32_t i = 0; i < m_numKeys; i++) {
|
|
//log(LOG_WARN, "rdbbuckets last key: ""%016" XINT64 "%08" XINT32 " num keys: %" INT32 "",
|
|
// *(int64_t*)(kk+(sizeof(int32_t))), *(int32_t*)kk, m_numKeys);
|
|
if(i > 0 && KEYCMP(last, kk, ks) > 0) {
|
|
log(LOG_WARN, "db: bucket's last key was out "
|
|
"of order!!!!!"
|
|
"key was: %016" XINT64 "%08" XINT32 " vs prev: %016" XINT64 "%08" XINT32 ""
|
|
" num keys: %" INT32 ""
|
|
" ks=%" INT32 " bucketNum=%" INT32 "",
|
|
*(int64_t*)(kk+(sizeof(int32_t))), *(int32_t*)kk,
|
|
*(int64_t*)(last+(sizeof(int32_t))), *(int32_t*)last,
|
|
m_numKeys, ks, i);
|
|
return false;
|
|
//char* xx = NULL; *xx = 0;
|
|
}
|
|
last = kk;
|
|
kk += recSize;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
void RdbBuckets::printBuckets() {
|
|
for(int32_t i = 0; i < m_numBuckets; i++) {
|
|
m_buckets[i]->printBucket();
|
|
}
|
|
}
|
|
|
|
|
|
void RdbBucket::printBucket() {
|
|
char* kk = m_keys;
|
|
int32_t recSize = m_parent->getRecSize();
|
|
for(int32_t i = 0; i < m_numKeys;i++) {
|
|
log(LOG_WARN, "rdbbuckets last key: ""%016" XINT64 "%08" XINT32 " num "
|
|
"keys: %" INT32 "",
|
|
*(int64_t*)(kk+(sizeof(int32_t))), *(int32_t*)kk, m_numKeys);
|
|
kk += recSize;
|
|
}
|
|
}
|
|
|
|
|
|
// Construct an empty, unconfigured table; set() must be called before use.
// Every member gets a defined value here. Previously the counters, sizes and
// table bookkeeping (m_maxBuckets, m_firstOpenSlot, m_numKeysApprox,
// m_masterSize, ...) were left indeterminate until set(), so accessors such
// as getMemOccupied() or needsDump() read garbage on an unset object.
RdbBuckets::RdbBuckets() {
	m_numBuckets = 0;
	m_maxBuckets = 0;
	m_maxBucketsCapacity = 0;
	m_firstOpenSlot = 0;
	m_masterPtr = NULL;
	m_masterSize = 0;
	m_buckets = NULL;
	m_bucketsSpace = NULL;
	m_swapBuf = NULL;
	m_sortBuf = NULL;
	m_sortBufSize = 0;
	m_isWritable = true;
	m_isSaving = false;
	m_needsSave = false;
	m_repairMode = false;
	m_dataMemOccupied = 0;
	m_numKeysApprox = 0;
	m_numNegKeys = 0;
	m_maxMem = 0;
	m_recSize = 0;
	m_ks = 0;
	m_fixedDataSize = 0;
	m_rdbId = 0;
	m_allocName = NULL;
	m_dbname = NULL;
}
|
|
|
|
|
|
|
|
bool RdbBuckets::set ( int32_t fixedDataSize , int32_t maxMem,
|
|
bool ownData ,
|
|
char *allocName ,
|
|
char rdbId,
|
|
bool dataInPtrs ,
|
|
char *dbname ,
|
|
char keySize ,
|
|
bool useProtection ) {
|
|
m_numBuckets = 0;
|
|
m_ks = keySize;
|
|
m_rdbId = rdbId;
|
|
m_allocName = allocName;
|
|
m_fixedDataSize = fixedDataSize;
|
|
m_recSize = m_ks;
|
|
if(m_fixedDataSize != 0) {
|
|
m_recSize += sizeof(char*);
|
|
if(m_fixedDataSize == -1) m_recSize += sizeof(int32_t);
|
|
}
|
|
m_numKeysApprox = 0;
|
|
m_numNegKeys = 0;
|
|
m_dbname = dbname;
|
|
m_swapBuf = NULL;
|
|
m_sortBuf = NULL;
|
|
//taken from sort.cpp, this is to prevent mergesort from mallocing
|
|
m_sortBufSize = BUCKET_SIZE * m_recSize + sizeof(char*);
|
|
if(m_buckets) {char *xx = NULL; *xx = 0;}
|
|
m_maxBuckets = 0;
|
|
m_masterSize = 0;
|
|
m_masterPtr = NULL;
|
|
m_maxMem = maxMem;
|
|
|
|
int32_t perBucket = sizeof(RdbBucket*) +
|
|
sizeof(RdbBucket)
|
|
+ BUCKET_SIZE * m_recSize;
|
|
int32_t overhead = m_sortBufSize +
|
|
BUCKET_SIZE * m_recSize + //swapbuf
|
|
sizeof(RdbBuckets); //that's us, silly
|
|
int32_t avail = m_maxMem - overhead;
|
|
|
|
m_maxBucketsCapacity = avail / perBucket;
|
|
if(m_maxBucketsCapacity <= 0) {
|
|
log("db: max memory for %s's buckets is way too small to"
|
|
" accommodate even 1 bucket, reduce bucket size(%" INT32 ") "
|
|
"or increase max mem(%" INT32 ")",
|
|
m_dbname, (int32_t)BUCKET_SIZE, m_maxMem);
|
|
char *xx = NULL; *xx = 0;
|
|
}
|
|
|
|
if(!resizeTable(INIT_SIZE)) {
|
|
g_errno = ENOMEM;
|
|
return false;
|
|
}
|
|
|
|
// log("init: Successfully initialized buckets for %s, "
|
|
// "keysize is %" INT32 ", max mem is %" INT32 ", datasize is %" INT32 "",
|
|
// m_dbname, (int32_t)m_ks, m_maxMem, m_fixedDataSize);
|
|
|
|
|
|
/*
|
|
RdbBuckets b;
|
|
b.set ( 0, // fixedDataSize,
|
|
50000000 , // maxTreeMem,
|
|
false, //own data
|
|
"tbuck", // m_treeName, // allocName
|
|
false, //data in ptrs
|
|
"tbuck",//m_dbname,
|
|
16, // m_ks,
|
|
false);
|
|
collnum_t cn = 1;
|
|
key128_t k;
|
|
k.n1 = 12;
|
|
k.n0 = 11;
|
|
b.addNode ( cn , (char *)&k, NULL, 0 );
|
|
// negate it
|
|
k.n0 = 10;
|
|
b.addNode ( cn , (char *)&k, NULL, 0 );
|
|
|
|
// try that
|
|
key128_t k1;
|
|
key128_t k2;
|
|
k1.setMin();
|
|
k2.setMax();
|
|
RdbList list;
|
|
int32_t np,nn;
|
|
b.getList ( cn,(char *)&k1,(char *)&k2,1000,&list,&np,&nn,false);
|
|
if ( np != 0 || nn != 1 ) { char *xx=NULL;*xx=0; }
|
|
// must be empty
|
|
if ( b.getNumKeys() != 0 ) { char *xx=NULL;*xx=0; }
|
|
*/
|
|
|
|
return true;
|
|
}
|
|
|
|
|
|
// Destructor: reset() frees the master allocation and clears all state.
RdbBuckets::~RdbBuckets( ) {
	this->reset();
}
|
|
|
|
|
|
void RdbBuckets::setNeedsSave(bool s) {
|
|
m_needsSave = s;
|
|
}
|
|
|
|
|
|
void RdbBuckets::reset() {
|
|
for(int32_t j = 0; j < m_numBuckets; j++) {
|
|
m_buckets[j]->reset();
|
|
}
|
|
if(m_masterPtr) mfree(m_masterPtr, m_masterSize, m_allocName );
|
|
m_masterPtr = NULL;
|
|
m_buckets = NULL;
|
|
m_bucketsSpace = NULL;
|
|
m_numBuckets = 0;
|
|
m_maxBuckets = 0;
|
|
m_dataMemOccupied = 0;
|
|
m_firstOpenSlot = 0;
|
|
m_numKeysApprox = 0;
|
|
m_numNegKeys = 0;
|
|
m_sortBuf = NULL;
|
|
m_swapBuf = NULL;
|
|
}
|
|
|
|
|
|
void RdbBuckets::clear() {
|
|
for(int32_t j = 0; j < m_numBuckets; j++) {
|
|
m_buckets[j]->reset();
|
|
}
|
|
m_numBuckets = 0;
|
|
m_firstOpenSlot = 0;
|
|
m_dataMemOccupied = 0;
|
|
m_numKeysApprox = 0;
|
|
m_numNegKeys = 0;
|
|
m_needsSave = true;
|
|
}
|
|
|
|
|
|
|
|
|
|
// Hand out an unused bucket object from m_bucketsSpace.
// When only one pointer slot is left, the table is first doubled via
// resizeTable(); NULL is returned if that fails. Normally the next
// never-used slot is returned. If slots have been handed out beyond
// m_numBuckets, the first bucket with no keys among the first m_numBuckets
// is reused instead, falling back to slot m_numBuckets if none is empty.
RdbBucket* RdbBuckets::bucketFactory() {
	if(m_numBuckets == m_maxBuckets - 1 && !resizeTable(m_maxBuckets * 2))
		return NULL;

	if(m_firstOpenSlot <= m_numBuckets)
		return &m_bucketsSpace[m_firstOpenSlot++];

	int32_t slot = 0;
	while(slot < m_numBuckets && m_bucketsSpace[slot].getNumKeys() != 0)
		slot++;
	return &m_bucketsSpace[slot];
}
|
|
|
|
|
|
|
|
|
|
bool RdbBuckets::resizeTable(int32_t numNeeded) {
|
|
if(numNeeded == m_maxBuckets) return true;
|
|
|
|
if(numNeeded < INIT_SIZE) numNeeded = INIT_SIZE;
|
|
|
|
if(numNeeded > m_maxBucketsCapacity) {
|
|
if(m_maxBucketsCapacity <= m_maxBuckets) {
|
|
log(LOG_INFO,
|
|
"db: could not resize buckets currently have %" INT32 " "
|
|
"buckets, asked for %" INT32 ", max number of buckets"
|
|
" for %" INT32 " bytes with keysize %" INT32 " is %" INT32 "",
|
|
m_maxBuckets, numNeeded, m_maxMem, (int32_t)m_ks,
|
|
m_maxBucketsCapacity);
|
|
g_errno = ENOMEM;
|
|
return false;
|
|
}
|
|
// log(LOG_INFO,
|
|
// "db: scaling down request for buckets. "
|
|
// "Currently have %" INT32 " "
|
|
// "buckets, asked for %" INT32 ", max number of buckets"
|
|
// " for %" INT32 " bytes is %" INT32 ".",
|
|
// m_maxBuckets, numNeeded, m_maxMem, m_maxBucketsCapacity);
|
|
|
|
numNeeded = m_maxBucketsCapacity;
|
|
}
|
|
|
|
int32_t perBucket = sizeof(RdbBucket*) +
|
|
sizeof(RdbBucket)
|
|
+ BUCKET_SIZE * m_recSize;
|
|
|
|
int32_t tmpMaxBuckets = numNeeded;
|
|
int32_t newMasterSize = tmpMaxBuckets * perBucket +
|
|
(BUCKET_SIZE * m_recSize) + /*swap buf*/
|
|
m_sortBufSize; /*sort buf*/
|
|
|
|
if(newMasterSize > m_maxMem) {
|
|
log(LOG_WARN,
|
|
"db: Buckets oops, trying to malloc more(%" INT32 ") that max "
|
|
"mem(%" INT32 "), should've caught this earlier.",
|
|
newMasterSize, m_maxMem);
|
|
char* xx = NULL; *xx = 0;
|
|
}
|
|
|
|
char *tmpMasterPtr = (char*)mmalloc(newMasterSize, m_allocName);
|
|
if(!tmpMasterPtr) {
|
|
g_errno = ENOMEM;
|
|
return false;
|
|
}
|
|
char* p = tmpMasterPtr;
|
|
char* bucketMemPtr = p;
|
|
p += (BUCKET_SIZE * m_recSize) * tmpMaxBuckets;
|
|
m_swapBuf = p;
|
|
p += (BUCKET_SIZE * m_recSize);
|
|
m_sortBuf = p;
|
|
p += m_sortBufSize;
|
|
|
|
RdbBucket** tmpBucketPtrs = (RdbBucket**)p;
|
|
p += tmpMaxBuckets * sizeof(RdbBucket*);
|
|
RdbBucket* tmpBucketSpace = (RdbBucket*)p;
|
|
p += tmpMaxBuckets * sizeof(RdbBucket);
|
|
if(p - tmpMasterPtr != newMasterSize) {char* xx = NULL; *xx = 0;}
|
|
|
|
for(int32_t i = 0; i < m_numBuckets; i++) {
|
|
//copy them over one at a time so they
|
|
//will now be contiguous and consistent
|
|
//with the ptrs array.
|
|
tmpBucketPtrs[i] = &tmpBucketSpace[i];
|
|
gbmemcpy(&tmpBucketSpace[i],
|
|
m_buckets[i],
|
|
sizeof(RdbBucket));
|
|
tmpBucketSpace[i].reBuf(bucketMemPtr);
|
|
bucketMemPtr += (BUCKET_SIZE * m_recSize);
|
|
}
|
|
//now do the rest
|
|
for(int32_t i = m_numBuckets; i < tmpMaxBuckets; i++) {
|
|
tmpBucketSpace[i].set(this, bucketMemPtr);
|
|
bucketMemPtr += (BUCKET_SIZE * m_recSize);
|
|
}
|
|
if(bucketMemPtr != m_swapBuf) {char* xx = NULL; *xx = 0;}
|
|
|
|
// log(LOG_WARN, "new size = %" INT32 ", old size = %" INT32 ", newMemUsed = %" INT32 " "
|
|
// "oldMemUsed = %" INT32 "",
|
|
// numNeeded, m_maxBuckets, newMasterSize, m_masterSize);
|
|
|
|
if(m_masterPtr) mfree(m_masterPtr, m_masterSize, m_allocName);
|
|
m_masterPtr = tmpMasterPtr;
|
|
m_masterSize = newMasterSize;
|
|
m_buckets = tmpBucketPtrs;
|
|
m_bucketsSpace = tmpBucketSpace;
|
|
m_maxBuckets = tmpMaxBuckets;
|
|
m_firstOpenSlot = m_numBuckets;
|
|
return true;
|
|
}
|
|
|
|
|
|
|
|
|
|
// Add one record (key, optional data pointer, data size) for |collnum|.
// Returns 0 on success and -1 on failure with g_errno set:
//   EAGAIN when the table is not writable or is being saved,
//   ENOMEM when no bucket could be obtained.
// Fixes:
//   . the EAGAIN path returned `false` (== 0, the success value);
//   . the new-bucket branch NULL-checked m_buckets[i] instead of newBucket;
//   . bucketFactory() may reallocate m_buckets, so its result is stored in
//     a local before indexing m_buckets (`m_buckets[i] = bucketFactory()`
//     is unsequenced before C++17).
int32_t RdbBuckets::addNode (collnum_t collnum,
			     char *key,
			     char *data , int32_t dataSize ) {
	if(!m_isWritable || m_isSaving ) {
		g_errno = EAGAIN;
		return -1;
	}

	m_needsSave = true;

	int32_t i = getBucketNum(key, collnum);
	if(i == m_numBuckets ||
	   m_buckets[i]->getCollnum() != collnum) {
		int32_t bucketsCutoff = (BUCKET_SIZE>>1);
		//when repairing the keys are added in order,
		//so fill them up all of the way before moving
		//on to the next one.
		if(m_repairMode) bucketsCutoff = BUCKET_SIZE;

		if(i != 0 &&
		   m_buckets[i-1]->getCollnum() == collnum &&
		   m_buckets[i-1]->getNumKeys() < bucketsCutoff) {
			i--;
		}
		else {
			RdbBucket* newBucket = bucketFactory();
			if(newBucket == NULL) {
				g_errno = ENOMEM;
				return -1;
			}
			newBucket->setCollnum(collnum);
			if(i == m_numBuckets) {
				m_buckets[i] = newBucket;
				m_numBuckets++;
			}
			else { //m_buckets[i]->getCollnum() != collnum
				addBucket(newBucket, i);
			}
		}
	}

	//check if we are full
	if(m_buckets[i]->getNumKeys() == BUCKET_SIZE) {
		//split the bucket
		int64_t t = gettimeofdayInMilliseconds();
		m_buckets[i]->sort();
		RdbBucket* newBucket = bucketFactory();
		if(newBucket == NULL ) {
			g_errno = ENOMEM;
			return -1;
		}
		newBucket->setCollnum(collnum);
		m_buckets[i]->split(newBucket);
		addBucket(newBucket, i+1);
		if(bucketCmp(key, collnum, m_buckets[i]) > 0) i++;

		int64_t took = gettimeofdayInMilliseconds() - t;
		if(took > 10) log(LOG_WARN,
				  "db: split bucket in %" INT64 " ms for %s",took,
				  m_dbname);
	}

	m_buckets[i]->addKey(key, data, dataSize);
	return 0;
}
|
|
|
|
|
|
// Insert |newBucket| at position |i| of the bucket pointer table, shifting
// the pointers at [i, m_numBuckets) up by one. Always returns true.
// The shift count is computed from the count BEFORE incrementing.
// Previously it was computed after, so one slot past the last live entry
// was copied as well; the element size also named the wrong type
// (RdbBuckets* instead of RdbBucket*).
bool RdbBuckets::addBucket (RdbBucket* newBucket, int32_t i) {
	int32_t moveSize = (m_numBuckets - i) * (int32_t)sizeof(RdbBucket*);
	if(moveSize > 0)
		memmove(&m_buckets[i+1], &m_buckets[i], moveSize);
	m_buckets[i] = newBucket;
	m_numBuckets++;
	return true;
}
|
|
|
|
// void RdbBuckets::deleteBucket ( int32_t i ) {
|
|
// int32_t moveSize = (m_numBuckets - i)*sizeof(RdbBuckets*);
|
|
// if(moveSize > 0)
|
|
// memmove(&m_buckets[i+1], &m_buckets[i], moveSize);
|
|
// m_numBuckets--;
|
|
// }
|
|
|
|
// Fill |list| with this collection's records in [startKey, endKey].
// Reading stops once at least minRecSizes bytes are gathered (a negative
// value means no limit; 0 returns an empty list). The positive and negative
// record counts are accumulated in *numPosRecs / *numNegRecs when those are
// given. The list must own its data. Returns false on failure (the log()
// return value when growing the list or the ownership check fails).
bool RdbBuckets::getList ( collnum_t collnum ,
			   char *startKey, char *endKey, int32_t minRecSizes ,
			   RdbList *list , int32_t *numPosRecs ,
			   int32_t *numNegRecs,
			   bool useHalfKeys ) {
	if ( numNegRecs ) *numNegRecs = 0;
	if ( numPosRecs ) *numPosRecs = 0;

	// start from a clean list; m_ks must be set before set(start, end)
	list->reset();
	list->m_ks = m_ks;
	list->set ( startKey , endKey );
	list->setFixedDataSize ( m_fixedDataSize );
	list->setUseHalfKeys ( useHalfKeys );
	if ( ! list->getOwnData() ) {
		g_errno = EBADENGINEER;
		return log(LOG_LOGIC,"db: rdbbuckets: getList: List does not "
			   "own data");
	}

	if ( minRecSizes == 0 ) return true;
	if ( minRecSizes < 0 ) minRecSizes = 0x7fffffff;//LONG_MAX;

	// first bucket that may hold startKey
	int32_t first = getBucketNum(startKey, collnum);
	if(first > 0 &&
	   bucketCmp(startKey, collnum, m_buckets[first-1]) < 0)
		first--;
	// startKey is past our last bucket for this collection
	if(first == m_numBuckets ||
	   m_buckets[first]->getCollnum() != collnum) return true;

	// last bucket that may hold endKey
	int32_t last = first;
	if(bucketCmp(endKey, collnum, m_buckets[first]) > 0)
		last = getBucketNum(endKey, collnum);
	if(last == m_numBuckets ||
	   m_buckets[last]->getCollnum() != collnum) last--;
	if(m_buckets[last]->getCollnum() != collnum) {
		char* xx = NULL; *xx = 0;
	}

	// reserve space up front; an upper bound capped near minRecSizes
	int32_t reserve = 0;
	for(int32_t b = first; b <= last; b++)
		reserve += m_buckets[b]->getNumKeys() * m_recSize;
	if(reserve > minRecSizes) reserve = minRecSizes + m_recSize;
	if(!list->growList(reserve))
		return log("db: Failed to grow list to %" INT32 " bytes for storing "
			   "records from buckets: %s.",
			   reserve, mstrerror(g_errno));

	if(first == last)
		return m_buckets[first]->getList(list, startKey, endKey,
						 minRecSizes, numPosRecs,
						 numNegRecs, useHalfKeys);

	// only the first bucket needs the start key and only the final one
	// the end key, so buckets in between skip the key searches
	if(!m_buckets[first]->getList(list, startKey, NULL, minRecSizes,
				      numPosRecs, numNegRecs, useHalfKeys))
		return false;

	int32_t b = first + 1;
	while(b < last && list->getListSize() < minRecSizes) {
		if(!m_buckets[b]->getList(list, NULL, NULL, minRecSizes,
					  numPosRecs, numNegRecs, useHalfKeys))
			return false;
		b++;
	}

	if(list->getListSize() < minRecSizes &&
	   !m_buckets[b]->getList(list, NULL, endKey, minRecSizes,
				  numPosRecs, numNegRecs, useHalfKeys))
		return false;

	return true;
}
|
|
|
|
|
|
// Exact number of bytes getList() would produce for this collection's
// records in [startKey, endKey] with no size limit: the sum of each
// overlapping bucket's getListSizeExact(). Returns 0 when no bucket of the
// collection lies at or after startKey.
int RdbBuckets::getListSizeExact ( collnum_t collnum ,
				   char *startKey,
				   char *endKey ) {
	int32_t first = getBucketNum(startKey, collnum);
	if(first > 0 &&
	   bucketCmp(startKey, collnum, m_buckets[first-1]) < 0)
		first--;
	if(first == m_numBuckets ||
	   m_buckets[first]->getCollnum() != collnum)
		return 0;

	int32_t last = first;
	if(bucketCmp(endKey, collnum, m_buckets[first]) > 0)
		last = getBucketNum(endKey, collnum);
	if(last == m_numBuckets ||
	   m_buckets[last]->getCollnum() != collnum) last--;
	if(m_buckets[last]->getCollnum() != collnum) {
		char* xx = NULL; *xx = 0; }

	int total = 0;
	for(int32_t b = first; b <= last; ++b)
		total += m_buckets[b]->getListSizeExact(startKey,endKey);
	return total;
}
|
|
|
|
bool RdbBuckets::testAndRepair() {
|
|
if(!selfTest(true/*thorough*/,
|
|
false/*core on error*/)) {
|
|
if(!repair()) return false;
|
|
m_needsSave = true;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
|
|
bool RdbBuckets::repair() {
|
|
if(m_numBuckets == 0 &&
|
|
(m_numKeysApprox != 0 || m_numNegKeys != 0)) {
|
|
m_numKeysApprox = 0;
|
|
m_numNegKeys = 0;
|
|
log("db: RdbBuckets repaired approx key count to reflect "
|
|
"true number of keys.");
|
|
}
|
|
|
|
//int32_t tmpMaxBuckets = m_maxBuckets;
|
|
int32_t tmpMasterSize = m_masterSize;
|
|
char *tmpMasterPtr = m_masterPtr;
|
|
RdbBucket **tmpBucketPtrs = m_buckets;
|
|
int32_t tmpNumBuckets = m_numBuckets;
|
|
m_masterPtr = NULL;
|
|
m_masterSize = 0;
|
|
m_numBuckets = 0;
|
|
reset();
|
|
if(!resizeTable(INIT_SIZE)) {
|
|
log("db: RdbBuckets could not alloc enough memory to repair "
|
|
"corruption.");
|
|
g_errno = ENOMEM;
|
|
return false;
|
|
}
|
|
|
|
m_repairMode = true;
|
|
|
|
for(int32_t j = 0; j < tmpNumBuckets; j++) {
|
|
collnum_t collnum = tmpBucketPtrs[j]->getCollnum();
|
|
for(int32_t i = 0; i < tmpBucketPtrs[j]->getNumKeys(); i++) {
|
|
char* currRec = tmpBucketPtrs[j]->getKeys() +
|
|
m_recSize * i;
|
|
char* data = NULL;
|
|
int32_t dataSize = m_fixedDataSize;
|
|
|
|
if(m_fixedDataSize != 0) {
|
|
data = currRec + m_ks;
|
|
if(m_fixedDataSize == -1)
|
|
dataSize = *(int32_t*)(data + sizeof(char*));
|
|
}
|
|
if(addNode(collnum, currRec, data, dataSize) < 0) {
|
|
log(LOG_WARN, "db: got unrepairable error in "
|
|
"RdbBuckets, could not re-add data");
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
|
|
m_repairMode = false;
|
|
|
|
if(tmpMasterPtr) mfree(tmpMasterPtr, tmpMasterSize, m_allocName);
|
|
|
|
log("db: RdbBuckets repair for %" INT32 " keys complete", m_numKeysApprox);
|
|
return true;
|
|
}
|
|
|
|
|
|
|
|
bool RdbBuckets::selfTest(bool thorough, bool core) {
|
|
if(m_numBuckets == 0 && m_numKeysApprox != 0) return false;
|
|
int32_t totalNumKeys = 0;
|
|
char* last = NULL;
|
|
collnum_t lastcoll = -1;
|
|
int32_t numColls = 0;
|
|
|
|
for(int32_t i = 0; i < m_numBuckets; i++) {
|
|
RdbBucket* b = m_buckets[i];
|
|
if(lastcoll != b->getCollnum()) {
|
|
last = NULL;
|
|
numColls++;
|
|
}
|
|
if(thorough) {
|
|
if(!b->selfTest (last)) {
|
|
if(!core) return false;
|
|
char* xx = NULL; *xx = 0;
|
|
}
|
|
}
|
|
|
|
|
|
totalNumKeys += b->getNumKeys();
|
|
char* kk = b->getEndKey();
|
|
//log(LOG_WARN, "rdbbuckets last key: ""%016" XINT64 "%08" XINT32 " "
|
|
//"num keys: %" INT32 "",
|
|
//*(int64_t*)(kk+(sizeof(int32_t))),*(int32_t*)kk,b->getNumKeys());
|
|
if(i > 0 &&
|
|
lastcoll == b->getCollnum() &&
|
|
KEYCMPNEGEQ(last, kk,m_ks) >= 0) {
|
|
log(LOG_WARN, "rdbbuckets last key: "
|
|
"%016" XINT64 "%08" XINT32 " num keys: %" INT32 "",
|
|
*(int64_t*)(kk+(sizeof(int32_t))),
|
|
*(int32_t*)kk, b->getNumKeys());
|
|
log(LOG_WARN, "rdbbuckets last key was out "
|
|
"of order!!!!!");
|
|
if(!core) return false;
|
|
char* xx = NULL; *xx = 0;
|
|
}
|
|
last = kk;
|
|
lastcoll = b->getCollnum();
|
|
}
|
|
if ( totalNumKeys != m_numKeysApprox )
|
|
log(LOG_WARN, "db have %" INT32 " keys, should have %" INT32 ". "
|
|
"%" INT32 " buckets in %" INT32 " colls for db %s",
|
|
totalNumKeys, m_numKeysApprox, m_numBuckets,
|
|
numColls, m_dbname);
|
|
|
|
if(thorough && totalNumKeys != m_numKeysApprox) {
|
|
return false;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
|
|
char RdbBuckets::bucketCmp(char *akey, collnum_t acoll,
|
|
char *bkey, collnum_t bcoll) {
|
|
if (acoll == bcoll) return KEYCMPNEGEQ(akey, bkey, m_ks);
|
|
if (acoll < bcoll) return -1;
|
|
return 1;
|
|
}
|
|
|
|
char RdbBuckets::bucketCmp(char *akey, collnum_t acoll,
|
|
RdbBucket* b) {
|
|
if (acoll == b->getCollnum())
|
|
return KEYCMPNEGEQ(akey, b->getEndKey(), m_ks);
|
|
if (acoll < b->getCollnum()) return -1;
|
|
return 1;
|
|
}
|
|
|
|
|
|
|
|
// Returns the index of the first bucket whose (collnum, end key) is >=
// (collnum, key) as ordered by bucketCmp(), or m_numBuckets if every
// bucket sorts before it.
int32_t RdbBuckets::getBucketNum(char* key, collnum_t collnum) {
	// with only a few buckets a linear scan is cheaper than bisecting
	if ( m_numBuckets < 10 ) {
		int32_t idx = 0;
		while ( idx < m_numBuckets &&
			bucketCmp ( key , collnum , m_buckets[idx] ) > 0 )
			idx++;
		return idx;
	}

	// lower-bound binary search on the bucket end keys; when the loop
	// ends without an exact hit, "lo" is the first bucket >= key
	int32_t lo = 0;
	int32_t hi = m_numBuckets - 1;
	while ( lo <= hi ) {
		int32_t mid = lo + ( ( hi - lo ) >> 1 );
		char cmp = bucketCmp ( key , collnum , m_buckets[mid] );
		if ( cmp == 0 ) return mid;
		if ( cmp < 0 ) hi = mid - 1;
		else           lo = mid + 1;
	}
	return lo;
}
|
|
|
|
|
|
// True if some bucket belongs to collnum.  The scan stops at the first
// bucket whose collnum is >= collnum (buckets are walked in collnum
// order).
bool RdbBuckets::collExists(collnum_t collnum) {
	for ( int32_t i = 0; i < m_numBuckets; i++ ) {
		collnum_t c = m_buckets[i]->getCollnum();
		if ( c >= collnum ) return c == collnum;
	}
	return false;
}
|
|
|
|
// Sums the key counts of all buckets belonging to collnum, stopping
// once a bucket with a larger collnum is reached.
int32_t RdbBuckets::getNumKeys(collnum_t collnum) {
	int32_t total = 0;
	for ( int32_t i = 0; i < m_numBuckets; i++ ) {
		RdbBucket *b = m_buckets[i];
		collnum_t  c = b->getCollnum();
		if ( c > collnum ) break;
		if ( c == collnum ) total += b->getNumKeys();
	}
	return total;
}
|
|
|
|
|
|
// Total number of keys over all collections, as tracked in
// m_numKeysApprox (adjusted by updateNumRecs()).
int32_t RdbBuckets::getNumKeys() {
	const int32_t total = m_numKeysApprox;
	return total;
}
|
|
|
|
|
|
// int32_t RdbBuckets::getNumNegativeKeys ( collnum_t collnum ) {
|
|
// return m_numNegKeys;
|
|
// }
|
|
|
|
|
|
// int32_t RdbBuckets::getNumPositiveKeys ( collnum_t collnum ) {
|
|
// return getNumKeys(collnum) - getNumNegativeKeys(collnum);
|
|
// }
|
|
|
|
|
|
// Number of negative (delete) keys over all collections, as tracked in
// m_numNegKeys.
int32_t RdbBuckets::getNumNegativeKeys ( ) {
	const int32_t negatives = m_numNegKeys;
	return negatives;
}
|
|
|
|
|
|
// Number of positive keys: all keys minus the negative ones.
int32_t RdbBuckets::getNumPositiveKeys ( ) {
	int32_t all = getNumKeys();
	int32_t neg = getNumNegativeKeys();
	return all - neg;
}
|
|
|
|
|
|
// Looks up "key" in collnum.  Locates the candidate bucket with
// getBucketNum() and delegates to RdbBucket::getKeyVal(); returns NULL
// if no bucket of that collection could contain the key.
char* RdbBuckets::getKeyVal ( collnum_t collnum , char *key ,
			      char **data , int32_t* dataSize ) {
	int32_t idx = getBucketNum ( key , collnum );
	bool inColl = ( idx < m_numBuckets &&
			m_buckets[idx]->getCollnum() == collnum );
	if ( ! inColl ) return NULL;
	return m_buckets[idx]->getKeyVal ( key , data , dataSize );
}
|
|
|
|
|
|
// Applies deltas to the aggregate counters: n keys, "bytes" of data
// memory and numNeg negative keys.  Callers pass negative deltas when
// records are removed (see RdbBucket::deleteList()).
void RdbBuckets::updateNumRecs(int32_t n, int32_t bytes, int32_t numNeg) {
	m_numNegKeys      += numNeg;
	m_numKeysApprox   += n;
	m_dataMemOccupied += bytes;
}
|
|
|
|
|
|
char *RdbBucket::getFirstKey() {
|
|
sort();
|
|
return m_keys;
|
|
}
|
|
|
|
// Counts the records in this bucket whose key is negative (KEYNEG).
int32_t RdbBucket::getNumNegativeKeys ( ) {
	int32_t recSize   = m_parent->getRecSize();
	int32_t negatives = 0;
	for ( int32_t i = 0; i < m_numKeys; i++ ) {
		char *rec = m_keys + ( i * recSize );
		if ( KEYNEG ( rec ) ) negatives++;
	}
	return negatives;
}
|
|
|
|
|
|
// Appends this bucket's records with keys in [startKey, endKey] to
// "list", stopping once list->getListSize() reaches minRecSizes.
// . a NULL startKey/endKey means "from the first key"/"to the last key"
// . *numPosRecs/*numNegRecs, when non-NULL, are INCREMENTED (not set)
//   because the caller may accumulate over several buckets
// . when minRecSizes is reached, the list's end key is reset to the
//   last key added (bumped past a negative key, half-key/posdb bits set)
// . returns false (the value of log()) if list->addRecord() fails
bool RdbBucket::getList(RdbList* list,
			char* startKey,
			char* endKey,
			int32_t minRecSizes,
			int32_t *numPosRecs,
			int32_t *numNegRecs,
			bool useHalfKeys) {
	// the binary searches below require m_keys in sorted order
	sort();

	//get our bounds within the bucket:
	uint8_t ks = m_parent->getKeySize();
	int32_t recSize = m_parent->getRecSize();
	int32_t start = 0;
	int32_t end = m_numKeys - 1;
	char v;
	char* kk = NULL;

	// start = index of the first key >= startKey
	if ( startKey ) {
		int32_t low = 0;
		int32_t high = m_numKeys - 1;
		while ( low <= high ) {
			int32_t delta = high - low;
			start = low + (delta >> 1);
			kk = m_keys + (recSize * start);
			v = KEYCMP(startKey,kk,ks);
			if ( v < 0 ) {
				high = start - 1;
				continue;
			}
			else if ( v > 0 ) {
				low = start + 1;
				continue;
			}
			else break;
		}
		// now move forward s.t. startKey is <= the key at start
		while ( start < m_numKeys ) {
			kk = m_keys + (recSize * start);
			v = KEYCMP(startKey, kk, ks);
			if ( v > 0 ) start++;
			else break;
		}
	}
	else start = 0;

	// end = index of the last key <= endKey (never moved below 0)
	if ( endKey ) {
		int32_t low = start;
		int32_t high = m_numKeys - 1;
		while ( low <= high ) {
			int32_t delta = high - low;
			end = low + (delta >> 1);
			kk = m_keys + (recSize * end);
			v = KEYCMP(endKey,kk,ks);
			if ( v < 0 ) {
				high = end - 1;
				continue;
			}
			else if ( v > 0 ) {
				low = end + 1;
				continue;
			}
			else break;
		}
		while ( end > 0 ) {
			kk = m_keys + (recSize * end);
			v = KEYCMP(endKey, kk, ks);
			if ( v < 0 ) end--;
			else break;
		}
	}
	else end = m_numKeys - 1;

	//keep track of our negative a positive recs
	int32_t numNeg = 0;
	int32_t numPos = 0;

	int32_t fixedDataSize = m_parent->getFixedDataSize();

	char* currKey = m_keys + (start * recSize);

	//bail now if there is only one key and it is out of range.
	if ( start == end &&
	     ((startKey && KEYCMP(currKey, startKey, ks) < 0) ||
	      (endKey && KEYCMP(currKey, endKey, ks) > 0))) {
		return true;
	}

	char* lastKey = NULL;
	for ( int32_t i = start;
	      i <= end && list->getListSize() < minRecSizes;
	      i++, currKey += recSize) {
		if ( fixedDataSize == 0 ) {
			if ( ! list->addRecord(currKey, 0, NULL))
				return log("db: Failed to add record "
					   "to list for %s: %s. "
					   "Fix the growList algo.",
					   m_parent->getDbname(),
					   mstrerror(g_errno));
		}
		else {
			// NOTE(review): the record stores sizeof(char*) bytes
			// at currKey+ks followed by the data size; the address
			// of that slot (not its contents) is passed as the data
			// -- confirm this matches how addNode() stores records
			int32_t dataSize = fixedDataSize;
			if ( fixedDataSize == -1 )
				dataSize = *(int32_t*)(currKey +
						       ks + sizeof(char*));
			if ( ! list->addRecord ( currKey ,
						 dataSize,
						 currKey + ks) )
				return log("db: Failed to add record "
					   "to list for %s: %s. "
					   "Fix the growList algo.",
					   m_parent->getDbname(),
					   mstrerror(g_errno));
		}
		if ( KEYNEG(currKey) ) numNeg++;
		else                   numPos++;
		lastKey = currKey;

#ifdef GBSANITYCHECK
		//sanity, remove for production
		// . either bound may be NULL here, so log KEYMIN()/KEYMAX()
		//   in its place rather than dereferencing NULL
		char *sk = startKey ? startKey : KEYMIN();
		char *ek = endKey ? endKey : KEYMAX();
		if ( startKey && KEYCMP(currKey, startKey, ks) < 0 ) {
			log("db: key is outside the "
			    "keyrange given for getList."
			    " it is < startkey."
			    " %016" XINT64 "%08" XINT32 " %016" XINT64 "%08" XINT32 "."
			    " getting keys %" INT32 " to %" INT32 " for list"
			    "bounded by %016" XINT64 "%08" XINT32 " %016" XINT64 "%08" XINT32 "",
			    *(int64_t*)(sk+(sizeof(int32_t))),
			    *(int32_t*)sk,
			    *(int64_t*)(currKey+(sizeof(int32_t))),
			    *(int32_t*)currKey,
			    start, end,
			    *(int64_t*)(sk+(sizeof(int32_t))),
			    *(int32_t*)sk,
			    *(int64_t*)(ek+(sizeof(int32_t))),
			    *(int32_t*)ek);
			printBucket();
			char* xx=NULL; *xx=0;
		}
		if ( endKey && KEYCMP(currKey, endKey, ks) > 0 ) {
			log("db: key is outside the "
			    "keyrange given for getList."
			    " it is > endkey"
			    " %016" XINT64 "%08" XINT32 " %016" XINT64 "%08" XINT32 "."
			    " getting keys %" INT32 " to %" INT32 " for list"
			    "bounded by %016" XINT64 "%08" XINT32 " %016" XINT64 "%08" XINT32 "",
			    *(int64_t*)(currKey+(sizeof(int32_t))),
			    *(int32_t*)currKey,
			    *(int64_t*)(ek+(sizeof(int32_t))),
			    *(int32_t*)ek,
			    start, end,
			    *(int64_t*)(sk+(sizeof(int32_t))),
			    *(int32_t*)sk,
			    *(int64_t*)(ek+(sizeof(int32_t))),
			    *(int32_t*)ek);
			printBucket();
			char* xx=NULL; *xx=0;
		}
#endif
	}

	// set counts to pass back, we may be accumulating over multiple
	// buckets so add it to the count
	if ( numNegRecs ) *numNegRecs += numNeg;
	if ( numPosRecs ) *numPosRecs += numPos;

	//if we don't have an end key, we were not the last bucket, so don't
	//finalize the list... yes do, because we might've hit min rec sizes
	if ( endKey == NULL && list->getListSize() < minRecSizes ) return true;

	if ( lastKey != NULL ) list->setLastKey ( lastKey );

	// reset the list's endKey if we hit the minRecSizes barrier cuz
	// there may be more records before endKey than we put in "list"
	if ( list->getListSize() >= minRecSizes && lastKey != NULL ) {
		// use the last key we read as the new endKey
		char newEndKey[MAX_KEY_BYTES];
		KEYSET(newEndKey, lastKey, ks);
		// . if he's negative, boost new endKey by 1 because endKey's
		//   aren't allowed to be negative
		// . we're assured there's no positive counterpart to him
		//   since Rdb::addRecord() doesn't allow both to exist in
		//   the tree at the same time
		// . if by some chance his positive counterpart is in the
		//   tree, then it's ok because we'd annihilate him anyway,
		//   so we might as well ignore him
		// we are little endian
		if ( KEYNEG(newEndKey,0,ks) ) KEYADD(newEndKey,1,ks);
		// if we're using half keys set his half key bit
		if ( useHalfKeys ) KEYOR(newEndKey,0x02);
		if ( m_parent->m_rdbId == RDB_POSDB ||
		     m_parent->m_rdbId == RDB2_POSDB2 )
			newEndKey[0] |= 0x04;
		// tell list his new endKey now
		list->setEndKey ( newEndKey );
	}
	// reset list ptr to point to first record
	list->resetListPtr();

	// success
	return true;
}
|
|
|
|
|
|
|
|
int RdbBucket::getListSizeExact (char* startKey, char* endKey ) {
|
|
|
|
int32_t numRecs = 0;
|
|
|
|
sort();
|
|
//get our bounds within the bucket:
|
|
uint8_t ks = m_parent->getKeySize();
|
|
int32_t recSize = m_parent->getRecSize();
|
|
int32_t start = 0;
|
|
int32_t end = m_numKeys - 1;
|
|
char v;
|
|
char* kk = NULL;
|
|
|
|
int32_t low = 0;
|
|
int32_t high = m_numKeys - 1;
|
|
while(low <= high) {
|
|
int32_t delta = high - low;
|
|
start = low + (delta >> 1);
|
|
kk = m_keys + (recSize * start);
|
|
v = KEYCMP(startKey,kk,ks);
|
|
if(v < 0) {
|
|
high = start - 1;
|
|
continue;
|
|
}
|
|
else if(v > 0) {
|
|
low = start + 1;
|
|
continue;
|
|
}
|
|
else break;
|
|
|
|
}
|
|
//now back up or move forward s.t. startKey
|
|
//is <= start
|
|
while(start < m_numKeys) {
|
|
kk = m_keys + (recSize * start);
|
|
v = KEYCMP(startKey, kk, ks);
|
|
if(v > 0) start++;
|
|
else break;
|
|
}
|
|
|
|
|
|
|
|
|
|
low = start;
|
|
high = m_numKeys - 1;
|
|
while(low <= high) {
|
|
int32_t delta = high - low;
|
|
end = low + (delta >> 1);
|
|
kk = m_keys + (recSize * end);
|
|
v = KEYCMP(endKey,kk,ks);
|
|
if(v < 0) {
|
|
high = end - 1;
|
|
continue;
|
|
}
|
|
else if(v > 0) {
|
|
low = end + 1;
|
|
continue;
|
|
}
|
|
else break;
|
|
|
|
}
|
|
while(end > 0) {
|
|
kk = m_keys + (recSize * end);
|
|
v = KEYCMP(endKey, kk, ks);
|
|
if(v < 0) end--;
|
|
else break;
|
|
}
|
|
|
|
|
|
//keep track of our negative a positive recs
|
|
//int32_t numNeg = 0;
|
|
//int32_t numPos = 0;
|
|
|
|
int32_t fixedDataSize = m_parent->getFixedDataSize();
|
|
|
|
char* currKey = m_keys + (start * recSize);
|
|
|
|
//bail now if there is only one key and it is out of range.
|
|
if(start == end &&
|
|
((startKey && KEYCMP(currKey, startKey, ks) < 0) ||
|
|
(endKey && KEYCMP(currKey, endKey, ks) > 0))) {
|
|
return 0;
|
|
}
|
|
|
|
// MDW: are none negatives?
|
|
if ( fixedDataSize == 0 ) {
|
|
numRecs = (end - start) * ks;
|
|
return numRecs;
|
|
}
|
|
|
|
char* lastKey = NULL;
|
|
for(int32_t i = start;
|
|
i <= end ; //&& list->getListSize() < minRecSizes;
|
|
i++, currKey += recSize) {
|
|
// if ( fixedDataSize == 0 ) {
|
|
// numRecs++;
|
|
// }
|
|
// else {
|
|
int32_t dataSize = fixedDataSize;
|
|
if ( fixedDataSize == -1 )
|
|
dataSize = *(int32_t*)(currKey+ks+sizeof(char*));
|
|
numRecs++;
|
|
//}
|
|
lastKey = currKey;
|
|
}
|
|
|
|
// success
|
|
return numRecs * ks ;
|
|
}
|
|
|
|
|
|
// Removes every key in "list" from the buckets of collnum.
// . returns false with g_errno = EAGAIN when the buckets are not
//   writable or a save is in progress; true otherwise
// . buckets that become empty are reset() and dropped from m_buckets,
//   keeping the remaining buckets in order
bool RdbBuckets::deleteList(collnum_t collnum, RdbList *list) {
	if ( list->getListSize() == 0 ) return true;

	if ( ! m_isWritable || m_isSaving ) {
		g_errno = EAGAIN;
		return false;
	}

	// flag this right away because the head bucket needs to know if we
	// need to save
	m_needsSave = true;

	char startKey [ MAX_KEY_BYTES ];
	char endKey   [ MAX_KEY_BYTES ];
	list->getStartKey ( startKey );
	list->getEndKey   ( endKey );

	int32_t first = getBucketNum ( startKey , collnum );
	if ( first > 0 &&
	     bucketCmp ( startKey , collnum , m_buckets[first - 1] ) < 0 )
		first--;

	// a startKey past this collection's buckets means nothing to delete
	if ( first == m_numBuckets ||
	     m_buckets[first]->getCollnum() != collnum )
		return true;

	int32_t last = getBucketNum ( endKey , collnum );
	if ( last == m_numBuckets ||
	     m_buckets[last]->getCollnum() != collnum )
		last--;

	list->resetListPtr();
	for ( int32_t i = first; i <= last; i++ ) {
		if ( list->isExhausted() ) break;
		if ( m_buckets[i]->deleteList ( list ) ) continue;
		// the bucket emptied itself: release it and leave a hole
		m_buckets[i]->reset();
		m_buckets[i] = NULL;
	}

	// close the holes left by emptied buckets
	int32_t kept = 0;
	for ( int32_t i = 0; i < m_numBuckets; i++ ) {
		if ( m_buckets[i] == NULL ) continue;
		m_buckets[kept++] = m_buckets[i];
	}
	m_numBuckets = kept;

	// did we delete the whole darn thing?
	if ( m_numBuckets == 0 ) {
		if ( m_numKeysApprox != 0 ) {
			log("db: bucket's number of keys is getting off by %" INT32 ""
			    " after deleting a list", m_numKeysApprox);
			char *xx = NULL; *xx = 0;
		}
		m_firstOpenSlot = 0;
	}
	return true;
}
|
|
|
|
|
|
// Removes from this bucket every record whose key equals a key of
// "list", walking both in sorted order starting at the list's current
// record.  Survivors are compacted toward the front of m_keys.
// . the parent's counters are adjusted via updateNumRecs()
// . returns true if records remain, false if the bucket is now empty
//   (the parent then resets and drops it)
bool RdbBucket::deleteList(RdbList *list) {
	sort();

	uint8_t ks = m_parent->getKeySize();
	int32_t recSize = m_parent->getRecSize();
	int32_t fixedDataSize = m_parent->getFixedDataSize();
	char v;

	char *currKey = m_keys;   // read cursor
	char *p = currKey;        // write cursor, never ahead of currKey
	char listkey[MAX_KEY_BYTES];

	char *lastKey = m_keys + (m_numKeys * recSize);
	int32_t br = 0; //data bytes removed
	int32_t dso = ks+sizeof(char*);//datasize offset
	int32_t numNeg = 0;

	list->getCurrentKey(listkey);
	while ( currKey < lastKey ) {
		v = KEYCMP(currKey, listkey, ks);
		if ( v == 0 ) {
			if ( fixedDataSize != 0 ) {
				if ( fixedDataSize == -1 )
					br += *(int32_t*)(currKey+dso);
				else br += fixedDataSize;
			}
			if ( KEYNEG(currKey) ) numNeg++;

			// . forget it exists by advancing read ptr without
			//   advancing the write ptr
			currKey += recSize;
			if ( ! list->skipCurrentRecord() ) break;
			list->getCurrentKey(listkey);
			continue;
		}
		else if ( v < 0 ) {
			// . keep this key, it was not in the delete list.
			// . p trails currKey by a whole number of records, so
			//   a single record copy never overlaps
			if ( p != currKey ) gbmemcpy(p, currKey, recSize);
			p += recSize;
			currKey += recSize;
		}
		else { //list key < current key
			// . advance the delete list until
			//   listKey is >= currKey
			if ( ! list->skipCurrentRecord() ) break;
			list->getCurrentKey(listkey);
		}
	}

	// . slide the untouched tail down into the vacated space.
	// . this range can overlap itself (p may trail currKey by a single
	//   record while the tail spans many), so memmove, not memcpy
	if ( currKey < lastKey ) {
		int32_t tmpSize = lastKey - currKey;
		if ( p != currKey ) memmove(p, currKey, tmpSize);
		p += tmpSize;
	}

	if ( p > m_keys ) { //do we have anything left?
		int32_t newNumKeys = (p - m_keys) / recSize;
		m_parent->updateNumRecs(newNumKeys - m_numKeys, - br, -numNeg);
		m_numKeys = newNumKeys;
		m_lastSorted = m_numKeys;
		m_endKey = m_keys + ((m_numKeys - 1) * recSize);
		return true;
	}

	//we deleted the entire bucket, let our parent know to free us
	m_parent->updateNumRecs( - m_numKeys, - br, -numNeg);
	return false;
}
|
|
|
|
// remove keys from any non-existent collection
|
|
void RdbBuckets::cleanBuckets ( ) {
|
|
|
|
// what buckets have -1 rdbid???
|
|
if ( m_rdbId < 0 ) return;
|
|
|
|
// the liberation count
|
|
int32_t count = 0;
|
|
|
|
/*
|
|
char buf[50000];
|
|
RdbList list;
|
|
list.set ( NULL,
|
|
0,
|
|
buf,
|
|
50000,
|
|
0, // fixeddatasize
|
|
false, // own data? should rdblist free it
|
|
false, // usehalfkeys
|
|
m_ks);
|
|
*/
|
|
|
|
top:
|
|
|
|
for ( int32_t i = 0; i < m_numBuckets; i++ ) {
|
|
RdbBucket *b = m_buckets[i];
|
|
collnum_t collnum = b->getCollnum();
|
|
CollectionRec *cr = NULL;
|
|
if ( collnum < g_collectiondb.m_numRecs )
|
|
cr = g_collectiondb.m_recs[collnum];
|
|
if ( cr ) continue;
|
|
// count # deleted
|
|
count += b->getNumKeys();
|
|
// delete that coll
|
|
delColl ( collnum );
|
|
// restart
|
|
goto top;
|
|
/*
|
|
int32_t nk = b->getNumKeys();
|
|
for (int32_t j = 0 ; j < nk ; j++ ) {
|
|
char *kp = b->m_keys + j*m_ks;
|
|
// add into list. should just be a gbmemcpy()
|
|
list.addKey ( kp , 0 , NULL );
|
|
*/
|
|
//deleteBucket ( i );
|
|
}
|
|
|
|
// print it
|
|
if ( count == 0 ) return;
|
|
log(LOG_LOGIC,"db: Removed %" INT32 " records from %s buckets "
|
|
"for invalid collection numbers.",count,m_dbname);
|
|
//log(LOG_LOGIC,"db: Records not actually removed for safety. Except "
|
|
// "for those with negative colnums.");
|
|
// static bool s_print = true;
|
|
// if ( ! s_print ) return;
|
|
// s_print = false;
|
|
// log (LOG_LOGIC,"db: This is bad. Did you remove a collection "
|
|
// "subdirectory? Don't do that, you should use the \"delete "
|
|
// "collections\" interface because it also removes records from "
|
|
// "memory, too.");
|
|
}
|
|
|
|
|
|
// Deletes every key of collnum from the buckets by repeatedly fetching
// up to minRecSizes bytes of its keys with getList() and removing them
// with deleteList().
// . on ENOMEM from getList() the fetch size is halved (down to 1KB)
// . returns false if the collection could not be fully deleted
bool RdbBuckets::delColl(collnum_t collnum) {
	m_needsSave = true;
	RdbList list;
	int32_t minRecSizes = 1024*1024;
	int32_t numPosRecs = 0;
	int32_t numNegRecs = 0;
	while (1) {
		if ( ! getList(collnum, KEYMIN(), KEYMAX(), minRecSizes ,
			       &list , &numPosRecs , &numNegRecs, false )) {
			if ( g_errno == ENOMEM && minRecSizes > 1024 ) {
				minRecSizes /= 2;
				continue;
			}
			// getList() reports through g_errno, not errno
			log("db: buckets could not delete "
			    "collection: %s.",
			    mstrerror(g_errno));
			return false;
		}
		if ( list.isEmpty() ) break;

		int32_t before = getNumKeys ( collnum );
		// . deleteList() fails with EAGAIN while saving or when not
		//   writable; getList() would then hand back the same keys
		//   forever, so give up instead of spinning
		// . same if a pass removed nothing at all
		if ( ! deleteList ( collnum , &list ) ||
		     getNumKeys ( collnum ) >= before ) {
			log("db: buckets could not delete "
			    "collection: %s.",
			    mstrerror(g_errno));
			return false;
		}
	}

	log("buckets: deleted all keys for collnum %" INT32 ,(int32_t)collnum);
	return true;
}
|
|
|
|
// Copies every node of tree "rt", in tree order, into the buckets with
// addNode().  Stops at the first addNode() failure.  Returns the number
// of nodes added.
int32_t RdbBuckets::addTree(RdbTree* rt) {
	int32_t added = 0;
	char *data = NULL;
	int32_t dataSize = m_fixedDataSize;

	for ( int32_t node = rt->getFirstNode();
	      node >= 0;
	      node = rt->getNextNode ( node ) ) {
		if ( m_fixedDataSize != 0 ) {
			data = rt->getData ( node );
			if ( m_fixedDataSize == -1 )
				dataSize = rt->getDataSize ( node );
		}
		if ( addNode ( rt->getCollnum ( node ) ,
			       rt->getKey ( node ) ,
			       data , dataSize ) < 0 )
			break;
		added++;
	}

	log("db: added %" INT32 " keys from tree to buckets for %s.",added, m_dbname);
	return added;
}
|
|
|
|
|
|
//this could be sped up a lot, but it is only called from repair at
|
|
//the moment.
|
|
bool RdbBuckets::addList(RdbList* list, collnum_t collnum) {
|
|
char listKey[MAX_KEY_BYTES];
|
|
|
|
for( list->resetListPtr();
|
|
!list->isExhausted();
|
|
list->skipCurrentRecord()) {
|
|
|
|
list->getCurrentKey(listKey);
|
|
|
|
if(addNode(collnum,
|
|
listKey ,
|
|
list->getCurrentData() ,
|
|
list->getCurrentDataSize()) < 0)
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
|
|
|
|
|
|
//return the total bytes of the list bookended by startKey and endKey
|
|
int64_t RdbBuckets::getListSize ( collnum_t collnum,
|
|
char *startKey , char *endKey ,
|
|
char *minKey , char *maxKey ) {
|
|
|
|
if ( minKey ) KEYSET ( minKey , endKey , m_ks );
|
|
if ( maxKey ) KEYSET ( maxKey , startKey , m_ks );
|
|
|
|
int32_t startBucket = getBucketNum(startKey, collnum);
|
|
if(startBucket > 0 &&
|
|
bucketCmp(startKey, collnum, m_buckets[startBucket-1]) < 0)
|
|
startBucket--;
|
|
|
|
if(startBucket == m_numBuckets ||
|
|
m_buckets[startBucket]->getCollnum() != collnum) return 0;
|
|
|
|
|
|
int32_t endBucket = getBucketNum(endKey, collnum);
|
|
|
|
// not sure if i should have added this: MDW
|
|
//if(bucketCmp(endKey, collnum, m_buckets[startBucket]) <= 0)
|
|
// endBucket = startBucket;
|
|
|
|
if(endBucket == m_numBuckets ||
|
|
m_buckets[endBucket]->getCollnum() != collnum) endBucket--;
|
|
//log(LOG_WARN, "db numBuckets %" INT32 " start %" INT32 " end %" INT32 "",
|
|
//m_numBuckets, startBucket, endBucket);
|
|
|
|
int64_t retval = 0;
|
|
for(int32_t i = startBucket; i <= endBucket; i++) {
|
|
retval += m_buckets[i]->getNumKeys();
|
|
}
|
|
return retval * m_recSize;
|
|
}
|
|
|
|
void *saveBucketsWrapper ( void *state , ThreadEntry *t ) ;
|
|
void threadDoneBucketsWrapper ( void *state , ThreadEntry *t ) ;
|
|
|
|
// . caller should call f->set() himself
|
|
// . we'll open it here
|
|
// . returns false if blocked, true otherwise
|
|
// . sets g_errno on error
|
|
bool RdbBuckets::fastSave ( char *dir ,
|
|
bool useThread ,
|
|
void *state ,
|
|
void (* callback) (void *state) ) {
|
|
if ( g_conf.m_readOnlyMode ) return true;
|
|
// we do not need a save
|
|
if ( ! m_needsSave ) return true;
|
|
// return true if already in the middle of saving
|
|
if ( m_isSaving ) return false;
|
|
|
|
// do not use thread for now!! test it to make sure that was
|
|
// not the problem
|
|
//useThread = false;
|
|
|
|
// save parms
|
|
//m_saveFile = f;
|
|
m_dir = dir;
|
|
m_state = state;
|
|
m_callback = callback;
|
|
// assume no error
|
|
m_saveErrno = 0;
|
|
// no adding to the tree now
|
|
m_isSaving = true;
|
|
|
|
/*
|
|
// skip thread call if we should
|
|
if ( ! useThread ) goto skip;
|
|
// make this a thread now
|
|
if ( g_threads.call ( SAVETREE_THREAD , // threadType
|
|
1 , // niceness
|
|
this ,
|
|
threadDoneBucketsWrapper ,
|
|
saveBucketsWrapper) ) return false;
|
|
// if it failed
|
|
if ( ! g_threads.m_disabled )
|
|
log("db: Thread creation failed. Blocking while "
|
|
"saving buckets. Hurts performance.");
|
|
skip:
|
|
*/
|
|
|
|
// . this returns false and sets g_errno on error
|
|
// . must now lock for each bucket when saving that bucket, but
|
|
// release lock to breathe between buckets
|
|
fastSave_r ();
|
|
|
|
// store save error into g_errno
|
|
g_errno = m_saveErrno;
|
|
// resume adding to the tree
|
|
m_isSaving = false;
|
|
// we do not need to be saved now?
|
|
m_needsSave = false;
|
|
// we did not block
|
|
return true;
|
|
}
|
|
|
|
void *saveBucketsWrapper ( void *state , ThreadEntry *t ) {
|
|
// get this class
|
|
RdbBuckets *THIS = (RdbBuckets *)state;
|
|
// this returns false and sets g_errno on error
|
|
THIS->fastSave_r();
|
|
// now exit the thread, bogus return
|
|
return NULL;
|
|
}
|
|
|
|
// we come here after thread exits
|
|
void threadDoneBucketsWrapper ( void *state , ThreadEntry *t ) {
|
|
// get this class
|
|
RdbBuckets *THIS = (RdbBuckets *)state;
|
|
// store save error into g_errno
|
|
g_errno = THIS->m_saveErrno;
|
|
// . resume adding to the tree
|
|
// . this will also allow other threads to be queued
|
|
// . if we did this at the end of the thread we could end up with
|
|
// an overflow of queued SAVETHREADs
|
|
THIS->m_isSaving = false;
|
|
// we do not need to be saved now?
|
|
THIS->m_needsSave = false;
|
|
// g_errno should be preserved from the thread so if fastSave_r()
|
|
// had an error it will be set
|
|
if ( g_errno )
|
|
log("db: Had error saving tree to disk for %s: %s.",
|
|
THIS->m_dbname,mstrerror(g_errno));
|
|
// . call callback
|
|
if ( THIS->m_callback ) THIS->m_callback ( THIS->m_state );
|
|
}
|
|
|
|
// . returns false and sets g_errno on error
|
|
// . NO USING g_errno IN A DAMN THREAD!!!!!!!!!!!!!!!!!!!!!!!!!
|
|
bool RdbBuckets::fastSave_r() {
|
|
if ( g_conf.m_readOnlyMode ) return true;
|
|
// recover the file
|
|
//BigFile *f = m_saveFile;
|
|
// open it up
|
|
//if ( ! f->open ( O_RDWR | O_CREAT ) )
|
|
// return log("RdbTree::fastSave_r: %s",mstrerror(g_errno));
|
|
// cannot use the BigFile class, since we may be in a thread and it
|
|
// messes with g_errno
|
|
//char *s = m_saveFile->getFilename();
|
|
char s[1024];
|
|
sprintf ( s , "%s/%s-buckets-saving.dat", m_dir , m_dbname );
|
|
int fd = ::open ( s ,
|
|
O_RDWR | O_CREAT | O_TRUNC ,
|
|
getFileCreationFlags() );
|
|
// S_IRUSR | S_IWUSR |
|
|
// S_IRGRP | S_IWGRP | S_IROTH);
|
|
if ( fd < 0 ) {
|
|
m_saveErrno = errno;
|
|
return log("db: Could not open %s for writing: %s.",
|
|
s,mstrerror(errno));
|
|
}
|
|
// clear our own errno
|
|
errno = 0;
|
|
// . save the header
|
|
// . force file head to the 0 byte in case offset was elsewhere
|
|
int64_t offset = 0;
|
|
|
|
offset = fastSaveColl_r(fd, offset);
|
|
|
|
// close it up
|
|
close ( fd );
|
|
// now fucking rename it
|
|
char s2[1024];
|
|
sprintf ( s2 , "%s/%s-buckets-saved.dat", m_dir , m_dbname );
|
|
::rename ( s , s2 ) ; //fuck yeah!
|
|
// info
|
|
log("db RdbBuckets saved %" INT32 " keys, %" INT64 " bytes for %s",
|
|
getNumKeys(), offset, m_dbname);
|
|
|
|
return offset >= 0;
|
|
}
|
|
|
|
// Writes the buckets header at "offset", then every bucket after it.
// Returns the offset just past the written data, or -1 on error with
// m_saveErrno set.  Does NOT close fd: the caller opened it and owns it.
int64_t RdbBuckets::fastSaveColl_r(int fd, int64_t offset) {
	if ( m_numKeysApprox == 0 ) return offset;

	int32_t version = SAVE_VERSION;
	int32_t bucketSize = BUCKET_SIZE;

	// header fields in on-disk order; fastLoadColl() reads exactly this
	// sequence and these widths, so do not reorder or resize them
	struct HeaderField { const void *ptr; int32_t size; };
	HeaderField fields[] = {
		{ &version           , (int32_t)sizeof(int32_t) },
		{ &m_numBuckets      , (int32_t)sizeof(int32_t) },
		{ &m_maxBuckets      , (int32_t)sizeof(int32_t) },
		{ &m_ks              , (int32_t)sizeof(uint8_t) },
		{ &m_fixedDataSize   , (int32_t)sizeof(int32_t) },
		{ &m_recSize         , (int32_t)sizeof(int32_t) },
		{ &m_numKeysApprox   , (int32_t)sizeof(int32_t) },
		{ &m_numNegKeys      , (int32_t)sizeof(int32_t) },
		{ &m_dataMemOccupied , (int32_t)sizeof(int32_t) },
		{ &bucketSize        , (int32_t)sizeof(int32_t) }
	};
	int32_t numFields = sizeof(fields) / sizeof(fields[0]);

	int32_t err = 0;
	for ( int32_t i = 0; i < numFields; i++ ) {
		ssize_t n = pwrite ( fd , fields[i].ptr , fields[i].size , offset );
		if ( n != fields[i].size ) {
			// a short write with errno still 0 is a failure too
			err = errno ? errno : EIO;
			break;
		}
		offset += fields[i].size;
	}

	// bitch on error
	if ( err ) {
		m_saveErrno = err;
		log("db: Failed to save buckets for %s: %s.",
		    m_dbname,mstrerror(err));
		return -1;
	}

	// position to store into m_keys, ...
	for ( int32_t i = 0; i < m_numBuckets; i++ ) {
		offset = m_buckets[i]->fastSave_r(fd, offset);
		// returns -1 on error
		if ( offset < 0 ) {
			m_saveErrno = errno ? errno : EIO;
			log("db: Failed to save buckets for %s: %s.",
			    m_dbname,mstrerror(m_saveErrno));
			return -1;
		}
	}
	return offset;
}
|
|
|
|
|
|
bool RdbBuckets::loadBuckets ( char* dbname) {
|
|
char filename[256];
|
|
sprintf(filename,"%s-buckets-saved.dat",dbname);
|
|
// set this to false
|
|
// msg
|
|
//log (0,"Rdb::loadTree: loading %s",filename);
|
|
// set a BigFile to this filename
|
|
BigFile file;//g_hostdb.m_dir
|
|
char *dir = g_hostdb.m_dir;
|
|
if( *dir == '\0') dir = ".";
|
|
file.set ( dir , filename , NULL );
|
|
if ( file.doesExist() <= 0 ) return true;
|
|
// load the table with file named "THISDIR/saved"
|
|
bool status = false ;
|
|
status = fastLoad ( &file , dbname ) ;
|
|
file.close();
|
|
return status;
|
|
}
|
|
|
|
// Opens "f" read-only and loads the buckets from it with
// fastLoadColl().  An empty file is treated as "nothing saved".
// Returns false on error (g_errno set by the failing call).
bool RdbBuckets::fastLoad ( BigFile *f , char* dbname) {
	// msg
	log(LOG_INIT,"db: Loading %s.",f->getFilename());
	// open it up
	if ( ! f->open ( O_RDONLY ) ) return false;
	// keep the full width so a large file size is not truncated
	int64_t fsize = f->getFileSize();
	if ( fsize == 0 ) return true;

	int64_t offset = fastLoadColl ( f , dbname , 0 );
	if ( offset < 0 ) {
		// use the dbname argument: fastLoadColl() may fail before it
		// assigns m_dbname
		log("db: Failed to load buckets for %s: %s.",
		    dbname,mstrerror(g_errno));
		return false;
	}
	return true;
}
|
|
|
|
|
|
// Reads the header written by fastSaveColl_r() at "offset", then each
// saved bucket.  Returns the offset after the last bucket, or -1 on
// error.
// . crashes deliberately if the saved BUCKET_SIZE differs from ours
int64_t RdbBuckets::fastLoadColl( BigFile *f,
				  char *dbname,
				  int64_t offset ) {
	int32_t maxBuckets;
	int32_t numBuckets;
	int32_t version;

	// set the name first so every log below can use it
	m_dbname = dbname;

	f->read ( &version,sizeof(int32_t), offset );
	offset += sizeof(int32_t);
	// a failed read leaves "version" garbage; check before trusting it
	if ( g_errno ) return -1;
	if ( version > SAVE_VERSION ) {
		log("db: Failed to load buckets for %s: "
		    "saved version is in the future or is corrupt, "
		    "please restart old executable and do a ddump.",
		    dbname);
		return -1;
	}

	f->read ( &numBuckets,sizeof(int32_t), offset );
	offset += sizeof(int32_t);

	// read but not used: the table grows as buckets are created
	f->read ( &maxBuckets,sizeof(int32_t), offset );
	offset += sizeof(int32_t);

	f->read ( &m_ks,sizeof(uint8_t), offset );
	offset += sizeof(uint8_t);

	f->read ( &m_fixedDataSize,sizeof(int32_t), offset );
	offset += sizeof(int32_t);

	f->read ( &m_recSize,sizeof(int32_t), offset );
	offset += sizeof(int32_t);

	f->read ( &m_numKeysApprox,sizeof(int32_t), offset );
	offset += sizeof(int32_t);

	f->read ( &m_numNegKeys,sizeof(int32_t), offset );
	offset += sizeof(int32_t);

	f->read ( &m_dataMemOccupied, sizeof(int32_t), offset );
	offset += sizeof(int32_t);

	int32_t bucketSize;
	f->read ( &bucketSize, sizeof(int32_t), offset );
	offset += sizeof(int32_t);

	if ( g_errno ) return -1;

	if ( bucketSize != BUCKET_SIZE ) {
		log("db: It appears you have changed the bucket size "
		    "please restart the old executable and dump "
		    "buckets to disk. old=%" INT32 " new=%" INT32 "",
		    bucketSize, (int32_t)BUCKET_SIZE);
		char *xx = NULL; *xx = 0;
	}

	for ( int32_t i = 0; i < numBuckets; i++ ) {
		m_buckets[i] = bucketFactory();
		if ( m_buckets[i] == NULL ) return -1;
		offset = m_buckets[i]->fastLoad(f, offset);
		// returns -1 on error
		if ( offset < 0 )
			return -1;
		m_numBuckets++;
	}
	return offset;
}
|
|
|
|
// max key size -- posdb, 18 bytes, so use 18 here
|
|
#define BTMP_SIZE (BUCKET_SIZE*18+1000)
|
|
|
|
int64_t RdbBucket::fastSave_r(int fd, int64_t offset) {
|
|
|
|
// first copy to a buf before saving so we can unlock!
|
|
char tmp[BTMP_SIZE];
|
|
char *p = tmp;
|
|
|
|
gbmemcpy ( p , &m_collnum, sizeof(collnum_t) );
|
|
p += sizeof(collnum_t);
|
|
//pwrite ( fd , &m_collnum, sizeof(collnum_t) , offset );
|
|
//offset += sizeof(collnum_t);
|
|
|
|
gbmemcpy ( p , &m_numKeys, sizeof(int32_t) );
|
|
p += sizeof(m_numKeys);
|
|
//pwrite ( fd , &m_numKeys, sizeof(int32_t) , offset );
|
|
//offset += sizeof(m_numKeys);
|
|
|
|
gbmemcpy ( p , &m_lastSorted, sizeof(int32_t) );
|
|
p += sizeof(m_lastSorted);
|
|
//pwrite ( fd , &m_lastSorted, sizeof(int32_t) , offset );
|
|
//offset += sizeof(m_lastSorted);
|
|
|
|
int32_t endKeyOffset = m_endKey - m_keys;
|
|
gbmemcpy ( p , &endKeyOffset, sizeof(int32_t) );
|
|
p += sizeof(int32_t);
|
|
//pwrite ( fd , &endKeyOffset, sizeof(int32_t) , offset );
|
|
//offset += sizeof(int32_t);
|
|
|
|
int32_t recSize = m_parent->getRecSize();
|
|
|
|
gbmemcpy ( p , m_keys, recSize*m_numKeys );
|
|
p += recSize*m_numKeys;
|
|
//pwrite ( fd , m_keys, recSize*m_numKeys , offset );
|
|
//offset += recSize*m_numKeys;
|
|
|
|
int32_t size = p - tmp;
|
|
if ( size > BTMP_SIZE ) {
|
|
log("buckets: btmp_size too small. keysize>18 bytes?");
|
|
char *xx=NULL;*xx=0;
|
|
}
|
|
|
|
// now we can save it without fear of being interrupted and having
|
|
// the bucket altered
|
|
errno = 0;
|
|
if ( pwrite ( fd , tmp , size , offset ) != size ) {
|
|
log("db:fastSave_r: %s.",mstrerror(errno));
|
|
return -1;
|
|
}
|
|
|
|
return offset + size;
|
|
}
|
|
|
|
// Reads one bucket written by RdbBucket::fastSave_r() at "offset".
// Returns the offset after it, or -1 on error (g_errno set).
// . the saved key count is validated before reading records so a
//   corrupt file cannot make us write past m_keys
// . NOTE(review): assumes m_keys holds up to BUCKET_SIZE records, as
//   the BTMP_SIZE sizing (BUCKET_SIZE * key size) implies -- confirm
int64_t RdbBucket::fastLoad(BigFile *f, int64_t offset) {
	f->read ( &m_collnum,sizeof(collnum_t), offset );
	offset += sizeof(collnum_t);

	f->read ( &m_numKeys,sizeof(int32_t), offset );
	offset += sizeof(int32_t);

	f->read ( &m_lastSorted,sizeof(int32_t), offset );
	offset += sizeof(int32_t);

	int32_t endKeyOffset;
	f->read ( &endKeyOffset,sizeof(int32_t), offset );
	offset += sizeof(int32_t);

	if ( g_errno ) {
		log("bucket: fastload %s",mstrerror(g_errno));
		return -1;
	}

	if ( m_numKeys < 0 || m_numKeys > BUCKET_SIZE ) {
		log("bucket: fastload bad key count %" INT32 ".", m_numKeys);
		m_numKeys = 0;
		g_errno = EIO;
		return -1;
	}

	int32_t recSize = m_parent->getRecSize();

	f->read ( m_keys,recSize*m_numKeys, offset );
	offset += recSize*m_numKeys;

	m_endKey = m_keys + endKeyOffset;
	if ( g_errno ) {
		log("bucket: fastload %s",mstrerror(g_errno));
		return -1;
	}

	return offset;
}
|