363 lines
12 KiB
C++
363 lines
12 KiB
C++
#include "gb-include.h"
|
|
|
|
#include "RdbScan.h"
|
|
#include "DiskPageCache.h"
|
|
#include "Rdb.h"
|
|
|
|
// forward declaration: setRead() below hands this to file->read() as the
// read-completion callback, with the RdbScan itself as "state"
void gotListWrapper ( void *state ) ;
|
|
|
|
// . set up a read for a scan of the slots in this RdbScan
|
|
// . returns false if blocked, true otherwise
|
|
// . sets errno on error
|
|
// . file          : file to read from
// . fixedDataSize : passed through to the list (RdbList::set)
// . offset        : byte offset in "file" at which the read starts
// . bytesToRead   : number of bytes to read; 0 means there is nothing to do
// . startKey/endKey/keySize : key range of the read and the size of a full key
// . list          : the list we fill up; it is reset first
// . state/callback: callback(state) is invoked from gotListWrapper() after a
//                   read that blocked has completed
// . useHalfKeys   : reserve room so a leading half key can be made full
// . rdbId         : posdb rdbs reserve more room (see m_off below)
// . niceness, allowPageCache, hitDisk : passed through to file->read()
// . returns true if we did not block (including the "nothing to read" early
//   outs), false if the read blocked
bool RdbScan::setRead ( BigFile  *file           ,
			int32_t   fixedDataSize  ,
			int64_t   offset         ,
			int32_t   bytesToRead    ,
			char     *startKey       ,
			char     *endKey         ,
			char      keySize        ,
			RdbList  *list           , // we fill this up
			void     *state          ,
			void   (* callback) ( void *state ) ,
			bool      useHalfKeys    ,
			char      rdbId          ,
			int32_t   niceness       ,
			bool      allowPageCache ,
			bool      hitDisk        ) {
	// remember the list we are filling and start it out empty
	m_list = list;
	m_list->reset();
	// save what gotList() needs later on
	m_ks             = keySize;
	m_rdbId          = rdbId;
	m_allowPageCache = allowPageCache;
	m_hitDisk        = hitDisk;
	// . set the list now, with no buffer, so it carries the key range
	// . gotList() re-sets it once the read buffer exists
	m_list->set ( NULL          ,
		      0             ,
		      NULL          ,
		      0             ,
		      startKey      ,
		      endKey        ,
		      fixedDataSize ,
		      true          , // ownData?
		      useHalfKeys   ,
		      keySize       );
	// . don't do anything if startKey exceeds endKey
	// . often Msg3 will call us with this true because its page range
	//   is empty because the map knows without having to hit disk.
	//   therefore, just return silently now.
	// . Msg3 will not merge empty lists so don't worry about setting the
	//   list's startKey/endKey
	if ( KEYCMP(startKey,endKey,m_ks)>0 ) return true;
	// don't bother doing anything if nothing needs to be read
	if ( bytesToRead == 0 ) return true;

	// . notes kept from the original author:
	// . start reading at m_offset in the file
	// . also, remember this offset for finding the offset of the last key
	//   to set a tighter m_bufEnd in doneReading() so we don't have to
	//   keep checking if the returned record's key falls exactly in
	//   [m_startKey,m_endKey]
	// . m_hint is set to the offset of the BIGGEST key found in the map
	//   that is still <= endKey
	// . we use m_hint so that RdbList::merge() can find the last key
	//   in the startKey/endKey range w/o having to step through
	//   all the records in the read
	// . m_hint will limit the stepping to a PAGE_SIZE worth of records
	// . m_hint is an offset, like m_offset
	// . TODO: what if it returns false?

	// . m_off is how many spare bytes we want in front of the read data so
	//   gotList() can expand a compressed first key into a full key
	// . 6 bytes if we use half keys, nothing otherwise
	if ( useHalfKeys ) m_off = 6;
	else               m_off = 0;
	// posdb keys are 18 bytes but can be 12 or 6 bytes compressed, so
	// expanding a 6 byte key into 18 bytes needs 12 spare bytes
	if ( m_rdbId == RDB_POSDB || m_rdbId == RDB2_POSDB2 ) m_off = 12;
	// . and a little extra padding in front of that as well
	// . i think a read overflow might be causing a segv in malloc
	// . so try padding under us, maybe read() writes before the buf
	// . NOTE: gotList() sanity checks that the alloc offset is
	//   m_off + 16, so keep the two in sync
	int32_t pad = 16;

	// save caller's callback
	m_callback = callback;
	m_state    = state;
	// save the key range and list parameters so gotList() can set the
	// list once the buffer has been allocated
	KEYSET(m_startKey,startKey,m_ks);
	KEYSET(m_endKey,endKey,m_ks);
	m_fixedDataSize = fixedDataSize;
	m_useHalfKeys   = useHalfKeys;
	m_bytesToRead   = bytesToRead;
	// save file and offset for sanity check
	m_file   = file;
	m_offset = offset;
	// ensure we don't mess around
	m_fstate.m_allocBuf = NULL;
	m_fstate.m_buf      = NULL;
	// . do a threaded, non-blocking read
	// . we now pass in a NULL buffer so Threads.cpp will do the
	//   allocation right before launching the thread so we don't waste
	//   memory. i've seen like 19000 unlaunched threads each allocating
	//   32KB for a tfndb read, hogging up all the memory.
	if ( ! file->read ( NULL             ,
			    bytesToRead      ,
			    offset           ,
			    &m_fstate        ,
			    this             ,
			    gotListWrapper   ,
			    niceness         ,
			    m_allowPageCache ,
			    m_hitDisk        ,
			    pad + m_off      )) // allocOff, buf offset to read into
		return false;

	// sanity check: an error recorded on the file state must also show
	// up in g_errno. deliberately core if it does not.
	if ( m_fstate.m_errno && ! g_errno ) { char *xx=NULL;*xx=0; }

	// fix the list if we need to
	gotList();
	// we did not block
	return true;
}
|
|
|
|
void gotListWrapper ( void *state ) {
|
|
RdbScan *THIS = (RdbScan *)state;
|
|
THIS->gotList ();
|
|
// let caller know we're done
|
|
THIS->m_callback ( THIS->m_state );
|
|
}
|
|
|
|
#include "Threads.h"
|
|
|
|
void RdbScan::gotList ( ) {
|
|
char *allocBuf = m_fstate.m_allocBuf;
|
|
int32_t allocOff = m_fstate.m_allocOff; //buf=allocBuf+allocOff
|
|
int32_t allocSize = m_fstate.m_allocSize;
|
|
// do not free the allocated buf for when the actual thread
|
|
// does the read and finally completes in this case. we free it
|
|
// in Threads.cpp::ohcrap()
|
|
if ( m_fstate.m_errno == EDISKSTUCK )
|
|
return;
|
|
// just return on error, do nothing
|
|
if ( g_errno ) {
|
|
// free buffer though!! don't forget!
|
|
if ( allocBuf )
|
|
mfree ( allocBuf , allocSize , "RdbScan" );
|
|
m_fstate.m_allocBuf = NULL;
|
|
m_fstate.m_allocSize = 0;
|
|
return;
|
|
}
|
|
// . set our list here now since the buffer was allocated in
|
|
// DiskPageCache.cpp or Threads.cpp to save memory.
|
|
// . only set the list if there was a buffer. if not, it s probably
|
|
// due to a failed alloc and we'll just end up using the empty
|
|
// m_list we set way above.
|
|
if ( m_fstate.m_allocBuf ) {
|
|
// get the buffer info for setting the list
|
|
//char *allocBuf = m_fstate.m_allocBuf;
|
|
//int32_t allocSize = m_fstate.m_allocSize;
|
|
int32_t bytesDone = m_fstate.m_bytesDone;
|
|
// sanity checks
|
|
if ( bytesDone > allocSize ) {
|
|
char *xx = NULL; *xx = 0; }
|
|
if ( allocOff + m_bytesToRead != allocSize ) {
|
|
char *xx = NULL; *xx = 0; }
|
|
if ( allocOff != m_off + 16 ) {
|
|
char *xx = NULL; *xx = 0; }
|
|
// now set this list. this always succeeds.
|
|
m_list->set ( allocBuf + allocOff , // buf + pad + m_off ,
|
|
m_bytesToRead , // bytesToRead ,
|
|
allocBuf ,
|
|
allocSize ,
|
|
m_startKey ,
|
|
m_endKey ,
|
|
m_fixedDataSize ,
|
|
true , // ownData?
|
|
m_useHalfKeys ,
|
|
m_ks );
|
|
}
|
|
|
|
// this was bitching a lot when running on a multinode cluster,
|
|
// so i effectively disabled it by changing to _GBSANITYCHECK2_
|
|
//#ifdef GBSANITYCHECK2
|
|
// this first test, tests to make sure the read from cache worked
|
|
/*
|
|
DiskPageCache *pc = m_file->getDiskPageCache();
|
|
if ( pc &&
|
|
! g_errno &&
|
|
g_conf.m_logDebugDiskPageCache &&
|
|
// if we got it from the page cache, verify with disk
|
|
m_fstate.m_inPageCache ) {
|
|
// ensure threads disabled
|
|
bool on = ! g_threads.areThreadsDisabled();
|
|
if ( on ) g_threads.disableThreads();
|
|
//pc->disableCache();
|
|
FileState fstate;
|
|
// ensure we don't mess around
|
|
fstate.m_allocBuf = NULL;
|
|
fstate.m_buf = NULL;
|
|
char *bb = (char *)mmalloc ( m_bytesToRead , "RS" );
|
|
if ( ! bb ) {
|
|
log("db: Failed to alloc mem for page cache verify.");
|
|
goto skip;
|
|
}
|
|
m_file->read ( bb , // NULL, // buf + pad + m_off
|
|
m_bytesToRead ,
|
|
m_offset ,
|
|
&fstate , // &m_fstate
|
|
NULL , // callback state
|
|
gotListWrapper , // FAKE callback
|
|
MAX_NICENESS , // niceness
|
|
false, // m_allowPageCache ,... not for test!
|
|
m_hitDisk ,
|
|
16 + m_off );
|
|
//char *allocBuf = fstate.m_allocBuf;
|
|
//int32_t allocSize = fstate.m_allocSize;
|
|
//char *bb = allocBuf + fstate.m_allocOff;
|
|
// if file got unlinked from under us, or whatever, we get
|
|
// an error
|
|
if ( ! g_errno ) {
|
|
char *buf = m_list->getList();
|
|
if ( memcmp ( bb , buf , m_bytesToRead) != 0 ) {
|
|
char *xx = NULL; *xx = 0; }
|
|
if ( m_bytesToRead != m_list->getListSize() ) {
|
|
char *xx = NULL; *xx = 0; }
|
|
}
|
|
// compare
|
|
if ( memcmp ( allocBuf+allocOff, bb , m_bytesToRead ) ) {
|
|
log("db: failed diskpagecache verify");
|
|
char *xx=NULL;*xx=0;
|
|
}
|
|
//mfree ( allocBuf , allocSize , "RS" );
|
|
mfree ( bb , m_bytesToRead , "RS" );
|
|
if ( on ) g_threads.enableThreads();
|
|
//pc->enableCache();
|
|
// . this test tests to make sure the page stores worked
|
|
// . go through each page in page cache and verify on disk
|
|
//pc->verifyData ( m_file );
|
|
}
|
|
*/
|
|
// skip:
|
|
//#endif
|
|
// assume we did not shift it
|
|
m_shifted = 0;//false;
|
|
// if we were doing a cache only read, and got nothing, bail now
|
|
if ( ! m_hitDisk && m_list->isEmpty() ) return;
|
|
// if first key in list is half, make it full
|
|
char *p = m_list->getList();
|
|
// . bitch if we read too much!
|
|
// . i think a read overflow might be causing a segv in malloc
|
|
// . NOTE: BigFile's call to DiskPageCache alters these values
|
|
if ( m_fstate.m_bytesDone != m_fstate.m_bytesToGo && m_hitDisk )
|
|
log(LOG_INFO,"disk: Read %" INT64 " bytes but needed %" INT64 ".",
|
|
m_fstate.m_bytesDone , m_fstate.m_bytesToGo );
|
|
// adjust the list size for biased page cache if necessary
|
|
//if ( m_file->m_pc && m_allowPageCache &&
|
|
// m_file->m_pc->m_isOverriden &&
|
|
// m_fstate.m_bytesDone < m_list->m_listSize )
|
|
// m_list->m_listSize = m_fstate.m_bytesDone;
|
|
// bail if we don't do the 6 byte thing
|
|
if ( m_off == 0 ) return;
|
|
// posdb double compression?
|
|
if ( (m_rdbId == RDB_POSDB || m_rdbId == RDB2_POSDB2)
|
|
&& (p[0] & 0x04) ) {
|
|
// make it full
|
|
m_list->m_list -= 12;
|
|
m_list->m_listSize += 12;
|
|
p -= 12;
|
|
KEYSET(p,m_startKey,m_list->m_ks);
|
|
// clear the compression bits
|
|
*p &= 0xf9;
|
|
// let em know we shifted it so they can shift the hint offset
|
|
// up by 6
|
|
m_shifted = 12;
|
|
}
|
|
// if first key is already full (12 bytes) no need to do anything
|
|
else if ( m_list->isHalfBitOn ( p ) ) {
|
|
// otherwise, make it full
|
|
m_list->m_list -= 6;
|
|
m_list->m_listSize += 6;
|
|
p -= 6;
|
|
//*(key_t *)p = m_startKey;
|
|
KEYSET(p,m_startKey,m_list->m_ks);
|
|
// clear the half bit in case it is set
|
|
*p &= 0xfd;
|
|
// let em know we shifted it so they can shift the hint offset
|
|
// up by 6
|
|
m_shifted = 6; // true;
|
|
}
|
|
}
|