make csv streamable, faster and take almost no memory.

This commit is contained in:
Matt Wells 2014-03-04 10:45:57 -08:00
parent 11efab9862
commit b1381cc610
3 changed files with 328 additions and 1 deletions

318
Msg40.cpp
View File

@ -1373,6 +1373,17 @@ bool Msg40::gotSummary ( ) {
printSearchResultsHeader ( st );
}
// when returning search results in csv let's get the first 100
// results and use those to determine the most common column headers
// for the csv. any results past those that have new json fields we
// will add a header for, but the column will not be labelled with
// the header name unfortunately.
m_needFirstReplies = 0;
if ( m_si->m_format == FORMAT_CSV ) {
m_needFirstReplies = m_msg3a.m_numDocIds;
if ( m_needFirstReplies > 100 ) m_needFirstReplies = 100;
}
for ( ; m_si && m_si->m_streamResults&&m_printi<m_msg3a.m_numDocIds ;
m_printi++){
// if we are waiting on our previous send to complete... wait..
@ -1381,6 +1392,34 @@ bool Msg40::gotSummary ( ) {
// get summary for result #m_printi
Msg20 *m20 = getCompletedSummary ( m_printi );
// if printing csv we need the first 100 results back
// to get the most popular csv headers for to print that
// as the first row in the csv output. if we print a
// results with a column not in the header row then we
// augment the headers then and there, although the header
// row will be blank for the new column, we can put
// the new header row at the end of the file i guess. this way
// we can immediately start streaming back the csv.
if ( m_needFirstReplies ) {
// need at least this many replies to process
if ( m_numReplies < m_needFirstReplies )
break;
// ensure we got the TOP needFirstReplies in order
// of their display to ensure consistency
long k;
for ( k = 0 ; k < m_needFirstReplies ; k++ ) {
Msg20 *xx = getCompletedSummary(k);
if ( ! xx ) break;
if ( ! xx->m_r ) break;
}
// if not all have come back yet, wait longer...
if ( k < m_needFirstReplies ) break;
// now make the csv header and print it
printCSVHeaderRow ( sb );
// and no longer need to do this logic
m_needFirstReplies = 0;
}
// otherwise, get the summary for result #m_printi
//Msg20 *m20 = m_msg20[m_printi];
@ -5002,3 +5041,282 @@ bool printHttpMime ( State0 *st ) {
sb->safeMemcpy(mime.getMime(),mime.getMimeLen() );
return true;
}
/////////////////
//
// CSV LOGIC from PageResults.cpp
//
/////////////////
// return 1 if a should be before b
static int csvPtrCmp ( const void *a, const void *b ) {
//JsonItem *ja = (JsonItem **)a;
//JsonItem *jb = (JsonItem **)b;
char *pa = *(char **)a;
char *pb = *(char **)b;
if ( strcmp(pa,"type") == 0 ) return -1;
if ( strcmp(pb,"type") == 0 ) return 1;
// force title on top
if ( strcmp(pa,"product.title") == 0 ) return -1;
if ( strcmp(pb,"product.title") == 0 ) return 1;
if ( strcmp(pa,"title") == 0 ) return -1;
if ( strcmp(pb,"title") == 0 ) return 1;
// otherwise string compare
int val = strcmp(pa,pb);
return val;
}
#include "Json.h"
// returns false and sets g_errno on error
bool printJsonItemInCSV ( char *json , SafeBuf *sb , State0 *st ) ;
//
// print header row in csv
//
bool Msg40::printCSVHeaderRow ( SafeBuf *sb ) {
//Msg40 *msg40 = &st->m_msg40;
//long numResults = msg40->getNumResults();
char tmp1[1024];
SafeBuf tmpBuf (tmp1 , 1024);
char tmp2[1024];
SafeBuf nameBuf (tmp2, 1024);
char nbuf[27000];
HashTableX nameTable;
if ( ! nameTable.set ( 8,4,2048,nbuf,27000,false,0,"ntbuf") )
return false;
long niceness = 0;
// . scan every fucking json item in the search results.
// . we still need to deal with the case when there are so many
// search results we have to dump each msg20 reply to disk in
// order. then we'll have to update this code to scan that file.
for ( long i = 0 ; i < m_needFirstReplies ; i++ ) {
Msg20 *m20 = getCompletedSummary(i);
if ( ! m20 ) break;
if ( m20->m_errno ) continue;
if ( ! m20->m_r ) { char *xx=NULL;*xx=0; }
Msg20Reply *mr = m20->m_r;
// get content
char *json = mr->ptr_content;
// how can it be empty?
if ( ! json ) continue;
// parse it up
Json jp;
jp.parseJsonStringIntoJsonItems ( json , niceness );
// scan each json item
for ( JsonItem *ji = jp.getFirstItem(); ji ; ji = ji->m_next ){
// skip if not number or string
if ( ji->m_type != JT_NUMBER &&
ji->m_type != JT_STRING )
return true;
// if in an array, do not print! csv is not
// good for arrays... like "media":[....] . that
// one might be ok, but if the elements in the
// array are not simple types, like, if they are
// unflat json objects then it is not well suited
// for csv.
if ( ji->isInArray() )
return true;
// reset length of buf to 0
tmpBuf.reset();
// . get the name of the item into "nameBuf"
// . returns false with g_errno set on error
if ( ! ji->getCompoundName ( tmpBuf ) )
return false;
// is it new?
long long h64 = hash64n ( tmpBuf.getBufStart() );
if ( nameTable.isInTable ( &h64 ) ) continue;
// record offset of the name for our hash table
long nameBufOffset = nameBuf.length();
// store the name in our name buffer
if ( ! nameBuf.safeStrcpy ( tmpBuf.getBufStart() ) )
return false;
if ( ! nameBuf.pushChar ( '\0' ) )
return false;
// it's new. add it
if ( ! nameTable.addKey ( &h64 , &nameBufOffset ) )
return false;
}
}
// . make array of ptrs to the names so we can sort them
// . try to always put title first regardless
char *ptrs [ 1024 ];
long numPtrs = 0;
for ( long i = 0 ; i < nameTable.m_numSlots ; i++ ) {
if ( ! nameTable.m_flags[i] ) continue;
long off = *(long *)nameTable.getValueFromSlot(i);
char *p = nameBuf.getBufStart() + off;
ptrs[numPtrs++] = p;
if ( numPtrs >= 1024 ) break;
}
// sort them
qsort ( ptrs , numPtrs , 4 , csvPtrCmp );
// set up table to map field name to column for printing the json items
HashTableX *columnTable = &m_columnTable;
if ( ! columnTable->set ( 8,4, numPtrs * 4,NULL,0,false,0,"coltbl" ) )
return false;
// now print them out as the header row
for ( long i = 0 ; i < numPtrs ; i++ ) {
if ( i > 0 && ! sb->pushChar(',') ) return false;
if ( ! sb->safeStrcpy ( ptrs[i] ) ) return false;
// record the hash of each one for printing out further json
// objects in the same order so columns are aligned!
long long h64 = hash64n ( ptrs[i] );
if ( ! columnTable->addKey ( &h64 , &i ) )
return false;
}
m_numCSVColumns = numPtrs;
if ( ! sb->pushChar('\n') )
return false;
if ( ! sb->nullTerm() )
return false;
return true;
}
// returns false and sets g_errno on error
bool Msg40::printJsonItemInCSV ( char *json , SafeBuf *sb ) { // , State0 *st
long niceness = 0;
// parse the json
Json jp;
jp.parseJsonStringIntoJsonItems ( json , niceness );
HashTableX *columnTable = &m_columnTable;
long numCSVColumns = m_numCSVColumns;
// make buffer space that we need
char ttt[1024];
SafeBuf ptrBuf(ttt,1024);
long maxCols = numCSVColumns;
// allow for additionals colls
maxCols += 100;
long need = maxCols * sizeof(JsonItem *);
if ( ! ptrBuf.reserve ( need ) ) return false;
JsonItem **ptrs = (JsonItem **)ptrBuf.getBufStart();
// reset json item ptrs for csv columns. all to NULL
memset ( ptrs , 0 , need );
char tmp1[1024];
SafeBuf tmpBuf (tmp1 , 1024);
JsonItem *ji;
///////
//
// print json item in csv
//
///////
for ( ji = jp.getFirstItem(); ji ; ji = ji->m_next ) {
// skip if not number or string
if ( ji->m_type != JT_NUMBER &&
ji->m_type != JT_STRING )
continue;
// skip if not well suited for csv (see above comment)
if ( ji->isInArray() ) continue;
// . get the name of the item into "nameBuf"
// . returns false with g_errno set on error
if ( ! ji->getCompoundName ( tmpBuf ) )
return false;
// is it new?
long long h64 = hash64n ( tmpBuf.getBufStart() );
long slot = columnTable->getSlot ( &h64 ) ;
// MUST be in there
if ( slot < 0 ) { char *xx=NULL;*xx=0;}
// get col #
long column = *(long *)columnTable->getValueFromSlot ( slot );
// sanity
if ( column >= numCSVColumns ) {
// add a new column...
long newColnum = numCSVColumns + 1;
// silently drop it if we already have too many cols
if ( newColnum >= maxCols ) continue;
columnTable->addKey ( &h64 , &newColnum );
column = newColnum;
numCSVColumns++;
//char *xx=NULL;*xx=0; }
}
// set ptr to it for printing when done parsing every field
// for this json item
ptrs[column] = ji;
}
// now print out what we got
for ( long i = 0 ; i < numCSVColumns ; i++ ) {
// , delimeted
if ( i > 0 ) sb->pushChar(',');
// get it
ji = ptrs[i];
// skip if none
if ( ! ji ) continue;
// skip "html" field... too spammy for csv and > 32k causes
// libreoffice calc to truncate it and break its parsing
if ( ji->m_name &&
//! ji->m_parent &&
strcmp(ji->m_name,"html")==0)
continue;
//
// get value and print otherwise
//
if ( ji->m_type == JT_NUMBER ) {
// print numbers without double quotes
if ( ji->m_valueDouble *10000000.0 ==
(double)ji->m_valueLong * 10000000.0 )
sb->safePrintf("%li",ji->m_valueLong);
else
sb->safePrintf("%f",ji->m_valueDouble);
continue;
}
// print the value
sb->pushChar('\"');
sb->csvEncode ( ji->getValue() , ji->getValueLen() );
sb->pushChar('\"');
}
sb->pushChar('\n');
sb->nullTerm();
return true;
}

View File

@ -206,6 +206,11 @@ class Msg40 {
long m_lastHeartbeat;
bool printSearchResult9 ( long ix ) ;
HashTableX m_columnTable;
bool printCSVHeaderRow ( class SafeBuf *sb );
bool printJsonItemInCSV ( char *json , SafeBuf *sb ) ;
long m_numCSVColumns;
HashTableX m_dedupTable;
@ -218,7 +223,9 @@ class Msg40 {
// incoming parameters
void *m_state;
void (* m_callback ) ( void *state );
long m_needFirstReplies;
// max outstanding msg20s
//long m_maxOutstanding;

View File

@ -225,6 +225,8 @@ bool sendBackDump ( TcpSocket *sock, HttpRequest *hr ) {
// which is super fast.
"dr=%li&"
"c=%s&n=1000000&"
// stream it now
"stream=1&"
// no summary similarity dedup, only exact
// doc content hash. otherwise too slow!!
"pss=0&"