aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYuqian Yang <crupest@crupest.life>2025-09-07 22:27:17 +0800
committerYuqian Yang <crupest@crupest.life>2025-09-07 22:27:17 +0800
commitdedc046844c7e4e7b53cdc6935fc896f64da2fe5 (patch)
treed62bd84546e7834a1abdc43d9b962570a0a52783
parent0b5085db82c06b4dfd9d46dafeee8e3a3e4b21ce (diff)
downloadcru-dedc046844c7e4e7b53cdc6935fc896f64da2fe5.tar.gz
cru-dedc046844c7e4e7b53cdc6935fc896f64da2fe5.tar.bz2
cru-dedc046844c7e4e7b53cdc6935fc896f64da2fe5.zip
More single-threaded event loop.
-rw-r--r--include/cru/base/platform/unix/EventLoop.h25
-rw-r--r--src/base/platform/unix/EventLoop.cpp68
-rw-r--r--test/base/platform/unix/EventLoopTest.cpp4
3 files changed, 55 insertions, 42 deletions
diff --git a/include/cru/base/platform/unix/EventLoop.h b/include/cru/base/platform/unix/EventLoop.h
index a3fac176..1fea6598 100644
--- a/include/cru/base/platform/unix/EventLoop.h
+++ b/include/cru/base/platform/unix/EventLoop.h
@@ -47,8 +47,16 @@ class UnixTimerFile : public Object2 {
class UnixEventLoop : public Object2 {
CRU_DEFINE_CLASS_LOG_TAG("cru::platform::unix::UnixEventLoop")
public:
+ using PollHandler =
+ std::function<void(decltype(std::declval<pollfd>().revents) revent)>;
+
UnixEventLoop();
+ /**
+ * The only thread-safe function.
+ */
+ void QueueAction(std::function<void()> action);
+
int Run();
void RequestQuit(int exit_code = 0);
@@ -71,6 +79,9 @@ class UnixEventLoop : public Object2 {
return this->SetTimer(std::move(action), std::move(interval), true);
}
+ void AddPoll(int fd, PollHandler action);
+ void RemovePoll(int fd);
+
private:
struct TimerData {
int id;
@@ -91,23 +102,19 @@ class UnixEventLoop : public Object2 {
private:
bool CheckPoll();
bool CheckTimer();
- bool ReadTimerPipe();
-
- void RemoveTimer(int id);
+ bool CheckActionPipe();
private:
std::optional<std::thread::id> running_thread_;
std::vector<pollfd> polls_;
- std::vector<
- std::function<void(decltype(std::declval<pollfd>().revents) revent)>>
- poll_actions_;
+ std::vector<PollHandler> poll_actions_;
- std::atomic_int timer_tag_;
+ int timer_tag_;
std::vector<TimerData> timers_;
- UnixFileDescriptor timer_pipe_read_end_;
- UnixFileDescriptor timer_pipe_write_end_;
+ UnixFileDescriptor action_pipe_read_end_;
+ UnixFileDescriptor action_pipe_write_end_;
std::optional<int> exit_code_;
};
diff --git a/src/base/platform/unix/EventLoop.cpp b/src/base/platform/unix/EventLoop.cpp
index c6a80835..53c54866 100644
--- a/src/base/platform/unix/EventLoop.cpp
+++ b/src/base/platform/unix/EventLoop.cpp
@@ -1,23 +1,34 @@
#include "cru/base/platform/unix/EventLoop.h"
+#include "cru/base/Base.h"
#include "cru/base/Exception.h"
#include <poll.h>
#include <algorithm>
#include <chrono>
+#include <functional>
#include <thread>
namespace cru::platform::unix {
int UnixTimerFile::GetReadFd() const { return this->read_fd_; }
UnixEventLoop::UnixEventLoop() : polls_(1), poll_actions_(1), timer_tag_(1) {
- auto timer_pipe = OpenUniDirectionalPipe(UnixPipeFlags::NonBlock);
- timer_pipe_read_end_ = std::move(timer_pipe.read);
- timer_pipe_write_end_ = std::move(timer_pipe.write);
- polls_[0].fd = timer_pipe_read_end_;
+ auto action_pipe = OpenUniDirectionalPipe(UnixPipeFlags::NonBlock);
+ action_pipe_read_end_ = std::move(action_pipe.read);
+ action_pipe_write_end_ = std::move(action_pipe.write);
+ polls_[0].fd = action_pipe_read_end_;
polls_[0].events = POLLIN;
poll_actions_[0] = [](auto _) {};
}
+void UnixEventLoop::QueueAction(std::function<void()> action) {
+ if (std::this_thread::get_id() == running_thread_) {
+ action();
+ } else {
+ auto pointer = new std::function<void()>(std::move(action));
+ action_pipe_write_end_.Write(&pointer, sizeof(decltype(pointer)));
+ }
+}
+
int UnixEventLoop::Run() {
running_thread_ = std::this_thread::get_id();
@@ -32,7 +43,7 @@ int UnixEventLoop::Run() {
continue;
}
- while (ReadTimerPipe()) {
+ while (CheckActionPipe()) {
continue;
}
@@ -63,12 +74,7 @@ int UnixEventLoop::Run() {
return exit_code_.value();
}
-void UnixEventLoop::RequestQuit(int exit_code) {
- exit_code_ = exit_code;
- if (std::this_thread::get_id() != running_thread_) {
- SetImmediate([] {});
- }
-}
+void UnixEventLoop::RequestQuit(int exit_code) { exit_code_ = exit_code; }
int UnixEventLoop::SetTimer(std::function<void()> action,
std::chrono::milliseconds timeout, bool repeat) {
@@ -84,23 +90,17 @@ int UnixEventLoop::SetTimer(std::function<void()> action,
auto tag = timer_tag_++;
- if (std::this_thread::get_id() == running_thread_) {
- timers_.push_back(
- TimerData(tag, std::move(timeout), repeat, std::move(action)));
- } else {
- auto timer =
- new TimerData(tag, std::move(timeout), repeat, std::move(action));
- timer_pipe_write_end_.Write(&timer, sizeof(decltype(timer)));
- }
+ timers_.push_back(
+ TimerData(tag, std::move(timeout), repeat, std::move(action)));
return tag;
}
void UnixEventLoop::CancelTimer(int id) {
- if (std::this_thread::get_id() == running_thread_) {
- RemoveTimer(id);
- } else {
- SetImmediate([this, id] { RemoveTimer(id); });
+ auto iter = std::ranges::find_if(
+ timers_, [id](const TimerData &timer) { return timer.id == id; });
+ if (iter != timers_.cend()) {
+ timers_.erase(iter);
}
}
@@ -145,12 +145,12 @@ bool UnixEventLoop::CheckTimer() {
return true;
}
-bool UnixEventLoop::ReadTimerPipe() {
- TimerData *pointer;
+bool UnixEventLoop::CheckActionPipe() {
+ std::function<void()> *pointer;
constexpr size_t pointer_size = sizeof(decltype(pointer));
auto rest = pointer_size;
while (rest > 0) {
- auto result = timer_pipe_read_end_.Read(&pointer, rest);
+ auto result = action_pipe_read_end_.Read(&pointer, rest);
if (result == -1) { // If no data.
if (rest == pointer_size) {
@@ -168,18 +168,22 @@ bool UnixEventLoop::ReadTimerPipe() {
rest -= result;
}
- timers_.push_back(std::move(*pointer));
+ (*pointer)();
delete pointer;
return true;
}
-void UnixEventLoop::RemoveTimer(int id) {
- auto iter = std::ranges::find_if(
- timers_, [id](const TimerData &timer) { return timer.id == id; });
- if (iter != timers_.cend()) {
- timers_.erase(iter);
+void UnixEventLoop::AddPoll(int fd, PollHandler action) {
+ for (const auto &poll_fd : polls_) {
+ if (poll_fd.fd == fd) {
+ throw Exception("The file descriptor is already in poll list.");
+ }
}
+
+ NotImplemented();
}
+void UnixEventLoop::RemovePoll(int fd) { NotImplemented(); }
+
} // namespace cru::platform::unix
diff --git a/test/base/platform/unix/EventLoopTest.cpp b/test/base/platform/unix/EventLoopTest.cpp
index 4660efb3..5d4dbcf2 100644
--- a/test/base/platform/unix/EventLoopTest.cpp
+++ b/test/base/platform/unix/EventLoopTest.cpp
@@ -25,7 +25,7 @@ TEST_CASE("UnixTimerFile Work", "[unix][time]") {
REQUIRE(delay < std::chrono::milliseconds(500));
}
-TEST_CASE("UnixEventLoop Timer", "[unix][time]") {
+TEST_CASE("UnixEventLoop Work", "[unix][time]") {
using namespace cru;
using namespace cru::platform::unix;
@@ -35,6 +35,8 @@ TEST_CASE("UnixEventLoop Timer", "[unix][time]") {
auto test_duration = std::chrono::milliseconds(test_miliseconds);
auto start = std::chrono::steady_clock::now();
+ auto test_pipe = OpenUniDirectionalPipe();
+
int counter = 0;
loop.SetTimeout(