diff options
author | crupest <crupest@outlook.com> | 2024-05-12 00:19:16 +0800 |
---|---|---|
committer | crupest <crupest@outlook.com> | 2024-05-18 22:56:24 +0800 |
commit | c3cf29afa2b2dd3f2e972a25f35ef5840ad1e2f5 (patch) | |
tree | 6260a4143f254e7c974f66453be0be61f9b935f4 /src/common/io | |
parent | b05c7d76dcfef6d882bc01fea4663a4c2fcdcdf2 (diff) | |
download | cru-c3cf29afa2b2dd3f2e972a25f35ef5840ad1e2f5.tar.gz cru-c3cf29afa2b2dd3f2e972a25f35ef5840ad1e2f5.tar.bz2 cru-c3cf29afa2b2dd3f2e972a25f35ef5840ad1e2f5.zip |
feat: completed AutoReadStream.
NEED TEST: BufferStream, AutoReadStream.
Diffstat (limited to 'src/common/io')
-rw-r--r-- | src/common/io/AutoReadStream.cpp | 37 | ||||
-rw-r--r-- | src/common/io/BufferStream.cpp | 18 |
2 files changed, 41 insertions, 14 deletions
diff --git a/src/common/io/AutoReadStream.cpp b/src/common/io/AutoReadStream.cpp index 1acf8930..edb77a47 100644 --- a/src/common/io/AutoReadStream.cpp +++ b/src/common/io/AutoReadStream.cpp @@ -1,14 +1,22 @@ #include "cru/common/io/AutoReadStream.h" -#include <vector> #include "cru/common/io/Stream.h" namespace cru::io { AutoReadStream::AutoReadStream(Stream* stream, bool auto_delete, const AutoReadStreamOptions& options) { + auto buffer_stream_options = options.GetBufferStreamOptions(); + size_per_read_ = buffer_stream_options.GetBlockSizeOrDefault(); + buffer_stream_ = std::make_unique<BufferStream>(buffer_stream_options); background_thread_ = std::thread(&AutoReadStream::BackgroundThreadRun, this); } +AutoReadStream::~AutoReadStream() { + if (auto_delete_) { + delete stream_; + } +} + bool AutoReadStream::CanSeek() { return false; } Index AutoReadStream::Seek(Index offset, SeekOrigin origin) { @@ -16,6 +24,13 @@ Index AutoReadStream::Seek(Index offset, SeekOrigin origin) { u"AutoReadStream does not support seek."); } +bool AutoReadStream::CanRead() { return true; } + +Index AutoReadStream::Read(std::byte* buffer, Index offset, Index size) { + std::unique_lock lock(buffer_stream_mutex_); + return buffer_stream_->Read(buffer, offset, size); +} + bool AutoReadStream::CanWrite() { return stream_->CanWrite(); } Index AutoReadStream::Write(const std::byte* buffer, Index offset, Index size) { @@ -24,12 +39,24 @@ Index AutoReadStream::Write(const std::byte* buffer, Index offset, Index size) { void AutoReadStream::Flush() { stream_->Flush(); } +void AutoReadStream::Close() { stream_->Close(); } + void AutoReadStream::BackgroundThreadRun() { - std::unique_lock<std::mutex> lock(buffer_mutex_); - std::vector<std::byte>* buffer = nullptr; - if (!buffer_list_.empty()) { + std::vector<std::byte> buffer(size_per_read_); + while (true) { + try { + auto read = stream_->Read(buffer.data(), buffer.size()); + if (read == 0) { + buffer_stream_->SetEof(); + break; + } else { + buffer_stream_->Write(buffer.data(), read); + } + } catch (const StreamAlreadyClosedException& exception) { + buffer_stream_->SetEof(); + break; + } } - stream_->Read(); } } // namespace cru::io diff --git a/src/common/io/BufferStream.cpp b/src/common/io/BufferStream.cpp index 1becc1d9..b2467a17 100644 --- a/src/common/io/BufferStream.cpp +++ b/src/common/io/BufferStream.cpp @@ -3,11 +3,8 @@ namespace cru::io { BufferStream::BufferStream(const BufferStreamOptions& options) { - block_size_ = - options.block_size <= 0 ? kDefaultBlockSize : options.block_size; - total_size_limit_ = options.total_size_limit < 0 ? kDefaultTotalSizeLimit - : options.total_size_limit; - block_count_limit_ = total_size_limit_ / block_size_; + block_size_ = options.GetBlockSizeOrDefault(); + max_block_count_ = options.GetMaxBlockCount(); eof_ = false; } @@ -33,7 +30,7 @@ Index BufferStream::Read(std::byte* buffer, Index offset, Index size) { return 0; } - auto full = buffer_list_.size() == block_count_limit_; + auto full = max_block_count_ > 0 && buffer_list_.size() == max_block_count_; Index read = 0; @@ -50,7 +47,7 @@ Index BufferStream::Read(std::byte* buffer, Index offset, Index size) { } } - if (full && buffer_list_.size() < block_count_limit_) { + if (full && buffer_list_.size() < max_block_count_) { // By convention, there should be at most one producer waiting. So // notify_one and notify_all should be the same. condition_variable_.notify_one(); @@ -70,7 +67,7 @@ Index BufferStream::Write(const std::byte* buffer, Index offset, Index size) { } condition_variable_.wait(lock, [this] { - return buffer_list_.size() < block_count_limit_ || + return buffer_list_.size() < max_block_count_ || buffer_list_.back().GetBackFree() > 0; }); @@ -78,8 +75,11 @@ Index BufferStream::Write(const std::byte* buffer, Index offset, Index size) { Index written = 0; - while (buffer_list_.size() != block_count_limit_) { + while (true) { if (buffer_list_.back().GetBackFree() == 0) { + if (max_block_count_ > 0 && buffer_list_.size() == max_block_count_) { + break; + } buffer_list_.push_back(Buffer(block_size_)); } auto& stream_buffer = buffer_list_.back(); |