aboutsummaryrefslogtreecommitdiff
path: root/tools/cru-py/cru/service
diff options
context:
space:
mode:
authorYuqian Yang <crupest@crupest.life>2025-02-22 18:11:35 +0800
committerYuqian Yang <crupest@crupest.life>2025-02-23 01:36:11 +0800
commit1e9b2436eaffa4130f6a69c3a108f6feb9dd4ac8 (patch)
tree585b6124b0100371b4bd8a291c4a59fbb5fbf1fe /tools/cru-py/cru/service
parenta931457d61b053682d5e89a0cfb411e43e5e21c7 (diff)
downloadcrupest-1e9b2436eaffa4130f6a69c3a108f6feb9dd4ac8.tar.gz
crupest-1e9b2436eaffa4130f6a69c3a108f6feb9dd4ac8.tar.bz2
crupest-1e9b2436eaffa4130f6a69c3a108f6feb9dd4ac8.zip
feat(services): refactor structure.
Diffstat (limited to 'tools/cru-py/cru/service')
-rw-r--r--tools/cru-py/cru/service/__init__.py0
-rw-r--r--tools/cru-py/cru/service/__main__.py20
-rw-r--r--tools/cru-py/cru/service/_app.py34
-rw-r--r--tools/cru-py/cru/service/_base.py449
-rw-r--r--tools/cru-py/cru/service/_config.py444
-rw-r--r--tools/cru-py/cru/service/_external.py81
-rw-r--r--tools/cru-py/cru/service/_nginx.py268
-rw-r--r--tools/cru-py/cru/service/_template.py90
8 files changed, 0 insertions, 1386 deletions
diff --git a/tools/cru-py/cru/service/__init__.py b/tools/cru-py/cru/service/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/tools/cru-py/cru/service/__init__.py
+++ /dev/null
diff --git a/tools/cru-py/cru/service/__main__.py b/tools/cru-py/cru/service/__main__.py
deleted file mode 100644
index 1c10e82..0000000
--- a/tools/cru-py/cru/service/__main__.py
+++ /dev/null
@@ -1,20 +0,0 @@
-from cru import CruException
-
-from ._app import create_app
-
-
-def main():
- app = create_app()
- app.run_command()
-
-
-if __name__ == "__main__":
- try:
- main()
- except CruException as e:
- user_message = e.get_user_message()
- if user_message is not None:
- print(f"Error: {user_message}")
- exit(1)
- else:
- raise
diff --git a/tools/cru-py/cru/service/_app.py b/tools/cru-py/cru/service/_app.py
deleted file mode 100644
index 6030dad..0000000
--- a/tools/cru-py/cru/service/_app.py
+++ /dev/null
@@ -1,34 +0,0 @@
-from ._base import (
- AppBase,
- CommandDispatcher,
- AppInitializer,
- PathCommandProvider,
-)
-from ._config import ConfigManager
-from ._template import TemplateManager
-from ._nginx import NginxManager
-from ._external import CliToolCommandProvider
-
-APP_ID = "crupest"
-
-
-class App(AppBase):
- def __init__(self):
- super().__init__(APP_ID, f"{APP_ID}-service")
- self.add_feature(PathCommandProvider())
- self.add_feature(AppInitializer())
- self.add_feature(ConfigManager())
- self.add_feature(TemplateManager())
- self.add_feature(NginxManager())
- self.add_feature(CliToolCommandProvider())
- self.add_feature(CommandDispatcher())
-
- def run_command(self):
- command_dispatcher = self.get_feature(CommandDispatcher)
- command_dispatcher.run_command()
-
-
-def create_app() -> App:
- app = App()
- app.setup()
- return app
diff --git a/tools/cru-py/cru/service/_base.py b/tools/cru-py/cru/service/_base.py
deleted file mode 100644
index ad813c9..0000000
--- a/tools/cru-py/cru/service/_base.py
+++ /dev/null
@@ -1,449 +0,0 @@
-from __future__ import annotations
-
-from argparse import ArgumentParser, Namespace
-from abc import ABC, abstractmethod
-import argparse
-import os
-from pathlib import Path
-from typing import TypeVar, overload
-
-from cru import CruException, CruLogicError
-
-_Feature = TypeVar("_Feature", bound="AppFeatureProvider")
-
-
-class AppError(CruException):
- pass
-
-
-class AppFeatureError(AppError):
- def __init__(self, message, feature: type | str, *args, **kwargs):
- super().__init__(message, *args, **kwargs)
- self._feature = feature
-
- @property
- def feature(self) -> type | str:
- return self._feature
-
-
-class AppPathError(CruException):
- def __init__(self, message, _path: str | Path, *args, **kwargs):
- super().__init__(message, *args, **kwargs)
- self._path = str(_path)
-
- @property
- def path(self) -> str:
- return self._path
-
-
-class AppPath(ABC):
- def __init__(self, id: str, is_dir: bool, description: str) -> None:
- self._is_dir = is_dir
- self._id = id
- self._description = description
-
- @property
- @abstractmethod
- def parent(self) -> AppPath | None: ...
-
- @property
- @abstractmethod
- def app(self) -> AppBase: ...
-
- @property
- def id(self) -> str:
- return self._id
-
- @property
- def description(self) -> str:
- return self._description
-
- @property
- def is_dir(self) -> bool:
- return self._is_dir
-
- @property
- @abstractmethod
- def full_path(self) -> Path: ...
-
- @property
- def full_path_str(self) -> str:
- return str(self.full_path)
-
- def check_parents(self, must_exist: bool = False) -> bool:
- for p in reversed(self.full_path.parents):
- if not p.exists() and not must_exist:
- return False
- if not p.is_dir():
- raise AppPathError("Parents' path must be a dir.", self.full_path)
- return True
-
- def check_self(self, must_exist: bool = False) -> bool:
- if not self.check_parents(must_exist):
- return False
- if not self.full_path.exists():
- if not must_exist:
- return False
- raise AppPathError("Not exist.", self.full_path)
- if self.is_dir:
- if not self.full_path.is_dir():
- raise AppPathError("Should be a directory, but not.", self.full_path)
- else:
- return True
- else:
- if not self.full_path.is_file():
- raise AppPathError("Should be a file, but not.", self.full_path)
- else:
- return True
-
- def ensure(self, create_file: bool = False) -> None:
- e = self.check_self(False)
- if not e:
- os.makedirs(self.full_path.parent, exist_ok=True)
- if self.is_dir:
- os.mkdir(self.full_path)
- elif create_file:
- with open(self.full_path, "w") as f:
- f.write("")
-
- def add_subpath(
- self,
- name: str,
- is_dir: bool,
- /,
- id: str | None = None,
- description: str = "",
- ) -> AppFeaturePath:
- return self.app.add_path(name, is_dir, self, id, description)
-
- @property
- def app_relative_path(self) -> Path:
- return self.full_path.relative_to(self.app.root.full_path)
-
-
-class AppFeaturePath(AppPath):
- def __init__(
- self,
- parent: AppPath,
- name: str,
- is_dir: bool,
- /,
- id: str | None = None,
- description: str = "",
- ) -> None:
- super().__init__(id or name, is_dir, description)
- self._name = name
- self._parent = parent
-
- @property
- def name(self) -> str:
- return self._name
-
- @property
- def parent(self) -> AppPath:
- return self._parent
-
- @property
- def app(self) -> AppBase:
- return self.parent.app
-
- @property
- def full_path(self) -> Path:
- return Path(self.parent.full_path, self.name).resolve()
-
-
-class AppRootPath(AppPath):
- def __init__(self, app: AppBase):
- super().__init__("root", True, "Application root path.")
- self._app = app
- self._full_path: Path | None = None
-
- @property
- def parent(self) -> None:
- return None
-
- @property
- def app(self) -> AppBase:
- return self._app
-
- @property
- def full_path(self) -> Path:
- if self._full_path is None:
- raise AppError("App root path is not set yet.")
- return self._full_path
-
- def setup(self, path: os.PathLike) -> None:
- if self._full_path is not None:
- raise AppError("App root path is already set.")
- self._full_path = Path(path).resolve()
-
-
-class AppFeatureProvider(ABC):
- def __init__(self, name: str, /, app: AppBase | None = None):
- super().__init__()
- self._name = name
- self._app = app if app else AppBase.get_instance()
-
- @property
- def app(self) -> AppBase:
- return self._app
-
- @property
- def name(self) -> str:
- return self._name
-
- @abstractmethod
- def setup(self) -> None: ...
-
-
-class AppCommandFeatureProvider(AppFeatureProvider):
- @abstractmethod
- def get_command_info(self) -> tuple[str, str]: ...
-
- @abstractmethod
- def setup_arg_parser(self, arg_parser: ArgumentParser): ...
-
- @abstractmethod
- def run_command(self, args: Namespace) -> None: ...
-
-
-DATA_DIR_NAME = "data"
-
-
-class PathCommandProvider(AppCommandFeatureProvider):
- def __init__(self) -> None:
- super().__init__("path-command-provider")
-
- def setup(self):
- pass
-
- def get_command_info(self):
- return ("path", "Get information about paths used by app.")
-
- def setup_arg_parser(self, arg_parser: ArgumentParser) -> None:
- subparsers = arg_parser.add_subparsers(
- dest="path_command", required=True, metavar="PATH_COMMAND"
- )
- _list_parser = subparsers.add_parser(
- "list", help="list special paths used by app"
- )
-
- def run_command(self, args: Namespace) -> None:
- if args.path_command == "list":
- for path in self.app.paths:
- print(f"{path.app_relative_path.as_posix()}: {path.description}")
-
-
-class CommandDispatcher(AppFeatureProvider):
- def __init__(self) -> None:
- super().__init__("command-dispatcher")
- self._parsed_args: argparse.Namespace | None = None
-
- def setup_arg_parser(self) -> None:
- epilog = """
-==> to start,
-./tools/manage init
-./tools/manage config init
-ln -s generated/docker-compose.yaml .
-# Then edit config file.
-
-==> to update
-git pull
-./tools/manage template generate --no-dry-run
-docker compose up
- """.strip()
-
- self._map: dict[str, AppCommandFeatureProvider] = {}
- arg_parser = argparse.ArgumentParser(
- description="Service management",
- formatter_class=argparse.RawDescriptionHelpFormatter,
- epilog=epilog,
- )
- arg_parser.add_argument(
- "--project-dir",
- help="The path of the project directory.",
- required=True,
- type=str,
- )
- subparsers = arg_parser.add_subparsers(
- dest="command",
- help="The management command to execute.",
- metavar="COMMAND",
- )
- for feature in self.app.features:
- if isinstance(feature, AppCommandFeatureProvider):
- info = feature.get_command_info()
- command_subparser = subparsers.add_parser(info[0], help=info[1])
- feature.setup_arg_parser(command_subparser)
- self._map[info[0]] = feature
- self._arg_parser = arg_parser
-
- def setup(self):
- pass
-
- @property
- def arg_parser(self) -> argparse.ArgumentParser:
- return self._arg_parser
-
- @property
- def map(self) -> dict[str, AppCommandFeatureProvider]:
- return self._map
-
- def get_program_parsed_args(self) -> argparse.Namespace:
- if self._parsed_args is None:
- self._parsed_args = self.arg_parser.parse_args()
- return self._parsed_args
-
- def run_command(self, args: argparse.Namespace | None = None) -> None:
- real_args = args or self.get_program_parsed_args()
- if real_args.command is None:
- self.arg_parser.print_help()
- return
- self.map[real_args.command].run_command(real_args)
-
-
-class AppInitializer(AppCommandFeatureProvider):
- def __init__(self) -> None:
- super().__init__("app-initializer")
-
- def _init_app(self) -> bool:
- if self.app.app_initialized:
- return False
- self.app.data_dir.ensure()
- return True
-
- def setup(self):
- pass
-
- def get_command_info(self):
- return ("init", "Initialize the app.")
-
- def setup_arg_parser(self, arg_parser):
- pass
-
- def run_command(self, args):
- init = self._init_app()
- if init:
- print("App initialized successfully.")
- else:
- print("App is already initialized. Do nothing.")
-
-
-class AppBase:
- _instance: AppBase | None = None
-
- @staticmethod
- def get_instance() -> AppBase:
- if AppBase._instance is None:
- raise AppError("App instance not initialized")
- return AppBase._instance
-
- def __init__(self, app_id: str, name: str):
- AppBase._instance = self
- self._app_id = app_id
- self._name = name
- self._root = AppRootPath(self)
- self._paths: list[AppFeaturePath] = []
- self._features: list[AppFeatureProvider] = []
-
- def setup(self) -> None:
- command_dispatcher = self.get_feature(CommandDispatcher)
- command_dispatcher.setup_arg_parser()
- program_args = command_dispatcher.get_program_parsed_args()
- self.setup_root(program_args.project_dir)
- self._data_dir = self.add_path(DATA_DIR_NAME, True, id="data")
- for feature in self.features:
- feature.setup()
- for path in self.paths:
- path.check_self()
-
- @property
- def app_id(self) -> str:
- return self._app_id
-
- @property
- def name(self) -> str:
- return self._name
-
- @property
- def root(self) -> AppRootPath:
- return self._root
-
- def setup_root(self, path: os.PathLike) -> None:
- self._root.setup(path)
-
- @property
- def data_dir(self) -> AppFeaturePath:
- return self._data_dir
-
- @property
- def app_initialized(self) -> bool:
- return self.data_dir.check_self()
-
- def ensure_app_initialized(self) -> AppRootPath:
- if not self.app_initialized:
- raise AppError(
- user_message="Root directory does not exist. "
- "Please run 'init' to create one."
- )
- return self.root
-
- @property
- def features(self) -> list[AppFeatureProvider]:
- return self._features
-
- @property
- def paths(self) -> list[AppFeaturePath]:
- return self._paths
-
- def add_feature(self, feature: _Feature) -> _Feature:
- for f in self.features:
- if f.name == feature.name:
- raise AppFeatureError(
- f"Duplicate feature name: {feature.name}.", feature.name
- )
- self._features.append(feature)
- return feature
-
- def add_path(
- self,
- name: str,
- is_dir: bool,
- /,
- parent: AppPath | None = None,
- id: str | None = None,
- description: str = "",
- ) -> AppFeaturePath:
- p = AppFeaturePath(
- parent or self.root, name, is_dir, id=id, description=description
- )
- self._paths.append(p)
- return p
-
- @overload
- def get_feature(self, feature: str) -> AppFeatureProvider: ...
-
- @overload
- def get_feature(self, feature: type[_Feature]) -> _Feature: ...
-
- def get_feature(
- self, feature: str | type[_Feature]
- ) -> AppFeatureProvider | _Feature:
- if isinstance(feature, str):
- for f in self._features:
- if f.name == feature:
- return f
- elif isinstance(feature, type):
- for f in self._features:
- if isinstance(f, feature):
- return f
- else:
- raise CruLogicError("Argument must be the name of feature or its class.")
-
- raise AppFeatureError(f"Feature {feature} not found.", feature)
-
- def get_path(self, name: str) -> AppFeaturePath:
- for p in self._paths:
- if p.id == name or p.name == name:
- return p
- raise AppPathError(f"Application path {name} not found.", name)
diff --git a/tools/cru-py/cru/service/_config.py b/tools/cru-py/cru/service/_config.py
deleted file mode 100644
index cbb9533..0000000
--- a/tools/cru-py/cru/service/_config.py
+++ /dev/null
@@ -1,444 +0,0 @@
-from collections.abc import Iterable
-from typing import Any, Literal, overload
-
-from cru import CruException, CruNotFound
-from cru.config import Configuration, ConfigItem
-from cru.value import (
- INTEGER_VALUE_TYPE,
- TEXT_VALUE_TYPE,
- CruValueTypeError,
- RandomStringValueGenerator,
- UuidValueGenerator,
-)
-from cru.parsing import ParseError, SimpleLineConfigParser
-
-from ._base import AppFeaturePath, AppCommandFeatureProvider
-
-
-class AppConfigError(CruException):
- def __init__(
- self, message: str, configuration: Configuration, *args, **kwargs
- ) -> None:
- super().__init__(message, *args, **kwargs)
- self._configuration = configuration
-
- @property
- def configuration(self) -> Configuration:
- return self._configuration
-
-
-class AppConfigFileError(AppConfigError):
- def __init__(
- self,
- message: str,
- configuration: Configuration,
- *args,
- **kwargs,
- ) -> None:
- super().__init__(message, configuration, *args, **kwargs)
-
-
-class AppConfigFileNotFoundError(AppConfigFileError):
- def __init__(
- self,
- message: str,
- configuration: Configuration,
- file_path: str,
- *args,
- **kwargs,
- ) -> None:
- super().__init__(message, configuration, *args, **kwargs)
- self._file_path = file_path
-
- @property
- def file_path(self) -> str:
- return self._file_path
-
-
-class AppConfigFileParseError(AppConfigFileError):
- def __init__(
- self,
- message: str,
- configuration: Configuration,
- file_content: str,
- *args,
- **kwargs,
- ) -> None:
- super().__init__(message, configuration, *args, **kwargs)
- self._file_content = file_content
- self.__cause__: ParseError
-
- @property
- def file_content(self) -> str:
- return self._file_content
-
- def get_user_message(self) -> str:
- return f"Error while parsing config file at line {self.__cause__.line_number}."
-
-
-class AppConfigFileEntryError(AppConfigFileError):
- def __init__(
- self,
- message: str,
- configuration: Configuration,
- entries: Iterable[SimpleLineConfigParser.Entry],
- *args,
- **kwargs,
- ) -> None:
- super().__init__(message, configuration, *args, **kwargs)
- self._entries = list(entries)
-
- @property
- def error_entries(self) -> list[SimpleLineConfigParser.Entry]:
- return self._entries
-
- @staticmethod
- def entries_to_friendly_message(
- entries: Iterable[SimpleLineConfigParser.Entry],
- ) -> str:
- return "\n".join(
- f"line {entry.line_number}: {entry.key}={entry.value}" for entry in entries
- )
-
- @property
- def friendly_message_head(self) -> str:
- return "Error entries found in config file"
-
- def get_user_message(self) -> str:
- return (
- f"{self.friendly_message_head}:\n"
- f"{self.entries_to_friendly_message(self.error_entries)}"
- )
-
-
-class AppConfigDuplicateEntryError(AppConfigFileEntryError):
- @property
- def friendly_message_head(self) -> str:
- return "Duplicate entries found in config file"
-
-
-class AppConfigEntryValueFormatError(AppConfigFileEntryError):
- @property
- def friendly_message_head(self) -> str:
- return "Invalid value format for entries"
-
-
-class AppConfigItemNotSetError(AppConfigError):
- def __init__(
- self,
- message: str,
- configuration: Configuration,
- items: list[ConfigItem],
- *args,
- **kwargs,
- ) -> None:
- super().__init__(message, configuration, *args, **kwargs)
- self._items = items
-
-
-class ConfigManager(AppCommandFeatureProvider):
- def __init__(self) -> None:
- super().__init__("config-manager")
- configuration = Configuration()
- self._configuration = configuration
- self._loaded: bool = False
- self._init_app_defined_items()
-
- def _init_app_defined_items(self) -> None:
- prefix = self.config_name_prefix
-
- def _add_text(name: str, description: str) -> ConfigItem:
- item = ConfigItem(f"{prefix}_{name}", description, TEXT_VALUE_TYPE)
- self.configuration.add(item)
- return item
-
- def _add_uuid(name: str, description: str) -> ConfigItem:
- item = ConfigItem(
- f"{prefix}_{name}",
- description,
- TEXT_VALUE_TYPE,
- default=UuidValueGenerator(),
- )
- self.configuration.add(item)
- return item
-
- def _add_random_string(
- name: str, description: str, length: int = 32, secure: bool = True
- ) -> ConfigItem:
- item = ConfigItem(
- f"{prefix}_{name}",
- description,
- TEXT_VALUE_TYPE,
- default=RandomStringValueGenerator(length, secure),
- )
- self.configuration.add(item)
- return item
-
- def _add_int(name: str, description: str) -> ConfigItem:
- item = ConfigItem(f"{prefix}_{name}", description, INTEGER_VALUE_TYPE)
- self.configuration.add(item)
- return item
-
- self._domain = _add_text("DOMAIN", "domain name")
- self._email = _add_text("EMAIL", "admin email address")
- _add_text(
- "AUTO_BACKUP_COS_SECRET_ID",
- "access key id for Tencent COS, used for auto backup",
- )
- _add_text(
- "AUTO_BACKUP_COS_SECRET_KEY",
- "access key secret for Tencent COS, used for auto backup",
- )
- _add_text(
- "AUTO_BACKUP_COS_ENDPOINT",
- "endpoint (cos.*.myqcloud.com) for Tencent COS, used for auto backup",
- )
- _add_text(
- "AUTO_BACKUP_COS_BUCKET",
- "bucket name for Tencent COS, used for auto backup",
- )
- _add_uuid("V2RAY_TOKEN", "v2ray user id")
- _add_uuid("V2RAY_PATH", "v2ray path, which will be prefixed by _")
- _add_random_string("2FAUTH_APP_KEY", "2FAuth App Key")
- _add_text("2FAUTH_MAIL_USERNAME", "2FAuth SMTP user")
- _add_text("2FAUTH_MAIL_PASSWORD", "2FAuth SMTP password")
- _add_text("GIT_SERVER_USERNAME", "Git server username")
- _add_text("GIT_SERVER_PASSWORD", "Git server password")
-
- def setup(self) -> None:
- self._config_file_path = self.app.data_dir.add_subpath(
- "config", False, description="Configuration file path."
- )
-
- @property
- def config_name_prefix(self) -> str:
- return self.app.app_id.upper()
-
- @property
- def configuration(self) -> Configuration:
- return self._configuration
-
- @property
- def config_file_path(self) -> AppFeaturePath:
- return self._config_file_path
-
- @property
- def all_set(self) -> bool:
- return self.configuration.all_set
-
- def get_item(self, name: str) -> ConfigItem[Any]:
- if not name.startswith(self.config_name_prefix + "_"):
- name = f"{self.config_name_prefix}_{name}"
-
- item = self.configuration.get_or(name, None)
- if item is None:
- raise AppConfigError(f"Config item '{name}' not found.", self.configuration)
- return item
-
- @overload
- def get_item_value_str(self, name: str) -> str: ...
-
- @overload
- def get_item_value_str(self, name: str, ensure_set: Literal[True]) -> str: ...
-
- @overload
- def get_item_value_str(self, name: str, ensure_set: bool = True) -> str | None: ...
-
- def get_item_value_str(self, name: str, ensure_set: bool = True) -> str | None:
- self.load_config_file()
- item = self.get_item(name)
- if not item.is_set:
- if ensure_set:
- raise AppConfigItemNotSetError(
- f"Config item '{name}' is not set.", self.configuration, [item]
- )
- return None
- return item.value_str
-
- def get_str_dict(self, ensure_all_set: bool = True) -> dict[str, str]:
- self.load_config_file()
- if ensure_all_set and not self.configuration.all_set:
- raise AppConfigItemNotSetError(
- "Some config items are not set.",
- self.configuration,
- self.configuration.get_unset_items(),
- )
- return self.configuration.to_str_dict()
-
- @property
- def domain_item_name(self) -> str:
- return self._domain.name
-
- def get_domain_value_str(self) -> str:
- return self.get_item_value_str(self._domain.name)
-
- def get_email_value_str_optional(self) -> str | None:
- return self.get_item_value_str(self._email.name, ensure_set=False)
-
- def _set_with_default(self) -> None:
- if not self.configuration.all_not_set:
- raise AppConfigError(
- "Config is not clean. "
- "Some config items are already set. "
- "Can't set again with default value.",
- self.configuration,
- )
- for item in self.configuration:
- if item.can_generate_default:
- item.set_value(item.generate_default_value())
-
- def _to_config_file_content(self) -> str:
- content = "".join(
- [
- f"{item.name}={item.value_str if item.is_set else ''}\n"
- for item in self.configuration
- ]
- )
- return content
-
- def _create_init_config_file(self) -> None:
- if self.config_file_path.check_self():
- raise AppConfigError(
- "Config file already exists.",
- self.configuration,
- user_message=f"The config file at "
- f"{self.config_file_path.full_path_str} already exists.",
- )
- self._set_with_default()
- self.config_file_path.ensure()
- with open(
- self.config_file_path.full_path, "w", encoding="utf-8", newline="\n"
- ) as file:
- file.write(self._to_config_file_content())
-
- def _parse_config_file(self) -> SimpleLineConfigParser.Result:
- if not self.config_file_path.check_self():
- raise AppConfigFileNotFoundError(
- "Config file not found.",
- self.configuration,
- self.config_file_path.full_path_str,
- user_message=f"The config file at "
- f"{self.config_file_path.full_path_str} does not exist. "
- f"You can create an initial one with 'init' command.",
- )
-
- text = self.config_file_path.full_path.read_text()
- try:
- parser = SimpleLineConfigParser()
- return parser.parse(text)
- except ParseError as e:
- raise AppConfigFileParseError(
- "Failed to parse config file.", self.configuration, text
- ) from e
-
- def _parse_and_print_config_file(self) -> None:
- parse_result = self._parse_config_file()
- for entry in parse_result:
- print(f"{entry.key}={entry.value}")
-
- def _check_duplicate(
- self,
- parse_result: dict[str, list[SimpleLineConfigParser.Entry]],
- ) -> dict[str, SimpleLineConfigParser.Entry]:
- entry_dict: dict[str, SimpleLineConfigParser.Entry] = {}
- duplicate_entries: list[SimpleLineConfigParser.Entry] = []
- for key, entries in parse_result.items():
- entry_dict[key] = entries[0]
- if len(entries) > 1:
- duplicate_entries.extend(entries)
- if len(duplicate_entries) > 0:
- raise AppConfigDuplicateEntryError(
- "Duplicate entries found.", self.configuration, duplicate_entries
- )
-
- return entry_dict
-
- def _check_type(
- self, entry_dict: dict[str, SimpleLineConfigParser.Entry]
- ) -> dict[str, Any]:
- value_dict: dict[str, Any] = {}
- error_entries: list[SimpleLineConfigParser.Entry] = []
- errors: list[CruValueTypeError] = []
- for key, entry in entry_dict.items():
- try:
- if entry.value == "":
- value_dict[key] = None
- else:
- value = entry.value
- config_item = self.configuration.get_or(key)
- if config_item is not CruNotFound.VALUE:
- value = config_item.value_type.convert_str_to_value(value)
- value_dict[key] = value
- except CruValueTypeError as e:
- error_entries.append(entry)
- errors.append(e)
- if len(error_entries) > 0:
- raise AppConfigEntryValueFormatError(
- "Entry value format is not correct.",
- self.configuration,
- error_entries,
- ) from ExceptionGroup("Multiple format errors occurred.", errors)
- return value_dict
-
- def _read_config_file(self) -> dict[str, Any]:
- parsed = self._parse_config_file()
- entry_groups = parsed.cru_iter().group_by(lambda e: e.key)
- entry_dict = self._check_duplicate(entry_groups)
- value_dict = self._check_type(entry_dict)
- return value_dict
-
- def _real_load_config_file(self) -> None:
- self.configuration.reset_all()
- value_dict = self._read_config_file()
- for key, value in value_dict.items():
- if value is None:
- continue
- self.configuration.set_config_item(key, value)
-
- def load_config_file(self, force=False) -> None:
- if force or not self._loaded:
- self._real_load_config_file()
- self._loaded = True
-
- def _print_app_config_info(self):
- for item in self.configuration:
- print(item.description_str)
-
- def get_command_info(self):
- return "config", "Manage configuration."
-
- def setup_arg_parser(self, arg_parser) -> None:
- subparsers = arg_parser.add_subparsers(
- dest="config_command", required=True, metavar="CONFIG_COMMAND"
- )
- _init_parser = subparsers.add_parser(
- "init", help="create an initial config file"
- )
- _print_app_parser = subparsers.add_parser(
- "print-app",
- help="print information of the config items defined by app",
- )
- _print_parser = subparsers.add_parser("print", help="print current config")
- _check_config_parser = subparsers.add_parser(
- "check",
- help="check the validity of the config file",
- )
- _check_config_parser.add_argument(
- "-f",
- "--format-only",
- action="store_true",
- help="only check content format, not app config item requirements.",
- )
-
- def run_command(self, args) -> None:
- if args.config_command == "init":
- self._create_init_config_file()
- elif args.config_command == "print-app":
- self._print_app_config_info()
- elif args.config_command == "print":
- self._parse_and_print_config_file()
- elif args.config_command == "check":
- if args.format_only:
- self._parse_config_file()
- else:
- self._read_config_file()
diff --git a/tools/cru-py/cru/service/_external.py b/tools/cru-py/cru/service/_external.py
deleted file mode 100644
index 2347e95..0000000
--- a/tools/cru-py/cru/service/_external.py
+++ /dev/null
@@ -1,81 +0,0 @@
-from ._base import AppCommandFeatureProvider
-from ._nginx import NginxManager
-
-
-class CliToolCommandProvider(AppCommandFeatureProvider):
- def __init__(self) -> None:
- super().__init__("cli-tool-command-provider")
-
- def setup(self):
- pass
-
- def get_command_info(self):
- return ("gen-cli", "Get commands of running external cli tools.")
-
- def setup_arg_parser(self, arg_parser):
- subparsers = arg_parser.add_subparsers(
- dest="gen_cli_command", required=True, metavar="GEN_CLI_COMMAND"
- )
- certbot_parser = subparsers.add_parser("certbot", help="print certbot commands")
- certbot_parser.add_argument(
- "-t", "--test", action="store_true", help="run certbot in test mode"
- )
- _install_docker_parser = subparsers.add_parser(
- "install-docker", help="print docker installation commands"
- )
- _update_blog_parser = subparsers.add_parser(
- "update-blog", help="print blog update command"
- )
-
- def _print_install_docker_commands(self) -> None:
- output = """
-### COMMAND: uninstall apt docker
-for pkg in docker.io docker-doc docker-compose \
-podman-docker containerd runc; \
-do sudo apt-get remove $pkg; done
-
-### COMMAND: prepare apt certs
-sudo apt-get update
-sudo apt-get install ca-certificates curl
-sudo install -m 0755 -d /etc/apt/keyrings
-
-### COMMAND: install certs
-sudo curl -fsSL https://download.docker.com/linux/debian/gpg \
--o /etc/apt/keyrings/docker.asc
-sudo chmod a+r /etc/apt/keyrings/docker.asc
-
-### COMMAND: add docker apt source
-echo \\
- "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] \
-https://download.docker.com/linux/debian \\
- $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \\
- sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
-
-### COMMAND: update apt and install docker
-sudo apt-get update
-sudo apt-get install docker-ce docker-ce-cli containerd.io \
-docker-buildx-plugin docker-compose-plugin
-
-### COMMAND: setup system for docker
-sudo systemctl enable docker
-sudo systemctl start docker
-sudo groupadd -f docker
-sudo usermod -aG docker $USER
-# Remember to log out and log back in for the group changes to take effect
-""".strip()
- print(output)
-
- def _print_update_blog_command(self):
- output = """
-### COMMAND: update blog
-docker exec -it blog /scripts/update.bash
-""".strip()
- print(output)
-
- def run_command(self, args):
- if args.gen_cli_command == "certbot":
- self.app.get_feature(NginxManager).print_all_certbot_commands(args.test)
- elif args.gen_cli_command == "install-docker":
- self._print_install_docker_commands()
- elif args.gen_cli_command == "update-blog":
- self._print_update_blog_command() \ No newline at end of file
diff --git a/tools/cru-py/cru/service/_nginx.py b/tools/cru-py/cru/service/_nginx.py
deleted file mode 100644
index 6c77971..0000000
--- a/tools/cru-py/cru/service/_nginx.py
+++ /dev/null
@@ -1,268 +0,0 @@
-from argparse import Namespace
-from enum import Enum, auto
-import re
-import subprocess
-from typing import TypeAlias
-
-from cru import CruInternalError
-
-from ._base import AppCommandFeatureProvider
-from ._config import ConfigManager
-from ._template import TemplateManager
-
-
-class CertbotAction(Enum):
- CREATE = auto()
- EXPAND = auto()
- SHRINK = auto()
- RENEW = auto()
-
-
-class NginxManager(AppCommandFeatureProvider):
- CertbotAction: TypeAlias = CertbotAction
-
- def __init__(self) -> None:
- super().__init__("nginx-manager")
- self._domains_cache: list[str] | None = None
-
- def setup(self) -> None:
- pass
-
- @property
- def _config_manager(self) -> ConfigManager:
- return self.app.get_feature(ConfigManager)
-
- @property
- def root_domain(self) -> str:
- return self._config_manager.get_domain_value_str()
-
- @property
- def domains(self) -> list[str]:
- if self._domains_cache is None:
- self._domains_cache = self._get_domains()
- return self._domains_cache
-
- @property
- def subdomains(self) -> list[str]:
- suffix = "." + self.root_domain
- return [d[: -len(suffix)] for d in self.domains if d.endswith(suffix)]
-
- @property
- def _domain_config_name(self) -> str:
- return self._config_manager.domain_item_name
-
- def _get_domains_from_text(self, text: str) -> set[str]:
- domains: set[str] = set()
- regex = re.compile(r"server_name\s+(\S+)\s*;")
- for match in regex.finditer(text):
- domains.add(match[1])
- return domains
-
- def _join_generated_nginx_conf_text(self) -> str:
- text = ""
- template_manager = self.app.get_feature(TemplateManager)
- for nginx_conf in template_manager.generate():
- text += nginx_conf[1]
- return text
-
- def _get_domains(self) -> list[str]:
- text = self._join_generated_nginx_conf_text()
- domains = list(self._get_domains_from_text(text))
- domains.remove(self.root_domain)
- return [self.root_domain, *domains]
-
- def _print_domains(self) -> None:
- for domain in self.domains:
- print(domain)
-
- def _certbot_command(
- self,
- action: CertbotAction | str,
- test: bool,
- *,
- docker=True,
- standalone=None,
- email=None,
- agree_tos=True,
- ) -> str:
- if isinstance(action, str):
- action = CertbotAction[action.upper()]
-
- command_args = []
-
- add_domain_option = True
- if action is CertbotAction.CREATE:
- if standalone is None:
- standalone = True
- command_action = "certonly"
- elif action in [CertbotAction.EXPAND, CertbotAction.SHRINK]:
- if standalone is None:
- standalone = False
- command_action = "certonly"
- elif action is CertbotAction.RENEW:
- if standalone is None:
- standalone = False
- add_domain_option = False
- command_action = "renew"
- else:
- raise CruInternalError("Invalid certbot action.")
-
- data_dir = self.app.data_dir.full_path.as_posix()
-
- if not docker:
- command_args.append("certbot")
- else:
- command_args.extend(
- [
- "docker run -it --rm --name certbot",
- f'-v "{data_dir}/certbot/certs:/etc/letsencrypt"',
- f'-v "{data_dir}/certbot/data:/var/lib/letsencrypt"',
- ]
- )
- if standalone:
- command_args.append('-p "0.0.0.0:80:80"')
- else:
- command_args.append(f'-v "{data_dir}/certbot/webroot:/var/www/certbot"')
-
- command_args.append("certbot/certbot")
-
- command_args.append(command_action)
-
- command_args.append(f"--cert-name {self.root_domain}")
-
- if standalone:
- command_args.append("--standalone")
- else:
- command_args.append("--webroot -w /var/www/certbot")
-
- if add_domain_option:
- command_args.append(" ".join([f"-d {domain}" for domain in self.domains]))
-
- if email is not None:
- command_args.append(f"--email {email}")
-
- if agree_tos:
- command_args.append("--agree-tos")
-
- if test:
- command_args.append("--test-cert --dry-run")
-
- return " ".join(command_args)
-
- def print_all_certbot_commands(self, test: bool):
- print("### COMMAND: (standalone) create certs")
- print(
- self._certbot_command(
- CertbotAction.CREATE,
- test,
- email=self._config_manager.get_email_value_str_optional(),
- )
- )
- print()
- print("### COMMAND: (webroot+nginx) expand or shrink certs")
- print(
- self._certbot_command(
- CertbotAction.EXPAND,
- test,
- email=self._config_manager.get_email_value_str_optional(),
- )
- )
- print()
- print("### COMMAND: (webroot+nginx) renew certs")
- print(
- self._certbot_command(
- CertbotAction.RENEW,
- test,
- email=self._config_manager.get_email_value_str_optional(),
- )
- )
-
- @property
- def _cert_path_str(self) -> str:
- return str(
- self.app.data_dir.full_path
- / "certbot/certs/live"
- / self.root_domain
- / "fullchain.pem"
- )
-
- def get_command_info(self):
- return "nginx", "Manage nginx related things."
-
- def setup_arg_parser(self, arg_parser):
- subparsers = arg_parser.add_subparsers(
- dest="nginx_command", required=True, metavar="NGINX_COMMAND"
- )
- _list_parser = subparsers.add_parser("list", help="list domains")
- certbot_parser = subparsers.add_parser("certbot", help="print certbot commands")
- certbot_parser.add_argument(
- "--no-test",
- action="store_true",
- help="remove args making certbot run in test mode",
- )
-
- def run_command(self, args: Namespace) -> None:
- if args.nginx_command == "list":
- self._print_domains()
- elif args.nginx_command == "certbot":
- self.print_all_certbot_commands(not args.no_test)
-
- def _generate_dns_zone(
- self,
- ip: str,
- /,
- ttl: str | int = 600,
- *,
- enable_mail: bool = True,
- dkim: str | None = None,
- ) -> str:
- # TODO: Not complete and test now.
- root_domain = self.root_domain
- result = f"$ORIGIN {root_domain}.\n\n"
- result += "; A records\n"
- result += f"@ {ttl} IN A {ip}\n"
- for subdomain in self.subdomains:
- result += f"{subdomain} {ttl} IN A {ip}\n"
-
- if enable_mail:
- result += "\n; MX records\n"
- result += f"@ {ttl} IN MX 10 mail.{root_domain}.\n"
- result += "\n; SPF record\n"
- result += f'@ {ttl} IN TXT "v=spf1 mx ~all"\n'
- if dkim is not None:
- result += "\n; DKIM record\n"
- result += f'mail._domainkey {ttl} IN TEXT "{dkim}"'
- result += "\n; DMARC record\n"
- dmarc_options = [
- "v=DMARC1",
- "p=none",
- f"rua=mailto:dmarc.report@{root_domain}",
- f"ruf=mailto:dmarc.report@{root_domain}",
- "sp=none",
- "ri=86400",
- ]
- result += f'_dmarc {ttl} IN TXT "{"; ".join(dmarc_options)}"\n'
- return result
-
- def _get_dkim_from_mailserver(self) -> str | None:
- # TODO: Not complete and test now.
- dkim_path = (
- self.app.data_dir.full_path
- / "dms/config/opendkim/keys"
- / self.root_domain
- / "mail.txt"
- )
- if not dkim_path.exists():
- return None
-
- p = subprocess.run(["sudo", "cat", dkim_path], capture_output=True, check=True)
- value = ""
- for match in re.finditer('"(.*)"', p.stdout.decode("utf-8")):
- value += match.group(1)
- return value
-
- def _generate_dns_zone_with_dkim(self, ip: str, /, ttl: str | int = 600) -> str:
- # TODO: Not complete and test now.
- return self._generate_dns_zone(
- ip, ttl, enable_mail=True, dkim=self._get_dkim_from_mailserver()
- )
diff --git a/tools/cru-py/cru/service/_template.py b/tools/cru-py/cru/service/_template.py
deleted file mode 100644
index 1381700..0000000
--- a/tools/cru-py/cru/service/_template.py
+++ /dev/null
@@ -1,90 +0,0 @@
-from argparse import Namespace
-from pathlib import Path
-import shutil
-
-from cru.template import TemplateTree, CruStrWrapperTemplate
-
-from ._base import AppCommandFeatureProvider, AppFeaturePath
-from ._config import ConfigManager
-
-
-class TemplateManager(AppCommandFeatureProvider):
- def __init__(self, prefix: str | None = None):
- super().__init__("template-manager")
- self._prefix = prefix or self.app.app_id.upper()
-
- def setup(self) -> None:
- self._templates_dir = self.app.add_path("templates", True)
- self._generated_dir = self.app.add_path("generated", True)
- self._template_tree: TemplateTree[CruStrWrapperTemplate] | None = None
-
- @property
- def prefix(self) -> str:
- return self._prefix
-
- @property
- def templates_dir(self) -> AppFeaturePath:
- return self._templates_dir
-
- @property
- def generated_dir(self) -> AppFeaturePath:
- return self._generated_dir
-
- @property
- def template_tree(self) -> TemplateTree[CruStrWrapperTemplate]:
- if self._template_tree is None:
- return self.reload()
- return self._template_tree
-
- def reload(self) -> TemplateTree:
- self._template_tree = TemplateTree(
- lambda text: CruStrWrapperTemplate(text), self.templates_dir.full_path_str
- )
- return self._template_tree
-
- def _print_file_lists(self) -> None:
- for path, template in self.template_tree.templates:
- print(f"[{template.variable_count}]", path.as_posix())
-
- def generate(self) -> list[tuple[Path, str]]:
- config_manager = self.app.get_feature(ConfigManager)
- return self.template_tree.generate(config_manager.get_str_dict())
-
- def _generate_files(self, dry_run: bool) -> None:
- config_manager = self.app.get_feature(ConfigManager)
- if not dry_run and self.generated_dir.full_path.exists():
- shutil.rmtree(self.generated_dir.full_path)
- self.template_tree.generate_to(
- self.generated_dir.full_path_str, config_manager.get_str_dict(), dry_run
- )
-
- def get_command_info(self):
- return ("template", "Manage templates.")
-
- def setup_arg_parser(self, arg_parser):
- subparsers = arg_parser.add_subparsers(
- dest="template_command", required=True, metavar="TEMPLATE_COMMAND"
- )
- _list_parser = subparsers.add_parser("list", help="list templates")
- _variables_parser = subparsers.add_parser(
- "variables", help="list variables used in all templates"
- )
- generate_parser = subparsers.add_parser("generate", help="generate templates")
- generate_parser.add_argument(
- "--no-dry-run", action="store_true", help="generate and write target files"
- )
-
- def run_command(self, args: Namespace) -> None:
- if args.template_command == "list":
- self._print_file_lists()
- elif args.template_command == "variables":
- for var in self.template_tree.variables:
- print(var)
- elif args.template_command == "generate":
- dry_run = not args.no_dry_run
- self._generate_files(dry_run)
- if dry_run:
- print("Dry run successfully.")
- print(
- f"Will delete dir {self.generated_dir.full_path_str} if it exists."
- )