aboutsummaryrefslogtreecommitdiff
path: root/tools/cru-py/cru/service
diff options
context:
space:
mode:
Diffstat (limited to 'tools/cru-py/cru/service')
-rw-r--r--tools/cru-py/cru/service/__init__.py0
-rw-r--r--tools/cru-py/cru/service/__main__.py20
-rw-r--r--tools/cru-py/cru/service/_app.py34
-rw-r--r--tools/cru-py/cru/service/_base.py449
-rw-r--r--tools/cru-py/cru/service/_config.py446
-rw-r--r--tools/cru-py/cru/service/_external.py81
-rw-r--r--tools/cru-py/cru/service/_nginx.py281
-rw-r--r--tools/cru-py/cru/service/_template.py86
8 files changed, 1397 insertions, 0 deletions
diff --git a/tools/cru-py/cru/service/__init__.py b/tools/cru-py/cru/service/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tools/cru-py/cru/service/__init__.py
diff --git a/tools/cru-py/cru/service/__main__.py b/tools/cru-py/cru/service/__main__.py
new file mode 100644
index 0000000..1c10e82
--- /dev/null
+++ b/tools/cru-py/cru/service/__main__.py
@@ -0,0 +1,20 @@
+from cru import CruException
+
+from ._app import create_app
+
+
+def main():
+ app = create_app()
+ app.run_command()
+
+
+if __name__ == "__main__":
+ try:
+ main()
+ except CruException as e:
+ user_message = e.get_user_message()
+ if user_message is not None:
+ print(f"Error: {user_message}")
+ exit(1)
+ else:
+ raise
diff --git a/tools/cru-py/cru/service/_app.py b/tools/cru-py/cru/service/_app.py
new file mode 100644
index 0000000..6030dad
--- /dev/null
+++ b/tools/cru-py/cru/service/_app.py
@@ -0,0 +1,34 @@
+from ._base import (
+ AppBase,
+ CommandDispatcher,
+ AppInitializer,
+ PathCommandProvider,
+)
+from ._config import ConfigManager
+from ._template import TemplateManager
+from ._nginx import NginxManager
+from ._external import CliToolCommandProvider
+
+APP_ID = "crupest"
+
+
+class App(AppBase):
+ def __init__(self):
+ super().__init__(APP_ID, f"{APP_ID}-service")
+ self.add_feature(PathCommandProvider())
+ self.add_feature(AppInitializer())
+ self.add_feature(ConfigManager())
+ self.add_feature(TemplateManager())
+ self.add_feature(NginxManager())
+ self.add_feature(CliToolCommandProvider())
+ self.add_feature(CommandDispatcher())
+
+ def run_command(self):
+ command_dispatcher = self.get_feature(CommandDispatcher)
+ command_dispatcher.run_command()
+
+
+def create_app() -> App:
+ app = App()
+ app.setup()
+ return app
diff --git a/tools/cru-py/cru/service/_base.py b/tools/cru-py/cru/service/_base.py
new file mode 100644
index 0000000..ad813c9
--- /dev/null
+++ b/tools/cru-py/cru/service/_base.py
@@ -0,0 +1,449 @@
+from __future__ import annotations
+
+from argparse import ArgumentParser, Namespace
+from abc import ABC, abstractmethod
+import argparse
+import os
+from pathlib import Path
+from typing import TypeVar, overload
+
+from cru import CruException, CruLogicError
+
+_Feature = TypeVar("_Feature", bound="AppFeatureProvider")
+
+
+class AppError(CruException):
+ pass
+
+
+class AppFeatureError(AppError):
+ def __init__(self, message, feature: type | str, *args, **kwargs):
+ super().__init__(message, *args, **kwargs)
+ self._feature = feature
+
+ @property
+ def feature(self) -> type | str:
+ return self._feature
+
+
+class AppPathError(CruException):
+ def __init__(self, message, _path: str | Path, *args, **kwargs):
+ super().__init__(message, *args, **kwargs)
+ self._path = str(_path)
+
+ @property
+ def path(self) -> str:
+ return self._path
+
+
+class AppPath(ABC):
+ def __init__(self, id: str, is_dir: bool, description: str) -> None:
+ self._is_dir = is_dir
+ self._id = id
+ self._description = description
+
+ @property
+ @abstractmethod
+ def parent(self) -> AppPath | None: ...
+
+ @property
+ @abstractmethod
+ def app(self) -> AppBase: ...
+
+ @property
+ def id(self) -> str:
+ return self._id
+
+ @property
+ def description(self) -> str:
+ return self._description
+
+ @property
+ def is_dir(self) -> bool:
+ return self._is_dir
+
+ @property
+ @abstractmethod
+ def full_path(self) -> Path: ...
+
+ @property
+ def full_path_str(self) -> str:
+ return str(self.full_path)
+
+ def check_parents(self, must_exist: bool = False) -> bool:
+ for p in reversed(self.full_path.parents):
+ if not p.exists() and not must_exist:
+ return False
+ if not p.is_dir():
+ raise AppPathError("Parents' path must be a dir.", self.full_path)
+ return True
+
+ def check_self(self, must_exist: bool = False) -> bool:
+ if not self.check_parents(must_exist):
+ return False
+ if not self.full_path.exists():
+ if not must_exist:
+ return False
+ raise AppPathError("Not exist.", self.full_path)
+ if self.is_dir:
+ if not self.full_path.is_dir():
+ raise AppPathError("Should be a directory, but not.", self.full_path)
+ else:
+ return True
+ else:
+ if not self.full_path.is_file():
+ raise AppPathError("Should be a file, but not.", self.full_path)
+ else:
+ return True
+
+ def ensure(self, create_file: bool = False) -> None:
+ e = self.check_self(False)
+ if not e:
+ os.makedirs(self.full_path.parent, exist_ok=True)
+ if self.is_dir:
+ os.mkdir(self.full_path)
+ elif create_file:
+ with open(self.full_path, "w") as f:
+ f.write("")
+
+ def add_subpath(
+ self,
+ name: str,
+ is_dir: bool,
+ /,
+ id: str | None = None,
+ description: str = "",
+ ) -> AppFeaturePath:
+ return self.app.add_path(name, is_dir, self, id, description)
+
+ @property
+ def app_relative_path(self) -> Path:
+ return self.full_path.relative_to(self.app.root.full_path)
+
+
+class AppFeaturePath(AppPath):
+ def __init__(
+ self,
+ parent: AppPath,
+ name: str,
+ is_dir: bool,
+ /,
+ id: str | None = None,
+ description: str = "",
+ ) -> None:
+ super().__init__(id or name, is_dir, description)
+ self._name = name
+ self._parent = parent
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ @property
+ def parent(self) -> AppPath:
+ return self._parent
+
+ @property
+ def app(self) -> AppBase:
+ return self.parent.app
+
+ @property
+ def full_path(self) -> Path:
+ return Path(self.parent.full_path, self.name).resolve()
+
+
+class AppRootPath(AppPath):
+ def __init__(self, app: AppBase):
+ super().__init__("root", True, "Application root path.")
+ self._app = app
+ self._full_path: Path | None = None
+
+ @property
+ def parent(self) -> None:
+ return None
+
+ @property
+ def app(self) -> AppBase:
+ return self._app
+
+ @property
+ def full_path(self) -> Path:
+ if self._full_path is None:
+ raise AppError("App root path is not set yet.")
+ return self._full_path
+
+ def setup(self, path: os.PathLike) -> None:
+ if self._full_path is not None:
+ raise AppError("App root path is already set.")
+ self._full_path = Path(path).resolve()
+
+
+class AppFeatureProvider(ABC):
+ def __init__(self, name: str, /, app: AppBase | None = None):
+ super().__init__()
+ self._name = name
+ self._app = app if app else AppBase.get_instance()
+
+ @property
+ def app(self) -> AppBase:
+ return self._app
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ @abstractmethod
+ def setup(self) -> None: ...
+
+
+class AppCommandFeatureProvider(AppFeatureProvider):
+ @abstractmethod
+ def get_command_info(self) -> tuple[str, str]: ...
+
+ @abstractmethod
+ def setup_arg_parser(self, arg_parser: ArgumentParser): ...
+
+ @abstractmethod
+ def run_command(self, args: Namespace) -> None: ...
+
+
+DATA_DIR_NAME = "data"
+
+
+class PathCommandProvider(AppCommandFeatureProvider):
+ def __init__(self) -> None:
+ super().__init__("path-command-provider")
+
+ def setup(self):
+ pass
+
+ def get_command_info(self):
+ return ("path", "Get information about paths used by app.")
+
+ def setup_arg_parser(self, arg_parser: ArgumentParser) -> None:
+ subparsers = arg_parser.add_subparsers(
+ dest="path_command", required=True, metavar="PATH_COMMAND"
+ )
+ _list_parser = subparsers.add_parser(
+ "list", help="list special paths used by app"
+ )
+
+ def run_command(self, args: Namespace) -> None:
+ if args.path_command == "list":
+ for path in self.app.paths:
+ print(f"{path.app_relative_path.as_posix()}: {path.description}")
+
+
+class CommandDispatcher(AppFeatureProvider):
+ def __init__(self) -> None:
+ super().__init__("command-dispatcher")
+ self._parsed_args: argparse.Namespace | None = None
+
+ def setup_arg_parser(self) -> None:
+ epilog = """
+==> to start,
+./tools/manage init
+./tools/manage config init
+ln -s generated/docker-compose.yaml .
+# Then edit config file.
+
+==> to update
+git pull
+./tools/manage template generate --no-dry-run
+docker compose up
+ """.strip()
+
+ self._map: dict[str, AppCommandFeatureProvider] = {}
+ arg_parser = argparse.ArgumentParser(
+ description="Service management",
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ epilog=epilog,
+ )
+ arg_parser.add_argument(
+ "--project-dir",
+ help="The path of the project directory.",
+ required=True,
+ type=str,
+ )
+ subparsers = arg_parser.add_subparsers(
+ dest="command",
+ help="The management command to execute.",
+ metavar="COMMAND",
+ )
+ for feature in self.app.features:
+ if isinstance(feature, AppCommandFeatureProvider):
+ info = feature.get_command_info()
+ command_subparser = subparsers.add_parser(info[0], help=info[1])
+ feature.setup_arg_parser(command_subparser)
+ self._map[info[0]] = feature
+ self._arg_parser = arg_parser
+
+ def setup(self):
+ pass
+
+ @property
+ def arg_parser(self) -> argparse.ArgumentParser:
+ return self._arg_parser
+
+ @property
+ def map(self) -> dict[str, AppCommandFeatureProvider]:
+ return self._map
+
+ def get_program_parsed_args(self) -> argparse.Namespace:
+ if self._parsed_args is None:
+ self._parsed_args = self.arg_parser.parse_args()
+ return self._parsed_args
+
+ def run_command(self, args: argparse.Namespace | None = None) -> None:
+ real_args = args or self.get_program_parsed_args()
+ if real_args.command is None:
+ self.arg_parser.print_help()
+ return
+ self.map[real_args.command].run_command(real_args)
+
+
+class AppInitializer(AppCommandFeatureProvider):
+ def __init__(self) -> None:
+ super().__init__("app-initializer")
+
+ def _init_app(self) -> bool:
+ if self.app.app_initialized:
+ return False
+ self.app.data_dir.ensure()
+ return True
+
+ def setup(self):
+ pass
+
+ def get_command_info(self):
+ return ("init", "Initialize the app.")
+
+ def setup_arg_parser(self, arg_parser):
+ pass
+
+ def run_command(self, args):
+ init = self._init_app()
+ if init:
+ print("App initialized successfully.")
+ else:
+ print("App is already initialized. Do nothing.")
+
+
+class AppBase:
+ _instance: AppBase | None = None
+
+ @staticmethod
+ def get_instance() -> AppBase:
+ if AppBase._instance is None:
+ raise AppError("App instance not initialized")
+ return AppBase._instance
+
+ def __init__(self, app_id: str, name: str):
+ AppBase._instance = self
+ self._app_id = app_id
+ self._name = name
+ self._root = AppRootPath(self)
+ self._paths: list[AppFeaturePath] = []
+ self._features: list[AppFeatureProvider] = []
+
+ def setup(self) -> None:
+ command_dispatcher = self.get_feature(CommandDispatcher)
+ command_dispatcher.setup_arg_parser()
+ program_args = command_dispatcher.get_program_parsed_args()
+ self.setup_root(program_args.project_dir)
+ self._data_dir = self.add_path(DATA_DIR_NAME, True, id="data")
+ for feature in self.features:
+ feature.setup()
+ for path in self.paths:
+ path.check_self()
+
+ @property
+ def app_id(self) -> str:
+ return self._app_id
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ @property
+ def root(self) -> AppRootPath:
+ return self._root
+
+ def setup_root(self, path: os.PathLike) -> None:
+ self._root.setup(path)
+
+ @property
+ def data_dir(self) -> AppFeaturePath:
+ return self._data_dir
+
+ @property
+ def app_initialized(self) -> bool:
+ return self.data_dir.check_self()
+
+ def ensure_app_initialized(self) -> AppRootPath:
+ if not self.app_initialized:
+ raise AppError(
+ user_message="Root directory does not exist. "
+ "Please run 'init' to create one."
+ )
+ return self.root
+
+ @property
+ def features(self) -> list[AppFeatureProvider]:
+ return self._features
+
+ @property
+ def paths(self) -> list[AppFeaturePath]:
+ return self._paths
+
+ def add_feature(self, feature: _Feature) -> _Feature:
+ for f in self.features:
+ if f.name == feature.name:
+ raise AppFeatureError(
+ f"Duplicate feature name: {feature.name}.", feature.name
+ )
+ self._features.append(feature)
+ return feature
+
+ def add_path(
+ self,
+ name: str,
+ is_dir: bool,
+ /,
+ parent: AppPath | None = None,
+ id: str | None = None,
+ description: str = "",
+ ) -> AppFeaturePath:
+ p = AppFeaturePath(
+ parent or self.root, name, is_dir, id=id, description=description
+ )
+ self._paths.append(p)
+ return p
+
+ @overload
+ def get_feature(self, feature: str) -> AppFeatureProvider: ...
+
+ @overload
+ def get_feature(self, feature: type[_Feature]) -> _Feature: ...
+
+ def get_feature(
+ self, feature: str | type[_Feature]
+ ) -> AppFeatureProvider | _Feature:
+ if isinstance(feature, str):
+ for f in self._features:
+ if f.name == feature:
+ return f
+ elif isinstance(feature, type):
+ for f in self._features:
+ if isinstance(f, feature):
+ return f
+ else:
+ raise CruLogicError("Argument must be the name of feature or its class.")
+
+ raise AppFeatureError(f"Feature {feature} not found.", feature)
+
+ def get_path(self, name: str) -> AppFeaturePath:
+ for p in self._paths:
+ if p.id == name or p.name == name:
+ return p
+ raise AppPathError(f"Application path {name} not found.", name)
diff --git a/tools/cru-py/cru/service/_config.py b/tools/cru-py/cru/service/_config.py
new file mode 100644
index 0000000..b51e21c
--- /dev/null
+++ b/tools/cru-py/cru/service/_config.py
@@ -0,0 +1,446 @@
+from collections.abc import Iterable
+from typing import Any, Literal, overload
+
+from cru import CruException
+from cru.config import Configuration, ConfigItem
+from cru.value import (
+ INTEGER_VALUE_TYPE,
+ TEXT_VALUE_TYPE,
+ CruValueTypeError,
+ RandomStringValueGenerator,
+ UuidValueGenerator,
+)
+from cru.parsing import ParseError, SimpleLineConfigParser
+
+from ._base import AppFeaturePath, AppCommandFeatureProvider
+
+
+class AppConfigError(CruException):
+ def __init__(
+ self, message: str, configuration: Configuration, *args, **kwargs
+ ) -> None:
+ super().__init__(message, *args, **kwargs)
+ self._configuration = configuration
+
+ @property
+ def configuration(self) -> Configuration:
+ return self._configuration
+
+
+class AppConfigFileError(AppConfigError):
+ def __init__(
+ self,
+ message: str,
+ configuration: Configuration,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(message, configuration, *args, **kwargs)
+
+
+class AppConfigFileNotFoundError(AppConfigFileError):
+ def __init__(
+ self,
+ message: str,
+ configuration: Configuration,
+ file_path: str,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(message, configuration, *args, **kwargs)
+ self._file_path = file_path
+
+ @property
+ def file_path(self) -> str:
+ return self._file_path
+
+
+class AppConfigFileParseError(AppConfigFileError):
+ def __init__(
+ self,
+ message: str,
+ configuration: Configuration,
+ file_content: str,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(message, configuration, *args, **kwargs)
+ self._file_content = file_content
+ self.__cause__: ParseError
+
+ @property
+ def file_content(self) -> str:
+ return self._file_content
+
+ def get_user_message(self) -> str:
+ return f"Error while parsing config file at line {self.__cause__.line_number}."
+
+
+class AppConfigFileEntryError(AppConfigFileError):
+ def __init__(
+ self,
+ message: str,
+ configuration: Configuration,
+ entries: Iterable[SimpleLineConfigParser.Entry],
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(message, configuration, *args, **kwargs)
+ self._entries = list(entries)
+
+ @property
+ def error_entries(self) -> list[SimpleLineConfigParser.Entry]:
+ return self._entries
+
+ @staticmethod
+ def entries_to_friendly_message(
+ entries: Iterable[SimpleLineConfigParser.Entry],
+ ) -> str:
+ return "\n".join(
+ f"line {entry.line_number}: {entry.key}={entry.value}" for entry in entries
+ )
+
+ @property
+ def friendly_message_head(self) -> str:
+ return "Error entries found in config file"
+
+ def get_user_message(self) -> str:
+ return (
+ f"{self.friendly_message_head}:\n"
+ f"{self.entries_to_friendly_message(self.error_entries)}"
+ )
+
+
+class AppConfigDuplicateEntryError(AppConfigFileEntryError):
+ @property
+ def friendly_message_head(self) -> str:
+ return "Duplicate entries found in config file"
+
+
+class AppConfigEntryValueFormatError(AppConfigFileEntryError):
+ @property
+ def friendly_message_head(self) -> str:
+ return "Invalid value format for entries"
+
+
+class AppConfigItemNotSetError(AppConfigError):
+ def __init__(
+ self,
+ message: str,
+ configuration: Configuration,
+ items: list[ConfigItem],
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(message, configuration, *args, **kwargs)
+ self._items = items
+
+
+class ConfigManager(AppCommandFeatureProvider):
+ def __init__(self) -> None:
+ super().__init__("config-manager")
+ configuration = Configuration()
+ self._configuration = configuration
+ self._loaded: bool = False
+ self._init_app_defined_items()
+
+ def _init_app_defined_items(self) -> None:
+ prefix = self.config_name_prefix
+
+ def _add_text(name: str, description: str) -> ConfigItem:
+ item = ConfigItem(f"{prefix}_{name}", description, TEXT_VALUE_TYPE)
+ self.configuration.add(item)
+ return item
+
+ def _add_uuid(name: str, description: str) -> ConfigItem:
+ item = ConfigItem(
+ f"{prefix}_{name}",
+ description,
+ TEXT_VALUE_TYPE,
+ default=UuidValueGenerator(),
+ )
+ self.configuration.add(item)
+ return item
+
+ def _add_random_string(
+ name: str, description: str, length: int = 32, secure: bool = True
+ ) -> ConfigItem:
+ item = ConfigItem(
+ f"{prefix}_{name}",
+ description,
+ TEXT_VALUE_TYPE,
+ default=RandomStringValueGenerator(length, secure),
+ )
+ self.configuration.add(item)
+ return item
+
+ def _add_int(name: str, description: str) -> ConfigItem:
+ item = ConfigItem(f"{prefix}_{name}", description, INTEGER_VALUE_TYPE)
+ self.configuration.add(item)
+ return item
+
+ self._domain = _add_text("DOMAIN", "domain name")
+ self._email = _add_text("EMAIL", "admin email address")
+ _add_text(
+ "AUTO_BACKUP_COS_SECRET_ID",
+ "access key id for Tencent COS, used for auto backup",
+ )
+ _add_text(
+ "AUTO_BACKUP_COS_SECRET_KEY",
+ "access key secret for Tencent COS, used for auto backup",
+ )
+ _add_text(
+ "AUTO_BACKUP_COS_REGION", "region for Tencent COS, used for auto backup"
+ )
+ _add_text(
+ "AUTO_BACKUP_BUCKET_NAME",
+ "bucket name for Tencent COS, used for auto backup",
+ )
+ _add_text("GITHUB_USERNAME", "github username for fetching todos")
+ _add_int("GITHUB_PROJECT_NUMBER", "github project number for fetching todos")
+ _add_text("GITHUB_TOKEN", "github token for fetching todos")
+ _add_text("GITHUB_TODO_COUNT", "github todo count")
+ _add_uuid("V2RAY_TOKEN", "v2ray user id")
+ _add_uuid("V2RAY_PATH", "v2ray path, which will be prefixed by _")
+ _add_text("FORGEJO_MAILER_USER", "Forgejo SMTP user")
+ _add_text("FORGEJO_MAILER_PASSWD", "Forgejo SMTP password")
+ _add_random_string("2FAUTH_APP_KEY", "2FAuth App Key")
+ _add_text("2FAUTH_MAIL_USERNAME", "2FAuth SMTP user")
+ _add_text("2FAUTH_MAIL_PASSWORD", "2FAuth SMTP password")
+
+ def setup(self) -> None:
+ self._config_file_path = self.app.data_dir.add_subpath(
+ "config", False, description="Configuration file path."
+ )
+
+ @property
+ def config_name_prefix(self) -> str:
+ return self.app.app_id.upper()
+
+ @property
+ def configuration(self) -> Configuration:
+ return self._configuration
+
+ @property
+ def config_file_path(self) -> AppFeaturePath:
+ return self._config_file_path
+
+ @property
+ def all_set(self) -> bool:
+ return self.configuration.all_set
+
+ def get_item(self, name: str) -> ConfigItem[Any]:
+ if not name.startswith(self.config_name_prefix + "_"):
+ name = f"{self.config_name_prefix}_{name}"
+
+ item = self.configuration.get_or(name, None)
+ if item is None:
+ raise AppConfigError(f"Config item '{name}' not found.", self.configuration)
+ return item
+
+ @overload
+ def get_item_value_str(self, name: str) -> str: ...
+
+ @overload
+ def get_item_value_str(self, name: str, ensure_set: Literal[True]) -> str: ...
+
+ @overload
+ def get_item_value_str(self, name: str, ensure_set: bool = True) -> str | None: ...
+
+ def get_item_value_str(self, name: str, ensure_set: bool = True) -> str | None:
+ self.load_config_file()
+ item = self.get_item(name)
+ if not item.is_set:
+ if ensure_set:
+ raise AppConfigItemNotSetError(
+ f"Config item '{name}' is not set.", self.configuration, [item]
+ )
+ return None
+ return item.value_str
+
+ def get_str_dict(self, ensure_all_set: bool = True) -> dict[str, str]:
+ self.load_config_file()
+ if ensure_all_set and not self.configuration.all_set:
+ raise AppConfigItemNotSetError(
+ "Some config items are not set.",
+ self.configuration,
+ self.configuration.get_unset_items(),
+ )
+ return self.configuration.to_str_dict()
+
+ @property
+ def domain_item_name(self) -> str:
+ return self._domain.name
+
+ def get_domain_value_str(self) -> str:
+ return self.get_item_value_str(self._domain.name)
+
+ def get_email_value_str_optional(self) -> str | None:
+ return self.get_item_value_str(self._email.name, ensure_set=False)
+
+ def _set_with_default(self) -> None:
+ if not self.configuration.all_not_set:
+ raise AppConfigError(
+ "Config is not clean. "
+ "Some config items are already set. "
+ "Can't set again with default value.",
+ self.configuration,
+ )
+ for item in self.configuration:
+ if item.can_generate_default:
+ item.set_value(item.generate_default_value())
+
+ def _to_config_file_content(self) -> str:
+ content = "".join(
+ [
+ f"{item.name}={item.value_str if item.is_set else ''}\n"
+ for item in self.configuration
+ ]
+ )
+ return content
+
+ def _create_init_config_file(self) -> None:
+ if self.config_file_path.check_self():
+ raise AppConfigError(
+ "Config file already exists.",
+ self.configuration,
+ user_message=f"The config file at "
+ f"{self.config_file_path.full_path_str} already exists.",
+ )
+ self._set_with_default()
+ self.config_file_path.ensure()
+ with open(
+ self.config_file_path.full_path, "w", encoding="utf-8", newline="\n"
+ ) as file:
+ file.write(self._to_config_file_content())
+
+ def _parse_config_file(self) -> SimpleLineConfigParser.Result:
+ if not self.config_file_path.check_self():
+ raise AppConfigFileNotFoundError(
+ "Config file not found.",
+ self.configuration,
+ self.config_file_path.full_path_str,
+ user_message=f"The config file at "
+ f"{self.config_file_path.full_path_str} does not exist. "
+ f"You can create an initial one with 'init' command.",
+ )
+
+ text = self.config_file_path.full_path.read_text()
+ try:
+ parser = SimpleLineConfigParser()
+ return parser.parse(text)
+ except ParseError as e:
+ raise AppConfigFileParseError(
+ "Failed to parse config file.", self.configuration, text
+ ) from e
+
+ def _parse_and_print_config_file(self) -> None:
+ parse_result = self._parse_config_file()
+ for entry in parse_result:
+ print(f"{entry.key}={entry.value}")
+
+ def _check_duplicate(
+ self,
+ parse_result: dict[str, list[SimpleLineConfigParser.Entry]],
+ ) -> dict[str, SimpleLineConfigParser.Entry]:
+ entry_dict: dict[str, SimpleLineConfigParser.Entry] = {}
+ duplicate_entries: list[SimpleLineConfigParser.Entry] = []
+ for key, entries in parse_result.items():
+ entry_dict[key] = entries[0]
+ if len(entries) > 1:
+ duplicate_entries.extend(entries)
+ if len(duplicate_entries) > 0:
+ raise AppConfigDuplicateEntryError(
+ "Duplicate entries found.", self.configuration, duplicate_entries
+ )
+
+ return entry_dict
+
+ def _check_type(
+ self, entry_dict: dict[str, SimpleLineConfigParser.Entry]
+ ) -> dict[str, Any]:
+ value_dict: dict[str, Any] = {}
+ error_entries: list[SimpleLineConfigParser.Entry] = []
+ errors: list[CruValueTypeError] = []
+ for key, entry in entry_dict.items():
+ config_item = self.configuration.get(key)
+ try:
+ if entry.value == "":
+ value_dict[key] = None
+ else:
+ value_dict[key] = config_item.value_type.convert_str_to_value(
+ entry.value
+ )
+ except CruValueTypeError as e:
+ error_entries.append(entry)
+ errors.append(e)
+ if len(error_entries) > 0:
+ raise AppConfigEntryValueFormatError(
+ "Entry value format is not correct.",
+ self.configuration,
+ error_entries,
+ ) from ExceptionGroup("Multiple format errors occurred.", errors)
+ return value_dict
+
+ def _read_config_file(self) -> dict[str, Any]:
+ parsed = self._parse_config_file()
+ entry_groups = parsed.cru_iter().group_by(lambda e: e.key)
+ entry_dict = self._check_duplicate(entry_groups)
+ value_dict = self._check_type(entry_dict)
+ return value_dict
+
+ def _real_load_config_file(self) -> None:
+ self.configuration.reset_all()
+ value_dict = self._read_config_file()
+ for key, value in value_dict.items():
+ if value is None:
+ continue
+ self.configuration.set_config_item(key, value)
+
+ def load_config_file(self, force=False) -> None:
+ if force or not self._loaded:
+ self._real_load_config_file()
+ self._loaded = True
+
+ def _print_app_config_info(self):
+ for item in self.configuration:
+ print(item.description_str)
+
+ def get_command_info(self):
+ return "config", "Manage configuration."
+
+ def setup_arg_parser(self, arg_parser) -> None:
+ subparsers = arg_parser.add_subparsers(
+ dest="config_command", required=True, metavar="CONFIG_COMMAND"
+ )
+ _init_parser = subparsers.add_parser(
+ "init", help="create an initial config file"
+ )
+ _print_app_parser = subparsers.add_parser(
+ "print-app",
+ help="print information of the config items defined by app",
+ )
+ _print_parser = subparsers.add_parser("print", help="print current config")
+ _check_config_parser = subparsers.add_parser(
+ "check",
+ help="check the validity of the config file",
+ )
+ _check_config_parser.add_argument(
+ "-f",
+ "--format-only",
+ action="store_true",
+ help="only check content format, not app config item requirements.",
+ )
+
+ def run_command(self, args) -> None:
+ if args.config_command == "init":
+ self._create_init_config_file()
+ elif args.config_command == "print-app":
+ self._print_app_config_info()
+ elif args.config_command == "print":
+ self._parse_and_print_config_file()
+ elif args.config_command == "check":
+ if args.format_only:
+ self._parse_config_file()
+ else:
+ self._read_config_file()
diff --git a/tools/cru-py/cru/service/_external.py b/tools/cru-py/cru/service/_external.py
new file mode 100644
index 0000000..2347e95
--- /dev/null
+++ b/tools/cru-py/cru/service/_external.py
@@ -0,0 +1,81 @@
+from ._base import AppCommandFeatureProvider
+from ._nginx import NginxManager
+
+
+class CliToolCommandProvider(AppCommandFeatureProvider):
+ def __init__(self) -> None:
+ super().__init__("cli-tool-command-provider")
+
+ def setup(self):
+ pass
+
+ def get_command_info(self):
+ return ("gen-cli", "Get commands of running external cli tools.")
+
+ def setup_arg_parser(self, arg_parser):
+ subparsers = arg_parser.add_subparsers(
+ dest="gen_cli_command", required=True, metavar="GEN_CLI_COMMAND"
+ )
+ certbot_parser = subparsers.add_parser("certbot", help="print certbot commands")
+ certbot_parser.add_argument(
+ "-t", "--test", action="store_true", help="run certbot in test mode"
+ )
+ _install_docker_parser = subparsers.add_parser(
+ "install-docker", help="print docker installation commands"
+ )
+ _update_blog_parser = subparsers.add_parser(
+ "update-blog", help="print blog update command"
+ )
+
+ def _print_install_docker_commands(self) -> None:
+ output = """
+### COMMAND: uninstall apt docker
+for pkg in docker.io docker-doc docker-compose \
+podman-docker containerd runc; \
+do sudo apt-get remove $pkg; done
+
+### COMMAND: prepare apt certs
+sudo apt-get update
+sudo apt-get install ca-certificates curl
+sudo install -m 0755 -d /etc/apt/keyrings
+
+### COMMAND: install certs
+sudo curl -fsSL https://download.docker.com/linux/debian/gpg \
+-o /etc/apt/keyrings/docker.asc
+sudo chmod a+r /etc/apt/keyrings/docker.asc
+
+### COMMAND: add docker apt source
+echo \\
+ "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] \
+https://download.docker.com/linux/debian \\
+ $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \\
+ sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
+
+### COMMAND: update apt and install docker
+sudo apt-get update
+sudo apt-get install docker-ce docker-ce-cli containerd.io \
+docker-buildx-plugin docker-compose-plugin
+
+### COMMAND: setup system for docker
+sudo systemctl enable docker
+sudo systemctl start docker
+sudo groupadd -f docker
+sudo usermod -aG docker $USER
+# Remember to log out and log back in for the group changes to take effect
+""".strip()
+ print(output)
+
+ def _print_update_blog_command(self):
+ output = """
+### COMMAND: update blog
+docker exec -it blog /scripts/update.bash
+""".strip()
+ print(output)
+
+ def run_command(self, args):
+ if args.gen_cli_command == "certbot":
+ self.app.get_feature(NginxManager).print_all_certbot_commands(args.test)
+ elif args.gen_cli_command == "install-docker":
+ self._print_install_docker_commands()
+ elif args.gen_cli_command == "update-blog":
+ self._print_update_blog_command() \ No newline at end of file
diff --git a/tools/cru-py/cru/service/_nginx.py b/tools/cru-py/cru/service/_nginx.py
new file mode 100644
index 0000000..e0a9c60
--- /dev/null
+++ b/tools/cru-py/cru/service/_nginx.py
@@ -0,0 +1,281 @@
+from argparse import Namespace
+from enum import Enum, auto
+import re
+import subprocess
+from typing import TypeAlias
+
+from cru import CruInternalError
+
+from ._base import AppCommandFeatureProvider
+from ._config import ConfigManager
+from ._template import TemplateManager
+
+
+class CertbotAction(Enum):
+ CREATE = auto()
+ EXPAND = auto()
+ SHRINK = auto()
+ RENEW = auto()
+
+
+class NginxManager(AppCommandFeatureProvider):
+ CertbotAction: TypeAlias = CertbotAction
+
+ def __init__(self) -> None:
+ super().__init__("nginx-manager")
+ self._domains_cache: list[str] | None = None
+
+ def setup(self) -> None:
+ pass
+
+ @property
+ def _config_manager(self) -> ConfigManager:
+ return self.app.get_feature(ConfigManager)
+
+ @property
+ def root_domain(self) -> str:
+ return self._config_manager.get_domain_value_str()
+
+ @property
+ def domains(self) -> list[str]:
+ if self._domains_cache is None:
+ self._domains_cache = self._get_domains()
+ return self._domains_cache
+
+ @property
+ def subdomains(self) -> list[str]:
+ suffix = "." + self.root_domain
+ return [d[: -len(suffix)] for d in self.domains if d.endswith(suffix)]
+
+ @property
+ def _domain_config_name(self) -> str:
+ return self._config_manager.domain_item_name
+
+ def _get_domains_from_text(self, text: str) -> set[str]:
+ domains: set[str] = set()
+ regex = re.compile(r"server_name\s+(\S+)\s*;")
+ domain_variable_str = f"${self._domain_config_name}"
+ brace_domain_variable_regex = re.compile(
+ r"\$\{\s*" + self._domain_config_name + r"\s*\}"
+ )
+ for match in regex.finditer(text):
+ domain_part = match.group(1)
+ if domain_variable_str in domain_part:
+ domains.add(domain_part.replace(domain_variable_str, self.root_domain))
+ continue
+ m = brace_domain_variable_regex.search(domain_part)
+ if m:
+ domains.add(domain_part.replace(m.group(0), self.root_domain))
+ continue
+ domains.add(domain_part)
+ return domains
+
+ def _get_nginx_conf_template_text(self) -> str:
+ template_manager = self.app.get_feature(TemplateManager)
+ text = ""
+ for path, template in template_manager.template_tree.templates:
+ if path.as_posix().startswith("nginx/"):
+ text += template.raw_text
+ return text
+
+ def _get_domains(self) -> list[str]:
+ text = self._get_nginx_conf_template_text()
+ domains = list(self._get_domains_from_text(text))
+ domains.remove(self.root_domain)
+ return [self.root_domain, *domains]
+
+ def _print_domains(self) -> None:
+ for domain in self.domains:
+ print(domain)
+
+ def _certbot_command(
+ self,
+ action: CertbotAction | str,
+ test: bool,
+ *,
+ docker=True,
+ standalone=None,
+ email=None,
+ agree_tos=True,
+ ) -> str:
+ if isinstance(action, str):
+ action = CertbotAction[action.upper()]
+
+ command_args = []
+
+ add_domain_option = True
+ if action is CertbotAction.CREATE:
+ if standalone is None:
+ standalone = True
+ command_action = "certonly"
+ elif action in [CertbotAction.EXPAND, CertbotAction.SHRINK]:
+ if standalone is None:
+ standalone = False
+ command_action = "certonly"
+ elif action is CertbotAction.RENEW:
+ if standalone is None:
+ standalone = False
+ add_domain_option = False
+ command_action = "renew"
+ else:
+ raise CruInternalError("Invalid certbot action.")
+
+ data_dir = self.app.data_dir.full_path.as_posix()
+
+ if not docker:
+ command_args.append("certbot")
+ else:
+ command_args.extend(
+ [
+ "docker run -it --rm --name certbot",
+ f'-v "{data_dir}/certbot/certs:/etc/letsencrypt"',
+ f'-v "{data_dir}/certbot/data:/var/lib/letsencrypt"',
+ ]
+ )
+ if standalone:
+ command_args.append('-p "0.0.0.0:80:80"')
+ else:
+ command_args.append(f'-v "{data_dir}/certbot/webroot:/var/www/certbot"')
+
+ command_args.append("certbot/certbot")
+
+ command_args.append(command_action)
+
+ command_args.append(f"--cert-name {self.root_domain}")
+
+ if standalone:
+ command_args.append("--standalone")
+ else:
+ command_args.append("--webroot -w /var/www/certbot")
+
+ if add_domain_option:
+ command_args.append(" ".join([f"-d {domain}" for domain in self.domains]))
+
+ if email is not None:
+ command_args.append(f"--email {email}")
+
+ if agree_tos:
+ command_args.append("--agree-tos")
+
+ if test:
+ command_args.append("--test-cert --dry-run")
+
+ return " ".join(command_args)
+
+ def print_all_certbot_commands(self, test: bool):
+ print("### COMMAND: (standalone) create certs")
+ print(
+ self._certbot_command(
+ CertbotAction.CREATE,
+ test,
+ email=self._config_manager.get_email_value_str_optional(),
+ )
+ )
+ print()
+ print("### COMMAND: (webroot+nginx) expand or shrink certs")
+ print(
+ self._certbot_command(
+ CertbotAction.EXPAND,
+ test,
+ email=self._config_manager.get_email_value_str_optional(),
+ )
+ )
+ print()
+ print("### COMMAND: (webroot+nginx) renew certs")
+ print(
+ self._certbot_command(
+ CertbotAction.RENEW,
+ test,
+ email=self._config_manager.get_email_value_str_optional(),
+ )
+ )
+
+ @property
+ def _cert_path_str(self) -> str:
+ return str(
+ self.app.data_dir.full_path
+ / "certbot/certs/live"
+ / self.root_domain
+ / "fullchain.pem"
+ )
+
+ def get_command_info(self):
+ return "nginx", "Manage nginx related things."
+
+ def setup_arg_parser(self, arg_parser):
+ subparsers = arg_parser.add_subparsers(
+ dest="nginx_command", required=True, metavar="NGINX_COMMAND"
+ )
+ _list_parser = subparsers.add_parser("list", help="list domains")
+ certbot_parser = subparsers.add_parser("certbot", help="print certbot commands")
+ certbot_parser.add_argument(
+ "--no-test",
+ action="store_true",
+ help="remove args making certbot run in test mode",
+ )
+
+ def run_command(self, args: Namespace) -> None:
+ if args.nginx_command == "list":
+ self._print_domains()
+ elif args.nginx_command == "certbot":
+ self.print_all_certbot_commands(not args.no_test)
+
+ def _generate_dns_zone(
+ self,
+ ip: str,
+ /,
+ ttl: str | int = 600,
+ *,
+ enable_mail: bool = True,
+ dkim: str | None = None,
+ ) -> str:
+ # TODO: Not complete and test now.
+ root_domain = self.root_domain
+ result = f"$ORIGIN {root_domain}.\n\n"
+ result += "; A records\n"
+ result += f"@ {ttl} IN A {ip}\n"
+ for subdomain in self.subdomains:
+ result += f"{subdomain} {ttl} IN A {ip}\n"
+
+ if enable_mail:
+ result += "\n; MX records\n"
+ result += f"@ {ttl} IN MX 10 mail.{root_domain}.\n"
+ result += "\n; SPF record\n"
+ result += f'@ {ttl} IN TXT "v=spf1 mx ~all"\n'
+ if dkim is not None:
+ result += "\n; DKIM record\n"
+ result += f'mail._domainkey {ttl} IN TEXT "{dkim}"'
+ result += "\n; DMARC record\n"
+ dmarc_options = [
+ "v=DMARC1",
+ "p=none",
+ f"rua=mailto:dmarc.report@{root_domain}",
+ f"ruf=mailto:dmarc.report@{root_domain}",
+ "sp=none",
+ "ri=86400",
+ ]
+ result += f'_dmarc {ttl} IN TXT "{"; ".join(dmarc_options)}"\n'
+ return result
+
+ def _get_dkim_from_mailserver(self) -> str | None:
+ # TODO: Not complete and test now.
+ dkim_path = (
+ self.app.data_dir.full_path
+ / "dms/config/opendkim/keys"
+ / self.root_domain
+ / "mail.txt"
+ )
+ if not dkim_path.exists():
+ return None
+
+ p = subprocess.run(["sudo", "cat", dkim_path], capture_output=True, check=True)
+ value = ""
+ for match in re.finditer('"(.*)"', p.stdout.decode("utf-8")):
+ value += match.group(1)
+ return value
+
+ def _generate_dns_zone_with_dkim(self, ip: str, /, ttl: str | int = 600) -> str:
+ # TODO: Not complete and test now.
+ return self._generate_dns_zone(
+ ip, ttl, enable_mail=True, dkim=self._get_dkim_from_mailserver()
+ )
diff --git a/tools/cru-py/cru/service/_template.py b/tools/cru-py/cru/service/_template.py
new file mode 100644
index 0000000..170116c
--- /dev/null
+++ b/tools/cru-py/cru/service/_template.py
@@ -0,0 +1,86 @@
+from argparse import Namespace
+import shutil
+
+from cru import CruIterator
+from cru.template import TemplateTree
+
+from ._base import AppCommandFeatureProvider, AppFeaturePath
+from ._config import ConfigManager
+
+
+class TemplateManager(AppCommandFeatureProvider):
+ def __init__(self, prefix: str | None = None):
+ super().__init__("template-manager")
+ self._prefix = prefix or self.app.app_id.upper()
+
+ def setup(self) -> None:
+ self._templates_dir = self.app.add_path("templates", True)
+ self._generated_dir = self.app.add_path("generated", True)
+ self._template_tree: TemplateTree | None = None
+
+ @property
+ def prefix(self) -> str:
+ return self._prefix
+
+ @property
+ def templates_dir(self) -> AppFeaturePath:
+ return self._templates_dir
+
+ @property
+ def generated_dir(self) -> AppFeaturePath:
+ return self._generated_dir
+
+ @property
+ def template_tree(self) -> TemplateTree:
+ if self._template_tree is None:
+ return self.reload()
+ return self._template_tree
+
+ def reload(self) -> TemplateTree:
+ self._template_tree = TemplateTree(
+ self.prefix, self.templates_dir.full_path_str
+ )
+ return self._template_tree
+
+ def _print_file_lists(self) -> None:
+ for file in CruIterator(self.template_tree.templates).transform(lambda t: t[0]):
+ print(file.as_posix())
+
+ def _generate_files(self, dry_run: bool) -> None:
+ config_manager = self.app.get_feature(ConfigManager)
+ if not dry_run and self.generated_dir.full_path.exists():
+ shutil.rmtree(self.generated_dir.full_path)
+ self.template_tree.generate_to(
+ self.generated_dir.full_path_str, config_manager.get_str_dict(), dry_run
+ )
+
+ def get_command_info(self):
+ return ("template", "Manage templates.")
+
+ def setup_arg_parser(self, arg_parser):
+ subparsers = arg_parser.add_subparsers(
+ dest="template_command", required=True, metavar="TEMPLATE_COMMAND"
+ )
+ _list_parser = subparsers.add_parser("list", help="list templates")
+ _variables_parser = subparsers.add_parser(
+ "variables", help="list variables used in all templates"
+ )
+ generate_parser = subparsers.add_parser("generate", help="generate templates")
+ generate_parser.add_argument(
+ "--no-dry-run", action="store_true", help="generate and write target files"
+ )
+
+ def run_command(self, args: Namespace) -> None:
+ if args.template_command == "list":
+ self._print_file_lists()
+ elif args.template_command == "variables":
+ for var in self.template_tree.variables:
+ print(var)
+ elif args.template_command == "generate":
+ dry_run = not args.no_dry_run
+ self._generate_files(dry_run)
+ if dry_run:
+ print("Dry run successfully.")
+ print(
+ f"Will delete dir {self.generated_dir.full_path_str} if it exists."
+ )