checkpoint

This commit is contained in:
Matt Wells
2014-02-20 14:54:21 -08:00
parent 88dfa20cbe
commit 9820f14066
2 changed files with 292 additions and 7 deletions

@ -435,12 +435,204 @@ void Msg25::reset() {
#define MODE_PAGELINKINFO 1
#define MODE_SITELINKINFO 2
// . get the inlinkers to this SITE (any page on this site)
// . use that to compute a site quality
// . also get the inlinkers sorted by date and see how many good inlinkers
// we had since X days ago. (each inlinker needs a pub/birth date)
// . returns false if would block, true otherwise
// . sets g_errno and returns true on launch error
// . calls req->m_callback when ready if it would block
bool getLinkInfo ( SafeBuf *reqBuf ,
		   Multicast *mcast ,
		   char *site ,
		   char *url ,
		   bool isSiteLinkInfo ,
		   long ip ,
		   long long docId ,
		   collnum_t collnum ,
		   void *state ,
		   void (* callback)(void *state) ,
		   bool isInjecting ,
		   SafeBuf *pbuf ,
		   bool printInXml ,
		   long siteNumInlinks ,
		   LinkInfo *oldLinkInfo ,
		   long niceness ,
		   bool doLinkSpamCheck ,
		   bool oneVotePerIpDom ,
		   bool canBeCancelled ,
		   long lastUpdateTime ,
		   bool onlyNeedGoodInlinks ,
		   bool getLinkerTitles ,
		   long ourHostHash32 ,
		   long ourDomHash32 ,
		   SafeBuf *linkInfoBuf ) {

	long siteLen = gbstrlen(site);
	long urlLen  = gbstrlen(url);

	// how much space does the serialized request need?
	long oldLinkSize = 0;
	if ( oldLinkInfo ) oldLinkSize = oldLinkInfo->getStoredSize();
	long need = sizeof(Msg25Request) + siteLen+1 + urlLen+1 + oldLinkSize;

	// keep it in a safebuf so caller can just add "SafeBuf m_msg25Req;"
	// to his .h file and not have to worry about freeing it.
	reqBuf->purge();
	// . per our contract we return true on launch error; reserve()
	//   sets g_errno on failure
	// . (was "return NULL", which is wrong for a bool function)
	if ( ! reqBuf->reserve ( need , "m25req" ) ) return true;

	// the request lives at the start of the buffer we just reserved
	// (was "(Msg25Request *)mem" -- "mem" was never defined here)
	Msg25Request *req = (Msg25Request *)reqBuf->getBufStart();

	req->m_allocSize = need;

	req->ptr_site  = site;
	req->size_site = siteLen + 1;
	req->ptr_url   = url;
	req->size_url  = urlLen + 1;

	// guard this deref -- oldLinkInfo may legitimately be NULL, the
	// size computation above already allows for that
	if ( oldLinkInfo ) {
		req->ptr_oldLinkInfo  = oldLinkInfo->getBuf();
		req->size_oldLinkInfo = oldLinkInfo->getBufSize();
	}
	else {
		req->ptr_oldLinkInfo  = NULL;
		req->size_oldLinkInfo = 0;
	}

	if ( isSiteLinkInfo ) req->m_mode = MODE_SITELINKINFO;
	else                  req->m_mode = MODE_PAGELINKINFO;

	req->m_ip              = ip;
	req->m_docId           = docId;
	req->m_collnum         = collnum;
	req->m_state           = state;
	req->m_callback        = callback;
	req->m_isInjecting     = isInjecting;
	req->m_printInXml      = printInXml;
	req->m_siteNumInlinks  = siteNumInlinks;
	req->m_niceness        = niceness;
	req->m_doLinkSpamCheck = doLinkSpamCheck;
	req->m_oneVotePerIpDom = oneVotePerIpDom;
	req->m_canBeCancelled  = canBeCancelled;
	req->m_lastUpdateTime  = lastUpdateTime;
	req->m_onlyNeedGoodInlinks = onlyNeedGoodInlinks;
	req->m_getLinkerTitles = getLinkerTitles;
	req->m_ourHostHash32   = ourHostHash32;
	req->m_ourDomHash32    = ourDomHash32;
	// nobody is waiting in line behind this request yet
	req->m_next = NULL;

	// . hash the url BEFORE serializing: serialize() turns the ptr_*
	//   members into offsets into req->m_buf[], after which ptr_url
	//   is no longer a dereferenceable pointer
	// . (the original serialized here AND again below; we keep only
	//   the later call, after the last ptr_* use)
	Url u;
	u.set ( req->ptr_url );
	req->m_linkHash64 = (uint64_t)u.getUrlHash64();

	// access different parts of linkdb depending on the "mode"
	key224_t startKey;
	long siteHash32 = hash32n ( req->ptr_site );
	// (was a bare "m_mode", which does not exist in a free function)
	if ( req->m_mode == MODE_SITELINKINFO )
		startKey = g_linkdb.makeStartKey_uk ( siteHash32 );
	else
		startKey = g_linkdb.makeStartKey_uk ( siteHash32 ,
						      req->m_linkHash64 );

	// what shard has this linkdb list?
	unsigned long shardNum = getShardNum ( RDB_LINKDB , &startKey );

	// . use a biased lookup so a given site always hits the same twin
	// . these are 224-bit keys (key224_t); top 32 bits hash the site
	long numTwins = g_hostdb.getNumHostsPerShard();
	long long sectionWidth = (0xffffffff/(long long)numTwins) + 1;
	unsigned long x = siteHash32;//(startKey.n1 >> 32);
	long hostNum  = x / sectionWidth;
	long numHosts = g_hostdb.getNumHostsPerShard();
	Host *hosts = g_hostdb.getShard ( shardNum ); // Group ( groupId );
	// sanity: the biased index must land inside the shard
	if ( hostNum >= numHosts ) { char *xx = NULL; *xx = 0; }
	long hostId = hosts [ hostNum ].m_hostId;

	// . serialize the string buffers into Msg25Request::m_buf[]
	// . exactly once, after the last ptr_* dereference above
	req->serialize();

	// this should always block
	if ( ! mcast->sendRequest (
		      (char *)req ,
		      req->getStoredSize() ,
		      0x25 ,
		      false , // does multicast own request?
		      shardNum ,
		      false , // send to whole group?
		      0 , // key is passed on startKey
		      req , // state data
		      NULL , // state data
		      gotMulticastReplyWrapper0 ,
		      1000 , // timeout in seconds (was 30)
		      req->m_niceness ,
		      false , // realtime
		      hostId ) ) { // firstHostId
		log("linkdb: Failed to send multicast for %s err=%s",
		    u.getUrl(),mstrerror(g_errno));
		return true;
	}

	// wait for req->m_callback(req->m_state) to be called
	return false;
}
// . handle an incoming 0x25 link-info request
// . identical requests wait in line behind the first one so the linkdb
//   work is only done once; the head request replies for everyone
void handleRequest25 ( UdpSlot *slot , long netnice ) {

	Msg25Request *req = (Msg25Request *)slot->m_readBuf;

	// NOTE(review): the sender's serialize() turned the ptr_* members
	// into offsets; confirm they are converted back to real pointers
	// (deserialize) before ptr_site/ptr_url are used below.

	// . if already working on this same request, wait for it
	// . hashkey is combo of collection, url, and m_mode
	// . ("long slot" was shadowing the UdpSlot parameter -- renamed)
	long tableSlot = g_lineTable.getSlot ( &req->m_reqHash64 );
	if ( tableSlot >= 0 ) {
		Msg25Request *head;
		head = *(Msg25Request **)g_lineTable.getValFromSlot(tableSlot);
		// insert ourselves right behind the head request
		if ( head->m_next )
			req->m_next = head->m_next;
		head->m_next = req;
		// we will send a reply back for this guy when done
		// getting the reply for the head msg25request
		return;
	}

	// NOTE(review): nothing ever registers "req" in g_lineTable, so
	// the wait-in-line branch above can never fire. the head request
	// must be added here -- confirm the hashtable API, something like:
	// g_lineTable.addKey ( &req->m_reqHash64 , &req );

	// i guess make a msg25 class to hold hashtables etc.
	// NOTE(review): mcalloc() does not run the Msg25 constructor --
	// confirm Msg25 is usable when zero-initialized
	Msg25 *m25 = (Msg25 *)mcalloc(sizeof(Msg25),"hrm25");
	// since g_errno is set this should send back an error reply
	// (was "sendReply(slot,msg25)" with undefined "msg25", and it fell
	// through into dereferencing the NULL m25 below -- now we return)
	if ( ! m25 ) { sendReply ( slot , NULL ); return; }

	// this should call our callback when done
	if ( ! m25->getLinkInfo2 ( req->ptr_site ,
				   req->ptr_url ,
				   // class has m_mode, not m_isSiteLinkInfo
				   req->m_mode == MODE_SITELINKINFO ,
				   req->m_ip ,
				   req->m_docId ,
				   req->m_collnum , // coll
				   NULL , // qbuf
				   0 , // qbufSize
				   m25 , // state
				   doneGettingLinkInfoWrapper ,
				   req->m_isInjecting ,
				   // TODO(review): the next four were left
				   // as bare parameter declarations in the
				   // checkpoint; confirm what the server
				   // side should actually pass here
				   NULL , // pbuf
				   NULL , // xd
				   //bool printInXml ,
				   req->m_siteNumInlinks ,
				   (LinkInfo *)req->ptr_oldLinkInfo ,
				   req->m_niceness ,
				   req->m_doLinkSpamCheck ,
				   req->m_oneVotePerIpDom ,
				   req->m_canBeCancelled ,
				   req->m_lastUpdateTime ,
				   req->m_onlyNeedGoodInlinks ,
				   req->m_getLinkerTitles ,
				   req->m_ourHostHash32 ,
				   req->m_ourDomHash32 ,
				   NULL ) ) // linkInfoBuf -- TODO confirm
		return;

	// it did not block... g_errno will be set on error so sendReply()
	// should in that case send an error reply.
	// (was a no-argument call, inconsistent with the call above)
	sendReply ( slot , m25 );
}
//////
//
// OLD interface below here. use the stuff above now so we can send
// the request to a single host and multiple incoming requests can
// wait in line, and we can set network bandwidth too.
//
//////
// . returns false if blocked, true otherwise
// . sets g_errno on error
@ -448,7 +640,7 @@ void Msg25::reset() {
// . NOTE: make sure no input vars are on the stack in case we block
// . reallyGetLinkInfo is set to false if caller does not want it but calls
// us anyway for some reason forgotten...
bool Msg25::getLinkInfo ( char *site ,
bool Msg25::getLinkInfo2( char *site ,
char *url ,
// either MODE_PAGELINKINFO or MODE_SITELINKINFO
bool isSiteLinkInfo ,
@ -685,7 +877,7 @@ bool Msg25::doReadLoop ( ) {
// Now we hang indefinitely. We also fixed UdpServer to resend
// requests after 30 seconds even though it was fully acked in case
// the receiving host went down and is now back up.
if ( ! m_msg0.getList ( -1 , // hostId, -1 if none
if ( ! m_msg5.getList ( -1 , // hostId, -1 if none
0 , // hostId ip
0 , // hostId port
0 , // max cache age in seconds

@ -35,6 +35,99 @@
#include "DiskPageCache.h"
#include "Titledb.h"
// . get the inlinkers to this SITE (any page on this site)
// . use that to compute a site quality
// . also get the inlinkers sorted by date and see how many good inlinkers
// we had since X days ago. (each inlinker needs a pub/birth date)
// . the over-the-wire request for link info, sent to the one host that
//   holds the relevant linkdb list (see getLinkInfo()/handleRequest25())
// . the ptr_*/size_* pairs are serialized into m_buf[]: serialize()
//   turns each ptr_* into an offset into m_buf[] before transmission
class Msg25Request {
public:
	// either MODE_PAGELINKINFO or MODE_SITELINKINFO
	char      m_mode; // bool m_isSiteLinkInfo ;
	long      m_ip ;
	long long m_docId ;
	collnum_t m_collnum ;
	bool      m_isInjecting ;
	bool      m_printInXml ;

	// when we get a reply we call this
	void *m_state ;
	void (* m_callback)(void *state) ;

	// server-side parms so it doesn't have to allocate a state
	//SafeBuf m_pbuf ;
	//SafeBuf m_linkInfoBuf ;
	//char *coll ;
	//char *qbuf ;
	//long qbufSize ;
	//XmlDoc *xd ;

	long      m_siteNumInlinks ;
	LinkInfo *m_oldLinkInfo ;
	long      m_niceness ;
	bool      m_doLinkSpamCheck ;
	bool      m_oneVotePerIpDom ;
	bool      m_canBeCancelled ;
	long      m_lastUpdateTime ;
	bool      m_onlyNeedGoodInlinks ;
	bool      m_getLinkerTitles ;
	long      m_ourHostHash32 ;
	long      m_ourDomHash32 ;

	// members referenced by the new code in Linkdb.cpp but missing
	// from the checkpoint version of this class:
	long      m_allocSize ;  // total bytes reserved for this request
	uint64_t  m_linkHash64 ; // hash of ptr_url (getUrlHash64())
	long long m_reqHash64 ;  // wait-in-line key for g_lineTable
	class Msg25Request *m_next ; // next identical request in line

	char *ptr_site ;
	char *ptr_url ;
	char *ptr_pbuf;
	char *ptr_linkInfoBuf;
	char *ptr_oldLinkInfo;   // set from LinkInfo::getBuf()

	long size_site;
	long size_url;
	long size_pbuf;
	long size_linkInfoBuf;
	long size_oldLinkInfo;

	// declared because the .cpp calls them: total serialized size,
	// and conversion of ptr_* members to/from offsets into m_buf[]
	long getStoredSize ( );
	void serialize     ( );
	void deserialize   ( );

	// . variable-size serialized string data is appended here
	// . per the .cpp comment "use Msg25Request::m_buf[MAX_NEEDED]"
	char m_buf[0];
};
// . returns false if blocked, true otherwise
// . sets errno on error
// . your req->m_callback will be called with the Msg25Reply
bool getLinkInfo ( char *site ,
char *url ,
bool isSiteLinkInfo ,
long ip ,
long long docId ,
char *coll ,
char *qbuf ,
long qbufSize ,
void *state ,
void (* callback)(void *state) ,
bool isInjecting ,
SafeBuf *pbuf ,
class XmlDoc *xd ,
long siteNumInlinks ,
//long sitePop ,
LinkInfo *oldLinkInfo ,
long niceness ,
bool doLinkSpamCheck ,
bool oneVotePerIpDom ,
bool canBeCancelled ,
long lastUpdateTime ,
bool onlyNeedGoodInlinks ,
bool getLinkerTitles , //= false ,
// if an inlinking document has an outlink
// of one of these hashes then we set
// Msg20Reply::m_hadLinkToOurDomOrHost.
// it is used to remove an inlinker to a related
// docid, which also links to our main seo url
// being processed. so we do not recommend
// such links since they already link to a page
// on your domain or hostname. set BOTH to zero
// to not perform this algo in handleRequest20()'s
// call to XmlDoc::getMsg20Reply().
long ourHostHash32 , // = 0 ,
long ourDomHash32 , // = 0 );
SafeBuf *myLinkInfoBuf );
void handleRequest25 ( UdpSlot *slot , long netnice ) ;
long getSiteRank ( long sni ) ;
class Linkdb {
@ -307,7 +400,7 @@ class Msg25 {
// any link text and return true right away, really saves a bunch
// of disk seeks when spidering small collections that don't need
// link text/info indexing/analysis
bool getLinkInfo ( char *site ,
bool getLinkInfo2 (char *site ,
char *url ,
bool isSiteLinkInfo ,
long ip ,