diff options
author | crupest <crupest@outlook.com> | 2024-02-12 15:47:31 +0800 |
---|---|---|
committer | crupest <crupest@outlook.com> | 2024-03-24 20:03:58 +0800 |
commit | 944ea0e5b613d901ba834dc225eb9d379b750bd3 (patch) | |
tree | 9cdeae736bc635481b4cd3d6024d1d5d0c5072df | |
parent | b2051e28cd36c49b7c5d49b511646a3856d7eaf0 (diff) | |
download | cru-944ea0e5b613d901ba834dc225eb9d379b750bd3.tar.gz cru-944ea0e5b613d901ba834dc225eb9d379b750bd3.tar.bz2 cru-944ea0e5b613d901ba834dc225eb9d379b750bd3.zip |
WORKING: make Buffer track two sides.
-rw-r--r-- | include/cru/common/Buffer.h | 77 | ||||
-rw-r--r-- | include/cru/common/concurrent/ConcurrentQueue.h | 22 | ||||
-rw-r--r-- | include/cru/common/io/AutoReadStream.h | 7 | ||||
-rw-r--r-- | include/cru/common/io/BufferStream.h | 89 | ||||
-rw-r--r-- | src/common/Buffer.cpp | 83 | ||||
-rw-r--r-- | src/common/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/common/io/BufferStream.cpp | 45 |
7 files changed, 234 insertions, 90 deletions
diff --git a/include/cru/common/Buffer.h b/include/cru/common/Buffer.h index 1fc894ae..8574cd86 100644 --- a/include/cru/common/Buffer.h +++ b/include/cru/common/Buffer.h @@ -2,10 +2,10 @@ #include "Base.h" -#include <list> - namespace cru { class Buffer final { + friend void swap(Buffer& left, Buffer& right) noexcept; + public: explicit Buffer(Index size); @@ -19,10 +19,12 @@ class Buffer final { private: Index GetBufferSize() const { return size_; } - Index GetUsedSize() const { return used_size_; } - Index GetRestSize() const { return GetBufferSize() - GetUsedSize(); } + Index GetUsedSize() const { return used_end_ - used_begin_; } bool IsNull() const { return ptr_ == nullptr; } - bool IsFull() const { return GetBufferSize() == GetUsedSize(); } + bool IsUsedReachEnd() const { return used_end_ == size_; } + + Index GetUsedBegin() const { return used_begin_; } + Index GetUsedEnd() const { return used_end_; } std::byte* GetPtr() { return GetPtrAt(0); } const std::byte* GetPtr() const { return GetPtrAt(0); } @@ -33,22 +35,23 @@ class Buffer final { std::byte& GetRefAt(Index index) { return *GetPtrAt(index); } const std::byte& GetRefAt(Index index) const { return *GetPtrAt(index); } - std::byte* GetUsedEndPtr() { return GetPtrAt(GetUsedSize()); } - const std::byte* GetUsedEndPtr() const { return GetPtrAt(GetUsedSize()); } + std::byte* GetUsedBeginPtr() { return GetPtrAt(GetUsedBegin()); } + const std::byte* GetUsedBeginPtr() const { return GetPtrAt(GetUsedBegin()); } + std::byte* GetUsedEndPtr() { return GetPtrAt(GetUsedEnd()); } + const std::byte* GetUsedEndPtr() const { return GetPtrAt(GetUsedEnd()); } std::byte GetByteAt(Index index) const { return ptr_[index]; } void SetByteAt(Index index, std::byte value) { ptr_[index] = value; } - void AssignBytes(std::byte* src, Index src_size) { - return AssignBytes(0, src, 0, src_size); + void AssignBytes(std::byte* src, Index src_size, bool use_memmove = false) { + return AssignBytes(0, src, 0, src_size, use_memmove); } - void AssignBytes(Index dst_offset, std::byte* src, Index src_size) { - return AssignBytes(dst_offset, src, 0, src_size); + void AssignBytes(Index dst_offset, std::byte* src, Index src_size, + bool use_memmove = false) { + return AssignBytes(dst_offset, src, 0, src_size, use_memmove); } void AssignBytes(Index dst_offset, std::byte* src, Index src_offset, - Index src_size); - - void SetUsedSize(Index new_size); + Index src_size, bool use_memmove = false); /** * @brief Change the size of the buffer. @@ -61,6 +64,16 @@ class Buffer final { void ResizeBuffer(Index new_size, bool preserve_used); /** + * @brief Append data to the front of used bytes and increase used size. + * @return The actual size of data saved. + * + * If there is no enough space left for new data, the rest space will be + * written and the size of it will be returned, leaving exceeded data not + * saved. + */ + Index PushFront(std::byte* other, Index other_size, bool use_memmove = false); + + /** * @brief Append data to the back of used bytes and increase used size. * @return The actual size of data saved. * @@ -68,11 +81,20 @@ class Buffer final { * written and the size of it will be returned, leaving exceeded data not * saved. */ - Index PushEnd(std::byte* other, Index other_size); + Index PushBack(std::byte* other, Index other_size, bool use_memmove = false); + + /** + * @brief Move forward the used-begin ptr. + * @return The actual size moved forward. + * + * If given size is bigger than current used size, the used size will be + * returned and set to 0. + */ + Index PopFront(Index size); /** - * @brief Decrease used data size. - * @return The actual size decreased. + * @brief Move backward the used-end ptr. + * @return The actual size moved backward. * * If given size is bigger than current used size, the used size will be * returned and set to 0. @@ -90,24 +112,9 @@ class Buffer final { private: std::byte* ptr_; Index size_; - Index used_size_; + Index used_begin_; + Index used_end_; }; -void swap(Buffer& left, Buffer& right); - -class BufferList { - public: - explicit BufferList(Index buffer_size); - - BufferList(const BufferList& other); - BufferList(BufferList&& other); - - BufferList& operator=(const BufferList& other); - BufferList& operator=(BufferList&& other); - - ~BufferList(); - - private: - std::list<Buffer> buffers_; -}; +void swap(Buffer& left, Buffer& right) noexcept; } // namespace cru diff --git a/include/cru/common/concurrent/ConcurrentQueue.h b/include/cru/common/concurrent/ConcurrentQueue.h index 4f649a41..e311d5f9 100644 --- a/include/cru/common/concurrent/ConcurrentQueue.h +++ b/include/cru/common/concurrent/ConcurrentQueue.h @@ -24,28 +24,6 @@ class ConcurrentQueue { ConcurrentQueue(const ConcurrentQueue&) = delete; ConcurrentQueue& operator=(const ConcurrentQueue&) = delete; - ConcurrentQueue(ConcurrentQueue&& other) - : head_(other.head_), - tail_(other.tail_), - mutex_(std::move(other.mutex_)), - condition_variable_(std::move(other.condition_variable_)) { - other.head_ = nullptr; - other.tail_ = nullptr; - } - - ConcurrentQueue& operator=(ConcurrentQueue&& other) { - if (this != &other) { - head_ = other.head_; - tail_ = other.tail_; - mutex_ = std::move(other.mutex_); - condition_variable_ = std::move(other.condition_variable_); - other.head_ = nullptr; - other.tail_ = nullptr; - return *this; - } - return *this; - } - ~ConcurrentQueue() { if (head_) { auto node = head_; diff --git a/include/cru/common/io/AutoReadStream.h b/include/cru/common/io/AutoReadStream.h index 7857e8b9..e252bdff 100644 --- a/include/cru/common/io/AutoReadStream.h +++ b/include/cru/common/io/AutoReadStream.h @@ -1,13 +1,12 @@ #pragma once -#include "Stream.h" #include "../Buffer.h" +#include "Stream.h" #include <condition_variable> #include <list> #include <mutex> #include <thread> -#include <vector> namespace cru::io { struct AutoReadStreamOptions { @@ -38,9 +37,7 @@ struct AutoReadStreamOptions { */ class CRU_BASE_API AutoReadStream : public Stream { private: - class BufferBlock { - - }; + class BufferBlock {}; public: /** diff --git a/include/cru/common/io/BufferStream.h b/include/cru/common/io/BufferStream.h new file mode 100644 index 00000000..64d1bb56 --- /dev/null +++ b/include/cru/common/io/BufferStream.h @@ -0,0 +1,89 @@ +#pragma once + +#include "../Buffer.h" +#include "Stream.h" +#include "../Exception.h" + +#include <condition_variable> +#include <list> +#include <mutex> + +namespace cru::io { +class WriteAfterEofException : public Exception { + public: + using Exception::Exception; + ~WriteAfterEofException() override = default; +}; + +struct BufferStreamOptions { + /** + * @brief The size of a single buffer allocated each time new space is needed. + * Use default value if <= 0. + * + * When current buffer is full and there is no space for following data, a new + * buffer will be allocated and appended to the buffer list. Note if sum size + * of all buffers reaches the total_buffer_limit, no more buffer will be + * allocated but wait. + */ + Index block_size = 0; + + /** + * @brief Total size limit of saved data in buffer. Use default value if < 0. + * No limit if == 0. + * + * The size will be ceil(total_size_limit / block_size). When the buffer is + * filled, it will block and wait for user to read to get free space of buffer + * to continue read. + */ + Index total_size_limit = 0; +}; + +class BufferStream : public Stream { + public: + /** + * Actually I have no ideas about the best value for this. May change it later + * when I get some ideas. + */ + constexpr static Index kDefaultBlockSize = 1024; + + /** + * Actually I have no ideas about the best value for this. May change it later + * when I get some ideas. + */ + constexpr static Index kDefaultTotalSizeLimit = 1024; + + public: + BufferStream(const BufferStreamOptions& options); + + ~BufferStream() override; + + bool CanSeek() override; + Index Seek(Index offset, SeekOrigin origin = SeekOrigin::Current) override; + + bool CanRead() override; + Index Read(std::byte* buffer, Index offset, Index size) override; + + bool CanWrite() = 0; + Index Write(const std::byte* buffer, Index offset, Index size) = 0; + + virtual void Flush(); + + virtual void Close(); + + void SetEof(); + + private: + bool CheckClosed(); + + private: + Index block_size_; + Index total_size_limit_; + Index block_count_limit_; + + std::list<Buffer> buffer_list_; + bool eof_; + + std::mutex mutex_; + std::condition_variable condition_variable_; +}; +} // namespace cru::io diff --git a/src/common/Buffer.cpp b/src/common/Buffer.cpp index 62904b17..00bbd166 100644 --- a/src/common/Buffer.cpp +++ b/src/common/Buffer.cpp @@ -1,7 +1,8 @@ #include "cru/common/Buffer.h" -#include <cstring> #include "cru/common/Exception.h" +#include <cstring> + namespace cru { namespace { void CheckSize(Index size) { @@ -15,11 +16,11 @@ Buffer::Buffer(Index size) { CheckSize(size); if (size == 0) { ptr_ = nullptr; - size_ = used_size_ = 0; + size_ = used_begin_ = used_end_ = 0; } else { ptr_ = new std::byte[size]; size_ = size; - used_size_ = 0; + used_begin_ = used_end_ = 0; } } @@ -46,17 +47,16 @@ Buffer& Buffer::operator=(Buffer&& other) noexcept { Buffer::~Buffer() { Delete_(); } void Buffer::AssignBytes(Index dst_offset, std::byte* src, Index src_offset, - Index src_size) { - std::memcpy(ptr_ + dst_offset, src + src_offset, src_size); + Index src_size, bool use_memmove) { + (use_memmove ? std::memmove : std::memcpy)(ptr_ + dst_offset, + src + src_offset, src_size); } -void Buffer::SetUsedSize(Index new_size) { used_size_ = new_size; } - void Buffer::ResizeBuffer(Index new_size, bool preserve_used) { if (new_size == 0) { Delete_(); ptr_ = nullptr; - size_ = used_size_ = 0; + size_ = used_begin_ = used_end_ = 0; return; } @@ -64,56 +64,75 @@ void Buffer::ResizeBuffer(Index new_size, bool preserve_used) { ptr_ = new std::byte[new_size]; size_ = new_size; + used_begin_ = std::min(new_size, used_begin_); + used_end_ = std::min(new_size, used_end_); if (old_ptr) { - if (preserve_used) { - auto copy_size = std::min(used_size_, new_size); - std::memcpy(ptr_, old_ptr, copy_size); - used_size_ = copy_size; + if (preserve_used && used_begin_ < used_end_) { + std::memcpy(ptr_ + used_begin_, old_ptr + used_begin_, + used_end_ - used_begin_); } delete[] old_ptr; } } -Index Buffer::PushEnd(std::byte* other, Index other_size) { - auto copy_size = std::min(GetRestSize(), other_size); +Index Buffer::PushFront(std::byte* other, Index other_size, bool use_memmove) { + auto copy_size = std::min(used_begin_, other_size); if (copy_size) { - std::memcpy(GetUsedEndPtr(), other, copy_size); - used_size_ += copy_size; + used_begin_ -= copy_size; + (use_memmove ? std::memmove : std::memcpy)(ptr_ + used_begin_, other, + copy_size); } return copy_size; } -Index Buffer::PopEnd(Index size) { - if (used_size_ < size) { - used_size_ = 0; - return used_size_; - } else { - used_size_ -= size; - return size; +Index Buffer::PushBack(std::byte* other, Index other_size, bool use_memmove) { + auto copy_size = std::min(size_ - used_end_, other_size); + + if (copy_size) { + (use_memmove ? std::memmove : std::memcpy)(ptr_ + used_end_, other, + copy_size); + used_end_ += copy_size; } + + return copy_size; +} + +Index Buffer::PopFront(Index size) { + auto move = std::min(used_begin_, size); + used_begin_ -= move; + return move; +} + +Index Buffer::PopEnd(Index size) { + auto move = std::min(size_ - used_end_, size); + used_end_ += move; + return move; } void Buffer::Copy_(const Buffer& other) { if (other.ptr_ == nullptr) { ptr_ = nullptr; - size_ = used_size_ = 0; + size_ = used_begin_ = used_end_ = 0; } else { ptr_ = new std::byte[other.size_]; size_ = other.size_; - used_size_ = other.used_size_; - std::memcpy(ptr_, other.ptr_, used_size_); + used_begin_ = other.used_begin_; + used_end_ = other.used_end_; + std::memcpy(ptr_ + used_begin_, other.ptr_ + used_begin_, + used_end_ - used_begin_); } } void Buffer::Move_(Buffer&& other) noexcept { ptr_ = other.ptr_; size_ = other.size_; - used_size_ = other.used_size_; + used_begin_ = other.used_begin_; + used_end_ = other.used_end_; other.ptr_ = nullptr; - other.size_ = other.used_size_ = 0; + other.size_ = other.used_begin_ = other.used_end_ = 0; } void Buffer::Delete_() noexcept { @@ -121,4 +140,12 @@ void Buffer::Delete_() noexcept { delete[] ptr_; } } + +void swap(Buffer& left, Buffer& right) noexcept { + using std::swap; + swap(left.ptr_, right.ptr_); + swap(left.size_, right.size_); + swap(left.used_begin_, right.used_begin_); + swap(left.used_end_, right.used_end_); +} } // namespace cru diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index bf3156ac..19feddba 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -9,6 +9,7 @@ add_library(CruBase StringUtil.cpp SubProcess.cpp io/AutoReadStream.cpp + io/BufferStream.cpp io/CFileStream.cpp io/Stream.cpp io/ProxyStream.cpp diff --git a/src/common/io/BufferStream.cpp b/src/common/io/BufferStream.cpp new file mode 100644 index 00000000..d4780f7a --- /dev/null +++ b/src/common/io/BufferStream.cpp @@ -0,0 +1,45 @@ +#include "cru/common/io/BufferStream.h" +#include <memory> +#include "cru/common/io/Stream.h" + +namespace cru::io { +BufferStream::BufferStream(const BufferStreamOptions& options) { + block_size_ = + options.block_size <= 0 ? kDefaultBlockSize : options.block_size; + total_size_limit_ = options.total_size_limit < 0 ? kDefaultTotalSizeLimit + : options.total_size_limit; + block_count_limit_ = total_size_limit_ / block_size_; + + eof_ = false; +} + +bool BufferStream::CanSeek() { + CheckClosed(); + return false; +} + +Index BufferStream::Seek(Index offset, SeekOrigin origin) { + CheckClosed(); + throw StreamOperationNotSupportedException( + u"BufferStream does not support seeking."); +} + +bool BufferStream::CanRead() { + CheckClosed(); + return true; } + +Index BufferStream::Read(std::byte* buffer, Index offset, Index size) { + std::unique_lock lock(mutex_); + + Index written_size = 0; + + if (eof_ && buffer_list_.empty()) { + return 0; + } + + while (!buffer_list_.empty()) { + + } +} + +} // namespace cru::io |