Merge branch 'diffbot-testing' into testing

This commit is contained in:
Matt
2015-09-11 12:41:22 -06:00
6 changed files with 73 additions and 23 deletions

@ -683,6 +683,7 @@ bool Msg3::readList ( char rdbId ,
////////
BigFile *ff = base->getFile(m_fileNums[i]);
RdbCache *rpc = getDiskPageCache ( m_rdbId );
if ( ! m_allowPageCache ) rpc = NULL;
// . vfd is unique 64 bit file id
// . if file is opened vfd is -1, only set in call to open()
int64_t vfd = ff->getVfd();
@ -1057,6 +1058,7 @@ bool Msg3::doneScanning ( ) {
// compute cache info
RdbCache *rpc = getDiskPageCache ( m_rdbId );
if ( ! m_allowPageCache ) rpc = NULL;
int64_t vfd ;
if ( ff ) vfd = ff->getVfd();
key192_t ck ;

@ -2030,7 +2030,7 @@ bool Msg40::gotSummary ( ) {
// . set it to true on all but the last thing we send!
// . after each chunk of data we send out, TcpServer::sendChunk
// will call our callback, doneSendingWrapper9
if ( m_si->m_streamResults )
if ( m_si->m_streamResults && st->m_socket )
st->m_socket->m_streamingMode = true;
@ -2112,7 +2112,7 @@ bool Msg40::gotSummary ( ) {
if ( g_conf.m_logDebugTcp )
log("tcp: disabling streamingMode now");
// this will be our final send
st->m_socket->m_streamingMode = false;
if ( st->m_socket ) st->m_socket->m_streamingMode = false;
}
@ -2123,12 +2123,18 @@ bool Msg40::gotSummary ( ) {
// do we still own this socket? i am thinking it got closed somewhere
// and the socket descriptor was re-assigned to another socket
// getting a diffbot reply from XmLDoc::getDiffbotReply()
TcpSocket *s = st->m_socket;
if ( s->m_startTime != st->m_socketStartTimeHack ) {
log("msg40: lost control of socket. sd=%i. closed on us?",
(int)s->m_sd);
m_socketHadError = EBADENGINEER;
g_errno = EBADENGINEER;
if ( st->m_socket &&
st->m_socket->m_startTime != st->m_socketStartTimeHack ) {
log("msg40: lost control of socket. sd=%i. the socket "
"descriptor closed on us and got re-used by someone else.",
(int)st->m_socket->m_sd);
// if there wasn't already an error like 'broken pipe' then
// set it here so we stop getting summaries if streaming.
if ( ! m_socketHadError ) m_socketHadError = EBADENGINEER;
// make it NULL to avoid us from doing anything to it
// since sommeone else is using it now.
st->m_socket = NULL;
//g_errno = EBADENGINEER;
}
@ -2145,6 +2151,7 @@ bool Msg40::gotSummary ( ) {
if ( sb->length() &&
// did client browser close the socket on us midstream?
! m_socketHadError &&
st->m_socket &&
! tcp->sendChunk ( st->m_socket ,
sb ,
this ,
@ -2157,8 +2164,11 @@ bool Msg40::gotSummary ( ) {
// writing on closed socket?
if ( g_errno ) {
m_socketHadError = g_errno;
if ( ! m_socketHadError ) m_socketHadError = g_errno;
log("msg40: got tcp error : %s",mstrerror(g_errno));
// disown it here so we do not damage in case it gets
// reopened by someone else
st->m_socket = NULL;
}
// do we need to launch another batch of summary requests?
@ -2212,8 +2222,9 @@ bool Msg40::gotSummary ( ) {
//mdelete(st, sizeof(State0), "msg40st0");
//delete st;
// otherwise, all done!
log("msg40: did not send stuff from last summary. BUG "
"this=0x%"PTRFMT"",(PTRTYPE)this);
log("msg40: did not send last search result summary. "
"this=0x%"PTRFMT" because had error: %s",(PTRTYPE)this,
mstrerror(m_socketHadError));
return true;
}

@ -531,6 +531,10 @@ bool Msg5::readList ( ) {
int32_t niceness = m_niceness;
if ( niceness > 0 ) niceness = 2;
if ( m_isRealMerge ) niceness = 1;
bool allowPageCache = true;
// just in case cache is corrupted, do not use it for doing real
// merges, also it would kick out good lists we have in there already
if ( m_isRealMerge ) allowPageCache = false;
if ( compute ) {
m_msg3.readList ( m_rdbId ,
m_collnum ,
@ -547,7 +551,7 @@ bool Msg5::readList ( ) {
m_compensateForMerge ,
-1,//m_syncPoint ,
true , // just get endKey?
m_allowPageCache );
allowPageCache );
if ( g_errno ) {
log("db: Msg5: getting endKey: %s",mstrerrno(g_errno));
return true;

@ -68,8 +68,12 @@ bool sendReply ( State0 *st , char *reply ) {
int32_t savedErr = g_errno;
TcpSocket *s = st->m_socket;
if ( ! s ) { char *xx=NULL;*xx=0; }
TcpSocket *sock = st->m_socket;
if ( ! sock ) {
log("results: not sending back results on an empty socket."
"socket must have closed on us abruptly.");
//char *xx=NULL;*xx=0; }
}
SearchInput *si = &st->m_si;
char *ct = "text/html";
if ( si && si->m_format == FORMAT_XML ) ct = "text/xml";
@ -143,7 +147,8 @@ bool sendReply ( State0 *st , char *reply ) {
//
// send back the actual search results
//
g_httpServer.sendDynamicPage(s,
if ( sock )
g_httpServer.sendDynamicPage(sock,
reply,
rlen,//gbstrlen(reply),
// don't let the ajax re-gen
@ -199,9 +204,9 @@ bool sendReply ( State0 *st , char *reply ) {
// if we had a broken pipe from the browser while sending
// them the search results, then we end up closing the socket fd
// in TcpServer::sendChunk() > sendMsg() > destroySocket()
if ( s->m_numDestroys ) {
if ( sock && sock->m_numDestroys ) {
log("results: not sending back error on destroyed socket "
"sd=%"INT32"",s->m_sd);
"sd=%"INT32"",sock->m_sd);
return true;
}
@ -212,7 +217,8 @@ bool sendReply ( State0 *st , char *reply ) {
savedErr == ENOCOLLREC)
status = 400;
g_httpServer.sendQueryErrorReply(s,
if ( sock )
g_httpServer.sendQueryErrorReply(sock,
status,
mstrerror(savedErr),
format,//xml,
@ -1157,6 +1163,16 @@ bool gotResults ( void *state ) {
SearchInput *si = &st->m_si;
// if we lost the socket because we were streaming and it
// got closed from a broken pipe or something, then Msg40.cpp
// will set st->m_socket to NULL if the fd ends up ending closed
// because someone else might be using it and we do not want to
// mess with their TcpSocket settings.
if ( ! st->m_socket ) {
log("results: socket is NULL. sending failed.");
return sendReply(st,NULL);
}
// if in streaming mode and we never sent anything and we had
// an error, then send that back. we never really entered streaming
// mode in that case. this happens when someone deletes a coll

@ -9971,7 +9971,7 @@ bool sendPage ( State11 *st ) {
// the the waiting tree
int32_t node = sc->m_waitingTree.getFirstNode();
int32_t count = 0;
uint64_t nowMS = gettimeofdayInMillisecondsGlobal();
//uint64_t nowMS = gettimeofdayInMillisecondsGlobal();
for ( ; node >= 0 ; node = sc->m_waitingTree.getNextNode(node) ) {
// breathe
QUICKPOLL(MAX_NICENESS);

@ -1936,7 +1936,7 @@ int32_t TcpServer::writeSocket ( TcpSocket *s ) {
// another debug
//if ( g_conf.m_logDebugTcp )
log("tcp: only wrote %"INT32" of %"INT32" bytes "
"tried.",n,toSend);
"tried. sd=%i",n,toSend,s->m_sd);
// need to listen for writability now since our write
// failed to write everythin gout
if ( ! s->m_writeRegistered &&
@ -2278,7 +2278,13 @@ void TcpServer::destroySocket ( TcpSocket *s ) {
if ( 1==2 && g_hostdb.m_hostId == 0 ) {
SafeBuf sb;
sb.safePrintf("tcp: closing sd=%i sendbuf=",s->m_sd);
sb.safePrintf("tcp: closing sd=%i bytessent=%i "
"sendbufused=%i streaming=%i "
"sendbuf=",
s->m_sd,
s->m_sendOffset,
s->m_sendBufUsed,
(int)s->m_streamingMode);
if ( s->m_sendBuf )
sb.safeTruncateEllipsis(s->m_sendBuf,
s->m_sendBufSize,
@ -2291,6 +2297,10 @@ void TcpServer::destroySocket ( TcpSocket *s ) {
log("%s",sb.getBufStart());
}
// force it out of streaming mode since we closed it. then we
// should avoid the "not timing out streaming socket fd=123" msgs.
s->m_streamingMode = false;
if ( cret != 0 ) { // == -1 )
log("tcp: s=%"PTRFMT" close(%"INT32") = %"INT32" = %s",
(PTRTYPE)s,(int32_t)sd,cret,mstrerror(errno));
@ -2770,7 +2780,8 @@ bool TcpServer::sslAccept ( TcpSocket *s ) {
void TcpServer::makeCallback ( TcpSocket * s ) {
if ( ! s->m_callback ) {
// note it
log("tcp: null callback for s=0x%"PTRFMT"",(PTRTYPE)s);
if ( g_conf.m_logDebugTcp )
log("tcp: null callback for s=0x%"PTRFMT"",(PTRTYPE)s);
return;
}
// record times for profiler
@ -2821,7 +2832,8 @@ bool TcpServer::sendChunk ( TcpSocket *s ,
// sendChunk() again.
void (* doneSendingWrapper)( void *,TcpSocket *)){
log("tcp: sending chunk of %"INT32" bytes", sb->length() );
log("tcp: sending chunk of %"INT32" bytes sd=%i", sb->length() ,
s->m_sd );
// if socket had shit on there already, free that memory
// just like TcpServer::destroySocket would
@ -2862,6 +2874,11 @@ bool TcpServer::sendChunk ( TcpSocket *s ,
log("tcp: chunkend=%s",sb->getBuf() - minus);
*/
// char *p = sb->getBufStart();
// char *pend = p + sb->length();
// for ( ; p < pend ; p++ ) {
// if ( *p == '\0' ) { char *xx=NULL;*xx=0; }
// }
// . start the send process
// . returns false if send did not complete