#include "Msg40.h"
#include "Stats.h"        // for timing and graphing time to get all summaries
#include "Collectiondb.h"
#include "sort.h"
#include "matches2.h"
#include "XmlDoc.h" // computeSimilarity()
#include "Speller.h"
#include "Wiki.h"
#include "HttpServer.h"
#include "PageResults.h"
#include "HashTable.h"
//#include "AdultCheck.h"
#include "Process.h"
#include "UrlRealtimeClassification.h"
#include "UdpServer.h"
#include "Conf.h"
#include "GbMutex.h"
#include "ScopedLock.h"
#include "Mem.h"
#include "ScopedLock.h"
#include "Errno.h"
#include <new>


// increasing this doesn't seem to improve performance any on a single
// node cluster....
#define MAX_OUTSTANDING_MSG20S 200

static void gotDocIdsWrapper             ( void *state );
static bool gotSummaryWrapper            ( void *state );

static bool isVariantLikeSubDomain(const char *s, int32_t len);

// Builds an idle Msg40: all pointers are NULL, all counters are zero and all
// flags are cleared, so the object can be destroyed safely even if
// getResults() is never called.
Msg40::Msg40()
  : m_deadline(0),
    m_numRealtimeClassificationsStarted(0),
    m_numRealtimeClassificationsCompleted(0),
    m_mtxRealtimeClassificationsCounters(),
    m_realtimeClassificationsSubmitted(false)
{
	// caller-supplied context, filled in by getResults()
	m_si       = NULL;
	m_state    = NULL;
	m_callback = NULL;

	// raw buffers and their sizes
	m_buf         = NULL;
	m_bufMaxSize  = 0;
	m_buf2        = NULL;
	m_bufMaxSize2 = 0;

	// Msg20 (summary) bookkeeping inside m_buf2
	m_msg20         = NULL;
	m_numMsg20s     = 0;
	m_msg20StartBuf = NULL;
	m_numToFree     = 0;
	m_numMsg20sIn   = 0;
	m_numMsg20sOut  = 0;
	m_numRequests   = 0;
	m_numReplies    = 0;
	m_lastProcessedi = 0;
	m_tasksRemaining = 0;

	// Msg3a (docid) bookkeeping, one per collection searched
	m_msg3aPtrs        = NULL;
	m_numCollsToSearch = 0;
	m_num3aRequests    = 0;
	m_num3aReplies     = 0;
	m_msg3aRecallCnt   = 0;
	m_firstCollnum     = 0;

	// how many docids we ask for / want to show
	m_docsToGet        = 0;
	m_docsToGetVisible = 0;
	m_moreToCome       = false;

	// state for streaming results back to the client
	m_socketHadError  = 0;
	m_printedHeader   = false;
	m_printedTail     = false;
	m_numPrinted      = 0;
	m_numPrintedSoFar = 0;
	m_numDisplayed    = 0;
	m_printi          = 0;
	m_printCount      = 0;
	m_sendsOut        = 0;
	m_sendsIn         = 0;
	m_didSummarySkip  = false;
	m_omitCount       = 0;

	// timing and error tracking
	m_startTime  = 0;
	m_cachedTime = 0;
	m_errno      = 0;
}

// Destroys every Msg20 that was placement-constructed in m_buf2 and then
// releases m_buf2 itself. Safe to call repeatedly.
void Msg40::resetBuf2 ( ) {
	// the Msg20 objects sit back to back starting at m_msg20StartBuf
	char *p = m_msg20StartBuf;
	for ( int32_t i = 0 ; i < m_numToFree ; i++ ) {
		// run the destructor by hand; the storage belongs to m_buf2
		Msg20 *m = (Msg20 *)p;
		m->~Msg20();
		p += sizeof(Msg20);
	}
	// nothing is left to destroy. forget the count and the start address
	// so that a later call (e.g. from the destructor) cannot run the
	// destructors a second time on memory that has been freed, and so
	// that reallocMsg20Buf() starts counting from zero again.
	m_numToFree     = 0;
	m_msg20StartBuf = NULL;
	// now free the msg20 ptrs and buffer space
	if ( m_buf2 ) mfree ( m_buf2 , m_bufMaxSize2 , "Msg40b" );
	m_buf2 = NULL;
}

// Frees the per-collection Msg3as that federatedLoop() allocated, the main
// buffer, and the Msg20 buffer.
Msg40::~Msg40() {
	// m_msg3aPtrs is still NULL if the pointer array was never set up,
	// which can be the case even when m_numCollsToSearch is non-zero
	// (getDocIds() assigns the count before reserving the array).
	if ( m_msg3aPtrs ) {
		for ( int32_t i = 0 ; i < m_numCollsToSearch ; i++ ) {
			Msg3a *mp = m_msg3aPtrs[i];
			if ( ! mp ) continue;
			// the embedded m_msg3a is a member, not heap allocated
			if ( mp == &m_msg3a ) continue;
			mdelete ( mp , sizeof(Msg3a), "tmsg3a");
			delete  ( mp );
			m_msg3aPtrs[i] = NULL;
		}
	}
	if ( m_buf  ) mfree ( m_buf  , m_bufMaxSize  , "Msg40" );
	m_buf  = NULL;
	resetBuf2();
}

// . returns false if blocked, true otherwise; when it blocks, "callback" is
//   called with "state" once the results are ready
// . sets g_errno on error
// . uses Msg3a to get docIds
// . uses many msg20s to get title/summary/url/docLen for each docId
// . "si" describes the query and is kept by pointer (m_si), so it must
//   outlive this Msg40
// . "forward" is not supported: passing true aborts the process
bool Msg40::getResults ( SearchInput *si      ,
			 bool         forward ,
			 void        *state   ,
			 void   (* callback) ( void *state ) ) {

	log(LOG_INFO, "query: Msg40 start: query_id='%s' query='%s'", si->m_queryId, si->m_query);
	m_omitCount = 0;

	if(g_conf.m_msg40_msg39_timeout>0) {
		m_deadline = gettimeofdayInMilliseconds() + g_conf.m_msg40_msg39_timeout;
	}

	// we need at least one collnum in the buffer to know what to search
	const bool haveCollnum =
		si->m_collnumBuf.length() >= (int32_t)sizeof(collnum_t);
	if ( ! haveCollnum )
		log(LOG_LOGIC,"net: NULL collection. msg40.");


	m_lastProcessedi = -1;
	m_didSummarySkip = false;

	m_si             = si;
	m_state          = state;
	m_callback       = callback;
	m_msg3aRecallCnt = 0;
	// we haven't allocated any Msg20s yet
	m_numMsg20s      = 0;
	// reset our error keeper
	m_errno = 0;

	// do not read a collnum out of a buffer that does not hold one
	if ( ! haveCollnum ) { g_errno = ENOCOLLREC; return true; }

	// take search parms i guess from first collnum
	const collnum_t *cp = (const collnum_t *)m_si->m_collnumBuf.getBufStart();

	// get the collection rec
	CollectionRec *cr =g_collectiondb.getRec( cp[0] );

	// report the missing collection to the caller through g_errno
	if ( ! cr ) { g_errno = ENOCOLLREC; return true; }

	// save that
	m_firstCollnum = cr->m_collnum;

	// . reset these
	// . Next X Results links? yes or no?
	m_moreToCome = false;
	// set this to zero -- assume not in cache
	m_cachedTime = 0;

	// bail now if 0 requested!
	if ( m_si->m_docsWanted == 0 ) {
		log("msg40: n=0.");
		return true;
	}

	// or if no query terms
	if ( m_si->m_q.m_numTerms <= 0 ) {
		log("msg40: numTerms=0.");
		return true;
	}

	//enforce hard limits
	if(m_si->m_docsWanted > g_conf.m_maxDocsWanted) {
		log(LOG_DEBUG,"msg40: limiting docs-wanted from %d to %d", m_si->m_docsWanted, g_conf.m_maxDocsWanted);
		m_si->m_docsWanted = g_conf.m_maxDocsWanted;
	}
	if(m_si->m_firstResultNum > g_conf.m_maxFirstResultNum) {
		log(LOG_DEBUG,"msg40: limiting docs-offset from %d to %d", m_si->m_firstResultNum, g_conf.m_maxFirstResultNum);
		m_si->m_firstResultNum = g_conf.m_maxFirstResultNum;
	}
	// how many docids do we need to get?
	int32_t get = m_si->m_docsWanted + m_si->m_firstResultNum ;
	// we get one extra so we can tell whether more docids are available
	// (i.e. show a "Next 10" link)
	get++;

	// ok, need some sane limit though to prevent malloc from 
	// trying to get 7800003 docids and going ENOMEM
	if ( get > MAXDOCIDSTOCOMPUTE ) {
		log("msg40: asking for too many docids. reducing to %" PRId32,
		    (int32_t)MAXDOCIDSTOCOMPUTE);
		get = MAXDOCIDSTOCOMPUTE;
	}
	// this is how many visible results we need, after filtering/clustering
	m_docsToGetVisible = get;

	// . get a little more (20%) since dup removal usually doesn't remove
	//   many docIds
	// . deduping is now done in Msg40.cpp once the summaries are gotten
	if ( m_si->m_doDupContentRemoval ) get = (get*120LL)/100LL;

	// . ALWAYS get at least this many
	// . this allows Msg3a to allow higher scoring docids in tier #1 to
	//   outrank lower-scoring docids in tier #0, even if such docids have
	//   all the query terms explicitly. and we can guarantee consistency
	//   as long as we only allow for this outranking within the first
	//   MIN_DOCS_TO_GET docids.
	if ( get < MIN_DOCS_TO_GET ) get = MIN_DOCS_TO_GET;
	// this is how many docids to get total, assuming that some will be
	// filtered out for being dups, etc. and that we will have at least
	// m_docsToGetVisible leftover that are unfiltered and visible. so
	// we tell each msg39 split to get more docids than we actually want
	// in anticipation some will be filtered out in this class.
	m_docsToGet = get;

	// debug msg
	if ( m_si->m_debug ) 
		logf(LOG_DEBUG,"query: msg40 mapped %" PRId32" wanted to %" PRId32" to get",
		     m_docsToGetVisible,m_docsToGet );

	// forwarding the request to another host is not implemented here
	if ( forward ) { g_process.shutdownAbort(true); }

	// time the cache lookup
	if ( g_conf.m_logTimingQuery || m_si->m_debug || g_conf.m_logDebugQuery) 
		m_startTime = gettimeofdayInMilliseconds();

	// keep going
	return prepareToGetDocIds ( );
}

// Logs how long the cache lookup took (only when timing or debug logging is
// on), restarts the timer, and starts fetching docids. Returns whatever
// getDocIds(false) returns.
bool Msg40::prepareToGetDocIds ( ) {

	const bool wantTimingLog =
		g_conf.m_logTimingQuery || m_si->m_debug || g_conf.m_logDebugQuery;

	if ( wantTimingLog ) {
		const int64_t nowMs     = gettimeofdayInMilliseconds();
		const int64_t elapsedMs = nowMs - m_startTime;
		logf(LOG_TIMING,"query: [%p] Not found in cache. Lookup took %" PRId64" ms.",this,elapsedMs);
		// the docid fetch is timed from here on
		m_startTime = nowMs;
		logf(LOG_TIMING,"query: msg40: [%p] Getting up to %" PRId32" (docToGet=%" PRId32") docids", this,
		     m_docsToGetVisible,  m_docsToGet);
	}

	// Disabled: family-filter short circuit for queries with dirty words.
	// It would reset m_msg3a so that the number of docids is 0.
#if 0
	//@@@ TODO
	if ( m_si->m_familyFilter && 
	     getAdultPoints ( m_si->m_sbuf1.getBufStart() , 
			      m_si->m_sbuf1.length() , 
			      NULL ) ) {
		// make sure the m_numDocIds gets set to 0
		m_msg3a.reset();
		return true;
	}
#endif

	const bool isRecall = false;
	return getDocIds( isRecall );
}

// . sets up one Msg3a slot per collection being searched and starts the
//   docid lookups via federatedLoop()
// . returns false if blocked, true otherwise
// . sets g_errno on error
// . "recall" is not used by this function
bool Msg40::getDocIds ( bool recall ) {

	// no Msg3a requests launched or answered yet
	m_num3aReplies = 0;
	m_num3aRequests = 0;

	// how many are we searching? usually just one.
	const int32_t numColls = m_si->m_collnumBuf.length() /sizeof(collnum_t);

	// always make room for at least one pointer because slot 0 is
	// written below even when the collnum buffer turned out to be empty
	int32_t numSlots = numColls;
	if ( numSlots < 1 ) numSlots = 1;

	// make enough for ptrs
	int32_t need = sizeof(Msg3a *) * numSlots;
	if ( ! m_msg3aPtrBuf.reserve ( need ) ) return true;
	// cast the mem buffer
	m_msg3aPtrs = (Msg3a **)m_msg3aPtrBuf.getBufStart();

	// only publish the count once the array really exists, so that the
	// destructor never walks an array that could not be allocated
	m_numCollsToSearch = numColls;

	// clear these out so we do not free them when destructing
	for ( int32_t i = 0 ; i < numSlots ; i++ )
		m_msg3aPtrs[i] = NULL;

	// use first guy in case only one coll we are searching, the std case
	if ( m_numCollsToSearch <= 1 )
		m_msg3aPtrs[0] = &m_msg3a;

	return federatedLoop();
}

// . builds the Msg39Request shared by all collections, then launches one
//   Msg3a per collection in m_si->m_collnumBuf, at most one outstanding at
//   a time
// . is re-entered from gotDocIdsWrapper() after each reply to launch the
//   next collection; it resumes at index m_num3aRequests
// . returns false if a Msg3a blocked (gotDocIdsWrapper will be called)
// . returns true on allocation failure (g_errno = ENOMEM), otherwise
//   returns whatever gotDocIds() returns once every collection has been
//   launched without blocking
bool Msg40::federatedLoop ( ) {

	// search the provided collnums (collections)
	const collnum_t *cp = (const collnum_t *)m_si->m_collnumBuf.getBufStart();

	// only allow cached index lists if the search input asks to read
	// from cache
	int32_t maxAge = 0 ;
	if ( m_si->m_rcache ) maxAge = g_conf.m_indexdbMaxIndexListAge;


	if(g_conf.m_logTraceQuery || m_si->m_debug)
		m_si->m_baseScoringParameters.traceToLog("query:msg40");
	
	// start from a clean request and copy the search parameters into it
	Msg39Request mr;
	mr.reset();

	mr.m_maxAge                    = maxAge;
	mr.m_addToCache                = m_si->m_wcache;
	mr.m_docsToGet                 = m_docsToGet;
	mr.m_niceness                  = m_si->m_niceness;
	mr.m_debug                     = m_si->m_debug          ;
	mr.m_getDocIdScoringInfo       = m_si->m_getDocIdScoringInfo;
	mr.m_doSiteClustering          = m_si->m_doSiteClustering    ;
	mr.m_hideAllClustered          = m_si->m_hideAllClustered;
	mr.m_familyFilter              = m_si->m_familyFilter;
	mr.m_doMaxScoreAlgo            = m_si->m_doMaxScoreAlgo;
	mr.m_modifyQuery               = true; //we are a user-specified query so modifying it is ok. todo/hack until msg39 can carry the full query information
	mr.m_baseScoringParameters     = m_si->m_baseScoringParameters;
	mr.m_doDupContentRemoval       = m_si->m_doDupContentRemoval ;
	mr.m_word_variations_config    = m_si->m_word_variations_config;
	// NOTE(review): m_familyFilter was already assigned above; this
	// second assignment is redundant
	mr.m_familyFilter              = m_si->m_familyFilter        ;
	mr.m_allowHighFrequencyTermCache = m_si->m_allowHighFrequencyTermCache;
	mr.m_language                  = (unsigned char)m_si->m_queryLangId;
	// the request points at the query/site strings owned by m_si; the
	// sizes include the terminating NUL
	mr.ptr_query                   = const_cast<char*>(m_si->m_q.originalQuery());
	mr.size_query                  = strlen(m_si->m_q.originalQuery())+1;
	int32_t slen = 0; if ( m_si->m_sites ) slen=strlen(m_si->m_sites)+1;
	mr.ptr_whiteList               = const_cast<char*>(m_si->m_sites);
	mr.size_whiteList              = slen;
	mr.m_timeout                   = g_conf.m_msg40_msg39_timeout;
	mr.m_realMaxTop                = m_si->m_realMaxTop;

	mr.m_minSerpDocId              = m_si->m_minSerpDocId;
	mr.m_maxSerpScore              = m_si->m_maxSerpScore;
	memcpy(mr.m_queryId, m_si->m_queryId, sizeof(m_si->m_queryId));

	// never go below the minimum timeout requested by the search input
	if ( mr.m_timeout < m_si->m_minMsg3aTimeout )
		mr.m_timeout = m_si->m_minMsg3aTimeout;
	
	//
	// how many docid splits should we do to avoid going OOM?
	//
	const CollectionRec *cr = g_collectiondb.getRec(m_firstCollnum);
	RdbBase *base = NULL;
	if ( cr ) g_titledb.getRdb()->getBase(cr->m_collnum);
	//NOTE: the above line is a bug (the result is never stored in "base",
	//so numDocs below stays 0 and mult stays 1), but the obvious fix
	//causes numDocIdSplits to become huge (eg 200)
	int64_t numDocs = 0;
	if ( base ) numDocs = base->getNumTotalRecs();
	// for every 5M docids per host, lets split up the docid range
	// to avoid going OOM
	int32_t mult = numDocs / 5000000;
	if ( mult <= 0 ) mult = 1;

	// start with one split per two query terms, but at least one
	int32_t nt = m_si->m_q.getNumTerms();
	int32_t numDocIdSplits = nt / 2;
	if ( numDocIdSplits <= 0 ) numDocIdSplits = 1;
	// and mult based on index size
	numDocIdSplits *= mult;

	// fall back to 100 query terms if the collection is gone
	if ( cr ) mr.m_maxQueryTerms = cr->m_maxQueryTerms; 
	else      mr.m_maxQueryTerms = 100;

	// limit numDocIdSplits to the range specified in the configuration
	if ( numDocIdSplits < g_conf.min_docid_splits ) 
		numDocIdSplits = g_conf.min_docid_splits;
	if ( numDocIdSplits > g_conf.max_docid_splits ) 
		numDocIdSplits = g_conf.max_docid_splits;
	log(LOG_DEBUG,"query: Msg40::federatedLoop: numDocIdSplits=%d", numDocIdSplits);
	// store it in the request now
	mr.m_numDocIdSplits = numDocIdSplits;

	if(m_si->m_doSiteClustering && cr) {
		//Make a temporary query instance so we can calculate if site clustering should be turned off. We cannot use m_si->m_q because that could affect word highlighting
		Query tmpQuery;
		// note: this "cr" shadows the outer one; both are looked up
		// with m_firstCollnum
		const CollectionRec *cr = g_collectiondb.getRec(m_firstCollnum);
		tmpQuery.set(m_si->m_query,
		             m_si->m_queryLangId,
		             1.0, 1.0, NULL,
		             true,
		             m_si->m_allowHighFrequencyTermCache,
		             cr->m_maxQueryTerms);
		
		DerivedScoringWeights dsw; //don't care about the values.
		// modifyQuery() is handed a pointer to m_si->m_doSiteClustering
		// and may change it; pick up the result for the request
		tmpQuery.modifyQuery(&dsw, *cr, &m_si->m_doSiteClustering);
		mr.m_doSiteClustering = m_si->m_doSiteClustering;
	}


	// launch the collections one at a time
	int32_t maxOutMsg3as = 1;

	// resume with the first collection that has not been launched yet
	for ( int32_t i = m_num3aRequests ; i < m_numCollsToSearch ; i++ ) {

		// do not have more than this many outstanding
		if ( m_num3aRequests - m_num3aReplies >= maxOutMsg3as )
			// wait for it to return before launching another
			return false;

		// get it
		Msg3a *mp = m_msg3aPtrs[i];
		// the slot is only pre-filled (with &m_msg3a) when a single
		// collection is searched; otherwise allocate a temporary
		// Msg3a for this collection
		if ( ! mp ) {
			try { mp = new ( Msg3a); }
			catch(std::bad_alloc&) {
				g_errno = ENOMEM;
				return true;
			}
			mnew(mp,sizeof(Msg3a),"tm3ap");
		}
		// assign it
		m_msg3aPtrs[i] = mp;
		// assign the request for it
		memcpy ( &mp->m_msg39req , &mr , sizeof(Msg39Request) );
		// then customize it to just search this collnum
		mp->m_msg39req.m_collnum = cp[i];

		// launch a search request
		m_num3aRequests++;
		// this returns false if it would block and will call callback
		// m_si is actually contained in State0 in PageResults.cpp
		// and Msg40::m_si points to that. so State0's destructor
		// should call SearchInput's destructor which calls
		// Query's destructor to destroy &m_si->m_q here when done.
		if(!mp->getDocIds(m_si,&m_si->m_q,this,gotDocIdsWrapper))
			continue;
		// it completed without blocking: keep the first error seen
		// and count the reply ourselves
		if ( g_errno && ! m_errno ) 
			m_errno = g_errno;
		m_num3aReplies++;
	}

	// everything was launched; gotDocIds() returns false if replies are
	// still outstanding
	return gotDocIds ( );
}	

// . callback handed to Msg3a::getDocIds() by federatedLoop(); "state" is
//   the Msg40 that launched the request
// . counts the reply, launches the next collection if any are left, and
//   calls the user callback once nothing is blocked any more
void gotDocIdsWrapper ( void *state ) {
	Msg40 *THIS = (Msg40 *) state;
	THIS->m_num3aReplies++;
	if ( THIS->m_num3aRequests < THIS->m_numCollsToSearch ) {
		// . try to launch more, there are more colls left to search
		// . federatedLoop() returns false if it blocked, in which
		//   case a later callback finishes the job
		// . if it returns true it either hit an error or already ran
		//   gotDocIds() to completion without blocking, so nobody
		//   else is going to call the user callback: we must do it
		if ( ! THIS->federatedLoop ( ) ) return;
	}
	else {
		// all collections were launched; return if this blocked
		if ( ! THIS->gotDocIds() ) return;
	}
	// now call callback, we're done
	log(LOG_INFO, "query: Msg40 end: query_id='%s' query='%s', results=%d", THIS->m_si->m_queryId, THIS->m_si->m_query, THIS->getNumResults());
	THIS->m_callback ( THIS->m_state );
}

// . called once docid replies come in; does nothing until every Msg3a that
//   was launched has replied
// . merges and re-ranks the docids, sizes the url dedup table and the
//   Msg20 buffer, then starts fetching summaries
// . return false if blocked, true otherwise
// . sets g_errno on error
bool Msg40::gotDocIds ( ) {

	// at least one msg3a reply is still missing
	if ( m_num3aRequests > m_num3aReplies ) return false;

	// fold the docids of all searched collections into m_msg3a; the
	// merge sets g_errno on error, like oom
	const bool mergedOk = mergeDocIdsIntoBaseMsg3a();
	if ( ! mergedOk )
		log("msg40: error: %s",mstrerror(g_errno));

	adjustRankingBasedOnFlags();

	// report how long getting the docids took
	const int64_t nowMs = gettimeofdayInMilliseconds();
	const bool wantTimingLog =
		g_conf.m_logTimingQuery || m_si->m_debug || g_conf.m_logDebugQuery;
	if ( wantTimingLog ) {
		const int64_t elapsedMs = nowMs - m_startTime;
		logf(LOG_DEBUG,"query: msg40: [%p] Got %" PRId32" docids in %" PRId64" ms",
		     this,m_msg3a.getNumDocIds(),elapsedMs);
		logf(LOG_DEBUG,"query: msg40: [%p] Getting up to %" PRId32" summaries",
		     this,m_docsToGetVisible);
	}

	// remember the msg3a error unless we already hold an earlier one
	if ( m_msg3a.m_errno && ! m_errno ) m_errno = m_msg3a.m_errno;
	// a pending g_errno (e.g. out of memory) ends the request here
	if ( g_errno ) {
		m_errno = g_errno;
		return true;
	}

	// the summary phase is timed from here
	m_startTime = gettimeofdayInMilliseconds();

	// no Msg20 request has been sent or answered yet
	m_numReplies  = 0;
	m_numRequests = 0;

	// two slots per docid for the url dedup table
	const bool tableOk = m_urlTable.set ( m_msg3a.getNumDocIds() * 2 );
	if ( ! tableOk ) {
		m_errno = g_errno;
		log("query: Failed to allocate memory for url deduping. Not deduping search results.");
		return true;
	}

	// docids were all that was asked for: no summaries needed
	if ( m_si->m_docIdsOnly ) return true;

	// . make room for the m_msg20[] ptrs and the Msg20s they point to
	// . on failure g_errno/m_errno are set and we are done
	const bool bufOk = reallocMsg20Buf();
	if ( ! bufOk ) return true;

	// nothing that depends on the docids has been launched yet
	m_tasksRemaining = 0;

	if ( m_si->m_debug || g_conf.m_logDebugQuery )
		logf(LOG_DEBUG,"query: [%p] Getting reference pages and dir pages.",
		     this);

	const bool isRecall = false;
	return launchMsg20s ( isRecall );
}

bool Msg40::mergeDocIdsIntoBaseMsg3a() {

	// only do this if we were searching multiple collections, otherwise
	// all the docids are already in m_msg3a
	if ( m_numCollsToSearch <= 1 ) return true;
	
	// free any mem in use
	m_msg3a.reset();

	// count total docids into "td"
	int32_t td = 0LL;
	for ( int32_t i = 0 ; i < m_numCollsToSearch ; i++ ) {
		Msg3a *mp = m_msg3aPtrs[i];
		td += mp->getNumDocIds();
		// reset cursor for list of docids from this collection
		mp->m_cursor = 0;
		// add up here too
		m_msg3a.m_numTotalEstimatedHits += mp->m_numTotalEstimatedHits;
	}

	// setup to to merge all msg3as into our one m_msg3a
	int32_t need = 0;
	need += td * 8;
	need += td * sizeof(double);
	need += td * sizeof(unsigned);
	need += td * sizeof(key96_t);
	need += td * 1;
	need += td * sizeof(collnum_t);
	// make room for the merged docids
	m_msg3a.m_finalBuf =  (char *)mmalloc ( need , "finalBuf" );
	m_msg3a.m_finalBufSize = need;
	// return true with g_errno set
	if ( ! m_msg3a.m_finalBuf ) return true;
	// parse the memory up into arrays
	char *p = m_msg3a.m_finalBuf;
	m_msg3a.m_docIds        = (int64_t *)p; p += td * 8;
	m_msg3a.m_scores        = (double    *)p; p += td * sizeof(double);
	m_msg3a.m_flags         = (unsigned*)p; p += td * sizeof(unsigned);
	m_msg3a.m_clusterRecs   = (key96_t     *)p; p += td * sizeof(key96_t);
	m_msg3a.m_clusterLevels = (char      *)p; p += td * 1;
	m_msg3a.m_scoreInfos    = NULL;
	m_msg3a.m_collnums      = (collnum_t *)p; p += td * sizeof(collnum_t);
	if ( p - m_msg3a.m_finalBuf != need ) { g_process.shutdownAbort(true); }

	m_msg3a.m_numDocIds = td;

	//
	// begin the collection merge
	//

	for(int32_t next = 0; next<td; next++) {
		// get next biggest score in the msg3as
		double max  = -1000000000.0;
		Msg3a *maxmp = NULL;
		for ( int32_t i = 0 ; i < m_numCollsToSearch ; i++ ) {
			// shortcut
			Msg3a *mp = m_msg3aPtrs[i];
			// get cursor
			int32_t cursor = mp->m_cursor;
			// skip if exhausted
			if ( cursor >= mp->m_numDocIds ) continue;
			// get his next score
			double score = mp->m_scores[ cursor ];
			if ( score <= max ) continue;
			// got a new winner
			max = score;
			maxmp = mp;
		}

		if(!maxmp )
			break; //done
		// store him
		m_msg3a.m_docIds  [next] = maxmp->m_docIds[maxmp->m_cursor];
		m_msg3a.m_scores  [next] = maxmp->m_scores[maxmp->m_cursor];
		m_msg3a.m_flags   [next] = maxmp->m_flags[maxmp->m_cursor];
		m_msg3a.m_collnums[next] = maxmp->m_msg39req.m_collnum;
		m_msg3a.m_clusterLevels[next] = CR_OK;
		maxmp->m_cursor++;
	}

	// free tmp msg3as now
	for ( int32_t i = 0 ; i < m_numCollsToSearch ; i++ ) {
		if ( m_msg3aPtrs[i] == &m_msg3a ) continue;
		mdelete ( m_msg3aPtrs[i] , sizeof(Msg3a), "tmsg3a");
		delete  ( m_msg3aPtrs[i] );
		m_msg3aPtrs[i] = NULL;
	}

	return true;
}


// . adjusts the order of the results based on the flags of the documents
// . each docid gets the rank "position + sum of m_flagRankAdjustment[bit]
//   for every flag bit (0..25) that is set", and the parallel m_msg3a
//   arrays are then reordered by ascending rank with a stable sort
// . does nothing if the temporary rank array cannot be allocated
void Msg40::adjustRankingBasedOnFlags() {
	const int numDocIds = m_msg3a.m_numDocIds;
	// nothing to reorder, and no point in a zero-byte allocation
	if(numDocIds <= 0)
		return;

	int *rank = (int*)mmalloc(numDocIds * sizeof(int), "ranksort");
	if(!rank)
		return; //not a serious problem
	
	for(int i=0; i<numDocIds; i++) {
		const unsigned flags = m_msg3a.m_flags[i];
		int adjustment = 0;
		if(flags) {
			for(int bit=0; bit<26; bit++)
				if((1u<<bit)&flags)
					adjustment += m_si->m_baseScoringParameters.m_flagRankAdjustment[bit];
		}
		rank[i] = i + adjustment;
	}
	
	//There are no library functions to sort multiple side-by-side arrays. Fortunately, the positions are almost sorted, so a plain insertion sort is the ideal algorithm
	for(int i=1; i<numDocIds; i++) {
		for(int j=i; j>0 && rank[j-1] > rank[j]; j--) {
			std::swap(m_msg3a.m_docIds[j],m_msg3a.m_docIds[j-1]);
			std::swap(m_msg3a.m_scores[j],m_msg3a.m_scores[j-1]);
			std::swap(m_msg3a.m_flags[j],m_msg3a.m_flags[j-1]);
			//m_collnums is tested because it may not be set
			if(m_msg3a.m_collnums)
				std::swap(m_msg3a.m_collnums[j],m_msg3a.m_collnums[j-1]);
			std::swap(m_msg3a.m_clusterLevels[j],m_msg3a.m_clusterLevels[j-1]);
			std::swap(rank[j],rank[j-1]);
		}
	}
	
	mfree(rank,numDocIds * sizeof(int),"ranksort");
}


// . returns false and sets g_errno/m_errno on error
// . makes m_msg3a.m_numDocIds ptrs to Msg20s. 
// . does not allocate a Msg20 in the buffer if the m_msg3a.m_clusterLevels[i]
//   is something other than CR_OK
bool Msg40::reallocMsg20Buf ( ) {

	// if the user only requested docids, we have no summaries
	if ( m_si->m_docIdsOnly ) return true;

	// . allocate m_buf2 to hold all our Msg20 pointers and Msg20 classes
	// . how much mem do we need?
	// . need space for the msg20 ptrs
	int64_t need = m_msg3a.m_numDocIds * sizeof(Msg20 *);
	// need space for the classes themselves, only if "visible" though
	for ( int32_t i = 0 ; i < m_msg3a.m_numDocIds ; i++ ) 
		if ( m_msg3a.m_clusterLevels[i] == CR_OK ) 
			need += sizeof(Msg20);

	// MDW: try to preserve the old Msg20s if we are being re-called
	if ( m_buf2 ) {
		// make new buf
		char *newBuf = (char *)mmalloc(need,"Msg40d");
		// return false if it fails
		if ( ! newBuf ) { m_errno = g_errno; return false; }
		// fill it up
		char *p = newBuf;
		// point to our new array of Msg20 ptrs
		Msg20 **tmp = (Msg20 **)p;
		// skip over pointer array
		p += m_msg3a.m_numDocIds * sizeof(Msg20 *);
		// record start to set to m_msg20StartBuf
		char *pstart = p;
		// and count for m_numToFree
		int32_t pcount = 0;
		// fill in the actual Msg20s from the old buffer
		for ( int32_t i = 0 ; i < m_msg3a.m_numDocIds ; i++ ) {
			// assume empty, because clustered, filtered, etc.
			tmp[i] = NULL;
			// if clustered, keep it as a NULL ptr
			if ( m_msg3a.m_clusterLevels[i] != CR_OK ) continue;
			// point it to its memory
			tmp[i] = (Msg20 *)p;
			// point to the next Msg20
			p += sizeof(Msg20);
			// init it
			new (tmp[i]) Msg20();
			// count it
			pcount++;
			// skip it if it is a new docid, we do not have a Msg20
			// for it from the previous tier. IF it is from
			// the current tier, THEN it is new.
			//if ( m_msg3a.m_tiers[i] == m_msg3a.m_tier ) continue;
			// see if we can find this docid from the old list!
			int32_t k = 0;
			for ( ; k < m_numMsg20s ; k++ ) {
				// skip if NULL
				if ( ! m_msg20[k] ) continue;
				// if it never gave us a reply then skip it
				if ( ! m_msg20[k]->m_gotReply ) continue;
				//or if it had an error
				if ( m_msg20[k]->m_errno ) continue;
				// skip if no match
				if ( m_msg3a    .m_docIds[i] !=
				     m_msg20[k]->m_r->m_docId )//getDocId() )
					continue;
				// we got a match, grab its Msg20
				break;
			}
			// . skip if we could not match it... strange...
			// . no, because it may have been in the prev tier,
			//   from a split, but it was not in msg3a's final 
			//   merged list made in Msg3a::mergeLists(), but now 
			//   it is in there, with the previous tier, because
			//   we asked for more docids from msg3a.
			// . NO! why did we go to the next tier unnecessarily
			//   THEN? no again, because we did a msg3a recall
			//   and asked for more docids which required us
			//   going to the next tier, even though some (but
			//   not enough) docids remained in the previous tier.
			if ( k >= m_numMsg20s ) {
				/*
				logf(LOG_DEBUG,"query: msg40: could not match "
				     "docid %" PRId64" (max=%" PRId32") "
				     "to msg20. newBitScore=0x%hhx q=%s",
				     m_msg3a.m_docIds[i],
				     (char)m_msg3a.m_bitScores[i],
				     m_msg3a.m_q->m_orig);
				*/
				continue;
			}
			// it is from an older tier but never got the msg20 
			// for it? what happened? it got unclustered??
			if ( ! m_msg20[k] ) continue;

			// . otherwise copy the memory if available
			// . if m_msg20[i]->m_docId is set this will save us
			//   repeating a summary lookup
			tmp[i]->moveFrom(m_msg20[k]);
		}
		// sanity check
		if ( p - (char *)tmp != need ) { g_process.shutdownAbort(true); }

		resetBuf2();

		// the new buf2 stuff
		m_numToFree     = pcount;
		m_msg20StartBuf = pstart;

		// re-assign the msg20 ptr to the ptrs
		m_msg20 = tmp;
		// update new count
		m_numMsg20s = m_msg3a.m_numDocIds;

		// assign to new mem
		m_buf2        = newBuf;
		m_bufMaxSize2 = need;

		// all done
		return true;
	}

	m_numMsg20s = m_msg3a.m_numDocIds;

	m_buf2        = NULL;
	m_bufMaxSize2 = need;

	// do the alloc
	if ( need ) m_buf2 = (char *)mmalloc ( need ,"Msg40msg20");
	if ( need && ! m_buf2 ) { m_errno = g_errno; return false; }
	// point to the mem
	char *p = m_buf2;
	// point to the array, then make p point to the Msg20 buffer space
	m_msg20 = (Msg20 **)p; 
	p += m_numMsg20s * sizeof(Msg20 *);
	// start free here
	m_msg20StartBuf = p;
	// set the m_msg20[] array to use this memory, m_buf20
	for ( int32_t i = 0 ; i < m_numMsg20s ; i++ ) {
		// assume empty
		m_msg20[i] = NULL;
		// if clustered, do a NULL ptr
		if ( m_msg3a.m_clusterLevels[i] != CR_OK ) continue;
		// point it to its memory
		m_msg20[i] = (Msg20 *)p;
		// call its constructor
		new (m_msg20[i]) Msg20();
		// point to the next Msg20
		p += sizeof(Msg20);
		// remember num to free in reset() function
		m_numToFree++;
	}

	return true;
}

// . launches a Msg20 summary request for each docid in m_msg3a that does
//   not have one yet, starting after m_lastProcessedi and keeping at most
//   "maxOut" requests outstanding
// . docids without a Msg20 (clustered), on a dead shard, or already
//   requested are counted as requested+replied and skipped
// . "recalled" must be true when called from gotSummary() so that we do
//   not call gotSummary() again from here
// . returns false if replies are still outstanding, true otherwise
// . records errors in m_errno
bool Msg40::launchMsg20s(bool recalled) {

	if( !m_si ) {
		logError("cannot use this function when m_si is not set");
		gbshutdownLogicError();
	}

	// we must not get here after the client socket had an error
	if ( m_socketHadError ) {
		g_process.shutdownAbort(true);
	}

	// these are just like for passing to Msg39 above
	int64_t maxCacheAge = 0 ;
	// make it somewhat jive with the search results caching, otherwise
	// it will tell me a search result was indexed like 3 days ago
	// when it was just indexed 10 minutes ago because the 
	// titledbMaxCacheAge was set way too high
	if ( m_si->m_rcache )
		maxCacheAge = g_conf.m_docSummaryWithDescriptionMaxCacheAge;

	// throttle the number of outstanding requests when the udp server
	// is busy
	int32_t maxOut = (int32_t)MAX_OUTSTANDING_MSG20S;
	if ( g_udpServer.getNumUsedSlots() > 500 ) maxOut = 10;
	if ( g_udpServer.getNumUsedSlots() > 800 ) maxOut = 1;

	// if not deduping or site clustering, then
	// just skip over docids for speed.
	// don't bother with summaries we do not need
	if ( ! m_si->m_doDupContentRemoval &&
	     ! m_si->m_doSiteClustering &&
	     m_lastProcessedi == -1 ) {
		// start getting summaries with the result # they want
		m_lastProcessedi = m_si->m_firstResultNum-1;
		// assume we printed the summaries before
		m_printi = m_si->m_firstResultNum;
		m_numDisplayed = m_si->m_firstResultNum;
		// fake this so Msg40::gotSummary() can let us finish
		// because it checks m_numRequests <  m_msg3a.m_numDocIds
		m_numRequests = m_si->m_firstResultNum;
		m_numReplies  = m_si->m_firstResultNum;
		m_didSummarySkip = true;
		log("query: skipping summary generation of first %" PRId32" docs",
		    m_si->m_firstResultNum);
	}

	// the collection rec does not depend on the docid, look it up once
	const CollectionRec *cr = g_collectiondb.getRec(m_firstCollnum);

	// . launch a msg20 getSummary() for each docid
	for ( int32_t i = m_lastProcessedi+1 ; i < m_msg3a.m_numDocIds ;i++ ) {
		// if the user only requested docids, do not get the summaries
		if ( m_si->m_docIdsOnly ) break;
		// hard limit
		if ( m_numRequests-m_numReplies >= maxOut ) break;

		// do not repeat for this i
		m_lastProcessedi = i;

		// the Msg20 that will get the summary, NULL if clustered
		Msg20 *m = m_msg20[i];

		// if to a dead host, skip it
		int64_t docId = m_msg3a.m_docIds[i];
		uint32_t shardNum = g_hostdb.getShardNumFromDocId ( docId );
		// if shard is dead then do not send to it
		if ( g_hostdb.isShardDead ( shardNum ) && cr ) {
			log("msg40: skipping summary lookup #%" PRId32" of docid %" PRId64" for dead shard #%" PRId32
			    , i
			    , docId
			    , shardNum );
			m_numRequests++;
			m_numReplies++;
			continue;
		}


		// if msg20 ptr null that means the cluster level is not CR_OK
		if ( ! m ) {
			m_numRequests++;
			m_numReplies++;
			continue;
		}
		// . did we already TRY to get the summary for this docid?
		// . we might be re-called
		// . if already did it, skip it
		if ( m_msg3a.m_docIds[i] == m->getRequestDocId() ) {
			m_numRequests++;
			m_numReplies++;
			continue;
		}

		// assume no error
		g_errno = 0;
		// debug msg
		if ( m_si->m_debug || g_conf.m_logDebugQuery )
			logf(LOG_DEBUG,"query: msg40: [%p] Getting summary #%" PRId32" for docId=%" PRId64,
			     this,i,m_msg3a.m_docIds[i]);

		if ( ! cr ) {
			log("msg40: missing coll");
			g_errno = ENOCOLLREC;
			// keep the error even if g_errno gets cleared later
			if ( ! m_errno ) m_errno = g_errno;
			// . nothing is launched for this docid, so count it
			//   as answered like the other skipped docids
			// . otherwise m_numReplies could never catch up with
			//   m_numRequests and we would report "blocked"
			//   forever on a reply that is never going to come
			m_numRequests++;
			m_numReplies++;
			// only wait if real requests are still outstanding
			if ( m_numReplies < m_numRequests ) return false;
			return true;
		}

		// launch it
		m_numRequests++;

		// set the summary request then get it!
		Msg20Request req;
		Query *q = &m_si->m_q;
		req.ptr_qbuf             = const_cast<char*>(q->getQuery());
		req.size_qbuf            = q->getQueryLen()+1;
		req.m_langId             = m_si->m_queryLangId;
		req.m_prefferedResultLangId = getLangIdFromAbbr(m_si->getPreferredResultLanguage().c_str());

		req.m_highlightQueryTerms = m_si->m_doQueryHighlighting;

		req.m_isDebug            = (bool)m_si->m_debug;

		// only pass the meta tag list along if it is non-empty
		if ( m_si->m_displayMetas && m_si->m_displayMetas[0] ) {
			int32_t dlen = strlen(m_si->m_displayMetas);
			req.ptr_displayMetas     = const_cast<char *>(m_si->m_displayMetas);
			req.size_displayMetas    = dlen+1;
		}

		req.m_docId              = m_msg3a.m_docIds[i];
		
		// if the msg3a was merged from other msg3as because we
		// were searching multiple collections...
		if ( m_msg3a.m_collnums )
			req.m_collnum = m_msg3a.m_collnums[i];
		// otherwise, just one collection
		else
			req.m_collnum = m_msg3a.m_msg39req.m_collnum;

		req.m_numSummaryLines    = m_si->m_numLinesInSummary;
		req.m_maxCacheAge        = maxCacheAge;
		req.m_state              = this;
		req.m_callback           = gotSummaryWrapper;
		req.m_niceness           = m_si->m_niceness;
		req.m_showBanned         = m_si->m_showBanned;
		req.m_includeCachedCopy  = m_si->m_includeCachedCopy;
		req.m_getSummaryVector   = true;
		req.m_titleMaxLen = m_si->m_titleMaxLen;
		req.m_summaryMaxLen = cr->m_summaryMaxLen;
		req.m_word_variations_config     = m_si->m_word_variations_config;
		req.m_useQueryStopWords  = m_si->m_word_variations_config.m_wiktionaryWordVariations; //SearchInput doesn't have a m_useQueryStopWords, but if they wanted synonyms (m_queryExpansion) then they probably also want stop words
		req.m_allowHighFrequencyTermCache = m_si->m_allowHighFrequencyTermCache;

		// Line means excerpt 
		req.m_summaryMaxNumCharsPerLine = m_si->m_summaryMaxNumCharsPerLine;

		// a special undocumented thing for getting <h1> tag
		req.m_getHeaderTag       = m_si->m_hr.getLong("geth1tag",0);

		// . "&inlinks=2" gets the link info from the title rec, which
		//   is more stale, but does not take extra time or resources
		// . NOTE(review): no other value of m_displayInlinks is
		//   handled here
		if ( m_si->m_displayInlinks == 2 ) 
			req.m_getLinkInfo     = true;

		// getSummary() returns false if it blocked, the reply will
		// arrive through gotSummaryWrapper()
		if ( ! m->getSummary ( &req ) ) continue;

		// got reply
		m_numReplies++;
		// . otherwise we got summary without blocking
		// . deal with an error
		if ( ! g_errno ) continue;
		// log it
		log("query: Had error getting summary: %s.",
		    mstrerror(g_errno));
		// record g_errno
		if ( ! m_errno ) m_errno = g_errno;
		// reset g_errno
		g_errno   = 0;
	}
	// return false if still waiting on replies
	if ( m_numReplies < m_numRequests ) return false;
	// do not re-call gotSummary() to avoid a possible recursive stack
	// explosion. this is only true if we are being called from 
	// gotSummary() already, so do not call it again!!
	if ( recalled ) 
		return true;
	// if we got nothing, that's it
	if ( m_msg3a.m_numDocIds <= 0 ) {
		return true;
	}

	// . we can get here when reallocMsg20Buf() carried over Msg20s that
	//   already had their replies, so nothing had to be launched
	logf(LOG_DEBUG,"query: Had all msg20s already.");
	// . we got everyone, so go right to the merge routine
	// . returns false if not all replies have been received 
	// . returns true if done
	// . sets g_errno on error
	return gotSummary ( );
}

// . returns the first Msg20 in m_msg20[] that has not been launched
// . aborts the process if there is none
Msg20 *Msg40::getAvailMsg20 ( ) {
	for ( int32_t i = 0 ; i < m_numMsg20s ; i++ ) {
		// slots of clustered/filtered docids hold a NULL ptr
		// (see reallocMsg20Buf()), do not dereference those
		if ( ! m_msg20[i] ) continue;
		// skip the ones that have already been launched
		if ( m_msg20[i]->m_launched ) continue;
		return m_msg20[i];
	}
	// how can this happen???  THIS HAPPENED
	g_process.shutdownAbort(true); 
	return NULL;
}

// . finds the first Msg20 whose m_ii equals "ix"
// . returns it if it is no longer in progress
// . returns NULL if it is still in progress or if no Msg20 has that m_ii
Msg20 *Msg40::getCompletedSummary ( int32_t ix ) {
	for ( int32_t slot = 0 ; slot < m_numMsg20s ; slot++ ) {
		Msg20 *candidate = m_msg20[slot];
		// slots can be NULL, e.g. m_numMsg20s can be > m_numRequests
		// when doing a multi collection federated search
		if ( candidate == NULL ) continue;
		// not the one we are looking for
		if ( candidate->m_ii != ix ) continue;
		// found it: hand it out only once it has completed
		if ( ! candidate->m_inProgress ) return candidate;
		return NULL;
	}
	return NULL;
}

bool gotSummaryWrapper ( void *state ) {
	Msg40 *THIS  = (Msg40 *)state;
	// inc it here
	THIS->m_numReplies++;

	if ( (THIS->m_numReplies % 10) == 0 ) {
		log( "msg40: got %" PRId32 " summaries out of %" PRId32 "",
		     THIS->m_numReplies,
		     THIS->m_msg3a.m_numDocIds );
	}

	// it returns false if we're still awaiting replies
	if ( !THIS->gotSummary() ) {
		return false;
	}

	// now call callback, we're done
	log(LOG_INFO, "query: Msg40 end: query_id='%s' query='%s', results=%d", THIS->m_si->m_queryId, THIS->m_si->m_query, THIS->getNumResults());
	THIS->m_callback ( THIS->m_state );

	return true;
}

static void doneSendingWrapper9(void *state, TcpSocket *sock) {
	Msg40 *THIS = (Msg40 *)state;

	// the send completed, count it
	THIS->m_sendsIn++;

	// error?
	if ( THIS->m_sendsIn > THIS->m_sendsOut ) {
		log("msg40: sendsin > sendsout. bailing!!!");
		// try to prevent a core i haven't fixed right yet!!!
		// seems like a reply coming back after we've destroyed the
		// state!!!
		return;
	}
	// socket error? if client closes the socket midstream we get one.
	if ( g_errno ) {
		THIS->m_socketHadError = g_errno;
		log("msg40: streaming socket had error: %s",
		    mstrerror(g_errno));
		// i guess destroy the socket here so we don't get called again?
	}

	// clear it so we don't think it was a msg20 error below
	g_errno = 0;

	// try to send more... returns false if blocked on something
	if ( ! THIS->gotSummary() ) return;

	// all done!!!???
	log(LOG_INFO, "query: Msg40 end: query_id='%s' query='%s', results=%d", THIS->m_si->m_queryId, THIS->m_si->m_query, THIS->getNumResults());
	THIS->m_callback ( THIS->m_state );
}

// . processes the arrival of a summary reply or the completion of a send
// . called from gotSummaryWrapper() and doneSendingWrapper9() (and from the
//   end of the summary-launching code above)
// . records any pending g_errno in m_errno, sends a pending chunk if there is
//   one, launches more summary requests if some docids have none yet, and
//   finally hands over to gotSummaries() once all requested replies are in
// . returns false if not all replies have been received (or timed/erroredout)
// . returns true if done (or an error finished us)
// . sets g_errno on error
bool Msg40::gotSummary ( ) {
	// now m_linkInfo[i] (for some i, i dunno which) is filled
	if ( m_si->m_debug || g_conf.m_logDebugQuery )
		logf(LOG_DEBUG,"query: msg40: [%p] Got summary. Total got=#%" PRId32".",
		     this,m_numReplies);

	// did we have a problem getting this summary?
	if ( g_errno ) {
		// save it (note: overwrites any earlier m_errno)
		m_errno = g_errno;
		// log it
		log("query: msg40: Got error getting summary: %s.", mstrerror(g_errno));
		// reset g_errno
		g_errno = 0;
	}

	// initialize dedup table if we haven't already. a failure is only
	// logged; processing continues regardless.
	if ( ! m_dedupTable.isInitialized() &&
	     ! m_dedupTable.set (4,0,64,NULL,0,false,"srdt") )
		log("query: error initializing dedup table: %s",
		    mstrerror(g_errno));

	// m_state is treated as the State0 of the request that owns us
	State0 *st = (State0 *)m_state;

 doAgain:

	// NOTE(review): sb is reset here and its length is tested further
	// down without anything visible in between adding to it. if reset()
	// empties the buffer then the sendChunk() branch below never runs.
	// confirm.
	st->m_sb.reset();

	// do we still own this socket? i am thinking it got closed somewhere
	// and the socket descriptor was re-assigned to another socket
	// getting a diffbot reply from XmLDoc::getDiffbotReply()
	if ( st->m_socket && 
	     st->m_socket->m_startTime != st->m_socketStartTimeHack ) {
		log("msg40: lost control of socket. sd=%i. the socket descriptor closed on us and got re-used by someone else.",
		    (int)st->m_socket->m_sd);
		// if there wasn't already an error like 'broken pipe' then
		// set it here so we stop getting summaries if streaming.
		if ( ! m_socketHadError ) m_socketHadError = EBADENGINEER;
		// make it NULL to avoid us from doing anything to it
		// since someone else is using it now.
		st->m_socket = NULL;
	}


	// . transmit the chunk in sb if non-zero length
	// . steals the allocated buffer from sb and stores in the 
	//   TcpSocket::m_sendBuf, which it frees when socket is
	//   ultimately destroyed or we call sendChunk() again.
	// . when TcpServer is done transmitting, it does not close the
	//   socket but rather calls doneSendingWrapper9() which can call
	//   this function again to send another chunk
	// . when we are truly done sending all the data, then we set lastChunk
	//   to true and TcpServer.cpp will destroy m_socket when done.
	//   no, actually we just set m_streamingMode to false i guess above
	if ( st->m_sb.length() &&
	     // did client browser close the socket on us midstream?
	     ! m_socketHadError &&
	     st->m_socket)
	{
		if( ! g_httpServer.m_tcp.sendChunk ( st->m_socket,
						     &st->m_sb,
						     this,
						     doneSendingWrapper9 ) )
			// if it blocked, inc this count. we'll only call m_callback
			// above when m_sendsIn equals m_sendsOut... and
			// m_numReplies == m_numRequests
			m_sendsOut++;

		// writing on closed socket?
		if ( g_errno ) {
			// keep the first socket error we saw
			if ( ! m_socketHadError ) m_socketHadError = g_errno;
			log("msg40: got tcp error : %s",mstrerror(g_errno));
			// disown it here so we do not damage in case it gets
			// reopened by someone else
			st->m_socket = NULL;
		}
	}

	// do we need to launch another batch of summary requests?
	if ( m_numRequests < m_msg3a.m_numDocIds && ! m_socketHadError ) {
		// . if we can launch another, do it
		// . say "true" here so it does not call us, gotSummary() and 
		//   do a recursive stack explosion
		// . this returns false if still waiting on more to come back
		if ( ! launchMsg20s ( true ) ) return false; 

		// it returned true, so m_numRequests == m_numReplies and
		// we don't need to launch any more! but that does NOT
		// make sense because m_numContiguous < m_msg3a.m_numDocIds
		// . i guess the launch can fail because of oom... and
		//   end up returning true here... seen it happen, and
		//   we had full requests/replies for m_msg3a.m_numDocIds
		// NOTE(review): this loops back for as long as
		// m_numRequests < m_msg3a.m_numDocIds and launchMsg20s()
		// keeps returning true. confirm launchMsg20s() always
		// makes progress in that case.
		log("msg40: got all replies i guess");
		goto doAgain;
	}


	// . ok, now i wait for all msg20s (getsummary) to come back in.
	// . TODO: evaluate if this hurts us
	if ( m_numReplies < m_numRequests )
		return false;
	
	// every requested summary is in: filter, dedup and classify them
	return gotSummaries();
}


//We got the replies we initially requested. Now set cluster levels, and filter on clustering, family filter,
//error/show-errors, etc. After that, the summaries are ready for realtime-classification.
//
//Three passes are made over the results:
//  1. per-result filtering (errors, adult, banned, corrupt, empty title+summary)
//  2. similar-summary dedup (only if enabled and summaries were fetched)
//  3. URL dedup (only if enabled, not a site:/suburl: query, and summaries were fetched)
//Results that are filtered out get a cluster level other than CR_OK.
//
//Returns false if URL realtime classification is still pending (the classification
//callback continues the processing), otherwise returns what gotEnoughSummaries() returns.
bool Msg40::gotSummaries() {
	int64_t startTime = gettimeofdayInMilliseconds();

	// loop over each clusterLevel and set it
	for ( int32_t i = 0 ; i < m_numReplies ; i++ ) {
		// did we skip the first X summaries because we were
		// not deduping/siteclustering/gettingGigabits?
		if ( m_didSummarySkip && i < m_si->m_firstResultNum )
			continue;
		// get current cluster level
		char *level = &m_msg3a.m_clusterLevels[i];
		// sanity check -- this is a transitional value msg3a should 
		// set it to something else!
		if ( *level == CR_GOT_REC         ) { g_process.shutdownAbort(true); }
		if ( *level == CR_ERROR_CLUSTERDB ) { g_process.shutdownAbort(true); }
		// skip if already "bad"
		if ( *level != CR_OK ) continue;
		// if the user only requested docids, we have no summaries
		if ( m_si->m_docIdsOnly ) break;
		// convenient var
		Msg20 *m = m_msg20[i];
		// get the Msg20 reply
		const Msg20Reply *mr = m->m_r;
		// if no reply, all hosts must have been dead i guess so
		// filter out this guy
		if ( ! mr && ! m->m_errno ) {
			logf(LOG_DEBUG,"query: msg 20 reply was null.");
			m->m_errno = ENOHOSTS;
		}

		if ( m_si->m_familyFilter && mr && mr->m_isAdult) {
			logf(LOG_DEBUG,"query: msg20.is_adult and family filter is on.");
			m->m_errno = EDOCADULT;
		}
		// if any msg20 has m_errno set, then set ours so at least the
		// xml feed will know there was a problem even though it may 
		// have gotten search results.
		if ( m->m_errno ) {
			if ( m_si->m_debug || g_conf.m_logDebugQuery ) {
				logf( LOG_DEBUG, "query: result %" PRId32 " (docid=%" PRId64 ") had an error (%s) and will not be shown.",
				      i, m_msg3a.m_docIds[i], mstrerror( m->m_errno ) );
			}

			// update our m_errno while here
			if ( ! m_errno ) {
				m_errno = m->m_errno;
			}

			// with show-errors on, the result stays CR_OK even
			// though "mr" may be NULL. every later use of the
			// reply therefore has to cope with a NULL reply.
			if ( ! m_si->m_showErrors ) {
				*level = CR_ERROR_SUMMARY;
				continue;
			}
		}

		// a special case
		if ( mr && ( mr->m_errno == CR_RULESET_FILTERED || mr->m_errno == EDOCFILTERED ) ) {
			*level = CR_RULESET_FILTERED;
			continue;
		}

		if ( ! m_si->m_showBanned && mr && mr->m_isBanned ) {
			if ( m_si->m_debug || g_conf.m_logDebugQuery )
				logf( LOG_DEBUG, "query: result %" PRId32 " (docid=%" PRId64 ") is banned and will not be shown.",
					  i, m_msg3a.m_docIds[i] );
			*level = CR_BANNED_URL;
			continue;
		}

		// corruption?
		if ( mr && !mr->ptr_ubuf ) {
			log( "msg40: got corrupt msg20 reply for docid %" PRId64, mr->m_docId );
			*level = CR_BAD_URL;
			continue;
		}

		// don't filter out disallowed root doc
		if (mr && mr->m_indexCode == EDOCDISALLOWEDROOT) {
			continue;
		}

//              temporarily disabled: if titledb has old records with content and redirect then this ends up filtering out most results and the whole query will be very slow
// 		// filter simplified redirection/non-canonical document
// 		if (mr && mr->size_rubuf > 1 && (mr->m_contentLen <= 0 || mr->m_httpStatus != 200 ||
// 		    mr->m_indexCode == EDOCNONCANONICAL || mr->m_indexCode == EDOCSIMPLIFIEDREDIR)) {
// 			if (!m_si->m_showErrors) {
// 				*level = CR_EMPTY_REDIRECTION_PAGE;
// 				continue;
// 			}
// 		}

		// filter empty title & summaries
		if ( mr && mr->size_tbuf <= 1 && mr->size_displaySum <= 1 ) {
			if ( ! m_si->m_showErrors ) {
				*level = CR_EMPTY_TITLE_SUMMARY;
				continue;
			}
		}
	}

	// what is the deduping threshold? 0 means do not do deduping
	int32_t dedupPercent = 0;
	if ( m_si->m_doDupContentRemoval && m_si->m_percentSimilarSummary )
		dedupPercent = m_si->m_percentSimilarSummary;
	// icc=1 turns this off too i think
	if ( m_si->m_includeCachedCopy ) dedupPercent = 0;
	// if the user only requested docids, we have no summaries
	if ( m_si->m_docIdsOnly ) dedupPercent = 0;

	// filter out duplicate/similar summaries
	for ( int32_t i = 0 ; dedupPercent && i < m_numReplies ; i++ ) {
		// skip if already invisible
		if ( m_msg3a.m_clusterLevels[i] != CR_OK ) continue;
		// Skip if invalid
		if ( m_msg20[i]->m_errno ) continue;

		// get it
		const Msg20Reply *mri = m_msg20[i]->m_r;
		// no reply means no vector to compare against
		if ( ! mri ) continue;

		// use gigabit vector to do topic clustering, etc.
		// (does not depend on "m" so fetch it once per "i")
		const int32_t *vi = (int32_t *)mri->ptr_vbuf;

		// see if any result lower-scoring than #i is a dup of #i
		for( int32_t m = i+1 ; m < m_numReplies ; m++ ) {
			// get current cluster level
			char *level = &m_msg3a.m_clusterLevels[m];
			// skip if already invisible
			if ( *level != CR_OK ) continue;
			// skip if invalid
			if ( m_msg20[m]->m_errno ) continue;

			const Msg20Reply *mrm = m_msg20[m]->m_r;
			// no reply means nothing to compare
			if ( ! mrm ) continue;

			const int32_t *vm = (int32_t *)mrm->ptr_vbuf;
			float s ;
			s = computeSimilarity(vi,vm,NULL,NULL,NULL);
			// skip if not similar
			if ( (int32_t)s < dedupPercent ) continue;
			// otherwise mark it as a summary dup
			if ( m_si->m_debug || g_conf.m_logDebugQuery )
				logf( LOG_DEBUG, "query: result #%" PRId32" (docid=%" PRId64") is %.02f%% similar-summary of #%" PRId32" (docid=%" PRId64")",
				      m, m_msg3a.m_docIds[m] , 
				      s, i, m_msg3a.m_docIds[i] );
			*level = CR_DUP_SUMMARY;
		}
	}



	//
	// BEGIN URL NORMALIZE AND COMPARE
	//
        
	// . ONLY DEDUP URL if it explicitly enabled AND we are not performing
	//   a site: or suburl: query.
	// . also needs the msg20 replies, which we do not have if the user
	//   only requested docids
	if(m_si->m_dedupURL &&
	   !m_si->m_docIdsOnly &&
	   !m_si->m_q.m_hasPositiveSiteField &&
	   !m_si->m_q.m_hasSubUrlField) {
		for(int32_t i = 0 ; i < m_msg3a.m_numDocIds ; i++) {
			// skip if already invisible
			if(m_msg3a.m_clusterLevels[i] != CR_OK) continue;

			// get it. a visible result can lack a reply (e.g. when
			// errors are shown, or its summary was never
			// requested); such a result has no URL to compare.
			Msg20 *m20 = m_msg20[i];
			const Msg20Reply *mr = m20 ? m20->m_r : NULL;
			if(!mr) continue;

			// hash the URL all in lower case to catch wiki dups
			const char *url  = mr-> ptr_ubuf;
			int32_t  ulen = mr->size_ubuf - 1;
			
			// since the redirect url is a more accurate 
			// representation of the content do that if it exists
			// (and is not just an empty string).
			if ( mr->ptr_rubuf && mr->size_rubuf > 1 ) {
				url  = mr-> ptr_rubuf;
				ulen = mr->size_rubuf - 1;
			}

			// nothing to normalize. also keeps url[ulen-1] below
			// inside the buffer.
			if(!url || ulen <= 0) continue;

			// fix for directories, sometimes they are indexed
			// without a trailing slash, so let's normalize to
			// this standard.
			if(url[ulen-1] == '/')
				ulen--;

			Url u;
			u.set( url, ulen );
			url   = u.getHost();

			if(u.getPathLen() > 1) {
				// . remove sub-domain to fix conflicts with
				//   sites having www,us,en,fr,de,uk,etc AND
				//   it redirects to the same page.
				const char *host = u.getHost();
				const char *mdom = u.getMidDomain();
				if(mdom && host) {
					// length of "sub." including the dot
					int32_t  hlen = mdom - host;
					// need at least one char plus the dot
					// for there to be a sub-domain label
					if (hlen > 1 && isVariantLikeSubDomain(host, hlen-1))
						url = mdom;
				}
			}

			// adjust url string length
			// NOTE(review): ulen is the length of the string that
			// was passed to u.set() while "url" now points into
			// u's own buffer. this assumes Url::set() keeps the
			// length unchanged -- confirm.
			ulen -= url - u.getUrl();

			uint64_t h = hash64Lower_a(url, ulen);
			int32_t slot = m_urlTable.getSlot(h);
			// if there is no slot,this url doesn't exist => add it
			if(slot == -1) {
				m_urlTable.addKey(h,mr->m_docId);
			} else {
				// If there was a slot, denote with the
				// cluster level URL already existed previously
				char *level = &m_msg3a.m_clusterLevels[i];
				if(m_si->m_debug || g_conf.m_logDebugQuery)
					logf(LOG_DEBUG, "query: result #%" PRId32" (docid=%" PRId64") is the same URL as (docid=%" PRId64")",
						 i,m_msg3a.m_docIds[i],
						 m_urlTable.getValueFromSlot(slot));
				*level = CR_DUP_URL;
			}
		}
	}

	//
	// END URL NORMALIZE AND COMPARE
	//

	// show time
	int64_t took = gettimeofdayInMilliseconds() - startTime;
	if ( took > 3 )
		log(LOG_INFO,"query: Took %" PRId64" ms to do clustering and dup removal.",took);

	// false means classifications are still outstanding; the
	// classification callback will call gotEnoughSummaries() later
	if(!submitUrlRealtimeClassification())
		return false;
	else
		return gotEnoughSummaries();
}


// Per-URL context passed through the realtime URL classifier and handed back
// to Msg40::urlClassificationCallback0(), which deletes it.
struct UrlClassificationContext {
	// the Msg40 that submitted the URL
	Msg40 *msg40;
	// index of the result the URL belongs to
	int i;

	UrlClassificationContext(Msg40 *owner, int resultIndex)
	  : msg40(owner),
	    i(resultIndex)
	{
	}
};

//start classifying the URLs of the results
bool Msg40::submitUrlRealtimeClassification() {
	if(!g_urlRealtimeClassification.realtimeUrlClassificationWorks()) {
		log(LOG_DEBUG,"Bypassing URL realtime classification because it is diabled or not working");
		return true; //done
	}
	
	{
		ScopedLock sl(m_mtxRealtimeClassificationsCounters);
		m_realtimeClassificationsSubmitted = true;
	}
	
	int num_started = 0;
	int num_wanted_to_start = 0;
	for(int i=0; i<m_numReplies; i++) {
		if(m_msg3a.m_clusterLevels[i]==CR_OK) {
			num_wanted_to_start++;
			Msg20 *m = m_msg20[i];
			std::string url(m->m_r->ptr_ubuf,m->m_r->size_ubuf);
			UrlClassificationContext *ucc = new UrlClassificationContext(this,i);
			incrementRealtimeClassificationsStarted();
			if(g_urlRealtimeClassification.classifyUrl(url.c_str(),&urlClassificationCallback0,ucc)) {
				logTrace(g_conf.m_logTraceUrlClassification, "URL classification of '%s' started", url.c_str());
				num_started++;
			} else {
				logTrace(g_conf.m_logTraceUrlClassification, "URL classification of '%s' NOT started", url.c_str());
				incrementRealtimeClassificationsCompleted();
				delete ucc;
			}
		}
	}
	
	log(LOG_DEBUG,"msg40: Started URL classification on %d out of %d URLs (wanted %d)", num_started, m_numReplies, num_wanted_to_start);
	
	bool done;
	{
		ScopedLock sl(m_mtxRealtimeClassificationsCounters);
		m_realtimeClassificationsSubmitted = false;
		done = (m_numRealtimeClassificationsCompleted == m_numRealtimeClassificationsStarted);
	}
	
	return done;
}


void Msg40::urlClassificationCallback0(void *context, uint32_t classification) {
	UrlClassificationContext *ucc = reinterpret_cast<UrlClassificationContext*>(context);
	ucc->msg40->urlClassificationCallback1(ucc->i,classification);
	delete ucc;
}

void Msg40::urlClassificationCallback1(int i, uint32_t classification) {
	if(classification&URL_CLASSIFICATION_MALICIOUS) {
		m_msg3a.m_clusterLevels[i] = CR_MALICIOUS;
		log(LOG_DEBUG,"URL '%*.*s' classified as malicous. Filtering it out",
		    (int)m_msg20[i]->m_r->size_ubuf, (int)m_msg20[i]->m_r->size_ubuf, m_msg20[i]->m_r->ptr_ubuf);
	}
	if(incrementRealtimeClassificationsCompleted()) {
		log(LOG_TRACE,"msg40: all URL classifications completed");
		if(gotEnoughSummaries()) {
			log(LOG_INFO, "query: Msg40 end: query_id='%s' query='%s', results=%d", m_si->m_queryId, m_si->m_query, getNumResults());
			m_callback(m_state);
		}
	}
}


// . last stage of result processing, called once filtering, deduping and
//   (if enabled) URL classification are done
// . counts the visible (CR_OK) results among the first m_numReplies
// . if too few are visible, more docids are available and the deadline has
//   not passed, it restarts the search via getDocIds(true) with a larger
//   m_docsToGet and returns whatever that returns
// . otherwise it collapses m_msg3a's per-docid arrays and m_msg20[] in place
//   so that only the visible results in the requested window
//   (m_si->m_firstResultNum, m_si->m_docsWanted) remain, and sets
//   m_msg3a.m_numDocIds to the number kept
// . returns false if m_tasksRemaining > 0
// . returns true when the final result list is ready (g_errno is cleared)
bool Msg40::gotEnoughSummaries() {
	m_omitCount = 0;

	// count how many are visible!
	int32_t visible = 0;
	// loop over each clusterLevel and count it
	for ( int32_t i = 0 ; i < m_numReplies ; i++ ) {
		// get current cluster level
		const char *level = &m_msg3a.m_clusterLevels[i];
		// on CR_OK
		if ( *level == CR_OK ) visible++;
		// otherwise count as omitted
		else m_omitCount++;
	}

	// . let's wait for the tasks to complete before even trying to launch
	//   more than the first MAX_OUTSTANDING msg20s
	// . the msg3a re-call will end up re-doing our tasks as well! so we
	//   have to make sure they complete at this point
	if ( m_tasksRemaining > 0 ) return false;

	// debug: dump every docid with its cluster level. aborts if a level is
	// outside [0,CR_END) or has no name in g_crStrings[].
	bool debug = (m_si->m_debug || g_conf.m_logDebugQuery);
	for ( int32_t i = 0 ; debug && i < m_msg3a.m_numDocIds ; i++ ) {
		int32_t cn = (int32_t)m_msg3a.m_clusterLevels[i];
		if ( cn < 0 || cn >= CR_END ) { g_process.shutdownAbort(true); }
		const char *s = g_crStrings[cn];
		if ( ! s ) { g_process.shutdownAbort(true); }
		logf(LOG_DEBUG, "query: msg40 final hit #%" PRId32") d=%" PRIu64" cl=%" PRId32" (%s)",
		     i,m_msg3a.m_docIds[i],(int32_t)m_msg3a.m_clusterLevels[i],s);
	}

	if ( debug )
		logf (LOG_DEBUG,"query: msg40: firstResult=%" PRId32", "
		      "totalDocIds=%" PRId32", resultsWanted=%" PRId32" "
		      "visible=%" PRId32" toGet=%" PRId32" recallCnt=%" PRId32,
		      m_si->m_firstResultNum, m_msg3a.m_numDocIds ,
		      m_docsToGetVisible, visible,
		      //m_numContiguous, 
		      m_docsToGet , m_msg3aRecallCnt);

	// if we do not have enough visible, try to get more
	if ( visible < m_docsToGetVisible && m_msg3a.m_moreDocIdsAvail &&
	     // do not spin too long in this!
	     // TODO: fix this better somehow later
	     m_docsToGet <= 1000 &&
	     // doesn't work on multi-coll just yet, it cores
	     m_numCollsToSearch == 1 ) {
		// NOTE(review): a zero m_deadline takes the else branch, so
		// no recall happens when no deadline is set -- confirm that
		// is intended.
		if(m_deadline>0 && m_deadline>gettimeofdayInMilliseconds()) {
			// can it cover us?
			int32_t need = m_docsToGet + 20;
			// increase by 25 percent as well
			need *= 1.25;
			// note it
			log("msg40: too many summaries invisible. getting more docids from msg3a merge and getting summaries. "
			    "%" PRId32" are visible, need %" PRId32". %" PRId32" to %" PRId32". numReplies=%" PRId32" numRequests=%" PRId32,
			    visible, m_docsToGetVisible,
			    m_msg3a.m_docsToGet, need,
			    m_numReplies, m_numRequests);

			// get more!
			m_docsToGet = need;
			// reset this before launch
			m_numReplies  = 0;
			m_numRequests = 0;
			// reprocess all!
			m_lastProcessedi = -1;
			// let's do it all from the top!
			log(LOG_INFO, "query: Msg40 redo: query_id='%s' query='%s', visible=%d", m_si->m_queryId, m_si->m_query, visible);
			return getDocIds ( true ) ;
		} else {
			log("msg40: many summaries invisible but deadline has been passed. %d are visible, wanted %d",
			    visible, m_docsToGetVisible);
		}
	}

	// get time now
	int64_t now = gettimeofdayInMilliseconds();
	// . add the stat for how long to get all the summaries
	// . use purple for time to get all summaries
	// . THIS INCLUDES Msg3a/Msg39 RECALLS!!!
	// . can we subtract that?
	//"get_all_summaries"
	g_stats.addStat_r ( 0, m_startTime, now, 0x008220ff );

	// timestamp log
	if ( g_conf.m_logTimingQuery || m_si->m_debug )
		logf(LOG_DEBUG,"query: msg40: [%p] Got %" PRId32" summaries in %" PRId64" ms",
		     this ,
		     visible, // m_visibleContiguous,
		     now - m_startTime );


	// set m_moreToCome, if true, we print a "Next 10" link
	m_moreToCome = (visible > m_si->m_docsWanted+m_si->m_firstResultNum);
	if ( m_si->m_debug || g_conf.m_logDebugQuery ) {
		logf( LOG_DEBUG, "query: msg40: more? %d", m_moreToCome );
	}

	// alloc m_buf, which should be NULL
	if ( m_buf ) { g_process.shutdownAbort(true); }

	// . we need to collapse m_msg3a.m_docIds[], etc. into m_docIds[] etc
	//   to be just the docids we wanted.
	// . at this point we should merge in all docids from all Msg40s from
	//   different clusters, etc.
	// . now alloc space for "docsWanted" m_docIds[], m_scores[], 
	//   m_bitScores[], m_clusterLevels[] and m_newMsg20[]

	//
	// HACK TIME
	//

	// . bury filtered/clustered docids from m_msg3a.m_docIds[]
	// . also remove results not in the request window specified by &s=X&n=Y
	//   where "s" is m_si->m_firstResultNum (which starts at 0) and "n" 
	//   is the number of results requested, m_si->m_docsWanted
	// . this is a bit of a hack (MDW)
	// c = next write position (number kept so far)
	int32_t c = 0;
	// v = number of visible results seen so far
	int32_t v = 0;
	for ( int32_t i = 0 ; i < m_msg3a.m_numDocIds ; i++ ) {
		// must have a cluster level of CR_OK (visible)
		// v is the visible count. it is only incremented for visible
		// results because of the short-circuiting ||, and the first
		// m_firstResultNum visible ones are skipped.
		if ( ( m_msg3a.m_clusterLevels[i] != CR_OK ) || ( v++ < m_si->m_firstResultNum ) ) {
			// skip
			continue;
		}

		// we got a winner, save it (c <= i so this never overwrites
		// an entry we have not looked at yet)
		m_msg3a.m_docIds        [c] = m_msg3a.m_docIds        [i];
		m_msg3a.m_scores        [c] = m_msg3a.m_scores        [i];
		m_msg3a.m_flags         [c] = m_msg3a.m_flags         [i];
		m_msg3a.m_clusterLevels [c] = m_msg3a.m_clusterLevels [i];
		m_msg20                 [c] = m_msg20                 [i];

		if ( m_msg3a.m_scoreInfos ) {
			m_msg3a.m_scoreInfos [c] = m_msg3a.m_scoreInfos [i];
		}

		int32_t need = m_si->m_docsWanted;

		// if done, bail
		// NOTE(review): the winner is stored before this test, so one
		// result is kept even when m_docsWanted is 0 -- confirm.
		if ( ++c >= need ) {
			break;
		}
	}
	// reset the # of docids we got to how many we kept!
	m_msg3a.m_numDocIds = c;

	// debug: dump what is left after clipping
	for ( int32_t i = 0 ; debug && i < m_msg3a.m_numDocIds ; i++ )
		logf(LOG_DEBUG, "query: msg40 clipped hit #%" PRId32") d=%" PRIu64" cl=%" PRId32" (%s)",
		     i,m_msg3a.m_docIds[i],(int32_t)m_msg3a.m_clusterLevels[i],
		     g_crStrings[(int32_t)m_msg3a.m_clusterLevels[i]]);

	//
	// END HACK
	// 

	//Old logic for whether to store the msg40+results in the cache or not. Logic looks fine
	//but the cache has been removed a long time ago.
	//
	// // . uc = use cache?
	// // . store in cache now if we need to
	// bool uc = false;
	// if ( m_si->m_useCache   ) uc = true;
	// if ( m_si->m_wcache     ) uc = true;
	// // . do not store if there was an error
	// // . no, allow errors in cache since we often have lots of
	// //   docid not founds and what not, due to index corruption and
	// //   being out of sync with titledb
	// if ( m_errno            &&
	//      // forgive "Record not found" errors, they are quite common
	//      m_errno != ENOTFOUND ) {
	// 	logf(LOG_DEBUG,"query: not storing in cache: %s",
	// 	     mstrerror(m_errno));
	// 	uc = false;
	// }
	// if ( m_si->m_docIdsOnly ) uc = false;
	//
	// // all done if not storing in cache
	// if ( ! uc ) return true;


	// ignore errors
	g_errno = 0;
 	return true;
}

//For the purpose of clustering and result suppression these hosts are considered the same:
//  example.com
//  www.example.com
//  fr.example.com
//  pl.example.com
//etc.
//
//Every entry must be unique: the entries are loaded into a hash table at first use and a
//repeated entry is reported as a duplicate.
static const char * const s_variantLikeSubDomains[] = {
	//Language sub-domains. Basically ISO 639-1 language codes, unknown revision (two-letter codes)
	//Not perfect, eg. we don't detect nynorsk vs. bokmål because those have iso 639-2 three-letter codes
	"aa", "ab", "af", "am", "ar", "as", "ay", "az",
	"ba", "be", "bg", "bh", "bi", "bn", "bo", "br",
	"ca", "co", "cs", "cy",
	"da", "de", "dz",
	"el", "en", "eo", "es", "et", "eu",
	"fa", "fi", "fj", "fo", "fr", "fy",
	"ga", "gd", "gl", "gn", "gu",
	"ha", "he", "hi", "hr", "hu", "hy",
	"ia", "id", "ie", "ik", "is", "it", "iu",
	"ja", "jw",
	"ka", "kk", "kl", "km", "kn", "ko", "ks", "ku", "ky",
	"la", "ln", "lo", "lt", "lv",
	"mg", "mi", "mk", "ml", "mn", "mo", "mr", "ms", "mt", "my",
	"na", "ne", "nl", "no",
	"oc", "om", "or",
	"pa", "pl", "ps", "pt",
	"qu",
	"rm", "rn", "ro", "ru", "rw",
	"sa", "sd", "sg", "sh", "si", "sk", "sl", "sm", "sn", "so", "sq", "sr", "ss", "st", "su", "sv", "sw",
	"ta", "te", "tg", "th", "ti", "tk", "tl", "tn", "to", "tr", "ts", "tt", "tw",
	//"uk" doubles as the common country sub-domain, so it is not repeated below
	"ug", "uk", "ur", "uz",
	"vi", "vo",
	"wo",
	"xh",
	"yi", "yo",
	"za", "zh", "zu",
	// Common Country sub-domains
	"us",
	// Common web sub-domains
	"www"
};
// lookup table holding the hashes of the s_variantLikeSubDomains entries;
// filled on the first call to isVariantLikeSubDomain()
static HashTable  s_variantLikeSubDomainTable;
// true once the table above has been set up successfully
static bool       s_variantLikeSubDomainInitialized = false;
// held while checking/performing the one-time set up of the table
static GbMutex    s_variantLikeSubDomainMutex;

// . loads the lower-cased 32-bit hash of every string in words[] into "table"
// . "size" is the size of words[] in BYTES, so the number of entries is
//   size / sizeof(char *)
// . an entry whose hash is already in the table is logged and not added again
// . returns false if the table could not be set up, true otherwise
static bool initVariantLikeSubDomainTable(HashTable *table, const char * const words[], int32_t size ){
	// set up the hash table
	const bool tableReady = table->set ( size * 2 );
	if ( ! tableReady ) {
		log(LOG_INIT, "build: Could not init sub-domain table.");
		return false;
	}

	// number of strings in words[]
	const int32_t numWords = (int32_t)size/ sizeof(char *);
	const char * const *end = words + numWords;

	// now add in all the sub-domain labels
	for ( const char * const *cursor = words ; cursor < end ; cursor++ ) {
		const char *label = *cursor;
		const int32_t labelLen = strlen ( label );
		const int32_t key = hash32Lower_a(label, labelLen);
		// already there? then the list has a repeated entry
		if ( table->getSlot(key) != -1 ) {
			log(LOG_INIT,"build: Sub-domain table has duplicates");
			continue;
		}
		table->addKey(key,0);
	}
	return true;
}

// . returns true if the "len" bytes at "s" are one of the sub-domain labels in
//   s_variantLikeSubDomains (compared via lower-cased hash)
// . returns false for a NULL or empty label; callers compute "len" from
//   pointer differences and can end up passing 0 or a negative value when a
//   host has no sub-domain
// . sets up the lookup table on first use and returns false if that fails
//   (set up is tried again on the next call)
static bool isVariantLikeSubDomain(const char *s , int32_t len) {
	// nothing to look up, and never hand a non-positive length to the
	// hash function
	if ( ! s || len <= 0 )
		return false;

	{
		// only the one-time set up needs the lock; after that the
		// table is only read
		ScopedLock sl(s_variantLikeSubDomainMutex);
		if ( ! s_variantLikeSubDomainInitialized ) {
			s_variantLikeSubDomainInitialized = initVariantLikeSubDomainTable(&s_variantLikeSubDomainTable, s_variantLikeSubDomains, sizeof(s_variantLikeSubDomains));
			if (!s_variantLikeSubDomainInitialized)
				return false;
		}
	}

	// get from table
	int32_t h = hash32Lower_a(s, len);
	return s_variantLikeSubDomainTable.getSlot(h) != -1;
}		


//Counts one more submitted URL classification (under the counters mutex).
//It is a logic error if, after counting, the completed count is not strictly below the started count.
void Msg40::incrementRealtimeClassificationsStarted() {
	ScopedLock sl(m_mtxRealtimeClassificationsCounters);
	++m_numRealtimeClassificationsStarted;
	const bool consistent = m_numRealtimeClassificationsCompleted < m_numRealtimeClassificationsStarted;
	if(!consistent)
		gbshutdownLogicError();
}

//Counts one more completed URL classification (under the counters mutex).
//Returns true only if this completion made completed==started while no submission pass
//is in progress, i.e. the caller is the one who has to carry on with the results.
//It is a logic error for the completed count to exceed the started count.
bool Msg40::incrementRealtimeClassificationsCompleted() {
	ScopedLock sl(m_mtxRealtimeClassificationsCounters);
	++m_numRealtimeClassificationsCompleted;
	if(m_numRealtimeClassificationsCompleted>m_numRealtimeClassificationsStarted)
		gbshutdownLogicError();
	//while URLs are still being submitted nobody may report "all done"
	if(m_realtimeClassificationsSubmitted)
		return false;
	return m_numRealtimeClassificationsCompleted==m_numRealtimeClassificationsStarted;
}

bool Msg40::areAllRealtimeClassificationsCompleted() const {
	ScopedLock sl(const_cast<GbMutex&>(m_mtxRealtimeClassificationsCounters));
	return (!m_realtimeClassificationsSubmitted) && (m_numRealtimeClassificationsCompleted==m_numRealtimeClassificationsStarted);
}