aboutsummaryrefslogtreecommitdiff
path: root/python/cru/service/_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/cru/service/_base.py')
-rw-r--r--python/cru/service/_base.py400
1 files changed, 400 insertions, 0 deletions
diff --git a/python/cru/service/_base.py b/python/cru/service/_base.py
new file mode 100644
index 0000000..e1eee70
--- /dev/null
+++ b/python/cru/service/_base.py
@@ -0,0 +1,400 @@
+from __future__ import annotations
+
+from argparse import ArgumentParser, Namespace
+from abc import ABC, abstractmethod
+import argparse
+import os
+from pathlib import Path
+from typing import TypeVar, overload
+
+from cru import CruException, CruLogicError
+
+_Feature = TypeVar("_Feature", bound="AppFeatureProvider")
+
+
+class AppError(CruException):
+ pass
+
+
+class AppFeatureError(AppError):
+ def __init__(self, message, feature: type | str, *args, **kwargs):
+ super().__init__(message, *args, **kwargs)
+ self._feature = feature
+
+ @property
+ def feature(self) -> type | str:
+ return self._feature
+
+
+class AppPathError(CruException):
+ def __init__(self, message, _path: str | Path, *args, **kwargs):
+ super().__init__(message, *args, **kwargs)
+ self._path = str(_path)
+
+ @property
+ def path(self) -> str:
+ return self._path
+
+
+class AppPath(ABC):
+ def __init__(self, id: str, is_dir: bool, description: str) -> None:
+ self._is_dir = is_dir
+ self._id = id
+ self._description = description
+
+ @property
+ @abstractmethod
+ def parent(self) -> AppPath | None: ...
+
+ @property
+ @abstractmethod
+ def app(self) -> AppBase: ...
+
+ @property
+ def id(self) -> str:
+ return self._id
+
+ @property
+ def description(self) -> str:
+ return self._description
+
+ @property
+ def is_dir(self) -> bool:
+ return self._is_dir
+
+ @property
+ @abstractmethod
+ def full_path(self) -> Path: ...
+
+ @property
+ def full_path_str(self) -> str:
+ return str(self.full_path)
+
+ def check_parents(self, must_exist: bool = False) -> bool:
+ for p in reversed(self.full_path.parents):
+ if not p.exists() and not must_exist:
+ return False
+ if not p.is_dir():
+ raise AppPathError("Parents' path must be a dir.", self.full_path)
+ return True
+
+ def check_self(self, must_exist: bool = False) -> bool:
+ if not self.check_parents(must_exist):
+ return False
+ if not self.full_path.exists():
+ if not must_exist:
+ return False
+ raise AppPathError("Not exist.", self.full_path)
+ if self.is_dir:
+ if not self.full_path.is_dir():
+ raise AppPathError("Should be a directory, but not.", self.full_path)
+ else:
+ return True
+ else:
+ if not self.full_path.is_file():
+ raise AppPathError("Should be a file, but not.", self.full_path)
+ else:
+ return True
+
+ def ensure(self, create_file: bool = False) -> None:
+ e = self.check_self(False)
+ if not e:
+ os.makedirs(self.full_path.parent, exist_ok=True)
+ if self.is_dir:
+ os.mkdir(self.full_path)
+ elif create_file:
+ with open(self.full_path, "w") as f:
+ f.write("")
+
+ def read_text(self) -> str:
+ if self.is_dir:
+ raise AppPathError("Can't read text of a dir.", self.full_path)
+ self.check_self()
+ return self.full_path.read_text()
+
+ def add_subpath(
+ self,
+ name: str,
+ is_dir: bool,
+ /,
+ id: str | None = None,
+ description: str = "",
+ ) -> AppFeaturePath:
+ return self.app._add_path(name, is_dir, self, id, description)
+
+ @property
+ def app_relative_path(self) -> Path:
+ return self.full_path.relative_to(self.app.root.full_path)
+
+
+class AppFeaturePath(AppPath):
+ def __init__(
+ self,
+ parent: AppPath,
+ name: str,
+ is_dir: bool,
+ /,
+ id: str | None = None,
+ description: str = "",
+ ) -> None:
+ super().__init__(id or name, is_dir, description)
+ self._name = name
+ self._parent = parent
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ @property
+ def parent(self) -> AppPath:
+ return self._parent
+
+ @property
+ def app(self) -> AppBase:
+ return self.parent.app
+
+ @property
+ def full_path(self) -> Path:
+ return Path(self.parent.full_path, self.name).resolve()
+
+
+class AppRootPath(AppPath):
+ def __init__(self, app: AppBase, path: Path):
+ super().__init__(f"/{id}", True, f"Application {id} root path.")
+ self._app = app
+ self._full_path = path.resolve()
+
+ @property
+ def parent(self) -> None:
+ return None
+
+ @property
+ def app(self) -> AppBase:
+ return self._app
+
+ @property
+ def full_path(self) -> Path:
+ return self._full_path
+
+
+class AppFeatureProvider(ABC):
+ def __init__(self, name: str, /, app: AppBase | None = None):
+ super().__init__()
+ self._name = name
+ self._app = app if app else AppBase.get_instance()
+
+ @property
+ def app(self) -> AppBase:
+ return self._app
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ @abstractmethod
+ def setup(self) -> None: ...
+
+
+class AppCommandFeatureProvider(AppFeatureProvider):
+ @abstractmethod
+ def get_command_info(self) -> tuple[str, str]: ...
+
+ @abstractmethod
+ def setup_arg_parser(self, arg_parser: ArgumentParser): ...
+
+ @abstractmethod
+ def run_command(self, args: Namespace) -> None: ...
+
+
+class PathCommandProvider(AppCommandFeatureProvider):
+ def __init__(self) -> None:
+ super().__init__("path-command-provider")
+
+ def setup(self):
+ pass
+
+ def get_command_info(self):
+ return ("path", "Get information about paths used by app.")
+
+ def setup_arg_parser(self, arg_parser: ArgumentParser) -> None:
+ subparsers = arg_parser.add_subparsers(
+ dest="path_command", metavar="PATH_COMMAND"
+ )
+ _list_parser = subparsers.add_parser(
+ "list", help="list special paths used by app"
+ )
+
+ def run_command(self, args: Namespace) -> None:
+ if args.path_command is None or args.path_command == "list":
+ for path in self.app.paths:
+ print(
+ f"{path.app_relative_path.as_posix()}{'/' if path.is_dir else ''}: {path.description}"
+ )
+
+
+class CommandDispatcher(AppFeatureProvider):
+ def __init__(self) -> None:
+ super().__init__("command-dispatcher")
+
+ def setup_arg_parser(self) -> None:
+ self._map: dict[str, AppCommandFeatureProvider] = {}
+ arg_parser = argparse.ArgumentParser(
+ description="Service management",
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ )
+ subparsers = arg_parser.add_subparsers(
+ dest="command",
+ help="The management command to execute.",
+ metavar="COMMAND",
+ )
+ for feature in self.app.features:
+ if isinstance(feature, AppCommandFeatureProvider):
+ info = feature.get_command_info()
+ command_subparser = subparsers.add_parser(info[0], help=info[1])
+ feature.setup_arg_parser(command_subparser)
+ self._map[info[0]] = feature
+ self._arg_parser = arg_parser
+
+ def setup(self):
+ self._parsed_args = self.arg_parser.parse_args()
+
+ @property
+ def arg_parser(self) -> argparse.ArgumentParser:
+ return self._arg_parser
+
+ @property
+ def command_map(self) -> dict[str, AppCommandFeatureProvider]:
+ return self._map
+
+ @property
+ def program_args(self) -> argparse.Namespace:
+ return self._parsed_args
+
+ def run_command(self) -> None:
+ args = self.program_args
+ if args.command is None:
+ self.arg_parser.print_help()
+ return
+ self.command_map[args.command].run_command(args)
+
+
+class AppBase:
+ _instance: AppBase | None = None
+
+ @staticmethod
+ def get_instance() -> AppBase:
+ if AppBase._instance is None:
+ raise AppError("App instance not initialized")
+ return AppBase._instance
+
+ def __init__(self, app_id: str, name: str):
+ AppBase._instance = self
+ self._app_id = app_id
+ self._name = name
+ self._features: list[AppFeatureProvider] = []
+ self._paths: list[AppFeaturePath] = []
+
+ def setup(self) -> None:
+ command_dispatcher = self.get_feature(CommandDispatcher)
+ command_dispatcher.setup_arg_parser()
+ self._root = AppRootPath(self, Path(self._ensure_env("CRUPEST_PROJECT_DIR")))
+ self._data_dir = self._root.add_subpath(
+ self._ensure_env("CRUPEST_DATA_DIR"), True, id="data"
+ )
+ self._services_dir = self._root.add_subpath(
+ self._ensure_env("CRUPEST_SERVICES_DIR"), True, id="CRUPEST_SERVICES_DIR"
+ )
+ for feature in self.features:
+ feature.setup()
+ for path in self.paths:
+ path.check_self()
+
+ @property
+ def app_id(self) -> str:
+ return self._app_id
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ def _ensure_env(self, env_name: str) -> str:
+ value = os.getenv(env_name)
+ if value is None:
+ raise AppError(f"Environment variable {env_name} not set")
+ return value
+
+ @property
+ def root(self) -> AppRootPath:
+ return self._root
+
+ @property
+ def data_dir(self) -> AppFeaturePath:
+ return self._data_dir
+
+ @property
+ def services_dir(self) -> AppFeaturePath:
+ return self._services_dir
+
+ @property
+ def app_initialized(self) -> bool:
+ return self.data_dir.check_self()
+
+ @property
+ def features(self) -> list[AppFeatureProvider]:
+ return self._features
+
+ @property
+ def paths(self) -> list[AppFeaturePath]:
+ return self._paths
+
+ def add_feature(self, feature: _Feature) -> _Feature:
+ for f in self.features:
+ if f.name == feature.name:
+ raise AppFeatureError(
+ f"Duplicate feature name: {feature.name}.", feature.name
+ )
+ self._features.append(feature)
+ return feature
+
+ def _add_path(
+ self,
+ name: str,
+ is_dir: bool,
+ /,
+ parent: AppPath | None = None,
+ id: str | None = None,
+ description: str = "",
+ ) -> AppFeaturePath:
+ p = AppFeaturePath(
+ parent or self.root, name, is_dir, id=id, description=description
+ )
+ self._paths.append(p)
+ return p
+
+ @overload
+ def get_feature(self, feature: str) -> AppFeatureProvider: ...
+
+ @overload
+ def get_feature(self, feature: type[_Feature]) -> _Feature: ...
+
+ def get_feature(
+ self, feature: str | type[_Feature]
+ ) -> AppFeatureProvider | _Feature:
+ if isinstance(feature, str):
+ for f in self._features:
+ if f.name == feature:
+ return f
+ elif isinstance(feature, type):
+ for f in self._features:
+ if isinstance(f, feature):
+ return f
+ else:
+ raise CruLogicError("Argument must be the name of feature or its class.")
+
+ raise AppFeatureError(f"Feature {feature} not found.", feature)
+
+ def get_path(self, name: str) -> AppFeaturePath:
+ for p in self._paths:
+ if p.id == name or p.name == name:
+ return p
+ raise AppPathError(f"Application path {name} not found.", name)