diff options
Diffstat (limited to 'services/manager/service')
-rw-r--r-- | services/manager/service/__init__.py | 0 | ||||
-rw-r--r-- | services/manager/service/__main__.py | 27 | ||||
-rw-r--r-- | services/manager/service/_app.py | 30 | ||||
-rw-r--r-- | services/manager/service/_base.py | 398 | ||||
-rw-r--r-- | services/manager/service/_external.py | 81 | ||||
-rw-r--r-- | services/manager/service/_nginx.py | 263 | ||||
-rw-r--r-- | services/manager/service/_template.py | 228 |
7 files changed, 0 insertions, 1027 deletions
diff --git a/services/manager/service/__init__.py b/services/manager/service/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/services/manager/service/__init__.py +++ /dev/null diff --git a/services/manager/service/__main__.py b/services/manager/service/__main__.py deleted file mode 100644 index 6ea0a8a..0000000 --- a/services/manager/service/__main__.py +++ /dev/null @@ -1,27 +0,0 @@ -import sys - -from manager import CruException - -from ._app import create_app - - -def main(): - app = create_app() - app.run_command() - - -if __name__ == "__main__": - version_info = sys.version_info - if not (version_info.major == 3 and version_info.minor >= 11): - print("This application requires Python 3.11 or later.", file=sys.stderr) - sys.exit(1) - - try: - main() - except CruException as e: - user_message = e.get_user_message() - if user_message is not None: - print(f"Error: {user_message}") - exit(1) - else: - raise diff --git a/services/manager/service/_app.py b/services/manager/service/_app.py deleted file mode 100644 index 2304340..0000000 --- a/services/manager/service/_app.py +++ /dev/null @@ -1,30 +0,0 @@ -from ._base import ( - AppBase, - CommandDispatcher, - PathCommandProvider, -) -from ._template import TemplateManager -from ._nginx import NginxManager -from ._external import CliToolCommandProvider - -APP_ID = "crupest" - - -class App(AppBase): - def __init__(self): - super().__init__(APP_ID, f"{APP_ID}-service") - self.add_feature(PathCommandProvider()) - self.add_feature(TemplateManager()) - self.add_feature(NginxManager()) - self.add_feature(CliToolCommandProvider()) - self.add_feature(CommandDispatcher()) - - def run_command(self): - command_dispatcher = self.get_feature(CommandDispatcher) - command_dispatcher.run_command() - - -def create_app() -> App: - app = App() - app.setup() - return app diff --git a/services/manager/service/_base.py b/services/manager/service/_base.py deleted file mode 100644 index 783296c..0000000 --- a/services/manager/service/_base.py +++ /dev/null @@ -1,398 +0,0 @@ -from __future__ import annotations - -from argparse import ArgumentParser, Namespace -from abc import ABC, abstractmethod -import argparse -import os -from pathlib import Path -from typing import TypeVar, overload - -from manager import CruException, CruLogicError - -_Feature = TypeVar("_Feature", bound="AppFeatureProvider") - - -class AppError(CruException): - pass - - -class AppFeatureError(AppError): - def __init__(self, message, feature: type | str, *args, **kwargs): - super().__init__(message, *args, **kwargs) - self._feature = feature - - @property - def feature(self) -> type | str: - return self._feature - - -class AppPathError(CruException): - def __init__(self, message, _path: str | Path, *args, **kwargs): - super().__init__(message, *args, **kwargs) - self._path = str(_path) - - @property - def path(self) -> str: - return self._path - - -class AppPath(ABC): - def __init__(self, id: str, is_dir: bool, description: str) -> None: - self._is_dir = is_dir - self._id = id - self._description = description - - @property - @abstractmethod - def parent(self) -> AppPath | None: ... - - @property - @abstractmethod - def app(self) -> AppBase: ... - - @property - def id(self) -> str: - return self._id - - @property - def description(self) -> str: - return self._description - - @property - def is_dir(self) -> bool: - return self._is_dir - - @property - @abstractmethod - def full_path(self) -> Path: ... - - @property - def full_path_str(self) -> str: - return str(self.full_path) - - def check_parents(self, must_exist: bool = False) -> bool: - for p in reversed(self.full_path.parents): - if not p.exists() and not must_exist: - return False - if not p.is_dir(): - raise AppPathError("Parents' path must be a dir.", self.full_path) - return True - - def check_self(self, must_exist: bool = False) -> bool: - if not self.check_parents(must_exist): - return False - if not self.full_path.exists(): - if not must_exist: - return False - raise AppPathError("Not exist.", self.full_path) - if self.is_dir: - if not self.full_path.is_dir(): - raise AppPathError("Should be a directory, but not.", self.full_path) - else: - return True - else: - if not self.full_path.is_file(): - raise AppPathError("Should be a file, but not.", self.full_path) - else: - return True - - def ensure(self, create_file: bool = False) -> None: - e = self.check_self(False) - if not e: - os.makedirs(self.full_path.parent, exist_ok=True) - if self.is_dir: - os.mkdir(self.full_path) - elif create_file: - with open(self.full_path, "w") as f: - f.write("") - - def read_text(self) -> str: - if self.is_dir: - raise AppPathError("Can't read text of a dir.", self.full_path) - self.check_self() - return self.full_path.read_text() - - def add_subpath( - self, - name: str, - is_dir: bool, - /, - id: str | None = None, - description: str = "", - ) -> AppFeaturePath: - return self.app._add_path(name, is_dir, self, id, description) - - @property - def app_relative_path(self) -> Path: - return self.full_path.relative_to(self.app.root.full_path) - - -class AppFeaturePath(AppPath): - def __init__( - self, - parent: AppPath, - name: str, - is_dir: bool, - /, - id: str | None = None, - description: str = "", - ) -> None: - super().__init__(id or name, is_dir, description) - self._name = name - self._parent = parent - - @property - def name(self) -> str: - return self._name - - @property - def parent(self) -> AppPath: - return self._parent - - @property - def app(self) -> AppBase: - return self.parent.app - - @property - def full_path(self) -> Path: - return Path(self.parent.full_path, self.name).resolve() - - -class AppRootPath(AppPath): - def __init__(self, app: AppBase, path: Path): - super().__init__(f"/{id}", True, f"Application {id} path.") - self._app = app - self._full_path = path.resolve() - - @property - def parent(self) -> None: - return None - - @property - def app(self) -> AppBase: - return self._app - - @property - def full_path(self) -> Path: - return self._full_path - - -class AppFeatureProvider(ABC): - def __init__(self, name: str, /, app: AppBase | None = None): - super().__init__() - self._name = name - self._app = app if app else AppBase.get_instance() - - @property - def app(self) -> AppBase: - return self._app - - @property - def name(self) -> str: - return self._name - - @abstractmethod - def setup(self) -> None: ... - - -class AppCommandFeatureProvider(AppFeatureProvider): - @abstractmethod - def get_command_info(self) -> tuple[str, str]: ... - - @abstractmethod - def setup_arg_parser(self, arg_parser: ArgumentParser): ... - - @abstractmethod - def run_command(self, args: Namespace) -> None: ... - - -class PathCommandProvider(AppCommandFeatureProvider): - def __init__(self) -> None: - super().__init__("path-command-provider") - - def setup(self): - pass - - def get_command_info(self): - return ("path", "Get information about paths used by app.") - - def setup_arg_parser(self, arg_parser: ArgumentParser) -> None: - subparsers = arg_parser.add_subparsers( - dest="path_command", required=True, metavar="PATH_COMMAND" - ) - _list_parser = subparsers.add_parser( - "list", help="list special paths used by app" - ) - - def run_command(self, args: Namespace) -> None: - if args.path_command == "list": - for path in self.app.paths: - print(f"{path.app_relative_path.as_posix()}: {path.description}") - - -class CommandDispatcher(AppFeatureProvider): - def __init__(self) -> None: - super().__init__("command-dispatcher") - - def setup_arg_parser(self) -> None: - self._map: dict[str, AppCommandFeatureProvider] = {} - arg_parser = argparse.ArgumentParser( - description="Service management", - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - subparsers = arg_parser.add_subparsers( - dest="command", - help="The management command to execute.", - metavar="COMMAND", - ) - for feature in self.app.features: - if isinstance(feature, AppCommandFeatureProvider): - info = feature.get_command_info() - command_subparser = subparsers.add_parser(info[0], help=info[1]) - feature.setup_arg_parser(command_subparser) - self._map[info[0]] = feature - self._arg_parser = arg_parser - - def setup(self): - self._parsed_args = self.arg_parser.parse_args() - - @property - def arg_parser(self) -> argparse.ArgumentParser: - return self._arg_parser - - @property - def command_map(self) -> dict[str, AppCommandFeatureProvider]: - return self._map - - @property - def program_args(self) -> argparse.Namespace: - return self._parsed_args - - def run_command(self) -> None: - args = self.program_args - if args.command is None: - self.arg_parser.print_help() - return - self.command_map[args.command].run_command(args) - - -class AppBase: - _instance: AppBase | None = None - - @staticmethod - def get_instance() -> AppBase: - if AppBase._instance is None: - raise AppError("App instance not initialized") - return AppBase._instance - - def __init__(self, app_id: str, name: str): - AppBase._instance = self - self._app_id = app_id - self._name = name - self._features: list[AppFeatureProvider] = [] - self._paths: list[AppFeaturePath] = [] - - def setup(self) -> None: - command_dispatcher = self.get_feature(CommandDispatcher) - command_dispatcher.setup_arg_parser() - self._root = AppRootPath(self, Path(self._ensure_env("CRUPEST_PROJECT_DIR"))) - self._data_dir = self._root.add_subpath( - self._ensure_env("CRUPEST_DATA_DIR"), True, id="data" - ) - self._services_dir = self._root.add_subpath( - self._ensure_env("CRUPEST_SERVICES_DIR"), True, id="CRUPEST_SERVICES_DIR" - ) - for feature in self.features: - feature.setup() - for path in self.paths: - path.check_self() - - @property - def app_id(self) -> str: - return self._app_id - - @property - def name(self) -> str: - return self._name - - def _ensure_env(self, env_name: str) -> str: - value = os.getenv(env_name) - if value is None: - raise AppError(f"Environment variable {env_name} not set") - return value - - @property - def root(self) -> AppRootPath: - return self._root - - @property - def data_dir(self) -> AppFeaturePath: - return self._data_dir - - @property - def services_dir(self) -> AppFeaturePath: - return self._services_dir - - @property - def app_initialized(self) -> bool: - return self.data_dir.check_self() - - @property - def features(self) -> list[AppFeatureProvider]: - return self._features - - @property - def paths(self) -> list[AppFeaturePath]: - return self._paths - - def add_feature(self, feature: _Feature) -> _Feature: - for f in self.features: - if f.name == feature.name: - raise AppFeatureError( - f"Duplicate feature name: {feature.name}.", feature.name - ) - self._features.append(feature) - return feature - - def _add_path( - self, - name: str, - is_dir: bool, - /, - parent: AppPath | None = None, - id: str | None = None, - description: str = "", - ) -> AppFeaturePath: - p = AppFeaturePath( - parent or self.root, name, is_dir, id=id, description=description - ) - self._paths.append(p) - return p - - @overload - def get_feature(self, feature: str) -> AppFeatureProvider: ... - - @overload - def get_feature(self, feature: type[_Feature]) -> _Feature: ... - - def get_feature( - self, feature: str | type[_Feature] - ) -> AppFeatureProvider | _Feature: - if isinstance(feature, str): - for f in self._features: - if f.name == feature: - return f - elif isinstance(feature, type): - for f in self._features: - if isinstance(f, feature): - return f - else: - raise CruLogicError("Argument must be the name of feature or its class.") - - raise AppFeatureError(f"Feature {feature} not found.", feature) - - def get_path(self, name: str) -> AppFeaturePath: - for p in self._paths: - if p.id == name or p.name == name: - return p - raise AppPathError(f"Application path {name} not found.", name) diff --git a/services/manager/service/_external.py b/services/manager/service/_external.py deleted file mode 100644 index 2347e95..0000000 --- a/services/manager/service/_external.py +++ /dev/null @@ -1,81 +0,0 @@ -from ._base import AppCommandFeatureProvider -from ._nginx import NginxManager - - -class CliToolCommandProvider(AppCommandFeatureProvider): - def __init__(self) -> None: - super().__init__("cli-tool-command-provider") - - def setup(self): - pass - - def get_command_info(self): - return ("gen-cli", "Get commands of running external cli tools.") - - def setup_arg_parser(self, arg_parser): - subparsers = arg_parser.add_subparsers( - dest="gen_cli_command", required=True, metavar="GEN_CLI_COMMAND" - ) - certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") - certbot_parser.add_argument( - "-t", "--test", action="store_true", help="run certbot in test mode" - ) - _install_docker_parser = subparsers.add_parser( - "install-docker", help="print docker installation commands" - ) - _update_blog_parser = subparsers.add_parser( - "update-blog", help="print blog update command" - ) - - def _print_install_docker_commands(self) -> None: - output = """ -### COMMAND: uninstall apt docker -for pkg in docker.io docker-doc docker-compose \ -podman-docker containerd runc; \ -do sudo apt-get remove $pkg; done - -### COMMAND: prepare apt certs -sudo apt-get update -sudo apt-get install ca-certificates curl -sudo install -m 0755 -d /etc/apt/keyrings - -### COMMAND: install certs -sudo curl -fsSL https://download.docker.com/linux/debian/gpg \ --o /etc/apt/keyrings/docker.asc -sudo chmod a+r /etc/apt/keyrings/docker.asc - -### COMMAND: add docker apt source -echo \\ - "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] \ -https://download.docker.com/linux/debian \\ - $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \\ - sudo tee /etc/apt/sources.list.d/docker.list > /dev/null - -### COMMAND: update apt and install docker -sudo apt-get update -sudo apt-get install docker-ce docker-ce-cli containerd.io \ -docker-buildx-plugin docker-compose-plugin - -### COMMAND: setup system for docker -sudo systemctl enable docker -sudo systemctl start docker -sudo groupadd -f docker -sudo usermod -aG docker $USER -# Remember to log out and log back in for the group changes to take effect -""".strip() - print(output) - - def _print_update_blog_command(self): - output = """ -### COMMAND: update blog -docker exec -it blog /scripts/update.bash -""".strip() - print(output) - - def run_command(self, args): - if args.gen_cli_command == "certbot": - self.app.get_feature(NginxManager).print_all_certbot_commands(args.test) - elif args.gen_cli_command == "install-docker": - self._print_install_docker_commands() - elif args.gen_cli_command == "update-blog": - self._print_update_blog_command()
\ No newline at end of file diff --git a/services/manager/service/_nginx.py b/services/manager/service/_nginx.py deleted file mode 100644 index 5dfc3ab..0000000 --- a/services/manager/service/_nginx.py +++ /dev/null @@ -1,263 +0,0 @@ -from argparse import Namespace -from enum import Enum, auto -import re -import subprocess -from typing import TypeAlias - -from manager import CruInternalError - -from ._base import AppCommandFeatureProvider -from ._template import TemplateManager - - -class CertbotAction(Enum): - CREATE = auto() - EXPAND = auto() - SHRINK = auto() - RENEW = auto() - - -class NginxManager(AppCommandFeatureProvider): - CertbotAction: TypeAlias = CertbotAction - - def __init__(self) -> None: - super().__init__("nginx-manager") - self._domains_cache: list[str] | None = None - - def setup(self) -> None: - pass - - @property - def _template_manager(self) -> TemplateManager: - return self.app.get_feature(TemplateManager) - - @property - def root_domain(self) -> str: - return self._template_manager.get_domain() - - @property - def domains(self) -> list[str]: - if self._domains_cache is None: - self._domains_cache = self._get_domains() - return self._domains_cache - - @property - def subdomains(self) -> list[str]: - suffix = "." + self.root_domain - return [d[: -len(suffix)] for d in self.domains if d.endswith(suffix)] - - def _get_domains_from_text(self, text: str) -> set[str]: - domains: set[str] = set() - regex = re.compile(r"server_name\s+(\S+)\s*;") - for match in regex.finditer(text): - domains.add(match[1]) - return domains - - def _join_generated_nginx_conf_text(self) -> str: - result = "" - for path, text in self._template_manager.generate(): - if path.parents[-1] == "nginx": - result += text - return result - - def _get_domains(self) -> list[str]: - text = self._join_generated_nginx_conf_text() - domains = self._get_domains_from_text(text) - domains.remove(self.root_domain) - return [self.root_domain, *domains] - - def _print_domains(self) -> None: - for domain in self.domains: - print(domain) - - def _certbot_command( - self, - action: CertbotAction | str, - test: bool, - *, - docker=True, - standalone=None, - email=None, - agree_tos=True, - ) -> str: - if isinstance(action, str): - action = CertbotAction[action.upper()] - - command_args = [] - - add_domain_option = True - if action is CertbotAction.CREATE: - if standalone is None: - standalone = True - command_action = "certonly" - elif action in [CertbotAction.EXPAND, CertbotAction.SHRINK]: - if standalone is None: - standalone = False - command_action = "certonly" - elif action is CertbotAction.RENEW: - if standalone is None: - standalone = False - add_domain_option = False - command_action = "renew" - else: - raise CruInternalError("Invalid certbot action.") - - data_dir = self.app.data_dir.full_path.as_posix() - - if not docker: - command_args.append("certbot") - else: - command_args.extend( - [ - "docker run -it --rm --name certbot", - f'-v "{data_dir}/certbot/certs:/etc/letsencrypt"', - f'-v "{data_dir}/certbot/data:/var/lib/letsencrypt"', - ] - ) - if standalone: - command_args.append('-p "0.0.0.0:80:80"') - else: - command_args.append(f'-v "{data_dir}/certbot/webroot:/var/www/certbot"') - - command_args.append("certbot/certbot") - - command_args.append(command_action) - - command_args.append(f"--cert-name {self.root_domain}") - - if standalone: - command_args.append("--standalone") - else: - command_args.append("--webroot -w /var/www/certbot") - - if add_domain_option: - command_args.append(" ".join([f"-d {domain}" for domain in self.domains])) - - if email is not None: - command_args.append(f"--email {email}") - - if agree_tos: - command_args.append("--agree-tos") - - if test: - command_args.append("--test-cert --dry-run") - - return " ".join(command_args) - - def print_all_certbot_commands(self, test: bool): - print("### COMMAND: (standalone) create certs") - print( - self._certbot_command( - CertbotAction.CREATE, - test, - email=self._template_manager.get_email(), - ) - ) - print() - print("### COMMAND: (webroot+nginx) expand or shrink certs") - print( - self._certbot_command( - CertbotAction.EXPAND, - test, - email=self._template_manager.get_email(), - ) - ) - print() - print("### COMMAND: (webroot+nginx) renew certs") - print( - self._certbot_command( - CertbotAction.RENEW, - test, - email=self._template_manager.get_email(), - ) - ) - - @property - def _cert_path_str(self) -> str: - return str( - self.app.data_dir.full_path - / "certbot/certs/live" - / self.root_domain - / "fullchain.pem" - ) - - def get_command_info(self): - return "nginx", "Manage nginx related things." - - def setup_arg_parser(self, arg_parser): - subparsers = arg_parser.add_subparsers( - dest="nginx_command", required=True, metavar="NGINX_COMMAND" - ) - _list_parser = subparsers.add_parser("list", help="list domains") - certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") - certbot_parser.add_argument( - "--no-test", - action="store_true", - help="remove args making certbot run in test mode", - ) - - def run_command(self, args: Namespace) -> None: - if args.nginx_command == "list": - self._print_domains() - elif args.nginx_command == "certbot": - self.print_all_certbot_commands(not args.no_test) - - def _generate_dns_zone( - self, - ip: str, - /, - ttl: str | int = 600, - *, - enable_mail: bool = True, - dkim: str | None = None, - ) -> str: - # TODO: Not complete and test now. - root_domain = self.root_domain - result = f"$ORIGIN {root_domain}.\n\n" - result += "; A records\n" - result += f"@ {ttl} IN A {ip}\n" - for subdomain in self.subdomains: - result += f"{subdomain} {ttl} IN A {ip}\n" - - if enable_mail: - result += "\n; MX records\n" - result += f"@ {ttl} IN MX 10 mail.{root_domain}.\n" - result += "\n; SPF record\n" - result += f'@ {ttl} IN TXT "v=spf1 mx ~all"\n' - if dkim is not None: - result += "\n; DKIM record\n" - result += f'mail._domainkey {ttl} IN TEXT "{dkim}"' - result += "\n; DMARC record\n" - dmarc_options = [ - "v=DMARC1", - "p=none", - f"rua=mailto:dmarc.report@{root_domain}", - f"ruf=mailto:dmarc.report@{root_domain}", - "sp=none", - "ri=86400", - ] - result += f'_dmarc {ttl} IN TXT "{"; ".join(dmarc_options)}"\n' - return result - - def _get_dkim_from_mailserver(self) -> str | None: - # TODO: Not complete and test now. - dkim_path = ( - self.app.data_dir.full_path - / "dms/config/opendkim/keys" - / self.root_domain - / "mail.txt" - ) - if not dkim_path.exists(): - return None - - p = subprocess.run(["sudo", "cat", dkim_path], capture_output=True, check=True) - value = "" - for match in re.finditer('"(.*)"', p.stdout.decode("utf-8")): - value += match.group(1) - return value - - def _generate_dns_zone_with_dkim(self, ip: str, /, ttl: str | int = 600) -> str: - # TODO: Not complete and test now. - return self._generate_dns_zone( - ip, ttl, enable_mail=True, dkim=self._get_dkim_from_mailserver() - ) diff --git a/services/manager/service/_template.py b/services/manager/service/_template.py deleted file mode 100644 index 90c19ec..0000000 --- a/services/manager/service/_template.py +++ /dev/null @@ -1,228 +0,0 @@ -from argparse import Namespace -from pathlib import Path -import shutil -from typing import NamedTuple -import graphlib - -from manager import CruException -from manager.parsing import SimpleLineVarParser -from manager.template import TemplateTree, CruStrWrapperTemplate - -from ._base import AppCommandFeatureProvider, AppFeaturePath - - -class _Config(NamedTuple): - text: str - config: dict[str, str] - - -class _GeneratedConfig(NamedTuple): - base: _Config - private: _Config - merged: _Config - - -class _PreConfig(NamedTuple): - base: _Config - private: _Config - config: dict[str, str] - - @staticmethod - def create(base: _Config, private: _Config) -> "_PreConfig": - return _PreConfig(base, private, {**base.config, **private.config}) - - def _merge(self, generated: _Config): - text = ( - "\n".join( - [ - self.private.text.strip(), - self.base.text.strip(), - generated.text.strip(), - ] - ) - + "\n" - ) - config = {**self.config, **generated.config} - return _GeneratedConfig(self.base, self.private, _Config(text, config)) - - -class _Template(NamedTuple): - config: CruStrWrapperTemplate - config_vars: set[str] - tree: TemplateTree - - -class TemplateManager(AppCommandFeatureProvider): - def __init__(self): - super().__init__("template-manager") - - def setup(self) -> None: - self._base_config_file = self.app.services_dir.add_subpath("base-config", False) - self._private_config_file = self.app.data_dir.add_subpath("config", False) - self._template_config_file = self.app.services_dir.add_subpath( - "config.template", False - ) - self._templates_dir = self.app.services_dir.add_subpath("templates", True) - self._generated_dir = self.app.services_dir.add_subpath("generated", True) - - self._config_parser = SimpleLineVarParser() - - def _read_pre(app_path: AppFeaturePath) -> _Config: - text = app_path.read_text() - config = self._read_config(text) - return _Config(text, config) - - base = _read_pre(self._base_config_file) - private = _read_pre(self._private_config_file) - self._preconfig = _PreConfig.create(base, private) - - self._generated: _GeneratedConfig | None = None - - template_config_text = self._template_config_file.read_text() - self._template_config = self._read_config(template_config_text) - - self._template = _Template( - CruStrWrapperTemplate(template_config_text), - set(self._template_config.keys()), - TemplateTree( - lambda text: CruStrWrapperTemplate(text), - self.templates_dir.full_path_str, - ), - ) - - self._real_required_vars = ( - self._template.config_vars | self._template.tree.variables - ) - self._template.config_vars - lacks = self._real_required_vars - self._preconfig.config.keys() - self._lack_vars = lacks if len(lacks) > 0 else None - - def _read_config_entry_names(self, text: str) -> set[str]: - return set(entry.key for entry in self._config_parser.parse(text)) - - def _read_config(self, text: str) -> dict[str, str]: - return {entry.key: entry.value for entry in self._config_parser.parse(text)} - - @property - def templates_dir(self) -> AppFeaturePath: - return self._templates_dir - - @property - def generated_dir(self) -> AppFeaturePath: - return self._generated_dir - - def get_domain(self) -> str: - return self._preconfig.config["CRUPEST_DOMAIN"] - - def get_email(self) -> str: - return self._preconfig.config["CRUPEST_EMAIL"] - - def _generate_template_config(self, config: dict[str, str]) -> dict[str, str]: - entry_templates = { - key: CruStrWrapperTemplate(value) - for key, value in self._template_config.items() - } - sorter = graphlib.TopologicalSorter( - config - | {key: template.variables for key, template in entry_templates.items()} - ) - - vars: dict[str, str] = config.copy() - for _ in sorter.static_order(): - del_keys = [] - for key, template in entry_templates.items(): - new = template.generate_partial(vars) - if not new.has_variables: - vars[key] = new.generate({}) - del_keys.append(key) - else: - entry_templates[key] = new - for key in del_keys: - del entry_templates[key] - assert len(entry_templates) == 0 - return {key: value for key, value in vars.items() if key not in config} - - def _generate_config(self) -> _GeneratedConfig: - if self._generated is not None: - return self._generated - if self._lack_vars is not None: - raise CruException(f"Required vars are not defined: {self._lack_vars}.") - config = self._generate_template_config(self._preconfig.config) - text = self._template.config.generate(self._preconfig.config | config) - self._generated = self._preconfig._merge(_Config(text, config)) - return self._generated - - def generate(self) -> list[tuple[Path, str]]: - config = self._generate_config() - return [ - (Path("config"), config.merged.text), - *self._template.tree.generate(config.merged.config), - ] - - def _generate_files(self, dry_run: bool) -> None: - result = self.generate() - if not dry_run: - if self.generated_dir.full_path.exists(): - shutil.rmtree(self.generated_dir.full_path) - for path, text in result: - des = self.generated_dir.full_path / path - des.parent.mkdir(parents=True, exist_ok=True) - with open(des, "w") as f: - f.write(text) - - def get_command_info(self): - return ("template", "Manage templates.") - - def _print_file_lists(self) -> None: - print(f"[{self._template.config.variable_count}]", "config") - for path, template in self._template.tree.templates: - print(f"[{template.variable_count}]", path.as_posix()) - - def _print_vars(self, required: bool) -> None: - for var in self._template.config.variables: - print(f"[config] {var}") - for var in self._template.tree.variables: - if not (required and var in self._template.config_vars): - print(f"[template] {var}") - - def _run_check_vars(self) -> None: - if self._lack_vars is not None: - print("Lacks:") - for var in self._lack_vars: - print(var) - - def setup_arg_parser(self, arg_parser): - subparsers = arg_parser.add_subparsers( - dest="template_command", required=True, metavar="TEMPLATE_COMMAND" - ) - _list_parser = subparsers.add_parser("list", help="list templates") - vars_parser = subparsers.add_parser( - "vars", help="list variables used in all templates" - ) - vars_parser.add_argument( - "-r", - "--required", - help="only list really required one.", - action="store_true", - ) - _check_vars_parser = subparsers.add_parser( - "check-vars", - help="check if required vars are set", - ) - generate_parser = subparsers.add_parser("generate", help="generate templates") - generate_parser.add_argument( - "--no-dry-run", action="store_true", help="generate and write target files" - ) - - def run_command(self, args: Namespace) -> None: - if args.template_command == "list": - self._print_file_lists() - elif args.template_command == "vars": - self._print_vars(args.required) - elif args.template_command == "generate": - dry_run = not args.no_dry_run - self._generate_files(dry_run) - if dry_run: - print("Dry run successfully.") - print( - f"Will delete dir {self.generated_dir.full_path_str} if it exists." - ) |