diff options
Diffstat (limited to 'src/common/io/BufferStream.cpp')
-rw-r--r-- | src/common/io/BufferStream.cpp | 91 |
1 files changed, 71 insertions, 20 deletions
diff --git a/src/common/io/BufferStream.cpp b/src/common/io/BufferStream.cpp index c08d04a5..1becc1d9 100644 --- a/src/common/io/BufferStream.cpp +++ b/src/common/io/BufferStream.cpp @@ -12,21 +12,16 @@ BufferStream::BufferStream(const BufferStreamOptions& options) { eof_ = false; } -bool BufferStream::CanSeek() { - CheckClosed(); - return false; -} +BufferStream::~BufferStream() {} + +bool BufferStream::CanSeek() { return false; } Index BufferStream::Seek(Index offset, SeekOrigin origin) { - CheckClosed(); throw StreamOperationNotSupportedException( u"BufferStream does not support seeking."); } -bool BufferStream::CanRead() { - CheckClosed(); - return true; -} +bool BufferStream::CanRead() { return true; } Index BufferStream::Read(std::byte* buffer, Index offset, Index size) { std::unique_lock lock(mutex_); @@ -34,30 +29,86 @@ Index BufferStream::Read(std::byte* buffer, Index offset, Index size) { condition_variable_.wait(lock, [this] { return !buffer_list_.empty() || eof_; }); - if (eof_) { + if (buffer_list_.empty() && eof_) { return 0; } - Index written_size = 0; - auto current_offset = offset; - auto rest_size = size; + auto full = buffer_list_.size() == block_count_limit_; + + Index read = 0; while (!buffer_list_.empty()) { auto& stream_buffer = buffer_list_.front(); - auto this_written_size = - stream_buffer.PopFront(buffer + current_offset, rest_size); + auto this_read = + stream_buffer.PopFront(buffer + offset + read, size - read); if (stream_buffer.GetUsedSize() == 0) { buffer_list_.pop_front(); } - written_size += this_written_size; - rest_size -= this_written_size; - current_offset += this_written_size; - if (rest_size == 0) { + read += this_read; + if (read == size) { + break; + } + } + + if (full && buffer_list_.size() < block_count_limit_) { + // By convention, there should be at most one producer waiting. So + // notify_one and notify_all should be the same. + condition_variable_.notify_one(); + } + + return read; +} + +bool BufferStream::CanWrite() { return true; } + +Index BufferStream::Write(const std::byte* buffer, Index offset, Index size) { + std::unique_lock lock(mutex_); + + if (eof_) { + throw WriteAfterEofException( + u"Stream has been set eof. Can't write to it any more."); + } + + condition_variable_.wait(lock, [this] { + return buffer_list_.size() < block_count_limit_ || + buffer_list_.back().GetBackFree() > 0; + }); + + auto empty = buffer_list_.empty(); + + Index written = 0; + + while (buffer_list_.size() != block_count_limit_) { + if (buffer_list_.back().GetBackFree() == 0) { + buffer_list_.push_back(Buffer(block_size_)); + } + auto& stream_buffer = buffer_list_.back(); + auto this_written = + stream_buffer.PushBack(buffer + offset + written, size - written); + written += this_written; + if (written == size) { break; } } - return written_size; + if (empty) { + // By convention, there should be at most one consumer waiting. So + // notify_one and notify_all should be the same. + condition_variable_.notify_one(); + } + + return written; +} + +void BufferStream::SetEof() { + std::unique_lock lock(mutex_); + + eof_ = true; + if (buffer_list_.empty()) { + // By convention, there should be at most one consumer waiting. So + // notify_one and notify_all should be the same. + condition_variable_.notify_one(); + } } } // namespace cru::io |