privacore-open-source-searc.../RdbIndex.cpp
2018-08-31 12:11:16 +02:00

755 lines
21 KiB
C++

#include "RdbIndex.h"
#include "BigFile.h"
#include "Titledb.h" // For MAX_DOCID
#include "Process.h"
#include "BitOperations.h"
#include "Conf.h"
#include "Mem.h"
#include <set>
#include <unordered_set>
#include <algorithm>
#include "RdbTree.h"
#include "RdbBuckets.h"
#include "JobScheduler.h"
#include "ScopedLock.h"
#include "Errno.h"
#include "fctypes.h"
#include <fcntl.h>
#include <iterator>
// flush pending docIds into the main index at least this often (ms)
static const int32_t s_defaultMaxPendingTimeMs = 5000;
// normal operation: merge once this many pending docIds have accumulated
static const uint32_t s_defaultMaxPendingSize = 2000000;
// normal operation: capacity reserved for the pending-docIds vector
static const uint32_t s_defaultReserveSize = 3000000;
// larger number equals more memory used; but faster generateIndex
// 10000000 * 8 bytes = ~80 megabytes
static const uint32_t s_generateMaxPendingSize = 10000000;
static const uint32_t s_generateReserveSize = 11000000;
// on-disk format version written/validated by writeIndex2/verifyIndex
static const int64_t s_rdbIndexCurrentVersion = 0;
// Construct an empty, unconfigured index (call set() before use).
// m_prevPendingDocId starts at MAX_DOCID + 1 — an impossible docid — so the
// consecutive-duplicate check in addRecord_unlocked() never matches the very
// first record added.
RdbIndex::RdbIndex()
: m_file()
, m_fixedDataSize(0)
, m_useHalfKeys(false)
, m_ks(0)
, m_rdbId(RDB_NONE)
, m_version(s_rdbIndexCurrentVersion)
, m_docIds(new docids_t)
, m_docIdsMtx()
, m_pendingMergeMtx()
, m_pendingMergeCond(PTHREAD_COND_INITIALIZER)
, m_pendingMerge(false)
, m_pendingDocIdsMtx()
, m_pendingDocIds(new docids_t)
, m_prevPendingDocId(MAX_DOCID + 1)
, m_lastMergeTime(gettimeofdayInMilliseconds())
, m_needToWrite(false)
, m_registeredCallback(false)
, m_generatingIndex(false) {
}
// dont save index on deletion!
RdbIndex::~RdbIndex() {
if (m_registeredCallback) {
g_loop.unregisterSleepCallback(this, &timedMerge);
}
// wait for any in-flight background merge job (see timedMerge /
// mergePendingDocIds) to finish before our members are destroyed
ScopedLock sl(m_pendingMergeMtx);
while (m_pendingMerge) { // spurious wakeup
pthread_cond_wait(&m_pendingMergeCond, &(m_pendingMergeMtx.mtx));
}
}
void RdbIndex::reset(bool isStatic) {
m_file.reset();
/// @todo ALC do we need to lock here?
m_docIds.reset(new docids_t);
m_pendingDocIds.reset(new docids_t);
if (!isStatic) {
m_pendingDocIds->reserve(m_generatingIndex ? s_generateReserveSize : s_defaultReserveSize);
}
m_prevPendingDocId = MAX_DOCID + 1;
m_lastMergeTime = gettimeofdayInMilliseconds();
m_needToWrite = false;
}
void RdbIndex::clear() {
ScopedLock sl(m_pendingDocIdsMtx);
m_docIds.reset(new docids_t);
m_pendingDocIds.reset(new docids_t);
m_pendingDocIds->reserve(m_generatingIndex ? s_generateReserveSize : s_defaultReserveSize);
m_prevPendingDocId = MAX_DOCID + 1;
m_lastMergeTime = gettimeofdayInMilliseconds();
m_needToWrite = false;
}
void RdbIndex::timedMerge(int /*fd*/, void *state) {
RdbIndex *index = static_cast<RdbIndex*>(state);
ScopedLock sl(index->m_pendingMergeMtx);
// make sure there is only a single merge job at one time
if (index->m_pendingMerge || index->m_generatingIndex) {
return;
}
index->m_pendingMerge = g_jobScheduler.submit(mergePendingDocIds, NULL, state, thread_type_index_merge, 0);
}
// Background-job entry point (submitted by timedMerge).
// Merges pending docIds when either the size threshold or the time threshold
// has been reached, then clears m_pendingMerge and signals anyone waiting on
// the condition variable (e.g. the destructor).
void RdbIndex::mergePendingDocIds(void *state) {
RdbIndex *index = static_cast<RdbIndex*>(state);

ScopedLock sl(index->m_pendingDocIdsMtx);

// don't merge if it's empty
if (!index->m_pendingDocIds->empty()) {
// merge when enough docIds are pending, or enough time has passed
// since the last merge
if ((index->m_pendingDocIds->size() >= (index->m_generatingIndex ? s_generateMaxPendingSize : s_defaultMaxPendingSize)) ||
(gettimeofdayInMilliseconds() - index->m_lastMergeTime >= s_defaultMaxPendingTimeMs)) {
(void)index->mergePendingDocIds_unlocked();
}
}

// mark the job done and wake waiters; taken while still holding
// m_pendingDocIdsMtx, matching the lock order used elsewhere
ScopedLock sl2(index->m_pendingMergeMtx);
index->m_pendingMerge = false;
pthread_cond_signal(&(index->m_pendingMergeCond));
}
/// @todo ALC collapse RdbIndex::set into constructor
// Configure the index for a given rdb/file and reset all state.
void RdbIndex::set(const char *dir, const char *indexFilename, int32_t fixedDataSize, bool useHalfKeys,
                   char keySize, rdbid_t rdbId, bool isStatic) {
	logTrace(g_conf.m_logTraceRdbIndex, "BEGIN. dir [%s], indexFilename [%s]", dir, indexFilename);

	reset(isStatic);

	m_fixedDataSize = fixedDataSize;
	m_useHalfKeys = useHalfKeys;
	m_ks = keySize;
	m_rdbId = rdbId;
	m_file.set(dir, indexFilename);

	// static indexes never receive new records, so they don't need the
	// periodic background merge
	if (!isStatic) {
		m_registeredCallback = g_loop.registerSleepCallback(1000, this, &timedMerge, "RdbIndex::timedMerge");
	}
}
// Flush the index to disk (only if dirty) and tear down runtime state.
// When urgent is true the in-memory docIds are kept (no reset).
// Returns false if the final write failed.
bool RdbIndex::close(bool urgent) {
bool status = true;
if (m_needToWrite) {
status = writeIndex(true);
}

// clears and frees everything
if (!urgent) {
reset(true);
}

// stop the periodic background-merge callback registered in set()
if (m_registeredCallback) {
m_registeredCallback = false;
g_loop.unregisterSleepCallback(this, &timedMerge);
}

return status;
}
// Save the index to its file, truncating any previous contents.
// No-op (returning true) in read-only mode or when nothing changed since the
// last save. Returns false on open/write failure.
bool RdbIndex::writeIndex(bool finalWrite) {
logTrace(g_conf.m_logTraceRdbIndex, "BEGIN. filename [%s]", m_file.getFilename());

if (g_conf.m_readOnlyMode) {
logTrace(g_conf.m_logTraceRdbIndex, "END. Read-only mode, not writing index. filename [%s]. Returning true.",
m_file.getFilename());
return true;
}

if (!m_needToWrite) {
logTrace(g_conf.m_logTraceRdbIndex, "END. no need, not writing index. filename [%s]. Returning true.",
m_file.getFilename());
return true;
}

log(LOG_INFO, "db: Saving %s", m_file.getFilename());

// open a new file
if (!m_file.open(O_RDWR | O_CREAT | O_TRUNC)) {
logError("END. Could not open %s for writing: %s. Returning false.", m_file.getFilename(), mstrerror(g_errno));
return false;
}

// write index data
bool status = writeIndex2(finalWrite);

// writeIndex2 cleared m_needToWrite under lock; on failure set it again so
// a later save retries
if (!status) {
m_needToWrite = true;
}

logTrace(g_conf.m_logTraceRdbIndex, "END. filename [%s], returning %s", m_file.getFilename(), status ? "true" : "false");

return status;
}
// Write the on-disk layout: 8-byte version, 8-byte docid count, then the
// docid array. The pending docIds are merged first so the newest snapshot is
// written. Returns false on any write error (caller restores m_needToWrite).
bool RdbIndex::writeIndex2(bool finalWrite) {
logTrace(g_conf.m_logTraceRdbIndex, "BEGIN. filename [%s]", m_file.getFilename());

g_errno = 0;

int64_t offset = 0LL;

// make sure we always write the newest tree
// remove const as m_file.write does not accept const buffer
ScopedLock sl(m_pendingDocIdsMtx);
docids_ptr_t tmpDocIds = std::const_pointer_cast<docids_t>(mergePendingDocIds_unlocked(finalWrite));
// clear the dirty flag while still holding the lock; tmpDocIds keeps the
// snapshot alive after the lock is released
m_needToWrite = false;
sl.unlock();

// first 8 bytes is the index version
m_file.write(&m_version, sizeof(m_version), offset);
if (g_errno) {
logError("Failed to write to %s (m_version): %s", m_file.getFilename(), mstrerror(g_errno));
return false;
}
offset += sizeof(m_version);

// next 8 bytes are the total number of docids in the index file
size_t docid_count = tmpDocIds->size();
m_file.write(&docid_count, sizeof(docid_count), offset);
if (g_errno) {
logError("Failed to write to %s (docid_count): %s", m_file.getFilename(), mstrerror(g_errno));
return false;
}
offset += sizeof(docid_count);

// finally the docid array itself (skipped when empty)
if (docid_count) {
m_file.write(&(*tmpDocIds)[0], docid_count * sizeof((*tmpDocIds)[0]), offset);
if (g_errno) {
logError("Failed to write to %s (docids): %s", m_file.getFilename(), mstrerror(g_errno));
return false;
}
}

log(LOG_INFO, "db: Saved %zu index keys for %s", docid_count, getDbnameFromId(m_rdbId));

logTrace(g_conf.m_logTraceRdbIndex, "END - OK, returning true.");
return true;
}
// Load the index from disk into memory.
// Returns false when the file is missing, cannot be opened, or is corrupt.
bool RdbIndex::readIndex() {
	logTrace(g_conf.m_logTraceRdbIndex, "BEGIN. filename [%s]", m_file.getFilename());

	// bail if does not exist
	if (!m_file.doesExist()) {
		logError("Index file [%s] does not exist.", m_file.getFilename());
		logTrace(g_conf.m_logTraceRdbIndex, "END. Returning false");
		return false;
	}

	// open O_RDWR rather than O_RDONLY: if we are resuming a killed merge we
	// will add to this index and write it back out
	if (!m_file.open(O_RDWR)) {
		logError("Could not open index file %s for reading: %s.", m_file.getFilename(), mstrerror(g_errno));
		logTrace(g_conf.m_logTraceRdbIndex, "END. Returning false");
		return false;
	}

	const bool status = readIndex2();
	m_file.closeFds();

	logTrace(g_conf.m_logTraceRdbIndex, "END. Returning %s", status ? "true" : "false");
	return status;
}
bool RdbIndex::readIndex2() {
logTrace(g_conf.m_logTraceRdbIndex, "BEGIN. filename [%s]", m_file.getFilename());
g_errno = 0;
int64_t offset = 0;
size_t docid_count = 0;
// first 8 bytes is the index version
m_file.read(&m_version, sizeof(m_version), offset);
if (g_errno) {
logError("Had error reading offset=%" PRId64" from %s: %s", offset, m_file.getFilename(), mstrerror(g_errno));
return false;
}
offset += sizeof(m_version);
// next 8 bytes are the total number of docids in the index file
m_file.read(&docid_count, sizeof(docid_count), offset);
if (g_errno) {
logError("Had error reading offset=%" PRId64" from %s: %s", offset, m_file.getFilename(), mstrerror(g_errno));
return false;
}
offset += sizeof(docid_count);
docids_ptr_t tmpDocIds(new docids_t);
int64_t readSize = docid_count * sizeof((*tmpDocIds)[0]);
int64_t expectedFileSize = offset + readSize;
if (expectedFileSize != m_file.getFileSize()) {
logError("Index file size[%" PRId64"] differs from expected size[%" PRId64"]", m_file.getFileSize(), expectedFileSize);
return false;
}
tmpDocIds->resize(docid_count);
m_file.read(&(*tmpDocIds)[0], readSize, offset);
if (g_errno) {
logError("Had error reading offset=%" PRId64" from %s: %s", offset, m_file.getFilename(), mstrerror(g_errno));
return false;
}
logTrace(g_conf.m_logTraceRdbIndex, "END. Returning true with %zu docIds loaded", tmpDocIds->size());
// replace with new index
swapDocIds(tmpDocIds);
return true;
}
// Validate the version header loaded by readIndex2 against the version this
// build knows how to interpret.
bool RdbIndex::verifyIndex() {
	logTrace(g_conf.m_logTraceRdbIndex, "BEGIN. filename [%s]", m_file.getFilename());

	if (m_version == s_rdbIndexCurrentVersion) {
		logTrace(g_conf.m_logTraceRdbIndex, "END. Returning true");
		return true;
	}

	logTrace(g_conf.m_logTraceRdbIndex, "END. Index format have changed m_version=%" PRId64" currentVersion=%" PRId64". Returning false",
	         m_version, s_rdbIndexCurrentVersion);
	return false;
}
// Locking wrapper around mergePendingDocIds_unlocked().
docidsconst_ptr_t RdbIndex::mergePendingDocIds(bool finalWrite) {
	ScopedLock lock(m_pendingDocIdsMtx);
	return mergePendingDocIds_unlocked(finalWrite);
}
// Merge m_pendingDocIds into the sorted m_docIds snapshot and publish the
// result via swapDocIds(). Caller must hold m_pendingDocIdsMtx.
// When finalWrite is true, memory is shrunk/released instead of re-reserved.
// Returns the merged (possibly unchanged) docId vector.
docidsconst_ptr_t RdbIndex::mergePendingDocIds_unlocked(bool finalWrite) {
logTrace(g_conf.m_logTraceRdbIndex, "BEGIN %s[%p] finalWrite=%s", m_file.getFilename(), this, finalWrite ? "true" : "false");

// don't need to merge when there are no pending docIds
// except when it's forWrite then we need to free memory from vector
if (!finalWrite && m_pendingDocIds->empty()) {
logTrace(g_conf.m_logTraceRdbIndex, "END %s[%p]", m_file.getFilename(), this);
return getDocIds();
}

m_lastMergeTime = gettimeofdayInMilliseconds();

// compare/equate entries by docid only; the low bits (delete marker) are
// masked out so a positive and a negative key for the same docid collide
auto cmplt_fn = [](uint64_t a, uint64_t b) {
return (a & s_docIdMask) < (b & s_docIdMask);
};

auto cmpeq_fn = [](uint64_t a, uint64_t b) {
return (a & s_docIdMask) == (b & s_docIdMask);
};

// merge pending docIds into docIds
// stable_sort keeps insertion order among equal docids, so the newest
// pending entry for a docid ends up last in its run
std::stable_sort(m_pendingDocIds->begin(), m_pendingDocIds->end(), cmplt_fn);

docids_ptr_t tmpDocIds(new docids_t);
auto docIds = getDocIds();
// std::merge also places pending entries after existing ones for equal
// docids, preserving newest-last ordering
std::merge(docIds->begin(), docIds->end(), m_pendingDocIds->begin(), m_pendingDocIds->end(), std::back_inserter(*tmpDocIds), cmplt_fn);

// in reverse because we want to keep the newest entry
auto it = std::unique(tmpDocIds->rbegin(), tmpDocIds->rend(), cmpeq_fn);
tmpDocIds->erase(tmpDocIds->begin(), it.base());

if (finalWrite) {
// shrink memory usage
tmpDocIds->shrink_to_fit();
}

// replace existing even if size doesn't change (could change from positive to negative key)
swapDocIds(tmpDocIds);

if (finalWrite) {
// make sure memory is freed
m_pendingDocIds.reset(new docids_t);
} else {
m_pendingDocIds->clear();
m_pendingDocIds->reserve(m_generatingIndex ? s_generateReserveSize : s_defaultReserveSize);
}

logTrace(g_conf.m_logTraceRdbIndex, "END %s[%p]", m_file.getFilename(), this);

return getDocIds();
}
// Locking wrapper around addRecord_unlocked().
void RdbIndex::addRecord(const char *key) {
	ScopedLock lock(m_pendingDocIdsMtx);
	addRecord_unlocked(key);
}
// Extract the docid from a posdb key and queue it onto m_pendingDocIds.
// Caller must hold m_pendingDocIdsMtx. Only implemented for posdb keys;
// aborts for any other rdb.
void RdbIndex::addRecord_unlocked(const char *key) {
m_needToWrite = true;

if (m_rdbId == RDB_POSDB || m_rdbId == RDB2_POSDB2) {
// NOTE(review): key[0] bits 0x02/0x04 appear to encode the compressed
// key size (18/12/6 bytes); only keys carrying a docid are indexed —
// confirm against Posdb key layout
if (key[0] & 0x02 || !(key[0] & 0x04)) {
//it is a 12-byte docid+pos or 18-byte termid+docid+pos key
// low bit of the stored entry is 1 for a positive key, 0 for a delete
uint64_t keyneg = !KEYNEG(key);
uint64_t doc_id = ((extract_bits(key, 58, 96) << s_docIdOffset) | keyneg);
// suppress consecutive duplicates (same docid and same delete bit)
// to keep the pending vector small
if (doc_id != m_prevPendingDocId) {
m_pendingDocIds->push_back(doc_id);

m_prevPendingDocId = doc_id;
}
}
} else {
logError("Not implemented for dbname=%s", getDbnameFromId(m_rdbId));
gbshutdownLogicError();
}

// while generating an index (no background merge job runs then) merge
// inline once the pending vector gets large
if (m_generatingIndex && (m_pendingDocIds->size() >= s_generateMaxPendingSize)) {
(void)mergePendingDocIds_unlocked();
}
}
void RdbIndex::printIndex() {
auto docIds = getDocIds();
for (auto it = docIds->begin(); it != docIds->end(); ++it) {
logf(LOG_DEBUG, "inindex: docId=%" PRIu64" isDel=%d", (*it >> s_docIdOffset), ((*it & s_delBitMask) == 0));
}
ScopedLock sl(m_pendingDocIdsMtx);
for (auto it = m_pendingDocIds->begin(); it != m_pendingDocIds->end(); ++it) {
logf(LOG_DEBUG, "pending: docId=%" PRIu64" isDel=%d", (*it >> s_docIdOffset), ((*it & s_delBitMask) == 0));
}
}
// Feed every key in the list to addRecord_unlocked().
void RdbIndex::addList(RdbList *list) {
	// sanity check: key size must match the rdb we index
	if (list->getKeySize() != m_ks) {
		g_process.shutdownAbort(true);
	}

	// NOTE: do NOT reset the list. Because of a HACK in RdbDump.cpp we set
	// m_listPtrHi < m_list so the first key can be a half key; calling
	// resetListPtr() would clobber m_listPtrHi and corrupt parsing.

	// bail now if it's empty
	if (list->isEmpty()) {
		return;
	}

	char key[MAX_KEY_BYTES];

	ScopedLock lock(m_pendingDocIdsMtx);
	while (!list->isExhausted()) {
		list->getCurrentKey(key);
		addRecord_unlocked(key);
		list->skipCurrentRecord();
	}
}
// Rebuild the index from every record currently held in the tree.
// Returns false in read-only mode or when the tree list cannot be fetched.
bool RdbIndex::generateIndex(collnum_t collnum, const RdbTree *tree) {
	reset(false);

	if (g_conf.m_readOnlyMode) {
		return false;
	}

	log(LOG_INFO, "db: Generating index for %s tree", getDbnameFromId(m_rdbId));

	m_needToWrite = true;
	m_generatingIndex = true;

	// scan the whole key space
	int32_t numPosRecs = 0;
	int32_t numNegRecs = 0;
	RdbList list;
	const bool gotList = tree->getList(collnum, KEYMIN(), KEYMAX(), -1, &list,
	                                   &numPosRecs, &numNegRecs, m_useHalfKeys);
	if (!gotList) {
		m_generatingIndex = false;
		return false;
	}

	list.resetListPtr();
	addList(&list);

	// flush pending docIds so the index ends up fully sorted and merged
	(void)mergePendingDocIds();

	m_generatingIndex = false;
	return true;
}
// Rebuild the index from every record currently held in the buckets.
// Returns false in read-only mode or when the bucket list cannot be fetched.
bool RdbIndex::generateIndex(collnum_t collnum, const RdbBuckets *buckets) {
	reset(false);

	if (g_conf.m_readOnlyMode) {
		return false;
	}

	log(LOG_INFO, "db: Generating index for %s buckets", getDbnameFromId(m_rdbId));

	m_needToWrite = true;
	m_generatingIndex = true;

	// scan the whole key space
	int32_t numPosRecs = 0;
	int32_t numNegRecs = 0;
	RdbList list;
	const bool gotList = buckets->getList(collnum, KEYMIN(), KEYMAX(), -1, &list,
	                                      &numPosRecs, &numNegRecs, m_useHalfKeys);
	if (!gotList) {
		m_generatingIndex = false;
		return false;
	}

	list.resetListPtr();
	addList(&list);

	// flush pending docIds so the index ends up fully sorted and merged
	(void)mergePendingDocIds();

	m_generatingIndex = false;
	return true;
}
// . attempts to auto-generate from data file, f
// . returns false and sets g_errno on error
bool RdbIndex::generateIndex(BigFile *f) {
	reset(false);

	if (g_conf.m_readOnlyMode) {
		return false;
	}

	// records can only be parsed from the start of the file
	if (!f->doesPartExist(0)) {
		g_errno = EBADENGINEER;
		log(LOG_WARN, "db: Cannot generate index for this headless data file");
		return false;
	}

	log(LOG_INFO, "db: Generating index for %s/%s", f->getDir(), f->getFilename());

	m_needToWrite = true;
	m_generatingIndex = true;

	// scan through all the recs in f
	int64_t offset = 0;
	const int64_t fileSize = f->getFileSize();

	// if file is length 0, we don't need to do much
	// g_errno should be set on error (negative size)
	if (fileSize <= 0) {
		m_generatingIndex = false;
		return (fileSize == 0);
	}

	// don't read in more than 10 megs at a time initially
	int64_t bufSize = fileSize;
	if (bufSize > 10 * 1024 * 1024) {
		bufSize = 10 * 1024 * 1024;
	}
	char *buf = (char *)mmalloc(bufSize, "RdbIndex");
	// BUGFIX: bail out instead of reading into a NULL buffer when the
	// allocation fails
	if (!buf) {
		m_generatingIndex = false;
		log(LOG_WARN, "db: Failed to allocate %" PRId64" bytes for index generation: %s.",
		    bufSize, mstrerror(g_errno));
		return false;
	}

	// use extremes
	const char *startKey = KEYMIN();
	const char *endKey = KEYMAX();

	// a rec needs to be at least this big
	// negative keys do not have the dataSize field
	int32_t minRecSize = (m_fixedDataSize == -1) ? 0 : m_fixedDataSize;

	// POSDB
	if (m_ks == 18) {
		minRecSize += 6;
	} else if (m_useHalfKeys) {
		minRecSize += m_ks - 6;
	} else {
		minRecSize += m_ks;
	}

	// for parsing the lists into records
	char key[MAX_KEY_BYTES];
	int64_t next = 0LL;

	ScopedLock sl(m_pendingDocIdsMtx);

	// read in at most "bufSize" bytes with each read
	for (; offset < fileSize;) {
		// keep track of how many bytes read in the log
		if (offset >= next) {
			if (next != 0) {
				logf(LOG_INFO, "db: Read %" PRId64" bytes [%s]", next, f->getFilename());
			}
			next += 500000000; // 500MB
		}

		// our reads should always block
		int64_t readSize = fileSize - offset;
		if (readSize > bufSize) {
			readSize = bufSize;
		}

		// if the readSize is less than the minRecSize, we got a bad cutoff
		// so we can't go any more
		if (readSize < minRecSize) {
			mfree(buf, bufSize, "RdbIndex");
			m_generatingIndex = false;
			return true;
		}

		// otherwise, read it in
		if (!f->read(buf, readSize, offset)) {
			mfree(buf, bufSize, "RdbIndex");
			m_generatingIndex = false;
			log(LOG_WARN, "db: Failed to read %" PRId64" bytes of %s at offset=%" PRId64". Map generation failed.",
			    bufSize, f->getFilename(), offset);
			return false;
		}

		RdbList list;

		// set the list
		list.set(buf, readSize, buf, readSize, startKey, endKey, m_fixedDataSize, false, m_useHalfKeys, m_ks);

		// . HACK to fix useHalfKeys compression thing from one read to the nxt
		// . "key" should still be set to the last record we read last read
		if (offset > 0) {
			// ... fix for posdb!!!
			if (m_ks == 18) {
				list.setListPtrLo(key + (m_ks - 12));
			} else {
				list.setListPtrHi(key + (m_ks - 6));
			}
		}

		bool advanceOffset = true;

		// . parse through the records in the list
		for (; !list.isExhausted(); list.skipCurrentRecord()) {
			char *rec = list.getCurrentRec();
			if (rec + 64 > list.getListEnd() && offset + readSize < fileSize) {
				// set up so next read starts at this rec that MAY have been cut off
				offset += (rec - buf);
				advanceOffset = false;
				break;
			}

			// WARNING: when data is corrupted these may cause segmentation faults?
			list.getCurrentKey(key);
			int32_t recSize = list.getCurrentRecSize();

			// don't chop keys
			if (recSize < 6) {
				// BUGFIX: free the read buffer on this early-exit path
				mfree(buf, bufSize, "RdbIndex");
				m_generatingIndex = false;
				log(LOG_WARN, "db: Got negative recsize of %" PRId32" at offset=%" PRId64, recSize,
				    offset + (rec - buf));
				// @todo ALC do we want to abort here?
				return false;
			}

			// do we have a breech?
			if (rec + recSize > buf + readSize) {
				// save old
				int64_t oldOffset = offset;
				// set up so next read starts at this rec that got cut off
				offset += (rec - buf);

				// . if we advanced nothing, then we'll end up looping forever
				// . this will == 0 too, for big recs that did not fit in our
				//   read but we take care of that below
				// . this can happen if merge dumped out half-ass
				// . the write split a record...
				if (rec - buf == 0 && recSize <= bufSize) {
					// BUGFIX: free the read buffer on this early-exit path
					mfree(buf, bufSize, "RdbIndex");
					m_generatingIndex = false;
					log(LOG_WARN, "db: Index generation failed because last record in data file was split. Power failure while writing?");
					// @todo ALC do we want to abort here?
					return false;
				}

				// is our buf big enough to hold this type of rec?
				if (recSize > bufSize) {
					mfree(buf, bufSize, "RdbIndex");
					bufSize = recSize;
					buf = (char *)mmalloc(bufSize, "RdbIndex");
					if (!buf) {
						m_generatingIndex = false;
						// BUGFIX: oldOffset is a signed int64_t; use PRId64
						log(LOG_WARN, "db: Got error while generating the index file: %s. offset=%" PRId64".",
						    mstrerror(g_errno), oldOffset);
						return false;
					}
				}

				// read again starting at the adjusted offset
				advanceOffset = false;
				break;
			}

			addRecord_unlocked(key);
		}

		if (advanceOffset) {
			offset += readSize;
		}
	}

	// don't forget to free this
	mfree(buf, bufSize, "RdbIndex");

	// make sure it's all sorted and merged
	(void)mergePendingDocIds_unlocked();

	m_generatingIndex = false;

	// otherwise, we're done
	return true;
}
// Return a shared_ptr copy of the merged docIds under lock, so callers can
// iterate safely even if swapDocIds() replaces the vector concurrently.
docidsconst_ptr_t RdbIndex::getDocIds() {
	ScopedLock lock(m_docIdsMtx);
	return m_docIds;
}
// Check whether docId is present in either the merged index or the pending
// docIds. NOTE: sorts m_pendingDocIds in place (by docid) as a side effect.
bool RdbIndex::exist(uint64_t docId) {
ScopedLock sl(m_pendingDocIdsMtx);

auto docIds = getDocIds();
// entries for a given docid differ from (docId << s_docIdOffset) only in
// the low bits, so default-< lower_bound still partitions the docid-sorted
// vector correctly
auto it = std::lower_bound(docIds->cbegin(), docIds->cend(), docId << RdbIndex::s_docIdOffset);
if (it != docIds->cend() && ((*it >> RdbIndex::s_docIdOffset) == docId)) {
return true;
}

// sort by docid only, ignoring the low (del) bits
auto cmplt_fn = [](uint64_t a, uint64_t b) {
return (a & s_docIdMask) < (b & s_docIdMask);
};

// std::lower_bound works on sorted list
std::stable_sort(m_pendingDocIds->begin(), m_pendingDocIds->end(), cmplt_fn);

it = std::lower_bound(m_pendingDocIds->cbegin(), m_pendingDocIds->cend(), docId << RdbIndex::s_docIdOffset);
return (it != m_pendingDocIds->cend() && ((*it >> RdbIndex::s_docIdOffset) == docId));
}
// Atomically (under m_docIdsMtx) replace the published docIds vector.
// The previous vector is released when the by-value parameter goes out of
// scope, after the lock is dropped by readers holding their own copy.
void RdbIndex::swapDocIds(docidsconst_ptr_t docIds) {
	ScopedLock lock(m_docIdsMtx);
	m_docIds.swap(docIds);
}