aboutsummaryrefslogtreecommitdiff
path: root/src/base/io/BufferStream.cpp
diff options
context:
space:
mode:
authorcrupest <crupest@outlook.com>2024-10-06 13:57:39 +0800
committercrupest <crupest@outlook.com>2024-10-06 13:57:39 +0800
commitdfe62dcf8bcefc523b466e127c3edc4dc2756629 (patch)
tree1c751a14ba0da07ca2ff805633f97568060aa4c9 /src/base/io/BufferStream.cpp
parentf51eb955e188858272230a990565931e7403f23b (diff)
downloadcru-dfe62dcf8bcefc523b466e127c3edc4dc2756629.tar.gz
cru-dfe62dcf8bcefc523b466e127c3edc4dc2756629.tar.bz2
cru-dfe62dcf8bcefc523b466e127c3edc4dc2756629.zip
Rename common to base.
Diffstat (limited to 'src/base/io/BufferStream.cpp')
-rw-r--r--src/base/io/BufferStream.cpp109
1 files changed, 109 insertions, 0 deletions
diff --git a/src/base/io/BufferStream.cpp b/src/base/io/BufferStream.cpp
new file mode 100644
index 00000000..e81731e8
--- /dev/null
+++ b/src/base/io/BufferStream.cpp
@@ -0,0 +1,109 @@
+#include "cru/base/io/BufferStream.h"
+#include "cru/base/io/Stream.h"
+
+namespace cru::io {
+BufferStream::BufferStream(const BufferStreamOptions& options)
+ : Stream(false, true, true) {
+ block_size_ = options.GetBlockSizeOrDefault();
+ max_block_count_ = options.GetMaxBlockCount();
+
+ eof_ = false;
+}
+
+BufferStream::~BufferStream() { DoClose(); }
+
+Index BufferStream::DoRead(std::byte* buffer, Index offset, Index size) {
+ std::unique_lock lock(mutex_);
+
+ condition_variable_.wait(lock,
+ [this] { return !buffer_list_.empty() || eof_; });
+
+ if (buffer_list_.empty() && eof_) {
+ return 0;
+ }
+
+ auto full = max_block_count_ > 0 && buffer_list_.size() == max_block_count_;
+
+ Index read = 0;
+
+ while (!buffer_list_.empty()) {
+ auto& stream_buffer = buffer_list_.front();
+ auto this_read =
+ stream_buffer.PopFront(buffer + offset + read, size - read);
+ if (stream_buffer.GetUsedSize() == 0) {
+ buffer_list_.pop_front();
+ }
+ read += this_read;
+ if (read == size) {
+ break;
+ }
+ }
+
+ if (full && buffer_list_.size() < max_block_count_) {
+ // By convention, there should be at most one producer waiting. So
+ // notify_one and notify_all should be the same.
+ condition_variable_.notify_one();
+ }
+
+ return read;
+}
+
+Index BufferStream::DoWrite(const std::byte* buffer, Index offset, Index size) {
+ std::unique_lock lock(mutex_);
+
+ if (eof_) {
+ throw WriteAfterEofException(
+ u"Stream has been set eof. Can't write to it any more.");
+ }
+
+ condition_variable_.wait(lock, [this] {
+ return max_block_count_ <= 0 || buffer_list_.size() < max_block_count_ ||
+ buffer_list_.back().GetBackFree() > 0;
+ });
+
+ auto empty = buffer_list_.empty();
+
+ Index written = 0;
+
+ if (empty) {
+ buffer_list_.push_back(Buffer(block_size_));
+ }
+
+ while (true) {
+ if (buffer_list_.back().GetBackFree() == 0) {
+ if (max_block_count_ > 0 && buffer_list_.size() == max_block_count_) {
+ break;
+ }
+ buffer_list_.push_back(Buffer(block_size_));
+ }
+ auto& stream_buffer = buffer_list_.back();
+ auto this_written =
+ stream_buffer.PushBack(buffer + offset + written, size - written);
+ written += this_written;
+ if (written == size) {
+ break;
+ }
+ }
+
+ if (empty) {
+ // By convention, there should be at most one consumer waiting. So
+ // notify_one and notify_all should be the same.
+ condition_variable_.notify_one();
+ }
+
+ return written;
+}
+
+void BufferStream::SetEof() {
+ std::unique_lock lock(mutex_);
+
+ eof_ = true;
+ if (buffer_list_.empty()) {
+ // By convention, there should be at most one consumer waiting. So
+ // notify_one and notify_all should be the same.
+ condition_variable_.notify_one();
+ }
+}
+
+void BufferStream::DoClose() { CRU_STREAM_BEGIN_CLOSE }
+} // namespace cru::io