aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcrupest <crupest@outlook.com>2024-06-24 00:06:25 +0800
committercrupest <crupest@outlook.com>2024-07-20 22:58:10 +0800
commite532469ca8844bf4daff8d462f80abdd776c018f (patch)
tree7182c3a3bb5978f5be7b5f8798eef0bef9c797eb
parent937e64e8a115a0d6d7e6e2c466b03945b71114bc (diff)
downloadcru-e532469ca8844bf4daff8d462f80abdd776c018f.tar.gz
cru-e532469ca8844bf4daff8d462f80abdd776c018f.tar.bz2
cru-e532469ca8844bf4daff8d462f80abdd776c018f.zip
feat: change subprocess implementation.
NEED TEST: BufferStream, AutoReadStream, SubProcess.
-rw-r--r--include/cru/common/SubProcess.h285
-rw-r--r--include/cru/common/platform/unix/PosixSpawnSubProcess.h21
-rw-r--r--src/common/SubProcess.cpp136
-rw-r--r--src/common/platform/unix/PosixSpawnSubProcess.cpp30
4 files changed, 259 insertions, 213 deletions
diff --git a/include/cru/common/SubProcess.h b/include/cru/common/SubProcess.h
index 86dd3ebe..9cfe3a8e 100644
--- a/include/cru/common/SubProcess.h
+++ b/include/cru/common/SubProcess.h
@@ -79,28 +79,66 @@ struct SubProcessExitResult {
}
};
-/**
- * @brief Base class of a platform process. It is one-time, which means it
- * starts and exits and can't start again.
- * @remarks
- * If an object of this class is destructed before the process exits, the
- * process will be killed.
- */
-class PlatformSubProcessBase : public Object {
- CRU_DEFINE_CLASS_LOG_TAG(u"PlatformSubProcessBase")
-
- public:
- explicit PlatformSubProcessBase(SubProcessStartInfo start_info);
-
- ~PlatformSubProcessBase() override;
-
+template <typename T>
+concept PlatformSubProcessImpl =
+ requires(T process, const SubProcessStartInfo& start_info) {
+ /**
+ * @brief Default constructible.
+ */
+ new T();
+
+ /**
+ * @brief Create and start a real process.
+ *
+ * If the program can't be created or start, an exception should be
+ * thrown.
+ *
+ * Implementation should fill internal data of the new process and start
+ * it.
+ *
+ * This method will be called only once in first call of `Start` on this
+ * thread with lock holdDefaultConstructible.
+ */
+ process.PlatformCreateProcess(start_info);
+
+ /**
+ * @brief Wait for the created process forever and return the exit result
+ * when process stops.
+ *
+ * Implementation should wait for the real process forever, after that,
+ * fill internal data and returns exit result.
+ *
+ * This method will be called only once on another thread after
+ * `PlatformCreateProcess` returns successfully
+ */
+ {
+ process.PlatformWaitForProcess()
+ } -> std::same_as<SubProcessExitResult>;
+
+ /**
+ * @brief Kill the process immediately.
+ *
+ * This method will be called only once on this thread given
+ * `PlatformCreateProcess` returns successfully. There will be a window
+ * that the window already exits but the status has not been updated and
+ * this is called. So handle this gracefully and write to internal data
+ * carefully.
+ */
+ process.PlatformKillProcess();
+
+ { process.GetStdinStream() } -> std::convertible_to<io::Stream*>;
+ { process.GetStdoutStream() } -> std::convertible_to<io::Stream*>;
+ { process.GetStderrStream() } -> std::convertible_to<io::Stream*>;
+ };
+
+struct IPlatformSubProcess : virtual Interface {
/**
* @brief Create and start a real process. If the process can't be created or
* start, `SubProcessFailedToStartException` will be thrown. If this function
* is already called once, `SubProcessException` will be thrown. Ensure only
* call this method once.
*/
- void Start();
+ virtual void Start() = 0;
/**
* @brief Wait for the process to exit optionally for at most `wait_time`. If
@@ -112,7 +150,7 @@ class PlatformSubProcessBase : public Object {
* or the process exits. But no, even if it is timeout, the process may also
* have exited due to task schedule.
*/
- void Wait(std::optional<std::chrono::milliseconds> wait_time);
+ virtual void Wait(std::optional<std::chrono::milliseconds> wait_time) = 0;
/**
* @brief kill the process if it is running. If the process already exits,
@@ -120,7 +158,7 @@ class PlatformSubProcessBase : public Object {
* `SubProcessException` will be throw. Ensure `Start` is called and does not
* throw before calling this.
*/
- void Kill();
+ virtual void Kill() = 0;
/**
* @brief Get the status of the process.
@@ -131,66 +169,203 @@ class PlatformSubProcessBase : public Object {
* actually running. Because there might be a window that the process exits
* already but status is not updated.
*/
- SubProcessStatus GetStatus();
+ virtual SubProcessStatus GetStatus() = 0;
/**
* @brief Get the exit result. If the process is not started, failed to start
* or running, `SubProcessException` will be thrown.
*/
- SubProcessExitResult GetExitResult();
+ virtual SubProcessExitResult GetExitResult() = 0;
virtual io::Stream* GetStdinStream() = 0;
virtual io::Stream* GetStdoutStream() = 0;
virtual io::Stream* GetStderrStream() = 0;
+};
+
+/**
+ * @brief A wrapper platform process. It is one-time, which means it
+ * starts and exits and can't start again.
+ *
+ * TODO: Current implementation has a problem. If the process does not exit for
+ * a long time, the resource related to it will not be released. It may cause a
+ * leak.
+ */
+template <PlatformSubProcessImpl Impl>
+class PlatformSubProcess : public Object, public virtual IPlatformSubProcess {
+ CRU_DEFINE_CLASS_LOG_TAG(u"PlatformSubProcessBase")
+
+ private:
+ struct State {
+ explicit State(SubProcessStartInfo start_info)
+ : start_info(std::move(start_info)) {}
+
+ std::mutex mutex;
+ std::unique_lock<std::mutex> lock{mutex, std::defer_lock};
+ std::condition_variable condition_variable;
+ SubProcessStartInfo start_info;
+ SubProcessExitResult exit_result;
+ SubProcessStatus status = SubProcessStatus::Prepare;
+ bool killed = false;
+ Impl impl;
+ };
+
+ public:
+ explicit PlatformSubProcess(SubProcessStartInfo start_info)
+ : state_(new State(std::move(start_info))) {}
- void SetDeleteSelfOnExit(bool enable);
+ ~PlatformSubProcess() override {}
- protected:
/**
- * @brief Create and start a real process. If the program can't be created or
- * start, an exception should be thrown.
- *
- * Implementation should fill internal data of the new process and start it.
- *
- * This method will be called only once in first call of `Start` on this
- * thread with lock hold.
+ * @brief Create and start a real process. If the process can't be created or
+ * start, `SubProcessFailedToStartException` will be thrown. If this function
+ * is already called once, `SubProcessException` will be thrown. Ensure only
+ * call this method once.
*/
- virtual void PlatformCreateProcess() = 0;
+ void Start() override {
+ std::lock_guard lock_guard(this->state_->lock);
+
+ if (this->state_->status != SubProcessStatus::Prepare) {
+ throw SubProcessException(u"The process has already tried to start.");
+ }
+
+ try {
+ this->state_->impl.PlatformCreateProcess(this->state_->start_info);
+ this->state_->status = SubProcessStatus::Running;
+
+ auto thread = std::thread([state = state_] {
+ std::lock_guard lock_guard(state->lock);
+ state->exit_result = state->impl.PlatformWaitForProcess();
+ state->status = SubProcessStatus::Exited;
+ state->condition_variable.notify_all();
+ });
+
+ thread.detach();
+ } catch (const std::exception& e) {
+ this->state_->status = SubProcessStatus::FailedToStart;
+ throw SubProcessFailedToStartException(u"Sub-process failed to start. " +
+ String::FromUtf8(e.what()));
+ }
+ }
/**
- * @brief Wait for the created process forever.
- *
- * Implementation should wait for the real process forever, after that, fill
- * internal data and returns exit result.
+ * @brief Wait for the process to exit optionally for at most `wait_time`. If
+ * the process already exits, it will return immediately. If the process has
+ * not started or failed to start, `SubProcessException` will be thrown.
+ * Ensure `Start` is called and does not throw before calling this.
*
- * This method will be called only once on another thread after
- * `PlatformCreateProcess` returns successfully
+ * @remarks You may wish this returns bool to indicate whether it is timeout
+ * or the process exits. But no, even if it is timeout, the process may also
+ * have exited due to task schedule.
*/
- virtual SubProcessExitResult PlatformWaitForProcess() = 0;
+ void Wait(std::optional<std::chrono::milliseconds> wait_time) override {
+ std::lock_guard lock_guard(this->state_->lock);
+
+ if (this->state_->status == SubProcessStatus::Prepare) {
+ throw SubProcessException(
+ u"The process does not start. Can't wait for it.");
+ }
+
+ if (this->state_->status == SubProcessStatus::FailedToStart) {
+ throw SubProcessException(
+ u"The process failed to start. Can't wait for it.");
+ }
+
+ if (this->state_->status == SubProcessStatus::Exited) {
+ return;
+ }
+
+ auto predicate = [this] {
+ return this->state_->status == SubProcessStatus::Exited;
+ };
+
+ if (wait_time) {
+ this->state_->condition_variable.wait_for(this->state_->lock, *wait_time,
+ predicate);
+ } else {
+ this->state_->condition_variable.wait(this->state_->lock, predicate);
+ }
+ }
/**
- * @brief Kill the process immediately.
- *
- * This method will be called only once on this thread given
- * `PlatformCreateProcess` returns successfullyThere will be a window that the
- * window already exits but the status has not been updated and this is
- * called. So handle this gracefully and write to internal data carefully.
+ * @brief kill the process if it is running. If the process already exits,
+ * nothing will happen. If the process has not started or failed to start,
+ * `SubProcessException` will be throw. Ensure `Start` is called and does not
+ * throw before calling this.
*/
- virtual void PlatformKillProcess() = 0;
+ void Kill() override {
+ std::lock_guard lock_guard(this->state_->lock);
- protected:
- SubProcessStartInfo start_info_;
- SubProcessExitResult exit_result_;
+ if (this->state_->status == SubProcessStatus::Prepare) {
+ throw SubProcessException(u"The process does not start. Can't kill it.");
+ }
- private:
- SubProcessStatus status_;
+ if (this->state_->status == SubProcessStatus::FailedToStart) {
+ throw SubProcessException(u"The process failed to start. Can't kill it.");
+ }
+
+ if (this->state_->status == SubProcessStatus::Exited) {
+ return;
+ }
+
+ if (this->state_->killed) {
+ return;
+ }
+
+ this->state_->impl.PlatformKillProcess();
+ this->state_->killed = true;
+ }
+
+ /**
+ * @brief Get the status of the process.
+ * 1. If the process has tried to start, aka `Start` is called, then this
+ * method will return one of `Running`, `FailedToStart`, `Exited`.
+ * 2. If it returns `Prepare`, `Start` is not called.
+ * 3. It does NOT guarantee that this return `Running` and the process is
+ * actually running. Because there might be a window that the process exits
+ * already but status is not updated.
+ */
+ SubProcessStatus GetStatus() override {
+ std::lock_guard lock_guard(this->state_->lock);
+ return this->state_->status;
+ }
+
+ /**
+ * @brief Get the exit result. If the process is not started, failed to start
+ * or running, `SubProcessException` will be thrown.
+ */
+ SubProcessExitResult GetExitResult() override {
+ std::lock_guard lock_guard(this->state_->lock);
- bool delete_self_;
+ if (this->state_->status == SubProcessStatus::Prepare) {
+ throw SubProcessException(
+ u"The process does not start. Can't get exit result.");
+ }
- std::thread process_thread_;
- std::recursive_mutex process_mutex_;
- std::unique_lock<std::recursive_mutex> process_lock_;
- std::condition_variable_any process_condition_variable_;
+ if (this->state_->status == SubProcessStatus::FailedToStart) {
+ throw SubProcessException(
+ u"The process failed to start. Can't get exit result.");
+ }
+
+ if (this->state_->status == SubProcessStatus::Running) {
+ throw SubProcessException(
+ u"The process is running. Can't get exit result.");
+ }
+
+ return this->state_->exit_result;
+ }
+
+ io::Stream* GetStdinStream() override {
+ return this->state_->impl.GetStdinStream();
+ }
+ io::Stream* GetStdoutStream() override {
+ return this->state_->impl.GetStdoutStream();
+ }
+ io::Stream* GetStderrStream() override {
+ return this->state_->impl.GetStderrStream();
+ }
+
+ private:
+ std::shared_ptr<State> state_;
};
class CRU_BASE_API SubProcess : public Object {
@@ -231,6 +406,6 @@ class CRU_BASE_API SubProcess : public Object {
void CheckValid() const;
private:
- std::unique_ptr<PlatformSubProcessBase> platform_process_;
+ std::unique_ptr<IPlatformSubProcess> platform_process_;
};
} // namespace cru
diff --git a/include/cru/common/platform/unix/PosixSpawnSubProcess.h b/include/cru/common/platform/unix/PosixSpawnSubProcess.h
index 9c303700..d4df284b 100644
--- a/include/cru/common/platform/unix/PosixSpawnSubProcess.h
+++ b/include/cru/common/platform/unix/PosixSpawnSubProcess.h
@@ -16,21 +16,20 @@
#include <spawn.h>
namespace cru::platform::unix {
-class PosixSpawnSubProcess : public PlatformSubProcessBase {
+class PosixSpawnSubProcessImpl {
CRU_DEFINE_CLASS_LOG_TAG(u"PosixSpawnSubProcess")
public:
- explicit PosixSpawnSubProcess(SubProcessStartInfo start_info);
- ~PosixSpawnSubProcess();
+ explicit PosixSpawnSubProcessImpl();
+ ~PosixSpawnSubProcessImpl();
- io::Stream* GetStdinStream() override;
- io::Stream* GetStdoutStream() override;
- io::Stream* GetStderrStream() override;
+ io::Stream* GetStdinStream();
+ io::Stream* GetStdoutStream();
+ io::Stream* GetStderrStream();
- protected:
- void PlatformCreateProcess() override;
- SubProcessExitResult PlatformWaitForProcess() override;
- void PlatformKillProcess() override;
+ void PlatformCreateProcess(const SubProcessStartInfo& start_info);
+ SubProcessExitResult PlatformWaitForProcess();
+ void PlatformKillProcess();
private:
pid_t pid_;
@@ -47,4 +46,6 @@ class PosixSpawnSubProcess : public PlatformSubProcessBase {
std::unique_ptr<io::AutoReadStream> stdout_buffer_stream_;
std::unique_ptr<io::AutoReadStream> stderr_buffer_stream_;
};
+
+using PosixSpawnSubProcess = PlatformSubProcess<PosixSpawnSubProcessImpl>;
} // namespace cru::platform::unix
diff --git a/src/common/SubProcess.cpp b/src/common/SubProcess.cpp
index e2738248..69f52d9c 100644
--- a/src/common/SubProcess.cpp
+++ b/src/common/SubProcess.cpp
@@ -1,140 +1,13 @@
#include "cru/common/SubProcess.h"
-#include <exception>
-#include "cru/common/Exception.h"
-#include "cru/common/log/Logger.h"
#ifdef CRU_PLATFORM_UNIX
#include "cru/common/platform/unix/PosixSpawnSubProcess.h"
#endif
-#include <mutex>
-
namespace cru {
-PlatformSubProcessBase::PlatformSubProcessBase(SubProcessStartInfo start_info)
- : start_info_(std::move(start_info)),
- delete_self_(false),
- process_lock_(process_mutex_, std::defer_lock) {}
-
-PlatformSubProcessBase::~PlatformSubProcessBase() {
- std::lock_guard lock_guard(process_lock_);
- if (status_ == SubProcessStatus::Running) {
- CRU_LOG_ERROR(
- u"PlatformSubProcessBase is destroyed but process is still running.");
- std::terminate();
- }
-}
-
-void PlatformSubProcessBase::Start() {
- std::lock_guard lock_guard(process_lock_);
-
- if (status_ != SubProcessStatus::Prepare) {
- throw SubProcessException(u"The process has already tried to start.");
- }
-
- try {
- PlatformCreateProcess();
-
- status_ = SubProcessStatus::Running;
-
- process_thread_ = std::thread([this] {
- auto exit_result = PlatformWaitForProcess();
- {
- std::lock_guard lock_guard(process_lock_);
- exit_result_ = std::move(exit_result);
- status_ = SubProcessStatus::Exited;
- }
- this->process_condition_variable_.notify_all();
- if (this->delete_self_) {
- delete this;
- }
- });
-
- process_thread_.detach();
- } catch (const std::exception& e) {
- status_ = SubProcessStatus::FailedToStart;
- throw SubProcessFailedToStartException(u"Sub-process failed to start. " +
- String::FromUtf8(e.what()));
- }
-}
-
-void PlatformSubProcessBase::Wait(
- std::optional<std::chrono::milliseconds> wait_time) {
- std::lock_guard lock_guard(process_lock_);
-
- if (status_ == SubProcessStatus::Prepare) {
- throw SubProcessException(
- u"The process does not start. Can't wait for it.");
- }
-
- if (status_ == SubProcessStatus::FailedToStart) {
- throw SubProcessException(
- u"The process failed to start. Can't wait for it.");
- }
-
- if (status_ == SubProcessStatus::Exited) {
- return;
- }
-
- auto predicate = [this] { return status_ == SubProcessStatus::Exited; };
-
- if (wait_time) {
- process_condition_variable_.wait_for(process_lock_, *wait_time, predicate);
- } else {
- process_condition_variable_.wait(process_lock_, predicate);
- }
-}
-
-void PlatformSubProcessBase::Kill() {
- std::lock_guard data_lock_guard(process_lock_);
-
- if (status_ == SubProcessStatus::Prepare) {
- throw SubProcessException(u"The process does not start. Can't kill it.");
- }
-
- if (status_ == SubProcessStatus::FailedToStart) {
- throw SubProcessException(u"The process failed to start. Can't kill it.");
- }
-
- if (status_ == SubProcessStatus::Exited) {
- return;
- }
-
- PlatformKillProcess();
-}
-
-SubProcessStatus PlatformSubProcessBase::GetStatus() {
- std::lock_guard data_lock_guard(process_lock_);
- return status_;
-}
-
-SubProcessExitResult PlatformSubProcessBase::GetExitResult() {
- std::lock_guard lock_guard(process_lock_);
-
- if (status_ == SubProcessStatus::Prepare) {
- throw SubProcessException(
- u"The process does not start. Can't get exit result.");
- }
-
- if (status_ == SubProcessStatus::FailedToStart) {
- throw SubProcessException(
- u"The process failed to start. Can't get exit result.");
- }
-
- if (status_ == SubProcessStatus::Running) {
- throw SubProcessException(
- u"The process is running. Can't get exit result.");
- }
-
- return exit_result_;
-}
-
-void PlatformSubProcessBase::SetDeleteSelfOnExit(bool enable) {
- std::lock_guard lock_guard(process_lock_);
- delete_self_ = enable;
-}
#ifdef CRU_PLATFORM_UNIX
-using PlatformSubProcess = platform::unix::PosixSpawnSubProcess;
+using ThisPlatformSubProcess = platform::unix::PosixSpawnSubProcess;
#endif
SubProcess SubProcess::Create(String program, std::vector<String> arguments,
@@ -147,7 +20,7 @@ SubProcess SubProcess::Create(String program, std::vector<String> arguments,
}
SubProcess::SubProcess(SubProcessStartInfo start_info) {
- platform_process_.reset(new PlatformSubProcess(std::move(start_info)));
+ platform_process_.reset(new ThisPlatformSubProcess(std::move(start_info)));
platform_process_->Start();
}
@@ -188,10 +61,7 @@ io::Stream* SubProcess::GetStderrStream() {
return platform_process_->GetStderrStream();
}
-void SubProcess::Detach() {
- auto p = platform_process_.release();
- p->SetDeleteSelfOnExit(true);
-}
+void SubProcess::Detach() { auto p = platform_process_.release(); }
void SubProcess::CheckValid() const {
if (!IsValid()) {
diff --git a/src/common/platform/unix/PosixSpawnSubProcess.cpp b/src/common/platform/unix/PosixSpawnSubProcess.cpp
index 4065d6f6..1e2a84d6 100644
--- a/src/common/platform/unix/PosixSpawnSubProcess.cpp
+++ b/src/common/platform/unix/PosixSpawnSubProcess.cpp
@@ -14,9 +14,8 @@
#include <unordered_map>
namespace cru::platform::unix {
-PosixSpawnSubProcess::PosixSpawnSubProcess(SubProcessStartInfo start_info)
- : PlatformSubProcessBase(std::move(start_info)),
- pid_(0),
+PosixSpawnSubProcessImpl::PosixSpawnSubProcessImpl()
+ : pid_(0),
exit_code_(0),
stdin_pipe_(UnixPipe::Usage::Send),
stdout_pipe_(UnixPipe::Usage::Receive),
@@ -34,17 +33,17 @@ PosixSpawnSubProcess::PosixSpawnSubProcess(SubProcessStartInfo start_info)
std::make_unique<io::AutoReadStream>(stderr_stream_.get(), false);
}
-PosixSpawnSubProcess::~PosixSpawnSubProcess() {}
+PosixSpawnSubProcessImpl::~PosixSpawnSubProcessImpl() {}
-io::Stream* PosixSpawnSubProcess::GetStdinStream() {
+io::Stream* PosixSpawnSubProcessImpl::GetStdinStream() {
return stdin_stream_.get();
}
-io::Stream* PosixSpawnSubProcess::GetStdoutStream() {
+io::Stream* PosixSpawnSubProcessImpl::GetStdoutStream() {
return stdout_buffer_stream_.get();
}
-io::Stream* PosixSpawnSubProcess::GetStderrStream() {
+io::Stream* PosixSpawnSubProcessImpl::GetStderrStream() {
return stderr_buffer_stream_.get();
}
@@ -82,7 +81,8 @@ void DestroyCstrArray(char** argv) {
}
} // namespace
-void PosixSpawnSubProcess::PlatformCreateProcess() {
+void PosixSpawnSubProcessImpl::PlatformCreateProcess(
+ const SubProcessStartInfo& start_info) {
int error;
auto check_error = [&error](String message) {
if (error == 0) return;
@@ -127,15 +127,15 @@ void PosixSpawnSubProcess::PlatformCreateProcess() {
check_error(u"Failed to set flag POSIX_SPAWN_CLOEXEC_DEFAULT (osx).");
#endif
- auto exe = start_info_.program.ToUtf8();
- std::vector<String> arguments{start_info_.program};
- arguments.insert(arguments.cend(), start_info_.arguments.cbegin(),
- start_info_.arguments.cend());
+ auto exe = start_info.program.ToUtf8();
+ std::vector<String> arguments{start_info.program};
+ arguments.insert(arguments.cend(), start_info.arguments.cbegin(),
+ start_info.arguments.cend());
auto argv = CreateCstrArray(arguments);
Guard argv_guard([argv] { DestroyCstrArray(argv); });
- auto envp = CreateCstrArray(start_info_.environments);
+ auto envp = CreateCstrArray(start_info.environments);
Guard envp_guard([envp] { DestroyCstrArray(envp); });
error = posix_spawnp(&pid_, exe.c_str(), &file_actions, &attr, argv, envp);
@@ -149,7 +149,7 @@ void PosixSpawnSubProcess::PlatformCreateProcess() {
check_error(u"Failed to close stderr.");
}
-SubProcessExitResult PosixSpawnSubProcess::PlatformWaitForProcess() {
+SubProcessExitResult PosixSpawnSubProcessImpl::PlatformWaitForProcess() {
int wstatus;
while (waitpid(pid_, &wstatus, 0) == -1) {
@@ -173,7 +173,7 @@ SubProcessExitResult PosixSpawnSubProcess::PlatformWaitForProcess() {
}
}
-void PosixSpawnSubProcess::PlatformKillProcess() {
+void PosixSpawnSubProcessImpl::PlatformKillProcess() {
int error = kill(pid_, SIGKILL);
if (error != 0) {
std::unique_ptr<ErrnoException> inner(new ErrnoException({}, errno));