356 lines
14 KiB
C++
356 lines
14 KiB
C++
// Copyright Matt Wells Nov 2000
|
|
|
|
// TODO: use temporary bufs for each UdpSlot to avoid mallocs for reads
|
|
|
|
// . A reliable udp/datagram server
|
|
// . non-blocking, no threads
|
|
// . works on I/O interrupts by registering callbacks with the Loop class
|
|
// . great for broadcasting to many non-local IPs
|
|
// . great for communicating with thousands of random machines to avoid
|
|
// the connect/close overhead associated with TCP
|
|
// . the UdpSlot holds the details for one transaction
|
|
// . the UdpSlot is like a socket
|
|
// . we use transactionIds to link incoming replies w/ the initiating requests
|
|
// . the key of the UdpSlot is based on transactionId and ip/port of dest. host
|
|
// . when sending a request you supply a callback to be called on completion
|
|
// of that transaction
|
|
// . UdpSlot's m_errno will be set if an error or timeout occurred
|
|
// . UdpSlot's m_readBuf/m_readBufSize will hold the reply
|
|
// . you register your request handlers w/ UdpServer::registerHandler()
|
|
// . msgs are routed to handling routines based on msgType in the dgram
|
|
// . you can change the protocol by changing UdpProtocol
|
|
// . UdpProtocol just contains many virtual datagram parsing functions
|
|
// . UdpProtocol's default protocol is the Mattster Protocol(see UdpProtocol.h)
|
|
// . Dns.h overrides UdpProtocol to form DnsProtocol
|
|
// . we can send up to ACK_WINDOW_SIZE dgrams before requiring the reception
|
|
//   of the ACK for the first dgram we sent
|
|
// . this ACK window helps highPing/highBandwidth connections (distant hosts)
|
|
// . readPoll(), sendAckPoll(), readTimeoutPoll() can call callbacks/handlers
|
|
|
|
#ifndef GB_UDPSERVER_H
|
|
#define GB_UDPSERVER_H
|
|
|
|
#include "UdpStatistic.h"
|
|
#include "UdpProtocol.h"
|
|
#include "GbMutex.h"
|
|
#include <inttypes.h>
|
|
#include <atomic>
|
|
|
|
|
|
// . value named as the "infinite" timeout for UdpServer::sendRequest()
// . sendRequest()'s timeout parameter is in milliseconds, so this is
//   roughly 31.7 years
// . NOTE(review): whether sendRequest() special-cases this value is not
//   visible in this header
static const int64_t udpserver_sendrequest_infinite_timeout = INT64_C(999999999999);
|
|
|
|
// . forward declaration: a UdpSlot holds the details for one transaction
// . only pointers to UdpSlot appear in this header, so the full definition
//   is not needed here
class UdpSlot;
|
|
// . forward declaration: Host is only used by pointer in this header
//   (see UdpServer::replaceHost())
class Host;
|
|
|
|
class UdpServer {
|
|
public:
|
|
UdpServer() ;
|
|
~UdpServer() ;
|
|
|
|
// free send/readBufs for all slots
|
|
void reset();
|
|
|
|
// . returns false and sets errno on error
|
|
// . we use this udp "port" for all this server's reads/writes
|
|
// . we need to save and read out last transaction Id in a file to
|
|
// maintain the msgdb properly for incremental syncing
|
|
// . niceness of 0 is highest priority, 1 is lowest
|
|
// . read/writeBufSize are the socket buf's size
|
|
// . pollTime is how often to call timePollWrapper() (in milliseconds)
|
|
// . it should be at least the minimal slot timeout
|
|
bool init(uint16_t port, UdpProtocol *proto, int32_t readBufSize, int32_t writeBufSize, int32_t pollTime,
|
|
int32_t maxSlots, bool isDns);
|
|
|
|
// . sends a request
|
|
// . returns false and sets g_errno on error, true on success
|
|
// . callback will be called on reception of reply or on error
|
|
// . we will set errno before calling callback if an error occurred
|
|
// . errno may be ETIMEDOUT
|
|
// . we call destroySlot(slot) after calling the callback
|
|
// . NULLify slot->m_readBuf,slot->m_sendBuf if you don't want them
|
|
// to be freed by destroySlot(slot)
|
|
// . hostId is used to lookup Host's in g_hostdb to get/put resend time
|
|
// . if an ack isn't received after a certain time we resend the dgram
|
|
// . if endpoint calls sendErrorReply() to reply to you your callback
|
|
// will be called with errno set to what he passed to sendErrorReply
|
|
// . UdpSlot is like a udp socket
|
|
// . "msgType" may be stored in each dgram's header depending on
|
|
// if you're using UdpProtocol, DnsProtocol, ...
|
|
// . "msgType" is used to route the request to handling functions
|
|
// on the remote machine
|
|
// . backoff is how long to wait for an ACK in ms before we resend
|
|
// . we double backoff each time we wait w/o getting any ACK
|
|
// . don't wait longer than maxWait for a resend
|
|
// . if we try to resend a request dgram MORE than "maxResends" times,
|
|
// we do not resend it and we returns with g_errno set to ENOACK,
|
|
// indicating we have not gotten ANY ack for a dgram. if a host dies
|
|
// we typically have 10-20 seconds or so before marking it as dead.
|
|
// BUT with this method we should realize it is fruiteless in like
|
|
// 500 ms or so and Multicast or Proxy.cpp can re-send to another
|
|
// host. for niceness=0 requests the backoff is usually constant
|
|
// and set to about 30 ms. so if you set maxResends to 10 that is
|
|
// probably at least 300 ms of resending tries.
|
|
// . use an ip of 0 and port of 0 if you provide a hostId. use a hostid
|
|
// of -1 to indicate no hostid.
|
|
bool sendRequest(char *msg,
|
|
int32_t msgSize,
|
|
msg_type_t msgType,
|
|
uint32_t ip,
|
|
uint16_t port,
|
|
int32_t hostId,
|
|
UdpSlot **retSlot, // can be NULL
|
|
void *state, // callback state
|
|
void (*callback )(void *state, UdpSlot *slot),
|
|
int64_t timeout = 60000, // milliseconds
|
|
int32_t niceness = 1,
|
|
const char *extraInfo = NULL,
|
|
int32_t maxResends = -1);
|
|
|
|
// . send a reply to the host specified in "slot"
|
|
// . slot is destroyed on error or completion of the send
|
|
// . the "msg" will be freed unless slot->m_sendBufAlloc is set to NULL
|
|
// . backoff is how long to wait for an ACK in ms before we resend
|
|
// . we double backoff each time we wait w/o getting any ACK
|
|
// . don't wait longer than maxWait for a resend
|
|
void sendReply(char *msg, int32_t msgSize, char *alloc, int32_t allocSize, UdpSlot *slot, void *state = NULL,
|
|
void (*callback2)(void *state, UdpSlot *slot) = NULL);
|
|
|
|
// . propagate an errno to the requesting machine
|
|
// . his callback will be called with errno set to "errnum"
|
|
void sendErrorReply(UdpSlot *slot, int32_t errnum);
|
|
|
|
// . when a request/msg of type "msgType" is received we call the
|
|
// corresponding request handler on this machine
|
|
// . use this function to register a handler for a msgType
|
|
// . we do NOT destroy the slot after calling the handler
|
|
// . handler MUST call sendReply() or sendErrorReply() no matter what
|
|
// . returns true if handler registered successfully
|
|
// . returns false on error
|
|
// . if you want the handler to be called while in an async signal
|
|
// handler then set "isHandlerHot" to true
|
|
bool registerHandler ( msg_type_t msgType, void(* handler)(UdpSlot *,int32_t) );
|
|
|
|
// . frees the m_readBuf and m_sendBuf
|
|
// . marks the slot as available
|
|
// . called after callback called for a slot you used to send a request
|
|
// . called after sendReply()/sendErrorReply() completes or has error
|
|
void destroySlot ( UdpSlot *slot );
|
|
|
|
// . this will wait until all fully received requests have had their
|
|
// reply sent to them
|
|
// . in the meantime it will send back error replies to all new
|
|
// incoming requests
|
|
// . this will do a blocking close on the listening socket descriptor
|
|
// . returns false if blocked, true otherwise if shutdown immediate
|
|
// . set g_errno on error
|
|
bool shutdown ( bool urgent );
|
|
|
|
// try calling makeCallback() on all slots
|
|
bool makeCallbacks(int32_t niceness);
|
|
|
|
// cancel a transaction
|
|
void cancel(void *state, msg_type_t msgType);
|
|
|
|
// replace ips and ports in outstanding slots
|
|
void replaceHost ( Host *oldHost, Host *newHost );
|
|
|
|
int32_t getNumUsedSlots() const;
|
|
int32_t getNumUsedSlotsIncoming() const;
|
|
|
|
bool needBottom() const { return m_needBottom; }
|
|
|
|
bool getWriteRegistered() const { return m_writeRegistered; }
|
|
|
|
bool hasHandler(int i) const { return (m_handlers[i]); }
|
|
|
|
void saveActiveSlots(int fd, msg_type_t msg_type);
|
|
|
|
// . we need a transaction id for every transaction so we can match
|
|
// incoming reply msgs with their corresponding request msgs
|
|
// . TODO: should be stored to disk on shutdown and every 1024 sends
|
|
// . store a shutdown bit with it so we know if we crashed
|
|
// . on crashes add 1024 or so to the read value
|
|
// . TODO: make somewhat random cuz it's easy to spoof like it is now
|
|
int32_t m_nextTransId;
|
|
|
|
std::vector<UdpStatistic> getStatistics() const;
|
|
|
|
GbMutex& getLock() { return m_mtx; }
|
|
|
|
private:
|
|
static void readPollWrapper(int fd, void *state);
|
|
static void timePollWrapper(int fd, void *state);
|
|
static void sendPollWrapper(int fd, void *state);
|
|
|
|
// these *Poll() routines must be public so wrappers can call them
|
|
|
|
// . this is called by main/Loop.cpp when m_sock is ready for writing
|
|
// . actually it calls sendPollWrapper()
|
|
// . it sends as much as it can from all UdpSlots until one blocks
|
|
// or until it's done
|
|
// . sends both dgrams AND ACKs
|
|
bool sendPoll(bool allowResends, int64_t now);
|
|
|
|
// called every 30ms to get tokens? not any more...
|
|
void timePoll ( );
|
|
|
|
// called by readPoll()/sendPoll()/readTimeoutPoll() to do
|
|
// reading/sending/callbacks in that order until nothing left to do
|
|
void process(int64_t now, int32_t maxNiceness = 100);
|
|
|
|
// . this is called by main/Loop.cpp every second
|
|
// . actually it calls readTimeoutPollWrapper()
|
|
// . call the callback of reception slots that have timed out
|
|
// . return true if we did something, like reset a slot for resend
|
|
// or timed a slot out so it's callback should be called
|
|
bool readTimeoutPoll ( int64_t now ) ;
|
|
|
|
// available linked list functions (m_availableListHead)
|
|
void addToAvailableLinkedList_unlocked(UdpSlot *slot);
|
|
UdpSlot* removeFromAvailableLinkedList_unlocked();
|
|
|
|
// callback linked list functions (m_callbackListHead)
|
|
void addToCallbackLinkedList_unlocked(UdpSlot *slot);
|
|
bool isInCallbackLinkedList_unlocked(UdpSlot *slot);
|
|
void removeFromCallbackLinkedList_unlocked(UdpSlot *slot);
|
|
|
|
// active linkedlist functions (m_activeListHead)
|
|
void addToActiveLinkedList_unlocked(UdpSlot *slot);
|
|
void removeFromActiveLinkedList_unlocked(UdpSlot *slot);
|
|
|
|
// . we maintain a sequential list of transaction ids to guarantee
|
|
// uniquness to a point
|
|
// . if server is restarted this will go back to 0 though
|
|
// . the key of a UdpSlot is based on this, the endpoint ip/port and
|
|
// whether it's a request/reply by/from us
|
|
int32_t getTransId_unlocked();
|
|
|
|
void destroySlot_unlocked(UdpSlot *slot);
|
|
|
|
// . send as many dgrams as you can from slot's m_sendBuf
|
|
// . returns false and sets errno on error, true otherwise
|
|
bool doSending_unlocked(UdpSlot *slot, bool allowResends, int64_t now);
|
|
|
|
// . calls a m_handler request handler if slot->m_callback is NULL
|
|
// which means it was an incoming request
|
|
// . otherwise calls slot->m_callback because it was an outgoing
|
|
// request
|
|
bool makeCallback(UdpSlot *slot);
|
|
|
|
// . picks the slot that is most caught up to it's ACKs
|
|
// . picks resends first, however
|
|
// . then we send a dgram from that slot
|
|
UdpSlot *getBestSlotToSend_unlocked(int64_t now);
|
|
|
|
// . reads a pending dgram on the udp stack
|
|
// . returns -1 on error, 0 if blocked, 1 if completed reading dgram
|
|
// . called by readPoll()
|
|
int32_t readSock(UdpSlot **slot, int64_t now);
|
|
|
|
void sendReply_unlocked(char *msg, int32_t msgSize, char *alloc, int32_t allocSize, UdpSlot *slot, void *state = NULL,
|
|
void (*callback2)(void *state, UdpSlot *slot) = NULL);
|
|
|
|
void sendErrorReply_unlocked(UdpSlot *slot, int32_t errnum);
|
|
|
|
// . we have up to 1 handler routine for each msg type
|
|
// . call these handlers for the corresponding msgType
|
|
// . msgTypes go from 0 to 64 i think (see UdpProtocol.h dgram header)
|
|
void (* m_handlers[MAX_MSG_TYPES])(UdpSlot *slot, int32_t niceness);
|
|
|
|
mutable GbMutex m_mtx; //mutex protecting this instance.
|
|
|
|
// when a call to sendto() blocks we set this to true so Loop.cpp
|
|
// will know to manually call sendPoll() rather than counting
|
|
// on receiving a fd-ready-for-writing signal for this UdpServer
|
|
bool m_needToSend;
|
|
|
|
// our listening/sending udp socket and port
|
|
int m_sock;
|
|
uint16_t m_port;
|
|
|
|
// for defining your own protocol on top of udp
|
|
UdpProtocol *m_proto;
|
|
|
|
bool m_isShuttingDown;
|
|
|
|
// did we have to give back control before we called all of the
|
|
bool m_needBottom;
|
|
|
|
bool m_writeRegistered;
|
|
|
|
// . how many requests are we handling at this momment
|
|
// . does not include requests whose replies we are sending, only
|
|
// those whose replies have not yet been generated
|
|
// . starts counting as soon as first dgram of request is recvd
|
|
int32_t m_requestsInWaiting;
|
|
|
|
// like m_requestsInWaiting but requests which spawn other requests
|
|
int32_t m_msg07sInWaiting;
|
|
int32_t m_msg25sInWaiting;
|
|
int32_t m_msg39sInWaiting;
|
|
int32_t m_msg20sInWaiting;
|
|
int32_t m_msg0csInWaiting;
|
|
int32_t m_msg0sInWaiting;
|
|
|
|
// but alloc MAX_UDP_SLOTS of these in init so we don't blow the stack
|
|
UdpSlot *m_slots;
|
|
int32_t m_maxSlots;
|
|
|
|
// routines
|
|
UdpSlot *getEmptyUdpSlot_unlocked(key96_t k, bool incoming);
|
|
void freeUdpSlot_unlocked(UdpSlot *slot);
|
|
|
|
void addKey_unlocked(key96_t key, UdpSlot *ptr);
|
|
|
|
// verified these are only called from within _ass routines that
|
|
// turn them interrupts off before calling this
|
|
UdpSlot *getUdpSlot_unlocked(key96_t k);
|
|
|
|
// . hash table for converting keys to slots
|
|
// . if m_ptrs[i] is NULL, ith bucket is empty
|
|
UdpSlot **m_ptrs;
|
|
int32_t m_numBuckets;
|
|
uint32_t m_bucketMask;
|
|
char *m_buf; // memory to hold m_ptrs
|
|
int32_t m_bufSize;
|
|
|
|
// linked list of available slots (uses UdpSlot::m_next)
|
|
UdpSlot *m_availableListHead;
|
|
|
|
// linked list of slots in use
|
|
UdpSlot *m_activeListHead;
|
|
UdpSlot *m_activeListTail;
|
|
|
|
// linked list of callback candidates
|
|
UdpSlot *m_callbackListHead;
|
|
UdpSlot *m_callbackListTail;
|
|
|
|
int32_t m_numUsedSlots;
|
|
std::atomic<int32_t> m_numUsedSlotsIncoming;
|
|
|
|
|
|
bool m_isDns;
|
|
|
|
public:
|
|
// stats
|
|
std::atomic<int64_t> m_eth0BytesIn;
|
|
std::atomic<int64_t> m_eth0BytesOut;
|
|
std::atomic<int64_t> m_eth0PacketsIn;
|
|
std::atomic<int64_t> m_eth0PacketsOut;
|
|
std::atomic<int64_t> m_eth1BytesIn;
|
|
std::atomic<int64_t> m_eth1BytesOut;
|
|
std::atomic<int64_t> m_eth1PacketsIn;
|
|
std::atomic<int64_t> m_eth1PacketsOut;
|
|
|
|
std::atomic<int64_t> m_outsiderPacketsIn;
|
|
std::atomic<int64_t> m_outsiderPacketsOut;
|
|
std::atomic<int64_t> m_outsiderBytesIn;
|
|
std::atomic<int64_t> m_outsiderBytesOut;
|
|
};
|
|
|
|
// . declaration of the global UdpServer instance; the object itself is
//   defined in another translation unit
extern class UdpServer g_udpServer;
|
|
|
|
// . global 32-bit counter declared here and defined in another translation unit
// . NOTE(review): presumably counts dropped datagrams -- confirm at its
//   definition
extern int32_t g_dropped;
|
|
|
|
#endif // GB_UDPSERVER_H
|