From ced1a63686e6c64cb574d74a34d1bbd07d0a668e Mon Sep 17 00:00:00 2001 From: crupest Date: Mon, 21 Feb 2022 18:44:40 +0800 Subject: ... --- include/cru/common/concurrent/ConcurrentQueue.h | 110 ++++++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 include/cru/common/concurrent/ConcurrentQueue.h (limited to 'include/cru/common/concurrent/ConcurrentQueue.h') diff --git a/include/cru/common/concurrent/ConcurrentQueue.h b/include/cru/common/concurrent/ConcurrentQueue.h new file mode 100644 index 00000000..4f649a41 --- /dev/null +++ b/include/cru/common/concurrent/ConcurrentQueue.h @@ -0,0 +1,110 @@ +#pragma once +#include +#include +#include +#include + +namespace cru::concurrent { +namespace details { +template +struct ConcurrentQueueNode { + ConcurrentQueueNode(T&& value, ConcurrentQueueNode* next = nullptr) + : value(std::move(value)), next(next) {} + + T value; + ConcurrentQueueNode* next; +}; +} // namespace details + +template +class ConcurrentQueue { + public: + ConcurrentQueue() {} + + ConcurrentQueue(const ConcurrentQueue&) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue&) = delete; + + ConcurrentQueue(ConcurrentQueue&& other) + : head_(other.head_), + tail_(other.tail_), + mutex_(std::move(other.mutex_)), + condition_variable_(std::move(other.condition_variable_)) { + other.head_ = nullptr; + other.tail_ = nullptr; + } + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { + if (this != &other) { + head_ = other.head_; + tail_ = other.tail_; + mutex_ = std::move(other.mutex_); + condition_variable_ = std::move(other.condition_variable_); + other.head_ = nullptr; + other.tail_ = nullptr; + return *this; + } + return *this; + } + + ~ConcurrentQueue() { + if (head_) { + auto node = head_; + while (node) { + auto next = node->next; + delete node; + node = next; + } + } + } + + public: + void Push(T&& value) { + std::unique_lock lock(mutex_); + if (head_ == nullptr) { + head_ = tail_ = new details::ConcurrentQueueNode(std::move(value)); + condition_variable_.notify_one(); + } else { + tail_->next = new details::ConcurrentQueueNode(std::move(value)); + tail_ = tail_->next; + } + } + + T Pull() { + std::unique_lock lock(mutex_); + if (head_ == nullptr) { + condition_variable_.wait(lock); + } + assert(head_ != nullptr); + auto value = std::move(head_->value); + auto next = head_->next; + delete head_; + head_ = next; + if (next == nullptr) { + tail_ = nullptr; + } + return value; + } + + std::optional Poll() { + std::unique_lock lock(mutex_); + if (head_ == nullptr) { + return std::nullopt; + } + auto value = std::move(head_->value); + auto next = head_->next; + delete head_; + head_ = next; + if (next == nullptr) { + tail_ = nullptr; + } + return value; + } + + private: + details::ConcurrentQueueNode* head_ = nullptr; + details::ConcurrentQueueNode* tail_ = nullptr; + + std::mutex mutex_; + std::condition_variable condition_variable_; +}; +} // namespace cru::concurrent -- cgit v1.2.3