Simplify logic a bit about streaming results in Msg40::gotSummary()

This commit is contained in:
Ivan Skytte Jørgensen
2017-02-03 14:12:48 +01:00
parent ef22f6eb86
commit d5777114f3

294
Msg40.cpp

@ -1174,164 +1174,164 @@ bool Msg40::gotSummary ( ) {
st->m_sb.reset();
// this is in PageResults.cpp
if ( m_si->m_streamResults && ! m_printedHeader ) {
// only print header once
m_printedHeader = true;
printHttpMime(m_si->m_format,&st->m_sb);
printSearchResultsHeader ( st );
}
if(m_si->m_streamResults) {
// this is in PageResults.cpp
if ( ! m_printedHeader ) {
// only print header once
m_printedHeader = true;
printHttpMime(m_si->m_format,&st->m_sb);
printSearchResultsHeader ( st );
}
for ( ; m_si->m_streamResults && m_printi<m_msg3a.m_numDocIds ; m_printi++) {
// if we are waiting on our previous send to complete... wait..
if ( m_sendsOut > m_sendsIn ) break;
for ( ; m_printi<m_msg3a.m_numDocIds ; m_printi++) {
// if we are waiting on our previous send to complete... wait..
if ( m_sendsOut > m_sendsIn ) break;
// get summary for result #m_printi
Msg20 *m20 = getCompletedSummary ( m_printi );
// get summary for result #m_printi
Msg20 *m20 = getCompletedSummary ( m_printi );
// if result summary #i not yet in, wait...
if ( ! m20 )
break;
// if result summary #i not yet in, wait...
if ( ! m20 )
break;
if ( m20->m_errno ) {
log("msg40: sum #%" PRId32" error: %s",
m_printi,mstrerror(m20->m_errno));
// make it available to be reused
if ( m20->m_errno ) {
log("msg40: sum #%" PRId32" error: %s",
m_printi,mstrerror(m20->m_errno));
// make it available to be reused
m20->reset();
continue;
}
// get the next reply we are waiting on to print results order
Msg20Reply *mr = m20->m_r;
if ( ! mr ) break;
// primitive deduping. for diffbot json exclude url's from the
// XmlDoc::m_contentHash32.. it will be zero if invalid i guess
if ( m_si->m_doDupContentRemoval && // &dr=1
mr->m_contentHash32 &&
// do not dedup CT_STATUS results, those are
// spider reply "documents" that indicate the last
// time a doc was spidered and the error code or success
// code
mr->m_contentType != CT_STATUS &&
m_dedupTable.isInTable ( &mr->m_contentHash32 ) ) {
//if ( g_conf.m_logDebugQuery )
log("msg40: dup sum #%" PRId32" (%" PRIu32")"
"(d=%" PRId64")",m_printi,
mr->m_contentHash32,mr->m_docId);
// make it available to be reused
m20->reset();
continue;
}
// return true with g_errno set on error
if ( m_si->m_doDupContentRemoval && // &dr=1
mr->m_contentHash32 &&
// do not dedup CT_STATUS results, those are
// spider reply "documents" that indicate the last
// time a doc was spidered and the error code or success
// code
mr->m_contentType != CT_STATUS &&
! m_dedupTable.addKey ( &mr->m_contentHash32 ) ) {
log("msg40: error adding to dedup table: %s",
mstrerror(g_errno));
}
// assume we show this to the user
m_numDisplayed++;
//log("msg40: numdisplayed=%" PRId32,m_numDisplayed);
// do not print it if before the &s=X start position though
if ( m_numDisplayed <= m_si->m_firstResultNum ){
if ( m_printCount == 0 )
log("msg40: hiding #%" PRId32" (%" PRIu32")"
"(d=%" PRId64")",
m_printi,mr->m_contentHash32,mr->m_docId);
m_printCount++;
if ( m_printCount == 100 ) m_printCount = 0;
m20->reset();
continue;
}
// . ok, we got it, so print it and stream it
// . this might set m_hadPrintError to true
printSearchResult9 ( m_printi , &m_numPrintedSoFar , mr );
// return it so getAvailMsg20() can use it again
// this will set m_launched to false
m20->reset();
continue;
}
// get the next reply we are waiting on to print results order
Msg20Reply *mr = m20->m_r;
if ( ! mr ) break;
// . set it to true on all but the last thing we send!
// . after each chunk of data we send out, TcpServer::sendChunk
// will call our callback, doneSendingWrapper9
if ( st->m_socket )
st->m_socket->m_streamingMode = true;
// primitive deduping. for diffbot json exclude url's from the
// XmlDoc::m_contentHash32.. it will be zero if invalid i guess
if ( m_si->m_doDupContentRemoval && // &dr=1
mr->m_contentHash32 &&
// do not dedup CT_STATUS results, those are
// spider reply "documents" that indicate the last
// time a doc was spidered and the error code or success
// code
mr->m_contentType != CT_STATUS &&
m_dedupTable.isInTable ( &mr->m_contentHash32 ) ) {
//if ( g_conf.m_logDebugQuery )
log("msg40: dup sum #%" PRId32" (%" PRIu32")"
"(d=%" PRId64")",m_printi,
mr->m_contentHash32,mr->m_docId);
// make it available to be reused
m20->reset();
continue;
// if streaming results, and too many results were clustered or
// deduped then try to get more by merging the docid lists that
// we already have from the shards. if this still does not provide
// enough docids then we will need to issue a new msg39 request to
// each shard to get even more docids from each shard.
if ( // this is coring as well on multi collection federated searches
// so disable that for now too. it is because Msg3a::m_r is
// NULL.
m_numCollsToSearch == 1 &&
// must have no streamed chunk sends out
m_sendsOut == m_sendsIn &&
// if we did not ask for enough docids and they were mostly
// dups so they got deduped, then ask for more.
// m_numDisplayed includes results before the &s=X parm.
// and so does m_docsToGetVisiable, so we can compare them.
m_numDisplayed < m_docsToGetVisible &&
// wait for us to have exhausted the docids we have merged
m_printi >= m_msg3a.m_numDocIds &&
// wait for us to have available msg20s to get summaries
m_numReplies == m_numRequests &&
// this is true if we can get more docids from merging
// more of the termlists from the shards together.
// otherwise, we will have to ask each shard for a
// higher number of docids.
m_msg3a.m_moreDocIdsAvail &&
// do not do this if client closed connection
! m_socketHadError ) { //&&
// can it cover us?
int32_t need = m_msg3a.m_docsToGet + 20;
// note it
log("msg40: too many summaries deduped. "
"getting more "
"docids from msg3a merge and getting summaries. "
"%" PRId32" are visible, need %" PRId32". "
"changing docsToGet from %" PRId32" to %" PRId32". "
"numReplies=%" PRId32" numRequests=%" PRId32,
m_numDisplayed,
m_docsToGetVisible,
m_msg3a.m_docsToGet,
need,
m_numReplies,
m_numRequests);
// merge more docids from the shards' termlists
m_msg3a.m_docsToGet = need;
// this should increase m_msg3a.m_numDocIds
m_msg3a.mergeLists();
}
// return true with g_errno set on error
if ( m_si->m_doDupContentRemoval && // &dr=1
mr->m_contentHash32 &&
// do not dedup CT_STATUS results, those are
// spider reply "documents" that indicate the last
// time a doc was spidered and the error code or success
// code
mr->m_contentType != CT_STATUS &&
! m_dedupTable.addKey ( &mr->m_contentHash32 ) ) {
log("msg40: error adding to dedup table: %s",
mstrerror(g_errno));
// . wrap it up with Next 10 etc.
// . this is in PageResults.cpp
if ( ! m_printedTail &&
m_printi >= m_msg3a.m_numDocIds ) {
m_printedTail = true;
printSearchResultsTail ( st );
if ( m_sendsIn < m_sendsOut ) { g_process.shutdownAbort(true); }
if ( g_conf.m_logDebugTcp )
log("tcp: disabling streamingMode now");
// this will be our final send
if ( st->m_socket ) st->m_socket->m_streamingMode = false;
}
// assume we show this to the user
m_numDisplayed++;
//log("msg40: numdisplayed=%" PRId32,m_numDisplayed);
// do not print it if before the &s=X start position though
if ( m_numDisplayed <= m_si->m_firstResultNum ){
if ( m_printCount == 0 )
log("msg40: hiding #%" PRId32" (%" PRIu32")"
"(d=%" PRId64")",
m_printi,mr->m_contentHash32,mr->m_docId);
m_printCount++;
if ( m_printCount == 100 ) m_printCount = 0;
m20->reset();
continue;
}
// . ok, we got it, so print it and stream it
// . this might set m_hadPrintError to true
printSearchResult9 ( m_printi , &m_numPrintedSoFar , mr );
// return it so getAvailMsg20() can use it again
// this will set m_launched to false
m20->reset();
}
// . set it to true on all but the last thing we send!
// . after each chunk of data we send out, TcpServer::sendChunk
// will call our callback, doneSendingWrapper9
if ( m_si->m_streamResults && st->m_socket )
st->m_socket->m_streamingMode = true;
// if streaming results, and too many results were clustered or
// deduped then try to get more by merging the docid lists that
// we already have from the shards. if this still does not provide
// enough docids then we will need to issue a new msg39 request to
// each shard to get even more docids from each shard.
if ( m_si->m_streamResults &&
// this is coring as well on multi collection federated searches
// so disable that for now too. it is because Msg3a::m_r is
// NULL.
m_numCollsToSearch == 1 &&
// must have no streamed chunk sends out
m_sendsOut == m_sendsIn &&
// if we did not ask for enough docids and they were mostly
// dups so they got deduped, then ask for more.
// m_numDisplayed includes results before the &s=X parm.
// and so does m_docsToGetVisiable, so we can compare them.
m_numDisplayed < m_docsToGetVisible &&
// wait for us to have exhausted the docids we have merged
m_printi >= m_msg3a.m_numDocIds &&
// wait for us to have available msg20s to get summaries
m_numReplies == m_numRequests &&
// this is true if we can get more docids from merging
// more of the termlists from the shards together.
// otherwise, we will have to ask each shard for a
// higher number of docids.
m_msg3a.m_moreDocIdsAvail &&
// do not do this if client closed connection
! m_socketHadError ) { //&&
// can it cover us?
int32_t need = m_msg3a.m_docsToGet + 20;
// note it
log("msg40: too many summaries deduped. "
"getting more "
"docids from msg3a merge and getting summaries. "
"%" PRId32" are visible, need %" PRId32". "
"changing docsToGet from %" PRId32" to %" PRId32". "
"numReplies=%" PRId32" numRequests=%" PRId32,
m_numDisplayed,
m_docsToGetVisible,
m_msg3a.m_docsToGet,
need,
m_numReplies,
m_numRequests);
// merge more docids from the shards' termlists
m_msg3a.m_docsToGet = need;
// this should increase m_msg3a.m_numDocIds
m_msg3a.mergeLists();
}
// . wrap it up with Next 10 etc.
// . this is in PageResults.cpp
if ( m_si->m_streamResults &&
! m_printedTail &&
m_printi >= m_msg3a.m_numDocIds ) {
m_printedTail = true;
printSearchResultsTail ( st );
if ( m_sendsIn < m_sendsOut ) { g_process.shutdownAbort(true); }
if ( g_conf.m_logDebugTcp )
log("tcp: disabling streamingMode now");
// this will be our final send
if ( st->m_socket ) st->m_socket->m_streamingMode = false;
}
} //m_si->m_streamResults
TcpServer *tcp = &g_httpServer.m_tcp;