From 99e2e923d0c77b02f3fb4ff648ea916954868606 Mon Sep 17 00:00:00 2001 From: Yuqian Yang Date: Fri, 28 Feb 2025 23:13:39 +0800 Subject: chore(store): move everything to store. --- .../life/computer-network-experiment/server.cpp | 261 +++++++++++++++++++++ 1 file changed, 261 insertions(+) create mode 100644 store/works/life/computer-network-experiment/server.cpp (limited to 'store/works/life/computer-network-experiment/server.cpp') diff --git a/store/works/life/computer-network-experiment/server.cpp b/store/works/life/computer-network-experiment/server.cpp new file mode 100644 index 0000000..065687c --- /dev/null +++ b/store/works/life/computer-network-experiment/server.cpp @@ -0,0 +1,261 @@ +/** Created by crupest. + * This is the server program. + */ + +#include "Common.h" +#include "IO.h" +#include "ReadWriteLock.h" + +#include +#include + +#include +#include +#include +#include +#include + +#ifdef WIN32 +#include +#include +#else +#include +#include +#include +#endif + +const auto bind_address = "127.0.0.1"; // control bind address +const u_short port = 1234; // control bind port + +namespace { +void PrintHelp() { + SendOutput(CRUT( + "Input and run one of following command:\n\t> NOTHING -> Continue and " + "print new messages.\n\t> list -> List all connected client.\n\t> send " + "[i] [message] -> Send messages to client with number i.\n")); +} +} // namespace + +struct Connection { + int id; + std::thread thread; + std::thread receive_thread; + int socket; + sockaddr_in address; + String address_string; + String name; + folly::ProducerConsumerQueue send_queue{100}; + folly::CancellationSource cancellation_source; +}; + +namespace { +cru::ReadWriteLock connections_lock; +std::vector> connections; + +void RemoveConnection(int id) { + connections_lock.WriteLock(); + connections.erase( + std::remove_if(connections.begin(), connections.end(), + [id](const std::unique_ptr &connection) { + return connection->id == id; + }), + connections.end()); + + connections_lock.WriteUnlock(); +} + +void PrintConnections() { + connections_lock.ReadLock(); + if (connections.empty()) { + SendOutput(CRUT("Currently there is no connection.\n")); + } + + String s; + for (const auto &connection : connections) { + s += fmt::format(CRUT("{}: {}({})\n"), connection->id, connection->name, + connection->address_string); + } + SendOutput(s); + connections_lock.ReadUnlock(); +} +} // namespace + +void ResponseThreadProc(Connection *connection) { + auto host = ConvertCharString(inet_ntoa(connection->address.sin_addr)); + auto port = htons(connection->address.sin_port); + connection->address_string = fmt::format(CRUT("{}:{}"), host, port); + + std::string rest; + std::string name_data; + if (!SafeReadUntil(connection->socket, '\n', name_data, rest)) { + SendOutput(OutputType::Error, CRUT("Failed to read name of {}.\n"), + connection->address_string); + CloseSocket(connection->socket); + return; + } + + connection->name = ConvertCharString(name_data); + SendOutput(OutputColor::Green, CRUT("Connected to {}, whose name is {}.\n"), + connection->address_string, connection->name); + + connection->receive_thread = std::thread( + [](Connection *connection) { + std::string rest; + while (true) { + if (connection->cancellation_source.isCancellationRequested()) { + break; + } + + std::string data; + + if (!SafeReadUntil(connection->socket, '\n', data, rest)) { + SendOutput(OutputType::Error, + CRUT("Failed read data from socket of {}({}).\n"), + connection->name, connection->address_string); + connection->cancellation_source.requestCancellation(); + return; + } + + SendOutput(CRUT("{}({}) send a message:\n{}\n"), connection->name, + connection->address_string, ConvertCharString(data)); + } + }, + connection); + connection->receive_thread.detach(); + + while (true) { + if (connection->cancellation_source.isCancellationRequested()) { + break; + } + + std::string s; + if (connection->send_queue.read(s)) { + if (!SafeSend(connection->socket, s)) { + SendOutput(OutputType::Error, CRUT("Failed send data to {}({}).\n"), + connection->name, connection->address_string); + connection->cancellation_source.requestCancellation(); + break; + } + } + } + + CloseSocket(connection->socket); + + RemoveConnection(connection->id); +} + +void OnInputLine(StringView line) { + StringStream ss{String(line)}; + + ss >> std::ws; + if (ss.eof()) + return; + + String command; + ss >> command; + + if (command == CRUT("list")) { + if (!ss.eof()) { + SendOutput(OutputType::Error, + CRUT("List command can't have arguments!\n")); + PrintHelp(); + } else { + PrintConnections(); + } + return; + } else if (command == CRUT("send")) { + int id; + ss >> id; + if (!ss) { + SendOutput(OutputType::Error, CRUT("Send format error!\n")); + PrintHelp(); + return; + } + + String message; + getline(ss, message); + + if (message.empty()) { + SendOutput(OutputType::Error, CRUT("Send message can't be empty.!\n")); + PrintHelp(); + return; + } + + auto i = std::find_if( + connections.begin(), connections.end(), + [id](const std::unique_ptr &c) { return c->id == id; }); + + if (i == connections.end()) { + SendOutput(OutputType::Error, CRUT("No connection with such id.\n")); + return; + } + + (*i)->send_queue.write(ConvertCharStringBack(message) + "\n"); + return; + } else { + SendOutput(OutputType::Error, CRUT("Unkown command!\n")); + PrintHelp(); + return; + } +} + +int Main() { + int server_socket; + + if ((server_socket = socket(AF_INET, SOCK_STREAM, 0)) == -1) { + PrintErrorMessageAndExit(CRUT("Failed to create socket.")); + } + + sockaddr_in server_address; + + server_address.sin_family = AF_INET; + server_address.sin_port = htons(port); + server_address.sin_addr.s_addr = inet_addr(bind_address); + memset(&(server_address.sin_zero), 0, sizeof(server_address.sin_zero)); + + if (bind(server_socket, reinterpret_cast(&server_address), + sizeof(sockaddr_in)) == -1) { + PrintErrorMessageAndExit(CRUT("Failed to bind.")); + } + + if (listen(server_socket, SOMAXCONN) == -1) { + PrintErrorMessageAndExit(CRUT("Failed to listen.")); + } + + SendOutput(OutputColor::Green, + CRUT("Now start to accept incoming connection.\n")); + + PrintHelp(); + + StartIOThread(); + + int current_id = 1; + + while (true) { + sockaddr_in client_address; + int client_socket; + unsigned sin_size = sizeof(sockaddr_in); + client_socket = + accept(server_socket, reinterpret_cast(&client_address), +#ifdef WIN32 + reinterpret_cast +#endif + (&sin_size)); + + if (client_socket == -1) { + PrintErrorMessageAndExit(CRUT("Failed to accecpt.")); + } + + connections_lock.WriteLock(); + connections.push_back(std::make_unique()); + const std::unique_ptr &connection = connections.back(); + + connection->id = current_id++; + connection->socket = client_socket; + connection->address = client_address; + connection->thread = + std::thread(ResponseThreadProc, connections.back().get()); + connection->thread.detach(); + connections_lock.WriteUnlock(); + } +} -- cgit v1.2.3