diff options
author | crupest <crupest@outlook.com> | 2024-02-12 15:47:31 +0800 |
---|---|---|
committer | crupest <crupest@outlook.com> | 2024-03-20 22:28:21 +0800 |
commit | a21be4edaa483b86872ad732fc4b93970a607952 (patch) | |
tree | e80010d0ef48cb2804c8509a753dc9455dee2a08 | |
parent | f9c404510584faab71b9e9d911d9b396b0f420b0 (diff) | |
download | cru-a21be4edaa483b86872ad732fc4b93970a607952.tar.gz cru-a21be4edaa483b86872ad732fc4b93970a607952.tar.bz2 cru-a21be4edaa483b86872ad732fc4b93970a607952.zip |
WORKING: add Buffer and AutoReadStream.
-rw-r--r-- | include/cru/common/Buffer.h | 50 | ||||
-rw-r--r-- | include/cru/common/io/AutoReadStream.h | 88 | ||||
-rw-r--r-- | include/cru/common/platform/unix/PosixSpawnSubProcess.h | 5 | ||||
-rw-r--r-- | src/common/Buffer.cpp | 0 | ||||
-rw-r--r-- | src/common/CMakeLists.txt | 2 | ||||
-rw-r--r-- | src/common/io/AutoReadStream.cpp | 36 | ||||
-rw-r--r-- | src/common/platform/unix/PosixSpawnSubProcess.cpp | 7 |
7 files changed, 188 insertions, 0 deletions
diff --git a/include/cru/common/Buffer.h b/include/cru/common/Buffer.h new file mode 100644 index 00000000..5390ebb4 --- /dev/null +++ b/include/cru/common/Buffer.h @@ -0,0 +1,50 @@ +#pragma once + +#include "Base.h" + +namespace cru { +class Buffer { + public: + explicit Buffer(Index size); + + Buffer(const Buffer& other); + Buffer(Buffer&& other) noexcept; + + Buffer& operator=(const Buffer& other); + Buffer& operator=(Buffer&& other) noexcept; + + ~Buffer(); + + private: + Index GetSize() const; + Index GetUsedSize() const; + bool IsFull() const { return GetSize() == GetUsedSize(); } + bool IsNotFull() const { return !IsFull(); } + + std::byte& GetAt(Index index); + std::byte GetAt(Index index) const; + + Index PushEnd(std::byte* other, Index other_size); + Index PopEnd(Index size); + + std::byte* Data(); + const std::byte* Data() const; + + operator std::byte*() { return Data(); } + operator const std::byte*() const { return Data(); } + + private: + std::byte* ptr_; + Index size_; + Index used_size_; +}; + +void swap(Buffer& left, Buffer& right); + +class BufferList { + public: + explicit BufferList(Index buffer_size); + private: + +}; +} // namespace cru diff --git a/include/cru/common/io/AutoReadStream.h b/include/cru/common/io/AutoReadStream.h new file mode 100644 index 00000000..7857e8b9 --- /dev/null +++ b/include/cru/common/io/AutoReadStream.h @@ -0,0 +1,88 @@ +#pragma once + +#include "Stream.h" +#include "../Buffer.h" + +#include <condition_variable> +#include <list> +#include <mutex> +#include <thread> +#include <vector> + +namespace cru::io { +struct AutoReadStreamOptions { + /** + * @brief The size of a single buffer allocated each time new space is needed. + * Use default value if <= 0. + * + * When current buffer is full and there is no space for following data, a new + * buffer will be allocated and appended to the buffer list. Note if sum size + * of all buffers reaches the total_buffer_size, no more buffer will be + * allocated but wait. + */ + int buffer_block_size = 0; + + /** + * @brief Total size limit of saved data in buffer. Use default value if < 0. + * No limit if == 0. + * + * When the buffer is filled, it will block and wait for user to read to get + * free space of buffer to continue read. + */ + int total_buffer_size = 0; +}; + +/** + * @brief A stream that wraps another stream and auto read it into a buffer in a + * background thread. + */ +class CRU_BASE_API AutoReadStream : public Stream { + private: + class BufferBlock { + + }; + + public: + /** + * @brief Wrap a stream and auto read it in background. + * @param stream The stream to auto read. + * @param auto_delete Whether to delete the stream object in destructor. + * @param options Options to modify the behavior. + */ + AutoReadStream( + Stream* stream, bool auto_delete, + const AutoReadStreamOptions& options = AutoReadStreamOptions()); + + ~AutoReadStream() override; + + public: + bool CanSeek() override; + Index Seek(Index offset, SeekOrigin origin = SeekOrigin::Current) override; + + bool CanRead() = 0; + virtual Index Read(std::byte* buffer, Index offset, Index size) = 0; + + bool CanWrite() override; + Index Write(const std::byte* buffer, Index offset, Index size) override; + + void Flush() override; + + void Close() = 0; + + private: + void BackgroundThreadRun(); + + private: + Stream* stream_; + bool auto_delete_; + + int buffer_block_size_; + int total_buffer_size_; + + std::mutex buffer_mutex_; + std::condition_variable buffer_condition_variable_; + std::list<Buffer> buffer_list_; + + std::thread background_thread_; +}; +} // namespace cru::io diff --git a/include/cru/common/platform/unix/PosixSpawnSubProcess.h b/include/cru/common/platform/unix/PosixSpawnSubProcess.h index a55c2c1c..0a0f0ad4 100644 --- a/include/cru/common/platform/unix/PosixSpawnSubProcess.h +++ b/include/cru/common/platform/unix/PosixSpawnSubProcess.h @@ -17,6 +17,11 @@ class PosixSpawnSubProcess : public PlatformSubProcessBase { explicit PosixSpawnSubProcess(const PlatformSubProcessStartInfo& start_info); ~PosixSpawnSubProcess(); + protected: + void PlatformCreateProcess() override; + PlatformSubProcessExitResult PlatformWaitForProcess() override; + void PlatformKillProcess() override; + private: pid_t pid_; int exit_code_; diff --git a/src/common/Buffer.cpp b/src/common/Buffer.cpp new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/src/common/Buffer.cpp diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index aaff70fe..bf3156ac 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -1,5 +1,6 @@ add_library(CruBase Base.cpp + Buffer.cpp Exception.cpp Format.cpp PropertyTree.cpp @@ -7,6 +8,7 @@ add_library(CruBase StringToNumberConverter.cpp StringUtil.cpp SubProcess.cpp + io/AutoReadStream.cpp io/CFileStream.cpp io/Stream.cpp io/ProxyStream.cpp diff --git a/src/common/io/AutoReadStream.cpp b/src/common/io/AutoReadStream.cpp new file mode 100644 index 00000000..7cdc1268 --- /dev/null +++ b/src/common/io/AutoReadStream.cpp @@ -0,0 +1,36 @@ +#include "cru/common/io/AutoReadStream.h" +#include <vector> +#include "cru/common/io/Stream.h" + +namespace cru::io { + +AutoReadStream::AutoReadStream(Stream* stream, bool auto_delete, + const AutoReadStreamOptions& options) { + background_thread_ = std::thread(&AutoReadStream::BackgroundThreadRun, this); +} + +bool AutoReadStream::CanSeek() { return false; } + +Index AutoReadStream::Seek(Index offset, SeekOrigin origin) { + throw StreamOperationNotSupportedException( + u"AutoReadStream does not support seek."); +} + +bool AutoReadStream::CanWrite() { return stream_->CanWrite(); } + +Index AutoReadStream::Write(const std::byte* buffer, Index offset, Index size) { + return stream_->Write(buffer, offset, size); +} + +void AutoReadStream::Flush() { stream_->Flush(); } + +void AutoReadStream::BackgroundThreadRun() { + std::unique_lock<std::mutex> lock(buffer_mutex_); + std::vector<std::byte>* buffer = nullptr; + if (!buffer_list_.empty()) { + } + stream_->Read(); +} +} + +} // namespace cru::io diff --git a/src/common/platform/unix/PosixSpawnSubProcess.cpp b/src/common/platform/unix/PosixSpawnSubProcess.cpp index a356de77..7d5ba6e4 100644 --- a/src/common/platform/unix/PosixSpawnSubProcess.cpp +++ b/src/common/platform/unix/PosixSpawnSubProcess.cpp @@ -23,4 +23,11 @@ PosixSpawnSubProcess::PosixSpawnSubProcess( PosixSpawnSubProcess::~PosixSpawnSubProcess() {} +void PosixSpawnSubProcess::PlatformCreateProcess() { + +} + +PlatformSubProcessExitResult PosixSpawnSubProcess::PlatformWaitForProcess() {} + +void PosixSpawnSubProcess::PlatformKillProcess() {} } // namespace cru::platform::unix |