add logic to save memory when streaming

over 200 results back. should fix oom when
streaming back hundreds of thousands of results.
This commit is contained in:
Matt Wells
2014-02-26 20:33:35 -08:00
parent a0697e1bb5
commit b429f12346
3 changed files with 56 additions and 6 deletions

@ -784,6 +784,7 @@ class Msg20 {
void reset ();
long m_hack;
long m_i;
// is the reply in progress? if msg20 has not launched a request
// this is false. if msg20 received its reply, this is false.

@ -736,6 +736,8 @@ bool Msg40::reallocMsg20Buf ( ) {
// MDW: try to preserve the old Msg20s if we are being re-called
if ( m_buf2 ) {
// we do not do recalls when streaming yet
if ( m_si->m_streamResults ) { char *xx=NULL;*xx=0; }
// use these 3 vars for mismatch stat reporting
//long mismatches = 0;
//long long mismatch1 = 0LL;
@ -879,19 +881,30 @@ bool Msg40::reallocMsg20Buf ( ) {
return true;
}
// do the alloc
m_buf2 = NULL;
m_bufMaxSize2 = need;
m_numMsg20s = m_msg3a.m_numDocIds;
// when streaming because we can have hundreds of thousands of
// search results we recycle a few msg20s to save mem
if ( m_si->m_streamResults ) {
long max = MAX_OUTSTANDING_MSG20S;
if ( m_msg3a.m_numDocIds < max ) max = m_msg3a.m_numDocIds;
need = max * (4+sizeof(Msg20));
m_numMsg20s = max;
}
// do the alloc
if ( need ) m_buf2 = (char *)mmalloc ( need ,"Msg40msg20");
if ( need && ! m_buf2 ) { m_errno = g_errno; return false; }
// point to the mem
char *p = m_buf2;
// point to the array, then make p point to the Msg20 buffer space
m_msg20 = (Msg20 **)p; p += m_msg3a.m_numDocIds * sizeof(Msg20 *);
m_msg20 = (Msg20 **)p; p += m_numMsg20s * sizeof(Msg20 *);
// start free here
m_msg20StartBuf = p;
// set the m_msg20[] array to use this memory, m_buf20
for ( long i = 0 ; i < m_msg3a.m_numDocIds ; i++ ) {
for ( long i = 0 ; i < m_numMsg20s ; i++ ) {
// assume empty
m_msg20[i] = NULL;
// if clustered, do a NULL ptr
@ -906,7 +919,7 @@ bool Msg40::reallocMsg20Buf ( ) {
m_numToFree++;
}
// remember how many we got in here in case we have to realloc above
m_numMsg20s = m_msg3a.m_numDocIds;
//m_numMsg20s = m_msg3a.m_numDocIds;
return true;
}
@ -1024,8 +1037,19 @@ bool Msg40::launchMsg20s ( bool recalled ) {
//if ( i <= m_lastProcessedi ) continue;
// do not repeat for this i
m_lastProcessedi = i;
// start up a Msg20 to get the summary
Msg20 *m = m_msg20[i];
Msg20 *m = NULL;
if ( m_si->m_streamResults ) {
// there can be hundreds of thousands of results
// when streaming, so recycle a few msg20s to save mem
m = getAvailMsg20();
// mark it so we know which docid it goes with
m->m_i = i;
}
else
m = m_msg20[i];
// if msg20 ptr null that means the cluster level is not CR_OK
if ( ! m ) {
m_numRequests++;
@ -1209,6 +1233,25 @@ bool Msg40::launchMsg20s ( bool recalled ) {
return gotSummary ( );
}
Msg20 *Msg40::getAvailMsg20 ( ) {
for ( long i = 0 ; i < m_numMsg20s ; i++ ) {
// m_inProgress is set to false right before it
// calls Msg20::m_callback which is gotSummaryWrapper()
// so we should be ok with this
if ( ! m_msg20[i]->m_inProgress ) return m_msg20[i];
}
// how can this happen???
char *xx=NULL;*xx=0;
return NULL;
}
Msg20 *Msg40::getCompletedSummary ( long ix ) {
for ( long i = 0 ; i < m_numMsg20s ; i++ )
if ( m_msg20[i]->m_i == ix ) return m_msg20[i];
return NULL;
}
bool gotSummaryWrapper ( void *state ) {
Msg40 *THIS = (Msg40 *)state;
// inc it here
@ -1328,8 +1371,12 @@ bool Msg40::gotSummary ( ) {
// if we are waiting on our previous send to complete... wait..
if ( m_sendsOut > m_sendsIn ) break;
// get summary for result #m_printi
Msg20 *m20 = getCompletedSummary ( m_printi );
// otherwise, get the summary for result #m_printi
Msg20 *m20 = m_msg20[m_printi];
//Msg20 *m20 = m_msg20[m_printi];
if ( ! m20 ) {
log("msg40: m20 NULL #%li",m_printi);
continue;

@ -132,6 +132,8 @@ class Msg40 {
// keep these public since called by wrapper functions
bool gotDocIds ( ) ;
bool launchMsg20s ( bool recalled ) ;
class Msg20 *getAvailMsg20();
class Msg20 *getCompletedSummary ( long ix );
bool getSummaries ( ) ;
bool gotSummary ( ) ;
bool reallocMsg20Buf ( ) ;