From c3cf29afa2b2dd3f2e972a25f35ef5840ad1e2f5 Mon Sep 17 00:00:00 2001 From: crupest Date: Sun, 12 May 2024 00:19:16 +0800 Subject: feat: completed AutoReadStream. NEED TEST: BufferStream, AutoReadStream. --- include/cru/common/io/AutoReadStream.h | 44 ++++++++++++++-------------------- include/cru/common/io/BufferStream.h | 33 +++++++++++++------------ src/common/io/AutoReadStream.cpp | 37 ++++++++++++++++++++++++---- src/common/io/BufferStream.cpp | 18 +++++++------- 4 files changed, 75 insertions(+), 57 deletions(-) diff --git a/include/cru/common/io/AutoReadStream.h b/include/cru/common/io/AutoReadStream.h index e252bdff..4ba4066f 100644 --- a/include/cru/common/io/AutoReadStream.h +++ b/include/cru/common/io/AutoReadStream.h @@ -1,6 +1,7 @@ #pragma once #include "../Buffer.h" +#include "BufferStream.h" #include "Stream.h" #include @@ -11,24 +12,21 @@ namespace cru::io { struct AutoReadStreamOptions { /** - * @brief The size of a single buffer allocated each time new space is needed. - * Use default value if <= 0. - * - * When current buffer is full and there is no space for following data, a new - * buffer will be allocated and appended to the buffer list. Note if sum size - * of all buffers reaches the total_buffer_size, no more buffer will be - * allocated but wait. + * @brief Will be passed to BufferStreamOptions::block_size. */ - int buffer_block_size = 0; + Index block_size = 0; /** - * @brief Total size limit of saved data in buffer. Use default value if < 0. - * No limit if == 0. - * - * When the buffer is filled, it will block and wait for user to read to get - * free space of buffer to continue read. + * @brief Will be passed to BufferStreamOptions::total_size_limit. */ - int total_buffer_size = 0; + Index total_size_limit = 0; + + BufferStreamOptions GetBufferStreamOptions() const { + BufferStreamOptions options; + options.block_size = block_size; + options.total_size_limit = total_size_limit; + return options; + } }; /** @@ -36,9 +34,6 @@ struct AutoReadStreamOptions { * background thread. */ class CRU_BASE_API AutoReadStream : public Stream { - private: - class BufferBlock {}; - public: /** * @brief Wrap a stream and auto read it in background. @@ -56,15 +51,15 @@ class CRU_BASE_API AutoReadStream : public Stream { bool CanSeek() override; Index Seek(Index offset, SeekOrigin origin = SeekOrigin::Current) override; - bool CanRead() = 0; - virtual Index Read(std::byte* buffer, Index offset, Index size) = 0; + bool CanRead() override; + Index Read(std::byte* buffer, Index offset, Index size) override; bool CanWrite() override; Index Write(const std::byte* buffer, Index offset, Index size) override; void Flush() override; - void Close() = 0; + void Close() override; private: void BackgroundThreadRun(); @@ -73,12 +68,9 @@ class CRU_BASE_API AutoReadStream : public Stream { Stream* stream_; bool auto_delete_; - int buffer_block_size_; - int total_buffer_size_; - - std::mutex buffer_mutex_; - std::condition_variable buffer_condition_variable_; - std::list buffer_list_; + Index size_per_read_; + std::unique_ptr buffer_stream_; + std::mutex buffer_stream_mutex_; std::thread background_thread_; }; diff --git a/include/cru/common/io/BufferStream.h b/include/cru/common/io/BufferStream.h index b3a5e9d1..d5d165cb 100644 --- a/include/cru/common/io/BufferStream.h +++ b/include/cru/common/io/BufferStream.h @@ -16,6 +16,12 @@ class WriteAfterEofException : public Exception { }; struct BufferStreamOptions { + /** + * Actually I have no ideas about the best value for this. May change it later + * when I get some ideas. + */ + constexpr static Index kDefaultBlockSize = 1024; + /** * @brief The size of a single buffer allocated each time new space is needed. * Use default value if <= 0. @@ -28,14 +34,19 @@ struct BufferStreamOptions { Index block_size = 0; /** - * @brief Total size limit of saved data in buffer. Use default value if < 0. - * No limit if == 0. + * @brief Total size limit of saved data in buffer. No limit if <= 0. * * The size will be floor(total_size_limit / block_size). When the buffer is * filled, it will block and wait for user to read to get free space of buffer * to continue read. */ Index total_size_limit = 0; + + Index GetBlockSizeOrDefault() const { + return block_size <= 0 ? kDefaultBlockSize : block_size; + } + + Index GetMaxBlockCount() const { return total_size_limit / block_size; } }; /** @@ -44,19 +55,6 @@ struct BufferStreamOptions { * If used by multiple producer or multiple consumer, the behavior is undefined. */ class BufferStream : public Stream { - public: - /** - * Actually I have no ideas about the best value for this. May change it later - * when I get some ideas. - */ - constexpr static Index kDefaultBlockSize = 1024; - - /** - * Actually I have no ideas about the best value for this. May change it later - * when I get some ideas. - */ - constexpr static Index kDefaultTotalSizeLimit = 1024 * 1024; - public: BufferStream(const BufferStreamOptions& options); @@ -67,16 +65,17 @@ class BufferStream : public Stream { bool CanRead() override; Index Read(std::byte* buffer, Index offset, Index size) override; + using Stream::Read; bool CanWrite() override; Index Write(const std::byte* buffer, Index offset, Index size) override; + using Stream::Write; void SetEof(); private: Index block_size_; - Index total_size_limit_; - Index block_count_limit_; + Index max_block_count_; std::list buffer_list_; bool eof_; diff --git a/src/common/io/AutoReadStream.cpp b/src/common/io/AutoReadStream.cpp index 1acf8930..edb77a47 100644 --- a/src/common/io/AutoReadStream.cpp +++ b/src/common/io/AutoReadStream.cpp @@ -1,14 +1,22 @@ #include "cru/common/io/AutoReadStream.h" -#include #include "cru/common/io/Stream.h" namespace cru::io { AutoReadStream::AutoReadStream(Stream* stream, bool auto_delete, const AutoReadStreamOptions& options) { + auto buffer_stream_options = options.GetBufferStreamOptions(); + size_per_read_ = buffer_stream_options.GetBlockSizeOrDefault(); + buffer_stream_ = std::make_unique(buffer_stream_options); background_thread_ = std::thread(&AutoReadStream::BackgroundThreadRun, this); } +AutoReadStream::~AutoReadStream() { + if (auto_delete_) { + delete stream_; + } +} + bool AutoReadStream::CanSeek() { return false; } Index AutoReadStream::Seek(Index offset, SeekOrigin origin) { @@ -16,6 +24,13 @@ Index AutoReadStream::Seek(Index offset, SeekOrigin origin) { u"AutoReadStream does not support seek."); } +bool AutoReadStream::CanRead() { return true; } + +Index AutoReadStream::Read(std::byte* buffer, Index offset, Index size) { + std::unique_lock lock(buffer_stream_mutex_); + return buffer_stream_->Read(buffer, offset, size); +} + bool AutoReadStream::CanWrite() { return stream_->CanWrite(); } Index AutoReadStream::Write(const std::byte* buffer, Index offset, Index size) { @@ -24,12 +39,24 @@ Index AutoReadStream::Write(const std::byte* buffer, Index offset, Index size) { void AutoReadStream::Flush() { stream_->Flush(); } +void AutoReadStream::Close() { stream_->Close(); } + void AutoReadStream::BackgroundThreadRun() { - std::unique_lock lock(buffer_mutex_); - std::vector* buffer = nullptr; - if (!buffer_list_.empty()) { + std::vector buffer(size_per_read_); + while (true) { + try { + auto read = stream_->Read(buffer.data(), buffer.size()); + if (read == 0) { + buffer_stream_->SetEof(); + break; + } else { + buffer_stream_->Write(buffer.data(), read); + } + } catch (const StreamAlreadyClosedException& exception) { + buffer_stream_->SetEof(); + break; + } } - stream_->Read(); } } // namespace cru::io diff --git a/src/common/io/BufferStream.cpp b/src/common/io/BufferStream.cpp index 1becc1d9..b2467a17 100644 --- a/src/common/io/BufferStream.cpp +++ b/src/common/io/BufferStream.cpp @@ -3,11 +3,8 @@ namespace cru::io { BufferStream::BufferStream(const BufferStreamOptions& options) { - block_size_ = - options.block_size <= 0 ? kDefaultBlockSize : options.block_size; - total_size_limit_ = options.total_size_limit < 0 ? kDefaultTotalSizeLimit - : options.total_size_limit; - block_count_limit_ = total_size_limit_ / block_size_; + block_size_ = options.GetBlockSizeOrDefault(); + max_block_count_ = options.GetMaxBlockCount(); eof_ = false; } @@ -33,7 +30,7 @@ Index BufferStream::Read(std::byte* buffer, Index offset, Index size) { return 0; } - auto full = buffer_list_.size() == block_count_limit_; + auto full = max_block_count_ > 0 && buffer_list_.size() == max_block_count_; Index read = 0; @@ -50,7 +47,7 @@ Index BufferStream::Read(std::byte* buffer, Index offset, Index size) { } } - if (full && buffer_list_.size() < block_count_limit_) { + if (full && buffer_list_.size() < max_block_count_) { // By convention, there should be at most one producer waiting. So // notify_one and notify_all should be the same. condition_variable_.notify_one(); @@ -70,7 +67,7 @@ Index BufferStream::Write(const std::byte* buffer, Index offset, Index size) { } condition_variable_.wait(lock, [this] { - return buffer_list_.size() < block_count_limit_ || + return buffer_list_.size() < max_block_count_ || buffer_list_.back().GetBackFree() > 0; }); @@ -78,8 +75,11 @@ Index BufferStream::Write(const std::byte* buffer, Index offset, Index size) { Index written = 0; - while (buffer_list_.size() != block_count_limit_) { + while (true) { if (buffer_list_.back().GetBackFree() == 0) { + if (max_block_count_ > 0 && buffer_list_.size() == max_block_count_) { + break; + } buffer_list_.push_back(Buffer(block_size_)); } auto& stream_buffer = buffer_list_.back(); -- cgit v1.2.3