Move add key logic into RdbIndex::addRecord

This commit is contained in:
Ai Lin Chia
2016-08-17 17:42:07 +02:00
parent aa599cd59d
commit 4639be504c
2 changed files with 60 additions and 114 deletions

@ -6,16 +6,39 @@
#include "Process.h"
#include "BitOperations.h"
#include "Conf.h"
#include <iterator>
RdbIndex::RdbIndex() {
reset();
RdbIndex::RdbIndex()
: m_file()
, m_docIds()
, m_fixedDataSize(0)
, m_useHalfKeys(false)
, m_ks(0)
, m_rdbId(RDB_NONE)
, m_prevDocId(MAX_DOCID + 1)
, m_needToSort(false)
, m_startSortPos(0)
, m_sortCount(0)
, m_needToWrite(false) {
m_docIds.reserve(20000000);
}
// dont save index on deletion!
RdbIndex::~RdbIndex() {
reset();
}
void RdbIndex::reset() {
m_file.reset();
m_docIds.clear();
m_docIds.reserve(20000000);
m_prevDocId = MAX_DOCID + 1;
m_needToSort = false;
m_startSortPos = 0;
m_sortCount = 0;
m_needToWrite = false;
}
/// @todo collapse RdbIndex::set into constructor
void RdbIndex::set(const char *dir, const char *indexFilename,
int32_t fixedDataSize , bool useHalfKeys , char keySize, char rdbId) {
logTrace(g_conf.m_logTraceRdbIndex, "BEGIN. dir [%s], indexFilename [%s]", dir, indexFilename);
@ -43,22 +66,6 @@ bool RdbIndex::close(bool urgent) {
return status;
}
void RdbIndex::reset() {
//@todo: IMPLEMENT!
// log( LOG_ERROR,"%s:%s: NOT IMPLEMENTED YET", __FILE__, __func__);
m_lastDocId = MAX_DOCID + 1;
m_needToWrite = false;
m_docIds.clear();
//@@@ free mem here
m_file.reset();
}
bool RdbIndex::writeIndex() {
logTrace(g_conf.m_logTraceRdbIndex, "BEGIN. filename [%s]", m_file.getFilename());
@ -80,7 +87,6 @@ bool RdbIndex::writeIndex() {
return false;
}
// write index data
bool status = writeIndex2();
@ -89,7 +95,6 @@ bool RdbIndex::writeIndex() {
m_needToWrite = false;
}
logTrace(g_conf.m_logTraceRdbIndex, "END. filename [%s], returning %s", m_file.getFilename(), status ? "true" : "false");
return status;
@ -107,7 +112,7 @@ bool RdbIndex::writeIndex2() {
size_t docid_count = m_docIds.size();
m_file.write(&docid_count, sizeof(docid_count), offset);
if ( g_errno ) {
if (g_errno) {
logError("Failed to write to %s (docid_count): %s", m_file.getFilename(), mstrerror(g_errno))
return false;
}
@ -115,7 +120,7 @@ bool RdbIndex::writeIndex2() {
offset += sizeof(docid_count);
m_file.write(&m_docIds[0], docid_count * sizeof(m_docIds[0]), offset);
if ( g_errno ) {
if (g_errno) {
logError("Failed to write to %s (docids): %s", m_file.getFilename(), mstrerror(g_errno))
return false;
}
@ -184,28 +189,41 @@ bool RdbIndex::readIndex2() {
}
bool RdbIndex::addRecord(char rdbId, char *key) {
if (rdbId == RDB_POSDB || rdbId == RDB2_POSDB2) {
void RdbIndex::addRecord(char *key) {
if (m_rdbId == RDB_POSDB || m_rdbId == RDB2_POSDB2) {
if (key[0] & 0x02 || !(key[0] & 0x04)) {
//it is a 12-byte docid+pos or 18-byte termid+docid+pos key
uint64_t doc_id = extract_bits(key, 58, 96);
if (doc_id != m_lastDocId) {
log(LOG_ERROR, "@@@ GOT DocId %" PRIu64 "", doc_id);
if (doc_id != m_prevDocId) {
m_docIds.push_back(doc_id);
m_prevDocId = doc_id;
m_lastDocId = doc_id;
//@todo: IMPLEMENT!
logError("ADD TO INDEX - NOT IMPLEMENTED YET");
m_needToSort = true;
++m_sortCount;
m_needToWrite = true;
}
}
} else {
logError("Not implemented for dbname=%s", getDbnameFromId(rdbId));
logError("Not implemented for dbname=%s", getDbnameFromId(m_rdbId));
gbshutdownLogicError();
}
return true;
// make sure our docids don't get too large
if (m_sortCount >= 20000000) {
auto startIt = std::next(m_docIds.begin(), m_startSortPos);
std::sort(startIt, m_docIds.end());
m_docIds.erase(std::unique(startIt, m_docIds.end()), m_docIds.end());
// do a full sort in this case
if (m_docIds.size() >= 20000000) {
std::sort(m_docIds.begin(), m_docIds.end());
m_docIds.erase(std::unique(m_docIds.begin(), m_docIds.end()), m_docIds.end());
}
m_sortCount = 0;
m_startSortPos = m_docIds.size() - 1;
}
}
@ -254,48 +272,14 @@ bool RdbIndex::generateIndex(RdbBuckets *buckets, collnum_t collnum) {
return false;
}
int64_t total = 0;
uint64_t count = 0;
char key[MAX_KEY_BYTES];
m_docIds.reserve(20000000);
for (list.resetListPtr(); !list.isExhausted(); list.skipCurrentRecord()) {
// make sure our docids don't get too large
if (count >= 20000000) {
std::sort(m_docIds.begin(), m_docIds.end());
m_docIds.erase(std::unique(m_docIds.begin(), m_docIds.end()), m_docIds.end());
count = 0;
}
list.getCurrentKey(key);
if (m_rdbId == RDB_POSDB || m_rdbId == RDB2_POSDB2) {
if (key[0] & 0x02 || !(key[0] & 0x04)) {
//it is a 12-byte docid+pos or 18-byte termid+docid+pos key
static uint64_t prev_docid = 0;
uint64_t docid = extract_bits(key, 58, 96);
if (prev_docid != docid) {
m_docIds.push_back(docid);
++total;
++count;
}
prev_docid = docid;
}
}
addRecord(key);
}
if (!m_docIds.empty()) {
std::sort(m_docIds.begin(), m_docIds.end());
m_docIds.erase(std::unique(m_docIds.begin(), m_docIds.end()), m_docIds.end());
m_needToWrite = true;
}
//logError("TODO NOT IMPLEMENTED YET");
return true;
}
@ -370,8 +354,6 @@ bool RdbIndex::generateIndex(BigFile *f) {
m_docIds.reserve(20000000);
uint64_t docid = 0;
int64_t total = 0;
uint64_t count = 0;
// read in at most "bufSize" bytes with each read
readLoop:
@ -384,13 +366,6 @@ readLoop:
next += 500000000; // 500MB
}
// make sure our docids don't get too large
if (count >= 20000000) {
std::sort(m_docIds.begin(), m_docIds.end());
m_docIds.erase(std::unique(m_docIds.begin(), m_docIds.end()), m_docIds.end());
count = 0;
}
// our reads should always block
int64_t readSize = fileSize - offset;
if ( readSize > bufSize ) {
@ -500,39 +475,7 @@ nextRec:
goto readLoop;
}
if (m_rdbId == RDB_POSDB || m_rdbId == RDB2_POSDB2) {
if (key[0] & 0x02 || !(key[0] & 0x04)) {
//it is a 12-byte docid+pos or 18-byte termid+docid+pos key
static uint64_t prev_docid = 0;
docid = extract_bits(key, 58, 96);
//logf(LOG_DEBUG, "%lu", docid);
if (prev_docid != docid) {
// result = unique_docids_set.insert(docid);
// if (result.second) {
m_docIds.push_back(docid);
++total;
++count;
// }
}
prev_docid = docid;
}
}
// if (!addRecord(key, rec, recSize)) {
// // if it was key out of order, it might be because the
// // power went out and we ended up writing a a few bytes of
// // garbage then a bunch of 0's at the end of the file.
// // if the truncate works out then we are done.
// if (g_errno == ECORRUPTDATA && truncateFile(f)) {
// goto done;
// }
//
// // otherwise, give it up
// mfree(buf, bufSize, "RdbIndex");
// log(LOG_WARN, "db: Index generation failed: %s.", mstrerror(g_errno));
// return false;
// }
addRecord(key);
// skip current good record now
if (list.skipCurrentRecord()) {
@ -558,8 +501,6 @@ done:
m_needToWrite = true;
}
logf(LOG_DEBUG, "@@@ totalRec=%lu unique=%lu", total, m_docIds.size());
// // if there was bad data we probably added out of order keys
// if (m_needVerify) {
// logError("Fixing map for [%s]. Added at least %" PRId64" bad keys.", f->getFilename(), m_badKeys);

@ -54,7 +54,7 @@ public:
bool generateIndex(RdbBuckets *buckets, collnum_t collnum);
bool generateIndex(RdbTree *tree, collnum_t collnum);
bool addRecord ( char rdbId, char *key);
void addRecord(char *key);
private:
void printIndex();
@ -69,7 +69,12 @@ private:
char m_ks;
char m_rdbId;
uint64_t m_lastDocId;
uint64_t m_prevDocId;
bool m_needToSort;
auto m_startSortPos;
unsigned m_sortCount;
// when close is called, must we write the index?
bool m_needToWrite;