Store end merge fileId to merge file name as well so we don't delete additional file when resuming a killed merge

This commit is contained in:
Ai Lin Chia
2016-07-13 12:49:29 +02:00
parent f33d2e35bc
commit a809b0607b
2 changed files with 63 additions and 30 deletions

@ -370,6 +370,7 @@ bool RdbBase::setFiles ( ) {
// assume no mergeNum
int32_t mergeNum = -1;
int32_t endMergeFileId = -1;
// if file id is even, we need the # of files being merged
// otherwise, if it is odd, we do not
@ -383,7 +384,10 @@ bool RdbBase::setFiles ( ) {
continue;
}
mergeNum = atol2 ( s , 3 );
mergeNum = atol2( s , 3 );
s += 4;
endMergeFileId = atol2( s, 4 );
}
// sometimes an unlink() does not complete properly and we
@ -429,7 +433,7 @@ bool RdbBase::setFiles ( ) {
// . MUST be in order of fileId for merging purposes
// . we assume older files come first so newer can override
// in RdbList::merge() routine
if ( addFile( id, false, mergeNum, id2 ) < 0 ) {
if ( addFile( false, id, id2, mergeNum, endMergeFileId ) < 0 ) {
return false;
}
}
@ -489,7 +493,7 @@ bool RdbBase::setFiles ( ) {
// return the fileNum we added it to in the array
// reutrn -1 and set g_errno on error
int32_t RdbBase::addFile ( int32_t id, bool isNew, int32_t mergeNum, int32_t id2 ) {
int32_t RdbBase::addFile ( bool isNew, int32_t fileId, int32_t fileId2, int32_t mergeNum, int32_t endMergeFileId ) {
int32_t n = m_numFiles;
// can't exceed this
@ -522,15 +526,17 @@ int32_t RdbBase::addFile ( int32_t id, bool isNew, int32_t mergeNum, int32_t id2
char name[512];
if ( mergeNum <= 0 ) {
if ( m_isTitledb ) {
snprintf( name, 511, "%s%04" PRId32"-%03" PRId32".dat", m_dbname, id, id2 );
snprintf( name, 511, "%s%04" PRId32"-%03" PRId32".dat", m_dbname, fileId, fileId2 );
} else {
snprintf( name, 511, "%s%04" PRId32".dat", m_dbname, id );
snprintf( name, 511, "%s%04" PRId32".dat", m_dbname, fileId );
}
} else {
if ( m_isTitledb ) {
snprintf( name, 511, "%s%04" PRId32"-%03" PRId32".%03" PRId32".dat", m_dbname, id, id2, mergeNum );
snprintf( name, 511, "%s%04" PRId32"-%03" PRId32".%03" PRId32".%04" PRId32".dat",
m_dbname, fileId, fileId2, mergeNum, endMergeFileId );
} else {
snprintf( name, 511, "%s%04" PRId32".%03" PRId32".dat", m_dbname, id, mergeNum );
snprintf( name, 511, "%s%04" PRId32".%03" PRId32".%04" PRId32".dat",
m_dbname, fileId, mergeNum, endMergeFileId );
}
}
@ -586,7 +592,7 @@ int32_t RdbBase::addFile ( int32_t id, bool isNew, int32_t mergeNum, int32_t id2
g_conf.m_maxMem = mm;
// sanity check
if ( id2 < 0 && m_isTitledb ) {
if ( fileId2 < 0 && m_isTitledb ) {
g_process.shutdownAbort(true);
}
@ -622,7 +628,7 @@ int32_t RdbBase::addFile ( int32_t id, bool isNew, int32_t mergeNum, int32_t id2
}
// set the map file's filename
sprintf ( name , "%s%04" PRId32".map", m_dbname, id );
sprintf ( name , "%s%04" PRId32".map", m_dbname, fileId );
m->set ( getDir(), name, m_fixedDataSize, m_useHalfKeys, m_ks, m_pageSize );
if ( ! isNew && ! m->readMap ( f ) ) {
// if out of memory, do not try to regen for that
@ -701,13 +707,13 @@ int32_t RdbBase::addFile ( int32_t id, bool isNew, int32_t mergeNum, int32_t id2
// find the position to add so we maintain order by fileId
int32_t i ;
for ( i = 0 ; i < m_numFiles ; i++ ) {
if ( m_fileIds[i] >= id ) {
if ( m_fileIds[i] >= fileId ) {
break;
}
}
// cannot collide here
if ( i < m_numFiles && m_fileIds[i] == id ) {
if ( i < m_numFiles && m_fileIds[i] == fileId ) {
log(LOG_LOGIC,"db: addFile: fileId collided.");
return -1;
}
@ -722,8 +728,8 @@ int32_t RdbBase::addFile ( int32_t id, bool isNew, int32_t mergeNum, int32_t id2
}
// insert this file into position #i
m_fileIds [i] = id;
m_fileIds2 [i] = id2;
m_fileIds [i] = fileId;
m_fileIds2 [i] = fileId2;
m_files [i] = f;
m_maps [i] = m;
@ -737,7 +743,7 @@ int32_t RdbBase::addFile ( int32_t id, bool isNew, int32_t mergeNum, int32_t id2
m->reduceMemFootPrint();
// are we resuming a killed merge?
if ( g_conf.m_readOnlyMode && ((id & 0x01)==0) ) {
if ( g_conf.m_readOnlyMode && ((fileId & 0x01)==0) ) {
log("db: Cannot start in read only mode with an incomplete "
"merge, because we might be a temporary cluster and "
"the merge might be active.");
@ -775,7 +781,7 @@ int32_t RdbBase::addNewFile ( int32_t id2 ) {
}
// otherwise, set it
return addFile( fileId, true, -1, id2 );
return addFile( true, fileId, id2, -1, -1 );
}
static void doneWrapper ( void *state ) ;
@ -1159,7 +1165,6 @@ void RdbBase::buryFiles ( int32_t a , int32_t b ) {
// . TODO: fix Rdb::attemptMergeAll() to not remove from linked list if
// we had an error in addNewFile() or rdbmerge.cpp's call to rdbbase::addFile
bool RdbBase::attemptMerge ( int32_t niceness, bool forceMergeAll, bool doLog , int32_t minToMergeOverride ) {
logTrace( g_conf.m_logTraceRdbBase, "BEGIN. minToMergeOverride: %" PRId32, minToMergeOverride);
// don't do merge if we're in read only mode
@ -1211,9 +1216,9 @@ bool RdbBase::attemptMerge ( int32_t niceness, bool forceMergeAll, bool doLog ,
// RdbDump::updateTfndbLoop()
char rdbId = getIdFromRdb ( m_rdb );
if ( rdbId == RDB_TITLEDB && g_titledb.m_rdb.m_dump.isDumping() ) {
if ( doLog )
log(LOG_INFO,"db: Can not merge titledb while it "
"is dumping.");
if ( doLog ) {
log( LOG_INFO, "db: Can not merge titledb while it is dumping." );
}
logTrace( g_conf.m_logTraceRdbBase, "END, wait for titledb dump" );
return false;
}
@ -1387,7 +1392,7 @@ bool RdbBase::attemptMerge ( int32_t niceness, bool forceMergeAll, bool doLog ,
percentNegativeRecs > .20 ) {
m_nextMergeForced = true;
forceMergeAll = true;
log("rdb: hit negative rec concentration of %f "
log( LOG_INFO, "rdb: hit negative rec concentration of %f "
"(total=%" PRId64") for "
"collnum %" PRId32" on db %s when diskAvail=%" PRId64" bytes",
percentNegativeRecs,totalRecs,(int32_t)m_collnum,
@ -1398,7 +1403,7 @@ bool RdbBase::attemptMerge ( int32_t niceness, bool forceMergeAll, bool doLog ,
percentNegativeRecs > .40 ) {
m_nextMergeForced = true;
forceMergeAll = true;
log("rdb: hit negative rec concentration of %f "
log( LOG_INFO, "rdb: hit negative rec concentration of %f "
"(total=%" PRId64") for "
"collnum %" PRId32" on db %s",
percentNegativeRecs,totalRecs,(int32_t)m_collnum,
@ -1516,9 +1521,11 @@ bool RdbBase::attemptMerge ( int32_t niceness, bool forceMergeAll, bool doLog ,
// then merge F and E, but if D is < E merged D too, etc...
// . this merge algorithm is definitely better than merging everything
// if we don't do much reading to the db, only writing
int32_t n = 0;
int32_t mergeFileId;
int32_t mergeFileNum;
int32_t n = 0;
int32_t mergeFileId;
int32_t mergeFileNum;
int32_t endMergeFileId;
int32_t endMergeFileNum;
float minr ;
int64_t mint ;
int32_t mini ;
@ -1564,6 +1571,28 @@ bool RdbBase::attemptMerge ( int32_t niceness, bool forceMergeAll, bool doLog ,
// get the "003" part
n = atol2 ( s , 3 );
s += 4;
// get end merge fileNum
int32_t endMergeFileId = atol2( s, 4 );
int32_t endMergeFileNum = 0;
if ( !endMergeFileId ) {
// bad file name (should not happen after the first run)
gbshutdownCorrupted();
}
int32_t kEnd = j + 1 + n;
for ( int32_t k = j + 1; k < kEnd; ++k ) {
if ( m_fileIds[ k ] == endMergeFileId ) {
endMergeFileNum = k;
break;
}
}
if ( endMergeFileNum < mergeFileNum ) {
// missing end merge file id. assume no files to merge.
endMergeFileNum = mergeFileNum;
}
// sanity check
if ( n <= 1 ) {
@ -1583,7 +1612,7 @@ bool RdbBase::attemptMerge ( int32_t niceness, bool forceMergeAll, bool doLog ,
// compute the total size of merged file
mint = 0;
int32_t mm = 0;
for ( int32_t i = mergeFileNum ; i <= mergeFileNum + n ; i++ ) {
for ( int32_t i = mergeFileNum ; i <= endMergeFileNum ; i++ ) {
if ( i >= m_numFiles ) {
log( LOG_INFO, "merge: Number of files to merge has "
"shrunk from %" PRId32" to %" PRId32" since time of "
@ -1854,13 +1883,18 @@ bool RdbBase::attemptMerge ( int32_t niceness, bool forceMergeAll, bool doLog ,
// . this now also sets m_mergeStartFileNum for us... but we override
// below anyway. we have to set it there in case we startup and
// are resuming a merge.
mergeFileNum = addFile ( mergeFileId , true, n, id2 ) ;
endMergeFileNum = mini + n - 1;
endMergeFileId = m_fileIds[ endMergeFileNum ];
log( LOG_INFO, "merge: n=%d mini=%d mergeFileId=%d endMergeFileNum=%d endMergeFileId=%d",
n, mini, mergeFileId, endMergeFileNum, endMergeFileId );
mergeFileNum = addFile ( true, mergeFileId , id2, n, endMergeFileId );
if ( mergeFileNum < 0 ) {
log(LOG_LOGIC,"merge: attemptMerge: Could not add new file.");
g_errno = 0;
return false;
}
// is it a force?
if ( m_nextMergeForced ) {
log(LOG_INFO, "merge: Force merging all %s files, except those being dumped now.", m_dbname);
@ -1891,8 +1925,7 @@ bool RdbBase::attemptMerge ( int32_t niceness, bool forceMergeAll, bool doLog ,
// print out file info
m_numPos = 0;
m_numNeg = 0;
for ( int32_t i = m_mergeStartFileNum ;
i < m_mergeStartFileNum + m_numFilesToMerge ; i++ ) {
for ( int32_t i = m_mergeStartFileNum; i < m_mergeStartFileNum + m_numFilesToMerge; ++i ) {
m_numPos += m_maps[i]->getNumPositiveRecs();
m_numNeg += m_maps[i]->getNumNegativeRecs();
log(LOG_INFO,"merge: %s (#%" PRId32") has %" PRId64" positive "

@ -151,7 +151,7 @@ class RdbBase {
// . add a (new) file to the m_files/m_maps/m_fileIds arrays
// . both return array position we added it to
// . both return -1 and set errno on error
int32_t addFile ( int32_t fileId, bool isNew, int32_t mergeNum, int32_t id2 ) ;
int32_t addFile ( bool isNew, int32_t fileId, int32_t fileId2, int32_t mergeNum, int32_t endMergeFileId ) ;
int32_t addNewFile ( int32_t id2 ) ;
// used by the high priority udp server to suspend merging for ALL