diff options
| author | Yuqian Yang <crupest@crupest.life> | 2026-03-07 20:42:37 +0800 |
|---|---|---|
| committer | Yuqian Yang <crupest@crupest.life> | 2026-03-07 20:42:37 +0800 |
| commit | 38756822825e20eca3b9e01b735946175223d692 (patch) | |
| tree | fc2a495bfc0e082d5ed9a1642278ae6467fe2742 /include/cru | |
| parent | 924f4b472712d0cfc55b81dcb3eaed3f8a478288 (diff) | |
| download | cru-38756822825e20eca3b9e01b735946175223d692.tar.gz cru-38756822825e20eca3b9e01b735946175223d692.tar.bz2 cru-38756822825e20eca3b9e01b735946175223d692.zip | |
Refactor stream.
Diffstat (limited to 'include/cru')
| -rw-r--r-- | include/cru/base/Base.h | 3 | ||||
| -rw-r--r-- | include/cru/base/Buffer.h | 161 | ||||
| -rw-r--r-- | include/cru/base/io/AutoReadStream.h | 9 | ||||
| -rw-r--r-- | include/cru/base/io/BufferStream.h | 47 | ||||
| -rw-r--r-- | include/cru/base/io/CFileStream.h | 11 | ||||
| -rw-r--r-- | include/cru/base/io/MemoryStream.h | 8 | ||||
| -rw-r--r-- | include/cru/base/io/ProxyStream.h | 42 | ||||
| -rw-r--r-- | include/cru/base/io/Stream.h | 76 | ||||
| -rw-r--r-- | include/cru/base/platform/unix/UnixFileStream.h | 6 | ||||
| -rw-r--r-- | include/cru/base/platform/win/Stream.h | 12 |
10 files changed, 76 insertions, 299 deletions
diff --git a/include/cru/base/Base.h b/include/cru/base/Base.h index e62054eb..9d10a447 100644 --- a/include/cru/base/Base.h +++ b/include/cru/base/Base.h @@ -145,6 +145,9 @@ class CRU_BASE_API PlatformException : public Exception { class CRU_BASE_API ErrnoException : public Exception { public: + /** + * @brief will retrieve errno automatically. + */ ErrnoException(); explicit ErrnoException(int error_code); /** diff --git a/include/cru/base/Buffer.h b/include/cru/base/Buffer.h deleted file mode 100644 index 17da4ada..00000000 --- a/include/cru/base/Buffer.h +++ /dev/null @@ -1,161 +0,0 @@ -#pragma once - -#include "Base.h" - -namespace cru { -class CRU_BASE_API Buffer final { - friend CRU_BASE_API void swap(Buffer& left, Buffer& right) noexcept; - - public: - Buffer(); - explicit Buffer(Index size); - - Buffer(const Buffer& other); - Buffer(Buffer&& other) noexcept; - - Buffer& operator=(const Buffer& other); - Buffer& operator=(Buffer&& other) noexcept; - - ~Buffer(); - - public: - Index GetBufferSize() const { return size_; } - Index GetUsedSize() const { return used_end_ - used_begin_; } - bool IsNull() const { return ptr_ == nullptr; } - bool IsUsedReachEnd() const { return used_end_ == size_; } - - Index GetFrontFree() const { return used_begin_; } - Index GetBackFree() const { return size_ - used_end_; } - - Index GetUsedBegin() const { return used_begin_; } - Index GetUsedEnd() const { return used_end_; } - - std::byte* GetPtr() { return GetPtrAt(0); } - const std::byte* GetPtr() const { return GetPtrAt(0); } - - std::byte* GetPtrAt(Index index) { return ptr_ + index; } - const std::byte* GetPtrAt(Index index) const { return ptr_ + index; } - - std::byte& GetRefAt(Index index) { return *GetPtrAt(index); } - const std::byte& GetRefAt(Index index) const { return *GetPtrAt(index); } - - std::byte* GetUsedBeginPtr() { return GetPtrAt(GetUsedBegin()); } - const std::byte* GetUsedBeginPtr() const { return GetPtrAt(GetUsedBegin()); } - std::byte* GetUsedEndPtr() { return GetPtrAt(GetUsedEnd()); } - const std::byte* GetUsedEndPtr() const { return GetPtrAt(GetUsedEnd()); } - - std::byte GetByteAt(Index index) const { return ptr_[index]; } - void SetByteAt(Index index, std::byte value) { ptr_[index] = value; } - - void AssignBytes(std::byte* src, Index src_size, bool use_memmove = false) { - return AssignBytes(0, src, 0, src_size, use_memmove); - } - void AssignBytes(Index dst_offset, std::byte* src, Index src_size, - bool use_memmove = false) { - return AssignBytes(dst_offset, src, 0, src_size, use_memmove); - } - void AssignBytes(Index dst_offset, std::byte* src, Index src_offset, - Index src_size, bool use_memmove = false); - - /** - * @brief Change the size of the buffer. - * - * Unless new size is the same as current size, the buffer is always released - * and a new one is allocated. If preserve_used is true, the used size and old - * data is copied to the new buffer. If new size is smaller than old used - * size, then exceeded data will be lost. - */ - void ResizeBuffer(Index new_size, bool preserve_used); - - /** - * @brief Append data to the front of used bytes and increase used size. - * @return The actual size of data saved. - * - * If there is no enough space left for new data, the rest space will be - * written and the size of it will be returned, leaving exceeded data not - * saved. - */ - Index PushFront(const std::byte* other, Index other_size, - bool use_memmove = false); - - bool PushBack(std::byte b); - - /** - * @brief Append data to the back of used bytes and increase used size. - * @return The actual size of data saved. - * - * If there is no enough space left for new data, the rest space will be - * written and the size of it will be returned, leaving exceeded data not - * saved. - */ - Index PushBack(const std::byte* other, Index other_size, - bool use_memmove = false); - - void PushBackCount(Index count); - - /** - * @brief Move forward the used-begin ptr. - * @return The actual size moved forward. - * - * If given size is bigger than current used size, the used size will be - * returned and set to 0. - */ - Index PopFront(Index size); - - /** - * @brief Pop front data of used bytes into another buffer. - * @return The actual size popped. - * - * If given size is bigger than current used size, then only current used size - * of bytes will be popped. If given size is smaller than current used size, - * then only given size of bytes will be popped. - */ - Index PopFront(std::byte* buffer, Index size, bool use_memmove = false); - - /** - * @brief Move backward the used-end ptr. - * @return The actual size moved backward. - * - * If given size is bigger than current used size, the used size will be - * returned and set to 0. - */ - Index PopEnd(Index size); - - /** - * @brief Pop back data of used bytes into another buffer. - * @return The actual size popped. - * - * If given size is bigger than current used size, then only current used size - * of bytes will be popped. If given size is smaller than current used size, - * then only given size of bytes will be popped. - */ - Index PopEnd(std::byte* buffer, Index size, bool use_memmove = false); - - operator std::byte*() { return GetPtr(); } - operator const std::byte*() const { return GetPtr(); } - - /** - * @brief Detach internal buffer and return it. - * @param size If not null, size of the buffer is written to it. - * @return The buffer pointer. May be nullptr. - * - * After detach, you are responsible to delete[] it. - */ - std::byte* Detach(Index* size = nullptr); - - private: - void Copy_(const Buffer& other); - void Move_(Buffer&& other) noexcept; - void Delete_() noexcept; - - void AssertValid(); - - private: - std::byte* ptr_; - Index size_; - Index used_begin_; - Index used_end_; -}; - -CRU_BASE_API void swap(Buffer& left, Buffer& right) noexcept; -} // namespace cru diff --git a/include/cru/base/io/AutoReadStream.h b/include/cru/base/io/AutoReadStream.h index 56e2beca..41ca1118 100644 --- a/include/cru/base/io/AutoReadStream.h +++ b/include/cru/base/io/AutoReadStream.h @@ -3,7 +3,6 @@ #include "BufferStream.h" #include "Stream.h" -#include <mutex> #include <thread> namespace cru::io { @@ -45,18 +44,16 @@ class CRU_BASE_API AutoReadStream : public Stream { ~AutoReadStream() override; public: - CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE - - void BeginToDrop(bool auto_close = true, bool auto_delete = true); + Stream* GetUnderlyingStream() { return stream_; } protected: + bool DoCanWrite() override; Index DoRead(std::byte* buffer, Index offset, Index size) override; Index DoWrite(const std::byte* buffer, Index offset, Index size) override; void DoFlush() override; + void DoClose() override; private: - void DoClose(); - void BackgroundThreadRun(); private: diff --git a/include/cru/base/io/BufferStream.h b/include/cru/base/io/BufferStream.h index d4ee3837..c8e8f707 100644 --- a/include/cru/base/io/BufferStream.h +++ b/include/cru/base/io/BufferStream.h @@ -1,6 +1,5 @@ #pragma once -#include "../Buffer.h" #include "Stream.h" #include <condition_variable> @@ -8,12 +7,6 @@ #include <mutex> namespace cru::io { -class WriteAfterEofException : public Exception { - public: - using Exception::Exception; - ~WriteAfterEofException() override = default; -}; - struct BufferStreamOptions { /** * Actually I have no ideas about the best value for this. May change it later @@ -51,34 +44,50 @@ struct BufferStreamOptions { }; /** - * @brief SPSC (Single Producer Single Consumer) buffer stream. - * - * If used by multiple producer or multiple consumer, the behavior is undefined. + * @brief MPMC (Multiple Producer Multiple Consumer) buffer stream. */ class BufferStream : public Stream { public: - BufferStream(const BufferStreamOptions& options); + explicit BufferStream(const BufferStreamOptions& options = {}); ~BufferStream() override; - CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE - - void SetEof(); + void WriteEof(); protected: Index DoRead(std::byte* buffer, Index offset, Index size) override; Index DoWrite(const std::byte* buffer, Index offset, Index size) override; + void DoClose() override; private: - void DoClose(); + struct Block { + std::byte* buffer; + Index size; + Index start; + Index end; + + explicit Block(Index size); + + CRU_DELETE_COPY(Block) + + Block(Block&& other) noexcept; + Block& operator=(Block&& other) noexcept; + + ~Block(); + + Index Read(std::byte* des, Index si); + Index Write(const std::byte* src, Index si); + bool IsFull() const; + bool IsEmpty() const; + }; - private: Index block_size_; Index max_block_count_; - std::list<Buffer> buffer_list_; - bool eof_; + std::list<Block> buffer_list_; + bool eof_written_; std::mutex mutex_; - std::condition_variable condition_variable_; + std::condition_variable read_cv_; + std::condition_variable write_cv_; }; } // namespace cru::io diff --git a/include/cru/base/io/CFileStream.h b/include/cru/base/io/CFileStream.h index 0b58bdc9..66745955 100644 --- a/include/cru/base/io/CFileStream.h +++ b/include/cru/base/io/CFileStream.h @@ -11,15 +11,10 @@ class CRU_BASE_API CFileStream : public Stream { explicit CFileStream(std::FILE* file, bool readable = true, bool writable = true, bool auto_close = true); - CRU_DELETE_COPY(CFileStream) - CRU_DELETE_MOVE(CFileStream) - ~CFileStream() override; public: - CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE - - std::FILE* GetHandle() const; + std::FILE* GetHandle(); protected: Index DoSeek(Index offset, SeekOrigin origin) override; @@ -28,9 +23,7 @@ class CRU_BASE_API CFileStream : public Stream { Index DoRead(std::byte* buffer, Index offset, Index size) override; Index DoWrite(const std::byte* buffer, Index offset, Index size) override; void DoFlush() override; - - private: - void DoClose(); + void DoClose() override; private: std::FILE* file_; diff --git a/include/cru/base/io/MemoryStream.h b/include/cru/base/io/MemoryStream.h index a1f90c3b..155219a8 100644 --- a/include/cru/base/io/MemoryStream.h +++ b/include/cru/base/io/MemoryStream.h @@ -14,18 +14,14 @@ class CRU_BASE_API MemoryStream : public Stream { ~MemoryStream() override; public: - void Close() override; - - std::byte* GetBuffer() const { return buffer_; } + std::byte* GetBuffer() { return buffer_; } protected: Index DoSeek(Index offset, SeekOrigin origin) override; Index DoGetSize() override { return size_; } Index DoRead(std::byte* buffer, Index offset, Index size) override; Index DoWrite(const std::byte* buffer, Index offset, Index size) override; - - private: - void DoClose(); + void DoClose() override; private: std::byte* buffer_; diff --git a/include/cru/base/io/ProxyStream.h b/include/cru/base/io/ProxyStream.h deleted file mode 100644 index 42ec9dfd..00000000 --- a/include/cru/base/io/ProxyStream.h +++ /dev/null @@ -1,42 +0,0 @@ -#pragma once - -#include "Stream.h" - -#include <functional> - -namespace cru::io { -struct ProxyStreamHandlers { - std::function<Index(Index offset, Stream::SeekOrigin origin)> seek; - std::function<Index(std::byte* buffer, Index offset, Index size)> read; - std::function<Index(const std::byte* buffer, Index offset, Index size)> write; - std::function<void()> flush; - - /** - * @brief This method will be only called once when `Close` is called or the - * stream is destructed. - */ - std::function<void()> close; -}; - -class ProxyStream : public Stream { - public: - explicit ProxyStream(ProxyStreamHandlers handlers); - - ~ProxyStream() override; - - public: - CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE - - protected: - Index DoSeek(Index offset, SeekOrigin origin) override; - Index DoRead(std::byte* buffer, Index offset, Index size) override; - Index DoWrite(const std::byte* buffer, Index offset, Index size) override; - void DoFlush() override; - - private: - void DoClose(); - - private: - ProxyStreamHandlers handlers_; -}; -} // namespace cru::io diff --git a/include/cru/base/io/Stream.h b/include/cru/base/io/Stream.h index 8c0d3669..f497d725 100644 --- a/include/cru/base/io/Stream.h +++ b/include/cru/base/io/Stream.h @@ -1,46 +1,51 @@ #pragma once #include "../Base.h" -#include "../Buffer.h" +#include "../Guard.h" // IWYU pragma: keep +#include <atomic> #include <cstddef> namespace cru::io { -class CRU_BASE_API StreamOperationNotSupportedException : public Exception { - public: - explicit StreamOperationNotSupportedException(std::string operation); +class Stream; +class CRU_BASE_API StreamException : public Exception { public: - std::string GetOperation() const { return operation_; } + explicit StreamException(Stream* stream, std::string message = "", + std::shared_ptr<std::exception> inner = nullptr); - public: - static void CheckSeek(bool seekable); - static void CheckRead(bool readable); - static void CheckWrite(bool writable); + Stream* GetStream() const { return stream_; } private: - std::string operation_; + Stream* stream_; }; -class CRU_BASE_API StreamClosedException : public Exception { +class CRU_BASE_API StreamOperationNotSupportedException + : public StreamException { public: - StreamClosedException(); + explicit StreamOperationNotSupportedException(Stream* stream, + std::string operation); - CRU_DEFAULT_DESTRUCTOR(StreamClosedException) + public: + std::string GetOperation() const { return operation_; } + + public: + static void CheckSeek(Stream* stream, bool seekable); + static void CheckRead(Stream* stream, bool readable); + static void CheckWrite(Stream* stream, bool writable); - static void Check(bool closed); + private: + std::string operation_; }; -#define CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE \ - void Close() override { DoClose(); } +class CRU_BASE_API StreamClosedException : public StreamException { + using StreamException::StreamException; +}; -#define CRU_STREAM_BEGIN_CLOSE \ - if (GetClosed()) return; \ - CloseGuard close_guard(this); +class CRU_BASE_API StreamIOException : public StreamException { + using StreamException::StreamException; +}; -/** - * All stream is thread-unsafe by default unless being documented. - */ class CRU_BASE_API Stream : public Object { protected: struct SupportedOperations { @@ -49,12 +54,6 @@ class CRU_BASE_API Stream : public Object { std::optional<bool> can_write; }; - struct CloseGuard { - explicit CloseGuard(Stream* stream) : stream(stream) {} - ~CloseGuard() { stream->SetClosed(true); } - Stream* stream; - }; - protected: explicit Stream(SupportedOperations supported_operations = {}); Stream(std::optional<bool> can_seek, std::optional<bool> can_read, @@ -62,8 +61,7 @@ class CRU_BASE_API Stream : public Object { public: enum class SeekOrigin { Current, Begin, End }; - - ~Stream() override = default; + constexpr static Index kEOF = -1; public: bool CanSeek(); @@ -85,11 +83,11 @@ class CRU_BASE_API Stream : public Object { Index Write(const char* buffer, Index size); void Flush(); - virtual void Close() = 0; - virtual Buffer ReadToEnd(Index grow_size = 256); + bool IsClosed(); + bool Close(); - // Utf8 encoding. + virtual std::vector<std::byte> ReadToEnd(Index grow_size = 256); std::string ReadToEndAsUtf8String(); protected: @@ -103,17 +101,13 @@ class CRU_BASE_API Stream : public Object { virtual Index DoRead(std::byte* buffer, Index offset, Index size); virtual Index DoWrite(const std::byte* buffer, Index offset, Index size); virtual void DoFlush(); + virtual void DoClose(); - void SetSupportedOperations(SupportedOperations supported_operations) { - supported_operations_ = std::move(supported_operations); - } - - bool GetClosed() { return closed_; } - void SetClosed(bool closed) { closed_ = closed; } - void CheckClosed() { StreamClosedException::Check(closed_); } + void SetSupportedOperations(SupportedOperations supported_operations); + void CheckClosed(); private: SupportedOperations supported_operations_; - bool closed_; + std::atomic_bool closed_; }; } // namespace cru::io diff --git a/include/cru/base/platform/unix/UnixFileStream.h b/include/cru/base/platform/unix/UnixFileStream.h index 1657bfbb..81174151 100644 --- a/include/cru/base/platform/unix/UnixFileStream.h +++ b/include/cru/base/platform/unix/UnixFileStream.h @@ -19,17 +19,13 @@ class UnixFileStream : public io::Stream { ~UnixFileStream() override; public: - CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE - int GetFileDescriptor() const { return file_descriptor_; } protected: Index DoSeek(Index offset, SeekOrigin origin = SeekOrigin::Current) override; Index DoRead(std::byte* buffer, Index offset, Index size) override; Index DoWrite(const std::byte* buffer, Index offset, Index size) override; - - private: - void DoClose(); + void DoClose() override; private: UnixFileDescriptor file_descriptor_; diff --git a/include/cru/base/platform/win/Stream.h b/include/cru/base/platform/win/Stream.h index 104b3bd7..9dd33d35 100644 --- a/include/cru/base/platform/win/Stream.h +++ b/include/cru/base/platform/win/Stream.h @@ -24,15 +24,11 @@ class CRU_BASE_API Win32HandleStream : public io::Stream { Index DoSeek(Index offset, SeekOrigin origin) override; Index DoRead(std::byte* buffer, Index offset, Index size) override; Index DoWrite(const std::byte* buffer, Index offset, Index size) override; + void DoClose() override; public: HANDLE GetHandle() { return handle_; } - CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE - - private: - void DoClose(); - private: HANDLE handle_; bool auto_close_; @@ -51,15 +47,11 @@ class CRU_BASE_API ComStream : public io::Stream { Index DoSeek(Index offset, SeekOrigin origin) override; Index DoRead(std::byte* buffer, Index offset, Index size) override; Index DoWrite(const std::byte* buffer, Index offset, Index size) override; + void DoClose() override; public: IStream* GetComStream() { return stream_; } - CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE - - private: - void DoClose(); - private: IStream* stream_; bool auto_release_; |
