blob: e3d7fda9fb69cc0346b2f790b45bcdbc2469bb42 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
#pragma once
#include "Stream.h"
#include <condition_variable>
#include <list>
#include <mutex>
namespace cru::io {
struct BufferStreamOptions {
/**
* Actually I have no ideas about the best value for this. May change it later
* when I get some ideas.
*/
constexpr static Index kDefaultBlockSize = 1024;
/**
* @brief The size of a single buffer allocated each time new space is needed.
* Use default value if <= 0.
*
* When current buffer is full and there is no space for following data, a new
* buffer will be allocated and appended to the buffer list. Note if sum size
* of all buffers reaches the total_buffer_limit, no more buffer will be
* allocated but wait.
*/
Index block_size = 0;
/**
* @brief Total size limit of saved data in buffer. No limit if <= 0.
*
* The size will be floor(total_size_limit / block_size). When the buffer is
* filled, it will block and wait for user to read to get free space of buffer
* to continue read.
*/
Index total_size_limit = 0;
Index GetBlockSizeOrDefault() const {
return block_size <= 0 ? kDefaultBlockSize : block_size;
}
Index GetMaxBlockCount() const {
return total_size_limit / GetBlockSizeOrDefault();
}
};
/**
* @brief MPMC (Multiple Producer Multiple Consumer) buffer stream.
*/
class CRU_BASE_API BufferStream : public Stream {
public:
explicit BufferStream(const BufferStreamOptions& options = {});
~BufferStream() override;
void WriteEof();
protected:
Index DoRead(std::byte* buffer, Index offset, Index size) override;
Index DoWrite(const std::byte* buffer, Index offset, Index size) override;
void DoClose() override;
private:
struct Block {
std::byte* buffer;
Index size;
Index start;
Index end;
explicit Block(Index size);
CRU_DELETE_COPY(Block)
Block(Block&& other) noexcept;
Block& operator=(Block&& other) noexcept;
~Block();
Index Read(std::byte* des, Index si);
Index Write(const std::byte* src, Index si);
bool IsFull() const;
bool IsEmpty() const;
};
Index block_size_;
Index max_block_count_;
std::list<Block> buffer_list_;
bool eof_written_;
std::mutex mutex_;
std::condition_variable read_cv_;
std::condition_variable write_cv_;
};
} // namespace cru::io
|