diff options
| author | crupest <crupest@outlook.com> | 2021-06-07 21:13:44 +0800 | 
|---|---|---|
| committer | crupest <crupest@outlook.com> | 2021-06-07 21:13:44 +0800 | 
| commit | 08325faff52d0704c6f17d065ca8d72ea07ca6a0 (patch) | |
| tree | 9a0dad369346cf11670c7d3db45a8d6065bfae31 /works/life/computer-network-experiment | |
| parent | 42f7fc1876cbe68569771b97a8935fbca7fa3ee4 (diff) | |
| download | crupest-08325faff52d0704c6f17d065ca8d72ea07ca6a0.tar.gz crupest-08325faff52d0704c6f17d065ca8d72ea07ca6a0.tar.bz2 crupest-08325faff52d0704c6f17d065ca8d72ea07ca6a0.zip  | |
import(life): ...
Diffstat (limited to 'works/life/computer-network-experiment')
| -rw-r--r-- | works/life/computer-network-experiment/server.cpp | 68 | 
1 files changed, 59 insertions, 9 deletions
diff --git a/works/life/computer-network-experiment/server.cpp b/works/life/computer-network-experiment/server.cpp index 2877c61..3c87ea0 100644 --- a/works/life/computer-network-experiment/server.cpp +++ b/works/life/computer-network-experiment/server.cpp @@ -4,8 +4,13 @@  #include "Common.h"
  #include "Output.h"
 +#include "fmt/core.h"
 +
 +#include <folly/CancellationToken.h>
 +#include <folly/ProducerConsumerQueue.h>
  #include <optional>
 +#include <stdint.h>
  #include <thread>
  #ifdef WIN32
 @@ -20,20 +25,61 @@  const auto bind_address = "127.0.0.1"; // control bind address
  const u_short port = 1234;             // control bind port
 -void ResponseThreadProc(int socket, sockaddr_in address) {
 -  auto address_string = inet_ntoa(address.sin_addr);
 +struct Connection {
 +  std::thread thread;
 +  int socket;
 +  sockaddr_in address;
 +  String address_string;
 +  String name;
 +  folly::ProducerConsumerQueue<std::string> send_queue{100};
 +  folly::CancellationSource cancellation_source;
 +};
 +
 +void ResponseThreadProc(Connection *connection) {
 +  auto host = ConvertCharString(inet_ntoa(connection->address.sin_addr));
 +  auto port = htons(connection->address.sin_port);
 +  connection->address_string = fmt::format(CRUT("{}:{}"), host, port);
    std::string rest;
 -  std::string name = SafeReadUntil(socket, '\n', rest);
 -
 +  std::string n = SafeReadUntil(connection->socket, '\n', rest);
 +  connection->name = ConvertCharString(n);
    SendOutput(CRUT("Connected to {}, whose name is {}."),
 -             ConvertCharString(address_string), ConvertCharString(name));
 +             connection->address_string, connection->name);
 +
 +  std::thread revieve_thread(
 +      [](Connection *connection) {
 +        std::string rest;
 +        while (true) {
 +          if (connection->cancellation_source.isCancellationRequested()) {
 +            break;
 +          }
 +
 +          std::string s = SafeReadUntil(connection->socket, '\n', rest);
 -  CloseSocket(socket);
 +          SendOutput(CRUT("{}({}) send a message:\n{}\n"), connection->name,
 +                     connection->address_string, ConvertCharString(s));
 +        }
 +      },
 +      connection);
 +
 +  while (true) {
 +    if (connection->cancellation_source.isCancellationRequested()) {
 +      break;
 +    }
 +
 +    std::string s;
 +    if (connection->send_queue.read(s)) {
 +      SafeSend(connection->socket, s);
 +    }
 +  }
 +
 +  CloseSocket(connection->socket);
  }
  int Main() {
 +  std::vector<Connection> connections;
 +
    int server_socket;
    if ((server_socket = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
 @@ -74,8 +120,12 @@ int Main() {        PrintErrorMessageAndExit(CRUT("Failed to accecpt."));
      }
 -    std::thread response_thread(ResponseThreadProc, client_socket,
 -                                client_address);
 -    response_thread.detach();
 +    Connection connection;
 +    connection.socket = client_socket;
 +    connection.address = client_address;
 +    connections.push_back(std::move(connection));
 +
 +    connection.thread = std::thread(ResponseThreadProc, &connections.back());
 +    connection.thread.detach();
    }
  }
  | 
