aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcrupest <crupest@outlook.com>2024-06-24 00:06:25 +0800
committercrupest <crupest@outlook.com>2024-10-06 18:03:42 +0800
commitb2a331aa527bdd5f4d91727904cc5c66fe897d62 (patch)
treee84d0bfdd4ed63255024e02954963d8c564ad024
parentdfe62dcf8bcefc523b466e127c3edc4dc2756629 (diff)
downloadcru-b2a331aa527bdd5f4d91727904cc5c66fe897d62.tar.gz
cru-b2a331aa527bdd5f4d91727904cc5c66fe897d62.tar.bz2
cru-b2a331aa527bdd5f4d91727904cc5c66fe897d62.zip
Done Stream refactor.
NEED TEST: BufferStream, AutoReadStream, SubProcess.
-rw-r--r--include/cru/base/io/AutoReadStream.h6
-rw-r--r--include/cru/base/io/Stream.h8
-rw-r--r--src/base/io/AutoReadStream.cpp30
-rw-r--r--src/base/io/BufferStream.cpp22
-rw-r--r--src/base/io/CFileStream.cpp4
-rw-r--r--src/base/io/Stream.cpp6
-rw-r--r--src/base/platform/unix/PosixSpawnSubProcess.cpp4
-rw-r--r--src/base/platform/unix/UnixFileStream.cpp10
8 files changed, 54 insertions, 36 deletions
diff --git a/include/cru/base/io/AutoReadStream.h b/include/cru/base/io/AutoReadStream.h
index 759d5026..56e2beca 100644
--- a/include/cru/base/io/AutoReadStream.h
+++ b/include/cru/base/io/AutoReadStream.h
@@ -39,7 +39,7 @@ class CRU_BASE_API AutoReadStream : public Stream {
* @param options Options to modify the behavior.
*/
AutoReadStream(
- Stream* stream, bool auto_delete,
+ Stream* stream, bool auto_close, bool auto_delete,
const AutoReadStreamOptions& options = AutoReadStreamOptions());
~AutoReadStream() override;
@@ -47,7 +47,7 @@ class CRU_BASE_API AutoReadStream : public Stream {
public:
CRU_STREAM_IMPLEMENT_CLOSE_BY_DO_CLOSE
- void BeginToDrop(bool auto_delete = true);
+ void BeginToDrop(bool auto_close = true, bool auto_delete = true);
protected:
Index DoRead(std::byte* buffer, Index offset, Index size) override;
@@ -61,11 +61,11 @@ class CRU_BASE_API AutoReadStream : public Stream {
private:
Stream* stream_;
+ bool auto_close_;
bool auto_delete_;
Index size_per_read_;
std::unique_ptr<BufferStream> buffer_stream_;
- std::mutex buffer_stream_mutex_;
std::thread background_thread_;
};
diff --git a/include/cru/base/io/Stream.h b/include/cru/base/io/Stream.h
index e0b61627..0965cf22 100644
--- a/include/cru/base/io/Stream.h
+++ b/include/cru/base/io/Stream.h
@@ -26,11 +26,11 @@ class CRU_BASE_API StreamOperationNotSupportedException : public Exception {
String operation_;
};
-class CRU_BASE_API StreamAlreadyClosedException : public Exception {
+class CRU_BASE_API StreamClosedException : public Exception {
public:
- StreamAlreadyClosedException();
+ StreamClosedException();
- CRU_DEFAULT_DESTRUCTOR(StreamAlreadyClosedException)
+ CRU_DEFAULT_DESTRUCTOR(StreamClosedException)
static void Check(bool closed);
};
@@ -117,7 +117,7 @@ class CRU_BASE_API Stream : public Object {
bool GetClosed() { return closed_; }
void SetClosed(bool closed) { closed_ = closed; }
- void CheckClosed() { StreamAlreadyClosedException::Check(closed_); }
+ void CheckClosed() { StreamClosedException::Check(closed_); }
private:
std::optional<SupportedOperations> supported_operations_;
diff --git a/src/base/io/AutoReadStream.cpp b/src/base/io/AutoReadStream.cpp
index c24f61d1..0c035648 100644
--- a/src/base/io/AutoReadStream.cpp
+++ b/src/base/io/AutoReadStream.cpp
@@ -5,9 +5,12 @@
namespace cru::io {
-AutoReadStream::AutoReadStream(Stream* stream, bool auto_delete,
+AutoReadStream::AutoReadStream(Stream* stream, bool auto_close,
+ bool auto_delete,
const AutoReadStreamOptions& options)
- : Stream(false, true, stream->CanSeek()) {
+ : Stream(false, true, stream->CanSeek()),
+ auto_close_(auto_close),
+ auto_delete_(auto_delete) {
auto buffer_stream_options = options.GetBufferStreamOptions();
stream_ = stream;
size_per_read_ = buffer_stream_options.GetBlockSizeOrDefault();
@@ -16,13 +19,11 @@ AutoReadStream::AutoReadStream(Stream* stream, bool auto_delete,
}
AutoReadStream::~AutoReadStream() {
- if (auto_delete_) {
- delete stream_;
- }
+ DoClose();
+ background_thread_.join();
}
Index AutoReadStream::DoRead(std::byte* buffer, Index offset, Index size) {
- std::unique_lock lock(buffer_stream_mutex_);
return buffer_stream_->Read(buffer, offset, size);
}
@@ -33,7 +34,17 @@ Index AutoReadStream::DoWrite(const std::byte* buffer, Index offset,
void AutoReadStream::DoFlush() { stream_->Flush(); }
-void AutoReadStream::DoClose() {}
+void AutoReadStream::DoClose() {
+ CRU_STREAM_BEGIN_CLOSE
+ if (auto_close_) {
+ stream_->Close();
+ }
+ if (auto_delete_) {
+ delete stream_;
+ stream_ = nullptr;
+ }
+ buffer_stream_->Close();
+}
void AutoReadStream::BackgroundThreadRun() {
std::vector<std::byte> buffer(size_per_read_);
@@ -46,11 +57,10 @@ void AutoReadStream::BackgroundThreadRun() {
} else {
buffer_stream_->Write(buffer.data(), read);
}
- } catch (const StreamAlreadyClosedException& exception) {
- buffer_stream_->SetEof();
+ } catch (const StreamClosedException& exception) {
+ buffer_stream_->Close();
break;
}
}
}
-
} // namespace cru::io
diff --git a/src/base/io/BufferStream.cpp b/src/base/io/BufferStream.cpp
index e81731e8..57a8b694 100644
--- a/src/base/io/BufferStream.cpp
+++ b/src/base/io/BufferStream.cpp
@@ -15,8 +15,12 @@ BufferStream::~BufferStream() { DoClose(); }
Index BufferStream::DoRead(std::byte* buffer, Index offset, Index size) {
std::unique_lock lock(mutex_);
- condition_variable_.wait(lock,
- [this] { return !buffer_list_.empty() || eof_; });
+ condition_variable_.wait(
+ lock, [this] { return GetClosed() || !buffer_list_.empty() || eof_; });
+
+ if (GetClosed()) {
+ StreamClosedException::Check(true);
+ }
if (buffer_list_.empty() && eof_) {
return 0;
@@ -57,10 +61,15 @@ Index BufferStream::DoWrite(const std::byte* buffer, Index offset, Index size) {
}
condition_variable_.wait(lock, [this] {
- return max_block_count_ <= 0 || buffer_list_.size() < max_block_count_ ||
+ return GetClosed() || max_block_count_ <= 0 ||
+ buffer_list_.size() < max_block_count_ ||
buffer_list_.back().GetBackFree() > 0;
});
+ if (GetClosed()) {
+ StreamClosedException::Check(true);
+ }
+
auto empty = buffer_list_.empty();
Index written = 0;
@@ -105,5 +114,10 @@ void BufferStream::SetEof() {
}
}
-void BufferStream::DoClose() { CRU_STREAM_BEGIN_CLOSE }
+void BufferStream::DoClose() {
+ CRU_STREAM_BEGIN_CLOSE
+ SetClosed(true);
+ condition_variable_.notify_all();
+ buffer_list_.clear();
+}
} // namespace cru::io
diff --git a/src/base/io/CFileStream.cpp b/src/base/io/CFileStream.cpp
index d5acc707..45eb2eaf 100644
--- a/src/base/io/CFileStream.cpp
+++ b/src/base/io/CFileStream.cpp
@@ -90,7 +90,9 @@ void CFileStream::DoFlush() { std::fflush(file_); }
void CFileStream::DoClose() {
CRU_STREAM_BEGIN_CLOSE
- std::fclose(file_);
+ if (auto_close_ && !std::fclose(file_)) {
+ throw Exception(u"Failed to close FILE.");
+ }
file_ = nullptr;
}
} // namespace cru::io
diff --git a/src/base/io/Stream.cpp b/src/base/io/Stream.cpp
index d65bac46..1944ea7e 100644
--- a/src/base/io/Stream.cpp
+++ b/src/base/io/Stream.cpp
@@ -23,11 +23,11 @@ void StreamOperationNotSupportedException::CheckWrite(bool writable) {
if (!writable) throw StreamOperationNotSupportedException(u"write");
}
-StreamAlreadyClosedException::StreamAlreadyClosedException()
+StreamClosedException::StreamClosedException()
: Exception(u"Stream is already closed.") {}
-void StreamAlreadyClosedException::Check(bool closed) {
- if (closed) throw StreamAlreadyClosedException();
+void StreamClosedException::Check(bool closed) {
+ if (closed) throw StreamClosedException();
}
Stream::Stream(SupportedOperations supported_operations)
diff --git a/src/base/platform/unix/PosixSpawnSubProcess.cpp b/src/base/platform/unix/PosixSpawnSubProcess.cpp
index 75d48cc2..5e09c494 100644
--- a/src/base/platform/unix/PosixSpawnSubProcess.cpp
+++ b/src/base/platform/unix/PosixSpawnSubProcess.cpp
@@ -28,9 +28,9 @@ PosixSpawnSubProcessImpl::PosixSpawnSubProcessImpl()
stderr_pipe_.GetSelfFileDescriptor(), false, true, false, true);
stdout_buffer_stream_ =
- std::make_unique<io::AutoReadStream>(stdout_stream_.get(), false);
+ std::make_unique<io::AutoReadStream>(stdout_stream_.get(), true, false);
stderr_buffer_stream_ =
- std::make_unique<io::AutoReadStream>(stderr_stream_.get(), false);
+ std::make_unique<io::AutoReadStream>(stderr_stream_.get(), true, false);
}
PosixSpawnSubProcessImpl::~PosixSpawnSubProcessImpl() {}
diff --git a/src/base/platform/unix/UnixFileStream.cpp b/src/base/platform/unix/UnixFileStream.cpp
index c53bbbaa..6d8bab25 100644
--- a/src/base/platform/unix/UnixFileStream.cpp
+++ b/src/base/platform/unix/UnixFileStream.cpp
@@ -61,15 +61,7 @@ UnixFileStream::UnixFileStream(int fd, bool can_seek, bool can_read,
auto_close_ = auto_close;
}
-UnixFileStream::~UnixFileStream() {
- if (auto_close_ && file_descriptor_ >= 0) {
- if (::close(file_descriptor_) == -1) {
- // We are in destructor, so we can not throw.
- CRU_LOG_WARN(u"Failed to close file descriptor {}, errno {}.",
- file_descriptor_, errno);
- }
- }
-}
+UnixFileStream::~UnixFileStream() { DoClose(); }
Index UnixFileStream::DoSeek(Index offset, SeekOrigin origin) {
off_t result = ::lseek(file_descriptor_, offset, MapSeekOrigin(origin));