diff options
Diffstat (limited to 'src/base/io/BufferStream.cpp')
| -rw-r--r-- | src/base/io/BufferStream.cpp | 135 |
1 files changed, 86 insertions, 49 deletions
diff --git a/src/base/io/BufferStream.cpp b/src/base/io/BufferStream.cpp index 0dbb438b..fdcd25ff 100644 --- a/src/base/io/BufferStream.cpp +++ b/src/base/io/BufferStream.cpp @@ -6,35 +6,34 @@ BufferStream::BufferStream(const BufferStreamOptions& options) : Stream(false, true, true) { block_size_ = options.GetBlockSizeOrDefault(); max_block_count_ = options.GetMaxBlockCount(); - - eof_ = false; + eof_written_ = false; } -BufferStream::~BufferStream() { DoClose(); } +BufferStream::~BufferStream() {} Index BufferStream::DoRead(std::byte* buffer, Index offset, Index size) { std::unique_lock lock(mutex_); - condition_variable_.wait( - lock, [this] { return GetClosed() || !buffer_list_.empty() || eof_; }); + read_cv_.wait(lock, [this] { + return eof_written_ || !buffer_list_.empty() || IsClosed(); + }); - if (GetClosed()) { - StreamClosedException::Check(true); - } + CheckClosed(); - if (buffer_list_.empty() && eof_) { - return 0; + if (buffer_list_.empty() && eof_written_) { + return kEOF; } - auto full = max_block_count_ > 0 && buffer_list_.size() == max_block_count_; + auto full_previously = max_block_count_ > 0 && + buffer_list_.size() == max_block_count_ && + buffer_list_.back().IsFull(); Index read = 0; while (!buffer_list_.empty()) { auto& stream_buffer = buffer_list_.front(); - auto this_read = - stream_buffer.PopFront(buffer + offset + read, size - read); - if (stream_buffer.GetUsedSize() == 0) { + auto this_read = stream_buffer.Read(buffer + offset + read, size - read); + if (stream_buffer.IsEmpty()) { buffer_list_.pop_front(); } read += this_read; @@ -43,10 +42,8 @@ Index BufferStream::DoRead(std::byte* buffer, Index offset, Index size) { } } - if (full && buffer_list_.size() < max_block_count_) { - // By convention, there should be at most one producer waiting. So - // notify_one and notify_all should be the same. - condition_variable_.notify_one(); + if (full_previously && buffer_list_.size() < max_block_count_) { + write_cv_.notify_all(); } return read; @@ -55,69 +52,109 @@ Index BufferStream::DoRead(std::byte* buffer, Index offset, Index size) { Index BufferStream::DoWrite(const std::byte* buffer, Index offset, Index size) { std::unique_lock lock(mutex_); - if (eof_) { - throw WriteAfterEofException( - "Stream has been set eof. Can't write to it any more."); - } - - condition_variable_.wait(lock, [this] { - return GetClosed() || max_block_count_ <= 0 || + write_cv_.wait(lock, [this] { + return eof_written_ || max_block_count_ <= 0 || buffer_list_.size() < max_block_count_ || - buffer_list_.back().GetBackFree() > 0; + !buffer_list_.back().IsFull() || IsClosed(); }); - if (GetClosed()) { - StreamClosedException::Check(true); + CheckClosed(); + + if (eof_written_) { + throw StreamIOException( + this, "Stream has been set eof. Can't write to it any more."); } - auto empty = buffer_list_.empty(); + auto empty_previously = buffer_list_.empty(); Index written = 0; - if (empty) { - buffer_list_.push_back(Buffer(block_size_)); + if (empty_previously) { + buffer_list_.push_back(Block(block_size_)); } while (true) { - if (buffer_list_.back().GetBackFree() == 0) { + if (buffer_list_.back().IsFull()) { if (max_block_count_ > 0 && buffer_list_.size() == max_block_count_) { break; } - buffer_list_.push_back(Buffer(block_size_)); + buffer_list_.push_back(Block(block_size_)); } auto& stream_buffer = buffer_list_.back(); auto this_written = - stream_buffer.PushBack(buffer + offset + written, size - written); + stream_buffer.Write(buffer + offset + written, size - written); written += this_written; if (written == size) { break; } } - if (empty) { - // By convention, there should be at most one consumer waiting. So - // notify_one and notify_all should be the same. - condition_variable_.notify_one(); + if (empty_previously) { + read_cv_.notify_all(); } return written; } -void BufferStream::SetEof() { +void BufferStream::DoClose() { std::unique_lock lock(mutex_); - eof_ = true; - if (buffer_list_.empty()) { - // By convention, there should be at most one consumer waiting. So - // notify_one and notify_all should be the same. - condition_variable_.notify_one(); + buffer_list_.clear(); + read_cv_.notify_all(); + write_cv_.notify_all(); +} + +void BufferStream::WriteEof() { + std::unique_lock lock(mutex_); + + eof_written_ = true; + read_cv_.notify_all(); + write_cv_.notify_all(); +} + +BufferStream::Block::Block(Index size) + : buffer(new std::byte[size]), size(size), start(0), end(0) {} + +BufferStream::Block::Block(Block&& other) noexcept + : buffer(other.buffer), + size(other.size), + start(other.start), + end(other.end) { + other.buffer = nullptr; + other.size = other.start = other.end = 0; +} + +BufferStream::Block& BufferStream::Block::operator=(Block&& other) noexcept { + if (this != &other) { + delete[] buffer; + buffer = other.buffer; + size = other.size; + start = other.start; + end = other.end; + other.buffer = nullptr; + other.size = other.start = other.end = 0; } + return *this; } -void BufferStream::DoClose() { - CRU_STREAM_BEGIN_CLOSE - SetClosed(true); - condition_variable_.notify_all(); - buffer_list_.clear(); +BufferStream::Block::~Block() { delete[] buffer; } + +Index BufferStream::Block::Read(std::byte* des, Index si) { + si = std::min(si, end - start); + std::memcpy(des, buffer + start, si); + start += si; + return si; +} + +Index BufferStream::Block::Write(const std::byte* src, Index si) { + si = std::min(si, size - end); + std::memcpy(buffer + end, src, si); + end += si; + return si; } + +bool BufferStream::Block::IsFull() const { return end == size; } + +bool BufferStream::Block::IsEmpty() const { return end == start; } + } // namespace cru::io |
