#include "Msge0.h" #include "Process.h" #include "Tagdb.h" #include "ip.h" #include "UrlBlockList.h" #include "UdpServer.h" #include "Mem.h" #include "ScopedLock.h" #include <new> Msge0::Msge0() : m_collnum(0), m_niceness(0), m_urlPtrs(NULL), m_urlFlags(NULL), m_numUrls(0), m_buf(NULL), m_bufSize(0), m_baseTagRec(NULL), m_tagRecErrors(NULL), m_tagRecPtrs(NULL), m_tagRecs(NULL), m_numRequests(0), m_numReplies(0), m_n(0), m_mtx(), m_state(NULL), m_callback(NULL), m_errno(0) { for(int i=0; i<MAX_OUTSTANDING_MSGE0; i++) m_ns[i] = 0; for(int i=0; i<MAX_OUTSTANDING_MSGE0; i++) m_used[i] = false; } Msge0::~Msge0() { reset(); } void Msge0::reset() { m_errno = 0; //free TagRecs that are not the base tag for ( int32_t i = 0 ; i < m_n ; i++ ) { if(m_tagRecPtrs[i] && m_tagRecPtrs[i]!=m_baseTagRec) m_tagRecPtrs[i]->~TagRec(); } if ( m_buf ) { mfree ( m_buf , m_bufSize,"Msge0buf"); } m_buf = NULL; m_tagRecErrors = NULL; m_tagRecPtrs = NULL; m_tagRecs = NULL; m_numRequests = 0; m_numReplies = 0; m_n = 0; for(int i=0; i<MAX_OUTSTANDING_MSGE0; i++) m_urls[i].reset(); for(int i=0; i<MAX_OUTSTANDING_MSGE0; i++) m_msg8as[i].reset(); } // . get various information for each url in a list of urls // . urls in "urlBuf" are \0 terminated // . used to be called getSiteRecs() // . you can pass in a list of docIds rather than urlPtrs bool Msge0::getTagRecs ( const char **urlPtrs , const linkflags_t *urlFlags , //Links::m_linkFlags int32_t numUrls , TagRec *baseTagRec , collnum_t collnum, int32_t niceness , void *state , void (*callback)(void *state) ) { reset(); // bail if no urls or linkee if ( numUrls <= 0 ) return true; // save all input parms m_urlPtrs = urlPtrs; m_urlFlags = urlFlags; m_numUrls = numUrls; m_baseTagRec = baseTagRec; m_collnum = collnum; m_niceness = niceness; m_state = state; m_callback = callback; // . how much mem to alloc? // . include an extra 4 bytes for each one to hold possible errno int32_t needPerUrl = sizeof(int32_t) // error + sizeof(TagRec*) // tag ptr + sizeof(TagRec); // m_tagRecs // one per url int32_t needTotal = needPerUrl * numUrls; // allocate the buffer to hold all the info we gather m_buf = (char *)mcalloc ( needTotal , "Msge0buf" ); if ( ! m_buf ) return true; m_bufSize = needTotal; // clear it all memset ( m_buf , 0 , m_bufSize ); // set the ptrs! char *p = m_buf; m_tagRecErrors = (int32_t *)p ; p += numUrls * sizeof(int32_t); m_tagRecPtrs = (TagRec **)p ; p += numUrls * sizeof(TagRec *); m_tagRecs = (TagRec*)p; p += numUrls * sizeof(TagRec); // initialize m_numRequests = 0; m_numReplies = 0; // . point to first url to process // . url # m_n m_n = 0; // clear the m_used flags for(int i=0; i<MAX_OUTSTANDING_MSGE0; i++) m_used[i] = false; // . launch the requests return launchRequests(); } // we only come back up here 1) in the very beginning or 2) when a url // completes its pipeline of requests bool Msge0::launchRequests() { // reset any error code g_errno = 0; // if all hosts are getting a diffbot reply with 50 spiders and they // all timeout at the same time we can very easily clog up the // udp sockets, so use this to limit... i've seen the whole // spider tables stuck with "getting outlink tag rec vector"statuses const int32_t maxOut = g_udpServer.getNumUsedSlots() > 500 ? 1 : MAX_OUTSTANDING_MSGE0; ScopedLock sl(m_mtx); while(m_n < m_numUrls && m_numRequests - m_numReplies < maxOut) { // if url is same host as the tagrec provided, just reference that! if ( m_urlFlags && (m_urlFlags[m_n] & LF_SAMEHOST) && m_baseTagRec) { m_tagRecPtrs[m_n] = (TagRec *)m_baseTagRec; m_numRequests++; m_numReplies++; m_n++; continue; } // . get the next url const char *p = m_urlPtrs[m_n]; Url url; url.set(p); if(g_urlBlockList.isUrlBlocked(url)) { //if(g_conf.m_logDebug...something...) // log("...something...: skipping tagrec lookup of '%*.*s' because the URL is blocked", (int)url.getHostLen(), (int)url.getHostLen(), url.getHost()); m_tagRecPtrs[m_n] = (TagRec *)m_baseTagRec; m_numRequests++; m_numReplies++; m_n++; continue; } // get the length int32_t plen = strlen(p); // . grab a slot int32_t i; for ( i = 0; i < MAX_OUTSTANDING_MSGE0 ; i++ ) if ( ! m_used[i] ) break; // sanity check if ( i >= MAX_OUTSTANDING_MSGE0 ) { g_process.shutdownAbort(true); } // normalize the url m_urls[i].set( p, plen ); // save the url number, "n" m_ns [i] = m_n++; // claim it m_used[i] = true; // . start it off // . this will start the pipeline for this url m_numRequests++; sendMsg8a(i); } if( m_n >= m_numUrls ) return m_numRequests == m_numReplies; return false; } bool Msge0::sendMsg8a(int32_t slotIndex) { // set m_errno if we should at this point if ( g_errno && ! m_errno ) m_errno = g_errno; g_errno = 0; Msg8a *m = &m_msg8as[slotIndex]; // save state into Msg8a m->m_msge0 = this; m->m_msge0State = slotIndex; // we are processing the nth url int32_t n = m_ns[slotIndex]; // now use it m_tagRecPtrs[n] = new (m_tagRecs+n) TagRec(); // . this now employs the tagdb filters table for lookups // . that is really a hack until we find a way to identify subsites // on a domain automatically, like blogspot.com/users/harry/ is a // subsite. if ( !m->getTagRec( &m_urls[slotIndex], m_collnum, m_niceness, m, gotTagRecWrapper, m_tagRecPtrs[n] )) return false; doneSending_unlocked(slotIndex); return true; } void Msge0::gotTagRecWrapper(void *state) { Msg8a *m = reinterpret_cast<Msg8a*>(state); Msge0 *THIS = m->m_msge0; int32_t slotIndex = m->m_msge0State; if(!THIS->m_used[slotIndex]) g_process.shutdownAbort(true); THIS->doneSending(slotIndex); // try to launch more, returns false if not done if ( ! THIS->launchRequests() ) return; // must be all done, call the callback THIS->m_callback ( THIS->m_state ); } void Msge0::doneSending(int32_t slotIndex) { ScopedLock sl(m_mtx); doneSending_unlocked(slotIndex); } void Msge0::doneSending_unlocked(int32_t slotIndex) { // we are processing the nth url int32_t n = m_ns[slotIndex]; // save the error if msg8a had one m_tagRecErrors[n] = g_errno; // also, set m_errno for this Msge0 class... if ( g_errno && ! m_errno ) m_errno = g_errno; // reset error for successive calls to other msgs g_errno = 0; // tally it up m_numReplies++; // free it m_used[slotIndex] = false; }