aboutsummaryrefslogtreecommitdiff
path: root/services/manager/service
diff options
context:
space:
mode:
Diffstat (limited to 'services/manager/service')
-rw-r--r--services/manager/service/__init__.py0
-rw-r--r--services/manager/service/__main__.py27
-rw-r--r--services/manager/service/_app.py30
-rw-r--r--services/manager/service/_base.py398
-rw-r--r--services/manager/service/_external.py81
-rw-r--r--services/manager/service/_nginx.py263
-rw-r--r--services/manager/service/_template.py228
7 files changed, 0 insertions, 1027 deletions
diff --git a/services/manager/service/__init__.py b/services/manager/service/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/services/manager/service/__init__.py
+++ /dev/null
diff --git a/services/manager/service/__main__.py b/services/manager/service/__main__.py
deleted file mode 100644
index 6ea0a8a..0000000
--- a/services/manager/service/__main__.py
+++ /dev/null
@@ -1,27 +0,0 @@
-import sys
-
-from manager import CruException
-
-from ._app import create_app
-
-
-def main():
- app = create_app()
- app.run_command()
-
-
-if __name__ == "__main__":
- version_info = sys.version_info
- if not (version_info.major == 3 and version_info.minor >= 11):
- print("This application requires Python 3.11 or later.", file=sys.stderr)
- sys.exit(1)
-
- try:
- main()
- except CruException as e:
- user_message = e.get_user_message()
- if user_message is not None:
- print(f"Error: {user_message}")
- exit(1)
- else:
- raise
diff --git a/services/manager/service/_app.py b/services/manager/service/_app.py
deleted file mode 100644
index 2304340..0000000
--- a/services/manager/service/_app.py
+++ /dev/null
@@ -1,30 +0,0 @@
-from ._base import (
- AppBase,
- CommandDispatcher,
- PathCommandProvider,
-)
-from ._template import TemplateManager
-from ._nginx import NginxManager
-from ._external import CliToolCommandProvider
-
-APP_ID = "crupest"
-
-
-class App(AppBase):
- def __init__(self):
- super().__init__(APP_ID, f"{APP_ID}-service")
- self.add_feature(PathCommandProvider())
- self.add_feature(TemplateManager())
- self.add_feature(NginxManager())
- self.add_feature(CliToolCommandProvider())
- self.add_feature(CommandDispatcher())
-
- def run_command(self):
- command_dispatcher = self.get_feature(CommandDispatcher)
- command_dispatcher.run_command()
-
-
-def create_app() -> App:
- app = App()
- app.setup()
- return app
diff --git a/services/manager/service/_base.py b/services/manager/service/_base.py
deleted file mode 100644
index 783296c..0000000
--- a/services/manager/service/_base.py
+++ /dev/null
@@ -1,398 +0,0 @@
-from __future__ import annotations
-
-from argparse import ArgumentParser, Namespace
-from abc import ABC, abstractmethod
-import argparse
-import os
-from pathlib import Path
-from typing import TypeVar, overload
-
-from manager import CruException, CruLogicError
-
-_Feature = TypeVar("_Feature", bound="AppFeatureProvider")
-
-
-class AppError(CruException):
- pass
-
-
-class AppFeatureError(AppError):
- def __init__(self, message, feature: type | str, *args, **kwargs):
- super().__init__(message, *args, **kwargs)
- self._feature = feature
-
- @property
- def feature(self) -> type | str:
- return self._feature
-
-
-class AppPathError(CruException):
- def __init__(self, message, _path: str | Path, *args, **kwargs):
- super().__init__(message, *args, **kwargs)
- self._path = str(_path)
-
- @property
- def path(self) -> str:
- return self._path
-
-
-class AppPath(ABC):
- def __init__(self, id: str, is_dir: bool, description: str) -> None:
- self._is_dir = is_dir
- self._id = id
- self._description = description
-
- @property
- @abstractmethod
- def parent(self) -> AppPath | None: ...
-
- @property
- @abstractmethod
- def app(self) -> AppBase: ...
-
- @property
- def id(self) -> str:
- return self._id
-
- @property
- def description(self) -> str:
- return self._description
-
- @property
- def is_dir(self) -> bool:
- return self._is_dir
-
- @property
- @abstractmethod
- def full_path(self) -> Path: ...
-
- @property
- def full_path_str(self) -> str:
- return str(self.full_path)
-
- def check_parents(self, must_exist: bool = False) -> bool:
- for p in reversed(self.full_path.parents):
- if not p.exists() and not must_exist:
- return False
- if not p.is_dir():
- raise AppPathError("Parents' path must be a dir.", self.full_path)
- return True
-
- def check_self(self, must_exist: bool = False) -> bool:
- if not self.check_parents(must_exist):
- return False
- if not self.full_path.exists():
- if not must_exist:
- return False
- raise AppPathError("Not exist.", self.full_path)
- if self.is_dir:
- if not self.full_path.is_dir():
- raise AppPathError("Should be a directory, but not.", self.full_path)
- else:
- return True
- else:
- if not self.full_path.is_file():
- raise AppPathError("Should be a file, but not.", self.full_path)
- else:
- return True
-
- def ensure(self, create_file: bool = False) -> None:
- e = self.check_self(False)
- if not e:
- os.makedirs(self.full_path.parent, exist_ok=True)
- if self.is_dir:
- os.mkdir(self.full_path)
- elif create_file:
- with open(self.full_path, "w") as f:
- f.write("")
-
- def read_text(self) -> str:
- if self.is_dir:
- raise AppPathError("Can't read text of a dir.", self.full_path)
- self.check_self()
- return self.full_path.read_text()
-
- def add_subpath(
- self,
- name: str,
- is_dir: bool,
- /,
- id: str | None = None,
- description: str = "",
- ) -> AppFeaturePath:
- return self.app._add_path(name, is_dir, self, id, description)
-
- @property
- def app_relative_path(self) -> Path:
- return self.full_path.relative_to(self.app.root.full_path)
-
-
-class AppFeaturePath(AppPath):
- def __init__(
- self,
- parent: AppPath,
- name: str,
- is_dir: bool,
- /,
- id: str | None = None,
- description: str = "",
- ) -> None:
- super().__init__(id or name, is_dir, description)
- self._name = name
- self._parent = parent
-
- @property
- def name(self) -> str:
- return self._name
-
- @property
- def parent(self) -> AppPath:
- return self._parent
-
- @property
- def app(self) -> AppBase:
- return self.parent.app
-
- @property
- def full_path(self) -> Path:
- return Path(self.parent.full_path, self.name).resolve()
-
-
-class AppRootPath(AppPath):
- def __init__(self, app: AppBase, path: Path):
- super().__init__(f"/{id}", True, f"Application {id} path.")
- self._app = app
- self._full_path = path.resolve()
-
- @property
- def parent(self) -> None:
- return None
-
- @property
- def app(self) -> AppBase:
- return self._app
-
- @property
- def full_path(self) -> Path:
- return self._full_path
-
-
-class AppFeatureProvider(ABC):
- def __init__(self, name: str, /, app: AppBase | None = None):
- super().__init__()
- self._name = name
- self._app = app if app else AppBase.get_instance()
-
- @property
- def app(self) -> AppBase:
- return self._app
-
- @property
- def name(self) -> str:
- return self._name
-
- @abstractmethod
- def setup(self) -> None: ...
-
-
-class AppCommandFeatureProvider(AppFeatureProvider):
- @abstractmethod
- def get_command_info(self) -> tuple[str, str]: ...
-
- @abstractmethod
- def setup_arg_parser(self, arg_parser: ArgumentParser): ...
-
- @abstractmethod
- def run_command(self, args: Namespace) -> None: ...
-
-
-class PathCommandProvider(AppCommandFeatureProvider):
- def __init__(self) -> None:
- super().__init__("path-command-provider")
-
- def setup(self):
- pass
-
- def get_command_info(self):
- return ("path", "Get information about paths used by app.")
-
- def setup_arg_parser(self, arg_parser: ArgumentParser) -> None:
- subparsers = arg_parser.add_subparsers(
- dest="path_command", required=True, metavar="PATH_COMMAND"
- )
- _list_parser = subparsers.add_parser(
- "list", help="list special paths used by app"
- )
-
- def run_command(self, args: Namespace) -> None:
- if args.path_command == "list":
- for path in self.app.paths:
- print(f"{path.app_relative_path.as_posix()}: {path.description}")
-
-
-class CommandDispatcher(AppFeatureProvider):
- def __init__(self) -> None:
- super().__init__("command-dispatcher")
-
- def setup_arg_parser(self) -> None:
- self._map: dict[str, AppCommandFeatureProvider] = {}
- arg_parser = argparse.ArgumentParser(
- description="Service management",
- formatter_class=argparse.RawDescriptionHelpFormatter,
- )
- subparsers = arg_parser.add_subparsers(
- dest="command",
- help="The management command to execute.",
- metavar="COMMAND",
- )
- for feature in self.app.features:
- if isinstance(feature, AppCommandFeatureProvider):
- info = feature.get_command_info()
- command_subparser = subparsers.add_parser(info[0], help=info[1])
- feature.setup_arg_parser(command_subparser)
- self._map[info[0]] = feature
- self._arg_parser = arg_parser
-
- def setup(self):
- self._parsed_args = self.arg_parser.parse_args()
-
- @property
- def arg_parser(self) -> argparse.ArgumentParser:
- return self._arg_parser
-
- @property
- def command_map(self) -> dict[str, AppCommandFeatureProvider]:
- return self._map
-
- @property
- def program_args(self) -> argparse.Namespace:
- return self._parsed_args
-
- def run_command(self) -> None:
- args = self.program_args
- if args.command is None:
- self.arg_parser.print_help()
- return
- self.command_map[args.command].run_command(args)
-
-
-class AppBase:
- _instance: AppBase | None = None
-
- @staticmethod
- def get_instance() -> AppBase:
- if AppBase._instance is None:
- raise AppError("App instance not initialized")
- return AppBase._instance
-
- def __init__(self, app_id: str, name: str):
- AppBase._instance = self
- self._app_id = app_id
- self._name = name
- self._features: list[AppFeatureProvider] = []
- self._paths: list[AppFeaturePath] = []
-
- def setup(self) -> None:
- command_dispatcher = self.get_feature(CommandDispatcher)
- command_dispatcher.setup_arg_parser()
- self._root = AppRootPath(self, Path(self._ensure_env("CRUPEST_PROJECT_DIR")))
- self._data_dir = self._root.add_subpath(
- self._ensure_env("CRUPEST_DATA_DIR"), True, id="data"
- )
- self._services_dir = self._root.add_subpath(
- self._ensure_env("CRUPEST_SERVICES_DIR"), True, id="CRUPEST_SERVICES_DIR"
- )
- for feature in self.features:
- feature.setup()
- for path in self.paths:
- path.check_self()
-
- @property
- def app_id(self) -> str:
- return self._app_id
-
- @property
- def name(self) -> str:
- return self._name
-
- def _ensure_env(self, env_name: str) -> str:
- value = os.getenv(env_name)
- if value is None:
- raise AppError(f"Environment variable {env_name} not set")
- return value
-
- @property
- def root(self) -> AppRootPath:
- return self._root
-
- @property
- def data_dir(self) -> AppFeaturePath:
- return self._data_dir
-
- @property
- def services_dir(self) -> AppFeaturePath:
- return self._services_dir
-
- @property
- def app_initialized(self) -> bool:
- return self.data_dir.check_self()
-
- @property
- def features(self) -> list[AppFeatureProvider]:
- return self._features
-
- @property
- def paths(self) -> list[AppFeaturePath]:
- return self._paths
-
- def add_feature(self, feature: _Feature) -> _Feature:
- for f in self.features:
- if f.name == feature.name:
- raise AppFeatureError(
- f"Duplicate feature name: {feature.name}.", feature.name
- )
- self._features.append(feature)
- return feature
-
- def _add_path(
- self,
- name: str,
- is_dir: bool,
- /,
- parent: AppPath | None = None,
- id: str | None = None,
- description: str = "",
- ) -> AppFeaturePath:
- p = AppFeaturePath(
- parent or self.root, name, is_dir, id=id, description=description
- )
- self._paths.append(p)
- return p
-
- @overload
- def get_feature(self, feature: str) -> AppFeatureProvider: ...
-
- @overload
- def get_feature(self, feature: type[_Feature]) -> _Feature: ...
-
- def get_feature(
- self, feature: str | type[_Feature]
- ) -> AppFeatureProvider | _Feature:
- if isinstance(feature, str):
- for f in self._features:
- if f.name == feature:
- return f
- elif isinstance(feature, type):
- for f in self._features:
- if isinstance(f, feature):
- return f
- else:
- raise CruLogicError("Argument must be the name of feature or its class.")
-
- raise AppFeatureError(f"Feature {feature} not found.", feature)
-
- def get_path(self, name: str) -> AppFeaturePath:
- for p in self._paths:
- if p.id == name or p.name == name:
- return p
- raise AppPathError(f"Application path {name} not found.", name)
diff --git a/services/manager/service/_external.py b/services/manager/service/_external.py
deleted file mode 100644
index 2347e95..0000000
--- a/services/manager/service/_external.py
+++ /dev/null
@@ -1,81 +0,0 @@
-from ._base import AppCommandFeatureProvider
-from ._nginx import NginxManager
-
-
-class CliToolCommandProvider(AppCommandFeatureProvider):
- def __init__(self) -> None:
- super().__init__("cli-tool-command-provider")
-
- def setup(self):
- pass
-
- def get_command_info(self):
- return ("gen-cli", "Get commands of running external cli tools.")
-
- def setup_arg_parser(self, arg_parser):
- subparsers = arg_parser.add_subparsers(
- dest="gen_cli_command", required=True, metavar="GEN_CLI_COMMAND"
- )
- certbot_parser = subparsers.add_parser("certbot", help="print certbot commands")
- certbot_parser.add_argument(
- "-t", "--test", action="store_true", help="run certbot in test mode"
- )
- _install_docker_parser = subparsers.add_parser(
- "install-docker", help="print docker installation commands"
- )
- _update_blog_parser = subparsers.add_parser(
- "update-blog", help="print blog update command"
- )
-
- def _print_install_docker_commands(self) -> None:
- output = """
-### COMMAND: uninstall apt docker
-for pkg in docker.io docker-doc docker-compose \
-podman-docker containerd runc; \
-do sudo apt-get remove $pkg; done
-
-### COMMAND: prepare apt certs
-sudo apt-get update
-sudo apt-get install ca-certificates curl
-sudo install -m 0755 -d /etc/apt/keyrings
-
-### COMMAND: install certs
-sudo curl -fsSL https://download.docker.com/linux/debian/gpg \
--o /etc/apt/keyrings/docker.asc
-sudo chmod a+r /etc/apt/keyrings/docker.asc
-
-### COMMAND: add docker apt source
-echo \\
- "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] \
-https://download.docker.com/linux/debian \\
- $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \\
- sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
-
-### COMMAND: update apt and install docker
-sudo apt-get update
-sudo apt-get install docker-ce docker-ce-cli containerd.io \
-docker-buildx-plugin docker-compose-plugin
-
-### COMMAND: setup system for docker
-sudo systemctl enable docker
-sudo systemctl start docker
-sudo groupadd -f docker
-sudo usermod -aG docker $USER
-# Remember to log out and log back in for the group changes to take effect
-""".strip()
- print(output)
-
- def _print_update_blog_command(self):
- output = """
-### COMMAND: update blog
-docker exec -it blog /scripts/update.bash
-""".strip()
- print(output)
-
- def run_command(self, args):
- if args.gen_cli_command == "certbot":
- self.app.get_feature(NginxManager).print_all_certbot_commands(args.test)
- elif args.gen_cli_command == "install-docker":
- self._print_install_docker_commands()
- elif args.gen_cli_command == "update-blog":
- self._print_update_blog_command() \ No newline at end of file
diff --git a/services/manager/service/_nginx.py b/services/manager/service/_nginx.py
deleted file mode 100644
index 5dfc3ab..0000000
--- a/services/manager/service/_nginx.py
+++ /dev/null
@@ -1,263 +0,0 @@
-from argparse import Namespace
-from enum import Enum, auto
-import re
-import subprocess
-from typing import TypeAlias
-
-from manager import CruInternalError
-
-from ._base import AppCommandFeatureProvider
-from ._template import TemplateManager
-
-
-class CertbotAction(Enum):
- CREATE = auto()
- EXPAND = auto()
- SHRINK = auto()
- RENEW = auto()
-
-
-class NginxManager(AppCommandFeatureProvider):
- CertbotAction: TypeAlias = CertbotAction
-
- def __init__(self) -> None:
- super().__init__("nginx-manager")
- self._domains_cache: list[str] | None = None
-
- def setup(self) -> None:
- pass
-
- @property
- def _template_manager(self) -> TemplateManager:
- return self.app.get_feature(TemplateManager)
-
- @property
- def root_domain(self) -> str:
- return self._template_manager.get_domain()
-
- @property
- def domains(self) -> list[str]:
- if self._domains_cache is None:
- self._domains_cache = self._get_domains()
- return self._domains_cache
-
- @property
- def subdomains(self) -> list[str]:
- suffix = "." + self.root_domain
- return [d[: -len(suffix)] for d in self.domains if d.endswith(suffix)]
-
- def _get_domains_from_text(self, text: str) -> set[str]:
- domains: set[str] = set()
- regex = re.compile(r"server_name\s+(\S+)\s*;")
- for match in regex.finditer(text):
- domains.add(match[1])
- return domains
-
- def _join_generated_nginx_conf_text(self) -> str:
- result = ""
- for path, text in self._template_manager.generate():
- if path.parents[-1] == "nginx":
- result += text
- return result
-
- def _get_domains(self) -> list[str]:
- text = self._join_generated_nginx_conf_text()
- domains = self._get_domains_from_text(text)
- domains.remove(self.root_domain)
- return [self.root_domain, *domains]
-
- def _print_domains(self) -> None:
- for domain in self.domains:
- print(domain)
-
- def _certbot_command(
- self,
- action: CertbotAction | str,
- test: bool,
- *,
- docker=True,
- standalone=None,
- email=None,
- agree_tos=True,
- ) -> str:
- if isinstance(action, str):
- action = CertbotAction[action.upper()]
-
- command_args = []
-
- add_domain_option = True
- if action is CertbotAction.CREATE:
- if standalone is None:
- standalone = True
- command_action = "certonly"
- elif action in [CertbotAction.EXPAND, CertbotAction.SHRINK]:
- if standalone is None:
- standalone = False
- command_action = "certonly"
- elif action is CertbotAction.RENEW:
- if standalone is None:
- standalone = False
- add_domain_option = False
- command_action = "renew"
- else:
- raise CruInternalError("Invalid certbot action.")
-
- data_dir = self.app.data_dir.full_path.as_posix()
-
- if not docker:
- command_args.append("certbot")
- else:
- command_args.extend(
- [
- "docker run -it --rm --name certbot",
- f'-v "{data_dir}/certbot/certs:/etc/letsencrypt"',
- f'-v "{data_dir}/certbot/data:/var/lib/letsencrypt"',
- ]
- )
- if standalone:
- command_args.append('-p "0.0.0.0:80:80"')
- else:
- command_args.append(f'-v "{data_dir}/certbot/webroot:/var/www/certbot"')
-
- command_args.append("certbot/certbot")
-
- command_args.append(command_action)
-
- command_args.append(f"--cert-name {self.root_domain}")
-
- if standalone:
- command_args.append("--standalone")
- else:
- command_args.append("--webroot -w /var/www/certbot")
-
- if add_domain_option:
- command_args.append(" ".join([f"-d {domain}" for domain in self.domains]))
-
- if email is not None:
- command_args.append(f"--email {email}")
-
- if agree_tos:
- command_args.append("--agree-tos")
-
- if test:
- command_args.append("--test-cert --dry-run")
-
- return " ".join(command_args)
-
- def print_all_certbot_commands(self, test: bool):
- print("### COMMAND: (standalone) create certs")
- print(
- self._certbot_command(
- CertbotAction.CREATE,
- test,
- email=self._template_manager.get_email(),
- )
- )
- print()
- print("### COMMAND: (webroot+nginx) expand or shrink certs")
- print(
- self._certbot_command(
- CertbotAction.EXPAND,
- test,
- email=self._template_manager.get_email(),
- )
- )
- print()
- print("### COMMAND: (webroot+nginx) renew certs")
- print(
- self._certbot_command(
- CertbotAction.RENEW,
- test,
- email=self._template_manager.get_email(),
- )
- )
-
- @property
- def _cert_path_str(self) -> str:
- return str(
- self.app.data_dir.full_path
- / "certbot/certs/live"
- / self.root_domain
- / "fullchain.pem"
- )
-
- def get_command_info(self):
- return "nginx", "Manage nginx related things."
-
- def setup_arg_parser(self, arg_parser):
- subparsers = arg_parser.add_subparsers(
- dest="nginx_command", required=True, metavar="NGINX_COMMAND"
- )
- _list_parser = subparsers.add_parser("list", help="list domains")
- certbot_parser = subparsers.add_parser("certbot", help="print certbot commands")
- certbot_parser.add_argument(
- "--no-test",
- action="store_true",
- help="remove args making certbot run in test mode",
- )
-
- def run_command(self, args: Namespace) -> None:
- if args.nginx_command == "list":
- self._print_domains()
- elif args.nginx_command == "certbot":
- self.print_all_certbot_commands(not args.no_test)
-
- def _generate_dns_zone(
- self,
- ip: str,
- /,
- ttl: str | int = 600,
- *,
- enable_mail: bool = True,
- dkim: str | None = None,
- ) -> str:
- # TODO: Not complete and test now.
- root_domain = self.root_domain
- result = f"$ORIGIN {root_domain}.\n\n"
- result += "; A records\n"
- result += f"@ {ttl} IN A {ip}\n"
- for subdomain in self.subdomains:
- result += f"{subdomain} {ttl} IN A {ip}\n"
-
- if enable_mail:
- result += "\n; MX records\n"
- result += f"@ {ttl} IN MX 10 mail.{root_domain}.\n"
- result += "\n; SPF record\n"
- result += f'@ {ttl} IN TXT "v=spf1 mx ~all"\n'
- if dkim is not None:
- result += "\n; DKIM record\n"
- result += f'mail._domainkey {ttl} IN TEXT "{dkim}"'
- result += "\n; DMARC record\n"
- dmarc_options = [
- "v=DMARC1",
- "p=none",
- f"rua=mailto:dmarc.report@{root_domain}",
- f"ruf=mailto:dmarc.report@{root_domain}",
- "sp=none",
- "ri=86400",
- ]
- result += f'_dmarc {ttl} IN TXT "{"; ".join(dmarc_options)}"\n'
- return result
-
- def _get_dkim_from_mailserver(self) -> str | None:
- # TODO: Not complete and test now.
- dkim_path = (
- self.app.data_dir.full_path
- / "dms/config/opendkim/keys"
- / self.root_domain
- / "mail.txt"
- )
- if not dkim_path.exists():
- return None
-
- p = subprocess.run(["sudo", "cat", dkim_path], capture_output=True, check=True)
- value = ""
- for match in re.finditer('"(.*)"', p.stdout.decode("utf-8")):
- value += match.group(1)
- return value
-
- def _generate_dns_zone_with_dkim(self, ip: str, /, ttl: str | int = 600) -> str:
- # TODO: Not complete and test now.
- return self._generate_dns_zone(
- ip, ttl, enable_mail=True, dkim=self._get_dkim_from_mailserver()
- )
diff --git a/services/manager/service/_template.py b/services/manager/service/_template.py
deleted file mode 100644
index 90c19ec..0000000
--- a/services/manager/service/_template.py
+++ /dev/null
@@ -1,228 +0,0 @@
-from argparse import Namespace
-from pathlib import Path
-import shutil
-from typing import NamedTuple
-import graphlib
-
-from manager import CruException
-from manager.parsing import SimpleLineVarParser
-from manager.template import TemplateTree, CruStrWrapperTemplate
-
-from ._base import AppCommandFeatureProvider, AppFeaturePath
-
-
-class _Config(NamedTuple):
- text: str
- config: dict[str, str]
-
-
-class _GeneratedConfig(NamedTuple):
- base: _Config
- private: _Config
- merged: _Config
-
-
-class _PreConfig(NamedTuple):
- base: _Config
- private: _Config
- config: dict[str, str]
-
- @staticmethod
- def create(base: _Config, private: _Config) -> "_PreConfig":
- return _PreConfig(base, private, {**base.config, **private.config})
-
- def _merge(self, generated: _Config):
- text = (
- "\n".join(
- [
- self.private.text.strip(),
- self.base.text.strip(),
- generated.text.strip(),
- ]
- )
- + "\n"
- )
- config = {**self.config, **generated.config}
- return _GeneratedConfig(self.base, self.private, _Config(text, config))
-
-
-class _Template(NamedTuple):
- config: CruStrWrapperTemplate
- config_vars: set[str]
- tree: TemplateTree
-
-
-class TemplateManager(AppCommandFeatureProvider):
- def __init__(self):
- super().__init__("template-manager")
-
- def setup(self) -> None:
- self._base_config_file = self.app.services_dir.add_subpath("base-config", False)
- self._private_config_file = self.app.data_dir.add_subpath("config", False)
- self._template_config_file = self.app.services_dir.add_subpath(
- "config.template", False
- )
- self._templates_dir = self.app.services_dir.add_subpath("templates", True)
- self._generated_dir = self.app.services_dir.add_subpath("generated", True)
-
- self._config_parser = SimpleLineVarParser()
-
- def _read_pre(app_path: AppFeaturePath) -> _Config:
- text = app_path.read_text()
- config = self._read_config(text)
- return _Config(text, config)
-
- base = _read_pre(self._base_config_file)
- private = _read_pre(self._private_config_file)
- self._preconfig = _PreConfig.create(base, private)
-
- self._generated: _GeneratedConfig | None = None
-
- template_config_text = self._template_config_file.read_text()
- self._template_config = self._read_config(template_config_text)
-
- self._template = _Template(
- CruStrWrapperTemplate(template_config_text),
- set(self._template_config.keys()),
- TemplateTree(
- lambda text: CruStrWrapperTemplate(text),
- self.templates_dir.full_path_str,
- ),
- )
-
- self._real_required_vars = (
- self._template.config_vars | self._template.tree.variables
- ) - self._template.config_vars
- lacks = self._real_required_vars - self._preconfig.config.keys()
- self._lack_vars = lacks if len(lacks) > 0 else None
-
- def _read_config_entry_names(self, text: str) -> set[str]:
- return set(entry.key for entry in self._config_parser.parse(text))
-
- def _read_config(self, text: str) -> dict[str, str]:
- return {entry.key: entry.value for entry in self._config_parser.parse(text)}
-
- @property
- def templates_dir(self) -> AppFeaturePath:
- return self._templates_dir
-
- @property
- def generated_dir(self) -> AppFeaturePath:
- return self._generated_dir
-
- def get_domain(self) -> str:
- return self._preconfig.config["CRUPEST_DOMAIN"]
-
- def get_email(self) -> str:
- return self._preconfig.config["CRUPEST_EMAIL"]
-
- def _generate_template_config(self, config: dict[str, str]) -> dict[str, str]:
- entry_templates = {
- key: CruStrWrapperTemplate(value)
- for key, value in self._template_config.items()
- }
- sorter = graphlib.TopologicalSorter(
- config
- | {key: template.variables for key, template in entry_templates.items()}
- )
-
- vars: dict[str, str] = config.copy()
- for _ in sorter.static_order():
- del_keys = []
- for key, template in entry_templates.items():
- new = template.generate_partial(vars)
- if not new.has_variables:
- vars[key] = new.generate({})
- del_keys.append(key)
- else:
- entry_templates[key] = new
- for key in del_keys:
- del entry_templates[key]
- assert len(entry_templates) == 0
- return {key: value for key, value in vars.items() if key not in config}
-
- def _generate_config(self) -> _GeneratedConfig:
- if self._generated is not None:
- return self._generated
- if self._lack_vars is not None:
- raise CruException(f"Required vars are not defined: {self._lack_vars}.")
- config = self._generate_template_config(self._preconfig.config)
- text = self._template.config.generate(self._preconfig.config | config)
- self._generated = self._preconfig._merge(_Config(text, config))
- return self._generated
-
- def generate(self) -> list[tuple[Path, str]]:
- config = self._generate_config()
- return [
- (Path("config"), config.merged.text),
- *self._template.tree.generate(config.merged.config),
- ]
-
- def _generate_files(self, dry_run: bool) -> None:
- result = self.generate()
- if not dry_run:
- if self.generated_dir.full_path.exists():
- shutil.rmtree(self.generated_dir.full_path)
- for path, text in result:
- des = self.generated_dir.full_path / path
- des.parent.mkdir(parents=True, exist_ok=True)
- with open(des, "w") as f:
- f.write(text)
-
- def get_command_info(self):
- return ("template", "Manage templates.")
-
- def _print_file_lists(self) -> None:
- print(f"[{self._template.config.variable_count}]", "config")
- for path, template in self._template.tree.templates:
- print(f"[{template.variable_count}]", path.as_posix())
-
- def _print_vars(self, required: bool) -> None:
- for var in self._template.config.variables:
- print(f"[config] {var}")
- for var in self._template.tree.variables:
- if not (required and var in self._template.config_vars):
- print(f"[template] {var}")
-
- def _run_check_vars(self) -> None:
- if self._lack_vars is not None:
- print("Lacks:")
- for var in self._lack_vars:
- print(var)
-
- def setup_arg_parser(self, arg_parser):
- subparsers = arg_parser.add_subparsers(
- dest="template_command", required=True, metavar="TEMPLATE_COMMAND"
- )
- _list_parser = subparsers.add_parser("list", help="list templates")
- vars_parser = subparsers.add_parser(
- "vars", help="list variables used in all templates"
- )
- vars_parser.add_argument(
- "-r",
- "--required",
- help="only list really required one.",
- action="store_true",
- )
- _check_vars_parser = subparsers.add_parser(
- "check-vars",
- help="check if required vars are set",
- )
- generate_parser = subparsers.add_parser("generate", help="generate templates")
- generate_parser.add_argument(
- "--no-dry-run", action="store_true", help="generate and write target files"
- )
-
- def run_command(self, args: Namespace) -> None:
- if args.template_command == "list":
- self._print_file_lists()
- elif args.template_command == "vars":
- self._print_vars(args.required)
- elif args.template_command == "generate":
- dry_run = not args.no_dry_run
- self._generate_files(dry_run)
- if dry_run:
- print("Dry run successfully.")
- print(
- f"Will delete dir {self.generated_dir.full_path_str} if it exists."
- )