From b2a331aa527bdd5f4d91727904cc5c66fe897d62 Mon Sep 17 00:00:00 2001 From: crupest Date: Mon, 24 Jun 2024 00:06:25 +0800 Subject: Done Stream refactor. NEED TEST: BufferStream, AutoReadStream, SubProcess. --- src/base/io/AutoReadStream.cpp | 30 ++++++++++++++++--------- src/base/io/BufferStream.cpp | 22 ++++++++++++++---- src/base/io/CFileStream.cpp | 4 +++- src/base/io/Stream.cpp | 6 ++--- src/base/platform/unix/PosixSpawnSubProcess.cpp | 4 ++-- src/base/platform/unix/UnixFileStream.cpp | 10 +-------- 6 files changed, 47 insertions(+), 29 deletions(-) (limited to 'src') diff --git a/src/base/io/AutoReadStream.cpp b/src/base/io/AutoReadStream.cpp index c24f61d1..0c035648 100644 --- a/src/base/io/AutoReadStream.cpp +++ b/src/base/io/AutoReadStream.cpp @@ -5,9 +5,12 @@ namespace cru::io { -AutoReadStream::AutoReadStream(Stream* stream, bool auto_delete, +AutoReadStream::AutoReadStream(Stream* stream, bool auto_close, + bool auto_delete, const AutoReadStreamOptions& options) - : Stream(false, true, stream->CanSeek()) { + : Stream(false, true, stream->CanSeek()), + auto_close_(auto_close), + auto_delete_(auto_delete) { auto buffer_stream_options = options.GetBufferStreamOptions(); stream_ = stream; size_per_read_ = buffer_stream_options.GetBlockSizeOrDefault(); @@ -16,13 +19,11 @@ AutoReadStream::AutoReadStream(Stream* stream, bool auto_delete, } AutoReadStream::~AutoReadStream() { - if (auto_delete_) { - delete stream_; - } + DoClose(); + background_thread_.join(); } Index AutoReadStream::DoRead(std::byte* buffer, Index offset, Index size) { - std::unique_lock lock(buffer_stream_mutex_); return buffer_stream_->Read(buffer, offset, size); } @@ -33,7 +34,17 @@ Index AutoReadStream::DoWrite(const std::byte* buffer, Index offset, void AutoReadStream::DoFlush() { stream_->Flush(); } -void AutoReadStream::DoClose() {} +void AutoReadStream::DoClose() { + CRU_STREAM_BEGIN_CLOSE + if (auto_close_) { + stream_->Close(); + } + if (auto_delete_) { + delete stream_; + stream_ = nullptr; + } + buffer_stream_->Close(); +} void AutoReadStream::BackgroundThreadRun() { std::vector buffer(size_per_read_); @@ -46,11 +57,10 @@ void AutoReadStream::BackgroundThreadRun() { } else { buffer_stream_->Write(buffer.data(), read); } - } catch (const StreamAlreadyClosedException& exception) { - buffer_stream_->SetEof(); + } catch (const StreamClosedException& exception) { + buffer_stream_->Close(); break; } } } - } // namespace cru::io diff --git a/src/base/io/BufferStream.cpp b/src/base/io/BufferStream.cpp index e81731e8..57a8b694 100644 --- a/src/base/io/BufferStream.cpp +++ b/src/base/io/BufferStream.cpp @@ -15,8 +15,12 @@ BufferStream::~BufferStream() { DoClose(); } Index BufferStream::DoRead(std::byte* buffer, Index offset, Index size) { std::unique_lock lock(mutex_); - condition_variable_.wait(lock, - [this] { return !buffer_list_.empty() || eof_; }); + condition_variable_.wait( + lock, [this] { return GetClosed() || !buffer_list_.empty() || eof_; }); + + if (GetClosed()) { + StreamClosedException::Check(true); + } if (buffer_list_.empty() && eof_) { return 0; @@ -57,10 +61,15 @@ Index BufferStream::DoWrite(const std::byte* buffer, Index offset, Index size) { } condition_variable_.wait(lock, [this] { - return max_block_count_ <= 0 || buffer_list_.size() < max_block_count_ || + return GetClosed() || max_block_count_ <= 0 || + buffer_list_.size() < max_block_count_ || buffer_list_.back().GetBackFree() > 0; }); + if (GetClosed()) { + StreamClosedException::Check(true); + } + auto empty = buffer_list_.empty(); Index written = 0; @@ -105,5 +114,10 @@ void BufferStream::SetEof() { } } -void BufferStream::DoClose() { CRU_STREAM_BEGIN_CLOSE } +void BufferStream::DoClose() { + CRU_STREAM_BEGIN_CLOSE + SetClosed(true); + condition_variable_.notify_all(); + buffer_list_.clear(); +} } // namespace cru::io diff --git a/src/base/io/CFileStream.cpp b/src/base/io/CFileStream.cpp index d5acc707..45eb2eaf 100644 --- a/src/base/io/CFileStream.cpp +++ b/src/base/io/CFileStream.cpp @@ -90,7 +90,9 @@ void CFileStream::DoFlush() { std::fflush(file_); } void CFileStream::DoClose() { CRU_STREAM_BEGIN_CLOSE - std::fclose(file_); + if (auto_close_ && !std::fclose(file_)) { + throw Exception(u"Failed to close FILE."); + } file_ = nullptr; } } // namespace cru::io diff --git a/src/base/io/Stream.cpp b/src/base/io/Stream.cpp index d65bac46..1944ea7e 100644 --- a/src/base/io/Stream.cpp +++ b/src/base/io/Stream.cpp @@ -23,11 +23,11 @@ void StreamOperationNotSupportedException::CheckWrite(bool writable) { if (!writable) throw StreamOperationNotSupportedException(u"write"); } -StreamAlreadyClosedException::StreamAlreadyClosedException() +StreamClosedException::StreamClosedException() : Exception(u"Stream is already closed.") {} -void StreamAlreadyClosedException::Check(bool closed) { - if (closed) throw StreamAlreadyClosedException(); +void StreamClosedException::Check(bool closed) { + if (closed) throw StreamClosedException(); } Stream::Stream(SupportedOperations supported_operations) diff --git a/src/base/platform/unix/PosixSpawnSubProcess.cpp b/src/base/platform/unix/PosixSpawnSubProcess.cpp index 75d48cc2..5e09c494 100644 --- a/src/base/platform/unix/PosixSpawnSubProcess.cpp +++ b/src/base/platform/unix/PosixSpawnSubProcess.cpp @@ -28,9 +28,9 @@ PosixSpawnSubProcessImpl::PosixSpawnSubProcessImpl() stderr_pipe_.GetSelfFileDescriptor(), false, true, false, true); stdout_buffer_stream_ = - std::make_unique(stdout_stream_.get(), false); + std::make_unique(stdout_stream_.get(), true, false); stderr_buffer_stream_ = - std::make_unique(stderr_stream_.get(), false); + std::make_unique(stderr_stream_.get(), true, false); } PosixSpawnSubProcessImpl::~PosixSpawnSubProcessImpl() {} diff --git a/src/base/platform/unix/UnixFileStream.cpp b/src/base/platform/unix/UnixFileStream.cpp index c53bbbaa..6d8bab25 100644 --- a/src/base/platform/unix/UnixFileStream.cpp +++ b/src/base/platform/unix/UnixFileStream.cpp @@ -61,15 +61,7 @@ UnixFileStream::UnixFileStream(int fd, bool can_seek, bool can_read, auto_close_ = auto_close; } -UnixFileStream::~UnixFileStream() { - if (auto_close_ && file_descriptor_ >= 0) { - if (::close(file_descriptor_) == -1) { - // We are in destructor, so we can not throw. - CRU_LOG_WARN(u"Failed to close file descriptor {}, errno {}.", - file_descriptor_, errno); - } - } -} +UnixFileStream::~UnixFileStream() { DoClose(); } Index UnixFileStream::DoSeek(Index offset, SeekOrigin origin) { off_t result = ::lseek(file_descriptor_, offset, MapSeekOrigin(origin)); -- cgit v1.2.3