almost done adding support for whitelists.

i.e. a list of sites to restrict search results to.
mwells
2013-09-15 15:15:56 -06:00
parent e152205765
commit b684414e16
10 changed files with 215 additions and 49 deletions
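
In the Msg2.cpp hunks below, each whitelisted site is hashed into a 48-bit termId and read back as its own posdb termlist, alongside the regular query-term lists. A minimal standalone sketch of that mapping (toy_hash64 is only a placeholder for the repo's hash64(); the 48-bit mask mirrors the TERMID_MASK use in the diff):

#include <stdio.h>
#include <stdint.h>

#define TERMID_MASK 0x0000ffffffffffffLL   // posdb termIds are 48 bits

// stand-in for the repo's hash64(); any 64-bit string hash works for a sketch
static int64_t toy_hash64 ( const char *s , long len , int64_t seed ) {
	uint64_t h = 1125899906842597ULL + (uint64_t)seed;
	for ( long i = 0 ; i < len ; i++ ) h = h * 31 + (unsigned char)s[i];
	return (int64_t)h;
}

// walk a space-separated whitelist and print the 48-bit termId each site
// hashes to, mirroring the loop added to Msg2::getLists() below
int main ( ) {
	const char *whiteList = "ibm.com nytimes.com";   // hypothetical sites
	const char *p = whiteList;
	while ( *p ) {
		const char *start = p;
		while ( *p && *p != ' ' ) p++;               // end of this site
		int64_t termId = toy_hash64 ( start , p - start , 0LL );
		termId &= TERMID_MASK;                       // mask to 48 bits
		printf ( "%.*s -> termId %lld\n",
		         (int)(p - start), start, (long long)termId );
		while ( *p == ' ' ) p++;                     // skip to next site
	}
	return 0;
}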

Msg2.cpp (195 changed lines)

@@ -6,6 +6,7 @@
#include "Rdb.h"
#include "Threads.h"
#include "Posdb.h" // getTermId()
#include "Msg39.h" // MAX_QUERY_LISTS
//static void gotListWrapper0 ( void *state ) ;
static void gotListWrapper ( void *state , RdbList *list , Msg5 *msg5 ) ;
@@ -32,8 +33,12 @@ bool Msg2::getLists ( long rdbId ,
bool addToCache ,
//QueryTerm *qterms ,
Query *query ,
// put list of sites to restrict to in here
// or perhaps make it collections for federated search?
//char *whiteList ,
long *minRecSizes ,
//long numLists ,
// make max MAX_MSG39_LISTS
RdbList *lists ,
void *state ,
void (* callback)(void *state ) ,
@@ -79,7 +84,7 @@ bool Msg2::getLists ( long rdbId ,
m_numReplies = 0;
m_numRequests = 0;
// save rdbid in case getDbnameFromId() is called below
m_msg5[0].m_rdbId = rdbId;
//m_msg5[0].m_rdbId = rdbId;
// start the timer
m_startTime = gettimeofdayInMilliseconds();
// set this
@@ -87,10 +92,10 @@ bool Msg2::getLists ( long rdbId ,
// make sure not too many lists being requested
if ( m_numLists > MAX_NUM_LISTS ) {g_errno=ETOOMANYLISTS; return true;}
// clear them all
for ( long i = 0 ; i < m_numLists ; i++ ) {
m_inProgress[i] = true;
//m_slotNum[i] = -1;
}
//for ( long i = 0 ; i < m_numLists ; i++ ) {
// m_inProgress[i] = true;
// //m_slotNum[i] = -1;
//}
// all msg5 available for use
for ( long i = 0 ; i < MSG2_MAX_REQUESTS ; i++ ) m_avail[i] = true;
if ( m_isDebug ) {
@@ -99,6 +104,8 @@ bool Msg2::getLists ( long rdbId ,
}
// reset error
m_errno = 0;
// reset list counter
m_i = 0;
// fetch what we need
return getLists ( );
}
@@ -116,14 +123,16 @@ bool Msg2::getLists ( ) {
// loop:
// . send out a bunch of msg5 requests
// . make slots for all
for ( long i = 0 ; i < m_numLists ; i++ ) {
for ( ; m_i < m_numLists ; m_i++ ) {
// sanity for Msg39's sake. do not breach m_lists[].
if ( m_i >= MAX_QUERY_LISTS ) { char *xx=NULL;*xx=0; }
// if any had error, forget the rest. do not launch any more
if ( m_errno ) break;
// skip if already did it
if ( ! m_inProgress[i] ) continue;
//if ( ! m_inProgress[i] ) continue;
// skip if currently launched
//if ( m_slotNum[i] >= 0 ) continue;
if ( ! m_avail[i] ) continue;
//if ( ! m_avail[i] ) continue;
// do not allow too many outstanding requests
//if ( m_numRequests - m_numReplies >= MSG2_MAX_REQUESTS )
// return false;
@@ -131,7 +140,7 @@ bool Msg2::getLists ( ) {
// . now we call Msg39::reset() which frees each m_list[i]
//m_lists[i].freeList();
// skip if no bytes requested
if ( m_minRecSizes[i] == 0 ) continue;
if ( m_minRecSizes[m_i] == 0 ) continue;
// get a free msg5
//long j = 0;
//for( ; j < MSG2_MAX_REQUESTS ; j++ ) if ( m_avail[j] ) break;
@@ -152,27 +161,27 @@ bool Msg2::getLists ( ) {
if ( m_isDebug ) {
key144_t *sk ;
key144_t *ek ;
sk = (key144_t *)m_qterms[i].m_startKey;
ek = (key144_t *)m_qterms[i].m_endKey;
sk = (key144_t *)m_qterms[m_i].m_startKey;
ek = (key144_t *)m_qterms[m_i].m_endKey;
long long docId0 = g_posdb.getDocId(sk);
long long docId1 = g_posdb.getDocId(ek);
log("query: reading termlist #%li "//from "
//"distributed cache on host #%li. "
"termId=%lli. k=%s mr=%li (docid0=%lli -"
"docid1=%lli).",
i,
m_i,
//hostId,
g_posdb.getTermId(sk),
KEYSTR(sk,sizeof(POSDBKEY)),
//sk->n2,
//sk->n1,
//(long)sk->n0,
m_minRecSizes[i],
m_minRecSizes[m_i],
docId0,
docId1);
}
long minRecSize = m_minRecSizes[i];
long minRecSize = m_minRecSizes[m_i];
// sanity check
if ( ( minRecSize > ( 500 * 1024 * 1024 ) ||
@@ -185,11 +194,8 @@ bool Msg2::getLists ( ) {
// if it is a no-split term, we may have to get it over the net
//if ( ! m_qterms[i].isSplit() )
// forceLocalIndexdb = false;
// stash this
m_msg5[i].m_parent = this;
m_msg5[i].m_i = i;
QueryTerm *qt = &m_qterms[i];
QueryTerm *qt = &m_qterms[m_i];
char *sk2 = NULL;
char *ek2 = NULL;
@@ -202,6 +208,14 @@ bool Msg2::getLists ( ) {
! qt->m_synonymOf )
continue;
Msg5 *msg5 = getAvailMsg5();
// return if all are in use
if ( ! msg5 ) return false;
// stash this
msg5->m_parent = this;
msg5->m_i = m_i;
/*
// if doing a gbdocid:| restricted query then use msg0
// because it is probably stored remotely!
@@ -254,10 +268,10 @@ bool Msg2::getLists ( ) {
// really needs to do it and he doesn't call Msg2
// . this is really only used to get IndexLists
// . we now always compress the list for 2x faster transmits
if ( ! m_msg5[i].getList (
if ( ! msg5->getList (
m_rdbId , // rdbid
m_coll ,
&m_lists[i], // listPtr
&m_lists[m_i], // listPtr
sk2,//&m_startKeys [i*ks],
ek2,//&m_endKeys [i*ks],
minRecSize ,
@@ -266,7 +280,7 @@ bool Msg2::getLists ( ) {
0, // maxcacheage
0 , // start file num
numFiles,//-1 , // num files
&m_msg5[i] , // state
msg5,//&m_msg5[i] , // state
gotListWrapper ,
m_niceness ,
false , // error correction
@@ -283,7 +297,7 @@ bool Msg2::getLists ( ) {
true) ) { // MERGE AGAIN NOW!
m_numRequests++;
//m_slotNum [i] = i;
m_avail [i] = false;
//m_avail [i] = false;
continue;
}
@@ -295,30 +309,122 @@ bool Msg2::getLists ( ) {
// that is no longer the case!! we do a merge now... i
// think we decided it was easier to deal with shit in posdb.cpp
// but i don't know how much this matters really
m_avail [i] = false;
//m_avail [i] = false;
// set our end time if we need to
//if ( g_conf.m_logTimingNet )
// m_endTimes[i] = gettimeofdayInMilliseconds();
// if the list is empty, we can get its components now
m_inProgress[i] = false;
//m_inProgress[i] = false;
// we didn't block, so do this
m_numReplies++;
m_numRequests++;
// break out on error and wait for replies if we blocked
if ( g_errno ) {
m_errno = g_errno;
log("query: Got error reading termlist: %s.",
mstrerror(g_errno));
break;
}
// return the msg5 now
returnMsg5 ( msg5 );
// note it
//if ( m_isDebug )
// logf(LOG_DEBUG,"query: got list #%li size=%li",
// i,m_lists[i].getListSize() );
// count it
//m_totalRead += m_lists[i].getListSize();
// break out on error and wait for replies if we blocked
if ( ! g_errno ) continue;
// report the error and return
m_errno = g_errno;
log("query: Got error reading termlist: %s.",
mstrerror(g_errno));
goto skip;
}
//
// now read in lists from the terms in the "whiteList"
//
// initialize ptr
if ( ! m_p ) m_p = m_whiteList;
// loop over terms in the whitelist, space separated
for ( char *p = m_p ; *p ; m_i++ ) {
// advance
char *current = p;
for ( ; *p && *p != ' ' ; p++ );
// save end of "current"
char *end = p;
// advance to point to next item in whiteList
for ( ; *p == ' ' ; p++ );
// convert whiteList term into key
long long termId = hash64 ( current , end - current , 0LL );
// mask to 48 bits
termId &= TERMID_MASK;
// make key. be sure to limit to provided docid range
// if we are doing docid range splits to prevent OOM
key_t sk3 = g_posdb.makeStartKey ( termId , docId1 );
key_t ek3 = g_posdb.makeEndKey ( termId , docId2 );
// get one
Msg5 *msg5 = getAvailMsg5();
// return if all are in use
if ( ! msg5 ) return false;
// stash this
msg5->m_parent = this;
msg5->m_i = m_i;
// advance cursor
m_p = p;
// sanity for Msg39's sake. do not breach m_lists[].
if ( m_i >= MAX_QUERY_LISTS ) { char *xx=NULL;*xx=0; }
// start up the read. thread will wait in thread queue to
// launch if too many threads are out.
if ( ! msg5->getList (
m_rdbId , // rdbid
m_coll ,
&m_lists[m_i], // listPtr
&sk3,//&m_startKeys [i*ks],
&ek3,//&m_endKeys [i*ks],
-1,//minRecSize ,
includeTree,//true, // include tree?
false , // addtocache
0, // maxcacheage
0 , // start file num
numFiles,//-1 , // num files
msg5,//&m_msg5[i] , // state
gotListWrapper ,
m_niceness ,
false , // error correction
NULL , // cachekeyptr
0, // retrynum
-1, // maxretries
true, // compensateformerge?
-1, // syncpoint
NULL, // msg5b
false, // isrealmerge?
true,// allow disk page cache?
true, // hit disk?
//false)) {// MERGE LISTS??? NO!!!
true) ) { // MERGE AGAIN NOW!
m_numRequests++;
//m_avail [i] = false;
continue;
}
// return it!
m_numReplies++;
m_numRequests++;
// . return the msg5 now
returnMsg5 ( msg5 );
// break out on error and wait for replies if we blocked
if ( ! g_errno ) continue;
// report the error and return
m_errno = g_errno;
log("query: Got error reading termlist: %s.",
mstrerror(g_errno));
goto skip;
}
skip:
// do the 2nd pass if we need to (and there was no error)
//if ( ! g_errno && pass == 0 ) { pass = 1; goto loop; }
// if we did get a compound list reply w/o blocking, re-do the loop
@@ -334,6 +440,24 @@ bool Msg2::getLists ( ) {
return gotList ( NULL );
}
Msg5 *Msg2::getAvailMsg5 ( ) {
for ( long i = 0 ; i < MSG2_MAX_REQUESTS ; i++ ) {
if ( ! m_avail[i] ) continue;
// claim the slot so the next call hands out a different msg5
m_avail[i] = false;
return &m_msg5[i];
}
return NULL;
}
void Msg2::returnMsg5 ( Msg5 *msg5 ) {
long i; for ( i = 0 ; i < MSG2_MAX_REQUESTS ; i++ )
if ( &m_msg5[i] == msg5 ) break;
// wtf?
if ( i >= MSG2_MAX_REQUESTS ) { char *xx=NULL;*xx=0; }
// make it available
m_avail[i] = true;
// reset it
msg5->reset();
}
/*
void gotListWrapper0 ( void *state ) {
Msg0 *msg0 = (Msg0 *)state;
@@ -369,11 +493,12 @@ void gotListWrapper ( void *state , RdbList *rdblist, Msg5 *msg5 ) {
g_errno = 0;
}
// identify the msg0 slot we use
//long i = list - THIS->m_lists;
long i = ms->m_i;
long i = list - THIS->m_lists;
//long i = ms->m_i;
//if ( i < 0 || i >= MSG2_MAX_REQUESTS ) { char *xx=NULL;*xx=0; }
//long nn = THIS->m_slotNum [ i ];
THIS->m_inProgress[ i] = false;
//THIS->m_inProgress[ i] = false;
THIS->returnMsg5 ( ms );
// now we keep it because Msg2::getGroupList() needs it!!
//THIS->m_avail [nn] = true;
//THIS->m_slotNum [ i] = -1;
@@ -421,7 +546,7 @@ bool Msg2::gotList ( RdbList *list ) {
//if ( g_errno ) return true;
if ( m_errno )
log("net: Had error fetching data from %s: %s.",
getDbnameFromId(m_msg5[0].m_rdbId),
getDbnameFromId(m_rdbId),
mstrerror(m_errno) );
// note it
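
The Msg2.cpp changes above swap the old one-slot-per-list bookkeeping (m_inProgress/m_slotNum) for a small pool of Msg5 objects: getAvailMsg5() hands out a free slot, returnMsg5() recycles it, and because m_i and m_p are persistent members, a later call to getLists() resumes at the same query list or whitelist term instead of starting over. A stripped-down sketch of that pattern, with a dummy worker standing in for Msg5 (in the real code a slot held by a blocked read is released from gotListWrapper()):

#include <stdio.h>

#define POOL_SIZE 4                  // stands in for MSG2_MAX_REQUESTS

struct Worker { bool busy; };        // stands in for Msg5

static Worker s_pool [ POOL_SIZE ];
static int    s_cursor = 0;          // like Msg2::m_i, persists across calls

// like Msg2::getAvailMsg5(): NULL means every slot is in flight
static Worker *getAvail ( ) {
	for ( int i = 0 ; i < POOL_SIZE ; i++ )
		if ( ! s_pool[i].busy ) return &s_pool[i];
	return NULL;
}

// like Msg2::returnMsg5(): hand the slot back for reuse
static void returnWorker ( Worker *w ) { w->busy = false; }

// like Msg2::getLists(): launch reads until we run out of slots, then return
// false; a later call resumes at s_cursor instead of starting over
static bool launchLists ( int numLists ) {
	for ( ; s_cursor < numLists ; s_cursor++ ) {
		Worker *w = getAvail();
		if ( ! w ) return false;         // pool exhausted; resume later
		w->busy = true;
		printf ( "launched list #%d\n", s_cursor );
		returnWorker ( w );              // pretend the read finished instantly
	}
	return true;
}

int main ( ) { return launchLists ( 10 ) ? 0 : 1; }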

Msg2.h (13 changed lines)

@@ -14,6 +14,8 @@
// launch up to 25 msg0 requests at a time
//#define MSG2_MAX_REQUESTS 25
// how many outstanding msg5 requests at one time?
#define MSG2_MAX_REQUESTS MAX_QUERY_TERMS
class Msg2 {
@@ -54,6 +56,10 @@ class Msg2 {
bool checkCache = false);
bool getLists();
// list of sites to restrict search results to. space separated
char *m_whiteList;
char *m_p;
long m_i;
// for posdbtable to get lists
//long getNumListGroups ( ) { return m_query->m_numTerms; }
@@ -70,6 +76,9 @@ class Msg2 {
// get how many bytes we read
//long getTotalRead() { return m_totalRead; };
class Msg5 *getAvailMsg5();
void returnMsg5 ( class Msg5 *msg5 ) ;
// leave public so C wrapper can call
bool gotList ( RdbList *list );
@@ -82,8 +91,8 @@ class Msg2 {
RdbList *m_lists;
char m_inProgress [ MAX_NUM_LISTS ];
char m_slotNum [ MAX_NUM_LISTS ];
//char m_inProgress [ MAX_NUM_LISTS ];
//char m_slotNum [ MAX_NUM_LISTS ];
// used for getting component lists if compound list is empty
void mergeLists_r ( ) ;

Msg3.h (6 changed lines)

@@ -18,10 +18,14 @@
//#define MAX_RDB_FILES 2048
// make Msg5 footprint smaller
//#define MAX_RDB_FILES 512
// make Msg5 footprint smaller since we have "whitelist" in msg2.cpp
// we need to run one msg5 per whitelisted site, and we can have up to
// 500 sites in the whitelist.
#define MAX_RDB_FILES 1024
//#define MSG3_BUF_SIZE ((sizeof(RdbScan)+sizeof(key_t)+sizeof(RdbList)+20)*6)
#define MSG3_BUF_SIZE ((sizeof(RdbScan)+MAX_KEY_BYTES+sizeof(RdbList)+20)*6)
//#define MSG3_BUF_SIZE ((sizeof(RdbScan)+MAX_KEY_BYTES+sizeof(RdbList)+20)*6)
#define MSG3_BUF_SIZE 64
#include "RdbList.h"
#include "RdbScan.h"
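
Shrinking MSG3_BUF_SIZE matters because, assuming the buffer stays embedded per Msg3 (and thus per Msg5) as elsewhere in the repo, the comment above implies one msg5 per whitelisted site for up to 500 sites. A rough illustration of the savings, with placeholder component sizes since the real ones depend on sizeof(RdbScan) and sizeof(RdbList):

#include <stdio.h>

int main ( ) {
	// placeholders: the old formula was
	// (sizeof(RdbScan)+MAX_KEY_BYTES+sizeof(RdbList)+20)*6
	long oldBuf = ( 400 /*RdbScan, guess*/ + 28 /*MAX_KEY_BYTES, guess*/ +
	                96  /*RdbList, guess*/ + 20 ) * 6;
	long newBuf = 64;                        // new MSG3_BUF_SIZE
	long sites  = 500;                       // MAX_WHITELIST_TERMS
	printf ( "old: ~%ld KB of embedded buffer across %ld per-site msg5s\n",
	         oldBuf * sites / 1024 , sites );
	printf ( "new: ~%ld KB\n", newBuf * sites / 1024 );
	return 0;
}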

Msg39.cpp

@@ -577,6 +577,7 @@ bool Msg39::getLists () {
m_r->m_addToCache ,
//m_tmpq.m_qterms ,
&m_tmpq,
m_r->ptr_whiteList,
// how much of each termlist to read in bytes
(long *)m_r->ptr_readSizes ,
//m_tmpq.getNumTerms() , // numLists

Msg39.h (11 changed lines)

@@ -17,6 +17,10 @@
#include "Msg51.h"
#include "HashTableX.h"
// how many RdbLists to hold?
#define MAX_WHITELIST_TERMS 500
#define MAX_QUERY_LISTS (MAX_QUERY_TERMS+MAX_WHITELIST_TERMS)
#define MAX_MSG39_REQUEST_SIZE (500+MAX_QUERY_LEN)
void handleRequest39 ( UdpSlot *slot , long netnice ) ;
@@ -57,10 +61,12 @@ class Msg39Request {
ptr_readSizes = NULL;
ptr_query = NULL; // in utf8?
ptr_whiteList = NULL;
ptr_coll = NULL;
size_readSizes = 0;
size_query = 0;
size_whiteList = 0;
size_coll = 0;
m_getDocIdScoringInfo = 1;
@@ -125,11 +131,13 @@ class Msg39Request {
char *ptr_readSizes;
char *ptr_termFreqWeights;
char *ptr_query; // in utf8?
char *ptr_whiteList;
char *ptr_coll;
long size_readSizes;
long size_termFreqWeights;
long size_query;
long size_whiteList;
long size_coll;
char m_buf[0];
@@ -176,7 +184,6 @@
};
class Msg39 {
public:
@@ -229,7 +236,7 @@ class Msg39 {
// . we hold our IndexLists here for passing to PosdbTable
// . one array for each of the tiers
IndexList m_lists [ MAX_QUERY_TERMS ];
IndexList m_lists [ MAX_QUERY_LISTS ];
// used for timing
long long m_startTime;
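
Msg39Request already passes its variable-length fields with the ptr_*/size_* pairing and appends their data at m_buf[0], so adding ptr_whiteList/size_whiteList lets the whitelist string ride inside the same serialized request. A generic sketch of that idea (not the repo's actual serializer, just the shape of it): the fixed struct goes first, each variable buffer is appended after it, and the receiver re-points the ptr_* members into the blob.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

// toy request: fixed fields plus variable-length strings referenced by
// ptr_*/size_* pairs, in the spirit of Msg39Request's ptr_query/ptr_whiteList
struct ToyRequest {
	long  m_niceness;
	char *ptr_query;     long size_query;       // sizes include trailing NUL
	char *ptr_whiteList; long size_whiteList;
};

// flatten: fixed struct, then each variable buffer appended after it
static char *serialize ( const ToyRequest *r , long *blobSize ) {
	long need = sizeof(ToyRequest) + r->size_query + r->size_whiteList;
	char *buf = (char *)malloc ( need );
	if ( ! buf ) return NULL;
	char *p = buf;
	memcpy ( p , r , sizeof(ToyRequest) );               p += sizeof(ToyRequest);
	memcpy ( p , r->ptr_query     , r->size_query );     p += r->size_query;
	memcpy ( p , r->ptr_whiteList , r->size_whiteList ); p += r->size_whiteList;
	*blobSize = p - buf;
	return buf;
}

int main ( ) {
	ToyRequest r;
	r.m_niceness     = 0;
	r.ptr_query      = (char *)"test query";
	r.size_query     = (long)strlen(r.ptr_query) + 1;
	r.ptr_whiteList  = (char *)"ibm.com nytimes.com";
	r.size_whiteList = (long)strlen(r.ptr_whiteList) + 1;
	long blobSize = 0;
	char *blob = serialize ( &r , &blobSize );
	if ( blob ) printf ( "serialized %ld bytes\n" , blobSize );
	free ( blob );
	return 0;
}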

Msg3a.cpp

@@ -380,6 +380,8 @@ bool Msg3a::gotCacheReply ( ) {
// Query::expandQuery() above
m_r->ptr_query = m_q->m_orig;
m_r->size_query = m_q->m_origLen+1;
// the white list now too...
m_r->ptr_whiteList = si->m_whiteListBuf.getBufStart();
// free us?
if ( m_rbufPtr && m_rbufPtr != m_rbuf ) {
mfree ( m_rbufPtr , m_rbufSize, "Msg3a" );

Msg40.cpp

@@ -512,6 +512,8 @@ bool Msg40::getDocIds ( bool recall ) {
m_r.m_language = (unsigned char)m_si->m_queryLang;
m_r.ptr_query = m_si->m_q->m_orig;
m_r.size_query = m_si->m_q->m_origLen+1;
m_r.ptr_whiteList = m_si->m_whiteListBuf.getBufStart();
m_r.size_whiteList = m_si->m_whiteListBuf.length()+1;
m_r.m_timeout = -1; // auto-determine based on #terms
// make sure query term counts match in msg39
m_r.m_maxQueryTerms = m_si->m_maxQueryTerms;

PageResults.cpp

@@ -267,6 +267,13 @@ bool sendPageResults ( TcpSocket *s , HttpRequest *hr ) {
// propagate "admin" if set
long admin = hr->getLong("admin",-1);
if ( admin != -1 ) sb.safePrintf("&admin=%li",admin);
// propagate list of sites to restrict query to
long sitesLen;
char *sites = hr->getString("sites",&sitesLen,NULL);
if ( sites ) {
sb.safePrintf("&sites=");
sb.urlEncode ( sites,true);
}
// propagate "debug" if set
long debug = hr->getLong("debug",0);
if ( debug ) sb.safePrintf("&debug=%li",debug);
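
The hunk above re-appends the incoming "sites" parameter (URL-encoded) to the links this page builds, so paging through results keeps the whitelist restriction. A hypothetical round trip, with made-up site names and the standard results endpoint assumed, looks roughly like:

GET /search?q=test&sites=ibm.com+nytimes.com        (hypothetical request)
  -> hr->getString("sites",&sitesLen,NULL) yields "ibm.com nytimes.com"
  -> the page's follow-up links re-append &sites=<url-encoded list> as above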

SearchInput.cpp

@@ -341,7 +341,12 @@ if (! cr->hasSearchPermission ( sock, encapIp ) ) {
// based on if we're doing an xml feed, have a site: query, etc.
long xml = r->getLong ( "xml" , 0 ); // was "raw"
long siteLen = 0; r->getString ("site",&siteLen);
long sitesLen = 0; r->getString ("sites",&sitesLen);
long sitesLen = 0;
char *sites = r->getString ("sites",&sitesLen,NULL);
// save it if there
if ( sites && sitesLen > 0 && ! m_whiteListBuf.safeStrcpy(sites) )
return log("query: unable to strcpy whitelist");
// now override automatic defaults for special cases
if ( xml > 0 ) {
@@ -361,12 +366,12 @@ if (! cr->hasSearchPermission ( sock, encapIp ) ) {
m_doSiteClustering = false;
m_ipRestrictForTopics = false;
}
else if ( m_sitesLen > 0 ) {
else if ( m_whiteListBuf.length() > 0 ) {
m_ipRestrictForTopics = false;
}
m_doIpClustering = false;
m_sitesQueryLen = 0;
//m_sitesQueryLen = 0;
// set the user ip, "uip"
long uip = m_queryIP;
@@ -830,7 +835,7 @@ if (! cr->hasSearchPermission ( sock, encapIp ) ) {
else if ( m_q->m_hasIpField ) m_useCache = 0;
else if ( m_q->m_hasUrlField ) m_useCache = 0;
else if ( m_siteLen > 0 ) m_useCache = 0;
else if ( m_sitesLen > 0 ) m_useCache = 0;
else if ( m_whiteListBuf.length() ) m_useCache = 0;
else if ( m_urlLen > 0 ) m_useCache = 0;
}
}
@@ -909,11 +914,12 @@ bool SearchInput::setQueryBuffers ( ) {
numSites = 0;
csStr = get_charset_str(qcs);
/*
if ( m_sites && m_sites[0] ) {
char *s = m_sites;
char *t;
long len;
m_sbuf1.pushChar('(');//*p++ = '(';
m_sbuf1.pushChar('(');// *p++ = '(';
loop:
// skip white space
while ( *s && ! is_alnum_a(*s) ) s++;
@@ -930,7 +936,7 @@ bool SearchInput::setQueryBuffers ( ) {
//p += ucToUtf8(p, pend-p,s, len, csStr, 0,0);
m_sbuf1.safeMemcpy ( s , len );
//memcpy ( p , s , len ); p += len;
//*p++ = ' ';
// *p++ = ' ';
m_sbuf1.pushChar(' ');
s = t;
numSites++;
@@ -940,6 +946,7 @@ bool SearchInput::setQueryBuffers ( ) {
// inc totalLen
m_sitesQueryLen = m_sitesLen + (numSites * 10);
}
*/
// append site: term
if ( m_siteLen > 0 ) {
//if ( p > pstart ) *p++ = ' ';

SearchInput.h

@@ -304,8 +304,8 @@ class SearchInput {
char *m_htmlTail;
long m_siteLen;
char *m_site;
long m_sitesLen;
char *m_sites;
//long m_sitesLen;
//char *m_sites;
long m_plusLen;
char *m_plus;
long m_minusLen;
@@ -324,7 +324,9 @@ class SearchInput {
char *m_url;
long m_imgWidth;
long m_imgHeight;
long m_sitesQueryLen;
//long m_sitesQueryLen;
SafeBuf m_whiteListBuf;
// password is for all
long m_pwdLen;
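
Putting the SearchInput pieces together with the Msg40 and Msg39 hunks earlier, the whitelist's path from CGI parameter to query request is roughly the following, excerpted from the hunks above (error handling trimmed):

// SearchInput.cpp: grab the raw "sites" value and keep a NUL-terminated copy
long sitesLen = 0;
char *sites = r->getString ( "sites" , &sitesLen , NULL );
if ( sites && sitesLen > 0 ) m_whiteListBuf.safeStrcpy ( sites );

// Msg40.cpp: hand the buffer to the Msg39 request by pointer and length
m_r.ptr_whiteList  = m_si->m_whiteListBuf.getBufStart();
m_r.size_whiteList = m_si->m_whiteListBuf.length() + 1;

// Msg39.cpp: the request's ptr_whiteList is passed down to Msg2::getLists(),
// which reads one posdb termlist per listed site (see the Msg2.cpp hunks)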