diff options
author | crupest <crupest@outlook.com> | 2024-06-24 00:06:25 +0800 |
---|---|---|
committer | crupest <crupest@outlook.com> | 2024-07-20 22:58:10 +0800 |
commit | e532469ca8844bf4daff8d462f80abdd776c018f (patch) | |
tree | 7182c3a3bb5978f5be7b5f8798eef0bef9c797eb | |
parent | 937e64e8a115a0d6d7e6e2c466b03945b71114bc (diff) | |
download | cru-e532469ca8844bf4daff8d462f80abdd776c018f.tar.gz cru-e532469ca8844bf4daff8d462f80abdd776c018f.tar.bz2 cru-e532469ca8844bf4daff8d462f80abdd776c018f.zip |
feat: change subprocess implementation.
NEED TEST: BufferStream, AutoReadStream, SubProcess.
-rw-r--r-- | include/cru/common/SubProcess.h | 285 | ||||
-rw-r--r-- | include/cru/common/platform/unix/PosixSpawnSubProcess.h | 21 | ||||
-rw-r--r-- | src/common/SubProcess.cpp | 136 | ||||
-rw-r--r-- | src/common/platform/unix/PosixSpawnSubProcess.cpp | 30 |
4 files changed, 259 insertions, 213 deletions
diff --git a/include/cru/common/SubProcess.h b/include/cru/common/SubProcess.h index 86dd3ebe..9cfe3a8e 100644 --- a/include/cru/common/SubProcess.h +++ b/include/cru/common/SubProcess.h @@ -79,28 +79,66 @@ struct SubProcessExitResult { } }; -/** - * @brief Base class of a platform process. It is one-time, which means it - * starts and exits and can't start again. - * @remarks - * If an object of this class is destructed before the process exits, the - * process will be killed. - */ -class PlatformSubProcessBase : public Object { - CRU_DEFINE_CLASS_LOG_TAG(u"PlatformSubProcessBase") - - public: - explicit PlatformSubProcessBase(SubProcessStartInfo start_info); - - ~PlatformSubProcessBase() override; - +template <typename T> +concept PlatformSubProcessImpl = + requires(T process, const SubProcessStartInfo& start_info) { + /** + * @brief Default constructible. + */ + new T(); + + /** + * @brief Create and start a real process. + * + * If the program can't be created or start, an exception should be + * thrown. + * + * Implementation should fill internal data of the new process and start + * it. + * + * This method will be called only once in first call of `Start` on this + * thread with lock holdDefaultConstructible. + */ + process.PlatformCreateProcess(start_info); + + /** + * @brief Wait for the created process forever and return the exit result + * when process stops. + * + * Implementation should wait for the real process forever, after that, + * fill internal data and returns exit result. + * + * This method will be called only once on another thread after + * `PlatformCreateProcess` returns successfully + */ + { + process.PlatformWaitForProcess() + } -> std::same_as<SubProcessExitResult>; + + /** + * @brief Kill the process immediately. + * + * This method will be called only once on this thread given + * `PlatformCreateProcess` returns successfully. There will be a window + * that the window already exits but the status has not been updated and + * this is called. So handle this gracefully and write to internal data + * carefully. + */ + process.PlatformKillProcess(); + + { process.GetStdinStream() } -> std::convertible_to<io::Stream*>; + { process.GetStdoutStream() } -> std::convertible_to<io::Stream*>; + { process.GetStderrStream() } -> std::convertible_to<io::Stream*>; + }; + +struct IPlatformSubProcess : virtual Interface { /** * @brief Create and start a real process. If the process can't be created or * start, `SubProcessFailedToStartException` will be thrown. If this function * is already called once, `SubProcessException` will be thrown. Ensure only * call this method once. */ - void Start(); + virtual void Start() = 0; /** * @brief Wait for the process to exit optionally for at most `wait_time`. If @@ -112,7 +150,7 @@ class PlatformSubProcessBase : public Object { * or the process exits. But no, even if it is timeout, the process may also * have exited due to task schedule. */ - void Wait(std::optional<std::chrono::milliseconds> wait_time); + virtual void Wait(std::optional<std::chrono::milliseconds> wait_time) = 0; /** * @brief kill the process if it is running. If the process already exits, @@ -120,7 +158,7 @@ class PlatformSubProcessBase : public Object { * `SubProcessException` will be throw. Ensure `Start` is called and does not * throw before calling this. */ - void Kill(); + virtual void Kill() = 0; /** * @brief Get the status of the process. @@ -131,66 +169,203 @@ class PlatformSubProcessBase : public Object { * actually running. Because there might be a window that the process exits * already but status is not updated. */ - SubProcessStatus GetStatus(); + virtual SubProcessStatus GetStatus() = 0; /** * @brief Get the exit result. If the process is not started, failed to start * or running, `SubProcessException` will be thrown. */ - SubProcessExitResult GetExitResult(); + virtual SubProcessExitResult GetExitResult() = 0; virtual io::Stream* GetStdinStream() = 0; virtual io::Stream* GetStdoutStream() = 0; virtual io::Stream* GetStderrStream() = 0; +}; + +/** + * @brief A wrapper platform process. It is one-time, which means it + * starts and exits and can't start again. + * + * TODO: Current implementation has a problem. If the process does not exit for + * a long time, the resource related to it will not be released. It may cause a + * leak. + */ +template <PlatformSubProcessImpl Impl> +class PlatformSubProcess : public Object, public virtual IPlatformSubProcess { + CRU_DEFINE_CLASS_LOG_TAG(u"PlatformSubProcessBase") + + private: + struct State { + explicit State(SubProcessStartInfo start_info) + : start_info(std::move(start_info)) {} + + std::mutex mutex; + std::unique_lock<std::mutex> lock{mutex, std::defer_lock}; + std::condition_variable condition_variable; + SubProcessStartInfo start_info; + SubProcessExitResult exit_result; + SubProcessStatus status = SubProcessStatus::Prepare; + bool killed = false; + Impl impl; + }; + + public: + explicit PlatformSubProcess(SubProcessStartInfo start_info) + : state_(new State(std::move(start_info))) {} - void SetDeleteSelfOnExit(bool enable); + ~PlatformSubProcess() override {} - protected: /** - * @brief Create and start a real process. If the program can't be created or - * start, an exception should be thrown. - * - * Implementation should fill internal data of the new process and start it. - * - * This method will be called only once in first call of `Start` on this - * thread with lock hold. + * @brief Create and start a real process. If the process can't be created or + * start, `SubProcessFailedToStartException` will be thrown. If this function + * is already called once, `SubProcessException` will be thrown. Ensure only + * call this method once. */ - virtual void PlatformCreateProcess() = 0; + void Start() override { + std::lock_guard lock_guard(this->state_->lock); + + if (this->state_->status != SubProcessStatus::Prepare) { + throw SubProcessException(u"The process has already tried to start."); + } + + try { + this->state_->impl.PlatformCreateProcess(this->state_->start_info); + this->state_->status = SubProcessStatus::Running; + + auto thread = std::thread([state = state_] { + std::lock_guard lock_guard(state->lock); + state->exit_result = state->impl.PlatformWaitForProcess(); + state->status = SubProcessStatus::Exited; + state->condition_variable.notify_all(); + }); + + thread.detach(); + } catch (const std::exception& e) { + this->state_->status = SubProcessStatus::FailedToStart; + throw SubProcessFailedToStartException(u"Sub-process failed to start. " + + String::FromUtf8(e.what())); + } + } /** - * @brief Wait for the created process forever. - * - * Implementation should wait for the real process forever, after that, fill - * internal data and returns exit result. + * @brief Wait for the process to exit optionally for at most `wait_time`. If + * the process already exits, it will return immediately. If the process has + * not started or failed to start, `SubProcessException` will be thrown. + * Ensure `Start` is called and does not throw before calling this. * - * This method will be called only once on another thread after - * `PlatformCreateProcess` returns successfully + * @remarks You may wish this returns bool to indicate whether it is timeout + * or the process exits. But no, even if it is timeout, the process may also + * have exited due to task schedule. */ - virtual SubProcessExitResult PlatformWaitForProcess() = 0; + void Wait(std::optional<std::chrono::milliseconds> wait_time) override { + std::lock_guard lock_guard(this->state_->lock); + + if (this->state_->status == SubProcessStatus::Prepare) { + throw SubProcessException( + u"The process does not start. Can't wait for it."); + } + + if (this->state_->status == SubProcessStatus::FailedToStart) { + throw SubProcessException( + u"The process failed to start. Can't wait for it."); + } + + if (this->state_->status == SubProcessStatus::Exited) { + return; + } + + auto predicate = [this] { + return this->state_->status == SubProcessStatus::Exited; + }; + + if (wait_time) { + this->state_->condition_variable.wait_for(this->state_->lock, *wait_time, + predicate); + } else { + this->state_->condition_variable.wait(this->state_->lock, predicate); + } + } /** - * @brief Kill the process immediately. - * - * This method will be called only once on this thread given - * `PlatformCreateProcess` returns successfullyThere will be a window that the - * window already exits but the status has not been updated and this is - * called. So handle this gracefully and write to internal data carefully. + * @brief kill the process if it is running. If the process already exits, + * nothing will happen. If the process has not started or failed to start, + * `SubProcessException` will be throw. Ensure `Start` is called and does not + * throw before calling this. */ - virtual void PlatformKillProcess() = 0; + void Kill() override { + std::lock_guard lock_guard(this->state_->lock); - protected: - SubProcessStartInfo start_info_; - SubProcessExitResult exit_result_; + if (this->state_->status == SubProcessStatus::Prepare) { + throw SubProcessException(u"The process does not start. Can't kill it."); + } - private: - SubProcessStatus status_; + if (this->state_->status == SubProcessStatus::FailedToStart) { + throw SubProcessException(u"The process failed to start. Can't kill it."); + } + + if (this->state_->status == SubProcessStatus::Exited) { + return; + } + + if (this->state_->killed) { + return; + } + + this->state_->impl.PlatformKillProcess(); + this->state_->killed = true; + } + + /** + * @brief Get the status of the process. + * 1. If the process has tried to start, aka `Start` is called, then this + * method will return one of `Running`, `FailedToStart`, `Exited`. + * 2. If it returns `Prepare`, `Start` is not called. + * 3. It does NOT guarantee that this return `Running` and the process is + * actually running. Because there might be a window that the process exits + * already but status is not updated. + */ + SubProcessStatus GetStatus() override { + std::lock_guard lock_guard(this->state_->lock); + return this->state_->status; + } + + /** + * @brief Get the exit result. If the process is not started, failed to start + * or running, `SubProcessException` will be thrown. + */ + SubProcessExitResult GetExitResult() override { + std::lock_guard lock_guard(this->state_->lock); - bool delete_self_; + if (this->state_->status == SubProcessStatus::Prepare) { + throw SubProcessException( + u"The process does not start. Can't get exit result."); + } - std::thread process_thread_; - std::recursive_mutex process_mutex_; - std::unique_lock<std::recursive_mutex> process_lock_; - std::condition_variable_any process_condition_variable_; + if (this->state_->status == SubProcessStatus::FailedToStart) { + throw SubProcessException( + u"The process failed to start. Can't get exit result."); + } + + if (this->state_->status == SubProcessStatus::Running) { + throw SubProcessException( + u"The process is running. Can't get exit result."); + } + + return this->state_->exit_result; + } + + io::Stream* GetStdinStream() override { + return this->state_->impl.GetStdinStream(); + } + io::Stream* GetStdoutStream() override { + return this->state_->impl.GetStdoutStream(); + } + io::Stream* GetStderrStream() override { + return this->state_->impl.GetStderrStream(); + } + + private: + std::shared_ptr<State> state_; }; class CRU_BASE_API SubProcess : public Object { @@ -231,6 +406,6 @@ class CRU_BASE_API SubProcess : public Object { void CheckValid() const; private: - std::unique_ptr<PlatformSubProcessBase> platform_process_; + std::unique_ptr<IPlatformSubProcess> platform_process_; }; } // namespace cru diff --git a/include/cru/common/platform/unix/PosixSpawnSubProcess.h b/include/cru/common/platform/unix/PosixSpawnSubProcess.h index 9c303700..d4df284b 100644 --- a/include/cru/common/platform/unix/PosixSpawnSubProcess.h +++ b/include/cru/common/platform/unix/PosixSpawnSubProcess.h @@ -16,21 +16,20 @@ #include <spawn.h> namespace cru::platform::unix { -class PosixSpawnSubProcess : public PlatformSubProcessBase { +class PosixSpawnSubProcessImpl { CRU_DEFINE_CLASS_LOG_TAG(u"PosixSpawnSubProcess") public: - explicit PosixSpawnSubProcess(SubProcessStartInfo start_info); - ~PosixSpawnSubProcess(); + explicit PosixSpawnSubProcessImpl(); + ~PosixSpawnSubProcessImpl(); - io::Stream* GetStdinStream() override; - io::Stream* GetStdoutStream() override; - io::Stream* GetStderrStream() override; + io::Stream* GetStdinStream(); + io::Stream* GetStdoutStream(); + io::Stream* GetStderrStream(); - protected: - void PlatformCreateProcess() override; - SubProcessExitResult PlatformWaitForProcess() override; - void PlatformKillProcess() override; + void PlatformCreateProcess(const SubProcessStartInfo& start_info); + SubProcessExitResult PlatformWaitForProcess(); + void PlatformKillProcess(); private: pid_t pid_; @@ -47,4 +46,6 @@ class PosixSpawnSubProcess : public PlatformSubProcessBase { std::unique_ptr<io::AutoReadStream> stdout_buffer_stream_; std::unique_ptr<io::AutoReadStream> stderr_buffer_stream_; }; + +using PosixSpawnSubProcess = PlatformSubProcess<PosixSpawnSubProcessImpl>; } // namespace cru::platform::unix diff --git a/src/common/SubProcess.cpp b/src/common/SubProcess.cpp index e2738248..69f52d9c 100644 --- a/src/common/SubProcess.cpp +++ b/src/common/SubProcess.cpp @@ -1,140 +1,13 @@ #include "cru/common/SubProcess.h" -#include <exception> -#include "cru/common/Exception.h" -#include "cru/common/log/Logger.h" #ifdef CRU_PLATFORM_UNIX #include "cru/common/platform/unix/PosixSpawnSubProcess.h" #endif -#include <mutex> - namespace cru { -PlatformSubProcessBase::PlatformSubProcessBase(SubProcessStartInfo start_info) - : start_info_(std::move(start_info)), - delete_self_(false), - process_lock_(process_mutex_, std::defer_lock) {} - -PlatformSubProcessBase::~PlatformSubProcessBase() { - std::lock_guard lock_guard(process_lock_); - if (status_ == SubProcessStatus::Running) { - CRU_LOG_ERROR( - u"PlatformSubProcessBase is destroyed but process is still running."); - std::terminate(); - } -} - -void PlatformSubProcessBase::Start() { - std::lock_guard lock_guard(process_lock_); - - if (status_ != SubProcessStatus::Prepare) { - throw SubProcessException(u"The process has already tried to start."); - } - - try { - PlatformCreateProcess(); - - status_ = SubProcessStatus::Running; - - process_thread_ = std::thread([this] { - auto exit_result = PlatformWaitForProcess(); - { - std::lock_guard lock_guard(process_lock_); - exit_result_ = std::move(exit_result); - status_ = SubProcessStatus::Exited; - } - this->process_condition_variable_.notify_all(); - if (this->delete_self_) { - delete this; - } - }); - - process_thread_.detach(); - } catch (const std::exception& e) { - status_ = SubProcessStatus::FailedToStart; - throw SubProcessFailedToStartException(u"Sub-process failed to start. " + - String::FromUtf8(e.what())); - } -} - -void PlatformSubProcessBase::Wait( - std::optional<std::chrono::milliseconds> wait_time) { - std::lock_guard lock_guard(process_lock_); - - if (status_ == SubProcessStatus::Prepare) { - throw SubProcessException( - u"The process does not start. Can't wait for it."); - } - - if (status_ == SubProcessStatus::FailedToStart) { - throw SubProcessException( - u"The process failed to start. Can't wait for it."); - } - - if (status_ == SubProcessStatus::Exited) { - return; - } - - auto predicate = [this] { return status_ == SubProcessStatus::Exited; }; - - if (wait_time) { - process_condition_variable_.wait_for(process_lock_, *wait_time, predicate); - } else { - process_condition_variable_.wait(process_lock_, predicate); - } -} - -void PlatformSubProcessBase::Kill() { - std::lock_guard data_lock_guard(process_lock_); - - if (status_ == SubProcessStatus::Prepare) { - throw SubProcessException(u"The process does not start. Can't kill it."); - } - - if (status_ == SubProcessStatus::FailedToStart) { - throw SubProcessException(u"The process failed to start. Can't kill it."); - } - - if (status_ == SubProcessStatus::Exited) { - return; - } - - PlatformKillProcess(); -} - -SubProcessStatus PlatformSubProcessBase::GetStatus() { - std::lock_guard data_lock_guard(process_lock_); - return status_; -} - -SubProcessExitResult PlatformSubProcessBase::GetExitResult() { - std::lock_guard lock_guard(process_lock_); - - if (status_ == SubProcessStatus::Prepare) { - throw SubProcessException( - u"The process does not start. Can't get exit result."); - } - - if (status_ == SubProcessStatus::FailedToStart) { - throw SubProcessException( - u"The process failed to start. Can't get exit result."); - } - - if (status_ == SubProcessStatus::Running) { - throw SubProcessException( - u"The process is running. Can't get exit result."); - } - - return exit_result_; -} - -void PlatformSubProcessBase::SetDeleteSelfOnExit(bool enable) { - std::lock_guard lock_guard(process_lock_); - delete_self_ = enable; -} #ifdef CRU_PLATFORM_UNIX -using PlatformSubProcess = platform::unix::PosixSpawnSubProcess; +using ThisPlatformSubProcess = platform::unix::PosixSpawnSubProcess; #endif SubProcess SubProcess::Create(String program, std::vector<String> arguments, @@ -147,7 +20,7 @@ SubProcess SubProcess::Create(String program, std::vector<String> arguments, } SubProcess::SubProcess(SubProcessStartInfo start_info) { - platform_process_.reset(new PlatformSubProcess(std::move(start_info))); + platform_process_.reset(new ThisPlatformSubProcess(std::move(start_info))); platform_process_->Start(); } @@ -188,10 +61,7 @@ io::Stream* SubProcess::GetStderrStream() { return platform_process_->GetStderrStream(); } -void SubProcess::Detach() { - auto p = platform_process_.release(); - p->SetDeleteSelfOnExit(true); -} +void SubProcess::Detach() { auto p = platform_process_.release(); } void SubProcess::CheckValid() const { if (!IsValid()) { diff --git a/src/common/platform/unix/PosixSpawnSubProcess.cpp b/src/common/platform/unix/PosixSpawnSubProcess.cpp index 4065d6f6..1e2a84d6 100644 --- a/src/common/platform/unix/PosixSpawnSubProcess.cpp +++ b/src/common/platform/unix/PosixSpawnSubProcess.cpp @@ -14,9 +14,8 @@ #include <unordered_map> namespace cru::platform::unix { -PosixSpawnSubProcess::PosixSpawnSubProcess(SubProcessStartInfo start_info) - : PlatformSubProcessBase(std::move(start_info)), - pid_(0), +PosixSpawnSubProcessImpl::PosixSpawnSubProcessImpl() + : pid_(0), exit_code_(0), stdin_pipe_(UnixPipe::Usage::Send), stdout_pipe_(UnixPipe::Usage::Receive), @@ -34,17 +33,17 @@ PosixSpawnSubProcess::PosixSpawnSubProcess(SubProcessStartInfo start_info) std::make_unique<io::AutoReadStream>(stderr_stream_.get(), false); } -PosixSpawnSubProcess::~PosixSpawnSubProcess() {} +PosixSpawnSubProcessImpl::~PosixSpawnSubProcessImpl() {} -io::Stream* PosixSpawnSubProcess::GetStdinStream() { +io::Stream* PosixSpawnSubProcessImpl::GetStdinStream() { return stdin_stream_.get(); } -io::Stream* PosixSpawnSubProcess::GetStdoutStream() { +io::Stream* PosixSpawnSubProcessImpl::GetStdoutStream() { return stdout_buffer_stream_.get(); } -io::Stream* PosixSpawnSubProcess::GetStderrStream() { +io::Stream* PosixSpawnSubProcessImpl::GetStderrStream() { return stderr_buffer_stream_.get(); } @@ -82,7 +81,8 @@ void DestroyCstrArray(char** argv) { } } // namespace -void PosixSpawnSubProcess::PlatformCreateProcess() { +void PosixSpawnSubProcessImpl::PlatformCreateProcess( + const SubProcessStartInfo& start_info) { int error; auto check_error = [&error](String message) { if (error == 0) return; @@ -127,15 +127,15 @@ void PosixSpawnSubProcess::PlatformCreateProcess() { check_error(u"Failed to set flag POSIX_SPAWN_CLOEXEC_DEFAULT (osx)."); #endif - auto exe = start_info_.program.ToUtf8(); - std::vector<String> arguments{start_info_.program}; - arguments.insert(arguments.cend(), start_info_.arguments.cbegin(), - start_info_.arguments.cend()); + auto exe = start_info.program.ToUtf8(); + std::vector<String> arguments{start_info.program}; + arguments.insert(arguments.cend(), start_info.arguments.cbegin(), + start_info.arguments.cend()); auto argv = CreateCstrArray(arguments); Guard argv_guard([argv] { DestroyCstrArray(argv); }); - auto envp = CreateCstrArray(start_info_.environments); + auto envp = CreateCstrArray(start_info.environments); Guard envp_guard([envp] { DestroyCstrArray(envp); }); error = posix_spawnp(&pid_, exe.c_str(), &file_actions, &attr, argv, envp); @@ -149,7 +149,7 @@ void PosixSpawnSubProcess::PlatformCreateProcess() { check_error(u"Failed to close stderr."); } -SubProcessExitResult PosixSpawnSubProcess::PlatformWaitForProcess() { +SubProcessExitResult PosixSpawnSubProcessImpl::PlatformWaitForProcess() { int wstatus; while (waitpid(pid_, &wstatus, 0) == -1) { @@ -173,7 +173,7 @@ SubProcessExitResult PosixSpawnSubProcess::PlatformWaitForProcess() { } } -void PosixSpawnSubProcess::PlatformKillProcess() { +void PosixSpawnSubProcessImpl::PlatformKillProcess() { int error = kill(pid_, SIGKILL); if (error != 0) { std::unique_ptr<ErrnoException> inner(new ErrnoException({}, errno)); |