aboutsummaryrefslogtreecommitdiff
path: root/src/common/io
diff options
context:
space:
mode:
Diffstat (limited to 'src/common/io')
-rw-r--r--src/common/io/AutoReadStream.cpp1
-rw-r--r--src/common/io/BufferStream.cpp91
2 files changed, 71 insertions, 21 deletions
diff --git a/src/common/io/AutoReadStream.cpp b/src/common/io/AutoReadStream.cpp
index 7cdc1268..1acf8930 100644
--- a/src/common/io/AutoReadStream.cpp
+++ b/src/common/io/AutoReadStream.cpp
@@ -31,6 +31,5 @@ void AutoReadStream::BackgroundThreadRun() {
}
stream_->Read();
}
-}
} // namespace cru::io
diff --git a/src/common/io/BufferStream.cpp b/src/common/io/BufferStream.cpp
index c08d04a5..1becc1d9 100644
--- a/src/common/io/BufferStream.cpp
+++ b/src/common/io/BufferStream.cpp
@@ -12,21 +12,16 @@ BufferStream::BufferStream(const BufferStreamOptions& options) {
eof_ = false;
}
-bool BufferStream::CanSeek() {
- CheckClosed();
- return false;
-}
+BufferStream::~BufferStream() {}
+
+bool BufferStream::CanSeek() { return false; }
Index BufferStream::Seek(Index offset, SeekOrigin origin) {
- CheckClosed();
throw StreamOperationNotSupportedException(
u"BufferStream does not support seeking.");
}
-bool BufferStream::CanRead() {
- CheckClosed();
- return true;
-}
+bool BufferStream::CanRead() { return true; }
Index BufferStream::Read(std::byte* buffer, Index offset, Index size) {
std::unique_lock lock(mutex_);
@@ -34,30 +29,86 @@ Index BufferStream::Read(std::byte* buffer, Index offset, Index size) {
condition_variable_.wait(lock,
[this] { return !buffer_list_.empty() || eof_; });
- if (eof_) {
+ if (buffer_list_.empty() && eof_) {
return 0;
}
- Index written_size = 0;
- auto current_offset = offset;
- auto rest_size = size;
+ auto full = buffer_list_.size() == block_count_limit_;
+
+ Index read = 0;
while (!buffer_list_.empty()) {
auto& stream_buffer = buffer_list_.front();
- auto this_written_size =
- stream_buffer.PopFront(buffer + current_offset, rest_size);
+ auto this_read =
+ stream_buffer.PopFront(buffer + offset + read, size - read);
if (stream_buffer.GetUsedSize() == 0) {
buffer_list_.pop_front();
}
- written_size += this_written_size;
- rest_size -= this_written_size;
- current_offset += this_written_size;
- if (rest_size == 0) {
+ read += this_read;
+ if (read == size) {
+ break;
+ }
+ }
+
+ if (full && buffer_list_.size() < block_count_limit_) {
+ // By convention, there should be at most one producer waiting. So
+ // notify_one and notify_all should be the same.
+ condition_variable_.notify_one();
+ }
+
+ return read;
+}
+
+bool BufferStream::CanWrite() { return true; }
+
+Index BufferStream::Write(const std::byte* buffer, Index offset, Index size) {
+ std::unique_lock lock(mutex_);
+
+ if (eof_) {
+ throw WriteAfterEofException(
+ u"Stream has been set eof. Can't write to it any more.");
+ }
+
+ condition_variable_.wait(lock, [this] {
+ return buffer_list_.size() < block_count_limit_ ||
+ buffer_list_.back().GetBackFree() > 0;
+ });
+
+ auto empty = buffer_list_.empty();
+
+ Index written = 0;
+
+ while (buffer_list_.size() != block_count_limit_) {
+ if (buffer_list_.back().GetBackFree() == 0) {
+ buffer_list_.push_back(Buffer(block_size_));
+ }
+ auto& stream_buffer = buffer_list_.back();
+ auto this_written =
+ stream_buffer.PushBack(buffer + offset + written, size - written);
+ written += this_written;
+ if (written == size) {
break;
}
}
- return written_size;
+ if (empty) {
+ // By convention, there should be at most one consumer waiting. So
+ // notify_one and notify_all should be the same.
+ condition_variable_.notify_one();
+ }
+
+ return written;
+}
+
+void BufferStream::SetEof() {
+ std::unique_lock lock(mutex_);
+
+ eof_ = true;
+ if (buffer_list_.empty()) {
+ // By convention, there should be at most one consumer waiting. So
+ // notify_one and notify_all should be the same.
+ condition_variable_.notify_one();
+ }
}
} // namespace cru::io