From 38756822825e20eca3b9e01b735946175223d692 Mon Sep 17 00:00:00 2001 From: Yuqian Yang Date: Sat, 7 Mar 2026 20:42:37 +0800 Subject: Refactor stream. --- src/base/io/AutoReadStream.cpp | 21 +++---- src/base/io/BufferStream.cpp | 135 ++++++++++++++++++++++++++--------------- src/base/io/CFileStream.cpp | 23 +++++-- src/base/io/MemoryStream.cpp | 19 ++++-- src/base/io/ProxyStream.cpp | 37 ----------- src/base/io/Stream.cpp | 112 +++++++++++++++++++++------------- 6 files changed, 195 insertions(+), 152 deletions(-) delete mode 100644 src/base/io/ProxyStream.cpp (limited to 'src/base/io') diff --git a/src/base/io/AutoReadStream.cpp b/src/base/io/AutoReadStream.cpp index 0c035648..61cc9f26 100644 --- a/src/base/io/AutoReadStream.cpp +++ b/src/base/io/AutoReadStream.cpp @@ -8,7 +8,7 @@ namespace cru::io { AutoReadStream::AutoReadStream(Stream* stream, bool auto_close, bool auto_delete, const AutoReadStreamOptions& options) - : Stream(false, true, stream->CanSeek()), + : Stream(false, true, std::nullopt), auto_close_(auto_close), auto_delete_(auto_delete) { auto buffer_stream_options = options.GetBufferStreamOptions(); @@ -19,10 +19,15 @@ AutoReadStream::AutoReadStream(Stream* stream, bool auto_close, } AutoReadStream::~AutoReadStream() { - DoClose(); + if (auto_delete_) { + delete stream_; + } + buffer_stream_->Close(); background_thread_.join(); } +bool AutoReadStream::DoCanWrite() { return stream_->CanWrite(); } + Index AutoReadStream::DoRead(std::byte* buffer, Index offset, Index size) { return buffer_stream_->Read(buffer, offset, size); } @@ -35,14 +40,9 @@ Index AutoReadStream::DoWrite(const std::byte* buffer, Index offset, void AutoReadStream::DoFlush() { stream_->Flush(); } void AutoReadStream::DoClose() { - CRU_STREAM_BEGIN_CLOSE if (auto_close_) { stream_->Close(); } - if (auto_delete_) { - delete stream_; - stream_ = nullptr; - } buffer_stream_->Close(); } @@ -51,14 +51,13 @@ void AutoReadStream::BackgroundThreadRun() { while (true) { try { auto read = stream_->Read(buffer.data(), buffer.size()); - if (read == 0) { - buffer_stream_->SetEof(); + if (read == kEOF) { + buffer_stream_->WriteEof(); break; } else { buffer_stream_->Write(buffer.data(), read); } - } catch (const StreamClosedException& exception) { - buffer_stream_->Close(); + } catch (const StreamException&) { break; } } diff --git a/src/base/io/BufferStream.cpp b/src/base/io/BufferStream.cpp index 0dbb438b..fdcd25ff 100644 --- a/src/base/io/BufferStream.cpp +++ b/src/base/io/BufferStream.cpp @@ -6,35 +6,34 @@ BufferStream::BufferStream(const BufferStreamOptions& options) : Stream(false, true, true) { block_size_ = options.GetBlockSizeOrDefault(); max_block_count_ = options.GetMaxBlockCount(); - - eof_ = false; + eof_written_ = false; } -BufferStream::~BufferStream() { DoClose(); } +BufferStream::~BufferStream() {} Index BufferStream::DoRead(std::byte* buffer, Index offset, Index size) { std::unique_lock lock(mutex_); - condition_variable_.wait( - lock, [this] { return GetClosed() || !buffer_list_.empty() || eof_; }); + read_cv_.wait(lock, [this] { + return eof_written_ || !buffer_list_.empty() || IsClosed(); + }); - if (GetClosed()) { - StreamClosedException::Check(true); - } + CheckClosed(); - if (buffer_list_.empty() && eof_) { - return 0; + if (buffer_list_.empty() && eof_written_) { + return kEOF; } - auto full = max_block_count_ > 0 && buffer_list_.size() == max_block_count_; + auto full_previously = max_block_count_ > 0 && + buffer_list_.size() == max_block_count_ && + buffer_list_.back().IsFull(); Index read = 0; while (!buffer_list_.empty()) { auto& stream_buffer = buffer_list_.front(); - auto this_read = - stream_buffer.PopFront(buffer + offset + read, size - read); - if (stream_buffer.GetUsedSize() == 0) { + auto this_read = stream_buffer.Read(buffer + offset + read, size - read); + if (stream_buffer.IsEmpty()) { buffer_list_.pop_front(); } read += this_read; @@ -43,10 +42,8 @@ Index BufferStream::DoRead(std::byte* buffer, Index offset, Index size) { } } - if (full && buffer_list_.size() < max_block_count_) { - // By convention, there should be at most one producer waiting. So - // notify_one and notify_all should be the same. - condition_variable_.notify_one(); + if (full_previously && buffer_list_.size() < max_block_count_) { + write_cv_.notify_all(); } return read; @@ -55,69 +52,109 @@ Index BufferStream::DoRead(std::byte* buffer, Index offset, Index size) { Index BufferStream::DoWrite(const std::byte* buffer, Index offset, Index size) { std::unique_lock lock(mutex_); - if (eof_) { - throw WriteAfterEofException( - "Stream has been set eof. Can't write to it any more."); - } - - condition_variable_.wait(lock, [this] { - return GetClosed() || max_block_count_ <= 0 || + write_cv_.wait(lock, [this] { + return eof_written_ || max_block_count_ <= 0 || buffer_list_.size() < max_block_count_ || - buffer_list_.back().GetBackFree() > 0; + !buffer_list_.back().IsFull() || IsClosed(); }); - if (GetClosed()) { - StreamClosedException::Check(true); + CheckClosed(); + + if (eof_written_) { + throw StreamIOException( + this, "Stream has been set eof. Can't write to it any more."); } - auto empty = buffer_list_.empty(); + auto empty_previously = buffer_list_.empty(); Index written = 0; - if (empty) { - buffer_list_.push_back(Buffer(block_size_)); + if (empty_previously) { + buffer_list_.push_back(Block(block_size_)); } while (true) { - if (buffer_list_.back().GetBackFree() == 0) { + if (buffer_list_.back().IsFull()) { if (max_block_count_ > 0 && buffer_list_.size() == max_block_count_) { break; } - buffer_list_.push_back(Buffer(block_size_)); + buffer_list_.push_back(Block(block_size_)); } auto& stream_buffer = buffer_list_.back(); auto this_written = - stream_buffer.PushBack(buffer + offset + written, size - written); + stream_buffer.Write(buffer + offset + written, size - written); written += this_written; if (written == size) { break; } } - if (empty) { - // By convention, there should be at most one consumer waiting. So - // notify_one and notify_all should be the same. - condition_variable_.notify_one(); + if (empty_previously) { + read_cv_.notify_all(); } return written; } -void BufferStream::SetEof() { +void BufferStream::DoClose() { std::unique_lock lock(mutex_); - eof_ = true; - if (buffer_list_.empty()) { - // By convention, there should be at most one consumer waiting. So - // notify_one and notify_all should be the same. - condition_variable_.notify_one(); + buffer_list_.clear(); + read_cv_.notify_all(); + write_cv_.notify_all(); +} + +void BufferStream::WriteEof() { + std::unique_lock lock(mutex_); + + eof_written_ = true; + read_cv_.notify_all(); + write_cv_.notify_all(); +} + +BufferStream::Block::Block(Index size) + : buffer(new std::byte[size]), size(size), start(0), end(0) {} + +BufferStream::Block::Block(Block&& other) noexcept + : buffer(other.buffer), + size(other.size), + start(other.start), + end(other.end) { + other.buffer = nullptr; + other.size = other.start = other.end = 0; +} + +BufferStream::Block& BufferStream::Block::operator=(Block&& other) noexcept { + if (this != &other) { + delete[] buffer; + buffer = other.buffer; + size = other.size; + start = other.start; + end = other.end; + other.buffer = nullptr; + other.size = other.start = other.end = 0; } + return *this; } -void BufferStream::DoClose() { - CRU_STREAM_BEGIN_CLOSE - SetClosed(true); - condition_variable_.notify_all(); - buffer_list_.clear(); +BufferStream::Block::~Block() { delete[] buffer; } + +Index BufferStream::Block::Read(std::byte* des, Index si) { + si = std::min(si, end - start); + std::memcpy(des, buffer + start, si); + start += si; + return si; +} + +Index BufferStream::Block::Write(const std::byte* src, Index si) { + si = std::min(si, size - end); + std::memcpy(buffer + end, src, si); + end += si; + return si; } + +bool BufferStream::Block::IsFull() const { return end == size; } + +bool BufferStream::Block::IsEmpty() const { return end == start; } + } // namespace cru::io diff --git a/src/base/io/CFileStream.cpp b/src/base/io/CFileStream.cpp index db477077..9632cba9 100644 --- a/src/base/io/CFileStream.cpp +++ b/src/base/io/CFileStream.cpp @@ -27,7 +27,8 @@ CFileStream::CFileStream(const char* path, const char* mode) file_(std::fopen(path, mode)), auto_close_(true) { if (file_ == nullptr) { - throw ErrnoException("Cannot open file."); + throw StreamIOException(this, "fopen failed.", + std::make_shared()); } } @@ -40,7 +41,7 @@ CFileStream::CFileStream(std::FILE* file, bool readable, bool writable, } CFileStream::~CFileStream() { - if (auto_close_ && file_ != nullptr) { + if (file_ && auto_close_) { std::fclose(file_); } } @@ -60,7 +61,8 @@ static int ConvertOriginFlag(Stream::SeekOrigin origin) { Index CFileStream::DoSeek(Index offset, SeekOrigin origin) { if (std::fseek(file_, offset, ConvertOriginFlag(origin))) { - throw ErrnoException("Seek failed."); + throw StreamIOException(this, "fseek failed.", + std::make_shared()); } return DoTell(); } @@ -68,7 +70,8 @@ Index CFileStream::DoSeek(Index offset, SeekOrigin origin) { Index CFileStream::DoTell() { long position = std::ftell(file_); if (position == -1) { - throw ErrnoException("Tell failed."); + throw StreamIOException(this, "ftell failed.", + std::make_shared()); } return position; } @@ -77,20 +80,28 @@ void CFileStream::DoRewind() { std::rewind(file_); } Index CFileStream::DoRead(std::byte* buffer, Index offset, Index size) { auto count = std::fread(buffer + offset, 1, size, file_); + if (std::ferror(file_)) { + throw StreamIOException(this, "Error occurred when reading C FILE."); + } + if (count == 0 && std::feof(file_)) { + return kEOF; + } return count; } Index CFileStream::DoWrite(const std::byte* buffer, Index offset, Index size) { auto count = std::fwrite(buffer + offset, 1, size, file_); + if (std::ferror(file_)) { + throw StreamIOException(this, "Error occurred when writing C FILE."); + } return count; } void CFileStream::DoFlush() { std::fflush(file_); } void CFileStream::DoClose() { - CRU_STREAM_BEGIN_CLOSE if (auto_close_ && !std::fclose(file_)) { - throw Exception("Failed to close FILE."); + throw StreamIOException(this, "fclose failed."); } file_ = nullptr; } diff --git a/src/base/io/MemoryStream.cpp b/src/base/io/MemoryStream.cpp index 4d289197..555526cd 100644 --- a/src/base/io/MemoryStream.cpp +++ b/src/base/io/MemoryStream.cpp @@ -20,9 +20,11 @@ MemoryStream::MemoryStream( } } -MemoryStream::~MemoryStream() {} - -void MemoryStream::Close() { DoClose(); } +MemoryStream::~MemoryStream() { + if (buffer_ && release_func_) { + release_func_(buffer_, size_); + } +} Index MemoryStream::DoSeek(Index offset, SeekOrigin origin) { switch (origin) { @@ -40,6 +42,10 @@ Index MemoryStream::DoSeek(Index offset, SeekOrigin origin) { } Index MemoryStream::DoRead(std::byte* buffer, Index offset, Index size) { + if (position_ == size_) { + return kEOF; + } + if (position_ + size > size_) { size = size_ - position_; } @@ -64,10 +70,11 @@ Index MemoryStream::DoWrite(const std::byte* buffer, Index offset, Index size) { } void MemoryStream::DoClose() { - CRU_STREAM_BEGIN_CLOSE - release_func_(buffer_, size_); + if (release_func_) { + release_func_(buffer_, size_); + release_func_ = {}; + } buffer_ = nullptr; - release_func_ = {}; } } // namespace cru::io diff --git a/src/base/io/ProxyStream.cpp b/src/base/io/ProxyStream.cpp deleted file mode 100644 index de66169e..00000000 --- a/src/base/io/ProxyStream.cpp +++ /dev/null @@ -1,37 +0,0 @@ -#include "cru/base/io/ProxyStream.h" -#include "cru/base/io/Stream.h" - -namespace cru::io { -ProxyStream::ProxyStream(ProxyStreamHandlers handlers) - : Stream(static_cast(handlers.seek), static_cast(handlers.read), - static_cast(handlers.write)), - handlers_(std::move(handlers)) {} - -ProxyStream::~ProxyStream() { DoClose(); } - -Index ProxyStream::DoSeek(Index offset, SeekOrigin origin) { - return handlers_.seek(offset, origin); -} - -Index ProxyStream::DoRead(std::byte* buffer, Index offset, Index size) { - return handlers_.read(buffer, offset, size); -} - -Index ProxyStream::DoWrite(const std::byte* buffer, Index offset, Index size) { - return handlers_.write(buffer, offset, size); -} - -void ProxyStream::DoFlush() { - if (handlers_.flush) { - handlers_.flush(); - } -} - -void ProxyStream::DoClose() { - CRU_STREAM_BEGIN_CLOSE - if (handlers_.close) { - handlers_.close(); - } - handlers_ = {}; -} -} // namespace cru::io diff --git a/src/base/io/Stream.cpp b/src/base/io/Stream.cpp index c7286241..b0a9ab0c 100644 --- a/src/base/io/Stream.cpp +++ b/src/base/io/Stream.cpp @@ -1,33 +1,34 @@ #include "cru/base/io/Stream.h" -#include +#include #include -#include +#include #include namespace cru::io { +StreamException::StreamException(Stream* stream, std::string message, + std::shared_ptr inner) + : Exception(std::move(message), std::move(inner)), stream_(stream) {} + StreamOperationNotSupportedException::StreamOperationNotSupportedException( - std::string operation) - : Exception(std::format("Stream operation {} not supported.", operation)), + Stream* stream, std::string operation) + : StreamException( + stream, std::format("Stream operation {} not supported.", operation)), operation_(std::move(operation)) {} -void StreamOperationNotSupportedException::CheckSeek(bool seekable) { - if (!seekable) throw StreamOperationNotSupportedException("seek"); -} - -void StreamOperationNotSupportedException::CheckRead(bool readable) { - if (!readable) throw StreamOperationNotSupportedException("read"); +void StreamOperationNotSupportedException::CheckSeek(Stream* stream, + bool seekable) { + if (!seekable) throw StreamOperationNotSupportedException(stream, "seek"); } -void StreamOperationNotSupportedException::CheckWrite(bool writable) { - if (!writable) throw StreamOperationNotSupportedException("write"); +void StreamOperationNotSupportedException::CheckRead(Stream* stream, + bool readable) { + if (!readable) throw StreamOperationNotSupportedException(stream, "read"); } -StreamClosedException::StreamClosedException() - : Exception("Stream is already closed.") {} - -void StreamClosedException::Check(bool closed) { - if (closed) throw StreamClosedException(); +void StreamOperationNotSupportedException::CheckWrite(Stream* stream, + bool writable) { + if (!writable) throw StreamOperationNotSupportedException(stream, "write"); } Stream::Stream(SupportedOperations supported_operations) @@ -44,7 +45,7 @@ bool Stream::CanSeek() { Index Stream::Seek(Index offset, SeekOrigin origin) { CheckClosed(); - StreamOperationNotSupportedException::CheckSeek(DoCanSeek()); + StreamOperationNotSupportedException::CheckSeek(this, DoCanSeek()); return DoSeek(offset, origin); } @@ -70,7 +71,7 @@ bool Stream::CanRead() { Index Stream::Read(std::byte* buffer, Index offset, Index size) { CheckClosed(); - StreamOperationNotSupportedException::CheckRead(DoCanRead()); + StreamOperationNotSupportedException::CheckRead(this, DoCanRead()); return DoRead(buffer, offset, size); } @@ -93,7 +94,7 @@ bool Stream::CanWrite() { Index Stream::Write(const std::byte* buffer, Index offset, Index size) { CheckClosed(); - StreamOperationNotSupportedException::CheckWrite(DoCanWrite()); + StreamOperationNotSupportedException::CheckWrite(this, DoCanWrite()); return DoWrite(buffer, offset, size); } @@ -114,6 +115,46 @@ void Stream::Flush() { DoFlush(); } +bool Stream::IsClosed() { return closed_.load(std::memory_order_acquire); } + +bool Stream::Close() { + bool expected = false; + if (closed_.compare_exchange_strong(expected, true, + std::memory_order_acq_rel)) { + DoClose(); + } + return expected; +} + +std::vector Stream::ReadToEnd(Index grow_size) { + std::vector buffer; + Index pos = 0; + while (true) { + if (pos == buffer.size()) { + buffer.resize(buffer.size() + grow_size); + } + + auto read = Read(buffer.data(), pos, buffer.size() - pos); + if (read == kEOF) { + break; + } + pos += read; + } + buffer.resize(pos); + return buffer; +} + +std::string Stream::ReadToEndAsUtf8String() { + auto buffer = ReadToEnd(); + return std::views::transform( + buffer, [](std::byte c) { return static_cast(c); }) | + std::ranges::to(); +} + +void Stream::SetSupportedOperations(SupportedOperations supported_operations) { + supported_operations_ = std::move(supported_operations); +} + bool Stream::DoCanSeek() { if (supported_operations_.can_seek) { return *supported_operations_.can_seek; @@ -149,17 +190,17 @@ Index Stream::DoSeek(Index offset, SeekOrigin origin) { } Index Stream::DoTell() { - StreamOperationNotSupportedException::CheckSeek(DoCanSeek()); + StreamOperationNotSupportedException::CheckSeek(this, DoCanSeek()); return DoSeek(0, SeekOrigin::Current); } void Stream::DoRewind() { - StreamOperationNotSupportedException::CheckSeek(DoCanSeek()); + StreamOperationNotSupportedException::CheckSeek(this, DoCanSeek()); DoSeek(0, SeekOrigin::Begin); } Index Stream::DoGetSize() { - StreamOperationNotSupportedException::CheckSeek(DoCanSeek()); + StreamOperationNotSupportedException::CheckSeek(this, DoCanSeek()); Index current_position = DoTell(); Seek(0, SeekOrigin::End); Index size = DoTell(); @@ -177,27 +218,12 @@ Index Stream::DoWrite(const std::byte* buffer, Index offset, Index size) { void Stream::DoFlush() {} -Buffer Stream::ReadToEnd(Index grow_size) { - Buffer buffer(grow_size); - while (true) { - auto read = Read(buffer.GetUsedEndPtr(), buffer.GetBackFree()); - buffer.PushBackCount(read); - if (read == 0) { - break; - } - if (buffer.IsUsedReachEnd()) { - buffer.ResizeBuffer(buffer.GetBufferSize() + grow_size, true); - } +void Stream::DoClose() {} + +void Stream::CheckClosed() { + if (IsClosed()) { + throw StreamClosedException(this, "Stream is closed."); } - return buffer; } -std::string Stream::ReadToEndAsUtf8String() { - auto buffer = ReadToEnd(); - std::string result; - std::transform(buffer.GetUsedBeginPtr(), buffer.GetUsedEndPtr(), - std::back_inserter(result), - [](std::byte c) { return static_cast(c); }); - return result; -} } // namespace cru::io -- cgit v1.2.3