diff options
-rw-r--r-- | include/cru/base/concurrent/ConcurrentQueue.h | 88 | ||||
-rw-r--r-- | include/cru/base/log/Logger.h | 15 | ||||
-rw-r--r-- | include/cru/base/platform/unix/EventLoop.h | 5 | ||||
-rw-r--r-- | src/base/log/Logger.cpp | 60 | ||||
-rw-r--r-- | src/base/platform/unix/EventLoop.cpp | 33 | ||||
-rw-r--r-- | test/base/platform/unix/EventLoopTest.cpp | 56 |
6 files changed, 150 insertions, 107 deletions
diff --git a/include/cru/base/concurrent/ConcurrentQueue.h b/include/cru/base/concurrent/ConcurrentQueue.h deleted file mode 100644 index e311d5f9..00000000 --- a/include/cru/base/concurrent/ConcurrentQueue.h +++ /dev/null @@ -1,88 +0,0 @@ -#pragma once -#include <condition_variable> -#include <mutex> -#include <optional> -#include <utility> - -namespace cru::concurrent { -namespace details { -template <typename T> -struct ConcurrentQueueNode { - ConcurrentQueueNode(T&& value, ConcurrentQueueNode* next = nullptr) - : value(std::move(value)), next(next) {} - - T value; - ConcurrentQueueNode* next; -}; -} // namespace details - -template <typename T> -class ConcurrentQueue { - public: - ConcurrentQueue() {} - - ConcurrentQueue(const ConcurrentQueue&) = delete; - ConcurrentQueue& operator=(const ConcurrentQueue&) = delete; - - ~ConcurrentQueue() { - if (head_) { - auto node = head_; - while (node) { - auto next = node->next; - delete node; - node = next; - } - } - } - - public: - void Push(T&& value) { - std::unique_lock<std::mutex> lock(mutex_); - if (head_ == nullptr) { - head_ = tail_ = new details::ConcurrentQueueNode<T>(std::move(value)); - condition_variable_.notify_one(); - } else { - tail_->next = new details::ConcurrentQueueNode<T>(std::move(value)); - tail_ = tail_->next; - } - } - - T Pull() { - std::unique_lock<std::mutex> lock(mutex_); - if (head_ == nullptr) { - condition_variable_.wait(lock); - } - assert(head_ != nullptr); - auto value = std::move(head_->value); - auto next = head_->next; - delete head_; - head_ = next; - if (next == nullptr) { - tail_ = nullptr; - } - return value; - } - - std::optional<T> Poll() { - std::unique_lock<std::mutex> lock(mutex_); - if (head_ == nullptr) { - return std::nullopt; - } - auto value = std::move(head_->value); - auto next = head_->next; - delete head_; - head_ = next; - if (next == nullptr) { - tail_ = nullptr; - } - return value; - } - - private: - details::ConcurrentQueueNode<T>* head_ = nullptr; - details::ConcurrentQueueNode<T>* tail_ = nullptr; - - std::mutex mutex_; - std::condition_variable condition_variable_; -}; -} // namespace cru::concurrent diff --git a/include/cru/base/log/Logger.h b/include/cru/base/log/Logger.h index 3ed119e2..1ef53654 100644 --- a/include/cru/base/log/Logger.h +++ b/include/cru/base/log/Logger.h @@ -1,9 +1,9 @@ #pragma once #include "../Base.h" -#include "../concurrent/ConcurrentQueue.h" - +#include <condition_variable> #include <format> // IWYU pragma: keep +#include <list> #include <memory> #include <mutex> #include <thread> @@ -55,12 +55,17 @@ class CRU_BASE_API Logger : public Object2 { } private: - concurrent::ConcurrentQueue<LogInfo> log_queue_; + void LogThreadRun(); + + private: + std::mutex log_queue_mutex_; + std::condition_variable log_queue_condition_variable_; + std::list<LogInfo> log_queue_; + bool log_stop_; + std::thread log_thread_; std::mutex target_list_mutex_; std::vector<std::unique_ptr<ILogTarget>> target_list_; - - std::thread log_thread_; }; class CRU_BASE_API LoggerCppStream : public Object2 { diff --git a/include/cru/base/platform/unix/EventLoop.h b/include/cru/base/platform/unix/EventLoop.h index 784cb780..9def8c7b 100644 --- a/include/cru/base/platform/unix/EventLoop.h +++ b/include/cru/base/platform/unix/EventLoop.h @@ -45,14 +45,17 @@ class UnixTimerFile : public Object2 { }; class UnixEventLoop : public Object2 { + CRU_DEFINE_CLASS_LOG_TAG("cru::platform::unix::UnixEventLoop") public: UnixEventLoop(); + ~UnixEventLoop() override; int Run(); void RequestQuit(int exit_code = 0); int SetTimer(std::function<void()> action, std::chrono::milliseconds timeout, bool repeat); + void CancelTimer(int id); int SetImmediate(std::function<void()> action) { return this->SetTimer(std::move(action), std::chrono::milliseconds::zero(), @@ -91,6 +94,8 @@ class UnixEventLoop : public Object2 { bool CheckTimer(); bool ReadTimerPipe(); + void RemoveTimer(int id); + private: std::thread::id running_thread_; diff --git a/src/base/log/Logger.cpp b/src/base/log/Logger.cpp index c195cd87..4332ba75 100644 --- a/src/base/log/Logger.cpp +++ b/src/base/log/Logger.cpp @@ -2,8 +2,11 @@ #include "cru/base/log/StdioLogTarget.h" #include <algorithm> +#include <condition_variable> #include <ctime> #include <format> +#include <memory> +#include <mutex> #ifdef CRU_PLATFORM_WINDOWS #include "cru/base/platform/win/DebugLogTarget.h" @@ -65,18 +68,16 @@ std::string MakeLogFinalMessage(const LogInfo &log_info) { } } // namespace -Logger::Logger() - : log_thread_([this] { - while (true) { - auto log_info = log_queue_.Pull(); - std::lock_guard<std::mutex> lock_guard{target_list_mutex_}; - for (const auto &target : target_list_) { - target->Write(log_info.level, MakeLogFinalMessage(log_info)); - } - } - }) {} +Logger::Logger() : log_stop_(false), log_thread_(&Logger::LogThreadRun, this) {} -Logger::~Logger() { log_thread_.detach(); } +Logger::~Logger() { + { + std::unique_lock lock(log_queue_mutex_); + log_stop_ = true; + log_queue_condition_variable_.notify_one(); + } + log_thread_.join(); +} void Logger::Log(LogInfo log_info) { #ifndef CRU_DEBUG @@ -84,7 +85,41 @@ void Logger::Log(LogInfo log_info) { return; } #endif - log_queue_.Push(std::move(log_info)); + + std::unique_lock lock(log_queue_mutex_); + log_queue_.push_back(std::move(log_info)); + log_queue_condition_variable_.notify_one(); +} +void Logger::LogThreadRun() { + while (true) { + std::list<LogInfo> queue; + bool stop = false; + std::vector<ILogTarget *> target_list; + + { + std::unique_lock lock(log_queue_mutex_); + log_queue_condition_variable_.wait( + lock, [this] { return !log_queue_.empty() || log_stop_; }); + std::swap(queue, log_queue_); + stop = log_stop_; + } + + { + std::lock_guard<std::mutex> lock_guard(target_list_mutex_); + for (const auto &target : target_list_) { + target_list.push_back(target.get()); + } + } + + for (const auto &target : target_list) { + for (auto &log_info : queue) { + target->Write(log_info.level, MakeLogFinalMessage(log_info)); + } + } + + // TODO: Should still wait for queue to be cleared. + if (stop) return; + } } LoggerCppStream::LoggerCppStream(Logger *logger, LogLevel level, @@ -102,4 +137,5 @@ LoggerCppStream LoggerCppStream::WithTag(std::string tag) const { void LoggerCppStream::Consume(std::string_view str) { this->logger_->Log(this->level_, this->tag_, std::string(str)); } + } // namespace cru::log diff --git a/src/base/platform/unix/EventLoop.cpp b/src/base/platform/unix/EventLoop.cpp index 3c645a85..8fe3b7ad 100644 --- a/src/base/platform/unix/EventLoop.cpp +++ b/src/base/platform/unix/EventLoop.cpp @@ -1,5 +1,6 @@ #include "cru/base/platform/unix/EventLoop.h" #include "cru/base/Exception.h" +#include "cru/base/log/Logger.h" #include <poll.h> #include <algorithm> @@ -18,7 +19,11 @@ UnixEventLoop::UnixEventLoop() : timer_tag_(1), polls_(1), poll_actions_(1) { poll_actions_[0] = [](auto _) {}; } +UnixEventLoop::~UnixEventLoop() { CRU_LOG_TAG_DEBUG("Event loop destroyed."); } + int UnixEventLoop::Run() { + CRU_LOG_TAG_DEBUG("Event loop started."); + running_thread_ = std::this_thread::get_id(); while (!exit_code_) { @@ -63,7 +68,12 @@ int UnixEventLoop::Run() { return exit_code_.value(); } -void UnixEventLoop::RequestQuit(int exit_code) {} +void UnixEventLoop::RequestQuit(int exit_code) { + exit_code_ = exit_code; + if (std::this_thread::get_id() != running_thread_) { + SetImmediate([] {}); + } +} int UnixEventLoop::SetTimer(std::function<void()> action, std::chrono::milliseconds timeout, bool repeat) { @@ -91,6 +101,14 @@ int UnixEventLoop::SetTimer(std::function<void()> action, return tag; } +void UnixEventLoop::CancelTimer(int id) { + if (std::this_thread::get_id() == running_thread_) { + RemoveTimer(id); + } else { + SetImmediate([this, id] { RemoveTimer(id); }); + } +} + bool UnixEventLoop::CheckPoll() { auto iter = std::ranges::find_if( polls_, [](const pollfd &poll_fd) { return poll_fd.revents != 0; }); @@ -117,6 +135,8 @@ bool UnixEventLoop::CheckTimer() { return false; } + CRU_LOG_TAG_INFO("A timer is to be executed."); + auto &timer = *iter; if (timer.repeat) { while (timer.timeout <= std::chrono::milliseconds::zero()) { @@ -158,6 +178,17 @@ bool UnixEventLoop::ReadTimerPipe() { timers_.push_back(std::move(*pointer)); delete pointer; + CRU_LOG_TAG_INFO("A timer from pipe is received."); + return true; } + +void UnixEventLoop::RemoveTimer(int id) { + auto iter = std::ranges::find_if( + timers_, [id](const TimerData &timer) { return timer.id == id; }); + if (iter != timers_.cend()) { + timers_.erase(iter); + } +} + } // namespace cru::platform::unix diff --git a/test/base/platform/unix/EventLoopTest.cpp b/test/base/platform/unix/EventLoopTest.cpp index f5936b2a..4660efb3 100644 --- a/test/base/platform/unix/EventLoopTest.cpp +++ b/test/base/platform/unix/EventLoopTest.cpp @@ -20,5 +20,59 @@ TEST_CASE("UnixTimerFile Work", "[unix][time]") { fds[0].fd = timer.GetReadFd(); fds[0].events = POLLIN; REQUIRE(::poll(fds, 1, test_miliseconds * 2) == 1); - REQUIRE((std::chrono::steady_clock::now() - start) > test_duration); + auto delay = std::chrono::steady_clock::now() - start; + REQUIRE(delay > test_duration); + REQUIRE(delay < std::chrono::milliseconds(500)); +} + +TEST_CASE("UnixEventLoop Timer", "[unix][time]") { + using namespace cru; + using namespace cru::platform::unix; + + UnixEventLoop loop; + + auto test_miliseconds = 300; + auto test_duration = std::chrono::milliseconds(test_miliseconds); + auto start = std::chrono::steady_clock::now(); + + int counter = 0; + + loop.SetTimeout( + [test_duration, start] { + auto delay = std::chrono::steady_clock::now() - start; + REQUIRE(delay > test_duration); + REQUIRE(delay < std::chrono::milliseconds(500)); + }, + test_duration); + + int timer_id; + timer_id = loop.SetInterval( + [&loop, timer_id, test_duration, start, &counter] { + switch (counter) { + case 0: { + auto delay = std::chrono::steady_clock::now() - start; + REQUIRE(delay > test_duration); + REQUIRE(delay < std::chrono::milliseconds(500)); + counter++; + break; + } + case 1: { + auto delay = std::chrono::steady_clock::now() - start; + REQUIRE(delay > test_duration * 2); + REQUIRE(delay < std::chrono::milliseconds(1000)); + counter++; + break; + } + default: { + loop.CancelTimer(timer_id); + loop.RequestQuit(); + break; + } + } + }, + test_duration); + + auto exit_code = loop.Run(); + REQUIRE(exit_code == 0); + REQUIRE(counter == 2); } |