Decouple PageInject from UdpServer active slots by changing the logic that picks which host to send an injection to

This commit is contained in:
Ai Lin Chia
2016-08-05 14:53:00 +02:00
parent 16abd75a14
commit 63153c05a5

@ -125,40 +125,14 @@ Host *getHostToHandleInjection ( char *url ) {
// a msg7 injection request for each doc in the warc/arc file
// so let's do load balancing differently for them so one host
// doesn't end up doing a bunch of wget/gunzips on warc files
// thereby bottlenecking the cluster. get the first hostid that
// we have not sent a msg7 injection request to that is still out
for ( int32_t i = 0 ; i < g_hostdb.getNumHosts() ; i++ ) {
Host *h = g_hostdb.getHost(i);
h->m_tmpCount = 0;
}
// thereby bottlenecking the cluster.
for (UdpSlot *slot = g_udpServer.getActiveHead(); slot; slot = slot->getActiveListNext()) {
// skip if not injection request
if ( slot->getMsgType() != msg_type_7 ) continue;
//if ( ! slot->m_weInitiated ) continue;
// if we did not initiate the injection request, i.e. if
// it is to us, skip it
if ( ! slot->hasCallback() ) continue;
// who is it from?
int32_t hostId = slot->getHostId();
if ( hostId < 0 ) continue;
Host *h = g_hostdb.getHost ( hostId );
if ( ! h ) continue;
h->m_tmpCount++;
}
int32_t min = 999999;
Host *minh = NULL;
for ( int32_t i = 0 ; i < g_hostdb.getNumHosts() ; i++ ) {
Host *h = g_hostdb.getHost(i);
if ( h->m_tmpCount == 0 ) return h;
if ( h->m_tmpCount >= min ) continue;
min = h->m_tmpCount;
minh = h;
}
if ( minh ) return minh;
// old logic:
// get the first hostid that we have not sent a msg7 injection request to that is still out
// how can this happen?
return host;
// new logic:
// replaced with simpler logic of hashing the url and mod it with number of shards
return g_hostdb.getLeastLoadedInShard(static_cast<uint32_t>(norm.getUrlHash64() % g_hostdb.getNumShards()), 0);
}
static void gotUdpReplyWrapper ( void *state , UdpSlot *slot ) {