aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/cru/base/concurrent/ConcurrentQueue.h88
-rw-r--r--include/cru/base/log/Logger.h15
-rw-r--r--include/cru/base/platform/unix/EventLoop.h5
-rw-r--r--src/base/log/Logger.cpp60
-rw-r--r--src/base/platform/unix/EventLoop.cpp33
-rw-r--r--test/base/platform/unix/EventLoopTest.cpp56
6 files changed, 150 insertions, 107 deletions
diff --git a/include/cru/base/concurrent/ConcurrentQueue.h b/include/cru/base/concurrent/ConcurrentQueue.h
deleted file mode 100644
index e311d5f9..00000000
--- a/include/cru/base/concurrent/ConcurrentQueue.h
+++ /dev/null
@@ -1,88 +0,0 @@
-#pragma once
-#include <condition_variable>
-#include <mutex>
-#include <optional>
-#include <utility>
-
-namespace cru::concurrent {
-namespace details {
-template <typename T>
-struct ConcurrentQueueNode {
- ConcurrentQueueNode(T&& value, ConcurrentQueueNode* next = nullptr)
- : value(std::move(value)), next(next) {}
-
- T value;
- ConcurrentQueueNode* next;
-};
-} // namespace details
-
-template <typename T>
-class ConcurrentQueue {
- public:
- ConcurrentQueue() {}
-
- ConcurrentQueue(const ConcurrentQueue&) = delete;
- ConcurrentQueue& operator=(const ConcurrentQueue&) = delete;
-
- ~ConcurrentQueue() {
- if (head_) {
- auto node = head_;
- while (node) {
- auto next = node->next;
- delete node;
- node = next;
- }
- }
- }
-
- public:
- void Push(T&& value) {
- std::unique_lock<std::mutex> lock(mutex_);
- if (head_ == nullptr) {
- head_ = tail_ = new details::ConcurrentQueueNode<T>(std::move(value));
- condition_variable_.notify_one();
- } else {
- tail_->next = new details::ConcurrentQueueNode<T>(std::move(value));
- tail_ = tail_->next;
- }
- }
-
- T Pull() {
- std::unique_lock<std::mutex> lock(mutex_);
- if (head_ == nullptr) {
- condition_variable_.wait(lock);
- }
- assert(head_ != nullptr);
- auto value = std::move(head_->value);
- auto next = head_->next;
- delete head_;
- head_ = next;
- if (next == nullptr) {
- tail_ = nullptr;
- }
- return value;
- }
-
- std::optional<T> Poll() {
- std::unique_lock<std::mutex> lock(mutex_);
- if (head_ == nullptr) {
- return std::nullopt;
- }
- auto value = std::move(head_->value);
- auto next = head_->next;
- delete head_;
- head_ = next;
- if (next == nullptr) {
- tail_ = nullptr;
- }
- return value;
- }
-
- private:
- details::ConcurrentQueueNode<T>* head_ = nullptr;
- details::ConcurrentQueueNode<T>* tail_ = nullptr;
-
- std::mutex mutex_;
- std::condition_variable condition_variable_;
-};
-} // namespace cru::concurrent
diff --git a/include/cru/base/log/Logger.h b/include/cru/base/log/Logger.h
index 3ed119e2..1ef53654 100644
--- a/include/cru/base/log/Logger.h
+++ b/include/cru/base/log/Logger.h
@@ -1,9 +1,9 @@
#pragma once
#include "../Base.h"
-#include "../concurrent/ConcurrentQueue.h"
-
+#include <condition_variable>
#include <format> // IWYU pragma: keep
+#include <list>
#include <memory>
#include <mutex>
#include <thread>
@@ -55,12 +55,17 @@ class CRU_BASE_API Logger : public Object2 {
}
private:
- concurrent::ConcurrentQueue<LogInfo> log_queue_;
+ void LogThreadRun();
+
+ private:
+ std::mutex log_queue_mutex_;
+ std::condition_variable log_queue_condition_variable_;
+ std::list<LogInfo> log_queue_;
+ bool log_stop_;
+ std::thread log_thread_;
std::mutex target_list_mutex_;
std::vector<std::unique_ptr<ILogTarget>> target_list_;
-
- std::thread log_thread_;
};
class CRU_BASE_API LoggerCppStream : public Object2 {
diff --git a/include/cru/base/platform/unix/EventLoop.h b/include/cru/base/platform/unix/EventLoop.h
index 784cb780..9def8c7b 100644
--- a/include/cru/base/platform/unix/EventLoop.h
+++ b/include/cru/base/platform/unix/EventLoop.h
@@ -45,14 +45,17 @@ class UnixTimerFile : public Object2 {
};
class UnixEventLoop : public Object2 {
+ CRU_DEFINE_CLASS_LOG_TAG("cru::platform::unix::UnixEventLoop")
public:
UnixEventLoop();
+ ~UnixEventLoop() override;
int Run();
void RequestQuit(int exit_code = 0);
int SetTimer(std::function<void()> action, std::chrono::milliseconds timeout,
bool repeat);
+ void CancelTimer(int id);
int SetImmediate(std::function<void()> action) {
return this->SetTimer(std::move(action), std::chrono::milliseconds::zero(),
@@ -91,6 +94,8 @@ class UnixEventLoop : public Object2 {
bool CheckTimer();
bool ReadTimerPipe();
+ void RemoveTimer(int id);
+
private:
std::thread::id running_thread_;
diff --git a/src/base/log/Logger.cpp b/src/base/log/Logger.cpp
index c195cd87..4332ba75 100644
--- a/src/base/log/Logger.cpp
+++ b/src/base/log/Logger.cpp
@@ -2,8 +2,11 @@
#include "cru/base/log/StdioLogTarget.h"
#include <algorithm>
+#include <condition_variable>
#include <ctime>
#include <format>
+#include <memory>
+#include <mutex>
#ifdef CRU_PLATFORM_WINDOWS
#include "cru/base/platform/win/DebugLogTarget.h"
@@ -65,18 +68,16 @@ std::string MakeLogFinalMessage(const LogInfo &log_info) {
}
} // namespace
-Logger::Logger()
- : log_thread_([this] {
- while (true) {
- auto log_info = log_queue_.Pull();
- std::lock_guard<std::mutex> lock_guard{target_list_mutex_};
- for (const auto &target : target_list_) {
- target->Write(log_info.level, MakeLogFinalMessage(log_info));
- }
- }
- }) {}
+Logger::Logger() : log_stop_(false), log_thread_(&Logger::LogThreadRun, this) {}
-Logger::~Logger() { log_thread_.detach(); }
+Logger::~Logger() {
+ {
+ std::unique_lock lock(log_queue_mutex_);
+ log_stop_ = true;
+ log_queue_condition_variable_.notify_one();
+ }
+ log_thread_.join();
+}
void Logger::Log(LogInfo log_info) {
#ifndef CRU_DEBUG
@@ -84,7 +85,41 @@ void Logger::Log(LogInfo log_info) {
return;
}
#endif
- log_queue_.Push(std::move(log_info));
+
+ std::unique_lock lock(log_queue_mutex_);
+ log_queue_.push_back(std::move(log_info));
+ log_queue_condition_variable_.notify_one();
+}
+void Logger::LogThreadRun() {
+ while (true) {
+ std::list<LogInfo> queue;
+ bool stop = false;
+ std::vector<ILogTarget *> target_list;
+
+ {
+ std::unique_lock lock(log_queue_mutex_);
+ log_queue_condition_variable_.wait(
+ lock, [this] { return !log_queue_.empty() || log_stop_; });
+ std::swap(queue, log_queue_);
+ stop = log_stop_;
+ }
+
+ {
+ std::lock_guard<std::mutex> lock_guard(target_list_mutex_);
+ for (const auto &target : target_list_) {
+ target_list.push_back(target.get());
+ }
+ }
+
+ for (const auto &target : target_list) {
+ for (auto &log_info : queue) {
+ target->Write(log_info.level, MakeLogFinalMessage(log_info));
+ }
+ }
+
+ // TODO: Should still wait for queue to be cleared.
+ if (stop) return;
+ }
}
LoggerCppStream::LoggerCppStream(Logger *logger, LogLevel level,
@@ -102,4 +137,5 @@ LoggerCppStream LoggerCppStream::WithTag(std::string tag) const {
void LoggerCppStream::Consume(std::string_view str) {
this->logger_->Log(this->level_, this->tag_, std::string(str));
}
+
} // namespace cru::log
diff --git a/src/base/platform/unix/EventLoop.cpp b/src/base/platform/unix/EventLoop.cpp
index 3c645a85..8fe3b7ad 100644
--- a/src/base/platform/unix/EventLoop.cpp
+++ b/src/base/platform/unix/EventLoop.cpp
@@ -1,5 +1,6 @@
#include "cru/base/platform/unix/EventLoop.h"
#include "cru/base/Exception.h"
+#include "cru/base/log/Logger.h"
#include <poll.h>
#include <algorithm>
@@ -18,7 +19,11 @@ UnixEventLoop::UnixEventLoop() : timer_tag_(1), polls_(1), poll_actions_(1) {
poll_actions_[0] = [](auto _) {};
}
+UnixEventLoop::~UnixEventLoop() { CRU_LOG_TAG_DEBUG("Event loop destroyed."); }
+
int UnixEventLoop::Run() {
+ CRU_LOG_TAG_DEBUG("Event loop started.");
+
running_thread_ = std::this_thread::get_id();
while (!exit_code_) {
@@ -63,7 +68,12 @@ int UnixEventLoop::Run() {
return exit_code_.value();
}
-void UnixEventLoop::RequestQuit(int exit_code) {}
+void UnixEventLoop::RequestQuit(int exit_code) {
+ exit_code_ = exit_code;
+ if (std::this_thread::get_id() != running_thread_) {
+ SetImmediate([] {});
+ }
+}
int UnixEventLoop::SetTimer(std::function<void()> action,
std::chrono::milliseconds timeout, bool repeat) {
@@ -91,6 +101,14 @@ int UnixEventLoop::SetTimer(std::function<void()> action,
return tag;
}
+void UnixEventLoop::CancelTimer(int id) {
+ if (std::this_thread::get_id() == running_thread_) {
+ RemoveTimer(id);
+ } else {
+ SetImmediate([this, id] { RemoveTimer(id); });
+ }
+}
+
bool UnixEventLoop::CheckPoll() {
auto iter = std::ranges::find_if(
polls_, [](const pollfd &poll_fd) { return poll_fd.revents != 0; });
@@ -117,6 +135,8 @@ bool UnixEventLoop::CheckTimer() {
return false;
}
+ CRU_LOG_TAG_INFO("A timer is to be executed.");
+
auto &timer = *iter;
if (timer.repeat) {
while (timer.timeout <= std::chrono::milliseconds::zero()) {
@@ -158,6 +178,17 @@ bool UnixEventLoop::ReadTimerPipe() {
timers_.push_back(std::move(*pointer));
delete pointer;
+ CRU_LOG_TAG_INFO("A timer from pipe is received.");
+
return true;
}
+
+void UnixEventLoop::RemoveTimer(int id) {
+ auto iter = std::ranges::find_if(
+ timers_, [id](const TimerData &timer) { return timer.id == id; });
+ if (iter != timers_.cend()) {
+ timers_.erase(iter);
+ }
+}
+
} // namespace cru::platform::unix
diff --git a/test/base/platform/unix/EventLoopTest.cpp b/test/base/platform/unix/EventLoopTest.cpp
index f5936b2a..4660efb3 100644
--- a/test/base/platform/unix/EventLoopTest.cpp
+++ b/test/base/platform/unix/EventLoopTest.cpp
@@ -20,5 +20,59 @@ TEST_CASE("UnixTimerFile Work", "[unix][time]") {
fds[0].fd = timer.GetReadFd();
fds[0].events = POLLIN;
REQUIRE(::poll(fds, 1, test_miliseconds * 2) == 1);
- REQUIRE((std::chrono::steady_clock::now() - start) > test_duration);
+ auto delay = std::chrono::steady_clock::now() - start;
+ REQUIRE(delay > test_duration);
+ REQUIRE(delay < std::chrono::milliseconds(500));
+}
+
+TEST_CASE("UnixEventLoop Timer", "[unix][time]") {
+ using namespace cru;
+ using namespace cru::platform::unix;
+
+ UnixEventLoop loop;
+
+ auto test_miliseconds = 300;
+ auto test_duration = std::chrono::milliseconds(test_miliseconds);
+ auto start = std::chrono::steady_clock::now();
+
+ int counter = 0;
+
+ loop.SetTimeout(
+ [test_duration, start] {
+ auto delay = std::chrono::steady_clock::now() - start;
+ REQUIRE(delay > test_duration);
+ REQUIRE(delay < std::chrono::milliseconds(500));
+ },
+ test_duration);
+
+ int timer_id;
+ timer_id = loop.SetInterval(
+ [&loop, timer_id, test_duration, start, &counter] {
+ switch (counter) {
+ case 0: {
+ auto delay = std::chrono::steady_clock::now() - start;
+ REQUIRE(delay > test_duration);
+ REQUIRE(delay < std::chrono::milliseconds(500));
+ counter++;
+ break;
+ }
+ case 1: {
+ auto delay = std::chrono::steady_clock::now() - start;
+ REQUIRE(delay > test_duration * 2);
+ REQUIRE(delay < std::chrono::milliseconds(1000));
+ counter++;
+ break;
+ }
+ default: {
+ loop.CancelTimer(timer_id);
+ loop.RequestQuit();
+ break;
+ }
+ }
+ },
+ test_duration);
+
+ auto exit_code = loop.Run();
+ REQUIRE(exit_code == 0);
+ REQUIRE(counter == 2);
}