From 9c42586bc406015be0145576fd6cb3586686b4ca Mon Sep 17 00:00:00 2001 From: crupest Date: Mon, 7 Jun 2021 23:24:43 +0800 Subject: import(life): ... --- works/life/computer-network-experiment/IO.cpp | 80 +++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 works/life/computer-network-experiment/IO.cpp (limited to 'works/life/computer-network-experiment/IO.cpp') diff --git a/works/life/computer-network-experiment/IO.cpp b/works/life/computer-network-experiment/IO.cpp new file mode 100644 index 0000000..0c20537 --- /dev/null +++ b/works/life/computer-network-experiment/IO.cpp @@ -0,0 +1,80 @@ +#include "IO.h" + +#include + +#include +#include +#include +#include + +folly::MPMCQueue output_queue(100); + +folly::CancellationSource cancellation_source; + +std::thread io_thread; + +void PrintOutput(const Output &output) { + std::basic_ostream *stream; + + switch (output.type) { + case OutputType::Error: + stream = &error_stream; + break; + default: + stream = &output_stream; + break; + } + + switch (output.color) { + case OutputColor::Normal: + (*stream) << output.message; + break; + case OutputColor::Green: + (*stream) << CRUT("\x1b[32m") << output.message << CRUT("\x1b[39m") + << std::flush; + break; + case OutputColor::Red: + (*stream) << CRUT("\x1b[31m") << output.message << CRUT("\x1b[39m") + << std::flush; + break; + case OutputColor::Yellow: + (*stream) << CRUT("\x1b[33m") << output.message << CRUT("\x1b[39m") + << std::flush; + break; + } +} + +String ReadInputLine() { + String line; + std::getline(input_stream, line); + return line; +} + +void IOThread() { + while (true) { + if (cancellation_source.isCancellationRequested()) { + while (true) { + Output output; + if (output_queue.readIfNotEmpty(output)) { + PrintOutput(output); + } else { + return; + } + } + } + + Output output; + while (output_queue.readIfNotEmpty(output)) + PrintOutput(output); + + PrintOutput({CRUT("> ")}); + OnInputLine(ReadInputLine()); + } +} + +void SignalAndWaitForOutputThreadStop() { + cancellation_source.requestCancellation(); + io_thread.join(); +} + +void StartIOThread() { io_thread = std::thread(IOThread); } -- cgit v1.2.3 From 68cac85ea58c69301645167f92d94bdb6e360753 Mon Sep 17 00:00:00 2001 From: crupest Date: Tue, 8 Jun 2021 10:16:22 +0800 Subject: import(life): ... --- works/life/computer-network-experiment/IO.cpp | 3 +- works/life/computer-network-experiment/IO.h | 2 + works/life/computer-network-experiment/client.cpp | 59 ++++++++++++-- works/life/computer-network-experiment/server.cpp | 94 ++++++++++++++++++++--- 4 files changed, 138 insertions(+), 20 deletions(-) (limited to 'works/life/computer-network-experiment/IO.cpp') diff --git a/works/life/computer-network-experiment/IO.cpp b/works/life/computer-network-experiment/IO.cpp index 0c20537..5d3fe12 100644 --- a/works/life/computer-network-experiment/IO.cpp +++ b/works/life/computer-network-experiment/IO.cpp @@ -9,9 +9,10 @@ folly::MPMCQueue output_queue(100); +namespace { folly::CancellationSource cancellation_source; - std::thread io_thread; +} void PrintOutput(const Output &output) { std::basic_ostream *stream; diff --git a/works/life/computer-network-experiment/IO.h b/works/life/computer-network-experiment/IO.h index b0cf489..1658b78 100644 --- a/works/life/computer-network-experiment/IO.h +++ b/works/life/computer-network-experiment/IO.h @@ -64,3 +64,5 @@ void SignalAndWaitForOutputThreadStop(); void OnInputLine(StringView line); void StartIOThread(); + +String ReadInputLine(); \ No newline at end of file diff --git a/works/life/computer-network-experiment/client.cpp b/works/life/computer-network-experiment/client.cpp index a8ce8cf..c206856 100644 --- a/works/life/computer-network-experiment/client.cpp +++ b/works/life/computer-network-experiment/client.cpp @@ -5,6 +5,9 @@ #include "Common.h" #include "IO.h" +#include +#include + #ifdef WIN32 #include #include @@ -12,12 +15,29 @@ #include #include #include - #endif const auto connect_address = "127.0.0.1"; // control connect address const u_short port = 1234; // control connect port +namespace { +folly::ProducerConsumerQueue send_queue(100); +folly::CancellationSource cancellation_source; +} // namespace + +void PrintHelp() { + SendOutput(CRUT("Input anything to send to server. Or just enter to receive " + "lastest messages from server.\n")); +} + +void OnInputLine(StringView line) { + if (line.empty()) { + return; + } else { + send_queue.write(ConvertCharStringBack(line) + '\n'); + } +} + int Main() { int client_socket; @@ -37,17 +57,42 @@ int Main() { PrintErrorMessageAndExit(CRUT("Failed to connect!")); } - String name; - { - auto guard = BlockOutputThread(); - output_stream << CRUT("Please input your name:"); - name = ReadInputLine(); - } + output_stream << CRUT("Please input your name:\n> "); + String name = ReadInputLine(); + + PrintHelp(); + + StartIOThread(); name.push_back(CRUT('\n')); auto name_data = ConvertCharStringBack(name); SafeSend(client_socket, name_data); + std::thread receive_thread([client_socket] { + std::string rest; + while (true) { + if (cancellation_source.isCancellationRequested()) { + break; + } + + std::string s = SafeReadUntil(client_socket, '\n', rest); + + SendOutput(CRUT("Recived a message:\n{}\n"), ConvertCharString(s)); + } + }); + receive_thread.detach(); + + while (true) { + if (cancellation_source.isCancellationRequested()) { + break; + } + + std::string s; + if (send_queue.read(s)) { + SafeSend(client_socket, s); + } + } + CloseSocket(client_socket); return 0; } diff --git a/works/life/computer-network-experiment/server.cpp b/works/life/computer-network-experiment/server.cpp index 7008c7b..9654687 100644 --- a/works/life/computer-network-experiment/server.cpp +++ b/works/life/computer-network-experiment/server.cpp @@ -8,6 +8,8 @@ #include #include +#include +#include #include #include #include @@ -42,7 +44,20 @@ struct Connection { folly::CancellationSource cancellation_source; }; -std::vector connections; +std::vector> connections; + +void PrintConnections() { + if (connections.empty()) { + SendOutput(CRUT("Currently there is no connection.\n")); + } + + String s; + for (const auto &connection : connections) { + s += fmt::format(CRUT("{}: {}({})\n"), connection->id, connection->name, + connection->address_string); + } + SendOutput(s); +} void ResponseThreadProc(Connection *connection) { auto host = ConvertCharString(inet_ntoa(connection->address.sin_addr)); @@ -53,7 +68,7 @@ void ResponseThreadProc(Connection *connection) { std::string n = SafeReadUntil(connection->socket, '\n', rest); connection->name = ConvertCharString(n); - SendOutput(CRUT("Connected to {}, whose name is {}."), + SendOutput(OutputColor::Green, CRUT("Connected to {}, whose name is {}.\n"), connection->address_string, connection->name); std::thread revieve_thread( @@ -71,6 +86,7 @@ void ResponseThreadProc(Connection *connection) { } }, connection); + revieve_thread.detach(); while (true) { if (connection->cancellation_source.isCancellationRequested()) { @@ -86,9 +102,60 @@ void ResponseThreadProc(Connection *connection) { CloseSocket(connection->socket); } -void OnInputLine(StringView line) { StringStream ss{String(line)}; - ss. - } +void OnInputLine(StringView line) { + StringStream ss{String(line)}; + + ss >> std::ws; + if (ss.eof()) + return; + + String command; + ss >> command; + + if (command == CRUT("list")) { + if (!ss.eof()) { + SendOutput(OutputType::Error, + CRUT("List command can't have arguments!\n")); + PrintHelp(); + } else { + PrintConnections(); + } + return; + } else if (command == CRUT("send")) { + int id; + ss >> id; + if (!ss) { + SendOutput(OutputType::Error, CRUT("Send format error!\n")); + PrintHelp(); + return; + } + + String message; + getline(ss, message); + + if (message.empty()) { + SendOutput(OutputType::Error, CRUT("Send message can't be empty.!\n")); + PrintHelp(); + return; + } + + auto i = std::find_if( + connections.begin(), connections.end(), + [id](const std::unique_ptr &c) { return c->id == id; }); + + if (i == connections.end()) { + SendOutput(OutputType::Error, CRUT("No connection with such id.!\n")); + return; + } + + (*i)->send_queue.write(ConvertCharStringBack(message) + "\n"); + return; + } else { + SendOutput(OutputType::Error, CRUT("Unkown command!\n")); + PrintHelp(); + return; + } +} int Main() { int server_socket; @@ -116,6 +183,8 @@ int Main() { SendOutput(OutputColor::Green, CRUT("Now start to accept incoming connection.\n")); + PrintHelp(); + StartIOThread(); int current_id = 1; @@ -135,13 +204,14 @@ int Main() { PrintErrorMessageAndExit(CRUT("Failed to accecpt.")); } - Connection connection; - connection.id = current_id++; - connection.socket = client_socket; - connection.address = client_address; - connections.push_back(std::move(connection)); + connections.push_back(std::make_unique()); + const std::unique_ptr &connection = connections.back(); - connection.thread = std::thread(ResponseThreadProc, &connections.back()); - connection.thread.detach(); + connection->id = current_id++; + connection->socket = client_socket; + connection->address = client_address; + connection->thread = + std::thread(ResponseThreadProc, connections.back().get()); + connection->thread.detach(); } } -- cgit v1.2.3