forked from Mirrors/privacore-open-source-search-engine
Merge branch 'master' into nomerge2
This commit is contained in:
119
RdbBuckets.cpp
119
RdbBuckets.cpp
@ -263,7 +263,7 @@ bool RdbBucket::sort() {
|
||||
char *p = mergeBuf;
|
||||
char v;
|
||||
char *lastKey = NULL;
|
||||
int32_t br = 0; //bytesRemoved (abbreviated for column width)
|
||||
int32_t bytesRemoved = 0;
|
||||
int32_t dso = ks + sizeof(char*);//datasize offset
|
||||
int32_t numNeg = 0;
|
||||
|
||||
@ -275,9 +275,9 @@ bool RdbBucket::sort() {
|
||||
//this is a dup, we are removing data
|
||||
if (fixedDataSize != 0) {
|
||||
if (fixedDataSize == -1) {
|
||||
br += *(int32_t *)(lastKey + dso);
|
||||
bytesRemoved += *(int32_t *)(lastKey + dso);
|
||||
} else {
|
||||
br += fixedDataSize;
|
||||
bytesRemoved += fixedDataSize;
|
||||
}
|
||||
}
|
||||
if (KEYNEG(lastKey)) {
|
||||
@ -323,9 +323,9 @@ bool RdbBucket::sort() {
|
||||
//this is a dup, we are removing data
|
||||
if (fixedDataSize != 0) {
|
||||
if (fixedDataSize == -1) {
|
||||
br += *(int32_t *)(lastKey + dso);
|
||||
bytesRemoved += *(int32_t *)(lastKey + dso);
|
||||
} else {
|
||||
br += fixedDataSize;
|
||||
bytesRemoved += fixedDataSize;
|
||||
}
|
||||
}
|
||||
if (KEYNEG(lastKey)) {
|
||||
@ -342,9 +342,9 @@ bool RdbBucket::sort() {
|
||||
if (lastKey && KEYCMPNEGEQ(list2, lastKey, ks) == 0) {
|
||||
if (fixedDataSize != 0) {
|
||||
if (fixedDataSize == -1) {
|
||||
br += *(int32_t *)(lastKey + dso);
|
||||
bytesRemoved += *(int32_t *)(lastKey + dso);
|
||||
} else {
|
||||
br += fixedDataSize;
|
||||
bytesRemoved += fixedDataSize;
|
||||
}
|
||||
}
|
||||
if (KEYNEG(lastKey)) {
|
||||
@ -364,7 +364,7 @@ bool RdbBucket::sort() {
|
||||
|
||||
//we compacted out the dups, so reflect that here
|
||||
int32_t newNumKeys = (p - mergeBuf) / recSize;
|
||||
m_parent->updateNumRecs_unlocked(newNumKeys - m_numKeys, -br, -numNeg);
|
||||
m_parent->updateNumRecs_unlocked(newNumKeys - m_numKeys, -bytesRemoved, -numNeg);
|
||||
m_numKeys = newNumKeys;
|
||||
|
||||
if (m_keys != mergeBuf) {
|
||||
@ -633,7 +633,7 @@ RdbBuckets::RdbBuckets()
|
||||
m_dir = NULL;
|
||||
m_state = NULL;
|
||||
m_callback = NULL;
|
||||
m_saveErrno = 0;
|
||||
m_errno = 0;
|
||||
m_allocName = NULL;
|
||||
}
|
||||
|
||||
@ -2112,30 +2112,32 @@ bool RdbBuckets::fastSave(const char *dir, bool useThread, void *state, void (*c
|
||||
// note it
|
||||
logf(LOG_INFO, "db: Saving %s%s-buckets-saved.dat", dir, m_dbname);
|
||||
|
||||
// do not use thread for now!! test it to make sure that was
|
||||
// not the problem
|
||||
//useThread = false;
|
||||
|
||||
// save parms
|
||||
m_dir = dir;
|
||||
m_state = state;
|
||||
m_callback = callback;
|
||||
// assume no error
|
||||
m_saveErrno = 0;
|
||||
m_errno = 0;
|
||||
// no adding to the tree now
|
||||
m_isSaving = true;
|
||||
|
||||
// . this returns false and sets g_errno on error
|
||||
// . must now lock for each bucket when saving that bucket, but
|
||||
// release lock to breathe between buckets
|
||||
fastSave_unlocked();
|
||||
if (useThread) {
|
||||
// make this a thread now
|
||||
if (g_jobScheduler.submit(saveWrapper, saveDoneWrapper, this, thread_type_unspecified_io, 1/*niceness*/)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// store save error into g_errno
|
||||
g_errno = m_saveErrno;
|
||||
// resume adding to the tree
|
||||
m_isSaving = false;
|
||||
// we do not need to be saved now?
|
||||
m_needsSave = false;
|
||||
// if it failed
|
||||
if (g_jobScheduler.are_new_jobs_allowed()) {
|
||||
log(LOG_WARN, "db: Thread creation failed. Blocking while saving tree. Hurts performance.");
|
||||
}
|
||||
}
|
||||
|
||||
sl.unlock();
|
||||
|
||||
// no threads
|
||||
saveWrapper(this);
|
||||
saveDoneWrapper(this, job_exit_normal);
|
||||
|
||||
logTrace(g_conf.m_logTraceRdbBuckets, "END. Returning true.");
|
||||
|
||||
@ -2144,6 +2146,62 @@ bool RdbBuckets::fastSave(const char *dir, bool useThread, void *state, void (*c
|
||||
}
|
||||
|
||||
|
||||
void RdbBuckets::saveWrapper(void *state) {
|
||||
logTrace(g_conf.m_logTraceRdbBuckets, "BEGIN");
|
||||
|
||||
// get this class
|
||||
RdbBuckets *that = (RdbBuckets *)state;
|
||||
|
||||
ScopedLock sl(that->getLock());
|
||||
|
||||
// assume no error since we're at the start of thread call
|
||||
that->m_errno = 0;
|
||||
|
||||
// this returns false and sets g_errno on error
|
||||
that->fastSave_unlocked();
|
||||
|
||||
// . resume adding to the tree
|
||||
// . this will also allow other threads to be queued
|
||||
// . if we did this at the end of the thread we could end up with
|
||||
// an overflow of queued SAVETHREADs
|
||||
that->m_isSaving = false;
|
||||
|
||||
// we do not need to be saved now?
|
||||
that->m_needsSave = false;
|
||||
|
||||
if (g_errno && !that->m_errno) {
|
||||
that->m_errno = g_errno;
|
||||
}
|
||||
|
||||
if (that->m_errno) {
|
||||
log(LOG_ERROR, "db: Had error saving tree to disk for %s: %s.", that->m_dbname, mstrerror(that->m_errno));
|
||||
} else {
|
||||
log(LOG_INFO, "db: Done saving %s with %" PRId32" keys (%" PRId64" bytes)",
|
||||
that->m_dbname, that->m_numKeysApprox, that->m_bytesWritten);
|
||||
}
|
||||
|
||||
logTrace(g_conf.m_logTraceRdbBuckets, "END");
|
||||
}
|
||||
|
||||
/// @todo ALC cater for when exit_type != job_exit_normal
|
||||
// we come here after thread exits
|
||||
void RdbBuckets::saveDoneWrapper(void *state, job_exit_t exit_type) {
|
||||
logTrace(g_conf.m_logTraceRdbBuckets, "BEGIN");
|
||||
|
||||
// get this class
|
||||
RdbBuckets *that = (RdbBuckets*)state;
|
||||
|
||||
// store save error into g_errno
|
||||
g_errno = that->m_errno;
|
||||
|
||||
// . call callback
|
||||
if (that->m_callback) {
|
||||
that->m_callback(that->m_state);
|
||||
}
|
||||
|
||||
logTrace(g_conf.m_logTraceRdbBuckets, "END");
|
||||
}
|
||||
|
||||
// . returns false and sets g_errno on error
|
||||
// . NO USING g_errno IN A DAMN THREAD!!!!!!!!!!!!!!!!!!!!!!!!!
|
||||
bool RdbBuckets::fastSave_unlocked() {
|
||||
@ -2159,7 +2217,7 @@ bool RdbBuckets::fastSave_unlocked() {
|
||||
sprintf(s, "%s/%s-buckets-saving.dat", m_dir, m_dbname);
|
||||
int fd = ::open(s, O_RDWR | O_CREAT | O_TRUNC, getFileCreationFlags());
|
||||
if (fd < 0) {
|
||||
m_saveErrno = errno;
|
||||
m_errno = errno;
|
||||
log(LOG_ERROR, "db: Could not open %s for writing: %s.", s, mstrerror(errno));
|
||||
return false;
|
||||
}
|
||||
@ -2168,7 +2226,8 @@ bool RdbBuckets::fastSave_unlocked() {
|
||||
errno = 0;
|
||||
|
||||
// . save the header
|
||||
int64_t offset = fastSaveColl_unlocked(fd);
|
||||
// remember total bytes written
|
||||
m_bytesWritten = fastSaveColl_unlocked(fd);
|
||||
|
||||
// close it up
|
||||
close(fd);
|
||||
@ -2180,9 +2239,7 @@ bool RdbBuckets::fastSave_unlocked() {
|
||||
gbshutdownAbort(true);
|
||||
}
|
||||
|
||||
log(LOG_INFO, "db: RdbBuckets saved %" PRId32" keys, %" PRId64" bytes for %s", getNumKeys_unlocked(), offset, m_dbname);
|
||||
|
||||
return offset >= 0;
|
||||
return m_bytesWritten >= 0;
|
||||
}
|
||||
|
||||
int64_t RdbBuckets::fastSaveColl_unlocked(int fd) {
|
||||
@ -2234,7 +2291,7 @@ int64_t RdbBuckets::fastSaveColl_unlocked(int fd) {
|
||||
|
||||
// bitch on error
|
||||
if (errno) {
|
||||
m_saveErrno = errno;
|
||||
m_errno = errno;
|
||||
close(fd);
|
||||
log(LOG_ERROR, "db: Failed to save buckets for %s: %s.", m_dbname, mstrerror(errno));
|
||||
return -1;
|
||||
@ -2246,7 +2303,7 @@ int64_t RdbBuckets::fastSaveColl_unlocked(int fd) {
|
||||
// returns -1 on error
|
||||
if (offset < 0) {
|
||||
close(fd);
|
||||
m_saveErrno = errno;
|
||||
m_errno = errno;
|
||||
log(LOG_ERROR, "db: Failed to save buckets for %s: %s.", m_dbname, mstrerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
10
RdbBuckets.h
10
RdbBuckets.h
@ -28,6 +28,7 @@
|
||||
#include "rdbid_t.h"
|
||||
#include "types.h"
|
||||
#include "GbMutex.h"
|
||||
#include "JobScheduler.h"
|
||||
|
||||
class BigFile;
|
||||
class RdbList;
|
||||
@ -43,6 +44,8 @@ public:
|
||||
void clear();
|
||||
void reset();
|
||||
|
||||
GbMutex& getLock() { return m_mtx; }
|
||||
|
||||
bool set(int32_t fixedDataSize, int32_t maxMem, const char *allocName, rdbid_t rdbId, const char *dbname,
|
||||
char keySize);
|
||||
|
||||
@ -101,6 +104,9 @@ public:
|
||||
bool loadBuckets(const char *dbname);
|
||||
|
||||
private:
|
||||
static void saveWrapper(void *state);
|
||||
static void saveDoneWrapper(void *state, job_exit_t exit_type);
|
||||
|
||||
void reset_unlocked();
|
||||
|
||||
bool resizeTable_unlocked(int32_t numNeeded);
|
||||
@ -163,7 +169,9 @@ private:
|
||||
|
||||
void (*m_callback)(void *state);
|
||||
|
||||
int32_t m_saveErrno;
|
||||
int64_t m_bytesWritten;
|
||||
|
||||
int32_t m_errno;
|
||||
const char *m_allocName;
|
||||
};
|
||||
|
||||
|
Reference in New Issue
Block a user