diff options
Diffstat (limited to 'include/cru/common')
-rw-r--r-- | include/cru/common/Buffer.h | 77 | ||||
-rw-r--r-- | include/cru/common/concurrent/ConcurrentQueue.h | 22 | ||||
-rw-r--r-- | include/cru/common/io/AutoReadStream.h | 7 | ||||
-rw-r--r-- | include/cru/common/io/BufferStream.h | 89 |
4 files changed, 133 insertions, 62 deletions
diff --git a/include/cru/common/Buffer.h b/include/cru/common/Buffer.h index 1fc894ae..8574cd86 100644 --- a/include/cru/common/Buffer.h +++ b/include/cru/common/Buffer.h @@ -2,10 +2,10 @@ #include "Base.h" -#include <list> - namespace cru { class Buffer final { + friend void swap(Buffer& left, Buffer& right) noexcept; + public: explicit Buffer(Index size); @@ -19,10 +19,12 @@ class Buffer final { private: Index GetBufferSize() const { return size_; } - Index GetUsedSize() const { return used_size_; } - Index GetRestSize() const { return GetBufferSize() - GetUsedSize(); } + Index GetUsedSize() const { return used_end_ - used_begin_; } bool IsNull() const { return ptr_ == nullptr; } - bool IsFull() const { return GetBufferSize() == GetUsedSize(); } + bool IsUsedReachEnd() const { return used_end_ == size_; } + + Index GetUsedBegin() const { return used_begin_; } + Index GetUsedEnd() const { return used_end_; } std::byte* GetPtr() { return GetPtrAt(0); } const std::byte* GetPtr() const { return GetPtrAt(0); } @@ -33,22 +35,23 @@ class Buffer final { std::byte& GetRefAt(Index index) { return *GetPtrAt(index); } const std::byte& GetRefAt(Index index) const { return *GetPtrAt(index); } - std::byte* GetUsedEndPtr() { return GetPtrAt(GetUsedSize()); } - const std::byte* GetUsedEndPtr() const { return GetPtrAt(GetUsedSize()); } + std::byte* GetUsedBeginPtr() { return GetPtrAt(GetUsedBegin()); } + const std::byte* GetUsedBeginPtr() const { return GetPtrAt(GetUsedBegin()); } + std::byte* GetUsedEndPtr() { return GetPtrAt(GetUsedEnd()); } + const std::byte* GetUsedEndPtr() const { return GetPtrAt(GetUsedEnd()); } std::byte GetByteAt(Index index) const { return ptr_[index]; } void SetByteAt(Index index, std::byte value) { ptr_[index] = value; } - void AssignBytes(std::byte* src, Index src_size) { - return AssignBytes(0, src, 0, src_size); + void AssignBytes(std::byte* src, Index src_size, bool use_memmove = false) { + return AssignBytes(0, src, 0, src_size, use_memmove); } - void AssignBytes(Index dst_offset, std::byte* src, Index src_size) { - return AssignBytes(dst_offset, src, 0, src_size); + void AssignBytes(Index dst_offset, std::byte* src, Index src_size, + bool use_memmove = false) { + return AssignBytes(dst_offset, src, 0, src_size, use_memmove); } void AssignBytes(Index dst_offset, std::byte* src, Index src_offset, - Index src_size); - - void SetUsedSize(Index new_size); + Index src_size, bool use_memmove = false); /** * @brief Change the size of the buffer. @@ -61,6 +64,16 @@ class Buffer final { void ResizeBuffer(Index new_size, bool preserve_used); /** + * @brief Append data to the front of used bytes and increase used size. + * @return The actual size of data saved. + * + * If there is no enough space left for new data, the rest space will be + * written and the size of it will be returned, leaving exceeded data not + * saved. + */ + Index PushFront(std::byte* other, Index other_size, bool use_memmove = false); + + /** * @brief Append data to the back of used bytes and increase used size. * @return The actual size of data saved. * @@ -68,11 +81,20 @@ class Buffer final { * written and the size of it will be returned, leaving exceeded data not * saved. */ - Index PushEnd(std::byte* other, Index other_size); + Index PushBack(std::byte* other, Index other_size, bool use_memmove = false); + + /** + * @brief Move forward the used-begin ptr. + * @return The actual size moved forward. + * + * If given size is bigger than current used size, the used size will be + * returned and set to 0. + */ + Index PopFront(Index size); /** - * @brief Decrease used data size. - * @return The actual size decreased. + * @brief Move backward the used-end ptr. + * @return The actual size moved backward. * * If given size is bigger than current used size, the used size will be * returned and set to 0. @@ -90,24 +112,9 @@ class Buffer final { private: std::byte* ptr_; Index size_; - Index used_size_; + Index used_begin_; + Index used_end_; }; -void swap(Buffer& left, Buffer& right); - -class BufferList { - public: - explicit BufferList(Index buffer_size); - - BufferList(const BufferList& other); - BufferList(BufferList&& other); - - BufferList& operator=(const BufferList& other); - BufferList& operator=(BufferList&& other); - - ~BufferList(); - - private: - std::list<Buffer> buffers_; -}; +void swap(Buffer& left, Buffer& right) noexcept; } // namespace cru diff --git a/include/cru/common/concurrent/ConcurrentQueue.h b/include/cru/common/concurrent/ConcurrentQueue.h index 4f649a41..e311d5f9 100644 --- a/include/cru/common/concurrent/ConcurrentQueue.h +++ b/include/cru/common/concurrent/ConcurrentQueue.h @@ -24,28 +24,6 @@ class ConcurrentQueue { ConcurrentQueue(const ConcurrentQueue&) = delete; ConcurrentQueue& operator=(const ConcurrentQueue&) = delete; - ConcurrentQueue(ConcurrentQueue&& other) - : head_(other.head_), - tail_(other.tail_), - mutex_(std::move(other.mutex_)), - condition_variable_(std::move(other.condition_variable_)) { - other.head_ = nullptr; - other.tail_ = nullptr; - } - - ConcurrentQueue& operator=(ConcurrentQueue&& other) { - if (this != &other) { - head_ = other.head_; - tail_ = other.tail_; - mutex_ = std::move(other.mutex_); - condition_variable_ = std::move(other.condition_variable_); - other.head_ = nullptr; - other.tail_ = nullptr; - return *this; - } - return *this; - } - ~ConcurrentQueue() { if (head_) { auto node = head_; diff --git a/include/cru/common/io/AutoReadStream.h b/include/cru/common/io/AutoReadStream.h index 7857e8b9..e252bdff 100644 --- a/include/cru/common/io/AutoReadStream.h +++ b/include/cru/common/io/AutoReadStream.h @@ -1,13 +1,12 @@ #pragma once -#include "Stream.h" #include "../Buffer.h" +#include "Stream.h" #include <condition_variable> #include <list> #include <mutex> #include <thread> -#include <vector> namespace cru::io { struct AutoReadStreamOptions { @@ -38,9 +37,7 @@ struct AutoReadStreamOptions { */ class CRU_BASE_API AutoReadStream : public Stream { private: - class BufferBlock { - - }; + class BufferBlock {}; public: /** diff --git a/include/cru/common/io/BufferStream.h b/include/cru/common/io/BufferStream.h new file mode 100644 index 00000000..64d1bb56 --- /dev/null +++ b/include/cru/common/io/BufferStream.h @@ -0,0 +1,89 @@ +#pragma once + +#include "../Buffer.h" +#include "Stream.h" +#include "../Exception.h" + +#include <condition_variable> +#include <list> +#include <mutex> + +namespace cru::io { +class WriteAfterEofException : public Exception { + public: + using Exception::Exception; + ~WriteAfterEofException() override = default; +}; + +struct BufferStreamOptions { + /** + * @brief The size of a single buffer allocated each time new space is needed. + * Use default value if <= 0. + * + * When current buffer is full and there is no space for following data, a new + * buffer will be allocated and appended to the buffer list. Note if sum size + * of all buffers reaches the total_buffer_limit, no more buffer will be + * allocated but wait. + */ + Index block_size = 0; + + /** + * @brief Total size limit of saved data in buffer. Use default value if < 0. + * No limit if == 0. + * + * The size will be ceil(total_size_limit / block_size). When the buffer is + * filled, it will block and wait for user to read to get free space of buffer + * to continue read. + */ + Index total_size_limit = 0; +}; + +class BufferStream : public Stream { + public: + /** + * Actually I have no ideas about the best value for this. May change it later + * when I get some ideas. + */ + constexpr static Index kDefaultBlockSize = 1024; + + /** + * Actually I have no ideas about the best value for this. May change it later + * when I get some ideas. + */ + constexpr static Index kDefaultTotalSizeLimit = 1024; + + public: + BufferStream(const BufferStreamOptions& options); + + ~BufferStream() override; + + bool CanSeek() override; + Index Seek(Index offset, SeekOrigin origin = SeekOrigin::Current) override; + + bool CanRead() override; + Index Read(std::byte* buffer, Index offset, Index size) override; + + bool CanWrite() = 0; + Index Write(const std::byte* buffer, Index offset, Index size) = 0; + + virtual void Flush(); + + virtual void Close(); + + void SetEof(); + + private: + bool CheckClosed(); + + private: + Index block_size_; + Index total_size_limit_; + Index block_count_limit_; + + std::list<Buffer> buffer_list_; + bool eof_; + + std::mutex mutex_; + std::condition_variable condition_variable_; +}; +} // namespace cru::io |