aboutsummaryrefslogtreecommitdiff
path: root/src/base/io/BufferStream.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/base/io/BufferStream.cpp')
-rw-r--r--src/base/io/BufferStream.cpp135
1 files changed, 86 insertions, 49 deletions
diff --git a/src/base/io/BufferStream.cpp b/src/base/io/BufferStream.cpp
index 0dbb438b..fdcd25ff 100644
--- a/src/base/io/BufferStream.cpp
+++ b/src/base/io/BufferStream.cpp
@@ -6,35 +6,34 @@ BufferStream::BufferStream(const BufferStreamOptions& options)
: Stream(false, true, true) {
block_size_ = options.GetBlockSizeOrDefault();
max_block_count_ = options.GetMaxBlockCount();
-
- eof_ = false;
+ eof_written_ = false;
}
-BufferStream::~BufferStream() { DoClose(); }
+BufferStream::~BufferStream() {}
Index BufferStream::DoRead(std::byte* buffer, Index offset, Index size) {
std::unique_lock lock(mutex_);
- condition_variable_.wait(
- lock, [this] { return GetClosed() || !buffer_list_.empty() || eof_; });
+ read_cv_.wait(lock, [this] {
+ return eof_written_ || !buffer_list_.empty() || IsClosed();
+ });
- if (GetClosed()) {
- StreamClosedException::Check(true);
- }
+ CheckClosed();
- if (buffer_list_.empty() && eof_) {
- return 0;
+ if (buffer_list_.empty() && eof_written_) {
+ return kEOF;
}
- auto full = max_block_count_ > 0 && buffer_list_.size() == max_block_count_;
+ auto full_previously = max_block_count_ > 0 &&
+ buffer_list_.size() == max_block_count_ &&
+ buffer_list_.back().IsFull();
Index read = 0;
while (!buffer_list_.empty()) {
auto& stream_buffer = buffer_list_.front();
- auto this_read =
- stream_buffer.PopFront(buffer + offset + read, size - read);
- if (stream_buffer.GetUsedSize() == 0) {
+ auto this_read = stream_buffer.Read(buffer + offset + read, size - read);
+ if (stream_buffer.IsEmpty()) {
buffer_list_.pop_front();
}
read += this_read;
@@ -43,10 +42,8 @@ Index BufferStream::DoRead(std::byte* buffer, Index offset, Index size) {
}
}
- if (full && buffer_list_.size() < max_block_count_) {
- // By convention, there should be at most one producer waiting. So
- // notify_one and notify_all should be the same.
- condition_variable_.notify_one();
+ if (full_previously && buffer_list_.size() < max_block_count_) {
+ write_cv_.notify_all();
}
return read;
@@ -55,69 +52,109 @@ Index BufferStream::DoRead(std::byte* buffer, Index offset, Index size) {
Index BufferStream::DoWrite(const std::byte* buffer, Index offset, Index size) {
std::unique_lock lock(mutex_);
- if (eof_) {
- throw WriteAfterEofException(
- "Stream has been set eof. Can't write to it any more.");
- }
-
- condition_variable_.wait(lock, [this] {
- return GetClosed() || max_block_count_ <= 0 ||
+ write_cv_.wait(lock, [this] {
+ return eof_written_ || max_block_count_ <= 0 ||
buffer_list_.size() < max_block_count_ ||
- buffer_list_.back().GetBackFree() > 0;
+ !buffer_list_.back().IsFull() || IsClosed();
});
- if (GetClosed()) {
- StreamClosedException::Check(true);
+ CheckClosed();
+
+ if (eof_written_) {
+ throw StreamIOException(
+ this, "Stream has been set eof. Can't write to it any more.");
}
- auto empty = buffer_list_.empty();
+ auto empty_previously = buffer_list_.empty();
Index written = 0;
- if (empty) {
- buffer_list_.push_back(Buffer(block_size_));
+ if (empty_previously) {
+ buffer_list_.push_back(Block(block_size_));
}
while (true) {
- if (buffer_list_.back().GetBackFree() == 0) {
+ if (buffer_list_.back().IsFull()) {
if (max_block_count_ > 0 && buffer_list_.size() == max_block_count_) {
break;
}
- buffer_list_.push_back(Buffer(block_size_));
+ buffer_list_.push_back(Block(block_size_));
}
auto& stream_buffer = buffer_list_.back();
auto this_written =
- stream_buffer.PushBack(buffer + offset + written, size - written);
+ stream_buffer.Write(buffer + offset + written, size - written);
written += this_written;
if (written == size) {
break;
}
}
- if (empty) {
- // By convention, there should be at most one consumer waiting. So
- // notify_one and notify_all should be the same.
- condition_variable_.notify_one();
+ if (empty_previously) {
+ read_cv_.notify_all();
}
return written;
}
-void BufferStream::SetEof() {
+void BufferStream::DoClose() {
std::unique_lock lock(mutex_);
- eof_ = true;
- if (buffer_list_.empty()) {
- // By convention, there should be at most one consumer waiting. So
- // notify_one and notify_all should be the same.
- condition_variable_.notify_one();
+ buffer_list_.clear();
+ read_cv_.notify_all();
+ write_cv_.notify_all();
+}
+
+void BufferStream::WriteEof() {
+ std::unique_lock lock(mutex_);
+
+ eof_written_ = true;
+ read_cv_.notify_all();
+ write_cv_.notify_all();
+}
+
+BufferStream::Block::Block(Index size)
+ : buffer(new std::byte[size]), size(size), start(0), end(0) {}
+
+BufferStream::Block::Block(Block&& other) noexcept
+ : buffer(other.buffer),
+ size(other.size),
+ start(other.start),
+ end(other.end) {
+ other.buffer = nullptr;
+ other.size = other.start = other.end = 0;
+}
+
+BufferStream::Block& BufferStream::Block::operator=(Block&& other) noexcept {
+ if (this != &other) {
+ delete[] buffer;
+ buffer = other.buffer;
+ size = other.size;
+ start = other.start;
+ end = other.end;
+ other.buffer = nullptr;
+ other.size = other.start = other.end = 0;
}
+ return *this;
}
-void BufferStream::DoClose() {
- CRU_STREAM_BEGIN_CLOSE
- SetClosed(true);
- condition_variable_.notify_all();
- buffer_list_.clear();
+BufferStream::Block::~Block() { delete[] buffer; }
+
+Index BufferStream::Block::Read(std::byte* des, Index si) {
+ si = std::min(si, end - start);
+ std::memcpy(des, buffer + start, si);
+ start += si;
+ return si;
+}
+
+Index BufferStream::Block::Write(const std::byte* src, Index si) {
+ si = std::min(si, size - end);
+ std::memcpy(buffer + end, src, si);
+ end += si;
+ return si;
}
+
+bool BufferStream::Block::IsFull() const { return end == size; }
+
+bool BufferStream::Block::IsEmpty() const { return end == start; }
+
} // namespace cru::io