diff options
27 files changed, 428 insertions, 772 deletions
diff --git a/include/cru/base/Base.h b/include/cru/base/Base.h index e62054eb..9d10a447 100644 --- a/include/cru/base/Base.h +++ b/include/cru/base/Base.h @@ -145,6 +145,9 @@ class CRU_BASE_API PlatformException : public Exception { class CRU_BASE_API ErrnoException : public Exception { public: + /** + * @brief will retrieve errno automatically. + */ ErrnoException(); explicit ErrnoException(int error_code); /** diff --git a/include/cru/base/Buffer.h b/include/cru/base/Buffer.h deleted file mode 100644 index 17da4ada..00000000 --- a/include/cru/base/Buffer.h +++ /dev/null @@ -1,161 +0,0 @@ -#pragma once - -#include "Base.h" - -namespace cru { -class CRU_BASE_API Buffer final { - friend CRU_BASE_API void swap(Buffer& left, Buffer& right) noexcept; - - public: - Buffer(); - explicit Buffer(Index size); - - Buffer(const Buffer& other); - Buffer(Buffer&& other) noexcept; - - Buffer& operator=(const Buffer& other); - Buffer& operator=(Buffer&& other) noexcept; - - ~Buffer(); - - public: - Index GetBufferSize() const { return size_; } - Index GetUsedSize() const { return used_end_ - used_begin_; } - bool IsNull() const { return ptr_ == nullptr; } - bool IsUsedReachEnd() const { return used_end_ == size_; } - - Index GetFrontFree() const { return used_begin_; } - Index GetBackFree() const { return size_ - used_end_; } - - Index GetUsedBegin() const { return used_begin_; } - Index GetUsedEnd() const { return used_end_; } - - std::byte* GetPtr() { return GetPtrAt(0); } - const std::byte* GetPtr() const { return GetPtrAt(0); } - - std::byte* GetPtrAt(Index index) { return ptr_ + index; } - const std::byte* GetPtrAt(Index index) const { return ptr_ + index; } - - std::byte& GetRefAt(Index index) { return *GetPtrAt(index); } - const std::byte& GetRefAt(Index index) const { return *GetPtrAt(index); } - - std::byte* GetUsedBeginPtr() { return GetPtrAt(GetUsedBegin()); } - const std::byte* GetUsedBeginPtr() const { return GetPtrAt(GetUsedBegin()); } - std::byte* GetUsedEndPtr() { return GetPtrAt(GetUsedEnd()); } - const std::byte* GetUsedEndPtr() const { return GetPtrAt(GetUsedEnd()); } - - std::byte GetByteAt(Index index) const { return ptr_[index]; } - void SetByteAt(Index index, std::byte value) { ptr_[index] = value; } - - void AssignBytes(std::byte* src, Index src_size, bool use_memmove = false) { - return AssignBytes(0, src, 0, src_size, use_memmove); - } - void AssignBytes(Index dst_offset, std::byte* src, Index src_size, - bool use_memmove = false) { - return AssignBytes(dst_offset, src, 0, src_size, use_memmove); - } - void AssignBytes(Index dst_offset, std::byte* src, Index src_offset, - Index src_size, bool use_memmove = false); - - /** - * @brief Change the size of the buffer. - * - * Unless new size is the same as current size, the buffer is always released - * and a new one is allocated. If preserve_used is true, the used size and old - * data is copied to the new buffer. If new size is smaller than old used - * size, then exceeded data will be lost. - */ - void ResizeBuffer(Index new_size, bool preserve_used); - - /** - * @brief Append data to the front of used bytes and increase used size. - * @return The actual size of data saved. - * - * If there is no enough space left for new data, the rest space will be - * written and the size of it will be returned, leaving exceeded data not - * saved. - */ - Index PushFront(const std::byte* other, Index other_size, - bool use_memmove = false); - - bool PushBack(std::byte b); - - /** - * @brief Append data to the back of used bytes and increase used size. - * @return The actual size of data saved. - * - * If there is no enough space left for new data, the rest space will be - * written and the size of it will be returned, leaving exceeded data not - * saved. - */ - Index PushBack(const std::byte* other, Index other_size, - bool use_memmove = false); - - void PushBackCount(Index count); - - /** - * @brief Move forward the used-begin ptr. - * @return The actual size moved forward. - * - * If given size is bigger than current used size, the used size will be - * returned and set to 0. - */ - Index PopFront(Index size); - - /** - * @brief Pop front data of used bytes into another buffer. - * @return The actual size popped. - * - * If given size is bigger than current used size, then only current used size - * of bytes will be popped. If given size is smaller than current used size, - * then only given size of bytes will be popped. - */ - Index PopFront(std::byte* buffer, Index size, bool use_memmove = false); - - /** - * @brief Move backward the used-end ptr. - * @return The actual size moved backward. - * - * If given size is bigger than current used size, the used size will be - * returned and set to 0. - */ - Index PopEnd(Index size); - - /** - * @brief Pop back data of used bytes into another buffer. - * @return The actual size popped. - * - * If given size is bigger than current used size, then only current used size - * of bytes will be popped. If given size is smaller than current used size, - * then only given size of bytes will be popped. - */ - Index PopEnd(std::byte* buffer, Index size, bool use_memmove = false); - - operator std::byte*() { return GetPtr(); } - operator const std::byte*() const { return GetPtr(); } - - /** - * @brief Detach internal buffer and return it. - * @param size If not null, size of the buffer is written to it. - * @return The buffer pointer. May be nullptr. - * - * After detach, you are responsible to delete[] it. - */ - std::byte* Detach(Index* size = nullptr); - - private: - void Copy_(const Buffer& other); - void Move_(Buffer&& other) noexcept; - void Delete_() noexcept; - - void AssertValid(); - - private: - std::byte* ptr_; - Index size_; - Index used_begin_; - Index used_end_; -}; - -CRU_BASE_API void swap(Buffer& left, Buffer& right) noexcept; -} // namespace cru diff --git a/include/cru/base/io/AutoReadStream.h b/include/cru/base/io/AutoReadStream.h index 56e2beca..41ca1118 100644 --- a/include/cru/base/io/AutoReadStream.h +++ b/include/cru/base/io/AutoReadStream.h @@ -3,7 +3,6 @@ #include "BufferStream.h" #include "Stream.h" -#include <mutex> #include <thread> namespace cru::io { @@ -45,18 +44,16 @@ class CRU_BASE_API AutoReadStream : public Stream { ~AutoReadStream() override; public: - CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE - - void BeginToDrop(bool auto_close = true, bool auto_delete = true); + Stream* GetUnderlyingStream() { return stream_; } protected: + bool DoCanWrite() override; Index DoRead(std::byte* buffer, Index offset, Index size) override; Index DoWrite(const std::byte* buffer, Index offset, Index size) override; void DoFlush() override; + void DoClose() override; private: - void DoClose(); - void BackgroundThreadRun(); private: diff --git a/include/cru/base/io/BufferStream.h b/include/cru/base/io/BufferStream.h index d4ee3837..c8e8f707 100644 --- a/include/cru/base/io/BufferStream.h +++ b/include/cru/base/io/BufferStream.h @@ -1,6 +1,5 @@ #pragma once -#include "../Buffer.h" #include "Stream.h" #include <condition_variable> @@ -8,12 +7,6 @@ #include <mutex> namespace cru::io { -class WriteAfterEofException : public Exception { - public: - using Exception::Exception; - ~WriteAfterEofException() override = default; -}; - struct BufferStreamOptions { /** * Actually I have no ideas about the best value for this. May change it later @@ -51,34 +44,50 @@ struct BufferStreamOptions { }; /** - * @brief SPSC (Single Producer Single Consumer) buffer stream. - * - * If used by multiple producer or multiple consumer, the behavior is undefined. + * @brief MPMC (Multiple Producer Multiple Consumer) buffer stream. */ class BufferStream : public Stream { public: - BufferStream(const BufferStreamOptions& options); + explicit BufferStream(const BufferStreamOptions& options = {}); ~BufferStream() override; - CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE - - void SetEof(); + void WriteEof(); protected: Index DoRead(std::byte* buffer, Index offset, Index size) override; Index DoWrite(const std::byte* buffer, Index offset, Index size) override; + void DoClose() override; private: - void DoClose(); + struct Block { + std::byte* buffer; + Index size; + Index start; + Index end; + + explicit Block(Index size); + + CRU_DELETE_COPY(Block) + + Block(Block&& other) noexcept; + Block& operator=(Block&& other) noexcept; + + ~Block(); + + Index Read(std::byte* des, Index si); + Index Write(const std::byte* src, Index si); + bool IsFull() const; + bool IsEmpty() const; + }; - private: Index block_size_; Index max_block_count_; - std::list<Buffer> buffer_list_; - bool eof_; + std::list<Block> buffer_list_; + bool eof_written_; std::mutex mutex_; - std::condition_variable condition_variable_; + std::condition_variable read_cv_; + std::condition_variable write_cv_; }; } // namespace cru::io diff --git a/include/cru/base/io/CFileStream.h b/include/cru/base/io/CFileStream.h index 0b58bdc9..66745955 100644 --- a/include/cru/base/io/CFileStream.h +++ b/include/cru/base/io/CFileStream.h @@ -11,15 +11,10 @@ class CRU_BASE_API CFileStream : public Stream { explicit CFileStream(std::FILE* file, bool readable = true, bool writable = true, bool auto_close = true); - CRU_DELETE_COPY(CFileStream) - CRU_DELETE_MOVE(CFileStream) - ~CFileStream() override; public: - CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE - - std::FILE* GetHandle() const; + std::FILE* GetHandle(); protected: Index DoSeek(Index offset, SeekOrigin origin) override; @@ -28,9 +23,7 @@ class CRU_BASE_API CFileStream : public Stream { Index DoRead(std::byte* buffer, Index offset, Index size) override; Index DoWrite(const std::byte* buffer, Index offset, Index size) override; void DoFlush() override; - - private: - void DoClose(); + void DoClose() override; private: std::FILE* file_; diff --git a/include/cru/base/io/MemoryStream.h b/include/cru/base/io/MemoryStream.h index a1f90c3b..155219a8 100644 --- a/include/cru/base/io/MemoryStream.h +++ b/include/cru/base/io/MemoryStream.h @@ -14,18 +14,14 @@ class CRU_BASE_API MemoryStream : public Stream { ~MemoryStream() override; public: - void Close() override; - - std::byte* GetBuffer() const { return buffer_; } + std::byte* GetBuffer() { return buffer_; } protected: Index DoSeek(Index offset, SeekOrigin origin) override; Index DoGetSize() override { return size_; } Index DoRead(std::byte* buffer, Index offset, Index size) override; Index DoWrite(const std::byte* buffer, Index offset, Index size) override; - - private: - void DoClose(); + void DoClose() override; private: std::byte* buffer_; diff --git a/include/cru/base/io/ProxyStream.h b/include/cru/base/io/ProxyStream.h deleted file mode 100644 index 42ec9dfd..00000000 --- a/include/cru/base/io/ProxyStream.h +++ /dev/null @@ -1,42 +0,0 @@ -#pragma once - -#include "Stream.h" - -#include <functional> - -namespace cru::io { -struct ProxyStreamHandlers { - std::function<Index(Index offset, Stream::SeekOrigin origin)> seek; - std::function<Index(std::byte* buffer, Index offset, Index size)> read; - std::function<Index(const std::byte* buffer, Index offset, Index size)> write; - std::function<void()> flush; - - /** - * @brief This method will be only called once when `Close` is called or the - * stream is destructed. - */ - std::function<void()> close; -}; - -class ProxyStream : public Stream { - public: - explicit ProxyStream(ProxyStreamHandlers handlers); - - ~ProxyStream() override; - - public: - CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE - - protected: - Index DoSeek(Index offset, SeekOrigin origin) override; - Index DoRead(std::byte* buffer, Index offset, Index size) override; - Index DoWrite(const std::byte* buffer, Index offset, Index size) override; - void DoFlush() override; - - private: - void DoClose(); - - private: - ProxyStreamHandlers handlers_; -}; -} // namespace cru::io diff --git a/include/cru/base/io/Stream.h b/include/cru/base/io/Stream.h index 8c0d3669..f497d725 100644 --- a/include/cru/base/io/Stream.h +++ b/include/cru/base/io/Stream.h @@ -1,46 +1,51 @@ #pragma once #include "../Base.h" -#include "../Buffer.h" +#include "../Guard.h" // IWYU pragma: keep +#include <atomic> #include <cstddef> namespace cru::io { -class CRU_BASE_API StreamOperationNotSupportedException : public Exception { - public: - explicit StreamOperationNotSupportedException(std::string operation); +class Stream; +class CRU_BASE_API StreamException : public Exception { public: - std::string GetOperation() const { return operation_; } + explicit StreamException(Stream* stream, std::string message = "", + std::shared_ptr<std::exception> inner = nullptr); - public: - static void CheckSeek(bool seekable); - static void CheckRead(bool readable); - static void CheckWrite(bool writable); + Stream* GetStream() const { return stream_; } private: - std::string operation_; + Stream* stream_; }; -class CRU_BASE_API StreamClosedException : public Exception { +class CRU_BASE_API StreamOperationNotSupportedException + : public StreamException { public: - StreamClosedException(); + explicit StreamOperationNotSupportedException(Stream* stream, + std::string operation); - CRU_DEFAULT_DESTRUCTOR(StreamClosedException) + public: + std::string GetOperation() const { return operation_; } + + public: + static void CheckSeek(Stream* stream, bool seekable); + static void CheckRead(Stream* stream, bool readable); + static void CheckWrite(Stream* stream, bool writable); - static void Check(bool closed); + private: + std::string operation_; }; -#define CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE \ - void Close() override { DoClose(); } +class CRU_BASE_API StreamClosedException : public StreamException { + using StreamException::StreamException; +}; -#define CRU_STREAM_BEGIN_CLOSE \ - if (GetClosed()) return; \ - CloseGuard close_guard(this); +class CRU_BASE_API StreamIOException : public StreamException { + using StreamException::StreamException; +}; -/** - * All stream is thread-unsafe by default unless being documented. - */ class CRU_BASE_API Stream : public Object { protected: struct SupportedOperations { @@ -49,12 +54,6 @@ class CRU_BASE_API Stream : public Object { std::optional<bool> can_write; }; - struct CloseGuard { - explicit CloseGuard(Stream* stream) : stream(stream) {} - ~CloseGuard() { stream->SetClosed(true); } - Stream* stream; - }; - protected: explicit Stream(SupportedOperations supported_operations = {}); Stream(std::optional<bool> can_seek, std::optional<bool> can_read, @@ -62,8 +61,7 @@ class CRU_BASE_API Stream : public Object { public: enum class SeekOrigin { Current, Begin, End }; - - ~Stream() override = default; + constexpr static Index kEOF = -1; public: bool CanSeek(); @@ -85,11 +83,11 @@ class CRU_BASE_API Stream : public Object { Index Write(const char* buffer, Index size); void Flush(); - virtual void Close() = 0; - virtual Buffer ReadToEnd(Index grow_size = 256); + bool IsClosed(); + bool Close(); - // Utf8 encoding. + virtual std::vector<std::byte> ReadToEnd(Index grow_size = 256); std::string ReadToEndAsUtf8String(); protected: @@ -103,17 +101,13 @@ class CRU_BASE_API Stream : public Object { virtual Index DoRead(std::byte* buffer, Index offset, Index size); virtual Index DoWrite(const std::byte* buffer, Index offset, Index size); virtual void DoFlush(); + virtual void DoClose(); - void SetSupportedOperations(SupportedOperations supported_operations) { - supported_operations_ = std::move(supported_operations); - } - - bool GetClosed() { return closed_; } - void SetClosed(bool closed) { closed_ = closed; } - void CheckClosed() { StreamClosedException::Check(closed_); } + void SetSupportedOperations(SupportedOperations supported_operations); + void CheckClosed(); private: SupportedOperations supported_operations_; - bool closed_; + std::atomic_bool closed_; }; } // namespace cru::io diff --git a/include/cru/base/platform/unix/UnixFileStream.h b/include/cru/base/platform/unix/UnixFileStream.h index 1657bfbb..81174151 100644 --- a/include/cru/base/platform/unix/UnixFileStream.h +++ b/include/cru/base/platform/unix/UnixFileStream.h @@ -19,17 +19,13 @@ class UnixFileStream : public io::Stream { ~UnixFileStream() override; public: - CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE - int GetFileDescriptor() const { return file_descriptor_; } protected: Index DoSeek(Index offset, SeekOrigin origin = SeekOrigin::Current) override; Index DoRead(std::byte* buffer, Index offset, Index size) override; Index DoWrite(const std::byte* buffer, Index offset, Index size) override; - - private: - void DoClose(); + void DoClose() override; private: UnixFileDescriptor file_descriptor_; diff --git a/include/cru/base/platform/win/Stream.h b/include/cru/base/platform/win/Stream.h index 104b3bd7..9dd33d35 100644 --- a/include/cru/base/platform/win/Stream.h +++ b/include/cru/base/platform/win/Stream.h @@ -24,15 +24,11 @@ class CRU_BASE_API Win32HandleStream : public io::Stream { Index DoSeek(Index offset, SeekOrigin origin) override; Index DoRead(std::byte* buffer, Index offset, Index size) override; Index DoWrite(const std::byte* buffer, Index offset, Index size) override; + void DoClose() override; public: HANDLE GetHandle() { return handle_; } - CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE - - private: - void DoClose(); - private: HANDLE handle_; bool auto_close_; @@ -51,15 +47,11 @@ class CRU_BASE_API ComStream : public io::Stream { Index DoSeek(Index offset, SeekOrigin origin) override; Index DoRead(std::byte* buffer, Index offset, Index size) override; Index DoWrite(const std::byte* buffer, Index offset, Index size) override; + void DoClose() override; public: IStream* GetComStream() { return stream_; } - CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE - - private: - void DoClose(); - private: IStream* stream_; bool auto_release_; diff --git a/src/base/Buffer.cpp b/src/base/Buffer.cpp deleted file mode 100644 index 838c6f12..00000000 --- a/src/base/Buffer.cpp +++ /dev/null @@ -1,276 +0,0 @@ -#include "cru/base/Buffer.h" - -#include <cstring> - -namespace cru { -namespace { -void CheckSize(Index size) { - if (size < 0) { - throw Exception("Size of buffer can't be smaller than 0."); - } -} -} // namespace - -Buffer::Buffer() { - ptr_ = nullptr; - size_ = used_begin_ = used_end_ = 0; -} - -Buffer::Buffer(Index size) { - CheckSize(size); - if (size == 0) { - ptr_ = nullptr; - size_ = used_begin_ = used_end_ = 0; - } else { - ptr_ = new std::byte[size]; - size_ = size; - used_begin_ = used_end_ = 0; - } - AssertValid(); -} - -Buffer::Buffer(const Buffer& other) { Copy_(other); } - -Buffer::Buffer(Buffer&& other) noexcept { Move_(std::move(other)); } - -Buffer& Buffer::operator=(const Buffer& other) { - if (this != &other) { - Delete_(); - Copy_(other); - } - return *this; -} - -Buffer& Buffer::operator=(Buffer&& other) noexcept { - if (this != &other) { - Delete_(); - Move_(std::move(other)); - } - return *this; -} - -Buffer::~Buffer() { Delete_(); } - -void Buffer::AssignBytes(Index dst_offset, std::byte* src, Index src_offset, - Index src_size, bool use_memmove) { - CheckSize(src_size); - - AssertValid(); - - (use_memmove ? std::memmove : std::memcpy)(ptr_ + dst_offset, - src + src_offset, src_size); - AssertValid(); -} - -void Buffer::ResizeBuffer(Index new_size, bool preserve_used) { - CheckSize(new_size); - - AssertValid(); - - if (new_size == 0) { - Delete_(); - ptr_ = nullptr; - size_ = used_begin_ = used_end_ = 0; - return; - } - - auto old_ptr = ptr_; - - ptr_ = new std::byte[new_size]; - size_ = new_size; - used_begin_ = std::min(new_size, used_begin_); - used_end_ = std::min(new_size, used_end_); - - if (old_ptr) { - if (preserve_used && used_begin_ < used_end_) { - std::memcpy(ptr_ + used_begin_, old_ptr + used_begin_, - used_end_ - used_begin_); - } - delete[] old_ptr; - } - - AssertValid(); -} - -Index Buffer::PushFront(const std::byte* other, Index other_size, - bool use_memmove) { - CheckSize(other_size); - - AssertValid(); - - auto copy_size = std::min(used_begin_, other_size); - - if (copy_size) { - used_begin_ -= copy_size; - (use_memmove ? std::memmove : std::memcpy)(ptr_ + used_begin_, other, - copy_size); - } - - AssertValid(); - - return copy_size; -} - -bool Buffer::PushBack(std::byte b) { - AssertValid(); - if (IsUsedReachEnd()) { - return false; - } - ptr_[used_end_] = b; - used_end_++; - AssertValid(); - return true; -} - -Index Buffer::PushBack(const std::byte* other, Index other_size, - bool use_memmove) { - CheckSize(other_size); - - AssertValid(); - - auto copy_size = std::min(size_ - used_end_, other_size); - - if (copy_size) { - (use_memmove ? std::memmove : std::memcpy)(ptr_ + used_end_, other, - copy_size); - used_end_ += copy_size; - } - - AssertValid(); - - return copy_size; -} - -void Buffer::PushBackCount(Index count) { - if (count < 0 || count > GetBackFree()) { - throw Exception("Count out of range in PushBackCount."); - } - used_end_ += count; -} - -Index Buffer::PopFront(Index size) { - CheckSize(size); - - AssertValid(); - - auto move = std::min(used_begin_, size); - used_begin_ -= move; - - AssertValid(); - - return move; -} - -Index Buffer::PopFront(std::byte* buffer, Index size, bool use_memmove) { - CheckSize(size); - - AssertValid(); - - auto pop_size = std::min(GetUsedSize(), size); - - if (pop_size) { - used_begin_ += pop_size; - (use_memmove ? std::memmove : std::memcpy)( - buffer, GetUsedBeginPtr() - pop_size, pop_size); - } - - AssertValid(); - - return pop_size; -} - -Index Buffer::PopEnd(Index size) { - CheckSize(size); - - AssertValid(); - - auto move = std::min(size_ - used_end_, size); - used_end_ += move; - - AssertValid(); - - return move; -} - -Index Buffer::PopEnd(std::byte* buffer, Index size, bool use_memmove) { - CheckSize(size); - - AssertValid(); - - auto pop_size = std::min(GetUsedSize(), size); - - if (pop_size) { - used_end_ -= pop_size; - (use_memmove ? std::memmove : std::memcpy)(buffer, GetUsedEndPtr(), - pop_size); - } - - AssertValid(); - - return pop_size; -} - -std::byte* Buffer::Detach(Index* size) { - AssertValid(); - - auto ptr = this->ptr_; - if (size) { - *size = this->size_; - } - this->ptr_ = nullptr; - this->size_ = this->used_begin_ = this->used_end_ = 0; - - AssertValid(); - - return ptr; -} - -void Buffer::Copy_(const Buffer& other) { - if (other.ptr_ == nullptr) { - ptr_ = nullptr; - size_ = used_begin_ = used_end_ = 0; - } else { - ptr_ = new std::byte[other.size_]; - size_ = other.size_; - used_begin_ = other.used_begin_; - used_end_ = other.used_end_; - std::memcpy(ptr_ + used_begin_, other.ptr_ + used_begin_, - used_end_ - used_begin_); - } - AssertValid(); -} - -void Buffer::Move_(Buffer&& other) noexcept { - ptr_ = other.ptr_; - size_ = other.size_; - used_begin_ = other.used_begin_; - used_end_ = other.used_end_; - other.ptr_ = nullptr; - other.size_ = other.used_begin_ = other.used_end_ = 0; - AssertValid(); -} - -void Buffer::Delete_() noexcept { - if (ptr_) { - delete[] ptr_; - } -} - -void Buffer::AssertValid() { - assert(size_ >= 0); - assert(used_begin_ >= 0); - assert(used_begin_ <= size_); - assert(used_end_ >= 0); - assert(used_end_ <= size_); - assert(used_end_ >= used_begin_); - assert((ptr_ == nullptr && size_ == 0) || (ptr_ != nullptr && size_ > 0)); -} - -void swap(Buffer& left, Buffer& right) noexcept { - using std::swap; - swap(left.ptr_, right.ptr_); - swap(left.size_, right.size_); - swap(left.used_begin_, right.used_begin_); - swap(left.used_end_, right.used_end_); -} -} // namespace cru diff --git a/src/base/CMakeLists.txt b/src/base/CMakeLists.txt index 0840b130..a154ebee 100644 --- a/src/base/CMakeLists.txt +++ b/src/base/CMakeLists.txt @@ -1,6 +1,5 @@ add_library(CruBase Base.cpp - Buffer.cpp PropertyTree.cpp StringUtil.cpp SubProcess.cpp @@ -9,7 +8,6 @@ add_library(CruBase io/BufferStream.cpp io/CFileStream.cpp io/Stream.cpp - io/ProxyStream.cpp io/Resource.cpp io/MemoryStream.cpp log/Logger.cpp diff --git a/src/base/io/AutoReadStream.cpp b/src/base/io/AutoReadStream.cpp index 0c035648..61cc9f26 100644 --- a/src/base/io/AutoReadStream.cpp +++ b/src/base/io/AutoReadStream.cpp @@ -8,7 +8,7 @@ namespace cru::io { AutoReadStream::AutoReadStream(Stream* stream, bool auto_close, bool auto_delete, const AutoReadStreamOptions& options) - : Stream(false, true, stream->CanSeek()), + : Stream(false, true, std::nullopt), auto_close_(auto_close), auto_delete_(auto_delete) { auto buffer_stream_options = options.GetBufferStreamOptions(); @@ -19,10 +19,15 @@ AutoReadStream::AutoReadStream(Stream* stream, bool auto_close, } AutoReadStream::~AutoReadStream() { - DoClose(); + if (auto_delete_) { + delete stream_; + } + buffer_stream_->Close(); background_thread_.join(); } +bool AutoReadStream::DoCanWrite() { return stream_->CanWrite(); } + Index AutoReadStream::DoRead(std::byte* buffer, Index offset, Index size) { return buffer_stream_->Read(buffer, offset, size); } @@ -35,14 +40,9 @@ Index AutoReadStream::DoWrite(const std::byte* buffer, Index offset, void AutoReadStream::DoFlush() { stream_->Flush(); } void AutoReadStream::DoClose() { - CRU_STREAM_BEGIN_CLOSE if (auto_close_) { stream_->Close(); } - if (auto_delete_) { - delete stream_; - stream_ = nullptr; - } buffer_stream_->Close(); } @@ -51,14 +51,13 @@ void AutoReadStream::BackgroundThreadRun() { while (true) { try { auto read = stream_->Read(buffer.data(), buffer.size()); - if (read == 0) { - buffer_stream_->SetEof(); + if (read == kEOF) { + buffer_stream_->WriteEof(); break; } else { buffer_stream_->Write(buffer.data(), read); } - } catch (const StreamClosedException& exception) { - buffer_stream_->Close(); + } catch (const StreamException&) { break; } } diff --git a/src/base/io/BufferStream.cpp b/src/base/io/BufferStream.cpp index 0dbb438b..fdcd25ff 100644 --- a/src/base/io/BufferStream.cpp +++ b/src/base/io/BufferStream.cpp @@ -6,35 +6,34 @@ BufferStream::BufferStream(const BufferStreamOptions& options) : Stream(false, true, true) { block_size_ = options.GetBlockSizeOrDefault(); max_block_count_ = options.GetMaxBlockCount(); - - eof_ = false; + eof_written_ = false; } -BufferStream::~BufferStream() { DoClose(); } +BufferStream::~BufferStream() {} Index BufferStream::DoRead(std::byte* buffer, Index offset, Index size) { std::unique_lock lock(mutex_); - condition_variable_.wait( - lock, [this] { return GetClosed() || !buffer_list_.empty() || eof_; }); + read_cv_.wait(lock, [this] { + return eof_written_ || !buffer_list_.empty() || IsClosed(); + }); - if (GetClosed()) { - StreamClosedException::Check(true); - } + CheckClosed(); - if (buffer_list_.empty() && eof_) { - return 0; + if (buffer_list_.empty() && eof_written_) { + return kEOF; } - auto full = max_block_count_ > 0 && buffer_list_.size() == max_block_count_; + auto full_previously = max_block_count_ > 0 && + buffer_list_.size() == max_block_count_ && + buffer_list_.back().IsFull(); Index read = 0; while (!buffer_list_.empty()) { auto& stream_buffer = buffer_list_.front(); - auto this_read = - stream_buffer.PopFront(buffer + offset + read, size - read); - if (stream_buffer.GetUsedSize() == 0) { + auto this_read = stream_buffer.Read(buffer + offset + read, size - read); + if (stream_buffer.IsEmpty()) { buffer_list_.pop_front(); } read += this_read; @@ -43,10 +42,8 @@ Index BufferStream::DoRead(std::byte* buffer, Index offset, Index size) { } } - if (full && buffer_list_.size() < max_block_count_) { - // By convention, there should be at most one producer waiting. So - // notify_one and notify_all should be the same. - condition_variable_.notify_one(); + if (full_previously && buffer_list_.size() < max_block_count_) { + write_cv_.notify_all(); } return read; @@ -55,69 +52,109 @@ Index BufferStream::DoRead(std::byte* buffer, Index offset, Index size) { Index BufferStream::DoWrite(const std::byte* buffer, Index offset, Index size) { std::unique_lock lock(mutex_); - if (eof_) { - throw WriteAfterEofException( - "Stream has been set eof. Can't write to it any more."); - } - - condition_variable_.wait(lock, [this] { - return GetClosed() || max_block_count_ <= 0 || + write_cv_.wait(lock, [this] { + return eof_written_ || max_block_count_ <= 0 || buffer_list_.size() < max_block_count_ || - buffer_list_.back().GetBackFree() > 0; + !buffer_list_.back().IsFull() || IsClosed(); }); - if (GetClosed()) { - StreamClosedException::Check(true); + CheckClosed(); + + if (eof_written_) { + throw StreamIOException( + this, "Stream has been set eof. Can't write to it any more."); } - auto empty = buffer_list_.empty(); + auto empty_previously = buffer_list_.empty(); Index written = 0; - if (empty) { - buffer_list_.push_back(Buffer(block_size_)); + if (empty_previously) { + buffer_list_.push_back(Block(block_size_)); } while (true) { - if (buffer_list_.back().GetBackFree() == 0) { + if (buffer_list_.back().IsFull()) { if (max_block_count_ > 0 && buffer_list_.size() == max_block_count_) { break; } - buffer_list_.push_back(Buffer(block_size_)); + buffer_list_.push_back(Block(block_size_)); } auto& stream_buffer = buffer_list_.back(); auto this_written = - stream_buffer.PushBack(buffer + offset + written, size - written); + stream_buffer.Write(buffer + offset + written, size - written); written += this_written; if (written == size) { break; } } - if (empty) { - // By convention, there should be at most one consumer waiting. So - // notify_one and notify_all should be the same. - condition_variable_.notify_one(); + if (empty_previously) { + read_cv_.notify_all(); } return written; } -void BufferStream::SetEof() { +void BufferStream::DoClose() { std::unique_lock lock(mutex_); - eof_ = true; - if (buffer_list_.empty()) { - // By convention, there should be at most one consumer waiting. So - // notify_one and notify_all should be the same. - condition_variable_.notify_one(); + buffer_list_.clear(); + read_cv_.notify_all(); + write_cv_.notify_all(); +} + +void BufferStream::WriteEof() { + std::unique_lock lock(mutex_); + + eof_written_ = true; + read_cv_.notify_all(); + write_cv_.notify_all(); +} + +BufferStream::Block::Block(Index size) + : buffer(new std::byte[size]), size(size), start(0), end(0) {} + +BufferStream::Block::Block(Block&& other) noexcept + : buffer(other.buffer), + size(other.size), + start(other.start), + end(other.end) { + other.buffer = nullptr; + other.size = other.start = other.end = 0; +} + +BufferStream::Block& BufferStream::Block::operator=(Block&& other) noexcept { + if (this != &other) { + delete[] buffer; + buffer = other.buffer; + size = other.size; + start = other.start; + end = other.end; + other.buffer = nullptr; + other.size = other.start = other.end = 0; } + return *this; } -void BufferStream::DoClose() { - CRU_STREAM_BEGIN_CLOSE - SetClosed(true); - condition_variable_.notify_all(); - buffer_list_.clear(); +BufferStream::Block::~Block() { delete[] buffer; } + +Index BufferStream::Block::Read(std::byte* des, Index si) { + si = std::min(si, end - start); + std::memcpy(des, buffer + start, si); + start += si; + return si; +} + +Index BufferStream::Block::Write(const std::byte* src, Index si) { + si = std::min(si, size - end); + std::memcpy(buffer + end, src, si); + end += si; + return si; } + +bool BufferStream::Block::IsFull() const { return end == size; } + +bool BufferStream::Block::IsEmpty() const { return end == start; } + } // namespace cru::io diff --git a/src/base/io/CFileStream.cpp b/src/base/io/CFileStream.cpp index db477077..9632cba9 100644 --- a/src/base/io/CFileStream.cpp +++ b/src/base/io/CFileStream.cpp @@ -27,7 +27,8 @@ CFileStream::CFileStream(const char* path, const char* mode) file_(std::fopen(path, mode)), auto_close_(true) { if (file_ == nullptr) { - throw ErrnoException("Cannot open file."); + throw StreamIOException(this, "fopen failed.", + std::make_shared<ErrnoException>()); } } @@ -40,7 +41,7 @@ CFileStream::CFileStream(std::FILE* file, bool readable, bool writable, } CFileStream::~CFileStream() { - if (auto_close_ && file_ != nullptr) { + if (file_ && auto_close_) { std::fclose(file_); } } @@ -60,7 +61,8 @@ static int ConvertOriginFlag(Stream::SeekOrigin origin) { Index CFileStream::DoSeek(Index offset, SeekOrigin origin) { if (std::fseek(file_, offset, ConvertOriginFlag(origin))) { - throw ErrnoException("Seek failed."); + throw StreamIOException(this, "fseek failed.", + std::make_shared<ErrnoException>()); } return DoTell(); } @@ -68,7 +70,8 @@ Index CFileStream::DoSeek(Index offset, SeekOrigin origin) { Index CFileStream::DoTell() { long position = std::ftell(file_); if (position == -1) { - throw ErrnoException("Tell failed."); + throw StreamIOException(this, "ftell failed.", + std::make_shared<ErrnoException>()); } return position; } @@ -77,20 +80,28 @@ void CFileStream::DoRewind() { std::rewind(file_); } Index CFileStream::DoRead(std::byte* buffer, Index offset, Index size) { auto count = std::fread(buffer + offset, 1, size, file_); + if (std::ferror(file_)) { + throw StreamIOException(this, "Error occurred when reading C FILE."); + } + if (count == 0 && std::feof(file_)) { + return kEOF; + } return count; } Index CFileStream::DoWrite(const std::byte* buffer, Index offset, Index size) { auto count = std::fwrite(buffer + offset, 1, size, file_); + if (std::ferror(file_)) { + throw StreamIOException(this, "Error occurred when writing C FILE."); + } return count; } void CFileStream::DoFlush() { std::fflush(file_); } void CFileStream::DoClose() { - CRU_STREAM_BEGIN_CLOSE if (auto_close_ && !std::fclose(file_)) { - throw Exception("Failed to close FILE."); + throw StreamIOException(this, "fclose failed."); } file_ = nullptr; } diff --git a/src/base/io/MemoryStream.cpp b/src/base/io/MemoryStream.cpp index 4d289197..555526cd 100644 --- a/src/base/io/MemoryStream.cpp +++ b/src/base/io/MemoryStream.cpp @@ -20,9 +20,11 @@ MemoryStream::MemoryStream( } } -MemoryStream::~MemoryStream() {} - -void MemoryStream::Close() { DoClose(); } +MemoryStream::~MemoryStream() { + if (buffer_ && release_func_) { + release_func_(buffer_, size_); + } +} Index MemoryStream::DoSeek(Index offset, SeekOrigin origin) { switch (origin) { @@ -40,6 +42,10 @@ Index MemoryStream::DoSeek(Index offset, SeekOrigin origin) { } Index MemoryStream::DoRead(std::byte* buffer, Index offset, Index size) { + if (position_ == size_) { + return kEOF; + } + if (position_ + size > size_) { size = size_ - position_; } @@ -64,10 +70,11 @@ Index MemoryStream::DoWrite(const std::byte* buffer, Index offset, Index size) { } void MemoryStream::DoClose() { - CRU_STREAM_BEGIN_CLOSE - release_func_(buffer_, size_); + if (release_func_) { + release_func_(buffer_, size_); + release_func_ = {}; + } buffer_ = nullptr; - release_func_ = {}; } } // namespace cru::io diff --git a/src/base/io/ProxyStream.cpp b/src/base/io/ProxyStream.cpp deleted file mode 100644 index de66169e..00000000 --- a/src/base/io/ProxyStream.cpp +++ /dev/null @@ -1,37 +0,0 @@ -#include "cru/base/io/ProxyStream.h" -#include "cru/base/io/Stream.h" - -namespace cru::io { -ProxyStream::ProxyStream(ProxyStreamHandlers handlers) - : Stream(static_cast<bool>(handlers.seek), static_cast<bool>(handlers.read), - static_cast<bool>(handlers.write)), - handlers_(std::move(handlers)) {} - -ProxyStream::~ProxyStream() { DoClose(); } - -Index ProxyStream::DoSeek(Index offset, SeekOrigin origin) { - return handlers_.seek(offset, origin); -} - -Index ProxyStream::DoRead(std::byte* buffer, Index offset, Index size) { - return handlers_.read(buffer, offset, size); -} - -Index ProxyStream::DoWrite(const std::byte* buffer, Index offset, Index size) { - return handlers_.write(buffer, offset, size); -} - -void ProxyStream::DoFlush() { - if (handlers_.flush) { - handlers_.flush(); - } -} - -void ProxyStream::DoClose() { - CRU_STREAM_BEGIN_CLOSE - if (handlers_.close) { - handlers_.close(); - } - handlers_ = {}; -} -} // namespace cru::io diff --git a/src/base/io/Stream.cpp b/src/base/io/Stream.cpp index c7286241..b0a9ab0c 100644 --- a/src/base/io/Stream.cpp +++ b/src/base/io/Stream.cpp @@ -1,33 +1,34 @@ #include "cru/base/io/Stream.h" -#include <algorithm> +#include <atomic> #include <format> -#include <iterator> +#include <ranges> #include <utility> namespace cru::io { +StreamException::StreamException(Stream* stream, std::string message, + std::shared_ptr<std::exception> inner) + : Exception(std::move(message), std::move(inner)), stream_(stream) {} + StreamOperationNotSupportedException::StreamOperationNotSupportedException( - std::string operation) - : Exception(std::format("Stream operation {} not supported.", operation)), + Stream* stream, std::string operation) + : StreamException( + stream, std::format("Stream operation {} not supported.", operation)), operation_(std::move(operation)) {} -void StreamOperationNotSupportedException::CheckSeek(bool seekable) { - if (!seekable) throw StreamOperationNotSupportedException("seek"); -} - -void StreamOperationNotSupportedException::CheckRead(bool readable) { - if (!readable) throw StreamOperationNotSupportedException("read"); +void StreamOperationNotSupportedException::CheckSeek(Stream* stream, + bool seekable) { + if (!seekable) throw StreamOperationNotSupportedException(stream, "seek"); } -void StreamOperationNotSupportedException::CheckWrite(bool writable) { - if (!writable) throw StreamOperationNotSupportedException("write"); +void StreamOperationNotSupportedException::CheckRead(Stream* stream, + bool readable) { + if (!readable) throw StreamOperationNotSupportedException(stream, "read"); } -StreamClosedException::StreamClosedException() - : Exception("Stream is already closed.") {} - -void StreamClosedException::Check(bool closed) { - if (closed) throw StreamClosedException(); +void StreamOperationNotSupportedException::CheckWrite(Stream* stream, + bool writable) { + if (!writable) throw StreamOperationNotSupportedException(stream, "write"); } Stream::Stream(SupportedOperations supported_operations) @@ -44,7 +45,7 @@ bool Stream::CanSeek() { Index Stream::Seek(Index offset, SeekOrigin origin) { CheckClosed(); - StreamOperationNotSupportedException::CheckSeek(DoCanSeek()); + StreamOperationNotSupportedException::CheckSeek(this, DoCanSeek()); return DoSeek(offset, origin); } @@ -70,7 +71,7 @@ bool Stream::CanRead() { Index Stream::Read(std::byte* buffer, Index offset, Index size) { CheckClosed(); - StreamOperationNotSupportedException::CheckRead(DoCanRead()); + StreamOperationNotSupportedException::CheckRead(this, DoCanRead()); return DoRead(buffer, offset, size); } @@ -93,7 +94,7 @@ bool Stream::CanWrite() { Index Stream::Write(const std::byte* buffer, Index offset, Index size) { CheckClosed(); - StreamOperationNotSupportedException::CheckWrite(DoCanWrite()); + StreamOperationNotSupportedException::CheckWrite(this, DoCanWrite()); return DoWrite(buffer, offset, size); } @@ -114,6 +115,46 @@ void Stream::Flush() { DoFlush(); } +bool Stream::IsClosed() { return closed_.load(std::memory_order_acquire); } + +bool Stream::Close() { + bool expected = false; + if (closed_.compare_exchange_strong(expected, true, + std::memory_order_acq_rel)) { + DoClose(); + } + return expected; +} + +std::vector<std::byte> Stream::ReadToEnd(Index grow_size) { + std::vector<std::byte> buffer; + Index pos = 0; + while (true) { + if (pos == buffer.size()) { + buffer.resize(buffer.size() + grow_size); + } + + auto read = Read(buffer.data(), pos, buffer.size() - pos); + if (read == kEOF) { + break; + } + pos += read; + } + buffer.resize(pos); + return buffer; +} + +std::string Stream::ReadToEndAsUtf8String() { + auto buffer = ReadToEnd(); + return std::views::transform( + buffer, [](std::byte c) { return static_cast<char>(c); }) | + std::ranges::to<std::string>(); +} + +void Stream::SetSupportedOperations(SupportedOperations supported_operations) { + supported_operations_ = std::move(supported_operations); +} + bool Stream::DoCanSeek() { if (supported_operations_.can_seek) { return *supported_operations_.can_seek; @@ -149,17 +190,17 @@ Index Stream::DoSeek(Index offset, SeekOrigin origin) { } Index Stream::DoTell() { - StreamOperationNotSupportedException::CheckSeek(DoCanSeek()); + StreamOperationNotSupportedException::CheckSeek(this, DoCanSeek()); return DoSeek(0, SeekOrigin::Current); } void Stream::DoRewind() { - StreamOperationNotSupportedException::CheckSeek(DoCanSeek()); + StreamOperationNotSupportedException::CheckSeek(this, DoCanSeek()); DoSeek(0, SeekOrigin::Begin); } Index Stream::DoGetSize() { - StreamOperationNotSupportedException::CheckSeek(DoCanSeek()); + StreamOperationNotSupportedException::CheckSeek(this, DoCanSeek()); Index current_position = DoTell(); Seek(0, SeekOrigin::End); Index size = DoTell(); @@ -177,27 +218,12 @@ Index Stream::DoWrite(const std::byte* buffer, Index offset, Index size) { void Stream::DoFlush() {} -Buffer Stream::ReadToEnd(Index grow_size) { - Buffer buffer(grow_size); - while (true) { - auto read = Read(buffer.GetUsedEndPtr(), buffer.GetBackFree()); - buffer.PushBackCount(read); - if (read == 0) { - break; - } - if (buffer.IsUsedReachEnd()) { - buffer.ResizeBuffer(buffer.GetBufferSize() + grow_size, true); - } +void Stream::DoClose() {} + +void Stream::CheckClosed() { + if (IsClosed()) { + throw StreamClosedException(this, "Stream is closed."); } - return buffer; } -std::string Stream::ReadToEndAsUtf8String() { - auto buffer = ReadToEnd(); - std::string result; - std::transform(buffer.GetUsedBeginPtr(), buffer.GetUsedEndPtr(), - std::back_inserter(result), - [](std::byte c) { return static_cast<char>(c); }); - return result; -} } // namespace cru::io diff --git a/src/base/platform/unix/UnixFileStream.cpp b/src/base/platform/unix/UnixFileStream.cpp index df1ddffa..aaaa8d6a 100644 --- a/src/base/platform/unix/UnixFileStream.cpp +++ b/src/base/platform/unix/UnixFileStream.cpp @@ -37,7 +37,7 @@ int MapSeekOrigin(Stream::SeekOrigin origin) { } } // namespace -UnixFileStream::UnixFileStream(const char *path, int oflag, mode_t mode) { +UnixFileStream::UnixFileStream(const char* path, int oflag, mode_t mode) { file_descriptor_ = UnixFileDescriptor(::open(path, oflag, mode)); if (file_descriptor_ == -1) { throw ErrnoException(std::format( @@ -52,7 +52,7 @@ UnixFileStream::UnixFileStream(UnixFileDescriptor fd, bool can_seek, bool can_read, bool can_write) : Stream(can_seek, can_read, can_write), file_descriptor_(std::move(fd)) {} -UnixFileStream::~UnixFileStream() { DoClose(); } +UnixFileStream::~UnixFileStream() { file_descriptor_ = {}; } Index UnixFileStream::DoSeek(Index offset, SeekOrigin origin) { off_t result = ::lseek(file_descriptor_, offset, MapSeekOrigin(origin)); @@ -62,7 +62,7 @@ Index UnixFileStream::DoSeek(Index offset, SeekOrigin origin) { return result; } -Index UnixFileStream::DoRead(std::byte *buffer, Index offset, Index size) { +Index UnixFileStream::DoRead(std::byte* buffer, Index offset, Index size) { auto result = ::read(file_descriptor_, buffer + offset, size); if (result == -1) { throw ErrnoException("Failed to read file."); @@ -70,7 +70,7 @@ Index UnixFileStream::DoRead(std::byte *buffer, Index offset, Index size) { return result; } -Index UnixFileStream::DoWrite(const std::byte *buffer, Index offset, +Index UnixFileStream::DoWrite(const std::byte* buffer, Index offset, Index size) { auto result = ::write(file_descriptor_, buffer + offset, size); if (result == -1) { @@ -79,10 +79,5 @@ Index UnixFileStream::DoWrite(const std::byte *buffer, Index offset, return result; } -void UnixFileStream::DoClose() { - CRU_STREAM_BEGIN_CLOSE - if (file_descriptor_) { - file_descriptor_ = {}; - } -} +void UnixFileStream::DoClose() { file_descriptor_ = {}; } } // namespace cru::platform::unix diff --git a/src/base/platform/win/BridgeComStream.cpp b/src/base/platform/win/BridgeComStream.cpp index c6987ab2..921209b4 100644 --- a/src/base/platform/win/BridgeComStream.cpp +++ b/src/base/platform/win/BridgeComStream.cpp @@ -2,7 +2,7 @@ #include "cru/base/io/Stream.h" namespace cru::platform::win { -BridgeComStream::BridgeComStream(io::Stream *stream) +BridgeComStream::BridgeComStream(io::Stream* stream) : stream_(stream), ref_count_(1) {} BridgeComStream::~BridgeComStream() {} @@ -20,17 +20,17 @@ ULONG BridgeComStream::Release() { return ref_count_; } -HRESULT BridgeComStream::QueryInterface(const IID &riid, void **ppvObject) { +HRESULT BridgeComStream::QueryInterface(const IID& riid, void** ppvObject) { if (riid == IID_IStream) { - *ppvObject = static_cast<IStream *>(this); + *ppvObject = static_cast<IStream*>(this); AddRef(); return S_OK; } else if (riid == IID_ISequentialStream) { - *ppvObject = static_cast<ISequentialStream *>(this); + *ppvObject = static_cast<ISequentialStream*>(this); AddRef(); return S_OK; } else if (riid == IID_IUnknown) { - *ppvObject = static_cast<IUnknown *>(this); + *ppvObject = static_cast<IUnknown*>(this); AddRef(); return S_OK; } else { @@ -38,18 +38,22 @@ HRESULT BridgeComStream::QueryInterface(const IID &riid, void **ppvObject) { } } -HRESULT BridgeComStream::Read(void *pv, ULONG cb, ULONG *pcbRead) { - *pcbRead = stream_->Read(static_cast<std::byte *>(pv), cb); - return S_OK; +HRESULT BridgeComStream::Read(void* pv, ULONG cb, ULONG* pcbRead) { + auto count = stream_->Read(static_cast<std::byte*>(pv), cb); + if (count == cru::io::Stream::kEOF) { + count = 0; + } + *pcbRead = count; + return cb == count ? S_OK : S_FALSE; } -HRESULT BridgeComStream::Write(const void *pv, ULONG cb, ULONG *pcbWritten) { - *pcbWritten = stream_->Write(static_cast<const std::byte *>(pv), cb); +HRESULT BridgeComStream::Write(const void* pv, ULONG cb, ULONG* pcbWritten) { + *pcbWritten = stream_->Write(static_cast<const std::byte*>(pv), cb); return S_OK; } HRESULT BridgeComStream::Seek(LARGE_INTEGER dlibMove, DWORD dwOrigin, - ULARGE_INTEGER *plibNewPosition) { + ULARGE_INTEGER* plibNewPosition) { io::Stream::SeekOrigin so; switch (dwOrigin) { @@ -74,9 +78,9 @@ HRESULT BridgeComStream::SetSize(ULARGE_INTEGER libNewSize) { return E_NOTIMPL; } -HRESULT BridgeComStream::CopyTo(IStream *pstm, ULARGE_INTEGER cb, - ULARGE_INTEGER *pcbRead, - ULARGE_INTEGER *pcbWritten) { +HRESULT BridgeComStream::CopyTo(IStream* pstm, ULARGE_INTEGER cb, + ULARGE_INTEGER* pcbRead, + ULARGE_INTEGER* pcbWritten) { return E_NOTIMPL; } @@ -94,11 +98,11 @@ HRESULT BridgeComStream::UnlockRegion(ULARGE_INTEGER libOffset, return S_OK; } -HRESULT BridgeComStream::Stat(STATSTG *pstatstg, DWORD grfStatFlag) { +HRESULT BridgeComStream::Stat(STATSTG* pstatstg, DWORD grfStatFlag) { return E_NOTIMPL; } -HRESULT BridgeComStream::Clone(IStream **ppstm) { +HRESULT BridgeComStream::Clone(IStream** ppstm) { *ppstm = new BridgeComStream(stream_); return S_OK; } diff --git a/src/base/platform/win/Stream.cpp b/src/base/platform/win/Stream.cpp index 10b80a16..611b2ca3 100644 --- a/src/base/platform/win/Stream.cpp +++ b/src/base/platform/win/Stream.cpp @@ -77,7 +77,11 @@ Win32HandleStream::Win32HandleStream(Win32Handle&& handle, bool can_seek, : Win32HandleStream(handle.Release(), true, can_seek, can_read, can_write) { } -Win32HandleStream::~Win32HandleStream() { DoClose(); } +Win32HandleStream::~Win32HandleStream() { + if (handle_ && auto_close_) { + ::CloseHandle(handle_); + } +} Index Win32HandleStream::DoSeek(Index offset, SeekOrigin origin) { DWORD method = 0; @@ -104,10 +108,14 @@ Index Win32HandleStream::DoRead(std::byte* buffer, Index offset, Index size) { &real_read, nullptr); if (r == FALSE) { auto e = ::GetLastError(); - if (e != ERROR_BROKEN_PIPE || e != ERROR_BROKEN_PIPE) { + if (e != ERROR_BROKEN_PIPE && e != ERROR_MORE_DATA) { throw Win32Error(e, "Failed to call ReadFile."); } } + + if (real_read == 0) { + return kEOF; + } return real_read; } @@ -120,13 +128,10 @@ Index Win32HandleStream::DoWrite(const std::byte* buffer, Index offset, } void Win32HandleStream::DoClose() { - CRU_STREAM_BEGIN_CLOSE - if (auto_close_) { ::CloseHandle(handle_); } - - handle_ = {}; + handle_ = nullptr; } IStream* ToComStream(io::Stream* stream) { @@ -185,7 +190,11 @@ ComStream::ComStream(IStream* com_stream, bool auto_release, bool can_seek, stream_(com_stream), auto_release_(auto_release) {} -ComStream::~ComStream() { DoClose(); } +ComStream::~ComStream() { + if (stream_ && auto_release_) { + stream_->Release(); + } +} Index ComStream::DoSeek(Index offset, SeekOrigin origin) { DWORD dwOrigin = 0; @@ -210,6 +219,9 @@ Index ComStream::DoSeek(Index offset, SeekOrigin origin) { Index ComStream::DoRead(std::byte* buffer, Index offset, Index size) { ULONG n_read; CheckHResult(stream_->Read(buffer + offset, size, &n_read)); + if (n_read == 0) { + return kEOF; + } return n_read; } @@ -225,11 +237,9 @@ Index ComStream::DoWrite(const std::byte* buffer, Index offset, Index size) { } void ComStream::DoClose() { - CRU_STREAM_BEGIN_CLOSE - - if (stream_ && auto_release_) { + if (auto_release_) { stream_->Release(); - stream_ = nullptr; } + stream_ = nullptr; } } // namespace cru::platform::win diff --git a/src/base/platform/win/Win32SubProcess.cpp b/src/base/platform/win/Win32SubProcess.cpp index aed3937c..4f268ef3 100644 --- a/src/base/platform/win/Win32SubProcess.cpp +++ b/src/base/platform/win/Win32SubProcess.cpp @@ -1,8 +1,10 @@ #include "cru/base/platform/win/Win32SubProcess.h" -#include <processthreadsapi.h> -#include <synchapi.h> #include "cru/base/StringUtil.h" #include "cru/base/SubProcess.h" +#include "cru/base/io/AutoReadStream.h" + +#include <processthreadsapi.h> +#include <synchapi.h> #include <memory> #include <string_view> diff --git a/test/base/CMakeLists.txt b/test/base/CMakeLists.txt index 52f61a8c..94022b1a 100644 --- a/test/base/CMakeLists.txt +++ b/test/base/CMakeLists.txt @@ -5,6 +5,9 @@ add_executable(CruBaseTest StringUtilTest.cpp SubProcessTest.cpp TimerTest.cpp + io/AutoReadStreamTest.cpp + io/BufferStreamTest.cpp + io/MemoryStreamTest.cpp toml/ParserTest.cpp xml/ParserTest.cpp ) diff --git a/test/base/io/AutoReadStreamTest.cpp b/test/base/io/AutoReadStreamTest.cpp new file mode 100644 index 00000000..e95b7c24 --- /dev/null +++ b/test/base/io/AutoReadStreamTest.cpp @@ -0,0 +1,32 @@ +#include "cru/base/io/AutoReadStream.h" +#include "cru/base/io/MemoryStream.h" + +#include <catch2/catch_test_macros.hpp> + +#include <algorithm> +#include <cstddef> +#include <ranges> + +TEST_CASE("AutoReadStream should work.", "[io][stream]") { + using namespace cru::io; + + const int size = 100; + std::vector<std::byte> buffer(size); + buffer[1] = std::byte(0xf0); + buffer[2] = std::byte(0x0f); + + MemoryStream underlying_stream(buffer.data(), buffer.size(), true); + AutoReadStream stream(&underlying_stream, true, false); + + REQUIRE(stream.CanRead()); + REQUIRE(!stream.CanWrite()); + + std::vector<std::byte> buffer2(size * 2); + + auto read = stream.Read(buffer2.data(), buffer2.size()); + auto read2 = stream.Read(buffer2.data(), size, 1); + + REQUIRE(read == size); + REQUIRE(std::ranges::equal(buffer, buffer2 | std::views::take(size))); + REQUIRE(read2 == Stream::kEOF); +} diff --git a/test/base/io/BufferStreamTest.cpp b/test/base/io/BufferStreamTest.cpp new file mode 100644 index 00000000..70780b48 --- /dev/null +++ b/test/base/io/BufferStreamTest.cpp @@ -0,0 +1,39 @@ +#include "cru/base/io/BufferStream.h" + +#include <catch2/catch_test_macros.hpp> + +#include <algorithm> +#include <cstddef> +#include <ranges> +#include <thread> + +TEST_CASE("BufferStream should work.", "[io][stream]") { + using namespace cru::io; + + const int size = 100; + std::vector<std::byte> buffer(size); + buffer[1] = std::byte(0xf0); + buffer[2] = std::byte(0x0f); + + BufferStream stream; + + std::vector<std::byte> buffer2(size * 2); + cru::Index read, read2; + + std::thread read_thread([&] { + read = stream.Read(buffer2.data(), buffer2.size()); + read2 = stream.Read(buffer2.data(), size, 1); + }); + + std::thread write_thread([&] { + stream.Write(buffer.data(), buffer.size()); + stream.WriteEof(); + }); + + read_thread.join(); + write_thread.join(); + + REQUIRE(read == size); + REQUIRE(std::ranges::equal(buffer, buffer2 | std::views::take(size))); + REQUIRE(read2 == Stream::kEOF); +} diff --git a/test/base/io/MemoryStreamTest.cpp b/test/base/io/MemoryStreamTest.cpp new file mode 100644 index 00000000..684971b1 --- /dev/null +++ b/test/base/io/MemoryStreamTest.cpp @@ -0,0 +1,30 @@ +#include "cru/base/io/MemoryStream.h" +#include "cru/base/io/Stream.h" + +#include <catch2/catch_test_macros.hpp> + +#include <algorithm> +#include <cstddef> +#include <ranges> + +TEST_CASE("MemoryStream should work.", "[io][stream]") { + using namespace cru::io; + const int size = 100; + std::vector<std::byte> buffer(size); + + buffer[1] = std::byte(0xf0); + buffer[2] = std::byte(0x0f); + + MemoryStream stream(buffer.data(), size, true); + + REQUIRE(stream.CanRead()); + REQUIRE(!stream.CanWrite()); + + std::vector<std::byte> buffer2(size * 2); + + auto read = stream.Read(buffer2.data(), buffer2.size()); + auto read2 = stream.Read(buffer2.data(), 1); + REQUIRE(read == size); + REQUIRE(std::ranges::equal(buffer, buffer2 | std::views::take(size))); + REQUIRE(read2 == Stream::kEOF); +} diff --git a/test/base/platform/win/StreamTest.cpp b/test/base/platform/win/StreamTest.cpp index f42fc5c2..56761b14 100644 --- a/test/base/platform/win/StreamTest.cpp +++ b/test/base/platform/win/StreamTest.cpp @@ -1,3 +1,4 @@ +#include "cru/base/Guard.h" #include "cru/base/StringUtil.h" #include "cru/base/platform/win/Stream.h" @@ -15,6 +16,7 @@ TEST_CASE("StreamConvert FileStreamWork", "[stream]") { (std::filesystem::temp_directory_path() / "cru_test_temp.XXXXXX") .native(); _wmktemp(temp_file_path.data()); + Guard _([temp_file_path] { std::filesystem::remove(temp_file_path); }); std::string path = string::ToUtf8String(temp_file_path); @@ -32,8 +34,6 @@ TEST_CASE("StreamConvert FileStreamWork", "[stream]") { REQUIRE(std::string_view(buffer.get(), 3) == "abc"); com_stream->Release(); file2.Close(); - - std::filesystem::remove(temp_file_path); } TEST_CASE("ComStream Work", "[stream]") { @@ -45,6 +45,7 @@ TEST_CASE("ComStream Work", "[stream]") { (std::filesystem::temp_directory_path() / "cru_test_temp.XXXXXX") .native(); _wmktemp(temp_file_path.data()); + Guard _([temp_file_path] { std::filesystem::remove(temp_file_path); }); std::string path = string::ToUtf8String(temp_file_path); @@ -62,6 +63,4 @@ TEST_CASE("ComStream Work", "[stream]") { REQUIRE(std::string_view(reinterpret_cast<const char*>(buffer.get()), 3) == "abc"); file2.Close(); - - std::filesystem::remove(temp_file_path); } |
