Msg3In: insert spiderrecords using batches

This commit is contained in:
Ivan Skytte Jørgensen
2017-10-26 15:38:44 +02:00
parent 127b80baa0
commit 02f27b42d3
3 changed files with 81 additions and 10 deletions

@ -379,12 +379,12 @@ static bool Msg4In::addMetaList(const char *p, UdpSlot *slot) {
break;
}
} else {
bool status = true;
for(auto const &item : rdbItem.second.m_items) {
status = SpiderdbRdbSqliteBridge::addRecord(item.m_collNum, item.m_rec, item.m_recSize);
if(!status)
break;
}
//transform record list into something the sqlite bridge understands
std::vector<SpiderdbRdbSqliteBridge::BatchedRecord> v;
for(auto const &item : rdbItem.second.m_items)
v.emplace_back(item.m_collNum, item.m_rec, item.m_recSize);
//then insert all at once
bool status = SpiderdbRdbSqliteBridge::addRecords(v);
if(!status)
break;
}

@ -9,23 +9,83 @@
#include "SpiderCache.h"
#include "SpiderColl.h"
#include "Conf.h"
#include <algorithm>
static bool addRecords(collnum_t collnum, std::vector<SpiderdbRdbSqliteBridge::BatchedRecord>::const_iterator begin, std::vector<SpiderdbRdbSqliteBridge::BatchedRecord>::const_iterator end);
static bool addRecord(collnum_t collnum, sqlite3 *db, const void *record, size_t record_len);
static bool addRequestRecord(sqlite3 *db, const void *record, size_t record_len);
static bool addReplyRecord(sqlite3 *db, const void *record, size_t record_len);
bool SpiderdbRdbSqliteBridge::addRecord(collnum_t collnum, const void *record, size_t record_len) {
if(KEYNEG((const char*)record)) {
log(LOG_ERROR,"sqlitespider: Got negative spiderrecord");
gbshutdownCorrupted();
bool SpiderdbRdbSqliteBridge::addRecords(const std::vector<BatchedRecord> &records) {
//copy&sort
auto records_copy(records);
std::sort(records_copy.begin(), records_copy.end(), [](const BatchedRecord &a, const BatchedRecord &b) {
return a.collnum < b.collnum;
});
//find ranges of same collnum, do each range at a time
auto range_begin = records_copy.begin();
while(range_begin != records_copy.end()) {
auto range_end = range_begin;
while(range_end != records_copy.end()) {
if(range_end->collnum == range_begin->collnum)
++range_end;
else
break;
}
if(!::addRecords(range_begin->collnum, range_begin, range_end))
return false;
range_begin = range_end;
}
return true;
}
static bool addRecords(collnum_t collnum, std::vector<SpiderdbRdbSqliteBridge::BatchedRecord>::const_iterator begin, std::vector<SpiderdbRdbSqliteBridge::BatchedRecord>::const_iterator end) {
sqlite3 *db = g_spiderdb_sqlite.getDb(collnum);
if(!db) {
log(LOG_ERROR,"sqlitespider: Could not get sqlite db for collection %d", collnum);
return false;
}
char *errmsg = NULL;
int rc = sqlite3_exec(db, "begin transaction", NULL, NULL, &errmsg);
if(rc!=SQLITE_OK) {
log(LOG_ERROR,"sqlitespider: could not start transaction: %s", errmsg);
return false;
}
for(auto iter = begin; iter!=end; ++iter) {
if(!addRecord(collnum, db, iter->record, iter->record_len)) {
sqlite3_exec(db, "rollback", NULL, NULL, &errmsg);
return false;
}
}
sqlite3_exec(db, "commit", NULL, NULL, &errmsg);
return true;
}
bool SpiderdbRdbSqliteBridge::addRecord(collnum_t collnum, const void *record, size_t record_len) {
sqlite3 *db = g_spiderdb_sqlite.getDb(collnum);
if(!db) {
log(LOG_ERROR,"sqlitespider: Could not get sqlite db for collection %d", collnum);
return false;
}
return addRecord(collnum,db,record,record_len);
}
static bool addRecord(collnum_t collnum, sqlite3 *db, const void *record, size_t record_len) {
if(KEYNEG((const char*)record)) {
log(LOG_ERROR,"sqlitespider: Got negative spiderrecord");
gbshutdownCorrupted();
}
bool rc;
if(Spiderdb::isSpiderRequest(reinterpret_cast<const key128_t *>(record)))
rc = addRequestRecord(db,record,record_len);

@ -2,6 +2,7 @@
#define SPIDERDB_RDB_SQLITE_BRIDGE_H_
#include "collnum_t.h"
#include <stddef.h>
#include <vector>
class RdbList;
class u_int128_t;
@ -13,6 +14,16 @@ namespace SpiderdbRdbSqliteBridge {
//Add a record (request or reply) to spiderdb. Returns false if something fails
bool addRecord(collnum_t collnum, const void *record, size_t record_len);
struct BatchedRecord {
collnum_t collnum;
const void *record;
size_t record_len;
BatchedRecord(collnum_t collnum_, const void *record_, size_t record_len_)
: collnum(collnum_), record(record_), record_len(record_len_)
{}
};
bool addRecords(const std::vector<BatchedRecord> &records);
//Fetch all records or a subset of the recoreds with startKey<=key<=endKey, and try to limit the rdblist size of recSizes
//Returns false on error
bool getList(collnum_t collnum,