Made Msge0 safe to be used by non-main thread

There were several assumptions that the callback was not called while submitting requests. THere is now a big, fat mutex on that.
This commit is contained in:
Ivan Skytte Jørgensen
2017-03-13 15:12:50 +01:00
parent 3fcdd92311
commit 0ee7a7136f
2 changed files with 22 additions and 15 deletions

@ -3,6 +3,7 @@
#include "Tagdb.h"
#include "ip.h"
#include "Mem.h"
#include "ScopedLock.h"
#include <new>
@ -21,6 +22,7 @@ Msge0::Msge0()
m_numRequests(0),
m_numReplies(0),
m_n(0),
m_mtx(),
m_state(NULL),
m_callback(NULL),
m_errno(0)
@ -117,13 +119,7 @@ bool Msge0::getTagRecs ( const char **urlPtrs ,
m_used[i] = false;
// . launch the requests
// . when a reply returns, the next request is launched for that url
// . we keep a msgESlot state for each active url in the buffer
// . we can have up to MAX_ACTIVE urls active
if ( ! launchRequests() ) return false;
// none blocked, we are done
return true;
return launchRequests();
}
// we only come back up here 1) in the very beginning or 2) when a url
@ -138,6 +134,7 @@ bool Msge0::launchRequests() {
// spider tables stuck with "getting outlink tag rec vector"statuses
const int32_t maxOut = g_udpServer.getNumUsedSlots() > 500 ? 1 : MAX_OUTSTANDING_MSGE0;
ScopedLock sl(m_mtx);
while(m_n < m_numUrls && m_numRequests - m_numReplies < maxOut) {
// if url is same host as the tagrec provided, just reference that!
if ( m_urlFlags && (m_urlFlags[m_n] & LF_SAMEHOST) && m_baseTagRec) {
@ -153,9 +150,7 @@ bool Msge0::launchRequests() {
// get the length
int32_t plen = strlen(p);
// . grab a slot
// . m_msg8as[i], m_msgCs[i], m_msg50s[i], m_msg20s[i]
int32_t i;
// make this 0 since "maxOut" now changes!!
for ( i = 0; i < MAX_OUTSTANDING_MSGE0 ; i++ )
if ( ! m_used[i] ) break;
// sanity check
@ -163,7 +158,7 @@ bool Msge0::launchRequests() {
// normalize the url
m_urls[i].set( p, plen );
// save the url number, "n"
m_ns [i] = m_n;
m_ns [i] = m_n++;
// claim it
m_used[i] = true;
@ -171,8 +166,6 @@ bool Msge0::launchRequests() {
// . this will start the pipeline for this url
m_numRequests++;
sendMsg8a(i);
// inc the url count
m_n++;
}
if( m_n >= m_numUrls )
@ -200,23 +193,34 @@ bool Msge0::sendMsg8a(int32_t slotIndex) {
// subsite.
if ( !m->getTagRec( &m_urls[slotIndex], m_collnum, m_niceness, m, gotTagRecWrapper, m_tagRecPtrs[n] ))
return false;
doneSending(slotIndex);
doneSending_unlocked(slotIndex);
return true;
}
void Msge0::gotTagRecWrapper(void *state) {
Msg8a *m = (Msg8a *)state;
//TagRec *m = (TagRec *)state;
Msg8a *m = reinterpret_cast<Msg8a*>(state);
Msge0 *THIS = m->m_msge0;
int32_t slotIndex = m->m_msge0State;
if(!THIS->m_used[slotIndex])
g_process.shutdownAbort(true);
THIS->doneSending(slotIndex);
// try to launch more, returns false if not done
if ( ! THIS->launchRequests() ) return;
// must be all done, call the callback
THIS->m_callback ( THIS->m_state );
}
void Msge0::doneSending(int32_t slotIndex) {
ScopedLock sl(m_mtx);
doneSending_unlocked(slotIndex);
}
void Msge0::doneSending_unlocked(int32_t slotIndex) {
// we are processing the nth url
int32_t n = m_ns[slotIndex];
// save the error if msg8a had one

@ -7,6 +7,7 @@
#include "Linkdb.h"
#include "Tagdb.h"
#include "GbMutex.h"
class Msge0 {
@ -36,6 +37,7 @@ private:
bool launchRequests();
bool sendMsg8a(int32_t slotIndex);
void doneSending(int32_t slotIndex);
void doneSending_unlocked(int32_t slotIndex);
collnum_t m_collnum;
int32_t m_niceness ;
@ -61,6 +63,7 @@ private:
int32_t m_numRequests;
int32_t m_numReplies;
int32_t m_n;
GbMutex m_mtx;
Url m_urls [ MAX_OUTSTANDING_MSGE0 ];
int32_t m_ns [ MAX_OUTSTANDING_MSGE0 ];