diff options
author | crupest <crupest@outlook.com> | 2021-06-07 21:13:44 +0800 |
---|---|---|
committer | crupest <crupest@outlook.com> | 2021-06-07 21:13:44 +0800 |
commit | 08325faff52d0704c6f17d065ca8d72ea07ca6a0 (patch) | |
tree | 9a0dad369346cf11670c7d3db45a8d6065bfae31 /works/life | |
parent | 42f7fc1876cbe68569771b97a8935fbca7fa3ee4 (diff) | |
download | crupest-08325faff52d0704c6f17d065ca8d72ea07ca6a0.tar.gz crupest-08325faff52d0704c6f17d065ca8d72ea07ca6a0.tar.bz2 crupest-08325faff52d0704c6f17d065ca8d72ea07ca6a0.zip |
import(life): ...
Diffstat (limited to 'works/life')
-rw-r--r-- | works/life/computer-network-experiment/server.cpp | 68 |
1 files changed, 59 insertions, 9 deletions
diff --git a/works/life/computer-network-experiment/server.cpp b/works/life/computer-network-experiment/server.cpp index 2877c61..3c87ea0 100644 --- a/works/life/computer-network-experiment/server.cpp +++ b/works/life/computer-network-experiment/server.cpp @@ -4,8 +4,13 @@ #include "Common.h"
#include "Output.h"
+#include "fmt/core.h"
+
+#include <folly/CancellationToken.h>
+#include <folly/ProducerConsumerQueue.h>
#include <optional>
+#include <stdint.h>
#include <thread>
#ifdef WIN32
@@ -20,20 +25,61 @@ const auto bind_address = "127.0.0.1"; // control bind address
const u_short port = 1234; // control bind port
-void ResponseThreadProc(int socket, sockaddr_in address) {
- auto address_string = inet_ntoa(address.sin_addr);
+struct Connection {
+ std::thread thread;
+ int socket;
+ sockaddr_in address;
+ String address_string;
+ String name;
+ folly::ProducerConsumerQueue<std::string> send_queue{100};
+ folly::CancellationSource cancellation_source;
+};
+
+void ResponseThreadProc(Connection *connection) {
+ auto host = ConvertCharString(inet_ntoa(connection->address.sin_addr));
+ auto port = htons(connection->address.sin_port);
+ connection->address_string = fmt::format(CRUT("{}:{}"), host, port);
std::string rest;
- std::string name = SafeReadUntil(socket, '\n', rest);
-
+ std::string n = SafeReadUntil(connection->socket, '\n', rest);
+ connection->name = ConvertCharString(n);
SendOutput(CRUT("Connected to {}, whose name is {}."),
- ConvertCharString(address_string), ConvertCharString(name));
+ connection->address_string, connection->name);
+
+ std::thread revieve_thread(
+ [](Connection *connection) {
+ std::string rest;
+ while (true) {
+ if (connection->cancellation_source.isCancellationRequested()) {
+ break;
+ }
+
+ std::string s = SafeReadUntil(connection->socket, '\n', rest);
- CloseSocket(socket);
+ SendOutput(CRUT("{}({}) send a message:\n{}\n"), connection->name,
+ connection->address_string, ConvertCharString(s));
+ }
+ },
+ connection);
+
+ while (true) {
+ if (connection->cancellation_source.isCancellationRequested()) {
+ break;
+ }
+
+ std::string s;
+ if (connection->send_queue.read(s)) {
+ SafeSend(connection->socket, s);
+ }
+ }
+
+ CloseSocket(connection->socket);
}
int Main() {
+ std::vector<Connection> connections;
+
int server_socket;
if ((server_socket = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
@@ -74,8 +120,12 @@ int Main() { PrintErrorMessageAndExit(CRUT("Failed to accecpt."));
}
- std::thread response_thread(ResponseThreadProc, client_socket,
- client_address);
- response_thread.detach();
+ Connection connection;
+ connection.socket = client_socket;
+ connection.address = client_address;
+ connections.push_back(std::move(connection));
+
+ connection.thread = std::thread(ResponseThreadProc, &connections.back());
+ connection.thread.detach();
}
}
|