fixes for streaming mode.

This commit is contained in:
Matt Wells
2014-02-06 16:28:42 -08:00
parent 5787b15884
commit 874311ae52
4 changed files with 59 additions and 19 deletions

@ -1298,6 +1298,7 @@ bool Msg40::gotSummary ( ) {
m_printi++){
// if we are waiting on our previous send to complete... wait..
if ( m_sendsOut > m_sendsIn ) break;
// otherwise, get the summary for result #m_printi
Msg20 *m20 = m_msg20[m_printi];
if ( ! m20 ) {
@ -1349,7 +1350,20 @@ bool Msg40::gotSummary ( ) {
m_printi >= m_msg3a.m_numDocIds ) {
m_printedTail = true;
printSearchResultsTail ( st );
m_lastChunk = true;
}
// . if everything has been sent on the socket, then we are done!
// . we are likely being called from TcpServer::writeSOcketWrapper()
// calling makeCallback()
if ( m_si &&
m_si->m_streamResults &&
m_sendsIn >= m_sendsOut &&
sb->length() == 0 && m_printedTail ) {
// this will cause the socket to be destroyed immediately!
// and we are only here because our last write completed!
st->m_socket->m_streamingMode = false;
return true;
}
@ -1368,8 +1382,7 @@ bool Msg40::gotSummary ( ) {
! tcp->sendChunk ( st->m_socket ,
sb ,
this ,
doneSendingWrapper9 ,
m_lastChunk ) )
doneSendingWrapper9 ) )
// if it blocked, inc this count. we'll only call m_callback
// above when m_sendsIn equals m_sendsOut... and
// m_numReplies == m_numRequests

@ -557,10 +557,6 @@ bool StateCD::sendList ( ) {
// (long)m_rdbId,(long)m_fmt,(long)m_someoneNeedsMore,
// (long)m_printedEndingBracket);
bool lastChunk = false;
if ( ! m_someoneNeedsMore )
lastChunk = true;
// if nobody needs to read more...
if ( m_rdbId == RDB_TITLEDB &&
m_fmt == FMT_JSON &&
@ -572,6 +568,15 @@ bool StateCD::sendList ( ) {
//log("adding ]. len=%li",sb.length());
}
if ( ! m_someoneNeedsMore && sb.length() == 0 ) {
// i guess the send has completed. we are likely being
// called by TcpServer::writeSocketWrapper() makeCallback()
// so take us out of streaming mode so socket can be
// immediately destroyed
m_socket->m_streamingMode = false;
return true;
}
TcpServer *tcp = &g_httpServer.m_tcp;
// . transmit the chunk in sb
@ -581,13 +586,10 @@ bool StateCD::sendList ( ) {
// . when TcpServer is done transmitting, it does not close the
// socket but rather calls doneSendingWrapper() which can call
// this function again to send another chunk
// . when we are truly done sending all the data, then we set lastChunk
// to true and TcpServer.cpp will destroy m_socket when done
if ( ! tcp->sendChunk ( m_socket ,
&sb ,
this ,
doneSendingWrapper ,
lastChunk ) )
doneSendingWrapper ) )
return false;
// we are done sending this chunk, i guess tcp write was cached

@ -1455,7 +1455,12 @@ void writeSocketWrapper ( int sd , void *state ) {
if ( status == 1 && ! s->m_readBuf ) return;
// good?
g_errno = 0;
// otherwise, call callback on done writing or error
// . otherwise, call callback on done writing or error
// . in m_streamingMode this may call another sendChunk()!!!
// OR it may set streamingMode to false.. it can only do one or
// the other and not both!!! because if it sets streamingMode to
// false then we destroy the socket below!!!! so it can't be
// sending anything new!!!
THIS->makeCallback ( s );
// if callback changed socket status to ST_SEND_AGAIN
@ -1494,6 +1499,8 @@ long TcpServer::writeSocket ( TcpSocket *s ) {
loop:
// send some stuff
long toSend = s->m_sendBufUsed - s->m_sendOffset;
// if nothing to send we are done!
if ( ! toSend ) return 1;
// get a ptr to the msg piece to send
char *msg = s->m_sendBuf;
if ( ! msg ) return 1;
@ -1545,7 +1552,11 @@ long TcpServer::writeSocket ( TcpSocket *s ) {
s->m_sendOffset += n;
// . if we sent less than we tried to send then block
// . we should be notified via sig/callback when we can send the rest
if ( n < toSend ) return 0;
if ( n < toSend ) {
//if ( g_conf.m_logDebugTcp )
// log(".... Tcpserver: %li<%li",n,toSend);
return 0;
}
// . we sent all we were asked to, but our sendBuf may need a refill
// . call this routine to refill it
if ( s->m_totalSent < s->m_totalToSend ) {
@ -2248,8 +2259,7 @@ bool TcpServer::sendChunk ( TcpSocket *s ,
// call this function when done sending this chunk
// so that it can read another chunk and call
// sendChunk() again.
void (* doneSendingWrapper)( void *,TcpSocket *) ,
bool lastChunk ) {
void (* doneSendingWrapper)( void *,TcpSocket *)){
log("tcp: sending chunk of %li bytes", sb->length() );
@ -2264,10 +2274,26 @@ bool TcpServer::sendChunk ( TcpSocket *s ,
s->m_sendOffset = 0;
s->m_totalSent = 0;
s->m_totalToSend = 0;
s->m_totalSent = 0;
// let it know not to close the socket while this is set
if ( ! lastChunk ) s->m_streamingMode = true;
else s->m_streamingMode = false;
//if ( ! lastChunk ) s->m_streamingMode = true;
//else s->m_streamingMode = false;
s->m_streamingMode = true;
//g_conf.m_logDebugTcp = true;
long term = 20;
if ( sb->length() < term ) term = sb->length();
char *cp = sb->getBufStart() + term;
char c = *cp;
*cp = '\0';
log("tcp: chunkstart=%s",sb->getBufStart());
*cp = c;
long minus = 20;
if ( sb->length() < minus ) minus = sb->length() ;
log("tcp: chunkend=%s",sb->getBuf() - minus);
// . start the send process
// . returns false if send did not complete

@ -105,8 +105,7 @@ class TcpServer {
// call this function when done sending this chunk
// so that it can read another chunk and call
// sendChunk() again.
void (* doneSendingWrapper)( void *state,TcpSocket *),
bool lastChunk );
void (* doneSendingWrapper)(void *state,TcpSocket *));
// . returns false if blocked, true otherwise
// . sets errno on error