code checkpoint. time slicing, faster spider code

compiling. now needs debug.
This commit is contained in:
Matt Wells
2014-02-04 17:34:43 -08:00
parent 7f4d3205e5
commit 189999509b
10 changed files with 128 additions and 87 deletions

@ -21,6 +21,9 @@ class HashTableX {
char *allocName ,
bool useKeyMagic = false );
// . returns true once the table has been set up
// . an UNinitialized table is recognized by its key size (m_ks) being 0
bool isInitialized ( ) {
	if ( m_ks == 0 ) return false;
	return true;
}
HashTableX ( );
~HashTableX ( );
void constructor ();

@ -198,7 +198,3 @@ license that then you can arrange a licensing agreement with Matt Wells.
Likewise, the Event datamining logic is in Events.cpp and must be separately licensed
as well.
And any code in between "#ifdef NEEDLICENSE" and "#endif" statements is not
covered by this license and must be licensed separately, too. That code is
not compiled by default and only pertains to a few isolated things.

@ -22,6 +22,18 @@ void Msg20::destructor () { reset(); m_mcast.destructor(); }
#include "Process.h"
void Msg20::freeReply() {
	// only do work if we are actually holding a reply
	if ( m_r ) {
		// . the reply may carry a merged buffer built by msg40 (a
		//   constructed ptr_eventSummaryLines from a merge operation)
		// . running the reply's destructor here fixes the
		//   "merge20buf1" memory leak from Msg40.cpp
		m_r->destructor();
		// release the reply memory itself, but only if it is ours
		if ( m_ownReply )
			mfree ( m_r , m_replyMaxSize , "Msg20b" );
		// forget the reply ptr either way
		m_r = NULL;
	}
}
void Msg20::reset() {
// not allowed to reset one in progress
if ( m_inProgress ) {
@ -33,15 +45,12 @@ void Msg20::reset() {
m_launched = false;
if ( m_request && m_request != m_requestBuf )
mfree ( m_request , m_requestSize , "Msg20rb" );
// sometimes the msg20 reply carries a merged buffer from
// msg40 that is a constructed ptr_eventSummaryLines from a
// merge operation in msg40. this fixes the "merge20buf1" memory
// leak from Msg40.cpp
if ( m_r ) m_r->destructor();
if ( m_r && m_ownReply ) //&& (char *)m_r != m_replyBuf )
mfree ( m_r , m_replyMaxSize , "Msg20b" );
freeReply();
//if ( m_r ) m_r->destructor();
//if ( m_r && m_ownReply ) //&& (char *)m_r != m_replyBuf )
// mfree ( m_r , m_replyMaxSize , "Msg20b" );
//m_r = NULL; // the reply ptr
m_request = NULL; // the request buf ptr
m_r = NULL; // the reply ptr
m_gotReply = false;
m_errno = 0;
m_requestDocId = -1LL;

@ -334,7 +334,7 @@ public:
//long m_numCatIds ; // use size_catIds
//long m_numIndCatIds ; // use size_indCatIds
long m_contentLen ; // was m_docLen
//long m_contentHash ;
long m_contentHash32 ; // for deduping diffbot json objects streaming
//long m_docSummaryScore ;
//long m_inSectionScore ;
//float m_proximityScore ;
@ -780,6 +780,7 @@ class Msg20 {
// so we can alloc arrays of these using mmalloc()
void constructor ();
void destructor ();
void freeReply ();
void reset ();
long m_hack;

@ -14,6 +14,8 @@
//#include "Facebook.h" // msgfb
#include "Speller.h"
#include "Wiki.h"
#include "HttpServer.h"
#include "PageResults.h"
// increasing this doesn't seem to improve performance any on a single
// node cluster....
@ -86,6 +88,7 @@ Msg40::Msg40() {
m_numMsg20s = 0;
m_msg20StartBuf = NULL;
m_numToFree = 0;
// new stuff for streaming results:
m_hadPrintError = false;
m_numPrinted = 0;
m_printedHeader = false;
@ -93,6 +96,8 @@ Msg40::Msg40() {
m_streamResults = false;
m_sendsOut = 0;
m_sendsIn = 0;
m_printi = 0;
m_lastChunk = false;
//m_numGigabitInfos = 0;
}
@ -1207,7 +1212,7 @@ bool gotSummaryWrapper ( void *state ) {
return true;
}
void doneSendingWrapper9 ( void *state ) {
void doneSendingWrapper9 ( void *state , TcpSocket *sock ) {
Msg40 *THIS = (Msg40 *)state;
// the send completed, count it
THIS->m_sendsIn++;
@ -1243,6 +1248,12 @@ bool Msg40::gotSummary ( ) {
// reset g_errno
g_errno = 0;
}
// initialize dedup table if we haven't already
if ( ! m_dedupTable.isInitialized() &&
! m_dedupTable.set ( 4,0,64,NULL,0,false,m_si->m_niceness,"srdt") )
log("query: error initializing dedup table: %s",mstrerror(g_errno));
/*
// sanity check
for ( long i = 0 ; i < m_msg3a.m_numDocIds ; i++ ) {
@ -1264,25 +1275,30 @@ bool Msg40::gotSummary ( ) {
doAgain:
st->m_sb.reset();
State0 *st = (State0 *)m_state;
SafeBuf *sb = &st->m_sb;
for ( ; m_streamResults && m_printi < m_msg3a.m_numDocIds ; m_printi++ ) {
sb->reset();
for ( ; m_si->m_streamResults && m_printi < m_msg3a.m_numDocIds ; m_printi++ ) {
// if we are waiting on our previous send to complete... wait...
if ( m_sendsOut > m_sendsIn ) break;
// otherwise, get the summary for result #m_printi
Msg20 *m20 = &m_msg20s[m_printi];
Msg20 *m20 = m_msg20[m_printi];
// get the next reply we are waiting on to print results in order
Msg20Reply *mr = m20->m_reply;
Msg20Reply *mr = m20->m_r;
if ( ! mr ) break;
// primitive deduping. for diffbot json exclude url's from the
// XmlDoc::m_contentHash32...
if ( st->m_dedupTable.isInTable ( mr->m_contentHash32 ) )
// XmlDoc::m_contentHash32... it will be zero if invalid i guess...
if ( mr->m_contentHash32 &&
m_dedupTable.isInTable ( &mr->m_contentHash32 ) )
continue;
// return true with g_errno set on error
if ( ! st->m_dedupTable.addKey ( &mr->m_contentHash32 ) ) {
if ( mr->m_contentHash32 &&
! m_dedupTable.addKey ( &mr->m_contentHash32 ) ) {
m_hadPrintError = true;
log("msg40: error adding to dedup table: %s",mstrerror(g_errno));
}
@ -1298,7 +1314,6 @@ bool Msg40::gotSummary ( ) {
TcpServer *tcp = &g_httpServer.m_tcp;
// . transmit the chunk in sb if non-zero length
// . steals the allocated buffer from sb and stores in the
// TcpSocket::m_sendBuf, which it frees when socket is
@ -1313,7 +1328,7 @@ bool Msg40::gotSummary ( ) {
sb ,
this ,
doneSendingWrapper9 ,
lastChunk ) )
m_lastChunk ) )
// if it blocked, inc this count. we'll only call m_callback above
// when m_sendsIn equals m_sendsOut... and m_numReplies == m_numRequests
m_sendsOut++;
@ -4742,7 +4757,7 @@ bool Msg40::printSearchResult9 ( long ix ) {
// get state0
State0 *st = (State0 *)m_state;
SafeBuf *sb = st->m_sb;
SafeBuf *sb = &st->m_sb;
// clear it since we are streaming
sb->reset();
@ -4776,14 +4791,12 @@ bool Msg40::printSearchResult9 ( long ix ) {
}
//bool lastChunk = false;
// . wrap it up with Next 10 etc.
// . this is in PageResults.cpp
if ( m_numPrinted >= m_numRequests && ! m_printedTail ) {
m_printedTail = true;
printSearchResultsTail ( st );
//lastChunk = true;
m_lastChunk = true;
}

18
Msg40.h

@ -117,7 +117,7 @@ class Msg40 {
bool computeGigabits( class TopicGroup *tg );
SafeBuf m_gigabitBuf;
#ifdef NEEDLICENSE
// nuggabits...
bool computeFastFacts ( );
bool addFacts ( HashTableX *queryTable,
HashTableX *gbitTable ,
@ -126,7 +126,6 @@ class Msg40 {
bool debugGigabits ,
class Msg20Reply *reply,
SafeBuf *factBuf ) ;
#endif
SafeBuf m_factBuf;
@ -202,6 +201,10 @@ class Msg40 {
// Msg39 and all Msg20s must use the same clock timestamp
time_t m_nowUTC;
bool printSearchResult9 ( long ix ) ;
HashTableX m_dedupTable;
long m_msg3aRecallCnt;
Msg39Request m_r;
@ -237,6 +240,17 @@ class Msg40 {
char *m_msg20StartBuf;
long m_numToFree;
bool m_hadPrintError ;
long m_numPrinted ;
bool m_printedHeader ;
bool m_printedTail ;
bool m_streamResults ;
bool m_lastChunk ;
long m_sendsOut ;
long m_sendsIn ;
long m_printi ;
// use msg3a to get docIds
Msg3a m_msg3a;

@ -58,15 +58,15 @@ bool sendPageDirectory ( TcpSocket *s , HttpRequest *r ) {
//
else {
// search box
printLogoAndSearchBox(sb,r,catId);
printLogoAndSearchBox(&sb,r,catId);
// radio buttons for search dmoz. no, this is printed
// from call to printLogoAndSearchBox()
//printDmozRadioButtons(sb,catId);
// the dmoz breadcrumb
printDMOZCrumb ( sb,catId,xml);
printDMOZCrumb ( &sb,catId,xml);
// print the subtopics in this topic. show as links above
// the search results
printDMOZSubTopics ( sb, catId , xml );
printDMOZSubTopics ( &sb, catId , xml );
// ok, for now just print the dmoz topics since our search
// results will be empty... until populated!
g_categories->printUrlsInTopic ( &sb , catId );

@ -34,46 +34,6 @@ static void gotResultsWrapper ( void *state ) ;
static void gotState ( void *state ) ;
static bool gotResults ( void *state ) ;
// . bundles the data used while building one search results page: the
//   output buffer, the query/search input, the Msg40 that fetches the
//   results and the bookkeeping flags below
// . NOTE(review): presumably one instance is allocated per incoming
//   request and passed around as the "void *state" of the callbacks --
//   confirm against sendPageResults()
class State0 {
public:
// store results page in this safebuf
SafeBuf m_sb;
collnum_t m_collnum;
Query m_q;
SearchInput m_si;
Msg40 m_msg40;
// NOTE(review): presumably the client socket the page is sent back on
// -- confirm
TcpSocket *m_socket;
Msg0 m_msg0;
// NOTE(review): units/clock of this timestamp are not visible here
long long m_startTime;
//Ads m_ads;
bool m_gotAds;
bool m_gotResults;
char m_spell [MAX_FRAG_SIZE]; // spelling recommendation
bool m_gotSpell;
long m_errno;
Query m_qq3;
long m_numDocIds;
long long m_took; // how long it took to get the results
// NOTE(review): presumably a copy of the incoming http request -- confirm
HttpRequest m_hr;
bool m_printedHeaderRow;
// NOTE(review): looks like a buffer for an encoded form of the query
// (sized by MAX_QUERY_LEN plus a terminator) -- confirm
char m_qe[MAX_QUERY_LEN+1];
// for printing our search result json items in csv:
HashTableX m_columnTable;
long m_numCSVColumns;
// stuff for doing redownloads
bool m_didRedownload;
XmlDoc *m_xd;
long m_oldContentHash32;
};
bool printSearchResultsHeader ( State0 *st ) ;
bool printSearchResultsTail ( State0 *st ) ;
bool printResult ( State0 *st, long ix );
bool printCSVHeaderRow ( SafeBuf *sb , State0 *st ) ;
@ -82,17 +42,11 @@ bool printJsonItemInCSV ( char *json , SafeBuf *sb , class State0 *st ) ;
bool printPairScore ( SafeBuf *sb , SearchInput *si , PairScore *ps ,
Msg20Reply *mr , Msg40 *msg40 , bool first ) ;
bool printTermPairs ( SafeBuf *sb , Query *q , PairScore *ps ) ;
bool printSingleTerm ( SafeBuf *sb , Query *q , SingleScore *ss ) ;
bool printScoresHeader ( SafeBuf *sb ) ;
bool printSingleScore ( SafeBuf *sb , SearchInput *si , SingleScore *ss ,
Msg20Reply *mr , Msg40 *msg40 ) ;
bool printLogoAndSearchBox ( SafeBuf *sb , HttpRequest *hr , long catId );
bool sendReply ( State0 *st , char *reply ) {
long savedErr = g_errno;
@ -388,7 +342,7 @@ bool sendPageResults ( TcpSocket *s , HttpRequest *hr ) {
//
// logo header
//
printLogoAndSearchBox ( sb , hr , -1 ); // catId = -1
printLogoAndSearchBox ( &sb , hr , -1 ); // catId = -1
//
// script to populate search results
//

@ -2,15 +2,62 @@
#define _PAGERESULTS_H_
#include "SafeBuf.h"
#include "Language.h" // MAX_FRAG_SIZE
#include "Msg40.h"
#include "Msg0.h"
bool printDmozRadioButtons ( SafeBuf &sb , long catId ) ;
bool printLogoAndSearchBox ( SafeBuf &sb , class HttpRequest *hr, long catId );
// . bundles the data used while building one search results page: the
//   output buffer, the query/search input, the Msg40 that fetches the
//   results and the bookkeeping flags below
// . NOTE(review): presumably one instance is allocated per incoming
//   request and passed around as the "void *state" of the callbacks --
//   confirm against PageResults.cpp
class State0 {
public:
bool printTermPairs ( SafeBuf &sb , class Query *q , class PairScore *ps ) ;
bool printSingleTerm ( SafeBuf &sb , class Query *q , class SingleScore *ss );
// store results page in this safebuf
SafeBuf m_sb;
collnum_t m_collnum;
Query m_q;
SearchInput m_si;
Msg40 m_msg40;
// NOTE(review): presumably the client socket the page is sent back on
// -- confirm
TcpSocket *m_socket;
Msg0 m_msg0;
// NOTE(review): units/clock of this timestamp are not visible here
long long m_startTime;
//Ads m_ads;
bool m_gotAds;
bool m_gotResults;
char m_spell [MAX_FRAG_SIZE]; // spelling recommendation
bool m_gotSpell;
long m_errno;
Query m_qq3;
long m_numDocIds;
long long m_took; // how long it took to get the results
// NOTE(review): presumably a copy of the incoming http request -- confirm
HttpRequest m_hr;
bool m_printedHeaderRow;
// NOTE(review): looks like a buffer for an encoded form of the query
// (sized by MAX_QUERY_LEN plus a terminator) -- confirm
char m_qe[MAX_QUERY_LEN+1];
// for printing our search result json items in csv:
HashTableX m_columnTable;
long m_numCSVColumns;
// stuff for doing redownloads
bool m_didRedownload;
XmlDoc *m_xd;
long m_oldContentHash32;
};
bool printEventAddress ( SafeBuf &sb , char *addrStr , class SearchInput *si ,
bool printSearchResultsHeader ( class State0 *st ) ;
bool printResult ( class State0 *st, long ix );
bool printSearchResultsTail ( class State0 *st ) ;
bool printDmozRadioButtons ( SafeBuf *sb , long catId ) ;
bool printLogoAndSearchBox ( SafeBuf *sb , class HttpRequest *hr, long catId );
bool printTermPairs ( SafeBuf *sb , class Query *q , class PairScore *ps ) ;
bool printSingleTerm ( SafeBuf *sb , class Query *q , class SingleScore *ss );
bool printEventAddress ( SafeBuf *sb , char *addrStr , class SearchInput *si ,
double *lat , double *lon , bool isXml ,
// use this for printing distance if lat/lon above
// is invalid. only for non-xml printing though.
@ -20,10 +67,10 @@ bool printEventAddress ( SafeBuf &sb , char *addrStr , class SearchInput *si ,
double eventGeocoderLon,
char *eventBestPlaceName );
bool printDMOZCrumb ( SafeBuf &sb , long catId , bool xml ) ;
bool printDMOZSubTopics ( SafeBuf& sb, long catId, bool inXml ) ;
bool printDMOZCrumb ( SafeBuf *sb , long catId , bool xml ) ;
bool printDMOZSubTopics ( SafeBuf *sb, long catId, bool inXml ) ;
bool printEventCountdown2 ( SafeBuf &sb ,
bool printEventCountdown2 ( SafeBuf *sb ,
SearchInput *si,
long now ,
long timeZoneOffset ,

@ -26757,6 +26757,10 @@ Msg20Reply *XmlDoc::getMsg20Reply ( ) {
// we use this instead of nowGlobal
//if ( ! m_spideredTimeValid ) { char *xx=NULL;*xx=0; }
// this should be valid, it is stored in title rec
if ( m_contentHash32Valid ) reply->m_contentHash32 = m_contentHash32;
else reply->m_contentHash32 = 0;
// if this page is potential spam, toss it!
//char *isSpam = getIsSpam();
//if ( ! isSpam || isSpam == (char *)-1 ) return (Msg20Reply *)isSpam;