From dfe62dcf8bcefc523b466e127c3edc4dc2756629 Mon Sep 17 00:00:00 2001 From: crupest Date: Sun, 6 Oct 2024 13:57:39 +0800 Subject: Rename common to base. --- include/cru/base/concurrent/ConcurrentQueue.h | 88 +++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 include/cru/base/concurrent/ConcurrentQueue.h (limited to 'include/cru/base/concurrent') diff --git a/include/cru/base/concurrent/ConcurrentQueue.h b/include/cru/base/concurrent/ConcurrentQueue.h new file mode 100644 index 00000000..e311d5f9 --- /dev/null +++ b/include/cru/base/concurrent/ConcurrentQueue.h @@ -0,0 +1,88 @@ +#pragma once +#include +#include +#include +#include + +namespace cru::concurrent { +namespace details { +template +struct ConcurrentQueueNode { + ConcurrentQueueNode(T&& value, ConcurrentQueueNode* next = nullptr) + : value(std::move(value)), next(next) {} + + T value; + ConcurrentQueueNode* next; +}; +} // namespace details + +template +class ConcurrentQueue { + public: + ConcurrentQueue() {} + + ConcurrentQueue(const ConcurrentQueue&) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue&) = delete; + + ~ConcurrentQueue() { + if (head_) { + auto node = head_; + while (node) { + auto next = node->next; + delete node; + node = next; + } + } + } + + public: + void Push(T&& value) { + std::unique_lock lock(mutex_); + if (head_ == nullptr) { + head_ = tail_ = new details::ConcurrentQueueNode(std::move(value)); + condition_variable_.notify_one(); + } else { + tail_->next = new details::ConcurrentQueueNode(std::move(value)); + tail_ = tail_->next; + } + } + + T Pull() { + std::unique_lock lock(mutex_); + if (head_ == nullptr) { + condition_variable_.wait(lock); + } + assert(head_ != nullptr); + auto value = std::move(head_->value); + auto next = head_->next; + delete head_; + head_ = next; + if (next == nullptr) { + tail_ = nullptr; + } + return value; + } + + std::optional Poll() { + std::unique_lock lock(mutex_); + if (head_ == nullptr) { + return std::nullopt; + } + auto value = std::move(head_->value); + auto next = head_->next; + delete head_; + head_ = next; + if (next == nullptr) { + tail_ = nullptr; + } + return value; + } + + private: + details::ConcurrentQueueNode* head_ = nullptr; + details::ConcurrentQueueNode* tail_ = nullptr; + + std::mutex mutex_; + std::condition_variable condition_variable_; +}; +} // namespace cru::concurrent -- cgit v1.2.3