From 90868bf85dc295f70620dbcbd5790999fe239550 Mon Sep 17 00:00:00 2001 From: Yuqian Yang Date: Sun, 23 Feb 2025 16:40:32 +0800 Subject: feat(python): move python codes. --- python/.gitignore | 3 + python/.python-version | 1 + python/cru/__init__.py | 60 +++++ python/cru/_base.py | 101 ++++++++ python/cru/_const.py | 49 ++++ python/cru/_decorator.py | 97 +++++++ python/cru/_error.py | 89 +++++++ python/cru/_event.py | 61 +++++ python/cru/_func.py | 172 +++++++++++++ python/cru/_helper.py | 16 ++ python/cru/_iter.py | 469 ++++++++++++++++++++++++++++++++++ python/cru/_type.py | 52 ++++ python/cru/attr.py | 364 ++++++++++++++++++++++++++ python/cru/config.py | 196 ++++++++++++++ python/cru/list.py | 160 ++++++++++++ python/cru/parsing.py | 290 +++++++++++++++++++++ python/cru/service/__init__.py | 0 python/cru/service/__main__.py | 27 ++ python/cru/service/_app.py | 30 +++ python/cru/service/_base.py | 400 +++++++++++++++++++++++++++++ python/cru/service/_gen_cmd.py | 200 +++++++++++++++ python/cru/service/_nginx.py | 263 +++++++++++++++++++ python/cru/service/_template.py | 228 +++++++++++++++++ python/cru/system.py | 23 ++ python/cru/template.py | 209 +++++++++++++++ python/cru/tool.py | 82 ++++++ python/cru/value.py | 292 +++++++++++++++++++++ python/poetry.lock | 111 ++++++++ python/pyproject.toml | 19 ++ services/.gitignore | 4 - services/.python-version | 1 - services/gen-tplt | 7 - services/git-add-user | 14 - services/manage | 18 +- services/manager/__init__.py | 60 ----- services/manager/_base.py | 101 -------- services/manager/_const.py | 49 ---- services/manager/_decorator.py | 97 ------- services/manager/_error.py | 89 ------- services/manager/_event.py | 61 ----- services/manager/_func.py | 172 ------------- services/manager/_helper.py | 16 -- services/manager/_iter.py | 469 ---------------------------------- services/manager/_type.py | 52 ---- services/manager/attr.py | 364 -------------------------- services/manager/config.py | 196 -------------- services/manager/list.py | 160 ------------ services/manager/parsing.py | 290 --------------------- services/manager/service/__init__.py | 0 services/manager/service/__main__.py | 27 -- services/manager/service/_app.py | 30 --- services/manager/service/_base.py | 398 ----------------------------- services/manager/service/_external.py | 81 ------ services/manager/service/_nginx.py | 263 ------------------- services/manager/service/_template.py | 228 ----------------- services/manager/system.py | 23 -- services/manager/template.py | 209 --------------- services/manager/tool.py | 82 ------ services/manager/value.py | 292 --------------------- services/poetry.lock | 111 -------- services/pyproject.toml | 19 -- services/update-blog | 5 - 62 files changed, 4078 insertions(+), 3974 deletions(-) create mode 100644 python/.gitignore create mode 100644 python/.python-version create mode 100644 python/cru/__init__.py create mode 100644 python/cru/_base.py create mode 100644 python/cru/_const.py create mode 100644 python/cru/_decorator.py create mode 100644 python/cru/_error.py create mode 100644 python/cru/_event.py create mode 100644 python/cru/_func.py create mode 100644 python/cru/_helper.py create mode 100644 python/cru/_iter.py create mode 100644 python/cru/_type.py create mode 100644 python/cru/attr.py create mode 100644 python/cru/config.py create mode 100644 python/cru/list.py create mode 100644 python/cru/parsing.py create mode 100644 python/cru/service/__init__.py create mode 100644 python/cru/service/__main__.py create mode 100644 python/cru/service/_app.py create mode 100644 python/cru/service/_base.py create mode 100644 python/cru/service/_gen_cmd.py create mode 100644 python/cru/service/_nginx.py create mode 100644 python/cru/service/_template.py create mode 100644 python/cru/system.py create mode 100644 python/cru/template.py create mode 100644 python/cru/tool.py create mode 100644 python/cru/value.py create mode 100644 python/poetry.lock create mode 100644 python/pyproject.toml delete mode 100644 services/.python-version delete mode 100755 services/gen-tplt delete mode 100755 services/git-add-user delete mode 100644 services/manager/__init__.py delete mode 100644 services/manager/_base.py delete mode 100644 services/manager/_const.py delete mode 100644 services/manager/_decorator.py delete mode 100644 services/manager/_error.py delete mode 100644 services/manager/_event.py delete mode 100644 services/manager/_func.py delete mode 100644 services/manager/_helper.py delete mode 100644 services/manager/_iter.py delete mode 100644 services/manager/_type.py delete mode 100644 services/manager/attr.py delete mode 100644 services/manager/config.py delete mode 100644 services/manager/list.py delete mode 100644 services/manager/parsing.py delete mode 100644 services/manager/service/__init__.py delete mode 100644 services/manager/service/__main__.py delete mode 100644 services/manager/service/_app.py delete mode 100644 services/manager/service/_base.py delete mode 100644 services/manager/service/_external.py delete mode 100644 services/manager/service/_nginx.py delete mode 100644 services/manager/service/_template.py delete mode 100644 services/manager/system.py delete mode 100644 services/manager/template.py delete mode 100644 services/manager/tool.py delete mode 100644 services/manager/value.py delete mode 100644 services/poetry.lock delete mode 100644 services/pyproject.toml delete mode 100755 services/update-blog diff --git a/python/.gitignore b/python/.gitignore new file mode 100644 index 0000000..f5833b1 --- /dev/null +++ b/python/.gitignore @@ -0,0 +1,3 @@ +__pycache__ +.venv +.mypy_cache diff --git a/python/.python-version b/python/.python-version new file mode 100644 index 0000000..2c07333 --- /dev/null +++ b/python/.python-version @@ -0,0 +1 @@ +3.11 diff --git a/python/cru/__init__.py b/python/cru/__init__.py new file mode 100644 index 0000000..17799a9 --- /dev/null +++ b/python/cru/__init__.py @@ -0,0 +1,60 @@ +import sys + +from ._base import CRU, CruNamespaceError, CRU_NAME_PREFIXES +from ._error import ( + CruException, + CruLogicError, + CruInternalError, + CruUnreachableError, + cru_unreachable, +) +from ._const import ( + CruConstantBase, + CruDontChange, + CruNotFound, + CruNoValue, + CruPlaceholder, + CruUseDefault, +) +from ._func import CruFunction +from ._iter import CruIterable, CruIterator +from ._event import CruEvent, CruEventHandlerToken +from ._type import CruTypeSet, CruTypeCheckError + + +class CruInitError(CruException): + pass + + +def check_python_version(required_version=(3, 11)): + if sys.version_info < required_version: + raise CruInitError(f"Python version must be >= {required_version}!") + + +check_python_version() + +__all__ = [ + "CRU", + "CruNamespaceError", + "CRU_NAME_PREFIXES", + "check_python_version", + "CruException", + "CruInternalError", + "CruLogicError", + "CruUnreachableError", + "cru_unreachable", + "CruInitError", + "CruConstantBase", + "CruDontChange", + "CruNotFound", + "CruNoValue", + "CruPlaceholder", + "CruUseDefault", + "CruFunction", + "CruIterable", + "CruIterator", + "CruEvent", + "CruEventHandlerToken", + "CruTypeSet", + "CruTypeCheckError", +] diff --git a/python/cru/_base.py b/python/cru/_base.py new file mode 100644 index 0000000..2599d8f --- /dev/null +++ b/python/cru/_base.py @@ -0,0 +1,101 @@ +from typing import Any + +from ._helper import remove_none +from ._error import CruException + + +class CruNamespaceError(CruException): + """Raised when a namespace is not found.""" + + +class _Cru: + NAME_PREFIXES = ("CRU_", "Cru", "cru_") + + def __init__(self) -> None: + self._d: dict[str, Any] = {} + + def all_names(self) -> list[str]: + return list(self._d.keys()) + + def get(self, name: str) -> Any: + return self._d[name] + + def has_name(self, name: str) -> bool: + return name in self._d + + @staticmethod + def _maybe_remove_prefix(name: str) -> str | None: + for prefix in _Cru.NAME_PREFIXES: + if name.startswith(prefix): + return name[len(prefix) :] + return None + + def _check_name_exist(self, *names: str | None) -> None: + for name in names: + if name is None: + continue + if self.has_name(name): + raise CruNamespaceError(f"Name {name} exists in CRU.") + + @staticmethod + def check_name_format(name: str) -> tuple[str, str]: + no_prefix_name = _Cru._maybe_remove_prefix(name) + if no_prefix_name is None: + raise CruNamespaceError( + f"Name {name} is not prefixed with any of {_Cru.NAME_PREFIXES}." + ) + return name, no_prefix_name + + @staticmethod + def _check_object_name(o) -> tuple[str, str]: + return _Cru.check_name_format(o.__name__) + + def _do_add(self, o, *names: str | None) -> list[str]: + name_list: list[str] = remove_none(names) + for name in name_list: + self._d[name] = o + return name_list + + def add(self, o, name: str | None) -> tuple[str, str | None]: + no_prefix_name: str | None + if name is None: + name, no_prefix_name = self._check_object_name(o) + else: + no_prefix_name = self._maybe_remove_prefix(name) + + self._check_name_exist(name, no_prefix_name) + self._do_add(o, name, no_prefix_name) + return name, no_prefix_name + + def add_with_alias(self, o, name: str | None = None, *aliases: str) -> list[str]: + final_names: list[str | None] = [] + no_prefix_name: str | None + if name is None: + name, no_prefix_name = self._check_object_name(o) + self._check_name_exist(name, no_prefix_name) + final_names.extend([name, no_prefix_name]) + for alias in aliases: + no_prefix_name = self._maybe_remove_prefix(alias) + self._check_name_exist(alias, no_prefix_name) + final_names.extend([alias, no_prefix_name]) + + return self._do_add(o, *final_names) + + def add_objects(self, *objects): + final_list = [] + for o in objects: + name, no_prefix_name = self._check_object_name(o) + self._check_name_exist(name, no_prefix_name) + final_list.append((o, name, no_prefix_name)) + for o, name, no_prefix_name in final_list: + self._do_add(o, name, no_prefix_name) + + def __getitem__(self, item): + return self.get(item) + + def __getattr__(self, item): + return self.get(item) + + +CRU_NAME_PREFIXES = _Cru.NAME_PREFIXES +CRU = _Cru() diff --git a/python/cru/_const.py b/python/cru/_const.py new file mode 100644 index 0000000..8246b35 --- /dev/null +++ b/python/cru/_const.py @@ -0,0 +1,49 @@ +from enum import Enum, auto +from typing import Self, TypeGuard, TypeVar + +from ._base import CRU + +_T = TypeVar("_T") + + +class CruConstantBase(Enum): + @classmethod + def check(cls, v: _T | Self) -> TypeGuard[Self]: + return isinstance(v, cls) + + @classmethod + def check_not(cls, v: _T | Self) -> TypeGuard[_T]: + return not cls.check(v) + + @classmethod + def value(cls) -> Self: + return cls.VALUE # type: ignore + + +class CruNotFound(CruConstantBase): + VALUE = auto() + + +class CruUseDefault(CruConstantBase): + VALUE = auto() + + +class CruDontChange(CruConstantBase): + VALUE = auto() + + +class CruNoValue(CruConstantBase): + VALUE = auto() + + +class CruPlaceholder(CruConstantBase): + VALUE = auto() + + +CRU.add_objects( + CruNotFound, + CruUseDefault, + CruDontChange, + CruNoValue, + CruPlaceholder, +) diff --git a/python/cru/_decorator.py b/python/cru/_decorator.py new file mode 100644 index 0000000..137fc05 --- /dev/null +++ b/python/cru/_decorator.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +from collections.abc import Callable +from typing import ( + Concatenate, + Generic, + ParamSpec, + TypeVar, + cast, +) + +from ._base import CRU + +_P = ParamSpec("_P") +_T = TypeVar("_T") +_O = TypeVar("_O") +_R = TypeVar("_R") + + +class CruDecorator: + + class ConvertResult(Generic[_T, _O]): + def __init__( + self, + converter: Callable[[_T], _O], + ) -> None: + self.converter = converter + + def __call__(self, origin: Callable[_P, _T]) -> Callable[_P, _O]: + converter = self.converter + + def real_impl(*args: _P.args, **kwargs: _P.kwargs) -> _O: + return converter(origin(*args, **kwargs)) + + return real_impl + + class ImplementedBy(Generic[_T, _O, _P, _R]): + def __init__( + self, + impl: Callable[Concatenate[_O, _P], _R], + converter: Callable[[_T], _O], + ) -> None: + self.impl = impl + self.converter = converter + + def __call__( + self, _origin: Callable[[_T], None] + ) -> Callable[Concatenate[_T, _P], _R]: + converter = self.converter + impl = self.impl + + def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R: + return cast(Callable[Concatenate[_O, _P], _R], impl)( + converter(_self), *args, **kwargs + ) + + return real_impl + + @staticmethod + def create_factory(converter: Callable[[_T], _O]) -> Callable[ + [Callable[Concatenate[_O, _P], _R]], + CruDecorator.ImplementedBy[_T, _O, _P, _R], + ]: + def create( + m: Callable[Concatenate[_O, _P], _R], + ) -> CruDecorator.ImplementedBy[_T, _O, _P, _R]: + return CruDecorator.ImplementedBy(m, converter) + + return create + + class ImplementedByNoSelf(Generic[_P, _R]): + def __init__(self, impl: Callable[_P, _R]) -> None: + self.impl = impl + + def __call__( + self, _origin: Callable[[_T], None] + ) -> Callable[Concatenate[_T, _P], _R]: + impl = self.impl + + def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R: + return cast(Callable[_P, _R], impl)(*args, **kwargs) + + return real_impl + + @staticmethod + def create_factory() -> ( + Callable[[Callable[_P, _R]], CruDecorator.ImplementedByNoSelf[_P, _R]] + ): + def create( + m: Callable[_P, _R], + ) -> CruDecorator.ImplementedByNoSelf[_P, _R]: + return CruDecorator.ImplementedByNoSelf(m) + + return create + + +CRU.add_objects(CruDecorator) diff --git a/python/cru/_error.py b/python/cru/_error.py new file mode 100644 index 0000000..e53c787 --- /dev/null +++ b/python/cru/_error.py @@ -0,0 +1,89 @@ +from __future__ import annotations + +from typing import NoReturn, cast, overload + + +class CruException(Exception): + """Base exception class of all exceptions in cru.""" + + @overload + def __init__( + self, + message: None = None, + *args, + user_message: str, + **kwargs, + ): ... + + @overload + def __init__( + self, + message: str, + *args, + user_message: str | None = None, + **kwargs, + ): ... + + def __init__( + self, + message: str | None = None, + *args, + user_message: str | None = None, + **kwargs, + ): + if message is None: + message = user_message + + super().__init__( + message, + *args, + **kwargs, + ) + self._message: str + self._message = cast(str, message) + self._user_message = user_message + + @property + def message(self) -> str: + return self._message + + def get_user_message(self) -> str | None: + return self._user_message + + def get_message(self, use_user: bool = True) -> str: + if use_user and self._user_message is not None: + return self._user_message + else: + return self._message + + @property + def is_internal(self) -> bool: + return False + + @property + def is_logic_error(self) -> bool: + return False + + +class CruLogicError(CruException): + """Raised when a logic error occurs.""" + + @property + def is_logic_error(self) -> bool: + return True + + +class CruInternalError(CruException): + """Raised when an internal error occurs.""" + + @property + def is_internal(self) -> bool: + return True + + +class CruUnreachableError(CruInternalError): + """Raised when a code path is unreachable.""" + + +def cru_unreachable() -> NoReturn: + raise CruUnreachableError("Code should not reach here!") diff --git a/python/cru/_event.py b/python/cru/_event.py new file mode 100644 index 0000000..51a794c --- /dev/null +++ b/python/cru/_event.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +from collections.abc import Callable +from typing import Generic, ParamSpec, TypeVar + +from .list import CruList + +_P = ParamSpec("_P") +_R = TypeVar("_R") + + +class CruEventHandlerToken(Generic[_P, _R]): + def __init__( + self, event: CruEvent, handler: Callable[_P, _R], once: bool = False + ) -> None: + self._event = event + self._handler = handler + self._once = once + + @property + def event(self) -> CruEvent: + return self._event + + @property + def handler(self) -> Callable[_P, _R]: + return self._handler + + @property + def once(self) -> bool: + return self._once + + +class CruEvent(Generic[_P, _R]): + def __init__(self, name: str) -> None: + self._name = name + self._tokens: CruList[CruEventHandlerToken] = CruList() + + def register( + self, handler: Callable[_P, _R], once: bool = False + ) -> CruEventHandlerToken: + token = CruEventHandlerToken(self, handler, once) + self._tokens.append(token) + return token + + def unregister(self, *handlers: CruEventHandlerToken | Callable[_P, _R]) -> int: + old_length = len(self._tokens) + self._tokens.reset( + self._tokens.as_cru_iterator().filter( + (lambda t: t in handlers or t.handler in handlers) + ) + ) + return old_length - len(self._tokens) + + def trigger(self, *args: _P.args, **kwargs: _P.kwargs) -> CruList[_R]: + results = CruList( + self._tokens.as_cru_iterator() + .transform(lambda t: t.handler(*args, **kwargs)) + .to_list() + ) + self._tokens.reset(self._tokens.as_cru_iterator().filter(lambda t: not t.once)) + return results diff --git a/python/cru/_func.py b/python/cru/_func.py new file mode 100644 index 0000000..fc57802 --- /dev/null +++ b/python/cru/_func.py @@ -0,0 +1,172 @@ +from __future__ import annotations + +from collections.abc import Callable, Iterable +from enum import Flag, auto +from typing import ( + Any, + Generic, + Literal, + ParamSpec, + TypeAlias, + TypeVar, +) + + +from ._base import CRU +from ._const import CruPlaceholder + +_P = ParamSpec("_P") +_P1 = ParamSpec("_P1") +_T = TypeVar("_T") + + +class _Dec: + @staticmethod + def wrap( + origin: Callable[_P, Callable[_P1, _T]] + ) -> Callable[_P, _Wrapper[_P1, _T]]: + def _wrapped(*args: _P.args, **kwargs: _P.kwargs) -> _Wrapper[_P1, _T]: + return _Wrapper(origin(*args, **kwargs)) + + return _wrapped + + +class _RawBase: + @staticmethod + def none(*_v, **_kwargs) -> None: + return None + + @staticmethod + def true(*_v, **_kwargs) -> Literal[True]: + return True + + @staticmethod + def false(*_v, **_kwargs) -> Literal[False]: + return False + + @staticmethod + def identity(v: _T) -> _T: + return v + + @staticmethod + def only_you(v: _T, *_v, **_kwargs) -> _T: + return v + + @staticmethod + def equal(a: Any, b: Any) -> bool: + return a == b + + @staticmethod + def not_equal(a: Any, b: Any) -> bool: + return a != b + + @staticmethod + def not_(v: Any) -> Any: + return not v + + +class _Wrapper(Generic[_P, _T]): + def __init__(self, f: Callable[_P, _T]): + self._f = f + + @property + def me(self) -> Callable[_P, _T]: + return self._f + + def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T: + return self._f(*args, **kwargs) + + @_Dec.wrap + def bind(self, *bind_args, **bind_kwargs) -> Callable[..., _T]: + func = self.me + + def bound_func(*args, **kwargs): + popped = 0 + real_args = [] + for arg in bind_args: + if CruPlaceholder.check(arg): + real_args.append(args[popped]) + popped += 1 + else: + real_args.append(arg) + real_args.extend(args[popped:]) + return func(*real_args, **(bind_kwargs | kwargs)) + + return bound_func + + class ChainMode(Flag): + ARGS = auto() + KWARGS = auto() + BOTH = ARGS | KWARGS + + ArgsChainableCallable: TypeAlias = Callable[..., Iterable[Any]] + KwargsChainableCallable: TypeAlias = Callable[..., Iterable[tuple[str, Any]]] + ChainableCallable: TypeAlias = Callable[ + ..., tuple[Iterable[Any], Iterable[tuple[str, Any]]] + ] + + @_Dec.wrap + def chain_with_args( + self, funcs: Iterable[ArgsChainableCallable], *bind_args, **bind_kwargs + ) -> ArgsChainableCallable: + def chained_func(*args): + args = self.bind(*bind_args, **bind_kwargs)(*args) + + for func in funcs: + args = _Wrapper(func).bind(*bind_args, **bind_kwargs)(*args) + return args + + return chained_func + + @_Dec.wrap + def chain_with_kwargs( + self, funcs: Iterable[KwargsChainableCallable], *bind_args, **bind_kwargs + ) -> KwargsChainableCallable: + def chained_func(**kwargs): + kwargs = self.bind(*bind_args, **bind_kwargs)(**kwargs) + for func in funcs: + kwargs = _Wrapper(func).bind(func, *bind_args, **bind_kwargs)(**kwargs) + return kwargs + + return chained_func + + @_Dec.wrap + def chain_with_both( + self, funcs: Iterable[ChainableCallable], *bind_args, **bind_kwargs + ) -> ChainableCallable: + def chained_func(*args, **kwargs): + for func in funcs: + args, kwargs = _Wrapper(func).bind(func, *bind_args, **bind_kwargs)( + *args, **kwargs + ) + return args, kwargs + + return chained_func + + +class _Base: + none = _Wrapper(_RawBase.none) + true = _Wrapper(_RawBase.true) + false = _Wrapper(_RawBase.false) + identity = _Wrapper(_RawBase.identity) + only_you = _Wrapper(_RawBase.only_you) + equal = _Wrapper(_RawBase.equal) + not_equal = _Wrapper(_RawBase.not_equal) + not_ = _Wrapper(_RawBase.not_) + + +class _Creators: + @staticmethod + def make_isinstance_of_types(*types: type) -> Callable: + return _Wrapper(lambda v: type(v) in types) + + +class CruFunction: + RawBase: TypeAlias = _RawBase + Base: TypeAlias = _Base + Creators: TypeAlias = _Creators + Wrapper: TypeAlias = _Wrapper + Decorators: TypeAlias = _Dec + + +CRU.add_objects(CruFunction) diff --git a/python/cru/_helper.py b/python/cru/_helper.py new file mode 100644 index 0000000..43baf46 --- /dev/null +++ b/python/cru/_helper.py @@ -0,0 +1,16 @@ +from collections.abc import Callable +from typing import Any, Iterable, TypeVar, cast + +_T = TypeVar("_T") +_D = TypeVar("_D") + + +def remove_element( + iterable: Iterable[_T | None], to_rm: Iterable[Any], des: type[_D] | None = None +) -> _D: + to_rm = set(to_rm) + return cast(Callable[..., _D], des or list)(v for v in iterable if v not in to_rm) + + +def remove_none(iterable: Iterable[_T | None], des: type[_D] | None = None) -> _D: + return cast(Callable[..., _D], des or list)(v for v in iterable if v is not None) diff --git a/python/cru/_iter.py b/python/cru/_iter.py new file mode 100644 index 0000000..f9683ca --- /dev/null +++ b/python/cru/_iter.py @@ -0,0 +1,469 @@ +from __future__ import annotations + +from collections.abc import Iterable, Callable, Generator, Iterator +from dataclasses import dataclass +from enum import Enum +from typing import ( + Concatenate, + Literal, + Never, + Self, + TypeAlias, + TypeVar, + ParamSpec, + Any, + Generic, + cast, +) + +from ._base import CRU +from ._const import CruNotFound +from ._error import cru_unreachable + +_P = ParamSpec("_P") +_T = TypeVar("_T") +_O = TypeVar("_O") +_V = TypeVar("_V") +_R = TypeVar("_R") + + +class _Generic: + class StepActionKind(Enum): + SKIP = 0 + PUSH = 1 + STOP = 2 + AGGREGATE = 3 + + @dataclass + class StepAction(Generic[_V, _R]): + value: Iterable[Self] | _V | _R | None + kind: _Generic.StepActionKind + + @property + def push_value(self) -> _V: + assert self.kind == _Generic.StepActionKind.PUSH + return cast(_V, self.value) + + @property + def stop_value(self) -> _R: + assert self.kind == _Generic.StepActionKind.STOP + return cast(_R, self.value) + + @staticmethod + def skip() -> _Generic.StepAction[_V, _R]: + return _Generic.StepAction(None, _Generic.StepActionKind.SKIP) + + @staticmethod + def push(value: _V | None) -> _Generic.StepAction[_V, _R]: + return _Generic.StepAction(value, _Generic.StepActionKind.PUSH) + + @staticmethod + def stop(value: _R | None = None) -> _Generic.StepAction[_V, _R]: + return _Generic.StepAction(value, _Generic.StepActionKind.STOP) + + @staticmethod + def aggregate( + *results: _Generic.StepAction[_V, _R], + ) -> _Generic.StepAction[_V, _R]: + return _Generic.StepAction(results, _Generic.StepActionKind.AGGREGATE) + + @staticmethod + def push_last(value: _V | None) -> _Generic.StepAction[_V, _R]: + return _Generic.StepAction.aggregate( + _Generic.StepAction.push(value), _Generic.StepAction.stop() + ) + + def flatten(self) -> Iterable[Self]: + return _Generic.flatten( + self, + is_leave=lambda r: r.kind != _Generic.StepActionKind.AGGREGATE, + get_children=lambda r: cast(Iterable[Self], r.value), + ) + + GeneralStepAction: TypeAlias = StepAction[_V, _R] | _V | _R | None + IterateOperation: TypeAlias = Callable[[_T, int], GeneralStepAction[_V, _R]] + IteratePreHook: TypeAlias = Callable[[Iterable[_T]], GeneralStepAction[_V, _R]] + IteratePostHook: TypeAlias = Callable[[int], GeneralStepAction[_V, _R]] + + @staticmethod + def _is_not_iterable(o: Any) -> bool: + return not isinstance(o, Iterable) + + @staticmethod + def _return_self(o): + return o + + @staticmethod + def iterable_flatten( + maybe_iterable: Iterable[_T] | _T, max_depth: int = -1, *, _depth: int = 0 + ) -> Iterable[Iterable[_T] | _T]: + if _depth == max_depth or not isinstance(maybe_iterable, Iterable): + yield maybe_iterable + return + + for child in maybe_iterable: + yield from _Generic.iterable_flatten( + child, + max_depth, + _depth=_depth + 1, + ) + + @staticmethod + def flatten( + o: _O, + max_depth: int = -1, + /, + is_leave: CruIterator.ElementPredicate[_O] = _is_not_iterable, + get_children: CruIterator.ElementTransformer[_O, Iterable[_O]] = _return_self, + *, + _depth: int = 0, + ) -> Iterable[_O]: + if _depth == max_depth or is_leave(o): + yield o + return + for child in get_children(o): + yield from _Generic.flatten( + child, + max_depth, + is_leave, + get_children, + _depth=_depth + 1, + ) + + class Results: + @staticmethod + def true(_) -> Literal[True]: + return True + + @staticmethod + def false(_) -> Literal[False]: + return False + + @staticmethod + def not_found(_) -> Literal[CruNotFound.VALUE]: + return CruNotFound.VALUE + + @staticmethod + def _non_result_to_push(value: Any) -> StepAction[_V, _R]: + return _Generic.StepAction.push(value) + + @staticmethod + def _non_result_to_stop(value: Any) -> StepAction[_V, _R]: + return _Generic.StepAction.stop(value) + + @staticmethod + def _none_hook(_: Any) -> StepAction[_V, _R]: + return _Generic.StepAction.skip() + + def iterate( + iterable: Iterable[_T], + operation: IterateOperation[_T, _V, _R], + fallback_return: _R, + pre_iterate: IteratePreHook[_T, _V, _R], + post_iterate: IteratePostHook[_V, _R], + convert_value_result: Callable[[_V | _R | None], StepAction[_V, _R]], + ) -> Generator[_V, None, _R]: + pre_result = pre_iterate(iterable) + if not isinstance(pre_result, _Generic.StepAction): + real_pre_result = convert_value_result(pre_result) + for r in real_pre_result.flatten(): + if r.kind == _Generic.StepActionKind.STOP: + return r.stop_value + elif r.kind == _Generic.StepActionKind.PUSH: + yield r.push_value + else: + assert r.kind == _Generic.StepActionKind.SKIP + + for index, element in enumerate(iterable): + result = operation(element, index) + if not isinstance(result, _Generic.StepAction): + real_result = convert_value_result(result) + for r in real_result.flatten(): + if r.kind == _Generic.StepActionKind.STOP: + return r.stop_value + elif r.kind == _Generic.StepActionKind.PUSH: + yield r.push_value + else: + assert r.kind == _Generic.StepActionKind.SKIP + continue + + post_result = post_iterate(index + 1) + if not isinstance(post_result, _Generic.StepAction): + real_post_result = convert_value_result(post_result) + for r in real_post_result.flatten(): + if r.kind == _Generic.StepActionKind.STOP: + return r.stop_value + elif r.kind == _Generic.StepActionKind.PUSH: + yield r.push_value + else: + assert r.kind == _Generic.StepActionKind.SKIP + + return fallback_return + + def create_new( + iterable: Iterable[_T], + operation: IterateOperation[_T, _V, _R], + fallback_return: _R, + /, + pre_iterate: IteratePreHook[_T, _V, _R] | None = None, + post_iterate: IteratePostHook[_V, _R] | None = None, + ) -> Generator[_V, None, _R]: + return _Generic.iterate( + iterable, + operation, + fallback_return, + pre_iterate or _Generic._none_hook, + post_iterate or _Generic._none_hook, + _Generic._non_result_to_push, + ) + + def get_result( + iterable: Iterable[_T], + operation: IterateOperation[_T, _V, _R], + fallback_return: _R, + /, + pre_iterate: IteratePreHook[_T, _V, _R] | None = None, + post_iterate: IteratePostHook[_V, _R] | None = None, + ) -> _R: + try: + for _ in _Generic.iterate( + iterable, + operation, + fallback_return, + pre_iterate or _Generic._none_hook, + post_iterate or _Generic._none_hook, + _Generic._non_result_to_stop, + ): + pass + except StopIteration as stop: + return stop.value + cru_unreachable() + + +class _Helpers: + @staticmethod + def auto_count(c: Callable[Concatenate[int, _P], _O]) -> Callable[_P, _O]: + count = 0 + + def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _O: + nonlocal count + r = c(count, *args, **kwargs) + count += 1 + return r + + return wrapper + + +class _Creators: + class Raw: + @staticmethod + def empty() -> Iterator[Never]: + return iter([]) + + @staticmethod + def range(*args) -> Iterator[int]: + return iter(range(*args)) + + @staticmethod + def unite(*args: _T) -> Iterator[_T]: + return iter(args) + + @staticmethod + def _concat(*iterables: Iterable[_T]) -> Iterable[_T]: + for iterable in iterables: + yield from iterable + + @staticmethod + def concat(*iterables: Iterable[_T]) -> Iterator[_T]: + return iter(_Creators.Raw._concat(*iterables)) + + @staticmethod + def _wrap(f: Callable[_P, Iterable[_O]]) -> Callable[_P, CruIterator[_O]]: + def _wrapped(*args: _P.args, **kwargs: _P.kwargs) -> CruIterator[_O]: + return CruIterator(f(*args, **kwargs)) + + return _wrapped + + empty = _wrap(Raw.empty) + range = _wrap(Raw.range) + unite = _wrap(Raw.unite) + concat = _wrap(Raw.concat) + + +class CruIterator(Generic[_T]): + ElementOperation: TypeAlias = Callable[[_V], Any] + ElementPredicate: TypeAlias = Callable[[_V], bool] + AnyElementPredicate: TypeAlias = ElementPredicate[Any] + ElementTransformer: TypeAlias = Callable[[_V], _O] + SelfElementTransformer: TypeAlias = ElementTransformer[_V, _V] + AnyElementTransformer: TypeAlias = ElementTransformer[Any, Any] + + Creators: TypeAlias = _Creators + Helpers: TypeAlias = _Helpers + + def __init__(self, iterable: Iterable[_T]) -> None: + self._iterator = iter(iterable) + + def __iter__(self) -> Iterator[_T]: + return self._iterator + + def create_new_me(self, iterable: Iterable[_O]) -> CruIterator[_O]: + return type(self)(iterable) # type: ignore + + @staticmethod + def _wrap( + f: Callable[Concatenate[CruIterator[_T], _P], Iterable[_O]], + ) -> Callable[Concatenate[CruIterator[_T], _P], CruIterator[_O]]: + def _wrapped( + self: CruIterator[_T], *args: _P.args, **kwargs: _P.kwargs + ) -> CruIterator[_O]: + return self.create_new_me(f(self, *args, **kwargs)) + + return _wrapped + + @_wrap + def replace_me(self, iterable: Iterable[_O]) -> Iterable[_O]: + return iterable + + def replace_me_with_empty(self) -> CruIterator[Never]: + return self.create_new_me(_Creators.Raw.empty()) + + def replace_me_with_range(self, *args) -> CruIterator[int]: + return self.create_new_me(_Creators.Raw.range(*args)) + + def replace_me_with_unite(self, *args: _O) -> CruIterator[_O]: + return self.create_new_me(_Creators.Raw.unite(*args)) + + def replace_me_with_concat(self, *iterables: Iterable[_T]) -> CruIterator[_T]: + return self.create_new_me(_Creators.Raw.concat(*iterables)) + + def to_set(self) -> set[_T]: + return set(self) + + def to_list(self) -> list[_T]: + return list(self) + + def all(self, predicate: ElementPredicate[_T]) -> bool: + for value in self: + if not predicate(value): + return False + return True + + def any(self, predicate: ElementPredicate[_T]) -> bool: + for value in self: + if predicate(value): + return True + return False + + def foreach(self, operation: ElementOperation[_T]) -> None: + for value in self: + operation(value) + + @_wrap + def transform(self, transformer: ElementTransformer[_T, _O]) -> Iterable[_O]: + for value in self: + yield transformer(value) + + map = transform + + @_wrap + def filter(self, predicate: ElementPredicate[_T]) -> Iterable[_T]: + for value in self: + if predicate(value): + yield value + + @_wrap + def continue_if(self, predicate: ElementPredicate[_T]) -> Iterable[_T]: + for value in self: + yield value + if not predicate(value): + break + + def first_n(self, max_count: int) -> CruIterator[_T]: + if max_count < 0: + raise ValueError("max_count must be 0 or positive.") + if max_count == 0: + return self.replace_me_with_empty() # type: ignore + return self.continue_if(_Helpers.auto_count(lambda i, _: i < max_count - 1)) + + def drop_n(self, n: int) -> CruIterator[_T]: + if n < 0: + raise ValueError("n must be 0 or positive.") + if n == 0: + return self + return self.filter(_Helpers.auto_count(lambda i, _: i < n)) + + def single_or( + self, fallback: _O | CruNotFound = CruNotFound.VALUE + ) -> _T | _O | CruNotFound: + first_2 = self.first_n(2) + has_value = False + for element in first_2: + if has_value: + raise ValueError("More than one value found.") + has_value = True + value = element + if has_value: + return value + else: + return fallback + + def first_or( + self, fallback: _O | CruNotFound = CruNotFound.VALUE + ) -> _T | _O | CruNotFound: + return self.first_n(1).single_or(fallback) + + @_wrap + def flatten(self, max_depth: int = -1) -> Iterable[Any]: + return _Generic.iterable_flatten(self, max_depth) + + def select_by_indices(self, indices: Iterable[int]) -> CruIterator[_T]: + index_set = set(indices) + max_index = max(index_set) + return self.first_n(max_index + 1).filter( + _Helpers.auto_count(lambda i, _: i in index_set) + ) + + def remove_values(self, values: Iterable[Any]) -> CruIterator[_T]: + value_set = set(values) + return self.filter(lambda v: v not in value_set) + + def replace_values( + self, old_values: Iterable[Any], new_value: _O + ) -> Iterable[_T | _O]: + value_set = set(old_values) + return self.transform(lambda v: new_value if v in value_set else v) + + def group_by(self, key_getter: Callable[[_T], _O]) -> dict[_O, list[_T]]: + result: dict[_O, list[_T]] = {} + + for item in self: + key = key_getter(item) + if key not in result: + result[key] = [] + result[key].append(item) + + return result + + def join_str(self: CruIterator[str], separator: str) -> str: + return separator.join(self) + + +class CruIterMixin(Generic[_T]): + def cru_iter(self: Iterable[_T]) -> CruIterator[_T]: + return CruIterator(self) + + +class CruIterList(list[_T], CruIterMixin[_T]): + pass + + +class CruIterable: + Generic: TypeAlias = _Generic + Iterator: TypeAlias = CruIterator[_T] + Helpers: TypeAlias = _Helpers + Mixin: TypeAlias = CruIterMixin[_T] + IterList: TypeAlias = CruIterList[_T] + + +CRU.add_objects(CruIterable, CruIterator) diff --git a/python/cru/_type.py b/python/cru/_type.py new file mode 100644 index 0000000..1f81da3 --- /dev/null +++ b/python/cru/_type.py @@ -0,0 +1,52 @@ +from collections.abc import Iterable +from typing import Any + +from ._error import CruException, CruLogicError +from ._iter import CruIterator + + +class CruTypeCheckError(CruException): + pass + + +DEFAULT_NONE_ERR_MSG = "None is not allowed here." +DEFAULT_TYPE_ERR_MSG = "Object of this type is not allowed here." + + +class CruTypeSet(set[type]): + def __init__(self, *types: type): + type_set = CruIterator(types).filter(lambda t: t is not None).to_set() + if not CruIterator(type_set).all(lambda t: isinstance(t, type)): + raise CruLogicError("TypeSet can only contain type.") + super().__init__(type_set) + + def check_value( + self, + value: Any, + /, + allow_none: bool = False, + empty_allow_all: bool = True, + ) -> None: + if value is None: + if allow_none: + return + else: + raise CruTypeCheckError(DEFAULT_NONE_ERR_MSG) + if len(self) == 0 and empty_allow_all: + return + if not CruIterator(self).any(lambda t: isinstance(value, t)): + raise CruTypeCheckError(DEFAULT_TYPE_ERR_MSG) + + def check_value_list( + self, + values: Iterable[Any], + /, + allow_none: bool = False, + empty_allow_all: bool = True, + ) -> None: + for value in values: + self.check_value( + value, + allow_none, + empty_allow_all, + ) diff --git a/python/cru/attr.py b/python/cru/attr.py new file mode 100644 index 0000000..d4cc86a --- /dev/null +++ b/python/cru/attr.py @@ -0,0 +1,364 @@ +from __future__ import annotations + +import copy +from collections.abc import Callable, Iterable +from dataclasses import dataclass, field +from typing import Any + +from .list import CruUniqueKeyList +from ._type import CruTypeSet +from ._const import CruNotFound, CruUseDefault, CruDontChange +from ._iter import CruIterator + + +@dataclass +class CruAttr: + + name: str + value: Any + description: str | None + + @staticmethod + def make( + name: str, value: Any = CruUseDefault.VALUE, description: str | None = None + ) -> CruAttr: + return CruAttr(name, value, description) + + +CruAttrDefaultFactory = Callable[["CruAttrDef"], Any] +CruAttrTransformer = Callable[[Any, "CruAttrDef"], Any] +CruAttrValidator = Callable[[Any, "CruAttrDef"], None] + + +@dataclass +class CruAttrDef: + name: str + description: str + default_factory: CruAttrDefaultFactory + transformer: CruAttrTransformer + validator: CruAttrValidator + + def __init__( + self, + name: str, + description: str, + default_factory: CruAttrDefaultFactory, + transformer: CruAttrTransformer, + validator: CruAttrValidator, + ) -> None: + self.name = name + self.description = description + self.default_factory = default_factory + self.transformer = transformer + self.validator = validator + + def transform(self, value: Any) -> Any: + if self.transformer is not None: + return self.transformer(value, self) + return value + + def validate(self, value: Any, /, force_allow_none: bool = False) -> None: + if force_allow_none is value is None: + return + if self.validator is not None: + self.validator(value, self) + + def transform_and_validate( + self, value: Any, /, force_allow_none: bool = False + ) -> Any: + value = self.transform(value) + self.validate(value, force_allow_none) + return value + + def make_default_value(self) -> Any: + return self.transform_and_validate(self.default_factory(self)) + + def adopt(self, attr: CruAttr) -> CruAttr: + attr = copy.deepcopy(attr) + + if attr.name is None: + attr.name = self.name + elif attr.name != self.name: + raise ValueError(f"Attr name is not match: {attr.name} != {self.name}") + + if attr.value is CruUseDefault.VALUE: + attr.value = self.make_default_value() + else: + attr.value = self.transform_and_validate(attr.value) + + if attr.description is None: + attr.description = self.description + + return attr + + def make( + self, value: Any = CruUseDefault.VALUE, description: None | str = None + ) -> CruAttr: + value = self.make_default_value() if value is CruUseDefault.VALUE else value + value = self.transform_and_validate(value) + return CruAttr( + self.name, + value, + description if description is not None else self.description, + ) + + +@dataclass +class CruAttrDefBuilder: + + name: str + description: str + types: list[type] | None = field(default=None) + allow_none: bool = field(default=False) + default: Any = field(default=CruUseDefault.VALUE) + default_factory: CruAttrDefaultFactory | None = field(default=None) + auto_list: bool = field(default=False) + transformers: list[CruAttrTransformer] = field(default_factory=list) + validators: list[CruAttrValidator] = field(default_factory=list) + override_transformer: CruAttrTransformer | None = field(default=None) + override_validator: CruAttrValidator | None = field(default=None) + + build_hook: Callable[[CruAttrDef], None] | None = field(default=None) + + def __init__(self, name: str, description: str) -> None: + super().__init__() + self.name = name + self.description = description + + def auto_adjust_default(self) -> None: + if self.default is not CruUseDefault.VALUE and self.default is not None: + return + if self.allow_none and self.default is CruUseDefault.VALUE: + self.default = None + if not self.allow_none and self.default is None: + self.default = CruUseDefault.VALUE + if self.auto_list and not self.allow_none: + self.default = [] + + def with_name(self, name: str | CruDontChange) -> CruAttrDefBuilder: + if name is not CruDontChange.VALUE: + self.name = name + return self + + def with_description( + self, default_description: str | CruDontChange + ) -> CruAttrDefBuilder: + if default_description is not CruDontChange.VALUE: + self.description = default_description + return self + + def with_default(self, default: Any) -> CruAttrDefBuilder: + if default is not CruDontChange.VALUE: + self.default = default + return self + + def with_default_factory( + self, + default_factory: CruAttrDefaultFactory | CruDontChange, + ) -> CruAttrDefBuilder: + if default_factory is not CruDontChange.VALUE: + self.default_factory = default_factory + return self + + def with_types( + self, + types: Iterable[type] | None | CruDontChange, + ) -> CruAttrDefBuilder: + if types is not CruDontChange.VALUE: + self.types = None if types is None else list(types) + return self + + def with_allow_none(self, allow_none: bool | CruDontChange) -> CruAttrDefBuilder: + if allow_none is not CruDontChange.VALUE: + self.allow_none = allow_none + return self + + def with_auto_list( + self, auto_list: bool | CruDontChange = True + ) -> CruAttrDefBuilder: + if auto_list is not CruDontChange.VALUE: + self.auto_list = auto_list + return self + + def with_constraint( + self, + /, + allow_none: bool | CruDontChange = CruDontChange.VALUE, + types: Iterable[type] | None | CruDontChange = CruDontChange.VALUE, + default: Any = CruDontChange.VALUE, + default_factory: CruAttrDefaultFactory | CruDontChange = CruDontChange.VALUE, + auto_list: bool | CruDontChange = CruDontChange.VALUE, + ) -> CruAttrDefBuilder: + return ( + self.with_allow_none(allow_none) + .with_types(types) + .with_default(default) + .with_default_factory(default_factory) + .with_auto_list(auto_list) + ) + + def add_transformer(self, transformer: CruAttrTransformer) -> CruAttrDefBuilder: + self.transformers.append(transformer) + return self + + def clear_transformers(self) -> CruAttrDefBuilder: + self.transformers.clear() + return self + + def add_validator(self, validator: CruAttrValidator) -> CruAttrDefBuilder: + self.validators.append(validator) + return self + + def clear_validators(self) -> CruAttrDefBuilder: + self.validators.clear() + return self + + def with_override_transformer( + self, override_transformer: CruAttrTransformer | None | CruDontChange + ) -> CruAttrDefBuilder: + if override_transformer is not CruDontChange.VALUE: + self.override_transformer = override_transformer + return self + + def with_override_validator( + self, override_validator: CruAttrValidator | None | CruDontChange + ) -> CruAttrDefBuilder: + if override_validator is not CruDontChange.VALUE: + self.override_validator = override_validator + return self + + def is_valid(self) -> tuple[bool, str]: + if not isinstance(self.name, str): + return False, "Name must be a string!" + if not isinstance(self.description, str): + return False, "Default description must be a string!" + if ( + not self.allow_none + and self.default is None + and self.default_factory is None + ): + return False, "Default must be set if allow_none is False!" + return True, "" + + @staticmethod + def _build( + builder: CruAttrDefBuilder, auto_adjust_default: bool = True + ) -> CruAttrDef: + if auto_adjust_default: + builder.auto_adjust_default() + + valid, err = builder.is_valid() + if not valid: + raise ValueError(err) + + def composed_transformer(value: Any, attr_def: CruAttrDef) -> Any: + def transform_value(single_value: Any) -> Any: + for transformer in builder.transformers: + single_value = transformer(single_value, attr_def) + return single_value + + if builder.auto_list: + if not isinstance(value, list): + value = [value] + value = CruIterator(value).transform(transform_value).to_list() + + else: + value = transform_value(value) + return value + + type_set = None if builder.types is None else CruTypeSet(*builder.types) + + def composed_validator(value: Any, attr_def: CruAttrDef): + def validate_value(single_value: Any) -> None: + if type_set is not None: + type_set.check_value(single_value, allow_none=builder.allow_none) + for validator in builder.validators: + validator(single_value, attr_def) + + if builder.auto_list: + CruIterator(value).foreach(validate_value) + else: + validate_value(value) + + real_transformer = builder.override_transformer or composed_transformer + real_validator = builder.override_validator or composed_validator + + default_factory = builder.default_factory + if default_factory is None: + + def default_factory(_d): + return copy.deepcopy(builder.default) + + d = CruAttrDef( + builder.name, + builder.description, + default_factory, + real_transformer, + real_validator, + ) + if builder.build_hook: + builder.build_hook(d) + return d + + def build(self, auto_adjust_default=True) -> CruAttrDef: + c = copy.deepcopy(self) + self.build_hook = None + return CruAttrDefBuilder._build(c, auto_adjust_default) + + +class CruAttrDefRegistry(CruUniqueKeyList[CruAttrDef, str]): + + def __init__(self) -> None: + super().__init__(lambda d: d.name) + + def make_builder(self, name: str, default_description: str) -> CruAttrDefBuilder: + b = CruAttrDefBuilder(name, default_description) + b.build_hook = lambda a: self.add(a) + return b + + def adopt(self, attr: CruAttr) -> CruAttr: + d = self.get(attr.name) + return d.adopt(attr) + + +class CruAttrTable(CruUniqueKeyList[CruAttr, str]): + def __init__(self, registry: CruAttrDefRegistry) -> None: + self._registry: CruAttrDefRegistry = registry + super().__init__(lambda a: a.name, before_add=registry.adopt) + + @property + def registry(self) -> CruAttrDefRegistry: + return self._registry + + def get_value_or(self, name: str, fallback: Any = CruNotFound.VALUE) -> Any: + a = self.get_or(name, CruNotFound.VALUE) + if a is CruNotFound.VALUE: + return fallback + return a.value + + def get_value(self, name: str) -> Any: + a = self.get(name) + return a.value + + def make_attr( + self, + name: str, + value: Any = CruUseDefault.VALUE, + /, + description: str | None = None, + ) -> CruAttr: + d = self._registry.get(name) + return d.make(value, description or d.description) + + def add_value( + self, + name: str, + value: Any = CruUseDefault.VALUE, + /, + description: str | None = None, + *, + replace: bool = False, + ) -> CruAttr: + attr = self.make_attr(name, value, description) + self.add(attr, replace) + return attr diff --git a/python/cru/config.py b/python/cru/config.py new file mode 100644 index 0000000..0f6f0d0 --- /dev/null +++ b/python/cru/config.py @@ -0,0 +1,196 @@ +from __future__ import annotations + +from typing import Any, TypeVar, Generic + +from ._error import CruException +from .list import CruUniqueKeyList +from .value import ( + INTEGER_VALUE_TYPE, + TEXT_VALUE_TYPE, + CruValueTypeError, + ValueGeneratorBase, + ValueType, +) + +_T = TypeVar("_T") + + +class CruConfigError(CruException): + def __init__(self, message: str, item: ConfigItem, *args, **kwargs): + super().__init__(message, *args, **kwargs) + self._item = item + + @property + def item(self) -> ConfigItem: + return self._item + + +class ConfigItem(Generic[_T]): + def __init__( + self, + name: str, + description: str, + value_type: ValueType[_T], + value: _T | None = None, + /, + default: ValueGeneratorBase[_T] | _T | None = None, + ) -> None: + self._name = name + self._description = description + self._value_type = value_type + self._value = value + self._default = default + + @property + def name(self) -> str: + return self._name + + @property + def description(self) -> str: + return self._description + + @property + def value_type(self) -> ValueType[_T]: + return self._value_type + + @property + def is_set(self) -> bool: + return self._value is not None + + @property + def value(self) -> _T: + if self._value is None: + raise CruConfigError( + "Config value is not set.", + self, + user_message=f"Config item {self.name} is not set.", + ) + return self._value + + @property + def value_str(self) -> str: + return self.value_type.convert_value_to_str(self.value) + + def set_value(self, v: _T | str, allow_convert_from_str=False): + if allow_convert_from_str: + self._value = self.value_type.check_value_or_try_convert_from_str(v) + else: + self._value = self.value_type.check_value(v) + + def reset(self): + self._value = None + + @property + def default(self) -> ValueGeneratorBase[_T] | _T | None: + return self._default + + @property + def can_generate_default(self) -> bool: + return self.default is not None + + def generate_default_value(self) -> _T: + if self.default is None: + raise CruConfigError( + "Config item does not support default value generation.", self + ) + elif isinstance(self.default, ValueGeneratorBase): + v = self.default.generate() + else: + v = self.default + try: + self.value_type.check_value(v) + return v + except CruValueTypeError as e: + raise CruConfigError( + "Config value generator returns an invalid value.", self + ) from e + + def copy(self) -> "ConfigItem": + return ConfigItem( + self.name, + self.description, + self.value_type, + self.value, + self.default, + ) + + @property + def description_str(self) -> str: + return f"{self.name} ({self.value_type.name}): {self.description}" + + +class Configuration(CruUniqueKeyList[ConfigItem[Any], str]): + def __init__(self): + super().__init__(lambda c: c.name) + + def get_set_items(self) -> list[ConfigItem[Any]]: + return [item for item in self if item.is_set] + + def get_unset_items(self) -> list[ConfigItem[Any]]: + return [item for item in self if not item.is_set] + + @property + def all_set(self) -> bool: + return len(self.get_unset_items()) == 0 + + @property + def all_not_set(self) -> bool: + return len(self.get_set_items()) == 0 + + def add_text_config( + self, + name: str, + description: str, + value: str | None = None, + default: ValueGeneratorBase[str] | str | None = None, + ) -> ConfigItem[str]: + item = ConfigItem(name, description, TEXT_VALUE_TYPE, value, default) + self.add(item) + return item + + def add_int_config( + self, + name: str, + description: str, + value: int | None = None, + default: ValueGeneratorBase[int] | int | None = None, + ) -> ConfigItem[int]: + item = ConfigItem(name, description, INTEGER_VALUE_TYPE, value, default) + self.add(item) + return item + + def set_config_item( + self, + name: str, + value: Any | str, + allow_convert_from_str=True, + ) -> None: + item = self.get(name) + item.set_value( + value, + allow_convert_from_str=allow_convert_from_str, + ) + + def reset_all(self) -> None: + for item in self: + item.reset() + + def to_dict(self) -> dict[str, Any]: + return {item.name: item.value for item in self} + + def to_str_dict(self) -> dict[str, str]: + return { + item.name: item.value_type.convert_value_to_str(item.value) for item in self + } + + def set_value_dict( + self, + value_dict: dict[str, Any], + allow_convert_from_str: bool = False, + ) -> None: + for name, value in value_dict.items(): + item = self.get(name) + item.set_value( + value, + allow_convert_from_str=allow_convert_from_str, + ) diff --git a/python/cru/list.py b/python/cru/list.py new file mode 100644 index 0000000..216a561 --- /dev/null +++ b/python/cru/list.py @@ -0,0 +1,160 @@ +from __future__ import annotations + +from collections.abc import Callable, Iterator +from typing import Any, Generic, Iterable, TypeAlias, TypeVar, overload + +from ._error import CruInternalError +from ._iter import CruIterator +from ._const import CruNotFound + +_T = TypeVar("_T") +_O = TypeVar("_O") + + +class CruListEdit(CruIterator[_T]): + def __init__(self, iterable: Iterable[_T], _list: CruList[Any]) -> None: + super().__init__(iterable) + self._list = _list + + def create_me(self, iterable: Iterable[_O]) -> CruListEdit[_O]: + return CruListEdit(iterable, self._list) + + @property + def list(self) -> CruList[Any]: + return self._list + + def done(self) -> CruList[Any]: + self._list.reset(self) + return self._list + + +class CruList(list[_T]): + def reset(self, new_values: Iterable[_T]): + if self is new_values: + new_values = list(new_values) + self.clear() + self.extend(new_values) + return self + + def as_cru_iterator(self) -> CruIterator[_T]: + return CruIterator(self) + + @staticmethod + def make(maybe_list: Iterable[_T] | _T | None) -> CruList[_T]: + if maybe_list is None: + return CruList() + if isinstance(maybe_list, Iterable): + return CruList(maybe_list) + return CruList([maybe_list]) + + +_K = TypeVar("_K") + +_KeyGetter: TypeAlias = Callable[[_T], _K] + + +class CruUniqueKeyList(Generic[_T, _K]): + def __init__( + self, + key_getter: _KeyGetter[_T, _K], + *, + before_add: Callable[[_T], _T] | None = None, + ): + super().__init__() + self._key_getter = key_getter + self._before_add = before_add + self._list: CruList[_T] = CruList() + + @property + def key_getter(self) -> _KeyGetter[_T, _K]: + return self._key_getter + + @property + def internal_list(self) -> CruList[_T]: + return self._list + + def validate_self(self): + keys = self._list.transform(self._key_getter) + if len(keys) != len(set(keys)): + raise CruInternalError("Duplicate keys!") + + @overload + def get_or( + self, key: _K, fallback: CruNotFound = CruNotFound.VALUE + ) -> _T | CruNotFound: ... + + @overload + def get_or(self, key: _K, fallback: _O) -> _T | _O: ... + + def get_or( + self, key: _K, fallback: _O | CruNotFound = CruNotFound.VALUE + ) -> _T | _O | CruNotFound: + return ( + self._list.as_cru_iterator() + .filter(lambda v: key == self._key_getter(v)) + .first_or(fallback) + ) + + def get(self, key: _K) -> _T: + value = self.get_or(key) + if value is CruNotFound.VALUE: + raise KeyError(f"Key {key} not found!") + return value # type: ignore + + @property + def keys(self) -> Iterable[_K]: + return self._list.as_cru_iterator().map(self._key_getter) + + def has_key(self, key: _K) -> bool: + return self.get_or(key) != CruNotFound.VALUE + + def try_remove(self, key: _K) -> bool: + value = self.get_or(key) + if value is CruNotFound.VALUE: + return False + self._list.remove(value) + return True + + def remove(self, key: _K, allow_absence: bool = False) -> None: + if not self.try_remove(key) and not allow_absence: + raise KeyError(f"Key {key} not found!") + + def add(self, value: _T, /, replace: bool = False) -> None: + v = self.get_or(self._key_getter(value)) + if v is not CruNotFound.VALUE: + if not replace: + raise KeyError(f"Key {self._key_getter(v)} already exists!") + self._list.remove(v) + if self._before_add is not None: + value = self._before_add(value) + self._list.append(value) + + def set(self, value: _T) -> None: + self.add(value, True) + + def extend(self, iterable: Iterable[_T], /, replace: bool = False) -> None: + values = list(iterable) + to_remove = [] + for value in values: + v = self.get_or(self._key_getter(value)) + if v is not CruNotFound.VALUE: + if not replace: + raise KeyError(f"Key {self._key_getter(v)} already exists!") + to_remove.append(v) + for value in to_remove: + self._list.remove(value) + if self._before_add is not None: + values = [self._before_add(value) for value in values] + self._list.extend(values) + + def clear(self) -> None: + self._list.reset([]) + + def __iter__(self) -> Iterator[_T]: + return iter(self._list) + + def __len__(self) -> int: + return len(self._list) + + def cru_iter(self) -> CruIterator[_T]: + return CruIterator(self._list) diff --git a/python/cru/parsing.py b/python/cru/parsing.py new file mode 100644 index 0000000..0e9239d --- /dev/null +++ b/python/cru/parsing.py @@ -0,0 +1,290 @@ +from __future__ import annotations + +from abc import ABCMeta, abstractmethod +from dataclasses import dataclass +from enum import Enum +from typing import NamedTuple, TypeAlias, TypeVar, Generic, NoReturn, Callable + +from ._error import CruException +from ._iter import CruIterable + +_T = TypeVar("_T") + + +class StrParseStream: + class MemStackEntry(NamedTuple): + pos: int + lineno: int + + class MemStackPopStr(NamedTuple): + text: str + lineno: int + + def __init__(self, text: str) -> None: + self._text = text + self._pos = 0 + self._lineno = 1 + self._length = len(self._text) + self._valid_pos_range = range(0, self.length + 1) + self._valid_offset_range = range(-self.length, self.length + 1) + self._mem_stack: CruIterable.IterList[StrParseStream.MemStackEntry] = ( + CruIterable.IterList() + ) + + @property + def text(self) -> str: + return self._text + + @property + def length(self) -> int: + return self._length + + @property + def valid_pos_range(self) -> range: + return self._valid_pos_range + + @property + def valid_offset_range(self) -> range: + return self._valid_offset_range + + @property + def pos(self) -> int: + return self._pos + + @property + def lineno(self) -> int: + return self._lineno + + @property + def eof(self) -> bool: + return self._pos == self.length + + def peek(self, length: int) -> str: + real_length = min(length, self.length - self._pos) + new_position = self._pos + real_length + text = self._text[self._pos : new_position] + return text + + def read(self, length: int) -> str: + text = self.peek(length) + self._pos += len(text) + self._lineno += text.count("\n") + return text + + def skip(self, length: int) -> None: + self.read(length) + + def peek_str(self, text: str) -> bool: + if self.pos + len(text) > self.length: + return False + for offset in range(len(text)): + if self._text[self.pos + offset] != text[offset]: + return False + return True + + def read_str(self, text: str) -> bool: + if not self.peek_str(text): + return False + self._pos += len(text) + self._lineno += text.count("\n") + return True + + @property + def mem_stack(self) -> CruIterable.IterList[MemStackEntry]: + return self._mem_stack + + def push_mem(self) -> None: + self.mem_stack.append(self.MemStackEntry(self.pos, self.lineno)) + + def pop_mem(self) -> MemStackEntry: + return self.mem_stack.pop() + + def pop_mem_str(self, strip_end: int = 0) -> MemStackPopStr: + old = self.pop_mem() + assert self.pos >= old.pos + return self.MemStackPopStr( + self._text[old.pos : self.pos - strip_end], old.lineno + ) + + +class ParseError(CruException, Generic[_T]): + def __init__( + self, + message, + parser: Parser[_T], + text: str, + line_number: int | None = None, + *args, + **kwargs, + ): + super().__init__(message, *args, **kwargs) + self._parser = parser + self._text = text + self._line_number = line_number + + @property + def parser(self) -> Parser[_T]: + return self._parser + + @property + def text(self) -> str: + return self._text + + @property + def line_number(self) -> int | None: + return self._line_number + + +class Parser(Generic[_T], metaclass=ABCMeta): + def __init__(self, name: str) -> None: + self._name = name + + @property + def name(self) -> str: + return self._name + + @abstractmethod + def parse(self, s: str) -> _T: + raise NotImplementedError() + + def raise_parse_exception( + self, text: str, line_number: int | None = None + ) -> NoReturn: + a = line_number and f" at line {line_number}" or "" + raise ParseError(f"Parser {self.name} failed{a}.", self, text, line_number) + + +class _SimpleLineVarParserEntry(NamedTuple): + key: str + value: str + line_number: int | None = None + + +class _SimpleLineVarParserResult(CruIterable.IterList[_SimpleLineVarParserEntry]): + pass + + +class SimpleLineVarParser(Parser[_SimpleLineVarParserResult]): + """ + The parsing result is a list of tuples (key, value, line number). + """ + + Entry: TypeAlias = _SimpleLineVarParserEntry + Result: TypeAlias = _SimpleLineVarParserResult + + def __init__(self) -> None: + super().__init__(type(self).__name__) + + def _parse(self, text: str, callback: Callable[[Entry], None]) -> None: + for ln, line in enumerate(text.splitlines()): + line_number = ln + 1 + # check if it's a comment + if line.strip().startswith("#"): + continue + # check if there is a '=' + if line.find("=") == -1: + self.raise_parse_exception("There is even no '='!", line_number) + # split at first '=' + key, value = line.split("=", 1) + key = key.strip() + value = value.strip() + callback(_SimpleLineVarParserEntry(key, value, line_number)) + + def parse(self, text: str) -> Result: + result = _SimpleLineVarParserResult() + self._parse(text, lambda item: result.append(item)) + return result + + +class _StrWrapperVarParserTokenKind(Enum): + TEXT = "TEXT" + VAR = "VAR" + + +@dataclass +class _StrWrapperVarParserToken: + kind: _StrWrapperVarParserTokenKind + value: str + line_number: int + + @property + def is_text(self) -> bool: + return self.kind is _StrWrapperVarParserTokenKind.TEXT + + @property + def is_var(self) -> bool: + return self.kind is _StrWrapperVarParserTokenKind.VAR + + @staticmethod + def from_mem_str( + kind: _StrWrapperVarParserTokenKind, mem_str: StrParseStream.MemStackPopStr + ) -> _StrWrapperVarParserToken: + return _StrWrapperVarParserToken(kind, mem_str.text, mem_str.lineno) + + def __repr__(self) -> str: + return f"VAR: {self.value}" if self.is_var else "TEXT: ..." + + +class _StrWrapperVarParserResult(CruIterable.IterList[_StrWrapperVarParserToken]): + pass + + +class StrWrapperVarParser(Parser[_StrWrapperVarParserResult]): + TokenKind: TypeAlias = _StrWrapperVarParserTokenKind + Token: TypeAlias = _StrWrapperVarParserToken + Result: TypeAlias = _StrWrapperVarParserResult + + def __init__(self, wrapper: str): + super().__init__(f"StrWrapperVarParser({wrapper})") + self._wrapper = wrapper + + @property + def wrapper(self) -> str: + return self._wrapper + + def parse(self, text: str) -> Result: + result = self.Result() + + class _State(Enum): + TEXT = "TEXT" + VAR = "VAR" + + state = _State.TEXT + stream = StrParseStream(text) + stream.push_mem() + + while True: + if stream.eof: + break + + if stream.read_str(self.wrapper): + if state is _State.TEXT: + result.append( + self.Token.from_mem_str( + self.TokenKind.TEXT, stream.pop_mem_str(len(self.wrapper)) + ) + ) + state = _State.VAR + stream.push_mem() + else: + result.append( + self.Token.from_mem_str( + self.TokenKind.VAR, + stream.pop_mem_str(len(self.wrapper)), + ) + ) + state = _State.TEXT + stream.push_mem() + + continue + + stream.skip(1) + + if state is _State.VAR: + raise ParseError("Text ended without closing variable.", self, text) + + mem_str = stream.pop_mem_str() + if len(mem_str.text) != 0: + result.append(self.Token.from_mem_str(self.TokenKind.TEXT, mem_str)) + + return result diff --git a/python/cru/service/__init__.py b/python/cru/service/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/cru/service/__main__.py b/python/cru/service/__main__.py new file mode 100644 index 0000000..2a0268b --- /dev/null +++ b/python/cru/service/__main__.py @@ -0,0 +1,27 @@ +import sys + +from cru import CruException + +from ._app import create_app + + +def main(): + app = create_app() + app.run_command() + + +if __name__ == "__main__": + version_info = sys.version_info + if not (version_info.major == 3 and version_info.minor >= 11): + print("This application requires Python 3.11 or later.", file=sys.stderr) + sys.exit(1) + + try: + main() + except CruException as e: + user_message = e.get_user_message() + if user_message is not None: + print(f"Error: {user_message}") + exit(1) + else: + raise diff --git a/python/cru/service/_app.py b/python/cru/service/_app.py new file mode 100644 index 0000000..b4c6271 --- /dev/null +++ b/python/cru/service/_app.py @@ -0,0 +1,30 @@ +from ._base import ( + AppBase, + CommandDispatcher, + PathCommandProvider, +) +from ._template import TemplateManager +from ._nginx import NginxManager +from ._gen_cmd import GenCmdProvider + +APP_ID = "crupest" + + +class App(AppBase): + def __init__(self): + super().__init__(APP_ID, f"{APP_ID}-service") + self.add_feature(PathCommandProvider()) + self.add_feature(TemplateManager()) + self.add_feature(NginxManager()) + self.add_feature(GenCmdProvider()) + self.add_feature(CommandDispatcher()) + + def run_command(self): + command_dispatcher = self.get_feature(CommandDispatcher) + command_dispatcher.run_command() + + +def create_app() -> App: + app = App() + app.setup() + return app diff --git a/python/cru/service/_base.py b/python/cru/service/_base.py new file mode 100644 index 0000000..e1eee70 --- /dev/null +++ b/python/cru/service/_base.py @@ -0,0 +1,400 @@ +from __future__ import annotations + +from argparse import ArgumentParser, Namespace +from abc import ABC, abstractmethod +import argparse +import os +from pathlib import Path +from typing import TypeVar, overload + +from cru import CruException, CruLogicError + +_Feature = TypeVar("_Feature", bound="AppFeatureProvider") + + +class AppError(CruException): + pass + + +class AppFeatureError(AppError): + def __init__(self, message, feature: type | str, *args, **kwargs): + super().__init__(message, *args, **kwargs) + self._feature = feature + + @property + def feature(self) -> type | str: + return self._feature + + +class AppPathError(CruException): + def __init__(self, message, _path: str | Path, *args, **kwargs): + super().__init__(message, *args, **kwargs) + self._path = str(_path) + + @property + def path(self) -> str: + return self._path + + +class AppPath(ABC): + def __init__(self, id: str, is_dir: bool, description: str) -> None: + self._is_dir = is_dir + self._id = id + self._description = description + + @property + @abstractmethod + def parent(self) -> AppPath | None: ... + + @property + @abstractmethod + def app(self) -> AppBase: ... + + @property + def id(self) -> str: + return self._id + + @property + def description(self) -> str: + return self._description + + @property + def is_dir(self) -> bool: + return self._is_dir + + @property + @abstractmethod + def full_path(self) -> Path: ... + + @property + def full_path_str(self) -> str: + return str(self.full_path) + + def check_parents(self, must_exist: bool = False) -> bool: + for p in reversed(self.full_path.parents): + if not p.exists() and not must_exist: + return False + if not p.is_dir(): + raise AppPathError("Parents' path must be a dir.", self.full_path) + return True + + def check_self(self, must_exist: bool = False) -> bool: + if not self.check_parents(must_exist): + return False + if not self.full_path.exists(): + if not must_exist: + return False + raise AppPathError("Not exist.", self.full_path) + if self.is_dir: + if not self.full_path.is_dir(): + raise AppPathError("Should be a directory, but not.", self.full_path) + else: + return True + else: + if not self.full_path.is_file(): + raise AppPathError("Should be a file, but not.", self.full_path) + else: + return True + + def ensure(self, create_file: bool = False) -> None: + e = self.check_self(False) + if not e: + os.makedirs(self.full_path.parent, exist_ok=True) + if self.is_dir: + os.mkdir(self.full_path) + elif create_file: + with open(self.full_path, "w") as f: + f.write("") + + def read_text(self) -> str: + if self.is_dir: + raise AppPathError("Can't read text of a dir.", self.full_path) + self.check_self() + return self.full_path.read_text() + + def add_subpath( + self, + name: str, + is_dir: bool, + /, + id: str | None = None, + description: str = "", + ) -> AppFeaturePath: + return self.app._add_path(name, is_dir, self, id, description) + + @property + def app_relative_path(self) -> Path: + return self.full_path.relative_to(self.app.root.full_path) + + +class AppFeaturePath(AppPath): + def __init__( + self, + parent: AppPath, + name: str, + is_dir: bool, + /, + id: str | None = None, + description: str = "", + ) -> None: + super().__init__(id or name, is_dir, description) + self._name = name + self._parent = parent + + @property + def name(self) -> str: + return self._name + + @property + def parent(self) -> AppPath: + return self._parent + + @property + def app(self) -> AppBase: + return self.parent.app + + @property + def full_path(self) -> Path: + return Path(self.parent.full_path, self.name).resolve() + + +class AppRootPath(AppPath): + def __init__(self, app: AppBase, path: Path): + super().__init__(f"/{id}", True, f"Application {id} root path.") + self._app = app + self._full_path = path.resolve() + + @property + def parent(self) -> None: + return None + + @property + def app(self) -> AppBase: + return self._app + + @property + def full_path(self) -> Path: + return self._full_path + + +class AppFeatureProvider(ABC): + def __init__(self, name: str, /, app: AppBase | None = None): + super().__init__() + self._name = name + self._app = app if app else AppBase.get_instance() + + @property + def app(self) -> AppBase: + return self._app + + @property + def name(self) -> str: + return self._name + + @abstractmethod + def setup(self) -> None: ... + + +class AppCommandFeatureProvider(AppFeatureProvider): + @abstractmethod + def get_command_info(self) -> tuple[str, str]: ... + + @abstractmethod + def setup_arg_parser(self, arg_parser: ArgumentParser): ... + + @abstractmethod + def run_command(self, args: Namespace) -> None: ... + + +class PathCommandProvider(AppCommandFeatureProvider): + def __init__(self) -> None: + super().__init__("path-command-provider") + + def setup(self): + pass + + def get_command_info(self): + return ("path", "Get information about paths used by app.") + + def setup_arg_parser(self, arg_parser: ArgumentParser) -> None: + subparsers = arg_parser.add_subparsers( + dest="path_command", metavar="PATH_COMMAND" + ) + _list_parser = subparsers.add_parser( + "list", help="list special paths used by app" + ) + + def run_command(self, args: Namespace) -> None: + if args.path_command is None or args.path_command == "list": + for path in self.app.paths: + print( + f"{path.app_relative_path.as_posix()}{'/' if path.is_dir else ''}: {path.description}" + ) + + +class CommandDispatcher(AppFeatureProvider): + def __init__(self) -> None: + super().__init__("command-dispatcher") + + def setup_arg_parser(self) -> None: + self._map: dict[str, AppCommandFeatureProvider] = {} + arg_parser = argparse.ArgumentParser( + description="Service management", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + subparsers = arg_parser.add_subparsers( + dest="command", + help="The management command to execute.", + metavar="COMMAND", + ) + for feature in self.app.features: + if isinstance(feature, AppCommandFeatureProvider): + info = feature.get_command_info() + command_subparser = subparsers.add_parser(info[0], help=info[1]) + feature.setup_arg_parser(command_subparser) + self._map[info[0]] = feature + self._arg_parser = arg_parser + + def setup(self): + self._parsed_args = self.arg_parser.parse_args() + + @property + def arg_parser(self) -> argparse.ArgumentParser: + return self._arg_parser + + @property + def command_map(self) -> dict[str, AppCommandFeatureProvider]: + return self._map + + @property + def program_args(self) -> argparse.Namespace: + return self._parsed_args + + def run_command(self) -> None: + args = self.program_args + if args.command is None: + self.arg_parser.print_help() + return + self.command_map[args.command].run_command(args) + + +class AppBase: + _instance: AppBase | None = None + + @staticmethod + def get_instance() -> AppBase: + if AppBase._instance is None: + raise AppError("App instance not initialized") + return AppBase._instance + + def __init__(self, app_id: str, name: str): + AppBase._instance = self + self._app_id = app_id + self._name = name + self._features: list[AppFeatureProvider] = [] + self._paths: list[AppFeaturePath] = [] + + def setup(self) -> None: + command_dispatcher = self.get_feature(CommandDispatcher) + command_dispatcher.setup_arg_parser() + self._root = AppRootPath(self, Path(self._ensure_env("CRUPEST_PROJECT_DIR"))) + self._data_dir = self._root.add_subpath( + self._ensure_env("CRUPEST_DATA_DIR"), True, id="data" + ) + self._services_dir = self._root.add_subpath( + self._ensure_env("CRUPEST_SERVICES_DIR"), True, id="CRUPEST_SERVICES_DIR" + ) + for feature in self.features: + feature.setup() + for path in self.paths: + path.check_self() + + @property + def app_id(self) -> str: + return self._app_id + + @property + def name(self) -> str: + return self._name + + def _ensure_env(self, env_name: str) -> str: + value = os.getenv(env_name) + if value is None: + raise AppError(f"Environment variable {env_name} not set") + return value + + @property + def root(self) -> AppRootPath: + return self._root + + @property + def data_dir(self) -> AppFeaturePath: + return self._data_dir + + @property + def services_dir(self) -> AppFeaturePath: + return self._services_dir + + @property + def app_initialized(self) -> bool: + return self.data_dir.check_self() + + @property + def features(self) -> list[AppFeatureProvider]: + return self._features + + @property + def paths(self) -> list[AppFeaturePath]: + return self._paths + + def add_feature(self, feature: _Feature) -> _Feature: + for f in self.features: + if f.name == feature.name: + raise AppFeatureError( + f"Duplicate feature name: {feature.name}.", feature.name + ) + self._features.append(feature) + return feature + + def _add_path( + self, + name: str, + is_dir: bool, + /, + parent: AppPath | None = None, + id: str | None = None, + description: str = "", + ) -> AppFeaturePath: + p = AppFeaturePath( + parent or self.root, name, is_dir, id=id, description=description + ) + self._paths.append(p) + return p + + @overload + def get_feature(self, feature: str) -> AppFeatureProvider: ... + + @overload + def get_feature(self, feature: type[_Feature]) -> _Feature: ... + + def get_feature( + self, feature: str | type[_Feature] + ) -> AppFeatureProvider | _Feature: + if isinstance(feature, str): + for f in self._features: + if f.name == feature: + return f + elif isinstance(feature, type): + for f in self._features: + if isinstance(f, feature): + return f + else: + raise CruLogicError("Argument must be the name of feature or its class.") + + raise AppFeatureError(f"Feature {feature} not found.", feature) + + def get_path(self, name: str) -> AppFeaturePath: + for p in self._paths: + if p.id == name or p.name == name: + return p + raise AppPathError(f"Application path {name} not found.", name) diff --git a/python/cru/service/_gen_cmd.py b/python/cru/service/_gen_cmd.py new file mode 100644 index 0000000..f51d65f --- /dev/null +++ b/python/cru/service/_gen_cmd.py @@ -0,0 +1,200 @@ +from dataclasses import dataclass, replace +from typing import TypeAlias + +from ._base import AppCommandFeatureProvider +from ._nginx import NginxManager + +_Str_Or_Cmd_List: TypeAlias = str | list["_Cmd"] + + +@dataclass +class _Cmd: + name: str + desc: str + cmd: _Str_Or_Cmd_List + + def clean(self) -> "_Cmd": + if isinstance(self.cmd, list): + return replace( + self, + cmd=[cmd.clean() for cmd in self.cmd], + ) + elif isinstance(self.cmd, str): + return replace(self, cmd=self.cmd.strip()) + else: + raise ValueError("Unexpected type for cmd.") + + def generate_text( + self, + info_only: bool, + *, + parent: str | None = None, + ) -> str: + if parent is None: + tag = "COMMAND" + long_name = self.name + indent = "" + else: + tag = "SUBCOMMAND" + long_name = f"{parent}.{self.name}" + indent = " " + + if info_only: + return f"{indent}[{long_name}]: {self.desc}" + + text = f"--- {tag}[{long_name}]: {self.desc}" + if isinstance(self.cmd, str): + text += "\n" + self.cmd + elif isinstance(self.cmd, list): + for sub in self.cmd: + text += "\n" * 2 + sub.generate_text(info_only, parent=self.name) + else: + raise ValueError("Unexpected type for cmd.") + + lines: list[str] = [] + for line in text.splitlines(): + if len(line) == 0: + lines.append("") + else: + lines.append(indent + line) + text = "\n".join(lines) + + return text + + +_docker_uninstall = _Cmd( + "uninstall", + "uninstall apt docker", + """ +for pkg in docker.io docker-doc docker-compose \ +podman-docker containerd runc; \ +do sudo apt-get remove $pkg; done +""", +) + +_docker_apt_certs = _Cmd( + "apt-certs", + "prepare apt certs", + """ +sudo apt-get update +sudo apt-get install ca-certificates curl +sudo install -m 0755 -d /etc/apt/keyrings +""", +) + +_docker_docker_certs = _Cmd( + "docker-certs", + "add docker certs", + """ +sudo curl -fsSL https://download.docker.com/linux/debian/gpg \ +-o /etc/apt/keyrings/docker.asc +sudo chmod a+r /etc/apt/keyrings/docker.asc +""", +) + +_docker_apt_repo = _Cmd( + "apt-repo", + "add docker apt repo", + """ +echo \\ + "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] \ +https://download.docker.com/linux/debian \\ + $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \\ + sudo tee /etc/apt/sources.list.d/docker.list > /dev/null +""", +) + +_docker_install = _Cmd( + "install", + "update apt and install docker", + """ +sudo apt-get update +sudo apt-get install docker-ce docker-ce-cli containerd.io \ +docker-buildx-plugin docker-compose-plugin +""", +) + +_docker_setup = _Cmd( + "setup", + "setup system for docker", + """ +sudo systemctl enable docker +sudo systemctl start docker +sudo groupadd -f docker +sudo usermod -aG docker $USER +# Remember to log out and log back in for the group changes to take effect +""", +) + +_docker = _Cmd( + "install-docker", + "install docker for a fresh new system", + [ + _docker_uninstall, + _docker_apt_certs, + _docker_docker_certs, + _docker_apt_repo, + _docker_install, + _docker_setup, + ], +) + +_update_blog = _Cmd( + "update-blog", + "re-generate blog pages", + """ +docker exec -it blog /scripts/update.bash +""", +) + +_git_user = _Cmd( + "git-user", + "add/set git server user and password", + """ +docker run -it --rm -v "$ps_file:/user-info" httpd htpasswd "/user-info" [username] +""", +) + + +class GenCmdProvider(AppCommandFeatureProvider): + def __init__(self) -> None: + super().__init__("gen-cmd-provider") + self._cmds: dict[str, _Cmd] = {} + self._register_cmds(_docker, _update_blog, _git_user) + + def _register_cmd(self, cmd: "_Cmd"): + self._cmds[cmd.name] = cmd.clean() + + def _register_cmds(self, *cmds: "_Cmd"): + for c in cmds: + self._register_cmd(c) + + def setup(self): + pass + + def get_command_info(self): + return ("gen-cmd", "Get commands of running external cli tools.") + + def setup_arg_parser(self, arg_parser): + subparsers = arg_parser.add_subparsers( + dest="gen_cmd", metavar="GEN_CMD_COMMAND" + ) + certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") + certbot_parser.add_argument( + "-t", "--test", action="store_true", help="run certbot in test mode" + ) + for cmd in self._cmds.values(): + subparsers.add_parser(cmd.name, help=cmd.desc) + + def _print_cmd(self, name: str): + print(self._cmds[name].generate_text(False)) + + def run_command(self, args): + if args.gen_cmd is None or args.gen_cmd == "list": + print("[certbot]: certbot ssl cert commands") + for cmd in self._cmds.values(): + print(cmd.generate_text(True)) + elif args.gen_cmd == "certbot": + self.app.get_feature(NginxManager).print_all_certbot_commands(args.test) + else: + self._print_cmd(args.gen_cmd) diff --git a/python/cru/service/_nginx.py b/python/cru/service/_nginx.py new file mode 100644 index 0000000..87cff6d --- /dev/null +++ b/python/cru/service/_nginx.py @@ -0,0 +1,263 @@ +from argparse import Namespace +from enum import Enum, auto +import re +import subprocess +from typing import TypeAlias + +from cru import CruInternalError + +from ._base import AppCommandFeatureProvider +from ._template import TemplateManager + + +class CertbotAction(Enum): + CREATE = auto() + EXPAND = auto() + SHRINK = auto() + RENEW = auto() + + +class NginxManager(AppCommandFeatureProvider): + CertbotAction: TypeAlias = CertbotAction + + def __init__(self) -> None: + super().__init__("nginx-manager") + self._domains_cache: list[str] | None = None + + def setup(self) -> None: + pass + + @property + def _template_manager(self) -> TemplateManager: + return self.app.get_feature(TemplateManager) + + @property + def root_domain(self) -> str: + return self._template_manager.get_domain() + + @property + def domains(self) -> list[str]: + if self._domains_cache is None: + self._domains_cache = self._get_domains() + return self._domains_cache + + @property + def subdomains(self) -> list[str]: + suffix = "." + self.root_domain + return [d[: -len(suffix)] for d in self.domains if d.endswith(suffix)] + + def _get_domains_from_text(self, text: str) -> set[str]: + domains: set[str] = set() + regex = re.compile(r"server_name\s+(\S+)\s*;") + for match in regex.finditer(text): + domains.add(match[1]) + return domains + + def _join_generated_nginx_conf_text(self) -> str: + result = "" + for path, text in self._template_manager.generate(): + if "nginx" in str(path): + result += text + return result + + def _get_domains(self) -> list[str]: + text = self._join_generated_nginx_conf_text() + domains = self._get_domains_from_text(text) + domains.remove(self.root_domain) + return [self.root_domain, *domains] + + def _print_domains(self) -> None: + for domain in self.domains: + print(domain) + + def _certbot_command( + self, + action: CertbotAction | str, + test: bool, + *, + docker=True, + standalone=None, + email=None, + agree_tos=True, + ) -> str: + if isinstance(action, str): + action = CertbotAction[action.upper()] + + command_args = [] + + add_domain_option = True + if action is CertbotAction.CREATE: + if standalone is None: + standalone = True + command_action = "certonly" + elif action in [CertbotAction.EXPAND, CertbotAction.SHRINK]: + if standalone is None: + standalone = False + command_action = "certonly" + elif action is CertbotAction.RENEW: + if standalone is None: + standalone = False + add_domain_option = False + command_action = "renew" + else: + raise CruInternalError("Invalid certbot action.") + + data_dir = self.app.data_dir.full_path.as_posix() + + if not docker: + command_args.append("certbot") + else: + command_args.extend( + [ + "docker run -it --rm --name certbot", + f'-v "{data_dir}/certbot/certs:/etc/letsencrypt"', + f'-v "{data_dir}/certbot/data:/var/lib/letsencrypt"', + ] + ) + if standalone: + command_args.append('-p "0.0.0.0:80:80"') + else: + command_args.append(f'-v "{data_dir}/certbot/webroot:/var/www/certbot"') + + command_args.append("certbot/certbot") + + command_args.append(command_action) + + command_args.append(f"--cert-name {self.root_domain}") + + if standalone: + command_args.append("--standalone") + else: + command_args.append("--webroot -w /var/www/certbot") + + if add_domain_option: + command_args.append(" ".join([f"-d {domain}" for domain in self.domains])) + + if email is not None: + command_args.append(f"--email {email}") + + if agree_tos: + command_args.append("--agree-tos") + + if test: + command_args.append("--test-cert --dry-run") + + return " ".join(command_args) + + def print_all_certbot_commands(self, test: bool): + print("### COMMAND: (standalone) create certs") + print( + self._certbot_command( + CertbotAction.CREATE, + test, + email=self._template_manager.get_email(), + ) + ) + print() + print("### COMMAND: (webroot+nginx) expand or shrink certs") + print( + self._certbot_command( + CertbotAction.EXPAND, + test, + email=self._template_manager.get_email(), + ) + ) + print() + print("### COMMAND: (webroot+nginx) renew certs") + print( + self._certbot_command( + CertbotAction.RENEW, + test, + email=self._template_manager.get_email(), + ) + ) + + @property + def _cert_path_str(self) -> str: + return str( + self.app.data_dir.full_path + / "certbot/certs/live" + / self.root_domain + / "fullchain.pem" + ) + + def get_command_info(self): + return "nginx", "Manage nginx related things." + + def setup_arg_parser(self, arg_parser): + subparsers = arg_parser.add_subparsers( + dest="nginx_command", required=True, metavar="NGINX_COMMAND" + ) + _list_parser = subparsers.add_parser("list", help="list domains") + certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") + certbot_parser.add_argument( + "--no-test", + action="store_true", + help="remove args making certbot run in test mode", + ) + + def run_command(self, args: Namespace) -> None: + if args.nginx_command == "list": + self._print_domains() + elif args.nginx_command == "certbot": + self.print_all_certbot_commands(not args.no_test) + + def _generate_dns_zone( + self, + ip: str, + /, + ttl: str | int = 600, + *, + enable_mail: bool = True, + dkim: str | None = None, + ) -> str: + # TODO: Not complete and test now. + root_domain = self.root_domain + result = f"$ORIGIN {root_domain}.\n\n" + result += "; A records\n" + result += f"@ {ttl} IN A {ip}\n" + for subdomain in self.subdomains: + result += f"{subdomain} {ttl} IN A {ip}\n" + + if enable_mail: + result += "\n; MX records\n" + result += f"@ {ttl} IN MX 10 mail.{root_domain}.\n" + result += "\n; SPF record\n" + result += f'@ {ttl} IN TXT "v=spf1 mx ~all"\n' + if dkim is not None: + result += "\n; DKIM record\n" + result += f'mail._domainkey {ttl} IN TEXT "{dkim}"' + result += "\n; DMARC record\n" + dmarc_options = [ + "v=DMARC1", + "p=none", + f"rua=mailto:dmarc.report@{root_domain}", + f"ruf=mailto:dmarc.report@{root_domain}", + "sp=none", + "ri=86400", + ] + result += f'_dmarc {ttl} IN TXT "{"; ".join(dmarc_options)}"\n' + return result + + def _get_dkim_from_mailserver(self) -> str | None: + # TODO: Not complete and test now. + dkim_path = ( + self.app.data_dir.full_path + / "dms/config/opendkim/keys" + / self.root_domain + / "mail.txt" + ) + if not dkim_path.exists(): + return None + + p = subprocess.run(["sudo", "cat", dkim_path], capture_output=True, check=True) + value = "" + for match in re.finditer('"(.*)"', p.stdout.decode("utf-8")): + value += match.group(1) + return value + + def _generate_dns_zone_with_dkim(self, ip: str, /, ttl: str | int = 600) -> str: + # TODO: Not complete and test now. + return self._generate_dns_zone( + ip, ttl, enable_mail=True, dkim=self._get_dkim_from_mailserver() + ) diff --git a/python/cru/service/_template.py b/python/cru/service/_template.py new file mode 100644 index 0000000..22c1d21 --- /dev/null +++ b/python/cru/service/_template.py @@ -0,0 +1,228 @@ +from argparse import Namespace +from pathlib import Path +import shutil +from typing import NamedTuple +import graphlib + +from cru import CruException +from cru.parsing import SimpleLineVarParser +from cru.template import TemplateTree, CruStrWrapperTemplate + +from ._base import AppCommandFeatureProvider, AppFeaturePath + + +class _Config(NamedTuple): + text: str + config: dict[str, str] + + +class _GeneratedConfig(NamedTuple): + base: _Config + private: _Config + merged: _Config + + +class _PreConfig(NamedTuple): + base: _Config + private: _Config + config: dict[str, str] + + @staticmethod + def create(base: _Config, private: _Config) -> "_PreConfig": + return _PreConfig(base, private, {**base.config, **private.config}) + + def _merge(self, generated: _Config): + text = ( + "\n".join( + [ + self.private.text.strip(), + self.base.text.strip(), + generated.text.strip(), + ] + ) + + "\n" + ) + config = {**self.config, **generated.config} + return _GeneratedConfig(self.base, self.private, _Config(text, config)) + + +class _Template(NamedTuple): + config: CruStrWrapperTemplate + config_vars: set[str] + tree: TemplateTree + + +class TemplateManager(AppCommandFeatureProvider): + def __init__(self): + super().__init__("template-manager") + + def setup(self) -> None: + self._base_config_file = self.app.services_dir.add_subpath("base-config", False) + self._private_config_file = self.app.data_dir.add_subpath("config", False) + self._template_config_file = self.app.services_dir.add_subpath( + "config.template", False + ) + self._templates_dir = self.app.services_dir.add_subpath("templates", True) + self._generated_dir = self.app.services_dir.add_subpath("generated", True) + + self._config_parser = SimpleLineVarParser() + + def _read_pre(app_path: AppFeaturePath) -> _Config: + text = app_path.read_text() + config = self._read_config(text) + return _Config(text, config) + + base = _read_pre(self._base_config_file) + private = _read_pre(self._private_config_file) + self._preconfig = _PreConfig.create(base, private) + + self._generated: _GeneratedConfig | None = None + + template_config_text = self._template_config_file.read_text() + self._template_config = self._read_config(template_config_text) + + self._template = _Template( + CruStrWrapperTemplate(template_config_text), + set(self._template_config.keys()), + TemplateTree( + lambda text: CruStrWrapperTemplate(text), + self.templates_dir.full_path_str, + ), + ) + + self._real_required_vars = ( + self._template.config_vars | self._template.tree.variables + ) - self._template.config_vars + lacks = self._real_required_vars - self._preconfig.config.keys() + self._lack_vars = lacks if len(lacks) > 0 else None + + def _read_config_entry_names(self, text: str) -> set[str]: + return set(entry.key for entry in self._config_parser.parse(text)) + + def _read_config(self, text: str) -> dict[str, str]: + return {entry.key: entry.value for entry in self._config_parser.parse(text)} + + @property + def templates_dir(self) -> AppFeaturePath: + return self._templates_dir + + @property + def generated_dir(self) -> AppFeaturePath: + return self._generated_dir + + def get_domain(self) -> str: + return self._preconfig.config["CRUPEST_DOMAIN"] + + def get_email(self) -> str: + return self._preconfig.config["CRUPEST_EMAIL"] + + def _generate_template_config(self, config: dict[str, str]) -> dict[str, str]: + entry_templates = { + key: CruStrWrapperTemplate(value) + for key, value in self._template_config.items() + } + sorter = graphlib.TopologicalSorter( + config + | {key: template.variables for key, template in entry_templates.items()} + ) + + vars: dict[str, str] = config.copy() + for _ in sorter.static_order(): + del_keys = [] + for key, template in entry_templates.items(): + new = template.generate_partial(vars) + if not new.has_variables: + vars[key] = new.generate({}) + del_keys.append(key) + else: + entry_templates[key] = new + for key in del_keys: + del entry_templates[key] + assert len(entry_templates) == 0 + return {key: value for key, value in vars.items() if key not in config} + + def _generate_config(self) -> _GeneratedConfig: + if self._generated is not None: + return self._generated + if self._lack_vars is not None: + raise CruException(f"Required vars are not defined: {self._lack_vars}.") + config = self._generate_template_config(self._preconfig.config) + text = self._template.config.generate(self._preconfig.config | config) + self._generated = self._preconfig._merge(_Config(text, config)) + return self._generated + + def generate(self) -> list[tuple[Path, str]]: + config = self._generate_config() + return [ + (Path("config"), config.merged.text), + *self._template.tree.generate(config.merged.config), + ] + + def _generate_files(self, dry_run: bool) -> None: + result = self.generate() + if not dry_run: + if self.generated_dir.full_path.exists(): + shutil.rmtree(self.generated_dir.full_path) + for path, text in result: + des = self.generated_dir.full_path / path + des.parent.mkdir(parents=True, exist_ok=True) + with open(des, "w") as f: + f.write(text) + + def get_command_info(self): + return ("template", "Manage templates.") + + def _print_file_lists(self) -> None: + print(f"[{self._template.config.variable_count}]", "config") + for path, template in self._template.tree.templates: + print(f"[{template.variable_count}]", path.as_posix()) + + def _print_vars(self, required: bool) -> None: + for var in self._template.config.variables: + print(f"[config] {var}") + for var in self._template.tree.variables: + if not (required and var in self._template.config_vars): + print(f"[template] {var}") + + def _run_check_vars(self) -> None: + if self._lack_vars is not None: + print("Lacks:") + for var in self._lack_vars: + print(var) + + def setup_arg_parser(self, arg_parser): + subparsers = arg_parser.add_subparsers( + dest="template_command", required=True, metavar="TEMPLATE_COMMAND" + ) + _list_parser = subparsers.add_parser("list", help="list templates") + vars_parser = subparsers.add_parser( + "vars", help="list variables used in all templates" + ) + vars_parser.add_argument( + "-r", + "--required", + help="only list really required one.", + action="store_true", + ) + _check_vars_parser = subparsers.add_parser( + "check-vars", + help="check if required vars are set", + ) + generate_parser = subparsers.add_parser("generate", help="generate templates") + generate_parser.add_argument( + "--no-dry-run", action="store_true", help="generate and write target files" + ) + + def run_command(self, args: Namespace) -> None: + if args.template_command == "list": + self._print_file_lists() + elif args.template_command == "vars": + self._print_vars(args.required) + elif args.template_command == "generate": + dry_run = not args.no_dry_run + self._generate_files(dry_run) + if dry_run: + print("Dry run successfully.") + print( + f"Will delete dir {self.generated_dir.full_path_str} if it exists." + ) diff --git a/python/cru/system.py b/python/cru/system.py new file mode 100644 index 0000000..f321717 --- /dev/null +++ b/python/cru/system.py @@ -0,0 +1,23 @@ +import os.path +import re + + +def check_debian_derivative_version(name: str) -> None | str: + if not os.path.isfile("/etc/os-release"): + return None + with open("/etc/os-release", "r") as f: + content = f.read() + if f"ID={name}" not in content: + return None + m = re.search(r'VERSION_ID="(.+)"', content) + if m is None: + return None + return m.group(1) + + +def check_ubuntu_version() -> None | str: + return check_debian_derivative_version("ubuntu") + + +def check_debian_version() -> None | str: + return check_debian_derivative_version("debian") diff --git a/python/cru/template.py b/python/cru/template.py new file mode 100644 index 0000000..3a70337 --- /dev/null +++ b/python/cru/template.py @@ -0,0 +1,209 @@ +from abc import ABCMeta, abstractmethod +from collections.abc import Callable, Mapping +from pathlib import Path +from string import Template +from typing import Generic, Self, TypeVar + +from ._iter import CruIterator +from ._error import CruException + +from .parsing import StrWrapperVarParser + + +class CruTemplateError(CruException): + pass + + +class CruTemplateBase(metaclass=ABCMeta): + def __init__(self, text: str): + self._text = text + self._variables: set[str] | None = None + + @abstractmethod + def _get_variables(self) -> set[str]: + raise NotImplementedError() + + @property + def text(self) -> str: + return self._text + + @property + def variables(self) -> set[str]: + if self._variables is None: + self._variables = self._get_variables() + return self._variables + + @property + def variable_count(self) -> int: + return len(self.variables) + + @property + def has_variables(self) -> bool: + return self.variable_count > 0 + + @abstractmethod + def _do_generate(self, mapping: dict[str, str]) -> str: + raise NotImplementedError() + + def _generate_partial( + self, mapping: Mapping[str, str], allow_unused: bool = True + ) -> str: + values = dict(mapping) + if not allow_unused and not len(set(values.keys() - self.variables)) != 0: + raise CruTemplateError("Unused variables.") + return self._do_generate(values) + + def generate_partial( + self, mapping: Mapping[str, str], allow_unused: bool = True + ) -> Self: + return self.__class__(self._generate_partial(mapping, allow_unused)) + + def generate(self, mapping: Mapping[str, str], allow_unused: bool = True) -> str: + values = dict(mapping) + if len(self.variables - values.keys()) != 0: + raise CruTemplateError( + f"Missing variables: {self.variables - values.keys()} ." + ) + return self._generate_partial(values, allow_unused) + + +class CruTemplate(CruTemplateBase): + def __init__(self, prefix: str, text: str): + super().__init__(text) + self._prefix = prefix + self._template = Template(text) + + def _get_variables(self) -> set[str]: + return ( + CruIterator(self._template.get_identifiers()) + .filter(lambda i: i.startswith(self.prefix)) + .to_set() + ) + + @property + def prefix(self) -> str: + return self._prefix + + @property + def py_template(self) -> Template: + return self._template + + @property + def all_variables(self) -> set[str]: + return set(self._template.get_identifiers()) + + def _do_generate(self, mapping: dict[str, str]) -> str: + return self._template.safe_substitute(mapping) + + +class CruStrWrapperTemplate(CruTemplateBase): + def __init__(self, text: str, wrapper: str = "@@"): + super().__init__(text) + self._wrapper = wrapper + self._tokens: StrWrapperVarParser.Result + + @property + def wrapper(self) -> str: + return self._wrapper + + def _get_variables(self): + self._tokens = StrWrapperVarParser(self.wrapper).parse(self.text) + return ( + self._tokens.cru_iter() + .filter(lambda t: t.is_var) + .map(lambda t: t.value) + .to_set() + ) + + def _do_generate(self, mapping): + return ( + self._tokens.cru_iter() + .map(lambda t: mapping[t.value] if t.is_var else t.value) + .join_str("") + ) + + +_Template = TypeVar("_Template", bound=CruTemplateBase) + + +class TemplateTree(Generic[_Template]): + def __init__( + self, + template_generator: Callable[[str], _Template], + source: str, + *, + template_file_suffix: str | None = ".template", + ): + """ + If template_file_suffix is not None, the files will be checked according to the + suffix of the file name. If the suffix matches, the file will be regarded as a + template file. Otherwise, it will be regarded as a non-template file. + Content of template file must contain variables that need to be replaced, while + content of non-template file may not contain any variables. + If either case is false, it generally means whether the file is a template is + wrongly handled. + """ + self._template_generator = template_generator + self._files: list[tuple[Path, _Template]] = [] + self._source = source + self._template_file_suffix = template_file_suffix + self._load() + + @property + def templates(self) -> list[tuple[Path, _Template]]: + return self._files + + @property + def source(self) -> str: + return self._source + + @property + def template_file_suffix(self) -> str | None: + return self._template_file_suffix + + @staticmethod + def _scan_files(root: str) -> list[Path]: + root_path = Path(root) + result: list[Path] = [] + for path in root_path.glob("**/*"): + if not path.is_file(): + continue + path = path.relative_to(root_path) + result.append(Path(path)) + return result + + def _load(self) -> None: + files = self._scan_files(self.source) + for file_path in files: + template_file = Path(self.source) / file_path + with open(template_file, "r") as f: + content = f.read() + template = self._template_generator(content) + if self.template_file_suffix is not None: + should_be_template = file_path.name.endswith(self.template_file_suffix) + if should_be_template and not template.has_variables: + raise CruTemplateError( + f"Template file {file_path} has no variables." + ) + elif not should_be_template and template.has_variables: + raise CruTemplateError(f"Non-template {file_path} has variables.") + self._files.append((file_path, template)) + + @property + def variables(self) -> set[str]: + s = set() + for _, template in self.templates: + s.update(template.variables) + return s + + def generate(self, variables: Mapping[str, str]) -> list[tuple[Path, str]]: + result: list[tuple[Path, str]] = [] + for path, template in self.templates: + if self.template_file_suffix is not None and path.name.endswith( + self.template_file_suffix + ): + path = path.parent / (path.name[: -len(self.template_file_suffix)]) + + text = template.generate(variables) + result.append((path, text)) + return result diff --git a/python/cru/tool.py b/python/cru/tool.py new file mode 100644 index 0000000..377f5d7 --- /dev/null +++ b/python/cru/tool.py @@ -0,0 +1,82 @@ +import shutil +import subprocess +from typing import Any +from collections.abc import Iterable + +from ._error import CruException + + +class CruExternalToolError(CruException): + def __init__(self, message: str, tool: str, *args, **kwargs) -> None: + super().__init__(message, *args, **kwargs) + self._tool = tool + + @property + def tool(self) -> str: + return self._tool + + +class CruExternalToolNotFoundError(CruExternalToolError): + def __init__(self, message: str | None, tool: str, *args, **kwargs) -> None: + super().__init__( + message or f"Could not find binary for {tool}.", tool, *args, **kwargs + ) + + +class CruExternalToolRunError(CruExternalToolError): + def __init__( + self, + message: str, + tool: str, + tool_args: Iterable[str], + tool_error: Any, + *args, + **kwargs, + ) -> None: + super().__init__(message, tool, *args, **kwargs) + self._tool_args = list(tool_args) + self._tool_error = tool_error + + @property + def tool_args(self) -> list[str]: + return self._tool_args + + @property + def tool_error(self) -> Any: + return self._tool_error + + +class ExternalTool: + def __init__(self, bin: str) -> None: + self._bin = bin + + @property + def bin(self) -> str: + return self._bin + + @bin.setter + def bin(self, value: str) -> None: + self._bin = value + + @property + def bin_path(self) -> str: + real_bin = shutil.which(self.bin) + if not real_bin: + raise CruExternalToolNotFoundError(None, self.bin) + return real_bin + + def run( + self, *process_args: str, **subprocess_kwargs + ) -> subprocess.CompletedProcess: + try: + return subprocess.run( + [self.bin_path] + list(process_args), **subprocess_kwargs + ) + except subprocess.CalledProcessError as e: + raise CruExternalToolError("Subprocess failed.", self.bin) from e + except OSError as e: + raise CruExternalToolError("Failed to start subprocess", self.bin) from e + + def run_get_output(self, *process_args: str, **subprocess_kwargs) -> Any: + process = self.run(*process_args, capture_output=True, **subprocess_kwargs) + return process.stdout diff --git a/python/cru/value.py b/python/cru/value.py new file mode 100644 index 0000000..9c03219 --- /dev/null +++ b/python/cru/value.py @@ -0,0 +1,292 @@ +from __future__ import annotations + +import random +import secrets +import string +import uuid +from abc import abstractmethod, ABCMeta +from collections.abc import Callable +from typing import Any, ClassVar, TypeVar, Generic + +from ._error import CruException + + +def _str_case_in(s: str, case: bool, str_list: list[str]) -> bool: + if case: + return s in str_list + else: + return s.lower() in [s.lower() for s in str_list] + + +_T = TypeVar("_T") + + +class CruValueTypeError(CruException): + def __init__( + self, + message: str, + value: Any, + value_type: ValueType | None, + *args, + **kwargs, + ): + super().__init__( + message, + *args, + **kwargs, + ) + self._value = value + self._value_type = value_type + + @property + def value(self) -> Any: + return self._value + + @property + def value_type(self) -> ValueType | None: + return self._value_type + + +class ValueType(Generic[_T], metaclass=ABCMeta): + def __init__(self, name: str, _type: type[_T]) -> None: + self._name = name + self._type = _type + + @property + def name(self) -> str: + return self._name + + @property + def type(self) -> type[_T]: + return self._type + + def check_value_type(self, value: Any) -> None: + if not isinstance(value, self.type): + raise CruValueTypeError("Type of value is wrong.", value, self) + + def _do_check_value(self, value: Any) -> _T: + return value + + def check_value(self, value: Any) -> _T: + self.check_value_type(value) + return self._do_check_value(value) + + @abstractmethod + def _do_check_str_format(self, s: str) -> None: + raise NotImplementedError() + + def check_str_format(self, s: str) -> None: + if not isinstance(s, str): + raise CruValueTypeError("Try to check format on a non-str.", s, self) + self._do_check_str_format(s) + + @abstractmethod + def _do_convert_value_to_str(self, value: _T) -> str: + raise NotImplementedError() + + def convert_value_to_str(self, value: _T) -> str: + self.check_value(value) + return self._do_convert_value_to_str(value) + + @abstractmethod + def _do_convert_str_to_value(self, s: str) -> _T: + raise NotImplementedError() + + def convert_str_to_value(self, s: str) -> _T: + self.check_str_format(s) + return self._do_convert_str_to_value(s) + + def check_value_or_try_convert_from_str(self, value_or_str: Any) -> _T: + try: + return self.check_value(value_or_str) + except CruValueTypeError: + if isinstance(value_or_str, str): + return self.convert_str_to_value(value_or_str) + else: + raise + + def create_default_value(self) -> _T: + return self.type() + + +class TextValueType(ValueType[str]): + def __init__(self) -> None: + super().__init__("text", str) + + def _do_check_str_format(self, _s): + return + + def _do_convert_value_to_str(self, value): + return value + + def _do_convert_str_to_value(self, s): + return s + + +class IntegerValueType(ValueType[int]): + def __init__(self) -> None: + super().__init__("integer", int) + + def _do_check_str_format(self, s): + try: + int(s) + except ValueError as e: + raise CruValueTypeError("Invalid integer format.", s, self) from e + + def _do_convert_value_to_str(self, value): + return str(value) + + def _do_convert_str_to_value(self, s): + return int(s) + + +class FloatValueType(ValueType[float]): + def __init__(self) -> None: + super().__init__("float", float) + + def _do_check_str_format(self, s): + try: + float(s) + except ValueError as e: + raise CruValueTypeError("Invalid float format.", s, self) from e + + def _do_convert_value_to_str(self, value): + return str(value) + + def _do_convert_str_to_value(self, s): + return float(s) + + +class BooleanValueType(ValueType[bool]): + DEFAULT_TRUE_LIST: ClassVar[list[str]] = ["true", "yes", "y", "on", "1"] + DEFAULT_FALSE_LIST: ClassVar[list[str]] = ["false", "no", "n", "off", "0"] + + def __init__( + self, + *, + case_sensitive=False, + true_list: None | list[str] = None, + false_list: None | list[str] = None, + ) -> None: + super().__init__("boolean", bool) + self._case_sensitive = case_sensitive + self._valid_true_strs: list[str] = ( + true_list or BooleanValueType.DEFAULT_TRUE_LIST + ) + self._valid_false_strs: list[str] = ( + false_list or BooleanValueType.DEFAULT_FALSE_LIST + ) + + @property + def case_sensitive(self) -> bool: + return self._case_sensitive + + @property + def valid_true_strs(self) -> list[str]: + return self._valid_true_strs + + @property + def valid_false_strs(self) -> list[str]: + return self._valid_false_strs + + @property + def valid_boolean_strs(self) -> list[str]: + return self._valid_true_strs + self._valid_false_strs + + def _do_check_str_format(self, s): + if not _str_case_in(s, self.case_sensitive, self.valid_boolean_strs): + raise CruValueTypeError("Invalid boolean format.", s, self) + + def _do_convert_value_to_str(self, value): + return self._valid_true_strs[0] if value else self._valid_false_strs[0] + + def _do_convert_str_to_value(self, s): + return _str_case_in(s, self.case_sensitive, self._valid_true_strs) + + def create_default_value(self): + return self.valid_false_strs[0] + + +class EnumValueType(ValueType[str]): + def __init__(self, valid_values: list[str], /, case_sensitive=False) -> None: + super().__init__(f"enum({'|'.join(valid_values)})", str) + self._case_sensitive = case_sensitive + self._valid_values = valid_values + + @property + def case_sensitive(self) -> bool: + return self._case_sensitive + + @property + def valid_values(self) -> list[str]: + return self._valid_values + + def _do_check_value(self, value): + self._do_check_str_format(value) + + def _do_check_str_format(self, s): + if not _str_case_in(s, self.case_sensitive, self.valid_values): + raise CruValueTypeError("Invalid enum value", s, self) + + def _do_convert_value_to_str(self, value): + return value + + def _do_convert_str_to_value(self, s): + return s + + def create_default_value(self): + return self.valid_values[0] + + +TEXT_VALUE_TYPE = TextValueType() +INTEGER_VALUE_TYPE = IntegerValueType() +BOOLEAN_VALUE_TYPE = BooleanValueType() + + +class ValueGeneratorBase(Generic[_T], metaclass=ABCMeta): + @abstractmethod + def generate(self) -> _T: + raise NotImplementedError() + + def __call__(self) -> _T: + return self.generate() + + +class ValueGenerator(ValueGeneratorBase[_T]): + def __init__(self, generate_func: Callable[[], _T]) -> None: + self._generate_func = generate_func + + @property + def generate_func(self) -> Callable[[], _T]: + return self._generate_func + + def generate(self) -> _T: + return self._generate_func() + + +class UuidValueGenerator(ValueGeneratorBase[str]): + def generate(self): + return str(uuid.uuid4()) + + +class RandomStringValueGenerator(ValueGeneratorBase[str]): + def __init__(self, length: int, secure: bool) -> None: + self._length = length + self._secure = secure + + @property + def length(self) -> int: + return self._length + + @property + def secure(self) -> bool: + return self._secure + + def generate(self): + random_func = secrets.choice if self._secure else random.choice + characters = string.ascii_letters + string.digits + random_string = "".join(random_func(characters) for _ in range(self._length)) + return random_string + + +UUID_VALUE_GENERATOR = UuidValueGenerator() diff --git a/python/poetry.lock b/python/poetry.lock new file mode 100644 index 0000000..4338200 --- /dev/null +++ b/python/poetry.lock @@ -0,0 +1,111 @@ +# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. + +[[package]] +name = "mypy" +version = "1.15.0" +description = "Optional static typing for Python" +optional = false +python-versions = ">=3.9" +groups = ["dev"] +files = [ + {file = "mypy-1.15.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:979e4e1a006511dacf628e36fadfecbcc0160a8af6ca7dad2f5025529e082c13"}, + {file = "mypy-1.15.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c4bb0e1bd29f7d34efcccd71cf733580191e9a264a2202b0239da95984c5b559"}, + {file = "mypy-1.15.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:be68172e9fd9ad8fb876c6389f16d1c1b5f100ffa779f77b1fb2176fcc9ab95b"}, + {file = "mypy-1.15.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c7be1e46525adfa0d97681432ee9fcd61a3964c2446795714699a998d193f1a3"}, + {file = "mypy-1.15.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:2e2c2e6d3593f6451b18588848e66260ff62ccca522dd231cd4dd59b0160668b"}, + {file = "mypy-1.15.0-cp310-cp310-win_amd64.whl", hash = "sha256:6983aae8b2f653e098edb77f893f7b6aca69f6cffb19b2cc7443f23cce5f4828"}, + {file = "mypy-1.15.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:2922d42e16d6de288022e5ca321cd0618b238cfc5570e0263e5ba0a77dbef56f"}, + {file = "mypy-1.15.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2ee2d57e01a7c35de00f4634ba1bbf015185b219e4dc5909e281016df43f5ee5"}, + {file = "mypy-1.15.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:973500e0774b85d9689715feeffcc980193086551110fd678ebe1f4342fb7c5e"}, + {file = "mypy-1.15.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:5a95fb17c13e29d2d5195869262f8125dfdb5c134dc8d9a9d0aecf7525b10c2c"}, + {file = "mypy-1.15.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:1905f494bfd7d85a23a88c5d97840888a7bd516545fc5aaedff0267e0bb54e2f"}, + {file = "mypy-1.15.0-cp311-cp311-win_amd64.whl", hash = "sha256:c9817fa23833ff189db061e6d2eff49b2f3b6ed9856b4a0a73046e41932d744f"}, + {file = "mypy-1.15.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:aea39e0583d05124836ea645f412e88a5c7d0fd77a6d694b60d9b6b2d9f184fd"}, + {file = "mypy-1.15.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:2f2147ab812b75e5b5499b01ade1f4a81489a147c01585cda36019102538615f"}, + {file = "mypy-1.15.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ce436f4c6d218a070048ed6a44c0bbb10cd2cc5e272b29e7845f6a2f57ee4464"}, + {file = "mypy-1.15.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8023ff13985661b50a5928fc7a5ca15f3d1affb41e5f0a9952cb68ef090b31ee"}, + {file = "mypy-1.15.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:1124a18bc11a6a62887e3e137f37f53fbae476dc36c185d549d4f837a2a6a14e"}, + {file = "mypy-1.15.0-cp312-cp312-win_amd64.whl", hash = "sha256:171a9ca9a40cd1843abeca0e405bc1940cd9b305eaeea2dda769ba096932bb22"}, + {file = "mypy-1.15.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:93faf3fdb04768d44bf28693293f3904bbb555d076b781ad2530214ee53e3445"}, + {file = "mypy-1.15.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:811aeccadfb730024c5d3e326b2fbe9249bb7413553f15499a4050f7c30e801d"}, + {file = "mypy-1.15.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:98b7b9b9aedb65fe628c62a6dc57f6d5088ef2dfca37903a7d9ee374d03acca5"}, + {file = "mypy-1.15.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c43a7682e24b4f576d93072216bf56eeff70d9140241f9edec0c104d0c515036"}, + {file = "mypy-1.15.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:baefc32840a9f00babd83251560e0ae1573e2f9d1b067719479bfb0e987c6357"}, + {file = "mypy-1.15.0-cp313-cp313-win_amd64.whl", hash = "sha256:b9378e2c00146c44793c98b8d5a61039a048e31f429fb0eb546d93f4b000bedf"}, + {file = "mypy-1.15.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:e601a7fa172c2131bff456bb3ee08a88360760d0d2f8cbd7a75a65497e2df078"}, + {file = "mypy-1.15.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:712e962a6357634fef20412699a3655c610110e01cdaa6180acec7fc9f8513ba"}, + {file = "mypy-1.15.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:f95579473af29ab73a10bada2f9722856792a36ec5af5399b653aa28360290a5"}, + {file = "mypy-1.15.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8f8722560a14cde92fdb1e31597760dc35f9f5524cce17836c0d22841830fd5b"}, + {file = "mypy-1.15.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:1fbb8da62dc352133d7d7ca90ed2fb0e9d42bb1a32724c287d3c76c58cbaa9c2"}, + {file = "mypy-1.15.0-cp39-cp39-win_amd64.whl", hash = "sha256:d10d994b41fb3497719bbf866f227b3489048ea4bbbb5015357db306249f7980"}, + {file = "mypy-1.15.0-py3-none-any.whl", hash = "sha256:5469affef548bd1895d86d3bf10ce2b44e33d86923c29e4d675b3e323437ea3e"}, + {file = "mypy-1.15.0.tar.gz", hash = "sha256:404534629d51d3efea5c800ee7c42b72a6554d6c400e6a79eafe15d11341fd43"}, +] + +[package.dependencies] +mypy_extensions = ">=1.0.0" +typing_extensions = ">=4.6.0" + +[package.extras] +dmypy = ["psutil (>=4.0)"] +faster-cache = ["orjson"] +install-types = ["pip"] +mypyc = ["setuptools (>=50)"] +reports = ["lxml"] + +[[package]] +name = "mypy-extensions" +version = "1.0.0" +description = "Type system extensions for programs checked with the mypy type checker." +optional = false +python-versions = ">=3.5" +groups = ["dev"] +files = [ + {file = "mypy_extensions-1.0.0-py3-none-any.whl", hash = "sha256:4392f6c0eb8a5668a69e23d168ffa70f0be9ccfd32b5cc2d26a34ae5b844552d"}, + {file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"}, +] + +[[package]] +name = "ruff" +version = "0.9.6" +description = "An extremely fast Python linter and code formatter, written in Rust." +optional = false +python-versions = ">=3.7" +groups = ["dev"] +files = [ + {file = "ruff-0.9.6-py3-none-linux_armv6l.whl", hash = "sha256:2f218f356dd2d995839f1941322ff021c72a492c470f0b26a34f844c29cdf5ba"}, + {file = "ruff-0.9.6-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:b908ff4df65dad7b251c9968a2e4560836d8f5487c2f0cc238321ed951ea0504"}, + {file = "ruff-0.9.6-py3-none-macosx_11_0_arm64.whl", hash = "sha256:b109c0ad2ececf42e75fa99dc4043ff72a357436bb171900714a9ea581ddef83"}, + {file = "ruff-0.9.6-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1de4367cca3dac99bcbd15c161404e849bb0bfd543664db39232648dc00112dc"}, + {file = "ruff-0.9.6-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:ac3ee4d7c2c92ddfdaedf0bf31b2b176fa7aa8950efc454628d477394d35638b"}, + {file = "ruff-0.9.6-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:5dc1edd1775270e6aa2386119aea692039781429f0be1e0949ea5884e011aa8e"}, + {file = "ruff-0.9.6-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:4a091729086dffa4bd070aa5dab7e39cc6b9d62eb2bef8f3d91172d30d599666"}, + {file = "ruff-0.9.6-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:d1bbc6808bf7b15796cef0815e1dfb796fbd383e7dbd4334709642649625e7c5"}, + {file = "ruff-0.9.6-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:589d1d9f25b5754ff230dce914a174a7c951a85a4e9270613a2b74231fdac2f5"}, + {file = "ruff-0.9.6-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dc61dd5131742e21103fbbdcad683a8813be0e3c204472d520d9a5021ca8b217"}, + {file = "ruff-0.9.6-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:5e2d9126161d0357e5c8f30b0bd6168d2c3872372f14481136d13de9937f79b6"}, + {file = "ruff-0.9.6-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:68660eab1a8e65babb5229a1f97b46e3120923757a68b5413d8561f8a85d4897"}, + {file = "ruff-0.9.6-py3-none-musllinux_1_2_i686.whl", hash = "sha256:c4cae6c4cc7b9b4017c71114115db0445b00a16de3bcde0946273e8392856f08"}, + {file = "ruff-0.9.6-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:19f505b643228b417c1111a2a536424ddde0db4ef9023b9e04a46ed8a1cb4656"}, + {file = "ruff-0.9.6-py3-none-win32.whl", hash = "sha256:194d8402bceef1b31164909540a597e0d913c0e4952015a5b40e28c146121b5d"}, + {file = "ruff-0.9.6-py3-none-win_amd64.whl", hash = "sha256:03482d5c09d90d4ee3f40d97578423698ad895c87314c4de39ed2af945633caa"}, + {file = "ruff-0.9.6-py3-none-win_arm64.whl", hash = "sha256:0e2bb706a2be7ddfea4a4af918562fdc1bcb16df255e5fa595bbd800ce322a5a"}, + {file = "ruff-0.9.6.tar.gz", hash = "sha256:81761592f72b620ec8fa1068a6fd00e98a5ebee342a3642efd84454f3031dca9"}, +] + +[[package]] +name = "typing-extensions" +version = "4.12.2" +description = "Backported and Experimental Type Hints for Python 3.8+" +optional = false +python-versions = ">=3.8" +groups = ["dev"] +files = [ + {file = "typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d"}, + {file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"}, +] + +[metadata] +lock-version = "2.1" +python-versions = "^3.11" +content-hash = "674a21dbda993a1ee761e2e6e2f13ccece8289336a83fd0a154285eac48f3a76" diff --git a/python/pyproject.toml b/python/pyproject.toml new file mode 100644 index 0000000..28c753e --- /dev/null +++ b/python/pyproject.toml @@ -0,0 +1,19 @@ +[project] +name = "cru" +version = "0.1.0" +requires-python = ">=3.11" +license = "MIT" + +[tool.poetry] +package-mode = false + +[tool.poetry.group.dev.dependencies] +mypy = "^1.13.0" +ruff = "^0.9.6" + +[tool.ruff.lint] +select = ["E", "F", "B"] + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" diff --git a/services/.gitignore b/services/.gitignore index b284dd9..e324eac 100644 --- a/services/.gitignore +++ b/services/.gitignore @@ -1,5 +1 @@ -__pycache__ -.venv -.mypy_cache - /generated diff --git a/services/.python-version b/services/.python-version deleted file mode 100644 index 2c07333..0000000 --- a/services/.python-version +++ /dev/null @@ -1 +0,0 @@ -3.11 diff --git a/services/gen-tplt b/services/gen-tplt deleted file mode 100755 index 38ceb33..0000000 --- a/services/gen-tplt +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env bash - -set -e - -script_dir="$(dirname "$0")" - -exec "$script_dir/manage" "template" "generate" "$@" diff --git a/services/git-add-user b/services/git-add-user deleted file mode 100755 index 2e500d2..0000000 --- a/services/git-add-user +++ /dev/null @@ -1,14 +0,0 @@ -#! /usr/bin/bash - -set -e - -script_dir="$(dirname "$0")" -. "$script_dir/common.bash" - -ps_dir="$CRUPEST_PROJECT_DIR/$CRUPEST_DATA_DIR/git/private" -ps_file="$ps_dir/user-info" -echo "Password file at $ps_file" -[[ -d "$ps_dir" ]] || mkdir -p "$ps_dir" -[[ -f "$ps_file" ]] || touch "$ps_file" - -exec docker run -it --rm -v "$ps_file:/user-info" httpd htpasswd "/user-info" "$1" diff --git a/services/manage b/services/manage index 01f3145..4589475 100755 --- a/services/manage +++ b/services/manage @@ -2,13 +2,23 @@ set -e -python3 --version > /dev/null 2>&1 || ( +python3 --version >/dev/null 2>&1 || ( echo Error: failed to run Python with python3 --version. exit 1 ) script_dir="$(dirname "$0")" -. "$script_dir/common.bash" -export PYTHONPATH="$CRUPEST_PROJECT_DIR/$CRUPEST_SERVICES_DIR:$PYTHONPATH" -python3 -m manager.service "$@" +# shellcheck disable=SC2046 +export $(xargs <"${script_dir:?}/base-config") + +CRUPEST_PROJECT_DIR="$(realpath "$script_dir/..")" +export CRUPEST_PROJECT_DIR + +export PYTHONPATH="$CRUPEST_PROJECT_DIR/python:$PYTHONPATH" + +if [[ "$#" != "0" ]] && [[ "$1" == "gen-tmpl" ]]; then + python3 -m cru.service template generate "${@:2}" +else + python3 -m cru.service "$@" +fi diff --git a/services/manager/__init__.py b/services/manager/__init__.py deleted file mode 100644 index 17799a9..0000000 --- a/services/manager/__init__.py +++ /dev/null @@ -1,60 +0,0 @@ -import sys - -from ._base import CRU, CruNamespaceError, CRU_NAME_PREFIXES -from ._error import ( - CruException, - CruLogicError, - CruInternalError, - CruUnreachableError, - cru_unreachable, -) -from ._const import ( - CruConstantBase, - CruDontChange, - CruNotFound, - CruNoValue, - CruPlaceholder, - CruUseDefault, -) -from ._func import CruFunction -from ._iter import CruIterable, CruIterator -from ._event import CruEvent, CruEventHandlerToken -from ._type import CruTypeSet, CruTypeCheckError - - -class CruInitError(CruException): - pass - - -def check_python_version(required_version=(3, 11)): - if sys.version_info < required_version: - raise CruInitError(f"Python version must be >= {required_version}!") - - -check_python_version() - -__all__ = [ - "CRU", - "CruNamespaceError", - "CRU_NAME_PREFIXES", - "check_python_version", - "CruException", - "CruInternalError", - "CruLogicError", - "CruUnreachableError", - "cru_unreachable", - "CruInitError", - "CruConstantBase", - "CruDontChange", - "CruNotFound", - "CruNoValue", - "CruPlaceholder", - "CruUseDefault", - "CruFunction", - "CruIterable", - "CruIterator", - "CruEvent", - "CruEventHandlerToken", - "CruTypeSet", - "CruTypeCheckError", -] diff --git a/services/manager/_base.py b/services/manager/_base.py deleted file mode 100644 index 2599d8f..0000000 --- a/services/manager/_base.py +++ /dev/null @@ -1,101 +0,0 @@ -from typing import Any - -from ._helper import remove_none -from ._error import CruException - - -class CruNamespaceError(CruException): - """Raised when a namespace is not found.""" - - -class _Cru: - NAME_PREFIXES = ("CRU_", "Cru", "cru_") - - def __init__(self) -> None: - self._d: dict[str, Any] = {} - - def all_names(self) -> list[str]: - return list(self._d.keys()) - - def get(self, name: str) -> Any: - return self._d[name] - - def has_name(self, name: str) -> bool: - return name in self._d - - @staticmethod - def _maybe_remove_prefix(name: str) -> str | None: - for prefix in _Cru.NAME_PREFIXES: - if name.startswith(prefix): - return name[len(prefix) :] - return None - - def _check_name_exist(self, *names: str | None) -> None: - for name in names: - if name is None: - continue - if self.has_name(name): - raise CruNamespaceError(f"Name {name} exists in CRU.") - - @staticmethod - def check_name_format(name: str) -> tuple[str, str]: - no_prefix_name = _Cru._maybe_remove_prefix(name) - if no_prefix_name is None: - raise CruNamespaceError( - f"Name {name} is not prefixed with any of {_Cru.NAME_PREFIXES}." - ) - return name, no_prefix_name - - @staticmethod - def _check_object_name(o) -> tuple[str, str]: - return _Cru.check_name_format(o.__name__) - - def _do_add(self, o, *names: str | None) -> list[str]: - name_list: list[str] = remove_none(names) - for name in name_list: - self._d[name] = o - return name_list - - def add(self, o, name: str | None) -> tuple[str, str | None]: - no_prefix_name: str | None - if name is None: - name, no_prefix_name = self._check_object_name(o) - else: - no_prefix_name = self._maybe_remove_prefix(name) - - self._check_name_exist(name, no_prefix_name) - self._do_add(o, name, no_prefix_name) - return name, no_prefix_name - - def add_with_alias(self, o, name: str | None = None, *aliases: str) -> list[str]: - final_names: list[str | None] = [] - no_prefix_name: str | None - if name is None: - name, no_prefix_name = self._check_object_name(o) - self._check_name_exist(name, no_prefix_name) - final_names.extend([name, no_prefix_name]) - for alias in aliases: - no_prefix_name = self._maybe_remove_prefix(alias) - self._check_name_exist(alias, no_prefix_name) - final_names.extend([alias, no_prefix_name]) - - return self._do_add(o, *final_names) - - def add_objects(self, *objects): - final_list = [] - for o in objects: - name, no_prefix_name = self._check_object_name(o) - self._check_name_exist(name, no_prefix_name) - final_list.append((o, name, no_prefix_name)) - for o, name, no_prefix_name in final_list: - self._do_add(o, name, no_prefix_name) - - def __getitem__(self, item): - return self.get(item) - - def __getattr__(self, item): - return self.get(item) - - -CRU_NAME_PREFIXES = _Cru.NAME_PREFIXES -CRU = _Cru() diff --git a/services/manager/_const.py b/services/manager/_const.py deleted file mode 100644 index 8246b35..0000000 --- a/services/manager/_const.py +++ /dev/null @@ -1,49 +0,0 @@ -from enum import Enum, auto -from typing import Self, TypeGuard, TypeVar - -from ._base import CRU - -_T = TypeVar("_T") - - -class CruConstantBase(Enum): - @classmethod - def check(cls, v: _T | Self) -> TypeGuard[Self]: - return isinstance(v, cls) - - @classmethod - def check_not(cls, v: _T | Self) -> TypeGuard[_T]: - return not cls.check(v) - - @classmethod - def value(cls) -> Self: - return cls.VALUE # type: ignore - - -class CruNotFound(CruConstantBase): - VALUE = auto() - - -class CruUseDefault(CruConstantBase): - VALUE = auto() - - -class CruDontChange(CruConstantBase): - VALUE = auto() - - -class CruNoValue(CruConstantBase): - VALUE = auto() - - -class CruPlaceholder(CruConstantBase): - VALUE = auto() - - -CRU.add_objects( - CruNotFound, - CruUseDefault, - CruDontChange, - CruNoValue, - CruPlaceholder, -) diff --git a/services/manager/_decorator.py b/services/manager/_decorator.py deleted file mode 100644 index 137fc05..0000000 --- a/services/manager/_decorator.py +++ /dev/null @@ -1,97 +0,0 @@ -from __future__ import annotations - -from collections.abc import Callable -from typing import ( - Concatenate, - Generic, - ParamSpec, - TypeVar, - cast, -) - -from ._base import CRU - -_P = ParamSpec("_P") -_T = TypeVar("_T") -_O = TypeVar("_O") -_R = TypeVar("_R") - - -class CruDecorator: - - class ConvertResult(Generic[_T, _O]): - def __init__( - self, - converter: Callable[[_T], _O], - ) -> None: - self.converter = converter - - def __call__(self, origin: Callable[_P, _T]) -> Callable[_P, _O]: - converter = self.converter - - def real_impl(*args: _P.args, **kwargs: _P.kwargs) -> _O: - return converter(origin(*args, **kwargs)) - - return real_impl - - class ImplementedBy(Generic[_T, _O, _P, _R]): - def __init__( - self, - impl: Callable[Concatenate[_O, _P], _R], - converter: Callable[[_T], _O], - ) -> None: - self.impl = impl - self.converter = converter - - def __call__( - self, _origin: Callable[[_T], None] - ) -> Callable[Concatenate[_T, _P], _R]: - converter = self.converter - impl = self.impl - - def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R: - return cast(Callable[Concatenate[_O, _P], _R], impl)( - converter(_self), *args, **kwargs - ) - - return real_impl - - @staticmethod - def create_factory(converter: Callable[[_T], _O]) -> Callable[ - [Callable[Concatenate[_O, _P], _R]], - CruDecorator.ImplementedBy[_T, _O, _P, _R], - ]: - def create( - m: Callable[Concatenate[_O, _P], _R], - ) -> CruDecorator.ImplementedBy[_T, _O, _P, _R]: - return CruDecorator.ImplementedBy(m, converter) - - return create - - class ImplementedByNoSelf(Generic[_P, _R]): - def __init__(self, impl: Callable[_P, _R]) -> None: - self.impl = impl - - def __call__( - self, _origin: Callable[[_T], None] - ) -> Callable[Concatenate[_T, _P], _R]: - impl = self.impl - - def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R: - return cast(Callable[_P, _R], impl)(*args, **kwargs) - - return real_impl - - @staticmethod - def create_factory() -> ( - Callable[[Callable[_P, _R]], CruDecorator.ImplementedByNoSelf[_P, _R]] - ): - def create( - m: Callable[_P, _R], - ) -> CruDecorator.ImplementedByNoSelf[_P, _R]: - return CruDecorator.ImplementedByNoSelf(m) - - return create - - -CRU.add_objects(CruDecorator) diff --git a/services/manager/_error.py b/services/manager/_error.py deleted file mode 100644 index e53c787..0000000 --- a/services/manager/_error.py +++ /dev/null @@ -1,89 +0,0 @@ -from __future__ import annotations - -from typing import NoReturn, cast, overload - - -class CruException(Exception): - """Base exception class of all exceptions in cru.""" - - @overload - def __init__( - self, - message: None = None, - *args, - user_message: str, - **kwargs, - ): ... - - @overload - def __init__( - self, - message: str, - *args, - user_message: str | None = None, - **kwargs, - ): ... - - def __init__( - self, - message: str | None = None, - *args, - user_message: str | None = None, - **kwargs, - ): - if message is None: - message = user_message - - super().__init__( - message, - *args, - **kwargs, - ) - self._message: str - self._message = cast(str, message) - self._user_message = user_message - - @property - def message(self) -> str: - return self._message - - def get_user_message(self) -> str | None: - return self._user_message - - def get_message(self, use_user: bool = True) -> str: - if use_user and self._user_message is not None: - return self._user_message - else: - return self._message - - @property - def is_internal(self) -> bool: - return False - - @property - def is_logic_error(self) -> bool: - return False - - -class CruLogicError(CruException): - """Raised when a logic error occurs.""" - - @property - def is_logic_error(self) -> bool: - return True - - -class CruInternalError(CruException): - """Raised when an internal error occurs.""" - - @property - def is_internal(self) -> bool: - return True - - -class CruUnreachableError(CruInternalError): - """Raised when a code path is unreachable.""" - - -def cru_unreachable() -> NoReturn: - raise CruUnreachableError("Code should not reach here!") diff --git a/services/manager/_event.py b/services/manager/_event.py deleted file mode 100644 index 51a794c..0000000 --- a/services/manager/_event.py +++ /dev/null @@ -1,61 +0,0 @@ -from __future__ import annotations - -from collections.abc import Callable -from typing import Generic, ParamSpec, TypeVar - -from .list import CruList - -_P = ParamSpec("_P") -_R = TypeVar("_R") - - -class CruEventHandlerToken(Generic[_P, _R]): - def __init__( - self, event: CruEvent, handler: Callable[_P, _R], once: bool = False - ) -> None: - self._event = event - self._handler = handler - self._once = once - - @property - def event(self) -> CruEvent: - return self._event - - @property - def handler(self) -> Callable[_P, _R]: - return self._handler - - @property - def once(self) -> bool: - return self._once - - -class CruEvent(Generic[_P, _R]): - def __init__(self, name: str) -> None: - self._name = name - self._tokens: CruList[CruEventHandlerToken] = CruList() - - def register( - self, handler: Callable[_P, _R], once: bool = False - ) -> CruEventHandlerToken: - token = CruEventHandlerToken(self, handler, once) - self._tokens.append(token) - return token - - def unregister(self, *handlers: CruEventHandlerToken | Callable[_P, _R]) -> int: - old_length = len(self._tokens) - self._tokens.reset( - self._tokens.as_cru_iterator().filter( - (lambda t: t in handlers or t.handler in handlers) - ) - ) - return old_length - len(self._tokens) - - def trigger(self, *args: _P.args, **kwargs: _P.kwargs) -> CruList[_R]: - results = CruList( - self._tokens.as_cru_iterator() - .transform(lambda t: t.handler(*args, **kwargs)) - .to_list() - ) - self._tokens.reset(self._tokens.as_cru_iterator().filter(lambda t: not t.once)) - return results diff --git a/services/manager/_func.py b/services/manager/_func.py deleted file mode 100644 index fc57802..0000000 --- a/services/manager/_func.py +++ /dev/null @@ -1,172 +0,0 @@ -from __future__ import annotations - -from collections.abc import Callable, Iterable -from enum import Flag, auto -from typing import ( - Any, - Generic, - Literal, - ParamSpec, - TypeAlias, - TypeVar, -) - - -from ._base import CRU -from ._const import CruPlaceholder - -_P = ParamSpec("_P") -_P1 = ParamSpec("_P1") -_T = TypeVar("_T") - - -class _Dec: - @staticmethod - def wrap( - origin: Callable[_P, Callable[_P1, _T]] - ) -> Callable[_P, _Wrapper[_P1, _T]]: - def _wrapped(*args: _P.args, **kwargs: _P.kwargs) -> _Wrapper[_P1, _T]: - return _Wrapper(origin(*args, **kwargs)) - - return _wrapped - - -class _RawBase: - @staticmethod - def none(*_v, **_kwargs) -> None: - return None - - @staticmethod - def true(*_v, **_kwargs) -> Literal[True]: - return True - - @staticmethod - def false(*_v, **_kwargs) -> Literal[False]: - return False - - @staticmethod - def identity(v: _T) -> _T: - return v - - @staticmethod - def only_you(v: _T, *_v, **_kwargs) -> _T: - return v - - @staticmethod - def equal(a: Any, b: Any) -> bool: - return a == b - - @staticmethod - def not_equal(a: Any, b: Any) -> bool: - return a != b - - @staticmethod - def not_(v: Any) -> Any: - return not v - - -class _Wrapper(Generic[_P, _T]): - def __init__(self, f: Callable[_P, _T]): - self._f = f - - @property - def me(self) -> Callable[_P, _T]: - return self._f - - def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T: - return self._f(*args, **kwargs) - - @_Dec.wrap - def bind(self, *bind_args, **bind_kwargs) -> Callable[..., _T]: - func = self.me - - def bound_func(*args, **kwargs): - popped = 0 - real_args = [] - for arg in bind_args: - if CruPlaceholder.check(arg): - real_args.append(args[popped]) - popped += 1 - else: - real_args.append(arg) - real_args.extend(args[popped:]) - return func(*real_args, **(bind_kwargs | kwargs)) - - return bound_func - - class ChainMode(Flag): - ARGS = auto() - KWARGS = auto() - BOTH = ARGS | KWARGS - - ArgsChainableCallable: TypeAlias = Callable[..., Iterable[Any]] - KwargsChainableCallable: TypeAlias = Callable[..., Iterable[tuple[str, Any]]] - ChainableCallable: TypeAlias = Callable[ - ..., tuple[Iterable[Any], Iterable[tuple[str, Any]]] - ] - - @_Dec.wrap - def chain_with_args( - self, funcs: Iterable[ArgsChainableCallable], *bind_args, **bind_kwargs - ) -> ArgsChainableCallable: - def chained_func(*args): - args = self.bind(*bind_args, **bind_kwargs)(*args) - - for func in funcs: - args = _Wrapper(func).bind(*bind_args, **bind_kwargs)(*args) - return args - - return chained_func - - @_Dec.wrap - def chain_with_kwargs( - self, funcs: Iterable[KwargsChainableCallable], *bind_args, **bind_kwargs - ) -> KwargsChainableCallable: - def chained_func(**kwargs): - kwargs = self.bind(*bind_args, **bind_kwargs)(**kwargs) - for func in funcs: - kwargs = _Wrapper(func).bind(func, *bind_args, **bind_kwargs)(**kwargs) - return kwargs - - return chained_func - - @_Dec.wrap - def chain_with_both( - self, funcs: Iterable[ChainableCallable], *bind_args, **bind_kwargs - ) -> ChainableCallable: - def chained_func(*args, **kwargs): - for func in funcs: - args, kwargs = _Wrapper(func).bind(func, *bind_args, **bind_kwargs)( - *args, **kwargs - ) - return args, kwargs - - return chained_func - - -class _Base: - none = _Wrapper(_RawBase.none) - true = _Wrapper(_RawBase.true) - false = _Wrapper(_RawBase.false) - identity = _Wrapper(_RawBase.identity) - only_you = _Wrapper(_RawBase.only_you) - equal = _Wrapper(_RawBase.equal) - not_equal = _Wrapper(_RawBase.not_equal) - not_ = _Wrapper(_RawBase.not_) - - -class _Creators: - @staticmethod - def make_isinstance_of_types(*types: type) -> Callable: - return _Wrapper(lambda v: type(v) in types) - - -class CruFunction: - RawBase: TypeAlias = _RawBase - Base: TypeAlias = _Base - Creators: TypeAlias = _Creators - Wrapper: TypeAlias = _Wrapper - Decorators: TypeAlias = _Dec - - -CRU.add_objects(CruFunction) diff --git a/services/manager/_helper.py b/services/manager/_helper.py deleted file mode 100644 index 43baf46..0000000 --- a/services/manager/_helper.py +++ /dev/null @@ -1,16 +0,0 @@ -from collections.abc import Callable -from typing import Any, Iterable, TypeVar, cast - -_T = TypeVar("_T") -_D = TypeVar("_D") - - -def remove_element( - iterable: Iterable[_T | None], to_rm: Iterable[Any], des: type[_D] | None = None -) -> _D: - to_rm = set(to_rm) - return cast(Callable[..., _D], des or list)(v for v in iterable if v not in to_rm) - - -def remove_none(iterable: Iterable[_T | None], des: type[_D] | None = None) -> _D: - return cast(Callable[..., _D], des or list)(v for v in iterable if v is not None) diff --git a/services/manager/_iter.py b/services/manager/_iter.py deleted file mode 100644 index f9683ca..0000000 --- a/services/manager/_iter.py +++ /dev/null @@ -1,469 +0,0 @@ -from __future__ import annotations - -from collections.abc import Iterable, Callable, Generator, Iterator -from dataclasses import dataclass -from enum import Enum -from typing import ( - Concatenate, - Literal, - Never, - Self, - TypeAlias, - TypeVar, - ParamSpec, - Any, - Generic, - cast, -) - -from ._base import CRU -from ._const import CruNotFound -from ._error import cru_unreachable - -_P = ParamSpec("_P") -_T = TypeVar("_T") -_O = TypeVar("_O") -_V = TypeVar("_V") -_R = TypeVar("_R") - - -class _Generic: - class StepActionKind(Enum): - SKIP = 0 - PUSH = 1 - STOP = 2 - AGGREGATE = 3 - - @dataclass - class StepAction(Generic[_V, _R]): - value: Iterable[Self] | _V | _R | None - kind: _Generic.StepActionKind - - @property - def push_value(self) -> _V: - assert self.kind == _Generic.StepActionKind.PUSH - return cast(_V, self.value) - - @property - def stop_value(self) -> _R: - assert self.kind == _Generic.StepActionKind.STOP - return cast(_R, self.value) - - @staticmethod - def skip() -> _Generic.StepAction[_V, _R]: - return _Generic.StepAction(None, _Generic.StepActionKind.SKIP) - - @staticmethod - def push(value: _V | None) -> _Generic.StepAction[_V, _R]: - return _Generic.StepAction(value, _Generic.StepActionKind.PUSH) - - @staticmethod - def stop(value: _R | None = None) -> _Generic.StepAction[_V, _R]: - return _Generic.StepAction(value, _Generic.StepActionKind.STOP) - - @staticmethod - def aggregate( - *results: _Generic.StepAction[_V, _R], - ) -> _Generic.StepAction[_V, _R]: - return _Generic.StepAction(results, _Generic.StepActionKind.AGGREGATE) - - @staticmethod - def push_last(value: _V | None) -> _Generic.StepAction[_V, _R]: - return _Generic.StepAction.aggregate( - _Generic.StepAction.push(value), _Generic.StepAction.stop() - ) - - def flatten(self) -> Iterable[Self]: - return _Generic.flatten( - self, - is_leave=lambda r: r.kind != _Generic.StepActionKind.AGGREGATE, - get_children=lambda r: cast(Iterable[Self], r.value), - ) - - GeneralStepAction: TypeAlias = StepAction[_V, _R] | _V | _R | None - IterateOperation: TypeAlias = Callable[[_T, int], GeneralStepAction[_V, _R]] - IteratePreHook: TypeAlias = Callable[[Iterable[_T]], GeneralStepAction[_V, _R]] - IteratePostHook: TypeAlias = Callable[[int], GeneralStepAction[_V, _R]] - - @staticmethod - def _is_not_iterable(o: Any) -> bool: - return not isinstance(o, Iterable) - - @staticmethod - def _return_self(o): - return o - - @staticmethod - def iterable_flatten( - maybe_iterable: Iterable[_T] | _T, max_depth: int = -1, *, _depth: int = 0 - ) -> Iterable[Iterable[_T] | _T]: - if _depth == max_depth or not isinstance(maybe_iterable, Iterable): - yield maybe_iterable - return - - for child in maybe_iterable: - yield from _Generic.iterable_flatten( - child, - max_depth, - _depth=_depth + 1, - ) - - @staticmethod - def flatten( - o: _O, - max_depth: int = -1, - /, - is_leave: CruIterator.ElementPredicate[_O] = _is_not_iterable, - get_children: CruIterator.ElementTransformer[_O, Iterable[_O]] = _return_self, - *, - _depth: int = 0, - ) -> Iterable[_O]: - if _depth == max_depth or is_leave(o): - yield o - return - for child in get_children(o): - yield from _Generic.flatten( - child, - max_depth, - is_leave, - get_children, - _depth=_depth + 1, - ) - - class Results: - @staticmethod - def true(_) -> Literal[True]: - return True - - @staticmethod - def false(_) -> Literal[False]: - return False - - @staticmethod - def not_found(_) -> Literal[CruNotFound.VALUE]: - return CruNotFound.VALUE - - @staticmethod - def _non_result_to_push(value: Any) -> StepAction[_V, _R]: - return _Generic.StepAction.push(value) - - @staticmethod - def _non_result_to_stop(value: Any) -> StepAction[_V, _R]: - return _Generic.StepAction.stop(value) - - @staticmethod - def _none_hook(_: Any) -> StepAction[_V, _R]: - return _Generic.StepAction.skip() - - def iterate( - iterable: Iterable[_T], - operation: IterateOperation[_T, _V, _R], - fallback_return: _R, - pre_iterate: IteratePreHook[_T, _V, _R], - post_iterate: IteratePostHook[_V, _R], - convert_value_result: Callable[[_V | _R | None], StepAction[_V, _R]], - ) -> Generator[_V, None, _R]: - pre_result = pre_iterate(iterable) - if not isinstance(pre_result, _Generic.StepAction): - real_pre_result = convert_value_result(pre_result) - for r in real_pre_result.flatten(): - if r.kind == _Generic.StepActionKind.STOP: - return r.stop_value - elif r.kind == _Generic.StepActionKind.PUSH: - yield r.push_value - else: - assert r.kind == _Generic.StepActionKind.SKIP - - for index, element in enumerate(iterable): - result = operation(element, index) - if not isinstance(result, _Generic.StepAction): - real_result = convert_value_result(result) - for r in real_result.flatten(): - if r.kind == _Generic.StepActionKind.STOP: - return r.stop_value - elif r.kind == _Generic.StepActionKind.PUSH: - yield r.push_value - else: - assert r.kind == _Generic.StepActionKind.SKIP - continue - - post_result = post_iterate(index + 1) - if not isinstance(post_result, _Generic.StepAction): - real_post_result = convert_value_result(post_result) - for r in real_post_result.flatten(): - if r.kind == _Generic.StepActionKind.STOP: - return r.stop_value - elif r.kind == _Generic.StepActionKind.PUSH: - yield r.push_value - else: - assert r.kind == _Generic.StepActionKind.SKIP - - return fallback_return - - def create_new( - iterable: Iterable[_T], - operation: IterateOperation[_T, _V, _R], - fallback_return: _R, - /, - pre_iterate: IteratePreHook[_T, _V, _R] | None = None, - post_iterate: IteratePostHook[_V, _R] | None = None, - ) -> Generator[_V, None, _R]: - return _Generic.iterate( - iterable, - operation, - fallback_return, - pre_iterate or _Generic._none_hook, - post_iterate or _Generic._none_hook, - _Generic._non_result_to_push, - ) - - def get_result( - iterable: Iterable[_T], - operation: IterateOperation[_T, _V, _R], - fallback_return: _R, - /, - pre_iterate: IteratePreHook[_T, _V, _R] | None = None, - post_iterate: IteratePostHook[_V, _R] | None = None, - ) -> _R: - try: - for _ in _Generic.iterate( - iterable, - operation, - fallback_return, - pre_iterate or _Generic._none_hook, - post_iterate or _Generic._none_hook, - _Generic._non_result_to_stop, - ): - pass - except StopIteration as stop: - return stop.value - cru_unreachable() - - -class _Helpers: - @staticmethod - def auto_count(c: Callable[Concatenate[int, _P], _O]) -> Callable[_P, _O]: - count = 0 - - def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _O: - nonlocal count - r = c(count, *args, **kwargs) - count += 1 - return r - - return wrapper - - -class _Creators: - class Raw: - @staticmethod - def empty() -> Iterator[Never]: - return iter([]) - - @staticmethod - def range(*args) -> Iterator[int]: - return iter(range(*args)) - - @staticmethod - def unite(*args: _T) -> Iterator[_T]: - return iter(args) - - @staticmethod - def _concat(*iterables: Iterable[_T]) -> Iterable[_T]: - for iterable in iterables: - yield from iterable - - @staticmethod - def concat(*iterables: Iterable[_T]) -> Iterator[_T]: - return iter(_Creators.Raw._concat(*iterables)) - - @staticmethod - def _wrap(f: Callable[_P, Iterable[_O]]) -> Callable[_P, CruIterator[_O]]: - def _wrapped(*args: _P.args, **kwargs: _P.kwargs) -> CruIterator[_O]: - return CruIterator(f(*args, **kwargs)) - - return _wrapped - - empty = _wrap(Raw.empty) - range = _wrap(Raw.range) - unite = _wrap(Raw.unite) - concat = _wrap(Raw.concat) - - -class CruIterator(Generic[_T]): - ElementOperation: TypeAlias = Callable[[_V], Any] - ElementPredicate: TypeAlias = Callable[[_V], bool] - AnyElementPredicate: TypeAlias = ElementPredicate[Any] - ElementTransformer: TypeAlias = Callable[[_V], _O] - SelfElementTransformer: TypeAlias = ElementTransformer[_V, _V] - AnyElementTransformer: TypeAlias = ElementTransformer[Any, Any] - - Creators: TypeAlias = _Creators - Helpers: TypeAlias = _Helpers - - def __init__(self, iterable: Iterable[_T]) -> None: - self._iterator = iter(iterable) - - def __iter__(self) -> Iterator[_T]: - return self._iterator - - def create_new_me(self, iterable: Iterable[_O]) -> CruIterator[_O]: - return type(self)(iterable) # type: ignore - - @staticmethod - def _wrap( - f: Callable[Concatenate[CruIterator[_T], _P], Iterable[_O]], - ) -> Callable[Concatenate[CruIterator[_T], _P], CruIterator[_O]]: - def _wrapped( - self: CruIterator[_T], *args: _P.args, **kwargs: _P.kwargs - ) -> CruIterator[_O]: - return self.create_new_me(f(self, *args, **kwargs)) - - return _wrapped - - @_wrap - def replace_me(self, iterable: Iterable[_O]) -> Iterable[_O]: - return iterable - - def replace_me_with_empty(self) -> CruIterator[Never]: - return self.create_new_me(_Creators.Raw.empty()) - - def replace_me_with_range(self, *args) -> CruIterator[int]: - return self.create_new_me(_Creators.Raw.range(*args)) - - def replace_me_with_unite(self, *args: _O) -> CruIterator[_O]: - return self.create_new_me(_Creators.Raw.unite(*args)) - - def replace_me_with_concat(self, *iterables: Iterable[_T]) -> CruIterator[_T]: - return self.create_new_me(_Creators.Raw.concat(*iterables)) - - def to_set(self) -> set[_T]: - return set(self) - - def to_list(self) -> list[_T]: - return list(self) - - def all(self, predicate: ElementPredicate[_T]) -> bool: - for value in self: - if not predicate(value): - return False - return True - - def any(self, predicate: ElementPredicate[_T]) -> bool: - for value in self: - if predicate(value): - return True - return False - - def foreach(self, operation: ElementOperation[_T]) -> None: - for value in self: - operation(value) - - @_wrap - def transform(self, transformer: ElementTransformer[_T, _O]) -> Iterable[_O]: - for value in self: - yield transformer(value) - - map = transform - - @_wrap - def filter(self, predicate: ElementPredicate[_T]) -> Iterable[_T]: - for value in self: - if predicate(value): - yield value - - @_wrap - def continue_if(self, predicate: ElementPredicate[_T]) -> Iterable[_T]: - for value in self: - yield value - if not predicate(value): - break - - def first_n(self, max_count: int) -> CruIterator[_T]: - if max_count < 0: - raise ValueError("max_count must be 0 or positive.") - if max_count == 0: - return self.replace_me_with_empty() # type: ignore - return self.continue_if(_Helpers.auto_count(lambda i, _: i < max_count - 1)) - - def drop_n(self, n: int) -> CruIterator[_T]: - if n < 0: - raise ValueError("n must be 0 or positive.") - if n == 0: - return self - return self.filter(_Helpers.auto_count(lambda i, _: i < n)) - - def single_or( - self, fallback: _O | CruNotFound = CruNotFound.VALUE - ) -> _T | _O | CruNotFound: - first_2 = self.first_n(2) - has_value = False - for element in first_2: - if has_value: - raise ValueError("More than one value found.") - has_value = True - value = element - if has_value: - return value - else: - return fallback - - def first_or( - self, fallback: _O | CruNotFound = CruNotFound.VALUE - ) -> _T | _O | CruNotFound: - return self.first_n(1).single_or(fallback) - - @_wrap - def flatten(self, max_depth: int = -1) -> Iterable[Any]: - return _Generic.iterable_flatten(self, max_depth) - - def select_by_indices(self, indices: Iterable[int]) -> CruIterator[_T]: - index_set = set(indices) - max_index = max(index_set) - return self.first_n(max_index + 1).filter( - _Helpers.auto_count(lambda i, _: i in index_set) - ) - - def remove_values(self, values: Iterable[Any]) -> CruIterator[_T]: - value_set = set(values) - return self.filter(lambda v: v not in value_set) - - def replace_values( - self, old_values: Iterable[Any], new_value: _O - ) -> Iterable[_T | _O]: - value_set = set(old_values) - return self.transform(lambda v: new_value if v in value_set else v) - - def group_by(self, key_getter: Callable[[_T], _O]) -> dict[_O, list[_T]]: - result: dict[_O, list[_T]] = {} - - for item in self: - key = key_getter(item) - if key not in result: - result[key] = [] - result[key].append(item) - - return result - - def join_str(self: CruIterator[str], separator: str) -> str: - return separator.join(self) - - -class CruIterMixin(Generic[_T]): - def cru_iter(self: Iterable[_T]) -> CruIterator[_T]: - return CruIterator(self) - - -class CruIterList(list[_T], CruIterMixin[_T]): - pass - - -class CruIterable: - Generic: TypeAlias = _Generic - Iterator: TypeAlias = CruIterator[_T] - Helpers: TypeAlias = _Helpers - Mixin: TypeAlias = CruIterMixin[_T] - IterList: TypeAlias = CruIterList[_T] - - -CRU.add_objects(CruIterable, CruIterator) diff --git a/services/manager/_type.py b/services/manager/_type.py deleted file mode 100644 index 1f81da3..0000000 --- a/services/manager/_type.py +++ /dev/null @@ -1,52 +0,0 @@ -from collections.abc import Iterable -from typing import Any - -from ._error import CruException, CruLogicError -from ._iter import CruIterator - - -class CruTypeCheckError(CruException): - pass - - -DEFAULT_NONE_ERR_MSG = "None is not allowed here." -DEFAULT_TYPE_ERR_MSG = "Object of this type is not allowed here." - - -class CruTypeSet(set[type]): - def __init__(self, *types: type): - type_set = CruIterator(types).filter(lambda t: t is not None).to_set() - if not CruIterator(type_set).all(lambda t: isinstance(t, type)): - raise CruLogicError("TypeSet can only contain type.") - super().__init__(type_set) - - def check_value( - self, - value: Any, - /, - allow_none: bool = False, - empty_allow_all: bool = True, - ) -> None: - if value is None: - if allow_none: - return - else: - raise CruTypeCheckError(DEFAULT_NONE_ERR_MSG) - if len(self) == 0 and empty_allow_all: - return - if not CruIterator(self).any(lambda t: isinstance(value, t)): - raise CruTypeCheckError(DEFAULT_TYPE_ERR_MSG) - - def check_value_list( - self, - values: Iterable[Any], - /, - allow_none: bool = False, - empty_allow_all: bool = True, - ) -> None: - for value in values: - self.check_value( - value, - allow_none, - empty_allow_all, - ) diff --git a/services/manager/attr.py b/services/manager/attr.py deleted file mode 100644 index d4cc86a..0000000 --- a/services/manager/attr.py +++ /dev/null @@ -1,364 +0,0 @@ -from __future__ import annotations - -import copy -from collections.abc import Callable, Iterable -from dataclasses import dataclass, field -from typing import Any - -from .list import CruUniqueKeyList -from ._type import CruTypeSet -from ._const import CruNotFound, CruUseDefault, CruDontChange -from ._iter import CruIterator - - -@dataclass -class CruAttr: - - name: str - value: Any - description: str | None - - @staticmethod - def make( - name: str, value: Any = CruUseDefault.VALUE, description: str | None = None - ) -> CruAttr: - return CruAttr(name, value, description) - - -CruAttrDefaultFactory = Callable[["CruAttrDef"], Any] -CruAttrTransformer = Callable[[Any, "CruAttrDef"], Any] -CruAttrValidator = Callable[[Any, "CruAttrDef"], None] - - -@dataclass -class CruAttrDef: - name: str - description: str - default_factory: CruAttrDefaultFactory - transformer: CruAttrTransformer - validator: CruAttrValidator - - def __init__( - self, - name: str, - description: str, - default_factory: CruAttrDefaultFactory, - transformer: CruAttrTransformer, - validator: CruAttrValidator, - ) -> None: - self.name = name - self.description = description - self.default_factory = default_factory - self.transformer = transformer - self.validator = validator - - def transform(self, value: Any) -> Any: - if self.transformer is not None: - return self.transformer(value, self) - return value - - def validate(self, value: Any, /, force_allow_none: bool = False) -> None: - if force_allow_none is value is None: - return - if self.validator is not None: - self.validator(value, self) - - def transform_and_validate( - self, value: Any, /, force_allow_none: bool = False - ) -> Any: - value = self.transform(value) - self.validate(value, force_allow_none) - return value - - def make_default_value(self) -> Any: - return self.transform_and_validate(self.default_factory(self)) - - def adopt(self, attr: CruAttr) -> CruAttr: - attr = copy.deepcopy(attr) - - if attr.name is None: - attr.name = self.name - elif attr.name != self.name: - raise ValueError(f"Attr name is not match: {attr.name} != {self.name}") - - if attr.value is CruUseDefault.VALUE: - attr.value = self.make_default_value() - else: - attr.value = self.transform_and_validate(attr.value) - - if attr.description is None: - attr.description = self.description - - return attr - - def make( - self, value: Any = CruUseDefault.VALUE, description: None | str = None - ) -> CruAttr: - value = self.make_default_value() if value is CruUseDefault.VALUE else value - value = self.transform_and_validate(value) - return CruAttr( - self.name, - value, - description if description is not None else self.description, - ) - - -@dataclass -class CruAttrDefBuilder: - - name: str - description: str - types: list[type] | None = field(default=None) - allow_none: bool = field(default=False) - default: Any = field(default=CruUseDefault.VALUE) - default_factory: CruAttrDefaultFactory | None = field(default=None) - auto_list: bool = field(default=False) - transformers: list[CruAttrTransformer] = field(default_factory=list) - validators: list[CruAttrValidator] = field(default_factory=list) - override_transformer: CruAttrTransformer | None = field(default=None) - override_validator: CruAttrValidator | None = field(default=None) - - build_hook: Callable[[CruAttrDef], None] | None = field(default=None) - - def __init__(self, name: str, description: str) -> None: - super().__init__() - self.name = name - self.description = description - - def auto_adjust_default(self) -> None: - if self.default is not CruUseDefault.VALUE and self.default is not None: - return - if self.allow_none and self.default is CruUseDefault.VALUE: - self.default = None - if not self.allow_none and self.default is None: - self.default = CruUseDefault.VALUE - if self.auto_list and not self.allow_none: - self.default = [] - - def with_name(self, name: str | CruDontChange) -> CruAttrDefBuilder: - if name is not CruDontChange.VALUE: - self.name = name - return self - - def with_description( - self, default_description: str | CruDontChange - ) -> CruAttrDefBuilder: - if default_description is not CruDontChange.VALUE: - self.description = default_description - return self - - def with_default(self, default: Any) -> CruAttrDefBuilder: - if default is not CruDontChange.VALUE: - self.default = default - return self - - def with_default_factory( - self, - default_factory: CruAttrDefaultFactory | CruDontChange, - ) -> CruAttrDefBuilder: - if default_factory is not CruDontChange.VALUE: - self.default_factory = default_factory - return self - - def with_types( - self, - types: Iterable[type] | None | CruDontChange, - ) -> CruAttrDefBuilder: - if types is not CruDontChange.VALUE: - self.types = None if types is None else list(types) - return self - - def with_allow_none(self, allow_none: bool | CruDontChange) -> CruAttrDefBuilder: - if allow_none is not CruDontChange.VALUE: - self.allow_none = allow_none - return self - - def with_auto_list( - self, auto_list: bool | CruDontChange = True - ) -> CruAttrDefBuilder: - if auto_list is not CruDontChange.VALUE: - self.auto_list = auto_list - return self - - def with_constraint( - self, - /, - allow_none: bool | CruDontChange = CruDontChange.VALUE, - types: Iterable[type] | None | CruDontChange = CruDontChange.VALUE, - default: Any = CruDontChange.VALUE, - default_factory: CruAttrDefaultFactory | CruDontChange = CruDontChange.VALUE, - auto_list: bool | CruDontChange = CruDontChange.VALUE, - ) -> CruAttrDefBuilder: - return ( - self.with_allow_none(allow_none) - .with_types(types) - .with_default(default) - .with_default_factory(default_factory) - .with_auto_list(auto_list) - ) - - def add_transformer(self, transformer: CruAttrTransformer) -> CruAttrDefBuilder: - self.transformers.append(transformer) - return self - - def clear_transformers(self) -> CruAttrDefBuilder: - self.transformers.clear() - return self - - def add_validator(self, validator: CruAttrValidator) -> CruAttrDefBuilder: - self.validators.append(validator) - return self - - def clear_validators(self) -> CruAttrDefBuilder: - self.validators.clear() - return self - - def with_override_transformer( - self, override_transformer: CruAttrTransformer | None | CruDontChange - ) -> CruAttrDefBuilder: - if override_transformer is not CruDontChange.VALUE: - self.override_transformer = override_transformer - return self - - def with_override_validator( - self, override_validator: CruAttrValidator | None | CruDontChange - ) -> CruAttrDefBuilder: - if override_validator is not CruDontChange.VALUE: - self.override_validator = override_validator - return self - - def is_valid(self) -> tuple[bool, str]: - if not isinstance(self.name, str): - return False, "Name must be a string!" - if not isinstance(self.description, str): - return False, "Default description must be a string!" - if ( - not self.allow_none - and self.default is None - and self.default_factory is None - ): - return False, "Default must be set if allow_none is False!" - return True, "" - - @staticmethod - def _build( - builder: CruAttrDefBuilder, auto_adjust_default: bool = True - ) -> CruAttrDef: - if auto_adjust_default: - builder.auto_adjust_default() - - valid, err = builder.is_valid() - if not valid: - raise ValueError(err) - - def composed_transformer(value: Any, attr_def: CruAttrDef) -> Any: - def transform_value(single_value: Any) -> Any: - for transformer in builder.transformers: - single_value = transformer(single_value, attr_def) - return single_value - - if builder.auto_list: - if not isinstance(value, list): - value = [value] - value = CruIterator(value).transform(transform_value).to_list() - - else: - value = transform_value(value) - return value - - type_set = None if builder.types is None else CruTypeSet(*builder.types) - - def composed_validator(value: Any, attr_def: CruAttrDef): - def validate_value(single_value: Any) -> None: - if type_set is not None: - type_set.check_value(single_value, allow_none=builder.allow_none) - for validator in builder.validators: - validator(single_value, attr_def) - - if builder.auto_list: - CruIterator(value).foreach(validate_value) - else: - validate_value(value) - - real_transformer = builder.override_transformer or composed_transformer - real_validator = builder.override_validator or composed_validator - - default_factory = builder.default_factory - if default_factory is None: - - def default_factory(_d): - return copy.deepcopy(builder.default) - - d = CruAttrDef( - builder.name, - builder.description, - default_factory, - real_transformer, - real_validator, - ) - if builder.build_hook: - builder.build_hook(d) - return d - - def build(self, auto_adjust_default=True) -> CruAttrDef: - c = copy.deepcopy(self) - self.build_hook = None - return CruAttrDefBuilder._build(c, auto_adjust_default) - - -class CruAttrDefRegistry(CruUniqueKeyList[CruAttrDef, str]): - - def __init__(self) -> None: - super().__init__(lambda d: d.name) - - def make_builder(self, name: str, default_description: str) -> CruAttrDefBuilder: - b = CruAttrDefBuilder(name, default_description) - b.build_hook = lambda a: self.add(a) - return b - - def adopt(self, attr: CruAttr) -> CruAttr: - d = self.get(attr.name) - return d.adopt(attr) - - -class CruAttrTable(CruUniqueKeyList[CruAttr, str]): - def __init__(self, registry: CruAttrDefRegistry) -> None: - self._registry: CruAttrDefRegistry = registry - super().__init__(lambda a: a.name, before_add=registry.adopt) - - @property - def registry(self) -> CruAttrDefRegistry: - return self._registry - - def get_value_or(self, name: str, fallback: Any = CruNotFound.VALUE) -> Any: - a = self.get_or(name, CruNotFound.VALUE) - if a is CruNotFound.VALUE: - return fallback - return a.value - - def get_value(self, name: str) -> Any: - a = self.get(name) - return a.value - - def make_attr( - self, - name: str, - value: Any = CruUseDefault.VALUE, - /, - description: str | None = None, - ) -> CruAttr: - d = self._registry.get(name) - return d.make(value, description or d.description) - - def add_value( - self, - name: str, - value: Any = CruUseDefault.VALUE, - /, - description: str | None = None, - *, - replace: bool = False, - ) -> CruAttr: - attr = self.make_attr(name, value, description) - self.add(attr, replace) - return attr diff --git a/services/manager/config.py b/services/manager/config.py deleted file mode 100644 index 0f6f0d0..0000000 --- a/services/manager/config.py +++ /dev/null @@ -1,196 +0,0 @@ -from __future__ import annotations - -from typing import Any, TypeVar, Generic - -from ._error import CruException -from .list import CruUniqueKeyList -from .value import ( - INTEGER_VALUE_TYPE, - TEXT_VALUE_TYPE, - CruValueTypeError, - ValueGeneratorBase, - ValueType, -) - -_T = TypeVar("_T") - - -class CruConfigError(CruException): - def __init__(self, message: str, item: ConfigItem, *args, **kwargs): - super().__init__(message, *args, **kwargs) - self._item = item - - @property - def item(self) -> ConfigItem: - return self._item - - -class ConfigItem(Generic[_T]): - def __init__( - self, - name: str, - description: str, - value_type: ValueType[_T], - value: _T | None = None, - /, - default: ValueGeneratorBase[_T] | _T | None = None, - ) -> None: - self._name = name - self._description = description - self._value_type = value_type - self._value = value - self._default = default - - @property - def name(self) -> str: - return self._name - - @property - def description(self) -> str: - return self._description - - @property - def value_type(self) -> ValueType[_T]: - return self._value_type - - @property - def is_set(self) -> bool: - return self._value is not None - - @property - def value(self) -> _T: - if self._value is None: - raise CruConfigError( - "Config value is not set.", - self, - user_message=f"Config item {self.name} is not set.", - ) - return self._value - - @property - def value_str(self) -> str: - return self.value_type.convert_value_to_str(self.value) - - def set_value(self, v: _T | str, allow_convert_from_str=False): - if allow_convert_from_str: - self._value = self.value_type.check_value_or_try_convert_from_str(v) - else: - self._value = self.value_type.check_value(v) - - def reset(self): - self._value = None - - @property - def default(self) -> ValueGeneratorBase[_T] | _T | None: - return self._default - - @property - def can_generate_default(self) -> bool: - return self.default is not None - - def generate_default_value(self) -> _T: - if self.default is None: - raise CruConfigError( - "Config item does not support default value generation.", self - ) - elif isinstance(self.default, ValueGeneratorBase): - v = self.default.generate() - else: - v = self.default - try: - self.value_type.check_value(v) - return v - except CruValueTypeError as e: - raise CruConfigError( - "Config value generator returns an invalid value.", self - ) from e - - def copy(self) -> "ConfigItem": - return ConfigItem( - self.name, - self.description, - self.value_type, - self.value, - self.default, - ) - - @property - def description_str(self) -> str: - return f"{self.name} ({self.value_type.name}): {self.description}" - - -class Configuration(CruUniqueKeyList[ConfigItem[Any], str]): - def __init__(self): - super().__init__(lambda c: c.name) - - def get_set_items(self) -> list[ConfigItem[Any]]: - return [item for item in self if item.is_set] - - def get_unset_items(self) -> list[ConfigItem[Any]]: - return [item for item in self if not item.is_set] - - @property - def all_set(self) -> bool: - return len(self.get_unset_items()) == 0 - - @property - def all_not_set(self) -> bool: - return len(self.get_set_items()) == 0 - - def add_text_config( - self, - name: str, - description: str, - value: str | None = None, - default: ValueGeneratorBase[str] | str | None = None, - ) -> ConfigItem[str]: - item = ConfigItem(name, description, TEXT_VALUE_TYPE, value, default) - self.add(item) - return item - - def add_int_config( - self, - name: str, - description: str, - value: int | None = None, - default: ValueGeneratorBase[int] | int | None = None, - ) -> ConfigItem[int]: - item = ConfigItem(name, description, INTEGER_VALUE_TYPE, value, default) - self.add(item) - return item - - def set_config_item( - self, - name: str, - value: Any | str, - allow_convert_from_str=True, - ) -> None: - item = self.get(name) - item.set_value( - value, - allow_convert_from_str=allow_convert_from_str, - ) - - def reset_all(self) -> None: - for item in self: - item.reset() - - def to_dict(self) -> dict[str, Any]: - return {item.name: item.value for item in self} - - def to_str_dict(self) -> dict[str, str]: - return { - item.name: item.value_type.convert_value_to_str(item.value) for item in self - } - - def set_value_dict( - self, - value_dict: dict[str, Any], - allow_convert_from_str: bool = False, - ) -> None: - for name, value in value_dict.items(): - item = self.get(name) - item.set_value( - value, - allow_convert_from_str=allow_convert_from_str, - ) diff --git a/services/manager/list.py b/services/manager/list.py deleted file mode 100644 index 216a561..0000000 --- a/services/manager/list.py +++ /dev/null @@ -1,160 +0,0 @@ -from __future__ import annotations - -from collections.abc import Callable, Iterator -from typing import Any, Generic, Iterable, TypeAlias, TypeVar, overload - -from ._error import CruInternalError -from ._iter import CruIterator -from ._const import CruNotFound - -_T = TypeVar("_T") -_O = TypeVar("_O") - - -class CruListEdit(CruIterator[_T]): - def __init__(self, iterable: Iterable[_T], _list: CruList[Any]) -> None: - super().__init__(iterable) - self._list = _list - - def create_me(self, iterable: Iterable[_O]) -> CruListEdit[_O]: - return CruListEdit(iterable, self._list) - - @property - def list(self) -> CruList[Any]: - return self._list - - def done(self) -> CruList[Any]: - self._list.reset(self) - return self._list - - -class CruList(list[_T]): - def reset(self, new_values: Iterable[_T]): - if self is new_values: - new_values = list(new_values) - self.clear() - self.extend(new_values) - return self - - def as_cru_iterator(self) -> CruIterator[_T]: - return CruIterator(self) - - @staticmethod - def make(maybe_list: Iterable[_T] | _T | None) -> CruList[_T]: - if maybe_list is None: - return CruList() - if isinstance(maybe_list, Iterable): - return CruList(maybe_list) - return CruList([maybe_list]) - - -_K = TypeVar("_K") - -_KeyGetter: TypeAlias = Callable[[_T], _K] - - -class CruUniqueKeyList(Generic[_T, _K]): - def __init__( - self, - key_getter: _KeyGetter[_T, _K], - *, - before_add: Callable[[_T], _T] | None = None, - ): - super().__init__() - self._key_getter = key_getter - self._before_add = before_add - self._list: CruList[_T] = CruList() - - @property - def key_getter(self) -> _KeyGetter[_T, _K]: - return self._key_getter - - @property - def internal_list(self) -> CruList[_T]: - return self._list - - def validate_self(self): - keys = self._list.transform(self._key_getter) - if len(keys) != len(set(keys)): - raise CruInternalError("Duplicate keys!") - - @overload - def get_or( - self, key: _K, fallback: CruNotFound = CruNotFound.VALUE - ) -> _T | CruNotFound: ... - - @overload - def get_or(self, key: _K, fallback: _O) -> _T | _O: ... - - def get_or( - self, key: _K, fallback: _O | CruNotFound = CruNotFound.VALUE - ) -> _T | _O | CruNotFound: - return ( - self._list.as_cru_iterator() - .filter(lambda v: key == self._key_getter(v)) - .first_or(fallback) - ) - - def get(self, key: _K) -> _T: - value = self.get_or(key) - if value is CruNotFound.VALUE: - raise KeyError(f"Key {key} not found!") - return value # type: ignore - - @property - def keys(self) -> Iterable[_K]: - return self._list.as_cru_iterator().map(self._key_getter) - - def has_key(self, key: _K) -> bool: - return self.get_or(key) != CruNotFound.VALUE - - def try_remove(self, key: _K) -> bool: - value = self.get_or(key) - if value is CruNotFound.VALUE: - return False - self._list.remove(value) - return True - - def remove(self, key: _K, allow_absence: bool = False) -> None: - if not self.try_remove(key) and not allow_absence: - raise KeyError(f"Key {key} not found!") - - def add(self, value: _T, /, replace: bool = False) -> None: - v = self.get_or(self._key_getter(value)) - if v is not CruNotFound.VALUE: - if not replace: - raise KeyError(f"Key {self._key_getter(v)} already exists!") - self._list.remove(v) - if self._before_add is not None: - value = self._before_add(value) - self._list.append(value) - - def set(self, value: _T) -> None: - self.add(value, True) - - def extend(self, iterable: Iterable[_T], /, replace: bool = False) -> None: - values = list(iterable) - to_remove = [] - for value in values: - v = self.get_or(self._key_getter(value)) - if v is not CruNotFound.VALUE: - if not replace: - raise KeyError(f"Key {self._key_getter(v)} already exists!") - to_remove.append(v) - for value in to_remove: - self._list.remove(value) - if self._before_add is not None: - values = [self._before_add(value) for value in values] - self._list.extend(values) - - def clear(self) -> None: - self._list.reset([]) - - def __iter__(self) -> Iterator[_T]: - return iter(self._list) - - def __len__(self) -> int: - return len(self._list) - - def cru_iter(self) -> CruIterator[_T]: - return CruIterator(self._list) diff --git a/services/manager/parsing.py b/services/manager/parsing.py deleted file mode 100644 index 0e9239d..0000000 --- a/services/manager/parsing.py +++ /dev/null @@ -1,290 +0,0 @@ -from __future__ import annotations - -from abc import ABCMeta, abstractmethod -from dataclasses import dataclass -from enum import Enum -from typing import NamedTuple, TypeAlias, TypeVar, Generic, NoReturn, Callable - -from ._error import CruException -from ._iter import CruIterable - -_T = TypeVar("_T") - - -class StrParseStream: - class MemStackEntry(NamedTuple): - pos: int - lineno: int - - class MemStackPopStr(NamedTuple): - text: str - lineno: int - - def __init__(self, text: str) -> None: - self._text = text - self._pos = 0 - self._lineno = 1 - self._length = len(self._text) - self._valid_pos_range = range(0, self.length + 1) - self._valid_offset_range = range(-self.length, self.length + 1) - self._mem_stack: CruIterable.IterList[StrParseStream.MemStackEntry] = ( - CruIterable.IterList() - ) - - @property - def text(self) -> str: - return self._text - - @property - def length(self) -> int: - return self._length - - @property - def valid_pos_range(self) -> range: - return self._valid_pos_range - - @property - def valid_offset_range(self) -> range: - return self._valid_offset_range - - @property - def pos(self) -> int: - return self._pos - - @property - def lineno(self) -> int: - return self._lineno - - @property - def eof(self) -> bool: - return self._pos == self.length - - def peek(self, length: int) -> str: - real_length = min(length, self.length - self._pos) - new_position = self._pos + real_length - text = self._text[self._pos : new_position] - return text - - def read(self, length: int) -> str: - text = self.peek(length) - self._pos += len(text) - self._lineno += text.count("\n") - return text - - def skip(self, length: int) -> None: - self.read(length) - - def peek_str(self, text: str) -> bool: - if self.pos + len(text) > self.length: - return False - for offset in range(len(text)): - if self._text[self.pos + offset] != text[offset]: - return False - return True - - def read_str(self, text: str) -> bool: - if not self.peek_str(text): - return False - self._pos += len(text) - self._lineno += text.count("\n") - return True - - @property - def mem_stack(self) -> CruIterable.IterList[MemStackEntry]: - return self._mem_stack - - def push_mem(self) -> None: - self.mem_stack.append(self.MemStackEntry(self.pos, self.lineno)) - - def pop_mem(self) -> MemStackEntry: - return self.mem_stack.pop() - - def pop_mem_str(self, strip_end: int = 0) -> MemStackPopStr: - old = self.pop_mem() - assert self.pos >= old.pos - return self.MemStackPopStr( - self._text[old.pos : self.pos - strip_end], old.lineno - ) - - -class ParseError(CruException, Generic[_T]): - def __init__( - self, - message, - parser: Parser[_T], - text: str, - line_number: int | None = None, - *args, - **kwargs, - ): - super().__init__(message, *args, **kwargs) - self._parser = parser - self._text = text - self._line_number = line_number - - @property - def parser(self) -> Parser[_T]: - return self._parser - - @property - def text(self) -> str: - return self._text - - @property - def line_number(self) -> int | None: - return self._line_number - - -class Parser(Generic[_T], metaclass=ABCMeta): - def __init__(self, name: str) -> None: - self._name = name - - @property - def name(self) -> str: - return self._name - - @abstractmethod - def parse(self, s: str) -> _T: - raise NotImplementedError() - - def raise_parse_exception( - self, text: str, line_number: int | None = None - ) -> NoReturn: - a = line_number and f" at line {line_number}" or "" - raise ParseError(f"Parser {self.name} failed{a}.", self, text, line_number) - - -class _SimpleLineVarParserEntry(NamedTuple): - key: str - value: str - line_number: int | None = None - - -class _SimpleLineVarParserResult(CruIterable.IterList[_SimpleLineVarParserEntry]): - pass - - -class SimpleLineVarParser(Parser[_SimpleLineVarParserResult]): - """ - The parsing result is a list of tuples (key, value, line number). - """ - - Entry: TypeAlias = _SimpleLineVarParserEntry - Result: TypeAlias = _SimpleLineVarParserResult - - def __init__(self) -> None: - super().__init__(type(self).__name__) - - def _parse(self, text: str, callback: Callable[[Entry], None]) -> None: - for ln, line in enumerate(text.splitlines()): - line_number = ln + 1 - # check if it's a comment - if line.strip().startswith("#"): - continue - # check if there is a '=' - if line.find("=") == -1: - self.raise_parse_exception("There is even no '='!", line_number) - # split at first '=' - key, value = line.split("=", 1) - key = key.strip() - value = value.strip() - callback(_SimpleLineVarParserEntry(key, value, line_number)) - - def parse(self, text: str) -> Result: - result = _SimpleLineVarParserResult() - self._parse(text, lambda item: result.append(item)) - return result - - -class _StrWrapperVarParserTokenKind(Enum): - TEXT = "TEXT" - VAR = "VAR" - - -@dataclass -class _StrWrapperVarParserToken: - kind: _StrWrapperVarParserTokenKind - value: str - line_number: int - - @property - def is_text(self) -> bool: - return self.kind is _StrWrapperVarParserTokenKind.TEXT - - @property - def is_var(self) -> bool: - return self.kind is _StrWrapperVarParserTokenKind.VAR - - @staticmethod - def from_mem_str( - kind: _StrWrapperVarParserTokenKind, mem_str: StrParseStream.MemStackPopStr - ) -> _StrWrapperVarParserToken: - return _StrWrapperVarParserToken(kind, mem_str.text, mem_str.lineno) - - def __repr__(self) -> str: - return f"VAR: {self.value}" if self.is_var else "TEXT: ..." - - -class _StrWrapperVarParserResult(CruIterable.IterList[_StrWrapperVarParserToken]): - pass - - -class StrWrapperVarParser(Parser[_StrWrapperVarParserResult]): - TokenKind: TypeAlias = _StrWrapperVarParserTokenKind - Token: TypeAlias = _StrWrapperVarParserToken - Result: TypeAlias = _StrWrapperVarParserResult - - def __init__(self, wrapper: str): - super().__init__(f"StrWrapperVarParser({wrapper})") - self._wrapper = wrapper - - @property - def wrapper(self) -> str: - return self._wrapper - - def parse(self, text: str) -> Result: - result = self.Result() - - class _State(Enum): - TEXT = "TEXT" - VAR = "VAR" - - state = _State.TEXT - stream = StrParseStream(text) - stream.push_mem() - - while True: - if stream.eof: - break - - if stream.read_str(self.wrapper): - if state is _State.TEXT: - result.append( - self.Token.from_mem_str( - self.TokenKind.TEXT, stream.pop_mem_str(len(self.wrapper)) - ) - ) - state = _State.VAR - stream.push_mem() - else: - result.append( - self.Token.from_mem_str( - self.TokenKind.VAR, - stream.pop_mem_str(len(self.wrapper)), - ) - ) - state = _State.TEXT - stream.push_mem() - - continue - - stream.skip(1) - - if state is _State.VAR: - raise ParseError("Text ended without closing variable.", self, text) - - mem_str = stream.pop_mem_str() - if len(mem_str.text) != 0: - result.append(self.Token.from_mem_str(self.TokenKind.TEXT, mem_str)) - - return result diff --git a/services/manager/service/__init__.py b/services/manager/service/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/services/manager/service/__main__.py b/services/manager/service/__main__.py deleted file mode 100644 index 6ea0a8a..0000000 --- a/services/manager/service/__main__.py +++ /dev/null @@ -1,27 +0,0 @@ -import sys - -from manager import CruException - -from ._app import create_app - - -def main(): - app = create_app() - app.run_command() - - -if __name__ == "__main__": - version_info = sys.version_info - if not (version_info.major == 3 and version_info.minor >= 11): - print("This application requires Python 3.11 or later.", file=sys.stderr) - sys.exit(1) - - try: - main() - except CruException as e: - user_message = e.get_user_message() - if user_message is not None: - print(f"Error: {user_message}") - exit(1) - else: - raise diff --git a/services/manager/service/_app.py b/services/manager/service/_app.py deleted file mode 100644 index 2304340..0000000 --- a/services/manager/service/_app.py +++ /dev/null @@ -1,30 +0,0 @@ -from ._base import ( - AppBase, - CommandDispatcher, - PathCommandProvider, -) -from ._template import TemplateManager -from ._nginx import NginxManager -from ._external import CliToolCommandProvider - -APP_ID = "crupest" - - -class App(AppBase): - def __init__(self): - super().__init__(APP_ID, f"{APP_ID}-service") - self.add_feature(PathCommandProvider()) - self.add_feature(TemplateManager()) - self.add_feature(NginxManager()) - self.add_feature(CliToolCommandProvider()) - self.add_feature(CommandDispatcher()) - - def run_command(self): - command_dispatcher = self.get_feature(CommandDispatcher) - command_dispatcher.run_command() - - -def create_app() -> App: - app = App() - app.setup() - return app diff --git a/services/manager/service/_base.py b/services/manager/service/_base.py deleted file mode 100644 index 783296c..0000000 --- a/services/manager/service/_base.py +++ /dev/null @@ -1,398 +0,0 @@ -from __future__ import annotations - -from argparse import ArgumentParser, Namespace -from abc import ABC, abstractmethod -import argparse -import os -from pathlib import Path -from typing import TypeVar, overload - -from manager import CruException, CruLogicError - -_Feature = TypeVar("_Feature", bound="AppFeatureProvider") - - -class AppError(CruException): - pass - - -class AppFeatureError(AppError): - def __init__(self, message, feature: type | str, *args, **kwargs): - super().__init__(message, *args, **kwargs) - self._feature = feature - - @property - def feature(self) -> type | str: - return self._feature - - -class AppPathError(CruException): - def __init__(self, message, _path: str | Path, *args, **kwargs): - super().__init__(message, *args, **kwargs) - self._path = str(_path) - - @property - def path(self) -> str: - return self._path - - -class AppPath(ABC): - def __init__(self, id: str, is_dir: bool, description: str) -> None: - self._is_dir = is_dir - self._id = id - self._description = description - - @property - @abstractmethod - def parent(self) -> AppPath | None: ... - - @property - @abstractmethod - def app(self) -> AppBase: ... - - @property - def id(self) -> str: - return self._id - - @property - def description(self) -> str: - return self._description - - @property - def is_dir(self) -> bool: - return self._is_dir - - @property - @abstractmethod - def full_path(self) -> Path: ... - - @property - def full_path_str(self) -> str: - return str(self.full_path) - - def check_parents(self, must_exist: bool = False) -> bool: - for p in reversed(self.full_path.parents): - if not p.exists() and not must_exist: - return False - if not p.is_dir(): - raise AppPathError("Parents' path must be a dir.", self.full_path) - return True - - def check_self(self, must_exist: bool = False) -> bool: - if not self.check_parents(must_exist): - return False - if not self.full_path.exists(): - if not must_exist: - return False - raise AppPathError("Not exist.", self.full_path) - if self.is_dir: - if not self.full_path.is_dir(): - raise AppPathError("Should be a directory, but not.", self.full_path) - else: - return True - else: - if not self.full_path.is_file(): - raise AppPathError("Should be a file, but not.", self.full_path) - else: - return True - - def ensure(self, create_file: bool = False) -> None: - e = self.check_self(False) - if not e: - os.makedirs(self.full_path.parent, exist_ok=True) - if self.is_dir: - os.mkdir(self.full_path) - elif create_file: - with open(self.full_path, "w") as f: - f.write("") - - def read_text(self) -> str: - if self.is_dir: - raise AppPathError("Can't read text of a dir.", self.full_path) - self.check_self() - return self.full_path.read_text() - - def add_subpath( - self, - name: str, - is_dir: bool, - /, - id: str | None = None, - description: str = "", - ) -> AppFeaturePath: - return self.app._add_path(name, is_dir, self, id, description) - - @property - def app_relative_path(self) -> Path: - return self.full_path.relative_to(self.app.root.full_path) - - -class AppFeaturePath(AppPath): - def __init__( - self, - parent: AppPath, - name: str, - is_dir: bool, - /, - id: str | None = None, - description: str = "", - ) -> None: - super().__init__(id or name, is_dir, description) - self._name = name - self._parent = parent - - @property - def name(self) -> str: - return self._name - - @property - def parent(self) -> AppPath: - return self._parent - - @property - def app(self) -> AppBase: - return self.parent.app - - @property - def full_path(self) -> Path: - return Path(self.parent.full_path, self.name).resolve() - - -class AppRootPath(AppPath): - def __init__(self, app: AppBase, path: Path): - super().__init__(f"/{id}", True, f"Application {id} path.") - self._app = app - self._full_path = path.resolve() - - @property - def parent(self) -> None: - return None - - @property - def app(self) -> AppBase: - return self._app - - @property - def full_path(self) -> Path: - return self._full_path - - -class AppFeatureProvider(ABC): - def __init__(self, name: str, /, app: AppBase | None = None): - super().__init__() - self._name = name - self._app = app if app else AppBase.get_instance() - - @property - def app(self) -> AppBase: - return self._app - - @property - def name(self) -> str: - return self._name - - @abstractmethod - def setup(self) -> None: ... - - -class AppCommandFeatureProvider(AppFeatureProvider): - @abstractmethod - def get_command_info(self) -> tuple[str, str]: ... - - @abstractmethod - def setup_arg_parser(self, arg_parser: ArgumentParser): ... - - @abstractmethod - def run_command(self, args: Namespace) -> None: ... - - -class PathCommandProvider(AppCommandFeatureProvider): - def __init__(self) -> None: - super().__init__("path-command-provider") - - def setup(self): - pass - - def get_command_info(self): - return ("path", "Get information about paths used by app.") - - def setup_arg_parser(self, arg_parser: ArgumentParser) -> None: - subparsers = arg_parser.add_subparsers( - dest="path_command", required=True, metavar="PATH_COMMAND" - ) - _list_parser = subparsers.add_parser( - "list", help="list special paths used by app" - ) - - def run_command(self, args: Namespace) -> None: - if args.path_command == "list": - for path in self.app.paths: - print(f"{path.app_relative_path.as_posix()}: {path.description}") - - -class CommandDispatcher(AppFeatureProvider): - def __init__(self) -> None: - super().__init__("command-dispatcher") - - def setup_arg_parser(self) -> None: - self._map: dict[str, AppCommandFeatureProvider] = {} - arg_parser = argparse.ArgumentParser( - description="Service management", - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - subparsers = arg_parser.add_subparsers( - dest="command", - help="The management command to execute.", - metavar="COMMAND", - ) - for feature in self.app.features: - if isinstance(feature, AppCommandFeatureProvider): - info = feature.get_command_info() - command_subparser = subparsers.add_parser(info[0], help=info[1]) - feature.setup_arg_parser(command_subparser) - self._map[info[0]] = feature - self._arg_parser = arg_parser - - def setup(self): - self._parsed_args = self.arg_parser.parse_args() - - @property - def arg_parser(self) -> argparse.ArgumentParser: - return self._arg_parser - - @property - def command_map(self) -> dict[str, AppCommandFeatureProvider]: - return self._map - - @property - def program_args(self) -> argparse.Namespace: - return self._parsed_args - - def run_command(self) -> None: - args = self.program_args - if args.command is None: - self.arg_parser.print_help() - return - self.command_map[args.command].run_command(args) - - -class AppBase: - _instance: AppBase | None = None - - @staticmethod - def get_instance() -> AppBase: - if AppBase._instance is None: - raise AppError("App instance not initialized") - return AppBase._instance - - def __init__(self, app_id: str, name: str): - AppBase._instance = self - self._app_id = app_id - self._name = name - self._features: list[AppFeatureProvider] = [] - self._paths: list[AppFeaturePath] = [] - - def setup(self) -> None: - command_dispatcher = self.get_feature(CommandDispatcher) - command_dispatcher.setup_arg_parser() - self._root = AppRootPath(self, Path(self._ensure_env("CRUPEST_PROJECT_DIR"))) - self._data_dir = self._root.add_subpath( - self._ensure_env("CRUPEST_DATA_DIR"), True, id="data" - ) - self._services_dir = self._root.add_subpath( - self._ensure_env("CRUPEST_SERVICES_DIR"), True, id="CRUPEST_SERVICES_DIR" - ) - for feature in self.features: - feature.setup() - for path in self.paths: - path.check_self() - - @property - def app_id(self) -> str: - return self._app_id - - @property - def name(self) -> str: - return self._name - - def _ensure_env(self, env_name: str) -> str: - value = os.getenv(env_name) - if value is None: - raise AppError(f"Environment variable {env_name} not set") - return value - - @property - def root(self) -> AppRootPath: - return self._root - - @property - def data_dir(self) -> AppFeaturePath: - return self._data_dir - - @property - def services_dir(self) -> AppFeaturePath: - return self._services_dir - - @property - def app_initialized(self) -> bool: - return self.data_dir.check_self() - - @property - def features(self) -> list[AppFeatureProvider]: - return self._features - - @property - def paths(self) -> list[AppFeaturePath]: - return self._paths - - def add_feature(self, feature: _Feature) -> _Feature: - for f in self.features: - if f.name == feature.name: - raise AppFeatureError( - f"Duplicate feature name: {feature.name}.", feature.name - ) - self._features.append(feature) - return feature - - def _add_path( - self, - name: str, - is_dir: bool, - /, - parent: AppPath | None = None, - id: str | None = None, - description: str = "", - ) -> AppFeaturePath: - p = AppFeaturePath( - parent or self.root, name, is_dir, id=id, description=description - ) - self._paths.append(p) - return p - - @overload - def get_feature(self, feature: str) -> AppFeatureProvider: ... - - @overload - def get_feature(self, feature: type[_Feature]) -> _Feature: ... - - def get_feature( - self, feature: str | type[_Feature] - ) -> AppFeatureProvider | _Feature: - if isinstance(feature, str): - for f in self._features: - if f.name == feature: - return f - elif isinstance(feature, type): - for f in self._features: - if isinstance(f, feature): - return f - else: - raise CruLogicError("Argument must be the name of feature or its class.") - - raise AppFeatureError(f"Feature {feature} not found.", feature) - - def get_path(self, name: str) -> AppFeaturePath: - for p in self._paths: - if p.id == name or p.name == name: - return p - raise AppPathError(f"Application path {name} not found.", name) diff --git a/services/manager/service/_external.py b/services/manager/service/_external.py deleted file mode 100644 index 2347e95..0000000 --- a/services/manager/service/_external.py +++ /dev/null @@ -1,81 +0,0 @@ -from ._base import AppCommandFeatureProvider -from ._nginx import NginxManager - - -class CliToolCommandProvider(AppCommandFeatureProvider): - def __init__(self) -> None: - super().__init__("cli-tool-command-provider") - - def setup(self): - pass - - def get_command_info(self): - return ("gen-cli", "Get commands of running external cli tools.") - - def setup_arg_parser(self, arg_parser): - subparsers = arg_parser.add_subparsers( - dest="gen_cli_command", required=True, metavar="GEN_CLI_COMMAND" - ) - certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") - certbot_parser.add_argument( - "-t", "--test", action="store_true", help="run certbot in test mode" - ) - _install_docker_parser = subparsers.add_parser( - "install-docker", help="print docker installation commands" - ) - _update_blog_parser = subparsers.add_parser( - "update-blog", help="print blog update command" - ) - - def _print_install_docker_commands(self) -> None: - output = """ -### COMMAND: uninstall apt docker -for pkg in docker.io docker-doc docker-compose \ -podman-docker containerd runc; \ -do sudo apt-get remove $pkg; done - -### COMMAND: prepare apt certs -sudo apt-get update -sudo apt-get install ca-certificates curl -sudo install -m 0755 -d /etc/apt/keyrings - -### COMMAND: install certs -sudo curl -fsSL https://download.docker.com/linux/debian/gpg \ --o /etc/apt/keyrings/docker.asc -sudo chmod a+r /etc/apt/keyrings/docker.asc - -### COMMAND: add docker apt source -echo \\ - "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] \ -https://download.docker.com/linux/debian \\ - $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \\ - sudo tee /etc/apt/sources.list.d/docker.list > /dev/null - -### COMMAND: update apt and install docker -sudo apt-get update -sudo apt-get install docker-ce docker-ce-cli containerd.io \ -docker-buildx-plugin docker-compose-plugin - -### COMMAND: setup system for docker -sudo systemctl enable docker -sudo systemctl start docker -sudo groupadd -f docker -sudo usermod -aG docker $USER -# Remember to log out and log back in for the group changes to take effect -""".strip() - print(output) - - def _print_update_blog_command(self): - output = """ -### COMMAND: update blog -docker exec -it blog /scripts/update.bash -""".strip() - print(output) - - def run_command(self, args): - if args.gen_cli_command == "certbot": - self.app.get_feature(NginxManager).print_all_certbot_commands(args.test) - elif args.gen_cli_command == "install-docker": - self._print_install_docker_commands() - elif args.gen_cli_command == "update-blog": - self._print_update_blog_command() \ No newline at end of file diff --git a/services/manager/service/_nginx.py b/services/manager/service/_nginx.py deleted file mode 100644 index 5dfc3ab..0000000 --- a/services/manager/service/_nginx.py +++ /dev/null @@ -1,263 +0,0 @@ -from argparse import Namespace -from enum import Enum, auto -import re -import subprocess -from typing import TypeAlias - -from manager import CruInternalError - -from ._base import AppCommandFeatureProvider -from ._template import TemplateManager - - -class CertbotAction(Enum): - CREATE = auto() - EXPAND = auto() - SHRINK = auto() - RENEW = auto() - - -class NginxManager(AppCommandFeatureProvider): - CertbotAction: TypeAlias = CertbotAction - - def __init__(self) -> None: - super().__init__("nginx-manager") - self._domains_cache: list[str] | None = None - - def setup(self) -> None: - pass - - @property - def _template_manager(self) -> TemplateManager: - return self.app.get_feature(TemplateManager) - - @property - def root_domain(self) -> str: - return self._template_manager.get_domain() - - @property - def domains(self) -> list[str]: - if self._domains_cache is None: - self._domains_cache = self._get_domains() - return self._domains_cache - - @property - def subdomains(self) -> list[str]: - suffix = "." + self.root_domain - return [d[: -len(suffix)] for d in self.domains if d.endswith(suffix)] - - def _get_domains_from_text(self, text: str) -> set[str]: - domains: set[str] = set() - regex = re.compile(r"server_name\s+(\S+)\s*;") - for match in regex.finditer(text): - domains.add(match[1]) - return domains - - def _join_generated_nginx_conf_text(self) -> str: - result = "" - for path, text in self._template_manager.generate(): - if path.parents[-1] == "nginx": - result += text - return result - - def _get_domains(self) -> list[str]: - text = self._join_generated_nginx_conf_text() - domains = self._get_domains_from_text(text) - domains.remove(self.root_domain) - return [self.root_domain, *domains] - - def _print_domains(self) -> None: - for domain in self.domains: - print(domain) - - def _certbot_command( - self, - action: CertbotAction | str, - test: bool, - *, - docker=True, - standalone=None, - email=None, - agree_tos=True, - ) -> str: - if isinstance(action, str): - action = CertbotAction[action.upper()] - - command_args = [] - - add_domain_option = True - if action is CertbotAction.CREATE: - if standalone is None: - standalone = True - command_action = "certonly" - elif action in [CertbotAction.EXPAND, CertbotAction.SHRINK]: - if standalone is None: - standalone = False - command_action = "certonly" - elif action is CertbotAction.RENEW: - if standalone is None: - standalone = False - add_domain_option = False - command_action = "renew" - else: - raise CruInternalError("Invalid certbot action.") - - data_dir = self.app.data_dir.full_path.as_posix() - - if not docker: - command_args.append("certbot") - else: - command_args.extend( - [ - "docker run -it --rm --name certbot", - f'-v "{data_dir}/certbot/certs:/etc/letsencrypt"', - f'-v "{data_dir}/certbot/data:/var/lib/letsencrypt"', - ] - ) - if standalone: - command_args.append('-p "0.0.0.0:80:80"') - else: - command_args.append(f'-v "{data_dir}/certbot/webroot:/var/www/certbot"') - - command_args.append("certbot/certbot") - - command_args.append(command_action) - - command_args.append(f"--cert-name {self.root_domain}") - - if standalone: - command_args.append("--standalone") - else: - command_args.append("--webroot -w /var/www/certbot") - - if add_domain_option: - command_args.append(" ".join([f"-d {domain}" for domain in self.domains])) - - if email is not None: - command_args.append(f"--email {email}") - - if agree_tos: - command_args.append("--agree-tos") - - if test: - command_args.append("--test-cert --dry-run") - - return " ".join(command_args) - - def print_all_certbot_commands(self, test: bool): - print("### COMMAND: (standalone) create certs") - print( - self._certbot_command( - CertbotAction.CREATE, - test, - email=self._template_manager.get_email(), - ) - ) - print() - print("### COMMAND: (webroot+nginx) expand or shrink certs") - print( - self._certbot_command( - CertbotAction.EXPAND, - test, - email=self._template_manager.get_email(), - ) - ) - print() - print("### COMMAND: (webroot+nginx) renew certs") - print( - self._certbot_command( - CertbotAction.RENEW, - test, - email=self._template_manager.get_email(), - ) - ) - - @property - def _cert_path_str(self) -> str: - return str( - self.app.data_dir.full_path - / "certbot/certs/live" - / self.root_domain - / "fullchain.pem" - ) - - def get_command_info(self): - return "nginx", "Manage nginx related things." - - def setup_arg_parser(self, arg_parser): - subparsers = arg_parser.add_subparsers( - dest="nginx_command", required=True, metavar="NGINX_COMMAND" - ) - _list_parser = subparsers.add_parser("list", help="list domains") - certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") - certbot_parser.add_argument( - "--no-test", - action="store_true", - help="remove args making certbot run in test mode", - ) - - def run_command(self, args: Namespace) -> None: - if args.nginx_command == "list": - self._print_domains() - elif args.nginx_command == "certbot": - self.print_all_certbot_commands(not args.no_test) - - def _generate_dns_zone( - self, - ip: str, - /, - ttl: str | int = 600, - *, - enable_mail: bool = True, - dkim: str | None = None, - ) -> str: - # TODO: Not complete and test now. - root_domain = self.root_domain - result = f"$ORIGIN {root_domain}.\n\n" - result += "; A records\n" - result += f"@ {ttl} IN A {ip}\n" - for subdomain in self.subdomains: - result += f"{subdomain} {ttl} IN A {ip}\n" - - if enable_mail: - result += "\n; MX records\n" - result += f"@ {ttl} IN MX 10 mail.{root_domain}.\n" - result += "\n; SPF record\n" - result += f'@ {ttl} IN TXT "v=spf1 mx ~all"\n' - if dkim is not None: - result += "\n; DKIM record\n" - result += f'mail._domainkey {ttl} IN TEXT "{dkim}"' - result += "\n; DMARC record\n" - dmarc_options = [ - "v=DMARC1", - "p=none", - f"rua=mailto:dmarc.report@{root_domain}", - f"ruf=mailto:dmarc.report@{root_domain}", - "sp=none", - "ri=86400", - ] - result += f'_dmarc {ttl} IN TXT "{"; ".join(dmarc_options)}"\n' - return result - - def _get_dkim_from_mailserver(self) -> str | None: - # TODO: Not complete and test now. - dkim_path = ( - self.app.data_dir.full_path - / "dms/config/opendkim/keys" - / self.root_domain - / "mail.txt" - ) - if not dkim_path.exists(): - return None - - p = subprocess.run(["sudo", "cat", dkim_path], capture_output=True, check=True) - value = "" - for match in re.finditer('"(.*)"', p.stdout.decode("utf-8")): - value += match.group(1) - return value - - def _generate_dns_zone_with_dkim(self, ip: str, /, ttl: str | int = 600) -> str: - # TODO: Not complete and test now. - return self._generate_dns_zone( - ip, ttl, enable_mail=True, dkim=self._get_dkim_from_mailserver() - ) diff --git a/services/manager/service/_template.py b/services/manager/service/_template.py deleted file mode 100644 index 90c19ec..0000000 --- a/services/manager/service/_template.py +++ /dev/null @@ -1,228 +0,0 @@ -from argparse import Namespace -from pathlib import Path -import shutil -from typing import NamedTuple -import graphlib - -from manager import CruException -from manager.parsing import SimpleLineVarParser -from manager.template import TemplateTree, CruStrWrapperTemplate - -from ._base import AppCommandFeatureProvider, AppFeaturePath - - -class _Config(NamedTuple): - text: str - config: dict[str, str] - - -class _GeneratedConfig(NamedTuple): - base: _Config - private: _Config - merged: _Config - - -class _PreConfig(NamedTuple): - base: _Config - private: _Config - config: dict[str, str] - - @staticmethod - def create(base: _Config, private: _Config) -> "_PreConfig": - return _PreConfig(base, private, {**base.config, **private.config}) - - def _merge(self, generated: _Config): - text = ( - "\n".join( - [ - self.private.text.strip(), - self.base.text.strip(), - generated.text.strip(), - ] - ) - + "\n" - ) - config = {**self.config, **generated.config} - return _GeneratedConfig(self.base, self.private, _Config(text, config)) - - -class _Template(NamedTuple): - config: CruStrWrapperTemplate - config_vars: set[str] - tree: TemplateTree - - -class TemplateManager(AppCommandFeatureProvider): - def __init__(self): - super().__init__("template-manager") - - def setup(self) -> None: - self._base_config_file = self.app.services_dir.add_subpath("base-config", False) - self._private_config_file = self.app.data_dir.add_subpath("config", False) - self._template_config_file = self.app.services_dir.add_subpath( - "config.template", False - ) - self._templates_dir = self.app.services_dir.add_subpath("templates", True) - self._generated_dir = self.app.services_dir.add_subpath("generated", True) - - self._config_parser = SimpleLineVarParser() - - def _read_pre(app_path: AppFeaturePath) -> _Config: - text = app_path.read_text() - config = self._read_config(text) - return _Config(text, config) - - base = _read_pre(self._base_config_file) - private = _read_pre(self._private_config_file) - self._preconfig = _PreConfig.create(base, private) - - self._generated: _GeneratedConfig | None = None - - template_config_text = self._template_config_file.read_text() - self._template_config = self._read_config(template_config_text) - - self._template = _Template( - CruStrWrapperTemplate(template_config_text), - set(self._template_config.keys()), - TemplateTree( - lambda text: CruStrWrapperTemplate(text), - self.templates_dir.full_path_str, - ), - ) - - self._real_required_vars = ( - self._template.config_vars | self._template.tree.variables - ) - self._template.config_vars - lacks = self._real_required_vars - self._preconfig.config.keys() - self._lack_vars = lacks if len(lacks) > 0 else None - - def _read_config_entry_names(self, text: str) -> set[str]: - return set(entry.key for entry in self._config_parser.parse(text)) - - def _read_config(self, text: str) -> dict[str, str]: - return {entry.key: entry.value for entry in self._config_parser.parse(text)} - - @property - def templates_dir(self) -> AppFeaturePath: - return self._templates_dir - - @property - def generated_dir(self) -> AppFeaturePath: - return self._generated_dir - - def get_domain(self) -> str: - return self._preconfig.config["CRUPEST_DOMAIN"] - - def get_email(self) -> str: - return self._preconfig.config["CRUPEST_EMAIL"] - - def _generate_template_config(self, config: dict[str, str]) -> dict[str, str]: - entry_templates = { - key: CruStrWrapperTemplate(value) - for key, value in self._template_config.items() - } - sorter = graphlib.TopologicalSorter( - config - | {key: template.variables for key, template in entry_templates.items()} - ) - - vars: dict[str, str] = config.copy() - for _ in sorter.static_order(): - del_keys = [] - for key, template in entry_templates.items(): - new = template.generate_partial(vars) - if not new.has_variables: - vars[key] = new.generate({}) - del_keys.append(key) - else: - entry_templates[key] = new - for key in del_keys: - del entry_templates[key] - assert len(entry_templates) == 0 - return {key: value for key, value in vars.items() if key not in config} - - def _generate_config(self) -> _GeneratedConfig: - if self._generated is not None: - return self._generated - if self._lack_vars is not None: - raise CruException(f"Required vars are not defined: {self._lack_vars}.") - config = self._generate_template_config(self._preconfig.config) - text = self._template.config.generate(self._preconfig.config | config) - self._generated = self._preconfig._merge(_Config(text, config)) - return self._generated - - def generate(self) -> list[tuple[Path, str]]: - config = self._generate_config() - return [ - (Path("config"), config.merged.text), - *self._template.tree.generate(config.merged.config), - ] - - def _generate_files(self, dry_run: bool) -> None: - result = self.generate() - if not dry_run: - if self.generated_dir.full_path.exists(): - shutil.rmtree(self.generated_dir.full_path) - for path, text in result: - des = self.generated_dir.full_path / path - des.parent.mkdir(parents=True, exist_ok=True) - with open(des, "w") as f: - f.write(text) - - def get_command_info(self): - return ("template", "Manage templates.") - - def _print_file_lists(self) -> None: - print(f"[{self._template.config.variable_count}]", "config") - for path, template in self._template.tree.templates: - print(f"[{template.variable_count}]", path.as_posix()) - - def _print_vars(self, required: bool) -> None: - for var in self._template.config.variables: - print(f"[config] {var}") - for var in self._template.tree.variables: - if not (required and var in self._template.config_vars): - print(f"[template] {var}") - - def _run_check_vars(self) -> None: - if self._lack_vars is not None: - print("Lacks:") - for var in self._lack_vars: - print(var) - - def setup_arg_parser(self, arg_parser): - subparsers = arg_parser.add_subparsers( - dest="template_command", required=True, metavar="TEMPLATE_COMMAND" - ) - _list_parser = subparsers.add_parser("list", help="list templates") - vars_parser = subparsers.add_parser( - "vars", help="list variables used in all templates" - ) - vars_parser.add_argument( - "-r", - "--required", - help="only list really required one.", - action="store_true", - ) - _check_vars_parser = subparsers.add_parser( - "check-vars", - help="check if required vars are set", - ) - generate_parser = subparsers.add_parser("generate", help="generate templates") - generate_parser.add_argument( - "--no-dry-run", action="store_true", help="generate and write target files" - ) - - def run_command(self, args: Namespace) -> None: - if args.template_command == "list": - self._print_file_lists() - elif args.template_command == "vars": - self._print_vars(args.required) - elif args.template_command == "generate": - dry_run = not args.no_dry_run - self._generate_files(dry_run) - if dry_run: - print("Dry run successfully.") - print( - f"Will delete dir {self.generated_dir.full_path_str} if it exists." - ) diff --git a/services/manager/system.py b/services/manager/system.py deleted file mode 100644 index f321717..0000000 --- a/services/manager/system.py +++ /dev/null @@ -1,23 +0,0 @@ -import os.path -import re - - -def check_debian_derivative_version(name: str) -> None | str: - if not os.path.isfile("/etc/os-release"): - return None - with open("/etc/os-release", "r") as f: - content = f.read() - if f"ID={name}" not in content: - return None - m = re.search(r'VERSION_ID="(.+)"', content) - if m is None: - return None - return m.group(1) - - -def check_ubuntu_version() -> None | str: - return check_debian_derivative_version("ubuntu") - - -def check_debian_version() -> None | str: - return check_debian_derivative_version("debian") diff --git a/services/manager/template.py b/services/manager/template.py deleted file mode 100644 index 3a70337..0000000 --- a/services/manager/template.py +++ /dev/null @@ -1,209 +0,0 @@ -from abc import ABCMeta, abstractmethod -from collections.abc import Callable, Mapping -from pathlib import Path -from string import Template -from typing import Generic, Self, TypeVar - -from ._iter import CruIterator -from ._error import CruException - -from .parsing import StrWrapperVarParser - - -class CruTemplateError(CruException): - pass - - -class CruTemplateBase(metaclass=ABCMeta): - def __init__(self, text: str): - self._text = text - self._variables: set[str] | None = None - - @abstractmethod - def _get_variables(self) -> set[str]: - raise NotImplementedError() - - @property - def text(self) -> str: - return self._text - - @property - def variables(self) -> set[str]: - if self._variables is None: - self._variables = self._get_variables() - return self._variables - - @property - def variable_count(self) -> int: - return len(self.variables) - - @property - def has_variables(self) -> bool: - return self.variable_count > 0 - - @abstractmethod - def _do_generate(self, mapping: dict[str, str]) -> str: - raise NotImplementedError() - - def _generate_partial( - self, mapping: Mapping[str, str], allow_unused: bool = True - ) -> str: - values = dict(mapping) - if not allow_unused and not len(set(values.keys() - self.variables)) != 0: - raise CruTemplateError("Unused variables.") - return self._do_generate(values) - - def generate_partial( - self, mapping: Mapping[str, str], allow_unused: bool = True - ) -> Self: - return self.__class__(self._generate_partial(mapping, allow_unused)) - - def generate(self, mapping: Mapping[str, str], allow_unused: bool = True) -> str: - values = dict(mapping) - if len(self.variables - values.keys()) != 0: - raise CruTemplateError( - f"Missing variables: {self.variables - values.keys()} ." - ) - return self._generate_partial(values, allow_unused) - - -class CruTemplate(CruTemplateBase): - def __init__(self, prefix: str, text: str): - super().__init__(text) - self._prefix = prefix - self._template = Template(text) - - def _get_variables(self) -> set[str]: - return ( - CruIterator(self._template.get_identifiers()) - .filter(lambda i: i.startswith(self.prefix)) - .to_set() - ) - - @property - def prefix(self) -> str: - return self._prefix - - @property - def py_template(self) -> Template: - return self._template - - @property - def all_variables(self) -> set[str]: - return set(self._template.get_identifiers()) - - def _do_generate(self, mapping: dict[str, str]) -> str: - return self._template.safe_substitute(mapping) - - -class CruStrWrapperTemplate(CruTemplateBase): - def __init__(self, text: str, wrapper: str = "@@"): - super().__init__(text) - self._wrapper = wrapper - self._tokens: StrWrapperVarParser.Result - - @property - def wrapper(self) -> str: - return self._wrapper - - def _get_variables(self): - self._tokens = StrWrapperVarParser(self.wrapper).parse(self.text) - return ( - self._tokens.cru_iter() - .filter(lambda t: t.is_var) - .map(lambda t: t.value) - .to_set() - ) - - def _do_generate(self, mapping): - return ( - self._tokens.cru_iter() - .map(lambda t: mapping[t.value] if t.is_var else t.value) - .join_str("") - ) - - -_Template = TypeVar("_Template", bound=CruTemplateBase) - - -class TemplateTree(Generic[_Template]): - def __init__( - self, - template_generator: Callable[[str], _Template], - source: str, - *, - template_file_suffix: str | None = ".template", - ): - """ - If template_file_suffix is not None, the files will be checked according to the - suffix of the file name. If the suffix matches, the file will be regarded as a - template file. Otherwise, it will be regarded as a non-template file. - Content of template file must contain variables that need to be replaced, while - content of non-template file may not contain any variables. - If either case is false, it generally means whether the file is a template is - wrongly handled. - """ - self._template_generator = template_generator - self._files: list[tuple[Path, _Template]] = [] - self._source = source - self._template_file_suffix = template_file_suffix - self._load() - - @property - def templates(self) -> list[tuple[Path, _Template]]: - return self._files - - @property - def source(self) -> str: - return self._source - - @property - def template_file_suffix(self) -> str | None: - return self._template_file_suffix - - @staticmethod - def _scan_files(root: str) -> list[Path]: - root_path = Path(root) - result: list[Path] = [] - for path in root_path.glob("**/*"): - if not path.is_file(): - continue - path = path.relative_to(root_path) - result.append(Path(path)) - return result - - def _load(self) -> None: - files = self._scan_files(self.source) - for file_path in files: - template_file = Path(self.source) / file_path - with open(template_file, "r") as f: - content = f.read() - template = self._template_generator(content) - if self.template_file_suffix is not None: - should_be_template = file_path.name.endswith(self.template_file_suffix) - if should_be_template and not template.has_variables: - raise CruTemplateError( - f"Template file {file_path} has no variables." - ) - elif not should_be_template and template.has_variables: - raise CruTemplateError(f"Non-template {file_path} has variables.") - self._files.append((file_path, template)) - - @property - def variables(self) -> set[str]: - s = set() - for _, template in self.templates: - s.update(template.variables) - return s - - def generate(self, variables: Mapping[str, str]) -> list[tuple[Path, str]]: - result: list[tuple[Path, str]] = [] - for path, template in self.templates: - if self.template_file_suffix is not None and path.name.endswith( - self.template_file_suffix - ): - path = path.parent / (path.name[: -len(self.template_file_suffix)]) - - text = template.generate(variables) - result.append((path, text)) - return result diff --git a/services/manager/tool.py b/services/manager/tool.py deleted file mode 100644 index 377f5d7..0000000 --- a/services/manager/tool.py +++ /dev/null @@ -1,82 +0,0 @@ -import shutil -import subprocess -from typing import Any -from collections.abc import Iterable - -from ._error import CruException - - -class CruExternalToolError(CruException): - def __init__(self, message: str, tool: str, *args, **kwargs) -> None: - super().__init__(message, *args, **kwargs) - self._tool = tool - - @property - def tool(self) -> str: - return self._tool - - -class CruExternalToolNotFoundError(CruExternalToolError): - def __init__(self, message: str | None, tool: str, *args, **kwargs) -> None: - super().__init__( - message or f"Could not find binary for {tool}.", tool, *args, **kwargs - ) - - -class CruExternalToolRunError(CruExternalToolError): - def __init__( - self, - message: str, - tool: str, - tool_args: Iterable[str], - tool_error: Any, - *args, - **kwargs, - ) -> None: - super().__init__(message, tool, *args, **kwargs) - self._tool_args = list(tool_args) - self._tool_error = tool_error - - @property - def tool_args(self) -> list[str]: - return self._tool_args - - @property - def tool_error(self) -> Any: - return self._tool_error - - -class ExternalTool: - def __init__(self, bin: str) -> None: - self._bin = bin - - @property - def bin(self) -> str: - return self._bin - - @bin.setter - def bin(self, value: str) -> None: - self._bin = value - - @property - def bin_path(self) -> str: - real_bin = shutil.which(self.bin) - if not real_bin: - raise CruExternalToolNotFoundError(None, self.bin) - return real_bin - - def run( - self, *process_args: str, **subprocess_kwargs - ) -> subprocess.CompletedProcess: - try: - return subprocess.run( - [self.bin_path] + list(process_args), **subprocess_kwargs - ) - except subprocess.CalledProcessError as e: - raise CruExternalToolError("Subprocess failed.", self.bin) from e - except OSError as e: - raise CruExternalToolError("Failed to start subprocess", self.bin) from e - - def run_get_output(self, *process_args: str, **subprocess_kwargs) -> Any: - process = self.run(*process_args, capture_output=True, **subprocess_kwargs) - return process.stdout diff --git a/services/manager/value.py b/services/manager/value.py deleted file mode 100644 index 9c03219..0000000 --- a/services/manager/value.py +++ /dev/null @@ -1,292 +0,0 @@ -from __future__ import annotations - -import random -import secrets -import string -import uuid -from abc import abstractmethod, ABCMeta -from collections.abc import Callable -from typing import Any, ClassVar, TypeVar, Generic - -from ._error import CruException - - -def _str_case_in(s: str, case: bool, str_list: list[str]) -> bool: - if case: - return s in str_list - else: - return s.lower() in [s.lower() for s in str_list] - - -_T = TypeVar("_T") - - -class CruValueTypeError(CruException): - def __init__( - self, - message: str, - value: Any, - value_type: ValueType | None, - *args, - **kwargs, - ): - super().__init__( - message, - *args, - **kwargs, - ) - self._value = value - self._value_type = value_type - - @property - def value(self) -> Any: - return self._value - - @property - def value_type(self) -> ValueType | None: - return self._value_type - - -class ValueType(Generic[_T], metaclass=ABCMeta): - def __init__(self, name: str, _type: type[_T]) -> None: - self._name = name - self._type = _type - - @property - def name(self) -> str: - return self._name - - @property - def type(self) -> type[_T]: - return self._type - - def check_value_type(self, value: Any) -> None: - if not isinstance(value, self.type): - raise CruValueTypeError("Type of value is wrong.", value, self) - - def _do_check_value(self, value: Any) -> _T: - return value - - def check_value(self, value: Any) -> _T: - self.check_value_type(value) - return self._do_check_value(value) - - @abstractmethod - def _do_check_str_format(self, s: str) -> None: - raise NotImplementedError() - - def check_str_format(self, s: str) -> None: - if not isinstance(s, str): - raise CruValueTypeError("Try to check format on a non-str.", s, self) - self._do_check_str_format(s) - - @abstractmethod - def _do_convert_value_to_str(self, value: _T) -> str: - raise NotImplementedError() - - def convert_value_to_str(self, value: _T) -> str: - self.check_value(value) - return self._do_convert_value_to_str(value) - - @abstractmethod - def _do_convert_str_to_value(self, s: str) -> _T: - raise NotImplementedError() - - def convert_str_to_value(self, s: str) -> _T: - self.check_str_format(s) - return self._do_convert_str_to_value(s) - - def check_value_or_try_convert_from_str(self, value_or_str: Any) -> _T: - try: - return self.check_value(value_or_str) - except CruValueTypeError: - if isinstance(value_or_str, str): - return self.convert_str_to_value(value_or_str) - else: - raise - - def create_default_value(self) -> _T: - return self.type() - - -class TextValueType(ValueType[str]): - def __init__(self) -> None: - super().__init__("text", str) - - def _do_check_str_format(self, _s): - return - - def _do_convert_value_to_str(self, value): - return value - - def _do_convert_str_to_value(self, s): - return s - - -class IntegerValueType(ValueType[int]): - def __init__(self) -> None: - super().__init__("integer", int) - - def _do_check_str_format(self, s): - try: - int(s) - except ValueError as e: - raise CruValueTypeError("Invalid integer format.", s, self) from e - - def _do_convert_value_to_str(self, value): - return str(value) - - def _do_convert_str_to_value(self, s): - return int(s) - - -class FloatValueType(ValueType[float]): - def __init__(self) -> None: - super().__init__("float", float) - - def _do_check_str_format(self, s): - try: - float(s) - except ValueError as e: - raise CruValueTypeError("Invalid float format.", s, self) from e - - def _do_convert_value_to_str(self, value): - return str(value) - - def _do_convert_str_to_value(self, s): - return float(s) - - -class BooleanValueType(ValueType[bool]): - DEFAULT_TRUE_LIST: ClassVar[list[str]] = ["true", "yes", "y", "on", "1"] - DEFAULT_FALSE_LIST: ClassVar[list[str]] = ["false", "no", "n", "off", "0"] - - def __init__( - self, - *, - case_sensitive=False, - true_list: None | list[str] = None, - false_list: None | list[str] = None, - ) -> None: - super().__init__("boolean", bool) - self._case_sensitive = case_sensitive - self._valid_true_strs: list[str] = ( - true_list or BooleanValueType.DEFAULT_TRUE_LIST - ) - self._valid_false_strs: list[str] = ( - false_list or BooleanValueType.DEFAULT_FALSE_LIST - ) - - @property - def case_sensitive(self) -> bool: - return self._case_sensitive - - @property - def valid_true_strs(self) -> list[str]: - return self._valid_true_strs - - @property - def valid_false_strs(self) -> list[str]: - return self._valid_false_strs - - @property - def valid_boolean_strs(self) -> list[str]: - return self._valid_true_strs + self._valid_false_strs - - def _do_check_str_format(self, s): - if not _str_case_in(s, self.case_sensitive, self.valid_boolean_strs): - raise CruValueTypeError("Invalid boolean format.", s, self) - - def _do_convert_value_to_str(self, value): - return self._valid_true_strs[0] if value else self._valid_false_strs[0] - - def _do_convert_str_to_value(self, s): - return _str_case_in(s, self.case_sensitive, self._valid_true_strs) - - def create_default_value(self): - return self.valid_false_strs[0] - - -class EnumValueType(ValueType[str]): - def __init__(self, valid_values: list[str], /, case_sensitive=False) -> None: - super().__init__(f"enum({'|'.join(valid_values)})", str) - self._case_sensitive = case_sensitive - self._valid_values = valid_values - - @property - def case_sensitive(self) -> bool: - return self._case_sensitive - - @property - def valid_values(self) -> list[str]: - return self._valid_values - - def _do_check_value(self, value): - self._do_check_str_format(value) - - def _do_check_str_format(self, s): - if not _str_case_in(s, self.case_sensitive, self.valid_values): - raise CruValueTypeError("Invalid enum value", s, self) - - def _do_convert_value_to_str(self, value): - return value - - def _do_convert_str_to_value(self, s): - return s - - def create_default_value(self): - return self.valid_values[0] - - -TEXT_VALUE_TYPE = TextValueType() -INTEGER_VALUE_TYPE = IntegerValueType() -BOOLEAN_VALUE_TYPE = BooleanValueType() - - -class ValueGeneratorBase(Generic[_T], metaclass=ABCMeta): - @abstractmethod - def generate(self) -> _T: - raise NotImplementedError() - - def __call__(self) -> _T: - return self.generate() - - -class ValueGenerator(ValueGeneratorBase[_T]): - def __init__(self, generate_func: Callable[[], _T]) -> None: - self._generate_func = generate_func - - @property - def generate_func(self) -> Callable[[], _T]: - return self._generate_func - - def generate(self) -> _T: - return self._generate_func() - - -class UuidValueGenerator(ValueGeneratorBase[str]): - def generate(self): - return str(uuid.uuid4()) - - -class RandomStringValueGenerator(ValueGeneratorBase[str]): - def __init__(self, length: int, secure: bool) -> None: - self._length = length - self._secure = secure - - @property - def length(self) -> int: - return self._length - - @property - def secure(self) -> bool: - return self._secure - - def generate(self): - random_func = secrets.choice if self._secure else random.choice - characters = string.ascii_letters + string.digits - random_string = "".join(random_func(characters) for _ in range(self._length)) - return random_string - - -UUID_VALUE_GENERATOR = UuidValueGenerator() diff --git a/services/poetry.lock b/services/poetry.lock deleted file mode 100644 index 4338200..0000000 --- a/services/poetry.lock +++ /dev/null @@ -1,111 +0,0 @@ -# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. - -[[package]] -name = "mypy" -version = "1.15.0" -description = "Optional static typing for Python" -optional = false -python-versions = ">=3.9" -groups = ["dev"] -files = [ - {file = "mypy-1.15.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:979e4e1a006511dacf628e36fadfecbcc0160a8af6ca7dad2f5025529e082c13"}, - {file = "mypy-1.15.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c4bb0e1bd29f7d34efcccd71cf733580191e9a264a2202b0239da95984c5b559"}, - {file = "mypy-1.15.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:be68172e9fd9ad8fb876c6389f16d1c1b5f100ffa779f77b1fb2176fcc9ab95b"}, - {file = "mypy-1.15.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c7be1e46525adfa0d97681432ee9fcd61a3964c2446795714699a998d193f1a3"}, - {file = "mypy-1.15.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:2e2c2e6d3593f6451b18588848e66260ff62ccca522dd231cd4dd59b0160668b"}, - {file = "mypy-1.15.0-cp310-cp310-win_amd64.whl", hash = "sha256:6983aae8b2f653e098edb77f893f7b6aca69f6cffb19b2cc7443f23cce5f4828"}, - {file = "mypy-1.15.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:2922d42e16d6de288022e5ca321cd0618b238cfc5570e0263e5ba0a77dbef56f"}, - {file = "mypy-1.15.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2ee2d57e01a7c35de00f4634ba1bbf015185b219e4dc5909e281016df43f5ee5"}, - {file = "mypy-1.15.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:973500e0774b85d9689715feeffcc980193086551110fd678ebe1f4342fb7c5e"}, - {file = "mypy-1.15.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:5a95fb17c13e29d2d5195869262f8125dfdb5c134dc8d9a9d0aecf7525b10c2c"}, - {file = "mypy-1.15.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:1905f494bfd7d85a23a88c5d97840888a7bd516545fc5aaedff0267e0bb54e2f"}, - {file = "mypy-1.15.0-cp311-cp311-win_amd64.whl", hash = "sha256:c9817fa23833ff189db061e6d2eff49b2f3b6ed9856b4a0a73046e41932d744f"}, - {file = "mypy-1.15.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:aea39e0583d05124836ea645f412e88a5c7d0fd77a6d694b60d9b6b2d9f184fd"}, - {file = "mypy-1.15.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:2f2147ab812b75e5b5499b01ade1f4a81489a147c01585cda36019102538615f"}, - {file = "mypy-1.15.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ce436f4c6d218a070048ed6a44c0bbb10cd2cc5e272b29e7845f6a2f57ee4464"}, - {file = "mypy-1.15.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8023ff13985661b50a5928fc7a5ca15f3d1affb41e5f0a9952cb68ef090b31ee"}, - {file = "mypy-1.15.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:1124a18bc11a6a62887e3e137f37f53fbae476dc36c185d549d4f837a2a6a14e"}, - {file = "mypy-1.15.0-cp312-cp312-win_amd64.whl", hash = "sha256:171a9ca9a40cd1843abeca0e405bc1940cd9b305eaeea2dda769ba096932bb22"}, - {file = "mypy-1.15.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:93faf3fdb04768d44bf28693293f3904bbb555d076b781ad2530214ee53e3445"}, - {file = "mypy-1.15.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:811aeccadfb730024c5d3e326b2fbe9249bb7413553f15499a4050f7c30e801d"}, - {file = "mypy-1.15.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:98b7b9b9aedb65fe628c62a6dc57f6d5088ef2dfca37903a7d9ee374d03acca5"}, - {file = "mypy-1.15.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c43a7682e24b4f576d93072216bf56eeff70d9140241f9edec0c104d0c515036"}, - {file = "mypy-1.15.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:baefc32840a9f00babd83251560e0ae1573e2f9d1b067719479bfb0e987c6357"}, - {file = "mypy-1.15.0-cp313-cp313-win_amd64.whl", hash = "sha256:b9378e2c00146c44793c98b8d5a61039a048e31f429fb0eb546d93f4b000bedf"}, - {file = "mypy-1.15.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:e601a7fa172c2131bff456bb3ee08a88360760d0d2f8cbd7a75a65497e2df078"}, - {file = "mypy-1.15.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:712e962a6357634fef20412699a3655c610110e01cdaa6180acec7fc9f8513ba"}, - {file = "mypy-1.15.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:f95579473af29ab73a10bada2f9722856792a36ec5af5399b653aa28360290a5"}, - {file = "mypy-1.15.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8f8722560a14cde92fdb1e31597760dc35f9f5524cce17836c0d22841830fd5b"}, - {file = "mypy-1.15.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:1fbb8da62dc352133d7d7ca90ed2fb0e9d42bb1a32724c287d3c76c58cbaa9c2"}, - {file = "mypy-1.15.0-cp39-cp39-win_amd64.whl", hash = "sha256:d10d994b41fb3497719bbf866f227b3489048ea4bbbb5015357db306249f7980"}, - {file = "mypy-1.15.0-py3-none-any.whl", hash = "sha256:5469affef548bd1895d86d3bf10ce2b44e33d86923c29e4d675b3e323437ea3e"}, - {file = "mypy-1.15.0.tar.gz", hash = "sha256:404534629d51d3efea5c800ee7c42b72a6554d6c400e6a79eafe15d11341fd43"}, -] - -[package.dependencies] -mypy_extensions = ">=1.0.0" -typing_extensions = ">=4.6.0" - -[package.extras] -dmypy = ["psutil (>=4.0)"] -faster-cache = ["orjson"] -install-types = ["pip"] -mypyc = ["setuptools (>=50)"] -reports = ["lxml"] - -[[package]] -name = "mypy-extensions" -version = "1.0.0" -description = "Type system extensions for programs checked with the mypy type checker." -optional = false -python-versions = ">=3.5" -groups = ["dev"] -files = [ - {file = "mypy_extensions-1.0.0-py3-none-any.whl", hash = "sha256:4392f6c0eb8a5668a69e23d168ffa70f0be9ccfd32b5cc2d26a34ae5b844552d"}, - {file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"}, -] - -[[package]] -name = "ruff" -version = "0.9.6" -description = "An extremely fast Python linter and code formatter, written in Rust." -optional = false -python-versions = ">=3.7" -groups = ["dev"] -files = [ - {file = "ruff-0.9.6-py3-none-linux_armv6l.whl", hash = "sha256:2f218f356dd2d995839f1941322ff021c72a492c470f0b26a34f844c29cdf5ba"}, - {file = "ruff-0.9.6-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:b908ff4df65dad7b251c9968a2e4560836d8f5487c2f0cc238321ed951ea0504"}, - {file = "ruff-0.9.6-py3-none-macosx_11_0_arm64.whl", hash = "sha256:b109c0ad2ececf42e75fa99dc4043ff72a357436bb171900714a9ea581ddef83"}, - {file = "ruff-0.9.6-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1de4367cca3dac99bcbd15c161404e849bb0bfd543664db39232648dc00112dc"}, - {file = "ruff-0.9.6-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:ac3ee4d7c2c92ddfdaedf0bf31b2b176fa7aa8950efc454628d477394d35638b"}, - {file = "ruff-0.9.6-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:5dc1edd1775270e6aa2386119aea692039781429f0be1e0949ea5884e011aa8e"}, - {file = "ruff-0.9.6-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:4a091729086dffa4bd070aa5dab7e39cc6b9d62eb2bef8f3d91172d30d599666"}, - {file = "ruff-0.9.6-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:d1bbc6808bf7b15796cef0815e1dfb796fbd383e7dbd4334709642649625e7c5"}, - {file = "ruff-0.9.6-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:589d1d9f25b5754ff230dce914a174a7c951a85a4e9270613a2b74231fdac2f5"}, - {file = "ruff-0.9.6-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dc61dd5131742e21103fbbdcad683a8813be0e3c204472d520d9a5021ca8b217"}, - {file = "ruff-0.9.6-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:5e2d9126161d0357e5c8f30b0bd6168d2c3872372f14481136d13de9937f79b6"}, - {file = "ruff-0.9.6-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:68660eab1a8e65babb5229a1f97b46e3120923757a68b5413d8561f8a85d4897"}, - {file = "ruff-0.9.6-py3-none-musllinux_1_2_i686.whl", hash = "sha256:c4cae6c4cc7b9b4017c71114115db0445b00a16de3bcde0946273e8392856f08"}, - {file = "ruff-0.9.6-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:19f505b643228b417c1111a2a536424ddde0db4ef9023b9e04a46ed8a1cb4656"}, - {file = "ruff-0.9.6-py3-none-win32.whl", hash = "sha256:194d8402bceef1b31164909540a597e0d913c0e4952015a5b40e28c146121b5d"}, - {file = "ruff-0.9.6-py3-none-win_amd64.whl", hash = "sha256:03482d5c09d90d4ee3f40d97578423698ad895c87314c4de39ed2af945633caa"}, - {file = "ruff-0.9.6-py3-none-win_arm64.whl", hash = "sha256:0e2bb706a2be7ddfea4a4af918562fdc1bcb16df255e5fa595bbd800ce322a5a"}, - {file = "ruff-0.9.6.tar.gz", hash = "sha256:81761592f72b620ec8fa1068a6fd00e98a5ebee342a3642efd84454f3031dca9"}, -] - -[[package]] -name = "typing-extensions" -version = "4.12.2" -description = "Backported and Experimental Type Hints for Python 3.8+" -optional = false -python-versions = ">=3.8" -groups = ["dev"] -files = [ - {file = "typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d"}, - {file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"}, -] - -[metadata] -lock-version = "2.1" -python-versions = "^3.11" -content-hash = "674a21dbda993a1ee761e2e6e2f13ccece8289336a83fd0a154285eac48f3a76" diff --git a/services/pyproject.toml b/services/pyproject.toml deleted file mode 100644 index 960e161..0000000 --- a/services/pyproject.toml +++ /dev/null @@ -1,19 +0,0 @@ -[project] -name = "cru-service-manager" -version = "0.1.0" -requires-python = ">=3.11" -license = "MIT" - -[tool.poetry] -package-mode = false - -[tool.poetry.group.dev.dependencies] -mypy = "^1.13.0" -ruff = "^0.9.6" - -[tool.ruff.lint] -select = ["E", "F", "B"] - -[build-system] -requires = ["poetry-core"] -build-backend = "poetry.core.masonry.api" diff --git a/services/update-blog b/services/update-blog deleted file mode 100755 index d85acc1..0000000 --- a/services/update-blog +++ /dev/null @@ -1,5 +0,0 @@ -#!/usr/bin/env bash - -set -e - -exec docker compose exec -it blog /scripts/update.bash -- cgit v1.2.3