From 90868bf85dc295f70620dbcbd5790999fe239550 Mon Sep 17 00:00:00 2001
From: Yuqian Yang <crupest@crupest.life>
Date: Sun, 23 Feb 2025 16:40:32 +0800
Subject: feat(python): move python codes.

---
 python/cru/service/__init__.py  |   0
 python/cru/service/__main__.py  |  27 +++
 python/cru/service/_app.py      |  30 +++
 python/cru/service/_base.py     | 400 ++++++++++++++++++++++++++++++++++++++++
 python/cru/service/_gen_cmd.py  | 200 ++++++++++++++++++++
 python/cru/service/_nginx.py    | 263 ++++++++++++++++++++++++++
 python/cru/service/_template.py | 228 +++++++++++++++++++++++
 7 files changed, 1148 insertions(+)
 create mode 100644 python/cru/service/__init__.py
 create mode 100644 python/cru/service/__main__.py
 create mode 100644 python/cru/service/_app.py
 create mode 100644 python/cru/service/_base.py
 create mode 100644 python/cru/service/_gen_cmd.py
 create mode 100644 python/cru/service/_nginx.py
 create mode 100644 python/cru/service/_template.py

(limited to 'python/cru/service')

diff --git a/python/cru/service/__init__.py b/python/cru/service/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/python/cru/service/__main__.py b/python/cru/service/__main__.py
new file mode 100644
index 0000000..2a0268b
--- /dev/null
+++ b/python/cru/service/__main__.py
@@ -0,0 +1,27 @@
+import sys
+
+from cru import CruException
+
+from ._app import create_app
+
+
+def main():
+    app = create_app()
+    app.run_command()
+
+
+if __name__ == "__main__":
+    version_info = sys.version_info
+    if not (version_info.major == 3 and version_info.minor >= 11):
+        print("This application requires Python 3.11 or later.", file=sys.stderr)
+        sys.exit(1)
+
+    try:
+        main()
+    except CruException as e:
+        user_message = e.get_user_message()
+        if user_message is not None:
+            print(f"Error: {user_message}")
+            exit(1)
+        else:
+            raise
diff --git a/python/cru/service/_app.py b/python/cru/service/_app.py
new file mode 100644
index 0000000..b4c6271
--- /dev/null
+++ b/python/cru/service/_app.py
@@ -0,0 +1,30 @@
+from ._base import (
+    AppBase,
+    CommandDispatcher,
+    PathCommandProvider,
+)
+from ._template import TemplateManager
+from ._nginx import NginxManager
+from ._gen_cmd import GenCmdProvider
+
+APP_ID = "crupest"
+
+
+class App(AppBase):
+    def __init__(self):
+        super().__init__(APP_ID, f"{APP_ID}-service")
+        self.add_feature(PathCommandProvider())
+        self.add_feature(TemplateManager())
+        self.add_feature(NginxManager())
+        self.add_feature(GenCmdProvider())
+        self.add_feature(CommandDispatcher())
+
+    def run_command(self):
+        command_dispatcher = self.get_feature(CommandDispatcher)
+        command_dispatcher.run_command()
+
+
+def create_app() -> App:
+    app = App()
+    app.setup()
+    return app
diff --git a/python/cru/service/_base.py b/python/cru/service/_base.py
new file mode 100644
index 0000000..e1eee70
--- /dev/null
+++ b/python/cru/service/_base.py
@@ -0,0 +1,400 @@
+from __future__ import annotations
+
+from argparse import ArgumentParser, Namespace
+from abc import ABC, abstractmethod
+import argparse
+import os
+from pathlib import Path
+from typing import TypeVar, overload
+
+from cru import CruException, CruLogicError
+
+_Feature = TypeVar("_Feature", bound="AppFeatureProvider")
+
+
+class AppError(CruException):
+    pass
+
+
+class AppFeatureError(AppError):
+    def __init__(self, message, feature: type | str, *args, **kwargs):
+        super().__init__(message, *args, **kwargs)
+        self._feature = feature
+
+    @property
+    def feature(self) -> type | str:
+        return self._feature
+
+
+class AppPathError(CruException):
+    def __init__(self, message, _path: str | Path, *args, **kwargs):
+        super().__init__(message, *args, **kwargs)
+        self._path = str(_path)
+
+    @property
+    def path(self) -> str:
+        return self._path
+
+
+class AppPath(ABC):
+    def __init__(self, id: str, is_dir: bool, description: str) -> None:
+        self._is_dir = is_dir
+        self._id = id
+        self._description = description
+
+    @property
+    @abstractmethod
+    def parent(self) -> AppPath | None: ...
+
+    @property
+    @abstractmethod
+    def app(self) -> AppBase: ...
+
+    @property
+    def id(self) -> str:
+        return self._id
+
+    @property
+    def description(self) -> str:
+        return self._description
+
+    @property
+    def is_dir(self) -> bool:
+        return self._is_dir
+
+    @property
+    @abstractmethod
+    def full_path(self) -> Path: ...
+
+    @property
+    def full_path_str(self) -> str:
+        return str(self.full_path)
+
+    def check_parents(self, must_exist: bool = False) -> bool:
+        for p in reversed(self.full_path.parents):
+            if not p.exists() and not must_exist:
+                return False
+            if not p.is_dir():
+                raise AppPathError("Parents' path must be a dir.", self.full_path)
+        return True
+
+    def check_self(self, must_exist: bool = False) -> bool:
+        if not self.check_parents(must_exist):
+            return False
+        if not self.full_path.exists():
+            if not must_exist:
+                return False
+            raise AppPathError("Not exist.", self.full_path)
+        if self.is_dir:
+            if not self.full_path.is_dir():
+                raise AppPathError("Should be a directory, but not.", self.full_path)
+            else:
+                return True
+        else:
+            if not self.full_path.is_file():
+                raise AppPathError("Should be a file, but not.", self.full_path)
+            else:
+                return True
+
+    def ensure(self, create_file: bool = False) -> None:
+        e = self.check_self(False)
+        if not e:
+            os.makedirs(self.full_path.parent, exist_ok=True)
+            if self.is_dir:
+                os.mkdir(self.full_path)
+            elif create_file:
+                with open(self.full_path, "w") as f:
+                    f.write("")
+
+    def read_text(self) -> str:
+        if self.is_dir:
+            raise AppPathError("Can't read text of a dir.", self.full_path)
+        self.check_self()
+        return self.full_path.read_text()
+
+    def add_subpath(
+        self,
+        name: str,
+        is_dir: bool,
+        /,
+        id: str | None = None,
+        description: str = "",
+    ) -> AppFeaturePath:
+        return self.app._add_path(name, is_dir, self, id, description)
+
+    @property
+    def app_relative_path(self) -> Path:
+        return self.full_path.relative_to(self.app.root.full_path)
+
+
+class AppFeaturePath(AppPath):
+    def __init__(
+        self,
+        parent: AppPath,
+        name: str,
+        is_dir: bool,
+        /,
+        id: str | None = None,
+        description: str = "",
+    ) -> None:
+        super().__init__(id or name, is_dir, description)
+        self._name = name
+        self._parent = parent
+
+    @property
+    def name(self) -> str:
+        return self._name
+
+    @property
+    def parent(self) -> AppPath:
+        return self._parent
+
+    @property
+    def app(self) -> AppBase:
+        return self.parent.app
+
+    @property
+    def full_path(self) -> Path:
+        return Path(self.parent.full_path, self.name).resolve()
+
+
+class AppRootPath(AppPath):
+    def __init__(self, app: AppBase, path: Path):
+        super().__init__(f"/{id}", True, f"Application {id} root path.")
+        self._app = app
+        self._full_path = path.resolve()
+
+    @property
+    def parent(self) -> None:
+        return None
+
+    @property
+    def app(self) -> AppBase:
+        return self._app
+
+    @property
+    def full_path(self) -> Path:
+        return self._full_path
+
+
+class AppFeatureProvider(ABC):
+    def __init__(self, name: str, /, app: AppBase | None = None):
+        super().__init__()
+        self._name = name
+        self._app = app if app else AppBase.get_instance()
+
+    @property
+    def app(self) -> AppBase:
+        return self._app
+
+    @property
+    def name(self) -> str:
+        return self._name
+
+    @abstractmethod
+    def setup(self) -> None: ...
+
+
+class AppCommandFeatureProvider(AppFeatureProvider):
+    @abstractmethod
+    def get_command_info(self) -> tuple[str, str]: ...
+
+    @abstractmethod
+    def setup_arg_parser(self, arg_parser: ArgumentParser): ...
+
+    @abstractmethod
+    def run_command(self, args: Namespace) -> None: ...
+
+
+class PathCommandProvider(AppCommandFeatureProvider):
+    def __init__(self) -> None:
+        super().__init__("path-command-provider")
+
+    def setup(self):
+        pass
+
+    def get_command_info(self):
+        return ("path", "Get information about paths used by app.")
+
+    def setup_arg_parser(self, arg_parser: ArgumentParser) -> None:
+        subparsers = arg_parser.add_subparsers(
+            dest="path_command", metavar="PATH_COMMAND"
+        )
+        _list_parser = subparsers.add_parser(
+            "list", help="list special paths used by app"
+        )
+
+    def run_command(self, args: Namespace) -> None:
+        if args.path_command is None or args.path_command == "list":
+            for path in self.app.paths:
+                print(
+                    f"{path.app_relative_path.as_posix()}{'/' if path.is_dir else ''}: {path.description}"
+                )
+
+
+class CommandDispatcher(AppFeatureProvider):
+    def __init__(self) -> None:
+        super().__init__("command-dispatcher")
+
+    def setup_arg_parser(self) -> None:
+        self._map: dict[str, AppCommandFeatureProvider] = {}
+        arg_parser = argparse.ArgumentParser(
+            description="Service management",
+            formatter_class=argparse.RawDescriptionHelpFormatter,
+        )
+        subparsers = arg_parser.add_subparsers(
+            dest="command",
+            help="The management command to execute.",
+            metavar="COMMAND",
+        )
+        for feature in self.app.features:
+            if isinstance(feature, AppCommandFeatureProvider):
+                info = feature.get_command_info()
+                command_subparser = subparsers.add_parser(info[0], help=info[1])
+                feature.setup_arg_parser(command_subparser)
+                self._map[info[0]] = feature
+        self._arg_parser = arg_parser
+
+    def setup(self):
+        self._parsed_args = self.arg_parser.parse_args()
+
+    @property
+    def arg_parser(self) -> argparse.ArgumentParser:
+        return self._arg_parser
+
+    @property
+    def command_map(self) -> dict[str, AppCommandFeatureProvider]:
+        return self._map
+
+    @property
+    def program_args(self) -> argparse.Namespace:
+        return self._parsed_args
+
+    def run_command(self) -> None:
+        args = self.program_args
+        if args.command is None:
+            self.arg_parser.print_help()
+            return
+        self.command_map[args.command].run_command(args)
+
+
+class AppBase:
+    _instance: AppBase | None = None
+
+    @staticmethod
+    def get_instance() -> AppBase:
+        if AppBase._instance is None:
+            raise AppError("App instance not initialized")
+        return AppBase._instance
+
+    def __init__(self, app_id: str, name: str):
+        AppBase._instance = self
+        self._app_id = app_id
+        self._name = name
+        self._features: list[AppFeatureProvider] = []
+        self._paths: list[AppFeaturePath] = []
+
+    def setup(self) -> None:
+        command_dispatcher = self.get_feature(CommandDispatcher)
+        command_dispatcher.setup_arg_parser()
+        self._root = AppRootPath(self, Path(self._ensure_env("CRUPEST_PROJECT_DIR")))
+        self._data_dir = self._root.add_subpath(
+            self._ensure_env("CRUPEST_DATA_DIR"), True, id="data"
+        )
+        self._services_dir = self._root.add_subpath(
+            self._ensure_env("CRUPEST_SERVICES_DIR"), True, id="CRUPEST_SERVICES_DIR"
+        )
+        for feature in self.features:
+            feature.setup()
+        for path in self.paths:
+            path.check_self()
+
+    @property
+    def app_id(self) -> str:
+        return self._app_id
+
+    @property
+    def name(self) -> str:
+        return self._name
+
+    def _ensure_env(self, env_name: str) -> str:
+        value = os.getenv(env_name)
+        if value is None:
+            raise AppError(f"Environment variable {env_name} not set")
+        return value
+
+    @property
+    def root(self) -> AppRootPath:
+        return self._root
+
+    @property
+    def data_dir(self) -> AppFeaturePath:
+        return self._data_dir
+
+    @property
+    def services_dir(self) -> AppFeaturePath:
+        return self._services_dir
+
+    @property
+    def app_initialized(self) -> bool:
+        return self.data_dir.check_self()
+
+    @property
+    def features(self) -> list[AppFeatureProvider]:
+        return self._features
+
+    @property
+    def paths(self) -> list[AppFeaturePath]:
+        return self._paths
+
+    def add_feature(self, feature: _Feature) -> _Feature:
+        for f in self.features:
+            if f.name == feature.name:
+                raise AppFeatureError(
+                    f"Duplicate feature name: {feature.name}.", feature.name
+                )
+        self._features.append(feature)
+        return feature
+
+    def _add_path(
+        self,
+        name: str,
+        is_dir: bool,
+        /,
+        parent: AppPath | None = None,
+        id: str | None = None,
+        description: str = "",
+    ) -> AppFeaturePath:
+        p = AppFeaturePath(
+            parent or self.root, name, is_dir, id=id, description=description
+        )
+        self._paths.append(p)
+        return p
+
+    @overload
+    def get_feature(self, feature: str) -> AppFeatureProvider: ...
+
+    @overload
+    def get_feature(self, feature: type[_Feature]) -> _Feature: ...
+
+    def get_feature(
+        self, feature: str | type[_Feature]
+    ) -> AppFeatureProvider | _Feature:
+        if isinstance(feature, str):
+            for f in self._features:
+                if f.name == feature:
+                    return f
+        elif isinstance(feature, type):
+            for f in self._features:
+                if isinstance(f, feature):
+                    return f
+        else:
+            raise CruLogicError("Argument must be the name of feature or its class.")
+
+        raise AppFeatureError(f"Feature {feature} not found.", feature)
+
+    def get_path(self, name: str) -> AppFeaturePath:
+        for p in self._paths:
+            if p.id == name or p.name == name:
+                return p
+        raise AppPathError(f"Application path {name} not found.", name)
diff --git a/python/cru/service/_gen_cmd.py b/python/cru/service/_gen_cmd.py
new file mode 100644
index 0000000..f51d65f
--- /dev/null
+++ b/python/cru/service/_gen_cmd.py
@@ -0,0 +1,200 @@
+from dataclasses import dataclass, replace
+from typing import TypeAlias
+
+from ._base import AppCommandFeatureProvider
+from ._nginx import NginxManager
+
+_Str_Or_Cmd_List: TypeAlias = str | list["_Cmd"]
+
+
+@dataclass
+class _Cmd:
+    name: str
+    desc: str
+    cmd: _Str_Or_Cmd_List
+
+    def clean(self) -> "_Cmd":
+        if isinstance(self.cmd, list):
+            return replace(
+                self,
+                cmd=[cmd.clean() for cmd in self.cmd],
+            )
+        elif isinstance(self.cmd, str):
+            return replace(self, cmd=self.cmd.strip())
+        else:
+            raise ValueError("Unexpected type for cmd.")
+
+    def generate_text(
+        self,
+        info_only: bool,
+        *,
+        parent: str | None = None,
+    ) -> str:
+        if parent is None:
+            tag = "COMMAND"
+            long_name = self.name
+            indent = ""
+        else:
+            tag = "SUBCOMMAND"
+            long_name = f"{parent}.{self.name}"
+            indent = "  "
+
+        if info_only:
+            return f"{indent}[{long_name}]: {self.desc}"
+
+        text = f"--- {tag}[{long_name}]: {self.desc}"
+        if isinstance(self.cmd, str):
+            text += "\n" + self.cmd
+        elif isinstance(self.cmd, list):
+            for sub in self.cmd:
+                text += "\n" * 2 + sub.generate_text(info_only, parent=self.name)
+        else:
+            raise ValueError("Unexpected type for cmd.")
+
+        lines: list[str] = []
+        for line in text.splitlines():
+            if len(line) == 0:
+                lines.append("")
+            else:
+                lines.append(indent + line)
+        text = "\n".join(lines)
+
+        return text
+
+
+_docker_uninstall = _Cmd(
+    "uninstall",
+    "uninstall apt docker",
+    """
+for pkg in docker.io docker-doc docker-compose \
+podman-docker containerd runc; \
+do sudo apt-get remove $pkg; done
+""",
+)
+
+_docker_apt_certs = _Cmd(
+    "apt-certs",
+    "prepare apt certs",
+    """
+sudo apt-get update
+sudo apt-get install ca-certificates curl
+sudo install -m 0755 -d /etc/apt/keyrings
+""",
+)
+
+_docker_docker_certs = _Cmd(
+    "docker-certs",
+    "add docker certs",
+    """
+sudo curl -fsSL https://download.docker.com/linux/debian/gpg \
+-o /etc/apt/keyrings/docker.asc
+sudo chmod a+r /etc/apt/keyrings/docker.asc
+""",
+)
+
+_docker_apt_repo = _Cmd(
+    "apt-repo",
+    "add docker apt repo",
+    """
+echo \\
+  "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] \
+https://download.docker.com/linux/debian \\
+  $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \\
+  sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
+""",
+)
+
+_docker_install = _Cmd(
+    "install",
+    "update apt and install docker",
+    """
+sudo apt-get update
+sudo apt-get install docker-ce docker-ce-cli containerd.io \
+docker-buildx-plugin docker-compose-plugin
+""",
+)
+
+_docker_setup = _Cmd(
+    "setup",
+    "setup system for docker",
+    """
+sudo systemctl enable docker
+sudo systemctl start docker
+sudo groupadd -f docker
+sudo usermod -aG docker $USER
+# Remember to log out and log back in for the group changes to take effect
+""",
+)
+
+_docker = _Cmd(
+    "install-docker",
+    "install docker for a fresh new system",
+    [
+        _docker_uninstall,
+        _docker_apt_certs,
+        _docker_docker_certs,
+        _docker_apt_repo,
+        _docker_install,
+        _docker_setup,
+    ],
+)
+
+_update_blog = _Cmd(
+    "update-blog",
+    "re-generate blog pages",
+    """
+docker exec -it blog /scripts/update.bash
+""",
+)
+
+_git_user = _Cmd(
+    "git-user",
+    "add/set git server user and password",
+    """
+docker run -it --rm -v "$ps_file:/user-info" httpd htpasswd "/user-info" [username]
+""",
+)
+
+
+class GenCmdProvider(AppCommandFeatureProvider):
+    def __init__(self) -> None:
+        super().__init__("gen-cmd-provider")
+        self._cmds: dict[str, _Cmd] = {}
+        self._register_cmds(_docker, _update_blog, _git_user)
+
+    def _register_cmd(self, cmd: "_Cmd"):
+        self._cmds[cmd.name] = cmd.clean()
+
+    def _register_cmds(self, *cmds: "_Cmd"):
+        for c in cmds:
+            self._register_cmd(c)
+
+    def setup(self):
+        pass
+
+    def get_command_info(self):
+        return ("gen-cmd", "Get commands of running external cli tools.")
+
+    def setup_arg_parser(self, arg_parser):
+        subparsers = arg_parser.add_subparsers(
+            dest="gen_cmd", metavar="GEN_CMD_COMMAND"
+        )
+        certbot_parser = subparsers.add_parser("certbot", help="print certbot commands")
+        certbot_parser.add_argument(
+            "-t", "--test", action="store_true", help="run certbot in test mode"
+        )
+        for cmd in self._cmds.values():
+            subparsers.add_parser(cmd.name, help=cmd.desc)
+
+    def _print_cmd(self, name: str):
+        print(self._cmds[name].generate_text(False))
+
+    def run_command(self, args):
+        if args.gen_cmd is None or args.gen_cmd == "list":
+            print("[certbot]: certbot ssl cert commands")
+            for cmd in self._cmds.values():
+                print(cmd.generate_text(True))
+        elif args.gen_cmd == "certbot":
+            self.app.get_feature(NginxManager).print_all_certbot_commands(args.test)
+        else:
+            self._print_cmd(args.gen_cmd)
diff --git a/python/cru/service/_nginx.py b/python/cru/service/_nginx.py
new file mode 100644
index 0000000..87cff6d
--- /dev/null
+++ b/python/cru/service/_nginx.py
@@ -0,0 +1,263 @@
+from argparse import Namespace
+from enum import Enum, auto
+import re
+import subprocess
+from typing import TypeAlias
+
+from cru import CruInternalError
+
+from ._base import AppCommandFeatureProvider
+from ._template import TemplateManager
+
+
+class CertbotAction(Enum):
+    CREATE = auto()
+    EXPAND = auto()
+    SHRINK = auto()
+    RENEW = auto()
+
+
+class NginxManager(AppCommandFeatureProvider):
+    CertbotAction: TypeAlias = CertbotAction
+
+    def __init__(self) -> None:
+        super().__init__("nginx-manager")
+        self._domains_cache: list[str] | None = None
+
+    def setup(self) -> None:
+        pass
+
+    @property
+    def _template_manager(self) -> TemplateManager:
+        return self.app.get_feature(TemplateManager)
+
+    @property
+    def root_domain(self) -> str:
+        return self._template_manager.get_domain()
+
+    @property
+    def domains(self) -> list[str]:
+        if self._domains_cache is None:
+            self._domains_cache = self._get_domains()
+        return self._domains_cache
+
+    @property
+    def subdomains(self) -> list[str]:
+        suffix = "." + self.root_domain
+        return [d[: -len(suffix)] for d in self.domains if d.endswith(suffix)]
+
+    def _get_domains_from_text(self, text: str) -> set[str]:
+        domains: set[str] = set()
+        regex = re.compile(r"server_name\s+(\S+)\s*;")
+        for match in regex.finditer(text):
+            domains.add(match[1])
+        return domains
+
+    def _join_generated_nginx_conf_text(self) -> str:
+        result = ""
+        for path, text in self._template_manager.generate():
+            if "nginx" in str(path):
+                result += text
+        return result
+
+    def _get_domains(self) -> list[str]:
+        text = self._join_generated_nginx_conf_text()
+        domains = self._get_domains_from_text(text)
+        domains.remove(self.root_domain)
+        return [self.root_domain, *domains]
+
+    def _print_domains(self) -> None:
+        for domain in self.domains:
+            print(domain)
+
+    def _certbot_command(
+        self,
+        action: CertbotAction | str,
+        test: bool,
+        *,
+        docker=True,
+        standalone=None,
+        email=None,
+        agree_tos=True,
+    ) -> str:
+        if isinstance(action, str):
+            action = CertbotAction[action.upper()]
+
+        command_args = []
+
+        add_domain_option = True
+        if action is CertbotAction.CREATE:
+            if standalone is None:
+                standalone = True
+            command_action = "certonly"
+        elif action in [CertbotAction.EXPAND, CertbotAction.SHRINK]:
+            if standalone is None:
+                standalone = False
+            command_action = "certonly"
+        elif action is CertbotAction.RENEW:
+            if standalone is None:
+                standalone = False
+            add_domain_option = False
+            command_action = "renew"
+        else:
+            raise CruInternalError("Invalid certbot action.")
+
+        data_dir = self.app.data_dir.full_path.as_posix()
+
+        if not docker:
+            command_args.append("certbot")
+        else:
+            command_args.extend(
+                [
+                    "docker run -it --rm --name certbot",
+                    f'-v "{data_dir}/certbot/certs:/etc/letsencrypt"',
+                    f'-v "{data_dir}/certbot/data:/var/lib/letsencrypt"',
+                ]
+            )
+            if standalone:
+                command_args.append('-p "0.0.0.0:80:80"')
+            else:
+                command_args.append(f'-v "{data_dir}/certbot/webroot:/var/www/certbot"')
+
+            command_args.append("certbot/certbot")
+
+        command_args.append(command_action)
+
+        command_args.append(f"--cert-name {self.root_domain}")
+
+        if standalone:
+            command_args.append("--standalone")
+        else:
+            command_args.append("--webroot -w /var/www/certbot")
+
+        if add_domain_option:
+            command_args.append(" ".join([f"-d {domain}" for domain in self.domains]))
+
+        if email is not None:
+            command_args.append(f"--email {email}")
+
+        if agree_tos:
+            command_args.append("--agree-tos")
+
+        if test:
+            command_args.append("--test-cert --dry-run")
+
+        return " ".join(command_args)
+
+    def print_all_certbot_commands(self, test: bool):
+        print("### COMMAND: (standalone) create certs")
+        print(
+            self._certbot_command(
+                CertbotAction.CREATE,
+                test,
+                email=self._template_manager.get_email(),
+            )
+        )
+        print()
+        print("### COMMAND: (webroot+nginx) expand or shrink certs")
+        print(
+            self._certbot_command(
+                CertbotAction.EXPAND,
+                test,
+                email=self._template_manager.get_email(),
+            )
+        )
+        print()
+        print("### COMMAND: (webroot+nginx) renew certs")
+        print(
+            self._certbot_command(
+                CertbotAction.RENEW,
+                test,
+                email=self._template_manager.get_email(),
+            )
+        )
+
+    @property
+    def _cert_path_str(self) -> str:
+        return str(
+            self.app.data_dir.full_path
+            / "certbot/certs/live"
+            / self.root_domain
+            / "fullchain.pem"
+        )
+
+    def get_command_info(self):
+        return "nginx", "Manage nginx related things."
+
+    def setup_arg_parser(self, arg_parser):
+        subparsers = arg_parser.add_subparsers(
+            dest="nginx_command", required=True, metavar="NGINX_COMMAND"
+        )
+        _list_parser = subparsers.add_parser("list", help="list domains")
+        certbot_parser = subparsers.add_parser("certbot", help="print certbot commands")
+        certbot_parser.add_argument(
+            "--no-test",
+            action="store_true",
+            help="remove args making certbot run in test mode",
+        )
+
+    def run_command(self, args: Namespace) -> None:
+        if args.nginx_command == "list":
+            self._print_domains()
+        elif args.nginx_command == "certbot":
+            self.print_all_certbot_commands(not args.no_test)
+
+    def _generate_dns_zone(
+        self,
+        ip: str,
+        /,
+        ttl: str | int = 600,
+        *,
+        enable_mail: bool = True,
+        dkim: str | None = None,
+    ) -> str:
+        # TODO: Not complete and test now.
+        root_domain = self.root_domain
+        result = f"$ORIGIN {root_domain}.\n\n"
+        result += "; A records\n"
+        result += f"@ {ttl} IN A {ip}\n"
+        for subdomain in self.subdomains:
+            result += f"{subdomain} {ttl} IN A {ip}\n"
+
+        if enable_mail:
+            result += "\n; MX records\n"
+            result += f"@ {ttl} IN MX 10 mail.{root_domain}.\n"
+            result += "\n; SPF record\n"
+            result += f'@ {ttl} IN TXT "v=spf1 mx ~all"\n'
+            if dkim is not None:
+                result += "\n; DKIM record\n"
+                result += f'mail._domainkey {ttl} IN TEXT "{dkim}"'
+                result += "\n; DMARC record\n"
+                dmarc_options = [
+                    "v=DMARC1",
+                    "p=none",
+                    f"rua=mailto:dmarc.report@{root_domain}",
+                    f"ruf=mailto:dmarc.report@{root_domain}",
+                    "sp=none",
+                    "ri=86400",
+                ]
+                result += f'_dmarc {ttl} IN TXT "{"; ".join(dmarc_options)}"\n'
+        return result
+
+    def _get_dkim_from_mailserver(self) -> str | None:
+        # TODO: Not complete and test now.
+        dkim_path = (
+            self.app.data_dir.full_path
+            / "dms/config/opendkim/keys"
+            / self.root_domain
+            / "mail.txt"
+        )
+        if not dkim_path.exists():
+            return None
+
+        p = subprocess.run(["sudo", "cat", dkim_path], capture_output=True, check=True)
+        value = ""
+        for match in re.finditer('"(.*)"', p.stdout.decode("utf-8")):
+            value += match.group(1)
+        return value
+
+    def _generate_dns_zone_with_dkim(self, ip: str, /, ttl: str | int = 600) -> str:
+        # TODO: Not complete and test now.
+        return self._generate_dns_zone(
+            ip, ttl, enable_mail=True, dkim=self._get_dkim_from_mailserver()
+        )
diff --git a/python/cru/service/_template.py b/python/cru/service/_template.py
new file mode 100644
index 0000000..22c1d21
--- /dev/null
+++ b/python/cru/service/_template.py
@@ -0,0 +1,228 @@
+from argparse import Namespace
+from pathlib import Path
+import shutil
+from typing import NamedTuple
+import graphlib
+
+from cru import CruException
+from cru.parsing import SimpleLineVarParser
+from cru.template import TemplateTree, CruStrWrapperTemplate
+
+from ._base import AppCommandFeatureProvider, AppFeaturePath
+
+
+class _Config(NamedTuple):
+    text: str
+    config: dict[str, str]
+
+
+class _GeneratedConfig(NamedTuple):
+    base: _Config
+    private: _Config
+    merged: _Config
+
+
+class _PreConfig(NamedTuple):
+    base: _Config
+    private: _Config
+    config: dict[str, str]
+
+    @staticmethod
+    def create(base: _Config, private: _Config) -> "_PreConfig":
+        return _PreConfig(base, private, {**base.config, **private.config})
+
+    def _merge(self, generated: _Config):
+        text = (
+            "\n".join(
+                [
+                    self.private.text.strip(),
+                    self.base.text.strip(),
+                    generated.text.strip(),
+                ]
+            )
+            + "\n"
+        )
+        config = {**self.config, **generated.config}
+        return _GeneratedConfig(self.base, self.private, _Config(text, config))
+
+
+class _Template(NamedTuple):
+    config: CruStrWrapperTemplate
+    config_vars: set[str]
+    tree: TemplateTree
+
+
+class TemplateManager(AppCommandFeatureProvider):
+    def __init__(self):
+        super().__init__("template-manager")
+
+    def setup(self) -> None:
+        self._base_config_file = self.app.services_dir.add_subpath("base-config", False)
+        self._private_config_file = self.app.data_dir.add_subpath("config", False)
+        self._template_config_file = self.app.services_dir.add_subpath(
+            "config.template", False
+        )
+        self._templates_dir = self.app.services_dir.add_subpath("templates", True)
+        self._generated_dir = self.app.services_dir.add_subpath("generated", True)
+
+        self._config_parser = SimpleLineVarParser()
+
+        def _read_pre(app_path: AppFeaturePath) -> _Config:
+            text = app_path.read_text()
+            config = self._read_config(text)
+            return _Config(text, config)
+
+        base = _read_pre(self._base_config_file)
+        private = _read_pre(self._private_config_file)
+        self._preconfig = _PreConfig.create(base, private)
+
+        self._generated: _GeneratedConfig | None = None
+
+        template_config_text = self._template_config_file.read_text()
+        self._template_config = self._read_config(template_config_text)
+
+        self._template = _Template(
+            CruStrWrapperTemplate(template_config_text),
+            set(self._template_config.keys()),
+            TemplateTree(
+                lambda text: CruStrWrapperTemplate(text),
+                self.templates_dir.full_path_str,
+            ),
+        )
+
+        self._real_required_vars = (
+            self._template.config_vars | self._template.tree.variables
+        ) - self._template.config_vars
+        lacks = self._real_required_vars - self._preconfig.config.keys()
+        self._lack_vars = lacks if len(lacks) > 0 else None
+
+    def _read_config_entry_names(self, text: str) -> set[str]:
+        return set(entry.key for entry in self._config_parser.parse(text))
+
+    def _read_config(self, text: str) -> dict[str, str]:
+        return {entry.key: entry.value for entry in self._config_parser.parse(text)}
+
+    @property
+    def templates_dir(self) -> AppFeaturePath:
+        return self._templates_dir
+
+    @property
+    def generated_dir(self) -> AppFeaturePath:
+        return self._generated_dir
+
+    def get_domain(self) -> str:
+        return self._preconfig.config["CRUPEST_DOMAIN"]
+
+    def get_email(self) -> str:
+        return self._preconfig.config["CRUPEST_EMAIL"]
+
+    def _generate_template_config(self, config: dict[str, str]) -> dict[str, str]:
+        entry_templates = {
+            key: CruStrWrapperTemplate(value)
+            for key, value in self._template_config.items()
+        }
+        sorter = graphlib.TopologicalSorter(
+            config
+            | {key: template.variables for key, template in entry_templates.items()}
+        )
+
+        vars: dict[str, str] = config.copy()
+        for _ in sorter.static_order():
+            del_keys = []
+            for key, template in entry_templates.items():
+                new = template.generate_partial(vars)
+                if not new.has_variables:
+                    vars[key] = new.generate({})
+                    del_keys.append(key)
+                else:
+                    entry_templates[key] = new
+            for key in del_keys:
+                del entry_templates[key]
+        assert len(entry_templates) == 0
+        return {key: value for key, value in vars.items() if key not in config}
+
+    def _generate_config(self) -> _GeneratedConfig:
+        if self._generated is not None:
+            return self._generated
+        if self._lack_vars is not None:
+            raise CruException(f"Required vars are not defined: {self._lack_vars}.")
+        config = self._generate_template_config(self._preconfig.config)
+        text = self._template.config.generate(self._preconfig.config | config)
+        self._generated = self._preconfig._merge(_Config(text, config))
+        return self._generated
+
+    def generate(self) -> list[tuple[Path, str]]:
+        config = self._generate_config()
+        return [
+            (Path("config"), config.merged.text),
+            *self._template.tree.generate(config.merged.config),
+        ]
+
+    def _generate_files(self, dry_run: bool) -> None:
+        result = self.generate()
+        if not dry_run:
+            if self.generated_dir.full_path.exists():
+                shutil.rmtree(self.generated_dir.full_path)
+            for path, text in result:
+                des = self.generated_dir.full_path / path
+                des.parent.mkdir(parents=True, exist_ok=True)
+                with open(des, "w") as f:
+                    f.write(text)
+
+    def get_command_info(self):
+        return ("template", "Manage templates.")
+
+    def _print_file_lists(self) -> None:
+        print(f"[{self._template.config.variable_count}]", "config")
+        for path, template in self._template.tree.templates:
+            print(f"[{template.variable_count}]", path.as_posix())
+
+    def _print_vars(self, required: bool) -> None:
+        for var in self._template.config.variables:
+            print(f"[config] {var}")
+        for var in self._template.tree.variables:
+            if not (required and var in self._template.config_vars):
+                print(f"[template] {var}")
+
+    def _run_check_vars(self) -> None:
+        if self._lack_vars is not None:
+            print("Lacks:")
+            for var in self._lack_vars:
+                print(var)
+
+    def setup_arg_parser(self, arg_parser):
+        subparsers = arg_parser.add_subparsers(
+            dest="template_command", required=True, metavar="TEMPLATE_COMMAND"
+        )
+        _list_parser = subparsers.add_parser("list", help="list templates")
+        vars_parser = subparsers.add_parser(
+            "vars", help="list variables used in all templates"
+        )
+        vars_parser.add_argument(
+            "-r",
+            "--required",
+            help="only list really required one.",
+            action="store_true",
+        )
+        _check_vars_parser = subparsers.add_parser(
+            "check-vars",
+            help="check if required vars are set",
+        )
+        generate_parser = subparsers.add_parser("generate", help="generate templates")
+        generate_parser.add_argument(
+            "--no-dry-run", action="store_true", help="generate and write target files"
+        )
+
+    def run_command(self, args: Namespace) -> None:
+        if args.template_command == "list":
+            self._print_file_lists()
+        elif args.template_command == "vars":
+            self._print_vars(args.required)
+        elif args.template_command == "generate":
+            dry_run = not args.no_dry_run
+            self._generate_files(dry_run)
+            if dry_run:
+                print("Dry run successfully.")
+                print(
+                    f"Will delete dir {self.generated_dir.full_path_str} if it exists."
+                )
-- 
cgit v1.2.3