aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcrupest <crupest@outlook.com>2024-05-12 00:19:16 +0800
committercrupest <crupest@outlook.com>2024-05-17 23:55:37 +0800
commitb05c7d76dcfef6d882bc01fea4663a4c2fcdcdf2 (patch)
tree7a52140660b2c9d177513c3dadb4a689a5c45b07
parentd861d799d2f614a6b58a8f0c0bc6a8911ad0dcfd (diff)
downloadcru-b05c7d76dcfef6d882bc01fea4663a4c2fcdcdf2.tar.gz
cru-b05c7d76dcfef6d882bc01fea4663a4c2fcdcdf2.tar.bz2
cru-b05c7d76dcfef6d882bc01fea4663a4c2fcdcdf2.zip
feat: completed BufferStream.
NEED TEST: BufferStream.
-rw-r--r--include/cru/common/Buffer.h3
-rw-r--r--include/cru/common/io/BufferStream.h20
-rw-r--r--src/common/io/AutoReadStream.cpp1
-rw-r--r--src/common/io/BufferStream.cpp91
4 files changed, 83 insertions, 32 deletions
diff --git a/include/cru/common/Buffer.h b/include/cru/common/Buffer.h
index 69c47aff..5c1f7ba3 100644
--- a/include/cru/common/Buffer.h
+++ b/include/cru/common/Buffer.h
@@ -23,6 +23,9 @@ class Buffer final {
bool IsNull() const { return ptr_ == nullptr; }
bool IsUsedReachEnd() const { return used_end_ == size_; }
+ Index GetFrontFree() const { return used_begin_; }
+ Index GetBackFree() const { return size_ - used_end_; }
+
Index GetUsedBegin() const { return used_begin_; }
Index GetUsedEnd() const { return used_end_; }
diff --git a/include/cru/common/io/BufferStream.h b/include/cru/common/io/BufferStream.h
index a95b3487..b3a5e9d1 100644
--- a/include/cru/common/io/BufferStream.h
+++ b/include/cru/common/io/BufferStream.h
@@ -31,13 +31,18 @@ struct BufferStreamOptions {
* @brief Total size limit of saved data in buffer. Use default value if < 0.
* No limit if == 0.
*
- * The size will be ceil(total_size_limit / block_size). When the buffer is
+ * The size will be floor(total_size_limit / block_size). When the buffer is
* filled, it will block and wait for user to read to get free space of buffer
* to continue read.
*/
Index total_size_limit = 0;
};
+/**
+ * @brief SPSC (Single Producer Single Consumer) buffer stream.
+ *
+ * If used by multiple producer or multiple consumer, the behavior is undefined.
+ */
class BufferStream : public Stream {
public:
/**
@@ -50,7 +55,7 @@ class BufferStream : public Stream {
* Actually I have no ideas about the best value for this. May change it later
* when I get some ideas.
*/
- constexpr static Index kDefaultTotalSizeLimit = 1024;
+ constexpr static Index kDefaultTotalSizeLimit = 1024 * 1024;
public:
BufferStream(const BufferStreamOptions& options);
@@ -63,19 +68,12 @@ class BufferStream : public Stream {
bool CanRead() override;
Index Read(std::byte* buffer, Index offset, Index size) override;
- bool CanWrite() = 0;
- Index Write(const std::byte* buffer, Index offset, Index size) = 0;
-
- virtual void Flush();
-
- virtual void Close();
+ bool CanWrite() override;
+ Index Write(const std::byte* buffer, Index offset, Index size) override;
void SetEof();
private:
- bool CheckClosed();
-
- private:
Index block_size_;
Index total_size_limit_;
Index block_count_limit_;
diff --git a/src/common/io/AutoReadStream.cpp b/src/common/io/AutoReadStream.cpp
index 7cdc1268..1acf8930 100644
--- a/src/common/io/AutoReadStream.cpp
+++ b/src/common/io/AutoReadStream.cpp
@@ -31,6 +31,5 @@ void AutoReadStream::BackgroundThreadRun() {
}
stream_->Read();
}
-}
} // namespace cru::io
diff --git a/src/common/io/BufferStream.cpp b/src/common/io/BufferStream.cpp
index c08d04a5..1becc1d9 100644
--- a/src/common/io/BufferStream.cpp
+++ b/src/common/io/BufferStream.cpp
@@ -12,21 +12,16 @@ BufferStream::BufferStream(const BufferStreamOptions& options) {
eof_ = false;
}
-bool BufferStream::CanSeek() {
- CheckClosed();
- return false;
-}
+BufferStream::~BufferStream() {}
+
+bool BufferStream::CanSeek() { return false; }
Index BufferStream::Seek(Index offset, SeekOrigin origin) {
- CheckClosed();
throw StreamOperationNotSupportedException(
u"BufferStream does not support seeking.");
}
-bool BufferStream::CanRead() {
- CheckClosed();
- return true;
-}
+bool BufferStream::CanRead() { return true; }
Index BufferStream::Read(std::byte* buffer, Index offset, Index size) {
std::unique_lock lock(mutex_);
@@ -34,30 +29,86 @@ Index BufferStream::Read(std::byte* buffer, Index offset, Index size) {
condition_variable_.wait(lock,
[this] { return !buffer_list_.empty() || eof_; });
- if (eof_) {
+ if (buffer_list_.empty() && eof_) {
return 0;
}
- Index written_size = 0;
- auto current_offset = offset;
- auto rest_size = size;
+ auto full = buffer_list_.size() == block_count_limit_;
+
+ Index read = 0;
while (!buffer_list_.empty()) {
auto& stream_buffer = buffer_list_.front();
- auto this_written_size =
- stream_buffer.PopFront(buffer + current_offset, rest_size);
+ auto this_read =
+ stream_buffer.PopFront(buffer + offset + read, size - read);
if (stream_buffer.GetUsedSize() == 0) {
buffer_list_.pop_front();
}
- written_size += this_written_size;
- rest_size -= this_written_size;
- current_offset += this_written_size;
- if (rest_size == 0) {
+ read += this_read;
+ if (read == size) {
+ break;
+ }
+ }
+
+ if (full && buffer_list_.size() < block_count_limit_) {
+ // By convention, there should be at most one producer waiting. So
+ // notify_one and notify_all should be the same.
+ condition_variable_.notify_one();
+ }
+
+ return read;
+}
+
+bool BufferStream::CanWrite() { return true; }
+
+Index BufferStream::Write(const std::byte* buffer, Index offset, Index size) {
+ std::unique_lock lock(mutex_);
+
+ if (eof_) {
+ throw WriteAfterEofException(
+ u"Stream has been set eof. Can't write to it any more.");
+ }
+
+ condition_variable_.wait(lock, [this] {
+ return buffer_list_.size() < block_count_limit_ ||
+ buffer_list_.back().GetBackFree() > 0;
+ });
+
+ auto empty = buffer_list_.empty();
+
+ Index written = 0;
+
+ while (buffer_list_.size() != block_count_limit_) {
+ if (buffer_list_.back().GetBackFree() == 0) {
+ buffer_list_.push_back(Buffer(block_size_));
+ }
+ auto& stream_buffer = buffer_list_.back();
+ auto this_written =
+ stream_buffer.PushBack(buffer + offset + written, size - written);
+ written += this_written;
+ if (written == size) {
break;
}
}
- return written_size;
+ if (empty) {
+ // By convention, there should be at most one consumer waiting. So
+ // notify_one and notify_all should be the same.
+ condition_variable_.notify_one();
+ }
+
+ return written;
+}
+
+void BufferStream::SetEof() {
+ std::unique_lock lock(mutex_);
+
+ eof_ = true;
+ if (buffer_list_.empty()) {
+ // By convention, there should be at most one consumer waiting. So
+ // notify_one and notify_all should be the same.
+ condition_variable_.notify_one();
+ }
}
} // namespace cru::io