aboutsummaryrefslogtreecommitdiff
path: root/works/life
diff options
context:
space:
mode:
authorcrupest <crupest@outlook.com>2021-06-08 10:16:22 +0800
committercrupest <crupest@outlook.com>2021-06-08 10:16:22 +0800
commit68cac85ea58c69301645167f92d94bdb6e360753 (patch)
treea1ebfc214cd9fc378ba21398faa82043c91ff82c /works/life
parent9c42586bc406015be0145576fd6cb3586686b4ca (diff)
downloadcrupest-68cac85ea58c69301645167f92d94bdb6e360753.tar.gz
crupest-68cac85ea58c69301645167f92d94bdb6e360753.tar.bz2
crupest-68cac85ea58c69301645167f92d94bdb6e360753.zip
import(life): ...
Diffstat (limited to 'works/life')
-rw-r--r--works/life/computer-network-experiment/IO.cpp3
-rw-r--r--works/life/computer-network-experiment/IO.h2
-rw-r--r--works/life/computer-network-experiment/client.cpp59
-rw-r--r--works/life/computer-network-experiment/server.cpp94
4 files changed, 138 insertions, 20 deletions
diff --git a/works/life/computer-network-experiment/IO.cpp b/works/life/computer-network-experiment/IO.cpp
index 0c20537..5d3fe12 100644
--- a/works/life/computer-network-experiment/IO.cpp
+++ b/works/life/computer-network-experiment/IO.cpp
@@ -9,9 +9,10 @@
folly::MPMCQueue<Output> output_queue(100);
+namespace {
folly::CancellationSource cancellation_source;
-
std::thread io_thread;
+}
void PrintOutput(const Output &output) {
std::basic_ostream<Char> *stream;
diff --git a/works/life/computer-network-experiment/IO.h b/works/life/computer-network-experiment/IO.h
index b0cf489..1658b78 100644
--- a/works/life/computer-network-experiment/IO.h
+++ b/works/life/computer-network-experiment/IO.h
@@ -64,3 +64,5 @@ void SignalAndWaitForOutputThreadStop();
void OnInputLine(StringView line);
void StartIOThread();
+
+String ReadInputLine(); \ No newline at end of file
diff --git a/works/life/computer-network-experiment/client.cpp b/works/life/computer-network-experiment/client.cpp
index a8ce8cf..c206856 100644
--- a/works/life/computer-network-experiment/client.cpp
+++ b/works/life/computer-network-experiment/client.cpp
@@ -5,6 +5,9 @@
#include "Common.h"
#include "IO.h"
+#include <folly/CancellationToken.h>
+#include <folly/ProducerConsumerQueue.h>
+
#ifdef WIN32
#include <Windows.h>
#include <winsock.h>
@@ -12,12 +15,29 @@
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
-
#endif
const auto connect_address = "127.0.0.1"; // control connect address
const u_short port = 1234; // control connect port
+namespace {
+folly::ProducerConsumerQueue<std::string> send_queue(100);
+folly::CancellationSource cancellation_source;
+} // namespace
+
+void PrintHelp() {
+ SendOutput(CRUT("Input anything to send to server. Or just enter to receive "
+ "lastest messages from server.\n"));
+}
+
+void OnInputLine(StringView line) {
+ if (line.empty()) {
+ return;
+ } else {
+ send_queue.write(ConvertCharStringBack(line) + '\n');
+ }
+}
+
int Main() {
int client_socket;
@@ -37,17 +57,42 @@ int Main() {
PrintErrorMessageAndExit(CRUT("Failed to connect!"));
}
- String name;
- {
- auto guard = BlockOutputThread();
- output_stream << CRUT("Please input your name:");
- name = ReadInputLine();
- }
+ output_stream << CRUT("Please input your name:\n> ");
+ String name = ReadInputLine();
+
+ PrintHelp();
+
+ StartIOThread();
name.push_back(CRUT('\n'));
auto name_data = ConvertCharStringBack(name);
SafeSend(client_socket, name_data);
+ std::thread receive_thread([client_socket] {
+ std::string rest;
+ while (true) {
+ if (cancellation_source.isCancellationRequested()) {
+ break;
+ }
+
+ std::string s = SafeReadUntil(client_socket, '\n', rest);
+
+ SendOutput(CRUT("Recived a message:\n{}\n"), ConvertCharString(s));
+ }
+ });
+ receive_thread.detach();
+
+ while (true) {
+ if (cancellation_source.isCancellationRequested()) {
+ break;
+ }
+
+ std::string s;
+ if (send_queue.read(s)) {
+ SafeSend(client_socket, s);
+ }
+ }
+
CloseSocket(client_socket);
return 0;
}
diff --git a/works/life/computer-network-experiment/server.cpp b/works/life/computer-network-experiment/server.cpp
index 7008c7b..9654687 100644
--- a/works/life/computer-network-experiment/server.cpp
+++ b/works/life/computer-network-experiment/server.cpp
@@ -8,6 +8,8 @@
#include <folly/CancellationToken.h>
#include <folly/ProducerConsumerQueue.h>
+#include <algorithm>
+#include <memory>
#include <optional>
#include <stdint.h>
#include <thread>
@@ -42,7 +44,20 @@ struct Connection {
folly::CancellationSource cancellation_source;
};
-std::vector<Connection> connections;
+std::vector<std::unique_ptr<Connection>> connections;
+
+void PrintConnections() {
+ if (connections.empty()) {
+ SendOutput(CRUT("Currently there is no connection.\n"));
+ }
+
+ String s;
+ for (const auto &connection : connections) {
+ s += fmt::format(CRUT("{}: {}({})\n"), connection->id, connection->name,
+ connection->address_string);
+ }
+ SendOutput(s);
+}
void ResponseThreadProc(Connection *connection) {
auto host = ConvertCharString(inet_ntoa(connection->address.sin_addr));
@@ -53,7 +68,7 @@ void ResponseThreadProc(Connection *connection) {
std::string n = SafeReadUntil(connection->socket, '\n', rest);
connection->name = ConvertCharString(n);
- SendOutput(CRUT("Connected to {}, whose name is {}."),
+ SendOutput(OutputColor::Green, CRUT("Connected to {}, whose name is {}.\n"),
connection->address_string, connection->name);
std::thread revieve_thread(
@@ -71,6 +86,7 @@ void ResponseThreadProc(Connection *connection) {
}
},
connection);
+ revieve_thread.detach();
while (true) {
if (connection->cancellation_source.isCancellationRequested()) {
@@ -86,9 +102,60 @@ void ResponseThreadProc(Connection *connection) {
CloseSocket(connection->socket);
}
-void OnInputLine(StringView line) { StringStream ss{String(line)};
- ss.
- }
+void OnInputLine(StringView line) {
+ StringStream ss{String(line)};
+
+ ss >> std::ws;
+ if (ss.eof())
+ return;
+
+ String command;
+ ss >> command;
+
+ if (command == CRUT("list")) {
+ if (!ss.eof()) {
+ SendOutput(OutputType::Error,
+ CRUT("List command can't have arguments!\n"));
+ PrintHelp();
+ } else {
+ PrintConnections();
+ }
+ return;
+ } else if (command == CRUT("send")) {
+ int id;
+ ss >> id;
+ if (!ss) {
+ SendOutput(OutputType::Error, CRUT("Send format error!\n"));
+ PrintHelp();
+ return;
+ }
+
+ String message;
+ getline(ss, message);
+
+ if (message.empty()) {
+ SendOutput(OutputType::Error, CRUT("Send message can't be empty.!\n"));
+ PrintHelp();
+ return;
+ }
+
+ auto i = std::find_if(
+ connections.begin(), connections.end(),
+ [id](const std::unique_ptr<Connection> &c) { return c->id == id; });
+
+ if (i == connections.end()) {
+ SendOutput(OutputType::Error, CRUT("No connection with such id.!\n"));
+ return;
+ }
+
+ (*i)->send_queue.write(ConvertCharStringBack(message) + "\n");
+ return;
+ } else {
+ SendOutput(OutputType::Error, CRUT("Unkown command!\n"));
+ PrintHelp();
+ return;
+ }
+}
int Main() {
int server_socket;
@@ -116,6 +183,8 @@ int Main() {
SendOutput(OutputColor::Green,
CRUT("Now start to accept incoming connection.\n"));
+ PrintHelp();
+
StartIOThread();
int current_id = 1;
@@ -135,13 +204,14 @@ int Main() {
PrintErrorMessageAndExit(CRUT("Failed to accecpt."));
}
- Connection connection;
- connection.id = current_id++;
- connection.socket = client_socket;
- connection.address = client_address;
- connections.push_back(std::move(connection));
+ connections.push_back(std::make_unique<Connection>());
+ const std::unique_ptr<Connection> &connection = connections.back();
- connection.thread = std::thread(ResponseThreadProc, &connections.back());
- connection.thread.detach();
+ connection->id = current_id++;
+ connection->socket = client_socket;
+ connection->address = client_address;
+ connection->thread =
+ std::thread(ResponseThreadProc, connections.back().get());
+ connection->thread.detach();
}
}