diff options
-rw-r--r-- | include/cru/common/Buffer.h | 3 | ||||
-rw-r--r-- | include/cru/common/io/BufferStream.h | 20 | ||||
-rw-r--r-- | src/common/io/AutoReadStream.cpp | 1 | ||||
-rw-r--r-- | src/common/io/BufferStream.cpp | 91 |
4 files changed, 83 insertions, 32 deletions
diff --git a/include/cru/common/Buffer.h b/include/cru/common/Buffer.h index 69c47aff..5c1f7ba3 100644 --- a/include/cru/common/Buffer.h +++ b/include/cru/common/Buffer.h @@ -23,6 +23,9 @@ class Buffer final { bool IsNull() const { return ptr_ == nullptr; } bool IsUsedReachEnd() const { return used_end_ == size_; } + Index GetFrontFree() const { return used_begin_; } + Index GetBackFree() const { return size_ - used_end_; } + Index GetUsedBegin() const { return used_begin_; } Index GetUsedEnd() const { return used_end_; } diff --git a/include/cru/common/io/BufferStream.h b/include/cru/common/io/BufferStream.h index a95b3487..b3a5e9d1 100644 --- a/include/cru/common/io/BufferStream.h +++ b/include/cru/common/io/BufferStream.h @@ -31,13 +31,18 @@ struct BufferStreamOptions { * @brief Total size limit of saved data in buffer. Use default value if < 0. * No limit if == 0. * - * The size will be ceil(total_size_limit / block_size). When the buffer is + * The size will be floor(total_size_limit / block_size). When the buffer is * filled, it will block and wait for user to read to get free space of buffer * to continue read. */ Index total_size_limit = 0; }; +/** + * @brief SPSC (Single Producer Single Consumer) buffer stream. + * + * If used by multiple producer or multiple consumer, the behavior is undefined. + */ class BufferStream : public Stream { public: /** @@ -50,7 +55,7 @@ class BufferStream : public Stream { * Actually I have no ideas about the best value for this. May change it later * when I get some ideas. */ - constexpr static Index kDefaultTotalSizeLimit = 1024; + constexpr static Index kDefaultTotalSizeLimit = 1024 * 1024; public: BufferStream(const BufferStreamOptions& options); @@ -63,19 +68,12 @@ class BufferStream : public Stream { bool CanRead() override; Index Read(std::byte* buffer, Index offset, Index size) override; - bool CanWrite() = 0; - Index Write(const std::byte* buffer, Index offset, Index size) = 0; - - virtual void Flush(); - - virtual void Close(); + bool CanWrite() override; + Index Write(const std::byte* buffer, Index offset, Index size) override; void SetEof(); private: - bool CheckClosed(); - - private: Index block_size_; Index total_size_limit_; Index block_count_limit_; diff --git a/src/common/io/AutoReadStream.cpp b/src/common/io/AutoReadStream.cpp index 7cdc1268..1acf8930 100644 --- a/src/common/io/AutoReadStream.cpp +++ b/src/common/io/AutoReadStream.cpp @@ -31,6 +31,5 @@ void AutoReadStream::BackgroundThreadRun() { } stream_->Read(); } -} } // namespace cru::io diff --git a/src/common/io/BufferStream.cpp b/src/common/io/BufferStream.cpp index c08d04a5..1becc1d9 100644 --- a/src/common/io/BufferStream.cpp +++ b/src/common/io/BufferStream.cpp @@ -12,21 +12,16 @@ BufferStream::BufferStream(const BufferStreamOptions& options) { eof_ = false; } -bool BufferStream::CanSeek() { - CheckClosed(); - return false; -} +BufferStream::~BufferStream() {} + +bool BufferStream::CanSeek() { return false; } Index BufferStream::Seek(Index offset, SeekOrigin origin) { - CheckClosed(); throw StreamOperationNotSupportedException( u"BufferStream does not support seeking."); } -bool BufferStream::CanRead() { - CheckClosed(); - return true; -} +bool BufferStream::CanRead() { return true; } Index BufferStream::Read(std::byte* buffer, Index offset, Index size) { std::unique_lock lock(mutex_); @@ -34,30 +29,86 @@ Index BufferStream::Read(std::byte* buffer, Index offset, Index size) { condition_variable_.wait(lock, [this] { return !buffer_list_.empty() || eof_; }); - if (eof_) { + if (buffer_list_.empty() && eof_) { return 0; } - Index written_size = 0; - auto current_offset = offset; - auto rest_size = size; + auto full = buffer_list_.size() == block_count_limit_; + + Index read = 0; while (!buffer_list_.empty()) { auto& stream_buffer = buffer_list_.front(); - auto this_written_size = - stream_buffer.PopFront(buffer + current_offset, rest_size); + auto this_read = + stream_buffer.PopFront(buffer + offset + read, size - read); if (stream_buffer.GetUsedSize() == 0) { buffer_list_.pop_front(); } - written_size += this_written_size; - rest_size -= this_written_size; - current_offset += this_written_size; - if (rest_size == 0) { + read += this_read; + if (read == size) { + break; + } + } + + if (full && buffer_list_.size() < block_count_limit_) { + // By convention, there should be at most one producer waiting. So + // notify_one and notify_all should be the same. + condition_variable_.notify_one(); + } + + return read; +} + +bool BufferStream::CanWrite() { return true; } + +Index BufferStream::Write(const std::byte* buffer, Index offset, Index size) { + std::unique_lock lock(mutex_); + + if (eof_) { + throw WriteAfterEofException( + u"Stream has been set eof. Can't write to it any more."); + } + + condition_variable_.wait(lock, [this] { + return buffer_list_.size() < block_count_limit_ || + buffer_list_.back().GetBackFree() > 0; + }); + + auto empty = buffer_list_.empty(); + + Index written = 0; + + while (buffer_list_.size() != block_count_limit_) { + if (buffer_list_.back().GetBackFree() == 0) { + buffer_list_.push_back(Buffer(block_size_)); + } + auto& stream_buffer = buffer_list_.back(); + auto this_written = + stream_buffer.PushBack(buffer + offset + written, size - written); + written += this_written; + if (written == size) { break; } } - return written_size; + if (empty) { + // By convention, there should be at most one consumer waiting. So + // notify_one and notify_all should be the same. + condition_variable_.notify_one(); + } + + return written; +} + +void BufferStream::SetEof() { + std::unique_lock lock(mutex_); + + eof_ = true; + if (buffer_list_.empty()) { + // By convention, there should be at most one consumer waiting. So + // notify_one and notify_all should be the same. + condition_variable_.notify_one(); + } } } // namespace cru::io |