mirror of
https://github.com/privacore/open-source-search-engine.git
synced 2025-07-13 02:36:06 -04:00
doesn't appare to coredump anymore. bad commit - (debug and disabled code still left in)
This commit is contained in:
34
Msg2.cpp
34
Msg2.cpp
@ -40,6 +40,8 @@ Msg2::Msg2()
|
||||
m_msg5(0),
|
||||
m_avail(0),
|
||||
m_numLists(0),
|
||||
m_numReplies(0),
|
||||
m_numRequests(0),
|
||||
m_requestsBeingSubmitted(false)
|
||||
{
|
||||
set_signature();
|
||||
@ -52,10 +54,17 @@ Msg2::~Msg2() {
|
||||
|
||||
void Msg2::reset ( ) {
|
||||
verify_signature();
|
||||
if(!allRequestsReplied())
|
||||
gbshutdownLogicError();
|
||||
m_numLists = 0;
|
||||
m_whiteList = 0;
|
||||
m_p = 0;
|
||||
delete[] m_msg5;
|
||||
//if(m_msg5) {
|
||||
// for(int i=0; i<m_numLists+m_numWhitelists; i++)
|
||||
// (m_msg5+i)->~Msg5();
|
||||
// memset(m_msg5,-4,sizeof(*m_msg5)*(m_numLists+m_numWhitelists));
|
||||
//}
|
||||
m_msg5 = 0;
|
||||
delete[] m_avail;
|
||||
m_avail = 0;
|
||||
@ -73,6 +82,8 @@ void Msg2::incrementRequestCount() {
|
||||
|
||||
void Msg2::incrementReplyCount() {
|
||||
ScopedLock sl(m_mtxCounters);
|
||||
if(m_numReplies>=m_numRequests)
|
||||
abort();
|
||||
m_numReplies++;
|
||||
}
|
||||
|
||||
@ -412,28 +423,31 @@ bool Msg2::getLists ( ) {
|
||||
}
|
||||
|
||||
Msg5 *Msg2::getAvailMsg5 ( ) {
|
||||
verify_signature();
|
||||
ScopedLock sl(m_mtxMsg5);
|
||||
for ( int32_t i = 0; i < m_numLists+m_numWhitelists; i++ ) {
|
||||
if(m_avail[i]) {
|
||||
m_avail[i] = false;
|
||||
return &m_msg5[i];
|
||||
}
|
||||
//if(m_avail[i]!=0) abort();
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void Msg2::returnMsg5 ( Msg5 *msg5 ) {
|
||||
verify_signature();
|
||||
ScopedLock sl(m_mtxMsg5);
|
||||
for(int32_t i = 0; i < m_numLists+m_numWhitelists; i++) {
|
||||
if(&m_msg5[i] == msg5) {
|
||||
if(m_avail[i])
|
||||
gbshutdownLogicError();
|
||||
m_avail[i] = true;
|
||||
return;
|
||||
}
|
||||
}
|
||||
// wtf?
|
||||
gbshutdownLogicError();
|
||||
if(msg5 < m_msg5)
|
||||
gbshutdownLogicError();
|
||||
if(msg5 >= m_msg5+m_numLists+m_numWhitelists)
|
||||
gbshutdownLogicError();
|
||||
int32_t i = (int32_t)(msg5-m_msg5);
|
||||
if(m_avail[i])
|
||||
gbshutdownLogicError();
|
||||
msg5->reset();
|
||||
// m_avail[i] = true;
|
||||
verify_signature();
|
||||
}
|
||||
|
||||
|
||||
|
25
Msg3.cpp
25
Msg3.cpp
@ -54,11 +54,13 @@ void Msg3::reset() {
|
||||
void Msg3::incrementScansStarted() {
|
||||
ScopedLock sl(m_mtxScanCounters);
|
||||
m_numScansStarted++;
|
||||
if(m_numScansCompleted>=m_numScansStarted) gbshutdownLogicError();
|
||||
}
|
||||
|
||||
void Msg3::incrementScansCompleted() {
|
||||
ScopedLock sl(m_mtxScanCounters);
|
||||
m_numScansCompleted++;
|
||||
if(m_numScansCompleted>m_numScansStarted) gbshutdownLogicError();
|
||||
}
|
||||
|
||||
bool Msg3::areAllScansCompleted() const {
|
||||
@ -480,6 +482,7 @@ bool Msg3::readList ( char rdbId ,
|
||||
//m_endKey.n0 |= 0x01LL;
|
||||
// . now start reading/scanning the files
|
||||
// . our m_scans array starts at 0
|
||||
bool anyAsyncScans = false;
|
||||
for ( int32_t i = 0 ; i < m_numFileNums ; i++ ) {
|
||||
// get the page range
|
||||
//int32_t p1 = m_startpg [ i ];
|
||||
@ -697,7 +700,8 @@ bool Msg3::readList ( char rdbId ,
|
||||
startKey2, endKey2, m_ks, &m_lists[i], this, doneScanningWrapper,
|
||||
base->useHalfKeys(), m_rdbId, m_niceness, m_allowPageCache, m_hitDisk ) ;
|
||||
|
||||
// debug msg
|
||||
anyAsyncScans = anyAsyncScans || !done;
|
||||
// debug msg
|
||||
//fprintf(stderr,"Msg3:: reading %" PRId32" bytes from file #%" PRId32","
|
||||
// "done=%" PRId32",offset=%" PRId64",g_errno=%s,"
|
||||
// "startKey=n1=%" PRIu32",n0=%" PRIu64", "
|
||||
@ -726,16 +730,17 @@ bool Msg3::readList ( char rdbId ,
|
||||
{
|
||||
ScopedLock sl(m_mtxScanCounters);
|
||||
m_scansBeingSubmitted = false;
|
||||
//note: there is a weak race condition in this logic.
|
||||
}
|
||||
if ( !areAllScansCompleted() )
|
||||
return false;
|
||||
else {
|
||||
// . if all scans completed without blocking then wrap it up & ret true
|
||||
// . doneScanning may now block if it finds data corruption and must
|
||||
// get the list remotely
|
||||
verify_signature();
|
||||
return doneScanning();
|
||||
}
|
||||
|
||||
if(anyAsyncScans)
|
||||
return false; //not completed yet
|
||||
|
||||
// . if all scans completed without blocking then wrap it up & ret true
|
||||
// . doneScanning may now block if it finds data corruption and must
|
||||
// get the list remotely
|
||||
verify_signature();
|
||||
return doneScanning();
|
||||
}
|
||||
|
||||
|
||||
|
@ -217,6 +217,9 @@ static void sendReply ( UdpSlot *slot , Msg39 *msg39 , char *reply , int32_t rep
|
||||
if ( msg39 ) {
|
||||
mdelete ( msg39 , sizeof(Msg39) , "Msg39" );
|
||||
delete (msg39);
|
||||
//msg39->~Msg39();
|
||||
//memset(msg39,-3,sizeof(*msg39));
|
||||
//::operator delete((void*)msg39);
|
||||
}
|
||||
}
|
||||
|
||||
|
38
Msg5.cpp
38
Msg5.cpp
@ -17,13 +17,15 @@ static const int signature_init = 0x2c3a4f5d;
|
||||
int32_t g_numCorrupt = 0;
|
||||
|
||||
Msg5::Msg5()
|
||||
: m_isSingleUnmergedListGet(false)
|
||||
: m_rdbId(RDB_NONE),
|
||||
m_isSingleUnmergedListGet(false)
|
||||
{
|
||||
m_waitingForList = false;
|
||||
//m_waitingForMerge = false;
|
||||
m_numListPtrs = 0;
|
||||
set_signature();
|
||||
reset();
|
||||
if(m_rdbId==RDB_POSDB && !m_isSingleUnmergedListGet) abort();
|
||||
}
|
||||
|
||||
Msg5::~Msg5() {
|
||||
@ -182,6 +184,8 @@ bool Msg5::getList ( char rdbId ,
|
||||
bool isRealMerge ,
|
||||
bool allowPageCache ) {
|
||||
verify_signature();
|
||||
if(m_rdbId==RDB_POSDB && !m_isSingleUnmergedListGet) abort();
|
||||
|
||||
const char *startKey = static_cast<const char*>(startKey_);
|
||||
const char *endKey = static_cast<const char*>(endKey_);
|
||||
|
||||
@ -334,6 +338,7 @@ bool Msg5::getList ( char rdbId ,
|
||||
// . loops until m_minRecSizes is satisfied OR m_endKey is reached
|
||||
bool Msg5::readList ( ) {
|
||||
verify_signature();
|
||||
if(m_rdbId==RDB_POSDB && !m_isSingleUnmergedListGet) abort();
|
||||
// get base, returns NULL and sets g_errno to ENOCOLLREC on error
|
||||
RdbBase *base = getRdbBase( m_rdbId, m_collnum );
|
||||
if ( ! base ) {
|
||||
@ -579,6 +584,7 @@ bool Msg5::readList ( ) {
|
||||
|
||||
// we may need to re-call getList
|
||||
verify_signature();
|
||||
if(m_rdbId==RDB_POSDB && !m_isSingleUnmergedListGet) abort();
|
||||
} while(needsRecall());
|
||||
// we did not block
|
||||
return true;
|
||||
@ -586,6 +592,7 @@ bool Msg5::readList ( ) {
|
||||
|
||||
bool Msg5::needsRecall ( ) {
|
||||
verify_signature();
|
||||
if(m_rdbId==RDB_POSDB && !m_isSingleUnmergedListGet) abort();
|
||||
// get base, returns NULL and sets g_errno to ENOCOLLREC on error
|
||||
RdbBase *base = getRdbBase ( m_rdbId , m_collnum );
|
||||
// if collection was deleted from under us, base will be NULL
|
||||
@ -649,6 +656,9 @@ void Msg5::gotListWrapper(void *state) {
|
||||
|
||||
void Msg5::gotListWrapper() {
|
||||
verify_signature();
|
||||
//log("@@@@ Msg5(%p)::gotListWrapper()",this);
|
||||
if ( m_calledCallback ) abort();
|
||||
if(m_rdbId==RDB_POSDB && !m_isSingleUnmergedListGet) abort();
|
||||
// . this sets g_errno on error
|
||||
// . this will merge cache/tree and disk lists into m_list
|
||||
// . it will update m_newMinRecSizes
|
||||
@ -659,7 +669,7 @@ void Msg5::gotListWrapper() {
|
||||
// . only returns true if COMPLETELY done
|
||||
if ( needsRecall() && ! readList() ) return;
|
||||
// sanity check
|
||||
if ( m_calledCallback ) { g_process.shutdownAbort(true); }
|
||||
if ( m_calledCallback ) abort();
|
||||
// set it now
|
||||
m_calledCallback = 1;
|
||||
// we are no longer waiting for the list
|
||||
@ -688,6 +698,7 @@ bool Msg5::gotList ( ) {
|
||||
// . sets g_errno on error
|
||||
bool Msg5::gotList2 ( ) {
|
||||
verify_signature();
|
||||
//log("@@@ msg5(%p)::gotList2",this);
|
||||
// reset this
|
||||
m_startTime = 0LL;
|
||||
// return if g_errno is set
|
||||
@ -743,6 +754,7 @@ bool Msg5::gotList2 ( ) {
|
||||
//if ( KEYNEG( m_minEndKey) ) {g_process.shutdownAbort(true); }
|
||||
|
||||
QUICKPOLL(m_niceness);
|
||||
if(m_rdbId==RDB_POSDB && !m_isSingleUnmergedListGet) abort();
|
||||
|
||||
// . is treeList included?
|
||||
// . constrain treelist for the merge
|
||||
@ -890,6 +902,7 @@ bool Msg5::gotList2 ( ) {
|
||||
// super quick just for this purpose
|
||||
// . crap, rather than do that just deal with the negative recs
|
||||
// in the caller code... in this case Spider.cpp::gotDoledbList2()
|
||||
if(m_rdbId==RDB_POSDB && !m_isSingleUnmergedListGet) abort();
|
||||
if ( m_numListPtrs == 1 && m_list->isEmpty() &&
|
||||
// just do this logic for doledb now, it was causing us to
|
||||
// return search results whose keys were negative indexdb keys.
|
||||
@ -973,6 +986,7 @@ bool Msg5::gotList2 ( ) {
|
||||
|
||||
QUICKPOLL((m_niceness));
|
||||
|
||||
if(m_rdbId==RDB_POSDB && !m_isSingleUnmergedListGet) abort();
|
||||
if ( g_jobScheduler.submit(mergeListsWrapper, mergeDoneWrapper, this, thread_type_query_merge, m_niceness) ) {
|
||||
return false;
|
||||
}
|
||||
@ -985,12 +999,15 @@ bool Msg5::gotList2 ( ) {
|
||||
// clear g_errno because it really isn't a problem, we just block
|
||||
g_errno = 0;
|
||||
|
||||
if(m_rdbId==RDB_POSDB && !m_isSingleUnmergedListGet) abort();
|
||||
// repair any corruption
|
||||
repairLists();
|
||||
|
||||
// do it
|
||||
//log("@@@@ msg5(%p): gotlist2: blocking merge",this);
|
||||
mergeLists();
|
||||
|
||||
if(m_rdbId==RDB_POSDB && !m_isSingleUnmergedListGet) abort();
|
||||
// . add m_list to our cache if we should
|
||||
// . this returns false if blocked, true otherwise
|
||||
// . sets g_errno on error
|
||||
@ -1011,6 +1028,7 @@ void Msg5::mergeListsWrapper(void *state) {
|
||||
// repair any corruption
|
||||
that->repairLists();
|
||||
|
||||
//log("@@@ msg5(%p)::mergeListsWrapper()",that);
|
||||
// do the merge
|
||||
that->mergeLists();
|
||||
|
||||
@ -1033,7 +1051,8 @@ void Msg5::mergeDoneWrapper(void *state, job_exit_t exit_type) {
|
||||
void Msg5::mergeDone(job_exit_t /*exit_type*/) {
|
||||
verify_signature();
|
||||
|
||||
if(m_calledCallback) gbshutdownLogicError();
|
||||
if(m_rdbId==RDB_POSDB && !m_isSingleUnmergedListGet) abort();
|
||||
if(m_calledCallback) abort();
|
||||
|
||||
// we MAY be in a thread now
|
||||
|
||||
@ -1049,10 +1068,13 @@ void Msg5::mergeDone(job_exit_t /*exit_type*/) {
|
||||
|
||||
// . throw it back into the loop if necessary
|
||||
// . only returns true if COMPLETELY done
|
||||
if ( needsRecall() && ! readList() ) return;
|
||||
if ( needsRecall() ) {
|
||||
//log("@@@@ Msg5(%p)::mergeDone(): recalling readList()",this);
|
||||
if ( ! readList() ) return;
|
||||
}
|
||||
|
||||
// sanity check
|
||||
if ( m_calledCallback ) { g_process.shutdownAbort(true); }
|
||||
if ( m_calledCallback ) abort();
|
||||
|
||||
// we are no longer waiting for the list
|
||||
m_waitingForList = false;
|
||||
@ -1068,6 +1090,7 @@ void Msg5::mergeDone(job_exit_t /*exit_type*/) {
|
||||
// check lists in the thread
|
||||
void Msg5::repairLists() {
|
||||
verify_signature();
|
||||
if(m_rdbId==RDB_POSDB && !m_isSingleUnmergedListGet) abort();
|
||||
// assume none
|
||||
m_hadCorruption = false;
|
||||
// return if no need to
|
||||
@ -1139,7 +1162,9 @@ void Msg5::repairLists() {
|
||||
|
||||
void Msg5::mergeLists() {
|
||||
verify_signature();
|
||||
// log("@@@ Msg5(%p)::mergeLists",this);
|
||||
|
||||
if(m_rdbId==RDB_POSDB && !m_isSingleUnmergedListGet) abort();
|
||||
// . don't do any merge if this is true
|
||||
// . if our fetch of remote list fails, then we'll be called
|
||||
// again with this set to false
|
||||
@ -1220,6 +1245,7 @@ void Msg5::mergeLists() {
|
||||
bool Msg5::doneMerging ( ) {
|
||||
verify_signature();
|
||||
|
||||
if(m_rdbId==RDB_POSDB && !m_isSingleUnmergedListGet) abort();
|
||||
//m_waitingForMerge = false;
|
||||
|
||||
// get base, returns NULL and sets g_errno to ENOCOLLREC on error
|
||||
@ -1283,6 +1309,7 @@ bool Msg5::doneMerging ( ) {
|
||||
// . merge the modified lists again
|
||||
// . this is not in a thread
|
||||
// . it should not block
|
||||
abort();
|
||||
mergeLists();
|
||||
}
|
||||
}
|
||||
@ -1614,6 +1641,7 @@ bool Msg5::gotRemoteList ( ) {
|
||||
g_errno = 0;
|
||||
// . we have the lists ready to merge
|
||||
// . hadCorruption should be false at this point
|
||||
log("@@@@ msg5(%p)::badList merge",this);
|
||||
mergeLists();
|
||||
// process the result
|
||||
return doneMerging();
|
||||
|
@ -2477,6 +2477,7 @@ void RdbBase::verifyDiskPageCache ( ) {
|
||||
}
|
||||
|
||||
bool RdbBase::verifyFileSharding ( ) {
|
||||
return true;
|
||||
|
||||
if ( m_rdb->m_isCollectionLess ) return true;
|
||||
|
||||
|
Reference in New Issue
Block a user