Merge branch 'ia-zak' of github.com:gigablast/open-source-search-engine into ia-zak
This commit is contained in:
commit
5aaa08d81a
29
BigFile.cpp
29
BigFile.cpp
@ -35,6 +35,7 @@ BigFile::~BigFile () {
|
||||
BigFile::BigFile () {
|
||||
m_permissions = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH ;
|
||||
m_flags = O_RDWR ; // | O_DIRECT;
|
||||
m_usePartFiles = true;
|
||||
// NULLify all ptrs to files
|
||||
//for ( int32_t i = 0 ; i < MAX_PART_FILES ; i++ ) m_files[i] = NULL;
|
||||
m_maxParts = 0;
|
||||
@ -74,6 +75,8 @@ bool BigFile::set ( char *dir , char *baseFilename , char *stripeDir ) {
|
||||
m_dir .setLabel("bfd");
|
||||
m_baseFilename.setLabel("bfbf");
|
||||
|
||||
m_usePartFiles = true;
|
||||
|
||||
// use this 32 byte char buf to avoid a malloc if possible
|
||||
m_baseFilename.setBuf (m_tmpBaseBuf,32,0,false);
|
||||
|
||||
@ -267,12 +270,12 @@ static int64_t s_vfd = 0;
|
||||
|
||||
// do not use part files for this open so we can open regular really >2GB
|
||||
// sized files with it
|
||||
bool BigFile::open2 ( int flags ,
|
||||
void *pc ,
|
||||
int64_t maxFileSize ,
|
||||
int permissions ) {
|
||||
return open ( flags , pc , maxFileSize , permissions , false );
|
||||
}
|
||||
// bool BigFile::open2 ( int flags ,
|
||||
// void *pc ,
|
||||
// int64_t maxFileSize ,
|
||||
// int permissions ) {
|
||||
// return open ( flags , pc , maxFileSize , permissions , false );
|
||||
// }
|
||||
|
||||
// . overide File::open so we can set m_numParts
|
||||
// . set maxFileSize when opening a new file for writing and using
|
||||
@ -282,15 +285,14 @@ bool BigFile::open ( int flags ,
|
||||
//class DiskPageCache *pc ,
|
||||
void *pc ,
|
||||
int64_t maxFileSize ,
|
||||
int permissions ,
|
||||
bool usePartFiles ) {
|
||||
int permissions ) {
|
||||
|
||||
m_flags = flags;
|
||||
//m_pc = pc;
|
||||
m_permissions = permissions;
|
||||
m_isClosing = false;
|
||||
// this is true except when parsing big warc files
|
||||
m_usePartFiles = usePartFiles;
|
||||
m_usePartFiles = true;//usePartFiles;
|
||||
// . init the page cache for this vfd
|
||||
// . this returns our "virtual fd", not the same as File::m_vfd
|
||||
// . returns -1 and sets g_errno on failure
|
||||
@ -1378,10 +1380,17 @@ bool readwrite_r ( FileState *fstate , ThreadEntry *t ) {
|
||||
log("disk: Read of %"INT32" bytes at offset %"INT64" "
|
||||
" failed because file is too short for that "
|
||||
"offset? Our fd was probably stolen from us by another "
|
||||
"thread. Will retry. error=%s.",
|
||||
"thread. fd1=%i fd2=%i len=%i filenum=%i "
|
||||
"localoffset=%i. usepart=%i error=%s.",
|
||||
(int32_t)len,fstate->m_offset,
|
||||
//fstate->m_this->getDir(),
|
||||
//fstate->m_this->getFilename(),
|
||||
fstate->m_fd1,
|
||||
fstate->m_fd2,
|
||||
len,
|
||||
filenum,
|
||||
localOffset,
|
||||
fstate->m_usePartFiles,
|
||||
mstrerror(errno));
|
||||
errno = EBADENGINEER;
|
||||
return false; // log("disk::read/write: offset too big");
|
||||
|
16
BigFile.h
16
BigFile.h
@ -143,17 +143,17 @@ class BigFile {
|
||||
void *pc = NULL ,
|
||||
int64_t maxFileSize = -1 ,
|
||||
int permissions =
|
||||
S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH ,
|
||||
bool usePartFiles = true );
|
||||
S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH );
|
||||
//bool usePartFiles = true );
|
||||
|
||||
// this will set usepartfiles to false! so use this to open large
|
||||
// warc or arc files
|
||||
bool open2 ( int flags ,
|
||||
//class DiskPageCache *pc = NULL ,
|
||||
void *pc = NULL ,
|
||||
int64_t maxFileSize = -1 ,
|
||||
int permissions =
|
||||
S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH );
|
||||
//bool open2 ( int flags ,
|
||||
// //class DiskPageCache *pc = NULL ,
|
||||
// void *pc = NULL ,
|
||||
// int64_t maxFileSize = -1 ,
|
||||
// int permissions =
|
||||
// S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH );
|
||||
|
||||
|
||||
|
||||
|
@ -1700,13 +1700,24 @@ collnum_t Collectiondb::reserveCollNum ( ) {
|
||||
return next;
|
||||
}
|
||||
|
||||
// collnum_t is signed right now because we use -1 to indicate a
|
||||
// bad collnum.
|
||||
int32_t scanned = 0;
|
||||
// search for an empty slot
|
||||
for ( int32_t i = m_wrapped ; i < m_numRecs ; i++ ) {
|
||||
for ( int32_t i = m_wrapped ; ; i++ ) {
|
||||
// because collnum_t is 2 bytes, signed, limit this here
|
||||
if ( i > 0x7fff ) i = 0;
|
||||
// how can this happen?
|
||||
if ( i < 0 ) i = 0;
|
||||
// if we scanned the max # of recs we could have, we are done
|
||||
if ( ++scanned >= m_numRecs ) break;
|
||||
// skip if this is in use
|
||||
if ( m_recs[i] ) continue;
|
||||
// start after this one next time
|
||||
m_wrapped = i+1;
|
||||
// note it
|
||||
log("colldb: returning wrapped collnum of %"INT32"",(int32_t)i);
|
||||
log("colldb: returning wrapped collnum "
|
||||
"of %"INT32"",(int32_t)i);
|
||||
return (collnum_t)i;
|
||||
}
|
||||
|
||||
|
1
Conf.h
1
Conf.h
@ -682,6 +682,7 @@ class Conf {
|
||||
bool m_diffbotMsg13Hack ;
|
||||
bool m_logDebugUrlAttempts ;
|
||||
bool m_logDebugTcp ;
|
||||
bool m_logDebugTcpBuf ;
|
||||
bool m_logDebugThread ;
|
||||
bool m_logDebugTimedb ;
|
||||
bool m_logDebugTitle ;
|
||||
|
1
Hostdb.h
1
Hostdb.h
@ -211,6 +211,7 @@ class Host {
|
||||
int64_t m_lastPing;
|
||||
|
||||
char m_tmpBuf[4];
|
||||
int16_t m_tmpCount;
|
||||
|
||||
// . first time we sent an unanswered ping request to this host
|
||||
// . used so we can determine when to send an email alert
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include "sort.h"
|
||||
#include "XmlDoc.h" // score32to8()
|
||||
#include "Rebalance.h"
|
||||
#include "Process.h"
|
||||
|
||||
Linkdb g_linkdb;
|
||||
Linkdb g_linkdb2;
|
||||
@ -1130,6 +1131,12 @@ bool Msg25::doReadLoop ( ) {
|
||||
ms,m_site,m_url,m_docId,KEYSTR(&startKey,LDBKS));
|
||||
}
|
||||
|
||||
if ( g_process.m_mode == EXIT_MODE ) {
|
||||
log("linkdb: shutting down. exiting link text loop.");
|
||||
g_errno = ESHUTTINGDOWN;
|
||||
return false;
|
||||
}
|
||||
|
||||
m_gettingList = true;
|
||||
|
||||
CollectionRec *cr = g_collectiondb.getRec ( m_collnum );
|
||||
|
@ -124,6 +124,53 @@ Host *getHostToHandleInjection ( char *url ) {
|
||||
Host *group = g_hostdb.getShard ( shardNum );
|
||||
int32_t hostNum = docId % g_hostdb.m_numHostsPerShard;
|
||||
Host *host = &group[hostNum];
|
||||
|
||||
bool isWarcInjection = false;
|
||||
int32_t ulen = gbstrlen(url);
|
||||
if ( ulen > 10 && strcmp(url+ulen-8,".warc.gz") == 0 )
|
||||
isWarcInjection = true;
|
||||
if ( ulen > 10 && strcmp(url+ulen-5,".warc") == 0 )
|
||||
isWarcInjection = true;
|
||||
|
||||
if ( ! isWarcInjection ) return host;
|
||||
|
||||
// warc files end up calling XmlDoc::indexWarcOrArc() which spawns
|
||||
// a msg7 injection request for each doc in the warc/arc file
|
||||
// so let's do load balancing differently for them so one host
|
||||
// doesn't end up doing a bunch of wget/gunzips on warc files
|
||||
// thereby bottlenecking the cluster. get the first hostid that
|
||||
// we have not sent a msg7 injection request to that is still out
|
||||
for ( int32_t i = 0 ; i < g_hostdb.m_numHosts ; i++ ) {
|
||||
Host *h = g_hostdb.getHost(i);
|
||||
h->m_tmpCount = 0;
|
||||
}
|
||||
for ( UdpSlot *slot = g_udpServer.m_head2 ;
|
||||
slot ;
|
||||
slot = slot->m_next2 ) {
|
||||
// skip if not injection request
|
||||
if ( slot->m_msgType != 0x07 ) continue;
|
||||
//if ( ! slot->m_weInitiated ) continue;
|
||||
// if we did not initiate the injection request, i.e. if
|
||||
// it is to us, skip it
|
||||
if ( ! slot->m_callback ) continue;
|
||||
// who is it from?
|
||||
int32_t hostId = slot->m_hostId;
|
||||
if ( hostId < 0 ) continue;
|
||||
Host *h = g_hostdb.getHost ( hostId );
|
||||
if ( ! h ) continue;
|
||||
h->m_tmpCount++;
|
||||
}
|
||||
int32_t min = 999999;
|
||||
Host *minh = NULL;
|
||||
for ( int32_t i = 0 ; i < g_hostdb.m_numHosts ; i++ ) {
|
||||
Host *h = g_hostdb.getHost(i);
|
||||
if ( h->m_tmpCount == 0 ) return h;
|
||||
if ( h->m_tmpCount >= min ) continue;
|
||||
min = h->m_tmpCount;
|
||||
minh = h;
|
||||
}
|
||||
if ( minh ) return minh;
|
||||
// how can this happen?
|
||||
return host;
|
||||
}
|
||||
|
||||
|
@ -49,9 +49,18 @@ class StateStatsdb {
|
||||
static time_t genDate( char *date, int32_t dateLen ) ;
|
||||
static void sendReply ( void *st ) ;
|
||||
|
||||
static bool s_graphInUse = false;
|
||||
|
||||
// . returns false if blocked, otherwise true
|
||||
// . sets g_errno on error
|
||||
bool sendPageGraph ( TcpSocket *s, HttpRequest *r ) {
|
||||
|
||||
if ( s_graphInUse ) {
|
||||
char *msg = "stats graph calculating for another user. "
|
||||
"Try again later.";
|
||||
g_httpServer.sendErrorReply(s,500,msg);
|
||||
return true;
|
||||
}
|
||||
|
||||
char *cgi;
|
||||
int32_t cgiLen;
|
||||
@ -129,8 +138,10 @@ bool sendPageGraph ( TcpSocket *s, HttpRequest *r ) {
|
||||
st->m_samples ,
|
||||
&st->m_sb2 ,
|
||||
st ,
|
||||
sendReply ) )
|
||||
sendReply ) ) {
|
||||
s_graphInUse = true;
|
||||
return false;
|
||||
}
|
||||
|
||||
// if we didn't block call it ourselves directly
|
||||
sendReply ( st );
|
||||
@ -185,6 +196,8 @@ void genStatsGraphTable(SafeBuf *buf, StateStatsdb *st) {
|
||||
|
||||
void sendReply ( void *state ) {
|
||||
|
||||
s_graphInUse = false;
|
||||
|
||||
StateStatsdb *st = (StateStatsdb *)state;
|
||||
|
||||
if ( g_errno ) {
|
||||
|
10
Parms.cpp
10
Parms.cpp
@ -19750,6 +19750,16 @@ void Parms::init ( ) {
|
||||
m->m_obj = OBJ_CONF;
|
||||
m++;
|
||||
|
||||
m->m_title = "log debug tcp buffer messages";
|
||||
m->m_cgi = "ldtb";
|
||||
m->m_off = (char *)&g_conf.m_logDebugTcpBuf - g;
|
||||
m->m_type = TYPE_BOOL;
|
||||
m->m_def = "0";
|
||||
m->m_priv = 1;
|
||||
m->m_page = PAGE_LOG;
|
||||
m->m_obj = OBJ_CONF;
|
||||
m++;
|
||||
|
||||
m->m_title = "log debug thread messages";
|
||||
m->m_cgi = "ldth";
|
||||
m->m_off = (char *)&g_conf.m_logDebugThread - g;
|
||||
|
@ -1865,7 +1865,7 @@ Profiler::printRealTimeInfo(SafeBuf *sb,
|
||||
int fd = open ( filename , O_RDWR | O_CREAT , S_IRWXU );
|
||||
if ( fd < 0 ) {
|
||||
sb->safePrintf("FAILED TO OPEN %s for writing: %s"
|
||||
,ff.getBufStart(),strerror(errno));
|
||||
,ff.getBufStart(),mstrerror(errno));
|
||||
return false;
|
||||
}
|
||||
for ( ; ip < ipEnd ; ip += sizeof(uint64_t) ) {
|
||||
@ -1892,6 +1892,13 @@ Profiler::printRealTimeInfo(SafeBuf *sb,
|
||||
|
||||
// restrict to top 100 lines
|
||||
char *x = out.getBufStart();
|
||||
|
||||
if ( ! x ) {
|
||||
sb->safePrintf("FAILED TO READ trash/output.txt: %s"
|
||||
,mstrerror(g_errno));
|
||||
return false;
|
||||
}
|
||||
|
||||
int lineCount = 0;
|
||||
for ( ; *x ; x++ ) {
|
||||
if ( *x != '\n' ) continue;
|
||||
|
@ -148,6 +148,7 @@ bool RdbScan::setRead ( BigFile *file ,
|
||||
// ensure we don't mess around
|
||||
m_fstate.m_allocBuf = NULL;
|
||||
m_fstate.m_buf = NULL;
|
||||
//m_fstate.m_usePartFiles = true;
|
||||
// debug msg
|
||||
//log("diskOff=%"INT64" nb=%"INT32"",offset,bytesToRead);
|
||||
//if ( offset == 16386 && bytesToRead == 16386 )
|
||||
|
@ -77,7 +77,7 @@ static Label s_labels[] = {
|
||||
{GRAPH_OPS,20,"parse_doc", .005,"%.1f dps" , 1.0 , 0x00fea915,"parsed doc" },
|
||||
|
||||
|
||||
{GRAPH_QUANTITY_PER_OP,-1,"docs_per_second", .1,"%.1f docs per second" , -1 , 0x1F2F5C,"docs per second" },
|
||||
{GRAPH_QUANTITY_PER_OP,-1,"docs_per_second", .1,"%.1f docs per second" , -1 , 0x1F2F5C,"*successfully* indexed docs per second" },
|
||||
|
||||
// . use .1 * 1000 docs as the min resolution per pixel
|
||||
// . max = -1, means dynamic size the ymax!
|
||||
|
@ -598,7 +598,7 @@ bool TcpServer::sendMsg ( int32_t ip ,
|
||||
// we think that they are using an sd used by a streaming socket,
|
||||
// who closed, but then proceed to use TcpSocket class as if he
|
||||
// had not closed it.
|
||||
if ( 1==2 && g_hostdb.m_hostId == 0 ) {
|
||||
if ( g_conf.m_logDebugTcpBuf ) {
|
||||
SafeBuf sb;
|
||||
sb.safePrintf("tcp: open newsd=%i sendbuf=",s->m_sd);
|
||||
sb.safeTruncateEllipsis (sendBuf,sendBufSize,200);
|
||||
@ -2276,7 +2276,7 @@ void TcpServer::destroySocket ( TcpSocket *s ) {
|
||||
// 0 is the FD for stdin so i don't know how that is happening.
|
||||
if ( sd != 0 ) cret = ::close ( sd );
|
||||
|
||||
if ( 1==2 && g_hostdb.m_hostId == 0 ) {
|
||||
if ( g_conf.m_logDebugTcpBuf ) {
|
||||
SafeBuf sb;
|
||||
sb.safePrintf("tcp: closing sd=%i bytessent=%i "
|
||||
"sendbufused=%i streaming=%i "
|
||||
@ -2619,7 +2619,7 @@ TcpSocket *TcpServer::acceptSocket ( ) {
|
||||
// we think that they are using an sd used by a streaming socket,
|
||||
// who closed, but then proceed to use TcpSocket class as if he
|
||||
// had not closed it.
|
||||
if ( 1==2 && g_hostdb.m_hostId == 0 ) {
|
||||
if ( g_conf.m_logDebugTcpBuf ) {
|
||||
SafeBuf sb;
|
||||
sb.safePrintf("tcp: accept newsd=%i incoming req",newsd);
|
||||
//sb.safeTruncateEllipsis (sendBuf,sendBufSize,200);
|
||||
|
@ -323,7 +323,7 @@ bool Threads::init ( ) {
|
||||
if ( ! g_threads.registerType ( INTERSECT_THREAD,max,200) )
|
||||
return log("thread: Failed to register thread type." );
|
||||
// filter thread spawned to call popen() to filter an http reply
|
||||
if ( ! g_threads.registerType ( FILTER_THREAD , 1/*maxThreads*/,300) )
|
||||
if ( ! g_threads.registerType ( FILTER_THREAD, 2/*maxThreads*/,300) )
|
||||
return log("thread: Failed to register thread type." );
|
||||
// RdbTree uses this to save itself
|
||||
if ( ! g_threads.registerType ( SAVETREE_THREAD,1/*maxThreads*/,100) )
|
||||
|
@ -19363,8 +19363,10 @@ BigFile *XmlDoc::getUtf8ContentInFile ( int64_t *fileSizeArg ) {
|
||||
m_fileSize = m_file.getFileSize();
|
||||
m_fileValid = true;
|
||||
*fileSizeArg = m_fileSize;
|
||||
// open2() has usepartfiles = false!!!
|
||||
m_file.open2(O_RDONLY);
|
||||
m_file.open(O_RDONLY);
|
||||
// explicitly set it to false now to make it harder for
|
||||
// it not to be true because that messes things up
|
||||
m_file.m_usePartFiles = false;
|
||||
return &m_file;
|
||||
}
|
||||
|
||||
|
@ -27,6 +27,7 @@ bool sendPageSEO(TcpSocket *s, HttpRequest *hr) {return true;}
|
||||
//SafeBuf g_qbuf;
|
||||
|
||||
bool g_recoveryMode;
|
||||
int32_t g_recoveryLevel;
|
||||
|
||||
int g_inMemcpy;
|
||||
|
||||
|
@ -120,7 +120,8 @@ def injectItem(item, db, mode):
|
||||
postVars = {'url':'http://archive.org/download/%s/%s' %
|
||||
(item,ff['name']),
|
||||
'metadata':json.dumps(itemMetadata),
|
||||
'c':'ait'}
|
||||
'c':'ait',
|
||||
'spiderlinks':0}
|
||||
start = time.time()
|
||||
if mode == 'production':
|
||||
try:
|
||||
|
Binary file not shown.
Loading…
x
Reference in New Issue
Block a user