Merge branch 'ia' into ia-zak

Matt
2015-09-12 11:54:57 -06:00
5 changed files with 110 additions and 20 deletions

BigFile.cpp
@@ -265,6 +265,15 @@ bool BigFile::doesPartExist ( int32_t n ) {
static int64_t s_vfd = 0;
// do not use part files for this open so we can open regular files
// that really are >2GB in size with it
bool BigFile::open2 ( int flags ,
void *pc ,
int64_t maxFileSize ,
int permissions ) {
return open ( flags , pc , maxFileSize , permissions , false );
}
// . override File::open so we can set m_numParts
// . set maxFileSize when opening a new file for writing and using
// DiskPageCache
@@ -273,12 +282,15 @@ bool BigFile::open ( int flags ,
//class DiskPageCache *pc ,
void *pc ,
int64_t maxFileSize ,
int permissions ) {
int permissions ,
bool usePartFiles ) {
m_flags = flags;
//m_pc = pc;
m_permissions = permissions;
m_isClosing = false;
// this is true except when parsing big warc files
m_usePartFiles = usePartFiles;
// . init the page cache for this vfd
// . this returns our "virtual fd", not the same as File::m_vfd
// . returns -1 and sets g_errno on failure
@@ -595,6 +607,7 @@ bool BigFile::readwrite ( void *buf ,
fstate->m_callback = callback;
fstate->m_niceness = niceness;
fstate->m_flags = m_flags;
fstate->m_usePartFiles = m_usePartFiles;
// sanity
if ( fstate->m_bytesToGo > 150000000 )
log("file: huge read of %"INT64" bytes",(int64_t)size);
@@ -607,6 +620,13 @@ bool BigFile::readwrite ( void *buf ,
// situation occurs and pass a g_errno back to the caller.
fstate->m_filenum1 = offset / MAX_PART_SIZE;
fstate->m_filenum2 = (offset + size ) / MAX_PART_SIZE;
// if not really a big file; we use this for parsing huge warc files
if ( ! m_usePartFiles ) {
fstate->m_filenum1 = 0;
fstate->m_filenum2 = 0;
}
// . save the open count for this fd
// . if it changes when we're done with the read we do a re-read
// . it gets incremented once every time File calls ::open and gets
@@ -769,7 +789,7 @@ bool BigFile::readwrite ( void *buf ,
// how many bytes to read from each file?
int64_t readSize1 = size;
int64_t readSize2 = 0;
if ( off1 + readSize1 > MAX_PART_SIZE ) {
if ( off1 + readSize1 > MAX_PART_SIZE && m_usePartFiles ) {
readSize1 = ((int64_t)MAX_PART_SIZE) - off1;
readSize2 = size - readSize1;
}
@@ -788,6 +808,10 @@ bool BigFile::readwrite ( void *buf ,
int32_t filenum = offset / MAX_PART_SIZE;
int32_t localOffset = offset % MAX_PART_SIZE;
if ( ! m_usePartFiles ) {
filenum = 0;
localOffset = offset;
}
// read or write?
if ( doWrite ) a0->aio_lio_opcode = LIO_WRITE;
@@ -856,7 +880,7 @@ bool BigFile::readwrite ( void *buf ,
int32_t rate = 100000;
if ( took > 500 ) rate = fstate->m_bytesDone / took ;
if ( rate < 8000 && fstate->m_niceness <= 0 ) {
log(LOG_INFO,"disk: Read %"INT32" bytes in %"INT64" "
log(LOG_INFO,"disk: Read %"INT64" bytes in %"INT64" "
"ms (%"INT32"KB/s).",
fstate->m_bytesDone,took,rate);
g_stats.m_slowDiskReads++;
@@ -957,7 +981,7 @@ void doneWrapper ( void *state , ThreadEntry *t ) {
if ( fstate->m_errno == EDISKSTUCK ) slow = true;
if ( slow && fstate->m_niceness <= 0 ) {
if ( fstate->m_errno != EDISKSTUCK )
log(LOG_INFO, "disk: Read %"INT32" bytes in %"INT64" "
log(LOG_INFO, "disk: Read %"INT64" bytes in %"INT64" "
"ms (%"INT32"KB/s).",
fstate->m_bytesDone,took,rate);
g_stats.m_slowDiskReads++;
@@ -1264,6 +1288,12 @@ bool readwrite_r ( FileState *fstate , ThreadEntry *t ) {
int32_t len = bytesToGo - bytesDone;
// how many bytes can we write to it now
if ( len > avail ) len = avail;
// hack for reading warc files
if ( ! fstate->m_usePartFiles ) {
filenum = 0;
localOffset = offset;
len = bytesToGo - bytesDone;
}
// get the fd for this filenum
int fd = -1;
if ( filenum == fstate->m_filenum1 ) fd = fstate->m_fd1;
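
A minimal usage sketch of the new path through BigFile.cpp, assuming only the signatures shown in this diff (the caller, path, and buffer are hypothetical). open2() forwards to open() with usePartFiles = false, so m_filenum1/m_filenum2 stay 0 and a read is never split across MAX_PART_SIZE boundaries:

#include <fcntl.h>   // O_RDONLY
#include "BigFile.h" // project header declaring BigFile and FileState

static void readDoneWrapper ( void *state ) { /* resume the caller */ }

// hypothetical caller: read from a >2GB warc file without part files
bool readArchiveChunk ( char *buf , int64_t toRead , int64_t offset ) {
	BigFile f;
	f.set ( "/tmp" , "example.warc" );      // illustrative path
	// open2() = open() with usePartFiles = false
	if ( ! f.open2 ( O_RDONLY ) ) return false;
	FileState fs;
	// same call pattern as indexWarcOrArc() below: a false return
	// means a read thread was queued and the callback fires later
	if ( ! f.read ( buf , toRead , offset , &fs , NULL ,
	                readDoneWrapper , MAX_NICENESS ) )
		return false; // wait for readDoneWrapper
	return true;
}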

BigFile.h
@@ -47,14 +47,14 @@ public:
class BigFile *m_this;
//struct aiocb m_aiostate;
char *m_buf;
int32_t m_bytesToGo;
int64_t m_bytesToGo;
int64_t m_offset;
// . the original offset, because we set m_offset to m_currentOffset
// if the original offset specified is -1
// . we also advance BigFile::m_currentOffset when done w/ read/write
//int64_t m_origOffset;
bool m_doWrite;
int32_t m_bytesDone;
int64_t m_bytesDone;
void *m_state ;
void (*m_callback) ( void *state ) ;
// goes from 0 to 1, the lower the niceness, the higher the priority
@@ -79,6 +79,7 @@ public:
// when we started for graphing purposes (in milliseconds)
int64_t m_startTime;
int64_t m_doneTime;
char m_usePartFiles;
// this is used for calling DiskPageCache::addPages() when done
// with the read/write
//class DiskPageCache *m_pc;
@@ -102,10 +103,10 @@ public:
// threads each hogging up 32KB of memory waiting to read tfndb.
// m_allocBuf points to what we allocated.
char *m_allocBuf;
int32_t m_allocSize;
int64_t m_allocSize;
// m_allocOff is offset into m_allocBuf where we start reading into
// from the file
int32_t m_allocOff;
int64_t m_allocOff;
// do not call pthread_create() for every read we do. use async io
// because it should be much much faster
#ifdef ASYNCIO
@@ -142,7 +143,19 @@ class BigFile {
void *pc = NULL ,
int64_t maxFileSize = -1 ,
int permissions =
S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH );
S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH ,
bool usePartFiles = true );
// this will set usePartFiles to false! so use this to open large
// warc or arc files
bool open2 ( int flags ,
//class DiskPageCache *pc = NULL ,
void *pc = NULL ,
int64_t maxFileSize = -1 ,
int permissions =
S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH );
int getFlags() { return m_flags; };
@@ -355,6 +368,8 @@ class BigFile {
// prevent circular calls to BigFile::close() with this
char m_isClosing;
char m_usePartFiles;
int64_t m_fileSize;
// oldest of the last modified dates of all the part files
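
Why the FileState counters in BigFile.h move from int32_t to int64_t: an archive read can exceed INT32_MAX bytes, and a 32-bit counter silently wraps. A standalone illustration (nothing here is from the codebase):

#include <cstdint>
#include <cstdio>

int main ( ) {
	int64_t warcSize = 3LL * 1024 * 1024 * 1024; // 3GB archive
	int32_t narrow   = (int32_t)warcSize;        // wraps past INT32_MAX
	printf ( "as int32_t: %d\n"   , narrow );               // -1073741824
	printf ( "as int64_t: %lld\n" , (long long)warcSize );  // 3221225472
	return 0;
}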

RdbScan.cpp
@@ -321,7 +321,7 @@ void RdbScan::gotList ( ) {
// . i think a read overflow might be causing a segv in malloc
// . NOTE: BigFile's call to DiskPageCache alters these values
if ( m_fstate.m_bytesDone != m_fstate.m_bytesToGo && m_hitDisk )
log(LOG_INFO,"disk: Read %"INT32" bytes but needed %"INT32".",
log(LOG_INFO,"disk: Read %"INT64" bytes but needed %"INT64".",
m_fstate.m_bytesDone , m_fstate.m_bytesToGo );
// adjust the list size for biased page cache if necessary
//if ( m_file->m_pc && m_allowPageCache &&
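
The RdbScan.cpp change follows from the header change: m_bytesDone and m_bytesToGo are now int64_t, and passing a 64-bit value where the format string promises a 32-bit one is undefined behavior in a varargs call. The project's %"INT64" macro plays the same role as the standard PRId64 (a sketch, not project code):

#include <cinttypes>
#include <cstdio>

int main ( ) {
	int64_t bytesDone = 1LL << 33; // larger than any int32_t
	// format width must match the argument width
	printf ( "disk: Read %" PRId64 " bytes.\n" , bytesDone );
	return 0;
}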

XmlDoc.cpp
@@ -109,6 +109,7 @@ char *getFirstJSONObject ( char *p ,
char *getJSONObjectEnd ( char *p , int32_t niceness ) ;
XmlDoc::XmlDoc() {
m_readThreadOut = false;
for ( int32_t i = 0 ; i < MAXMSG7S ; i++ ) m_msg7s[i] = NULL;
m_esbuf.setLabel("exputfbuf");
for ( int32_t i = 0 ; i < MAX_XML_DOCS ; i++ ) m_xmlDocs[i] = NULL;
@@ -208,6 +209,10 @@ class XmlDoc *g_xd;
void XmlDoc::reset ( ) {
if ( m_readThreadOut )
log("build: deleting xmldoc class that has a read thread out "
"on a warc file");
if ( m_fileValid ) {
m_file.close();
m_file.unlink();
@@ -3351,6 +3356,14 @@ void doneInjectingArchiveRec ( void *state ) {
xd->m_masterLoop ( xd );
}
void doneReadingArchiveFileWrapper ( void *state ) {
XmlDoc *THIS = (XmlDoc *)state;
// . go back to the main entry function
// . make sure g_errno is clear from a msg3a g_errno before calling
// this lest it abandon the loop
THIS->m_masterLoop ( THIS->m_masterState );
}
#define MAXWARCRECSIZE 1000000
@@ -3368,7 +3381,7 @@ bool XmlDoc::indexWarcOrArc ( char ctype ) {
// so big we can fit it in memory. just do a wget then gunzip
// then open it. use a system call in a thread.
int64_t fileSize = -1;
File *file = getUtf8ContentInFile( &fileSize );
BigFile *file = getUtf8ContentInFile( &fileSize );
// return true with g_errno set on error
if ( ! file ) {
if ( ! g_errno ) { char *xx=NULL;*xx=0; }
@@ -3444,7 +3457,37 @@ bool XmlDoc::indexWarcOrArc ( char ctype ) {
toRead = fileSize - m_fileOff;
m_hasMoreToRead = false;
}
int32_t bytesRead = file->read (m_fileBuf, toRead, m_fileOff);
bool status;
if ( m_readThreadOut ) {
m_readThreadOut = false;
status = false;
goto skipRead;
}
// make a thread to read now
status = file->read (m_fileBuf,
toRead,
m_fileOff,
&m_fileState,
this,
doneReadingArchiveFileWrapper,
MAX_NICENESS );
// if thread was queued or launched, wait for it to come back
if ( ! status ) {
// set a signal so we do not recall thread
// when callback brings us back here
m_readThreadOut = true;
// wait for callback
return false;
}
skipRead:
int64_t bytesRead = m_fileState.m_bytesDone;
if ( bytesRead != toRead ) {
log("build: read of %s failed at offset "
"%"INT64"", file->getFilename(), m_fileOff);
@@ -19295,7 +19338,7 @@ void systemDoneWrapper ( void *state , ThreadEntry *t ) {
}
// we download large files to a file on disk, like warcs and arcs
File *XmlDoc::getUtf8ContentInFile ( int64_t *fileSizeArg ) {
BigFile *XmlDoc::getUtf8ContentInFile ( int64_t *fileSizeArg ) {
if ( m_fileValid ) {
*fileSizeArg = m_fileSize;
@@ -19309,15 +19352,15 @@ File *XmlDoc::getUtf8ContentInFile ( int64_t *fileSizeArg ) {
char filename[2048];
snprintf ( filename,
2048,
"%sgbarchivefile%"UINT32"",
g_hostdb.m_dir,
"gbarchivefile%"UINT32"",
(int32_t)(int64_t)this);
m_file.set ( filename );
m_file.set ( g_hostdb.m_dir , filename );
m_fileSize = m_file.getFileSize();
m_fileValid = true;
*fileSizeArg = m_fileSize;
m_file.open(O_RDONLY);
// open2() has usePartFiles = false!!!
m_file.open2(O_RDONLY);
return &m_file;
}
@@ -19405,7 +19448,7 @@ File *XmlDoc::getUtf8ContentInFile ( int64_t *fileSizeArg ) {
systemDoneWrapper ,
systemStartWrapper_r ) )
// would block, wait for thread
return (File *)-1;
return (BigFile *)-1;
// failed?
log("build: failed to launch wget thread");
// If we run it in this thread then if we are fetching
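
The m_readThreadOut changes in XmlDoc.cpp implement a small re-entrancy guard: issue the async read once, return false while the thread is out, and let the callback re-enter the same function, which then jumps past the read. Distilled into a standalone sketch (the Doc type and asyncRead() are hypothetical; the flag-and-goto shape comes from the diff):

// hypothetical async primitive: pretend the work completed inline;
// a real implementation would queue a thread and return false
static bool asyncRead ( void (*cb)(void *) , void *state ) {
	(void)cb; (void)state;
	return true;
}

struct Doc {
	bool m_readThreadOut = false;
	bool indexArchive ( ) {
		bool status;
		if ( m_readThreadOut ) {
			// re-entered by the callback: don't read again
			m_readThreadOut = false;
			status = false;
			goto skipRead;
		}
		status = asyncRead ( doneWrapper , this );
		if ( ! status ) {
			// remember a thread is out so the callback's
			// re-entry skips straight to skipRead
			m_readThreadOut = true;
			return false; // wait for the callback
		}
	skipRead:
		// ... consume the bytes that were read ...
		return true;
	}
	static void doneWrapper ( void *state ) {
		((Doc *)state)->indexArchive ( ); // re-enter
	}
};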

XmlDoc.h
@@ -705,7 +705,7 @@ class XmlDoc {
char **getExpandedUtf8Content ( ) ;
char **getUtf8Content ( ) ;
// we download large files to a file on disk, like warcs and arcs
File *getUtf8ContentInFile ( int64_t *fileSizeArg );
BigFile *getUtf8ContentInFile ( int64_t *fileSizeArg );
int32_t *getContentHash32 ( ) ;
int32_t *getContentHashJson32 ( ) ;
//int32_t *getTagHash32 ( ) ;
@@ -1090,8 +1090,10 @@ class XmlDoc {
int32_t m_fileBufAllocSize;
char *m_fptr ;
char *m_fptrEnd ;
File m_file;
BigFile m_file;
int64_t m_fileSize;
FileState m_fileState;
bool m_readThreadOut;
bool m_hasMoreToRead;
int32_t m_numInjectionsOut;
bool m_calledWgetThread;
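
Taken together, the XmlDoc changes give getUtf8ContentInFile() a three-way contract that indexWarcOrArc() relies on: NULL with g_errno set on error, (BigFile *)-1 when a wget thread was launched and the caller must wait for the callback, otherwise &m_file already opened via open2(). A hedged caller sketch (indexArchiveStep is a hypothetical member, not part of this commit):

// hypothetical continuation inside an XmlDoc callback-driven loop
bool XmlDoc::indexArchiveStep ( ) {
	int64_t fileSize = -1;
	BigFile *file = getUtf8ContentInFile ( &fileSize );
	if ( ! file ) {
		// g_errno is set; return true with the error
		return true;
	}
	if ( file == (BigFile *)-1 )
		return false; // blocked; the done wrapper re-enters us
	// got the downloaded archive; read it in MAXWARCRECSIZE chunks
	return true;
}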