diff options
| -rw-r--r-- | works/life/computer-network-experiment/IO.cpp | 3 | ||||
| -rw-r--r-- | works/life/computer-network-experiment/IO.h | 2 | ||||
| -rw-r--r-- | works/life/computer-network-experiment/client.cpp | 59 | ||||
| -rw-r--r-- | works/life/computer-network-experiment/server.cpp | 94 | 
4 files changed, 138 insertions, 20 deletions
| diff --git a/works/life/computer-network-experiment/IO.cpp b/works/life/computer-network-experiment/IO.cpp index 0c20537..5d3fe12 100644 --- a/works/life/computer-network-experiment/IO.cpp +++ b/works/life/computer-network-experiment/IO.cpp @@ -9,9 +9,10 @@  folly::MPMCQueue<Output> output_queue(100);
 +namespace {
  folly::CancellationSource cancellation_source;
 -
  std::thread io_thread;
 +}
  void PrintOutput(const Output &output) {
    std::basic_ostream<Char> *stream;
 diff --git a/works/life/computer-network-experiment/IO.h b/works/life/computer-network-experiment/IO.h index b0cf489..1658b78 100644 --- a/works/life/computer-network-experiment/IO.h +++ b/works/life/computer-network-experiment/IO.h @@ -64,3 +64,5 @@ void SignalAndWaitForOutputThreadStop();  void OnInputLine(StringView line);
  void StartIOThread();
 +
 +String ReadInputLine();
\ No newline at end of file diff --git a/works/life/computer-network-experiment/client.cpp b/works/life/computer-network-experiment/client.cpp index a8ce8cf..c206856 100644 --- a/works/life/computer-network-experiment/client.cpp +++ b/works/life/computer-network-experiment/client.cpp @@ -5,6 +5,9 @@  #include "Common.h"
  #include "IO.h"
 +#include <folly/CancellationToken.h>
 +#include <folly/ProducerConsumerQueue.h>
 +
  #ifdef WIN32
  #include <Windows.h>
  #include <winsock.h>
 @@ -12,12 +15,29 @@  #include <arpa/inet.h>
  #include <netinet/in.h>
  #include <sys/socket.h>
 -
  #endif
  const auto connect_address = "127.0.0.1"; // control connect address
  const u_short port = 1234;                // control connect port
 +namespace {
 +folly::ProducerConsumerQueue<std::string> send_queue(100);
 +folly::CancellationSource cancellation_source;
 +} // namespace
 +
 +void PrintHelp() {
 +  SendOutput(CRUT("Input anything to send to server. Or just enter to receive "
 +                  "lastest messages from server.\n"));
 +}
 +
 +void OnInputLine(StringView line) {
 +  if (line.empty()) {
 +    return;
 +  } else {
 +    send_queue.write(ConvertCharStringBack(line) + '\n');
 +  }
 +}
 +
  int Main() {
    int client_socket;
 @@ -37,17 +57,42 @@ int Main() {      PrintErrorMessageAndExit(CRUT("Failed to connect!"));
    }
 -  String name;
 -  {
 -    auto guard = BlockOutputThread();
 -    output_stream << CRUT("Please input your name:");
 -    name = ReadInputLine();
 -  }
 +  output_stream << CRUT("Please input your name:\n> ");
 +  String name = ReadInputLine();
 +
 +  PrintHelp();
 +
 +  StartIOThread();
    name.push_back(CRUT('\n'));
    auto name_data = ConvertCharStringBack(name);
    SafeSend(client_socket, name_data);
 +  std::thread receive_thread([client_socket] {
 +    std::string rest;
 +    while (true) {
 +      if (cancellation_source.isCancellationRequested()) {
 +        break;
 +      }
 +
 +      std::string s = SafeReadUntil(client_socket, '\n', rest);
 +
 +      SendOutput(CRUT("Recived a message:\n{}\n"), ConvertCharString(s));
 +    }
 +  });
 +  receive_thread.detach();
 +
 +  while (true) {
 +    if (cancellation_source.isCancellationRequested()) {
 +      break;
 +    }
 +
 +    std::string s;
 +    if (send_queue.read(s)) {
 +      SafeSend(client_socket, s);
 +    }
 +  }
 +
    CloseSocket(client_socket);
    return 0;
  }
 diff --git a/works/life/computer-network-experiment/server.cpp b/works/life/computer-network-experiment/server.cpp index 7008c7b..9654687 100644 --- a/works/life/computer-network-experiment/server.cpp +++ b/works/life/computer-network-experiment/server.cpp @@ -8,6 +8,8 @@  #include <folly/CancellationToken.h>
  #include <folly/ProducerConsumerQueue.h>
 +#include <algorithm>
 +#include <memory>
  #include <optional>
  #include <stdint.h>
  #include <thread>
 @@ -42,7 +44,20 @@ struct Connection {    folly::CancellationSource cancellation_source;
  };
 -std::vector<Connection> connections;
 +std::vector<std::unique_ptr<Connection>> connections;
 +
 +void PrintConnections() {
 +  if (connections.empty()) {
 +    SendOutput(CRUT("Currently there is no connection.\n"));
 +  }
 +
 +  String s;
 +  for (const auto &connection : connections) {
 +    s += fmt::format(CRUT("{}: {}({})\n"), connection->id, connection->name,
 +                     connection->address_string);
 +  }
 +  SendOutput(s);
 +}
  void ResponseThreadProc(Connection *connection) {
    auto host = ConvertCharString(inet_ntoa(connection->address.sin_addr));
 @@ -53,7 +68,7 @@ void ResponseThreadProc(Connection *connection) {    std::string n = SafeReadUntil(connection->socket, '\n', rest);
    connection->name = ConvertCharString(n);
 -  SendOutput(CRUT("Connected to {}, whose name is {}."),
 +  SendOutput(OutputColor::Green, CRUT("Connected to {}, whose name is {}.\n"),
               connection->address_string, connection->name);
    std::thread revieve_thread(
 @@ -71,6 +86,7 @@ void ResponseThreadProc(Connection *connection) {          }
        },
        connection);
 +  revieve_thread.detach();
    while (true) {
      if (connection->cancellation_source.isCancellationRequested()) {
 @@ -86,9 +102,60 @@ void ResponseThreadProc(Connection *connection) {    CloseSocket(connection->socket);
  }
 -void OnInputLine(StringView line) { StringStream ss{String(line)};
 -  ss.
 - }
 +void OnInputLine(StringView line) {
 +  StringStream ss{String(line)};
 +
 +  ss >> std::ws;
 +  if (ss.eof())
 +    return;
 +
 +  String command;
 +  ss >> command;
 +
 +  if (command == CRUT("list")) {
 +    if (!ss.eof()) {
 +      SendOutput(OutputType::Error,
 +                 CRUT("List command can't have arguments!\n"));
 +      PrintHelp();
 +    } else {
 +      PrintConnections();
 +    }
 +    return;
 +  } else if (command == CRUT("send")) {
 +    int id;
 +    ss >> id;
 +    if (!ss) {
 +      SendOutput(OutputType::Error, CRUT("Send format error!\n"));
 +      PrintHelp();
 +      return;
 +    }
 +
 +    String message;
 +    getline(ss, message);
 +
 +    if (message.empty()) {
 +      SendOutput(OutputType::Error, CRUT("Send message can't be empty.!\n"));
 +      PrintHelp();
 +      return;
 +    }
 +
 +    auto i = std::find_if(
 +        connections.begin(), connections.end(),
 +        [id](const std::unique_ptr<Connection> &c) { return c->id == id; });
 +
 +    if (i == connections.end()) {
 +      SendOutput(OutputType::Error, CRUT("No connection with such id.!\n"));
 +      return;
 +    }
 +
 +    (*i)->send_queue.write(ConvertCharStringBack(message) + "\n");
 +    return;
 +  } else {
 +    SendOutput(OutputType::Error, CRUT("Unkown command!\n"));
 +    PrintHelp();
 +    return;
 +  }
 +}
  int Main() {
    int server_socket;
 @@ -116,6 +183,8 @@ int Main() {    SendOutput(OutputColor::Green,
               CRUT("Now start to accept incoming connection.\n"));
 +  PrintHelp();
 +
    StartIOThread();
    int current_id = 1;
 @@ -135,13 +204,14 @@ int Main() {        PrintErrorMessageAndExit(CRUT("Failed to accecpt."));
      }
 -    Connection connection;
 -    connection.id = current_id++;
 -    connection.socket = client_socket;
 -    connection.address = client_address;
 -    connections.push_back(std::move(connection));
 +    connections.push_back(std::make_unique<Connection>());
 +    const std::unique_ptr<Connection> &connection = connections.back();
 -    connection.thread = std::thread(ResponseThreadProc, &connections.back());
 -    connection.thread.detach();
 +    connection->id = current_id++;
 +    connection->socket = client_socket;
 +    connection->address = client_address;
 +    connection->thread =
 +        std::thread(ResponseThreadProc, connections.back().get());
 +    connection->thread.detach();
    }
  }
 | 
