TDME2 1.9.121
UDPServer.cpp
Go to the documentation of this file.
1/**
2 * @version $Id: baf35fe106f82d8bd3b13366cbf9d28daba32aed $
3 */
4#include <math.h>
5
6#include <exception>
7#include <sstream>
8#include <string>
9#include <typeinfo>
10
11#include <tdme/tdme.h>
20#include <tdme/utilities/RTTI.h>
21#include <tdme/utilities/Time.h>
22
23using std::ios_base;
24using std::string;
25using std::stringstream;
26using std::to_string;
27
38
39UDPServer::UDPServer(const std::string& name, const std::string& host, const unsigned int port, const unsigned int maxCCU) :
40 Server<UDPServerClient, UDPServerGroup>(name, host, port, maxCCU),
41 Thread("nioudpserver"),
42 clientIdMapReadWriteLock("nioudpserver_clientidmap"),
43 clientIpMapReadWriteLock("nioudpserver_clientipmap"),
44 ioThreadCurrent(0),
45 ioThreads(NULL),
46 workerThreadPool(NULL),
47 clientCount(0),
48 messageCount(0) {
49 //
50}
51
53}
54
56 Console::println("UDPServer::run(): start");
57
58 // create start up barrier for io threads
59 startUpBarrier = new Barrier("nioudpserver_startup_workers", workerThreadPoolCount + 1);
60
61 // setup worker thread pool
64
65 // wait on startup barrier and delete it
67 delete startUpBarrier;
68 startUpBarrier = NULL;
69
70 // create start up barrier for IO threads
71 startUpBarrier = new Barrier("nioudpserver_startup_iothreads", ioThreadCount + 1);
72
73 // create and start IO threads
75 for(unsigned int i = 0; i < ioThreadCount; i++) {
76 ioThreads[i] = new UDPServerIOThread(i, this, (int)ceil((float)maxCCU / (float)ioThreadCount));
77 ioThreads[i] ->start();
78 }
79
80 // wait on startup barrier and delete it
82 delete startUpBarrier;
83 startUpBarrier = NULL;
84
85 // init worker thread pool
86 //
87 Console::println("UDPServer::run(): ready");
88
89 // do main event loop, waiting until stop requested
90 uint64_t lastCleanUpClientsTime = Time::getCurrentMillis();
91 uint64_t lastCleanUpClientsSafeMessagesTime = Time::getCurrentMillis();
92 while(isStopRequested() == false) {
93 // start time
94 uint64_t now = Time::getCurrentMillis();
95
96 // clean up clients
97 if (now >= lastCleanUpClientsTime + 100L) {
99 lastCleanUpClientsTime = now;
100 }
101
102 // iterate over clients and clean up safe messages
103 if (now >= lastCleanUpClientsSafeMessagesTime + 100L) {
104 ClientKeySet _clientKeySet = getClientKeySet();
105 for (ClientKeySet::iterator i = _clientKeySet.begin(); i != _clientKeySet.end(); ++i) {
106 UDPServerClient* client = getClientByKey(*i);
107
108 // skip on clients that have been gone
109 if (client == NULL) continue;
110
111 // clean up safe messages
112 client->cleanUpSafeMessages();
113
114 // never forget to release ;)
115 client->releaseReference();
116 }
117 lastCleanUpClientsSafeMessagesTime = now;
118 }
119
120 // duration
121 uint64_t duration = Time::getCurrentMillis() - now;
122
123 // wait total of 100ms seconds before repeat
124 if (duration < 100L) {
125 sleep(100L - duration);
126 }
127 }
128
129 // we stopped accept, now stop io threads, but leave them intact
130 for(unsigned int i = 0; i < ioThreadCount; i++) {
131 ioThreads[i]->stop();
132 ioThreads[i]->join();
133 }
134
135 // iterate over clients and close them
136 ClientKeySet _clientKeySet = getClientKeySet();
137 for (ClientKeySet::iterator i = _clientKeySet.begin(); i != _clientKeySet.end(); ++i) {
138 UDPServerClient* client = getClientByKey(*i);
139 // continue if gone already
140 if (client == NULL) continue;
141 // client close logic
142 client->close();
143 // remove from udp client list
144 removeClient(client);
145 }
146
147 // stop thread pool
149 delete workerThreadPool;
150 workerThreadPool = NULL;
151
152 // delete io threads
153 for(unsigned int i = 0; i < ioThreadCount; i++) {
154 delete ioThreads[i];
155 }
156 delete [] ioThreads;
157 ioThreads = NULL;
158
159 //
160 Console::println("UDPServer::run(): done");
161}
162
163UDPServerClient* UDPServer::accept(const uint32_t clientId, const std::string& ip, const unsigned int port) {
164 return NULL;
165}
166
167void UDPServer::identify(stringstream* frame, MessageType& messageType, uint32_t& connectionId, uint32_t& messageId, uint8_t& retries) {
168 // format 1char_message_type,6_char_connection_id,6_char_message_id,1_char_retries
169 char inMessageType;
170 char inConnectionId[6];
171 char inMessageId[6];
172 char inRetries[1];
173
174 // check if enough data available
175 if ((unsigned int)frame->tellp() - (unsigned int)frame->tellg() <
176 sizeof(inMessageType) +
177 sizeof(inConnectionId) +
178 sizeof(inMessageId) +
179 sizeof(inRetries)) {
180 throw NetworkServerException("Invalid message header size");
181 }
182
183 // check message type
184 frame->read((char*)&inMessageType, sizeof(inMessageType));
185 switch(inMessageType) {
186 case('C'):
187 messageType = MESSAGETYPE_CONNECT;
188 break;
189 case('M'):
190 messageType = MESSAGETYPE_MESSAGE;
191 break;
192 case('A'):
193 messageType = MESSAGETYPE_ACKNOWLEDGEMENT;
194 break;
195 default:
196 throw NetworkServerException("Invalid message type");
197 }
198
199 // connection id
200 string strConnectionId;
201 frame->read((char*)&inConnectionId, sizeof(inConnectionId));
202 strConnectionId.append(inConnectionId, sizeof(inConnectionId));
203 if (Integer::decode(strConnectionId, connectionId) == false) {
204 throw NetworkServerException("Invalid connection id");
205 }
206
207 // decode message id
208 string strMessageId;
209 frame->read((char*)&inMessageId, sizeof(inMessageId));
210 strMessageId.append(inMessageId, sizeof(inMessageId));
211 if (Integer::decode(strMessageId, messageId) == false) {
212 throw NetworkServerException("Invalid message id");
213 }
214
215 // decode retries
216 string strRetries;
217 frame->read((char*)&inRetries, sizeof(inRetries));
218 strRetries.append(inRetries, sizeof(inRetries));
219 uint32_t _retries;
220 if (Integer::decode(strRetries, _retries) == false) {
221 throw NetworkServerException("Invalid retries");
222 }
223 retries = _retries;
224}
225
226void UDPServer::validate(stringstream* frame) {
227}
228
229void UDPServer::initializeHeader(stringstream* frame) {
230 // 14(messagetype, clientid, messageid, retries)
231 char emptyHeader[14] =
232 "\0\0\0\0\0\0\0\0\0\0"
233 "\0\0\0";
234
235 frame->write(emptyHeader, sizeof(emptyHeader));
236
237 // seek to end of stream
238 frame->seekp(0, ios_base::end);
239}
240
241void UDPServer::writeHeader(stringstream* frame, MessageType messageType, const uint32_t clientId, const uint32_t messageId, const uint8_t retries) {
242 // seek writing to beginning of header
243 frame->seekp(0, ios_base::beg);
244
245 // message type
246 switch(messageType) {
248 *frame << "C";
249 break;
251 *frame << "M";
252 break;
254 *frame << "A";
255 break;
256 default:
257 delete frame;
258 throw NetworkServerException("Invalid message type");
259 }
260
261 // client id
262 string strClientId;
263 Integer::encode(clientId, strClientId);
264 *frame << strClientId;
265
266 // message id
267 string strMessageId;
268 Integer::encode(messageId, strMessageId);
269 *frame << strMessageId;
270
271 // retries
272 string strRetriesId;
273 Integer::encode((uint32_t)retries, strRetriesId);
274 *frame << strRetriesId[strRetriesId.size() - 1];
275
276 // seek writing to end of stream
277 frame->seekp(0, ios_base::end);
278}
279
281 uint32_t clientId = client->clientId;
282
283 //
285
286 if (clientIdMap.size() >= maxCCU) {
287 // should actually never happen
289
290 // failure
291 throw NetworkServerException("too many clients");
292 }
293
294 // check if client id was mapped already?
295 ClientIdMap::iterator clientIdMapIt = clientIdMap.find(clientId);
296 if (clientIdMapIt != clientIdMap.end()) {
297 // should actually never happen
299
300 // failure
301 throw NetworkServerException("client id is already mapped");
302 }
303
304 // prepare client struct for map
305 ClientId* _clientId = new ClientId();
306 _clientId->clientId = clientId;
307 _clientId->client = client;
308 _clientId->time = Time::getCurrentMillis();
309
310 // put to map
311 clientIdMap[clientId] = _clientId;
312
313 // put to client ip set
315
316 // check if ip exists already?
317 string clientIp = client->getIp() + ":" + to_string(client->getPort());
318 ClientIpMap::iterator clientIpMapIt = clientIpMap.find(clientIp);
319 if (clientIpMapIt != clientIpMap.end()) {
320 // should actually never happen
323
324 // failure
325 throw NetworkServerException("client ip is already registered");
326 }
327
328 // put to map
329 clientIpMap[clientIp] = client;
330
331 ///
333
334 // reference counter +1
335 client->acquireReference();
336
337 // unlock
339}
340
342 uint32_t clientId = client->clientId;
343
344 //
346
347 // check if client id was mapped already?
348 ClientIdMap::iterator clientIdMapit = clientIdMap.find(clientId);
349 if (clientIdMapit == clientIdMap.end()) {
350 // should actually never happen
352
353 // failure
354 throw NetworkServerException("client id is not mapped");
355 }
356
357 // remove from client id map
358 delete clientIdMapit->second;
359 clientIdMap.erase(clientIdMapit);
360
361 // remove from client ip set
363
364 // check if ip exists already?
365 string clientIp = client->getIp() + ":" + to_string(client->getPort());
366 ClientIpMap::iterator clientIpMapIt = clientIpMap.find(clientIp);
367 if (clientIpMapIt == clientIpMap.end()) {
368 // should actually never happen
371
372 // failure
373 throw NetworkServerException("client ip is not registered");
374 }
375
376 // remove from ip map
377 clientIpMap.erase(clientIpMapIt);
378
379 //
381
382 // reference counter -1
383 client->releaseReference();
384
385 // unlock
387}
388
389UDPServerClient* UDPServer::lookupClient(const uint32_t clientId) {
390 UDPServerClient* client = NULL;
391 ClientIdMap::iterator it;
392
393 //
395
396 // check if client id was mapped already?
397 it = clientIdMap.find(clientId);
398 if (it == clientIdMap.end()) {
399 // should actually never happen
401
402 // failure
403 throw NetworkServerException("client does not exist");
404 }
405
406 // get client
407 ClientId* _client = it->second;
408 // update last access time
409 _client->time = Time::getCurrentMillis();
410 // get client
411 client = _client->client;
412
413 //
414 client->acquireReference();
415
416 // unlock
418
419 //
420 return client;
421}
422
423UDPServerClient* UDPServer::getClientByIp(const string& ip, const unsigned int port) {
424 UDPServerClient* client = NULL;
426 string clientIp = ip + ":" + to_string(port);
427 ClientIpMap::iterator clientIpMapIt = clientIpMap.find(clientIp);
428 if (clientIpMapIt != clientIpMap.end()) {
429 client = clientIpMapIt->second;
430 client->acquireReference();
431 }
433 return client;
434}
435
437 ClientSet clientCloseList;
438
439 // determine clients that are idle or beeing flagged to be shut down
441
442 uint64_t now = Time::getCurrentMillis();
443 for(ClientIdMap::iterator it = clientIdMap.begin(); it != clientIdMap.end(); ++it) {
444 ClientId* client = it->second;
445 if (client->client->shutdownRequested == true ||
446 client->time < now - CLIENT_CLEANUP_IDLETIME) {
447
448 // acquire reference for worker
449 client->client->acquireReference();
450
451 // mark for beeing closed
452 clientCloseList.insert(client->client);
453 }
454 }
455
456 //
458
459 // erase clients
460 for (ClientSet::iterator it = clientCloseList.begin(); it != clientCloseList.end(); ++it) {
461 UDPServerClient* client = *it;
462 // client close logic
463 client->close();
464 // remove from udp client list
465 removeClient(client);
466 }
467}
468
469void UDPServer::sendMessage(const UDPServerClient* client, stringstream* frame, const bool safe, const bool deleteFrame, const MessageType messageType, const uint32_t messageId) {
470 // determine message id by message type
471 uint32_t _messageId;
472 switch(messageType) {
475 _messageId = AtomicOperations::increment(messageCount);
476 break;
478 _messageId = messageId;
479 break;
480 default:
481 delete frame;
482 throw NetworkServerException("Invalid message type");
483 }
484
485 unsigned int threadIdx = _messageId % ioThreadCount;
486 writeHeader(frame, messageType, client->clientId, _messageId, 0);
487 ioThreads[threadIdx]->sendMessage(client, (uint8_t)messageType, _messageId, frame, safe, deleteFrame);
488}
489
490void UDPServer::processAckReceived(UDPServerClient* client, const uint32_t messageId) {
491 unsigned int threadIdx = messageId % ioThreadCount;
492 ioThreads[threadIdx]->processAckReceived(client, messageId);
493}
494
496 return AtomicOperations::increment(clientCount);
497}
498
500 auto stats = statistics;
501 statistics.time = Time::getCurrentMillis();
503 statistics.sent = 0;
505 statistics.errors = 0;
506 // determine clients that are idle or beeing flagged to be shut down
508 stats.clients = clientIdMap.size();
510 return stats;
511}
Base exception class for network server exceptions.
Simple server worker thread pool class.
Base class for network servers.
Definition: Server.h:26
ClientKeySet getClientKeySet()
get a copy of current client keys
Definition: Server.h:86
UDPServerClient * getClientByKey(const std::string &clientKey)
retrieve a client by key, the client reference is acquired, must be released after usage
Definition: Server.h:101
Base class for network UDP server clients.
void cleanUpSafeMessages()
Clean up safe messages.
void close()
Shuts down this network client.
const string & getIp() const
returns client's ip
const unsigned int getPort() const
returns client port
void sendMessage(const UDPServerClient *client, const uint8_t messageType, const uint32_t messageId, stringstream *frame, const bool safe, const bool deleteFrame)
pushes a message to be send, takes over ownership of frame
void processAckReceived(UDPServerClient *client, const uint32_t messageId)
Processes an acknowlegdement reception.
Base class for network UDP servers.
Definition: UDPServer.h:39
static void initializeHeader(stringstream *frame)
Writes a empty header to message.
Definition: UDPServer.cpp:229
UDPServer_Statistics statistics
Definition: UDPServer.h:213
virtual void run()
main event loop
Definition: UDPServer.cpp:55
void processAckReceived(UDPServerClient *client, const uint32_t messageId)
Processes an acknowlegdement reception.
Definition: UDPServer.cpp:490
virtual ~UDPServer()
destructor
Definition: UDPServer.cpp:52
const UDPServer_Statistics getStatistics()
Definition: UDPServer.cpp:499
void addClient(UDPServerClient *client)
maps a new client to a given client id
Definition: UDPServer.cpp:280
UDPServerClient * lookupClient(const uint32_t clientId)
Look ups a client by client id.
Definition: UDPServer.cpp:389
virtual void identify(stringstream *frame, MessageType &messageType, uint32_t &connectionId, uint32_t &messageId, uint8_t &retries)
Identifies a client message.
Definition: UDPServer.cpp:167
std::set< UDPServerClient * > ClientSet
Definition: UDPServer.h:135
UDPServerClient * getClientByIp(const string &ip, const unsigned int port)
Returns client by host name and port.
Definition: UDPServer.cpp:423
void cleanUpClients()
Clean up clients that have been idle for some time or are flagged to be shut down.
Definition: UDPServer.cpp:436
static const uint64_t CLIENT_CLEANUP_IDLETIME
Definition: UDPServer.h:127
UDPServerIOThread ** ioThreads
Definition: UDPServer.h:207
virtual void validate(stringstream *frame)
Validates a client message.
Definition: UDPServer.cpp:226
const uint32_t allocateClientId()
Allocates a client id for a new client.
Definition: UDPServer.cpp:495
virtual UDPServerClient * accept(const uint32_t clientId, const std::string &ip, const unsigned int port)
method to implement for accepting clients
Definition: UDPServer.cpp:163
ServerWorkerThreadPool * workerThreadPool
Definition: UDPServer.h:208
void sendMessage(const UDPServerClient *client, stringstream *frame, const bool safe, const bool deleteFrame, const MessageType messageType, const uint32_t messageId=MESSAGE_ID_NONE)
pushes a message to be send, takes over ownership of frame
Definition: UDPServer.cpp:469
void removeClient(UDPServerClient *client)
removes a client
Definition: UDPServer.cpp:341
virtual void writeHeader(stringstream *frame, MessageType messageType, const uint32_t clientId, const uint32_t messageId, const uint8_t retries)
Writes a message header to message.
Definition: UDPServer.cpp:241
Barrier implementation.
Definition: Barrier.h:21
bool wait()
Waits on barrier.
Definition: Barrier.cpp:31
Implementation for read/write lock.
Definition: ReadWriteLock.h:21
void writeLock()
Locks for writing / exclusive lock.
void unlock()
Unlocks this read write lock.
void readLock()
Locks for reading / shared lock.
Base class for threads.
Definition: Thread.h:26
void start()
Starts this objects thread.
Definition: Thread.cpp:65
void join()
Blocks caller thread until this thread has been terminated.
Definition: Thread.cpp:55
void stop()
Requests that this thread should be stopped.
Definition: Thread.cpp:94
static void sleep(const uint64_t milliseconds)
sleeps current thread for given time in milliseconds
Definition: Thread.cpp:34
bool isStopRequested()
Returns if stop has been requested.
Definition: Thread.cpp:98
Console class.
Definition: Console.h:26
Integer class.
Definition: Integer.h:26
Run time type information utility class.
Definition: RTTI.h:14
void releaseReference()
releases a reference, thus decrementing the counter and delete it if reference counter is zero
Definition: Reference.cpp:20
void acquireReference()
acquires a reference, incrementing the counter
Definition: Reference.cpp:16
Time utility class.
Definition: Time.h:21