aboutsummaryrefslogtreecommitdiff
path: root/python/cru/service
diff options
context:
space:
mode:
Diffstat (limited to 'python/cru/service')
-rw-r--r--python/cru/service/__init__.py0
-rw-r--r--python/cru/service/__main__.py27
-rw-r--r--python/cru/service/_app.py30
-rw-r--r--python/cru/service/_base.py400
-rw-r--r--python/cru/service/_gen_cmd.py200
-rw-r--r--python/cru/service/_nginx.py263
-rw-r--r--python/cru/service/_template.py228
7 files changed, 1148 insertions, 0 deletions
diff --git a/python/cru/service/__init__.py b/python/cru/service/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/python/cru/service/__init__.py
diff --git a/python/cru/service/__main__.py b/python/cru/service/__main__.py
new file mode 100644
index 0000000..2a0268b
--- /dev/null
+++ b/python/cru/service/__main__.py
@@ -0,0 +1,27 @@
+import sys
+
+from cru import CruException
+
+from ._app import create_app
+
+
+def main():
+ app = create_app()
+ app.run_command()
+
+
+if __name__ == "__main__":
+ version_info = sys.version_info
+ if not (version_info.major == 3 and version_info.minor >= 11):
+ print("This application requires Python 3.11 or later.", file=sys.stderr)
+ sys.exit(1)
+
+ try:
+ main()
+ except CruException as e:
+ user_message = e.get_user_message()
+ if user_message is not None:
+ print(f"Error: {user_message}")
+ exit(1)
+ else:
+ raise
diff --git a/python/cru/service/_app.py b/python/cru/service/_app.py
new file mode 100644
index 0000000..b4c6271
--- /dev/null
+++ b/python/cru/service/_app.py
@@ -0,0 +1,30 @@
+from ._base import (
+ AppBase,
+ CommandDispatcher,
+ PathCommandProvider,
+)
+from ._template import TemplateManager
+from ._nginx import NginxManager
+from ._gen_cmd import GenCmdProvider
+
+APP_ID = "crupest"
+
+
+class App(AppBase):
+ def __init__(self):
+ super().__init__(APP_ID, f"{APP_ID}-service")
+ self.add_feature(PathCommandProvider())
+ self.add_feature(TemplateManager())
+ self.add_feature(NginxManager())
+ self.add_feature(GenCmdProvider())
+ self.add_feature(CommandDispatcher())
+
+ def run_command(self):
+ command_dispatcher = self.get_feature(CommandDispatcher)
+ command_dispatcher.run_command()
+
+
+def create_app() -> App:
+ app = App()
+ app.setup()
+ return app
diff --git a/python/cru/service/_base.py b/python/cru/service/_base.py
new file mode 100644
index 0000000..e1eee70
--- /dev/null
+++ b/python/cru/service/_base.py
@@ -0,0 +1,400 @@
+from __future__ import annotations
+
+from argparse import ArgumentParser, Namespace
+from abc import ABC, abstractmethod
+import argparse
+import os
+from pathlib import Path
+from typing import TypeVar, overload
+
+from cru import CruException, CruLogicError
+
+_Feature = TypeVar("_Feature", bound="AppFeatureProvider")
+
+
+class AppError(CruException):
+ pass
+
+
+class AppFeatureError(AppError):
+ def __init__(self, message, feature: type | str, *args, **kwargs):
+ super().__init__(message, *args, **kwargs)
+ self._feature = feature
+
+ @property
+ def feature(self) -> type | str:
+ return self._feature
+
+
+class AppPathError(CruException):
+ def __init__(self, message, _path: str | Path, *args, **kwargs):
+ super().__init__(message, *args, **kwargs)
+ self._path = str(_path)
+
+ @property
+ def path(self) -> str:
+ return self._path
+
+
+class AppPath(ABC):
+ def __init__(self, id: str, is_dir: bool, description: str) -> None:
+ self._is_dir = is_dir
+ self._id = id
+ self._description = description
+
+ @property
+ @abstractmethod
+ def parent(self) -> AppPath | None: ...
+
+ @property
+ @abstractmethod
+ def app(self) -> AppBase: ...
+
+ @property
+ def id(self) -> str:
+ return self._id
+
+ @property
+ def description(self) -> str:
+ return self._description
+
+ @property
+ def is_dir(self) -> bool:
+ return self._is_dir
+
+ @property
+ @abstractmethod
+ def full_path(self) -> Path: ...
+
+ @property
+ def full_path_str(self) -> str:
+ return str(self.full_path)
+
+ def check_parents(self, must_exist: bool = False) -> bool:
+ for p in reversed(self.full_path.parents):
+ if not p.exists() and not must_exist:
+ return False
+ if not p.is_dir():
+ raise AppPathError("Parents' path must be a dir.", self.full_path)
+ return True
+
+ def check_self(self, must_exist: bool = False) -> bool:
+ if not self.check_parents(must_exist):
+ return False
+ if not self.full_path.exists():
+ if not must_exist:
+ return False
+ raise AppPathError("Not exist.", self.full_path)
+ if self.is_dir:
+ if not self.full_path.is_dir():
+ raise AppPathError("Should be a directory, but not.", self.full_path)
+ else:
+ return True
+ else:
+ if not self.full_path.is_file():
+ raise AppPathError("Should be a file, but not.", self.full_path)
+ else:
+ return True
+
+ def ensure(self, create_file: bool = False) -> None:
+ e = self.check_self(False)
+ if not e:
+ os.makedirs(self.full_path.parent, exist_ok=True)
+ if self.is_dir:
+ os.mkdir(self.full_path)
+ elif create_file:
+ with open(self.full_path, "w") as f:
+ f.write("")
+
+ def read_text(self) -> str:
+ if self.is_dir:
+ raise AppPathError("Can't read text of a dir.", self.full_path)
+ self.check_self()
+ return self.full_path.read_text()
+
+ def add_subpath(
+ self,
+ name: str,
+ is_dir: bool,
+ /,
+ id: str | None = None,
+ description: str = "",
+ ) -> AppFeaturePath:
+ return self.app._add_path(name, is_dir, self, id, description)
+
+ @property
+ def app_relative_path(self) -> Path:
+ return self.full_path.relative_to(self.app.root.full_path)
+
+
+class AppFeaturePath(AppPath):
+ def __init__(
+ self,
+ parent: AppPath,
+ name: str,
+ is_dir: bool,
+ /,
+ id: str | None = None,
+ description: str = "",
+ ) -> None:
+ super().__init__(id or name, is_dir, description)
+ self._name = name
+ self._parent = parent
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ @property
+ def parent(self) -> AppPath:
+ return self._parent
+
+ @property
+ def app(self) -> AppBase:
+ return self.parent.app
+
+ @property
+ def full_path(self) -> Path:
+ return Path(self.parent.full_path, self.name).resolve()
+
+
+class AppRootPath(AppPath):
+ def __init__(self, app: AppBase, path: Path):
+ super().__init__(f"/{id}", True, f"Application {id} root path.")
+ self._app = app
+ self._full_path = path.resolve()
+
+ @property
+ def parent(self) -> None:
+ return None
+
+ @property
+ def app(self) -> AppBase:
+ return self._app
+
+ @property
+ def full_path(self) -> Path:
+ return self._full_path
+
+
+class AppFeatureProvider(ABC):
+ def __init__(self, name: str, /, app: AppBase | None = None):
+ super().__init__()
+ self._name = name
+ self._app = app if app else AppBase.get_instance()
+
+ @property
+ def app(self) -> AppBase:
+ return self._app
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ @abstractmethod
+ def setup(self) -> None: ...
+
+
+class AppCommandFeatureProvider(AppFeatureProvider):
+ @abstractmethod
+ def get_command_info(self) -> tuple[str, str]: ...
+
+ @abstractmethod
+ def setup_arg_parser(self, arg_parser: ArgumentParser): ...
+
+ @abstractmethod
+ def run_command(self, args: Namespace) -> None: ...
+
+
+class PathCommandProvider(AppCommandFeatureProvider):
+ def __init__(self) -> None:
+ super().__init__("path-command-provider")
+
+ def setup(self):
+ pass
+
+ def get_command_info(self):
+ return ("path", "Get information about paths used by app.")
+
+ def setup_arg_parser(self, arg_parser: ArgumentParser) -> None:
+ subparsers = arg_parser.add_subparsers(
+ dest="path_command", metavar="PATH_COMMAND"
+ )
+ _list_parser = subparsers.add_parser(
+ "list", help="list special paths used by app"
+ )
+
+ def run_command(self, args: Namespace) -> None:
+ if args.path_command is None or args.path_command == "list":
+ for path in self.app.paths:
+ print(
+ f"{path.app_relative_path.as_posix()}{'/' if path.is_dir else ''}: {path.description}"
+ )
+
+
+class CommandDispatcher(AppFeatureProvider):
+ def __init__(self) -> None:
+ super().__init__("command-dispatcher")
+
+ def setup_arg_parser(self) -> None:
+ self._map: dict[str, AppCommandFeatureProvider] = {}
+ arg_parser = argparse.ArgumentParser(
+ description="Service management",
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ )
+ subparsers = arg_parser.add_subparsers(
+ dest="command",
+ help="The management command to execute.",
+ metavar="COMMAND",
+ )
+ for feature in self.app.features:
+ if isinstance(feature, AppCommandFeatureProvider):
+ info = feature.get_command_info()
+ command_subparser = subparsers.add_parser(info[0], help=info[1])
+ feature.setup_arg_parser(command_subparser)
+ self._map[info[0]] = feature
+ self._arg_parser = arg_parser
+
+ def setup(self):
+ self._parsed_args = self.arg_parser.parse_args()
+
+ @property
+ def arg_parser(self) -> argparse.ArgumentParser:
+ return self._arg_parser
+
+ @property
+ def command_map(self) -> dict[str, AppCommandFeatureProvider]:
+ return self._map
+
+ @property
+ def program_args(self) -> argparse.Namespace:
+ return self._parsed_args
+
+ def run_command(self) -> None:
+ args = self.program_args
+ if args.command is None:
+ self.arg_parser.print_help()
+ return
+ self.command_map[args.command].run_command(args)
+
+
+class AppBase:
+ _instance: AppBase | None = None
+
+ @staticmethod
+ def get_instance() -> AppBase:
+ if AppBase._instance is None:
+ raise AppError("App instance not initialized")
+ return AppBase._instance
+
+ def __init__(self, app_id: str, name: str):
+ AppBase._instance = self
+ self._app_id = app_id
+ self._name = name
+ self._features: list[AppFeatureProvider] = []
+ self._paths: list[AppFeaturePath] = []
+
+ def setup(self) -> None:
+ command_dispatcher = self.get_feature(CommandDispatcher)
+ command_dispatcher.setup_arg_parser()
+ self._root = AppRootPath(self, Path(self._ensure_env("CRUPEST_PROJECT_DIR")))
+ self._data_dir = self._root.add_subpath(
+ self._ensure_env("CRUPEST_DATA_DIR"), True, id="data"
+ )
+ self._services_dir = self._root.add_subpath(
+ self._ensure_env("CRUPEST_SERVICES_DIR"), True, id="CRUPEST_SERVICES_DIR"
+ )
+ for feature in self.features:
+ feature.setup()
+ for path in self.paths:
+ path.check_self()
+
+ @property
+ def app_id(self) -> str:
+ return self._app_id
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ def _ensure_env(self, env_name: str) -> str:
+ value = os.getenv(env_name)
+ if value is None:
+ raise AppError(f"Environment variable {env_name} not set")
+ return value
+
+ @property
+ def root(self) -> AppRootPath:
+ return self._root
+
+ @property
+ def data_dir(self) -> AppFeaturePath:
+ return self._data_dir
+
+ @property
+ def services_dir(self) -> AppFeaturePath:
+ return self._services_dir
+
+ @property
+ def app_initialized(self) -> bool:
+ return self.data_dir.check_self()
+
+ @property
+ def features(self) -> list[AppFeatureProvider]:
+ return self._features
+
+ @property
+ def paths(self) -> list[AppFeaturePath]:
+ return self._paths
+
+ def add_feature(self, feature: _Feature) -> _Feature:
+ for f in self.features:
+ if f.name == feature.name:
+ raise AppFeatureError(
+ f"Duplicate feature name: {feature.name}.", feature.name
+ )
+ self._features.append(feature)
+ return feature
+
+ def _add_path(
+ self,
+ name: str,
+ is_dir: bool,
+ /,
+ parent: AppPath | None = None,
+ id: str | None = None,
+ description: str = "",
+ ) -> AppFeaturePath:
+ p = AppFeaturePath(
+ parent or self.root, name, is_dir, id=id, description=description
+ )
+ self._paths.append(p)
+ return p
+
+ @overload
+ def get_feature(self, feature: str) -> AppFeatureProvider: ...
+
+ @overload
+ def get_feature(self, feature: type[_Feature]) -> _Feature: ...
+
+ def get_feature(
+ self, feature: str | type[_Feature]
+ ) -> AppFeatureProvider | _Feature:
+ if isinstance(feature, str):
+ for f in self._features:
+ if f.name == feature:
+ return f
+ elif isinstance(feature, type):
+ for f in self._features:
+ if isinstance(f, feature):
+ return f
+ else:
+ raise CruLogicError("Argument must be the name of feature or its class.")
+
+ raise AppFeatureError(f"Feature {feature} not found.", feature)
+
+ def get_path(self, name: str) -> AppFeaturePath:
+ for p in self._paths:
+ if p.id == name or p.name == name:
+ return p
+ raise AppPathError(f"Application path {name} not found.", name)
diff --git a/python/cru/service/_gen_cmd.py b/python/cru/service/_gen_cmd.py
new file mode 100644
index 0000000..f51d65f
--- /dev/null
+++ b/python/cru/service/_gen_cmd.py
@@ -0,0 +1,200 @@
+from dataclasses import dataclass, replace
+from typing import TypeAlias
+
+from ._base import AppCommandFeatureProvider
+from ._nginx import NginxManager
+
+_Str_Or_Cmd_List: TypeAlias = str | list["_Cmd"]
+
+
+@dataclass
+class _Cmd:
+ name: str
+ desc: str
+ cmd: _Str_Or_Cmd_List
+
+ def clean(self) -> "_Cmd":
+ if isinstance(self.cmd, list):
+ return replace(
+ self,
+ cmd=[cmd.clean() for cmd in self.cmd],
+ )
+ elif isinstance(self.cmd, str):
+ return replace(self, cmd=self.cmd.strip())
+ else:
+ raise ValueError("Unexpected type for cmd.")
+
+ def generate_text(
+ self,
+ info_only: bool,
+ *,
+ parent: str | None = None,
+ ) -> str:
+ if parent is None:
+ tag = "COMMAND"
+ long_name = self.name
+ indent = ""
+ else:
+ tag = "SUBCOMMAND"
+ long_name = f"{parent}.{self.name}"
+ indent = " "
+
+ if info_only:
+ return f"{indent}[{long_name}]: {self.desc}"
+
+ text = f"--- {tag}[{long_name}]: {self.desc}"
+ if isinstance(self.cmd, str):
+ text += "\n" + self.cmd
+ elif isinstance(self.cmd, list):
+ for sub in self.cmd:
+ text += "\n" * 2 + sub.generate_text(info_only, parent=self.name)
+ else:
+ raise ValueError("Unexpected type for cmd.")
+
+ lines: list[str] = []
+ for line in text.splitlines():
+ if len(line) == 0:
+ lines.append("")
+ else:
+ lines.append(indent + line)
+ text = "\n".join(lines)
+
+ return text
+
+
+_docker_uninstall = _Cmd(
+ "uninstall",
+ "uninstall apt docker",
+ """
+for pkg in docker.io docker-doc docker-compose \
+podman-docker containerd runc; \
+do sudo apt-get remove $pkg; done
+""",
+)
+
+_docker_apt_certs = _Cmd(
+ "apt-certs",
+ "prepare apt certs",
+ """
+sudo apt-get update
+sudo apt-get install ca-certificates curl
+sudo install -m 0755 -d /etc/apt/keyrings
+""",
+)
+
+_docker_docker_certs = _Cmd(
+ "docker-certs",
+ "add docker certs",
+ """
+sudo curl -fsSL https://download.docker.com/linux/debian/gpg \
+-o /etc/apt/keyrings/docker.asc
+sudo chmod a+r /etc/apt/keyrings/docker.asc
+""",
+)
+
+_docker_apt_repo = _Cmd(
+ "apt-repo",
+ "add docker apt repo",
+ """
+echo \\
+ "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] \
+https://download.docker.com/linux/debian \\
+ $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \\
+ sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
+""",
+)
+
+_docker_install = _Cmd(
+ "install",
+ "update apt and install docker",
+ """
+sudo apt-get update
+sudo apt-get install docker-ce docker-ce-cli containerd.io \
+docker-buildx-plugin docker-compose-plugin
+""",
+)
+
+_docker_setup = _Cmd(
+ "setup",
+ "setup system for docker",
+ """
+sudo systemctl enable docker
+sudo systemctl start docker
+sudo groupadd -f docker
+sudo usermod -aG docker $USER
+# Remember to log out and log back in for the group changes to take effect
+""",
+)
+
+_docker = _Cmd(
+ "install-docker",
+ "install docker for a fresh new system",
+ [
+ _docker_uninstall,
+ _docker_apt_certs,
+ _docker_docker_certs,
+ _docker_apt_repo,
+ _docker_install,
+ _docker_setup,
+ ],
+)
+
+_update_blog = _Cmd(
+ "update-blog",
+ "re-generate blog pages",
+ """
+docker exec -it blog /scripts/update.bash
+""",
+)
+
+_git_user = _Cmd(
+ "git-user",
+ "add/set git server user and password",
+ """
+docker run -it --rm -v "$ps_file:/user-info" httpd htpasswd "/user-info" [username]
+""",
+)
+
+
+class GenCmdProvider(AppCommandFeatureProvider):
+ def __init__(self) -> None:
+ super().__init__("gen-cmd-provider")
+ self._cmds: dict[str, _Cmd] = {}
+ self._register_cmds(_docker, _update_blog, _git_user)
+
+ def _register_cmd(self, cmd: "_Cmd"):
+ self._cmds[cmd.name] = cmd.clean()
+
+ def _register_cmds(self, *cmds: "_Cmd"):
+ for c in cmds:
+ self._register_cmd(c)
+
+ def setup(self):
+ pass
+
+ def get_command_info(self):
+ return ("gen-cmd", "Get commands of running external cli tools.")
+
+ def setup_arg_parser(self, arg_parser):
+ subparsers = arg_parser.add_subparsers(
+ dest="gen_cmd", metavar="GEN_CMD_COMMAND"
+ )
+ certbot_parser = subparsers.add_parser("certbot", help="print certbot commands")
+ certbot_parser.add_argument(
+ "-t", "--test", action="store_true", help="run certbot in test mode"
+ )
+ for cmd in self._cmds.values():
+ subparsers.add_parser(cmd.name, help=cmd.desc)
+
+ def _print_cmd(self, name: str):
+ print(self._cmds[name].generate_text(False))
+
+ def run_command(self, args):
+ if args.gen_cmd is None or args.gen_cmd == "list":
+ print("[certbot]: certbot ssl cert commands")
+ for cmd in self._cmds.values():
+ print(cmd.generate_text(True))
+ elif args.gen_cmd == "certbot":
+ self.app.get_feature(NginxManager).print_all_certbot_commands(args.test)
+ else:
+ self._print_cmd(args.gen_cmd)
diff --git a/python/cru/service/_nginx.py b/python/cru/service/_nginx.py
new file mode 100644
index 0000000..87cff6d
--- /dev/null
+++ b/python/cru/service/_nginx.py
@@ -0,0 +1,263 @@
+from argparse import Namespace
+from enum import Enum, auto
+import re
+import subprocess
+from typing import TypeAlias
+
+from cru import CruInternalError
+
+from ._base import AppCommandFeatureProvider
+from ._template import TemplateManager
+
+
+class CertbotAction(Enum):
+ CREATE = auto()
+ EXPAND = auto()
+ SHRINK = auto()
+ RENEW = auto()
+
+
+class NginxManager(AppCommandFeatureProvider):
+ CertbotAction: TypeAlias = CertbotAction
+
+ def __init__(self) -> None:
+ super().__init__("nginx-manager")
+ self._domains_cache: list[str] | None = None
+
+ def setup(self) -> None:
+ pass
+
+ @property
+ def _template_manager(self) -> TemplateManager:
+ return self.app.get_feature(TemplateManager)
+
+ @property
+ def root_domain(self) -> str:
+ return self._template_manager.get_domain()
+
+ @property
+ def domains(self) -> list[str]:
+ if self._domains_cache is None:
+ self._domains_cache = self._get_domains()
+ return self._domains_cache
+
+ @property
+ def subdomains(self) -> list[str]:
+ suffix = "." + self.root_domain
+ return [d[: -len(suffix)] for d in self.domains if d.endswith(suffix)]
+
+ def _get_domains_from_text(self, text: str) -> set[str]:
+ domains: set[str] = set()
+ regex = re.compile(r"server_name\s+(\S+)\s*;")
+ for match in regex.finditer(text):
+ domains.add(match[1])
+ return domains
+
+ def _join_generated_nginx_conf_text(self) -> str:
+ result = ""
+ for path, text in self._template_manager.generate():
+ if "nginx" in str(path):
+ result += text
+ return result
+
+ def _get_domains(self) -> list[str]:
+ text = self._join_generated_nginx_conf_text()
+ domains = self._get_domains_from_text(text)
+ domains.remove(self.root_domain)
+ return [self.root_domain, *domains]
+
+ def _print_domains(self) -> None:
+ for domain in self.domains:
+ print(domain)
+
+ def _certbot_command(
+ self,
+ action: CertbotAction | str,
+ test: bool,
+ *,
+ docker=True,
+ standalone=None,
+ email=None,
+ agree_tos=True,
+ ) -> str:
+ if isinstance(action, str):
+ action = CertbotAction[action.upper()]
+
+ command_args = []
+
+ add_domain_option = True
+ if action is CertbotAction.CREATE:
+ if standalone is None:
+ standalone = True
+ command_action = "certonly"
+ elif action in [CertbotAction.EXPAND, CertbotAction.SHRINK]:
+ if standalone is None:
+ standalone = False
+ command_action = "certonly"
+ elif action is CertbotAction.RENEW:
+ if standalone is None:
+ standalone = False
+ add_domain_option = False
+ command_action = "renew"
+ else:
+ raise CruInternalError("Invalid certbot action.")
+
+ data_dir = self.app.data_dir.full_path.as_posix()
+
+ if not docker:
+ command_args.append("certbot")
+ else:
+ command_args.extend(
+ [
+ "docker run -it --rm --name certbot",
+ f'-v "{data_dir}/certbot/certs:/etc/letsencrypt"',
+ f'-v "{data_dir}/certbot/data:/var/lib/letsencrypt"',
+ ]
+ )
+ if standalone:
+ command_args.append('-p "0.0.0.0:80:80"')
+ else:
+ command_args.append(f'-v "{data_dir}/certbot/webroot:/var/www/certbot"')
+
+ command_args.append("certbot/certbot")
+
+ command_args.append(command_action)
+
+ command_args.append(f"--cert-name {self.root_domain}")
+
+ if standalone:
+ command_args.append("--standalone")
+ else:
+ command_args.append("--webroot -w /var/www/certbot")
+
+ if add_domain_option:
+ command_args.append(" ".join([f"-d {domain}" for domain in self.domains]))
+
+ if email is not None:
+ command_args.append(f"--email {email}")
+
+ if agree_tos:
+ command_args.append("--agree-tos")
+
+ if test:
+ command_args.append("--test-cert --dry-run")
+
+ return " ".join(command_args)
+
+ def print_all_certbot_commands(self, test: bool):
+ print("### COMMAND: (standalone) create certs")
+ print(
+ self._certbot_command(
+ CertbotAction.CREATE,
+ test,
+ email=self._template_manager.get_email(),
+ )
+ )
+ print()
+ print("### COMMAND: (webroot+nginx) expand or shrink certs")
+ print(
+ self._certbot_command(
+ CertbotAction.EXPAND,
+ test,
+ email=self._template_manager.get_email(),
+ )
+ )
+ print()
+ print("### COMMAND: (webroot+nginx) renew certs")
+ print(
+ self._certbot_command(
+ CertbotAction.RENEW,
+ test,
+ email=self._template_manager.get_email(),
+ )
+ )
+
+ @property
+ def _cert_path_str(self) -> str:
+ return str(
+ self.app.data_dir.full_path
+ / "certbot/certs/live"
+ / self.root_domain
+ / "fullchain.pem"
+ )
+
+ def get_command_info(self):
+ return "nginx", "Manage nginx related things."
+
+ def setup_arg_parser(self, arg_parser):
+ subparsers = arg_parser.add_subparsers(
+ dest="nginx_command", required=True, metavar="NGINX_COMMAND"
+ )
+ _list_parser = subparsers.add_parser("list", help="list domains")
+ certbot_parser = subparsers.add_parser("certbot", help="print certbot commands")
+ certbot_parser.add_argument(
+ "--no-test",
+ action="store_true",
+ help="remove args making certbot run in test mode",
+ )
+
+ def run_command(self, args: Namespace) -> None:
+ if args.nginx_command == "list":
+ self._print_domains()
+ elif args.nginx_command == "certbot":
+ self.print_all_certbot_commands(not args.no_test)
+
+ def _generate_dns_zone(
+ self,
+ ip: str,
+ /,
+ ttl: str | int = 600,
+ *,
+ enable_mail: bool = True,
+ dkim: str | None = None,
+ ) -> str:
+ # TODO: Not complete and test now.
+ root_domain = self.root_domain
+ result = f"$ORIGIN {root_domain}.\n\n"
+ result += "; A records\n"
+ result += f"@ {ttl} IN A {ip}\n"
+ for subdomain in self.subdomains:
+ result += f"{subdomain} {ttl} IN A {ip}\n"
+
+ if enable_mail:
+ result += "\n; MX records\n"
+ result += f"@ {ttl} IN MX 10 mail.{root_domain}.\n"
+ result += "\n; SPF record\n"
+ result += f'@ {ttl} IN TXT "v=spf1 mx ~all"\n'
+ if dkim is not None:
+ result += "\n; DKIM record\n"
+ result += f'mail._domainkey {ttl} IN TEXT "{dkim}"'
+ result += "\n; DMARC record\n"
+ dmarc_options = [
+ "v=DMARC1",
+ "p=none",
+ f"rua=mailto:dmarc.report@{root_domain}",
+ f"ruf=mailto:dmarc.report@{root_domain}",
+ "sp=none",
+ "ri=86400",
+ ]
+ result += f'_dmarc {ttl} IN TXT "{"; ".join(dmarc_options)}"\n'
+ return result
+
+ def _get_dkim_from_mailserver(self) -> str | None:
+ # TODO: Not complete and test now.
+ dkim_path = (
+ self.app.data_dir.full_path
+ / "dms/config/opendkim/keys"
+ / self.root_domain
+ / "mail.txt"
+ )
+ if not dkim_path.exists():
+ return None
+
+ p = subprocess.run(["sudo", "cat", dkim_path], capture_output=True, check=True)
+ value = ""
+ for match in re.finditer('"(.*)"', p.stdout.decode("utf-8")):
+ value += match.group(1)
+ return value
+
+ def _generate_dns_zone_with_dkim(self, ip: str, /, ttl: str | int = 600) -> str:
+ # TODO: Not complete and test now.
+ return self._generate_dns_zone(
+ ip, ttl, enable_mail=True, dkim=self._get_dkim_from_mailserver()
+ )
diff --git a/python/cru/service/_template.py b/python/cru/service/_template.py
new file mode 100644
index 0000000..22c1d21
--- /dev/null
+++ b/python/cru/service/_template.py
@@ -0,0 +1,228 @@
+from argparse import Namespace
+from pathlib import Path
+import shutil
+from typing import NamedTuple
+import graphlib
+
+from cru import CruException
+from cru.parsing import SimpleLineVarParser
+from cru.template import TemplateTree, CruStrWrapperTemplate
+
+from ._base import AppCommandFeatureProvider, AppFeaturePath
+
+
+class _Config(NamedTuple):
+ text: str
+ config: dict[str, str]
+
+
+class _GeneratedConfig(NamedTuple):
+ base: _Config
+ private: _Config
+ merged: _Config
+
+
+class _PreConfig(NamedTuple):
+ base: _Config
+ private: _Config
+ config: dict[str, str]
+
+ @staticmethod
+ def create(base: _Config, private: _Config) -> "_PreConfig":
+ return _PreConfig(base, private, {**base.config, **private.config})
+
+ def _merge(self, generated: _Config):
+ text = (
+ "\n".join(
+ [
+ self.private.text.strip(),
+ self.base.text.strip(),
+ generated.text.strip(),
+ ]
+ )
+ + "\n"
+ )
+ config = {**self.config, **generated.config}
+ return _GeneratedConfig(self.base, self.private, _Config(text, config))
+
+
+class _Template(NamedTuple):
+ config: CruStrWrapperTemplate
+ config_vars: set[str]
+ tree: TemplateTree
+
+
+class TemplateManager(AppCommandFeatureProvider):
+ def __init__(self):
+ super().__init__("template-manager")
+
+ def setup(self) -> None:
+ self._base_config_file = self.app.services_dir.add_subpath("base-config", False)
+ self._private_config_file = self.app.data_dir.add_subpath("config", False)
+ self._template_config_file = self.app.services_dir.add_subpath(
+ "config.template", False
+ )
+ self._templates_dir = self.app.services_dir.add_subpath("templates", True)
+ self._generated_dir = self.app.services_dir.add_subpath("generated", True)
+
+ self._config_parser = SimpleLineVarParser()
+
+ def _read_pre(app_path: AppFeaturePath) -> _Config:
+ text = app_path.read_text()
+ config = self._read_config(text)
+ return _Config(text, config)
+
+ base = _read_pre(self._base_config_file)
+ private = _read_pre(self._private_config_file)
+ self._preconfig = _PreConfig.create(base, private)
+
+ self._generated: _GeneratedConfig | None = None
+
+ template_config_text = self._template_config_file.read_text()
+ self._template_config = self._read_config(template_config_text)
+
+ self._template = _Template(
+ CruStrWrapperTemplate(template_config_text),
+ set(self._template_config.keys()),
+ TemplateTree(
+ lambda text: CruStrWrapperTemplate(text),
+ self.templates_dir.full_path_str,
+ ),
+ )
+
+ self._real_required_vars = (
+ self._template.config_vars | self._template.tree.variables
+ ) - self._template.config_vars
+ lacks = self._real_required_vars - self._preconfig.config.keys()
+ self._lack_vars = lacks if len(lacks) > 0 else None
+
+ def _read_config_entry_names(self, text: str) -> set[str]:
+ return set(entry.key for entry in self._config_parser.parse(text))
+
+ def _read_config(self, text: str) -> dict[str, str]:
+ return {entry.key: entry.value for entry in self._config_parser.parse(text)}
+
+ @property
+ def templates_dir(self) -> AppFeaturePath:
+ return self._templates_dir
+
+ @property
+ def generated_dir(self) -> AppFeaturePath:
+ return self._generated_dir
+
+ def get_domain(self) -> str:
+ return self._preconfig.config["CRUPEST_DOMAIN"]
+
+ def get_email(self) -> str:
+ return self._preconfig.config["CRUPEST_EMAIL"]
+
+ def _generate_template_config(self, config: dict[str, str]) -> dict[str, str]:
+ entry_templates = {
+ key: CruStrWrapperTemplate(value)
+ for key, value in self._template_config.items()
+ }
+ sorter = graphlib.TopologicalSorter(
+ config
+ | {key: template.variables for key, template in entry_templates.items()}
+ )
+
+ vars: dict[str, str] = config.copy()
+ for _ in sorter.static_order():
+ del_keys = []
+ for key, template in entry_templates.items():
+ new = template.generate_partial(vars)
+ if not new.has_variables:
+ vars[key] = new.generate({})
+ del_keys.append(key)
+ else:
+ entry_templates[key] = new
+ for key in del_keys:
+ del entry_templates[key]
+ assert len(entry_templates) == 0
+ return {key: value for key, value in vars.items() if key not in config}
+
+ def _generate_config(self) -> _GeneratedConfig:
+ if self._generated is not None:
+ return self._generated
+ if self._lack_vars is not None:
+ raise CruException(f"Required vars are not defined: {self._lack_vars}.")
+ config = self._generate_template_config(self._preconfig.config)
+ text = self._template.config.generate(self._preconfig.config | config)
+ self._generated = self._preconfig._merge(_Config(text, config))
+ return self._generated
+
+ def generate(self) -> list[tuple[Path, str]]:
+ config = self._generate_config()
+ return [
+ (Path("config"), config.merged.text),
+ *self._template.tree.generate(config.merged.config),
+ ]
+
+ def _generate_files(self, dry_run: bool) -> None:
+ result = self.generate()
+ if not dry_run:
+ if self.generated_dir.full_path.exists():
+ shutil.rmtree(self.generated_dir.full_path)
+ for path, text in result:
+ des = self.generated_dir.full_path / path
+ des.parent.mkdir(parents=True, exist_ok=True)
+ with open(des, "w") as f:
+ f.write(text)
+
+ def get_command_info(self):
+ return ("template", "Manage templates.")
+
+ def _print_file_lists(self) -> None:
+ print(f"[{self._template.config.variable_count}]", "config")
+ for path, template in self._template.tree.templates:
+ print(f"[{template.variable_count}]", path.as_posix())
+
+ def _print_vars(self, required: bool) -> None:
+ for var in self._template.config.variables:
+ print(f"[config] {var}")
+ for var in self._template.tree.variables:
+ if not (required and var in self._template.config_vars):
+ print(f"[template] {var}")
+
+ def _run_check_vars(self) -> None:
+ if self._lack_vars is not None:
+ print("Lacks:")
+ for var in self._lack_vars:
+ print(var)
+
+ def setup_arg_parser(self, arg_parser):
+ subparsers = arg_parser.add_subparsers(
+ dest="template_command", required=True, metavar="TEMPLATE_COMMAND"
+ )
+ _list_parser = subparsers.add_parser("list", help="list templates")
+ vars_parser = subparsers.add_parser(
+ "vars", help="list variables used in all templates"
+ )
+ vars_parser.add_argument(
+ "-r",
+ "--required",
+ help="only list really required one.",
+ action="store_true",
+ )
+ _check_vars_parser = subparsers.add_parser(
+ "check-vars",
+ help="check if required vars are set",
+ )
+ generate_parser = subparsers.add_parser("generate", help="generate templates")
+ generate_parser.add_argument(
+ "--no-dry-run", action="store_true", help="generate and write target files"
+ )
+
+ def run_command(self, args: Namespace) -> None:
+ if args.template_command == "list":
+ self._print_file_lists()
+ elif args.template_command == "vars":
+ self._print_vars(args.required)
+ elif args.template_command == "generate":
+ dry_run = not args.no_dry_run
+ self._generate_files(dry_run)
+ if dry_run:
+ print("Dry run successfully.")
+ print(
+ f"Will delete dir {self.generated_dir.full_path_str} if it exists."
+ )