TDME2 1.9.121
UDPClient.cpp
Go to the documentation of this file.
1#include <string.h>
2
3#include <iostream>
4#include <map>
5#include <sstream>
6#include <string>
7#include <typeinfo>
8
9#include <tdme/tdme.h>
20#include <tdme/utilities/RTTI.h>
21#include <tdme/utilities/Time.h>
22
23using std::ios;
24using std::map;
25using std::pair;
26using std::string;
27using std::stringstream;
28using std::to_string;
29
45
// Resend schedule for messages that still wait for an acknowledgement.
// The entry at index `retries` is added to the stored message time and compared against
// Time::getCurrentMillis() in the resend check, so the values are treated as milliseconds there.
// A message is dropped from the ack map once its retry count reaches MESSAGEACK_RESENDTIMES_TRIES.
46const uint64_t UDPClient::MESSAGEACK_RESENDTIMES[UDPClient::MESSAGEACK_RESENDTIMES_TRIES] = {125L, 250L, 500L, 750L, 1000L, 2000L, 5000L};
47
48UDPClient::UDPClient(const string& ip, const unsigned int port) :
49 Thread("nioudpclientthread"),
50 messageQueueMutex("nioupclientthread_messagequeue"),
51 messageMapAckMutex("nioupclientthread_messagequeueack"),
52 recvMessageQueueMutex("nioupclientthread_recvmessagequeuemutex"),
53 messageMapSafeMutex("nioupclientthread_messagemasafemutex"),
54 ip(ip),
55 port(port),
56 clientId(0),
57 messageCount(0),
58 initialized(false),
59 connected(false) {
60 //
61}
62
64 return initialized;
65}
66
68 return connected;
69}
70
71const string& UDPClient::getIp() {
72 return ip;
73}
74
75const unsigned int UDPClient::getPort() {
76 return port;
77}
78
79const string& UDPClient::getClientKey() {
80 return clientKey;
81}
82
83void UDPClient::setClientKey(const string& clientKey) {
84 this->clientKey = clientKey;
85}
86
88 Console::println("UDPClient::run(): start");
89
90 // catch kernel event and server socket exceptions
91 try {
92 // create client socket
93 UDPSocket::createClientSocket(socket, NetworkSocket::determineIpVersion(ip));
94
95 // initialize kernel event mechanismn
98
99 // initialized
100 initialized = true;
101
102 // do event loop
103 uint64_t lastMessageQueueAckTime = Time::getCurrentMillis();
104 uint64_t lastMessageConnectTime = Time::getCurrentMillis();
105 uint64_t lastMessageSafeCleanTime = Time::getCurrentMillis();
106 while(isStopRequested() == false) {
107 uint64_t now = Time::getCurrentMillis();
108
109 // process connect messages every 25ms
110 if (connected == false && now >= lastMessageConnectTime + 25L) {
111 // send connection message
113 lastMessageConnectTime = now;
114 }
115
116 // process ack messages every 25ms
117 if (now >= lastMessageQueueAckTime + 25L) {
119 lastMessageQueueAckTime = now;
120 }
121
122 // process safe messages clean up every 25ms
// NOTE(review): listing line 124 is absent from this extract — presumably the actual clean-up call; confirm against the real file
123 if (now >= lastMessageSafeCleanTime + 25L) {
// advance this branch's own timer; writing lastMessageQueueAckTime here left
// lastMessageSafeCleanTime unchanged (so this branch ran on every loop pass) and
// kept pushing the ack timer forward, which could delay the 25ms ack branch above
125 lastMessageSafeCleanTime = now;
126 }
127
128 // do kernel event mechanism
129 int events = kem.doKernelEventMechanism();
130
131 // iterate the event list
132 for(unsigned int i = 0; i < (unsigned int)events; i++) {
133 NIOInterest keInterest;
134 void* nil;
135
136 // decode kernel event
137 kem.decodeKernelEvent(i, keInterest, (void*&)nil);
138
139 // interests
140 bool hasReadInterest = (keInterest & NIO_INTEREST_READ) == NIO_INTEREST_READ;
141 bool hasWriteInterest = (keInterest & NIO_INTEREST_WRITE) == NIO_INTEREST_WRITE;
142
143 // process read interest
144 if (hasReadInterest) {
145 ssize_t bytesReceived;
146 string fromIp;
147 unsigned int fromPort;
148 char message[512];
149
150 // receive datagrams as long as its size > 0 and read would not block
151 while ((bytesReceived = socket.read(fromIp, fromPort, (void*)message, sizeof(message))) > 0) {
152 //
154 //
155 UDPClientMessage* clientMessage = UDPClientMessage::parse(message, bytesReceived);
156 try {
157 if (clientMessage == nullptr) {
158 throw NetworkClientException("invalid message");
159 }
160 switch(clientMessage->getMessageType()) {
162 {
163 processAckReceived(clientMessage->getMessageId());
164 delete clientMessage;
165 break;
166 }
168 {
172 clientMessage->getClientId(),
173 clientMessage->getMessageId(),
174 clientMessage->getRetryCount() + 1,
175 nullptr
176 ),
177 false
178 );
179 clientId = clientMessage->getClientId();
180 // read client key
181 auto frame = clientMessage->getFrame();
182 clientKey = "";
183 uint8_t clientKeySize;
184 char c;
185 frame->read((char*)&clientKeySize, 1);
186 for (uint8_t i = 0; i < clientKeySize; i++) {
187 frame->read(&c, 1);
188 clientKey+= c;
189 }
190 delete clientMessage;
191 // we are connected
192 connected = true;
193 break;
194 }
196 {
197 // check if message queue is full
199 if (recvMessageQueue.size() > 1000) {
201 throw NetworkClientException("recv message queue overflow");
202 }
203 recvMessageQueue.push(clientMessage);
205 break;
206 }
208 {
209 break;
210 }
211 }
212 } catch (Exception &exception) {
213 if (clientMessage != nullptr) delete clientMessage;
214
215 // log
216 Console::println(
217 "UDPClient::run(): " +
218 (RTTI::demangle(typeid(exception).name())) +
219 ": " +
220 (exception.what())
221 );
222
223 //
225
226 // rethrow to quit communication for now
227 // TODO: maybe find a better way to handle errors
228 // one layer up should be informed about network client problems somehow
229 throw exception;
230 }
231 }
232 }
233
234 // process write interest
235 while (hasWriteInterest) {
236 // fetch batch of messages to be send
237 MessageQueue messageQueueBatch;
239 for (int i = 0; i < MESSAGEQUEUE_SEND_BATCH_SIZE && messageQueue.empty() == false; i++) {
240 Message* message = messageQueue.front();
241 messageQueueBatch.push(message);
242 messageQueue.pop();
243 }
245
246 // try to send batch
247 while (messageQueueBatch.empty() == false) {
248 Message* message = messageQueueBatch.front();
249 if (socket.write(ip, port, (void*)message->message, message->bytes) == -1) {
250 // sending would block, stop trying to sendin
252 //
253 break;
254 } else {
255 // success, remove message from message queue batch and continue
256 Message* message = messageQueueBatch.front();
257 delete message;
258 messageQueueBatch.pop();
259 //
261 }
262 }
263
264 // re add messages not sent in batch to message queue
265 if (messageQueueBatch.empty() == true) {
267 if (messageQueue.empty() == true) {
269 socket,
272 NULL
273 );
274
275 // no more data to send, so stop the loop
276 hasWriteInterest = false;
277 }
279 } else {
281 do {
282 Message* message = messageQueueBatch.front();
283 messageQueue.push(message);
284 messageQueueBatch.pop();
285 } while (messageQueueBatch.empty() == false);
287
288 // we did not send all batched messages, so stop the loop
289 hasWriteInterest = false;
290 }
291 }
292 }
293 }
294
295 //
296 } catch (Exception &exception) {
297 // log
298 Console::println(
299 "UDPClient::run(): " +
300 (RTTI::demangle(typeid(exception).name())) +
301 ": " +
302 (exception.what())
303 );
304 }
305
306 // exit gracefully
308 socket.close();
309
310 // log
311 Console::println("UDPClient::run(): done");
312}
313
/**
 * Queues a client message for sending and takes ownership of it.
 * The message is serialized into a newly allocated Message and clientMessage is deleted.
 * With safe == true a copy is additionally stored in messageMapAck under its message id.
 * @param clientMessage message to send; deleted by this method
 * @param safe if true the message is also registered in messageMapAck
 * @throws NetworkClientException if the id is already in messageMapAck, if messageMapAck holds
 *         more than 1000 entries, or if messageQueue holds more than 1000 entries
 */
// NOTE(review): several listing lines (326, 332, 338, 347, 351, 355, 363, 365-366, 372) are absent from
// this extract — presumably mutex lock/unlock calls and the call that the arguments at 364-368 belong to;
// confirm against the real file before changing anything here
314void UDPClient::sendMessage(UDPClientMessage* clientMessage, bool safe) {
315 // create message
316 Message* message = new Message();
317 message->time = clientMessage->getTime();
318 message->messageType = clientMessage->getMessageType();
319 message->messageId = clientMessage->getMessageId();
// a fresh message starts with zero retries
320 message->retries = 0;
// serialize into message->message / message->bytes
321 clientMessage->generate(message->message, message->bytes);
// ownership was taken over, the client message is not needed anymore
322 delete clientMessage;
323
324 // requires ack and retransmission ?
325 if (safe == true) {
327 MessageMapAck::iterator it;
328 // check if message has already been pushed to ack
329 it = messageMapAck.find(message->messageId);
330 if (it != messageMapAck.end()) {
331 // it's on ack queue already, so unlock
333 delete message;
334 throw NetworkClientException("message already on message queue ack");
335 }
336 // check if message queue is full
337 if (messageMapAck.size() > 1000) {
339 delete message;
340 throw NetworkClientException("message queue ack overflow");
341 }
342 // push to message queue ack
343 // create message ack
// the ack map gets its own copy, the original goes to the send queue below
344 Message* messageAck = new Message();
345 *messageAck = *message;
// `it` equals end() at this point and is passed as insertion hint
346 messageMapAck.insert(it, pair<uint32_t, Message*>(message->messageId, messageAck));
348 }
349
350 // push to message queue
352
353 // check if message queue is full
// NOTE(review): for safe messages the copy inserted into messageMapAck above is not removed
// when this overflow exception is thrown
354 if (messageQueue.size() > 1000) {
356 delete message;
357 throw NetworkClientException("message queue overflow");
358 }
359 messageQueue.push(message);
360
361 // set nio interest
// only done when the queue went from empty to one element
362 if (messageQueue.size() == 1) {
364 socket,
367 nullptr
368 );
369 }
370
371 // done
373}
374
/**
 * Handles an acknowledgement for a message that was registered in messageMapAck.
 * If the id is found, the stored Message is deleted and its map entry erased;
 * an id that is not in the map is ignored.
 * @param messageId id of the acknowledged message
 * @throws NetworkClientException if messageAckValid is false — not reachable as written,
 *         because the flag is only ever assigned true
 */
// NOTE(review): listing lines 380 and 392 are absent from this extract — presumably the
// lock/unlock around the map access; confirm against the real file
375void UDPClient::processAckReceived(const uint32_t messageId) {
376 bool messageAckValid = true;
377 MessageMapAck::iterator iterator;
378
379 // delete message from message queue ack
381 iterator = messageMapAck.find(messageId);
382 if (iterator != messageMapAck.end()) {
383 // message ack valid?
// the sender check is commented out, so every found ack counts as valid
384 messageAckValid = true; //messageAck->ip == client->ip && messageAck->port == client->port;
385 // remove if valid
386 if (messageAckValid == true) {
387 // remove message from message queue ack
// free the stored copy first, then drop the map entry
388 delete iterator->second;
389 messageMapAck.erase(iterator);
390 }
391 }
393
394 // check if message ack was valid
395 if (messageAckValid == false) {
396 throw NetworkClientException("message ack invalid");
397 }
398}
399
401 MessageQueue messageQueueResend;
402 uint64_t now = Time::getCurrentMillis();
403
405 MessageMapAck::iterator it = messageMapAck.begin();
406 while (it != messageMapAck.end()) {
407 Message* messageAck = it->second;
408 // message ack timed out?
409 // most likely the client is gone
410 if (messageAck->retries == MESSAGEACK_RESENDTIMES_TRIES) {
411 // delete from message map ack
412 delete it->second;
413 messageMapAck.erase(it++);
414 // skip
415 continue;
416 } else
417 // message should be resend?
418 if (now > (messageAck->time + (MESSAGEACK_RESENDTIMES[messageAck->retries]))) {
419 // increase tries
420 messageAck->retries++;
421
422 // construct message
423 Message* message = new Message();
424 *message = *messageAck;
425
426 // parse client message from message raw data
427 UDPClientMessage* clientMessage = UDPClientMessage::parse(message->message, message->bytes);
428
429 // increase/set retry
430 clientMessage->retry();
431
432 // recreate message
433 clientMessage->generate(message->message, message->bytes);
434
435 // delete client message
436 delete clientMessage;
437
438 // and push to be resent
439 messageQueueResend.push(message);
440 }
441 ++it;
442 }
444
445 // reissue messages to be resent
446 if (messageQueueResend.empty() == false) {
448 do {
449 Message* message = messageQueueResend.front();
450 messageQueue.push(message);
451 messageQueueResend.pop();
452
453 // set nio interest
454 if (messageQueue.size() == 1) {
456 socket,
459 NULL
460 );
461 }
462 } while (messageQueueResend.empty() == false);
464 }
465}
466
468 bool messageProcessed = false;
469 MessageMapSafe::iterator it;
470 auto messageId = clientMessage->getMessageId();
471
472 //
474
475 // check if message has been already processed
476 it = messageMapSafe.find(messageId);
477 if (it != messageMapSafe.end()) {
478 // yep, we did
479 messageProcessed = true;
480 SafeMessage* message = it->second;
481 message->receptions++;
482 } else {
483 // nope, just remember message
484 SafeMessage* message = new SafeMessage();
485 message->messageId = messageId;
486 message->receptions = 1;
487 message->time = Time::getCurrentMillis();
488 // TODO: check for overflow
489 messageMapSafe.insert(it, pair<uint32_t, SafeMessage*>(messageId, message));
490 }
491
492 //
494
495 // always send ack
499 clientId,
500 clientMessage->getMessageId(),
501 0,
502 nullptr
503 ),
504 false
505 );
506
507 // return if message should be processed
508 return messageProcessed == true?false:true;
509}
510
511
513 //
515
516 // check if message has been already processed
517 uint64_t now = Time::getCurrentMillis();
518 MessageMapSafe::iterator it = messageMapSafe.begin();
519 while (it != messageMapSafe.end()) {
520 SafeMessage* message = it->second;
521 if (message->time < now - MESSAGESSAFE_KEEPTIME) {
522 delete it->second;
523 messageMapSafe.erase(it++);
524 continue;
525 }
526 ++it;
527 }
528
529 //
531}
532
533uint64_t UDPClient::getRetryTime(const uint8_t retries) {
534 if (retries == 0) return 0L;
535 if (retries > UDPClient::MESSAGEACK_RESENDTIMES_TRIES) return 0L;
536 return UDPClient::MESSAGEACK_RESENDTIMES[retries - 1];
537}
538
540 UDPClientMessage* message = nullptr;
542 if (recvMessageQueue.empty() == false) {
543 message = recvMessageQueue.front();
544 recvMessageQueue.pop();
545 }
547 return message;
548}
549
551 return new UDPClientMessage(
553 clientId,
554 messageCount++,
555 0,
556 frame
557 );
558}
559
561 auto stats = statistics;
562 statistics.time = Time::getCurrentMillis();
564 statistics.sent = 0;
565 statistics.errors = 0;
566 return stats;
567}
Base exception class for network client exceptions.
void generate(char message[512], size_t &bytes)
Generate datagram.
void retry()
Mark message to be resent with increased retry count.
static UDPClientMessage * parse(const char message[512], const size_t bytes)
Parse.
void cleanUpSafeMessages()
Clean up safe messages.
Definition: UDPClient.cpp:512
static const int MESSAGEACK_RESENDTIMES_TRIES
Definition: UDPClient.h:157
virtual void run()
Run thread program.
Definition: UDPClient.cpp:87
void processAckReceived(const uint32_t messageId)
Processes a received ack.
Definition: UDPClient.cpp:375
void processAckMessages()
Process ack messages.
Definition: UDPClient.cpp:400
queue< Message * > MessageQueue
Definition: UDPClient.h:169
void setClientKey(const string &clientKey)
Set client key.
Definition: UDPClient.cpp:83
static const uint64_t MESSAGESSAFE_KEEPTIME
Definition: UDPClient.h:173
const UDPClient_Statistics getStatistics()
Definition: UDPClient.cpp:560
static uint64_t getRetryTime(const uint8_t retries)
Get retry time for given retry count.
Definition: UDPClient.cpp:533
UDPClientMessage * createMessage(stringstream *frame)
Create message.
Definition: UDPClient.cpp:550
static const int MESSAGEQUEUE_SEND_BATCH_SIZE
Definition: UDPClient.h:159
void sendMessage(UDPClientMessage *clientMessage, bool safe)
Pushes a message to be sent, takes over ownership of message.
Definition: UDPClient.cpp:314
const unsigned int getPort()
Definition: UDPClient.cpp:75
static const uint64_t MESSAGEACK_RESENDTIMES[MESSAGEACK_RESENDTIMES_TRIES]
Definition: UDPClient.h:158
bool processSafeMessage(UDPClientMessage *clientMessage)
Returns whether a message should be processed (true) or has already been processed (false).
Definition: UDPClient.cpp:467
UDPClientMessage * receiveMessage()
Receive message.
Definition: UDPClient.cpp:539
UDPClient_Statistics statistics
Definition: UDPClient.h:198
Interface to kernel event mechanisms.
void shutdownKernelEventMechanism()
shuts down the kernel event mechanism
void setSocketInterest(const NetworkSocket &socket, const NIOInterest lastInterest, const NIOInterest interest, const void *cookie)
sets a non-blocking socket I/O interest
int doKernelEventMechanism()
do the kernel event mechanism
void initKernelEventMechanism(const unsigned int maxCCU)
initializes the kernel event mechanism
void decodeKernelEvent(const unsigned int index, NIOInterest &interest, void *&cookie)
decodes a kernel event
Base class of network sockets.
Definition: NetworkSocket.h:17
void close()
Closes the socket.
ssize_t read(string &from, unsigned int &port, void *buf, const size_t bytes)
reads a datagram from socket
Definition: UDPSocket.cpp:39
ssize_t write(const string &to, const unsigned int port, void *buf, const size_t bytes)
writes up to "bytes" bytes to socket
Definition: UDPSocket.cpp:105
Mutex implementation.
Definition: Mutex.h:27
void unlock()
Unlocks this mutex.
Definition: Mutex.cpp:48
void lock()
Locks the mutex, additionally mutex locks will block until other locks have been unlocked.
Definition: Mutex.cpp:39
Base class for threads.
Definition: Thread.h:26
bool isStopRequested()
Returns if stop has been requested.
Definition: Thread.cpp:98
Console class.
Definition: Console.h:26
Run time type information utility class.
Definition: RTTI.h:14
Time utility class.
Definition: Time.h:21
const NIOInterest NIO_INTEREST_NONE
Definition: NIOInterest.h:11
const NIOInterest NIO_INTEREST_READ
Definition: NIOInterest.h:12
uint8_t NIOInterest
type definition for network UI interest
Definition: NIOInterest.h:10
const NIOInterest NIO_INTEREST_WRITE
Definition: NIOInterest.h:13
std::exception Exception
Exception base class.
Definition: Exception.h:19