Software Developer

Subscribe

© 2020

C++ multithreaded TCP server

This article describes how to implement a concurrent TCP/IP server in C++ for a Linux environment. Concurrency is provided by multithreading, so clients do not have to wait for their turn and can be served immediately. The server has one thread that handles new connections (the TCPServer class). After a connection is accepted, a new thread is created that is responsible for all communication with that client (the ConnectionHandler class). The implementation of ConnectionHandler can be changed freely, so the server can be adapted to any use; for example, it could work well as an HTTP server.

TCPServer.h

#pragma once

#include <thread>

// Concurrent TCP server: one background thread accepts connections on PORT and
// every accepted client is served by its own ConnectionHandler.
class TCPServer {
public:
    // TCP port the server listens on. constexpr static members are implicitly
    // inline in C++17, so the constant can be odr-used (e.g. bound to a
    // const reference) without a separate out-of-class definition.
    static constexpr int PORT = 1234;

    // starts server
    void start();

    // stops server
    void stop();

    // joining server thread - waits for server to end
    void join();

    // constructor automatically starts server thread
    TCPServer();
    ~TCPServer();

    // the object owns a thread and a file descriptor, so it cannot be copied
    TCPServer(const TCPServer&) = delete;
    TCPServer& operator=(const TCPServer&) = delete;

private:
    // event file descriptor - it will be used to tell server to stop
    int efd;

    // server thread
    std::thread m_thread;

    // it works in server thread
    void threadFunc();
};

The server starts as soon as a TCPServer object is constructed.

A new thread is created which runs the threadFunc() method. TCPServer::start() also creates an eventfd, which is used to notify the server to stop processing.

threadFunc() creates a socket and binds it to the specified port (1234 in this example). poll() is used to monitor whether any of the open file descriptors is ready to perform I/O. After an I/O event is received on the listening socket, a new ConnectionHandler object is constructed, which runs on a separate thread.

TCPServer.cpp

#include "TCPServer.h"
#include <unistd.h>
#include <poll.h>
#include <sys/eventfd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <cassert>
#include <vector>
#include <unordered_map>
#include "ConnectionHandler.h"
#include <iostream>

TCPServer::TCPServer()
        : efd(-1) { 
    this->start();
}

// Stops the server thread, waits for it to finish and releases the eventfd.
TCPServer::~TCPServer() {
    // ask the server thread to leave its poll loop ...
    this->stop();

    // ... and wait for it: destroying a still-joinable std::thread calls
    // std::terminate(), and the thread keeps polling efd until it exits, so
    // the descriptor must not be closed before the join.
    this->join();

    if (this->efd != -1) {
        close(this->efd);
        this->efd = -1;
    }
}

void TCPServer::start() {
    assert(!this->m_thread.joinable());

    if (this->efd != -1) {
        close(this->efd);
    }

    this->efd = eventfd(0, 0);
    if (this->efd == -1) {
        std::cout << "eventfd() => -1, errno=" << errno << std::endl;
        return;
    }

    // creates thread
    this->m_thread = std::thread([this]() { this->threadFunc(); });

    // sets name for thread
    pthread_setname_np(this->m_thread.native_handle(), "TCPServer");
}


void TCPServer::stop() {
    // writes to efd - it will be handled as stopping server thread
    uint64_t one = 1;
    auto ret = write(this->efd, &one, 8);
    if (ret == -1) {
        std::cout << "write => -1, errno=" << errno << std::endl;
    }
}

void TCPServer::join() {
    if (this->m_thread.joinable()) {
        this->m_thread.join();
    }
}

// Body of the server thread. Opens the listening socket on PORT and then polls
// the stop eventfd, the listening socket and every client socket until a stop
// request arrives or poll() fails. Every accepted client gets its own
// ConnectionHandler, which serves it on a separate thread.
void TCPServer::threadFunc() {
    int sockfd;

    std::cout << "Listen on: " << PORT << std::endl;
    if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        std::cout << "socket() => -1, errno=" << errno << std::endl;
        return;
    }

    int reuseaddr = 1;
    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(int)) == -1) {
        std::cout << "setsockopt() => -1, errno=" << errno << std::endl;
    }

    struct sockaddr_in servaddr = {};
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = INADDR_ANY;
    servaddr.sin_port = htons(PORT);

    // binding to socket that will listen for new connections
    if (bind(sockfd, reinterpret_cast<const struct sockaddr*>(&servaddr), sizeof(servaddr)) == -1) {
        std::cout << "bind() => -1, errno=" << errno << std::endl;
        close(sockfd);
        return;
    }

    // started listening, 50 pending connections can wait in a queue
    if (listen(sockfd, 50) == -1) {
        std::cout << "listen() => -1, errno=" << errno << std::endl;
        close(sockfd);
        return;
    }

    // monitored file descriptors - at start there is efd and just created sockfd. POLLIN means we wait for data to read
    std::vector<struct pollfd> fds{ { this->efd, POLLIN, 0 }, { sockfd, POLLIN, 0 } };

    std::unordered_map<int, ConnectionHandler> handlers;

    while (true) {
        const int TIMEOUT = 1000;   // 1000 ms
        int n = poll(fds.data(), fds.size(), TIMEOUT);  // checking if there was any event on monitored file descriptors
        if (n == -1 && errno != ETIMEDOUT && errno != EINTR) {
            std::cout << "poll() => -1, errno=" << errno << std::endl;
            break;
        }

        // n pending events
        if (n > 0) {
            if (fds[0].revents) {   // handles server stop request (which is sent by TCPServer::stop())
                std::cout << "Received stop request" << std::endl;
                break;
            } else if (fds[1].revents) {    // new client connected
                // accepting connection
                int clientfd = accept(sockfd, nullptr, nullptr);
                std::cout << "New connection" << std::endl;
                if (clientfd != -1) {
                    // insert new pollfd to monitor
                    fds.push_back(pollfd{clientfd, POLLIN, 0});

                    // create ConnectionHandler object that will run in separate thread
                    handlers.emplace(clientfd, clientfd);
                } else {
                    std::cout << "accept => -1, errno=" << errno << std::endl;
                }

                // clearing revents
                fds[1].revents = 0;
            }

            // iterating all pollfds to check if anyone disconnected
            for (auto it = fds.begin() + 2; it != fds.end(); ) {
                bool disconnected = false;
                if (it->revents) {
                    // Peek without consuming: 0 means the peer closed the
                    // connection. A hard error (e.g. ECONNRESET) also means the
                    // connection is dead; leaving such an fd in the set would
                    // make poll() return immediately forever.
                    char c;
                    const ssize_t peeked = recv(it->fd, &c, 1, MSG_PEEK | MSG_DONTWAIT);
                    disconnected = (peeked == 0)
                            || (peeked == -1 && errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR);
                }

                if (disconnected) {
                    std::cout << "Client disconnected" << std::endl;
                    const int clientfd = it->fd;

                    // Ask the handler to finish and wake it if it is blocked
                    // in recv(); close() alone does not wake a blocked reader.
                    handlers.at(clientfd).terminate();
                    shutdown(clientfd, SHUT_RD);

                    // erase() destroys the handler, whose destructor joins its
                    // thread. Close the socket only afterwards so the handler
                    // never touches a closed (or reused) descriptor.
                    handlers.erase(clientfd);
                    close(clientfd);
                    it = fds.erase(it);
                } else {
                    ++it;
                }
            }
        }
    }

    // cleaning section after receiving stop request
    // 1) tell every handler to stop and wake the ones blocked in recv()
    for (auto it = fds.begin() + 2; it != fds.end(); ++it) {
        auto handler = handlers.find(it->fd);
        if (handler != handlers.end()) {
            handler->second.terminate();
            shutdown(it->fd, SHUT_RD);
        }
    }

    // 2) join all handler threads while their sockets are still open
    handlers.clear();

    // 3) close the listening socket and all client sockets (efd at index 0 is
    //    owned by TCPServer and is not closed here)
    for (auto it = fds.begin() + 1; it != fds.end(); ++it) {
        close(it->fd);
    }

    std::cout << "TCP server stopped" << std::endl;
}

ConnectionHandler.h

#pragma once

#include <atomic>
#include <string>
#include <thread>

// Serves one client connection: the constructor starts a thread that runs
// threadFunc() on the given socket, and the destructor joins that thread.
class ConnectionHandler {
private:
    // thread that runs threadFunc()
    std::thread m_thread;

    // client socket descriptor passed to the constructor
    int fd = -1;

    // Stop flag. terminate() is called from the server thread while
    // threadFunc() reads the flag on the handler thread, so it is atomic to
    // avoid a data race.
    std::atomic<bool> m_terminate{false};

    std::string readMessage();
    void sendMessage(const std::string& msg);

    // joins the handler thread if it is joinable
    void stop();

public:
    explicit ConnectionHandler(int fd);
    ~ConnectionHandler();

    // the running thread captures `this`, so the object must not be copied
    ConnectionHandler(const ConnectionHandler&) = delete;
    ConnectionHandler& operator=(const ConnectionHandler&) = delete;

    // asks the loop in threadFunc() to end
    void terminate();

    // message loop executed on the handler thread
    void threadFunc();
};

ConnectionHandler.cpp

#include "ConnectionHandler.h"
#include <cassert>
#include <sys/socket.h>
#include <errno.h>
#include <iostream>
#include <pthread.h>

// Stores the client socket descriptor and immediately starts the thread that
// serves this connection.
ConnectionHandler::ConnectionHandler(int fd) : fd(fd) {
    assert(!this->m_thread.joinable());

    // creating thread that handles received messages
    this->m_thread = std::thread([this]() { this->threadFunc(); });

    // Linux limits thread names to 16 bytes including the terminating NUL.
    // "ConnectionHandler" (17 characters) is rejected with ERANGE and the name
    // is never applied, so a shorter name is used.
    pthread_setname_np(this->m_thread.native_handle(), "ConnHandler");
}

// Waits for the handler thread to finish before the object is destroyed
// (same steps as stop()).
ConnectionHandler::~ConnectionHandler() {
    if (m_thread.joinable()) {
        m_thread.join();
    }
}

void ConnectionHandler::stop() {
    if (this->m_thread.joinable()) {
        this->m_thread.join();
    }
}

// Message loop of the handler thread: reads a message, logs it and answers it,
// until terminate() is called or the connection can no longer be read.
void ConnectionHandler::threadFunc() {
    while (!this->m_terminate) {
        const std::string msg = this->readMessage();
        if (msg.empty()) {
            // readMessage() returns an empty string only when recv() reported
            // end-of-stream or an error. Continuing would spin in a busy loop
            // and keep sending replies to a peer that is gone.
            break;
        }

        std::cout << "Received message: " << msg << std::endl;
        this->sendMessage("Thank you for your message " + msg);
    }
}

// Receives up to 1024 bytes from the client socket.
// Returns exactly the bytes received, or an empty string when recv() reports
// end-of-stream (0) or an error (-1).
std::string ConnectionHandler::readMessage() {
    std::string msg(1024, '\0');    // receive buffer of 1024 bytes, zero-filled

    const ssize_t readBytes = recv(this->fd, msg.data(), msg.size(), 0);
    if (readBytes < 1) {
        std::cout << "Error in readMessage, readBytes: " << readBytes << std::endl;
        return "";
    }

    // keep only what was actually received; without this the caller would get
    // (and echo back) the full 1024-byte buffer padded with NUL characters
    msg.resize(static_cast<std::string::size_type>(readBytes));
    return msg;
}


// Sends the whole message to the client socket, continuing after partial
// sends. Logs an error if not every byte could be sent.
void ConnectionHandler::sendMessage(const std::string& msg) {
    std::size_t sent = 0;
    while (sent < msg.size()) {
        // MSG_NOSIGNAL: if the peer already closed the connection, send() must
        // fail with EPIPE instead of raising SIGPIPE, whose default action
        // terminates the whole process.
        const ssize_t n = send(this->fd, msg.data() + sent, msg.size() - sent, MSG_NOSIGNAL);
        if (n == -1 && errno == EINTR) {
            continue;   // interrupted before anything was sent - retry
        }
        if (n <= 0) {
            break;      // real error - give up and report below
        }
        sent += static_cast<std::size_t>(n);
    }

    if (sent != msg.size()) {
        std::cout << "Error while sending message, message size: " << msg.size() << " bytes sent: " << sent << std::endl;
    }
}


void ConnectionHandler::terminate() {
    this->m_terminate = true;
}

main.cpp

#include "TCPServer.h"

int main() {
    // constructing the server is enough to start it - the constructor
    // launches the server thread
    TCPServer server;

    // calling server.stop() here would request the server to shut down

    // block until the server thread has finished
    server.join();
    return 0;
}

What could be done better?

  • there could be a thread pool that handles client connections. Thread creation is costly, and a pool would avoid creating a new thread for every client
  • there could be a SIGINT (ctrl-c) handler that calls TCPServer::stop() in main.cpp