diff options
Diffstat (limited to 'python/cru/service')
-rw-r--r-- | python/cru/service/__init__.py | 0 | ||||
-rw-r--r-- | python/cru/service/__main__.py | 27 | ||||
-rw-r--r-- | python/cru/service/_app.py | 30 | ||||
-rw-r--r-- | python/cru/service/_base.py | 400 | ||||
-rw-r--r-- | python/cru/service/_gen_cmd.py | 200 | ||||
-rw-r--r-- | python/cru/service/_nginx.py | 263 | ||||
-rw-r--r-- | python/cru/service/_template.py | 228 |
7 files changed, 1148 insertions, 0 deletions
diff --git a/python/cru/service/__init__.py b/python/cru/service/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/python/cru/service/__init__.py diff --git a/python/cru/service/__main__.py b/python/cru/service/__main__.py new file mode 100644 index 0000000..2a0268b --- /dev/null +++ b/python/cru/service/__main__.py @@ -0,0 +1,27 @@ +import sys + +from cru import CruException + +from ._app import create_app + + +def main(): + app = create_app() + app.run_command() + + +if __name__ == "__main__": + version_info = sys.version_info + if not (version_info.major == 3 and version_info.minor >= 11): + print("This application requires Python 3.11 or later.", file=sys.stderr) + sys.exit(1) + + try: + main() + except CruException as e: + user_message = e.get_user_message() + if user_message is not None: + print(f"Error: {user_message}") + exit(1) + else: + raise diff --git a/python/cru/service/_app.py b/python/cru/service/_app.py new file mode 100644 index 0000000..b4c6271 --- /dev/null +++ b/python/cru/service/_app.py @@ -0,0 +1,30 @@ +from ._base import ( + AppBase, + CommandDispatcher, + PathCommandProvider, +) +from ._template import TemplateManager +from ._nginx import NginxManager +from ._gen_cmd import GenCmdProvider + +APP_ID = "crupest" + + +class App(AppBase): + def __init__(self): + super().__init__(APP_ID, f"{APP_ID}-service") + self.add_feature(PathCommandProvider()) + self.add_feature(TemplateManager()) + self.add_feature(NginxManager()) + self.add_feature(GenCmdProvider()) + self.add_feature(CommandDispatcher()) + + def run_command(self): + command_dispatcher = self.get_feature(CommandDispatcher) + command_dispatcher.run_command() + + +def create_app() -> App: + app = App() + app.setup() + return app diff --git a/python/cru/service/_base.py b/python/cru/service/_base.py new file mode 100644 index 0000000..e1eee70 --- /dev/null +++ b/python/cru/service/_base.py @@ -0,0 +1,400 @@ +from __future__ import annotations + +from argparse import ArgumentParser, Namespace +from abc import ABC, abstractmethod +import argparse +import os +from pathlib import Path +from typing import TypeVar, overload + +from cru import CruException, CruLogicError + +_Feature = TypeVar("_Feature", bound="AppFeatureProvider") + + +class AppError(CruException): + pass + + +class AppFeatureError(AppError): + def __init__(self, message, feature: type | str, *args, **kwargs): + super().__init__(message, *args, **kwargs) + self._feature = feature + + @property + def feature(self) -> type | str: + return self._feature + + +class AppPathError(CruException): + def __init__(self, message, _path: str | Path, *args, **kwargs): + super().__init__(message, *args, **kwargs) + self._path = str(_path) + + @property + def path(self) -> str: + return self._path + + +class AppPath(ABC): + def __init__(self, id: str, is_dir: bool, description: str) -> None: + self._is_dir = is_dir + self._id = id + self._description = description + + @property + @abstractmethod + def parent(self) -> AppPath | None: ... + + @property + @abstractmethod + def app(self) -> AppBase: ... + + @property + def id(self) -> str: + return self._id + + @property + def description(self) -> str: + return self._description + + @property + def is_dir(self) -> bool: + return self._is_dir + + @property + @abstractmethod + def full_path(self) -> Path: ... + + @property + def full_path_str(self) -> str: + return str(self.full_path) + + def check_parents(self, must_exist: bool = False) -> bool: + for p in reversed(self.full_path.parents): + if not p.exists() and not must_exist: + return False + if not p.is_dir(): + raise AppPathError("Parents' path must be a dir.", self.full_path) + return True + + def check_self(self, must_exist: bool = False) -> bool: + if not self.check_parents(must_exist): + return False + if not self.full_path.exists(): + if not must_exist: + return False + raise AppPathError("Not exist.", self.full_path) + if self.is_dir: + if not self.full_path.is_dir(): + raise AppPathError("Should be a directory, but not.", self.full_path) + else: + return True + else: + if not self.full_path.is_file(): + raise AppPathError("Should be a file, but not.", self.full_path) + else: + return True + + def ensure(self, create_file: bool = False) -> None: + e = self.check_self(False) + if not e: + os.makedirs(self.full_path.parent, exist_ok=True) + if self.is_dir: + os.mkdir(self.full_path) + elif create_file: + with open(self.full_path, "w") as f: + f.write("") + + def read_text(self) -> str: + if self.is_dir: + raise AppPathError("Can't read text of a dir.", self.full_path) + self.check_self() + return self.full_path.read_text() + + def add_subpath( + self, + name: str, + is_dir: bool, + /, + id: str | None = None, + description: str = "", + ) -> AppFeaturePath: + return self.app._add_path(name, is_dir, self, id, description) + + @property + def app_relative_path(self) -> Path: + return self.full_path.relative_to(self.app.root.full_path) + + +class AppFeaturePath(AppPath): + def __init__( + self, + parent: AppPath, + name: str, + is_dir: bool, + /, + id: str | None = None, + description: str = "", + ) -> None: + super().__init__(id or name, is_dir, description) + self._name = name + self._parent = parent + + @property + def name(self) -> str: + return self._name + + @property + def parent(self) -> AppPath: + return self._parent + + @property + def app(self) -> AppBase: + return self.parent.app + + @property + def full_path(self) -> Path: + return Path(self.parent.full_path, self.name).resolve() + + +class AppRootPath(AppPath): + def __init__(self, app: AppBase, path: Path): + super().__init__(f"/{id}", True, f"Application {id} root path.") + self._app = app + self._full_path = path.resolve() + + @property + def parent(self) -> None: + return None + + @property + def app(self) -> AppBase: + return self._app + + @property + def full_path(self) -> Path: + return self._full_path + + +class AppFeatureProvider(ABC): + def __init__(self, name: str, /, app: AppBase | None = None): + super().__init__() + self._name = name + self._app = app if app else AppBase.get_instance() + + @property + def app(self) -> AppBase: + return self._app + + @property + def name(self) -> str: + return self._name + + @abstractmethod + def setup(self) -> None: ... + + +class AppCommandFeatureProvider(AppFeatureProvider): + @abstractmethod + def get_command_info(self) -> tuple[str, str]: ... + + @abstractmethod + def setup_arg_parser(self, arg_parser: ArgumentParser): ... + + @abstractmethod + def run_command(self, args: Namespace) -> None: ... + + +class PathCommandProvider(AppCommandFeatureProvider): + def __init__(self) -> None: + super().__init__("path-command-provider") + + def setup(self): + pass + + def get_command_info(self): + return ("path", "Get information about paths used by app.") + + def setup_arg_parser(self, arg_parser: ArgumentParser) -> None: + subparsers = arg_parser.add_subparsers( + dest="path_command", metavar="PATH_COMMAND" + ) + _list_parser = subparsers.add_parser( + "list", help="list special paths used by app" + ) + + def run_command(self, args: Namespace) -> None: + if args.path_command is None or args.path_command == "list": + for path in self.app.paths: + print( + f"{path.app_relative_path.as_posix()}{'/' if path.is_dir else ''}: {path.description}" + ) + + +class CommandDispatcher(AppFeatureProvider): + def __init__(self) -> None: + super().__init__("command-dispatcher") + + def setup_arg_parser(self) -> None: + self._map: dict[str, AppCommandFeatureProvider] = {} + arg_parser = argparse.ArgumentParser( + description="Service management", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + subparsers = arg_parser.add_subparsers( + dest="command", + help="The management command to execute.", + metavar="COMMAND", + ) + for feature in self.app.features: + if isinstance(feature, AppCommandFeatureProvider): + info = feature.get_command_info() + command_subparser = subparsers.add_parser(info[0], help=info[1]) + feature.setup_arg_parser(command_subparser) + self._map[info[0]] = feature + self._arg_parser = arg_parser + + def setup(self): + self._parsed_args = self.arg_parser.parse_args() + + @property + def arg_parser(self) -> argparse.ArgumentParser: + return self._arg_parser + + @property + def command_map(self) -> dict[str, AppCommandFeatureProvider]: + return self._map + + @property + def program_args(self) -> argparse.Namespace: + return self._parsed_args + + def run_command(self) -> None: + args = self.program_args + if args.command is None: + self.arg_parser.print_help() + return + self.command_map[args.command].run_command(args) + + +class AppBase: + _instance: AppBase | None = None + + @staticmethod + def get_instance() -> AppBase: + if AppBase._instance is None: + raise AppError("App instance not initialized") + return AppBase._instance + + def __init__(self, app_id: str, name: str): + AppBase._instance = self + self._app_id = app_id + self._name = name + self._features: list[AppFeatureProvider] = [] + self._paths: list[AppFeaturePath] = [] + + def setup(self) -> None: + command_dispatcher = self.get_feature(CommandDispatcher) + command_dispatcher.setup_arg_parser() + self._root = AppRootPath(self, Path(self._ensure_env("CRUPEST_PROJECT_DIR"))) + self._data_dir = self._root.add_subpath( + self._ensure_env("CRUPEST_DATA_DIR"), True, id="data" + ) + self._services_dir = self._root.add_subpath( + self._ensure_env("CRUPEST_SERVICES_DIR"), True, id="CRUPEST_SERVICES_DIR" + ) + for feature in self.features: + feature.setup() + for path in self.paths: + path.check_self() + + @property + def app_id(self) -> str: + return self._app_id + + @property + def name(self) -> str: + return self._name + + def _ensure_env(self, env_name: str) -> str: + value = os.getenv(env_name) + if value is None: + raise AppError(f"Environment variable {env_name} not set") + return value + + @property + def root(self) -> AppRootPath: + return self._root + + @property + def data_dir(self) -> AppFeaturePath: + return self._data_dir + + @property + def services_dir(self) -> AppFeaturePath: + return self._services_dir + + @property + def app_initialized(self) -> bool: + return self.data_dir.check_self() + + @property + def features(self) -> list[AppFeatureProvider]: + return self._features + + @property + def paths(self) -> list[AppFeaturePath]: + return self._paths + + def add_feature(self, feature: _Feature) -> _Feature: + for f in self.features: + if f.name == feature.name: + raise AppFeatureError( + f"Duplicate feature name: {feature.name}.", feature.name + ) + self._features.append(feature) + return feature + + def _add_path( + self, + name: str, + is_dir: bool, + /, + parent: AppPath | None = None, + id: str | None = None, + description: str = "", + ) -> AppFeaturePath: + p = AppFeaturePath( + parent or self.root, name, is_dir, id=id, description=description + ) + self._paths.append(p) + return p + + @overload + def get_feature(self, feature: str) -> AppFeatureProvider: ... + + @overload + def get_feature(self, feature: type[_Feature]) -> _Feature: ... + + def get_feature( + self, feature: str | type[_Feature] + ) -> AppFeatureProvider | _Feature: + if isinstance(feature, str): + for f in self._features: + if f.name == feature: + return f + elif isinstance(feature, type): + for f in self._features: + if isinstance(f, feature): + return f + else: + raise CruLogicError("Argument must be the name of feature or its class.") + + raise AppFeatureError(f"Feature {feature} not found.", feature) + + def get_path(self, name: str) -> AppFeaturePath: + for p in self._paths: + if p.id == name or p.name == name: + return p + raise AppPathError(f"Application path {name} not found.", name) diff --git a/python/cru/service/_gen_cmd.py b/python/cru/service/_gen_cmd.py new file mode 100644 index 0000000..f51d65f --- /dev/null +++ b/python/cru/service/_gen_cmd.py @@ -0,0 +1,200 @@ +from dataclasses import dataclass, replace +from typing import TypeAlias + +from ._base import AppCommandFeatureProvider +from ._nginx import NginxManager + +_Str_Or_Cmd_List: TypeAlias = str | list["_Cmd"] + + +@dataclass +class _Cmd: + name: str + desc: str + cmd: _Str_Or_Cmd_List + + def clean(self) -> "_Cmd": + if isinstance(self.cmd, list): + return replace( + self, + cmd=[cmd.clean() for cmd in self.cmd], + ) + elif isinstance(self.cmd, str): + return replace(self, cmd=self.cmd.strip()) + else: + raise ValueError("Unexpected type for cmd.") + + def generate_text( + self, + info_only: bool, + *, + parent: str | None = None, + ) -> str: + if parent is None: + tag = "COMMAND" + long_name = self.name + indent = "" + else: + tag = "SUBCOMMAND" + long_name = f"{parent}.{self.name}" + indent = " " + + if info_only: + return f"{indent}[{long_name}]: {self.desc}" + + text = f"--- {tag}[{long_name}]: {self.desc}" + if isinstance(self.cmd, str): + text += "\n" + self.cmd + elif isinstance(self.cmd, list): + for sub in self.cmd: + text += "\n" * 2 + sub.generate_text(info_only, parent=self.name) + else: + raise ValueError("Unexpected type for cmd.") + + lines: list[str] = [] + for line in text.splitlines(): + if len(line) == 0: + lines.append("") + else: + lines.append(indent + line) + text = "\n".join(lines) + + return text + + +_docker_uninstall = _Cmd( + "uninstall", + "uninstall apt docker", + """ +for pkg in docker.io docker-doc docker-compose \ +podman-docker containerd runc; \ +do sudo apt-get remove $pkg; done +""", +) + +_docker_apt_certs = _Cmd( + "apt-certs", + "prepare apt certs", + """ +sudo apt-get update +sudo apt-get install ca-certificates curl +sudo install -m 0755 -d /etc/apt/keyrings +""", +) + +_docker_docker_certs = _Cmd( + "docker-certs", + "add docker certs", + """ +sudo curl -fsSL https://download.docker.com/linux/debian/gpg \ +-o /etc/apt/keyrings/docker.asc +sudo chmod a+r /etc/apt/keyrings/docker.asc +""", +) + +_docker_apt_repo = _Cmd( + "apt-repo", + "add docker apt repo", + """ +echo \\ + "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] \ +https://download.docker.com/linux/debian \\ + $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \\ + sudo tee /etc/apt/sources.list.d/docker.list > /dev/null +""", +) + +_docker_install = _Cmd( + "install", + "update apt and install docker", + """ +sudo apt-get update +sudo apt-get install docker-ce docker-ce-cli containerd.io \ +docker-buildx-plugin docker-compose-plugin +""", +) + +_docker_setup = _Cmd( + "setup", + "setup system for docker", + """ +sudo systemctl enable docker +sudo systemctl start docker +sudo groupadd -f docker +sudo usermod -aG docker $USER +# Remember to log out and log back in for the group changes to take effect +""", +) + +_docker = _Cmd( + "install-docker", + "install docker for a fresh new system", + [ + _docker_uninstall, + _docker_apt_certs, + _docker_docker_certs, + _docker_apt_repo, + _docker_install, + _docker_setup, + ], +) + +_update_blog = _Cmd( + "update-blog", + "re-generate blog pages", + """ +docker exec -it blog /scripts/update.bash +""", +) + +_git_user = _Cmd( + "git-user", + "add/set git server user and password", + """ +docker run -it --rm -v "$ps_file:/user-info" httpd htpasswd "/user-info" [username] +""", +) + + +class GenCmdProvider(AppCommandFeatureProvider): + def __init__(self) -> None: + super().__init__("gen-cmd-provider") + self._cmds: dict[str, _Cmd] = {} + self._register_cmds(_docker, _update_blog, _git_user) + + def _register_cmd(self, cmd: "_Cmd"): + self._cmds[cmd.name] = cmd.clean() + + def _register_cmds(self, *cmds: "_Cmd"): + for c in cmds: + self._register_cmd(c) + + def setup(self): + pass + + def get_command_info(self): + return ("gen-cmd", "Get commands of running external cli tools.") + + def setup_arg_parser(self, arg_parser): + subparsers = arg_parser.add_subparsers( + dest="gen_cmd", metavar="GEN_CMD_COMMAND" + ) + certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") + certbot_parser.add_argument( + "-t", "--test", action="store_true", help="run certbot in test mode" + ) + for cmd in self._cmds.values(): + subparsers.add_parser(cmd.name, help=cmd.desc) + + def _print_cmd(self, name: str): + print(self._cmds[name].generate_text(False)) + + def run_command(self, args): + if args.gen_cmd is None or args.gen_cmd == "list": + print("[certbot]: certbot ssl cert commands") + for cmd in self._cmds.values(): + print(cmd.generate_text(True)) + elif args.gen_cmd == "certbot": + self.app.get_feature(NginxManager).print_all_certbot_commands(args.test) + else: + self._print_cmd(args.gen_cmd) diff --git a/python/cru/service/_nginx.py b/python/cru/service/_nginx.py new file mode 100644 index 0000000..87cff6d --- /dev/null +++ b/python/cru/service/_nginx.py @@ -0,0 +1,263 @@ +from argparse import Namespace +from enum import Enum, auto +import re +import subprocess +from typing import TypeAlias + +from cru import CruInternalError + +from ._base import AppCommandFeatureProvider +from ._template import TemplateManager + + +class CertbotAction(Enum): + CREATE = auto() + EXPAND = auto() + SHRINK = auto() + RENEW = auto() + + +class NginxManager(AppCommandFeatureProvider): + CertbotAction: TypeAlias = CertbotAction + + def __init__(self) -> None: + super().__init__("nginx-manager") + self._domains_cache: list[str] | None = None + + def setup(self) -> None: + pass + + @property + def _template_manager(self) -> TemplateManager: + return self.app.get_feature(TemplateManager) + + @property + def root_domain(self) -> str: + return self._template_manager.get_domain() + + @property + def domains(self) -> list[str]: + if self._domains_cache is None: + self._domains_cache = self._get_domains() + return self._domains_cache + + @property + def subdomains(self) -> list[str]: + suffix = "." + self.root_domain + return [d[: -len(suffix)] for d in self.domains if d.endswith(suffix)] + + def _get_domains_from_text(self, text: str) -> set[str]: + domains: set[str] = set() + regex = re.compile(r"server_name\s+(\S+)\s*;") + for match in regex.finditer(text): + domains.add(match[1]) + return domains + + def _join_generated_nginx_conf_text(self) -> str: + result = "" + for path, text in self._template_manager.generate(): + if "nginx" in str(path): + result += text + return result + + def _get_domains(self) -> list[str]: + text = self._join_generated_nginx_conf_text() + domains = self._get_domains_from_text(text) + domains.remove(self.root_domain) + return [self.root_domain, *domains] + + def _print_domains(self) -> None: + for domain in self.domains: + print(domain) + + def _certbot_command( + self, + action: CertbotAction | str, + test: bool, + *, + docker=True, + standalone=None, + email=None, + agree_tos=True, + ) -> str: + if isinstance(action, str): + action = CertbotAction[action.upper()] + + command_args = [] + + add_domain_option = True + if action is CertbotAction.CREATE: + if standalone is None: + standalone = True + command_action = "certonly" + elif action in [CertbotAction.EXPAND, CertbotAction.SHRINK]: + if standalone is None: + standalone = False + command_action = "certonly" + elif action is CertbotAction.RENEW: + if standalone is None: + standalone = False + add_domain_option = False + command_action = "renew" + else: + raise CruInternalError("Invalid certbot action.") + + data_dir = self.app.data_dir.full_path.as_posix() + + if not docker: + command_args.append("certbot") + else: + command_args.extend( + [ + "docker run -it --rm --name certbot", + f'-v "{data_dir}/certbot/certs:/etc/letsencrypt"', + f'-v "{data_dir}/certbot/data:/var/lib/letsencrypt"', + ] + ) + if standalone: + command_args.append('-p "0.0.0.0:80:80"') + else: + command_args.append(f'-v "{data_dir}/certbot/webroot:/var/www/certbot"') + + command_args.append("certbot/certbot") + + command_args.append(command_action) + + command_args.append(f"--cert-name {self.root_domain}") + + if standalone: + command_args.append("--standalone") + else: + command_args.append("--webroot -w /var/www/certbot") + + if add_domain_option: + command_args.append(" ".join([f"-d {domain}" for domain in self.domains])) + + if email is not None: + command_args.append(f"--email {email}") + + if agree_tos: + command_args.append("--agree-tos") + + if test: + command_args.append("--test-cert --dry-run") + + return " ".join(command_args) + + def print_all_certbot_commands(self, test: bool): + print("### COMMAND: (standalone) create certs") + print( + self._certbot_command( + CertbotAction.CREATE, + test, + email=self._template_manager.get_email(), + ) + ) + print() + print("### COMMAND: (webroot+nginx) expand or shrink certs") + print( + self._certbot_command( + CertbotAction.EXPAND, + test, + email=self._template_manager.get_email(), + ) + ) + print() + print("### COMMAND: (webroot+nginx) renew certs") + print( + self._certbot_command( + CertbotAction.RENEW, + test, + email=self._template_manager.get_email(), + ) + ) + + @property + def _cert_path_str(self) -> str: + return str( + self.app.data_dir.full_path + / "certbot/certs/live" + / self.root_domain + / "fullchain.pem" + ) + + def get_command_info(self): + return "nginx", "Manage nginx related things." + + def setup_arg_parser(self, arg_parser): + subparsers = arg_parser.add_subparsers( + dest="nginx_command", required=True, metavar="NGINX_COMMAND" + ) + _list_parser = subparsers.add_parser("list", help="list domains") + certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") + certbot_parser.add_argument( + "--no-test", + action="store_true", + help="remove args making certbot run in test mode", + ) + + def run_command(self, args: Namespace) -> None: + if args.nginx_command == "list": + self._print_domains() + elif args.nginx_command == "certbot": + self.print_all_certbot_commands(not args.no_test) + + def _generate_dns_zone( + self, + ip: str, + /, + ttl: str | int = 600, + *, + enable_mail: bool = True, + dkim: str | None = None, + ) -> str: + # TODO: Not complete and test now. + root_domain = self.root_domain + result = f"$ORIGIN {root_domain}.\n\n" + result += "; A records\n" + result += f"@ {ttl} IN A {ip}\n" + for subdomain in self.subdomains: + result += f"{subdomain} {ttl} IN A {ip}\n" + + if enable_mail: + result += "\n; MX records\n" + result += f"@ {ttl} IN MX 10 mail.{root_domain}.\n" + result += "\n; SPF record\n" + result += f'@ {ttl} IN TXT "v=spf1 mx ~all"\n' + if dkim is not None: + result += "\n; DKIM record\n" + result += f'mail._domainkey {ttl} IN TEXT "{dkim}"' + result += "\n; DMARC record\n" + dmarc_options = [ + "v=DMARC1", + "p=none", + f"rua=mailto:dmarc.report@{root_domain}", + f"ruf=mailto:dmarc.report@{root_domain}", + "sp=none", + "ri=86400", + ] + result += f'_dmarc {ttl} IN TXT "{"; ".join(dmarc_options)}"\n' + return result + + def _get_dkim_from_mailserver(self) -> str | None: + # TODO: Not complete and test now. + dkim_path = ( + self.app.data_dir.full_path + / "dms/config/opendkim/keys" + / self.root_domain + / "mail.txt" + ) + if not dkim_path.exists(): + return None + + p = subprocess.run(["sudo", "cat", dkim_path], capture_output=True, check=True) + value = "" + for match in re.finditer('"(.*)"', p.stdout.decode("utf-8")): + value += match.group(1) + return value + + def _generate_dns_zone_with_dkim(self, ip: str, /, ttl: str | int = 600) -> str: + # TODO: Not complete and test now. + return self._generate_dns_zone( + ip, ttl, enable_mail=True, dkim=self._get_dkim_from_mailserver() + ) diff --git a/python/cru/service/_template.py b/python/cru/service/_template.py new file mode 100644 index 0000000..22c1d21 --- /dev/null +++ b/python/cru/service/_template.py @@ -0,0 +1,228 @@ +from argparse import Namespace +from pathlib import Path +import shutil +from typing import NamedTuple +import graphlib + +from cru import CruException +from cru.parsing import SimpleLineVarParser +from cru.template import TemplateTree, CruStrWrapperTemplate + +from ._base import AppCommandFeatureProvider, AppFeaturePath + + +class _Config(NamedTuple): + text: str + config: dict[str, str] + + +class _GeneratedConfig(NamedTuple): + base: _Config + private: _Config + merged: _Config + + +class _PreConfig(NamedTuple): + base: _Config + private: _Config + config: dict[str, str] + + @staticmethod + def create(base: _Config, private: _Config) -> "_PreConfig": + return _PreConfig(base, private, {**base.config, **private.config}) + + def _merge(self, generated: _Config): + text = ( + "\n".join( + [ + self.private.text.strip(), + self.base.text.strip(), + generated.text.strip(), + ] + ) + + "\n" + ) + config = {**self.config, **generated.config} + return _GeneratedConfig(self.base, self.private, _Config(text, config)) + + +class _Template(NamedTuple): + config: CruStrWrapperTemplate + config_vars: set[str] + tree: TemplateTree + + +class TemplateManager(AppCommandFeatureProvider): + def __init__(self): + super().__init__("template-manager") + + def setup(self) -> None: + self._base_config_file = self.app.services_dir.add_subpath("base-config", False) + self._private_config_file = self.app.data_dir.add_subpath("config", False) + self._template_config_file = self.app.services_dir.add_subpath( + "config.template", False + ) + self._templates_dir = self.app.services_dir.add_subpath("templates", True) + self._generated_dir = self.app.services_dir.add_subpath("generated", True) + + self._config_parser = SimpleLineVarParser() + + def _read_pre(app_path: AppFeaturePath) -> _Config: + text = app_path.read_text() + config = self._read_config(text) + return _Config(text, config) + + base = _read_pre(self._base_config_file) + private = _read_pre(self._private_config_file) + self._preconfig = _PreConfig.create(base, private) + + self._generated: _GeneratedConfig | None = None + + template_config_text = self._template_config_file.read_text() + self._template_config = self._read_config(template_config_text) + + self._template = _Template( + CruStrWrapperTemplate(template_config_text), + set(self._template_config.keys()), + TemplateTree( + lambda text: CruStrWrapperTemplate(text), + self.templates_dir.full_path_str, + ), + ) + + self._real_required_vars = ( + self._template.config_vars | self._template.tree.variables + ) - self._template.config_vars + lacks = self._real_required_vars - self._preconfig.config.keys() + self._lack_vars = lacks if len(lacks) > 0 else None + + def _read_config_entry_names(self, text: str) -> set[str]: + return set(entry.key for entry in self._config_parser.parse(text)) + + def _read_config(self, text: str) -> dict[str, str]: + return {entry.key: entry.value for entry in self._config_parser.parse(text)} + + @property + def templates_dir(self) -> AppFeaturePath: + return self._templates_dir + + @property + def generated_dir(self) -> AppFeaturePath: + return self._generated_dir + + def get_domain(self) -> str: + return self._preconfig.config["CRUPEST_DOMAIN"] + + def get_email(self) -> str: + return self._preconfig.config["CRUPEST_EMAIL"] + + def _generate_template_config(self, config: dict[str, str]) -> dict[str, str]: + entry_templates = { + key: CruStrWrapperTemplate(value) + for key, value in self._template_config.items() + } + sorter = graphlib.TopologicalSorter( + config + | {key: template.variables for key, template in entry_templates.items()} + ) + + vars: dict[str, str] = config.copy() + for _ in sorter.static_order(): + del_keys = [] + for key, template in entry_templates.items(): + new = template.generate_partial(vars) + if not new.has_variables: + vars[key] = new.generate({}) + del_keys.append(key) + else: + entry_templates[key] = new + for key in del_keys: + del entry_templates[key] + assert len(entry_templates) == 0 + return {key: value for key, value in vars.items() if key not in config} + + def _generate_config(self) -> _GeneratedConfig: + if self._generated is not None: + return self._generated + if self._lack_vars is not None: + raise CruException(f"Required vars are not defined: {self._lack_vars}.") + config = self._generate_template_config(self._preconfig.config) + text = self._template.config.generate(self._preconfig.config | config) + self._generated = self._preconfig._merge(_Config(text, config)) + return self._generated + + def generate(self) -> list[tuple[Path, str]]: + config = self._generate_config() + return [ + (Path("config"), config.merged.text), + *self._template.tree.generate(config.merged.config), + ] + + def _generate_files(self, dry_run: bool) -> None: + result = self.generate() + if not dry_run: + if self.generated_dir.full_path.exists(): + shutil.rmtree(self.generated_dir.full_path) + for path, text in result: + des = self.generated_dir.full_path / path + des.parent.mkdir(parents=True, exist_ok=True) + with open(des, "w") as f: + f.write(text) + + def get_command_info(self): + return ("template", "Manage templates.") + + def _print_file_lists(self) -> None: + print(f"[{self._template.config.variable_count}]", "config") + for path, template in self._template.tree.templates: + print(f"[{template.variable_count}]", path.as_posix()) + + def _print_vars(self, required: bool) -> None: + for var in self._template.config.variables: + print(f"[config] {var}") + for var in self._template.tree.variables: + if not (required and var in self._template.config_vars): + print(f"[template] {var}") + + def _run_check_vars(self) -> None: + if self._lack_vars is not None: + print("Lacks:") + for var in self._lack_vars: + print(var) + + def setup_arg_parser(self, arg_parser): + subparsers = arg_parser.add_subparsers( + dest="template_command", required=True, metavar="TEMPLATE_COMMAND" + ) + _list_parser = subparsers.add_parser("list", help="list templates") + vars_parser = subparsers.add_parser( + "vars", help="list variables used in all templates" + ) + vars_parser.add_argument( + "-r", + "--required", + help="only list really required one.", + action="store_true", + ) + _check_vars_parser = subparsers.add_parser( + "check-vars", + help="check if required vars are set", + ) + generate_parser = subparsers.add_parser("generate", help="generate templates") + generate_parser.add_argument( + "--no-dry-run", action="store_true", help="generate and write target files" + ) + + def run_command(self, args: Namespace) -> None: + if args.template_command == "list": + self._print_file_lists() + elif args.template_command == "vars": + self._print_vars(args.required) + elif args.template_command == "generate": + dry_run = not args.no_dry_run + self._generate_files(dry_run) + if dry_run: + print("Dry run successfully.") + print( + f"Will delete dir {self.generated_dir.full_path_str} if it exists." + ) |