Merge branch 'master' into nomerge2

This commit is contained in:
Ai Lin Chia
2017-04-03 12:50:17 +02:00
14 changed files with 107 additions and 460 deletions

@ -134,7 +134,6 @@ Conf::Conf ( ) {
}
m_sendEmailAlerts = false;
m_delayNonCriticalEmailAlerts = false;
m_sendEmailAlertsToSysadmin = false;
m_sendEmailAlertsToEmail1 = false;
memset(m_email1MX, 0, sizeof(m_email1MX));
memset(m_email1Addr, 0, sizeof(m_email1Addr));

1
Conf.h

@ -233,7 +233,6 @@ class Conf {
bool m_sendEmailAlerts;
//should we delay when only 1 host goes down out of twins till 9 30 am?
bool m_delayNonCriticalEmailAlerts;
bool m_sendEmailAlertsToSysadmin;
bool m_sendEmailAlertsToEmail1;
char m_email1MX[MAX_MX_LEN];

@ -681,8 +681,6 @@ createFile:
m_hosts[i].m_lastPing = 0LL;
// and don't send emails on him until we got a good ping
m_hosts[i].m_emailCode = -2;
// so UdpServer.cpp knows if we are in g_hostdb or g_hostdb2
m_hosts[i].m_hostdb = this;
// reset these
m_hosts[i].m_pingInfo.m_flags = 0;
m_hosts[i].m_pingInfo.m_cpuUsage = 0.0;
@ -1352,7 +1350,6 @@ bool Hostdb::replaceHost ( int32_t origHostId, int32_t spareHostId ) {
oldHost->m_stripe = spareHost->m_stripe;
oldHost->m_isProxy = spareHost->m_isProxy;
oldHost->m_type = HT_SPARE;
oldHost->m_hostdb = spareHost->m_hostdb;
oldHost->m_inProgress1 = spareHost->m_inProgress1;
oldHost->m_inProgress2 = spareHost->m_inProgress2;

@ -188,12 +188,6 @@ public:
int32_t m_splitsDone;
int64_t m_splitTimes;
// . the hostdb to which this host belongs!
// . getHost(ip,port) will return a Host ptr from either
// g_hostdb or g_hostdb2, so UdpServer.cpp needs to know which it
// is from when making the UdpSlot key.
class Hostdb *m_hostdb;
// . used by Parms.cpp for broadcasting parm change requests
// . each parm change request has an id
// . this let's us know which id is in progress and what the last

@ -2378,7 +2378,6 @@ void Parms::setParm(char *THIS, Parm *m, int32_t array_index, const char *s, boo
sprintf(tmp,"%s: parm \"%s\" changed value",iptoa(ip0),m->m_title);
g_pingServer.sendEmail ( NULL , // Host ptr
tmp , // msg
true , // sendToAdmin
false , // oom?
true , // parm change?
true );// force it? even if disabled?
@ -5104,15 +5103,6 @@ void Parms::init ( ) {
m->m_flags = PF_HIDDEN ;
m++;
m->m_title = "send email alerts to sysadmin";
m->m_desc = "Sends to sysadmin@example.com.";
m->m_cgi = "seatsa";
simple_m_set(Conf,m_sendEmailAlertsToSysadmin);
m->m_def = "0";
m->m_flags = PF_HIDDEN | PF_NOSAVE;
m->m_page = PAGE_MASTER;
m++;
m->m_title = "ping spacer";
m->m_desc = "Wait this many milliseconds before pinging the next "
"host. Each host pings all other hosts in the network.";

@ -5,13 +5,10 @@
#include "UdpSlot.h"
#include "Conf.h"
#include "HttpServer.h"
#include "HttpMime.h"
#include "Proxy.h"
#include "Repair.h"
#include "Process.h"
#include "DailyMerge.h"
#include "Spider.h"
#include "SpiderColl.h"
#include "SpiderLoop.h"
#include "Collectiondb.h"
#include "Rebalance.h"
@ -291,7 +288,6 @@ void PingServer::pingHost ( Host *h , uint32_t ip , uint16_t port ) {
// the collection number we are daily merging (currently 2 bytes)
collnum_t cn = -1;
if ( g_dailyMerge.m_cr ) cn = g_dailyMerge.m_cr->m_collnum;
//*(collnum_t *)p = cn ; p += sizeof(collnum_t);
newPingInfo.m_dailyMergeCollnum = cn;
newPingInfo.m_hostId = me->m_hostId;
@ -349,7 +345,7 @@ void PingServer::pingHost ( Host *h , uint32_t ip , uint16_t port ) {
// only needs one notification.
static int32_t s_lastSentHostId = -1;
void gotReplyWrapperP ( void *state , UdpSlot *slot ) {
static void gotReplyWrapperP ( void *state , UdpSlot *slot ) {
// state is the host
Host *h = (Host *)state;
if( !h ) {
@ -460,7 +456,7 @@ void gotReplyWrapperP ( void *state , UdpSlot *slot ) {
g_errno = 0;
}
void gotReplyWrapperP3 ( void *state , UdpSlot *slot ) {
static void gotReplyWrapperP3 ( void *state , UdpSlot *slot ) {
// do not free this!
slot->m_sendBufAlloc = NULL;
// un-count it
@ -473,9 +469,8 @@ void gotReplyWrapperP3 ( void *state , UdpSlot *slot ) {
static int64_t s_deltaTime = 0;
// this may be called from a signal handler now...
void handleRequest11(UdpSlot *slot , int32_t /*niceness*/) {
static void handleRequest11(UdpSlot *slot , int32_t /*niceness*/) {
// get request
//char *request = slot->m_readBuf;
int32_t requestSize = slot->m_readBufSize;
char *request = slot->m_readBuf;
// get the ip/port of requester
@ -704,8 +699,6 @@ void handleRequest11(UdpSlot *slot , int32_t /*niceness*/) {
g_pingServer.m_bestPingDate = nowLocal;
// and the ping
g_pingServer.m_bestPing = g_pingServer.m_currentPing;
// clear this
//s_deltaTime = 0;
}
}
// all pings now deliver a timestamp of the sending host
@ -827,11 +820,9 @@ static void sleepWrapper ( int fd , void *state ) {
// . sets g_errno on error
bool PingServer::sendEmail ( Host *h ,
char *errmsg ,
bool sendToAdmin ,
bool oom ,
bool parmChanged ,
bool forceIt ,
int32_t mxIP ) { // 0 means none
bool forceIt) {
// clear this
g_errno = 0;
// not if we have outstanding requests
@ -942,17 +933,6 @@ bool PingServer::sendEmail ( Host *h ,
m_numRequests2 = 0;
m_numReplies2 = 0;
// sysadmin
if ( g_conf.m_sendEmailAlertsToSysadmin && sendToAdmin ) {
m_numRequests2++;
if ( ! sendAdminEmail ( h,
"sysadmin@example.com",
"sysadmin@example.com",
errmsg ,
"mail.example.com" ) )
status = false;
}
// set the max for sanity checking in gotdoc
m_maxRequests2 = m_numRequests2;
@ -965,29 +945,24 @@ bool PingServer::sendEmail ( Host *h ,
// between 10:00pm and 9:30am unless all the other twins of the
// dead host are also dead. Instead, wait till after 9:30 am if
// the host is still dead.
if ( delay && h && sendToAdmin ) {
if ( delay && h ) {
// always delay no matter the time now
bool delay = true;
if ( delay ) {
//check if the hosts twins are dead too
int32_t numTwins = 0;
Host *hosts = g_hostdb.getShard( h->m_shardNum,
&numTwins );
int32_t i = 0;
while ( i < numTwins ){
if ( !g_hostdb.isDead ( hosts[i].m_hostId ) )
break;
i++;
}
//if no twin is alive, emergency ! send email !
//if even one twin is alive, don't send now
if ( i == numTwins ) goto skipSleep;
return true;
//check if the hosts twins are dead too
int32_t numTwins = 0;
Host *hosts = g_hostdb.getShard( h->m_shardNum,
&numTwins );
int32_t i = 0;
while ( i < numTwins ){
if ( !g_hostdb.isDead ( hosts[i].m_hostId ) )
break;
i++;
}
//if no twin is alive, emergency ! send email !
//if even one twin is alive, don't send now
if ( i == numTwins ) goto skipSleep;
return true;
}
skipSleep:
@ -1003,19 +978,11 @@ bool PingServer::sendEmail ( Host *h ,
if ( parmChanged && ! g_conf.m_sendParmChangeAlertsToEmail3) e3=false;
if ( parmChanged && ! g_conf.m_sendParmChangeAlertsToEmail4) e4=false;
// point to provided IP as string
char *mxIPStr = NULL;
char ipBuf[64];
if ( mxIP ) {
sprintf(ipBuf,"%s",iptoa(mxIP));
mxIPStr = ipBuf;
}
if ( e1 ) {
m_numRequests2++;
m_maxRequests2++;
char *mxHost = g_conf.m_email1MX;
if ( mxIP ) mxHost = mxIPStr;
if ( ! sendAdminEmail ( h,
g_conf.m_email1From,
g_conf.m_email1Addr,
@ -1027,7 +994,6 @@ bool PingServer::sendEmail ( Host *h ,
m_numRequests2++;
m_maxRequests2++;
char *mxHost = g_conf.m_email2MX;
if ( mxIP ) mxHost = mxIPStr;
if ( ! sendAdminEmail ( h,
g_conf.m_email2From,
g_conf.m_email2Addr,
@ -1039,7 +1005,6 @@ bool PingServer::sendEmail ( Host *h ,
m_numRequests2++;
m_maxRequests2++;
char *mxHost = g_conf.m_email3MX;
if ( mxIP ) mxHost = mxIPStr;
if ( ! sendAdminEmail ( h,
g_conf.m_email3From,
g_conf.m_email3Addr,
@ -1051,7 +1016,6 @@ bool PingServer::sendEmail ( Host *h ,
m_numRequests2++;
m_maxRequests2++;
char *mxHost = g_conf.m_email4MX;
if ( mxIP ) mxHost = mxIPStr;
if ( ! sendAdminEmail ( h,
g_conf.m_email4From,
g_conf.m_email4Addr,
@ -1067,10 +1031,9 @@ bool PingServer::sendEmail ( Host *h ,
}
#include "HttpServer.h"
static void gotDocWrapper ( void *state , TcpSocket *ts ) ;
bool sendAdminEmail ( Host *h,
static bool sendAdminEmail ( Host *h,
const char *fromAddress,
const char *toAddress,
char *body ,
@ -1102,16 +1065,7 @@ bool sendAdminEmail ( Host *h,
// send the message
TcpServer *ts = g_httpServer.getTcp();
log ( LOG_WARN, "PingServer: Sending email to sysadmin:\n %s", buf );
//if ( !ts->sendMsg ( g_conf.m_smtpHost,
// strlen(g_conf.m_smtpHost),
// g_conf.m_smtpPort,
const char *ip = emailServIp; // gf39, mail server ip
// use backup if there
//char ipString[64];
//if ( g_emailServIPBackup ) {
// iptoa(ipString,g_emailMX1IPBackup);
// ip = ipString;
//}
const char *ip = emailServIp;
if ( !ts->sendMsg( ip, strlen( ip ), 25, buf, PAGER_BUF_SIZE, buffLen, buffLen, h, gotDocWrapper,
60 * 1000, 100 * 1024, 100 * 1024 ) ) {
return false;
@ -1138,9 +1092,6 @@ void gotDocWrapper ( void *state , TcpSocket *s ) {
}
Host *h = (Host *)state;
// if ( ! h ) { log("net: h is NULL in pingserver."); return; }
// don't let tcp server free the sendbuf, that's static
//s->m_sendBuf = NULL;
if ( g_errno ) {
if(h) {
log("net: Had error sending email to mobile for dead "
@ -1213,8 +1164,6 @@ bool PingServer::broadcastShutdownNotes ( bool sendEmailAlert ,
for ( int32_t i = 0 ; i < np ; i++ ) {
// get host
Host *h = g_hostdb.getProxy(i);
// skip ourselves
//if ( h->m_hostId == g_hostdb.m_hostId ) continue;
// count as sent
m_numRequests++;
// send it right now
@ -1237,10 +1186,6 @@ bool PingServer::broadcastShutdownNotes ( bool sendEmailAlert ,
if ( h->m_hostId == g_hostdb.m_hostId ) continue;
// count as sent
m_numRequests++;
// request will be freed by UdpServer
//char *r = (char *) mmalloc ( 4 , "PingServer" );
//if ( ! r ) return true;
//gbmemcpy ( r , (char *)(&h->m_hostId) , 4 );
// send it right now
if (g_udpServer.sendRequest(s_buf, 5, msg_type_11, h->m_ip, h->m_port, h->m_hostId, NULL, NULL, gotReplyWrapperP2, 3000, 0)) {
continue;
@ -1256,7 +1201,7 @@ bool PingServer::broadcastShutdownNotes ( bool sendEmailAlert ,
return false;
}
void gotReplyWrapperP2 ( void *state , UdpSlot *slot ) {
static void gotReplyWrapperP2 ( void *state , UdpSlot *slot ) {
// count it
g_pingServer.m_numReplies++;
// don't let udp server free our send buf, we own it
@ -1284,7 +1229,7 @@ void gotReplyWrapperP2 ( void *state , UdpSlot *slot ) {
// if its status changes from dead to alive or vice versa, we have to
// update g_hostdb.m_numHostsAlive. Dns.cpp and Msg17 will use this count
void updatePingTime ( Host *h , int32_t *pingPtr , int32_t tripTime ) {
static void updatePingTime ( Host *h , int32_t *pingPtr , int32_t tripTime ) {
// sanity check
if ( pingPtr != &h->m_ping && pingPtr != &h->m_pingShotgun ) {
@ -1331,11 +1276,9 @@ void updatePingTime ( Host *h , int32_t *pingPtr , int32_t tripTime ) {
void PingServer::sendEmailMsg ( int32_t *lastTimeStamp , const char *msg ) {
// leave if we already sent and alert within 5 mins
//static int32_t s_lasttime = 0;
int32_t now = getTimeGlobal();
if ( now - *lastTimeStamp < 5*60 ) return;
// prepare msg to send
//Host *h0 = g_hostdb.getHost ( 0 );
char msgbuf[1024];
snprintf(msgbuf, 1024,
"cluster %s : proxy: %s",
@ -1344,7 +1287,6 @@ void PingServer::sendEmailMsg ( int32_t *lastTimeStamp , const char *msg ) {
// send it, force it, so even if email alerts off, it sends it
g_pingServer.sendEmail ( NULL , // Host *h
msgbuf , // char *errmsg = NULL ,
true , // bool sendToAdmin = true ,
false , // bool oom = false ,
false , // bool parmChanged = false ,
true );// bool forceIt = false );

@ -38,11 +38,9 @@ class PingServer {
//bool sendEmail ( Host *h );
bool sendEmail ( Host *h ,
char *errmsg = NULL ,
bool sendToAdmin = true ,
bool oom = false ,
bool parmChanged = false ,
bool forceIt = false ,
int32_t mxIP = 0 );
bool forceIt = false);
int32_t m_i;

@ -608,11 +608,9 @@ bool Process::save2 ( ) {
// main process, not in a thread
disableTreeWrites( false );
bool useThreads = true;
// . tell all rdbs to save trees
// . will return true if no rdb tree needs a save
if (!saveRdbTrees(useThreads, false)) {
if (!saveRdbTrees(false)) {
return false;
}
@ -720,7 +718,7 @@ bool Process::shutdown2() {
// . tell all rdbs to save trees
// . will return true if no rdb tree needs a save
if (!saveRdbTrees(false, true)) {
if (!saveRdbTrees(true)) {
if (!m_urgent) {
return false;
}
@ -940,13 +938,14 @@ bool Process::isRdbMerging ( ) {
// . returns false if blocked, true otherwise
// . calls callback when done saving
bool Process::saveRdbTrees ( bool useThread , bool shuttingDown ) {
bool Process::saveRdbTrees(bool shuttingDown) {
// never if in read only mode
if ( g_conf.m_readOnlyMode ) return true;
// no thread if shutting down
if ( shuttingDown ) {
useThread = false;
bool useThread = (!shuttingDown);
if (shuttingDown) {
log("gb: trying to shutdown");
}
@ -973,8 +972,7 @@ bool Process::saveRdbTrees ( bool useThread , bool shuttingDown ) {
log( "gb: calling save tree for rdbid %i", ( int ) rdb->getRdbId() );
}
rdb->saveTree(useThread);
rdb->saveTreeIndex(useThread);
rdb->saveTree(useThread, NULL, NULL);
}
// . save waitingtrees for each collection, blocks.
@ -1001,8 +999,6 @@ bool Process::saveRdbTrees ( bool useThread , bool shuttingDown ) {
Rdb *rdb = m_rdbs[i];
//if ( rdb->needsSave ( ) ) return false;
// we disable the tree while saving so we can't really add recs
// to one rdb tree while saving, but for crawlbot
// we might have added or deleted collections.

@ -44,7 +44,7 @@ class Process {
void enableTreeWrites ( bool shuttingDown ) ;
bool isRdbDumping ( ) ;
bool isRdbMerging ( ) ;
bool saveRdbTrees(bool useThread, bool shuttingDown);
bool saveRdbTrees(bool shuttingDown);
bool saveRdbIndexes();
bool saveRdbMaps();
bool saveBlockingFiles1 ( ) ;

289
Rdb.cpp

@ -44,12 +44,9 @@ Rdb::Rdb ( ) {
m_dbnameLen = 0;
m_useIndexFile = false;
m_useTree = false;
m_closeState = NULL;
m_closeCallback = NULL;
m_minToMerge = 0;
m_dumpErrno = 0;
m_useHalfKeys = false;
m_urgent = false;
m_niceness = false;
m_dumpCollnum = 0;
m_inDumpLoop = false;
@ -69,12 +66,6 @@ void Rdb::reset ( ) {
m_tree.reset();
m_buckets.reset();
m_mem.reset();
m_isClosing = false;
m_isClosed = false;
m_isSaving = false;
m_isReallyClosing = false;
m_registered = false;
m_lastTime = 0LL;
}
Rdb::~Rdb ( ) {
@ -598,210 +589,47 @@ bool Rdb::delColl(const char *coll) {
return true;
}
// . returns false if blocked true otherwise
// . sets g_errno on error
// . CAUTION: only set urgent to true if we got a SIGSEGV or SIGPWR...
bool Rdb::close ( void *state , void (* callback)(void *state ), bool urgent , bool isReallyClosing ) {
// unregister in case already registered
if ( m_registered )
g_loop.unregisterSleepCallback (this,closeSleepWrapper);
// reset g_errno
g_errno = 0;
// return true if no RdbBases in m_bases[] to close
if ( getNumBases() <= 0 ) return true;
// return true if already closed
if ( m_isClosed ) return true;
// don't call more than once
if ( m_isSaving ) return true;
// set the m_isClosing flag in case we're waiting for a dump.
// then, when the dump is done, it will come here again
m_closeState = state;
m_closeCallback = callback;
m_urgent = urgent;
m_isReallyClosing = isReallyClosing;
if ( m_isReallyClosing ) m_isClosing = true;
// . don't call more than once
// . really only for when isReallyClosing is false... just a quick save
m_isSaving = true;
// suspend any merge permanently (not just for this rdb), we're exiting
if ( m_isReallyClosing ) {
g_merge.haltMerge();
}
// . allow dumps to complete unless we're urgent
// . if we're urgent, we'll end up with a half dumped file, which
// is ok now, since it should get its RdbMap auto-generated for it
// when we come back up again
if ( ! m_urgent && m_inDumpLoop ) { // m_dump.isDumping() ) {
m_isSaving = false;
const char *tt = "save";
if ( m_isReallyClosing ) tt = "close";
log(LOG_INFO, "db: Cannot %s %s until dump finishes.", tt, m_dbname);
return false;
}
// if a write thread is outstanding, and we exit now, we can end up
// freeing the buffer it is writing and it will core... and things
// won't be in sync with the map when it is saved below...
if ( m_isReallyClosing &&
// if we cored, we are urgent and need to make sure we save even
// if we are merging this rdb...
! m_urgent &&
g_merge.getRdbId() == m_rdbId &&
g_merge.isMerging() ) {
// do not spam this message
int64_t now = gettimeofdayInMilliseconds();
if ( now - m_lastTime >= 500 ) {
log(LOG_INFO,"db: Waiting for merge to finish last "
"write for %s.",m_dbname);
m_lastTime = now;
}
g_loop.registerSleepCallback (500,this,closeSleepWrapper);
m_registered = true;
// allow to be called again
m_isSaving = false;
return false;
}
// if we were merging to a file and are being closed urgently
// save the map! Also save the maps of the files we were merging
// in case the got their heads chopped (RdbMap::chopHead()) which
// we do to save disk space while merging.
// try to save the cache, may not save
if (m_isReallyClosing) {
// now loop over bases
for (int32_t i = 0; i < g_collectiondb.getNumRecs(); i++) {
// shut it down
RdbBase *base = getBase(i);
if (base) {
base->closeMaps(m_urgent);
base->closeIndexes(m_urgent);
}
}
}
// save it using a thread?
bool useThread = !(m_urgent || m_isReallyClosing);
// . returns false if blocked, true otherwise
// . sets g_errno on error
if (m_useTree) {
if (!m_tree.fastSave(getDir(), m_dbname, useThread, this, doneSavingWrapper)) {
return false;
}
} else {
if (!m_buckets.fastSave(getDir(), useThread, this, doneSavingWrapper)) {
return false;
}
}
// save index for tree as well
for (int32_t i = 0; i < g_collectiondb.getNumRecs(); i++) {
RdbBase *base = getBase(i);
if (base) {
base->saveTreeIndex();
}
}
// we saved it w/o blocking OR we had an g_errno
doneSaving();
return true;
}
void Rdb::closeSleepWrapper ( int fd , void *state ) {
Rdb *THIS = (Rdb *)state;
// sanity check
if ( ! THIS->m_isClosing ) { g_process.shutdownAbort(true); }
// continue closing, this returns false if blocked
if (!THIS->close(THIS->m_closeState, THIS->m_closeCallback, false, true)) {
return;
}
// otherwise, we call the callback
THIS->m_closeCallback ( THIS->m_closeState );
}
void Rdb::doneSavingWrapper ( void *state ) {
Rdb *THIS = (Rdb *)state;
THIS->doneSaving();
// . call the callback if any
// . this let's PageMaster.cpp know when we're closed
if (THIS->m_closeCallback) THIS->m_closeCallback(THIS->m_closeState);
}
void Rdb::doneSaving ( ) {
// bail if g_errno was set
if ( g_errno ) {
log(LOG_WARN, "db: Had error saving %s-saved.dat: %s.", m_dbname,mstrerror(g_errno));
g_errno = 0;
m_isSaving = false;
return;
}
// sanity
if ( m_dbname[0]=='\0' ) {
g_process.shutdownAbort(true);
}
// display any error, if any, otherwise prints "Success"
logf(LOG_INFO,"db: Successfully saved %s-saved.dat.", m_dbname);
// mdw ---> file doesn't save right, seems like it keeps the same length as the old file...
// . we're now closed
// . keep m_isClosing set to true so no one can add data
if ( m_isReallyClosing ) m_isClosed = true;
// call it again now
m_isSaving = false;
}
bool Rdb::isSavingTree() const {
if ( m_useTree ) return m_tree.isSaving();
return m_buckets.isSaving();
}
bool Rdb::saveTree ( bool useThread ) {
const char *dbn = m_dbname;
if ( ! dbn[0] ) {
dbn = "unknown";
bool Rdb::saveTree(bool useThread, void *state, void (*callback)(void *state)) {
// sanity check
if (isWritable()) {
// we need to make sure it's not writable before calling saveTree
gbshutdownLogicError();
}
bool result;
// . if RdbTree::m_needsSave is false this will return true
// . if RdbTree::m_isSaving is true this will return false
// . returns false if blocked, true otherwise
// . sets g_errno on error
if (m_useTree) {
if (m_tree.needsSave()) {
log( LOG_DEBUG, "db: saving tree %s", dbn );
}
return m_tree.fastSave(getDir(), m_dbname, useThread, NULL, NULL);
result = m_tree.fastSave(getDir(), m_dbname, useThread, state, callback);
} else {
if (m_buckets.needsSave()) {
log( LOG_DEBUG, "db: saving buckets %s", dbn );
}
return m_buckets.fastSave ( getDir(), useThread, NULL, NULL );
}
}
bool Rdb::saveTreeIndex(bool /* useThread */) {
if( !m_useIndexFile ) {
return true;
result = m_buckets.fastSave(getDir(), useThread, state, callback);
}
// now loop over bases
for ( int32_t i = 0 ; i < getNumBases() ; i++ ) {
CollectionRec *cr = g_collectiondb.getRec(i);
if ( ! cr ) {
continue;
}
if (m_useIndexFile) {
// now loop over bases
for (int32_t i = 0; i < getNumBases(); i++) {
CollectionRec *cr = g_collectiondb.getRec(i);
if (!cr) {
continue;
}
// if swapped out, this will be NULL, so skip it
RdbBase *base = cr->getBase(m_rdbId);
if (base) {
base->saveTreeIndex();
// if swapped out, this will be NULL, so skip it
RdbBase *base = cr->getBase(m_rdbId);
if (base) {
base->saveTreeIndex();
}
}
}
return true;
return result;
}
bool Rdb::saveIndexes() {
@ -1287,16 +1115,7 @@ void Rdb::doneDumping ( ) {
//for ( int32_t i = 0 ; i < getNumBases() ; i++ ) {
// if ( m_bases[i] ) m_bases[i]->doneDumping();
//}
// if we're closing shop then return
if ( m_isClosing ) {
// continue closing, this returns false if blocked
if (!close(m_closeState, m_closeCallback, false, true)) {
return;
}
// otherwise, we call the callback
m_closeCallback ( m_closeState );
return;
}
// try merge for all, first one that needs it will do it, preventing
// the rest from doing it
// don't attempt merge if we're niceness 0
@ -1553,13 +1372,7 @@ bool Rdb::needsDump() const {
}
}
if (m_rdbId != RDB_DOLEDB) {
return false;
}
// dump doledb if a ton of negative recs...
// otherwise, no need to dump doledb just yet
return (m_tree.getNumNegativeKeys() > 50000);
return false;
}
bool Rdb::hasRoom(int32_t totalRecs, int32_t totalDataSize) const {
@ -1637,12 +1450,12 @@ bool Rdb::hasRoom(RdbList *list) {
bool Rdb::canAdd() const {
if(!isWritable())
if (!isWritable()) {
return false;
if(m_isClosing)
return false;
if(m_dump.isDumping()) //this is more conservative than the actual check in addRecord()
}
if (isInDumpLoop()) {
return false;
}
return true;
}
@ -1662,46 +1475,22 @@ bool Rdb::addRecord(collnum_t collnum, const char *key, const char *data, int32_
return false;
}
// we can also use this logic to avoid adding to the waiting tree
// because Process.cpp locks all the trees up at once and unlocks
// them all at once as well. so since SpiderRequests are added to
// spiderdb and then alter the waiting tree, this statement should
// protect us.
if (!isWritable()) {
// don't continue if we're not allowed to add to Rdb
if (!canAdd()) {
g_errno = ETRYAGAIN;
logTrace(g_conf.m_logTraceRdb, "END. %s: Not writable. Returning false", m_dbname);
return false;
}
// bail if we're closing
if (m_isClosing) {
g_errno = ECLOSING;
logTrace(g_conf.m_logTraceRdb, "END. %s: Closing. Returning false", m_dbname);
logTrace(g_conf.m_logTraceRdb, "END. %s: Unable to add. Returning false", m_dbname);
return false;
}
// sanity check
if (KEYNEG(key)) {
if ((dataSize > 0 && data)) {
log( LOG_WARN, "db: Got data for a negative key." );
g_process.shutdownAbort(true);
log(LOG_LOGIC, "db: Got data for a negative key.");
gbshutdownLogicError();
}
} else if ( m_fixedDataSize >= 0 && dataSize != m_fixedDataSize ) {
// sanity check
g_errno = EBADENGINEER;
log(LOG_LOGIC,"db: addRecord: DataSize is %" PRId32" should be %" PRId32, dataSize,m_fixedDataSize );
g_process.shutdownAbort(true);
}
// do not add if range being dumped at all because when the
// dump completes it calls deleteList() and removes the nodes from
// the tree, so if you were overriding a node currently being dumped
// we would lose it.
if ( m_dump.isDumping()) {
// tell caller to wait and try again later
g_errno = ETRYAGAIN;
logTrace(g_conf.m_logTraceRdb, "END. %s: Dumping. Returning false", m_dbname);
return false;
log(LOG_LOGIC, "db: addRecord: DataSize is %" PRId32" should be %" PRId32, dataSize, m_fixedDataSize);
gbshutdownLogicError();
}
// copy the data before adding if we don't already own it
@ -1709,10 +1498,8 @@ bool Rdb::addRecord(collnum_t collnum, const char *key, const char *data, int32_
if (data) {
// sanity check
if ( m_fixedDataSize == 0 && dataSize > 0 ) {
g_errno = EBADENGINEER;
log(LOG_LOGIC,"db: addRecord: Data is present. Should not be");
logTrace(g_conf.m_logTraceRdb, "END. %s: Data is present. Returning false", m_dbname);
return false;
log(LOG_LOGIC, "db: addRecord: Data is present. Should not be");
gbshutdownLogicError();
}
dataCopy = (char *) m_mem.dupData(data, dataSize);

43
Rdb.h

@ -67,21 +67,7 @@ public:
bool useHalfKeys ,
char keySize,
bool useIndexFile);
// . frees up all the memory and closes all files
// . suspends any current merge (saves state to disk)
// . calls reset() for each file
// . will cause any open map files to dump
// . will dump tables to backup or store
// . calls close on each file
// . returns false if blocked, true otherwise
// . sets errno on error
bool close ( void *state ,
void (* callback)(void *state ) ,
bool urgent ,
bool exitAfterClosing );
// used by PageMaster.cpp to check to see if all rdb's are closed yet
bool isClosed() const { return m_isClosed; }
bool needsSave() const;
// . returns false and sets g_errno on error
@ -227,8 +213,7 @@ public:
bool isSavingTree() const;
bool saveTree(bool useThread);
bool saveTreeIndex(bool useThread);
bool saveTree(bool useThread, void *state, void (*callback)(void *state));
bool saveIndexes();
bool saveMaps();
@ -254,9 +239,6 @@ public:
// rebuilt files, pointed to by rdb2.
bool updateToRebuildFiles ( Rdb *rdb2 , char *coll ) ;
static void doneSavingWrapper(void *state);
static void closeSleepWrapper(int fd, void *state);
static void doneDumpingCollWrapper(void *state);
private:
@ -270,9 +252,6 @@ private:
// get the directory name where this rdb stores its files
const char *getDir() const { return g_hostdb.m_dir; }
// . called when done saving a tree to disk (keys not ordered)
void doneSaving ( ) ;
bool dumpCollLoop ( ) ;
// . called when we've dumped the tree to disk w/ keys ordered
@ -305,13 +284,6 @@ private:
std::atomic<int32_t> m_numMergesOut;
bool m_isClosing;
bool m_isClosed;
// this callback called when close is complete
void *m_closeState;
void (* m_closeCallback) (void *state );
int32_t m_minToMerge; // need at least this many files b4 merging
int32_t m_dumpErrno;
@ -339,26 +311,13 @@ private:
// . currently exclusively used by indexdb
bool m_useHalfKeys;
// are we saving the tree urgently? like we cored...
bool m_urgent;
// after saving the tree in call to Rdb::close() should the tree
// remain closed to writes?
bool m_isReallyClosing;
bool m_niceness;
// so only one save thread launches at a time
bool m_isSaving;
char m_treeAllocName[64]; //for memory used m_tree/m_buckets
char m_memAllocName[64]; //for memory used by m_mem
collnum_t m_dumpCollnum;
bool m_registered;
int64_t m_lastTime;
// set to true when dumping tree so RdbMem does not use the memory
// being dumped to hold newly added records
bool m_inDumpLoop;

@ -51,32 +51,6 @@ static Rdb **getSecondaryRdbs ( int32_t *nsr ) {
return s_rdbs;
}
static Rdb **getAllRdbs ( int32_t *nsr ) {
static Rdb *s_rdbs[50];
static int32_t s_nsr = 0;
static bool s_init = false;
if ( ! s_init ) {
s_init = true;
s_nsr = 0;
s_rdbs[s_nsr++] = g_titledb.getRdb ();
s_rdbs[s_nsr++] = g_posdb.getRdb ();
s_rdbs[s_nsr++] = g_spiderdb.getRdb ();
s_rdbs[s_nsr++] = g_clusterdb.getRdb ();
s_rdbs[s_nsr++] = g_linkdb.getRdb ();
s_rdbs[s_nsr++] = g_tagdb.getRdb ();
s_rdbs[s_nsr++] = g_titledb2.getRdb ();
s_rdbs[s_nsr++] = g_posdb2.getRdb ();
s_rdbs[s_nsr++] = g_spiderdb2.getRdb ();
s_rdbs[s_nsr++] = g_clusterdb2.getRdb ();
s_rdbs[s_nsr++] = g_linkdb2.getRdb ();
s_rdbs[s_nsr++] = g_tagdb2.getRdb ();
}
*nsr = s_nsr;
return s_rdbs;
}
Repair::Repair() {
// Coverity
m_docId = 0;
@ -1689,46 +1663,58 @@ bool Repair::saveAllRdbs() {
if (s_savingAll) {
return false;
}
// set it
s_savingAll = true;
// TODO: why is this called like 100x per second when a merge is
// going on? why don't we sleep longer in between?
int32_t nsr;
Rdb **rdbs = getAllRdbs ( &nsr );
for ( int32_t i = 0 ; i < nsr ; i++ ) {
Rdb *rdb = rdbs[i];
// skip if not initialized
if ( ! rdb->isInitialized() ) continue;
for (int32_t i = 0; i < g_process.m_numRdbs; i++) {
Rdb *rdb = g_process.m_rdbs[i];
if (!rdb->isInitialized() || rdb->getRdbId() == RDB_DOLEDB) {
continue;
}
// save/close it
rdb->close(NULL,doneSavingRdb,false,false);
rdb->disableWrites();
rdb->saveTree(true, rdb, doneSavingRdb);
}
// return if still waiting on one to close
if ( anyRdbNeedsSave() ) return false;
if (anyRdbNeedsSave()) {
return false;
}
// all done
return true;
}
// return false if one or more is still not closed yet
bool Repair::anyRdbNeedsSave() {
int32_t count = 0;
int32_t nsr;
Rdb **rdbs = getAllRdbs ( &nsr );
for ( int32_t i = 0 ; i < nsr ; i++ ) {
Rdb *rdb = rdbs[i];
if(rdb->needsSave())
count++;
for (int32_t i = 0; i < g_process.m_numRdbs; i++) {
Rdb *rdb = g_process.m_rdbs[i];
if (rdb->getRdbId() == RDB_DOLEDB) {
continue;
}
if (rdb->needsSave()) {
return true;
}
}
if ( count ) return true;
s_savingAll = false;
return false;
}
// returns false if waiting on some to save
void Repair::doneSavingRdb(void *state) {
if ( ! anyRdbNeedsSave() ) return;
Rdb *rdb = static_cast<Rdb*>(state);
rdb->enableWrites();
if (!anyRdbNeedsSave()) {
return;
}
// all done
s_savingAll = false;
}

@ -576,7 +576,7 @@ int32_t UdpSlot::sendDatagramOrAck ( int sock, bool allowResends, int64_t now ){
// update stats, just put them all in g_udpServer
g_udpServer.m_eth0PacketsOut += 1;
g_udpServer.m_eth0BytesOut += dgramSize;
} else if ( m_host && m_host->m_hostdb == &g_hostdb ) {
} else if ( m_host ) {
// don't fuck with it if we are ping though, because that needs to specify the exact ip!
if ( m_msgType == msg_type_11 ) {
to.sin_addr.s_addr = ip;
@ -677,7 +677,7 @@ int32_t UdpSlot::sendDatagramOrAck ( int sock, bool allowResends, int64_t now ){
if ( ! m_host ) eth = 0;
//if ( m_host->m_ip == (uint32_t)ip ) eth = 0;
int32_t hid = -1;
if ( m_host && m_host->m_hostdb == &g_hostdb )
if ( m_host )
hid = m_host->m_hostId;
int32_t kk = 0; if ( m_callback ) kk = 1;
@ -945,7 +945,7 @@ int32_t UdpSlot::sendAck ( int sock , int64_t now ,
//#ifdef _UDPDEBUG_
int32_t kk = 0; if ( m_callback ) kk = 1;
int32_t hid = -1;
if ( m_host && m_host->m_hostdb == &g_hostdb )
if ( m_host )
hid = m_host->m_hostId;
logf(LOG_DEBUG,
"udp: sent ACK "
@ -1066,7 +1066,7 @@ bool UdpSlot::readDatagramOrAck ( const void *readBuffer_,
// log msg
if ( g_conf.m_logDebugUdp ) {
int32_t hid = -1;
if ( m_host && m_host->m_hostdb == &g_hostdb )
if ( m_host )
hid = m_host->m_hostId;
int32_t kk = 0; if ( m_callback ) kk = 1;
log(LOG_DEBUG,
@ -1458,7 +1458,7 @@ void UdpSlot::readAck ( int32_t dgramNum, int64_t now ) {
//#ifdef _UDPDEBUG_
int32_t kk = 0; if ( m_callback ) kk = 1;
int32_t hid = -1;
if ( m_host && m_host->m_hostdb == &g_hostdb )
if ( m_host )
hid = m_host->m_hostId;
log(LOG_DEBUG,
"udp: Read ACK "

@ -5,7 +5,7 @@
#include "Conf.h"
static void saveAndReloadPosdbBucket() {
g_posdb.getRdb()->saveTree(false);
g_posdb.getRdb()->saveTree(false, NULL, NULL);
g_posdb.getRdb()->getBuckets()->clear();
g_posdb.getRdb()->loadTree();
if (g_posdb.getRdb()->isUseIndexFile()) {