diff options
author | crupest <crupest@outlook.com> | 2024-06-24 00:06:25 +0800 |
---|---|---|
committer | crupest <crupest@outlook.com> | 2024-07-21 13:45:23 +0800 |
commit | 5f0d7dc36a7a0091bfc152be9f06730cd08eb4dd (patch) | |
tree | 72ded26e72272dc2f7edf29146b907e52784ad6a | |
parent | e532469ca8844bf4daff8d462f80abdd776c018f (diff) | |
download | cru-5f0d7dc36a7a0091bfc152be9f06730cd08eb4dd.tar.gz cru-5f0d7dc36a7a0091bfc152be9f06730cd08eb4dd.tar.bz2 cru-5f0d7dc36a7a0091bfc152be9f06730cd08eb4dd.zip |
feat: reimplement subprocess without template.
NEED TEST: BufferStream, AutoReadStream, SubProcess.
-rw-r--r-- | include/cru/common/SubProcess.h | 253 | ||||
-rw-r--r-- | include/cru/common/platform/unix/PosixSpawnSubProcess.h | 17 | ||||
-rw-r--r-- | src/common/SubProcess.cpp | 131 | ||||
-rw-r--r-- | src/common/platform/unix/PosixSpawnSubProcess.cpp | 24 |
4 files changed, 195 insertions, 230 deletions
diff --git a/include/cru/common/SubProcess.h b/include/cru/common/SubProcess.h index 9cfe3a8e..078d5546 100644 --- a/include/cru/common/SubProcess.h +++ b/include/cru/common/SubProcess.h @@ -8,7 +8,6 @@ #include <condition_variable> #include <mutex> #include <optional> -#include <thread> #include <unordered_map> #include <vector> @@ -79,103 +78,43 @@ struct SubProcessExitResult { } }; -template <typename T> -concept PlatformSubProcessImpl = - requires(T process, const SubProcessStartInfo& start_info) { - /** - * @brief Default constructible. - */ - new T(); - - /** - * @brief Create and start a real process. - * - * If the program can't be created or start, an exception should be - * thrown. - * - * Implementation should fill internal data of the new process and start - * it. - * - * This method will be called only once in first call of `Start` on this - * thread with lock holdDefaultConstructible. - */ - process.PlatformCreateProcess(start_info); - - /** - * @brief Wait for the created process forever and return the exit result - * when process stops. - * - * Implementation should wait for the real process forever, after that, - * fill internal data and returns exit result. - * - * This method will be called only once on another thread after - * `PlatformCreateProcess` returns successfully - */ - { - process.PlatformWaitForProcess() - } -> std::same_as<SubProcessExitResult>; - - /** - * @brief Kill the process immediately. - * - * This method will be called only once on this thread given - * `PlatformCreateProcess` returns successfully. There will be a window - * that the window already exits but the status has not been updated and - * this is called. So handle this gracefully and write to internal data - * carefully. - */ - process.PlatformKillProcess(); - - { process.GetStdinStream() } -> std::convertible_to<io::Stream*>; - { process.GetStdoutStream() } -> std::convertible_to<io::Stream*>; - { process.GetStderrStream() } -> std::convertible_to<io::Stream*>; - }; - -struct IPlatformSubProcess : virtual Interface { +struct IPlatformSubProcessImpl : virtual Interface { /** - * @brief Create and start a real process. If the process can't be created or - * start, `SubProcessFailedToStartException` will be thrown. If this function - * is already called once, `SubProcessException` will be thrown. Ensure only - * call this method once. - */ - virtual void Start() = 0; - - /** - * @brief Wait for the process to exit optionally for at most `wait_time`. If - * the process already exits, it will return immediately. If the process has - * not started or failed to start, `SubProcessException` will be thrown. - * Ensure `Start` is called and does not throw before calling this. + * @brief Create and start a real process. * - * @remarks You may wish this returns bool to indicate whether it is timeout - * or the process exits. But no, even if it is timeout, the process may also - * have exited due to task schedule. - */ - virtual void Wait(std::optional<std::chrono::milliseconds> wait_time) = 0; - - /** - * @brief kill the process if it is running. If the process already exits, - * nothing will happen. If the process has not started or failed to start, - * `SubProcessException` will be throw. Ensure `Start` is called and does not - * throw before calling this. + * If the program can't be created or start, an exception should be + * thrown. + * + * Implementation should fill internal data of the new process and start + * it. + * + * This method will be called only once in first call of `Start` on this + * thread with lock holdDefaultConstructible. */ - virtual void Kill() = 0; + virtual void PlatformCreateProcess(const SubProcessStartInfo& start_info) = 0; /** - * @brief Get the status of the process. - * 1. If the process has tried to start, aka `Start` is called, then this - * method will return one of `Running`, `FailedToStart`, `Exited`. - * 2. If it returns `Prepare`, `Start` is not called. - * 3. It does NOT guarantee that this return `Running` and the process is - * actually running. Because there might be a window that the process exits - * already but status is not updated. + * @brief Wait for the created process forever and return the exit result + * when process stops. + * + * Implementation should wait for the real process forever, after that, + * fill internal data and returns exit result. + * + * This method will be called only once on another thread after + * `PlatformCreateProcess` returns successfully */ - virtual SubProcessStatus GetStatus() = 0; + virtual SubProcessExitResult PlatformWaitForProcess() = 0; /** - * @brief Get the exit result. If the process is not started, failed to start - * or running, `SubProcessException` will be thrown. + * @brief Kill the process immediately. + * + * This method will be called only once on this thread given + * `PlatformCreateProcess` returns successfully. There will be a window + * that the window already exits but the status has not been updated and + * this is called. So handle this gracefully and write to internal data + * carefully. */ - virtual SubProcessExitResult GetExitResult() = 0; + virtual void PlatformKillProcess() = 0; virtual io::Stream* GetStdinStream() = 0; virtual io::Stream* GetStdoutStream() = 0; @@ -190,14 +129,14 @@ struct IPlatformSubProcess : virtual Interface { * a long time, the resource related to it will not be released. It may cause a * leak. */ -template <PlatformSubProcessImpl Impl> -class PlatformSubProcess : public Object, public virtual IPlatformSubProcess { - CRU_DEFINE_CLASS_LOG_TAG(u"PlatformSubProcessBase") +class PlatformSubProcess : public Object { + CRU_DEFINE_CLASS_LOG_TAG(u"PlatformSubProcess") private: struct State { - explicit State(SubProcessStartInfo start_info) - : start_info(std::move(start_info)) {} + explicit State(SubProcessStartInfo start_info, + std::shared_ptr<IPlatformSubProcessImpl> impl) + : start_info(std::move(start_info)), impl(std::move(impl)) {} std::mutex mutex; std::unique_lock<std::mutex> lock{mutex, std::defer_lock}; @@ -206,14 +145,14 @@ class PlatformSubProcess : public Object, public virtual IPlatformSubProcess { SubProcessExitResult exit_result; SubProcessStatus status = SubProcessStatus::Prepare; bool killed = false; - Impl impl; + std::shared_ptr<IPlatformSubProcessImpl> impl; }; public: - explicit PlatformSubProcess(SubProcessStartInfo start_info) - : state_(new State(std::move(start_info))) {} + PlatformSubProcess(SubProcessStartInfo start_info, + std::shared_ptr<IPlatformSubProcessImpl> impl); - ~PlatformSubProcess() override {} + ~PlatformSubProcess() override; /** * @brief Create and start a real process. If the process can't be created or @@ -221,31 +160,7 @@ class PlatformSubProcess : public Object, public virtual IPlatformSubProcess { * is already called once, `SubProcessException` will be thrown. Ensure only * call this method once. */ - void Start() override { - std::lock_guard lock_guard(this->state_->lock); - - if (this->state_->status != SubProcessStatus::Prepare) { - throw SubProcessException(u"The process has already tried to start."); - } - - try { - this->state_->impl.PlatformCreateProcess(this->state_->start_info); - this->state_->status = SubProcessStatus::Running; - - auto thread = std::thread([state = state_] { - std::lock_guard lock_guard(state->lock); - state->exit_result = state->impl.PlatformWaitForProcess(); - state->status = SubProcessStatus::Exited; - state->condition_variable.notify_all(); - }); - - thread.detach(); - } catch (const std::exception& e) { - this->state_->status = SubProcessStatus::FailedToStart; - throw SubProcessFailedToStartException(u"Sub-process failed to start. " + - String::FromUtf8(e.what())); - } - } + void Start(); /** * @brief Wait for the process to exit optionally for at most `wait_time`. If @@ -257,34 +172,7 @@ class PlatformSubProcess : public Object, public virtual IPlatformSubProcess { * or the process exits. But no, even if it is timeout, the process may also * have exited due to task schedule. */ - void Wait(std::optional<std::chrono::milliseconds> wait_time) override { - std::lock_guard lock_guard(this->state_->lock); - - if (this->state_->status == SubProcessStatus::Prepare) { - throw SubProcessException( - u"The process does not start. Can't wait for it."); - } - - if (this->state_->status == SubProcessStatus::FailedToStart) { - throw SubProcessException( - u"The process failed to start. Can't wait for it."); - } - - if (this->state_->status == SubProcessStatus::Exited) { - return; - } - - auto predicate = [this] { - return this->state_->status == SubProcessStatus::Exited; - }; - - if (wait_time) { - this->state_->condition_variable.wait_for(this->state_->lock, *wait_time, - predicate); - } else { - this->state_->condition_variable.wait(this->state_->lock, predicate); - } - } + void Wait(std::optional<std::chrono::milliseconds> wait_time); /** * @brief kill the process if it is running. If the process already exits, @@ -292,28 +180,7 @@ class PlatformSubProcess : public Object, public virtual IPlatformSubProcess { * `SubProcessException` will be throw. Ensure `Start` is called and does not * throw before calling this. */ - void Kill() override { - std::lock_guard lock_guard(this->state_->lock); - - if (this->state_->status == SubProcessStatus::Prepare) { - throw SubProcessException(u"The process does not start. Can't kill it."); - } - - if (this->state_->status == SubProcessStatus::FailedToStart) { - throw SubProcessException(u"The process failed to start. Can't kill it."); - } - - if (this->state_->status == SubProcessStatus::Exited) { - return; - } - - if (this->state_->killed) { - return; - } - - this->state_->impl.PlatformKillProcess(); - this->state_->killed = true; - } + void Kill(); /** * @brief Get the status of the process. @@ -324,45 +191,17 @@ class PlatformSubProcess : public Object, public virtual IPlatformSubProcess { * actually running. Because there might be a window that the process exits * already but status is not updated. */ - SubProcessStatus GetStatus() override { - std::lock_guard lock_guard(this->state_->lock); - return this->state_->status; - } + SubProcessStatus GetStatus(); /** * @brief Get the exit result. If the process is not started, failed to start * or running, `SubProcessException` will be thrown. */ - SubProcessExitResult GetExitResult() override { - std::lock_guard lock_guard(this->state_->lock); - - if (this->state_->status == SubProcessStatus::Prepare) { - throw SubProcessException( - u"The process does not start. Can't get exit result."); - } - - if (this->state_->status == SubProcessStatus::FailedToStart) { - throw SubProcessException( - u"The process failed to start. Can't get exit result."); - } - - if (this->state_->status == SubProcessStatus::Running) { - throw SubProcessException( - u"The process is running. Can't get exit result."); - } - - return this->state_->exit_result; - } + SubProcessExitResult GetExitResult(); - io::Stream* GetStdinStream() override { - return this->state_->impl.GetStdinStream(); - } - io::Stream* GetStdoutStream() override { - return this->state_->impl.GetStdoutStream(); - } - io::Stream* GetStderrStream() override { - return this->state_->impl.GetStderrStream(); - } + io::Stream* GetStdinStream(); + io::Stream* GetStdoutStream(); + io::Stream* GetStderrStream(); private: std::shared_ptr<State> state_; @@ -406,6 +245,6 @@ class CRU_BASE_API SubProcess : public Object { void CheckValid() const; private: - std::unique_ptr<IPlatformSubProcess> platform_process_; + std::unique_ptr<PlatformSubProcess> platform_process_; }; } // namespace cru diff --git a/include/cru/common/platform/unix/PosixSpawnSubProcess.h b/include/cru/common/platform/unix/PosixSpawnSubProcess.h index d4df284b..ee4e912a 100644 --- a/include/cru/common/platform/unix/PosixSpawnSubProcess.h +++ b/include/cru/common/platform/unix/PosixSpawnSubProcess.h @@ -16,20 +16,21 @@ #include <spawn.h> namespace cru::platform::unix { -class PosixSpawnSubProcessImpl { +class PosixSpawnSubProcessImpl : public Object, + public virtual IPlatformSubProcessImpl { CRU_DEFINE_CLASS_LOG_TAG(u"PosixSpawnSubProcess") public: explicit PosixSpawnSubProcessImpl(); ~PosixSpawnSubProcessImpl(); - io::Stream* GetStdinStream(); - io::Stream* GetStdoutStream(); - io::Stream* GetStderrStream(); + void PlatformCreateProcess(const SubProcessStartInfo& start_info) override; + SubProcessExitResult PlatformWaitForProcess() override; + void PlatformKillProcess() override; - void PlatformCreateProcess(const SubProcessStartInfo& start_info); - SubProcessExitResult PlatformWaitForProcess(); - void PlatformKillProcess(); + io::Stream* GetStdinStream() override; + io::Stream* GetStdoutStream() override; + io::Stream* GetStderrStream() override; private: pid_t pid_; @@ -46,6 +47,4 @@ class PosixSpawnSubProcessImpl { std::unique_ptr<io::AutoReadStream> stdout_buffer_stream_; std::unique_ptr<io::AutoReadStream> stderr_buffer_stream_; }; - -using PosixSpawnSubProcess = PlatformSubProcess<PosixSpawnSubProcessImpl>; } // namespace cru::platform::unix diff --git a/src/common/SubProcess.cpp b/src/common/SubProcess.cpp index 69f52d9c..0ffb2387 100644 --- a/src/common/SubProcess.cpp +++ b/src/common/SubProcess.cpp @@ -1,5 +1,7 @@ #include "cru/common/SubProcess.h" +#include <thread> + #ifdef CRU_PLATFORM_UNIX #include "cru/common/platform/unix/PosixSpawnSubProcess.h" #endif @@ -7,9 +9,133 @@ namespace cru { #ifdef CRU_PLATFORM_UNIX -using ThisPlatformSubProcess = platform::unix::PosixSpawnSubProcess; +using ThisPlatformSubProcessImpl = platform::unix::PosixSpawnSubProcessImpl; #endif +PlatformSubProcess::PlatformSubProcess( + SubProcessStartInfo start_info, + std::shared_ptr<IPlatformSubProcessImpl> impl) + : state_(new State(std::move(start_info), std::move(impl))) {} + +PlatformSubProcess::~PlatformSubProcess() {} + +void PlatformSubProcess::Start() { + std::lock_guard lock_guard(this->state_->lock); + + if (this->state_->status != SubProcessStatus::Prepare) { + throw SubProcessException(u"The process has already tried to start."); + } + + try { + this->state_->impl->PlatformCreateProcess(this->state_->start_info); + this->state_->status = SubProcessStatus::Running; + + auto thread = std::thread([state = state_] { + std::lock_guard lock_guard(state->lock); + state->exit_result = state->impl->PlatformWaitForProcess(); + state->status = SubProcessStatus::Exited; + state->condition_variable.notify_all(); + }); + + thread.detach(); + } catch (const std::exception& e) { + this->state_->status = SubProcessStatus::FailedToStart; + throw SubProcessFailedToStartException(u"Sub-process failed to start. " + + String::FromUtf8(e.what())); + } +} + +void PlatformSubProcess::Wait( + std::optional<std::chrono::milliseconds> wait_time) { + std::lock_guard lock_guard(this->state_->lock); + + if (this->state_->status == SubProcessStatus::Prepare) { + throw SubProcessException( + u"The process does not start. Can't wait for it."); + } + + if (this->state_->status == SubProcessStatus::FailedToStart) { + throw SubProcessException( + u"The process failed to start. Can't wait for it."); + } + + if (this->state_->status == SubProcessStatus::Exited) { + return; + } + + auto predicate = [this] { + return this->state_->status == SubProcessStatus::Exited; + }; + + if (wait_time) { + this->state_->condition_variable.wait_for(this->state_->lock, *wait_time, + predicate); + } else { + this->state_->condition_variable.wait(this->state_->lock, predicate); + } +} + +void PlatformSubProcess::Kill() { + std::lock_guard lock_guard(this->state_->lock); + + if (this->state_->status == SubProcessStatus::Prepare) { + throw SubProcessException(u"The process does not start. Can't kill it."); + } + + if (this->state_->status == SubProcessStatus::FailedToStart) { + throw SubProcessException(u"The process failed to start. Can't kill it."); + } + + if (this->state_->status == SubProcessStatus::Exited) { + return; + } + + if (this->state_->killed) { + return; + } + + this->state_->impl->PlatformKillProcess(); + this->state_->killed = true; +} + +SubProcessStatus PlatformSubProcess::GetStatus() { + std::lock_guard lock_guard(this->state_->lock); + return this->state_->status; +} + +SubProcessExitResult PlatformSubProcess::GetExitResult() { + std::lock_guard lock_guard(this->state_->lock); + + if (this->state_->status == SubProcessStatus::Prepare) { + throw SubProcessException( + u"The process does not start. Can't get exit result."); + } + + if (this->state_->status == SubProcessStatus::FailedToStart) { + throw SubProcessException( + u"The process failed to start. Can't get exit result."); + } + + if (this->state_->status == SubProcessStatus::Running) { + throw SubProcessException( + u"The process is running. Can't get exit result."); + } + + return this->state_->exit_result; +} + +io::Stream* PlatformSubProcess::GetStdinStream() { + return this->state_->impl->GetStdinStream(); +} + +io::Stream* PlatformSubProcess::GetStdoutStream() { + return this->state_->impl->GetStdoutStream(); +} + +io::Stream* PlatformSubProcess::GetStderrStream() { + return this->state_->impl->GetStderrStream(); +} + SubProcess SubProcess::Create(String program, std::vector<String> arguments, std::unordered_map<String, String> environments) { SubProcessStartInfo start_info; @@ -20,7 +146,8 @@ SubProcess SubProcess::Create(String program, std::vector<String> arguments, } SubProcess::SubProcess(SubProcessStartInfo start_info) { - platform_process_.reset(new ThisPlatformSubProcess(std::move(start_info))); + platform_process_.reset(new PlatformSubProcess( + std::move(start_info), std::make_shared<ThisPlatformSubProcessImpl>())); platform_process_->Start(); } diff --git a/src/common/platform/unix/PosixSpawnSubProcess.cpp b/src/common/platform/unix/PosixSpawnSubProcess.cpp index 1e2a84d6..8b521a5e 100644 --- a/src/common/platform/unix/PosixSpawnSubProcess.cpp +++ b/src/common/platform/unix/PosixSpawnSubProcess.cpp @@ -35,18 +35,6 @@ PosixSpawnSubProcessImpl::PosixSpawnSubProcessImpl() PosixSpawnSubProcessImpl::~PosixSpawnSubProcessImpl() {} -io::Stream* PosixSpawnSubProcessImpl::GetStdinStream() { - return stdin_stream_.get(); -} - -io::Stream* PosixSpawnSubProcessImpl::GetStdoutStream() { - return stdout_buffer_stream_.get(); -} - -io::Stream* PosixSpawnSubProcessImpl::GetStderrStream() { - return stderr_buffer_stream_.get(); -} - namespace { char** CreateCstrArray(const std::vector<String>& argv) { std::vector<Buffer> utf8_argv; @@ -181,4 +169,16 @@ void PosixSpawnSubProcessImpl::PlatformKillProcess() { std::move(inner)); } } + +io::Stream* PosixSpawnSubProcessImpl::GetStdinStream() { + return stdin_stream_.get(); +} + +io::Stream* PosixSpawnSubProcessImpl::GetStdoutStream() { + return stdout_buffer_stream_.get(); +} + +io::Stream* PosixSpawnSubProcessImpl::GetStderrStream() { + return stderr_buffer_stream_.get(); +} } // namespace cru::platform::unix |