diff options
author | crupest <crupest@outlook.com> | 2024-10-06 13:57:39 +0800 |
---|---|---|
committer | crupest <crupest@outlook.com> | 2024-10-06 13:57:39 +0800 |
commit | dfe62dcf8bcefc523b466e127c3edc4dc2756629 (patch) | |
tree | 1c751a14ba0da07ca2ff805633f97568060aa4c9 /include/cru/base/concurrent/ConcurrentQueue.h | |
parent | f51eb955e188858272230a990565931e7403f23b (diff) | |
download | cru-dfe62dcf8bcefc523b466e127c3edc4dc2756629.tar.gz cru-dfe62dcf8bcefc523b466e127c3edc4dc2756629.tar.bz2 cru-dfe62dcf8bcefc523b466e127c3edc4dc2756629.zip |
Rename common to base.
Diffstat (limited to 'include/cru/base/concurrent/ConcurrentQueue.h')
-rw-r--r-- | include/cru/base/concurrent/ConcurrentQueue.h | 88 |
1 files changed, 88 insertions, 0 deletions
diff --git a/include/cru/base/concurrent/ConcurrentQueue.h b/include/cru/base/concurrent/ConcurrentQueue.h new file mode 100644 index 00000000..e311d5f9 --- /dev/null +++ b/include/cru/base/concurrent/ConcurrentQueue.h @@ -0,0 +1,88 @@ +#pragma once +#include <condition_variable> +#include <mutex> +#include <optional> +#include <utility> + +namespace cru::concurrent { +namespace details { +template <typename T> +struct ConcurrentQueueNode { + ConcurrentQueueNode(T&& value, ConcurrentQueueNode* next = nullptr) + : value(std::move(value)), next(next) {} + + T value; + ConcurrentQueueNode* next; +}; +} // namespace details + +template <typename T> +class ConcurrentQueue { + public: + ConcurrentQueue() {} + + ConcurrentQueue(const ConcurrentQueue&) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue&) = delete; + + ~ConcurrentQueue() { + if (head_) { + auto node = head_; + while (node) { + auto next = node->next; + delete node; + node = next; + } + } + } + + public: + void Push(T&& value) { + std::unique_lock<std::mutex> lock(mutex_); + if (head_ == nullptr) { + head_ = tail_ = new details::ConcurrentQueueNode<T>(std::move(value)); + condition_variable_.notify_one(); + } else { + tail_->next = new details::ConcurrentQueueNode<T>(std::move(value)); + tail_ = tail_->next; + } + } + + T Pull() { + std::unique_lock<std::mutex> lock(mutex_); + if (head_ == nullptr) { + condition_variable_.wait(lock); + } + assert(head_ != nullptr); + auto value = std::move(head_->value); + auto next = head_->next; + delete head_; + head_ = next; + if (next == nullptr) { + tail_ = nullptr; + } + return value; + } + + std::optional<T> Poll() { + std::unique_lock<std::mutex> lock(mutex_); + if (head_ == nullptr) { + return std::nullopt; + } + auto value = std::move(head_->value); + auto next = head_->next; + delete head_; + head_ = next; + if (next == nullptr) { + tail_ = nullptr; + } + return value; + } + + private: + details::ConcurrentQueueNode<T>* head_ = nullptr; + details::ConcurrentQueueNode<T>* tail_ = nullptr; + + std::mutex mutex_; + std::condition_variable condition_variable_; +}; +} // namespace cru::concurrent |