Move addList to a thread for RdbDump

This commit is contained in:
Ai Lin Chia 2017-06-09 13:47:49 +02:00
parent 623a11f74f
commit d4a95b2d10
2 changed files with 128 additions and 110 deletions

View File

@ -630,6 +630,124 @@ bool RdbDump::doneDumpingList() {
return rc;
}
void RdbDump::addListToRdbMapRdbIndex(void *state) {
RdbDump *that = static_cast<RdbDump*>(state);
// we're using the list multiple times. if hi/lo are 'hacked', then we need to save it when using it the second time.
const char *savedListPtrHi = that->m_list->getListPtrHi();
const char *savedListPtrLo = that->m_list->getListPtrLo();
// . register this with the map now
// . only register AFTER it's ALL on disk so we don't get partial
// record reads and we don't read stuff on disk that's also in tree
// . add the list to the rdb map if we have one
// . we don't have maps when we do unordered dumps
// . careful, map is NULL if we're doing unordered dump
if (that->m_map) {
int64_t t1 = gettimeofdayInMilliseconds();
bool triedToFix = false;
while (!that->m_map->addList(that->m_list)) {
// keys out of order in list from tree?
if (g_errno == ECORRUPTDATA) {
logError("m_map->addList resulted in ECORRUPTDATA");
if (that->m_tree) {
logError("trying to fix tree");
that->m_tree->fixTree();
}
if (that->m_buckets) {
logError("Contains buckets, cannot fix this yet");
that->m_list->printList(); //@@@@@@ EXCESSIVE
gbshutdownCorrupted();
}
if (triedToFix) {
logError("already tried to fix, exiting hard");
gbshutdownCorrupted();
}
triedToFix = true;
continue;
}
g_errno = ENOMEM;
// this should never happen now since we call prealloc() above
logError("Failed to add data to map, exiting hard");
gbshutdownCorrupted();
}
int64_t took = gettimeofdayInMilliseconds() - t1;
if (took > g_conf.m_logRdbMapAddListTimeThreshold) {
log(LOG_WARN, "db: adding to map took %" PRIu64" ms", took);
} else {
log(LOG_TIMING, "db: adding to map took %" PRIu64" ms", took);
}
}
if (that->m_index) {
// restore hi/lo ptr which was reset after generating map
that->m_list->setListPtrHi(savedListPtrHi);
that->m_list->setListPtrLo(savedListPtrLo);
int64_t t1 = gettimeofdayInMilliseconds();
that->m_index->addList(that->m_list);
int64_t took = gettimeofdayInMilliseconds() - t1;
if (took > g_conf.m_logRdbIndexAddListTimeThreshold) {
log(LOG_WARN, "db: adding to index took %" PRIu64" ms", took);
} else {
log(LOG_TIMING, "db: adding to index took %" PRIu64" ms", took);
}
}
// . HACK: fix hacked lists before deleting from tree
// . iff the first key has the half bit set
if (that->m_hacked) {
char tmp[MAX_KEY_BYTES];
char *p = that->m_list->getList() - 6 ;
gbmemcpy (tmp, p, 6);
gbmemcpy (p, p + 6, that->m_ks - 6);
gbmemcpy (p + (that->m_ks - 6), tmp, 6);
// undo the big hack
that->m_list->setList(p);
// make this work for POSDB...
that->m_list->setListPtr(p);
that->m_list->setListPtrLo(p + that->m_ks - 12);
that->m_list->setListPtrHi(p + that->m_ks - 6);
that->m_list->setListSize(that->m_list->getListSize() + 6);
// hack off the half bit, we're 12 bytes again
*p &= 0xfd;
// turn it off again just in case
that->m_hacked = false;
}
if (that->m_hacked12) {
char tmp[MAX_KEY_BYTES];
char *p = that->m_list->getList() - 12 ;
// swap high 12 bytes with low 6 bytes for first key
gbmemcpy (tmp, p, 12);
gbmemcpy (p, p + 12, 6);
gbmemcpy (p + 6, tmp, 12);
// big hack here
that->m_list->setList(p);
that->m_list->setListPtr(p);
that->m_list->setListPtrLo(p + 6);
that->m_list->setListPtrHi(p + 12);
that->m_list->setListSize(that->m_list->getListSize() + 12);
// hack off the half bit, we're 12 bytes again
*p &= 0xf9;
that->m_hacked12 = false;
}
}
void RdbDump::addedListToRdbMapRdbIndex(void *state, job_exit_t exit_type) {
RdbDump *that = static_cast<RdbDump*>(state);
that->continueDumping();
}
void RdbDump::doneReadingForVerifyWrapper ( void *state ) {
RdbDump *THIS = (RdbDump *)state;
@ -675,119 +793,16 @@ bool RdbDump::doneReadingForVerify ( ) {
gbshutdownCorrupted();
}
// we're using the list multiple times. if hi/lo are 'hacked', then we need to save it when using it the second time.
const char *savedListPtrHi = m_list->getListPtrHi();
const char *savedListPtrLo = m_list->getListPtrLo();
int64_t t1 = gettimeofdayInMilliseconds();
// . register this with the map now
// . only register AFTER it's ALL on disk so we don't get partial
// record reads and we don't read stuff on disk that's also in tree
// . add the list to the rdb map if we have one
// . we don't have maps when we do unordered dumps
// . careful, map is NULL if we're doing unordered dump
if (m_map) {
bool triedToFix = false;
while (!m_map->addList(m_list)) {
// keys out of order in list from tree?
if (g_errno == ECORRUPTDATA) {
logError("m_map->addList resulted in ECORRUPTDATA");
if (m_tree) {
logError("trying to fix tree");
m_tree->fixTree();
}
if (m_buckets) {
logError("Contains buckets, cannot fix this yet");
m_list->printList(); //@@@@@@ EXCESSIVE
gbshutdownCorrupted();
}
if (triedToFix) {
logError("already tried to fix, exiting hard");
gbshutdownCorrupted();
}
triedToFix = true;
continue;
}
g_errno = ENOMEM;
// this should never happen now since we call prealloc() above
logError("Failed to add data to map, exiting hard");
gbshutdownCorrupted();
}
if (m_callback && g_jobScheduler.submit(addListToRdbMapRdbIndex, addedListToRdbMapRdbIndex, this, thread_type_file_merge, 0)) {
logTrace(g_conf.m_logTraceRdbDump, "END - submited addListToRdbMapRdbIndex job, returning false" );
return false;
}
int64_t t2 = gettimeofdayInMilliseconds();
// running in main thread
addListToRdbMapRdbIndex(this);
{
int64_t took = t2 - t1;
if (took > g_conf.m_logRdbMapAddListTimeThreshold) {
log(LOG_WARN, "db: adding to map took %" PRIu64" ms", took);
} else {
log(LOG_TIMING, "db: adding to map took %" PRIu64" ms", took);
}
}
if (m_index) {
// restore hi/lo ptr which was reset after generating map
m_list->setListPtrHi(savedListPtrHi);
m_list->setListPtrLo(savedListPtrLo);
m_index->addList(m_list);
int64_t took = gettimeofdayInMilliseconds() - t2;
if (took > g_conf.m_logRdbIndexAddListTimeThreshold) {
log(LOG_WARN, "db: adding to index took %" PRIu64" ms", took);
} else {
log(LOG_TIMING, "db: adding to index took %" PRIu64" ms", took);
}
}
// . HACK: fix hacked lists before deleting from tree
// . iff the first key has the half bit set
if (m_hacked) {
char tmp[MAX_KEY_BYTES];
char *p = m_list->getList() - 6 ;
gbmemcpy (tmp, p, 6);
gbmemcpy (p, p + 6, m_ks - 6);
gbmemcpy (p + (m_ks - 6), tmp, 6);
// undo the big hack
m_list->setList(p);
// make this work for POSDB...
m_list->setListPtr(p);
m_list->setListPtrLo(p + m_ks - 12);
m_list->setListPtrHi(p + m_ks - 6);
m_list->setListSize(m_list->getListSize() + 6);
// hack off the half bit, we're 12 bytes again
*p &= 0xfd;
// turn it off again just in case
m_hacked = false;
}
if (m_hacked12) {
char tmp[MAX_KEY_BYTES];
char *p = m_list->getList() - 12 ;
// swap high 12 bytes with low 6 bytes for first key
gbmemcpy (tmp, p, 12);
gbmemcpy (p, p + 12, 6);
gbmemcpy (p + 6, tmp, 12);
// big hack here
m_list->setList(p);
m_list->setListPtr(p);
m_list->setListPtrLo(p + 6);
m_list->setListPtrHi(p + 12);
m_list->setListSize(m_list->getListSize() + 12);
// hack off the half bit, we're 12 bytes again
*p &= 0xf9;
m_hacked12 = false;
}
// don't need to call addedListToRdbMapRdbIndex here because it's just continuing the dump
// which we will when returning true
logTrace( g_conf.m_logTraceRdbDump, "END - OK, returning true" );
return true;

View File

@ -66,6 +66,9 @@ private:
void doneDumping();
bool doneReadingForVerify();
static void addListToRdbMapRdbIndex(void *state);
static void addedListToRdbMapRdbIndex(void *state, job_exit_t /*exit_type*/);
// called when we've finished writing an RdbList to the file
bool doneDumpingList();
void continueDumping();