1983 lines
70 KiB
C++
1983 lines
70 KiB
C++
#include "gb-include.h"
|
|
|
|
#include "UdpSlot.h"
|
|
#include "UdpServer.h"
|
|
#include "Stats.h"
|
|
#include "Proxy.h"
|
|
|
|
int32_t g_cancelAcksSent = 0;
|
|
int32_t g_cancelAcksRead = 0;
|
|
|
|
// max resend time (max backoff) for niceness 0
|
|
//#define MAX_RESEND_0 700
|
|
// . i lowered this because the wire supports 1 full packet about every 120
|
|
// MICROSECONDS. so in 40ms we could send ~350 1500-byte packets!!!
|
|
// . i also lowered the ack window down to 2 dgrams so this makes more sense
|
|
//#define MAX_RESEND_0 40
|
|
// let's not clog everybody up
|
|
#define MAX_RESEND_0 200
|
|
|
|
// max resend time (max backoff) for niceness 1+
|
|
//#define MAX_RESEND_1 30000
|
|
// since high priority udp server can suspend lower, make this zippier
|
|
//#define MAX_RESEND_1 400
|
|
// let's not clog everybody up
|
|
//#define MAX_RESEND_1 800
|
|
// let's not clog up our network switch internet port
|
|
#define MAX_RESEND_1 15000
|
|
|
|
// start resend time for niceness 0
|
|
//#define RESEND_0 30
|
|
//#define RESEND_0 60
|
|
//#define RESEND_0 120
|
|
|
|
// . now i increase resend time from 120 to 250 because packets don't seem
|
|
// to be getting lost as much as before since i increase
|
|
// /proc/sys/net/core/rmem_default and rmem_max to 10Megs
|
|
// . before, when it was 65k, kernel was dropping packets like a blind waiter
|
|
// . so try 250ms now, hopefully it will cut down on uneccessary resends
|
|
// . also it would help to make UdpServer use unmasked interrupt signals
|
|
// to be more responsive
|
|
// . but now we also have a problem of doing a bunch of sends, they just
|
|
// get put on the queue and some may be silently dropped (send sendto())
|
|
// . we base this resend time assuming we sent the packet when we called
|
|
// . sigtimedwait() only has a resolution of 20ms!!! so make due...
|
|
// . i lowered this down to 20 since our window is much smaller now
|
|
// . there's typically about 120 microseconds between full packets so we
|
|
// should resend quickly!!
|
|
//#define RESEND_0 250
|
|
//#define RESEND_0 120
|
|
//#define RESEND_0 20
|
|
// keep it to 40ms due to kernel time slicing problems
|
|
//#define RESEND_0 33
|
|
// but now that we have our query compression proxy over the internet, we got
|
|
// pings that are like 50ms...
|
|
// this was at 70 but gk0 pings to scproxy1 at like 150ms a lot via
|
|
// the roadrunner wireless link, so lets crank this up
|
|
#define RESEND_0 170
|
|
|
|
// let it all spray out like a wild hose
|
|
//#define RESEND_0 30
|
|
|
|
// . for int16_t msgs we can resend more rapidly
|
|
// . it doesn't help to go lower than 20ms cuz that's sigtimedwait()'s limit
|
|
//#define RESEND_0_SHORT 20
|
|
// keep it to 40ms due to kernel time slicing problems
|
|
//#define RESEND_0_SHORT 33
|
|
// we are going over the internet to our query compression proxy now
|
|
#define RESEND_0_SHORT 170
|
|
|
|
// start resend time for niceness 1+
|
|
//#define RESEND_1 2000
|
|
// we now suspend the low priority udp server when the high one is
|
|
// processing an incoming request, so this can be zippier here
|
|
//#define RESEND_1 200
|
|
//#define RESEND_1 100
|
|
// because of roadrunner... (See above)
|
|
#define RESEND_1 200
|
|
// try to fix a bunch of msg99 replies coming into host 0 at once
|
|
#define RESEND_1_LOCAL 100
|
|
|
|
|
|
// . the ack window is back and bigger, now 100 dgrams
|
|
// . this gives the receives a chance to respond to being blasted
|
|
// . without this acks being sent back are often lost for some reason,
|
|
// ?maybe it's just loopback sends?
|
|
//#define ACK_WINDOW_SIZE 50
|
|
//#define ACK_WINDOW_SIZE 14
|
|
//#define ACK_WINDOW_SIZE 30
|
|
//#define ACK_WINDOW_SIZE 40
|
|
//#define ACK_WINDOW_SIZE 4
|
|
// spider (low priority udp server) is having troubles finishing some trans
|
|
// so let's make this a bit bigger to alleviate the problem
|
|
//#define ACK_WINDOW_SIZE 12
|
|
// i don't know if that was the cause of it, i think it might be something
|
|
// else, so to try to prevent from dropping packets (ifconfig) let's put
|
|
// this down again.
|
|
#define ACK_WINDOW_SIZE 4
|
|
|
|
// . since we're not hot yet, make this bigger than 4
|
|
// . go all out --- assuming dgram size of ~64k this should cover a 1.5 meg msg
|
|
//#define ACK_WINDOW_SIZE 250
|
|
//#define ACK_WINDOW_SIZE 20
|
|
|
|
// size of window for local transactions over loopback interface
|
|
//#define ACK_WINDOW_SIZE_LB 40
|
|
//#define ACK_WINDOW_SIZE_LB 250
|
|
//#define ACK_WINDOW_SIZE_LB 4
|
|
// spider (low priority udp server) is having troubles finishing some trans
|
|
// so let's make this a bit bigger to alleviate the problem
|
|
//#define ACK_WINDOW_SIZE_LB 12
|
|
// see comment above for why we put this back from 12 to 4
|
|
#define ACK_WINDOW_SIZE_LB 4
|
|
|
|
static char s_shotgunBit = 0;
|
|
|
|
// i add this to resend time to jiggle it so it doesn't collide as much
|
|
//static int32_t s_incDelay = 0;
|
|
|
|
void UdpSlot::connect ( UdpProtocol *proto ,
|
|
sockaddr_in *endPoint ,
|
|
Host *host ,
|
|
int32_t hostId ,
|
|
int32_t transId ,
|
|
int32_t timeout , // in seconds
|
|
int64_t now ,
|
|
int32_t niceness ) {
|
|
// map loopback ip to our ip
|
|
uint32_t ip = endPoint->sin_addr.s_addr ;
|
|
if ( //!g_conf.m_interfaceMachine &&
|
|
ip == g_hostdb.getLoopbackIp() )
|
|
ip = g_hostdb.getMyIp();
|
|
connect ( proto ,
|
|
ip ,
|
|
ntohs ( endPoint->sin_port ) ,
|
|
host ,
|
|
hostId ,
|
|
transId ,
|
|
timeout ,
|
|
now ,
|
|
niceness );
|
|
}
|
|
|
|
// . call this after you make a new UdpSlot
|
|
// . make new slot using mcalloc() so it's zero'd out
|
|
// . NOTE: callback must be non-NULL if you're going to send a request
|
|
void UdpSlot::connect ( UdpProtocol *proto ,
|
|
uint32_t ip ,
|
|
uint16_t port ,
|
|
Host *host ,
|
|
int32_t hostId ,
|
|
int32_t transId ,
|
|
int32_t timeout , // in seconds
|
|
int64_t now ,
|
|
int32_t niceness ) {
|
|
// clear bufs
|
|
//m_sendBuf = NULL;
|
|
//m_readBuf = NULL;
|
|
// clear everything
|
|
//memset ( this , 0 , sizeof(UdpSlot) );
|
|
// . make async signal safe
|
|
// . TODO: this is slow to clear all those m_*bits, we got 1.7k of slot
|
|
//int32_t size = (uint32_t)m_tmpBuf - (uint32_t)this ;
|
|
//int32_t size = (uint32_t)&m_next - (uint32_t)this ;
|
|
//memset_ass ( (char *)this , 0 , size );
|
|
// avoid that heavy memset_ass() call using this logic.
|
|
// we will clear on demand using m_numBitsInitialized logic
|
|
// in UdpSlot.h
|
|
int32_t size = (char *)&m_sentBits2 - (char *)this ;
|
|
memset_ass ( (char *)this , 0 , size );
|
|
// store this info
|
|
m_proto = proto ;
|
|
m_ip = ip ; // keep in network order
|
|
m_port = port ; // keep in host order
|
|
m_host = host ;
|
|
m_hostId = hostId ;
|
|
m_transId = transId ;
|
|
m_timeout = timeout ;
|
|
m_niceness = niceness ;
|
|
// initialize our time of birth
|
|
m_startTime = now;
|
|
// reset this
|
|
m_queuedTime = -1;
|
|
// is it a local ip?
|
|
bool isLocal = false;
|
|
// int16_tcut
|
|
uint8_t *p = (uint8_t *)&m_ip;
|
|
// this is local
|
|
if ( p[0] == 10 ) isLocal = true;
|
|
// this is local
|
|
if ( p[0] == 192 && p[1] == 168 ) isLocal = true;
|
|
// if we match top two ips, then its local
|
|
if ( (m_ip&0x0000ffff) == (g_hostdb.m_myIp&0x0000ffff)) isLocal = true;
|
|
// loopback is local
|
|
if ( m_ip == 0x0100007f ) isLocal = true;
|
|
// . if we're sending to loopback make bigger
|
|
// . dns has its own max size (DNS_DGRAM_SIZE)
|
|
// . if we're going over the internet (interface machine)
|
|
// use a smaller DGRAM so it makes it
|
|
if ( ! m_proto->useAcks() )
|
|
m_maxDgramSize = DGRAM_SIZE_DNS;
|
|
else if ( //g_conf.m_interfaceMachine ||
|
|
// now that we use hosts2.conf so we can get link text
|
|
// via Msg20 from an external gb cluster, it need not come
|
|
// from the admin ip...
|
|
//( ! g_hostdb.isIpInNetwork ( ip ) &&
|
|
// g_conf.isAdminIp ( ip ) ) )
|
|
// this was 0000ffff but since we now use 10.5.*.* and
|
|
// 10.6.*.* i had to change that
|
|
! isLocal ) { // || ! g_hostdb.isIpInNetwork ( ip ) ) {
|
|
// i guess we use this
|
|
m_maxDgramSize = DGRAM_SIZE_INTERNET;
|
|
//char *xx=NULL; *xx=0;
|
|
}
|
|
//else if ( ip == g_hostdb.getMyIp() )
|
|
else if ( g_hostdb.isMyIp(ip) )
|
|
m_maxDgramSize = DGRAM_SIZE_LB;
|
|
else
|
|
m_maxDgramSize = DGRAM_SIZE;
|
|
}
|
|
|
|
void UdpSlot::resetConnect ( ) {
|
|
if ( //!g_conf.m_interfaceMachine &&
|
|
m_ip == g_hostdb.getLoopbackIp() )
|
|
m_ip = g_hostdb.getMyIp();
|
|
// . compute max dgram size
|
|
// . if we're sending to loopback make bigger
|
|
// . dns has its own max size (DNS_DGRAM_SIZE)
|
|
// . if we're going over the internet (interface machine)
|
|
// use a smaller DGRAM so it makes it
|
|
if ( ! m_proto->useAcks() )
|
|
m_maxDgramSize = DGRAM_SIZE_DNS;
|
|
else if ( //g_conf.m_interfaceMachine ||
|
|
// now that we use hosts2.conf so we can get link text
|
|
// via Msg20 from an external gb cluster, it need not come
|
|
// from the admin ip...
|
|
//( ! g_hostdb.isIpInNetwork ( ip ) &&
|
|
// g_conf.isAdminIp ( ip ) ) )
|
|
// this as 0x0000ffff but we use 10.5.* and 10.6.* addresses
|
|
(m_ip & 0x000000ff) != (g_hostdb.m_myIp & 0x000000ff) ||
|
|
! g_hostdb.isIpInNetwork ( m_ip ) ) {
|
|
m_maxDgramSize = DGRAM_SIZE_INTERNET;
|
|
char *xx=NULL;*xx=0;
|
|
}
|
|
//else if ( m_ip == g_hostdb.getMyIp() )
|
|
else if ( g_hostdb.isMyIp(m_ip) )
|
|
m_maxDgramSize = DGRAM_SIZE_LB;
|
|
else
|
|
m_maxDgramSize = DGRAM_SIZE;
|
|
// reset the slot
|
|
m_readBitsOn = 0;
|
|
m_sentBitsOn = 0;
|
|
m_readAckBitsOn = 0;
|
|
m_sentAckBitsOn = 0;
|
|
m_nextToSend = 0;
|
|
m_firstUnlitSentAckBit = 0;
|
|
m_numBitsInitialized = 0;
|
|
// for ( int32_t b = 0; b < m_dgramsToSend; b++ ) {
|
|
// clrBit(b, m_sentBits2);
|
|
// clrBit(b, m_readBits2);
|
|
// clrBit(b, m_sentAckBits2);
|
|
// clrBit(b, m_readAckBits2);
|
|
// }
|
|
// . set m_dgramsToSend
|
|
// . similar to UdpProtocol::getNumDgrams(char *dgram,int32_t dgramSize)
|
|
int32_t dataSpace = m_maxDgramSize ;
|
|
if ( m_proto->stripHeaders() )
|
|
dataSpace -= m_proto->getHeaderSize ( m_sendBufSize );
|
|
m_dgramsToSend = m_sendBufSize / dataSpace;
|
|
if ( m_sendBufSize % dataSpace != 0 ) m_dgramsToSend++;
|
|
// if msgSize was given as 0 force a dgram to be sent
|
|
if ( m_sendBufSize == 0 ) m_dgramsToSend = 1;
|
|
}
|
|
|
|
// . call this only AFTER calling connect() above
|
|
// . callback is non-NULL iff you're sending a request
|
|
// . callback is NULL ifd you're sending a reply
|
|
// . returns false and sets g_errno on error
|
|
bool UdpSlot::sendSetup ( char *msg ,
|
|
int32_t msgSize ,
|
|
char *alloc ,
|
|
int32_t allocSize ,
|
|
unsigned char msgType ,
|
|
int64_t now ,
|
|
void *state ,
|
|
void (*callback)(void *state, UdpSlot *slot) ,
|
|
int32_t niceness ,
|
|
int16_t backoff ,
|
|
int16_t maxWait ,
|
|
char *replyBuf ,
|
|
int32_t replyBufMaxSize ) {
|
|
|
|
// can't be too big
|
|
if ( msgSize / m_maxDgramSize + 1 >= MAX_DGRAMS ) {
|
|
int32_t maxMsgSize = m_maxDgramSize * MAX_DGRAMS;
|
|
log(LOG_LOGIC,"udp: Msg size of %" INT32 " bytes is too big "
|
|
"to send. Max dgram size = %" INT32 ". Max dgrams = "
|
|
"%" INT32 ". Max msg size = %" INT32 " msgtype=0x%hhx. Please "
|
|
"increase the #define MAX_DGRAMS in UdpSlot.h and "
|
|
"recompile to fix this.",
|
|
(int32_t)msgSize,(int32_t)m_maxDgramSize,
|
|
(int32_t)MAX_DGRAMS,maxMsgSize,
|
|
msgType);
|
|
//char *xx=NULL; *xx=0;
|
|
g_errno = EMSGTOOBIG;//EBADENGINEER;
|
|
return false;
|
|
//msgSize = MAX_DGRAMS * DGRAM_SIZE;
|
|
//sleep(50000);
|
|
//return false;
|
|
}
|
|
// get the timestamp in milliseconds
|
|
//int64_t now = gettimeofdayInMilliseconds();
|
|
// fill in the supplied parameters
|
|
m_sendBuf = msg;
|
|
m_sendBufSize = msgSize;
|
|
m_sendBufAllocSize = allocSize;
|
|
m_sendBufAlloc = alloc;
|
|
m_callback = callback;
|
|
m_state = state;
|
|
m_msgType = msgType;
|
|
m_lastSendTime = now;
|
|
m_lastReadTime = now;
|
|
m_niceness = niceness;
|
|
m_backoff = backoff;
|
|
m_maxWait = maxWait;
|
|
// . only set m_readBuf if we should
|
|
// . sendSetup() is called by slots sending a request
|
|
// . sendSetup() is called by slots sending a reply
|
|
// . so m_readBuf may have info in it if we're sending a reply so
|
|
// just don't NULLify it, it needs to be freed. This was causing
|
|
// a memleak for receivers of Msg0x01s
|
|
if ( replyBuf ) {
|
|
if ( m_readBuf ) {
|
|
g_errno = EBADENGINEER;
|
|
return log(LOG_LOGIC,"udp: Trying to initialize a udp "
|
|
"socket for sending, but its read buffer "
|
|
"is not empty.");
|
|
}
|
|
m_readBuf = replyBuf;
|
|
m_readBufSize = 0;
|
|
m_readBufMaxSize = replyBufMaxSize;
|
|
}
|
|
// we haven't sent anything yet so reset this to -1
|
|
m_firstSendTime = -1;
|
|
// creation time
|
|
//m_sendSetupCalled = now;
|
|
// set m_resendTime, based on m_resendCount and m_niceness
|
|
setResendTime();
|
|
// . set m_dgramsToSend
|
|
// . similar to UdpProtocol::getNumDgrams(char *dgram,int32_t dgramSize)
|
|
int32_t dataSpace = m_maxDgramSize ;
|
|
if ( m_proto->stripHeaders() )
|
|
dataSpace -= m_proto->getHeaderSize ( msgSize );
|
|
m_dgramsToSend = msgSize / dataSpace;
|
|
if ( msgSize % dataSpace != 0 ) m_dgramsToSend++;
|
|
// if msgSize was given as 0 force a dgram to be sent
|
|
if ( msgSize == 0 ) m_dgramsToSend = 1;
|
|
|
|
// send to particular ip, but not for pings
|
|
if ( m_msgType == 0x11 ) return true;
|
|
if ( ! m_host ) return true;
|
|
// inherit this from the last transactions
|
|
m_preferEth = m_host->m_preferEth;
|
|
// and set our ip accordingly
|
|
if ( m_host->m_preferEth == 1 ) m_ip = m_host->m_ipShotgun;
|
|
else m_ip = m_host->m_ip;
|
|
|
|
return true;
|
|
}
|
|
|
|
// resets a UdpSlot for a resend
|
|
void UdpSlot::prepareForResend ( int64_t now , bool resendAll ) {
|
|
// debug msg
|
|
//if ( g_conf.m_logDebugUdp )
|
|
// log(LOG_DEBUG,"udp: resending slot "
|
|
// "all=%" INT32 " "
|
|
// "tid=%" INT32 " "
|
|
// "dst=%s:%hu." ,
|
|
// (int32_t)resendAll ,
|
|
// (int32_t)m_transId ,
|
|
// iptoa(m_ip),
|
|
// (uint16_t)m_port);
|
|
// clear all if reset is true
|
|
if ( resendAll ) {
|
|
for ( int32_t i = 0 ; i < m_dgramsToSend ; i++ )
|
|
clrBit ( i , m_readAckBits2 );
|
|
m_readAckBitsOn = 0;
|
|
}
|
|
// how many sentBits we cleared
|
|
int32_t cleared = 0;
|
|
// clear each sent bit if it hasn't gotten an ACK
|
|
for ( int32_t i = 0 ; i < m_dgramsToSend ; i++ ) {
|
|
// continue if we already have an ack for this one
|
|
if ( isOn ( i , m_readAckBits2 ) ) continue;
|
|
// continue if it's already cleared
|
|
if ( ! isOn ( i , m_sentBits2 ) ) continue;
|
|
// mark dgram #i as unsent since we don't have ACK for it yet
|
|
clrBit ( i , m_sentBits2 );
|
|
// reduce the lit bit count
|
|
m_sentBitsOn--;
|
|
// may have to adjust m_nextToSend
|
|
if ( i < m_nextToSend ) m_nextToSend = i;
|
|
// count each cleared bit
|
|
cleared++;
|
|
}
|
|
|
|
// . if we were using eth0 try using eth1, and vice versa
|
|
// . those linksys switches seem to go down all the time and come
|
|
// back up after a few hours
|
|
// . only do this on the 2nd resend
|
|
if ( g_conf.m_useShotgun &&
|
|
// . only do this on 2nd resend.
|
|
// . MDW: no, i like flip flopping with each resend,
|
|
// it is like doing it in parallel
|
|
// m_resendCount == 1 &&
|
|
// need to be sending to a host in the network
|
|
m_host &&
|
|
// shotgun ip (eth1) must be different than eth0 ip
|
|
m_host->m_ip != m_host->m_ipShotgun &&
|
|
// pingserver.cpp sends to the exact ips it needs to
|
|
m_msgType != 0x11 ) {
|
|
// . were we using the eth0 ip? if so, switch to eth1
|
|
// . do not switch though if the ping is really bad for eth1
|
|
if ( m_preferEth == 0 && m_host->m_pingShotgun<3000 ){
|
|
// set m_ip to ip of eth1
|
|
m_ip = m_host->m_ipShotgun;
|
|
// this is now only used when sendSetup() is called
|
|
// for the start of sending a request/reply
|
|
m_host->m_preferEth = 1;
|
|
// use eth1 to talk to this guy for this tid
|
|
m_preferEth = 1;
|
|
// log it if changing
|
|
if ( g_conf.m_logDebugUdp )
|
|
logf(LOG_DEBUG,
|
|
"udp: switching to eth1 for host #%" INT32 " "
|
|
"tid=%" INT32 "", m_host->m_hostId,m_transId);
|
|
}
|
|
// . otherwise, we were using the eth1 (shotgun) ip
|
|
// . do not switch though if the ping is really bad for eth0
|
|
else if ( m_preferEth == 1 && m_host->m_ping < 3000 ) {
|
|
// set m_ip to ip of eth0
|
|
m_ip = m_host->m_ip;
|
|
// this is now only used when sendSetup() is called
|
|
// for the start of sending a request/reply
|
|
m_host->m_preferEth = 0;
|
|
// use eth0 to talk to this guy for this tid
|
|
m_preferEth = 0;
|
|
// log it
|
|
if ( g_conf.m_logDebugUdp )
|
|
logf(LOG_DEBUG,
|
|
"udp: switching to eth0 for host #%" INT32 " "
|
|
"tid=%" INT32 "",m_host->m_hostId,m_transId);
|
|
}
|
|
// . just some debug notes
|
|
// . this happens when host cores and both eth0 and eth1 r dead
|
|
//logf(LOG_DEBUG,"udp: not switching. preferEth=%" INT32 " "
|
|
// "pingSHotgun=%" INT32 " ping=%" INT32 "",(int32_t)m_host->m_preferEth,
|
|
// m_host->m_pingShotgun,m_host->m_ping);
|
|
}
|
|
|
|
// . tally the count
|
|
// . need to increment since won't resend to eth1 unless this is 2
|
|
m_resendCount++;
|
|
// debug msg
|
|
if ( g_conf.m_logDebugUdp ||
|
|
(g_conf.m_logDebugDns && !m_proto->useAcks()) )
|
|
logf(LOG_DEBUG,"udp: resending slot "
|
|
"all=%" INT32 " "
|
|
"tid=%" INT32 " "
|
|
"dst=%s:%hu "
|
|
"count=%" INT32 " "
|
|
"host=0x%" PTRFMT " "
|
|
"cleared=%" INT32 "" ,
|
|
(int32_t)resendAll ,
|
|
(int32_t)m_transId ,
|
|
iptoa(m_ip),//+9,
|
|
(uint16_t)m_port,
|
|
(int32_t)m_resendCount,
|
|
(PTRTYPE)m_host,
|
|
(int32_t)cleared);
|
|
// . after UdpServer::readTimeOutPoll() calls this prepareForResend()
|
|
// he then calls doSending()
|
|
// . but we cannot send unless the token is free or we're older (500ms)
|
|
// than the guy that has the token
|
|
// . therefore let's update the m_lastSentTime if we didn't send
|
|
// anything, just so readTimeoutPoll() quits calling us every time
|
|
m_lastSendTime = now;
|
|
// . don't increase our m_resendTime if we didn't resend anything
|
|
// . that way when the token is available or 500ms younger than us
|
|
// we won't be waiting 600ms until we can check that!
|
|
if ( cleared == 0 ) return;
|
|
// otheriwise, calculate how much time since our last send
|
|
// these values are computed, but not used, it seems as though
|
|
//the functionality was moved into setResendTime.
|
|
// int32_t max ;
|
|
// if ( m_maxWait >= 0 ) max = m_maxWait;
|
|
// else if ( m_niceness == 0 ) max = MAX_RESEND_0;
|
|
// else max = MAX_RESEND_1;
|
|
// // . how many milliseconds have we been trying to send to it?
|
|
// // . each retry doubles the previous (up to 30,000ms)
|
|
// int32_t elapsed = m_resendCount;
|
|
// for ( int32_t i = 0 ; i < m_resendCount ; i++ ) {
|
|
// int32_t step = m_resendCount << (i+1) ;
|
|
// // watch out for huge numbers
|
|
// if ( i >= 16 ) step = max;
|
|
// if ( step > max ) step = max;
|
|
// elapsed += step;
|
|
// }
|
|
// debug msg
|
|
//log("(resend) tripTime = %" INT32 "", m_resendTime );
|
|
// . we haven't gotten a read in m_resendTime milliseconds!
|
|
// . stamp host's times to show the affect of the slowdown
|
|
// . "timedOut" being true means host will only be stamped if it makes
|
|
// his avg ping time WORSE
|
|
// . use this when we didn't actually get a response from him
|
|
// . this is now handled by g_hostdb::pingHost()
|
|
//g_hostdb.stampHost( m_hostId , elapsed , true/*timedOut?*/ );
|
|
// tally the count
|
|
//m_resendCount++;
|
|
// update stats for this host for the PageHosts.cpp table
|
|
Host *h = m_host;
|
|
if ( ! h && m_hostId >= 0 ) h = g_hostdb.getHost ( m_hostId );
|
|
if ( h ) h->m_pingInfo.m_totalResends += cleared;
|
|
// . set the resend time based on m_resendCount and m_niceness
|
|
// . this typically doubles m_resendTime with each resendCount
|
|
setResendTime ();
|
|
}
|
|
|
|
void UdpSlot::setResendTime() {
|
|
// otherwise, calculate how much time since our last send
|
|
int32_t max ;
|
|
if ( m_maxWait >= 0 ) max = m_maxWait;
|
|
else if ( m_niceness == 0 ) max = MAX_RESEND_0;
|
|
else max = MAX_RESEND_1;
|
|
// if backoff not negative use that
|
|
if ( m_backoff >= 0 ) {
|
|
// compute resend time
|
|
int32_t bs = ( 1 << m_resendCount );
|
|
int32_t val = ((int32_t)m_backoff) * bs;
|
|
// check for overflow
|
|
if ( val < (int32_t)m_backoff )
|
|
m_resendTime = max;
|
|
else if ( val < bs )
|
|
m_resendTime = max;
|
|
else if ( bs < 0 )
|
|
m_resendTime = max;
|
|
else
|
|
m_resendTime = val;
|
|
// . don't exceed the max, though of .4 seconds
|
|
// . it's crucial to keep this fairly low because an old slot
|
|
// can only steal the token when a dgram or ack is read
|
|
// into that slot
|
|
if ( m_resendTime > max ) m_resendTime = max;
|
|
return;
|
|
}
|
|
// is it a local ip?
|
|
bool isLocal = false;
|
|
// int16_tcut
|
|
uint8_t *p = (uint8_t *)&m_ip;
|
|
// this is local
|
|
if ( p[0] == 10 ) isLocal = true;
|
|
// this is local
|
|
if ( p[0] == 192 && p[1] == 168 ) isLocal = true;
|
|
// if we match top two ips, then its local
|
|
if ( (m_ip&0x0000ffff) == (g_hostdb.m_myIp&0x0000ffff)) isLocal = true;
|
|
// loopback is local
|
|
if ( m_ip == 0x0100007f ) isLocal = true;
|
|
// . keep our resend times up-to-date
|
|
// . recompute a new resend time in milliseconds for the winning slot
|
|
// . we double,triple,... the deviation as our backoff scheme
|
|
// . get the avg/stdDev round trip times for this host from the hostmap
|
|
// . these times may change every time we receive an ACK for this host
|
|
// . the new resend time is like double
|
|
if ( m_niceness == 0 ) {
|
|
// if size is int16_t we typically use smaller resend time
|
|
if ( m_dgramsToSend <= 1 ) m_resendTime = RESEND_0_SHORT;
|
|
else m_resendTime = RESEND_0;
|
|
// save for checking for overflow
|
|
int32_t tt = m_resendTime;
|
|
// 30 ms resend time for starters for high priority slots
|
|
m_resendTime *= ( 1 << m_resendCount );
|
|
// watch out for overflow
|
|
if ( m_resendTime < tt ) m_resendTime = max;
|
|
// don't exceed the max, though of .4 seconds
|
|
if ( m_resendTime > max ) m_resendTime = max;
|
|
// quick and somewhat incorrect overflow check
|
|
if ( m_resendTime <= 0 ) m_resendTime = max;
|
|
}
|
|
else {
|
|
int32_t base = RESEND_1;
|
|
if ( isLocal ) base = RESEND_1_LOCAL;
|
|
m_resendTime = base * ( 1 << m_resendCount );
|
|
// watch out for overflow
|
|
if ( m_resendTime < base ) m_resendTime = max;
|
|
//try to prevent everyone from syncing up on
|
|
//a bogged down host when spidering.
|
|
m_resendTime += rand() % m_resendTime;
|
|
// don't exceed the max, though of 30 seconds
|
|
if ( m_resendTime > max ) m_resendTime = max;
|
|
// quick and somewhat incorrect overflow check
|
|
if ( m_resendTime <= 0 ) m_resendTime = max;
|
|
}
|
|
// add a rand amount of time to avoid collisions with
|
|
// other streams that will probably resend, max of 6 ms
|
|
//m_resendTime += s_incDelay;
|
|
// . inc up to 6 ms
|
|
// . this was a rand() statement, but that's not async signal safe
|
|
//if ( ++s_incDelay > 6 ) s_incDelay = 0;
|
|
// if we're dns protocol, always use resendTime of 4 seconds
|
|
if ( ! m_proto->useAcks() ) m_resendTime = 4000;
|
|
}
|
|
|
|
// . returns values:
|
|
// . -2 if nothing to send
|
|
// . -1 on error,
|
|
// . 0 if blocked,
|
|
// . 1 if completed sending a datagram/ACK
|
|
// . sets g_errno on error
|
|
// . this is only called by UdpServer::doSending()
|
|
// . we try to do ALL the reading before calling this so we can send
|
|
// many ACKs back in one packet
|
|
int32_t UdpSlot::sendDatagramOrAck ( int sock, bool allowResends, int64_t now ){
|
|
//log("sendDatagramOrAck");
|
|
// if acks we've sent isn't caught up to what we read, send an ack
|
|
if ( m_sentAckBitsOn < m_readBitsOn && m_proto->useAcks() )
|
|
return sendAck ( sock , now );
|
|
// we may have received an ack for an implied resend (from ack gap)
|
|
// so we clear some bits, but then got an ACK back later
|
|
while ( m_nextToSend < m_dgramsToSend &&
|
|
isOn ( m_nextToSend , m_sentBits2 ) )
|
|
m_nextToSend++;
|
|
// if we've sent it all return -2
|
|
if ( m_sentBitsOn >= m_dgramsToSend ) return -2;
|
|
// or if we hit the end of the road, but m_sentBitsOn is not full,
|
|
// then m_nextToSend must have been too high
|
|
if ( m_nextToSend >= m_dgramsToSend ) {
|
|
log(LOG_LOGIC,
|
|
"udp: senddatagramorack: m_nextToSend=%" INT32 " >= %" INT32 ". "
|
|
"Fixing it. Do not panic.",
|
|
m_nextToSend , m_dgramsToSend );
|
|
fixSlot();
|
|
return 1;
|
|
}
|
|
// get the ip
|
|
int32_t ip = m_ip;
|
|
// . if this is a send to our ip use the loopback interface
|
|
// . MTU is very high here
|
|
//if ( !g_conf.m_interfaceMachine && m_ip == g_hostdb.getMyIp() )
|
|
//if ( !g_conf.m_interfaceMachine && g_hostdb.isMyIp(m_ip) )
|
|
if ( g_hostdb.isMyIp(m_ip) )
|
|
ip = g_hostdb.getLoopbackIp();
|
|
// pick a dgram to send
|
|
int32_t dgramNum = m_nextToSend;
|
|
// debug msg
|
|
//log("setDgram");
|
|
// . store dgram #dgramNum from this send buf into "dgram"
|
|
// . let the protocol set the dgram from the m_sendBuf for us
|
|
char buf [ DGRAM_SIZE_CEILING ];
|
|
// should hold all headers
|
|
char saved [ 32 ];
|
|
// the header size
|
|
int32_t headerSize = m_proto->getHeaderSize(0);
|
|
// bitch if too big
|
|
if ( headerSize > 32 ) {
|
|
log(LOG_LOGIC,"udp: senddatagramorack: header size of %" INT32 " "
|
|
"is bigger than 32.",headerSize); return -1; }
|
|
// . now from here on we only use headerSize so we can strip the header
|
|
// . so if the protocol wants the headers, leave them in...
|
|
if ( ! m_proto->stripHeaders() ) headerSize = 0;
|
|
// offset into send buffer, the data to send
|
|
int32_t offset = dgramNum * ( m_maxDgramSize - headerSize );
|
|
// what should we send, and how much?
|
|
char *send = m_sendBuf + offset;
|
|
int32_t sendSize = m_sendBufSize - offset;
|
|
// truncate to max size of dgram we're allowed
|
|
if ( sendSize > m_maxDgramSize - headerSize )
|
|
sendSize = m_maxDgramSize - headerSize;
|
|
// where to store the dgram, header and data, assume "buf"
|
|
char *dgram = buf;
|
|
// size of dgram, header and data
|
|
int32_t dgramSize = headerSize + sendSize;
|
|
// if we're NOT the 1st dgram we can store into send buf directly
|
|
if ( dgramNum != 0 ) {
|
|
// where to store the header? right into send buf
|
|
dgram = send - headerSize;
|
|
// but save before overwriting
|
|
memcpy_ass ( saved , dgram , headerSize );
|
|
}
|
|
// store header into "dgram"
|
|
m_proto->setHeader ( dgram ,
|
|
m_sendBufSize ,
|
|
m_msgType ,
|
|
dgramNum ,
|
|
m_transId ,
|
|
m_callback , // weInitiated?
|
|
m_localErrno , // hadError?
|
|
m_niceness );
|
|
// . if we're the first dgram, we can't back up for the header...
|
|
// . copy data into dgram if we're the 1st dgram
|
|
if ( dgramNum == 0 )
|
|
memcpy_ass ( dgram + headerSize , send , sendSize );
|
|
//log("done set");
|
|
|
|
// if we are the proxy sending a udp packet to our flock, then make
|
|
// sure that we send to tmp cluster if we should
|
|
if ( g_proxy.isProxy() && g_conf.m_useTmpCluster && m_host )
|
|
m_port = m_host->m_port + 1;
|
|
else if ( m_host )
|
|
m_port = m_host->m_port ;
|
|
|
|
// we need a destination stored in a sockaddr for passing to sendto()
|
|
// get sending info from the send control slot (network order)
|
|
// TODO: ensure network order
|
|
struct sockaddr_in to;
|
|
to.sin_family = AF_INET;
|
|
// never use shotgun network if turned off...
|
|
if ( ! g_conf.m_useShotgun ) s_shotgunBit = 0;
|
|
// i am turning this flip flop stuff off for now and using the
|
|
// shotgun network as an emergency back up (see below for "shotgun")
|
|
// because now since we are fully split we have no need for huge
|
|
// amount of internal bandwidth and besides that it was very cpu
|
|
// intensive to send dgrams because the kernel sucks for that! MDW
|
|
s_shotgunBit = 0;
|
|
//to.sin_addr.s_addr = htonl ( (uint32_t ) (m_ip ) );
|
|
// are we sending to loopback? if so, treat as eth0.
|
|
if ( ip == 0x0100007f ) { // "127.0.0.1"
|
|
to.sin_addr.s_addr = ip;
|
|
to.sin_port = htons ( m_port );
|
|
// update stats, just put them all in g_udpServer
|
|
g_udpServer.m_eth0PacketsOut += 1;
|
|
g_udpServer.m_eth0BytesOut += dgramSize;
|
|
}
|
|
// . shotgun toggle this
|
|
// . do not do shotgun if sending to host in hosts2.conf
|
|
else if ( m_host && s_shotgunBit && m_host->m_hostdb == &g_hostdb ) {
|
|
to.sin_addr.s_addr = m_host->m_ipShotgun;
|
|
to.sin_port = htons ( m_port );
|
|
|
|
// don't fuck with it if we are ping though, because that
|
|
// needs to specify the exact ip!
|
|
if ( m_msgType == 0x11 ) to.sin_addr.s_addr = ip;
|
|
|
|
//m_host->m_shotgunBit = 0;
|
|
s_shotgunBit = 0;
|
|
// update stats, just put them all in g_udpServer
|
|
g_udpServer.m_eth1PacketsOut += 1;
|
|
g_udpServer.m_eth1BytesOut += dgramSize;
|
|
}
|
|
// do not do shotgun if sending to host in hosts2.conf
|
|
else if ( m_host && m_host->m_hostdb == &g_hostdb ) {
|
|
// we now pick ip based on this. if we fail to get a timely ACK
|
|
// then we set switch eth preferences. helps when a switch
|
|
// crashes.
|
|
if ( m_preferEth == 1 )
|
|
to.sin_addr.s_addr = m_host->m_ipShotgun;
|
|
else
|
|
to.sin_addr.s_addr = m_host->m_ip;
|
|
|
|
// don't fuck with it if we are ping though, because that
|
|
// needs to specify the exact ip!
|
|
if ( m_msgType == 0x11 ) to.sin_addr.s_addr = ip;
|
|
|
|
to.sin_port = htons ( m_port );
|
|
//if ( m_host ) m_host->m_shotgunBit = 1;
|
|
//if ( m_host ) s_shotgunBit = 1;
|
|
// flip the network every other packet we send
|
|
s_shotgunBit = 1;
|
|
// update stats, just put them all in g_udpServer
|
|
g_udpServer.m_eth0PacketsOut += 1;
|
|
g_udpServer.m_eth0BytesOut += dgramSize;
|
|
}
|
|
else {
|
|
// count packets to/from hosts outside the cluster separately
|
|
// these guys are importing link text usually
|
|
to.sin_addr.s_addr = ip;
|
|
to.sin_port = htons ( m_port );
|
|
g_udpServer.m_outsiderPacketsOut += 1;
|
|
g_udpServer.m_outsiderBytesOut += dgramSize;
|
|
}
|
|
|
|
// not async signal safe
|
|
//bzero ( &(to.sin_zero) , 8 );
|
|
memset_ass ( (char *)&(to.sin_zero), 0 , 8 );
|
|
// debug msg
|
|
//log("sendto");
|
|
// debug msg
|
|
//log("sending dgram of size=%" INT32 " (max=%" INT32 ")",dgramSize,m_maxDgramSize);
|
|
// . this socket should be non-blocking (i.e. return immediately)
|
|
// . this should set g_errno on error!
|
|
int bytesSent = sendto ( sock ,
|
|
dgram ,
|
|
dgramSize ,
|
|
0 , // makes dns fail->MSG_DONTROUTE ,
|
|
(struct sockaddr *)&to ,
|
|
sizeof ( to ) );
|
|
// restore what we overwrote
|
|
if ( dgramNum != 0 ) memcpy_ass ( dgram , saved , headerSize );
|
|
// debug msg
|
|
//log("back");
|
|
// return -1 on error or 0 if blocked
|
|
if ( bytesSent < 0 ) {
|
|
// copy errno to g_errno
|
|
g_errno = errno;
|
|
if ( g_errno == EAGAIN ) { g_errno = 0; return 0;}
|
|
#ifdef _VALGRIND_
|
|
// interrupted system call?
|
|
if ( g_errno == 4 ) { g_errno = 0; return 0; }
|
|
#endif
|
|
// not in linux
|
|
// . "output queue for a network interface was full"
|
|
// . however, linux just silently drops packets!!!!!!!
|
|
// . i think using more than 1GB in this process brings this
|
|
// problem up, the kernel's kmalloc fails...
|
|
if ( g_errno == ENOBUFS ) {
|
|
// log it once every 3 seconds so they know
|
|
static int32_t s_lastTime = 0;
|
|
static int32_t s_count = 0;
|
|
int32_t t = getTime();
|
|
if ( t - s_lastTime > 3 ||
|
|
s_lastTime - t > 3 ) { // clock skew?
|
|
s_lastTime = getTime();
|
|
log("udp: got ENOBUFS kernel bug %" INT32 " times.",
|
|
++s_count);
|
|
}
|
|
//g_errno = 0;
|
|
//return 0;
|
|
return -1;
|
|
}
|
|
// log the error
|
|
log("udp: Call to sendto had error (ignoring): %s.",
|
|
mstrerror(g_errno)) ;
|
|
// . now immediately switch the eth port to see if that helps!
|
|
// . actually, just pretend we sent it. we won't get an ack
|
|
// and the resend algo will switch ports
|
|
//return -1;
|
|
bytesSent = dgramSize;
|
|
}
|
|
// this should not happen
|
|
if ( bytesSent != dgramSize ) {
|
|
g_errno = EBADENGINEER;
|
|
log("udp: sendto only sent %i bytes, not %" INT32 ". Undersend.",
|
|
bytesSent,dgramSize);
|
|
return -1;
|
|
}
|
|
// general count
|
|
if ( m_niceness == 0 ) g_stats.m_packetsOut[m_msgType][0]++;
|
|
else g_stats.m_packetsOut[m_msgType][1]++;
|
|
// keep stats
|
|
if ( m_host ) m_host->m_dgramsTo++;
|
|
// sending to loopback, 127.0.0.1?
|
|
else if ( ip == 0x0100007f ) g_hostdb.getMyHost()->m_dgramsTo++;
|
|
// keep track of dgrams sent outside of our cluster
|
|
//else g_stats.m_dgramsToStrangers++;
|
|
// get time now
|
|
//int64_t now = gettimeofdayInMilliseconds();
|
|
// . if it's our first, mark this for g_stats UDP_*_OUT_BPS
|
|
// . sendSetup() will set m_firstSendTime to -1
|
|
if (m_sentBitsOn == 0 && m_firstSendTime == -1) m_firstSendTime =now;
|
|
// mark this dgram as sent
|
|
setBit ( dgramNum , m_sentBits2 );
|
|
// count the bit we lit
|
|
m_sentBitsOn++;
|
|
// update last send time stamp even if we're a resend
|
|
m_lastSendTime = now;
|
|
// update m_nextToSend
|
|
m_nextToSend = getNextUnlitBit ( dgramNum, m_sentBits2,m_dgramsToSend);
|
|
// log network info
|
|
if ( g_conf.m_logDebugUdp ) {
|
|
//int32_t shotgun = 0;
|
|
//if ( g_conf.m_useShotgun && ! s_shotgunBit ) shotgun = 1;
|
|
//if ( g_conf.m_useShotgun && s_useShotgunIp ) shotgun = 1;
|
|
int32_t eth = 1;
|
|
if ( m_host && m_host->m_ip == to.sin_addr.s_addr ) eth = 0;
|
|
// if sending outside, always use eth0
|
|
if ( ! m_host ) eth = 0;
|
|
//if ( m_host->m_ip == (uint32_t)ip ) eth = 0;
|
|
int32_t hid = -1;
|
|
if ( m_host && m_host->m_hostdb == &g_hostdb )
|
|
hid = m_host->m_hostId;
|
|
//#ifdef _UDPDEBUG_
|
|
//if ( ! m_proto->useAcks() ) {
|
|
int32_t kk = 0; if ( m_callback ) kk = 1;
|
|
log(LOG_DEBUG,
|
|
"udp: sent dgram "
|
|
"dgram=%" INT32 " "
|
|
"dgrams=%" INT32 " "
|
|
"msg=0x%hx "
|
|
"tid=%" INT32 " "
|
|
"dst=%s:%hu "
|
|
"eth=%" INT32 " "
|
|
"init=%" INT32 " "
|
|
"age=%" INT32 " "
|
|
"dsent=%" INT32 " "
|
|
"aread=%" INT32 " "
|
|
"len=%" INT32 " "
|
|
"msgSz=%" INT32 " "
|
|
"cnt=%" INT32 " "
|
|
"wait=%" INT32 " "
|
|
"error=%" INT32 " "
|
|
"k.n1=%" UINT32 " n0=%" UINT64 " "
|
|
"maxdgramsz=%" INT32 " "
|
|
"hid=%" INT32 "",
|
|
(int32_t)dgramNum,
|
|
(int32_t)m_dgramsToSend,
|
|
(int16_t)m_msgType,
|
|
(int32_t)m_transId,
|
|
//iptoa(m_ip),//+9,
|
|
iptoa(to.sin_addr.s_addr),
|
|
(uint16_t)m_port,
|
|
eth,//shotgun,
|
|
(int32_t)kk ,
|
|
(int32_t)(now-m_startTime) ,
|
|
(int32_t)m_sentBitsOn ,
|
|
(int32_t)m_readAckBitsOn ,
|
|
(int32_t)bytesSent ,
|
|
(int32_t)m_sendBufSize,
|
|
(int32_t)m_resendCount,
|
|
(int32_t)m_resendTime ,
|
|
(int32_t)m_localErrno ,
|
|
m_key.n1,m_key.n0 ,
|
|
m_maxDgramSize ,
|
|
hid );
|
|
//}
|
|
//#endif
|
|
}
|
|
// bail now if we're a re-send
|
|
if ( m_resendCount > 0 ) return 1;
|
|
// to save UdpSlot mem we only track every 8th dgram
|
|
if ( (dgramNum & 0x07) != 0 ) return 1;
|
|
// set the time
|
|
//setSendTime ( dgramNum >> 3 , now );
|
|
// return 1 cuz we didn't block
|
|
return 1;
|
|
}
|
|
|
|
// assume m_readBits2, m_sendBits2, m_sentAckBits2 and m_readAckBits2 are
|
|
// correct and update m_firstUnlitSentAckBit, m_sentAckBitsOn, m_readBitsOn,
|
|
// m_readAckBitsOn and m_sentBitsOn
|
|
void UdpSlot::fixSlot ( ) {
|
|
// log it
|
|
log(LOG_LOGIC,
|
|
"udp: before fixSlot(): "
|
|
"m_readBitsOn=%" INT32 " "
|
|
"m_readAckBitsOn=%" INT32 " "
|
|
"m_sentBitsOn=%" INT32 " "
|
|
"m_sentAckBitsOn=%" INT32 " "
|
|
"m_firstUnlitSentAckBit=%" INT32 " "
|
|
"m_nextToSend=%" INT32 " " ,
|
|
m_readBitsOn,
|
|
m_readAckBitsOn,
|
|
m_sentBitsOn,
|
|
m_sentAckBitsOn,
|
|
m_firstUnlitSentAckBit,
|
|
m_nextToSend );
|
|
|
|
m_readBitsOn = 0;
|
|
m_readAckBitsOn = 0;
|
|
m_sentBitsOn = 0;
|
|
m_sentAckBitsOn = 0;
|
|
for ( int32_t i = 0 ; i < m_dgramsToRead ; i++ ) {
|
|
if ( isOn ( i , m_readBits2 ) ) m_readBitsOn++;
|
|
// we send back an ack for every dgram read
|
|
if ( isOn ( i , m_sentAckBits2 ) ) m_sentAckBitsOn++;
|
|
}
|
|
for ( int32_t i = 0 ; i < m_dgramsToSend ; i++ ) {
|
|
if ( isOn ( i , m_sentBits2 ) ) m_sentBitsOn++;
|
|
// we must read an ack for every dgram sent
|
|
if ( isOn ( i , m_readAckBits2 ) ) m_readAckBitsOn++;
|
|
}
|
|
|
|
// start at bit #0 so this doesn't loop forever
|
|
m_firstUnlitSentAckBit=getNextUnlitBit(-1,m_sentAckBits2,m_dgramsToRead);
|
|
m_nextToSend =getNextUnlitBit(-1,m_sentBits2 ,m_dgramsToSend);
|
|
|
|
log(LOG_LOGIC,
|
|
"udp: after fixSlot(): "
|
|
"m_readBitsOn=%" INT32 " "
|
|
"m_readAckBitsOn=%" INT32 " "
|
|
"m_sentBitsOn=%" INT32 " "
|
|
"m_sentAckBitsOn=%" INT32 " "
|
|
"m_firstUnlitSentAckBit=%" INT32 " "
|
|
"m_nextToSend=%" INT32 " " ,
|
|
m_readBitsOn,
|
|
m_readAckBitsOn,
|
|
m_sentBitsOn,
|
|
m_sentAckBitsOn,
|
|
m_firstUnlitSentAckBit,
|
|
m_nextToSend );
|
|
}
|
|
|
|
// . this should be called only after read poll has nothing left to read so
|
|
// we can combine many ACKs into one mega ACK and save packets per second
|
|
// on the network (reduce by half?)
|
|
// . returns values:
|
|
// . -2 if nothing to send
|
|
// . -1 on error,
|
|
// . 0 if blocked,
|
|
// . 1 if completed sending a datagram/ACK
|
|
// . if we Initiated is the default -2, then we use m_callback to determine
|
|
// if we initiated the transaction or not
|
|
// . if m_callback is NULL we did NOT intiate the transaction
|
|
// . we should only be called if m_sentAckBitsOn < m_readBitsOn, i.e.
|
|
// when we're not caught up with ACKing with what we've read
|
|
int32_t UdpSlot::sendAck ( int sock , int64_t now ,
|
|
int32_t dgramNum , int32_t weInitiated ,
|
|
bool cancelTrans ) {
|
|
// protection from garbled dgrams
|
|
if ( dgramNum >= MAX_DGRAMS ) {
|
|
log(LOG_LOGIC,
|
|
"udp: Sending ack for dgram #%" INT32 " > max dgram of %" INT32 ".",
|
|
dgramNum,(int32_t)MAX_DGRAMS); return 1; }
|
|
// remember if forced or not
|
|
//int32_t forced = dgramNum;
|
|
// if this was not supplied, look at m_callback to determine it
|
|
if ( weInitiated == -2 ) {
|
|
if ( m_callback ) weInitiated = 1;
|
|
else weInitiated = 0;
|
|
}
|
|
// a little dgram buffer
|
|
char dgram[DGRAM_SIZE_CEILING];
|
|
|
|
// . if dgramNum is -1, send the next ack in line
|
|
// . it's the first bit in m_sentAckBits2 that is 0 while being
|
|
// lit in m_readBits
|
|
if ( dgramNum == -1 ) {
|
|
// m_firstUnlitSentAckBit is the first clr bit in m_sentAckBits
|
|
dgramNum = m_firstUnlitSentAckBit;
|
|
// . now find the first bit in m_sentAckBits2 that is off
|
|
// yet on in m_readBits
|
|
// . the OLD statement below didn't check to see if dgramNum is
|
|
// then off in m_sentAckBits!!!
|
|
// . let's do it custom then!
|
|
// . we know that m_sentAckBitsOn < m_readBitsOn so the
|
|
// we must find a bit with these properties
|
|
for ( ; dgramNum < m_dgramsToRead ; dgramNum++ ) {
|
|
// if bit off in m_readBits2, it's not an ACK candidate
|
|
if(!isOn(dgramNum,m_readBits2))continue;
|
|
// if bit is off in m_sentAckBits2, that's the one!
|
|
if(!isOn(dgramNum,m_sentAckBits2))break;
|
|
}
|
|
// if we had no match, that's an error!
|
|
if ( dgramNum >= m_dgramsToRead ) {
|
|
log(LOG_LOGIC,
|
|
"udp: Sending ack for dgram #%" INT32 " which is passed "
|
|
"the number of dgrams we have to read, %" INT32 ". "
|
|
"Fixing. Do not panic.",
|
|
dgramNum , m_dgramsToRead );
|
|
fixSlot();
|
|
//char *xx = NULL; *xx = 0;
|
|
//sleep(50000);
|
|
//return -1;
|
|
return -1;
|
|
}
|
|
}
|
|
// . ask the protocol class to make an ACK for us and store in "dgram"
|
|
// . we initiated the transaction if our callback is non-NULL
|
|
int32_t dgramSize = m_proto->makeAck ( dgram ,
|
|
dgramNum ,
|
|
m_transId ,
|
|
weInitiated ,
|
|
cancelTrans );
|
|
// get the ip
|
|
uint32_t ip = m_ip;
|
|
// . if this is a send to our ip use the loopback interface
|
|
// . MTU is very high here
|
|
//if ( !g_conf.m_interfaceMachine &&
|
|
// m_ip == g_hostdb.getMyIp() )
|
|
//if ( !g_conf.m_interfaceMachine && g_hostdb.isMyIp(m_ip) )
|
|
if ( g_hostdb.isMyIp(m_ip) )
|
|
ip = g_hostdb.getLoopbackIp();
|
|
|
|
// if we are the proxy sending a udp packet to our flock, then make
|
|
// sure that we send to tmp cluster if we should
|
|
if ( g_proxy.isProxy() && g_conf.m_useTmpCluster && m_host )
|
|
m_port = m_host->m_port + 1;
|
|
else if ( m_host )
|
|
m_port = m_host->m_port ;
|
|
|
|
// get the ip address of dest. host from the slot
|
|
struct sockaddr_in to;
|
|
to.sin_family = AF_INET;
|
|
to.sin_addr.s_addr = ip;
|
|
to.sin_port = htons ( m_port );
|
|
|
|
// . respect the shotgun. ping though does not! (0x11)
|
|
// . NO! send ack back on the same eth port as the request we recvd
|
|
//if ( m_host && m_msgType != 0x11 ) {
|
|
// // send ack back on the same eth port as the request we recvd
|
|
// if ( m_host->m_preferEth == 1 )
|
|
// to.sin_addr.s_addr = m_host->m_ipShotgun;
|
|
// else
|
|
// to.sin_addr.s_addr = m_host->m_ip;
|
|
//}
|
|
|
|
// not async sig safe
|
|
//bzero ( &(to.sin_zero) , 8 );
|
|
memset_ass ( (char *)&(to.sin_zero), 0 , 8 );
|
|
// stat count
|
|
if ( cancelTrans ) g_cancelAcksSent++;
|
|
// . this socket should be non-blocking (i.e. return immediately)
|
|
// . this should set g_errno on error
|
|
int bytesSent = sendto ( sock ,
|
|
dgram ,
|
|
dgramSize ,
|
|
0 ,
|
|
(struct sockaddr *)&to ,
|
|
sizeof ( to ) );
|
|
// return -1 on error, 0 if blocked
|
|
if ( bytesSent < 0 ) {
|
|
// copy errno to g_errno
|
|
g_errno = errno;
|
|
if ( g_errno == EAGAIN ) { g_errno = 0; return 0; }
|
|
if ( g_errno == ENOBUFS ) { g_errno = 0; return 0; }
|
|
#ifdef _VALGRIND_
|
|
// interrupted system call
|
|
if ( g_errno == 4 ) { g_errno = 0; return 0; }
|
|
#endif
|
|
log("udp: error sending ack: %s",mstrerror(g_errno));
|
|
return -1;
|
|
}
|
|
// this should not happen
|
|
if ( bytesSent != dgramSize ) {
|
|
g_errno = EBADENGINEER;
|
|
log("udp: sendto only sent %i bytes, not %" INT32 ". Undersend.",
|
|
bytesSent,dgramSize);
|
|
//sleep(50000);
|
|
return -1;
|
|
}
|
|
// general count
|
|
if ( m_niceness == 0 ) g_stats.m_packetsOut[m_msgType][0]++;
|
|
else g_stats.m_packetsOut[m_msgType][1]++;
|
|
// we were an ack
|
|
if ( m_niceness == 0 ) g_stats.m_acksOut[m_msgType][0]++;
|
|
else g_stats.m_acksOut[m_msgType][1]++;
|
|
// keep stats
|
|
if ( m_host ) m_host->m_dgramsTo++;
|
|
// sending to loopback, 127.0.0.1?
|
|
else if ( ip == 0x0100007f ) g_hostdb.getMyHost()->m_dgramsTo++;
|
|
if ( ! isOn ( dgramNum , m_sentAckBits2 ) ) {
|
|
// mark this ack as sent
|
|
setBit ( dgramNum , m_sentAckBits2 );
|
|
// count the bit we lit
|
|
m_sentAckBitsOn++;
|
|
}
|
|
// update last send time stamp even if we're a resend
|
|
m_lastSendTime = now; // gettimeofdayInMilliseconds();
|
|
// . dgramNum should neveber <, though
|
|
// . but this can happen if we're hot (signal handler)??? how???
|
|
if ( dgramNum < m_firstUnlitSentAckBit ) {
|
|
g_errno = EBADENGINEER;
|
|
log(LOG_LOGIC,
|
|
"udp: Sending ack for dgram #%" INT32 " which should have "
|
|
"already been sent. Next ack to send should be for dgram "
|
|
"# %" INT32 ". Fixing. Do not panic.",
|
|
dgramNum , m_firstUnlitSentAckBit );
|
|
//char *xx = NULL; *xx = 0;
|
|
fixSlot();
|
|
return 1;
|
|
}
|
|
// . only update m_firstUnlitSentAckBit if we dgramNum was
|
|
// the first unlit bit in m_sentAckBits
|
|
// . otherwise, we had a read hole so we had to skip dgramNum around
|
|
if (dgramNum <= m_firstUnlitSentAckBit)
|
|
m_firstUnlitSentAckBit = getNextUnlitBit(dgramNum,
|
|
m_sentAckBits2,
|
|
m_dgramsToRead);
|
|
// log msg
|
|
if ( g_conf.m_logDebugUdp ) { // || cancelTrans ) {
|
|
//#ifdef _UDPDEBUG_
|
|
int32_t kk = 0; if ( m_callback ) kk = 1;
|
|
int32_t hid = -1;
|
|
if ( m_host && m_host->m_hostdb == &g_hostdb )
|
|
hid = m_host->m_hostId;
|
|
logf(LOG_DEBUG,
|
|
"udp: sent ACK "
|
|
"dgram=%" INT32 " "
|
|
"msg=0x%hx "
|
|
"tid=%" INT32 " "
|
|
"src=%s:%hu "
|
|
"init=%" INT32 " "
|
|
"age=%" INT32 " "
|
|
"cancel=%" INT32 " "
|
|
"dread=%" INT32 " "
|
|
"asent=%" INT32 " "
|
|
"hid=%" INT32 "",
|
|
(int32_t)dgramNum,
|
|
(int16_t)m_msgType ,
|
|
(int32_t)m_transId,
|
|
iptoa(m_ip),//+9 ,
|
|
(uint16_t)m_port,
|
|
(int32_t)kk ,
|
|
(int32_t)(gettimeofdayInMilliseconds() - m_startTime) ,
|
|
(int32_t)cancelTrans,
|
|
(int32_t)m_readBitsOn ,
|
|
(int32_t)m_sentAckBitsOn ,
|
|
hid);
|
|
//#endif
|
|
}
|
|
return 1;
|
|
}
|
|
|
|
// . returns false and sets g_errno on error, true otherwise
|
|
// . if the read dgram had an error code we set g_errno to that and ret false
|
|
// . anyone calling this should call sendDatagramOrAck() immediately afterwards
|
|
// in case the send was blocking on receiving an ACK or we should send an ACK
|
|
// . updates: m_readBits2, m_readBitsOn, m_sentAckBits2, m_sentAckBitsOn
|
|
// m_firstUnlitSentAckBit
|
|
bool UdpSlot::readDatagramOrAck ( int sock ,
|
|
char *peek ,
|
|
int32_t peekSize,
|
|
int64_t now ,
|
|
bool *discard ,
|
|
int32_t *readSize ) {
|
|
// assume discard
|
|
*discard = true;
|
|
// get dgram Number
|
|
int32_t dgramNum = m_proto->getDgramNum ( peek , peekSize );
|
|
// protection from garbled dgrams
|
|
if ( dgramNum >= MAX_DGRAMS ) {
|
|
log(LOG_LOGIC,
|
|
"udp: Reading for dgram #%" INT32 " > max dgram of %" INT32 ".",
|
|
dgramNum,(int32_t)MAX_DGRAMS);
|
|
return true;
|
|
}
|
|
// was it a cancel signal?
|
|
if ( m_proto->isCancelTrans ( peek , peekSize ) ) {
|
|
//if ( g_conf.m_logDebugUdp )
|
|
//logf(LOG_INFO,//LOG_DEBUG,
|
|
log(LOG_DEBUG,
|
|
"udp: Read cancel ack hdrlen=%" INT32 " tid=%" INT32 " "
|
|
"src=%s:%hu msgType=0x%hhx weInitiated=%" PTRFMT " "
|
|
"sent=%" INT32 " "
|
|
"sendbufalloc=%" PTRFMT " sendbufsize=%" UINT32 "",
|
|
peekSize , m_proto->getTransId ( peek,peekSize ),
|
|
iptoa(m_ip),m_port,
|
|
m_proto->getMsgType(peek,peekSize),
|
|
(PTRTYPE)m_callback,
|
|
m_sentBitsOn,
|
|
(PTRTYPE)m_sendBufAlloc,
|
|
(uint32_t)m_sendBufSize);
|
|
// stat count
|
|
g_cancelAcksRead++;
|
|
// what happens is that if we are handling a request and we
|
|
// try to send back the reply on this slot, it will have been
|
|
// destroyed by a call to makeCallbacks(). but really the
|
|
// purpose is to avoid sending large termlists back and
|
|
// wasting network bandwidth, so let's just avoid this if
|
|
// we are not IN THE MIDDLE OF doing a large send. when we
|
|
// start the send it will probably send us another cancel ack
|
|
// and we can abort it then. before, this was causing Msg20
|
|
// to crash because the requester would send us a cancel ack
|
|
// and destroy the slot that msg20 would try to send its reply
|
|
// on. It's reply was delayed and when it finally came round
|
|
// the slot was destroyed...
|
|
// hey, m_sentBitsOn can be non-zero even if we haven't sent
|
|
// anything because m_sentBits[i] gets forced on below if we
|
|
// read an ack...
|
|
if ( m_sentBitsOn <= 0 ) return true;
|
|
// sometimes it points to a separate send buffer
|
|
//if ( ! m_sendBufAlloc ) return true;
|
|
// msg1 sends back an empty reply (0 bytes) so we need to
|
|
// check m_resendCount as well, because if we have never
|
|
// generated a reply it should be 0! we are having problems
|
|
// with acks getting dropped on the floor and the reply
|
|
// keeps getting re-sent over and over, and the received
|
|
// cancel acks are ignore because msg1 has a 0 byte reply...
|
|
if ( ! m_sendBufSize && m_resendCount<=0 ) return true;
|
|
// record if we cancelled it. how many cancel acks we read!
|
|
if ( m_niceness == 0 ) g_stats.m_cancelRead[m_msgType][0]++;
|
|
else g_stats.m_cancelRead[m_msgType][1]++;
|
|
// force to be done so UdpServer::makeCallback() will close it
|
|
m_dgramsToRead = 1;
|
|
m_dgramsToSend = 1;
|
|
m_readBitsOn = 1;
|
|
m_sentBitsOn = 1;
|
|
m_readAckBitsOn = 1;
|
|
m_sentAckBitsOn = 1;
|
|
// assume the receiver ran out of memory
|
|
m_errno = ENOMEM;
|
|
return true;
|
|
}
|
|
// handle acks
|
|
if ( m_proto->isAck ( peek , peekSize ) ) {
|
|
// if ack for msg4 core to test its save stuff
|
|
//if ( m_msgType == 0x04 ) { char *xx=NULL;*xx=0; }
|
|
readAck ( sock, dgramNum , now );
|
|
// keep stats
|
|
if ( m_host ) m_host->m_dgramsFrom++;
|
|
// reading from loopback, 127.0.0.1?
|
|
else if (m_ip==0x0100007f)g_hostdb.getMyHost()->m_dgramsFrom++;
|
|
return true;
|
|
}
|
|
// . now we have a regular dgram to process
|
|
// . get the timestamp in microseconds
|
|
// . change to g_now cuz this has to be async safe
|
|
//int64_t now = gettimeofdayInMilliseconds();
|
|
// log msg
|
|
if ( g_conf.m_logDebugUdp ) {
|
|
int32_t hid = -1;
|
|
if ( m_host && m_host->m_hostdb == &g_hostdb )
|
|
hid = m_host->m_hostId;
|
|
//#ifdef _UDPDEBUG_
|
|
//if ( ! m_proto->useAcks() ) {
|
|
int32_t kk = 0; if ( m_callback ) kk = 1;
|
|
log(LOG_DEBUG,
|
|
"udp: Read dgram "
|
|
"dgram=%" INT32 " "
|
|
"msg=0x%hx "
|
|
"tid=%" INT32 " "
|
|
"src=%s:%hu "
|
|
"init=%" INT32 " "
|
|
"age=%" INT32 " "
|
|
"dread=%" INT32 " "
|
|
"asent=%" INT32 " "
|
|
//"len=%" INT32 " "
|
|
"msgSz=%" INT32 " "
|
|
"error=%" INT32 " "
|
|
"hid=%" INT32 "",
|
|
(int32_t)dgramNum,
|
|
(int16_t)m_proto->getMsgType(peek,peekSize),
|
|
(int32_t)m_proto->getTransId(peek,peekSize),
|
|
iptoa(m_ip),
|
|
(uint16_t)m_port,
|
|
(int32_t)kk,
|
|
(int32_t)(gettimeofdayInMilliseconds() - m_startTime) ,
|
|
(int32_t)m_readBitsOn ,
|
|
(int32_t)m_sentAckBitsOn ,
|
|
//(int32_t)peekSize ,
|
|
(int32_t)m_proto->getMsgSize(peek,peekSize) ,
|
|
(int32_t)(m_proto->hadError(peek,peekSize)),
|
|
hid);
|
|
// }
|
|
//#endif
|
|
}
|
|
// update time of last read
|
|
m_lastReadTime = g_now;
|
|
// if it's passing us an g_errno then set our g_errno from it
|
|
if ( m_proto->hadError ( peek , peekSize ) ) {
|
|
// bitch if not dgramNum #0
|
|
if ( dgramNum != 0 )
|
|
log(LOG_LOGIC,"udp: Error dgram is not dgram #0.");
|
|
// it's new to us, set the read bits
|
|
setBit ( dgramNum, m_readBits2 );
|
|
// we read one dgram
|
|
m_readBitsOn = 1;
|
|
// only one dgram to read
|
|
m_dgramsToRead = 1;
|
|
// tell caller we haven't read anything
|
|
m_readBufSize = 0;
|
|
// . but set the remote error bit so we know it's not local
|
|
// . why? this was messing up g_errno interp. in Multicast!
|
|
g_errno = m_proto->getErrno(peek,peekSize);//|REMOTE_ERROR_BIT;
|
|
// return false cuz this was a remote-side error
|
|
return false;
|
|
}
|
|
// . if he's sending a REPLY then set all of our m_readAckBits
|
|
// because he must have sent ACKs (or tried) for all dgrams in the
|
|
// request
|
|
// . did we initiate?
|
|
// . AND did we miss some ACKs he sent to us?
|
|
if ( m_callback && m_readAckBitsOn != m_dgramsToSend ) {
|
|
// catch em all up
|
|
for ( int32_t i = 0 ; i < m_dgramsToSend ; i++ )
|
|
setBit ( i , m_readAckBits2 );
|
|
m_readAckBitsOn = m_dgramsToSend;
|
|
if ( g_conf.m_logDebugUdp )
|
|
log(LOG_DEBUG,"udp: Cramming ACKs "
|
|
"tid=%" INT32 " "
|
|
"dst=%s:%hu" ,
|
|
(int32_t)m_transId ,
|
|
iptoa(m_ip),
|
|
(uint16_t)m_port);
|
|
}
|
|
|
|
// . if it's our first, mark this for g_stats UDP_*_IN_BPS
|
|
// . makeReadBuf will init m_firstReadTime to -1
|
|
//if (m_readBitsOn == 0 && m_firstReadTime == -1) m_firstReadTime =now;
|
|
// did we already receive this dgram?
|
|
if ( isOn(dgramNum,m_readBits2) ) {
|
|
// did we already send the ack for it?
|
|
if ( isOn(dgramNum,m_sentAckBits2) ) {
|
|
// clear the ack we sent for this so we send it again
|
|
clrBit ( dgramNum , m_sentAckBits2 );
|
|
// reduce lit bit count of sent acks
|
|
m_sentAckBitsOn--;
|
|
// update the next ack to send
|
|
if ( dgramNum < m_firstUnlitSentAckBit )
|
|
m_firstUnlitSentAckBit = dgramNum;
|
|
return true;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
// . copy the msg meat into our m_readBuf
|
|
// . how big is the dgram header?
|
|
int32_t headerSize = m_proto->getHeaderSize ( peek , peekSize );
|
|
// make it zero if proto wants them in m_readBuf
|
|
if ( ! m_proto->stripHeaders() ) headerSize = 0;
|
|
// . we store transId, size, type, etc. in the UdpSlot
|
|
// . we store the msg in it's pre-sent form (w/o dgram headers)
|
|
// . "maxDataSize" is max bytes of msg data per dgram (w/o hdr)
|
|
int32_t maxDataSize = m_maxDgramSize - headerSize;
|
|
int32_t offset = dgramNum * maxDataSize;
|
|
/*
|
|
// . this checks for undersends (dgrams with not enough data)
|
|
// . we set "size" to the space available in readBuf for dgram's data
|
|
// . we then truncate to maxDataSize in case it's too big
|
|
// . "size" should equal the msg (w/o hdr) in the dgram
|
|
// . return true on error
|
|
// NO: added the "dgramsToRead > 0" to allow underSends on 1 dgram msgs
|
|
int size = m_readBufSize - offset;
|
|
if ( size > maxDataSize ) size = maxDataSize;
|
|
if ( size != dgramSize - headerSize ) {
|
|
g_errno = EBADENGINEER;
|
|
// remove dgram from queue
|
|
discardDgram();
|
|
return log("UdpSlot::readDgram: read undersend") ;
|
|
}
|
|
// this checks for oversends, dgrams that fall outside our readBuf
|
|
if ( offset + dgramSize - headerSize > m_readBufSize ) {
|
|
g_errno = EBADENGINEER;
|
|
// remove dgram from queue
|
|
discardDgram();
|
|
return log("UdpSlot::readDgram: read buf overflow") ;
|
|
}
|
|
*/
|
|
|
|
// we'll read it ourselves, so tell caller not to read it
|
|
|
|
// . how many bytes should be in this dgram?
|
|
// . this will be -1 if unknown, but under a dgram's worth of bytes
|
|
// . -1 is used for the DNS protocol
|
|
int32_t msgSize = m_proto->getMsgSize ( peek , peekSize );
|
|
|
|
// if this is the first dgram then set this shit
|
|
if ( m_readBitsOn == 0 ) {
|
|
// how many dgrams are we reading for this msg?
|
|
m_dgramsToRead = m_proto->getNumDgrams(msgSize,m_maxDgramSize);
|
|
// set the msgType from the dgram header
|
|
m_msgType = m_proto->getMsgType ( peek , peekSize );
|
|
// how big is the msg? remember it
|
|
m_readBufSize = msgSize;
|
|
// . set the cback niceness
|
|
// . ONLY if slot is new! otherwise, we keep the sender's
|
|
// niceness. so if the slot niceness got converted by
|
|
// the handler we do not re-nice it on our end.
|
|
if ( ! m_sendBuf )
|
|
m_niceness = m_proto->isNice ( peek , peekSize );
|
|
}
|
|
|
|
// . if m_readBuf is NULL then init m_readBuf/m_readBufMaxSize big
|
|
// enough to hold "msgSize" bytes
|
|
// . if we are hot then do not call malloc but try to use m_tmpBuf
|
|
// . if we fail, return false and set g_errno
|
|
|
|
// . init m_readBuf, m_readBufMaxSize and m_dgramsToRed
|
|
// . only inits m_readBuf and m_readBufMaxSize if these are 0
|
|
//
|
|
// ERROR!!!! cannot call malloc() in a signal handler
|
|
// But now, IF WE'RE HOT, sendRequest() should pre-allocate m_readBuf
|
|
// and if we're reading in an incoming request then it cannot be bigger
|
|
// than the slot's m_quickBuf which we set m_readBuf to in
|
|
// makeReadBuf if we're hot...
|
|
//
|
|
|
|
// just use m_tmpBuf if our sendBuf is NULL and we are reading a
|
|
// small request. But Msg17 thinks this is allocated and tells Msg40
|
|
// to free it.
|
|
// i'm commenting this out to find the rdbtree corruption bug
|
|
//if ( ! m_sendBuf && ! m_readBuf &&
|
|
// msgSize < TMPBUFSIZE && m_msgType != 0x17 ) {
|
|
// m_readBuf = m_tmpBuf;
|
|
// m_readBufMaxSize = TMPBUFSIZE;
|
|
//}
|
|
|
|
// . this dgram should let us know how big the entire msg is
|
|
// . so allocate space for m_readBuf
|
|
// . we may already have a read buf if caller passed one in
|
|
retry:
|
|
if ( ! m_readBuf ) {
|
|
if ( ! makeReadBuf ( msgSize , m_dgramsToRead ) )
|
|
return log("udp: Failed to allocate %" INT32 " bytes to read "
|
|
"request or reply for udp socket.",msgSize);
|
|
// track down the mem leak.
|
|
// someone is not freeing their read buf!!
|
|
//logf(LOG_DEBUG,"udpslot alloc %" INT32 " at 0x%" XINT32 " msgType=%hhx",
|
|
// msgSize,m_readBuf,m_msgType);
|
|
}
|
|
|
|
// if we don't have enough room alloc a read buffer
|
|
if ( msgSize > m_readBufMaxSize ) {
|
|
// now we must alloc a buffer
|
|
m_readBuf = NULL;
|
|
goto retry;
|
|
}
|
|
|
|
// return false if we have no room for the entire reply
|
|
if ( msgSize > m_readBufMaxSize ) {
|
|
g_errno = EBUFTOOSMALL;
|
|
return log("udp: Msg size of %" INT32 " bytes is too big for the "
|
|
"buffer size, %" INT32 ", we allocated. msgType=0x%hhx.",
|
|
msgSize, m_readBufMaxSize , m_msgType );
|
|
}
|
|
|
|
// if its a msg 0x0c reply from a proxy ove roadrunner wireless
|
|
// they tend to damage our packets for some reason so i repeat
|
|
// the ip for a total of an 8 byte reply
|
|
/*
|
|
MDW: this seems to be causing problems on local networks
|
|
so taking it out. 4/7/2015.
|
|
if ( m_msgType == 0x0c && msgSize == 12 && peekSize == 24 &&
|
|
// must be reply! not request.
|
|
m_callback ) {
|
|
// sanity
|
|
if ( m_proto->getMaxPeekSize() < 24 ) { char *xx=NULL;*xx=0;}
|
|
if ( headerSize != 12 ) { char *xx=NULL;*xx=0;}
|
|
// ips must match. like a checksum kinda.
|
|
int32_t ip1 = *(int32_t *)(peek+headerSize);
|
|
int32_t ip2 = *(int32_t *)(peek+headerSize+4);
|
|
int32_t crc = *(int32_t *)(peek+headerSize+8);
|
|
// one more check since ip1 seems to equal ip2 sometimes
|
|
// when it should not!
|
|
int32_t h32 = hash32h ( ip1 , 0 );
|
|
if ( ip1 != ip2 || h32 != crc ) {
|
|
static int32_t s_lastTime = 0;
|
|
g_corruptPackets++;
|
|
int32_t tt = getTimeLocal();
|
|
if ( tt > s_lastTime + 5 ) {
|
|
s_lastTime = tt;
|
|
log("dns: dropping corrupt msgc reply dgram. "
|
|
"count=%" INT32 ".",g_corruptPackets);
|
|
return true;
|
|
}
|
|
return true;
|
|
}
|
|
}
|
|
*/
|
|
|
|
// we're doing the call to recvfrom() for sure now
|
|
*discard = false;
|
|
|
|
// dgram #'s above 0 can be copied directly into m_readBuf
|
|
if ( dgramNum > 0 ) {
|
|
// how much DATA can we read from this dgram?
|
|
int32_t avail = m_readBufMaxSize - offset;
|
|
if ( avail > maxDataSize ) avail = maxDataSize;
|
|
// include header too
|
|
int32_t toRead = avail + headerSize;
|
|
// where to put it?
|
|
char *dest = m_readBuf + offset - headerSize;
|
|
// sanity check, watch out for bad headers...
|
|
if ( toRead < 0 ) {
|
|
// throw this dgram away
|
|
*discard = true;
|
|
//g_errno = ECORRUPTDATA;
|
|
// do not spam the logs
|
|
static int32_t s_badCount = 0;
|
|
s_badCount++;
|
|
// only log it once every 1024 times it happens
|
|
//if ( ((s_badCount++) & 1023 ) == 0 )
|
|
log("udp: got %" INT32 " bad dgram headers. "
|
|
"dgramNum=%" INT32 " offset=%" INT32 " "
|
|
"readBufMaxSize=%" INT32 ". IS hosts.conf OUT OF SYNC???",
|
|
s_badCount,(int32_t)dgramNum,(int32_t)offset,
|
|
(int32_t)m_readBufMaxSize);
|
|
// this actually helps us to identify when hosts.conf
|
|
// is out of sync between hosts, so core
|
|
// SEEMS like the roadrunner wireless connection
|
|
// is spiking our packets sometimes with noise...
|
|
//char *xx = NULL; *xx = 0;
|
|
return false;
|
|
}
|
|
// save what's before us
|
|
char tmp[32];
|
|
memcpy_ass ( tmp , dest , headerSize );
|
|
int numRead = recvfrom ( sock ,
|
|
dest ,
|
|
toRead ,
|
|
0 ,
|
|
NULL ,
|
|
NULL );
|
|
//log("udp: recvfrom1 = %i",(int)numRead);
|
|
// let caller know how much we read for stats purposes
|
|
*readSize = numRead;
|
|
// restore what was at the header before we stored it there
|
|
memcpy_ass ( dest , tmp , headerSize );
|
|
// bail on error, how could this happen?
|
|
if ( numRead < 0 ) {
|
|
#ifdef _VALGRIND_
|
|
if ( errno == 4 ) goto retry;
|
|
#endif
|
|
g_errno = errno;
|
|
return log("udp: Call to recvfrom had error: %s.",
|
|
mstrerror(g_errno));
|
|
}
|
|
// keep stats
|
|
if ( m_host ) m_host->m_dgramsFrom++;
|
|
// reading from loopback, 127.0.0.1?
|
|
else if (m_ip==0x0100007f)g_hostdb.getMyHost()->m_dgramsFrom++;
|
|
// keep track of dgrams sent outside of our cluster
|
|
//else g_stats.m_dgramsFromStrangers++;
|
|
// it's new to us, set the read bits
|
|
setBit ( dgramNum, m_readBits2 );
|
|
// inc the lit bit count
|
|
m_readBitsOn++;
|
|
// if our proto doesn't use acks, treat this as an ACK as well
|
|
if ( ! m_proto->useAcks () ) readAck(sock,0/*dgramNum*/,now);
|
|
// if read everything, set the queued timer
|
|
if ( m_readBitsOn >= m_dgramsToRead )
|
|
m_queuedTime = gettimeofdayInMilliseconds();
|
|
// all done
|
|
return true;
|
|
}
|
|
|
|
// otherwise, copy into our tmp buffer
|
|
char dgram [DGRAM_SIZE_CEILING];
|
|
retry2:
|
|
// read in the whole dgram
|
|
int dgramSize = recvfrom ( sock ,
|
|
dgram ,
|
|
DGRAM_SIZE_CEILING ,
|
|
0 ,
|
|
NULL ,
|
|
NULL );
|
|
//log("udp: recvfrom2 = %i",(int)dgramSize);
|
|
// bail on error, how could this happen?
|
|
if ( dgramSize < 0 ) {
|
|
// valgrind
|
|
if ( errno == EINTR ) goto retry2;
|
|
g_errno = errno;
|
|
return log("udp: Call to recvfrom had error: %s.",
|
|
mstrerror(g_errno));
|
|
}
|
|
// keep stats
|
|
if ( m_host ) m_host->m_dgramsFrom++;
|
|
// . reading from loopback, 127.0.0.1?
|
|
// . monitor.cpp has NULL for g_hostdb.m_myHost
|
|
else if ( m_ip == 0x0100007f && g_hostdb.getMyHost() )
|
|
g_hostdb.getMyHost()->m_dgramsFrom++;
|
|
// keep track of dgrams sent outside of our cluster
|
|
//else g_stats.m_dgramsFromStrangers++;
|
|
// let caller know how much we read for stats purposes
|
|
*readSize = dgramSize;
|
|
|
|
// where to put it? it might not be dgram #0...
|
|
char *dest = m_readBuf + offset ;
|
|
// what to put?
|
|
char *src = dgram + headerSize ;
|
|
// how much to put
|
|
int32_t len = dgramSize - headerSize;
|
|
// if msgSize was -1 then m_readBufSize will be -1
|
|
if ( m_readBufSize == -1 ) m_readBufSize = len;
|
|
// bounce it back into m_readBuf
|
|
memcpy_ass ( dest , src , len );
|
|
// it's new to us, set the read bits
|
|
setBit ( dgramNum, m_readBits2 );
|
|
// inc the lit bit count
|
|
m_readBitsOn++;
|
|
// if our proto doesn't use acks, treat this as an ACK as well
|
|
if ( ! m_proto->useAcks () ) readAck(sock,0/*dgramNum*/,now);
|
|
// if read everything, set the queued timer
|
|
if ( m_readBitsOn >= m_dgramsToRead )
|
|
m_queuedTime = gettimeofdayInMilliseconds();
|
|
// success
|
|
return true;
|
|
}
|
|
|
|
// called to process an ack we read for dgram # "dgramNum"
|
|
void UdpSlot::readAck ( int sock , int32_t dgramNum , int64_t now ) {
|
|
// protection from garbled dgrams
|
|
if ( dgramNum >= MAX_DGRAMS ) {
|
|
log(LOG_LOGIC,
|
|
"udp: Reading ack for dgram #%" INT32 " > max dgram of %" INT32 ".",
|
|
dgramNum,(int32_t)MAX_DGRAMS);
|
|
return ; }
|
|
// . get time now
|
|
// . make async safe
|
|
//int64_t now = gettimeofdayInMilliseconds();
|
|
// update lastRead time for this transaction
|
|
m_lastReadTime = g_now;
|
|
// cease all resending
|
|
m_resendCount = 0;
|
|
// reset the resendTime to the starting point before back-off scheme
|
|
setResendTime();
|
|
// if this is a dup ack, ignore it
|
|
if ( isOn ( dgramNum , m_readAckBits2 ) ) return;
|
|
// mark this ack as read
|
|
setBit ( dgramNum , m_readAckBits2 );
|
|
// update lit bit count
|
|
m_readAckBitsOn++;
|
|
// if it was marked as unsent, fix that
|
|
if ( ! isOn ( dgramNum , m_sentBits2 ) ) {
|
|
// bitch if we do not even have a send buffer. why is he acking
|
|
// something we haven't even had to a chance to generate let
|
|
// alone send? network error?
|
|
if ( ! m_sendBufAlloc || m_dgramsToSend <= 0 )
|
|
log("udp: Read ack but send buf is empty.");
|
|
// mark this dgram as sent
|
|
setBit ( dgramNum , m_sentBits2 );
|
|
// update lit bit count
|
|
m_sentBitsOn++;
|
|
}
|
|
// . we often receive an out of order ACK
|
|
// . this usually means that the receiver did not get the dgrams
|
|
// for the gap of acks because of a collision
|
|
// . we detect this gap and automatically re-send the dgrams w/o delay
|
|
// . if our right neighbor read ack bit is off then mark all off bits
|
|
// on our right as having sent bits of 0, until we hit a lit ack bit
|
|
for ( int32_t i = dgramNum - 1 ; i >= 0 ; i-- ) {
|
|
// stop after hitting a lit bit
|
|
if ( isOn ( i , m_readAckBits2 ) ) break;
|
|
// mark as unsent iff it's marked as sent
|
|
if ( ! isOn ( i , m_sentBits2 ) ) continue;
|
|
// mark as unsent
|
|
clrBit ( i , m_sentBits2 );
|
|
// reduce the lit bit count
|
|
m_sentBitsOn--;
|
|
// update m_nextToSend
|
|
if ( i < m_nextToSend ) m_nextToSend = i;
|
|
}
|
|
|
|
// if the reply or request was fully acknowledged by the receiver
|
|
// then record some statistics
|
|
if ( ! hasAcksToRead() ) {
|
|
//if ( m_msgType == 0x39 )
|
|
// log("jey");
|
|
int64_t now = gettimeofdayInMilliseconds();
|
|
int32_t delta = now - m_startTime;
|
|
// but if we were sending a reply, use m_queuedTime
|
|
// as the start time of the send. we set m_queuedTime
|
|
// to the current time in sendReply().
|
|
if ( ! m_callback ) delta = now - m_queuedTime;
|
|
int32_t n = m_niceness;
|
|
if ( n < 0 ) n = 0;
|
|
if ( n > 1 ) n = 1;
|
|
int32_t r = 0;
|
|
// if m_callback then we are sending a request, not a reply,
|
|
// because only the sender of the request has a callback
|
|
if ( m_callback ) r = 1;
|
|
// add to average
|
|
g_stats.m_msgTotalOfSendTimes[m_msgType][n][r] += delta;
|
|
g_stats.m_msgTotalSent [m_msgType][n][r]++;
|
|
// bucket number is log base 2 of the delta
|
|
if ( delta > 64000 ) delta = 64000;
|
|
int32_t bucket = getHighestLitBit ( (uint16_t)delta );
|
|
g_stats.m_msgTotalSentByTime [m_msgType][n][r][bucket]++;
|
|
// set the queued time for stats on how long it sits in the
|
|
// queue.
|
|
m_queuedTime = now;
|
|
}
|
|
|
|
// to save memory in UdpSlot we only keep track of every 8th dgram time
|
|
//if ( (dgramNum & 0x07) == 0 ) {
|
|
// get when we sent this dgram
|
|
//int64_t start = getSendTime ( dgramNum >> 3 ) ;
|
|
// trip time
|
|
//int32_t tripTime = now - start;
|
|
// debug msg
|
|
//log("tripTime = %" INT32 "", tripTime );
|
|
// . update host time
|
|
// . we should also stamp the host each time we re-send, too
|
|
// . this is now handled by g_hostdb::pingHost()
|
|
//g_hostdb.stampHost( m_hostId , tripTime, false/*timedOut?*/);
|
|
//}
|
|
// log msg
|
|
if ( g_conf.m_logDebugUdp ) {
|
|
//#ifdef _UDPDEBUG_
|
|
int32_t kk = 0; if ( m_callback ) kk = 1;
|
|
int32_t hid = -1;
|
|
if ( m_host && m_host->m_hostdb == &g_hostdb )
|
|
hid = m_host->m_hostId;
|
|
log(LOG_DEBUG,
|
|
"udp: Read ACK "
|
|
"dgram=%" INT32 " "
|
|
"msg=0x%hx "
|
|
"tid=%" INT32 " "
|
|
"src=%s:%hu "
|
|
"init=%" INT32 " "
|
|
"age=%" INT32 " "
|
|
"dsent=%" INT32 " "
|
|
"aread=%" INT32 " "
|
|
"hid=%" INT32 "",
|
|
(int32_t)dgramNum,
|
|
(int16_t)m_msgType ,
|
|
(int32_t)m_transId,
|
|
iptoa(m_ip) ,
|
|
(uint16_t)m_port,
|
|
(int32_t)kk ,
|
|
(int32_t)(now -m_startTime) ,
|
|
(int32_t)m_sentBitsOn ,
|
|
(int32_t)m_readAckBitsOn ,
|
|
hid);
|
|
//#endif
|
|
}
|
|
}
|
|
|
|
// returns false and sets g_errno on error
|
|
bool UdpSlot::makeReadBuf ( int32_t msgSize , int32_t numDgrams ) {
|
|
// bitch if it's already there
|
|
if ( m_readBuf ) {
|
|
g_errno = EBADENGINEER;
|
|
return log(LOG_LOGIC,
|
|
"udp: makereadbuf: Read buf already there.");
|
|
}
|
|
// reset this to -1
|
|
//m_firstReadTime = -1;
|
|
// ensure msg not too big
|
|
if ( msgSize > m_proto->getMaxMsgSize() ) {
|
|
g_errno = EMSGTOOBIG;
|
|
return log(LOG_LOGIC,"udp: makereadbuf: msg size of %" INT32 " is "
|
|
"too big. Max is %" INT32 ".",msgSize,
|
|
(int32_t)m_proto->getMaxMsgSize());
|
|
}
|
|
// if msgSize is -1 then it is under 1 dgram, but assume the worst
|
|
if ( msgSize == -1 ) msgSize = m_maxDgramSize;
|
|
// . if we're in a sig handler do not call malloc()
|
|
// . also, if read is small, don't bother calling malloc()
|
|
// . i took out msgSize < TMPBUFSIZE because caller may be expecting
|
|
// to "steal" the reply for setting an RdbList or something and he
|
|
// won't want to do a copy. Fixes segv we had with Msg0 calling
|
|
// Multicast and setting the list with reply and ownData to true.
|
|
if ( g_inSigHandler ) { // || msgSize < TMPBUFSIZE ) {
|
|
// bitch if incoming read too big to handle
|
|
if ( msgSize > TMPBUFSIZE ) {
|
|
g_errno = EBUFTOOSMALL;
|
|
return log(LOG_LOGIC,"udp: makereadbuf: buffer size "
|
|
"of %" INT32 " is too big for async udp socket.",
|
|
msgSize);
|
|
}
|
|
m_readBuf = m_tmpBuf;
|
|
m_readBufMaxSize = TMPBUFSIZE;
|
|
return true;
|
|
}
|
|
// . if it is small enough, no need to malloc
|
|
// . but Msg0 requests like to set list to contents, so make it
|
|
// . we'll have to check everything before doing this...
|
|
//if ( msgSize <= TMPBUFSIZE && m_msgType != 0x00 ) {
|
|
// m_readBuf = m_tmpBuf;
|
|
// m_readBufMaxSize = TMPBUFSIZE;
|
|
// return true;
|
|
//}
|
|
// . create a msg buf to hold msg, zero out everything...
|
|
// . label it "umsg" so we can grep the *.cpp files for it
|
|
char bb[10];
|
|
bb[0] = 'u';
|
|
bb[1] = 'm';
|
|
bb[2] = 's';
|
|
bb[3] = 'g';
|
|
// msgType is 8 bits
|
|
char val ;
|
|
val = ((m_msgType >> 4) & 0x0f);
|
|
if ( val <= 9 ) bb[4] = '0' + val;
|
|
else bb[4] = 'a' + val - 10;
|
|
val = ((m_msgType ) & 0x0f);
|
|
if ( val <= 9 ) bb[5] = '0' + val;
|
|
else bb[5] = 'a' + val - 10;
|
|
bb[6] = '\0';
|
|
//sprintf(bb,"UdpSlot 0x%hx",m_msgType);
|
|
m_readBuf = (char *) mmalloc ( msgSize , bb ); // "UdpSlot") ;
|
|
if ( ! m_readBuf ) {
|
|
m_readBufSize = 0;
|
|
return log("udp: Failed to allocate %" INT32 " bytes to "
|
|
"read request or reply on udp socket.",
|
|
msgSize);
|
|
}
|
|
m_readBufMaxSize = msgSize;
|
|
// let the caller know we're good
|
|
return true;
|
|
}
|
|
|
|
// . returns a score of -1 if nothing to send
|
|
// . higher scoring slots will do their sending first
|
|
// . may have ACKs to send or plain old dgrams to send
|
|
// . now is current time in milliseconds since the epoch
|
|
// . returns -2 if token required to send more
|
|
//int32_t UdpSlot::getScore (int64_t now, UdpSlot *s_token ,
|
|
// uint32_t s_tokenTime , int32_t LARGE_MSG ) {
|
|
int32_t UdpSlot::getScore ( int64_t now ) {
|
|
// . allow tokens 500ms older than the current possessor of the token
|
|
// to send with an ack window of 1
|
|
// . if you change the 500 here change it in UdpServer::readSock() too
|
|
//bool old = ( (uint32_t)m_startTime + 100 < s_tokenTime );
|
|
// . do not send acks back for any large reply if s_token is in use
|
|
// . m_callback is non-NULL if we initiated the transaction
|
|
// . but it's ok if it's local, on the same host. NO!!! FIX!
|
|
// . don't send ack if either slot is taken!
|
|
/*
|
|
if ( m_callback && m_dgramsToRead >= LARGE_MSG &&
|
|
//g_hostdb.getMyIp() != m_ip &&
|
|
s_token && s_token != this && ! old )
|
|
return -1;
|
|
*/
|
|
// . we cannot send anything if someone already has s_token
|
|
// . or if someone has s_token (we're being blasted)
|
|
/*
|
|
if ( ! m_callback && m_dgramsToSend >= LARGE_MSG &&
|
|
//g_hostdb.getMyIp() != m_ip &&
|
|
s_token && s_token != this && ! old )
|
|
return -1;
|
|
*/
|
|
|
|
// do not do sends if callback was called. maybe cancelled?
|
|
// this was causing us to get into an infinite loop in
|
|
// UdpServer.cpp's sendPoll_ass(). there wasn't anything to send i
|
|
// guess because it got cancelled (?) but we kept returning it here
|
|
// nonetheless.
|
|
if ( m_calledCallback )
|
|
return -1;
|
|
|
|
// send ACKs before regular dgrams so we don't hold people up
|
|
if ( m_sentAckBitsOn < m_readBitsOn && m_proto->useAcks() )
|
|
return 0x7fffffff;
|
|
// return a score of -1 if we've sent all dgrams (and no acks to send)
|
|
if ( m_sentBitsOn >= m_dgramsToSend ) return -1;
|
|
// . if token is available (or you're older) you can always send one
|
|
// dgram passed the last ack you got (ack window of 1)
|
|
// . when receiver's token opens up he'll send you an ack for it
|
|
// and when you get that ack you can grab your token and send back!
|
|
/*
|
|
if ( (old || ! s_token) &&
|
|
! m_callback && m_dgramsToSend >= LARGE_MSG &&
|
|
getNumDgramsSent() > getNumAcksRead() ) return -1;
|
|
*/
|
|
// . let's use a window now, give acks a chance to catch up somewhat
|
|
// . if send is local, use a larger ack window of ?64? dgrams
|
|
//if ( ( m_ip != g_hostdb.getMyIp() || g_conf.m_interfaceMachine ) &&
|
|
//if ( ( ! g_hostdb.isMyIp(m_ip) || g_conf.m_interfaceMachine ) &&
|
|
if ( ! g_hostdb.isMyIp(m_ip) &&
|
|
m_sentBitsOn >= m_readAckBitsOn + ACK_WINDOW_SIZE ) return -1;
|
|
// well, give a window size of 100 to loopbacks
|
|
//if ( ( m_ip == g_hostdb.getMyIp() && !g_conf.m_interfaceMachine ) &&
|
|
//if ( ( g_hostdb.isMyIp(m_ip) && !g_conf.m_interfaceMachine ) &&
|
|
if ( g_hostdb.isMyIp(m_ip) &&
|
|
m_sentBitsOn >= m_readAckBitsOn + ACK_WINDOW_SIZE_LB ) return -1;
|
|
// . if we don't have s_token, but it is available, then our window
|
|
// size is only 1
|
|
// . when we read an ACK we'll try to claim s_token then and our
|
|
// window size will be back to ACK_WINDOW_SIZE
|
|
// . so you can only send 1 dgram more than acks read if you don't have
|
|
// the token!
|
|
/*
|
|
if ( ! s_token && ! m_callback && m_dgramsToSend >= LARGE_MSG &&
|
|
getNumAcksRead() + 1 <= getNumDgramsSent() ) return -1;
|
|
*/
|
|
// return 1 if now is 0
|
|
if ( now == 0LL ) return 1;
|
|
// sort regular sends by the last send time
|
|
int64_t score = now - m_lastSendTime + 1000;
|
|
// watch out if someone changed the system clock on us
|
|
if ( score < 1000 ) score = 1000;
|
|
// . if we've resent before, wait enough time to send again!
|
|
// . m_resendCount resets when we read an ack (in readAck())
|
|
//if ( m_resendCount > 0 && now - m_lastReadTime < m_resendTime ) {
|
|
//log("now=%"INT64"-lastRead=%"INT64" <%" INT32 "" , now,m_lastReadTime,m_resendTime);
|
|
// return -1;
|
|
//}
|
|
// let's give smaller msgs more pts to reduce latency
|
|
//if ( m_sendBufSize <= 1 *1024 ) return score + 30 ;
|
|
//if ( m_sendBufSize <= 10 *1024 ) return score + 20;
|
|
//if ( m_sendBufSize <= 100*1024 ) return score + 10;
|
|
// . is it a resend?
|
|
// . get the time we sent the first unacked dgram
|
|
// m_firstUnackedDgram
|
|
// bool resend = ( score >= m_resendTime );
|
|
// if it's a resend set the hi bit to give it precedence
|
|
// if ( resend ) score |= 0x80000000;
|
|
// else score &= 0x7fffffff;
|
|
return score;
|
|
}
|
|
|
|
|
|
void UdpSlot::printState() {
|
|
//int64_t now = gettimeofdayInMilliseconds();
|
|
log(LOG_TIMING,
|
|
"admin: UdpSlot - type:Msg%2" XINT32 " nice:%" INT32 " "
|
|
"queued:%" INT32 " "
|
|
"handlerCalled:%" INT32 "",
|
|
(int32_t)m_msgType,
|
|
m_niceness,
|
|
(int32_t)m_isQueued,
|
|
(int32_t)m_calledHandler);
|
|
|
|
}
|