312 lines
7.3 KiB
C++
312 lines
7.3 KiB
C++
#include "Msge1.h"
|
|
#include "Process.h"
|
|
#include "Tagdb.h"
|
|
#include "ip.h"
|
|
#include "UrlBlockCheck.h"
|
|
#include "Conf.h"
|
|
#include "Mem.h"
|
|
#include "ScopedLock.h"
|
|
#include "SiteGetter.h"
|
|
#include "Errno.h"
|
|
#include "utf8_fast.h"
|
|
|
|
|
|
// Builds an idle Msge1: no url list, no result buffer, zeroed counters and
// every lookup slot marked free.
Msge1::Msge1()
	: m_niceness(0)
	, m_urlPtrs(nullptr)
	, m_urlFlags(nullptr)
	, m_numUrls(0)
	, m_addTags(false)
	, m_buf(nullptr)
	, m_bufSize(0)
	, m_ipBuf(nullptr)
	, m_ipErrors(nullptr)
	, m_numRequests(0)
	, m_numReplies(0)
	, m_n(0)
	, m_mtx()
	, m_msgCs()
	, m_grv(nullptr)
	, m_state(nullptr)
	, m_callback(nullptr)
	, m_nowGlobal(0)
	, m_errno(0)
{
	// a single pass clears both per-slot arrays
	for (int slot = 0; slot < MAX_OUTSTANDING_MSGE1; ++slot) {
		m_ns[slot] = 0;
		m_used[slot] = false;
	}
}
|
|
|
|
// Destruction just delegates to reset(), which frees the result buffer if
// one is allocated.
Msge1::~Msge1()
{
	reset();
}
|
|
|
|
|
|
void Msge1::reset() {
|
|
m_errno = 0;
|
|
if ( m_buf ) {
|
|
mfree(m_buf, m_bufSize, "Msge1buf");
|
|
m_buf = NULL;
|
|
}
|
|
m_ipBuf = NULL;
|
|
m_ipErrors = NULL;
|
|
m_numRequests = 0;
|
|
m_numReplies = 0;
|
|
m_n = 0;
|
|
|
|
for(int i=0; i<MAX_OUTSTANDING_MSGE1; i++)
|
|
m_ns[i] = 0;
|
|
for(int i=0; i<MAX_OUTSTANDING_MSGE1; i++)
|
|
m_used[i] = false;
|
|
}
|
|
|
|
|
|
// . get various information for each url in a list of urls
|
|
// . urls in "urlBuf" are \0 terminated
|
|
// . used to be called getSiteRecs()
|
|
// . you can pass in a list of docIds rather than urlPtrs
|
|
bool Msge1::getFirstIps ( TagRec **grv ,
|
|
const char **urlPtrs,
|
|
const linkflags_t *urlFlags,
|
|
int32_t numUrls ,
|
|
int32_t niceness ,
|
|
void *state ,
|
|
void (*callback)(void *state) ,
|
|
int32_t nowGlobal) {
|
|
|
|
reset();
|
|
// bail if no urls or linkee
|
|
if ( numUrls <= 0 ) return true;
|
|
|
|
// save all input parms
|
|
m_grv = grv;
|
|
m_urlPtrs = urlPtrs;
|
|
m_urlFlags = urlFlags;
|
|
m_numUrls = numUrls;
|
|
m_niceness = niceness;
|
|
m_state = state;
|
|
m_callback = callback;
|
|
m_nowGlobal = nowGlobal;
|
|
|
|
// . how much mem to alloc?
|
|
// . include an extra 4 bytes for each one to hold possible errno
|
|
int32_t needPerUrl = sizeof(*m_ipBuf)
|
|
+ sizeof(*m_ipErrors);
|
|
// one per url
|
|
int32_t needTotal = needPerUrl * numUrls;
|
|
// allocate the buffer to hold all the info we gather
|
|
m_buf = (char *)mcalloc ( needTotal , "Msge1buf" );
|
|
if ( ! m_buf ) return true;
|
|
m_bufSize = needTotal;
|
|
// clear it all
|
|
memset ( m_buf , 0 , m_bufSize );
|
|
// set the ptrs!
|
|
char *p = m_buf;
|
|
m_ipBuf = (int32_t *)p ; p += numUrls * sizeof(*m_ipBuf);
|
|
m_ipErrors = (int32_t *)p ; p += numUrls * sizeof(*m_ipErrors);
|
|
// initialize
|
|
m_numRequests = 0;
|
|
m_numReplies = 0;
|
|
// . point to first url to process
|
|
// . url # m_n
|
|
m_n = 0;
|
|
// clear the m_used flags
|
|
for(int i=0; i<MAX_OUTSTANDING_MSGE1; i++)
|
|
m_used[i] = false;
|
|
|
|
// . launch the requests
|
|
return launchRequests(0);
|
|
}
|
|
|
|
// we only come back up here 1) in the very beginning or 2) when a url
|
|
// completes its pipeline of requests
|
|
bool Msge1::launchRequests ( int32_t starti ) {
|
|
// reset any error code
|
|
g_errno = 0;
|
|
|
|
const int32_t maxOut = MAX_OUTSTANDING_MSGE1;
|
|
|
|
ScopedLock sl(m_mtx);
|
|
while(m_n < m_numUrls && m_numRequests - m_numReplies < maxOut) {
|
|
// grab the "firstip" from the tagRec if we can
|
|
TagRec *gr = m_grv[m_n];
|
|
if (gr) {
|
|
// verify tagrec
|
|
if (g_conf.m_verifyTagRec) {
|
|
const char *site = gr->getString("site");
|
|
if (site != nullptr) {
|
|
SiteGetter sg;
|
|
sg.getSite(m_urlPtrs[m_n], nullptr, 0, m_niceness);
|
|
|
|
if (strcmp(site, sg.getSite()) != 0) {
|
|
SafeBuf sb;
|
|
gr->printToBuf(&sb);
|
|
logError("tagrec: %s", sb.getBufStart());
|
|
logError("tagsite: %s sitegetter: %s", site, sg.getSite());
|
|
|
|
gbshutdownLogicError();
|
|
}
|
|
}
|
|
}
|
|
|
|
Tag *tag = gr->getTag("firstip");
|
|
if (tag) {
|
|
// grab the ip that was in there
|
|
int32_t ip = atoip(tag->getTagData());
|
|
|
|
// if we had it but it was 0 or -1, then time that out
|
|
// after a day or so in case it works again! 0 and -1 mean
|
|
// NXDOMAIN or timeout error, etc.
|
|
if (ip == 0 || ip == -1) {
|
|
if (m_nowGlobal - tag->m_timestamp > 3600 * 24) {
|
|
tag = NULL;
|
|
}
|
|
}
|
|
|
|
// . if we still got the tag, use that, even if ip is 0 or -1
|
|
// . this keeps things fast
|
|
// . this makes sure doConsistencyCheck() does not block too in
|
|
// XmlDoc.cpp... cuz it cores if it does block
|
|
if (tag) {
|
|
// now "ip" might actually be -1 or 0 (invalid) so be careful
|
|
m_ipBuf[m_n] = ip;
|
|
m_numRequests++;
|
|
m_numReplies++;
|
|
m_n++;
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
|
|
// . get the next url
|
|
// . if m_xd is set, create the url from the ad id
|
|
const char *p = m_urlPtrs[m_n];
|
|
|
|
// if it is ip based that makes things easy
|
|
int32_t hlen = 0;
|
|
const char *host = getHostFast ( p , &hlen );
|
|
|
|
// see if the hostname is actually an ip like "1.2.3.4"
|
|
if (host && is_digit(host[0])) {
|
|
int32_t ip = atoip(host, hlen);
|
|
// if legit this is non-zero
|
|
if (ip) {
|
|
m_ipBuf[m_n] = ip;
|
|
m_numRequests++;
|
|
m_numReplies++;
|
|
m_n++;
|
|
continue;
|
|
}
|
|
}
|
|
|
|
Url url;
|
|
url.set(p);
|
|
if (isUrlBlocked(url)) {
|
|
// debug for now
|
|
if (g_conf.m_logDebugDns) {
|
|
log("dns: skipping dns lookup of '%*.*s' because the URL is blocked",
|
|
(int)url.getHostLen(), (int)url.getHostLen(), url.getHost());
|
|
}
|
|
|
|
// -1 means time out i guess
|
|
m_ipBuf[m_n] = -1;
|
|
m_numRequests++;
|
|
m_numReplies++;
|
|
m_n++;
|
|
continue;
|
|
}
|
|
|
|
// . grab a slot
|
|
int32_t i;
|
|
for (i = starti; i < MAX_OUTSTANDING_MSGE1; i++) {
|
|
if (!m_used[i]) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
// sanity check
|
|
if (i >= MAX_OUTSTANDING_MSGE1) {
|
|
g_process.shutdownAbort(true);
|
|
}
|
|
|
|
// save the url number, "n"
|
|
m_ns[i] = m_n++;
|
|
|
|
// claim it
|
|
m_used[i] = true;
|
|
|
|
// . start it off
|
|
// . this will start the pipeline for this url
|
|
// . it will set m_used[i] to true if we use it and block
|
|
// . it will increment m_numRequests and NOT m_numReplies if it blocked
|
|
m_numRequests++;
|
|
sendMsgC(i, host, hlen);
|
|
}
|
|
|
|
if (m_n >= m_numUrls) {
|
|
return m_numRequests == m_numReplies;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
|
|
// . start the MsgC ip lookup of "host" (hlen bytes) for the url that owns
//   slot "slotIndex"; the ip is written to that url's m_ipBuf entry
// . returns false if the lookup blocked, in which case gotMsgCWrapper() is
//   called when it completes
// . returns true if it completed right away; the slot has then already
//   been released through doneSending_unlocked()
// . launchRequests() calls this while holding m_mtx, hence the _unlocked
//   variant below
bool Msge1::sendMsgC(int32_t slotIndex, const char *host, int32_t hlen) {
	// keep the first pending error we see, other than ENOTFOUND
	if ( g_errno != ENOTFOUND && ! m_errno ) {
		m_errno = g_errno;
	}
	g_errno = 0;

	// the MsgC for this slot carries what the wrapper needs to find us again
	MsgC &msgc = m_msgCs[slotIndex];
	msgc.m_msge1 = this;
	msgc.m_msge1State = slotIndex;

	// which url does this slot belong to?
	const int32_t urlNum = m_ns[slotIndex];

	const bool completed = msgc.getIp(host, hlen, &m_ipBuf[urlNum], &msgc, gotMsgCWrapper);
	if ( completed ) {
		// answered without blocking: account for the reply and free the slot
		doneSending_unlocked(slotIndex);
	}
	return completed;
}
|
|
|
|
void Msge1::gotMsgCWrapper(void *state, int32_t ip) {
|
|
MsgC *m = (MsgC *)state;
|
|
Msge1 *THIS = m->m_msge1;
|
|
int32_t slotIndex = m->m_msge1State;
|
|
|
|
if(!THIS->m_used[slotIndex])
|
|
g_process.shutdownAbort(true);
|
|
|
|
THIS->doneSending(slotIndex);
|
|
|
|
// try to launch more, returns false if not done
|
|
if ( ! THIS->launchRequests(slotIndex) ) return;
|
|
// must be all done, call the callback
|
|
THIS->m_callback ( THIS->m_state );
|
|
}
|
|
|
|
|
|
// Locking front end for doneSending_unlocked(): takes m_mtx for the duration
// of the bookkeeping. Used by gotMsgCWrapper().
void Msge1::doneSending(int32_t slotIndex)
{
	ScopedLock guard(m_mtx);
	doneSending_unlocked(slotIndex);
}
|
|
|
|
|
|
void Msge1::doneSending_unlocked(int32_t slotIndex) {
|
|
// we are processing the nth url
|
|
int32_t n = m_ns[slotIndex];
|
|
// save the error if msgC had one
|
|
m_ipErrors[n] = g_errno;
|
|
// save m_errno
|
|
if ( g_errno && ! m_errno ) m_errno = g_errno;
|
|
// reset error for successive calls to other msgs
|
|
g_errno = 0;
|
|
|
|
// tally it up
|
|
m_numReplies++;
|
|
// free it
|
|
m_used[slotIndex] = false;
|
|
}
|