aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/cru/base/Base.h3
-rw-r--r--include/cru/base/Buffer.h161
-rw-r--r--include/cru/base/io/AutoReadStream.h9
-rw-r--r--include/cru/base/io/BufferStream.h47
-rw-r--r--include/cru/base/io/CFileStream.h11
-rw-r--r--include/cru/base/io/MemoryStream.h8
-rw-r--r--include/cru/base/io/ProxyStream.h42
-rw-r--r--include/cru/base/io/Stream.h76
-rw-r--r--include/cru/base/platform/unix/UnixFileStream.h6
-rw-r--r--include/cru/base/platform/win/Stream.h12
-rw-r--r--src/base/Buffer.cpp276
-rw-r--r--src/base/CMakeLists.txt2
-rw-r--r--src/base/io/AutoReadStream.cpp21
-rw-r--r--src/base/io/BufferStream.cpp135
-rw-r--r--src/base/io/CFileStream.cpp23
-rw-r--r--src/base/io/MemoryStream.cpp19
-rw-r--r--src/base/io/ProxyStream.cpp37
-rw-r--r--src/base/io/Stream.cpp112
-rw-r--r--src/base/platform/unix/UnixFileStream.cpp15
-rw-r--r--src/base/platform/win/BridgeComStream.cpp36
-rw-r--r--src/base/platform/win/Stream.cpp32
-rw-r--r--src/base/platform/win/Win32SubProcess.cpp6
-rw-r--r--test/base/CMakeLists.txt3
-rw-r--r--test/base/io/AutoReadStreamTest.cpp32
-rw-r--r--test/base/io/BufferStreamTest.cpp39
-rw-r--r--test/base/io/MemoryStreamTest.cpp30
-rw-r--r--test/base/platform/win/StreamTest.cpp7
27 files changed, 428 insertions, 772 deletions
diff --git a/include/cru/base/Base.h b/include/cru/base/Base.h
index e62054eb..9d10a447 100644
--- a/include/cru/base/Base.h
+++ b/include/cru/base/Base.h
@@ -145,6 +145,9 @@ class CRU_BASE_API PlatformException : public Exception {
class CRU_BASE_API ErrnoException : public Exception {
public:
+ /**
+ * @brief will retrieve errno automatically.
+ */
ErrnoException();
explicit ErrnoException(int error_code);
/**
diff --git a/include/cru/base/Buffer.h b/include/cru/base/Buffer.h
deleted file mode 100644
index 17da4ada..00000000
--- a/include/cru/base/Buffer.h
+++ /dev/null
@@ -1,161 +0,0 @@
-#pragma once
-
-#include "Base.h"
-
-namespace cru {
-class CRU_BASE_API Buffer final {
- friend CRU_BASE_API void swap(Buffer& left, Buffer& right) noexcept;
-
- public:
- Buffer();
- explicit Buffer(Index size);
-
- Buffer(const Buffer& other);
- Buffer(Buffer&& other) noexcept;
-
- Buffer& operator=(const Buffer& other);
- Buffer& operator=(Buffer&& other) noexcept;
-
- ~Buffer();
-
- public:
- Index GetBufferSize() const { return size_; }
- Index GetUsedSize() const { return used_end_ - used_begin_; }
- bool IsNull() const { return ptr_ == nullptr; }
- bool IsUsedReachEnd() const { return used_end_ == size_; }
-
- Index GetFrontFree() const { return used_begin_; }
- Index GetBackFree() const { return size_ - used_end_; }
-
- Index GetUsedBegin() const { return used_begin_; }
- Index GetUsedEnd() const { return used_end_; }
-
- std::byte* GetPtr() { return GetPtrAt(0); }
- const std::byte* GetPtr() const { return GetPtrAt(0); }
-
- std::byte* GetPtrAt(Index index) { return ptr_ + index; }
- const std::byte* GetPtrAt(Index index) const { return ptr_ + index; }
-
- std::byte& GetRefAt(Index index) { return *GetPtrAt(index); }
- const std::byte& GetRefAt(Index index) const { return *GetPtrAt(index); }
-
- std::byte* GetUsedBeginPtr() { return GetPtrAt(GetUsedBegin()); }
- const std::byte* GetUsedBeginPtr() const { return GetPtrAt(GetUsedBegin()); }
- std::byte* GetUsedEndPtr() { return GetPtrAt(GetUsedEnd()); }
- const std::byte* GetUsedEndPtr() const { return GetPtrAt(GetUsedEnd()); }
-
- std::byte GetByteAt(Index index) const { return ptr_[index]; }
- void SetByteAt(Index index, std::byte value) { ptr_[index] = value; }
-
- void AssignBytes(std::byte* src, Index src_size, bool use_memmove = false) {
- return AssignBytes(0, src, 0, src_size, use_memmove);
- }
- void AssignBytes(Index dst_offset, std::byte* src, Index src_size,
- bool use_memmove = false) {
- return AssignBytes(dst_offset, src, 0, src_size, use_memmove);
- }
- void AssignBytes(Index dst_offset, std::byte* src, Index src_offset,
- Index src_size, bool use_memmove = false);
-
- /**
- * @brief Change the size of the buffer.
- *
- * Unless new size is the same as current size, the buffer is always released
- * and a new one is allocated. If preserve_used is true, the used size and old
- * data is copied to the new buffer. If new size is smaller than old used
- * size, then exceeded data will be lost.
- */
- void ResizeBuffer(Index new_size, bool preserve_used);
-
- /**
- * @brief Append data to the front of used bytes and increase used size.
- * @return The actual size of data saved.
- *
- * If there is no enough space left for new data, the rest space will be
- * written and the size of it will be returned, leaving exceeded data not
- * saved.
- */
- Index PushFront(const std::byte* other, Index other_size,
- bool use_memmove = false);
-
- bool PushBack(std::byte b);
-
- /**
- * @brief Append data to the back of used bytes and increase used size.
- * @return The actual size of data saved.
- *
- * If there is no enough space left for new data, the rest space will be
- * written and the size of it will be returned, leaving exceeded data not
- * saved.
- */
- Index PushBack(const std::byte* other, Index other_size,
- bool use_memmove = false);
-
- void PushBackCount(Index count);
-
- /**
- * @brief Move forward the used-begin ptr.
- * @return The actual size moved forward.
- *
- * If given size is bigger than current used size, the used size will be
- * returned and set to 0.
- */
- Index PopFront(Index size);
-
- /**
- * @brief Pop front data of used bytes into another buffer.
- * @return The actual size popped.
- *
- * If given size is bigger than current used size, then only current used size
- * of bytes will be popped. If given size is smaller than current used size,
- * then only given size of bytes will be popped.
- */
- Index PopFront(std::byte* buffer, Index size, bool use_memmove = false);
-
- /**
- * @brief Move backward the used-end ptr.
- * @return The actual size moved backward.
- *
- * If given size is bigger than current used size, the used size will be
- * returned and set to 0.
- */
- Index PopEnd(Index size);
-
- /**
- * @brief Pop back data of used bytes into another buffer.
- * @return The actual size popped.
- *
- * If given size is bigger than current used size, then only current used size
- * of bytes will be popped. If given size is smaller than current used size,
- * then only given size of bytes will be popped.
- */
- Index PopEnd(std::byte* buffer, Index size, bool use_memmove = false);
-
- operator std::byte*() { return GetPtr(); }
- operator const std::byte*() const { return GetPtr(); }
-
- /**
- * @brief Detach internal buffer and return it.
- * @param size If not null, size of the buffer is written to it.
- * @return The buffer pointer. May be nullptr.
- *
- * After detach, you are responsible to delete[] it.
- */
- std::byte* Detach(Index* size = nullptr);
-
- private:
- void Copy_(const Buffer& other);
- void Move_(Buffer&& other) noexcept;
- void Delete_() noexcept;
-
- void AssertValid();
-
- private:
- std::byte* ptr_;
- Index size_;
- Index used_begin_;
- Index used_end_;
-};
-
-CRU_BASE_API void swap(Buffer& left, Buffer& right) noexcept;
-} // namespace cru
diff --git a/include/cru/base/io/AutoReadStream.h b/include/cru/base/io/AutoReadStream.h
index 56e2beca..41ca1118 100644
--- a/include/cru/base/io/AutoReadStream.h
+++ b/include/cru/base/io/AutoReadStream.h
@@ -3,7 +3,6 @@
#include "BufferStream.h"
#include "Stream.h"
-#include <mutex>
#include <thread>
namespace cru::io {
@@ -45,18 +44,16 @@ class CRU_BASE_API AutoReadStream : public Stream {
~AutoReadStream() override;
public:
- CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE
-
- void BeginToDrop(bool auto_close = true, bool auto_delete = true);
+ Stream* GetUnderlyingStream() { return stream_; }
protected:
+ bool DoCanWrite() override;
Index DoRead(std::byte* buffer, Index offset, Index size) override;
Index DoWrite(const std::byte* buffer, Index offset, Index size) override;
void DoFlush() override;
+ void DoClose() override;
private:
- void DoClose();
-
void BackgroundThreadRun();
private:
diff --git a/include/cru/base/io/BufferStream.h b/include/cru/base/io/BufferStream.h
index d4ee3837..c8e8f707 100644
--- a/include/cru/base/io/BufferStream.h
+++ b/include/cru/base/io/BufferStream.h
@@ -1,6 +1,5 @@
#pragma once
-#include "../Buffer.h"
#include "Stream.h"
#include <condition_variable>
@@ -8,12 +7,6 @@
#include <mutex>
namespace cru::io {
-class WriteAfterEofException : public Exception {
- public:
- using Exception::Exception;
- ~WriteAfterEofException() override = default;
-};
-
struct BufferStreamOptions {
/**
* Actually I have no ideas about the best value for this. May change it later
@@ -51,34 +44,50 @@ struct BufferStreamOptions {
};
/**
- * @brief SPSC (Single Producer Single Consumer) buffer stream.
- *
- * If used by multiple producer or multiple consumer, the behavior is undefined.
+ * @brief MPMC (Multiple Producer Multiple Consumer) buffer stream.
*/
class BufferStream : public Stream {
public:
- BufferStream(const BufferStreamOptions& options);
+ explicit BufferStream(const BufferStreamOptions& options = {});
~BufferStream() override;
- CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE
-
- void SetEof();
+ void WriteEof();
protected:
Index DoRead(std::byte* buffer, Index offset, Index size) override;
Index DoWrite(const std::byte* buffer, Index offset, Index size) override;
+ void DoClose() override;
private:
- void DoClose();
+ struct Block {
+ std::byte* buffer;
+ Index size;
+ Index start;
+ Index end;
+
+ explicit Block(Index size);
+
+ CRU_DELETE_COPY(Block)
+
+ Block(Block&& other) noexcept;
+ Block& operator=(Block&& other) noexcept;
+
+ ~Block();
+
+ Index Read(std::byte* des, Index si);
+ Index Write(const std::byte* src, Index si);
+ bool IsFull() const;
+ bool IsEmpty() const;
+ };
- private:
Index block_size_;
Index max_block_count_;
- std::list<Buffer> buffer_list_;
- bool eof_;
+ std::list<Block> buffer_list_;
+ bool eof_written_;
std::mutex mutex_;
- std::condition_variable condition_variable_;
+ std::condition_variable read_cv_;
+ std::condition_variable write_cv_;
};
} // namespace cru::io
diff --git a/include/cru/base/io/CFileStream.h b/include/cru/base/io/CFileStream.h
index 0b58bdc9..66745955 100644
--- a/include/cru/base/io/CFileStream.h
+++ b/include/cru/base/io/CFileStream.h
@@ -11,15 +11,10 @@ class CRU_BASE_API CFileStream : public Stream {
explicit CFileStream(std::FILE* file, bool readable = true,
bool writable = true, bool auto_close = true);
- CRU_DELETE_COPY(CFileStream)
- CRU_DELETE_MOVE(CFileStream)
-
~CFileStream() override;
public:
- CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE
-
- std::FILE* GetHandle() const;
+ std::FILE* GetHandle();
protected:
Index DoSeek(Index offset, SeekOrigin origin) override;
@@ -28,9 +23,7 @@ class CRU_BASE_API CFileStream : public Stream {
Index DoRead(std::byte* buffer, Index offset, Index size) override;
Index DoWrite(const std::byte* buffer, Index offset, Index size) override;
void DoFlush() override;
-
- private:
- void DoClose();
+ void DoClose() override;
private:
std::FILE* file_;
diff --git a/include/cru/base/io/MemoryStream.h b/include/cru/base/io/MemoryStream.h
index a1f90c3b..155219a8 100644
--- a/include/cru/base/io/MemoryStream.h
+++ b/include/cru/base/io/MemoryStream.h
@@ -14,18 +14,14 @@ class CRU_BASE_API MemoryStream : public Stream {
~MemoryStream() override;
public:
- void Close() override;
-
- std::byte* GetBuffer() const { return buffer_; }
+ std::byte* GetBuffer() { return buffer_; }
protected:
Index DoSeek(Index offset, SeekOrigin origin) override;
Index DoGetSize() override { return size_; }
Index DoRead(std::byte* buffer, Index offset, Index size) override;
Index DoWrite(const std::byte* buffer, Index offset, Index size) override;
-
- private:
- void DoClose();
+ void DoClose() override;
private:
std::byte* buffer_;
diff --git a/include/cru/base/io/ProxyStream.h b/include/cru/base/io/ProxyStream.h
deleted file mode 100644
index 42ec9dfd..00000000
--- a/include/cru/base/io/ProxyStream.h
+++ /dev/null
@@ -1,42 +0,0 @@
-#pragma once
-
-#include "Stream.h"
-
-#include <functional>
-
-namespace cru::io {
-struct ProxyStreamHandlers {
- std::function<Index(Index offset, Stream::SeekOrigin origin)> seek;
- std::function<Index(std::byte* buffer, Index offset, Index size)> read;
- std::function<Index(const std::byte* buffer, Index offset, Index size)> write;
- std::function<void()> flush;
-
- /**
- * @brief This method will be only called once when `Close` is called or the
- * stream is destructed.
- */
- std::function<void()> close;
-};
-
-class ProxyStream : public Stream {
- public:
- explicit ProxyStream(ProxyStreamHandlers handlers);
-
- ~ProxyStream() override;
-
- public:
- CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE
-
- protected:
- Index DoSeek(Index offset, SeekOrigin origin) override;
- Index DoRead(std::byte* buffer, Index offset, Index size) override;
- Index DoWrite(const std::byte* buffer, Index offset, Index size) override;
- void DoFlush() override;
-
- private:
- void DoClose();
-
- private:
- ProxyStreamHandlers handlers_;
-};
-} // namespace cru::io
diff --git a/include/cru/base/io/Stream.h b/include/cru/base/io/Stream.h
index 8c0d3669..f497d725 100644
--- a/include/cru/base/io/Stream.h
+++ b/include/cru/base/io/Stream.h
@@ -1,46 +1,51 @@
#pragma once
#include "../Base.h"
-#include "../Buffer.h"
+#include "../Guard.h" // IWYU pragma: keep
+#include <atomic>
#include <cstddef>
namespace cru::io {
-class CRU_BASE_API StreamOperationNotSupportedException : public Exception {
- public:
- explicit StreamOperationNotSupportedException(std::string operation);
+class Stream;
+class CRU_BASE_API StreamException : public Exception {
public:
- std::string GetOperation() const { return operation_; }
+ explicit StreamException(Stream* stream, std::string message = "",
+ std::shared_ptr<std::exception> inner = nullptr);
- public:
- static void CheckSeek(bool seekable);
- static void CheckRead(bool readable);
- static void CheckWrite(bool writable);
+ Stream* GetStream() const { return stream_; }
private:
- std::string operation_;
+ Stream* stream_;
};
-class CRU_BASE_API StreamClosedException : public Exception {
+class CRU_BASE_API StreamOperationNotSupportedException
+ : public StreamException {
public:
- StreamClosedException();
+ explicit StreamOperationNotSupportedException(Stream* stream,
+ std::string operation);
- CRU_DEFAULT_DESTRUCTOR(StreamClosedException)
+ public:
+ std::string GetOperation() const { return operation_; }
+
+ public:
+ static void CheckSeek(Stream* stream, bool seekable);
+ static void CheckRead(Stream* stream, bool readable);
+ static void CheckWrite(Stream* stream, bool writable);
- static void Check(bool closed);
+ private:
+ std::string operation_;
};
-#define CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE \
- void Close() override { DoClose(); }
+class CRU_BASE_API StreamClosedException : public StreamException {
+ using StreamException::StreamException;
+};
-#define CRU_STREAM_BEGIN_CLOSE \
- if (GetClosed()) return; \
- CloseGuard close_guard(this);
+class CRU_BASE_API StreamIOException : public StreamException {
+ using StreamException::StreamException;
+};
-/**
- * All stream is thread-unsafe by default unless being documented.
- */
class CRU_BASE_API Stream : public Object {
protected:
struct SupportedOperations {
@@ -49,12 +54,6 @@ class CRU_BASE_API Stream : public Object {
std::optional<bool> can_write;
};
- struct CloseGuard {
- explicit CloseGuard(Stream* stream) : stream(stream) {}
- ~CloseGuard() { stream->SetClosed(true); }
- Stream* stream;
- };
-
protected:
explicit Stream(SupportedOperations supported_operations = {});
Stream(std::optional<bool> can_seek, std::optional<bool> can_read,
@@ -62,8 +61,7 @@ class CRU_BASE_API Stream : public Object {
public:
enum class SeekOrigin { Current, Begin, End };
-
- ~Stream() override = default;
+ constexpr static Index kEOF = -1;
public:
bool CanSeek();
@@ -85,11 +83,11 @@ class CRU_BASE_API Stream : public Object {
Index Write(const char* buffer, Index size);
void Flush();
- virtual void Close() = 0;
- virtual Buffer ReadToEnd(Index grow_size = 256);
+ bool IsClosed();
+ bool Close();
- // Utf8 encoding.
+ virtual std::vector<std::byte> ReadToEnd(Index grow_size = 256);
std::string ReadToEndAsUtf8String();
protected:
@@ -103,17 +101,13 @@ class CRU_BASE_API Stream : public Object {
virtual Index DoRead(std::byte* buffer, Index offset, Index size);
virtual Index DoWrite(const std::byte* buffer, Index offset, Index size);
virtual void DoFlush();
+ virtual void DoClose();
- void SetSupportedOperations(SupportedOperations supported_operations) {
- supported_operations_ = std::move(supported_operations);
- }
-
- bool GetClosed() { return closed_; }
- void SetClosed(bool closed) { closed_ = closed; }
- void CheckClosed() { StreamClosedException::Check(closed_); }
+ void SetSupportedOperations(SupportedOperations supported_operations);
+ void CheckClosed();
private:
SupportedOperations supported_operations_;
- bool closed_;
+ std::atomic_bool closed_;
};
} // namespace cru::io
diff --git a/include/cru/base/platform/unix/UnixFileStream.h b/include/cru/base/platform/unix/UnixFileStream.h
index 1657bfbb..81174151 100644
--- a/include/cru/base/platform/unix/UnixFileStream.h
+++ b/include/cru/base/platform/unix/UnixFileStream.h
@@ -19,17 +19,13 @@ class UnixFileStream : public io::Stream {
~UnixFileStream() override;
public:
- CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE
-
int GetFileDescriptor() const { return file_descriptor_; }
protected:
Index DoSeek(Index offset, SeekOrigin origin = SeekOrigin::Current) override;
Index DoRead(std::byte* buffer, Index offset, Index size) override;
Index DoWrite(const std::byte* buffer, Index offset, Index size) override;
-
- private:
- void DoClose();
+ void DoClose() override;
private:
UnixFileDescriptor file_descriptor_;
diff --git a/include/cru/base/platform/win/Stream.h b/include/cru/base/platform/win/Stream.h
index 104b3bd7..9dd33d35 100644
--- a/include/cru/base/platform/win/Stream.h
+++ b/include/cru/base/platform/win/Stream.h
@@ -24,15 +24,11 @@ class CRU_BASE_API Win32HandleStream : public io::Stream {
Index DoSeek(Index offset, SeekOrigin origin) override;
Index DoRead(std::byte* buffer, Index offset, Index size) override;
Index DoWrite(const std::byte* buffer, Index offset, Index size) override;
+ void DoClose() override;
public:
HANDLE GetHandle() { return handle_; }
- CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE
-
- private:
- void DoClose();
-
private:
HANDLE handle_;
bool auto_close_;
@@ -51,15 +47,11 @@ class CRU_BASE_API ComStream : public io::Stream {
Index DoSeek(Index offset, SeekOrigin origin) override;
Index DoRead(std::byte* buffer, Index offset, Index size) override;
Index DoWrite(const std::byte* buffer, Index offset, Index size) override;
+ void DoClose() override;
public:
IStream* GetComStream() { return stream_; }
- CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE
-
- private:
- void DoClose();
-
private:
IStream* stream_;
bool auto_release_;
diff --git a/src/base/Buffer.cpp b/src/base/Buffer.cpp
deleted file mode 100644
index 838c6f12..00000000
--- a/src/base/Buffer.cpp
+++ /dev/null
@@ -1,276 +0,0 @@
-#include "cru/base/Buffer.h"
-
-#include <cstring>
-
-namespace cru {
-namespace {
-void CheckSize(Index size) {
- if (size < 0) {
- throw Exception("Size of buffer can't be smaller than 0.");
- }
-}
-} // namespace
-
-Buffer::Buffer() {
- ptr_ = nullptr;
- size_ = used_begin_ = used_end_ = 0;
-}
-
-Buffer::Buffer(Index size) {
- CheckSize(size);
- if (size == 0) {
- ptr_ = nullptr;
- size_ = used_begin_ = used_end_ = 0;
- } else {
- ptr_ = new std::byte[size];
- size_ = size;
- used_begin_ = used_end_ = 0;
- }
- AssertValid();
-}
-
-Buffer::Buffer(const Buffer& other) { Copy_(other); }
-
-Buffer::Buffer(Buffer&& other) noexcept { Move_(std::move(other)); }
-
-Buffer& Buffer::operator=(const Buffer& other) {
- if (this != &other) {
- Delete_();
- Copy_(other);
- }
- return *this;
-}
-
-Buffer& Buffer::operator=(Buffer&& other) noexcept {
- if (this != &other) {
- Delete_();
- Move_(std::move(other));
- }
- return *this;
-}
-
-Buffer::~Buffer() { Delete_(); }
-
-void Buffer::AssignBytes(Index dst_offset, std::byte* src, Index src_offset,
- Index src_size, bool use_memmove) {
- CheckSize(src_size);
-
- AssertValid();
-
- (use_memmove ? std::memmove : std::memcpy)(ptr_ + dst_offset,
- src + src_offset, src_size);
- AssertValid();
-}
-
-void Buffer::ResizeBuffer(Index new_size, bool preserve_used) {
- CheckSize(new_size);
-
- AssertValid();
-
- if (new_size == 0) {
- Delete_();
- ptr_ = nullptr;
- size_ = used_begin_ = used_end_ = 0;
- return;
- }
-
- auto old_ptr = ptr_;
-
- ptr_ = new std::byte[new_size];
- size_ = new_size;
- used_begin_ = std::min(new_size, used_begin_);
- used_end_ = std::min(new_size, used_end_);
-
- if (old_ptr) {
- if (preserve_used && used_begin_ < used_end_) {
- std::memcpy(ptr_ + used_begin_, old_ptr + used_begin_,
- used_end_ - used_begin_);
- }
- delete[] old_ptr;
- }
-
- AssertValid();
-}
-
-Index Buffer::PushFront(const std::byte* other, Index other_size,
- bool use_memmove) {
- CheckSize(other_size);
-
- AssertValid();
-
- auto copy_size = std::min(used_begin_, other_size);
-
- if (copy_size) {
- used_begin_ -= copy_size;
- (use_memmove ? std::memmove : std::memcpy)(ptr_ + used_begin_, other,
- copy_size);
- }
-
- AssertValid();
-
- return copy_size;
-}
-
-bool Buffer::PushBack(std::byte b) {
- AssertValid();
- if (IsUsedReachEnd()) {
- return false;
- }
- ptr_[used_end_] = b;
- used_end_++;
- AssertValid();
- return true;
-}
-
-Index Buffer::PushBack(const std::byte* other, Index other_size,
- bool use_memmove) {
- CheckSize(other_size);
-
- AssertValid();
-
- auto copy_size = std::min(size_ - used_end_, other_size);
-
- if (copy_size) {
- (use_memmove ? std::memmove : std::memcpy)(ptr_ + used_end_, other,
- copy_size);
- used_end_ += copy_size;
- }
-
- AssertValid();
-
- return copy_size;
-}
-
-void Buffer::PushBackCount(Index count) {
- if (count < 0 || count > GetBackFree()) {
- throw Exception("Count out of range in PushBackCount.");
- }
- used_end_ += count;
-}
-
-Index Buffer::PopFront(Index size) {
- CheckSize(size);
-
- AssertValid();
-
- auto move = std::min(used_begin_, size);
- used_begin_ -= move;
-
- AssertValid();
-
- return move;
-}
-
-Index Buffer::PopFront(std::byte* buffer, Index size, bool use_memmove) {
- CheckSize(size);
-
- AssertValid();
-
- auto pop_size = std::min(GetUsedSize(), size);
-
- if (pop_size) {
- used_begin_ += pop_size;
- (use_memmove ? std::memmove : std::memcpy)(
- buffer, GetUsedBeginPtr() - pop_size, pop_size);
- }
-
- AssertValid();
-
- return pop_size;
-}
-
-Index Buffer::PopEnd(Index size) {
- CheckSize(size);
-
- AssertValid();
-
- auto move = std::min(size_ - used_end_, size);
- used_end_ += move;
-
- AssertValid();
-
- return move;
-}
-
-Index Buffer::PopEnd(std::byte* buffer, Index size, bool use_memmove) {
- CheckSize(size);
-
- AssertValid();
-
- auto pop_size = std::min(GetUsedSize(), size);
-
- if (pop_size) {
- used_end_ -= pop_size;
- (use_memmove ? std::memmove : std::memcpy)(buffer, GetUsedEndPtr(),
- pop_size);
- }
-
- AssertValid();
-
- return pop_size;
-}
-
-std::byte* Buffer::Detach(Index* size) {
- AssertValid();
-
- auto ptr = this->ptr_;
- if (size) {
- *size = this->size_;
- }
- this->ptr_ = nullptr;
- this->size_ = this->used_begin_ = this->used_end_ = 0;
-
- AssertValid();
-
- return ptr;
-}
-
-void Buffer::Copy_(const Buffer& other) {
- if (other.ptr_ == nullptr) {
- ptr_ = nullptr;
- size_ = used_begin_ = used_end_ = 0;
- } else {
- ptr_ = new std::byte[other.size_];
- size_ = other.size_;
- used_begin_ = other.used_begin_;
- used_end_ = other.used_end_;
- std::memcpy(ptr_ + used_begin_, other.ptr_ + used_begin_,
- used_end_ - used_begin_);
- }
- AssertValid();
-}
-
-void Buffer::Move_(Buffer&& other) noexcept {
- ptr_ = other.ptr_;
- size_ = other.size_;
- used_begin_ = other.used_begin_;
- used_end_ = other.used_end_;
- other.ptr_ = nullptr;
- other.size_ = other.used_begin_ = other.used_end_ = 0;
- AssertValid();
-}
-
-void Buffer::Delete_() noexcept {
- if (ptr_) {
- delete[] ptr_;
- }
-}
-
-void Buffer::AssertValid() {
- assert(size_ >= 0);
- assert(used_begin_ >= 0);
- assert(used_begin_ <= size_);
- assert(used_end_ >= 0);
- assert(used_end_ <= size_);
- assert(used_end_ >= used_begin_);
- assert((ptr_ == nullptr && size_ == 0) || (ptr_ != nullptr && size_ > 0));
-}
-
-void swap(Buffer& left, Buffer& right) noexcept {
- using std::swap;
- swap(left.ptr_, right.ptr_);
- swap(left.size_, right.size_);
- swap(left.used_begin_, right.used_begin_);
- swap(left.used_end_, right.used_end_);
-}
-} // namespace cru
diff --git a/src/base/CMakeLists.txt b/src/base/CMakeLists.txt
index 0840b130..a154ebee 100644
--- a/src/base/CMakeLists.txt
+++ b/src/base/CMakeLists.txt
@@ -1,6 +1,5 @@
add_library(CruBase
Base.cpp
- Buffer.cpp
PropertyTree.cpp
StringUtil.cpp
SubProcess.cpp
@@ -9,7 +8,6 @@ add_library(CruBase
io/BufferStream.cpp
io/CFileStream.cpp
io/Stream.cpp
- io/ProxyStream.cpp
io/Resource.cpp
io/MemoryStream.cpp
log/Logger.cpp
diff --git a/src/base/io/AutoReadStream.cpp b/src/base/io/AutoReadStream.cpp
index 0c035648..61cc9f26 100644
--- a/src/base/io/AutoReadStream.cpp
+++ b/src/base/io/AutoReadStream.cpp
@@ -8,7 +8,7 @@ namespace cru::io {
AutoReadStream::AutoReadStream(Stream* stream, bool auto_close,
bool auto_delete,
const AutoReadStreamOptions& options)
- : Stream(false, true, stream->CanSeek()),
+ : Stream(false, true, std::nullopt),
auto_close_(auto_close),
auto_delete_(auto_delete) {
auto buffer_stream_options = options.GetBufferStreamOptions();
@@ -19,10 +19,15 @@ AutoReadStream::AutoReadStream(Stream* stream, bool auto_close,
}
AutoReadStream::~AutoReadStream() {
- DoClose();
+ if (auto_delete_) {
+ delete stream_;
+ }
+ buffer_stream_->Close();
background_thread_.join();
}
+bool AutoReadStream::DoCanWrite() { return stream_->CanWrite(); }
+
Index AutoReadStream::DoRead(std::byte* buffer, Index offset, Index size) {
return buffer_stream_->Read(buffer, offset, size);
}
@@ -35,14 +40,9 @@ Index AutoReadStream::DoWrite(const std::byte* buffer, Index offset,
void AutoReadStream::DoFlush() { stream_->Flush(); }
void AutoReadStream::DoClose() {
- CRU_STREAM_BEGIN_CLOSE
if (auto_close_) {
stream_->Close();
}
- if (auto_delete_) {
- delete stream_;
- stream_ = nullptr;
- }
buffer_stream_->Close();
}
@@ -51,14 +51,13 @@ void AutoReadStream::BackgroundThreadRun() {
while (true) {
try {
auto read = stream_->Read(buffer.data(), buffer.size());
- if (read == 0) {
- buffer_stream_->SetEof();
+ if (read == kEOF) {
+ buffer_stream_->WriteEof();
break;
} else {
buffer_stream_->Write(buffer.data(), read);
}
- } catch (const StreamClosedException& exception) {
- buffer_stream_->Close();
+ } catch (const StreamException&) {
break;
}
}
diff --git a/src/base/io/BufferStream.cpp b/src/base/io/BufferStream.cpp
index 0dbb438b..fdcd25ff 100644
--- a/src/base/io/BufferStream.cpp
+++ b/src/base/io/BufferStream.cpp
@@ -6,35 +6,34 @@ BufferStream::BufferStream(const BufferStreamOptions& options)
: Stream(false, true, true) {
block_size_ = options.GetBlockSizeOrDefault();
max_block_count_ = options.GetMaxBlockCount();
-
- eof_ = false;
+ eof_written_ = false;
}
-BufferStream::~BufferStream() { DoClose(); }
+BufferStream::~BufferStream() {}
Index BufferStream::DoRead(std::byte* buffer, Index offset, Index size) {
std::unique_lock lock(mutex_);
- condition_variable_.wait(
- lock, [this] { return GetClosed() || !buffer_list_.empty() || eof_; });
+ read_cv_.wait(lock, [this] {
+ return eof_written_ || !buffer_list_.empty() || IsClosed();
+ });
- if (GetClosed()) {
- StreamClosedException::Check(true);
- }
+ CheckClosed();
- if (buffer_list_.empty() && eof_) {
- return 0;
+ if (buffer_list_.empty() && eof_written_) {
+ return kEOF;
}
- auto full = max_block_count_ > 0 && buffer_list_.size() == max_block_count_;
+ auto full_previously = max_block_count_ > 0 &&
+ buffer_list_.size() == max_block_count_ &&
+ buffer_list_.back().IsFull();
Index read = 0;
while (!buffer_list_.empty()) {
auto& stream_buffer = buffer_list_.front();
- auto this_read =
- stream_buffer.PopFront(buffer + offset + read, size - read);
- if (stream_buffer.GetUsedSize() == 0) {
+ auto this_read = stream_buffer.Read(buffer + offset + read, size - read);
+ if (stream_buffer.IsEmpty()) {
buffer_list_.pop_front();
}
read += this_read;
@@ -43,10 +42,8 @@ Index BufferStream::DoRead(std::byte* buffer, Index offset, Index size) {
}
}
- if (full && buffer_list_.size() < max_block_count_) {
- // By convention, there should be at most one producer waiting. So
- // notify_one and notify_all should be the same.
- condition_variable_.notify_one();
+ if (full_previously && buffer_list_.size() < max_block_count_) {
+ write_cv_.notify_all();
}
return read;
@@ -55,69 +52,109 @@ Index BufferStream::DoRead(std::byte* buffer, Index offset, Index size) {
Index BufferStream::DoWrite(const std::byte* buffer, Index offset, Index size) {
std::unique_lock lock(mutex_);
- if (eof_) {
- throw WriteAfterEofException(
- "Stream has been set eof. Can't write to it any more.");
- }
-
- condition_variable_.wait(lock, [this] {
- return GetClosed() || max_block_count_ <= 0 ||
+ write_cv_.wait(lock, [this] {
+ return eof_written_ || max_block_count_ <= 0 ||
buffer_list_.size() < max_block_count_ ||
- buffer_list_.back().GetBackFree() > 0;
+ !buffer_list_.back().IsFull() || IsClosed();
});
- if (GetClosed()) {
- StreamClosedException::Check(true);
+ CheckClosed();
+
+ if (eof_written_) {
+ throw StreamIOException(
+ this, "Stream has been set eof. Can't write to it any more.");
}
- auto empty = buffer_list_.empty();
+ auto empty_previously = buffer_list_.empty();
Index written = 0;
- if (empty) {
- buffer_list_.push_back(Buffer(block_size_));
+ if (empty_previously) {
+ buffer_list_.push_back(Block(block_size_));
}
while (true) {
- if (buffer_list_.back().GetBackFree() == 0) {
+ if (buffer_list_.back().IsFull()) {
if (max_block_count_ > 0 && buffer_list_.size() == max_block_count_) {
break;
}
- buffer_list_.push_back(Buffer(block_size_));
+ buffer_list_.push_back(Block(block_size_));
}
auto& stream_buffer = buffer_list_.back();
auto this_written =
- stream_buffer.PushBack(buffer + offset + written, size - written);
+ stream_buffer.Write(buffer + offset + written, size - written);
written += this_written;
if (written == size) {
break;
}
}
- if (empty) {
- // By convention, there should be at most one consumer waiting. So
- // notify_one and notify_all should be the same.
- condition_variable_.notify_one();
+ if (empty_previously) {
+ read_cv_.notify_all();
}
return written;
}
-void BufferStream::SetEof() {
+void BufferStream::DoClose() {
std::unique_lock lock(mutex_);
- eof_ = true;
- if (buffer_list_.empty()) {
- // By convention, there should be at most one consumer waiting. So
- // notify_one and notify_all should be the same.
- condition_variable_.notify_one();
+ buffer_list_.clear();
+ read_cv_.notify_all();
+ write_cv_.notify_all();
+}
+
+void BufferStream::WriteEof() {
+ std::unique_lock lock(mutex_);
+
+ eof_written_ = true;
+ read_cv_.notify_all();
+ write_cv_.notify_all();
+}
+
+BufferStream::Block::Block(Index size)
+ : buffer(new std::byte[size]), size(size), start(0), end(0) {}
+
+BufferStream::Block::Block(Block&& other) noexcept
+ : buffer(other.buffer),
+ size(other.size),
+ start(other.start),
+ end(other.end) {
+ other.buffer = nullptr;
+ other.size = other.start = other.end = 0;
+}
+
+BufferStream::Block& BufferStream::Block::operator=(Block&& other) noexcept {
+ if (this != &other) {
+ delete[] buffer;
+ buffer = other.buffer;
+ size = other.size;
+ start = other.start;
+ end = other.end;
+ other.buffer = nullptr;
+ other.size = other.start = other.end = 0;
}
+ return *this;
}
-void BufferStream::DoClose() {
- CRU_STREAM_BEGIN_CLOSE
- SetClosed(true);
- condition_variable_.notify_all();
- buffer_list_.clear();
+BufferStream::Block::~Block() { delete[] buffer; }
+
+Index BufferStream::Block::Read(std::byte* des, Index si) {
+ si = std::min(si, end - start);
+ std::memcpy(des, buffer + start, si);
+ start += si;
+ return si;
+}
+
+Index BufferStream::Block::Write(const std::byte* src, Index si) {
+ si = std::min(si, size - end);
+ std::memcpy(buffer + end, src, si);
+ end += si;
+ return si;
}
+
+bool BufferStream::Block::IsFull() const { return end == size; }
+
+bool BufferStream::Block::IsEmpty() const { return end == start; }
+
} // namespace cru::io
diff --git a/src/base/io/CFileStream.cpp b/src/base/io/CFileStream.cpp
index db477077..9632cba9 100644
--- a/src/base/io/CFileStream.cpp
+++ b/src/base/io/CFileStream.cpp
@@ -27,7 +27,8 @@ CFileStream::CFileStream(const char* path, const char* mode)
file_(std::fopen(path, mode)),
auto_close_(true) {
if (file_ == nullptr) {
- throw ErrnoException("Cannot open file.");
+ throw StreamIOException(this, "fopen failed.",
+ std::make_shared<ErrnoException>());
}
}
@@ -40,7 +41,7 @@ CFileStream::CFileStream(std::FILE* file, bool readable, bool writable,
}
CFileStream::~CFileStream() {
- if (auto_close_ && file_ != nullptr) {
+ if (file_ && auto_close_) {
std::fclose(file_);
}
}
@@ -60,7 +61,8 @@ static int ConvertOriginFlag(Stream::SeekOrigin origin) {
Index CFileStream::DoSeek(Index offset, SeekOrigin origin) {
if (std::fseek(file_, offset, ConvertOriginFlag(origin))) {
- throw ErrnoException("Seek failed.");
+ throw StreamIOException(this, "fseek failed.",
+ std::make_shared<ErrnoException>());
}
return DoTell();
}
@@ -68,7 +70,8 @@ Index CFileStream::DoSeek(Index offset, SeekOrigin origin) {
Index CFileStream::DoTell() {
long position = std::ftell(file_);
if (position == -1) {
- throw ErrnoException("Tell failed.");
+ throw StreamIOException(this, "ftell failed.",
+ std::make_shared<ErrnoException>());
}
return position;
}
@@ -77,20 +80,28 @@ void CFileStream::DoRewind() { std::rewind(file_); }
Index CFileStream::DoRead(std::byte* buffer, Index offset, Index size) {
auto count = std::fread(buffer + offset, 1, size, file_);
+ if (std::ferror(file_)) {
+ throw StreamIOException(this, "Error occurred when reading C FILE.");
+ }
+ if (count == 0 && std::feof(file_)) {
+ return kEOF;
+ }
return count;
}
Index CFileStream::DoWrite(const std::byte* buffer, Index offset, Index size) {
auto count = std::fwrite(buffer + offset, 1, size, file_);
+ if (std::ferror(file_)) {
+ throw StreamIOException(this, "Error occurred when writing C FILE.");
+ }
return count;
}
void CFileStream::DoFlush() { std::fflush(file_); }
void CFileStream::DoClose() {
- CRU_STREAM_BEGIN_CLOSE
if (auto_close_ && !std::fclose(file_)) {
- throw Exception("Failed to close FILE.");
+ throw StreamIOException(this, "fclose failed.");
}
file_ = nullptr;
}
diff --git a/src/base/io/MemoryStream.cpp b/src/base/io/MemoryStream.cpp
index 4d289197..555526cd 100644
--- a/src/base/io/MemoryStream.cpp
+++ b/src/base/io/MemoryStream.cpp
@@ -20,9 +20,11 @@ MemoryStream::MemoryStream(
}
}
-MemoryStream::~MemoryStream() {}
-
-void MemoryStream::Close() { DoClose(); }
+MemoryStream::~MemoryStream() {
+ if (buffer_ && release_func_) {
+ release_func_(buffer_, size_);
+ }
+}
Index MemoryStream::DoSeek(Index offset, SeekOrigin origin) {
switch (origin) {
@@ -40,6 +42,10 @@ Index MemoryStream::DoSeek(Index offset, SeekOrigin origin) {
}
Index MemoryStream::DoRead(std::byte* buffer, Index offset, Index size) {
+ if (position_ == size_) {
+ return kEOF;
+ }
+
if (position_ + size > size_) {
size = size_ - position_;
}
@@ -64,10 +70,11 @@ Index MemoryStream::DoWrite(const std::byte* buffer, Index offset, Index size) {
}
void MemoryStream::DoClose() {
- CRU_STREAM_BEGIN_CLOSE
- release_func_(buffer_, size_);
+ if (release_func_) {
+ release_func_(buffer_, size_);
+ release_func_ = {};
+ }
buffer_ = nullptr;
- release_func_ = {};
}
} // namespace cru::io
diff --git a/src/base/io/ProxyStream.cpp b/src/base/io/ProxyStream.cpp
deleted file mode 100644
index de66169e..00000000
--- a/src/base/io/ProxyStream.cpp
+++ /dev/null
@@ -1,37 +0,0 @@
-#include "cru/base/io/ProxyStream.h"
-#include "cru/base/io/Stream.h"
-
-namespace cru::io {
-ProxyStream::ProxyStream(ProxyStreamHandlers handlers)
- : Stream(static_cast<bool>(handlers.seek), static_cast<bool>(handlers.read),
- static_cast<bool>(handlers.write)),
- handlers_(std::move(handlers)) {}
-
-ProxyStream::~ProxyStream() { DoClose(); }
-
-Index ProxyStream::DoSeek(Index offset, SeekOrigin origin) {
- return handlers_.seek(offset, origin);
-}
-
-Index ProxyStream::DoRead(std::byte* buffer, Index offset, Index size) {
- return handlers_.read(buffer, offset, size);
-}
-
-Index ProxyStream::DoWrite(const std::byte* buffer, Index offset, Index size) {
- return handlers_.write(buffer, offset, size);
-}
-
-void ProxyStream::DoFlush() {
- if (handlers_.flush) {
- handlers_.flush();
- }
-}
-
-void ProxyStream::DoClose() {
- CRU_STREAM_BEGIN_CLOSE
- if (handlers_.close) {
- handlers_.close();
- }
- handlers_ = {};
-}
-} // namespace cru::io
diff --git a/src/base/io/Stream.cpp b/src/base/io/Stream.cpp
index c7286241..b0a9ab0c 100644
--- a/src/base/io/Stream.cpp
+++ b/src/base/io/Stream.cpp
@@ -1,33 +1,34 @@
#include "cru/base/io/Stream.h"
-#include <algorithm>
+#include <atomic>
#include <format>
-#include <iterator>
+#include <ranges>
#include <utility>
namespace cru::io {
+StreamException::StreamException(Stream* stream, std::string message,
+ std::shared_ptr<std::exception> inner)
+ : Exception(std::move(message), std::move(inner)), stream_(stream) {}
+
StreamOperationNotSupportedException::StreamOperationNotSupportedException(
- std::string operation)
- : Exception(std::format("Stream operation {} not supported.", operation)),
+ Stream* stream, std::string operation)
+ : StreamException(
+ stream, std::format("Stream operation {} not supported.", operation)),
operation_(std::move(operation)) {}
-void StreamOperationNotSupportedException::CheckSeek(bool seekable) {
- if (!seekable) throw StreamOperationNotSupportedException("seek");
-}
-
-void StreamOperationNotSupportedException::CheckRead(bool readable) {
- if (!readable) throw StreamOperationNotSupportedException("read");
+void StreamOperationNotSupportedException::CheckSeek(Stream* stream,
+ bool seekable) {
+ if (!seekable) throw StreamOperationNotSupportedException(stream, "seek");
}
-void StreamOperationNotSupportedException::CheckWrite(bool writable) {
- if (!writable) throw StreamOperationNotSupportedException("write");
+void StreamOperationNotSupportedException::CheckRead(Stream* stream,
+ bool readable) {
+ if (!readable) throw StreamOperationNotSupportedException(stream, "read");
}
-StreamClosedException::StreamClosedException()
- : Exception("Stream is already closed.") {}
-
-void StreamClosedException::Check(bool closed) {
- if (closed) throw StreamClosedException();
+void StreamOperationNotSupportedException::CheckWrite(Stream* stream,
+ bool writable) {
+ if (!writable) throw StreamOperationNotSupportedException(stream, "write");
}
Stream::Stream(SupportedOperations supported_operations)
@@ -44,7 +45,7 @@ bool Stream::CanSeek() {
Index Stream::Seek(Index offset, SeekOrigin origin) {
CheckClosed();
- StreamOperationNotSupportedException::CheckSeek(DoCanSeek());
+ StreamOperationNotSupportedException::CheckSeek(this, DoCanSeek());
return DoSeek(offset, origin);
}
@@ -70,7 +71,7 @@ bool Stream::CanRead() {
Index Stream::Read(std::byte* buffer, Index offset, Index size) {
CheckClosed();
- StreamOperationNotSupportedException::CheckRead(DoCanRead());
+ StreamOperationNotSupportedException::CheckRead(this, DoCanRead());
return DoRead(buffer, offset, size);
}
@@ -93,7 +94,7 @@ bool Stream::CanWrite() {
Index Stream::Write(const std::byte* buffer, Index offset, Index size) {
CheckClosed();
- StreamOperationNotSupportedException::CheckWrite(DoCanWrite());
+ StreamOperationNotSupportedException::CheckWrite(this, DoCanWrite());
return DoWrite(buffer, offset, size);
}
@@ -114,6 +115,46 @@ void Stream::Flush() {
DoFlush();
}
+bool Stream::IsClosed() { return closed_.load(std::memory_order_acquire); }
+
+bool Stream::Close() {
+ bool expected = false;
+ if (closed_.compare_exchange_strong(expected, true,
+ std::memory_order_acq_rel)) {
+ DoClose();
+ }
+ return expected;
+}
+
+std::vector<std::byte> Stream::ReadToEnd(Index grow_size) {
+ std::vector<std::byte> buffer;
+ Index pos = 0;
+ while (true) {
+ if (pos == buffer.size()) {
+ buffer.resize(buffer.size() + grow_size);
+ }
+
+ auto read = Read(buffer.data(), pos, buffer.size() - pos);
+ if (read == kEOF) {
+ break;
+ }
+ pos += read;
+ }
+ buffer.resize(pos);
+ return buffer;
+}
+
+std::string Stream::ReadToEndAsUtf8String() {
+ auto buffer = ReadToEnd();
+ return std::views::transform(
+ buffer, [](std::byte c) { return static_cast<char>(c); }) |
+ std::ranges::to<std::string>();
+}
+
+void Stream::SetSupportedOperations(SupportedOperations supported_operations) {
+ supported_operations_ = std::move(supported_operations);
+}
+
bool Stream::DoCanSeek() {
if (supported_operations_.can_seek) {
return *supported_operations_.can_seek;
@@ -149,17 +190,17 @@ Index Stream::DoSeek(Index offset, SeekOrigin origin) {
}
Index Stream::DoTell() {
- StreamOperationNotSupportedException::CheckSeek(DoCanSeek());
+ StreamOperationNotSupportedException::CheckSeek(this, DoCanSeek());
return DoSeek(0, SeekOrigin::Current);
}
void Stream::DoRewind() {
- StreamOperationNotSupportedException::CheckSeek(DoCanSeek());
+ StreamOperationNotSupportedException::CheckSeek(this, DoCanSeek());
DoSeek(0, SeekOrigin::Begin);
}
Index Stream::DoGetSize() {
- StreamOperationNotSupportedException::CheckSeek(DoCanSeek());
+ StreamOperationNotSupportedException::CheckSeek(this, DoCanSeek());
Index current_position = DoTell();
Seek(0, SeekOrigin::End);
Index size = DoTell();
@@ -177,27 +218,12 @@ Index Stream::DoWrite(const std::byte* buffer, Index offset, Index size) {
void Stream::DoFlush() {}
-Buffer Stream::ReadToEnd(Index grow_size) {
- Buffer buffer(grow_size);
- while (true) {
- auto read = Read(buffer.GetUsedEndPtr(), buffer.GetBackFree());
- buffer.PushBackCount(read);
- if (read == 0) {
- break;
- }
- if (buffer.IsUsedReachEnd()) {
- buffer.ResizeBuffer(buffer.GetBufferSize() + grow_size, true);
- }
+void Stream::DoClose() {}
+
+void Stream::CheckClosed() {
+ if (IsClosed()) {
+ throw StreamClosedException(this, "Stream is closed.");
}
- return buffer;
}
-std::string Stream::ReadToEndAsUtf8String() {
- auto buffer = ReadToEnd();
- std::string result;
- std::transform(buffer.GetUsedBeginPtr(), buffer.GetUsedEndPtr(),
- std::back_inserter(result),
- [](std::byte c) { return static_cast<char>(c); });
- return result;
-}
} // namespace cru::io
diff --git a/src/base/platform/unix/UnixFileStream.cpp b/src/base/platform/unix/UnixFileStream.cpp
index df1ddffa..aaaa8d6a 100644
--- a/src/base/platform/unix/UnixFileStream.cpp
+++ b/src/base/platform/unix/UnixFileStream.cpp
@@ -37,7 +37,7 @@ int MapSeekOrigin(Stream::SeekOrigin origin) {
}
} // namespace
-UnixFileStream::UnixFileStream(const char *path, int oflag, mode_t mode) {
+UnixFileStream::UnixFileStream(const char* path, int oflag, mode_t mode) {
file_descriptor_ = UnixFileDescriptor(::open(path, oflag, mode));
if (file_descriptor_ == -1) {
throw ErrnoException(std::format(
@@ -52,7 +52,7 @@ UnixFileStream::UnixFileStream(UnixFileDescriptor fd, bool can_seek,
bool can_read, bool can_write)
: Stream(can_seek, can_read, can_write), file_descriptor_(std::move(fd)) {}
-UnixFileStream::~UnixFileStream() { DoClose(); }
+UnixFileStream::~UnixFileStream() { file_descriptor_ = {}; }
Index UnixFileStream::DoSeek(Index offset, SeekOrigin origin) {
off_t result = ::lseek(file_descriptor_, offset, MapSeekOrigin(origin));
@@ -62,7 +62,7 @@ Index UnixFileStream::DoSeek(Index offset, SeekOrigin origin) {
return result;
}
-Index UnixFileStream::DoRead(std::byte *buffer, Index offset, Index size) {
+Index UnixFileStream::DoRead(std::byte* buffer, Index offset, Index size) {
auto result = ::read(file_descriptor_, buffer + offset, size);
if (result == -1) {
throw ErrnoException("Failed to read file.");
@@ -70,7 +70,7 @@ Index UnixFileStream::DoRead(std::byte *buffer, Index offset, Index size) {
return result;
}
-Index UnixFileStream::DoWrite(const std::byte *buffer, Index offset,
+Index UnixFileStream::DoWrite(const std::byte* buffer, Index offset,
Index size) {
auto result = ::write(file_descriptor_, buffer + offset, size);
if (result == -1) {
@@ -79,10 +79,5 @@ Index UnixFileStream::DoWrite(const std::byte *buffer, Index offset,
return result;
}
-void UnixFileStream::DoClose() {
- CRU_STREAM_BEGIN_CLOSE
- if (file_descriptor_) {
- file_descriptor_ = {};
- }
-}
+void UnixFileStream::DoClose() { file_descriptor_ = {}; }
} // namespace cru::platform::unix
diff --git a/src/base/platform/win/BridgeComStream.cpp b/src/base/platform/win/BridgeComStream.cpp
index c6987ab2..921209b4 100644
--- a/src/base/platform/win/BridgeComStream.cpp
+++ b/src/base/platform/win/BridgeComStream.cpp
@@ -2,7 +2,7 @@
#include "cru/base/io/Stream.h"
namespace cru::platform::win {
-BridgeComStream::BridgeComStream(io::Stream *stream)
+BridgeComStream::BridgeComStream(io::Stream* stream)
: stream_(stream), ref_count_(1) {}
BridgeComStream::~BridgeComStream() {}
@@ -20,17 +20,17 @@ ULONG BridgeComStream::Release() {
return ref_count_;
}
-HRESULT BridgeComStream::QueryInterface(const IID &riid, void **ppvObject) {
+HRESULT BridgeComStream::QueryInterface(const IID& riid, void** ppvObject) {
if (riid == IID_IStream) {
- *ppvObject = static_cast<IStream *>(this);
+ *ppvObject = static_cast<IStream*>(this);
AddRef();
return S_OK;
} else if (riid == IID_ISequentialStream) {
- *ppvObject = static_cast<ISequentialStream *>(this);
+ *ppvObject = static_cast<ISequentialStream*>(this);
AddRef();
return S_OK;
} else if (riid == IID_IUnknown) {
- *ppvObject = static_cast<IUnknown *>(this);
+ *ppvObject = static_cast<IUnknown*>(this);
AddRef();
return S_OK;
} else {
@@ -38,18 +38,22 @@ HRESULT BridgeComStream::QueryInterface(const IID &riid, void **ppvObject) {
}
}
-HRESULT BridgeComStream::Read(void *pv, ULONG cb, ULONG *pcbRead) {
- *pcbRead = stream_->Read(static_cast<std::byte *>(pv), cb);
- return S_OK;
+HRESULT BridgeComStream::Read(void* pv, ULONG cb, ULONG* pcbRead) {
+ auto count = stream_->Read(static_cast<std::byte*>(pv), cb);
+ if (count == cru::io::Stream::kEOF) {
+ count = 0;
+ }
+ *pcbRead = count;
+ return cb == count ? S_OK : S_FALSE;
}
-HRESULT BridgeComStream::Write(const void *pv, ULONG cb, ULONG *pcbWritten) {
- *pcbWritten = stream_->Write(static_cast<const std::byte *>(pv), cb);
+HRESULT BridgeComStream::Write(const void* pv, ULONG cb, ULONG* pcbWritten) {
+ *pcbWritten = stream_->Write(static_cast<const std::byte*>(pv), cb);
return S_OK;
}
HRESULT BridgeComStream::Seek(LARGE_INTEGER dlibMove, DWORD dwOrigin,
- ULARGE_INTEGER *plibNewPosition) {
+ ULARGE_INTEGER* plibNewPosition) {
io::Stream::SeekOrigin so;
switch (dwOrigin) {
@@ -74,9 +78,9 @@ HRESULT BridgeComStream::SetSize(ULARGE_INTEGER libNewSize) {
return E_NOTIMPL;
}
-HRESULT BridgeComStream::CopyTo(IStream *pstm, ULARGE_INTEGER cb,
- ULARGE_INTEGER *pcbRead,
- ULARGE_INTEGER *pcbWritten) {
+HRESULT BridgeComStream::CopyTo(IStream* pstm, ULARGE_INTEGER cb,
+ ULARGE_INTEGER* pcbRead,
+ ULARGE_INTEGER* pcbWritten) {
return E_NOTIMPL;
}
@@ -94,11 +98,11 @@ HRESULT BridgeComStream::UnlockRegion(ULARGE_INTEGER libOffset,
return S_OK;
}
-HRESULT BridgeComStream::Stat(STATSTG *pstatstg, DWORD grfStatFlag) {
+HRESULT BridgeComStream::Stat(STATSTG* pstatstg, DWORD grfStatFlag) {
return E_NOTIMPL;
}
-HRESULT BridgeComStream::Clone(IStream **ppstm) {
+HRESULT BridgeComStream::Clone(IStream** ppstm) {
*ppstm = new BridgeComStream(stream_);
return S_OK;
}
diff --git a/src/base/platform/win/Stream.cpp b/src/base/platform/win/Stream.cpp
index 10b80a16..611b2ca3 100644
--- a/src/base/platform/win/Stream.cpp
+++ b/src/base/platform/win/Stream.cpp
@@ -77,7 +77,11 @@ Win32HandleStream::Win32HandleStream(Win32Handle&& handle, bool can_seek,
: Win32HandleStream(handle.Release(), true, can_seek, can_read, can_write) {
}
-Win32HandleStream::~Win32HandleStream() { DoClose(); }
+Win32HandleStream::~Win32HandleStream() {
+ if (handle_ && auto_close_) {
+ ::CloseHandle(handle_);
+ }
+}
Index Win32HandleStream::DoSeek(Index offset, SeekOrigin origin) {
DWORD method = 0;
@@ -104,10 +108,14 @@ Index Win32HandleStream::DoRead(std::byte* buffer, Index offset, Index size) {
&real_read, nullptr);
if (r == FALSE) {
auto e = ::GetLastError();
- if (e != ERROR_BROKEN_PIPE || e != ERROR_BROKEN_PIPE) {
+ if (e != ERROR_BROKEN_PIPE && e != ERROR_MORE_DATA) {
throw Win32Error(e, "Failed to call ReadFile.");
}
}
+
+ if (real_read == 0) {
+ return kEOF;
+ }
return real_read;
}
@@ -120,13 +128,10 @@ Index Win32HandleStream::DoWrite(const std::byte* buffer, Index offset,
}
void Win32HandleStream::DoClose() {
- CRU_STREAM_BEGIN_CLOSE
-
if (auto_close_) {
::CloseHandle(handle_);
}
-
- handle_ = {};
+ handle_ = nullptr;
}
IStream* ToComStream(io::Stream* stream) {
@@ -185,7 +190,11 @@ ComStream::ComStream(IStream* com_stream, bool auto_release, bool can_seek,
stream_(com_stream),
auto_release_(auto_release) {}
-ComStream::~ComStream() { DoClose(); }
+ComStream::~ComStream() {
+ if (stream_ && auto_release_) {
+ stream_->Release();
+ }
+}
Index ComStream::DoSeek(Index offset, SeekOrigin origin) {
DWORD dwOrigin = 0;
@@ -210,6 +219,9 @@ Index ComStream::DoSeek(Index offset, SeekOrigin origin) {
Index ComStream::DoRead(std::byte* buffer, Index offset, Index size) {
ULONG n_read;
CheckHResult(stream_->Read(buffer + offset, size, &n_read));
+ if (n_read == 0) {
+ return kEOF;
+ }
return n_read;
}
@@ -225,11 +237,9 @@ Index ComStream::DoWrite(const std::byte* buffer, Index offset, Index size) {
}
void ComStream::DoClose() {
- CRU_STREAM_BEGIN_CLOSE
-
- if (stream_ && auto_release_) {
+ if (auto_release_) {
stream_->Release();
- stream_ = nullptr;
}
+ stream_ = nullptr;
}
} // namespace cru::platform::win
diff --git a/src/base/platform/win/Win32SubProcess.cpp b/src/base/platform/win/Win32SubProcess.cpp
index aed3937c..4f268ef3 100644
--- a/src/base/platform/win/Win32SubProcess.cpp
+++ b/src/base/platform/win/Win32SubProcess.cpp
@@ -1,8 +1,10 @@
#include "cru/base/platform/win/Win32SubProcess.h"
-#include <processthreadsapi.h>
-#include <synchapi.h>
#include "cru/base/StringUtil.h"
#include "cru/base/SubProcess.h"
+#include "cru/base/io/AutoReadStream.h"
+
+#include <processthreadsapi.h>
+#include <synchapi.h>
#include <memory>
#include <string_view>
diff --git a/test/base/CMakeLists.txt b/test/base/CMakeLists.txt
index 52f61a8c..94022b1a 100644
--- a/test/base/CMakeLists.txt
+++ b/test/base/CMakeLists.txt
@@ -5,6 +5,9 @@ add_executable(CruBaseTest
StringUtilTest.cpp
SubProcessTest.cpp
TimerTest.cpp
+ io/AutoReadStreamTest.cpp
+ io/BufferStreamTest.cpp
+ io/MemoryStreamTest.cpp
toml/ParserTest.cpp
xml/ParserTest.cpp
)
diff --git a/test/base/io/AutoReadStreamTest.cpp b/test/base/io/AutoReadStreamTest.cpp
new file mode 100644
index 00000000..e95b7c24
--- /dev/null
+++ b/test/base/io/AutoReadStreamTest.cpp
@@ -0,0 +1,32 @@
+#include "cru/base/io/AutoReadStream.h"
+#include "cru/base/io/MemoryStream.h"
+
+#include <catch2/catch_test_macros.hpp>
+
+#include <algorithm>
+#include <cstddef>
+#include <ranges>
+
+TEST_CASE("AutoReadStream should work.", "[io][stream]") {
+ using namespace cru::io;
+
+ const int size = 100;
+ std::vector<std::byte> buffer(size);
+ buffer[1] = std::byte(0xf0);
+ buffer[2] = std::byte(0x0f);
+
+ MemoryStream underlying_stream(buffer.data(), buffer.size(), true);
+ AutoReadStream stream(&underlying_stream, true, false);
+
+ REQUIRE(stream.CanRead());
+ REQUIRE(!stream.CanWrite());
+
+ std::vector<std::byte> buffer2(size * 2);
+
+ auto read = stream.Read(buffer2.data(), buffer2.size());
+ auto read2 = stream.Read(buffer2.data(), size, 1);
+
+ REQUIRE(read == size);
+ REQUIRE(std::ranges::equal(buffer, buffer2 | std::views::take(size)));
+ REQUIRE(read2 == Stream::kEOF);
+}
diff --git a/test/base/io/BufferStreamTest.cpp b/test/base/io/BufferStreamTest.cpp
new file mode 100644
index 00000000..70780b48
--- /dev/null
+++ b/test/base/io/BufferStreamTest.cpp
@@ -0,0 +1,39 @@
+#include "cru/base/io/BufferStream.h"
+
+#include <catch2/catch_test_macros.hpp>
+
+#include <algorithm>
+#include <cstddef>
+#include <ranges>
+#include <thread>
+
+TEST_CASE("BufferStream should work.", "[io][stream]") {
+ using namespace cru::io;
+
+ const int size = 100;
+ std::vector<std::byte> buffer(size);
+ buffer[1] = std::byte(0xf0);
+ buffer[2] = std::byte(0x0f);
+
+ BufferStream stream;
+
+ std::vector<std::byte> buffer2(size * 2);
+ cru::Index read, read2;
+
+ std::thread read_thread([&] {
+ read = stream.Read(buffer2.data(), buffer2.size());
+ read2 = stream.Read(buffer2.data(), size, 1);
+ });
+
+ std::thread write_thread([&] {
+ stream.Write(buffer.data(), buffer.size());
+ stream.WriteEof();
+ });
+
+ read_thread.join();
+ write_thread.join();
+
+ REQUIRE(read == size);
+ REQUIRE(std::ranges::equal(buffer, buffer2 | std::views::take(size)));
+ REQUIRE(read2 == Stream::kEOF);
+}
diff --git a/test/base/io/MemoryStreamTest.cpp b/test/base/io/MemoryStreamTest.cpp
new file mode 100644
index 00000000..684971b1
--- /dev/null
+++ b/test/base/io/MemoryStreamTest.cpp
@@ -0,0 +1,30 @@
+#include "cru/base/io/MemoryStream.h"
+#include "cru/base/io/Stream.h"
+
+#include <catch2/catch_test_macros.hpp>
+
+#include <algorithm>
+#include <cstddef>
+#include <ranges>
+
+TEST_CASE("MemoryStream should work.", "[io][stream]") {
+ using namespace cru::io;
+ const int size = 100;
+ std::vector<std::byte> buffer(size);
+
+ buffer[1] = std::byte(0xf0);
+ buffer[2] = std::byte(0x0f);
+
+ MemoryStream stream(buffer.data(), size, true);
+
+ REQUIRE(stream.CanRead());
+ REQUIRE(!stream.CanWrite());
+
+ std::vector<std::byte> buffer2(size * 2);
+
+ auto read = stream.Read(buffer2.data(), buffer2.size());
+ auto read2 = stream.Read(buffer2.data(), 1);
+ REQUIRE(read == size);
+ REQUIRE(std::ranges::equal(buffer, buffer2 | std::views::take(size)));
+ REQUIRE(read2 == Stream::kEOF);
+}
diff --git a/test/base/platform/win/StreamTest.cpp b/test/base/platform/win/StreamTest.cpp
index f42fc5c2..56761b14 100644
--- a/test/base/platform/win/StreamTest.cpp
+++ b/test/base/platform/win/StreamTest.cpp
@@ -1,3 +1,4 @@
+#include "cru/base/Guard.h"
#include "cru/base/StringUtil.h"
#include "cru/base/platform/win/Stream.h"
@@ -15,6 +16,7 @@ TEST_CASE("StreamConvert FileStreamWork", "[stream]") {
(std::filesystem::temp_directory_path() / "cru_test_temp.XXXXXX")
.native();
_wmktemp(temp_file_path.data());
+ Guard _([temp_file_path] { std::filesystem::remove(temp_file_path); });
std::string path = string::ToUtf8String(temp_file_path);
@@ -32,8 +34,6 @@ TEST_CASE("StreamConvert FileStreamWork", "[stream]") {
REQUIRE(std::string_view(buffer.get(), 3) == "abc");
com_stream->Release();
file2.Close();
-
- std::filesystem::remove(temp_file_path);
}
TEST_CASE("ComStream Work", "[stream]") {
@@ -45,6 +45,7 @@ TEST_CASE("ComStream Work", "[stream]") {
(std::filesystem::temp_directory_path() / "cru_test_temp.XXXXXX")
.native();
_wmktemp(temp_file_path.data());
+ Guard _([temp_file_path] { std::filesystem::remove(temp_file_path); });
std::string path = string::ToUtf8String(temp_file_path);
@@ -62,6 +63,4 @@ TEST_CASE("ComStream Work", "[stream]") {
REQUIRE(std::string_view(reinterpret_cast<const char*>(buffer.get()), 3) ==
"abc");
file2.Close();
-
- std::filesystem::remove(temp_file_path);
}