diff options
author | crupest <crupest@outlook.com> | 2024-06-24 00:06:25 +0800 |
---|---|---|
committer | crupest <crupest@outlook.com> | 2024-10-04 19:55:33 +0800 |
commit | f51eb955e188858272230a990565931e7403f23b (patch) | |
tree | 04de484bfcd056b6eea56c13c42cce83315c448f /src/common | |
parent | 1b30150ab79ff1338f209a8ddb54b3dc60cfb599 (diff) | |
download | cru-f51eb955e188858272230a990565931e7403f23b.tar.gz cru-f51eb955e188858272230a990565931e7403f23b.tar.bz2 cru-f51eb955e188858272230a990565931e7403f23b.zip |
HALF WORK: Stream refactor.
TODO: Complete refactor of BufferStream and AutoReadStream.
NEED TEST: BufferStream, AutoReadStream, SubProcess.
Diffstat (limited to 'src/common')
-rw-r--r-- | src/common/io/AutoReadStream.cpp | 32 | ||||
-rw-r--r-- | src/common/io/BufferStream.cpp | 21 | ||||
-rw-r--r-- | src/common/io/CFileStream.cpp | 64 | ||||
-rw-r--r-- | src/common/io/MemoryStream.cpp | 47 | ||||
-rw-r--r-- | src/common/io/ProxyStream.cpp | 49 | ||||
-rw-r--r-- | src/common/io/Stream.cpp | 137 | ||||
-rw-r--r-- | src/common/platform/unix/UnixFileStream.cpp | 50 |
7 files changed, 208 insertions, 192 deletions
diff --git a/src/common/io/AutoReadStream.cpp b/src/common/io/AutoReadStream.cpp index e7c851cf..18bc18da 100644 --- a/src/common/io/AutoReadStream.cpp +++ b/src/common/io/AutoReadStream.cpp @@ -6,14 +6,13 @@ namespace cru::io { AutoReadStream::AutoReadStream(Stream* stream, bool auto_delete, - const AutoReadStreamOptions& options) { + const AutoReadStreamOptions& options) + : Stream(false, true, stream->CanSeek()) { auto buffer_stream_options = options.GetBufferStreamOptions(); stream_ = stream; size_per_read_ = buffer_stream_options.GetBlockSizeOrDefault(); buffer_stream_ = std::make_unique<BufferStream>(buffer_stream_options); - auto background_thread = - std::thread(&AutoReadStream::BackgroundThreadRun, this); - background_thread.detach(); + background_thread_ = std::thread(&AutoReadStream::BackgroundThreadRun, this); } AutoReadStream::~AutoReadStream() { @@ -22,40 +21,25 @@ AutoReadStream::~AutoReadStream() { } } -bool AutoReadStream::CanSeek() { return false; } - -Index AutoReadStream::Seek(Index offset, SeekOrigin origin) { - throw StreamOperationNotSupportedException( - u"AutoReadStream does not support seek."); -} - -bool AutoReadStream::CanRead() { return true; } - -Index AutoReadStream::Read(std::byte* buffer, Index offset, Index size) { +Index AutoReadStream::DoRead(std::byte* buffer, Index offset, Index size) { std::unique_lock lock(buffer_stream_mutex_); return buffer_stream_->Read(buffer, offset, size); } -bool AutoReadStream::CanWrite() { return stream_->CanWrite(); } - -Index AutoReadStream::Write(const std::byte* buffer, Index offset, Index size) { +Index AutoReadStream::DoWrite(const std::byte* buffer, Index offset, + Index size) { return stream_->Write(buffer, offset, size); } -void AutoReadStream::Flush() { stream_->Flush(); } +void AutoReadStream::DoFlush() { stream_->Flush(); } -void AutoReadStream::Close() { stream_->Close(); } +void AutoReadStream::DoClose() {} void AutoReadStream::BackgroundThreadRun() { - auto resolver = CreateResolver(); std::vector<std::byte> buffer(size_per_read_); while (true) { try { auto read = stream_->Read(buffer.data(), buffer.size()); - auto self = resolver.Resolve(); - if (!self) { - break; - } if (read == 0) { buffer_stream_->SetEof(); break; diff --git a/src/common/io/BufferStream.cpp b/src/common/io/BufferStream.cpp index 242396cd..73e5719b 100644 --- a/src/common/io/BufferStream.cpp +++ b/src/common/io/BufferStream.cpp @@ -2,25 +2,17 @@ #include "cru/common/io/Stream.h" namespace cru::io { -BufferStream::BufferStream(const BufferStreamOptions& options) { +BufferStream::BufferStream(const BufferStreamOptions& options) + : Stream(false, true, true) { block_size_ = options.GetBlockSizeOrDefault(); max_block_count_ = options.GetMaxBlockCount(); eof_ = false; } -BufferStream::~BufferStream() {} +BufferStream::~BufferStream() { DoClose(); } -bool BufferStream::CanSeek() { return false; } - -Index BufferStream::Seek(Index offset, SeekOrigin origin) { - throw StreamOperationNotSupportedException( - u"BufferStream does not support seeking."); -} - -bool BufferStream::CanRead() { return true; } - -Index BufferStream::Read(std::byte* buffer, Index offset, Index size) { +Index BufferStream::DoRead(std::byte* buffer, Index offset, Index size) { std::unique_lock lock(mutex_); condition_variable_.wait(lock, @@ -56,9 +48,7 @@ Index BufferStream::Read(std::byte* buffer, Index offset, Index size) { return read; } -bool BufferStream::CanWrite() { return true; } - -Index BufferStream::Write(const std::byte* buffer, Index offset, Index size) { +Index BufferStream::DoWrite(const std::byte* buffer, Index offset, Index size) { std::unique_lock lock(mutex_); if (eof_) { @@ -115,4 +105,5 @@ void BufferStream::SetEof() { } } +void BufferStream::DoClose() { CRU_STREAM_BEGIN_CLOSE } } // namespace cru::io diff --git a/src/common/io/CFileStream.cpp b/src/common/io/CFileStream.cpp index 29d4819f..01456437 100644 --- a/src/common/io/CFileStream.cpp +++ b/src/common/io/CFileStream.cpp @@ -24,9 +24,8 @@ static bool ModeCanWrite(const char* mode) { } CFileStream::CFileStream(const char* path, const char* mode) - : file_(std::fopen(path, mode)), - readable_(ModeCanRead(mode)), - writable_(ModeCanWrite(mode)), + : Stream(true, ModeCanRead(mode), ModeCanWrite(mode)), + file_(std::fopen(path, mode)), auto_close_(true) { if (file_ == nullptr) { throw ErrnoException(u"Cannot open file."); @@ -35,10 +34,7 @@ CFileStream::CFileStream(const char* path, const char* mode) CFileStream::CFileStream(std::FILE* file, bool readable, bool writable, bool auto_close) - : file_(file), - readable_(readable), - writable_(writable), - auto_close_(auto_close) { + : Stream(true, readable, writable), file_(file), auto_close_(auto_close) { if (file_ == nullptr) { throw Exception(u"File is NULL."); } @@ -50,11 +46,6 @@ CFileStream::~CFileStream() { } } -bool CFileStream::CanSeek() { - CheckClosed(); - return true; -} - static int ConvertOriginFlag(Stream::SeekOrigin origin) { switch (origin) { case Stream::SeekOrigin::Begin: @@ -68,16 +59,14 @@ static int ConvertOriginFlag(Stream::SeekOrigin origin) { } } -Index CFileStream::Seek(Index offset, SeekOrigin origin) { - CheckClosed(); +Index CFileStream::DoSeek(Index offset, SeekOrigin origin) { if (std::fseek(file_, offset, ConvertOriginFlag(origin))) { throw ErrnoException(u"Seek failed."); } - return Tell(); + return DoTell(); } -Index CFileStream::Tell() { - CheckClosed(); +Index CFileStream::DoTell() { long position = std::ftell(file_); if (position == -1) { throw ErrnoException(u"Tell failed."); @@ -85,48 +74,23 @@ Index CFileStream::Tell() { return position; } -void CFileStream::Rewind() { - CheckClosed(); - std::rewind(file_); -} - -bool CFileStream::CanRead() { - CheckClosed(); - return readable_; -} +void CFileStream::DoRewind() { std::rewind(file_); } -Index CFileStream::Read(std::byte* buffer, Index offset, Index size) { - CheckClosed(); - StreamOperationNotSupportedException::CheckRead(readable_); +Index CFileStream::DoRead(std::byte* buffer, Index offset, Index size) { auto count = std::fread(buffer + offset, 1, size, file_); return count; } -bool CFileStream::CanWrite() { - CheckClosed(); - return writable_; -} - -Index CFileStream::Write(const std::byte* buffer, Index offset, Index size) { - CheckClosed(); - StreamOperationNotSupportedException::CheckWrite(writable_); +Index CFileStream::DoWrite(const std::byte* buffer, Index offset, Index size) { auto count = std::fwrite(buffer + offset, 1, size, file_); return count; } -void CFileStream::Flush() { - CheckClosed(); - std::fflush(file_); -} - -void CFileStream::Close() { - if (file_ != nullptr) { - std::fclose(file_); - file_ = nullptr; - } -} +void CFileStream::DoFlush() { std::fflush(file_); } -void CFileStream::CheckClosed() { - StreamAlreadyClosedException::Check(file_ == nullptr); +void CFileStream::DoClose() { + CRU_STREAM_BEGIN_CLOSE + std::fclose(file_); + file_ = nullptr; } } // namespace cru::io diff --git a/src/common/io/MemoryStream.cpp b/src/common/io/MemoryStream.cpp index 34116193..4b33d780 100644 --- a/src/common/io/MemoryStream.cpp +++ b/src/common/io/MemoryStream.cpp @@ -1,17 +1,31 @@ #include "cru/common/io/MemoryStream.h" #include <cstring> +#include "cru/common/Exception.h" +#include "cru/common/io/Stream.h" namespace cru::io { -MemoryStream::~MemoryStream() { - if (release_func_) { - release_func_(buffer_, size_); +MemoryStream::MemoryStream( + std::byte* buffer, Index size, bool read_only, + std::function<void(std::byte* buffer, Index size)> release_func) + : Stream(true, true, !read_only), + buffer_(buffer), + size_(size), + position_(0), + release_func_(std::move(release_func)) { + if (!buffer) { + throw Exception(u"Buffer is nullptr"); + } + if (size <= 0) { + throw Exception(u"Size is 0 or negative."); } } -bool MemoryStream::CanSeek() { return true; } +MemoryStream::~MemoryStream() {} + +void MemoryStream::Close() { DoClose(); } -Index MemoryStream::Seek(Index offset, SeekOrigin origin) { +Index MemoryStream::DoSeek(Index offset, SeekOrigin origin) { switch (origin) { case SeekOrigin::Current: position_ += offset; @@ -26,34 +40,35 @@ Index MemoryStream::Seek(Index offset, SeekOrigin origin) { return position_; } -bool MemoryStream::CanRead() { return true; } - -Index MemoryStream::Read(std::byte *buffer, Index offset, Index size) { +Index MemoryStream::DoRead(std::byte* buffer, Index offset, Index size) { if (position_ + size > size_) { size = size_ - position_; } if (size <= 0) { return 0; } - std::memcpy(buffer + offset, buffer_ + position_, size); + std::memmove(buffer + offset, buffer_ + position_, size); position_ += size; return size; } -bool MemoryStream::CanWrite() { return !read_only_; } - -Index MemoryStream::Write(const std::byte *buffer, Index offset, Index size) { - if (read_only_) { - return 0; - } +Index MemoryStream::DoWrite(const std::byte* buffer, Index offset, Index size) { if (position_ + size > size_) { size = size_ - position_; } if (size <= 0) { return 0; } - std::memcpy(buffer_ + position_, buffer + offset, size); + std::memmove(buffer_ + position_, buffer + offset, size); position_ += size; return size; } + +void MemoryStream::DoClose() { + CRU_STREAM_BEGIN_CLOSE + release_func_(buffer_, size_); + buffer_ = nullptr; + release_func_ = {}; +} + } // namespace cru::io diff --git a/src/common/io/ProxyStream.cpp b/src/common/io/ProxyStream.cpp index df8e7dcc..c2e64056 100644 --- a/src/common/io/ProxyStream.cpp +++ b/src/common/io/ProxyStream.cpp @@ -3,62 +3,35 @@ namespace cru::io { ProxyStream::ProxyStream(ProxyStreamHandlers handlers) - : closed_(false), handlers_(std::move(handlers)) {} + : Stream(static_cast<bool>(handlers.seek), static_cast<bool>(handlers.read), + static_cast<bool>(handlers.write)), + handlers_(std::move(handlers)) {} ProxyStream::~ProxyStream() { DoClose(); } -bool ProxyStream::CanSeek() { - CheckClosed(); - return static_cast<bool>(handlers_.seek); -} - -Index ProxyStream::Seek(Index offset, SeekOrigin origin) { - CheckClosed(); - StreamOperationNotSupportedException::CheckSeek(CanSeek()); +Index ProxyStream::DoSeek(Index offset, SeekOrigin origin) { return handlers_.seek(offset, origin); } -bool ProxyStream::CanRead() { - CheckClosed(); - return static_cast<bool>(handlers_.read); -} - -Index ProxyStream::Read(std::byte* buffer, Index offset, Index size) { - CheckClosed(); - StreamOperationNotSupportedException::CheckRead(CanRead()); +Index ProxyStream::DoRead(std::byte* buffer, Index offset, Index size) { return handlers_.read(buffer, offset, size); } -bool ProxyStream::CanWrite() { - CheckClosed(); - return static_cast<bool>(handlers_.write); -} - -Index ProxyStream::Write(const std::byte* buffer, Index offset, Index size) { - CheckClosed(); - StreamOperationNotSupportedException::CheckWrite(CanWrite()); +Index ProxyStream::DoWrite(const std::byte* buffer, Index offset, Index size) { return handlers_.write(buffer, offset, size); } -void ProxyStream::Flush() { - CheckClosed(); +void ProxyStream::DoFlush() { if (handlers_.flush) { handlers_.flush(); } } -void ProxyStream::Close() { DoClose(); } - -void ProxyStream::CheckClosed() { - StreamAlreadyClosedException::Check(closed_); -} - void ProxyStream::DoClose() { - if (!closed_) { - if (handlers_.close) { - handlers_.close(); - } - closed_ = true; + CRU_STREAM_BEGIN_CLOSE + if (handlers_.close) { + handlers_.close(); } + handlers_ = {}; } } // namespace cru::io diff --git a/src/common/io/Stream.cpp b/src/common/io/Stream.cpp index 97cfcf99..6b0a513c 100644 --- a/src/common/io/Stream.cpp +++ b/src/common/io/Stream.cpp @@ -30,22 +30,73 @@ void StreamAlreadyClosedException::Check(bool closed) { if (closed) throw StreamAlreadyClosedException(); } -Index Stream::Tell() { return Seek(0, SeekOrigin::Current); } +Stream::Stream(SupportedOperations supported_operations) + : supported_operations_(std::move(supported_operations)), closed_(false) {} -void Stream::Rewind() { Seek(0, SeekOrigin::Begin); } +Stream::Stream(std::optional<bool> can_seek, std::optional<bool> can_read, + std::optional<bool> can_write) + : Stream(SupportedOperations{can_seek, can_read, can_write}) {} + +bool Stream::CanSeek() { + CheckClosed(); + return DoCanSeek(); +} + +Index Stream::Seek(Index offset, SeekOrigin origin) { + CheckClosed(); + StreamOperationNotSupportedException::CheckSeek(DoCanSeek()); + return DoSeek(offset, origin); +} + +Index Stream::Tell() { + CheckClosed(); + return DoTell(); +} + +void Stream::Rewind() { + CheckClosed(); + return DoRewind(); +} Index Stream::GetSize() { - Index current_position = Tell(); - Seek(0, SeekOrigin::End); - Index size = Tell(); - Seek(current_position, SeekOrigin::Begin); - return size; + CheckClosed(); + return DoGetSize(); +} + +bool Stream::CanRead() { + CheckClosed(); + return DoCanRead(); +} + +Index Stream::Read(std::byte* buffer, Index offset, Index size) { + CheckClosed(); + StreamOperationNotSupportedException::CheckRead(DoCanRead()); + return DoRead(buffer, offset, size); } Index Stream::Read(std::byte* buffer, Index size) { return Read(buffer, 0, size); } +Index Stream::Read(char* buffer, Index offset, Index size) { + return Read(reinterpret_cast<std::byte*>(buffer), offset, size); +} + +Index Stream::Read(char* buffer, Index size) { + return Read(reinterpret_cast<std::byte*>(buffer), 0, size); +} + +bool Stream::CanWrite() { + CheckClosed(); + return DoCanWrite(); +} + +Index Stream::Write(const std::byte* buffer, Index offset, Index size) { + CheckClosed(); + StreamOperationNotSupportedException::CheckWrite(DoCanWrite()); + return DoWrite(buffer, offset, size); +} + Index Stream::Write(const std::byte* buffer, Index size) { return Write(buffer, 0, size); } @@ -58,6 +109,74 @@ Index Stream::Write(const char* buffer, Index size) { return Write(reinterpret_cast<const std::byte*>(buffer), size); } +void Stream::Flush() { + CheckClosed(); + DoFlush(); +} + +bool Stream::DoCanSeek() { + if (supported_operations_->can_seek) { + return *supported_operations_->can_seek; + } else { + throw Exception( + u"Can seek is neither set in supported_operations nor implemeted in " + u"virtual function."); + } +} + +bool Stream::DoCanRead() { + if (supported_operations_->can_read) { + return *supported_operations_->can_read; + } else { + throw Exception( + u"Can read is neither set in supported_operations nor implemeted in " + u"virtual function."); + } +} + +bool Stream::DoCanWrite() { + if (supported_operations_->can_write) { + return *supported_operations_->can_write; + } else { + throw Exception( + u"Can write is neither set in supported_operations nor implemeted in " + u"virtual function."); + } +} + +Index Stream::DoSeek(Index offset, SeekOrigin origin) { + throw Exception(u"Stream is seekable but DoSeek is not implemented."); +} + +Index Stream::DoTell() { + StreamOperationNotSupportedException::CheckSeek(DoCanSeek()); + return DoSeek(0, SeekOrigin::Current); +} + +void Stream::DoRewind() { + StreamOperationNotSupportedException::CheckSeek(DoCanSeek()); + DoSeek(0, SeekOrigin::Begin); +} + +Index Stream::DoGetSize() { + StreamOperationNotSupportedException::CheckSeek(DoCanSeek()); + Index current_position = DoTell(); + Seek(0, SeekOrigin::End); + Index size = DoTell(); + Seek(current_position, SeekOrigin::Begin); + return size; +} + +Index Stream::DoRead(std::byte* buffer, Index offset, Index size) { + throw Exception(u"Stream is readable but DoSeek is not implemented."); +} + +Index Stream::DoWrite(const std::byte* buffer, Index offset, Index size) { + throw Exception(u"Stream is writable but DoSeek is not implemented."); +} + +void Stream::DoFlush() {} + Buffer Stream::ReadToEnd(Index grow_size) { Buffer buffer(grow_size); while (true) { @@ -77,8 +196,4 @@ String Stream::ReadToEndAsUtf8String() { auto buffer = ReadToEnd(); return String::FromUtf8(buffer); } - -void Stream::Flush() {} - -void Stream::Close() {} } // namespace cru::io diff --git a/src/common/platform/unix/UnixFileStream.cpp b/src/common/platform/unix/UnixFileStream.cpp index 37f5110c..804e24f0 100644 --- a/src/common/platform/unix/UnixFileStream.cpp +++ b/src/common/platform/unix/UnixFileStream.cpp @@ -48,18 +48,16 @@ UnixFileStream::UnixFileStream(const char *path, int oflag, mode_t mode) { String::FromUtf8(path), oflag, mode)); } - can_seek_ = OflagCanSeek(oflag); - can_read_ = OflagCanRead(oflag); - can_write_ = OflagCanWrite(oflag); + SetSupportedOperations( + {OflagCanSeek(oflag), OflagCanRead(oflag), OflagCanWrite(oflag)}); + auto_close_ = true; } UnixFileStream::UnixFileStream(int fd, bool can_seek, bool can_read, - bool can_write, bool auto_close) { + bool can_write, bool auto_close) + : Stream(can_seek, can_read, can_write) { file_descriptor_ = fd; - can_seek_ = can_seek; - can_read_ = can_read; - can_write_ = can_write; auto_close_ = auto_close; } @@ -73,14 +71,7 @@ UnixFileStream::~UnixFileStream() { } } -bool UnixFileStream::CanSeek() { - CheckClosed(); - return can_seek_; -} - -Index UnixFileStream::Seek(Index offset, SeekOrigin origin) { - CheckClosed(); - StreamOperationNotSupportedException::CheckSeek(can_seek_); +Index UnixFileStream::DoSeek(Index offset, SeekOrigin origin) { off_t result = ::lseek(file_descriptor_, offset, MapSeekOrigin(origin)); if (result == -1) { throw ErrnoException(u"Failed to seek file."); @@ -88,14 +79,7 @@ Index UnixFileStream::Seek(Index offset, SeekOrigin origin) { return result; } -bool UnixFileStream::CanRead() { - CheckClosed(); - return can_read_; -} - -Index UnixFileStream::Read(std::byte *buffer, Index offset, Index size) { - CheckClosed(); - StreamOperationNotSupportedException::CheckRead(can_read_); +Index UnixFileStream::DoRead(std::byte *buffer, Index offset, Index size) { auto result = ::read(file_descriptor_, buffer + offset, size); if (result == -1) { throw ErrnoException(u"Failed to read file."); @@ -103,14 +87,8 @@ Index UnixFileStream::Read(std::byte *buffer, Index offset, Index size) { return result; } -bool UnixFileStream::CanWrite() { - CheckClosed(); - return can_write_; -} - -Index UnixFileStream::Write(const std::byte *buffer, Index offset, Index size) { - CheckClosed(); - StreamOperationNotSupportedException::CheckWrite(can_write_); +Index UnixFileStream::DoWrite(const std::byte *buffer, Index offset, + Index size) { auto result = ::write(file_descriptor_, buffer + offset, size); if (result == -1) { throw ErrnoException(u"Failed to write file."); @@ -118,15 +96,11 @@ Index UnixFileStream::Write(const std::byte *buffer, Index offset, Index size) { return result; } -void UnixFileStream::Close() { - if (file_descriptor_ < 0) return; - if (::close(file_descriptor_) == -1) { +void UnixFileStream::DoClose() { + CRU_STREAM_BEGIN_CLOSE + if (auto_close_ && ::close(file_descriptor_) == -1) { throw ErrnoException(u"Failed to close file."); } file_descriptor_ = -1; } - -void UnixFileStream::CheckClosed() { - StreamAlreadyClosedException::Check(file_descriptor_ < 0); -} } // namespace cru::platform::unix |