mirror of
https://github.com/privacore/open-source-search-engine.git
synced 2025-01-22 02:18:42 -05:00
1495 lines
52 KiB
C++
1495 lines
52 KiB
C++
#include "UdpSlot.h"
|
|
#include "UdpServer.h"
|
|
#include "Hostdb.h"
|
|
#include "Stats.h"
|
|
#include "Proxy.h"
|
|
#include "IPAddressChecks.h"
|
|
#include "BitOperations.h"
|
|
#include "Process.h"
|
|
#include "Conf.h"
|
|
#include "ip.h"
|
|
#include "Mem.h"
|
|
#include "fctypes.h"
|
|
#include "Errno.h"
|
|
#include <errno.h>
|
|
#include <stdlib.h>
|
|
#ifdef _VALGRIND_
|
|
#include <valgrind/memcheck.h>
|
|
#endif
|
|
#include "gbmemcpy.h"
|
|
|
|
int32_t g_cancelAcksSent = 0;
|
|
int32_t g_cancelAcksRead = 0;
|
|
|
|
// max resend time (max backoff) for niceness 0
|
|
// . i lowered this because the wire supports 1 full packet about every 120
|
|
// MICROSECONDS. so in 40ms we could send ~350 1500-byte packets!!!
|
|
// . i also lowered the ack window down to 2 dgrams so this makes more sense
|
|
// let's not clog everybody up
|
|
#define MAX_RESEND_0 200
|
|
|
|
// max resend time (max backoff) for niceness 1+
|
|
// let's not clog up our network switch internet port
|
|
#define MAX_RESEND_1 15000
|
|
|
|
// start resend time for niceness 0
|
|
|
|
// . now i increase resend time from 120 to 250 because packets don't seem
|
|
// to be getting lost as much as before since i increase
|
|
// /proc/sys/net/core/rmem_default and rmem_max to 10Megs
|
|
// . before, when it was 65k, kernel was dropping packets like a blind waiter
|
|
// . so try 250ms now, hopefully it will cut down on uneccessary resends
|
|
// . also it would help to make UdpServer use unmasked interrupt signals
|
|
// to be more responsive
|
|
// . but now we also have a problem of doing a bunch of sends, they just
|
|
// get put on the queue and some may be silently dropped (send sendto())
|
|
// . we base this resend time assuming we sent the packet when we called
|
|
// . sigtimedwait() only has a resolution of 20ms!!! so make due...
|
|
// . i lowered this down to 20 since our window is much smaller now
|
|
// . there's typically about 120 microseconds between full packets so we
|
|
// should resend quickly!!
|
|
// . keep it to 40ms due to kernel time slicing problems
|
|
// . but now that we have our query compression proxy over the internet, we got
|
|
// pings that are like 50ms...
|
|
// . this was at 70 but gk0 pings to scproxy1 at like 150ms a lot via
|
|
// the roadrunner wireless link, so lets crank this up
|
|
|
|
//@todo: BR: We should experiment with these
|
|
#define RESEND_0 170
|
|
|
|
// . for short msgs we can resend more rapidly
|
|
// . it doesn't help to go lower than 20ms cuz that's sigtimedwait()'s limit
|
|
// . keep it to 40ms due to kernel time slicing problems
|
|
// we are going over the internet to our query compression proxy now
|
|
#define RESEND_0_SHORT 150
|
|
|
|
// start resend time for niceness 1+
|
|
// because of roadrunner... (See above)
|
|
#define RESEND_1 200
|
|
|
|
// try to fix a bunch of msg99 replies coming into host 0 at once
|
|
#define RESEND_1_LOCAL 100
|
|
|
|
|
|
// . the ack window is back and bigger, now 100 dgrams
|
|
// . this gives the receives a chance to respond to being blasted
|
|
// . without this acks being sent back are often lost for some reason,
|
|
// ?maybe it's just loopback sends?
|
|
// i don't know if that was the cause of it, i think it might be something
|
|
// else, so to try to prevent from dropping packets (ifconfig) let's put this down again.
|
|
#define ACK_WINDOW_SIZE 4
|
|
|
|
// size of window for local transactions over loopback interface
|
|
// see comment above for why we put this back from 12 to 4
|
|
#define ACK_WINDOW_SIZE_LB 4
|
|
|
|
void UdpSlot::connect ( UdpProtocol *proto ,
|
|
sockaddr_in *endPoint ,
|
|
Host *host ,
|
|
int32_t hostId ,
|
|
int32_t transId ,
|
|
int64_t timeout , // in milliseconds
|
|
int64_t now ,
|
|
int32_t niceness ) {
|
|
// map loopback ip to our ip
|
|
uint32_t ip = endPoint->sin_addr.s_addr;
|
|
if (ip == g_hostdb.getLoopbackIp()) {
|
|
ip = g_hostdb.getMyIp();
|
|
}
|
|
|
|
connect(proto, ip, ntohs(endPoint->sin_port), host, hostId, transId, timeout, now, niceness);
|
|
}
|
|
|
|
// . call this after you make a new UdpSlot
|
|
// . make new slot using mcalloc() so it's zero'd out
|
|
// . NOTE: callback must be non-NULL if you're going to send a request
|
|
void UdpSlot::connect ( UdpProtocol *proto ,
|
|
uint32_t ip ,
|
|
uint16_t port ,
|
|
Host *host ,
|
|
int32_t hostId ,
|
|
int32_t transId ,
|
|
int64_t timeout , // in milliseconds
|
|
int64_t now ,
|
|
int32_t niceness ) {
|
|
// avoid that heavy memset_ass() call using this logic.
|
|
// we will clear on demand using m_numBitsInitialized logic in UdpSlot.h
|
|
int32_t size = offsetof(UdpSlot,m_sentBits2);
|
|
memset((void *) this, 0, size);
|
|
// store this info
|
|
m_proto = proto ;
|
|
m_ip = ip ; // keep in network order
|
|
m_port = port ; // keep in host order
|
|
m_host = host ;
|
|
m_hostId = hostId ;
|
|
m_transId = transId ;
|
|
m_timeout = timeout ;
|
|
m_niceness = niceness ;
|
|
// initialize our time of birth
|
|
m_startTime = now;
|
|
// reset this
|
|
m_queuedTime = -1;
|
|
|
|
//determine datagram size
|
|
if (!m_proto->useAcks()) {
|
|
m_maxDgramSize = DGRAM_SIZE_DNS;
|
|
} else {
|
|
switch(ip_distance(m_ip)) {
|
|
case ip_distance_ourselves:
|
|
m_maxDgramSize = DGRAM_SIZE_LB;
|
|
break;
|
|
case ip_distance_lan:
|
|
//todo: check link MTU
|
|
//fallthrough
|
|
case ip_distance_nearby:
|
|
m_maxDgramSize = DGRAM_SIZE;
|
|
break;
|
|
default:
|
|
m_maxDgramSize = DGRAM_SIZE_INTERNET;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
void UdpSlot::resetConnect ( ) {
|
|
if (m_ip == g_hostdb.getLoopbackIp()) {
|
|
m_ip = g_hostdb.getMyIp();
|
|
}
|
|
|
|
// . compute max dgram size
|
|
// . if we're sending to loopback make bigger
|
|
// . dns has its own max size (DNS_DGRAM_SIZE)
|
|
// . if we're going over the internet (interface machine)
|
|
// use a smaller DGRAM so it makes it
|
|
if (!m_proto->useAcks()) {
|
|
m_maxDgramSize = DGRAM_SIZE_DNS;
|
|
} else if ((m_ip & 0x000000ff) != (g_hostdb.m_myIp & 0x000000ff) || !g_hostdb.isIpInNetwork(m_ip)) {
|
|
// this as 0x0000ffff but we use 10.5.* and 10.6.* addresses
|
|
m_maxDgramSize = DGRAM_SIZE_INTERNET;
|
|
g_process.shutdownAbort(true);
|
|
} else if ( ip_distance(m_ip)==ip_distance_ourselves ) {
|
|
m_maxDgramSize = DGRAM_SIZE_LB;
|
|
} else {
|
|
m_maxDgramSize = DGRAM_SIZE;
|
|
}
|
|
|
|
// reset the slot
|
|
m_readBitsOn = 0;
|
|
m_sentBitsOn = 0;
|
|
m_readAckBitsOn = 0;
|
|
m_sentAckBitsOn = 0;
|
|
m_nextToSend = 0;
|
|
m_firstUnlitSentAckBit = 0;
|
|
m_numBitsInitialized = 0;
|
|
|
|
// . set m_dgramsToSend
|
|
// . similar to UdpProtocol::getNumDgrams(char *dgram,int32_t dgramSize)
|
|
int32_t dataSpace = m_maxDgramSize ;
|
|
|
|
if (m_proto->stripHeaders()) {
|
|
dataSpace -= m_proto->getHeaderSize(m_sendBufSize);
|
|
}
|
|
|
|
m_dgramsToSend = m_sendBufSize / dataSpace;
|
|
if ( m_sendBufSize % dataSpace != 0 ) {
|
|
m_dgramsToSend++;
|
|
}
|
|
|
|
// if msgSize was given as 0 force a dgram to be sent
|
|
if ( m_sendBufSize == 0 ) {
|
|
m_dgramsToSend = 1;
|
|
}
|
|
}
|
|
|
|
// . call this only AFTER calling connect() above
|
|
// . callback is non-NULL iff you're sending a request
|
|
// . callback is NULL ifd you're sending a reply
|
|
// . returns false and sets g_errno on error
|
|
bool UdpSlot::sendSetup(char *msg, int32_t msgSize, char *alloc, int32_t allocSize, msg_type_t msgType, int64_t now,
|
|
void *state, void (*callback)(void *state, UdpSlot *slot), int32_t niceness, const char *extraInfo) {
|
|
|
|
#ifdef _VALGRIND_
|
|
VALGRIND_CHECK_MEM_IS_DEFINED(msg,msgSize);
|
|
#endif
|
|
// can't be too big
|
|
if ( msgSize / m_maxDgramSize + 1 >= MAX_DGRAMS ) {
|
|
int32_t maxMsgSize = m_maxDgramSize * MAX_DGRAMS;
|
|
log(LOG_LOGIC,"udp: Msg size of %" PRId32" bytes is too big "
|
|
"to send. Max dgram size = %" PRId32". Max dgrams = "
|
|
"%" PRId32". Max msg size = %" PRId32" msgtype=0x%02x. Please "
|
|
"increase the #define MAX_DGRAMS in UdpSlot.h and "
|
|
"recompile to fix this.",
|
|
(int32_t)msgSize,(int32_t)m_maxDgramSize,
|
|
(int32_t)MAX_DGRAMS,maxMsgSize,
|
|
(int)msgType);
|
|
g_errno = EMSGTOOBIG;
|
|
return false;
|
|
}
|
|
|
|
// fill in the supplied parameters
|
|
m_sendBuf = msg;
|
|
m_sendBufSize = msgSize;
|
|
m_sendBufAllocSize = allocSize;
|
|
m_sendBufAlloc = alloc;
|
|
m_callback = callback;
|
|
m_state = state;
|
|
m_msgType = msgType;
|
|
m_lastSendTime = now;
|
|
m_lastReadTime = now;
|
|
m_niceness = niceness;
|
|
m_backoff = -1;
|
|
m_maxWait = -1;
|
|
|
|
// we haven't sent anything yet so reset this to -1
|
|
m_firstSendTime = -1;
|
|
|
|
// creation time
|
|
//m_sendSetupCalled = now;
|
|
|
|
// set m_resendTime, based on m_resendCount and m_niceness
|
|
setResendTime();
|
|
|
|
// . set m_dgramsToSend
|
|
// . similar to UdpProtocol::getNumDgrams(char *dgram,int32_t dgramSize)
|
|
int32_t dataSpace = m_maxDgramSize ;
|
|
if ( m_proto->stripHeaders() ) {
|
|
dataSpace -= m_proto->getHeaderSize(msgSize);
|
|
}
|
|
|
|
m_dgramsToSend = msgSize / dataSpace;
|
|
if ( msgSize % dataSpace != 0 ) {
|
|
m_dgramsToSend++;
|
|
}
|
|
|
|
// if msgSize was given as 0 force a dgram to be sent
|
|
if ( msgSize == 0 ) {
|
|
m_dgramsToSend = 1;
|
|
}
|
|
|
|
// save additional info (if present)
|
|
if (extraInfo) {
|
|
strncpy(m_extraInfo, extraInfo, sizeof(m_extraInfo));
|
|
m_extraInfo[sizeof(m_extraInfo)-1] = '\0';
|
|
} else {
|
|
m_extraInfo[0] = '\0';
|
|
}
|
|
|
|
if ( ! m_host ) {
|
|
return true;
|
|
}
|
|
// inherit this from the last transactions
|
|
m_preferEth = m_host->m_preferEth;
|
|
// and set our ip accordingly
|
|
if ( m_host->m_preferEth == 1 ) m_ip = m_host->m_ipShotgun;
|
|
else m_ip = m_host->m_ip;
|
|
|
|
return true;
|
|
}
|
|
|
|
// resets a UdpSlot for a resend
|
|
void UdpSlot::prepareForResend ( int64_t now , bool resendAll ) {
|
|
// clear all if resend is true
|
|
if (resendAll) {
|
|
for (int32_t i = 0; i < m_dgramsToSend; ++i) {
|
|
clrBit(i, m_readAckBits2);
|
|
}
|
|
|
|
// we should clear previously receive dgrams as it could be different
|
|
for (int32_t i = 0; i < m_dgramsToRead; ++i) {
|
|
clrBit(i, m_readBits2);
|
|
clrBit(i, m_sentAckBits2);
|
|
}
|
|
|
|
m_dgramsToRead = 0;
|
|
m_readBitsOn = 0;
|
|
m_sentAckBitsOn = 0;
|
|
m_readAckBitsOn = 0;
|
|
|
|
if (m_readBuf) {
|
|
mfree(m_readBuf, m_readBufMaxSize, "UdpSlot");
|
|
m_readBuf = NULL;
|
|
m_readBufMaxSize = 0;
|
|
m_readBufSize = 0;
|
|
}
|
|
}
|
|
|
|
// how many sentBits we cleared
|
|
int32_t cleared = 0;
|
|
// clear each sent bit if it hasn't gotten an ACK
|
|
for ( int32_t i = 0 ; i < m_dgramsToSend ; i++ ) {
|
|
// continue if we already have an ack for this one
|
|
if ( isOn ( i , m_readAckBits2 ) ) continue;
|
|
// continue if it's already cleared
|
|
if ( ! isOn ( i , m_sentBits2 ) ) continue;
|
|
// mark dgram #i as unsent since we don't have ACK for it yet
|
|
clrBit ( i , m_sentBits2 );
|
|
// reduce the lit bit count
|
|
m_sentBitsOn--;
|
|
// may have to adjust m_nextToSend
|
|
if ( i < m_nextToSend ) m_nextToSend = i;
|
|
// count each cleared bit
|
|
cleared++;
|
|
}
|
|
|
|
// . if we were using eth0 try using eth1, and vice versa
|
|
// . those linksys switches seem to go down all the time and come
|
|
// back up after a few hours
|
|
// . only do this on the 2nd resend
|
|
if ( g_conf.m_useShotgun &&
|
|
// need to be sending to a host in the network
|
|
m_host &&
|
|
// shotgun ip (eth1) must be different than eth0 ip
|
|
m_host->m_ip != m_host->m_ipShotgun ) {
|
|
// . were we using the eth0 ip? if so, switch to eth1
|
|
// . do not switch though if the ping is really bad for eth1
|
|
if ( m_preferEth == 0 ) {
|
|
// set m_ip to ip of eth1
|
|
m_ip = m_host->m_ipShotgun;
|
|
// this is now only used when sendSetup() is called
|
|
// for the start of sending a request/reply
|
|
m_host->m_preferEth = 1;
|
|
// use eth1 to talk to this guy for this tid
|
|
m_preferEth = 1;
|
|
logDebug(g_conf.m_logDebugUdp, "udp: switching to eth1 for host #%" PRId32" tid=%" PRId32,
|
|
m_host->m_hostId, m_transId);
|
|
}
|
|
// . otherwise, we were using the eth1 (shotgun) ip
|
|
// . do not switch though if the ping is really bad for eth0
|
|
else if ( m_preferEth == 1 ) {
|
|
// set m_ip to ip of eth0
|
|
m_ip = m_host->m_ip;
|
|
// this is now only used when sendSetup() is called
|
|
// for the start of sending a request/reply
|
|
m_host->m_preferEth = 0;
|
|
// use eth0 to talk to this guy for this tid
|
|
m_preferEth = 0;
|
|
logDebug(g_conf.m_logDebugUdp, "udp: switching to eth0 for host #%" PRId32" tid=%" PRId32,
|
|
m_host->m_hostId, m_transId);
|
|
}
|
|
// . just some debug notes
|
|
// . this happens when host cores and both eth0 and eth1 r dead
|
|
//logf(LOG_DEBUG,"udp: not switching. preferEth=%" PRId32" "
|
|
// "pingSHotgun=%" PRId32" ping=%" PRId32,(int32_t)m_host->m_preferEth,
|
|
// m_host->m_pingShotgun,m_host->m_ping);
|
|
}
|
|
|
|
// . tally the count
|
|
// . need to increment since won't resend to eth1 unless this is 2
|
|
m_resendCount++;
|
|
// debug msg
|
|
if ( g_conf.m_logDebugUdp || (g_conf.m_logDebugDns && !m_proto->useAcks()) ) {
|
|
char ipbuf[16];
|
|
logf(LOG_DEBUG, "udp: resending slot all=%" PRId32" tid=%" PRId32" dst=%s:%hu count=%" PRId32" host=%p"
|
|
" cleared=%" PRId32,
|
|
(int32_t) resendAll,
|
|
m_transId,
|
|
iptoa(m_ip,ipbuf),
|
|
(uint16_t) m_port,
|
|
(int32_t) m_resendCount,
|
|
m_host,
|
|
(int32_t) cleared);
|
|
}
|
|
|
|
// . after UdpServer::readTimeOutPoll() calls this prepareForResend()
|
|
// he then calls doSending()
|
|
// . but we cannot send unless the token is free or we're older (500ms)
|
|
// than the guy that has the token
|
|
// . therefore let's update the m_lastSentTime if we didn't send
|
|
// anything, just so readTimeoutPoll() quits calling us every time
|
|
m_lastSendTime = now;
|
|
|
|
// . don't increase our m_resendTime if we didn't resend anything
|
|
// . that way when the token is available or 500ms younger than us
|
|
// we won't be waiting 600ms until we can check that!
|
|
if ( cleared == 0 ) {
|
|
return;
|
|
}
|
|
|
|
// update stats for this host for the PageHosts.cpp table
|
|
Host *h = m_host;
|
|
if ( ! h && m_hostId >= 0 ) h = g_hostdb.getHost ( m_hostId );
|
|
if ( h ) h->m_totalResends += cleared;
|
|
// . set the resend time based on m_resendCount and m_niceness
|
|
// . this typically doubles m_resendTime with each resendCount
|
|
setResendTime ();
|
|
}
|
|
|
|
void UdpSlot::setResendTime() {
|
|
// otherwise, calculate how much time since our last send
|
|
int32_t max ;
|
|
if ( m_maxWait >= 0 ) max = m_maxWait;
|
|
else if ( m_niceness == 0 ) max = MAX_RESEND_0;
|
|
else max = MAX_RESEND_1;
|
|
// if backoff not negative use that
|
|
if ( m_backoff >= 0 ) {
|
|
// compute resend time
|
|
int32_t bs = ( 1 << m_resendCount );
|
|
int32_t val = ((int32_t)m_backoff) * bs;
|
|
// check for overflow
|
|
if ( val < (int32_t)m_backoff )
|
|
m_resendTime = max;
|
|
else if ( val < bs )
|
|
m_resendTime = max;
|
|
else if ( bs < 0 )
|
|
m_resendTime = max;
|
|
else
|
|
m_resendTime = val;
|
|
// . don't exceed the max, though of .4 seconds
|
|
// . it's crucial to keep this fairly low because an old slot
|
|
// can only steal the token when a dgram or ack is read
|
|
// into that slot
|
|
if ( m_resendTime > max ) m_resendTime = max;
|
|
return;
|
|
}
|
|
|
|
// is it a local ip?
|
|
bool isLocal = ip_distance(m_ip)<=ip_distance_nearby;
|
|
// . keep our resend times up-to-date
|
|
// . recompute a new resend time in milliseconds for the winning slot
|
|
// . we double,triple,... the deviation as our backoff scheme
|
|
// . get the avg/stdDev round trip times for this host from the hostmap
|
|
// . these times may change every time we receive an ACK for this host
|
|
// . the new resend time is like double
|
|
if ( m_niceness == 0 ) {
|
|
// if size is short we typically use smaller resend time
|
|
if ( m_dgramsToSend <= 1 ) m_resendTime = RESEND_0_SHORT;
|
|
else m_resendTime = RESEND_0;
|
|
// save for checking for overflow
|
|
int32_t tt = m_resendTime;
|
|
// 30 ms resend time for starters for high priority slots
|
|
m_resendTime *= ( 1 << m_resendCount );
|
|
// watch out for overflow
|
|
if ( m_resendTime < tt ) m_resendTime = max;
|
|
// don't exceed the max, though of .4 seconds
|
|
if ( m_resendTime > max ) m_resendTime = max;
|
|
// quick and somewhat incorrect overflow check
|
|
if ( m_resendTime <= 0 ) m_resendTime = max;
|
|
} else {
|
|
int32_t base = RESEND_1;
|
|
if ( isLocal ) base = RESEND_1_LOCAL;
|
|
m_resendTime = base * ( 1 << m_resendCount );
|
|
// watch out for overflow
|
|
if ( m_resendTime < base ) m_resendTime = max;
|
|
//try to prevent everyone from synching up on
|
|
//a bogged down host when spidering.
|
|
m_resendTime += rand() % m_resendTime;
|
|
// don't exceed the max, though of 30 seconds
|
|
if ( m_resendTime > max ) m_resendTime = max;
|
|
// quick and somewhat incorrect overflow check
|
|
if ( m_resendTime <= 0 ) m_resendTime = max;
|
|
}
|
|
|
|
// if we're dns protocol, always use resendTime of 4 seconds
|
|
if ( ! m_proto->useAcks() ) {
|
|
m_resendTime = 4000;
|
|
}
|
|
}
|
|
|
|
// . returns values:
|
|
// . -2 if nothing to send
|
|
// . -1 on error,
|
|
// . 0 if blocked,
|
|
// . 1 if completed sending a datagram/ACK
|
|
// . sets g_errno on error
|
|
// . this is only called by UdpServer::doSending()
|
|
// . we try to do ALL the reading before calling this so we can send
|
|
// many ACKs back in one packet
|
|
int32_t UdpSlot::sendDatagramOrAck ( int sock, bool allowResends, int64_t now ){
|
|
//log("sendDatagramOrAck");
|
|
// if acks we've sent isn't caught up to what we read, send an ack
|
|
if ( m_sentAckBitsOn < m_readBitsOn && m_proto->useAcks() )
|
|
return sendPlainAck ( sock , now );
|
|
// we may have received an ack for an implied resend (from ack gap)
|
|
// so we clear some bits, but then got an ACK back later
|
|
while ( m_nextToSend < m_dgramsToSend &&
|
|
isOn ( m_nextToSend , m_sentBits2 ) )
|
|
m_nextToSend++;
|
|
// if we've sent it all return -2
|
|
if ( m_sentBitsOn >= m_dgramsToSend ) return -2;
|
|
// or if we hit the end of the road, but m_sentBitsOn is not full,
|
|
// then m_nextToSend must have been too high
|
|
if ( m_nextToSend >= m_dgramsToSend ) {
|
|
log(LOG_LOGIC,
|
|
"udp: senddatagramorack: m_nextToSend=%" PRId32" >= %" PRId32". "
|
|
"Fixing it. Do not panic.",
|
|
m_nextToSend , m_dgramsToSend );
|
|
fixSlot();
|
|
return 1;
|
|
}
|
|
// get the ip
|
|
int32_t ip = m_ip;
|
|
// . if this is a send to our ip use the loopback interface
|
|
// . MTU is very high here
|
|
if ( ip_distance(m_ip)==ip_distance_ourselves )
|
|
ip = g_hostdb.getLoopbackIp();
|
|
// pick a dgram to send
|
|
int32_t dgramNum = m_nextToSend;
|
|
|
|
// . store dgram #dgramNum from this send buf into "dgram"
|
|
// . let the protocol set the dgram from the m_sendBuf for us
|
|
char buf [ DGRAM_SIZE_CEILING ];
|
|
// should hold all headers
|
|
char saved [ 32 ];
|
|
// the header size
|
|
int32_t headerSize = m_proto->getHeaderSize(0);
|
|
// bitch if too big
|
|
if ( headerSize > 32 ) {
|
|
log(LOG_LOGIC,"udp: senddatagramorack: header size of %" PRId32" "
|
|
"is bigger than 32.",headerSize); return -1; }
|
|
// . now from here on we only use headerSize so we can strip the header
|
|
// . so if the protocol wants the headers, leave them in...
|
|
if ( ! m_proto->stripHeaders() ) headerSize = 0;
|
|
// offset into send buffer, the data to send
|
|
int32_t offset = dgramNum * ( m_maxDgramSize - headerSize );
|
|
// what should we send, and how much?
|
|
char *send = m_sendBuf + offset;
|
|
int32_t sendSize = m_sendBufSize - offset;
|
|
#ifdef _VALGRIND_
|
|
VALGRIND_CHECK_MEM_IS_DEFINED(send,sendSize);
|
|
#endif
|
|
// truncate to max size of dgram we're allowed
|
|
if ( sendSize > m_maxDgramSize - headerSize )
|
|
sendSize = m_maxDgramSize - headerSize;
|
|
// where to store the dgram, header and data, assume "buf"
|
|
char *dgram = buf;
|
|
// size of dgram, header and data
|
|
int32_t dgramSize = headerSize + sendSize;
|
|
// if we're NOT the 1st dgram we can store into send buf directly
|
|
if ( dgramNum != 0 ) {
|
|
// where to store the header? right into send buf
|
|
dgram = send - headerSize;
|
|
// but save before overwriting
|
|
gbmemcpy ( saved , dgram , headerSize );
|
|
}
|
|
// store header into "dgram"
|
|
m_proto->setHeader(dgram, m_sendBufSize, m_msgType, dgramNum, m_transId, m_callback, m_localErrno, m_niceness);
|
|
#ifdef _VALGRIND_
|
|
VALGRIND_CHECK_MEM_IS_DEFINED(dgram,headerSize);
|
|
#endif
|
|
// . if we're the first dgram, we can't back up for the header...
|
|
// . copy data into dgram if we're the 1st dgram
|
|
if ( dgramNum == 0 )
|
|
gbmemcpy ( dgram + headerSize , send , sendSize );
|
|
|
|
// if we are the proxy sending a udp packet to our flock, then make
|
|
// sure that we send to tmp cluster if we should
|
|
if ( g_proxy.isProxy() && g_conf.m_useTmpCluster && m_host )
|
|
m_port = m_host->m_port + 1;
|
|
else if ( m_host )
|
|
m_port = m_host->m_port ;
|
|
|
|
// we need a destination stored in a sockaddr for passing to sendto()
|
|
// get sending info from the send control slot (network order)
|
|
// TODO: ensure network order
|
|
struct sockaddr_in to;
|
|
memset(&to,0,sizeof(to));
|
|
to.sin_family = AF_INET;
|
|
//to.sin_addr.s_addr = .... more complicated than that
|
|
to.sin_port = htons ( m_port );
|
|
// are we sending to loopback? if so, treat as eth0.
|
|
if ( ip_distance(ip) == ip_distance_ourselves ) {
|
|
to.sin_addr.s_addr = ip;
|
|
// update stats, just put them all in g_udpServer
|
|
g_udpServer.m_eth0PacketsOut += 1;
|
|
g_udpServer.m_eth0BytesOut += dgramSize;
|
|
} else if ( m_host ) {
|
|
if ( m_preferEth == 1 ) {
|
|
// we now pick ip based on this. if we fail to get a timely ACK
|
|
// then we set switch eth preferences. helps when a switch crashes.
|
|
to.sin_addr.s_addr = m_host->m_ipShotgun;
|
|
} else {
|
|
to.sin_addr.s_addr = m_host->m_ip;
|
|
}
|
|
|
|
// update stats, just put them all in g_udpServer
|
|
g_udpServer.m_eth0PacketsOut += 1;
|
|
g_udpServer.m_eth0BytesOut += dgramSize;
|
|
} else {
|
|
// count packets to/from hosts outside the cluster separately
|
|
// these guys are importing link text usually
|
|
to.sin_addr.s_addr = ip;
|
|
g_udpServer.m_outsiderPacketsOut += 1;
|
|
g_udpServer.m_outsiderBytesOut += dgramSize;
|
|
}
|
|
|
|
// . this socket should be non-blocking (i.e. return immediately)
|
|
// . this should set g_errno on error!
|
|
// MSG_DONTROUTE makes dns fail
|
|
int bytesSent = sendto(sock, dgram, dgramSize, 0, (struct sockaddr *) (void *) &to, sizeof(to));
|
|
|
|
// restore what we overwrote
|
|
if ( dgramNum != 0 ) {
|
|
gbmemcpy ( dgram , saved , headerSize );
|
|
}
|
|
|
|
// return -1 on error or 0 if blocked
|
|
if ( bytesSent < 0 ) {
|
|
// copy errno to g_errno
|
|
g_errno = errno;
|
|
if ( g_errno == EAGAIN ) { g_errno = 0; return 0;}
|
|
// not in linux
|
|
// . "output queue for a network interface was full"
|
|
// . however, linux just silently drops packets!!!!!!!
|
|
// . i think using more than 1GB in this process brings this
|
|
// problem up, the kernel's kmalloc fails...
|
|
if ( g_errno == ENOBUFS ) {
|
|
// log it once every 3 seconds so they know
|
|
static int32_t s_lastTime = 0;
|
|
static int32_t s_count = 0;
|
|
int32_t t = getTime();
|
|
if ( t - s_lastTime > 3 ||
|
|
s_lastTime - t > 3 ) { // clock skew?
|
|
s_lastTime = getTime();
|
|
log(LOG_WARN, "udp: got ENOBUFS kernel bug %" PRId32" times.", ++s_count);
|
|
}
|
|
//g_errno = 0;
|
|
//return 0;
|
|
return -1;
|
|
}
|
|
// log the error
|
|
log(LOG_WARN, "udp: Call to sendto had error (ignoring): %s.", mstrerror(g_errno)) ;
|
|
// . now immediately switch the eth port to see if that helps!
|
|
// . actually, just pretend we sent it. we won't get an ack
|
|
// and the resend algo will switch ports
|
|
//return -1;
|
|
bytesSent = dgramSize;
|
|
}
|
|
// this should not happen
|
|
if ( bytesSent != dgramSize ) {
|
|
g_errno = EBADENGINEER;
|
|
log(LOG_WARN, "udp: sendto only sent %i bytes, not %" PRId32". Undersend.", bytesSent,dgramSize);
|
|
return -1;
|
|
}
|
|
// general count
|
|
if ( m_niceness == 0 ) g_stats.m_packetsOut[m_msgType][0]++;
|
|
else g_stats.m_packetsOut[m_msgType][1]++;
|
|
// keep stats
|
|
if ( m_host ) m_host->m_dgramsTo++;
|
|
// keep track of dgrams sent outside of our cluster
|
|
//else g_stats.m_dgramsToStrangers++;
|
|
// get time now
|
|
//int64_t now = gettimeofdayInMilliseconds();
|
|
// . if it's our first, mark this for g_stats UDP_*_OUT_BPS
|
|
// . sendSetup() will set m_firstSendTime to -1
|
|
if (m_sentBitsOn == 0 && m_firstSendTime == -1) m_firstSendTime =now;
|
|
// mark this dgram as sent
|
|
setBit ( dgramNum , m_sentBits2 );
|
|
// count the bit we lit
|
|
m_sentBitsOn++;
|
|
// update last send time stamp even if we're a resend
|
|
m_lastSendTime = now;
|
|
// update m_nextToSend
|
|
m_nextToSend = getNextUnlitBit ( dgramNum, m_sentBits2,m_dgramsToSend);
|
|
// log network info
|
|
if ( g_conf.m_logDebugUdp ) {
|
|
//int32_t shotgun = 0;
|
|
//if ( g_conf.m_useShotgun && s_useShotgunIp ) shotgun = 1;
|
|
int32_t eth = 1;
|
|
if ( m_host && m_host->m_ip == to.sin_addr.s_addr ) eth = 0;
|
|
// if sending outside, always use eth0
|
|
if ( ! m_host ) eth = 0;
|
|
//if ( m_host->m_ip == (uint32_t)ip ) eth = 0;
|
|
int32_t hid = -1;
|
|
if ( m_host )
|
|
hid = m_host->m_hostId;
|
|
|
|
int32_t kk = 0; if ( m_callback ) kk = 1;
|
|
char ipbuf[16];
|
|
log(LOG_DEBUG,
|
|
"udp: sent dgram "
|
|
"dgram=%" PRId32" "
|
|
"dgrams=%" PRId32" "
|
|
"msg=0x%02x "
|
|
"tid=%" PRId32" "
|
|
"dst=%s:%hu "
|
|
"eth=%" PRId32" "
|
|
"init=%" PRId32" "
|
|
"age=%" PRId32" "
|
|
"dsent=%" PRId32" "
|
|
"aread=%" PRId32" "
|
|
"len=%" PRId32" "
|
|
"msgSz=%" PRId32" "
|
|
"cnt=%" PRId32" "
|
|
"wait=%" PRId32" "
|
|
"error=%" PRId32" "
|
|
"k.n1=%" PRIu32" n0=%" PRIu64" "
|
|
"maxdgramsz=%" PRId32" "
|
|
"hid=%" PRId32 " "
|
|
"bytesSent=%d",
|
|
(int32_t)dgramNum,
|
|
(int32_t)m_dgramsToSend,
|
|
(int16_t)m_msgType,
|
|
m_transId,
|
|
iptoa(to.sin_addr.s_addr,ipbuf),
|
|
(uint16_t)m_port,
|
|
eth,//shotgun,
|
|
(int32_t)kk ,
|
|
(int32_t)(now-m_startTime) ,
|
|
(int32_t)m_sentBitsOn ,
|
|
(int32_t)m_readAckBitsOn ,
|
|
(int32_t)bytesSent ,
|
|
(int32_t)m_sendBufSize,
|
|
(int32_t)m_resendCount,
|
|
(int32_t)m_resendTime ,
|
|
(int32_t)m_localErrno ,
|
|
m_key.n1,m_key.n0 ,
|
|
m_maxDgramSize ,
|
|
hid,
|
|
bytesSent);
|
|
}
|
|
|
|
// return 1 cuz we didn't block
|
|
return 1;
|
|
}
|
|
|
|
// assume m_readBits2, m_sendBits2, m_sentAckBits2 and m_readAckBits2 are
|
|
// correct and update m_firstUnlitSentAckBit, m_sentAckBitsOn, m_readBitsOn,
|
|
// m_readAckBitsOn and m_sentBitsOn
|
|
void UdpSlot::fixSlot ( ) {
|
|
// log it
|
|
log(LOG_LOGIC,
|
|
"udp: before fixSlot(): "
|
|
"m_readBitsOn=%" PRId32" "
|
|
"m_readAckBitsOn=%" PRId32" "
|
|
"m_sentBitsOn=%" PRId32" "
|
|
"m_sentAckBitsOn=%" PRId32" "
|
|
"m_firstUnlitSentAckBit=%" PRId32" "
|
|
"m_nextToSend=%" PRId32" " ,
|
|
m_readBitsOn,
|
|
m_readAckBitsOn,
|
|
m_sentBitsOn,
|
|
m_sentAckBitsOn,
|
|
m_firstUnlitSentAckBit,
|
|
m_nextToSend );
|
|
|
|
m_readBitsOn = 0;
|
|
m_readAckBitsOn = 0;
|
|
m_sentBitsOn = 0;
|
|
m_sentAckBitsOn = 0;
|
|
for ( int32_t i = 0 ; i < m_dgramsToRead ; i++ ) {
|
|
if ( isOn ( i , m_readBits2 ) ) m_readBitsOn++;
|
|
// we send back an ack for every dgram read
|
|
if ( isOn ( i , m_sentAckBits2 ) ) m_sentAckBitsOn++;
|
|
}
|
|
for ( int32_t i = 0 ; i < m_dgramsToSend ; i++ ) {
|
|
if ( isOn ( i , m_sentBits2 ) ) m_sentBitsOn++;
|
|
// we must read an ack for every dgram sent
|
|
if ( isOn ( i , m_readAckBits2 ) ) m_readAckBitsOn++;
|
|
}
|
|
|
|
// start at bit #0 so this doesn't loop forever
|
|
m_firstUnlitSentAckBit=getNextUnlitBit(-1,m_sentAckBits2,m_dgramsToRead);
|
|
m_nextToSend =getNextUnlitBit(-1,m_sentBits2 ,m_dgramsToSend);
|
|
|
|
log(LOG_LOGIC,
|
|
"udp: after fixSlot(): "
|
|
"m_readBitsOn=%" PRId32" "
|
|
"m_readAckBitsOn=%" PRId32" "
|
|
"m_sentBitsOn=%" PRId32" "
|
|
"m_sentAckBitsOn=%" PRId32" "
|
|
"m_firstUnlitSentAckBit=%" PRId32" "
|
|
"m_nextToSend=%" PRId32" " ,
|
|
m_readBitsOn,
|
|
m_readAckBitsOn,
|
|
m_sentBitsOn,
|
|
m_sentAckBitsOn,
|
|
m_firstUnlitSentAckBit,
|
|
m_nextToSend );
|
|
}
|
|
|
|
// . this should be called only after read poll has nothing left to read so
|
|
// we can combine many ACKs into one mega ACK and save packets per second
|
|
// on the network (reduce by half?)
|
|
// . returns values:
|
|
// . -2 if nothing to send
|
|
// . -1 on error,
|
|
// . 0 if blocked,
|
|
// . 1 if completed sending a datagram/ACK
|
|
// . if we Initiated is the default -2, then we use m_callback to determine
|
|
// if we initiated the transaction or not
|
|
// . if m_callback is NULL we did NOT intiate the transaction
|
|
// . we should only be called if m_sentAckBitsOn < m_readBitsOn, i.e.
|
|
// when we're not caught up with ACKing with what we've read
|
|
int32_t UdpSlot::sendAck ( int sock , int64_t now ,
|
|
int32_t dgramNum , int32_t weInitiated ,
|
|
bool cancelTrans ) {
|
|
// protection from garbled dgrams
|
|
if ( dgramNum >= MAX_DGRAMS ) {
|
|
log(LOG_LOGIC,
|
|
"udp: Sending ack for dgram #%" PRId32" > max dgram of %" PRId32".",
|
|
dgramNum,(int32_t)MAX_DGRAMS); return 1; }
|
|
// remember if forced or not
|
|
//int32_t forced = dgramNum;
|
|
// if this was not supplied, look at m_callback to determine it
|
|
if ( weInitiated == -2 ) {
|
|
if ( m_callback ) weInitiated = 1;
|
|
else weInitiated = 0;
|
|
}
|
|
// a little dgram buffer
|
|
char dgram[DGRAM_SIZE_CEILING];
|
|
|
|
// . if dgramNum is -1, send the next ack in line
|
|
// . it's the first bit in m_sentAckBits2 that is 0 while being
|
|
// lit in m_readBits
|
|
if ( dgramNum == -1 ) {
|
|
// m_firstUnlitSentAckBit is the first clr bit in m_sentAckBits
|
|
dgramNum = m_firstUnlitSentAckBit;
|
|
// . now find the first bit in m_sentAckBits2 that is off
|
|
// yet on in m_readBits
|
|
// . the OLD statement below didn't check to see if dgramNum is
|
|
// then off in m_sentAckBits!!!
|
|
// . let's do it custom then!
|
|
// . we know that m_sentAckBitsOn < m_readBitsOn so the
|
|
// we must find a bit with these properties
|
|
for ( ; dgramNum < m_dgramsToRead ; dgramNum++ ) {
|
|
// if bit off in m_readBits2, it's not an ACK candidate
|
|
if(!isOn(dgramNum,m_readBits2))continue;
|
|
// if bit is off in m_sentAckBits2, that's the one!
|
|
if(!isOn(dgramNum,m_sentAckBits2))break;
|
|
}
|
|
// if we had no match, that's an error!
|
|
if ( dgramNum >= m_dgramsToRead ) {
|
|
log(LOG_LOGIC,
|
|
"udp: Sending ack for dgram #%" PRId32" which is passed "
|
|
"the number of dgrams we have to read, %" PRId32". "
|
|
"Fixing. Do not panic.",
|
|
dgramNum , m_dgramsToRead );
|
|
fixSlot();
|
|
return -1;
|
|
}
|
|
}
|
|
// . ask the protocol class to make an ACK for us and store in "dgram"
|
|
// . we initiated the transaction if our callback is non-NULL
|
|
int32_t dgramSize = m_proto->makeAck ( dgram ,
|
|
dgramNum ,
|
|
m_transId ,
|
|
weInitiated ,
|
|
cancelTrans );
|
|
// get the ip
|
|
uint32_t ip = m_ip;
|
|
// . if this is a send to our ip use the loopback interface
|
|
// . MTU is very high here
|
|
if ( ip_distance(m_ip)==ip_distance_ourselves )
|
|
ip = g_hostdb.getLoopbackIp();
|
|
|
|
// if we are the proxy sending a udp packet to our flock, then make
|
|
// sure that we send to tmp cluster if we should
|
|
if ( g_proxy.isProxy() && g_conf.m_useTmpCluster && m_host )
|
|
m_port = m_host->m_port + 1;
|
|
else if ( m_host )
|
|
m_port = m_host->m_port ;
|
|
|
|
// get the ip address of dest. host from the slot
|
|
struct sockaddr_in to;
|
|
memset(&to,0,sizeof(to));
|
|
to.sin_family = AF_INET;
|
|
to.sin_addr.s_addr = ip;
|
|
to.sin_port = htons ( m_port );
|
|
|
|
// stat count
|
|
if ( cancelTrans ) g_cancelAcksSent++;
|
|
// . this socket should be non-blocking (i.e. return immediately)
|
|
// . this should set g_errno on error
|
|
int bytesSent = sendto ( sock ,
|
|
dgram ,
|
|
dgramSize ,
|
|
0 ,
|
|
(struct sockaddr *)(void*)&to ,
|
|
sizeof ( to ) );
|
|
// return -1 on error, 0 if blocked
|
|
if ( bytesSent < 0 ) {
|
|
// copy errno to g_errno
|
|
g_errno = errno;
|
|
if ( g_errno == EAGAIN ) { g_errno = 0; return 0; }
|
|
if ( g_errno == ENOBUFS ) { g_errno = 0; return 0; }
|
|
log("udp: error sending ack: %s",mstrerror(g_errno));
|
|
return -1;
|
|
}
|
|
// this should not happen
|
|
if ( bytesSent != dgramSize ) {
|
|
g_errno = EBADENGINEER;
|
|
log("udp: sendto only sent %i bytes, not %" PRId32". Undersend.",
|
|
bytesSent,dgramSize);
|
|
//sleep(50000);
|
|
return -1;
|
|
}
|
|
// general count
|
|
if ( m_niceness == 0 ) g_stats.m_packetsOut[m_msgType][0]++;
|
|
else g_stats.m_packetsOut[m_msgType][1]++;
|
|
// we were an ack
|
|
if ( m_niceness == 0 ) g_stats.m_acksOut[m_msgType][0]++;
|
|
else g_stats.m_acksOut[m_msgType][1]++;
|
|
// keep stats
|
|
if ( m_host ) m_host->m_dgramsTo++;
|
|
if ( ! isOn ( dgramNum , m_sentAckBits2 ) ) {
|
|
// mark this ack as sent
|
|
setBit ( dgramNum , m_sentAckBits2 );
|
|
// count the bit we lit
|
|
m_sentAckBitsOn++;
|
|
}
|
|
// update last send time stamp even if we're a resend
|
|
m_lastSendTime = now; // gettimeofdayInMilliseconds();
|
|
// . dgramNum should neveber <, though
|
|
// . but this can happen if we're hot (signal handler)??? how???
|
|
if ( dgramNum < m_firstUnlitSentAckBit ) {
|
|
g_errno = EBADENGINEER;
|
|
log(LOG_LOGIC,
|
|
"udp: Sending ack for dgram #%" PRId32" which should have "
|
|
"already been sent. Next ack to send should be for dgram "
|
|
"# %" PRId32". Fixing. Do not panic.",
|
|
dgramNum , m_firstUnlitSentAckBit );
|
|
//g_process.shutdownAbort(true);
|
|
fixSlot();
|
|
return 1;
|
|
}
|
|
// . only update m_firstUnlitSentAckBit if we dgramNum was
|
|
// the first unlit bit in m_sentAckBits
|
|
// . otherwise, we had a read hole so we had to skip dgramNum around
|
|
if (dgramNum <= m_firstUnlitSentAckBit)
|
|
m_firstUnlitSentAckBit = getNextUnlitBit(dgramNum,
|
|
m_sentAckBits2,
|
|
m_dgramsToRead);
|
|
// log msg
|
|
if ( g_conf.m_logDebugUdp ) { // || cancelTrans ) {
|
|
//#ifdef _UDPDEBUG_
|
|
int32_t kk = 0; if ( m_callback ) kk = 1;
|
|
int32_t hid = -1;
|
|
if ( m_host )
|
|
hid = m_host->m_hostId;
|
|
char ipbuf[16];
|
|
logf(LOG_DEBUG,
|
|
"udp: sent ACK "
|
|
"dgram=%" PRId32" "
|
|
"msg=0x%02x "
|
|
"tid=%" PRId32" "
|
|
"src=%s:%hu "
|
|
"init=%" PRId32" "
|
|
"age=%" PRId32" "
|
|
"cancel=%" PRId32" "
|
|
"dread=%" PRId32" "
|
|
"asent=%" PRId32" "
|
|
"hid=%" PRId32,
|
|
(int32_t)dgramNum,
|
|
(int16_t)m_msgType ,
|
|
m_transId,
|
|
iptoa(m_ip,ipbuf),
|
|
(uint16_t)m_port,
|
|
(int32_t)kk ,
|
|
(int32_t)(gettimeofdayInMilliseconds() - m_startTime) ,
|
|
(int32_t)cancelTrans,
|
|
(int32_t)m_readBitsOn ,
|
|
(int32_t)m_sentAckBitsOn ,
|
|
hid);
|
|
//#endif
|
|
}
|
|
return 1;
|
|
}
|
|
|
|
// . returns false and sets g_errno on error, true otherwise
|
|
// . if the read dgram had an error code we set g_errno to that and ret false
|
|
// . anyone calling this should call sendDatagramOrAck() immediately afterwards
|
|
// in case the send was blocking on receiving an ACK or we should send an ACK
|
|
// . updates: m_readBits2, m_readBitsOn, m_sentAckBits2, m_sentAckBitsOn
|
|
// m_firstUnlitSentAckBit
|
|
bool UdpSlot::readDatagramOrAck ( const void *readBuffer_,
|
|
int32_t readSize,
|
|
int64_t now) {
|
|
const char * const readBuffer = (const char*)readBuffer_;
|
|
// get dgram Number
|
|
int32_t dgramNum = m_proto->getDgramNum ( readBuffer, readSize );
|
|
// protection from garbled dgrams
|
|
if ( dgramNum >= MAX_DGRAMS ) {
|
|
log(LOG_LOGIC,
|
|
"udp: Reading for dgram #%" PRId32" > max dgram of %" PRId32".",
|
|
dgramNum,(int32_t)MAX_DGRAMS);
|
|
return true;
|
|
}
|
|
// was it a cancel signal?
|
|
if ( m_proto->isCancelTrans ( readBuffer, readSize ) ) {
|
|
char ipbuf[16];
|
|
log(LOG_DEBUG,
|
|
"udp: Read cancel ack hdrlen=%" PRId32" tid=%" PRId32" "
|
|
"src=%s:%hu msgType=0x%02x weInitiated=%p "
|
|
"sent=%" PRId32" "
|
|
"sendbufalloc=%p sendbufsize=%" PRIu32,
|
|
readSize , m_proto->getTransId ( readBuffer,readSize ),
|
|
iptoa(m_ip,ipbuf), m_port,
|
|
m_proto->getMsgType(readBuffer,readSize),
|
|
m_callback,
|
|
m_sentBitsOn,
|
|
m_sendBufAlloc,
|
|
(uint32_t)m_sendBufSize);
|
|
// stat count
|
|
g_cancelAcksRead++;
|
|
// what happens is that if we are handling a request and we
|
|
// try to send back the reply on this slot, it will have been
|
|
// destroyed by a call to makeCallbacks(). but really the
|
|
// purpose is to avoid sending large termlists back and
|
|
// wasting network bandwidth, so let's just avoid this if
|
|
// we are not IN THE MIDDLE OF doing a large send. when we
|
|
// start the send it will probably send us another cancel ack
|
|
// and we can abort it then. before, this was causing Msg20
|
|
// to crash because the requester would send us a cancel ack
|
|
// and destroy the slot that msg20 would try to send its reply
|
|
// on. It's reply was delayed and when it finally came round
|
|
// the slot was destroyed...
|
|
// hey, m_sentBitsOn can be non-zero even if we haven't sent
|
|
// anything because m_sentBits[i] gets forced on below if we
|
|
// read an ack...
|
|
if ( m_sentBitsOn <= 0 ) return true;
|
|
// sometimes it points to a separate send buffer
|
|
//if ( ! m_sendBufAlloc ) return true;
|
|
// msg1 sends back an empty reply (0 bytes) so we need to
|
|
// check m_resendCount as well, because if we have never
|
|
// generated a reply it should be 0! we are having problems
|
|
// with acks getting dropped on the floor and the reply
|
|
// keeps getting re-sent over and over, and the received
|
|
// cancel acks are ignore because msg1 has a 0 byte reply...
|
|
if ( ! m_sendBufSize && m_resendCount<=0 ) return true;
|
|
// record if we cancelled it. how many cancel acks we read!
|
|
if ( m_niceness == 0 ) g_stats.m_cancelRead[m_msgType][0]++;
|
|
else g_stats.m_cancelRead[m_msgType][1]++;
|
|
// force to be done so UdpServer::makeCallback() will close it
|
|
m_dgramsToRead = 1;
|
|
m_dgramsToSend = 1;
|
|
m_readBitsOn = 1;
|
|
m_sentBitsOn = 1;
|
|
m_readAckBitsOn = 1;
|
|
m_sentAckBitsOn = 1;
|
|
// assume the receiver ran out of memory
|
|
m_errno = ENOMEM;
|
|
return true;
|
|
}
|
|
// handle acks
|
|
if ( m_proto->isAck ( readBuffer, readSize ) ) {
|
|
readAck ( dgramNum , now );
|
|
// keep stats
|
|
if ( m_host ) m_host->m_dgramsFrom++;
|
|
return true;
|
|
}
|
|
// . now we have a regular dgram to process
|
|
// . get the timestamp in microseconds
|
|
// log msg
|
|
if ( g_conf.m_logDebugUdp ) {
|
|
int32_t hid = -1;
|
|
if ( m_host )
|
|
hid = m_host->m_hostId;
|
|
int32_t kk = 0; if ( m_callback ) kk = 1;
|
|
char ipbuf[16];
|
|
log(LOG_DEBUG,
|
|
"udp: Read dgram "
|
|
"dgram=%" PRId32" "
|
|
"msg=0x%02x "
|
|
"tid=%" PRId32" "
|
|
"src=%s:%hu "
|
|
"init=%" PRId32" "
|
|
"age=%" PRId32" "
|
|
"dread=%" PRId32" "
|
|
"asent=%" PRId32" "
|
|
"msgSz=%" PRId32" "
|
|
"error=%" PRId32" "
|
|
"hid=%" PRId32,
|
|
(int32_t)dgramNum,
|
|
(int16_t)m_proto->getMsgType(readBuffer,readSize),
|
|
(int32_t)m_proto->getTransId(readBuffer,readSize),
|
|
iptoa(m_ip,ipbuf),
|
|
(uint16_t)m_port,
|
|
(int32_t)kk,
|
|
(int32_t)(gettimeofdayInMilliseconds() - m_startTime) ,
|
|
(int32_t)m_readBitsOn ,
|
|
(int32_t)m_sentAckBitsOn ,
|
|
(int32_t)m_proto->getMsgSize(readBuffer,readSize) ,
|
|
(int32_t)(m_proto->hadError(readBuffer,readSize)),
|
|
hid);
|
|
}
|
|
// update time of last read
|
|
m_lastReadTime = gettimeofdayInMilliseconds();
|
|
// if it's passing us an g_errno then set our g_errno from it
|
|
if ( m_proto->hadError ( readBuffer, readSize ) ) {
|
|
// bitch if not dgramNum #0
|
|
if ( dgramNum != 0 )
|
|
log(LOG_LOGIC,"udp: Error dgram is not dgram #0.");
|
|
// it's new to us, set the read bits
|
|
setBit ( dgramNum, m_readBits2 );
|
|
// we read one dgram
|
|
m_readBitsOn = 1;
|
|
// only one dgram to read
|
|
m_dgramsToRead = 1;
|
|
// tell caller we haven't read anything
|
|
m_readBufSize = 0;
|
|
// . but set the remote error bit so we know it's not local
|
|
// . why? this was messing up g_errno interp. in Multicast!
|
|
g_errno = m_proto->getErrno(readBuffer,readSize);//|REMOTE_ERROR_BIT;
|
|
// return false cuz this was a remote-side error
|
|
return false;
|
|
}
|
|
// . if he's sending a REPLY then set all of our m_readAckBits
|
|
// because he must have sent ACKs (or tried) for all dgrams in the
|
|
// request
|
|
// . did we initiate?
|
|
// . AND did we miss some ACKs he sent to us?
|
|
if ( m_callback && m_readAckBitsOn != m_dgramsToSend ) {
|
|
// catch em all up
|
|
for ( int32_t i = 0 ; i < m_dgramsToSend ; i++ )
|
|
setBit ( i , m_readAckBits2 );
|
|
m_readAckBitsOn = m_dgramsToSend;
|
|
if ( g_conf.m_logDebugUdp ) {
|
|
char ipbuf[16];
|
|
log(LOG_DEBUG,"udp: Cramming ACKs "
|
|
"tid=%" PRId32" "
|
|
"dst=%s:%hu" ,
|
|
m_transId ,
|
|
iptoa(m_ip,ipbuf),
|
|
(uint16_t)m_port);
|
|
}
|
|
}
|
|
|
|
// . if it's our first, mark this for g_stats UDP_*_IN_BPS
|
|
// did we already receive this dgram?
|
|
if ( isOn(dgramNum,m_readBits2) ) {
|
|
// did we already send the ack for it?
|
|
if ( isOn(dgramNum,m_sentAckBits2) ) {
|
|
// clear the ack we sent for this so we send it again
|
|
clrBit ( dgramNum , m_sentAckBits2 );
|
|
// reduce lit bit count of sent acks
|
|
m_sentAckBitsOn--;
|
|
// update the next ack to send
|
|
if ( dgramNum < m_firstUnlitSentAckBit )
|
|
m_firstUnlitSentAckBit = dgramNum;
|
|
return true;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
// . copy the msg meat into our m_readBuf
|
|
// . how big is the dgram header?
|
|
int32_t headerSize = m_proto->getHeaderSize ( readBuffer, readSize );
|
|
// make it zero if proto wants them in m_readBuf
|
|
if ( ! m_proto->stripHeaders() ) headerSize = 0;
|
|
// . we store transId, size, type, etc. in the UdpSlot
|
|
// . we store the msg in it's pre-sent form (w/o dgram headers)
|
|
// . "maxDataSize" is max bytes of msg data per dgram (w/o hdr)
|
|
int32_t maxDataSize = m_maxDgramSize - headerSize;
|
|
int32_t offset = dgramNum * maxDataSize;
|
|
|
|
// we'll read it ourselves, so tell caller not to read it
|
|
|
|
// . how many bytes should be in this dgram?
|
|
// . this will be -1 if unknown, but under a dgram's worth of bytes
|
|
// . -1 is used for the DNS protocol
|
|
int32_t msgSize = m_proto->getMsgSize ( readBuffer, readSize );
|
|
|
|
// if this is the first dgram then set this shit
|
|
if ( m_readBitsOn == 0 ) {
|
|
// how many dgrams are we reading for this msg?
|
|
m_dgramsToRead = m_proto->getNumDgrams(msgSize,m_maxDgramSize);
|
|
// set the msgType from the dgram header
|
|
m_msgType = static_cast<msg_type_t>(m_proto->getMsgType(readBuffer, readSize));
|
|
// how big is the msg? remember it
|
|
m_readBufSize = msgSize;
|
|
// . set the cback niceness
|
|
// . ONLY if slot is new! otherwise, we keep the sender's
|
|
// niceness. so if the slot niceness got converted by
|
|
// the handler we do not re-nice it on our end.
|
|
if ( ! m_sendBuf ) {
|
|
m_niceness = m_proto->isNice(readBuffer, readSize);
|
|
}
|
|
}
|
|
|
|
// . if m_readBuf is NULL then init m_readBuf/m_readBufMaxSize big
|
|
// enough to hold "msgSize" bytes
|
|
// . if we are hot then do not call malloc but try to use m_tmpBuf
|
|
// . if we fail, return false and set g_errno
|
|
|
|
// . init m_readBuf, m_readBufMaxSize and m_dgramsToRed
|
|
// . only inits m_readBuf and m_readBufMaxSize if these are 0
|
|
//
|
|
// ERROR!!!! cannot call malloc() in a signal handler
|
|
// But now, IF WE'RE HOT, sendRequest() should pre-allocate m_readBuf
|
|
// and if we're reading in an incoming request then it cannot be bigger
|
|
// than the slot's m_quickBuf which we set m_readBuf to in
|
|
// makeReadBuf if we're hot...
|
|
//
|
|
|
|
// . this dgram should let us know how big the entire msg is
|
|
// . so allocate space for m_readBuf
|
|
// . we may already have a read buf if caller passed one in
|
|
if ( ! m_readBuf ) {
|
|
if ( ! makeReadBuf ( msgSize , m_dgramsToRead ) ) {
|
|
log(LOG_WARN, "udp: Failed to allocate %" PRId32" bytes to read request or reply for udp socket.", msgSize);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
// message size shouldn't change
|
|
if (msgSize > m_readBufMaxSize) {
|
|
g_udpServer.getLock().unlock();
|
|
gbshutdownLogicError();
|
|
}
|
|
|
|
// return false if we have no room for the entire reply
|
|
if ( msgSize > m_readBufMaxSize ) {
|
|
g_errno = EBUFTOOSMALL;
|
|
log( LOG_WARN, "udp: Msg size of %" PRId32" bytes is too big for the "
|
|
"buffer size, %" PRId32", we allocated. msgType=0x%02x.",
|
|
msgSize, m_readBufMaxSize, (int)m_msgType);
|
|
return false;
|
|
}
|
|
|
|
// dgram #'s above 0 can be copied directly into m_readBuf
|
|
int32_t avail = readSize - headerSize;
|
|
|
|
if (dgramNum == 0 && m_readBufSize == -1) {
|
|
m_readBufSize = avail;
|
|
}
|
|
|
|
// where to put it?
|
|
char *dest = m_readBuf + offset;
|
|
|
|
// sanity check, watch out for bad headers...
|
|
if ( avail < 0 ) {
|
|
// do not spam the logs
|
|
static int32_t s_badCount = 0;
|
|
s_badCount++;
|
|
|
|
log(LOG_WARN, "udp: got %" PRId32" bad dgram headers. "
|
|
"dgramNum=%" PRId32" offset=%" PRId32" "
|
|
"readBufMaxSize=%" PRId32". IS hosts.conf OUT OF SYNC???",
|
|
s_badCount,(int32_t)dgramNum,(int32_t)offset,
|
|
(int32_t)m_readBufMaxSize);
|
|
|
|
// this actually helps us to identify when hosts.conf
|
|
// is out of sync between hosts, so core
|
|
// SEEMS like the roadrunner wireless connection
|
|
// is spiking our packets sometimes with noise...
|
|
//g_process.shutdownAbort(true);
|
|
return false;
|
|
}
|
|
|
|
log(LOG_DEBUG, "udp: tid=%d dgram=%d avail=%d offset=%d", m_transId, dgramNum, avail, offset);
|
|
|
|
memcpy(dest, readBuffer + headerSize, avail);
|
|
|
|
// keep stats
|
|
if ( m_host ) m_host->m_dgramsFrom++;
|
|
|
|
// it's new to us, set the read bits
|
|
setBit ( dgramNum, m_readBits2 );
|
|
// inc the lit bit count
|
|
m_readBitsOn++;
|
|
// if our proto doesn't use acks, treat this as an ACK as well
|
|
if ( ! m_proto->useAcks () ) readAck(0/*dgramNum*/,now);
|
|
// if read everything, set the queued timer
|
|
if ( m_readBitsOn >= m_dgramsToRead )
|
|
m_queuedTime = gettimeofdayInMilliseconds();
|
|
// all done
|
|
return true;
|
|
}
|
|
|
|
// called to process an ack we read for dgram # "dgramNum"
|
|
void UdpSlot::readAck ( int32_t dgramNum, int64_t now ) {
|
|
// protection from garbled dgrams
|
|
if ( dgramNum >= MAX_DGRAMS ) {
|
|
log(LOG_LOGIC,
|
|
"udp: Reading ack for dgram #%" PRId32" > max dgram of %" PRId32".",
|
|
dgramNum,(int32_t)MAX_DGRAMS);
|
|
return ; }
|
|
// . get time now
|
|
// . make async safe
|
|
//int64_t now = gettimeofdayInMilliseconds();
|
|
// update lastRead time for this transaction
|
|
m_lastReadTime = gettimeofdayInMilliseconds();
|
|
// cease all resending
|
|
m_resendCount = 0;
|
|
// reset the resendTime to the starting point before back-off scheme
|
|
setResendTime();
|
|
// if this is a dup ack, ignore it
|
|
if ( isOn ( dgramNum , m_readAckBits2 ) ) return;
|
|
// mark this ack as read
|
|
setBit ( dgramNum , m_readAckBits2 );
|
|
// update lit bit count
|
|
m_readAckBitsOn++;
|
|
// if it was marked as unsent, fix that
|
|
if ( ! isOn ( dgramNum , m_sentBits2 ) ) {
|
|
// bitch if we do not even have a send buffer. why is he acking
|
|
// something we haven't even had to a chance to generate let
|
|
// alone send? network error?
|
|
if ( ! m_sendBufAlloc || m_dgramsToSend <= 0 )
|
|
log("udp: Read ack but send buf is empty.");
|
|
// mark this dgram as sent
|
|
setBit ( dgramNum , m_sentBits2 );
|
|
// update lit bit count
|
|
m_sentBitsOn++;
|
|
}
|
|
// . we often receive an out of order ACK
|
|
// . this usually means that the receiver did not get the dgrams
|
|
// for the gap of acks because of a collision
|
|
// . we detect this gap and automatically re-send the dgrams w/o delay
|
|
// . if our right neighbor read ack bit is off then mark all off bits
|
|
// on our right as having sent bits of 0, until we hit a lit ack bit
|
|
for ( int32_t i = dgramNum - 1 ; i >= 0 ; i-- ) {
|
|
// stop after hitting a lit bit
|
|
if ( isOn ( i , m_readAckBits2 ) ) break;
|
|
// mark as unsent iff it's marked as sent
|
|
if ( ! isOn ( i , m_sentBits2 ) ) continue;
|
|
// mark as unsent
|
|
clrBit ( i , m_sentBits2 );
|
|
// reduce the lit bit count
|
|
m_sentBitsOn--;
|
|
// update m_nextToSend
|
|
if ( i < m_nextToSend ) m_nextToSend = i;
|
|
}
|
|
|
|
// if the reply or request was fully acknowledged by the receiver
|
|
// then record some statistics
|
|
if ( ! hasAcksToRead() ) {
|
|
now = gettimeofdayInMilliseconds();
|
|
int32_t delta = now - m_startTime;
|
|
// but if we were sending a reply, use m_queuedTime
|
|
// as the start time of the send. we set m_queuedTime
|
|
// to the current time in sendReply().
|
|
if ( ! m_callback ) delta = now - m_queuedTime;
|
|
int32_t n = m_niceness;
|
|
if ( n < 0 ) n = 0;
|
|
if ( n > 1 ) n = 1;
|
|
int32_t r = 0;
|
|
// if m_callback then we are sending a request, not a reply,
|
|
// because only the sender of the request has a callback
|
|
if ( m_callback ) r = 1;
|
|
// add to average
|
|
g_stats.m_msgTotalOfSendTimes[m_msgType][n][r] += delta;
|
|
g_stats.m_msgTotalSent [m_msgType][n][r]++;
|
|
// bucket number is log base 2 of the delta
|
|
if ( delta > 64000 ) delta = 64000;
|
|
int32_t bucket = getHighestLitBit ( (uint16_t)delta );
|
|
g_stats.m_msgTotalSentByTime [m_msgType][n][r][bucket]++;
|
|
// set the queued time for stats on how long it sits in the
|
|
// queue.
|
|
m_queuedTime = now;
|
|
}
|
|
|
|
// log msg
|
|
if ( g_conf.m_logDebugUdp ) {
|
|
//#ifdef _UDPDEBUG_
|
|
int32_t kk = 0; if ( m_callback ) kk = 1;
|
|
int32_t hid = -1;
|
|
if ( m_host )
|
|
hid = m_host->m_hostId;
|
|
char ipbuf[16];
|
|
log(LOG_DEBUG,
|
|
"udp: Read ACK "
|
|
"dgram=%" PRId32" "
|
|
"msg=0x%02x "
|
|
"tid=%" PRId32" "
|
|
"src=%s:%hu "
|
|
"init=%" PRId32" "
|
|
"age=%" PRId64" "
|
|
"dsent=%" PRId32" "
|
|
"aread=%" PRId32" "
|
|
"hid=%" PRId32,
|
|
(int32_t)dgramNum,
|
|
(int16_t)m_msgType ,
|
|
m_transId,
|
|
iptoa(m_ip,ipbuf) ,
|
|
(uint16_t)m_port,
|
|
(int32_t)kk ,
|
|
now - m_startTime,
|
|
(int32_t)m_sentBitsOn ,
|
|
(int32_t)m_readAckBitsOn ,
|
|
hid);
|
|
//#endif
|
|
}
|
|
}
|
|
|
|
|
|
static const char umsg_label[256][7] = {
|
|
"umsg00", "umsg01", "umsg02", "umsg03", "umsg04", "umsg05", "umsg06", "umsg07", "umsg08", "umsg09", "umsg0a", "umsg0b", "umsg0c", "umsg0d", "umsg0e", "umsg0f",
|
|
"umsg10", "umsg11", "umsg12", "umsg13", "umsg14", "umsg15", "umsg16", "umsg17", "umsg18", "umsg19", "umsg1a", "umsg1b", "umsg1c", "umsg1d", "umsg1e", "umsg1f",
|
|
"umsg20", "umsg21", "umsg22", "umsg23", "umsg24", "umsg25", "umsg26", "umsg27", "umsg28", "umsg29", "umsg2a", "umsg2b", "umsg2c", "umsg2d", "umsg2e", "umsg2f",
|
|
"umsg30", "umsg31", "umsg32", "umsg33", "umsg34", "umsg35", "umsg36", "umsg37", "umsg38", "umsg39", "umsg3a", "umsg3b", "umsg3c", "umsg3d", "umsg3e", "umsg3f",
|
|
"umsg40", "umsg41", "umsg42", "umsg43", "umsg44", "umsg45", "umsg46", "umsg47", "umsg48", "umsg49", "umsg4a", "umsg4b", "umsg4c", "umsg4d", "umsg4e", "umsg4f",
|
|
"umsg50", "umsg51", "umsg52", "umsg53", "umsg54", "umsg55", "umsg56", "umsg57", "umsg58", "umsg59", "umsg5a", "umsg5b", "umsg5c", "umsg5d", "umsg5e", "umsg5f",
|
|
"umsg60", "umsg61", "umsg62", "umsg63", "umsg64", "umsg65", "umsg66", "umsg67", "umsg68", "umsg69", "umsg6a", "umsg6b", "umsg6c", "umsg6d", "umsg6e", "umsg6f",
|
|
"umsg70", "umsg71", "umsg72", "umsg73", "umsg74", "umsg75", "umsg76", "umsg77", "umsg78", "umsg79", "umsg7a", "umsg7b", "umsg7c", "umsg7d", "umsg7e", "umsg7f",
|
|
"umsg80", "umsg81", "umsg82", "umsg83", "umsg84", "umsg85", "umsg86", "umsg87", "umsg88", "umsg89", "umsg8a", "umsg8b", "umsg8c", "umsg8d", "umsg8e", "umsg8f",
|
|
"umsg90", "umsg91", "umsg92", "umsg93", "umsg94", "umsg95", "umsg96", "umsg97", "umsg98", "umsg99", "umsg9a", "umsg9b", "umsg9c", "umsg9d", "umsg9e", "umsg9f",
|
|
"umsga0", "umsga1", "umsga2", "umsga3", "umsga4", "umsga5", "umsga6", "umsga7", "umsga8", "umsga9", "umsgaa", "umsgab", "umsgac", "umsgad", "umsgae", "umsgaf",
|
|
"umsgb0", "umsgb1", "umsgb2", "umsgb3", "umsgb4", "umsgb5", "umsgb6", "umsgb7", "umsgb8", "umsgb9", "umsgba", "umsgbb", "umsgbc", "umsgbd", "umsgbe", "umsgbf",
|
|
"umsgc0", "umsgc1", "umsgc2", "umsgc3", "umsgc4", "umsgc5", "umsgc6", "umsgc7", "umsgc8", "umsgc9", "umsgca", "umsgcb", "umsgcc", "umsgcd", "umsgce", "umsgcf",
|
|
"umsgd0", "umsgd1", "umsgd2", "umsgd3", "umsgd4", "umsgd5", "umsgd6", "umsgd7", "umsgd8", "umsgd9", "umsgda", "umsgdb", "umsgdc", "umsgdd", "umsgde", "umsgdf",
|
|
"umsge0", "umsge1", "umsge2", "umsge3", "umsge4", "umsge5", "umsge6", "umsge7", "umsge8", "umsge9", "umsgea", "umsgeb", "umsgec", "umsged", "umsgee", "umsgef",
|
|
"umsgf0", "umsgf1", "umsgf2", "umsgf3", "umsgf4", "umsgf5", "umsgf6", "umsgf7", "umsgf8", "umsgf9", "umsgfa", "umsgfb", "umsgfc", "umsgfd", "umsgfe", "umsgff"
|
|
};
|
|
|
|
// returns false and sets g_errno on error
|
|
bool UdpSlot::makeReadBuf ( int32_t msgSize , int32_t numDgrams ) {
|
|
// bitch if it's already there
|
|
if ( m_readBuf ) {
|
|
g_errno = EBADENGINEER;
|
|
log(LOG_LOGIC, "udp: makereadbuf: Read buf already there.");
|
|
return false;
|
|
}
|
|
// ensure msg not too big
|
|
if ( msgSize > m_proto->getMaxMsgSize() ) {
|
|
g_errno = EMSGTOOBIG;
|
|
log(LOG_LOGIC,"udp: makereadbuf: msg size of %" PRId32" is "
|
|
"too big. Max is %" PRId32".",msgSize,
|
|
(int32_t)m_proto->getMaxMsgSize());
|
|
return false;
|
|
}
|
|
// if msgSize is -1 then it is under 1 dgram, but assume the worst
|
|
if ( msgSize == -1 ) msgSize = m_maxDgramSize;
|
|
|
|
if(msgSize!=0) {
|
|
// . create a msg buf to hold msg, zero out everything...
|
|
// . label it "umsg" so we can grep the *.cpp files for it
|
|
m_readBuf = (char *) mmalloc ( msgSize, umsg_label[(uint8_t)m_msgType] );
|
|
if ( ! m_readBuf ) {
|
|
m_readBufSize = 0;
|
|
log(LOG_WARN, "udp: Failed to allocate %" PRId32" bytes to read request or reply on udp socket.", msgSize);
|
|
return false;
|
|
}
|
|
|
|
// initialize readBuf to track down corruption
|
|
memset(m_readBuf, 0xfe, msgSize);
|
|
}
|
|
m_readBufMaxSize = msgSize;
|
|
// let the caller know we're good
|
|
return true;
|
|
}
|
|
|
|
// . returns a score of -1 if nothing to send
|
|
// . higher scoring slots will do their sending first
|
|
// . may have ACKs to send or plain old dgrams to send
|
|
// . now is current time in milliseconds since the epoch
|
|
int32_t UdpSlot::getScore ( int64_t now ) const {
|
|
// do not do sends if callback was called. maybe cancelled?
|
|
// this was causing us to get into an infinite loop in
|
|
// UdpServer.cpp's sendPoll_ass(). there wasn't anything to send i
|
|
// guess because it got cancelled (?) but we kept returning it here
|
|
// nonetheless.
|
|
if ( m_calledCallback )
|
|
return -1;
|
|
|
|
// send ACKs before regular dgrams so we don't hold people up
|
|
if ( m_sentAckBitsOn < m_readBitsOn && m_proto->useAcks() )
|
|
return 0x7fffffff;
|
|
// return a score of -1 if we've sent all dgrams (and no acks to send)
|
|
if ( m_sentBitsOn >= m_dgramsToSend ) return -1;
|
|
|
|
// . let's use a window now, give acks a chance to catch up somewhat
|
|
// . if send is local, use a larger ack window of ?64? dgrams
|
|
if ( ip_distance(m_ip)!=ip_distance_ourselves &&
|
|
m_sentBitsOn >= m_readAckBitsOn + ACK_WINDOW_SIZE ) return -1;
|
|
// well, give a window size of 100 to loopbacks
|
|
if ( ip_distance(m_ip)==ip_distance_ourselves &&
|
|
m_sentBitsOn >= m_readAckBitsOn + ACK_WINDOW_SIZE_LB ) return -1;
|
|
|
|
// sort regular sends by the last send time
|
|
int64_t score = now - m_lastSendTime + 1000;
|
|
// watch out if someone changed the system clock on us
|
|
if ( score < 1000 ) score = 1000;
|
|
|
|
return score;
|
|
}
|