Merge branch 'master' into nomerge2

This commit is contained in:
Ivan Skytte Jørgensen
2016-08-04 11:30:48 +02:00
32 changed files with 329 additions and 1106 deletions

@ -1325,11 +1325,11 @@ struct UnlinkRenameState {
*
* @param newBaseFilename non-NULL for renames, NULL for unlinks
* @param part part num to unlink, -1 for all (or rename)
* @param useThread
* @param callback
* @param state
* @param useThread should thread be used
* @param callback function to call when operation is done
* @param state state to be passed to callback function
* @param newBaseFilenameDir if NULL, defaults to m_dir, the current dir in which this file already exists
* @param force
* @param force should rename be done even if destination file exists
* @return false if blocked, true otherwise
*/
// . sets g_errno on error

@ -215,9 +215,6 @@ bool Conf::init ( char *dir ) { // , int32_t hostId ) {
// and this, in case you forgot to turn it off
if ( g_conf.m_isLive ) g_conf.m_doConsistencyTesting = false;
// and this on
g_conf.m_indexDeletes = true;
// this off
g_conf.m_repairingEnabled = false;

9
Conf.h

@ -225,9 +225,6 @@ class Conf {
float m_synonymWeight;
// use sendmail to forward emails we send out
char m_sendmailIp[MAX_MX_LEN];
// send emails when a host goes down?
bool m_sendEmailAlerts;
//should we delay when only 1 host goes down out of twins till 9 30 am?
@ -316,10 +313,8 @@ class Conf {
//bool m_onlyAddUnchangedTermIds;
bool m_doIncrementalUpdating;
// always true unless entire indexdb was deleted and we are rebuilding
bool m_indexDeletes;
bool m_splitTwins;
// Temporary (?) setting for new Posdb no-in-memory-merge feature
bool m_noInMemoryPosdbMerge;
bool m_useQuickpoll;

@ -1185,9 +1185,7 @@ bool Dns::sendToNextDNS ( DnsState *ds ) {
// . resend time is set to 20 seconds in UdpSlot::setResendTime()
if ( ! m_udpServer.sendRequest ( ds->m_request ,//copy ,
ds->m_requestSize,//msgSize ,
/// @todo ALC don't think dns should be using msg_type_0, create another msgtype for dns
msg_type_0 ,
//g_conf.m_dnsIps [n] ,
msg_type_0 , /// @todo ALC don't think dns should be using msg_type_0
ip , // ds->m_dnsIps[depth][n] ,
53 ,//g_conf.m_dnsPorts[n],
-1 ,// invalid host id
@ -1197,8 +1195,6 @@ bool Dns::sendToNextDNS ( DnsState *ds ) {
TIMEOUT_SINGLE_HOST*1000 , // 20 secs?
-1, // backoff
-1, // maxWait
NULL, // replyBuf
0, // replyBufMaxSize
// use niceness 0 now so if the
// msgC slot gets converted from 1
// to 0 this will not hold it up!

@ -767,7 +767,6 @@ bool HttpServer::sendReply ( TcpSocket *s , HttpRequest *r , bool isAdmin) {
if(redirLen > 0) redir = r->getRedir();
else if (!isAdmin &&
*g_conf.m_redirect != '\0' &&
// was "raw"
r->getLong("xml", -1) == -1 &&
// do not redirect a 'gb proxy stop' request away,
// which POSTS cast=0&save=1. that is done from the

@ -636,13 +636,10 @@ static void gotImgIpWrapper ( void *state , int32_t ip ) {
}
bool Images::getImageIp ( ) {
if ( ! m_msgc.getIp ( m_imageUrl.getHost () ,
m_imageUrl.getHostLen() ,
&m_latestIp ,
this ,
gotImgIpWrapper ))
if (!m_msgc.getIp(m_imageUrl.getHost(), m_imageUrl.getHostLen(), &m_latestIp, this, gotImgIpWrapper)) {
// we blocked
return false;
}
return true;
}

@ -326,10 +326,6 @@ skip:
KEY1(m_startKey,m_ks),KEY0(m_startKey),
(int32_t)m_niceness);
char *replyBuf = NULL;
int32_t replyBufMaxSize = 0;
bool freeReply = true;
// . make a request with the info above (note: not in network order)
// . IMPORTANT!!!!! if you change this change
// Multicast.cpp::sleepWrapper1 too!!!!!!!!!!!!
@ -384,8 +380,6 @@ skip:
timeout ,
-1 , // backoff
-1 , // maxwait
replyBuf ,
replyBufMaxSize ,
m_niceness ) ) { // cback niceness
logTrace( g_conf.m_logTraceMsg0, "END, return true. Request sent" );
return true;
@ -416,14 +410,11 @@ skip:
//for ( int32_t i = 0; i < m_numSplit; i++ ) {
QUICKPOLL(m_niceness);
//int32_t gr;
char *buf;
buf = replyBuf;
// get the multicast
Multicast *m = &m_mcast;
if ( ! m->send ( m_request ,
if ( ! m->send ( m_request ,
m_requestSize,
msg_type_0 ,
false , // does multicast own request?
@ -436,23 +427,7 @@ skip:
gotMulticastReplyWrapper0 ,
timeout*1000 , // timeout
niceness ,
firstHostId ,
buf ,
replyBufMaxSize ,
freeReply , // free reply buf?
true , // do disk load balancing?
maxCacheAge ,
//(key_t *)cacheKey ,
// multicast uses it for determining the best
// host to send the request to when doing
// disk load balancing. if the host has our
// data cached, then it will probably get to
// handle the request. for now let's just assume
// this is a 96-bit key. TODO: fix...
0 , // *(key_t *)cacheKey ,
rdbId ,
minRecSizes ) )
{
firstHostId) ) {
log(LOG_ERROR, "net: Failed to send request for data from %s in shard "
"#%" PRIu32" over network: %s.",
getDbnameFromId(m_rdbId),m_shardNum, mstrerror(g_errno));

@ -471,7 +471,6 @@ skip:
// . multicast::send() returns false and sets g_errno on error
// . we return false if we block, true otherwise
// . will loop indefinitely if a host in this group is down
key_t k; k.setMin();
if ( m_mcast.send ( request , // sets mcast->m_msg to this
requestLen , // sets mcast->m_msgLen to this
msg_type_1 ,
@ -483,19 +482,9 @@ skip:
NULL , // state data
gotReplyWrapper1 ,
multicast_msg1_senddata_timeout , // timeout
m_niceness , // niceness
-1 , // first host to try
NULL , // replyBuf = NULL ,
0 , // replyBufMaxSize = 0 ,
true , // freeReplyBuf = true ,
false , // doDiskLoadBalancing = false ,
-1 , // no max cache age limit
//(key_t)0 , // cache key
k , // cache key
RDB_NONE , // bogus rdbId
-1 , // unknown minRecSizes read size
true )) // sendToSelf ))
m_niceness )) { // niceness
return false;
}
QUICKPOLL(m_niceness);
// g_errno should be set

@ -566,8 +566,6 @@ void handleRequest13 ( UdpSlot *slot , int32_t niceness ) {
200000 , // 200 sec timeout
-1,//backoff
-1,//maxwait
NULL,//replybuf
0,//replybufmaxsize
niceness)) {
// g_errno should be set

@ -25,14 +25,13 @@ bool Msg1f::getLog(int32_t hostId,
char* p = sendBuf;
*(int32_t*)p = numBytes;
p += sizeof(int32_t);
UdpSlot *slot;
g_udpServer.sendRequest(sendBuf,
p - sendBuf,
msg_type_1f,
g_hostdb.m_hostPtrs[hostId]->m_ip,
g_hostdb.m_hostPtrs[hostId]->m_port,
g_hostdb.m_hostPtrs[hostId]->m_hostId,
&slot,
NULL,
callbackState,
callback,
5);

@ -226,15 +226,7 @@ bool Msg20::getSummary ( Msg20Request *req ) {
timeout , // timeout
req->m_niceness ,
firstHostId , // first hostid
NULL , // reply buffer
0 , // reply buffer size
false , // free reply buf?
false , // do disk load balancing?
-1 , // max cache age
0 , // cacheKey
0 , // bogus rdbId
-1 , // minRecSizes(unknownRDsize)
true )) { // sendToSelf
false )) { // free reply buf?
// sendto() sometimes returns "Network is down" so i guess
// we just had an "error reply".
log("msg20: error sending mcast %s",mstrerror(g_errno));

@ -144,18 +144,10 @@ bool Msg22::getTitleRec ( Msg22Request *r ,
// get groupId from docId
uint32_t shardNum = getShardNumFromDocId ( docId );
// generate cacheKey, just use docid now
key_t cacheKey ; cacheKey.n1 = 0; cacheKey.n0 = docId;
// do load balancing iff we're the spider because if we send this
// request to a merging host, and prefer local reads is true, the
// resulting disk read will be starved somewhat. otherwise, we save
// time by not having to cast a Msg36
bool balance = false;
Host *firstHost ;
// if niceness 0 can't pick noquery host.
// if niceness 1 can't pick nospider host.
firstHost = g_hostdb.getLeastLoadedInShard ( shardNum, r->m_niceness );
Host *firstHost = g_hostdb.getLeastLoadedInShard ( shardNum, r->m_niceness );
int32_t firstHostId = firstHost->m_hostId;
m_outstanding = true;
@ -179,14 +171,7 @@ bool Msg22::getTitleRec ( Msg22Request *r ,
timeout*1000 , // timeout
r->m_niceness , // nice, reply size can be huge
firstHostId , // first hostid
NULL , // replyBuf
0 , // replyBufMaxSize
false , // free reply buf?
balance , // do disk load balancing?
maxCacheAge , // maxCacheAge
cacheKey , // cacheKey
RDB_TITLEDB , // rdbId of titledb
32*1024 ) ){// minRecSizes avg
false ) ){ // free reply buf?
log("db: Requesting title record had error: %s.",
mstrerror(g_errno) );
// set m_errno

@ -408,20 +408,7 @@ bool Msg3a::getDocIds ( Msg39Request *r ,
gotReplyWrapper3a ,
timeout , // timeout
m_r->m_niceness ,
firstHostId, // -1// bestHandlingHostId ,
NULL , // m_replyBuf ,
0 , // MSG39REPLYSIZE,
// this is true if multicast should free the
// reply, otherwise caller is responsible
// for freeing it after calling
// getBestReply().
// actually, this should always be false,
// there is a bug in Multicast.cpp.
// no, if we error out and never steal
// the buffers then they will go unfreed
// so they are freed by multicast by default
// then we steal control explicitly
true );
firstHostId ); // -1// bestHandlingHostId
// if successfully launch, do the next one
if ( status ) continue;
// . this serious error should make the whole query fail

@ -688,7 +688,6 @@ bool sendBuffer ( int32_t hostId , int32_t niceness ) {
// . in that case we should restart from the top and we will add
// the dead host ids to the top, and multicast will avoid sending
// to hostids that are dead now
key_t k; k.setMin();
if ( mcast->send ( request , // sets mcast->m_msg to this
requestSize, // sets mcast->m_msgLen to this
msg_type_4 ,
@ -704,17 +703,7 @@ bool sendBuffer ( int32_t hostId , int32_t niceness ) {
// it when its between having timed out and
// having been resent by us!
multicast_infinite_send_timeout , // timeout
MAX_NICENESS, // niceness
-1 , // first host to try
NULL , // replyBuf = NULL ,
0 , // replyBufMaxSize = 0 ,
true , // freeReplyBuf = true ,
false , // doDiskLoadBalancing = false ,
-1 , // no max cache age limit
k , // cache key
RDB_NONE , // bogus rdbId
-1 , // unknown minRecSizes read size
true )) { // sendToSelf?
MAX_NICENESS)) { // niceness
// . let storeRec() do all the allocating...
// . only let the buffer go once multicast succeeds
s_hostBufs [ hostId ] = NULL;

156
MsgC.cpp

@ -25,18 +25,12 @@ bool MsgC::registerHandler ( ) {
return true;
}
// returns false if blocked, true otherwise
bool MsgC::getIp(const char *hostname , int32_t hostnameLen ,
int32_t *ip ,
void *state ,
void (* callback) ( void *state , int32_t ip ),
int32_t niceness ,
bool forwardToProxy ){
bool MsgC::getIp(const char *hostname, int32_t hostnameLen, int32_t *ip, void *state,
void (*callback)(void *state, int32_t ip), int32_t niceness) {
m_mcast.reset();
m_callback=callback;
m_ipPtr = ip;
m_forwardToProxy = forwardToProxy;
// sanity check
if ( ! m_ipPtr ) { g_process.shutdownAbort(true); }
// First check if g_dns has it. This function is a part of the
@ -60,8 +54,7 @@ bool MsgC::getIp(const char *hostname , int32_t hostnameLen ,
// debug
//char c = hostname[hostnameLen];
//if ( c != 0 ) hostname[hostnameLen] = 0;
log(LOG_DEBUG,"dns: msgc: getting ip (sendtoproxy=%" PRId32") for [%s]",
(int32_t)forwardToProxy, hostname);
log(LOG_DEBUG,"dns: msgc: getting ip for [%s]", hostname);
//if ( c != 0 ) hostname[hostnameLen] = c;
@ -128,8 +121,6 @@ bool MsgC::getIp(const char *hostname , int32_t hostnameLen ,
//uint32_t groupId=g_hostdb.getGroupId(groupNum);
// get a hostid that should house this ip in its local cache
Host *host = NULL; // g_dns.getResponsibleHost ( key );
Host *firstHost;
// with the new iframe tag expansion logic in Msg13.cpp, the
// spider proxy will create a newXmlDoc to do that and will call
@ -146,31 +137,10 @@ bool MsgC::getIp(const char *hostname , int32_t hostnameLen ,
return false;
}
// . use scproxy if we should
// . so if we are behind a nat this should make the nat table overflow
// a moot point, because we are now tunneling via msgC to a
// server that has its own IP address and does not use NAT.
// . and if the msgc packets got screwed up by NAT that is ok because
// it will re-send forever using our reliable udp protocol
// . if we are hitting a bind9 server then askRootNameservers will be
// false. so if we are using a bind9 server then do not send the
// request to an scproxy, but send it to a grunt with a cache.
//if ( g_conf.m_useCompressionProxy && g_conf.m_askRootNameservers ) {
if ( forwardToProxy ) {
host = g_hostdb.getBestSpiderCompressionProxy((int32_t *)&key.n1);
if ( ! host && ! g_errno ) {
log("msgc: using compression proxy and asking root "
"name servers, but no compression proxy was "
"given in hosts.conf");
g_errno = EBADENGINEER;
}
if ( ! host ) return true;
firstHost = host;
}
else {
host = g_dns.getResponsibleHost ( key );
firstHost = NULL;
}
// there was logic for getting ip from a proxy here
// removed in commit bab9e9da06a8edeb8a7677c2e90f72766f6ba782 as it was never used
host = g_dns.getResponsibleHost ( key );
if ( g_conf.m_logDebugDns ) {
int32_t fip = 0;
@ -185,7 +155,7 @@ bool MsgC::getIp(const char *hostname , int32_t hostnameLen ,
if ( ! g_errno ) { g_process.shutdownAbort(true); }
return true;
}
//uint32_t groupId = host->m_groupId;
int32_t firstHostId = host->m_hostId;
// the handling server will timeout its dns algorithm and send us
// back an EDNSTIMEDOUT error, so we do not need to have any timeout
@ -208,17 +178,7 @@ bool MsgC::getIp(const char *hostname , int32_t hostnameLen ,
timeout , // timeout
niceness , // niceness
firstHostId,// first host to try
NULL , // reply buf
0 , // replybuf max size
false , // free reply buf?
false , // diskloadbalancing?
-1 , // maxCacheAge
0 , // cacheKey
0 , // rdbId
-1 , // minRecSizes
true , // sendtoself
-1 , // redirecttimeout
firstHost )){ // firstProxyHost
false )) { // free reply buf?
//did not block, error
log(LOG_DEBUG,"dns: msgc: mcast had error: %s",
mstrerror(g_errno));
@ -235,11 +195,9 @@ void gotReplyWrapper ( void *state , void *state2 ) {
int32_t ip = THIS->gotReply();
// debug
if ( g_conf.m_logDebugDns ) {
const char *s ="";
if ( THIS->m_forwardToProxy ) s = "from proxy ";
logf(LOG_DEBUG,"dns: msgc: got reply %sof %s for %s. "
logf(LOG_DEBUG,"dns: msgc: got reply of %s for %s. "
"state=0x%" PTRFMT" mcast=0x%" PTRFMT"",
s,iptoa(*THIS->m_ipPtr),THIS->m_u.getUrl(),(PTRTYPE)state2,
iptoa(*THIS->m_ipPtr),THIS->m_u.getUrl(),(PTRTYPE)state2,
(PTRTYPE)&THIS->m_mcast);
}
THIS->m_callback(state2,ip);
@ -279,10 +237,7 @@ int32_t MsgC::gotReply(){
// . if we have to free the buffer
// . if freeIt is false that maeans we own the reply buffer
// . if not sending to a proxy this means we got the ip from
// another host that is not a proxy and we should free it...
// . but if we got this reply froma proxy, do not free it
if ( ! freeIt && ! m_forwardToProxy ) {
if (!freeIt) {
//log (LOG_DEBUG,"msgC: Multicast asked to free buffer");
mfree(reply,maxSize,"MulticastMsgC");
}
@ -312,98 +267,25 @@ int32_t MsgC::gotReply(){
return *m_ipPtr;
}
// like gotReply() but it is a reply from the proxy, so we gotta free our
// msgc before sending back a reply to the first guy's msgc
void gotProxyReplyWrapper ( void *state , int32_t ipArg ) {
// get the msgc we used to send to proxy
MsgC *THIS = (MsgC *)state;
// get ip from the proxy reply
int32_t ip = THIS->gotReply();
// debug
log(LOG_DEBUG,"dns: msgc: got reply from proxy of %s for %s [%s].",
iptoa(*THIS->m_ipPtr),
THIS->m_u.getUrl(),
THIS->m_u.getScheme());
UdpSlot *slot = THIS->m_slot;
// free the msgc we used to communicate with the proxy
mdelete ( THIS , sizeof(MsgC), "proxmsgc");
delete ( THIS );
// send ip back to the first guy to launch a msgc request
gotMsgCIpWrapper ( slot , ip );
}
// . only return false if you want slot to be nuked w/o replying
// . MUST always call g_udpServer::sendReply() or sendErrorReply()
void handleRequest ( UdpSlot *slot , int32_t niceness ) {
void handleRequest(UdpSlot *slot, int32_t niceness) {
// get the request, should be the hostname
char *hostname = slot->m_readBuf;
char *hostname = slot->m_readBuf;
// do not include the \0 at the end in the length
int32_t hostnameLen = slot->m_readBufSize - 1;
int32_t ip=0;
//char c = hostname[hostnameLen];
//if ( c != 0 ) hostname[hostnameLen] = 0;
log(LOG_DEBUG,"dns: msgc: handle request called for %s state=%" PTRFMT"",
hostname,(PTRTYPE)slot);
//if ( c != 0 ) hostname[hostnameLen] = c;
bool useProxy = g_conf.m_useCompressionProxy;
// not if we are though
if ( g_hostdb.m_myHost->m_isProxy ) useProxy = false;
// . turn off for now for testing
// . roadrunner wireless injects garbage into our msgc replies
// which makes our checksum fail in UdpSlot.cpp, however, it really
// slows everything down when we are silently dropping so many
// packets. so do not forward requests to the spider proxy for
// now. re-enable this later perhaps when we have cogent installed.
useProxy = false;
log(LOG_DEBUG,"dns: msgc: handle request called for %s state=%" PTRFMT, hostname,(PTRTYPE)slot);
// check dns cache for the hostname. This should also send to
// the dnsServer. If it is not in the cache, getIp puts it in.
if ( ! useProxy ) {
if ( g_dns.getIp( hostname,
hostnameLen,
&ip,
slot,
gotMsgCIpWrapper ))
gotMsgCIpWrapper(slot,ip);
return;
if (g_dns.getIp(hostname, hostnameLen, &ip, slot, gotMsgCIpWrapper)) {
gotMsgCIpWrapper(slot, ip);
}
// . if we need to go to proxy, forward it now with a new msgC
// . before sending off to compression proxy, check the cache
// . gotProxyReplyWrapper shold add it to our cache
key_t key = g_dns.getKey ( hostname , hostnameLen );
if ( g_dns.isInCache ( key , &ip ) ) {
gotMsgCIpWrapper ( slot , ip );
return;
}
// ok, not in cache, send request to proxy now
MsgC *msgc;
try { msgc = new ( MsgC ); }
catch ( ... ) {
g_errno = ENOMEM;
log(LOG_ERROR,"%s:%s:%d: call sendErrorReply.", __FILE__, __func__, __LINE__);
g_udpServer.sendErrorReply ( slot , g_errno );
return;
}
mnew ( msgc , sizeof(MsgC), "proxmsgc" );
// save this for sending back to request
msgc->m_slot = slot;
// send request to proxy now
msgc->getIp ( hostname ,
hostnameLen ,
&msgc->m_tmpIp,
msgc, // state
gotProxyReplyWrapper, // callback
niceness,
true ); // forwardToProxy?
return;
}

3
MsgC.h

@ -35,8 +35,7 @@ class MsgC {
int32_t *ip ,
void *state ,
void (* callback) ( void *state , int32_t ip ),
int32_t niceness = 2,
bool forwardToProxy = false );
int32_t niceness = 2);
// register our request handle with g_udp server
static bool registerHandler();

@ -245,12 +245,9 @@ bool Msge1::sendMsgC ( int32_t i , const char *host , int32_t hlen ) {
m->m_state2 = this;
m->m_state3 = (void *)(PTRTYPE)i;
if ( ! m->getIp ( host ,
hlen ,
&m_ipBuf[n] ,
m , // state
gotMsgCWrapper ))// callback
if (!m->getIp(host, hlen, &m_ipBuf[n], m, gotMsgCWrapper)) {
return false;
}
return doneSending ( i );
}

@ -27,7 +27,6 @@ static void gotReplyWrapperM2 ( void *state , UdpSlot *slot ) ;
void Multicast::constructor ( ) {
m_msg = NULL;
m_readBuf = NULL;
m_replyBuf = NULL;
m_inUse = false;
}
void Multicast::destructor ( ) { reset(); }
@ -49,16 +48,9 @@ void Multicast::reset ( ) {
mfree ( m_msg , m_msgSize , "Multicast" );
if ( m_readBuf && m_ownReadBuf && m_freeReadBuf )
mfree ( m_readBuf , m_readBufMaxSize , "Multicast" );
// . replyBuf can be separate from m_readBuf if g_errno gets set
// and sets the slot's m_readBuf to NULL, then calls closeUpShop()
// which sets m_readBuf to the slot's readBuf, which is now NULL!
// . this was causing the "bad engineer" errors from Msg22 to leak mem
if ( m_replyBuf && m_ownReadBuf && m_freeReadBuf &&
m_replyBuf != m_readBuf )
mfree ( m_replyBuf , m_replyBufMaxSize , "Multicast" );
m_msg = NULL;
m_readBuf = NULL;
m_replyBuf = NULL;
m_inUse = false;
m_replyingHost = NULL;
}
@ -82,17 +74,9 @@ bool Multicast::send ( char *msg ,
int64_t totalTimeout , // in millseconds
int32_t niceness ,
int32_t firstHostId ,
char *replyBuf ,
int32_t replyBufMaxSize ,
bool freeReplyBuf ,
bool doDiskLoadBalancing ,
int32_t maxCacheAge ,
key_t cacheKey ,
char rdbId ,
int32_t minRecSizes ,
bool sendToSelf ,
int32_t redirectTimeout ,
class Host *firstHost ) {
bool freeReplyBuf ) {
bool sendToSelf = true;
// make sure not being re-used!
if ( m_inUse ) {
log( LOG_ERROR, "net: Attempt to re-use active multicast");
@ -118,8 +102,6 @@ bool Multicast::send ( char *msg ,
m_niceness = niceness;
// this can't be -1 i guess
if ( totalTimeout <= 0 ) { g_process.shutdownAbort(true); }
m_replyBuf = replyBuf;
m_replyBufMaxSize = replyBufMaxSize;
m_startTime = gettimeofdayInMilliseconds();
m_numReplies = 0;
m_readBuf = NULL;
@ -130,8 +112,7 @@ bool Multicast::send ( char *msg ,
m_sentToTwin = false;
m_retryCount = 0;
m_key = key;
m_rdbId = rdbId;
m_redirectTimeout = redirectTimeout;
// clear m_retired, m_errnos, m_slots
memset ( m_retired , 0 , sizeof(bool ) * MAX_HOSTS_PER_GROUP );
memset ( m_errnos , 0 , sizeof(int32_t ) * MAX_HOSTS_PER_GROUP );
@ -140,57 +121,39 @@ bool Multicast::send ( char *msg ,
// breathe
QUICKPOLL(m_niceness);
int32_t hostNumToTry = -1;
if ( ! firstHost ) {
// . get the list of hosts in this group
// . returns false if blocked, true otherwise
// . sets g_errno on error
//Host *hostList = g_hostdb.getGroup ( groupId , &m_numHosts);
Host *hostList = g_hostdb.getShard ( shardNum , &m_numHosts );
if ( ! hostList ) {
log("mcast: no group");g_errno=ENOHOSTS;return false;}
// now copy the ptr into our array
for ( int32_t i = 0 ; i < m_numHosts ; i++ )
m_hostPtrs[i] = &hostList[i];
// . get the list of hosts in this group
// . returns false if blocked, true otherwise
// . sets g_errno on error
Host *hostList = g_hostdb.getShard ( shardNum , &m_numHosts );
if ( ! hostList ) {
log(LOG_WARN, "mcast: no group");
g_errno=ENOHOSTS;
return false;
}
//
// if we are sending to an scproxy then put all scproxies into the
// list of hosts
//
else { // if ( firstHost && (firstHost->m_type & HT_SCPROXY) ) {
int32_t np = 0;
for ( int32_t i = 0 ; i < g_hostdb.m_numProxyHosts ; i++ ) {
// shortcut
Host *h = g_hostdb.getProxy(i);
if ( ! (h->m_type & HT_SCPROXY ) ) continue;
// stop breaching
if ( np >= 32 ) { g_process.shutdownAbort(true); }
// assign this
if ( h == firstHost ) hostNumToTry = np;
// set our array of ptrs of valid hosts to send to
m_hostPtrs[np++] = h;
}
// assign
m_numHosts = np;
firstHostId = -1;
// panic
if ( ! np ) { g_process.shutdownAbort(true); }
// now copy the ptr into our array
for ( int32_t i = 0 ; i < m_numHosts ; i++ ) {
m_hostPtrs[i] = &hostList[i];
}
// . pick the fastest host in the group
// . this should pick the fastest one we haven't already sent to yet
if ( ! sendToWholeGroup ) {
bool retVal = sendToHostLoop (key,hostNumToTry,firstHostId) ;
bool retVal = sendToHostLoop (key,-1,firstHostId) ;
// on error, un-use this class
if ( ! retVal ) m_inUse = false;
if ( ! retVal ) {
m_inUse = false;
}
return retVal;
}
//if ( ! sendToWholeGroup ) return sendToHostLoop ( key , -1 );
// . send to ALL hosts in this group if sendToWholeGroup is true
// . blocks forever until sends to all hosts are successfull
sendToGroup ( );
sendToGroup ( );
// . sendToGroup() always blocks, but we return true if no g_errno
// . we actually keep looping until all hosts get the msg w/o error
return true;
@ -264,8 +227,6 @@ void Multicast::sendToGroup ( ) {
m_totalTimeout ,
-1 , // backoff
-1 , // max wait in ms
m_replyBuf ,
m_replyBufMaxSize ,
m_niceness )) { // cback niceness
continue;
}
@ -420,55 +381,52 @@ void Multicast::gotReply2 ( UdpSlot *slot ) {
// . uses key to pick the first host to send to (for consistency)
// . after we pick a host and launch the request to him the sleepWrapper1
// will call this at regular intervals, so be careful,
bool Multicast::sendToHostLoop ( int32_t key , int32_t hostNumToTry ,
int32_t firstHostId ) {
bool Multicast::sendToHostLoop(int32_t key, int32_t hostNumToTry, int32_t firstHostId) {
// erase any errors we may have got
g_errno = 0 ;
loop:
int32_t i;
for (;;) {
// what if this host is dead?!?!?
int32_t i = (hostNumToTry >= 0) ? hostNumToTry : pickBestHost(key, firstHostId);
// what if this host is dead?!?!?
if ( hostNumToTry >= 0 ) // && ! g_hostdb.isDead(hostNumToTry) )
i = hostNumToTry;
else i = pickBestHost ( key , firstHostId );
// do not resend to retired hosts
if ( m_retired[i] ) i = -1;
// do not resend to retired hosts
if (m_retired[i]) {
i = -1;
}
// . if no more hosts return FALSE
// . we need to return false to the caller of us below
if ( i < 0 ) {
// debug msg
//log("Multicast:: no hosts left to send to");
g_errno = ENOHOSTS;
return false;
// . if no more hosts return FALSE
// . we need to return false to the caller of us below
if (i < 0) {
g_errno = ENOHOSTS;
return false;
}
// . send to this guy, if we haven't yet
// . returns false and sets g_errno on error
// . if it returns true, we sent ok, so we should return true
// . will return false if the whole thing is timed out and g_errno
// will be set to ETIMEDOUT
// . i guess ENOSLOTS means the udp server has no slots available
// for sending, so its pointless to try to send to another host
if (sendToHost(i)) {
return true;
}
// . if no more slots, we're done, don't loop!
// . pointless as well if no time left in the multicast
// . or if shutting down the server! otherwise it loops forever and
// won't exit when sending a msg20 request. i've seen this...
if (g_errno == ENOSLOTS || g_errno == EUDPTIMEDOUT || g_errno == ESHUTTINGDOWN) {
return false;
}
// otherwise try another host and hope for the best
g_errno = 0;
key = 0;
// what kind of error leads us here? EBUFTOOSMALL or EBADENGINEER...
hostNumToTry = -1;
}
// log("build: msg %x sent to host %" PRId32 " first hostId is %" PRId32 ,
// m_msgType, i, firstHostId);
// . send to this guy, if we haven't yet
// . returns false and sets g_errno on error
// . if it returns true, we sent ok, so we should return true
// . will return false if the whole thing is timed out and g_errno
// will be set to ETIMEDOUT
// . i guess ENOSLOTS means the udp server has no slots available
// for sending, so its pointless to try to send to another host
if ( sendToHost ( i ) ) return true;
// if no more slots, we're done, don't loop!
if ( g_errno == ENOSLOTS ) return false;
// pointless as well if no time left in the multicast
if ( g_errno == EUDPTIMEDOUT ) return false;
// or if shutting down the server! otherwise it loops forever and
// won't exit when sending a msg20 request. i've seen this...
if ( g_errno == ESHUTTINGDOWN ) return false;
// otherwise try another host and hope for the best
g_errno = 0;
key = 0 ;
// what kind of error leads us here? EBUFTOOSMALL or EBADENGINEER...
hostNumToTry = -1;
goto loop;
}
// . pick the fastest host from m_hosts based on avg roundtrip time for ACKs
@ -604,30 +562,6 @@ int32_t Multicast::pickBestHost ( uint32_t key , int32_t firstHostId ) {
//return i;
}
// . pick the fastest host from m_hosts based on avg roundtrip time for ACKs
// . skip hosts in our m_retired[] list of hostIds
// . returns -1 if none left to pick
/*
int32_t Multicast::pickBestHost ( ) {
int32_t mini = -1;
int32_t minPing = 0x7fffffff;
// TODO: reset the sublist ptr????
// cast the msg to "hostsPerGroup" hosts in group "groupId"
for ( int32_t i = 0 ; i < m_numHosts ; i++ ) {
// skip host if we've retired it
if ( m_retired[i] ) continue;
// get the host
Host *h = &m_hosts[i];
// see if we got a new fastest host, continue if not
if ( h->m_pingAvg > minPing ) continue;
minPing = h->m_pingAvg;
mini = i;
}
// return our candidate, may be -1 if all were picked before
return mini;
}
*/
// . returns false and sets error on g_errno
// . returns true if kicked of the request (m_msg)
// . sends m_msg to host "h"
@ -728,8 +662,6 @@ bool Multicast::sendToHost ( int32_t i ) {
timeRemaining , // timeout
-1 , // backoff
-1 , // max wait in ms
m_replyBuf ,
m_replyBufMaxSize ,
m_niceness , // cback niceness
maxResends )) {
log(LOG_WARN, "net: Had error sending msgtype 0x%02x to host #%" PRId32": %s. Not retrying.",
@ -807,11 +739,7 @@ void sleepWrapper1 ( int bogusfd , void *state ) {
// too much on an already saturated network of drives just
// excacerbates the problem. this stuff was originally put here
// to reroute for when a host went down... let's keep it that way
//int32_t ta , nb;
if ( THIS->m_redirectTimeout != -1 ) {
if ( elapsed < THIS->m_redirectTimeout ) return;
goto redirectTimedout;
}
switch ( THIS->m_msgType ) {
// msg to get a summary from a query (calls msg22)
// buzz takes extra long! it calls Msg25 sometimes.
@ -892,7 +820,6 @@ void sleepWrapper1 ( int bogusfd , void *state ) {
return;
}
redirectTimedout:
// cancel any outstanding transactions iff we have a m_replyBuf
// that we must read the reply into because we cannot share!!
if ( THIS->m_readBuf ) {
@ -964,18 +891,18 @@ redirectTimedout:
// C wrapper for the C++ callback
void gotReplyWrapperM1 ( void *state , UdpSlot *slot ) {
Multicast *THIS = (Multicast *)state;
// debug msg
//log("gotReplyWrapperM1 for msg34=%" PRId32,(int32_t)(&THIS->m_msg34));
THIS->gotReply1 ( slot );
THIS->gotReply1 ( slot );
}
// come here if we've got a reply from a host that's not part of a group send
void Multicast::gotReply1 ( UdpSlot *slot ) {
// don't ever let UdpServer free this send buf (it is m_msg)
slot->m_sendBufAlloc = NULL;
// remove the slot from m_slots so it doesn't get nuked in
// gotSlot(slot) routine above
int32_t i = 0;
// careful! we might have recycled a slot!!! start with top and go down
// because UdpServer might give us the same slot ptr on our 3rd try
// that we had on our first try!
@ -990,8 +917,11 @@ void Multicast::gotReply1 ( UdpSlot *slot ) {
log(LOG_LOGIC,"net: multicast: Not our slot 2.");
g_process.shutdownAbort(true);
}
// set m_errnos[i], if any
if ( g_errno ) m_errnos[i] = g_errno;
if ( g_errno ) {
m_errnos[i] = g_errno;
}
// mark it as no longer in progress
m_inProgress[i] = 0;
@ -1002,44 +932,37 @@ void Multicast::gotReply1 ( UdpSlot *slot ) {
m_replyingHost = h;
m_replyLaunchTime = m_launchTime[i];
if ( m_sentToTwin )
log("net: Twin msgType=0x%" PRIx32" (this=0x%" PTRFMT") "
"reply: %s.",
(int32_t)m_msgType,(PTRTYPE)this,mstrerror(g_errno));
if ( m_sentToTwin ) {
log(LOG_DEBUG, "net: Twin msgType=0x%" PRIx32" (this=0x%" PTRFMT") reply: %s.",
(int32_t) m_msgType, (PTRTYPE) this, mstrerror(g_errno));
}
// on error try sending the request to another host
// return if we kicked another request off ok
if ( g_errno ) {
Host *h;
char logIt = true;
// do not log not found on an external network
if ( g_errno == ENOTFOUND ) goto skip;
// log the error
h = g_hostdb.getHost ( slot->m_ip ,slot->m_port );
// do not log if not expected msg20
if ( slot->getMsgType() == msg_type_20 && g_errno == ENOTFOUND && ! ((Msg20 *)m_state)->m_expected ) {
logIt = false;
if ( g_errno != ENOTFOUND ) {
// log the error
Host *h = g_hostdb.getHost(slot->m_ip, slot->m_port);
if (h) {
log(LOG_WARN, "net: Multicast got error in reply from "
"hostId %" PRId32
" (msgType=0x%02x transId=%" PRId32" "
"nice=%" PRId32" net=%s): "
"%s.",
h->m_hostId, slot->getMsgType(), slot->m_transId,
m_niceness,
g_hostdb.getNetName(), mstrerror(g_errno));
} else {
log(LOG_WARN, "net: Multicast got error in reply from %s:%" PRId32" "
"(msgType=0x%02x transId=%" PRId32" nice =%" PRId32" net=%s): "
"%s.",
iptoa(slot->m_ip), (int32_t) slot->m_port,
slot->getMsgType(), slot->m_transId, m_niceness,
g_hostdb.getNetName(), mstrerror(g_errno));
}
}
if ( h && logIt )
log( LOG_WARN, "net: Multicast got error in reply from "
"hostId %" PRId32
" (msgType=0x%02x transId=%" PRId32" "
"nice=%" PRId32" net=%s): "
"%s.",
h->m_hostId, slot->getMsgType(), slot->m_transId,
m_niceness,
g_hostdb.getNetName(),mstrerror(g_errno ));
else if ( logIt )
log( LOG_WARN, "net: Multicast got error in reply from %s:%" PRId32" "
"(msgType=0x%02x transId=%" PRId32" nice =%" PRId32" net=%s): "
"%s.",
iptoa(slot->m_ip), (int32_t)slot->m_port,
slot->getMsgType(), slot->m_transId, m_niceness,
g_hostdb.getNetName(),mstrerror(g_errno) );
skip:
// if this slot had an error we may have to tell UdpServer
// not to free the read buf
if ( m_replyBuf == slot->m_readBuf ) slot->m_readBuf = NULL;
// . try to send to another host
// . on successful sending return, we'll be called on reply
// . this also returns false if no new hosts left to send to
@ -1075,22 +998,13 @@ void Multicast::gotReply1 ( UdpSlot *slot ) {
sendToTwin = false;
}
// do not worry if it was a not found msg20 for a titleRec
// which was not expected to be there
if ( ! logIt ) sendToTwin = false;
// no longer do this for titledb, too common since msg4
// cached stuff can make us slightly out of sync
//if ( g_errno == ENOTFOUND )
// sendToTwin = false;
// do not send to twin if we are out of time
time_t now = getTime();
int32_t timeRemaining = m_startTime + m_totalTimeout - now;
if ( timeRemaining <= 0 ) sendToTwin = false;
// send to the twin
if ( sendToTwin && sendToHostLoop(0,-1,-1) ) {
log("net: Trying to send request msgType=0x%" PRIx32" "
"to a twin. (this=0x%" PTRFMT")",
log(LOG_INFO, "net: Trying to send request msgType=0x%" PRIx32" to a twin. (this=0x%" PTRFMT")",
(int32_t)m_msgType,(PTRTYPE)this);
m_sentToTwin = true;
// . keep stats
@ -1104,64 +1018,66 @@ void Multicast::gotReply1 ( UdpSlot *slot ) {
// . otherwise we've failed on all hosts
// . re-instate g_errno,might have been set by sendToHostLoop()
g_errno = m_errnos[i];
// unregister our sleep wrapper if we did
//if ( m_registeredSleep ) {
// g_loop.unregisterSleepCallback ( this, sleepWrapper1 );
// m_registeredSleep = false;
//}
// destroy all slots that may be in progress (except "slot")
//destroySlotsInProgress ( slot );
// call callback with g_errno set
//if ( m_callback ) m_callback ( m_state );
// we're done, all slots should be destroyed by UdpServer
//return;
}
closeUpShop ( slot );
}
void Multicast::closeUpShop ( UdpSlot *slot ) {
// sanity check
if ( ! m_inUse ) { g_process.shutdownAbort(true); }
if (!m_inUse) {
g_process.shutdownAbort(true);
}
// destroy the OTHER slots we've spawned that are in progress
destroySlotsInProgress ( slot );
// if we have no slot per se, skip this stuff
if ( ! slot ) goto skip;
// . now we have a good reply... but not if g_errno is set
// . save the reply of this slot here
// . this is bad if we got an g_errno above, it will set the slot's
// readBuf to NULL up there, and that will make m_readBuf NULL here
// causing a mem leak. i fixed by adding an mfree on m_replyBuf
// in Multicast::reset() routine.
// . i fixed again by ensuring we do not set m_ownReadBuf to false
// in getBestReply() below if m_readBuf is NULL
m_readBuf = slot->m_readBuf;
m_readBufSize = slot->m_readBufSize;
m_readBufMaxSize = slot->m_readBufMaxSize;
// . if the slot had an error, propagate it so it will be set when
// we call the callback.
if(!g_errno) g_errno = slot->m_errno;
// . sometimes UdpServer will read the reply into a temporary buffer
// . this happens if the udp server is hot (async signal based) and
// m_replyBuf is NULL because he cannot malloc a buf to read into
// because malloc is not async signal safe
if ( slot->m_tmpBuf == slot->m_readBuf ) m_freeReadBuf = false;
// don't let UdpServer free the readBuf now that we point to it
slot->m_readBuf = NULL;
// save slot so msg4 knows what slot replied in udpserver
// for doing its flush callback logic
m_slot = slot;
if (slot) {
// . now we have a good reply... but not if g_errno is set
// . save the reply of this slot here
// . this is bad if we got an g_errno above, it will set the slot's
// readBuf to NULL up there, and that will make m_readBuf NULL here
// causing a mem leak. i fixed by adding an mfree on m_replyBuf
// in Multicast::reset() routine.
// . i fixed again by ensuring we do not set m_ownReadBuf to false
// in getBestReply() below if m_readBuf is NULL
m_readBuf = slot->m_readBuf;
m_readBufSize = slot->m_readBufSize;
m_readBufMaxSize = slot->m_readBufMaxSize;
// . if the slot had an error, propagate it so it will be set when
// we call the callback.
if (!g_errno) {
g_errno = slot->m_errno;
}
// . sometimes UdpServer will read the reply into a temporary buffer
// . this happens if the udp server is hot (async signal based) and
// m_replyBuf is NULL because he cannot malloc a buf to read into
// because malloc is not async signal safe
if (slot->m_tmpBuf == slot->m_readBuf) {
m_freeReadBuf = false;
}
// don't let UdpServer free the readBuf now that we point to it
slot->m_readBuf = NULL;
// save slot so msg4 knows what slot replied in udpserver
// for doing its flush callback logic
m_slot = slot;
}
skip:
// unregister our sleep wrapper if we did
if ( m_registeredSleep ) {
g_loop.unregisterSleepCallback ( this , sleepWrapper1 );
m_registeredSleep = false;
}
if ( ! g_errno && m_retryCount > 0 )
log("net: Multicast succeeded after %" PRId32" retries.",m_retryCount);
if ( ! g_errno && m_retryCount > 0 ) {
log(LOG_INFO, "net: Multicast succeeded after %" PRId32" retries.", m_retryCount);
}
// allow us to be re-used now, callback might relaunch
m_inUse = false;
// now call the user callback if it exists
if ( m_callback ) {
m_callback ( m_state , m_state2 );
@ -1187,10 +1103,6 @@ void Multicast::destroySlotsInProgress ( UdpSlot *slot ) {
// don't free his sendBuf, readBuf is ok to free, however
m_slots[i]->m_sendBufAlloc = NULL;
// if caller provided the buffer, don't free it cuz "slot"
// contains it (or m_readBuf)
if ( m_replyBuf == m_slots[i]->m_readBuf )
m_slots[i]->m_readBuf = NULL;
// destroy this slot that's in progress
g_udpServer.destroySlot ( m_slots[i] );
// do not re-destroy. consider no longer in progress.
@ -1203,7 +1115,9 @@ void Multicast::destroySlotsInProgress ( UdpSlot *slot ) {
char *Multicast::getBestReply(int32_t *replySize, int32_t *replyMaxSize, bool *freeReply, bool steal) {
*replySize = m_readBufSize;
*replyMaxSize = m_readBufMaxSize;
if(steal) m_freeReadBuf = false;
if(steal) {
m_freeReadBuf = false;
}
*freeReply = m_freeReadBuf;
// this can be NULL if we destroyed the slot in progress only to
// try another host who was dead!

@ -87,17 +87,7 @@ class Multicast {
int64_t totalTimeout , //relative timeout in milliseconds
int32_t niceness ,
int32_t firstHostId = -1 ,// first host to try
char *replyBuf = NULL ,
int32_t replyBufMaxSize = 0 ,
bool freeReplyBuf = true ,
bool doDiskLoadBalancing = false ,
int32_t maxCacheAge = -1 , // no age limit
key_t cacheKey = 0 ,
char rdbId = 0 , // bogus rdbId
int32_t minRecSizes = -1 ,// unknown read size
bool sendToSelf = true ,// if we should.
int32_t redirectTimeout = -1 ,
class Host *firstProxyHost = NULL );
bool freeReplyBuf = true );
// . get the reply from your NON groupSend
// . if *freeReply is true then you are responsible for freeing this
@ -115,7 +105,7 @@ class Multicast {
void destroySlotsInProgress ( UdpSlot *slot );
// keep these public so C wrapper can call them
bool sendToHostLoop ( int32_t key, int32_t hostNumToTry, int32_t firstHostId );
bool sendToHostLoop(int32_t key, int32_t hostNumToTry, int32_t firstHostId);
bool sendToHost ( int32_t i );
int32_t pickBestHost ( uint32_t key , int32_t hostNumToTry );
void gotReply1 ( UdpSlot *slot ) ;
@ -167,11 +157,6 @@ class Multicast {
int32_t m_readBufSize;
int32_t m_readBufMaxSize;
// if caller passes in a reply buf then we reference it here
char *m_replyBuf;
int32_t m_replyBufSize;
int32_t m_replyBufMaxSize;
// we own it until caller calls getBestReply()
bool m_ownReadBuf;
// are we registered for a callback every 1 second
@ -189,9 +174,6 @@ class Multicast {
int32_t m_key;
//bool m_doDiskLoadBalancing;
char m_rdbId ;
// Msg1 might be able to add data to our tree to save a net trans.
bool m_sendToSelf;
@ -199,7 +181,6 @@ class Multicast {
char m_sentToTwin;
int32_t m_redirectTimeout;
char m_inUse;
// for linked list of available Multicasts in Msg4.cpp

@ -278,8 +278,6 @@ bool Msg7::sendInjectionRequestToHost ( InjectionRequest *ir ,
udpserver_sendrequest_infinite_timeout , // timeout
-1 , // backoff
-1 , // maxwait
NULL, // replybuf
0, // replybufmaxsize
MAX_NICENESS // niceness
) )
// we also return true on success, false on error

@ -1400,7 +1400,7 @@ bool sendPageStats ( TcpSocket *s , HttpRequest *r ) {
// print each msg stat
for ( int32_t i1 = 0 ; i1 < MAX_MSG_TYPES ; i1++ ) {
// skip it if has no handler
if ( ! g_udpServer.m_handlers[i1] ) continue;
if ( ! g_udpServer.hasHandler(i1) ) continue;
if ( ! g_stats.m_reroutes [i1][i3] &&
! g_stats.m_packetsIn [i1][i3] &&
! g_stats.m_packetsOut [i1][i3] &&
@ -1559,7 +1559,7 @@ bool sendPageStats ( TcpSocket *s , HttpRequest *r ) {
// print each msg stat
for ( int32_t i1 = 0 ; i1 < MAX_MSG_TYPES ; i1++ ) {
// skip it if has no handler
if ( ! g_udpServer.m_handlers[i1] ) continue;
if ( ! g_udpServer.hasHandler(i1) ) continue;
// skip if xml
if ( format != FORMAT_HTML ) continue;
// print it all out.
@ -1628,7 +1628,7 @@ bool sendPageStats ( TcpSocket *s , HttpRequest *r ) {
// only html
if ( format != FORMAT_HTML ) break;
// skip it if has no handler
if ( ! g_udpServer.m_handlers[i1] ) continue;
if ( ! g_udpServer.hasHandler(i1) ) continue;
// print it all out
int64_t total = g_stats.m_msgTotalOfQueuedTimes[i1][i3];
int64_t nt = g_stats.m_msgTotalQueued [i1][i3];
@ -1696,7 +1696,7 @@ bool sendPageStats ( TcpSocket *s , HttpRequest *r ) {
}
// skip it if has no handler
if ( ! g_udpServer.m_handlers[i1] ) {
if ( ! g_udpServer.hasHandler(i1) ) {
continue;
}

@ -5045,6 +5045,18 @@ void Parms::init ( ) {
m->m_obj = OBJ_CONF;
m++;
m->m_title = "Use new no-in-memory-merge feature";
m->m_desc = "Posdb will no longer contain delete keys, and the entire document is indexed every time a change is found.";
m->m_cgi = "noinmemmerge";
m->m_off = offsetof(Conf,m_noInMemoryPosdbMerge);
m->m_type = TYPE_BOOL;
m->m_def = "0";
m->m_page = PAGE_MASTER;
m->m_obj = OBJ_CONF;
m++;
m->m_title = "injections enabled";
m->m_desc = "Controls injecting for all collections";
m->m_cgi = "injen";
@ -5462,20 +5474,6 @@ void Parms::init ( ) {
m->m_obj = OBJ_CONF;
m++;
m->m_title = "sendmail IP";
m->m_desc = "We send crawlbot notification emails to this sendmail "
"server which forwards them to the specified email address.";
m->m_cgi = "smip";
m->m_off = offsetof(Conf,m_sendmailIp);
m->m_type = TYPE_STRING;
m->m_def = "";
m->m_size = sizeof(Conf::m_sendmailIp);
m->m_flags = PF_HIDDEN | PF_NOSAVE;
m->m_page = PAGE_MASTER;
m->m_obj = OBJ_CONF;
m++;
m->m_title = "send email alerts";
m->m_desc = "Sends emails to admin if a host goes down.";
m->m_cgi = "sea";
@ -6019,20 +6017,6 @@ void Parms::init ( ) {
m->m_obj = OBJ_CONF;
m++;
m->m_title = "twins are split";
m->m_desc = "If enabled, Gigablast assumes the first half of "
"machines in hosts.conf "
"are on a different network switch than the second half, "
"and minimizes transmits between the switches.";
m->m_cgi = "stw";
m->m_off = offsetof(Conf,m_splitTwins);
m->m_type = TYPE_BOOL;
m->m_def = "0";
m->m_flags = PF_HIDDEN | PF_NOSAVE;
m->m_page = PAGE_MASTER;
m->m_obj = OBJ_CONF;
m++;
m->m_title = "do out of memory testing";
m->m_desc = "When enabled Gigablast will randomly fail at "
"allocating memory. Used for testing stability.";
@ -11564,8 +11548,6 @@ bool Parms::doParmSendingLoop ( ) {
30*1000 , // timeout msecs
-1 , // backoff
-1 , // maxwait
NULL , // replybuf
0 , // replybufmaxsize
0 ) ) { // niceness
log("parms: faild to send: %s",mstrerror(g_errno));
continue;

@ -11,7 +11,6 @@ int32_t klogctl( int, char *,int ) { return 0; }
#include "PingServer.h"
#include "UdpServer.h"
//#include "Sync.h"
#include "Conf.h"
#include "HttpServer.h"
#include "HttpMime.h"
@ -44,16 +43,7 @@ static void handleRequest11 ( UdpSlot *slot , int32_t niceness ) ;
static void gotReplyWrapperP2 ( void *state , UdpSlot *slot );
static void gotReplyWrapperP3 ( void *state , UdpSlot *slot );
static void updatePingTime ( Host *h , int32_t *pingPtr , int32_t tripTime ) ;
// JAB: warning abatement
//static bool pageTMobile ( Host *h , char *errmsg ) ;
//static bool pageAlltel ( Host *h , char *errmsg , char *num ) ;
//static bool pageVerizon ( Host *h , char *errmsg ) ;
//static void verizonWrapper ( void *state , TcpSocket *ts ) ;
//static bool pageVerizon2 ( void *state , TcpSocket *s ) ;
// JAB: warning abatement
//static bool pageSprintPCS ( Host *h , char *errmsg , char *num ) ;
//static void sprintPCSWrapper ( void *state , TcpSocket *ts ) ;
//static bool pageSprintPCS2 ( void *state , TcpSocket *ts ) ;
static bool sendAdminEmail ( Host *h, const char *fromAddress,
const char *toAddress, char *body ,
const char *emailServIp );
@ -62,18 +52,21 @@ bool PingServer::registerHandler ( ) {
// . we'll handle msgTypes of 0x11 for pings
// . register ourselves with the udp server
// . it calls our callback when it receives a msg of type 0x0A
// . it is now hot..., no, not hot anymore
//if ( ! g_udpServer2.registerHandler ( 0x11, handleRequest11 ))
// return false;
// register on low priority server to make transition easier
if ( ! g_udpServer.registerHandler ( msg_type_11, handleRequest11 ))
if ( ! g_udpServer.registerHandler ( msg_type_11, handleRequest11 )) {
return false;
}
// limit this to 1000ms
if ( g_conf.m_pingSpacer > 1000 ) g_conf.m_pingSpacer = 1000;
if ( g_conf.m_pingSpacer > 1000 ) {
g_conf.m_pingSpacer = 1000;
}
// save this
m_pingSpacer = g_conf.m_pingSpacer;
// this starts off at zero
m_callnum = 0;
// . this was 500ms but now when a host shuts downs it sends all other
// hosts a msg saying so... PingServer::broadcastShutdownNotes() ...
// . so i put it up to 2000ms to save bandwidth some
@ -85,23 +78,8 @@ bool PingServer::registerHandler ( ) {
return false;
}
// if not host #0, we're done
if ( g_hostdb.m_hostId != 0 ) return true;
// we have disabled syncing for now
return true;
// . this is called ever 10 minutes
// . it tells all hosts to store a sync point at about the same time
// . the sync point is a fixed value in time used as a reference
// for performing incremental synchronization by Sync.cpp should
// a host go down without saving. one of its twins must of course
// remain up in order to sync him.
//if ( ! g_loop.registerSleepCallback ( SYNC_TIME*1000 , // ms
// NULL ,
// sleepWrapper10 ) )
// return false;
//return true;
}
static int32_t s_outstandingPings = 0;
@ -418,8 +396,6 @@ void PingServer::pingHost ( Host *h , uint32_t ip , uint16_t port ) {
g_conf.m_deadHostTimeout ,
1000 , // backoff
2000 , // max wait
NULL , // reply buf
0 , // reply buf size
0 )) // niceness
return;
// it had an error, so dec the count
@ -616,8 +592,6 @@ void gotReplyWrapperP ( void *state , UdpSlot *slot ) {
g_conf.m_deadHostTimeout ,
1000 , // backoff
2000 , // max wait
NULL , // reply buf
0 , // reply buf size
0 )) // niceness
return;
// he came back right away
@ -1378,8 +1352,6 @@ bool PingServer::broadcastShutdownNotes ( bool sendEmailAlert ,
3000 , // 3 sec timeout
-1 , // default backoff
-1 , // default maxwait
NULL , // reply buf
0 , // reply buf size
0 ))// niceness
continue;
// otherwise, had an error
@ -1414,8 +1386,6 @@ bool PingServer::broadcastShutdownNotes ( bool sendEmailAlert ,
3000 , // 3 sec timeout
-1 , // default backoff
-1 , // default maxwait
NULL , // reply buf
0 , // reply buf size
0 ))// niceness
continue;
// otherwise, had an error
@ -1455,100 +1425,6 @@ void gotReplyWrapperP2 ( void *state , UdpSlot *slot ) {
// the files were created. this if for doing incremental synchronization.
// all "sync points" are from host #0's clock.
// ensure not too many sync point store requests off at once
static int32_t s_outstandingTaps = 0;
static char s_lastSyncPoint [ 9 ];
static int32_t s_nextTapHostId ;
static void tapLoop ( ) ;
static void gotTapReplyWrapper ( void *state , UdpSlot *slot ) ;
void tapLoop ( ) {
// . don't use more than 16 UdpSlots for tapping
// . don't use more than numHosts UdpSlots for tapping
int32_t max = g_hostdb.getNumHosts();
if ( max > 16 ) max = 16;
loop:
if ( s_outstandingTaps >= max ) return;
// cycle through pinging different hosts
int32_t n = g_hostdb.getNumHosts();
// if done sending requests then just return
if ( s_nextTapHostId >= n ) return;
// otherwise, tap this guy
g_pingServer.tapHost ( s_nextTapHostId );
// inc next host to ping
s_nextTapHostId++;
// do as many in a row as we can
goto loop;
}
void gotTapReplyWrapper ( void *state , UdpSlot *slot ) {
// it came back
s_outstandingTaps--;
// don't let udp server free our send buf, we own it
slot->m_sendBufAlloc = NULL;
// discard errors
g_errno = 0;
// loop to do more if we need to
tapLoop ( );
}
// ping host #i
void PingServer::tapHost ( int32_t hostId ) {
// don't ping on interface machines
if ( g_conf.m_interfaceMachine ) return;
// don't tap ourselves
//if ( hostId == g_hostdb.m_hostId ) return;
// watch for out of bounds
if ( hostId < 0 || hostId >= g_hostdb.getNumHosts() ) return;
// get host ptr
Host *h = g_hostdb.getHost ( hostId );
// return if NULL
if ( ! h ) return;
// don't tap again if already in progress
//if ( h->m_inTapProgress ) return;
// count it
s_outstandingTaps++;
// consider it in progress
//h->m_inProgress = true;
// . launch one
// . returns false and sets errno on error
// . returns true if request sent and it blocked
// . size of 2 is unique to a tap
// . use MsgType of 0x11 for pinging
// . we now use the high-prioirty server, g_udpServer2
// . now we send over our current time so remote host will sync up
// with us
// . only sync up with hostId #0
// . if he goes down its ok because time is mostly important for
// spidering and spidering is suspended if a host is down
if ( g_udpServer.sendRequest ( s_lastSyncPoint ,
9 ,
msg_type_11 ,
h->m_ip ,
h->m_port ,
h->m_hostId ,
NULL ,
(void *)(PTRTYPE)h->m_hostId,// cb state
gotTapReplyWrapper ,
30000 , // timeout
1000 , // backoff
10000 , // max wait
NULL , // reply buf
0 , // reply buf size
0 ))// niceness
return;
// it had an error, so dec the count
s_outstandingTaps--;
// consider it out of progress
//h->m_inTapProgress = false;
// had an error
log("net: Had error sending sync point request to host #%" PRId32": %s.",
h->m_hostId,mstrerror(g_errno) );
// reset it cuz it's not a showstopper
g_errno = 0;
}
// if its status changes from dead to alive or vice versa, we have to
// update g_hostdb.m_numHostsAlive. Dns.cpp and Msg17 will use this count
void updatePingTime ( Host *h , int32_t *pingPtr , int32_t tripTime ) {
@ -1731,112 +1607,3 @@ void PingServer::sendEmailMsg ( int32_t *lastTimeStamp , const char *msg ) {
*lastTimeStamp = now;
return;
}
/////////////////
//
// for sending email notifications to external addresses
//
///////////////////
/*
bool gotMxIp ( EmailInfo *ei ) ;
void gotMxIpWrapper ( void *state , int32_t ip ) {
EmailInfo *ei = (EmailInfo *)state;
// i guess set it
ei->m_mxIp = ip;
// handle it
if ( ! gotMxIp ( ei ) ) return;
// did not block, call callback
ei->m_callback ( ei->m_state );
}
*/
void doneSendingEmailWrapper ( void *state , TcpSocket *sock ) {
if ( g_errno )
log("crawlbot: error sending email = %s",mstrerror(g_errno));
// log the reply
if ( sock && sock->m_readBuf )
log("crawlbot: got socket reply=%s", sock->m_readBuf);
EmailInfo *ei = (EmailInfo *)state;
ei->m_callback ( ei->m_state );
}
// returns false if blocked, true otherwise
bool sendEmail ( class EmailInfo *ei ) {
// this is often set from XmlDoc.cpp::indexDoc()
g_errno = 0;
char *to = ei->m_toAddress.getBufStart();
char *dom = strstr(to,"@");
if ( ! dom || ! dom[1] ) {
g_errno = EBADENGINEER;
log("email: missing @ sign in email %s",to);
return true;
}
// point to domain
dom++;
// ref that for printing HELO line
ei->m_dom = dom;
// just send it to a sendmail server which will forward it,
// because a lot of email servers don't like us connecting directly
// beause i think our IP address does not match that of our
// MX ip for our domain? sendmail must be configured to allow
// forwarding if it receives an email from the IP of host #0
// in the cluster.
ei->m_mxIp = atoip(g_conf.m_sendmailIp);
// wtf?
if ( ei->m_mxIp == 0 ) {
log("crawlbot: got bad MX ip of 0 for %s",
ei->m_mxDomain.getBufStart());
return true;
}
// label alloc'd mem with gotmxip in case of mem leak
SafeBuf sb;//("gotmxip");
// helo line
sb.safePrintf("HELO %s\r\n",ei->m_dom);
// mail line
sb.safePrintf( "MAIL FROM:<%s>\r\n", ei->m_fromAddress.getBufStart());
// to line
sb.safePrintf( "RCPT TO:<%s>\r\n", ei->m_toAddress.getBufStart());
// data
sb.safePrintf( "DATA\r\n");
// body
sb.safePrintf( "To: %s\r\n", ei->m_toAddress.getBufStart());
sb.safePrintf( "Subject: %s\r\n",ei->m_subject.getBufStart());
// mime header must be separated from body by an extra \r\n
sb.safePrintf( "\r\n");
sb.safePrintf( "%s", ei->m_body.getBufStart() );
// quit
sb.safePrintf( "\r\n.\r\nQUIT\r\n\r\n");
// send the message
TcpServer *ts = g_httpServer.getTcp();
log ( LOG_WARN, "crawlbot: Sending email to %s (MX IP=%s):\n %s",
ei->m_toAddress.getBufStart(),
iptoa(ei->m_mxIp),
sb.getBufStart() );
// make a temp string
SafeBuf mxIpStr;
mxIpStr.safePrintf("%s",iptoa(ei->m_mxIp) );
if ( !ts->sendMsg( mxIpStr.getBufStart(), mxIpStr.length(), 25, sb.getBufStart(), sb.getCapacity(),
sb.getLength(), sb.getLength(), ei, doneSendingEmailWrapper, 60 * 1000, 100 * 1024,
100 * 1024 ) ) {
// tcpserver will free it, so prevent double free with detach
sb.detachBuf();
return false;
}
// error? if no error, it was a successful write and it done,
// otherwise we will want to free the sendbuf here so do not
// call detachBuf()
if ( ! g_errno )
// tcpserver will free it, so prevent double free with detach
sb.detachBuf();
// we did not block
return true;
}

@ -9,26 +9,6 @@
extern char g_repairMode;
class EmailInfo {
public:
SafeBuf m_toAddress;
SafeBuf m_fromAddress;
SafeBuf m_subject;
SafeBuf m_body;
SafeBuf m_spiderStatusMsg;
collnum_t m_collnum;
char *m_dom; // ref into m_toAddress of the domain in email addr
SafeBuf m_mxDomain; // just the domain with a "gbmxrec-" prepended
void *m_state;
void (* m_callback ) (void *state);
void *m_finalState;
void (* m_finalCallback ) (void *state);
// ip address of MX record for this domain
int32_t m_mxIp;
int32_t m_notifyBlocked;
class CollectionRec *m_collRec;
};
class PingServer {
public:

@ -198,11 +198,10 @@ bool Proxy::handleRequest (TcpSocket *s){
// . it only redirects there if the raw/code/site/sites is NULL
*g_conf.m_redirect != '\0' &&
hr.getLong("xml", -1) == -1 &&
hr.getLong("raw", -1) == -1 &&
hr.getString("code") == NULL &&
hr.getString("site") == NULL &&
hr.getString("sites") == NULL) {
//direct all non-raw, non admin traffic away.
//direct all non-xml, non admin traffic away.
redir = g_conf.m_redirect;
redirLen = strlen(g_conf.m_redirect);
}
@ -310,9 +309,8 @@ bool Proxy::handleRequest (TcpSocket *s){
stC->m_hostId = -1;
stC->m_slot = NULL;
// support &xml=1 or &raw=9 or &raw=8 to indicate xml output is wanted
// support &xml=1 to indicate xml output is wanted
stC->m_raw = hr.getLong ( "xml", 0 );
stC->m_raw = hr.getLong("raw",stC->m_raw);
stC->m_s = s;
@ -449,34 +447,6 @@ bool Proxy::forwardRequest ( StateControl *stC ) {
dstId = -1;
}
// rewrite &xml=1 as &raw=8 so old search engine sends back xml
if ( req[0]=='G' &&
req[1]=='E' &&
req[2]=='T' &&
req[3] == ' ' ) {
// replace &xml=1 in request with &raw=8 to support others
char *p = req + 4;
char *pend = req + reqSize;
// skip GET
for ( ; p < pend ; p++ ) {
// stop after url is over
if ( *p == ' ' ) break;
// match?
if ( p[0] != '?' && p[0] != '&' ) continue;
if ( p[1] != 'x' ) continue;
if ( p[2] != 'm' ) continue;
if ( p[3] != 'l' ) continue;
if ( p[4] != '=' ) continue;
if ( p[5] != '1' ) continue;
p[1] = 'r';
p[2] = 'a';
p[3] = 'w';
p[5] = '9';
break;
}
}
// . let's use the udp server instead because it quickly switches
// to using eth1 if eth0 does two or more resends without an ACK,
// and vice versa. this ensure that if a network switch fails then
@ -500,8 +470,6 @@ bool Proxy::forwardRequest ( StateControl *stC ) {
stC->m_timeout ,
-1 , // backoff
-1 , // maxwait
NULL , // replyBuf
0 , // replyBufMaxSize
0 , // niceness
4 );// maxResends

@ -1867,7 +1867,7 @@ bool sendReply2 ( void *state ) {
char buf[1024*32];
SafeBuf sb(buf, 1024*32);
// do they want an xml reply?
if( r->getLong("xml",0) ) { // was "raw"
if( r->getLong("xml",0) ) {
sb.safePrintf("<?xml version=\"1.0\" "
"encoding=\"ISO-8859-1\"?>\n"
"<response>\n");

@ -322,57 +322,68 @@ bool UdpServer::sendRequest(char *msg,
int64_t timeout, // in milliseconds
int16_t backoff,
int16_t maxWait,
char *replyBuf,
int32_t replyBufMaxSize,
int32_t niceness,
int32_t maxResends) {
// sanity check
if ( ! m_handlers[msgType] && this == &g_udpServer &&
// proxy forwards the msg10 to a host in the cluster
! g_proxy.isProxy() ) {
// proxy forwards the msg10 to a host in the cluster
if ( ! m_handlers[msgType] && this == &g_udpServer && ! g_proxy.isProxy() ) {
g_process.shutdownAbort(true);
}
// NULLify slot if any
if ( retslot ) *retslot = NULL;
if ( retslot ) {
*retslot = NULL;
}
// if shutting down return an error
if ( m_isShuttingDown ) {
g_errno = ESHUTTINGDOWN;
return false;
}
// ensure timeout ok
if ( timeout < 0 ) {
//g_errno = EBADENGINEER;
log(LOG_LOGIC,"udp: sendrequest: Timeout is negative. ");
g_process.shutdownAbort(true);
}
// . we only allow niceness 0 or 1 now
// . this niceness is only used for makeCallbacks_ass()
if ( niceness > 1 ) niceness = 1;
if ( niceness < 0 ) niceness = 0;
// get a new transId
int32_t transId = getTransId();
// set up shotgunning for this hostId
Host *h = NULL;
uint32_t ip2 = ip;
//if ( g_conf.m_useShotgun && hostId >= 0 ) {
// . now we always set UdpSlot::m_host
// . hostId is -1 when sending to a host in g_hostdb2 (hosts2.conf)
if ( hostId >= 0 ) h = g_hostdb.getHost ( hostId );
if ( hostId >= 0 ) {
h = g_hostdb.getHost ( hostId );
}
// get it from g_hostdb2 then via ip lookup if still NULL
if ( ! h ) h = g_hostdb.getHost ( ip , port );
if ( ! h ) {
h = g_hostdb.getHost ( ip , port );
}
// sanity check
if ( h && ip && ip != (uint32_t)-1 && h->m_ip != ip &&
h->m_ipShotgun != ip && ip != 0x0100007f ) { // "127.0.0.1"
if ( h && ip && ip != (uint32_t)-1 && h->m_ip != ip && h->m_ipShotgun != ip && ip != 0x0100007f ) { // "127.0.0.1"
log(LOG_LOGIC,"udp: provided hostid does not match ip");
g_process.shutdownAbort(true);
}
// ok, we are probably sending a dns request to a dns server...
//if ( ! h ) { g_process.shutdownAbort(true); }
// always use the primary ip for making the key,
// do not use the shotgun ip. because we can be getting packets
// from either ip for the same transaction.
if ( h ) ip2 = h->m_ip;
if ( h ) {
ip2 = h->m_ip;
}
// make a key for this new slot
key_t key = m_proto->makeKey (ip2,port,transId,true/*weInitiated?*/);
@ -392,34 +403,30 @@ bool UdpServer::sendRequest(char *msg,
// . get time
int64_t now = gettimeofdayInMillisecondsLocal();
// connect to the ip/port (udp-style: does not do much)
slot->connect ( m_proto, ip, port, h, hostId, transId, timeout, now ,
niceness );
slot->connect(m_proto, ip, port, h, hostId, transId, timeout, now, niceness);
// . use default callback if none provided
// . slot has a callback iff it's an outgoing request
if ( ! callback ) callback = defaultCallbackWrapper;
if ( ! callback ) {
callback = defaultCallbackWrapper;
}
// set up for a send
if ( ! slot->sendSetup( msg ,
msgSize ,
msg ,
msgSize ,
msgType ,
now ,
state ,
callback ,
niceness ,
backoff ,
maxWait ,
replyBuf ,
replyBufMaxSize ) ) {
if (!slot->sendSetup(msg, msgSize, msg, msgSize, msgType, now, state, callback, niceness, backoff, maxWait)) {
freeUdpSlot_ass ( slot );
log( LOG_WARN, "udp: Failed to initialize udp socket for sending req: %s",mstrerror(g_errno));
return false;
}
if ( slot->m_next3 || slot->m_prev3 ) { g_process.shutdownAbort(true); }
if (slot->m_next3 || slot->m_prev3) {
g_process.shutdownAbort(true);
}
// set this
slot->m_maxResends = maxResends;
// keep sending dgrams until we have no more or hit ACK_WINDOW limit
if ( ! doSending_ass ( slot , true /*allow resends?*/ , now ) ) {
freeUdpSlot_ass ( slot );
@ -427,17 +434,11 @@ bool UdpServer::sendRequest(char *msg,
return false;
}
// debug msg
//int64_t now = gettimeofdayInMilliseconds();
//log("***added node #%" PRId32", isTimedOut=%" PRId32"\n",node,
//slot->isTimedOut(now));
// let caller know the slot if he wants to
if ( retslot ) *retslot = slot;
// debug msg
//log("UdpServer added slot to send on, key={%" PRId32",%" PRId64"},"
//"msgType=0x%02x\n",
//key.n1,key.n0, msgType );
// success
if ( retslot ) {
*retslot = slot;
}
return true;
}
@ -533,19 +534,7 @@ void UdpServer::sendReply_ass ( char *msg ,
// . use a NULL callback since we're sending a reply
// . set up for a send
if ( ! slot->sendSetup ( msg ,
msgSize ,
alloc ,
allocSize ,
slot->getMsgType() ,
now ,
NULL ,
NULL ,
slot->m_niceness ,
backoff ,
maxWait ,
NULL ,
0 ) ) {
if (!slot->sendSetup(msg, msgSize, alloc, allocSize, slot->getMsgType(), now, NULL, NULL, slot->m_niceness, backoff, maxWait)) {
log( LOG_WARN, "udp: Failed to initialize udp socket for sending reply: %s", mstrerror(g_errno));
mfree ( alloc , allocSize , "UdpServer");
// was EBADENGINEER
@ -2454,8 +2443,6 @@ bool UdpServer::timeoutDeadHosts ( Host *h ) {
// verified that this is not interruptible
UdpSlot *UdpServer::getEmptyUdpSlot_ass ( key_t k , bool incoming ) {
// tmp debug
//if ( (rand() % 10) == 1 ) slot = NULL
// return NULL if none left
if ( ! m_head ) {
g_errno = ENOSLOTS;
@ -2465,14 +2452,12 @@ UdpSlot *UdpServer::getEmptyUdpSlot_ass ( key_t k , bool incoming ) {
}
return NULL;
}
UdpSlot *slot = m_head;
// remove from linked list of available slots
m_head = m_head->m_next;
// add to linked list of used slots
//slot->m_next2 = m_head2;
//slot->m_prev2 = NULL;
//if ( m_head2 ) m_head2->m_prev2 = slot;
//m_head2 = slot;
// put the used slot at the tail so older slots are at the head and
// makeCallbacks() can take care of the callbacks that have been
// waiting the longest first...
@ -2481,30 +2466,26 @@ UdpSlot *UdpServer::getEmptyUdpSlot_ass ( key_t k , bool incoming ) {
slot->m_prev2 = m_tail2;
m_tail2->m_next2 = slot;
m_tail2 = slot;
}
else {
} else {
slot->m_next2 = NULL;
slot->m_prev2 = NULL;
m_head2 = slot;
m_tail2 = slot;
}
// also to callback candidates if we should
// if ( hasCallback ) {
// slot->m_next3 = m_head3;
// slot->m_prev3 = NULL;
// if ( m_head3 ) m_head3->m_prev3 = slot;
// m_head3 = slot;
// }
// count it
m_numUsedSlots++;
if ( incoming ) m_numUsedSlotsIncoming++;
if ( incoming ) {
m_numUsedSlotsIncoming++;
}
slot->m_incoming = incoming;
// now store ptr in hash table
slot->m_key = k;
addKey ( k , slot );
addKey(k, slot);
return slot;
}
@ -2533,25 +2514,22 @@ UdpSlot *UdpServer::getUdpSlot ( key_t k ) {
void UdpServer::addToCallbackLinkedList ( UdpSlot *slot ) {
// debug log
if ( g_conf.m_logDebugUdp && slot->m_errno )
log("udp: adding slot with err = %s to callback list"
, mstrerror(slot->m_errno) );
log("udp: adding slot with err = %s to callback list", mstrerror(slot->m_errno) );
if ( g_conf.m_logDebugUdp )
log("udp: adding slot=%" PTRFMT" to callback list"
,(PTRTYPE)slot);
log("udp: adding slot=%" PTRFMT" to callback list", (PTRTYPE)slot);
// must not be in there already, lest we double add it
if ( isInCallbackLinkedList ( slot ) ) {
if ( g_conf.m_logDebugUdp )
log("udp: avoided double add slot=%" PTRFMT
,(PTRTYPE)slot);
logDebug(g_conf.m_logDebugUdp, "udp: avoided double add slot=%" PTRFMT,(PTRTYPE)slot);
return;
}
slot->m_next3 = NULL;
slot->m_prev3 = NULL;
if ( ! m_tail3 ) {
m_head3 = slot;
m_tail3 = slot;
}
else {
} else {
// insert at end of linked list otherwise
m_tail3->m_next3 = slot;
slot->m_prev3 = m_tail3;
@ -2568,10 +2546,7 @@ bool UdpServer::isInCallbackLinkedList ( UdpSlot *slot ) {
}
void UdpServer::removeFromCallbackLinkedList ( UdpSlot *slot ) {
if ( g_conf.m_logDebugUdp )
log("udp: removing slot=%" PTRFMT" from callback list"
,(PTRTYPE)slot);
logDebug(g_conf.m_logDebugUdp, "udp: removing slot=%" PTRFMT" from callback list",(PTRTYPE)slot);
// return if not in the linked list
if ( slot->m_prev3 == NULL &&
@ -2654,8 +2629,10 @@ void UdpServer::cancel ( void *state , msg_type_t msgType ) {
if (slot->m_state != state || slot->getMsgType() != msgType) {
continue;
}
// note it
log(LOG_INFO,"udp: cancelled udp socket. msgType=0x%02x.", slot->getMsgType());
// let them know why we are calling the callback prematurely
g_errno = ECANCELLED;
// stop waiting for reply, this will call destroySlot(), too
@ -2690,19 +2667,13 @@ void UdpServer::replaceHost ( Host *oldHost, Host *newHost ) {
if ( ++i >= m_numBuckets ) i = 0;
// sanity check
if ( ! m_ptrs[i] ) {
log(LOG_LOGIC,"udp: replaceHost: Slot not in hash "
"table.");
log(LOG_LOGIC,"udp: replaceHost: Slot not in hash table.");
g_process.shutdownAbort(true);
}
if ( g_conf.m_logDebugUdp )
log(LOG_DEBUG,
"udp: replaceHost: Rehashing slot "
"tid=%" PRId32" dst=%s:%" PRIu32" "
"slot=%" PTRFMT"",
slot->m_transId,
iptoa(slot->m_ip)+6,
(uint32_t)slot->m_port,
(PTRTYPE)slot);
logDebug(g_conf.m_logDebugUdp, "udp: replaceHost: Rehashing slot tid=%" PRId32" dst=%s:%" PRIu32" slot=%" PTRFMT,
slot->m_transId, iptoa(slot->m_ip)+6, (uint32_t)slot->m_port, (PTRTYPE)slot);
// remove the bucket
m_ptrs [ i ] = NULL;
// rehash all buckets below

@ -40,24 +40,14 @@
#include "Hostdb.h"
#include "Loop.h" // loop class that handles signals on our socket
//#ifdef _SMALLDGRAMS_
//#define MAX_UDP_SLOTS 1000
//#else
//#define MAX_UDP_SLOTS 5000
//#endif
// . The rules of Async Sig Safe functions
// 1. to be safe, _ass functions should only call other _ass functions.
// otherwise, that function may be re-entered by the async sig handler.
// 2. most _ass functions turn off interrupts or return if g_inSigHandler.
// otherwise, they will need to be re_entrant... i.e. while entered from the
// main process they might be interrupted and entered from the sig handler.
// 3. only _ass functions should be called from an ASYNC signal handler.
static const int64_t udpserver_sendrequest_infinite_timeout = 999999999999;
class UdpServer {
@ -122,8 +112,6 @@ public:
int64_t timeout = 60000, // milliseconds
int16_t backoff = -1,
int16_t maxWait = -1, // ms
char *replyBuf = NULL,
int32_t replyBufMaxSize = 0,
int32_t niceness = 1,
int32_t maxResends = -1);
@ -148,15 +136,6 @@ public:
// . his callback will be called with errno set to "errnum"
void sendErrorReply( UdpSlot *slot, int32_t errnum );
int32_t getNumUsedSlots() {
return m_numUsedSlots;
}
int32_t getNumUsedSlotsIncoming () {
return m_numUsedSlotsIncoming;
}
// . when a request/msg of type "msgType" is received we call the
// corresponding request handler on this machine
// . use this function to register a handler for a msgType
@ -183,18 +162,9 @@ public:
// . set g_errno on error
bool shutdown ( bool urgent );
bool needBottom () { return m_needBottom; }
// try calling makeCallback() on all slots
bool makeCallbacks_ass ( int32_t niceness );
bool m_writeRegistered;
UdpSlot *getActiveHead ( ) {
return m_head2;
}
// cancel a transaction
void cancel(void *state, msg_type_t msgType);
@ -203,18 +173,31 @@ public:
void printState();
// . we have up to 1 handler routine for each msg type
// . call these handlers for the corresponding msgType
// . msgTypes go from 0 to 64 i think (see UdpProtocol.h dgram header)
void (* m_handlers[MAX_MSG_TYPES])(UdpSlot *slot, int32_t niceness);
int32_t getNumUsedSlots() {
return m_numUsedSlots;
}
int32_t getNumUsedSlotsIncoming () {
return m_numUsedSlotsIncoming;
}
bool needBottom() {
return m_needBottom;
}
bool m_writeRegistered;
UdpSlot *getActiveHead ( ) {
return m_head2;
}
bool hasHandler(int i) {
return (m_handlers[i]);
}
// changes timeout to very low on dead hosts
bool timeoutDeadHosts ( class Host *h );
static void readPollWrapper(int fd, void *state);
static void timePollWrapper(int fd, void *state);
static void sendPollWrapper(int fd, void *state);
// . we need a transaction id for every transaction so we can match
// incoming reply msgs with their corresponding request msgs
// . TODO: should be stored to disk on shutdown and every 1024 sends
@ -224,6 +207,10 @@ public:
int32_t m_nextTransId;
private:
static void readPollWrapper(int fd, void *state);
static void timePollWrapper(int fd, void *state);
static void sendPollWrapper(int fd, void *state);
// . take a slot that we made from sendRequest() above and reset it
// . you request will be sent again w/ the original parameters
// void resendSlot ( UdpSlot *slot );
@ -289,6 +276,11 @@ private:
// . called by readPoll()
int32_t readSock_ass ( UdpSlot **slot , int64_t now );
// . we have up to 1 handler routine for each msg type
// . call these handlers for the corresponding msgType
// . msgTypes go from 0 to 64 i think (see UdpProtocol.h dgram header)
void (* m_handlers[MAX_MSG_TYPES])(UdpSlot *slot, int32_t niceness);
// when a call to sendto() blocks we set this to true so Loop.cpp
// will know to manually call sendPoll_ass() rather than counting
// on receiving a fd-ready-for-writing signal for this UdpServer
@ -337,8 +329,6 @@ private:
// turn them interrupts off before calling this
UdpSlot *getUdpSlot(key_t k);
// . hash table for converting keys to slots
// . if m_ptrs[i] is NULL, ith bucket is empty
UdpSlot **m_ptrs;

@ -274,9 +274,7 @@ bool UdpSlot::sendSetup(char *msg,
void (*callback)(void *state, UdpSlot *slot),
int32_t niceness,
int16_t backoff,
int16_t maxWait,
char *replyBuf,
int32_t replyBufMaxSize) {
int16_t maxWait) {
#ifdef _VALGRIND_
VALGRIND_CHECK_MEM_IS_DEFINED(msg,msgSize);
@ -311,24 +309,6 @@ bool UdpSlot::sendSetup(char *msg,
m_backoff = backoff;
m_maxWait = maxWait;
// . only set m_readBuf if we should
// . sendSetup() is called by slots sending a request
// . sendSetup() is called by slots sending a reply
// . so m_readBuf may have info in it if we're sending a reply so
// just don't NULLify it, it needs to be freed. This was causing
// a memleak for receivers of Msg0x01s
if ( replyBuf ) {
if ( m_readBuf ) {
g_errno = EBADENGINEER;
log(LOG_LOGIC,"udp: Trying to initialize a udp socket for sending, but its read buffer is not empty.");
return false;
}
m_readBuf = replyBuf;
m_readBufSize = 0;
m_readBufMaxSize = replyBufMaxSize;
}
// we haven't sent anything yet so reset this to -1
m_firstSendTime = -1;

@ -76,7 +76,7 @@ public:
// . use a backoff of -1 for the default
bool sendSetup(char *msg, int32_t msgSize, char *alloc, int32_t allocSize, msg_type_t msgType, int64_t now,
void *state, void (*callback)(void *state, class UdpSlot *), int32_t niceness, int16_t backoff,
int16_t maxWait, char *replyBuf, int32_t replyBufMaxSize);
int16_t maxWait);
// . send a datagram from this slot on "sock" (call after sendSetup())
// . returns -2 if nothing to send, -1 on error, 0 if blocked,

@ -7025,12 +7025,7 @@ int32_t *XmlDoc::getIp ( ) {
//m_ipValid = true;
// get it
logTrace( g_conf.m_logTraceXmlDoc, "Calling MsgC.getIp [%s]", u->getHost());
if ( ! m_msgc.getIp ( u->getHost () ,
u->getHostLen() ,
&m_ip ,
this ,
gotIpWrapper ))
{
if (!m_msgc.getIp(u->getHost(), u->getHostLen(), &m_ip, this, gotIpWrapper)) {
// we blocked
logTrace( g_conf.m_logTraceXmlDoc, "END, return -1. Blocked." );
return (int32_t *)-1;
@ -14200,8 +14195,16 @@ char *XmlDoc::getMetaList ( bool forDelete ) {
m_docId ) {
g_process.shutdownAbort(true); }
if ( ! dt8.addKey(&hk,&rec) )
{
if( g_conf.m_noInMemoryPosdbMerge && rdbId == RDB_POSDB ) {
// NEW 20160803.
// Do not store records for POSDB in the hash table of old
// values. This makes sure that no delete records are
// stored in posdb for existing terms, which is needed for
// the new no-merge feature.
continue;
}
if ( ! dt8.addKey(&hk,&rec) ) {
logTrace( g_conf.m_logTraceXmlDoc, "addKey failed" );
return NULL;
}
@ -19904,11 +19907,8 @@ bool XmlDoc::printCachedPage ( SafeBuf *sb , HttpRequest *hr ) {
int32_t isXml = hr->getLong("xml",0);
int32_t raw = hr->getLong("raw",0);
if ( ! isXml && ! raw ) printMenu ( sb );
if ( ! isXml ) {
printMenu ( sb );
// just copy it otherwise
if ( ptr_utf8Content )
sb->safeMemcpy ( ptr_utf8Content ,size_utf8Content -1);

@ -100,8 +100,6 @@ void dumpClusterdb ( const char *coll,int32_t sfn,int32_t numFiles,bool in
void dumpLinkdb ( const char *coll, int32_t sfn, int32_t numFiles, bool includeTree,
const char *url );
bool isRecoveryFutile ( ) ;
int copyFiles ( const char *dstDir ) ;
@ -1427,17 +1425,6 @@ int main2 ( int argc , char *argv[] ) {
return doCmd( replaceCmd, -1, "admin/hosts", true, true );
}
// once we are in recoverymode, that means we are being restarted
// from having cored, so to prevent immediate core and restart
// ad inifinitum, look got "sigbadhandler" at the end of the
// last 5 logs in the last 60 seconds. if we see that then something
// is prevent is from starting up so give up and exit gracefully
if ( g_recoveryMode && isRecoveryFutile () ) {
// exiting with 0 means no error and should tell our
// keep alive loop to not restart us and exit himself.
exit (0);
}
// HACK: enable logging for Conf.cpp, etc.
g_process.m_powerIsOn = true;
@ -7835,77 +7822,6 @@ int collinject ( char *newHostsConf ) {
return 1;
}
bool isRecoveryFutile ( ) {
// scan logs in last 60 seconds
Dir dir;
dir.set ( g_hostdb.m_dir );
dir.open ();
// scan files in dir
const char *filename;
int32_t now = getTimeLocal();
int32_t fails = 0;
while ( ( filename = dir.getNextFilename ( "*" ) ) ) {
// filename must be a certain length
//int32_t filenameLen = strlen(filename);
const char *p = filename;
if ( !strstr ( filename,"log") ) continue;
// skip "log"
p += 3;
// skip digits for hostid
while ( isdigit(*p) ) p++;
// skip hyphen
if ( *p != '-' ) continue;
p++;
// open file
File ff;
ff.set ( dir.getDir() , filename );
// skip if 0 bytes or had error calling ff.getFileSize()
int32_t fsize = ff.getFileSize();
if ( fsize == 0 ) continue;
ff.open ( O_RDONLY );
// get time stamp
int32_t timestamp = ff.getLastModifiedTime ( );
// skip if not iwthin 2 minutes
if ( timestamp < now - 2*60 ) continue;
// open it up to see if ends with sighandle
int32_t toRead = 3000;
if ( toRead > fsize ) toRead = fsize;
char mbuf[3002];
ff.read ( mbuf , toRead , fsize - toRead );
bool failedToStart = false;
if ( strstr (mbuf,"sigbadhandler") ) failedToStart = true;
if ( strstr (mbuf,"Failed to bind") ) failedToStart = true;
if ( ! failedToStart ) continue;
// count it otherwise
fails++;
}
// if we had less than 5 failures to start in last 60 secs
// do not consider futile
if ( fails < 5 ) return false;
log( LOG_WARN, "process: KEEP ALIVE LOOP GIVING UP. Five or more cores in last 60 seconds.");
// otherwise, give up!
return true;
}
const char *getcwd2 ( char *arg2 ) {
char argBuf[1026];
char *arg = argBuf;