a whole new threads stack

This commit is contained in:
Matt 2015-09-03 11:59:43 -06:00
parent 129c9d65db
commit b50f36cd73
5 changed files with 744 additions and 145 deletions

@ -1273,9 +1273,9 @@ bool readwrite_r ( FileState *fstate , ThreadEntry *t ) {
if ( t && t->m_callback == ohcrap ) return false;
// only set this now if we are the first one
if ( g_threads.m_threadQueues[DISK_THREAD].m_hiReturned ==
g_threads.m_threadQueues[DISK_THREAD].m_hiLaunched )
g_lastDiskReadStarted = fstate->m_startTime;
// if ( g_threads.m_threadQueues[DISK_THREAD].m_hiReturned ==
// g_threads.m_threadQueues[DISK_THREAD].m_hiLaunched )
// g_lastDiskReadStarted = fstate->m_startTime;
// fake it out
//static int32_t s_poo = 0;

@ -2017,12 +2017,12 @@ void Loop::doPoll ( ) {
// if shutting down was it a sigterm ?
if ( m_shutdown ) goto again;
// handle returned threads for niceness 0
if ( g_threads.m_needsCleanup )
g_threads.timedCleanUp(-3,0); // 3 ms
//if ( g_threads.m_needsCleanup )
g_threads.timedCleanUp(-3,0); // 3 ms
if ( m_inQuickPoll ) goto again;
// high niceness threads
if ( g_threads.m_needsCleanup )
g_threads.timedCleanUp(-4,MAX_NICENESS); //3 ms
//if ( g_threads.m_needsCleanup )
g_threads.timedCleanUp(-4,MAX_NICENESS); //3 ms
goto again;
}

@ -29,24 +29,31 @@ bool sendPageThreads ( TcpSocket *s , HttpRequest *r ) {
int32_t loActive = q->m_loLaunched - q->m_loReturned;
int32_t mdActive = q->m_mdLaunched - q->m_mdReturned;
int32_t hiActive = q->m_hiLaunched - q->m_hiReturned;
int32_t total = loActive + mdActive + hiActive;
// int32_t loActive = q->m_loLaunched - q->m_loReturned;
// int32_t mdActive = q->m_mdLaunched - q->m_mdReturned;
// int32_t hiActive = q->m_hiLaunched - q->m_hiReturned;
// int32_t total = loActive + mdActive + hiActive;
int32_t total = q->m_launched - q->m_returned;
p.safePrintf ( "<table %s>"
"<tr class=hdrow><td colspan=\"11\">"
//"<center>"
//"<font size=+1>"
"<b>Thread Type: %s"
" (low: %"INT32""
" med: %"INT32""
" high: %"INT32""
" total: %"INT32")</td></tr>",
// " (low: %"INT32""
// " med: %"INT32""
// " high: %"INT32""
" (launched: %"INT32" "
"returned: %"INT32" "
"total: %"INT32")</td></tr>",
TABLE_STYLE,
q->getThreadType(),
loActive, mdActive,
hiActive, total);
// loActive, mdActive,
// hiActive,
(int32_t)q->m_launched,
(int32_t)q->m_returned,
total);
p.safePrintf ("<tr bgcolor=#%s>"
@ -59,19 +66,20 @@ bool sendPageThreads ( TcpSocket *s , HttpRequest *r ) {
"<td><b>Callback</b></td>"
"<td><b>Routine</b></td>"
"<td><b>Bytes Done</b></td>"
"<td><b>KBytes/Sec</b></td>"
"<td><b>Megabytes/Sec</b></td>"
"<td><b>Read|Write</b></td>"
"</tr>"
, LIGHT_BLUE
);
for ( int32_t j = 0 ; j < q->m_top ; j++ ) {
for ( int32_t j = 0 ; j < q->m_maxEntries ; j++ ) {
ThreadEntry *t = &q->m_entries[j];
if(!t->m_isOccupied) continue;
FileState *fs = (FileState *)t->m_state;
bool diskThread = false;
if(q->m_threadType == DISK_THREAD && fs) diskThread = true;
if(q->m_threadType == DISK_THREAD && fs)
diskThread = true;
// might have got pre-called from EDISKSTUCK
if ( ! t->m_callback ) fs = NULL;
@ -81,18 +89,18 @@ bool sendPageThreads ( TcpSocket *s , HttpRequest *r ) {
if(t->m_isDone) {
p.safePrintf("<td><font color='red'><b>done</b></font></td>");
p.safePrintf("<td>%"INT32"</td>", t->m_niceness);
p.safePrintf("<td>%"INT64"</td>", t->m_launchedTime - t->m_queuedTime); //queued
p.safePrintf("<td>%"INT64"</td>", t->m_exitTime - t->m_launchedTime); //run time
p.safePrintf("<td>%"INT64"</td>", now - t->m_exitTime); //cleanup
p.safePrintf("<td>%"INT64"</td>", now - t->m_queuedTime); //total
p.safePrintf("<td>%"INT64"ms</td>", t->m_launchedTime - t->m_queuedTime); //queued
p.safePrintf("<td>%"INT64"ms</td>", t->m_exitTime - t->m_launchedTime); //run time
p.safePrintf("<td>%"INT64"ms</td>", now - t->m_exitTime); //cleanup
p.safePrintf("<td>%"INT64"ms</td>", now - t->m_queuedTime); //total
p.safePrintf("<td>%s</td>", g_profiler.getFnName((PTRTYPE)t->m_callback));
p.safePrintf("<td>%s</td>", g_profiler.getFnName((PTRTYPE)t->m_startRoutine));
if(diskThread && fs) {
int64_t took = (t->m_exitTime - t->m_launchedTime);
if(took <= 0) took = 1;
p.safePrintf("<td>%"INT32"/%"INT32"</td>", t->m_bytesToGo, t->m_bytesToGo);
p.safePrintf("<td>%.2f kbps</td>", (float)t->m_bytesToGo/took);
p.safePrintf("<td>%s</td>",t->m_doWrite? "Write":"Read");
p.safePrintf("<td>%.2f MB/s</td>", (float)t->m_bytesToGo/(1024.0*1024.0)/((float)took/1000.0));
p.safePrintf("<td>%s</td>",t->m_doWrite? "<font color=red>Write</font>":"Read");
}
else {
p.safePrintf("<td>--</td>");
@ -151,7 +159,7 @@ bool sendPageThreads ( TcpSocket *s , HttpRequest *r ) {
}
/*
int32_t loActiveBig = disk->m_loLaunchedBig - disk->m_loReturnedBig;
int32_t loActiveMed = disk->m_loLaunchedMed - disk->m_loReturnedMed;
int32_t loActiveSma = disk->m_loLaunchedSma - disk->m_loReturnedSma;
@ -208,7 +216,7 @@ bool sendPageThreads ( TcpSocket *s , HttpRequest *r ) {
"<td><b>Active Write Threads</b></td><td>%"INT32"</td>"
"</tr></table>",
activeWrites);
*/
return g_httpServer.sendDynamicPage ( s , (char*) p.getBufStart() ,
p.length() );

File diff suppressed because it is too large Load Diff

@ -59,6 +59,13 @@ class ThreadEntry {
bool m_needsJoin;
pthread_t m_joinTid;
class ThreadEntry *m_nextLink;
class ThreadEntry *m_prevLink;
// the waiting linked list we came from
ThreadEntry **m_bestHeadPtr;
ThreadEntry **m_bestTailPtr;
};
//#define MAX_THREAD_ENTRIES 1024
@ -85,6 +92,33 @@ class ThreadQueue {
int32_t m_entriesSize;
int32_t m_maxEntries;
// linked list head for launched thread entries
ThreadEntry *m_launchedHead;
// linked list head for empty thread entries
ThreadEntry *m_emptyHead;
// 8 heads/tails for linked lists of thread entries waiting to launch
ThreadEntry *m_waitHead0;
ThreadEntry *m_waitHead1;
ThreadEntry *m_waitHead2;
ThreadEntry *m_waitHead3;
ThreadEntry *m_waitHead4;
ThreadEntry *m_waitHead5;
ThreadEntry *m_waitHead6;
ThreadEntry *m_waitHead7;
ThreadEntry *m_waitTail0;
ThreadEntry *m_waitTail1;
ThreadEntry *m_waitTail2;
ThreadEntry *m_waitTail3;
ThreadEntry *m_waitTail4;
ThreadEntry *m_waitTail5;
ThreadEntry *m_waitTail6;
ThreadEntry *m_waitTail7;
/*
// counts the high/low priority (niceness <= 0) threads
int64_t m_hiLaunched;
int64_t m_hiReturned;
@ -114,6 +148,7 @@ class ThreadQueue {
int64_t m_mdReturnedSma;
int64_t m_loLaunchedSma;
int64_t m_loReturnedSma;
*/
// init
bool init (char threadType, int32_t maxThreads, int32_t maxEntries);
@ -143,6 +178,13 @@ class ThreadQueue {
// . returns false and sets errno on error
bool launchThread2 ( ThreadEntry *te );
bool launchThreadForReals ( ThreadEntry **headPtr ,
ThreadEntry **tailPtr ) ;
void removeThreads2 ( ThreadEntry **headPtr ,
ThreadEntry **tailPtr ,
class BigFile *bf ) ;
void print ( ) ;
// these are called by g_udpServer2, the high priority udp server
@ -245,10 +287,12 @@ class Threads {
int32_t getNumThreadQueues() { return m_numQueues; }
// used by UdpServer to see if it should call a low priority callback
int32_t getNumActiveHighPriorityCpuThreads() ;
//int32_t getNumActiveHighPriorityCpuThreads() ;
// all high priority threads...
int32_t getNumActiveHighPriorityThreads() ;
bool hasHighPriorityCpuThreads() ;
int32_t getNumThreadsOutOrQueued();
// counts the high/low priority (niceness <= 0) threads