#include "gb-include.h"

#include "Msg40.h"
#include "Stats.h"        // for timing and graphing time to get all summaries
#include "Collectiondb.h"
#include "LanguageIdentifier.h"
#include "sort.h"
#include "matches2.h"
#include "XmlDoc.h" // computeSimilarity()
#include "Speller.h"
#include "Wiki.h"
#include "HttpServer.h"
#include "PageResults.h"
#include "HashTable.h"
#include "AdultCheck.h"
#include "Process.h"
#include "UrlRealtimeClassification.h"
#include "UdpServer.h"
#include "Conf.h"
#include "GbMutex.h"
#include "ScopedLock.h"
#include "Mem.h"
#include "ScopedLock.h"
#include <new>


// increasing this doesn't seem to improve performance any on a single
// node cluster....
#define MAX_OUTSTANDING_MSG20S 200

static bool printHttpMime(int32_t format, SafeBuf *sb);

static void gotDocIdsWrapper             ( void *state );
static bool gotSummaryWrapper            ( void *state );

static bool isVariantLikeSubDomain(const char *s, int32_t len);

Msg40::Msg40()
  : m_numRealtimeClassificationsStarted(0),
    m_numRealtimeClassificationsCompleted(0),
    m_mtxRealtimeClassificationsCounters(),
    m_realtimeClassificationsSubmitted(false)
{
	m_socketHadError = 0;
	m_buf           = NULL;
	m_buf2          = NULL;
	m_msg20         = NULL;
	m_numMsg20s     = 0;
	m_msg20StartBuf = NULL;
	m_numToFree     = 0;
	// new stuff for streaming results:
	m_numPrinted    = 0;
	m_printedHeader = false;
	m_printedTail   = false;
	m_sendsOut      = 0;
	m_sendsIn       = 0;
	m_printi        = 0;
	m_numDisplayed  = 0;
	m_numPrintedSoFar = 0;
	m_didSummarySkip = false;
	m_omitCount      = 0;
	m_printCount = 0;
	m_numCollsToSearch = 0;
	m_numMsg20sIn = 0;
	m_numMsg20sOut = 0;

	// Coverity
	m_msg3aRecallCnt = 0;
	m_docsToGet = 0;
	m_docsToGetVisible = 0;
	m_state = NULL;
	m_callback = NULL;
	m_numRequests = 0;
	m_numReplies = 0;
	m_moreToCome = false;
	m_lastProcessedi = 0;
	m_startTime = 0;
	m_cachedTime = 0;
	m_tasksRemaining = 0;
	m_bufMaxSize = 0;
	m_bufMaxSize2 = 0;
	m_errno = 0;
	m_si = NULL;
	m_msg3aPtrs = NULL;
	m_num3aRequests = 0;
	m_num3aReplies = 0;
	m_firstCollnum = 0;
}

void Msg40::resetBuf2 ( ) {
	// remember num to free in reset() function
	char *p = m_msg20StartBuf;
	// msg20 destructors
	for ( int32_t i = 0 ; i < m_numToFree ; i++ ) {
		// cast it
		Msg20 *m = (Msg20 *)p;
		// free its stuff
		m->~Msg20();
		// advance
		p += sizeof(Msg20);
	}
	// now free the msg20 ptrs and buffer space
	if ( m_buf2 ) mfree ( m_buf2 , m_bufMaxSize2 , "Msg40b" );
	m_buf2 = NULL;
}

Msg40::~Msg40() {
	// free tmp msg3as now
	for ( int32_t i = 0 ; i < m_numCollsToSearch ; i++ ) {
		if ( ! m_msg3aPtrs[i] ) continue;
		if ( m_msg3aPtrs[i] == &m_msg3a ) continue;
		mdelete ( m_msg3aPtrs[i] , sizeof(Msg3a), "tmsg3a");
		delete  ( m_msg3aPtrs[i] );
		m_msg3aPtrs[i] = NULL;
	}
	if ( m_buf  ) mfree ( m_buf  , m_bufMaxSize  , "Msg40" );
	m_buf  = NULL;
	resetBuf2();
}

// . returns false if blocked, true otherwise
// . sets g_errno on error
// . uses Msg3a to get docIds
// . uses many msg20s to get title/summary/url/docLen for each docId
bool Msg40::getResults ( SearchInput *si      ,
			 bool         forward ,
			 void        *state   ,
			 void   (* callback) ( void *state ) ) {

	m_omitCount = 0;

	// warning
	//if ( ! si->m_coll2 ) log(LOG_LOGIC,"net: NULL collection. msg40.");
	if ( si->m_collnumBuf.length() < (int32_t)sizeof(collnum_t) )
		log(LOG_LOGIC,"net: NULL collection. msg40.");


	m_lastProcessedi = -1;
	m_didSummarySkip = false;

	m_si             = si;
	m_state          = state;
	m_callback       = callback;
	m_msg3aRecallCnt = 0;
	// we haven't allocated any Msg20s yet
	m_numMsg20s      = 0;
	// reset our error keeper
	m_errno = 0;

	// take search parms i guess from first collnum
	const collnum_t *cp = (const collnum_t *)m_si->m_collnumBuf.getBufStart();

	// get the collection rec
	CollectionRec *cr =g_collectiondb.getRec( cp[0] );

	// g_errno should be set if not found
	if ( ! cr ) { g_errno = ENOCOLLREC; return true; }

	// save that
	m_firstCollnum = cr->m_collnum;

	// . reset these
	// . Next X Results links? yes or no?
	m_moreToCome = false;
	// set this to zero -- assume not in cache
	m_cachedTime = 0;

	// bail now if 0 results requested!
	// note that we then don't stream anything if in streaming mode.
	if ( m_si->m_docsWanted == 0 ) {
		log("msg40: setting streamresults to false. n=0.");
		m_si->m_streamResults = false;
		return true;
	}

	// or if no query terms
	if ( m_si->m_q.m_numTerms <= 0 ) {
		log("msg40: setting streamresults to false. numTerms=0.");
		m_si->m_streamResults = false;
		return true;
	}

	//enforce hard limits
	if(m_si->m_docsWanted > g_conf.m_maxDocsWanted) {
		log(LOG_DEBUG,"msg40: limiting docs-wanted from %d to %d", m_si->m_docsWanted, g_conf.m_maxDocsWanted);
		m_si->m_docsWanted = g_conf.m_maxDocsWanted;
	}
	if(m_si->m_firstResultNum > g_conf.m_maxFirstResultNum) {
		log(LOG_DEBUG,"msg40: limiting docs-offset from %d to %d", m_si->m_firstResultNum, g_conf.m_maxFirstResultNum);
		m_si->m_firstResultNum = g_conf.m_maxFirstResultNum;
	}
	// how many docids do we need to get?
	int32_t get = m_si->m_docsWanted + m_si->m_firstResultNum ;
	// we get one extra so we can set m_moreToCome and know whether
	// more docids can be gotten (i.e. show a "Next 10" link)
	get++;

	// ok, need some sane limit though to prevent malloc from 
	// trying to get 7800003 docids and going ENOMEM
	if ( get > MAXDOCIDSTOCOMPUTE ) {
		log("msg40: asking for too many docids. reducing to %" PRId32,
		    (int32_t)MAXDOCIDSTOCOMPUTE);
		get = MAXDOCIDSTOCOMPUTE;
	}
	// this is how many visible results we need, after filtering/clustering
	m_docsToGetVisible = get;

	// . get a little more since this usually doesn't remove many docIds
	// . deduping is now done in Msg40.cpp once the summaries are gotten
	if ( m_si->m_doDupContentRemoval ) get = (get*120LL)/100LL;

	// . ALWAYS get at least this many
	// . this allows Msg3a to allow higher scoring docids in tier #1 to
	//   outrank lower-scoring docids in tier #0, even if such docids have
	//   all the query terms explicitly. and we can guarantee consistency
	//   as long as we only allow for this outranking within the first
	//   MIN_DOCS_TO_GET docids.
	if ( get < MIN_DOCS_TO_GET ) get = MIN_DOCS_TO_GET;
	// this is how many docids to get total, assuming that some will be
	// filtered out for being dups, etc. and that we will have at least
	// m_docsToGetVisible leftover that are unfiltered and visible. so
	// we tell each msg39 split to get more docids than we actually want
	// in anticipation some will be filtered out in this class.
	m_docsToGet = get;
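	// illustrative walk-through with made-up numbers, assuming dup removal
	// is on and MIN_DOCS_TO_GET < 38: docsWanted=10, firstResultNum=20
	//   -> get = 10+20+1 = 31 (the +1 is only there for m_moreToCome)
	//   -> m_docsToGetVisible = 31
	//   -> get = 31*120/100 = 37 -> m_docsToGet = 37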

	// debug msg
	if ( m_si->m_debug ) 
		logf(LOG_DEBUG,"query: msg40 mapped %" PRId32" wanted to %" PRId32" to get",
		     m_docsToGetVisible,m_docsToGet );

	// let's try using msg 0xfd like Proxy.cpp uses to forward an http
	// request! then we just need to specify the ip of the proxy and we
	// do not need hosts2.conf!
	if ( forward ) { g_process.shutdownAbort(true); }

	// time the cache lookup
	if ( g_conf.m_logTimingQuery || m_si->m_debug || g_conf.m_logDebugQuery) 
		m_startTime = gettimeofdayInMilliseconds();

	// keep going
	bool status = prepareToGetDocIds ( );

	if ( status && m_si->m_streamResults ) {
		log("msg40: setting streamresults to false. "
		    "prepare did not block.");
		m_si->m_streamResults = false;
	}

	return status;
}

bool Msg40::prepareToGetDocIds ( ) {

	// log the time it took for cache lookup
	if ( g_conf.m_logTimingQuery || m_si->m_debug || g_conf.m_logDebugQuery) {
		int64_t now  = gettimeofdayInMilliseconds();
		int64_t took = now - m_startTime;
		logf(LOG_TIMING,"query: [%" PTRFMT"] Not found in cache. Lookup took %" PRId64" ms.",(PTRTYPE)this,took);
		m_startTime = now;
		logf(LOG_TIMING,"query: msg40: [%" PTRFMT"] Getting up to %" PRId32" (docToGet=%" PRId32") docids", (PTRTYPE)this,
		     m_docsToGetVisible,  m_docsToGet);
	}

	// . if the query has dirty words and the family filter is on, then
	//   return no results (we reset m_msg3a so m_numDocIds is 0)
	// . m_sbuf1 should be the advanced/composite query
	if ( m_si->m_familyFilter && 
	     getAdultPoints ( m_si->m_sbuf1.getBufStart() , 
			      m_si->m_sbuf1.length() , 
			      NULL ) ) {
		// make sure the m_numDocIds gets set to 0
		m_msg3a.reset();
		return true;
	}

	return getDocIds( false );
}

// . returns false if blocked, true otherwise
// . sets g_errno on error
bool Msg40::getDocIds ( bool recall ) {

	////
	//
	// NEW CODE FOR LAUNCHING one MSG3a per collnum to search a token
	//
	////
	m_num3aReplies = 0;
	m_num3aRequests = 0;

	// how many are we searching? usually just one.
	m_numCollsToSearch = m_si->m_collnumBuf.length() /sizeof(collnum_t);

	// make enough for ptrs
	int32_t need = sizeof(Msg3a *) * m_numCollsToSearch;
	if ( ! m_msg3aPtrBuf.reserve ( need ) ) return true;
	// cast the mem buffer
	m_msg3aPtrs = (Msg3a **)m_msg3aPtrBuf.getBufStart();

	// clear these out so we do not free them when destructing
	for ( int32_t i = 0 ; i < m_numCollsToSearch ;i++ )
		m_msg3aPtrs[i] = NULL;

	// use the first ptr when we are only searching one coll, the std case
	if ( m_numCollsToSearch <= 1 )
		m_msg3aPtrs[0] = &m_msg3a;

	return federatedLoop();
}

bool Msg40::federatedLoop ( ) {

	// search the provided collnums (collections)
	const collnum_t *cp = (const collnum_t *)m_si->m_collnumBuf.getBufStart();

	// m_rcache is true if we should read from the cache
	int32_t maxAge = 0 ;
	if ( m_si->m_rcache ) maxAge = g_conf.m_indexdbMaxIndexListAge;


	// reset it
	Msg39Request mr;
	mr.reset();

	mr.m_maxAge                    = maxAge;
	mr.m_addToCache                = m_si->m_wcache;
	mr.m_docsToGet                 = m_docsToGet;
	mr.m_niceness                  = m_si->m_niceness;
	mr.m_debug                     = m_si->m_debug          ;
	mr.m_getDocIdScoringInfo       = m_si->m_getDocIdScoringInfo;
	mr.m_doSiteClustering          = m_si->m_doSiteClustering    ;
	mr.m_hideAllClustered          = m_si->m_hideAllClustered;
	mr.m_familyFilter              = m_si->m_familyFilter;
	mr.m_doMaxScoreAlgo            = m_si->m_doMaxScoreAlgo;
	mr.m_scoringWeights.init(m_si->m_diversityWeightMin, m_si->m_diversityWeightMax,
				 m_si->m_densityWeightMin, m_si->m_densityWeightMax,
				 m_si->m_hashGroupWeightBody,
				 m_si->m_hashGroupWeightTitle,
				 m_si->m_hashGroupWeightHeading,
				 m_si->m_hashGroupWeightInlist,
				 m_si->m_hashGroupWeightInMetaTag,
				 m_si->m_hashGroupWeightInLinkText,
				 m_si->m_hashGroupWeightInTag,
				 m_si->m_hashGroupWeightNeighborhood,
				 m_si->m_hashGroupWeightInternalLinkText,
				 m_si->m_hashGroupWeightInUrl,
				 m_si->m_hashGroupWeightInMenu);

	mr.m_termFreqWeightFreqMin = m_si->m_termFreqWeightFreqMin;
	mr.m_termFreqWeightFreqMax = m_si->m_termFreqWeightFreqMax;
	mr.m_termFreqWeightMin = m_si->m_termFreqWeightMin;
	mr.m_termFreqWeightMax = m_si->m_termFreqWeightMax;

	mr.m_synonymWeight             = m_si->m_synonymWeight;
	mr.m_pageTemperatureWeightMin = m_si->m_pageTemperatureWeightMin;
	mr.m_pageTemperatureWeightMax = m_si->m_pageTemperatureWeightMax;

	mr.m_usePageTemperatureForRanking = m_si->m_usePageTemperatureForRanking;
	memcpy(mr.m_flagScoreMultiplier, m_si->m_flagScoreMultiplier, sizeof(mr.m_flagScoreMultiplier));
	memcpy(mr.m_flagRankAdjustment, m_si->m_flagRankAdjustment, sizeof(mr.m_flagRankAdjustment));
	mr.m_doDupContentRemoval       = m_si->m_doDupContentRemoval ;
	mr.m_queryExpansion            = m_si->m_queryExpansion; 
	mr.m_familyFilter              = m_si->m_familyFilter        ;
	mr.m_allowHighFrequencyTermCache = m_si->m_allowHighFrequencyTermCache;
	mr.m_language                  = (unsigned char)m_si->m_queryLangId;
	mr.ptr_query                   = const_cast<char*>(m_si->m_q.originalQuery());
	mr.size_query                  = strlen(m_si->m_q.originalQuery())+1;
	int32_t slen = 0; if ( m_si->m_sites ) slen=strlen(m_si->m_sites)+1;
	mr.ptr_whiteList               = m_si->m_sites;
	mr.size_whiteList              = slen;
	mr.m_timeout                   = g_conf.m_msg40_msg39_timeout;
	mr.m_realMaxTop                = m_si->m_realMaxTop;

	mr.m_minSerpDocId              = m_si->m_minSerpDocId;
	mr.m_maxSerpScore              = m_si->m_maxSerpScore;
	mr.m_sameLangWeight            = m_si->m_sameLangWeight;
	mr.m_unknownLangWeight = m_si->m_unknownLangWeight;
	memcpy(mr.m_queryId, m_si->m_queryId, sizeof(m_si->m_queryId));

	if ( mr.m_timeout < m_si->m_minMsg3aTimeout )
		mr.m_timeout = m_si->m_minMsg3aTimeout;
	
	//
	// how many docid splits should we do to avoid going OOM?
	//
	const CollectionRec *cr = g_collectiondb.getRec(m_firstCollnum);
	RdbBase *base = NULL;
	if ( cr ) g_titledb.getRdb()->getBase(cr->m_collnum);
	//NOTE: the above line discards its result so 'base' stays NULL. That is
	//a bug, but the obvious fix (assigning to base) causes numDocIdSplits
	//to become huge (eg 200), so it is left as-is for now.
	int64_t numDocs = 0;
	if ( base ) numDocs = base->getNumTotalRecs();
	// for every 5M docids per host, let's split up the docid range
	// to avoid going OOM
	int32_t mult = numDocs / 5000000;
	if ( mult <= 0 ) mult = 1;

	int32_t nt = m_si->m_q.getNumTerms();
	int32_t numDocIdSplits = nt / 2;
	if ( numDocIdSplits <= 0 ) numDocIdSplits = 1;
	// and mult based on index size
	numDocIdSplits *= mult;
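	// illustrative example: a 4-term query gives nt/2 = 2 splits; if this
	// host had ~12M docids then mult would be 2 and we would end up with 4
	// splits, before the config clamping below. (with base NULL, as noted
	// above, numDocs stays 0 and mult stays 1 in practice)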

	if ( cr ) mr.m_maxQueryTerms = cr->m_maxQueryTerms; 
	else      mr.m_maxQueryTerms = 100;

	// limit numDocIdSplits to the range specified in the configuration
	if ( numDocIdSplits < g_conf.min_docid_splits ) 
		numDocIdSplits = g_conf.min_docid_splits;
	if ( numDocIdSplits > g_conf.max_docid_splits ) 
		numDocIdSplits = g_conf.max_docid_splits;
	log(LOG_DEBUG,"query: Msg40::federatedLoop: numDocIdSplits=%d", numDocIdSplits);
	// store it in the request now
	mr.m_numDocIdSplits = numDocIdSplits;

	int32_t maxOutMsg3as = 1;

	// create new ones if searching more than 1 coll
	for ( int32_t i = m_num3aRequests ; i < m_numCollsToSearch ; i++ ) {

		// do not have more than this many outstanding
		if ( m_num3aRequests - m_num3aReplies >= maxOutMsg3as )
			// wait for it to return before launching another
			return false;

		// get it
		Msg3a *mp = m_msg3aPtrs[i];
		// allocate a new Msg3a if we are not re-using the embedded m_msg3a
		if ( ! mp ) {
			try { mp = new Msg3a; }
			catch(std::bad_alloc&) {
				g_errno = ENOMEM;
				return true;
			}
			mnew(mp,sizeof(Msg3a),"tm3ap");
		}
		// assign it
		m_msg3aPtrs[i] = mp;
		// assign the request for it
		gbmemcpy ( &mp->m_msg39req , &mr , sizeof(Msg39Request) );
		// then customize it to just search this collnum
		mp->m_msg39req.m_collnum = cp[i];

		// launch a search request
		m_num3aRequests++;
		// this returns false if it would block and will call callback
		// m_si is actually contained in State0 in PageResults.cpp
		// and Msg40::m_si points to that. so State0's destructor
		// should call SearchInput's destructor which calls
		// Query's destructor to destroy &m_si->m_q here when done.
		if(!mp->getDocIds(m_si,&m_si->m_q,this,gotDocIdsWrapper))
			continue;
		if ( g_errno && ! m_errno ) 
			m_errno = g_errno;
		m_num3aReplies++;
	}

	// call again w/o parameters now
	return gotDocIds ( );
}	

// . callback for Msg3a::getDocIds()
// . launches more Msg3as if there are more collections left to search,
//   otherwise finishes up via gotDocIds() and calls the final callback
void gotDocIdsWrapper ( void *state ) {
	Msg40 *THIS = (Msg40 *) state;
	// if this blocked, it returns false
	//if ( ! checkTurnOffRAT ( state ) ) return;
	THIS->m_num3aReplies++;
	// try to launch more if there are more colls left to search
	if ( THIS->m_num3aRequests < THIS->m_numCollsToSearch ) {
		THIS->federatedLoop ( );
		return;
	}
	// return if this blocked
	if ( ! THIS->gotDocIds() ) return;
	// now call callback, we're done
	THIS->m_callback ( THIS->m_state );
}

// . return false if blocked, true otherwise
// . sets g_errno on error
bool Msg40::gotDocIds ( ) {

	// return now if still waiting for a msg3a reply to get in
	if ( m_num3aReplies < m_num3aRequests ) return false;


	// if searching over multiple collections let's merge their docids
	// into m_msg3a now before we go forward
	// this will set g_errno on error, like oom
	if ( ! mergeDocIdsIntoBaseMsg3a() )
		log("msg40: error: %s",mstrerror(g_errno));

	adjustRankingBasedOnFlags();

	// log the time it took to get the docids
	int64_t now  = gettimeofdayInMilliseconds();

	if ( g_conf.m_logTimingQuery || m_si->m_debug||g_conf.m_logDebugQuery){
		int64_t took = now - m_startTime;
		logf(LOG_DEBUG,"query: msg40: [%" PTRFMT"] Got %" PRId32" docids in %" PRId64" ms",
		     (PTRTYPE)this,m_msg3a.getNumDocIds(),took);
		logf(LOG_DEBUG,"query: msg40: [%" PTRFMT"] Getting up to %" PRId32" summaries",
		     (PTRTYPE)this,m_docsToGetVisible);
	}

	// save any covered up error
	if ( ! m_errno && m_msg3a.m_errno ) m_errno = m_msg3a.m_errno;
	//sanity check.  we might not have allocated due to out of memory
	if ( g_errno ) { m_errno = g_errno; return true; }

	// time this
	m_startTime = gettimeofdayInMilliseconds();

	// we haven't got any Msg20 responses as of yet or sent any requests
	m_numRequests  =  0;
	m_numReplies   =  0;

	if ( ! m_urlTable.set ( m_msg3a.getNumDocIds() * 2 ) ) {
		m_errno = g_errno;
		log("query: Failed to allocate memory for url deduping. Not deduping search results.");
		return true;
	}

	// if only getting docids, skip summaries and references
	if ( m_si->m_docIdsOnly ) return true;

	// . alloc buf to hold all m_msg20[i] ptrs and the Msg20s they point to
	// . returns false and sets g_errno/m_errno on error
	// . salvage any Msg20s that we can if we are being re-called
	if ( ! reallocMsg20Buf() ) return true;

	// . launch a bunch of tasks that depend on the docids we got
	// . keep track of how many are out
	m_tasksRemaining = 0;

	// debug msg
	if ( m_si->m_debug || g_conf.m_logDebugQuery )
		logf(LOG_DEBUG,"query: [%" PTRFMT"] Getting reference pages and dir pages.",
		     (PTRTYPE)this);

	return launchMsg20s ( false );
}

bool Msg40::mergeDocIdsIntoBaseMsg3a() {

	// only do this if we were searching multiple collections, otherwise
	// all the docids are already in m_msg3a
	if ( m_numCollsToSearch <= 1 ) return true;
	
	// free any mem in use
	m_msg3a.reset();

	// count total docids into "td"
	int32_t td = 0;
	for ( int32_t i = 0 ; i < m_numCollsToSearch ; i++ ) {
		Msg3a *mp = m_msg3aPtrs[i];
		td += mp->getNumDocIds();
		// reset cursor for list of docids from this collection
		mp->m_cursor = 0;
		// add up here too
		m_msg3a.m_numTotalEstimatedHits += mp->m_numTotalEstimatedHits;
	}

	// set up to merge all msg3as into our one m_msg3a
	int32_t need = 0;
	need += td * 8;
	need += td * sizeof(double);
	need += td * sizeof(unsigned);
	need += td * sizeof(key96_t);
	need += td * 1;
	need += td * sizeof(collnum_t);
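	// the single allocation is carved up below in this fixed order:
	// docIds (8 bytes each), scores (double), flags (unsigned),
	// clusterRecs (key96_t), clusterLevels (1 byte each), collnums
	// (collnum_t); the sanity check after the parse verifies the sizes
	// add up to "need"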
	// make room for the merged docids
	m_msg3a.m_finalBuf =  (char *)mmalloc ( need , "finalBuf" );
	m_msg3a.m_finalBufSize = need;
	// return true with g_errno set
	if ( ! m_msg3a.m_finalBuf ) return true;
	// parse the memory up into arrays
	char *p = m_msg3a.m_finalBuf;
	m_msg3a.m_docIds        = (int64_t *)p; p += td * 8;
	m_msg3a.m_scores        = (double    *)p; p += td * sizeof(double);
	m_msg3a.m_flags         = (unsigned*)p; p += td * sizeof(unsigned);
	m_msg3a.m_clusterRecs   = (key96_t     *)p; p += td * sizeof(key96_t);
	m_msg3a.m_clusterLevels = (char      *)p; p += td * 1;
	m_msg3a.m_scoreInfos    = NULL;
	m_msg3a.m_collnums      = (collnum_t *)p; p += td * sizeof(collnum_t);
	if ( p - m_msg3a.m_finalBuf != need ) { g_process.shutdownAbort(true); }

	m_msg3a.m_numDocIds = td;

	//
	// begin the collection merge
	//

	for(int32_t next = 0; next<td; next++) {
		// get next biggest score in the msg3as
		double max  = -1000000000.0;
		Msg3a *maxmp = NULL;
		for ( int32_t i = 0 ; i < m_numCollsToSearch ; i++ ) {
			// shortcut
			Msg3a *mp = m_msg3aPtrs[i];
			// get cursor
			int32_t cursor = mp->m_cursor;
			// skip if exhausted
			if ( cursor >= mp->m_numDocIds ) continue;
			// get his next score
			double score = mp->m_scores[ cursor ];
			if ( score <= max ) continue;
			// got a new winner
			max = score;
			maxmp = mp;
		}

		if(!maxmp )
			break; //done
		// store him
		m_msg3a.m_docIds  [next] = maxmp->m_docIds[maxmp->m_cursor];
		m_msg3a.m_scores  [next] = maxmp->m_scores[maxmp->m_cursor];
		m_msg3a.m_flags   [next] = maxmp->m_flags[maxmp->m_cursor];
		m_msg3a.m_collnums[next] = maxmp->m_msg39req.m_collnum;
		m_msg3a.m_clusterLevels[next] = CR_OK;
		maxmp->m_cursor++;
	}
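	// note: this is a simple selection-style k-way merge; every pass scans
	// each collection's cursor for the highest remaining score, so it is
	// O(td * numCollsToSearch). that is fine because we rarely search more
	// than a handful of collections at once.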

	// free tmp msg3as now
	for ( int32_t i = 0 ; i < m_numCollsToSearch ; i++ ) {
		if ( m_msg3aPtrs[i] == &m_msg3a ) continue;
		mdelete ( m_msg3aPtrs[i] , sizeof(Msg3a), "tmsg3a");
		delete  ( m_msg3aPtrs[i] );
		m_msg3aPtrs[i] = NULL;
	}

	return true;
}


//adjust the order of the results based on the flags of the documents
void Msg40::adjustRankingBasedOnFlags() {
	int *rank = (int*)mmalloc(m_msg3a.m_numDocIds * sizeof(int), "ranksort");
	if(!rank)
		return; //not a serious problem
	
	for(int i=0; i<m_msg3a.m_numDocIds; i++) {
		unsigned flags = m_msg3a.m_flags[i];
		int adjustment = 0;
		if(flags) {
			for(int bit=0; bit<26; bit++)
				if((1<<bit)&flags)
					adjustment += m_si->m_flagRankAdjustment[bit];
			rank[i] = i + adjustment;
		} else
			rank[i] = i;
	}
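	// illustrative example: if the result at position 5 has a flag bit
	// whose m_flagRankAdjustment is +3, its sort key becomes 8 and the
	// insertion sort below moves it down about three places; a negative
	// adjustment moves it up.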
	
	// There are no library functions to sort multiple side-by-side arrays.
	// Fortunately, the positions are almost sorted, so a plain insertion
	// sort is the ideal algorithm.
	for(int i=1; i<m_msg3a.m_numDocIds; i++) {
		for(int j=i; j>0 && rank[j-1] > rank[j]; j--) {
			std::swap(m_msg3a.m_docIds[j],m_msg3a.m_docIds[j-1]);
			std::swap(m_msg3a.m_scores[j],m_msg3a.m_scores[j-1]);
			std::swap(m_msg3a.m_flags[j],m_msg3a.m_flags[j-1]);
			if(m_msg3a.m_collnums)
				std::swap(m_msg3a.m_collnums[j],m_msg3a.m_collnums[j-1]);
			std::swap(m_msg3a.m_clusterLevels[j],m_msg3a.m_clusterLevels[j-1]);
			std::swap(rank[j],rank[j-1]);
		}
	}
	
	mfree(rank,m_msg3a.m_numDocIds * sizeof(int),"ranksort");
}


// . returns false and sets g_errno/m_errno on error
// . makes m_msg3a.m_numDocIds ptrs to Msg20s. 
// . does not allocate a Msg20 in the buffer if the m_msg3a.m_clusterLevels[i]
//   is something other than CR_OK
bool Msg40::reallocMsg20Buf ( ) {

	// if the user only requested docids, we have no summaries
	if ( m_si->m_docIdsOnly ) return true;

	// . allocate m_buf2 to hold all our Msg20 pointers and Msg20 classes
	// . how much mem do we need?
	// . need space for the msg20 ptrs
	int64_t need = m_msg3a.m_numDocIds * sizeof(Msg20 *);
	// need space for the classes themselves, only if "visible" though
	for ( int32_t i = 0 ; i < m_msg3a.m_numDocIds ; i++ ) 
		if ( m_msg3a.m_clusterLevels[i] == CR_OK ) 
			need += sizeof(Msg20);

	// MDW: try to preserve the old Msg20s if we are being re-called
	if ( m_buf2 ) {
		// we do not do recalls when streaming yet
		if ( m_si->m_streamResults ) { g_process.shutdownAbort(true); }

		// make new buf
		char *newBuf = (char *)mmalloc(need,"Msg40d");
		// return false if it fails
		if ( ! newBuf ) { m_errno = g_errno; return false; }
		// fill it up
		char *p = newBuf;
		// point to our new array of Msg20 ptrs
		Msg20 **tmp = (Msg20 **)p;
		// skip over pointer array
		p += m_msg3a.m_numDocIds * sizeof(Msg20 *);
		// record start to set to m_msg20StartBuf
		char *pstart = p;
		// and count for m_numToFree
		int32_t pcount = 0;
		// fill in the actual Msg20s from the old buffer
		for ( int32_t i = 0 ; i < m_msg3a.m_numDocIds ; i++ ) {
			// assume empty, because clustered, filtered, etc.
			tmp[i] = NULL;
			// if clustered, keep it as a NULL ptr
			if ( m_msg3a.m_clusterLevels[i] != CR_OK ) continue;
			// point it to its memory
			tmp[i] = (Msg20 *)p;
			// point to the next Msg20
			p += sizeof(Msg20);
			// init it
			new (tmp[i]) Msg20();
			// count it
			pcount++;
			// skip it if it is a new docid, we do not have a Msg20
			// for it from the previous tier. IF it is from
			// the current tier, THEN it is new.
			//if ( m_msg3a.m_tiers[i] == m_msg3a.m_tier ) continue;
			// see if we can find this docid from the old list!
			int32_t k = 0;
			for ( ; k < m_numMsg20s ; k++ ) {
				// skip if NULL
				if ( ! m_msg20[k] ) continue;
				// if it never gave us a reply then skip it
				if ( ! m_msg20[k]->m_gotReply ) continue;
				//or if it had an error
				if ( m_msg20[k]->m_errno ) continue;
				// skip if no match
				if ( m_msg3a    .m_docIds[i] !=
				     m_msg20[k]->m_r->m_docId )
					continue;
				// we got a match, grab its Msg20
				break;
			}
			// . skip if we could not match it... strange...
			// . no, because it may have been in the prev tier,
			//   from a split, but it was not in msg3a's final 
			//   merged list made in Msg3a::mergeLists(), but now 
			//   it is in there, with the previous tier, because
			//   we asked for more docids from msg3a.
			// . NO! why did we go to the next tier unnecessarily
			//   THEN? no again, because we did a msg3a recall
			//   and asked for more docids which required us
			//   going to the next tier, even though some (but
			//   not enough) docids remained in the previous tier.
			if ( k >= m_numMsg20s ) {
				/*
				logf(LOG_DEBUG,"query: msg40: could not match "
				     "docid %" PRId64" (max=%" PRId32") "
				     "to msg20. newBitScore=0x%hhx q=%s",
				     m_msg3a.m_docIds[i],
				     (char)m_msg3a.m_bitScores[i],
				     m_msg3a.m_q->m_orig);
				*/
				continue;
			}
			// it is from an older tier but never got the msg20 
			// for it? what happened? it got unclustered??
			if ( ! m_msg20[k] ) continue;

			// . otherwise copy the memory if available
			// . if m_msg20[i]->m_docId is set this will save us
			//   repeating a summary lookup
			tmp[i]->moveFrom(m_msg20[k]);
		}
		// sanity check
		if ( p - (char *)tmp != need ) { g_process.shutdownAbort(true); }

		resetBuf2();

		// the new buf2 stuff
		m_numToFree     = pcount;
		m_msg20StartBuf = pstart;

		// re-assign the msg20 ptr to the ptrs
		m_msg20 = tmp;
		// update new count
		m_numMsg20s = m_msg3a.m_numDocIds;

		// assign to new mem
		m_buf2        = newBuf;
		m_bufMaxSize2 = need;

		// all done
		return true;
	}

	m_numMsg20s = m_msg3a.m_numDocIds;

	// when streaming because we can have hundreds of thousands of
	// search results we recycle a few msg20s to save mem
	if ( m_si->m_streamResults ) {
		int32_t max = MAX_OUTSTANDING_MSG20S * 2;
		if ( m_msg3a.m_numDocIds < max ) max = m_msg3a.m_numDocIds;
		need = 0;
		need += max * sizeof(Msg20 *);
		need += max * sizeof(Msg20);
		m_numMsg20s = max;
	}

	m_buf2        = NULL;
	m_bufMaxSize2 = need;

	// do the alloc
	if ( need ) m_buf2 = (char *)mmalloc ( need ,"Msg40msg20");
	if ( need && ! m_buf2 ) { m_errno = g_errno; return false; }
	// point to the mem
	char *p = m_buf2;
	// point to the array, then make p point to the Msg20 buffer space
	m_msg20 = (Msg20 **)p; 
	p += m_numMsg20s * sizeof(Msg20 *);
	// start free here
	m_msg20StartBuf = p;
	// set the m_msg20[] array to use this memory, m_buf20
	for ( int32_t i = 0 ; i < m_numMsg20s ; i++ ) {
		// assume empty
		m_msg20[i] = NULL;
		// if clustered, do a NULL ptr
		if ( m_msg3a.m_clusterLevels[i] != CR_OK ) continue;
		// point it to its memory
		m_msg20[i] = (Msg20 *)p;
		// call its constructor
		new (m_msg20[i]) Msg20();
		// point to the next Msg20
		p += sizeof(Msg20);
		// remember num to free in reset() function
		m_numToFree++;
	}

	return true;
}

bool Msg40::launchMsg20s(bool recalled) {

	if( !m_si ) {
		logError("cannot use this function when m_si is not set");
		gbshutdownLogicError();
	}

	// don't launch any more if client browser closed socket
	if ( m_socketHadError ) {
		g_process.shutdownAbort(true);
	}

	// these are just like the ones we pass to Msg39 above
	int64_t maxCacheAge = 0 ;
	// make it somewhat jibe with the search results caching, otherwise
	// it will tell me a search result was indexed like 3 days ago
	// when it was just indexed 10 minutes ago because the
	// titledbMaxCacheAge was set way too high
	if ( m_si->m_rcache )
		maxCacheAge = g_conf.m_docSummaryWithDescriptionMaxCacheAge;

	int32_t maxOut = (int32_t)MAX_OUTSTANDING_MSG20S;
	if ( g_udpServer.getNumUsedSlots() > 500 ) maxOut = 10;
	if ( g_udpServer.getNumUsedSlots() > 800 ) maxOut = 1;

	// if not deduping or site clustering, then
	// just skip over docids for speed.
	// don't bother with summaries we do not need
	if ( ! m_si->m_doDupContentRemoval &&
	     ! m_si->m_doSiteClustering &&
	     m_lastProcessedi == -1 ) {
		// start getting summaries with the result # they want
		m_lastProcessedi = m_si->m_firstResultNum-1;
		// assume we printed the summaries before
		m_printi = m_si->m_firstResultNum;
		m_numDisplayed = m_si->m_firstResultNum;
		// fake this so Msg40::gotSummary() can let us finish
		// because it checks m_numRequests <  m_msg3a.m_numDocIds
		m_numRequests = m_si->m_firstResultNum;
		m_numReplies  = m_si->m_firstResultNum;
		m_didSummarySkip = true;
		log("query: skipping summary generation of first %" PRId32" docs",
		    m_si->m_firstResultNum);
	}

	// . launch a msg20 getSummary() for each docid
	// . m_numContiguous should precede any gap, see below
	for ( int32_t i = m_lastProcessedi+1 ; i < m_msg3a.m_numDocIds ;i++ ) {
		// if the user only requested docids, do not get the summaries
		if ( m_si->m_docIdsOnly ) break;
		// hard limit
		if ( m_numRequests-m_numReplies >= maxOut ) break;
		// do not launch another until m_printi comes back because
		// all summaries are bottlenecked on printing him out now.
		if ( m_si->m_streamResults &&
		     // must have at least one outstanding summary guy
		     // otherwise we can return true below and cause
		     // the stream to truncate results in gotSummary()
		     //m_numReplies < m_numRequests &&
		     i >= m_printi + MAX_OUTSTANDING_MSG20S - 1 )
			break;
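		// illustrative example of the streaming throttle above: with
		// MAX_OUTSTANDING_MSG20S at 200, if m_printi is stuck at 5
		// because that summary has not come back yet, we stop
		// launching once i reaches 204.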

		// do not repeat for this i
		m_lastProcessedi = i;

		// start up a Msg20 to get the summary
		Msg20 *m = NULL;
		if ( m_si->m_streamResults ) {
			// there can be hundreds of thousands of results
			// when streaming, so recycle a few msg20s to save mem
			m = getAvailMsg20();
			// mark it so we know which docid it goes with
			m->m_ii = i;
		}
		else
			m = m_msg20[i];

		// if to a dead host, skip it
		int64_t docId = m_msg3a.m_docIds[i];
		uint32_t shardNum = g_hostdb.getShardNumFromDocId ( docId );
		// get the collection rec
		const CollectionRec *cr = g_collectiondb.getRec(m_firstCollnum);
		// if shard is dead then do not send to it if not crawlbot
		if ( g_hostdb.isShardDead ( shardNum ) && cr &&
		     // this is causing us to truncate streamed results
		     // too early when we have false positives that a 
		     // host is dead because the server is locking up 
		     // periodically
		     ! m_si->m_streamResults ) {
			log("msg40: skipping summary lookup #%" PRId32" of docid %" PRId64" for dead shard #%" PRId32
			    , i
			    , docId
			    , shardNum );
			m_numRequests++;
			m_numReplies++;
			continue;
		}


		// if msg20 ptr null that means the cluster level is not CR_OK
		if ( ! m ) {
			m_numRequests++;
			m_numReplies++;
			continue;
		}
		// . did we already TRY to get the summary for this docid?
		// . we might be re-called from the refilter: below
		// . if already did it, skip it
		// . Msg20::getSummary() sets m_docId, first thing
		if ( m_msg3a.m_docIds[i] == m->getRequestDocId() ) {
			m_numRequests++;
			m_numReplies++;
			continue;
		}

		// assume no error
		g_errno = 0;
		// debug msg
		if ( m_si->m_debug || g_conf.m_logDebugQuery )
			logf(LOG_DEBUG,"query: msg40: [%" PTRFMT"] Getting summary #%" PRId32" for docId=%" PRId64,
			     (PTRTYPE)this,i,m_msg3a.m_docIds[i]);
		// launch it
		m_numRequests++;

		if ( ! cr ) {
			log("msg40: missing coll");
			g_errno = ENOCOLLREC;
			if ( m_numReplies < m_numRequests ) return false;
			return true;
		}

		// set the summary request then get it!
		Msg20Request req;
		Query *q = &m_si->m_q;
		req.ptr_qbuf             = const_cast<char*>(q->getQuery());
		req.size_qbuf            = q->getQueryLen()+1;
		req.m_langId             = m_si->m_queryLangId;

		req.m_highlightQueryTerms = m_si->m_doQueryHighlighting;

		req.m_isDebug            = (bool)m_si->m_debug;

		if ( m_si->m_displayMetas && m_si->m_displayMetas[0] ) {
			int32_t dlen = strlen(m_si->m_displayMetas);
			req.ptr_displayMetas     = m_si->m_displayMetas;
			req.size_displayMetas    = dlen+1;
		}

		req.m_docId              = m_msg3a.m_docIds[i];
		
		// if the msg3a was merged from other msg3as because we
		// were searching multiple collections...
		if ( m_msg3a.m_collnums )
			req.m_collnum = m_msg3a.m_collnums[i];
		// otherwise, just one collection
		else
			req.m_collnum = m_msg3a.m_msg39req.m_collnum;

		req.m_numSummaryLines    = m_si->m_numLinesInSummary;
		req.m_maxCacheAge        = maxCacheAge;
		req.m_state              = this;
		req.m_callback           = gotSummaryWrapper;
		req.m_niceness           = m_si->m_niceness;
		req.m_showBanned         = m_si->m_showBanned;
		req.m_includeCachedCopy  = m_si->m_includeCachedCopy;
		req.m_getSummaryVector   = true;
		req.m_titleMaxLen = m_si->m_titleMaxLen;
		req.m_summaryMaxLen = cr->m_summaryMaxLen;
		req.m_queryExpansion     = m_si->m_queryExpansion;
		req.m_useQueryStopWords  = m_si->m_queryExpansion; //SearchInput doesn't have a m_useQueryStopWords, but if they wanted synonyms (m_queryExpansion) then they probably also want stop words

		// Line means excerpt 
		req.m_summaryMaxNumCharsPerLine = m_si->m_summaryMaxNumCharsPerLine;

		// a special undocumented thing for getting <h1> tag
		req.m_getHeaderTag       = m_si->m_hr.getLong("geth1tag",0);
		// let "ns" parm override
		req.m_numSummaryLines    = m_si->m_numLinesInSummary;

		// . buzz likes to do the &inlinks=1 parm to get inlinks
		// . use "&inlinks=1" for realtime inlink info, use 
		//   "&inlinks=2" to just get it from the title rec, which is 
		//   more stale, but does not take extra time or resources
		// . we "default" to the realtime stuff... i.e. since buzz
		//   is already using "&inlinks=1"
		if ( m_si->m_displayInlinks == 2 ) 
			req.m_getLinkInfo     = true;

		// it copies this using a serialize() function
		if ( ! m->getSummary ( &req ) ) continue;

		// got reply
		m_numReplies++;
		// . otherwise we got summary without blocking
		// . deal with an error
		if ( ! g_errno ) continue;
		// log it
		log("query: Had error getting summary: %s.",
		    mstrerror(g_errno));
		// record g_errno
		if ( ! m_errno ) m_errno = g_errno;
		// reset g_errno
		g_errno   = 0;
	}
	// return false if still waiting on replies
	if ( m_numReplies < m_numRequests ) return false;
	// do not re-call gotSummary() to avoid a possible recursive stack
	// explosion. this is only true if we are being called from 
	// gotSummary() already, so do not call it again!!
	if ( recalled ) 
		return true;
	// if we got nothing, that's it
	if ( m_msg3a.m_numDocIds <= 0 ) {
		// but if in streaming mode we still have to stream the
		// empty results back
		if ( m_si->m_streamResults ) return gotSummary ( );
		// otherwise, we're done
		return true;
	}

	// . don't core here: it seems we can call reallocMsg20Buf() and the
	//   first 50 can already be set, so we legitimately drop down to here
	logf(LOG_DEBUG,"query: Had all msg20s already.");
	// . otherwise, we got everyone, so go right to the merge routine
	// . returns false if not all replies have been received 
	// . returns true if done
	// . sets g_errno on error
	return gotSummary ( );
}

Msg20 *Msg40::getAvailMsg20 ( ) {
	for ( int32_t i = 0 ; i < m_numMsg20s ; i++ ) {
		// m_inProgress is set to false right before it
		// calls Msg20::m_callback which is gotSummaryWrapper()
		// so we should be ok with this
		if ( m_msg20[i]->m_launched ) continue;
		return m_msg20[i];
	}
	// how can this happen???  THIS HAPPENED
	g_process.shutdownAbort(true); 
	return NULL;
}

Msg20 *Msg40::getCompletedSummary ( int32_t ix ) {
	for ( int32_t i = 0 ; i < m_numMsg20s ; i++ ) {
		// it seems m_numMsg20s can be > m_numRequests when doing
		// a multi collection federated search somehow and this
		// can therefore be null
		if ( ! m_msg20[i] ) 
			continue;
			
		if ( m_msg20[i]->m_ii != ix ) continue;
		if ( m_msg20[i]->m_inProgress ) return NULL;
		return m_msg20[i];
	}
	return NULL;
}

bool gotSummaryWrapper ( void *state ) {
	Msg40 *THIS  = (Msg40 *)state;
	// inc it here
	THIS->m_numReplies++;

	if ( (THIS->m_numReplies % 10) == 0 ) {
		log( "msg40: got %" PRId32 " summaries out of %" PRId32 "",
		     THIS->m_numReplies,
		     THIS->m_msg3a.m_numDocIds );
	}

	// it returns false if we're still awaiting replies
	if ( !THIS->gotSummary() ) {
		return false;
	}

	// now call callback, we're done
	THIS->m_callback ( THIS->m_state );

	return true;
}

static void doneSendingWrapper9(void *state, TcpSocket *sock) {
	Msg40 *THIS = (Msg40 *)state;

	// the send completed, count it
	THIS->m_sendsIn++;

	// error?
	if ( THIS->m_sendsIn > THIS->m_sendsOut ) {
		log("msg40: sendsin > sendsout. bailing!!!");
		// try to prevent a core i haven't fixed right yet!!!
		// seems like a reply coming back after we've destroyed the
		// state!!!
		return;
	}
	// socket error? if client closes the socket midstream we get one.
	if ( g_errno ) {
		THIS->m_socketHadError = g_errno;
		log("msg40: streaming socket had error: %s",
		    mstrerror(g_errno));
		// i guess destroy the socket here so we don't get called again?
	}

	// clear it so we don't think it was a msg20 error below
	g_errno = 0;

	// try to send more... returns false if blocked on something
	if ( ! THIS->gotSummary() ) return;

	// all done!!!???
	THIS->m_callback ( THIS->m_state );
}

// . returns false if not all replies have been received (or timed/erroredout)
// . returns true if done (or an error finished us)
// . sets g_errno on error
bool Msg40::gotSummary ( ) {
	// now m_linkInfo[i] (for some i, i dunno which) is filled
	if ( m_si->m_debug || g_conf.m_logDebugQuery )
		logf(LOG_DEBUG,"query: msg40: [%" PTRFMT"] Got summary. Total got=#%" PRId32".",
		     (PTRTYPE)this,m_numReplies);

	// did we have a problem getting this summary?
	if ( g_errno ) {
		// save it
		m_errno = g_errno;
		// log it
		log("query: msg40: Got error getting summary: %s.", mstrerror(g_errno));
		// reset g_errno
		g_errno = 0;
	}

	// initialize dedup table if we haven't already
	if ( ! m_dedupTable.isInitialized() &&
	     ! m_dedupTable.set (4,0,64,NULL,0,false,"srdt") )
		log("query: error initializing dedup table: %s",
		    mstrerror(g_errno));

	State0 *st = (State0 *)m_state;

 doAgain:

	st->m_sb.reset();

	if(m_si->m_streamResults) {
		// this is in PageResults.cpp
		if ( ! m_printedHeader ) {
			// only print header once
			m_printedHeader = true;
			printHttpMime(m_si->m_format,&st->m_sb);
			printSearchResultsHeader ( st );
		}

		for ( ; m_printi<m_msg3a.m_numDocIds ; m_printi++) {
			// if we are waiting on our previous send to complete... wait..
			if ( m_sendsOut > m_sendsIn ) break;

			// get summary for result #m_printi
			Msg20 *m20 = getCompletedSummary ( m_printi );

			// if result summary #i not yet in, wait...
			if ( ! m20 )
				break;

			if ( m20->m_errno ) {
				log("msg40: sum #%" PRId32" error: %s",
				    m_printi,mstrerror(m20->m_errno));
				// make it available to be reused
				m20->reset();
				continue;
			}

			// get the next reply we are waiting on so results print in order
			Msg20Reply *mr = m20->m_r;
			if ( ! mr ) break;

			// primitive deduping. for diffbot json docs the url is
			// excluded from XmlDoc::m_contentHash32.. it will be
			// zero if invalid i guess
			if ( m_si->m_doDupContentRemoval && // &dr=1
			     mr->m_contentHash32 &&
			     // do not dedup CT_STATUS results, those are
			     // spider reply "documents" that indicate the last
			     // time a doc was spidered and the error code or success
			     // code
			     mr->m_contentType != CT_STATUS &&
			     m_dedupTable.isInTable ( &mr->m_contentHash32 ) ) {
				//if ( g_conf.m_logDebugQuery )
				log("msg40: dup sum #%" PRId32" (%" PRIu32") (d=%" PRId64")",m_printi,
				    mr->m_contentHash32,mr->m_docId);
				// make it available to be reused
				m20->reset();
				continue;
			}

			// return true with g_errno set on error
			if ( m_si->m_doDupContentRemoval && // &dr=1
			     mr->m_contentHash32 &&
			     // do not dedup CT_STATUS results, those are
			     // spider reply "documents" that indicate the last
			     // time a doc was spidered and the error code or success
			     // code
			     mr->m_contentType != CT_STATUS &&
			     ! m_dedupTable.addKey ( &mr->m_contentHash32 ) ) {
				log("msg40: error adding to dedup table: %s",
				    mstrerror(g_errno));
			}

			// assume we show this to the user
			m_numDisplayed++;
			//log("msg40: numdisplayed=%" PRId32,m_numDisplayed);

			// do not print it if before the &s=X start position though
			if ( m_numDisplayed <= m_si->m_firstResultNum ){
				if ( m_printCount == 0 )
					log("msg40: hiding #%" PRId32" (%" PRIu32") (d=%" PRId64")",
					    m_printi,mr->m_contentHash32,mr->m_docId);
				m_printCount++;
				if ( m_printCount == 100 ) m_printCount = 0;
				m20->reset();
				continue;
			}

			// . ok, we got it, so print it and stream it
			// . this might set m_hadPrintError to true
			printSearchResult9 ( m_printi , &m_numPrintedSoFar , mr );

			// return it so getAvailMsg20() can use it again
			// this will set m_launched to false
			m20->reset();
		}

		// . set it to true on all but the last thing we send!
		// . after each chunk of data we send out, TcpServer::sendChunk
		//   will call our callback, doneSendingWrapper9
		if ( st->m_socket )
			st->m_socket->m_streamingMode = true;


		// if streaming results, and too many results were clustered or
		// deduped then try to get more by merging the docid lists that
		// we already have from the shards. if this still does not provide
		// enough docids then we will need to issue a new msg39 request to
		// each shard to get even more docids from each shard.
		if ( // this is coring as well on multi collection federated searches
		     // so disable that for now too. it is because Msg3a::m_r is
		     // NULL.
		     m_numCollsToSearch == 1 &&
		     // must have no streamed chunk sends out
		     m_sendsOut == m_sendsIn &&
		     // if we did not ask for enough docids and they were mostly
		     // dups so they got deduped, then ask for more.
		     // m_numDisplayed includes results before the &s=X parm.
		     // and so does m_docsToGetVisible, so we can compare them.
		     m_numDisplayed < m_docsToGetVisible &&
		     // wait for us to have exhausted the docids we have merged
		     m_printi >= m_msg3a.m_numDocIds &&
		     // wait for us to have available msg20s to get summaries
		     m_numReplies == m_numRequests &&
		     // this is true if we can get more docids from merging
		     // more of the termlists from the shards together.
		     // otherwise, we will have to ask each shard for a
		     // higher number of docids.
		     m_msg3a.m_moreDocIdsAvail &&
		     // do not do this if client closed connection
		     ! m_socketHadError ) { //&&

			// can it cover us?
			int32_t need = m_msg3a.m_docsToGet + 20;
			// note it
			log("msg40: too many summaries deduped. "
			    "getting more docids from msg3a merge and getting summaries. "
			    "%" PRId32" are visible, need %" PRId32". changing docsToGet from %" PRId32" to %" PRId32". numReplies=%" PRId32" numRequests=%" PRId32,
			    m_numDisplayed,
			    m_docsToGetVisible,
			    m_msg3a.m_docsToGet,
			    need,
			    m_numReplies,
			    m_numRequests);
			// merge more docids from the shards' termlists
			m_msg3a.m_docsToGet = need;
			// this should increase m_msg3a.m_numDocIds
			m_msg3a.mergeLists();
		}

		// . wrap it up with Next 10 etc.
		// . this is in PageResults.cpp
		if ( ! m_printedTail &&
		     m_printi >= m_msg3a.m_numDocIds ) {
			m_printedTail = true;
			printSearchResultsTail ( st );
			if ( m_sendsIn < m_sendsOut ) { g_process.shutdownAbort(true); }
			if ( g_conf.m_logDebugTcp )
				log("tcp: disabling streamingMode now");
			// this will be our final send
			if ( st->m_socket ) st->m_socket->m_streamingMode = false;
		}
	} //m_si->m_streamResults


	// do we still own this socket? i am thinking it got closed somewhere
	// and the socket descriptor was re-assigned to another socket
	// getting a diffbot reply from XmlDoc::getDiffbotReply()
	if ( st->m_socket && 
	     st->m_socket->m_startTime != st->m_socketStartTimeHack ) {
		log("msg40: lost control of socket. sd=%i. the socket descriptor closed on us and got re-used by someone else.",
		    (int)st->m_socket->m_sd);
		// if there wasn't already an error like 'broken pipe' then
		// set it here so we stop getting summaries if streaming.
		if ( ! m_socketHadError ) m_socketHadError = EBADENGINEER;
		// make it NULL to keep us from doing anything to it
		// since someone else is using it now.
		st->m_socket = NULL;
	}


	// . transmit the chunk in sb if non-zero length
	// . steals the allocated buffer from sb and stores in the 
	//   TcpSocket::m_sendBuf, which it frees when socket is
	//   ultimately destroyed or we call sendChunk() again.
	// . when TcpServer is done transmitting, it does not close the
	//   socket but rather calls doneSendingWrapper() which can call
	//   this function again to send another chunk
	// . when we are truly done sending all the data, then we set lastChunk
	//   to true and TcpServer.cpp will destroy m_socket when done.
	//   no, actually we just set m_streamingMode to false i guess above
	if ( st->m_sb.length() &&
	     // did client browser close the socket on us midstream?
	     ! m_socketHadError &&
	     st->m_socket)
	{
		if( ! g_httpServer.m_tcp.sendChunk ( st->m_socket,
						     &st->m_sb,
						     this,
						     doneSendingWrapper9 ) )
			// if it blocked, inc this count. we'll only call m_callback
			// above when m_sendsIn equals m_sendsOut... and
			// m_numReplies == m_numRequests
			m_sendsOut++;

		// writing on closed socket?
		if ( g_errno ) {
			if ( ! m_socketHadError ) m_socketHadError = g_errno;
			log("msg40: got tcp error : %s",mstrerror(g_errno));
			// disown it here so we do not damage in case it gets
			// reopened by someone else
			st->m_socket = NULL;
		}
	}

	// do we need to launch another batch of summary requests?
	if ( m_numRequests < m_msg3a.m_numDocIds && ! m_socketHadError ) {
		// . if we can launch another, do it
		// . say "true" here so it does not call us, gotSummary() and 
		//   do a recursive stack explosion
		// . this returns false if still waiting on more to come back
		if ( ! launchMsg20s ( true ) ) return false; 
		// it won't launch now if we are bottlenecked waiting for
		// m_printi's summary to come in
		if ( m_si->m_streamResults ) {
			goto complete;
		}

		// it returned true, so m_numRequests == m_numReplies and
		// we don't need to launch any more! but that does NOT
		// make sense because m_numContiguous < m_msg3a.m_numDocIds
		// . i guess the launch can fail because of oom... and
		//   end up returning true here... seen it happen, and
		//   we had full requests/replies for m_msg3a.m_numDocIds
		log("msg40: got all replies i guess");
		goto doAgain;
	}

 complete:

	// . ok, now i wait for all msg20s (getsummary) to come back in.
	// . TODO: evaluate if this hurts us
	if ( m_numReplies < m_numRequests )
		return false;

	// if streaming results, we are done
	if ( m_si->m_streamResults ) {
		// unless waiting for last transmit to complete
		if ( m_sendsOut > m_sendsIn ) return false;
		// delete everything! no, doneSendingWrapper9 does...
		//mdelete(st, sizeof(State0), "msg40st0");
		//delete st;
		// otherwise, all done! only log if the stream was cut short
		// by a socket error
		if ( m_socketHadError )
			log("msg40: did not send last search result summary. this=0x%" PTRFMT" because had error: %s",
			    (PTRTYPE)this,
			    mstrerror(m_socketHadError));
		return true;
	}

	int64_t startTime = gettimeofdayInMilliseconds();

	// loop over each clusterLevel and set it
	for ( int32_t i = 0 ; i < m_numReplies ; i++ ) {
		// did we skip the first X summaries because we were
		// not deduping/siteclustering/gettingGigabits?
		if ( m_didSummarySkip && i < m_si->m_firstResultNum )
			continue;
		// get current cluster level
		char *level = &m_msg3a.m_clusterLevels[i];
		// sanity check -- this is a transitional value msg3a should
		// set it to something else!
		if ( *level == CR_GOT_REC         ) { g_process.shutdownAbort(true); }
		if ( *level == CR_ERROR_CLUSTERDB ) { g_process.shutdownAbort(true); }
		// skip if already "bad"
		if ( *level != CR_OK ) continue;
		// if the user only requested docids, we have no summaries
		if ( m_si->m_docIdsOnly ) break;
		// convenient var
		Msg20 *m = m_msg20[i];
		// get the Msg20 reply
		const Msg20Reply *mr = m->m_r;
		// if no reply, all hosts must have been dead i guess so
		// filter out this guy
		if ( ! mr && ! m->m_errno ) {
			logf(LOG_DEBUG,"query: msg 20 reply was null.");
			m->m_errno = ENOHOSTS;
		}

		if ( m_si->m_familyFilter && mr && mr->m_isAdult) {
			logf(LOG_DEBUG,"query: msg20.is_adult and family filter is on.");
			m->m_errno = EDOCADULT;
		}
		// if any msg20 has m_errno set, then set ours so at least the
		// xml feed will know there was a problem even though it may 
		// have gotten search results.
		if ( m->m_errno ) {
			if ( m_si->m_debug || g_conf.m_logDebugQuery ) {
				logf( LOG_DEBUG, "query: result %" PRId32 " (docid=%" PRId64 ") had an error (%s) and will not be shown.",
				      i, m_msg3a.m_docIds[i], mstrerror( m->m_errno ) );
			}

			// update our m_errno while here
			if ( ! m_errno ) {
				m_errno = m->m_errno;
			}

			if ( ! m_si->m_showErrors ) {
				*level = CR_ERROR_SUMMARY;
				continue;
			}
		}

		// a special case
		if ( mr && ( mr->m_errno == CR_RULESET_FILTERED || mr->m_errno == EDOCFILTERED ) ) {
			*level = CR_RULESET_FILTERED;
			continue;
		}

		if ( ! m_si->m_showBanned && mr && mr->m_isBanned ) {
			if ( m_si->m_debug || g_conf.m_logDebugQuery )
				logf( LOG_DEBUG, "query: result %" PRId32 " (docid=%" PRId64 ") is banned and will not be shown.",
					  i, m_msg3a.m_docIds[i] );
			*level = CR_BANNED_URL;
			continue;
		}

		// corruption?
		if ( mr && !mr->ptr_ubuf ) {
			log( "msg40: got corrupt msg20 reply for docid %" PRId64, mr->m_docId );
			*level = CR_BAD_URL;
			continue;
		}

		// filter simplified redirection/non-canonical documents
		if (mr && mr->size_rubuf > 1 && (mr->m_contentLen <= 0 || mr->m_httpStatus != 200)) {
			if (!m_si->m_showErrors) {
				*level = CR_EMPTY_REDIRECTION_PAGE;
				continue;
			}
		}

		// filter empty title & summaries
		if ( mr && mr->size_tbuf <= 1 && mr->size_displaySum <= 1 ) {
			if ( ! m_si->m_showErrors ) {
				*level = CR_EMPTY_TITLE_SUMMARY;
				continue;
			}
		}
	}

	// what is the deduping threshold? 0 means do not do deduping
	int32_t dedupPercent = 0;
	if ( m_si->m_doDupContentRemoval && m_si->m_percentSimilarSummary )
		dedupPercent = m_si->m_percentSimilarSummary;
	// icc=1 turns this off too i think
	if ( m_si->m_includeCachedCopy ) dedupPercent = 0;
	// if the user only requested docids, we have no summaries
	if ( m_si->m_docIdsOnly ) dedupPercent = 0;
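	// e.g. (illustrative value) with &dr=1 and percentSimilarSummary=80,
	// any lower-scoring result whose summary vector is at least 80%
	// similar to a higher-scoring one gets marked CR_DUP_SUMMARY below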

	// filter out duplicate/similar summaries
	for ( int32_t i = 0 ; dedupPercent && i < m_numReplies ; i++ ) {
		// skip if already invisible
		if ( m_msg3a.m_clusterLevels[i] != CR_OK ) continue;
		// Skip if invalid
		if ( m_msg20[i]->m_errno ) continue;

		// get it
		const Msg20Reply *mri = m_msg20[i]->m_r;
		// do not dedup CT_STATUS results, those are
		// spider reply "documents" that indicate the last
		// time a doc was spidered and the error code or 
		// success code
		if ( mri->m_contentType == CT_STATUS ) continue;

		// see if any result lower-scoring than #i is a dup of #i
		for( int32_t m = i+1 ; m < m_numReplies ; m++ ) {
			// get current cluster level
			char *level = &m_msg3a.m_clusterLevels[m];
			// skip if already invisible
			if ( *level != CR_OK ) continue;
			// get it
			if ( m_msg20[m]->m_errno ) continue;

			const Msg20Reply *mrm = m_msg20[m]->m_r;
			// do not dedup CT_STATUS results, those are
			// spider reply "documents" that indicate the last
			// time a doc was spidered and the error code or 
			// success code
			if ( mrm->m_contentType == CT_STATUS ) continue;
			// use gigabit vector to do topic clustering, etc.
			const int32_t *vi = (int32_t *)mri->ptr_vbuf;
			const int32_t *vm = (int32_t *)mrm->ptr_vbuf;
			float s = computeSimilarity(vi,vm,NULL,NULL,NULL);
			// skip if not similar
			if ( (int32_t)s < dedupPercent ) continue;
			// otherwise mark it as a summary dup
			if ( m_si->m_debug || g_conf.m_logDebugQuery )
				logf( LOG_DEBUG, "query: result #%" PRId32" (docid=%" PRId64") is %.02f%% similar-summary of #%" PRId32" (docid=%" PRId64")",
				      m, m_msg3a.m_docIds[m] , 
				      s, i, m_msg3a.m_docIds[i] );
			*level = CR_DUP_SUMMARY;
		}
	}



	//
	// BEGIN URL NORMALIZE AND COMPARE
	//
        
	// . ONLY DEDUP URL if it is explicitly enabled AND we are not performing
	//   a site: or suburl: query.
	if(m_si->m_dedupURL &&
	   !m_si->m_q.m_hasPositiveSiteField &&
	   !m_si->m_q.m_hasSubUrlField) {
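		// illustrative example of the normalization below: the
		// hypothetical urls http://www.example.com/page/ and
		// http://fr.example.com/page both reduce to "example.com/page",
		// so the lower-scoring one gets marked CR_DUP_URL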
		for(int32_t i = 0 ; i < m_msg3a.m_numDocIds ; i++) {
			// skip if already invisible
			if(m_msg3a.m_clusterLevels[i] != CR_OK) continue;

			// get it
			const Msg20Reply *mr = m_msg20[i]->m_r;

			// hash the URL all in lower case to catch wiki dups
			const char *url  = mr-> ptr_ubuf;
			int32_t  ulen = mr->size_ubuf - 1;
			
			// since the redirect url is a more accurate
			// representation of the content, use that if it exists.
			if ( mr->ptr_rubuf ) {
				url  = mr-> ptr_rubuf;
				ulen = mr->size_rubuf - 1;
			}

			// fix for directories, sometimes they are indexed
			// without a trailing slash, so let's normalize to
			// this standard.
			if(url[ulen-1] == '/')
				ulen--;

			Url u;
			u.set( url, ulen );
			url   = u.getHost();

			if(u.getPathLen() > 1) {
				// . remove sub-domain to fix conflicts with
				//   sites having www,us,en,fr,de,uk,etc AND
				//   it redirects to the same page.
				const char *host = u.getHost();
				const char *mdom = u.getMidDomain();
				if(mdom && host) {
					int32_t  hlen = mdom - host;
					if (isVariantLikeSubDomain(host, hlen-1))
						url = mdom;
				}
			}

			// adjust url string length
			ulen -= url - u.getUrl();

			uint64_t h = hash64Lower_a(url, ulen);
			int32_t slot = m_urlTable.getSlot(h);
			// if there is no slot, this url doesn't exist => add it
			if(slot == -1) {
				m_urlTable.addKey(h,mr->m_docId);
			} else {
				// If there was a slot, mark via the cluster
				// level that this URL already existed
				char *level = &m_msg3a.m_clusterLevels[i];
				if(m_si->m_debug || g_conf.m_logDebugQuery)
					logf(LOG_DEBUG, "query: result #%" PRId32" (docid=%" PRId64") is the same URL as (docid=%" PRId64")",
						 i,m_msg3a.m_docIds[i],
						 m_urlTable.getValueFromSlot(slot));
				*level = CR_DUP_URL;
			}
		}
	}

	//
	// END URL NORMALIZE AND COMPARE
	//

	// show time
	int64_t took = gettimeofdayInMilliseconds() - startTime;
	if ( took > 3 )
		log(LOG_INFO,"query: Took %" PRId64" ms to do clustering and dup removal.",took);

	if(!submitUrlRealtimeClassification())
		return false;
	else
		return gotEnoughSummaries();
}


struct UrlClassificationContext {
	Msg40 *msg40;
	int i;
	UrlClassificationContext(Msg40 *msg40_, int i_) : msg40(msg40_), i(i_) {}
};

//start classifying the URLs of the results
bool Msg40::submitUrlRealtimeClassification() {
	if(!realtimeUrlClassificationWorks()) {
		log(LOG_DEBUG,"Bypassing URL realtime classification because it is diabled or not working");
		return true; //done
	}
	
	{
		ScopedLock sl(m_mtxRealtimeClassificationsCounters);
		m_realtimeClassificationsSubmitted = true;
	}
	
	int num_started = 0;
	int num_wanted_to_start = 0;
	for(int i=0; i<m_numReplies; i++) {
		if(m_msg3a.m_clusterLevels[i]==CR_OK) {
			num_wanted_to_start++;
			Msg20 *m = m_msg20[i];
			std::string url(m->m_r->ptr_ubuf,m->m_r->size_ubuf);
			UrlClassificationContext *ucc = new UrlClassificationContext(this,i);
			incrementRealtimeClassificationsStarted();
			if(classifyUrl(url.c_str(),&urlClassificationCallback0,ucc)) {
				if(g_conf.m_logTraceUrlClassification)
					log(LOG_TRACE,"URL classification of '%s' started",url.c_str());
				num_started++;
			} else {
				if(g_conf.m_logTraceUrlClassification)
					log(LOG_TRACE,"URL classification of '%s' NOT started",url.c_str());
				incrementRealtimeClassificationsCompleted();
				delete ucc;
			}
		}
	}
	
	log(LOG_DEBUG,"msg40: Started URL classification on %d out of %d URLs (wanted %d)", num_started, m_numReplies, num_wanted_to_start);
	
	bool done;
	{
		ScopedLock sl(m_mtxRealtimeClassificationsCounters);
		m_realtimeClassificationsSubmitted = false;
		if(m_numRealtimeClassificationsCompleted==m_numRealtimeClassificationsStarted)
			done = true;
		else
			done = false;
	}
	
	return done;
}


void Msg40::urlClassificationCallback0(void *context, uint32_t classification) {
	UrlClassificationContext *ucc = reinterpret_cast<UrlClassificationContext*>(context);
	ucc->msg40->urlClassificationCallback1(ucc->i,classification);
	delete ucc;
}

void Msg40::urlClassificationCallback1(int i, uint32_t classification) {
	if(classification&URL_CLASSIFICATION_MALICIOUS) {
		m_msg3a.m_clusterLevels[i] = CR_MALICIOUS;
		log(LOG_DEBUG,"URL '%*.*s' classified as malicious. Filtering it out",
		    (int)m_msg20[i]->m_r->size_ubuf, (int)m_msg20[i]->m_r->size_ubuf, m_msg20[i]->m_r->ptr_ubuf);
	}
	if(incrementRealtimeClassificationsCompleted()) {
		log(LOG_TRACE,"msg40: all URL classifications completed");
		if(gotEnoughSummaries()) {
			m_callback(m_state);
		}
	}
}


bool Msg40::gotEnoughSummaries() {
	m_omitCount = 0;

	// count how many are visible!
	int32_t visible = 0;
	// loop over each clusterLevel and set it
	for ( int32_t i = 0 ; i < m_numReplies ; i++ ) {
		// get current cluster level
		const char *level = &m_msg3a.m_clusterLevels[i];
		// on CR_OK
		if ( *level == CR_OK ) visible++;
		// otherwise count as omitted
		else m_omitCount++;
	}

	// . let's wait for the tasks to complete before even trying to launch
	//   more than the first MAX_OUTSTANDING msg20s
	// . the msg3a re-call will end up re-doing our tasks as well! so we
	//   have to make sure they complete at this point
	if ( m_tasksRemaining > 0 ) return false;

	// debug
	bool debug = (m_si->m_debug || g_conf.m_logDebugQuery);
	for ( int32_t i = 0 ; debug && i < m_msg3a.m_numDocIds ; i++ ) {
		int32_t cn = (int32_t)m_msg3a.m_clusterLevels[i];
		if ( cn < 0 || cn >= CR_END ) { g_process.shutdownAbort(true); }
		const char *s = g_crStrings[cn];
		if ( ! s ) { g_process.shutdownAbort(true); }
		logf(LOG_DEBUG, "query: msg40 final hit #%" PRId32") d=%" PRIu64" cl=%" PRId32" (%s)",
		     i,m_msg3a.m_docIds[i],(int32_t)m_msg3a.m_clusterLevels[i],s);
	}

	if ( debug )
		logf (LOG_DEBUG,"query: msg40: firstResult=%" PRId32", "
		      "totalDocIds=%" PRId32", resultsWanted=%" PRId32" "
		      "visible=%" PRId32" toGet=%" PRId32" recallCnt=%" PRId32,
		      m_si->m_firstResultNum, m_msg3a.m_numDocIds ,
		      m_docsToGetVisible, visible,
		      //m_numContiguous, 
		      m_docsToGet , m_msg3aRecallCnt);

	// if we do not have enough visible, try to get more
	if ( visible < m_docsToGetVisible && m_msg3a.m_moreDocIdsAvail &&
	     // do not spin too long in this!
	     // TODO: fix this better somehow later
	     m_docsToGet <= 1000 &&
	     // doesn't work on multi-coll just yet, it cores
	     m_numCollsToSearch == 1 ) {
		// can it cover us?
		int32_t need = m_docsToGet + 20;
		// increase by 25 percent as well
		need *= 1.25;
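		// e.g. if m_docsToGet is currently 100: need = (100 + 20) * 1.25 = 150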
		// note it
		log("msg40: too many summaries invisible. getting more docids from msg3a merge and getting summaries. "
		    "%" PRId32" are visible, need %" PRId32". %" PRId32" to %" PRId32". numReplies=%" PRId32" numRequests=%" PRId32,
		    visible, m_docsToGetVisible,
		    m_msg3a.m_docsToGet, need,
		    m_numReplies, m_numRequests);

		// get more!
		m_docsToGet = need;
		// reset this before launch
		m_numReplies  = 0;
		m_numRequests = 0;
		// reprocess all!
		m_lastProcessedi = -1;
		// let's do it all from the top!
		return getDocIds ( true ) ;
	}

	// get time now
	int64_t now = gettimeofdayInMilliseconds();
	// . add the stat for how long to get all the summaries
	// . use purple for the time to get all summaries
	// . THIS INCLUDES Msg3a/Msg39 RECALLS!!!
	// . can we subtract that?
	//"get_all_summaries"
	g_stats.addStat_r ( 0, m_startTime, now, 0x008220ff );

	// timestamp log
	if ( g_conf.m_logTimingQuery || m_si->m_debug )
		logf(LOG_DEBUG,"query: msg40: [%" PTRFMT"] Got %" PRId32" summaries in %" PRId64" ms",
		     (PTRTYPE)this ,
		     visible, // m_visibleContiguous,
		     now - m_startTime );


	/////////////
	//
	//
	// prepare query term extra info for gigabits
	//
	////////////


	// set m_moreToCome, if true, we print a "Next 10" link
	m_moreToCome = (visible > m_si->m_docsWanted+m_si->m_firstResultNum);
	if ( m_si->m_debug || g_conf.m_logDebugQuery ) {
		logf( LOG_DEBUG, "query: msg40: more? %d", m_moreToCome );
	}

	// sanity check: m_buf should still be NULL at this point
	if ( m_buf ) { g_process.shutdownAbort(true); }

	// . we need to collapse m_msg3a.m_docIds[], etc. into m_docIds[] etc
	//   to be just the docids we wanted.
	// . at this point we should merge in all docids from all Msg40s from
	//   different clusters, etc.
	// . now alloc space for "docsWanted" m_docIds[], m_scores[], 
	//   m_bitScores[], m_clusterLevels[] and m_newMsg20[]

	//
	// HACK TIME
	//

	// . bury filtered/clustered docids from m_msg3a.m_docIds[]
	// . also remove results not in the request window specified by &s=X&n=Y
	//   where "s" is m_si->m_firstResultNum (which starts at 0) and "n" 
	//   is the number of results requested, m_si->m_docsWanted
	// . this is a bit of a hack (MDW)
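	// . e.g. with &s=10&n=10 the loop below skips everything that is not
	//   CR_OK plus the first 10 visible results, then copies the next 10
	//   visible docids (and their scores, flags, cluster levels and Msg20s)
	//   down into slots 0..9 and clips m_numDocIds accordingly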
	int32_t c = 0;
	int32_t v = 0;
	for ( int32_t i = 0 ; i < m_msg3a.m_numDocIds ; i++ ) {
		// must have a cluster level of CR_OK (visible)
		// v is the visible count
		if ( ( m_msg3a.m_clusterLevels[i] != CR_OK ) || ( v++ < m_si->m_firstResultNum ) ) {
			// skip
			continue;
		}

		// we got a winner, save it
		m_msg3a.m_docIds        [c] = m_msg3a.m_docIds        [i];
		m_msg3a.m_scores        [c] = m_msg3a.m_scores        [i];
		m_msg3a.m_flags         [c] = m_msg3a.m_flags         [i];
		m_msg3a.m_clusterLevels [c] = m_msg3a.m_clusterLevels [i];
		m_msg20                 [c] = m_msg20                 [i];

		if ( m_msg3a.m_scoreInfos ) {
			m_msg3a.m_scoreInfos [c] = m_msg3a.m_scoreInfos [i];
		}

		int32_t need = m_si->m_docsWanted;

		// if done, bail
		if ( ++c >= need ) {
			break;
		}
	}
	// reset the # of docids we got to how many we kept!
	m_msg3a.m_numDocIds = c;

	// debug
	for ( int32_t i = 0 ; debug && i < m_msg3a.m_numDocIds ; i++ )
		logf(LOG_DEBUG, "query: msg40 clipped hit #%" PRId32") d=%" PRIu64" cl=%" PRId32" (%s)",
		     i,m_msg3a.m_docIds[i],(int32_t)m_msg3a.m_clusterLevels[i],
		     g_crStrings[(int32_t)m_msg3a.m_clusterLevels[i]]);

	//
	// END HACK
	// 

	//Old logic for whether to store the msg40+results in the cache or not. The logic
	//looks fine, but the cache was removed a long time ago.
	//
	// // . uc = use cache?
	// // . store in cache now if we need to
	// bool uc = false;
	// if ( m_si->m_useCache   ) uc = true;
	// if ( m_si->m_wcache     ) uc = true;
	// // . do not store if there was an error
	// // . no, allow errors in cache since we often have lots of
	// //   docid not founds and what not, due to index corruption and
	// //   being out of sync with titledb
	// if ( m_errno            &&
	//      // forgive "Record not found" errors, they are quite common
	//      m_errno != ENOTFOUND ) {
	// 	logf(LOG_DEBUG,"query: not storing in cache: %s",
	// 	     mstrerror(m_errno));
	// 	uc = false;
	// }
	// if ( m_si->m_docIdsOnly ) uc = false;
	//
	// // all done if not storing in cache
	// if ( ! uc ) return true;


	// ignore errors
	g_errno = 0;
 	return true;
}

//For the purpose of clustering and result suppression these hosts are considered the same:
//  example.com
//  www.example.com
//  fr.example.com
//  pl.example.com
//etc.
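//e.g. http://fr.example.com/x and http://www.example.com/x (hypothetical URLs)
//normalize to the same host, and isVariantLikeSubDomain("fr",2) and
//isVariantLikeSubDomain("www",3) both return true while
//isVariantLikeSubDomain("blog",4) does not, since "blog" is not in the list.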
static const char * const s_variantLikeSubDomains[] = {
        // Common Language sub-domains
        "en" ,
        "fr" ,
        "es" ,
        "ru" ,
        "zz" ,
        "ja" ,
        "tw" ,
        "cn" ,
        "ko" ,
        "de" ,
        "nl" ,
        "it" ,
        "fi" ,
        "sv" ,
        "no" ,
        "pt" ,
        "vi" ,
        "ar" ,
        "he" ,
        "id" ,
        "el" ,
        "th" ,
        "hi" ,
        "bn" ,
        "pl" ,
        "tl" ,
        // Common Country sub-domains
        "us" ,
        "uk" ,
        // Common web sub-domains
        "www"
};
static HashTable  s_variantLikeSubDomainTable;
static bool       s_variantLikeSubDomainInitialized = false;
static GbMutex    s_variantLikeSubDomainMutex;

static bool initVariantLikeSubDomainTable(HashTable *table, const char * const words[], int32_t size ){
	// "size" is the byte size of the words[] array, so compute the number
	// of entries first
	int32_t n = (int32_t)(size / sizeof(char *));
	// set up the hash table with some headroom
	if ( ! table->set ( n * 2 ) ) {
		log(LOG_INIT, "build: Could not init sub-domain table.");
		return false;
	}
	// now add in all the sub-domain variants
	for ( int32_t i = 0 ; i < n ; i++ ) {
		const char *sw    = words[i];
		int32_t     swlen = (int32_t)strlen ( sw );
		int32_t     h     = hash32Lower_a ( sw , swlen );
		// if the key is not in the table yet, add it
		if ( table->getSlot(h) == -1 )
			table->addKey ( h , 0 );
		else
			log(LOG_INIT,"build: Sub-domain table has duplicates");
	}
	return true;
}

static bool isVariantLikeSubDomain(const char *s , int32_t len) {
	// lazily build the shared table exactly once, under the mutex
	ScopedLock sl(s_variantLikeSubDomainMutex);
	if ( ! s_variantLikeSubDomainInitialized ) {
		s_variantLikeSubDomainInitialized = initVariantLikeSubDomainTable(&s_variantLikeSubDomainTable, s_variantLikeSubDomains, sizeof(s_variantLikeSubDomains));
		if ( ! s_variantLikeSubDomainInitialized )
			return false;
	}
	// the table is read-only after initialization, so the lookup itself
	// does not need the lock
	sl.unlock();

	// look it up in the table
	int32_t h = hash32Lower_a ( s , len );
	return s_variantLikeSubDomainTable.getSlot(h) != -1;
}

// . printSearchResult into "sb"
bool Msg40::printSearchResult9 ( int32_t ix , int32_t *numPrintedSoFar ,
				 Msg20Reply *mr ) {

	// . we stream results right onto the socket
	// . useful for thousands of results... and saving mem
	if ( ! m_si || ! m_si->m_streamResults ) { g_process.shutdownAbort(true); }

	// get state0
	State0 *st = (State0 *)m_state;

	Msg40 *msg40 = &st->m_msg40;

	// then print each result
	// don't display more than docsWanted results
	if ( m_numPrinted >= msg40->getDocsWanted() ) {
		// i guess we can print "Next 10" link
		m_moreToCome = true;
		// hide if above limit
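		// log only roughly one out of every 100 hidden results
		// (m_printCount wraps at 100) to avoid flooding the log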
		if ( m_printCount == 0 )
			log(LOG_INFO,"msg40: hiding above docsWanted #%" PRId32" (%" PRIu32")(d=%" PRId64")",
			    m_printi,mr->m_contentHash32,mr->m_docId);
		m_printCount++;
		if ( m_printCount == 100 ) m_printCount = 0;
		// do not exceed what the user asked for
		return true;
	}

	// print that out into st->m_sb safebuf
	if ( ! printResult ( st , ix , numPrintedSoFar ) ) {
		// oom?
		if ( ! g_errno ) g_errno = EBADENGINEER;
		log("query: had error: %s",mstrerror(g_errno));
	}

	// count it
	m_numPrinted++;

	return true;
}
	

static bool printHttpMime(int32_t format, SafeBuf *sb) {
	// reserve 1.5MB now!
	if ( ! sb->reserve(1500000 ,"pgresbuf" ) ) // 128000) )
		return true;
	// just in case it is empty, make it null terminated
	sb->nullTerm();

	// defaults to FORMAT_HTML
	const char *ct = "text/html";
	if(format == FORMAT_JSON) {
		ct = "application/json";
	} else if(format == FORMAT_XML) {
		ct = "text/xml";
	}

	// . if we haven't yet sent an http mime back to the user
	//   then do so here, the content-length will not be in there
	//   because we might have to call for more spiderdb data
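	// . the generated mime looks roughly like this (the exact fields are
	//   whatever HttpMime::makeMime() emits):
	//     HTTP/1.0 200 OK
	//     Content-Type: text/html;charset=utf-8
	//   without a Content-Length the client simply reads until the
	//   connection is closed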
	HttpMime mime;
	mime.makeMime ( -1, // total content-length is unknown!
			0 , // do not cache (cacheTime)
			0 , // lastModified
			0 , // offset
			-1 , // bytesToSend
			NULL , // ext
			false, // POSTReply
			ct, // "text/csv", // contenttype
			"utf-8" , // charset
			-1 , // httpstatus
			NULL ); //cookie
	sb->safeMemcpy(mime.getMime(),mime.getMimeLen() );
	return true;
}


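// Book-keeping protocol for the realtime URL classifications:
// m_numRealtimeClassificationsStarted is bumped before each classifyUrl()
// attempt and m_numRealtimeClassificationsCompleted is bumped when a
// classification finishes (or could not be started at all).
// incrementRealtimeClassificationsCompleted() returns true only for the call
// that sees the two counters meet after submission has finished
// (m_realtimeClassificationsSubmitted is false), so the final callback in
// urlClassificationCallback1() fires at most once.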
void Msg40::incrementRealtimeClassificationsStarted() {
	ScopedLock sl(m_mtxRealtimeClassificationsCounters);
	m_numRealtimeClassificationsStarted++;
	if(m_numRealtimeClassificationsCompleted>=m_numRealtimeClassificationsStarted) gbshutdownLogicError();
}

bool Msg40::incrementRealtimeClassificationsCompleted() {
	ScopedLock sl(m_mtxRealtimeClassificationsCounters);
	m_numRealtimeClassificationsCompleted++;
	if(m_numRealtimeClassificationsCompleted>m_numRealtimeClassificationsStarted) gbshutdownLogicError();
	return m_numRealtimeClassificationsCompleted==m_numRealtimeClassificationsStarted && !m_realtimeClassificationsSubmitted;
}

bool Msg40::areAllRealtimeClassificationsCompleted() const {
	ScopedLock sl(const_cast<GbMutex&>(m_mtxRealtimeClassificationsCounters));
	return (!m_realtimeClassificationsSubmitted) && (m_numRealtimeClassificationsCompleted==m_numRealtimeClassificationsStarted);
}