mirror of
https://github.com/privacore/open-source-search-engine.git
synced 2025-07-15 02:36:08 -04:00
Simplify UdpSlot::readDatagramOrAck logic
This commit is contained in:
107
UdpSlot.cpp
107
UdpSlot.cpp
@ -1169,8 +1169,9 @@ bool UdpSlot::readDatagramOrAck ( const void *readBuffer_,
|
||||
// . ONLY if slot is new! otherwise, we keep the sender's
|
||||
// niceness. so if the slot niceness got converted by
|
||||
// the handler we do not re-nice it on our end.
|
||||
if ( ! m_sendBuf )
|
||||
m_niceness = m_proto->isNice ( readBuffer, readSize );
|
||||
if ( ! m_sendBuf ) {
|
||||
m_niceness = m_proto->isNice(readBuffer, readSize);
|
||||
}
|
||||
}
|
||||
|
||||
// . if m_readBuf is NULL then init m_readBuf/m_readBufMaxSize big
|
||||
@ -1216,76 +1217,48 @@ bool UdpSlot::readDatagramOrAck ( const void *readBuffer_,
|
||||
}
|
||||
|
||||
// dgram #'s above 0 can be copied directly into m_readBuf
|
||||
if ( dgramNum > 0 ) {
|
||||
// how much DATA can we read from this dgram?
|
||||
int32_t avail = m_readBufMaxSize - offset;
|
||||
if ( avail > maxDataSize ) avail = maxDataSize;
|
||||
// include header too
|
||||
int32_t toRead = avail + headerSize;
|
||||
// where to put it?
|
||||
char *dest = m_readBuf + offset - headerSize;
|
||||
// sanity check, watch out for bad headers...
|
||||
if ( toRead < 0 ) {
|
||||
//g_errno = ECORRUPTDATA;
|
||||
// do not spam the logs
|
||||
static int32_t s_badCount = 0;
|
||||
s_badCount++;
|
||||
int32_t avail = readSize - headerSize;
|
||||
|
||||
log(LOG_WARN, "udp: got %" PRId32" bad dgram headers. "
|
||||
"dgramNum=%" PRId32" offset=%" PRId32" "
|
||||
"readBufMaxSize=%" PRId32". IS hosts.conf OUT OF SYNC???",
|
||||
s_badCount,(int32_t)dgramNum,(int32_t)offset,
|
||||
(int32_t)m_readBufMaxSize);
|
||||
|
||||
// this actually helps us to identify when hosts.conf
|
||||
// is out of sync between hosts, so core
|
||||
// SEEMS like the roadrunner wireless connection
|
||||
// is spiking our packets sometimes with noise...
|
||||
//g_process.shutdownAbort(true);
|
||||
return false;
|
||||
}
|
||||
// save what's before us
|
||||
char tmp[32];
|
||||
gbmemcpy ( tmp , dest , headerSize );
|
||||
memcpy(dest, readBuffer, toRead);
|
||||
//log("udp: recvfrom1 = %i",(int)numRead);
|
||||
// restore what was at the header before we stored it there
|
||||
gbmemcpy ( dest , tmp , headerSize );
|
||||
// keep stats
|
||||
if ( m_host ) m_host->m_dgramsFrom++;
|
||||
// keep track of dgrams sent outside of our cluster
|
||||
//else g_stats.m_dgramsFromStrangers++;
|
||||
// it's new to us, set the read bits
|
||||
setBit ( dgramNum, m_readBits2 );
|
||||
// inc the lit bit count
|
||||
m_readBitsOn++;
|
||||
// if our proto doesn't use acks, treat this as an ACK as well
|
||||
if ( ! m_proto->useAcks () ) readAck(0/*dgramNum*/,now);
|
||||
// if read everything, set the queued timer
|
||||
if ( m_readBitsOn >= m_dgramsToRead )
|
||||
m_queuedTime = gettimeofdayInMilliseconds();
|
||||
// all done
|
||||
return true;
|
||||
if (dgramNum == 0 && m_readBufSize == -1) {
|
||||
m_readBufSize = avail;
|
||||
}
|
||||
|
||||
// otherwise, copy into our tmp buffer
|
||||
char dgram [DGRAM_SIZE_CEILING];
|
||||
memcpy(dgram, readBuffer, readSize);
|
||||
if (dgramNum > 0) {
|
||||
int32_t validate = m_readBufMaxSize - offset;
|
||||
if (validate > maxDataSize) validate = maxDataSize;
|
||||
if (validate != avail) {
|
||||
gbshutdownLogicError();
|
||||
}
|
||||
}
|
||||
|
||||
// where to put it?
|
||||
char *dest = m_readBuf + offset;
|
||||
|
||||
// sanity check, watch out for bad headers...
|
||||
if ( avail < 0 ) {
|
||||
// do not spam the logs
|
||||
static int32_t s_badCount = 0;
|
||||
s_badCount++;
|
||||
|
||||
log(LOG_WARN, "udp: got %" PRId32" bad dgram headers. "
|
||||
"dgramNum=%" PRId32" offset=%" PRId32" "
|
||||
"readBufMaxSize=%" PRId32". IS hosts.conf OUT OF SYNC???",
|
||||
s_badCount,(int32_t)dgramNum,(int32_t)offset,
|
||||
(int32_t)m_readBufMaxSize);
|
||||
|
||||
// this actually helps us to identify when hosts.conf
|
||||
// is out of sync between hosts, so core
|
||||
// SEEMS like the roadrunner wireless connection
|
||||
// is spiking our packets sometimes with noise...
|
||||
//g_process.shutdownAbort(true);
|
||||
return false;
|
||||
}
|
||||
|
||||
memcpy(dest, readBuffer + headerSize, avail);
|
||||
|
||||
// keep stats
|
||||
if ( m_host ) m_host->m_dgramsFrom++;
|
||||
// keep track of dgrams sent outside of our cluster
|
||||
//else g_stats.m_dgramsFromStrangers++;
|
||||
|
||||
// where to put it? it might not be dgram #0...
|
||||
char *dest = m_readBuf + offset ;
|
||||
// what to put?
|
||||
char *src = dgram + headerSize ;
|
||||
// how much to put
|
||||
int32_t len = readSize - headerSize;
|
||||
// if msgSize was -1 then m_readBufSize will be -1
|
||||
if ( m_readBufSize == -1 ) m_readBufSize = len;
|
||||
// bounce it back into m_readBuf
|
||||
gbmemcpy ( dest , src , len );
|
||||
// it's new to us, set the read bits
|
||||
setBit ( dgramNum, m_readBits2 );
|
||||
// inc the lit bit count
|
||||
@ -1295,7 +1268,7 @@ bool UdpSlot::readDatagramOrAck ( const void *readBuffer_,
|
||||
// if read everything, set the queued timer
|
||||
if ( m_readBitsOn >= m_dgramsToRead )
|
||||
m_queuedTime = gettimeofdayInMilliseconds();
|
||||
// success
|
||||
// all done
|
||||
return true;
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user