diff options
| author | Yuqian Yang <crupest@crupest.life> | 2025-06-10 16:34:42 +0800 | 
|---|---|---|
| committer | Yuqian Yang <crupest@crupest.life> | 2025-06-10 16:34:42 +0800 | 
| commit | 4abd6020df12427aed62599a68abba87b1ccc3b8 (patch) | |
| tree | e2504019eb8663ef0e49d409b2ca279030c70c60 /python/cru/service | |
| parent | 73d711416dc50378321982c88fe01e48ea18e20a (diff) | |
| download | crupest-4abd6020df12427aed62599a68abba87b1ccc3b8.tar.gz crupest-4abd6020df12427aed62599a68abba87b1ccc3b8.tar.bz2 crupest-4abd6020df12427aed62599a68abba87b1ccc3b8.zip  | |
refactor: bye, python!
Diffstat (limited to 'python/cru/service')
| -rw-r--r-- | python/cru/service/__init__.py | 0 | ||||
| -rw-r--r-- | python/cru/service/__main__.py | 27 | ||||
| -rw-r--r-- | python/cru/service/_app.py | 30 | ||||
| -rw-r--r-- | python/cru/service/_base.py | 400 | ||||
| -rw-r--r-- | python/cru/service/_gen_cmd.py | 200 | ||||
| -rw-r--r-- | python/cru/service/_nginx.py | 263 | ||||
| -rw-r--r-- | python/cru/service/_template.py | 228 | 
7 files changed, 0 insertions, 1148 deletions
diff --git a/python/cru/service/__init__.py b/python/cru/service/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/python/cru/service/__init__.py +++ /dev/null diff --git a/python/cru/service/__main__.py b/python/cru/service/__main__.py deleted file mode 100644 index 2a0268b..0000000 --- a/python/cru/service/__main__.py +++ /dev/null @@ -1,27 +0,0 @@ -import sys - -from cru import CruException - -from ._app import create_app - - -def main(): -    app = create_app() -    app.run_command() - - -if __name__ == "__main__": -    version_info = sys.version_info -    if not (version_info.major == 3 and version_info.minor >= 11): -        print("This application requires Python 3.11 or later.", file=sys.stderr) -        sys.exit(1) - -    try: -        main() -    except CruException as e: -        user_message = e.get_user_message() -        if user_message is not None: -            print(f"Error: {user_message}") -            exit(1) -        else: -            raise diff --git a/python/cru/service/_app.py b/python/cru/service/_app.py deleted file mode 100644 index b4c6271..0000000 --- a/python/cru/service/_app.py +++ /dev/null @@ -1,30 +0,0 @@ -from ._base import ( -    AppBase, -    CommandDispatcher, -    PathCommandProvider, -) -from ._template import TemplateManager -from ._nginx import NginxManager -from ._gen_cmd import GenCmdProvider - -APP_ID = "crupest" - - -class App(AppBase): -    def __init__(self): -        super().__init__(APP_ID, f"{APP_ID}-service") -        self.add_feature(PathCommandProvider()) -        self.add_feature(TemplateManager()) -        self.add_feature(NginxManager()) -        self.add_feature(GenCmdProvider()) -        self.add_feature(CommandDispatcher()) - -    def run_command(self): -        command_dispatcher = self.get_feature(CommandDispatcher) -        command_dispatcher.run_command() - - -def create_app() -> App: -    app = App() -    app.setup() -    return app diff --git a/python/cru/service/_base.py b/python/cru/service/_base.py deleted file mode 100644 index e1eee70..0000000 --- a/python/cru/service/_base.py +++ /dev/null @@ -1,400 +0,0 @@ -from __future__ import annotations - -from argparse import ArgumentParser, Namespace -from abc import ABC, abstractmethod -import argparse -import os -from pathlib import Path -from typing import TypeVar, overload - -from cru import CruException, CruLogicError - -_Feature = TypeVar("_Feature", bound="AppFeatureProvider") - - -class AppError(CruException): -    pass - - -class AppFeatureError(AppError): -    def __init__(self, message, feature: type | str, *args, **kwargs): -        super().__init__(message, *args, **kwargs) -        self._feature = feature - -    @property -    def feature(self) -> type | str: -        return self._feature - - -class AppPathError(CruException): -    def __init__(self, message, _path: str | Path, *args, **kwargs): -        super().__init__(message, *args, **kwargs) -        self._path = str(_path) - -    @property -    def path(self) -> str: -        return self._path - - -class AppPath(ABC): -    def __init__(self, id: str, is_dir: bool, description: str) -> None: -        self._is_dir = is_dir -        self._id = id -        self._description = description - -    @property -    @abstractmethod -    def parent(self) -> AppPath | None: ... - -    @property -    @abstractmethod -    def app(self) -> AppBase: ... - -    @property -    def id(self) -> str: -        return self._id - -    @property -    def description(self) -> str: -        return self._description - -    @property -    def is_dir(self) -> bool: -        return self._is_dir - -    @property -    @abstractmethod -    def full_path(self) -> Path: ... - -    @property -    def full_path_str(self) -> str: -        return str(self.full_path) - -    def check_parents(self, must_exist: bool = False) -> bool: -        for p in reversed(self.full_path.parents): -            if not p.exists() and not must_exist: -                return False -            if not p.is_dir(): -                raise AppPathError("Parents' path must be a dir.", self.full_path) -        return True - -    def check_self(self, must_exist: bool = False) -> bool: -        if not self.check_parents(must_exist): -            return False -        if not self.full_path.exists(): -            if not must_exist: -                return False -            raise AppPathError("Not exist.", self.full_path) -        if self.is_dir: -            if not self.full_path.is_dir(): -                raise AppPathError("Should be a directory, but not.", self.full_path) -            else: -                return True -        else: -            if not self.full_path.is_file(): -                raise AppPathError("Should be a file, but not.", self.full_path) -            else: -                return True - -    def ensure(self, create_file: bool = False) -> None: -        e = self.check_self(False) -        if not e: -            os.makedirs(self.full_path.parent, exist_ok=True) -            if self.is_dir: -                os.mkdir(self.full_path) -            elif create_file: -                with open(self.full_path, "w") as f: -                    f.write("") - -    def read_text(self) -> str: -        if self.is_dir: -            raise AppPathError("Can't read text of a dir.", self.full_path) -        self.check_self() -        return self.full_path.read_text() - -    def add_subpath( -        self, -        name: str, -        is_dir: bool, -        /, -        id: str | None = None, -        description: str = "", -    ) -> AppFeaturePath: -        return self.app._add_path(name, is_dir, self, id, description) - -    @property -    def app_relative_path(self) -> Path: -        return self.full_path.relative_to(self.app.root.full_path) - - -class AppFeaturePath(AppPath): -    def __init__( -        self, -        parent: AppPath, -        name: str, -        is_dir: bool, -        /, -        id: str | None = None, -        description: str = "", -    ) -> None: -        super().__init__(id or name, is_dir, description) -        self._name = name -        self._parent = parent - -    @property -    def name(self) -> str: -        return self._name - -    @property -    def parent(self) -> AppPath: -        return self._parent - -    @property -    def app(self) -> AppBase: -        return self.parent.app - -    @property -    def full_path(self) -> Path: -        return Path(self.parent.full_path, self.name).resolve() - - -class AppRootPath(AppPath): -    def __init__(self, app: AppBase, path: Path): -        super().__init__(f"/{id}", True, f"Application {id} root path.") -        self._app = app -        self._full_path = path.resolve() - -    @property -    def parent(self) -> None: -        return None - -    @property -    def app(self) -> AppBase: -        return self._app - -    @property -    def full_path(self) -> Path: -        return self._full_path - - -class AppFeatureProvider(ABC): -    def __init__(self, name: str, /, app: AppBase | None = None): -        super().__init__() -        self._name = name -        self._app = app if app else AppBase.get_instance() - -    @property -    def app(self) -> AppBase: -        return self._app - -    @property -    def name(self) -> str: -        return self._name - -    @abstractmethod -    def setup(self) -> None: ... - - -class AppCommandFeatureProvider(AppFeatureProvider): -    @abstractmethod -    def get_command_info(self) -> tuple[str, str]: ... - -    @abstractmethod -    def setup_arg_parser(self, arg_parser: ArgumentParser): ... - -    @abstractmethod -    def run_command(self, args: Namespace) -> None: ... - - -class PathCommandProvider(AppCommandFeatureProvider): -    def __init__(self) -> None: -        super().__init__("path-command-provider") - -    def setup(self): -        pass - -    def get_command_info(self): -        return ("path", "Get information about paths used by app.") - -    def setup_arg_parser(self, arg_parser: ArgumentParser) -> None: -        subparsers = arg_parser.add_subparsers( -            dest="path_command", metavar="PATH_COMMAND" -        ) -        _list_parser = subparsers.add_parser( -            "list", help="list special paths used by app" -        ) - -    def run_command(self, args: Namespace) -> None: -        if args.path_command is None or args.path_command == "list": -            for path in self.app.paths: -                print( -                    f"{path.app_relative_path.as_posix()}{'/' if path.is_dir else ''}: {path.description}" -                ) - - -class CommandDispatcher(AppFeatureProvider): -    def __init__(self) -> None: -        super().__init__("command-dispatcher") - -    def setup_arg_parser(self) -> None: -        self._map: dict[str, AppCommandFeatureProvider] = {} -        arg_parser = argparse.ArgumentParser( -            description="Service management", -            formatter_class=argparse.RawDescriptionHelpFormatter, -        ) -        subparsers = arg_parser.add_subparsers( -            dest="command", -            help="The management command to execute.", -            metavar="COMMAND", -        ) -        for feature in self.app.features: -            if isinstance(feature, AppCommandFeatureProvider): -                info = feature.get_command_info() -                command_subparser = subparsers.add_parser(info[0], help=info[1]) -                feature.setup_arg_parser(command_subparser) -                self._map[info[0]] = feature -        self._arg_parser = arg_parser - -    def setup(self): -        self._parsed_args = self.arg_parser.parse_args() - -    @property -    def arg_parser(self) -> argparse.ArgumentParser: -        return self._arg_parser - -    @property -    def command_map(self) -> dict[str, AppCommandFeatureProvider]: -        return self._map - -    @property -    def program_args(self) -> argparse.Namespace: -        return self._parsed_args - -    def run_command(self) -> None: -        args = self.program_args -        if args.command is None: -            self.arg_parser.print_help() -            return -        self.command_map[args.command].run_command(args) - - -class AppBase: -    _instance: AppBase | None = None - -    @staticmethod -    def get_instance() -> AppBase: -        if AppBase._instance is None: -            raise AppError("App instance not initialized") -        return AppBase._instance - -    def __init__(self, app_id: str, name: str): -        AppBase._instance = self -        self._app_id = app_id -        self._name = name -        self._features: list[AppFeatureProvider] = [] -        self._paths: list[AppFeaturePath] = [] - -    def setup(self) -> None: -        command_dispatcher = self.get_feature(CommandDispatcher) -        command_dispatcher.setup_arg_parser() -        self._root = AppRootPath(self, Path(self._ensure_env("CRUPEST_PROJECT_DIR"))) -        self._data_dir = self._root.add_subpath( -            self._ensure_env("CRUPEST_DATA_DIR"), True, id="data" -        ) -        self._services_dir = self._root.add_subpath( -            self._ensure_env("CRUPEST_SERVICES_DIR"), True, id="CRUPEST_SERVICES_DIR" -        ) -        for feature in self.features: -            feature.setup() -        for path in self.paths: -            path.check_self() - -    @property -    def app_id(self) -> str: -        return self._app_id - -    @property -    def name(self) -> str: -        return self._name - -    def _ensure_env(self, env_name: str) -> str: -        value = os.getenv(env_name) -        if value is None: -            raise AppError(f"Environment variable {env_name} not set") -        return value - -    @property -    def root(self) -> AppRootPath: -        return self._root - -    @property -    def data_dir(self) -> AppFeaturePath: -        return self._data_dir - -    @property -    def services_dir(self) -> AppFeaturePath: -        return self._services_dir - -    @property -    def app_initialized(self) -> bool: -        return self.data_dir.check_self() - -    @property -    def features(self) -> list[AppFeatureProvider]: -        return self._features - -    @property -    def paths(self) -> list[AppFeaturePath]: -        return self._paths - -    def add_feature(self, feature: _Feature) -> _Feature: -        for f in self.features: -            if f.name == feature.name: -                raise AppFeatureError( -                    f"Duplicate feature name: {feature.name}.", feature.name -                ) -        self._features.append(feature) -        return feature - -    def _add_path( -        self, -        name: str, -        is_dir: bool, -        /, -        parent: AppPath | None = None, -        id: str | None = None, -        description: str = "", -    ) -> AppFeaturePath: -        p = AppFeaturePath( -            parent or self.root, name, is_dir, id=id, description=description -        ) -        self._paths.append(p) -        return p - -    @overload -    def get_feature(self, feature: str) -> AppFeatureProvider: ... - -    @overload -    def get_feature(self, feature: type[_Feature]) -> _Feature: ... - -    def get_feature( -        self, feature: str | type[_Feature] -    ) -> AppFeatureProvider | _Feature: -        if isinstance(feature, str): -            for f in self._features: -                if f.name == feature: -                    return f -        elif isinstance(feature, type): -            for f in self._features: -                if isinstance(f, feature): -                    return f -        else: -            raise CruLogicError("Argument must be the name of feature or its class.") - -        raise AppFeatureError(f"Feature {feature} not found.", feature) - -    def get_path(self, name: str) -> AppFeaturePath: -        for p in self._paths: -            if p.id == name or p.name == name: -                return p -        raise AppPathError(f"Application path {name} not found.", name) diff --git a/python/cru/service/_gen_cmd.py b/python/cru/service/_gen_cmd.py deleted file mode 100644 index f51d65f..0000000 --- a/python/cru/service/_gen_cmd.py +++ /dev/null @@ -1,200 +0,0 @@ -from dataclasses import dataclass, replace -from typing import TypeAlias - -from ._base import AppCommandFeatureProvider -from ._nginx import NginxManager - -_Str_Or_Cmd_List: TypeAlias = str | list["_Cmd"] - - -@dataclass -class _Cmd: -    name: str -    desc: str -    cmd: _Str_Or_Cmd_List - -    def clean(self) -> "_Cmd": -        if isinstance(self.cmd, list): -            return replace( -                self, -                cmd=[cmd.clean() for cmd in self.cmd], -            ) -        elif isinstance(self.cmd, str): -            return replace(self, cmd=self.cmd.strip()) -        else: -            raise ValueError("Unexpected type for cmd.") - -    def generate_text( -        self, -        info_only: bool, -        *, -        parent: str | None = None, -    ) -> str: -        if parent is None: -            tag = "COMMAND" -            long_name = self.name -            indent = "" -        else: -            tag = "SUBCOMMAND" -            long_name = f"{parent}.{self.name}" -            indent = "  " - -        if info_only: -            return f"{indent}[{long_name}]: {self.desc}" - -        text = f"--- {tag}[{long_name}]: {self.desc}" -        if isinstance(self.cmd, str): -            text += "\n" + self.cmd -        elif isinstance(self.cmd, list): -            for sub in self.cmd: -                text += "\n" * 2 + sub.generate_text(info_only, parent=self.name) -        else: -            raise ValueError("Unexpected type for cmd.") - -        lines: list[str] = [] -        for line in text.splitlines(): -            if len(line) == 0: -                lines.append("") -            else: -                lines.append(indent + line) -        text = "\n".join(lines) - -        return text - - -_docker_uninstall = _Cmd( -    "uninstall", -    "uninstall apt docker", -    """ -for pkg in docker.io docker-doc docker-compose \ -podman-docker containerd runc; \ -do sudo apt-get remove $pkg; done -""", -) - -_docker_apt_certs = _Cmd( -    "apt-certs", -    "prepare apt certs", -    """ -sudo apt-get update -sudo apt-get install ca-certificates curl -sudo install -m 0755 -d /etc/apt/keyrings -""", -) - -_docker_docker_certs = _Cmd( -    "docker-certs", -    "add docker certs", -    """ -sudo curl -fsSL https://download.docker.com/linux/debian/gpg \ --o /etc/apt/keyrings/docker.asc -sudo chmod a+r /etc/apt/keyrings/docker.asc -""", -) - -_docker_apt_repo = _Cmd( -    "apt-repo", -    "add docker apt repo", -    """ -echo \\ -  "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] \ -https://download.docker.com/linux/debian \\ -  $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \\ -  sudo tee /etc/apt/sources.list.d/docker.list > /dev/null -""", -) - -_docker_install = _Cmd( -    "install", -    "update apt and install docker", -    """ -sudo apt-get update -sudo apt-get install docker-ce docker-ce-cli containerd.io \ -docker-buildx-plugin docker-compose-plugin -""", -) - -_docker_setup = _Cmd( -    "setup", -    "setup system for docker", -    """ -sudo systemctl enable docker -sudo systemctl start docker -sudo groupadd -f docker -sudo usermod -aG docker $USER -# Remember to log out and log back in for the group changes to take effect -""", -) - -_docker = _Cmd( -    "install-docker", -    "install docker for a fresh new system", -    [ -        _docker_uninstall, -        _docker_apt_certs, -        _docker_docker_certs, -        _docker_apt_repo, -        _docker_install, -        _docker_setup, -    ], -) - -_update_blog = _Cmd( -    "update-blog", -    "re-generate blog pages", -    """ -docker exec -it blog /scripts/update.bash -""", -) - -_git_user = _Cmd( -    "git-user", -    "add/set git server user and password", -    """ -docker run -it --rm -v "$ps_file:/user-info" httpd htpasswd "/user-info" [username] -""", -) - - -class GenCmdProvider(AppCommandFeatureProvider): -    def __init__(self) -> None: -        super().__init__("gen-cmd-provider") -        self._cmds: dict[str, _Cmd] = {} -        self._register_cmds(_docker, _update_blog, _git_user) - -    def _register_cmd(self, cmd: "_Cmd"): -        self._cmds[cmd.name] = cmd.clean() - -    def _register_cmds(self, *cmds: "_Cmd"): -        for c in cmds: -            self._register_cmd(c) - -    def setup(self): -        pass - -    def get_command_info(self): -        return ("gen-cmd", "Get commands of running external cli tools.") - -    def setup_arg_parser(self, arg_parser): -        subparsers = arg_parser.add_subparsers( -            dest="gen_cmd", metavar="GEN_CMD_COMMAND" -        ) -        certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") -        certbot_parser.add_argument( -            "-t", "--test", action="store_true", help="run certbot in test mode" -        ) -        for cmd in self._cmds.values(): -            subparsers.add_parser(cmd.name, help=cmd.desc) - -    def _print_cmd(self, name: str): -        print(self._cmds[name].generate_text(False)) - -    def run_command(self, args): -        if args.gen_cmd is None or args.gen_cmd == "list": -            print("[certbot]: certbot ssl cert commands") -            for cmd in self._cmds.values(): -                print(cmd.generate_text(True)) -        elif args.gen_cmd == "certbot": -            self.app.get_feature(NginxManager).print_all_certbot_commands(args.test) -        else: -            self._print_cmd(args.gen_cmd) diff --git a/python/cru/service/_nginx.py b/python/cru/service/_nginx.py deleted file mode 100644 index 87cff6d..0000000 --- a/python/cru/service/_nginx.py +++ /dev/null @@ -1,263 +0,0 @@ -from argparse import Namespace -from enum import Enum, auto -import re -import subprocess -from typing import TypeAlias - -from cru import CruInternalError - -from ._base import AppCommandFeatureProvider -from ._template import TemplateManager - - -class CertbotAction(Enum): -    CREATE = auto() -    EXPAND = auto() -    SHRINK = auto() -    RENEW = auto() - - -class NginxManager(AppCommandFeatureProvider): -    CertbotAction: TypeAlias = CertbotAction - -    def __init__(self) -> None: -        super().__init__("nginx-manager") -        self._domains_cache: list[str] | None = None - -    def setup(self) -> None: -        pass - -    @property -    def _template_manager(self) -> TemplateManager: -        return self.app.get_feature(TemplateManager) - -    @property -    def root_domain(self) -> str: -        return self._template_manager.get_domain() - -    @property -    def domains(self) -> list[str]: -        if self._domains_cache is None: -            self._domains_cache = self._get_domains() -        return self._domains_cache - -    @property -    def subdomains(self) -> list[str]: -        suffix = "." + self.root_domain -        return [d[: -len(suffix)] for d in self.domains if d.endswith(suffix)] - -    def _get_domains_from_text(self, text: str) -> set[str]: -        domains: set[str] = set() -        regex = re.compile(r"server_name\s+(\S+)\s*;") -        for match in regex.finditer(text): -            domains.add(match[1]) -        return domains - -    def _join_generated_nginx_conf_text(self) -> str: -        result = "" -        for path, text in self._template_manager.generate(): -            if "nginx" in str(path): -                result += text -        return result - -    def _get_domains(self) -> list[str]: -        text = self._join_generated_nginx_conf_text() -        domains = self._get_domains_from_text(text) -        domains.remove(self.root_domain) -        return [self.root_domain, *domains] - -    def _print_domains(self) -> None: -        for domain in self.domains: -            print(domain) - -    def _certbot_command( -        self, -        action: CertbotAction | str, -        test: bool, -        *, -        docker=True, -        standalone=None, -        email=None, -        agree_tos=True, -    ) -> str: -        if isinstance(action, str): -            action = CertbotAction[action.upper()] - -        command_args = [] - -        add_domain_option = True -        if action is CertbotAction.CREATE: -            if standalone is None: -                standalone = True -            command_action = "certonly" -        elif action in [CertbotAction.EXPAND, CertbotAction.SHRINK]: -            if standalone is None: -                standalone = False -            command_action = "certonly" -        elif action is CertbotAction.RENEW: -            if standalone is None: -                standalone = False -            add_domain_option = False -            command_action = "renew" -        else: -            raise CruInternalError("Invalid certbot action.") - -        data_dir = self.app.data_dir.full_path.as_posix() - -        if not docker: -            command_args.append("certbot") -        else: -            command_args.extend( -                [ -                    "docker run -it --rm --name certbot", -                    f'-v "{data_dir}/certbot/certs:/etc/letsencrypt"', -                    f'-v "{data_dir}/certbot/data:/var/lib/letsencrypt"', -                ] -            ) -            if standalone: -                command_args.append('-p "0.0.0.0:80:80"') -            else: -                command_args.append(f'-v "{data_dir}/certbot/webroot:/var/www/certbot"') - -            command_args.append("certbot/certbot") - -        command_args.append(command_action) - -        command_args.append(f"--cert-name {self.root_domain}") - -        if standalone: -            command_args.append("--standalone") -        else: -            command_args.append("--webroot -w /var/www/certbot") - -        if add_domain_option: -            command_args.append(" ".join([f"-d {domain}" for domain in self.domains])) - -        if email is not None: -            command_args.append(f"--email {email}") - -        if agree_tos: -            command_args.append("--agree-tos") - -        if test: -            command_args.append("--test-cert --dry-run") - -        return " ".join(command_args) - -    def print_all_certbot_commands(self, test: bool): -        print("### COMMAND: (standalone) create certs") -        print( -            self._certbot_command( -                CertbotAction.CREATE, -                test, -                email=self._template_manager.get_email(), -            ) -        ) -        print() -        print("### COMMAND: (webroot+nginx) expand or shrink certs") -        print( -            self._certbot_command( -                CertbotAction.EXPAND, -                test, -                email=self._template_manager.get_email(), -            ) -        ) -        print() -        print("### COMMAND: (webroot+nginx) renew certs") -        print( -            self._certbot_command( -                CertbotAction.RENEW, -                test, -                email=self._template_manager.get_email(), -            ) -        ) - -    @property -    def _cert_path_str(self) -> str: -        return str( -            self.app.data_dir.full_path -            / "certbot/certs/live" -            / self.root_domain -            / "fullchain.pem" -        ) - -    def get_command_info(self): -        return "nginx", "Manage nginx related things." - -    def setup_arg_parser(self, arg_parser): -        subparsers = arg_parser.add_subparsers( -            dest="nginx_command", required=True, metavar="NGINX_COMMAND" -        ) -        _list_parser = subparsers.add_parser("list", help="list domains") -        certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") -        certbot_parser.add_argument( -            "--no-test", -            action="store_true", -            help="remove args making certbot run in test mode", -        ) - -    def run_command(self, args: Namespace) -> None: -        if args.nginx_command == "list": -            self._print_domains() -        elif args.nginx_command == "certbot": -            self.print_all_certbot_commands(not args.no_test) - -    def _generate_dns_zone( -        self, -        ip: str, -        /, -        ttl: str | int = 600, -        *, -        enable_mail: bool = True, -        dkim: str | None = None, -    ) -> str: -        # TODO: Not complete and test now. -        root_domain = self.root_domain -        result = f"$ORIGIN {root_domain}.\n\n" -        result += "; A records\n" -        result += f"@ {ttl} IN A {ip}\n" -        for subdomain in self.subdomains: -            result += f"{subdomain} {ttl} IN A {ip}\n" - -        if enable_mail: -            result += "\n; MX records\n" -            result += f"@ {ttl} IN MX 10 mail.{root_domain}.\n" -            result += "\n; SPF record\n" -            result += f'@ {ttl} IN TXT "v=spf1 mx ~all"\n' -            if dkim is not None: -                result += "\n; DKIM record\n" -                result += f'mail._domainkey {ttl} IN TEXT "{dkim}"' -                result += "\n; DMARC record\n" -                dmarc_options = [ -                    "v=DMARC1", -                    "p=none", -                    f"rua=mailto:dmarc.report@{root_domain}", -                    f"ruf=mailto:dmarc.report@{root_domain}", -                    "sp=none", -                    "ri=86400", -                ] -                result += f'_dmarc {ttl} IN TXT "{"; ".join(dmarc_options)}"\n' -        return result - -    def _get_dkim_from_mailserver(self) -> str | None: -        # TODO: Not complete and test now. -        dkim_path = ( -            self.app.data_dir.full_path -            / "dms/config/opendkim/keys" -            / self.root_domain -            / "mail.txt" -        ) -        if not dkim_path.exists(): -            return None - -        p = subprocess.run(["sudo", "cat", dkim_path], capture_output=True, check=True) -        value = "" -        for match in re.finditer('"(.*)"', p.stdout.decode("utf-8")): -            value += match.group(1) -        return value - -    def _generate_dns_zone_with_dkim(self, ip: str, /, ttl: str | int = 600) -> str: -        # TODO: Not complete and test now. -        return self._generate_dns_zone( -            ip, ttl, enable_mail=True, dkim=self._get_dkim_from_mailserver() -        ) diff --git a/python/cru/service/_template.py b/python/cru/service/_template.py deleted file mode 100644 index 22c1d21..0000000 --- a/python/cru/service/_template.py +++ /dev/null @@ -1,228 +0,0 @@ -from argparse import Namespace -from pathlib import Path -import shutil -from typing import NamedTuple -import graphlib - -from cru import CruException -from cru.parsing import SimpleLineVarParser -from cru.template import TemplateTree, CruStrWrapperTemplate - -from ._base import AppCommandFeatureProvider, AppFeaturePath - - -class _Config(NamedTuple): -    text: str -    config: dict[str, str] - - -class _GeneratedConfig(NamedTuple): -    base: _Config -    private: _Config -    merged: _Config - - -class _PreConfig(NamedTuple): -    base: _Config -    private: _Config -    config: dict[str, str] - -    @staticmethod -    def create(base: _Config, private: _Config) -> "_PreConfig": -        return _PreConfig(base, private, {**base.config, **private.config}) - -    def _merge(self, generated: _Config): -        text = ( -            "\n".join( -                [ -                    self.private.text.strip(), -                    self.base.text.strip(), -                    generated.text.strip(), -                ] -            ) -            + "\n" -        ) -        config = {**self.config, **generated.config} -        return _GeneratedConfig(self.base, self.private, _Config(text, config)) - - -class _Template(NamedTuple): -    config: CruStrWrapperTemplate -    config_vars: set[str] -    tree: TemplateTree - - -class TemplateManager(AppCommandFeatureProvider): -    def __init__(self): -        super().__init__("template-manager") - -    def setup(self) -> None: -        self._base_config_file = self.app.services_dir.add_subpath("base-config", False) -        self._private_config_file = self.app.data_dir.add_subpath("config", False) -        self._template_config_file = self.app.services_dir.add_subpath( -            "config.template", False -        ) -        self._templates_dir = self.app.services_dir.add_subpath("templates", True) -        self._generated_dir = self.app.services_dir.add_subpath("generated", True) - -        self._config_parser = SimpleLineVarParser() - -        def _read_pre(app_path: AppFeaturePath) -> _Config: -            text = app_path.read_text() -            config = self._read_config(text) -            return _Config(text, config) - -        base = _read_pre(self._base_config_file) -        private = _read_pre(self._private_config_file) -        self._preconfig = _PreConfig.create(base, private) - -        self._generated: _GeneratedConfig | None = None - -        template_config_text = self._template_config_file.read_text() -        self._template_config = self._read_config(template_config_text) - -        self._template = _Template( -            CruStrWrapperTemplate(template_config_text), -            set(self._template_config.keys()), -            TemplateTree( -                lambda text: CruStrWrapperTemplate(text), -                self.templates_dir.full_path_str, -            ), -        ) - -        self._real_required_vars = ( -            self._template.config_vars | self._template.tree.variables -        ) - self._template.config_vars -        lacks = self._real_required_vars - self._preconfig.config.keys() -        self._lack_vars = lacks if len(lacks) > 0 else None - -    def _read_config_entry_names(self, text: str) -> set[str]: -        return set(entry.key for entry in self._config_parser.parse(text)) - -    def _read_config(self, text: str) -> dict[str, str]: -        return {entry.key: entry.value for entry in self._config_parser.parse(text)} - -    @property -    def templates_dir(self) -> AppFeaturePath: -        return self._templates_dir - -    @property -    def generated_dir(self) -> AppFeaturePath: -        return self._generated_dir - -    def get_domain(self) -> str: -        return self._preconfig.config["CRUPEST_DOMAIN"] - -    def get_email(self) -> str: -        return self._preconfig.config["CRUPEST_EMAIL"] - -    def _generate_template_config(self, config: dict[str, str]) -> dict[str, str]: -        entry_templates = { -            key: CruStrWrapperTemplate(value) -            for key, value in self._template_config.items() -        } -        sorter = graphlib.TopologicalSorter( -            config -            | {key: template.variables for key, template in entry_templates.items()} -        ) - -        vars: dict[str, str] = config.copy() -        for _ in sorter.static_order(): -            del_keys = [] -            for key, template in entry_templates.items(): -                new = template.generate_partial(vars) -                if not new.has_variables: -                    vars[key] = new.generate({}) -                    del_keys.append(key) -                else: -                    entry_templates[key] = new -            for key in del_keys: -                del entry_templates[key] -        assert len(entry_templates) == 0 -        return {key: value for key, value in vars.items() if key not in config} - -    def _generate_config(self) -> _GeneratedConfig: -        if self._generated is not None: -            return self._generated -        if self._lack_vars is not None: -            raise CruException(f"Required vars are not defined: {self._lack_vars}.") -        config = self._generate_template_config(self._preconfig.config) -        text = self._template.config.generate(self._preconfig.config | config) -        self._generated = self._preconfig._merge(_Config(text, config)) -        return self._generated - -    def generate(self) -> list[tuple[Path, str]]: -        config = self._generate_config() -        return [ -            (Path("config"), config.merged.text), -            *self._template.tree.generate(config.merged.config), -        ] - -    def _generate_files(self, dry_run: bool) -> None: -        result = self.generate() -        if not dry_run: -            if self.generated_dir.full_path.exists(): -                shutil.rmtree(self.generated_dir.full_path) -            for path, text in result: -                des = self.generated_dir.full_path / path -                des.parent.mkdir(parents=True, exist_ok=True) -                with open(des, "w") as f: -                    f.write(text) - -    def get_command_info(self): -        return ("template", "Manage templates.") - -    def _print_file_lists(self) -> None: -        print(f"[{self._template.config.variable_count}]", "config") -        for path, template in self._template.tree.templates: -            print(f"[{template.variable_count}]", path.as_posix()) - -    def _print_vars(self, required: bool) -> None: -        for var in self._template.config.variables: -            print(f"[config] {var}") -        for var in self._template.tree.variables: -            if not (required and var in self._template.config_vars): -                print(f"[template] {var}") - -    def _run_check_vars(self) -> None: -        if self._lack_vars is not None: -            print("Lacks:") -            for var in self._lack_vars: -                print(var) - -    def setup_arg_parser(self, arg_parser): -        subparsers = arg_parser.add_subparsers( -            dest="template_command", required=True, metavar="TEMPLATE_COMMAND" -        ) -        _list_parser = subparsers.add_parser("list", help="list templates") -        vars_parser = subparsers.add_parser( -            "vars", help="list variables used in all templates" -        ) -        vars_parser.add_argument( -            "-r", -            "--required", -            help="only list really required one.", -            action="store_true", -        ) -        _check_vars_parser = subparsers.add_parser( -            "check-vars", -            help="check if required vars are set", -        ) -        generate_parser = subparsers.add_parser("generate", help="generate templates") -        generate_parser.add_argument( -            "--no-dry-run", action="store_true", help="generate and write target files" -        ) - -    def run_command(self, args: Namespace) -> None: -        if args.template_command == "list": -            self._print_file_lists() -        elif args.template_command == "vars": -            self._print_vars(args.required) -        elif args.template_command == "generate": -            dry_run = not args.no_dry_run -            self._generate_files(dry_run) -            if dry_run: -                print("Dry run successfully.") -                print( -                    f"Will delete dir {self.generated_dir.full_path_str} if it exists." -                )  | 
