diff options
author | crupest <crupest@outlook.com> | 2024-10-06 13:57:39 +0800 |
---|---|---|
committer | crupest <crupest@outlook.com> | 2024-10-06 13:57:39 +0800 |
commit | dfe62dcf8bcefc523b466e127c3edc4dc2756629 (patch) | |
tree | 1c751a14ba0da07ca2ff805633f97568060aa4c9 /src/base/io/BufferStream.cpp | |
parent | f51eb955e188858272230a990565931e7403f23b (diff) | |
download | cru-dfe62dcf8bcefc523b466e127c3edc4dc2756629.tar.gz cru-dfe62dcf8bcefc523b466e127c3edc4dc2756629.tar.bz2 cru-dfe62dcf8bcefc523b466e127c3edc4dc2756629.zip |
Rename common to base.
Diffstat (limited to 'src/base/io/BufferStream.cpp')
-rw-r--r-- | src/base/io/BufferStream.cpp | 109 |
1 files changed, 109 insertions, 0 deletions
diff --git a/src/base/io/BufferStream.cpp b/src/base/io/BufferStream.cpp new file mode 100644 index 00000000..e81731e8 --- /dev/null +++ b/src/base/io/BufferStream.cpp @@ -0,0 +1,109 @@ +#include "cru/base/io/BufferStream.h" +#include "cru/base/io/Stream.h" + +namespace cru::io { +BufferStream::BufferStream(const BufferStreamOptions& options) + : Stream(false, true, true) { + block_size_ = options.GetBlockSizeOrDefault(); + max_block_count_ = options.GetMaxBlockCount(); + + eof_ = false; +} + +BufferStream::~BufferStream() { DoClose(); } + +Index BufferStream::DoRead(std::byte* buffer, Index offset, Index size) { + std::unique_lock lock(mutex_); + + condition_variable_.wait(lock, + [this] { return !buffer_list_.empty() || eof_; }); + + if (buffer_list_.empty() && eof_) { + return 0; + } + + auto full = max_block_count_ > 0 && buffer_list_.size() == max_block_count_; + + Index read = 0; + + while (!buffer_list_.empty()) { + auto& stream_buffer = buffer_list_.front(); + auto this_read = + stream_buffer.PopFront(buffer + offset + read, size - read); + if (stream_buffer.GetUsedSize() == 0) { + buffer_list_.pop_front(); + } + read += this_read; + if (read == size) { + break; + } + } + + if (full && buffer_list_.size() < max_block_count_) { + // By convention, there should be at most one producer waiting. So + // notify_one and notify_all should be the same. + condition_variable_.notify_one(); + } + + return read; +} + +Index BufferStream::DoWrite(const std::byte* buffer, Index offset, Index size) { + std::unique_lock lock(mutex_); + + if (eof_) { + throw WriteAfterEofException( + u"Stream has been set eof. Can't write to it any more."); + } + + condition_variable_.wait(lock, [this] { + return max_block_count_ <= 0 || buffer_list_.size() < max_block_count_ || + buffer_list_.back().GetBackFree() > 0; + }); + + auto empty = buffer_list_.empty(); + + Index written = 0; + + if (empty) { + buffer_list_.push_back(Buffer(block_size_)); + } + + while (true) { + if (buffer_list_.back().GetBackFree() == 0) { + if (max_block_count_ > 0 && buffer_list_.size() == max_block_count_) { + break; + } + buffer_list_.push_back(Buffer(block_size_)); + } + auto& stream_buffer = buffer_list_.back(); + auto this_written = + stream_buffer.PushBack(buffer + offset + written, size - written); + written += this_written; + if (written == size) { + break; + } + } + + if (empty) { + // By convention, there should be at most one consumer waiting. So + // notify_one and notify_all should be the same. + condition_variable_.notify_one(); + } + + return written; +} + +void BufferStream::SetEof() { + std::unique_lock lock(mutex_); + + eof_ = true; + if (buffer_list_.empty()) { + // By convention, there should be at most one consumer waiting. So + // notify_one and notify_all should be the same. + condition_variable_.notify_one(); + } +} + +void BufferStream::DoClose() { CRU_STREAM_BEGIN_CLOSE } +} // namespace cru::io |