From 08325faff52d0704c6f17d065ca8d72ea07ca6a0 Mon Sep 17 00:00:00 2001 From: crupest Date: Mon, 7 Jun 2021 21:13:44 +0800 Subject: import(life): ... --- works/life/computer-network-experiment/server.cpp | 68 ++++++++++++++++++++--- 1 file changed, 59 insertions(+), 9 deletions(-) (limited to 'works/life/computer-network-experiment/server.cpp') diff --git a/works/life/computer-network-experiment/server.cpp b/works/life/computer-network-experiment/server.cpp index 2877c61..3c87ea0 100644 --- a/works/life/computer-network-experiment/server.cpp +++ b/works/life/computer-network-experiment/server.cpp @@ -4,8 +4,13 @@ #include "Common.h" #include "Output.h" +#include "fmt/core.h" + +#include +#include #include +#include #include #ifdef WIN32 @@ -20,20 +25,61 @@ const auto bind_address = "127.0.0.1"; // control bind address const u_short port = 1234; // control bind port -void ResponseThreadProc(int socket, sockaddr_in address) { - auto address_string = inet_ntoa(address.sin_addr); +struct Connection { + std::thread thread; + int socket; + sockaddr_in address; + String address_string; + String name; + folly::ProducerConsumerQueue send_queue{100}; + folly::CancellationSource cancellation_source; +}; + +void ResponseThreadProc(Connection *connection) { + auto host = ConvertCharString(inet_ntoa(connection->address.sin_addr)); + auto port = htons(connection->address.sin_port); + connection->address_string = fmt::format(CRUT("{}:{}"), host, port); std::string rest; - std::string name = SafeReadUntil(socket, '\n', rest); - + std::string n = SafeReadUntil(connection->socket, '\n', rest); + connection->name = ConvertCharString(n); SendOutput(CRUT("Connected to {}, whose name is {}."), - ConvertCharString(address_string), ConvertCharString(name)); + connection->address_string, connection->name); + + std::thread revieve_thread( + [](Connection *connection) { + std::string rest; + while (true) { + if (connection->cancellation_source.isCancellationRequested()) { + break; + } + + std::string s = SafeReadUntil(connection->socket, '\n', rest); - CloseSocket(socket); + SendOutput(CRUT("{}({}) send a message:\n{}\n"), connection->name, + connection->address_string, ConvertCharString(s)); + } + }, + connection); + + while (true) { + if (connection->cancellation_source.isCancellationRequested()) { + break; + } + + std::string s; + if (connection->send_queue.read(s)) { + SafeSend(connection->socket, s); + } + } + + CloseSocket(connection->socket); } int Main() { + std::vector connections; + int server_socket; if ((server_socket = socket(AF_INET, SOCK_STREAM, 0)) == -1) { @@ -74,8 +120,12 @@ int Main() { PrintErrorMessageAndExit(CRUT("Failed to accecpt.")); } - std::thread response_thread(ResponseThreadProc, client_socket, - client_address); - response_thread.detach(); + Connection connection; + connection.socket = client_socket; + connection.address = client_address; + connections.push_back(std::move(connection)); + + connection.thread = std::thread(ResponseThreadProc, &connections.back()); + connection.thread.detach(); } } -- cgit v1.2.3