privacore-open-source-searc.../Msge0.cpp
Ivan Skytte Jørgensen beeddcf35d Got rid of gb-include.h
2018-07-26 17:29:51 +02:00

262 lines
6.4 KiB
C++

#include "Msge0.h"
#include "Process.h"
#include "Tagdb.h"
#include "ip.h"
#include "UrlBlockCheck.h"
#include "UdpServer.h"
#include "Mem.h"
#include "ScopedLock.h"
#include "Errno.h"
#include <new>
// Build an idle Msge0: no urls, no result buffer, no callback, and every
// lookup slot marked free.
Msge0::Msge0()
	: m_collnum(0)
	, m_niceness(0)
	, m_urlPtrs(nullptr)
	, m_urlFlags(nullptr)
	, m_numUrls(0)
	, m_buf(nullptr)
	, m_bufSize(0)
	, m_baseTagRec(nullptr)
	, m_tagRecErrors(nullptr)
	, m_tagRecPtrs(nullptr)
	, m_tagRecs(nullptr)
	, m_numRequests(0)
	, m_numReplies(0)
	, m_n(0)
	, m_mtx()
	, m_state(nullptr)
	, m_callback(nullptr)
	, m_errno(0)
{
	// no slot is in use yet and none is bound to a url number
	for (int slot = 0; slot < MAX_OUTSTANDING_MSGE0; ++slot) {
		m_ns[slot] = 0;
		m_used[slot] = false;
	}
}
// All cleanup (TagRec destructors, result buffer) lives in reset().
Msge0::~Msge0()
{
	this->reset();
}
void Msge0::reset() {
m_errno = 0;
//free TagRecs that are not the base tag
for ( int32_t i = 0 ; i < m_n ; i++ ) {
if(m_tagRecPtrs[i] && m_tagRecPtrs[i]!=m_baseTagRec)
m_tagRecPtrs[i]->~TagRec();
}
if ( m_buf ) {
mfree ( m_buf , m_bufSize,"Msge0buf");
}
m_buf = NULL;
m_tagRecErrors = NULL;
m_tagRecPtrs = NULL;
m_tagRecs = NULL;
m_numRequests = 0;
m_numReplies = 0;
m_n = 0;
for(int i=0; i<MAX_OUTSTANDING_MSGE0; i++)
m_urls[i].reset();
for(int i=0; i<MAX_OUTSTANDING_MSGE0; i++)
m_msg8as[i].reset();
}
// . get various information for each url in a list of urls
// . urls in "urlBuf" are \0 terminated
// . used to be called getSiteRecs()
// . you can pass in a list of docIds rather than urlPtrs
bool Msge0::getTagRecs ( const char **urlPtrs ,
const linkflags_t *urlFlags , //Links::m_linkFlags
int32_t numUrls ,
TagRec *baseTagRec ,
collnum_t collnum,
int32_t niceness ,
void *state ,
void (*callback)(void *state) ) {
reset();
// bail if no urls or linkee
if ( numUrls <= 0 ) return true;
// save all input parms
m_urlPtrs = urlPtrs;
m_urlFlags = urlFlags;
m_numUrls = numUrls;
m_baseTagRec = baseTagRec;
m_collnum = collnum;
m_niceness = niceness;
m_state = state;
m_callback = callback;
// . how much mem to alloc?
// . include an extra 4 bytes for each one to hold possible errno
int32_t needPerUrl = sizeof(int32_t) // error
+ sizeof(TagRec*) // tag ptr
+ sizeof(TagRec); // m_tagRecs
// one per url
int32_t needTotal = needPerUrl * numUrls;
// allocate the buffer to hold all the info we gather
m_buf = (char *)mcalloc ( needTotal , "Msge0buf" );
if ( ! m_buf ) return true;
m_bufSize = needTotal;
// clear it all
memset ( m_buf , 0 , m_bufSize );
// set the ptrs!
char *p = m_buf;
m_tagRecErrors = (int32_t *)p ; p += numUrls * sizeof(int32_t);
m_tagRecPtrs = (TagRec **)p ; p += numUrls * sizeof(TagRec *);
m_tagRecs = (TagRec*)p; p += numUrls * sizeof(TagRec);
// initialize
m_numRequests = 0;
m_numReplies = 0;
// . point to first url to process
// . url # m_n
m_n = 0;
// clear the m_used flags
for(int i=0; i<MAX_OUTSTANDING_MSGE0; i++)
m_used[i] = false;
// . launch the requests
return launchRequests();
}
// we only come back up here 1) in the very beginning or 2) when a url
// completes its pipeline of requests
bool Msge0::launchRequests() {
// reset any error code
g_errno = 0;
// if all hosts are getting a diffbot reply with 50 spiders and they
// all timeout at the same time we can very easily clog up the
// udp sockets, so use this to limit... i've seen the whole
// spider tables stuck with "getting outlink tag rec vector"statuses
const int32_t maxOut = g_udpServer.getNumUsedSlots() > 500 ? 1 : MAX_OUTSTANDING_MSGE0;
ScopedLock sl(m_mtx);
while(m_n < m_numUrls && m_numRequests - m_numReplies < maxOut) {
// if url is same host as the tagrec provided, just reference that!
if ( m_urlFlags && (m_urlFlags[m_n] & LF_SAMESITE) && m_baseTagRec) {
m_tagRecPtrs[m_n] = m_baseTagRec;
m_numRequests++;
m_numReplies++;
m_n++;
continue;
}
Url url;
url.set(m_urlPtrs[m_n]);
if (isUrlBlocked(url)) {
// skip tagrec lookup if url is blocked
m_tagRecPtrs[m_n] = nullptr;
m_numRequests++;
m_numReplies++;
m_n++;
continue;
}
// . grab a slot
int32_t i;
for (i = 0; i < MAX_OUTSTANDING_MSGE0; i++) {
if (!m_used[i]) {
break;
}
}
// sanity check
if (i >= MAX_OUTSTANDING_MSGE0) {
g_process.shutdownAbort(true);
}
// normalize the url
m_urls[i].set(m_urlPtrs[m_n]);
// save the url number, "n"
m_ns[i] = m_n;
// claim it
m_used[i] = true;
// . start it off
// . this will start the pipeline for this url
m_numRequests++;
m_n++;
sendMsg8a(i);
}
if (m_n >= m_numUrls) {
return m_numRequests == m_numReplies;
}
return false;
}
bool Msge0::sendMsg8a(int32_t slotIndex) {
// set m_errno if we should at this point
if ( g_errno && ! m_errno ) m_errno = g_errno;
g_errno = 0;
Msg8a *m = &m_msg8as[slotIndex];
// save state into Msg8a
m->m_msge0 = this;
m->m_msge0State = slotIndex;
// we are processing the nth url
int32_t n = m_ns[slotIndex];
// now use it
m_tagRecPtrs[n] = new (m_tagRecs+n) TagRec();
// . this now employs the tagdb filters table for lookups
// . that is really a hack until we find a way to identify subsites
// on a domain automatically, like blogspot.com/users/harry/ is a
// subsite.
if ( !m->getTagRec( &m_urls[slotIndex], m_collnum, m_niceness, m, gotTagRecWrapper, m_tagRecPtrs[n] ))
return false;
doneSending_unlocked(slotIndex);
return true;
}
void Msge0::gotTagRecWrapper(void *state) {
Msg8a *m = reinterpret_cast<Msg8a*>(state);
Msge0 *THIS = m->m_msge0;
int32_t slotIndex = m->m_msge0State;
if(!THIS->m_used[slotIndex])
g_process.shutdownAbort(true);
THIS->doneSending(slotIndex);
// try to launch more, returns false if not done
if ( ! THIS->launchRequests() ) return;
// must be all done, call the callback
THIS->m_callback ( THIS->m_state );
}
// Record the completion of slot "slotIndex" while holding m_mtx.
void Msge0::doneSending(int32_t slotIndex)
{
	ScopedLock guard(m_mtx);
	doneSending_unlocked(slotIndex);
}
// . record the outcome of the lookup in slot "slotIndex" and release the slot
// . takes no lock itself; doneSending() wraps it in m_mtx
void Msge0::doneSending_unlocked(int32_t slotIndex) {
	// which url was this slot working on?
	const int32_t urlNum = m_ns[slotIndex];

	// whatever error the lookup left behind belongs to that url
	const int32_t err = g_errno;
	m_tagRecErrors[urlNum] = err;

	// the first error seen also becomes the error of the whole Msge0
	if ( ! m_errno && err ) {
		m_errno = err;
	}

	// do not let it leak into successive calls to other msgs
	g_errno = 0;

	// one more reply in, and the slot can be reused
	m_numReplies++;
	m_used[slotIndex] = false;
}