#include "RdbTree.h" #include "Collectiondb.h" #include "JobScheduler.h" #include "Mem.h" #include "RdbMem.h" #include "BigFile.h" #include "RdbList.h" #include "Spider.h" #include "Process.h" #include "Conf.h" #include "ScopedWriteLock.h" #include "ScopedReadLock.h" #include <fcntl.h> RdbTree::RdbTree () { m_collnums= NULL; m_keys = NULL; m_data = NULL; m_sizes = NULL; m_left = NULL; m_right = NULL; m_parents = NULL; m_depth = NULL; m_headNode = -1; m_numNodes = 0; m_numUsedNodes = 0; m_memAllocated = 0; m_memOccupied = 0; m_nextNode = 0; m_minUnusedNode = 0; m_fixedDataSize = -1; // variable dataSize, depends on individual node m_needsSave = false; m_pickRight = false; // PVS-Studio memset(m_dir, 0, sizeof(m_dir)); memset(m_memTag, 0, sizeof(m_memTag)); m_state = NULL; m_callback = NULL; m_ownData = false; m_overhead = 0; m_maxMem = 0; m_allocName = NULL; m_bytesWritten = 0; m_bytesRead = 0; m_errno = 0; m_ks = 0; m_corrupt = 0; // before resetting... we have to set this so clear() won't breach buffers m_rdbId = -1; reset_unlocked(); } RdbTree::~RdbTree ( ) { reset_unlocked(); } // "memMax" includes records plus the overhead bool RdbTree::set(int32_t fixedDataSize, int32_t maxNumNodes, int32_t memMax, bool ownData, const char *allocName, const char *dbname, char keySize, char rdbId) { ScopedWriteLock sl(m_rwlock); reset_unlocked(); m_fixedDataSize = fixedDataSize; m_maxMem = memMax; m_ownData = ownData; m_allocName = allocName; m_ks = keySize; m_needsSave = false; m_dbname[0] = '\0'; if ( dbname ) { int32_t dlen = strlen(dbname); if ( dlen > 30 ) dlen = 30; gbmemcpy(m_dbname,dbname,dlen); m_dbname[dlen] = '\0'; } // a malloc tag, must be LESS THAN 16 bytes including the NULL char *p = m_memTag; gbmemcpy ( p , "RdbTree" , 7 ); p += 7; if ( dbname ) strncpy ( p , dbname , 8 ); p += 8; *p++ = '\0'; // set rdbid m_rdbId = rdbId; // -1; // sanity if ( rdbId < -1 ) { g_process.shutdownAbort(true); } if ( rdbId >= RDB_END ) { g_process.shutdownAbort(true); } // adjust m_maxMem to virtual infinity if it was -1 if ( m_maxMem < 0 ) m_maxMem = 0x7fffffff; // . compute each node's memory overhead // . size of a key/left/right/parent m_overhead = (m_ks + 4*3 ); // include collection number, currently an uint16_t m_overhead += sizeof(collnum_t); // if we're a non-zero data length include a dataptr (-1 means variabl) if ( m_fixedDataSize != 0 ) m_overhead += 4; // include dataSize if our dataSize is variable (-1) if ( m_fixedDataSize == -1 ) m_overhead += 4; // if we're balanced include 1 byte per node for the depth m_overhead += 1; if( maxNumNodes == -1) { maxNumNodes = m_maxMem / m_overhead; if(maxNumNodes > 10000000) maxNumNodes = 10000000; } // allocate the nodes return growTree_unlocked(maxNumNodes); } void RdbTree::reset() { ScopedWriteLock sl(m_rwlock); reset_unlocked(); } void RdbTree::reset_unlocked() { // make sure string is NULL temrinated. this strlen() should if ( m_numNodes > 0 && m_dbname[0] && // don't be spammy we can have thousands of these, one per coll strcmp(m_dbname,"waitingtree") != 0 ) log(LOG_INFO,"db: Resetting tree for %s.",m_dbname); // liberate all the nodes clear_unlocked(); // do not require saving after a reset m_needsSave = false; // now free all the overhead structures of this tree int32_t n = m_numNodes; // free array of collectio numbers (shorts for now) if ( m_collnums) mfree ( m_collnums, sizeof(collnum_t) *n,m_allocName); // free the array of keys if ( m_keys ) mfree ( m_keys , m_ks * n , m_allocName ); // free the data ptrs if ( m_data ) mfree ( m_data , sizeof(char *) * n , m_allocName ); // free the array of dataSizes if ( m_sizes ) mfree ( m_sizes , n * 4 , m_allocName ); // free the sorted node #'s if ( m_left ) mfree ( m_left , n * 4 ,m_allocName); if ( m_right ) mfree ( m_right , n * 4 ,m_allocName); if ( m_parents ) mfree ( m_parents , n * 4 ,m_allocName); if ( m_depth ) mfree ( m_depth , n ,m_allocName); m_collnums = NULL; m_keys = NULL; m_data = NULL; m_sizes = NULL; m_left = NULL; m_right = NULL; m_parents = NULL; m_depth = NULL; // tree description vars m_headNode = -1; m_numNodes = 0; m_numUsedNodes = 0; m_memAllocated = 0; m_memOccupied = 0; m_nextNode = 0; m_minUnusedNode = 0; m_fixedDataSize = -1; // variable dataSize, depends on individual node // clear counts m_numNegativeKeys = 0; m_numPositiveKeys = 0; m_isSaving = false; } void RdbTree::delColl ( collnum_t collnum ) { ScopedWriteLock sl(m_rwlock); m_needsSave = true; const char *startKey = KEYMIN(); const char *endKey = KEYMAX(); deleteNodes_unlocked(collnum, startKey, endKey, true/*freeData*/) ; } int32_t RdbTree::clear() { ScopedWriteLock sl(m_rwlock); return clear_unlocked(); } // . this just makes all the nodes available for occupation (liberates them) // . it does not free this tree's control structures // . returns # of occupied nodes we liberated int32_t RdbTree::clear_unlocked( ) { if ( m_numUsedNodes > 0 ) m_needsSave = true; // the liberation count int32_t count = 0; // liberate all of our nodes int32_t dataSize = m_fixedDataSize; for ( int32_t i = 0 ; i < m_minUnusedNode ; i++ ) { // skip node if parents is -2 (unoccupied) if ( m_parents[i] == -2 ) continue; // we no longer count the overhead of this node as occupied m_memOccupied -= m_overhead; // make the ith node available for occupation m_parents[i] = -2; // keep count count++; // continue if we have no data to free if ( ! m_data ) continue; // read dataSize from m_sizes[i] if it's not fixed if ( m_fixedDataSize == -1 ) dataSize = m_sizes[i]; // free the data being pointed to if ( m_ownData ) mfree ( m_data[i] , dataSize ,m_allocName); // adjust our reported memory usage m_memAllocated -= dataSize; m_memOccupied -= dataSize; } // reset all these m_headNode = -1; m_numUsedNodes = 0; m_nextNode = 0; m_minUnusedNode = 0; // clear counts m_numNegativeKeys = 0; m_numPositiveKeys = 0; // clear tree counts for all collections! int32_t nc = g_collectiondb.getNumRecs(); // BUT only if we are an Rdb::m_tree!!! if ( m_rdbId == -1 ) nc = 0; // otherwise, we overwrite stuff in CollectionRec we shouldn't for ( int32_t i = 0 ; i < nc ; i++ ) { CollectionRec *cr = g_collectiondb.getRec(i); if ( ! cr ) continue; if ( m_rdbId < 0 ) continue; //if (((unsigned char)m_rdbId)>=RDB_END){g_process.shutdownAbort(true); } cr->m_numNegKeysInTree[(unsigned char)m_rdbId] = 0; cr->m_numPosKeysInTree[(unsigned char)m_rdbId] = 0; } return count; } bool RdbTree::getNode(collnum_t collnum, const char *key) const { ScopedReadLock sl(m_rwlock); return (getNode_unlocked(collnum, key) >= 0); } // . used by cache // . wrapper for getNode_unlocked() int32_t RdbTree::getNode_unlocked(collnum_t collnum, const char *key) const { m_rwlock.verify_is_locked(); int32_t i = m_headNode; // get the node (about 4 cycles per loop, 80cycles for 1 million items) while (i != -1) { if (collnum < m_collnums[i]) { i = m_left[i]; continue; } if (collnum > m_collnums[i]) { i = m_right[i]; continue; } int cmp = KEYCMP(key, 0, m_keys, i, m_ks); if (cmp < 0) { i = m_left[i]; continue; } if (cmp > 0) { i = m_right[i]; continue; } return i; } return -1; } // . returns node # whose key is >= "key" // . returns -1 if none // . used by RdbTree::getList() // . TODO: spiderdb stores records by time so our unbalanced tree really hurts // us for that. // . TODO: keep a m_lastStartNode and start from that since it tends to only // increase startKey via Msg3. if the key at m_lastStartNode is <= // the provided key then we did well. int32_t RdbTree::getNextNode_unlocked(collnum_t collnum, const char *key) const { m_rwlock.verify_is_locked(); // return -1 if no non-empty nodes in the tree if ( m_headNode < 0 ) return -1; // get the node (about 4 cycles per loop, 80cycles for 1 million items) int32_t parent; int32_t i = m_headNode ; while ( i != -1 ) { parent = i; if ( collnum < m_collnums[i] ) { i = m_left [i]; continue;} if ( collnum > m_collnums[i] ) { i = m_right[i]; continue;} int cmp = KEYCMP(key,0,m_keys,i,m_ks); if (cmp<0) { i = m_left [i]; continue;} if (cmp>0) { i = m_right[i]; continue;} return i; } if ( m_collnums [ parent ] > collnum ) return parent; if ( m_collnums [ parent ] == collnum && //m_keys [ parent ] > key ) KEYCMP(m_keys,parent,key,0,m_ks)>0 ) return parent; return getNextNode_unlocked(parent); } int32_t RdbTree::getFirstNode_unlocked() const { m_rwlock.verify_is_locked(); const char *k = KEYMIN(); return getNextNode_unlocked(0, k); } int32_t RdbTree::getLastNode_unlocked() const { m_rwlock.verify_is_locked(); const char *k = KEYMAX(); return getPrevNode_unlocked((collnum_t)0x7fff, k); } // . get the node whose key is <= "key" // . returns -1 if none int32_t RdbTree::getPrevNode_unlocked(collnum_t collnum, const char *key) const { m_rwlock.verify_is_locked(); // return -1 if no non-empty nodes in the tree if ( m_headNode < 0 ) return -1; // get the node (about 4 cycles per loop, 80cycles for 1 million items) int32_t parent; int32_t i = m_headNode ; while ( i != -1 ) { parent = i; if ( collnum < m_collnums[i] ) { i = m_left [i]; continue;} if ( collnum > m_collnums[i] ) { i = m_right[i]; continue;} int cmp = KEYCMP(key,0,m_keys,i,m_ks); if ( cmp<0) {i=m_left [i];continue;} if ( cmp>0) {i=m_right[i];continue;} return i; } if ( m_collnums [ parent ] < collnum ) return parent; if ( m_collnums [ parent ] == collnum && //m_keys [ parent ] < key ) KEYCMP(m_keys,parent,key,0,m_ks) < 0 ) return parent; return getPrevNode_unlocked(parent); } const char* RdbTree::getKey_unlocked(int32_t node) const { m_rwlock.verify_is_locked(); return &m_keys[node*m_ks]; } const char* RdbTree::getData(collnum_t collnum, const char *key) const { ScopedReadLock sl(m_rwlock); int32_t n = getNode_unlocked(collnum, key); if (n < 0) return NULL; return m_data[n]; }; int32_t RdbTree::getDataSize_unlocked(int32_t node) const { m_rwlock.verify_is_locked(); if (m_fixedDataSize == -1) { return m_sizes[node]; } return m_fixedDataSize; } // . "i" is the previous node number // . we could eliminate m_parents[] array if we limited tree depth! // . 24 cycles to get the first kid // . averages around 50 cycles per call probably // . 8 cycles are spent entering/exiting this subroutine (inline it? TODO) int32_t RdbTree::getNextNode_unlocked(int32_t i) const { m_rwlock.verify_is_locked(); // cruise the kids if we have a right one if ( m_right[i] >= 0 ) { // go to the right kid i = m_right [ i ]; // now go left as much as we can while ( m_left [ i ] >= 0 ) i = m_left [ i ]; // return that node (it's a leaf or has one right kid) return i; } // now keep getting parents until one has a key bigger than i's key int32_t p = m_parents[i]; // if parent is negative we're done if ( p < 0 ) return -1; // if we're the left kid of the parent, then the parent is the // next biggest node if ( m_left[p] == i ) return p; // otherwise keep getting the parent until it has a bigger key // or until we're the LEFT kid of the parent. that's better // cuz comparing keys takes longer. loop is 6 cycles per iteration. while ( p >= 0 && (m_collnums[p] < m_collnums[i] || ( m_collnums[p] == m_collnums[i] && KEYCMP(m_keys,p,m_keys,i,m_ks) < 0 )) ) p = m_parents[p]; // p will be -1 if none are left return p; } // . "i" is the next node number int32_t RdbTree::getPrevNode_unlocked(int32_t i) const { m_rwlock.verify_is_locked(); // cruise the kids if we have a left one if ( m_left[i] >= 0 ) { // go to the left kid i = m_left [ i ]; // now go right as much as we can while ( m_right [ i ] >= 0 ) i = m_right [ i ]; // return that node (it's a leaf or has one left kid) return i; } // now keep getting parents until one has a key bigger than i's key int32_t p = m_parents[i]; // if we're the right kid of the parent, then the parent is the // next least node if ( m_right[p] == i ) return p; // keep getting the parent until it has a bigger key // or until we're the RIGHT kid of the parent. that's better // cuz comparing keys takes longer. loop is 6 cycles per iteration. while ( p >= 0 && (m_collnums[p] > m_collnums[i] || ( m_collnums[p] == m_collnums[i] && KEYCMP(m_keys,p,m_keys,i,m_ks) > 0 )) ) p = m_parents[p]; // p will be -1 if none are left return p; } bool RdbTree::addKey(const void *key) { ScopedWriteLock sl(m_rwlock); return (addKey_unlocked(key) >= 0); } int32_t RdbTree::addKey_unlocked(const void *key) { m_rwlock.verify_is_write_locked(); return addNode_unlocked ( 0,(const char *)key,NULL,0); } bool RdbTree::addNode(collnum_t collnum, const char *key, char *data, int32_t dataSize) { ScopedWriteLock sl(m_rwlock); return (addNode_unlocked(collnum, key, data, dataSize) >= 0); } // . returns -1 if we coulnd't allocate the new space and sets g_errno to ENOMEM // or ETREENOGROW, ... // . returns node # we added it to on success // . this will replace any current node with the same key // . sets retNode to the node we added the data to (used internally) // . negative dataSizes should be interpreted as 0 // . probably about 120 cycles per add means we can add 2 million per sec // . NOTE: does not check to see if it will exceed m_maxMem int32_t RdbTree::addNode_unlocked ( collnum_t collnum , const char *key , char *data , int32_t dataSize ) { m_rwlock.verify_is_write_locked(); // if there's no more available nodes, error out if ( m_numUsedNodes >= m_numNodes) { g_errno = ENOMEM; return -1; } // we need to be saved now m_needsSave = true; // sanity check - no empty positive keys for doledb if ( m_rdbId == RDB_DOLEDB && dataSize == 0 && (key[0]&0x01) == 0x01){ g_process.shutdownAbort(true); } // for posdb if ( m_ks == 18 &&(key[0] & 0x06) ) {g_process.shutdownAbort(true); } // set up vars int32_t iparent ; int32_t rightGuy; // this is -1 iff there are no nodes used in the tree int32_t i = m_headNode; // . find the parent of node i and call it "iparent" // . if a node exists with our key then replace it while ( i != -1 ) { iparent = i; if ( collnum < m_collnums[i] ) { i = m_left [i]; continue;} if ( collnum > m_collnums[i] ) { i = m_right[i]; continue;} int cmp = KEYCMP(key, 0, m_keys, i, m_ks); if (cmp < 0) { i = m_left[i]; } else if (cmp > 0) { i = m_right[i]; } else { goto replaceIt; } } // . this overhead is key/left/right/parent // . we inc it by the data and sizes array if we need to below m_memOccupied += m_overhead; // point i to the next available node i = m_nextNode; // debug msg //if ( m_dbname && m_dbname[0]=='t' && dataSize >= 4 ) // logf(LOG_DEBUG, // "adding node #%" PRId32" with data ptr at %" PRIx32" " // "and data size of %" PRId32" into a list.", // i,data,dataSize); // if we're the first node we become the head node and our parent is -1 if ( m_numUsedNodes == 0 ) { m_headNode = i; iparent = -1; // ensure these are right m_numNegativeKeys = 0; m_numPositiveKeys = 0; // we only use these stats for Rdb::m_trees for a // PER COLLECTION count, since there can be multiple // collections using the same Rdb::m_tree! // crap, when fixing a tree this will segfault because // m_recs[collnum] is NULL. if ( m_rdbId >= 0 && g_collectiondb.getRec(collnum) ) { //if( ((unsigned char)m_rdbId)>=RDB_END){g_process.shutdownAbort(true); } g_collectiondb.getRec(collnum)-> m_numNegKeysInTree[(unsigned char)m_rdbId] =0; g_collectiondb.getRec(collnum)-> m_numPosKeysInTree[(unsigned char)m_rdbId] =0; } } // stick ourselves in the next available node, "m_nextNode" //m_keys [ i ] = key; KEYSET ( &m_keys[i*m_ks] , key , m_ks ); m_parents [ i ] = iparent; // save collection number now, too m_collnums [ i ] = collnum; // add the key // set the data and size only if we need to if ( m_fixedDataSize != 0 ) { m_data [ i ] = data; // ack used and occupied mem if ( m_fixedDataSize >= 0 ) { m_memAllocated += m_fixedDataSize; m_memOccupied += m_fixedDataSize; } else { m_memAllocated += dataSize ; m_memOccupied += dataSize ; } // we may have a variable size of data as well if ( m_fixedDataSize == -1 ) m_sizes [ i ] = dataSize; } // make our parent, if any, point to us if ( iparent >= 0 ) { if ( collnum < m_collnums[iparent] ) m_left [iparent] = i; else if (collnum==m_collnums[iparent]&&//key<m_keys[iparent] ) KEYCMP(key,0,m_keys,iparent,m_ks)<0 ) m_left [iparent] = i; else m_right[iparent] = i; } // . the right kid of an empty node is used as a linked list of // empty nodes formed by deleting nodes // . we keep the linked list so we can re-used these vacated nodes rightGuy = m_right [ i ]; // our kids are -1 (none) m_left [ i ] = -1; m_right [ i ] = -1; // . if we weren't recycling a node then advance to next // . m_minUnusedNode is the lowest node number that was never filled // at any one time in the past // . you might call it the brand new housing district if ( m_nextNode == m_minUnusedNode ) {m_nextNode++; m_minUnusedNode++;} // . otherwise, we're in a linked list of vacated used houses // . we have a linked list in the right kid // . make sure the new head doesn't have a left else { // point m_nextNode to the next available used house, if any if ( rightGuy >= 0 ) m_nextNode = rightGuy; // otherwise point it to the next brand new house (TODO:REMOVE) // this is an error, try to fix the tree else { log(LOG_WARN, "db: Encountered corruption in tree while trying to add a record. " "You should replace your memory sticks."); if ( !fixTree_unlocked() ) { g_process.shutdownAbort(true); } } } // we have one more used node increaseNodeCount_unlocked(collnum, key); // our depth is now 1 since we're a leaf node // (we include ourself) m_depth [ i ] = 1; // . reset depths starting at i's parent and ascending the tree // . will balance if child depths differ by 2 or more setDepths_unlocked(iparent); // return the node number of the node we occupied return i; // come here to replace node i with the new data/dataSize replaceIt: // debug msg //fprintf(stderr,"replaced it!\n"); // if we don't support any data then we're done if ( m_fixedDataSize == 0 ) { return i; } // get dataSize int32_t oldDataSize = m_fixedDataSize; // if datasize was 0 cuz it was a negative key, fix that for // calculating m_memOccupied if ( m_fixedDataSize >= 0 ) dataSize = m_fixedDataSize; if ( m_fixedDataSize < 0 ) oldDataSize = m_sizes[i]; // free i's data if ( m_data[i] && m_ownData ) mfree ( m_data[i] , oldDataSize ,m_allocName); // decrease mem occupied and increase by new size m_memOccupied -= oldDataSize; m_memOccupied += dataSize; m_memAllocated -= oldDataSize; m_memAllocated += dataSize; // otherwise set the data m_data [ i ] = data; // set the size if we need to as well if ( m_fixedDataSize < 0 ) m_sizes [ i ] = dataSize; return i; } bool RdbTree::deleteNode(collnum_t collnum, const char *key, bool freeData) { ScopedWriteLock sl(m_rwlock); return deleteNode_unlocked(collnum, key, freeData); } bool RdbTree::deleteNode_unlocked(collnum_t collnum, const char *key, bool freeData) { int32_t node = getNode_unlocked(collnum, key); if (node == -1) { return false; } return deleteNode_unlocked(node, freeData); } // delete all nodes with keys in [startKey,endKey] void RdbTree::deleteNodes_unlocked(collnum_t collnum, const char *startKey, const char *endKey, bool freeData) { m_rwlock.verify_is_write_locked(); int32_t node = getNextNode_unlocked(collnum, startKey); while ( node >= 0 ) { //int32_t next = getNextNode_unlocked ( node ); if ( m_collnums[node] != collnum ) break; //if ( m_keys [node] > endKey ) return; if ( KEYCMP(m_keys,node,endKey,0,m_ks) > 0 ) break; deleteNode_unlocked(node, freeData); // rotation in setDepths_unlocked() will cause him to be replaced // with one of his kids, unless he's a leaf node //node = next; node = getNextNode_unlocked(collnum, startKey); } } void RdbTree::increaseNodeCount_unlocked(collnum_t collNum, const char *key) { m_rwlock.verify_is_locked(); m_numUsedNodes++; if (KEYNEG(key)) { m_numNegativeKeys++; if (m_rdbId >= 0) { CollectionRec *cr = g_collectiondb.getRec(collNum); if (cr) { cr->m_numNegKeysInTree[(unsigned char)m_rdbId]++; } } } else { m_numPositiveKeys++; if (m_rdbId >= 0) { CollectionRec *cr = g_collectiondb.getRec(collNum); if (cr) { cr->m_numPosKeysInTree[(unsigned char)m_rdbId]++; } } } } void RdbTree::decreaseNodeCount_unlocked(collnum_t collNum, const char *key) { m_rwlock.verify_is_write_locked(); // we have one less used node m_numUsedNodes--; // update sign counts if (KEYNEG(key)) { m_numNegativeKeys--; if (m_rdbId >= 0) { CollectionRec *cr = g_collectiondb.getRec(collNum); if (cr) { cr->m_numNegKeysInTree[(unsigned char)m_rdbId]--; } } } else { m_numPositiveKeys--; if (m_rdbId >= 0) { CollectionRec *cr = g_collectiondb.getRec(collNum); if (cr) { cr->m_numPosKeysInTree[(unsigned char)m_rdbId]--; } } } } // . now replace node #i with node #j // . i should not equal j at this point bool RdbTree::replaceNode_unlocked(int32_t i, int32_t j) { m_rwlock.verify_is_locked(); // . j's parent should take j's one kid // . that child should likewise point to j's parent // . j should only have <= 1 kid now because of our algorithm above // . if j's parent is i then j keeps his kid int32_t jparent = m_parents[j]; if ( jparent != i ) { // parent: if j is my left kid, then i take j's right kid // otherwise, if j is my right kid, then i take j's left kid if ( m_left [ jparent ] == j ) { m_left [ jparent ] = m_right [ j ]; if (m_right[j]>=0) m_parents [ m_right[j] ] = jparent; } else { m_right [ jparent ] = m_left [ j ]; if (m_left [j]>=0) m_parents [ m_left[j] ] = jparent; } } // . j inherits i's children (providing i's child is not j) // . those children's parent should likewise point to j if ( m_left [i] != j ) { m_left [j] = m_left [i]; if ( m_left[j] >= 0 ) m_parents[m_left [j]] = j; } if ( m_right[i] != j ) { m_right[j] = m_right[i]; if ( m_right[j] >= 0 ) m_parents[m_right[j]] = j; } // j becomes the kid of i's parent, if any int32_t iparent = m_parents[i]; if ( iparent >= 0 ) { if ( m_left[iparent] == i ) m_left [iparent] = j; else m_right[iparent] = j; } // iparent may be -1 m_parents[j] = iparent; // if i was the head node now j becomes the head node if ( m_headNode == i ) m_headNode = j; // . i joins the linked list of available used homes // . put it at the head of the list // . "m_nextNode" is the head node of the linked list m_right[i] = m_nextNode; m_nextNode = i; // . i's parent should be -2 so we know it's unused in case we're // stepping through the nodes linearly for dumping in RdbDump // . used in getListUnordered() m_parents[i] = -2; // we have one less used node decreaseNodeCount_unlocked(m_collnums[i], getKey_unlocked(i)); // our depth becomes that of the node we replaced, unless moving j // up to i decreases the total depth, in which case setDepths_unlocked() fixes m_depth [ j ] = m_depth [ i ]; // . recalculate depths starting at old parent of j // . stops at the first node to have the correct depth // . will balance at pivot nodes that need it if ( jparent != i ) setDepths_unlocked(jparent); else setDepths_unlocked(j); // TODO: register growTree with g_mem to free on demand return true; } // . deletes node i from the tree // . i's parent should point to i's left or right kid // . if i has no parent then his left or right kid becomes the new top node bool RdbTree::deleteNode_unlocked(int32_t i, bool freeData) { m_rwlock.verify_is_write_locked(); // watch out for double deletes if ( m_parents[i] == -2 ) { log(LOG_LOGIC,"db: Caught double delete."); return false; } // we need to be saved now m_needsSave = true; // we have one less occupied node m_memOccupied -= m_overhead; // . free it now iff "freeIt" is true (default is true) // . m_data can be a NULL array if m_fixedDataSize is fixed to 0 if ( /*freeData &&*/ m_data ) { int32_t dataSize = m_fixedDataSize; if ( dataSize == -1 ) dataSize = m_sizes[i]; if ( m_ownData ) mfree ( m_data [i] , dataSize ,m_allocName); m_memAllocated -= dataSize; m_memOccupied -= dataSize; } // j will be the node that replace node #i int32_t j = i; // . now find a node to replace node #i // . get a node whose key is just to the right or left of i's key // . get i's right kid // . then get that kid's LEFT MOST leaf-node descendant // . this little routine is stolen from getNextNode_unlocked(i) // . try to pick a kid from the right the same % of time as from left if ( ( m_pickRight && m_right[j] >= 0 ) || ( m_left[j] < 0 && m_right[j] >= 0 ) ) { // try to pick a left kid next time m_pickRight = false; // go to the right kid j = m_right [ j ]; // now go left as much as we can while ( m_left [ j ] >= 0 ) j = m_left [ j ]; // use node j (it's a leaf or has a right kid) return replaceNode_unlocked(i, j); } // . now get the previous node if i has no right kid // . this little routine is stolen from getPrevNode(i) if ( m_left[j] >= 0 ) { // try to pick a right kid next time m_pickRight = true; // go to the left kid j = m_left [ j ]; // now go right as much as we can while ( m_right [ j ] >= 0 ) j = m_right [ j ]; // use node j (it's a leaf or has a left kid) return replaceNode_unlocked(i, j); } // . come here if i did not have any kids (i's a leaf node) // . get i's parent int32_t iparent = m_parents[i]; // make i's parent, if any, disown him if ( iparent >= 0 ) { if ( m_left[iparent] == i ) m_left [iparent] = -1; else m_right[iparent] = -1; } // node i now goes to the top of the list of vacated, available homes m_right[i] = m_nextNode; // m_nextNode now points to i m_nextNode = i; // his parent is -2 (god) cuz he's dead and available m_parents[i] = -2; // . if we were the head node then, since we didn't have any kids, // the tree must be empty // . one less node in the tree decreaseNodeCount_unlocked(m_collnums[i], getKey_unlocked(i)); // . reset the depths starting at iparent and going up until unchanged // . will balance at pivot nodes that need it setDepths_unlocked(iparent); // tree must be empty if (m_numUsedNodes <= 0) { m_headNode = -1; // this will nullify our linked list of vacated, used homes m_nextNode = 0; m_minUnusedNode = 0; // ensure these are right m_numNegativeKeys = 0; m_numPositiveKeys = 0; if (m_rdbId >= 0) { CollectionRec *cr; cr = g_collectiondb.getRec(m_collnums[i]); if (cr) { cr->m_numNegKeysInTree[(unsigned char)m_rdbId] = 0; cr->m_numPosKeysInTree[(unsigned char)m_rdbId] = 0; } } } return true; } bool RdbTree::fixTree() { ScopedWriteLock sl(m_rwlock); return fixTree_unlocked(); } // . this fixes the tree // returns false if could not fix tree and sets g_errno, otherwise true bool RdbTree::fixTree_unlocked() { m_rwlock.verify_is_write_locked(); // on error, fix the linked list log(LOG_WARN, "db: Trying to fix tree for %s.", m_dbname); log(LOG_WARN, "db: %" PRId32" occupied nodes and %" PRId32" empty of top %" PRId32" nodes.", m_numUsedNodes, m_minUnusedNode - m_numUsedNodes, m_minUnusedNode); // loop through our nodes int32_t n = m_minUnusedNode; int32_t count = 0; // "clear" the tree as far as addNode() is concerned m_headNode = -1; m_numUsedNodes = 0; m_memAllocated = 0; m_memOccupied = 0; m_nextNode = 0; m_minUnusedNode = 0; int32_t max = g_collectiondb.getNumRecs(); log("db: Valid collection numbers range from 0 to %" PRId32".",max); /// @todo ALC should we check repair RDB as well? bool isTitledb = (m_rdbId == RDB_TITLEDB || m_rdbId == RDB2_TITLEDB2); bool isSpiderdb = (m_rdbId == RDB_SPIDERDB || m_rdbId == RDB2_SPIDERDB2); // now re-add the old nods to the tree, they should not be overwritten // by addNode() for ( int32_t i = 0 ; i < n ; i++ ) { // speed update if ( (i % 100000) == 0 ) log("db: Fixing node #%" PRId32" of %" PRId32".",i,n); // skip if empty if ( m_parents[i] <= -2 ) continue; if ( isTitledb && m_data[i] ) { char *data = m_data[i]; int32_t ucompSize = *(int32_t *)data; if ( ucompSize < 0 || ucompSize > 100000000 ) { log("db: removing titlerec with uncompressed " "size of %i from tree",(int)ucompSize); continue; } } char *key = &m_keys[i*m_ks]; if (isSpiderdb && m_data[i] && Spiderdb::isSpiderRequest((spiderdbkey_t *)key)) { char *data = m_data[i]; data -= sizeof(spiderdbkey_t); data -= 4; SpiderRequest *sreq ; sreq =(SpiderRequest *)data; if ( strncmp(sreq->m_url,"http",4) != 0 ) { log("db: removing spiderrequest bad url " "%s from tree",sreq->m_url); //return false; continue; } } collnum_t cn = m_collnums[i]; // verify collnum if ( cn < 0 ) continue; if ( cn >= max ) continue; // collnum of non-existent coll if ( m_rdbId>=0 && ! g_collectiondb.getRec(cn) ) continue; // now add just to set m_right/m_left/m_parent if ( m_fixedDataSize == 0 ) addNode_unlocked(cn,&m_keys[i*m_ks], NULL, 0 ); else if ( m_fixedDataSize == -1 ) addNode_unlocked(cn,&m_keys[i*m_ks],m_data[i],m_sizes[i] ); else addNode_unlocked(cn,&m_keys[i*m_ks],m_data[i], m_fixedDataSize); // count em count++; } log("db: Fix tree removed %" PRId32" nodes for %s.",n - count,m_dbname); // esure it is still good if ( !checkTree_unlocked(false, true) ) { log(LOG_WARN, "db: Fix tree failed."); return false; } log("db: Fix tree succeeded for %s.",m_dbname); return true; } void RdbTree::verifyIntegrity() const { ScopedReadLock sl(m_rwlock); if (!checkTree_unlocked(false, true)) { gbshutdownCorrupted(); } } // returns false if tree had problem, true otherwise bool RdbTree::checkTree_unlocked(bool printMsgs, bool doChainTest) const { m_rwlock.verify_is_locked(); int32_t hkp = 0; bool useHalfKeys = false; if (m_rdbId == RDB_LINKDB || m_rdbId == RDB2_LINKDB2) { useHalfKeys = true; } /// @todo ALC should we check repair RDB as well? bool isTitledb = (m_rdbId == RDB_TITLEDB || m_rdbId == RDB2_TITLEDB2); bool isSpiderdb = (m_rdbId == RDB_SPIDERDB || m_rdbId == RDB2_SPIDERDB2); // now check parent kid correlations for ( int32_t i = 0 ; i < m_minUnusedNode ; i++ ) { // skip node if parents is -2 (unoccupied) if ( m_parents[i] == -2 ) continue; // all half key bits must be off in here if ( useHalfKeys && (m_keys[i*m_ks] & 0x02) ) { hkp++; // turn it off m_keys[i*m_ks] &= 0xfd; } // for posdb if ( m_ks == 18 &&(m_keys[i*m_ks] & 0x06) ) { g_process.shutdownAbort(true); } if ( isTitledb && m_data[i] ) { char *data = m_data[i]; int32_t ucompSize = *(int32_t *)data; if ( ucompSize < 0 || ucompSize > 100000000 ) { log("db: found titlerec with uncompressed " "size of %i from tree",(int)ucompSize); return false; } } char *key = &m_keys[i*m_ks]; if ( isSpiderdb && m_data[i] && Spiderdb::isSpiderRequest ( (spiderdbkey_t *)key ) ) { char *data = m_data[i]; data -= sizeof(spiderdbkey_t); data -= 4; SpiderRequest *sreq ; sreq =(SpiderRequest *)data; if ( strncmp(sreq->m_url,"http",4) != 0 ) { log("db: spiderrequest bad url " "%s",sreq->m_url); return false; } } // bad collnum? collnum_t cn = m_collnums[i]; if ( m_rdbId>=0 && (cn >= g_collectiondb.getNumRecs() || cn < 0) ) { log(LOG_WARN, "db: bad collnum in tree"); return false; } if ( m_rdbId>=0 && ! g_collectiondb.getRec(cn) ) { log(LOG_WARN, "db: collnum is obsolete in tree"); return false; } // if no left/right kid it MUST be -1 if ( m_left[i] < -1 ) { log(LOG_WARN, "db: Tree left kid < -1."); return false; } if ( m_left[i] >= m_numNodes ) { log(LOG_WARN, "db: Tree left kid is %" PRId32" >= %" PRId32".", m_left[i], m_numNodes); return false; } if ( m_right[i] < -1 ) { log(LOG_WARN, "db: Tree right kid < -1."); return false; } if ( m_right[i] >= m_numNodes ) { log(LOG_WARN, "db: Tree left kid is %" PRId32" >= %" PRId32".", m_right[i], m_numNodes); return false; } // check left kid if ( m_left[i] >= 0 && m_parents[m_left[i]] != i ) { log(LOG_WARN, "db: Tree left kid and parent disagree."); return false; } // then right kid if ( m_right[i] >= 0 && m_parents[m_right[i]] != i ) { log(LOG_WARN, "db: Tree right kid and parent disagree."); return false; } // MDW: why did i comment out the order checking? // check order if ( m_left[i] >= 0 && m_collnums[i] == m_collnums[m_left[i]] ) { char *key = &m_keys[i*m_ks]; char *left = &m_keys[m_left[i]*m_ks]; if ( KEYCMP(key,left,m_ks)<0) { log(LOG_WARN, "db: Tree left kid > parent %i", i); return false; } } if ( m_right[i] >= 0 && m_collnums[i] == m_collnums[m_right[i]] ) { char *key = &m_keys[i*m_ks]; char *right = &m_keys[m_right[i]*m_ks]; if ( KEYCMP(key,right,m_ks)>0) { log(LOG_WARN, "db: Tree right kid < parent %i %s < %s", i, KEYSTR(right, m_ks), KEYSTR(key, m_ks)); return false; } } } if ( hkp > 0 ) { log( LOG_WARN, "db: Had %" PRId32" half key bits on for %s.", hkp, m_dbname ); return false; } // now return if we aren't doing active balancing if ( ! m_depth ) return true; // debug -- just always return now if ( printMsgs )logf(LOG_DEBUG,"***m_headNode=%" PRId32", m_numUsedNodes=%" PRId32, m_headNode,m_numUsedNodes); int32_t max = g_collectiondb.getNumRecs(); // verify that parent links correspond to kids for ( int32_t i = 0 ; i < m_minUnusedNode ; i++ ) { // verify collnum collnum_t cn = m_collnums[i]; if ( cn < 0 ) { log( LOG_WARN, "db: Got bad collnum in tree, %i.", cn ); return false; } if ( cn > max ) { log( LOG_WARN, "db: Got too big collnum in tree. %i.", cn ); return false; } int32_t P = m_parents [i]; if ( P == -2 ) continue; // deleted node if ( P == -1 && i != m_headNode ) { log( LOG_WARN, "db: Tree node %" PRId32" has no parent.", i ); return false; } // check kids if ( P>=0 && m_left[P] != i && m_right[P] != i ) { log( LOG_WARN, "db: Tree kids of node # %" PRId32" disowned him.", i ); return false; } // speedy tests continue if ( ! doChainTest ) continue; // ensure i goes back to head node int32_t j = i; int32_t loopCount = 0; while ( j >= 0 ) { if ( j == m_headNode ) { break; } // sanity -- loop check if ( ++loopCount > 10000 ) { log( LOG_WARN, "db: tree had loop" ); return false; } j = m_parents[j]; } if ( j != m_headNode ) { log( LOG_WARN, "db: Node # %" PRId32" does not lead back to head node.", i ); return false; } if ( printMsgs ) { char *k = &m_keys[i*m_ks]; logf(LOG_DEBUG,"***node=%" PRId32" left=%" PRId32" rght=%" PRId32" " "prnt=%" PRId32", depth=%" PRId32" c=%" PRId32" key=%s", i,m_left[i],m_right[i],m_parents[i], (int32_t)m_depth[i],(int32_t)m_collnums[i], KEYSTR(k,m_ks)); } //ensure depth int32_t newDepth = computeDepth_unlocked(i); if ( m_depth[i] != newDepth ) { log( LOG_WARN, "db: Tree node # %" PRId32"'s depth should be %" PRId32".", i, newDepth ); return false; } } if ( printMsgs ) logf(LOG_DEBUG,"---------------"); // no problems found return true; } // . grow tree to "n" nodes // . this will now actually grow from a current size to a new one bool RdbTree::growTree_unlocked(int32_t nn) { m_rwlock.verify_is_write_locked(); // if we're that size, bail if ( m_numNodes == nn ) return true; // old number of nodes int32_t on = m_numNodes; // some quick type info int32_t k = m_ks; int32_t d = sizeof(char *); char *kp = NULL; int32_t *lp = NULL; int32_t *rp = NULL; int32_t *pp = NULL; char **dp = NULL; int32_t *sp = NULL; char *tp = NULL; collnum_t *cp = NULL; // do the reallocs int32_t cs = sizeof(collnum_t); cp =(collnum_t *)mrealloc (m_collnums, on*cs,nn*cs,m_allocName); if ( ! cp ) { goto error; } kp = (char *) mrealloc ( m_keys , on*k , nn*k , m_allocName ); if ( ! kp ) { goto error; } lp = (int32_t *) mrealloc ( m_left , on*4 , nn*4 , m_allocName ); if ( ! lp ) { goto error; } rp = (int32_t *) mrealloc ( m_right , on*4 , nn*4 , m_allocName ); if ( ! rp ) { goto error; } pp = (int32_t *) mrealloc ( m_parents , on*4 , nn*4 , m_allocName ); if ( ! pp ) { goto error; } // deal with data, sizes and depth arrays on a basis of need if ( m_fixedDataSize != 0 ) { dp =(char **)mrealloc (m_data , on*d,nn*d,m_allocName); if ( ! dp ) { goto error; } } if ( m_fixedDataSize == -1 ) { sp =(int32_t *)mrealloc (m_sizes , on*4,nn*4,m_allocName); if ( ! sp ) { goto error; } } tp =(char *)mrealloc (m_depth , on ,nn ,m_allocName); if ( ! tp ) { goto error; } // re-assign m_collnums= cp; m_keys = kp; m_left = lp; m_right = rp; m_parents = pp; m_data = dp; m_sizes = sp; m_depth = tp; // adjust memory usage m_memAllocated -= m_overhead * on; m_memAllocated += m_overhead * nn; // bitch an exit if too much if ( m_memAllocated > m_maxMem ) { log( LOG_ERROR, "db: Trying to grow tree for %s to %" PRId32", but max is %" PRId32". Consider changing gb.conf.", m_dbname, m_memAllocated, m_maxMem ); return false; } // and the new # of nodes we have m_numNodes = nn; return true; error: // . realloc back down if we need to // . downsizing should NEVER fail! if ( cp ) { collnum_t *ss = (collnum_t *)mrealloc ( cp , nn*cs , on*cs , m_allocName); if ( ! ss ) { g_process.shutdownAbort(true); } m_collnums = ss; } if ( kp ) { char *kk = (char *)mrealloc ( kp, nn*k, on*k, m_allocName ); if ( ! kk ) { g_process.shutdownAbort(true); } m_keys = kk; } if ( lp ) { int32_t *x = (int32_t *)mrealloc ( lp , nn*4 , on*4 , m_allocName ); if ( ! x ) { g_process.shutdownAbort(true); } m_left = x; } if ( rp ) { int32_t *x = (int32_t *)mrealloc ( rp , nn*4 , on*4 , m_allocName ); if ( ! x ) { g_process.shutdownAbort(true); } m_right = x; } if ( pp ) { int32_t *x = (int32_t *)mrealloc ( pp , nn*4 , on*4 , m_allocName ); if ( ! x ) { g_process.shutdownAbort(true); } m_parents = x; } if ( dp && m_fixedDataSize != 0 ) { char **p = (char **)mrealloc ( dp , nn*d , on*d , m_allocName ); if ( ! p ) { g_process.shutdownAbort(true); } m_data = p; } if ( sp && m_fixedDataSize == -1 ) { int32_t *x = (int32_t *)mrealloc ( sp , nn*4 , on*4 , m_allocName ); if ( ! x ) { g_process.shutdownAbort(true); } m_sizes = x; } /// @note ALC the following code will be used if we add something else below the mrealloc of m_depth above //if ( tp ) { // char *s = (char *)mrealloc ( tp , nn , on , m_allocName ); // if ( ! s ) { g_process.shutdownAbort(true); } // m_depth = s; //} log( LOG_ERROR, "db: Failed to grow tree for %s from %" PRId32" to %" PRId32" bytes: %s.", m_dbname, on, nn, mstrerror(g_errno) ); return false; } int32_t RdbTree::getMemOccupiedForList_unlocked(collnum_t collnum, const char *startKey, const char *endKey, int32_t minRecSizes) const { m_rwlock.verify_is_locked(); int32_t ne = 0; int32_t size = 0; int32_t i = getNextNode_unlocked(collnum, startKey) ; while ( i >= 0 ) { // break out if we should if ( KEYCMP(m_keys,i,endKey,0,m_ks) > 0 ) break; if ( m_collnums[i] != collnum ) break; if ( size >= minRecSizes ) break; // num elements ne++; // do we got data? if ( m_data ) { // is size fixed? if ( m_fixedDataSize >= 0 ) size += m_fixedDataSize; else size += m_sizes[i]; } // add in key overhead size += m_ks; // add in dataSize overhead (-1 means variable data size) if ( m_fixedDataSize < 0 ) size += 4; // advance i = getNextNode_unlocked(i); } // that's it return size; } int32_t RdbTree::getMemOccupiedForList() const { ScopedReadLock sl(m_rwlock); return getMemOccupiedForList_unlocked(); } int32_t RdbTree::getMemOccupiedForList_unlocked() const { m_rwlock.verify_is_locked(); int32_t mem = 0; if ( m_fixedDataSize >= 0 ) { mem += m_numUsedNodes * m_ks; mem += m_numUsedNodes * m_fixedDataSize; return mem; } // get total mem used by occupied nodes mem = m_memOccupied; // remove left/right/parent for each used node (3 int32_ts) mem -= m_overhead * m_numUsedNodes; // but do include the key in the list, even though it's in the overhead mem += m_ks * m_numUsedNodes; // but don't include the dataSize in the overhead -- that's in list too mem -= 4 * m_numUsedNodes; return mem; } // . returns false and sets g_errno on error // . throw all the records in this range into this list // . probably about 24-50 cycles per key we add // . if this turns out to be bottleneck we can use hardcore RdbGet later // . RdbDump should use this bool RdbTree::getList(collnum_t collnum, const char *startKey, const char *endKey, int32_t minRecSizes, RdbList *list, int32_t *numPosRecs, int32_t *numNegRecs, bool useHalfKeys) const { ScopedReadLock sl(m_rwlock); // reset the counts of positive and negative recs int32_t numNeg = 0; int32_t numPos = 0; if ( numNegRecs ) *numNegRecs = 0; if ( numPosRecs ) *numPosRecs = 0; // . set the start and end keys of this list // . set lists's m_ownData member to true list->reset(); // got set m_ks first so the set ( startKey, endKey ) works! list->setKeySize(m_ks); list->set ( startKey , endKey ); list->setFixedDataSize ( m_fixedDataSize ); list->setUseHalfKeys ( useHalfKeys ); // bitch if list does not own his own data if ( ! list->getOwnData() ) { g_errno = EBADENGINEER; log(LOG_LOGIC,"db: rdbtree: getList: List does not own data"); return false; } // bail if minRecSizes is 0 if ( minRecSizes == 0 ) return true; // return true if no non-empty nodes in the tree if ( m_numUsedNodes == 0 ) return true; // get first node >= startKey int32_t node = getNextNode_unlocked(collnum, startKey); if ( node < 0 ) return true; // if it's already beyond endKey, give up if ( KEYCMP ( m_keys,node,endKey,0,m_ks) > 0 ) return true; // or if we hit a different collection number if ( m_collnums [ node ] > collnum ) return true; // save lastNode for setting *lastKey int32_t lastNode = -1; // . how much space would whole tree take if we stored it in a list? // . this includes records that are deletes // . caller will often say give me 500MB for a fixeddatasize list // that is heavily constrained by keys... int32_t growth = getMemOccupiedForList_unlocked(); // do not allocate whole tree's worth of space if we have a fixed // data size and a finite minRecSizes if ( m_fixedDataSize >= 0 && minRecSizes >= 0 ) { // only assign if we require less than minRecSizes of growth // because some callers set minRecSizes to 500MB!! int32_t ng = minRecSizes + m_fixedDataSize + m_ks ; if ( ng < growth && ng > minRecSizes ) growth = ng; } // raise to virtual inifinite if not constraining us if ( minRecSizes < 0 ) minRecSizes = 0x7fffffff; // . nail it down if titledb because metalincs was getting // out of memory errors when getting a bunch of titleRecs // . only do this for titledb/spiderdb lookups since it can be slow // to go through a million indexdb nodes. // . this was because minRecSizes was way too big... 16MB i think // . if they pass us a size-unbounded request for a fixed data size // list then we should call this as well... as in Msg22.cpp's // call to msg5::getList for tfndb. // // ^^^^^ Partially obsolete rambling above ^^^^^ // Until we have verified that no call uses some silly huge minRecSizes we keep the // counting threshold but have it a 2MB which is more suitable for modern hardware. // The problem is that the getMemOccupiedForList_unlocked() method has to traverse all the // relevant nodes to calculate the total size and doing so is not cheap. if ( m_fixedDataSize < 0 || minRecSizes >= 2*1024*1024 ) growth = getMemOccupiedForList_unlocked(collnum, startKey, endKey, minRecSizes); // grow the list now if ( ! list->growList ( growth ) ) { log(LOG_WARN, "db: Failed to grow list to %" PRId32" bytes for storing records from tree: %s.", growth, mstrerror(g_errno)); return false; } // stop when we've hit or just exceed minRecSizes // or we're out of nodes for ( ; node >= 0 && list->getListSize() < minRecSizes ; node = getNextNode_unlocked(node) ) { // stop before exceeding endKey if ( KEYCMP (m_keys,node,endKey,0,m_ks) > 0 ) break; // or if we hit a different collection number if ( m_collnums [ node ] != collnum ) break; // if more recs were added to tree since we initialized the // list then grow the list to compensate so we do not end up // reallocating one key at a time. // add record to our list if ( m_fixedDataSize == 0 ) { if (!list->addRecord(&m_keys[node * m_ks], 0, NULL)) { log(LOG_WARN, "db: Failed to add record to tree list for %s: %s. Fix the growList algo.", m_dbname,mstrerror(g_errno)); return false; } } else { int32_t dataSize = (m_fixedDataSize == -1) ? m_sizes[node] : m_fixedDataSize; // point to key char *key = &m_keys[node*m_ks]; // do not allow negative keys to have data, or // at least ignore it! let's RdbList::addRecord() // core dump on us! if (KEYNEG(key)) { dataSize = 0; } // add the key and data if (!list->addRecord(key, dataSize, m_data[node])) { log(LOG_WARN, "db: Failed to add record to tree list for %s: %s. Fix the growList algo.", m_dbname,mstrerror(g_errno)); return false; } } // we are little endian if ( KEYNEG(m_keys,node,m_ks) ) numNeg++; else numPos++; // save lastNode for setting *lastKey lastNode = node; } // set counts to pass back if ( numNegRecs ) *numNegRecs = numNeg; if ( numPosRecs ) *numPosRecs = numPos; // . we broke out of the loop because either: // . 1. we surpassed endKey OR // . 2. we hit or surpassed minRecSizes // . constrain the endKey of the list to the key of "node" minus 1 // . "node" should be the next node we would have added to this list // . if "node" is < 0 then we can keep endKey set high the way it is // record the last key inserted into the list if ( lastNode >= 0 ) list->setLastKey ( &m_keys[lastNode*m_ks] ); // reset the list's endKey if we hit the minRecSizes barrier cuz // there may be more records before endKey than we put in "list" if ( list->getListSize() >= minRecSizes && lastNode >= 0 ) { // use the last key we read as the new endKey char newEndKey[MAX_KEY_BYTES]; KEYSET(newEndKey,&m_keys[lastNode*m_ks],m_ks); // . if he's negative, boost new endKey by 1 because endKey's // aren't allowed to be negative // . we're assured there's no positive counterpart to him // since Rdb::addRecord() doesn't allow both to exist in // the tree at the same time // . if by some chance his positive counterpart is in the // tree, then it's ok because we'd annihilate him anyway, // so we might as well ignore him // we are little endian if ( KEYNEG(newEndKey,0,m_ks) ) KEYINC(newEndKey,m_ks); // if we're using half keys set his half key bit if ( useHalfKeys ) KEYOR(newEndKey,0x02); // tell list his new endKey now list->set ( startKey , newEndKey ); } // reset list ptr to point to first record list->resetListPtr(); // success return true; } // . this just estimates the size of the list // . the more balanced the tree the better the accuracy // . this now returns total recSizes not # of NODES like it used to // in [startKey, endKey] in this tree // . if the count is < 200 it returns an EXACT count // . right now it only works for dataless nodes (keys only) int32_t RdbTree::estimateListSize(collnum_t collnum, const char *startKey, const char *endKey, char *minKey, char *maxKey) const { ScopedReadLock sl(m_rwlock); // make these as benign as possible if ( minKey ) KEYSET ( minKey , endKey , m_ks ); if ( maxKey ) KEYSET ( maxKey , startKey , m_ks ); // get order of a key as close to startKey as possible int32_t order1 = getOrderOfKey_unlocked(collnum, startKey, minKey); // get order of a key as close to endKey as possible int32_t order2 = getOrderOfKey_unlocked(collnum, endKey, maxKey); // how many recs? int32_t size = order2 - order1; // . if enough, return // . NOTE minKey/maxKey may be < or > startKey/endKey // . return an estimated list size if ( size > 200 ) return size * m_ks; // . otherwise, count exactly // . reset size and get the initial node size = 0; int32_t n = getPrevNode_unlocked(collnum, startKey); // return 0 if no nodes in that key range if( n < 0 ) { return 0; } // skip to next node if this one is < startKey if( KEYCMP(m_keys, n, startKey, 0, m_ks) < 0 ) { n = getNextNode_unlocked(n); } if( n < 0 ) { return 0; } // or collnum if ( m_collnums[n] < collnum ) { n = getNextNode_unlocked(n); } // loop until we run out of nodes or one breeches endKey while( n > 0 && KEYCMP(m_keys,n,endKey,0,m_ks) <= 0 && m_collnums[n]==collnum ) { size++; n = getNextNode_unlocked(n); } // this should be an exact list size (actually # of nodes) return size * m_ks; } bool RdbTree::collExists(collnum_t coll) const { ScopedReadLock sl(m_rwlock); int32_t nn = getNextNode_unlocked(coll, KEYMIN()); if(nn < 0) return false; if(getCollnum_unlocked(nn) != coll) return false; return true; } // we don't lock because variable is already atomic bool RdbTree::isSaving() const { return m_isSaving; } bool RdbTree::needsSave() const { ScopedReadLock sl(m_rwlock); return m_needsSave; } // . returns a number from 0 to m_numUsedNodes-1 // . represents the ordering of this key in that range // . *retKey is the key that has the returned order // . *retKey gets as close to "key" as it can // . returns # of NODES int32_t RdbTree::getOrderOfKey_unlocked(collnum_t collnum, const char *key, char *retKey) const { m_rwlock.verify_is_locked(); if ( m_numUsedNodes <= 0 ) return 0; int32_t i = m_headNode; // estimate the depth of tree if not balanced int32_t d = getTreeDepth_unlocked(); // TODO: WARNING: ensure d-1 not >= 32 !!!!!!!!!!!!!!!!! int32_t step = 1 << (d-1); int32_t order = step; while ( i != -1 ) { //if ( retKey ) *retKey = m_keys[i]; if ( retKey ) KEYSET ( retKey , &m_keys[i*m_ks] , m_ks ); step /= 2; if ( collnum < m_collnums[i] || (collnum==m_collnums[i] &&KEYCMP(key,0,m_keys,i,m_ks)<0)){ i = m_left [i]; if ( i >= 0 ) order -= step; continue; } if ( collnum > m_collnums[i] || (collnum==m_collnums[i] &&KEYCMP(key,0,m_keys,i,m_ks)>0)){ i = m_right[i]; if ( i >= 0 ) order += step; continue; } break; } // normalize order since tree probably has less then 2^d nodes int64_t normOrder = (int64_t)order * (int64_t)m_numUsedNodes / ( ((int64_t)1 << d)-1); return (int32_t) normOrder; } int32_t RdbTree::getTreeDepth_unlocked() const { m_rwlock.verify_is_locked(); return m_depth[m_headNode]; } // . recompute depths of nodes starting at i and ascending the tree // . call rotateRight/Left() when depth of children differs by 2 or more void RdbTree::setDepths_unlocked(int32_t i) { m_rwlock.verify_is_write_locked(); // inc the depth of all parents if it changes for them while ( i >= 0 ) { // . compute the new depth for node i // . get depth of left kid // . left/rightDepth is depth of subtree on left/right int32_t leftDepth = 0; int32_t rightDepth = 0; if ( m_left [i] >= 0 ) leftDepth = m_depth [ m_left [i] ] ; if ( m_right[i] >= 0 ) rightDepth = m_depth [ m_right[i] ] ; // . get the new depth for node i // . add 1 cuz we include ourself in our m_depth int32_t newDepth ; if ( leftDepth > rightDepth ) newDepth = leftDepth + 1; else newDepth = rightDepth + 1; // if the depth did not change for i then we're done int32_t oldDepth = m_depth[i] ; // set our new depth m_depth[i] = newDepth; // diff can be -2, -1, 0, +1 or +2 int32_t diff = leftDepth - rightDepth; // . if it's -1, 0 or 1 then we don't need to balance // . if rightside is deeper rotate left, i is the pivot // . otherwise, rotate left // . these should set the m_depth[*] for all nodes needing it if ( diff == -2 ) i = rotateLeft_unlocked(i); else if ( diff == 2 ) i = rotateRight_unlocked(i); // . return if our depth was ultimately unchanged // . i may have change if we rotated, but same logic applies if ( m_depth[i] == oldDepth ) break; // debug msg //fprintf (stderr,"changed node %" PRId32"'s depth from %" PRId32" to %" PRId32"\n", //i,oldDepth,newDepth); // get his parent to continue the ascension i = m_parents [ i ]; } // debug msg //printTree(); } /* // W , X and B are SUBTREES. // B's subtree was 1 less in depth than W or X, then a new node was added to // W or X triggering the imbalance. // However, if B gets deleted W and X can be the same size. // // Right rotation if W subtree depth is >= X subtree depth: // // A N // / \ / \ // / \ / \ // N B ---> W A // / \ / \ // W X X B // // Right rotation if W subtree depth is < X subtree depth: // A X // / \ / \ // / \ / \ // N B ---> N A // / \ / \ / \ // W X W Q T B // / \ // Q T */ // . we come here when A's left subtree is deeper than it's right subtree by 2 // . this rotation operation causes left to lose 1 depth and right to gain one // . the type of rotation depends on which subtree is deeper, W or X // . W or X must deeper by the other by exactly one // . if they were equal depth then how did adding a node inc the depth? // . if their depths differ by 2 then N would have been rotated first! // . the parameter "i" is the node # for A in the illustration above // . return the node # that replaced A so the balance() routine can continue // . TODO: check our depth modifications below int32_t RdbTree::rotateRight_unlocked(int32_t i) { m_rwlock.verify_is_write_locked(); return rotate_unlocked(i, m_left, m_right); } // . i just swapped left with m_right int32_t RdbTree::rotateLeft_unlocked(int32_t i) { m_rwlock.verify_is_write_locked(); return rotate_unlocked(i, m_right, m_left); } int32_t RdbTree::rotate_unlocked(int32_t i, int32_t *left, int32_t *right) { m_rwlock.verify_is_write_locked(); // i's left kid's right kid takes his place int32_t A = i; int32_t N = left [ A ]; int32_t W = left [ N ]; int32_t X = right [ N ]; int32_t Q = -1; int32_t T = -1; if ( X >= 0 ) { Q = left [ X ]; T = right [ X ]; } // let AP be A's parent int32_t AP = m_parents [ A ]; // whose the bigger subtree, W or X? (depth includes W or X itself) int32_t Wdepth = 0; int32_t Xdepth = 0; if ( W >= 0 ) Wdepth = m_depth[W]; if ( X >= 0 ) Xdepth = m_depth[X]; // goto Xdeeper if X is deeper if ( Wdepth < Xdepth ) goto Xdeeper; // N's parent becomes A's parent m_parents [ N ] = AP; // A's parent becomes N m_parents [ A ] = N; // X's parent becomes A if ( X >= 0 ) m_parents [ X ] = A; // A's parents kid becomes N if ( AP >= 0 ) { if ( left [ AP ] == A ) left [ AP ] = N; else right [ AP ] = N; } // if A had no parent, it was the headNode else { //fprintf(stderr,"changing head node from %" PRId32" to %" PRId32"\n", //m_headNode,N); m_headNode = N; } // N's right kid becomes A right [ N ] = A; // A's left kid becomes X left [ A ] = X; // . compute A's depth from it's X and B kids // . it should be one less if Xdepth smaller than Wdepth // . might set m_depth[A] to computeDepth_unlocked(A) if we have problems if ( Xdepth < Wdepth ) m_depth [ A ] -= 2; else m_depth [ A ] -= 1; // N gains a depth iff W and X were of equal depth if ( Wdepth == Xdepth ) m_depth [ N ] += 1; // now we're done, return the new pivot that replaced A return N; // come here if X is deeper Xdeeper: // X's parent becomes A's parent m_parents [ X ] = AP; // A's parent becomes X m_parents [ A ] = X; // N's parent becomes X m_parents [ N ] = X; // Q's parent becomes N if ( Q >= 0 ) m_parents [ Q ] = N; // T's parent becomes A if ( T >= 0 ) m_parents [ T ] = A; // A's parent's kid becomes X if ( AP >= 0 ) { if ( left [ AP ] == A ) left [ AP ] = X; else right [ AP ] = X; } // if A had no parent, it was the headNode else { //fprintf(stderr,"changing head node2 from %" PRId32" to %" PRId32"\n", //m_headNode,X); m_headNode = X; } // A's left kid becomes T left [ A ] = T; // N's right kid becomes Q right [ N ] = Q; // X's left kid becomes N left [ X ] = N; // X's right kid becomes A right [ X ] = A; // X's depth increases by 1 since it gained 1 level of 2 new kids m_depth [ X ] += 1; // N's depth decreases by 1 m_depth [ N ] -= 1; // A's depth decreases by 2 m_depth [ A ] -= 2; // now we're done, return the new pivot that replaced A return X; } // . depth of subtree with i as the head node // . includes i, so minimal depth is 1 int32_t RdbTree::computeDepth_unlocked(int32_t i) const { m_rwlock.verify_is_locked(); int32_t leftDepth = 0; int32_t rightDepth = 0; if ( m_left [i] >= 0 ) leftDepth = m_depth [ m_left [i] ] ; if ( m_right[i] >= 0 ) rightDepth = m_depth [ m_right[i] ] ; // . get the new depth for node i // . add 1 cuz we include ourself in our m_depth if ( leftDepth > rightDepth ) return leftDepth + 1; else return rightDepth + 1; } int32_t RdbTree::getMemAllocated() const { ScopedReadLock sl(m_rwlock); return m_memAllocated; } int32_t RdbTree::getMemOccupied() const { ScopedReadLock sl(m_rwlock); return m_memOccupied; } bool RdbTree::is90PercentFull() const { ScopedReadLock sl(m_rwlock); // . m_memOccupied is amount of alloc'd mem that data occupies // . now we /90 and /100 since multiplying overflowed return ( m_numUsedNodes/90 >= m_numNodes/100 ); } #define BLOCK_SIZE 10000 // . caller should call f->set() himself // . we'll open it here // . returns false if blocked, true otherwise // . sets g_errno on error bool RdbTree::fastSave(const char *dir, bool useThread, void *state, void (*callback)(void *)) { ScopedReadLock sl(m_rwlock); logTrace(g_conf.m_logTraceRdbTree, "BEGIN. dir=%s", dir); if (g_conf.m_readOnlyMode) { logTrace(g_conf.m_logTraceRdbTree, "END. Read only mode. Returning true."); return true; } // we do not need a save if (!m_needsSave) { logTrace(g_conf.m_logTraceRdbTree, "END. Don't need to save. Returning true."); return true; } // return true if already in the middle of saving bool isSaving = m_isSaving.exchange(true); if (isSaving) { logTrace(g_conf.m_logTraceRdbTree, "END. Is already saving. Returning false."); return false; } // note it logf(LOG_INFO, "db: Saving %s%s-saved.dat", dir, m_dbname); // save parms strncpy(m_dir, dir, sizeof(m_dir)-1); m_dir[sizeof(m_dir) - 1] = '\0'; m_state = state; m_callback = callback; // assume no error m_errno = 0; if (useThread) { // make this a thread now if (g_jobScheduler.submit(saveWrapper, saveDoneWrapper, this, thread_type_unspecified_io, 1/*niceness*/)) { return false; } // if it failed if (g_jobScheduler.are_new_jobs_allowed()) { log(LOG_WARN, "db: Thread creation failed. Blocking while saving tree. Hurts performance."); } } sl.unlock(); // no threads saveWrapper(this); saveDoneWrapper(this, job_exit_normal); logTrace(g_conf.m_logTraceRdbTree, "END. Returning true."); // we did not block return true; } void RdbTree::saveWrapper ( void *state ) { logTrace(g_conf.m_logTraceRdbTree, "BEGIN"); // get this class RdbTree *that = (RdbTree *)state; ScopedReadLock sl(that->getLock()); // assume no error since we're at the start of thread call that->m_errno = 0; // this returns false and sets g_errno on error that->fastSave_unlocked(); if (g_errno && !that->m_errno) { that->m_errno = g_errno; } if (that->m_errno) { log(LOG_ERROR, "db: Had error saving tree to disk for %s: %s.", that->m_dbname, mstrerror(that->m_errno)); } else { log(LOG_INFO, "db: Done saving %s with %" PRId32" keys (%" PRId64" bytes)", that->m_dbname, that->m_numUsedNodes, that->m_bytesWritten); } // . resume adding to the tree // . this will also allow other threads to be queued // . if we did this at the end of the thread we could end up with // an overflow of queued SAVETHREADs that->m_isSaving = false; // we do not need to be saved now? that->m_needsSave = false; logTrace(g_conf.m_logTraceRdbTree, "END"); } /// @todo ALC cater for when exit_type != job_exit_normal // we come here after thread exits void RdbTree::saveDoneWrapper(void *state, job_exit_t exit_type) { logTrace(g_conf.m_logTraceRdbTree, "BEGIN"); // get this class RdbTree *that = (RdbTree *)state; // store save error into g_errno g_errno = that->m_errno; // . call callback if ( that->m_callback ) { that->m_callback ( that->m_state ); } logTrace(g_conf.m_logTraceRdbTree, "END"); } // . returns false and sets g_errno on error // . NO USING g_errno IN A DAMN THREAD!!!!!!!!!!!!!!!!!!!!!!!!! bool RdbTree::fastSave_unlocked() { m_rwlock.verify_is_locked(); if ( g_conf.m_readOnlyMode ) return true; /// @todo ALC we should probably use BigFile now. don't think g_errno being messed up is a good reason // cannot use the BigFile class, since we may be in a thread and it messes with g_errno char s[1024]; sprintf ( s , "%s/%s-saving.dat", m_dir , m_dbname ); int fd = ::open ( s , O_RDWR | O_CREAT | O_TRUNC , getFileCreationFlags() ); if ( fd < 0 ) { m_errno = errno; log( LOG_ERROR, "db: Could not open %s for writing: %s.", s, mstrerror( errno ) ); return false; } // verify the tree if (g_conf.m_verifyWrites) { for (;;) { log("db: verify writes is enabled, checking tree before saving."); if (!checkTree_unlocked(false, true)) { log(LOG_WARN, "db: fixing tree and re-checking"); fixTree_unlocked(); continue; } break; } } // clear our own errno errno = 0; // . save the header // . force file head to the 0 byte in case offset was elsewhere int64_t offset = 0; int64_t br = 0; static const bool doBalancing = true; br += pwrite ( fd , &m_numNodes , 4 , offset ); offset += 4; br += pwrite ( fd , &m_fixedDataSize , 4 , offset ); offset += 4; br += pwrite ( fd , &m_numUsedNodes , 4 , offset ); offset += 4; br += pwrite ( fd , &m_headNode , 4 , offset ); offset += 4; br += pwrite ( fd , &m_nextNode , 4 , offset ); offset += 4; br += pwrite ( fd , &m_minUnusedNode , 4 , offset ); offset += 4; br += pwrite ( fd , &doBalancing , sizeof(doBalancing) , offset); offset += sizeof(doBalancing); br += pwrite ( fd , &m_ownData , sizeof(m_ownData) , offset); offset += sizeof(m_ownData); // bitch on error if ( br != offset ) { m_errno = errno; close ( fd ); log(LOG_WARN, "db: Failed to save tree1 for %s: %s.", m_dbname,mstrerror(errno)); return false; } // position to store into m_keys, ... int32_t start = 0; // save tree in block units while ( start < m_minUnusedNode ) { // . returns number of nodes, starting at node #i, saved // . returns -1 and sets errno on error int32_t bytesWritten = fastSaveBlock_unlocked(fd, start, offset) ; // returns -1 on error if ( bytesWritten < 0 ) { close ( fd ); m_errno = errno; log(LOG_WARN, "db: Failed to save tree2 for %s: %s.", m_dbname,mstrerror(errno)); return false; } // point to next block to save to start += BLOCK_SIZE; // and advance the file offset offset += bytesWritten; } // remember total bytes written m_bytesWritten = offset; // close it up close ( fd ); // now rename it char s2[1024]; sprintf ( s2 , "%s/%s-saved.dat", m_dir , m_dbname ); if( ::rename(s, s2) == -1 ) { logError("Error renaming file [%s] to [%s] (%d: %s)", s, s2, errno, mstrerror(errno)); } return true; } // return bytes written int32_t RdbTree::fastSaveBlock_unlocked(int fd, int32_t start, int64_t offset) { m_rwlock.verify_is_locked(); // save offset int64_t oldOffset = offset; // . just save each one right out, even if empty // because the empty's have a linked list in m_right[] // . set # n int32_t n = BLOCK_SIZE; // don't over do it if ( start + n > m_minUnusedNode ) n = m_minUnusedNode - start; errno = 0; int64_t br = 0; // write the block br += pwrite ( fd,&m_collnums[start], n * sizeof(collnum_t) , offset ); offset += n * sizeof(collnum_t); br += pwrite ( fd , &m_keys [start*m_ks] , n * m_ks , offset ); offset += n * m_ks; br += pwrite(fd, &m_left [start] , n * 4 , offset ); offset += n * 4; br += pwrite(fd, &m_right [start] , n * 4 , offset ); offset += n * 4; br += pwrite(fd, &m_parents[start] , n * 4 , offset ); offset += n * 4; br += pwrite ( fd , &m_depth[start] , n , offset ); offset += n ; if ( m_fixedDataSize == -1 ) { br += pwrite ( fd , &m_sizes[start] , n*4, offset ); offset += n*4; } // bitch on error if ( br != offset - oldOffset ) { log(LOG_WARN, "db: Failed to save tree3 for %s (%" PRId64"!=%" PRId64"): %s.", m_dbname, br, offset, mstrerror(errno)); return -1; } // if no data to write then return bytes written this call if ( m_fixedDataSize == 0 ) return offset - oldOffset ; // debug count //int32_t count = 0; // define ending node for all loops int32_t end = start + n ; // now we have to dump out all the records for ( int32_t i = start ; i < end ; i++ ) { // skip if empty if ( m_parents[i] == -2 ) continue; // write variable sized nodes if ( m_fixedDataSize == -1 ) { if ( m_sizes[i] <= 0 ) continue; pwrite ( fd , m_data[i] , m_sizes[i] , offset ); offset += m_sizes[i]; continue; } // write fixed sized nodes pwrite ( fd , m_data[i] , m_fixedDataSize , offset ); offset += m_fixedDataSize; } // return bytes written return offset - oldOffset; } // . caller should call f->set() himself // . we'll open it here // . returns false and sets g_errno on error (sometimes g_errno not set) bool RdbTree::fastLoad(BigFile *f, RdbMem *stack) { ScopedWriteLock sl(m_rwlock); log( LOG_INIT, "db: Loading %s.", f->getFilename() ); // open it up if ( ! f->open ( O_RDONLY ) ) { log( LOG_ERROR, "db: open failed" ); return false; } int32_t fsize = f->getFileSize(); // init offset int64_t offset = 0; // 16 byte header int32_t header = 4*6 + 1 + sizeof(m_ownData); // file size must be a min of "header" if ( fsize < header ) { f->close(); g_errno=EBADFILE; log( LOG_ERROR, "db: file size smaller than header" ); return false; } // get # of nodes in the tree int32_t n , fixedDataSize , numUsedNodes ; bool doBalancing , ownData ; int32_t headNode , nextNode , minUnusedNode; // force file head to the 0 byte in case offset was elsewhere f->read ( &n , 4 , offset ); offset += 4; f->read ( &fixedDataSize , 4 , offset ); offset += 4; f->read ( &numUsedNodes , 4 , offset ); offset += 4; f->read ( &headNode , 4 , offset ); offset += 4; f->read ( &nextNode , 4 , offset ); offset += 4; f->read ( &minUnusedNode , 4 , offset ); offset += 4; f->read ( &doBalancing , sizeof(doBalancing) , offset ); offset += sizeof(doBalancing); f->read ( &ownData , sizeof(m_ownData ) , offset ) ; offset += sizeof(m_ownData); // return false on read error if ( g_errno ) { f->close(); log( LOG_ERROR, "db: read error: %s", mstrerror( g_errno ) ); return false; } // parms check if ( m_fixedDataSize != fixedDataSize || !doBalancing || m_ownData != ownData ) { f->close(); log(LOG_LOGIC,"db: rdbtree: fastload: Bad parms. File " "may be corrupt or a key attribute was changed in " "the code and is not reflected in this file."); return false; } // make sure size it right again int32_t nodeSize = (sizeof(collnum_t)+m_ks+4+4+4); int32_t minFileSize = header + minUnusedNode * nodeSize; minFileSize += minUnusedNode ; if ( fixedDataSize == -1 ) minFileSize += minUnusedNode * 4 ; //if ( fixedDataSize > 0 ) minFileSize += minUnusedNode *fixedDataSize; // if no data, sizes much match exactly if ( fixedDataSize == 0 && fsize != minFileSize ) { g_errno = EBADFILE; log( LOG_ERROR, "db: File size of %s is %" PRId32", should be %" PRId32". File may be corrupted.", f->getFilename(),fsize,minFileSize); f->close(); return false; } // does it fit? if ( fsize < minFileSize ) { g_errno = EBADFILE; log( LOG_ERROR, "db: File size of %s is %" PRId32", should >= %" PRId32". File may be corrupted.", f->getFilename(),fsize,minFileSize); f->close(); return false; } // make room if we don't have any if ( m_numNodes < minUnusedNode ) { log( LOG_INIT, "db: Growing tree to make room for %s", f->getFilename() ); if (!growTree_unlocked(minUnusedNode)) { f->close(); log( LOG_ERROR, "db: Failed to grow tree" ); return false; } } // we'll read this many int32_t start = 0; // reset corruption count m_corrupt = 0; // read block by block while ( start < minUnusedNode ) { // . returns next place to start scan // . incs m_numPositive/NegativeKeys and m_numUsedNodes // . incs m_memAllocated and m_memOccupied int32_t bytesRead = fastLoadBlock_unlocked(f, start, minUnusedNode, stack, offset) ; if ( bytesRead < 0 ) { f->close(); g_errno = errno; return false; } // inc the start start += BLOCK_SIZE; // and the offset offset += bytesRead; } // print corruption if ( m_corrupt ) { log( LOG_WARN, "admin: Loaded %" PRId32" corrupted recs in tree for %s.", m_corrupt, m_dbname ); } // remember total bytes read m_bytesRead = offset; // set these m_headNode = headNode; m_nextNode = nextNode; m_minUnusedNode = minUnusedNode; // check it if ( !checkTree_unlocked(false, true) ) { return fixTree_unlocked(); } // no longer needs save m_needsSave = false; return true; } // . return bytes loaded // . returns -1 and sets g_errno on error int32_t RdbTree::fastLoadBlock_unlocked(BigFile *f, int32_t start, int32_t totalNodes, RdbMem *stack, int64_t offset) { m_rwlock.verify_is_write_locked(); // set # ndoes to read int32_t n = totalNodes - start; if ( n > BLOCK_SIZE ) n = BLOCK_SIZE; // debug msg //log("reading block at %" PRId64", %" PRId32" nodes", // f->m_currentOffset, n ); int64_t oldOffset = offset; // . copy them in // . start reading at beginning of file f->read ( &m_collnums[start], n * sizeof(collnum_t) , offset ); offset += n * sizeof(collnum_t); f->read ( &m_keys [start*m_ks] , n * m_ks , offset ); offset += n * m_ks; f->read ( &m_left [start] , n * 4 , offset ); offset += n * 4; f->read ( &m_right [start] , n * 4 , offset ); offset += n * 4; f->read ( &m_parents[start] , n * 4 , offset ); offset += n * 4; f->read ( &m_depth[start] , n , offset ); offset += n; if ( m_fixedDataSize == -1 ) { f->read ( &m_sizes[start] , n * 4 , offset); offset += n * 4; } // return false on read error if ( g_errno ) { log( LOG_ERROR, "db: Failed to read %s: %s.", f->getFilename(), mstrerror(g_errno) ); return -1; } // get valid collnum ranges int32_t max = g_collectiondb.getNumRecs(); // sanity check //if ( max >= MAX_COLLS ) { g_process.shutdownAbort(true); } // define ending node for all loops int32_t end = start + n ; // store into tree in the appropriate nodes for ( int32_t i = start ; i < end ; i++ ) { // skip if empty if ( m_parents[i] == -2 ) continue; // watch out for bad collnums... corruption... collnum_t c = m_collnums[i]; if ( c < 0 || c >= max ) { m_corrupt++; continue; } // must have rec as well. unless it its statsdb tree // or m_waitingTree which are collection-less and always use // 0 for their collnum. if collection-less m_rdbId==-1. if ( ! g_collectiondb.getRec(c) && m_rdbId >= 0 ) { m_corrupt++; continue; } // keep a tally on all this m_memOccupied += m_overhead; increaseNodeCount_unlocked(c, getKey_unlocked(i)); } // bail now if we can if ( m_fixedDataSize == 0 ) return offset - oldOffset ; // how much should we read? int32_t bufSize = 0; if ( m_fixedDataSize == -1 ) { for ( int32_t i = start ; i < end ; i++ ) if ( m_parents[i] != -2 ) bufSize += m_sizes[i]; } else if ( m_fixedDataSize > 0 ) { for ( int32_t i = start ; i < end ; i++ ) if ( m_parents[i] != -2 ) bufSize += m_fixedDataSize; } // get space char *buf = (char *) stack->allocData(bufSize); if ( ! buf ) { log( LOG_ERROR, "db: Failed to allocate %" PRId32" bytes to read %s. Increase tree size for it in gb.conf.", bufSize,f->getFilename()); return -1; } // debug //log("reading %" PRId32" bytes of raw rec data", bufSize ); // establish end point char *bufEnd = buf + bufSize; // . read all into that buf // . this should block since callback is NULL f->read ( buf , bufSize , offset ) ; // return false on read error if ( g_errno ) return -1; // advance file offset offset += bufSize; // part it out now int32_t size = m_fixedDataSize; for ( int32_t i = start ; i < end ; i++ ) { // skip unused if ( m_parents[i] == -2 ) continue; // get size of his data if it's variable if ( m_fixedDataSize == -1 ) size = m_sizes[i]; // ensure we have the room if ( buf + size > bufEnd ) { g_errno = EBADFILE; log( LOG_ERROR, "db: Encountered record with corrupted size parameter of %" PRId32" in %s.", size, f->getFilename() ); return -1; } m_data[i] = buf; buf += size; // update these m_memAllocated += size; m_memOccupied += size; } return offset - oldOffset ; } // . caller should call f->set() himself // . we'll open it here // . returns false and sets g_errno on error (sometimes g_errno not set) void RdbTree::cleanTree() { ScopedWriteLock sl(m_rwlock); // some trees always use 0 for all node collnum_t's like // statsdb, waiting tree etc. if ( m_rdbId < 0 ) return; // the liberation count int32_t count = 0; collnum_t collnum; int32_t max = g_collectiondb.getNumRecs(); for ( int32_t i = 0 ; i < m_minUnusedNode ; i++ ) { // skip node if parents is -2 (unoccupied) if ( m_parents[i] == -2 ) continue; // is collnum valid? if ( m_collnums[i] >= 0 && m_collnums[i] < max && g_collectiondb.getRec(m_collnums[i]) ) continue; // if it is negtiave, remove it, that is wierd corruption if ( m_collnums[i] < 0 ) deleteNode_unlocked(i, true); // remove it otherwise // don't actually remove it!!!! in case collection gets // moved accidentally. // no... otherwise it can clog up the tree forever!!!! deleteNode_unlocked(i, true); count++; // save it collnum = m_collnums[i]; } // print it if ( count == 0 ) return; log(LOG_LOGIC,"db: Removed %" PRId32" records from %s tree for invalid " "collection number %i.",count,m_dbname,collnum); //log(LOG_LOGIC,"db: Records not actually removed for safety. Except " // "for those with negative colnums."); static bool s_print = true; if ( ! s_print ) return; s_print = false; log (LOG_LOGIC,"db: This is bad. Did you remove a collection " "subdirectory? Don't do that, you should use the \"delete " "collections\" interface because it also removes records from " "memory, too."); } bool RdbTree::isEmpty() const { ScopedReadLock sl(m_rwlock); return isEmpty_unlocked(); } bool RdbTree::isEmpty_unlocked() const { m_rwlock.verify_is_locked(); return (m_numUsedNodes == 0); } int32_t RdbTree::getNumNodes() const { ScopedReadLock sl(m_rwlock); return getNumNodes_unlocked(); } int32_t RdbTree::getNumNodes_unlocked() const { m_rwlock.verify_is_locked(); return m_minUnusedNode; } int32_t RdbTree::getNumUsedNodes() const { ScopedReadLock sl(m_rwlock); return getNumUsedNodes_unlocked(); } int32_t RdbTree::getNumUsedNodes_unlocked() const { m_rwlock.verify_is_locked(); return m_numUsedNodes; } int32_t RdbTree::getNumAvailNodes() const { ScopedReadLock sl(m_rwlock); return m_numNodes - m_numUsedNodes; } int32_t RdbTree::getNumNegativeKeys() const { ScopedReadLock sl(m_rwlock); return m_numNegativeKeys; } int32_t RdbTree::getNumPositiveKeys() const { ScopedReadLock sl(m_rwlock); return m_numPositiveKeys; } int32_t RdbTree::getNumNegativeKeys(collnum_t collnum) const { ScopedReadLock sl(m_rwlock); // fix for collectionless rdbs if ( m_rdbId < 0 ) return m_numNegativeKeys; /// @todo ALC thread safety? CollectionRec *cr = g_collectiondb.getRec(collnum); if ( ! cr ) return 0; return cr->m_numNegKeysInTree[(unsigned char)m_rdbId]; } int32_t RdbTree::getNumPositiveKeys(collnum_t collnum) const { ScopedReadLock sl(m_rwlock); // fix for collectionless rdbs if ( m_rdbId < 0 ) return m_numPositiveKeys; /// @todo ALC thread safety? CollectionRec *cr = g_collectiondb.getRec(collnum); if ( ! cr ) return 0; return cr->m_numPosKeysInTree[(unsigned char)m_rdbId]; }