diff options
author | Yuqian Yang <crupest@crupest.life> | 2025-02-22 18:11:35 +0800 |
---|---|---|
committer | Yuqian Yang <crupest@crupest.life> | 2025-02-23 01:36:11 +0800 |
commit | 1e9b2436eaffa4130f6a69c3a108f6feb9dd4ac8 (patch) | |
tree | 585b6124b0100371b4bd8a291c4a59fbb5fbf1fe /tools/cru-py/cru/service | |
parent | a931457d61b053682d5e89a0cfb411e43e5e21c7 (diff) | |
download | crupest-1e9b2436eaffa4130f6a69c3a108f6feb9dd4ac8.tar.gz crupest-1e9b2436eaffa4130f6a69c3a108f6feb9dd4ac8.tar.bz2 crupest-1e9b2436eaffa4130f6a69c3a108f6feb9dd4ac8.zip |
feat(services): refactor structure.
Diffstat (limited to 'tools/cru-py/cru/service')
-rw-r--r-- | tools/cru-py/cru/service/__init__.py | 0 | ||||
-rw-r--r-- | tools/cru-py/cru/service/__main__.py | 20 | ||||
-rw-r--r-- | tools/cru-py/cru/service/_app.py | 34 | ||||
-rw-r--r-- | tools/cru-py/cru/service/_base.py | 449 | ||||
-rw-r--r-- | tools/cru-py/cru/service/_config.py | 444 | ||||
-rw-r--r-- | tools/cru-py/cru/service/_external.py | 81 | ||||
-rw-r--r-- | tools/cru-py/cru/service/_nginx.py | 268 | ||||
-rw-r--r-- | tools/cru-py/cru/service/_template.py | 90 |
8 files changed, 0 insertions, 1386 deletions
diff --git a/tools/cru-py/cru/service/__init__.py b/tools/cru-py/cru/service/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/tools/cru-py/cru/service/__init__.py +++ /dev/null diff --git a/tools/cru-py/cru/service/__main__.py b/tools/cru-py/cru/service/__main__.py deleted file mode 100644 index 1c10e82..0000000 --- a/tools/cru-py/cru/service/__main__.py +++ /dev/null @@ -1,20 +0,0 @@ -from cru import CruException - -from ._app import create_app - - -def main(): - app = create_app() - app.run_command() - - -if __name__ == "__main__": - try: - main() - except CruException as e: - user_message = e.get_user_message() - if user_message is not None: - print(f"Error: {user_message}") - exit(1) - else: - raise diff --git a/tools/cru-py/cru/service/_app.py b/tools/cru-py/cru/service/_app.py deleted file mode 100644 index 6030dad..0000000 --- a/tools/cru-py/cru/service/_app.py +++ /dev/null @@ -1,34 +0,0 @@ -from ._base import ( - AppBase, - CommandDispatcher, - AppInitializer, - PathCommandProvider, -) -from ._config import ConfigManager -from ._template import TemplateManager -from ._nginx import NginxManager -from ._external import CliToolCommandProvider - -APP_ID = "crupest" - - -class App(AppBase): - def __init__(self): - super().__init__(APP_ID, f"{APP_ID}-service") - self.add_feature(PathCommandProvider()) - self.add_feature(AppInitializer()) - self.add_feature(ConfigManager()) - self.add_feature(TemplateManager()) - self.add_feature(NginxManager()) - self.add_feature(CliToolCommandProvider()) - self.add_feature(CommandDispatcher()) - - def run_command(self): - command_dispatcher = self.get_feature(CommandDispatcher) - command_dispatcher.run_command() - - -def create_app() -> App: - app = App() - app.setup() - return app diff --git a/tools/cru-py/cru/service/_base.py b/tools/cru-py/cru/service/_base.py deleted file mode 100644 index ad813c9..0000000 --- a/tools/cru-py/cru/service/_base.py +++ /dev/null @@ -1,449 +0,0 @@ -from __future__ import annotations - -from argparse import ArgumentParser, Namespace -from abc import ABC, abstractmethod -import argparse -import os -from pathlib import Path -from typing import TypeVar, overload - -from cru import CruException, CruLogicError - -_Feature = TypeVar("_Feature", bound="AppFeatureProvider") - - -class AppError(CruException): - pass - - -class AppFeatureError(AppError): - def __init__(self, message, feature: type | str, *args, **kwargs): - super().__init__(message, *args, **kwargs) - self._feature = feature - - @property - def feature(self) -> type | str: - return self._feature - - -class AppPathError(CruException): - def __init__(self, message, _path: str | Path, *args, **kwargs): - super().__init__(message, *args, **kwargs) - self._path = str(_path) - - @property - def path(self) -> str: - return self._path - - -class AppPath(ABC): - def __init__(self, id: str, is_dir: bool, description: str) -> None: - self._is_dir = is_dir - self._id = id - self._description = description - - @property - @abstractmethod - def parent(self) -> AppPath | None: ... - - @property - @abstractmethod - def app(self) -> AppBase: ... - - @property - def id(self) -> str: - return self._id - - @property - def description(self) -> str: - return self._description - - @property - def is_dir(self) -> bool: - return self._is_dir - - @property - @abstractmethod - def full_path(self) -> Path: ... - - @property - def full_path_str(self) -> str: - return str(self.full_path) - - def check_parents(self, must_exist: bool = False) -> bool: - for p in reversed(self.full_path.parents): - if not p.exists() and not must_exist: - return False - if not p.is_dir(): - raise AppPathError("Parents' path must be a dir.", self.full_path) - return True - - def check_self(self, must_exist: bool = False) -> bool: - if not self.check_parents(must_exist): - return False - if not self.full_path.exists(): - if not must_exist: - return False - raise AppPathError("Not exist.", self.full_path) - if self.is_dir: - if not self.full_path.is_dir(): - raise AppPathError("Should be a directory, but not.", self.full_path) - else: - return True - else: - if not self.full_path.is_file(): - raise AppPathError("Should be a file, but not.", self.full_path) - else: - return True - - def ensure(self, create_file: bool = False) -> None: - e = self.check_self(False) - if not e: - os.makedirs(self.full_path.parent, exist_ok=True) - if self.is_dir: - os.mkdir(self.full_path) - elif create_file: - with open(self.full_path, "w") as f: - f.write("") - - def add_subpath( - self, - name: str, - is_dir: bool, - /, - id: str | None = None, - description: str = "", - ) -> AppFeaturePath: - return self.app.add_path(name, is_dir, self, id, description) - - @property - def app_relative_path(self) -> Path: - return self.full_path.relative_to(self.app.root.full_path) - - -class AppFeaturePath(AppPath): - def __init__( - self, - parent: AppPath, - name: str, - is_dir: bool, - /, - id: str | None = None, - description: str = "", - ) -> None: - super().__init__(id or name, is_dir, description) - self._name = name - self._parent = parent - - @property - def name(self) -> str: - return self._name - - @property - def parent(self) -> AppPath: - return self._parent - - @property - def app(self) -> AppBase: - return self.parent.app - - @property - def full_path(self) -> Path: - return Path(self.parent.full_path, self.name).resolve() - - -class AppRootPath(AppPath): - def __init__(self, app: AppBase): - super().__init__("root", True, "Application root path.") - self._app = app - self._full_path: Path | None = None - - @property - def parent(self) -> None: - return None - - @property - def app(self) -> AppBase: - return self._app - - @property - def full_path(self) -> Path: - if self._full_path is None: - raise AppError("App root path is not set yet.") - return self._full_path - - def setup(self, path: os.PathLike) -> None: - if self._full_path is not None: - raise AppError("App root path is already set.") - self._full_path = Path(path).resolve() - - -class AppFeatureProvider(ABC): - def __init__(self, name: str, /, app: AppBase | None = None): - super().__init__() - self._name = name - self._app = app if app else AppBase.get_instance() - - @property - def app(self) -> AppBase: - return self._app - - @property - def name(self) -> str: - return self._name - - @abstractmethod - def setup(self) -> None: ... - - -class AppCommandFeatureProvider(AppFeatureProvider): - @abstractmethod - def get_command_info(self) -> tuple[str, str]: ... - - @abstractmethod - def setup_arg_parser(self, arg_parser: ArgumentParser): ... - - @abstractmethod - def run_command(self, args: Namespace) -> None: ... - - -DATA_DIR_NAME = "data" - - -class PathCommandProvider(AppCommandFeatureProvider): - def __init__(self) -> None: - super().__init__("path-command-provider") - - def setup(self): - pass - - def get_command_info(self): - return ("path", "Get information about paths used by app.") - - def setup_arg_parser(self, arg_parser: ArgumentParser) -> None: - subparsers = arg_parser.add_subparsers( - dest="path_command", required=True, metavar="PATH_COMMAND" - ) - _list_parser = subparsers.add_parser( - "list", help="list special paths used by app" - ) - - def run_command(self, args: Namespace) -> None: - if args.path_command == "list": - for path in self.app.paths: - print(f"{path.app_relative_path.as_posix()}: {path.description}") - - -class CommandDispatcher(AppFeatureProvider): - def __init__(self) -> None: - super().__init__("command-dispatcher") - self._parsed_args: argparse.Namespace | None = None - - def setup_arg_parser(self) -> None: - epilog = """ -==> to start, -./tools/manage init -./tools/manage config init -ln -s generated/docker-compose.yaml . -# Then edit config file. - -==> to update -git pull -./tools/manage template generate --no-dry-run -docker compose up - """.strip() - - self._map: dict[str, AppCommandFeatureProvider] = {} - arg_parser = argparse.ArgumentParser( - description="Service management", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=epilog, - ) - arg_parser.add_argument( - "--project-dir", - help="The path of the project directory.", - required=True, - type=str, - ) - subparsers = arg_parser.add_subparsers( - dest="command", - help="The management command to execute.", - metavar="COMMAND", - ) - for feature in self.app.features: - if isinstance(feature, AppCommandFeatureProvider): - info = feature.get_command_info() - command_subparser = subparsers.add_parser(info[0], help=info[1]) - feature.setup_arg_parser(command_subparser) - self._map[info[0]] = feature - self._arg_parser = arg_parser - - def setup(self): - pass - - @property - def arg_parser(self) -> argparse.ArgumentParser: - return self._arg_parser - - @property - def map(self) -> dict[str, AppCommandFeatureProvider]: - return self._map - - def get_program_parsed_args(self) -> argparse.Namespace: - if self._parsed_args is None: - self._parsed_args = self.arg_parser.parse_args() - return self._parsed_args - - def run_command(self, args: argparse.Namespace | None = None) -> None: - real_args = args or self.get_program_parsed_args() - if real_args.command is None: - self.arg_parser.print_help() - return - self.map[real_args.command].run_command(real_args) - - -class AppInitializer(AppCommandFeatureProvider): - def __init__(self) -> None: - super().__init__("app-initializer") - - def _init_app(self) -> bool: - if self.app.app_initialized: - return False - self.app.data_dir.ensure() - return True - - def setup(self): - pass - - def get_command_info(self): - return ("init", "Initialize the app.") - - def setup_arg_parser(self, arg_parser): - pass - - def run_command(self, args): - init = self._init_app() - if init: - print("App initialized successfully.") - else: - print("App is already initialized. Do nothing.") - - -class AppBase: - _instance: AppBase | None = None - - @staticmethod - def get_instance() -> AppBase: - if AppBase._instance is None: - raise AppError("App instance not initialized") - return AppBase._instance - - def __init__(self, app_id: str, name: str): - AppBase._instance = self - self._app_id = app_id - self._name = name - self._root = AppRootPath(self) - self._paths: list[AppFeaturePath] = [] - self._features: list[AppFeatureProvider] = [] - - def setup(self) -> None: - command_dispatcher = self.get_feature(CommandDispatcher) - command_dispatcher.setup_arg_parser() - program_args = command_dispatcher.get_program_parsed_args() - self.setup_root(program_args.project_dir) - self._data_dir = self.add_path(DATA_DIR_NAME, True, id="data") - for feature in self.features: - feature.setup() - for path in self.paths: - path.check_self() - - @property - def app_id(self) -> str: - return self._app_id - - @property - def name(self) -> str: - return self._name - - @property - def root(self) -> AppRootPath: - return self._root - - def setup_root(self, path: os.PathLike) -> None: - self._root.setup(path) - - @property - def data_dir(self) -> AppFeaturePath: - return self._data_dir - - @property - def app_initialized(self) -> bool: - return self.data_dir.check_self() - - def ensure_app_initialized(self) -> AppRootPath: - if not self.app_initialized: - raise AppError( - user_message="Root directory does not exist. " - "Please run 'init' to create one." - ) - return self.root - - @property - def features(self) -> list[AppFeatureProvider]: - return self._features - - @property - def paths(self) -> list[AppFeaturePath]: - return self._paths - - def add_feature(self, feature: _Feature) -> _Feature: - for f in self.features: - if f.name == feature.name: - raise AppFeatureError( - f"Duplicate feature name: {feature.name}.", feature.name - ) - self._features.append(feature) - return feature - - def add_path( - self, - name: str, - is_dir: bool, - /, - parent: AppPath | None = None, - id: str | None = None, - description: str = "", - ) -> AppFeaturePath: - p = AppFeaturePath( - parent or self.root, name, is_dir, id=id, description=description - ) - self._paths.append(p) - return p - - @overload - def get_feature(self, feature: str) -> AppFeatureProvider: ... - - @overload - def get_feature(self, feature: type[_Feature]) -> _Feature: ... - - def get_feature( - self, feature: str | type[_Feature] - ) -> AppFeatureProvider | _Feature: - if isinstance(feature, str): - for f in self._features: - if f.name == feature: - return f - elif isinstance(feature, type): - for f in self._features: - if isinstance(f, feature): - return f - else: - raise CruLogicError("Argument must be the name of feature or its class.") - - raise AppFeatureError(f"Feature {feature} not found.", feature) - - def get_path(self, name: str) -> AppFeaturePath: - for p in self._paths: - if p.id == name or p.name == name: - return p - raise AppPathError(f"Application path {name} not found.", name) diff --git a/tools/cru-py/cru/service/_config.py b/tools/cru-py/cru/service/_config.py deleted file mode 100644 index cbb9533..0000000 --- a/tools/cru-py/cru/service/_config.py +++ /dev/null @@ -1,444 +0,0 @@ -from collections.abc import Iterable -from typing import Any, Literal, overload - -from cru import CruException, CruNotFound -from cru.config import Configuration, ConfigItem -from cru.value import ( - INTEGER_VALUE_TYPE, - TEXT_VALUE_TYPE, - CruValueTypeError, - RandomStringValueGenerator, - UuidValueGenerator, -) -from cru.parsing import ParseError, SimpleLineConfigParser - -from ._base import AppFeaturePath, AppCommandFeatureProvider - - -class AppConfigError(CruException): - def __init__( - self, message: str, configuration: Configuration, *args, **kwargs - ) -> None: - super().__init__(message, *args, **kwargs) - self._configuration = configuration - - @property - def configuration(self) -> Configuration: - return self._configuration - - -class AppConfigFileError(AppConfigError): - def __init__( - self, - message: str, - configuration: Configuration, - *args, - **kwargs, - ) -> None: - super().__init__(message, configuration, *args, **kwargs) - - -class AppConfigFileNotFoundError(AppConfigFileError): - def __init__( - self, - message: str, - configuration: Configuration, - file_path: str, - *args, - **kwargs, - ) -> None: - super().__init__(message, configuration, *args, **kwargs) - self._file_path = file_path - - @property - def file_path(self) -> str: - return self._file_path - - -class AppConfigFileParseError(AppConfigFileError): - def __init__( - self, - message: str, - configuration: Configuration, - file_content: str, - *args, - **kwargs, - ) -> None: - super().__init__(message, configuration, *args, **kwargs) - self._file_content = file_content - self.__cause__: ParseError - - @property - def file_content(self) -> str: - return self._file_content - - def get_user_message(self) -> str: - return f"Error while parsing config file at line {self.__cause__.line_number}." - - -class AppConfigFileEntryError(AppConfigFileError): - def __init__( - self, - message: str, - configuration: Configuration, - entries: Iterable[SimpleLineConfigParser.Entry], - *args, - **kwargs, - ) -> None: - super().__init__(message, configuration, *args, **kwargs) - self._entries = list(entries) - - @property - def error_entries(self) -> list[SimpleLineConfigParser.Entry]: - return self._entries - - @staticmethod - def entries_to_friendly_message( - entries: Iterable[SimpleLineConfigParser.Entry], - ) -> str: - return "\n".join( - f"line {entry.line_number}: {entry.key}={entry.value}" for entry in entries - ) - - @property - def friendly_message_head(self) -> str: - return "Error entries found in config file" - - def get_user_message(self) -> str: - return ( - f"{self.friendly_message_head}:\n" - f"{self.entries_to_friendly_message(self.error_entries)}" - ) - - -class AppConfigDuplicateEntryError(AppConfigFileEntryError): - @property - def friendly_message_head(self) -> str: - return "Duplicate entries found in config file" - - -class AppConfigEntryValueFormatError(AppConfigFileEntryError): - @property - def friendly_message_head(self) -> str: - return "Invalid value format for entries" - - -class AppConfigItemNotSetError(AppConfigError): - def __init__( - self, - message: str, - configuration: Configuration, - items: list[ConfigItem], - *args, - **kwargs, - ) -> None: - super().__init__(message, configuration, *args, **kwargs) - self._items = items - - -class ConfigManager(AppCommandFeatureProvider): - def __init__(self) -> None: - super().__init__("config-manager") - configuration = Configuration() - self._configuration = configuration - self._loaded: bool = False - self._init_app_defined_items() - - def _init_app_defined_items(self) -> None: - prefix = self.config_name_prefix - - def _add_text(name: str, description: str) -> ConfigItem: - item = ConfigItem(f"{prefix}_{name}", description, TEXT_VALUE_TYPE) - self.configuration.add(item) - return item - - def _add_uuid(name: str, description: str) -> ConfigItem: - item = ConfigItem( - f"{prefix}_{name}", - description, - TEXT_VALUE_TYPE, - default=UuidValueGenerator(), - ) - self.configuration.add(item) - return item - - def _add_random_string( - name: str, description: str, length: int = 32, secure: bool = True - ) -> ConfigItem: - item = ConfigItem( - f"{prefix}_{name}", - description, - TEXT_VALUE_TYPE, - default=RandomStringValueGenerator(length, secure), - ) - self.configuration.add(item) - return item - - def _add_int(name: str, description: str) -> ConfigItem: - item = ConfigItem(f"{prefix}_{name}", description, INTEGER_VALUE_TYPE) - self.configuration.add(item) - return item - - self._domain = _add_text("DOMAIN", "domain name") - self._email = _add_text("EMAIL", "admin email address") - _add_text( - "AUTO_BACKUP_COS_SECRET_ID", - "access key id for Tencent COS, used for auto backup", - ) - _add_text( - "AUTO_BACKUP_COS_SECRET_KEY", - "access key secret for Tencent COS, used for auto backup", - ) - _add_text( - "AUTO_BACKUP_COS_ENDPOINT", - "endpoint (cos.*.myqcloud.com) for Tencent COS, used for auto backup", - ) - _add_text( - "AUTO_BACKUP_COS_BUCKET", - "bucket name for Tencent COS, used for auto backup", - ) - _add_uuid("V2RAY_TOKEN", "v2ray user id") - _add_uuid("V2RAY_PATH", "v2ray path, which will be prefixed by _") - _add_random_string("2FAUTH_APP_KEY", "2FAuth App Key") - _add_text("2FAUTH_MAIL_USERNAME", "2FAuth SMTP user") - _add_text("2FAUTH_MAIL_PASSWORD", "2FAuth SMTP password") - _add_text("GIT_SERVER_USERNAME", "Git server username") - _add_text("GIT_SERVER_PASSWORD", "Git server password") - - def setup(self) -> None: - self._config_file_path = self.app.data_dir.add_subpath( - "config", False, description="Configuration file path." - ) - - @property - def config_name_prefix(self) -> str: - return self.app.app_id.upper() - - @property - def configuration(self) -> Configuration: - return self._configuration - - @property - def config_file_path(self) -> AppFeaturePath: - return self._config_file_path - - @property - def all_set(self) -> bool: - return self.configuration.all_set - - def get_item(self, name: str) -> ConfigItem[Any]: - if not name.startswith(self.config_name_prefix + "_"): - name = f"{self.config_name_prefix}_{name}" - - item = self.configuration.get_or(name, None) - if item is None: - raise AppConfigError(f"Config item '{name}' not found.", self.configuration) - return item - - @overload - def get_item_value_str(self, name: str) -> str: ... - - @overload - def get_item_value_str(self, name: str, ensure_set: Literal[True]) -> str: ... - - @overload - def get_item_value_str(self, name: str, ensure_set: bool = True) -> str | None: ... - - def get_item_value_str(self, name: str, ensure_set: bool = True) -> str | None: - self.load_config_file() - item = self.get_item(name) - if not item.is_set: - if ensure_set: - raise AppConfigItemNotSetError( - f"Config item '{name}' is not set.", self.configuration, [item] - ) - return None - return item.value_str - - def get_str_dict(self, ensure_all_set: bool = True) -> dict[str, str]: - self.load_config_file() - if ensure_all_set and not self.configuration.all_set: - raise AppConfigItemNotSetError( - "Some config items are not set.", - self.configuration, - self.configuration.get_unset_items(), - ) - return self.configuration.to_str_dict() - - @property - def domain_item_name(self) -> str: - return self._domain.name - - def get_domain_value_str(self) -> str: - return self.get_item_value_str(self._domain.name) - - def get_email_value_str_optional(self) -> str | None: - return self.get_item_value_str(self._email.name, ensure_set=False) - - def _set_with_default(self) -> None: - if not self.configuration.all_not_set: - raise AppConfigError( - "Config is not clean. " - "Some config items are already set. " - "Can't set again with default value.", - self.configuration, - ) - for item in self.configuration: - if item.can_generate_default: - item.set_value(item.generate_default_value()) - - def _to_config_file_content(self) -> str: - content = "".join( - [ - f"{item.name}={item.value_str if item.is_set else ''}\n" - for item in self.configuration - ] - ) - return content - - def _create_init_config_file(self) -> None: - if self.config_file_path.check_self(): - raise AppConfigError( - "Config file already exists.", - self.configuration, - user_message=f"The config file at " - f"{self.config_file_path.full_path_str} already exists.", - ) - self._set_with_default() - self.config_file_path.ensure() - with open( - self.config_file_path.full_path, "w", encoding="utf-8", newline="\n" - ) as file: - file.write(self._to_config_file_content()) - - def _parse_config_file(self) -> SimpleLineConfigParser.Result: - if not self.config_file_path.check_self(): - raise AppConfigFileNotFoundError( - "Config file not found.", - self.configuration, - self.config_file_path.full_path_str, - user_message=f"The config file at " - f"{self.config_file_path.full_path_str} does not exist. " - f"You can create an initial one with 'init' command.", - ) - - text = self.config_file_path.full_path.read_text() - try: - parser = SimpleLineConfigParser() - return parser.parse(text) - except ParseError as e: - raise AppConfigFileParseError( - "Failed to parse config file.", self.configuration, text - ) from e - - def _parse_and_print_config_file(self) -> None: - parse_result = self._parse_config_file() - for entry in parse_result: - print(f"{entry.key}={entry.value}") - - def _check_duplicate( - self, - parse_result: dict[str, list[SimpleLineConfigParser.Entry]], - ) -> dict[str, SimpleLineConfigParser.Entry]: - entry_dict: dict[str, SimpleLineConfigParser.Entry] = {} - duplicate_entries: list[SimpleLineConfigParser.Entry] = [] - for key, entries in parse_result.items(): - entry_dict[key] = entries[0] - if len(entries) > 1: - duplicate_entries.extend(entries) - if len(duplicate_entries) > 0: - raise AppConfigDuplicateEntryError( - "Duplicate entries found.", self.configuration, duplicate_entries - ) - - return entry_dict - - def _check_type( - self, entry_dict: dict[str, SimpleLineConfigParser.Entry] - ) -> dict[str, Any]: - value_dict: dict[str, Any] = {} - error_entries: list[SimpleLineConfigParser.Entry] = [] - errors: list[CruValueTypeError] = [] - for key, entry in entry_dict.items(): - try: - if entry.value == "": - value_dict[key] = None - else: - value = entry.value - config_item = self.configuration.get_or(key) - if config_item is not CruNotFound.VALUE: - value = config_item.value_type.convert_str_to_value(value) - value_dict[key] = value - except CruValueTypeError as e: - error_entries.append(entry) - errors.append(e) - if len(error_entries) > 0: - raise AppConfigEntryValueFormatError( - "Entry value format is not correct.", - self.configuration, - error_entries, - ) from ExceptionGroup("Multiple format errors occurred.", errors) - return value_dict - - def _read_config_file(self) -> dict[str, Any]: - parsed = self._parse_config_file() - entry_groups = parsed.cru_iter().group_by(lambda e: e.key) - entry_dict = self._check_duplicate(entry_groups) - value_dict = self._check_type(entry_dict) - return value_dict - - def _real_load_config_file(self) -> None: - self.configuration.reset_all() - value_dict = self._read_config_file() - for key, value in value_dict.items(): - if value is None: - continue - self.configuration.set_config_item(key, value) - - def load_config_file(self, force=False) -> None: - if force or not self._loaded: - self._real_load_config_file() - self._loaded = True - - def _print_app_config_info(self): - for item in self.configuration: - print(item.description_str) - - def get_command_info(self): - return "config", "Manage configuration." - - def setup_arg_parser(self, arg_parser) -> None: - subparsers = arg_parser.add_subparsers( - dest="config_command", required=True, metavar="CONFIG_COMMAND" - ) - _init_parser = subparsers.add_parser( - "init", help="create an initial config file" - ) - _print_app_parser = subparsers.add_parser( - "print-app", - help="print information of the config items defined by app", - ) - _print_parser = subparsers.add_parser("print", help="print current config") - _check_config_parser = subparsers.add_parser( - "check", - help="check the validity of the config file", - ) - _check_config_parser.add_argument( - "-f", - "--format-only", - action="store_true", - help="only check content format, not app config item requirements.", - ) - - def run_command(self, args) -> None: - if args.config_command == "init": - self._create_init_config_file() - elif args.config_command == "print-app": - self._print_app_config_info() - elif args.config_command == "print": - self._parse_and_print_config_file() - elif args.config_command == "check": - if args.format_only: - self._parse_config_file() - else: - self._read_config_file() diff --git a/tools/cru-py/cru/service/_external.py b/tools/cru-py/cru/service/_external.py deleted file mode 100644 index 2347e95..0000000 --- a/tools/cru-py/cru/service/_external.py +++ /dev/null @@ -1,81 +0,0 @@ -from ._base import AppCommandFeatureProvider -from ._nginx import NginxManager - - -class CliToolCommandProvider(AppCommandFeatureProvider): - def __init__(self) -> None: - super().__init__("cli-tool-command-provider") - - def setup(self): - pass - - def get_command_info(self): - return ("gen-cli", "Get commands of running external cli tools.") - - def setup_arg_parser(self, arg_parser): - subparsers = arg_parser.add_subparsers( - dest="gen_cli_command", required=True, metavar="GEN_CLI_COMMAND" - ) - certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") - certbot_parser.add_argument( - "-t", "--test", action="store_true", help="run certbot in test mode" - ) - _install_docker_parser = subparsers.add_parser( - "install-docker", help="print docker installation commands" - ) - _update_blog_parser = subparsers.add_parser( - "update-blog", help="print blog update command" - ) - - def _print_install_docker_commands(self) -> None: - output = """ -### COMMAND: uninstall apt docker -for pkg in docker.io docker-doc docker-compose \ -podman-docker containerd runc; \ -do sudo apt-get remove $pkg; done - -### COMMAND: prepare apt certs -sudo apt-get update -sudo apt-get install ca-certificates curl -sudo install -m 0755 -d /etc/apt/keyrings - -### COMMAND: install certs -sudo curl -fsSL https://download.docker.com/linux/debian/gpg \ --o /etc/apt/keyrings/docker.asc -sudo chmod a+r /etc/apt/keyrings/docker.asc - -### COMMAND: add docker apt source -echo \\ - "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] \ -https://download.docker.com/linux/debian \\ - $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \\ - sudo tee /etc/apt/sources.list.d/docker.list > /dev/null - -### COMMAND: update apt and install docker -sudo apt-get update -sudo apt-get install docker-ce docker-ce-cli containerd.io \ -docker-buildx-plugin docker-compose-plugin - -### COMMAND: setup system for docker -sudo systemctl enable docker -sudo systemctl start docker -sudo groupadd -f docker -sudo usermod -aG docker $USER -# Remember to log out and log back in for the group changes to take effect -""".strip() - print(output) - - def _print_update_blog_command(self): - output = """ -### COMMAND: update blog -docker exec -it blog /scripts/update.bash -""".strip() - print(output) - - def run_command(self, args): - if args.gen_cli_command == "certbot": - self.app.get_feature(NginxManager).print_all_certbot_commands(args.test) - elif args.gen_cli_command == "install-docker": - self._print_install_docker_commands() - elif args.gen_cli_command == "update-blog": - self._print_update_blog_command()
\ No newline at end of file diff --git a/tools/cru-py/cru/service/_nginx.py b/tools/cru-py/cru/service/_nginx.py deleted file mode 100644 index 6c77971..0000000 --- a/tools/cru-py/cru/service/_nginx.py +++ /dev/null @@ -1,268 +0,0 @@ -from argparse import Namespace -from enum import Enum, auto -import re -import subprocess -from typing import TypeAlias - -from cru import CruInternalError - -from ._base import AppCommandFeatureProvider -from ._config import ConfigManager -from ._template import TemplateManager - - -class CertbotAction(Enum): - CREATE = auto() - EXPAND = auto() - SHRINK = auto() - RENEW = auto() - - -class NginxManager(AppCommandFeatureProvider): - CertbotAction: TypeAlias = CertbotAction - - def __init__(self) -> None: - super().__init__("nginx-manager") - self._domains_cache: list[str] | None = None - - def setup(self) -> None: - pass - - @property - def _config_manager(self) -> ConfigManager: - return self.app.get_feature(ConfigManager) - - @property - def root_domain(self) -> str: - return self._config_manager.get_domain_value_str() - - @property - def domains(self) -> list[str]: - if self._domains_cache is None: - self._domains_cache = self._get_domains() - return self._domains_cache - - @property - def subdomains(self) -> list[str]: - suffix = "." + self.root_domain - return [d[: -len(suffix)] for d in self.domains if d.endswith(suffix)] - - @property - def _domain_config_name(self) -> str: - return self._config_manager.domain_item_name - - def _get_domains_from_text(self, text: str) -> set[str]: - domains: set[str] = set() - regex = re.compile(r"server_name\s+(\S+)\s*;") - for match in regex.finditer(text): - domains.add(match[1]) - return domains - - def _join_generated_nginx_conf_text(self) -> str: - text = "" - template_manager = self.app.get_feature(TemplateManager) - for nginx_conf in template_manager.generate(): - text += nginx_conf[1] - return text - - def _get_domains(self) -> list[str]: - text = self._join_generated_nginx_conf_text() - domains = list(self._get_domains_from_text(text)) - domains.remove(self.root_domain) - return [self.root_domain, *domains] - - def _print_domains(self) -> None: - for domain in self.domains: - print(domain) - - def _certbot_command( - self, - action: CertbotAction | str, - test: bool, - *, - docker=True, - standalone=None, - email=None, - agree_tos=True, - ) -> str: - if isinstance(action, str): - action = CertbotAction[action.upper()] - - command_args = [] - - add_domain_option = True - if action is CertbotAction.CREATE: - if standalone is None: - standalone = True - command_action = "certonly" - elif action in [CertbotAction.EXPAND, CertbotAction.SHRINK]: - if standalone is None: - standalone = False - command_action = "certonly" - elif action is CertbotAction.RENEW: - if standalone is None: - standalone = False - add_domain_option = False - command_action = "renew" - else: - raise CruInternalError("Invalid certbot action.") - - data_dir = self.app.data_dir.full_path.as_posix() - - if not docker: - command_args.append("certbot") - else: - command_args.extend( - [ - "docker run -it --rm --name certbot", - f'-v "{data_dir}/certbot/certs:/etc/letsencrypt"', - f'-v "{data_dir}/certbot/data:/var/lib/letsencrypt"', - ] - ) - if standalone: - command_args.append('-p "0.0.0.0:80:80"') - else: - command_args.append(f'-v "{data_dir}/certbot/webroot:/var/www/certbot"') - - command_args.append("certbot/certbot") - - command_args.append(command_action) - - command_args.append(f"--cert-name {self.root_domain}") - - if standalone: - command_args.append("--standalone") - else: - command_args.append("--webroot -w /var/www/certbot") - - if add_domain_option: - command_args.append(" ".join([f"-d {domain}" for domain in self.domains])) - - if email is not None: - command_args.append(f"--email {email}") - - if agree_tos: - command_args.append("--agree-tos") - - if test: - command_args.append("--test-cert --dry-run") - - return " ".join(command_args) - - def print_all_certbot_commands(self, test: bool): - print("### COMMAND: (standalone) create certs") - print( - self._certbot_command( - CertbotAction.CREATE, - test, - email=self._config_manager.get_email_value_str_optional(), - ) - ) - print() - print("### COMMAND: (webroot+nginx) expand or shrink certs") - print( - self._certbot_command( - CertbotAction.EXPAND, - test, - email=self._config_manager.get_email_value_str_optional(), - ) - ) - print() - print("### COMMAND: (webroot+nginx) renew certs") - print( - self._certbot_command( - CertbotAction.RENEW, - test, - email=self._config_manager.get_email_value_str_optional(), - ) - ) - - @property - def _cert_path_str(self) -> str: - return str( - self.app.data_dir.full_path - / "certbot/certs/live" - / self.root_domain - / "fullchain.pem" - ) - - def get_command_info(self): - return "nginx", "Manage nginx related things." - - def setup_arg_parser(self, arg_parser): - subparsers = arg_parser.add_subparsers( - dest="nginx_command", required=True, metavar="NGINX_COMMAND" - ) - _list_parser = subparsers.add_parser("list", help="list domains") - certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") - certbot_parser.add_argument( - "--no-test", - action="store_true", - help="remove args making certbot run in test mode", - ) - - def run_command(self, args: Namespace) -> None: - if args.nginx_command == "list": - self._print_domains() - elif args.nginx_command == "certbot": - self.print_all_certbot_commands(not args.no_test) - - def _generate_dns_zone( - self, - ip: str, - /, - ttl: str | int = 600, - *, - enable_mail: bool = True, - dkim: str | None = None, - ) -> str: - # TODO: Not complete and test now. - root_domain = self.root_domain - result = f"$ORIGIN {root_domain}.\n\n" - result += "; A records\n" - result += f"@ {ttl} IN A {ip}\n" - for subdomain in self.subdomains: - result += f"{subdomain} {ttl} IN A {ip}\n" - - if enable_mail: - result += "\n; MX records\n" - result += f"@ {ttl} IN MX 10 mail.{root_domain}.\n" - result += "\n; SPF record\n" - result += f'@ {ttl} IN TXT "v=spf1 mx ~all"\n' - if dkim is not None: - result += "\n; DKIM record\n" - result += f'mail._domainkey {ttl} IN TEXT "{dkim}"' - result += "\n; DMARC record\n" - dmarc_options = [ - "v=DMARC1", - "p=none", - f"rua=mailto:dmarc.report@{root_domain}", - f"ruf=mailto:dmarc.report@{root_domain}", - "sp=none", - "ri=86400", - ] - result += f'_dmarc {ttl} IN TXT "{"; ".join(dmarc_options)}"\n' - return result - - def _get_dkim_from_mailserver(self) -> str | None: - # TODO: Not complete and test now. - dkim_path = ( - self.app.data_dir.full_path - / "dms/config/opendkim/keys" - / self.root_domain - / "mail.txt" - ) - if not dkim_path.exists(): - return None - - p = subprocess.run(["sudo", "cat", dkim_path], capture_output=True, check=True) - value = "" - for match in re.finditer('"(.*)"', p.stdout.decode("utf-8")): - value += match.group(1) - return value - - def _generate_dns_zone_with_dkim(self, ip: str, /, ttl: str | int = 600) -> str: - # TODO: Not complete and test now. - return self._generate_dns_zone( - ip, ttl, enable_mail=True, dkim=self._get_dkim_from_mailserver() - ) diff --git a/tools/cru-py/cru/service/_template.py b/tools/cru-py/cru/service/_template.py deleted file mode 100644 index 1381700..0000000 --- a/tools/cru-py/cru/service/_template.py +++ /dev/null @@ -1,90 +0,0 @@ -from argparse import Namespace -from pathlib import Path -import shutil - -from cru.template import TemplateTree, CruStrWrapperTemplate - -from ._base import AppCommandFeatureProvider, AppFeaturePath -from ._config import ConfigManager - - -class TemplateManager(AppCommandFeatureProvider): - def __init__(self, prefix: str | None = None): - super().__init__("template-manager") - self._prefix = prefix or self.app.app_id.upper() - - def setup(self) -> None: - self._templates_dir = self.app.add_path("templates", True) - self._generated_dir = self.app.add_path("generated", True) - self._template_tree: TemplateTree[CruStrWrapperTemplate] | None = None - - @property - def prefix(self) -> str: - return self._prefix - - @property - def templates_dir(self) -> AppFeaturePath: - return self._templates_dir - - @property - def generated_dir(self) -> AppFeaturePath: - return self._generated_dir - - @property - def template_tree(self) -> TemplateTree[CruStrWrapperTemplate]: - if self._template_tree is None: - return self.reload() - return self._template_tree - - def reload(self) -> TemplateTree: - self._template_tree = TemplateTree( - lambda text: CruStrWrapperTemplate(text), self.templates_dir.full_path_str - ) - return self._template_tree - - def _print_file_lists(self) -> None: - for path, template in self.template_tree.templates: - print(f"[{template.variable_count}]", path.as_posix()) - - def generate(self) -> list[tuple[Path, str]]: - config_manager = self.app.get_feature(ConfigManager) - return self.template_tree.generate(config_manager.get_str_dict()) - - def _generate_files(self, dry_run: bool) -> None: - config_manager = self.app.get_feature(ConfigManager) - if not dry_run and self.generated_dir.full_path.exists(): - shutil.rmtree(self.generated_dir.full_path) - self.template_tree.generate_to( - self.generated_dir.full_path_str, config_manager.get_str_dict(), dry_run - ) - - def get_command_info(self): - return ("template", "Manage templates.") - - def setup_arg_parser(self, arg_parser): - subparsers = arg_parser.add_subparsers( - dest="template_command", required=True, metavar="TEMPLATE_COMMAND" - ) - _list_parser = subparsers.add_parser("list", help="list templates") - _variables_parser = subparsers.add_parser( - "variables", help="list variables used in all templates" - ) - generate_parser = subparsers.add_parser("generate", help="generate templates") - generate_parser.add_argument( - "--no-dry-run", action="store_true", help="generate and write target files" - ) - - def run_command(self, args: Namespace) -> None: - if args.template_command == "list": - self._print_file_lists() - elif args.template_command == "variables": - for var in self.template_tree.variables: - print(var) - elif args.template_command == "generate": - dry_run = not args.no_dry_run - self._generate_files(dry_run) - if dry_run: - print("Dry run successfully.") - print( - f"Will delete dir {self.generated_dir.full_path_str} if it exists." - ) |