#include "Hostdb.h"
#include "UdpServer.h"
#include "Conf.h"
#include "JobScheduler.h"
#include "max_niceness.h"
#include "Process.h"
#include "Sanity.h"
#include "sort.h"
#include "Rdb.h"
#include "Posdb.h"
#include "Titledb.h"
#include "Spider.h"
#include "Clusterdb.h"
#include "HostFlags.h"
#include "Dns.h"
#include "File.h"
#include "IPAddressChecks.h"
#include "ip.h"
#include "Mem.h"
#include "ScopedLock.h"
#include <fcntl.h>
#include <sys/types.h>
#include <ifaddrs.h>
#include <sys/types.h>

// a global class extern'd in .h file
Hostdb g_hostdb;

static HashTableX g_hostTableUdp;
static HashTableX g_hostTableTcp;


void Hostdb::resetPortTables () {
	g_hostTableUdp.reset();
	g_hostTableTcp.reset();
}

static int cmp  ( const void *h1 , const void *h2 ) ;

Hostdb::Hostdb ( ) {
	m_hosts = NULL;
	m_numHosts = 0;
	m_ips = NULL;
	m_initialized = false;
	m_crcValid = false;
	m_crc = 0;
	m_created = false;
	m_myHost = NULL;
	m_myIp = 0;
	m_myIpShotgun = 0;
	m_myPort = 0;
	m_myHost = NULL;
	m_myShard = NULL;
	m_loopbackIp = atoip ( "127.0.0.1" , 9 );
	m_numHosts = 0;
	m_numHostsAlive = 0;
	m_allocSize = 0;
	m_numHostsPerShard  = 0;
	m_numStripeHostsPerShard = 0;
	m_bufSize = 0;
	m_numIps = 0;
	m_hostId = 0;
	m_numShards = 0;
	m_indexSplits = 0;
	m_numSpareHosts = 0;
	m_numProxyHosts = 0;
	m_numProxyAlive = 0;
	m_numTotalHosts = 0;
	m_useTmpCluster = false;

	memset(m_hostPtrs, 0, sizeof(m_hostPtrs));
	memset(m_shards, 0, sizeof(m_shards));
	memset(m_spareHosts, 0, sizeof(m_spareHosts));
	memset(m_proxyHosts, 0, sizeof(m_proxyHosts));
	memset(m_buf, 0, sizeof(m_buf));
	memset(m_dir, 0, sizeof(m_dir));
	memset(m_httpRootDir, 0, sizeof(m_httpRootDir));
	memset(m_logFilename, 0, sizeof(m_logFilename));
	memset(m_map, 0, sizeof(m_map));
	
	m_hostsConfInDisagreement = false;
	m_hostsConfInAgreement = false;
	
	m_minRepairMode = -1;
	m_minRepairModeBesides0 = -1;
	m_minRepairModeHost = NULL;
}


Hostdb::~Hostdb () {
	reset();
}

void Hostdb::reset ( ) {
	if ( m_hosts )
		mfree ( m_hosts, m_allocSize,"Hostdb" );
	if ( m_ips   ) mfree ( m_ips  , m_numIps * 4, "Hostdb" );
	m_hosts = NULL;
	m_ips               = NULL;
	m_numIps            = 0;
}

// . gets filename that contains the hosts from the Conf file
// . return false on errro
// . g_errno may NOT be set
bool Hostdb::init(int32_t hostIdArg, bool proxyHost, bool useTmpCluster, const char *cwd) {
	// reset my ip and port
	m_myIp             = 0;
	m_myIpShotgun      = 0;
	m_myPort           = 0;
	m_myHost           = NULL;
	//m_myPort2          = 0;
	m_numHosts         = 0;
	m_numHostsPerShard = 0;
	m_numStripeHostsPerShard = 0;
	m_loopbackIp       = atoip ( "127.0.0.1" , 9 );
	m_useTmpCluster    = useTmpCluster;
	m_initialized = true;

	const char *dir = "./";
	if (cwd) {
		dir = cwd;
	}

	const char *filename = "hosts.conf";

	// for now we autodetermine
	if ( hostIdArg != -1 ) {
		g_process.shutdownAbort(true);
	}

	// init to -1
	m_hostId = -1;

retry:
	// . File::open() open old if it exists, otherwise,
	File f;
	f.set ( dir , filename );
	// . returns -1 on error and sets g_errno
	// . returns false if does not exist, true otherwise
	int32_t status = f.doesExist();
	int32_t numRead;

	// return false on error (g_errno should be set)
	if ( status <= -1 ) return false;
	// return false if the conf file does not exist
	if ( status ==  0 ) { 
		g_errno = ENOHOSTSFILE; 
		// now we generate one if that is not there
createFile:
		if ( ! m_created ) {
			m_created = true;
			g_errno = 0;
			dir = cwd;
			createHostsConf( cwd );
			goto retry;
		}
		log(LOG_WARN, "conf: Filename %s does not exist." ,filename);
		return false; 
	}

	// get file size
	m_bufSize = f.getFileSize();

	// return false if too big
	if ( m_bufSize > (MAX_HOSTS+MAX_SPARES) * 128 ) { 
		g_errno = EBUFTOOSMALL; 
		log(LOG_WARN, "conf: %s has filesize of %" PRId32" bytes, which is greater than %" PRId32" max.",
		    filename,m_bufSize, (int32_t)(MAX_HOSTS+MAX_SPARES)*128);
		return false;
	}

	// open the file
	if ( ! f.open ( O_RDONLY ) ) {
		return false;
	}

	// read in the file
	numRead = f.read ( m_buf , m_bufSize , 0 /*offset*/ );
	// ensure g_errno is now set if numRead != m_bufSize
	if ( numRead != m_bufSize ) {
		log( LOG_WARN, "conf: Error reading %s : %s.", filename, mstrerror( g_errno ) );
		return false;
	}

	// NULL terminate what we read
	m_buf [ m_bufSize ] = '\0';

	// how many hosts do we have?
	char *p    = m_buf;
	char *pend = m_buf + m_bufSize;
	int32_t  i = 0;
	m_numSpareHosts = 0;
	m_numProxyHosts = 0;
	m_numHosts      = 0;
	
	for ( ; *p ; p++ ) {
		if ( is_wspace_a (*p) ) continue;
		// skip comments
		if ( *p == '#' ) { while ( *p && *p != '\n' ) p++; continue; }
		// MUST be a number
		if ( ! is_digit ( *p ) ) {
			// skip known directives
			if (!strncmp(p, "port-offset:", 12) ||
			    !strncmp(p, "index-splits:", 13) ||
			    !strncmp(p, "num-mirrors:", 12) ||
			    !strncmp(p, "working-dir:", 12)) {
				// no op
			}else if (strncasecmp(p, "spare", 5) == 0) {
				// check if this is a spare host
				m_numSpareHosts++;
			} else if (strncasecmp(p, "proxy", 5) == 0) {
				// check if this is a proxy host
				m_numProxyHosts++;
			} else if (strncasecmp(p, "qcproxy", 7) == 0) {
				// query compression proxies count as proxies
				m_numProxyHosts++;
			} else if (strncasecmp(p, "scproxy", 7) == 0) {
				// spider compression proxies count as proxies
				m_numProxyHosts++;
			} else {
				log(LOG_WARN, "conf: %s is malformed. First item of each non-comment line must be a NUMERIC hostId, "
					"SPARE or PROXY. line=%s", filename, p);
				return false;
			}
		} else {
			// count it as a host
			m_numHosts++;
		}
		i++;

		// skip line
		while ( *p && *p != '\n' ) {
			p++;
		}
	}

	// set g_errno, log and return false if no hosts found in the file
	if ( i == 0 ) { 
		g_errno = ENOHOSTS; 
		log( LOG_WARN, "conf: No host entries found in %s.",filename);
		goto createFile;
	}
	// alloc space for this many Hosts structures
	// save buffer size
	m_allocSize = sizeof(Host) * i;
	m_hosts = (Host *) mcalloc ( m_allocSize ,"Hostdb");
	if ( ! m_hosts ) {
		log( LOG_WARN, "conf: Memory allocation failed.");
		return false;
	}

	int32_t numGrunts = 0;

	// now fill up m_hosts
	p = m_buf;
	i = 0;
	int32_t line = 1;
	int32_t proxyNum = 0;

	// assume defaults
	int32_t indexSplits = 0;
	char *wdir2 = NULL;
	int32_t  wdirlen2 = 0;
	int32_t numMirrors = -1;

	int32_t num_nospider = 0;
	int32_t num_noquery  = 0;

	char tmp[256];

	for ( ; *p ; p++ , line++ ) {
		if ( is_wspace_a (*p) ) continue;
		// skip comments
		if ( *p == '#' ) { while ( *p && *p != '\n' ) p++; continue; }

		// does the line say "port-offset: xxxx" ?
		if ( ! strncmp(p,"index-splits:",13) ) {
			p += 13;
			// skip spaces after the colon
			while (  is_wspace_a(*p) ) p++;			
			indexSplits = atol(p);
			while ( *p && *p != '\n' ) p++; 
			continue; 
		}

		if ( ! strncmp(p,"num-mirrors:",12) ) {
			p += 12;
			// skip spaces after the colon
			while (  is_wspace_a(*p) ) p++;			
			numMirrors = atol(p);
			while ( *p && *p != '\n' ) p++; 
			continue; 
		}

		// does the line say "working-dir: xxxx" ?
		if ( ! strncmp(p,"working-dir:",12) ) {
			p += 12;
			// skip spaces after the colon
			while (  is_wspace_a(*p) ) p++;			
			wdir2 = p;
			// skip until not space
			while ( *p && ! is_wspace_a(*p) ) p++;
			// set length
			wdirlen2 = p - wdir2;
			// mark the end
			char *end = p;
			while ( *p && *p != '\n' ) p++; 
			// null term it
			*end = '\0';
			continue; 
		}

		// skip any spaces at start of line
		while (   is_wspace_a(*p) ) p++;

		// get host in order
		Host *h = &m_hosts[i];

		// clear it
		memset ( h , 0 , sizeof(Host) );

		// . see what type of host this is
		// . proxies are not given numbers as yet in the hosts.conf
		//   so number them in the order in which they come
		if ( is_digit(*p) ) {
			h->m_type = HT_GRUNT;
			h->m_hostId = atoi(p);
		}
		else if ( strncasecmp(p,"spare",5)==0 ) {
			h->m_type = HT_SPARE;
			h->m_hostId = -1;
		}
		else if ( strncasecmp(p,"qcproxy",7)==0 ) {
			h->m_type = HT_QCPROXY;
			h->m_hostId = proxyNum++;
		}
		else if ( strncasecmp(p,"scproxy",7)==0 ) {
			h->m_type = HT_SCPROXY;
			h->m_hostId = proxyNum++;
		}
		else if ( strncasecmp(p,"proxy",5)==0 ) {
			h->m_type = HT_PROXY;
			h->m_hostId = proxyNum++;
		}
		// ignore old version "port-offset:"
		else if ( strncasecmp(p,"port-offset:",12)==0 ) {
			while ( *p && *p != '\n' ) p++;
			continue;
		}
		else {
			logf(LOG_INFO,"hosts: hosts.conf bad line: %s",p);
			g_errno = EBADENGINEER;
			return false;
		}

		char *wdir;
		int32_t  wdirlen;

		// reset this
		h->m_retired = false;

		// skip numeric hostid or "proxy" keyword
		while ( ! is_wspace_a(*p) ) p++;

		// skip spaces after hostid/port/spare keyword
		while ( is_wspace_a(*p) ) p++;

		// now the four ports
		int32_t port1 = atol(p);
		// skip digits
		for ( ; is_digit(*p) ; p++ );
		// skip spaces after it
		while ( is_wspace_a(*p) ) p++;

		int32_t port2 = atol(p);
		// skip digits
		for ( ; is_digit(*p) ; p++ );
		// skip spaces after it
		while ( is_wspace_a(*p) ) p++;

		int32_t port3 = atol(p);
		// skip digits
		for ( ; is_digit(*p) ; p++ );
		// skip spaces after it
		while ( is_wspace_a(*p) ) p++;

		int32_t port4 = atol(p);
		// skip digits
		for ( ; is_digit(*p) ; p++ );
		// skip spaces after it
		while ( is_wspace_a(*p) ) p++;

		// set our ports
		h->m_dnsClientPort = port1; // 6000
		h->m_httpsPort     = port2; // 7000
		h->m_httpPort      = port3; // 8000
		h->m_port          = port4; // 9000

		// then hostname
		char *host = p;

		// skip hostname (can be an ip now)
		while ( *p && (*p=='.'||is_alnum_a(*p)) ) p++;
		// get length
		int32_t hlen = p - host;
		// limit
		if ( hlen > 15 ) {
			g_errno = EBADENGINEER;
			log(LOG_WARN, "admin: hostname too long in hosts.conf");
			return false;
		}
		// copy it
		gbmemcpy ( h->m_hostname , host , hlen );
		// null term it
		h->m_hostname[hlen] = '\0';
		// need this for hashing
		hashinit();
		// if hostname is an ip that's ok i guess
		int32_t ip = atoip ( h->m_hostname );
		// if not an ip, look it up
		if ( ! ip ) {
			// get key
			key96_t k = hash96 ( host , hlen );
			// get eth0 ip of hostname in /etc/hosts
			g_dns.isInFile ( k , &ip );
		}
		// still bad?
		if ( ! ip ) {
			g_errno = EBADENGINEER;
			log(LOG_WARN, "admin: no ip for hostname \"%s\" in "
			    "hosts.conf in /etc/hosts",
			    h->m_hostname);
			return false;
		}
		// store the ip
		h->m_ip = ip;
		
		// skip spaces or until \n
		for ( ; *p == ' ' ; p++ );
		// must be a 2nd hostname
		char *hostname2 = NULL;
		int32_t hlen2 = 0;
		if ( *p != '\n' ) {
			hostname2 = p;
			// find end of it
			for ( ; *p=='.' || 
				      is_digit(*p) || 
				      is_alnum_a(*p) ; p++ );
			hlen2 = p - hostname2;
		}

		int32_t ip2 = 0;
		// was it "retired"?
		if ( hostname2 && strncasecmp(hostname2,"retired",7) == 0 ) {
			h->m_retired = true;
			hostname2 = NULL;
			//goto retired;
		}

		// limit
		if ( hlen2 > 15 ) {
			g_errno = EBADENGINEER;
			log(LOG_WARN, "admin: hostname too long in hosts.conf");
			return false;
		}
		// a direct ip address?
		if ( hostname2 ) {
			gbmemcpy ( h->m_hostname2,hostname2,hlen2);
			h->m_hostname2[hlen2] = '\0';
			ip2 = atoip ( h->m_hostname2 );
		}
		if ( ! ip2 && hostname2 ) {
			// set this ip
			//int32_t nextip;
			// now that must have the eth1 ip in /etc/hosts
			key96_t k = hash96 ( h->m_hostname2 , hlen2 );
			// get eth1 ip of hostname in /etc/hosts
			if ( ! g_dns.isInFile ( k , &ip2 ) ) {
				char ipbuf[16];
				log(LOG_WARN, "admin: secondary host %s in hosts.conf "
				    "not in /etc/hosts. Using secondary "
				    "ethernet (eth1) ip "
				    "of %s",hostname2,iptoa(ip,ipbuf));
			}
		}

		// if none, use initial ip as shotgun as well
		if ( ! ip2 ) ip2 = ip;
		// store the ip, the eth1 ip
		h->m_ipShotgun = ip2; // nextip;

		// . now p should point to first char after hostname
		// . skip spaces and tabs
		while ( *p && (*p==' '|| *p=='\t') )p++;

		// is "RETIRED" after hostname?
		if ( strncasecmp(p,"retired",7) == 0 )
			h->m_retired = true;
		
		// for qcproxies, the next thing is always an
		// ip:port of another proxy that we forward the
		// queries to.
		if ( h->m_type & HT_QCPROXY ) {
			char *s = p;
			for ( ; *s && *s!=':' ; s++ );
			int32_t ip = 0;
			if ( *s == ':' ) ip = atoip(p,s-p);
			int32_t port = 0;
			if ( *s ) port = atol(s+1);
			// sanity
			if ( ip == 0 || port == 0 ) {
				g_errno = EBADENGINEER;
				log(LOG_WARN, "admin: bad qcproxy line. must have ip:port after hostname.");
				return false;
			}
			h->m_forwardIp   = ip;
			h->m_forwardPort = port;
			// skip that to port offset now
			for ( ; *p && *p!=' ' && *p !='\t' ; p++);
			// then skip spaces
			for ( ; *p && (*p==' '|| *p=='\t') ; p++ );
		}

		// i guess proxy and spares don't count
		if ( h->m_type != HT_GRUNT ) h->m_shardNum = 0;
		
		// this is the same
		wdir = wdir2;
		wdirlen = wdirlen2; // strlen ( wdir2 );
		// check for working dir override
		if ( *p == '/' ) {
			wdir = p;
			while ( *p && ! isspace(*p) ) p++;
			wdirlen = p - wdir;
		}
		
		if ( ! wdir ) {
			g_errno = EBADENGINEER;
			log(LOG_WARN, "admin: need working-dir for host in hosts.conf line %" PRId32,line);
			return false;
		}

		// skip spaces or until \n
		for ( ; *p == ' ' ; p++ );
		// then skip spaces
		for ( ; *p && (*p==' '|| *p=='\t') ; p++ );

		char *mdir = NULL;
		int32_t mdirlen = 0;

		// check for merge dir override
		if ( *p == '/' ) {
			mdir = p;
			while ( *p && ! isspace(*p) ) p++;
			mdirlen = p - mdir;
		}


		// skip spaces or until \n
		for ( ; *p == ' ' ; p++ );
		// then skip spaces
		for ( ; *p && (*p==' '|| *p=='\t') ; p++ );

		char *ldir = NULL;
		int32_t ldirlen = 0;

		// check for merge lock dir override
		if ( *p == '/' ) {
			ldir = p;
			while ( *p && ! isspace(*p) ) p++;
			ldirlen = p - ldir;
		}


		h->m_queryEnabled = true;
		h->m_spiderEnabled = true;

		// check for something after the working dir
		h->m_note[0] = '\0';
		if ( *p != '\n' ) {
			// save the note
			char *n = p;
			while ( *n && *n != '\n' && n < pend ) n++;

			int32_t noteSize = n - p;
			if ( noteSize > 127 ) noteSize = 127;
			gbmemcpy(h->m_note, p, noteSize);
			*p++ = '\0'; // NULL terminate for atoip

			if(strstr(h->m_note, "noquery")) {
				h->m_queryEnabled = false;
				num_noquery++;
			}
			if(strstr(h->m_note, "nospider")) {
				h->m_spiderEnabled = false;
				num_nospider++;
			}
		} else {
			*p = '\0';
		}

		// get max group number
		if ( h->m_type == HT_GRUNT )
			numGrunts++;

		// skip line now
		while ( *p && *p != '\n' )
			p++;

		// ensure they're in proper order without gaps
		if ( h->m_type==HT_GRUNT && h->m_hostId != i ) {
		     g_errno = EBADHOSTID; 
		     log(LOG_WARN, "conf: Unordered hostId of %" PRId32", should be %" PRId32" in %s line %" PRId32".",
		         h->m_hostId,i,filename,line);
			return false;
		}

		// and working dir
		if ( wdirlen > 255 ) {
			g_errno = EBADENGINEER;
			log(LOG_WARN, "conf: Host working dir too long in %s line %" PRId32".", filename, line);
			return false;
		}
		if ( wdirlen <= 0 ) {
			g_errno = EBADENGINEER;
			log(LOG_WARN, "conf: No working dir supplied in %s line %" PRId32".", filename, line);
			return false;
		}
		// make sure it is legit
		if ( wdir[0] != '/' ) {
			g_errno = EBADENGINEER;
			log(LOG_WARN, "conf: working dir must start with / in %s line %" PRId32, filename, line);
			return false;
		}

		// take off slash if there
		if ( wdir[wdirlen-1]=='/' ) {
			wdir[--wdirlen]='\0';
		}

		// get real path (no symlinks symbolic links)
		// only if on same host, which we determine based on the IP-address.
		if ( ip_distance(h->m_ip)==ip_distance_ourselves ) {
			int32_t tlen = readlink ( wdir , tmp , 250 );
			// if we got the actual path, copy that over
			if ( tlen != -1 ) {
				// wdir currently references into the 
				// hosts.conf buf so don't store the expanded
				// directory into there
				wdir = tmp;
				wdirlen = tlen;
			}
		}

		// add slash if none there
		if ( wdir[wdirlen-1] !='/' ) wdir[wdirlen++] = '/';
			
		// don't breach Host::m_dir[256] buffer
		if ( wdirlen >= 256 ) {
			log(LOG_WARN, "conf: working dir %s is too long, >= 256 chars.", wdir);
			return false;
		}

		// copy it over
		memcpy(m_hosts[i].m_dir, wdir, wdirlen);
		m_hosts[i].m_dir[wdirlen] = '\0';
		memcpy(m_hosts[i].m_mergeDir, mdir, mdirlen);
		m_hosts[i].m_mergeDir[mdirlen] = '\0';
		memcpy(m_hosts[i].m_mergeLockDir, ldir, ldirlen);
		m_hosts[i].m_mergeLockDir[ldirlen] = '\0';
		
		// and don't send emails on him until we got a good ping
		m_hosts[i].m_emailCode = -2;

		m_hosts[i].m_lastResponseReceiveTimestamp = 0;
		m_hosts[i].m_lastRequestSendTimestamp = 0;

		m_hosts[i].m_runtimeInformation.m_dailyMergeCollnum = -1;

		// point to next one
		i++;
	}

	m_numTotalHosts = i;

	// BR 20160313: Sanity check. I doubt the striping functionality works with an odd mix 
	// of noquery and nospider hosts. Make sure the number of each kind is the same for now.
	if (num_nospider && num_noquery && num_nospider != num_noquery) {
		g_errno = EBADENGINEER;
		log(LOG_ERROR,"Number of nospider and noquery hosts must match in hosts.conf");
		return false;
	}

	// # of mirrors is zero if no mirrors,
	// if it is 1 then each host has ONE MIRROR host
	if ( numMirrors == 0 )
		indexSplits = numGrunts;
	if ( numMirrors > 0 )
		indexSplits = numGrunts / (numMirrors+1);

	if ( indexSplits == 0 ) {
		g_errno = EBADENGINEER;
		log("admin: need num-mirrors: xxx or "
		    "index-splits: xxx directive "
		    "in hosts.conf");
		return false;
	}

	numMirrors = (numGrunts / indexSplits) - 1 ;

	if ( numMirrors < 0 ) {
		g_errno = EBADENGINEER;
		log("admin: need num-mirrors: xxx or "
		    "index-splits: xxx directive "
		    "in hosts.conf (2)");
		return false;
	}

	m_indexSplits = indexSplits;

	m_numShards = numGrunts / (numMirrors+1);

	//
	// set Host::m_shardNum
	//
	for ( int32_t i = 0 ; i < numGrunts ; i++ ) {
		Host *h = &m_hosts[i];
		h->m_shardNum = i % indexSplits;
	}

	// assign spare hosts
	if ( m_numSpareHosts > MAX_SPARES ) {
		log ( LOG_WARN, "conf: Number of spares (%" PRId32") exceeds max of %i, "
		      "truncating.", m_numSpareHosts, MAX_SPARES );
		m_numSpareHosts = MAX_SPARES;
	}
	for ( i = 0; i < m_numSpareHosts; i++ ) {
		m_spareHosts[i] = &m_hosts[m_numHosts + i];
	}
	
	// assign proxy hosts
	if ( m_numProxyHosts > MAX_PROXIES ) {
		log ( LOG_WARN, "conf: Number of proxies (%" PRId32") exceeds max of %i, "
		      "truncating.", m_numProxyHosts, MAX_PROXIES );
		g_process.shutdownAbort(true);
		m_numProxyHosts = MAX_PROXIES;
	}
	for ( i = 0; i < m_numProxyHosts; i++ ) {
		m_proxyHosts[i] = &m_hosts[m_numHosts + m_numSpareHosts + i];
		m_proxyHosts[i]->m_isProxy = true;
		// sanity
		if ( m_proxyHosts[i]->m_type == 0  ) { g_process.shutdownAbort(true); }
	}

	// log discovered hosts
	log ( LOG_INFO, "conf: Discovered %" PRId32" hosts and %" PRId32" spares and "
	      "%" PRId32" proxies.",m_numHosts, m_numSpareHosts, m_numProxyHosts );

	// if we have m_numShards we must have 
	int32_t hostsPerShard  = m_numHosts / m_numShards;
	// must be exact fit
	if ( hostsPerShard * m_numShards != m_numHosts ) {
		g_errno = EBADENGINEER;
		log(LOG_WARN, "conf: Bad number of hosts for %" PRId32" shards in hosts.conf.",m_numShards);
		return false;
	}
	// count number of hosts in each shard
	for ( i = 0 ; i < m_numShards ; i++ ) {
		int32_t count = 0;
		for ( int32_t j = 0 ; j < m_numHosts ; j++ )
			if ( m_hosts[j].m_shardNum == (uint32_t)i ) 
				count++;
		if ( count != hostsPerShard ) {
			g_errno = EBADENGINEER;
			log(LOG_WARN, "conf: Number of hosts in each shard in %s is not equal.",filename);
			return false;
		}
	}

	// now sort hosts by shard # then HOST id (both ascending order)
	gbsort ( m_hosts , m_numHosts , sizeof(Host), cmp );

	// . set m_shards array
	// . m_shards[i] is the first host in shardId "i"
	// . any other hosts w/ same shardId immediately follow it
	// . loop through each shard
	for ( i = 0 ; i < m_numShards ; i++ ) {
		int32_t j;
		for ( j = 0 ; j < m_numHosts ; j++ ) 
			if ( m_hosts[j].m_hostId == i ) break;
		// this points to list of all hosts in shard #j since
		// we sorted m_hosts by shardId
		m_shards[i] = &m_hosts[j];
	}
	// . set m_hostPtrs now so Hostdb::getHost() works
	// . the hosts are sorted by shard first so we must be careful
	for ( i = 0 ; i < m_numHosts ; i++ ) {
		int32_t j = m_hosts[i].m_hostId;
		m_hostPtrs[j] = &m_hosts[i];
	}
	// reset this count to 1, 1 counts for ourselves
	if(proxyHost) {
		m_numProxyAlive = 1;
	} else {
		m_numHostsAlive = 1;
	}
	// reset ping/stdDev times
	for ( int32_t i = 0 ; i < m_numHosts ; i++ ) {
		m_hosts[i].m_preferEth      = 0;
	}

	// get IPs of this server. last entry is 0.
	int32_t *localIps = getLocalIps();
	if ( ! localIps ) {
		log(LOG_WARN, "conf: Failed to get local IP address. Exiting.");
		return false;
	}

	// if no cwd, then probably calling 'gb inject foo.warc <hosts.conf>'
	if ( ! cwd ) {
		log("hosts: missing cwd");
		return true;
	}

	// now get host based on cwd and ip
	Host *host = getHost2 ( cwd , localIps );

	// now set m_myIp, m_myPort, m_myPort2 and m_myMachineNum
	if ( proxyHost )
		host = getProxy2 ( cwd , localIps ); //hostId );
	if ( ! host ) {
		log(LOG_WARN, "conf: Could not find host with path %s and local ip in %s", cwd, filename);
		return false;
	}
	m_myIp         = host->m_ip;    // internal IP
	m_myIpShotgun  = host->m_ipShotgun;
	m_myPort       = host->m_port;  // low priority udp port
	m_myHost       = host;

	//we are alive, obviously
	m_myHost->m_isAlive = true;

	//in single-instance setups the hosts.conf CRC is always OK
	if(m_numHosts==1) {
		m_hostsConfInDisagreement = false;
		m_hostsConfInAgreement = true;
	}
	
	//we are the lowest repair mode host until we know better
	m_minRepairModeHost = m_myHost;

	// THIS hostId
	m_hostId = m_myHost->m_hostId;
	// set hosts per shard (mirror group)
	m_numHostsPerShard = m_numHosts / m_numShards;

	// set m_stripe (aka m_twinNum) for each host
	for ( int32_t i = 0 ; i < m_numHosts ; i++ ) {
		// get this host
		Host *h = &m_hosts[i];
		// get his shard, array of hosts
		Host *shard = getShard ( h->m_shardNum );
		// how many hosts in the shard?
		int32_t ng = getNumHostsPerShard();
		// hosts in shard should be sorted by hostid i think, anyway,
		// they *need* to be. see above, hosts are in order in the
		// m_hosts[] array by shard then by hostId, so we should be
		// good to go.
		for ( int32_t j = 0 ; j < ng ; j++ ) {
			if ( &shard[j] != h ) continue;
			h->m_stripe = j;
			break;
		}
	}

	// BR 20160316: Make sure noquery hosts are not used when dividing
	// docIds for querying (Msg39)
	m_numStripeHostsPerShard = m_numHostsPerShard;
	if( m_numStripeHostsPerShard > 1 )
	{
		// Make sure we don't include noquery hosts
		m_numStripeHostsPerShard = (m_numHosts - num_noquery) / m_numShards;
	}


	// get THIS host
	Host *h = getHost ( m_hostId );
	if ( proxyHost )
		h = getProxy ( m_hostId );
	if ( ! h ) {
		log(LOG_WARN, "conf: HostId %" PRId32" not found in %s.", m_hostId,filename);
		return false;
	}
	// set m_dir to THIS host's working dir
	strncpy(m_dir, h->m_dir, sizeof(m_dir));
	m_dir[sizeof(m_dir)-1] = '\0';
	
	// likewise, set m_htmlDir to this host's html dir
	snprintf(m_httpRootDir, sizeof(m_httpRootDir), "%shtml/" , m_dir );
	m_httpRootDir[sizeof(m_httpRootDir)-1] = '\0';

	snprintf(m_logFilename, sizeof(m_logFilename), "%slog%03" PRId32, m_dir , m_hostId );
	m_logFilename[sizeof(m_logFilename)-1] = '\0';
	
	if ( ! g_conf.m_runAsDaemon &&
	     ! g_conf.m_logToFile )
		sprintf(m_logFilename,"/dev/stderr");


	int32_t gcount = 0;
	for ( int32_t i = 0 ; i < MAX_KSLOTS && m_numHosts ; i++ ) {
		// now just map to the shard # not the groupId... simpler...
		m_map[i] = gcount % m_numShards;
		// inc it
		gcount++;
	}

	// set our group
	m_myShard = getShard ( m_myHost->m_shardNum );

	//calculate CRC once and ofr all
	getCRC();

	// has the hosts
	return hashHosts();
}


bool Hostdb::hashHosts ( ) {
	// this also holds g_hosts2 as well as g_hosts so we cannot preallocate
	for ( int32_t i = 0 ; i < m_numHosts ; i++ ) {
		Host *h = &m_hosts[i];
		// init shotgun bit here, 0 or 1 depending on our hostId
		h->m_shotgunBit = m_hostId & 0x01;
		int32_t ip;
		ip = h->m_ip;
		if ( ! hashHost ( 1,h,ip, h->m_port     )) return false;
		if ( ! hashHost ( 0,h,ip, h->m_httpPort )) return false;
		if ( ! hashHost ( 0,h,ip, h->m_httpsPort)) return false;
		// . only hash this if not already in there
		// . just used to see if ip is in the network (local)
		if ( ! hashHost ( 1 , h , ip, 0 )) return false;

		// only hash shotgun ip if different
		if ( h->m_ip == h->m_ipShotgun ) continue;

		ip = h->m_ipShotgun;
		if ( ! hashHost ( 1,h,ip, h->m_port     )) return false;
		if ( ! hashHost ( 0,h,ip, h->m_httpPort )) return false;
		if ( ! hashHost ( 0,h,ip, h->m_httpsPort)) return false;
		// . only hash this if not already in there
		// . just used to see if ip is in the network (local)
		if ( ! hashHost ( 1 , h , ip, 0 )) return false;
	}

	// . hash loopback ip to point to us
	// . udpserver needs this?
	// . only do this if they did not already specify a 127.0.0.1 in
	//   the hosts.conf i guess
	int32_t lbip = atoip("127.0.0.1");
	Host *hxx = getUdpHost ( lbip , m_myHost->m_port );
	// only do this if not explicitly assigned to 127.0.0.1 in hosts.conf
	if ( ! hxx && (int32_t)m_myHost->m_ip != lbip ) {
		int32_t loopbackIP = atoip("127.0.0.1",9);
		if ( ! hashHost(1,m_myHost,loopbackIP,m_myHost->m_port)) 
			return false;
	}

	// and the proxies as well
	for ( int32_t i = 0 ; i < m_numProxyHosts ; i++ ) {
		Host *h = getProxy(i);
		// init shotgun bit here, 0 or 1 depending on our hostId
		h->m_shotgunBit = m_hostId & 0x01;
		int32_t ip;
		ip = h->m_ip;
		if ( ! hashHost ( 1,h,ip, h->m_port     )) return false;
		if ( ! hashHost ( 0,h,ip, h->m_httpPort )) return false;
		if ( ! hashHost ( 0,h,ip, h->m_httpsPort)) return false;
		// . only hash this if not already in there
		// . just used to see if ip is in the network (local)
		if ( ! hashHost ( 1 , h , ip, 0 )) return false;

		// only hash shotgun ip if different
		if ( h->m_ip == h->m_ipShotgun ) continue;

		ip = h->m_ipShotgun;
		if ( ! hashHost ( 1,h,ip, h->m_port     )) return false;
		if ( ! hashHost ( 0,h,ip, h->m_httpPort )) return false;
		if ( ! hashHost ( 0,h,ip, h->m_httpsPort)) return false;
		// . only hash this if not already in there
		// . just used to see if ip is in the network (local)
		if ( ! hashHost ( 1 , h , ip, 0 )) return false;
	}

	// verify g_hostTableUdp
	for ( int32_t i = 0 ; i < m_numHosts ; i++ ) {
		// get the ith host
		Host *h = &m_hosts[i];
		Host *h2 = getUdpHost ( h->m_ip , h->m_port );
		if ( h != h2 ) {
			log( LOG_WARN, "db: Host lookup failed for hostId %i.", h->m_hostId );
			return false;
		}

		h2 = getUdpHost ( h->m_ipShotgun , h->m_port );
		if ( h != h2 ) {
			log( LOG_WARN, "db: Host lookup2 failed for hostId %" PRId32".", h->m_hostId );
			return false;
		}
		if ( ! isIpInNetwork ( h->m_ip ) ) {
			log( LOG_WARN, "db: Host lookup5 failed for hostId %" PRId32".", h->m_hostId );
			return false;
		}
	}

	// verify g_hostTableTcp
	for ( int32_t i = 0 ; i < m_numHosts ; i++ ) {
		// get the ith host
		Host *h = &m_hosts[i];
		Host *h2 ;
		h2 = getTcpHost ( h->m_ip , h->m_httpPort );
		if ( h != h2 ) {
			char ipbuf[16];
			log( LOG_WARN, "db: Host lookup3 failed for hostId %" PRId32". ip=%s port=%hu",
			     h->m_hostId, iptoa(h->m_ip,ipbuf), h->m_httpPort );
			return false;
		}
		h2 = getTcpHost ( h->m_ip , h->m_httpsPort );
		if ( h != h2 ) {
			log( LOG_WARN, "db: Host lookup4 failed for hostId %" PRId32".", h->m_hostId );
			return false;
		}
	}

	return true;
}

bool Hostdb::hashHost (	bool udp , Host *h , uint32_t ip , uint16_t port ) {
	Host *hh = NULL;
	if ( udp ) hh = getUdpHost ( ip , port );

	if ( hh && port ) { 
		char ipbuf[16];
		log(LOG_WARN, "db: Must hash hosts.conf first, then hosts2.conf.");
		log(LOG_WARN, "db: or there is a repeated ip/port in hosts.conf.");
		log(LOG_WARN, "db: repeated host ip=%s port=%" PRId32" "
		    "name=%s",iptoa(ip,ipbuf),(int32_t)port,h->m_hostname);
		return false;//g_process.shutdownAbort(true);
	}

	HashTableX *t;
	if ( udp ) t = &g_hostTableUdp;
	else       t = &g_hostTableTcp;
	// initialize the table?
	if ( !t->isInitialized() ) {
		t->set ( 8 , sizeof(char *),16,NULL,0,false,"hostbl");
	}
	// get his key
	uint64_t key = 0;
	// masking the low bits of the ip is not good because it is
	// the same for every host! so reverse the key to get good hash
	char *dst = (char *)&key;
	char *src = (char *)&ip;
	dst[0] = src[3];
	dst[1] = src[2];
	dst[2] = src[1];
	dst[3] = src[0];
	// port too
	char *src2 = (char *)&port;
	dst[4] = src2[1];
	dst[5] = src2[0];
	// look it up
	int32_t slot = t->getSlot ( &key );
	// see if there is a collision
	Host *old = NULL;
	if ( slot >= 0 ) {
		// ports of 0 mean we are just adding an ip, and we can
		// have multiple hosts on the same ip. this call was just
		// to make isIpInNetwork() function work.
		if ( port == 0 ) return true;
		old = *(Host **)t->getValueFromSlot(slot);
		log(LOG_WARN, "db: Got collision between hostId %" PRId32" and %" PRId32"(proxy=%" PRId32"). "
			"Both have same ip/port. Does hosts.conf match hosts2.conf?",
		    old->m_hostId,h->m_hostId,(int32_t)h->m_isProxy);
		return false;
	}
	// add the new key with a ptr to host using m_port
	return t->addKey ( &key , &h ); // (uint32_t)h ) ;
}

int32_t Hostdb::getHostId ( uint32_t ip , uint16_t port ) {
	Host *h = getUdpHost ( ip , port );
	if ( ! h ) return -1;
	return h->m_hostId;
}

Host *Hostdb::getHostByIp ( uint32_t ip ) {
	return getHostFromTable ( true , ip , 0 );	
}

// . get Host entry from ip/port
// . port defaults to 0 for no port
Host *Hostdb::getUdpHost ( uint32_t ip , uint16_t port ) {
	return getHostFromTable ( true , ip , port );
}	

// . get Host entry from ip/port
// . port defaults to 0 for no port
Host *Hostdb::getTcpHost ( uint32_t ip , uint16_t port ) {
	return getHostFromTable ( false , ip , port );
}	

bool Hostdb::isIpInNetwork ( uint32_t ip ) {
	// use port of 0
	if ( getHostByIp ( ip ) ) return true;
	// not found
	return false;
}

// . get Host entry from ip/port
// . this works on proxy hosts as well!
// . use a port of 0 if we should disregard port
Host *Hostdb::getHostFromTable ( bool udp , uint32_t ip , uint16_t port ) {
	HashTableX *t;
	if ( udp ) t = &g_hostTableUdp;
	else       t = &g_hostTableTcp;
	// reset key
	uint64_t key = 0;
	// masking the low bits of the ip is not good because it is
	// the same for every host! so reverse the key to get good hash
	char *dst = (char *)&key;
	char *src = (char *)&ip;
	dst[0] = src[3];
	dst[1] = src[2];
	dst[2] = src[1];
	dst[3] = src[0];
	// port too
	char *src2 = (char *)&port;
	dst[4] = src2[1];
	dst[5] = src2[0];
	// look it up
	int32_t slot = t->getSlot ( &key );
	// return NULL if not found
	if ( slot < 0 ) return NULL;
	return *(Host **) t->getValueFromSlot ( slot );
}

// . this is used by gbsort() above
// . sorts Hosts by their shard
static int cmp (const void *v1, const void *v2) {
	Host *h1 = (Host *)v1;
	Host *h2 = (Host *)v2;
	// return if shards differ
	if ( h1->m_shardNum < h2->m_shardNum ) return -1; 
	if ( h1->m_shardNum > h2->m_shardNum ) return  1;
	// otherwise sort by hostId
	return h1->m_hostId - h2->m_hostId;
}

#include "Stats.h"

bool Hostdb::isShardDead(int32_t shardNum) const {
	// how many seconds since our main process was started?
	// i guess all nodes initially appear dead, so
	// compensate for that.
	long long now = gettimeofdayInMilliseconds();
	long elapsed = (now - g_stats.m_startTime) ;/// 1000;
	if ( elapsed < 60*1000 ) return false; // try 60 secs now

	const Host *shard = getShard(shardNum);
	//Host *live = NULL;
	for ( int32_t i = 0 ; i < m_numHostsPerShard ; i++ ) {
		if(!isDead(shard[i].m_hostId))
			return false;
	}
	return true;
}


int32_t Hostdb::getHostIdWithSpideringEnabled ( uint32_t shardNum ) {
	Host *hosts = getShard ( shardNum);
	int32_t numHosts = getNumHostsPerShard();

	int32_t hostNum = 0;
	int32_t numTried = 0;
	while( !hosts [ hostNum ].m_spiderEnabled && numTried < numHosts ) {
		hostNum = (hostNum+1) % numHosts;
		numTried++;
	}
	if( !hosts [ hostNum ].m_spiderEnabled) {
		log("build: cannot spider when entire shard has nospider enabled");
		g_process.shutdownAbort(true);
	}
	return hosts [ hostNum ].m_hostId ;
}


Host *Hostdb::getHostWithSpideringEnabled ( uint32_t shardNum ) {
	Host *hosts = getShard ( shardNum);
	int32_t numHosts = getNumHostsPerShard();

	int32_t hostNum = 0;
	int32_t numTried = 0;
	while( !hosts [ hostNum ].m_spiderEnabled && numTried < numHosts ) {
		hostNum = (hostNum+1) % numHosts;
		numTried++;
	}
	if( !hosts [ hostNum ].m_spiderEnabled) {
		log("build: cannot spider when entire shard has nospider enabled");
		g_process.shutdownAbort(true);
	}
	return &hosts [ hostNum ];
}


// if niceness 0 can't pick noquery host/ must pick spider host.
// if niceness 1 can't pick nospider host/ must pick query host.
// Used to select based on PingInfo::m_udpSlotsInUseIncoming but that information is not exchanged often enough to
// be even remotely accurate with any realistic number of shards.
Host *Hostdb::getLeastLoadedInShard ( uint32_t shardNum , char niceness ) {
	int32_t minOutstandingRequestsIndex = -1;
	Host *shard = getShard ( shardNum );
	Host *bestDead = NULL;
	ScopedLock sl(m_mtxPinginfo);
	for(int32_t i = 0; i < m_numHostsPerShard; i++) {
		Host *hh = &shard[i];
		// don't pick a 'no spider' host if niceness is 1
		if ( niceness >  0 && ! hh->m_spiderEnabled ) continue;
		// don't pick a 'no query' host if niceness is 0
		if ( niceness == 0 && ! hh->m_queryEnabled  ) continue;
		if ( ! bestDead ) bestDead = hh;
		if(isDead_unlocked(hh)) continue;

		minOutstandingRequestsIndex = i;
	}
	// we should never return a nospider/noquery host depending on
	// the niceness, so return bestDead
	if(minOutstandingRequestsIndex == -1) return bestDead;//shard;

	return &shard[minOutstandingRequestsIndex];
}

// if all are dead just return host #0
Host *Hostdb::getFirstAliveHost() {
	for ( int32_t i = 0 ; i < m_numHosts ; i++ )
		// if host #i is alive, return her
		if ( ! isDead ( i ) ) return getHost(i);
	// if all are dead just return host #0
	return getHost(0);
}

bool Hostdb::hasDeadHost() const {
	for ( int32_t i = 0 ; i < m_numHosts ; i++ )
		if ( isDead ( i ) ) return true;
	return false;
}

int Hostdb::getNumHostsDead() const {
	int count=0;
	for(int32_t i = 0; i < m_numHosts; i++)
		if(isDead(i))
			count++;
	return count;
}

bool Hostdb::isDead(int32_t hostId) const {
	const Host *h = getHost(hostId);
	return isDead ( h );
}

bool Hostdb::isDead(const Host *h) const {
	ScopedLock sl(m_mtxPinginfo);
	return isDead_unlocked(h);
}

bool Hostdb::isDead_unlocked(const Host *h) const {
	if(h->m_retired)
		return true; // retired means "don't use it", so it is essentially dead
	if(m_myHost == h)
		return false; //we are not dead
	return !h->m_isAlive;
}

int64_t Hostdb::getNumGlobalRecs ( ) {
	int64_t n = 0;
	for ( int32_t i = 0 ; i < m_numHosts ; i++ )
		n += getHost ( i )->m_runtimeInformation.m_totalDocsIndexed;
	return n / m_numHostsPerShard;
}

bool Hostdb::setNote ( int32_t hostId, const char *note, int32_t noteLen ) {
	// replace the note on the host
	if ( noteLen > 125 ) noteLen = 125;
	Host *h = getHost ( hostId );
	if ( !h ) return true;
	gbmemcpy(h->m_note, note, noteLen);
	h->m_note[noteLen] = '\0';
	// write this hosts conf out
	return saveHostsConf();
}

bool Hostdb::setSpareNote ( int32_t spareId, const char *note, int32_t noteLen ) {
	// replace the note on the host
	if ( noteLen > 125 ) noteLen = 125;
	Host *h = getSpare ( spareId );
	if ( !h ) return true;
	gbmemcpy(h->m_note, note, noteLen);
	h->m_note[noteLen] = '\0';
	// write this hosts conf out
	return saveHostsConf();
}

bool Hostdb::replaceHost ( int32_t origHostId, int32_t spareHostId ) {
	Host *oldHost = getHost(origHostId);
	Host *spareHost = getSpare(spareHostId);
	if ( !oldHost || !spareHost ) {
		log(LOG_WARN, "init: Bad Host or Spare given. Aborting.");
		return false;
	}
	// host must be dead
	if ( !isDead(oldHost) ) {
		log(LOG_WARN, "init: Cannot replace live host. Aborting.");
		return false;
	}


	Host tmp;
	gbmemcpy ( &tmp , oldHost , sizeof(Host) );
	gbmemcpy ( oldHost , spareHost , sizeof(Host) );
	gbmemcpy ( spareHost , &tmp , sizeof(Host) );

	// however, these values need to change
	oldHost->m_hostId      = origHostId;
	oldHost->m_shardNum    = spareHost->m_shardNum;
	oldHost->m_stripe      = spareHost->m_stripe;
	oldHost->m_isProxy     = spareHost->m_isProxy;
	oldHost->m_type        = HT_SPARE;

	// and the new spare gets a new hostid too
	spareHost->m_hostId = spareHostId;

	// reset these stats
	oldHost->m_runtimeInformation.m_totalDocsIndexed         = 0;
	oldHost->m_emailCode           = 0;
	oldHost->m_errorReplies        = 0;
	oldHost->m_dgramsTo            = 0;
	oldHost->m_dgramsFrom          = 0;
	oldHost->m_totalResends        = 0;
	oldHost->m_etryagains          = 0;
	oldHost->m_repairMode          = 0;
	oldHost->m_splitsDone          = 0;
	oldHost->m_splitTimes          = 0;

	// write this hosts conf out
	saveHostsConf();
	//
	// . now we need to replace the ips and ports in the hash tables
	//   just clear the hash tables and rehash
	// 
	g_hostTableUdp.clear();
	g_hostTableTcp.clear();
	// now restock everything
	hashHosts();

	// replace ips in udp server
	g_udpServer.replaceHost ( spareHost, oldHost );
	return true;
}

// use the ip that is not dead, prefer eth0
int32_t Hostdb::getBestIp(const Host *h) {
	//used to examin ping times and chose between normal Ip and "shotgun" ip
	return h->m_ip;
}

// . "h" is from g_hostdb2, the "external" cluster
// . should we send to its primary or shotgun ip?
// . this returns which ip we should send to
int32_t Hostdb::getBestHosts2IP(const Host *h) {
	if(ip_distance(h->m_ip) <= ip_distance(h->m_ipShotgun))
		return h->m_ip;
	else
		return h->m_ipShotgun;
}


void Hostdb::updateAliveHosts(const int32_t alive_hosts_ids[], size_t n) {
	ScopedLock sl(m_mtxPinginfo);
	for(int32_t i=0; i<m_numHosts; i++)
		m_hosts[i].m_isAlive = false;
	m_myHost->m_isAlive = true;
	for(size_t i=0; i<n; i++) {
		int32_t hostid = alive_hosts_ids[i];
		if(hostid>=0 && hostid<m_numHosts)
			m_hostPtrs[hostid]->m_isAlive = true;
	}
	//update m_numHostsAlive
	m_numHostsAlive = 0;
	for(int32_t i=0; i<m_numHosts; i++)
		if(m_hosts[i].m_isAlive)
			m_numHostsAlive++;
}

void Hostdb::updateHostRuntimeInformation(int hostId, const HostRuntimeInformation &hri) {
	if(hostId<0)
		gbshutdownLogicError();
	if(hostId>=m_numHosts)
		return; //out-of-sync hosts.conf ?
	ScopedLock sl(m_mtxPinginfo);
	bool crc_changed = m_hostPtrs[hostId]->m_runtimeInformation.m_hostsConfCRC != hri.m_hostsConfCRC;
	bool repairmode_changed = m_hostPtrs[hostId]->m_runtimeInformation.m_repairMode != hri.m_repairMode;
	m_hostPtrs[hostId]->m_runtimeInformation = hri;
	m_hostPtrs[hostId]->m_runtimeInformation.m_valid = true;
	if(crc_changed) {
		//recalculate m_hostsConfInAgreement and m_hostsConfInDisagreement
		m_hostsConfInDisagreement = false;
		m_hostsConfInAgreement = false;
		// if we haven't received crc from all hosts then we will not set either flag.
		int32_t agreeCount = 0;
		for(int i = 0; i < getNumGrunts(); i++) {
			// skip if not received yet
			if(m_hosts[i].isHostsConfCRCKnown()) {
				if(!m_hosts[i].hasSameHostsConfCRC()) {
					m_hostsConfInDisagreement = true;
					break;
				}
				agreeCount++;
			}
		}

		// if all in agreement, set this flag
		if(agreeCount == getNumGrunts()) {
			m_hostsConfInAgreement = true;
		}
	}
	if(repairmode_changed) {
		//recalculate m_minRepairMode/m_minRepairModeBesides0/m_minRepairModeHost
		const Host *newMinHost = NULL;
		char newMin = -1;
		char newMin0 = -1;
		for(int i=0; i<m_numHosts; i++) {
			if(newMin==-1 || m_hosts[i].m_runtimeInformation.m_repairMode < newMin) {
				newMinHost = &(m_hosts[i]);
				newMin = m_hosts[i].m_runtimeInformation.m_repairMode;
			}
			if(newMin0==-1 || (m_hosts[i].m_runtimeInformation.m_repairMode!=0 && m_hosts[i].m_runtimeInformation.m_repairMode<newMin0))
				newMin0 = m_hosts[i].m_runtimeInformation.m_repairMode;
		}
		m_minRepairMode = newMin;
		m_minRepairModeBesides0 = newMin0;
		m_minRepairModeHost = newMinHost;
	}
}


void Hostdb::setOurFlags() {
	m_myHost->m_runtimeInformation.m_flags = getOurHostFlags();
	m_myHost->m_runtimeInformation.m_valid = true;
}

void Hostdb::setOurTotalDocsIndexed() {
	m_myHost->m_runtimeInformation.m_totalDocsIndexed = g_process.getTotalDocsIndexed();
	
}

// assume to be from posdb here
uint32_t Hostdb::getShardNumByTermId(const void *k) const {
	return m_map [(*(uint16_t *)((char *)k + 16))>>3];
}

// . if false, we don't split index and date lists, other dbs are unaffected
// . this obsolets the g_*.getGroupId() functions
// . this allows us to have any # of groups in a stripe, not just power of 2
// . now we can use 3 stripes of 96 hosts each so spiders will almost never
//   go down
uint32_t Hostdb::getShardNum(rdbid_t rdbId, const void *k) const {
	switch(rdbId) {
		case RDB_POSDB:
		case RDB2_POSDB2:
			if( Posdb::isShardedByTermId ( k ) ) {
				// based on termid NOT docid!!!!!!
				// good for page checksums so we only have to do disk
				// seek on one shard, not all shards.
				// use top 13 bits of key.
				return m_map [(*(uint16_t *)((char *)k + 16))>>3];
			} else {
				uint64_t d = Posdb::getDocId ( k );
				return m_map [ ((d>>14)^(d>>7)) & (MAX_KSLOTS-1) ];
			}

		case RDB_LINKDB:
		case RDB2_LINKDB2:
			// sharded by part of linkee sitehash32
			return m_map [(*(uint16_t *)((char *)k + 26))>>3];

		case RDB_TITLEDB:
		case RDB2_TITLEDB2: {
			uint64_t d = Titledb::getDocId ( (key96_t *)k );
			return m_map [ ((d>>14)^(d>>7)) & (MAX_KSLOTS-1) ];
		}

		case RDB_SPIDERDB:
		case RDB2_SPIDERDB2: {
			int32_t firstIp = Spiderdb::getFirstIp((key128_t *)k);
			// do what Spider.h getGroupId() used to do so we are
			// backwards compatible
			uint32_t h = (uint32_t)hash32h(firstIp,0x123456);
			// use that for getting the group
			return m_map [ h & (MAX_KSLOTS-1)];
		}

		case RDB_CLUSTERDB:
		case RDB2_CLUSTERDB2: {
			uint64_t d = Clusterdb::getDocId ( k );
			return m_map [ ((d>>14)^(d>>7)) & (MAX_KSLOTS-1) ];
		}

		case RDB_TAGDB:
		case RDB2_TAGDB2:
			return m_map [(*(uint16_t *)((char *)k + 10))>>3];

		case RDB_DOLEDB:
			// HACK:!!!!!!  this is a trick!!! it is us!!!
			//return m_myHost->m_groupId;
			return m_myHost->m_shardNum;

		default:
			// core -- must be provided
			g_process.shutdownAbort(true);
	}
}

uint32_t Hostdb::getShardNumFromDocId(int64_t d) const {
	return m_map [ ((d>>14)^(d>>7)) & (MAX_KSLOTS-1) ];
}

Host *Hostdb::getBestSpiderCompressionProxy ( int32_t *key ) {
	static int32_t s_numTotal = 0;
	static int32_t s_numAlive = 0;
	static Host *s_alive[64];
	static Host *s_lastResort = NULL;
	static bool s_aliveValid = false;

	if ( ! s_aliveValid ) {
		// come up to "redo" from below if a host goes dead
	redo:
		s_aliveValid = true;
		for ( int32_t i = 0 ; i < m_numProxyHosts ; i++ ) {
			Host *h = getProxy(i);
			if ( ! (h->m_type & HT_SCPROXY ) ) continue;
			// if all dead use this
			s_lastResort = h;
			// count towards total even if not alive
			s_numTotal++;
			// now must be alive
			if ( isDead(h) ) continue;
			// stop to avoid breach
			if ( s_numAlive >= 64 ) { g_process.shutdownAbort(true); }
			// add it otherwise
			s_alive[s_numAlive++] = h;
		}
	}

	// if no scproxy in hosts.conf return NULL
	if ( s_numTotal == 0 ) return NULL;

	// if none alive, use last resort, a non-null dead host
	if ( s_numAlive == 0 ) return s_lastResort;

	// pick one based on the key
	int32_t ni = hash32((char *)key , 4 ) % s_numAlive;
	// get it
	Host *h = s_alive[ni];
	// if dead, recompute alive[] table and try again!
	if ( isDead(h) ) goto redo;
	// got a live one
	return h;
}

int32_t Hostdb::getCRC ( ) {
	if ( m_crcValid ) return m_crc;
	// hash up all host entries, just the grunts really.
	const int num_grunts = getNumGrunts();
	SafeBuf str;
	for ( int32_t i = 0 ; i < num_grunts ; i++ ) {
		Host *h = &m_hosts[i];
		char ipbuf[16];
		// dns client port not so important
		str.safePrintf("%" PRId32",", i);
		str.safePrintf("%s," , iptoa(h->m_ip,ipbuf));
		str.safePrintf("%s," , iptoa(h->m_ipShotgun,ipbuf));
		str.safePrintf("%" PRId32",", (int32_t)h->m_httpPort);
		str.safePrintf("%" PRId32",", (int32_t)h->m_httpsPort);
		str.safePrintf("%" PRId32",", (int32_t)h->m_port);
		str.pushChar('\n');
	}
	str.nullTerm();

	m_crc = hash32n ( str.getBufStart() );

	// make sure it is legit
	if ( m_crc == 0 ) m_crc = 1;

	m_crcValid = true;

	log(LOG_INFO,"conf: hosts.conf CRC calculated as %d based on %d grunts", m_crc, num_grunts);
	return m_crc;
}


static void printHostsConfPreamble(SafeBuf *sb, int numMirrors, int indexSplits) {
	sb->safePrintf("# The Gigablast host configuration file.\n");
	sb->safePrintf("# Tells us what hosts are participating in the distributed search engine.\n");

	sb->safePrintf("\n\n");
	sb->safePrintf("# How many mirrors do you want? If this is 0 then your data\n");
	sb->safePrintf("# will NOT be replicated. If it is 1 then each host listed\n");
	sb->safePrintf("# below will have one host that mirrors it, thereby decreasing\n");
	sb->safePrintf("# total index capacity, but increasing redundancy. If this is\n");
	sb->safePrintf("# 1 then the first half of hosts will be replicated by the\n");
	sb->safePrintf("# second half of the hosts listed below.\n");
	sb->safePrintf("\n");
	if(numMirrors>=0)
		sb->safePrintf("num-mirrors: %d\n",numMirrors);
	else
		sb->safePrintf("#num-mirrors: 0\n");
	sb->safePrintf("\n");
	
	sb->safePrintf("# Alternatively, how many shards(splits) are used?\n");
	if(indexSplits>0)
		sb->safePrintf("index-splits: %d\n",indexSplits);
	else
		sb->safePrintf("#index-splits: 1\n");
	
	sb->safePrintf("\n\n");
	sb->safePrintf("# List of hosts. Limited to 512 from MAX_HOSTS in Hostdb.h. Increase that\n");
	sb->safePrintf("# if you want more.\n");
	sb->safePrintf("#\n");

	sb->safePrintf("# Format:\n");
	sb->safePrintf("#\n");
	sb->safePrintf("# first   column: hostID (starts at 0 and increments from there)\n");
	sb->safePrintf("# second  column: the port used by the client DNS algorithms\n");
	sb->safePrintf("# third   column: port that HTTPS listens on\n");
	sb->safePrintf("# fourth  column: port that HTTP  listens on\n");
	sb->safePrintf("# fifth   column: port that udp server listens on\n");
	sb->safePrintf("# sixth   column: IP address or hostname that has an IP address in /etc/hosts\n");
	sb->safePrintf("# seventh column: like sixth column but for secondary ethernet port. Can be the same as the sixth column.\n");
	sb->safePrintf("# eigth   column: Working directory\n");
	sb->safePrintf("# ninth   column: An optional merge directory override\n");
	sb->safePrintf("# tenth   column: An optional text note that will display in the hosts table for this host.\n");

	sb->safePrintf("\n\n");
	sb->safePrintf("#\n");
	sb->safePrintf("# Example of a four-node distributed search index running on a single\n");
	sb->safePrintf("# server with four cores. The working directories are /home/mwells/hostN/.\n");
	sb->safePrintf("# The 'gb' binary resides in the working directories. We have to use\n");
	sb->safePrintf("# different ports for each gb instance since they are all on the same\n");
	sb->safePrintf("# server.\n");
	sb->safePrintf("#\n");

	sb->safePrintf("#\n");
	sb->safePrintf("#0 5998 7000 8000 9000 192.0.2.4 192.0.2.5 /home/mwells/host0/\n");
	sb->safePrintf("#1 5997 7001 8001 9001 192.0.2.4 192.0.2.5 /home/mwells/host1/\n");
	sb->safePrintf("#2 5996 7002 8002 9002 192.0.2.4 192.0.2.5 /home/mwells/host2/\n");
	sb->safePrintf("#3 5995 7003 8003 9003 192.0.2.4 192.0.2.5 /home/mwells/host3/\n");

	sb->safePrintf("\n");
	sb->safePrintf("# A four-node cluster with different merge dir:\n");
	sb->safePrintf("#0 5998 7000 8000 9000 192.0.2.4 192.0.2.5 /home/mwells/host0/ /mnt/merge/host0/\n");
	sb->safePrintf("#1 5997 7001 8001 9001 192.0.2.4 192.0.2.5 /home/mwells/host1/ /mnt/merge/host1/\n");
	sb->safePrintf("#2 5996 7002 8002 9002 192.0.2.4 192.0.2.5 /home/mwells/host2/ /mnt/merge/host2/\n");
	sb->safePrintf("#3 5995 7003 8003 9003 192.0.2.4 192.0.2.5 /home/mwells/host3/ /mnt/merge/host3/\n");

	sb->safePrintf("\n");
	sb->safePrintf("# A four-node cluster on four different servers:\n");
	sb->safePrintf("#0 5998 7000 8000 9000 192.0.2.4 192.0.2.5 /home/mwells/gigablast/\n");
	sb->safePrintf("#1 5998 7000 8000 9000 192.0.2.6 192.0.2.7 /home/mwells/gigablast/\n");
	sb->safePrintf("#2 5998 7000 8000 9000 192.0.2.8 192.0.2.9 /home/mwells/gigablast/\n");
	sb->safePrintf("#3 5998 7000 8000 9000 192.0.2.10 192.0.2.11 /home/mwells/gigablast/\n");
	sb->safePrintf("\n\n");
	sb->safePrintf("#\n");
	sb->safePrintf("# Example of an eight-node cluster.\n");
	sb->safePrintf("# Each line represents a single gb process with dual ethernet ports\n");
	sb->safePrintf("# whose IP addresses are in /etc/hosts under se0, se0b, se1, se1b, ...\n");
	sb->safePrintf("#\n");
	sb->safePrintf("#0 5998 7000 8000 9000 se0 se0b /home/mwells/gigablast/\n");
	sb->safePrintf("#1 5998 7000 8000 9000 se1 se1b /home/mwells/gigablast/\n");
	sb->safePrintf("#2 5998 7000 8000 9000 se2 se2b /home/mwells/gigablast/\n");
	sb->safePrintf("#3 5998 7000 8000 9000 se3 se3b /home/mwells/gigablast/\n");
	sb->safePrintf("#4 5998 7000 8000 9000 se4 se4b /home/mwells/gigablast/\n");
	sb->safePrintf("#5 5998 7000 8000 9000 se5 se5b /home/mwells/gigablast/\n");
	sb->safePrintf("#6 5998 7000 8000 9000 se6 se6b /home/mwells/gigablast/\n");
	sb->safePrintf("#7 5998 7000 8000 9000 se7 se7b /home/mwells/gigablast/\n");
}


bool Hostdb::createHostsConf(const char *cwd) {
	fprintf(stderr,"Creating %shosts.conf\n", cwd);
	
	SafeBuf sb;
	printHostsConfPreamble(&sb, 0, -1);

	sb.safePrintf("\n");
	sb.safePrintf("\n");
	sb.safePrintf("0 5998 7000 8000 9000 127.0.0.1 127.0.0.1 %s\n", cwd);


	log("%shosts.conf does not exist, creating it", cwd);
	int rc = sb.save(cwd, "hosts.conf");
	return rc>=0;
}


bool Hostdb::saveHostsConf() {
	SafeBuf sb;
	//we don't recalculate "numMirrors" so that is always transformed into numsplits. Hrmpf.
	printHostsConfPreamble(&sb, -1, m_indexSplits);
	
	for(int32_t i = 0; i < m_numTotalHosts; i++) {
		Host *h;
		if(i < m_numHosts)
			h = getHost(i);
		else if(i < m_numHosts + m_numSpareHosts)
			h = getSpare(i - m_numHosts);
		else
			h = getProxy(i - m_numHosts - m_numSpareHosts);

		// generate the host id
		if(i >= m_numHosts + m_numSpareHosts)
			sb.safePrintf("proxy ");
		else if(i >= m_numHosts )
			sb.safePrintf("spare ");
		else
			sb.safePrintf("%03d ", i);
		
		sb.safePrintf("%5d %5d %5d %5d %s %s %s %s %s %s\n",
		              h->m_dnsClientPort,
			      h->getInternalHttpsPort(),
			      h->getInternalHttpPort(),
			      h->m_port,
			      h->m_hostname, h->m_hostname2,
			      h->m_dir,
			      h->m_mergeDir,
			      h->m_mergeLockDir,
			      h->m_note);
	}
	if(sb.save(m_dir,"hosts.conf")>=0)
		return true;
	else
		return false;
}


static int32_t s_localIps[20];
int32_t *getLocalIps ( ) {
	static bool s_valid = false;
	if ( s_valid ) return s_localIps;
	s_valid = true;
	struct ifaddrs *ifap = NULL;
	if ( getifaddrs( &ifap ) < 0 ) {
		log("hostdb: getifaddrs: %s.",mstrerror(errno));
		return NULL;
	}
	int32_t ni = 0;
	// store loopback just in case
	int32_t loopback = atoip("127.0.0.1");
	s_localIps[ni++] = loopback;
	for(ifaddrs *p = ifap; p && ni < 18 ; p = p->ifa_next) {
		if ( ! p->ifa_addr ) continue;
		if(p->ifa_addr->sa_family != AF_INET)
			continue;
		struct sockaddr_in *xx = (sockaddr_in *)(void*)p->ifa_addr;
		int32_t ip = xx->sin_addr.s_addr;
		// skip if loopback we stored above
		if ( ip == loopback ) continue;
		// skip bogus ones
		if ( (uint32_t)ip <= 10 ) continue;
		// show it
		//log("host: detected local ip %s",iptoa(ip));
		// otherwise store it
		s_localIps[ni++] = ip;
	}
	// mark the end of it
	s_localIps[ni] = 0;
	// free that memore
	freeifaddrs ( ifap );
	// return the static buffer
	return s_localIps;
}


Host *Hostdb::getHost2 ( const char *cwd , int32_t *localIps ) {
	for ( int32_t i = 0 ; i < m_numHosts ; i++ ) {
		Host *h = &m_hosts[i];
		// . get the path. guaranteed to end in '/'
		//   as well as cwd!
		// . if the gb binary does not reside in the working dir
		//   for this host, skip it, it's not our host
		if ( strcmp(h->m_dir,cwd) != 0 ) continue;
		// now it must be our ip as well!
		int32_t *ipPtr = localIps;
		for ( ; *ipPtr ; ipPtr++ ) 
			// return the host if it also matches the ip!
			if ( (int32_t)h->m_ip == *ipPtr ) return h;
	}
	// what, no host?
	return NULL;
}

Host *Hostdb::getProxy2 ( const char *cwd , int32_t *localIps ) {
	for ( int32_t i = 0 ; i < m_numProxyHosts ; i++ ) {
		Host *h = getProxy(i);
		if ( ! (h->m_type & HT_PROXY ) ) continue;
		// . get the path. guaranteed to end in '/'
		//   as well as cwd!
		// . if the gb binary does not reside in the working dir
		//   for this host, skip it, it's not our host
		if ( strcmp(h->m_dir,cwd) != 0 ) continue;
		// now it must be our ip as well!
		int32_t *ipPtr = localIps;
		for ( ; *ipPtr ; ipPtr++ ) 
			// return the host if it also matches the ip!
			if ( (int32_t)h->m_ip == *ipPtr ) return h;
	}
	// what, no host?
	return NULL;
}