#include "InstanceInfoExchange.h"
#include "Hostdb.h"
#include "GbMutex.h"
#include "ScopedLock.h"
#include "Log.h"
#include "GbUtil.h"
#include "Hostdb.h"
#include "Conf.h"
#include "DailyMerge.h"
#include "Collectiondb.h"
#include "Version.h"
#include "repair_mode.h"
#include "HostFlags.h"
#include "Process.h"
#include "IOBuffer.h"
#include <pthread.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <poll.h>
#include <map>
#include <string>
#include <vector>
#include <pwd.h>

//We use Vagus for the information exchange.
//Previously, PingInfo and PingServer were used but that scaled poorly to
//hundreds of instances because it would take quite a while for the
//information to be fully propagated.

//The vagus cluster name is configurable (defaulting to gb-$USER)
//The 'extra information' piece is used for piggybacking:
//  gbVersionStr
//  hosts.conf CRC
//  host flags
//  dailyMergeCollnum
//  repairMode
//  totalDocsIndexed

//We use two sockets toward the local Vagus instance. One for sending
//keepalives and one for regularly polling for instance information. It is
//tempting to use the combined "keepalivepoll" feature in vagus but if the
//thread isn't calling weAreAlive() often enough then the (other) instance
//information will be out-of-date.

static bool enabled = false; static int fd_keepalive = -1; static int fd_pipe[2] = {-1,-1}; static bool please_shut_down=true; static pthread_t tid; static char vagus_cluster_name[128] = "gb"; //Connect to Vagus (blocking) static int connect_to_vagus(int port) { int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if(fd<0) { log(LOG_ERROR,"vagus: socket() failed with errno=%d (%s)", errno, strerror(errno)); return -1; } sockaddr_in sin; memset(&sin,0,sizeof(sin)); sin.sin_family = AF_INET; sin.sin_port = htons(port); if(connect(fd,(sockaddr*)(void*)&sin,sizeof(sin))!=0) { log(LOG_ERROR,"vagus: connect() failed with errno=%d (%s)", errno, strerror(errno)); close(fd); return -1; } log(LOG_INFO, "vagus: Connected to Vagus on fd %d", fd); return fd; } static void process_alive_hosts(std::map<int,std::string> &alive_hosts) { if(alive_hosts.size() != (unsigned)g_hostdb.getNumHosts()) { log(LOG_WARN, "vagus: got %zu alive hosts instead of %d", alive_hosts.size(), g_hostdb.getNumHosts()); char hosts[g_hostdb.getNumHosts()]; memset(hosts, '.', sizeof(hosts)); for (const auto &iter : alive_hosts) { hosts[iter.first] = '+'; } log(LOG_WARN, "vagus: %.*s", g_hostdb.getNumHosts(), hosts); } time_t now = time(NULL); static time_t s_ownLastAliveTime = now; if ((now - s_ownLastAliveTime) > (g_conf.m_vagusMaxDeadTime * 60)) { log(LOG_ERROR, "vagus: We have not seen ourself alive for the past %d minutes. 
Aborting", g_conf.m_vagusMaxDeadTime); gbshutdownResourceError(); } std::vector<int> alive_hosts_ids; alive_hosts_ids.reserve(alive_hosts.size()); for(auto iter : alive_hosts) { int hostid = iter.first; if(hostid<0 || hostid>=g_hostdb.getNumHosts()) continue; if (hostid == g_hostdb.getMyHostId()) { s_ownLastAliveTime = now; } alive_hosts_ids.push_back(hostid); char extra_information[256]; if(iter.second.length()>=sizeof(extra_information)) continue; strcpy(extra_information,iter.second.c_str()); char *ss=NULL; const char *gb_version_str = strtok_r(extra_information,";",&ss); const char *hosts_conf_crc_str = strtok_r(NULL,";",&ss); const char *host_flags_str = strtok_r(NULL,";",&ss); const char *daily_merge_collection_number_str = strtok_r(NULL,";",&ss); const char *repair_mode_str = strtok_r(NULL,";",&ss); const char *total_docs_indexed_str = strtok_r(NULL,";",&ss); if(!gb_version_str || gb_version_str[0]=='\0') continue; if(strlen(gb_version_str)>=sizeof(HostRuntimeInformation::m_gbVersionStr)) continue; char *endptr; int hosts_conf_crc = (int)strtol(hosts_conf_crc_str,&endptr,0); if(endptr && *endptr) continue; int host_flags = (int)strtol(host_flags_str,&endptr,0); if(endptr && *endptr) continue; int daily_merge_collection_number = (int)strtol(daily_merge_collection_number_str,&endptr,0); if(endptr && *endptr) continue; int repair_mode = (int)strtol(repair_mode_str,&endptr,0); if(endptr && *endptr) continue; int total_docs_indexed = (int)strtol(total_docs_indexed_str,&endptr,0); if(endptr && *endptr) continue; //phase 1: update host fields that seem safe //when we get rid of PingServer entirely then this will take over HostRuntimeInformation hri; hri.m_valid = true; hri.m_flags = host_flags; hri.m_dailyMergeCollnum = daily_merge_collection_number; strcpy(hri.m_gbVersionStr,gb_version_str); hri.m_totalDocsIndexed = total_docs_indexed; hri.m_hostsConfCRC = hosts_conf_crc; hri.m_repairMode = repair_mode; g_hostdb.updateHostRuntimeInformation(hostid, hri); //Host *h 
= g_hostdb.getHost(hostid); //h->m_pingInfo.m_repairMode = repair_mode; } g_hostdb.updateAliveHosts(&alive_hosts_ids.front(),alive_hosts_ids.size()); } static bool do_vagus_poll(int fd) { char command[256]; sprintf(command,"poll %s\n", vagus_cluster_name); size_t bytes_to_write = strlen(command); ssize_t bytes_written = write(fd,command,bytes_to_write); if((size_t)bytes_written!=bytes_to_write) { log(LOG_ERROR,"vagus: write(fd=%d) returned %zd, expected %zu, errno=%d", fd, bytes_written,bytes_to_write,errno); return false; } IOBuffer in_buf; for(;;) { in_buf.reserve_extra(65536); ssize_t bytes_read = read(fd, in_buf.end(), in_buf.spare()); if(bytes_read<0) { log(LOG_ERROR,"vagus: read(fd=%d) from vagus failed with errno=%d (%s)",fd,errno,strerror(errno)); return false; } if(bytes_read==0) { log(LOG_ERROR,"vagus: read(fd=%d) from vagus returned 0",fd); return false; } in_buf.push_back((size_t)bytes_read); if(in_buf.used()>=2 && in_buf.end()[-2]=='\n' && in_buf.end()[-1]=='\n') break; if(in_buf.used()==1 && in_buf.end()[-1]=='\n') break; } //nul-terminate in_buf.reserve_extra(1); *(in_buf.end()) = '\0'; std::map<int,std::string> alive_hosts; for(char *p = in_buf.begin(); p!=in_buf.end(); ) { char *nl = strchr(p,'\n'); if(!nl) break; *nl='\0'; char *ss=NULL; char *hostid_str = strtok_r(p,":",&ss); char *extra_information = strtok_r(NULL,"",&ss); if(!hostid_str || !extra_information) continue; char *endptr = NULL; int hostid = (int)strtol(hostid_str,&endptr,0); if((endptr==0 || *endptr=='\0') && hostid>=0 && extra_information) { alive_hosts[hostid] = extra_information; } p = nl+1; } process_alive_hosts(alive_hosts); return true; } static void *poll_thread(void *) { pthread_setname_np(pthread_self(),"vaguspoll"); struct pollfd pfd[2]; memset(pfd,0,sizeof(pfd)); pfd[0].fd = fd_pipe[0]; pfd[0].events = POLLIN; pfd[1].fd = connect_to_vagus(g_conf.m_vagusPort); pfd[0].events = POLLIN; uint64_t next_poll_ms = 0; //poll toward vagus - not poll() syscall 
while(!please_shut_down) { uint64_t now_ms = getCurrentTimeNanoseconds()/1000000; if(next_poll_ms > now_ms) (void)poll(pfd,2,next_poll_ms-now_ms); if(please_shut_down) break; if(pfd[1].revents&(POLLIN|POLLHUP)) { //unexpected input or lost connection. (void)::close(pfd[1].fd); pfd[1].fd = -1; log(LOG_INFO,"vagus: lost connection to Vagus"); } if(pfd[1].fd<0) pfd[1].fd = connect_to_vagus(g_conf.m_vagusPort); next_poll_ms = getCurrentTimeNanoseconds()/1000000 + g_conf.m_vagusKeepaliveSendInterval; if(pfd[1].fd>=0) { if(!do_vagus_poll(pfd[1].fd)) { (void)close(pfd[1].fd); pfd[1].fd = -1; } } } if(pfd[1].fd>=0) (void)::close(pfd[1].fd); return 0; } bool InstanceInfoExchange::initialize() { enabled = g_hostdb.getNumHosts()>1; if(!enabled) { log(LOG_INFO,"vagus: only 1 host configured. No need for vagus communication"); return true; } please_shut_down = false; //set vagus_cluster_name if(g_conf.m_vagusClusterId[0]) strcpy(vagus_cluster_name, g_conf.m_vagusClusterId); else { //form one based on the username char buf[256]; struct passwd pwd, *pwdptr; if(getpwuid_r(geteuid(), &pwd,buf,sizeof(buf),&pwdptr)==0) { sprintf(vagus_cluster_name, "gb-%s", pwd.pw_name); log(LOG_INFO,"Using vagus cluster id '%s'",vagus_cluster_name); } else { log(LOG_ERROR,"getpwuid(geteuid()...) 
failed with errno=%d (%s)", errno,strerror(errno)); return false; } } if(pipe(fd_pipe)!=0) { log(LOG_ERROR,"pipe() failed with errno=%d (%s)", errno,strerror(errno)); return false; } int rc = pthread_create(&tid,NULL,poll_thread,NULL); if(rc!=0) { log(LOG_ERROR,"pthread_create() failed with rc=%d (%s)",rc,strerror(rc)); return false; } fd_keepalive = connect_to_vagus(g_conf.m_vagusPort); return true; } void InstanceInfoExchange::finalize() { if(!enabled) return; please_shut_down = true; char dummy='d'; (void)write(fd_pipe[1],&dummy,1); pthread_join(tid,NULL); close(fd_pipe[0]); fd_pipe[0]=-1; close(fd_pipe[1]); fd_pipe[1]=-1; close(fd_keepalive); fd_keepalive=-1; } void InstanceInfoExchange::weAreAlive() { if(!enabled) return; if(fd_keepalive<0) fd_keepalive = connect_to_vagus(g_conf.m_vagusPort); if(fd_keepalive<0) return; collnum_t daily_merge_collection_number = -1; if(g_dailyMerge.m_cr) daily_merge_collection_number = g_dailyMerge.m_cr->m_collnum; char extra_information[256]; sprintf(extra_information, "%s;%d;%d;%d;%d;%lu", getVersion(), g_hostdb.getCRC(), getOurHostFlags(), daily_merge_collection_number, g_repairMode, g_process.getTotalDocsIndexed()); char command[256]; sprintf(command, "keepalive %s:%d:%u:%s\n", vagus_cluster_name, g_hostdb.getMyHostId(), g_conf.m_vagusKeepaliveLifetime, extra_information); //logDebug(g_conf.m_logDebugVagus, "vagus: command='%s'",command); size_t bytes_to_write = strlen(command); ssize_t bytes_written = ::write(fd_keepalive, command, bytes_to_write); if((size_t)bytes_written != bytes_to_write) { log(LOG_ERROR,"vagus: write error, wrote %zd bytes, expected %zu, errno=%d", bytes_written, bytes_to_write, errno); ::close(fd_keepalive); fd_keepalive = -1; return; } logDebug(g_conf.m_logDebugVagus, "vagus: sent keepalive to Vagus"); char ignored_response[10]; (void)read(fd_keepalive,ignored_response,sizeof(ignored_response)); }