Fix merge list when posdb is merging

This commit is contained in:
Ai Lin Chia 2018-01-05 11:21:51 +01:00
parent 17ec77139c
commit e0e24b6498
5 changed files with 68 additions and 39 deletions

View File

@ -445,11 +445,15 @@ bool Msg5::readList ( ) {
// and smaller than that when a file is large
// . but just to be safe, reading an extra 2% won't hurt too much
if ( base->useHalfKeys() ) {
int32_t numSources = m_numFiles;
if ( numSources == -1 )
numSources = base->getNumFiles();
m_numSources = m_numFiles;
if (m_numSources == -1) {
m_numSources = base->getNumFiles();
}
// if tree is empty, don't count it
if ( m_includeTree && ! m_treeList.isEmpty() ) numSources++;
if (m_includeTree && !m_treeList.isEmpty()) {
m_numSources++;
}
// . if we don't do a merge then we return the list directly
// (see condition where m_numListPtrs == 1 below)
// from Msg3 (or tree) and we must hit minRecSizes as
@ -459,7 +463,7 @@ bool Msg5::readList ( ) {
// the single list we get back from Msg3 will not have
// been constrained with m_minRecSizes, but constrained
// with m_newMinRecSizes (x2%) and be too big for our UdpSlot
if ( numSources >= 2 ) {
if ( m_numSources >= 2 ) {
int64_t newmin = (int64_t)m_newMinRecSizes ;
newmin = (newmin * 50LL) / 49LL ;
// watch out for wrap around
@ -1145,7 +1149,8 @@ void Msg5::mergeLists() {
// one of them was PROBABLY in the dump queue and we decided in
// Rdb::addRecord() NOT to do the annihilation, therefore it's good
// to do the merge to do the annihilation
m_list->merge_r(m_listPtrs, m_numListPtrs, m_startKey, m_minEndKey, m_minRecSizes, m_removeNegRecs, m_rdbId, m_collnum, m_startFileNum, m_isRealMerge);
m_list->merge_r(m_listPtrs, m_numListPtrs, m_startKey, m_minEndKey, m_minRecSizes, m_removeNegRecs, m_rdbId, m_collnum,
m_numSources, m_startFileNum, m_isRealMerge);
// maintain this info for truncation purposes
if ( m_list->isLastKeyValid() )

1
Msg5.h
View File

@ -123,6 +123,7 @@ private:
bool m_includeTree;
int32_t m_numFiles;
int32_t m_numSources;
int32_t m_startFileNum;
int32_t m_minRecSizes;
rdbid_t m_rdbId;

View File

@ -1759,7 +1759,8 @@ bool RdbList::posdbConstrain(const char *startKey, char *endKey, int32_t minRecS
// . CAUTION: you should call constrain() on all "lists" before calling this
// so we don't have to do boundary checks on the keys here
void RdbList::merge_r(RdbList **lists, int32_t numLists, const char *startKey, const char *endKey, int32_t minRecSizes,
bool removeNegRecs, rdbid_t rdbId, collnum_t collNum, int32_t startFileNum, bool isRealMerge) {
bool removeNegRecs, rdbid_t rdbId, collnum_t collNum, int32_t totalFiles, int32_t startFileNum,
bool isRealMerge) {
assert(this);
verify_signature();
// sanity
@ -1834,7 +1835,7 @@ void RdbList::merge_r(RdbList **lists, int32_t numLists, const char *startKey, c
Rdb* rdb = getRdbFromId(rdbId);
if (rdbId == RDB_POSDB || rdbId == RDB2_POSDB2) {
posdbMerge_r(lists, numLists, startKey, endKey, m_mergeMinListSize, rdbId, removeNegRecs, rdb->isUseIndexFile(), collNum, startFileNum, isRealMerge);
posdbMerge_r(lists, numLists, startKey, endKey, m_mergeMinListSize, rdbId, removeNegRecs, rdb->isUseIndexFile(), collNum, totalFiles, startFileNum, isRealMerge);
verify_signature();
return;
}
@ -2160,6 +2161,22 @@ int getPtrIndex(RdbList **lists, int32_t numLists, const char *ptr) {
return -1;
}
// Returns the number of files in "base" that are reported as not readable
// among file positions 0..ptrIndex (never going past base->getNumFiles()).
// The caller adds this value to a list index before comparing it against a
// file position.
//
// . base           - queried for its file count and per-file readability
// . ptrIndex       - last file position (inclusive) that is inspected
// . oriNumLists    - number of lists handed to the merge
// . startFileIndex - accepted but not used by this function
// . totalFiles     - number of sources; when it equals oriNumLists no
//                    adjustment is made and 0 is returned
//                    (presumably every file supplied a list - verify against caller)
int getListOffset(RdbBase *base, int ptrIndex, int32_t oriNumLists, int32_t startFileIndex, int32_t totalFiles) {
	// list count matches source count, so no shift is applied
	if (oriNumLists == totalFiles) {
		return 0;
	}

	// tally the files flagged as not readable at positions 0..ptrIndex
	int unreadableCount = 0;
	int fileIdx = 0;
	while (fileIdx < base->getNumFiles() && fileIdx <= ptrIndex) {
		if (!base->isReadable(fileIdx)) {
			unreadableCount++;
		}
		fileIdx++;
	}

	return unreadableCount;
}
////////
//
// SPECIALTY MERGE FOR POSDB
@ -2167,7 +2184,8 @@ int getPtrIndex(RdbList **lists, int32_t numLists, const char *ptr) {
///////
bool RdbList::posdbMerge_r(RdbList **lists, int32_t numLists, const char *startKey, const char *endKey, int32_t minRecSizes,
rdbid_t rdbId, bool removeNegKeys, bool useIndexFile, collnum_t collNum, int32_t startFileIndex, bool isRealMerge) {
rdbid_t rdbId, bool removeNegKeys, bool useIndexFile, collnum_t collNum, int32_t totalFiles,
int32_t startFileIndex, bool isRealMerge) {
logTrace(g_conf.m_logTraceRdbList, "BEGIN");
int oriNumLists = numLists;
@ -2411,13 +2429,17 @@ bool RdbList::posdbMerge_r(RdbList **lists, int32_t numLists, const char *startK
logTrace(g_conf.m_logTraceRdbList, "Found docId=%" PRIu64" with filePos=%" PRId32, docId, filePos);
int prtIndex = getPtrIndex(lists, oriNumLists, ends[mini]);
if (filePos > prtIndex + startFileIndex) {
// docId is present in newer file
logTrace(g_conf.m_logTraceRdbList, "docId in newer list. skip. filePos=%d mini=%hd listOffset=%d startFileIndex=%d ptrIndex=%d",
filePos, mini, listOffset, startFileIndex, prtIndex);
int ptrIndex = getPtrIndex(lists, oriNumLists, ends[mini]);
if (ptrIndex >= 0) {
int listOffset = getListOffset(base, ptrIndex, oriNumLists, startFileIndex, totalFiles);
if (filePos > ptrIndex + listOffset + startFileIndex) {
// docId is present in newer file
logTrace(g_conf.m_logTraceRdbList,
"docId in newer list. skip. filePos=%d mini=%hd listOffset=%d startFileIndex=%d ptrIndex=%d",
filePos, mini, listOffset, startFileIndex, ptrIndex);
goto skip;
goto skip;
}
}
}

View File

@ -198,7 +198,7 @@ public:
// . set our startKey/endKey to "startKey"/"endKey"
// . exclude any records from lists not in that range
void merge_r(RdbList **lists, int32_t numLists, const char *startKey, const char *endKey, int32_t minRecSizes,
bool removeNegRecs, rdbid_t rdbId, collnum_t collNum, int32_t startFileNum, bool isRealMerge);
bool removeNegRecs, rdbid_t rdbId, collnum_t collNum, int32_t totalFiles, int32_t startFileNum, bool isRealMerge);
bool growList(int32_t newSize);
@ -274,7 +274,8 @@ private:
int32_t hintOffset, const char *hintKey, const char *filename);
bool posdbMerge_r(RdbList **lists, int32_t numLists, const char *startKey, const char *endKey, int32_t minRecSizes,
rdbid_t rdbId, bool removeNegKeys, bool useIndexFile, collnum_t collNum, int32_t startFileIndex, bool isRealMerge);
rdbid_t rdbId, bool removeNegKeys, bool useIndexFile, collnum_t collNum, int32_t totalFiles,
int32_t startFileIndex, bool isRealMerge);
// the unaltered raw list. keys may be outside of [m_startKey,m_endKey]
char *m_list;

View File

@ -75,7 +75,7 @@ TEST_F(RdbListTest, MergeTestPosdbEmptyAll) {
RdbList final1;
final1.set(nullptr, 0, nullptr, 0, Posdb::getFixedDataSize(), true, Posdb::getUseHalfKeys(), Posdb::getKeySize());
final1.prepareForMerge(lists1, lists1_size, -1);
final1.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, true, RDB_POSDB, 0, 0, false);
final1.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, true, RDB_POSDB, 0, 2, 0, false);
// verify merged list
EXPECT_EQ(0, final1.getListSize());
@ -109,7 +109,7 @@ TEST_F(RdbListTest, MergeTestPosdbEmptyOneFirst) {
RdbList final1;
final1.set(nullptr, 0, nullptr, 0, Posdb::getFixedDataSize(), true, Posdb::getUseHalfKeys(), Posdb::getKeySize());
final1.prepareForMerge(lists1, lists1_size, -1);
final1.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, true, RDB_POSDB, 0, 0, true);
final1.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, true, RDB_POSDB, 0, 2, 0, true);
// verify merged list
EXPECT_EQ(list1.getListSize(), final1.getListSize());
@ -157,7 +157,7 @@ TEST_F(RdbListTest, MergeTestPosdbEmptyOneMiddle) {
RdbList final2;
final2.set(nullptr, 0, nullptr, 0, Posdb::getFixedDataSize(), true, Posdb::getUseHalfKeys(), Posdb::getKeySize());
final2.prepareForMerge(lists2, lists2_size, -1);
final2.merge_r(lists2, lists2_size, KEYMIN(), KEYMAX(), -1, true, RDB_POSDB, 0, 0, true);
final2.merge_r(lists2, lists2_size, KEYMIN(), KEYMAX(), -1, true, RDB_POSDB, 0, 3, 0, true);
// verify merged list
EXPECT_EQ(list3.getListSize(), final2.getListSize());
@ -195,7 +195,7 @@ TEST_F(RdbListTest, MergeTestPosdbEmptyOneLast) {
RdbList final2;
final2.set(nullptr, 0, nullptr, 0, Posdb::getFixedDataSize(), true, Posdb::getUseHalfKeys(), Posdb::getKeySize());
final2.prepareForMerge(lists2, lists2_size, -1);
final2.merge_r(lists2, lists2_size, KEYMIN(), KEYMAX(), -1, true, RDB_POSDB, 0, 0, true);
final2.merge_r(lists2, lists2_size, KEYMIN(), KEYMAX(), -1, true, RDB_POSDB, 0, 2, 0, true);
// verify merged list
EXPECT_EQ(list2.getListSize(), final2.getListSize());
@ -239,7 +239,7 @@ TEST_F(RdbListTest, MergeTestPosdbVerifyListOrderFirst) {
RdbList final1;
final1.set(nullptr, 0, nullptr, 0, Posdb::getFixedDataSize(), true, Posdb::getUseHalfKeys(), Posdb::getKeySize());
final1.prepareForMerge(lists1, lists1_size, -1);
final1.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, true, RDB_POSDB, 0, 0, false);
final1.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, true, RDB_POSDB, 0, 2, 0, false);
// verify merged list
EXPECT_EQ(0, final1.getListSize());
@ -278,7 +278,7 @@ TEST_F(RdbListTest, MergeTestPosdbVerifyListOrderLast) {
RdbList final2;
final2.set(nullptr, 0, nullptr, 0, Posdb::getFixedDataSize(), true, Posdb::getUseHalfKeys(), Posdb::getKeySize());
final2.prepareForMerge(lists2, lists2_size, -1);
final2.merge_r(lists2, lists2_size, KEYMIN(), KEYMAX(), -1, true, RDB_POSDB, 0, 0, true);
final2.merge_r(lists2, lists2_size, KEYMIN(), KEYMAX(), -1, true, RDB_POSDB, 0, 2, 0, true);
// verify merged list
EXPECT_EQ(list2.getListSize(), final2.getListSize());
@ -323,7 +323,7 @@ TEST_F(RdbListTest, MergeTestPosdbVerifyRemoveNegRecords) {
RdbList final1;
final1.set(nullptr, 0, nullptr, 0, Posdb::getFixedDataSize(), true, Posdb::getUseHalfKeys(), Posdb::getKeySize());
final1.prepareForMerge(lists1, lists1_size, -1);
final1.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, true, RDB_POSDB, 0, 0, true);
final1.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, true, RDB_POSDB, 0, 2, 0, true);
// verify merged list
EXPECT_EQ(0, final1.getListSize());
@ -332,7 +332,7 @@ TEST_F(RdbListTest, MergeTestPosdbVerifyRemoveNegRecords) {
RdbList final2;
final2.set(nullptr, 0, nullptr, 0, Posdb::getFixedDataSize(), true, Posdb::getUseHalfKeys(), Posdb::getKeySize());
final2.prepareForMerge(lists1, lists1_size, -1);
final2.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, false, RDB_POSDB, 0, 0, true);
final2.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, false, RDB_POSDB, 0, 2, 0, true);
// verify merged list
EXPECT_EQ(list2.getListSize(), final2.getListSize());
@ -372,7 +372,7 @@ TEST_F(RdbListTest, MergeTestTitledb) {
RdbList final1;
final1.set(nullptr, 0, nullptr, 0, Titledb::getFixedDataSize(), true, Titledb::getUseHalfKeys(), Titledb::getKeySize());
final1.prepareForMerge(lists1, lists1_size, -1);
final1.merge_r(lists1, lists1_size, startKey, endKey, -1, false, RDB_TITLEDB, 0, 0, false);
final1.merge_r(lists1, lists1_size, startKey, endKey, -1, false, RDB_TITLEDB, 0, 2, 0, false);
// verify merged list
int i = 1;
@ -420,7 +420,7 @@ TEST_F(RdbListTest, MergeTestTitledbDelEndKey) {
RdbList final1;
final1.set(nullptr, 0, nullptr, 0, Titledb::getFixedDataSize(), true, Titledb::getUseHalfKeys(), Titledb::getKeySize());
final1.prepareForMerge(lists1, lists1_size, -1);
final1.merge_r(lists1, lists1_size, startKey, endKey, -1, false, RDB_TITLEDB, 0, 0, false);
final1.merge_r(lists1, lists1_size, startKey, endKey, -1, false, RDB_TITLEDB, 0, 2, 0, false);
// verify merged list
int i = 1;
@ -468,7 +468,7 @@ TEST_F(RdbListTest, MergeTestTitledbDoubleDelEndKey) {
RdbList final1;
final1.set(nullptr, 0, nullptr, 0, Titledb::getFixedDataSize(), true, Titledb::getUseHalfKeys(), Titledb::getKeySize());
final1.prepareForMerge(lists1, lists1_size, -1);
final1.merge_r(lists1, lists1_size, startKey, endKey, -1, false, RDB_TITLEDB, 0, 0, false);
final1.merge_r(lists1, lists1_size, startKey, endKey, -1, false, RDB_TITLEDB, 0, 2, 0, false);
// verify merged list
int i = 1;
@ -564,7 +564,7 @@ TEST_F(RdbListNoMergeTest, MergeTestPosdbSingleDocSpiderSpiderSpider) {
RdbList final1;
final1.set(nullptr, 0, nullptr, 0, Posdb::getFixedDataSize(), true, Posdb::getUseHalfKeys(), Posdb::getKeySize());
final1.prepareForMerge(lists1, lists1_size, -1);
final1.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, false, RDB_POSDB, collNum, 0, false);
final1.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, false, RDB_POSDB, collNum, 3, 0, false);
EXPECT_EQ(list3.getListSize(), final1.getListSize());
for (list3.resetListPtr(), final1.resetListPtr(); !final1.isExhausted(); list3.skipCurrentRecord(), final1.skipCurrentRecord()) {
@ -617,7 +617,7 @@ TEST_F(RdbListNoMergeTest, MergeTestPosdbSingleDocSpiderSpiderDelete) {
RdbList final1;
final1.set(nullptr, 0, nullptr, 0, Posdb::getFixedDataSize(), true, Posdb::getUseHalfKeys(), Posdb::getKeySize());
final1.prepareForMerge(lists1, lists1_size, -1);
final1.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, false, RDB_POSDB, collNum, 0, false);
final1.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, false, RDB_POSDB, collNum, 3, 0, false);
EXPECT_EQ(list3.getListSize(), final1.getListSize());
for (list3.resetListPtr(), final1.resetListPtr(); !final1.isExhausted(); list3.skipCurrentRecord(), final1.skipCurrentRecord()) {
@ -670,7 +670,7 @@ TEST_F(RdbListNoMergeTest, MergeTestPosdbSingleDocSpiderDeleteSpider) {
RdbList final1;
final1.set(nullptr, 0, nullptr, 0, Posdb::getFixedDataSize(), true, Posdb::getUseHalfKeys(), Posdb::getKeySize());
final1.prepareForMerge(lists1, lists1_size, -1);
final1.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, false, RDB_POSDB, collNum, 0, false);
final1.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, false, RDB_POSDB, collNum, 3, 0, false);
EXPECT_EQ(list3.getListSize(), final1.getListSize());
for (list3.resetListPtr(), final1.resetListPtr(); !final1.isExhausted(); list3.skipCurrentRecord(), final1.skipCurrentRecord()) {
@ -727,7 +727,7 @@ TEST_F(RdbListNoMergeTest, MergeTestPosdbSingleDocMergeStartSecondFile) {
RdbList final1;
final1.set(nullptr, 0, nullptr, 0, Posdb::getFixedDataSize(), true, Posdb::getUseHalfKeys(), Posdb::getKeySize());
final1.prepareForMerge(lists1, lists1_size, -1);
final1.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, false, RDB_POSDB, collNum, 1, false);
final1.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, false, RDB_POSDB, collNum, 2, 1, false);
EXPECT_EQ(list3.getListSize(), final1.getListSize());
for (list3.resetListPtr(), final1.resetListPtr(); !final1.isExhausted(); list3.skipCurrentRecord(), final1.skipCurrentRecord()) {
@ -793,7 +793,7 @@ TEST_F(RdbListNoMergeTest, MergeTestPosdbMultiDocS1S2N1S2S1N2) {
RdbList final1;
final1.set(nullptr, 0, nullptr, 0, Posdb::getFixedDataSize(), true, Posdb::getUseHalfKeys(), Posdb::getKeySize());
final1.prepareForMerge(lists1, lists1_size, -1);
final1.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, false, RDB_POSDB, collNum, 0, false);
final1.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, false, RDB_POSDB, collNum, 3, 0, false);
EXPECT_EQ(list2.getListSize() + list3.getListSize(), final1.getListSize());
@ -862,7 +862,7 @@ TEST_F(RdbListNoMergeTest, MergeTestPosdbMultiDocS1N2N1S2S1N2) {
RdbList final1;
final1.set(nullptr, 0, nullptr, 0, Posdb::getFixedDataSize(), true, Posdb::getUseHalfKeys(), Posdb::getKeySize());
final1.prepareForMerge(lists1, lists1_size, -1);
final1.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, false, RDB_POSDB, collNum, 0, false);
final1.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, false, RDB_POSDB, collNum, 3, 0, false);
EXPECT_EQ(list2.getListSize() + list3.getListSize(), final1.getListSize());
@ -933,7 +933,7 @@ TEST_F(RdbListNoMergeTest, MergeTestPosdbMultiDocS1S2D1S2S1N2) {
RdbList final1;
final1.set(nullptr, 0, nullptr, 0, Posdb::getFixedDataSize(), true, Posdb::getUseHalfKeys(), Posdb::getKeySize());
final1.prepareForMerge(lists1, lists1_size, -1);
final1.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, false, RDB_POSDB, collNum, 0, false);
final1.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, false, RDB_POSDB, collNum, 3, 0, false);
// first record from list2 is not in output list
list2.resetListPtr();
@ -1008,7 +1008,7 @@ TEST_F(RdbListNoMergeTest, MergeTestPosdbMultiDocS1S2D1S2S1D2) {
RdbList final1;
final1.set(nullptr, 0, nullptr, 0, Posdb::getFixedDataSize(), true, Posdb::getUseHalfKeys(), Posdb::getKeySize());
final1.prepareForMerge(lists1, lists1_size, -1);
final1.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, false, RDB_POSDB, collNum, 0, false);
final1.merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, false, RDB_POSDB, collNum, 3, 0, false);
EXPECT_EQ(list3.getListSize(), final1.getListSize());
for (list3.resetListPtr(), final1.resetListPtr(); !final1.isExhausted(); list3.skipCurrentRecord(), final1.skipCurrentRecord()) {
@ -1082,7 +1082,7 @@ static void mergePosdbLists(collnum_t collNum, RdbList *final1, RdbList *list1,
// merge
final1->set(nullptr, 0, nullptr, 0, Posdb::getFixedDataSize(), true, Posdb::getUseHalfKeys(), Posdb::getKeySize());
final1->prepareForMerge(lists1, lists1_size, -1);
final1->merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, false, RDB_POSDB, collNum, startFileNum, true);
final1->merge_r(lists1, lists1_size, KEYMIN(), KEYMAX(), -1, false, RDB_POSDB, collNum, 2, startFileNum, true);
}
static void expectEqualList(RdbList *list1, RdbList *list2) {
@ -1227,7 +1227,7 @@ TEST(RdbListTest, DISABLED_MergeTest1) {
logf(LOG_DEBUG,"starting merge");
int64_t t = gettimeofdayInMilliseconds();
// do it
list.merge_r(lists, numToMerge, KEYMIN(), KEYMAX(), numKeysWanted, true, RDB_NONE, 0, 0, true);
list.merge_r(lists, numToMerge, KEYMIN(), KEYMAX(), numKeysWanted, true, RDB_NONE, 0, 4, 0, true);
// completed
int64_t now = gettimeofdayInMilliseconds();
@ -1303,7 +1303,7 @@ TEST(RdbListTest, DISABLED_MergeTest2) {
logf(LOG_DEBUG, "-------list #3-------");
list3.printList();
final.merge_r(lists, 3, KEYMIN(), KEYMAX(), min, true, RDB_NONE, 0, 0, true);
final.merge_r(lists, 3, KEYMIN(), KEYMAX(), min, true, RDB_NONE, 0, 3, 0, true);
logf(LOG_DEBUG,"------list final------");
final.printList();