aboutsummaryrefslogtreecommitdiff
path: root/include/cru
diff options
context:
space:
mode:
Diffstat (limited to 'include/cru')
-rw-r--r--include/cru/common/Buffer.h77
-rw-r--r--include/cru/common/concurrent/ConcurrentQueue.h22
-rw-r--r--include/cru/common/io/AutoReadStream.h7
-rw-r--r--include/cru/common/io/BufferStream.h89
4 files changed, 133 insertions, 62 deletions
diff --git a/include/cru/common/Buffer.h b/include/cru/common/Buffer.h
index 1fc894ae..8574cd86 100644
--- a/include/cru/common/Buffer.h
+++ b/include/cru/common/Buffer.h
@@ -2,10 +2,10 @@
#include "Base.h"
-#include <list>
-
namespace cru {
class Buffer final {
+ friend void swap(Buffer& left, Buffer& right) noexcept;
+
public:
explicit Buffer(Index size);
@@ -19,10 +19,12 @@ class Buffer final {
private:
Index GetBufferSize() const { return size_; }
- Index GetUsedSize() const { return used_size_; }
- Index GetRestSize() const { return GetBufferSize() - GetUsedSize(); }
+ Index GetUsedSize() const { return used_end_ - used_begin_; }
bool IsNull() const { return ptr_ == nullptr; }
- bool IsFull() const { return GetBufferSize() == GetUsedSize(); }
+ bool IsUsedReachEnd() const { return used_end_ == size_; }
+
+ Index GetUsedBegin() const { return used_begin_; }
+ Index GetUsedEnd() const { return used_end_; }
std::byte* GetPtr() { return GetPtrAt(0); }
const std::byte* GetPtr() const { return GetPtrAt(0); }
@@ -33,22 +35,23 @@ class Buffer final {
std::byte& GetRefAt(Index index) { return *GetPtrAt(index); }
const std::byte& GetRefAt(Index index) const { return *GetPtrAt(index); }
- std::byte* GetUsedEndPtr() { return GetPtrAt(GetUsedSize()); }
- const std::byte* GetUsedEndPtr() const { return GetPtrAt(GetUsedSize()); }
+ std::byte* GetUsedBeginPtr() { return GetPtrAt(GetUsedBegin()); }
+ const std::byte* GetUsedBeginPtr() const { return GetPtrAt(GetUsedBegin()); }
+ std::byte* GetUsedEndPtr() { return GetPtrAt(GetUsedEnd()); }
+ const std::byte* GetUsedEndPtr() const { return GetPtrAt(GetUsedEnd()); }
std::byte GetByteAt(Index index) const { return ptr_[index]; }
void SetByteAt(Index index, std::byte value) { ptr_[index] = value; }
- void AssignBytes(std::byte* src, Index src_size) {
- return AssignBytes(0, src, 0, src_size);
+ void AssignBytes(std::byte* src, Index src_size, bool use_memmove = false) {
+ return AssignBytes(0, src, 0, src_size, use_memmove);
}
- void AssignBytes(Index dst_offset, std::byte* src, Index src_size) {
- return AssignBytes(dst_offset, src, 0, src_size);
+ void AssignBytes(Index dst_offset, std::byte* src, Index src_size,
+ bool use_memmove = false) {
+ return AssignBytes(dst_offset, src, 0, src_size, use_memmove);
}
void AssignBytes(Index dst_offset, std::byte* src, Index src_offset,
- Index src_size);
-
- void SetUsedSize(Index new_size);
+ Index src_size, bool use_memmove = false);
/**
* @brief Change the size of the buffer.
@@ -61,6 +64,16 @@ class Buffer final {
void ResizeBuffer(Index new_size, bool preserve_used);
/**
+ * @brief Append data to the front of used bytes and increase used size.
+ * @return The actual size of data saved.
+ *
+ * If there is no enough space left for new data, the rest space will be
+ * written and the size of it will be returned, leaving exceeded data not
+ * saved.
+ */
+ Index PushFront(std::byte* other, Index other_size, bool use_memmove = false);
+
+ /**
* @brief Append data to the back of used bytes and increase used size.
* @return The actual size of data saved.
*
@@ -68,11 +81,20 @@ class Buffer final {
* written and the size of it will be returned, leaving exceeded data not
* saved.
*/
- Index PushEnd(std::byte* other, Index other_size);
+ Index PushBack(std::byte* other, Index other_size, bool use_memmove = false);
+
+ /**
+ * @brief Move forward the used-begin ptr.
+ * @return The actual size moved forward.
+ *
+ * If given size is bigger than current used size, the used size will be
+ * returned and set to 0.
+ */
+ Index PopFront(Index size);
/**
- * @brief Decrease used data size.
- * @return The actual size decreased.
+ * @brief Move backward the used-end ptr.
+ * @return The actual size moved backward.
*
* If given size is bigger than current used size, the used size will be
* returned and set to 0.
@@ -90,24 +112,9 @@ class Buffer final {
private:
std::byte* ptr_;
Index size_;
- Index used_size_;
+ Index used_begin_;
+ Index used_end_;
};
-void swap(Buffer& left, Buffer& right);
-
-class BufferList {
- public:
- explicit BufferList(Index buffer_size);
-
- BufferList(const BufferList& other);
- BufferList(BufferList&& other);
-
- BufferList& operator=(const BufferList& other);
- BufferList& operator=(BufferList&& other);
-
- ~BufferList();
-
- private:
- std::list<Buffer> buffers_;
-};
+void swap(Buffer& left, Buffer& right) noexcept;
} // namespace cru
diff --git a/include/cru/common/concurrent/ConcurrentQueue.h b/include/cru/common/concurrent/ConcurrentQueue.h
index 4f649a41..e311d5f9 100644
--- a/include/cru/common/concurrent/ConcurrentQueue.h
+++ b/include/cru/common/concurrent/ConcurrentQueue.h
@@ -24,28 +24,6 @@ class ConcurrentQueue {
ConcurrentQueue(const ConcurrentQueue&) = delete;
ConcurrentQueue& operator=(const ConcurrentQueue&) = delete;
- ConcurrentQueue(ConcurrentQueue&& other)
- : head_(other.head_),
- tail_(other.tail_),
- mutex_(std::move(other.mutex_)),
- condition_variable_(std::move(other.condition_variable_)) {
- other.head_ = nullptr;
- other.tail_ = nullptr;
- }
-
- ConcurrentQueue& operator=(ConcurrentQueue&& other) {
- if (this != &other) {
- head_ = other.head_;
- tail_ = other.tail_;
- mutex_ = std::move(other.mutex_);
- condition_variable_ = std::move(other.condition_variable_);
- other.head_ = nullptr;
- other.tail_ = nullptr;
- return *this;
- }
- return *this;
- }
-
~ConcurrentQueue() {
if (head_) {
auto node = head_;
diff --git a/include/cru/common/io/AutoReadStream.h b/include/cru/common/io/AutoReadStream.h
index 7857e8b9..e252bdff 100644
--- a/include/cru/common/io/AutoReadStream.h
+++ b/include/cru/common/io/AutoReadStream.h
@@ -1,13 +1,12 @@
#pragma once
-#include "Stream.h"
#include "../Buffer.h"
+#include "Stream.h"
#include <condition_variable>
#include <list>
#include <mutex>
#include <thread>
-#include <vector>
namespace cru::io {
struct AutoReadStreamOptions {
@@ -38,9 +37,7 @@ struct AutoReadStreamOptions {
*/
class CRU_BASE_API AutoReadStream : public Stream {
private:
- class BufferBlock {
-
- };
+ class BufferBlock {};
public:
/**
diff --git a/include/cru/common/io/BufferStream.h b/include/cru/common/io/BufferStream.h
new file mode 100644
index 00000000..64d1bb56
--- /dev/null
+++ b/include/cru/common/io/BufferStream.h
@@ -0,0 +1,89 @@
+#pragma once
+
+#include "../Buffer.h"
+#include "Stream.h"
+#include "../Exception.h"
+
+#include <condition_variable>
+#include <list>
+#include <mutex>
+
+namespace cru::io {
+class WriteAfterEofException : public Exception {
+ public:
+ using Exception::Exception;
+ ~WriteAfterEofException() override = default;
+};
+
+struct BufferStreamOptions {
+ /**
+ * @brief The size of a single buffer allocated each time new space is needed.
+ * Use default value if <= 0.
+ *
+ * When current buffer is full and there is no space for following data, a new
+ * buffer will be allocated and appended to the buffer list. Note if sum size
+ * of all buffers reaches the total_buffer_limit, no more buffer will be
+ * allocated but wait.
+ */
+ Index block_size = 0;
+
+ /**
+ * @brief Total size limit of saved data in buffer. Use default value if < 0.
+ * No limit if == 0.
+ *
+ * The size will be ceil(total_size_limit / block_size). When the buffer is
+ * filled, it will block and wait for user to read to get free space of buffer
+ * to continue read.
+ */
+ Index total_size_limit = 0;
+};
+
+class BufferStream : public Stream {
+ public:
+ /**
+ * Actually I have no ideas about the best value for this. May change it later
+ * when I get some ideas.
+ */
+ constexpr static Index kDefaultBlockSize = 1024;
+
+ /**
+ * Actually I have no ideas about the best value for this. May change it later
+ * when I get some ideas.
+ */
+ constexpr static Index kDefaultTotalSizeLimit = 1024;
+
+ public:
+ BufferStream(const BufferStreamOptions& options);
+
+ ~BufferStream() override;
+
+ bool CanSeek() override;
+ Index Seek(Index offset, SeekOrigin origin = SeekOrigin::Current) override;
+
+ bool CanRead() override;
+ Index Read(std::byte* buffer, Index offset, Index size) override;
+
+ bool CanWrite() = 0;
+ Index Write(const std::byte* buffer, Index offset, Index size) = 0;
+
+ virtual void Flush();
+
+ virtual void Close();
+
+ void SetEof();
+
+ private:
+ bool CheckClosed();
+
+ private:
+ Index block_size_;
+ Index total_size_limit_;
+ Index block_count_limit_;
+
+ std::list<Buffer> buffer_list_;
+ bool eof_;
+
+ std::mutex mutex_;
+ std::condition_variable condition_variable_;
+};
+} // namespace cru::io