aboutsummaryrefslogtreecommitdiff
path: root/include/cru/base/io/BufferStream.h
diff options
context:
space:
mode:
authorYuqian Yang <crupest@crupest.life>2026-03-07 20:42:37 +0800
committerYuqian Yang <crupest@crupest.life>2026-03-07 20:42:37 +0800
commit38756822825e20eca3b9e01b735946175223d692 (patch)
treefc2a495bfc0e082d5ed9a1642278ae6467fe2742 /include/cru/base/io/BufferStream.h
parent924f4b472712d0cfc55b81dcb3eaed3f8a478288 (diff)
downloadcru-38756822825e20eca3b9e01b735946175223d692.tar.gz
cru-38756822825e20eca3b9e01b735946175223d692.tar.bz2
cru-38756822825e20eca3b9e01b735946175223d692.zip
Refactor stream.
Diffstat (limited to 'include/cru/base/io/BufferStream.h')
-rw-r--r--include/cru/base/io/BufferStream.h47
1 files changed, 28 insertions, 19 deletions
diff --git a/include/cru/base/io/BufferStream.h b/include/cru/base/io/BufferStream.h
index d4ee3837..c8e8f707 100644
--- a/include/cru/base/io/BufferStream.h
+++ b/include/cru/base/io/BufferStream.h
@@ -1,6 +1,5 @@
#pragma once
-#include "../Buffer.h"
#include "Stream.h"
#include <condition_variable>
@@ -8,12 +7,6 @@
#include <mutex>
namespace cru::io {
-class WriteAfterEofException : public Exception {
- public:
- using Exception::Exception;
- ~WriteAfterEofException() override = default;
-};
-
struct BufferStreamOptions {
/**
* Actually I have no ideas about the best value for this. May change it later
@@ -51,34 +44,50 @@ struct BufferStreamOptions {
};
/**
- * @brief SPSC (Single Producer Single Consumer) buffer stream.
- *
- * If used by multiple producer or multiple consumer, the behavior is undefined.
+ * @brief MPMC (Multiple Producer Multiple Consumer) buffer stream.
*/
class BufferStream : public Stream {
public:
- BufferStream(const BufferStreamOptions& options);
+ explicit BufferStream(const BufferStreamOptions& options = {});
~BufferStream() override;
- CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE
-
- void SetEof();
+ void WriteEof();
protected:
Index DoRead(std::byte* buffer, Index offset, Index size) override;
Index DoWrite(const std::byte* buffer, Index offset, Index size) override;
+ void DoClose() override;
private:
- void DoClose();
+ struct Block {
+ std::byte* buffer;
+ Index size;
+ Index start;
+ Index end;
+
+ explicit Block(Index size);
+
+ CRU_DELETE_COPY(Block)
+
+ Block(Block&& other) noexcept;
+ Block& operator=(Block&& other) noexcept;
+
+ ~Block();
+
+ Index Read(std::byte* des, Index si);
+ Index Write(const std::byte* src, Index si);
+ bool IsFull() const;
+ bool IsEmpty() const;
+ };
- private:
Index block_size_;
Index max_block_count_;
- std::list<Buffer> buffer_list_;
- bool eof_;
+ std::list<Block> buffer_list_;
+ bool eof_written_;
std::mutex mutex_;
- std::condition_variable condition_variable_;
+ std::condition_variable read_cv_;
+ std::condition_variable write_cv_;
};
} // namespace cru::io