42const uint64_t UDPServerIOThread::MESSAGEACK_RESENDTIMES[UDPServerIOThread::MESSAGEACK_RESENDTIMES_TRIES] = {125L, 250L, 500L, 750L, 1000L, 2000L, 5000L};
44UDPServerIOThread::UDPServerIOThread(
const unsigned int id,
UDPServer *server,
const unsigned int maxCCU) :
45 Thread(
"nioudpserveriothread"),
49 messageQueueMutex(
"nioupserveriothread_messagequeue"),
50 messageMapAckMutex(
"nioupserveriothread_messagequeueack") {
55 Console::println(
"UDPServerIOThread[" + to_string(
id) +
"]::run(): start");
70 uint64_t lastMessageQueueAckTime = Time::getCurrentMillis();
72 uint64_t now = Time::getCurrentMillis();
75 if (now >= lastMessageQueueAckTime + 25L) {
77 lastMessageQueueAckTime = now;
84 for(
unsigned int i = 0; i < (
unsigned int)events; i++) {
96 if (hasReadInterest) {
97 ssize_t bytesReceived;
103 while ((bytesReceived =
socket.
read(ip, port, (
void*)message,
sizeof(message))) > 0) {
110 stringstream* frame = NULL;
113 frame =
new stringstream();
114 frame->write(message, bytesReceived);
124 server->
identify(frame, messageType, clientId, messageId, retries);
127 switch(messageType) {
135 if (client != NULL) {
185 if (client->
ip != ip || client->
port != port) {
207 if (frame != NULL)
delete frame;
211 "UDPServerIOThread[" +
214 (RTTI::demangle(
typeid(exception).
name())) +
219 if (clientNew != NULL) {
223 if (client != NULL) {
234 while (hasWriteInterest) {
240 messageQueueBatch.push(message);
246 while (messageQueueBatch.empty() ==
false) {
247 Message* message = messageQueueBatch.front();
255 messageQueueBatch.pop();
262 if (messageQueueBatch.empty() ==
true) {
273 hasWriteInterest =
false;
279 Message* message = messageQueueBatch.front();
281 messageQueueBatch.pop();
282 }
while (messageQueueBatch.empty() ==
false);
286 hasWriteInterest =
false;
296 "UDPServerIOThread[" +
299 (RTTI::demangle(
typeid(exception).
name())) +
310 Console::println(
"UDPServerIOThread[" + to_string(
id) +
"]::run(): done");
319 frame->seekg(0, ios_base::beg);
320 frame->seekp(0, ios_base::end);
324 message->
ip = client->
ip;
326 message->
time = Time::getCurrentMillis();
331 message->
bytes = frame->tellp();
332 if (message->
bytes > 512) message->
bytes = 512;
335 if (deleteFrame ==
true)
delete frame;
340 MessageMapAck::iterator it;
358 *messageAck = *message;
359 messageMapAck.insert(it, pair<uint32_t, Message*>(messageId, messageAck));
389 bool messageAckValid =
true;
390 MessageMapAck::iterator iterator;
397 Message* messageAck = iterator->second;
399 messageAckValid = messageAck->
ip == client->
ip && messageAck->
port == client->
port;
401 if (messageAckValid ==
true) {
403 delete iterator->second;
413 if (messageAckValid ==
false) {
420 uint64_t now = Time::getCurrentMillis();
425 Message* messageAck = it->second;
442 *message = *messageAck;
451 messageQueueResend.push(message);
458 if (messageQueueResend.empty() ==
false) {
461 Message* message = messageQueueResend.front();
463 messageQueueResend.pop();
474 }
while (messageQueueResend.empty() ==
false);
Base exception class for network server exceptions.
Base class for network UDP server clients.
void init()
initiates this network client
void shutdown()
Shuts down this network client.
const string & getKey() const
Client identification key.
virtual void onFrameReceived(stringstream *frame, const uint32_t messageId=0, const uint8_t retries=0)
Event, which will be called if frame has been received, defaults to worker thread pool.
const bool setKey(const string &key)
sets the clients identification key
void sendConnected()
Sends an connect message to client.
UDP Network server IO thread.
void sendMessage(const UDPServerClient *client, const uint8_t messageType, const uint32_t messageId, stringstream *frame, const bool safe, const bool deleteFrame)
pushes a message to be send, takes over ownership of frame
static const int MESSAGEACK_RESENDTIMES_TRIES
virtual void run()
thread program
MessageQueue messageQueue
MessageMapAck messageMapAck
void processAckMessages()
Clean up timed out safe messages, reissue messages not beeing acknowlegded from client.
void processAckReceived(UDPServerClient *client, const uint32_t messageId)
Processes an acknowlegdement reception.
queue< Message * > MessageQueue
static const int MESSAGEQUEUE_SEND_BATCH_SIZE
static const uint64_t MESSAGEACK_RESENDTIMES[MESSAGEACK_RESENDTIMES_TRIES]
Base class for network UDP servers.
UDPServer_Statistics statistics
void processAckReceived(UDPServerClient *client, const uint32_t messageId)
Processes an acknowlegdement reception.
void addClient(UDPServerClient *client)
maps a new client to a given client id
UDPServerClient * lookupClient(const uint32_t clientId)
Look ups a client by client id.
virtual void identify(stringstream *frame, MessageType &messageType, uint32_t &connectionId, uint32_t &messageId, uint8_t &retries)
Identifies a client message.
UDPServerClient * getClientByIp(const string &ip, const unsigned int port)
Returns client by host name and port.
@ MESSAGETYPE_ACKNOWLEDGEMENT
virtual void validate(stringstream *frame)
Validates a client message.
const uint32_t allocateClientId()
Allocates a client id for a new client.
virtual UDPServerClient * accept(const uint32_t clientId, const std::string &ip, const unsigned int port)
method to implement for accepting clients
virtual void writeHeader(stringstream *frame, MessageType messageType, const uint32_t clientId, const uint32_t messageId, const uint8_t retries)
Writes a message header to message.
Interface to kernel event mechanismns.
void shutdownKernelEventMechanism()
shutdowns the kernel event mechanism
void setSocketInterest(const NetworkSocket &socket, const NIOInterest lastInterest, const NIOInterest interest, const void *cookie)
sets a non blocked socket io interest
int doKernelEventMechanism()
do the kernel event mechanism
void initKernelEventMechanism(const unsigned int maxCCU)
initializes the kernel event mechanism
void decodeKernelEvent(const unsigned int index, NIOInterest &interest, void *&cookie)
decodes a kernel event
void close()
Closes the socket.
ssize_t read(string &from, unsigned int &port, void *buf, const size_t bytes)
reads a datagram from socket
ssize_t write(const string &to, const unsigned int port, void *buf, const size_t bytes)
writes up to "bytes" bytes to socket
bool wait()
Waits on barrier.
void unlock()
Unlocks this mutex.
void lock()
Locks the mutex, additionally mutex locks will block until other locks have been unlocked.
bool isStopRequested()
Returns if stop has been requested.
Run time type information utility class.
void releaseReference()
releases a reference, thus decrementing the counter and delete it if reference counter is zero
const NIOInterest NIO_INTEREST_NONE
const NIOInterest NIO_INTEREST_READ
uint8_t NIOInterest
type definiton for network UI interest
const NIOInterest NIO_INTEREST_WRITE
std::exception Exception
Exception base class.