arc indexing works again

This commit is contained in:
Matt
2015-05-03 13:10:17 -07:00
parent a07c94f85d
commit f63cccaf01
2 changed files with 115 additions and 216 deletions

@ -3237,205 +3237,10 @@ bool XmlDoc::indexContainerDoc ( ) {
}
// returns false if would block, true otherwise. returns true and sets g_errno on err
bool XmlDoc::indexArc ( ) {
int8_t *hc = getHopCount();
if ( ! hc ) return true; // error?
if ( hc == (void *)-1 ) return false;
// first download
char **acp = getUtf8Content();
// return true with g_errno set on error
if ( ! acp ) {
if ( ! g_errno ) { char *xx=NULL;*xx=0; }
return true;
}
// would block? return false then
if ( acp == (void *)-1 )
return false;
if ( *acp == NULL ) {
log("build: arc content was empty. did not index.");
return true;
}
// need this. it is almost 1MB in size, so alloc it
if ( ! m_msg7 ) {
try { m_msg7 = new ( Msg7 ); }
catch ( ... ) {
g_errno = ENOMEM;
return true;
}
mnew ( m_msg7 , sizeof(Msg7),"xdmsg7");
}
// inject input parms:
GigablastRequest *gr = &m_msg7->m_gr;
// the cursor for scanning the subdocs
if ( ! m_arcContentPtr ) {
// init the content cursor to point to the first subdoc
m_arcContentPtr = *acp;
// init the input parms
memset ( gr , 0 , sizeof(GigablastRequest) );
// reset it
gr->m_spiderLinks = false;
gr->m_injectLinks = false;
gr->m_hopCount = *hc + 1;
if ( ! m_collnumValid ) { char *xx=NULL;*xx=0; }
gr->m_collnum = m_collnum;
// will this work on a content delimeterized doc?
gr->m_deleteUrl = m_deleteFromIndex;
// each subdoc will have a mime since it is an arc
gr->m_hasMime = true;
}
subdocLoop:
QUICKPOLL ( m_niceness );
// we had \0 terminated the end of the previous record, so put back
if ( m_savedChar && ! *m_arcContentPtr ) *m_arcContentPtr = m_savedChar;
// . should have the url as well.
// . the url, ip etc. are on a single \n terminated line for an arc!
char *arcHeader = strstr(m_arcContentPtr,"\nhttp");
if ( ! arcHeader ) {
log("inject: arc: all done");
return true;
}
// find end of url
char *arcHeaderEnd = strstr (arcHeader+1,"\n");
if ( ! arcHeaderEnd ) {
log("inject: arc: no header end. all done");
return true;
}
// term it
*arcHeaderEnd = '\0';
//char *arcRecord = arcHeaderEnd + 1;
// get url
char *arcUrl = arcHeader + 1;
char *hp = arcUrl;
for ( ; *hp && *hp != ' ' ; hp++ );
if ( ! *hp ) {
log("inject: bad arc header 1.");
return true;
}
*hp++ = '\0';
// get ip
char *ipStr = hp;
for ( ; *hp && *hp != ' ' ; hp++ );
if ( ! *hp ) {
log("inject: bad arc header 2.");
return true;
}
*hp++ = '\0';
gr->m_injectDocIp = atoip(ipStr);
// get time
char *timeStr = hp;
for ( ; *hp && *hp != ' ' ; hp++ );
if ( ! *hp ) {
log("inject: bad arc header 3.");
return true;
}
*hp++ = '\0';
// get content type
char *arcConType = hp;
for ( ; *hp && *hp != ' ' ; hp++ );
if ( ! *hp ) {
log("inject: bad arc header 4.");
return true;
}
*hp++ = '\0';
// get length of following doc in bytes, it's already \0 terminated
// since it is the last thing on the line
char *arcConLenStr = hp;
// convert to number
int64_t arcRecLen = atoll(arcConLenStr);
// we could also use m_contentFile if it was on disk
gr->m_content = arcHeaderEnd + 1;
// advance for loop
m_arcContentPtr = arcHeaderEnd + 1 + arcRecLen;
// null term this record
m_savedChar = *m_arcContentPtr; *m_arcContentPtr = '\0';
// convert to timestamp
int64_t arcTime = 0;
// this time structure, once filled, will help yield a time_t
struct tm t;
// DAY OF MONTH
t.tm_mday = atol2 ( timeStr + 6 , 2 );
// MONTH
t.tm_mon = atol2 ( timeStr + 4 , 2 );
// YEAR
// # of years since 1900
t.tm_year = atol2 ( timeStr , 4 ) - 1900 ;
// TIME
t.tm_hour = atol2 ( timeStr + 8 , 2 );
t.tm_min = atol2 ( timeStr + 10 , 2 );
t.tm_sec = atol2 ( timeStr + 12 , 2 );
// unknown if we're in daylight savings time
t.tm_isdst = -1;
// translate using mktime
arcTime = timegm ( &t );
gr->m_firstIndexed = arcTime;
gr->m_lastSpidered = arcTime;
// assume "start" has the http mime
gr->m_hasMime = true;
gr->m_url = arcUrl;
// arcConType needs to indexable
int32_t ct = getContentTypeFromStr ( arcConType );
if ( ct != CT_HTML &&
ct != CT_TEXT &&
ct != CT_XML &&
ct != CT_JSON ) {
// read another arc record
goto subdocLoop;
}
// skip if robots.txt
if ( isRobotsTxtFile(arcUrl,gbstrlen(arcUrl) ) )
goto subdocLoop;
QUICKPOLL ( m_niceness );
// TODO: set these based on the date in the warc mime!!
//gr->m_firstIndexed = ;
//gr->m_lastSpidered = ;
// then process. this will scan over each delimeted
// doc in the arc/warc file and inject each one individually.
if ( ! m_msg7->inject2 ( m_masterState , m_masterLoop ) )
// it would block, callback will be called later
return false;
QUICKPOLL ( m_niceness );
// error?
if ( g_errno ) {
log("build: index arc error %s",mstrerror(g_errno));
return NULL;
}
// loop it up
goto subdocLoop;
}
void doneInjectingWarcRec ( void *state ) {
void doneInjectingArchiveRec ( void *state ) {
XmlDoc *THIS = (XmlDoc *)state;
THIS->m_numInjectionsOut--;
log("build: warc: injection thread returned. %"INT32" out now.",
log("build: archive: injection thread returned. %"INT32" out now.",
THIS->m_numInjectionsOut);
THIS->m_masterLoop ( THIS );
}
@ -3483,7 +3288,7 @@ bool XmlDoc::indexWarcOrArc ( char ctype ) {
if ( m_doneInjectingWarc ) {
warcDone:
m_doneInjectingWarc = true;
log("build: done parsing %"INT64 " bytes of warc file %s.",
log("build: done parsing %"INT64 " bytes of archive file %s.",
fileSize,file->getFilename());
// return if all injects have returned.
if ( m_numInjectionsOut == 0 ) {
@ -3500,12 +3305,13 @@ bool XmlDoc::indexWarcOrArc ( char ctype ) {
if ( needReadMore ) {
log("build: reading %"INT64" bytes more of warc file"
,(int64_t)MAXWARCRECSIZE);
log("build: reading %"INT64" bytes more of archive file %s"
,(int64_t)MAXWARCRECSIZE,file->getFilename());
// are we done?
if ( m_fileOff >= fileSize ) {
log("build: hit end of warc file. done.");
log("build: hit end of archive file %s. done.",
file->getFilename());
goto warcDone;
}
@ -3516,7 +3322,8 @@ bool XmlDoc::indexWarcOrArc ( char ctype ) {
m_fileBuf=(char *)mmalloc(m_fileBufAllocSize ,"sibuf");
}
if ( ! m_fileBuf ) {
log("build: failed to alloc buf to read warc file.");
log("build: failed to alloc buf to read archive file "
"%s",file->getFilename());
return true;
}
@ -3541,7 +3348,8 @@ bool XmlDoc::indexWarcOrArc ( char ctype ) {
// if not enough to constitute a WARC record probably just
// new lines
if ( toRead < 20 ) {
log("build: done processing warc file.");
log("build: done processing archive file %s",
file->getFilename());
goto warcDone;
}
// point to what we read
@ -3577,6 +3385,9 @@ bool XmlDoc::indexWarcOrArc ( char ctype ) {
uint64_t oldOff = m_fileOff;
//
// set recUrl, recIp, recTime, recContent, recContentLen and recSize
//
if ( ctype == CT_WARC ) {
// find "WARC/1.0" or whatever
char *whp = m_fptr;
@ -3697,9 +3508,97 @@ bool XmlDoc::indexWarcOrArc ( char ctype ) {
recTime = 0;
if ( warcDate ) recTime = atotime ( warcDate );
recIp = warcIp;
// END WARC SPECIFIC PARSING
}
// END WARC SPECIFIC PARSING
//
// set recUrl, recIp, recTime, recContent, recContentLen and recSize
//
if ( ctype == CT_ARC ) {
// find \n\nhttp://
char *whp = m_fptr;
for ( ; *whp ; whp++ ) {
if ( whp[0] != '\n' ) continue;
if ( strncmp(whp+1,"http://",7) ) continue;
break;
}
// none?
if ( ! *whp ) {
log("build: arc: could not find next \\nhttp:// in "
"arc file %s",file->getFilename());
goto warcDone;
}
char *arcHeader = whp;
// find end of arc header not the content
char *arcHeaderEnd = strstr(arcHeader+1,"\n");
if ( ! arcHeaderEnd ) {
log("inject: could not find end of ARC header.");
exit(0);
}
// \0 term for strstrs below
*arcHeaderEnd = '\0';
char *arcContent = arcHeaderEnd + 1;
// parse arc header line
char *url = arcHeader + 1;
char *hp = url;
for ( ; *hp && *hp != ' ' ; hp++ );
if ( ! *hp ) {log("inject: bad arc header 1.");exit(0);}
*hp++ = '\0';
char *ipStr = hp;
for ( ; *hp && *hp != ' ' ; hp++ );
if ( ! *hp ) {log("inject: bad arc header 2.");exit(0);}
*hp++ = '\0';
char *timeStr = hp;
for ( ; *hp && *hp != ' ' ; hp++ );
if ( ! *hp ) {log("inject: bad arc header 3.");exit(0);}
*hp++ = '\0'; // null term timeStr
char *arcConType = hp;
for ( ; *hp && *hp != ' ' ; hp++ );
if ( ! *hp ) {log("inject: bad arc header 4.");exit(0);}
*hp++ = '\0'; // null term arcContentType
char *arcContentLenStr = hp;
// get arc content len
int64_t arcContentLen = atoll(arcContentLenStr);
char *arcContentEnd = arcContent + arcContentLen;
//uint64_t oldOff = s_off;
recSize = (arcContentEnd - realStart);
// point to the next arc record
m_fptr += recSize;
// advance the file offset to the next record as well
m_fileOff += recSize;
// arcConType needs to indexable
int32_t ct = getContentTypeFromStr ( arcConType );
if ( ct != CT_HTML &&
ct != CT_TEXT &&
ct != CT_XML &&
ct != CT_JSON ) {
// read another arc record
goto loop;
}
// convert to timestamp
// this time structure, once filled, will help yield a time_t
struct tm t;
// DAY OF MONTH
t.tm_mday = atol2 ( timeStr + 6 , 2 );
// MONTH
t.tm_mon = atol2 ( timeStr + 4 , 2 );
// YEAR - # of years since 1900
t.tm_year = atol2 ( timeStr , 4 ) - 1900 ;
// TIME
t.tm_hour = atol2 ( timeStr + 8 , 2 );
t.tm_min = atol2 ( timeStr + 10 , 2 );
t.tm_sec = atol2 ( timeStr + 12 , 2 );
// unknown if we're in daylight savings time
t.tm_isdst = -1;
// translate using mktime
recTime = timegm ( &t );
// set content as well
recContent = arcContent;
recContentLen = arcContentLen;
recUrl = url;
recIp = ipStr;
}
// END ARC SPECIFIC PARSING
@ -3723,13 +3622,14 @@ bool XmlDoc::indexWarcOrArc ( char ctype ) {
// how can there be no more to read?
if ( m_fptr > m_fptrEnd && ! m_hasMoreToRead ) {
log("build: warc file exceeded file length.");
log("build: archive file %s exceeded file length.",
file->getFilename());
goto loop;
}
// if we fall outside of the current read buf, read next rec if too big
if ( m_fptr > m_fptrEnd && recSize > MAXWARCRECSIZE ) {
log("build: skipping warc file of %"INT64" "
log("build: skipping archive file of %"INT64" "
"bytes which is too big",recSize);
needReadMore = true;
goto readMore;
@ -3748,8 +3648,8 @@ bool XmlDoc::indexWarcOrArc ( char ctype ) {
// should be a mime that starts with GET or POST
HttpMime m;
if ( ! m.set ( httpReply , httpReplySize , NULL ) ) {
log("build: warc: failed to set http mime at %"INT64" in file"
,oldOff);
log("build: archive: failed to set http mime at %"INT64" in "
"file",oldOff);
goto loop;
}
@ -3826,7 +3726,7 @@ bool XmlDoc::indexWarcOrArc ( char ctype ) {
// get end of ip
char *ipEnd = recIp;
// skip digits and periods
while ( ! is_wspace_a(*ipEnd) ) ipEnd++;
while ( *ipEnd && ! is_wspace_a(*ipEnd) ) ipEnd++;
// we now have the ip address for doing ip: searches
// this func is in ip.h
gr->m_injectDocIp = atoip ( recIp, ipEnd-recIp );
@ -3861,14 +3761,14 @@ bool XmlDoc::indexWarcOrArc ( char ctype ) {
// log it
log("build: warc: injecting WARC/ARC url %s",recUrl);
log("build: archive: injecting archive url %s",recUrl);
QUICKPOLL ( m_niceness );
if ( ! msg7->inject2 ( this , doneInjectingWarcRec ) )
if ( ! msg7->inject2 ( this , doneInjectingArchiveRec ) )
m_numInjectionsOut++;
else
log("build: index warc: msg7: %s",mstrerror(g_errno));
log("build: index archive: msg7: %s",mstrerror(g_errno));
goto loop;
}
@ -19121,7 +19021,7 @@ void *systemStartWrapper_r ( void *state , ThreadEntry *t ) {
XmlDoc *THIS = (XmlDoc *)state;
char filename[2048];
snprintf(filename,2048,"%sgbwarcfile%"UINT32".gz",
snprintf(filename,2048,"%sgbarchivefile%"UINT32".gz",
g_hostdb.m_dir,
(int32_t)(int64_t)THIS);
@ -19168,7 +19068,7 @@ File *XmlDoc::getUtf8ContentInFile ( int64_t *fileSizeArg ) {
char filename[2048];
snprintf ( filename,
2048,
"%sgbwarcfile%"UINT32"",
"%sgbarchivefile%"UINT32"",
g_hostdb.m_dir,
(int32_t)(int64_t)this);

@ -505,7 +505,6 @@ class XmlDoc {
bool indexDoc2 ( );
bool isContainerDoc ( );
bool indexContainerDoc ( );
bool indexArc ( ) ;
bool indexWarcOrArc ( char ct ) ;
key_t *getTitleRecKey() ;
//char *getSkipIndexing ( );