diff options
Diffstat (limited to 'python/cru/service')
| -rw-r--r-- | python/cru/service/__init__.py | 0 | ||||
| -rw-r--r-- | python/cru/service/__main__.py | 27 | ||||
| -rw-r--r-- | python/cru/service/_app.py | 30 | ||||
| -rw-r--r-- | python/cru/service/_base.py | 400 | ||||
| -rw-r--r-- | python/cru/service/_gen_cmd.py | 200 | ||||
| -rw-r--r-- | python/cru/service/_nginx.py | 263 | ||||
| -rw-r--r-- | python/cru/service/_template.py | 228 | 
7 files changed, 1148 insertions, 0 deletions
| diff --git a/python/cru/service/__init__.py b/python/cru/service/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/python/cru/service/__init__.py diff --git a/python/cru/service/__main__.py b/python/cru/service/__main__.py new file mode 100644 index 0000000..2a0268b --- /dev/null +++ b/python/cru/service/__main__.py @@ -0,0 +1,27 @@ +import sys + +from cru import CruException + +from ._app import create_app + + +def main(): +    app = create_app() +    app.run_command() + + +if __name__ == "__main__": +    version_info = sys.version_info +    if not (version_info.major == 3 and version_info.minor >= 11): +        print("This application requires Python 3.11 or later.", file=sys.stderr) +        sys.exit(1) + +    try: +        main() +    except CruException as e: +        user_message = e.get_user_message() +        if user_message is not None: +            print(f"Error: {user_message}") +            exit(1) +        else: +            raise diff --git a/python/cru/service/_app.py b/python/cru/service/_app.py new file mode 100644 index 0000000..b4c6271 --- /dev/null +++ b/python/cru/service/_app.py @@ -0,0 +1,30 @@ +from ._base import ( +    AppBase, +    CommandDispatcher, +    PathCommandProvider, +) +from ._template import TemplateManager +from ._nginx import NginxManager +from ._gen_cmd import GenCmdProvider + +APP_ID = "crupest" + + +class App(AppBase): +    def __init__(self): +        super().__init__(APP_ID, f"{APP_ID}-service") +        self.add_feature(PathCommandProvider()) +        self.add_feature(TemplateManager()) +        self.add_feature(NginxManager()) +        self.add_feature(GenCmdProvider()) +        self.add_feature(CommandDispatcher()) + +    def run_command(self): +        command_dispatcher = self.get_feature(CommandDispatcher) +        command_dispatcher.run_command() + + +def create_app() -> App: +    app = App() +    app.setup() +    return app diff --git a/python/cru/service/_base.py b/python/cru/service/_base.py new file mode 100644 index 0000000..e1eee70 --- /dev/null +++ b/python/cru/service/_base.py @@ -0,0 +1,400 @@ +from __future__ import annotations + +from argparse import ArgumentParser, Namespace +from abc import ABC, abstractmethod +import argparse +import os +from pathlib import Path +from typing import TypeVar, overload + +from cru import CruException, CruLogicError + +_Feature = TypeVar("_Feature", bound="AppFeatureProvider") + + +class AppError(CruException): +    pass + + +class AppFeatureError(AppError): +    def __init__(self, message, feature: type | str, *args, **kwargs): +        super().__init__(message, *args, **kwargs) +        self._feature = feature + +    @property +    def feature(self) -> type | str: +        return self._feature + + +class AppPathError(CruException): +    def __init__(self, message, _path: str | Path, *args, **kwargs): +        super().__init__(message, *args, **kwargs) +        self._path = str(_path) + +    @property +    def path(self) -> str: +        return self._path + + +class AppPath(ABC): +    def __init__(self, id: str, is_dir: bool, description: str) -> None: +        self._is_dir = is_dir +        self._id = id +        self._description = description + +    @property +    @abstractmethod +    def parent(self) -> AppPath | None: ... + +    @property +    @abstractmethod +    def app(self) -> AppBase: ... + +    @property +    def id(self) -> str: +        return self._id + +    @property +    def description(self) -> str: +        return self._description + +    @property +    def is_dir(self) -> bool: +        return self._is_dir + +    @property +    @abstractmethod +    def full_path(self) -> Path: ... + +    @property +    def full_path_str(self) -> str: +        return str(self.full_path) + +    def check_parents(self, must_exist: bool = False) -> bool: +        for p in reversed(self.full_path.parents): +            if not p.exists() and not must_exist: +                return False +            if not p.is_dir(): +                raise AppPathError("Parents' path must be a dir.", self.full_path) +        return True + +    def check_self(self, must_exist: bool = False) -> bool: +        if not self.check_parents(must_exist): +            return False +        if not self.full_path.exists(): +            if not must_exist: +                return False +            raise AppPathError("Not exist.", self.full_path) +        if self.is_dir: +            if not self.full_path.is_dir(): +                raise AppPathError("Should be a directory, but not.", self.full_path) +            else: +                return True +        else: +            if not self.full_path.is_file(): +                raise AppPathError("Should be a file, but not.", self.full_path) +            else: +                return True + +    def ensure(self, create_file: bool = False) -> None: +        e = self.check_self(False) +        if not e: +            os.makedirs(self.full_path.parent, exist_ok=True) +            if self.is_dir: +                os.mkdir(self.full_path) +            elif create_file: +                with open(self.full_path, "w") as f: +                    f.write("") + +    def read_text(self) -> str: +        if self.is_dir: +            raise AppPathError("Can't read text of a dir.", self.full_path) +        self.check_self() +        return self.full_path.read_text() + +    def add_subpath( +        self, +        name: str, +        is_dir: bool, +        /, +        id: str | None = None, +        description: str = "", +    ) -> AppFeaturePath: +        return self.app._add_path(name, is_dir, self, id, description) + +    @property +    def app_relative_path(self) -> Path: +        return self.full_path.relative_to(self.app.root.full_path) + + +class AppFeaturePath(AppPath): +    def __init__( +        self, +        parent: AppPath, +        name: str, +        is_dir: bool, +        /, +        id: str | None = None, +        description: str = "", +    ) -> None: +        super().__init__(id or name, is_dir, description) +        self._name = name +        self._parent = parent + +    @property +    def name(self) -> str: +        return self._name + +    @property +    def parent(self) -> AppPath: +        return self._parent + +    @property +    def app(self) -> AppBase: +        return self.parent.app + +    @property +    def full_path(self) -> Path: +        return Path(self.parent.full_path, self.name).resolve() + + +class AppRootPath(AppPath): +    def __init__(self, app: AppBase, path: Path): +        super().__init__(f"/{id}", True, f"Application {id} root path.") +        self._app = app +        self._full_path = path.resolve() + +    @property +    def parent(self) -> None: +        return None + +    @property +    def app(self) -> AppBase: +        return self._app + +    @property +    def full_path(self) -> Path: +        return self._full_path + + +class AppFeatureProvider(ABC): +    def __init__(self, name: str, /, app: AppBase | None = None): +        super().__init__() +        self._name = name +        self._app = app if app else AppBase.get_instance() + +    @property +    def app(self) -> AppBase: +        return self._app + +    @property +    def name(self) -> str: +        return self._name + +    @abstractmethod +    def setup(self) -> None: ... + + +class AppCommandFeatureProvider(AppFeatureProvider): +    @abstractmethod +    def get_command_info(self) -> tuple[str, str]: ... + +    @abstractmethod +    def setup_arg_parser(self, arg_parser: ArgumentParser): ... + +    @abstractmethod +    def run_command(self, args: Namespace) -> None: ... + + +class PathCommandProvider(AppCommandFeatureProvider): +    def __init__(self) -> None: +        super().__init__("path-command-provider") + +    def setup(self): +        pass + +    def get_command_info(self): +        return ("path", "Get information about paths used by app.") + +    def setup_arg_parser(self, arg_parser: ArgumentParser) -> None: +        subparsers = arg_parser.add_subparsers( +            dest="path_command", metavar="PATH_COMMAND" +        ) +        _list_parser = subparsers.add_parser( +            "list", help="list special paths used by app" +        ) + +    def run_command(self, args: Namespace) -> None: +        if args.path_command is None or args.path_command == "list": +            for path in self.app.paths: +                print( +                    f"{path.app_relative_path.as_posix()}{'/' if path.is_dir else ''}: {path.description}" +                ) + + +class CommandDispatcher(AppFeatureProvider): +    def __init__(self) -> None: +        super().__init__("command-dispatcher") + +    def setup_arg_parser(self) -> None: +        self._map: dict[str, AppCommandFeatureProvider] = {} +        arg_parser = argparse.ArgumentParser( +            description="Service management", +            formatter_class=argparse.RawDescriptionHelpFormatter, +        ) +        subparsers = arg_parser.add_subparsers( +            dest="command", +            help="The management command to execute.", +            metavar="COMMAND", +        ) +        for feature in self.app.features: +            if isinstance(feature, AppCommandFeatureProvider): +                info = feature.get_command_info() +                command_subparser = subparsers.add_parser(info[0], help=info[1]) +                feature.setup_arg_parser(command_subparser) +                self._map[info[0]] = feature +        self._arg_parser = arg_parser + +    def setup(self): +        self._parsed_args = self.arg_parser.parse_args() + +    @property +    def arg_parser(self) -> argparse.ArgumentParser: +        return self._arg_parser + +    @property +    def command_map(self) -> dict[str, AppCommandFeatureProvider]: +        return self._map + +    @property +    def program_args(self) -> argparse.Namespace: +        return self._parsed_args + +    def run_command(self) -> None: +        args = self.program_args +        if args.command is None: +            self.arg_parser.print_help() +            return +        self.command_map[args.command].run_command(args) + + +class AppBase: +    _instance: AppBase | None = None + +    @staticmethod +    def get_instance() -> AppBase: +        if AppBase._instance is None: +            raise AppError("App instance not initialized") +        return AppBase._instance + +    def __init__(self, app_id: str, name: str): +        AppBase._instance = self +        self._app_id = app_id +        self._name = name +        self._features: list[AppFeatureProvider] = [] +        self._paths: list[AppFeaturePath] = [] + +    def setup(self) -> None: +        command_dispatcher = self.get_feature(CommandDispatcher) +        command_dispatcher.setup_arg_parser() +        self._root = AppRootPath(self, Path(self._ensure_env("CRUPEST_PROJECT_DIR"))) +        self._data_dir = self._root.add_subpath( +            self._ensure_env("CRUPEST_DATA_DIR"), True, id="data" +        ) +        self._services_dir = self._root.add_subpath( +            self._ensure_env("CRUPEST_SERVICES_DIR"), True, id="CRUPEST_SERVICES_DIR" +        ) +        for feature in self.features: +            feature.setup() +        for path in self.paths: +            path.check_self() + +    @property +    def app_id(self) -> str: +        return self._app_id + +    @property +    def name(self) -> str: +        return self._name + +    def _ensure_env(self, env_name: str) -> str: +        value = os.getenv(env_name) +        if value is None: +            raise AppError(f"Environment variable {env_name} not set") +        return value + +    @property +    def root(self) -> AppRootPath: +        return self._root + +    @property +    def data_dir(self) -> AppFeaturePath: +        return self._data_dir + +    @property +    def services_dir(self) -> AppFeaturePath: +        return self._services_dir + +    @property +    def app_initialized(self) -> bool: +        return self.data_dir.check_self() + +    @property +    def features(self) -> list[AppFeatureProvider]: +        return self._features + +    @property +    def paths(self) -> list[AppFeaturePath]: +        return self._paths + +    def add_feature(self, feature: _Feature) -> _Feature: +        for f in self.features: +            if f.name == feature.name: +                raise AppFeatureError( +                    f"Duplicate feature name: {feature.name}.", feature.name +                ) +        self._features.append(feature) +        return feature + +    def _add_path( +        self, +        name: str, +        is_dir: bool, +        /, +        parent: AppPath | None = None, +        id: str | None = None, +        description: str = "", +    ) -> AppFeaturePath: +        p = AppFeaturePath( +            parent or self.root, name, is_dir, id=id, description=description +        ) +        self._paths.append(p) +        return p + +    @overload +    def get_feature(self, feature: str) -> AppFeatureProvider: ... + +    @overload +    def get_feature(self, feature: type[_Feature]) -> _Feature: ... + +    def get_feature( +        self, feature: str | type[_Feature] +    ) -> AppFeatureProvider | _Feature: +        if isinstance(feature, str): +            for f in self._features: +                if f.name == feature: +                    return f +        elif isinstance(feature, type): +            for f in self._features: +                if isinstance(f, feature): +                    return f +        else: +            raise CruLogicError("Argument must be the name of feature or its class.") + +        raise AppFeatureError(f"Feature {feature} not found.", feature) + +    def get_path(self, name: str) -> AppFeaturePath: +        for p in self._paths: +            if p.id == name or p.name == name: +                return p +        raise AppPathError(f"Application path {name} not found.", name) diff --git a/python/cru/service/_gen_cmd.py b/python/cru/service/_gen_cmd.py new file mode 100644 index 0000000..f51d65f --- /dev/null +++ b/python/cru/service/_gen_cmd.py @@ -0,0 +1,200 @@ +from dataclasses import dataclass, replace +from typing import TypeAlias + +from ._base import AppCommandFeatureProvider +from ._nginx import NginxManager + +_Str_Or_Cmd_List: TypeAlias = str | list["_Cmd"] + + +@dataclass +class _Cmd: +    name: str +    desc: str +    cmd: _Str_Or_Cmd_List + +    def clean(self) -> "_Cmd": +        if isinstance(self.cmd, list): +            return replace( +                self, +                cmd=[cmd.clean() for cmd in self.cmd], +            ) +        elif isinstance(self.cmd, str): +            return replace(self, cmd=self.cmd.strip()) +        else: +            raise ValueError("Unexpected type for cmd.") + +    def generate_text( +        self, +        info_only: bool, +        *, +        parent: str | None = None, +    ) -> str: +        if parent is None: +            tag = "COMMAND" +            long_name = self.name +            indent = "" +        else: +            tag = "SUBCOMMAND" +            long_name = f"{parent}.{self.name}" +            indent = "  " + +        if info_only: +            return f"{indent}[{long_name}]: {self.desc}" + +        text = f"--- {tag}[{long_name}]: {self.desc}" +        if isinstance(self.cmd, str): +            text += "\n" + self.cmd +        elif isinstance(self.cmd, list): +            for sub in self.cmd: +                text += "\n" * 2 + sub.generate_text(info_only, parent=self.name) +        else: +            raise ValueError("Unexpected type for cmd.") + +        lines: list[str] = [] +        for line in text.splitlines(): +            if len(line) == 0: +                lines.append("") +            else: +                lines.append(indent + line) +        text = "\n".join(lines) + +        return text + + +_docker_uninstall = _Cmd( +    "uninstall", +    "uninstall apt docker", +    """ +for pkg in docker.io docker-doc docker-compose \ +podman-docker containerd runc; \ +do sudo apt-get remove $pkg; done +""", +) + +_docker_apt_certs = _Cmd( +    "apt-certs", +    "prepare apt certs", +    """ +sudo apt-get update +sudo apt-get install ca-certificates curl +sudo install -m 0755 -d /etc/apt/keyrings +""", +) + +_docker_docker_certs = _Cmd( +    "docker-certs", +    "add docker certs", +    """ +sudo curl -fsSL https://download.docker.com/linux/debian/gpg \ +-o /etc/apt/keyrings/docker.asc +sudo chmod a+r /etc/apt/keyrings/docker.asc +""", +) + +_docker_apt_repo = _Cmd( +    "apt-repo", +    "add docker apt repo", +    """ +echo \\ +  "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] \ +https://download.docker.com/linux/debian \\ +  $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \\ +  sudo tee /etc/apt/sources.list.d/docker.list > /dev/null +""", +) + +_docker_install = _Cmd( +    "install", +    "update apt and install docker", +    """ +sudo apt-get update +sudo apt-get install docker-ce docker-ce-cli containerd.io \ +docker-buildx-plugin docker-compose-plugin +""", +) + +_docker_setup = _Cmd( +    "setup", +    "setup system for docker", +    """ +sudo systemctl enable docker +sudo systemctl start docker +sudo groupadd -f docker +sudo usermod -aG docker $USER +# Remember to log out and log back in for the group changes to take effect +""", +) + +_docker = _Cmd( +    "install-docker", +    "install docker for a fresh new system", +    [ +        _docker_uninstall, +        _docker_apt_certs, +        _docker_docker_certs, +        _docker_apt_repo, +        _docker_install, +        _docker_setup, +    ], +) + +_update_blog = _Cmd( +    "update-blog", +    "re-generate blog pages", +    """ +docker exec -it blog /scripts/update.bash +""", +) + +_git_user = _Cmd( +    "git-user", +    "add/set git server user and password", +    """ +docker run -it --rm -v "$ps_file:/user-info" httpd htpasswd "/user-info" [username] +""", +) + + +class GenCmdProvider(AppCommandFeatureProvider): +    def __init__(self) -> None: +        super().__init__("gen-cmd-provider") +        self._cmds: dict[str, _Cmd] = {} +        self._register_cmds(_docker, _update_blog, _git_user) + +    def _register_cmd(self, cmd: "_Cmd"): +        self._cmds[cmd.name] = cmd.clean() + +    def _register_cmds(self, *cmds: "_Cmd"): +        for c in cmds: +            self._register_cmd(c) + +    def setup(self): +        pass + +    def get_command_info(self): +        return ("gen-cmd", "Get commands of running external cli tools.") + +    def setup_arg_parser(self, arg_parser): +        subparsers = arg_parser.add_subparsers( +            dest="gen_cmd", metavar="GEN_CMD_COMMAND" +        ) +        certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") +        certbot_parser.add_argument( +            "-t", "--test", action="store_true", help="run certbot in test mode" +        ) +        for cmd in self._cmds.values(): +            subparsers.add_parser(cmd.name, help=cmd.desc) + +    def _print_cmd(self, name: str): +        print(self._cmds[name].generate_text(False)) + +    def run_command(self, args): +        if args.gen_cmd is None or args.gen_cmd == "list": +            print("[certbot]: certbot ssl cert commands") +            for cmd in self._cmds.values(): +                print(cmd.generate_text(True)) +        elif args.gen_cmd == "certbot": +            self.app.get_feature(NginxManager).print_all_certbot_commands(args.test) +        else: +            self._print_cmd(args.gen_cmd) diff --git a/python/cru/service/_nginx.py b/python/cru/service/_nginx.py new file mode 100644 index 0000000..87cff6d --- /dev/null +++ b/python/cru/service/_nginx.py @@ -0,0 +1,263 @@ +from argparse import Namespace +from enum import Enum, auto +import re +import subprocess +from typing import TypeAlias + +from cru import CruInternalError + +from ._base import AppCommandFeatureProvider +from ._template import TemplateManager + + +class CertbotAction(Enum): +    CREATE = auto() +    EXPAND = auto() +    SHRINK = auto() +    RENEW = auto() + + +class NginxManager(AppCommandFeatureProvider): +    CertbotAction: TypeAlias = CertbotAction + +    def __init__(self) -> None: +        super().__init__("nginx-manager") +        self._domains_cache: list[str] | None = None + +    def setup(self) -> None: +        pass + +    @property +    def _template_manager(self) -> TemplateManager: +        return self.app.get_feature(TemplateManager) + +    @property +    def root_domain(self) -> str: +        return self._template_manager.get_domain() + +    @property +    def domains(self) -> list[str]: +        if self._domains_cache is None: +            self._domains_cache = self._get_domains() +        return self._domains_cache + +    @property +    def subdomains(self) -> list[str]: +        suffix = "." + self.root_domain +        return [d[: -len(suffix)] for d in self.domains if d.endswith(suffix)] + +    def _get_domains_from_text(self, text: str) -> set[str]: +        domains: set[str] = set() +        regex = re.compile(r"server_name\s+(\S+)\s*;") +        for match in regex.finditer(text): +            domains.add(match[1]) +        return domains + +    def _join_generated_nginx_conf_text(self) -> str: +        result = "" +        for path, text in self._template_manager.generate(): +            if "nginx" in str(path): +                result += text +        return result + +    def _get_domains(self) -> list[str]: +        text = self._join_generated_nginx_conf_text() +        domains = self._get_domains_from_text(text) +        domains.remove(self.root_domain) +        return [self.root_domain, *domains] + +    def _print_domains(self) -> None: +        for domain in self.domains: +            print(domain) + +    def _certbot_command( +        self, +        action: CertbotAction | str, +        test: bool, +        *, +        docker=True, +        standalone=None, +        email=None, +        agree_tos=True, +    ) -> str: +        if isinstance(action, str): +            action = CertbotAction[action.upper()] + +        command_args = [] + +        add_domain_option = True +        if action is CertbotAction.CREATE: +            if standalone is None: +                standalone = True +            command_action = "certonly" +        elif action in [CertbotAction.EXPAND, CertbotAction.SHRINK]: +            if standalone is None: +                standalone = False +            command_action = "certonly" +        elif action is CertbotAction.RENEW: +            if standalone is None: +                standalone = False +            add_domain_option = False +            command_action = "renew" +        else: +            raise CruInternalError("Invalid certbot action.") + +        data_dir = self.app.data_dir.full_path.as_posix() + +        if not docker: +            command_args.append("certbot") +        else: +            command_args.extend( +                [ +                    "docker run -it --rm --name certbot", +                    f'-v "{data_dir}/certbot/certs:/etc/letsencrypt"', +                    f'-v "{data_dir}/certbot/data:/var/lib/letsencrypt"', +                ] +            ) +            if standalone: +                command_args.append('-p "0.0.0.0:80:80"') +            else: +                command_args.append(f'-v "{data_dir}/certbot/webroot:/var/www/certbot"') + +            command_args.append("certbot/certbot") + +        command_args.append(command_action) + +        command_args.append(f"--cert-name {self.root_domain}") + +        if standalone: +            command_args.append("--standalone") +        else: +            command_args.append("--webroot -w /var/www/certbot") + +        if add_domain_option: +            command_args.append(" ".join([f"-d {domain}" for domain in self.domains])) + +        if email is not None: +            command_args.append(f"--email {email}") + +        if agree_tos: +            command_args.append("--agree-tos") + +        if test: +            command_args.append("--test-cert --dry-run") + +        return " ".join(command_args) + +    def print_all_certbot_commands(self, test: bool): +        print("### COMMAND: (standalone) create certs") +        print( +            self._certbot_command( +                CertbotAction.CREATE, +                test, +                email=self._template_manager.get_email(), +            ) +        ) +        print() +        print("### COMMAND: (webroot+nginx) expand or shrink certs") +        print( +            self._certbot_command( +                CertbotAction.EXPAND, +                test, +                email=self._template_manager.get_email(), +            ) +        ) +        print() +        print("### COMMAND: (webroot+nginx) renew certs") +        print( +            self._certbot_command( +                CertbotAction.RENEW, +                test, +                email=self._template_manager.get_email(), +            ) +        ) + +    @property +    def _cert_path_str(self) -> str: +        return str( +            self.app.data_dir.full_path +            / "certbot/certs/live" +            / self.root_domain +            / "fullchain.pem" +        ) + +    def get_command_info(self): +        return "nginx", "Manage nginx related things." + +    def setup_arg_parser(self, arg_parser): +        subparsers = arg_parser.add_subparsers( +            dest="nginx_command", required=True, metavar="NGINX_COMMAND" +        ) +        _list_parser = subparsers.add_parser("list", help="list domains") +        certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") +        certbot_parser.add_argument( +            "--no-test", +            action="store_true", +            help="remove args making certbot run in test mode", +        ) + +    def run_command(self, args: Namespace) -> None: +        if args.nginx_command == "list": +            self._print_domains() +        elif args.nginx_command == "certbot": +            self.print_all_certbot_commands(not args.no_test) + +    def _generate_dns_zone( +        self, +        ip: str, +        /, +        ttl: str | int = 600, +        *, +        enable_mail: bool = True, +        dkim: str | None = None, +    ) -> str: +        # TODO: Not complete and test now. +        root_domain = self.root_domain +        result = f"$ORIGIN {root_domain}.\n\n" +        result += "; A records\n" +        result += f"@ {ttl} IN A {ip}\n" +        for subdomain in self.subdomains: +            result += f"{subdomain} {ttl} IN A {ip}\n" + +        if enable_mail: +            result += "\n; MX records\n" +            result += f"@ {ttl} IN MX 10 mail.{root_domain}.\n" +            result += "\n; SPF record\n" +            result += f'@ {ttl} IN TXT "v=spf1 mx ~all"\n' +            if dkim is not None: +                result += "\n; DKIM record\n" +                result += f'mail._domainkey {ttl} IN TEXT "{dkim}"' +                result += "\n; DMARC record\n" +                dmarc_options = [ +                    "v=DMARC1", +                    "p=none", +                    f"rua=mailto:dmarc.report@{root_domain}", +                    f"ruf=mailto:dmarc.report@{root_domain}", +                    "sp=none", +                    "ri=86400", +                ] +                result += f'_dmarc {ttl} IN TXT "{"; ".join(dmarc_options)}"\n' +        return result + +    def _get_dkim_from_mailserver(self) -> str | None: +        # TODO: Not complete and test now. +        dkim_path = ( +            self.app.data_dir.full_path +            / "dms/config/opendkim/keys" +            / self.root_domain +            / "mail.txt" +        ) +        if not dkim_path.exists(): +            return None + +        p = subprocess.run(["sudo", "cat", dkim_path], capture_output=True, check=True) +        value = "" +        for match in re.finditer('"(.*)"', p.stdout.decode("utf-8")): +            value += match.group(1) +        return value + +    def _generate_dns_zone_with_dkim(self, ip: str, /, ttl: str | int = 600) -> str: +        # TODO: Not complete and test now. +        return self._generate_dns_zone( +            ip, ttl, enable_mail=True, dkim=self._get_dkim_from_mailserver() +        ) diff --git a/python/cru/service/_template.py b/python/cru/service/_template.py new file mode 100644 index 0000000..22c1d21 --- /dev/null +++ b/python/cru/service/_template.py @@ -0,0 +1,228 @@ +from argparse import Namespace +from pathlib import Path +import shutil +from typing import NamedTuple +import graphlib + +from cru import CruException +from cru.parsing import SimpleLineVarParser +from cru.template import TemplateTree, CruStrWrapperTemplate + +from ._base import AppCommandFeatureProvider, AppFeaturePath + + +class _Config(NamedTuple): +    text: str +    config: dict[str, str] + + +class _GeneratedConfig(NamedTuple): +    base: _Config +    private: _Config +    merged: _Config + + +class _PreConfig(NamedTuple): +    base: _Config +    private: _Config +    config: dict[str, str] + +    @staticmethod +    def create(base: _Config, private: _Config) -> "_PreConfig": +        return _PreConfig(base, private, {**base.config, **private.config}) + +    def _merge(self, generated: _Config): +        text = ( +            "\n".join( +                [ +                    self.private.text.strip(), +                    self.base.text.strip(), +                    generated.text.strip(), +                ] +            ) +            + "\n" +        ) +        config = {**self.config, **generated.config} +        return _GeneratedConfig(self.base, self.private, _Config(text, config)) + + +class _Template(NamedTuple): +    config: CruStrWrapperTemplate +    config_vars: set[str] +    tree: TemplateTree + + +class TemplateManager(AppCommandFeatureProvider): +    def __init__(self): +        super().__init__("template-manager") + +    def setup(self) -> None: +        self._base_config_file = self.app.services_dir.add_subpath("base-config", False) +        self._private_config_file = self.app.data_dir.add_subpath("config", False) +        self._template_config_file = self.app.services_dir.add_subpath( +            "config.template", False +        ) +        self._templates_dir = self.app.services_dir.add_subpath("templates", True) +        self._generated_dir = self.app.services_dir.add_subpath("generated", True) + +        self._config_parser = SimpleLineVarParser() + +        def _read_pre(app_path: AppFeaturePath) -> _Config: +            text = app_path.read_text() +            config = self._read_config(text) +            return _Config(text, config) + +        base = _read_pre(self._base_config_file) +        private = _read_pre(self._private_config_file) +        self._preconfig = _PreConfig.create(base, private) + +        self._generated: _GeneratedConfig | None = None + +        template_config_text = self._template_config_file.read_text() +        self._template_config = self._read_config(template_config_text) + +        self._template = _Template( +            CruStrWrapperTemplate(template_config_text), +            set(self._template_config.keys()), +            TemplateTree( +                lambda text: CruStrWrapperTemplate(text), +                self.templates_dir.full_path_str, +            ), +        ) + +        self._real_required_vars = ( +            self._template.config_vars | self._template.tree.variables +        ) - self._template.config_vars +        lacks = self._real_required_vars - self._preconfig.config.keys() +        self._lack_vars = lacks if len(lacks) > 0 else None + +    def _read_config_entry_names(self, text: str) -> set[str]: +        return set(entry.key for entry in self._config_parser.parse(text)) + +    def _read_config(self, text: str) -> dict[str, str]: +        return {entry.key: entry.value for entry in self._config_parser.parse(text)} + +    @property +    def templates_dir(self) -> AppFeaturePath: +        return self._templates_dir + +    @property +    def generated_dir(self) -> AppFeaturePath: +        return self._generated_dir + +    def get_domain(self) -> str: +        return self._preconfig.config["CRUPEST_DOMAIN"] + +    def get_email(self) -> str: +        return self._preconfig.config["CRUPEST_EMAIL"] + +    def _generate_template_config(self, config: dict[str, str]) -> dict[str, str]: +        entry_templates = { +            key: CruStrWrapperTemplate(value) +            for key, value in self._template_config.items() +        } +        sorter = graphlib.TopologicalSorter( +            config +            | {key: template.variables for key, template in entry_templates.items()} +        ) + +        vars: dict[str, str] = config.copy() +        for _ in sorter.static_order(): +            del_keys = [] +            for key, template in entry_templates.items(): +                new = template.generate_partial(vars) +                if not new.has_variables: +                    vars[key] = new.generate({}) +                    del_keys.append(key) +                else: +                    entry_templates[key] = new +            for key in del_keys: +                del entry_templates[key] +        assert len(entry_templates) == 0 +        return {key: value for key, value in vars.items() if key not in config} + +    def _generate_config(self) -> _GeneratedConfig: +        if self._generated is not None: +            return self._generated +        if self._lack_vars is not None: +            raise CruException(f"Required vars are not defined: {self._lack_vars}.") +        config = self._generate_template_config(self._preconfig.config) +        text = self._template.config.generate(self._preconfig.config | config) +        self._generated = self._preconfig._merge(_Config(text, config)) +        return self._generated + +    def generate(self) -> list[tuple[Path, str]]: +        config = self._generate_config() +        return [ +            (Path("config"), config.merged.text), +            *self._template.tree.generate(config.merged.config), +        ] + +    def _generate_files(self, dry_run: bool) -> None: +        result = self.generate() +        if not dry_run: +            if self.generated_dir.full_path.exists(): +                shutil.rmtree(self.generated_dir.full_path) +            for path, text in result: +                des = self.generated_dir.full_path / path +                des.parent.mkdir(parents=True, exist_ok=True) +                with open(des, "w") as f: +                    f.write(text) + +    def get_command_info(self): +        return ("template", "Manage templates.") + +    def _print_file_lists(self) -> None: +        print(f"[{self._template.config.variable_count}]", "config") +        for path, template in self._template.tree.templates: +            print(f"[{template.variable_count}]", path.as_posix()) + +    def _print_vars(self, required: bool) -> None: +        for var in self._template.config.variables: +            print(f"[config] {var}") +        for var in self._template.tree.variables: +            if not (required and var in self._template.config_vars): +                print(f"[template] {var}") + +    def _run_check_vars(self) -> None: +        if self._lack_vars is not None: +            print("Lacks:") +            for var in self._lack_vars: +                print(var) + +    def setup_arg_parser(self, arg_parser): +        subparsers = arg_parser.add_subparsers( +            dest="template_command", required=True, metavar="TEMPLATE_COMMAND" +        ) +        _list_parser = subparsers.add_parser("list", help="list templates") +        vars_parser = subparsers.add_parser( +            "vars", help="list variables used in all templates" +        ) +        vars_parser.add_argument( +            "-r", +            "--required", +            help="only list really required one.", +            action="store_true", +        ) +        _check_vars_parser = subparsers.add_parser( +            "check-vars", +            help="check if required vars are set", +        ) +        generate_parser = subparsers.add_parser("generate", help="generate templates") +        generate_parser.add_argument( +            "--no-dry-run", action="store_true", help="generate and write target files" +        ) + +    def run_command(self, args: Namespace) -> None: +        if args.template_command == "list": +            self._print_file_lists() +        elif args.template_command == "vars": +            self._print_vars(args.required) +        elif args.template_command == "generate": +            dry_run = not args.no_dry_run +            self._generate_files(dry_run) +            if dry_run: +                print("Dry run successfully.") +                print( +                    f"Will delete dir {self.generated_dir.full_path_str} if it exists." +                ) | 
