Skip to content

Commit

Permalink
Implement client disconnection and server resources release
Browse files Browse the repository at this point in the history
  • Loading branch information
Ria9993 committed Apr 9, 2024
1 parent 0f08677 commit 58666e7
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 48 deletions.
2 changes: 1 addition & 1 deletion Source/Server/ClientCommand/USER.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ EIrcErrorCode Server::executeClientCommand_USER(SharedPtr<ClientControlBlock> cl
{
replyMsg = "ERROR :Invalid USER arguments";
sendMsgToClient(client, MakeShared<MsgBlock>(replyMsg));
disconnectClient(client);
forceDisconnectClient(client);

return IRC_SUCCESS;
}
Expand Down
10 changes: 4 additions & 6 deletions Source/Server/ClientControlBlock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,11 @@ struct ClientControlBlock : public FlexibleMemoryPoolingBase<ClientControlBlock>

bool bRegistered;


/** Flag that indicate whether the client's connection is expired.
*
* @details The server doesn't release the client control block
* for remaining events or messages immediately after the connection is closed.
*/
/** Flag that indicate whether the client is expired, and expired client will be released after the remaining messages are sent. */
bool bExpired;

bool bSocketClosed;

/** Received messages from the client.
*
* @details Each message block is not a separate message.
Expand Down Expand Up @@ -77,6 +74,7 @@ struct ClientControlBlock : public FlexibleMemoryPoolingBase<ClientControlBlock>
, LastActiveTime(0)
, bRegistered(false)
, bExpired(false)
, bSocketClosed(false)
, RecvMsgBlocks()
, RecvMsgBlockCursor(0)
, MsgSendingQueue()
Expand Down
166 changes: 134 additions & 32 deletions Source/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ namespace IRC
{
Server::~Server()
{
// TODO:
// mhKqueue
// mhListenSocket
destroyResources();
}

Server& Server::operator=(UNUSED const Server& rhs)
Expand Down Expand Up @@ -108,6 +106,13 @@ EIrcErrorCode Server::Startup()

// Start the event loop
EIrcErrorCode result = eventLoop();

// Destroy resources even after the event loop is successfully, so that a server can be restarted.
// if (UNLIKELY(result != IRC_SUCCESS))
{
destroyResources();
}

return result;
}

Expand All @@ -127,6 +132,9 @@ EIrcErrorCode Server::eventLoop()

while (true)
{
// Release expired clients. (see mClientReleaseQueue description for details)
mClientReleaseQueue.clear();

// Set the timeout of kevent.
// If there is no message to process, the timeout is NULL to wait indefinitely.
// Else, the timeout is zero to process the received messages from the clients.
Expand All @@ -140,6 +148,7 @@ EIrcErrorCode Server::eventLoop()
observedEventNum = kevent(mhKqueue, mEventRegistrationQueue.data(), mEventRegistrationQueue.size(), observedEvents, CLIENT_MAX, timeout);
if (UNLIKELY(observedEventNum == -1))
{
logErrorCode(IRC_FAILED_TO_WAIT_KEVENT);
return IRC_FAILED_TO_WAIT_KEVENT;
}
mEventRegistrationQueue.clear();
Expand Down Expand Up @@ -189,7 +198,7 @@ EIrcErrorCode Server::eventLoop()
{
logErrorCode(IRC_ERROR_CLIENT_SOCKET_EVENT);
SharedPtr<ClientControlBlock> client = getClientFromKeventUdata(currEvent);
EIrcErrorCode err = disconnectClient(client);
EIrcErrorCode err = forceDisconnectClient(client);
if (UNLIKELY(err != IRC_SUCCESS))
{
return err;
Expand All @@ -212,15 +221,15 @@ EIrcErrorCode Server::eventLoop()
const int clientSocket = accept(mhListenSocket, reinterpret_cast<sockaddr_t*>(&clientAddr), &clientAddrLen);
if (clientSocket == -1)
{
goto CONTINUE_NEXT_EVENT_LOOP;
break;
}

// Set client socket as non-blocking
if (UNLIKELY(fcntl(clientSocket, F_SETFL, O_NONBLOCK) == -1))
{
logErrorCode(IRC_FAILED_TO_SETSOCKOPT_SOCKET);
close(clientSocket);
goto CONTINUE_NEXT_EVENT_LOOP;
continue;
}

// Add client to the client list
Expand Down Expand Up @@ -270,7 +279,7 @@ EIrcErrorCode Server::eventLoop()
if (UNLIKELY(nRecvBytes == -1))
{
logErrorCode(IRC_FAILED_TO_RECV_SOCKET);
EIrcErrorCode err = disconnectClient(currClient);
EIrcErrorCode err = forceDisconnectClient(currClient);
if (UNLIKELY(err != IRC_SUCCESS))
{
return err;
Expand All @@ -281,7 +290,7 @@ EIrcErrorCode Server::eventLoop()
else if (nRecvBytes == 0)
{
logMessage("Client disconnected. IP: " + InetAddrToString(currClient->Addr) + ", Nick: " + currClient->Nickname);
EIrcErrorCode err = disconnectClient(currClient);
EIrcErrorCode err = forceDisconnectClient(currClient);
if (UNLIKELY(err != IRC_SUCCESS))
{
return err;
Expand Down Expand Up @@ -323,7 +332,7 @@ EIrcErrorCode Server::eventLoop()
if (UNLIKELY(nSentBytes == -1))
{
logErrorCode(IRC_FAILED_TO_SEND_SOCKET);
EIrcErrorCode err = disconnectClient(currClient);
EIrcErrorCode err = forceDisconnectClient(currClient);
if (UNLIKELY(err != IRC_SUCCESS))
{
return err;
Expand All @@ -344,9 +353,17 @@ EIrcErrorCode Server::eventLoop()
{
currClient->MsgSendingQueue.pop();
currClient->SendMsgBlockCursor = 0;
}

// EVFILTER_WRITE filter should be disabled after sending all messages.
if (currClient->MsgSendingQueue.empty())
// EVFILTER_WRITE filter should be disabled after sending all messages.
if (currClient->MsgSendingQueue.empty())
{
// Close the expired client connection after sending all messages. (See disconnectClient() for details)
if (currClient->bExpired)
{
forceDisconnectClient(currClient);
}
else
{
kevent_t kev;
kev.ident = currClient->hSocket;
Expand All @@ -372,14 +389,39 @@ EIrcErrorCode Server::eventLoop()
return IRC_FAILED_UNREACHABLE_CODE;
}

EIrcErrorCode Server::destroyResources()
{
// Server is already crashed with an error, and therefore we can't guarantee the resources are properly released.
// So, we don't handle the error of release.

// Close sockets
// Clients and message blocks are will be released automatically by SharedPtr.
close(mhKqueue);
close(mhListenSocket);
for (size_t i = 0; i < mClients.size(); i++)
{
if (!mClients[i]->bSocketClosed)
{
close(mClients[i]->hSocket);
}
}

// Release clients
// The clients and message blocks are will be released automatically by SharedPtr.
mClients.clear();
mNickToClientMap.clear();
mEventRegistrationQueue.clear();
mClientReleaseQueue.clear();

// TODO: memory check for memory pool

return IRC_SUCCESS;
}

EIrcErrorCode Server::separateMsgsFromClientRecvMsgs(SharedPtr<ClientControlBlock> client, std::vector< SharedPtr<MsgBlock> >& outSeparatedMsgs)
{
// Check the client is expired.
// The message block can be exist even if the client is expired.
// See disconnectClient() for details.
if (client->bExpired)
{
logVerbose("separateMsgsFromClientRecvMsgs(): The client of the message is already expired. IP: " + InetAddrToString(client->Addr) + ", Nick: " + client->Nickname);
return IRC_SUCCESS;
}

Expand All @@ -389,7 +431,6 @@ EIrcErrorCode Server::separateMsgsFromClientRecvMsgs(SharedPtr<ClientControlBloc
return IRC_SUCCESS;
}


// Reset the cursor if it is out of range
if (client->RecvMsgBlockCursor >= MESSAGE_LEN_MAX)
{
Expand Down Expand Up @@ -478,13 +519,11 @@ EIrcErrorCode Server::processClientMsg(SharedPtr<ClientControlBlock> client, Sha
{
if (client->bExpired)
{
logVerbose("processClientMsg(): The client of the message is already expired. IP: " + InetAddrToString(client->Addr) + ", Nick: " + client->Nickname);
return IRC_SUCCESS;
}

if (msg == NULL)
{
logVerbose("processClientMsg(): The message is NULL. IP: " + InetAddrToString(client->Addr) + ", Nick: " + client->Nickname);
return IRC_SUCCESS;
}

Expand Down Expand Up @@ -598,20 +637,30 @@ EIrcErrorCode Server::processClientMsg(SharedPtr<ClientControlBlock> client, Sha
return IRC_SUCCESS;
}

EIrcErrorCode Server::disconnectClient(SharedPtr<ClientControlBlock> client)
EIrcErrorCode Server::forceDisconnectClient(SharedPtr<ClientControlBlock> client)
{
if (client == NULL)
{
return IRC_SUCCESS;
}

Assert(client->bExpired == false);
// The client socket is already closed.
if (client->bSocketClosed)
{
return IRC_SUCCESS;
}

// Expire the client instead of memory release and remove from the client list.
// See ClientControlBlock::bExpired for details.
// Close the client socket.
// close() on a socket will delete the corresponding kevent from the kqueue.
if (UNLIKELY(close(client->hSocket) == -1))
{
logErrorCode(IRC_FAILED_TO_CLOSE_SOCKET);
return IRC_FAILED_TO_CLOSE_SOCKET;
}
client->bSocketClosed = true;
client->bExpired = true;

// Remove the client from the client list
// Remove the client from client lists
for (size_t i = 0; i < mClients.size(); i++)
{
if (mClients[i] == client)
Expand All @@ -636,13 +685,71 @@ EIrcErrorCode Server::disconnectClient(SharedPtr<ClientControlBlock> client)

// TODO: Send QUIT message to the channels the client is in.

// close() on a socket will delete the corresponding kevent from the kqueue.
if (UNLIKELY(close(client->hSocket) == -1))

// Defer the release of the client.
mClientReleaseQueue.push_back(client);

return IRC_SUCCESS;
}

EIrcErrorCode Server::disconnectClient(SharedPtr<ClientControlBlock> client)
{
if (client == NULL)
{
logErrorCode(IRC_FAILED_TO_CLOSE_SOCKET);
return IRC_FAILED_TO_CLOSE_SOCKET;
return IRC_SUCCESS;
}

if (client->bExpired)
{
return IRC_SUCCESS;
}

client->bExpired = true;

// If the client has no message to send, disconnect the client immediately.
if (client->MsgSendingQueue.empty())
{
return forceDisconnectClient(client);
}

// Block the messages from the client
kevent_t kev;
kev.ident = client->hSocket;
kev.filter = EVFILT_READ;
kev.flags = EV_DELETE;
kev.fflags = 0;
kev.data = 0;
kev.udata = reinterpret_cast<void *>(client.GetControlBlock());
mEventRegistrationQueue.push_back(kev);

// Remove the client from client lists
for (size_t i = 0; i < mClients.size(); i++)
{
if (mClients[i] == client)
{
// Fast remove (unordered)
mClients[i] = mClients.back();
mClients.pop_back();
break;
}
}
mNickToClientMap.erase(client->Nickname);

// Remove from the channels
for (std::map< std::string, WeakPtr< ChannelControlBlock > >::iterator it = client->Channels.begin(); it != client->Channels.end(); ++it)
{
SharedPtr<ChannelControlBlock> channel = it->second.Lock();
if (channel != NULL)
{
channel->Clients.erase(client->Nickname);
}
}

// TODO: Send QUIT message to the channels the client is in.

// Defer the release of the client.
mClientReleaseQueue.push_back(client);

return IRC_SUCCESS;
}

Expand Down Expand Up @@ -734,11 +841,6 @@ void Server::sendMsgToConnectedChannels(SharedPtr<ClientControlBlock> client, Sh
Assert(client != NULL);
Assert(msg != NULL);

if (client->bExpired)
{
return;
}

for (std::map< std::string, WeakPtr< ChannelControlBlock > >::iterator it = client->Channels.begin(); it != client->Channels.end(); ++it)
{
SharedPtr<ChannelControlBlock> channel = it->second.Lock();
Expand Down
Loading

0 comments on commit 58666e7

Please sign in to comment.