mirror of
https://github.com/privacore/open-source-search-engine.git
synced 2025-01-22 02:18:42 -05:00
e515e92dae
It has always been local time since ... forever. We rely on NTP doing its job.
2608 lines
84 KiB
C++
2608 lines
84 KiB
C++
#include "UdpServer.h"
|
|
#include "UdpProtocol.h"
|
|
#include "UdpSlot.h"
|
|
#include "Hostdb.h"
|
|
#include "Profiler.h"
|
|
#include "Stats.h"
|
|
#include "Proxy.h"
|
|
#include "Process.h"
|
|
#include "Loop.h"
|
|
#include "IPAddressChecks.h"
|
|
#include "BitOperations.h"
|
|
#include "Msg0.h" //RDBIDOFFSET
|
|
#include "Rdb.h" //RDB_...
|
|
#include "GbUtil.h"
|
|
#include "Mem.h"
|
|
#include "max_niceness.h"
|
|
#include "ScopedLock.h"
|
|
#include <netinet/in.h> // ntohl() ntohs()
|
|
#include "ip.h"
|
|
#include "Conf.h"
|
|
#include "Errno.h"
|
|
#include <assert.h>
|
|
#include <unistd.h>
|
|
|
|
|
|
// . any changes made to the slots should only be done without risk of
|
|
// interruption because makeCallbacks() reads from the slots to call
|
|
// callbacks, we don't want it reading garbage
|
|
|
|
|
|
int32_t g_dropped = 0;
|
|
static int32_t g_consecutiveOOMErrors = 0;
|
|
|
|
// . making a hot udp server (realtime signal based)
|
|
// . caller calls to sendRequest() or sendReply() should turn off interrupts
|
|
// before messing with our data
|
|
// . timepoll should turn off interrupts, too
|
|
// . we should call sigqueue if a callback needs to be made if we're hot
|
|
|
|
// a global class extern'd in .h file
|
|
UdpServer g_udpServer;
|
|
|
|
// used when sendRequest() is called with a NULL callback
|
|
static void defaultCallbackWrapper(void * /*state*/, UdpSlot * /*slot*/) {
|
|
}
|
|
|
|
|
|
// free send/readBufs
|
|
void UdpServer::reset() {
|
|
|
|
// clear our slots
|
|
if ( ! m_slots ) return;
|
|
log(LOG_DEBUG,"db: resetting udp server");
|
|
mfree ( m_slots , m_maxSlots * sizeof(UdpSlot) , "UdpServer" );
|
|
m_slots = NULL;
|
|
if ( m_buf ) mfree ( m_buf , m_bufSize , "UdpServer");
|
|
m_buf = NULL;
|
|
}
|
|
|
|
|
|
UdpServer::UdpServer ( ) {
|
|
m_sock = -1;
|
|
m_slots = NULL;
|
|
m_maxSlots = 0;
|
|
m_buf = NULL;
|
|
m_writeRegistered = false;
|
|
|
|
// Coverity
|
|
m_nextTransId = 0;
|
|
memset(m_handlers, 0, sizeof(m_handlers));
|
|
m_needToSend = false;
|
|
m_port = 0;
|
|
m_proto = NULL;
|
|
m_isShuttingDown = false;
|
|
m_needBottom = false;
|
|
m_requestsInWaiting = 0;
|
|
m_msg07sInWaiting = 0;
|
|
m_msg25sInWaiting = 0;
|
|
m_msg39sInWaiting = 0;
|
|
m_msg20sInWaiting = 0;
|
|
m_msg0csInWaiting = 0;
|
|
m_msg0sInWaiting = 0;
|
|
m_ptrs = NULL;
|
|
m_numBuckets = 0;
|
|
m_bucketMask = 0;
|
|
m_bufSize = 0;
|
|
m_availableListHead = NULL;
|
|
m_activeListHead = NULL;
|
|
m_activeListTail = NULL;
|
|
m_callbackListHead = NULL;
|
|
m_callbackListTail = NULL;
|
|
m_numUsedSlots = 0;
|
|
m_numUsedSlotsIncoming = 0;
|
|
m_isDns = false;
|
|
m_eth0BytesIn = 0;
|
|
m_eth0BytesOut = 0;
|
|
m_eth0PacketsIn = 0;
|
|
m_eth0PacketsOut = 0;
|
|
m_eth1BytesIn = 0;
|
|
m_eth1BytesOut = 0;
|
|
m_eth1PacketsIn = 0;
|
|
m_eth1PacketsOut = 0;
|
|
m_outsiderPacketsIn = 0;
|
|
m_outsiderPacketsOut = 0;
|
|
m_outsiderBytesIn = 0;
|
|
m_outsiderBytesOut = 0;
|
|
}
|
|
|
|
|
|
UdpServer::~UdpServer() {
|
|
reset();
|
|
}
|
|
|
|
|
|
//Enlarge receive or send buffer on UDP socket. The trouble is that if we try
|
|
//to set them too high then setsockopt() just fails, so we have to do a binary
|
|
//search for the maximum size. Or use some ghastly linux-specificy way of
|
|
//seeing what the kernel will allow.
|
|
static void enlargeUdpSocketBufffer(int fd, const char *bufname, int optname, int proposed_size)
|
|
{
|
|
int current_buffer_size;
|
|
socklen_t optlen = sizeof(current_buffer_size);
|
|
|
|
if(getsockopt(fd, SOL_SOCKET, optname, (char*)¤t_buffer_size, &optlen)) {
|
|
log(LOG_ERROR,"udp: Could not getsockopt() on fd %d, errno = %d", fd, errno);
|
|
return;
|
|
}
|
|
|
|
if(current_buffer_size>=proposed_size) {
|
|
log(LOG_DEBUG, "udp: %s buffer on fd %d is already at %d", bufname, fd, current_buffer_size);
|
|
return;
|
|
}
|
|
|
|
|
|
int buffer_size = proposed_size;
|
|
|
|
while(buffer_size > current_buffer_size) {
|
|
if(setsockopt(fd, SOL_SOCKET, optname, (const char*)&buffer_size, sizeof(buffer_size)) == 0)
|
|
break;
|
|
// Buffer too large, let's try with half the size...
|
|
buffer_size /=2;
|
|
}
|
|
|
|
log(LOG_DEBUG, "udp: %s buffer on fd %d enlarged from %d to %d", bufname, fd, current_buffer_size, buffer_size);
|
|
}
|
|
|
|
|
|
// . returns false and sets g_errno on error
|
|
// . use 1 socket for recving and sending
|
|
// . pollTime is how often to call timePollWrapper() (in milliseconds)
|
|
// . it should be at least the minimal slot timeout
|
|
bool UdpServer::init ( uint16_t port, UdpProtocol *proto,
|
|
int32_t readBufSize , int32_t writeBufSize ,
|
|
int32_t pollTime , int32_t maxSlots , bool isDns ){
|
|
|
|
// save this
|
|
m_isDns = isDns;
|
|
|
|
// we now alloc so we don't blow up stack
|
|
if ( m_slots ) {
|
|
g_process.shutdownAbort(true);
|
|
}
|
|
|
|
if ( maxSlots < 100 ) maxSlots = 100;
|
|
m_slots =(UdpSlot *)mmalloc(maxSlots*sizeof(UdpSlot),"UdpServer");
|
|
if ( ! m_slots ) {
|
|
log("udp: Failed to allocate %" PRId32" bytes.", maxSlots*(int32_t)sizeof(UdpSlot));
|
|
return false;
|
|
}
|
|
|
|
log(LOG_DEBUG,"udp: Allocated %" PRId32" bytes for %" PRId32" sockets.",
|
|
maxSlots*(int32_t)sizeof(UdpSlot),maxSlots);
|
|
m_maxSlots = maxSlots;
|
|
|
|
// dgram size
|
|
log(LOG_DEBUG,"udp: Using dgram size of %" PRId32" bytes.", (int32_t)DGRAM_SIZE);
|
|
|
|
// set up linked list of available slots
|
|
m_availableListHead = &m_slots[0];
|
|
for (int32_t i = 0; i < m_maxSlots - 1; i++) {
|
|
m_slots[i].m_availableListNext = &m_slots[i + 1];
|
|
}
|
|
m_slots[m_maxSlots - 1].m_availableListNext = NULL;
|
|
|
|
// the linked list of slots in use
|
|
m_activeListHead = NULL;
|
|
m_activeListTail = NULL;
|
|
|
|
// linked list of callback candidates
|
|
m_callbackListHead = NULL;
|
|
|
|
// . set up hash table that converts key (ip/port/transId) to a slot
|
|
// . m_numBuckets must be power of 2
|
|
m_numBuckets = getHighestLitBitValue ( m_maxSlots * 6 );
|
|
m_bucketMask = m_numBuckets - 1;
|
|
// alloc space for hash table
|
|
m_bufSize = m_numBuckets * sizeof(UdpSlot *);
|
|
m_buf = (char *)mmalloc ( m_bufSize , "UdpServer" );
|
|
if ( ! m_buf ) {
|
|
log("udp: Failed to allocate %" PRId32" bytes for table.",m_bufSize);
|
|
return false;
|
|
}
|
|
|
|
m_ptrs = (UdpSlot **)m_buf;
|
|
|
|
// clear
|
|
memset ( m_ptrs , 0 , sizeof(UdpSlot *)*m_numBuckets );
|
|
log(LOG_DEBUG,"udp: Allocated %" PRId32" bytes for table.",m_bufSize);
|
|
|
|
m_numUsedSlots = 0;
|
|
m_numUsedSlotsIncoming = 0;
|
|
// clear this
|
|
m_isShuttingDown = false;
|
|
// . TODO: IMPORTANT: FIX this to read and save from disk!!!!
|
|
// . NOTE: only need to fix if doing incremental sync/storage??
|
|
m_nextTransId = g_hostdb.getMyHostId() << 19;
|
|
// clear handlers
|
|
memset ( m_handlers, 0 , sizeof(void(* )(UdpSlot *slot,int32_t)) * 128);
|
|
|
|
// save the port in case we need it later
|
|
m_port = port;
|
|
// no requests waiting yet
|
|
m_requestsInWaiting = 0;
|
|
// special count
|
|
m_msg07sInWaiting = 0;
|
|
m_msg25sInWaiting = 0;
|
|
m_msg39sInWaiting = 0;
|
|
m_msg20sInWaiting = 0;
|
|
m_msg0csInWaiting = 0;
|
|
m_msg0sInWaiting = 0;
|
|
// maintain a ptr to the protocol
|
|
m_proto = proto;
|
|
// sanity test so we can peek at the rdbid in a msg0 request
|
|
if( ! m_isDns && MSG0RDBIDOFFSET +1 > m_proto->getMaxPeekSize() ) {
|
|
g_process.shutdownAbort(true);
|
|
}
|
|
|
|
// set up our socket
|
|
m_sock = socket ( AF_INET, SOCK_DGRAM , 0 );
|
|
|
|
if ( m_sock < 0 ) {
|
|
// copy errno to g_errno
|
|
g_errno = errno;
|
|
log(LOG_WARN, "udp: Failed to create socket: %s.", mstrerror(g_errno));
|
|
return false;
|
|
}
|
|
// sockaddr_in provides interface to sockaddr
|
|
struct sockaddr_in name;
|
|
// reset it all just to be safe
|
|
memset(&name,0,sizeof(name));
|
|
name.sin_family = AF_INET;
|
|
name.sin_addr.s_addr = INADDR_ANY;
|
|
name.sin_port = htons(port);
|
|
// we want to re-use port it if we need to restart
|
|
int options = 1;
|
|
if ( setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &options,sizeof(options)) < 0 ) {
|
|
// copy errno to g_errno
|
|
g_errno = errno;
|
|
log( LOG_WARN, "udp: Call to setsockopt: %s.",mstrerror(g_errno));
|
|
return false;
|
|
}
|
|
|
|
// the lower the RT signal we use, the higher our priority
|
|
|
|
// . before we start getting signals on this socket let's make sure
|
|
// we have a handler registered with the Loop class
|
|
// . this makes m_sock non-blocking, too
|
|
// . use the original niceness for this
|
|
if (!g_loop.registerReadCallback(m_sock, this, readPollWrapper, "UdpServer::readPollWrapper", 0)) {
|
|
return false;
|
|
}
|
|
|
|
// . also register for 30 ms tix (was 15ms)
|
|
// but we aren't using tokens any more so I raised it
|
|
// . it's low so we can claim any unclaimed tokens!
|
|
// . now resends are at 20ms... i'd go lower, but sigtimedqueue() only
|
|
// has a timer resolution of 20ms, probably due to kernel time slicin
|
|
if (!g_loop.registerSleepCallback(pollTime, this, timePollWrapper, "UdpServer::timePollWrapper", 0)) {
|
|
return false;
|
|
}
|
|
|
|
// . set the read buffer size to 256k for high priority socket
|
|
// so our indexlists don't have to be re-transmitted so much in case
|
|
// we delay a bit
|
|
// . set after calling socket() but before calling bind() for tcp
|
|
// because of http://jes.home.cern.ch/jes/gige/acenic.html
|
|
// . do these cmds on the cmd line as root for gigabit ethernet
|
|
// . echo 262144 > /proc/sys/net/core/rmem_max
|
|
// . echo 262144 > /proc/sys/net/core/wmem_max
|
|
// print the size of the buffers
|
|
enlargeUdpSocketBufffer(m_sock, "Receive", SO_RCVBUF, readBufSize);
|
|
enlargeUdpSocketBufffer(m_sock, "Send", SO_SNDBUF, writeBufSize);
|
|
|
|
// bind this name to the socket
|
|
if ( bind ( m_sock, (struct sockaddr *)(void*)&name, sizeof(name)) < 0) {
|
|
// copy errno to g_errno
|
|
g_errno = errno;
|
|
close ( m_sock );
|
|
log( LOG_WARN, "udp: Failed to bind to port %hu: %s.", port,strerror(g_errno));
|
|
return false;
|
|
}
|
|
|
|
// init stats
|
|
m_eth0BytesIn = 0LL;
|
|
m_eth0BytesOut = 0LL;
|
|
m_eth0PacketsIn = 0LL;
|
|
m_eth0PacketsOut = 0LL;
|
|
m_eth1BytesIn = 0LL;
|
|
m_eth1BytesOut = 0LL;
|
|
m_eth1PacketsIn = 0LL;
|
|
m_eth1PacketsOut = 0LL;
|
|
|
|
// for packets coming in from other clusters usually for importing
|
|
// link information
|
|
m_outsiderPacketsIn = 0LL;
|
|
m_outsiderBytesIn = 0LL;
|
|
m_outsiderPacketsOut = 0LL;
|
|
m_outsiderBytesOut = 0LL;
|
|
|
|
log ( LOG_INIT, "udp: Listening on UDP port %hu with fd=%i.", m_port, m_sock );
|
|
return true;
|
|
}
|
|
|
|
// . use a backoff of -1 for the default
|
|
// . use maxWait of -1 for the default
|
|
// . returns false and sets g_errno on error
|
|
// . returns true on success
|
|
bool UdpServer::sendRequest(char *msg,
|
|
int32_t msgSize,
|
|
msg_type_t msgType,
|
|
uint32_t ip,
|
|
uint16_t port,
|
|
int32_t hostId,
|
|
UdpSlot **retslot, // can be NULL
|
|
void *state,
|
|
void (*callback)(void *state, UdpSlot *slot),
|
|
int64_t timeout, // in milliseconds
|
|
int32_t niceness,
|
|
const char *extraInfo,
|
|
int32_t maxResends) {
|
|
|
|
// sanity check
|
|
if ( ! m_handlers[msgType] && msgType != msg_type_dns ) {
|
|
g_process.shutdownAbort(true);
|
|
}
|
|
|
|
// NULLify slot if any
|
|
if ( retslot ) {
|
|
*retslot = NULL;
|
|
}
|
|
|
|
// if shutting down return an error
|
|
if ( m_isShuttingDown ) {
|
|
g_errno = ESHUTTINGDOWN;
|
|
return false;
|
|
}
|
|
|
|
// ensure timeout ok
|
|
if ( timeout < 0 ) {
|
|
//g_errno = EBADENGINEER;
|
|
log(LOG_LOGIC,"udp: sendrequest: Timeout is negative. ");
|
|
g_process.shutdownAbort(true);
|
|
}
|
|
|
|
// . we only allow niceness 0 or 1 now
|
|
// . this niceness is only used for makeCallbacks()
|
|
if ( niceness > 1 ) niceness = 1;
|
|
if ( niceness < 0 ) niceness = 0;
|
|
|
|
// set up shotgunning for this hostId
|
|
Host *h = NULL;
|
|
uint32_t ip2 = ip;
|
|
|
|
// . now we always set UdpSlot::m_host
|
|
// . hostId is -1 when sending to a host in g_hostdb2 (hosts2.conf)
|
|
if ( hostId >= 0 ) {
|
|
h = g_hostdb.getHost ( hostId );
|
|
}
|
|
|
|
// get it from g_hostdb2 then via ip lookup if still NULL
|
|
if ( ! h ) {
|
|
h = g_hostdb.getUdpHost ( ip , port );
|
|
}
|
|
|
|
// sanity check
|
|
if ( h && ip && ip != (uint32_t)-1 && h->m_ip != ip && h->m_ipShotgun != ip && ip != 0x0100007f ) { // "127.0.0.1"
|
|
log(LOG_LOGIC,"udp: provided hostid does not match ip");
|
|
g_process.shutdownAbort(true);
|
|
}
|
|
|
|
// always use the primary ip for making the key,
|
|
// do not use the shotgun ip. because we can be getting packets
|
|
// from either ip for the same transaction.
|
|
if ( h ) {
|
|
ip2 = h->m_ip;
|
|
}
|
|
|
|
ScopedLock sl(m_mtx);
|
|
|
|
// get a new transId
|
|
int32_t transId = getTransId_unlocked();
|
|
|
|
// make a key for this new slot
|
|
key96_t key = m_proto->makeKey (ip2,port,transId,true/*weInitiated?*/);
|
|
|
|
// . create a new slot to control the transmission of this request
|
|
// . should set g_errno on failure
|
|
UdpSlot *slot = getEmptyUdpSlot_unlocked(key, false);
|
|
if ( ! slot ) {
|
|
log( LOG_WARN, "udp: All %" PRId32" slots are in use.",m_maxSlots);
|
|
static time_t lastLogTime = 0;
|
|
time_t now = time(0);
|
|
if(lastLogTime+10 < now) {
|
|
lastLogTime = now;
|
|
for(int i = 0; i < m_maxSlots; i++) {
|
|
char ipbuf[16];
|
|
log(LOG_WARN,"udp: slot[%4d]: peer=%s:%d type=%02x %s",
|
|
i,
|
|
iptoa(m_slots[i].getIp(),ipbuf),
|
|
m_slots[i].getPort(),
|
|
m_slots[i].getMsgType(),
|
|
m_slots[i].isIncoming()?"in":"out");
|
|
}
|
|
}
|
|
return false;
|
|
}
|
|
|
|
char ipbuf[16];
|
|
logDebug(g_conf.m_logDebugUdp, "udp: sendrequest: ip2=%s port=%" PRId32" msgType=0x%02x msgSize=%" PRId32" "
|
|
"tid=%" PRId32" (niceness=%" PRId32") slot=%p.",
|
|
iptoa(ip2,ipbuf),(int32_t)port, (unsigned char)msgType, (int32_t)msgSize,
|
|
(int32_t)transId, (int32_t)niceness , slot );
|
|
|
|
// . get time
|
|
int64_t now = gettimeofdayInMilliseconds();
|
|
|
|
// connect to the ip/port (udp-style: does not do much)
|
|
slot->connect(m_proto, ip, port, h, hostId, transId, timeout, now, niceness);
|
|
|
|
// . use default callback if none provided
|
|
// . slot has a callback iff it's an outgoing request
|
|
if ( ! callback ) {
|
|
callback = defaultCallbackWrapper;
|
|
}
|
|
|
|
// set up for a send
|
|
if (!slot->sendSetup(msg, msgSize, msg, msgSize, msgType, now, state, callback, niceness, extraInfo)) {
|
|
freeUdpSlot_unlocked(slot);
|
|
log( LOG_WARN, "udp: Failed to initialize udp socket for sending req: %s",mstrerror(g_errno));
|
|
return false;
|
|
}
|
|
|
|
if (slot->m_callbackListNext || slot->m_callbackListPrev) {
|
|
g_process.shutdownAbort(true);
|
|
}
|
|
|
|
// set this
|
|
slot->m_maxResends = maxResends;
|
|
|
|
// keep sending dgrams until we have no more or hit ACK_WINDOW limit
|
|
if ( !doSending_unlocked(slot, true /*allow resends?*/, now) ) {
|
|
freeUdpSlot_unlocked(slot);
|
|
log(LOG_WARN, "udp: Failed to send dgrams for udp socket.");
|
|
return false;
|
|
}
|
|
|
|
// let caller know the slot if he wants to
|
|
if ( retslot ) {
|
|
*retslot = slot;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
void UdpServer::sendErrorReply(UdpSlot *slot, int32_t errnum) {
|
|
ScopedLock sl(m_mtx);
|
|
sendErrorReply_unlocked(slot, errnum);
|
|
}
|
|
|
|
// returns false and sets g_errno on error, true otherwise
|
|
void UdpServer::sendErrorReply_unlocked(UdpSlot *slot, int32_t errnum) {
|
|
m_mtx.verify_is_locked();
|
|
|
|
logDebug(g_conf.m_logDebugUdp, "udp: sendErrorReply slot=%p errnum=%" PRId32, slot, errnum);
|
|
|
|
// bitch if it is 0
|
|
if ( errnum == 0 ) {
|
|
log(LOG_LOGIC,"udp: sendErrorReply: errnum is 0.");
|
|
g_process.shutdownAbort(true);
|
|
}
|
|
|
|
// clear g_errno in case it was set
|
|
g_errno = 0;
|
|
|
|
// make a little msg
|
|
char *msg = slot->m_shortSendBuffer;
|
|
*(int32_t *)msg = htonl(errnum) ;
|
|
|
|
// set the m_localErrno in "slot" so it will set the dgrams error bit
|
|
slot->m_localErrno = errnum;
|
|
|
|
sendReply_unlocked(msg, 4, msg, 4, slot);
|
|
}
|
|
|
|
void UdpServer::sendReply(char *msg, int32_t msgSize, char *alloc, int32_t allocSize, UdpSlot *slot, void *state,
|
|
void (*callback2)(void *state, UdpSlot *slot)) {
|
|
ScopedLock sl(m_mtx);
|
|
sendReply_unlocked(msg, msgSize, alloc, allocSize, slot, state, callback2);
|
|
}
|
|
|
|
// . destroys slot on error or completion (frees m_readBuf,m_sendBuf)
|
|
// . use a backoff of -1 for the default
|
|
void UdpServer::sendReply_unlocked(char *msg, int32_t msgSize, char *alloc, int32_t allocSize, UdpSlot *slot, void *state,
|
|
void (*callback2)(void *state, UdpSlot *slot)) {
|
|
m_mtx.verify_is_locked();
|
|
|
|
logDebug(g_conf.m_logDebugUdp, "udp: sendReply slot=%p", slot);
|
|
|
|
// we can only sendReply if it's an incoming slot
|
|
if (!slot->isIncoming() || slot->hasCallback()) {
|
|
logError("udp: sendReply: Not incoming slot/has callback");
|
|
gbshutdownLogicError();
|
|
}
|
|
|
|
if ( ! msg && msgSize > 0 ) {
|
|
log( LOG_WARN, "udp: calling sendreply with null send buffer and positive size! will probably core." );
|
|
}
|
|
|
|
// record some statistics on how long these msg handlers are taking
|
|
int64_t now = gettimeofdayInMilliseconds();
|
|
// m_queuedTime should have been set before m_handlers[] was called
|
|
int32_t delta = now - slot->m_queuedTime;
|
|
int32_t n = slot->getNiceness();
|
|
if ( n < 0 ) n = 0;
|
|
if ( n > 1 ) n = 1;
|
|
// add to average, this is now the reply GENERATION, not handler time
|
|
g_stats.m_msgTotalOfHandlerTimes [slot->getMsgType()][n] += delta;
|
|
g_stats.m_msgTotalHandlersCalled [slot->getMsgType()][n]++;
|
|
// bucket number is log base 2 of the delta
|
|
if ( delta > 64000 ) delta = 64000;
|
|
int32_t bucket = getHighestLitBit ( (uint16_t)delta );
|
|
// MAX_BUCKETS is probably 16 and #define'd in Stats.h
|
|
if ( bucket >= MAX_BUCKETS ) bucket = MAX_BUCKETS-1;
|
|
g_stats.m_msgTotalHandlersByTime [slot->getMsgType()][n][bucket]++;
|
|
// we have to use a different clock for measuring how long to
|
|
// send the reply now
|
|
slot->m_queuedTime = now;
|
|
|
|
// now we always set m_host, we use s_shotgun to toggle
|
|
slot->m_host = g_hostdb.getUdpHost ( slot->getIp() , slot->getPort() );
|
|
|
|
// discount this
|
|
if ( slot->m_convertedNiceness == 1 && slot->getNiceness() == 0 ) {
|
|
logDebug(g_conf.m_logDebugUdp, "udp: unconverting slot=%p", slot);
|
|
|
|
// go back to niceness 1 for sending back, otherwise their
|
|
// the callback will be called with niceness 0!!
|
|
//slot->m_niceness = 1;
|
|
slot->m_convertedNiceness = 2;
|
|
}
|
|
|
|
// . use a NULL callback since we're sending a reply
|
|
// . set up for a send
|
|
if (!slot->sendSetup(msg, msgSize, alloc, allocSize, slot->getMsgType(), now, NULL, NULL, slot->getNiceness())) {
|
|
log( LOG_WARN, "udp: Failed to initialize udp socket for sending reply: %s", mstrerror(g_errno));
|
|
mfree ( alloc , allocSize , "UdpServer");
|
|
// was EBADENGINEER
|
|
log(LOG_ERROR,"%s:%s:%d: call sendErrorReply.", __FILE__, __func__, __LINE__);
|
|
sendErrorReply_unlocked(slot, g_errno);
|
|
return ;
|
|
}
|
|
// set the callback2 , it might not be NULL if we're recording stats
|
|
// OR we need to call Msg21::freeBandwidth() after sending
|
|
slot->m_state = state;
|
|
slot->m_callback2 = callback2;
|
|
// set this
|
|
slot->m_maxResends = -1;
|
|
|
|
logDebug(g_conf.m_logDebugUdp, "udp: Sending reply tid=%" PRId32" msgType=0x%02x (niceness=%" PRId32").",
|
|
slot->getTransId(), (int)slot->getMsgType(), (int32_t)slot->getNiceness());
|
|
// keep sending dgrams until we have no more or hit ACK_WINDOW limit
|
|
if ( !doSending_unlocked(slot, true /*allow resends?*/, now) ) {
|
|
// . on error deal with that
|
|
// . errors from doSending() are from
|
|
// UdpSlot::sendDatagramOrAck()
|
|
// which are usually errors from sendto() or something
|
|
// . TODO: we may have to destroy this slot ourselves now...
|
|
log(LOG_WARN, "udp: Got error sending dgrams.");
|
|
// destroy it i guess
|
|
destroySlot_unlocked(slot);
|
|
}
|
|
}
|
|
|
|
// . this wrapper is called when m_sock is ready for writing
|
|
// . should only be called by Loop.cpp since it calls callbacks
|
|
// . should only be called if in an interrupt or interrupts are off!!
|
|
void UdpServer::sendPollWrapper(int fd, void *state) {
|
|
UdpServer *that = static_cast<UdpServer*>(state);
|
|
// begin the read/send/callback loop
|
|
that->sendPoll(true, gettimeofdayInMilliseconds());
|
|
}
|
|
|
|
// . returns false and sets g_errno on error, true otherwise
|
|
// . will send an ACK or dgram
|
|
// . this is called by sendRequest() which is not async safe
|
|
// and by sendPollWrapper()
|
|
// . that means we can be calling doSending() on a slot made in
|
|
// sendRequest() and then be interrupted by sendPollWrapper()
|
|
// . Fortunately, we have a lock around it in sendRequest()!
|
|
bool UdpServer::doSending_unlocked(UdpSlot *slot, bool allowResends, int64_t now) {
|
|
m_mtx.verify_is_locked();
|
|
|
|
// if UdpServer::cancel() was called and this slot's callback was
|
|
// called, make sure to hault sending if we are in a quickpoll
|
|
// interrupt...
|
|
if ( slot->hasCalledCallback() ) {
|
|
log("udp: trying to send on called callback slot");
|
|
return true;
|
|
}
|
|
|
|
for(;;) {
|
|
if ( slot->getScore(now) < 0 ) {
|
|
//enough or all sent
|
|
if(slot->m_callback && slot->m_host)
|
|
slot->m_host->updateLastRequestSendTimestamp(getCurrentTimeNanoseconds());
|
|
return true;
|
|
}
|
|
|
|
// . returns -2 if nothing to send, -1 on error, 0 if blocked,
|
|
// 1 if sent something
|
|
// . it will send a dgram or an ACK
|
|
int32_t status = slot->sendDatagramOrAck ( m_sock , allowResends , now );
|
|
// return 1 if nothing to send
|
|
if ( status == -2 ) {
|
|
//all sent
|
|
if(slot->m_callback && slot->m_host)
|
|
slot->m_host->updateLastRequestSendTimestamp(getCurrentTimeNanoseconds());
|
|
return true;
|
|
}
|
|
// return -1 on error
|
|
if ( status == -1 ) {
|
|
log("udp: Had error sending dgram: %s.",mstrerror(g_errno));
|
|
return false;
|
|
}
|
|
// return 0 if we blocked on this dgram
|
|
if ( status == 0 ) {
|
|
// but Loop should call us again asap because I don't think
|
|
// we'll get a ready to write signal... don't count on it
|
|
m_needToSend = true;
|
|
// ok, now it should
|
|
if ( ! m_writeRegistered ) {
|
|
if (!g_loop.registerWriteCallback(m_sock, this, sendPollWrapper,
|
|
"UdpServer::sendPollWrapper", 0)) {
|
|
logError("registerWriteCallback failed");
|
|
return false;
|
|
}
|
|
m_writeRegistered = true;
|
|
}
|
|
return true;
|
|
}
|
|
}
|
|
}
|
|
|
|
// . should only be called from process() since this is not re-entrant
|
|
// . sends all the awaiting dgrams it can
|
|
// . returns false if blocked, true otherwise
|
|
// . sets g_errno on error
|
|
// . tries to send msgs that are the "most caught up" to their ACKs first
|
|
// . call the callback of slots that are TIMEDOUT or get an error!
|
|
// . verified that this is not interruptible
|
|
// . MDW: THIS IS NOW called by Loop.cpp when our udp socket is ready for
|
|
// sending on, and a previous sendto() would have blocked.
|
|
bool UdpServer::sendPoll(bool allowResends, int64_t now) {
|
|
ScopedLock sl(m_mtx);
|
|
|
|
// just so caller knows we don't need to send again yet
|
|
m_needToSend = false;
|
|
|
|
// if we don'thave anything to send, or we're waiting on ACKS, then
|
|
// just return false, we didn't do anything.
|
|
// assume we didn't process anything
|
|
bool something = false;
|
|
|
|
for(;;) {
|
|
// . don't do any sending until we leave the wait state
|
|
// or if is shutting down
|
|
if ( m_isShuttingDown )
|
|
return false;
|
|
// . get the next slot to send on
|
|
// . it sets "isResend" to true if it's a resend
|
|
// . this sets g_errno to ETIMEOUT if the slot it returns has timed out
|
|
// . in that case we'll destroy that slot
|
|
UdpSlot *slot = getBestSlotToSend_unlocked(now);
|
|
// . slot is NULL if no more slots need sending
|
|
// . return true if we processed something
|
|
if ( ! slot ) {
|
|
// if nobody needs to send now unregister write callback
|
|
// so select() loop in Loop.cpp does not keep freaking out
|
|
if ( ! m_needToSend && m_writeRegistered ) {
|
|
g_loop.unregisterWriteCallback(m_sock, this, sendPollWrapper);
|
|
m_writeRegistered = false;
|
|
}
|
|
return something;
|
|
}
|
|
// otherwise, we can send something
|
|
something = true;
|
|
|
|
// . send all we can from this slot
|
|
// . when shutting down during a dump we can get EBADF during a send
|
|
// so do not loop forever
|
|
// . this returns false on error, i haven't seen it happen though
|
|
if ( !doSending_unlocked(slot, allowResends, now) )
|
|
return true;
|
|
}
|
|
}
|
|
|
|
// . returns NULL if no slots need sending
|
|
// . otherwise returns a slot
|
|
// . slot may have dgrams or ACKs to send
|
|
// . sets g_errno to ETIMEDOUT if that slot is timed out as well as set
|
|
// that slot's m_doneSending to true
|
|
// . let's send the shortest first, but weight by how long it's been waiting!
|
|
// . f(x) = a*(now - startTime) + b/msgSize
|
|
// . verified that this is not interruptible
|
|
UdpSlot *UdpServer::getBestSlotToSend_unlocked(int64_t now) {
|
|
m_mtx.verify_is_locked();
|
|
|
|
// . we send msgs that are mostly "caught up" with their acks first
|
|
// . the slot with the lowest score gets sent
|
|
// . re-sends have priority over NONre-sends(ACK was not recvd in time)
|
|
int32_t maxScore = -1;
|
|
UdpSlot *maxi = NULL;
|
|
|
|
// . we send dgrams with the lowest "score" first
|
|
// . the "score" is just number of ACKs you're waiting for
|
|
// . that way transmissions that are the most caught up to their ACKs
|
|
// are considered faster so we send to them first
|
|
// . we set the hi bit in the score for non-resends so dgrams that
|
|
// are being resent take precedence
|
|
for ( UdpSlot *slot = m_activeListHead ; slot ; slot = slot->m_activeListNext ) {
|
|
// . we don't allow time out on slots waiting for us to send
|
|
// stuff, because we'd just end up calling the handler
|
|
// too many times. we could invent a "stop" cmd or something.
|
|
// . mdw
|
|
|
|
// . how many acks are we currently waiting for from dgrams
|
|
// that have already been sent?
|
|
// . can be up to ACK_WINDOW_SIZE (16?).
|
|
// . we're a "Fastest First" (FF) protocol stack.
|
|
int32_t score = slot->getScore ( now );
|
|
// a negative score means it's not a candidate
|
|
if ( score < 0 ) {
|
|
continue;
|
|
}
|
|
// see if we're a winner
|
|
if ( score > maxScore ) {
|
|
maxi = slot;
|
|
maxScore = score;
|
|
}
|
|
}
|
|
|
|
// return the winning slot
|
|
return maxi;
|
|
}
|
|
|
|
// . must give level of niceness for continuing the transaction at that lvl
|
|
bool UdpServer::registerHandler( msg_type_t msgType, void (* handler)(UdpSlot *, int32_t niceness) ) {
|
|
if (m_handlers[msgType]) {
|
|
log(LOG_LOGIC, "udp: msgType %02x already in use.", (int)msgType);
|
|
return false;
|
|
}
|
|
|
|
m_handlers[msgType] = handler;
|
|
return true;
|
|
}
|
|
|
|
// . read and send as much as we can before calling any callbacks
|
|
// . if forceCallbacks is true we call them regardless if we read/sent anything
|
|
void UdpServer::process(int64_t now, int32_t maxNiceness) {
|
|
// bail if no main sock
|
|
if ( m_sock < 0 ) return ;
|
|
|
|
//log("process");
|
|
|
|
// if we call this while in the sighandler it crashes since
|
|
// gettimeofdayInMilliseconds() is not async safe
|
|
int64_t startTimer = gettimeofdayInMilliseconds();
|
|
bigloop:
|
|
bool needCallback = false;
|
|
loop:
|
|
// did we read or send something?
|
|
bool something = false;
|
|
// a common var
|
|
UdpSlot *slot;
|
|
// read loop
|
|
readAgain:
|
|
// bail if no main sock, could have been shutdown in the middle
|
|
if ( m_sock < 0 ) return ;
|
|
// . returns -1 on error, 0 if blocked, 1 if completed reading dgram
|
|
// . *slot is set to the slot on which the dgram was read
|
|
// . *slot will be NULL on some errors (read errors or alloc errors)
|
|
// . *slot will be NULL if we read and processed a slotless ACK
|
|
// . *slot will be NULL if we read nothing (0 bytes read & 0 returned)
|
|
int32_t status = readSock(&slot, now);
|
|
// if we read something
|
|
if ( status != 0 ) {
|
|
// if no slot was set, it was a slotless read so keep looping
|
|
if ( ! slot ) { g_errno = 0; goto readAgain; }
|
|
// if there was a read error let makeCallback() know about it
|
|
if ( status == -1 ) {
|
|
slot->m_errno = g_errno;
|
|
// prepare to call the callback by adding it to this
|
|
// special linked list
|
|
if ( g_errno ) {
|
|
ScopedLock sl(m_mtx);
|
|
addToCallbackLinkedList_unlocked(slot);
|
|
}
|
|
// sanity
|
|
if ( ! g_errno )
|
|
log("udp: missing g_errno from read error");
|
|
}
|
|
// we read something
|
|
something = true;
|
|
{
|
|
ScopedLock sl(m_mtx);
|
|
// try sending an ACK on the slot we read something from
|
|
doSending_unlocked(slot, false, now);
|
|
}
|
|
}
|
|
// if we read something, try for more
|
|
if ( something ) {
|
|
needCallback = true;
|
|
goto loop;
|
|
}
|
|
// if we don't need a callback, bail
|
|
if ( ! needCallback ) {
|
|
if ( m_needBottom ) goto callBottom;
|
|
else return;
|
|
}
|
|
// . set flag to call low priority callbacks
|
|
// . need to force it on here because makeCallbacks() may
|
|
// return false when there are only low priority (high niceness)
|
|
// callbacks to call...
|
|
m_needBottom = true;
|
|
// . TODO: if we read/sent nothing don't bother calling callbacks
|
|
// . call any outstanding callbacks
|
|
// . now we have a niceness bit in the dgram header. if set, those
|
|
// callback will only be called after all unset dgram's callbacks are
|
|
// . this returns true if we called one
|
|
if ( makeCallbacks(/*niceness level*/ 0) ) {
|
|
// set flag to call low priority callbacks
|
|
m_needBottom = true;
|
|
// note it
|
|
//log("made callback");
|
|
// but not now, only when we don't call any high priorities
|
|
goto bigloop;
|
|
}
|
|
callBottom:
|
|
if(maxNiceness < 1) return;
|
|
// if we call this while in the sighandler it crashes since
|
|
// gettimeofdayInMilliseconds() is not async safe
|
|
int64_t elapsed = gettimeofdayInMilliseconds() - startTimer;
|
|
if(elapsed < 10) {
|
|
// we did not call any, so resort to nice callbacks
|
|
// . only go to bigloop if we called a callback
|
|
if ( makeCallbacks(/*niceness level*/1) )
|
|
goto bigloop;
|
|
}
|
|
else {
|
|
m_needBottom = true;
|
|
}
|
|
}
|
|
|
|
// . this wrapper is called when the Loop class has found that m_sock
|
|
// needs to be read from (it got a SIGIO/GB_SIGRTMIN signal for it)
|
|
// . should only be called if in an interrupt or interrupts are off!!
|
|
void UdpServer::readPollWrapper(int fd, void *state) {
|
|
UdpServer *that = static_cast<UdpServer*>(state);
|
|
// begin the read/send/callback loop
|
|
that->process(gettimeofdayInMilliseconds());
|
|
}
|
|
|
|
|
|
// . returns -1 on error, 0 if blocked, 1 if completed reading dgram
|
|
int32_t UdpServer::readSock(UdpSlot **slotPtr, int64_t now) {
|
|
ScopedLock sl(m_mtx);
|
|
|
|
// NULLify slot
|
|
*slotPtr = NULL;
|
|
sockaddr_in from;
|
|
socklen_t fromLen = sizeof ( struct sockaddr );
|
|
char readBuffer[64*1024];
|
|
int readSize = recvfrom ( m_sock,
|
|
readBuffer,
|
|
sizeof(readBuffer),
|
|
0, //flags
|
|
(sockaddr *)(void*)&from,
|
|
&fromLen);
|
|
|
|
logDebug(g_conf.m_logDebugLoop, "loop: readsock: readSize=%i m_sock/fd=%i", readSize,m_sock);
|
|
|
|
// cancel silly g_errnos and return 0 since we blocked
|
|
if ( readSize < 0 ) {
|
|
g_errno = errno;
|
|
|
|
if ( g_errno == 0 || g_errno == EILSEQ || g_errno == EAGAIN ) {
|
|
g_errno = 0;
|
|
return 0;
|
|
}
|
|
|
|
// Interrupted system call (4) (from valgrind)
|
|
log( LOG_WARN, "udp: readDgram: %s (%d).", mstrerror( g_errno ), g_errno );
|
|
return -1;
|
|
}
|
|
|
|
uint32_t ip2;
|
|
Host *h;
|
|
key96_t key;
|
|
UdpSlot *slot;
|
|
int32_t dgramNum;
|
|
bool wasAck;
|
|
int32_t transId;
|
|
bool status;
|
|
msg_type_t msgType;
|
|
int32_t niceness;
|
|
|
|
// get the ip
|
|
uint32_t ip = from.sin_addr.s_addr;
|
|
// if it's 127.0.0.1 then change it to our ip
|
|
if ( ip == g_hostdb.getLoopbackIp() ) ip = g_hostdb.getMyIp();
|
|
// . if ip is not from a host in hosts.conf, discard it
|
|
// . don't bother checking for dns server, who knows where that is
|
|
// . now also allow all admin ips
|
|
else if ( m_proto->useAcks() &&
|
|
! is_trusted_protocol_ip(ip) &&
|
|
! g_hostdb.isIpInNetwork ( ip ) &&
|
|
! g_conf.isMasterIp ( ip ) &&
|
|
! g_conf.isConnectIp ( ip ) ) {
|
|
// bitch, wait at least 5 seconds though
|
|
static int32_t s_lastTime = 0;
|
|
static int64_t s_count = 0LL;
|
|
s_count++;
|
|
if ( getTime() - s_lastTime > 5 ) {
|
|
s_lastTime = getTime();
|
|
char ipbuf[16];
|
|
log(LOG_WARN, "udp: Received unauthorized udp packet from %s. Count=%" PRId64".",iptoa(ip,ipbuf),s_count);
|
|
}
|
|
// make it return 1 cuz we did read something
|
|
status = true;
|
|
// not an ack? assume not
|
|
wasAck = false;
|
|
// assume no shotgun
|
|
h = NULL;
|
|
// read it into the temporary discard buf
|
|
goto discard;
|
|
}
|
|
// get hostid of the ip, use that instead of ip to make the key
|
|
// since shotgunning may change the ip
|
|
ip2 = ip;
|
|
// i modified Hostdb::hashHosts() to hash the loopback ip now!
|
|
h = g_hostdb.getUdpHost ( ip , ntohs(from.sin_port) );
|
|
|
|
// always use the primary ip for making the key,
|
|
// do not use the shotgun ip. because we can be getting packets
|
|
// from either ip for the same transaction. h can be NULL if the packet
|
|
// is from a dns server.
|
|
if ( h ) ip2 = h->m_ip;
|
|
//logf(LOG_DEBUG,"net: got h=%" PRIu32,(int32_t)h);
|
|
// generate a unique KEY for this TRANSACTION
|
|
key = m_proto->makeKey ( readBuffer ,
|
|
readSize ,
|
|
//from.sin_addr.s_addr, // network order
|
|
ip2 , // ip ,
|
|
//ip , // network order
|
|
ntohs(from.sin_port) );// host order
|
|
// get the corresponding slot for this key, if it exists
|
|
slot = getUdpSlot_unlocked(key);
|
|
// get the dgram number on this dgram
|
|
dgramNum = m_proto->getDgramNum ( readBuffer, readSize );
|
|
// was it an ack?
|
|
wasAck = m_proto->isAck ( readBuffer, readSize );
|
|
// everybody has a transId
|
|
transId = m_proto->getTransId ( readBuffer, readSize );
|
|
// other vars we'll use later
|
|
status = true;
|
|
// if we don't already have a slot set up for it then it can be:
|
|
// #1) a new incoming request
|
|
// #2) a reply we ACKed but it didn't get our ACK and we've closed
|
|
// #3) a stray ACK???
|
|
// #4) a reply but we timed out and our slot is gone
|
|
msgType = static_cast<msg_type_t>(m_proto->getMsgType(readBuffer, readSize));
|
|
niceness = m_proto->isNice ( readBuffer, readSize );
|
|
// general count
|
|
if ( niceness == 0 ) {
|
|
g_stats.m_packetsIn[msgType][0]++;
|
|
if ( wasAck ) g_stats.m_acksIn[msgType][0]++;
|
|
}
|
|
else {
|
|
g_stats.m_packetsIn[msgType][1]++;
|
|
if ( wasAck ) g_stats.m_acksIn[msgType][1]++;
|
|
}
|
|
// if we're shutting down do not accept new connections, discard
|
|
if ( m_isShuttingDown ) goto discard;
|
|
if(slot) {
|
|
if (h) {
|
|
h->updateLastResponseReceiveTimestamp(getCurrentTimeNanoseconds());
|
|
}
|
|
} else {
|
|
//no slot
|
|
// condition #3
|
|
if ( wasAck ) {
|
|
if ( g_conf.m_logDebugUdp ) {
|
|
char ipbuf1[16];
|
|
char ipbuf2[16];
|
|
log(LOG_DEBUG,
|
|
"udp: Read stray ACK, tid=%" PRId32", "
|
|
"ip2=%s "
|
|
"port=%" PRId32" "
|
|
"dgram=%" PRId32" "
|
|
"dst=%s:%hu "
|
|
"k.n1=%" PRIu32" n0=%" PRIu64".",
|
|
transId,
|
|
iptoa(ip2,ipbuf1),
|
|
(int32_t)ntohs(from.sin_port) ,
|
|
dgramNum,
|
|
iptoa(ip,ipbuf2),
|
|
(uint16_t)ntohs(from.sin_port),
|
|
key.n1,key.n0);
|
|
}
|
|
goto discard;
|
|
}
|
|
// condition #2
|
|
if ( m_proto->isReply ( readBuffer, readSize ) ) {
|
|
// if we don't use ACK then do nothing!
|
|
if ( ! m_proto->useAcks () ) {
|
|
log(LOG_REMIND,"dns: Dns reply too late "
|
|
"or reply from a resend.");
|
|
goto discard;
|
|
}
|
|
// . if they didn't get our ACK they might resend to us
|
|
// even though we think the transaction is completed
|
|
// . either our send is slow or their read buf is slow
|
|
// . to avoid these msg crank up the resend time
|
|
// . Multicast likes to send you AND your groupees
|
|
// the same request, take the first reply it gets
|
|
// and dump the rest, this is probably why we get
|
|
// this often
|
|
if ( g_conf.m_logDebugUdp ) {
|
|
char ipbuf[16];
|
|
log(LOG_DEBUG,
|
|
"udp: got dgram we acked, but we closed, "
|
|
"tid=%" PRId32" dgram=%" PRId32" dgramSize=%i "
|
|
"fromIp=%s fromPort=%i msgType=0x%02x",
|
|
transId, dgramNum , readSize,
|
|
iptoa((int32_t)from.sin_addr.s_addr,ipbuf) ,
|
|
ntohs(from.sin_port), (int)msgType);
|
|
}
|
|
cancelTrans:
|
|
// temporary slot for sending back bogus ack
|
|
UdpSlot tmp;
|
|
// . send them another ACK so they shut up
|
|
// . they might not have gotten due to network error
|
|
// . this will clear "tmp" with memset
|
|
tmp.connect (m_proto,&from,NULL,-1,transId, 10000/*timeout*/,
|
|
now , 0 ); // m_niceness );
|
|
// . if this blocks, that sucks, we'll probably get
|
|
// another untethered read... oh well...
|
|
// . ack from 0 to infinite to prevent more from coming
|
|
tmp.sendCancelAck(m_sock, now, dgramNum);
|
|
//return 1;
|
|
goto discard;
|
|
}
|
|
// . if we're shutting down do not accept new connections
|
|
// . just ignore
|
|
if ( m_isShuttingDown ) goto discard; // return 1;
|
|
// shortcut
|
|
bool isProxy = g_proxy.isProxy();
|
|
// do not read any incoming request if half the slots are
|
|
// being used for incoming requests right now. we don't want
|
|
// to lose all of our memory. MDW
|
|
bool getSlot = true;
|
|
if ( msgType == msg_type_7 && m_msg07sInWaiting >= 100 )
|
|
getSlot = false;
|
|
|
|
// msg25 spawns an indexdb request lookup and unless we limit
|
|
// the msg25 requests we can jam ourslves if all the indexdb
|
|
// lookups hit ourselves... we won't have enough free slots
|
|
// to answer the msg0 indexdb lookups!
|
|
if ( msgType == msg_type_25 && m_msg25sInWaiting >= 70 )
|
|
getSlot = false;
|
|
|
|
// . i've seen us freeze up from this too
|
|
// . but only drop spider's msg39s
|
|
if ( msgType == msg_type_39 && m_msg39sInWaiting >= 10 && niceness )
|
|
getSlot = false;
|
|
// try to prevent another lockup condition of msg20 spawing
|
|
// a msg22 request to self but failing...
|
|
if ( msgType == msg_type_20 && m_msg20sInWaiting >= 50 && niceness )
|
|
getSlot = false;
|
|
|
|
// . msg13 is clogging thiings up when we synchost a host
|
|
// and it comes back up
|
|
// . allow spider compression proxy to have a bunch
|
|
// . MDW: do we need this one anymore? relax it a little.
|
|
if ( msgType == msg_type_13 && m_numUsedSlotsIncoming>400 &&
|
|
m_numUsedSlots>800 && !isProxy)
|
|
getSlot = false;
|
|
|
|
// . avoid slamming thread queues with sectiondb disk reads
|
|
// . mdw 1/22/2014 take this out now too, we got ssds
|
|
// let's see if taking this out fixes the jam described
|
|
// below
|
|
// . mdw 1/31/2014 got stuck doing linktext 0x20 lookups
|
|
// leading to tagdb lookups with not enough slots left!!!
|
|
// so decrease 0x20
|
|
// and/or increase 0x00. ill boost from 500 to 1500
|
|
// although i
|
|
// think we should limit the msg20 niceness 1 requests really
|
|
// when slot usage is high... ok, i changed Msg25.cpp to only
|
|
// allow 1 msg20 out if 300+ sockets are in use.
|
|
// . these kinds of techniques ultimately just end up
|
|
// in loop, the proper way is to throttle back the # of
|
|
// outstanding tagdb lookups or whatever at the source
|
|
// otherwise we jam up
|
|
// . tagdb lookups were being dropped because of this being
|
|
// 500 so i raised to 900. a lot of them were from
|
|
// 'get outlink tag recs' or 'get link info' (0x20)
|
|
if ( msgType == msg_type_0 && m_numUsedSlots > 1500 && niceness ) {
|
|
// allow a ton of those tagdb lookups to come in
|
|
char rdbId = 0;
|
|
if ( readSize > MSG0RDBIDOFFSET )
|
|
rdbId = readBuffer[MSG0RDBIDOFFSET];
|
|
if ( rdbId != RDB_TAGDB )
|
|
getSlot = false;
|
|
}
|
|
|
|
// lower priorty slots are dropped first
|
|
if ( m_numUsedSlots >= 1300 && niceness > 0 && ! isProxy &&
|
|
// we dealt with special tagdb msg00's above so
|
|
// do not deal with them here
|
|
msgType != msg_type_0 )
|
|
getSlot = false;
|
|
|
|
// . reserve 300 slots for outgoing query-related requests
|
|
// . this was 1700, but the max udp slots is set to 3500
|
|
// in main.cpp, so let's up this to 2300. i don't want to
|
|
// drop stuff like Msg39 because it takes 8 seconds before
|
|
// it is re-routed in Multicast.cpp! now that we show what
|
|
// msgtypes are being dropped exactly in PageStats.cpp we
|
|
// will know if this is hurting us.
|
|
if ( m_numUsedSlots >= 2300 && ! isProxy ) getSlot = false;
|
|
// getting a titlerec does not send out a 2nd request. i really
|
|
// hate those title rec timeout msgs.
|
|
if ( msgType == msg_type_22 && niceness == 0 ) getSlot = true;
|
|
|
|
if ( getSlot )
|
|
// get a new UdpSlot
|
|
slot = getEmptyUdpSlot_unlocked(key, true);
|
|
// return -1 on failure
|
|
if ( ! slot ) {
|
|
// return -1
|
|
status = false;
|
|
// discard it!
|
|
// only log this message up to once per second to avoid
|
|
// flooding the log
|
|
static int64_t s_lastTime = 0LL;
|
|
g_dropped++;
|
|
// count each msgType we drop
|
|
if ( niceness == 0 ) g_stats.m_dropped[msgType][0]++;
|
|
else g_stats.m_dropped[msgType][1]++;
|
|
if ( now - s_lastTime >= 1000 ) {
|
|
s_lastTime = now;
|
|
log(LOG_INFO, "udp: No udp slots to handle datagram. (msgType=0x%x niceness=%" PRId32") "
|
|
"Discarding. It should be resent. Dropped dgrams=%" PRId32".", (int)msgType,niceness,g_dropped);
|
|
}
|
|
goto discard;
|
|
}
|
|
// default timeout, sender has 60 seconds to send request!
|
|
int64_t timeout = 60000;
|
|
// connect this slot (callback should be NULL)
|
|
slot->connect ( m_proto ,
|
|
&from , // ip/port
|
|
// we now put in the host, which may be NULL
|
|
// if not in cluster, but we need this for
|
|
// keeping track of dgrams sent/read to/from
|
|
// this host (Host::m_dgramsTo/From)
|
|
h , // NULL , // hostPtr
|
|
-1 , // hostId
|
|
transId ,
|
|
timeout , // timeout in 60 secs
|
|
now ,
|
|
// . slot->m_niceness should be set to this now
|
|
// . originally m_niceness is that of this udp
|
|
// server, and we were using it as the slot's
|
|
// but it should be correct now...
|
|
niceness ); // 0 // m_niceness );
|
|
// if we connected to a request slot, count it
|
|
m_requestsInWaiting++;
|
|
// special count
|
|
if ( msgType == msg_type_7 ) m_msg07sInWaiting++;
|
|
if ( msgType == msg_type_25 ) m_msg25sInWaiting++;
|
|
if ( msgType == msg_type_39 ) m_msg39sInWaiting++;
|
|
if ( msgType == msg_type_20 ) m_msg20sInWaiting++;
|
|
if ( msgType == msg_type_c ) m_msg0csInWaiting++;
|
|
if ( msgType == msg_type_0 ) m_msg0sInWaiting++;
|
|
}
|
|
// let caller know the slot associated with reading this dgram
|
|
*slotPtr = slot;
|
|
|
|
// . HACK: kinda.
|
|
// . change the ip we reply on to wherever the sender came from!
|
|
// . because we know that that eth port is mostly likely the best
|
|
// . that way if he resends a request on a different ip because we
|
|
// did not ack him because the eth port went down, we need to send
|
|
// our ack on his changed src ip. really only the sendAck() routine
|
|
// uses this ip, because the send datagram thing will send on the
|
|
// preferred eth port, be it eth0 or eth1, based on if it got a
|
|
// timely ACK or not.
|
|
// . pings should never switch ips though... this was causing
|
|
// Host::m_inProgress1 to be unset instead of m_inProgress2 and
|
|
// we were never able to regain a dead host on eth1 in PingServer.cpp
|
|
if ( ip != slot->getIp() ) {
|
|
if ( g_conf.m_logDebugUdp ) {
|
|
char ipbuf[16];
|
|
log(LOG_DEBUG,"udp: changing ip to %s for acking",
|
|
iptoa(ip,ipbuf));
|
|
}
|
|
slot->m_ip = ip;
|
|
}
|
|
|
|
//if ( ! slot->m_host ) { g_process.shutdownAbort(true);}
|
|
status = slot->readDatagramOrAck(readBuffer,readSize,now);
|
|
|
|
// we we could not allocate a read buffer to hold the request/reply
|
|
// just send a cancel ack so the send will call its callback with
|
|
// g_errno set
|
|
// MDW: it won't make it into the m_callbackListHead linked list with
|
|
// this logic.... maybe it just times out or resends later...
|
|
if ( ! status && g_errno == ENOMEM ) goto cancelTrans;
|
|
|
|
// if it is now a complete REPLY, callback will need to be called
|
|
// so insert into the callback linked list, m_callbackListHead.
|
|
// we have to put slots with NULL callbacks in here since they
|
|
// are incoming requests to handle.
|
|
if ((slot->isDoneReading() || slot->getErrno())) {
|
|
// prepare to call the callback by adding it to this
|
|
// special linked list
|
|
addToCallbackLinkedList_unlocked(slot);
|
|
}
|
|
|
|
discard:
|
|
// . update stats, just put them all in g_udpServer
|
|
// . do not count acks
|
|
// . do not count discarded dgrams here
|
|
if ( ! wasAck && readSize > 0 ) {
|
|
// in case shotgun ip equals ip, check this first
|
|
if ( h && h->m_ip == ip ) {
|
|
g_udpServer.m_eth0PacketsIn += 1;
|
|
g_udpServer.m_eth0BytesIn += readSize;
|
|
}
|
|
// it can come from outside the cluster so check this
|
|
else if ( h && h->m_ipShotgun == ip ) {
|
|
g_udpServer.m_eth1PacketsIn += 1;
|
|
g_udpServer.m_eth1BytesIn += readSize;
|
|
}
|
|
// count packets to/from hosts outside separately usually
|
|
// for importing link information. this can be from the dns
|
|
// quite often!!
|
|
else {
|
|
//log("ip=%s",iptoa(ip));
|
|
g_udpServer.m_outsiderPacketsIn += 1;
|
|
g_udpServer.m_outsiderBytesIn += readSize;
|
|
}
|
|
}
|
|
// return -1 on error
|
|
if ( ! status ) return -1;
|
|
// . return 1 cuz we did read the dgram ok
|
|
// . if we read a dgram, ACK will be sent in readPoll() after we return
|
|
return 1;
|
|
}
|
|
|
|
// . try calling makeCallback() on all slots
|
|
// . return true if we called one
|
|
// . this is basically double entrant!!! CAUTION!!!
|
|
// . if niceness is 0 we may be in a quickpoll or may not be. but we
|
|
// will not enter a quickpoll in that case.
|
|
// . however, if we are in a quickpoll and call makeCallbacks then
|
|
// it will use niceness 0 exclusively, but the function that was niceness
|
|
// 1 and called quickpoll may itself have been indirectly in
|
|
// makeCallbacks(1), so we have to make sure that if we change the
|
|
// linked list here, we make sure the parent adjusts.
|
|
// . the problem is when we call this with niceness 1 and we convert
|
|
// a niceness 1 callback to 0...
|
|
bool UdpServer::makeCallbacks(int32_t niceness) {
|
|
// if nothing to call, forget it
|
|
if (!m_callbackListHead) {
|
|
return false;
|
|
}
|
|
|
|
logDebug(g_conf.m_logDebugUdp, "udp: makeCallbacks: start. nice=%" PRId32, niceness);
|
|
|
|
// assume noone called
|
|
int32_t numCalled = 0;
|
|
if(niceness > 0) m_needBottom = false;
|
|
|
|
int64_t startTime = gettimeofdayInMilliseconds();
|
|
|
|
ScopedLock sl(m_mtx);
|
|
|
|
fullRestart:
|
|
|
|
// take care of certain handlers/callbacks before any others
|
|
// regardless of niceness levels because these handlers are so fast
|
|
for(int pass=0; pass<2; pass++) {
|
|
|
|
UdpSlot *nextSlot = NULL;
|
|
|
|
// only scan those slots that are ready
|
|
for ( UdpSlot *slot = m_callbackListHead ; slot ; slot = nextSlot ) {
|
|
// because makeCallback() can delete the slot, use this
|
|
nextSlot = slot->m_callbackListNext;
|
|
// call quick handlers in pass 0, they do not take any time
|
|
// and if they do not get called right away can cause this host
|
|
// to bottleneck many hosts
|
|
if ( pass == 0 ) {
|
|
// only call handlers in pass 0, not reply callbacks
|
|
if ( slot->hasCallback() ) continue;
|
|
// only call certain msg handlers...
|
|
if ( slot->getMsgType() != msg_type_0 ) // read RdbList
|
|
continue;
|
|
// only allow niceness 0 msg 0x00 requests here since
|
|
// we call a msg8a from msg20.cpp summary generation
|
|
// which uses msg0 to read tagdb list from disk
|
|
if ( slot->getMsgType() == msg_type_0 && slot->getNiceness() ) {
|
|
// to keep udp slots from clogging up with
|
|
// tagdb reads allow even niceness 1 tagdb
|
|
// reads through. cache rate should be super
|
|
// higher and reads short.
|
|
char rdbId = 0;
|
|
if ( slot->m_readBuf &&
|
|
slot->m_readBufSize > MSG0RDBIDOFFSET )
|
|
rdbId = slot->m_readBuf[MSG0RDBIDOFFSET];
|
|
if ( rdbId != RDB_TAGDB )
|
|
continue;
|
|
}
|
|
}
|
|
|
|
// skip if not level we want
|
|
if ( niceness <= 0 && slot->getNiceness() > 0 && pass>0) continue;
|
|
// set g_errno before calling
|
|
g_errno = slot->getErrno();
|
|
// if we got an error from him, set his stats
|
|
Host *h = NULL;
|
|
if ( g_errno && slot->getHostId() >= 0 )
|
|
h = g_hostdb.getHost ( slot->getHostId() );
|
|
if ( h ) {
|
|
h->m_errorReplies++;
|
|
if ( g_errno == ETRYAGAIN )
|
|
h->m_etryagains++;
|
|
}
|
|
|
|
// try to call the callback for this slot
|
|
// time it now
|
|
int64_t start2 = 0;
|
|
bool logIt = false;
|
|
if ( slot->getNiceness() == 0 ) logIt = true;
|
|
if ( logIt ) start2 = gettimeofdayInMilliseconds();
|
|
|
|
logDebug(g_conf.m_logDebugUdp,"udp: calling callback/handler for slot=%p pass=%" PRId32" nice=%" PRId32,
|
|
slot, (int32_t)pass,(int32_t)slot->getNiceness());
|
|
|
|
// . crap, this can alter the linked list we are scanning
|
|
// if it deletes the slot! yes, but now we use "nextSlot"
|
|
// . return false on error and sets g_errno, true otherwise
|
|
// . return true if we called one
|
|
// . skip to next slot if did not call callback/handler
|
|
pthread_mutex_unlock(&m_mtx.mtx);
|
|
if (!makeCallback(slot)) {
|
|
pthread_mutex_lock(&m_mtx.mtx);
|
|
continue;
|
|
}
|
|
pthread_mutex_lock(&m_mtx.mtx);
|
|
|
|
// remove it from the callback list to avoid re-call
|
|
removeFromCallbackLinkedList_unlocked(slot);
|
|
|
|
int64_t took = logIt ? (gettimeofdayInMilliseconds()-start2) : 0;
|
|
if ( took > 1000 || (slot->getNiceness()==0 && took>100))
|
|
logf(LOG_DEBUG,"udp: took %" PRId64" ms to call "
|
|
"callback/handler for "
|
|
"msgtype=0x%" PRIx32" "
|
|
"nice=%" PRId32" "
|
|
"callback=%p",
|
|
took,
|
|
(int32_t)slot->getMsgType(),
|
|
(int32_t)slot->getNiceness(),
|
|
slot->m_callback);
|
|
numCalled++;
|
|
|
|
// log how long callback took
|
|
if(niceness > 0 &&
|
|
(gettimeofdayInMilliseconds() - startTime) > 5 ) {
|
|
//bail if we're taking too long and we're a
|
|
//low niceness request. we can always come
|
|
//back.
|
|
//TODO: call sigqueue if we need to
|
|
m_needBottom = true;
|
|
// now we just finish out the list with a
|
|
// lower niceness
|
|
//niceness = 0;
|
|
return numCalled;
|
|
}
|
|
|
|
// CRAP, what happens is we are not in a quickpoll,
|
|
// we call some handler/callback, we enter a quickpoll,
|
|
// we convert him, send him, delete him, then return
|
|
// back to this function and the linked list is
|
|
// altered because we double entered this function
|
|
// from within a quickpoll. so if we are not in a
|
|
// quickpoll, we have to reset the linked list scan after
|
|
// calling makeCallback(slot) below.
|
|
goto fullRestart;
|
|
}
|
|
// clear
|
|
g_errno = 0;
|
|
}
|
|
|
|
return numCalled;
|
|
}
|
|
|
|
|
|
// . return false on error and sets g_errno, true otherwise
|
|
// . g_errno may already be set when this is called... that's the reason why
|
|
// it was called
|
|
// . this is also called by readTimeoutPoll()
|
|
// . IMPORTANT: call this every time after you read or send a dgram/ACK
|
|
// . or when g_errno gets set
|
|
// . return true if we called one
|
|
bool UdpServer::makeCallback(UdpSlot *slot) {
|
|
// get msgType
|
|
msg_type_t msgType = slot->getMsgType();
|
|
// . if we are the low priority server we do not make callbacks
|
|
// until there are no ongoing transactions in the high priority
|
|
// server
|
|
// . BUT, we are always allowed to call Msg0's m_callback2 so we can
|
|
// give back the bandwidth token (via Msg21) HACK!
|
|
// . undo this for now
|
|
|
|
// for timing callbacks and handlers
|
|
int64_t start = 0;
|
|
int64_t now ;
|
|
int32_t delta , n , bucket;
|
|
|
|
// debug timing
|
|
if ( g_conf.m_logDebugUdp )
|
|
start = gettimeofdayInMilliseconds();
|
|
|
|
// callback is non-NULL if we initiated the transaction
|
|
if ( slot->hasCallback() ) {
|
|
if (!slot->isOutgoing()) {
|
|
gbshutdownLogicError();
|
|
}
|
|
|
|
// assume the slot's error when making callback
|
|
// like EUDPTIMEDOUT
|
|
if ( ! g_errno ) {
|
|
g_errno = slot->getErrno();
|
|
}
|
|
|
|
// . if transaction has not fully completed, bail
|
|
// . unless there was an error
|
|
// . g_errno could be ECANCELLED
|
|
if ( ! g_errno && ! slot->isTransactionComplete()) {
|
|
//log("udp: why calling callback when not ready???");
|
|
return false;
|
|
}
|
|
|
|
// debug msg
|
|
if ( g_conf.m_logDebugUdp ) {
|
|
int64_t now = gettimeofdayInMilliseconds();
|
|
int64_t took = now - slot->getStartTime();
|
|
//if ( took > 10 )
|
|
int32_t Mbps = 0;
|
|
if ( took > 0 ) Mbps = slot->m_readBufSize / took;
|
|
Mbps = (Mbps * 1000) / (1024*1024);
|
|
log(LOG_DEBUG,"udp: Got reply tid=%" PRId32" "
|
|
"msgType=0x%02x "
|
|
"g_errno=%s "
|
|
"niceness=%" PRId32" "
|
|
"callback=%p "
|
|
"took %" PRId64" ms (%" PRId32" Mbps).",
|
|
slot->getTransId(), (int)msgType,
|
|
mstrerror(g_errno),
|
|
slot->getNiceness(),
|
|
slot->m_callback ,
|
|
took , Mbps );
|
|
start = now;
|
|
}
|
|
// mark it in the stats for PageStats.cpp
|
|
if ( g_errno == EUDPTIMEDOUT )
|
|
g_stats.m_timeouts[msgType][slot->getNiceness()]++;
|
|
else if ( g_errno == ENOMEM )
|
|
g_stats.m_nomem[msgType][slot->getNiceness()]++;
|
|
else if ( g_errno )
|
|
g_stats.m_errors[msgType][slot->getNiceness()]++;
|
|
|
|
if ( g_conf.m_maxCallbackDelay >= 0 ) {
|
|
start = gettimeofdayInMilliseconds();
|
|
}
|
|
|
|
// sanity check for double callbacks
|
|
if ( slot->hasCalledCallback() ) {
|
|
g_process.shutdownAbort(true);
|
|
}
|
|
|
|
slot->m_calledCallback = true;
|
|
|
|
// now we got a reply or an g_errno so call the callback
|
|
|
|
if ( g_conf.m_logDebugLoop )
|
|
log(LOG_DEBUG,"loop: enter callback for 0x%" PRIx32" "
|
|
"nice=%" PRId32,(int32_t)slot->getMsgType(),slot->getNiceness());
|
|
|
|
// sanity check. has this slot been excised from linked list?
|
|
if (slot->m_activeListPrev && slot->m_activeListPrev->m_activeListNext != slot) {
|
|
g_process.shutdownAbort(true);
|
|
}
|
|
|
|
slot->m_callback(slot->m_state, slot);
|
|
|
|
if ( g_conf.m_logDebugLoop )
|
|
log(LOG_DEBUG,"loop: exit callback for 0x%" PRIx32" "
|
|
"nice=%" PRId32,(int32_t)slot->getMsgType(),slot->getNiceness());
|
|
|
|
if ( g_conf.m_maxCallbackDelay >= 0 ) {
|
|
int64_t elapsed = gettimeofdayInMilliseconds() - start;
|
|
|
|
if ( elapsed >= g_conf.m_maxCallbackDelay ) {
|
|
log(LOG_WARN, "UdpServer Took %" PRId64" ms to call callback for msgType=0x%02x niceness=%" PRId32,
|
|
elapsed, (int)slot->getMsgType(), (int32_t)slot->getNiceness());
|
|
}
|
|
}
|
|
|
|
// time it
|
|
logDebug(g_conf.m_logDebugUdp, "udp: Reply callback tid=%d slot=%p took %" PRId64" ms.",
|
|
slot->getTransId(), slot, gettimeofdayInMilliseconds() - start );
|
|
|
|
// clear any g_errno that may have been set
|
|
g_errno = 0;
|
|
|
|
// . now lets destroy the slot, bufs and all
|
|
// . if the caller wanted to hang on to request or reply then
|
|
// it should NULLify slot->m_sendBuf and/or slot->m_readBuf
|
|
destroySlot ( slot );
|
|
return true;
|
|
}
|
|
|
|
// we can only reach here if it's incoming
|
|
if (!slot->isIncoming()) {
|
|
gbshutdownLogicError();
|
|
}
|
|
|
|
// don't repeat call the handler if we already called it
|
|
if ( slot->hasCalledHandler() ) {
|
|
// . if transaction has not fully completed, keep sending
|
|
// . unless there was an error
|
|
if ( ! g_errno &&
|
|
! slot->isTransactionComplete() &&
|
|
! slot->getErrno() ) {
|
|
if ( g_conf.m_logDebugUdp )
|
|
log("udp: why calling handler "
|
|
"when not ready?");
|
|
return false;
|
|
}
|
|
|
|
// . now we sent the reply so try calling callback2
|
|
// . this is usually NULL, but so I could make pretty graphs
|
|
// of transmission time it won't be
|
|
// . if callback2 is hot it will be called here, possibly,
|
|
// more than once, but we also call m_callback2 later, too,
|
|
// since we cannot call destroySlot() in a hot sig handler
|
|
if ( slot->m_callback2 ) {
|
|
if ( g_conf.m_logDebugLoop )
|
|
log(LOG_DEBUG,"loop: enter callback2 for "
|
|
"0x%" PRIx32,(int32_t)slot->getMsgType());
|
|
|
|
if ( g_conf.m_maxCallbackDelay >= 0 ) {
|
|
start = gettimeofdayInMilliseconds();
|
|
}
|
|
|
|
// call it
|
|
slot->m_callback2 ( slot->m_state , slot );
|
|
|
|
if ( g_conf.m_logDebugLoop )
|
|
log(LOG_DEBUG,"loop: exit callback2 for 0x%" PRIx32,
|
|
(int32_t)slot->getMsgType());
|
|
|
|
|
|
if ( g_conf.m_maxCallbackDelay >= 0 ) {
|
|
int64_t elapsed = gettimeofdayInMilliseconds() - start;
|
|
|
|
if ( elapsed >= g_conf.m_maxCallbackDelay ) {
|
|
log(LOG_WARN, "UdpServer Took %" PRId64" ms to call callback2 for msgType=0x%02x niceness=%" PRId32,
|
|
elapsed, (int)slot->getMsgType(), (int32_t)slot->getNiceness());
|
|
}
|
|
}
|
|
|
|
|
|
// debug msg
|
|
if ( g_conf.m_logDebugUdp ) {
|
|
int64_t now = gettimeofdayInMilliseconds();
|
|
int64_t took = now - start ;
|
|
//if ( took > 10 )
|
|
log(LOG_DEBUG,
|
|
"udp: Callback2 tid=%" PRId32" "
|
|
"msgType=0x%02x "
|
|
"g_errno=%s callback2=%p"
|
|
" took %" PRId64" ms.",
|
|
slot->getTransId(), (int)msgType,
|
|
mstrerror(g_errno),
|
|
slot->m_callback2,
|
|
took );
|
|
}
|
|
// clear any g_errno that may have been set
|
|
g_errno = 0;
|
|
}
|
|
// nuke the slot, we gave them a reply...
|
|
destroySlot ( slot );
|
|
// this kind of callback doesn't count
|
|
return false;
|
|
}
|
|
|
|
// . if we're not done reading the request, don't call the handler
|
|
// . we now destroy it if the request timed out
|
|
if ( ! slot->isDoneReading () ) {
|
|
// . if g_errno not set, keep reading the new request
|
|
// . otherwise it's usually EUDPTIMEOUT, set by readTimeoutPoll
|
|
// . multicast will abandon sending a request if it doesn't
|
|
// get a response in X seconds, then it may move on to
|
|
// using another transaction id to resend the request
|
|
if ( ! g_errno ) return false;
|
|
// log a msg
|
|
log(LOG_LOGIC,
|
|
"udp: makeCallback: Requester stopped sending: %s.",
|
|
mstrerror(g_errno));
|
|
// . nuke the half-ass request slot
|
|
// . now if they continue later to send this request we
|
|
// will auto-ACK the dgrams, but we won't send a reply and
|
|
// the requester will time out waiting for the reply
|
|
destroySlot ( slot );
|
|
return false;
|
|
}
|
|
|
|
// . otherwise it was an incoming request we haven't answered yet
|
|
// . call the registered handler to handle it
|
|
// . bail if no handler
|
|
if ( ! m_handlers [ msgType ] ) {
|
|
log(LOG_LOGIC,
|
|
"udp: makeCallback: Recvd unsupported msg type 0x%02x."
|
|
" Did you forget to call registerHandler() for your "
|
|
"message class from main.cpp?", (char)msgType);
|
|
destroySlot ( slot );
|
|
return false;
|
|
}
|
|
|
|
logDebug(g_conf.m_logDebugUdp, "udp: Calling handler for tid=%" PRId32" slot=%p msgType=0x%02x.",
|
|
slot->getTransId(), slot, (int)msgType);
|
|
|
|
// record some statistics on how long this was waiting to be called
|
|
now = gettimeofdayInMilliseconds();
|
|
delta = now - slot->m_queuedTime;
|
|
// sanity check
|
|
if ( slot->m_queuedTime == -1 ) { g_process.shutdownAbort(true); }
|
|
n = slot->getNiceness();
|
|
if ( n < 0 ) n = 0;
|
|
if ( n > 1 ) n = 1;
|
|
// add to average
|
|
g_stats.m_msgTotalOfQueuedTimes [msgType][n] += delta;
|
|
g_stats.m_msgTotalQueued [msgType][n]++;
|
|
// bucket number is log base 2 of the delta
|
|
if ( delta > 64000 ) delta = 64000;
|
|
bucket = getHighestLitBit ( (uint16_t)delta );
|
|
// MAX_BUCKETS is probably 16 and #define'd in Stats.h
|
|
if ( bucket >= MAX_BUCKETS ) bucket = MAX_BUCKETS-1;
|
|
g_stats.m_msgTotalQueuedByTime [msgType][n][bucket]++;
|
|
|
|
|
|
// time it
|
|
start = now; // gettimeofdayInMilliseconds();
|
|
|
|
// use this for recording how long it takes to generate the reply
|
|
slot->m_queuedTime = now;
|
|
|
|
// log it now
|
|
if ( g_conf.m_logDebugLoop )
|
|
log(LOG_DEBUG,"loop: enter handler for 0x%" PRIx32" nice=%" PRId32,
|
|
(int32_t)slot->getMsgType(),(int32_t)slot->getNiceness());
|
|
|
|
bool oom = g_mem.getUsedMemPercentage() >= 99.0;
|
|
|
|
// if we are out of mem basically, do not waste time fucking around
|
|
if ( slot->getNiceness() == 0 && oom ) {
|
|
// log it
|
|
static int32_t lcount = 0;
|
|
if ( lcount == 0 )
|
|
log(LOG_DEBUG,"loop: sending back enomem for ""msg 0x%02x", (int)slot->getMsgType());
|
|
if ( ++lcount == 20 ) lcount = 0;
|
|
|
|
g_consecutiveOOMErrors++;
|
|
|
|
log(LOG_ERROR,"%s:%s:%d: call sendErrorReply. UsedMem=%" PRId64", MaxMem=%" PRId64, __FILE__, __func__, __LINE__, g_mem.getUsedMem(), g_mem.getMaxMem() );
|
|
sendErrorReply ( slot , ENOMEM );
|
|
|
|
if( g_consecutiveOOMErrors == 200 )
|
|
{
|
|
log(LOG_ERROR,"%s:%s:%d: 200 replies could not be sent due to of Out of Memory. SHUTTING DOWN.", __FILE__, __func__, __LINE__);
|
|
g_process.shutdownAbort(false);
|
|
}
|
|
}
|
|
else {
|
|
if( !oom )
|
|
{
|
|
g_consecutiveOOMErrors = 0;
|
|
}
|
|
|
|
// sanity
|
|
if ( slot->hasCalledHandler() ) {
|
|
g_process.shutdownAbort(true);
|
|
}
|
|
|
|
// set this here now so it doesn't get its niceness converted
|
|
// then it re-enters the same handler here but in a quickpoll!
|
|
slot->m_calledHandler = true;
|
|
|
|
// sanity so msg0.cpp hack works
|
|
if ( slot->getNiceness() == 99 ) { g_process.shutdownAbort(true); }
|
|
// . this is the niceness of the server, not the slot
|
|
// . NO, now it is the slot's niceness. that makes sense.
|
|
m_handlers [ slot->getMsgType() ] ( slot , slot->getNiceness() ) ;
|
|
}
|
|
|
|
if ( g_conf.m_logDebugLoop )
|
|
log(LOG_DEBUG,"loop: exit handler for 0x%" PRIx32" nice=%" PRId32,
|
|
(int32_t)slot->getMsgType(),(int32_t)slot->getNiceness());
|
|
|
|
// we called the handler, don't call it again
|
|
slot->m_calledHandler = true;
|
|
|
|
// g_errno was set from m_errno before calling the handler, but to
|
|
// make sure the slot doesn't get destroyed now, reset this to 0. see
|
|
// comment about Msg20 above.
|
|
slot->m_errno = 0;
|
|
|
|
if ( g_conf.m_maxCallbackDelay >= 0 ) {
|
|
int64_t elapsed = gettimeofdayInMilliseconds() - start;
|
|
|
|
if ( elapsed >= g_conf.m_maxCallbackDelay ) {
|
|
log(LOG_WARN, "UdpServer Took %" PRId64" ms to call "
|
|
"HANDLER for msgType=0x%02x niceness=%" PRId32,
|
|
elapsed, (int)slot->getMsgType(), (int32_t)slot->getNiceness());
|
|
}
|
|
}
|
|
|
|
// this is kinda obsolete now that we have the stats above
|
|
if ( g_conf.m_logDebugNet ) {
|
|
int64_t took = gettimeofdayInMilliseconds() - start;
|
|
log(LOG_DEBUG,"net: Handler tid=%" PRId32" slot=%p "
|
|
"msgType=0x%02x msgSize=%" PRId32" "
|
|
"g_errno=%s callback=%p "
|
|
"niceness=%" PRId32" "
|
|
"took %" PRId64" ms.",
|
|
(int32_t)slot->getTransId() , slot,
|
|
(int)msgType, (int32_t)slot->m_readBufSize , mstrerror(g_errno),
|
|
slot->m_callback,
|
|
(int32_t)slot->getNiceness(),
|
|
took );
|
|
}
|
|
|
|
// clear any g_errno that may have been set
|
|
g_errno = 0;
|
|
// calling a handler counts
|
|
return true;
|
|
}
|
|
|
|
// this wrapper is called every 15 ms by the Loop class
|
|
void UdpServer::timePollWrapper(int fd, void *state) {
|
|
UdpServer *that = static_cast<UdpServer*>(state);
|
|
that->timePoll();
|
|
}
|
|
|
|
void UdpServer::timePoll ( ) {
|
|
//no active slots -> nothing to do
|
|
{
|
|
ScopedLock sl(m_mtx);
|
|
if ( ! m_activeListHead ) return;
|
|
}
|
|
|
|
// get time now
|
|
int64_t now = gettimeofdayInMilliseconds();
|
|
// before timing everyone out or starting resends, just to make
|
|
// sure we read everything. we have have just been blocking on a interrupt
|
|
// handler or callback or sequence of those things and have stuff
|
|
// waiting to be read.
|
|
process(now);
|
|
// get again if changed
|
|
now = gettimeofdayInMilliseconds();
|
|
// loop:
|
|
// do read/send/callbacks
|
|
// process(now);
|
|
// then do the timeout-ing
|
|
if ( readTimeoutPoll ( now ) ) {
|
|
// if we timed something out or reset it then call the
|
|
// callbacks to do sending and loop back up
|
|
makeCallbacks(MAX_NICENESS);
|
|
}
|
|
}
|
|
|
|
|
|
// . this is called once per second
|
|
// . return false and sets g_errno on error
|
|
// . calls the callback of REPLY-reception slots that have timed out
|
|
// . just nuke the REQUEST-reception slots that have timed out
|
|
// . returns true if we timed one out OR reset one for resending
|
|
bool UdpServer::readTimeoutPoll ( int64_t now ) {
|
|
// did we do something? assume not.
|
|
bool something = false;
|
|
// loop over occupied slots
|
|
ScopedLock sl(m_mtx);
|
|
for ( UdpSlot *slot = m_activeListHead ; slot ; slot = slot->m_activeListNext ) {
|
|
// clear g_errno
|
|
g_errno = 0;
|
|
// debug msg
|
|
if ( g_conf.m_logDebugUdp ) {
|
|
char ipbuf[16];
|
|
log(LOG_DEBUG,
|
|
"udp: resend TRY tid=%" PRId32" "
|
|
"dst=%s:%hu "
|
|
"doneReading=%" PRId32" "
|
|
"dgramsToSend=%" PRId32" "
|
|
"resendTime=%" PRId32" "
|
|
"lastReadTime=%" PRIu64" "
|
|
"delta=%" PRIu64" "
|
|
"lastSendTime=%" PRIu64" "
|
|
"delta=%" PRIu64" "
|
|
"timeout=%" PRIu64" "
|
|
"sentBitsOn=%" PRId32" "
|
|
"readAckBitsOn=%" PRId32" ",
|
|
slot->getTransId(),
|
|
iptoa(slot->getIp(),ipbuf),
|
|
(uint16_t) slot->getPort(),
|
|
(int32_t) slot->isDoneReading(),
|
|
slot->getDatagramsToSend(),
|
|
slot->getResendTime(),
|
|
(uint64_t) slot->getLastReadTime(),
|
|
(uint64_t) (now - slot->getLastReadTime()),
|
|
(uint64_t) slot->getLastSendTime(),
|
|
(uint64_t) (now - slot->getLastSendTime()),
|
|
(uint64_t) slot->getTimeout(),
|
|
slot->m_sentBitsOn,
|
|
slot->m_readAckBitsOn);
|
|
}
|
|
|
|
// if the reading is completed, but we haven't generated a
|
|
// reply yet, then continue because when reply is generated
|
|
// UdpServer::sendReply(slot) will be called and we don't
|
|
// want slot to be destroyed because it timed out...
|
|
if ( slot->isDoneReading() && slot->getDatagramsToSend() <= 0 ) {
|
|
continue;
|
|
}
|
|
|
|
// fix if clock changed!
|
|
if ( slot->getLastReadTime() > now ) {
|
|
slot->m_lastReadTime = now;
|
|
}
|
|
if ( slot->getLastSendTime() > now ) {
|
|
slot->m_lastSendTime = now;
|
|
}
|
|
|
|
// get time elapsed since last read
|
|
int64_t elapsed = now - slot->getLastReadTime();
|
|
// set all timeouts to 4 secs if we are shutting down
|
|
if ( m_isShuttingDown && slot->getTimeout() > 4000 ) {
|
|
slot->m_timeout = 4000;
|
|
}
|
|
|
|
// . deal w/ slots that are timed out
|
|
// . could be 1 of the 4 things:
|
|
// . 1. they take too long to send their reply
|
|
// . 2. they take too long to send their request
|
|
// . 3. they take too long to ACK our reply
|
|
// . 4. they take too long to ACK our request
|
|
// . only flag it if we haven't already...
|
|
if ( elapsed >= slot->getTimeout() && slot->getErrno() != EUDPTIMEDOUT ) {
|
|
logDebug(g_conf.m_logDebugUdp, "udp: timeout reached for tid=%" PRId32" slot=%p ", slot->m_transId, slot);
|
|
|
|
// . set slot's m_errno field
|
|
// . makeCallbacks() should call its callback
|
|
slot->m_errno = EUDPTIMEDOUT;
|
|
// prepare to call the callback by adding it to this
|
|
// special linked list
|
|
addToCallbackLinkedList_unlocked(slot);
|
|
// let caller know we did something
|
|
something = true;
|
|
// keep going
|
|
continue;
|
|
}
|
|
|
|
// Time out the slot if the host has been detected as unresponsive.
|
|
if(slot->m_host && g_hostdb.isDead(slot->m_host)) {
|
|
logDebug(g_conf.m_logDebugUdp, "udp: host #%" PRId32" is dead for tid=%" PRId32" slot=%p ",
|
|
slot->m_host->m_hostId, slot->m_transId, slot);
|
|
|
|
slot->m_errno = EUDPTIMEDOUT;
|
|
addToCallbackLinkedList_unlocked(slot);
|
|
something = true;
|
|
continue;
|
|
}
|
|
|
|
// how long since last send?
|
|
int64_t delta = now - slot->getLastSendTime();
|
|
|
|
// if elapsed is negative, then someone changed the system
|
|
// clock on us, so it won't hurt to resend just to update
|
|
// otherwise, we could be waiting years to resend
|
|
if ( delta < 0 ) {
|
|
delta = slot->getResendTime();
|
|
}
|
|
|
|
// continue if we just sent something
|
|
if ( delta < slot->getResendTime() ) {
|
|
continue;
|
|
}
|
|
|
|
// if we don't have anything ready to send continue
|
|
if ( slot->getDatagramsToSend() <= 0 ) continue;
|
|
// if shutting down, rather than resending the reply, just
|
|
// force it as if it were sent. then makeCallbacks can
|
|
// destroy it.
|
|
if ( m_isShuttingDown ) {
|
|
// do not let this function free the buffers, they
|
|
// may not be allocated really. this may cause a memory
|
|
// leak.
|
|
slot->m_readBuf = NULL;
|
|
slot->m_sendBufAlloc = NULL;
|
|
// just nuke the slot... this will leave the memory
|
|
// leaked... (memleak, memory leak, memoryleak)
|
|
destroySlot_unlocked(slot);
|
|
continue;
|
|
}
|
|
// should we resend all dgrams?
|
|
bool resendAll = false;
|
|
// . HACK: if our request was sent but 30 seconds have passed
|
|
// and we got no reply, resend our whole request!
|
|
// . this fixes the stuck Msg10 fiasco because it uses
|
|
// timeouts of 1 year
|
|
// . this is mainly for msgs with infinite timeouts
|
|
// . so if recpipient crashes and comes back up later then
|
|
// we can resend him EVERYTHING!!
|
|
// . TODO: what if we get reply before we sent everything!?!?
|
|
// . if over 30 secs has passed, resend it ALL!!
|
|
// . this will reset the sent bits and read ack bits
|
|
if ( slot->m_sentBitsOn == slot->m_readAckBitsOn ) {
|
|
// give him 30 seconds to send a reply
|
|
if ( elapsed < 30000 ) continue;
|
|
// otherwise, resend the whole thing, he
|
|
resendAll = true;
|
|
}
|
|
|
|
//
|
|
// SHIT, sometimes a summary generator on a huge asian lang
|
|
// page takes over 1 second and we are unable to send acks
|
|
// for an incoming msg20 request etc, and this code triggers..
|
|
// maybe QUICKPOLL(0) should at least send/read the udp ports?
|
|
//
|
|
// FOR NOW though since hosts do not go down that much
|
|
// let's also require that it has been 5 secs or more...
|
|
//
|
|
|
|
int64_t timeout = 5000;
|
|
// spider time requests typically have timeouts of 1 year!
|
|
// so we end up waiting for the host to come back online
|
|
// before the spider can proceed.
|
|
if ( slot->getNiceness() ) {
|
|
timeout = slot->getTimeout();
|
|
}
|
|
|
|
// check it
|
|
if ( slot->m_maxResends >= 0 &&
|
|
// if maxResends it 0, do not do ANY resend! just err out.
|
|
slot->getResendCount() >= slot->m_maxResends &&
|
|
// did not get all acks
|
|
slot->m_sentBitsOn > slot->m_readAckBitsOn &&
|
|
// respect slot's timeout too!
|
|
elapsed > timeout &&
|
|
// only do this when sending a request
|
|
slot->hasCallback() ) {
|
|
// should this be ENOACK or something?
|
|
slot->m_errno = EUDPTIMEDOUT;
|
|
// prepare to call the callback by adding it to this
|
|
// special linked list
|
|
addToCallbackLinkedList_unlocked(slot);
|
|
// let caller know we did something
|
|
something = true;
|
|
// note it
|
|
log(LOG_INFO, "udp: Timing out slot (msgType=0x%" PRIx32") "
|
|
"after %" PRId32" resends. hostid=%" PRId32" "
|
|
"(elapsed=%" PRId64")" ,
|
|
(int32_t)slot->getMsgType(),
|
|
(int32_t)slot->getResendCount() ,
|
|
slot->getHostId(),elapsed);
|
|
// keep going
|
|
continue;
|
|
}
|
|
// . this should clear the sentBits of all unacked dgrams
|
|
// so they can be resent
|
|
// . this doubles m_resendTime and updates m_resendCount
|
|
slot->prepareForResend ( now , resendAll );
|
|
// . we resend our first unACKed dgram if some time has passed
|
|
// . send as much as we can on this slot
|
|
doSending_unlocked(slot, true /*allow resends?*/, now);
|
|
// return if we had an error sending, like EBADF we get
|
|
// when we've shut down the servers...
|
|
if ( g_errno == EBADF ) return something;
|
|
|
|
something = true;
|
|
}
|
|
// return true if we did something
|
|
return something;
|
|
}
|
|
|
|
void UdpServer::destroySlot(UdpSlot *slot) {
|
|
ScopedLock sl(m_mtx);
|
|
destroySlot_unlocked(slot);
|
|
}
|
|
|
|
// . IMPORTANT: only called for transactions that we initiated!!!
|
|
// so we know to set the key.n0 hi bit
|
|
// . may be called twice on same slot by Multicast::destroySlotsInProgress()
|
|
void UdpServer::destroySlot_unlocked( UdpSlot *slot ) {
|
|
if (!slot) {
|
|
gbshutdownLogicError();
|
|
}
|
|
|
|
logDebug(g_conf.m_logDebugUdp, "udp: destroy tid=%d slot=%p", slot->getTransId(), slot);
|
|
|
|
// if we're deleting a slot that was an incoming request then
|
|
// decrement m_requestsInWaiting (exclude pings)
|
|
if ( ! slot->hasCallback() ) {
|
|
// one less request in waiting
|
|
m_requestsInWaiting--;
|
|
// special count
|
|
if ( slot->getMsgType() == msg_type_7 ) m_msg07sInWaiting--;
|
|
if ( slot->getMsgType() == msg_type_25 ) m_msg25sInWaiting--;
|
|
if ( slot->getMsgType() == msg_type_39 ) m_msg39sInWaiting--;
|
|
if ( slot->getMsgType() == msg_type_20 ) m_msg20sInWaiting--;
|
|
if ( slot->getMsgType() == msg_type_c ) m_msg0csInWaiting--;
|
|
if ( slot->getMsgType() == msg_type_0 ) m_msg0sInWaiting--;
|
|
}
|
|
|
|
// save buf ptrs so we can free them
|
|
char *rbuf = slot->m_readBuf;
|
|
int32_t rbufSize = slot->m_readBufMaxSize;
|
|
char *sbuf = slot->m_sendBufAlloc;
|
|
int32_t sbufSize = slot->m_sendBufAllocSize;
|
|
// don't free our static buffer
|
|
// sometimes handlers will use our slots m_shortSendBuffer to store the reply
|
|
if ( sbuf == slot->m_shortSendBuffer ) sbuf = NULL;
|
|
// nothing allocated. used by Msg13.cpp g_fakeBuf
|
|
if ( sbufSize == 0 ) sbuf = NULL;
|
|
|
|
// NULLify here now just in case
|
|
slot->m_readBuf = NULL;
|
|
slot->m_sendBuf = NULL;
|
|
slot->m_sendBufAlloc = NULL;
|
|
|
|
// . sig handler may allocate new read buf here!!!!... but not now
|
|
// since we turned interrupts off
|
|
// . free this slot available right away so sig handler won't
|
|
// write into m_readBuf or use m_sendBuf, but it may claim it!
|
|
freeUdpSlot_unlocked(slot);
|
|
|
|
// free the send/read buffers
|
|
if ( rbuf ) mfree ( rbuf , rbufSize , "UdpServer");
|
|
if ( sbuf ) mfree ( sbuf , sbufSize , "UdpServer");
|
|
}
|
|
|
|
|
|
|
|
// . called once per second from Process.cpp::shutdown2() when we are trying
|
|
// to shutdown
|
|
// . we'll stop answering ping requests
|
|
// . we'll wait for replies to those notes, but timeout is 3 seconds
|
|
// we're shutting down so they won't bother sending requests to us
|
|
// . this will wait until all fully received requests have had their
|
|
// reply sent to them
|
|
// . in the meantime it will send back error replies to all new
|
|
// incoming requests
|
|
// . this will do a blocking close on the listening socket descriptor
|
|
// . this will call the callback when shutdown was completed
|
|
// . returns false if blocked, true otherwise
|
|
// . set g_errno on error
|
|
bool UdpServer::shutdown ( bool urgent ) {
|
|
|
|
if ( ! m_isShuttingDown && m_port == 0 )
|
|
log(LOG_INFO,"gb: Shutting down dns resolver.");
|
|
else if ( ! m_isShuttingDown )
|
|
log(LOG_INFO,"gb: Shutting down udp server port %hu.",m_port);
|
|
|
|
ScopedLock sl(m_mtx);
|
|
|
|
// so we know not to accept new connections
|
|
m_isShuttingDown = true;
|
|
|
|
// wait for all transactions to complete
|
|
time_t now = getTime();
|
|
int32_t count = 0;
|
|
if(!urgent) {
|
|
for ( UdpSlot *slot = m_activeListHead ; slot ; slot = slot->m_activeListNext ) {
|
|
// if we initiated, then don't count it
|
|
if ( slot->hasCallback() ) continue;
|
|
// set all timeouts to 3 secs
|
|
if ( slot->getTimeout() > 3000 ) {
|
|
slot->m_timeout = 3000;
|
|
}
|
|
// . don't count lagging slots that haven't got
|
|
// a read in 5 sec
|
|
if ( now - slot->getLastReadTime() > 5 ) continue;
|
|
// don't count if timer fucked up
|
|
if ( now - slot->getLastReadTime() < 0 ) continue;
|
|
// count it
|
|
count++;
|
|
}
|
|
}
|
|
if ( count > 0 ) {
|
|
log(LOG_LOGIC,"udp: stilll processing udp traffic after "
|
|
"shutdown note was sent.");
|
|
return false;
|
|
}
|
|
|
|
if ( m_port == 0 )
|
|
log(LOG_INFO,"gb: Closing dns resolver.");
|
|
else
|
|
log(LOG_INFO,"gb: Closing udp server socket port %hu.",m_port);
|
|
|
|
// close our socket descriptor, may block to finish sending
|
|
int s = m_sock;
|
|
// . make it -1 so thread exits
|
|
// . g_process.shutdown2() will wait untill all threads exit before
|
|
// exiting the main process
|
|
// . the timepollwrapper should kick our udp thread out of its
|
|
// lock on recvfrom so that it will see that m_sock is -1 and
|
|
// it will exit
|
|
m_sock = -1;
|
|
// then close it
|
|
close ( s );
|
|
|
|
if ( m_port == 0 )
|
|
log(LOG_INFO,"gb: Shut down dns resolver successfully.");
|
|
else
|
|
log(LOG_INFO,"gb: Shut down udp server port %hu successfully.",
|
|
m_port);
|
|
|
|
// all done
|
|
return true;
|
|
}
|
|
|
|
int32_t UdpServer::getTransId_unlocked() {
|
|
m_mtx.verify_is_locked();
|
|
|
|
int32_t tid = m_nextTransId++;
|
|
if ( m_nextTransId >= UDP_MAX_TRANSID ) {
|
|
m_nextTransId = 0;
|
|
}
|
|
return tid;
|
|
}
|
|
|
|
// verified that this is not interruptible
|
|
UdpSlot *UdpServer::getEmptyUdpSlot_unlocked(key96_t k, bool incoming) {
|
|
m_mtx.verify_is_locked();
|
|
|
|
UdpSlot *slot = removeFromAvailableLinkedList_unlocked();
|
|
if (!slot) {
|
|
// return NULL if none left
|
|
g_errno = ENOSLOTS;
|
|
if (g_conf.m_logNetCongestion) {
|
|
log(LOG_WARN, "udp: %" PRId32" of %" PRId32" udp slots occupied. None available to handle this new transaction.",
|
|
(int32_t) m_numUsedSlots, (int32_t) m_maxSlots);
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
addToActiveLinkedList_unlocked(slot);
|
|
|
|
// count it
|
|
m_numUsedSlots++;
|
|
|
|
if ( incoming ) {
|
|
m_numUsedSlotsIncoming++;
|
|
}
|
|
|
|
slot->m_slotStatus = incoming ? UdpSlot::slot_status_incoming : UdpSlot::slot_status_outgoing;
|
|
|
|
// now store ptr in hash table
|
|
slot->m_key = k;
|
|
addKey_unlocked(k, slot);
|
|
|
|
logDebug(g_conf.m_logDebugUdp, "udp: get %s empty slot=%p with key=%s", incoming ? "incoming" : "outgoing", slot, KEYSTR(&k, sizeof(key96_t)));
|
|
return slot;
|
|
}
|
|
|
|
void UdpServer::addKey_unlocked(key96_t k, UdpSlot *ptr) {
|
|
m_mtx.verify_is_locked();
|
|
|
|
logDebug(g_conf.m_logDebugUdp, "udp: add key=%s with slot=%p", KEYSTR(&k, sizeof(key96_t)), ptr);
|
|
|
|
// we assume that k.n1 is the transId. if this changes we should
|
|
// change this to keep our hash lookups fast
|
|
int32_t i = hashLong(k.n1) & m_bucketMask;
|
|
while ( m_ptrs[i] )
|
|
if ( ++i >= m_numBuckets ) i = 0;
|
|
m_ptrs[i] = ptr;
|
|
}
|
|
|
|
// verify that interrupts are always off before calling this
|
|
UdpSlot *UdpServer::getUdpSlot_unlocked(key96_t k) {
|
|
m_mtx.verify_is_locked();
|
|
|
|
// . hash into table
|
|
// . transId is key.n1, use that as hash
|
|
// . m_numBuckets must be a power of 2
|
|
int32_t i = hashLong(k.n1) & m_bucketMask;
|
|
while ( m_ptrs[i] && m_ptrs[i]->m_key != k ) {
|
|
if (++i >= m_numBuckets) {
|
|
i = 0;
|
|
}
|
|
}
|
|
|
|
// if empty, return NULL
|
|
return m_ptrs[i];
|
|
}
|
|
|
|
void UdpServer::addToAvailableLinkedList_unlocked(UdpSlot *slot) {
|
|
m_mtx.verify_is_locked();
|
|
|
|
log(LOG_DEBUG, "udp: adding tid=%d slot=%p to available list", slot->getTransId(), slot);
|
|
|
|
slot->m_availableListNext = m_availableListHead;
|
|
m_availableListHead = slot;
|
|
}
|
|
|
|
UdpSlot* UdpServer::removeFromAvailableLinkedList_unlocked() {
|
|
m_mtx.verify_is_locked();
|
|
|
|
// return NULL if none left
|
|
if ( ! m_availableListHead ) {
|
|
logDebug(g_conf.m_logDebugUdp, "udp: unable to remove slot from available list");
|
|
return NULL;
|
|
}
|
|
|
|
UdpSlot *slot = m_availableListHead;
|
|
|
|
// remove from linked list of available slots
|
|
m_availableListHead = slot->m_availableListNext;
|
|
|
|
logDebug(g_conf.m_logDebugUdp, "udp: removing slot=%p from available list", slot);
|
|
|
|
return slot;
|
|
}
|
|
|
|
void UdpServer::addToCallbackLinkedList_unlocked(UdpSlot *slot) {
|
|
m_mtx.verify_is_locked();
|
|
|
|
// debug log
|
|
if (g_conf.m_logDebugUdp) {
|
|
if (slot->getErrno()) {
|
|
log(LOG_DEBUG, "udp: adding tid=%d slot=%p with err=%s to callback list",
|
|
slot->getTransId(), slot, mstrerror(slot->m_errno) );
|
|
} else {
|
|
log(LOG_DEBUG, "udp: adding tid=%d slot=%p to callback list", slot->getTransId(), slot);
|
|
}
|
|
}
|
|
|
|
// must not be in there already, lest we double add it
|
|
if (isInCallbackLinkedList_unlocked(slot) ) {
|
|
logDebug(g_conf.m_logDebugUdp, "udp: avoided double add slot=%p", slot);
|
|
return;
|
|
}
|
|
|
|
slot->m_callbackListNext = NULL;
|
|
slot->m_callbackListPrev = NULL;
|
|
|
|
if ( ! m_callbackListTail ) {
|
|
m_callbackListHead = slot;
|
|
m_callbackListTail = slot;
|
|
} else {
|
|
// insert at end of linked list otherwise
|
|
m_callbackListTail->m_callbackListNext = slot;
|
|
slot->m_callbackListPrev = m_callbackListTail;
|
|
m_callbackListTail = slot;
|
|
}
|
|
}
|
|
|
|
bool UdpServer::isInCallbackLinkedList_unlocked(UdpSlot *slot) {
|
|
m_mtx.verify_is_locked();
|
|
|
|
// return if not in the linked list
|
|
if (slot->m_callbackListPrev || slot->m_callbackListNext || m_callbackListHead == slot) {
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
void UdpServer::removeFromCallbackLinkedList_unlocked(UdpSlot *slot) {
|
|
m_mtx.verify_is_locked();
|
|
|
|
logDebug(g_conf.m_logDebugUdp, "udp: removing tid=%d slot=%p from callback list", slot->getTransId(), slot);
|
|
|
|
// return if not in the linked list
|
|
if ( slot->m_callbackListPrev == NULL && slot->m_callbackListNext == NULL && m_callbackListHead != slot ) {
|
|
return;
|
|
}
|
|
|
|
// excise from linked list otherwise
|
|
if ( m_callbackListHead == slot ) {
|
|
m_callbackListHead = slot->m_callbackListNext;
|
|
}
|
|
if ( m_callbackListTail == slot )
|
|
m_callbackListTail = slot->m_callbackListPrev;
|
|
|
|
if ( slot->m_callbackListPrev ) {
|
|
slot->m_callbackListPrev->m_callbackListNext = slot->m_callbackListNext;
|
|
}
|
|
|
|
if ( slot->m_callbackListNext ) {
|
|
slot->m_callbackListNext->m_callbackListPrev = slot->m_callbackListPrev;
|
|
}
|
|
|
|
// and so we do not try to re-excise it
|
|
slot->m_callbackListPrev = NULL;
|
|
slot->m_callbackListNext = NULL;
|
|
}
|
|
|
|
void UdpServer::addToActiveLinkedList_unlocked(UdpSlot *slot) {
|
|
m_mtx.verify_is_locked();
|
|
|
|
logDebug(g_conf.m_logDebugUdp, "udp: adding slot=%p to active list", slot);
|
|
|
|
// put the used slot at the tail so older slots are at the head and
|
|
// makeCallbacks() can take care of the callbacks that have been
|
|
// waiting the longest first...
|
|
|
|
slot->m_activeListNext = NULL;
|
|
slot->m_activeListPrev = NULL;
|
|
|
|
if (m_activeListTail) {
|
|
// insert at end of linked list otherwise
|
|
m_activeListTail->m_activeListNext = slot;
|
|
slot->m_activeListPrev = m_activeListTail;
|
|
m_activeListTail = slot;
|
|
} else {
|
|
m_activeListHead = slot;
|
|
m_activeListTail = slot;
|
|
}
|
|
}
|
|
|
|
void UdpServer::removeFromActiveLinkedList_unlocked(UdpSlot *slot) {
|
|
m_mtx.verify_is_locked();
|
|
|
|
logDebug(g_conf.m_logDebugUdp, "udp: removing tid=%d slot=%p from active list", slot->getTransId(), slot);
|
|
|
|
// return if not in the linked list
|
|
if ( slot->m_activeListPrev == NULL && slot->m_activeListNext == NULL && m_activeListHead != slot ) {
|
|
return;
|
|
}
|
|
|
|
// excise from linked list otherwise
|
|
if ( m_activeListHead == slot ) {
|
|
m_activeListHead = slot->m_activeListNext;
|
|
}
|
|
if ( m_activeListTail == slot )
|
|
m_activeListTail = slot->m_activeListPrev;
|
|
|
|
if ( slot->m_activeListPrev ) {
|
|
slot->m_activeListPrev->m_activeListNext = slot->m_activeListNext;
|
|
}
|
|
|
|
if ( slot->m_activeListNext ) {
|
|
slot->m_activeListNext->m_activeListPrev = slot->m_activeListPrev;
|
|
}
|
|
|
|
// and so we do not try to re-excise it
|
|
slot->m_activeListPrev = NULL;
|
|
slot->m_activeListNext = NULL;
|
|
}
|
|
|
|
// verified that this is not interruptible
|
|
void UdpServer::freeUdpSlot_unlocked(UdpSlot *slot) {
|
|
m_mtx.verify_is_locked();
|
|
|
|
logDebug(g_conf.m_logDebugUdp, "udp: free tid=%d slot=%p", slot->getTransId(), slot);
|
|
|
|
removeFromActiveLinkedList_unlocked(slot);
|
|
|
|
// also from callback candidates if we should
|
|
removeFromCallbackLinkedList_unlocked(slot);
|
|
|
|
// discount it
|
|
m_numUsedSlots--;
|
|
|
|
if (slot->m_slotStatus == UdpSlot::slot_status_incoming) {
|
|
m_numUsedSlotsIncoming--;
|
|
}
|
|
slot->m_slotStatus = UdpSlot::slot_status_unused;
|
|
|
|
// reset some size variable (we may get our buffer stolen, but size has not been reset)
|
|
if (!slot->m_readBuf) {
|
|
slot->m_readBufMaxSize = 0;
|
|
slot->m_readBufSize = 0;
|
|
}
|
|
|
|
if (!slot->m_sendBuf) {
|
|
slot->m_sendBufSize = 0;
|
|
}
|
|
|
|
if (!slot->m_sendBufAlloc) {
|
|
slot->m_sendBufAllocSize = 0;
|
|
}
|
|
|
|
// add to linked list of available slots
|
|
addToAvailableLinkedList_unlocked(slot);
|
|
|
|
// . get bucket number in hash table
|
|
// . may have change since table often gets rehashed
|
|
key96_t k = slot->m_key;
|
|
int32_t i = hashLong(k.n1) & m_bucketMask;
|
|
while ( m_ptrs[i] && m_ptrs[i]->m_key != k )
|
|
if ( ++i >= m_numBuckets ) i = 0;
|
|
// sanity check
|
|
if ( ! m_ptrs[i] ) {
|
|
log(LOG_LOGIC,"udp: freeUdpSlot: Not in hash table.");
|
|
g_process.shutdownAbort(true);
|
|
}
|
|
|
|
char ipbuf[16];
|
|
logDebug(g_conf.m_logDebugUdp, "udp: freeUdpSlot: Freeing slot tid=%" PRId32" dst=%s:%" PRIu32" slot=%p",
|
|
slot->getTransId(), iptoa(slot->getIp(),ipbuf), (uint32_t)slot->getPort(), slot);
|
|
|
|
// remove the bucket
|
|
m_ptrs [ i ] = NULL;
|
|
// rehash all buckets below
|
|
if ( ++i >= m_numBuckets ) i = 0;
|
|
// keep looping until we hit an empty slot
|
|
while ( m_ptrs[i] ) {
|
|
UdpSlot *ptr = m_ptrs[i];
|
|
m_ptrs[i] = NULL;
|
|
// re-hash it
|
|
addKey_unlocked(ptr->m_key, ptr);
|
|
if ( ++i >= m_numBuckets ) i = 0;
|
|
}
|
|
}
|
|
|
|
void UdpServer::cancel ( void *state , msg_type_t msgType ) {
|
|
// . if we have transactions in progress wait
|
|
// . but if we're waiting for a reply, don't bother
|
|
pthread_mutex_lock(&m_mtx.mtx);
|
|
for ( UdpSlot *slot = m_activeListHead ; slot ; slot = slot->m_activeListNext ) {
|
|
// skip if not a match
|
|
if (slot->m_state != state || slot->getMsgType() != msgType) {
|
|
continue;
|
|
}
|
|
|
|
// note it
|
|
log(LOG_INFO,"udp: cancelled udp tid=%d slot=%p msgType=0x%02x.", slot->getTransId(), slot, (int)slot->getMsgType());
|
|
|
|
// let them know why we are calling the callback prematurely
|
|
g_errno = ECANCELLED;
|
|
// stop waiting for reply, this will call destroySlot(), too
|
|
pthread_mutex_unlock(&m_mtx.mtx);
|
|
makeCallback(slot);
|
|
pthread_mutex_lock(&m_mtx.mtx);
|
|
}
|
|
pthread_mutex_unlock(&m_mtx.mtx);
|
|
}
|
|
|
|
void UdpServer::replaceHost ( Host *oldHost, Host *newHost ) {
|
|
log ( LOG_INFO, "udp: Replacing slots for ip: "
|
|
"%" PRIu32"/%" PRIu32" port: %" PRIu32,
|
|
(uint32_t)oldHost->m_ip,
|
|
(uint32_t)oldHost->m_ipShotgun,
|
|
(uint32_t)oldHost->m_port );//, oldHost->m_port2 );
|
|
ScopedLock sl(m_mtx);
|
|
// . loop over outstanding transactions looking for ones to oldHost
|
|
for ( UdpSlot *slot = m_activeListHead; slot; slot = slot->m_activeListNext ) {
|
|
// ignore incoming
|
|
if ( ! slot->hasCallback() ) continue;
|
|
// check for ip match
|
|
if ( slot->getIp() != oldHost->m_ip &&
|
|
slot->getIp() != oldHost->m_ipShotgun )
|
|
continue;
|
|
// check for port match
|
|
if ( this == &g_udpServer && slot->getPort() != oldHost->m_port )
|
|
continue;
|
|
// . match, replace the slot ip/port with the newHost
|
|
// . first remove the old hashed key for this slot
|
|
// . get bucket number in hash table
|
|
// . may have change since table often gets rehashed
|
|
key96_t k = slot->m_key;
|
|
int32_t i = hashLong(k.n1) & m_bucketMask;
|
|
while ( m_ptrs[i] && m_ptrs[i]->m_key != k )
|
|
if ( ++i >= m_numBuckets ) i = 0;
|
|
// sanity check
|
|
if ( ! m_ptrs[i] ) {
|
|
log(LOG_LOGIC,"udp: replaceHost: Slot not in hash table.");
|
|
g_process.shutdownAbort(true);
|
|
}
|
|
|
|
char ipbuf[16];
|
|
logDebug(g_conf.m_logDebugUdp, "udp: replaceHost: Rehashing slot tid=%" PRId32" dst=%s:%" PRIu32" slot=%p",
|
|
slot->getTransId(), iptoa(slot->getIp(),ipbuf), (uint32_t)slot->getPort(), slot);
|
|
|
|
// remove the bucket
|
|
m_ptrs [ i ] = NULL;
|
|
// rehash all buckets below
|
|
if ( ++i >= m_numBuckets ) i = 0;
|
|
// keep looping until we hit an empty slot
|
|
while ( m_ptrs[i] ) {
|
|
UdpSlot *ptr = m_ptrs[i];
|
|
m_ptrs[i] = NULL;
|
|
// re-hash it
|
|
addKey_unlocked(ptr->m_key, ptr);
|
|
if ( ++i >= m_numBuckets ) i = 0;
|
|
}
|
|
|
|
// careful with this! if we were using shotgun, use that
|
|
// otherwise We core in PingServer because the
|
|
// m_inProgress[1-2] does net mesh
|
|
if ( slot->getIp() == oldHost->m_ip )
|
|
slot->m_ip = newHost->m_ip;
|
|
else
|
|
slot->m_ip = newHost->m_ipShotgun;
|
|
|
|
// replace the data in the slot
|
|
slot->m_port = newHost->m_port;
|
|
|
|
// . now readd the slot to the hash table
|
|
key96_t key = m_proto->makeKey ( slot->getIp(),
|
|
slot->getPort(),
|
|
slot->getTransId(),
|
|
true/*weInitiated?*/);
|
|
addKey_unlocked(key, slot);
|
|
slot->m_key = key;
|
|
slot->resetConnect();
|
|
// log it
|
|
log(LOG_INFO, "udp: Reset Slot For Replaced Host: tid=%" PRId32" msgType=%i",
|
|
slot->getTransId(), (int)slot->getMsgType());
|
|
}
|
|
}
|
|
|
|
|
|
int32_t UdpServer::getNumUsedSlots() const {
|
|
ScopedLock sl(m_mtx);
|
|
return m_numUsedSlots;
|
|
}
|
|
|
|
int32_t UdpServer::getNumUsedSlotsIncoming() const {
|
|
ScopedLock sl(m_mtx);
|
|
return m_numUsedSlotsIncoming;
|
|
}
|
|
|
|
void UdpServer::saveActiveSlots(int fd, msg_type_t msg_type) {
|
|
ScopedLock sl(m_mtx);
|
|
|
|
for (const UdpSlot *slot = m_activeListHead; slot; slot = slot->m_activeListNext) {
|
|
// skip if not wanted msg type
|
|
if (slot->getMsgType() != msg_type) {
|
|
continue;
|
|
}
|
|
|
|
// skip if got reply
|
|
if (slot->m_readBuf) {
|
|
continue;
|
|
}
|
|
|
|
// shut up gcc warning: ignoring return value
|
|
|
|
// write hostid sent to
|
|
int32_t hostId = slot->getHostId();
|
|
ssize_t ignored1 __attribute__((unused)) = write(fd, &hostId, 4);
|
|
|
|
// write that
|
|
ssize_t ignored2 __attribute__((unused)) = write(fd, &slot->m_sendBufSize, 4);
|
|
|
|
// then the buf data itself
|
|
ssize_t ignored3 __attribute__((unused)) = write(fd, slot->m_sendBuf, slot->m_sendBufSize);
|
|
}
|
|
}
|
|
|
|
std::vector<UdpStatistic> UdpServer::getStatistics() const {
|
|
ScopedLock sl(m_mtx);
|
|
|
|
std::vector<UdpStatistic> statistics;
|
|
for (const UdpSlot *slot = m_activeListHead; slot; slot = slot->m_activeListNext) {
|
|
statistics.push_back(UdpStatistic(*slot));
|
|
}
|
|
|
|
return statistics;
|
|
}
|