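Block when deleting or resetting a collection while the rdb tree is saving to disk

Deleting or resetting a collection modifies the rdb tree, so it must not
run while the tree is being saved to disk. The operation now blocks and
keeps retrying every 100ms until the save has completed.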
This commit is contained in:
Matt Wells 2013-10-30 13:12:46 -07:00
parent b83dd59913
commit d0ddfb7d7d
7 changed files with 154 additions and 44 deletions

@ -521,6 +521,26 @@ bool Collectiondb::isAdmin ( HttpRequest *r , TcpSocket *s ) {
//return cr->hasPermission ( r , s );
}
// . sleep callback that Collectiondb::resetColl() registers when it finds a
//   tree is still saving. "state" is the WaitEntry handed to resetColl()
// . retries the reset and, once resetColl() returns true (did not block),
//   calls the waiter's callback with the waiter's state
void savingCheckWrapper1 ( int fd , void *state ) {
	WaitEntry *we = (WaitEntry *)state;
	// . unregister BEFORE retrying. resetColl() registers this wrapper
	//   again every time it blocks, so keeping the current registration
	//   around would add one more registration on every 100ms retry
	g_loop.unregisterSleepCallback ( state,savingCheckWrapper1 );
	// no wait entry means there is no collection name to retry with
	// and nobody to notify
	if ( ! we ) return;
	// if it blocked again i guess tree is still saving. it will have
	// re-registered us to try again later
	if ( ! g_collectiondb.resetColl ( we->m_coll , we ) ) return;
	// all done
	we->m_callback ( we->m_state );
}
// . sleep callback that Collectiondb::deleteRec() registers when it finds a
//   tree is still saving. "state" is the WaitEntry handed to deleteRec()
// . retries the delete and, once deleteRec() returns true (did not block),
//   calls the waiter's callback with the waiter's state
void savingCheckWrapper2 ( int fd , void *state ) {
	WaitEntry *we = (WaitEntry *)state;
	// . unregister BEFORE retrying. deleteRec() registers this wrapper
	//   again every time it blocks, so keeping the current registration
	//   around would add one more registration on every 100ms retry
	g_loop.unregisterSleepCallback ( state,savingCheckWrapper2 );
	// . deleteRecs() calls deleteRec() with a NULL WaitEntry, and that
	//   NULL is what gets registered as our state
	// . without a wait entry we have no collection name to retry with
	//   and no callback to call, so bail instead of dereferencing NULL
	if ( ! we ) return;
	// if it blocked again i guess tree is still saving. it will have
	// re-registered us to try again later
	if ( ! g_collectiondb.deleteRec ( we->m_coll , we ) ) return;
	// all done
	we->m_callback ( we->m_state );
}
// delete all records checked in the list
bool Collectiondb::deleteRecs ( HttpRequest *r ) {
for ( long i = 0 ; i < r->getNumFields() ; i++ ) {
@ -529,16 +549,17 @@ bool Collectiondb::deleteRecs ( HttpRequest *r ) {
char *coll = f + 3;
//if ( ! is_digit ( f[3] ) ) continue;
//long h = atol ( f + 3 );
deleteRec ( coll );
deleteRec ( coll , NULL );
}
return true;
}
// . delete a collection
// . this uses blocking unlinks, may make non-blocking later
bool Collectiondb::deleteRec ( char *coll , bool deleteTurkdb ) {
// . returns false if blocked, true otherwise
bool Collectiondb::deleteRec ( char *coll , WaitEntry *we ) {
// force on for now
deleteTurkdb = true;
//deleteTurkdb = true;
// no spiders can be out. they may be referencing the CollectionRec
// in XmlDoc.cpp... quite likely.
//if ( g_conf.m_spideringEnabled ||
@ -550,23 +571,44 @@ bool Collectiondb::deleteRec ( char *coll , bool deleteTurkdb ) {
// do not allow this if in repair mode
if ( g_repairMode > 0 ) {
log("admin: Can not delete collection while in repair mode.");
return false;
g_errno = EBADENGINEER;
return true;
}
// ensure it's not NULL
if ( ! coll ) {
log(LOG_LOGIC,"admin: Collection name to delete is NULL.");
return false;
g_errno = ENOTFOUND;
return true;
}
// find the rec for this collection
collnum_t collnum = getCollnum ( coll );
// bitch if not found
if ( collnum < 0 ) {
g_errno = ENOTFOUND;
return log(LOG_LOGIC,"admin: Collection \"%s\" not found, "
"delete failed.",coll);
log(LOG_LOGIC,"admin: Collection \"%s\" not found, "
"delete failed.",coll);
return true;
}
CollectionRec *cr = m_recs [ collnum ];
if ( ! cr ) return log("admin: Collection id problem. Delete failed.");
if ( ! cr ) {
log("admin: Collection id problem. Delete failed.");
g_errno = ENOTFOUND;
return true;
}
if ( g_process.isAnyTreeSaving() ) {
// note it
log("admin: tree is saving. waiting2.");
// try again in 100ms
if ( ! g_loop.registerSleepCallback ( 100 ,
we ,
savingCheckWrapper2 ,
0 ) ) // niceness
return true;
// all done
return false;
}
// spiders off
//if ( cr->m_spiderColl &&
// cr->m_spiderColl->getTotalOutstandingSpiders() > 0 ) {
@ -650,15 +692,16 @@ bool Collectiondb::deleteRec ( char *coll , bool deleteTurkdb ) {
return true;
}
#include "PageTurk.h"
//#include "PageTurk.h"
// . reset a collection
// . returns false if failed
bool Collectiondb::resetColl ( char *coll , bool resetTurkdb ) {
// . returns false if blocked and will call callback
bool Collectiondb::resetColl ( char *coll , WaitEntry *we ) {
// ensure it's not NULL
if ( ! coll ) {
log(LOG_LOGIC,"admin: Collection name to delete is NULL.");
return false;
g_errno = ENOCOLLREC;
return true;
}
// now must be "test" only for now
//if ( strcmp(coll,"test") ) { char *xx=NULL;*xx=0; }
@ -673,11 +716,29 @@ bool Collectiondb::resetColl ( char *coll , bool resetTurkdb ) {
// do not allow this if in repair mode
if ( g_repairMode > 0 ) {
log("admin: Can not delete collection while in repair mode.");
return false;
g_errno = EBADENGINEER;
return true;
}
log("admin: resetting coll \"%s\"",coll);
// CAUTION: tree might be in the middle of saving
// we deal with this in Process.cpp now
if ( g_process.isAnyTreeSaving() ) {
// note it
log("admin: tree is saving. waiting1.");
// try again in 100ms
if ( ! g_loop.registerSleepCallback ( 100 ,
we ,
savingCheckWrapper1 ,
0 ) ) // niceness
return true;
// all done
return false;
}
// get the CollectionRec for "test"
CollectionRec *cr = getRec ( coll ); // "test" );
@ -692,8 +753,10 @@ bool Collectiondb::resetColl ( char *coll , bool resetTurkdb ) {
long have = m_recPtrBuf.getLength();
need -= have;
// true here means to clear the new space to zeroes
if ( ! m_recPtrBuf.reserve ( need ,NULL, true ) )
return log("admin: error growing rec ptr buf2.");
if ( ! m_recPtrBuf.reserve ( need ,NULL, true ) ) {
log("admin: error growing rec ptr buf2.");
return true;
}
// re-ref it in case it is different
m_recs = (CollectionRec **)m_recPtrBuf.getBufStart();
// ensure last is NULL
@ -799,12 +862,11 @@ bool Collectiondb::resetColl ( char *coll , bool resetTurkdb ) {
// right now we #define collnum_t short
if ( m_numRecs > 0x7fff ) { char *xx=NULL;*xx=0; }
// CAUTION: tree might be in the middle of saving
// we deal with this in Process.cpp now
// . unlink all the *.dat and *.map files for this coll in its subdir
// . remove all recs from this collnum from m_tree/m_buckets
// . updates RdbBase::m_collnum
// . so for the tree it just needs to mark the old collnum recs
// with a collnum -1 in case it is saving...
g_posdb.getRdb()->resetColl ( oldCollnum , newCollnum );
g_titledb.getRdb()->resetColl ( oldCollnum , newCollnum );
g_tagdb.getRdb()->resetColl ( oldCollnum , newCollnum );

@ -13,6 +13,13 @@
#include "SafeBuf.h"
// . describes who is waiting on a collection operation that had to block
// . holds the function to call when the operation is done, the argument
//   to pass to it, and the collection name the operation applies to
class WaitEntry {
 public:
	// signature of the completion callback
	typedef void (* callback_t) ( void *state );
	// called with m_state
	callback_t  m_callback;
	// opaque pointer passed to m_callback
	void       *m_state;
	// collection name. only the pointer is stored here, no copy is made
	char       *m_coll;
};
class Collectiondb {
public:
@ -80,11 +87,14 @@ class Collectiondb {
bool addRec ( char *coll , char *cc , long cclen , bool isNew ,
collnum_t collnum , bool isDump , // = false );
bool saveRec ); // = true
bool deleteRec ( char *coll , bool deleteTurkdb = true );
// returns false if blocked, true otherwise.
bool deleteRec ( char *coll , WaitEntry *we );
//bool updateRec ( CollectionRec *newrec );
bool deleteRecs ( class HttpRequest *r ) ;
bool resetColl ( char *coll , bool resetTurkdb = true ) ;
// returns false if blocked, true otherwise.
bool resetColl ( char *coll , WaitEntry *we );
// . keep up to 128 of them, these reference into m_list
// . COllectionRec now includes m_needsSave and m_lastUpdateTime

@ -36,7 +36,7 @@ OBJS = Tfndb.o UdpSlot.o \
Stats.o BigFile.o Msg17.o \
Speller.o DiskPageCache.o \
PingServer.o StopWords.o TopTree.o \
Parms.o Pages.o Msg28.o Msg30.o \
Parms.o Pages.o Msg28.o \
Unicode.o iana_charset.o Iso8859.o \
SearchInput.o \
Categories.o Msg2a.o PageCatdb.o PageDirectory.o \

@ -743,6 +743,8 @@ public:
HttpRequest m_hr;
Msg7 m_msg7;
WaitEntry m_waitEntry;
bool m_needsMime;
char m_rdbId;
bool m_downloadJSON;
@ -2049,6 +2051,14 @@ char *getInputString ( char *string , HttpRequest *hr , Json *JS ) {
}
*/
// . completion callback for a collection operation that had to block
// . "state" is the StateCD that was allocated for the request
// . frees the state and sends the simple "OK" reply on its socket
void collOpDoneWrapper ( void *state ) {
	StateCD *st = (StateCD *)state;
	// grab the socket now, we cannot read it once the state is freed
	TcpSocket *socket = st->m_socket;
	// . call mdelete() while the pointer is still valid and only then
	//   free it, so the pointer is never used after the delete
	// . presumably mdelete() is the accounting counterpart of the
	//   mnew() call made when the state was allocated -- TODO confirm
	mdelete ( st , sizeof(StateCD) , "stcd" );
	delete st;
	g_httpServer.sendDynamicPage (socket,"OK",2);
}
// . when we receive the request from john we call broadcastRequest() from
// Pages.cpp. then msg28 sends this replay with a &cast=0 appended to it
// to every host in the network. then when msg28 gets back replies from all
@ -2258,6 +2268,21 @@ bool sendPageCrawlbot ( TcpSocket *socket , HttpRequest *hr ) {
return sendErrorReply2 (socket,fmt,msg);
}
// make a new state
StateCD *st;
try { st = new (StateCD); }
catch ( ... ) {
return sendErrorReply2 ( socket , fmt , mstrerror(g_errno));
}
mnew ( st , sizeof(StateCD), "statecd");
// copy crap
st->m_hr.copy ( hr );
st->m_socket = socket;
st->m_fmt = fmt;
if ( cr ) st->m_collnum = cr->m_collnum;
else st->m_collnum = -1;
// . if this is a cast=0 request it is received by all hosts in the
// network
// . this code is the only code run by EVERY host in the network
@ -2269,6 +2294,11 @@ bool sendPageCrawlbot ( TcpSocket *socket , HttpRequest *hr ) {
// hopefully it will still be set
// . but we should take care of add/del/reset coll here.
if ( cast == 0 ) {
// add a new collection by default
if ( ! cr && name && name[0] )
cr = addNewDiffbotColl ( collName , token , name );
// also support the good 'ole html form interface
if ( cr ) setSpiderParmsFromHtmlRequest ( socket , hr , cr );
// . we can't sync these operations on a dead host when it
// comes back up yet. we can only sync parms, not collection
// adds/deletes/resets
@ -2289,25 +2319,39 @@ bool sendPageCrawlbot ( TcpSocket *socket , HttpRequest *hr ) {
// return sendErrorReply2(socket,fmt,msg);
// }
//}
// set this up
WaitEntry *we = &st->m_waitEntry;
we->m_state = st;
we->m_callback = collOpDoneWrapper;
we->m_coll = collName;
if ( delColl ) {
// delete collection name
g_collectiondb.deleteRec ( collName , true );
// this can block if tree is saving, it has to wait
// for tree save to complete before removing old
// collnum recs from tree
if ( ! g_collectiondb.deleteRec ( collName , we ) )
return false;
// all done
return g_httpServer.sendDynamicPage (socket,"OK",2);
}
if ( resetColl ) {
//cr = g_collectiondb.getRec ( resetColl );
g_collectiondb.resetColl ( collName );//resetColl );
// this can block if tree is saving, it has to wait
// for tree save to complete before removing old
// collnum recs from tree
if ( ! g_collectiondb.resetColl ( collName , we ) )
return false;
// it is a NEW ptr now!
cr = g_collectiondb.getRec( collName );
// if reset from crawlbot api page then enable spiders
// to avoid user confusion
if ( cr ) cr->m_spideringEnabled = 1;
// all done
return g_httpServer.sendDynamicPage (socket,"OK",2);
}
// add a new collection by default
if ( ! cr && name && name[0] )
cr = addNewDiffbotColl ( collName , token , name );
// problem?
if ( ! cr ) {
// send back error
@ -2319,8 +2363,6 @@ bool sendPageCrawlbot ( TcpSocket *socket , HttpRequest *hr ) {
}
// this will set the the collection parms from json
//setSpiderParmsFromJSONPost ( socket , hr , cr , &JS );
// also support the good 'ole html form interface
setSpiderParmsFromHtmlRequest ( socket , hr , cr );
// this is a cast, so just return simple response
return g_httpServer.sendDynamicPage (socket,"OK",2);
}
@ -2348,20 +2390,6 @@ bool sendPageCrawlbot ( TcpSocket *socket , HttpRequest *hr ) {
//char *spots = hr->getString("spots",NULL,NULL);
//char *seeds = hr->getString("seeds",NULL,NULL);
// make a new state
StateCD *st;
try { st = new (StateCD); }
catch ( ... ) {
return sendErrorReply2 ( socket , fmt , mstrerror(g_errno));
}
mnew ( st , sizeof(StateCD), "statecd");
// copy crap
st->m_hr.copy ( hr );
st->m_socket = socket;
st->m_fmt = fmt;
st->m_collnum = cr->m_collnum;
if ( seeds )
log("crawlbot: adding seeds=\"%s\"",seeds);

@ -495,6 +495,15 @@ bool Process::init ( ) {
return true;
}
bool Process::isAnyTreeSaving ( ) {
for ( long i = 0 ; i < m_numRdbs ; i++ ) {
Rdb *rdb = m_rdbs[i];
if ( rdb->m_isCollectionLess ) continue;
if ( rdb->isSavingTree() ) return true;
}
return false;
}
void powerMonitorWrapper ( int fd , void *state ) {
if ( g_isYippy ) return;

@ -32,6 +32,7 @@ class Process {
Process ( ) ;
bool init ( ) ;
bool isAnyTreeSaving ( ) ;
bool save2 ( ) ;
bool shutdown2 ( ) ;
void disableTreeWrites ( ) ;

@ -80,7 +80,7 @@
#include "Msg35.h"
//#include "Msg24.h"
#include "Msg28.h"
#include "Msg30.h"
//#include "Msg30.h"
//#include "MsgB.h"
#include "Msg3e.h"
//#include "Msg50.h"
@ -4855,7 +4855,7 @@ bool registerMsgHandlers1(){
//Msg23 msg23; if ( ! msg23.registerHandler () ) return false;
Msg2a msg2a; if ( ! msg2a.registerHandler () ) return false;
Msg36 msg36; if ( ! msg36.registerHandler () ) return false;
Msg30 msg30; if ( ! msg30.registerHandler () ) return false;
//Msg30 msg30; if ( ! msg30.registerHandler () ) return false;
MsgC msgC ; if ( ! msgC.registerHandler () ) return false;
if ( ! Msg22::registerHandler() ) return false;