Conflicts:
	Msg3.cpp
This commit is contained in:
Zak Betz
2015-09-11 14:22:08 -06:00
8 changed files with 160 additions and 40 deletions

@ -683,6 +683,7 @@ bool Msg3::readList ( char rdbId ,
////////
BigFile *ff = base->getFile(m_fileNums[i]);
RdbCache *rpc = getDiskPageCache ( m_rdbId );
if ( ! m_allowPageCache ) rpc = NULL;
// . vfd is unique 64 bit file id
// . if file is opened vfd is -1, only set in call to open()
int64_t vfd = ff->getVfd();
@ -701,8 +702,11 @@ bool Msg3::readList ( char rdbId ,
if ( inCache ) {
m_scans[i].m_inPageCache = true;
m_numScansCompleted++;
m_lists[i].set ( rec ,
recSize ,
// now we have to store this value, 6 or 12 so
// we can modify the hint appropriately
m_scans[i].m_shifted = *rec;
m_lists[i].set ( rec +1,
recSize-1 ,
rec , // alloc
recSize , // allocSize
startKey2 ,
@ -1050,21 +1054,18 @@ bool Msg3::doneScanning ( ) {
// if we did a merge really quick and delete one of the
// files we were reading, i've seen 'ff' be NULL
char *filename = "lostfilename";
int64_t vfd = -1;
if ( ff ) {
filename = ff->getFilename();
vfd = ff->getVfd();
}
if ( ff ) filename = ff->getFilename();
// compute cache info
RdbCache *rpc = getDiskPageCache ( m_rdbId );
if ( ! m_allowPageCache ) rpc = NULL;
int64_t vfd ;
if ( ff ) vfd = ff->getVfd();
key192_t ck ;
ck = makeCacheKey ( vfd ,
m_scans[i].m_offset ,
m_scans[i].m_bytesToRead );
if ( ff )
ck = makeCacheKey ( vfd ,
m_scans[i].m_offset ,
m_scans[i].m_bytesToRead );
if ( m_validateCache && ff && rpc && vfd != -1 ) {
bool inCache;
char *rec; int32_t recSize;
@ -1076,8 +1077,10 @@ bool Msg3::doneScanning ( ) {
-1 , // maxAge, none
true ); // inccounts?
if ( inCache &&
( m_lists[i].m_listSize != recSize ||
memcmp ( m_lists[i].m_list , rec , recSize ))) {
// 1st byte is RdbScan::m_shifted
( m_lists[i].m_listSize != recSize-1 ||
memcmp ( m_lists[i].m_list , rec+1,recSize-1) ||
*rec != m_scans[i].m_shifted ) ) {
log("msg3: cache did not validate");
char *xx=NULL;*xx=0;
}
@ -1097,8 +1100,13 @@ bool Msg3::doneScanning ( ) {
! m_scans[i].m_inPageCache )
rpc->addRecord ( (collnum_t)0 , // collnum
(char *)&ck ,
// rec1 is this little thingy
&m_scans[i].m_shifted,
1,
// rec2
m_lists[i].getList() ,
m_lists[i].getListSize() );
m_lists[i].getListSize() ,
0 ); // timestamp. 0 = now
// if from our 'page' cache, no need to constrain
if ( ! m_lists[i].constrain ( m_startKey ,

@ -2030,7 +2030,7 @@ bool Msg40::gotSummary ( ) {
// . set it to true on all but the last thing we send!
// . after each chunk of data we send out, TcpServer::sendChunk
// will call our callback, doneSendingWrapper9
if ( m_si->m_streamResults )
if ( m_si->m_streamResults && st->m_socket )
st->m_socket->m_streamingMode = true;
@ -2112,7 +2112,7 @@ bool Msg40::gotSummary ( ) {
if ( g_conf.m_logDebugTcp )
log("tcp: disabling streamingMode now");
// this will be our final send
st->m_socket->m_streamingMode = false;
if ( st->m_socket ) st->m_socket->m_streamingMode = false;
}
@ -2120,6 +2120,24 @@ bool Msg40::gotSummary ( ) {
//g_conf.m_logDebugTcp = 1;
// do we still own this socket? i am thinking it got closed somewhere
// and the socket descriptor was re-assigned to another socket
// getting a diffbot reply from XmlDoc::getDiffbotReply()
if ( st->m_socket &&
st->m_socket->m_startTime != st->m_socketStartTimeHack ) {
log("msg40: lost control of socket. sd=%i. the socket "
"descriptor closed on us and got re-used by someone else.",
(int)st->m_socket->m_sd);
// if there wasn't already an error like 'broken pipe' then
// set it here so we stop getting summaries if streaming.
if ( ! m_socketHadError ) m_socketHadError = EBADENGINEER;
// make it NULL to keep us from doing anything to it
// since someone else is using it now.
st->m_socket = NULL;
//g_errno = EBADENGINEER;
}
// . transmit the chunk in sb if non-zero length
// . steals the allocated buffer from sb and stores in the
// TcpSocket::m_sendBuf, which it frees when socket is
@ -2133,6 +2151,7 @@ bool Msg40::gotSummary ( ) {
if ( sb->length() &&
// did client browser close the socket on us midstream?
! m_socketHadError &&
st->m_socket &&
! tcp->sendChunk ( st->m_socket ,
sb ,
this ,
@ -2145,8 +2164,11 @@ bool Msg40::gotSummary ( ) {
// writing on closed socket?
if ( g_errno ) {
m_socketHadError = g_errno;
if ( ! m_socketHadError ) m_socketHadError = g_errno;
log("msg40: got tcp error : %s",mstrerror(g_errno));
// disown it here so we do not damage in case it gets
// reopened by someone else
st->m_socket = NULL;
}
// do we need to launch another batch of summary requests?
@ -2200,8 +2222,9 @@ bool Msg40::gotSummary ( ) {
//mdelete(st, sizeof(State0), "msg40st0");
//delete st;
// otherwise, all done!
log("msg40: did not send stuff from last summary. BUG "
"this=0x%"PTRFMT"",(PTRTYPE)this);
log("msg40: did not send last search result summary. "
"this=0x%"PTRFMT" because had error: %s",(PTRTYPE)this,
mstrerror(m_socketHadError));
return true;
}

@ -531,6 +531,10 @@ bool Msg5::readList ( ) {
int32_t niceness = m_niceness;
if ( niceness > 0 ) niceness = 2;
if ( m_isRealMerge ) niceness = 1;
bool allowPageCache = true;
// just in case cache is corrupted, do not use it for doing real
// merges, also it would kick out good lists we have in there already
if ( m_isRealMerge ) allowPageCache = false;
if ( compute ) {
m_msg3.readList ( m_rdbId ,
m_collnum ,
@ -547,7 +551,7 @@ bool Msg5::readList ( ) {
m_compensateForMerge ,
-1,//m_syncPoint ,
true , // just get endKey?
m_allowPageCache );
allowPageCache );
if ( g_errno ) {
log("db: Msg5: getting endKey: %s",mstrerrno(g_errno));
return true;

@ -68,8 +68,12 @@ bool sendReply ( State0 *st , char *reply ) {
int32_t savedErr = g_errno;
TcpSocket *s = st->m_socket;
if ( ! s ) { char *xx=NULL;*xx=0; }
TcpSocket *sock = st->m_socket;
if ( ! sock ) {
log("results: not sending back results on an empty socket."
"socket must have closed on us abruptly.");
//char *xx=NULL;*xx=0; }
}
SearchInput *si = &st->m_si;
char *ct = "text/html";
if ( si && si->m_format == FORMAT_XML ) ct = "text/xml";
@ -143,7 +147,8 @@ bool sendReply ( State0 *st , char *reply ) {
//
// send back the actual search results
//
g_httpServer.sendDynamicPage(s,
if ( sock )
g_httpServer.sendDynamicPage(sock,
reply,
rlen,//gbstrlen(reply),
// don't let the ajax re-gen
@ -199,9 +204,9 @@ bool sendReply ( State0 *st , char *reply ) {
// if we had a broken pipe from the browser while sending
// them the search results, then we end up closing the socket fd
// in TcpServer::sendChunk() > sendMsg() > destroySocket()
if ( s->m_numDestroys ) {
if ( sock && sock->m_numDestroys ) {
log("results: not sending back error on destroyed socket "
"sd=%"INT32"",s->m_sd);
"sd=%"INT32"",sock->m_sd);
return true;
}
@ -212,7 +217,8 @@ bool sendReply ( State0 *st , char *reply ) {
savedErr == ENOCOLLREC)
status = 400;
g_httpServer.sendQueryErrorReply(s,
if ( sock )
g_httpServer.sendQueryErrorReply(sock,
status,
mstrerror(savedErr),
format,//xml,
@ -542,6 +548,9 @@ bool sendPageResults ( TcpSocket *s , HttpRequest *hr ) {
// set this in case SearchInput::set fails!
st->m_socket = s;
// record timestamp so we know if we got our socket closed and swapped
st->m_socketStartTimeHack = s->m_startTime;
// save this count so we know if TcpServer.cpp calls destroySocket(s)
st->m_numDestroys = s->m_numDestroys;
@ -1154,6 +1163,16 @@ bool gotResults ( void *state ) {
SearchInput *si = &st->m_si;
// if we lost the socket because we were streaming and it
// got closed from a broken pipe or something, then Msg40.cpp
// will set st->m_socket to NULL if the fd ends up being closed
// because someone else might be using it and we do not want to
// mess with their TcpSocket settings.
if ( ! st->m_socket ) {
log("results: socket is NULL. sending failed.");
return sendReply(st,NULL);
}
// if in streaming mode and we never sent anything and we had
// an error, then send that back. we never really entered streaming
// mode in that case. this happens when someone deletes a coll

@ -63,6 +63,7 @@ public:
bool m_didRedownload;
XmlDoc *m_xd;
int32_t m_oldContentHash32;
int64_t m_socketStartTimeHack;
};

@ -442,7 +442,8 @@ bool RdbCache::getRecord ( collnum_t collnum ,
if ( m_numPtrsMax <= 0 ) return false;
// if init() called failed because of oom...
if ( ! m_ptrs )
return log("cache: getRecord: failed because oom");
//return log("cache: getRecord: failed because oom");
return false;
// time it -- debug
int64_t t = 0LL ;
if ( g_conf.m_logTimingDb ) t = gettimeofdayInMillisecondsLocal();
@ -779,14 +780,15 @@ bool RdbCache::addRecord ( collnum_t collnum ,
int32_t timestamp ,
char **retRecPtr ) {
// bail if cache empty. maybe m_maxMem is 0.
if ( m_totalBufSize <= 0 ) return true;
//int64_t startTime = gettimeofdayInMillisecondsLocal();
if ( collnum < (collnum_t)0) {char *xx=NULL;*xx=0; }
if ( collnum >= m_maxColls ) {char *xx=NULL;*xx=0; }
// full key not allowed because we use that in markDeletedRecord()
if ( KEYCMP(cacheKey,KEYMAX(),m_cks) == 0 ) { char *xx=NULL;*xx=0; }
// bail if cache empty
if ( m_totalBufSize <= 0 ) return true;
// debug msg
int64_t t = 0LL ;
if ( g_conf.m_logTimingDb ) t = gettimeofdayInMillisecondsLocal();

@ -9971,7 +9971,7 @@ bool sendPage ( State11 *st ) {
// the waiting tree
int32_t node = sc->m_waitingTree.getFirstNode();
int32_t count = 0;
uint64_t nowMS = gettimeofdayInMillisecondsGlobal();
//uint64_t nowMS = gettimeofdayInMillisecondsGlobal();
for ( ; node >= 0 ; node = sc->m_waitingTree.getNextNode(node) ) {
// breathe
QUICKPOLL(MAX_NICENESS);
@ -9987,10 +9987,12 @@ bool sendPage ( State11 *st ) {
spiderTimeMS |= (key->n0 >> 32);
char *note = "";
// if a day more in the future -- complain
if ( spiderTimeMS > nowMS + 1000 * 86400 )
note = " (<b><font color=red>This should not be "
"this far into the future. Probably a corrupt "
"SpiderRequest?</font></b>)";
// no! we set the repeat crawl to 3000 days for crawl jobs that
// do not repeat...
// if ( spiderTimeMS > nowMS + 1000 * 86400 )
// note = " (<b><font color=red>This should not be "
// "this far into the future. Probably a corrupt "
// "SpiderRequest?</font></b>)";
// get the rest of the data
sb.safePrintf("<tr bgcolor=#%s>"
"<td>%"INT64"%s</td>"

@ -5,6 +5,7 @@
#include "Profiler.h"
#include "PingServer.h"
//#include "AutoBan.h"
#include "Hostdb.h"
// . TODO: deleting nodes from under Loop::callCallbacks is dangerous!!
@ -593,6 +594,17 @@ bool TcpServer::sendMsg ( int32_t ip ,
// return true if s is NULL and g_errno was set by getNewSocket()
// might set g_errno to EOUTOFSOCKETS
if ( ! s ) { mfree ( sendBuf , sendBufSize,"TcpServer"); return true; }
// debug to find why sockets getting diffbot replies get commandeered.
// we think that they are using an sd used by a streaming socket,
// who closed, but then proceeds to use the TcpSocket class as if he
// had not closed it.
if ( 1==2 && g_hostdb.m_hostId == 0 ) {
SafeBuf sb;
sb.safePrintf("tcp: open newsd=%i sendbuf=",s->m_sd);
sb.safeTruncateEllipsis (sendBuf,sendBufSize,200);
log("%s",sb.getBufStart());
}
// set up the new TcpSocket for connecting
s->m_state = state;
s->m_callback = callback;
@ -846,6 +858,7 @@ TcpSocket *TcpServer::getNewSocket ( ) {
// . TODO: ensure this blocks even if sd was set nonblock by wrapSock()
if ( ! s ) {
if ( sd == 0 ) log("tcp: closing1 sd of 0");
log("tcp: wrapsocket2 returned null for sd=%i",(int)sd);
if ( ::close(sd) == -1 )
log("tcp: close2(%"INT32") = %s",(int32_t)sd,mstrerror(errno));
else {
@ -1732,6 +1745,8 @@ void writeSocketWrapper ( int sd , void *state ) {
bool wasStreaming = s->m_streamingMode;
// otherwise, call callback on done writing or error
// MDW: if we close the socket descriptor, then a getdiffbotreply
// gets it, we have to know.
THIS->makeCallback ( s );
// if callback changed socket status to ST_SEND_AGAIN
@ -1921,7 +1936,7 @@ int32_t TcpServer::writeSocket ( TcpSocket *s ) {
// another debug
//if ( g_conf.m_logDebugTcp )
log("tcp: only wrote %"INT32" of %"INT32" bytes "
"tried.",n,toSend);
"tried. sd=%i",n,toSend,s->m_sd);
// need to listen for writability now since our write
// failed to write everything out
if ( ! s->m_writeRegistered &&
@ -2260,6 +2275,32 @@ void TcpServer::destroySocket ( TcpSocket *s ) {
// if sd is 0 do not really close it. seems to fix that bug.
// 0 is the FD for stdin so i don't know how that is happening.
if ( sd != 0 ) cret = ::close ( sd );
if ( 1==2 && g_hostdb.m_hostId == 0 ) {
SafeBuf sb;
sb.safePrintf("tcp: closing sd=%i bytessent=%i "
"sendbufused=%i streaming=%i "
"sendbuf=",
s->m_sd,
s->m_sendOffset,
s->m_sendBufUsed,
(int)s->m_streamingMode);
if ( s->m_sendBuf )
sb.safeTruncateEllipsis(s->m_sendBuf,
s->m_sendBufSize,
200);
sb.safePrintf(" readbuf=");
if ( s->m_readBuf )
sb.safeTruncateEllipsis(s->m_readBuf,
s->m_readBufSize,
200);
log("%s",sb.getBufStart());
}
// force it out of streaming mode since we closed it. then we
// should avoid the "not timing out streaming socket fd=123" msgs.
s->m_streamingMode = false;
if ( cret != 0 ) { // == -1 )
log("tcp: s=%"PTRFMT" close(%"INT32") = %"INT32" = %s",
(PTRTYPE)s,(int32_t)sd,cret,mstrerror(errno));
@ -2272,7 +2313,7 @@ void TcpServer::destroySocket ( TcpSocket *s ) {
// log("tcp: closing sock %i (open=%"INT32")",sd,
// m_numOpen-m_numClosed);
// set it negative to try to fix the double close while
// streaming bug.
// streaming bug. -sd -m_sd m_sd = m_sd=
if ( s->m_sd > 0 ) s->m_sd *= -1;
}
// a 2nd close? it should return -1 with errno set!
@ -2574,6 +2615,18 @@ TcpSocket *TcpServer::acceptSocket ( ) {
if ( g_conf.m_logDebugTcp )
logf(LOG_DEBUG,"tcp: ...... accepted sd=%"INT32"",(int32_t)newsd);
// debug to find why sockets getting diffbot replies get commandeered.
// we think that they are using an sd used by a streaming socket,
// who closed, but then proceeds to use the TcpSocket class as if he
// had not closed it.
if ( 1==2 && g_hostdb.m_hostId == 0 ) {
SafeBuf sb;
sb.safePrintf("tcp: accept newsd=%i incoming req",newsd);
//sb.safeTruncateEllipsis (sendBuf,sendBufSize,200);
log("%s",sb.getBufStart());
}
// ssl debug!
//log("tcp: accept returned fd=%i",newsd);
@ -2621,6 +2674,7 @@ TcpSocket *TcpServer::acceptSocket ( ) {
if ( ! s ) {
//log("tcp: wrapsocket returned null fd=%i",newsd);
if ( newsd == 0 ) log("tcp: closing sd of 0");
log("tcp: wrapsocket1 returned null for sd=%i",(int)newsd);
if ( ::close(newsd)== -1 )
log("tcp: close2(%"INT32") = %s",
(int32_t)newsd,mstrerror(errno));
@ -2726,7 +2780,8 @@ bool TcpServer::sslAccept ( TcpSocket *s ) {
void TcpServer::makeCallback ( TcpSocket * s ) {
if ( ! s->m_callback ) {
// note it
log("tcp: null callback for s=0x%"PTRFMT"",(PTRTYPE)s);
if ( g_conf.m_logDebugTcp )
log("tcp: null callback for s=0x%"PTRFMT"",(PTRTYPE)s);
return;
}
// record times for profiler
@ -2777,7 +2832,8 @@ bool TcpServer::sendChunk ( TcpSocket *s ,
// sendChunk() again.
void (* doneSendingWrapper)( void *,TcpSocket *)){
log("tcp: sending chunk of %"INT32" bytes", sb->length() );
log("tcp: sending chunk of %"INT32" bytes sd=%i", sb->length() ,
s->m_sd );
// if socket had shit on there already, free that memory
// just like TcpServer::destroySocket would
@ -2818,6 +2874,11 @@ bool TcpServer::sendChunk ( TcpSocket *s ,
log("tcp: chunkend=%s",sb->getBuf() - minus);
*/
// char *p = sb->getBufStart();
// char *pend = p + sb->length();
// for ( ; p < pend ; p++ ) {
// if ( *p == '\0' ) { char *xx=NULL;*xx=0; }
// }
// . start the send process
// . returns false if send did not complete