aboutsummaryrefslogtreecommitdiff
path: root/src/base/io
diff options
context:
space:
mode:
authorcrupest <crupest@outlook.com>2024-06-24 00:06:25 +0800
committercrupest <crupest@outlook.com>2024-10-06 18:03:42 +0800
commitb2a331aa527bdd5f4d91727904cc5c66fe897d62 (patch)
treee84d0bfdd4ed63255024e02954963d8c564ad024 /src/base/io
parentdfe62dcf8bcefc523b466e127c3edc4dc2756629 (diff)
downloadcru-b2a331aa527bdd5f4d91727904cc5c66fe897d62.tar.gz
cru-b2a331aa527bdd5f4d91727904cc5c66fe897d62.tar.bz2
cru-b2a331aa527bdd5f4d91727904cc5c66fe897d62.zip
Done Stream refactor.
NEED TEST: BufferStream, AutoReadStream, SubProcess.
Diffstat (limited to 'src/base/io')
-rw-r--r--src/base/io/AutoReadStream.cpp30
-rw-r--r--src/base/io/BufferStream.cpp22
-rw-r--r--src/base/io/CFileStream.cpp4
-rw-r--r--src/base/io/Stream.cpp6
4 files changed, 44 insertions, 18 deletions
diff --git a/src/base/io/AutoReadStream.cpp b/src/base/io/AutoReadStream.cpp
index c24f61d1..0c035648 100644
--- a/src/base/io/AutoReadStream.cpp
+++ b/src/base/io/AutoReadStream.cpp
@@ -5,9 +5,12 @@
namespace cru::io {
-AutoReadStream::AutoReadStream(Stream* stream, bool auto_delete,
+AutoReadStream::AutoReadStream(Stream* stream, bool auto_close,
+ bool auto_delete,
const AutoReadStreamOptions& options)
- : Stream(false, true, stream->CanSeek()) {
+ : Stream(false, true, stream->CanSeek()),
+ auto_close_(auto_close),
+ auto_delete_(auto_delete) {
auto buffer_stream_options = options.GetBufferStreamOptions();
stream_ = stream;
size_per_read_ = buffer_stream_options.GetBlockSizeOrDefault();
@@ -16,13 +19,11 @@ AutoReadStream::AutoReadStream(Stream* stream, bool auto_delete,
}
AutoReadStream::~AutoReadStream() {
- if (auto_delete_) {
- delete stream_;
- }
+ DoClose();
+ background_thread_.join();
}
Index AutoReadStream::DoRead(std::byte* buffer, Index offset, Index size) {
- std::unique_lock lock(buffer_stream_mutex_);
return buffer_stream_->Read(buffer, offset, size);
}
@@ -33,7 +34,17 @@ Index AutoReadStream::DoWrite(const std::byte* buffer, Index offset,
void AutoReadStream::DoFlush() { stream_->Flush(); }
-void AutoReadStream::DoClose() {}
+void AutoReadStream::DoClose() {
+ CRU_STREAM_BEGIN_CLOSE
+ if (auto_close_) {
+ stream_->Close();
+ }
+ if (auto_delete_) {
+ delete stream_;
+ stream_ = nullptr;
+ }
+ buffer_stream_->Close();
+}
void AutoReadStream::BackgroundThreadRun() {
std::vector<std::byte> buffer(size_per_read_);
@@ -46,11 +57,10 @@ void AutoReadStream::BackgroundThreadRun() {
} else {
buffer_stream_->Write(buffer.data(), read);
}
- } catch (const StreamAlreadyClosedException& exception) {
- buffer_stream_->SetEof();
+ } catch (const StreamClosedException& exception) {
+ buffer_stream_->Close();
break;
}
}
}
-
} // namespace cru::io
diff --git a/src/base/io/BufferStream.cpp b/src/base/io/BufferStream.cpp
index e81731e8..57a8b694 100644
--- a/src/base/io/BufferStream.cpp
+++ b/src/base/io/BufferStream.cpp
@@ -15,8 +15,12 @@ BufferStream::~BufferStream() { DoClose(); }
Index BufferStream::DoRead(std::byte* buffer, Index offset, Index size) {
std::unique_lock lock(mutex_);
- condition_variable_.wait(lock,
- [this] { return !buffer_list_.empty() || eof_; });
+ condition_variable_.wait(
+ lock, [this] { return GetClosed() || !buffer_list_.empty() || eof_; });
+
+ if (GetClosed()) {
+ StreamClosedException::Check(true);
+ }
if (buffer_list_.empty() && eof_) {
return 0;
@@ -57,10 +61,15 @@ Index BufferStream::DoWrite(const std::byte* buffer, Index offset, Index size) {
}
condition_variable_.wait(lock, [this] {
- return max_block_count_ <= 0 || buffer_list_.size() < max_block_count_ ||
+ return GetClosed() || max_block_count_ <= 0 ||
+ buffer_list_.size() < max_block_count_ ||
buffer_list_.back().GetBackFree() > 0;
});
+ if (GetClosed()) {
+ StreamClosedException::Check(true);
+ }
+
auto empty = buffer_list_.empty();
Index written = 0;
@@ -105,5 +114,10 @@ void BufferStream::SetEof() {
}
}
-void BufferStream::DoClose() { CRU_STREAM_BEGIN_CLOSE }
+void BufferStream::DoClose() {
+ CRU_STREAM_BEGIN_CLOSE
+ SetClosed(true);
+ condition_variable_.notify_all();
+ buffer_list_.clear();
+}
} // namespace cru::io
diff --git a/src/base/io/CFileStream.cpp b/src/base/io/CFileStream.cpp
index d5acc707..45eb2eaf 100644
--- a/src/base/io/CFileStream.cpp
+++ b/src/base/io/CFileStream.cpp
@@ -90,7 +90,9 @@ void CFileStream::DoFlush() { std::fflush(file_); }
void CFileStream::DoClose() {
CRU_STREAM_BEGIN_CLOSE
- std::fclose(file_);
+ if (auto_close_ && !std::fclose(file_)) {
+ throw Exception(u"Failed to close FILE.");
+ }
file_ = nullptr;
}
} // namespace cru::io
diff --git a/src/base/io/Stream.cpp b/src/base/io/Stream.cpp
index d65bac46..1944ea7e 100644
--- a/src/base/io/Stream.cpp
+++ b/src/base/io/Stream.cpp
@@ -23,11 +23,11 @@ void StreamOperationNotSupportedException::CheckWrite(bool writable) {
if (!writable) throw StreamOperationNotSupportedException(u"write");
}
-StreamAlreadyClosedException::StreamAlreadyClosedException()
+StreamClosedException::StreamClosedException()
: Exception(u"Stream is already closed.") {}
-void StreamAlreadyClosedException::Check(bool closed) {
- if (closed) throw StreamAlreadyClosedException();
+void StreamClosedException::Check(bool closed) {
+ if (closed) throw StreamClosedException();
}
Stream::Stream(SupportedOperations supported_operations)