464 lines
13 KiB
C++
464 lines
13 KiB
C++
#include "Msg4In.h"
|
|
#include "Parms.h"
|
|
|
|
#include "UdpServer.h"
|
|
#include "Hostdb.h"
|
|
#include "Conf.h"
|
|
#include "UdpSlot.h"
|
|
#include "Rdb.h"
|
|
#include "Repair.h"
|
|
#include "JobScheduler.h"
|
|
#include "ip.h"
|
|
#include "Mem.h"
|
|
#include "Titledb.h" // for Titledb::validateSerializedRecord
|
|
#include "SpiderdbRdbSqliteBridge.h"
|
|
#include "SiteMedianPageTemperatureRegistry.h"
|
|
#include "Errno.h"
|
|
#include "Log.h"
|
|
#include "fctypes.h"
|
|
#include <sys/stat.h> //stat()
|
|
#include <fcntl.h>
|
|
#include <cerrno>
|
|
|
|
#ifdef _VALGRIND_
|
|
#include <valgrind/memcheck.h>
|
|
#endif
|
|
|
|
// . call this once for every Msg14 so it can add all at once...
|
|
// . make Msg14 add the links before anything else since that uses Msg10
|
|
// . also, need to update spiderdb rec for the url in Msg14 using Msg4 too!
|
|
// . need to add support for passing in array of lists for Msg14
|
|
|
|
namespace Msg4In {
|
|
static bool addMetaList(const char *p, class UdpSlot *slot = NULL);
|
|
static void handleRequest4(UdpSlot *slot, int32_t niceness);
|
|
static void processMsg4(void *item);
|
|
|
|
static GbThreadQueue s_incomingThreadQueue;
|
|
}
|
|
|
|
// all these parameters should be preset
|
|
bool Msg4In::registerHandler() {
|
|
logTrace( g_conf.m_logTraceMsg4In, "BEGIN" );
|
|
|
|
// register ourselves with the udp server
|
|
if ( ! g_udpServer.registerHandler ( msg_type_4, handleRequest4 ) ) {
|
|
log(LOG_ERROR,"%s:%s: Could not register with UDP server!", __FILE__, __func__ );
|
|
return false;
|
|
}
|
|
|
|
logTrace( g_conf.m_logTraceMsg4In, "END - returning true");
|
|
|
|
return true;
|
|
}
|
|
|
|
bool Msg4In::initializeIncomingThread() {
|
|
return s_incomingThreadQueue.initialize(processMsg4, "process-msg4");
|
|
}
|
|
|
|
// Counterpart of initializeIncomingThread(): finalizes the incoming request
// queue.
void Msg4In::finalizeIncomingThread()
{
	s_incomingThreadQueue.finalize();
}
|
|
|
|
// . destroys the slot if false is returned
|
|
// . this is registered in Msg4::set() to handle add rdb record msgs
|
|
// . seems like we should always send back a reply so we don't leave the
|
|
// requester's slot hanging, unless he can kill it after transmit success???
|
|
// . TODO: need we send a reply back on success????
|
|
// . NOTE: Must always call g_udpServer::sendReply or sendErrorReply() so
|
|
// read/send bufs can be freed
|
|
static void Msg4In::processMsg4(void *item) {
|
|
UdpSlot *slot = static_cast<UdpSlot*>(item);
|
|
|
|
logTrace( g_conf.m_logTraceMsg4In, "BEGIN" );
|
|
|
|
// extract what we read
|
|
char *readBuf = slot->m_readBuf;
|
|
|
|
// this returns false with g_errno set on error
|
|
if (!addMetaList(readBuf, slot)) {
|
|
logError("calling sendErrorReply error='%s'", mstrerror(g_errno));
|
|
g_udpServer.sendErrorReply(slot,g_errno);
|
|
|
|
logTrace(g_conf.m_logTraceMsg4In, "END - addMetaList returned false. g_errno=%d", g_errno);
|
|
return;
|
|
}
|
|
|
|
// good to go
|
|
g_udpServer.sendReply(NULL, 0, NULL, 0, slot);
|
|
|
|
logTrace(g_conf.m_logTraceMsg4In, "END - OK");
|
|
}
|
|
|
|
// UDP handler for msg_type_4 requests.
//
// Checks that this host is ready to accept data and that the request is
// well-formed, then queues the slot on s_incomingThreadQueue for
// processMsg4(). Every rejection sets g_errno and answers the sender with
// sendErrorReply():
//   - EBADHOSTSCONF            our hosts.conf is in disagreement, or the
//                              sender's hosts.conf CRC differs from ours
//   - EWAITINGTOSYNCHOSTSCONF  the sender's hosts.conf CRC is not known yet
//   - EREQUESTTOOSHORT         the request is shorter than its fixed header
//   - ETRYAGAIN                we are not host #0 and have not synced our
//                              parms with host #0 yet
// A request whose embedded size field differs from the number of bytes
// received is treated as corruption and aborts the process.
//
// The second parameter (niceness) is unused.
static void Msg4In::handleRequest4(UdpSlot *slot, int32_t /*netnice*/) {
	// Every request starts with a 4-byte total size followed by an 8-byte
	// zid; addMetaList() skips both unconditionally (and reads the zid for
	// logging), so anything shorter cannot be parsed safely.
	static const int32_t s_minRequestSize = static_cast<int32_t>(sizeof(int32_t) + sizeof(int64_t));

	// if we just came up we need to make sure our hosts.conf is in
	// sync with everyone else before accepting this! it might have
	// been the case that the sender thinks our hosts.conf is the same
	// since last time we were up, so it is up to us to check this
	if ( g_hostdb.hostsConfInDisagreement() ) {
		g_errno = EBADHOSTSCONF;
		logError("call sendErrorReply");
		g_udpServer.sendErrorReply ( slot , g_errno );

		log(LOG_WARN,"%s:%s: END - hostsConfInDisagreement", __FILE__, __func__ );
		return;
	}

	// need to be in sync first
	if ( ! g_hostdb.hostsConfInAgreement() ) {
		// . if we do not know the sender's hosts.conf crc, wait 4 it
		// . this is 0 if not received yet
		if (!slot->m_host->isHostsConfCRCKnown()) {
			g_errno = EWAITINGTOSYNCHOSTSCONF;
			logError("call sendErrorReply");
			g_udpServer.sendErrorReply ( slot , g_errno );

			log(LOG_WARN,"%s:%s: END - EWAITINGTOSYNCHOSTCONF", __FILE__, __func__ );
			return;
		}

		// compare our hosts.conf to sender's otherwise
		if (!slot->m_host->hasSameHostsConfCRC()) {
			g_errno = EBADHOSTSCONF;
			logError("call sendErrorReply");
			g_udpServer.sendErrorReply ( slot , g_errno );

			log(LOG_WARN,"%s:%s: END - EBADHOSTSCONF", __FILE__, __func__ );
			return;
		}
	}

	// extract what we read
	char *readBuf = slot->m_readBuf;
	int32_t readBufSize = slot->m_readBufSize;

	// must at least contain the complete fixed header (size + zid)
	if (readBufSize < s_minRequestSize) {
		g_errno = EREQUESTTOOSHORT;
		logError("call sendErrorReply");
		g_udpServer.sendErrorReply ( slot , g_errno );

		log(LOG_ERROR,"%s:%s: END - EREQUESTTOOSHORT", __FILE__, __func__ );
		return;
	}

	// get total buf used
	int32_t used = *(int32_t *)readBuf;

	// sanity check: the size the sender wrote must match what we received
	if ( used != readBufSize ) {
		logError("msg4: got corrupted request from hostid %" PRId32" used [%" PRId32"] != readBufSize [%" PRId32"]. tid=%" PRId32 "",
		         slot->m_host->m_hostId, used, readBufSize, slot->getTransId());
		loghex(LOG_ERROR, readBuf, (readBufSize < 160 ? readBufSize : 160), "readBuf (first max. 160 bytes)");

		gbshutdownAbort(true);
	}

	// if we did not sync our parms up yet with host 0, wait...
	if ( g_hostdb.m_myHostId != 0 && ! g_parms.inSyncWithHost0() ) {
		// limit logging to once per second
		static int32_t s_lastTime = 0;
		int32_t now = getTime();
		if ( now - s_lastTime >= 1 ) {
			s_lastTime = now;
			log(LOG_INFO, "msg4: waiting to sync with host #0 before accepting data");
		}

		// tell send to try again shortly
		g_errno = ETRYAGAIN;
		logError("call sendErrorReply");
		g_udpServer.sendErrorReply(slot,g_errno);

		logTrace( g_conf.m_logTraceMsg4In, "END - ETRYAGAIN. Waiting to sync with host #0" );
		return;
	}

	// all checks passed: hand the slot over to processMsg4() via the queue
	s_incomingThreadQueue.addItem(slot);
}
|
|
|
|
struct RdbItem {
|
|
RdbItem(collnum_t collNum, const char *rec, int32_t recSize)
|
|
: m_collNum(collNum)
|
|
, m_rec(rec)
|
|
, m_recSize(recSize) {
|
|
}
|
|
|
|
collnum_t m_collNum;
|
|
const char *m_rec;
|
|
int32_t m_recSize;
|
|
};
|
|
|
|
struct RdbItems {
|
|
RdbItems()
|
|
: m_numRecs(0)
|
|
, m_dataSizes(0)
|
|
, m_items() {
|
|
}
|
|
|
|
int32_t m_numRecs;
|
|
int32_t m_dataSizes;
|
|
std::vector<RdbItem> m_items;
|
|
};
|
|
|
|
// . Syncdb.cpp will call this after it has received checkoff keys from
|
|
// all the alive hosts for this zid/sid
|
|
// . returns false and sets g_errno on error, returns true otherwise
|
|
static bool Msg4In::addMetaList(const char *p, UdpSlot *slot) {
|
|
logDebug(g_conf.m_logDebugSpider, "syncdb: calling addMetalist zid=%" PRIu64, *(int64_t *) (p + 4));
|
|
|
|
// get total buf used
|
|
int32_t used = *(int32_t *)p;
|
|
// the end
|
|
const char *pend = p + used;
|
|
// skip the used amount
|
|
p += 4;
|
|
// skip zid
|
|
p += 8;
|
|
|
|
Rdb *rdb = NULL;
|
|
char lastRdbId = -1;
|
|
|
|
/// @note we can have multiple meta list here
|
|
|
|
// check if we have enough room for the whole request
|
|
// note: we also use this variable for keeping track of which rdbs we have touch and may need an integrity check
|
|
std::map<rdbid_t, RdbItems> rdbItems;
|
|
|
|
while (p < pend) {
|
|
collnum_t collnum = *(collnum_t *)p;
|
|
p += sizeof(collnum_t);
|
|
|
|
rdbid_t rdbId = static_cast<rdbid_t>(*(char *)p);
|
|
p += 1;
|
|
|
|
int32_t recSize = *(int32_t *)p;
|
|
p += 4;
|
|
|
|
const char *rec = p;
|
|
|
|
// recSize can't go over pend
|
|
if(p + recSize > pend)
|
|
gbshutdownAbort(true);
|
|
|
|
// Sanity. Shut down if data sizes are wrong.
|
|
if( rdbId == RDB_TITLEDB ) {
|
|
Titledb::validateSerializedRecord( rec, recSize );
|
|
}
|
|
|
|
switch(rdbId) {
|
|
case RDB_SPIDERDB_DEPRECATED:
|
|
case RDB2_SPIDERDB2_DEPRECATED: {
|
|
//spiderdb records no longer reside in an Rdb
|
|
|
|
// don't add to spiderdb when we're nospider host
|
|
if(g_hostdb.getMyHost()->m_spiderEnabled) {
|
|
auto &rdbItem = rdbItems[rdbId];
|
|
++rdbItem.m_numRecs;
|
|
|
|
int32_t dataSize = recSize - sizeof(key128_t) - 4;
|
|
|
|
rdbItem.m_dataSizes += dataSize;
|
|
|
|
rdbItem.m_items.emplace_back(collnum, rec, recSize);
|
|
}
|
|
break;
|
|
}
|
|
case RDB_SITEDEFAULTPAGETEMPERATURE: {
|
|
//not an Rdb
|
|
//only used in queries, so if we are a noquery host then just drop it
|
|
if(g_hostdb.getMyHost()->m_queryEnabled) {
|
|
auto &rdbItem = rdbItems[rdbId];
|
|
++rdbItem.m_numRecs;
|
|
|
|
int32_t dataSize = 8+4+4;
|
|
|
|
rdbItem.m_dataSizes += dataSize;
|
|
|
|
rdbItem.m_items.emplace_back(collnum, rec, recSize);
|
|
}
|
|
break;
|
|
|
|
}
|
|
default: {
|
|
// . get the rdb to which it belongs, use Msg0::getRdb()
|
|
// . do not call this for every rec if we do not have to
|
|
if (rdbId != lastRdbId || !rdb) {
|
|
rdb = getRdbFromId(rdbId);
|
|
|
|
if (!rdb) {
|
|
char ipbuf[16];
|
|
log(LOG_WARN, "msg4: rdbId of %" PRId32" unrecognized from hostip=%s. dropping WHOLE request",
|
|
(int32_t)rdbId, slot ? iptoa(slot->getIp(),ipbuf) : "unknown");
|
|
gbshutdownAbort(true);
|
|
}
|
|
|
|
// an uninitialized secondary rdb?
|
|
// don't core any more, we probably restarted this shard
|
|
// and it needs to wait for host #0 to syncs its
|
|
// g_conf.m_repairingEnabled to '1' so it can start its
|
|
// Repair.cpp repairWrapper() loop and init the secondary
|
|
// rdbs so "rdb" here won't be NULL any more.
|
|
if (!rdb->isInitialized()) {
|
|
time_t currentTime = getTime();
|
|
static time_t s_lastTime = 0;
|
|
if (currentTime > s_lastTime + 10) {
|
|
s_lastTime = currentTime;
|
|
log(LOG_WARN, "msg4: oops. got an rdbId key for a secondary "
|
|
"rdb and not in repair mode. waiting to be in repair mode.");
|
|
}
|
|
g_errno = ETRYAGAIN;
|
|
return false;
|
|
}
|
|
}
|
|
|
|
// if we don't have data, recSize must be the same with keySize
|
|
if (rdb->getFixedDataSize() == 0 && recSize != rdb->getKeySize()) {
|
|
gbshutdownAbort(true);
|
|
}
|
|
|
|
auto &rdbItem = rdbItems[rdbId];
|
|
++rdbItem.m_numRecs;
|
|
|
|
int32_t dataSize = recSize - rdb->getKeySize();
|
|
if (rdb->getFixedDataSize() == -1) {
|
|
dataSize -= 4;
|
|
}
|
|
|
|
rdbItem.m_dataSizes += dataSize;
|
|
|
|
rdbItem.m_items.emplace_back(collnum, rec, recSize);
|
|
break;
|
|
}
|
|
}
|
|
|
|
// advance over the rec data to point to next entry
|
|
p += recSize;
|
|
}
|
|
|
|
bool hasRoom = true;
|
|
bool anyDumping = false;
|
|
for (auto const &rdbItem : rdbItems) {
|
|
if(rdbItem.first!=RDB_SPIDERDB_DEPRECATED && rdbItem.first!=RDB2_SPIDERDB2_DEPRECATED && rdbItem.first!=RDB_SITEDEFAULTPAGETEMPERATURE) {
|
|
Rdb *rdb = getRdbFromId(rdbItem.first);
|
|
if (rdb->isDumping()) {
|
|
anyDumping = true;
|
|
} else if (!rdb->hasRoom(rdbItem.second.m_numRecs, rdbItem.second.m_dataSizes)) {
|
|
rdb->submitRdbDumpJob(true);
|
|
hasRoom = false;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!hasRoom) {
|
|
logDebug(g_conf.m_logDebugSpider, "One or more target Rdbs don't have room currently. Returning try-again for this Msg4");
|
|
g_errno = ETRYAGAIN;
|
|
return false;
|
|
}
|
|
|
|
if (anyDumping) {
|
|
logDebug(g_conf.m_logDebugSpider, "One or more target Rdbs is dumping. Returning try-again for this Msg4");
|
|
g_errno = ETRYAGAIN;
|
|
return false;
|
|
}
|
|
|
|
|
|
for (auto const &rdbItem : rdbItems) {
|
|
switch(rdbItem.first) {
|
|
case RDB_SPIDERDB_DEPRECATED:
|
|
case RDB2_SPIDERDB2_DEPRECATED: {
|
|
//transform record list into something the sqlite bridge understands
|
|
std::vector<SpiderdbRdbSqliteBridge::BatchedRecord> v;
|
|
for(auto const &item : rdbItem.second.m_items)
|
|
v.emplace_back(item.m_collNum, item.m_rec, item.m_recSize);
|
|
//then insert all at once
|
|
bool status = rdbItem.first==RDB_SPIDERDB_DEPRECATED ? SpiderdbRdbSqliteBridge::addRecords(v) : SpiderdbRdbSqliteBridge::addRecords2(v);
|
|
if(!status)
|
|
goto break_out_of_for;
|
|
break;
|
|
}
|
|
case RDB_SITEDEFAULTPAGETEMPERATURE: {
|
|
for(auto const &item : rdbItem.second.m_items) {
|
|
//uint64_t docId = (*(uint64_t*)(item.m_rec+0))>>1;
|
|
uint32_t sitehash32 = *(uint32_t*)(item.m_rec+8);
|
|
unsigned sdpt = *(unsigned*)(item.m_rec+12);
|
|
g_smptr.add(sitehash32,sdpt);
|
|
}
|
|
break;
|
|
}
|
|
default: {
|
|
Rdb *rdb = getRdbFromId(rdbItem.first);
|
|
|
|
for (auto const &item : rdbItem.second.m_items) {
|
|
// reset g_errno
|
|
g_errno = 0;
|
|
|
|
// . make a list from this data
|
|
// . skip over the first 4 bytes which is the rdbId
|
|
// . TODO: embed the rdbId in the msgtype or something...
|
|
RdbList list;
|
|
|
|
// set the list
|
|
// todo: dodgy cast to char*. RdbList should be fixed
|
|
list.set((char *)item.m_rec, item.m_recSize, (char *)item.m_rec, item.m_recSize,
|
|
rdb->getFixedDataSize(), false, rdb->useHalfKeys(), rdb->getKeySize());
|
|
|
|
// keep track of stats
|
|
rdb->readRequestAdd(item.m_recSize);
|
|
|
|
// this returns false and sets g_errno on error
|
|
bool status = rdb->addListNoSpaceCheck(item.m_collNum, &list);
|
|
|
|
// bad coll #? ignore it. common when deleting and resetting
|
|
// collections using crawlbot. but there are other recs in this
|
|
// list from different collections, so do not abandon the whole
|
|
// meta list!! otherwise we lose data!!
|
|
if(!status) {
|
|
if(g_errno == ENOCOLLREC)
|
|
g_errno = 0;
|
|
else
|
|
goto break_out_of_for;
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
break_out_of_for:
|
|
|
|
// verify integrity if wanted
|
|
if (g_conf.m_verifyTreeIntegrity) {
|
|
for(auto const &rdbItem : rdbItems) {
|
|
if(rdbItem.first!=RDB_SPIDERDB_DEPRECATED && rdbItem.first!=RDB2_SPIDERDB2_DEPRECATED && rdbItem.first!=RDB_SITEDEFAULTPAGETEMPERATURE) {
|
|
Rdb *rdb = getRdbFromId(rdbItem.first);
|
|
rdb->verifyTreeIntegrity();
|
|
}
|
|
}
|
|
}
|
|
|
|
// no memory means to try again
|
|
if (g_errno == ENOMEM) {
|
|
g_errno = ETRYAGAIN;
|
|
}
|
|
|
|
// doing a full rebuid will add collections
|
|
if (g_errno == ENOCOLLREC && g_repairMode > 0) {
|
|
g_errno = ETRYAGAIN;
|
|
}
|
|
|
|
// are we done
|
|
if (g_errno) {
|
|
return false;
|
|
}
|
|
|
|
// Initiate dumps for any Rdbs wanting it
|
|
for (auto const &rdbItem : rdbItems) {
|
|
if(rdbItem.first!=RDB_SPIDERDB_DEPRECATED && rdbItem.first!=RDB2_SPIDERDB2_DEPRECATED && rdbItem.first!=RDB_SITEDEFAULTPAGETEMPERATURE) {
|
|
Rdb *rdb = getRdbFromId(rdbItem.first);
|
|
rdb->submitRdbDumpJob(false);
|
|
}
|
|
}
|
|
|
|
// success
|
|
return true;
|
|
}
|