Merge branch 'master' into nomerge2

This commit is contained in:
Ivan Skytte Jørgensen
2016-08-11 14:14:06 +02:00
41 changed files with 676 additions and 921 deletions

@ -689,7 +689,7 @@ void Blaster::gotDoc2 ( void *state, TcpSocket *s){
// yahoo we have to add other checks.
char domain2[256];
int32_t dlen = 0;
char *dom = getDomFast ( st->m_u2 , &dlen );
const char *dom = getDomFast ( st->m_u2 , &dlen );
if ( dom ) strncpy(domain2,dom,dlen);
domain2[dlen]='\0';
for (int32_t i=0;i<links2.getNumLinks();i++){

29
Dns.cpp

@ -22,7 +22,7 @@ static RdbCache g_timedoutCache;
static int64_t s_antiLockCount = 1LL;
#define TIMEOUT_SINGLE_HOST 30
#define TIMEOUT_SINGLE_HOST_MS 30000
#define TIMEOUT_TOTAL 90
static void gotIpWrapper ( void *state , UdpSlot *slot ) ;
@ -1172,7 +1172,6 @@ bool Dns::sendToNextDNS ( DnsState *ds ) {
iptoa(ds->m_dnsIps[depth][n]), (int32_t)depth,(int32_t)n,
(int32_t) ds->m_numTried, ds->m_hostname , (int32_t)transId);
UdpSlot *slotPtr = NULL;
// . queue a send
// . this returns false and sets g_errno on error
// . calls callback when reply is received
@ -1183,33 +1182,15 @@ bool Dns::sendToNextDNS ( DnsState *ds ) {
// . well, i went back to 30 seconds after i fixed the transId overflow
// bug
// . resend time is set to 20 seconds in UdpSlot::setResendTime()
if ( ! m_udpServer.sendRequest ( ds->m_request ,//copy ,
ds->m_requestSize,//msgSize ,
msg_type_0 , /// @todo ALC don't think dns should be using msg_type_0
ip , // ds->m_dnsIps[depth][n] ,
53 ,//g_conf.m_dnsPorts[n],
-1 ,// invalid host id
&slotPtr , // slot ptr
ds , // cback state
gotIpWrapper , // callback
TIMEOUT_SINGLE_HOST*1000 , // 20 secs?
-1, // backoff
-1, // maxWait
// use niceness 0 now so if the
// msgC slot gets converted from 1
// to 0 this will not hold it up!
0) ) {
// use niceness 0 now so if the msgC slot gets converted from 1 to 0 this will not hold it up!
if (!m_udpServer.sendRequest(ds->m_request, ds->m_requestSize, msg_type_dns, ip, 53, -1, NULL, ds, gotIpWrapper, TIMEOUT_SINGLE_HOST_MS, 0, ds->m_hostname)) {
// g_errno should be set at this point and we will not try
// any more nameservers because the error seemed too bad.
log(LOG_DEBUG, "dns: errors seemed too bad for '%s'...",
ds->m_hostname);
log(LOG_DEBUG, "dns: errors seemed too bad for '%s'...", ds->m_hostname);
return true;
}
// store a hack for PageSockets.cpp to print out the hostname
slotPtr->m_hostname = ds->m_hostname;
// return 0 cuz we're blocking on the reply
log(LOG_DEBUG, "dns: SendToNextDNS blocking on reply for '%s'",
ds->m_hostname);
log(LOG_DEBUG, "dns: SendToNextDNS blocking on reply for '%s'", ds->m_hostname);
return false;
}

@ -4,7 +4,7 @@
#include "Domains.h"
#include "Mem.h"
static bool isTLD ( char *tld, int32_t tldLen );
static bool isTLD ( const char *tld, int32_t tldLen );
char *getDomainOfIp ( char *host , int32_t hostLen , int32_t *dlen ) {
// get host length
@ -21,13 +21,13 @@ char *getDomainOfIp ( char *host , int32_t hostLen , int32_t *dlen ) {
}
char *getDomain ( char *host , int32_t hostLen , char *tld , int32_t *dlen ) {
const char *getDomain ( char *host , int32_t hostLen , const char *tld , int32_t *dlen ) {
// assume no domain
*dlen = 0;
// get host length
//int32_t hostLen = strlen(host);
// get the tld in host, if any, if not, it returns NULL
char *s = tld; // getTLD ( host , hostLen );
const char *s = tld; // getTLD ( host , hostLen );
// return NULL if host contains no valid tld
if ( ! s ) return NULL;
// if s is host we just have tld
@ -50,19 +50,19 @@ char *getDomain ( char *host , int32_t hostLen , char *tld , int32_t *dlen ) {
}
// host must be NULL terminated
char *getTLD ( char *host , int32_t hostLen ) {
const char *getTLD ( const char *host , int32_t hostLen ) {
if(hostLen==0)
return NULL;
// make "s" point to last period in the host
//char *s = host + strlen(host) - 1;
char *hostEnd = host + hostLen;
char *s = hostEnd - 1;
const char *hostEnd = host + hostLen;
const char *s = hostEnd - 1;
while ( s > host && *s !='.' ) s--;
// point to the tld in question
char *t = s;
const char *t = s;
if ( *t == '.' ) t++;
// reset our current tld ptr
char *tld = NULL;
const char *tld = NULL;
// is t a valid tld? if so, set "tld" to "t".
if ( isTLD ( t , hostEnd - t ) ) tld = t;
// host had no period at most we had just a tld so return NULL
@ -102,7 +102,7 @@ const char* getPrivacoreBlacklistedTLD() {
//static TermTable s_table(false);
static HashTableX s_table;
static bool isTLD ( char *tld , int32_t tldLen ) {
static bool isTLD ( const char *tld , int32_t tldLen ) {
int32_t pcount = 0;
// now they are random!

@ -7,13 +7,13 @@
// . returns NULL if not in the accepted list
// . "host" must be NULL terminated and in LOWER CASE
// . returns ptr into host that marks the domain name
char *getDomain ( char *host , int32_t hostLen , char *tld , int32_t *dlen );
const char *getDomain ( char *host , int32_t hostLen , const char *tld , int32_t *dlen );
// when host is like 192.0.2.1 use this one
char *getDomainOfIp ( char *host , int32_t hostLen , int32_t *dlen );
// used by getDomain() above
char *getTLD ( char *host , int32_t hostLen ) ;
const char *getTLD ( const char *host , int32_t hostLen ) ;
const char* getPrivacoreBlacklistedTLD();

@ -73,8 +73,7 @@ const char *Hostdb::getNetName ( ) {
// . gets filename that contains the hosts from the Conf file
// . return false on errro
// . g_errno may NOT be set
bool Hostdb::init ( int32_t hostIdArg , char *netName ,
bool proxyHost, char useTmpCluster, const char *cwd ) {
bool Hostdb::init(int32_t hostIdArg, char *netName, bool proxyHost, char useTmpCluster, const char *cwd) {
// reset my ip and port
m_myIp = 0;
m_myIpShotgun = 0;
@ -89,17 +88,21 @@ bool Hostdb::init ( int32_t hostIdArg , char *netName ,
m_initialized = true;
const char *dir = "./";
if ( cwd ) dir = cwd;
if (cwd) {
dir = cwd;
}
const char *filename = "hosts.conf";
// for now we autodetermine
if ( hostIdArg != -1 ) { g_process.shutdownAbort(true); }
if ( hostIdArg != -1 ) {
g_process.shutdownAbort(true);
}
// init to -1
m_hostId = -1;
retry:
retry:
// save the name of the network... we can have multiple networks now
// since we need to get title recs from separate networks for getting
// link text for gov.gigablast.com
@ -119,7 +122,7 @@ bool Hostdb::init ( int32_t hostIdArg , char *netName ,
if ( status == 0 ) {
g_errno = ENOHOSTSFILE;
// now we generate one if that is not there
createFile:
createFile:
if ( ! m_created ) {
m_created = true;
g_errno = 0;
@ -127,11 +130,13 @@ bool Hostdb::init ( int32_t hostIdArg , char *netName ,
createHostsConf( cwd );
goto retry;
}
log("conf: Filename %s does not exist." ,filename);
log(LOG_WARN, "conf: Filename %s does not exist." ,filename);
return false;
}
// get file size
m_bufSize = f.getFileSize();
// return false if too big
if ( m_bufSize > (MAX_HOSTS+MAX_SPARES) * 128 ) {
g_errno = EBUFTOOSMALL;
@ -139,8 +144,12 @@ bool Hostdb::init ( int32_t hostIdArg , char *netName ,
filename,m_bufSize, (int32_t)(MAX_HOSTS+MAX_SPARES)*128);
return false;
}
// open the file
if ( ! f.open ( O_RDONLY ) ) return false;
if ( ! f.open ( O_RDONLY ) ) {
return false;
}
// read in the file
numRead = f.read ( m_buf , m_bufSize , 0 /*offset*/ );
// ensure g_errno is now set if numRead != m_bufSize
@ -167,41 +176,38 @@ bool Hostdb::init ( int32_t hostIdArg , char *netName ,
// MUST be a number
if ( ! is_digit ( *p ) ) {
// skip known directives
if ( ! strncmp(p,"port-offset:",12) ||
! strncmp(p,"index-splits:",13) ||
! strncmp(p,"num-mirrors:",12) ||
! strncmp(p,"working-dir:",12) )
p = p;
// check if this is a spare host
else if ( //pend - p < 5 &&
strncasecmp(p, "spare", 5) == 0 )
// count as a spare
if (!strncmp(p, "port-offset:", 12) ||
!strncmp(p, "index-splits:", 13) ||
!strncmp(p, "num-mirrors:", 12) ||
!strncmp(p, "working-dir:", 12)) {
// no op
}else if (strncasecmp(p, "spare", 5) == 0) {
// check if this is a spare host
m_numSpareHosts++;
// check if this is a proxy host
else if ( //pend - p < 5 &&
strncasecmp(p, "proxy", 5) == 0 )
// count as a spare
} else if (strncasecmp(p, "proxy", 5) == 0) {
// check if this is a proxy host
m_numProxyHosts++;
// query compression proxies count as proxies
else if ( strncasecmp(p, "qcproxy", 7) == 0 )
} else if (strncasecmp(p, "qcproxy", 7) == 0) {
// query compression proxies count as proxies
m_numProxyHosts++;
// spider compression proxies count as proxies
else if ( strncasecmp(p, "scproxy", 7) == 0 )
} else if (strncasecmp(p, "scproxy", 7) == 0) {
// spider compression proxies count as proxies
m_numProxyHosts++;
else {
} else {
log(LOG_WARN, "conf: %s is malformed. First item of each non-comment line must be a NUMERIC hostId, "
"SPARE or PROXY. line=%s", filename, p);
return false;
}
}
else
} else {
// count it as a host
m_numHosts++;
}
i++;
// skip line
while ( *p && *p != '\n' ) p++;
while ( *p && *p != '\n' ) {
p++;
}
}
// set g_errno, log and return false if no hosts found in the file
@ -235,7 +241,6 @@ bool Hostdb::init ( int32_t hostIdArg , char *netName ,
int32_t num_nospider = 0;
int32_t num_noquery = 0;
// int32_t num_fullfunc = 0;
for ( ; *p ; p++ , line++ ) {
if ( is_wspace_a (*p) ) continue;
@ -455,13 +460,9 @@ bool Hostdb::init ( int32_t hostIdArg , char *netName ,
"not in /etc/hosts. Using secondary "
"ethernet (eth1) ip "
"of %s",hostname2,iptoa(ip));
//nextip = ip;
// just use the old ip then!
//g_errno = EBADENGINEER;
//return false;
}
}
//retired:
// if none, use initial ip as shotgun as well
if ( ! ip2 ) ip2 = ip;
// store the ip, the eth1 ip
@ -520,6 +521,7 @@ bool Hostdb::init ( int32_t hostIdArg , char *netName ,
h->m_queryEnabled = true;
h->m_spiderEnabled = true;
// check for something after the working dir
h->m_note[0] = '\0';
if ( *p != '\n' ) {
@ -540,12 +542,9 @@ bool Hostdb::init ( int32_t hostIdArg , char *netName ,
h->m_spiderEnabled = false;
num_nospider++;
}
// else {
// num_fullfunc++;
// }
} else {
*p = '\0';
}
else
*p = '\0';
// keep these the same for now
h->m_externalHttpPort = h->m_httpPort;
@ -586,7 +585,9 @@ bool Hostdb::init ( int32_t hostIdArg , char *netName ,
}
// take off slash if there
if ( wdir[wdirlen-1]=='/' ) wdir[--wdirlen]='\0';
if ( wdir[wdirlen-1]=='/' ) {
wdir[--wdirlen]='\0';
}
// get real path (no symlinks symbolic links)
// only if on same host, which we determine based on the IP-address.
@ -608,18 +609,15 @@ bool Hostdb::init ( int32_t hostIdArg , char *netName ,
// don't breach Host::m_dir[128] buffer
if ( wdirlen >= 128 ) {
log("conf: working dir %s is too long, >= 128 chars.",
wdir);
log(LOG_WARN, "conf: working dir %s is too long, >= 128 chars.", wdir);
return false;
}
// copy it over
//strcpy ( m_hosts[i].m_dir , wdir );
gbmemcpy(m_hosts[i].m_dir, wdir, wdirlen);
m_hosts[i].m_dir[wdirlen] = '\0';
// reset this
//m_hosts[i].m_pingInfo.m_lastPing = 0LL;
m_hosts[i].m_lastPing = 0LL;
// and don't send emails on him until we got a good ping
m_hosts[i].m_emailCode = -2;
@ -632,20 +630,17 @@ bool Hostdb::init ( int32_t hostIdArg , char *netName ,
// point to next one
i++;
}
//m_numHosts = i;
m_numTotalHosts = i;
// BR 20160313: Sanity check. I doubt the striping functionality works with an odd mix
// of noquery and nospider hosts. Make sure the number of each kind is the same for now.
if( num_nospider && num_noquery && num_nospider != num_noquery )
{
if (num_nospider && num_noquery && num_nospider != num_noquery) {
g_errno = EBADENGINEER;
log(LOG_ERROR,"Number of nospider and noquery hosts must match in hosts.conf");
return false;
}
// # of mirrors is zero if no mirrors,
// if it is 1 then each host has ONE MIRROR host
if ( numMirrors == 0 )
@ -1224,8 +1219,8 @@ Host *Hostdb::getHostWithSpideringEnabled ( uint32_t shardNum ) {
}
// if niceness 0 can't pick noquery host.
// if niceness 1 can't pick nospider host.
// if niceness 0 can't pick noquery host/ must pick spider host.
// if niceness 1 can't pick nospider host/ must pick query host.
Host *Hostdb::getLeastLoadedInShard ( uint32_t shardNum , char niceness ) {
int32_t minOutstandingRequests = 0x7fffffff;
int32_t minOutstandingRequestsIndex = -1;

@ -333,10 +333,11 @@ class Hostdb {
Host *getProxy ( int32_t proxyId ) {
return m_proxyHosts[proxyId]; }
int32_t getNumHosts ( ) { return m_numHosts; }
int32_t getNumProxy ( ) { return m_numProxyHosts; }
int32_t getNumProxies ( ) { return m_numProxyHosts; }
int32_t getNumGrunts ( ) { return m_numHosts; }
int32_t getNumHosts() { return m_numHosts; }
int32_t getNumProxy() { return m_numProxyHosts; }
int32_t getNumProxies() { return m_numProxyHosts; }
int32_t getNumGrunts() { return m_numHosts; }
// how many of the hosts are non-dead?
int32_t getNumHostsAlive ( ) { return m_numHostsAlive; }
int32_t getNumProxyAlive ( ) { return m_numProxyAlive; }

@ -580,23 +580,8 @@ log("@@@@ getLinkInfo(): url=%s",url?url:"<null>");
req->serialize();
// this should always block
if ( ! mcast->send (
(char *)req ,
req->getStoredSize() ,
msg_type_25 ,
false , // does multicast own request?
shardNum ,
false , // send to whole group?
0 , // key is passed on startKey
req , // state data
NULL , // state data
gotMulticastReplyWrapper25 ,
// if this is too low we core in XmlDoc.cpp
// after getNewSpiderReply() returns a -1 because
// it blocks for some reason.
multicast_infinite_send_timeout , // timeout
req->m_niceness ,
hostId )) {// firstHostId ,
// if timeout is too low we core in XmlDoc.cpp after getNewSpiderReply() returns a -1 because it blocks for some reason.
if (!mcast->send((char *)req, req->getStoredSize(), msg_type_25, false, shardNum, false, 0, req, NULL, gotMulticastReplyWrapper25, multicast_infinite_send_timeout, req->m_niceness, hostId)) {
log( LOG_WARN, "linkdb: Failed to send multicast for %s err=%s", u.getUrl(),mstrerror(g_errno));
return true;
}

@ -22,7 +22,7 @@ OBJS = UdpSlot.o Rebalance.o \
Collectiondb.o \
linkspam.o ip.o sort.o \
fctypes.o XmlNode.o XmlDoc.o XmlDoc_Indexing.o Xml.o \
Words.o UdpServer.o \
Words.o UdpServer.o UdpStatistic.o \
Titledb.o HashTable.o \
TcpServer.o Summary.o \
Spider.o SpiderColl.o SpiderLoop.o Doledb.o \
@ -411,6 +411,9 @@ TopTree.o:
UdpServer.o:
$(CXX) $(DEFS) $(CPPFLAGS) $(O2) -c $*.cpp
UdpStatistic.o:
$(CXX) $(DEFS) $(CPPFLAGS) $(O3) -c $*.cpp
RdbList.o:
$(CXX) $(DEFS) $(CPPFLAGS) $(O3) -c $*.cpp

@ -360,19 +360,7 @@ skip:
// . returns false on error and sets g_errno, true otherwise
// . calls callback when reply is received (or error)
// . we return true if it returns false
if ( ! g_udpServer.sendRequest ( m_request ,
m_requestSize ,
msg_type_0 ,
h->m_ip ,
port ,
m_hostId ,
NULL , // the slotPtr
this ,
gotSingleReplyWrapper ,
timeout ,
-1 , // backoff
-1 , // maxwait
m_niceness ) ) { // cback niceness
if (!g_udpServer.sendRequest(m_request, m_requestSize, msg_type_0, h->m_ip, port, m_hostId, NULL, this, gotSingleReplyWrapper, timeout, m_niceness)) {
logTrace( g_conf.m_logTraceMsg0, "END, return true. Request sent" );
return true;
}
@ -406,20 +394,8 @@ skip:
// get the multicast
Multicast *m = &m_mcast;
if ( ! m->send ( m_request ,
m_requestSize,
msg_type_0 ,
false , // does multicast own request?
m_shardNum ,
false , // send to whole group?
//m_startKey.n1, // key is passed on startKey
keyTop , // key is passed on startKey
this , // state data
NULL , // state data
gotMulticastReplyWrapper0 ,
timeout*1000 , // timeout
niceness ,
firstHostId) ) {
// key is passed on startKey
if (!m->send(m_request, m_requestSize, msg_type_0, false, m_shardNum, false, keyTop, this, NULL, gotMulticastReplyWrapper0, timeout * 1000, niceness, firstHostId, getDbnameFromId(m_rdbId))) {
log(LOG_ERROR, "net: Failed to send request for data from %s in shard "
"#%" PRIu32" over network: %s.",
getDbnameFromId(m_rdbId),m_shardNum, mstrerror(g_errno));
@ -614,25 +590,24 @@ void handleRequest0 ( UdpSlot *slot , int32_t netnice ) {
logTrace( g_conf.m_logTraceMsg0, "numFiles.... %" PRId32, numFiles );
}
// error set from XmlDoc::cacheTermLists()?
if ( g_errno ) {
logTrace( g_conf.m_logTraceMsg0, "END. Invalid collection" );
log(LOG_ERROR,"%s:%s:%d: call sendErrorReply. Invalid collection", __FILE__, __func__, __LINE__);
us->sendErrorReply ( slot , EBADRDBID );
us->sendErrorReply(slot, EBADRDBID);
return;
}
// . get the rdb we need to get the RdbList from
// . returns NULL and sets g_errno on error
//Msg0 msg0;
//Rdb *rdb = msg0.getRdb ( rdbId );
Rdb *rdb = getRdbFromId ( rdbId );
if ( ! rdb ) {
Rdb *rdb = getRdbFromId(rdbId);
if (!rdb) {
logTrace( g_conf.m_logTraceMsg0, "END. Invalid rdbId" );
log(LOG_ERROR,"%s:%s:%d: call sendErrorReply. Invalid rdbId", __FILE__, __func__, __LINE__);
us->sendErrorReply ( slot , EBADRDBID );
us->sendErrorReply(slot, EBADRDBID);
return;
}
@ -649,7 +624,7 @@ void handleRequest0 ( UdpSlot *slot , int32_t netnice ) {
(int32_t)sizeof(State00),mstrerror(g_errno));
log(LOG_ERROR,"%s:%s:%d: call sendErrorReply.", __FILE__, __func__, __LINE__);
us->sendErrorReply ( slot , g_errno );
us->sendErrorReply(slot, g_errno);
return;
}
mnew ( st0 , sizeof(State00) , "State00" );
@ -708,8 +683,6 @@ void handleRequest0 ( UdpSlot *slot , int32_t netnice ) {
logTrace( g_conf.m_logTraceMsg0, "END" );
}
#include "Sections.h" // SectionVote
// . slot should be auto-nuked upon transmission or error
// . TODO: ensure if this sendReply() fails does it really nuke the slot?
void gotListWrapper ( void *state , RdbList *listb , Msg5 *msg5xx ) {
@ -747,11 +720,10 @@ void gotListWrapper ( void *state , RdbList *listb , Msg5 *msg5xx ) {
// TODO: free "slot" if this send fails
log(LOG_ERROR,"%s:%s:%d: call sendErrorReply. error=%s", __FILE__, __func__, __LINE__, mstrerror(g_errno));
us->sendErrorReply ( slot , g_errno );
us->sendErrorReply(slot, g_errno);
return;
}
QUICKPOLL(st0->m_niceness);
// point to the serialized list in "list"
char *data = list->getList();
int32_t dataSize = list->getListSize();
@ -760,9 +732,13 @@ void gotListWrapper ( void *state , RdbList *listb , Msg5 *msg5xx ) {
// tell list not to free the data since it is a reply so UdpServer
// will free it when it destroys the slot
list->setOwnData ( false );
// keep track of stats
Rdb *rdb = getRdbFromId ( st0->m_rdbId );
if ( rdb ) rdb->sentReplyGet ( dataSize );
if ( rdb ) {
rdb->sentReplyGet ( dataSize );
}
// TODO: can we free any memory here???
// keep track of how long it takes to complete the send
@ -831,14 +807,11 @@ void gotListWrapper ( void *state , RdbList *listb , Msg5 *msg5xx ) {
dataSize = list->getListSize();
}
//log("sending replySize=%" PRId32" min=%" PRId32,dataSize,msg5->m_minRecSizes);
// . TODO: dataSize may not equal list->getListMaxSize() so
// Mem class may show an imblanace
// . TODO: dataSize may not equal list->getListMaxSize() so Mem class may show an imblanace
// . now g_udpServer is responsible for freeing data/dataSize
// . the "true" means to call doneSending_ass() from the signal handler
// if need be
st0->m_us->sendReply_ass( data, dataSize, alloc, allocSize, slot, st0, doneSending_ass, -1, -1, true );
st0->m_us->sendReply_ass(data, dataSize, alloc, allocSize, slot, st0, doneSending_ass, -1, -1, true);
logTrace( g_conf.m_logTraceMsg0, "END" );
}

@ -435,6 +435,9 @@ skip:
// . 1 byte for rdbId, 1 byte for flags,
// then collection NULL terminated, then list
int32_t requestLen = 1 + 1 + sizeof(collnum_t) + listSize ;
/// @warning Changing position of rdbId will require a same change in UdpStatistic
// make the request
char *request = (char *) mmalloc ( requestLen ,"Msg1" );
if ( ! request ) return true;
@ -467,22 +470,13 @@ skip:
//int32_t niceness = 2;
//if ( requestLen < TMPBUFSIZE - 32 ) niceness = 0;
//log("msg1: sending mcast niceness=%" PRId32,m_niceness);
// . multicast to all hosts in group "groupId"
// . multicast::send() returns false and sets g_errno on error
// . we return false if we block, true otherwise
// . will loop indefinitely if a host in this group is down
if ( m_mcast.send ( request , // sets mcast->m_msg to this
requestLen , // sets mcast->m_msgLen to this
msg_type_1 ,
true , // does multicast own msg?
shardNum , // group to send to (groupKey)
true , // send to whole group?
0 , // key is useless for us
this , // state data
NULL , // state data
gotReplyWrapper1 ,
multicast_msg1_senddata_timeout , // timeout
m_niceness )) { // niceness
// key is useless for us
if (m_mcast.send(request, requestLen, msg_type_1, true, shardNum, true, 0, this, NULL, gotReplyWrapper1, multicast_msg1_senddata_timeout, m_niceness, -1, getDbnameFromId(m_rdbId))) {
return false;
}
@ -558,7 +552,7 @@ void handleRequest1 ( UdpSlot *slot , int32_t netnice ) {
if ( readBufSize <= 4 ) {
g_errno = EREQUESTTOOSHORT;
log(LOG_ERROR,"%s:%s:%d: call sendErrorReply. Request too short", __FILE__, __func__, __LINE__);
us->sendErrorReply ( slot , g_errno );
us->sendErrorReply(slot, g_errno);
return;
}
char *p = readBuf;
@ -569,7 +563,7 @@ void handleRequest1 ( UdpSlot *slot , int32_t netnice ) {
Rdb *rdb = getRdbFromId ( (char) rdbId );
if ( ! rdb ) {
log(LOG_ERROR,"%s:%s:%d: call sendErrorReply. Bad rdbid", __FILE__, __func__, __LINE__);
us->sendErrorReply ( slot, EBADRDBID );
us->sendErrorReply(slot, EBADRDBID);
return;
}
// keep track of stats
@ -615,73 +609,40 @@ void handleRequest1 ( UdpSlot *slot , int32_t netnice ) {
addedList ( slot , rdb );
}
//static void tryAgainWrapper ( int fd , void *state ) ;
// g_errno may be set when this is called
void addedList ( UdpSlot *slot , Rdb *rdb ) {
// no memory means to try again
if ( g_errno == ENOMEM ) g_errno = ETRYAGAIN;
// doing a full rebuid will add collections
if ( g_errno == ENOCOLLREC &&
g_repairMode > 0 )
//g_repair.m_fullRebuild )
if ( g_errno == ENOMEM ) {
g_errno = ETRYAGAIN;
}
// doing a full rebuid will add collections
if ( g_errno == ENOCOLLREC && g_repairMode > 0 ) {
g_errno = ETRYAGAIN;
}
// it seems like someone can delete a collection and there can
// be adds in transit to doledb and it logs
// "doledb bad collnum of 30110"
// so just absorb those
if ( g_errno == ENOCOLLREC ) {
log("msg1: missing collrec to add to to %s. just dropping.",
rdb->m_dbname);
log(LOG_WARN, "msg1: missing collrec to add to to %s. just dropping.", rdb->m_dbname);
g_errno = 0;
}
// . if we got a ETRYAGAIN cuz the buffer we add to was full
// then we should sleep and try again!
// . return false cuz this blocks for a period of time
// before trying again
// . but now to free the udp slot when we are doing an urgent merge
// let's send an error back!
//if ( g_errno == ETRYAGAIN ) {
// debug msg
//log("REGISTERING SLEEP CALLBACK");
// try adding again in 1 second
// g_loop.registerSleepCallback ( 1000, slot, tryAgainWrapper );
// return now
// return;
//}
// random test
//if ( (rand() % 10) == 1 ) g_errno = ETRYAGAIN;
//int32_t niceness = slot->getNiceness() ;
// select udp server based on niceness
UdpServer *us = &g_udpServer ;
//if ( niceness == 0 ) us = &g_udpServer2;
//else us = &g_udpServer ;
// chalk it up
rdb->sentReplyAdd ( 0 );
rdb->sentReplyAdd(0);
// are we done
if ( ! g_errno ) {
if (!g_errno) {
// . send an empty (non-error) reply as verification
// . slot should be auto-nuked on transmission/timeout of reply
// . udpServer should free the readBuf
us->sendReply_ass ( NULL , 0 , NULL , 0 , slot ) ;
g_udpServer.sendReply_ass(NULL, 0, NULL, 0, slot);
return;
}
// on other errors just send the err code back
log(LOG_ERROR,"%s:%s:%d: call sendErrorReply. error=%s", __FILE__, __func__, __LINE__, mstrerror(g_errno));
us->sendErrorReply ( slot , g_errno );
g_udpServer.sendErrorReply(slot, g_errno);
}
/*
void tryAgainWrapper ( int fd , void *state ) {
// stop waiting
g_loop.unregisterSleepCallback ( state , tryAgainWrapper );
// clear g_errno
g_errno = 0;
// get slot
UdpSlot *slot = (UdpSlot *)state;
// try adding again
handleRequest1 ( slot , -2 ); // slot->getNiceness() );
return;
}
*/

@ -255,21 +255,9 @@ bool Msg13::forwardRequest ( ) {
// . otherwise, send the request to the key host
// . returns false and sets g_errno on error
// . now wait for 2 minutes before timing out
if ( ! g_udpServer.sendRequest ( requestBuf, // (char *)r ,
requestBufSize ,
msg_type_13 ,
h->m_ip ,
h->m_port ,
// it was not using the proxy! because
// it thinks the hostid #0 is not
// the proxy... b/c ninad screwed that
// up by giving proxies the same ids
// as regular hosts!
-1 , // h->m_hostId ,
NULL ,
this , // state data
gotForwardedReplyWrapper ,
200000 )){// 200 sec timeout
// it was not using the proxy! because it thinks the hostid #0 is not the proxy... b/c ninad screwed that
// up by giving proxies the same ids as regular hosts!
if (!g_udpServer.sendRequest(requestBuf, requestBufSize, msg_type_13, h->m_ip, h->m_port, -1, NULL, this, gotForwardedReplyWrapper, 200000, 1)) {
// sanity check
if ( ! g_errno ) { g_process.shutdownAbort(true); }
// report it
@ -496,12 +484,8 @@ void handleRequest13 ( UdpSlot *slot , int32_t niceness ) {
// . an empty rec is a cached not found (no robot.txt file)
// . therefore it's allowed, so set *reply to 1 (true)
if ( inCache ) {
// log debug?
//if ( r->m_isSquidProxiedUrl )
if ( g_conf.m_logDebugSpider )
log("proxy: found %" PRId32" bytes in cache for %s",
recSize,r->ptr_url);
if (inCache) {
logDebug(g_conf.m_logDebugSpider, "proxy: found %" PRId32" bytes in cache for %s", recSize,r->ptr_url);
// helpful for debugging. even though you may see a robots.txt
// redirect and think we are downloading that each time,
@ -509,7 +493,7 @@ void handleRequest13 ( UdpSlot *slot , int32_t niceness ) {
//log("spider: %s was in cache",r->ptr_url);
// . send the cached reply back
// . this will free send/read bufs on completion/g_errno
g_udpServer.sendReply_ass ( rec , recSize , rec, recSize,slot);
g_udpServer.sendReply_ass(rec, recSize, rec, recSize, slot);
return;
}
rcl.unlock();
@ -549,24 +533,12 @@ void handleRequest13 ( UdpSlot *slot , int32_t niceness ) {
if ( g_conf.m_logDebugSpider || g_conf.m_logDebugMsg13 )
log(LOG_DEBUG,"spider: sending to compression proxy "
"%s:%" PRIu32,iptoa(h->m_ip),(uint32_t)h->m_port);
// . otherwise, send the request to the key host
// . returns false and sets g_errno on error
// . now wait for 2 minutes before timing out
if ( ! g_udpServer.sendRequest ( (char *)r ,
r->getSize() ,
msg_type_13 ,
h->m_ip ,
h->m_port ,
// we are sending to the proxy
// so make this -1
-1 , // h->m_hostId ,
NULL ,
r , // state data
passOnReply ,
200000 , // 200 sec timeout
-1,//backoff
-1,//maxwait
niceness)) {
// we are sending to the proxy so make hostId -1
if (!g_udpServer.sendRequest((char *)r, r->getSize(), msg_type_13, h->m_ip, h->m_port, -1, NULL, r, passOnReply, 200000, niceness)) {
// g_errno should be set
log(LOG_ERROR,"%s:%s:%d: call sendErrorReply. error=%s", __FILE__, __func__, __LINE__, mstrerror(g_errno));
@ -661,20 +633,10 @@ void downloadTheDocForReals2 ( Msg13Request *r ) {
// host #1 must take over! if all are dead, it returns host #0.
// so we are guaranteed "h will be non-null
Host *h = g_hostdb.getFirstAliveHost();
// now ask that host for the best spider proxy to send to
if ( ! g_udpServer.sendRequest ( (char *)r,
// just the top part of the
// Msg13Request is sent to
// handleRequest54() now
r->getProxyRequestSize() ,
msg_type_54 ,
h->m_ip ,
h->m_port ,
-1 , // h->m_hostId ,
NULL ,
r , // state data
gotProxyHostReplyWrapper ,
udpserver_sendrequest_infinite_timeout )){
// just the top part of the Msg13Request is sent to handleRequest54() now
if (!g_udpServer.sendRequest((char *)r, r->getProxyRequestSize(), msg_type_54, h->m_ip, h->m_port, -1, NULL, r, gotProxyHostReplyWrapper, udpserver_sendrequest_infinite_timeout)) {
// sanity check
if ( ! g_errno ) { g_process.shutdownAbort(true); }
// report it
@ -1090,16 +1052,7 @@ void gotHttpReply9 ( void *state , TcpSocket *ts ) {
Host *h = g_hostdb.getFirstAliveHost();
// now return the proxy. this will decrement the load count on
// host "h" for this proxy.
if ( g_udpServer.sendRequest ( (char *)r,
r->getProxyRequestSize(),
msg_type_54 ,
h->m_ip ,
h->m_port ,
-1 , // h->m_hostId ,
NULL ,
r , // state data
doneReportingStatsWrapper ,
10000 )){// 10 sec timeout
if (g_udpServer.sendRequest((char *)r, r->getProxyRequestSize(), msg_type_54, h->m_ip, h->m_port, -1, NULL, r, doneReportingStatsWrapper, 10000)) {
// it blocked!
//r->m_blocked = true;
s_55Out++;
@ -1661,8 +1614,7 @@ void gotHttpReply2 ( void *state ,
// this is not freeable
if ( copy == g_fakeReply ) copyAllocSize = 0;
// get request
Msg13Request *r2;
r2 = *(Msg13Request **)s_rt.getValueFromSlot(tableSlot);
Msg13Request *r2 = *(Msg13Request **)s_rt.getValueFromSlot(tableSlot);
// get udp slot for this transaction
UdpSlot *slot = r2->m_udpSlot;
// remove from list
@ -1678,14 +1630,16 @@ void gotHttpReply2 ( void *state ,
iptoa(r2->m_urlIp));
log(LOG_ERROR,"%s:%s:%d: call sendErrorReply. error=%s", __FILE__, __func__, __LINE__, mstrerror(err));
g_udpServer.sendErrorReply ( slot , err );
g_udpServer.sendErrorReply(slot, err);
continue;
}
// for debug for now
if ( g_conf.m_logDebugSpider || g_conf.m_logDebugMsg13 )
log("msg13: sending reply for %s",r->ptr_url);
// send reply
us->sendReply_ass ( copy,replySize,copy,copyAllocSize, slot );
us->sendReply_ass(copy, replySize, copy, copyAllocSize, slot);
// now final udp slot will free the reply, so tcp server
// no longer has to. set this tcp buf to null then.
if ( ts && ts->m_readBuf == reply && count == 0 )
@ -1727,9 +1681,9 @@ void passOnReply ( void *state , UdpSlot *slot ) {
slot->m_readBufSize = 0;
// prevent udpserver from trying to free g_fakeReply
if ( reply == g_fakeReply ) replyAllocSize = 0;
//int32_t replyAllocSize = slot->m_readBufSize;
// just forward it on
g_udpServer.sendReply_ass( reply, replySize, reply, replyAllocSize, r->m_udpSlot );
g_udpServer.sendReply_ass(reply, replySize, reply, replyAllocSize, r->m_udpSlot);
}
// returns true if <iframe> tag in there

@ -25,16 +25,7 @@ bool Msg1f::getLog(int32_t hostId,
char* p = sendBuf;
*(int32_t*)p = numBytes;
p += sizeof(int32_t);
g_udpServer.sendRequest(sendBuf,
p - sendBuf,
msg_type_1f,
g_hostdb.m_hostPtrs[hostId]->m_ip,
g_hostdb.m_hostPtrs[hostId]->m_port,
g_hostdb.m_hostPtrs[hostId]->m_hostId,
NULL,
callbackState,
callback,
5);
g_udpServer.sendRequest(sendBuf, p - sendBuf, msg_type_1f, g_hostdb.m_hostPtrs[hostId]->m_ip, g_hostdb.m_hostPtrs[hostId]->m_port, g_hostdb.m_hostPtrs[hostId]->m_hostId, NULL, callbackState, callback, 5000);
return false;
}

@ -213,20 +213,7 @@ bool Msg20::getSummary ( Msg20Request *req ) {
// . returns false and sets g_errno on error
// . use a pre-allocated buffer to hold the reply
// . TMPBUFSIZE is how much a UdpSlot can hold w/o allocating
if ( ! m_mcast.send ( m_request ,
m_requestSize ,
msg_type_20 ,
false , // m_mcast own m_request?
shardNum , // send to group (groupKey)
false , // send to whole group?
probDocId , // key is lower bits of docId
this , // state data
NULL , // state data
gotReplyWrapper20 ,
timeout , // timeout
req->m_niceness ,
firstHostId , // first hostid
false )) { // free reply buf?
if (!m_mcast.send(m_request, m_requestSize, msg_type_20, false, shardNum, false, probDocId, this, NULL, gotReplyWrapper20, timeout, req->m_niceness, firstHostId, NULL, false)) {
// sendto() sometimes returns "Network is down" so i guess
// we just had an "error reply".
log("msg20: error sending mcast %s",mstrerror(g_errno));

@ -157,21 +157,7 @@ bool Msg22::getTitleRec ( Msg22Request *r ,
// . returns false and sets g_errno on error
// . use a pre-allocated buffer to hold the reply
// . TMPBUFSIZE is how much a UdpSlot can hold w/o allocating
if ( ! m_mcast.send ( (char *)r ,
r->getSize() ,
msg_type_22 ,
false , // m_mcast own m_request?
shardNum , // send to group (groupKey)
false , // send to whole group?
//hostKey , // key is lower bits of docId
0 , // key is lower bits of docId
this , // state data
NULL , // state data
gotReplyWrapper22 ,
timeout*1000 , // timeout
r->m_niceness , // nice, reply size can be huge
firstHostId , // first hostid
false ) ){ // free reply buf?
if (!m_mcast.send((char *)r, r->getSize(), msg_type_22, false, shardNum, false, 0, this, NULL, gotReplyWrapper22, timeout * 1000, r->m_niceness, firstHostId, NULL, false)) {
log("db: Requesting title record had error: %s.",
mstrerror(g_errno) );
// set m_errno
@ -395,7 +381,7 @@ void handleRequest22 ( UdpSlot *slot , int32_t netnice ) {
int32_t dlen = 0;
// this causes ip based urls to be inconsistent with the call
// to getProbableDocId(url) below
char *dom = getDomFast ( r->m_url , &dlen );
const char *dom = getDomFast ( r->m_url , &dlen );
// bogus url?
if ( ! dom ) {
log(LOG_WARN, "msg22: got bad url in request: %s from "

@ -395,20 +395,7 @@ bool Msg3a::getDocIds ( Msg39Request *r ,
// . if that host takes more than about 5 secs then sends to
// next host
// . key should be largest termId in group we're sending to
bool status;
status = m->send ( req , // m_rbufPtr ,
m_rbufSize , // request size
msg_type_39 ,
false , // mcast owns m_request?
shardNum , // group to send to
false , // send to whole group?
(int32_t)qh , // 0 // startKey.n1
this , // state1 data
m , // state2 data
gotReplyWrapper3a ,
timeout , // timeout
m_r->m_niceness ,
firstHostId ); // -1// bestHandlingHostId
bool status = m->send(req, m_rbufSize, msg_type_39, false, shardNum, false, (int32_t)qh, this, m, gotReplyWrapper3a, timeout, m_r->m_niceness, firstHostId);
// if successfully launch, do the next one
if ( status ) continue;
// . this serious error should make the whole query fail

@ -595,22 +595,10 @@ bool sendBuffer ( int32_t hostId , int32_t niceness ) {
// . in that case we should restart from the top and we will add
// the dead host ids to the top, and multicast will avoid sending
// to hostids that are dead now
if ( mcast->send ( request , // sets mcast->m_msg to this
requestSize, // sets mcast->m_msgLen to this
msg_type_4 ,
false , // does multicast own msg?
shardNum,//groupId , // group to send to (groupKey)
true , // send to whole group?
0 , // key is useless for us
(void *)(PTRTYPE)allocSize , // state data
(void *)mcast , // state data
gotReplyWrapper4 ,
// this was 60 seconds, but if we saved the
// addsinprogress at the wrong time we might miss
// it when its between having timed out and
// having been resent by us!
multicast_infinite_send_timeout , // timeout
MAX_NICENESS)) { // niceness
// key is useless for us
// timeout was 60 seconds, but if we saved the addsinprogress at the wrong time we might miss
// it when its between having timed out and having been resent by us!
if (mcast->send(request, requestSize, msg_type_4, false, shardNum, true, 0, (void *)(PTRTYPE)allocSize, (void *)mcast, gotReplyWrapper4, multicast_infinite_send_timeout, MAX_NICENESS)) {
// . let storeRec() do all the allocating...
// . only let the buffer go once multicast succeeds
s_hostBufs [ hostId ] = NULL;
@ -1242,16 +1230,7 @@ bool loadAddsInProgress ( const char *prefix ) {
p += numBytes;
// send it!
if ( ! g_udpServer.sendRequest ( buf ,
numBytes ,
msg_type_4 ,
h->m_ip ,
h->m_port ,
h->m_hostId ,
NULL ,
NULL , // state data
NULL , // callback
udpserver_sendrequest_infinite_timeout)){// timeout
if (!g_udpServer.sendRequest(buf, numBytes, msg_type_4, h->m_ip, h->m_port, h->m_hostId, NULL, NULL, NULL, udpserver_sendrequest_infinite_timeout)) {
close ( fd );
// report it
log(LOG_WARN, "%s:%s: could not resend reload buf: %s",

@ -162,23 +162,10 @@ bool MsgC::getIp(const char *hostname, int32_t hostnameLen, int32_t *ip, void *s
// here unless we are niceness 0, which we need in case the handling
// servers goes down, we do not want to wait for it and would rather
// call the callback with an EUDPTIMEDOUT error after 60 seconds.
int64_t timeout = (niceness==0)
? multicast_msg1c_getip_default_timeout
: multicast_infinite_send_timeout;
if ( !m_mcast.send (m_request , // sets mcast->m_msg to this
requestSize, // sets mcast->m_msgLen to this
msg_type_c ,
false , // does multicast own msg?
host->m_shardNum , // group to send to (groupKey)
false , // send to whole group?
0 , // key.n1 , // key is useless for us
this , // state data
state , // state data
gotReplyWrapper ,
timeout , // timeout
niceness , // niceness
firstHostId,// first host to try
false )) { // free reply buf?
int64_t timeout = (niceness==0) ? multicast_msg1c_getip_default_timeout : multicast_infinite_send_timeout;
// key is useless for us
if (!m_mcast.send(m_request, requestSize, msg_type_c, false, host->m_shardNum, false, 0, this, state, gotReplyWrapper, timeout, niceness, firstHostId, NULL, false)) {
//did not block, error
log(LOG_DEBUG,"dns: msgc: mcast had error: %s",
mstrerror(g_errno));

@ -18,7 +18,8 @@ enum msg_type_t {
msg_type_3f = 0x3f,
msg_type_54 = 0x54,
msg_type_c1 = 0xc1,
msg_type_fd = 0xfd
msg_type_fd = 0xfd,
msg_type_dns
};

@ -61,20 +61,9 @@ void Multicast::reset ( ) {
// . caller can now pass in his own reply buffer
// . if "freeReplyBuf" is true that means it needs to be freed at some point
// otherwise, it's probably on the stack or part of a larger allocate class.
bool Multicast::send ( char *msg ,
int32_t msgSize ,
msg_type_t msgType ,
bool ownMsg ,
uint32_t shardNum,
bool sendToWholeGroup ,
int32_t key ,
void *state ,
void *state2 ,
void (*callback) (void *state , void *state2),
int64_t totalTimeout , // in millseconds
int32_t niceness ,
int32_t firstHostId ,
bool freeReplyBuf ) {
bool Multicast::send(char *msg, int32_t msgSize, msg_type_t msgType, bool ownMsg, uint32_t shardNum, bool sendToWholeGroup,
int32_t key, void *state, void *state2, void (*callback)(void *state, void *state2),
int64_t totalTimeout, int32_t niceness, int32_t firstHostId, const char *extraInfo, bool freeReplyBuf) {
bool sendToSelf = true;
// make sure not being re-used!
@ -118,8 +107,6 @@ bool Multicast::send ( char *msg ,
memset ( m_errnos , 0 , sizeof(int32_t ) * MAX_HOSTS_PER_GROUP );
memset ( m_slots , 0 , sizeof(UdpSlot *) * MAX_HOSTS_PER_GROUP );
memset ( m_inProgress , 0 , sizeof(char ) * MAX_HOSTS_PER_GROUP );
// breathe
QUICKPOLL(m_niceness);
// . get the list of hosts in this group
// . returns false if blocked, true otherwise
@ -150,9 +137,10 @@ bool Multicast::send ( char *msg ,
}
//if ( ! sendToWholeGroup ) return sendToHostLoop ( key , -1 );
// . send to ALL hosts in this group if sendToWholeGroup is true
// . blocks forever until sends to all hosts are successfull
sendToGroup ( );
sendToGroup();
// . sendToGroup() always blocks, but we return true if no g_errno
// . we actually keep looping until all hosts get the msg w/o error
@ -171,7 +159,7 @@ bool Multicast::send ( char *msg ,
// . it does not send to hosts whose m_errnos is 0
// . TODO: deal with errors from g_udpServer::sendRequest() better
// . returns false and sets g_errno on error
void Multicast::sendToGroup ( ) {
void Multicast::sendToGroup() {
// see if anyone gets an error
bool hadError = false;
// . cast the msg to ALL hosts in the m_hosts group of hosts
@ -215,19 +203,7 @@ void Multicast::sendToGroup ( ) {
// . send to a single host
// . this creates a transaction control slot, "udpSlot"
// . returns false and sets g_errno on error
if ( us->sendRequest ( m_msg ,
m_msgSize ,
m_msgType ,
bestIp , // h->m_ip ,
destPort ,
hid ,
&m_slots[i] ,
this , // state
gotReplyWrapperM2 ,
m_totalTimeout ,
-1 , // backoff
-1 , // max wait in ms
m_niceness )) { // cback niceness
if (us->sendRequest(m_msg, m_msgSize, m_msgType, bestIp, destPort, hid, &m_slots[i], this, gotReplyWrapperM2, m_totalTimeout, m_niceness)) {
continue;
}
// g_errno must have been set, remember it
@ -650,20 +626,7 @@ bool Multicast::sendToHost ( int32_t i ) {
// . this creates a transaction control slot, "udpSlot"
// . return false and sets g_errno on error
// . returns true on successful launch and calls callback on completion
if ( ! us->sendRequest ( m_msg ,
m_msgSize ,
m_msgType ,
bestIp , // h->m_ip ,
destPort ,
hid ,
&m_slots[i] ,
this , // state
gotReplyWrapperM1 ,
timeRemaining , // timeout
-1 , // backoff
-1 , // max wait in ms
m_niceness , // cback niceness
maxResends )) {
if (!us->sendRequest(m_msg, m_msgSize, m_msgType, bestIp, destPort, hid, &m_slots[i], this, gotReplyWrapperM1, timeRemaining, m_niceness, NULL, -1, -1, maxResends)) {
log(LOG_WARN, "net: Had error sending msgtype 0x%02x to host #%" PRId32": %s. Not retrying.",
m_msgType,h->m_hostId,mstrerror(g_errno));
// i've seen ENOUDPSLOTS available msg here along with oom

@ -87,7 +87,8 @@ class Multicast {
int64_t totalTimeout , //relative timeout in milliseconds
int32_t niceness ,
int32_t firstHostId = -1 ,// first host to try
bool freeReplyBuf = true );
const char *extraInfo = NULL,
bool freeReplyBuf = true );
// . get the reply from your NON groupSend
// . if *freeReply is true then you are responsible for freeing this
@ -111,7 +112,7 @@ class Multicast {
void gotReply1 ( UdpSlot *slot ) ;
void closeUpShop ( UdpSlot *slot ) ;
void sendToGroup ( ) ;
void sendToGroup();
void gotReply2 ( UdpSlot *slot ) ;
// . stuff set directly by send() parameters

@ -213,22 +213,10 @@ bool Msg7::sendInjectionRequestToHost ( InjectionRequest *ir ,
// . and call got gotForwardedReplyWrapper when reply comes in
// . returns false and sets g_errno on error
// . returns true on success
if ( g_udpServer.sendRequest ( sir , // req ,
sirSize,
msg_type_7 ,
host->m_ip , // ip
host->m_port , // port
host->m_hostId,
NULL, // retslot
this,//state,
gotUdpReplyWrapper,//acallback,
udpserver_sendrequest_infinite_timeout , // timeout
-1 , // backoff
-1 , // maxwait
MAX_NICENESS // niceness
) )
if (g_udpServer.sendRequest(sir, sirSize, msg_type_7, host->m_ip, host->m_port, host->m_hostId, NULL, this, gotUdpReplyWrapper, udpserver_sendrequest_infinite_timeout, MAX_NICENESS)) {
// we also return true on success, false on error
return true;
}
if ( ! g_errno ) { g_process.shutdownAbort(true); }
// there was an error, g_errno should be set
@ -1088,18 +1076,7 @@ bool ImportState::importLoop ( ) {
// do not free it, let multicast free it after sending it
sbuf->detachBuf();
if ( ! mcast->send ( req ,
reqSize ,
msg_type_7 ,
true , // ownmsg?
shardNum,
false, // send to whole shard?
key , // for selecting host in shard
mcast , // state
NULL , // state2
gotMulticastReplyWrapper ,
multicast_infinite_send_timeout,
MAX_NICENESS ) ) {
if (!mcast->send(req, reqSize, msg_type_7, true, shardNum, false, key, mcast, NULL, gotMulticastReplyWrapper, multicast_infinite_send_timeout, MAX_NICENESS)) {
log(LOG_WARN, "import: import mcast had error: %s",mstrerror(g_errno));
m_numIn++;
}

@ -7,7 +7,7 @@
#include "Dns.h"
#include "SafeBuf.h"
#include "Msg13.h"
#include "Msg0.h"
#include <algorithm>
static void printTcpTable (SafeBuf *p, const char *title, TcpServer *server);
static void printUdpTable (SafeBuf *p, const char *title, UdpServer *server,
@ -202,6 +202,10 @@ void printTcpTable ( SafeBuf* p, const char *title, TcpServer *server ) {
p->safePrintf ("</table><br>\n" );
}
bool sortByStartTime(const UdpStatistic &s1, const UdpStatistic &s2) {
return (s1.getStartTime() < s2.getStartTime());
}
void printUdpTable(SafeBuf *p, const char *title, UdpServer *server, const char *coll, int32_t fromIp, bool isDns) {
if (!coll) {
coll = "main";
@ -210,45 +214,18 @@ void printUdpTable(SafeBuf *p, const char *title, UdpServer *server, const char
// time now
int64_t now = gettimeofdayInMilliseconds();
// store in buffer for sorting
int32_t times[50000];//MAX_UDP_SLOTS];
UdpSlot *slots[50000];//MAX_UDP_SLOTS];
int32_t nn = 0;
for (UdpSlot *s = server->getActiveHead(); s; s = s->getActiveListNext()) {
if ( nn >= 50000 ) {
log("admin: Too many udp sockets.");
break;
}
// store it
times[nn] = now - s->getStartTime();
slots[nn] = s;
nn++;
}
// bubble sort
keepSorting:
// assume no swap will happen
bool didSwap = false;
for ( int32_t i = 1 ; i < nn ; i++ ) {
if ( times[i-1] >= times[i] ) continue;
int32_t tmpTime = times[i-1];
UdpSlot *tmpSlot = slots[i-1];
times[i-1] = times[i];
slots[i-1] = slots[i];
times[i ] = tmpTime;
slots[i ] = tmpSlot;
didSwap = true;
}
if ( didSwap ) goto keepSorting;
std::vector<UdpStatistic> udp_statistics = server->getStatistics();
std::sort(udp_statistics.begin(), udp_statistics.end(), sortByStartTime);
// count how many of each msg we have
int32_t msgCount0[MAX_MSG_TYPES] = {};
int32_t msgCount1[MAX_MSG_TYPES] = {};
for ( int32_t i = 0; i < nn; i++ ) {
UdpSlot *s = slots[i];
if ( s->getNiceness() == 0 )
msgCount0[s->getMsgType()]++;
else
msgCount1[s->getMsgType()]++;
for (auto it = udp_statistics.begin(); it != udp_statistics.end(); ++it) {
if ( it->getNiceness() == 0 ) {
msgCount0[it->getMsgType()]++;
} else {
msgCount1[it->getMsgType()]++;
}
}
const char *wr = server->getWriteRegistered() ? " [write registered]" : "";
@ -268,18 +245,20 @@ void printUdpTable(SafeBuf *p, const char *title, UdpServer *server, const char
title , server->getNumUsedSlots() ,
wr ,
DARK_BLUE );
for ( int32_t i = 0; i < 96; i++ ) {
if ( msgCount0[i] <= 0 ) continue;
p->safePrintf("<tr bgcolor=#%s>"
"<td>0</td><td>0x%" PRIx32"</td><td>%" PRId32"</td></tr>",
LIGHT_BLUE,i, msgCount0[i]);
for ( int32_t i = 0; i < MAX_MSG_TYPES; i++ ) {
if ( msgCount0[i] <= 0 ) {
continue;
}
p->safePrintf("<tr bgcolor=#%s><td>0</td><td>0x%" PRIx32"</td><td>%" PRId32"</td></tr>", LIGHT_BLUE,i, msgCount0[i]);
}
for ( int32_t i = 0; i < 96; i++ ) {
if ( msgCount1[i] <= 0 ) continue;
p->safePrintf("<tr bgcolor=#%s>"
"<td>1</td><td>0x%" PRIx32"</td><td>%" PRId32"</td></tr>",
LIGHT_BLUE,i, msgCount1[i]);
for ( int32_t i = 0; i < MAX_MSG_TYPES; i++ ) {
if ( msgCount1[i] <= 0 ) {
continue;
}
p->safePrintf("<tr bgcolor=#%s><td>1</td><td>0x%" PRIx32"</td><td>%" PRId32"</td></tr>", LIGHT_BLUE,i, msgCount1[i]);
}
p->safePrintf ( "</table><br>" );
const char *dd = isDns ? "<td><b>hostname</b></td>" : "<td><b>msgType</td><td><b>desc</td><td><b>hostId</td>";
@ -315,24 +294,26 @@ void printUdpTable(SafeBuf *p, const char *title, UdpServer *server, const char
// now fill in the columns
for ( int32_t i = 0 ; i < nn ; i++ ) {
// get from sorted list
UdpSlot *s = slots[i];
// times
int64_t elapsed0 = (now - s->getStartTime() ) ;
int64_t elapsed1 = (now - s->getLastReadTime() ) ;
int64_t elapsed2 = (now - s->getLastSendTime() ) ;
char e0[32],e1[32], e2[32];
sprintf ( e0 , "%" PRId64"ms" , elapsed0 );
sprintf ( e1 , "%" PRId64"ms" , elapsed1 );
sprintf ( e2 , "%" PRId64"ms" , elapsed2 );
if ( s->getStartTime() == 0LL ) strcpy ( e0 , "--" );
if ( s->getLastReadTime() == 0LL ) strcpy ( e1 , "--" );
if ( s->getLastSendTime() == 0LL ) strcpy ( e2 , "--" );
for (auto it = udp_statistics.begin(); it != udp_statistics.end(); ++it) {
char e0[32] = "--";
char e1[32] = "--";
char e2[32] = "--";
if (it->getStartTime() != 0LL) {
sprintf(e0, "%" PRId64"ms", (now - it->getStartTime()));
}
if (it->getLastReadTime() != 0LL) {
sprintf(e1, "%" PRId64"ms", (now - it->getLastReadTime()));
}
if (it->getLastSendTime() != 0LL) {
sprintf(e2, "%" PRId64"ms", (now - it->getLastSendTime()));
}
// bgcolor is lighter for incoming requests
const char *bg = s->hasCallback() ? LIGHT_BLUE : LIGHTER_BLUE;
Host *h = g_hostdb.getHost(s->getIp(), s->getPort());
const char *bg = it->hasCallback() ? LIGHT_BLUE : LIGHTER_BLUE;
Host *h = g_hostdb.getHost(it->getIp(), it->getPort());
const char *eip = "??";
uint16_t eport = 0;
const char *ehostId = "-1";
@ -347,94 +328,13 @@ void printUdpTable(SafeBuf *p, const char *title, UdpServer *server, const char
} else {
// if no corresponding host, it could be a request from an external
// cluster, so just show the ip
sprintf(tmpHostId, "%s", iptoa(s->getIp()));
sprintf(tmpHostId, "%s", iptoa(it->getIp()));
ehostId = tmpHostId;
eip = tmpHostId;
}
// set description of the msg
msg_type_t msgType = s->getMsgType();
const char *desc = "";
char *rbuf = s->m_readBuf;
char *sbuf = s->m_sendBuf;
int32_t rbufSize = s->m_readBufSize;
int32_t sbufSize = s->m_sendBufSize;
bool weInit = s->hasCallback();
bool calledHandler = weInit ? s->hasCalledCallback() : s->hasCalledHandler();
char tt[64];
tt[0] = ' ';
tt[1] = '\0';
bool calledHandler = it->hasCallback() ? it->hasCalledCallback() : it->hasCalledHandler();
if (msgType == msg_type_0) {
char *buf = weInit ? sbuf : rbuf;
if (buf) {
int32_t rdbId = buf[RDBIDOFFSET];
Rdb *rdb = NULL;
if (rdbId >= 0 && !isDns) {
rdb = getRdbFromId((uint8_t) rdbId);
if (rdb) {
sprintf(tt, "get from %s", rdb->m_dbname);
}
}
}
desc = tt;
} else if (msgType == msg_type_1) {
char *buf = weInit ? sbuf : rbuf;
if (buf) {
int32_t rdbId = buf[0];
Rdb *rdb = NULL;
if (rdbId >= 0 && !isDns) {
rdb = getRdbFromId((uint8_t) rdbId);
if (rdb) {
sprintf(tt, "add to %s", rdb->m_dbname);
}
}
}
desc = tt;
} else if ( msgType == msg_type_c ) {
desc = "getting ip";
} else if ( msgType == msg_type_11 ) {
desc = "ping";
} else if ( msgType == msg_type_4 ) {
desc = "meta add";
} else if ( msgType == msg_type_13 ) {
char *buf = NULL;
int32_t bufSize = 0;
// . if callback was called this slot's sendbuf can be bogus
// . i put this here to try to avoid a core dump
if (weInit) {
if (!s->hasCalledCallback()) {
buf = sbuf;
bufSize = sbufSize;
}
} else {
buf = rbuf;
bufSize = rbufSize;
}
bool isRobotsTxt = true;
if ( buf && bufSize >= (int32_t)sizeof(Msg13Request)-(int32_t)MAX_URL_LEN ) {
Msg13Request *r = (Msg13Request *)buf;
isRobotsTxt = r->m_isRobotsTxt;
}
desc = isRobotsTxt ? "get robots.txt" : "get web page";
} else if ( msgType == msg_type_22 ) {
desc = "get titlerec";
} else if ( msgType == msg_type_20 ) {
desc = "get summary";
} else if ( msgType == msg_type_39 ) {
desc = "get docids";
} else if ( msgType == msg_type_7 ) {
desc = "inject";
} else if ( msgType == msg_type_25 ) {
desc = "get link info";
} else if ( msgType == msg_type_fd ) {
desc = "proxy forward";
}
p->safePrintf ( "<tr bgcolor=#%s>"
"<td>%s</td>" // age
"<td>%s</td>" // last read
@ -444,39 +344,39 @@ void printUdpTable(SafeBuf *p, const char *title, UdpServer *server, const char
e0 ,
e1 ,
e2 ,
s->getTimeout() );
it->getTimeout() );
// now use the ip for dns and hosts
p->safePrintf("<td>%s:%" PRIu32"</td>",
iptoa(s->getIp()),(uint32_t)s->getPort());
iptoa(it->getIp()),(uint32_t)it->getPort());
const char *cf1 = "";
const char *cf2 = "";
if ( s->m_convertedNiceness ) {
if ( it->getConvertedNiceness() ) {
cf1 = "<font color=red>";
cf2 = "</font>";
}
if ( isDns ) {
p->safePrintf("<td><nobr>%s", s->m_hostname);
p->safePrintf("<td><nobr>%s", it->getExtraInfo());
// get the domain from the hostname
int32_t dlen;
char *dbuf = ::getDomFast ( s->m_hostname,&dlen,false);
const char *dbuf = ::getDomFast(it->getExtraInfo(), &dlen, false);
p->safePrintf( " <a href=\"/admin/tagdb?user=admin&tagtype0=manualban&tagdata0=1&u=%s&c=%s\">"
"[<font color=red><b>BAN %s</b></font>]</nobr></a> " ,
dbuf , coll , dbuf );
p->safePrintf("</td><td>%s%" PRId32"%s</td>", cf1, (int32_t)s->getNiceness(), cf2);
p->safePrintf("</td><td>%s%" PRId32"%s</td>", cf1, (int32_t)it->getNiceness(), cf2);
} else {
// clickable hostId
const char *toFrom = s->hasCallback() ? "to" : "from";
const char *toFrom = it->hasCallback() ? "to" : "from";
p->safePrintf ( "<td>0x%02x</td>" // msgtype
"<td><nobr>%s</nobr></td>" // desc
"<td><nobr>%s <a href=http://%s:%hu/"
"admin/sockets?"
"c=%s>%s</a></nobr></td>"
"<td>%s%" PRId32"%s</td>" , // niceness
s->getMsgType() ,
desc,
it->getMsgType() ,
it->getDescription(),
// begin clickable hostId
toFrom,
eip ,
@ -484,7 +384,7 @@ void printUdpTable(SafeBuf *p, const char *title, UdpServer *server, const char
coll ,
ehostId ,
cf1,
(int32_t)s->getNiceness(),
(int32_t)it->getNiceness(),
cf2
// end clickable hostId
);
@ -492,7 +392,7 @@ void printUdpTable(SafeBuf *p, const char *title, UdpServer *server, const char
const char *rf1 = "";
const char *rf2 = "";
if ( s->getResendCount() ) {
if ( it->getResendCount() ) {
rf1 = "<b style=color:red;>";
rf2 = "</b>";
}
@ -507,16 +407,16 @@ void printUdpTable(SafeBuf *p, const char *title, UdpServer *server, const char
"<td>%" PRId32"</td>" // acks read
"<td>%s%hhu%s</td>" // resend count
"</tr>\n" ,
(uint32_t)s->getTransId(),
(uint32_t)it->getTransId(),
calledHandler,
s->getNumDgramsRead() ,
s->getDatagramsToRead() ,
s->getNumAcksSent() ,
s->getNumDgramsSent() ,
s->getDatagramsToSend() ,
s->getNumAcksRead() ,
it->getNumDatagramRead() ,
it->getNumPendingRead() ,
it->getNumAckSent() ,
it->getNumDatagramSent() ,
it->getNumPendingSend() ,
it->getNumAckRead() ,
rf1 ,
s->getResendCount() ,
it->getResendCount() ,
rf2
);
}

@ -11547,20 +11547,7 @@ bool Parms::doParmSendingLoop ( ) {
// count it
pn->m_numRequests++;
// ok, he's available
if ( ! g_udpServer.sendRequest ( pn->m_parmList.getBufStart(),
pn->m_parmList.length() ,
// a new msgtype
msg_type_3f,
h->m_ip, // ip
h->m_port, // port
h->m_hostId ,
NULL, // retslot
(void *)(PTRTYPE)h->m_hostId , // state
gotParmReplyWrapper ,
30*1000 , // timeout msecs
-1 , // backoff
-1 , // maxwait
0 ) ) { // niceness
if (!g_udpServer.sendRequest(pn->m_parmList.getBufStart(), pn->m_parmList.length(), msg_type_3f, h->m_ip, h->m_port, h->m_hostId, NULL, (void *)(PTRTYPE)h->m_hostId, gotParmReplyWrapper, 30000, 0)) {
log("parms: faild to send: %s",mstrerror(g_errno));
continue;
}
@ -11885,16 +11872,7 @@ bool Parms::syncParmsWithHost0 ( ) {
// . msg4 guarantees ordering of requests
// . there will be a record that is CMD_INSYNC so when we get
// that we set g_parms.m_inSyncWithHost0 to true
if ( ! g_udpServer.sendRequest ( request ,//hashList.getBufStart() ,
requestLen, //hashList.length() ,
msg_type_3e , // msgtype
h->m_ip, // ip
h->m_port, // port
h->m_hostId , // hostid , host #0!!!
NULL, // retslot
NULL , // state
gotReplyFromHost0Wrapper ,
udpserver_sendrequest_infinite_timeout ) ) { // timeout in msecs
if (!g_udpServer.sendRequest(request, requestLen, msg_type_3e, h->m_ip, h->m_port, h->m_hostId, NULL, NULL, gotReplyFromHost0Wrapper, udpserver_sendrequest_infinite_timeout)) {
log("parms: error syncing with host 0: %s",mstrerror(g_errno));
return false;
}

@ -380,25 +380,17 @@ void PingServer::pingHost ( Host *h , uint32_t ip , uint16_t port ) {
// the proxy may be interfacing with the temporary cluster while
// we update the main cluster...
//int32_t port = h->m_port;
if ( g_proxy.isProxyRunning() && g_conf.m_useTmpCluster )
if ( g_proxy.isProxyRunning() && g_conf.m_useTmpCluster ) {
port++;
}
if ( h->m_isProxy ) hostId = -1;
if ( g_udpServer.sendRequest ( (char *)pi , //request ,
sizeof(PingInfo),//requestSize ,
msg_type_11 ,
ip ,//h->m_ip ,
port ,//h->m_port ,
hostId ,
NULL ,
(void *)h , // callback state
gotReplyWrapperP ,
// timeout
g_conf.m_deadHostTimeout ,
1000 , // backoff
2000 , // max wait
0 )) // niceness
if ( h->m_isProxy ) {
hostId = -1;
}
if (g_udpServer.sendRequest((char *)pi, sizeof(PingInfo), msg_type_11, ip, port, hostId, NULL, (void *)h, gotReplyWrapperP, g_conf.m_deadHostTimeout, 0, NULL, 1000, 2000)) {
return;
}
// it had an error, so dec the count
s_outstandingPings--;
// consider it out of progress
@ -579,22 +571,9 @@ void gotReplyWrapperP ( void *state , UdpSlot *slot ) {
// send back what his ping was so he knows
*(int32_t *)h->m_tmpBuf = *pingPtr;
if ( g_udpServer.sendRequest (h->m_tmpBuf,//RequestBuf,
//h->m_requestBuf ,
4 , // 4 byte request
msg_type_11 ,
slot->getIp() , // h->m_ip ,
slot->getPort() , // h->m_port2 ,
hid ,
NULL ,
(void *)(PTRTYPE)h->m_hostId, //cb state
gotReplyWrapperP3 ,
// timeout
g_conf.m_deadHostTimeout ,
1000 , // backoff
2000 , // max wait
0 )) // niceness
if (g_udpServer.sendRequest(h->m_tmpBuf, 4, msg_type_11, slot->getIp(), slot->getPort(), hid, NULL, (void *)(PTRTYPE)h->m_hostId, gotReplyWrapperP3, g_conf.m_deadHostTimeout, 0, NULL, 1000, 2000)) {
return;
}
// he came back right away
s_outstandingPings--;
// had an error
@ -1340,21 +1319,10 @@ bool PingServer::broadcastShutdownNotes ( bool sendEmailAlert ,
// count as sent
m_numRequests++;
// send it right now
if ( g_udpServer.sendRequest ( s_buf ,
5 , // rqstSz
msg_type_11 ,
h->m_ip ,
h->m_port ,
// we are sending to a proxy!
-1 , // h->m_hostId ,
NULL , //
NULL , // state
gotReplyWrapperP2 ,
3000 , // 3 sec timeout
-1 , // default backoff
-1 , // default maxwait
0 ))// niceness
// we are sending to a proxy!
if (g_udpServer.sendRequest(s_buf, 5, msg_type_11, h->m_ip, h->m_port, -1, NULL, NULL, gotReplyWrapperP2, 3000, 0)) {
continue;
}
// otherwise, had an error
m_numReplies++;
// reset g_errno
@ -1375,20 +1343,9 @@ bool PingServer::broadcastShutdownNotes ( bool sendEmailAlert ,
//if ( ! r ) return true;
//gbmemcpy ( r , (char *)(&h->m_hostId) , 4 );
// send it right now
if ( g_udpServer.sendRequest ( s_buf ,
5 , // rqstSz
msg_type_11 ,
h->m_ip ,
h->m_port ,
h->m_hostId ,
NULL , //
NULL , // state
gotReplyWrapperP2 ,
3000 , // 3 sec timeout
-1 , // default backoff
-1 , // default maxwait
0 ))// niceness
if (g_udpServer.sendRequest(s_buf, 5, msg_type_11, h->m_ip, h->m_port, h->m_hostId, NULL, NULL, gotReplyWrapperP2, 3000, 0)) {
continue;
}
// otherwise, had an error
m_numReplies++;
// reset g_errno

@ -462,21 +462,7 @@ bool Proxy::forwardRequest ( StateControl *stC ) {
// . returns false and sets g_errno on error, true on success
// . after resending the request 4 times with no ACK recv'd, call
// it a EUDPTIMEDOUT error and deal with that below...
bool status;
status = g_udpServer.sendRequest ( req ,
reqSize ,
msg_type_fd ,
dstIp , // h->m_ip ,
dstPort , // h->m_port ,
dstId , // h->m_hostId ,
NULL , // the slotPtr
stC , // state
gotHttpReplyWrapper ,
stC->m_timeout ,
-1 , // backoff
-1 , // maxwait
0 , // niceness
4 );// maxResends
bool status = g_udpServer.sendRequest(req, reqSize, msg_type_fd, dstIp, dstPort, dstId, NULL, stC, gotHttpReplyWrapper, stC->m_timeout, 0, NULL, -1, -1, 4);
// if no error, return false, we blocked
if ( status ) return false;

@ -4343,7 +4343,7 @@ bool SpiderRequest::setFromAddUrl ( char *url ) {
// how to set m_firstIp? i guess addurl can be throttled independently
// of the other urls??? use the hash of the domain for it!
int32_t dlen;
char *dom = getDomFast ( url , &dlen );
const char *dom = getDomFast ( url , &dlen );
// sanity
if ( ! dom ) {

@ -2079,17 +2079,8 @@ void updateAllCrawlInfosSleepWrapper ( int fd , void *state ) {
// count it as launched
s_requests++;
// launch it
if ( ! g_udpServer.sendRequest ( request,
requestSize,
msg_type_c1 ,
h->m_ip ,
h->m_port ,
h->m_hostId ,
NULL, // retslot
NULL, // state
gotCrawlInfoReply ) ) {
log("spider: error sending c1 request: %s",
mstrerror(g_errno));
if (!g_udpServer.sendRequest(request, requestSize, msg_type_c1, h->m_ip, h->m_port, h->m_hostId, NULL, NULL, gotCrawlInfoReply)) {
log("spider: error sending c1 request: %s", mstrerror(g_errno));
s_replies++;
}
}
@ -2532,8 +2523,7 @@ void handleRequestc1 ( UdpSlot *slot , int32_t niceness ) {
cr->sentLocalCrawlInfoToHost ( hostId );
}
g_udpServer.sendReply_ass( replyBuf.getBufStart(), replyBuf.length(), replyBuf.getBufStart(),
replyBuf.getCapacity(), slot );
g_udpServer.sendReply_ass(replyBuf.getBufStart(), replyBuf.length(), replyBuf.getBufStart(), replyBuf.getCapacity(), slot);
// udp server will free this
replyBuf.detachBuf();

@ -833,8 +833,7 @@ void handleRequest54 ( UdpSlot *udpSlot , int32_t niceness ) {
}
// send the proxy ip/port/LBid back to user
g_udpServer.sendReply_ass( udpSlot->m_tmpBuf, sizeof( ProxyReply ), udpSlot->m_tmpBuf,
sizeof( ProxyReply ), udpSlot );
g_udpServer.sendReply_ass(udpSlot->m_tmpBuf, sizeof(ProxyReply), udpSlot->m_tmpBuf, sizeof(ProxyReply), udpSlot);
}
// . use msg 0x55 to say you are done using the proxy

@ -241,7 +241,7 @@ class Msg8a {
int32_t m_niceness;
char *m_dom;
const char *m_dom;
char *m_hostEnd;
char *m_p;

@ -66,7 +66,7 @@ class Titledb {
}
// a different way to do it
uint64_t getProbableDocId(char *url,char *dom,int32_t domLen) {
uint64_t getProbableDocId(const char *url,const char *dom,int32_t domLen) {
uint64_t probableDocId = hash64b(url,0) &
DOCID_MASK;
// clear bits 6-13 because we want to put the domain hash there

@ -137,10 +137,10 @@ bool UdpServer::init ( uint16_t port, UdpProtocol *proto,
// set up linked list of available slots
m_availableListHead = &m_slots[0];
for ( int32_t i = 0 ; i < m_maxSlots - 1 ; i++ ) {
m_slots[ i ].m_availableListNext = &m_slots[ i + 1 ];
for (int32_t i = 0; i < m_maxSlots - 1; i++) {
m_slots[i].m_availableListNext = &m_slots[i + 1];
}
m_slots [ m_maxSlots - 1].m_availableListNext = NULL;
m_slots[m_maxSlots - 1].m_availableListNext = NULL;
// the linked list of slots in use
m_activeListHead = NULL;
@ -324,14 +324,14 @@ bool UdpServer::sendRequest(char *msg,
void *state,
void (*callback)(void *state, UdpSlot *slot),
int64_t timeout, // in milliseconds
int32_t niceness,
const char *extraInfo,
int16_t backoff,
int16_t maxWait,
int32_t niceness,
int32_t maxResends) {
// sanity check
// proxy forwards the msg10 to a host in the cluster
if ( ! m_handlers[msgType] && this == &g_udpServer && ! g_proxy.isProxy() ) {
if ( ! m_handlers[msgType] && msgType != msg_type_dns ) {
g_process.shutdownAbort(true);
}
@ -401,9 +401,9 @@ bool UdpServer::sendRequest(char *msg,
}
logDebug(g_conf.m_logDebugUdp, "udp: sendrequest: ip2=%s port=%" PRId32" msgType=0x%02x msgSize=%" PRId32" "
"transId=%" PRId32" (niceness=%" PRId32") slot=%" PTRFMT".",
"transId=%" PRId32" (niceness=%" PRId32") slot=%p.",
iptoa(ip2),(int32_t)port, (unsigned char)msgType, (int32_t)msgSize,
(int32_t)transId, (int32_t)niceness , (PTRTYPE)slot );
(int32_t)transId, (int32_t)niceness , slot );
// . get time
int64_t now = gettimeofdayInMillisecondsLocal();
@ -418,7 +418,7 @@ bool UdpServer::sendRequest(char *msg,
}
// set up for a send
if (!slot->sendSetup(msg, msgSize, msg, msgSize, msgType, now, state, callback, niceness, backoff, maxWait)) {
if (!slot->sendSetup(msg, msgSize, msg, msgSize, msgType, now, state, callback, niceness, backoff, maxWait, extraInfo)) {
freeUdpSlot_ass ( slot );
log( LOG_WARN, "udp: Failed to initialize udp socket for sending req: %s",mstrerror(g_errno));
return false;
@ -447,46 +447,42 @@ bool UdpServer::sendRequest(char *msg,
}
// returns false and sets g_errno on error, true otherwise
void UdpServer::sendErrorReply( UdpSlot *slot, int32_t errnum ) {
void UdpServer::sendErrorReply(UdpSlot *slot, int32_t errnum) {
logDebug(g_conf.m_logDebugUdp, "udp: sendErrorReply slot=%p errnum=%" PRId32, slot, errnum);
// bitch if it is 0
if ( errnum == 0 ) {
log(LOG_LOGIC,"udp: sendErrorReply: errnum is 0.");
g_process.shutdownAbort(true);
}
// clear g_errno in case it was set
g_errno = 0;
// make a little msg
char *msg = slot->m_tmpBuf; //(char *)mmalloc(4,"UdpServer");
// make sure to destroy slot to free read/send bufs if this fails
//if ( ! msg ) {
// log("udp: sendErrorReply: %s",mstrerror(g_errno));
// destroySlot(slot);
// return;
//}
char *msg = slot->m_tmpBuf;
*(int32_t *)msg = htonl(errnum) ;
// set the m_localErrno in "slot" so it will set the dgrams error bit
slot->m_localErrno = errnum;
sendReply_ass ( msg , 4 , msg , 4 , slot );
sendReply_ass(msg, 4, msg, 4, slot);
}
// . destroys slot on error or completion (frees m_readBuf,m_sendBuf)
// . use a backoff of -1 for the default
void UdpServer::sendReply_ass ( char *msg ,
int32_t msgSize ,
char *alloc ,
int32_t allocSize ,
UdpSlot *slot ,
void *state ,
void (* callback2)(void *state, UdpSlot *slot),
int16_t backoff ,
int16_t maxWait ,
bool isCallback2Hot) {
void UdpServer::sendReply_ass(char *msg, int32_t msgSize, char *alloc, int32_t allocSize, UdpSlot *slot, void *state,
void (*callback2)(void *state, UdpSlot *slot), int16_t backoff, int16_t maxWait,
bool isCallback2Hot) {
logDebug(g_conf.m_logDebugUdp, "udp: sendReply_ass slot=%p", slot);
// the callback should be NULL
if ( slot->hasCallback() ) {
g_errno = EBADENGINEER;
log(LOG_LOGIC,"udp: sendReply_ass: Callback is non-NULL.");
return;
}
if ( ! msg && msgSize > 0 ) {
log( LOG_WARN, "udp: calling sendreply with null send buffer and positive size! will probably core." );
}
@ -524,7 +520,7 @@ void UdpServer::sendReply_ass ( char *msg ,
// discount this
if ( slot->m_convertedNiceness == 1 && slot->getNiceness() == 0 ) {
logDebug(g_conf.m_logDebugUdp, "udp: unconverting slot=%" PTRFMT"", (PTRTYPE)slot);
logDebug(g_conf.m_logDebugUdp, "udp: unconverting slot=%p", slot);
// go back to niceness 1 for sending back, otherwise their
// the callback will be called with niceness 0!!
@ -1430,12 +1426,10 @@ bool UdpServer::makeCallbacks_ass ( int32_t niceness ) {
slot->m_sendBufSize == 0 &&
doNicenessConversion &&
m_outstandingConverts < 20 ) {
// note it
if ( g_conf.m_logDebugUdp )
log("udp: converting slot from niceness 1 to "
"0. slot=%" PTRFMT" mmsgtype=0x%02x",
(PTRTYPE)slot,
slot->getMsgType());
logDebug(g_conf.m_logDebugUdp, "udp: converting slot from niceness 1 to 0. slot=%p mmsgtype=0x%02x",
slot, slot->getMsgType());
// convert the niceness
slot->m_niceness = 0;
// count it
@ -1461,12 +1455,8 @@ bool UdpServer::makeCallbacks_ass ( int32_t niceness ) {
slot->m_sendBufSize > 0 &&
doNicenessConversion &&
m_outstandingConverts < 20 ) {
// note it
if ( g_conf.m_logDebugUdp )
log("udp: converting slot2 from niceness 1 to "
"0. slot=%" PTRFMT" mmsgtype=0x%02x",
(PTRTYPE)slot,
slot->getMsgType());
logDebug(g_conf.m_logDebugUdp, "udp: converting slot2 from niceness 1 to 0. slot=%p mmsgtype=0x%02x",
slot, slot->getMsgType());
// convert the niceness
slot->m_niceness = 0;
// count it
@ -1500,36 +1490,33 @@ bool UdpServer::makeCallbacks_ass ( int32_t niceness ) {
bool logIt = false;
if ( slot->getNiceness() == 0 ) logIt = true;
if ( logIt ) start2 = gettimeofdayInMillisecondsLocal();
// log that
if ( g_conf.m_logDebugUdp )
log(LOG_DEBUG,"udp: calling callback/handler for "
"slot=%" PTRFMT" pass=%" PRId32" nice=%" PRId32,
(PTRTYPE)slot,
(int32_t)pass,(int32_t)slot->getNiceness());
logDebug(g_conf.m_logDebugUdp,"udp: calling callback/handler for slot=%p pass=%" PRId32" nice=%" PRId32,
slot, (int32_t)pass,(int32_t)slot->getNiceness());
// . crap, this can alter the linked list we are scanning
// if it deletes the slot! yes, but now we use "nextSlot"
// . return false on error and sets g_errno, true otherwise
// . return true if we called one
// . skip to next slot if did not call callback/handler
if ( ! makeCallback_ass ( slot ) ) continue;
if (!makeCallback_ass(slot)) {
continue;
}
// remove it from the callback list to avoid re-call
removeFromCallbackLinkedList ( slot );
removeFromCallbackLinkedList(slot);
int64_t took = 0;
if ( logIt )
took = gettimeofdayInMillisecondsLocal()-start2;
int64_t took = logIt ? (gettimeofdayInMillisecondsLocal()-start2) : 0;
if ( took > 1000 || (slot->getNiceness()==0 && took>100))
logf(LOG_DEBUG,"udp: took %" PRId64" ms to call "
"callback/handler for "
"msgtype=0x%" PRIx32" "
"nice=%" PRId32" "
"callback=%" PTRFMT"",
"callback=%p",
took,
(int32_t)slot->getMsgType(),
(int32_t)slot->getNiceness(),
(PTRTYPE)slot->m_callback);
slot->m_callback);
numCalled++;
// log how long callback took
@ -1565,7 +1552,6 @@ bool UdpServer::makeCallbacks_ass ( int32_t niceness ) {
// quickpoll, we have to reset the linked list scan after
// calling makeCallback(slot) below.
if ( ! g_loop.m_inQuickPoll ) goto fullRestart;
}
// clear
g_errno = 0;
@ -1644,12 +1630,12 @@ bool UdpServer::makeCallback_ass ( UdpSlot *slot ) {
"msgType=0x%02x "
"g_errno=%s "
"niceness=%" PRId32" "
"callback=%08" PTRFMT" "
"callback=%p "
"took %" PRId64" ms (%" PRId32" Mbps).",
slot->getTransId(),msgType,
mstrerror(g_errno),
slot->getNiceness(),
(PTRTYPE)slot->m_callback ,
slot->m_callback ,
took , Mbps );
start = now;
}
@ -1688,12 +1674,12 @@ bool UdpServer::makeCallback_ass ( UdpSlot *slot ) {
// g_process.shutdownAbort(true);}
// sanity check. has this slot been excised from linked list?
if ( slot->m_activeListPrev && slot->m_activeListPrev->m_activeListNext != slot ) {
if (slot->m_activeListPrev && slot->m_activeListPrev->m_activeListNext != slot) {
g_process.shutdownAbort(true);
}
// sanity check. has this slot been excised from linked list?
if ( slot->m_activeListPrev && slot->m_activeListPrev->m_activeListNext != slot ) {
if (slot->m_activeListPrev && slot->m_activeListPrev->m_activeListNext != slot) {
g_process.shutdownAbort(true);
}
@ -1792,11 +1778,11 @@ bool UdpServer::makeCallback_ass ( UdpSlot *slot ) {
log(LOG_DEBUG,
"udp: Callback2 transId=%" PRId32" "
"msgType=0x%02x "
"g_errno=%s callback2=%08" PTRFMT""
"g_errno=%s callback2=%p"
" took %" PRId64" ms.",
slot->getTransId(),msgType,
mstrerror(g_errno),
(PTRTYPE)slot->m_callback2,
slot->m_callback2,
took );
}
// clear any g_errno that may have been set
@ -1989,14 +1975,14 @@ bool UdpServer::makeCallback_ass ( UdpSlot *slot ) {
// this is kinda obsolete now that we have the stats above
if ( g_conf.m_logDebugNet ) {
int64_t took = gettimeofdayInMillisecondsLocal() - start;
log(LOG_DEBUG,"net: Handler transId=%" PRId32" slot=%" PTRFMT" "
log(LOG_DEBUG,"net: Handler transId=%" PRId32" slot=%p "
"msgType=0x%02x msgSize=%" PRId32" "
"g_errno=%s callback=%08" PTRFMT" "
"g_errno=%s callback=%p "
"niceness=%" PRId32" "
"took %" PRId64" ms.",
(int32_t)slot->getTransId() , (PTRTYPE)slot,
(int32_t)slot->getTransId() , slot,
msgType, (int32_t)slot->m_readBufSize , mstrerror(g_errno),
(PTRTYPE)slot->m_callback,
slot->m_callback,
(int32_t)slot->getNiceness(),
took );
}
@ -2013,12 +1999,6 @@ void UdpServer::timePollWrapper(int fd, void *state) {
}
void UdpServer::timePoll ( ) {
// debug msg
//if ( g_conf.m_logDebugUdp )
// log(LOG_DEBUG,"udp: timepoll: inSigHandler=%" PRId32", m_activeListHead=%" PRId32".",
// (int32_t)g_inSigHandler,(int32_t)m_activeListHead);
// timeout dead hosts if we should
//if ( g_conf.m_giveupOnDeadHosts ) timeoutDeadHosts ( );
if ( ! m_activeListHead ) return;
// debug msg
@ -2047,8 +2027,6 @@ void UdpServer::timePoll ( ) {
// repeat in case the send got reset
// if ( first ) { first = false; goto loop; }
}
// debug msg
//if ( g_conf.m_logDebugUdp ) log("exit timePoll");
}
@ -2096,8 +2074,6 @@ bool UdpServer::readTimeoutPoll ( int64_t now ) {
slot->m_readAckBitsOn);
}
// get the slot
//UdpSlot *slot = &m_slots[i];
// if the reading is completed, but we haven't generated a
// reply yet, then continue because when reply is generated
// UdpServer::sendReply(slot) will be called and we don't
@ -2289,7 +2265,12 @@ bool UdpServer::readTimeoutPoll ( int64_t now ) {
// . may be called twice on same slot by Multicast::destroySlotsInProgress()
void UdpServer::destroySlot ( UdpSlot *slot ) {
// return if no slot
if ( ! slot ) return;
if ( ! slot ) {
return;
}
logDebug(g_conf.m_logDebugUdp, "udp: destroy slot=%p", slot);
// if we're deleting a slot that was an incoming request then
// decrement m_requestsInWaiting (exclude pings)
if ( ! slot->hasCallback() && slot->getMsgType() != msg_type_11 ) {
@ -2362,7 +2343,6 @@ bool UdpServer::shutdown ( bool urgent ) {
time_t now = getTime();
int32_t count = 0;
if(!urgent) {
//if ( m_head && m_activeListHead->m_activeListNext ) return false;
for ( UdpSlot *slot = m_activeListHead ; slot ; slot = slot->m_activeListNext ) {
// if we initiated, then don't count it
if ( slot->hasCallback() ) continue;
@ -2420,8 +2400,7 @@ bool UdpServer::timeoutDeadHosts ( Host *h ) {
// or gk1! which have hostIds 0 and 1, like the proxy0
// and proxy1 do...
if ( h->m_isProxy ) return true;
// get time now
//time_t now = getTime();
// find sockets out to dead hosts and change the timeout
for ( UdpSlot *slot = m_activeListHead ; slot ; slot = slot->m_activeListNext ) {
// only change requests to dead hosts
@ -2441,8 +2420,10 @@ bool UdpServer::timeoutDeadHosts ( Host *h ) {
// verified that this is not interruptible
UdpSlot *UdpServer::getEmptyUdpSlot_ass ( key_t k , bool incoming ) {
UdpSlot *slot = removeFromAvailableLinkedList();
// return NULL if none left
if ( ! m_availableListHead ) {
if (!slot) {
g_errno = ENOSLOTS;
if (g_conf.m_logNetCongestion) {
log(LOG_WARN, "udp: %" PRId32" of %" PRId32" udp slots occupied. None available to handle this new transaction.",
@ -2451,25 +2432,7 @@ UdpSlot *UdpServer::getEmptyUdpSlot_ass ( key_t k , bool incoming ) {
return NULL;
}
UdpSlot *slot = m_availableListHead;
// remove from linked list of available slots
m_availableListHead = m_availableListHead->m_availableListNext;
// put the used slot at the tail so older slots are at the head and
// makeCallbacks() can take care of the callbacks that have been
// waiting the longest first...
if (m_activeListTail) {
slot->m_activeListNext = NULL;
slot->m_activeListPrev = m_activeListTail;
m_activeListTail->m_activeListNext = slot;
m_activeListTail = slot;
} else {
slot->m_activeListNext = NULL;
slot->m_activeListPrev = NULL;
m_activeListHead = slot;
m_activeListTail = slot;
}
addToActiveLinkedList(slot);
// count it
m_numUsedSlots++;
@ -2484,10 +2447,12 @@ UdpSlot *UdpServer::getEmptyUdpSlot_ass ( key_t k , bool incoming ) {
slot->m_key = k;
addKey(k, slot);
logDebug(g_conf.m_logDebugUdp, "udp: get %s empty slot=%p with key=%s", incoming ? "incoming" : "outgoing", slot, KEYSTR(&k, sizeof(key_t)));
return slot;
}
void UdpServer::addKey ( key_t k , UdpSlot *ptr ) {
logDebug(g_conf.m_logDebugUdp, "udp: add key=%s with slot=%p", KEYSTR(&k, sizeof(key_t)), ptr);
// we assume that k.n1 is the transId. if this changes we should
// change this to keep our hash lookups fast
@ -2503,25 +2468,53 @@ UdpSlot *UdpServer::getUdpSlot ( key_t k ) {
// . transId is key.n1, use that as hash
// . m_numBuckets must be a power of 2
int32_t i = hashLong(k.n1) & m_bucketMask;
while ( m_ptrs[i] && m_ptrs[i]->m_key != k )
if ( ++i >= m_numBuckets ) i = 0;
while ( m_ptrs[i] && m_ptrs[i]->m_key != k ) {
if (++i >= m_numBuckets) {
i = 0;
}
}
// if empty, return NULL
return m_ptrs[i];
}
void UdpServer::addToCallbackLinkedList ( UdpSlot *slot ) {
void UdpServer::addToAvailableLinkedList(UdpSlot *slot) {
log(LOG_DEBUG, "udp: adding slot=%p to available list", slot);
slot->m_availableListNext = m_availableListHead;
m_availableListHead = slot;
}
UdpSlot* UdpServer::removeFromAvailableLinkedList() {
// return NULL if none left
if ( ! m_availableListHead ) {
logDebug(g_conf.m_logDebugUdp, "udp: unable to remove slot from available list");
return NULL;
}
UdpSlot *slot = m_availableListHead;
// remove from linked list of available slots
m_availableListHead = slot->m_availableListNext;
logDebug(g_conf.m_logDebugUdp, "udp: removing slot=%p from available list", slot);
return slot;
}
void UdpServer::addToCallbackLinkedList(UdpSlot *slot) {
// debug log
if (g_conf.m_logDebugUdp) {
if (slot->getErrno()) {
log(LOG_DEBUG, "udp: adding slot with err=%s to callback list", mstrerror(slot->m_errno) );
log(LOG_DEBUG, "udp: adding slot=%p with err=%s to callback list", slot, mstrerror(slot->m_errno) );
} else {
log(LOG_DEBUG, "udp: adding slot=%" PTRFMT" to callback list", (PTRTYPE)slot);
log(LOG_DEBUG, "udp: adding slot=%p to callback list", slot);
}
}
// must not be in there already, lest we double add it
if ( isInCallbackLinkedList ( slot ) ) {
logDebug(g_conf.m_logDebugUdp, "udp: avoided double add slot=%" PTRFMT,(PTRTYPE)slot);
logDebug(g_conf.m_logDebugUdp, "udp: avoided double add slot=%p", slot);
return;
}
@ -2539,7 +2532,7 @@ void UdpServer::addToCallbackLinkedList ( UdpSlot *slot ) {
}
}
bool UdpServer::isInCallbackLinkedList ( UdpSlot *slot ) {
bool UdpServer::isInCallbackLinkedList(UdpSlot *slot) {
// return if not in the linked list
if ( slot->m_callbackListPrev || slot->m_callbackListNext || m_callbackListHead == slot ) {
return true;
@ -2547,14 +2540,13 @@ bool UdpServer::isInCallbackLinkedList ( UdpSlot *slot ) {
return false;
}
void UdpServer::removeFromCallbackLinkedList ( UdpSlot *slot ) {
logDebug(g_conf.m_logDebugUdp, "udp: removing slot=%" PTRFMT" from callback list",(PTRTYPE)slot);
void UdpServer::removeFromCallbackLinkedList(UdpSlot *slot) {
logDebug(g_conf.m_logDebugUdp, "udp: removing slot=%p from callback list", slot);
// return if not in the linked list
if ( slot->m_callbackListPrev == NULL &&
slot->m_callbackListNext == NULL &&
m_callbackListHead != slot )
if ( slot->m_callbackListPrev == NULL && slot->m_callbackListNext == NULL && m_callbackListHead != slot ) {
return;
}
// excise from linked list otherwise
if ( m_callbackListHead == slot ) {
@ -2563,35 +2555,76 @@ void UdpServer::removeFromCallbackLinkedList ( UdpSlot *slot ) {
if ( m_callbackListTail == slot )
m_callbackListTail = slot->m_callbackListPrev;
if ( slot->m_callbackListPrev )
if ( slot->m_callbackListPrev ) {
slot->m_callbackListPrev->m_callbackListNext = slot->m_callbackListNext;
if ( slot->m_callbackListNext )
}
if ( slot->m_callbackListNext ) {
slot->m_callbackListNext->m_callbackListPrev = slot->m_callbackListPrev;
}
// and so we do not try to re-excise it
slot->m_callbackListPrev = NULL;
slot->m_callbackListNext = NULL;
}
// verified that this is not interruptible
void UdpServer::freeUdpSlot_ass ( UdpSlot *slot ) {
// set the new head/tail if we were it
if ( slot == m_activeListTail ) {
m_activeListTail = slot->m_activeListPrev;
void UdpServer::addToActiveLinkedList(UdpSlot *slot) {
logDebug(g_conf.m_logDebugUdp, "udp: adding slot=%p to active list", slot);
// put the used slot at the tail so older slots are at the head and
// makeCallbacks() can take care of the callbacks that have been
// waiting the longest first...
slot->m_activeListNext = NULL;
slot->m_activeListPrev = NULL;
if (m_activeListTail) {
// insert at end of linked list otherwise
m_activeListTail->m_activeListNext = slot;
slot->m_activeListPrev = m_activeListTail;
m_activeListTail = slot;
} else {
m_activeListHead = slot;
m_activeListTail = slot;
}
if ( slot == m_activeListHead ) {
}
void UdpServer::removeFromActiveLinkedList(UdpSlot *slot) {
logDebug(g_conf.m_logDebugUdp, "udp: removing slot=%p from active list", slot);
// return if not in the linked list
if ( slot->m_activeListPrev == NULL && slot->m_activeListNext == NULL && m_activeListHead != slot ) {
return;
}
// excise from linked list otherwise
if ( m_activeListHead == slot ) {
m_activeListHead = slot->m_activeListNext;
}
// remove from linked list of used slots
if ( m_activeListTail == slot )
m_activeListTail = slot->m_activeListPrev;
if ( slot->m_activeListPrev ) {
slot->m_activeListPrev->m_activeListNext = slot->m_activeListNext;
}
if ( slot->m_activeListNext ) {
slot->m_activeListNext->m_activeListPrev = slot->m_activeListPrev;
}
// and so we do not try to re-excise it
slot->m_activeListPrev = NULL;
slot->m_activeListNext = NULL;
}
// verified that this is not interruptible
void UdpServer::freeUdpSlot_ass ( UdpSlot *slot ) {
logDebug(g_conf.m_logDebugUdp, "udp: free slot=%p", slot);
removeFromActiveLinkedList(slot);
// also from callback candidates if we should
removeFromCallbackLinkedList ( slot );
removeFromCallbackLinkedList(slot);
// discount it
m_numUsedSlots--;
@ -2599,8 +2632,8 @@ void UdpServer::freeUdpSlot_ass ( UdpSlot *slot ) {
if ( slot->m_incoming ) m_numUsedSlotsIncoming--;
// add to linked list of available slots
slot->m_availableListNext = m_availableListHead;
m_availableListHead = slot;
addToAvailableLinkedList(slot);
// . get bucket number in hash table
// . may have change since table often gets rehashed
key_t k = slot->m_key;
@ -2612,14 +2645,10 @@ void UdpServer::freeUdpSlot_ass ( UdpSlot *slot ) {
log(LOG_LOGIC,"udp: freeUdpSlot_ass: Not in hash table.");
g_process.shutdownAbort(true);
}
if ( g_conf.m_logDebugUdp )
log(LOG_DEBUG,"udp: freeUdpSlot_ass: Freeing slot "
"tid=%" PRId32" "
"dst=%s:%" PRIu32" slot=%" PTRFMT"",
slot->getTransId(),
iptoa(slot->getIp()),
(uint32_t)slot->getPort(),
(PTRTYPE)slot);
logDebug(g_conf.m_logDebugUdp, "udp: freeUdpSlot_ass: Freeing slot tid=%" PRId32" dst=%s:%" PRIu32" slot=%p",
slot->getTransId(), iptoa(slot->getIp()), (uint32_t)slot->getPort(), slot);
// remove the bucket
m_ptrs [ i ] = NULL;
// rehash all buckets below
@ -2644,7 +2673,7 @@ void UdpServer::cancel ( void *state , msg_type_t msgType ) {
}
// note it
log(LOG_INFO,"udp: cancelled udp socket. msgType=0x%02x.", slot->getMsgType());
log(LOG_INFO,"udp: cancelled udp slot=%p msgType=0x%02x.", slot, slot->getMsgType());
// let them know why we are calling the callback prematurely
g_errno = ECANCELLED;
@ -2684,8 +2713,8 @@ void UdpServer::replaceHost ( Host *oldHost, Host *newHost ) {
g_process.shutdownAbort(true);
}
logDebug(g_conf.m_logDebugUdp, "udp: replaceHost: Rehashing slot tid=%" PRId32" dst=%s:%" PRIu32" slot=%" PTRFMT,
slot->getTransId(), iptoa(slot->getIp()), (uint32_t)slot->getPort(), (PTRTYPE)slot);
logDebug(g_conf.m_logDebugUdp, "udp: replaceHost: Rehashing slot tid=%" PRId32" dst=%s:%" PRIu32" slot=%p",
slot->getTransId(), iptoa(slot->getIp()), (uint32_t)slot->getPort(), slot);
// remove the bucket
m_ptrs [ i ] = NULL;
@ -2738,7 +2767,7 @@ void UdpServer::printState() {
}
void UdpServer::saveActiveSlots(int fd, msg_type_t msg_type) {
for (UdpSlot *slot = g_udpServer.getActiveHead(); slot; slot = slot->getActiveListNext()) {
for (const UdpSlot *slot = m_activeListHead; slot; slot = slot->getActiveListNext()) {
// skip if not wanted msg type
if (slot->getMsgType() != msg_type) {
continue;
@ -2760,3 +2789,13 @@ void UdpServer::saveActiveSlots(int fd, msg_type_t msg_type) {
write(fd, slot->m_sendBuf, slot->m_sendBufSize);
}
}
std::vector<UdpStatistic> UdpServer::getStatistics() const {
std::vector<UdpStatistic> statistics;
for (const UdpSlot *slot = m_activeListHead; slot; slot = slot->getActiveListNext()) {
statistics.push_back(UdpStatistic(*slot));
}
return statistics;
}

@ -39,6 +39,7 @@
#include "UdpProtocol.h"
#include "Hostdb.h"
#include "Loop.h" // loop class that handles signals on our socket
#include "UdpStatistic.h"
// . The rules of Async Sig Safe functions
// 1. to be safe, _ass functions should only call other _ass functions.
@ -110,9 +111,10 @@ public:
void *state, // callback state
void (*callback )(void *state, UdpSlot *slot),
int64_t timeout = 60000, // milliseconds
int32_t niceness = 1,
const char *extraInfo = NULL,
int16_t backoff = -1,
int16_t maxWait = -1, // ms
int32_t niceness = 1,
int32_t maxResends = -1);
// . send a reply to the host specified in "slot"
@ -121,20 +123,13 @@ public:
// . backoff is how long to wait for an ACK in ms before we resend
// . we double backoff each time we wait w/o getting any ACK
// . don't wait longer than maxWait for a resend
void sendReply_ass (char *msg,
int32_t msgSize,
char *alloc,
int32_t allocSize,
UdpSlot *slot, // in seconds
void *state = NULL, // callback state
void (*callback2)(void *state, UdpSlot *slot) = NULL,
int16_t backoff = -1,
int16_t maxWait = -1,
bool isCallback2Hot = false);
void sendReply_ass(char *msg, int32_t msgSize, char *alloc, int32_t allocSize, UdpSlot *slot, void *state = NULL,
void (*callback2)(void *state, UdpSlot *slot) = NULL, int16_t backoff = -1, int16_t maxWait = -1,
bool isCallback2Hot = false);
// . propagate an errno to the requesting machine
// . his callback will be called with errno set to "errnum"
void sendErrorReply( UdpSlot *slot, int32_t errnum );
void sendErrorReply(UdpSlot *slot, int32_t errnum);
// . when a request/msg of type "msgType" is received we call the
// corresponding request handler on this machine
@ -181,8 +176,6 @@ public:
bool getWriteRegistered() const { return m_writeRegistered; }
UdpSlot *getActiveHead() { return m_activeListHead; }
bool hasHandler(int i) const { return (m_handlers[i]); }
// changes timeout to very low on dead hosts
@ -198,6 +191,8 @@ public:
// . TODO: make somewhat random cuz it's easy to spoof like it is now
int32_t m_nextTransId;
std::vector<UdpStatistic> getStatistics() const;
private:
static void readPollWrapper(int fd, void *state);
static void timePollWrapper(int fd, void *state);
@ -230,10 +225,18 @@ private:
// or timed a slot out so it's callback should be called
bool readTimeoutPoll ( int64_t now ) ;
// available linked list functions (m_availableListHead)
void addToAvailableLinkedList(UdpSlot *slot);
UdpSlot* removeFromAvailableLinkedList();
// callback linked list functions (m_callbackListHead)
void addToCallbackLinkedList ( UdpSlot *slot ) ;
bool isInCallbackLinkedList ( UdpSlot *slot );
void removeFromCallbackLinkedList ( UdpSlot *slot ) ;
void addToCallbackLinkedList(UdpSlot *slot);
bool isInCallbackLinkedList(UdpSlot *slot);
void removeFromCallbackLinkedList(UdpSlot *slot);
// active linkedlist functions (m_activeListHead)
void addToActiveLinkedList(UdpSlot *slot);
void removeFromActiveLinkedList(UdpSlot *slot);
// . we maintain a sequential list of transaction ids to guarantee
// uniquness to a point

@ -197,17 +197,9 @@ void UdpSlot::resetConnect ( ) {
// . callback is non-NULL iff you're sending a request
// . callback is NULL ifd you're sending a reply
// . returns false and sets g_errno on error
bool UdpSlot::sendSetup(char *msg,
int32_t msgSize,
char *alloc,
int32_t allocSize,
msg_type_t msgType,
int64_t now,
void *state,
void (*callback)(void *state, UdpSlot *slot),
int32_t niceness,
int16_t backoff,
int16_t maxWait) {
bool UdpSlot::sendSetup(char *msg, int32_t msgSize, char *alloc, int32_t allocSize, msg_type_t msgType, int64_t now,
void *state, void (*callback)(void *state, UdpSlot *slot), int32_t niceness, int16_t backoff,
int16_t maxWait, const char *extraInfo) {
#ifdef _VALGRIND_
VALGRIND_CHECK_MEM_IS_DEFINED(msg,msgSize);
@ -267,6 +259,13 @@ bool UdpSlot::sendSetup(char *msg,
m_dgramsToSend = 1;
}
// save additional info (if present)
if (extraInfo) {
strcpy(m_extraInfo, extraInfo);
} else {
m_extraInfo[0] = '\0';
}
// send to particular ip, but not for pings
if ( m_msgType == msg_type_11 ) {
return true;
@ -1533,7 +1532,7 @@ bool UdpSlot::makeReadBuf ( int32_t msgSize , int32_t numDgrams ) {
// . higher scoring slots will do their sending first
// . may have ACKs to send or plain old dgrams to send
// . now is current time in milliseconds since the epoch
int32_t UdpSlot::getScore ( int64_t now ) {
int32_t UdpSlot::getScore ( int64_t now ) const {
// do not do sends if callback was called. maybe cancelled?
// this was causing us to get into an infinite loop in
// UdpServer.cpp's sendPoll_ass(). there wasn't anything to send i

@ -72,6 +72,7 @@ public:
// what is our niceness level?
int32_t getNiceness() const { return m_niceness; }
char getConvertedNiceness() const { return m_convertedNiceness; }
bool hasCallback() const { return (m_callback); }
@ -100,6 +101,10 @@ public:
bool hasCalledCallback() const { return m_calledCallback; }
UdpSlot* getActiveListNext() { return m_activeListNext; }
const UdpSlot* getActiveListNext() const { return m_activeListNext; }
bool isIncoming() const { return m_incoming; }
const char* getExtraInfo() const { return m_extraInfo; }
// a ptr to the Host class for shotgun info
Host *m_host;
@ -117,8 +122,6 @@ public:
int32_t m_readBufSize; // w/o the dgram headers.
int32_t m_readBufMaxSize;
char m_convertedNiceness;
protected:
// set the UdpSlot's protocol, endpoint info, transId, timeout
void connect(UdpProtocol *proto, sockaddr_in *endPoint, Host *host, int32_t hostId, int32_t transId,
@ -136,7 +139,7 @@ protected:
// . use a backoff of -1 for the default
bool sendSetup(char *msg, int32_t msgSize, char *alloc, int32_t allocSize, msg_type_t msgType, int64_t now,
void *state, void (*callback)(void *state, class UdpSlot *), int32_t niceness, int16_t backoff,
int16_t maxWait);
int16_t maxWait, const char* extraInfo = NULL);
// . send a datagram from this slot on "sock" (call after sendSetup())
// . returns -2 if nothing to send, -1 on error, 0 if blocked,
@ -194,7 +197,7 @@ protected:
// . for sending purposes, the max scoring UdpSlot sends first
// . return < 0 if nothing to send
int32_t getScore ( int64_t now );
int32_t getScore ( int64_t now ) const;
void printState() ;
@ -251,6 +254,10 @@ protected:
UdpSlot *m_callbackListNext;
UdpSlot *m_callbackListPrev;
char m_convertedNiceness;
// additional information which could be useful for statistics (specific to msgtype)
char m_extraInfo[64];
private:
// . send an ACK
@ -432,8 +439,6 @@ public:
// . caller should pre-allocated m_readBuf when calling sendRequest() if he expects a large reply
// . incoming requests simply cannot be bigger than this for the hot udp server
char m_tmpBuf[TMPBUFSIZE];
char *m_hostname;
};
extern int32_t g_cancelAcksSent;

106
UdpStatistic.cpp Normal file

@ -0,0 +1,106 @@
#include <cstdio>
#include "UdpStatistic.h"
#include "UdpSlot.h"
#include "Msg13.h"
#include "Msg0.h"
#include "Rdb.h"
UdpStatistic::UdpStatistic(const UdpSlot &slot)
: m_transId(slot.getTransId()),
m_ip(slot.getIp()),
m_port(slot.getPort()),
m_msgType(slot.getMsgType()),
m_description(),
m_niceness(slot.getNiceness()),
m_convertedNiceness(slot.getConvertedNiceness()),
m_numDatagramRead(slot.getNumDgramsRead()),
m_numDatagramSent(slot.getNumDgramsSent()),
m_numAckRead(slot.getNumAcksRead()),
m_numAckSent(slot.getNumAcksSent()),
m_numPendingRead(slot.getDatagramsToRead()),
m_numPendingSend(slot.getDatagramsToSend()),
m_resendCount(slot.getResendCount()),
m_timeout(slot.getTimeout()),
m_startTime(slot.getStartTime()),
m_lastReadTime(slot.getLastReadTime()),
m_lastSendTime(slot.getLastSendTime()),
m_hasCallback(slot.hasCallback()),
m_hasCalledHandler(slot.hasCalledHandler()),
m_hasCalledCallback(slot.hasCalledCallback()),
m_extraInfo() {
char *buf;
int32_t bufSize;
if (slot.isIncoming()) {
buf = slot.m_readBuf;
bufSize = slot.m_readBufSize;
} else {
buf = slot.m_sendBuf;
bufSize = slot.m_sendBufSize;
}
switch (m_msgType) {
case msg_type_0:
if (buf && bufSize > RDBIDOFFSET) {
uint8_t rdbId = static_cast<uint8_t>(buf[RDBIDOFFSET]);
snprintf(m_description, sizeof(m_description), "get from %s", getDbnameFromId(rdbId));
}
break;
case msg_type_1:
if (buf) {
uint8_t rdbId = static_cast<uint8_t>(buf[0]);
snprintf(m_description, sizeof(m_description), "add to %s", getDbnameFromId(rdbId));
}
break;
case msg_type_4:
strcpy(m_description, "meta add");
break;
case msg_type_7:
strcpy(m_description, "inject");
break;
case msg_type_c:
strcpy(m_description, "getting ip");
break;
case msg_type_11:
strcpy(m_description, "ping");
break;
case msg_type_13:
if (buf && static_cast<size_t>(bufSize) >= sizeof(Msg13Request)) {
Msg13Request *r = reinterpret_cast<Msg13Request*>(buf);
snprintf(m_description, sizeof(m_description), "get %s", r->m_isRobotsTxt ? "web page" : "robot.txt");
}
break;
case msg_type_1f:
strcpy(m_description, "get remote log");
break;
case msg_type_20:
strcpy(m_description, "get summary");
break;
case msg_type_22:
strcpy(m_description, "get titlerec");
break;
case msg_type_25:
strcpy(m_description, "get link info");
break;
case msg_type_39:
strcpy(m_description, "get docids");
break;
case msg_type_3e:
strcpy(m_description, "sync parms");
break;
case msg_type_3f:
strcpy(m_description, "update parms");
break;
case msg_type_54:
strcpy(m_description, "proxy spider");
break;
case msg_type_c1:
strcpy(m_description, "get crawl info");
break;
case msg_type_fd:
strcpy(m_description, "proxy forward");
break;
case msg_type_dns:
strcpy(m_extraInfo, slot.getExtraInfo());
break;
}
}

76
UdpStatistic.h Normal file

@ -0,0 +1,76 @@
#ifndef GB_UDPSTATISTIC_H
#define GB_UDPSTATISTIC_H
#include <stdint.h>
#include <vector>
#include "MsgType.h"
class UdpSlot;
class UdpStatistic {
public:
UdpStatistic(const UdpSlot &slot);
int32_t getTransId() const { return m_transId; }
uint32_t getIp() const { return m_ip; }
uint16_t getPort() const { return m_port; }
msg_type_t getMsgType() const { return m_msgType; }
const char* getDescription() const { return m_description; };
int32_t getNiceness() const { return m_niceness; }
char getConvertedNiceness() const { return m_convertedNiceness; }
int32_t getNumDatagramRead() const { return m_numDatagramRead; }
int32_t getNumDatagramSent() const { return m_numDatagramSent; }
int32_t getNumAckRead() const { return m_numAckRead; }
int32_t getNumAckSent() const { return m_numAckSent; }
int32_t getNumPendingRead() const { return m_numPendingRead; }
int32_t getNumPendingSend() const { return m_numPendingSend; }
char getResendCount() const { return m_resendCount; }
int64_t getTimeout() const { return m_timeout; }
int64_t getStartTime() const { return m_startTime; }
int64_t getLastReadTime() const { return m_lastReadTime; }
int64_t getLastSendTime() const { return m_lastSendTime; }
bool hasCallback() const { return m_hasCallback; }
bool hasCalledHandler() const { return m_hasCalledHandler; }
bool hasCalledCallback() const { return m_hasCalledCallback; }
const char* getExtraInfo() const { return m_extraInfo; }
private:
int32_t m_transId;
uint32_t m_ip;
uint16_t m_port;
msg_type_t m_msgType;
char m_description[255];
int32_t m_niceness;
char m_convertedNiceness;
int32_t m_numDatagramRead;
int32_t m_numDatagramSent;
int32_t m_numAckRead;
int32_t m_numAckSent;
int32_t m_numPendingRead;
int32_t m_numPendingSend;
char m_resendCount;
int64_t m_timeout;
int64_t m_startTime;
int64_t m_lastReadTime;
int64_t m_lastSendTime;
bool m_hasCallback;
bool m_hasCalledHandler;
bool m_hasCalledCallback;
char m_extraInfo[64];
};
#endif // GB_UDPSTATISTIC_H

14
Url.cpp

@ -2344,7 +2344,7 @@ char *getPathFast ( char *url ) {
return pe;
}
char *getTLDFast ( char *url , int32_t *tldLen , bool hasHttp ) {
const char *getTLDFast ( char *url , int32_t *tldLen , bool hasHttp ) {
// point to the url
char *pp = url;
// only do this for some
@ -2387,7 +2387,7 @@ char *getTLDFast ( char *url , int32_t *tldLen , bool hasHttp ) {
}
// get the tld
char *tld = ::getTLD ( uhost , uhostLen );
const char *tld = ::getTLD ( uhost , uhostLen );
// if none, done
if ( ! tld ) {
@ -2447,12 +2447,12 @@ bool hasSubdomain ( char *url ) {
// if we are an ip, say yes
if ( ss == pp ) return true;
// get the tld
char *utld = ::getTLD ( uhost , uhostLen );
const char *utld = ::getTLD ( uhost , uhostLen );
// no tld, then no domain
if ( ! utld ) return false;
// the domain, can only be gotten once we know the TLD
// back up a couple chars
char *udom = utld - 2;
const char *udom = utld - 2;
// backup until we hit a '.' or hit the beginning
while ( udom > uhost && *udom != '.' ) udom--;
// fix http://ok/
@ -2469,7 +2469,7 @@ bool hasSubdomain ( char *url ) {
// was happening when a host gave us a bad redir url and xmldoc tried
// to set extra doc's robot.txt url to it "http://2010/robots.txt" where
// the host said "Location: 2010 ...".
char *getDomFast ( char *url , int32_t *domLen , bool hasHttp ) {
const char *getDomFast ( char *url , int32_t *domLen , bool hasHttp ) {
// point to the url
char *pp = url;
// skip http if there
@ -2522,14 +2522,14 @@ char *getDomFast ( char *url , int32_t *domLen , bool hasHttp ) {
return uhost;
}
// get the tld
char *utld = ::getTLD ( uhost , uhostLen );
const char *utld = ::getTLD ( uhost , uhostLen );
// no tld, then no domain
if ( ! utld ) return NULL;
// the domain, can only be gotten once we know the TLD
// set utldLen
//int32_t utldLen = hostEnd - utld;
// back up a couple chars
char *udom = utld - 2;
const char *udom = utld - 2;
// backup until we hit a '.' or hit the beginning
while ( udom > uhost && *udom != '.' ) udom--;
// fix http://ok/

10
Url.h

@ -19,8 +19,8 @@
class SafeBuf;
char *getPathFast ( char *url );
char *getTLDFast ( char *url , int32_t *tldLen , bool hasHttp = true ) ;
char *getDomFast ( char *url, int32_t *domLen, bool hasHttp = true ) ;
const char *getTLDFast ( char *url , int32_t *tldLen , bool hasHttp = true ) ;
const char *getDomFast ( char *url, int32_t *domLen, bool hasHttp = true ) ;
static inline const char *getDomFast ( const char *url, int32_t *domLen, bool hasHttp = true ) {
return getDomFast(const_cast<char*>(url),domLen,hasHttp);
}
@ -127,11 +127,9 @@ public:
const char *getHost() const { return m_host; }
int32_t getHostLen() const { return m_hlen; }
char *getDomain() { return m_domain; }
const char *getDomain() const { return m_domain; }
int32_t getDomainLen() const { return m_dlen; }
char *getTLD() { return m_tld; }
const char *getTLD() const { return m_tld; }
int32_t getTLDLen() const { return m_tldLen; }
@ -247,10 +245,10 @@ private:
char *m_filename;
int32_t m_flen;
char *m_domain;
const char *m_domain;
int32_t m_dlen;
char *m_tld;
const char *m_tld;
int32_t m_tldLen;
// char *m_midDomain equals m_domain

@ -5608,7 +5608,7 @@ Url **XmlDoc::getRedirUrl() {
// special hack for nytimes.com. do not consider simplified redirs
// because it uses a cookie along with redirs to get to the final
// page.
char *dom2 = m_firstUrl.getDomain();
const char *dom2 = m_firstUrl.getDomain();
int32_t dlen2 = m_firstUrl.getDomainLen();
//@todo BR: EEK! Improve this.

@ -3522,7 +3522,7 @@ int32_t dumpSpiderdb ( const char *coll, int32_t startFileNum, int32_t numFiles,
// get the domain
int32_t domLen = 0;
char *dom = getDomFast ( sreq->m_url , &domLen );
const char *dom = getDomFast ( sreq->m_url , &domLen );
// always need enough room...
if ( bufOff + 4 + domLen + 1 >= bufSize ) {
@ -7219,7 +7219,8 @@ void countdomains( const char* coll, int32_t numRecs, int32_t verbosity, int32_t
}
char *fu = xd.ptr_firstUrl;
int32_t dlen; char *dom = getDomFast ( fu , &dlen );
int32_t dlen;
const char *dom = getDomFast ( fu , &dlen );
int32_t dkey = hash32( dom , dlen );
for( i = 0; i < countDom; i++ ) {
@ -7271,7 +7272,8 @@ void countdomains( const char* coll, int32_t numRecs, int32_t verbosity, int32_t
for( int32_t i = 0; i < dlinks->getNumLinks(); i++ ) {
//struct lnk_info *slink;
char *link = dlinks->getLink(i);
int32_t dlen; char *dom = getDomFast ( link , &dlen );
int32_t dlen;
const char *dom = getDomFast ( link , &dlen );
uint32_t lkey = hash32( dom , dlen );
int32_t j;
for( j = 0; j < sdomi->lnkCnt; j++ ) {

@ -117,3 +117,8 @@
Helgrind:Race
fun:_Z38gettimeofdayInMillisecondsGlobalNoCorev
}
{
writing_s_adjustment
Helgrind:Race
fun:_Z32settimeofdayInMillisecondsGloball
}