diff options
Diffstat (limited to 'tools/cru-py/cru/service')
| -rw-r--r-- | tools/cru-py/cru/service/__init__.py | 0 | ||||
| -rw-r--r-- | tools/cru-py/cru/service/__main__.py | 20 | ||||
| -rw-r--r-- | tools/cru-py/cru/service/_app.py | 34 | ||||
| -rw-r--r-- | tools/cru-py/cru/service/_base.py | 449 | ||||
| -rw-r--r-- | tools/cru-py/cru/service/_config.py | 444 | ||||
| -rw-r--r-- | tools/cru-py/cru/service/_external.py | 81 | ||||
| -rw-r--r-- | tools/cru-py/cru/service/_nginx.py | 268 | ||||
| -rw-r--r-- | tools/cru-py/cru/service/_template.py | 90 | 
8 files changed, 0 insertions, 1386 deletions
diff --git a/tools/cru-py/cru/service/__init__.py b/tools/cru-py/cru/service/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/tools/cru-py/cru/service/__init__.py +++ /dev/null diff --git a/tools/cru-py/cru/service/__main__.py b/tools/cru-py/cru/service/__main__.py deleted file mode 100644 index 1c10e82..0000000 --- a/tools/cru-py/cru/service/__main__.py +++ /dev/null @@ -1,20 +0,0 @@ -from cru import CruException - -from ._app import create_app - - -def main(): -    app = create_app() -    app.run_command() - - -if __name__ == "__main__": -    try: -        main() -    except CruException as e: -        user_message = e.get_user_message() -        if user_message is not None: -            print(f"Error: {user_message}") -            exit(1) -        else: -            raise diff --git a/tools/cru-py/cru/service/_app.py b/tools/cru-py/cru/service/_app.py deleted file mode 100644 index 6030dad..0000000 --- a/tools/cru-py/cru/service/_app.py +++ /dev/null @@ -1,34 +0,0 @@ -from ._base import ( -    AppBase, -    CommandDispatcher, -    AppInitializer, -    PathCommandProvider, -) -from ._config import ConfigManager -from ._template import TemplateManager -from ._nginx import NginxManager -from ._external import CliToolCommandProvider - -APP_ID = "crupest" - - -class App(AppBase): -    def __init__(self): -        super().__init__(APP_ID, f"{APP_ID}-service") -        self.add_feature(PathCommandProvider()) -        self.add_feature(AppInitializer()) -        self.add_feature(ConfigManager()) -        self.add_feature(TemplateManager()) -        self.add_feature(NginxManager()) -        self.add_feature(CliToolCommandProvider()) -        self.add_feature(CommandDispatcher()) - -    def run_command(self): -        command_dispatcher = self.get_feature(CommandDispatcher) -        command_dispatcher.run_command() - - -def create_app() -> App: -    app = App() -    app.setup() -    return app diff --git a/tools/cru-py/cru/service/_base.py b/tools/cru-py/cru/service/_base.py deleted file mode 100644 index ad813c9..0000000 --- a/tools/cru-py/cru/service/_base.py +++ /dev/null @@ -1,449 +0,0 @@ -from __future__ import annotations - -from argparse import ArgumentParser, Namespace -from abc import ABC, abstractmethod -import argparse -import os -from pathlib import Path -from typing import TypeVar, overload - -from cru import CruException, CruLogicError - -_Feature = TypeVar("_Feature", bound="AppFeatureProvider") - - -class AppError(CruException): -    pass - - -class AppFeatureError(AppError): -    def __init__(self, message, feature: type | str, *args, **kwargs): -        super().__init__(message, *args, **kwargs) -        self._feature = feature - -    @property -    def feature(self) -> type | str: -        return self._feature - - -class AppPathError(CruException): -    def __init__(self, message, _path: str | Path, *args, **kwargs): -        super().__init__(message, *args, **kwargs) -        self._path = str(_path) - -    @property -    def path(self) -> str: -        return self._path - - -class AppPath(ABC): -    def __init__(self, id: str, is_dir: bool, description: str) -> None: -        self._is_dir = is_dir -        self._id = id -        self._description = description - -    @property -    @abstractmethod -    def parent(self) -> AppPath | None: ... - -    @property -    @abstractmethod -    def app(self) -> AppBase: ... - -    @property -    def id(self) -> str: -        return self._id - -    @property -    def description(self) -> str: -        return self._description - -    @property -    def is_dir(self) -> bool: -        return self._is_dir - -    @property -    @abstractmethod -    def full_path(self) -> Path: ... - -    @property -    def full_path_str(self) -> str: -        return str(self.full_path) - -    def check_parents(self, must_exist: bool = False) -> bool: -        for p in reversed(self.full_path.parents): -            if not p.exists() and not must_exist: -                return False -            if not p.is_dir(): -                raise AppPathError("Parents' path must be a dir.", self.full_path) -        return True - -    def check_self(self, must_exist: bool = False) -> bool: -        if not self.check_parents(must_exist): -            return False -        if not self.full_path.exists(): -            if not must_exist: -                return False -            raise AppPathError("Not exist.", self.full_path) -        if self.is_dir: -            if not self.full_path.is_dir(): -                raise AppPathError("Should be a directory, but not.", self.full_path) -            else: -                return True -        else: -            if not self.full_path.is_file(): -                raise AppPathError("Should be a file, but not.", self.full_path) -            else: -                return True - -    def ensure(self, create_file: bool = False) -> None: -        e = self.check_self(False) -        if not e: -            os.makedirs(self.full_path.parent, exist_ok=True) -            if self.is_dir: -                os.mkdir(self.full_path) -            elif create_file: -                with open(self.full_path, "w") as f: -                    f.write("") - -    def add_subpath( -        self, -        name: str, -        is_dir: bool, -        /, -        id: str | None = None, -        description: str = "", -    ) -> AppFeaturePath: -        return self.app.add_path(name, is_dir, self, id, description) - -    @property -    def app_relative_path(self) -> Path: -        return self.full_path.relative_to(self.app.root.full_path) - - -class AppFeaturePath(AppPath): -    def __init__( -        self, -        parent: AppPath, -        name: str, -        is_dir: bool, -        /, -        id: str | None = None, -        description: str = "", -    ) -> None: -        super().__init__(id or name, is_dir, description) -        self._name = name -        self._parent = parent - -    @property -    def name(self) -> str: -        return self._name - -    @property -    def parent(self) -> AppPath: -        return self._parent - -    @property -    def app(self) -> AppBase: -        return self.parent.app - -    @property -    def full_path(self) -> Path: -        return Path(self.parent.full_path, self.name).resolve() - - -class AppRootPath(AppPath): -    def __init__(self, app: AppBase): -        super().__init__("root", True, "Application root path.") -        self._app = app -        self._full_path: Path | None = None - -    @property -    def parent(self) -> None: -        return None - -    @property -    def app(self) -> AppBase: -        return self._app - -    @property -    def full_path(self) -> Path: -        if self._full_path is None: -            raise AppError("App root path is not set yet.") -        return self._full_path - -    def setup(self, path: os.PathLike) -> None: -        if self._full_path is not None: -            raise AppError("App root path is already set.") -        self._full_path = Path(path).resolve() - - -class AppFeatureProvider(ABC): -    def __init__(self, name: str, /, app: AppBase | None = None): -        super().__init__() -        self._name = name -        self._app = app if app else AppBase.get_instance() - -    @property -    def app(self) -> AppBase: -        return self._app - -    @property -    def name(self) -> str: -        return self._name - -    @abstractmethod -    def setup(self) -> None: ... - - -class AppCommandFeatureProvider(AppFeatureProvider): -    @abstractmethod -    def get_command_info(self) -> tuple[str, str]: ... - -    @abstractmethod -    def setup_arg_parser(self, arg_parser: ArgumentParser): ... - -    @abstractmethod -    def run_command(self, args: Namespace) -> None: ... - - -DATA_DIR_NAME = "data" - - -class PathCommandProvider(AppCommandFeatureProvider): -    def __init__(self) -> None: -        super().__init__("path-command-provider") - -    def setup(self): -        pass - -    def get_command_info(self): -        return ("path", "Get information about paths used by app.") - -    def setup_arg_parser(self, arg_parser: ArgumentParser) -> None: -        subparsers = arg_parser.add_subparsers( -            dest="path_command", required=True, metavar="PATH_COMMAND" -        ) -        _list_parser = subparsers.add_parser( -            "list", help="list special paths used by app" -        ) - -    def run_command(self, args: Namespace) -> None: -        if args.path_command == "list": -            for path in self.app.paths: -                print(f"{path.app_relative_path.as_posix()}: {path.description}") - - -class CommandDispatcher(AppFeatureProvider): -    def __init__(self) -> None: -        super().__init__("command-dispatcher") -        self._parsed_args: argparse.Namespace | None = None - -    def setup_arg_parser(self) -> None: -        epilog = """ -==> to start, -./tools/manage init -./tools/manage config init -ln -s generated/docker-compose.yaml . -# Then edit config file. - -==> to update -git pull -./tools/manage template generate --no-dry-run -docker compose up -        """.strip() - -        self._map: dict[str, AppCommandFeatureProvider] = {} -        arg_parser = argparse.ArgumentParser( -            description="Service management", -            formatter_class=argparse.RawDescriptionHelpFormatter, -            epilog=epilog, -        ) -        arg_parser.add_argument( -            "--project-dir", -            help="The path of the project directory.", -            required=True, -            type=str, -        ) -        subparsers = arg_parser.add_subparsers( -            dest="command", -            help="The management command to execute.", -            metavar="COMMAND", -        ) -        for feature in self.app.features: -            if isinstance(feature, AppCommandFeatureProvider): -                info = feature.get_command_info() -                command_subparser = subparsers.add_parser(info[0], help=info[1]) -                feature.setup_arg_parser(command_subparser) -                self._map[info[0]] = feature -        self._arg_parser = arg_parser - -    def setup(self): -        pass - -    @property -    def arg_parser(self) -> argparse.ArgumentParser: -        return self._arg_parser - -    @property -    def map(self) -> dict[str, AppCommandFeatureProvider]: -        return self._map - -    def get_program_parsed_args(self) -> argparse.Namespace: -        if self._parsed_args is None: -            self._parsed_args = self.arg_parser.parse_args() -        return self._parsed_args - -    def run_command(self, args: argparse.Namespace | None = None) -> None: -        real_args = args or self.get_program_parsed_args() -        if real_args.command is None: -            self.arg_parser.print_help() -            return -        self.map[real_args.command].run_command(real_args) - - -class AppInitializer(AppCommandFeatureProvider): -    def __init__(self) -> None: -        super().__init__("app-initializer") - -    def _init_app(self) -> bool: -        if self.app.app_initialized: -            return False -        self.app.data_dir.ensure() -        return True - -    def setup(self): -        pass - -    def get_command_info(self): -        return ("init", "Initialize the app.") - -    def setup_arg_parser(self, arg_parser): -        pass - -    def run_command(self, args): -        init = self._init_app() -        if init: -            print("App initialized successfully.") -        else: -            print("App is already initialized. Do nothing.") - - -class AppBase: -    _instance: AppBase | None = None - -    @staticmethod -    def get_instance() -> AppBase: -        if AppBase._instance is None: -            raise AppError("App instance not initialized") -        return AppBase._instance - -    def __init__(self, app_id: str, name: str): -        AppBase._instance = self -        self._app_id = app_id -        self._name = name -        self._root = AppRootPath(self) -        self._paths: list[AppFeaturePath] = [] -        self._features: list[AppFeatureProvider] = [] - -    def setup(self) -> None: -        command_dispatcher = self.get_feature(CommandDispatcher) -        command_dispatcher.setup_arg_parser() -        program_args = command_dispatcher.get_program_parsed_args() -        self.setup_root(program_args.project_dir) -        self._data_dir = self.add_path(DATA_DIR_NAME, True, id="data") -        for feature in self.features: -            feature.setup() -        for path in self.paths: -            path.check_self() - -    @property -    def app_id(self) -> str: -        return self._app_id - -    @property -    def name(self) -> str: -        return self._name - -    @property -    def root(self) -> AppRootPath: -        return self._root - -    def setup_root(self, path: os.PathLike) -> None: -        self._root.setup(path) - -    @property -    def data_dir(self) -> AppFeaturePath: -        return self._data_dir - -    @property -    def app_initialized(self) -> bool: -        return self.data_dir.check_self() - -    def ensure_app_initialized(self) -> AppRootPath: -        if not self.app_initialized: -            raise AppError( -                user_message="Root directory does not exist. " -                "Please run 'init' to create one." -            ) -        return self.root - -    @property -    def features(self) -> list[AppFeatureProvider]: -        return self._features - -    @property -    def paths(self) -> list[AppFeaturePath]: -        return self._paths - -    def add_feature(self, feature: _Feature) -> _Feature: -        for f in self.features: -            if f.name == feature.name: -                raise AppFeatureError( -                    f"Duplicate feature name: {feature.name}.", feature.name -                ) -        self._features.append(feature) -        return feature - -    def add_path( -        self, -        name: str, -        is_dir: bool, -        /, -        parent: AppPath | None = None, -        id: str | None = None, -        description: str = "", -    ) -> AppFeaturePath: -        p = AppFeaturePath( -            parent or self.root, name, is_dir, id=id, description=description -        ) -        self._paths.append(p) -        return p - -    @overload -    def get_feature(self, feature: str) -> AppFeatureProvider: ... - -    @overload -    def get_feature(self, feature: type[_Feature]) -> _Feature: ... - -    def get_feature( -        self, feature: str | type[_Feature] -    ) -> AppFeatureProvider | _Feature: -        if isinstance(feature, str): -            for f in self._features: -                if f.name == feature: -                    return f -        elif isinstance(feature, type): -            for f in self._features: -                if isinstance(f, feature): -                    return f -        else: -            raise CruLogicError("Argument must be the name of feature or its class.") - -        raise AppFeatureError(f"Feature {feature} not found.", feature) - -    def get_path(self, name: str) -> AppFeaturePath: -        for p in self._paths: -            if p.id == name or p.name == name: -                return p -        raise AppPathError(f"Application path {name} not found.", name) diff --git a/tools/cru-py/cru/service/_config.py b/tools/cru-py/cru/service/_config.py deleted file mode 100644 index cbb9533..0000000 --- a/tools/cru-py/cru/service/_config.py +++ /dev/null @@ -1,444 +0,0 @@ -from collections.abc import Iterable -from typing import Any, Literal, overload - -from cru import CruException, CruNotFound -from cru.config import Configuration, ConfigItem -from cru.value import ( -    INTEGER_VALUE_TYPE, -    TEXT_VALUE_TYPE, -    CruValueTypeError, -    RandomStringValueGenerator, -    UuidValueGenerator, -) -from cru.parsing import ParseError, SimpleLineConfigParser - -from ._base import AppFeaturePath, AppCommandFeatureProvider - - -class AppConfigError(CruException): -    def __init__( -        self, message: str, configuration: Configuration, *args, **kwargs -    ) -> None: -        super().__init__(message, *args, **kwargs) -        self._configuration = configuration - -    @property -    def configuration(self) -> Configuration: -        return self._configuration - - -class AppConfigFileError(AppConfigError): -    def __init__( -        self, -        message: str, -        configuration: Configuration, -        *args, -        **kwargs, -    ) -> None: -        super().__init__(message, configuration, *args, **kwargs) - - -class AppConfigFileNotFoundError(AppConfigFileError): -    def __init__( -        self, -        message: str, -        configuration: Configuration, -        file_path: str, -        *args, -        **kwargs, -    ) -> None: -        super().__init__(message, configuration, *args, **kwargs) -        self._file_path = file_path - -    @property -    def file_path(self) -> str: -        return self._file_path - - -class AppConfigFileParseError(AppConfigFileError): -    def __init__( -        self, -        message: str, -        configuration: Configuration, -        file_content: str, -        *args, -        **kwargs, -    ) -> None: -        super().__init__(message, configuration, *args, **kwargs) -        self._file_content = file_content -        self.__cause__: ParseError - -    @property -    def file_content(self) -> str: -        return self._file_content - -    def get_user_message(self) -> str: -        return f"Error while parsing config file at line {self.__cause__.line_number}." - - -class AppConfigFileEntryError(AppConfigFileError): -    def __init__( -        self, -        message: str, -        configuration: Configuration, -        entries: Iterable[SimpleLineConfigParser.Entry], -        *args, -        **kwargs, -    ) -> None: -        super().__init__(message, configuration, *args, **kwargs) -        self._entries = list(entries) - -    @property -    def error_entries(self) -> list[SimpleLineConfigParser.Entry]: -        return self._entries - -    @staticmethod -    def entries_to_friendly_message( -        entries: Iterable[SimpleLineConfigParser.Entry], -    ) -> str: -        return "\n".join( -            f"line {entry.line_number}: {entry.key}={entry.value}" for entry in entries -        ) - -    @property -    def friendly_message_head(self) -> str: -        return "Error entries found in config file" - -    def get_user_message(self) -> str: -        return ( -            f"{self.friendly_message_head}:\n" -            f"{self.entries_to_friendly_message(self.error_entries)}" -        ) - - -class AppConfigDuplicateEntryError(AppConfigFileEntryError): -    @property -    def friendly_message_head(self) -> str: -        return "Duplicate entries found in config file" - - -class AppConfigEntryValueFormatError(AppConfigFileEntryError): -    @property -    def friendly_message_head(self) -> str: -        return "Invalid value format for entries" - - -class AppConfigItemNotSetError(AppConfigError): -    def __init__( -        self, -        message: str, -        configuration: Configuration, -        items: list[ConfigItem], -        *args, -        **kwargs, -    ) -> None: -        super().__init__(message, configuration, *args, **kwargs) -        self._items = items - - -class ConfigManager(AppCommandFeatureProvider): -    def __init__(self) -> None: -        super().__init__("config-manager") -        configuration = Configuration() -        self._configuration = configuration -        self._loaded: bool = False -        self._init_app_defined_items() - -    def _init_app_defined_items(self) -> None: -        prefix = self.config_name_prefix - -        def _add_text(name: str, description: str) -> ConfigItem: -            item = ConfigItem(f"{prefix}_{name}", description, TEXT_VALUE_TYPE) -            self.configuration.add(item) -            return item - -        def _add_uuid(name: str, description: str) -> ConfigItem: -            item = ConfigItem( -                f"{prefix}_{name}", -                description, -                TEXT_VALUE_TYPE, -                default=UuidValueGenerator(), -            ) -            self.configuration.add(item) -            return item - -        def _add_random_string( -            name: str, description: str, length: int = 32, secure: bool = True -        ) -> ConfigItem: -            item = ConfigItem( -                f"{prefix}_{name}", -                description, -                TEXT_VALUE_TYPE, -                default=RandomStringValueGenerator(length, secure), -            ) -            self.configuration.add(item) -            return item - -        def _add_int(name: str, description: str) -> ConfigItem: -            item = ConfigItem(f"{prefix}_{name}", description, INTEGER_VALUE_TYPE) -            self.configuration.add(item) -            return item - -        self._domain = _add_text("DOMAIN", "domain name") -        self._email = _add_text("EMAIL", "admin email address") -        _add_text( -            "AUTO_BACKUP_COS_SECRET_ID", -            "access key id for Tencent COS, used for auto backup", -        ) -        _add_text( -            "AUTO_BACKUP_COS_SECRET_KEY", -            "access key secret for Tencent COS, used for auto backup", -        ) -        _add_text( -            "AUTO_BACKUP_COS_ENDPOINT", -            "endpoint (cos.*.myqcloud.com) for Tencent COS, used for auto backup", -        ) -        _add_text( -            "AUTO_BACKUP_COS_BUCKET", -            "bucket name for Tencent COS, used for auto backup", -        ) -        _add_uuid("V2RAY_TOKEN", "v2ray user id") -        _add_uuid("V2RAY_PATH", "v2ray path, which will be prefixed by _") -        _add_random_string("2FAUTH_APP_KEY", "2FAuth App Key") -        _add_text("2FAUTH_MAIL_USERNAME", "2FAuth SMTP user") -        _add_text("2FAUTH_MAIL_PASSWORD", "2FAuth SMTP password") -        _add_text("GIT_SERVER_USERNAME", "Git server username") -        _add_text("GIT_SERVER_PASSWORD", "Git server password") - -    def setup(self) -> None: -        self._config_file_path = self.app.data_dir.add_subpath( -            "config", False, description="Configuration file path." -        ) - -    @property -    def config_name_prefix(self) -> str: -        return self.app.app_id.upper() - -    @property -    def configuration(self) -> Configuration: -        return self._configuration - -    @property -    def config_file_path(self) -> AppFeaturePath: -        return self._config_file_path - -    @property -    def all_set(self) -> bool: -        return self.configuration.all_set - -    def get_item(self, name: str) -> ConfigItem[Any]: -        if not name.startswith(self.config_name_prefix + "_"): -            name = f"{self.config_name_prefix}_{name}" - -        item = self.configuration.get_or(name, None) -        if item is None: -            raise AppConfigError(f"Config item '{name}' not found.", self.configuration) -        return item - -    @overload -    def get_item_value_str(self, name: str) -> str: ... - -    @overload -    def get_item_value_str(self, name: str, ensure_set: Literal[True]) -> str: ... - -    @overload -    def get_item_value_str(self, name: str, ensure_set: bool = True) -> str | None: ... - -    def get_item_value_str(self, name: str, ensure_set: bool = True) -> str | None: -        self.load_config_file() -        item = self.get_item(name) -        if not item.is_set: -            if ensure_set: -                raise AppConfigItemNotSetError( -                    f"Config item '{name}' is not set.", self.configuration, [item] -                ) -            return None -        return item.value_str - -    def get_str_dict(self, ensure_all_set: bool = True) -> dict[str, str]: -        self.load_config_file() -        if ensure_all_set and not self.configuration.all_set: -            raise AppConfigItemNotSetError( -                "Some config items are not set.", -                self.configuration, -                self.configuration.get_unset_items(), -            ) -        return self.configuration.to_str_dict() - -    @property -    def domain_item_name(self) -> str: -        return self._domain.name - -    def get_domain_value_str(self) -> str: -        return self.get_item_value_str(self._domain.name) - -    def get_email_value_str_optional(self) -> str | None: -        return self.get_item_value_str(self._email.name, ensure_set=False) - -    def _set_with_default(self) -> None: -        if not self.configuration.all_not_set: -            raise AppConfigError( -                "Config is not clean. " -                "Some config items are already set. " -                "Can't set again with default value.", -                self.configuration, -            ) -        for item in self.configuration: -            if item.can_generate_default: -                item.set_value(item.generate_default_value()) - -    def _to_config_file_content(self) -> str: -        content = "".join( -            [ -                f"{item.name}={item.value_str if item.is_set else ''}\n" -                for item in self.configuration -            ] -        ) -        return content - -    def _create_init_config_file(self) -> None: -        if self.config_file_path.check_self(): -            raise AppConfigError( -                "Config file already exists.", -                self.configuration, -                user_message=f"The config file at " -                f"{self.config_file_path.full_path_str} already exists.", -            ) -        self._set_with_default() -        self.config_file_path.ensure() -        with open( -            self.config_file_path.full_path, "w", encoding="utf-8", newline="\n" -        ) as file: -            file.write(self._to_config_file_content()) - -    def _parse_config_file(self) -> SimpleLineConfigParser.Result: -        if not self.config_file_path.check_self(): -            raise AppConfigFileNotFoundError( -                "Config file not found.", -                self.configuration, -                self.config_file_path.full_path_str, -                user_message=f"The config file at " -                f"{self.config_file_path.full_path_str} does not exist. " -                f"You can create an initial one with 'init' command.", -            ) - -        text = self.config_file_path.full_path.read_text() -        try: -            parser = SimpleLineConfigParser() -            return parser.parse(text) -        except ParseError as e: -            raise AppConfigFileParseError( -                "Failed to parse config file.", self.configuration, text -            ) from e - -    def _parse_and_print_config_file(self) -> None: -        parse_result = self._parse_config_file() -        for entry in parse_result: -            print(f"{entry.key}={entry.value}") - -    def _check_duplicate( -        self, -        parse_result: dict[str, list[SimpleLineConfigParser.Entry]], -    ) -> dict[str, SimpleLineConfigParser.Entry]: -        entry_dict: dict[str, SimpleLineConfigParser.Entry] = {} -        duplicate_entries: list[SimpleLineConfigParser.Entry] = [] -        for key, entries in parse_result.items(): -            entry_dict[key] = entries[0] -            if len(entries) > 1: -                duplicate_entries.extend(entries) -        if len(duplicate_entries) > 0: -            raise AppConfigDuplicateEntryError( -                "Duplicate entries found.", self.configuration, duplicate_entries -            ) - -        return entry_dict - -    def _check_type( -        self, entry_dict: dict[str, SimpleLineConfigParser.Entry] -    ) -> dict[str, Any]: -        value_dict: dict[str, Any] = {} -        error_entries: list[SimpleLineConfigParser.Entry] = [] -        errors: list[CruValueTypeError] = [] -        for key, entry in entry_dict.items(): -            try: -                if entry.value == "": -                    value_dict[key] = None -                else: -                    value = entry.value -                    config_item = self.configuration.get_or(key) -                    if config_item is not CruNotFound.VALUE: -                        value = config_item.value_type.convert_str_to_value(value) -                    value_dict[key] = value -            except CruValueTypeError as e: -                error_entries.append(entry) -                errors.append(e) -        if len(error_entries) > 0: -            raise AppConfigEntryValueFormatError( -                "Entry value format is not correct.", -                self.configuration, -                error_entries, -            ) from ExceptionGroup("Multiple format errors occurred.", errors) -        return value_dict - -    def _read_config_file(self) -> dict[str, Any]: -        parsed = self._parse_config_file() -        entry_groups = parsed.cru_iter().group_by(lambda e: e.key) -        entry_dict = self._check_duplicate(entry_groups) -        value_dict = self._check_type(entry_dict) -        return value_dict - -    def _real_load_config_file(self) -> None: -        self.configuration.reset_all() -        value_dict = self._read_config_file() -        for key, value in value_dict.items(): -            if value is None: -                continue -            self.configuration.set_config_item(key, value) - -    def load_config_file(self, force=False) -> None: -        if force or not self._loaded: -            self._real_load_config_file() -            self._loaded = True - -    def _print_app_config_info(self): -        for item in self.configuration: -            print(item.description_str) - -    def get_command_info(self): -        return "config", "Manage configuration." - -    def setup_arg_parser(self, arg_parser) -> None: -        subparsers = arg_parser.add_subparsers( -            dest="config_command", required=True, metavar="CONFIG_COMMAND" -        ) -        _init_parser = subparsers.add_parser( -            "init", help="create an initial config file" -        ) -        _print_app_parser = subparsers.add_parser( -            "print-app", -            help="print information of the config items defined by app", -        ) -        _print_parser = subparsers.add_parser("print", help="print current config") -        _check_config_parser = subparsers.add_parser( -            "check", -            help="check the validity of the config file", -        ) -        _check_config_parser.add_argument( -            "-f", -            "--format-only", -            action="store_true", -            help="only check content format, not app config item requirements.", -        ) - -    def run_command(self, args) -> None: -        if args.config_command == "init": -            self._create_init_config_file() -        elif args.config_command == "print-app": -            self._print_app_config_info() -        elif args.config_command == "print": -            self._parse_and_print_config_file() -        elif args.config_command == "check": -            if args.format_only: -                self._parse_config_file() -            else: -                self._read_config_file() diff --git a/tools/cru-py/cru/service/_external.py b/tools/cru-py/cru/service/_external.py deleted file mode 100644 index 2347e95..0000000 --- a/tools/cru-py/cru/service/_external.py +++ /dev/null @@ -1,81 +0,0 @@ -from ._base import AppCommandFeatureProvider -from ._nginx import NginxManager - - -class CliToolCommandProvider(AppCommandFeatureProvider): -    def __init__(self) -> None: -        super().__init__("cli-tool-command-provider") - -    def setup(self): -        pass - -    def get_command_info(self): -        return ("gen-cli", "Get commands of running external cli tools.") - -    def setup_arg_parser(self, arg_parser): -        subparsers = arg_parser.add_subparsers( -            dest="gen_cli_command", required=True, metavar="GEN_CLI_COMMAND" -        ) -        certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") -        certbot_parser.add_argument( -            "-t", "--test", action="store_true", help="run certbot in test mode" -        ) -        _install_docker_parser = subparsers.add_parser( -            "install-docker", help="print docker installation commands" -        ) -        _update_blog_parser = subparsers.add_parser( -            "update-blog", help="print blog update command" -        ) - -    def _print_install_docker_commands(self) -> None: -        output = """ -### COMMAND: uninstall apt docker -for pkg in docker.io docker-doc docker-compose \ -podman-docker containerd runc; \ -do sudo apt-get remove $pkg; done - -### COMMAND: prepare apt certs -sudo apt-get update -sudo apt-get install ca-certificates curl -sudo install -m 0755 -d /etc/apt/keyrings - -### COMMAND: install certs -sudo curl -fsSL https://download.docker.com/linux/debian/gpg \ --o /etc/apt/keyrings/docker.asc -sudo chmod a+r /etc/apt/keyrings/docker.asc - -### COMMAND: add docker apt source -echo \\ -  "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] \ -https://download.docker.com/linux/debian \\ -  $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \\ -  sudo tee /etc/apt/sources.list.d/docker.list > /dev/null - -### COMMAND: update apt and install docker -sudo apt-get update -sudo apt-get install docker-ce docker-ce-cli containerd.io \ -docker-buildx-plugin docker-compose-plugin - -### COMMAND: setup system for docker -sudo systemctl enable docker -sudo systemctl start docker -sudo groupadd -f docker -sudo usermod -aG docker $USER -# Remember to log out and log back in for the group changes to take effect -""".strip() -        print(output) - -    def _print_update_blog_command(self): -        output = """ -### COMMAND: update blog -docker exec -it blog /scripts/update.bash -""".strip() -        print(output) - -    def run_command(self, args): -        if args.gen_cli_command == "certbot": -            self.app.get_feature(NginxManager).print_all_certbot_commands(args.test) -        elif args.gen_cli_command == "install-docker": -            self._print_install_docker_commands() -        elif args.gen_cli_command == "update-blog": -            self._print_update_blog_command()
\ No newline at end of file diff --git a/tools/cru-py/cru/service/_nginx.py b/tools/cru-py/cru/service/_nginx.py deleted file mode 100644 index 6c77971..0000000 --- a/tools/cru-py/cru/service/_nginx.py +++ /dev/null @@ -1,268 +0,0 @@ -from argparse import Namespace -from enum import Enum, auto -import re -import subprocess -from typing import TypeAlias - -from cru import CruInternalError - -from ._base import AppCommandFeatureProvider -from ._config import ConfigManager -from ._template import TemplateManager - - -class CertbotAction(Enum): -    CREATE = auto() -    EXPAND = auto() -    SHRINK = auto() -    RENEW = auto() - - -class NginxManager(AppCommandFeatureProvider): -    CertbotAction: TypeAlias = CertbotAction - -    def __init__(self) -> None: -        super().__init__("nginx-manager") -        self._domains_cache: list[str] | None = None - -    def setup(self) -> None: -        pass - -    @property -    def _config_manager(self) -> ConfigManager: -        return self.app.get_feature(ConfigManager) - -    @property -    def root_domain(self) -> str: -        return self._config_manager.get_domain_value_str() - -    @property -    def domains(self) -> list[str]: -        if self._domains_cache is None: -            self._domains_cache = self._get_domains() -        return self._domains_cache - -    @property -    def subdomains(self) -> list[str]: -        suffix = "." + self.root_domain -        return [d[: -len(suffix)] for d in self.domains if d.endswith(suffix)] - -    @property -    def _domain_config_name(self) -> str: -        return self._config_manager.domain_item_name - -    def _get_domains_from_text(self, text: str) -> set[str]: -        domains: set[str] = set() -        regex = re.compile(r"server_name\s+(\S+)\s*;") -        for match in regex.finditer(text): -            domains.add(match[1]) -        return domains - -    def _join_generated_nginx_conf_text(self) -> str: -        text = "" -        template_manager = self.app.get_feature(TemplateManager) -        for nginx_conf in template_manager.generate(): -            text += nginx_conf[1] -        return text - -    def _get_domains(self) -> list[str]: -        text = self._join_generated_nginx_conf_text() -        domains = list(self._get_domains_from_text(text)) -        domains.remove(self.root_domain) -        return [self.root_domain, *domains] - -    def _print_domains(self) -> None: -        for domain in self.domains: -            print(domain) - -    def _certbot_command( -        self, -        action: CertbotAction | str, -        test: bool, -        *, -        docker=True, -        standalone=None, -        email=None, -        agree_tos=True, -    ) -> str: -        if isinstance(action, str): -            action = CertbotAction[action.upper()] - -        command_args = [] - -        add_domain_option = True -        if action is CertbotAction.CREATE: -            if standalone is None: -                standalone = True -            command_action = "certonly" -        elif action in [CertbotAction.EXPAND, CertbotAction.SHRINK]: -            if standalone is None: -                standalone = False -            command_action = "certonly" -        elif action is CertbotAction.RENEW: -            if standalone is None: -                standalone = False -            add_domain_option = False -            command_action = "renew" -        else: -            raise CruInternalError("Invalid certbot action.") - -        data_dir = self.app.data_dir.full_path.as_posix() - -        if not docker: -            command_args.append("certbot") -        else: -            command_args.extend( -                [ -                    "docker run -it --rm --name certbot", -                    f'-v "{data_dir}/certbot/certs:/etc/letsencrypt"', -                    f'-v "{data_dir}/certbot/data:/var/lib/letsencrypt"', -                ] -            ) -            if standalone: -                command_args.append('-p "0.0.0.0:80:80"') -            else: -                command_args.append(f'-v "{data_dir}/certbot/webroot:/var/www/certbot"') - -            command_args.append("certbot/certbot") - -        command_args.append(command_action) - -        command_args.append(f"--cert-name {self.root_domain}") - -        if standalone: -            command_args.append("--standalone") -        else: -            command_args.append("--webroot -w /var/www/certbot") - -        if add_domain_option: -            command_args.append(" ".join([f"-d {domain}" for domain in self.domains])) - -        if email is not None: -            command_args.append(f"--email {email}") - -        if agree_tos: -            command_args.append("--agree-tos") - -        if test: -            command_args.append("--test-cert --dry-run") - -        return " ".join(command_args) - -    def print_all_certbot_commands(self, test: bool): -        print("### COMMAND: (standalone) create certs") -        print( -            self._certbot_command( -                CertbotAction.CREATE, -                test, -                email=self._config_manager.get_email_value_str_optional(), -            ) -        ) -        print() -        print("### COMMAND: (webroot+nginx) expand or shrink certs") -        print( -            self._certbot_command( -                CertbotAction.EXPAND, -                test, -                email=self._config_manager.get_email_value_str_optional(), -            ) -        ) -        print() -        print("### COMMAND: (webroot+nginx) renew certs") -        print( -            self._certbot_command( -                CertbotAction.RENEW, -                test, -                email=self._config_manager.get_email_value_str_optional(), -            ) -        ) - -    @property -    def _cert_path_str(self) -> str: -        return str( -            self.app.data_dir.full_path -            / "certbot/certs/live" -            / self.root_domain -            / "fullchain.pem" -        ) - -    def get_command_info(self): -        return "nginx", "Manage nginx related things." - -    def setup_arg_parser(self, arg_parser): -        subparsers = arg_parser.add_subparsers( -            dest="nginx_command", required=True, metavar="NGINX_COMMAND" -        ) -        _list_parser = subparsers.add_parser("list", help="list domains") -        certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") -        certbot_parser.add_argument( -            "--no-test", -            action="store_true", -            help="remove args making certbot run in test mode", -        ) - -    def run_command(self, args: Namespace) -> None: -        if args.nginx_command == "list": -            self._print_domains() -        elif args.nginx_command == "certbot": -            self.print_all_certbot_commands(not args.no_test) - -    def _generate_dns_zone( -        self, -        ip: str, -        /, -        ttl: str | int = 600, -        *, -        enable_mail: bool = True, -        dkim: str | None = None, -    ) -> str: -        # TODO: Not complete and test now. -        root_domain = self.root_domain -        result = f"$ORIGIN {root_domain}.\n\n" -        result += "; A records\n" -        result += f"@ {ttl} IN A {ip}\n" -        for subdomain in self.subdomains: -            result += f"{subdomain} {ttl} IN A {ip}\n" - -        if enable_mail: -            result += "\n; MX records\n" -            result += f"@ {ttl} IN MX 10 mail.{root_domain}.\n" -            result += "\n; SPF record\n" -            result += f'@ {ttl} IN TXT "v=spf1 mx ~all"\n' -            if dkim is not None: -                result += "\n; DKIM record\n" -                result += f'mail._domainkey {ttl} IN TEXT "{dkim}"' -                result += "\n; DMARC record\n" -                dmarc_options = [ -                    "v=DMARC1", -                    "p=none", -                    f"rua=mailto:dmarc.report@{root_domain}", -                    f"ruf=mailto:dmarc.report@{root_domain}", -                    "sp=none", -                    "ri=86400", -                ] -                result += f'_dmarc {ttl} IN TXT "{"; ".join(dmarc_options)}"\n' -        return result - -    def _get_dkim_from_mailserver(self) -> str | None: -        # TODO: Not complete and test now. -        dkim_path = ( -            self.app.data_dir.full_path -            / "dms/config/opendkim/keys" -            / self.root_domain -            / "mail.txt" -        ) -        if not dkim_path.exists(): -            return None - -        p = subprocess.run(["sudo", "cat", dkim_path], capture_output=True, check=True) -        value = "" -        for match in re.finditer('"(.*)"', p.stdout.decode("utf-8")): -            value += match.group(1) -        return value - -    def _generate_dns_zone_with_dkim(self, ip: str, /, ttl: str | int = 600) -> str: -        # TODO: Not complete and test now. -        return self._generate_dns_zone( -            ip, ttl, enable_mail=True, dkim=self._get_dkim_from_mailserver() -        ) diff --git a/tools/cru-py/cru/service/_template.py b/tools/cru-py/cru/service/_template.py deleted file mode 100644 index 1381700..0000000 --- a/tools/cru-py/cru/service/_template.py +++ /dev/null @@ -1,90 +0,0 @@ -from argparse import Namespace -from pathlib import Path -import shutil - -from cru.template import TemplateTree, CruStrWrapperTemplate - -from ._base import AppCommandFeatureProvider, AppFeaturePath -from ._config import ConfigManager - - -class TemplateManager(AppCommandFeatureProvider): -    def __init__(self, prefix: str | None = None): -        super().__init__("template-manager") -        self._prefix = prefix or self.app.app_id.upper() - -    def setup(self) -> None: -        self._templates_dir = self.app.add_path("templates", True) -        self._generated_dir = self.app.add_path("generated", True) -        self._template_tree: TemplateTree[CruStrWrapperTemplate] | None = None - -    @property -    def prefix(self) -> str: -        return self._prefix - -    @property -    def templates_dir(self) -> AppFeaturePath: -        return self._templates_dir - -    @property -    def generated_dir(self) -> AppFeaturePath: -        return self._generated_dir - -    @property -    def template_tree(self) -> TemplateTree[CruStrWrapperTemplate]: -        if self._template_tree is None: -            return self.reload() -        return self._template_tree - -    def reload(self) -> TemplateTree: -        self._template_tree = TemplateTree( -            lambda text: CruStrWrapperTemplate(text), self.templates_dir.full_path_str -        ) -        return self._template_tree - -    def _print_file_lists(self) -> None: -        for path, template in self.template_tree.templates: -            print(f"[{template.variable_count}]", path.as_posix()) - -    def generate(self) -> list[tuple[Path, str]]: -        config_manager = self.app.get_feature(ConfigManager) -        return self.template_tree.generate(config_manager.get_str_dict()) - -    def _generate_files(self, dry_run: bool) -> None: -        config_manager = self.app.get_feature(ConfigManager) -        if not dry_run and self.generated_dir.full_path.exists(): -            shutil.rmtree(self.generated_dir.full_path) -        self.template_tree.generate_to( -            self.generated_dir.full_path_str, config_manager.get_str_dict(), dry_run -        ) - -    def get_command_info(self): -        return ("template", "Manage templates.") - -    def setup_arg_parser(self, arg_parser): -        subparsers = arg_parser.add_subparsers( -            dest="template_command", required=True, metavar="TEMPLATE_COMMAND" -        ) -        _list_parser = subparsers.add_parser("list", help="list templates") -        _variables_parser = subparsers.add_parser( -            "variables", help="list variables used in all templates" -        ) -        generate_parser = subparsers.add_parser("generate", help="generate templates") -        generate_parser.add_argument( -            "--no-dry-run", action="store_true", help="generate and write target files" -        ) - -    def run_command(self, args: Namespace) -> None: -        if args.template_command == "list": -            self._print_file_lists() -        elif args.template_command == "variables": -            for var in self.template_tree.variables: -                print(var) -        elif args.template_command == "generate": -            dry_run = not args.no_dry_run -            self._generate_files(dry_run) -            if dry_run: -                print("Dry run successfully.") -                print( -                    f"Will delete dir {self.generated_dir.full_path_str} if it exists." -                )  | 
