aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcrupest <crupest@outlook.com>2024-02-12 15:47:31 +0800
committercrupest <crupest@outlook.com>2024-03-24 20:03:58 +0800
commit944ea0e5b613d901ba834dc225eb9d379b750bd3 (patch)
tree9cdeae736bc635481b4cd3d6024d1d5d0c5072df
parentb2051e28cd36c49b7c5d49b511646a3856d7eaf0 (diff)
downloadcru-944ea0e5b613d901ba834dc225eb9d379b750bd3.tar.gz
cru-944ea0e5b613d901ba834dc225eb9d379b750bd3.tar.bz2
cru-944ea0e5b613d901ba834dc225eb9d379b750bd3.zip
WORKING: make Buffer track two sides.
-rw-r--r--include/cru/common/Buffer.h77
-rw-r--r--include/cru/common/concurrent/ConcurrentQueue.h22
-rw-r--r--include/cru/common/io/AutoReadStream.h7
-rw-r--r--include/cru/common/io/BufferStream.h89
-rw-r--r--src/common/Buffer.cpp83
-rw-r--r--src/common/CMakeLists.txt1
-rw-r--r--src/common/io/BufferStream.cpp45
7 files changed, 234 insertions, 90 deletions
diff --git a/include/cru/common/Buffer.h b/include/cru/common/Buffer.h
index 1fc894ae..8574cd86 100644
--- a/include/cru/common/Buffer.h
+++ b/include/cru/common/Buffer.h
@@ -2,10 +2,10 @@
#include "Base.h"
-#include <list>
-
namespace cru {
class Buffer final {
+ friend void swap(Buffer& left, Buffer& right) noexcept;
+
public:
explicit Buffer(Index size);
@@ -19,10 +19,12 @@ class Buffer final {
private:
Index GetBufferSize() const { return size_; }
- Index GetUsedSize() const { return used_size_; }
- Index GetRestSize() const { return GetBufferSize() - GetUsedSize(); }
+ Index GetUsedSize() const { return used_end_ - used_begin_; }
bool IsNull() const { return ptr_ == nullptr; }
- bool IsFull() const { return GetBufferSize() == GetUsedSize(); }
+ bool IsUsedReachEnd() const { return used_end_ == size_; }
+
+ Index GetUsedBegin() const { return used_begin_; }
+ Index GetUsedEnd() const { return used_end_; }
std::byte* GetPtr() { return GetPtrAt(0); }
const std::byte* GetPtr() const { return GetPtrAt(0); }
@@ -33,22 +35,23 @@ class Buffer final {
std::byte& GetRefAt(Index index) { return *GetPtrAt(index); }
const std::byte& GetRefAt(Index index) const { return *GetPtrAt(index); }
- std::byte* GetUsedEndPtr() { return GetPtrAt(GetUsedSize()); }
- const std::byte* GetUsedEndPtr() const { return GetPtrAt(GetUsedSize()); }
+ std::byte* GetUsedBeginPtr() { return GetPtrAt(GetUsedBegin()); }
+ const std::byte* GetUsedBeginPtr() const { return GetPtrAt(GetUsedBegin()); }
+ std::byte* GetUsedEndPtr() { return GetPtrAt(GetUsedEnd()); }
+ const std::byte* GetUsedEndPtr() const { return GetPtrAt(GetUsedEnd()); }
std::byte GetByteAt(Index index) const { return ptr_[index]; }
void SetByteAt(Index index, std::byte value) { ptr_[index] = value; }
- void AssignBytes(std::byte* src, Index src_size) {
- return AssignBytes(0, src, 0, src_size);
+ void AssignBytes(std::byte* src, Index src_size, bool use_memmove = false) {
+ return AssignBytes(0, src, 0, src_size, use_memmove);
}
- void AssignBytes(Index dst_offset, std::byte* src, Index src_size) {
- return AssignBytes(dst_offset, src, 0, src_size);
+ void AssignBytes(Index dst_offset, std::byte* src, Index src_size,
+ bool use_memmove = false) {
+ return AssignBytes(dst_offset, src, 0, src_size, use_memmove);
}
void AssignBytes(Index dst_offset, std::byte* src, Index src_offset,
- Index src_size);
-
- void SetUsedSize(Index new_size);
+ Index src_size, bool use_memmove = false);
/**
* @brief Change the size of the buffer.
@@ -61,6 +64,16 @@ class Buffer final {
void ResizeBuffer(Index new_size, bool preserve_used);
/**
+ * @brief Append data to the front of used bytes and increase used size.
+ * @return The actual size of data saved.
+ *
+ * If there is no enough space left for new data, the rest space will be
+ * written and the size of it will be returned, leaving exceeded data not
+ * saved.
+ */
+ Index PushFront(std::byte* other, Index other_size, bool use_memmove = false);
+
+ /**
* @brief Append data to the back of used bytes and increase used size.
* @return The actual size of data saved.
*
@@ -68,11 +81,20 @@ class Buffer final {
* written and the size of it will be returned, leaving exceeded data not
* saved.
*/
- Index PushEnd(std::byte* other, Index other_size);
+ Index PushBack(std::byte* other, Index other_size, bool use_memmove = false);
+
+ /**
+ * @brief Move forward the used-begin ptr.
+ * @return The actual size moved forward.
+ *
+ * If given size is bigger than current used size, the used size will be
+ * returned and set to 0.
+ */
+ Index PopFront(Index size);
/**
- * @brief Decrease used data size.
- * @return The actual size decreased.
+ * @brief Move backward the used-end ptr.
+ * @return The actual size moved backward.
*
* If given size is bigger than current used size, the used size will be
* returned and set to 0.
@@ -90,24 +112,9 @@ class Buffer final {
private:
std::byte* ptr_;
Index size_;
- Index used_size_;
+ Index used_begin_;
+ Index used_end_;
};
-void swap(Buffer& left, Buffer& right);
-
-class BufferList {
- public:
- explicit BufferList(Index buffer_size);
-
- BufferList(const BufferList& other);
- BufferList(BufferList&& other);
-
- BufferList& operator=(const BufferList& other);
- BufferList& operator=(BufferList&& other);
-
- ~BufferList();
-
- private:
- std::list<Buffer> buffers_;
-};
+void swap(Buffer& left, Buffer& right) noexcept;
} // namespace cru
diff --git a/include/cru/common/concurrent/ConcurrentQueue.h b/include/cru/common/concurrent/ConcurrentQueue.h
index 4f649a41..e311d5f9 100644
--- a/include/cru/common/concurrent/ConcurrentQueue.h
+++ b/include/cru/common/concurrent/ConcurrentQueue.h
@@ -24,28 +24,6 @@ class ConcurrentQueue {
ConcurrentQueue(const ConcurrentQueue&) = delete;
ConcurrentQueue& operator=(const ConcurrentQueue&) = delete;
- ConcurrentQueue(ConcurrentQueue&& other)
- : head_(other.head_),
- tail_(other.tail_),
- mutex_(std::move(other.mutex_)),
- condition_variable_(std::move(other.condition_variable_)) {
- other.head_ = nullptr;
- other.tail_ = nullptr;
- }
-
- ConcurrentQueue& operator=(ConcurrentQueue&& other) {
- if (this != &other) {
- head_ = other.head_;
- tail_ = other.tail_;
- mutex_ = std::move(other.mutex_);
- condition_variable_ = std::move(other.condition_variable_);
- other.head_ = nullptr;
- other.tail_ = nullptr;
- return *this;
- }
- return *this;
- }
-
~ConcurrentQueue() {
if (head_) {
auto node = head_;
diff --git a/include/cru/common/io/AutoReadStream.h b/include/cru/common/io/AutoReadStream.h
index 7857e8b9..e252bdff 100644
--- a/include/cru/common/io/AutoReadStream.h
+++ b/include/cru/common/io/AutoReadStream.h
@@ -1,13 +1,12 @@
#pragma once
-#include "Stream.h"
#include "../Buffer.h"
+#include "Stream.h"
#include <condition_variable>
#include <list>
#include <mutex>
#include <thread>
-#include <vector>
namespace cru::io {
struct AutoReadStreamOptions {
@@ -38,9 +37,7 @@ struct AutoReadStreamOptions {
*/
class CRU_BASE_API AutoReadStream : public Stream {
private:
- class BufferBlock {
-
- };
+ class BufferBlock {};
public:
/**
diff --git a/include/cru/common/io/BufferStream.h b/include/cru/common/io/BufferStream.h
new file mode 100644
index 00000000..64d1bb56
--- /dev/null
+++ b/include/cru/common/io/BufferStream.h
@@ -0,0 +1,89 @@
+#pragma once
+
+#include "../Buffer.h"
+#include "Stream.h"
+#include "../Exception.h"
+
+#include <condition_variable>
+#include <list>
+#include <mutex>
+
+namespace cru::io {
+class WriteAfterEofException : public Exception {
+ public:
+ using Exception::Exception;
+ ~WriteAfterEofException() override = default;
+};
+
+struct BufferStreamOptions {
+ /**
+ * @brief The size of a single buffer allocated each time new space is needed.
+ * Use default value if <= 0.
+ *
+ * When current buffer is full and there is no space for following data, a new
+ * buffer will be allocated and appended to the buffer list. Note if sum size
+ * of all buffers reaches the total_buffer_limit, no more buffer will be
+ * allocated but wait.
+ */
+ Index block_size = 0;
+
+ /**
+ * @brief Total size limit of saved data in buffer. Use default value if < 0.
+ * No limit if == 0.
+ *
+ * The size will be ceil(total_size_limit / block_size). When the buffer is
+ * filled, it will block and wait for user to read to get free space of buffer
+ * to continue read.
+ */
+ Index total_size_limit = 0;
+};
+
+class BufferStream : public Stream {
+ public:
+ /**
+ * Actually I have no ideas about the best value for this. May change it later
+ * when I get some ideas.
+ */
+ constexpr static Index kDefaultBlockSize = 1024;
+
+ /**
+ * Actually I have no ideas about the best value for this. May change it later
+ * when I get some ideas.
+ */
+ constexpr static Index kDefaultTotalSizeLimit = 1024;
+
+ public:
+ BufferStream(const BufferStreamOptions& options);
+
+ ~BufferStream() override;
+
+ bool CanSeek() override;
+ Index Seek(Index offset, SeekOrigin origin = SeekOrigin::Current) override;
+
+ bool CanRead() override;
+ Index Read(std::byte* buffer, Index offset, Index size) override;
+
+ bool CanWrite() = 0;
+ Index Write(const std::byte* buffer, Index offset, Index size) = 0;
+
+ virtual void Flush();
+
+ virtual void Close();
+
+ void SetEof();
+
+ private:
+ bool CheckClosed();
+
+ private:
+ Index block_size_;
+ Index total_size_limit_;
+ Index block_count_limit_;
+
+ std::list<Buffer> buffer_list_;
+ bool eof_;
+
+ std::mutex mutex_;
+ std::condition_variable condition_variable_;
+};
+} // namespace cru::io
diff --git a/src/common/Buffer.cpp b/src/common/Buffer.cpp
index 62904b17..00bbd166 100644
--- a/src/common/Buffer.cpp
+++ b/src/common/Buffer.cpp
@@ -1,7 +1,8 @@
#include "cru/common/Buffer.h"
-#include <cstring>
#include "cru/common/Exception.h"
+#include <cstring>
+
namespace cru {
namespace {
void CheckSize(Index size) {
@@ -15,11 +16,11 @@ Buffer::Buffer(Index size) {
CheckSize(size);
if (size == 0) {
ptr_ = nullptr;
- size_ = used_size_ = 0;
+ size_ = used_begin_ = used_end_ = 0;
} else {
ptr_ = new std::byte[size];
size_ = size;
- used_size_ = 0;
+ used_begin_ = used_end_ = 0;
}
}
@@ -46,17 +47,16 @@ Buffer& Buffer::operator=(Buffer&& other) noexcept {
Buffer::~Buffer() { Delete_(); }
void Buffer::AssignBytes(Index dst_offset, std::byte* src, Index src_offset,
- Index src_size) {
- std::memcpy(ptr_ + dst_offset, src + src_offset, src_size);
+ Index src_size, bool use_memmove) {
+ (use_memmove ? std::memmove : std::memcpy)(ptr_ + dst_offset,
+ src + src_offset, src_size);
}
-void Buffer::SetUsedSize(Index new_size) { used_size_ = new_size; }
-
void Buffer::ResizeBuffer(Index new_size, bool preserve_used) {
if (new_size == 0) {
Delete_();
ptr_ = nullptr;
- size_ = used_size_ = 0;
+ size_ = used_begin_ = used_end_ = 0;
return;
}
@@ -64,56 +64,75 @@ void Buffer::ResizeBuffer(Index new_size, bool preserve_used) {
ptr_ = new std::byte[new_size];
size_ = new_size;
+ used_begin_ = std::min(new_size, used_begin_);
+ used_end_ = std::min(new_size, used_end_);
if (old_ptr) {
- if (preserve_used) {
- auto copy_size = std::min(used_size_, new_size);
- std::memcpy(ptr_, old_ptr, copy_size);
- used_size_ = copy_size;
+ if (preserve_used && used_begin_ < used_end_) {
+ std::memcpy(ptr_ + used_begin_, old_ptr + used_begin_,
+ used_end_ - used_begin_);
}
delete[] old_ptr;
}
}
-Index Buffer::PushEnd(std::byte* other, Index other_size) {
- auto copy_size = std::min(GetRestSize(), other_size);
+Index Buffer::PushFront(std::byte* other, Index other_size, bool use_memmove) {
+ auto copy_size = std::min(used_begin_, other_size);
if (copy_size) {
- std::memcpy(GetUsedEndPtr(), other, copy_size);
- used_size_ += copy_size;
+ used_begin_ -= copy_size;
+ (use_memmove ? std::memmove : std::memcpy)(ptr_ + used_begin_, other,
+ copy_size);
}
return copy_size;
}
-Index Buffer::PopEnd(Index size) {
- if (used_size_ < size) {
- used_size_ = 0;
- return used_size_;
- } else {
- used_size_ -= size;
- return size;
+Index Buffer::PushBack(std::byte* other, Index other_size, bool use_memmove) {
+ auto copy_size = std::min(size_ - used_end_, other_size);
+
+ if (copy_size) {
+ (use_memmove ? std::memmove : std::memcpy)(ptr_ + used_end_, other,
+ copy_size);
+ used_end_ += copy_size;
}
+
+ return copy_size;
+}
+
+Index Buffer::PopFront(Index size) {
+ auto move = std::min(used_begin_, size);
+ used_begin_ -= move;
+ return move;
+}
+
+Index Buffer::PopEnd(Index size) {
+ auto move = std::min(size_ - used_end_, size);
+ used_end_ += move;
+ return move;
}
void Buffer::Copy_(const Buffer& other) {
if (other.ptr_ == nullptr) {
ptr_ = nullptr;
- size_ = used_size_ = 0;
+ size_ = used_begin_ = used_end_ = 0;
} else {
ptr_ = new std::byte[other.size_];
size_ = other.size_;
- used_size_ = other.used_size_;
- std::memcpy(ptr_, other.ptr_, used_size_);
+ used_begin_ = other.used_begin_;
+ used_end_ = other.used_end_;
+ std::memcpy(ptr_ + used_begin_, other.ptr_ + used_begin_,
+ used_end_ - used_begin_);
}
}
void Buffer::Move_(Buffer&& other) noexcept {
ptr_ = other.ptr_;
size_ = other.size_;
- used_size_ = other.used_size_;
+ used_begin_ = other.used_begin_;
+ used_end_ = other.used_end_;
other.ptr_ = nullptr;
- other.size_ = other.used_size_ = 0;
+ other.size_ = other.used_begin_ = other.used_end_ = 0;
}
void Buffer::Delete_() noexcept {
@@ -121,4 +140,12 @@ void Buffer::Delete_() noexcept {
delete[] ptr_;
}
}
+
+void swap(Buffer& left, Buffer& right) noexcept {
+ using std::swap;
+ swap(left.ptr_, right.ptr_);
+ swap(left.size_, right.size_);
+ swap(left.used_begin_, right.used_begin_);
+ swap(left.used_end_, right.used_end_);
+}
} // namespace cru
diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt
index bf3156ac..19feddba 100644
--- a/src/common/CMakeLists.txt
+++ b/src/common/CMakeLists.txt
@@ -9,6 +9,7 @@ add_library(CruBase
StringUtil.cpp
SubProcess.cpp
io/AutoReadStream.cpp
+ io/BufferStream.cpp
io/CFileStream.cpp
io/Stream.cpp
io/ProxyStream.cpp
diff --git a/src/common/io/BufferStream.cpp b/src/common/io/BufferStream.cpp
new file mode 100644
index 00000000..d4780f7a
--- /dev/null
+++ b/src/common/io/BufferStream.cpp
@@ -0,0 +1,45 @@
+#include "cru/common/io/BufferStream.h"
+#include <memory>
+#include "cru/common/io/Stream.h"
+
+namespace cru::io {
+BufferStream::BufferStream(const BufferStreamOptions& options) {
+ block_size_ =
+ options.block_size <= 0 ? kDefaultBlockSize : options.block_size;
+ total_size_limit_ = options.total_size_limit < 0 ? kDefaultTotalSizeLimit
+ : options.total_size_limit;
+ block_count_limit_ = total_size_limit_ / block_size_;
+
+ eof_ = false;
+}
+
+bool BufferStream::CanSeek() {
+ CheckClosed();
+ return false;
+}
+
+Index BufferStream::Seek(Index offset, SeekOrigin origin) {
+ CheckClosed();
+ throw StreamOperationNotSupportedException(
+ u"BufferStream does not support seeking.");
+}
+
+bool BufferStream::CanRead() {
+ CheckClosed();
+ return true; }
+
+Index BufferStream::Read(std::byte* buffer, Index offset, Index size) {
+ std::unique_lock lock(mutex_);
+
+ Index written_size = 0;
+
+ if (eof_ && buffer_list_.empty()) {
+ return 0;
+ }
+
+ while (!buffer_list_.empty()) {
+
+ }
+}
+
+} // namespace cru::io