9 Commits

13 changed files with 310 additions and 194 deletions

@ -55,7 +55,7 @@ Conf::Conf ( ) {
memset(m_dnsPorts, 0, sizeof(m_dnsPorts));
m_dnsCacheMaxAge = 0;
m_dnsCacheSize = 0;
m_dnsMaxCacheMem = 0;
m_dnsCacheMaxMem = 0;
m_askRootNameservers = false;
m_numRns = 0;
memset(m_rnsIps, 0, sizeof(m_rnsIps));
@ -117,6 +117,7 @@ Conf::Conf ( ) {
m_spiderDeadHostCheckInterval = 0;
m_spiderUrlCacheMaxAge = 0;
m_spiderUrlCacheSize = 0;
m_spiderUrlCacheMaxMem = 0;
m_indexdbMaxIndexListAge = 0;
m_udpMaxSockets = 0;
m_httpMaxSockets = 0;

4
Conf.h

@ -89,8 +89,7 @@ class Conf {
int64_t m_dnsCacheSize;
int64_t m_dnsCacheMaxAge;
int32_t m_dnsMaxCacheMem;
int64_t m_dnsCacheMaxMem;
SafeBuf m_proxyIps;
SafeBuf m_proxyAuth;
@ -200,6 +199,7 @@ class Conf {
int64_t m_spiderUrlCacheMaxAge;
int64_t m_spiderUrlCacheSize;
int64_t m_spiderUrlCacheMaxMem;
// indexdb has a max cached age for getting IndexLists (10 mins deflt)
int32_t m_indexdbMaxIndexListAge;

@ -151,7 +151,7 @@ bool Dns::init ( uint16_t clientPort ) {
// get primary dns server info from the conf class
m_dnsClientPort = clientPort; // g_conf.m_dnsClientPort;
// set the name of the cache. it will save to WORKDIR/{name}.dat
int32_t maxMem = g_conf.m_dnsMaxCacheMem ;
int32_t maxMem = g_conf.m_dnsCacheMaxMem ;
// . how many nodes in cache tree can we fit?
// . each rec is key (12) and ip(4)
// . overhead in cache is 56

229
FxCache.h Normal file

@ -0,0 +1,229 @@
//
// Copyright (C) 2017 Privacore ApS - https://www.privacore.com
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
// License TL;DR: If you change this file, you must publish your changes.
//
#ifndef FX_FXCACHE_H
#define FX_FXCACHE_H
#include <inttypes.h>
#include <stddef.h>
#include <unordered_map>
#include <deque>
#include <algorithm>
#include <sstream>
#include "Mem.h"
#include "GbMutex.h"
#include "ScopedLock.h"
#include "fctypes.h"
#include "Log.h"
/// Bounded key/value cache with per-item expiry.
///
/// Items live in an unordered_map; a deque remembers the order in which keys
/// were inserted so the oldest entry can be evicted first. Every public method
/// takes m_mtx for its whole duration.
///
/// The cache is limited in three ways (see configure()):
///   - max age   : items older than this are treated as missing
///   - max items : upper bound on the number of cached keys
///   - max size  : upper bound on the summed item sizes in bytes
/// Setting any of the three limits to 0 disables the cache.
///
/// NOTE: the size of an item is sizeof(TData). Memory owned indirectly by
/// TData (e.g. the heap buffer of a std::string) is not counted.
///
/// @tparam TKey     key type; must be streamable to std::ostream (used for trace logging)
/// @tparam TData    cached value type; must be copyable
/// @tparam TKeyHash hash functor for TKey
template <typename TKey, typename TData, typename TKeyHash = std::hash<TKey>>
class FxCache {
public:
	FxCache()
		: m_mtx()
		, m_queue()
		, m_map()
		, m_total_size(0)
		, m_max_age(300000) // 5 minutes
		, m_max_item(10000)
		, m_max_size(20000000) // 20 Mb
		, m_log_trace(false)
		, m_log_cache_name("cache") {
	}

	~FxCache() {
		clear();
	}

	/// Replace the cache limits and logging settings.
	/// @param max_age        maximum item age in milliseconds (0 disables the cache)
	/// @param max_item       maximum number of items (0 disables the cache)
	/// @param max_size       maximum summed item size in bytes (0 disables the cache)
	/// @param log_trace      enable trace logging
	/// @param log_cache_name name used in log lines; the pointer is stored, not copied
	void configure(int64_t max_age, size_t max_item, int64_t max_size, bool log_trace, const char *log_cache_name) {
		ScopedLock sl(m_mtx);

		m_max_age = max_age;
		m_max_item = max_item;
		m_max_size = max_size;
		m_log_trace = log_trace;
		m_log_cache_name = log_cache_name;

		// a disabled cache must not keep serving (or holding on to) old items
		if (disabled()) {
			clear_unlocked();
		}
	}

	/// Drop every cached item.
	void clear() {
		ScopedLock sl(m_mtx);
		clear_unlocked();
	}

	/// Insert or replace the item stored under key. No-op when the cache is disabled.
	void insert(const TKey &key, const TData &data) {
		ScopedLock sl(m_mtx);

		// cache disabled
		if (disabled()) {
			return;
		}

		// opportunistically drop the oldest item if it has expired
		purge_step();

		CacheItem item(data);

		if (m_log_trace) {
			logTrace(m_log_trace, "inserting key='%s' size=%zu to %s", getKeyStr(key).c_str(), item.m_dataSize, m_log_cache_name);
		}

		auto map_it = m_map.find(key);
		if (map_it == m_map.end()) {
			m_map.insert(std::make_pair(key, item));
		} else {
			// the replaced item no longer occupies space; without this the
			// running total would grow on every overwrite
			m_total_size -= static_cast<int64_t>(map_it->second.m_dataSize);
			map_it->second = item;

			// move the key to the back of the queue (it is now the newest item)
			auto queue_it = std::find(m_queue.begin(), m_queue.end(), key);
			if (queue_it != m_queue.end()) {
				m_queue.erase(queue_it);
			}
		}

		m_queue.push_back(key);
		m_total_size += static_cast<int64_t>(item.m_dataSize);

		// evict oldest items until we are back within both limits. a loop is
		// needed because the limits may have been lowered by configure()
		while (!m_queue.empty() && (m_queue.size() > m_max_item || m_total_size > m_max_size)) {
			purge_step(true);
		}
	}

	/// Look up key and copy the cached value into *data.
	/// @return true when a non-expired item was found; *data is untouched otherwise
	bool lookup(const TKey &key, TData *data) {
		ScopedLock sl(m_mtx);

		// cache disabled
		if (disabled()) {
			return false;
		}

		purge_step();

		auto map_it = m_map.find(key);
		if (map_it == m_map.end()) {
			logTrace(m_log_trace, "unable to find key='%s' in %s", getKeyStr(key).c_str(), m_log_cache_name);
			return false;
		}

		if (expired(map_it->second)) {
			logTrace(m_log_trace, "expired key='%s' in %s", getKeyStr(key).c_str(), m_log_cache_name);
			return false;
		}

		logTrace(m_log_trace, "found key='%s' in %s", getKeyStr(key).c_str(), m_log_cache_name);
		*data = map_it->second.m_data;
		return true;
	}

	/// Remove the item stored under key.
	/// @return true when a non-expired item was found and removed
	bool remove(const TKey &key) {
		ScopedLock sl(m_mtx);

		// cache disabled
		if (disabled()) {
			return false;
		}

		auto map_it = m_map.find(key);
		if (map_it != m_map.end() && !expired(map_it->second)) {
			logTrace(m_log_trace, "removing key='%s' in %s", getKeyStr(key).c_str(), m_log_cache_name);

			m_total_size -= static_cast<int64_t>(map_it->second.m_dataSize);
			m_map.erase(map_it);

			// erasing end() is undefined behaviour, so only erase a real hit
			auto queue_it = std::find(m_queue.begin(), m_queue.end(), key);
			if (queue_it != m_queue.end()) {
				m_queue.erase(queue_it);
			}

			return true;
		} else {
			logTrace(m_log_trace, "unable to find key='%s' in %s", getKeyStr(key).c_str(), m_log_cache_name);
			return false;
		}
	}

private:
	// non-copyable
	FxCache(const FxCache&) = delete;
	FxCache& operator=(const FxCache&) = delete;

	/// A cached value together with its creation time and accounted size.
	struct CacheItem {
		CacheItem(const TData &data)
			: m_timestamp(gettimeofdayInMilliseconds())
			, m_data(data)
			, m_dataSize(sizeof(data)) {
		}

		CacheItem(const TData &data, size_t dataSize)
			: m_timestamp(gettimeofdayInMilliseconds())
			, m_data(data)
			, m_dataSize(dataSize) {
		}

		int64_t m_timestamp; // creation time in msecs
		TData m_data;
		size_t m_dataSize;   // bytes accounted against m_max_size
	};

	/// true when the item is older than m_max_age
	bool expired(const CacheItem &item) const {
		return (item.m_timestamp + m_max_age < gettimeofdayInMilliseconds());
	}

	/// true when any limit is zero
	bool disabled() const {
		return (m_max_age == 0 || m_max_item == 0 || m_max_size == 0);
	}

	/// Drop everything. Caller must hold m_mtx.
	void clear_unlocked() {
		m_map.clear();
		m_queue.clear();
		// keep the size accounting in sync with the (now empty) containers
		m_total_size = 0;
	}

	static std::string getKeyStr(const TKey &key) {
		std::stringstream os;
		os << key;
		return os.str();
	}

	/// Remove the oldest item if it has expired, or unconditionally when forced.
	/// Caller must hold m_mtx.
	void purge_step(bool forced=false) {
		if (m_queue.empty()) {
			return;
		}

		// every queued key must have a matching map entry
		auto iter = m_map.find(m_queue.front());
		assert(iter != m_map.end());

		if (forced || expired(iter->second)) {
			logTrace(m_log_trace, "removing key='%s' in %s", getKeyStr(iter->first).c_str(), m_log_cache_name);

			m_total_size -= static_cast<int64_t>(iter->second.m_dataSize);
			m_map.erase(iter);
			m_queue.pop_front();
		}
	}

	GbMutex m_mtx;
	std::deque<TKey> m_queue;                            // queue of items to expire, ordered by expiration time
	std::unordered_map<TKey,CacheItem,TKeyHash> m_map;   // cached items
	int64_t m_total_size;                                // total data size in bytes

	int64_t m_max_age;                                   // max item age (expiry) in msecs
	size_t m_max_item;                                   // maximum number of items
	int64_t m_max_size;                                  // maximum total data size in bytes

	bool m_log_trace;
	const char *m_log_cache_name;
};
#endif //FX_FXCACHE_H

165
GbCache.h

@ -1,165 +0,0 @@
#ifndef GB_GBCACHE_H
#define GB_GBCACHE_H
#include <inttypes.h>
#include <stddef.h>
#include <unordered_map>
#include <deque>
#include <algorithm>
#include <sstream>
#include "Mem.h"
#include "GbMutex.h"
#include "ScopedLock.h"
#include "fctypes.h"
#include "Log.h"
/// Mutex-protected key/value cache bounded by item count and item age.
///
/// Values are stored in an unordered_map; a deque holds the keys in insertion
/// order so that the oldest key is always at the front. A max age or max item
/// count of 0 turns the cache off.
template <typename TKey, typename TData>
class GbCache {
public:
	GbCache()
		: m_mtx()
		, m_queue()
		, m_map()
		, m_max_age(300000) // 5 minutes
		, m_max_item(10000)
		, m_log_trace(false)
		, m_log_cache_name("cache") {
	}

	~GbCache() {
		clear();
	}

	/// Apply new limits and logging settings; empties the cache if the new
	/// settings disable it.
	/// @param max_age        item lifetime in milliseconds
	/// @param max_item       maximum number of cached keys
	/// @param log_trace      whether to emit trace log lines
	/// @param log_cache_name label used in trace log lines
	void configure(int64_t max_age, size_t max_item, bool log_trace, const char *log_cache_name) {
		ScopedLock guard(m_mtx);

		m_log_cache_name = log_cache_name;
		m_log_trace = log_trace;
		m_max_item = max_item;
		m_max_age = max_age;

		if (!disabled()) {
			return;
		}
		clear_unlocked();
	}

	/// Forget all cached entries.
	void clear() {
		ScopedLock guard(m_mtx);
		clear_unlocked();
	}

	/// Store data under key, replacing any previous value for that key.
	void insert(const TKey &key, const TData &data) {
		ScopedLock guard(m_mtx);

		if (disabled()) {
			// nothing is ever stored while the cache is off
			return;
		}

		// give the oldest entry a chance to age out first
		purge_step();

		if (m_log_trace) {
			logTrace(m_log_trace, "inserting key='%s' to %s", getKeyStr(key).c_str(), m_log_cache_name);
		}

		CacheItem fresh(data);

		auto existing = m_map.find(key);
		if (existing != m_map.end()) {
			// overwrite in place and pull the key out of its old queue slot
			existing->second = fresh;
			auto slot = std::find(m_queue.begin(), m_queue.end(), key);
			if (slot != m_queue.end()) {
				m_queue.erase(slot);
			}
		} else {
			m_map.insert(std::make_pair(key, fresh));
		}

		// the key is now the newest one
		m_queue.push_back(key);

		const bool overCapacity = m_queue.size() > m_max_item;
		if (overCapacity) {
			purge_step(true);
		}
	}

	/// Fetch the value stored under key into *data.
	/// @return true on a hit with a non-expired entry, false otherwise
	bool lookup(const TKey &key, TData *data) {
		ScopedLock guard(m_mtx);

		if (disabled()) {
			return false;
		}

		purge_step();

		auto hit = m_map.find(key);
		const bool usable = (hit != m_map.end()) && !expired(hit->second);
		if (!usable) {
			logTrace(m_log_trace, "unable to find key='%s' in %s", getKeyStr(key).c_str(), m_log_cache_name);
			return false;
		}

		logTrace(m_log_trace, "found key='%s' in %s", getKeyStr(key).c_str(), m_log_cache_name);
		*data = hit->second.m_data;
		return true;
	}

private:
	// copying is not supported (declared, never defined)
	GbCache(const GbCache&);
	GbCache& operator=(const GbCache&);

	/// Cached value plus the time it was stored.
	struct CacheItem {
		CacheItem(const TData &data)
			: m_timestamp(gettimeofdayInMilliseconds())
			, m_data(data) {
		}

		int64_t m_timestamp;
		TData m_data;
	};

	/// Has the item outlived m_max_age?
	bool expired(const CacheItem &item) const {
		const int64_t deadline = item.m_timestamp + m_max_age;
		return deadline < gettimeofdayInMilliseconds();
	}

	/// The cache is off unless both limits are non-zero.
	bool disabled() const {
		return !(m_max_age != 0 && m_max_item != 0);
	}

	/// Empty both containers; the caller already holds m_mtx.
	void clear_unlocked() {
		m_map.clear();
		m_queue.clear();
	}

	/// Render a key as text for log output.
	static std::string getKeyStr(const TKey &key) {
		std::stringstream text;
		text << key;
		return text.str();
	}

	/// Evict the oldest entry when it has expired, or always when forced.
	void purge_step(bool forced=false) {
		if (m_queue.empty()) {
			return;
		}

		auto oldest = m_map.find(m_queue.front());
		assert(oldest != m_map.end());

		if (!forced && !expired(oldest->second)) {
			return;
		}

		m_map.erase(oldest);
		m_queue.pop_front();
	}

	GbMutex m_mtx;
	std::deque<TKey> m_queue;                  // keys in insertion order, oldest first
	std::unordered_map<TKey,CacheItem> m_map;  // key -> cached item

	int64_t m_max_age;                         // item lifetime in msecs
	size_t m_max_item;                         // item count limit

	bool m_log_trace;
	const char *m_log_cache_name;
};
#endif //GB_GBCACHE_H

@ -6,7 +6,7 @@
#include "third-party/c-ares/ares.h"
#include "ip.h"
#include "GbThreadQueue.h"
#include "GbCache.h"
#include "FxCache.h"
#include <arpa/nameser.h>
#include <netdb.h>
#include <vector>
@ -25,7 +25,7 @@ static std::queue<struct DnsItem*> s_callbackQueue;
static GbMutex s_callbackQueueMtx;
static GbThreadQueue s_requestQueue;
static GbCache<std::string, GbDns::DnsResponse> s_cache;
static FxCache<std::string, GbDns::DnsResponse> s_cache;
static void a_callback(void *arg, int status, int timeouts, unsigned char *abuf, int alen);
static void ns_callback(void *arg, int status, int timeouts, unsigned char *abuf, int alen);
@ -187,7 +187,7 @@ bool GbDns::initializeSettings() {
}
}
s_cache.configure(g_conf.m_dnsCacheMaxAge*1000, g_conf.m_dnsCacheSize, g_conf.m_logTraceDnsCache, "dns cache");
s_cache.configure(g_conf.m_dnsCacheMaxAge*1000, g_conf.m_dnsCacheSize, g_conf.m_dnsCacheMaxMem, g_conf.m_logTraceDnsCache, "dns cache");
return true;
}

@ -3148,14 +3148,6 @@ void Parms::init ( ) {
m->m_obj = OBJ_CONF;
m++;
m->m_title = "dns max cache mem";
m->m_desc = "How many bytes should be used for caching DNS replies?";
simple_m_set(Conf,m_dnsMaxCacheMem);
m->m_def = "128000";
m->m_flags = PF_NOSYNC|PF_NOAPI;
m->m_page = PAGE_NONE;
m++;
m->m_title = "http max send buf size";
m->m_desc = "Maximum bytes of a doc that can be sent before having "
"to read more from disk";
@ -4995,6 +4987,17 @@ void Parms::init ( ) {
m->m_page = PAGE_MASTER;
m++;
// parm: memory limit in bytes for the spidered url cache (Conf::m_spiderUrlCacheMaxMem)
m->m_title = "spidered url cache max mem";
m->m_desc = "How many bytes should be used for caching spidered urls?";
m->m_cgi = "spurlcachemaxmem";
simple_m_set(Conf,m_spiderUrlCacheMaxMem);
m->m_def = "10000000";
m->m_units = "bytes";
m->m_group = true;
m->m_flags = PF_REBUILDSPIDERSETTINGS;
m->m_page = PAGE_MASTER;
m++;
m->m_title = "spider IP based url";
m->m_desc = "Should we spider IP based url (eg: http://127.0.0.1/)";
m->m_cgi = "spipurl";
@ -5816,6 +5819,17 @@ void Parms::init ( ) {
m->m_page = PAGE_MASTER;
m++;
m->m_title = "dns cache max mem";
m->m_desc = "How many bytes should be used for caching DNS replies?";
m->m_cgi = "dnscachemaxmem";
simple_m_set(Conf,m_dnsCacheMaxMem);
m->m_def = "128000";
m->m_units = "bytes";
m->m_group = false;
m->m_flags = PF_REBUILDDNSSETTINGS;
m->m_page = PAGE_MASTER;
m++;
m->m_title = "default collection";
m->m_desc = "When no collection is explicitly specified, assume "
"this collection name.";

@ -142,7 +142,7 @@ void SpiderLoop::init() {
}
// Pushes the spider url cache settings from g_conf into m_urlCache.
// The max age is multiplied by 1000 before being handed over; the cache keeps
// its max age in milliseconds, so the conf value is presumably in seconds
// -- TODO confirm.
// NOTE(review): two configure() calls are visible here, one with 4 and one
// with 5 arguments. This looks like the before/after pair of a diff hunk;
// confirm that only the 5-argument call belongs in the file.
void SpiderLoop::initSettings() {
m_urlCache.configure(g_conf.m_spiderUrlCacheMaxAge*1000, g_conf.m_spiderUrlCacheSize, g_conf.m_logTraceSpiderUrlCache, "spider url cache");
m_urlCache.configure(g_conf.m_spiderUrlCacheMaxAge*1000, g_conf.m_spiderUrlCacheSize, g_conf.m_spiderUrlCacheMaxMem, g_conf.m_logTraceSpiderUrlCache, "spider url cache");
}
void SpiderLoop::nukeWinnerListCache(collnum_t collnum) {

@ -8,7 +8,7 @@
#include "Msg5.h"
#include "hash.h"
#include "RdbCache.h"
#include "GbCache.h"
#include "FxCache.h"
#include <time.h>
#include <atomic>
@ -93,7 +93,7 @@ private:
HashTableX m_lockTable;
mutable GbMutex m_lockTableMtx;
GbCache<std::string, void*> m_urlCache;
FxCache<std::string, void*> m_urlCache;
// . list for getting next url(s) to spider
RdbList m_list;

@ -1,9 +1,9 @@
#include <gtest/gtest.h>
#include "GbCache.h"
#include "FxCache.h"
TEST(GbCacheTest, InsertLookup) {
GbCache<int64_t, std::string> cache;
cache.configure(60000, 10, false, "test");
TEST(FxCacheTest, InsertLookup) {
FxCache<int64_t, std::string> cache;
cache.configure(60000, 10, 1000000, true, "test");
int64_t key = 1;
cache.insert(key, std::to_string(key));
@ -13,9 +13,9 @@ TEST(GbCacheTest, InsertLookup) {
EXPECT_STREQ(std::to_string(key).c_str(), stored_data.c_str());
}
TEST(GbCacheTest, InsertLookupExpired) {
GbCache<int64_t, std::string> cache;
cache.configure(1000, 10, false, "test");
TEST(FxCacheTest, InsertLookupExpired) {
FxCache<int64_t, std::string> cache;
cache.configure(1000, 10, 1000000, true, "test");
int64_t key = 1;
cache.insert(key, std::to_string(key));
@ -25,9 +25,9 @@ TEST(GbCacheTest, InsertLookupExpired) {
EXPECT_FALSE(cache.lookup(key, &stored_data));
}
TEST(GbCacheTest, InsertLookupMaxed) {
GbCache<int64_t, std::string> cache;
cache.configure(60000, 5, false, "test");
TEST(FxCacheTest, InsertLookupMaxed) {
FxCache<int64_t, std::string> cache;
cache.configure(60000, 5, 1000000, true, "test");
for (int64_t key = 1; key <= 10; ++key) {
cache.insert(key, std::to_string(key));

@ -9,7 +9,7 @@ OBJECTS = GigablastTest.o GigablastTestUtils.o \
ContentTypeBlockListTest.o \
DirTest.o DnsBlockListTest.o \
FctypesTest.o \
GbCacheTest.o \
FxCacheTest.o \
HttpMimeTest.o \
JsonTest.o \
PosTest.o PosdbTest.o ProcessTest.o \

26
types.cpp Normal file

@ -0,0 +1,26 @@
//
// Copyright (C) 2017 Privacore ApS - https://www.privacore.com
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
// License TL;DR: If you change this file, you must publish your changes.
//
#include "types.h"
/// Stream the KEYSTR() text representation of a 96-bit key.
/// The text is emitted with an unformatted write, so stream width/fill
/// settings are not applied to it.
std::ostream& operator<<(std::ostream& os, const u_int96_t& key) {
	const char *text = KEYSTR(static_cast<const void*>(&key), sizeof(key));
	return os.write(text, strlen(text));
}

11
types.h

@ -4,6 +4,8 @@
#include <stdint.h>
#include <string.h>
#include "Sanity.h"
#include <functional>
#include <ostream>
//#include "collnum_t.h"
@ -718,5 +720,14 @@ static inline const char *KEYMAX() {
return (const char *)s_foo;
}
/// Hash functor for u_int96_t, usable as the hasher of unordered containers.
/// Hashes the n0 and n1 members separately and combines the two results.
struct KeyHash {
	std::size_t operator()(u_int96_t const& key) const noexcept {
		std::hash<uint64_t> hashN0;
		std::hash<uint32_t> hashN1;
		// shift one of the hashes so equal n0/n1 hashes do not cancel out
		return hashN0(key.n0) ^ (hashN1(key.n1) << 1);
	}
};
std::ostream& operator<<(std::ostream& os, const u_int96_t& key);
#endif // GB_TYPES_H