27using std::stringstream;
46const uint64_t UDPClient::MESSAGEACK_RESENDTIMES[UDPClient::MESSAGEACK_RESENDTIMES_TRIES] = {125L, 250L, 500L, 750L, 1000L, 2000L, 5000L};
48UDPClient::UDPClient(
const string& ip,
const unsigned int port) :
49 Thread(
"nioudpclientthread"),
50 messageQueueMutex(
"nioupclientthread_messagequeue"),
51 messageMapAckMutex(
"nioupclientthread_messagequeueack"),
52 recvMessageQueueMutex(
"nioupclientthread_recvmessagequeuemutex"),
53 messageMapSafeMutex(
"nioupclientthread_messagemasafemutex"),
88 Console::println(
"UDPClient::run(): start");
93 UDPSocket::createClientSocket(
socket, NetworkSocket::determineIpVersion(
ip));
103 uint64_t lastMessageQueueAckTime = Time::getCurrentMillis();
104 uint64_t lastMessageConnectTime = Time::getCurrentMillis();
105 uint64_t lastMessageSafeCleanTime = Time::getCurrentMillis();
107 uint64_t now = Time::getCurrentMillis();
110 if (
connected ==
false && now >= lastMessageConnectTime + 25L) {
113 lastMessageConnectTime = now;
117 if (now >= lastMessageQueueAckTime + 25L) {
119 lastMessageQueueAckTime = now;
123 if (now >= lastMessageSafeCleanTime + 25L) {
125 lastMessageQueueAckTime = now;
132 for(
unsigned int i = 0; i < (
unsigned int)events; i++) {
144 if (hasReadInterest) {
145 ssize_t bytesReceived;
147 unsigned int fromPort;
151 while ((bytesReceived =
socket.
read(fromIp, fromPort, (
void*)message,
sizeof(message))) > 0) {
157 if (clientMessage ==
nullptr) {
164 delete clientMessage;
181 auto frame = clientMessage->
getFrame();
183 uint8_t clientKeySize;
185 frame->read((
char*)&clientKeySize, 1);
186 for (uint8_t i = 0; i < clientKeySize; i++) {
190 delete clientMessage;
213 if (clientMessage !=
nullptr)
delete clientMessage;
217 "UDPClient::run(): " +
218 (RTTI::demangle(
typeid(exception).
name())) +
235 while (hasWriteInterest) {
241 messageQueueBatch.push(message);
247 while (messageQueueBatch.empty() ==
false) {
248 Message* message = messageQueueBatch.front();
256 Message* message = messageQueueBatch.front();
258 messageQueueBatch.pop();
265 if (messageQueueBatch.empty() ==
true) {
276 hasWriteInterest =
false;
282 Message* message = messageQueueBatch.front();
284 messageQueueBatch.pop();
285 }
while (messageQueueBatch.empty() ==
false);
289 hasWriteInterest =
false;
299 "UDPClient::run(): " +
300 (RTTI::demangle(
typeid(exception).
name())) +
311 Console::println(
"UDPClient::run(): done");
322 delete clientMessage;
327 MessageMapAck::iterator it;
345 *messageAck = *message;
376 bool messageAckValid =
true;
377 MessageMapAck::iterator iterator;
384 messageAckValid =
true;
386 if (messageAckValid ==
true) {
388 delete iterator->second;
395 if (messageAckValid ==
false) {
402 uint64_t now = Time::getCurrentMillis();
407 Message* messageAck = it->second;
424 *message = *messageAck;
430 clientMessage->
retry();
436 delete clientMessage;
439 messageQueueResend.push(message);
446 if (messageQueueResend.empty() ==
false) {
449 Message* message = messageQueueResend.front();
451 messageQueueResend.pop();
462 }
while (messageQueueResend.empty() ==
false);
468 bool messageProcessed =
false;
469 MessageMapSafe::iterator it;
479 messageProcessed =
true;
487 message->
time = Time::getCurrentMillis();
489 messageMapSafe.insert(it, pair<uint32_t, SafeMessage*>(messageId, message));
508 return messageProcessed ==
true?
false:
true;
517 uint64_t now = Time::getCurrentMillis();
534 if (retries == 0)
return 0L;
Base exception class for network client exceptions.
const uint32_t getMessageId()
void generate(char message[512], size_t &bytes)
Generate datagram.
void retry()
Mark message to be resent with an increased retry count.
const uint8_t getRetryCount()
const uint32_t getClientId()
stringstream * getFrame()
@ MESSAGETYPE_ACKNOWLEDGEMENT
static UDPClientMessage * parse(const char message[512], const size_t bytes)
Parse.
const MessageType getMessageType()
void cleanUpSafeMessages()
Clean up safe messages.
static const int MESSAGEACK_RESENDTIMES_TRIES
virtual void run()
Run thread program.
MessageQueue messageQueue
MessageMapSafe messageMapSafe
MessageMapAck messageMapAck
void processAckReceived(const uint32_t messageId)
Processes a received ack.
void processAckMessages()
Process ack messages.
queue< Message * > MessageQueue
void setClientKey(const string &clientKey)
Set client key.
const string & getClientKey()
static const uint64_t MESSAGESSAFE_KEEPTIME
const UDPClient_Statistics getStatistics()
static uint64_t getRetryTime(const uint8_t retries)
Get retry time for given retry count.
UDPClientMessage * createMessage(stringstream *frame)
Create message.
static const int MESSAGEQUEUE_SEND_BATCH_SIZE
void sendMessage(UDPClientMessage *clientMessage, bool safe)
Pushes a message to be sent; takes over ownership of the message.
const unsigned int getPort()
Mutex recvMessageQueueMutex
static const uint64_t MESSAGEACK_RESENDTIMES[MESSAGEACK_RESENDTIMES_TRIES]
Mutex messageMapSafeMutex
bool processSafeMessage(UDPClientMessage *clientMessage)
Returns whether a message should be processed or has already been processed.
UDPClientMessage * receiveMessage()
Receive message.
UDPClient_Statistics statistics
RecvMessageQueue recvMessageQueue
Interface to kernel event mechanisms.
void shutdownKernelEventMechanism()
shuts down the kernel event mechanism
void setSocketInterest(const NetworkSocket &socket, const NIOInterest lastInterest, const NIOInterest interest, const void *cookie)
sets a non-blocking socket I/O interest
int doKernelEventMechanism()
do the kernel event mechanism
void initKernelEventMechanism(const unsigned int maxCCU)
initializes the kernel event mechanism
void decodeKernelEvent(const unsigned int index, NIOInterest &interest, void *&cookie)
decodes a kernel event
Base class of network sockets.
void close()
Closes the socket.
ssize_t read(string &from, unsigned int &port, void *buf, const size_t bytes)
reads a datagram from socket
ssize_t write(const string &to, const unsigned int port, void *buf, const size_t bytes)
writes up to "bytes" bytes to socket
void unlock()
Unlocks this mutex.
void lock()
Locks the mutex; additional lock attempts will block until the mutex has been unlocked.
bool isStopRequested()
Returns if stop has been requested.
Run time type information utility class.
const NIOInterest NIO_INTEREST_NONE
const NIOInterest NIO_INTEREST_READ
uint8_t NIOInterest
type definition for network I/O interest
const NIOInterest NIO_INTEREST_WRITE
std::exception Exception
Exception base class.