#include "gb-include.h"
#include "Rdb.h"
#include "Conf.h"
#include "Clusterdb.h"
#include "Hostdb.h"
#include "Tagdb.h"
#include "Posdb.h"
#include "Titledb.h"
#include "Sections.h"
#include "Spider.h"
#include "Linkdb.h"
#include "Collectiondb.h"
#include "RdbMerge.h"
#include "Repair.h"
#include "Rebalance.h"
#include "JobScheduler.h"
#include "Process.h"
#include "Sanity.h"
#include "Dir.h"
#include "File.h"
#include "GbMoveFile.h"
#include "GbMakePath.h"
#include "Mem.h"
#include "ScopedLock.h"
#include <sys/stat.h> //mkdir(), stat()
#include <fcntl.h>
#include <algorithm>
#include <set>
#include <signal.h>

// An RdbBase instance is a set of files for a database, eg PosDB. Each file consists of one data file (which may
// actually be multiple part files), one map file, and optionally one index file. Example:
//   posdb0001.dat
//   posdb0001.map
//   posdb0151.dat
//   posdb0151.map
//   posdb0231.dat
//   posdb0231.map
//   posdb0233.dat
//   posdb0233.map
//   posdb0235.dat
//   posdb0235.map
// The files (RdbBase::FileInfo) are kept in RdbBase::m_fileInfo and are sorted according to file-id. The file id
// is the number (1/151/231/233/235 in the above example). Normal files have an odd number. During a merge the
// temporary destination merge file has an even number and additional filename components, eg:
//   posdb0001.dat
//   posdb0001.map
//   posdb0151.dat
//   posdb0151.map
//   posdb0230.003.0235.dat
//   posdb0230.003.0235.map
//   posdb0231.dat
//   posdb0231.map
//   posdb0233.dat
//   posdb0233.map
//   posdb0235.dat
//   posdb0235.map
// The extra filename components are the source file count and the fileId of the last source file.
//
// Titledb is special due to legacy. The *.dat files have an additional component that is always "000", eg:
//   titledb0001-000.dat
//   titledb0000-000.002.0003.dat
// The extra component is not used anymore and there are no clues about what it was used for.
//
// RdbBase::attemptMerge() is called periodically, for various reasons and with different parameters. It selects
// a consecutive range of files to merge (eg 0231..0235), inserts a lowId-1 file (0230), and then hands off the
// hard work to RdbMerge.
//
// During a merge, files can be marked as unreadable (testable with RdbBase::isReadable()) because the file may be
// incomplete (eg. the destination merge file) or about to be deleted (the source files when the merge has finished).
//
// When RdbMerge finishes it calls back to RdbBase::incorporateMerge(), which does a bit of a circus trick finishing
// the merge with multiple callbacks, phases and error-recovery strategies. Ultimately, RdbBase::renamesDone() is
// called, which cleans up (removes knowledge of deleted files, relinquishes the merge-space lock, ...)
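//
// For illustration, decoding the merge-file name above: "posdb0230.003.0235.dat" is the temporary destination
// file of a merge (even fileId 0230) reading from 003 source files, the last of which has fileId 0235,
// i.e. sources 0231, 0233 and 0235.
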
bool g_dumpMode = false;

// NEVER merge more than this many files; our current merge routine
// does not scale well to many files
static const int32_t absoluteMaxFilesToMerge = 50;

GbThreadQueue RdbBase::m_globalIndexThreadQueue;

RdbBase::RdbBase()
    : m_numFiles(0),
      m_mtxFileInfo(),
      m_docIdFileIndex(new docids_t),
      m_attemptOnlyMergeResumption(true),
      m_dumpingFileId(-1),
      m_submittingJobs(false),
      m_outstandingJobCount(0),
      m_mtxJobCount()
{
    m_rdb = NULL;
    m_nextMergeForced = false;
    m_collectionDirName[0] = '\0';
    m_mergeDirName[0] = '\0';
    m_dbname[0] = '\0';
    m_dbnameLen = 0;

    // use bogus collnum just in case
    m_collnum = -1;

    // init below mainly to quiet coverity
    m_fixedDataSize = 0;
    m_coll = NULL;
    m_didRepair = false;
    m_tree = NULL;
    m_buckets = NULL;
    m_minToMergeDefault = 0;
    m_minToMerge = 0;
    m_numFilesToMerge = 0;
    m_mergeStartFileNum = 0;
    m_useHalfKeys = false;
    m_useIndexFile = false;
    m_isTitledb = false;
    m_ks = 0;
    m_pageSize = 0;
    m_niceness = 0;
    m_premergeNumPositiveRecords = 0;
    m_premergeNumNegativeRecords = 0;
    memset(m_fileInfo, 0, sizeof(m_fileInfo));

    reset();
}


void RdbBase::reset() {
    for (int32_t i = 0; i < m_numFiles; i++) {
        mdelete(m_fileInfo[i].m_file, sizeof(BigFile), "RdbBFile");
        delete m_fileInfo[i].m_file;

        mdelete(m_fileInfo[i].m_map, sizeof(RdbMap), "RdbBMap");
        delete m_fileInfo[i].m_map;

        mdelete(m_fileInfo[i].m_index, sizeof(RdbIndex), "RdbBIndex");
        delete m_fileInfo[i].m_index;
    }

    m_numFiles = 0;
    m_isMerging = false;
}


RdbBase::~RdbBase() {
    reset();
}

bool RdbBase::init(const char *dir,
                   const char *dbname,
                   int32_t fixedDataSize,
                   int32_t minToMergeArg,
                   bool useHalfKeys,
                   char keySize,
                   int32_t pageSize,
                   const char *coll,
                   collnum_t collnum,
                   RdbTree *tree,
                   RdbBuckets *buckets,
                   Rdb *rdb,
                   bool useIndexFile) {
    if (!dir)
        gbshutdownLogicError();

    m_didRepair = false;

    sprintf(m_collectionDirName, "%scoll.%s.%" PRId32, dir, coll, (int32_t)collnum);

    // use override from hosts.conf if present
    const char *mergeSpaceDir = strlen(g_hostdb.m_myHost->m_mergeDir) > 0 ? g_hostdb.m_myHost->m_mergeDir : g_conf.m_mergespaceDirectory;
    sprintf(m_mergeDirName, "%s/%d/coll.%s.%d", mergeSpaceDir, getMyHostId(), coll, (int32_t)collnum);

    // logDebugAdmin
    log(LOG_DEBUG, "db: adding new base for dir=%s coll=%s collnum=%" PRId32 " db=%s",
        dir, coll, (int32_t)collnum, dbname);

    // make sure the merge space directory exists
    if (makePath(m_mergeDirName, getDirCreationFlags()) != 0) {
        g_errno = errno;
        log(LOG_ERROR, "makePath(%s) failed with errno=%d (%s)", m_mergeDirName, errno, strerror(errno));
        return false;
    }

top:
    // reset all
    reset();

    m_coll = coll;
    m_collnum = collnum;
    m_tree = tree;
    m_buckets = buckets;
    m_rdb = rdb;

    // save the dbname NUL-terminated into m_dbname/m_dbnameLen
    m_dbnameLen = strlen(dbname);
    gbmemcpy(m_dbname, dbname, m_dbnameLen);
    m_dbname[m_dbnameLen] = '\0';

    // store the other parameters
    m_fixedDataSize = fixedDataSize;
    m_useHalfKeys = useHalfKeys;
    m_isTitledb = rdb->isTitledb();
    m_ks = keySize;
    m_pageSize = pageSize;
    m_useIndexFile = useIndexFile;

    if (m_useIndexFile) {
        char indexName[64];
        sprintf(indexName, "%s-saved.idx", m_dbname);
        m_treeIndex.set(m_collectionDirName, indexName, m_fixedDataSize, m_useHalfKeys, m_ks, m_rdb->getRdbId(), false);

        // only attempt to read/generate when we have a tree/bucket
        if ((m_tree && m_tree->getNumUsedNodes() > 0) || (m_buckets && m_buckets->getNumKeys() > 0)) {
            if (!(m_treeIndex.readIndex() && m_treeIndex.verifyIndex())) {
                g_errno = 0;
                log(LOG_WARN, "db: Could not read index file %s", indexName);

                // if 'gb dump X collname' was called, bail; we do not want to write any data
                if (g_dumpMode) {
                    return false;
                }

                log(LOG_INFO, "db: Attempting to generate index file %s/%s-saved.dat. May take a while.",
                    m_collectionDirName, m_dbname);

                bool result = m_tree ? m_treeIndex.generateIndex(m_collnum, m_tree) : m_treeIndex.generateIndex(m_collnum, m_buckets);
                if (!result) {
                    logError("db: Index generation failed for %s/%s-saved.dat.", m_collectionDirName, m_dbname);
                    gbshutdownCorrupted();
                }

                log(LOG_INFO, "db: Index generation succeeded.");

                // . save it
                // . if we're an even #'d file a merge will follow
                //   when main.cpp calls attemptMerge()
                log("db: Saving generated index file to disk.");

                bool status = m_treeIndex.writeIndex(false);
                if (!status) {
                    log(LOG_ERROR, "db: Save failed.");
                    return false;
                }
            }
        }
    }

    // we can't merge more than MAX_RDB_FILES files at a time
    if (minToMergeArg > MAX_RDB_FILES) {
        minToMergeArg = MAX_RDB_FILES;
    }
    m_minToMergeDefault = minToMergeArg;

    // . set our m_files array
    if (!setFiles()) {
        // try again if we did a repair
        if (m_didRepair) {
            goto top;
        }
        // if no repair, give up
        return false;
    }

    // now diskpagecache is much simpler, just basically rdbcache...
    return true;
}

// . move all files into the trash subdir
// . this is part of PageRepair's repair algorithm. all this stuff blocks.
bool RdbBase::moveToTrash(const char *dstDir) {
    // loop over all files
    for (int32_t i = 0; i < m_numFiles; i++) {
        // rename the map file
        {
            BigFile *f = m_fileInfo[i].m_map->getFile();
            logf(LOG_INFO, "repair: Renaming %s to %s%s", f->getFilename(), dstDir, f->getFilename());
            if (!f->rename(f->getFilename(), dstDir)) {
                log(LOG_WARN, "repair: Moving file had error: %s.", mstrerror(errno));
                return false;
            }
        }

        // rename the index file if used
        if (m_useIndexFile) {
            BigFile *f = m_fileInfo[i].m_index->getFile();
            if (f->doesExist()) {
                logf(LOG_INFO, "repair: Renaming %s to %s%s", f->getFilename(), dstDir, f->getFilename());
                if (!f->rename(f->getFilename(), dstDir)) {
                    log(LOG_WARN, "repair: Moving file had error: %s.", mstrerror(errno));
                    return false;
                }
            }
        }

        // move the data file
        {
            BigFile *f = m_fileInfo[i].m_file;
            logf(LOG_INFO, "repair: Renaming %s to %s%s", f->getFilename(), dstDir, f->getFilename());
            if (!f->rename(f->getFilename(), dstDir)) {
                log(LOG_WARN, "repair: Moving file had error: %s.", mstrerror(errno));
                return false;
            }
        }
    }

    if (m_useIndexFile) {
        // rename the tree index file
        BigFile *f = m_treeIndex.getFile();
        if (f->doesExist()) {
            logf(LOG_INFO, "repair: Renaming %s to %s%s", f->getFilename(), dstDir, f->getFilename());
            if (!f->rename(f->getFilename(), dstDir)) {
                log(LOG_WARN, "repair: Moving file had error: %s.", mstrerror(errno));
                return false;
            }
        }
    }

    // now just reset the files so we are empty. we should have our
    // setFiles() called again once the newly rebuilt rdb files are
    // renamed, when RdbBase::rename() is called below
    reset();

    // success
    return true;
}


// . the newly rebuilt rdb gets renamed to the original, after we call
//   RdbBase::trash() on the original.
// . this is part of PageRepair's repair algorithm. all this stuff blocks.
bool RdbBase::removeRebuildFromFilenames() {
    // loop over all files
    // DON'T STOP IF ONE FAILS
    for (int32_t i = 0; i < m_numFiles; i++) {
        // rename the data file
        removeRebuildFromFilename(m_fileInfo[i].m_file);
        // rename the map file
        removeRebuildFromFilename(m_fileInfo[i].m_map->getFile());
        // rename the index file
        if (m_useIndexFile) {
            removeRebuildFromFilename(m_fileInfo[i].m_index->getFile());
        }
    }

    // reset all now
    reset();

    // now PageRepair should reload the original
    return true;
}

bool RdbBase::removeRebuildFromFilename(BigFile *f) {
    // get the filename
    const char *ff = f->getFilename();

    // copy it
    char buf[1024];
    strncpy(buf, ff, sizeof(buf));
    buf[sizeof(buf) - 1] = '\0';

    // remove "Rebuild" from it
    char *p = strstr(buf, "Rebuild");
    if (!p) {
        log(LOG_WARN, "repair: Rebuild not found in filename=%s", buf);
        return false;
    }

    // bury it
    int32_t rlen = strlen("Rebuild");
    char *end = buf + strlen(buf);
    int32_t size = end - (p + rlen);
    // +1 to include the ending \0
    memmove(p, p + rlen, size + 1);

    // now rename this file
    logf(LOG_INFO, "repair: Renaming %s to %s", f->getFilename(), buf);
    if (!f->rename(buf, NULL)) {
        log(LOG_WARN, "repair: Rename to %s failed", buf);
        return false;
    }

    return true;
}

bool RdbBase::parseFilename(const char *filename, int32_t *p_fileId, int32_t *p_fileId2,
                            int32_t *p_mergeNum, int32_t *p_endMergeFileId) {
    // a 4-digit file id must follow the dbname
    const char *s = filename + m_dbnameLen;
    if (!isdigit(*(s+0)) || !isdigit(*(s+1)) || !isdigit(*(s+2)) || !isdigit(*(s+3))) {
        return false;
    }
    // optional 5th digit
    int32_t len = 4;
    if (isdigit(*(s+4))) {
        len = 5;
    }
    // . read that id
    // . older files have lower numbers
    *p_fileId = atol2(s, len);
    s += len;

    *p_fileId2 = -1;
    // if we are titledb, we get the secondary id
    if (*s == '-') {
        *p_fileId2 = atol2(s + 1, 3);
        s += 4;
    }

    // assume no mergeNum
    *p_mergeNum = -1;
    *p_endMergeFileId = -1;

    // if the file id is even we need the # of files being merged;
    // if it is odd we do not
    if ((*p_fileId & 0x01) == 0x00) {
        if (*s != '.') {
            return false;
        }
        ++s;
        // 3-digit number (mergeNum)
        if (!isdigit(*(s+0)) || !isdigit(*(s+1)) || !isdigit(*(s+2))) {
            return false;
        }
        *p_mergeNum = atol2(s, 3);
        s += 4;
        // 4-digit number (endMergeFileId)
        if (!isdigit(*(s+0)) || !isdigit(*(s+1)) || !isdigit(*(s+2)) || !isdigit(*(s+3))) {
            return false;
        }
        *p_endMergeFileId = atol2(s, 4);
    }

    return true;
}
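
// For illustration, some example parses under the naming scheme described at the top of this file:
//   "posdb0231.dat"          -> fileId=231, fileId2=-1, mergeNum=-1, endMergeFileId=-1
//   "posdb0230.003.0235.dat" -> fileId=230, fileId2=-1, mergeNum=3,  endMergeFileId=235
//   "titledb0001-000.dat"    -> fileId=1,   fileId2=0,  mergeNum=-1, endMergeFileId=-1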

bool RdbBase::hasFileId(int32_t fileId) const {
    for (int i = 0; i < m_numFiles; i++)
        if (m_fileInfo[i].m_fileId == fileId)
            return true;
    return false;
}

// . this is called to open the initial rdb data and map files we have
// . the first file is always the merge file (may be empty)
// . returns false on error
bool RdbBase::setFiles() {
    if (!cleanupAnyChrashedMerged())
        return false;

    if (!loadFilesFromDir(m_collectionDirName, false))
        return false;

    if (!loadFilesFromDir(m_mergeDirName, true))
        return false;

    // spiderdb should start with file 0001.dat or 0000.dat
    if (m_numFiles > 0 && m_fileInfo[0].m_fileId > 1 && m_rdb->getRdbId() == RDB_SPIDERDB) {
        //isj: is that even true anymore? Ok, crashed merges and lost file0000* are not a
        //good thing but I don't see why it should affect spiderdb especially badly.
        return fixNonfirstSpiderdbFiles();
    }

    // create the global index
    generateGlobalIndex();

    // ensure files are sharded correctly
    verifyFileSharding();

    return true;
}

// Clean up any unfinished and unrecoverable merge.
// Because a half-finished mergedir/mergefile.dat can be resumed easily we don't clean
// up mergedir/*.dat. Half-copied datadir/mergefile.dat are removed because the
// copy/move can easily be restarted (and it would be too much effort to resume copying
// halfway). Orphaned mergedir/*.map and mergedir/*.idx are removed. Orphaned datadir/*.map
// and datadir/*.idx are removed. Missing *.map and *.idx are automatically regenerated.
bool RdbBase::cleanupAnyChrashedMerged() {
    //note: we could submit the unlink() calls to the jobscheduler if we really wanted
    //to, but since this recovery-cleanup is done during startup I don't see a big problem
    //with waiting for unlink() to finish, because the collection will not be ready for
    //use until cleanup has happened.
    log(LOG_DEBUG, "Cleaning up any unfinished merges of %s %s", m_coll, m_dbname);

    // remove datadir/mergefile.dat
    {
        Dir dir;
        dir.set(m_collectionDirName);
        if (!dir.open())
            return false;
        char pattern[128];
        sprintf(pattern, "%s*", m_dbname);
        while (const char *filename = dir.getNextFilename(pattern)) {
            if (strstr(filename, ".dat") != NULL) {
                int32_t fileId, fileId2;
                int32_t mergeNum, endMergeFileId;
                if (parseFilename(filename, &fileId, &fileId2, &mergeNum, &endMergeFileId)) {
                    if ((fileId % 2) == 0) {
                        char fullname[1024];
                        sprintf(fullname, "%s/%s", m_collectionDirName, filename);
                        log(LOG_DEBUG, "Removing %s", fullname);
                        if (unlink(fullname) != 0) {
                            g_errno = errno;
                            log(LOG_ERROR, "unlink(%s) failed with errno=%d (%s)", fullname, errno, strerror(errno));
                            return false;
                        }
                    }
                }
            }
        }
    }

    // remove orphaned datadir/*.map and datadir/*.idx
    {
        std::set<int32_t> existingDataDirFileIds;
        Dir dir;
        dir.set(m_collectionDirName);
        if (!dir.open())
            return false;
        char pattern[128];
        sprintf(pattern, "%s*", m_dbname);
        while (const char *filename = dir.getNextFilename(pattern)) {
            if (strstr(filename, ".dat") != NULL) {
                int32_t fileId, fileId2;
                int32_t mergeNum, endMergeFileId;
                if (parseFilename(filename, &fileId, &fileId2, &mergeNum, &endMergeFileId)) {
                    existingDataDirFileIds.insert(fileId);
                }
            }
        }
        dir.close();
        log(LOG_DEBUG, "Found %lu .dat files for %s", existingDataDirFileIds.size(), m_dbname);

        if (!dir.open())
            return false;
        while (const char *filename = dir.getNextFilename(pattern)) {
            int32_t fileId, fileId2;
            int32_t mergeNum, endMergeFileId;
            if (parseFilename(filename, &fileId, &fileId2, &mergeNum, &endMergeFileId)) {
                if (existingDataDirFileIds.find(fileId) == existingDataDirFileIds.end() && //unseen fileId
                    (strstr(filename, ".map") != NULL || strstr(filename, ".idx") != NULL)) //.map or .idx
                {
                    char fullname[1024];
                    sprintf(fullname, "%s/%s", m_collectionDirName, filename);
                    log(LOG_DEBUG, "Removing %s", fullname);
                    if (unlink(fullname) != 0) {
                        g_errno = errno;
                        log(LOG_ERROR, "unlink(%s) failed with errno=%d (%s)", fullname, errno, strerror(errno));
                        return false;
                    }
                }
            }
        }
    }

    // remove orphaned mergedir/*.map and mergedir/*.idx
    {
        std::set<int32_t> existingMergeDirFileIds;
        Dir dir;
        dir.set(m_mergeDirName);
        if (!dir.open())
            return false;
        char pattern[128];
        sprintf(pattern, "%s*", m_dbname);
        while (const char *filename = dir.getNextFilename(pattern)) {
            if (strstr(filename, ".dat") != NULL) {
                int32_t fileId, fileId2;
                int32_t mergeNum, endMergeFileId;
                if (parseFilename(filename, &fileId, &fileId2, &mergeNum, &endMergeFileId)) {
                    existingMergeDirFileIds.insert(fileId);
                }
            }
        }
        dir.close();
        log(LOG_DEBUG, "Found %lu .dat files for %s in merge-space", existingMergeDirFileIds.size(), m_dbname);

        if (!dir.open())
            return false;
        while (const char *filename = dir.getNextFilename(pattern)) {
            int32_t fileId, fileId2;
            int32_t mergeNum, endMergeFileId;
            if (parseFilename(filename, &fileId, &fileId2, &mergeNum, &endMergeFileId)) {
                if (existingMergeDirFileIds.find(fileId) == existingMergeDirFileIds.end() && //unseen fileId
                    (strstr(filename, ".map") != NULL || strstr(filename, ".idx") != NULL)) //.map or .idx
                {
                    char fullname[1024];
                    sprintf(fullname, "%s/%s", m_mergeDirName, filename);
                    log(LOG_DEBUG, "Removing %s", fullname);
                    if (unlink(fullname) != 0) {
                        g_errno = errno;
                        log(LOG_ERROR, "unlink(%s) failed with errno=%d (%s)", fullname, errno, strerror(errno));
                        return false;
                    }
                }
            }
        }
    }

    log(LOG_DEBUG, "Cleaned up any unfinished merges of %s %s", m_coll, m_dbname);
    return true;
}
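
// For illustration (assuming dbname "posdb"), a startup after a crashed merge might remove:
//   datadir/posdb0230.003.0235.dat  - even fileId in the data dir, i.e. a half-moved merge result
//   datadir/posdb0241.map           - orphan: no posdb0241.dat exists; maps are regenerated on demand
//   mergedir/posdb0231.idx          - orphan .idx in merge space with no matching .dat
// while mergedir/posdb0230.003.0235.dat is left alone so the merge can be resumed.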

bool RdbBase::loadFilesFromDir(const char *dirName, bool isInMergeDir) {
    Dir dir;
    if (!dir.set(dirName))
        return false;

    if (!dir.open()) {
        // we are getting this from a bogus dir
        log(LOG_WARN, "db: Had error opening directory %s", dirName);
        return false;
    }

    // note it
    log(LOG_DEBUG, "db: Loading files for %s coll=%s (%" PRId32 ") from %s",
        m_dbname, m_coll, (int32_t)m_collnum, dirName);

    // . set our m_files array
    // . addFile() will return -1 and set g_errno on error
    // . the lower the fileId the older the data
    //   (but not necessarily the file)
    // . we now put a '*' at the end of "*.dat*" since we may be reading in
    //   some headless BigFiles left over from a killed merge
    while (const char *filename = dir.getNextFilename("*.dat*")) {
        // ensure the filename starts w/ our m_dbname
        if (strncmp(filename, m_dbname, m_dbnameLen) != 0) {
            continue;
        }

        int32_t fileId;
        int32_t fileId2;
        int32_t mergeNum;
        int32_t endMergeFileId;
        if (!parseFilename(filename, &fileId, &fileId2, &mergeNum, &endMergeFileId)) {
            continue;
        }

        // validation
        // . if we are titledb we should have a -xxx after the file id
        // . if none is there it needs to be converted!
        if (m_isTitledb && fileId2 == -1) {
            // critical
            log(LOG_ERROR, "gb: bad title filename of %s. Halting.", filename);
            g_errno = EBADENGINEER;
            return false;
        }

        // don't add if already in there (happens for eg dbname0001.dat.part*)
        if (hasFileId(fileId))
            continue;

        // sometimes an unlink() does not complete properly and we end up with
        // remnant files that are 0 bytes. so let's clean up and skip them
        SafeBuf fullFilename;
        fullFilename.safePrintf("%s/%s", dirName, filename);
        struct stat st;
        if (stat(fullFilename.getBufStart(), &st) != 0) {
            logError("stat(%s) failed with errno=%d (%s)", fullFilename.getBufStart(), errno, strerror(errno));
            return false;
        }

        // zero-sized non-merge files are suspicious and typically indicate data loss, so crash if one is found
        if (st.st_size == 0 && (fileId % 2) != 0) {
            logError("zero sized file found: %s", filename);
            gbshutdownCorrupted();
        }

        if (isInMergeDir)
            log(LOG_WARN, "db: found leftover merge file in merge dir: %s", fullFilename.getBufStart());

        // . put this file into our array of files/maps for this db
        // . MUST be in order of fileId for merging purposes
        // . we assume older files come first so newer can override
        //   in the RdbList::merge() routine
        if (addFile(false, fileId, fileId2, mergeNum, endMergeFileId, isInMergeDir) < 0) {
            return false;
        }
    }

    return true;
}

bool RdbBase::fixNonfirstSpiderdbFiles() {
    log(LOG_WARN, "db: missing file id 0001.dat for %s in coll %s. "
        "Fix this or it'll core later. Just rename the next file "
        "in line to 0001.dat/map. We probably cored at a "
        "really bad time during the end of a merge process.",
        m_dbname, m_coll);
    // do not re-do the repair! hmmm
    if (m_didRepair) return false;

    // just fix it for them
    BigFile bf;
    SafeBuf oldName;
    oldName.safePrintf("%s%04" PRId32 ".dat", m_dbname, m_fileInfo[0].m_fileId);
    bf.set(m_collectionDirName, oldName.getBufStart());

    // rename it to eg "spiderdb0001.dat"
    SafeBuf newName;
    newName.safePrintf("%s/%s0001.dat", m_collectionDirName, m_dbname);
    bf.rename(newName.getBufStart(), NULL);

    // and delete the old map
    SafeBuf oldMap;
    oldMap.safePrintf("%s/%s0001.map", m_collectionDirName, m_dbname);
    File omf;
    omf.set(oldMap.getBufStart());
    omf.unlink();

    // get the map file name we want to move to 0001.map
    BigFile cmf;
    SafeBuf curMap;
    curMap.safePrintf("%s%04" PRId32 ".map", m_dbname, m_fileInfo[0].m_fileId);
    cmf.set(m_collectionDirName, curMap.getBufStart());
    // rename eg spiderdb0081.map to spiderdb0001.map
    cmf.rename(oldMap.getBufStart(), NULL);

    if (m_useIndexFile) {
        // and delete the old index
        SafeBuf oldIndex;
        oldIndex.safePrintf("%s/%s0001.idx", m_collectionDirName, m_dbname);
        File oif;
        oif.set(oldIndex.getBufStart());
        oif.unlink();

        // get the index file name we want to move to 0001.idx
        BigFile cif;
        SafeBuf curIndex;
        curIndex.safePrintf("%s%04" PRId32 ".idx", m_dbname, m_fileInfo[0].m_fileId);
        cif.set(m_collectionDirName, curIndex.getBufStart());
        // rename eg spiderdb0081.idx to spiderdb0001.idx
        cif.rename(oldIndex.getBufStart(), NULL);
    }

    // replace that first file then
    m_didRepair = true;
    return true;
}

// Generate a filename covering the 4 combinations of titledb/not-titledb and normal-file/merging-file
void RdbBase::generateFilename(char *buf, size_t bufsize, int32_t fileId, int32_t fileId2, int32_t mergeNum, int32_t endMergeFileId, const char *extension) {
    if (mergeNum <= 0) {
        if (m_isTitledb && fileId2 >= 0) {
            snprintf(buf, bufsize, "%s%04d-%03d.%s",
                     m_dbname, fileId, fileId2, extension);
        } else {
            snprintf(buf, bufsize, "%s%04d.%s",
                     m_dbname, fileId, extension);
        }
    } else {
        if (m_isTitledb && fileId2 >= 0) {
            snprintf(buf, bufsize, "%s%04d-%03d.%03d.%04d.%s",
                     m_dbname, fileId, fileId2, mergeNum, endMergeFileId, extension);
        } else {
            snprintf(buf, bufsize, "%s%04d.%03d.%04d.%s",
                     m_dbname, fileId, mergeNum, endMergeFileId, extension);
        }
    }
}
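
// For illustration, assuming dbname "posdb" and extension "dat":
//   generateFilename(buf, sizeof(buf), 231, -1, -1, -1, "dat") -> "posdb0231.dat"
//   generateFilename(buf, sizeof(buf), 230, -1, 3, 235, "dat") -> "posdb0230.003.0235.dat"
// and for a titledb base (fileId2 is always 0 in practice):
//   generateFilename(buf, sizeof(buf), 1, 0, -1, -1, "dat")    -> "titledb0001-000.dat"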

// returns the fileNum we added it to in the array
// returns -1 and sets g_errno on error
int32_t RdbBase::addFile(bool isNew, int32_t fileId, int32_t fileId2, int32_t mergeNum, int32_t endMergeFileId, bool isInMergeDir) {
    // sanity check
    if (fileId2 < 0 && m_isTitledb)
        gbshutdownLogicError();

    // can't exceed this
    if (m_numFiles >= MAX_RDB_FILES) {
        g_errno = ETOOMANYFILES;
        log(LOG_LOGIC, "db: Can not have more than %" PRId32 " files. File add failed.", (int32_t)MAX_RDB_FILES);
        return -1;
    }

    // set the data file's filename
    char name[1024];
    generateDataFilename(name, sizeof(name), fileId, fileId2, mergeNum, endMergeFileId);

    // HACK: skip to avoid an OOM lockup. if RdbBase cannot dump
    // its data to disk it can backlog everyone and memory will
    // never get freed up.
    ScopedMemoryLimitBypass scopedMemoryLimitBypass;

    BigFile *f;

    const char *dirName = !isInMergeDir ? m_collectionDirName : m_mergeDirName;

    try {
        f = new (BigFile);
    } catch(std::bad_alloc&) {
        g_errno = ENOMEM;
        log(LOG_WARN, "RdbBase: new(%i): %s", (int)sizeof(BigFile), mstrerror(g_errno));
        return -1;
    }
    mnew(f, sizeof(BigFile), "RdbBFile");

    f->set(dirName, name);

    // if new, ensure it does not already exist
    if (isNew && f->doesExist()) {
        logError("rdb: trying to create NEW file %s/%s which already exists!", f->getDir(), f->getFilename());
        gbshutdownCorrupted();
    }

    RdbMap *m;
    try {
        m = new (RdbMap);
    } catch(std::bad_alloc&) {
        g_errno = ENOMEM;
        log(LOG_WARN, "RdbBase: new(%i): %s", (int)sizeof(RdbMap), mstrerror(g_errno));
        mdelete(f, sizeof(BigFile), "RdbBFile");
        delete (f);
        return -1;
    }
    mnew(m, sizeof(RdbMap), "RdbBMap");

    RdbIndex *in = NULL;
    if (m_useIndexFile) {
        try {
            in = new (RdbIndex);
        } catch(std::bad_alloc&) {
            g_errno = ENOMEM;
            log(LOG_WARN, "RdbBase: new(%i): %s", (int)sizeof(RdbIndex), mstrerror(g_errno));
            mdelete(f, sizeof(BigFile), "RdbBFile");
            delete (f);
            mdelete(m, sizeof(RdbMap), "RdbBMap");
            delete (m);
            return -1;
        }
        mnew(in, sizeof(RdbIndex), "RdbBIndex");
    }

    // reinstate the memory limit
    scopedMemoryLimitBypass.release();

    // debug help
    if (isNew) {
        log(LOG_DEBUG, "db: adding new file %s/%s", f->getDir(), f->getFilename());
    }

    // if not a new file, sanity check it
    for (int32_t j = 0; !isNew && j < f->getMaxParts() - 1; j++) {
        // might be headless
        File *ff = f->getFile2(j);
        if (!ff) {
            continue;
        }
        if (ff->getFileSize() == MAX_PART_SIZE) {
            continue;
        }
        log(LOG_WARN, "db: File %s %s has length %" PRId64 ", but it should be %" PRId64 ". "
            "You should move it to a temporary directory "
            "and restart. It probably happened when the power went "
            "out and a file delete operation failed to complete.",
            f->getDir(),
            ff->getFilename(),
            (int64_t)ff->getFileSize(),
            (int64_t)MAX_PART_SIZE);
        gbshutdownCorrupted();
    }

    // set the map file's filename
    char mapName[1024];
    generateMapFilename(mapName, sizeof(mapName), fileId, fileId2, 0, -1);
    m->set(dirName, mapName, m_fixedDataSize, m_useHalfKeys, m_ks, m_pageSize);
    if (!isNew && !isInMergeDir && !m->readMap(f)) {
        // if out of memory, do not try to regen for that
        if (g_errno == ENOMEM) {
            return -1;
        }
        g_errno = 0;
        log("db: Could not read map file %s", mapName);

        // if 'gb dump X collname' was called, bail; we do not
        // want to write any data
        if (g_dumpMode) {
            return -1;
        }

        log(LOG_INFO, "db: Attempting to generate map file for data file %s* of %" PRId64 " bytes. May take a while.",
            f->getFilename(), f->getFileSize());

        // this returns false and sets g_errno on error
        if (!m->generateMap(f)) {
            log(LOG_ERROR, "db: Map generation failed.");
            gbshutdownCorrupted();
        }

        log(LOG_INFO, "db: Map generation succeeded.");

        // . save it
        // . if we're an even #'d file a merge will follow
        //   when main.cpp calls attemptMerge()
        log("db: Saving generated map file to disk.");

        // true = allDone
        bool status = m->writeMap(true);
        if (!status) {
            log(LOG_ERROR, "db: Save failed.");
            return -1;
        }
    }

    if (!isNew) {
        log(LOG_DEBUG, "db: Added %s for collnum=%" PRId32 " pages=%" PRId32,
            mapName, (int32_t)m_collnum, m->getNumPages());
    }

    if (m_useIndexFile) {
        char indexName[1024];
        // set the index file's filename
        generateIndexFilename(indexName, sizeof(indexName), fileId, fileId2, 0, -1);
        in->set(dirName, indexName, m_fixedDataSize, m_useHalfKeys, m_ks, m_rdb->getRdbId(), (!isNew && !isInMergeDir));
        if (!isNew && !isInMergeDir && !(in->readIndex() && in->verifyIndex())) {
            // if out of memory, do not try to regen for that
            if (g_errno == ENOMEM) {
                return -1;
            }
            g_errno = 0;
            log(LOG_WARN, "db: Could not read index file %s", indexName);

            // if 'gb dump X collname' was called, bail; we do not want to write any data
            if (g_dumpMode) {
                return -1;
            }

            log(LOG_INFO, "db: Attempting to generate index file for data file %s* of %" PRId64 " bytes. May take a while.",
                f->getFilename(), f->getFileSize());

            // this returns false and sets g_errno on error
            if (!in->generateIndex(f)) {
                logError("db: Index generation failed for %s.", f->getFilename());
                gbshutdownCorrupted();
            }

            log(LOG_INFO, "db: Index generation succeeded.");

            // . save it
            // . if we're an even #'d file a merge will follow
            //   when main.cpp calls attemptMerge()
            log("db: Saving generated index file to disk.");

            bool status = in->writeIndex(true);
            if (!status) {
                log(LOG_ERROR, "db: Save failed.");
                return -1;
            }
        }

        if (!isNew) {
            log(LOG_DEBUG, "db: Added %s for collnum=%" PRId32 " docId count=%" PRIu64,
                indexName, (int32_t)m_collnum, in->getDocIds()->size());
        }
    }

    if (!isNew) {
        // open this big data file for reading only
        if (mergeNum < 0) {
            f->open(O_RDONLY);
        } else {
            // otherwise, the merge will have to be resumed so this file
            // should be writable
            f->open(O_RDWR);
        }
        f->setFlushingIsApplicable();
    }

    // find the position to add it at so we maintain order by fileId
    int32_t i;
    for (i = 0; i < m_numFiles; i++) {
        if (m_fileInfo[i].m_fileId >= fileId) {
            break;
        }
    }

    // cannot collide here
    if (i < m_numFiles && m_fileInfo[i].m_fileId == fileId) {
        log(LOG_LOGIC, "db: addFile: fileId collided.");
        return -1;
    }

    // shift everyone up if we need to fit this file in the middle somewhere
    memmove(m_fileInfo + i + 1, m_fileInfo + i, (m_numFiles - i) * sizeof(m_fileInfo[0]));

    // insert this file into position #i
    m_fileInfo[i].m_fileId  = fileId;
    m_fileInfo[i].m_fileId2 = fileId2;
    m_fileInfo[i].m_file    = f;
    m_fileInfo[i].m_map     = m;
    m_fileInfo[i].m_index   = in;
    if (!isInMergeDir) {
        if (fileId & 1)
            m_fileInfo[i].m_allowReads = true;
        else
            m_fileInfo[i].m_allowReads = false;
    } else {
        m_fileInfo[i].m_allowReads = false; //until we know for sure it is finished
    }
    m_fileInfo[i].m_pendingGenerateIndex = false;

    // are we resuming a killed merge?
    if (g_conf.m_readOnlyMode && ((fileId & 0x01) == 0)) {
        log("db: Cannot start in read only mode with an incomplete "
            "merge, because we might be a temporary cluster and "
            "the merge might be active.");
        gbshutdownCorrupted();
    }

    // inc # of files we have
    m_numFiles++;

    // if we added a merge file, mark it
    if (mergeNum >= 0) {
        m_mergeStartFileNum = i + 1; //the merge was starting w/ this file
    }

    return i;
}

int32_t RdbBase::addNewFile(int32_t *fileIdPtr) {
    // no clue about why titledb is different. it just is.
    int32_t id2 = m_isTitledb ? 0 : -1;

    ScopedLock sl(m_mtxFileInfo); //a bit heavy-handed, but incorporateMerge modifies and may call
                                  //attemptMerge again while the normal RdbDump calls addFile() too.

    int32_t maxFileId = 0;
    for (int32_t i = 0; i < m_numFiles; i++) {
        if (m_fileInfo[i].m_fileId > maxFileId) {
            int32_t currentFileId = m_fileInfo[i].m_fileId;
            if ((currentFileId & 0x01) == 0) {
                // merge file
                const char *filename = m_fileInfo[i].m_file->getFilename();
                int32_t mergeFileId;
                int32_t mergeFileId2;
                int32_t mergeNum;
                int32_t endMergeFileId;
                // we need to parse the filename to get the maxFileId to handle the scenario where gb crashes
                // while unlinking files. we don't want a newly dumped file to fall into the 'merge' range of
                // files that have been unlinked, because when we resume the killed merge those files will be
                // removed.
                if (parseFilename(filename, &mergeFileId, &mergeFileId2, &mergeNum, &endMergeFileId)) {
                    maxFileId = endMergeFileId;
                } else {
                    // error parsing the filename, fall back to the previous behaviour
                    maxFileId = currentFileId;
                }
            } else {
                maxFileId = currentFileId;
            }
        }
    }

    // . we like to keep even #'s for merge file names
    *fileIdPtr = maxFileId + (((maxFileId & 0x01) == 0) ? 1 : 2);
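    // e.g. maxFileId 5 (odd, a normal file) gives a new fileId of 7; maxFileId 6 (even, a merge file)
    // also gives 7. either way the new id is odd, keeping even ids reserved for merge files.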

    int32_t rc = addFile(true, *fileIdPtr, id2, -1, -1, false);
    if (rc >= 0)
        m_fileInfo[rc].m_allowReads = false; //until we know for sure. See markNewFileReadable()
    return rc;
}


// Mark a newly dumped file as finished
void RdbBase::markNewFileReadable() {
    ScopedLock sl(m_mtxFileInfo);
    if (m_numFiles == 0)
        gbshutdownLogicError();
    if (m_fileInfo[m_numFiles - 1].m_allowReads)
        gbshutdownLogicError();
    m_fileInfo[m_numFiles - 1].m_allowReads = true;
}

int32_t RdbBase::getFileId(int32_t n) {
    ScopedLock sl(m_mtxFileInfo);
    return m_fileInfo[n].m_fileId;
}


BigFile *RdbBase::getFile(int32_t n) {
    ScopedLock sl(m_mtxFileInfo);
    return m_fileInfo[n].m_file;
}


BigFile *RdbBase::getFileById(int32_t fileId) {
    ScopedLock sl(m_mtxFileInfo);
    for (auto i = 0; i < m_numFiles; ++i) {
        if (m_fileInfo[i].m_fileId == fileId) {
            return m_fileInfo[i].m_file;
        }
    }
    return NULL;
}


int32_t RdbBase::isRootFile(int32_t n) const {
    ScopedLock sl(m_mtxFileInfo);
    return n == 0 || m_fileInfo[n].m_fileId == 1;
}


RdbMap *RdbBase::getMap(int32_t n) {
    ScopedLock sl(m_mtxFileInfo);
    return m_fileInfo[n].m_map;
}


RdbMap *RdbBase::getMapById(int32_t fileId) {
    ScopedLock sl(m_mtxFileInfo);
    for (auto i = 0; i < m_numFiles; ++i) {
        if (m_fileInfo[i].m_fileId == fileId) {
            return m_fileInfo[i].m_map;
        }
    }
    return NULL;
}


RdbIndex *RdbBase::getIndex(int32_t n) {
    ScopedLock sl(m_mtxFileInfo);
    return m_fileInfo[n].m_index;
}


bool RdbBase::isReadable(int32_t n) const {
    ScopedLock sl(m_mtxFileInfo);
    return m_fileInfo[n].m_allowReads;
}


void RdbBase::incrementOutstandingJobs() {
    ScopedLock sl(m_mtxJobCount);
    m_outstandingJobCount++;
    if (m_outstandingJobCount <= 0) gbshutdownLogicError();
}


bool RdbBase::decrementOustandingJobs() {
    ScopedLock sl(m_mtxJobCount);
    if (m_outstandingJobCount <= 0) gbshutdownLogicError();
    m_outstandingJobCount--;
    return m_outstandingJobCount == 0 && !m_submittingJobs;
}
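
// The job counters above implement the completion protocol used when incorporating a merge:
// incorporateMerge2() sets m_submittingJobs, calls incrementOutstandingJobs() once per asynchronous
// unlink it submits, then clears m_submittingJobs. Each unlink callback calls decrementOustandingJobs(),
// and whoever sees it return true (count hit zero with no submissions under way) continues with the
// next phase, eg unlinksDone().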

static int32_t getMaxLostPositivesPercentage(rdbid_t rdbId) {
    switch (rdbId) {
        case RDB_POSDB:
        case RDB2_POSDB2:
            return g_conf.m_posdbMaxLostPositivesPercentage;
        case RDB_TAGDB:
        case RDB2_TAGDB2:
            return g_conf.m_tagdbMaxLostPositivesPercentage;
        case RDB_CLUSTERDB:
        case RDB2_CLUSTERDB2:
            return g_conf.m_clusterdbMaxLostPositivesPercentage;
        case RDB_TITLEDB:
        case RDB2_TITLEDB2:
            return g_conf.m_titledbMaxLostPositivesPercentage;
        case RDB_SPIDERDB:
        case RDB2_SPIDERDB2:
            return g_conf.m_spiderdbMaxLostPositivesPercentage;
        case RDB_NONE:
        case RDB_END:
        default:
            logError("rdb: bad lookup rdbid of %i", (int)rdbId);
            gbshutdownLogicError();
    }
}

void RdbBase::saveRdbIndexRdbMap(void *state) {
    RdbBase *that = static_cast<RdbBase*>(state);

    int32_t x = that->m_mergeStartFileNum - 1; // file #x is the merged file

    // note
    log(LOG_INFO, "db: Writing map %s.", that->m_fileInfo[x].m_map->getFilename());

    // . ensure we can save the map before deleting the other files
    // . sets g_errno and returns false on error
    // . allDone = true
    bool status = that->m_fileInfo[x].m_map->writeMap(true);
    if (!status) {
        // unable to write, let's abort
        gbshutdownResourceError();
    }

    if (that->m_useIndexFile) {
        status = that->m_fileInfo[x].m_index->writeIndex(true);
        if (!status) {
            // unable to write, let's abort
            log(LOG_ERROR, "db: Could not write index for %s, Exiting.", that->m_dbname);
            gbshutdownAbort(true);
        }
    }
}


void RdbBase::savedRdbIndexRdbMap(void *state, job_exit_t job_state) {
    RdbBase *that = static_cast<RdbBase*>(state);

    if (job_state == job_exit_program_exit) {
        // we want to make sure we save before exiting
        saveRdbIndexRdbMap(that);
    }

    that->incorporateMerge2();
}

// . called after the merge has successfully completed
// . the final merge file is always file #0 (i.e. "indexdb0000.dat/map")
void RdbBase::incorporateMerge() {
    // . we can't just unlink the merge file on error anymore
    // . it may have some data that was deleted from the original file
    if (g_errno) {
        log(LOG_ERROR, "db: Merge failed for %s, Exiting.", m_dbname);
        // we don't have a recovery system in place, so save state and dump core
        gbshutdownAbort(true);
    }

    // merge source range [a..b), merge target x
    int32_t a = m_mergeStartFileNum;
    int32_t b = std::min(m_mergeStartFileNum + m_numFilesToMerge, m_numFiles);

    /// @todo ALC verify if this ever happens
    // shouldn't be called if no files were merged
    if (a >= b) {
        // unless resuming after a merge completed and we exited
        // but forgot to finish renaming the final file!!!!
        log("merge: renaming final file");

        // decrement this count
        if (m_isMerging) {
            m_rdb->decrementNumMerges();
        }

        // exit merge mode
        m_isMerging = false;
    }

    if (g_jobScheduler.submit(saveRdbIndexRdbMap, savedRdbIndexRdbMap, this, thread_type_file_merge, 0)) {
        return;
    }

    // unable to submit the job; run it synchronously
    saveRdbIndexRdbMap(this);
    savedRdbIndexRdbMap(this, job_exit_normal);
}

void RdbBase::incorporateMerge2() {
    // merge source range [a..b), merge target x
    int32_t a = m_mergeStartFileNum;
    int32_t b = std::min(m_mergeStartFileNum + m_numFilesToMerge, m_numFiles);
    int32_t x = m_mergeStartFileNum - 1; // file #x is the merged file

    // print out info of the newly merged file
    int64_t postmergePositiveRecords = m_fileInfo[x].m_map->getNumPositiveRecs();
    int64_t postmergeNegativeRecords = m_fileInfo[x].m_map->getNumNegativeRecs();

    log(LOG_INFO, "merge: Merge succeeded. %s (#%d) has %" PRId64 " positive and %" PRId64 " negative recs.",
        m_fileInfo[x].m_file->getFilename(), x, postmergePositiveRecords, postmergeNegativeRecords);
    log(LOG_INFO, "merge: Files had %" PRId64 " positive and %" PRId64 " negative recs.",
        m_premergeNumPositiveRecords, m_premergeNumNegativeRecords);

    // there should never be a scenario where we get 0 positive recs when we had positive recs before the merge
    if (m_premergeNumPositiveRecords != 0 && postmergePositiveRecords == 0) {
        logError("Merge ended with 0 positive records.");
        gbshutdownCorrupted();
    }

    // . bitch if bad news
    if (postmergePositiveRecords > m_premergeNumPositiveRecords) {
        log(LOG_INFO, "merge: %s gained %" PRId64 " positives.", m_dbname, postmergePositiveRecords - m_premergeNumPositiveRecords);
        //note: also seen when resuming an interrupted merge, in which case there is probably nothing wrong
    }

    if (postmergePositiveRecords < (m_premergeNumPositiveRecords - m_premergeNumNegativeRecords)) {
        int64_t lostPositive = m_premergeNumPositiveRecords - postmergePositiveRecords;
        if (m_premergeNumPositiveRecords > 0) {
            double lostPercentage = (lostPositive * 100.00) / m_premergeNumPositiveRecords;
            log(LOG_INFO, "merge: %s: lost %" PRId64 " (%.2f%%) positives", m_dbname, lostPositive, lostPercentage);

            int32_t maxLostPercentage = getMaxLostPositivesPercentage(m_rdb->getRdbId());
            if (lostPercentage > maxLostPercentage) {
                log(LOG_ERROR, "merge: %s: lost more than %d%% of positive records. Aborting.", m_dbname, maxLostPercentage);
                gbshutdownCorrupted();
            }
        } else {
            // this case is unlikely, but coverity complained about it
            log(LOG_INFO, "merge: %s: lost %" PRId64 " positives", m_dbname, lostPositive);
        }
    }
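    // e.g. 1,000,000 positives before the merge and 985,000 after means lostPercentage is 1.50%,
    // which only aborts if it exceeds the per-rdb threshold from getMaxLostPositivesPercentage().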
2015-12-18 15:19:24 +01:00
2016-10-31 17:36:48 +01:00
if ( postmergeNegativeRecords > m_premergeNumNegativeRecords ) {
log ( LOG_INFO , " merge: %s: gained % " PRId64 " negatives. " , m_dbname , postmergeNegativeRecords - m_premergeNumNegativeRecords ) ;
2013-08-02 13:12:24 -07:00
}
2015-12-18 15:19:24 +01:00
2016-10-31 17:36:48 +01:00
if ( postmergeNegativeRecords < m_premergeNumNegativeRecords - m_premergeNumPositiveRecords ) {
log ( LOG_INFO , " merge: %s: lost % " PRId64 " negatives. " , m_dbname , m_premergeNumNegativeRecords - postmergeNegativeRecords ) ;
2013-08-02 13:12:24 -07:00
}
2016-10-31 18:16:40 +01:00
{
ScopedLock sl ( m_mtxJobCount ) ;
if ( m_outstandingJobCount ! = 0 )
gbshutdownCorrupted ( ) ;
}
2013-08-02 13:12:24 -07:00
// . before unlinking the files, ensure merged file is the right size!!
// . this will save us some anguish
2016-10-04 15:11:39 +02:00
m_fileInfo [ x ] . m_file - > invalidateFileSize ( ) ;
2015-12-18 15:19:24 +01:00
2016-10-04 15:06:54 +02:00
int64_t fs = m_fileInfo [ x ] . m_file - > getFileSize ( ) ;
2017-03-14 15:11:39 +01:00
if ( fs = = 0 ) {
// zero sized file?
logError ( " zero sized file after merge for file=%s " , m_fileInfo [ x ] . m_file - > getFilename ( ) ) ;
gbshutdownCorrupted ( ) ;
}
2015-12-18 15:19:24 +01:00
2013-08-02 13:12:24 -07:00
// get file size from map
2016-10-04 15:06:54 +02:00
int64_t fs2 = m_fileInfo [ x ] . m_map - > getFileSize ( ) ;
2015-12-18 15:19:24 +01:00
2013-08-02 13:12:24 -07:00
// compare, if only a key off allow that. that is an artificat of
// generating a map for a file screwed up from a power outage. it
// will end on a non-key boundary.
if ( fs ! = fs2 ) {
2016-07-04 16:11:38 +02:00
log ( LOG_ERROR , " build: Map file size does not agree with actual file "
2016-05-20 09:18:32 +02:00
" size for %s. Map says it should be % " PRId64 " bytes but it "
" is % " PRId64 " bytes. " ,
2016-10-04 15:06:54 +02:00
m_fileInfo [ x ] . m_file - > getFilename ( ) , fs2 , fs ) ;
2016-10-18 16:18:42 +02:00
if ( fs2 - fs > 12 | | fs - fs2 > 12 )
gbshutdownCorrupted ( ) ;
2013-08-02 13:12:24 -07:00
// now print the exception
2016-07-04 16:11:38 +02:00
log ( LOG_WARN , " build: continuing since difference is less than 12 "
2013-08-02 13:12:24 -07:00
" bytes. Most likely a discrepancy caused by a power "
" outage and the generated map file is off a bit. " ) ;
}
2017-03-17 14:48:10 +01:00
{
ScopedLock sl ( m_mtxFileInfo ) ;
2016-10-31 18:16:40 +01:00
2017-03-17 14:48:10 +01:00
//allow/disallow reads while incorporating merged file
m_fileInfo [ x ] . m_allowReads = true ; //newly merge file is finished and valid
for ( int i = a ; i < b ; i + + ) {
m_fileInfo [ i ] . m_allowReads = false ; //source files will be deleted shortly
}
submitGlobalIndexJob_unlocked ( false , - 1 ) ;
}
2016-10-31 18:16:40 +01:00
{
ScopedLock sl ( m_mtxJobCount ) ;
m_submittingJobs = true ;
}
	// on success unlink the files we merged and free them
	for (int32_t i = a; i < b && i < m_numFiles; i++) {
		// debug msg
		log(LOG_INFO, "merge: Unlinking merged file %s/%s (#%" PRId32 ").",
		    m_fileInfo[i].m_file->getDir(), m_fileInfo[i].m_file->getFilename(), i);

		// . these unlinks will be done in a thread
		// . they will save the filename before spawning so we can
		//   delete the m_fileInfo[i].m_file now
		if (!m_fileInfo[i].m_file->unlink(unlinkDoneWrapper, this)) {
			incrementOutstandingJobs();
		} else {
			// debug msg
			// MDW this cores if file is bad... if collection got deleted from under us i guess!!
			log(LOG_INFO, "merge: Unlinked %s (#%" PRId32 ").", m_fileInfo[i].m_file->getFilename(), i);
		}

		// debug msg
		log(LOG_INFO, "merge: Unlinking map file %s (#%" PRId32 ").", m_fileInfo[i].m_map->getFilename(), i);

		if (!m_fileInfo[i].m_map->unlink(unlinkDoneWrapper, this)) {
			incrementOutstandingJobs();
		} else {
			// debug msg
			log(LOG_INFO, "merge: Unlinked %s (#%" PRId32 ").", m_fileInfo[i].m_map->getFilename(), i);
		}

		if (m_useIndexFile) {
			log(LOG_INFO, "merge: Unlinking index file %s (#%" PRId32 ").", m_fileInfo[i].m_index->getFilename(), i);

			if (!m_fileInfo[i].m_index->unlink(unlinkDoneWrapper, this)) {
				incrementOutstandingJobs();
			} else {
				// debug msg
				log(LOG_INFO, "merge: Unlinked %s (#%" PRId32 ").", m_fileInfo[i].m_index->getFilename(), i);
			}
		}
	}

	if (g_errno) {
		log(LOG_ERROR, "merge: unlinking source files failed, g_errno=%d (%s)", g_errno, mstrerror(g_errno));
		gbshutdownAbort(true);
	}
	// wait for the above unlinks to finish before we do the rename;
	// otherwise, we might end up doing the rename first and then
	// unlinking the renamed file!
	{
		ScopedLock sl(m_mtxJobCount);
		m_submittingJobs = false;
		if (m_outstandingJobCount != 0)
			return;
	}

	unlinksDone();
}
void RdbBase::unlinkDoneWrapper(void *state) {
	RdbBase *that = static_cast<RdbBase*>(state);
	log("merge: done unlinking file for collnum=%d #outstanding_jobs=%d",
	    (int)that->m_collnum, that->m_outstandingJobCount);
	that->unlinkDone();
}
void RdbBase::unlinkDone() {
	if (g_errno) {
		log(LOG_ERROR, "merge: unlinking source files failed, g_errno=%d (%s)", g_errno, mstrerror(g_errno));
		gbshutdownAbort(true);
	}

	// bail if waiting for more to come back
	if (!decrementOustandingJobs())
		return; // still more to finish

	unlinksDone();
}
void RdbBase::unlinksDone() {
	// debug msg
	log(LOG_INFO, "merge: Done unlinking all files.");

	if (g_errno) {
		log(LOG_ERROR, "merge: unlinking source files failed, g_errno=%d (%s)", g_errno, mstrerror(g_errno));
		gbshutdownAbort(true);
	}

	// merge source range [a..b), merge target x
	int32_t a = m_mergeStartFileNum;
	//int32_t b = std::min(m_mergeStartFileNum + m_numFilesToMerge, m_numFiles);
	int32_t x = a - 1; // file #x is the merged file

	// sanity check
	m_fileInfo[x].m_file->invalidateFileSize();
	int64_t fs = m_fileInfo[x].m_file->getFileSize();
	// get file size from map
	int64_t fs2 = m_fileInfo[x].m_map->getFileSize();
	// compare
	if (fs != fs2) {
		log("build: Map file size does not agree with actual file size");
		gbshutdownCorrupted();
	}

	// . the fileId of the merge file becomes that of the first source file,
	//   which happens to be one more than the tmp merge file
	// . but the secondary id should remain the same
	m_fileInfo[x].m_fileId |= 1;
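	// Worked example (illustrative, not from the original code): the merge
	// target keeps an even fileId (eg 0230) while it is temporary; "|= 1"
	// bumps it to the odd id 0231, ie the id of the first source file, so
	// the renames below let the merged file take over that slot.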
	{
		ScopedLock sl(m_mtxJobCount);
		m_submittingJobs = true;
	}

	log(LOG_INFO, "db: Renaming %s of size %" PRId64 " to final filename(s)", m_fileInfo[x].m_file->getFilename(), fs);

	char newMapFilename[1024];
	generateMapFilename(newMapFilename, sizeof(newMapFilename), m_fileInfo[x].m_fileId, m_fileInfo[x].m_fileId2, 0, -1);
	if (!m_fileInfo[x].m_map->rename(newMapFilename, m_collectionDirName, renameDoneWrapper, this)) {
		incrementOutstandingJobs();
	} else if (g_errno) {
		log(LOG_ERROR, "merge: renaming file(s) failed, g_errno=%d (%s)", g_errno, mstrerror(g_errno));
		gbshutdownAbort(true);
	}

	if (m_useIndexFile) {
		char newIndexFilename[1024];
		generateIndexFilename(newIndexFilename, sizeof(newIndexFilename), m_fileInfo[x].m_fileId, m_fileInfo[x].m_fileId2, 0, -1);
		if (!m_fileInfo[x].m_index->rename(newIndexFilename, m_collectionDirName, renameDoneWrapper, this)) {
			incrementOutstandingJobs();
		} else if (g_errno) {
			log(LOG_ERROR, "merge: renaming file(s) failed, g_errno=%d (%s)", g_errno, mstrerror(g_errno));
			gbshutdownAbort(true);
		}
	}

	char newDataName[1024];
	generateDataFilename(newDataName, sizeof(newDataName), m_fileInfo[x].m_fileId, m_fileInfo[x].m_fileId2, 0, -1);
	// rename it, this may block
	if (!m_fileInfo[x].m_file->rename(newDataName, m_collectionDirName, renameDoneWrapper, this)) {
		incrementOutstandingJobs();
	} else if (g_errno) {
		log(LOG_ERROR, "merge: renaming file(s) failed, g_errno=%d (%s)", g_errno, mstrerror(g_errno));
		gbshutdownAbort(true);
	}

	{
		ScopedLock sl(m_mtxJobCount);
		m_submittingJobs = false;
		if (m_outstandingJobCount != 0)
			return;
	}

	renamesDone();
}
void RdbBase::renameDoneWrapper(void *state) {
	RdbBase *that = static_cast<RdbBase*>(state);
	log(LOG_DEBUG, "rdb: thread completed rename operation for collnum=%d #outstanding_jobs=%d",
	    (int)that->m_collnum, that->m_outstandingJobCount);
	that->renameDone();
}

void RdbBase::checkThreadsAgainWrapper(int /*fd*/, void *state) {
	RdbBase *that = static_cast<RdbBase*>(state);
	g_loop.unregisterSleepCallback(state, checkThreadsAgainWrapper);
	that->renameDone();
}
void RdbBase::renameDone() {
	if (g_errno) {
		log(LOG_ERROR, "merge: renaming file(s) failed, g_errno=%d (%s)", g_errno, mstrerror(g_errno));
		gbshutdownAbort(true);
	}

	// bail if waiting for more to come back
	if (!decrementOustandingJobs())
		return;

	renamesDone();
}
void RdbBase::renamesDone() {
	// some shorthand variable notation
	int32_t a = m_mergeStartFileNum;
	int32_t b = m_mergeStartFileNum + m_numFilesToMerge;

	//
	// wait for all threads accessing this bigfile to go bye-bye
	//
	log("db: checking for outstanding read threads on unlinked files");
	bool wait = false;
	for (int32_t i = a; i < b; i++) {
		BigFile *bf = m_fileInfo[i].m_file;
		if (g_jobScheduler.is_reading_file(bf)) wait = true;
	}
	if (wait) {
		log("db: waiting for read thread to exit on unlinked file");
		if (!g_loop.registerSleepCallback(100, this, checkThreadsAgainWrapper, "RdbBase::checkThreadsAgainWrapper")) {
			gbshutdownResourceError();
		}
		return;
	}

	// file #x is the merge file. rid ourselves of the source files.
	{
		ScopedLock sl(m_mtxFileInfo); // lock while manipulating m_fileInfo
		buryFiles(a, b);
		submitGlobalIndexJob_unlocked(false, -1);
	}

	// sanity check
	if (m_numFilesToMerge != (b - a)) {
		log(LOG_LOGIC, "db: Bury oops.");
		gbshutdownLogicError();
	}

	// decrement this count
	if (m_isMerging) {
		m_rdb->decrementNumMerges();
	}

	// exit merge mode
	m_isMerging = false;

	g_merge.mergeIncorporated(this);

	// try to merge more when we are done
	attemptMergeAll();
}
void RdbBase::buryFiles(int32_t a, int32_t b) {
	// on success unlink the files we merged and free them
	for (int32_t i = a; i < b; i++) {
		mdelete(m_fileInfo[i].m_file, sizeof(BigFile), "RdbBase");
		delete m_fileInfo[i].m_file;
		mdelete(m_fileInfo[i].m_map, sizeof(RdbMap), "RdbBase");
		delete m_fileInfo[i].m_map;
		mdelete(m_fileInfo[i].m_index, sizeof(RdbIndex), "RdbBase");
		delete m_fileInfo[i].m_index;
	}
	// bury the merged files
	int32_t n = m_numFiles - b;
	memmove(m_fileInfo + a, m_fileInfo + b, n * sizeof(m_fileInfo[0]));
	// decrement the file count appropriately
	m_numFiles -= (b - a);
	// sanity
	log("rdb: bury files: numFiles now %" PRId32 " (b=%" PRId32 " a=%" PRId32 " collnum=%" PRId32 ")",
	    m_numFiles, b, a, (int32_t)m_collnum);
}
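// Illustrative example (numbers invented): with m_numFiles=6 and a bury of
// [a=1..b=4), n = 6-4 = 2 trailing entries at slots 4 and 5 are shifted down
// to slots 1 and 2 by the memmove above, and m_numFiles drops by 3 to 3.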
// Get the min-to-merge configuration for this collection and/or RDB
int32_t RdbBase::getMinToMerge(const CollectionRec *cr, rdbid_t rdbId, int32_t minToMergeOverride) const {
	// always obey the override
	if (minToMergeOverride >= 2) {
		log(LOG_INFO, "merge: Overriding min files to merge of %d with %d", m_minToMerge, minToMergeOverride);
		return minToMergeOverride;
	}

	logTrace(g_conf.m_logTraceRdbBase, "m_minToMergeDefault: %d", m_minToMergeDefault);
	// if m_minToMergeDefault is -1 then we should let cr override, but if m_minToMergeDefault
	// is actually valid at this point, use it as is
	if (m_minToMergeDefault > 0) {
		log(LOG_INFO, "merge: Using already-set m_minToMergeDefault of %d for %s", m_minToMergeDefault, m_dbname);
		return m_minToMergeDefault;
	}

	int32_t result = -1;

	// if the collection exists, use its values
	if (cr) {
		switch (rdbId) {
			case RDB_POSDB:
				result = cr->m_posdbMinFilesToMerge;
				logTrace(g_conf.m_logTraceRdbBase, "posdb. m_minToMerge: %d", m_minToMerge);
				break;
			case RDB_TITLEDB:
				result = cr->m_titledbMinFilesToMerge;
				logTrace(g_conf.m_logTraceRdbBase, "titledb. m_minToMerge: %d", m_minToMerge);
				break;
			case RDB_SPIDERDB:
				result = cr->m_spiderdbMinFilesToMerge;
				logTrace(g_conf.m_logTraceRdbBase, "spiderdb. m_minToMerge: %d", m_minToMerge);
				break;
			// case RDB_CLUSTERDB:
			// 	result = cr->m_clusterdbMinFilesToMerge;
			// 	logTrace(g_conf.m_logTraceRdbBase, "clusterdb. m_minToMerge: %d", m_minToMerge);
			// 	break;
			case RDB_LINKDB:
				result = cr->m_linkdbMinFilesToMerge;
				logTrace(g_conf.m_logTraceRdbBase, "linkdb. m_minToMerge: %d", m_minToMerge);
				break;
			case RDB_TAGDB:
				result = cr->m_tagdbMinFilesToMerge;
				logTrace(g_conf.m_logTraceRdbBase, "tagdb. m_minToMerge: %d", m_minToMerge);
				break;
			default:
				; // no per-collection override
		}
	}

	log(LOG_INFO, "merge: Using min files to merge %d for %s", result, m_dbname);
	return result;
}
// . the DailyMerge.cpp will set minToMergeOverride for titledb, and this
//   overrides "forceMergeAll" which is the same as setting
//   "minToMergeOverride" to "2". (i.e. perform a merge if you got 2 or more
//   files)
// . now return true if we started a merge, false otherwise
bool RdbBase::attemptMerge(int32_t niceness, bool forceMergeAll, int32_t minToMergeOverride) {
	logTrace(g_conf.m_logTraceRdbBase, "BEGIN. minToMergeOverride: %" PRId32, minToMergeOverride);

	// don't do merge if we're in read only mode
	if (g_conf.m_readOnlyMode) {
		logTrace(g_conf.m_logTraceRdbBase, "END, in read-only mode");
		return false;
	}

	// nor if the merge class is halted
	if (g_merge.isHalted()) {
		logTrace(g_conf.m_logTraceRdbBase, "END, is suspended");
		return false;
	}

	// shutting down? do not start another merge then
	if (g_process.isShuttingDown()) {
		logTrace(g_conf.m_logTraceRdbBase, "END, shutting down");
		return false;
	}

	// . wait for all unlinking and renaming activity to flush out
	// . otherwise, a rename or unlink might still be waiting to happen
	//   and it will mess up our merge
	// . right after a merge we get a few of these printed out...
	if (m_outstandingJobCount) {
		log(LOG_INFO, "merge: Waiting for unlink/rename "
		    "operations to finish before attempting merge "
		    "for %s. (collnum=%" PRId32 ")", m_dbname, (int32_t)m_collnum);
		logTrace(g_conf.m_logTraceRdbBase, "END, wait for unlink/rename");
		return false;
	}
	if (forceMergeAll) m_nextMergeForced = true;
	if (m_nextMergeForced) forceMergeAll = true;

	if (forceMergeAll) {
		log(LOG_INFO, "merge: forcing merge for %s. (collnum=%" PRId32 ")", m_dbname, (int32_t)m_collnum);
	}

	rdbid_t rdbId = getIdFromRdb(m_rdb);

	// if a dump is happening it will always be the last file, do not
	// include it in the merge
	int32_t numFiles = m_numFiles;
	if (numFiles > 0 && m_rdb->isDumping()) numFiles--;

	// set m_minToMerge from coll rec if we're indexdb
	CollectionRec *cr = g_collectiondb.getRec(m_collnum);
	// now see if collection rec is there to override us
	if (!cr) {
		g_errno = 0;
		log("merge: Could not find coll rec for %s.", m_coll);
	}

	m_minToMerge = getMinToMerge(cr, rdbId, minToMergeOverride);

	// if still -1 that is a problem
	if (m_minToMerge <= 0) {
		log(LOG_WARN, "Got bad minToMerge of %" PRId32 " for %s. Set its default to "
		    "something besides -1 in Parms.cpp or add it to "
		    "CollectionRec.h.",
		    m_minToMerge, m_dbname);
		//m_minToMerge = 2;
		gbshutdownLogicError();
	}

	// print it
	log(LOG_INFO, "merge: Considering merging %" PRId32 " %s files on disk. %" PRId32 " files needed to trigger a merge.",
	    numFiles, m_dbname, m_minToMerge);

	if (g_merge.isMerging()) {
		logTrace(g_conf.m_logTraceRdbBase, "END, is merging");
		return false;
	}

	// bail if already merging THIS class
	if (m_isMerging) {
		log(LOG_INFO, "merge: Waiting for other merge to complete before merging %s.", m_dbname);
		logTrace(g_conf.m_logTraceRdbBase, "END, already merging this");
		return false;
	}
	// are we resuming a killed merge?
	bool resuming = false;
	for (int32_t j = 0; j < numFiles; j++) {
		// if an even-numbered file exists then we are resuming a merge
		if ((m_fileInfo[j].m_fileId & 0x01) == 0) {
			resuming = true;
			logTrace(g_conf.m_logTraceRdbBase, "Resuming a merge");
			break;
		}
	}

	if (m_attemptOnlyMergeResumption && !resuming) {
		m_attemptOnlyMergeResumption = false;
		log(LOG_INFO, "merge: No interrupted merge of %s. Won't consider initiating a merge until next call", m_dbname);
		logTrace(g_conf.m_logTraceRdbBase, "END, no interrupted merge");
		return false;
	}
	// on the next call to attemptMerge() we are allowed to do normal non-interrupted merges
	m_attemptOnlyMergeResumption = false;

	// this triggers the negative rec concentration and
	// tries to merge on one file...
	if (!resuming && m_numFiles <= 1) {
		m_nextMergeForced = false;
		logTrace(g_conf.m_logTraceRdbBase, "END, too few files (%" PRId32 ")", m_numFiles);
		return false;
	}

	// . don't merge if we don't have the min # of files
	// . but skip this check if there is a merge to be resumed from b4
	if (!resuming && !forceMergeAll && numFiles < m_minToMerge) {
		// now we no longer have to check this collection rdb for
		// merging. this will save a lot of cpu time when we have
		// 20,000+ collections. if we dump a file to disk for it
		// then we set this flag back to false in Rdb.cpp.
		logTrace(g_conf.m_logTraceRdbBase, "END, min files not reached (%" PRId32 " / %" PRId32 ")", numFiles, m_minToMerge);
		return false;
	}

	// remember niceness for calling g_merge.merge()
	m_niceness = niceness;
	// bitch if we got token because there was an error somewhere
	if (g_errno) {
		log(LOG_LOGIC, "merge: attemptMerge: %s failed: %s",
		    m_dbname, mstrerror(g_errno));
		g_errno = 0;
		log(LOG_LOGIC, "merge: attemptMerge: %s: uh oh...", m_dbname);
		// we don't have the token, so we're fucked...
		return false;
	}

	if (m_isMerging || g_merge.isMerging()) {
		logTrace(g_conf.m_logTraceRdbBase, "END, already merging");
		return false;
	}

	// or if # threads out is positive
	if (m_outstandingJobCount != 0) {
		logTrace(g_conf.m_logTraceRdbBase, "END, threads already running");
		return false;
	}

	// clear for take-off
	// . i used to just merge all the files into 1
	// . but it may be more efficient to merge just enough files as
	//   to put m_numFiles below m_minToMerge
	// . if we have the files : A B C D E F and m_minToMerge is 6
	//   then merge F and E, but if D is < E merged D too, etc...
	// . this merge algorithm is definitely better than merging everything
	//   if we don't do much reading to the db, only writing
	int32_t mergeFileCount = 0;
	int32_t mergeFileId;
	int32_t mergeFileNum;
	bool foundInterruptedMerge = false;

	logTrace(g_conf.m_logTraceRdbBase, "Checking files");
	// Detect interrupted merges
	for (int32_t i = 0; i < numFiles; i++) {
		// skip odd numbered files
		if (m_fileInfo[i].m_fileId & 0x01) {
			continue;
		}

		log(LOG_DEBUG, "merge: found interrupted merge file %s", m_fileInfo[i].m_file->getFilename());

		// store the merged data into this fileid and number
		mergeFileId = m_fileInfo[i].m_fileId;
		mergeFileNum = i;

		// files being merged into have a filename like
		// indexdb0000.003.dat where the 003 indicates how many files
		// it is merging, in case we have to resume them due to power loss or whatever
		//todo: don't re-parse filename. Just store the count+end in FileInfo structure
		int32_t fileId, fileId2;
		int32_t endMergeFileId;
		if (!parseFilename(m_fileInfo[i].m_file->getFilename(), &fileId, &fileId2, &mergeFileCount, &endMergeFileId)) {
			log(LOG_LOGIC, "merge:attemptMerge:resuming: couldn't parse pre-accepted filename of %s", m_fileInfo[i].m_file->getFilename());
			gbshutdownLogicError();
		}

		if (m_isTitledb && fileId2 < 0) { // if titledb we must have a "-023" part now
			log(LOG_LOGIC, "merge:attemptMerge:resuming: unexpected filename for %s coll=%s file=%s", m_dbname, m_coll, m_fileInfo[i].m_file->getFilename());
			gbshutdownCorrupted();
		}

		if (mergeFileCount <= 0) {
			log(LOG_LOGIC, "merge:attemptMerge:resuming: unexpected filename for %s coll=%s file=%s", m_dbname, m_coll, m_fileInfo[i].m_file->getFilename());
			gbshutdownCorrupted();
		}

		if (mergeFileCount == 1) {
			log(LOG_LOGIC, "merge:attemptMerge:resuming: fishy filename for %s coll=%s file=%s", m_dbname, m_coll, m_fileInfo[i].m_file->getFilename());
		}

		if (endMergeFileId <= 0) {
			log(LOG_LOGIC, "merge:attemptMerge:resuming: unexpected filename for %s coll=%s file=%s", m_dbname, m_coll, m_fileInfo[i].m_file->getFilename());
			gbshutdownCorrupted();
		}

		int32_t endMergeFileNum = mergeFileNum;
		for (int32_t j = mergeFileNum + 1; j < m_numFiles; j++) {
			if (m_fileInfo[j].m_fileId <= endMergeFileId) {
				endMergeFileNum = j;
				if (m_fileInfo[j].m_fileId == endMergeFileId) {
					break;
				}
			}
		}

		log(LOG_INFO, "merge: Resuming interrupted merge for %s coll=%s, mergefile=%s", m_dbname, m_coll, m_fileInfo[i].m_file->getFilename());

		int32_t currentFilesToMerge = endMergeFileNum - mergeFileNum;
		if (currentFilesToMerge < 0)
			gbshutdownLogicError();

		if (currentFilesToMerge < mergeFileCount) {
			log(LOG_INFO, "merge: Only merging %" PRId32 " instead of the original %" PRId32 " files.", currentFilesToMerge, mergeFileCount);
		} else if (currentFilesToMerge == mergeFileCount) {
			// excellent
		} else {
			// What? This should not happen. Eg if we have these files:
			//   file0000.002.0007.dat
			//   file0001.dat
			//   file0003.dat
			//   file0005.dat
			//   file0007.dat
			// then somehow extra files came into existence where they shouldn't
			log(LOG_ERROR, "merge:attemptMerge:resuming: found more merge-source files than expected for %s coll=%s file=%s", m_dbname, m_coll, m_fileInfo[i].m_file->getFilename());
			gbshutdownCorrupted();
		}

		// how many files to merge?
		mergeFileCount = currentFilesToMerge;
		foundInterruptedMerge = true;
		break;
	}
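	// Illustrative walkthrough (invented example, mirroring the comment
	// above): a leftover "file0000.002.0007.dat" parses to fileId=0,
	// mergeFileCount=2 and endMergeFileId=7; the scan above then counts the
	// surviving odd-numbered source files up to fileId 7 to decide how many
	// files the resumed merge really covers.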
	// If there isn't an interrupted merge then we can do a normal new merge
	if (!foundInterruptedMerge) {
		// look at this merge:
		//   indexdb0003.dat.part1
		//   indexdb0003.dat.part2
		//   indexdb0003.dat.part3
		//   indexdb0003.dat.part4
		//   indexdb0003.dat.part5
		//   indexdb0003.dat.part6
		//   indexdb0003.dat.part7
		//   indexdb0039.dat
		//   indexdb0039.dat.part1
		//   indexdb0045.dat
		//   indexdb0047.dat
		//   indexdb0002.002.dat
		//   indexdb0002.002.dat.part1
		// it should have merged 45 and 47 since they are so much smaller
		// even though the ratio between 3 and 39 is lower. we did not compute
		// our dtotal correctly...
		// . use greedy method
		// . just merge the minimum # of files to stay under m_minToMerge
		// . files must be consecutive, however
		// . but ALWAYS make sure file i-1 is bigger than file i
		mergeFileCount = numFiles - m_minToMerge + 2;

		// titledb should always merge at least 50 files no matter what though
		// cuz i don't want it merging its huge root file and just one
		// other file... i've seen that happen... but don't know why it didn't
		// merge two small files! i guess because the root file was the
		// oldest file! (38.80 days old)???
		if (m_isTitledb && mergeFileCount < 50 && m_minToMerge > 200) {
			// force it to 50 files to merge
			mergeFileCount = 50;
			// but must not exceed numFiles!
			if (mergeFileCount > numFiles) {
				mergeFileCount = numFiles;
			}
		}

		if (mergeFileCount > absoluteMaxFilesToMerge) {
			mergeFileCount = absoluteMaxFilesToMerge;
		}

		// but if we are forcing then merge ALL, except one being dumped
		if (m_nextMergeForced) {
			mergeFileCount = numFiles;
		}

		int32_t mini;
		selectFilesToMerge(mergeFileCount, numFiles, &mini);

		// if no valid range, bail
		if (mini == -1) {
			log(LOG_LOGIC, "merge: gotTokenForMerge: Bad engineer. mini is -1.");
			return false;
		}

		// . merge from file #mini through file #(mini+n)
		// . these files should all have ODD fileIds so we can sneak a new
		//   mergeFileId in there
		mergeFileId = m_fileInfo[mini].m_fileId - 1;

		// get new id, -1 on error
		int32_t fileId2;
		fileId2 = m_isTitledb ? 0 : -1;

		// . make a filename for the merge
		// . always starts with file #0
		// . merge targets are named like "indexdb0000.002.dat"
		// . for titledb it is "titledb0000-023.dat.003" (23 is id2)
		// . this now also sets m_mergeStartFileNum for us... but we override
		//   below anyway. we have to set it there in case we startup and
		//   are resuming a merge.
		int32_t endMergeFileNum = mini + mergeFileCount - 1;
		int32_t endMergeFileId = m_fileInfo[endMergeFileNum].m_fileId;
		log(LOG_INFO, "merge: mergeFileCount=%d mini=%d mergeFileId=%d endMergeFileNum=%d endMergeFileId=%d",
		    mergeFileCount, mini, mergeFileId, endMergeFileNum, endMergeFileId);

		{
			// The lock of m_mtxFileInfo is delayed until now because the previous accesses were reads only, but
			// we must hold the mutex while calling addFile() which modifies the array.
			ScopedLock sl(m_mtxFileInfo);
			mergeFileNum = addFile(true, mergeFileId, fileId2, mergeFileCount, endMergeFileId, true);
			if (mergeFileNum >= 0) {
				submitGlobalIndexJob_unlocked(false, -1);
			}
		}

		if (mergeFileNum < 0) {
			log(LOG_LOGIC, "merge: attemptMerge: Could not add new file.");
			g_errno = 0;
			return false;
		}

		// is it a force?
		if (m_nextMergeForced) {
			log(LOG_INFO, "merge: Force merging all %s files, except those being dumped now.", m_dbname);
		}
		// clear after each call to attemptMerge()
		m_nextMergeForced = false;

		// sanity check
		if (mergeFileCount <= 1) {
			log(LOG_LOGIC, "merge: attemptMerge: Not merging %" PRId32 " files.", mergeFileCount);
			return false;
		}
	}

	// save the start number and the count of files we're merging
	m_mergeStartFileNum = mergeFileNum + 1;
	m_numFilesToMerge = mergeFileCount;
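	// e.g. (illustrative numbers): if addFile() inserted the merge target at
	// slot 3, the source files occupy slots [4 .. 4+m_numFilesToMerge).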
	const char *coll = cr ? cr->m_coll : "";

	// log merge parms
	log(LOG_INFO, "merge: Merging %" PRId32 " %s files to file id %" PRId32 " now. "
	    "collnum=%" PRId32 " coll=%s",
	    mergeFileCount, m_dbname, mergeFileId, (int32_t)m_collnum, coll);

	// print out file info
	m_premergeNumPositiveRecords = 0;
	m_premergeNumNegativeRecords = 0;
	for (int32_t i = m_mergeStartFileNum; i < m_mergeStartFileNum + m_numFilesToMerge; ++i) {
		m_premergeNumPositiveRecords += m_fileInfo[i].m_map->getNumPositiveRecs();
		m_premergeNumNegativeRecords += m_fileInfo[i].m_map->getNumNegativeRecs();
		log(LOG_INFO, "merge: %s (#%" PRId32 ") has %" PRId64 " positive "
		    "and %" PRId64 " negative records.",
		    m_fileInfo[i].m_file->getFilename(),
		    i,
		    m_fileInfo[i].m_map->getNumPositiveRecs(),
		    m_fileInfo[i].m_map->getNumNegativeRecs());
	}
	log(LOG_INFO, "merge: Total positive = %" PRId64 " Total negative = %" PRId64 ".",
	    m_premergeNumPositiveRecords, m_premergeNumNegativeRecords);

	// assume we are now officially merging
	m_isMerging = true;
	m_rdb->incrementNumMerges();

	logTrace(g_conf.m_logTraceRdbBase, "merge!");

	// . start the merge
	// . returns false if blocked, true otherwise & sets g_errno
	if (!g_merge.merge(rdbId,
	                   m_collnum,
	                   m_fileInfo[mergeFileNum].m_file,
	                   m_fileInfo[mergeFileNum].m_map,
	                   m_fileInfo[mergeFileNum].m_index,
	                   m_mergeStartFileNum,
	                   m_numFilesToMerge,
	                   m_niceness)) {
		// we started the merge so return true here
		logTrace(g_conf.m_logTraceRdbBase, "END, started OK");
		return true;
	}

	// hey, we're no longer merging i guess
	m_isMerging = false;
	// decrement this count
	m_rdb->decrementNumMerges();

	// bitch on g_errno then clear it
	if (g_errno) {
		log(LOG_WARN, "merge: Had error getting merge token for %s: %s.", m_dbname, mstrerror(g_errno));
	}
	g_errno = 0;

	log("merge: did not block for some reason.");
	logTrace(g_conf.m_logTraceRdbBase, "END");
	return true;
}
void RdbBase::selectFilesToMerge(int32_t mergeFileCount, int32_t numFiles, int32_t *p_mini) {
	float minr = 99999999999.0;
	int64_t mint = 0x7fffffffffffffffLL;
	int32_t mini = -1;
	bool minOld = false;
	int32_t nowLocal = getTimeLocal();

	for (int32_t i = 0; i + mergeFileCount <= numFiles; i++) {
		// Consider the files [i..i+mergeFileCount).
		// If any of the files in the range are marked unreadable then skip that range.
		// This should only happen for the last range while a new file is being dumped.
		bool anyUnreadableFiles = false;
		for (int32_t j = i; j < i + mergeFileCount; j++) {
			if (!m_fileInfo[j].m_allowReads) {
				anyUnreadableFiles = true;
				break;
			}
		}
		if (anyUnreadableFiles) {
			log(LOG_DEBUG, "merge: file range [%d..%d] contains unreadable files", i, i + mergeFileCount - 1);
			continue;
		}

		// oldest file
		time_t date = -1;
		// add up the sizes
		int64_t total = 0;
		for (int32_t j = i; j < i + mergeFileCount; j++) {
			total += m_fileInfo[j].m_file->getFileSize();
			time_t mtime = m_fileInfo[j].m_file->getLastModifiedTime();
			// skip on error
			if (mtime < 0) {
				continue;
			}
			if (mtime > date) {
				date = mtime;
			}
		}

		// does it have a file more than 30 days old?
		bool old = (date < nowLocal - 30 * 24 * 3600);
		// not old if error (date will be -1)
		if (date < 0) {
			old = false;
		}
		// if it does, and current winner does not, force ourselves!
		if (old && !minOld) {
			mint = 0x7fffffffffffffffLL;
		}
		// and if we are not old and the min is, do not consider
		if (!old && minOld) {
			continue;
		}

		// if merging titledb, just pick by the lowest total
		if (m_isTitledb) {
			if (total < mint) {
				mini = i;
				mint = total;
				minOld = old;
				log(LOG_INFO, "merge: titledb i=%" PRId32 " mergeFileCount=%" PRId32 " "
				    "mint=%" PRId64 " mini=%" PRId32 " "
				    "oldestfile=%.02fdays",
				    i, mergeFileCount, mint, mini,
				    ((float)nowLocal - date) / (24 * 3600.0));
			}
			continue;
		}

		// . get the average ratio between mergees
		// . ratio in [1.0,inf)
		// . prefer the lowest average ratio
		double ratio = 0.0;
		for (int32_t j = i; j < i + mergeFileCount - 1; j++) {
			int64_t s1 = m_fileInfo[j].m_file->getFileSize();
			int64_t s2 = m_fileInfo[j + 1].m_file->getFileSize();
			int64_t tmp;
			if (s2 == 0) continue;
			if (s1 < s2) { tmp = s1; s1 = s2; s2 = tmp; }
			ratio += (double)s1 / (double)s2;
		}
		if (mergeFileCount >= 2) ratio /= (double)(mergeFileCount - 1);

		// sanity check
		if (ratio < 0.0) {
			logf(LOG_LOGIC, "merge: ratio is negative %.02f", ratio);
			gbshutdownLogicError();
		}

		// the adjusted ratio
		double adjratio = ratio;
		// . adjust ratio based on file size of current winner
		// . if winner is ratio of 1:1 and we are 10:1 but winner
		//   is 10 times bigger than us, then we have a tie.
		// . i think if we are 10:1 and winner is 3 times bigger
		//   we should have a tie
		if (mini >= 0 && total > 0 && mint > 0) {
			double sratio = (double)total / (double)mint;
			//if(mint>total) sratio = (float)mint/(float)total;
			//else sratio = (float)total/(float)mint;
			adjratio *= sratio;
		}

		// debug the merge selection
		int64_t prevSize = 0;
		if (i > 0)
			prevSize = m_fileInfo[i - 1].m_file->getFileSize();
		log(LOG_INFO, "merge: i=%" PRId32 " n=%" PRId32 " ratio=%.2f adjratio=%.2f "
		    "minr=%.2f mint=%" PRId64 " mini=%" PRId32 " prevFileSize=%" PRId64 " "
		    "mergeFileSize=%" PRId64 " oldestfile=%.02fdays "
		    "collnum=%" PRId32,
		    i, mergeFileCount, ratio, adjratio, minr, mint, mini,
		    prevSize, total,
		    ((float)nowLocal - date) / (24 * 3600.0),
		    (int32_t)m_collnum);

		// bring back the greedy merge
		if (total >= mint) {
			continue;
		}

		// . don't get TOO lopsided on me now
		// . allow it for now! this is the true greedy method... no!
		// . an older small file can be cut off early on by a merge
		//   of middle files. the little guy can end up never getting
		//   merged unless we have this.
		// . allow a file to be 4x bigger than the one before it, this
		//   allows a little bit of lopsidedness.
		if (i > 0 && m_fileInfo[i - 1].m_file->getFileSize() < total / 4) {
			continue;
		}

		//min = total;
		minr = ratio;
		mint = total;
		mini = i;
		minOld = old;
	}

	*p_mini = mini;
}
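// Worked example of the selection metric above (invented sizes): with
// mergeFileCount=2, a candidate pair of 100MB and 10MB files scores
// ratio = 100/10 = 10.0 while a 12MB and 10MB pair scores 1.2, so the more
// even pair wins unless its total is much larger than the current winner's,
// which is what the adjratio scaling by total/mint compensates for.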
// . use the maps and tree to estimate the size of this list w/o hitting disk
// . used by Indexdb.cpp to get the size of a list for IDF weighting purposes
int64_t RdbBase::estimateListSize(const char *startKey, const char *endKey, char *maxKey,
                                  int64_t oldTruncationLimit) const {
	// . reset this to low points
	// . this is on
	KEYSET(maxKey, endKey, m_ks);
	bool first = true;
	// do some looping
	char newGuy[MAX_KEY_BYTES];
	int64_t totalBytes = 0;
	for (int32_t i = 0; i < m_numFiles; i++) {
		// the start and end pages for a page range
		int32_t pg1, pg2;
		// get the start and end pages for this startKey/endKey
		m_fileInfo[i].m_map->getPageRange(startKey,
		                                  endKey,
		                                  &pg1,
		                                  &pg2,
		                                  newGuy,
		                                  oldTruncationLimit);
		// . get the range size add it to count
		// . some of these records are negative recs (deletes) so
		//   our count may be small
		// . also, count may be a bit small because getRecSizes() may
		//   not recognize some recs on the page boundaries as being
		//   in [startKey,endKey]
		// . this can now return negative sizes
		// . the "true" means to subtract page sizes that begin with
		//   delete keys (the key's low bit is clear)
		// . get the minKey and maxKey in this range
		// . minKey2 may be bigger than the actual minKey for this
		//   range, likewise, maxKey2 may be smaller than the actual
		//   maxKey, but should be good estimates
		int64_t maxBytes = m_fileInfo[i].m_map->getMaxRecSizes(pg1, pg2, startKey, endKey, true); // subtract

		// get the min as well
		int64_t minBytes = m_fileInfo[i].m_map->getMinRecSizes(pg1, pg2, startKey, endKey, true); // subtract

		int64_t avg = (maxBytes + minBytes) / 2LL;

		// use that
		totalBytes += avg;

		// if not too many pages then don't even bother setting "maxKey"
		// since it is used only for interpolating if this term is
		// truncated. if only a few pages then it might be way too
		// small.
		if (pg1 + 5 > pg2) continue;

		// replace *maxKey automatically if this is our first time
		if (first) {
			KEYSET(maxKey, newGuy, m_ks);
			first = false;
			continue;
		}

		// . get the SMALLEST max key
		// . this is used for estimating what the size of the list
		//   would be without truncation
		if (KEYCMP(newGuy, maxKey, m_ks) > 0)
			KEYSET(maxKey, newGuy, m_ks);
	}

	// TODO: now get from the btree!
	// before getting from the map (on disk IndexLists) get upper bound
	// from the in memory b-tree
	//int32_t n = getTree()->getListSize(startKey, endKey, &minKey2, &maxKey2);
	int64_t n;
	if (m_tree)
		n = m_tree->estimateListSize(m_collnum, startKey, endKey, NULL, NULL);
	else
		n = m_buckets->estimateListSize(m_collnum, startKey, endKey, NULL, NULL);

	totalBytes += n;
	// ensure totalBytes >= 0
	if (totalBytes < 0) totalBytes = 0;
	return totalBytes;
}
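// Illustrative note: each file's contribution above is the midpoint of the
// map's bounds, e.g. minBytes=8192 and maxBytes=24576 contribute
// (8192+24576)/2 = 16384 bytes; the in-memory tree/bucket estimate is then
// added on top.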
int64_t RdbBase::estimateNumGlobalRecs() const {
	return getNumTotalRecs() * g_hostdb.m_numShards;
}

// . return number of positive records - negative records
int64_t RdbBase::getNumTotalRecs() const {
	int64_t numPositiveRecs = 0;
	int64_t numNegativeRecs = 0;
	for (int32_t i = 0; i < m_numFiles; i++) {
		// skip even #'d files -- those are merge files
		if ((m_fileInfo[i].m_fileId & 0x01) == 0) continue;
		numPositiveRecs += m_fileInfo[i].m_map->getNumPositiveRecs();
		numNegativeRecs += m_fileInfo[i].m_map->getNumNegativeRecs();
	}
	// . add in the btree
	// . TODO: count negative and positive recs in the b-tree
	// . assume all positive for now
	// . for now let Rdb add the tree in RdbBase::getNumTotalRecs()
	if (m_tree) {
		numPositiveRecs += m_tree->getNumPositiveKeys(m_collnum);
		numNegativeRecs += m_tree->getNumNegativeKeys(m_collnum);
	} else {
		// i've seen this happen when adding a new coll i guess
		if (!m_buckets) return 0;

		// these routines are slow because they count every time.
		numPositiveRecs += m_buckets->getNumKeys(m_collnum);
	}
	return numPositiveRecs - numNegativeRecs;
}
// . how much mem is allocated for all of our maps?
// . we have one map per file
int64_t RdbBase::getMapMemAllocated() const {
	int64_t allocated = 0;
	for (int32_t i = 0; i < m_numFiles; i++)
		allocated += m_fileInfo[i].m_map->getMemAllocated();
	return allocated;
}

int32_t RdbBase::getNumFiles() const {
	ScopedLock sl(m_mtxFileInfo);
	return m_numFiles;
}
// sum of all parts of all big files
int32_t RdbBase::getNumSmallFiles() const {
	int32_t count = 0;
	for (int32_t i = 0; i < m_numFiles; i++)
		count += m_fileInfo[i].m_file->getNumParts();
	return count;
}

int64_t RdbBase::getDiskSpaceUsed() const {
	int64_t count = 0;
	for (int32_t i = 0; i < m_numFiles; i++)
		count += m_fileInfo[i].m_file->getFileSize();
	return count;
}
// Calculate how much space will be needed for merging files [startFileNum .. startFileNum+numFiles).
// The estimate is an upper bound.
uint64_t RdbBase::getSpaceNeededForMerge(int startFileNum, int numFiles) const {
	// The "upper bound" is implicitly true. Due to internal fragmentation in the file system we will
	// likely use fewer blocks/segments than the original files. It can be wrong if the target
	// file system uses blocks/sectors/segments/extents much larger than the source file system.
	uint64_t total = 0;
	for (int i = 0; i < startFileNum + numFiles && i < m_numFiles; i++)
		total += m_fileInfo[i].m_file->getFileSize();
	return total;
}
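// Illustrative usage sketch (hypothetical caller and variable names, not part
// of this file): a merge-space reservation could compare the estimate with
// the free bytes on the target volume, e.g.
//   uint64_t needed = base->getSpaceNeededForMerge(startFileNum, numFilesToMerge);
//   if (needed > freeBytesOnVolume) { /* postpone the merge */ }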
void RdbBase::saveMaps() {
	logTrace(g_conf.m_logTraceRdbBase, "BEGIN");

	ScopedLock sl(m_mtxFileInfo);
	for (int32_t i = 0; i < m_numFiles; i++) {
		if (!m_fileInfo[i].m_map) {
			log("base: map for file #%i is null", i);
			continue;
		}

		if ((m_fileInfo[i].m_fileId & 0x01) == 0 || !m_fileInfo[i].m_allowReads) {
			// don't write map for files that are merging/dumping
			continue;
		}

		bool status = m_fileInfo[i].m_map->writeMap(false);
		if (!status) {
			// unable to write, let's abort
			gbshutdownResourceError();
		}
	}

	logTrace(g_conf.m_logTraceRdbBase, "END");
}
void RdbBase::saveTreeIndex() {
	logTrace(g_conf.m_logTraceRdbBase, "BEGIN");

	if (!m_useIndexFile) {
		logTrace(g_conf.m_logTraceRdbBase, "END. useIndexFile disabled");
		return;
	}

	if (!m_treeIndex.writeIndex(false)) {
		// unable to write, let's abort
		gbshutdownResourceError();
	}

	logTrace(g_conf.m_logTraceRdbBase, "END");
}
void RdbBase::saveIndexes() {
	logTrace(g_conf.m_logTraceRdbBase, "BEGIN");

	if (!m_useIndexFile) {
		return;
	}

	ScopedLock sl(m_mtxFileInfo);
	for (int32_t i = 0; i < m_numFiles; i++) {
		if (!m_fileInfo[i].m_index) {
			log(LOG_WARN, "base: index for file #%i is null", i);
			continue;
		}

		if ((m_fileInfo[i].m_fileId & 0x01) == 0 || !m_fileInfo[i].m_allowReads) {
			// don't write index for files that are merging/dumping
			continue;
		}

		if (!m_fileInfo[i].m_index->writeIndex(true)) {
			// unable to write, let's abort
			gbshutdownResourceError();
		}
	}

	logTrace(g_conf.m_logTraceRdbBase, "END");
}
bool RdbBase::verifyFileSharding() {
	// if swapping in from CollectionRec::getBase() then do
	// not re-verify file sharding! only do at startup
	if (g_loop.m_isDoingLoop) return true;

	// skip for now to speed up startup
	static int32_t s_count = 0;
	s_count++;
	if (s_count == 50)
		log(LOG_WARN, "db: skipping shard verification for remaining files");
	if (s_count >= 50)
		return true;

	Msg5 msg5;
	RdbList list;
	int32_t minRecSizes = 64000;
	rdbid_t rdbId = m_rdb->getRdbId();
	if (rdbId == RDB_TITLEDB) minRecSizes = 640000;

	log(LOG_DEBUG, "db: Verifying shard parity for %s of %" PRId32 " bytes for coll %s (collnum=%" PRId32 ")...",
	    m_dbname, minRecSizes, m_coll, (int32_t)m_collnum);

	if (!msg5.getList(m_rdb->getRdbId(),
	                  m_collnum,
	                  &list,
	                  KEYMIN(),
	                  KEYMAX(),
	                  minRecSizes,
	                  true,    // includeTree
	                  0,       // startFileNum
	                  -1,      // numFiles
	                  NULL,    // state
	                  NULL,    // callback
	                  0,       // niceness
	                  false,   // err correction?
	                  -1,      // maxRetries
	                  false)) { // isRealMerge
		log(LOG_DEBUG, "db: HEY! it did not block");
		return false;
	}

	int32_t count = 0;
	int32_t got = 0;
	int32_t printed = 0;
	char k[MAX_KEY_BYTES];
	for (list.resetListPtr(); !list.isExhausted(); list.skipCurrentRecord()) {
		list.getCurrentKey(k);

		// skip negative keys
		if (KEYNEG(k)) {
			continue;
		}

		count++;
		uint32_t shardNum = getShardNum(rdbId, k);
		if (shardNum == getMyShardNum()) {
			got++;
			continue;
		}
		if (++printed > 100) continue;

		// avoid log spam... comment this out. nah, print out the 1st 100.
		log("db: Found bad key in list that belongs to shard %" PRId32, shardNum);
	}

	if (got == count) {
		return true;
	}

	// tally it up
	g_rebalance.m_numForeignRecs += count - got;
	log(LOG_INFO, "db: Out of first %" PRId32 " records in %s for %s.%" PRId32 ", only %" PRId32 " belong "
	    "to our group.", count, m_dbname, m_coll, (int32_t)m_collnum, got);
	//log("db: Exiting due to Posdb inconsistency.");
	return true; //g_conf.m_bypassValidation
}
bool RdbBase::initializeGlobalIndexThread() {
	return m_globalIndexThreadQueue.initialize(generateGlobalIndex, "generate-index");
}

void RdbBase::finalizeGlobalIndexThread() {
	m_globalIndexThreadQueue.finalize();
}

std::vector<std::pair<int32_t, docidsconst_ptr_t>> RdbBase::prepareGlobalIndexJob(bool markFileReadable, int32_t fileId) {
	ScopedLock sl(m_mtxFileInfo);
	return prepareGlobalIndexJob_unlocked(markFileReadable, fileId);
}

std::vector<std::pair<int32_t, docidsconst_ptr_t>> RdbBase::prepareGlobalIndexJob_unlocked(bool markFileReadable, int32_t fileId) {
	std::vector<std::pair<int32_t, docidsconst_ptr_t>> docIdFileIndexes;

	// global index does not include RdbIndex from tree/buckets
	for (int32_t i = 0; i < m_numFiles; i++) {
		if (markFileReadable && m_fileInfo[i].m_fileId == fileId) {
			m_fileInfo[i].m_pendingGenerateIndex = true;
		}

		if (m_fileInfo[i].m_allowReads || m_fileInfo[i].m_pendingGenerateIndex) {
			docIdFileIndexes.emplace_back(i, m_fileInfo[i].m_index->getDocIds());
		}
	}

	return docIdFileIndexes;
}

void RdbBase::submitGlobalIndexJob(bool markFileReadable, int32_t fileId) {
	if (!m_useIndexFile) {
		return;
	}

	ThreadQueueItem *item = new ThreadQueueItem(this, prepareGlobalIndexJob(markFileReadable, fileId), markFileReadable, fileId);
	m_globalIndexThreadQueue.addItem(item);
	log(LOG_INFO, "db: Submitted job %p to generate global index for %s", item, m_rdb->getDbname());
}

void RdbBase::submitGlobalIndexJob_unlocked(bool markFileReadable, int32_t fileId) {
	if (!m_useIndexFile) {
		return;
	}

	ThreadQueueItem *item = new ThreadQueueItem(this, prepareGlobalIndexJob_unlocked(markFileReadable, fileId), markFileReadable, fileId);
	m_globalIndexThreadQueue.addItem(item);
	log(LOG_INFO, "db: Submitted job %p to generate global index for %s", item, m_rdb->getDbname());
}

bool RdbBase::hasPendingGlobalIndexJob() {
	if (!m_useIndexFile) {
		return false;
	}
	return !m_globalIndexThreadQueue.isEmpty();
}
void RdbBase::generateGlobalIndex(void *item) {
	ThreadQueueItem *queueItem = static_cast<ThreadQueueItem*>(item);

	log(LOG_INFO, "db: Processing job %p to generate global index", item);

	docids_ptr_t tmpDocIdFileIndex(new docids_t);

	for (auto it = queueItem->m_docIdFileIndexes.begin(); it != queueItem->m_docIdFileIndexes.end(); ++it) {
		auto i = it->first;
		const auto &docIds = it->second;

		tmpDocIdFileIndex->reserve(tmpDocIdFileIndex->size() + docIds->size());
		std::transform(docIds->begin(), docIds->end(), std::back_inserter(*tmpDocIdFileIndex),
		               [i](uint64_t docId) {
			               return ((docId << s_docIdFileIndex_docIdOffset) | i); // docId has delete key
		               });
	}

	std::stable_sort(tmpDocIdFileIndex->begin(), tmpDocIdFileIndex->end(),
	                 [](uint64_t a, uint64_t b) {
		                 return (a & s_docIdFileIndex_docIdMask) < (b & s_docIdFileIndex_docIdMask);
	                 });

	// in reverse because we want to keep the highest file position
	auto it = std::unique(tmpDocIdFileIndex->rbegin(), tmpDocIdFileIndex->rend(),
	                      [](uint64_t a, uint64_t b) {
		                      return (a & s_docIdFileIndex_docIdMask) == (b & s_docIdFileIndex_docIdMask);
	                      });
	tmpDocIdFileIndex->erase(tmpDocIdFileIndex->begin(), it.base());

	// free up used space
	tmpDocIdFileIndex->shrink_to_fit();

	// replace with new index
	ScopedLock sl(queueItem->m_base->m_mtxFileInfo);
	ScopedLock sl2(queueItem->m_base->m_docIdFileIndexMtx);
	queueItem->m_base->m_docIdFileIndex.swap(tmpDocIdFileIndex);

	if (queueItem->m_markFileReadable) {
		for (auto i = 0; i < queueItem->m_base->m_numFiles; ++i) {
			if (queueItem->m_base->m_fileInfo[i].m_fileId == queueItem->m_fileId) {
				queueItem->m_base->m_fileInfo[i].m_allowReads = true;
				queueItem->m_base->m_fileInfo[i].m_pendingGenerateIndex = false;
				break;
			}
		}
	}

	log(LOG_INFO, "db: Processed job %p to generate global index", item);

	delete queueItem;
}
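
// Layout of a global-index entry, as built in the transform above and decoded in
// printGlobalIndex() below: the docid (still carrying its low delete-key bit) sits in
// the high bits, the m_fileInfo slot in the low bits. A minimal round-trip sketch;
// the sample values are hypothetical:
//
//	uint64_t docIdWithDelKey = (12345ULL << 1) | 1;  // hypothetical docid 12345, delete-key bit set => not deleted
//	uint64_t filePos = 3;                            // slot in m_fileInfo
//	uint64_t key = (docIdWithDelKey << s_docIdFileIndex_docIdOffset) | filePos;
//
//	uint64_t pos   = key & s_docIdFileIndex_filePosMask;         // == 3
//	uint64_t docId = key >> s_docIdFileIndex_docIdDelKeyOffset;  // == 12345, delete key shifted away
//	bool isDel     = (key & s_docIdFileIndex_delBitMask) == 0;   // cleared delete-key bit => deleted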
/// @todo ALC we should free up m_fileInfo[i].m_index->m_docIds when we don't need it, and load it back when we do
void RdbBase::generateGlobalIndex() {
	if (!m_useIndexFile) {
		return;
	}

	log(LOG_INFO, "db: Generating global index for %s", m_rdb->getDbname());

	docids_ptr_t tmpDocIdFileIndex(new docids_t);

	ScopedLock sl(m_mtxFileInfo);

	// global index does not include RdbIndex from tree/buckets
	for (int32_t i = 0; i < m_numFiles; i++) {
		if (!m_fileInfo[i].m_allowReads) {
			continue;
		}

		auto docIds = m_fileInfo[i].m_index->getDocIds();

		tmpDocIdFileIndex->reserve(tmpDocIdFileIndex->size() + docIds->size());
		std::transform(docIds->begin(), docIds->end(), std::back_inserter(*tmpDocIdFileIndex),
		               [i](uint64_t docId) {
			               return ((docId << s_docIdFileIndex_docIdOffset) | i); // docId has delete key
		               });
	}

	sl.unlock();

	std::stable_sort(tmpDocIdFileIndex->begin(), tmpDocIdFileIndex->end(),
	                 [](uint64_t a, uint64_t b) {
		                 return (a & s_docIdFileIndex_docIdMask) < (b & s_docIdFileIndex_docIdMask);
	                 });

	// in reverse because we want to keep the highest file position
	auto it = std::unique(tmpDocIdFileIndex->rbegin(), tmpDocIdFileIndex->rend(),
	                      [](uint64_t a, uint64_t b) {
		                      return (a & s_docIdFileIndex_docIdMask) == (b & s_docIdFileIndex_docIdMask);
	                      });
	tmpDocIdFileIndex->erase(tmpDocIdFileIndex->begin(), it.base());

	// free up used space
	tmpDocIdFileIndex->shrink_to_fit();

	// replace with new index
	ScopedLock sl2(m_docIdFileIndexMtx);
	m_docIdFileIndex.swap(tmpDocIdFileIndex);
}
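
// The stable_sort + reverse std::unique pair above keeps exactly one entry per docid:
// the stable sort leaves equal docids in file order (lowest slot first), and running
// std::unique right-to-left keeps the last entry of each run, i.e. the highest file
// position. The same idiom on plain integers, for illustration:
//
//	std::vector<int> v = {1, 1, 2, 3, 3};          // sorted, duplicates adjacent
//	auto last = std::unique(v.rbegin(), v.rend()); // keeps the rightmost of each run
//	v.erase(v.begin(), last.base());               // v == {1, 2, 3}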
void RdbBase::printGlobalIndex() {
	logf(LOG_TRACE, "db: global index");

	auto globalIndex = getGlobalIndex();
	for (auto key : *globalIndex) {
		logf(LOG_TRACE, "db: docId=%" PRId64 " index=%" PRId64 " isDel=%d key=%" PRIx64,
		     key >> RdbBase::s_docIdFileIndex_docIdDelKeyOffset,
		     key & RdbBase::s_docIdFileIndex_filePosMask,
		     ((key & RdbBase::s_docIdFileIndex_delBitMask) == 0),
		     key);
	}
}
docidsconst_ptr_t RdbBase::getGlobalIndex() {
	ScopedLock sl(m_docIdFileIndexMtx);
	return m_docIdFileIndex;
}
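
// A sketch of how a reader could consult the snapshot returned by getGlobalIndex() to
// find which file slot holds the newest version of a docid. This helper is hypothetical
// (not part of RdbBase); it assumes only that entries are sorted by their docid bits
// and reuses the shift/mask constants decoded in printGlobalIndex():
#if 0
static int32_t lookupFilePos(const docidsconst_ptr_t &index, uint64_t docId) {
	// binary-search on the docid portion of the packed keys
	auto it = std::lower_bound(index->begin(), index->end(), docId,
	                           [](uint64_t key, uint64_t d) {
		                           return (key >> RdbBase::s_docIdFileIndex_docIdDelKeyOffset) < d;
	                           });
	if (it == index->end() || (*it >> RdbBase::s_docIdFileIndex_docIdDelKeyOffset) != docId) {
		return -1; // docid not present in any file's index
	}
	return static_cast<int32_t>(*it & RdbBase::s_docIdFileIndex_filePosMask);
}
#endif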