Use GbMutex in UdpServer (and enable error checking in this branch)

This commit is contained in:
Ivan Skytte Jørgensen
2016-08-15 16:40:39 +02:00
parent 788fdf6bb1
commit 9128612253
3 changed files with 18 additions and 20 deletions

@ -82,6 +82,7 @@ OBJS = UdpSlot.o Rebalance.o \
# common flags
DEFS = -D_REENTRANT_ -I.
DEFS += -DDEBUG_MUTEXES
CPPFLAGS = -g -fno-stack-protector -DPTHREADS
CPPFLAGS += -std=c++11

@ -61,7 +61,6 @@ void UdpServer::reset() {
}
static pthread_mutex_t errorcheck_mutex = PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP; //tmp hack
UdpServer::UdpServer ( ) {
m_sock = -1;
m_slots = NULL;
@ -69,8 +68,6 @@ UdpServer::UdpServer ( ) {
m_buf = NULL;
m_outstandingConverts = 0;
m_writeRegistered = false;
memcpy(&m_mtx,&errorcheck_mutex,sizeof(m_mtx));
//pthread_mutex_init(&m_mtx);
}
UdpServer::~UdpServer() {
@ -721,7 +718,7 @@ bool UdpServer::sendPoll(bool allowResends, int64_t now) {
// . f(x) = a*(now - startTime) + b/msgSize
// . verified that this is not interruptible
UdpSlot *UdpServer::getBestSlotToSend ( int64_t now ) {
assert(pthread_mutex_lock(&m_mtx)==EDEADLK);
m_mtx.verify_is_locked();
// . we send msgs that are mostly "caught up" with their acks first
// . the slot with the lowest score gets sent
// . re-sends have priority over NONre-sends(ACK was not recvd in time)
@ -1438,12 +1435,12 @@ bool UdpServer::makeCallbacks(int32_t niceness) {
// . return false on error and sets g_errno, true otherwise
// . return true if we called one
// . skip to next slot if did not call callback/handler
pthread_mutex_unlock(&m_mtx);
pthread_mutex_unlock(&m_mtx.mtx);
if (!makeCallback(slot)) {
pthread_mutex_lock(&m_mtx);
pthread_mutex_lock(&m_mtx.mtx);
continue;
}
pthread_mutex_lock(&m_mtx);
pthread_mutex_lock(&m_mtx.mtx);
// remove it from the callback list to avoid re-call
removeFromCallbackLinkedList(slot);
@ -2361,7 +2358,7 @@ bool UdpServer::timeoutDeadHosts ( Host *h ) {
// verified that this is not interruptible
UdpSlot *UdpServer::getEmptyUdpSlot(key_t k, bool incoming) {
assert(pthread_mutex_lock(&m_mtx)==EDEADLK);
m_mtx.verify_is_locked();
UdpSlot *slot = removeFromAvailableLinkedList();
// return NULL if none left
@ -2445,7 +2442,7 @@ UdpSlot* UdpServer::removeFromAvailableLinkedList() {
}
void UdpServer::addToCallbackLinkedList(UdpSlot *slot) {
assert(pthread_mutex_lock(&m_mtx)==EDEADLK);
m_mtx.verify_is_locked();
// debug log
if (g_conf.m_logDebugUdp) {
if (slot->getErrno()) {
@ -2476,7 +2473,7 @@ void UdpServer::addToCallbackLinkedList(UdpSlot *slot) {
}
bool UdpServer::isInCallbackLinkedList(UdpSlot *slot) {
assert(pthread_mutex_lock(&m_mtx)==EDEADLK);
m_mtx.verify_is_locked();
// return if not in the linked list
if ( slot->m_callbackListPrev || slot->m_callbackListNext || m_callbackListHead == slot ) {
return true;
@ -2485,7 +2482,7 @@ bool UdpServer::isInCallbackLinkedList(UdpSlot *slot) {
}
void UdpServer::removeFromCallbackLinkedList(UdpSlot *slot) {
assert(pthread_mutex_lock(&m_mtx)==EDEADLK);
m_mtx.verify_is_locked();
logDebug(g_conf.m_logDebugUdp, "udp: removing slot=%p from callback list", slot);
// return if not in the linked list
@ -2514,7 +2511,7 @@ void UdpServer::removeFromCallbackLinkedList(UdpSlot *slot) {
}
void UdpServer::addToActiveLinkedList(UdpSlot *slot) {
assert(pthread_mutex_lock(&m_mtx)==EDEADLK);
m_mtx.verify_is_locked();
logDebug(g_conf.m_logDebugUdp, "udp: adding slot=%p to active list", slot);
// put the used slot at the tail so older slots are at the head and
@ -2536,7 +2533,7 @@ void UdpServer::addToActiveLinkedList(UdpSlot *slot) {
}
void UdpServer::removeFromActiveLinkedList(UdpSlot *slot) {
assert(pthread_mutex_lock(&m_mtx)==EDEADLK);
m_mtx.verify_is_locked();
logDebug(g_conf.m_logDebugUdp, "udp: removing slot=%p from active list", slot);
// return if not in the linked list
@ -2566,7 +2563,7 @@ void UdpServer::removeFromActiveLinkedList(UdpSlot *slot) {
// verified that this is not interruptible
void UdpServer::freeUdpSlot(UdpSlot *slot ) {
assert(pthread_mutex_lock(&m_mtx)==EDEADLK);
m_mtx.verify_is_locked();
logDebug(g_conf.m_logDebugUdp, "udp: free slot=%p", slot);
removeFromActiveLinkedList(slot);
@ -2614,7 +2611,7 @@ void UdpServer::freeUdpSlot(UdpSlot *slot ) {
void UdpServer::cancel ( void *state , msg_type_t msgType ) {
// . if we have transactions in progress wait
// . but if we're waiting for a reply, don't bother
pthread_mutex_lock(&m_mtx);
pthread_mutex_lock(&m_mtx.mtx);
for ( UdpSlot *slot = m_activeListHead ; slot ; slot = slot->m_activeListNext ) {
// skip if not a match
if (slot->m_state != state || slot->getMsgType() != msgType) {
@ -2627,11 +2624,11 @@ void UdpServer::cancel ( void *state , msg_type_t msgType ) {
// let them know why we are calling the callback prematurely
g_errno = ECANCELLED;
// stop waiting for reply, this will call destroySlot(), too
pthread_mutex_unlock(&m_mtx);
pthread_mutex_unlock(&m_mtx.mtx);
makeCallback(slot);
pthread_mutex_lock(&m_mtx);
pthread_mutex_lock(&m_mtx.mtx);
}
pthread_mutex_unlock(&m_mtx);
pthread_mutex_unlock(&m_mtx.mtx);
}
void UdpServer::replaceHost ( Host *oldHost, Host *newHost ) {

@ -40,7 +40,7 @@
#include "Hostdb.h"
#include "Loop.h" // loop class that handles signals on our socket
#include "UdpStatistic.h"
#include <pthread.h>
#include "GbMutex.h"
static const int64_t udpserver_sendrequest_infinite_timeout = 999999999999;
@ -270,7 +270,7 @@ private:
// . msgTypes go from 0 to 64 i think (see UdpProtocol.h dgram header)
void (* m_handlers[MAX_MSG_TYPES])(UdpSlot *slot, int32_t niceness);
pthread_mutex_t m_mtx; //mutex protecting this instance.
GbMutex m_mtx; //mutex protecting this instance.
// when a call to sendto() blocks we set this to true so Loop.cpp
// will know to manually call sendPoll() rather than counting