Move preparation of the global index to the global index thread as well

This commit is contained in:
Ai Lin Chia
2017-05-31 12:07:07 +02:00
parent 3404178dad
commit 04a589e806
2 changed files with 27 additions and 20 deletions

@@ -2531,13 +2531,13 @@ void RdbBase::finalizeGlobalIndexThread() {
m_globalIndexThreadQueue.finalize();
}
docids_ptr_t RdbBase::prepareGlobalIndexJob(bool markFileReadable, int32_t fileId) {
std::vector<std::pair<int32_t, docidsconst_ptr_t>> RdbBase::prepareGlobalIndexJob(bool markFileReadable, int32_t fileId) {
ScopedLock sl(m_mtxFileInfo);
return prepareGlobalIndexJob_unlocked(markFileReadable, fileId);
}
docids_ptr_t RdbBase::prepareGlobalIndexJob_unlocked(bool markFileReadable, int32_t fileId) {
docids_ptr_t tmpDocIdFileIndex(new docids_t);
std::vector<std::pair<int32_t, docidsconst_ptr_t>> RdbBase::prepareGlobalIndexJob_unlocked(bool markFileReadable, int32_t fileId) {
std::vector<std::pair<int32_t, docidsconst_ptr_t>> docIdFileIndexes;
// global index does not include RdbIndex from tree/buckets
for (int32_t i = 0; i < m_numFiles; i++) {
@@ -2546,16 +2546,11 @@ docids_ptr_t RdbBase::prepareGlobalIndexJob_unlocked(bool markFileReadable, int3
}
if(m_fileInfo[i].m_allowReads || m_fileInfo[i].m_pendingGenerateIndex) {
auto docIds = m_fileInfo[i].m_index->getDocIds();
tmpDocIdFileIndex->reserve(tmpDocIdFileIndex->size() + docIds->size());
std::transform(docIds->begin(), docIds->end(), std::back_inserter(*tmpDocIdFileIndex),
[i](uint64_t docId) {
return ((docId << s_docIdFileIndex_docIdOffset) | i); // docId has delete key
});
docIdFileIndexes.emplace_back(i, m_fileInfo[i].m_index->getDocIds());
}
}
return tmpDocIdFileIndex;
return docIdFileIndexes;
}
void RdbBase::submitGlobalIndexJob(bool markFileReadable, int32_t fileId) {
@@ -2593,25 +2588,37 @@ void RdbBase::generateGlobalIndex(void *item) {
log(LOG_INFO, "db: Processing job %p to generate global index", item);
std::stable_sort(queueItem->m_docIdFileIndex->begin(), queueItem->m_docIdFileIndex->end(),
docids_ptr_t tmpDocIdFileIndex(new docids_t);
for (auto it = queueItem->m_docIdFileIndexes.begin(); it != queueItem->m_docIdFileIndexes.end(); ++it) {
auto i = it->first;
const auto &docIds = it->second;
tmpDocIdFileIndex->reserve(tmpDocIdFileIndex->size() + docIds->size());
std::transform(docIds->begin(), docIds->end(), std::back_inserter(*tmpDocIdFileIndex),
[i](uint64_t docId) {
return ((docId << s_docIdFileIndex_docIdOffset) | i); // docId has delete key
});
}
std::stable_sort(tmpDocIdFileIndex->begin(), tmpDocIdFileIndex->end(),
[](uint64_t a, uint64_t b) {
return (a & s_docIdFileIndex_docIdMask) < (b & s_docIdFileIndex_docIdMask);
});
// in reverse because we want to keep the highest file position
auto it = std::unique(queueItem->m_docIdFileIndex->rbegin(), queueItem->m_docIdFileIndex->rend(),
auto it = std::unique(tmpDocIdFileIndex->rbegin(), tmpDocIdFileIndex->rend(),
[](uint64_t a, uint64_t b) {
return (a & s_docIdFileIndex_docIdMask) == (b & s_docIdFileIndex_docIdMask);
});
queueItem->m_docIdFileIndex->erase(queueItem->m_docIdFileIndex->begin(), it.base());
tmpDocIdFileIndex->erase(tmpDocIdFileIndex->begin(), it.base());
// free up used space
queueItem->m_docIdFileIndex->shrink_to_fit();
tmpDocIdFileIndex->shrink_to_fit();
// replace with new index
ScopedLock sl(queueItem->m_base->m_mtxFileInfo);
ScopedLock sl2(queueItem->m_base->m_docIdFileIndexMtx);
queueItem->m_base->m_docIdFileIndex.swap(queueItem->m_docIdFileIndex);
queueItem->m_base->m_docIdFileIndex.swap(tmpDocIdFileIndex);
if (queueItem->m_markFileReadable) {
for (auto i = 0; i < queueItem->m_base->m_numFiles; ++i) {

@@ -222,15 +222,15 @@ public:
static void generateGlobalIndex(void *item);
struct ThreadQueueItem {
ThreadQueueItem(RdbBase *base, docids_ptr_t docIdFileIndex, bool markFileReadable, int32_t fileId)
ThreadQueueItem(RdbBase *base, std::vector<std::pair<int32_t, docidsconst_ptr_t>> docIdFileIndexes, bool markFileReadable, int32_t fileId)
: m_base(base)
, m_docIdFileIndex(docIdFileIndex)
, m_docIdFileIndexes(docIdFileIndexes)
, m_markFileReadable(markFileReadable)
, m_fileId(fileId) {
}
RdbBase *m_base;
docids_ptr_t m_docIdFileIndex;
std::vector<std::pair<int32_t, docidsconst_ptr_t>> m_docIdFileIndexes;
bool m_markFileReadable;
int32_t m_fileId;
};
@@ -250,8 +250,8 @@ public:
static const uint64_t s_docIdFileIndex_filePosMask = 0x000000000000ffffULL;
private:
docids_ptr_t prepareGlobalIndexJob(bool markFileReadable, int32_t fileId);
docids_ptr_t prepareGlobalIndexJob_unlocked(bool markFileReadable, int32_t fileId);
std::vector<std::pair<int32_t, docidsconst_ptr_t>> prepareGlobalIndexJob(bool markFileReadable, int32_t fileId);
std::vector<std::pair<int32_t, docidsconst_ptr_t>> prepareGlobalIndexJob_unlocked(bool markFileReadable, int32_t fileId);
void selectFilesToMerge(int32_t mergeNum, int32_t numFiles, int32_t *p_mini);