#include "Pages.h"
|
|
#include "TcpSocket.h"
|
|
#include "HttpServer.h"
|
|
#include "SpiderProxy.h"
|
|
#include "max_niceness.h"
|
|
#include "Process.h"
|
|
#include "HashTableX.h"
|
|
#include "Conf.h"
|
|
#include "Hostdb.h"
|
|
#include "UdpSlot.h"
|
|
#include "UdpServer.h"
|
|
#include "Loop.h"
|
|
#include "ip.h"
|
|
#include "GbUtil.h"
|
|
#include "Errno.h"
|
|
|
|
|
|
//#define LOADPOINT_EXPIRE_MS (10*60*1000)
// make it 15 seconds, not 10 minutes, otherwise it gets too full with dup
// keys and really clogs things up
#define LOADPOINT_EXPIRE_MS (15*1000)

//
// BASIC DETAILS
//
// . host #0 is solely responsible for testing the proxies and keeping
//   the results of the tests, using the user-defined test url which
//   each proxy downloads every 60 seconds or so.
// . host #0 also saves these stats to the spiderproxies.dat file
//   in the working dir (just an array of SpiderProxy instances)
// . any host needing a proxy server to use should ask host #0 for one,
//   but if host #0 is dead then it should ask host #1, etc.
// . host #1 (etc.) will take over if it sees host #0 went dead
// . Conf::m_proxyIps (a safebuf) holds a space-separated list of
//   proxyip:port pairs
// . since host #0 is responsible for giving you a proxy ip it can
//   do proxy load balancing for the whole cluster
// . TODO: to prevent host #0 from getting too slammed we can also recruit
//   other hosts to act just like host #0.

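// an example m_proxyIps value (hypothetical addresses), showing the plain
// ip:port form, the username:password@ip:port form, and a bare ip that
// defaults to port 80 (a leading "http://" is skipped by the parser):
//
//   "192.0.2.10:8080 user:secret@192.0.2.11:3128 http://198.51.100.7"
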
// hashtable that maps an ip:port key (64 bits) to a SpiderProxy
static HashTableX s_iptab;

// . handle a udp request for msg 0x54 to get the best proxy to download
//   the url they are requesting to download. this should be ultra fast
//   because we might be downloading 1000+ urls a second, although in that
//   case we should have more hosts that do the proxy load balancing.
// . when a host is done using a proxy to download a url it should
//   send another 0x54 msg to indicate that.
// . if a host dies it might not indicate it is done using a proxy,
//   so we should time out its requests and decrement the proxy load
//   count, or if we notice the host is dead we should remove all its load
// . so each outstanding request needs to be kept in a hash table,
//   which identifies the proxyip/port and the hostid using it and
//   the time it made the request. the key is the url's ip.
class LoadBucket {
public:
	// ip address of the url being downloaded
	int32_t m_urlIp;
	// the time the download started
	int64_t m_downloadStartTimeMS;
	// zero while the download is still outstanding
	int64_t m_downloadEndTimeMS;
	// the host using the proxy
	int32_t m_hostId;
	// the proxy being used
	int32_t m_proxyIp;
	int32_t m_proxyPort;
	// id of this loadbucket in case the same host is using the same
	// proxy to download the same urlip
	int32_t m_id;
} __attribute__((packed, aligned(4)));

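// note: LoadBucket instances are stored by value in s_loadTable below,
// keyed by the url's IP, with duplicate keys allowed, so there is one
// entry per download request
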
// . similar to s_iptab but maps a URL's ip to a LoadBucket
// . every download request in the last LOADPOINT_EXPIRE_MS (15 seconds)
//   is represented by one LoadBucket
// . that way we can ensure, when downloading multiple urls of the same IP,
//   that we splay them out evenly over all the proxies
static HashTableX s_loadTable;

// . when the Conf::m_proxyIps parm is updated we call this to rebuild
//   s_iptab, our table of SpiderProxy instances, which has the proxies and
//   their performance statistics.
// . we try to maintain stats of ip/ports that did NOT change when
//   rebuilding.
bool buildProxyTable ( ) {

	// scan the NEW list of proxy ip/port pairs in g_conf
	char *p = g_conf.m_proxyIps.getBufStart();

	HashTableX tmptab;
	tmptab.set(8,0,16,NULL,0,false,"tmptab");

	// scan the user-inputted space-separated list of ip:ports
	// (optionally username:password@ip:port)
	for ( ; *p ; ) {
		// skip white space
		if ( is_wspace_a(*p) ) { p++; continue; }

		// skip http://
		if ( strncasecmp(p,"http://",7) == 0 ) { p += 7; continue; }

		// scan in an ip:port
		char *s = p; char *portStr = NULL;
		int32_t dc = 0, pc = 0, gc = 0, bc = 0;
		const char *msg;

		char *usernamePwd = NULL;
		int32_t usernamePwdLen = 0;
		char *ipStart = p;

		// scan all characters until we hit \0 or more whitespace
		for ( ; *s && !is_wspace_a(*s); s++) {

			if ( *s == '@' ) {
				// must be username:pwd
				if ( pc != 1 ) {
					msg = "bad username:password";
					goto hadError;
				}
				usernamePwd = p;
				usernamePwdLen = s - p;
				if ( usernamePwdLen >= MAXUSERNAMEPWD-2 ) {
					msg = "username:password too long";
					goto hadError;
				}
				dc = 0;
				gc = 0;
				bc = 0;
				pc = 0;
				portStr = NULL;
				ipStart = s+1;
				continue;
			}

			if ( *s == '.' ) { dc++; continue; }
			if ( *s == ':' ) { portStr=s; pc++; continue; }
			if ( is_digit(*s) ) { gc++; continue; }
			bc++;
			continue;
		}

		// ensure it is a legit ip:port combo
		msg = NULL;
		if ( gc < 4 )
			msg = "not enough digits for an ip";
		if ( pc > 1 )
			msg = "too many colons";
		if ( dc != 3 )
			msg = "need 3 dots for an ip address";
		if ( bc )
			msg = "got illegal char in ip:port listing";
		if ( msg ) {
		hadError:
			// NULL-terminate the bad token just for logging
			char c = *s;
			*s = '\0';
			log("buf: %s for %s",msg,p);
			*s = c;
			return false;
		}

		// convert it
		int32_t iplen = s - ipStart;
		if ( portStr ) iplen = portStr - ipStart;
		int32_t ip = atoip(ipStart,iplen);
		// another sanity check
		if ( ip == 0 || ip == -1 ) {
			log("spider: got bad proxy ip for %s",p);
			return false;
		}

		// and the port default is 80
		int32_t port = 80;
		if ( portStr ) port = atol2(portStr+1,s-portStr-1);
		if ( port < 0 || port > 65535 ) {
			log("spider: got bad proxy port for %s",p);
			return false;
		}

		// . we got a legit ip:port
		// . see if already in our table
		// . key layout: the IP in the upper bits, the port in the
		//   low 16 bits
		uint64_t ipKey = (uint32_t)ip;
		ipKey <<= 16;
		ipKey |= (uint16_t)(port & 0xffff);

		// also store into tmptab so we know what to remove later
		tmptab.addKey(&ipKey);

		// see if in table
		int32_t islot = s_iptab.getSlot( &ipKey);

		// advance p
		p = s;

		// if in there, keep it as is
		if ( islot >= 0 ) continue;

		// otherwise add new entry
		SpiderProxy newThing;
		memset ( &newThing , 0 , sizeof(SpiderProxy));
		newThing.m_ip = ip;
		newThing.m_port = port;
		newThing.m_lastDownloadTookMS = -1;
		newThing.m_lastSuccessfulTestMS = -1;

		// usernamePwd is NULL when no '@' form was given, so guard
		// the copy; an empty username:password is just ""
		if ( usernamePwd && usernamePwdLen > 0 )
			memcpy(newThing.m_usernamePwd,usernamePwd,usernamePwdLen);
		// ensure it is NULL terminated
		newThing.m_usernamePwd[usernamePwdLen] = '\0';

		if ( ! s_iptab.addKey ( &ipKey, &newThing ) )
			return false;
	}

 redo:
	int32_t removed = 0;
	// scan all SpiderProxies in s_iptab
	for ( int32_t i = 0 ; i < s_iptab.getNumSlots() ; i++ ) {
		// skip empty buckets in hashtable s_iptab
		if ( ! s_iptab.m_flags[i] ) continue;
		// get the key
		int64_t key = *(int64_t *)s_iptab.getKeyFromSlot(i);
		// if it also exists in tmptab the user kept it; leave it alone
		if ( tmptab.isInTable ( &key ) ) continue;
		// sanity: the key must hash back to a slot
		if ( s_iptab.getSlot ( &key ) < 0 ) {
			log("sproxy: iptable hashing messed up");
			continue;
		}
		// shoot, it got removed. not in the new list of ip:ports.
		s_iptab.removeKey ( &key );
		removed++;
	}
	// removal can shuffle the hashtable, so rescan until nothing
	// more gets removed
	if ( removed ) goto redo;
	return true;
}

static bool s_init = true;
// keys are hash64h(urlIp,proxyIp); presence means that website IP
// has banned that proxy IP
static HashTableX s_proxyBannedTable;
// maps a proxy IP to the # of unique website IPs that have banned it
static HashTableX s_banCountTable;

static bool initProxyTables() {
	// initialize proxy/urlip ban tables just once
	if ( ! s_init ) return true;
	s_init = false;
	s_proxyBannedTable.set(8,0,0,NULL,0,false,"proxban");
	s_banCountTable.set(4,4,0,NULL,0,false,"bancnt");
	return true;
}

bool resetProxyStats ( ) {
	// s_proxyBannedTable.reset();
	// s_banCountTable.reset();
	// s_iptab.reset();

	// skip port part of key magic, and get LSB of the IP as key magic
	s_iptab.set(8, sizeof(SpiderProxy), 0, NULL, 0, false, "siptab", true, 5);

	s_proxyBannedTable.set(8,0,0,NULL,0,false,"proxban");
	s_banCountTable.set(4,4,0,NULL,0,false,"bancnt");
	return buildProxyTable();
}

static void resetProxyStatWrapper(int fd, void *state) {
	resetProxyStats();
}

// save the stats. currently a no-op: nothing is persisted to disk.
bool saveSpiderProxyStats ( ) {
	return true;
}

static bool loadSpiderProxyStats() {

	initProxyTables();

	// unset some flags
	for ( int32_t i = 0 ; i < s_iptab.getNumSlots() ; i++ ) {
		// skip empty slots
		if ( ! s_iptab.m_flags[i] ) continue;
		SpiderProxy *sp = (SpiderProxy *)s_iptab.getValueFromSlot(i);
		sp->m_isWaiting = false;
	}

	return true;
}

// . sets *current to how many downloads are CURRENTly outstanding for
//   this proxy
// . returns total # of load points, i.e. downloads, in the last
//   LOADPOINT_EXPIRE_MS milliseconds (currently 15 seconds)
static int32_t getNumLoadPoints(SpiderProxy *sp, int32_t *current) {
	// currently outstanding downloads on proxy
	*current = 0;
	int32_t count = 0;
	// scan all load points for this proxy ip/port
	for ( int32_t i = 0 ; i < s_loadTable.getNumSlots() ; i++ ) {
		// skip if empty
		if ( ! s_loadTable.m_flags[i] ) continue;
		// get the bucket
		LoadBucket *lb= (LoadBucket *)s_loadTable.getValueFromSlot(i);
		// get the spider proxy this load point was for
		if ( lb->m_proxyIp != sp->m_ip ) continue;
		if ( lb->m_proxyPort != sp->m_port ) continue;

		// currently outstanding downloads on proxy
		if ( lb->m_downloadEndTimeMS == 0LL )
			*current = *current + 1;

		count++;
	}
	return count;
}

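// note: getNumLoadPoints() does a linear scan of the whole load table;
// it feeds the "load points" and "currently out" columns of the admin
// table printed by printSpiderProxyTable() below
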
// . we call this from Parms.cpp which prints out the proxy related controls
//   and this table below them...
// . allows user to see the stats of each spider proxy
bool printSpiderProxyTable ( SafeBuf *sb ) {

	// only host #0 will have the stats ... so print that link
	if ( g_hostdb.m_myHost->m_hostId != 0 ) {
		Host *h = g_hostdb.getHost(0);
		char ipbuf[16];
		sb->safePrintf("<br>"
			       "<b>See table on <a href=http://%s:%" PRId32"/"
			       "admin/proxies>"
			       "host #0</a></b>"
			       "<br>"
			       , iptoa(h->m_ip,ipbuf)
			       , (int32_t)(h->getInternalHttpPort())
			       );
		//return true;
	}

	// print the table header
	sb->safePrintf (
		"<table %s>"

		"<tr><td colspan=10><center>"
		"<b>Spider Proxies "
		"</b>"
		"</center></td></tr>"

		"<tr bgcolor=#%s>"
		"<td>"
		"<b>proxy IP</b></td>"
		"<td><b>proxy port</b></td>"

		"<td><b>times used</b></td>"

		"<td><b># website IPs banning</b></td>"

		"<td><b>load points</b></td>"

		"<td><b>currently out</b></td>"

		// time of last successful download. print "none"
		// if never successfully used
		"<td><b>test url last successful download</b></td>"
		// we fetch a test url every minute or so through
		// each proxy to ensure it is up. typically this should
		// be your own website so you do not make someone angry.
		"<td><b>test url last download attempt</b></td>"
		// print "FAILED" in red if it failed to download
		"<td><b>test url download took</b></td>"

		"<td><b>last bytes downloaded</b></td>"

		"<td><b>last test url error</b></td>"

		"</tr>"

		, TABLE_STYLE
		, DARK_BLUE
		);

	int32_t now = getTime();

	// print each proxy's row
	for ( int32_t i = 0 ; i < s_iptab.getNumSlots() ; i++ ) {
		// skip empty slots
		if ( ! s_iptab.m_flags[i] ) continue;

		SpiderProxy *sp = (SpiderProxy *)s_iptab.getValueFromSlot(i);

		const char *bg = LIGHT_BLUE;
		// mark with light red bg if last test url attempt failed
		if ( sp->m_lastDownloadTookMS == -1 &&
		     sp->m_lastDownloadTestAttemptMS>0 )
			bg = "ffa6a6";

		// or a perm denied error (as opposed to a timeout above)
		if ( sp->m_lastDownloadError )
			bg = "ffa6a6";

		// print it
		char ipbuf[16];
		sb->safePrintf (
			"<tr bgcolor=#%s>"
			"<td>%s</td>" // proxy ip
			"<td>%" PRIu32"</td>" // port
			, bg
			, iptoa(sp->m_ip,ipbuf)
			, (uint32_t)(uint16_t)sp->m_port
			);

		sb->safePrintf("<td>%" PRId64"</td>",sp->m_timesUsed);

		int32_t banCount = s_banCountTable.getScore32(sp->m_ip);
		if ( banCount < 0 ) banCount = 0;
		sb->safePrintf("<td>%" PRId32"</td>",banCount);

		int32_t currentLoad;

		// get # times it appears in loadtable
		int32_t np = getNumLoadPoints ( sp , &currentLoad );
		sb->safePrintf("<td>%" PRId32"</td>",np);

		// currently outstanding downloads on this proxy
		sb->safePrintf("<td>%" PRId32"</td>",currentLoad);

		// last SUCCESSFUL download time ago. when it completed.
		int32_t ago = now - sp->m_lastSuccessfulTestMS/1000;
		sb->safePrintf("<td>");
		// like "1 minute ago" etc.
		if ( sp->m_lastSuccessfulTestMS <= 0 )
			sb->safePrintf("none");
		else
			printTimeAgo(sb, ago, now, true);
		sb->safePrintf("</td>");

		// last download attempt time ago
		ago = now - sp->m_lastDownloadTestAttemptMS/1000;
		sb->safePrintf("<td>");
		// like "1 minute ago" etc.
		if ( sp->m_lastDownloadTestAttemptMS <= 0 )
			sb->safePrintf("none");
		else
			printTimeAgo(sb, ago, now, true);
		sb->safePrintf("</td>");

		// how long to download the test url?
		if ( sp->m_lastDownloadTookMS != -1 )
			sb->safePrintf("<td>%" PRId32"ms</td>",
				       (int32_t)sp->m_lastDownloadTookMS);
		else if ( sp->m_lastDownloadTestAttemptMS <= 0 )
			sb->safePrintf("<td>unknown</td>");
		else
			sb->safePrintf("<td>"
				       "<font color=red>FAILED</font>"
				       "</td>");

		sb->safePrintf("<td>%" PRId32"</td>",sp->m_lastBytesDownloaded);

		if ( sp->m_lastDownloadError )
			sb->safePrintf("<td><font color=red>%s</font></td>",
				       mstrerror(sp->m_lastDownloadError));
		else
			sb->safePrintf("<td>none</td>");

		sb->safePrintf("</tr>\n");
	}

	sb->safePrintf("</table><br>");
	return true;
}

#include "Msg13.h"
|
|
|
|
// when a host is done using a proxy to download a url it sends a signal
|
|
// and we process that signal here. to reduce the load count of that proxy.
|
|
static void returnProxy ( Msg13Request *preq , UdpSlot *udpSlot ) ;
|
|
|
|
// a host is asking us (host #0) what proxy to use?
static void handleRequest54(UdpSlot *udpSlot, int32_t /*niceness*/) {

	char *request = udpSlot->m_readBuf;
	int32_t requestSize = udpSlot->m_readBufSize;

	// we now use the top part of the Msg13Request as the ProxyRequest
	Msg13Request *preq = (Msg13Request *)request;

	// sanity check
	if ( requestSize != preq->getProxyRequestSize() ) {
		log("db: Got bad request 0x54 size of %" PRId32" bytes. bad",
		    requestSize );
		g_udpServer.sendErrorReply ( udpSlot , EBADREQUESTSIZE );
		return;
	}

	// is the request telling us it is done downloading through a proxy?
	if ( preq->m_opCode == OP_RETPROXY ) {
		returnProxy ( preq , udpSlot );
		return;
	}

	// if the sender is asking for a new proxy and wants us to ban
	// the previous proxy we sent for this urlIp...
	if ( preq->m_banProxyIp ) {
		// don't core if it misses sanity. it seems we don't always
		// NULLify these or something.
		// these must match
		if(preq->m_banProxyIp != preq->m_proxyIp ||
		   preq->m_banProxyPort != preq->m_proxyPort){
			log("db: proxy: banproxyip != proxyip. mismatch!");
			g_udpServer.sendErrorReply ( udpSlot , EBADENGINEER);
			return;
		}
		// this will "return" the banned proxy
		returnProxy ( preq , NULL );
		// now add it to the banned table
		int64_t uip = preq->m_urlIp;
		int64_t pip = preq->m_banProxyIp;
		int64_t h64 = hash64h ( uip , pip );
		if ( ! s_proxyBannedTable.isInTable ( &h64 ) ) {
			s_proxyBannedTable.addKey ( &h64 );
			// for stats counting. each proxy ip maps to the #
			// of unique website IPs that have banned it.
			s_banCountTable.addTerm32((uint32_t)pip);
		}
	}

	// shortcut
	int32_t urlIp = preq->m_urlIp;

	// send to a proxy that is up and has the least amount
	// of LoadBuckets with this urlIp; if tied, go to least loaded.

	// clear counts for this url ip for scoring the best proxy to use
	for ( int32_t i = 0 ; i < s_iptab.getNumSlots() ; i++ ) {
		// skip empty slots
		if ( ! s_iptab.m_flags[i] ) continue;
		SpiderProxy *sp = (SpiderProxy *)s_iptab.getValueFromSlot(i);
		sp->m_countForThisIp = 0;
		sp->m_lastTimeUsedForThisIp = 0LL;
	}

	// this table maps a url's current IP to possibly MULTIPLE slots,
	// each of which tells us which proxy is downloading a page from that
	// IP. so we can try to find a proxy that is not currently
	// downloading a url from this IP, or hasn't been for the longest
	// time...
	int32_t hslot = s_loadTable.getSlot ( &urlIp );
	// scan all load points that have this urlIp outstanding
	for ( int32_t i = hslot ; i >= 0 ; i = s_loadTable.getNextSlot(i,&urlIp)){
		// get the bucket
		LoadBucket *lb;
		lb = (LoadBucket *)s_loadTable.getValueFromSlot(i);
		// get the spider proxy this load point was for
		uint64_t key = (uint32_t)lb->m_proxyIp;
		key <<= 16;
		key |= (uint16_t)lb->m_proxyPort;
		SpiderProxy *sp = (SpiderProxy *)s_iptab.getValue(&key);
		// must be there unless the user removed it from the list
		if ( ! sp ) continue;
		// count it up if still outstanding
		if ( lb->m_downloadEndTimeMS == 0LL )
			sp->m_countForThisIp++;
		// set the last time used to the most recent time this
		// proxy finished a download from this ip
		if ( lb->m_downloadEndTimeMS &&
		     lb->m_downloadEndTimeMS > sp->m_lastTimeUsedForThisIp )
			sp->m_lastTimeUsedForThisIp = lb->m_downloadEndTimeMS;
	}

	// first try to get a spider proxy that is not "dead"
	bool skipDead = true;

	int32_t numBannedProxies = 0;
	int32_t aliveProxyCandidates = 0;

 redo:
	// get the min of the counts
	int32_t minCount = 999999;
	for ( int32_t i = 0 ; i < s_iptab.getNumSlots() ; i++ ) {
		// skip empty slots
		if ( ! s_iptab.m_flags[i] ) continue;
		// get the spider proxy
		SpiderProxy *sp = (SpiderProxy *)s_iptab.getValueFromSlot(i);

		// if this proxy was banned by the url's ip... skip it. it is
		// not a candidate...
		if ( skipDead ) {
			int64_t uip = preq->m_urlIp;
			int64_t pip = sp->m_ip;
			int64_t h64 = hash64h ( uip , pip );
			if ( s_proxyBannedTable.isInTable ( &h64 ) ) {
				numBannedProxies++;
				continue;
			}
		}

		// if it failed the last test, skip it
		if ( skipDead && sp->m_lastDownloadError ) continue;

		if ( skipDead ) aliveProxyCandidates++;

		if ( sp->m_countForThisIp >= minCount ) continue;
		minCount = sp->m_countForThisIp;
	}

	// all dead? then get the best dead one
	if ( minCount == 999999 ) {
		skipDead = false;
		goto redo;
	}

	// . we only use one proxy if none are banned by this IP
	// . when that gets banned, we will use the next 2 proxies with
	//   a higher backoff/crawlDelay, etc.
	// . the ladder below doubles the candidate pool each time the
	//   current pool is entirely banned: 1 proxy, then 2, 4, 8, ... up
	//   to 1024. between those boundaries the threshold shrinks as more
	//   of the current pool gets banned.
	int32_t threshHold;
	if ( numBannedProxies <= 0 ) threshHold = 1;

	// if the first proxy gets banned, try the next 2 proxies until
	// both get banned
	else if ( numBannedProxies == 1 ) threshHold = 2;
	else if ( numBannedProxies < 1+2 ) threshHold = 3 - numBannedProxies;

	// if the next two proxies got banned, try the next 4 proxies
	else if ( numBannedProxies == 3 ) threshHold = 4;
	else if ( numBannedProxies < 3+4 ) threshHold = 7 - numBannedProxies;

	// if the next 4 proxies got banned, try the next 8 proxies until
	// they get banned
	else if ( numBannedProxies == 7 ) threshHold = 8;
	else if ( numBannedProxies < 7+8 ) threshHold = 15 - numBannedProxies;

	else if ( numBannedProxies == 15 ) threshHold = 16;
	else if ( numBannedProxies < 15+16 ) threshHold = 31 - numBannedProxies;

	else if ( numBannedProxies == 31 ) threshHold = 32;
	else if ( numBannedProxies < 31+32 ) threshHold = 63 - numBannedProxies;

	else if ( numBannedProxies == 63 ) threshHold = 64;
	else if ( numBannedProxies < 63+64 ) threshHold = 127 - numBannedProxies;

	else if ( numBannedProxies == 127 ) threshHold = 128;
	else if ( numBannedProxies < 127+128 ) threshHold = 255 - numBannedProxies;

	else if ( numBannedProxies == 255 ) threshHold = 256;
	else if ( numBannedProxies < 255+256 ) threshHold = 512 - numBannedProxies;

	else if ( numBannedProxies == 511 ) threshHold = 512;
	else if ( numBannedProxies < 511+512 ) threshHold = 1024 - numBannedProxies;

	else threshHold = 1024;

	if ( threshHold <= 0 ) {
		log("proxy: spiderproxy error in threshold of %" PRId32" "
		    "for banned=%" PRId32,threshHold,numBannedProxies);
		threshHold = 1;
	}

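	// a sketch of the ladder above, as numBannedProxies -> threshHold:
	//   0->1, 1->2, 2->1, 3->4, 4->3, 5->2, 6->1, 7->8, 8->7, ...,
	//   1023 and beyond -> 1024
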
	// reset minCount so we can take the min over those we check here
	minCount = -1;
	int64_t oldest = 0x7fffffffffffffffLL;
	SpiderProxy *winnersp = NULL;
	int32_t count = 0;
	// start at a slot based on the url's IP so we don't
	// overload the first proxy
	int32_t start = ((uint32_t)urlIp) % s_iptab.getNumSlots();
	int32_t slotCount = s_iptab.getNumSlots();
	// . now find the best proxy with the minCount
	for ( int32_t i = start ; ; i++ ) {
		// scan all slots in hash table, then stop
		if ( slotCount-- <= 0 ) break;
		// wrap around to zero if we hit the end
		if ( i == s_iptab.getNumSlots() ) i = 0;
		// skip empty slots
		if ( ! s_iptab.m_flags[i] ) continue;
		// get the spider proxy
		SpiderProxy *sp = (SpiderProxy *)s_iptab.getValueFromSlot(i);
		// if it failed the last test, skip it... not here...
		if ( skipDead && sp->m_lastDownloadError ) continue;

		// if this proxy was banned by the url's ip... skip it. it is
		// not a candidate...
		if ( skipDead ) {
			int64_t uip = preq->m_urlIp;
			int64_t pip = sp->m_ip;
			int64_t h64 = hash64h ( uip , pip );
			if ( s_proxyBannedTable.isInTable ( &h64 ) ) continue;
		}

		// if some proxies are "alive" then only pick from the
		// first threshHold proxies that are alive (i.e. still
		// work). that way, when one of those goes dead we will inc
		// the backoff (crawldelay) and a new proxy that we haven't
		// used for this url's IP will take its place, and such
		// new proxies will only have the new backoff count used
		// through them. that way, we don't get ALL of our proxies
		// banned at about the same time since we do somewhat uniform
		// load balancing over them.
		if ( skipDead && count >= threshHold )
			continue;

		// count the alive/non-banned candidates
		count++;

		// minCount starts at -1 here, so the first candidate we
		// check always becomes the initial winner:
		if ( sp->m_countForThisIp > minCount && minCount>=0 ) continue;
		// then go by last download time for this ip
		if ( sp->m_countForThisIp == minCount && minCount>=0 &&
		     sp->m_lastTimeUsedForThisIp >= oldest )
			continue;

		// pick the spider proxy used longest ago
		oldest = sp->m_lastTimeUsedForThisIp;
		minCount = sp->m_countForThisIp;
		// got a new winner
		winnersp = sp;
	}

	// we must have a winner
	if ( ! winnersp ) { g_process.shutdownAbort(true); }

	int64_t nowms = gettimeofdayInMilliseconds();

	// add a new load bucket then!
	LoadBucket bb;
	bb.m_urlIp = urlIp;
	// the time it started
	bb.m_downloadStartTimeMS = nowms;
	// download has not ended yet
	bb.m_downloadEndTimeMS = 0LL;
	// the host using the proxy
	bb.m_hostId = udpSlot->getHostId();
	// the proxy being used; matched again when the bucket is returned
	bb.m_proxyIp = winnersp->m_ip;
	bb.m_proxyPort = winnersp->m_port;
	// a new id. we use this to update the downloadEndTime when done
	static int32_t s_lbid = 0;
	// add it now
	bb.m_id = s_lbid++;
	s_loadTable.addKey ( &urlIp , &bb );
	// winner count update
	winnersp->m_timesUsed++;

	// sanity
	if ( (int32_t)sizeof(ProxyReply) > SHORTSENDBUFFERSIZE ) { g_process.shutdownAbort(true); }

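	// note: s_lbid only needs to distinguish concurrently outstanding
	// buckets for the same urlIp/proxy pair, so int32_t wraparound
	// should be harmless here
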
	// and give the proxy ip/port back to the requester so they can
	// use that to download their url
	ProxyReply *prep = (ProxyReply *)udpSlot->m_shortSendBuffer;
	prep->m_proxyIp = winnersp->m_ip;
	prep->m_proxyPort = winnersp->m_port;

	// this is just '\0' if none
	strcpy(prep->m_usernamePwd,winnersp->m_usernamePwd);

	// do not count the proxy we are returning as "more"
	prep->m_hasMoreProxiesToTry = ( aliveProxyCandidates > 1 );
	// and the loadbucket id, so the requester can tell us it is done
	// downloading through the proxy and we can update the LoadBucket
	// for this transaction (m_lbId)
	prep->m_lbId = bb.m_id;
	// the requester wants to know how many proxies have been banned by
	// the urlIp so it can increase a self-imposed crawl-delay to be more
	// sensitive to the spider policy.
	prep->m_numBannedProxies = numBannedProxies;

	//char *p = udpSlot->m_shortSendBuffer;
	//*(int32_t *)p = winnersp->m_ip ; p += 4;
	//*(int16_t *)p = winnersp->m_port; p += 2;
	// and the loadbucket id
	//*(int32_t *)p = bb.m_id; p += 4;

	// with dup keys we end up with long chains of crap and this
	// takes forever. so just flush the whole thing every 2 minutes
	// once 10000+ entries have accumulated
	static time_t s_lastTime = 0;
	time_t now = nowms / 1000;
	if ( s_lastTime == 0 ) s_lastTime = now;
	time_t elapsed = now - s_lastTime;
	if ( elapsed > 120 && s_loadTable.getNumSlots() > 10000 ) {
		log("sproxy: flushing %i entries from proxy loadtable that "
		    "have accumulated since %i seconds ago",
		    (int)s_loadTable.getNumUsedSlots(),(int)elapsed);
		s_loadTable.clear();
		// only do this once per flush interval
		s_lastTime = now;
	}

	int32_t sanityCount = 0;//s_loadTable.getNumSlots();
	// now remove old entries from the load table, i.e. entries that
	// have completed and have a download end time more than
	// LOADPOINT_EXPIRE_MS ago.
	for ( int32_t i = s_loadTable.getNumSlots() - 1 ; i >= 0 ; i-- ) {
		// skip if empty
		if ( ! s_loadTable.m_flags[i] ) continue;
		// get the bucket
		LoadBucket *pp =(LoadBucket *)s_loadTable.getValueFromSlot(i);
		// skip if still active
		if ( pp->m_downloadEndTimeMS == 0LL ) continue;
		// delta t
		int64_t took = nowms - pp->m_downloadEndTimeMS;
		// expired? used to be < 10 mins, now it's < 15 seconds to
		// prevent clogging.
		if ( took < LOADPOINT_EXPIRE_MS ) continue;

		// 100 at a time so we don't slam the cpu
		if ( sanityCount++ > 100 ) break;

		// ok, it's too old, nuke it to save memory
		s_loadTable.removeSlot(i);
		// removing a slot can shift later keys down, so we might
		// miss analyzing a few keys here, but that's not a big deal.
	}

	// send the proxy ip/port/LBid back to the requester
	g_udpServer.sendReply(udpSlot->m_shortSendBuffer, sizeof(ProxyReply), udpSlot->m_shortSendBuffer, sizeof(ProxyReply), udpSlot);
}

// . a requester says it is done using a proxy by sending another msg 0x54
//   with opCode OP_RETPROXY (see handleRequest54 above)
// . we now use the top part of the Msg13Request as the proxy request
void returnProxy ( Msg13Request *preq , UdpSlot *udpSlot ) {

	//char *p = request;
	//int32_t proxyIp = *(int32_t *)p; p += 4;
	//int16_t proxyPort = *(int16_t *)p; p += 2;
	//int32_t lbId = *(int32_t *)p; p += 4;

	int32_t urlIp = preq->m_urlIp;

	//
	// update the load bucket
	//

	// scan over all that match to find the lbid
	int32_t hslot = s_loadTable.getSlot ( &urlIp );
	// scan all load points that have this urlIp outstanding
	int32_t i;for (i=hslot ; i >= 0 ; i = s_loadTable.getNextSlot(i,&urlIp)){
		// get the bucket
		LoadBucket *lb= (LoadBucket *)s_loadTable.getValueFromSlot(i);
		// is it the right id?
		if ( lb->m_id != preq->m_lbId ) continue;
		if ( lb->m_proxyIp != preq->m_proxyIp ) continue;
		if ( lb->m_proxyPort != preq->m_proxyPort ) continue;
		// that's it. set the download end time.
		lb->m_downloadEndTimeMS = gettimeofdayInMilliseconds();
		break;
	}

	if ( i < 0 )
		log("sproxy: could not find load bucket id #%" PRId32,
		    preq->m_lbId);

	// if no udp slot was provided, return to the caller without sending
	// back a reply: they are banning a proxy and need to also return it
	// before we send them back another proxy to try.
	if ( ! udpSlot ) return;

	// gotta send a reply back
	g_udpServer.sendReply(0, 0, 0, 0, udpSlot);
}

// call this at startup to register the handlers
bool initSpiderProxyStuff() {
	// only host #0 actually serves these requests
	if ( ! g_udpServer.registerHandler ( msg_type_54, handleRequest54 ))
		return false;

	// key is ip/port
	// skip port part of key magic, and get LSB of the IP as key magic
	s_iptab.set(8, sizeof(SpiderProxy), 0, NULL, 0, false, "siptab", true, 5);

	loadSpiderProxyStats();

	// build the s_iptab hashtable for the first time
	buildProxyTable ();

	// reset spider proxy stats every hour to alleviate false positives
	// (moved from Process.cpp)
	if (!g_loop.registerSleepCallback(3600000, NULL, resetProxyStatWrapper, "SpiderProxy::resetProxyStatWrapper", 0)) {
		gbshutdownResourceError();
	}

	// make the loadtable hashtable, but only once
	static bool s_flag = false;
	if ( s_flag ) return true;
	s_flag = true;
	return s_loadTable.set(4,
			       sizeof(LoadBucket),
			       128,
			       NULL,
			       0,
			       true, // allow dups? this slows us down
			       "lbtab",
			       true); // use key magic to mix things up
}

// linear-scan lookup of a proxy by its ip/port
SpiderProxy *getSpiderProxyByIpPort ( int32_t ip , uint16_t port ) {
	for ( int32_t i = 0 ; i < s_iptab.getNumSlots() ; i++ ) {
		// skip empty slots
		if ( ! s_iptab.m_flags[i] ) continue;
		SpiderProxy *sp = (SpiderProxy *)s_iptab.getValueFromSlot(i);
		if ( sp->m_ip != ip ) continue;
		if ( sp->m_port != port ) continue;
		return sp;
	}
	return NULL;
}