diff options
Diffstat (limited to 'tools/cru-py/cru/service')
-rw-r--r-- | tools/cru-py/cru/service/__init__.py | 0 | ||||
-rw-r--r-- | tools/cru-py/cru/service/__main__.py | 20 | ||||
-rw-r--r-- | tools/cru-py/cru/service/_app.py | 34 | ||||
-rw-r--r-- | tools/cru-py/cru/service/_base.py | 449 | ||||
-rw-r--r-- | tools/cru-py/cru/service/_config.py | 446 | ||||
-rw-r--r-- | tools/cru-py/cru/service/_external.py | 81 | ||||
-rw-r--r-- | tools/cru-py/cru/service/_nginx.py | 281 | ||||
-rw-r--r-- | tools/cru-py/cru/service/_template.py | 86 |
8 files changed, 1397 insertions, 0 deletions
diff --git a/tools/cru-py/cru/service/__init__.py b/tools/cru-py/cru/service/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tools/cru-py/cru/service/__init__.py diff --git a/tools/cru-py/cru/service/__main__.py b/tools/cru-py/cru/service/__main__.py new file mode 100644 index 0000000..1c10e82 --- /dev/null +++ b/tools/cru-py/cru/service/__main__.py @@ -0,0 +1,20 @@ +from cru import CruException + +from ._app import create_app + + +def main(): + app = create_app() + app.run_command() + + +if __name__ == "__main__": + try: + main() + except CruException as e: + user_message = e.get_user_message() + if user_message is not None: + print(f"Error: {user_message}") + exit(1) + else: + raise diff --git a/tools/cru-py/cru/service/_app.py b/tools/cru-py/cru/service/_app.py new file mode 100644 index 0000000..6030dad --- /dev/null +++ b/tools/cru-py/cru/service/_app.py @@ -0,0 +1,34 @@ +from ._base import ( + AppBase, + CommandDispatcher, + AppInitializer, + PathCommandProvider, +) +from ._config import ConfigManager +from ._template import TemplateManager +from ._nginx import NginxManager +from ._external import CliToolCommandProvider + +APP_ID = "crupest" + + +class App(AppBase): + def __init__(self): + super().__init__(APP_ID, f"{APP_ID}-service") + self.add_feature(PathCommandProvider()) + self.add_feature(AppInitializer()) + self.add_feature(ConfigManager()) + self.add_feature(TemplateManager()) + self.add_feature(NginxManager()) + self.add_feature(CliToolCommandProvider()) + self.add_feature(CommandDispatcher()) + + def run_command(self): + command_dispatcher = self.get_feature(CommandDispatcher) + command_dispatcher.run_command() + + +def create_app() -> App: + app = App() + app.setup() + return app diff --git a/tools/cru-py/cru/service/_base.py b/tools/cru-py/cru/service/_base.py new file mode 100644 index 0000000..ad813c9 --- /dev/null +++ b/tools/cru-py/cru/service/_base.py @@ -0,0 +1,449 @@ +from __future__ import annotations + +from argparse import ArgumentParser, Namespace +from abc import ABC, abstractmethod +import argparse +import os +from pathlib import Path +from typing import TypeVar, overload + +from cru import CruException, CruLogicError + +_Feature = TypeVar("_Feature", bound="AppFeatureProvider") + + +class AppError(CruException): + pass + + +class AppFeatureError(AppError): + def __init__(self, message, feature: type | str, *args, **kwargs): + super().__init__(message, *args, **kwargs) + self._feature = feature + + @property + def feature(self) -> type | str: + return self._feature + + +class AppPathError(CruException): + def __init__(self, message, _path: str | Path, *args, **kwargs): + super().__init__(message, *args, **kwargs) + self._path = str(_path) + + @property + def path(self) -> str: + return self._path + + +class AppPath(ABC): + def __init__(self, id: str, is_dir: bool, description: str) -> None: + self._is_dir = is_dir + self._id = id + self._description = description + + @property + @abstractmethod + def parent(self) -> AppPath | None: ... + + @property + @abstractmethod + def app(self) -> AppBase: ... + + @property + def id(self) -> str: + return self._id + + @property + def description(self) -> str: + return self._description + + @property + def is_dir(self) -> bool: + return self._is_dir + + @property + @abstractmethod + def full_path(self) -> Path: ... + + @property + def full_path_str(self) -> str: + return str(self.full_path) + + def check_parents(self, must_exist: bool = False) -> bool: + for p in reversed(self.full_path.parents): + if not p.exists() and not must_exist: + return False + if not p.is_dir(): + raise AppPathError("Parents' path must be a dir.", self.full_path) + return True + + def check_self(self, must_exist: bool = False) -> bool: + if not self.check_parents(must_exist): + return False + if not self.full_path.exists(): + if not must_exist: + return False + raise AppPathError("Not exist.", self.full_path) + if self.is_dir: + if not self.full_path.is_dir(): + raise AppPathError("Should be a directory, but not.", self.full_path) + else: + return True + else: + if not self.full_path.is_file(): + raise AppPathError("Should be a file, but not.", self.full_path) + else: + return True + + def ensure(self, create_file: bool = False) -> None: + e = self.check_self(False) + if not e: + os.makedirs(self.full_path.parent, exist_ok=True) + if self.is_dir: + os.mkdir(self.full_path) + elif create_file: + with open(self.full_path, "w") as f: + f.write("") + + def add_subpath( + self, + name: str, + is_dir: bool, + /, + id: str | None = None, + description: str = "", + ) -> AppFeaturePath: + return self.app.add_path(name, is_dir, self, id, description) + + @property + def app_relative_path(self) -> Path: + return self.full_path.relative_to(self.app.root.full_path) + + +class AppFeaturePath(AppPath): + def __init__( + self, + parent: AppPath, + name: str, + is_dir: bool, + /, + id: str | None = None, + description: str = "", + ) -> None: + super().__init__(id or name, is_dir, description) + self._name = name + self._parent = parent + + @property + def name(self) -> str: + return self._name + + @property + def parent(self) -> AppPath: + return self._parent + + @property + def app(self) -> AppBase: + return self.parent.app + + @property + def full_path(self) -> Path: + return Path(self.parent.full_path, self.name).resolve() + + +class AppRootPath(AppPath): + def __init__(self, app: AppBase): + super().__init__("root", True, "Application root path.") + self._app = app + self._full_path: Path | None = None + + @property + def parent(self) -> None: + return None + + @property + def app(self) -> AppBase: + return self._app + + @property + def full_path(self) -> Path: + if self._full_path is None: + raise AppError("App root path is not set yet.") + return self._full_path + + def setup(self, path: os.PathLike) -> None: + if self._full_path is not None: + raise AppError("App root path is already set.") + self._full_path = Path(path).resolve() + + +class AppFeatureProvider(ABC): + def __init__(self, name: str, /, app: AppBase | None = None): + super().__init__() + self._name = name + self._app = app if app else AppBase.get_instance() + + @property + def app(self) -> AppBase: + return self._app + + @property + def name(self) -> str: + return self._name + + @abstractmethod + def setup(self) -> None: ... + + +class AppCommandFeatureProvider(AppFeatureProvider): + @abstractmethod + def get_command_info(self) -> tuple[str, str]: ... + + @abstractmethod + def setup_arg_parser(self, arg_parser: ArgumentParser): ... + + @abstractmethod + def run_command(self, args: Namespace) -> None: ... + + +DATA_DIR_NAME = "data" + + +class PathCommandProvider(AppCommandFeatureProvider): + def __init__(self) -> None: + super().__init__("path-command-provider") + + def setup(self): + pass + + def get_command_info(self): + return ("path", "Get information about paths used by app.") + + def setup_arg_parser(self, arg_parser: ArgumentParser) -> None: + subparsers = arg_parser.add_subparsers( + dest="path_command", required=True, metavar="PATH_COMMAND" + ) + _list_parser = subparsers.add_parser( + "list", help="list special paths used by app" + ) + + def run_command(self, args: Namespace) -> None: + if args.path_command == "list": + for path in self.app.paths: + print(f"{path.app_relative_path.as_posix()}: {path.description}") + + +class CommandDispatcher(AppFeatureProvider): + def __init__(self) -> None: + super().__init__("command-dispatcher") + self._parsed_args: argparse.Namespace | None = None + + def setup_arg_parser(self) -> None: + epilog = """ +==> to start, +./tools/manage init +./tools/manage config init +ln -s generated/docker-compose.yaml . +# Then edit config file. + +==> to update +git pull +./tools/manage template generate --no-dry-run +docker compose up + """.strip() + + self._map: dict[str, AppCommandFeatureProvider] = {} + arg_parser = argparse.ArgumentParser( + description="Service management", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=epilog, + ) + arg_parser.add_argument( + "--project-dir", + help="The path of the project directory.", + required=True, + type=str, + ) + subparsers = arg_parser.add_subparsers( + dest="command", + help="The management command to execute.", + metavar="COMMAND", + ) + for feature in self.app.features: + if isinstance(feature, AppCommandFeatureProvider): + info = feature.get_command_info() + command_subparser = subparsers.add_parser(info[0], help=info[1]) + feature.setup_arg_parser(command_subparser) + self._map[info[0]] = feature + self._arg_parser = arg_parser + + def setup(self): + pass + + @property + def arg_parser(self) -> argparse.ArgumentParser: + return self._arg_parser + + @property + def map(self) -> dict[str, AppCommandFeatureProvider]: + return self._map + + def get_program_parsed_args(self) -> argparse.Namespace: + if self._parsed_args is None: + self._parsed_args = self.arg_parser.parse_args() + return self._parsed_args + + def run_command(self, args: argparse.Namespace | None = None) -> None: + real_args = args or self.get_program_parsed_args() + if real_args.command is None: + self.arg_parser.print_help() + return + self.map[real_args.command].run_command(real_args) + + +class AppInitializer(AppCommandFeatureProvider): + def __init__(self) -> None: + super().__init__("app-initializer") + + def _init_app(self) -> bool: + if self.app.app_initialized: + return False + self.app.data_dir.ensure() + return True + + def setup(self): + pass + + def get_command_info(self): + return ("init", "Initialize the app.") + + def setup_arg_parser(self, arg_parser): + pass + + def run_command(self, args): + init = self._init_app() + if init: + print("App initialized successfully.") + else: + print("App is already initialized. Do nothing.") + + +class AppBase: + _instance: AppBase | None = None + + @staticmethod + def get_instance() -> AppBase: + if AppBase._instance is None: + raise AppError("App instance not initialized") + return AppBase._instance + + def __init__(self, app_id: str, name: str): + AppBase._instance = self + self._app_id = app_id + self._name = name + self._root = AppRootPath(self) + self._paths: list[AppFeaturePath] = [] + self._features: list[AppFeatureProvider] = [] + + def setup(self) -> None: + command_dispatcher = self.get_feature(CommandDispatcher) + command_dispatcher.setup_arg_parser() + program_args = command_dispatcher.get_program_parsed_args() + self.setup_root(program_args.project_dir) + self._data_dir = self.add_path(DATA_DIR_NAME, True, id="data") + for feature in self.features: + feature.setup() + for path in self.paths: + path.check_self() + + @property + def app_id(self) -> str: + return self._app_id + + @property + def name(self) -> str: + return self._name + + @property + def root(self) -> AppRootPath: + return self._root + + def setup_root(self, path: os.PathLike) -> None: + self._root.setup(path) + + @property + def data_dir(self) -> AppFeaturePath: + return self._data_dir + + @property + def app_initialized(self) -> bool: + return self.data_dir.check_self() + + def ensure_app_initialized(self) -> AppRootPath: + if not self.app_initialized: + raise AppError( + user_message="Root directory does not exist. " + "Please run 'init' to create one." + ) + return self.root + + @property + def features(self) -> list[AppFeatureProvider]: + return self._features + + @property + def paths(self) -> list[AppFeaturePath]: + return self._paths + + def add_feature(self, feature: _Feature) -> _Feature: + for f in self.features: + if f.name == feature.name: + raise AppFeatureError( + f"Duplicate feature name: {feature.name}.", feature.name + ) + self._features.append(feature) + return feature + + def add_path( + self, + name: str, + is_dir: bool, + /, + parent: AppPath | None = None, + id: str | None = None, + description: str = "", + ) -> AppFeaturePath: + p = AppFeaturePath( + parent or self.root, name, is_dir, id=id, description=description + ) + self._paths.append(p) + return p + + @overload + def get_feature(self, feature: str) -> AppFeatureProvider: ... + + @overload + def get_feature(self, feature: type[_Feature]) -> _Feature: ... + + def get_feature( + self, feature: str | type[_Feature] + ) -> AppFeatureProvider | _Feature: + if isinstance(feature, str): + for f in self._features: + if f.name == feature: + return f + elif isinstance(feature, type): + for f in self._features: + if isinstance(f, feature): + return f + else: + raise CruLogicError("Argument must be the name of feature or its class.") + + raise AppFeatureError(f"Feature {feature} not found.", feature) + + def get_path(self, name: str) -> AppFeaturePath: + for p in self._paths: + if p.id == name or p.name == name: + return p + raise AppPathError(f"Application path {name} not found.", name) diff --git a/tools/cru-py/cru/service/_config.py b/tools/cru-py/cru/service/_config.py new file mode 100644 index 0000000..b51e21c --- /dev/null +++ b/tools/cru-py/cru/service/_config.py @@ -0,0 +1,446 @@ +from collections.abc import Iterable +from typing import Any, Literal, overload + +from cru import CruException +from cru.config import Configuration, ConfigItem +from cru.value import ( + INTEGER_VALUE_TYPE, + TEXT_VALUE_TYPE, + CruValueTypeError, + RandomStringValueGenerator, + UuidValueGenerator, +) +from cru.parsing import ParseError, SimpleLineConfigParser + +from ._base import AppFeaturePath, AppCommandFeatureProvider + + +class AppConfigError(CruException): + def __init__( + self, message: str, configuration: Configuration, *args, **kwargs + ) -> None: + super().__init__(message, *args, **kwargs) + self._configuration = configuration + + @property + def configuration(self) -> Configuration: + return self._configuration + + +class AppConfigFileError(AppConfigError): + def __init__( + self, + message: str, + configuration: Configuration, + *args, + **kwargs, + ) -> None: + super().__init__(message, configuration, *args, **kwargs) + + +class AppConfigFileNotFoundError(AppConfigFileError): + def __init__( + self, + message: str, + configuration: Configuration, + file_path: str, + *args, + **kwargs, + ) -> None: + super().__init__(message, configuration, *args, **kwargs) + self._file_path = file_path + + @property + def file_path(self) -> str: + return self._file_path + + +class AppConfigFileParseError(AppConfigFileError): + def __init__( + self, + message: str, + configuration: Configuration, + file_content: str, + *args, + **kwargs, + ) -> None: + super().__init__(message, configuration, *args, **kwargs) + self._file_content = file_content + self.__cause__: ParseError + + @property + def file_content(self) -> str: + return self._file_content + + def get_user_message(self) -> str: + return f"Error while parsing config file at line {self.__cause__.line_number}." + + +class AppConfigFileEntryError(AppConfigFileError): + def __init__( + self, + message: str, + configuration: Configuration, + entries: Iterable[SimpleLineConfigParser.Entry], + *args, + **kwargs, + ) -> None: + super().__init__(message, configuration, *args, **kwargs) + self._entries = list(entries) + + @property + def error_entries(self) -> list[SimpleLineConfigParser.Entry]: + return self._entries + + @staticmethod + def entries_to_friendly_message( + entries: Iterable[SimpleLineConfigParser.Entry], + ) -> str: + return "\n".join( + f"line {entry.line_number}: {entry.key}={entry.value}" for entry in entries + ) + + @property + def friendly_message_head(self) -> str: + return "Error entries found in config file" + + def get_user_message(self) -> str: + return ( + f"{self.friendly_message_head}:\n" + f"{self.entries_to_friendly_message(self.error_entries)}" + ) + + +class AppConfigDuplicateEntryError(AppConfigFileEntryError): + @property + def friendly_message_head(self) -> str: + return "Duplicate entries found in config file" + + +class AppConfigEntryValueFormatError(AppConfigFileEntryError): + @property + def friendly_message_head(self) -> str: + return "Invalid value format for entries" + + +class AppConfigItemNotSetError(AppConfigError): + def __init__( + self, + message: str, + configuration: Configuration, + items: list[ConfigItem], + *args, + **kwargs, + ) -> None: + super().__init__(message, configuration, *args, **kwargs) + self._items = items + + +class ConfigManager(AppCommandFeatureProvider): + def __init__(self) -> None: + super().__init__("config-manager") + configuration = Configuration() + self._configuration = configuration + self._loaded: bool = False + self._init_app_defined_items() + + def _init_app_defined_items(self) -> None: + prefix = self.config_name_prefix + + def _add_text(name: str, description: str) -> ConfigItem: + item = ConfigItem(f"{prefix}_{name}", description, TEXT_VALUE_TYPE) + self.configuration.add(item) + return item + + def _add_uuid(name: str, description: str) -> ConfigItem: + item = ConfigItem( + f"{prefix}_{name}", + description, + TEXT_VALUE_TYPE, + default=UuidValueGenerator(), + ) + self.configuration.add(item) + return item + + def _add_random_string( + name: str, description: str, length: int = 32, secure: bool = True + ) -> ConfigItem: + item = ConfigItem( + f"{prefix}_{name}", + description, + TEXT_VALUE_TYPE, + default=RandomStringValueGenerator(length, secure), + ) + self.configuration.add(item) + return item + + def _add_int(name: str, description: str) -> ConfigItem: + item = ConfigItem(f"{prefix}_{name}", description, INTEGER_VALUE_TYPE) + self.configuration.add(item) + return item + + self._domain = _add_text("DOMAIN", "domain name") + self._email = _add_text("EMAIL", "admin email address") + _add_text( + "AUTO_BACKUP_COS_SECRET_ID", + "access key id for Tencent COS, used for auto backup", + ) + _add_text( + "AUTO_BACKUP_COS_SECRET_KEY", + "access key secret for Tencent COS, used for auto backup", + ) + _add_text( + "AUTO_BACKUP_COS_REGION", "region for Tencent COS, used for auto backup" + ) + _add_text( + "AUTO_BACKUP_BUCKET_NAME", + "bucket name for Tencent COS, used for auto backup", + ) + _add_text("GITHUB_USERNAME", "github username for fetching todos") + _add_int("GITHUB_PROJECT_NUMBER", "github project number for fetching todos") + _add_text("GITHUB_TOKEN", "github token for fetching todos") + _add_text("GITHUB_TODO_COUNT", "github todo count") + _add_uuid("V2RAY_TOKEN", "v2ray user id") + _add_uuid("V2RAY_PATH", "v2ray path, which will be prefixed by _") + _add_text("FORGEJO_MAILER_USER", "Forgejo SMTP user") + _add_text("FORGEJO_MAILER_PASSWD", "Forgejo SMTP password") + _add_random_string("2FAUTH_APP_KEY", "2FAuth App Key") + _add_text("2FAUTH_MAIL_USERNAME", "2FAuth SMTP user") + _add_text("2FAUTH_MAIL_PASSWORD", "2FAuth SMTP password") + + def setup(self) -> None: + self._config_file_path = self.app.data_dir.add_subpath( + "config", False, description="Configuration file path." + ) + + @property + def config_name_prefix(self) -> str: + return self.app.app_id.upper() + + @property + def configuration(self) -> Configuration: + return self._configuration + + @property + def config_file_path(self) -> AppFeaturePath: + return self._config_file_path + + @property + def all_set(self) -> bool: + return self.configuration.all_set + + def get_item(self, name: str) -> ConfigItem[Any]: + if not name.startswith(self.config_name_prefix + "_"): + name = f"{self.config_name_prefix}_{name}" + + item = self.configuration.get_or(name, None) + if item is None: + raise AppConfigError(f"Config item '{name}' not found.", self.configuration) + return item + + @overload + def get_item_value_str(self, name: str) -> str: ... + + @overload + def get_item_value_str(self, name: str, ensure_set: Literal[True]) -> str: ... + + @overload + def get_item_value_str(self, name: str, ensure_set: bool = True) -> str | None: ... + + def get_item_value_str(self, name: str, ensure_set: bool = True) -> str | None: + self.load_config_file() + item = self.get_item(name) + if not item.is_set: + if ensure_set: + raise AppConfigItemNotSetError( + f"Config item '{name}' is not set.", self.configuration, [item] + ) + return None + return item.value_str + + def get_str_dict(self, ensure_all_set: bool = True) -> dict[str, str]: + self.load_config_file() + if ensure_all_set and not self.configuration.all_set: + raise AppConfigItemNotSetError( + "Some config items are not set.", + self.configuration, + self.configuration.get_unset_items(), + ) + return self.configuration.to_str_dict() + + @property + def domain_item_name(self) -> str: + return self._domain.name + + def get_domain_value_str(self) -> str: + return self.get_item_value_str(self._domain.name) + + def get_email_value_str_optional(self) -> str | None: + return self.get_item_value_str(self._email.name, ensure_set=False) + + def _set_with_default(self) -> None: + if not self.configuration.all_not_set: + raise AppConfigError( + "Config is not clean. " + "Some config items are already set. " + "Can't set again with default value.", + self.configuration, + ) + for item in self.configuration: + if item.can_generate_default: + item.set_value(item.generate_default_value()) + + def _to_config_file_content(self) -> str: + content = "".join( + [ + f"{item.name}={item.value_str if item.is_set else ''}\n" + for item in self.configuration + ] + ) + return content + + def _create_init_config_file(self) -> None: + if self.config_file_path.check_self(): + raise AppConfigError( + "Config file already exists.", + self.configuration, + user_message=f"The config file at " + f"{self.config_file_path.full_path_str} already exists.", + ) + self._set_with_default() + self.config_file_path.ensure() + with open( + self.config_file_path.full_path, "w", encoding="utf-8", newline="\n" + ) as file: + file.write(self._to_config_file_content()) + + def _parse_config_file(self) -> SimpleLineConfigParser.Result: + if not self.config_file_path.check_self(): + raise AppConfigFileNotFoundError( + "Config file not found.", + self.configuration, + self.config_file_path.full_path_str, + user_message=f"The config file at " + f"{self.config_file_path.full_path_str} does not exist. " + f"You can create an initial one with 'init' command.", + ) + + text = self.config_file_path.full_path.read_text() + try: + parser = SimpleLineConfigParser() + return parser.parse(text) + except ParseError as e: + raise AppConfigFileParseError( + "Failed to parse config file.", self.configuration, text + ) from e + + def _parse_and_print_config_file(self) -> None: + parse_result = self._parse_config_file() + for entry in parse_result: + print(f"{entry.key}={entry.value}") + + def _check_duplicate( + self, + parse_result: dict[str, list[SimpleLineConfigParser.Entry]], + ) -> dict[str, SimpleLineConfigParser.Entry]: + entry_dict: dict[str, SimpleLineConfigParser.Entry] = {} + duplicate_entries: list[SimpleLineConfigParser.Entry] = [] + for key, entries in parse_result.items(): + entry_dict[key] = entries[0] + if len(entries) > 1: + duplicate_entries.extend(entries) + if len(duplicate_entries) > 0: + raise AppConfigDuplicateEntryError( + "Duplicate entries found.", self.configuration, duplicate_entries + ) + + return entry_dict + + def _check_type( + self, entry_dict: dict[str, SimpleLineConfigParser.Entry] + ) -> dict[str, Any]: + value_dict: dict[str, Any] = {} + error_entries: list[SimpleLineConfigParser.Entry] = [] + errors: list[CruValueTypeError] = [] + for key, entry in entry_dict.items(): + config_item = self.configuration.get(key) + try: + if entry.value == "": + value_dict[key] = None + else: + value_dict[key] = config_item.value_type.convert_str_to_value( + entry.value + ) + except CruValueTypeError as e: + error_entries.append(entry) + errors.append(e) + if len(error_entries) > 0: + raise AppConfigEntryValueFormatError( + "Entry value format is not correct.", + self.configuration, + error_entries, + ) from ExceptionGroup("Multiple format errors occurred.", errors) + return value_dict + + def _read_config_file(self) -> dict[str, Any]: + parsed = self._parse_config_file() + entry_groups = parsed.cru_iter().group_by(lambda e: e.key) + entry_dict = self._check_duplicate(entry_groups) + value_dict = self._check_type(entry_dict) + return value_dict + + def _real_load_config_file(self) -> None: + self.configuration.reset_all() + value_dict = self._read_config_file() + for key, value in value_dict.items(): + if value is None: + continue + self.configuration.set_config_item(key, value) + + def load_config_file(self, force=False) -> None: + if force or not self._loaded: + self._real_load_config_file() + self._loaded = True + + def _print_app_config_info(self): + for item in self.configuration: + print(item.description_str) + + def get_command_info(self): + return "config", "Manage configuration." + + def setup_arg_parser(self, arg_parser) -> None: + subparsers = arg_parser.add_subparsers( + dest="config_command", required=True, metavar="CONFIG_COMMAND" + ) + _init_parser = subparsers.add_parser( + "init", help="create an initial config file" + ) + _print_app_parser = subparsers.add_parser( + "print-app", + help="print information of the config items defined by app", + ) + _print_parser = subparsers.add_parser("print", help="print current config") + _check_config_parser = subparsers.add_parser( + "check", + help="check the validity of the config file", + ) + _check_config_parser.add_argument( + "-f", + "--format-only", + action="store_true", + help="only check content format, not app config item requirements.", + ) + + def run_command(self, args) -> None: + if args.config_command == "init": + self._create_init_config_file() + elif args.config_command == "print-app": + self._print_app_config_info() + elif args.config_command == "print": + self._parse_and_print_config_file() + elif args.config_command == "check": + if args.format_only: + self._parse_config_file() + else: + self._read_config_file() diff --git a/tools/cru-py/cru/service/_external.py b/tools/cru-py/cru/service/_external.py new file mode 100644 index 0000000..2347e95 --- /dev/null +++ b/tools/cru-py/cru/service/_external.py @@ -0,0 +1,81 @@ +from ._base import AppCommandFeatureProvider +from ._nginx import NginxManager + + +class CliToolCommandProvider(AppCommandFeatureProvider): + def __init__(self) -> None: + super().__init__("cli-tool-command-provider") + + def setup(self): + pass + + def get_command_info(self): + return ("gen-cli", "Get commands of running external cli tools.") + + def setup_arg_parser(self, arg_parser): + subparsers = arg_parser.add_subparsers( + dest="gen_cli_command", required=True, metavar="GEN_CLI_COMMAND" + ) + certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") + certbot_parser.add_argument( + "-t", "--test", action="store_true", help="run certbot in test mode" + ) + _install_docker_parser = subparsers.add_parser( + "install-docker", help="print docker installation commands" + ) + _update_blog_parser = subparsers.add_parser( + "update-blog", help="print blog update command" + ) + + def _print_install_docker_commands(self) -> None: + output = """ +### COMMAND: uninstall apt docker +for pkg in docker.io docker-doc docker-compose \ +podman-docker containerd runc; \ +do sudo apt-get remove $pkg; done + +### COMMAND: prepare apt certs +sudo apt-get update +sudo apt-get install ca-certificates curl +sudo install -m 0755 -d /etc/apt/keyrings + +### COMMAND: install certs +sudo curl -fsSL https://download.docker.com/linux/debian/gpg \ +-o /etc/apt/keyrings/docker.asc +sudo chmod a+r /etc/apt/keyrings/docker.asc + +### COMMAND: add docker apt source +echo \\ + "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] \ +https://download.docker.com/linux/debian \\ + $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \\ + sudo tee /etc/apt/sources.list.d/docker.list > /dev/null + +### COMMAND: update apt and install docker +sudo apt-get update +sudo apt-get install docker-ce docker-ce-cli containerd.io \ +docker-buildx-plugin docker-compose-plugin + +### COMMAND: setup system for docker +sudo systemctl enable docker +sudo systemctl start docker +sudo groupadd -f docker +sudo usermod -aG docker $USER +# Remember to log out and log back in for the group changes to take effect +""".strip() + print(output) + + def _print_update_blog_command(self): + output = """ +### COMMAND: update blog +docker exec -it blog /scripts/update.bash +""".strip() + print(output) + + def run_command(self, args): + if args.gen_cli_command == "certbot": + self.app.get_feature(NginxManager).print_all_certbot_commands(args.test) + elif args.gen_cli_command == "install-docker": + self._print_install_docker_commands() + elif args.gen_cli_command == "update-blog": + self._print_update_blog_command()
\ No newline at end of file diff --git a/tools/cru-py/cru/service/_nginx.py b/tools/cru-py/cru/service/_nginx.py new file mode 100644 index 0000000..e0a9c60 --- /dev/null +++ b/tools/cru-py/cru/service/_nginx.py @@ -0,0 +1,281 @@ +from argparse import Namespace +from enum import Enum, auto +import re +import subprocess +from typing import TypeAlias + +from cru import CruInternalError + +from ._base import AppCommandFeatureProvider +from ._config import ConfigManager +from ._template import TemplateManager + + +class CertbotAction(Enum): + CREATE = auto() + EXPAND = auto() + SHRINK = auto() + RENEW = auto() + + +class NginxManager(AppCommandFeatureProvider): + CertbotAction: TypeAlias = CertbotAction + + def __init__(self) -> None: + super().__init__("nginx-manager") + self._domains_cache: list[str] | None = None + + def setup(self) -> None: + pass + + @property + def _config_manager(self) -> ConfigManager: + return self.app.get_feature(ConfigManager) + + @property + def root_domain(self) -> str: + return self._config_manager.get_domain_value_str() + + @property + def domains(self) -> list[str]: + if self._domains_cache is None: + self._domains_cache = self._get_domains() + return self._domains_cache + + @property + def subdomains(self) -> list[str]: + suffix = "." + self.root_domain + return [d[: -len(suffix)] for d in self.domains if d.endswith(suffix)] + + @property + def _domain_config_name(self) -> str: + return self._config_manager.domain_item_name + + def _get_domains_from_text(self, text: str) -> set[str]: + domains: set[str] = set() + regex = re.compile(r"server_name\s+(\S+)\s*;") + domain_variable_str = f"${self._domain_config_name}" + brace_domain_variable_regex = re.compile( + r"\$\{\s*" + self._domain_config_name + r"\s*\}" + ) + for match in regex.finditer(text): + domain_part = match.group(1) + if domain_variable_str in domain_part: + domains.add(domain_part.replace(domain_variable_str, self.root_domain)) + continue + m = brace_domain_variable_regex.search(domain_part) + if m: + domains.add(domain_part.replace(m.group(0), self.root_domain)) + continue + domains.add(domain_part) + return domains + + def _get_nginx_conf_template_text(self) -> str: + template_manager = self.app.get_feature(TemplateManager) + text = "" + for path, template in template_manager.template_tree.templates: + if path.as_posix().startswith("nginx/"): + text += template.raw_text + return text + + def _get_domains(self) -> list[str]: + text = self._get_nginx_conf_template_text() + domains = list(self._get_domains_from_text(text)) + domains.remove(self.root_domain) + return [self.root_domain, *domains] + + def _print_domains(self) -> None: + for domain in self.domains: + print(domain) + + def _certbot_command( + self, + action: CertbotAction | str, + test: bool, + *, + docker=True, + standalone=None, + email=None, + agree_tos=True, + ) -> str: + if isinstance(action, str): + action = CertbotAction[action.upper()] + + command_args = [] + + add_domain_option = True + if action is CertbotAction.CREATE: + if standalone is None: + standalone = True + command_action = "certonly" + elif action in [CertbotAction.EXPAND, CertbotAction.SHRINK]: + if standalone is None: + standalone = False + command_action = "certonly" + elif action is CertbotAction.RENEW: + if standalone is None: + standalone = False + add_domain_option = False + command_action = "renew" + else: + raise CruInternalError("Invalid certbot action.") + + data_dir = self.app.data_dir.full_path.as_posix() + + if not docker: + command_args.append("certbot") + else: + command_args.extend( + [ + "docker run -it --rm --name certbot", + f'-v "{data_dir}/certbot/certs:/etc/letsencrypt"', + f'-v "{data_dir}/certbot/data:/var/lib/letsencrypt"', + ] + ) + if standalone: + command_args.append('-p "0.0.0.0:80:80"') + else: + command_args.append(f'-v "{data_dir}/certbot/webroot:/var/www/certbot"') + + command_args.append("certbot/certbot") + + command_args.append(command_action) + + command_args.append(f"--cert-name {self.root_domain}") + + if standalone: + command_args.append("--standalone") + else: + command_args.append("--webroot -w /var/www/certbot") + + if add_domain_option: + command_args.append(" ".join([f"-d {domain}" for domain in self.domains])) + + if email is not None: + command_args.append(f"--email {email}") + + if agree_tos: + command_args.append("--agree-tos") + + if test: + command_args.append("--test-cert --dry-run") + + return " ".join(command_args) + + def print_all_certbot_commands(self, test: bool): + print("### COMMAND: (standalone) create certs") + print( + self._certbot_command( + CertbotAction.CREATE, + test, + email=self._config_manager.get_email_value_str_optional(), + ) + ) + print() + print("### COMMAND: (webroot+nginx) expand or shrink certs") + print( + self._certbot_command( + CertbotAction.EXPAND, + test, + email=self._config_manager.get_email_value_str_optional(), + ) + ) + print() + print("### COMMAND: (webroot+nginx) renew certs") + print( + self._certbot_command( + CertbotAction.RENEW, + test, + email=self._config_manager.get_email_value_str_optional(), + ) + ) + + @property + def _cert_path_str(self) -> str: + return str( + self.app.data_dir.full_path + / "certbot/certs/live" + / self.root_domain + / "fullchain.pem" + ) + + def get_command_info(self): + return "nginx", "Manage nginx related things." + + def setup_arg_parser(self, arg_parser): + subparsers = arg_parser.add_subparsers( + dest="nginx_command", required=True, metavar="NGINX_COMMAND" + ) + _list_parser = subparsers.add_parser("list", help="list domains") + certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") + certbot_parser.add_argument( + "--no-test", + action="store_true", + help="remove args making certbot run in test mode", + ) + + def run_command(self, args: Namespace) -> None: + if args.nginx_command == "list": + self._print_domains() + elif args.nginx_command == "certbot": + self.print_all_certbot_commands(not args.no_test) + + def _generate_dns_zone( + self, + ip: str, + /, + ttl: str | int = 600, + *, + enable_mail: bool = True, + dkim: str | None = None, + ) -> str: + # TODO: Not complete and test now. + root_domain = self.root_domain + result = f"$ORIGIN {root_domain}.\n\n" + result += "; A records\n" + result += f"@ {ttl} IN A {ip}\n" + for subdomain in self.subdomains: + result += f"{subdomain} {ttl} IN A {ip}\n" + + if enable_mail: + result += "\n; MX records\n" + result += f"@ {ttl} IN MX 10 mail.{root_domain}.\n" + result += "\n; SPF record\n" + result += f'@ {ttl} IN TXT "v=spf1 mx ~all"\n' + if dkim is not None: + result += "\n; DKIM record\n" + result += f'mail._domainkey {ttl} IN TEXT "{dkim}"' + result += "\n; DMARC record\n" + dmarc_options = [ + "v=DMARC1", + "p=none", + f"rua=mailto:dmarc.report@{root_domain}", + f"ruf=mailto:dmarc.report@{root_domain}", + "sp=none", + "ri=86400", + ] + result += f'_dmarc {ttl} IN TXT "{"; ".join(dmarc_options)}"\n' + return result + + def _get_dkim_from_mailserver(self) -> str | None: + # TODO: Not complete and test now. + dkim_path = ( + self.app.data_dir.full_path + / "dms/config/opendkim/keys" + / self.root_domain + / "mail.txt" + ) + if not dkim_path.exists(): + return None + + p = subprocess.run(["sudo", "cat", dkim_path], capture_output=True, check=True) + value = "" + for match in re.finditer('"(.*)"', p.stdout.decode("utf-8")): + value += match.group(1) + return value + + def _generate_dns_zone_with_dkim(self, ip: str, /, ttl: str | int = 600) -> str: + # TODO: Not complete and test now. + return self._generate_dns_zone( + ip, ttl, enable_mail=True, dkim=self._get_dkim_from_mailserver() + ) diff --git a/tools/cru-py/cru/service/_template.py b/tools/cru-py/cru/service/_template.py new file mode 100644 index 0000000..170116c --- /dev/null +++ b/tools/cru-py/cru/service/_template.py @@ -0,0 +1,86 @@ +from argparse import Namespace +import shutil + +from cru import CruIterator +from cru.template import TemplateTree + +from ._base import AppCommandFeatureProvider, AppFeaturePath +from ._config import ConfigManager + + +class TemplateManager(AppCommandFeatureProvider): + def __init__(self, prefix: str | None = None): + super().__init__("template-manager") + self._prefix = prefix or self.app.app_id.upper() + + def setup(self) -> None: + self._templates_dir = self.app.add_path("templates", True) + self._generated_dir = self.app.add_path("generated", True) + self._template_tree: TemplateTree | None = None + + @property + def prefix(self) -> str: + return self._prefix + + @property + def templates_dir(self) -> AppFeaturePath: + return self._templates_dir + + @property + def generated_dir(self) -> AppFeaturePath: + return self._generated_dir + + @property + def template_tree(self) -> TemplateTree: + if self._template_tree is None: + return self.reload() + return self._template_tree + + def reload(self) -> TemplateTree: + self._template_tree = TemplateTree( + self.prefix, self.templates_dir.full_path_str + ) + return self._template_tree + + def _print_file_lists(self) -> None: + for file in CruIterator(self.template_tree.templates).transform(lambda t: t[0]): + print(file.as_posix()) + + def _generate_files(self, dry_run: bool) -> None: + config_manager = self.app.get_feature(ConfigManager) + if not dry_run and self.generated_dir.full_path.exists(): + shutil.rmtree(self.generated_dir.full_path) + self.template_tree.generate_to( + self.generated_dir.full_path_str, config_manager.get_str_dict(), dry_run + ) + + def get_command_info(self): + return ("template", "Manage templates.") + + def setup_arg_parser(self, arg_parser): + subparsers = arg_parser.add_subparsers( + dest="template_command", required=True, metavar="TEMPLATE_COMMAND" + ) + _list_parser = subparsers.add_parser("list", help="list templates") + _variables_parser = subparsers.add_parser( + "variables", help="list variables used in all templates" + ) + generate_parser = subparsers.add_parser("generate", help="generate templates") + generate_parser.add_argument( + "--no-dry-run", action="store_true", help="generate and write target files" + ) + + def run_command(self, args: Namespace) -> None: + if args.template_command == "list": + self._print_file_lists() + elif args.template_command == "variables": + for var in self.template_tree.variables: + print(var) + elif args.template_command == "generate": + dry_run = not args.no_dry_run + self._generate_files(dry_run) + if dry_run: + print("Dry run successfully.") + print( + f"Will delete dir {self.generated_dir.full_path_str} if it exists." + ) |