#include "gb-include.h"
#include "XmlDoc.h"
#include "Hostdb.h"
#include "UdpSlot.h"
#include "UdpServer.h"
#include "ip.h"
#include "Process.h"
#include "Mem.h"
#ifdef _VALGRIND_
#include <valgrind/memcheck.h>
#endif
#include "SummaryCache.h"
#include "Conf.h"
#include "Stats.h"


struct Msg20State {
	UdpSlot *m_slot;
	Msg20Request *m_req;
	XmlDoc m_xmldoc;
	Msg20State(UdpSlot *slot, Msg20Request *req) : m_slot(slot), m_req(req), m_xmldoc() {}
};


static void handleRequest20(UdpSlot *slot, int32_t netnice);
static bool gotReplyWrapperxd(void *state);


static bool sendCachedReply ( Msg20Request *req, const void *cached_summary, size_t cached_summary_len, UdpSlot *slot );


Msg20::Msg20 () { 
	constructor(); 
}

Msg20::~Msg20() { 
	reset(); 
}

void Msg20::constructor () {
	m_request = NULL;
	m_r       = NULL;
	m_inProgress = false;
	m_launched = false;
	m_ii = -1;
	reset();
	m_mcast.constructor();
}

void Msg20::destructor() { 
	reset(); 
	m_mcast.destructor(); 
}


void Msg20::freeReply() {
	if (!m_r) {
		return;
	}

	// sometimes the msg20 reply carries an merged bffer from
	// msg40 that is a constructed ptr_eventSummaryLines from a
	// merge operation in msg40. this fixes the "merge20buf1" memory
	// leak from Msg40.cpp
	m_r->destructor();

	if ( m_ownReply ) {
		mfree(m_r, m_replyMaxSize, "Msg20b");
	}

	m_r = NULL;
}

void Msg20::reset() {
	// not allowed to reset one in progress
	if ( m_inProgress ) { 
		// do not core on abrupt exits!
		if (g_process.isShuttingDown()) {
			log("msg20: msg20 not being freed because exiting.");
			return;
		}
		// otherwise core
		g_process.shutdownAbort(true); 
	}

	m_launched = false;
	if ( m_request ) {
		mfree( m_request, m_requestSize, "Msg20rb1" );
	}

	freeReply();

	m_request      = NULL; // the request buf ptr
	m_gotReply     = false;
	m_errno        = 0;
	m_requestDocId = -1LL;
	m_callback     = NULL;
	m_state        = NULL;
	m_ownReply     = true;
	m_requestSize = 0;
	m_replySize = 0;
	m_replyMaxSize = 0;
	m_callback2 = NULL;
}

bool Msg20::registerHandler ( ) {
	// . register ourselves with the udp server
    // . it calls our callback when it receives a msg of type 0x20
    if ( ! g_udpServer.registerHandler ( msg_type_20, handleRequest20 ))
		return false;

	return true;
}

// copy "src" to ourselves
void Msg20::moveFrom(Msg20 *src) {
	memcpy(this, src, sizeof(Msg20));

	// make sure it does not free it!
	src->m_r = NULL;
	m_request = NULL;

	// make sure destructor does not free this
	src->m_request = NULL;
	src->destructor();
}

// returns true and sets g_errno on error, otherwise, blocks and returns false
bool Msg20::getSummary ( Msg20Request *req ) {
	// reset ourselves in case recycled
	reset();

	// consider it "launched"
	m_launched = true;

	// save it
	m_requestDocId = req->m_docId;
	m_state        = req->m_state;
	m_callback     = req->m_callback;
	m_callback2    = NULL;

	// does this ever happen?
	if ( g_hostdb.getNumHosts() <= 0 ) {
		log("build: hosts2.conf is not in working directory, or "
		    "contains no valid hosts.");
		g_errno = EBADENGINEER;
		return true;
	}

	if ( req->m_docId < 0 && ! req->ptr_ubuf ) {
		log("msg20: docid<0 and no url for msg20::getsummary");
		g_errno = EBADREQUEST;
		return true;
	}

	// get groupId from docId, if positive
	uint32_t shardNum;
	if ( req->m_docId >= 0 ) 
		shardNum = g_hostdb.getShardNumFromDocId(req->m_docId);
	else {
		int64_t pdocId = Titledb::getProbableDocId(req->ptr_ubuf);
		shardNum = getShardNumFromDocId(pdocId);
	}

	// we might be getting inlinks for a spider request
	// so make sure timeout is inifinite for that...
	const int32_t timeout = (req->m_niceness==0)
	                      ? multicast_msg20_summary_timeout
	                      : multicast_infinite_send_timeout;

	// get our group
	int32_t  allNumHosts = g_hostdb.getNumHostsPerShard();
	Host *allHosts    = g_hostdb.getShard ( shardNum );

	// put all alive hosts in this array
	Host *cand[32];
	int64_t  nc = 0;
	for ( int32_t i = 0 ; i < allNumHosts ; i++ ) {
		// get that host
		Host *hh = &allHosts[i];
		// skip if dead
		if ( g_hostdb.isDead(hh) ) continue;

		// Respect no-spider, no-query directives from hosts.conf 
		if ( !req->m_getLinkInfo && ! hh->m_queryEnabled ) continue;
		if ( req->m_getLinkInfo && ! hh->m_spiderEnabled ) continue;
		// add it if alive
		cand[nc++] = hh;
	}
	if(nc==0) {
		log(LOG_DEBUG, "msg20: no live candidate hosts for shard %d", shardNum);
		if(g_conf.m_msg20FallbackToAllHosts) {
			log(LOG_DEBUG,"msg20: No alive desired hosts in shard %d - falling back to all hosts in the shard", shardNum);
			for(int32_t i = 0; i < allNumHosts; i++) {
				cand[nc++] = &allHosts[i];
			}
		}
	}
	if ( nc == 0 ) {
		log(LOG_ERROR, "msg20: error sending mcast: no queryable hosts available to handle summary/linkinfo generation in shard %d", shardNum);
		g_errno = EBADENGINEER;
		m_gotReply = true;
		return true;
	}

	// route based on docid region, not parity, because we want to hit
	// the urldb page cache as much as possible
	int64_t sectionWidth =((128LL*1024*1024)/nc)+1;
	int64_t probDocId    = req->m_docId;
	// i think reference pages just pass in a url to get the summary
	if ( probDocId < 0 && req->size_ubuf ) 
		probDocId = Titledb::getProbableDocId ( req->ptr_ubuf );
	if ( probDocId < 0        ) {
		log("query: Got bad docid/url combo.");
		probDocId = 0;
	}
	// we mod by 1MB since tied scores resort to sorting by docid
	// so we don't want to overload the host responsible for the lowest
	// range of docids. CAUTION: do this for msg22 too!
	// in this way we should still ensure a pretty good biased urldb
	// cache... 
	// . TODO: fix the urldb cache preload logic
	int32_t hostNum = (probDocId % (128LL*1024*1024)) / sectionWidth;
	if ( hostNum < 0 ) hostNum = 0; // watch out for negative docids
	if ( hostNum >= nc ) { g_process.shutdownAbort(true); }
	int32_t firstHostId = cand [ hostNum ]->m_hostId ;

	m_requestSize = 0;
	m_request = req->serialize ( &m_requestSize );
	// . it sets g_errno on error and returns NULL
	// . we MUST call gotReply() here to set m_gotReply
	//   otherwise Msg40.cpp can end up looping forever
	//   calling Msg40::launchMsg20s()
	if ( ! m_request ) { gotReply(NULL); return true; }

	// . otherwise, multicast to a host in group "groupId"
	// . returns false and sets g_errno on error
	// . use a pre-allocated buffer to hold the reply
	// . TMPBUFSIZE is how much a UdpSlot can hold w/o allocating
	if (!m_mcast.send(m_request, m_requestSize, msg_type_20, false, shardNum, false, probDocId, this, NULL, gotReplyWrapper20, timeout, req->m_niceness, firstHostId, false)) {
		// sendto() sometimes returns "Network is down" so i guess
		// we just had an "error reply".
		log("msg20: error sending mcast %s",mstrerror(g_errno));
		m_gotReply = true;
		return true;
	}

	// we are officially "in progress"
	m_inProgress = true;

	// we blocked
	return false;
}

void Msg20::gotReplyWrapper20 ( void *state , void */*state2*/ ) {
	Msg20 *THIS = (Msg20 *)state;
	// gotReply() does not block, and does NOT call our callback
	THIS->gotReply ( NULL ) ;

	if ( THIS->m_callback ) {
		THIS->m_callback ( THIS->m_state );
	}
	else 
	if( THIS->m_callback2 ) {
		THIS->m_callback2 ( THIS->m_state );
	}
	else {
		log(LOG_LOGIC,"%s:%s: No callback!", __FILE__, __func__);
		g_process.shutdownAbort(true);
	}
}

// . set m_reply/m_replySize to the reply
void Msg20::gotReply ( UdpSlot *slot ) {
	// we got the reply
	m_gotReply = true;
	// no longer in progress, we got a reply
	m_inProgress = false;
	// sanity check
	if ( m_r ) { g_process.shutdownAbort(true); }

	// free our serialized request buffer to save mem
	if ( m_request ) {
		mfree ( m_request , m_requestSize  , "Msg20rb2" );
		m_request = NULL;
	}

	// save error so Msg40 can look at it
	if ( g_errno ) { 
		m_errno = g_errno;
		log( LOG_WARN, "query: msg20: got reply for docid %" PRId64" : %s", m_requestDocId,mstrerror(g_errno));
		return; 
	}
	// . get the best reply we got
	// . we are responsible for freeing this reply
	bool freeit;
	// . freeit is true if mcast will free it
	// . we should always own it since we call deserialize and has ptrs
	//   into it
	char *rp = NULL;
	if ( slot ) {
		rp             = slot->m_readBuf;
		m_replySize    = slot->m_readBufSize;
		m_replyMaxSize = slot->m_readBufMaxSize;
		freeit = false;
	}
	else {
		rp =m_mcast.getBestReply(&m_replySize,&m_replyMaxSize,&freeit);
	}

	relabel( rp , m_replyMaxSize, "Msg20-mcastGBR" );

	// sanity check. make sure multicast is not going to free the
	// slot's m_readBuf... we need to own it.
	if ( freeit ) {
		log(LOG_LOGIC,"query: msg20: gotReply: Bad engineer.");
		g_process.shutdownAbort(true);
	}

	// see if too small for a getSummary request
	if ( m_replySize < (int32_t)sizeof(Msg20Reply) ) { 
		log("query: Summary reply is too small.");
		//g_process.shutdownAbort(true);
		m_errno = g_errno = EREPLYTOOSMALL; return; }

	// cast it
	m_r = (Msg20Reply *)rp;

	// we own it now
	m_ownReply = true;

	// deserialize it, sets g_errno on error??? not yet TODO!
	m_r->deserialize();
}


// . this is called
// . destroys the UdpSlot if false is returned
static void handleRequest20(UdpSlot *slot, int32_t netnice) {
	// . check g_errno
	// . before, we were not sending a reply back here and we continued
	//   to process the request, even though it was empty. the slot
	//   had a NULL m_readBuf because it could not alloc mem for the read
	//   buf i'm assuming. and the slot was saved in a line below here...
	//   state20->m_msg22.m_parent = slot;
	if ( g_errno ) {
		log(LOG_WARN, "net: Msg20 handler got error: %s.",mstrerror(g_errno));
		log(LOG_ERROR,"%s:%s:%d: call sendErrorReply.", __FILE__, __func__, __LINE__);
		g_udpServer.sendErrorReply ( slot , g_errno );
		return;
	}

	// ensure request is big enough
	if ( slot->m_readBufSize < (int32_t)sizeof(Msg20Request) ) {
		log(LOG_ERROR,"%s:%s:%d: call sendErrorReply. Bad request size", __FILE__, __func__, __LINE__);
		g_udpServer.sendErrorReply ( slot , EBADREQUESTSIZE );
		return;
	}

	// parse the request
	Msg20Request *req = (Msg20Request *)slot->m_readBuf;

	// . turn the string offsets into ptrs in the request
	// . this is "destructive" on "request"
	int32_t nb = req->deserialize();
	// sanity check
	if ( nb != slot->m_readBufSize ) { g_process.shutdownAbort(true); }

	// sanity check, the size include the \0
	if ( req->m_collnum < 0 ) {
		char ipbuf[16];
		log(LOG_WARN, "query: Got empty collection in msg20 handler. FIX! "
		    "from ip=%s port=%i",iptoa(slot->getIp(),ipbuf),(int)slot->getPort());
		    
		log(LOG_ERROR,"%s:%s:%d: call sendErrorReply.", __FILE__, __func__, __LINE__);
		g_udpServer.sendErrorReply ( slot , ENOTFOUND );
		return; 
	}

	int64_t cache_key = req->makeCacheKey();
	const void *cached_summary;
	size_t cached_summary_len;
	if(g_stable_summary_cache.lookup(cache_key, &cached_summary, &cached_summary_len) ||
	   g_unstable_summary_cache.lookup(cache_key, &cached_summary, &cached_summary_len))
	{
		log(LOG_DEBUG, "query: Summary cache hit");
		sendCachedReply(req,cached_summary,cached_summary_len,slot);
		return;
	} else
		log(LOG_DEBUG, "query: Summary cache miss");

	// if it's not stored locally that's an error
	if ( req->m_docId >= 0 && ! Titledb::isLocal ( req->m_docId ) ) {
		log(LOG_WARN, "query: Got msg20 request for non-local docId %" PRId64, req->m_docId);
		log(LOG_ERROR,"%s:%s:%d: call sendErrorReply.", __FILE__, __func__, __LINE__);
		g_udpServer.sendErrorReply ( slot , ENOTLOCAL ); 
		return; 
	}

	// sanity
	if ( req->m_docId == 0 && ! req->ptr_ubuf ) { //g_process.shutdownAbort(true); }
		log( LOG_WARN, "query: Got msg20 request for docid of 0 and no url for "
		    "collnum=%" PRId32" query %s",(int32_t)req->m_collnum,req->ptr_qbuf);

		log(LOG_ERROR,"%s:%s:%d: call sendErrorReply.", __FILE__, __func__, __LINE__);
		g_udpServer.sendErrorReply ( slot , ENOTFOUND );
		return; 
	}

	int64_t startTime = gettimeofdayInMilliseconds();

	// alloc a new state to get the titlerec
	Msg20State *state;
	try {
		state = new Msg20State(slot,req);
	} catch(std::bad_alloc&) {
		g_errno = ENOMEM;
		log("query: msg20 new(%" PRId32"): %s", (int32_t)sizeof(XmlDoc),
		    mstrerror(g_errno));
		log(LOG_ERROR,"%s:%s:%d: call sendErrorReply. error=%s", __FILE__, __func__, __LINE__, mstrerror( g_errno ));
		g_udpServer.sendErrorReply ( slot, g_errno ); 
		return; 
	}
	mnew(state, sizeof(*state), "xd20");

	// ok, let's use the new XmlDoc.cpp class now!
	state->m_xmldoc.setMsg20Request(req);

	// set the callback
	state->m_xmldoc.setCallback(state, gotReplyWrapperxd);

	// set set time
	state->m_xmldoc.m_setTime = startTime;
	state->m_xmldoc.m_cpuSummaryStartTime = 0;

	// . now as for the msg20 reply!
	// . TODO: move the parse state cache into just a cache of the
	//   XmlDoc itself, and put that cache logic into XmlDoc.cpp so
	//   it can be used more generally.
	Msg20Reply *reply = state->m_xmldoc.getMsg20Reply ( );

	// this is just blocked
	if ( reply == (void *)-1 ) return;

	// got it?
	gotReplyWrapperxd (state);
}

bool gotReplyWrapperxd(void *state_) {
	Msg20State *state = static_cast<Msg20State*>(state_);
	// print time
	int64_t now = gettimeofdayInMilliseconds();
	int64_t took = now - state->m_xmldoc.m_setTime;
	int64_t took2 = 0;
	if ( state->m_xmldoc.m_cpuSummaryStartTime) {
		took2 = now - state->m_xmldoc.m_cpuSummaryStartTime;
	}

	// if there is a baclkog of msg20 summary generation requests this
	// is really not the cpu it took to make the smmary, but how long it
	// took to get the reply. this request might have had to wait for the
	// other summaries to finish computing before it got its turn, 
	// meanwhile its clock was ticking. TODO: make this better?
	// only do for niceness 0 otherwise it gets interrupted by quickpoll
	// and can take a int32_t time.
	if ( state->m_req->m_niceness == 0 && (state->m_req->m_isDebug || took > 100 || took2 > 100 ) ) {
		log(LOG_TIMING, "query: Took %" PRId64" ms (total=%" PRId64" ms) to compute summary for d=%" PRId64" "
		    "u=%s status=%s q=%s",
		    took2,
			took,
		    state->m_xmldoc.m_docId, state->m_xmldoc.m_firstUrl.getUrl(),
		    mstrerror(g_errno),
		    state->m_req->ptr_qbuf);
	}

	// error?
	if ( g_errno ) {
		state->m_xmldoc.m_reply.sendReply(state);
		return true;
	}
	// this should not block now
	Msg20Reply *reply = state->m_xmldoc.getMsg20Reply();
	// sanity check, should not block here now
	if ( reply == (void *)-1 ) { g_process.shutdownAbort(true); }
	// NULL means error, -1 means blocked. on error g_errno should be set
	if ( ! reply && ! g_errno ) { g_process.shutdownAbort(true);}
	// send it off. will send an error reply if g_errno is set
	return reply->sendReply(state);
}

Msg20Reply::Msg20Reply ( ) {
	m_ip = 0;
	m_firstIp = 0;
	m_wordPosStart = 0;
	m_docId = 0;
	m_firstSpidered = 0;
	m_lastSpidered = 0;
	m_lastModified = 0;
	m_datedbDate = 0;
	m_firstIndexedDate = 0;
	m_discoveryDate = 0;
	m_errno = 0;
	m_collnum = 0;
	m_noArchive = 0;
	m_contentType = 0;
	m_siteRank = 0;
	m_isBanned = false;
	m_hopcount = 0;
	m_recycled = 0;
	m_language = langUnknown;
	m_country = 0;
	m_isAdult = false;
	m_httpStatus = 0;
	m_contentLen = 0;
	m_contentHash32 = 0;
	m_pageNumInlinks = 0;
	m_pageNumGoodInlinks = 0;
	m_pageNumUniqueIps = 0;
	m_pageNumUniqueCBlocks = 0;
	m_pageInlinksLastUpdated = 0;
	m_siteNumInlinks = 0;
	m_numOutlinks = 0;
	m_linkTextNumWords = 0;
	m_midDomHash = 0;
	m_isLinkSpam = 0;
	m_outlinkInContent = 0;
	m_outlinkInComment = 0;
	m_isPermalink = 0;
	m_isDisplaySumSetFromTags = 0;

	ptr_tbuf = NULL;
	ptr_htag = NULL;
	ptr_ubuf = NULL;
	ptr_rubuf = NULL;
	ptr_displaySum = NULL;
	ptr_dbuf = NULL;
	ptr_vbuf = NULL;
	ptr_imgData = NULL;
	ptr_site = NULL;
	ptr_linkInfo = NULL;
	ptr_outlinks = NULL;
	ptr_vector1 = NULL;
	ptr_vector2 = NULL;
	ptr_vector3 = NULL;
	ptr_linkText = NULL;
	ptr_surroundingText = NULL;
	ptr_linkUrl = NULL;
	ptr_rssItem = NULL;
	ptr_categories = NULL;
	ptr_content = NULL;
	ptr_templateVector = NULL;
	ptr_metadataBuf = NULL;
	ptr_note = NULL;
		
	size_tbuf = 0;
	size_htag = 0;
	size_ubuf = 0;
	size_rubuf = 0;
	size_displaySum = 0;
	size_dbuf = 0;
	size_vbuf = 0;
	size_imgData = 0;
	size_site = 0;
	size_linkInfo = 0;
	size_outlinks = 0;
	size_vector1 = 0;
	size_vector2 = 0;
	size_vector3 = 0;
	size_linkText = 0;
	size_surroundingText = 0;
	size_linkUrl = 0;
	size_rssItem = 0;
	size_categories = 0;
	size_content = 0; // page content in utf8
	size_templateVector = 0;
	size_metadataBuf = 0;
	size_note = 0;
}


// we need to free the ptr_summaryLines if it is pointing into a new buffer
// which is what Msg40 sometimes does to it when it merges Msg20Reply's 
// summaries for events together.
Msg20Reply::~Msg20Reply ( ) {
	destructor();
}

void Msg20Reply::destructor ( ) {
}


// . return ptr to the buffer we serialize into
// . return NULL and set g_errno on error
bool Msg20Reply::sendReply(Msg20State *state) {
	if ( g_errno ) {
		// extract titleRec ptr
		log(LOG_ERROR, "query: Had error generating msg20 reply for d=%" PRId64": %s",state->m_xmldoc.m_docId, mstrerror(g_errno));
		// don't forget to delete this list
	haderror:
		UdpSlot *slot = state->m_slot;
		mdelete(state, sizeof(*state), "Msg20");
		delete state;
		log(LOG_ERROR,"%s:%s:%d: call sendErrorReply. error=%s", __FILE__, __func__, __LINE__, mstrerror( g_errno ));
		g_udpServer.sendErrorReply(slot, g_errno);
		return true;
	}

	// now create a buffer to store title/summary/url/docLen and send back
	int32_t  need = getStoredSize();
	char *buf  = (char *)mmalloc ( need , "Msg20Reply" );
	if ( ! buf ) goto haderror;

	// should never have an error!
	int32_t used = serialize ( buf , need );

	// sanity
	if (ptr_linkInfo && ((LinkInfo *)ptr_linkInfo)->m_lisize != size_linkInfo) {
		log(LOG_ERROR,"!!! CORRUPTED LINKINFO detected for docId %" PRId64 " - resetting linkinfo", state->m_xmldoc.m_docId);
		size_linkInfo = 0;
		ptr_linkInfo = NULL;
		// gbshutdownAbort(true);
	}

	// sanity
	if ( used != need ) { g_process.shutdownAbort(true); }

	// use blue for our color
	int32_t color = 0x0000ff;

	// but use dark blue for niceness > 0
	if ( state->m_xmldoc.m_niceness > 0 ) color = 0x0000b0;

	// sanity check
	if ( ! state->m_xmldoc.m_utf8ContentValid ) { g_process.shutdownAbort(true); }

	// for records
	int32_t clen = 0;

	if ( state->m_xmldoc.m_utf8ContentValid ) clen = state->m_xmldoc.size_utf8Content - 1;

	// show it in performance graph
	if ( state->m_xmldoc.m_startTimeValid ) {
		g_stats.addStat_r( clen, state->m_xmldoc.m_startTime, gettimeofdayInMilliseconds(), color );
	}
	
	
	//put the reply into the summary cache
	if(m_isDisplaySumSetFromTags && !state->m_req->m_highlightQueryTerms)
		g_stable_summary_cache.insert(state->m_req->makeCacheKey(), buf, need);
	else
		g_unstable_summary_cache.insert(state->m_req->makeCacheKey(), buf, need);

	UdpSlot *slot = state->m_slot;
	// . del the list at this point, we've copied all the data into reply
	// . this will free a non-null State20::m_ps (ParseState) for us
	mdelete(state, sizeof(*state), "Msg20");
	delete state;
	
	g_udpServer.sendReply(buf, need, buf, need, slot);

	return true;
}


static bool sendCachedReply ( Msg20Request *req, const void *cached_summary, size_t cached_summary_len, UdpSlot *slot )
{
	//copy the cached summary to a new temporary buffer, so that UDPSlot/Server can free it when possible
	char *buf  = (char *)mmalloc ( cached_summary_len , "Msg20Reply" );
	if(!buf) {
		log(LOG_ERROR,"%s:%s:%d: call sendErrorReply. error=%s", __FILE__, __func__, __LINE__, mstrerror( g_errno ));
		g_udpServer.sendErrorReply ( slot , g_errno ) ;
		return true;
	}
	memcpy(buf,cached_summary,cached_summary_len);
	
	g_udpServer.sendReply(buf, cached_summary_len, buf, cached_summary_len, slot);
	
	return true;
}


// . this is destructive on the "buf". it converts offs to ptrs
// . sets m_r to the modified "buf" when done
// . sets g_errno and returns -1 on error, otherwise # of bytes deseril
int32_t Msg20::deserialize ( char *buf , int32_t bufSize ) { 
	if ( bufSize < (int32_t)sizeof(Msg20Reply) ) {
		g_errno = ECORRUPTDATA; return -1; }
	m_r = (Msg20Reply *)buf;
	// do not free "buf"/"m_r"
	m_ownReply = false;
	return m_r->deserialize ( );
}

int32_t Msg20Request::getStoredSize() const {
	return getMsgStoredSize(sizeof(*this), &size_qbuf, &size_displayMetas);
}

// . return ptr to the buffer we serialize into
// . return NULL and set g_errno on error
char *Msg20Request::serialize(int32_t *retSize) const {
	// make a buffer to serialize into
	int32_t  need = getStoredSize();
	// alloc if we should
	char *buf = (char *)mmalloc ( need , "Msg20Ra" );
	// bail on error, g_errno should be set
	if ( ! buf ) return NULL;

	return serializeMsg(sizeof(*this),
			    &size_qbuf, &size_displayMetas,
			    &ptr_qbuf,
			    this,
			    retSize,
			    buf, need);
}

// convert offsets back into ptrs
int32_t Msg20Request::deserialize ( ) {
	return deserializeMsg(sizeof(*this),
			      &size_qbuf, &size_displayMetas,
		              &ptr_qbuf,
		              ((char*)this) + sizeof(*this));
}


//make a cache key for a request
int64_t Msg20Request::makeCacheKey() const
{
	SafeBuf hash_buffer;
	hash_buffer.pushLong(m_numSummaryLines);
	hash_buffer.pushLong(m_getHeaderTag);
	hash_buffer.pushLongLong(m_docId);
	hash_buffer.pushLong(m_titleMaxLen);
	hash_buffer.pushLong(m_summaryMaxLen);
	hash_buffer.pushLong(m_summaryMaxNumCharsPerLine);
	hash_buffer.pushLong(m_collnum);
	hash_buffer.pushLong(m_highlightQueryTerms);
	hash_buffer.pushLong(m_getSummaryVector);
	hash_buffer.pushLong(m_showBanned);
	hash_buffer.pushLong(m_includeCachedCopy);
	hash_buffer.pushLong(m_doLinkSpamCheck);
	hash_buffer.pushLong(m_isLinkSpam);
	hash_buffer.pushLong(m_isSiteLinkInfo);
	hash_buffer.pushLong(m_getLinkInfo);
	hash_buffer.pushLong(m_onlyNeedGoodInlinks);
	hash_buffer.pushLong(m_getLinkText);
	hash_buffer.safeMemcpy(ptr_qbuf,size_qbuf);
	hash_buffer.safeMemcpy(ptr_ubuf,size_ubuf);
	hash_buffer.safeMemcpy(ptr_linkee,size_linkee);
	hash_buffer.safeMemcpy(ptr_displayMetas,size_displayMetas);
	int64_t h = hash64(hash_buffer.getBufStart(), hash_buffer.length());
	return h;
}


int32_t Msg20Reply::getStoredSize() const {
	return getMsgStoredSize(sizeof(*this), &size_tbuf, &size_note);
}


// returns NULL and set g_errno on error
int32_t Msg20Reply::serialize(char *buf, int32_t bufSize) const {
#ifdef _VALGRIND_
	VALGRIND_CHECK_MEM_IS_DEFINED(this,sizeof(*this));
	if(ptr_htag)
		VALGRIND_CHECK_MEM_IS_DEFINED(ptr_htag,size_htag);
	if(ptr_ubuf)
		VALGRIND_CHECK_MEM_IS_DEFINED(ptr_ubuf,size_ubuf);
	if(ptr_rubuf)
		VALGRIND_CHECK_MEM_IS_DEFINED(ptr_rubuf,size_rubuf);
	if(ptr_displaySum)
		VALGRIND_CHECK_MEM_IS_DEFINED(ptr_displaySum,size_displaySum);
	if(ptr_dbuf)
		VALGRIND_CHECK_MEM_IS_DEFINED(ptr_dbuf,size_dbuf);
	if(ptr_vbuf)
		VALGRIND_CHECK_MEM_IS_DEFINED(ptr_vbuf,size_vbuf);
	if(ptr_imgData)
		VALGRIND_CHECK_MEM_IS_DEFINED(ptr_imgData,size_imgData);
	if(ptr_site)
		VALGRIND_CHECK_MEM_IS_DEFINED(ptr_site,size_site);
	if(ptr_linkInfo)
		VALGRIND_CHECK_MEM_IS_DEFINED(ptr_linkInfo,size_linkInfo);
	if(ptr_outlinks)
		VALGRIND_CHECK_MEM_IS_DEFINED(ptr_outlinks,size_outlinks);
	if(ptr_vector1)
		VALGRIND_CHECK_MEM_IS_DEFINED(ptr_vector1,size_vector1);
	if(ptr_vector2)
		VALGRIND_CHECK_MEM_IS_DEFINED(ptr_vector2,size_vector2);
	if(ptr_vector3)
		VALGRIND_CHECK_MEM_IS_DEFINED(ptr_vector3,size_vector3);
	if(ptr_linkText)
		VALGRIND_CHECK_MEM_IS_DEFINED(ptr_linkText,size_linkText);
	if(ptr_surroundingText)
		VALGRIND_CHECK_MEM_IS_DEFINED(ptr_surroundingText,size_surroundingText);
	if(ptr_linkUrl)
		VALGRIND_CHECK_MEM_IS_DEFINED(ptr_linkUrl,size_linkUrl);
	if(ptr_rssItem)
		VALGRIND_CHECK_MEM_IS_DEFINED(ptr_rssItem,size_rssItem);
	if(ptr_categories)
		VALGRIND_CHECK_MEM_IS_DEFINED(ptr_categories,size_categories);
	if(ptr_content)
		VALGRIND_CHECK_MEM_IS_DEFINED(ptr_content,size_content);
	if(ptr_templateVector)
		VALGRIND_CHECK_MEM_IS_DEFINED(ptr_templateVector,size_templateVector);
	if(ptr_metadataBuf)
		VALGRIND_CHECK_MEM_IS_DEFINED(ptr_metadataBuf,size_metadataBuf);
	if(ptr_note)
		VALGRIND_CHECK_MEM_IS_DEFINED(ptr_note,size_note);
#endif
	int32_t retSize;
	serializeMsg(sizeof(*this),
	             &size_tbuf, &size_note,
	             &ptr_tbuf,
	             this,
	             &retSize,
	             buf, bufSize);
	if ( retSize > bufSize ) { g_process.shutdownAbort(true); }
	// return it
	return retSize;
}

// convert offsets back into ptrs
int32_t Msg20Reply::deserialize ( ) {
	int32_t bytesParsed = deserializeMsg(sizeof(*this),
					     &size_tbuf, &size_note,
					     &ptr_tbuf,
					     ((char*)this) + sizeof(*this));
	if(bytesParsed<0)
		return bytesParsed;
	
	// sanity
	if (ptr_linkInfo && ((LinkInfo *)ptr_linkInfo)->m_lisize != size_linkInfo) {
		log(LOG_ERROR,"!!! CORRUPTED LINKINFO detected for docId %" PRId64 " - resetting linkinfo", m_docId);
		size_linkInfo = 0;
		ptr_linkInfo = NULL;
		// gbshutdownAbort(true);
	}

	// return how many bytes we used
	return bytesParsed;
}