diff options
Diffstat (limited to 'python/cru/service/_base.py')
-rw-r--r-- | python/cru/service/_base.py | 400 |
1 files changed, 400 insertions, 0 deletions
diff --git a/python/cru/service/_base.py b/python/cru/service/_base.py new file mode 100644 index 0000000..e1eee70 --- /dev/null +++ b/python/cru/service/_base.py @@ -0,0 +1,400 @@ +from __future__ import annotations + +from argparse import ArgumentParser, Namespace +from abc import ABC, abstractmethod +import argparse +import os +from pathlib import Path +from typing import TypeVar, overload + +from cru import CruException, CruLogicError + +_Feature = TypeVar("_Feature", bound="AppFeatureProvider") + + +class AppError(CruException): + pass + + +class AppFeatureError(AppError): + def __init__(self, message, feature: type | str, *args, **kwargs): + super().__init__(message, *args, **kwargs) + self._feature = feature + + @property + def feature(self) -> type | str: + return self._feature + + +class AppPathError(CruException): + def __init__(self, message, _path: str | Path, *args, **kwargs): + super().__init__(message, *args, **kwargs) + self._path = str(_path) + + @property + def path(self) -> str: + return self._path + + +class AppPath(ABC): + def __init__(self, id: str, is_dir: bool, description: str) -> None: + self._is_dir = is_dir + self._id = id + self._description = description + + @property + @abstractmethod + def parent(self) -> AppPath | None: ... + + @property + @abstractmethod + def app(self) -> AppBase: ... + + @property + def id(self) -> str: + return self._id + + @property + def description(self) -> str: + return self._description + + @property + def is_dir(self) -> bool: + return self._is_dir + + @property + @abstractmethod + def full_path(self) -> Path: ... + + @property + def full_path_str(self) -> str: + return str(self.full_path) + + def check_parents(self, must_exist: bool = False) -> bool: + for p in reversed(self.full_path.parents): + if not p.exists() and not must_exist: + return False + if not p.is_dir(): + raise AppPathError("Parents' path must be a dir.", self.full_path) + return True + + def check_self(self, must_exist: bool = False) -> bool: + if not self.check_parents(must_exist): + return False + if not self.full_path.exists(): + if not must_exist: + return False + raise AppPathError("Not exist.", self.full_path) + if self.is_dir: + if not self.full_path.is_dir(): + raise AppPathError("Should be a directory, but not.", self.full_path) + else: + return True + else: + if not self.full_path.is_file(): + raise AppPathError("Should be a file, but not.", self.full_path) + else: + return True + + def ensure(self, create_file: bool = False) -> None: + e = self.check_self(False) + if not e: + os.makedirs(self.full_path.parent, exist_ok=True) + if self.is_dir: + os.mkdir(self.full_path) + elif create_file: + with open(self.full_path, "w") as f: + f.write("") + + def read_text(self) -> str: + if self.is_dir: + raise AppPathError("Can't read text of a dir.", self.full_path) + self.check_self() + return self.full_path.read_text() + + def add_subpath( + self, + name: str, + is_dir: bool, + /, + id: str | None = None, + description: str = "", + ) -> AppFeaturePath: + return self.app._add_path(name, is_dir, self, id, description) + + @property + def app_relative_path(self) -> Path: + return self.full_path.relative_to(self.app.root.full_path) + + +class AppFeaturePath(AppPath): + def __init__( + self, + parent: AppPath, + name: str, + is_dir: bool, + /, + id: str | None = None, + description: str = "", + ) -> None: + super().__init__(id or name, is_dir, description) + self._name = name + self._parent = parent + + @property + def name(self) -> str: + return self._name + + @property + def parent(self) -> AppPath: + return self._parent + + @property + def app(self) -> AppBase: + return self.parent.app + + @property + def full_path(self) -> Path: + return Path(self.parent.full_path, self.name).resolve() + + +class AppRootPath(AppPath): + def __init__(self, app: AppBase, path: Path): + super().__init__(f"/{id}", True, f"Application {id} root path.") + self._app = app + self._full_path = path.resolve() + + @property + def parent(self) -> None: + return None + + @property + def app(self) -> AppBase: + return self._app + + @property + def full_path(self) -> Path: + return self._full_path + + +class AppFeatureProvider(ABC): + def __init__(self, name: str, /, app: AppBase | None = None): + super().__init__() + self._name = name + self._app = app if app else AppBase.get_instance() + + @property + def app(self) -> AppBase: + return self._app + + @property + def name(self) -> str: + return self._name + + @abstractmethod + def setup(self) -> None: ... + + +class AppCommandFeatureProvider(AppFeatureProvider): + @abstractmethod + def get_command_info(self) -> tuple[str, str]: ... + + @abstractmethod + def setup_arg_parser(self, arg_parser: ArgumentParser): ... + + @abstractmethod + def run_command(self, args: Namespace) -> None: ... + + +class PathCommandProvider(AppCommandFeatureProvider): + def __init__(self) -> None: + super().__init__("path-command-provider") + + def setup(self): + pass + + def get_command_info(self): + return ("path", "Get information about paths used by app.") + + def setup_arg_parser(self, arg_parser: ArgumentParser) -> None: + subparsers = arg_parser.add_subparsers( + dest="path_command", metavar="PATH_COMMAND" + ) + _list_parser = subparsers.add_parser( + "list", help="list special paths used by app" + ) + + def run_command(self, args: Namespace) -> None: + if args.path_command is None or args.path_command == "list": + for path in self.app.paths: + print( + f"{path.app_relative_path.as_posix()}{'/' if path.is_dir else ''}: {path.description}" + ) + + +class CommandDispatcher(AppFeatureProvider): + def __init__(self) -> None: + super().__init__("command-dispatcher") + + def setup_arg_parser(self) -> None: + self._map: dict[str, AppCommandFeatureProvider] = {} + arg_parser = argparse.ArgumentParser( + description="Service management", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + subparsers = arg_parser.add_subparsers( + dest="command", + help="The management command to execute.", + metavar="COMMAND", + ) + for feature in self.app.features: + if isinstance(feature, AppCommandFeatureProvider): + info = feature.get_command_info() + command_subparser = subparsers.add_parser(info[0], help=info[1]) + feature.setup_arg_parser(command_subparser) + self._map[info[0]] = feature + self._arg_parser = arg_parser + + def setup(self): + self._parsed_args = self.arg_parser.parse_args() + + @property + def arg_parser(self) -> argparse.ArgumentParser: + return self._arg_parser + + @property + def command_map(self) -> dict[str, AppCommandFeatureProvider]: + return self._map + + @property + def program_args(self) -> argparse.Namespace: + return self._parsed_args + + def run_command(self) -> None: + args = self.program_args + if args.command is None: + self.arg_parser.print_help() + return + self.command_map[args.command].run_command(args) + + +class AppBase: + _instance: AppBase | None = None + + @staticmethod + def get_instance() -> AppBase: + if AppBase._instance is None: + raise AppError("App instance not initialized") + return AppBase._instance + + def __init__(self, app_id: str, name: str): + AppBase._instance = self + self._app_id = app_id + self._name = name + self._features: list[AppFeatureProvider] = [] + self._paths: list[AppFeaturePath] = [] + + def setup(self) -> None: + command_dispatcher = self.get_feature(CommandDispatcher) + command_dispatcher.setup_arg_parser() + self._root = AppRootPath(self, Path(self._ensure_env("CRUPEST_PROJECT_DIR"))) + self._data_dir = self._root.add_subpath( + self._ensure_env("CRUPEST_DATA_DIR"), True, id="data" + ) + self._services_dir = self._root.add_subpath( + self._ensure_env("CRUPEST_SERVICES_DIR"), True, id="CRUPEST_SERVICES_DIR" + ) + for feature in self.features: + feature.setup() + for path in self.paths: + path.check_self() + + @property + def app_id(self) -> str: + return self._app_id + + @property + def name(self) -> str: + return self._name + + def _ensure_env(self, env_name: str) -> str: + value = os.getenv(env_name) + if value is None: + raise AppError(f"Environment variable {env_name} not set") + return value + + @property + def root(self) -> AppRootPath: + return self._root + + @property + def data_dir(self) -> AppFeaturePath: + return self._data_dir + + @property + def services_dir(self) -> AppFeaturePath: + return self._services_dir + + @property + def app_initialized(self) -> bool: + return self.data_dir.check_self() + + @property + def features(self) -> list[AppFeatureProvider]: + return self._features + + @property + def paths(self) -> list[AppFeaturePath]: + return self._paths + + def add_feature(self, feature: _Feature) -> _Feature: + for f in self.features: + if f.name == feature.name: + raise AppFeatureError( + f"Duplicate feature name: {feature.name}.", feature.name + ) + self._features.append(feature) + return feature + + def _add_path( + self, + name: str, + is_dir: bool, + /, + parent: AppPath | None = None, + id: str | None = None, + description: str = "", + ) -> AppFeaturePath: + p = AppFeaturePath( + parent or self.root, name, is_dir, id=id, description=description + ) + self._paths.append(p) + return p + + @overload + def get_feature(self, feature: str) -> AppFeatureProvider: ... + + @overload + def get_feature(self, feature: type[_Feature]) -> _Feature: ... + + def get_feature( + self, feature: str | type[_Feature] + ) -> AppFeatureProvider | _Feature: + if isinstance(feature, str): + for f in self._features: + if f.name == feature: + return f + elif isinstance(feature, type): + for f in self._features: + if isinstance(f, feature): + return f + else: + raise CruLogicError("Argument must be the name of feature or its class.") + + raise AppFeatureError(f"Feature {feature} not found.", feature) + + def get_path(self, name: str) -> AppFeaturePath: + for p in self._paths: + if p.id == name or p.name == name: + return p + raise AppPathError(f"Application path {name} not found.", name) |