privacore-open-source-searc.../InstanceInfoExchange.cpp
2017-08-11 14:50:24 +02:00

375 lines
10 KiB
C++

#include "InstanceInfoExchange.h"
#include "Hostdb.h"
#include "GbMutex.h"
#include "ScopedLock.h"
#include "Log.h"
#include "GbUtil.h"
#include "Hostdb.h"
#include "Conf.h"
#include "DailyMerge.h"
#include "Collectiondb.h"
#include "Version.h"
#include "repair_mode.h"
#include "HostFlags.h"
#include "Process.h"
#include "IOBuffer.h"
#include <pthread.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <poll.h>
#include <pwd.h>
#include <atomic>
#include <map>
#include <string>
#include <vector>
//We use Vagus for the information exchange.
//Previously, PingInfo and PingServer were used but that scaled poorly to
//hundreds of instances because it would take quite a while for the
//information to be fully propagated
//The vagus cluster name is configurable (defaulting to gb-$USER)
//The 'extra information' piece is used for piggybacking:
// gbVersionStr
// hosts.conf CRC
// host flags
// dailyMergeCollnum
// repairMode
// totalDocsIndexed
//We use two sockets toward the local Vagus instance. One for sending
//keepalives and one for regularly polling for instance information. It is
//tempting to use the combined "keepalivepoll" feature in vagus but if the
//thread isn't calling weAreAlive() often enough then the (other) instance
//information will be out-of-date.
//Set by initialize(): true when more than one host is configured and vagus
//communication is therefore needed.
static bool enabled = false;
//Socket used by weAreAlive() for sending keepalives. -1 when not connected.
static int fd_keepalive = -1;
//Pipe used by finalize() to wake up the poll thread. [0]=read end, [1]=write end.
static int fd_pipe[2] = {-1,-1};
//Shutdown request flag. It is written by initialize()/finalize() and read by
//the poll thread, so it must be atomic to avoid a data race.
static std::atomic<bool> please_shut_down{true};
//Id of the poll thread created by initialize() and joined by finalize().
static pthread_t tid;
//Cluster name sent in the "poll" and "keepalive" commands.
static char vagus_cluster_name[128] = "gb";
//Open a blocking TCP connection to Vagus on the given port.
//Only the address family and port are filled in; the rest of the socket
//address is left zeroed.
//Returns the connected socket, or -1 on failure (the error has been logged).
static int connect_to_vagus(int port) {
	const int sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
	if(sock<0) {
		log(LOG_ERROR,"vagus: socket() failed with errno=%d (%s)", errno, strerror(errno));
		return -1;
	}

	sockaddr_in addr;
	memset(&addr,0,sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_port = htons(port);

	const bool connected = connect(sock,(sockaddr*)(void*)&addr,sizeof(addr))==0;
	if(connected) {
		log(LOG_INFO, "vagus: Connected to Vagus on fd %d", sock);
		return sock;
	}

	//report first, then release the socket
	log(LOG_ERROR,"vagus: connect() failed with errno=%d (%s)", errno, strerror(errno));
	close(sock);
	return -1;
}
//Parse one numeric field of the 'extra information' string (strtol, base 0).
//Returns false if the field is missing, empty or followed by trailing garbage.
static bool parse_vagus_int_field(const char *str, int *value) {
	if(!str || !*str)
		return false;
	char *endptr = NULL;
	const long parsed = strtol(str,&endptr,0);
	if(endptr && *endptr)
		return false;
	*value = (int)parsed;
	return true;
}

//Process the instances reported alive by Vagus.
//  alive_hosts: hostid -> 'extra information' string in the form
//               version;hostsConfCRC;hostFlags;dailyMergeCollnum;repairMode;totalDocsIndexed
//Logs when the alive-set is incomplete, shuts down if our own host id has
//not been seen for more than g_conf.m_vagusMaxDeadTime minutes, updates the
//runtime information of every host whose extra information parses cleanly,
//and finally hands the list of alive host ids to g_hostdb.
static void process_alive_hosts(std::map<int,std::string> &alive_hosts) {
	static bool all_hosts_alive_last_time = false;
	const int num_hosts = g_hostdb.getNumHosts();

	if(alive_hosts.size() == (unsigned)num_hosts) {
		//only log on the transition to "all alive" to keep the log quiet
		if(!all_hosts_alive_last_time)
			log(LOG_INFO, "vagus: got %zu alive hosts ", alive_hosts.size());
		all_hosts_alive_last_time = true;
	} else {
		log(LOG_WARN, "vagus: got %zu alive hosts instead of %d", alive_hosts.size(), num_hosts);
		//one character per configured host: '+' = alive, '.' = missing
		std::string hosts(num_hosts>0 ? (size_t)num_hosts : 0, '.');
		for(const auto &iter : alive_hosts) {
			//Vagus may report host ids we don't know; never index outside the map
			if(iter.first>=0 && iter.first<num_hosts)
				hosts[iter.first] = '+';
		}
		log(LOG_WARN, "vagus: %.*s", (int)hosts.size(), hosts.c_str());
		all_hosts_alive_last_time = false;
	}

	time_t now = time(NULL);
	static time_t s_ownLastAliveTime = now;
	if ((now - s_ownLastAliveTime) > (g_conf.m_vagusMaxDeadTime * 60)) {
		log(LOG_ERROR, "vagus: We have not seen ourself alive for the past %d minutes. Aborting", g_conf.m_vagusMaxDeadTime);
		gbshutdownResourceError();
	}

	std::vector<int> alive_hosts_ids;
	alive_hosts_ids.reserve(alive_hosts.size());
	for(const auto &iter : alive_hosts) {
		const int hostid = iter.first;
		if(hostid<0 || hostid>=num_hosts)
			continue;

		if (hostid == g_hostdb.getMyHostId()) {
			s_ownLastAliveTime = now;
		}

		//the host counts as alive even if its extra information turns out to be unparsable
		alive_hosts_ids.push_back(hostid);

		//strtok_r modifies its input so work on a local copy
		char extra_information[256];
		if(iter.second.length()>=sizeof(extra_information))
			continue;
		memcpy(extra_information,iter.second.c_str(),iter.second.length()+1);

		char *ss=NULL;
		const char *gb_version_str = strtok_r(extra_information,";",&ss);
		const char *hosts_conf_crc_str = strtok_r(NULL,";",&ss);
		const char *host_flags_str = strtok_r(NULL,";",&ss);
		const char *daily_merge_collection_number_str = strtok_r(NULL,";",&ss);
		const char *repair_mode_str = strtok_r(NULL,";",&ss);
		const char *total_docs_indexed_str = strtok_r(NULL,";",&ss);

		if(!gb_version_str || gb_version_str[0]=='\0')
			continue;
		if(strlen(gb_version_str)>=sizeof(HostRuntimeInformation::m_gbVersionStr))
			continue;

		//Any of the remaining tokens is NULL when the peer sent too few
		//fields; parse_vagus_int_field() rejects that instead of crashing.
		int hosts_conf_crc;
		int host_flags;
		int daily_merge_collection_number;
		int repair_mode;
		int total_docs_indexed;
		if(!parse_vagus_int_field(hosts_conf_crc_str,&hosts_conf_crc))
			continue;
		if(!parse_vagus_int_field(host_flags_str,&host_flags))
			continue;
		if(!parse_vagus_int_field(daily_merge_collection_number_str,&daily_merge_collection_number))
			continue;
		if(!parse_vagus_int_field(repair_mode_str,&repair_mode))
			continue;
		if(!parse_vagus_int_field(total_docs_indexed_str,&total_docs_indexed))
			continue;

		//phase 1: update host fields that seem safe
		//when we get rid of PingServer entirely then this will take over
		HostRuntimeInformation hri;
		hri.m_valid = true;
		hri.m_flags = host_flags;
		hri.m_dailyMergeCollnum = daily_merge_collection_number;
		strcpy(hri.m_gbVersionStr,gb_version_str); //length checked above
		hri.m_totalDocsIndexed = total_docs_indexed;
		hri.m_hostsConfCRC = hosts_conf_crc;
		hri.m_repairMode = repair_mode;
		g_hostdb.updateHostRuntimeInformation(hostid, hri);
		//Host *h = g_hostdb.getHost(hostid);
		//h->m_pingInfo.m_repairMode = repair_mode;
	}

	//data() is well-defined for an empty vector, unlike &front()
	g_hostdb.updateAliveHosts(alive_hosts_ids.data(),alive_hosts_ids.size());
}
//Send a "poll <cluster>" command to Vagus on fd, read the complete response
//and pass the reported instances on to process_alive_hosts().
//The response is a list of "hostid:extra-information" lines terminated by an
//empty line (or a single newline when it is empty).
//Returns false on any write/read problem, in which case the caller is
//expected to drop the connection.
static bool do_vagus_poll(int fd) {
	char command[256];
	snprintf(command,sizeof(command),"poll %s\n", vagus_cluster_name);
	size_t bytes_to_write = strlen(command);
	ssize_t bytes_written = write(fd,command,bytes_to_write);
	if((size_t)bytes_written!=bytes_to_write) {
		log(LOG_ERROR,"vagus: write(fd=%d) returned %zd, expected %zu, errno=%d", fd, bytes_written,bytes_to_write,errno);
		return false;
	}

	IOBuffer in_buf;
	for(;;) {
		in_buf.reserve_extra(65536);
		ssize_t bytes_read = read(fd, in_buf.end(), in_buf.spare());
		if(bytes_read<0) {
			log(LOG_ERROR,"vagus: read(fd=%d) from vagus failed with errno=%d (%s)",fd,errno,strerror(errno));
			return false;
		}
		if(bytes_read==0) {
			log(LOG_ERROR,"vagus: read(fd=%d) from vagus returned 0",fd);
			return false;
		}
		in_buf.push_back((size_t)bytes_read);
		//a blank line terminates the response
		if(in_buf.used()>=2 && in_buf.end()[-2]=='\n' && in_buf.end()[-1]=='\n')
			break;
		//an empty response consists of the blank line only
		if(in_buf.used()==1 && in_buf.end()[-1]=='\n')
			break;
	}

	//nul-terminate
	in_buf.reserve_extra(1);
	*(in_buf.end()) = '\0';

	std::map<int,std::string> alive_hosts;
	for(char *p = in_buf.begin(); p!=in_buf.end(); ) {
		char *nl = strchr(p,'\n');
		if(!nl) break;
		*nl='\0';
		char *line = p;
		//Advance before parsing so that a malformed line is skipped. Leaving p
		//unchanged would end the loop and drop every host listed after it.
		p = nl+1;

		char *ss=NULL;
		char *hostid_str = strtok_r(line,":",&ss);
		char *extra_information = strtok_r(NULL,"",&ss);
		if(!hostid_str || !extra_information)
			continue;

		char *endptr = NULL;
		int hostid = (int)strtol(hostid_str,&endptr,0);
		if((endptr==0 || *endptr=='\0') && hostid>=0) {
			alive_hosts[hostid] = extra_information;
		}
	}

	process_alive_hosts(alive_hosts);
	return true;
}
//Thread entry point: polls Vagus for instance information every
//g_conf.m_vagusKeepaliveSendInterval milliseconds until please_shut_down is set.
//Between polls it sleeps in poll() on two descriptors:
//  pfd[0]: read end of fd_pipe - written by finalize() to wake us up
//  pfd[1]: the Vagus connection - to detect unexpected input or a lost connection
//A broken Vagus connection is closed and re-established on the next round.
static void *poll_thread(void *) {
	pthread_setname_np(pthread_self(),"vaguspoll");

	struct pollfd pfd[2];
	memset(pfd,0,sizeof(pfd));
	pfd[0].fd = fd_pipe[0];
	pfd[0].events = POLLIN;
	pfd[1].fd = connect_to_vagus(g_conf.m_vagusPort);
	pfd[1].events = POLLIN;

	uint64_t next_poll_ms = 0; //poll toward vagus - not poll() syscall
	while(!please_shut_down) {
		//Start from a clean slate so a skipped or failed poll() cannot leave
		//stale events around that would close a freshly made connection.
		pfd[0].revents = 0;
		pfd[1].revents = 0;

		uint64_t now_ms = getCurrentTimeNanoseconds()/1000000;
		if(next_poll_ms > now_ms)
			(void)poll(pfd,2,next_poll_ms-now_ms);
		if(please_shut_down)
			break;

		if(pfd[1].revents&(POLLIN|POLLHUP|POLLERR)) {
			//unexpected input or lost connection.
			(void)::close(pfd[1].fd);
			pfd[1].fd = -1;
			log(LOG_INFO,"vagus: lost connection to Vagus");
		}
		if(pfd[1].fd<0)
			pfd[1].fd = connect_to_vagus(g_conf.m_vagusPort);

		next_poll_ms = getCurrentTimeNanoseconds()/1000000 + g_conf.m_vagusKeepaliveSendInterval;

		if(pfd[1].fd>=0) {
			if(!do_vagus_poll(pfd[1].fd)) {
				(void)close(pfd[1].fd);
				pfd[1].fd = -1;
			}
		}
	}

	if(pfd[1].fd>=0)
		(void)::close(pfd[1].fd);
	return 0;
}
//Set up the information exchange with the local Vagus instance.
//With a single configured host nothing is needed and the exchange stays
//disabled. Otherwise the cluster name is determined (g_conf.m_vagusClusterId,
//or "gb-<username>" when that is empty), the wake-up pipe and the poll thread
//are created and the keepalive connection is opened.
//Returns false on failure. The exchange is then left disabled so that
//finalize() and weAreAlive() become no-ops.
bool InstanceInfoExchange::initialize() {
	enabled = g_hostdb.getNumHosts()>1;
	if(!enabled) {
		log(LOG_INFO,"vagus: only 1 host configured. No need for vagus communication");
		return true;
	}

	//set vagus_cluster_name
	if(g_conf.m_vagusClusterId[0]) {
		if(strlen(g_conf.m_vagusClusterId)>=sizeof(vagus_cluster_name)) {
			log(LOG_ERROR,"vagus: configured cluster id is too long (max %zu characters)", sizeof(vagus_cluster_name)-1);
			enabled = false;
			return false;
		}
		strcpy(vagus_cluster_name, g_conf.m_vagusClusterId); //length checked above
	} else {
		//form one based on the username
		char buf[4096];
		struct passwd pwd, *pwdptr = NULL;
		//getpwuid_r() reports errors through its return value (not errno) and
		//returns 0 with a NULL result when the uid has no passwd entry.
		int pwrc = getpwuid_r(geteuid(), &pwd,buf,sizeof(buf),&pwdptr);
		if(pwrc!=0) {
			log(LOG_ERROR,"getpwuid(geteuid()...) failed with errno=%d (%s)", pwrc,strerror(pwrc));
			enabled = false;
			return false;
		}
		if(!pwdptr || !pwd.pw_name) {
			log(LOG_ERROR,"getpwuid(geteuid()...) found no passwd entry for uid %u", (unsigned)geteuid());
			enabled = false;
			return false;
		}
		int len = snprintf(vagus_cluster_name, sizeof(vagus_cluster_name), "gb-%s", pwd.pw_name);
		if(len<0 || (size_t)len>=sizeof(vagus_cluster_name)) {
			log(LOG_ERROR,"vagus: username is too long for forming a cluster id");
			enabled = false;
			return false;
		}
		log(LOG_INFO,"Using vagus cluster id '%s'",vagus_cluster_name);
	}

	if(pipe(fd_pipe)!=0) {
		log(LOG_ERROR,"pipe() failed with errno=%d (%s)", errno,strerror(errno));
		fd_pipe[0] = fd_pipe[1] = -1;
		enabled = false;
		return false;
	}

	//must be cleared before the thread starts looking at it
	please_shut_down = false;
	int rc = pthread_create(&tid,NULL,poll_thread,NULL);
	if(rc!=0) {
		log(LOG_ERROR,"pthread_create() failed with rc=%d (%s)",rc,strerror(rc));
		//no thread to join later, so release everything and stay disabled
		please_shut_down = true;
		close(fd_pipe[0]); fd_pipe[0]=-1;
		close(fd_pipe[1]); fd_pipe[1]=-1;
		enabled = false;
		return false;
	}

	//A failed connect is not fatal here: weAreAlive() retries.
	fd_keepalive = connect_to_vagus(g_conf.m_vagusPort);
	return true;
}
void InstanceInfoExchange::finalize() {
if(!enabled)
return;
please_shut_down = true;
char dummy='d';
(void)write(fd_pipe[1],&dummy,1);
pthread_join(tid,NULL);
close(fd_pipe[0]); fd_pipe[0]=-1;
close(fd_pipe[1]); fd_pipe[1]=-1;
close(fd_keepalive); fd_keepalive=-1;
}
void InstanceInfoExchange::weAreAlive() {
if(!enabled)
return;
if(fd_keepalive<0)
fd_keepalive = connect_to_vagus(g_conf.m_vagusPort);
if(fd_keepalive<0)
return;
collnum_t daily_merge_collection_number = -1;
if(g_dailyMerge.m_cr)
daily_merge_collection_number = g_dailyMerge.m_cr->m_collnum;
char extra_information[256];
sprintf(extra_information, "%s;%d;%d;%d;%d;%lu",
getVersion(),
g_hostdb.getCRC(),
getOurHostFlags(),
daily_merge_collection_number,
g_repairMode,
g_process.getTotalDocsIndexed());
char command[256];
sprintf(command, "keepalive %s:%d:%u:%s\n",
vagus_cluster_name,
g_hostdb.getMyHostId(),
g_conf.m_vagusKeepaliveLifetime,
extra_information);
//logDebug(g_conf.m_logDebugVagus, "vagus: command='%s'",command);
size_t bytes_to_write = strlen(command);
ssize_t bytes_written = ::write(fd_keepalive, command, bytes_to_write);
if((size_t)bytes_written != bytes_to_write) {
log(LOG_ERROR,"vagus: write error, wrote %zd bytes, expected %zu, errno=%d", bytes_written, bytes_to_write, errno);
::close(fd_keepalive); fd_keepalive = -1;
return;
}
logDebug(g_conf.m_logDebugVagus, "vagus: sent keepalive to Vagus");
char ignored_response[10];
(void)read(fd_keepalive,ignored_response,sizeof(ignored_response));
}