aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/cru/common/io/AutoReadStream.h44
-rw-r--r--include/cru/common/io/BufferStream.h33
-rw-r--r--src/common/io/AutoReadStream.cpp37
-rw-r--r--src/common/io/BufferStream.cpp18
4 files changed, 75 insertions, 57 deletions
diff --git a/include/cru/common/io/AutoReadStream.h b/include/cru/common/io/AutoReadStream.h
index e252bdff..4ba4066f 100644
--- a/include/cru/common/io/AutoReadStream.h
+++ b/include/cru/common/io/AutoReadStream.h
@@ -1,6 +1,7 @@
#pragma once
#include "../Buffer.h"
+#include "BufferStream.h"
#include "Stream.h"
#include <condition_variable>
@@ -11,24 +12,21 @@
namespace cru::io {
struct AutoReadStreamOptions {
/**
- * @brief The size of a single buffer allocated each time new space is needed.
- * Use default value if <= 0.
- *
- * When current buffer is full and there is no space for following data, a new
- * buffer will be allocated and appended to the buffer list. Note if sum size
- * of all buffers reaches the total_buffer_size, no more buffer will be
- * allocated but wait.
+ * @brief Will be passed to BufferStreamOptions::block_size.
*/
- int buffer_block_size = 0;
+ Index block_size = 0;
/**
- * @brief Total size limit of saved data in buffer. Use default value if < 0.
- * No limit if == 0.
- *
- * When the buffer is filled, it will block and wait for user to read to get
- * free space of buffer to continue read.
+ * @brief Will be passed to BufferStreamOptions::total_size_limit.
*/
- int total_buffer_size = 0;
+ Index total_size_limit = 0;
+
+ BufferStreamOptions GetBufferStreamOptions() const {
+ BufferStreamOptions options;
+ options.block_size = block_size;
+ options.total_size_limit = total_size_limit;
+ return options;
+ }
};
/**
@@ -36,9 +34,6 @@ struct AutoReadStreamOptions {
* background thread.
*/
class CRU_BASE_API AutoReadStream : public Stream {
- private:
- class BufferBlock {};
-
public:
/**
* @brief Wrap a stream and auto read it in background.
@@ -56,15 +51,15 @@ class CRU_BASE_API AutoReadStream : public Stream {
bool CanSeek() override;
Index Seek(Index offset, SeekOrigin origin = SeekOrigin::Current) override;
- bool CanRead() = 0;
- virtual Index Read(std::byte* buffer, Index offset, Index size) = 0;
+ bool CanRead() override;
+ Index Read(std::byte* buffer, Index offset, Index size) override;
bool CanWrite() override;
Index Write(const std::byte* buffer, Index offset, Index size) override;
void Flush() override;
- void Close() = 0;
+ void Close() override;
private:
void BackgroundThreadRun();
@@ -73,12 +68,9 @@ class CRU_BASE_API AutoReadStream : public Stream {
Stream* stream_;
bool auto_delete_;
- int buffer_block_size_;
- int total_buffer_size_;
-
- std::mutex buffer_mutex_;
- std::condition_variable buffer_condition_variable_;
- std::list<Buffer> buffer_list_;
+ Index size_per_read_;
+ std::unique_ptr<BufferStream> buffer_stream_;
+ std::mutex buffer_stream_mutex_;
std::thread background_thread_;
};
diff --git a/include/cru/common/io/BufferStream.h b/include/cru/common/io/BufferStream.h
index b3a5e9d1..d5d165cb 100644
--- a/include/cru/common/io/BufferStream.h
+++ b/include/cru/common/io/BufferStream.h
@@ -17,6 +17,12 @@ class WriteAfterEofException : public Exception {
struct BufferStreamOptions {
/**
+ * Actually I have no ideas about the best value for this. May change it later
+ * when I get some ideas.
+ */
+ constexpr static Index kDefaultBlockSize = 1024;
+
+ /**
* @brief The size of a single buffer allocated each time new space is needed.
* Use default value if <= 0.
*
@@ -28,14 +34,19 @@ struct BufferStreamOptions {
Index block_size = 0;
/**
- * @brief Total size limit of saved data in buffer. Use default value if < 0.
- * No limit if == 0.
+ * @brief Total size limit of saved data in buffer. No limit if <= 0.
*
* The size will be floor(total_size_limit / block_size). When the buffer is
* filled, it will block and wait for user to read to get free space of buffer
* to continue read.
*/
Index total_size_limit = 0;
+
+ Index GetBlockSizeOrDefault() const {
+ return block_size <= 0 ? kDefaultBlockSize : block_size;
+ }
+
+ Index GetMaxBlockCount() const { return total_size_limit / block_size; }
};
/**
@@ -45,19 +56,6 @@ struct BufferStreamOptions {
*/
class BufferStream : public Stream {
public:
- /**
- * Actually I have no ideas about the best value for this. May change it later
- * when I get some ideas.
- */
- constexpr static Index kDefaultBlockSize = 1024;
-
- /**
- * Actually I have no ideas about the best value for this. May change it later
- * when I get some ideas.
- */
- constexpr static Index kDefaultTotalSizeLimit = 1024 * 1024;
-
- public:
BufferStream(const BufferStreamOptions& options);
~BufferStream() override;
@@ -67,16 +65,17 @@ class BufferStream : public Stream {
bool CanRead() override;
Index Read(std::byte* buffer, Index offset, Index size) override;
+ using Stream::Read;
bool CanWrite() override;
Index Write(const std::byte* buffer, Index offset, Index size) override;
+ using Stream::Write;
void SetEof();
private:
Index block_size_;
- Index total_size_limit_;
- Index block_count_limit_;
+ Index max_block_count_;
std::list<Buffer> buffer_list_;
bool eof_;
diff --git a/src/common/io/AutoReadStream.cpp b/src/common/io/AutoReadStream.cpp
index 1acf8930..edb77a47 100644
--- a/src/common/io/AutoReadStream.cpp
+++ b/src/common/io/AutoReadStream.cpp
@@ -1,14 +1,22 @@
#include "cru/common/io/AutoReadStream.h"
-#include <vector>
#include "cru/common/io/Stream.h"
namespace cru::io {
AutoReadStream::AutoReadStream(Stream* stream, bool auto_delete,
const AutoReadStreamOptions& options) {
+ auto buffer_stream_options = options.GetBufferStreamOptions();
+ size_per_read_ = buffer_stream_options.GetBlockSizeOrDefault();
+ buffer_stream_ = std::make_unique<BufferStream>(buffer_stream_options);
background_thread_ = std::thread(&AutoReadStream::BackgroundThreadRun, this);
}
+AutoReadStream::~AutoReadStream() {
+ if (auto_delete_) {
+ delete stream_;
+ }
+}
+
bool AutoReadStream::CanSeek() { return false; }
Index AutoReadStream::Seek(Index offset, SeekOrigin origin) {
@@ -16,6 +24,13 @@ Index AutoReadStream::Seek(Index offset, SeekOrigin origin) {
u"AutoReadStream does not support seek.");
}
+bool AutoReadStream::CanRead() { return true; }
+
+Index AutoReadStream::Read(std::byte* buffer, Index offset, Index size) {
+ std::unique_lock lock(buffer_stream_mutex_);
+ return buffer_stream_->Read(buffer, offset, size);
+}
+
bool AutoReadStream::CanWrite() { return stream_->CanWrite(); }
Index AutoReadStream::Write(const std::byte* buffer, Index offset, Index size) {
@@ -24,12 +39,24 @@ Index AutoReadStream::Write(const std::byte* buffer, Index offset, Index size) {
void AutoReadStream::Flush() { stream_->Flush(); }
+void AutoReadStream::Close() { stream_->Close(); }
+
void AutoReadStream::BackgroundThreadRun() {
- std::unique_lock<std::mutex> lock(buffer_mutex_);
- std::vector<std::byte>* buffer = nullptr;
- if (!buffer_list_.empty()) {
+ std::vector<std::byte> buffer(size_per_read_);
+ while (true) {
+ try {
+ auto read = stream_->Read(buffer.data(), buffer.size());
+ if (read == 0) {
+ buffer_stream_->SetEof();
+ break;
+ } else {
+ buffer_stream_->Write(buffer.data(), read);
+ }
+ } catch (const StreamAlreadyClosedException& exception) {
+ buffer_stream_->SetEof();
+ break;
+ }
}
- stream_->Read();
}
} // namespace cru::io
diff --git a/src/common/io/BufferStream.cpp b/src/common/io/BufferStream.cpp
index 1becc1d9..b2467a17 100644
--- a/src/common/io/BufferStream.cpp
+++ b/src/common/io/BufferStream.cpp
@@ -3,11 +3,8 @@
namespace cru::io {
BufferStream::BufferStream(const BufferStreamOptions& options) {
- block_size_ =
- options.block_size <= 0 ? kDefaultBlockSize : options.block_size;
- total_size_limit_ = options.total_size_limit < 0 ? kDefaultTotalSizeLimit
- : options.total_size_limit;
- block_count_limit_ = total_size_limit_ / block_size_;
+ block_size_ = options.GetBlockSizeOrDefault();
+ max_block_count_ = options.GetMaxBlockCount();
eof_ = false;
}
@@ -33,7 +30,7 @@ Index BufferStream::Read(std::byte* buffer, Index offset, Index size) {
return 0;
}
- auto full = buffer_list_.size() == block_count_limit_;
+ auto full = max_block_count_ > 0 && buffer_list_.size() == max_block_count_;
Index read = 0;
@@ -50,7 +47,7 @@ Index BufferStream::Read(std::byte* buffer, Index offset, Index size) {
}
}
- if (full && buffer_list_.size() < block_count_limit_) {
+ if (full && buffer_list_.size() < max_block_count_) {
// By convention, there should be at most one producer waiting. So
// notify_one and notify_all should be the same.
condition_variable_.notify_one();
@@ -70,7 +67,7 @@ Index BufferStream::Write(const std::byte* buffer, Index offset, Index size) {
}
condition_variable_.wait(lock, [this] {
- return buffer_list_.size() < block_count_limit_ ||
+ return buffer_list_.size() < max_block_count_ ||
buffer_list_.back().GetBackFree() > 0;
});
@@ -78,8 +75,11 @@ Index BufferStream::Write(const std::byte* buffer, Index offset, Index size) {
Index written = 0;
- while (buffer_list_.size() != block_count_limit_) {
+ while (true) {
if (buffer_list_.back().GetBackFree() == 0) {
+ if (max_block_count_ > 0 && buffer_list_.size() == max_block_count_) {
+ break;
+ }
buffer_list_.push_back(Buffer(block_size_));
}
auto& stream_buffer = buffer_list_.back();