25using std::stringstream;
39UDPServer::UDPServer(
const std::string& name,
const std::string& host,
const unsigned int port,
const unsigned int maxCCU) :
42 clientIdMapReadWriteLock(
"nioudpserver_clientidmap"),
43 clientIpMapReadWriteLock(
"nioudpserver_clientipmap"),
46 workerThreadPool(NULL),
56 Console::println(
"UDPServer::run(): start");
87 Console::println(
"UDPServer::run(): ready");
90 uint64_t lastCleanUpClientsTime = Time::getCurrentMillis();
91 uint64_t lastCleanUpClientsSafeMessagesTime = Time::getCurrentMillis();
94 uint64_t now = Time::getCurrentMillis();
97 if (now >= lastCleanUpClientsTime + 100L) {
99 lastCleanUpClientsTime = now;
103 if (now >= lastCleanUpClientsSafeMessagesTime + 100L) {
105 for (ClientKeySet::iterator i = _clientKeySet.begin(); i != _clientKeySet.end(); ++i) {
109 if (client == NULL)
continue;
117 lastCleanUpClientsSafeMessagesTime = now;
121 uint64_t duration = Time::getCurrentMillis() - now;
124 if (duration < 100L) {
125 sleep(100L - duration);
137 for (ClientKeySet::iterator i = _clientKeySet.begin(); i != _clientKeySet.end(); ++i) {
140 if (client == NULL)
continue;
160 Console::println(
"UDPServer::run(): done");
170 char inConnectionId[6];
175 if ((
unsigned int)frame->tellp() - (
unsigned int)frame->tellg() <
176 sizeof(inMessageType) +
177 sizeof(inConnectionId) +
178 sizeof(inMessageId) +
184 frame->read((
char*)&inMessageType,
sizeof(inMessageType));
185 switch(inMessageType) {
200 string strConnectionId;
201 frame->read((
char*)&inConnectionId,
sizeof(inConnectionId));
202 strConnectionId.append(inConnectionId,
sizeof(inConnectionId));
203 if (Integer::decode(strConnectionId, connectionId) ==
false) {
209 frame->read((
char*)&inMessageId,
sizeof(inMessageId));
210 strMessageId.append(inMessageId,
sizeof(inMessageId));
211 if (Integer::decode(strMessageId, messageId) ==
false) {
217 frame->read((
char*)&inRetries,
sizeof(inRetries));
218 strRetries.append(inRetries,
sizeof(inRetries));
220 if (Integer::decode(strRetries, _retries) ==
false) {
231 char emptyHeader[14] =
232 "\0\0\0\0\0\0\0\0\0\0"
235 frame->write(emptyHeader,
sizeof(emptyHeader));
238 frame->seekp(0, ios_base::end);
243 frame->seekp(0, ios_base::beg);
246 switch(messageType) {
263 Integer::encode(clientId, strClientId);
264 *frame << strClientId;
268 Integer::encode(messageId, strMessageId);
269 *frame << strMessageId;
273 Integer::encode((uint32_t)retries, strRetriesId);
274 *frame << strRetriesId[strRetriesId.size() - 1];
277 frame->seekp(0, ios_base::end);
281 uint32_t clientId = client->
clientId;
295 ClientIdMap::iterator clientIdMapIt =
clientIdMap.find(clientId);
307 _clientId->
client = client;
308 _clientId->
time = Time::getCurrentMillis();
317 string clientIp = client->
getIp() +
":" + to_string(client->
getPort());
318 ClientIpMap::iterator clientIpMapIt =
clientIpMap.find(clientIp);
342 uint32_t clientId = client->
clientId;
348 ClientIdMap::iterator clientIdMapit =
clientIdMap.find(clientId);
358 delete clientIdMapit->second;
365 string clientIp = client->
getIp() +
":" + to_string(client->
getPort());
366 ClientIpMap::iterator clientIpMapIt =
clientIpMap.find(clientIp);
391 ClientIdMap::iterator it;
409 _client->
time = Time::getCurrentMillis();
426 string clientIp = ip +
":" + to_string(
port);
427 ClientIpMap::iterator clientIpMapIt =
clientIpMap.find(clientIp);
429 client = clientIpMapIt->second;
442 uint64_t now = Time::getCurrentMillis();
452 clientCloseList.insert(client->
client);
460 for (ClientSet::iterator it = clientCloseList.begin(); it != clientCloseList.end(); ++it) {
472 switch(messageType) {
478 _messageId = messageId;
487 ioThreads[threadIdx]->
sendMessage(client, (uint8_t)messageType, _messageId, frame, safe, deleteFrame);
Base exception class for network server exceptions.
Simple server worker thread pool class.
void start()
Start worker thread pool.
void stop()
Stop worker thread pool.
Base class for network servers.
ClientKeySet getClientKeySet()
get a copy of current client keys
unsigned int workerThreadPoolMaxElements
unsigned int workerThreadPoolCount
unsigned int ioThreadCount
UDPServerClient * getClientByKey(const std::string &clientKey)
retrieve a client by key, the client reference is acquired, must be released after usage
std::set< std::string > ClientKeySet
Base class for network UDP server clients.
void cleanUpSafeMessages()
Clean up safe messages.
void close()
Shuts down this network client.
const string & getIp() const
returns client's ip
const unsigned int getPort() const
returns client port
volatile bool shutdownRequested
UDP Network server IO thread.
void sendMessage(const UDPServerClient *client, const uint8_t messageType, const uint32_t messageId, stringstream *frame, const bool safe, const bool deleteFrame)
pushes a message to be send, takes over ownership of frame
void processAckReceived(UDPServerClient *client, const uint32_t messageId)
Processes an acknowlegdement reception.
Base class for network UDP servers.
static void initializeHeader(stringstream *frame)
Writes a empty header to message.
UDPServer_Statistics statistics
virtual void run()
main event loop
friend class UDPServerIOThread
void processAckReceived(UDPServerClient *client, const uint32_t messageId)
Processes an acknowlegdement reception.
ReadWriteLock clientIpMapReadWriteLock
virtual ~UDPServer()
destructor
const UDPServer_Statistics getStatistics()
void addClient(UDPServerClient *client)
maps a new client to a given client id
UDPServerClient * lookupClient(const uint32_t clientId)
Look ups a client by client id.
virtual void identify(stringstream *frame, MessageType &messageType, uint32_t &connectionId, uint32_t &messageId, uint8_t &retries)
Identifies a client message.
std::set< UDPServerClient * > ClientSet
UDPServerClient * getClientByIp(const string &ip, const unsigned int port)
Returns client by host name and port.
void cleanUpClients()
Clean up clients that have been idle for some time or are flagged to be shut down.
static const uint64_t CLIENT_CLEANUP_IDLETIME
@ MESSAGETYPE_ACKNOWLEDGEMENT
ReadWriteLock clientIdMapReadWriteLock
UDPServerIOThread ** ioThreads
virtual void validate(stringstream *frame)
Validates a client message.
const uint32_t allocateClientId()
Allocates a client id for a new client.
virtual UDPServerClient * accept(const uint32_t clientId, const std::string &ip, const unsigned int port)
method to implement for accepting clients
ServerWorkerThreadPool * workerThreadPool
void sendMessage(const UDPServerClient *client, stringstream *frame, const bool safe, const bool deleteFrame, const MessageType messageType, const uint32_t messageId=MESSAGE_ID_NONE)
pushes a message to be send, takes over ownership of frame
void removeClient(UDPServerClient *client)
removes a client
virtual void writeHeader(stringstream *frame, MessageType messageType, const uint32_t clientId, const uint32_t messageId, const uint8_t retries)
Writes a message header to message.
bool wait()
Waits on barrier.
Implementation for read/write lock.
void writeLock()
Locks for writing / exclusive lock.
void unlock()
Unlocks this read write lock.
void readLock()
Locks for reading / shared lock.
void start()
Starts this objects thread.
void join()
Blocks caller thread until this thread has been terminated.
void stop()
Requests that this thread should be stopped.
static void sleep(const uint64_t milliseconds)
sleeps current thread for given time in milliseconds
bool isStopRequested()
Returns if stop has been requested.
Run time type information utility class.
void releaseReference()
releases a reference, thus decrementing the counter and delete it if reference counter is zero
void acquireReference()
acquires a reference, incrementing the counter