aboutsummaryrefslogtreecommitdiff
path: root/src/base/io
diff options
context:
space:
mode:
Diffstat (limited to 'src/base/io')
-rw-r--r--src/base/io/AutoReadStream.cpp21
-rw-r--r--src/base/io/BufferStream.cpp135
-rw-r--r--src/base/io/CFileStream.cpp23
-rw-r--r--src/base/io/MemoryStream.cpp19
-rw-r--r--src/base/io/ProxyStream.cpp37
-rw-r--r--src/base/io/Stream.cpp112
6 files changed, 195 insertions, 152 deletions
diff --git a/src/base/io/AutoReadStream.cpp b/src/base/io/AutoReadStream.cpp
index 0c035648..61cc9f26 100644
--- a/src/base/io/AutoReadStream.cpp
+++ b/src/base/io/AutoReadStream.cpp
@@ -8,7 +8,7 @@ namespace cru::io {
AutoReadStream::AutoReadStream(Stream* stream, bool auto_close,
bool auto_delete,
const AutoReadStreamOptions& options)
- : Stream(false, true, stream->CanSeek()),
+ : Stream(false, true, std::nullopt),
auto_close_(auto_close),
auto_delete_(auto_delete) {
auto buffer_stream_options = options.GetBufferStreamOptions();
@@ -19,10 +19,15 @@ AutoReadStream::AutoReadStream(Stream* stream, bool auto_close,
}
AutoReadStream::~AutoReadStream() {
- DoClose();
+ if (auto_delete_) {
+ delete stream_;
+ }
+ buffer_stream_->Close();
background_thread_.join();
}
+bool AutoReadStream::DoCanWrite() { return stream_->CanWrite(); }
+
Index AutoReadStream::DoRead(std::byte* buffer, Index offset, Index size) {
return buffer_stream_->Read(buffer, offset, size);
}
@@ -35,14 +40,9 @@ Index AutoReadStream::DoWrite(const std::byte* buffer, Index offset,
void AutoReadStream::DoFlush() { stream_->Flush(); }
void AutoReadStream::DoClose() {
- CRU_STREAM_BEGIN_CLOSE
if (auto_close_) {
stream_->Close();
}
- if (auto_delete_) {
- delete stream_;
- stream_ = nullptr;
- }
buffer_stream_->Close();
}
@@ -51,14 +51,13 @@ void AutoReadStream::BackgroundThreadRun() {
while (true) {
try {
auto read = stream_->Read(buffer.data(), buffer.size());
- if (read == 0) {
- buffer_stream_->SetEof();
+ if (read == kEOF) {
+ buffer_stream_->WriteEof();
break;
} else {
buffer_stream_->Write(buffer.data(), read);
}
- } catch (const StreamClosedException& exception) {
- buffer_stream_->Close();
+ } catch (const StreamException&) {
break;
}
}
diff --git a/src/base/io/BufferStream.cpp b/src/base/io/BufferStream.cpp
index 0dbb438b..fdcd25ff 100644
--- a/src/base/io/BufferStream.cpp
+++ b/src/base/io/BufferStream.cpp
@@ -6,35 +6,34 @@ BufferStream::BufferStream(const BufferStreamOptions& options)
: Stream(false, true, true) {
block_size_ = options.GetBlockSizeOrDefault();
max_block_count_ = options.GetMaxBlockCount();
-
- eof_ = false;
+ eof_written_ = false;
}
-BufferStream::~BufferStream() { DoClose(); }
+BufferStream::~BufferStream() {}
Index BufferStream::DoRead(std::byte* buffer, Index offset, Index size) {
std::unique_lock lock(mutex_);
- condition_variable_.wait(
- lock, [this] { return GetClosed() || !buffer_list_.empty() || eof_; });
+ read_cv_.wait(lock, [this] {
+ return eof_written_ || !buffer_list_.empty() || IsClosed();
+ });
- if (GetClosed()) {
- StreamClosedException::Check(true);
- }
+ CheckClosed();
- if (buffer_list_.empty() && eof_) {
- return 0;
+ if (buffer_list_.empty() && eof_written_) {
+ return kEOF;
}
- auto full = max_block_count_ > 0 && buffer_list_.size() == max_block_count_;
+ auto full_previously = max_block_count_ > 0 &&
+ buffer_list_.size() == max_block_count_ &&
+ buffer_list_.back().IsFull();
Index read = 0;
while (!buffer_list_.empty()) {
auto& stream_buffer = buffer_list_.front();
- auto this_read =
- stream_buffer.PopFront(buffer + offset + read, size - read);
- if (stream_buffer.GetUsedSize() == 0) {
+ auto this_read = stream_buffer.Read(buffer + offset + read, size - read);
+ if (stream_buffer.IsEmpty()) {
buffer_list_.pop_front();
}
read += this_read;
@@ -43,10 +42,8 @@ Index BufferStream::DoRead(std::byte* buffer, Index offset, Index size) {
}
}
- if (full && buffer_list_.size() < max_block_count_) {
- // By convention, there should be at most one producer waiting. So
- // notify_one and notify_all should be the same.
- condition_variable_.notify_one();
+ if (full_previously && buffer_list_.size() < max_block_count_) {
+ write_cv_.notify_all();
}
return read;
@@ -55,69 +52,109 @@ Index BufferStream::DoRead(std::byte* buffer, Index offset, Index size) {
Index BufferStream::DoWrite(const std::byte* buffer, Index offset, Index size) {
std::unique_lock lock(mutex_);
- if (eof_) {
- throw WriteAfterEofException(
- "Stream has been set eof. Can't write to it any more.");
- }
-
- condition_variable_.wait(lock, [this] {
- return GetClosed() || max_block_count_ <= 0 ||
+ write_cv_.wait(lock, [this] {
+ return eof_written_ || max_block_count_ <= 0 ||
buffer_list_.size() < max_block_count_ ||
- buffer_list_.back().GetBackFree() > 0;
+ !buffer_list_.back().IsFull() || IsClosed();
});
- if (GetClosed()) {
- StreamClosedException::Check(true);
+ CheckClosed();
+
+ if (eof_written_) {
+ throw StreamIOException(
+ this, "Stream has been set eof. Can't write to it any more.");
}
- auto empty = buffer_list_.empty();
+ auto empty_previously = buffer_list_.empty();
Index written = 0;
- if (empty) {
- buffer_list_.push_back(Buffer(block_size_));
+ if (empty_previously) {
+ buffer_list_.push_back(Block(block_size_));
}
while (true) {
- if (buffer_list_.back().GetBackFree() == 0) {
+ if (buffer_list_.back().IsFull()) {
if (max_block_count_ > 0 && buffer_list_.size() == max_block_count_) {
break;
}
- buffer_list_.push_back(Buffer(block_size_));
+ buffer_list_.push_back(Block(block_size_));
}
auto& stream_buffer = buffer_list_.back();
auto this_written =
- stream_buffer.PushBack(buffer + offset + written, size - written);
+ stream_buffer.Write(buffer + offset + written, size - written);
written += this_written;
if (written == size) {
break;
}
}
- if (empty) {
- // By convention, there should be at most one consumer waiting. So
- // notify_one and notify_all should be the same.
- condition_variable_.notify_one();
+ if (empty_previously) {
+ read_cv_.notify_all();
}
return written;
}
-void BufferStream::SetEof() {
+void BufferStream::DoClose() {
std::unique_lock lock(mutex_);
- eof_ = true;
- if (buffer_list_.empty()) {
- // By convention, there should be at most one consumer waiting. So
- // notify_one and notify_all should be the same.
- condition_variable_.notify_one();
+ buffer_list_.clear();
+ read_cv_.notify_all();
+ write_cv_.notify_all();
+}
+
+void BufferStream::WriteEof() {
+ std::unique_lock lock(mutex_);
+
+ eof_written_ = true;
+ read_cv_.notify_all();
+ write_cv_.notify_all();
+}
+
+BufferStream::Block::Block(Index size)
+ : buffer(new std::byte[size]), size(size), start(0), end(0) {}
+
+BufferStream::Block::Block(Block&& other) noexcept
+ : buffer(other.buffer),
+ size(other.size),
+ start(other.start),
+ end(other.end) {
+ other.buffer = nullptr;
+ other.size = other.start = other.end = 0;
+}
+
+BufferStream::Block& BufferStream::Block::operator=(Block&& other) noexcept {
+ if (this != &other) {
+ delete[] buffer;
+ buffer = other.buffer;
+ size = other.size;
+ start = other.start;
+ end = other.end;
+ other.buffer = nullptr;
+ other.size = other.start = other.end = 0;
}
+ return *this;
}
-void BufferStream::DoClose() {
- CRU_STREAM_BEGIN_CLOSE
- SetClosed(true);
- condition_variable_.notify_all();
- buffer_list_.clear();
+BufferStream::Block::~Block() { delete[] buffer; }
+
+Index BufferStream::Block::Read(std::byte* des, Index si) {
+ si = std::min(si, end - start);
+ std::memcpy(des, buffer + start, si);
+ start += si;
+ return si;
+}
+
+Index BufferStream::Block::Write(const std::byte* src, Index si) {
+ si = std::min(si, size - end);
+ std::memcpy(buffer + end, src, si);
+ end += si;
+ return si;
}
+
+bool BufferStream::Block::IsFull() const { return end == size; }
+
+bool BufferStream::Block::IsEmpty() const { return end == start; }
+
} // namespace cru::io
diff --git a/src/base/io/CFileStream.cpp b/src/base/io/CFileStream.cpp
index db477077..9632cba9 100644
--- a/src/base/io/CFileStream.cpp
+++ b/src/base/io/CFileStream.cpp
@@ -27,7 +27,8 @@ CFileStream::CFileStream(const char* path, const char* mode)
file_(std::fopen(path, mode)),
auto_close_(true) {
if (file_ == nullptr) {
- throw ErrnoException("Cannot open file.");
+ throw StreamIOException(this, "fopen failed.",
+ std::make_shared<ErrnoException>());
}
}
@@ -40,7 +41,7 @@ CFileStream::CFileStream(std::FILE* file, bool readable, bool writable,
}
CFileStream::~CFileStream() {
- if (auto_close_ && file_ != nullptr) {
+ if (file_ && auto_close_) {
std::fclose(file_);
}
}
@@ -60,7 +61,8 @@ static int ConvertOriginFlag(Stream::SeekOrigin origin) {
Index CFileStream::DoSeek(Index offset, SeekOrigin origin) {
if (std::fseek(file_, offset, ConvertOriginFlag(origin))) {
- throw ErrnoException("Seek failed.");
+ throw StreamIOException(this, "fseek failed.",
+ std::make_shared<ErrnoException>());
}
return DoTell();
}
@@ -68,7 +70,8 @@ Index CFileStream::DoSeek(Index offset, SeekOrigin origin) {
Index CFileStream::DoTell() {
long position = std::ftell(file_);
if (position == -1) {
- throw ErrnoException("Tell failed.");
+ throw StreamIOException(this, "ftell failed.",
+ std::make_shared<ErrnoException>());
}
return position;
}
@@ -77,20 +80,28 @@ void CFileStream::DoRewind() { std::rewind(file_); }
Index CFileStream::DoRead(std::byte* buffer, Index offset, Index size) {
auto count = std::fread(buffer + offset, 1, size, file_);
+ if (std::ferror(file_)) {
+ throw StreamIOException(this, "Error occurred when reading C FILE.");
+ }
+ if (count == 0 && std::feof(file_)) {
+ return kEOF;
+ }
return count;
}
Index CFileStream::DoWrite(const std::byte* buffer, Index offset, Index size) {
auto count = std::fwrite(buffer + offset, 1, size, file_);
+ if (std::ferror(file_)) {
+ throw StreamIOException(this, "Error occurred when writing C FILE.");
+ }
return count;
}
void CFileStream::DoFlush() { std::fflush(file_); }
void CFileStream::DoClose() {
- CRU_STREAM_BEGIN_CLOSE
if (auto_close_ && !std::fclose(file_)) {
- throw Exception("Failed to close FILE.");
+ throw StreamIOException(this, "fclose failed.");
}
file_ = nullptr;
}
diff --git a/src/base/io/MemoryStream.cpp b/src/base/io/MemoryStream.cpp
index 4d289197..555526cd 100644
--- a/src/base/io/MemoryStream.cpp
+++ b/src/base/io/MemoryStream.cpp
@@ -20,9 +20,11 @@ MemoryStream::MemoryStream(
}
}
-MemoryStream::~MemoryStream() {}
-
-void MemoryStream::Close() { DoClose(); }
+MemoryStream::~MemoryStream() {
+ if (buffer_ && release_func_) {
+ release_func_(buffer_, size_);
+ }
+}
Index MemoryStream::DoSeek(Index offset, SeekOrigin origin) {
switch (origin) {
@@ -40,6 +42,10 @@ Index MemoryStream::DoSeek(Index offset, SeekOrigin origin) {
}
Index MemoryStream::DoRead(std::byte* buffer, Index offset, Index size) {
+ if (position_ == size_) {
+ return kEOF;
+ }
+
if (position_ + size > size_) {
size = size_ - position_;
}
@@ -64,10 +70,11 @@ Index MemoryStream::DoWrite(const std::byte* buffer, Index offset, Index size) {
}
void MemoryStream::DoClose() {
- CRU_STREAM_BEGIN_CLOSE
- release_func_(buffer_, size_);
+ if (release_func_) {
+ release_func_(buffer_, size_);
+ release_func_ = {};
+ }
buffer_ = nullptr;
- release_func_ = {};
}
} // namespace cru::io
diff --git a/src/base/io/ProxyStream.cpp b/src/base/io/ProxyStream.cpp
deleted file mode 100644
index de66169e..00000000
--- a/src/base/io/ProxyStream.cpp
+++ /dev/null
@@ -1,37 +0,0 @@
-#include "cru/base/io/ProxyStream.h"
-#include "cru/base/io/Stream.h"
-
-namespace cru::io {
-ProxyStream::ProxyStream(ProxyStreamHandlers handlers)
- : Stream(static_cast<bool>(handlers.seek), static_cast<bool>(handlers.read),
- static_cast<bool>(handlers.write)),
- handlers_(std::move(handlers)) {}
-
-ProxyStream::~ProxyStream() { DoClose(); }
-
-Index ProxyStream::DoSeek(Index offset, SeekOrigin origin) {
- return handlers_.seek(offset, origin);
-}
-
-Index ProxyStream::DoRead(std::byte* buffer, Index offset, Index size) {
- return handlers_.read(buffer, offset, size);
-}
-
-Index ProxyStream::DoWrite(const std::byte* buffer, Index offset, Index size) {
- return handlers_.write(buffer, offset, size);
-}
-
-void ProxyStream::DoFlush() {
- if (handlers_.flush) {
- handlers_.flush();
- }
-}
-
-void ProxyStream::DoClose() {
- CRU_STREAM_BEGIN_CLOSE
- if (handlers_.close) {
- handlers_.close();
- }
- handlers_ = {};
-}
-} // namespace cru::io
diff --git a/src/base/io/Stream.cpp b/src/base/io/Stream.cpp
index c7286241..b0a9ab0c 100644
--- a/src/base/io/Stream.cpp
+++ b/src/base/io/Stream.cpp
@@ -1,33 +1,34 @@
#include "cru/base/io/Stream.h"
-#include <algorithm>
+#include <atomic>
#include <format>
-#include <iterator>
+#include <ranges>
#include <utility>
namespace cru::io {
+StreamException::StreamException(Stream* stream, std::string message,
+ std::shared_ptr<std::exception> inner)
+ : Exception(std::move(message), std::move(inner)), stream_(stream) {}
+
StreamOperationNotSupportedException::StreamOperationNotSupportedException(
- std::string operation)
- : Exception(std::format("Stream operation {} not supported.", operation)),
+ Stream* stream, std::string operation)
+ : StreamException(
+ stream, std::format("Stream operation {} not supported.", operation)),
operation_(std::move(operation)) {}
-void StreamOperationNotSupportedException::CheckSeek(bool seekable) {
- if (!seekable) throw StreamOperationNotSupportedException("seek");
-}
-
-void StreamOperationNotSupportedException::CheckRead(bool readable) {
- if (!readable) throw StreamOperationNotSupportedException("read");
+void StreamOperationNotSupportedException::CheckSeek(Stream* stream,
+ bool seekable) {
+ if (!seekable) throw StreamOperationNotSupportedException(stream, "seek");
}
-void StreamOperationNotSupportedException::CheckWrite(bool writable) {
- if (!writable) throw StreamOperationNotSupportedException("write");
+void StreamOperationNotSupportedException::CheckRead(Stream* stream,
+ bool readable) {
+ if (!readable) throw StreamOperationNotSupportedException(stream, "read");
}
-StreamClosedException::StreamClosedException()
- : Exception("Stream is already closed.") {}
-
-void StreamClosedException::Check(bool closed) {
- if (closed) throw StreamClosedException();
+void StreamOperationNotSupportedException::CheckWrite(Stream* stream,
+ bool writable) {
+ if (!writable) throw StreamOperationNotSupportedException(stream, "write");
}
Stream::Stream(SupportedOperations supported_operations)
@@ -44,7 +45,7 @@ bool Stream::CanSeek() {
Index Stream::Seek(Index offset, SeekOrigin origin) {
CheckClosed();
- StreamOperationNotSupportedException::CheckSeek(DoCanSeek());
+ StreamOperationNotSupportedException::CheckSeek(this, DoCanSeek());
return DoSeek(offset, origin);
}
@@ -70,7 +71,7 @@ bool Stream::CanRead() {
Index Stream::Read(std::byte* buffer, Index offset, Index size) {
CheckClosed();
- StreamOperationNotSupportedException::CheckRead(DoCanRead());
+ StreamOperationNotSupportedException::CheckRead(this, DoCanRead());
return DoRead(buffer, offset, size);
}
@@ -93,7 +94,7 @@ bool Stream::CanWrite() {
Index Stream::Write(const std::byte* buffer, Index offset, Index size) {
CheckClosed();
- StreamOperationNotSupportedException::CheckWrite(DoCanWrite());
+ StreamOperationNotSupportedException::CheckWrite(this, DoCanWrite());
return DoWrite(buffer, offset, size);
}
@@ -114,6 +115,46 @@ void Stream::Flush() {
DoFlush();
}
+bool Stream::IsClosed() { return closed_.load(std::memory_order_acquire); }
+
+bool Stream::Close() {
+ bool expected = false;
+ if (closed_.compare_exchange_strong(expected, true,
+ std::memory_order_acq_rel)) {
+ DoClose();
+ }
+ return expected;
+}
+
+std::vector<std::byte> Stream::ReadToEnd(Index grow_size) {
+ std::vector<std::byte> buffer;
+ Index pos = 0;
+ while (true) {
+ if (pos == buffer.size()) {
+ buffer.resize(buffer.size() + grow_size);
+ }
+
+ auto read = Read(buffer.data(), pos, buffer.size() - pos);
+ if (read == kEOF) {
+ break;
+ }
+ pos += read;
+ }
+ buffer.resize(pos);
+ return buffer;
+}
+
+std::string Stream::ReadToEndAsUtf8String() {
+ auto buffer = ReadToEnd();
+ return std::views::transform(
+ buffer, [](std::byte c) { return static_cast<char>(c); }) |
+ std::ranges::to<std::string>();
+}
+
+void Stream::SetSupportedOperations(SupportedOperations supported_operations) {
+ supported_operations_ = std::move(supported_operations);
+}
+
bool Stream::DoCanSeek() {
if (supported_operations_.can_seek) {
return *supported_operations_.can_seek;
@@ -149,17 +190,17 @@ Index Stream::DoSeek(Index offset, SeekOrigin origin) {
}
Index Stream::DoTell() {
- StreamOperationNotSupportedException::CheckSeek(DoCanSeek());
+ StreamOperationNotSupportedException::CheckSeek(this, DoCanSeek());
return DoSeek(0, SeekOrigin::Current);
}
void Stream::DoRewind() {
- StreamOperationNotSupportedException::CheckSeek(DoCanSeek());
+ StreamOperationNotSupportedException::CheckSeek(this, DoCanSeek());
DoSeek(0, SeekOrigin::Begin);
}
Index Stream::DoGetSize() {
- StreamOperationNotSupportedException::CheckSeek(DoCanSeek());
+ StreamOperationNotSupportedException::CheckSeek(this, DoCanSeek());
Index current_position = DoTell();
Seek(0, SeekOrigin::End);
Index size = DoTell();
@@ -177,27 +218,12 @@ Index Stream::DoWrite(const std::byte* buffer, Index offset, Index size) {
void Stream::DoFlush() {}
-Buffer Stream::ReadToEnd(Index grow_size) {
- Buffer buffer(grow_size);
- while (true) {
- auto read = Read(buffer.GetUsedEndPtr(), buffer.GetBackFree());
- buffer.PushBackCount(read);
- if (read == 0) {
- break;
- }
- if (buffer.IsUsedReachEnd()) {
- buffer.ResizeBuffer(buffer.GetBufferSize() + grow_size, true);
- }
+void Stream::DoClose() {}
+
+void Stream::CheckClosed() {
+ if (IsClosed()) {
+ throw StreamClosedException(this, "Stream is closed.");
}
- return buffer;
}
-std::string Stream::ReadToEndAsUtf8String() {
- auto buffer = ReadToEnd();
- std::string result;
- std::transform(buffer.GetUsedBeginPtr(), buffer.GetUsedEndPtr(),
- std::back_inserter(result),
- [](std::byte c) { return static_cast<char>(c); });
- return result;
-}
} // namespace cru::io