1655 lines
63 KiB
C++
1655 lines
63 KiB
C++
#include "gb-include.h"
|
|
|
|
|
|
// i guess both msg0 send requests failed with no route to host,
|
|
//and they got retired... why didn't they switch to eth1????
|
|
|
|
|
|
#include "Multicast.h"
|
|
#include "Rdb.h" // RDB_TITLEDB
|
|
#include "Msg20.h"
|
|
#include "Profiler.h"
|
|
#include "Stats.h"
|
|
#include "Process.h"
|
|
|
|
// up to 10 twins in a group
|
|
//#define MAX_HOSTS_PER_GROUP 10
|
|
|
|
// TODO: if we're ordered to close and we still are waiting on stuff
|
|
// to send we should send as much as we can and save the remaining
|
|
// slots to disk for sending later??
|
|
|
|
static void sleepWrapper1 ( int bogusfd , void *state ) ;
|
|
static void sleepWrapper1b ( int bogusfd , void *state ) ;
|
|
static void sleepWrapper2 ( int bogusfd , void *state ) ;
|
|
static void gotReplyWrapperM1 ( void *state , UdpSlot *slot ) ;
|
|
static void gotReplyWrapperM2 ( void *state , UdpSlot *slot ) ;
|
|
|
|
void Multicast::constructor ( ) {
|
|
m_msg = NULL;
|
|
m_readBuf = NULL;
|
|
m_replyBuf = NULL;
|
|
m_inUse = false;
|
|
}
|
|
void Multicast::destructor ( ) { reset(); }
|
|
|
|
Multicast::Multicast ( ) { constructor(); }
|
|
Multicast::~Multicast ( ) { reset(); }
|
|
|
|
// free the send/read (request/reply) bufs we pirated from a UdpSlot or
|
|
// got from the caller
|
|
void Multicast::reset ( ) {
|
|
// if this is called while we are shutting down and Scraper has a
|
|
// MsgE out it cores
|
|
if ( m_inUse && ! g_process.m_exiting ) {
|
|
log("net: Resetting multicast which is in use. msgType=0x%hhx",
|
|
m_msgType);
|
|
char *xx = NULL; *xx = 0;
|
|
// destroy the outstanding slots
|
|
destroySlotsInProgress(NULL);
|
|
// and undo any sleepwrapper
|
|
if ( m_registeredSleep ) {
|
|
g_loop.unregisterSleepCallback ( this , sleepWrapper1);
|
|
g_loop.unregisterSleepCallback ( this , sleepWrapper2);
|
|
m_registeredSleep = false;
|
|
}
|
|
if ( m_registeredSleep2 ) {
|
|
g_loop.unregisterSleepCallback ( this ,sleepWrapper1b);
|
|
m_registeredSleep2 = false;
|
|
}
|
|
}
|
|
if ( m_msg && m_ownMsg )
|
|
mfree ( m_msg , m_msgSize , "Multicast" );
|
|
if ( m_readBuf && m_ownReadBuf && m_freeReadBuf )
|
|
mfree ( m_readBuf , m_readBufMaxSize , "Multicast" );
|
|
// . replyBuf can be separate from m_readBuf if g_errno gets set
|
|
// and sets the slot's m_readBuf to NULL, then calls closeUpShop()
|
|
// which sets m_readBuf to the slot's readBuf, which is now NULL!
|
|
// . this was causing the "bad engineer" errors from Msg22 to leak mem
|
|
if ( m_replyBuf && m_ownReadBuf && m_freeReadBuf &&
|
|
m_replyBuf != m_readBuf )
|
|
mfree ( m_replyBuf , m_replyBufMaxSize , "Multicast" );
|
|
m_msg = NULL;
|
|
m_readBuf = NULL;
|
|
m_replyBuf = NULL;
|
|
m_inUse = false;
|
|
m_replyingHost = NULL;
|
|
}
|
|
|
|
// . an individual transaction's udpSlot is not be removed because we might
|
|
// get it a reply from it later after it's timeout
|
|
// . returns false and sets g_errno on error
|
|
// . caller can now pass in his own reply buffer
|
|
// . if "freeReplyBuf" is true that means it needs to be freed at some point
|
|
// otherwise, it's probably on the stack or part of a larger allocate class.
|
|
bool Multicast::send ( char *msg ,
|
|
int32_t msgSize ,
|
|
uint8_t msgType ,
|
|
bool ownMsg ,
|
|
//uint32_t groupId ,
|
|
uint32_t shardNum,
|
|
bool sendToWholeGroup ,
|
|
int32_t key ,
|
|
void *state ,
|
|
void *state2 ,
|
|
void (*callback) (void *state , void *state2),
|
|
int32_t totalTimeout , // in seconds
|
|
int32_t niceness ,
|
|
bool realtime ,
|
|
int32_t firstHostId ,
|
|
char *replyBuf ,
|
|
int32_t replyBufMaxSize ,
|
|
bool freeReplyBuf ,
|
|
bool doDiskLoadBalancing ,
|
|
int32_t maxCacheAge ,
|
|
key_t cacheKey ,
|
|
char rdbId ,
|
|
int32_t minRecSizes ,
|
|
bool sendToSelf ,
|
|
bool retryForever ,
|
|
class Hostdb *hostdb ,
|
|
int32_t redirectTimeout ,
|
|
class Host *firstHost ) {
|
|
// make sure not being re-used!
|
|
if ( m_inUse ) {
|
|
log("net: Attempt to re-use active multicast");
|
|
char *xx = NULL; *xx = 0;
|
|
}
|
|
// reset to free "m_msg" in case we are being re-used (like by Msg14)
|
|
//log(LOG_DEBUG, "Multicast: send() 0x%hhx",msgType);
|
|
reset();
|
|
// it is now in use
|
|
m_inUse = true;
|
|
// MDW: force this off for now, i'm not sure it helps and i'm tired
|
|
// of seeing the msg34 timed out msgs.
|
|
// . crap, but seems like the indexdb lookups are getting biased!!
|
|
//doDiskLoadBalancing = false;
|
|
// set the parameters in this class
|
|
m_msg = msg;
|
|
m_ownMsg = ownMsg;
|
|
m_ownReadBuf = true;
|
|
m_freeReadBuf = freeReplyBuf;
|
|
m_msgSize = msgSize;
|
|
m_msgType = msgType;
|
|
//m_groupId = groupId;
|
|
m_shardNum = shardNum;
|
|
m_sendToWholeGroup = sendToWholeGroup;
|
|
m_state = state;
|
|
m_state2 = state2;
|
|
m_callback = callback;
|
|
m_totalTimeout = totalTimeout; // in seconds
|
|
m_niceness = niceness;
|
|
m_realtime = realtime;
|
|
// this can't be -1 i guess
|
|
if ( totalTimeout <= 0 ) { char *xx=NULL;*xx=0; }
|
|
// don't use this anymore!
|
|
if ( m_realtime ) { char *xx = NULL; *xx = 0; }
|
|
m_replyBuf = replyBuf;
|
|
m_replyBufMaxSize = replyBufMaxSize;
|
|
m_startTime = getTime();
|
|
m_numReplies = 0;
|
|
m_readBuf = NULL;
|
|
m_readBufSize = 0;
|
|
m_readBufMaxSize = 0;
|
|
m_registeredSleep = false;
|
|
m_registeredSleep2 = false;
|
|
m_sendToSelf = sendToSelf;
|
|
m_retryForever = retryForever;
|
|
m_sentToTwin = false;
|
|
// turn it off until it is debugged
|
|
m_retryForever = false;
|
|
m_hostdb = hostdb;
|
|
if ( ! m_hostdb ) m_hostdb = &g_hostdb;
|
|
m_retryCount = 0;
|
|
m_key = key;
|
|
// reset Msg34's m_numRequests/m_numReplies since this may be
|
|
// the second time send() was called for this particular class instance
|
|
//m_msg34.reset();
|
|
// keep track of how many outstanding requests to a host
|
|
m_numLaunched = 0;
|
|
// variables for doing disk load balancing
|
|
//m_doDiskLoadBalancing = doDiskLoadBalancing;
|
|
m_maxCacheAge = maxCacheAge;
|
|
m_cacheKey = cacheKey;
|
|
m_rdbId = rdbId;
|
|
m_minRecSizes = minRecSizes; // amount we try to read from disk
|
|
m_redirectTimeout = redirectTimeout;
|
|
// clear m_retired, m_errnos, m_slots
|
|
memset ( m_retired , 0 , sizeof(char ) * MAX_HOSTS_PER_GROUP );
|
|
memset ( m_errnos , 0 , sizeof(int32_t ) * MAX_HOSTS_PER_GROUP );
|
|
memset ( m_slots , 0 , sizeof(UdpSlot *) * MAX_HOSTS_PER_GROUP );
|
|
memset ( m_inProgress , 0 , sizeof(char ) * MAX_HOSTS_PER_GROUP );
|
|
// breathe
|
|
QUICKPOLL(m_niceness);
|
|
|
|
|
|
int32_t hostNumToTry = -1;
|
|
|
|
if ( ! firstHost ) {
|
|
// . get the list of hosts in this group
|
|
// . returns false if blocked, true otherwise
|
|
// . sets g_errno on error
|
|
//Host *hostList = m_hostdb->getGroup ( groupId , &m_numHosts);
|
|
Host *hostList = g_hostdb.getShard ( shardNum , &m_numHosts );
|
|
if ( ! hostList ) {
|
|
log("mcast: no group");g_errno=ENOHOSTS;return false;}
|
|
// now copy the ptr into our array
|
|
for ( int32_t i = 0 ; i < m_numHosts ; i++ )
|
|
m_hostPtrs[i] = &hostList[i];
|
|
}
|
|
//
|
|
// if we are sending to an scproxy then put all scproxies into the
|
|
// list of hosts
|
|
//
|
|
else { // if ( firstHost && (firstHost->m_type & HT_SCPROXY) ) {
|
|
int32_t np = 0;
|
|
for ( int32_t i = 0 ; i < g_hostdb.m_numProxyHosts ; i++ ) {
|
|
// int16_tcut
|
|
Host *h = g_hostdb.getProxy(i);
|
|
if ( ! (h->m_type & HT_SCPROXY ) ) continue;
|
|
// stop breaching
|
|
if ( np >= 32 ) { char *xx=NULL;*xx=0; }
|
|
// assign this
|
|
if ( h == firstHost ) hostNumToTry = np;
|
|
// set our array of ptrs of valid hosts to send to
|
|
m_hostPtrs[np++] = h;
|
|
}
|
|
// assign
|
|
m_numHosts = np;
|
|
firstHostId = -1;
|
|
// panic
|
|
if ( ! np ) { char *xx=NULL;*xx=0; }
|
|
}
|
|
|
|
// . pick the fastest host in the group
|
|
// . this should pick the fastest one we haven't already sent to yet
|
|
if ( ! m_sendToWholeGroup ) {
|
|
bool retVal = sendToHostLoop (key,hostNumToTry,firstHostId) ;
|
|
// on error, un-use this class
|
|
if ( ! retVal ) m_inUse = false;
|
|
return retVal;
|
|
}
|
|
//if ( ! m_sendToWholeGroup ) return sendToHostLoop ( key , -1 );
|
|
// . send to ALL hosts in this group if sendToWholeGroup is true
|
|
// . blocks forever until sends to all hosts are successful
|
|
sendToGroup ( );
|
|
// . sendToGroup() always blocks, but we return true if no g_errno
|
|
// . we actually keep looping until all hosts get the msg w/o error
|
|
return true;
|
|
}
|
|
|
|
///////////////////////////////////////////////////////
|
|
// //
|
|
// GROUP SEND //
|
|
// //
|
|
///////////////////////////////////////////////////////
|
|
|
|
// . keeps calling itself back on any error
|
|
// . resends to host/ip's that had error forever
|
|
// . callback only called when all hosts transmission are successful
|
|
// . it does not send to hosts whose m_errnos is 0
|
|
// . TODO: deal with errors from g_udpServer::sendRequest() better
|
|
// . returns false and sets g_errno on error
|
|
void Multicast::sendToGroup ( ) {
|
|
// see if anyone gets an error
|
|
bool hadError = false;
|
|
// . cast the msg to ALL hosts in the m_hosts group of hosts
|
|
for ( int32_t i = 0 ; i < m_numHosts ; i++ ) {
|
|
// cancel any errors
|
|
g_errno = 0;
|
|
// get the host
|
|
Host *h = m_hostPtrs[i];//&m_hosts[i];
|
|
// if we got a nice reply from him skip him
|
|
//slots[i] && m_slots[i]->doneReading() ) continue;
|
|
if ( m_retired[i] ) continue;
|
|
// sometimes msg1.cpp is able to add the data to the tree
|
|
// without problems and will save us a network trans here
|
|
if ( ! m_sendToSelf &&
|
|
h->m_hostId == g_hostdb.m_hostId &&
|
|
m_hostdb == &g_hostdb &&
|
|
! g_conf.m_interfaceMachine ) {
|
|
m_retired[i] = true;
|
|
m_errnos [i] = 0;
|
|
m_numReplies++;
|
|
continue;
|
|
}
|
|
// . timeout is in seconds
|
|
// . timeout is just the time remaining for the whole groupcast
|
|
// int32_t timeout = m_startTime + m_totalTimeout - getTime();
|
|
// . since we now must get non-error replies from ALL hosts
|
|
// in the group we no longer have a "totalTimeout" per se
|
|
// reset the g_errno for host #i
|
|
m_errnos [i] = 0;
|
|
// if niceness is 0, use the higher priority udpServer
|
|
UdpServer *us = &g_udpServer;
|
|
if ( m_realtime ) us = &g_udpServer2;
|
|
// send to the same port as us!
|
|
int16_t destPort = h->m_port;
|
|
//if ( m_realtime ) destPort = h->m_port2;
|
|
|
|
// if from hosts2.conf pick the best ip!
|
|
int32_t bestIp = h->m_ip;
|
|
if ( m_hostdb == &g_hostdb2 )
|
|
bestIp = g_hostdb.getBestHosts2IP ( h );
|
|
|
|
// retire the host to prevent resends
|
|
m_retired [ i ] = true;
|
|
#ifdef _GLOBALSPEC_
|
|
// debug message for global spec
|
|
//logf(LOG_DEBUG,"net: mcast state=%08" XINT32 "",(int32_t)this);
|
|
#endif
|
|
int32_t hid = h->m_hostId;
|
|
if ( m_hostdb != &g_hostdb ) hid = -1;
|
|
// . send to a single host
|
|
// . this creates a transaction control slot, "udpSlot"
|
|
// . returns false and sets g_errno on error
|
|
if ( us->sendRequest ( m_msg ,
|
|
m_msgSize ,
|
|
m_msgType ,
|
|
bestIp , // h->m_ip ,
|
|
destPort ,
|
|
hid ,
|
|
&m_slots[i] ,
|
|
this , // state
|
|
gotReplyWrapperM2 ,
|
|
m_totalTimeout ,
|
|
-1 , // backoff
|
|
-1 , // max wait in ms
|
|
m_replyBuf ,
|
|
m_replyBufMaxSize ,
|
|
m_niceness )) { // cback niceness
|
|
#ifdef _GLOBALSPEC_
|
|
// note the slot ptr for reference
|
|
//logf(LOG_DEBUG, "net: mcast slotPtr=%08" XINT32 "",
|
|
// (int32_t)&m_slots[i]);
|
|
#endif
|
|
continue;
|
|
}
|
|
// g_errno must have been set, remember it
|
|
m_errnos [ i ] = g_errno;
|
|
// we had an error
|
|
hadError = true;
|
|
// bring him out of retirement to try again later in time
|
|
m_retired[i] = false;
|
|
// log the error
|
|
log("net: Got error sending add data request (0x%hhx) "
|
|
"to host #%" INT32 ": %s. "
|
|
"Sleeping one second and retrying.",
|
|
m_msgType,h->m_hostId,mstrerror(g_errno) );
|
|
// . clear it, we'll try again
|
|
// . if we don't clear Msg1::addList(), which returns
|
|
// true if it did not block, false if it did, will pick up
|
|
// on it and weird things might happen.
|
|
g_errno = 0;
|
|
// continue if we're already registered for sleep callbacks
|
|
if ( m_registeredSleep ) continue;
|
|
// otherwise register for sleep callback to try again
|
|
g_loop.registerSleepCallback (5000/*ms*/,this,sleepWrapper2,
|
|
m_niceness);
|
|
m_registeredSleep = true;
|
|
}
|
|
// if we had an error then we'll be called again in a second
|
|
if ( hadError ) return;
|
|
// otherwise, unregister sleep callback if we had no error
|
|
if ( m_registeredSleep ) {
|
|
g_loop.unregisterSleepCallback ( this , sleepWrapper2 );
|
|
m_registeredSleep = false;
|
|
}
|
|
}
|
|
|
|
void sleepWrapper2 ( int bogusfd , void *state ) {
|
|
Multicast *THIS = (Multicast *)state;
|
|
// try another round of sending to see if hosts had errors or not
|
|
THIS->sendToGroup ( );
|
|
}
|
|
|
|
// C wrapper for the C++ callback
|
|
void gotReplyWrapperM2 ( void *state , UdpSlot *slot ) {
|
|
Multicast *THIS = (Multicast *)state;
|
|
THIS->gotReply2 ( slot );
|
|
}
|
|
|
|
// . otherwise, we were sending to a whole group so ALL HOSTS must produce a
|
|
// successful reply
|
|
// . we keep re-trying forever until they do
|
|
void Multicast::gotReply2 ( UdpSlot *slot ) {
|
|
// don't ever let UdpServer free this send buf (it is m_msg)
|
|
slot->m_sendBufAlloc = NULL;
|
|
// save this for msg4 logic that calls injection callback
|
|
m_slot = slot;
|
|
// . log the error
|
|
// . ETRYAGAIN often happens when we are falling too far behind in
|
|
// our merging (see Rdb.cpp) and we enter urgent merge mode
|
|
// . it may also happen if tree is too full and is being dumped to disk
|
|
//if ( g_errno && g_errno != ETRYAGAIN )
|
|
// log("net: Got error reply sending to a host during a "
|
|
// "group send: %s.", mstrerror(g_errno) );
|
|
// set m_errnos for this slot
|
|
int32_t i;
|
|
for ( i = 0 ; i < m_numHosts ; i++ ) if ( m_slots[i] == slot ) break;
|
|
// if it matched no slot that's weird
|
|
if ( i == m_numHosts ) {
|
|
//log("not our slot: mcast=%" UINT32 "",(int32_t)this);
|
|
log(LOG_LOGIC,"net: multicast: Not our slot."); return; }
|
|
// clear a timeout error on dead hosts
|
|
if ( g_conf.m_giveupOnDeadHosts &&
|
|
g_hostdb.isDead ( m_hostPtrs[i]->m_hostId ) ) {
|
|
log ( "net: GIVING UP ON DEAD HOST! This will not "
|
|
"return an error." );
|
|
g_errno = 0;
|
|
}
|
|
// set m_errnos to g_errno, if any
|
|
m_errnos[i] = g_errno;
|
|
// if g_errno was not set we have a legit reply
|
|
if ( ! g_errno ) m_numReplies++;
|
|
// reset g_errno in case we do more sending
|
|
g_errno = 0;
|
|
// . if we got all the legit replies we're done, call the callback
|
|
// . all slots should be destroyed by UdpServer in this case
|
|
if ( m_numReplies >= m_numHosts ) {
|
|
// allow us to be re-used now, callback might relaunch
|
|
m_inUse = false;
|
|
if ( m_callback ) {
|
|
// uint64_t profilerStart,profilerEnd;
|
|
// uint64_t statStart,statEnd;
|
|
|
|
//if(g_conf.m_profilingEnabled){
|
|
// address=(int32_t)m_callback;
|
|
// g_profiler.startTimer(address,
|
|
// __PRETTY_FUNCTION__);
|
|
//}
|
|
//g_loop.startBlockedCpuTimer();
|
|
m_callback ( m_state , m_state2 );
|
|
//if(g_conf.m_profilingEnabled){
|
|
// if(!g_profiler.endTimer(address,
|
|
// __PRETTY_FUNCTION__))
|
|
// log(LOG_WARN,"admin: Couldn't add the"
|
|
// "fn %" INT32 "", (int32_t)address);
|
|
//}
|
|
}
|
|
return;
|
|
}
|
|
// if this guy had no error then wait for more callbacks
|
|
if ( ! m_errnos[i] ) return;
|
|
// bring this slot out of retirement so we can send to him again
|
|
m_retired[i] = false;
|
|
// do indeed log the try again things, cuz we have gotten into a
|
|
// nasty loop with them that took me a while to track down
|
|
bool logIt = false;
|
|
static int32_t s_elastTime = 0;
|
|
if ( m_errnos[i] != ETRYAGAIN ) logIt = true;
|
|
// log it every 10 seconds even if it was a try again
|
|
else {
|
|
int32_t now = getTime();
|
|
if (now - s_elastTime > 10) {s_elastTime = now; logIt=true;}
|
|
}
|
|
// don't log ETRYAGAIN, may come across as bad when it is normal
|
|
if ( m_errnos[i] == ETRYAGAIN ) logIt = false;
|
|
//logIt = true;
|
|
// log a failure msg
|
|
if ( logIt ) { // m_errnos[i] != ETRYAGAIN ) {
|
|
Host *h = m_hostdb->getHost ( slot->m_ip ,slot->m_port );
|
|
if ( h )
|
|
log("net: Got error sending request to hostId %" INT32 " "
|
|
"(msgType=0x%hhx transId=%" INT32 " net=%s): "
|
|
"%s. Retrying.",
|
|
h->m_hostId, slot->m_msgType, slot->m_transId,
|
|
m_hostdb->getNetName(),mstrerror(m_errnos[i]) );
|
|
else
|
|
log("net: Got error sending request to %s:%" INT32 " "
|
|
"(msgType=0x%hhx transId=%" INT32 " net=%s): "
|
|
"%s. Retrying.",
|
|
iptoa(slot->m_ip), (int32_t)slot->m_port,
|
|
slot->m_msgType, slot->m_transId,
|
|
m_hostdb->getNetName(),mstrerror(m_errnos[i]) );
|
|
}
|
|
// . let's sleep for a second before retrying the send
|
|
// . the g_errno could be ETRYAGAIN which happens if we're trying to
|
|
// add data but the other host is temporarily full
|
|
// . continue if we're already registered for sleep callbacks
|
|
if ( m_registeredSleep ) return ;
|
|
// . otherwise register for sleep callback to try again
|
|
// . sleepWrapper2() will call sendToGroup() for us
|
|
g_loop.registerSleepCallback (5000/*ms*/,this,sleepWrapper2,
|
|
m_niceness);
|
|
m_registeredSleep = true;
|
|
// . this was bad cause it looped incessantly quickly!
|
|
// . when we finally return, udpServer destroy this slot
|
|
// . try to re-send this guy again on error
|
|
// . this should always block
|
|
// sendToGroup ();
|
|
}
|
|
|
|
///////////////////////////////////////////////////////
|
|
// //
|
|
// PICK & SEND //
|
|
// //
|
|
///////////////////////////////////////////////////////
|
|
|
|
//static void gotBestHostWrapper ( void *state ) ;
|
|
|
|
// . returns false and sets g_errno on error
|
|
// . returns true if managed to send to one host ok (initially at least)
|
|
// . uses key to pick the first host to send to (for consistency)
|
|
// . after we pick a host and launch the request to him the sleepWrapper1
|
|
// will call this at regular intervals, so be careful, if msg34 is in
|
|
// progress, then just skip it and use pickBestHost!
|
|
bool Multicast::sendToHostLoop ( int32_t key , int32_t hostNumToTry ,
|
|
int32_t firstHostId ) {
|
|
// erase any errors we may have got
|
|
g_errno = 0 ;
|
|
// ALL multicast classes share "s_lastPing"
|
|
//static time_t s_lastPing = 0;
|
|
// . every 10 seconds we send to a random host in this group in
|
|
// addition to the best host to act like a ping
|
|
// . this keeps the ping times of all the hosts fresh
|
|
// . but we should only use msgs NOT of type 0 so we don't send over
|
|
// huge indexLists!
|
|
/*
|
|
if ( m_msgType == 0x36 && getTime() - s_lastPing > 10 ) {
|
|
// . pick a host other than the best at random
|
|
// . if there's multiple dead hosts we should hit a random one
|
|
int32_t i = pickRandomHost();
|
|
// if we got one, try sending to it
|
|
if ( i >= 0 ) sendToHost(i);
|
|
// erase any errors we may have got
|
|
g_errno = 0 ;
|
|
// note time of our last ping for ALL multicast classes
|
|
s_lastPing = getTime();
|
|
// the best host
|
|
int32_t bh = pickBestHost ( key );
|
|
// if we sent to the one we were going to already, return now
|
|
if ( i >= 0 && i == bh ) return true;
|
|
// if we sent to the one and only host, bail
|
|
if ( i >= 0 && bh < 0 ) return true;
|
|
}
|
|
*/
|
|
loop:
|
|
|
|
// if it is already in progress... wait for it to get back
|
|
//if ( m_msg34.isInProgress() ) return true ;
|
|
|
|
int32_t i;
|
|
|
|
// what if this host is dead?!?!?
|
|
if ( hostNumToTry >= 0 ) // && ! g_hostdb.isDead(hostNumToTry) )
|
|
i = hostNumToTry;
|
|
// try to stay on the same switch if the twin groups are on
|
|
// different switches
|
|
//else if ( g_conf.m_splitTwins && m_msgType == 0x00 )
|
|
// i = pickBestHost2 (key,-1,true);
|
|
// . gigablast will often fetch data across the network even if it
|
|
// is available locally because it thinks it will be faster than
|
|
// hitting the local disk too much. This is often bad, because local
|
|
// disk can be a fast scsi and network can be slow. so override
|
|
// gigablast here
|
|
// . only use this for msg0s, like for reading termlists, stuff like
|
|
// msg39 should still be routed without any problems
|
|
// . this is messing up the biasing logic in Msg51 which calls Msg0
|
|
// to get a cluster rec and want to bias the page cache to save mem
|
|
// . also, Msg0 should handle preferLocalReads itself. it has logic
|
|
// in there just for that
|
|
//else if ( g_conf.m_preferLocalReads &&
|
|
// ( m_msgType == 0x00 ) ) { // || m_msgType == 0x22 ) ) {
|
|
// i = pickBestHost (key,-1/*firstHostId*/,true/*preferLocal?*/);
|
|
//}
|
|
// . only requests that typically read from disk should set
|
|
// m_doDiskLoadBalancing to true
|
|
// . Msg39 should not do it otherwise host#16 ends up being starved
|
|
// since host0 is the gateway
|
|
// . this is called at regular intervals by sleepWrapper1 with
|
|
// hostNumToTry set to -1 so don't call Msg34 if its already going
|
|
// . NOTE: this is essentially an advanced replacement for
|
|
// pickBestHost() so it should return essentially the same values
|
|
// . pickBestHost() doesn't have a problem returning retired host #s
|
|
//else if ( m_doDiskLoadBalancing ) { // && hostNumToTry == -1 ) {
|
|
/*
|
|
else if ( m_doDiskLoadBalancing && firstHostId < 0 ) {
|
|
// debug msg
|
|
//log("getting best host (this=%" INT32 ")",(int32_t)&m_msg34);
|
|
// . if multiple hosts have it in memory,prefers the local host
|
|
// . return true if this blocks
|
|
if ( ! m_msg34.getBestHost ( m_hosts ,
|
|
m_retired ,
|
|
m_numHosts ,
|
|
m_niceness ,
|
|
m_maxCacheAge ,
|
|
m_cacheKey ,
|
|
m_rdbId ,
|
|
this ,
|
|
gotBestHostWrapper ) )
|
|
return true;
|
|
// . if we did not block then get the winning host
|
|
// . best hostNum is -1 if none untried or all had errors
|
|
// . this can return a retired host number if all its twins
|
|
// and itself are dead
|
|
i = m_msg34.getBestHostNum ();
|
|
// if no candidate, try pickBestHost
|
|
if ( i < 0 ) i = pickBestHost ( key , -1 , false );
|
|
}
|
|
*/
|
|
// . otherwise we had an error on this host
|
|
// . pick the next best host we haven't sent to yet
|
|
// . returns -1 if we've sent to all of them in our group, m_hosts
|
|
//else i = pickBestHost ( (uint32_t)key , firstHostId );
|
|
//else i = pickBestHost ( key , -1 , false ); // firstHostId
|
|
else i = pickBestHost ( key , firstHostId , false ); // firstHostId
|
|
|
|
// do not resend to retired hosts
|
|
if ( m_retired[i] ) i = -1;
|
|
// debug msg
|
|
//if ( m_msgType == 0x39 || m_msgType == 0x00 )
|
|
// log("Multicast:: routing msgType=0x%hhx to hostId %" INT32 "",
|
|
// m_msgType,m_hosts[i].m_hostId);
|
|
// . if no more hosts return FALSE
|
|
// . we need to return false to the caller of us below
|
|
if ( i < 0 ) {
|
|
// debug msg
|
|
//log("Multicast:: no hosts left to send to");
|
|
g_errno = ENOHOSTS; return false; }
|
|
|
|
// log("build: msg %x sent to host %"INT32 " first hostId is %"INT32
|
|
// " outstanding msgs %"INT32,
|
|
// m_msgType, i, firstHostId, m_hostPtrs[i]->m_numOutstandingRequests);
|
|
|
|
// . send to this guy, if we haven't yet
|
|
// . returns false and sets g_errno on error
|
|
// . if it returns true, we sent ok, so we should return true
|
|
// . will return false if the whole thing is timed out and g_errno
|
|
// will be set to ETIMEDOUT
|
|
// . i guess ENOSLOTS means the udp server has no slots available
|
|
// for sending, so its pointless to try to send to another host
|
|
if ( sendToHost ( i ) ) return true;
|
|
// if no more slots, we're done, don't loop!
|
|
if ( g_errno == ENOSLOTS ) return false;
|
|
// pointless as well if no time left in the multicast
|
|
if ( g_errno == EUDPTIMEDOUT ) return false;
|
|
// or if shutting down the server! otherwise it loops forever and
|
|
// won't exit when sending a msg20 request. i've seen this...
|
|
if ( g_errno == ESHUTTINGDOWN ) return false;
|
|
// otherwise try another host and hope for the best
|
|
g_errno = 0;
|
|
key = 0 ;
|
|
// what kind of error leads us here? EBUFTOOSMALL or EBADENGINEER...
|
|
hostNumToTry = -1;
|
|
goto loop;
|
|
}
|
|
|
|
/*
|
|
void gotBestHostWrapper ( void *state ) {
|
|
Multicast *THIS = (Multicast *)state;
|
|
//int32_t i = THIS->m_msg34.getBestHostNum ( );
|
|
// . if we could select none, go with non-intelligent load balancing
|
|
// . this should still return -1 if all hosts retired though
|
|
//if ( i < 0 ) i = THIS->pickBestHost ( 0 , -1 , false );
|
|
int32_t i = THIS->pickBestHost ( 0 , -1 , false );
|
|
// . if we got a candidate to try to send to him
|
|
// . i is -1 if we could get none
|
|
// . this also returns false on ENOSLOTS, if no slots available
|
|
// for sending on.
|
|
if ( i >= 0 && THIS->sendToHostLoop ( 0 , i , -1 ) ) return;
|
|
// if i was -1 or sendToHostLoop failed return now if we are still
|
|
// awaiting a reply... gotReplyWrapperM1() will be called when that
|
|
// reply comes back and that will call closeUpShop().
|
|
if ( THIS->m_numLaunched > 0 ) return;
|
|
// EUREKA! if the Msg34 replies timeout, that
|
|
// sets Msg34's LoadPoints m_errno var, and we end up with
|
|
// no host to try AFTER blocking, which means we're responsible
|
|
// for closing up shop and calling the callback.
|
|
// just call the closeUpShop() routine.
|
|
THIS->closeUpShop ( NULL );
|
|
}
|
|
*/
|
|
|
|
/*
|
|
int32_t Multicast::pickBestHost2 ( uint32_t key , int32_t firstHostId ,
|
|
bool preferLocal ) {
|
|
// now select the host on our same network switch
|
|
int32_t hpg = m_hostdb->m_numHostsPerShard;
|
|
// . get the hostid range on our switch
|
|
// . a segment is all the hosts on the same switch
|
|
int32_t segmentSize = m_hostdb->m_numHosts / hpg;
|
|
// get our segment
|
|
int32_t segment = m_hostdb->m_hostId / segmentSize;
|
|
int32_t i;
|
|
for ( i = 0 ; i < m_numHosts ; i++ ) {
|
|
// skip if he's dead
|
|
if ( m_hostdb->isDead ( &m_hosts[i] ) ) continue;
|
|
// skip if he's reporting system errors
|
|
if ( m_hostdb->kernelErrors(&m_hosts[i]) ) continue;
|
|
// skip if he's not on our segment
|
|
if ( m_hosts[i].m_hostId / segmentSize != segment ) continue;
|
|
break;
|
|
}
|
|
// return if we got someone in our group
|
|
if ( i < m_numHosts ) {
|
|
if ( g_conf.m_logDebugNet )
|
|
log(LOG_DEBUG,"net: Splitting request to hostid %" INT32 "",
|
|
m_hosts[i].m_hostId);
|
|
return i;
|
|
}
|
|
// if we got nothing, default to this one
|
|
return pickBestHost ( key , firstHostId , preferLocal );
|
|
}
|
|
*/
|
|
|
|
// . pick the fastest host from m_hosts based on avg roundtrip time for ACKs
|
|
// . skip hosts in our m_retired[] list of hostIds
|
|
// . returns -1 if none left to pick
|
|
int32_t Multicast::pickBestHost ( uint32_t key , int32_t firstHostId ,
|
|
bool preferLocal ) {
|
|
// debug msg
|
|
//log("pickBestHost manually");
|
|
// bail if no hosts
|
|
if ( m_numHosts == 0 ) return -1;
|
|
// . should we always pick host on same machine first?
|
|
// . we now only run one instance of gb per physical server, not like
|
|
// the old days... so this is somewhat obsolete... MDW
|
|
/*
|
|
if ( preferLocal && !g_conf.m_interfaceMachine ) {
|
|
for ( int32_t i = 0 ; i < m_numHosts ; i++ )
|
|
if ( m_hosts[i].m_machineNum ==
|
|
m_hostdb->getMyMachineNum() &&
|
|
! m_hostdb->isDead ( &m_hosts[i] ) &&
|
|
! m_hostdb->kernelErrors( &m_hosts[i] ) &&
|
|
! m_retired[i] ) return i;
|
|
}
|
|
*/
|
|
// . if firstHostId not -1, try it first
|
|
// . Msg0 uses this only to select hosts on same machine for now
|
|
// . Msg20 now uses this to try to make sure the lower half of docids
|
|
// go to one twin and the upper half to the other. this makes the
|
|
// tfndb page cache twice as effective when getting summaries.
|
|
if ( firstHostId >= 0 ) {
|
|
//log("got first hostId!!!!");
|
|
// find it in group
|
|
int32_t i;
|
|
for ( i = 0 ; i < m_numHosts ; i++ )
|
|
if ( m_hostPtrs[i]->m_hostId == firstHostId ) break;
|
|
// if not found bitch
|
|
if ( i >= m_numHosts ) {
|
|
log(LOG_LOGIC,"net: multicast: HostId %" INT32 " not "
|
|
"in group.", firstHostId );
|
|
char *xx = NULL; *xx = 0;
|
|
}
|
|
// if we got a match and it's not dead, and not reporting
|
|
// system errors, return it
|
|
if ( i < m_numHosts && ! m_hostdb->isDead ( m_hostPtrs[i] ) &&
|
|
! m_hostdb->kernelErrors ( m_hostPtrs[i] ) )
|
|
return i;
|
|
}
|
|
|
|
// round robin selection
|
|
//static int32_t s_lastGroup = 0;
|
|
//int32_t count = 0;
|
|
//int32_t i ;
|
|
//int32_t slow = -1;
|
|
int32_t numDead = 0;
|
|
int32_t dead = -1;
|
|
int32_t n = 0;
|
|
//int32_t count = 0;
|
|
bool balance = g_conf.m_doStripeBalancing;
|
|
// always turn off stripe balancing for all but these 3 msgTypes
|
|
if ( m_msgType != 0x39 &&
|
|
m_msgType != 0x37 &&
|
|
m_msgType != 0x36 )
|
|
balance = false;
|
|
// . pick the guy in our "stripe" first if we are doing these msgs
|
|
// . this will prevent a ton of msg39s from hitting one host and
|
|
// "spiking" it.
|
|
if ( balance ) n = g_hostdb.m_myHost->m_stripe;
|
|
// . if key is not zero, use it to select a host in this group
|
|
// . if the host we want is dead then do it the old way
|
|
// . ignore the key if balance is true though! MDW
|
|
if ( key != 0 && ! balance ) {
|
|
// often the groupId was selected based on the key, so lets
|
|
// randomize everything up a bit
|
|
uint32_t i = hashLong ( key ) % m_numHosts;
|
|
// if he's not dead or retired use him right away
|
|
if ( ! m_retired[i] &&
|
|
! m_hostdb->isDead ( m_hostPtrs[i] ) &&
|
|
! m_hostdb->kernelErrors( m_hostPtrs[i] ) ) return i;
|
|
}
|
|
|
|
// no no no we need to randomize the order that we try them
|
|
Host *fh = m_hostPtrs[n];
|
|
// if this host is not dead, and not reporting system errors, use him
|
|
if ( ! m_retired[n] &&
|
|
! m_hostdb->isDead(fh) &&
|
|
! m_hostdb->kernelErrors(fh) )
|
|
return n;
|
|
|
|
// . ok now select the kth available host
|
|
// . make a list of the candidates
|
|
int32_t cand[32];
|
|
int32_t nc = 0;
|
|
for ( int32_t i = 0 ; i < m_numHosts ; i++ ) {
|
|
// get the host
|
|
Host *h = m_hostPtrs[i];
|
|
// count those that are dead or are reporting system errors
|
|
if ( m_hostdb->isDead ( h ) || m_hostdb->kernelErrors(h) )
|
|
numDead++;
|
|
// skip host if we've retired it
|
|
if ( m_retired[i] ) continue;
|
|
// if this host is not dead, and not reporting system errors,
|
|
// use him
|
|
if ( !m_hostdb->isDead(h) && !m_hostdb->kernelErrors(h) )
|
|
cand[nc++] = i;
|
|
// pick a dead that isn't retired
|
|
dead = i;
|
|
}
|
|
// if a host was alive and untried, use him next
|
|
if ( nc > 0 ) {
|
|
int32_t k = ((uint32_t)m_key) % nc;
|
|
return cand[k];
|
|
}
|
|
// . come here if all hosts were DEAD
|
|
// . try sending to a host that is dead, but not retired now
|
|
// . if all deadies are retired this will return -1
|
|
// . sometimes a host can appear to be dead even though it was
|
|
// just under severe load
|
|
if ( numDead == m_numHosts ) return dead;
|
|
// otherwise, they weren't all dead so don't send to a deadie
|
|
return -1;
|
|
// . if no host we sent to had an error then we should send to deadies
|
|
// . TODO: we should only send to a deadie if we haven't got back a
|
|
// reply from any live hosts!!
|
|
//if ( numErrors == 0 ) return dead;
|
|
// . now alive host was found that we haven't tried, so return -1
|
|
// . before we were returning hosts that were marked as dead!! This
|
|
// caused problems when the only alive host returned an error code
|
|
// because it would take forever for the dead host to timeout...
|
|
//return -1;
|
|
// update lastGroup
|
|
//if ( ++s_lastGroup >= m_numHosts ) s_lastGroup = 0;
|
|
// return i if we got it
|
|
//if ( count >= m_numHosts ) return slow;
|
|
// otherwise return i
|
|
//return i;
|
|
}
|
|
|
|
// . pick the fastest host from m_hosts based on avg roundtrip time for ACKs
|
|
// . skip hosts in our m_retired[] list of hostIds
|
|
// . returns -1 if none left to pick
|
|
/*
|
|
int32_t Multicast::pickBestHost ( ) {
|
|
int32_t mini = -1;
|
|
int32_t minPing = 0x7fffffff;
|
|
// TODO: reset the sublist ptr????
|
|
// cast the msg to "hostsPerGroup" hosts in group "groupId"
|
|
for ( int32_t i = 0 ; i < m_numHosts ; i++ ) {
|
|
// skip host if we've retired it
|
|
if ( m_retired[i] ) continue;
|
|
// get the host
|
|
Host *h = &m_hosts[i];
|
|
// see if we got a new fastest host, continue if not
|
|
if ( h->m_pingAvg > minPing ) continue;
|
|
minPing = h->m_pingAvg;
|
|
mini = i;
|
|
}
|
|
// return our candidate, may be -1 if all were picked before
|
|
return mini;
|
|
}
|
|
*/
|
|
|
|
// . pick a random host
|
|
// . returns -1 if we already sent to that host (he's retired)
|
|
int32_t Multicast::pickRandomHost ( ) {
|
|
// select one of the dead hosts at random
|
|
int32_t i = rand() % m_numHosts ;
|
|
// if he's retired return -1
|
|
if ( m_retired[i] ) return -1;
|
|
// otherwise, he's a valid candidate
|
|
return i;
|
|
}
|
|
|
|
// . returns false and sets error on g_errno
|
|
// . returns true if kicked of the request (m_msg)
|
|
// . sends m_msg to host "h"
|
|
bool Multicast::sendToHost ( int32_t i ) {
|
|
// sanity check
|
|
if ( i >= m_numHosts ) { char *xx=NULL;*xx=0; }
|
|
// sanity check , bitch if retired
|
|
if ( m_retired [ i ] ) {
|
|
log(LOG_LOGIC,"net: multicast: Host #%" INT32 " is retired. "
|
|
"Bad engineer.",i);
|
|
//char *xx = NULL; *xx = 0;
|
|
return true;
|
|
}
|
|
// debug msg
|
|
//log("sending to host #%" INT32 " (this=%" INT32 ")",i,(int32_t)&m_msg34);
|
|
// . add this host to our retired list so we don't try again
|
|
// . only used by pickBestHost() and sendToHost()
|
|
m_retired [ i ] = true;
|
|
// what time is it now?
|
|
int64_t nowms = gettimeofdayInMilliseconds();
|
|
time_t now = nowms / 1000;
|
|
// save the time
|
|
m_launchTime [ i ] = nowms;
|
|
// sometimes clock is updated on us
|
|
if ( m_startTime > now ) m_startTime = now ;
|
|
// . timeout is in seconds
|
|
// . timeout is just the time remaining for the whole groupcast
|
|
int32_t timeRemaining = m_startTime + m_totalTimeout - now;
|
|
// . if timeout is negative then reset start time so we try forever
|
|
// . no, this could be called by a re-route in sleepWrapper1 in which
|
|
// case we really should timeout.
|
|
// . this can happen if sleepWrapper found a timeout before UdpServer
|
|
// got its timeout.
|
|
if ( timeRemaining <= 0 ) {
|
|
//m_startTime = getTime();; timeout = m_totalTimeout;}
|
|
//g_errno = ETIMEDOUT;
|
|
// this can happen if the udp reply timed out!!! like if a
|
|
// host is under severe load... with Msg23::getLinkText()
|
|
// or Msg22::getTitleRec() timing out on us. basically, our
|
|
// msg23 request tried to send a msg22 request which timed out
|
|
// on it so it sent us back this error.
|
|
if ( g_errno != EUDPTIMEDOUT )
|
|
log(LOG_INFO,"net: multicast: had negative timeout, %" INT32 ". "
|
|
"startTime=%" INT32 " totalTimeout=%" INT32 " "
|
|
"now=%" INT32 ". msgType=0x%hhx "
|
|
"niceness=%" INT32 " clock updated?",
|
|
timeRemaining,m_startTime,m_totalTimeout,
|
|
(int32_t)now,m_msgType,
|
|
(int32_t)m_niceness);
|
|
// we are timed out so do not bother re-routing
|
|
//g_errno = ETIMEDOUT;
|
|
//return false;
|
|
// give it a fighting chance of 2 seconds then
|
|
//timeout = 2;
|
|
timeRemaining = m_totalTimeout;
|
|
}
|
|
// get the host
|
|
Host *h = m_hostPtrs[i];
|
|
// if niceness is 0, use the higher priority udpServer
|
|
UdpServer *us = &g_udpServer;
|
|
if ( m_realtime ) us = &g_udpServer2;
|
|
// send to the same port as us!
|
|
int16_t destPort = h->m_port;
|
|
//if ( m_realtime ) destPort = h->m_port2;
|
|
|
|
// if from hosts2.conf pick the best ip!
|
|
int32_t bestIp = h->m_ip;
|
|
if ( m_hostdb == &g_hostdb2 )
|
|
bestIp = g_hostdb.getBestHosts2IP ( h );
|
|
|
|
#ifdef _GLOBALSPEC_
|
|
// debug message for global spec
|
|
//logf(LOG_DEBUG,"net: mcast2 state=%08" XINT32 "",(int32_t)this);
|
|
#endif
|
|
// sanity check
|
|
//if ( g_hostdb.isDead(h) ) {
|
|
// log("net: trying to send to dead host.");
|
|
// char *xx = NULL;
|
|
// *xx = 0;
|
|
//}
|
|
// don't set hostid if we're sending to a remote cluster
|
|
int32_t hid = h->m_hostId;
|
|
if ( m_hostdb != &g_hostdb ) hid = -1;
|
|
// if sending to a proxy keep this set to -1
|
|
if ( h->m_type != HT_GRUNT ) hid = -1;
|
|
// max resends. if we resend a request dgram this many times and
|
|
// got no ack, bail out with g_errno set to ENOACK. this is better
|
|
// than the timeout because it takes like 20 seconds to mark a
|
|
// host as dead and takes "timeRemaining" seconds to timeout the
|
|
// request
|
|
int32_t maxResends = -1;
|
|
// . only use for nicness 0
|
|
// . it uses a backoff scheme, increments delay for first few resends:
|
|
// . it starts of at 33ms, then 66, then 132, then 200 from there out
|
|
if ( m_niceness == 0 ) maxResends = 4;
|
|
// . send to a single host
|
|
// . this creates a transaction control slot, "udpSlot"
|
|
// . return false and sets g_errno on error
|
|
// . returns true on successful launch and calls callback on completion
|
|
if ( ! us->sendRequest ( m_msg ,
|
|
m_msgSize ,
|
|
m_msgType ,
|
|
bestIp , // h->m_ip ,
|
|
destPort ,
|
|
hid ,
|
|
&m_slots[i] ,
|
|
this , // state
|
|
gotReplyWrapperM1 ,
|
|
timeRemaining , // timeout
|
|
-1 , // backoff
|
|
-1 , // max wait in ms
|
|
m_replyBuf ,
|
|
m_replyBufMaxSize ,
|
|
m_niceness , // cback niceness
|
|
maxResends )) {
|
|
log("net: Had error sending msgtype 0x%hhx to host "
|
|
"#%" INT32 ": %s. Not retrying.",
|
|
m_msgType,h->m_hostId,mstrerror(g_errno));
|
|
// i've seen ENOUDPSLOTS available msg here along with oom
|
|
// condition...
|
|
//char *xx=NULL;*xx=0;
|
|
return false;
|
|
}
|
|
// mark it as outstanding
|
|
m_inProgress[i] = 1;
|
|
#ifdef _GLOBALSPEC_
|
|
// note the slot ptr for reference
|
|
//logf(LOG_DEBUG,"net: mcast2 slotPtr=%08" XINT32 "",(int32_t)&m_slots[i]);
|
|
#endif
|
|
// set our last launch date
|
|
m_lastLaunch = nowms ; // gettimeofdayInMilliseconds();
|
|
// save the host, too
|
|
m_lastLaunchHost = h;
|
|
/*
|
|
// assume this host has more disk load now
|
|
if ( m_doDiskLoadBalancing && ! m_msg34.isInBestHostCache() ) {
|
|
// . add the disk load right after sending it in case we have
|
|
// many successive sends right after this one
|
|
// . in the case of titledb use a low avg read size, otherwise
|
|
// this would be like over 1 Meg (the max titleRec size)
|
|
int32_t avg = m_minRecSizes;
|
|
if ( m_rdbId == RDB_TITLEDB ) avg = 32*1024; // titledb read?
|
|
m_msg34.addLoad ( avg , h->m_hostId , nowms );
|
|
}
|
|
*/
|
|
// count it as launched
|
|
m_numLaunched++;
|
|
// timing debug
|
|
//log("Multicast sent to hostId %" INT32 ", this=%" INT32 ", transId=%" INT32 "",
|
|
// h->m_hostId, (int32_t)this , m_slots[i]->m_transId );
|
|
// . let's sleep so we have a chance to launch to another host in
|
|
// the same group in case this guy takes too long
|
|
// . don't re-register if we already did
|
|
if ( m_registeredSleep ) return true;
|
|
// . otherwise register for sleep callback to try again
|
|
// . sleepWrapper1() will call sendToHostLoop() for us
|
|
g_loop.registerSleepCallback (50/*ms*/,this,sleepWrapper1,m_niceness);
|
|
m_registeredSleep = true;
|
|
#ifdef _GLOBALSPEC_
|
|
// debug msg
|
|
//logf(LOG_DEBUG,"net: mcast registered1 this=%08" XINT32 "",(int32_t)this);
|
|
#endif
|
|
// successful launch
|
|
return true;
|
|
}
|
|
|
|
// this is called every 50 ms so we have the chance to launch our request
|
|
// to a more responsive host
|
|
void sleepWrapper1 ( int bogusfd , void *state ) {
|
|
Multicast *THIS = (Multicast *) state;
|
|
// . if our last launch was less than X seconds ago, wait another tick
|
|
// . we often send out 2+ requests and end up getting one reply before
|
|
// the others and that results in us getting unwanted dgrams...
|
|
// . increasing this delay here results in fewer wasted requests but
|
|
// if a host goes down you don't want a user to wait too long
|
|
// . after a host goes down it's ping takes a few secs to decrease
|
|
// . if a host is shutdown properly it will broadcast a msg to
|
|
// all hosts using Hostdb::broadcast() informing them that it's
|
|
// going down so they know to stop sending to it and mark him as
|
|
// dead
|
|
|
|
int64_t now = gettimeofdayInMilliseconds();
|
|
// watch out for someone advancing the system clock
|
|
if ( THIS->m_lastLaunch > now ) THIS->m_lastLaunch = now;
|
|
// get elapsed time since we started the send
|
|
int32_t elapsed = now - THIS->m_lastLaunch;
|
|
int32_t docsWanted;
|
|
int32_t firstResultNum;
|
|
int32_t nqterms;
|
|
int32_t rerankRuleset;
|
|
int32_t wait;
|
|
char exact;
|
|
//int32_t hid = -1;
|
|
Host *hd;
|
|
//log("elapsed = %" INT32 " type=0x%hhx",elapsed,THIS->m_msgType);
|
|
|
|
// . don't relaunch any niceness 1 stuff for a while
|
|
// . it often gets suspended due to query traffic
|
|
//if ( THIS->m_niceness > 0 && elapsed < 800000 ) return;
|
|
if ( THIS->m_niceness > 0 ) return;
|
|
|
|
// TODO: if the host went dead on us, re-route
|
|
|
|
// . Msg36 is used to get the length of an IndexList (termFreq)
|
|
// and is very fast, all in memory, don't wait more than 50ms
|
|
// . if we do re-route this is sucks cuz we'll get slightly different
|
|
// termFreqs which impact the total results count as well as summary
|
|
// generation since it's based on termFreq, not too mention the
|
|
// biggest impact being ordering of search results since the
|
|
// score weight is based on termFreq as well
|
|
// . but unfortunately, this scheme doesn't increase the ping time
|
|
// of dead hosts that much!!
|
|
// . NOTE: 2/26/04: i put most everything to 8000 ms since rerouting
|
|
// too much on an already saturated network of drives just
|
|
// excacerbates the problem. this stuff was originally put here
|
|
// to reroute for when a host went down... let's keep it that way
|
|
//int32_t ta , nb;
|
|
if ( THIS->m_redirectTimeout != -1 ) {
|
|
if ( elapsed < THIS->m_redirectTimeout ) return;
|
|
goto redirectTimedout;
|
|
}
|
|
switch ( THIS->m_msgType ) {
|
|
// term freqs are msg 0x36 and don't hit disk, so reroute all the time
|
|
case 0x36:
|
|
exact = 0;
|
|
// first byte is 1 if caller wants an *exact* termlist size
|
|
// lookup which requires hitting disk and is *much* slower
|
|
if ( THIS->m_msg ) exact = *(char *)(THIS->m_msg);
|
|
// these don't take any resources... unless exact i guess,
|
|
// so let them fly... 10ms or more to reroute
|
|
if ( ! exact && elapsed < 10 ) return;
|
|
//if ( exact && elapsed < 20000 ) return;
|
|
// never re-reoute these, they may be incrementing/decrementing
|
|
// a count, and we only store that count on one host!!
|
|
return;
|
|
break;
|
|
// msg to get a summary from a query (calls msg22)
|
|
// buzz takes extra long! it calls Msg25 sometimes.
|
|
// no more buzz.. put back to 8 seconds.
|
|
// put to 5 seconds now since some hosts freezeup still it seems
|
|
// and i haven't seen a summary generation of 5 seconds
|
|
case 0x20: if ( elapsed < 5000 ) return; break;
|
|
// msg 0x20 calls this to get the title rec
|
|
case 0x22: if ( elapsed < 1000 ) return; break;
|
|
// Msg23 niceness 0 is only for doing &rerank=X queries
|
|
//case 0x23: if ( elapsed < 100000 ) return; break;
|
|
// a request to get the score of a docid, can be *very* intensive
|
|
case 0x3b: if ( elapsed < 500000 ) return; break;
|
|
// related topics request, calls many Msg22 to get titlerecs...
|
|
case 0x24: if ( elapsed < 2000 ) return; break;
|
|
// . msg to get an index list over the net
|
|
// . this limit should really be based on length of the index list
|
|
// . this was 15 then 12 now it is 4
|
|
case 0x00:
|
|
// this should just be for when a host goes down, not for
|
|
// performance reasons, cuz we do pretty good load balancing
|
|
// and when things get saturated, rerouting excacerbates it
|
|
if ( elapsed < 8000 ) return; break;
|
|
// how many bytes were requested?
|
|
/*
|
|
if ( THIS->m_msg ) nb=*(int32_t *)(THIS->m_msg + sizeof(key_t)*2);
|
|
else nb=2000000;
|
|
// . givem 300ms + 1ms per 5000 bytes
|
|
// . a 6M read would be allowed 1500ms before re-routing
|
|
// . a 1M read would be allowed 500ms
|
|
// . a 100k read would be allowed 320ms
|
|
ta = 300 + nb / 5000;
|
|
// limit it
|
|
if ( ta < 100 ) ta = 100;
|
|
if ( ta > 9000 ) ta = 9000; // could this hurt us?
|
|
if ( elapsed < ta ) return;
|
|
break;
|
|
*/
|
|
// msg to get a clusterdb rec
|
|
case 0x38: if ( elapsed < 2000 ) return; break;
|
|
// msg to get docIds from a query, may take a while
|
|
case 0x39:
|
|
// how many docsids request? first 4 bytes of request.
|
|
docsWanted = 10;
|
|
firstResultNum = 0;
|
|
nqterms = 0;
|
|
rerankRuleset = -1;
|
|
if ( THIS->m_msg ) docsWanted = *(int32_t *)(THIS->m_msg);
|
|
if ( THIS->m_msg ) firstResultNum = *(int32_t *)(THIS->m_msg+4);
|
|
if ( THIS->m_msg ) nqterms = *(int32_t *)(THIS->m_msg+8);
|
|
// never re-route if it has a rerank, those take forever
|
|
if ( THIS->m_msg ) rerankRuleset = *(int32_t *)(THIS->m_msg+12);
|
|
if ( rerankRuleset >= 0 ) return;
|
|
// . how many milliseconds of waiting before we re-route?
|
|
// . 100 ms per doc wanted, but if they all end up
|
|
// clustering then docsWanted is no indication of the
|
|
// actual number of titleRecs (or title keys) read
|
|
// . it may take a while to do dup removal on 1 million docs
|
|
wait = 5000 + 100 * (docsWanted+firstResultNum);
|
|
// those big UOR queries should not get re-routed all the time
|
|
if ( nqterms > 0 ) wait += 1000 * nqterms;
|
|
if ( wait < 8000 ) wait = 8000;
|
|
// seems like buzz is hammering the cluster and 0x39'saretiming
|
|
// out too much because of huge title recs taking forever with
|
|
// Msg20
|
|
//if ( wait < 120000 ) wait = 120000;
|
|
if ( elapsed < wait ) return;
|
|
break;
|
|
// these tagdb lookups are usually lickety split, should all be in mem
|
|
case 0x08: if ( elapsed < 10 ) return; break;
|
|
// this no longer exists! it uses msg0
|
|
//case 0x8a: if ( elapsed < 200 ) return; break;
|
|
case 0x8b: if ( elapsed < 10 ) return; break;
|
|
// don't relaunch anything else unless over 8 secs
|
|
default: if ( elapsed < 8000 ) return; break;
|
|
}
|
|
|
|
// find out which host timedout
|
|
//hid = -1;
|
|
hd = NULL;
|
|
//if ( THIS->m_retired[0] && THIS->m_hosts && THIS->m_numHosts >= 1 )
|
|
if ( THIS->m_retired[0] && THIS->m_numHosts >= 1 )
|
|
hd = THIS->m_hostPtrs[0];
|
|
//if ( THIS->m_retired[1] && THIS->m_hosts && THIS->m_numHosts >= 2 )
|
|
if ( THIS->m_retired[1] && THIS->m_numHosts >= 2 )
|
|
hd = THIS->m_hostPtrs[1];
|
|
// 11/21/06: now we only reroute if the host we sent to is marked as
|
|
// dead unless it is a msg type that takes little reply generation time
|
|
if ( hd && // hid >= 0 &&
|
|
//! g_hostdb.isDead(hid) &&
|
|
! g_hostdb.isDead(hd) &&
|
|
//m_msgType != 0x36 && (see above)
|
|
//m_msgType != 0x17 &&
|
|
// hosts freezeup sometimes and we don't get a summary in time...
|
|
// no! we got EDISKSTUCK now and this was causing a problem
|
|
// dumping core in the parse cache logic
|
|
//THIS->m_msgType != 0x20 &&
|
|
THIS->m_msgType != 0x08 &&
|
|
//THIS->m_msgType != 0x8a &&
|
|
THIS->m_msgType != 0x8b )
|
|
return;
|
|
|
|
redirectTimedout:
|
|
// cancel any outstanding transactions iff we have a m_replyBuf
|
|
// that we must read the reply into because we cannot share!!
|
|
if ( THIS->m_readBuf ) THIS->destroySlotsInProgress ( NULL );
|
|
//if ( THIS->m_replyBuf )
|
|
// THIS->destroySlotsInProgress ( NULL );
|
|
|
|
// . do a loop over all hosts in the group
|
|
// . if a whole group of twins is down this will loop forever here
|
|
// every Xms, based the sleepWrapper timer for the msgType
|
|
if ( g_conf.m_logDebugQuery ) {
|
|
for (int32_t i = 0 ; i < THIS->m_numHosts ; i++ ) {
|
|
if ( ! THIS->m_slots[i] ) continue;
|
|
// transaction is not in progress if m_errnos[i] is set
|
|
char *ee = "";
|
|
if ( THIS->m_errnos[i] ) ee = mstrerror(THIS->m_errnos[i]);
|
|
log("net: Multicast::sleepWrapper1: tried host "
|
|
"%s:%" INT32 " %s" ,iptoa(THIS->m_slots[i]->m_ip),
|
|
(int32_t)THIS->m_slots[i]->m_port , ee );
|
|
}
|
|
}
|
|
|
|
// log msg that we are trying to re-route
|
|
//log("Multicast::sleepWrapper1: trying to re-route msgType=0x%hhx "
|
|
// "to new host", THIS->m_msgType );
|
|
|
|
// if we were trying to contact a host in the secondary cluster,
|
|
// mark the host as dead. this is our passive monitoring system.
|
|
if ( THIS->m_hostdb == &g_hostdb2 ) {
|
|
log("net: Marking hostid %" INT32 " in secondary cluster as dead.",
|
|
(int32_t)THIS->m_lastLaunchHost->m_hostId);
|
|
THIS->m_lastLaunchHost->m_ping = g_conf.m_deadHostTimeout;
|
|
}
|
|
|
|
// . otherwise, launch another request if we can
|
|
// . returns true if we successfully sent to another host
|
|
// . returns false and sets g_errno if no hosts left or other error
|
|
if ( THIS->sendToHostLoop(0,-1,-1) ) {
|
|
// msgtype 0x36 is always rerouting because the timeout is so
|
|
// low because it is an easy request to satisfy... so don't
|
|
// flood the logs with it
|
|
int32_t logtype = LOG_WARN;
|
|
if ( THIS->m_msgType == 0x36 ) logtype = LOG_DEBUG;
|
|
// same goes for msg8
|
|
if ( THIS->m_msgType == 0x08 ) logtype = LOG_DEBUG;
|
|
//if ( THIS->m_msgType == 0x8a ) logtype = LOG_DEBUG;
|
|
if ( THIS->m_msgType == 0x8b ) logtype = LOG_DEBUG;
|
|
// log msg that we were successful
|
|
int32_t hid = -1;
|
|
if ( hd ) hid = hd->m_hostId;
|
|
log(logtype,
|
|
"net: Multicast::sleepWrapper1: rerouted msgType=0x%hhx "
|
|
"from host #%" INT32 " "
|
|
"to new host after waiting %" INT32 " ms",
|
|
THIS->m_msgType, hid,elapsed);
|
|
// . mark it in the stats for PageStats.cpp
|
|
// . this is timeout based rerouting
|
|
g_stats.m_reroutes[(int)THIS->m_msgType][THIS->m_niceness]++;
|
|
return;
|
|
}
|
|
// if we registered the sleep callback we must have launched a
|
|
// request to a host so let gotReplyWrapperM1() deal with closeUpShop()
|
|
|
|
// . let replyWrapper1 be called if we got one launched
|
|
// . it should then call closeUpShop()
|
|
//if ( THIS->m_numLaunched ) return;
|
|
// otherwise, no outstanding requests and we failed to send to another
|
|
// host, probably because :
|
|
// 1. Msg34 timed out on all hosts
|
|
// 2. there were no udp slots available (which is bad)
|
|
//log("Multicast:: re-route failed for msgType=%hhx. abandoning.",
|
|
// THIS->m_msgType );
|
|
// . the next send failed to send to a host, so close up shop
|
|
// . this is probably because the Msg34s timed out and we could not
|
|
// find a next "best host" to send to because of that
|
|
//THIS->closeUpShop ( NULL );
|
|
// . we were not able to send to another host, maybe it was dead or
|
|
// there are no hosts left!
|
|
// . i guess keep sleeping until host comes back up or transaction
|
|
// is cancelled
|
|
//log("Multicast::sleepWrapper1: re-route of msgType=0x%hhx failed",
|
|
// THIS->m_msgType);
|
|
}
|
|
|
|
// C wrapper for the C++ callback
|
|
void gotReplyWrapperM1 ( void *state , UdpSlot *slot ) {
|
|
Multicast *THIS = (Multicast *)state;
|
|
// debug msg
|
|
//log("gotReplyWrapperM1 for msg34=%" INT32 "",(int32_t)(&THIS->m_msg34));
|
|
THIS->gotReply1 ( slot );
|
|
}
|
|
|
|
// come here if we've got a reply from a host that's not part of a group send
|
|
void Multicast::gotReply1 ( UdpSlot *slot ) {
|
|
// debug msg
|
|
//log("gotReply1: this=%" INT32 " should exit",(int32_t)&m_msg34);
|
|
// count it as returned
|
|
m_numLaunched--;
|
|
// don't ever let UdpServer free this send buf (it is m_msg)
|
|
slot->m_sendBufAlloc = NULL;
|
|
// remove the slot from m_slots so it doesn't get nuked in
|
|
// gotSlot(slot) routine above
|
|
int32_t i = 0;
|
|
// careful! we might have recycled a slot!!! start with top and go down
|
|
// because UdpServer might give us the same slot ptr on our 3rd try
|
|
// that we had on our first try!
|
|
for ( i = 0 ; i < m_numHosts ; i++ ) {
|
|
// skip if not in progress
|
|
if ( ! m_inProgress[i] ) continue;
|
|
// slot must match
|
|
if ( m_slots[i] == slot ) break;
|
|
}
|
|
// if it matched no slot that's weird
|
|
if ( i >= m_numHosts ) {
|
|
log(LOG_LOGIC,"net: multicast: Not our slot 2.");
|
|
char *xx = NULL; *xx = 0;
|
|
return;
|
|
}
|
|
// set m_errnos[i], if any
|
|
if ( g_errno ) m_errnos[i] = g_errno;
|
|
|
|
// mark it as no longer in progress
|
|
m_inProgress[i] = 0;
|
|
|
|
// if he was marked as dead on the secondary cluster, mark him as up
|
|
Host *h = m_hostPtrs[i];
|
|
if ( m_hostdb == &g_hostdb2 && h && m_hostdb->isDead(h) ) {
|
|
log("net: Marking hostid %" INT32 " in secondary cluster "
|
|
"as alive.",
|
|
(int32_t)h->m_hostId);
|
|
h->m_ping = 0;
|
|
}
|
|
|
|
// save the host we got a reply from
|
|
m_replyingHost = h;
|
|
m_replyLaunchTime = m_launchTime[i];
|
|
|
|
if ( m_sentToTwin )
|
|
log("net: Twin msgType=0x%" XINT32 " (this=0x%" PTRFMT ") "
|
|
"reply: %s.",
|
|
(int32_t)m_msgType,(PTRTYPE)this,mstrerror(g_errno));
|
|
|
|
// on error try sending the request to another host
|
|
// return if we kicked another request off ok
|
|
if ( g_errno ) {
|
|
Host *h;
|
|
char logIt = true;
|
|
// do not log not found on an external network
|
|
if ( g_errno == ENOTFOUND && m_hostdb != &g_hostdb ) goto skip;
|
|
// log the error
|
|
h = m_hostdb->getHost ( slot->m_ip ,slot->m_port );
|
|
// do not log if not expected msg20
|
|
if ( slot->m_msgType == 0x20 && g_errno == ENOTFOUND &&
|
|
! ((Msg20 *)m_state)->m_expected )
|
|
logIt = false;
|
|
if ( slot->m_msgType == 0x20 && g_errno == EMISSINGQUERYTERMS )
|
|
logIt = false;
|
|
if ( h && logIt )
|
|
log("net: Multicast got error in reply from "
|
|
"hostId %" INT32 ""
|
|
" (msgType=0x%hhx transId=%" INT32 " "
|
|
"nice=%" INT32 " net=%s): "
|
|
"%s.",
|
|
h->m_hostId, slot->m_msgType, slot->m_transId,
|
|
m_niceness,
|
|
m_hostdb->getNetName(),mstrerror(g_errno ));
|
|
else if ( logIt )
|
|
log("net: Multicast got error in reply from %s:%" INT32 " "
|
|
"(msgType=0x%hhx transId=%" INT32 " nice =%" INT32 " net=%s): "
|
|
"%s.",
|
|
iptoa(slot->m_ip), (int32_t)slot->m_port,
|
|
slot->m_msgType, slot->m_transId, m_niceness,
|
|
m_hostdb->getNetName(),mstrerror(g_errno) );
|
|
skip:
|
|
// if this slot had an error we may have to tell UdpServer
|
|
// not to free the read buf
|
|
if ( m_replyBuf == slot->m_readBuf ) slot->m_readBuf = NULL;
|
|
// . try to send to another host
|
|
// . on successful sending return, we'll be called on reply
|
|
// . this also returns false if no new hosts left to send to
|
|
// . only try another host if g_errno is NOT ENOTFOUND cuz
|
|
// we have quite a few missing clustRecs and titleRecs
|
|
// and doing a second lookup will decrease query response
|
|
// . if the Msg22 lookup cannot find the titleRec for indexing
|
|
// purposes, it should check any twin hosts because this
|
|
// is very important... if this is for query time, however,
|
|
// then accept the ENOTFOUND without spawning another request
|
|
// . but if the record is really not there we waste seeks!
|
|
// . EBADENGINEER is now used by titledb's Msg22 when a docid
|
|
// is in tfndb but not in titledb (or id2 is invalid)
|
|
// . it is more important that we serve the title rec than
|
|
// the performance gain. if you want the performance gain
|
|
// then you should repair your index to avoid this. therefore
|
|
// send to twin on ENOTFOUND
|
|
// . often, though, we are restring to indexdb root so after
|
|
// doing a lot of deletes there will be a lot of not founds
|
|
// that are really not found (not corruption) so don't do it
|
|
// anymore
|
|
// . let's go for accuracy even for queries
|
|
// . until i fix the bug of losing titlerecs for some reason
|
|
// probably during merges now, we reroute on ENOTFOUND.
|
|
bool sendToTwin = true;
|
|
if ( g_errno == EBADENGINEER ) sendToTwin = false;
|
|
if ( g_errno == EMISSINGQUERYTERMS ) sendToTwin = false;
|
|
if ( g_errno == EMSGTOOBIG ) sendToTwin = false;
|
|
// "Argument list too long"
|
|
if ( g_errno == 7 ) sendToTwin = false;
|
|
// i guess msg50 calls msg25 with no ip sometimes?
|
|
if ( g_errno == EURLHASNOIP ) sendToTwin = false;
|
|
if ( g_errno == EUNCOMPRESSERROR ) sendToTwin = false;
|
|
// ok, let's give up on ENOTFOUND, because the vast majority
|
|
// of time it seems it is really not on the twin either...
|
|
if ( g_errno == ENOTFOUND && m_msgType == 0x020 )
|
|
sendToTwin = false;
|
|
if ( g_errno == ENOTFOUND && m_msgType == 0x022 )
|
|
sendToTwin = false;
|
|
// do not worry if it was a not found msg20 for a titleRec
|
|
// which was not expected to be there
|
|
if ( ! logIt ) sendToTwin = false;
|
|
// or a notfound on the external/secondary cluster
|
|
if ( g_errno == ENOTFOUND && m_hostdb == &g_hostdb2 )
|
|
sendToTwin = false;
|
|
// no longer do this for titledb, too common since msg4
|
|
// cached stuff can make us slightly out of sync
|
|
//if ( g_errno == ENOTFOUND )
|
|
// sendToTwin = false;
|
|
// or a problem with a tfndb lookup, those are different for
|
|
// each twin right now
|
|
if ( m_msgType == 0x00 && m_rdbId == RDB_TFNDB )
|
|
sendToTwin = false;
|
|
// do not send to twin if we are out of time
|
|
time_t now = getTime();
|
|
int32_t timeRemaining = m_startTime + m_totalTimeout - now;
|
|
if ( timeRemaining <= 0 ) sendToTwin = false;
|
|
// send to the twin
|
|
if ( sendToTwin && sendToHostLoop(0,-1,-1) ) {
|
|
log("net: Trying to send request msgType=0x%" XINT32 " "
|
|
"to a twin. (this=0x%" PTRFMT ")",
|
|
(int32_t)m_msgType,(PTRTYPE)this);
|
|
m_sentToTwin = true;
|
|
// . keep stats
|
|
// . this is error based rerouting
|
|
// . this can be timeouts as well, if the
|
|
// receiver sent a request itself and that
|
|
// timed out...
|
|
g_stats.m_reroutes[(int)m_msgType][m_niceness]++;
|
|
return;
|
|
}
|
|
// . otherwise we've failed on all hosts
|
|
// . re-instate g_errno,might have been set by sendToHostLoop()
|
|
g_errno = m_errnos[i];
|
|
// unregister our sleep wrapper if we did
|
|
//if ( m_registeredSleep ) {
|
|
// g_loop.unregisterSleepCallback ( this, sleepWrapper1 );
|
|
// m_registeredSleep = false;
|
|
//}
|
|
// destroy all slots that may be in progress (except "slot")
|
|
//destroySlotsInProgress ( slot );
|
|
// call callback with g_errno set
|
|
//if ( m_callback ) m_callback ( m_state );
|
|
// we're done, all slots should be destroyed by UdpServer
|
|
//return;
|
|
}
|
|
closeUpShop ( slot );
|
|
}
|
|
|
|
void Multicast::closeUpShop ( UdpSlot *slot ) {
|
|
// sanity check
|
|
if ( ! m_inUse ) { char *xx=NULL;*xx=0; }
|
|
// debug msg
|
|
//log("Multicast exiting (this=%" INT32 ")",(int32_t)&m_msg34);
|
|
// destroy the OTHER slots we've spawned that are in progress
|
|
destroySlotsInProgress ( slot );
|
|
// if we have no slot per se, skip this stuff
|
|
if ( ! slot ) goto skip;
|
|
// . now we have a good reply... but not if g_errno is set
|
|
// . save the reply of this slot here
|
|
// . this is bad if we got an g_errno above, it will set the slot's
|
|
// readBuf to NULL up there, and that will make m_readBuf NULL here
|
|
// causing a mem leak. i fixed by adding an mfree on m_replyBuf
|
|
// in Multicast::reset() routine.
|
|
// . i fixed again by ensuring we do not set m_ownReadBuf to false
|
|
// in getBestReply() below if m_readBuf is NULL
|
|
m_readBuf = slot->m_readBuf;
|
|
m_readBufSize = slot->m_readBufSize;
|
|
m_readBufMaxSize = slot->m_readBufMaxSize;
|
|
// . if the slot had an error, propagate it so it will be set when
|
|
// we call the callback.
|
|
if(!g_errno) g_errno = slot->m_errno;
|
|
// . sometimes UdpServer will read the reply into a temporary buffer
|
|
// . this happens if the udp server is hot (async signal based) and
|
|
// m_replyBuf is NULL because he cannot malloc a buf to read into
|
|
// because malloc is not async signal safe
|
|
if ( slot->m_tmpBuf == slot->m_readBuf ) m_freeReadBuf = false;
|
|
// don't let UdpServer free the readBuf now that we point to it
|
|
slot->m_readBuf = NULL;
|
|
|
|
// save slot so msg4 knows what slot replied in udpserver
|
|
// for doing its flush callback logic
|
|
m_slot = slot;
|
|
|
|
skip:
|
|
// unregister our sleep wrapper if we did
|
|
if ( m_registeredSleep ) {
|
|
g_loop.unregisterSleepCallback ( this , sleepWrapper1 );
|
|
m_registeredSleep = false;
|
|
#ifdef _GLOBALSPEC_
|
|
// debug msg
|
|
//logf(LOG_DEBUG,"net: mcast unregistered1 this= %08" XINT32 "",
|
|
// (int32_t)this);
|
|
#endif
|
|
}
|
|
// unregister our sleep wrapper if we did
|
|
if ( m_registeredSleep2 ) {
|
|
g_loop.unregisterSleepCallback ( this , sleepWrapper1b );
|
|
m_registeredSleep2 = false;
|
|
}
|
|
// ok, now if infiniteLoop is true, we must retry. this helps us
|
|
// alot if one host is under such sever load that any disk read
|
|
// returns ETRYAGAIN because there is no room in the thread queue.
|
|
// do not retry on persistent error, only on temporary ones.
|
|
if ( m_retryForever &&
|
|
g_errno &&
|
|
g_errno != ENOTFOUND &&
|
|
g_errno != EMISSINGQUERYTERMS &&
|
|
g_errno != EBADENGINEER ) {
|
|
log("net: Multicast retrying request send in 2 seconds.");
|
|
m_retryCount++;
|
|
// bail if already registered
|
|
if ( m_registeredSleep2 ) return;
|
|
// try the whole shebang again every 2 seconds
|
|
if ( ! g_loop.registerSleepCallback (2000,this,sleepWrapper1b,
|
|
m_niceness))
|
|
log("net: Failed to register sleep callback to "
|
|
"resend multicast request. Giving up. Retry "
|
|
"failed.");
|
|
else
|
|
m_registeredSleep2 = true;
|
|
return;
|
|
}
|
|
if ( ! g_errno && m_retryCount > 0 )
|
|
log("net: Multicast succeeded after %" INT32 " retries.",m_retryCount);
|
|
// allow us to be re-used now, callback might relaunch
|
|
m_inUse = false;
|
|
// now call the user callback if it exists
|
|
if ( m_callback ) {
|
|
// uint64_t profilerStart;
|
|
//uint64_t profilerEnd;
|
|
//uint64_t statStart,statEnd;
|
|
|
|
//if (g_conf.m_profilingEnabled){
|
|
// //profilerStart=gettimeofdayInMillisecondsLocal();
|
|
// address=(int32_t)m_callback;
|
|
// g_profiler.startTimer(address, __PRETTY_FUNCTION__);
|
|
//}
|
|
//g_loop.startBlockedCpuTimer();
|
|
m_callback ( m_state , m_state2 );
|
|
//if (g_conf.m_profilingEnabled) {
|
|
// if(!g_profiler.endTimer(address,__PRETTY_FUNCTION__))
|
|
// log(LOG_WARN,"admin: Couldn't add the fn %" INT32 "",
|
|
// (int32_t)address);
|
|
//}
|
|
}
|
|
}
|
|
|
|
void sleepWrapper1b ( int bogusfd , void *state ) {
|
|
Multicast *THIS = (Multicast *)state;
|
|
// clear m_retired, m_errnos, m_slots
|
|
memset ( THIS->m_retired, 0, sizeof(char ) * MAX_HOSTS_PER_GROUP );
|
|
memset ( THIS->m_errnos , 0, sizeof(int32_t ) * MAX_HOSTS_PER_GROUP );
|
|
memset ( THIS->m_slots , 0, sizeof(UdpSlot *) * MAX_HOSTS_PER_GROUP );
|
|
memset ( THIS->m_inProgress,0,sizeof(char ) * MAX_HOSTS_PER_GROUP );
|
|
// retry the whole she-bang
|
|
if ( THIS->sendToHostLoop ( THIS->m_key , -1 , -1 ) ) {
|
|
// if call succeeded, unregister our sleep callback
|
|
g_loop.unregisterSleepCallback ( THIS , sleepWrapper1b );
|
|
return;
|
|
}
|
|
// otherwise, retry forever
|
|
log("net: Failed to launch multicast request. THIS=%" PTRFMT ". Waiting "
|
|
"and retrying.",(PTRTYPE)THIS);
|
|
}
|
|
|
|
// destroy all slots that may be in progress (except "slot")
|
|
void Multicast::destroySlotsInProgress ( UdpSlot *slot ) {
|
|
// . always destroy any msg34 slots in progress
|
|
// . if we re-route then we span new msg34 requests, and if we get
|
|
// back an original reply we need to take out those msg34 requests
|
|
// because if they get a reply they may try to access a Multicast
|
|
// class that no longer exists
|
|
//if ( m_doDiskLoadBalancing ) m_msg34.destroySlotsInProgress ( );
|
|
// do a loop over all hosts in the group
|
|
for (int32_t i = 0 ; i < m_numHosts ; i++ ) {
|
|
// . destroy all slots but this one that are in progress
|
|
// . we'll be destroyed when we return from the cback
|
|
if ( ! m_slots[i] ) continue;
|
|
// transaction is not in progress if m_errnos[i] is set
|
|
if ( m_errnos[i] ) continue;
|
|
// dont' destroy us, it'll happen when we return
|
|
if ( m_slots[i] == slot ) continue;
|
|
// must be in progress
|
|
if ( ! m_inProgress[i] ) continue;
|
|
// sometimes the slot is recycled from under us because
|
|
// we already got a reply from it
|
|
//if ( m_slots[i]->m_state != this ) continue;
|
|
// don't free his sendBuf, readBuf is ok to free, however
|
|
m_slots[i]->m_sendBufAlloc = NULL;
|
|
// if niceness is 0, use the higher priority udpServer
|
|
UdpServer *us = &g_udpServer;
|
|
if ( m_realtime ) us = &g_udpServer2;
|
|
// . stamp him so he doesn't have a better ping than host of #i
|
|
// . timedOut=true -->only stamp him if it makes his ping worse
|
|
//int32_t hostId = m_slots[i]->m_hostId;
|
|
//int64_t lastSendTime = m_slots[i]->m_lastSendTime;
|
|
//int64_t now = gettimeofdayInMilliseconds() ;
|
|
//int64_t tripTime = now - lastSendTime;
|
|
// . we no longer stamp hosts here, leave that up to
|
|
// Hostdb::pingHost()
|
|
// tripTime is always in milliseconds
|
|
//m_hostdb->stampHost ( hostId , tripTime , true/*timedOut?*/);
|
|
//#ifdef _DEBUG_
|
|
//fprintf(stderr,"stamping host #%" INT32 " w/ tripTime=%" INT64 "ms\n",
|
|
// hostId, tripTime);
|
|
//#endif
|
|
|
|
// if caller provided the buffer, don't free it cuz "slot"
|
|
// contains it (or m_readBuf)
|
|
if ( m_replyBuf == m_slots[i]->m_readBuf )
|
|
m_slots[i]->m_readBuf = NULL;
|
|
// destroy this slot that's in progress
|
|
us->destroySlot ( m_slots[i] );
|
|
// do not re-destroy. consider no longer in progress.
|
|
m_inProgress[i] = 0;
|
|
}
|
|
}
|
|
|
|
|
|
// we set *freeReply to true if you'll need to free it
|
|
char *Multicast::getBestReply ( int32_t *replySize ,
|
|
int32_t *replyMaxSize ,
|
|
bool *freeReply,
|
|
bool steal) {
|
|
*replySize = m_readBufSize;
|
|
*replyMaxSize = m_readBufMaxSize;
|
|
if(steal) m_freeReadBuf = false;
|
|
*freeReply = m_freeReadBuf;
|
|
// this can be NULL if we destroyed the slot in progress only to
|
|
// try another host who was dead!
|
|
if ( m_readBuf ) m_ownReadBuf = false;
|
|
return m_readBuf;
|
|
}
|
|
|