Merge branch 'ia-zak' into ia
This commit is contained in:
commit
710661a0f3
29
BigFile.cpp
29
BigFile.cpp
@ -35,6 +35,7 @@ BigFile::~BigFile () {
|
||||
BigFile::BigFile () {
|
||||
m_permissions = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH ;
|
||||
m_flags = O_RDWR ; // | O_DIRECT;
|
||||
m_usePartFiles = true;
|
||||
// NULLify all ptrs to files
|
||||
//for ( int32_t i = 0 ; i < MAX_PART_FILES ; i++ ) m_files[i] = NULL;
|
||||
m_maxParts = 0;
|
||||
@ -74,6 +75,8 @@ bool BigFile::set ( char *dir , char *baseFilename , char *stripeDir ) {
|
||||
m_dir .setLabel("bfd");
|
||||
m_baseFilename.setLabel("bfbf");
|
||||
|
||||
m_usePartFiles = true;
|
||||
|
||||
// use this 32 byte char buf to avoid a malloc if possible
|
||||
m_baseFilename.setBuf (m_tmpBaseBuf,32,0,false);
|
||||
|
||||
@ -267,12 +270,12 @@ static int64_t s_vfd = 0;
|
||||
|
||||
// do not use part files for this open so we can open regular really >2GB
|
||||
// sized files with it
|
||||
bool BigFile::open2 ( int flags ,
|
||||
void *pc ,
|
||||
int64_t maxFileSize ,
|
||||
int permissions ) {
|
||||
return open ( flags , pc , maxFileSize , permissions , false );
|
||||
}
|
||||
// bool BigFile::open2 ( int flags ,
|
||||
// void *pc ,
|
||||
// int64_t maxFileSize ,
|
||||
// int permissions ) {
|
||||
// return open ( flags , pc , maxFileSize , permissions , false );
|
||||
// }
|
||||
|
||||
// . overide File::open so we can set m_numParts
|
||||
// . set maxFileSize when opening a new file for writing and using
|
||||
@ -282,15 +285,14 @@ bool BigFile::open ( int flags ,
|
||||
//class DiskPageCache *pc ,
|
||||
void *pc ,
|
||||
int64_t maxFileSize ,
|
||||
int permissions ,
|
||||
bool usePartFiles ) {
|
||||
int permissions ) {
|
||||
|
||||
m_flags = flags;
|
||||
//m_pc = pc;
|
||||
m_permissions = permissions;
|
||||
m_isClosing = false;
|
||||
// this is true except when parsing big warc files
|
||||
m_usePartFiles = usePartFiles;
|
||||
m_usePartFiles = true;//usePartFiles;
|
||||
// . init the page cache for this vfd
|
||||
// . this returns our "virtual fd", not the same as File::m_vfd
|
||||
// . returns -1 and sets g_errno on failure
|
||||
@ -1378,10 +1380,17 @@ bool readwrite_r ( FileState *fstate , ThreadEntry *t ) {
|
||||
log("disk: Read of %"INT32" bytes at offset %"INT64" "
|
||||
" failed because file is too short for that "
|
||||
"offset? Our fd was probably stolen from us by another "
|
||||
"thread. Will retry. error=%s.",
|
||||
"thread. fd1=%i fd2=%i len=%i filenum=%i "
|
||||
"localoffset=%i. usepart=%i error=%s.",
|
||||
(int32_t)len,fstate->m_offset,
|
||||
//fstate->m_this->getDir(),
|
||||
//fstate->m_this->getFilename(),
|
||||
fstate->m_fd1,
|
||||
fstate->m_fd2,
|
||||
len,
|
||||
filenum,
|
||||
localOffset,
|
||||
fstate->m_usePartFiles,
|
||||
mstrerror(errno));
|
||||
errno = EBADENGINEER;
|
||||
return false; // log("disk::read/write: offset too big");
|
||||
|
16
BigFile.h
16
BigFile.h
@ -143,17 +143,17 @@ class BigFile {
|
||||
void *pc = NULL ,
|
||||
int64_t maxFileSize = -1 ,
|
||||
int permissions =
|
||||
S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH ,
|
||||
bool usePartFiles = true );
|
||||
S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH );
|
||||
//bool usePartFiles = true );
|
||||
|
||||
// this will set usepartfiles to false! so use this to open large
|
||||
// warc or arc files
|
||||
bool open2 ( int flags ,
|
||||
//class DiskPageCache *pc = NULL ,
|
||||
void *pc = NULL ,
|
||||
int64_t maxFileSize = -1 ,
|
||||
int permissions =
|
||||
S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH );
|
||||
//bool open2 ( int flags ,
|
||||
// //class DiskPageCache *pc = NULL ,
|
||||
// void *pc = NULL ,
|
||||
// int64_t maxFileSize = -1 ,
|
||||
// int permissions =
|
||||
// S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH );
|
||||
|
||||
|
||||
|
||||
|
@ -1700,13 +1700,24 @@ collnum_t Collectiondb::reserveCollNum ( ) {
|
||||
return next;
|
||||
}
|
||||
|
||||
// collnum_t is signed right now because we use -1 to indicate a
|
||||
// bad collnum.
|
||||
int32_t scanned = 0;
|
||||
// search for an empty slot
|
||||
for ( int32_t i = m_wrapped ; i < m_numRecs ; i++ ) {
|
||||
for ( int32_t i = m_wrapped ; ; i++ ) {
|
||||
// because collnum_t is 2 bytes, signed, limit this here
|
||||
if ( i > 0x7fff ) i = 0;
|
||||
// how can this happen?
|
||||
if ( i < 0 ) i = 0;
|
||||
// if we scanned the max # of recs we could have, we are done
|
||||
if ( ++scanned >= m_numRecs ) break;
|
||||
// skip if this is in use
|
||||
if ( m_recs[i] ) continue;
|
||||
// start after this one next time
|
||||
m_wrapped = i+1;
|
||||
// note it
|
||||
log("colldb: returning wrapped collnum of %"INT32"",(int32_t)i);
|
||||
log("colldb: returning wrapped collnum "
|
||||
"of %"INT32"",(int32_t)i);
|
||||
return (collnum_t)i;
|
||||
}
|
||||
|
||||
|
1
Conf.h
1
Conf.h
@ -682,6 +682,7 @@ class Conf {
|
||||
bool m_diffbotMsg13Hack ;
|
||||
bool m_logDebugUrlAttempts ;
|
||||
bool m_logDebugTcp ;
|
||||
bool m_logDebugTcpBuf ;
|
||||
bool m_logDebugThread ;
|
||||
bool m_logDebugTimedb ;
|
||||
bool m_logDebugTitle ;
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include "sort.h"
|
||||
#include "XmlDoc.h" // score32to8()
|
||||
#include "Rebalance.h"
|
||||
#include "Process.h"
|
||||
|
||||
Linkdb g_linkdb;
|
||||
Linkdb g_linkdb2;
|
||||
@ -1130,6 +1131,12 @@ bool Msg25::doReadLoop ( ) {
|
||||
ms,m_site,m_url,m_docId,KEYSTR(&startKey,LDBKS));
|
||||
}
|
||||
|
||||
if ( g_process.m_mode == EXIT_MODE ) {
|
||||
log("linkdb: shutting down. exiting link text loop.");
|
||||
g_errno = ESHUTTINGDOWN;
|
||||
return false;
|
||||
}
|
||||
|
||||
m_gettingList = true;
|
||||
|
||||
CollectionRec *cr = g_collectiondb.getRec ( m_collnum );
|
||||
|
@ -2437,6 +2437,9 @@ bool Msg40::gotSummary ( ) {
|
||||
for ( int32_t i = 0 ; dedupPercent && i < m_numReplies ; i++ ) {
|
||||
// skip if already invisible
|
||||
if ( m_msg3a.m_clusterLevels[i] != CR_OK ) continue;
|
||||
// Skip if invalid
|
||||
if ( m_msg20[i]->m_errno ) continue;
|
||||
|
||||
// start with the first docid we have not yet checked!
|
||||
//int32_t m = oldNumContiguous;
|
||||
// get it
|
||||
@ -2455,6 +2458,8 @@ bool Msg40::gotSummary ( ) {
|
||||
// skip if already invisible
|
||||
if ( *level != CR_OK ) continue;
|
||||
// get it
|
||||
if ( m_msg20[m]->m_errno ) continue;
|
||||
|
||||
Msg20Reply *mrm = m_msg20[m]->m_r;
|
||||
// do not dedup CT_STATUS results, those are
|
||||
// spider reply "documents" that indicate the last
|
||||
|
@ -608,7 +608,7 @@ void sendUdpReply7 ( void *state ) {
|
||||
|
||||
uint32_t statColor = 0xccffcc;
|
||||
if(xd->m_indexCode) {
|
||||
statColor = 0x4e99e9;
|
||||
statColor = 0xaaddaa;//0x4e99e9;
|
||||
}
|
||||
g_stats.addStat_r ( xd->m_rawUtf8ContentSize,
|
||||
xd->m_injectStartTime,
|
||||
@ -645,7 +645,6 @@ void sendUdpReply7 ( void *state ) {
|
||||
|
||||
void handleRequest7 ( UdpSlot *slot , int32_t netnice ) {
|
||||
|
||||
|
||||
InjectionRequest *ir = (InjectionRequest *)slot->m_readBuf;
|
||||
|
||||
// now just supply the first guy's char ** and size ptr
|
||||
|
@ -121,7 +121,6 @@ bool sendPageGraph ( TcpSocket *s, HttpRequest *r ) {
|
||||
st->m_endDate = st->m_endDateR;
|
||||
}
|
||||
|
||||
g_statsdb.addDocsIndexed();
|
||||
//
|
||||
// this is no longer a gif, but an html graph in g_statsdb.m_sb
|
||||
//
|
||||
|
10
Parms.cpp
10
Parms.cpp
@ -19750,6 +19750,16 @@ void Parms::init ( ) {
|
||||
m->m_obj = OBJ_CONF;
|
||||
m++;
|
||||
|
||||
m->m_title = "log debug tcp buffer messages";
|
||||
m->m_cgi = "ldtb";
|
||||
m->m_off = (char *)&g_conf.m_logDebugTcpBuf - g;
|
||||
m->m_type = TYPE_BOOL;
|
||||
m->m_def = "0";
|
||||
m->m_priv = 1;
|
||||
m->m_page = PAGE_LOG;
|
||||
m->m_obj = OBJ_CONF;
|
||||
m++;
|
||||
|
||||
m->m_title = "log debug thread messages";
|
||||
m->m_cgi = "ldth";
|
||||
m->m_off = (char *)&g_conf.m_logDebugThread - g;
|
||||
|
@ -1865,7 +1865,7 @@ Profiler::printRealTimeInfo(SafeBuf *sb,
|
||||
int fd = open ( filename , O_RDWR | O_CREAT , S_IRWXU );
|
||||
if ( fd < 0 ) {
|
||||
sb->safePrintf("FAILED TO OPEN %s for writing: %s"
|
||||
,ff.getBufStart(),strerror(errno));
|
||||
,ff.getBufStart(),mstrerror(errno));
|
||||
return false;
|
||||
}
|
||||
for ( ; ip < ipEnd ; ip += sizeof(uint64_t) ) {
|
||||
@ -1892,6 +1892,13 @@ Profiler::printRealTimeInfo(SafeBuf *sb,
|
||||
|
||||
// restrict to top 100 lines
|
||||
char *x = out.getBufStart();
|
||||
|
||||
if ( ! x ) {
|
||||
sb->safePrintf("FAILED TO READ trash/output.txt: %s"
|
||||
,mstrerror(g_errno));
|
||||
return false;
|
||||
}
|
||||
|
||||
int lineCount = 0;
|
||||
for ( ; *x ; x++ ) {
|
||||
if ( *x != '\n' ) continue;
|
||||
|
@ -148,6 +148,7 @@ bool RdbScan::setRead ( BigFile *file ,
|
||||
// ensure we don't mess around
|
||||
m_fstate.m_allocBuf = NULL;
|
||||
m_fstate.m_buf = NULL;
|
||||
//m_fstate.m_usePartFiles = true;
|
||||
// debug msg
|
||||
//log("diskOff=%"INT64" nb=%"INT32"",offset,bytesToRead);
|
||||
//if ( offset == 16386 && bytesToRead == 16386 )
|
||||
|
@ -431,7 +431,7 @@ bool SafeBuf::reserve(int32_t i , char *label, bool clearIt ) {
|
||||
//buffer size.
|
||||
bool SafeBuf::reserve2x(int32_t i, char *label) {
|
||||
//watch out for overflow!
|
||||
if((m_capacity << 1) + i < 0) return false;
|
||||
if((m_capacity << 1) + i < m_capacity) return false;
|
||||
if(i + m_length >= m_capacity)
|
||||
return reserve(m_capacity + i,label);
|
||||
else return true;
|
||||
|
@ -13598,10 +13598,10 @@ void gotCrawlInfoReply ( void *state , UdpSlot *slot ) {
|
||||
// just copy into the stats buf
|
||||
if ( ! cr->m_crawlInfoBuf.getBufStart() ) {
|
||||
int32_t need = sizeof(CrawlInfo) * g_hostdb.m_numHosts;
|
||||
cr->m_crawlInfoBuf.setLabel("cibuf");
|
||||
cr->m_crawlInfoBuf.reserve(need);
|
||||
// in case one was udp server timed out or something
|
||||
cr->m_crawlInfoBuf.zeroOut();
|
||||
cr->m_crawlInfoBuf.setLabel("cibuf");
|
||||
}
|
||||
|
||||
CrawlInfo *cia = (CrawlInfo *)cr->m_crawlInfoBuf.getBufStart();
|
||||
|
@ -5,7 +5,10 @@
|
||||
#include "HttpServer.h"
|
||||
#include "SpiderProxy.h"
|
||||
|
||||
#define LOADPOINT_EXPIRE_MS (10*60*1000)
|
||||
//#define LOADPOINT_EXPIRE_MS (10*60*1000)
|
||||
// make it 15 seconds not 10 minutes otherwise it gets too full with dup
|
||||
// keys and really clogs things up
|
||||
#define LOADPOINT_EXPIRE_MS (15*1000)
|
||||
|
||||
//
|
||||
// BASIC DETAILS
|
||||
@ -927,12 +930,12 @@ void handleRequest54 ( UdpSlot *udpSlot , int32_t niceness ) {
|
||||
// and the loadbucket id
|
||||
//*(int32_t *)p = bb.m_id; p += 4;
|
||||
|
||||
int32_t sanityCount = s_loadTable.getNumSlots();
|
||||
//int32_t sanityCount = 0;//s_loadTable.getNumSlots();
|
||||
// top:
|
||||
|
||||
// now remove old entries from the load table. entries that
|
||||
// have completed and have a download end time more than 10 mins ago
|
||||
for ( int32_t i = 0 ; i < s_loadTable.getNumSlots() ; i++ ) {
|
||||
if ( sanityCount-- < 0 ) break;
|
||||
// skip if empty
|
||||
if ( ! s_loadTable.m_flags[i] ) continue;
|
||||
// get the bucket
|
||||
@ -941,8 +944,12 @@ void handleRequest54 ( UdpSlot *udpSlot , int32_t niceness ) {
|
||||
if ( pp->m_downloadEndTimeMS == 0LL ) continue;
|
||||
// delta t
|
||||
int64_t took = nowms - pp->m_downloadEndTimeMS;
|
||||
// < 10 mins?
|
||||
// < 10 mins? now it's < 15 seconds to prevent clogging.
|
||||
if ( took < LOADPOINT_EXPIRE_MS ) continue;
|
||||
|
||||
// 100 at a time
|
||||
//if ( sanityCount++ > 100 ) break;
|
||||
|
||||
// ok, its too old, nuke it to save memory
|
||||
s_loadTable.removeSlot(i);
|
||||
// the keys might have buried us but we really should not
|
||||
@ -950,6 +957,7 @@ void handleRequest54 ( UdpSlot *udpSlot , int32_t niceness ) {
|
||||
// should we? TODO: figure it out. if we miss a few it's not
|
||||
// a big deal.
|
||||
i--;
|
||||
//goto top;
|
||||
}
|
||||
|
||||
// send the proxy ip/port/LBid back to user
|
||||
@ -1041,6 +1049,7 @@ bool initSpiderProxyStuff() {
|
||||
128,
|
||||
NULL,
|
||||
0,
|
||||
// this slows us down
|
||||
true, // allow dups?
|
||||
MAX_NICENESS,
|
||||
"lbtab",
|
||||
|
64
Statsdb.cpp
64
Statsdb.cpp
@ -65,21 +65,19 @@ static Label s_labels[] = {
|
||||
// . 300MB/s is max read rate regardless to stop graph shrinkage
|
||||
// . use 1KB as the min resolution per pixel
|
||||
// . stored in Bps so use 1/1000 as scalar to get into KBps
|
||||
{ GRAPH_QUANTITY,200,"disk_read",1,"%.0f MBps",1.0/(1000.0*1000.0),0x000000,
|
||||
"disk read"},
|
||||
{ GRAPH_QUANTITY,200,"disk_read",1,"%.0f MBps",1.0/(1000.0*1000.0),0x000000,"disk read"},
|
||||
|
||||
// . 300MB/s is max write rate regardless to stop graph shrinkage
|
||||
// . use 1KB as the min resolution per pixel
|
||||
// . stored in Bps so use 1/1000 as scalar to get into KBps
|
||||
{GRAPH_QUANTITY,200,"disk_write",1,"%.0f Mbps",1.0/(1000.0*1000.0), 0xff0000,
|
||||
"disk write"},
|
||||
{GRAPH_QUANTITY,200,"disk_write",1,"%.0f Mbps",1.0/(1000.0*1000.0), 0xff0000, "disk write"},
|
||||
|
||||
// . 20 is the max dps regardless to stop graph shrinkage
|
||||
// . use .03 qps as the min resolution per pixel
|
||||
{GRAPH_OPS,20,"parse_doc", .005,"%.1f dps" , 1.0 , 0x00fea915,"parsed doc" },
|
||||
|
||||
|
||||
{GRAPH_QUANTITY_PER_OP,1000,"docs_per_second", .005,"%.1f docs" , .001 , 0x1F2F5C,"docs per second" },
|
||||
{GRAPH_QUANTITY_PER_OP,-1,"docs_per_second", .1,"%.1f docs per second" , -1 , 0x1F2F5C,"docs per second" },
|
||||
|
||||
// . use .1 * 1000 docs as the min resolution per pixel
|
||||
// . max = -1, means dynamic size the ymax!
|
||||
@ -88,7 +86,7 @@ static Label s_labels[] = {
|
||||
// . make it 2M now not 50M. seems like it is per pixel and theres
|
||||
// like 1000 pixels vertically. but we need to autoscale it
|
||||
// eventually
|
||||
{GRAPH_QUANTITY,2000000.0,"docs_indexed", .1,"%.0fK docs" , .001 , 0x00cc0099,"docs indexed" }
|
||||
{GRAPH_QUANTITY,-1,"docs_indexed", .1,"%.0f docs" , -1, 0x00cc0099,"docs indexed" }
|
||||
|
||||
|
||||
//{ "termlist_intersect",0x0000ff00},
|
||||
@ -122,6 +120,7 @@ Label *Statsdb::getLabel ( int32_t labelHash ) {
|
||||
return *label;
|
||||
}
|
||||
|
||||
|
||||
Statsdb::Statsdb ( ) {
|
||||
m_init = false;
|
||||
m_disabled = true;
|
||||
@ -246,6 +245,8 @@ void flushStatsWrapper ( int fd , void *state ) {
|
||||
void Statsdb::addDocsIndexed ( ) {
|
||||
|
||||
if ( ! isClockInSync() ) return;
|
||||
if ( g_hostdb.hasDeadHost() ) return;
|
||||
|
||||
|
||||
// only host #0 needs this
|
||||
if ( g_hostdb.m_hostId != 0 ) return;
|
||||
@ -270,18 +271,23 @@ void Statsdb::addDocsIndexed ( ) {
|
||||
// divide by # of groups
|
||||
total /= g_hostdb.getNumHostsPerShard();
|
||||
// skip if no change
|
||||
|
||||
if ( total == s_lastTotal ) return;
|
||||
|
||||
int32_t docsIndexedInInterval = total - s_lastTotal;
|
||||
float docsPerSecond = docsIndexedInInterval / (float)interval;
|
||||
|
||||
s_lastTotal = total;
|
||||
log("build: total docs indexed: %f. docs per second %f %i %i", (float)total, docsPerSecond, docsIndexedInInterval, interval);
|
||||
|
||||
// add it if changed though
|
||||
int64_t nowms = gettimeofdayInMillisecondsGlobal();
|
||||
addStat ( MAX_NICENESS,"docs_indexed", nowms, nowms, (float)total );
|
||||
addStat ( MAX_NICENESS,"docs_per_second", nowms, nowms, docsPerSecond );
|
||||
// Prevent a datapoint which adds all of the docs indexed to date.
|
||||
if( s_lastTotal != 0 ) {
|
||||
addStat ( MAX_NICENESS,"docs_per_second", nowms, nowms, docsPerSecond );
|
||||
}
|
||||
|
||||
s_lastTotal = total;
|
||||
}
|
||||
|
||||
// . m_key bitmap in statsdb:
|
||||
@ -896,7 +902,7 @@ char *Statsdb::plotGraph ( char *pstart ,
|
||||
bool needMax = true;
|
||||
float ymin = 0.0;
|
||||
float ymax = 0.0;
|
||||
|
||||
float yscalar = label->m_yscalar;
|
||||
char *p = pstart;
|
||||
|
||||
for ( ; p < pend ; p += 12 ) {
|
||||
@ -909,7 +915,8 @@ char *Statsdb::plotGraph ( char *pstart ,
|
||||
// stop if not us
|
||||
if ( gh != graphHash ) continue;
|
||||
// put into scaled space right away
|
||||
y2 = y2 * label->m_yscalar;
|
||||
if (label->m_yscalar >= 0)
|
||||
y2 = y2 * label->m_yscalar;
|
||||
// . limit y to absolute max
|
||||
// . these units should be scaled as well!
|
||||
if ( y2 > label->m_absYMax && label->m_absYMax > 0.0 )
|
||||
@ -922,13 +929,21 @@ char *Statsdb::plotGraph ( char *pstart ,
|
||||
}
|
||||
|
||||
// force to zero for now
|
||||
ymin = 0.0;
|
||||
//ymin = 0.0;
|
||||
// . and force to ymax for now as well
|
||||
// . -1 indicates dynamic though!
|
||||
if ( label->m_absYMax > 0.0 ) ymax = label->m_absYMax;
|
||||
// add a 20% ceiling
|
||||
else ymax *= 1.20;
|
||||
// else ymax *= 1.20;
|
||||
|
||||
|
||||
if( label->m_yscalar <= 0 ) {
|
||||
if(ymax == ymin) {
|
||||
yscalar = 0;
|
||||
} else {
|
||||
yscalar = (float)DY2 / (ymax - ymin);
|
||||
}
|
||||
}
|
||||
// return that!
|
||||
char *retp = p;
|
||||
|
||||
@ -951,7 +966,7 @@ char *Statsdb::plotGraph ( char *pstart ,
|
||||
|
||||
// . pad y range if total range is small
|
||||
// . only do this for certain types of stats, like qps and disk i/o
|
||||
if ( ourDiff < minDiff ) {
|
||||
if ( label->m_yscalar >=0 && ourDiff < minDiff ) {
|
||||
float pad = (minDiff - ourDiff) / 2;
|
||||
// pad it out
|
||||
ymin -= pad ;
|
||||
@ -987,10 +1002,16 @@ char *Statsdb::plotGraph ( char *pstart ,
|
||||
float y2 = *(float *)p; p += 4;
|
||||
|
||||
// scale it right away
|
||||
y2 *= label->m_yscalar;
|
||||
if(label->m_yscalar < 0) {
|
||||
y2 = (y2 - ymin) * yscalar;
|
||||
}
|
||||
else {
|
||||
y2 *= yscalar;
|
||||
|
||||
}
|
||||
// adjust
|
||||
if ( y2 > ymax ) y2 = ymax;
|
||||
if ( y2 < 0 ) y2 = 0;
|
||||
|
||||
// then graphHash
|
||||
int32_t gh = *(int32_t *)p; p += 4;
|
||||
@ -1003,8 +1024,10 @@ char *Statsdb::plotGraph ( char *pstart ,
|
||||
float y1 = lasty;
|
||||
|
||||
// normalize y into pixel space
|
||||
y2 = ((float)DY2 * (y2 - ymin)) / (ymax-ymin);
|
||||
|
||||
if(label->m_yscalar >= 0 && ymax != ymin) {
|
||||
y2 = ((float)DY2 * (y2 - ymin)) / (ymax-ymin);
|
||||
}
|
||||
|
||||
// set lasts for next iteration of this loop
|
||||
lastx = x2;
|
||||
lasty = y2;
|
||||
@ -1194,6 +1217,13 @@ bool Statsdb::processList ( ) {
|
||||
m_done = true;
|
||||
}
|
||||
|
||||
// HACK: the user can request all of the events, it can
|
||||
// become quite large. so limit to 100 mb right now.
|
||||
if( m_sb3.length() > 100000000) {
|
||||
log("statsdb: truncating statsdb results.");
|
||||
m_done = true;
|
||||
}
|
||||
|
||||
|
||||
//
|
||||
// all these points are accumulated into 1-second buckets
|
||||
@ -1599,3 +1629,5 @@ void Statsdb::drawLine3 ( SafeBuf &sb ,
|
||||
, color
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
|
@ -598,7 +598,7 @@ bool TcpServer::sendMsg ( int32_t ip ,
|
||||
// we think that they are using an sd used by a streaming socket,
|
||||
// who closed, but then proceed to use TcpSocket class as if he
|
||||
// had not closed it.
|
||||
if ( 1==2 && g_hostdb.m_hostId == 0 ) {
|
||||
if ( g_conf.m_logDebugTcpBuf ) {
|
||||
SafeBuf sb;
|
||||
sb.safePrintf("tcp: open newsd=%i sendbuf=",s->m_sd);
|
||||
sb.safeTruncateEllipsis (sendBuf,sendBufSize,200);
|
||||
@ -2276,7 +2276,7 @@ void TcpServer::destroySocket ( TcpSocket *s ) {
|
||||
// 0 is the FD for stdin so i don't know how that is happening.
|
||||
if ( sd != 0 ) cret = ::close ( sd );
|
||||
|
||||
if ( 1==2 && g_hostdb.m_hostId == 0 ) {
|
||||
if ( g_conf.m_logDebugTcpBuf ) {
|
||||
SafeBuf sb;
|
||||
sb.safePrintf("tcp: closing sd=%i bytessent=%i "
|
||||
"sendbufused=%i streaming=%i "
|
||||
@ -2619,7 +2619,7 @@ TcpSocket *TcpServer::acceptSocket ( ) {
|
||||
// we think that they are using an sd used by a streaming socket,
|
||||
// who closed, but then proceed to use TcpSocket class as if he
|
||||
// had not closed it.
|
||||
if ( 1==2 && g_hostdb.m_hostId == 0 ) {
|
||||
if ( g_conf.m_logDebugTcpBuf ) {
|
||||
SafeBuf sb;
|
||||
sb.safePrintf("tcp: accept newsd=%i incoming req",newsd);
|
||||
//sb.safeTruncateEllipsis (sendBuf,sendBufSize,200);
|
||||
|
@ -286,6 +286,7 @@ bool UdpServer::init ( uint16_t port, UdpProtocol *proto, int32_t niceness,
|
||||
// no requests waiting yet
|
||||
m_requestsInWaiting = 0;
|
||||
// special count
|
||||
m_msg07sInWaiting = 0;
|
||||
m_msg10sInWaiting = 0;
|
||||
m_msgc1sInWaiting = 0;
|
||||
//m_msgDsInWaiting = 0;
|
||||
@ -1005,7 +1006,7 @@ UdpSlot *UdpServer::getBestSlotToSend ( int64_t now ) {
|
||||
UdpSlot *maxi = NULL;
|
||||
int32_t score;
|
||||
//UdpSlot *slot;
|
||||
// . we send dgrams with the lowest "score" first
|
||||
// . we send dgrams with the lowest "score" first
|
||||
// . the "score" is just number of ACKs you're waiting for
|
||||
// . that way transmissions that are the most caught up to their ACKs
|
||||
// are considered faster so we send to them first
|
||||
@ -1482,6 +1483,9 @@ int32_t UdpServer::readSock_ass ( UdpSlot **slotPtr , int64_t now ) {
|
||||
// rate, these are pretty lightweight. msg 0x10 reply gen times
|
||||
// are VERY low. MDW
|
||||
bool getSlot = true;
|
||||
if ( msgType == 0x07 && m_msg07sInWaiting >= 100 )
|
||||
getSlot = false;
|
||||
|
||||
if ( msgType == 0x10 && m_msg10sInWaiting >= 50 )
|
||||
getSlot = false;
|
||||
// crawl update info from Spider.cpp
|
||||
@ -1671,6 +1675,7 @@ int32_t UdpServer::readSock_ass ( UdpSlot **slotPtr , int64_t now ) {
|
||||
// if we connected to a request slot, count it
|
||||
m_requestsInWaiting++;
|
||||
// special count
|
||||
if ( msgType == 0x07 ) m_msg07sInWaiting++;
|
||||
if ( msgType == 0x10 ) m_msg10sInWaiting++;
|
||||
if ( msgType == 0xc1 ) m_msgc1sInWaiting++;
|
||||
//if ( msgType == 0xd ) m_msgDsInWaiting++;
|
||||
@ -3122,6 +3127,7 @@ void UdpServer::destroySlot ( UdpSlot *slot ) {
|
||||
// one less request in waiting
|
||||
m_requestsInWaiting--;
|
||||
// special count
|
||||
if ( slot->m_msgType == 0x07 ) m_msg07sInWaiting--;
|
||||
if ( slot->m_msgType == 0x10 ) m_msg10sInWaiting--;
|
||||
if ( slot->m_msgType == 0xc1 ) m_msgc1sInWaiting--;
|
||||
//if ( slot->m_msgType == 0xd ) m_msgDsInWaiting--;
|
||||
|
@ -390,6 +390,7 @@ class UdpServer {
|
||||
int32_t m_requestsInWaiting;
|
||||
|
||||
// like m_requestsInWaiting but requests which spawn other requests
|
||||
int32_t m_msg07sInWaiting;
|
||||
int32_t m_msg10sInWaiting;
|
||||
int32_t m_msgc1sInWaiting;
|
||||
//int32_t m_msgDsInWaiting;
|
||||
|
12
XmlDoc.cpp
12
XmlDoc.cpp
@ -226,7 +226,7 @@ void XmlDoc::reset ( ) {
|
||||
if ( ! msg7 ) continue;
|
||||
if(msg7->m_inUse) {
|
||||
log("build: archive: reseting xmldoc when msg7s are outstanding");
|
||||
|
||||
|
||||
}
|
||||
mdelete ( msg7 , sizeof(Msg7) , "xdmsg7" );
|
||||
delete ( msg7 );
|
||||
@ -3353,6 +3353,10 @@ void doneInjectingArchiveRec ( void *state ) {
|
||||
xd->m_numInjectionsOut--;
|
||||
log("build: archive: injection thread returned. %"INT32" out now.",
|
||||
xd->m_numInjectionsOut);
|
||||
// reset g_errno so it doesn't error out in ::indexDoc() when
|
||||
// we are injecting a ton of these msg7s and then xmldoc ends up
|
||||
// getting reset and when a msg7 reply comes back in, we core
|
||||
g_errno = 0;
|
||||
xd->m_masterLoop ( xd );
|
||||
}
|
||||
|
||||
@ -19359,8 +19363,10 @@ BigFile *XmlDoc::getUtf8ContentInFile ( int64_t *fileSizeArg ) {
|
||||
m_fileSize = m_file.getFileSize();
|
||||
m_fileValid = true;
|
||||
*fileSizeArg = m_fileSize;
|
||||
// open2() has usepartfiles = false!!!
|
||||
m_file.open2(O_RDONLY);
|
||||
m_file.open(O_RDONLY);
|
||||
// explicitly set it to false now to make it harder for
|
||||
// it not to be true because that messes things up
|
||||
m_file.m_usePartFiles = false;
|
||||
return &m_file;
|
||||
}
|
||||
|
||||
|
@ -27,6 +27,7 @@ bool sendPageSEO(TcpSocket *s, HttpRequest *hr) {return true;}
|
||||
//SafeBuf g_qbuf;
|
||||
|
||||
bool g_recoveryMode;
|
||||
int32_t g_recoveryLevel;
|
||||
|
||||
int g_inMemcpy;
|
||||
|
||||
|
2
main.cpp
2
main.cpp
@ -5022,7 +5022,7 @@ int install ( install_flag_konst_t installFlag , int32_t hostId , char *dir ,
|
||||
if ( ! f.doesExist() ) target = "gb";
|
||||
|
||||
sprintf(tmp,
|
||||
"scp -c arcfour " // blowfish is faster
|
||||
"scp " // blowfish is faster
|
||||
"%s%s "
|
||||
"%s:%s/gb.installed%s",
|
||||
dir,
|
||||
|
@ -14,6 +14,8 @@ import time
|
||||
import flask
|
||||
import signal, os
|
||||
import random
|
||||
from itertools import repeat
|
||||
|
||||
|
||||
app = flask.Flask(__name__)
|
||||
app.secret_key = 'oaisj84alwsdkjhf9238u'
|
||||
@ -315,21 +317,39 @@ def main():
|
||||
|
||||
if len(sys.argv) == 3:
|
||||
|
||||
if sys.argv[1] == 'force':
|
||||
itemName = sys.argv[2]
|
||||
db = getDb()
|
||||
injectItem(itemName, db, 'production')
|
||||
sys.exit(0)
|
||||
|
||||
if sys.argv[1] == 'forcefile':
|
||||
global staleTime
|
||||
staleTime = datetime.timedelta(0,0,0)
|
||||
from multiprocessing.pool import ThreadPool
|
||||
fileName = sys.argv[2]
|
||||
items = filter(lambda x: x, open(fileName, 'r').read().split('\n'))
|
||||
pool = ThreadPool(processes=len(items))
|
||||
#print zip(files, repeat(getDb(), len(files)), repeat('production', len(files)))
|
||||
def injectItemTupleWrapper(itemName):
|
||||
db = getDb()
|
||||
ret = injectItem(itemName, db, 'production')
|
||||
db.close()
|
||||
return ret
|
||||
|
||||
answer = pool.map(injectItemTupleWrapper, items)
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
if sys.argv[1] == 'run':
|
||||
threads = int(sys.argv[2])
|
||||
runInjects(threads)
|
||||
|
||||
# else:
|
||||
# #getPage(3)
|
||||
# from multiprocessing.pool import ThreadPool
|
||||
# pool = ThreadPool(processes=150)
|
||||
# pool.map(getPage, xrange(1,1300))
|
||||
|
||||
def runInjects(threads, mode='production'):
|
||||
from multiprocessing.pool import ThreadPool
|
||||
pool = ThreadPool(processes=threads)
|
||||
try:
|
||||
from itertools import repeat
|
||||
maxPages = 1300
|
||||
answer = pool.map(getPage, zip(xrange(1,maxPages), repeat(mode, maxPages)))
|
||||
except (KeyboardInterrupt, SystemExit):
|
||||
|
@ -100,7 +100,7 @@ def getSplitTime():
|
||||
|
||||
|
||||
|
||||
def copyToTwins(fname):
|
||||
def copyToTwins(fname, backToFront=False):
|
||||
fh = open(fname, 'r')
|
||||
ret = {}
|
||||
hosts = []
|
||||
@ -117,23 +117,25 @@ def copyToTwins(fname):
|
||||
continue
|
||||
#print directory, ip1, note
|
||||
step = len(hosts)/2
|
||||
hostPlex = {}
|
||||
someIp = None
|
||||
cmds = []
|
||||
for hostId, dnsPort, httpsPort, httpPort, udbPort,ip1, ip2, directory, note in hosts[:step]:
|
||||
if ip1 not in hostPlex:
|
||||
hostPlex[ip1] = []
|
||||
someIp = ip1
|
||||
hostPlex[ip1].append('scp -r %s:%s* %s:%s. ' % (ip1, directory, (hosts[hostId + step][5]), (hosts[hostId + step][7])))
|
||||
backHostId, backDnsPort, backHttpsPort, backHttpPort, backUdbPort,backIp1, backIp2, backDirectory, backNote = hosts[hostId + step]
|
||||
|
||||
if note != directory:
|
||||
print 'oh looks like you overlooked host %s' % hostId
|
||||
if backNote != backDirectory:
|
||||
print 'oh looks like you overlooked host %s' % backHostId
|
||||
|
||||
if backToFront:
|
||||
cmd = 'scp -r %s:%s* %s:%s. &' % (backIp1, backDirectory, ip1, directory )
|
||||
else:
|
||||
cmd = 'scp -r %s:%s* %s:%s. &' % (ip1, directory, backIp1, backDirectory)
|
||||
cmds.append(cmd)
|
||||
#print 'scp -r %s:%s* %s:%s. &' % (ip1, directory, (hosts[hostId + step][5]), (hosts[hostId + step][7]))
|
||||
|
||||
while len(hostPlex[someIp]) > 0:
|
||||
cmd = []
|
||||
for cmd in cmds:
|
||||
print cmd
|
||||
|
||||
for ip in hostPlex.iterkeys():
|
||||
cmd.append(hostPlex[ip].pop())
|
||||
#print hostPlex[ip].pop()
|
||||
|
||||
print '&\n'.join(cmd), ';'
|
||||
|
||||
|
||||
def testDiskSpeed(host, directory):
|
||||
|
Binary file not shown.
Loading…
x
Reference in New Issue
Block a user