aboutsummaryrefslogtreecommitdiff
path: root/include/cru/base
diff options
context:
space:
mode:
authorYuqian Yang <crupest@crupest.life>2026-03-07 20:42:37 +0800
committerYuqian Yang <crupest@crupest.life>2026-03-07 20:42:37 +0800
commit38756822825e20eca3b9e01b735946175223d692 (patch)
treefc2a495bfc0e082d5ed9a1642278ae6467fe2742 /include/cru/base
parent924f4b472712d0cfc55b81dcb3eaed3f8a478288 (diff)
downloadcru-38756822825e20eca3b9e01b735946175223d692.tar.gz
cru-38756822825e20eca3b9e01b735946175223d692.tar.bz2
cru-38756822825e20eca3b9e01b735946175223d692.zip
Refactor stream.
Diffstat (limited to 'include/cru/base')
-rw-r--r--include/cru/base/Base.h3
-rw-r--r--include/cru/base/Buffer.h161
-rw-r--r--include/cru/base/io/AutoReadStream.h9
-rw-r--r--include/cru/base/io/BufferStream.h47
-rw-r--r--include/cru/base/io/CFileStream.h11
-rw-r--r--include/cru/base/io/MemoryStream.h8
-rw-r--r--include/cru/base/io/ProxyStream.h42
-rw-r--r--include/cru/base/io/Stream.h76
-rw-r--r--include/cru/base/platform/unix/UnixFileStream.h6
-rw-r--r--include/cru/base/platform/win/Stream.h12
10 files changed, 76 insertions, 299 deletions
diff --git a/include/cru/base/Base.h b/include/cru/base/Base.h
index e62054eb..9d10a447 100644
--- a/include/cru/base/Base.h
+++ b/include/cru/base/Base.h
@@ -145,6 +145,9 @@ class CRU_BASE_API PlatformException : public Exception {
class CRU_BASE_API ErrnoException : public Exception {
public:
+ /**
+ * @brief will retrieve errno automatically.
+ */
ErrnoException();
explicit ErrnoException(int error_code);
/**
diff --git a/include/cru/base/Buffer.h b/include/cru/base/Buffer.h
deleted file mode 100644
index 17da4ada..00000000
--- a/include/cru/base/Buffer.h
+++ /dev/null
@@ -1,161 +0,0 @@
-#pragma once
-
-#include "Base.h"
-
-namespace cru {
-class CRU_BASE_API Buffer final {
- friend CRU_BASE_API void swap(Buffer& left, Buffer& right) noexcept;
-
- public:
- Buffer();
- explicit Buffer(Index size);
-
- Buffer(const Buffer& other);
- Buffer(Buffer&& other) noexcept;
-
- Buffer& operator=(const Buffer& other);
- Buffer& operator=(Buffer&& other) noexcept;
-
- ~Buffer();
-
- public:
- Index GetBufferSize() const { return size_; }
- Index GetUsedSize() const { return used_end_ - used_begin_; }
- bool IsNull() const { return ptr_ == nullptr; }
- bool IsUsedReachEnd() const { return used_end_ == size_; }
-
- Index GetFrontFree() const { return used_begin_; }
- Index GetBackFree() const { return size_ - used_end_; }
-
- Index GetUsedBegin() const { return used_begin_; }
- Index GetUsedEnd() const { return used_end_; }
-
- std::byte* GetPtr() { return GetPtrAt(0); }
- const std::byte* GetPtr() const { return GetPtrAt(0); }
-
- std::byte* GetPtrAt(Index index) { return ptr_ + index; }
- const std::byte* GetPtrAt(Index index) const { return ptr_ + index; }
-
- std::byte& GetRefAt(Index index) { return *GetPtrAt(index); }
- const std::byte& GetRefAt(Index index) const { return *GetPtrAt(index); }
-
- std::byte* GetUsedBeginPtr() { return GetPtrAt(GetUsedBegin()); }
- const std::byte* GetUsedBeginPtr() const { return GetPtrAt(GetUsedBegin()); }
- std::byte* GetUsedEndPtr() { return GetPtrAt(GetUsedEnd()); }
- const std::byte* GetUsedEndPtr() const { return GetPtrAt(GetUsedEnd()); }
-
- std::byte GetByteAt(Index index) const { return ptr_[index]; }
- void SetByteAt(Index index, std::byte value) { ptr_[index] = value; }
-
- void AssignBytes(std::byte* src, Index src_size, bool use_memmove = false) {
- return AssignBytes(0, src, 0, src_size, use_memmove);
- }
- void AssignBytes(Index dst_offset, std::byte* src, Index src_size,
- bool use_memmove = false) {
- return AssignBytes(dst_offset, src, 0, src_size, use_memmove);
- }
- void AssignBytes(Index dst_offset, std::byte* src, Index src_offset,
- Index src_size, bool use_memmove = false);
-
- /**
- * @brief Change the size of the buffer.
- *
- * Unless new size is the same as current size, the buffer is always released
- * and a new one is allocated. If preserve_used is true, the used size and old
- * data is copied to the new buffer. If new size is smaller than old used
- * size, then exceeded data will be lost.
- */
- void ResizeBuffer(Index new_size, bool preserve_used);
-
- /**
- * @brief Append data to the front of used bytes and increase used size.
- * @return The actual size of data saved.
- *
- * If there is no enough space left for new data, the rest space will be
- * written and the size of it will be returned, leaving exceeded data not
- * saved.
- */
- Index PushFront(const std::byte* other, Index other_size,
- bool use_memmove = false);
-
- bool PushBack(std::byte b);
-
- /**
- * @brief Append data to the back of used bytes and increase used size.
- * @return The actual size of data saved.
- *
- * If there is no enough space left for new data, the rest space will be
- * written and the size of it will be returned, leaving exceeded data not
- * saved.
- */
- Index PushBack(const std::byte* other, Index other_size,
- bool use_memmove = false);
-
- void PushBackCount(Index count);
-
- /**
- * @brief Move forward the used-begin ptr.
- * @return The actual size moved forward.
- *
- * If given size is bigger than current used size, the used size will be
- * returned and set to 0.
- */
- Index PopFront(Index size);
-
- /**
- * @brief Pop front data of used bytes into another buffer.
- * @return The actual size popped.
- *
- * If given size is bigger than current used size, then only current used size
- * of bytes will be popped. If given size is smaller than current used size,
- * then only given size of bytes will be popped.
- */
- Index PopFront(std::byte* buffer, Index size, bool use_memmove = false);
-
- /**
- * @brief Move backward the used-end ptr.
- * @return The actual size moved backward.
- *
- * If given size is bigger than current used size, the used size will be
- * returned and set to 0.
- */
- Index PopEnd(Index size);
-
- /**
- * @brief Pop back data of used bytes into another buffer.
- * @return The actual size popped.
- *
- * If given size is bigger than current used size, then only current used size
- * of bytes will be popped. If given size is smaller than current used size,
- * then only given size of bytes will be popped.
- */
- Index PopEnd(std::byte* buffer, Index size, bool use_memmove = false);
-
- operator std::byte*() { return GetPtr(); }
- operator const std::byte*() const { return GetPtr(); }
-
- /**
- * @brief Detach internal buffer and return it.
- * @param size If not null, size of the buffer is written to it.
- * @return The buffer pointer. May be nullptr.
- *
- * After detach, you are responsible to delete[] it.
- */
- std::byte* Detach(Index* size = nullptr);
-
- private:
- void Copy_(const Buffer& other);
- void Move_(Buffer&& other) noexcept;
- void Delete_() noexcept;
-
- void AssertValid();
-
- private:
- std::byte* ptr_;
- Index size_;
- Index used_begin_;
- Index used_end_;
-};
-
-CRU_BASE_API void swap(Buffer& left, Buffer& right) noexcept;
-} // namespace cru
diff --git a/include/cru/base/io/AutoReadStream.h b/include/cru/base/io/AutoReadStream.h
index 56e2beca..41ca1118 100644
--- a/include/cru/base/io/AutoReadStream.h
+++ b/include/cru/base/io/AutoReadStream.h
@@ -3,7 +3,6 @@
#include "BufferStream.h"
#include "Stream.h"
-#include <mutex>
#include <thread>
namespace cru::io {
@@ -45,18 +44,16 @@ class CRU_BASE_API AutoReadStream : public Stream {
~AutoReadStream() override;
public:
- CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE
-
- void BeginToDrop(bool auto_close = true, bool auto_delete = true);
+ Stream* GetUnderlyingStream() { return stream_; }
protected:
+ bool DoCanWrite() override;
Index DoRead(std::byte* buffer, Index offset, Index size) override;
Index DoWrite(const std::byte* buffer, Index offset, Index size) override;
void DoFlush() override;
+ void DoClose() override;
private:
- void DoClose();
-
void BackgroundThreadRun();
private:
diff --git a/include/cru/base/io/BufferStream.h b/include/cru/base/io/BufferStream.h
index d4ee3837..c8e8f707 100644
--- a/include/cru/base/io/BufferStream.h
+++ b/include/cru/base/io/BufferStream.h
@@ -1,6 +1,5 @@
#pragma once
-#include "../Buffer.h"
#include "Stream.h"
#include <condition_variable>
@@ -8,12 +7,6 @@
#include <mutex>
namespace cru::io {
-class WriteAfterEofException : public Exception {
- public:
- using Exception::Exception;
- ~WriteAfterEofException() override = default;
-};
-
struct BufferStreamOptions {
/**
* Actually I have no ideas about the best value for this. May change it later
@@ -51,34 +44,50 @@ struct BufferStreamOptions {
};
/**
- * @brief SPSC (Single Producer Single Consumer) buffer stream.
- *
- * If used by multiple producer or multiple consumer, the behavior is undefined.
+ * @brief MPMC (Multiple Producer Multiple Consumer) buffer stream.
*/
class BufferStream : public Stream {
public:
- BufferStream(const BufferStreamOptions& options);
+ explicit BufferStream(const BufferStreamOptions& options = {});
~BufferStream() override;
- CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE
-
- void SetEof();
+ void WriteEof();
protected:
Index DoRead(std::byte* buffer, Index offset, Index size) override;
Index DoWrite(const std::byte* buffer, Index offset, Index size) override;
+ void DoClose() override;
private:
- void DoClose();
+ struct Block {
+ std::byte* buffer;
+ Index size;
+ Index start;
+ Index end;
+
+ explicit Block(Index size);
+
+ CRU_DELETE_COPY(Block)
+
+ Block(Block&& other) noexcept;
+ Block& operator=(Block&& other) noexcept;
+
+ ~Block();
+
+ Index Read(std::byte* des, Index si);
+ Index Write(const std::byte* src, Index si);
+ bool IsFull() const;
+ bool IsEmpty() const;
+ };
- private:
Index block_size_;
Index max_block_count_;
- std::list<Buffer> buffer_list_;
- bool eof_;
+ std::list<Block> buffer_list_;
+ bool eof_written_;
std::mutex mutex_;
- std::condition_variable condition_variable_;
+ std::condition_variable read_cv_;
+ std::condition_variable write_cv_;
};
} // namespace cru::io
diff --git a/include/cru/base/io/CFileStream.h b/include/cru/base/io/CFileStream.h
index 0b58bdc9..66745955 100644
--- a/include/cru/base/io/CFileStream.h
+++ b/include/cru/base/io/CFileStream.h
@@ -11,15 +11,10 @@ class CRU_BASE_API CFileStream : public Stream {
explicit CFileStream(std::FILE* file, bool readable = true,
bool writable = true, bool auto_close = true);
- CRU_DELETE_COPY(CFileStream)
- CRU_DELETE_MOVE(CFileStream)
-
~CFileStream() override;
public:
- CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE
-
- std::FILE* GetHandle() const;
+ std::FILE* GetHandle();
protected:
Index DoSeek(Index offset, SeekOrigin origin) override;
@@ -28,9 +23,7 @@ class CRU_BASE_API CFileStream : public Stream {
Index DoRead(std::byte* buffer, Index offset, Index size) override;
Index DoWrite(const std::byte* buffer, Index offset, Index size) override;
void DoFlush() override;
-
- private:
- void DoClose();
+ void DoClose() override;
private:
std::FILE* file_;
diff --git a/include/cru/base/io/MemoryStream.h b/include/cru/base/io/MemoryStream.h
index a1f90c3b..155219a8 100644
--- a/include/cru/base/io/MemoryStream.h
+++ b/include/cru/base/io/MemoryStream.h
@@ -14,18 +14,14 @@ class CRU_BASE_API MemoryStream : public Stream {
~MemoryStream() override;
public:
- void Close() override;
-
- std::byte* GetBuffer() const { return buffer_; }
+ std::byte* GetBuffer() { return buffer_; }
protected:
Index DoSeek(Index offset, SeekOrigin origin) override;
Index DoGetSize() override { return size_; }
Index DoRead(std::byte* buffer, Index offset, Index size) override;
Index DoWrite(const std::byte* buffer, Index offset, Index size) override;
-
- private:
- void DoClose();
+ void DoClose() override;
private:
std::byte* buffer_;
diff --git a/include/cru/base/io/ProxyStream.h b/include/cru/base/io/ProxyStream.h
deleted file mode 100644
index 42ec9dfd..00000000
--- a/include/cru/base/io/ProxyStream.h
+++ /dev/null
@@ -1,42 +0,0 @@
-#pragma once
-
-#include "Stream.h"
-
-#include <functional>
-
-namespace cru::io {
-struct ProxyStreamHandlers {
- std::function<Index(Index offset, Stream::SeekOrigin origin)> seek;
- std::function<Index(std::byte* buffer, Index offset, Index size)> read;
- std::function<Index(const std::byte* buffer, Index offset, Index size)> write;
- std::function<void()> flush;
-
- /**
- * @brief This method will be only called once when `Close` is called or the
- * stream is destructed.
- */
- std::function<void()> close;
-};
-
-class ProxyStream : public Stream {
- public:
- explicit ProxyStream(ProxyStreamHandlers handlers);
-
- ~ProxyStream() override;
-
- public:
- CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE
-
- protected:
- Index DoSeek(Index offset, SeekOrigin origin) override;
- Index DoRead(std::byte* buffer, Index offset, Index size) override;
- Index DoWrite(const std::byte* buffer, Index offset, Index size) override;
- void DoFlush() override;
-
- private:
- void DoClose();
-
- private:
- ProxyStreamHandlers handlers_;
-};
-} // namespace cru::io
diff --git a/include/cru/base/io/Stream.h b/include/cru/base/io/Stream.h
index 8c0d3669..f497d725 100644
--- a/include/cru/base/io/Stream.h
+++ b/include/cru/base/io/Stream.h
@@ -1,46 +1,51 @@
#pragma once
#include "../Base.h"
-#include "../Buffer.h"
+#include "../Guard.h" // IWYU pragma: keep
+#include <atomic>
#include <cstddef>
namespace cru::io {
-class CRU_BASE_API StreamOperationNotSupportedException : public Exception {
- public:
- explicit StreamOperationNotSupportedException(std::string operation);
+class Stream;
+class CRU_BASE_API StreamException : public Exception {
public:
- std::string GetOperation() const { return operation_; }
+ explicit StreamException(Stream* stream, std::string message = "",
+ std::shared_ptr<std::exception> inner = nullptr);
- public:
- static void CheckSeek(bool seekable);
- static void CheckRead(bool readable);
- static void CheckWrite(bool writable);
+ Stream* GetStream() const { return stream_; }
private:
- std::string operation_;
+ Stream* stream_;
};
-class CRU_BASE_API StreamClosedException : public Exception {
+class CRU_BASE_API StreamOperationNotSupportedException
+ : public StreamException {
public:
- StreamClosedException();
+ explicit StreamOperationNotSupportedException(Stream* stream,
+ std::string operation);
- CRU_DEFAULT_DESTRUCTOR(StreamClosedException)
+ public:
+ std::string GetOperation() const { return operation_; }
+
+ public:
+ static void CheckSeek(Stream* stream, bool seekable);
+ static void CheckRead(Stream* stream, bool readable);
+ static void CheckWrite(Stream* stream, bool writable);
- static void Check(bool closed);
+ private:
+ std::string operation_;
};
-#define CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE \
- void Close() override { DoClose(); }
+class CRU_BASE_API StreamClosedException : public StreamException {
+ using StreamException::StreamException;
+};
-#define CRU_STREAM_BEGIN_CLOSE \
- if (GetClosed()) return; \
- CloseGuard close_guard(this);
+class CRU_BASE_API StreamIOException : public StreamException {
+ using StreamException::StreamException;
+};
-/**
- * All stream is thread-unsafe by default unless being documented.
- */
class CRU_BASE_API Stream : public Object {
protected:
struct SupportedOperations {
@@ -49,12 +54,6 @@ class CRU_BASE_API Stream : public Object {
std::optional<bool> can_write;
};
- struct CloseGuard {
- explicit CloseGuard(Stream* stream) : stream(stream) {}
- ~CloseGuard() { stream->SetClosed(true); }
- Stream* stream;
- };
-
protected:
explicit Stream(SupportedOperations supported_operations = {});
Stream(std::optional<bool> can_seek, std::optional<bool> can_read,
@@ -62,8 +61,7 @@ class CRU_BASE_API Stream : public Object {
public:
enum class SeekOrigin { Current, Begin, End };
-
- ~Stream() override = default;
+ constexpr static Index kEOF = -1;
public:
bool CanSeek();
@@ -85,11 +83,11 @@ class CRU_BASE_API Stream : public Object {
Index Write(const char* buffer, Index size);
void Flush();
- virtual void Close() = 0;
- virtual Buffer ReadToEnd(Index grow_size = 256);
+ bool IsClosed();
+ bool Close();
- // Utf8 encoding.
+ virtual std::vector<std::byte> ReadToEnd(Index grow_size = 256);
std::string ReadToEndAsUtf8String();
protected:
@@ -103,17 +101,13 @@ class CRU_BASE_API Stream : public Object {
virtual Index DoRead(std::byte* buffer, Index offset, Index size);
virtual Index DoWrite(const std::byte* buffer, Index offset, Index size);
virtual void DoFlush();
+ virtual void DoClose();
- void SetSupportedOperations(SupportedOperations supported_operations) {
- supported_operations_ = std::move(supported_operations);
- }
-
- bool GetClosed() { return closed_; }
- void SetClosed(bool closed) { closed_ = closed; }
- void CheckClosed() { StreamClosedException::Check(closed_); }
+ void SetSupportedOperations(SupportedOperations supported_operations);
+ void CheckClosed();
private:
SupportedOperations supported_operations_;
- bool closed_;
+ std::atomic_bool closed_;
};
} // namespace cru::io
diff --git a/include/cru/base/platform/unix/UnixFileStream.h b/include/cru/base/platform/unix/UnixFileStream.h
index 1657bfbb..81174151 100644
--- a/include/cru/base/platform/unix/UnixFileStream.h
+++ b/include/cru/base/platform/unix/UnixFileStream.h
@@ -19,17 +19,13 @@ class UnixFileStream : public io::Stream {
~UnixFileStream() override;
public:
- CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE
-
int GetFileDescriptor() const { return file_descriptor_; }
protected:
Index DoSeek(Index offset, SeekOrigin origin = SeekOrigin::Current) override;
Index DoRead(std::byte* buffer, Index offset, Index size) override;
Index DoWrite(const std::byte* buffer, Index offset, Index size) override;
-
- private:
- void DoClose();
+ void DoClose() override;
private:
UnixFileDescriptor file_descriptor_;
diff --git a/include/cru/base/platform/win/Stream.h b/include/cru/base/platform/win/Stream.h
index 104b3bd7..9dd33d35 100644
--- a/include/cru/base/platform/win/Stream.h
+++ b/include/cru/base/platform/win/Stream.h
@@ -24,15 +24,11 @@ class CRU_BASE_API Win32HandleStream : public io::Stream {
Index DoSeek(Index offset, SeekOrigin origin) override;
Index DoRead(std::byte* buffer, Index offset, Index size) override;
Index DoWrite(const std::byte* buffer, Index offset, Index size) override;
+ void DoClose() override;
public:
HANDLE GetHandle() { return handle_; }
- CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE
-
- private:
- void DoClose();
-
private:
HANDLE handle_;
bool auto_close_;
@@ -51,15 +47,11 @@ class CRU_BASE_API ComStream : public io::Stream {
Index DoSeek(Index offset, SeekOrigin origin) override;
Index DoRead(std::byte* buffer, Index offset, Index size) override;
Index DoWrite(const std::byte* buffer, Index offset, Index size) override;
+ void DoClose() override;
public:
IStream* GetComStream() { return stream_; }
- CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE
-
- private:
- void DoClose();
-
private:
IStream* stream_;
bool auto_release_;