TDME2 1.9.121
UDPServerIOThread.cpp
Go to the documentation of this file.
1#include <string.h>
2
3#include <iostream>
4#include <map>
5#include <string>
6#include <typeinfo>
7
8#include <tdme/tdme.h>
18#include <tdme/utilities/RTTI.h>
19#include <tdme/utilities/Time.h>
20
21using std::ios_base;
22using std::map;
23using std::pair;
24using std::string;
25using std::to_string;
26
41
// Resend thresholds in milliseconds for messages waiting for an acknowledgement.
// The ack processing code indexes this table with a message's current retry count and
// resends once "now" exceeds message->time + MESSAGEACK_RESENDTIMES[retries]; a message
// whose retry count has reached MESSAGEACK_RESENDTIMES_TRIES is dropped from the ack map.
// NOTE(review): the visible resend code does not update message->time, so the values
// appear to be offsets from the original send time rather than per-retry gaps - confirm.
42const uint64_t UDPServerIOThread::MESSAGEACK_RESENDTIMES[UDPServerIOThread::MESSAGEACK_RESENDTIMES_TRIES] = {125L, 250L, 500L, 750L, 1000L, 2000L, 5000L};
43
/**
 * Constructs an UDP server IO thread.
 * Only stores the arguments and constructs the Thread base and the two mutexes with
 * their name strings; the constructor body itself does nothing.
 * @param id id of this IO thread, used as prefix in the log output of this class
 * @param server UDP server this IO thread works for
 * @param maxCCU presumably the maximum number of concurrent clients (TODO confirm);
 *               sendMessage() limits its queues to maxCCU * 20 entries
 */
44UDPServerIOThread::UDPServerIOThread(const unsigned int id, UDPServer *server, const unsigned int maxCCU) :
45 Thread("nioudpserveriothread"),
46 id(id),
47 server(server),
48 maxCCU(maxCCU),
49 messageQueueMutex("nioupserveriothread_messagequeue"),
50 messageMapAckMutex("nioupserveriothread_messagequeueack") {
51 //
52}
53
55 Console::println("UDPServerIOThread[" + to_string(id) + "]::run(): start");
56
57 // wait on startup barrier
59
60 // catch kernel event and server socket exceptions
61 try {
62 // create server socket
63 UDPSocket::createServerSocket(socket, server->host, server->port);
64
65 // initialize kernel event mechanism
68
69 // do event loop
70 uint64_t lastMessageQueueAckTime = Time::getCurrentMillis();
71 while(isStopRequested() == false) {
72 uint64_t now = Time::getCurrentMillis();
73
74 // process ack messages every 25ms
75 if (now >= lastMessageQueueAckTime + 25L) {
77 lastMessageQueueAckTime = now;
78 }
79
80 // do kernel event mechanism
81 int events = kem.doKernelEventMechanism();
82
83 // iterate the event list
84 for(unsigned int i = 0; i < (unsigned int)events; i++) {
85 NIOInterest keInterest;
86 void* nil;
87
88 // decode kernel event
89 kem.decodeKernelEvent(i, keInterest, (void*&)nil);
90
91 // interests
92 bool hasReadInterest = (keInterest & NIO_INTEREST_READ) == NIO_INTEREST_READ;
93 bool hasWriteInterest = (keInterest & NIO_INTEREST_WRITE) == NIO_INTEREST_WRITE;
94
95 // process read interest
96 if (hasReadInterest) {
97 ssize_t bytesReceived;
98 string ip;
99 unsigned int port;
100 char message[512];
101
102 // receive datagrams as long as its size > 0 and read would not block
103 while ((bytesReceived = socket.read(ip, port, (void*)message, sizeof(message))) > 0) {
104 //
105 AtomicOperations::increment(server->statistics.received);
106
107 // process event, catch and handle client related exceptions
108 UDPServerClient* client = NULL;
109 UDPServerClient* clientNew = NULL;
110 stringstream* frame = NULL;
111 try {
112 // transfer buffer to string stream
113 frame = new stringstream();
114 frame->write(message, bytesReceived);
115
116 // validate datagram
117 server->validate(frame);
118
119 // identify datagram
120 UDPServer::MessageType messageType;
121 uint32_t clientId;
122 uint32_t messageId;
123 uint8_t retries;
124 server->identify(frame, messageType, clientId, messageId, retries);
125
126 // process message depending on messageType
127 switch(messageType) {
129 {
130 //
131 AtomicOperations::increment(server->statistics.accepts);
132
133 // check if client is connected already
134 client = server->getClientByIp(ip, port);
135 if (client != NULL) {
136 // delete frame
137 delete frame;
138 frame = NULL;
139 client->sendConnected();
140 client->releaseReference();
141 // we are done
142 break;
143 }
144
145 // create client
146 clientNew = server->accept(
148 ip,
149 port
150 );
151
152 // assign server
153 clientNew->server = server;
154
155 // add client to server
156 server->addClient(clientNew);
157
158 // delete frame
159 delete frame;
160 frame = NULL;
161
162 // switch from client new to client
163 client = clientNew;
164 clientNew = NULL;
165
166 // send connected ack
167 client->sendConnected();
168
169 // set/register client in Server
170 if (client->setKey(client->getKey()) == false) {
171 throw NetworkServerException("Client key is already in use");
172 }
173
174 // fire on init
175 client->init();
176
177 // we are done
178 break;
179 }
181 {
182 // look up client
183 client = server->lookupClient(clientId);
184 // check if client ip, port matches datagram ip and port
185 if (client->ip != ip || client->port != port) {
186 //
187 client->releaseReference();
188 throw NetworkServerException("message invalid");
189 }
190 // delegate
191 client->onFrameReceived(frame, messageId, retries);
192 break;
193 }
195 {
196 client = server->lookupClient(clientId);
197 server->processAckReceived(client, messageId);
198 delete frame;
199 frame = NULL;
200 break;
201 }
202 default:
203 throw NetworkServerException("Invalid message type");
204 }
205 } catch(Exception& exception) {
206 // delete frame
207 if (frame != NULL) delete frame;
208
209 // log
210 Console::println(
211 "UDPServerIOThread[" +
212 to_string(id) +
213 "]::run(): " +
214 (RTTI::demangle(typeid(exception).name())) +
215 ": " +
216 (exception.what())
217 );
218
219 if (clientNew != NULL) {
220 delete clientNew;
221 }
222 // in case it was a client related exception
223 if (client != NULL) {
224 // otherwise shut down client
225 client->shutdown();
226 }
227 //
228 AtomicOperations::increment(server->statistics.errors);
229 }
230 }
231 }
232
233 // process write interest
234 while (hasWriteInterest) {
235 // fetch batch of messages to be sent
236 MessageQueue messageQueueBatch;
238 for (int i = 0; i < MESSAGEQUEUE_SEND_BATCH_SIZE && messageQueue.empty() == false; i++) {
239 Message* message = messageQueue.front();
240 messageQueueBatch.push(message);
241 messageQueue.pop();
242 }
244
245 // try to send batch
246 while (messageQueueBatch.empty() == false) {
247 Message* message = messageQueueBatch.front();
248 if (socket.write(message->ip, message->port, (void*)message->message, message->bytes) == -1) {
249 // sending would block, stop trying to send
250 AtomicOperations::increment(server->statistics.errors);
251 break;
252 } else {
253 // success, remove message from message queue batch and continue
254 delete message;
255 messageQueueBatch.pop();
256 //
257 AtomicOperations::increment(server->statistics.sent);
258 }
259 }
260
261 // re add messages not sent in batch to message queue
262 if (messageQueueBatch.empty() == true) {
264 if (messageQueue.empty() == true) {
266 socket,
269 NULL
270 );
271
272 // no more data to send, so stop the loop
273 hasWriteInterest = false;
274 }
276 } else {
278 do {
279 Message* message = messageQueueBatch.front();
280 messageQueue.push(message);
281 messageQueueBatch.pop();
282 } while (messageQueueBatch.empty() == false);
284
285 // we did not send all batched messages, so stop the loop
286 hasWriteInterest = false;
287 }
288 }
289 }
290 }
291
292 //
293 } catch (Exception &exception) {
294 // log
295 Console::println(
296 "UDPServerIOThread[" +
297 to_string(id) +
298 "]::run(): " +
299 (RTTI::demangle(typeid(exception).name())) +
300 ": " +
301 (exception.what())
302 );
303 }
304
305 // exit gracefully
307 socket.close();
308
309 // log
310 Console::println("UDPServerIOThread[" + to_string(id) + "]::run(): done");
311}
312
/**
 * Queues a datagram for the given client on the send queue of this IO thread.
 *
 * Copies at most 512 bytes of the frame into a newly allocated Message together with the
 * client's ip, port and clientId, the message type and id, the current time in
 * milliseconds and a retry count of 0. If safe is true, a second copy of that message is
 * additionally stored in messageMapAck under messageId.
 *
 * NOTE(review): this listing has gaps in its embedded line numbering inside this function
 * (339, 345, 351, 360, 364, 368, 376, 378-379, 385). Presumably those are the
 * messageMapAckMutex / messageQueueMutex lock and unlock calls and the start of the
 * socket interest call - confirm against the original file before changing this code.
 *
 * @param client receiving client; only its ip, port and clientId are read
 * @param messageType message type stored in the queued message
 * @param messageId message id; also the key in messageMapAck if safe is true
 * @param frame stream holding the datagram; it is read from its beginning
 * @param safe if true the message is also registered in messageMapAck
 * @param deleteFrame if true the frame is deleted here once its content has been copied,
 *                    which happens before any of the exceptions below can be thrown
 * @throws NetworkServerException if safe is true and messageId is already in messageMapAck,
 *         if safe is true and messageMapAck holds more than maxCCU * 20 entries,
 *         or if messageQueue holds more than maxCCU * 20 entries
 */
313void UDPServerIOThread::sendMessage(const UDPServerClient* client, const uint8_t messageType, const uint32_t messageId, stringstream* frame, const bool safe, const bool deleteFrame) {
314 // FIXME:
315 // We could use lock free queues here
316 // For now, we will go with plain mutexes
317
318 // reset stream for read
// get position goes to the start, put position to the end, so tellp() below yields the frame size
319 frame->seekg(0, ios_base::beg);
320 frame->seekp(0, ios_base::end);
321
322 // create message
323 Message* message = new Message();
324 message->ip = client->ip;
325 message->port = client->port;
326 message->time = Time::getCurrentMillis();
327 message->messageType = messageType;
328 message->clientId = client->clientId;
329 message->messageId = messageId;
330 message->retries = 0;
331 message->bytes = frame->tellp();
// frames larger than 512 bytes are truncated without an error
// NOTE(review): 512 is presumably the capacity of Message::message - TODO confirm
332 if (message->bytes > 512) message->bytes = 512;
333 frame->read(message->message, message->bytes);
334
335 if (deleteFrame == true) delete frame;
336
337 // requires ack and retransmission ?
338 if (safe == true) {
340 MessageMapAck::iterator it;
341 // check if message has already been pushed to ack
342 it = messageMapAck.find(messageId);
343 if (it != messageMapAck.end()) {
344 // its on ack queue already, so unlock
346 delete message;
347 throw NetworkServerException("message already on message queue ack");
348 }
349 // check if message queue is full
350 if (messageMapAck.size() > maxCCU * 20) {
352 delete message;
353 throw NetworkServerException("message queue ack overflow");
354 }
355 // push to message queue ack
356 // create message ack
357 Message* messageAck = new Message();
358 *messageAck = *message;
// "it" equals messageMapAck.end() at this point and is only passed as insertion hint
359 messageMapAck.insert(it, pair<uint32_t, Message*>(messageId, messageAck));
361 }
362
363 // push to message queue
365
366 // check if message queue is full
// NOTE(review): if safe is true and this overflow check throws, the copy stored in
// messageMapAck above is not removed by the code visible here - confirm this is intended
367 if (messageQueue.size() > maxCCU * 20) {
369 delete message;
370 throw NetworkServerException("message queue overflow");
371 }
372 messageQueue.push(message);
373
374 // set nio interest
// only done when the queue went from empty to one entry
// NOTE(review): the call these arguments belong to (embedded line 376) is not visible in this listing
375 if (messageQueue.size() == 1) {
377 socket,
380 NULL
381 );
382 }
383
384 // done
386}
387
/**
 * Processes an acknowledgement for the message with the given id.
 *
 * If messageMapAck has an entry for messageId and its ip and port equal those of the
 * client, the stored message is deleted and the entry erased. If there is no entry for
 * messageId the acknowledgement is treated as valid and nothing is removed.
 * The client reference is released on every path, also before the exception is thrown.
 *
 * NOTE(review): embedded lines 393 and 407 are missing from this listing; presumably the
 * messageMapAckMutex lock and unlock calls - confirm against the original file.
 *
 * @param client client that sent the acknowledgement; releaseReference() is called on it
 * @param messageId id of the acknowledged message
 * @throws NetworkServerException if an entry exists but its ip or port differs from the client
 */
388void UDPServerIOThread::processAckReceived(UDPServerClient* client, const uint32_t messageId) {
// stays true if messageId is unknown, so unknown ids never raise an exception
389 bool messageAckValid = true;
390 MessageMapAck::iterator iterator;
391
392 // delete message from message queue ack
394 iterator = messageMapAck.find(messageId);
395 if (iterator != messageMapAck.end()) {
396 // message exists
397 Message* messageAck = iterator->second;
398 // message ack valid?
399 messageAckValid = messageAck->ip == client->ip && messageAck->port == client->port;
400 // remove if valid
401 if (messageAckValid == true) {
402 // remove message from message queue ack
403 delete iterator->second;
404 messageMapAck.erase(iterator);
405 }
406 }
408
409 //
// NOTE(review): run() calls client->shutdown() in its catch handler after an exception from
// ack processing; if that path ends up here, the reference has already been released at
// that point - confirm the client is guaranteed to still be alive
410 client->releaseReference();
411
412 // check if message ack was valid
413 if (messageAckValid == false) {
414 throw NetworkServerException("message ack invalid");
415 }
416}
417
419 MessageQueue messageQueueResend;
420 uint64_t now = Time::getCurrentMillis();
421
423 MessageMapAck::iterator it = messageMapAck.begin();
424 while (it != messageMapAck.end()) {
425 Message* messageAck = it->second;
426 // message ack timed out?
427 // most likely the client is gone
428 if (messageAck->retries == MESSAGEACK_RESENDTIMES_TRIES) {
429 // delete from message map ack
430 delete it->second;
431 messageMapAck.erase(it++);
432 // skip
433 continue;
434 } else
435 // message should be resent?
436 if (now > (messageAck->time + (MESSAGEACK_RESENDTIMES[messageAck->retries]))) {
437 // increase tries
438 messageAck->retries++;
439
440 // construct message
441 Message* message = new Message();
442 *message = *messageAck;
443
444 // recreate frame header with updated hash and retries
445 stringstream frame;
446 frame.write(message->message, message->bytes);
447 server->writeHeader(&frame, (UDPServer::MessageType)message->messageType, message->clientId, message->messageId, message->retries);
448 frame.read(message->message, message->bytes);
449
450 // and push to be resent
451 messageQueueResend.push(message);
452 }
453 ++it;
454 }
456
457 // reissue messages to be resent
458 if (messageQueueResend.empty() == false) {
460 do {
461 Message* message = messageQueueResend.front();
462 messageQueue.push(message);
463 messageQueueResend.pop();
464
465 // set nio interest
466 if (messageQueue.size() == 1) {
468 socket,
471 NULL
472 );
473 }
474 } while (messageQueueResend.empty() == false);
476 }
477}
Base exception class for network server exceptions.
Base class for network UDP server clients.
void init()
initiates this network client
void shutdown()
Shuts down this network client.
const string & getKey() const
Client identification key.
virtual void onFrameReceived(stringstream *frame, const uint32_t messageId=0, const uint8_t retries=0)
Event, which will be called if frame has been received, defaults to worker thread pool.
const bool setKey(const string &key)
sets the clients identification key
void sendConnected()
Sends a connect message to client.
void sendMessage(const UDPServerClient *client, const uint8_t messageType, const uint32_t messageId, stringstream *frame, const bool safe, const bool deleteFrame)
pushes a message to be sent, takes over ownership of frame
void processAckMessages()
Cleans up timed-out safe messages and reissues messages not yet acknowledged by the client.
void processAckReceived(UDPServerClient *client, const uint32_t messageId)
Processes an acknowledgement reception.
static const uint64_t MESSAGEACK_RESENDTIMES[MESSAGEACK_RESENDTIMES_TRIES]
Base class for network UDP servers.
Definition: UDPServer.h:39
UDPServer_Statistics statistics
Definition: UDPServer.h:213
void processAckReceived(UDPServerClient *client, const uint32_t messageId)
Processes an acknowledgement reception.
Definition: UDPServer.cpp:490
void addClient(UDPServerClient *client)
maps a new client to a given client id
Definition: UDPServer.cpp:280
UDPServerClient * lookupClient(const uint32_t clientId)
Looks up a client by client id.
Definition: UDPServer.cpp:389
virtual void identify(stringstream *frame, MessageType &messageType, uint32_t &connectionId, uint32_t &messageId, uint8_t &retries)
Identifies a client message.
Definition: UDPServer.cpp:167
UDPServerClient * getClientByIp(const string &ip, const unsigned int port)
Returns client by host name and port.
Definition: UDPServer.cpp:423
virtual void validate(stringstream *frame)
Validates a client message.
Definition: UDPServer.cpp:226
const uint32_t allocateClientId()
Allocates a client id for a new client.
Definition: UDPServer.cpp:495
virtual UDPServerClient * accept(const uint32_t clientId, const std::string &ip, const unsigned int port)
method to implement for accepting clients
Definition: UDPServer.cpp:163
virtual void writeHeader(stringstream *frame, MessageType messageType, const uint32_t clientId, const uint32_t messageId, const uint8_t retries)
Writes a message header to message.
Definition: UDPServer.cpp:241
Interface to kernel event mechanisms.
void shutdownKernelEventMechanism()
shuts down the kernel event mechanism
void setSocketInterest(const NetworkSocket &socket, const NIOInterest lastInterest, const NIOInterest interest, const void *cookie)
sets a non blocked socket io interest
int doKernelEventMechanism()
do the kernel event mechanism
void initKernelEventMechanism(const unsigned int maxCCU)
initializes the kernel event mechanism
void decodeKernelEvent(const unsigned int index, NIOInterest &interest, void *&cookie)
decodes a kernel event
void close()
Closes the socket.
ssize_t read(string &from, unsigned int &port, void *buf, const size_t bytes)
reads a datagram from socket
Definition: UDPSocket.cpp:39
ssize_t write(const string &to, const unsigned int port, void *buf, const size_t bytes)
writes up to "bytes" bytes to socket
Definition: UDPSocket.cpp:105
bool wait()
Waits on barrier.
Definition: Barrier.cpp:31
Mutex implementation.
Definition: Mutex.h:27
void unlock()
Unlocks this mutex.
Definition: Mutex.cpp:48
void lock()
Locks the mutex, additionally mutex locks will block until other locks have been unlocked.
Definition: Mutex.cpp:39
Base class for threads.
Definition: Thread.h:26
bool isStopRequested()
Returns if stop has been requested.
Definition: Thread.cpp:98
Console class.
Definition: Console.h:26
Run time type information utility class.
Definition: RTTI.h:14
void releaseReference()
releases a reference, thus decrementing the counter and delete it if reference counter is zero
Definition: Reference.cpp:20
Time utility class.
Definition: Time.h:21
const NIOInterest NIO_INTEREST_NONE
Definition: NIOInterest.h:11
const NIOInterest NIO_INTEREST_READ
Definition: NIOInterest.h:12
uint8_t NIOInterest
type definition for network I/O interest
Definition: NIOInterest.h:10
const NIOInterest NIO_INTEREST_WRITE
Definition: NIOInterest.h:13
std::exception Exception
Exception base class.
Definition: Exception.h:19