diff options
Diffstat (limited to 'tools/cru-py/cru')
26 files changed, 3928 insertions, 0 deletions
| diff --git a/tools/cru-py/cru/__init__.py b/tools/cru-py/cru/__init__.py new file mode 100644 index 0000000..17799a9 --- /dev/null +++ b/tools/cru-py/cru/__init__.py @@ -0,0 +1,60 @@ +import sys + +from ._base import CRU, CruNamespaceError, CRU_NAME_PREFIXES +from ._error import ( +    CruException, +    CruLogicError, +    CruInternalError, +    CruUnreachableError, +    cru_unreachable, +) +from ._const import ( +    CruConstantBase, +    CruDontChange, +    CruNotFound, +    CruNoValue, +    CruPlaceholder, +    CruUseDefault, +) +from ._func import CruFunction +from ._iter import CruIterable, CruIterator +from ._event import CruEvent, CruEventHandlerToken +from ._type import CruTypeSet, CruTypeCheckError + + +class CruInitError(CruException): +    pass + + +def check_python_version(required_version=(3, 11)): +    if sys.version_info < required_version: +        raise CruInitError(f"Python version must be >= {required_version}!") + + +check_python_version() + +__all__ = [ +    "CRU", +    "CruNamespaceError", +    "CRU_NAME_PREFIXES", +    "check_python_version", +    "CruException", +    "CruInternalError", +    "CruLogicError", +    "CruUnreachableError", +    "cru_unreachable", +    "CruInitError", +    "CruConstantBase", +    "CruDontChange", +    "CruNotFound", +    "CruNoValue", +    "CruPlaceholder", +    "CruUseDefault", +    "CruFunction", +    "CruIterable", +    "CruIterator", +    "CruEvent", +    "CruEventHandlerToken", +    "CruTypeSet", +    "CruTypeCheckError", +] diff --git a/tools/cru-py/cru/_base.py b/tools/cru-py/cru/_base.py new file mode 100644 index 0000000..2599d8f --- /dev/null +++ b/tools/cru-py/cru/_base.py @@ -0,0 +1,101 @@ +from typing import Any + +from ._helper import remove_none +from ._error import CruException + + +class CruNamespaceError(CruException): +    """Raised when a namespace is not found.""" + + +class _Cru: +    NAME_PREFIXES = ("CRU_", "Cru", "cru_") + +    def __init__(self) -> None: +        self._d: dict[str, Any] = {} + +    def all_names(self) -> list[str]: +        return list(self._d.keys()) + +    def get(self, name: str) -> Any: +        return self._d[name] + +    def has_name(self, name: str) -> bool: +        return name in self._d + +    @staticmethod +    def _maybe_remove_prefix(name: str) -> str | None: +        for prefix in _Cru.NAME_PREFIXES: +            if name.startswith(prefix): +                return name[len(prefix) :] +        return None + +    def _check_name_exist(self, *names: str | None) -> None: +        for name in names: +            if name is None: +                continue +            if self.has_name(name): +                raise CruNamespaceError(f"Name {name} exists in CRU.") + +    @staticmethod +    def check_name_format(name: str) -> tuple[str, str]: +        no_prefix_name = _Cru._maybe_remove_prefix(name) +        if no_prefix_name is None: +            raise CruNamespaceError( +                f"Name {name} is not prefixed with any of {_Cru.NAME_PREFIXES}." +            ) +        return name, no_prefix_name + +    @staticmethod +    def _check_object_name(o) -> tuple[str, str]: +        return _Cru.check_name_format(o.__name__) + +    def _do_add(self, o, *names: str | None) -> list[str]: +        name_list: list[str] = remove_none(names) +        for name in name_list: +            self._d[name] = o +        return name_list + +    def add(self, o, name: str | None) -> tuple[str, str | None]: +        no_prefix_name: str | None +        if name is None: +            name, no_prefix_name = self._check_object_name(o) +        else: +            no_prefix_name = self._maybe_remove_prefix(name) + +        self._check_name_exist(name, no_prefix_name) +        self._do_add(o, name, no_prefix_name) +        return name, no_prefix_name + +    def add_with_alias(self, o, name: str | None = None, *aliases: str) -> list[str]: +        final_names: list[str | None] = [] +        no_prefix_name: str | None +        if name is None: +            name, no_prefix_name = self._check_object_name(o) +            self._check_name_exist(name, no_prefix_name) +            final_names.extend([name, no_prefix_name]) +        for alias in aliases: +            no_prefix_name = self._maybe_remove_prefix(alias) +            self._check_name_exist(alias, no_prefix_name) +            final_names.extend([alias, no_prefix_name]) + +        return self._do_add(o, *final_names) + +    def add_objects(self, *objects): +        final_list = [] +        for o in objects: +            name, no_prefix_name = self._check_object_name(o) +            self._check_name_exist(name, no_prefix_name) +            final_list.append((o, name, no_prefix_name)) +        for o, name, no_prefix_name in final_list: +            self._do_add(o, name, no_prefix_name) + +    def __getitem__(self, item): +        return self.get(item) + +    def __getattr__(self, item): +        return self.get(item) + + +CRU_NAME_PREFIXES = _Cru.NAME_PREFIXES +CRU = _Cru() diff --git a/tools/cru-py/cru/_const.py b/tools/cru-py/cru/_const.py new file mode 100644 index 0000000..8246b35 --- /dev/null +++ b/tools/cru-py/cru/_const.py @@ -0,0 +1,49 @@ +from enum import Enum, auto +from typing import Self, TypeGuard, TypeVar + +from ._base import CRU + +_T = TypeVar("_T") + + +class CruConstantBase(Enum): +    @classmethod +    def check(cls, v: _T | Self) -> TypeGuard[Self]: +        return isinstance(v, cls) + +    @classmethod +    def check_not(cls, v: _T | Self) -> TypeGuard[_T]: +        return not cls.check(v) + +    @classmethod +    def value(cls) -> Self: +        return cls.VALUE  # type: ignore + + +class CruNotFound(CruConstantBase): +    VALUE = auto() + + +class CruUseDefault(CruConstantBase): +    VALUE = auto() + + +class CruDontChange(CruConstantBase): +    VALUE = auto() + + +class CruNoValue(CruConstantBase): +    VALUE = auto() + + +class CruPlaceholder(CruConstantBase): +    VALUE = auto() + + +CRU.add_objects( +    CruNotFound, +    CruUseDefault, +    CruDontChange, +    CruNoValue, +    CruPlaceholder, +) diff --git a/tools/cru-py/cru/_decorator.py b/tools/cru-py/cru/_decorator.py new file mode 100644 index 0000000..137fc05 --- /dev/null +++ b/tools/cru-py/cru/_decorator.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +from collections.abc import Callable +from typing import ( +    Concatenate, +    Generic, +    ParamSpec, +    TypeVar, +    cast, +) + +from ._base import CRU + +_P = ParamSpec("_P") +_T = TypeVar("_T") +_O = TypeVar("_O") +_R = TypeVar("_R") + + +class CruDecorator: + +    class ConvertResult(Generic[_T, _O]): +        def __init__( +            self, +            converter: Callable[[_T], _O], +        ) -> None: +            self.converter = converter + +        def __call__(self, origin: Callable[_P, _T]) -> Callable[_P, _O]: +            converter = self.converter + +            def real_impl(*args: _P.args, **kwargs: _P.kwargs) -> _O: +                return converter(origin(*args, **kwargs)) + +            return real_impl + +    class ImplementedBy(Generic[_T, _O, _P, _R]): +        def __init__( +            self, +            impl: Callable[Concatenate[_O, _P], _R], +            converter: Callable[[_T], _O], +        ) -> None: +            self.impl = impl +            self.converter = converter + +        def __call__( +            self, _origin: Callable[[_T], None] +        ) -> Callable[Concatenate[_T, _P], _R]: +            converter = self.converter +            impl = self.impl + +            def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R: +                return cast(Callable[Concatenate[_O, _P], _R], impl)( +                    converter(_self), *args, **kwargs +                ) + +            return real_impl + +        @staticmethod +        def create_factory(converter: Callable[[_T], _O]) -> Callable[ +            [Callable[Concatenate[_O, _P], _R]], +            CruDecorator.ImplementedBy[_T, _O, _P, _R], +        ]: +            def create( +                m: Callable[Concatenate[_O, _P], _R], +            ) -> CruDecorator.ImplementedBy[_T, _O, _P, _R]: +                return CruDecorator.ImplementedBy(m, converter) + +            return create + +    class ImplementedByNoSelf(Generic[_P, _R]): +        def __init__(self, impl: Callable[_P, _R]) -> None: +            self.impl = impl + +        def __call__( +            self, _origin: Callable[[_T], None] +        ) -> Callable[Concatenate[_T, _P], _R]: +            impl = self.impl + +            def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R: +                return cast(Callable[_P, _R], impl)(*args, **kwargs) + +            return real_impl + +        @staticmethod +        def create_factory() -> ( +            Callable[[Callable[_P, _R]], CruDecorator.ImplementedByNoSelf[_P, _R]] +        ): +            def create( +                m: Callable[_P, _R], +            ) -> CruDecorator.ImplementedByNoSelf[_P, _R]: +                return CruDecorator.ImplementedByNoSelf(m) + +            return create + + +CRU.add_objects(CruDecorator) diff --git a/tools/cru-py/cru/_error.py b/tools/cru-py/cru/_error.py new file mode 100644 index 0000000..e53c787 --- /dev/null +++ b/tools/cru-py/cru/_error.py @@ -0,0 +1,89 @@ +from __future__ import annotations + +from typing import NoReturn, cast, overload + + +class CruException(Exception): +    """Base exception class of all exceptions in cru.""" + +    @overload +    def __init__( +        self, +        message: None = None, +        *args, +        user_message: str, +        **kwargs, +    ): ... + +    @overload +    def __init__( +        self, +        message: str, +        *args, +        user_message: str | None = None, +        **kwargs, +    ): ... + +    def __init__( +        self, +        message: str | None = None, +        *args, +        user_message: str | None = None, +        **kwargs, +    ): +        if message is None: +            message = user_message + +        super().__init__( +            message, +            *args, +            **kwargs, +        ) +        self._message: str +        self._message = cast(str, message) +        self._user_message = user_message + +    @property +    def message(self) -> str: +        return self._message + +    def get_user_message(self) -> str | None: +        return self._user_message + +    def get_message(self, use_user: bool = True) -> str: +        if use_user and self._user_message is not None: +            return self._user_message +        else: +            return self._message + +    @property +    def is_internal(self) -> bool: +        return False + +    @property +    def is_logic_error(self) -> bool: +        return False + + +class CruLogicError(CruException): +    """Raised when a logic error occurs.""" + +    @property +    def is_logic_error(self) -> bool: +        return True + + +class CruInternalError(CruException): +    """Raised when an internal error occurs.""" + +    @property +    def is_internal(self) -> bool: +        return True + + +class CruUnreachableError(CruInternalError): +    """Raised when a code path is unreachable.""" + + +def cru_unreachable() -> NoReturn: +    raise CruUnreachableError("Code should not reach here!") diff --git a/tools/cru-py/cru/_event.py b/tools/cru-py/cru/_event.py new file mode 100644 index 0000000..51a794c --- /dev/null +++ b/tools/cru-py/cru/_event.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +from collections.abc import Callable +from typing import Generic, ParamSpec, TypeVar + +from .list import CruList + +_P = ParamSpec("_P") +_R = TypeVar("_R") + + +class CruEventHandlerToken(Generic[_P, _R]): +    def __init__( +        self, event: CruEvent, handler: Callable[_P, _R], once: bool = False +    ) -> None: +        self._event = event +        self._handler = handler +        self._once = once + +    @property +    def event(self) -> CruEvent: +        return self._event + +    @property +    def handler(self) -> Callable[_P, _R]: +        return self._handler + +    @property +    def once(self) -> bool: +        return self._once + + +class CruEvent(Generic[_P, _R]): +    def __init__(self, name: str) -> None: +        self._name = name +        self._tokens: CruList[CruEventHandlerToken] = CruList() + +    def register( +        self, handler: Callable[_P, _R], once: bool = False +    ) -> CruEventHandlerToken: +        token = CruEventHandlerToken(self, handler, once) +        self._tokens.append(token) +        return token + +    def unregister(self, *handlers: CruEventHandlerToken | Callable[_P, _R]) -> int: +        old_length = len(self._tokens) +        self._tokens.reset( +            self._tokens.as_cru_iterator().filter( +                (lambda t: t in handlers or t.handler in handlers) +            ) +        ) +        return old_length - len(self._tokens) + +    def trigger(self, *args: _P.args, **kwargs: _P.kwargs) -> CruList[_R]: +        results = CruList( +            self._tokens.as_cru_iterator() +            .transform(lambda t: t.handler(*args, **kwargs)) +            .to_list() +        ) +        self._tokens.reset(self._tokens.as_cru_iterator().filter(lambda t: not t.once)) +        return results diff --git a/tools/cru-py/cru/_func.py b/tools/cru-py/cru/_func.py new file mode 100644 index 0000000..fc57802 --- /dev/null +++ b/tools/cru-py/cru/_func.py @@ -0,0 +1,172 @@ +from __future__ import annotations + +from collections.abc import Callable, Iterable +from enum import Flag, auto +from typing import ( +    Any, +    Generic, +    Literal, +    ParamSpec, +    TypeAlias, +    TypeVar, +) + + +from ._base import CRU +from ._const import CruPlaceholder + +_P = ParamSpec("_P") +_P1 = ParamSpec("_P1") +_T = TypeVar("_T") + + +class _Dec: +    @staticmethod +    def wrap( +        origin: Callable[_P, Callable[_P1, _T]] +    ) -> Callable[_P, _Wrapper[_P1, _T]]: +        def _wrapped(*args: _P.args, **kwargs: _P.kwargs) -> _Wrapper[_P1, _T]: +            return _Wrapper(origin(*args, **kwargs)) + +        return _wrapped + + +class _RawBase: +    @staticmethod +    def none(*_v, **_kwargs) -> None: +        return None + +    @staticmethod +    def true(*_v, **_kwargs) -> Literal[True]: +        return True + +    @staticmethod +    def false(*_v, **_kwargs) -> Literal[False]: +        return False + +    @staticmethod +    def identity(v: _T) -> _T: +        return v + +    @staticmethod +    def only_you(v: _T, *_v, **_kwargs) -> _T: +        return v + +    @staticmethod +    def equal(a: Any, b: Any) -> bool: +        return a == b + +    @staticmethod +    def not_equal(a: Any, b: Any) -> bool: +        return a != b + +    @staticmethod +    def not_(v: Any) -> Any: +        return not v + + +class _Wrapper(Generic[_P, _T]): +    def __init__(self, f: Callable[_P, _T]): +        self._f = f + +    @property +    def me(self) -> Callable[_P, _T]: +        return self._f + +    def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T: +        return self._f(*args, **kwargs) + +    @_Dec.wrap +    def bind(self, *bind_args, **bind_kwargs) -> Callable[..., _T]: +        func = self.me + +        def bound_func(*args, **kwargs): +            popped = 0 +            real_args = [] +            for arg in bind_args: +                if CruPlaceholder.check(arg): +                    real_args.append(args[popped]) +                    popped += 1 +                else: +                    real_args.append(arg) +            real_args.extend(args[popped:]) +            return func(*real_args, **(bind_kwargs | kwargs)) + +        return bound_func + +    class ChainMode(Flag): +        ARGS = auto() +        KWARGS = auto() +        BOTH = ARGS | KWARGS + +    ArgsChainableCallable: TypeAlias = Callable[..., Iterable[Any]] +    KwargsChainableCallable: TypeAlias = Callable[..., Iterable[tuple[str, Any]]] +    ChainableCallable: TypeAlias = Callable[ +        ..., tuple[Iterable[Any], Iterable[tuple[str, Any]]] +    ] + +    @_Dec.wrap +    def chain_with_args( +        self, funcs: Iterable[ArgsChainableCallable], *bind_args, **bind_kwargs +    ) -> ArgsChainableCallable: +        def chained_func(*args): +            args = self.bind(*bind_args, **bind_kwargs)(*args) + +            for func in funcs: +                args = _Wrapper(func).bind(*bind_args, **bind_kwargs)(*args) +            return args + +        return chained_func + +    @_Dec.wrap +    def chain_with_kwargs( +        self, funcs: Iterable[KwargsChainableCallable], *bind_args, **bind_kwargs +    ) -> KwargsChainableCallable: +        def chained_func(**kwargs): +            kwargs = self.bind(*bind_args, **bind_kwargs)(**kwargs) +            for func in funcs: +                kwargs = _Wrapper(func).bind(func, *bind_args, **bind_kwargs)(**kwargs) +            return kwargs + +        return chained_func + +    @_Dec.wrap +    def chain_with_both( +        self, funcs: Iterable[ChainableCallable], *bind_args, **bind_kwargs +    ) -> ChainableCallable: +        def chained_func(*args, **kwargs): +            for func in funcs: +                args, kwargs = _Wrapper(func).bind(func, *bind_args, **bind_kwargs)( +                    *args, **kwargs +                ) +            return args, kwargs + +        return chained_func + + +class _Base: +    none = _Wrapper(_RawBase.none) +    true = _Wrapper(_RawBase.true) +    false = _Wrapper(_RawBase.false) +    identity = _Wrapper(_RawBase.identity) +    only_you = _Wrapper(_RawBase.only_you) +    equal = _Wrapper(_RawBase.equal) +    not_equal = _Wrapper(_RawBase.not_equal) +    not_ = _Wrapper(_RawBase.not_) + + +class _Creators: +    @staticmethod +    def make_isinstance_of_types(*types: type) -> Callable: +        return _Wrapper(lambda v: type(v) in types) + + +class CruFunction: +    RawBase: TypeAlias = _RawBase +    Base: TypeAlias = _Base +    Creators: TypeAlias = _Creators +    Wrapper: TypeAlias = _Wrapper +    Decorators: TypeAlias = _Dec + + +CRU.add_objects(CruFunction) diff --git a/tools/cru-py/cru/_helper.py b/tools/cru-py/cru/_helper.py new file mode 100644 index 0000000..43baf46 --- /dev/null +++ b/tools/cru-py/cru/_helper.py @@ -0,0 +1,16 @@ +from collections.abc import Callable +from typing import Any, Iterable, TypeVar, cast + +_T = TypeVar("_T") +_D = TypeVar("_D") + + +def remove_element( +    iterable: Iterable[_T | None], to_rm: Iterable[Any], des: type[_D] | None = None +) -> _D: +    to_rm = set(to_rm) +    return cast(Callable[..., _D], des or list)(v for v in iterable if v not in to_rm) + + +def remove_none(iterable: Iterable[_T | None], des: type[_D] | None = None) -> _D: +    return cast(Callable[..., _D], des or list)(v for v in iterable if v is not None) diff --git a/tools/cru-py/cru/_iter.py b/tools/cru-py/cru/_iter.py new file mode 100644 index 0000000..8f58561 --- /dev/null +++ b/tools/cru-py/cru/_iter.py @@ -0,0 +1,466 @@ +from __future__ import annotations + +from collections.abc import Iterable, Callable, Generator, Iterator +from dataclasses import dataclass +from enum import Enum +from typing import ( +    Concatenate, +    Literal, +    Never, +    Self, +    TypeAlias, +    TypeVar, +    ParamSpec, +    Any, +    Generic, +    cast, +) + +from ._base import CRU +from ._const import CruNotFound +from ._error import cru_unreachable + +_P = ParamSpec("_P") +_T = TypeVar("_T") +_O = TypeVar("_O") +_V = TypeVar("_V") +_R = TypeVar("_R") + + +class _Generic: +    class StepActionKind(Enum): +        SKIP = 0 +        PUSH = 1 +        STOP = 2 +        AGGREGATE = 3 + +    @dataclass +    class StepAction(Generic[_V, _R]): +        value: Iterable[Self] | _V | _R | None +        kind: _Generic.StepActionKind + +        @property +        def push_value(self) -> _V: +            assert self.kind == _Generic.StepActionKind.PUSH +            return cast(_V, self.value) + +        @property +        def stop_value(self) -> _R: +            assert self.kind == _Generic.StepActionKind.STOP +            return cast(_R, self.value) + +        @staticmethod +        def skip() -> _Generic.StepAction[_V, _R]: +            return _Generic.StepAction(None, _Generic.StepActionKind.SKIP) + +        @staticmethod +        def push(value: _V | None) -> _Generic.StepAction[_V, _R]: +            return _Generic.StepAction(value, _Generic.StepActionKind.PUSH) + +        @staticmethod +        def stop(value: _R | None = None) -> _Generic.StepAction[_V, _R]: +            return _Generic.StepAction(value, _Generic.StepActionKind.STOP) + +        @staticmethod +        def aggregate( +            *results: _Generic.StepAction[_V, _R], +        ) -> _Generic.StepAction[_V, _R]: +            return _Generic.StepAction(results, _Generic.StepActionKind.AGGREGATE) + +        @staticmethod +        def push_last(value: _V | None) -> _Generic.StepAction[_V, _R]: +            return _Generic.StepAction.aggregate( +                _Generic.StepAction.push(value), _Generic.StepAction.stop() +            ) + +        def flatten(self) -> Iterable[Self]: +            return _Generic.flatten( +                self, +                is_leave=lambda r: r.kind != _Generic.StepActionKind.AGGREGATE, +                get_children=lambda r: cast(Iterable[Self], r.value), +            ) + +    GeneralStepAction: TypeAlias = StepAction[_V, _R] | _V | _R | None +    IterateOperation: TypeAlias = Callable[[_T, int], GeneralStepAction[_V, _R]] +    IteratePreHook: TypeAlias = Callable[[Iterable[_T]], GeneralStepAction[_V, _R]] +    IteratePostHook: TypeAlias = Callable[[int], GeneralStepAction[_V, _R]] + +    @staticmethod +    def _is_not_iterable(o: Any) -> bool: +        return not isinstance(o, Iterable) + +    @staticmethod +    def _return_self(o): +        return o + +    @staticmethod +    def iterable_flatten( +        maybe_iterable: Iterable[_T] | _T, max_depth: int = -1, *, _depth: int = 0 +    ) -> Iterable[Iterable[_T] | _T]: +        if _depth == max_depth or not isinstance(maybe_iterable, Iterable): +            yield maybe_iterable +            return + +        for child in maybe_iterable: +            yield from _Generic.iterable_flatten( +                child, +                max_depth, +                _depth=_depth + 1, +            ) + +    @staticmethod +    def flatten( +        o: _O, +        max_depth: int = -1, +        /, +        is_leave: CruIterator.ElementPredicate[_O] = _is_not_iterable, +        get_children: CruIterator.ElementTransformer[_O, Iterable[_O]] = _return_self, +        *, +        _depth: int = 0, +    ) -> Iterable[_O]: +        if _depth == max_depth or is_leave(o): +            yield o +            return +        for child in get_children(o): +            yield from _Generic.flatten( +                child, +                max_depth, +                is_leave, +                get_children, +                _depth=_depth + 1, +            ) + +    class Results: +        @staticmethod +        def true(_) -> Literal[True]: +            return True + +        @staticmethod +        def false(_) -> Literal[False]: +            return False + +        @staticmethod +        def not_found(_) -> Literal[CruNotFound.VALUE]: +            return CruNotFound.VALUE + +    @staticmethod +    def _non_result_to_push(value: Any) -> StepAction[_V, _R]: +        return _Generic.StepAction.push(value) + +    @staticmethod +    def _non_result_to_stop(value: Any) -> StepAction[_V, _R]: +        return _Generic.StepAction.stop(value) + +    @staticmethod +    def _none_hook(_: Any) -> StepAction[_V, _R]: +        return _Generic.StepAction.skip() + +    def iterate( +        iterable: Iterable[_T], +        operation: IterateOperation[_T, _V, _R], +        fallback_return: _R, +        pre_iterate: IteratePreHook[_T, _V, _R], +        post_iterate: IteratePostHook[_V, _R], +        convert_value_result: Callable[[_V | _R | None], StepAction[_V, _R]], +    ) -> Generator[_V, None, _R]: +        pre_result = pre_iterate(iterable) +        if not isinstance(pre_result, _Generic.StepAction): +            real_pre_result = convert_value_result(pre_result) +        for r in real_pre_result.flatten(): +            if r.kind == _Generic.StepActionKind.STOP: +                return r.stop_value +            elif r.kind == _Generic.StepActionKind.PUSH: +                yield r.push_value +            else: +                assert r.kind == _Generic.StepActionKind.SKIP + +        for index, element in enumerate(iterable): +            result = operation(element, index) +            if not isinstance(result, _Generic.StepAction): +                real_result = convert_value_result(result) +            for r in real_result.flatten(): +                if r.kind == _Generic.StepActionKind.STOP: +                    return r.stop_value +                elif r.kind == _Generic.StepActionKind.PUSH: +                    yield r.push_value +                else: +                    assert r.kind == _Generic.StepActionKind.SKIP +                    continue + +        post_result = post_iterate(index + 1) +        if not isinstance(post_result, _Generic.StepAction): +            real_post_result = convert_value_result(post_result) +        for r in real_post_result.flatten(): +            if r.kind == _Generic.StepActionKind.STOP: +                return r.stop_value +            elif r.kind == _Generic.StepActionKind.PUSH: +                yield r.push_value +            else: +                assert r.kind == _Generic.StepActionKind.SKIP + +        return fallback_return + +    def create_new( +        iterable: Iterable[_T], +        operation: IterateOperation[_T, _V, _R], +        fallback_return: _R, +        /, +        pre_iterate: IteratePreHook[_T, _V, _R] | None = None, +        post_iterate: IteratePostHook[_V, _R] | None = None, +    ) -> Generator[_V, None, _R]: +        return _Generic.iterate( +            iterable, +            operation, +            fallback_return, +            pre_iterate or _Generic._none_hook, +            post_iterate or _Generic._none_hook, +            _Generic._non_result_to_push, +        ) + +    def get_result( +        iterable: Iterable[_T], +        operation: IterateOperation[_T, _V, _R], +        fallback_return: _R, +        /, +        pre_iterate: IteratePreHook[_T, _V, _R] | None = None, +        post_iterate: IteratePostHook[_V, _R] | None = None, +    ) -> _R: +        try: +            for _ in _Generic.iterate( +                iterable, +                operation, +                fallback_return, +                pre_iterate or _Generic._none_hook, +                post_iterate or _Generic._none_hook, +                _Generic._non_result_to_stop, +            ): +                pass +        except StopIteration as stop: +            return stop.value +        cru_unreachable() + + +class _Helpers: +    @staticmethod +    def auto_count(c: Callable[Concatenate[int, _P], _O]) -> Callable[_P, _O]: +        count = 0 + +        def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _O: +            nonlocal count +            r = c(count, *args, **kwargs) +            count += 1 +            return r + +        return wrapper + + +class _Creators: +    class Raw: +        @staticmethod +        def empty() -> Iterator[Never]: +            return iter([]) + +        @staticmethod +        def range(*args) -> Iterator[int]: +            return iter(range(*args)) + +        @staticmethod +        def unite(*args: _T) -> Iterator[_T]: +            return iter(args) + +        @staticmethod +        def _concat(*iterables: Iterable[_T]) -> Iterable[_T]: +            for iterable in iterables: +                yield from iterable + +        @staticmethod +        def concat(*iterables: Iterable[_T]) -> Iterator[_T]: +            return iter(_Creators.Raw._concat(*iterables)) + +    @staticmethod +    def _wrap(f: Callable[_P, Iterable[_O]]) -> Callable[_P, CruIterator[_O]]: +        def _wrapped(*args: _P.args, **kwargs: _P.kwargs) -> CruIterator[_O]: +            return CruIterator(f(*args, **kwargs)) + +        return _wrapped + +    empty = _wrap(Raw.empty) +    range = _wrap(Raw.range) +    unite = _wrap(Raw.unite) +    concat = _wrap(Raw.concat) + + +class CruIterator(Generic[_T]): +    ElementOperation: TypeAlias = Callable[[_V], Any] +    ElementPredicate: TypeAlias = Callable[[_V], bool] +    AnyElementPredicate: TypeAlias = ElementPredicate[Any] +    ElementTransformer: TypeAlias = Callable[[_V], _O] +    SelfElementTransformer: TypeAlias = ElementTransformer[_V, _V] +    AnyElementTransformer: TypeAlias = ElementTransformer[Any, Any] + +    Creators: TypeAlias = _Creators +    Helpers: TypeAlias = _Helpers + +    def __init__(self, iterable: Iterable[_T]) -> None: +        self._iterator = iter(iterable) + +    def __iter__(self) -> Iterator[_T]: +        return self._iterator + +    def create_new_me(self, iterable: Iterable[_O]) -> CruIterator[_O]: +        return type(self)(iterable)  # type: ignore + +    @staticmethod +    def _wrap( +        f: Callable[Concatenate[CruIterator[_T], _P], Iterable[_O]], +    ) -> Callable[Concatenate[CruIterator[_T], _P], CruIterator[_O]]: +        def _wrapped( +            self: CruIterator[_T], *args: _P.args, **kwargs: _P.kwargs +        ) -> CruIterator[_O]: +            return self.create_new_me(f(self, *args, **kwargs)) + +        return _wrapped + +    @_wrap +    def replace_me(self, iterable: Iterable[_O]) -> Iterable[_O]: +        return iterable + +    def replace_me_with_empty(self) -> CruIterator[Never]: +        return self.create_new_me(_Creators.Raw.empty()) + +    def replace_me_with_range(self, *args) -> CruIterator[int]: +        return self.create_new_me(_Creators.Raw.range(*args)) + +    def replace_me_with_unite(self, *args: _O) -> CruIterator[_O]: +        return self.create_new_me(_Creators.Raw.unite(*args)) + +    def replace_me_with_concat(self, *iterables: Iterable[_T]) -> CruIterator[_T]: +        return self.create_new_me(_Creators.Raw.concat(*iterables)) + +    def to_set(self) -> set[_T]: +        return set(self) + +    def to_list(self) -> list[_T]: +        return list(self) + +    def all(self, predicate: ElementPredicate[_T]) -> bool: +        for value in self: +            if not predicate(value): +                return False +        return True + +    def any(self, predicate: ElementPredicate[_T]) -> bool: +        for value in self: +            if predicate(value): +                return True +        return False + +    def foreach(self, operation: ElementOperation[_T]) -> None: +        for value in self: +            operation(value) + +    @_wrap +    def transform(self, transformer: ElementTransformer[_T, _O]) -> Iterable[_O]: +        for value in self: +            yield transformer(value) + +    map = transform + +    @_wrap +    def filter(self, predicate: ElementPredicate[_T]) -> Iterable[_T]: +        for value in self: +            if predicate(value): +                yield value + +    @_wrap +    def continue_if(self, predicate: ElementPredicate[_T]) -> Iterable[_T]: +        for value in self: +            yield value +            if not predicate(value): +                break + +    def first_n(self, max_count: int) -> CruIterator[_T]: +        if max_count < 0: +            raise ValueError("max_count must be 0 or positive.") +        if max_count == 0: +            return self.replace_me_with_empty()  # type: ignore +        return self.continue_if(_Helpers.auto_count(lambda i, _: i < max_count - 1)) + +    def drop_n(self, n: int) -> CruIterator[_T]: +        if n < 0: +            raise ValueError("n must be 0 or positive.") +        if n == 0: +            return self +        return self.filter(_Helpers.auto_count(lambda i, _: i < n)) + +    def single_or( +        self, fallback: _O | CruNotFound = CruNotFound.VALUE +    ) -> _T | _O | CruNotFound: +        first_2 = self.first_n(2) +        has_value = False +        for element in first_2: +            if has_value: +                raise ValueError("More than one value found.") +            has_value = True +            value = element +        if has_value: +            return value +        else: +            return fallback + +    def first_or( +        self, fallback: _O | CruNotFound = CruNotFound.VALUE +    ) -> _T | _O | CruNotFound: +        return self.first_n(1).single_or(fallback) + +    @_wrap +    def flatten(self, max_depth: int = -1) -> Iterable[Any]: +        return _Generic.iterable_flatten(self, max_depth) + +    def select_by_indices(self, indices: Iterable[int]) -> CruIterator[_T]: +        index_set = set(indices) +        max_index = max(index_set) +        return self.first_n(max_index + 1).filter( +            _Helpers.auto_count(lambda i, _: i in index_set) +        ) + +    def remove_values(self, values: Iterable[Any]) -> CruIterator[_T]: +        value_set = set(values) +        return self.filter(lambda v: v not in value_set) + +    def replace_values( +        self, old_values: Iterable[Any], new_value: _O +    ) -> Iterable[_T | _O]: +        value_set = set(old_values) +        return self.transform(lambda v: new_value if v in value_set else v) + +    def group_by(self, key_getter: Callable[[_T], _O]) -> dict[_O, list[_T]]: +        result: dict[_O, list[_T]] = {} + +        for item in self: +            key = key_getter(item) +            if key not in result: +                result[key] = [] +            result[key].append(item) + +        return result + + +class CruIterMixin(Generic[_T]): +    def cru_iter(self: Iterable[_T]) -> CruIterator[_T]: +        return CruIterator(self) + + +class CruIterList(list[_T], CruIterMixin[_T]): +    pass + + +class CruIterable: +    Generic: TypeAlias = _Generic +    Iterator: TypeAlias = CruIterator[_T] +    Helpers: TypeAlias = _Helpers +    Mixin: TypeAlias = CruIterMixin[_T] +    IterList: TypeAlias = CruIterList[_T] + + +CRU.add_objects(CruIterable, CruIterator) diff --git a/tools/cru-py/cru/_type.py b/tools/cru-py/cru/_type.py new file mode 100644 index 0000000..1f81da3 --- /dev/null +++ b/tools/cru-py/cru/_type.py @@ -0,0 +1,52 @@ +from collections.abc import Iterable +from typing import Any + +from ._error import CruException, CruLogicError +from ._iter import CruIterator + + +class CruTypeCheckError(CruException): +    pass + + +DEFAULT_NONE_ERR_MSG = "None is not allowed here." +DEFAULT_TYPE_ERR_MSG = "Object of this type is not allowed here." + + +class CruTypeSet(set[type]): +    def __init__(self, *types: type): +        type_set = CruIterator(types).filter(lambda t: t is not None).to_set() +        if not CruIterator(type_set).all(lambda t: isinstance(t, type)): +            raise CruLogicError("TypeSet can only contain type.") +        super().__init__(type_set) + +    def check_value( +        self, +        value: Any, +        /, +        allow_none: bool = False, +        empty_allow_all: bool = True, +    ) -> None: +        if value is None: +            if allow_none: +                return +            else: +                raise CruTypeCheckError(DEFAULT_NONE_ERR_MSG) +        if len(self) == 0 and empty_allow_all: +            return +        if not CruIterator(self).any(lambda t: isinstance(value, t)): +            raise CruTypeCheckError(DEFAULT_TYPE_ERR_MSG) + +    def check_value_list( +        self, +        values: Iterable[Any], +        /, +        allow_none: bool = False, +        empty_allow_all: bool = True, +    ) -> None: +        for value in values: +            self.check_value( +                value, +                allow_none, +                empty_allow_all, +            ) diff --git a/tools/cru-py/cru/attr.py b/tools/cru-py/cru/attr.py new file mode 100644 index 0000000..d4cc86a --- /dev/null +++ b/tools/cru-py/cru/attr.py @@ -0,0 +1,364 @@ +from __future__ import annotations + +import copy +from collections.abc import Callable, Iterable +from dataclasses import dataclass, field +from typing import Any + +from .list import CruUniqueKeyList +from ._type import CruTypeSet +from ._const import CruNotFound, CruUseDefault, CruDontChange +from ._iter import CruIterator + + +@dataclass +class CruAttr: + +    name: str +    value: Any +    description: str | None + +    @staticmethod +    def make( +        name: str, value: Any = CruUseDefault.VALUE, description: str | None = None +    ) -> CruAttr: +        return CruAttr(name, value, description) + + +CruAttrDefaultFactory = Callable[["CruAttrDef"], Any] +CruAttrTransformer = Callable[[Any, "CruAttrDef"], Any] +CruAttrValidator = Callable[[Any, "CruAttrDef"], None] + + +@dataclass +class CruAttrDef: +    name: str +    description: str +    default_factory: CruAttrDefaultFactory +    transformer: CruAttrTransformer +    validator: CruAttrValidator + +    def __init__( +        self, +        name: str, +        description: str, +        default_factory: CruAttrDefaultFactory, +        transformer: CruAttrTransformer, +        validator: CruAttrValidator, +    ) -> None: +        self.name = name +        self.description = description +        self.default_factory = default_factory +        self.transformer = transformer +        self.validator = validator + +    def transform(self, value: Any) -> Any: +        if self.transformer is not None: +            return self.transformer(value, self) +        return value + +    def validate(self, value: Any, /, force_allow_none: bool = False) -> None: +        if force_allow_none is value is None: +            return +        if self.validator is not None: +            self.validator(value, self) + +    def transform_and_validate( +        self, value: Any, /, force_allow_none: bool = False +    ) -> Any: +        value = self.transform(value) +        self.validate(value, force_allow_none) +        return value + +    def make_default_value(self) -> Any: +        return self.transform_and_validate(self.default_factory(self)) + +    def adopt(self, attr: CruAttr) -> CruAttr: +        attr = copy.deepcopy(attr) + +        if attr.name is None: +            attr.name = self.name +        elif attr.name != self.name: +            raise ValueError(f"Attr name is not match: {attr.name} != {self.name}") + +        if attr.value is CruUseDefault.VALUE: +            attr.value = self.make_default_value() +        else: +            attr.value = self.transform_and_validate(attr.value) + +        if attr.description is None: +            attr.description = self.description + +        return attr + +    def make( +        self, value: Any = CruUseDefault.VALUE, description: None | str = None +    ) -> CruAttr: +        value = self.make_default_value() if value is CruUseDefault.VALUE else value +        value = self.transform_and_validate(value) +        return CruAttr( +            self.name, +            value, +            description if description is not None else self.description, +        ) + + +@dataclass +class CruAttrDefBuilder: + +    name: str +    description: str +    types: list[type] | None = field(default=None) +    allow_none: bool = field(default=False) +    default: Any = field(default=CruUseDefault.VALUE) +    default_factory: CruAttrDefaultFactory | None = field(default=None) +    auto_list: bool = field(default=False) +    transformers: list[CruAttrTransformer] = field(default_factory=list) +    validators: list[CruAttrValidator] = field(default_factory=list) +    override_transformer: CruAttrTransformer | None = field(default=None) +    override_validator: CruAttrValidator | None = field(default=None) + +    build_hook: Callable[[CruAttrDef], None] | None = field(default=None) + +    def __init__(self, name: str, description: str) -> None: +        super().__init__() +        self.name = name +        self.description = description + +    def auto_adjust_default(self) -> None: +        if self.default is not CruUseDefault.VALUE and self.default is not None: +            return +        if self.allow_none and self.default is CruUseDefault.VALUE: +            self.default = None +        if not self.allow_none and self.default is None: +            self.default = CruUseDefault.VALUE +        if self.auto_list and not self.allow_none: +            self.default = [] + +    def with_name(self, name: str | CruDontChange) -> CruAttrDefBuilder: +        if name is not CruDontChange.VALUE: +            self.name = name +        return self + +    def with_description( +        self, default_description: str | CruDontChange +    ) -> CruAttrDefBuilder: +        if default_description is not CruDontChange.VALUE: +            self.description = default_description +        return self + +    def with_default(self, default: Any) -> CruAttrDefBuilder: +        if default is not CruDontChange.VALUE: +            self.default = default +        return self + +    def with_default_factory( +        self, +        default_factory: CruAttrDefaultFactory | CruDontChange, +    ) -> CruAttrDefBuilder: +        if default_factory is not CruDontChange.VALUE: +            self.default_factory = default_factory +        return self + +    def with_types( +        self, +        types: Iterable[type] | None | CruDontChange, +    ) -> CruAttrDefBuilder: +        if types is not CruDontChange.VALUE: +            self.types = None if types is None else list(types) +        return self + +    def with_allow_none(self, allow_none: bool | CruDontChange) -> CruAttrDefBuilder: +        if allow_none is not CruDontChange.VALUE: +            self.allow_none = allow_none +        return self + +    def with_auto_list( +        self, auto_list: bool | CruDontChange = True +    ) -> CruAttrDefBuilder: +        if auto_list is not CruDontChange.VALUE: +            self.auto_list = auto_list +        return self + +    def with_constraint( +        self, +        /, +        allow_none: bool | CruDontChange = CruDontChange.VALUE, +        types: Iterable[type] | None | CruDontChange = CruDontChange.VALUE, +        default: Any = CruDontChange.VALUE, +        default_factory: CruAttrDefaultFactory | CruDontChange = CruDontChange.VALUE, +        auto_list: bool | CruDontChange = CruDontChange.VALUE, +    ) -> CruAttrDefBuilder: +        return ( +            self.with_allow_none(allow_none) +            .with_types(types) +            .with_default(default) +            .with_default_factory(default_factory) +            .with_auto_list(auto_list) +        ) + +    def add_transformer(self, transformer: CruAttrTransformer) -> CruAttrDefBuilder: +        self.transformers.append(transformer) +        return self + +    def clear_transformers(self) -> CruAttrDefBuilder: +        self.transformers.clear() +        return self + +    def add_validator(self, validator: CruAttrValidator) -> CruAttrDefBuilder: +        self.validators.append(validator) +        return self + +    def clear_validators(self) -> CruAttrDefBuilder: +        self.validators.clear() +        return self + +    def with_override_transformer( +        self, override_transformer: CruAttrTransformer | None | CruDontChange +    ) -> CruAttrDefBuilder: +        if override_transformer is not CruDontChange.VALUE: +            self.override_transformer = override_transformer +        return self + +    def with_override_validator( +        self, override_validator: CruAttrValidator | None | CruDontChange +    ) -> CruAttrDefBuilder: +        if override_validator is not CruDontChange.VALUE: +            self.override_validator = override_validator +        return self + +    def is_valid(self) -> tuple[bool, str]: +        if not isinstance(self.name, str): +            return False, "Name must be a string!" +        if not isinstance(self.description, str): +            return False, "Default description must be a string!" +        if ( +            not self.allow_none +            and self.default is None +            and self.default_factory is None +        ): +            return False, "Default must be set if allow_none is False!" +        return True, "" + +    @staticmethod +    def _build( +        builder: CruAttrDefBuilder, auto_adjust_default: bool = True +    ) -> CruAttrDef: +        if auto_adjust_default: +            builder.auto_adjust_default() + +        valid, err = builder.is_valid() +        if not valid: +            raise ValueError(err) + +        def composed_transformer(value: Any, attr_def: CruAttrDef) -> Any: +            def transform_value(single_value: Any) -> Any: +                for transformer in builder.transformers: +                    single_value = transformer(single_value, attr_def) +                return single_value + +            if builder.auto_list: +                if not isinstance(value, list): +                    value = [value] +                value = CruIterator(value).transform(transform_value).to_list() + +            else: +                value = transform_value(value) +            return value + +        type_set = None if builder.types is None else CruTypeSet(*builder.types) + +        def composed_validator(value: Any, attr_def: CruAttrDef): +            def validate_value(single_value: Any) -> None: +                if type_set is not None: +                    type_set.check_value(single_value, allow_none=builder.allow_none) +                for validator in builder.validators: +                    validator(single_value, attr_def) + +            if builder.auto_list: +                CruIterator(value).foreach(validate_value) +            else: +                validate_value(value) + +        real_transformer = builder.override_transformer or composed_transformer +        real_validator = builder.override_validator or composed_validator + +        default_factory = builder.default_factory +        if default_factory is None: + +            def default_factory(_d): +                return copy.deepcopy(builder.default) + +        d = CruAttrDef( +            builder.name, +            builder.description, +            default_factory, +            real_transformer, +            real_validator, +        ) +        if builder.build_hook: +            builder.build_hook(d) +        return d + +    def build(self, auto_adjust_default=True) -> CruAttrDef: +        c = copy.deepcopy(self) +        self.build_hook = None +        return CruAttrDefBuilder._build(c, auto_adjust_default) + + +class CruAttrDefRegistry(CruUniqueKeyList[CruAttrDef, str]): + +    def __init__(self) -> None: +        super().__init__(lambda d: d.name) + +    def make_builder(self, name: str, default_description: str) -> CruAttrDefBuilder: +        b = CruAttrDefBuilder(name, default_description) +        b.build_hook = lambda a: self.add(a) +        return b + +    def adopt(self, attr: CruAttr) -> CruAttr: +        d = self.get(attr.name) +        return d.adopt(attr) + + +class CruAttrTable(CruUniqueKeyList[CruAttr, str]): +    def __init__(self, registry: CruAttrDefRegistry) -> None: +        self._registry: CruAttrDefRegistry = registry +        super().__init__(lambda a: a.name, before_add=registry.adopt) + +    @property +    def registry(self) -> CruAttrDefRegistry: +        return self._registry + +    def get_value_or(self, name: str, fallback: Any = CruNotFound.VALUE) -> Any: +        a = self.get_or(name, CruNotFound.VALUE) +        if a is CruNotFound.VALUE: +            return fallback +        return a.value + +    def get_value(self, name: str) -> Any: +        a = self.get(name) +        return a.value + +    def make_attr( +        self, +        name: str, +        value: Any = CruUseDefault.VALUE, +        /, +        description: str | None = None, +    ) -> CruAttr: +        d = self._registry.get(name) +        return d.make(value, description or d.description) + +    def add_value( +        self, +        name: str, +        value: Any = CruUseDefault.VALUE, +        /, +        description: str | None = None, +        *, +        replace: bool = False, +    ) -> CruAttr: +        attr = self.make_attr(name, value, description) +        self.add(attr, replace) +        return attr diff --git a/tools/cru-py/cru/config.py b/tools/cru-py/cru/config.py new file mode 100644 index 0000000..0f6f0d0 --- /dev/null +++ b/tools/cru-py/cru/config.py @@ -0,0 +1,196 @@ +from __future__ import annotations + +from typing import Any, TypeVar, Generic + +from ._error import CruException +from .list import CruUniqueKeyList +from .value import ( +    INTEGER_VALUE_TYPE, +    TEXT_VALUE_TYPE, +    CruValueTypeError, +    ValueGeneratorBase, +    ValueType, +) + +_T = TypeVar("_T") + + +class CruConfigError(CruException): +    def __init__(self, message: str, item: ConfigItem, *args, **kwargs): +        super().__init__(message, *args, **kwargs) +        self._item = item + +    @property +    def item(self) -> ConfigItem: +        return self._item + + +class ConfigItem(Generic[_T]): +    def __init__( +        self, +        name: str, +        description: str, +        value_type: ValueType[_T], +        value: _T | None = None, +        /, +        default: ValueGeneratorBase[_T] | _T | None = None, +    ) -> None: +        self._name = name +        self._description = description +        self._value_type = value_type +        self._value = value +        self._default = default + +    @property +    def name(self) -> str: +        return self._name + +    @property +    def description(self) -> str: +        return self._description + +    @property +    def value_type(self) -> ValueType[_T]: +        return self._value_type + +    @property +    def is_set(self) -> bool: +        return self._value is not None + +    @property +    def value(self) -> _T: +        if self._value is None: +            raise CruConfigError( +                "Config value is not set.", +                self, +                user_message=f"Config item {self.name} is not set.", +            ) +        return self._value + +    @property +    def value_str(self) -> str: +        return self.value_type.convert_value_to_str(self.value) + +    def set_value(self, v: _T | str, allow_convert_from_str=False): +        if allow_convert_from_str: +            self._value = self.value_type.check_value_or_try_convert_from_str(v) +        else: +            self._value = self.value_type.check_value(v) + +    def reset(self): +        self._value = None + +    @property +    def default(self) -> ValueGeneratorBase[_T] | _T | None: +        return self._default + +    @property +    def can_generate_default(self) -> bool: +        return self.default is not None + +    def generate_default_value(self) -> _T: +        if self.default is None: +            raise CruConfigError( +                "Config item does not support default value generation.", self +            ) +        elif isinstance(self.default, ValueGeneratorBase): +            v = self.default.generate() +        else: +            v = self.default +        try: +            self.value_type.check_value(v) +            return v +        except CruValueTypeError as e: +            raise CruConfigError( +                "Config value generator returns an invalid value.", self +            ) from e + +    def copy(self) -> "ConfigItem": +        return ConfigItem( +            self.name, +            self.description, +            self.value_type, +            self.value, +            self.default, +        ) + +    @property +    def description_str(self) -> str: +        return f"{self.name} ({self.value_type.name}): {self.description}" + + +class Configuration(CruUniqueKeyList[ConfigItem[Any], str]): +    def __init__(self): +        super().__init__(lambda c: c.name) + +    def get_set_items(self) -> list[ConfigItem[Any]]: +        return [item for item in self if item.is_set] + +    def get_unset_items(self) -> list[ConfigItem[Any]]: +        return [item for item in self if not item.is_set] + +    @property +    def all_set(self) -> bool: +        return len(self.get_unset_items()) == 0 + +    @property +    def all_not_set(self) -> bool: +        return len(self.get_set_items()) == 0 + +    def add_text_config( +        self, +        name: str, +        description: str, +        value: str | None = None, +        default: ValueGeneratorBase[str] | str | None = None, +    ) -> ConfigItem[str]: +        item = ConfigItem(name, description, TEXT_VALUE_TYPE, value, default) +        self.add(item) +        return item + +    def add_int_config( +        self, +        name: str, +        description: str, +        value: int | None = None, +        default: ValueGeneratorBase[int] | int | None = None, +    ) -> ConfigItem[int]: +        item = ConfigItem(name, description, INTEGER_VALUE_TYPE, value, default) +        self.add(item) +        return item + +    def set_config_item( +        self, +        name: str, +        value: Any | str, +        allow_convert_from_str=True, +    ) -> None: +        item = self.get(name) +        item.set_value( +            value, +            allow_convert_from_str=allow_convert_from_str, +        ) + +    def reset_all(self) -> None: +        for item in self: +            item.reset() + +    def to_dict(self) -> dict[str, Any]: +        return {item.name: item.value for item in self} + +    def to_str_dict(self) -> dict[str, str]: +        return { +            item.name: item.value_type.convert_value_to_str(item.value) for item in self +        } + +    def set_value_dict( +        self, +        value_dict: dict[str, Any], +        allow_convert_from_str: bool = False, +    ) -> None: +        for name, value in value_dict.items(): +            item = self.get(name) +            item.set_value( +                value, +                allow_convert_from_str=allow_convert_from_str, +            ) diff --git a/tools/cru-py/cru/list.py b/tools/cru-py/cru/list.py new file mode 100644 index 0000000..9d210b7 --- /dev/null +++ b/tools/cru-py/cru/list.py @@ -0,0 +1,160 @@ +from __future__ import annotations + +from collections.abc import Callable, Iterator +from typing import Any, Generic, Iterable, TypeAlias, TypeVar, overload + +from ._error import CruInternalError +from ._iter import CruIterator +from ._const import CruNotFound + +_T = TypeVar("_T") +_O = TypeVar("_O") + + +class CruListEdit(CruIterator[_T]): +    def __init__(self, iterable: Iterable[_T], _list: CruList[Any]) -> None: +        super().__init__(iterable) +        self._list = _list + +    def create_me(self, iterable: Iterable[_O]) -> CruListEdit[_O]: +        return CruListEdit(iterable, self._list) + +    @property +    def list(self) -> CruList[Any]: +        return self._list + +    def done(self) -> CruList[Any]: +        self._list.reset(self) +        return self._list + + +class CruList(list[_T]): +    def reset(self, new_values: Iterable[_T]): +        if self is new_values: +            new_values = list(new_values) +        self.clear() +        self.extend(new_values) +        return self + +    def as_cru_iterator(self) -> CruIterator[_T]: +        return CruIterator(self) + +    @staticmethod +    def make(maybe_list: Iterable[_T] | _T | None) -> CruList[_T]: +        if maybe_list is None: +            return CruList() +        if isinstance(maybe_list, Iterable): +            return CruList(maybe_list) +        return CruList([maybe_list]) + + +_K = TypeVar("_K") + +_KeyGetter: TypeAlias = Callable[[_T], _K] + + +class CruUniqueKeyList(Generic[_T, _K]): +    def __init__( +        self, +        key_getter: _KeyGetter[_T, _K], +        *, +        before_add: Callable[[_T], _T] | None = None, +    ): +        super().__init__() +        self._key_getter = key_getter +        self._before_add = before_add +        self._list: CruList[_T] = CruList() + +    @property +    def key_getter(self) -> _KeyGetter[_T, _K]: +        return self._key_getter + +    @property +    def internal_list(self) -> CruList[_T]: +        return self._list + +    def validate_self(self): +        keys = self._list.transform(self._key_getter) +        if len(keys) != len(set(keys)): +            raise CruInternalError("Duplicate keys!") + +    @overload +    def get_or( +        self, key: _K, fallback: CruNotFound = CruNotFound.VALUE +    ) -> _T | CruNotFound: ... + +    @overload +    def get_or(self, key: _K, fallback: _O) -> _T | _O: ... + +    def get_or( +        self, key: _K, fallback: _O | CruNotFound = CruNotFound.VALUE +    ) -> _T | _O | CruNotFound: +        return ( +            self._list.as_cru_iterator() +            .filter(lambda v: key == self._key_getter(v)) +            .first_or(fallback) +        ) + +    def get(self, key: _K) -> _T: +        value = self.get_or(key) +        if value is CruNotFound: +            raise KeyError(f"Key {key} not found!") +        return value  # type: ignore + +    @property +    def keys(self) -> Iterable[_K]: +        return self._list.as_cru_iterator().map(self._key_getter) + +    def has_key(self, key: _K) -> bool: +        return self.get_or(key) != CruNotFound.VALUE + +    def try_remove(self, key: _K) -> bool: +        value = self.get_or(key) +        if value is CruNotFound.VALUE: +            return False +        self._list.remove(value) +        return True + +    def remove(self, key: _K, allow_absence: bool = False) -> None: +        if not self.try_remove(key) and not allow_absence: +            raise KeyError(f"Key {key} not found!") + +    def add(self, value: _T, /, replace: bool = False) -> None: +        v = self.get_or(self._key_getter(value)) +        if v is not CruNotFound.VALUE: +            if not replace: +                raise KeyError(f"Key {self._key_getter(v)} already exists!") +            self._list.remove(v) +        if self._before_add is not None: +            value = self._before_add(value) +        self._list.append(value) + +    def set(self, value: _T) -> None: +        self.add(value, True) + +    def extend(self, iterable: Iterable[_T], /, replace: bool = False) -> None: +        values = list(iterable) +        to_remove = [] +        for value in values: +            v = self.get_or(self._key_getter(value)) +            if v is not CruNotFound.VALUE: +                if not replace: +                    raise KeyError(f"Key {self._key_getter(v)} already exists!") +                to_remove.append(v) +        for value in to_remove: +            self._list.remove(value) +        if self._before_add is not None: +            values = [self._before_add(value) for value in values] +        self._list.extend(values) + +    def clear(self) -> None: +        self._list.reset([]) + +    def __iter__(self) -> Iterator[_T]: +        return iter(self._list) + +    def __len__(self) -> int: +        return len(self._list) + +    def cru_iter(self) -> CruIterator[_T]: +        return CruIterator(self._list) diff --git a/tools/cru-py/cru/parsing.py b/tools/cru-py/cru/parsing.py new file mode 100644 index 0000000..1d2fa7f --- /dev/null +++ b/tools/cru-py/cru/parsing.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +from abc import ABCMeta, abstractmethod +from typing import NamedTuple, TypeAlias, TypeVar, Generic, NoReturn, Callable + +from ._error import CruException +from ._iter import CruIterable + +_T = TypeVar("_T") + + +class ParseError(CruException, Generic[_T]): +    def __init__( +        self, +        message, +        parser: Parser[_T], +        text: str, +        line_number: int | None = None, +        *args, +        **kwargs, +    ): +        super().__init__(message, *args, **kwargs) +        self._parser = parser +        self._text = text +        self._line_number = line_number + +    @property +    def parser(self) -> Parser[_T]: +        return self._parser + +    @property +    def text(self) -> str: +        return self._text + +    @property +    def line_number(self) -> int | None: +        return self._line_number + + +class Parser(Generic[_T], metaclass=ABCMeta): +    def __init__(self, name: str) -> None: +        self._name = name + +    @property +    def name(self) -> str: +        return self._name + +    @abstractmethod +    def parse(self, s: str) -> _T: +        raise NotImplementedError() + +    def raise_parse_exception( +        self, text: str, line_number: int | None = None +    ) -> NoReturn: +        a = line_number and f" at line {line_number}" or "" +        raise ParseError(f"Parser {self.name} failed{a}.", self, text, line_number) + + +class SimpleLineConfigParserEntry(NamedTuple): +    key: str +    value: str +    line_number: int | None = None + + +class SimpleLineConfigParserResult(CruIterable.IterList[SimpleLineConfigParserEntry]): +    pass + + +class SimpleLineConfigParser(Parser[SimpleLineConfigParserResult]): +    """ +    The parsing result is a list of tuples (key, value, line number). +    """ + +    Entry: TypeAlias = SimpleLineConfigParserEntry +    Result: TypeAlias = SimpleLineConfigParserResult + +    def __init__(self) -> None: +        super().__init__(type(self).__name__) + +    def _parse(self, text: str, callback: Callable[[Entry], None]) -> None: +        for ln, line in enumerate(text.splitlines()): +            line_number = ln + 1 +            # check if it's a comment +            if line.strip().startswith("#"): +                continue +            # check if there is a '=' +            if line.find("=") == -1: +                self.raise_parse_exception("There is even no '='!", line_number) +            # split at first '=' +            key, value = line.split("=", 1) +            key = key.strip() +            value = value.strip() +            callback(SimpleLineConfigParserEntry(key, value, line_number)) + +    def parse(self, text: str) -> Result: +        result = SimpleLineConfigParserResult() +        self._parse(text, lambda item: result.append(item)) +        return result diff --git a/tools/cru-py/cru/service/__init__.py b/tools/cru-py/cru/service/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tools/cru-py/cru/service/__init__.py diff --git a/tools/cru-py/cru/service/__main__.py b/tools/cru-py/cru/service/__main__.py new file mode 100644 index 0000000..1c10e82 --- /dev/null +++ b/tools/cru-py/cru/service/__main__.py @@ -0,0 +1,20 @@ +from cru import CruException + +from ._app import create_app + + +def main(): +    app = create_app() +    app.run_command() + + +if __name__ == "__main__": +    try: +        main() +    except CruException as e: +        user_message = e.get_user_message() +        if user_message is not None: +            print(f"Error: {user_message}") +            exit(1) +        else: +            raise diff --git a/tools/cru-py/cru/service/_app.py b/tools/cru-py/cru/service/_app.py new file mode 100644 index 0000000..6030dad --- /dev/null +++ b/tools/cru-py/cru/service/_app.py @@ -0,0 +1,34 @@ +from ._base import ( +    AppBase, +    CommandDispatcher, +    AppInitializer, +    PathCommandProvider, +) +from ._config import ConfigManager +from ._template import TemplateManager +from ._nginx import NginxManager +from ._external import CliToolCommandProvider + +APP_ID = "crupest" + + +class App(AppBase): +    def __init__(self): +        super().__init__(APP_ID, f"{APP_ID}-service") +        self.add_feature(PathCommandProvider()) +        self.add_feature(AppInitializer()) +        self.add_feature(ConfigManager()) +        self.add_feature(TemplateManager()) +        self.add_feature(NginxManager()) +        self.add_feature(CliToolCommandProvider()) +        self.add_feature(CommandDispatcher()) + +    def run_command(self): +        command_dispatcher = self.get_feature(CommandDispatcher) +        command_dispatcher.run_command() + + +def create_app() -> App: +    app = App() +    app.setup() +    return app diff --git a/tools/cru-py/cru/service/_base.py b/tools/cru-py/cru/service/_base.py new file mode 100644 index 0000000..ad813c9 --- /dev/null +++ b/tools/cru-py/cru/service/_base.py @@ -0,0 +1,449 @@ +from __future__ import annotations + +from argparse import ArgumentParser, Namespace +from abc import ABC, abstractmethod +import argparse +import os +from pathlib import Path +from typing import TypeVar, overload + +from cru import CruException, CruLogicError + +_Feature = TypeVar("_Feature", bound="AppFeatureProvider") + + +class AppError(CruException): +    pass + + +class AppFeatureError(AppError): +    def __init__(self, message, feature: type | str, *args, **kwargs): +        super().__init__(message, *args, **kwargs) +        self._feature = feature + +    @property +    def feature(self) -> type | str: +        return self._feature + + +class AppPathError(CruException): +    def __init__(self, message, _path: str | Path, *args, **kwargs): +        super().__init__(message, *args, **kwargs) +        self._path = str(_path) + +    @property +    def path(self) -> str: +        return self._path + + +class AppPath(ABC): +    def __init__(self, id: str, is_dir: bool, description: str) -> None: +        self._is_dir = is_dir +        self._id = id +        self._description = description + +    @property +    @abstractmethod +    def parent(self) -> AppPath | None: ... + +    @property +    @abstractmethod +    def app(self) -> AppBase: ... + +    @property +    def id(self) -> str: +        return self._id + +    @property +    def description(self) -> str: +        return self._description + +    @property +    def is_dir(self) -> bool: +        return self._is_dir + +    @property +    @abstractmethod +    def full_path(self) -> Path: ... + +    @property +    def full_path_str(self) -> str: +        return str(self.full_path) + +    def check_parents(self, must_exist: bool = False) -> bool: +        for p in reversed(self.full_path.parents): +            if not p.exists() and not must_exist: +                return False +            if not p.is_dir(): +                raise AppPathError("Parents' path must be a dir.", self.full_path) +        return True + +    def check_self(self, must_exist: bool = False) -> bool: +        if not self.check_parents(must_exist): +            return False +        if not self.full_path.exists(): +            if not must_exist: +                return False +            raise AppPathError("Not exist.", self.full_path) +        if self.is_dir: +            if not self.full_path.is_dir(): +                raise AppPathError("Should be a directory, but not.", self.full_path) +            else: +                return True +        else: +            if not self.full_path.is_file(): +                raise AppPathError("Should be a file, but not.", self.full_path) +            else: +                return True + +    def ensure(self, create_file: bool = False) -> None: +        e = self.check_self(False) +        if not e: +            os.makedirs(self.full_path.parent, exist_ok=True) +            if self.is_dir: +                os.mkdir(self.full_path) +            elif create_file: +                with open(self.full_path, "w") as f: +                    f.write("") + +    def add_subpath( +        self, +        name: str, +        is_dir: bool, +        /, +        id: str | None = None, +        description: str = "", +    ) -> AppFeaturePath: +        return self.app.add_path(name, is_dir, self, id, description) + +    @property +    def app_relative_path(self) -> Path: +        return self.full_path.relative_to(self.app.root.full_path) + + +class AppFeaturePath(AppPath): +    def __init__( +        self, +        parent: AppPath, +        name: str, +        is_dir: bool, +        /, +        id: str | None = None, +        description: str = "", +    ) -> None: +        super().__init__(id or name, is_dir, description) +        self._name = name +        self._parent = parent + +    @property +    def name(self) -> str: +        return self._name + +    @property +    def parent(self) -> AppPath: +        return self._parent + +    @property +    def app(self) -> AppBase: +        return self.parent.app + +    @property +    def full_path(self) -> Path: +        return Path(self.parent.full_path, self.name).resolve() + + +class AppRootPath(AppPath): +    def __init__(self, app: AppBase): +        super().__init__("root", True, "Application root path.") +        self._app = app +        self._full_path: Path | None = None + +    @property +    def parent(self) -> None: +        return None + +    @property +    def app(self) -> AppBase: +        return self._app + +    @property +    def full_path(self) -> Path: +        if self._full_path is None: +            raise AppError("App root path is not set yet.") +        return self._full_path + +    def setup(self, path: os.PathLike) -> None: +        if self._full_path is not None: +            raise AppError("App root path is already set.") +        self._full_path = Path(path).resolve() + + +class AppFeatureProvider(ABC): +    def __init__(self, name: str, /, app: AppBase | None = None): +        super().__init__() +        self._name = name +        self._app = app if app else AppBase.get_instance() + +    @property +    def app(self) -> AppBase: +        return self._app + +    @property +    def name(self) -> str: +        return self._name + +    @abstractmethod +    def setup(self) -> None: ... + + +class AppCommandFeatureProvider(AppFeatureProvider): +    @abstractmethod +    def get_command_info(self) -> tuple[str, str]: ... + +    @abstractmethod +    def setup_arg_parser(self, arg_parser: ArgumentParser): ... + +    @abstractmethod +    def run_command(self, args: Namespace) -> None: ... + + +DATA_DIR_NAME = "data" + + +class PathCommandProvider(AppCommandFeatureProvider): +    def __init__(self) -> None: +        super().__init__("path-command-provider") + +    def setup(self): +        pass + +    def get_command_info(self): +        return ("path", "Get information about paths used by app.") + +    def setup_arg_parser(self, arg_parser: ArgumentParser) -> None: +        subparsers = arg_parser.add_subparsers( +            dest="path_command", required=True, metavar="PATH_COMMAND" +        ) +        _list_parser = subparsers.add_parser( +            "list", help="list special paths used by app" +        ) + +    def run_command(self, args: Namespace) -> None: +        if args.path_command == "list": +            for path in self.app.paths: +                print(f"{path.app_relative_path.as_posix()}: {path.description}") + + +class CommandDispatcher(AppFeatureProvider): +    def __init__(self) -> None: +        super().__init__("command-dispatcher") +        self._parsed_args: argparse.Namespace | None = None + +    def setup_arg_parser(self) -> None: +        epilog = """ +==> to start, +./tools/manage init +./tools/manage config init +ln -s generated/docker-compose.yaml . +# Then edit config file. + +==> to update +git pull +./tools/manage template generate --no-dry-run +docker compose up +        """.strip() + +        self._map: dict[str, AppCommandFeatureProvider] = {} +        arg_parser = argparse.ArgumentParser( +            description="Service management", +            formatter_class=argparse.RawDescriptionHelpFormatter, +            epilog=epilog, +        ) +        arg_parser.add_argument( +            "--project-dir", +            help="The path of the project directory.", +            required=True, +            type=str, +        ) +        subparsers = arg_parser.add_subparsers( +            dest="command", +            help="The management command to execute.", +            metavar="COMMAND", +        ) +        for feature in self.app.features: +            if isinstance(feature, AppCommandFeatureProvider): +                info = feature.get_command_info() +                command_subparser = subparsers.add_parser(info[0], help=info[1]) +                feature.setup_arg_parser(command_subparser) +                self._map[info[0]] = feature +        self._arg_parser = arg_parser + +    def setup(self): +        pass + +    @property +    def arg_parser(self) -> argparse.ArgumentParser: +        return self._arg_parser + +    @property +    def map(self) -> dict[str, AppCommandFeatureProvider]: +        return self._map + +    def get_program_parsed_args(self) -> argparse.Namespace: +        if self._parsed_args is None: +            self._parsed_args = self.arg_parser.parse_args() +        return self._parsed_args + +    def run_command(self, args: argparse.Namespace | None = None) -> None: +        real_args = args or self.get_program_parsed_args() +        if real_args.command is None: +            self.arg_parser.print_help() +            return +        self.map[real_args.command].run_command(real_args) + + +class AppInitializer(AppCommandFeatureProvider): +    def __init__(self) -> None: +        super().__init__("app-initializer") + +    def _init_app(self) -> bool: +        if self.app.app_initialized: +            return False +        self.app.data_dir.ensure() +        return True + +    def setup(self): +        pass + +    def get_command_info(self): +        return ("init", "Initialize the app.") + +    def setup_arg_parser(self, arg_parser): +        pass + +    def run_command(self, args): +        init = self._init_app() +        if init: +            print("App initialized successfully.") +        else: +            print("App is already initialized. Do nothing.") + + +class AppBase: +    _instance: AppBase | None = None + +    @staticmethod +    def get_instance() -> AppBase: +        if AppBase._instance is None: +            raise AppError("App instance not initialized") +        return AppBase._instance + +    def __init__(self, app_id: str, name: str): +        AppBase._instance = self +        self._app_id = app_id +        self._name = name +        self._root = AppRootPath(self) +        self._paths: list[AppFeaturePath] = [] +        self._features: list[AppFeatureProvider] = [] + +    def setup(self) -> None: +        command_dispatcher = self.get_feature(CommandDispatcher) +        command_dispatcher.setup_arg_parser() +        program_args = command_dispatcher.get_program_parsed_args() +        self.setup_root(program_args.project_dir) +        self._data_dir = self.add_path(DATA_DIR_NAME, True, id="data") +        for feature in self.features: +            feature.setup() +        for path in self.paths: +            path.check_self() + +    @property +    def app_id(self) -> str: +        return self._app_id + +    @property +    def name(self) -> str: +        return self._name + +    @property +    def root(self) -> AppRootPath: +        return self._root + +    def setup_root(self, path: os.PathLike) -> None: +        self._root.setup(path) + +    @property +    def data_dir(self) -> AppFeaturePath: +        return self._data_dir + +    @property +    def app_initialized(self) -> bool: +        return self.data_dir.check_self() + +    def ensure_app_initialized(self) -> AppRootPath: +        if not self.app_initialized: +            raise AppError( +                user_message="Root directory does not exist. " +                "Please run 'init' to create one." +            ) +        return self.root + +    @property +    def features(self) -> list[AppFeatureProvider]: +        return self._features + +    @property +    def paths(self) -> list[AppFeaturePath]: +        return self._paths + +    def add_feature(self, feature: _Feature) -> _Feature: +        for f in self.features: +            if f.name == feature.name: +                raise AppFeatureError( +                    f"Duplicate feature name: {feature.name}.", feature.name +                ) +        self._features.append(feature) +        return feature + +    def add_path( +        self, +        name: str, +        is_dir: bool, +        /, +        parent: AppPath | None = None, +        id: str | None = None, +        description: str = "", +    ) -> AppFeaturePath: +        p = AppFeaturePath( +            parent or self.root, name, is_dir, id=id, description=description +        ) +        self._paths.append(p) +        return p + +    @overload +    def get_feature(self, feature: str) -> AppFeatureProvider: ... + +    @overload +    def get_feature(self, feature: type[_Feature]) -> _Feature: ... + +    def get_feature( +        self, feature: str | type[_Feature] +    ) -> AppFeatureProvider | _Feature: +        if isinstance(feature, str): +            for f in self._features: +                if f.name == feature: +                    return f +        elif isinstance(feature, type): +            for f in self._features: +                if isinstance(f, feature): +                    return f +        else: +            raise CruLogicError("Argument must be the name of feature or its class.") + +        raise AppFeatureError(f"Feature {feature} not found.", feature) + +    def get_path(self, name: str) -> AppFeaturePath: +        for p in self._paths: +            if p.id == name or p.name == name: +                return p +        raise AppPathError(f"Application path {name} not found.", name) diff --git a/tools/cru-py/cru/service/_config.py b/tools/cru-py/cru/service/_config.py new file mode 100644 index 0000000..b51e21c --- /dev/null +++ b/tools/cru-py/cru/service/_config.py @@ -0,0 +1,446 @@ +from collections.abc import Iterable +from typing import Any, Literal, overload + +from cru import CruException +from cru.config import Configuration, ConfigItem +from cru.value import ( +    INTEGER_VALUE_TYPE, +    TEXT_VALUE_TYPE, +    CruValueTypeError, +    RandomStringValueGenerator, +    UuidValueGenerator, +) +from cru.parsing import ParseError, SimpleLineConfigParser + +from ._base import AppFeaturePath, AppCommandFeatureProvider + + +class AppConfigError(CruException): +    def __init__( +        self, message: str, configuration: Configuration, *args, **kwargs +    ) -> None: +        super().__init__(message, *args, **kwargs) +        self._configuration = configuration + +    @property +    def configuration(self) -> Configuration: +        return self._configuration + + +class AppConfigFileError(AppConfigError): +    def __init__( +        self, +        message: str, +        configuration: Configuration, +        *args, +        **kwargs, +    ) -> None: +        super().__init__(message, configuration, *args, **kwargs) + + +class AppConfigFileNotFoundError(AppConfigFileError): +    def __init__( +        self, +        message: str, +        configuration: Configuration, +        file_path: str, +        *args, +        **kwargs, +    ) -> None: +        super().__init__(message, configuration, *args, **kwargs) +        self._file_path = file_path + +    @property +    def file_path(self) -> str: +        return self._file_path + + +class AppConfigFileParseError(AppConfigFileError): +    def __init__( +        self, +        message: str, +        configuration: Configuration, +        file_content: str, +        *args, +        **kwargs, +    ) -> None: +        super().__init__(message, configuration, *args, **kwargs) +        self._file_content = file_content +        self.__cause__: ParseError + +    @property +    def file_content(self) -> str: +        return self._file_content + +    def get_user_message(self) -> str: +        return f"Error while parsing config file at line {self.__cause__.line_number}." + + +class AppConfigFileEntryError(AppConfigFileError): +    def __init__( +        self, +        message: str, +        configuration: Configuration, +        entries: Iterable[SimpleLineConfigParser.Entry], +        *args, +        **kwargs, +    ) -> None: +        super().__init__(message, configuration, *args, **kwargs) +        self._entries = list(entries) + +    @property +    def error_entries(self) -> list[SimpleLineConfigParser.Entry]: +        return self._entries + +    @staticmethod +    def entries_to_friendly_message( +        entries: Iterable[SimpleLineConfigParser.Entry], +    ) -> str: +        return "\n".join( +            f"line {entry.line_number}: {entry.key}={entry.value}" for entry in entries +        ) + +    @property +    def friendly_message_head(self) -> str: +        return "Error entries found in config file" + +    def get_user_message(self) -> str: +        return ( +            f"{self.friendly_message_head}:\n" +            f"{self.entries_to_friendly_message(self.error_entries)}" +        ) + + +class AppConfigDuplicateEntryError(AppConfigFileEntryError): +    @property +    def friendly_message_head(self) -> str: +        return "Duplicate entries found in config file" + + +class AppConfigEntryValueFormatError(AppConfigFileEntryError): +    @property +    def friendly_message_head(self) -> str: +        return "Invalid value format for entries" + + +class AppConfigItemNotSetError(AppConfigError): +    def __init__( +        self, +        message: str, +        configuration: Configuration, +        items: list[ConfigItem], +        *args, +        **kwargs, +    ) -> None: +        super().__init__(message, configuration, *args, **kwargs) +        self._items = items + + +class ConfigManager(AppCommandFeatureProvider): +    def __init__(self) -> None: +        super().__init__("config-manager") +        configuration = Configuration() +        self._configuration = configuration +        self._loaded: bool = False +        self._init_app_defined_items() + +    def _init_app_defined_items(self) -> None: +        prefix = self.config_name_prefix + +        def _add_text(name: str, description: str) -> ConfigItem: +            item = ConfigItem(f"{prefix}_{name}", description, TEXT_VALUE_TYPE) +            self.configuration.add(item) +            return item + +        def _add_uuid(name: str, description: str) -> ConfigItem: +            item = ConfigItem( +                f"{prefix}_{name}", +                description, +                TEXT_VALUE_TYPE, +                default=UuidValueGenerator(), +            ) +            self.configuration.add(item) +            return item + +        def _add_random_string( +            name: str, description: str, length: int = 32, secure: bool = True +        ) -> ConfigItem: +            item = ConfigItem( +                f"{prefix}_{name}", +                description, +                TEXT_VALUE_TYPE, +                default=RandomStringValueGenerator(length, secure), +            ) +            self.configuration.add(item) +            return item + +        def _add_int(name: str, description: str) -> ConfigItem: +            item = ConfigItem(f"{prefix}_{name}", description, INTEGER_VALUE_TYPE) +            self.configuration.add(item) +            return item + +        self._domain = _add_text("DOMAIN", "domain name") +        self._email = _add_text("EMAIL", "admin email address") +        _add_text( +            "AUTO_BACKUP_COS_SECRET_ID", +            "access key id for Tencent COS, used for auto backup", +        ) +        _add_text( +            "AUTO_BACKUP_COS_SECRET_KEY", +            "access key secret for Tencent COS, used for auto backup", +        ) +        _add_text( +            "AUTO_BACKUP_COS_REGION", "region for Tencent COS, used for auto backup" +        ) +        _add_text( +            "AUTO_BACKUP_BUCKET_NAME", +            "bucket name for Tencent COS, used for auto backup", +        ) +        _add_text("GITHUB_USERNAME", "github username for fetching todos") +        _add_int("GITHUB_PROJECT_NUMBER", "github project number for fetching todos") +        _add_text("GITHUB_TOKEN", "github token for fetching todos") +        _add_text("GITHUB_TODO_COUNT", "github todo count") +        _add_uuid("V2RAY_TOKEN", "v2ray user id") +        _add_uuid("V2RAY_PATH", "v2ray path, which will be prefixed by _") +        _add_text("FORGEJO_MAILER_USER", "Forgejo SMTP user") +        _add_text("FORGEJO_MAILER_PASSWD", "Forgejo SMTP password") +        _add_random_string("2FAUTH_APP_KEY", "2FAuth App Key") +        _add_text("2FAUTH_MAIL_USERNAME", "2FAuth SMTP user") +        _add_text("2FAUTH_MAIL_PASSWORD", "2FAuth SMTP password") + +    def setup(self) -> None: +        self._config_file_path = self.app.data_dir.add_subpath( +            "config", False, description="Configuration file path." +        ) + +    @property +    def config_name_prefix(self) -> str: +        return self.app.app_id.upper() + +    @property +    def configuration(self) -> Configuration: +        return self._configuration + +    @property +    def config_file_path(self) -> AppFeaturePath: +        return self._config_file_path + +    @property +    def all_set(self) -> bool: +        return self.configuration.all_set + +    def get_item(self, name: str) -> ConfigItem[Any]: +        if not name.startswith(self.config_name_prefix + "_"): +            name = f"{self.config_name_prefix}_{name}" + +        item = self.configuration.get_or(name, None) +        if item is None: +            raise AppConfigError(f"Config item '{name}' not found.", self.configuration) +        return item + +    @overload +    def get_item_value_str(self, name: str) -> str: ... + +    @overload +    def get_item_value_str(self, name: str, ensure_set: Literal[True]) -> str: ... + +    @overload +    def get_item_value_str(self, name: str, ensure_set: bool = True) -> str | None: ... + +    def get_item_value_str(self, name: str, ensure_set: bool = True) -> str | None: +        self.load_config_file() +        item = self.get_item(name) +        if not item.is_set: +            if ensure_set: +                raise AppConfigItemNotSetError( +                    f"Config item '{name}' is not set.", self.configuration, [item] +                ) +            return None +        return item.value_str + +    def get_str_dict(self, ensure_all_set: bool = True) -> dict[str, str]: +        self.load_config_file() +        if ensure_all_set and not self.configuration.all_set: +            raise AppConfigItemNotSetError( +                "Some config items are not set.", +                self.configuration, +                self.configuration.get_unset_items(), +            ) +        return self.configuration.to_str_dict() + +    @property +    def domain_item_name(self) -> str: +        return self._domain.name + +    def get_domain_value_str(self) -> str: +        return self.get_item_value_str(self._domain.name) + +    def get_email_value_str_optional(self) -> str | None: +        return self.get_item_value_str(self._email.name, ensure_set=False) + +    def _set_with_default(self) -> None: +        if not self.configuration.all_not_set: +            raise AppConfigError( +                "Config is not clean. " +                "Some config items are already set. " +                "Can't set again with default value.", +                self.configuration, +            ) +        for item in self.configuration: +            if item.can_generate_default: +                item.set_value(item.generate_default_value()) + +    def _to_config_file_content(self) -> str: +        content = "".join( +            [ +                f"{item.name}={item.value_str if item.is_set else ''}\n" +                for item in self.configuration +            ] +        ) +        return content + +    def _create_init_config_file(self) -> None: +        if self.config_file_path.check_self(): +            raise AppConfigError( +                "Config file already exists.", +                self.configuration, +                user_message=f"The config file at " +                f"{self.config_file_path.full_path_str} already exists.", +            ) +        self._set_with_default() +        self.config_file_path.ensure() +        with open( +            self.config_file_path.full_path, "w", encoding="utf-8", newline="\n" +        ) as file: +            file.write(self._to_config_file_content()) + +    def _parse_config_file(self) -> SimpleLineConfigParser.Result: +        if not self.config_file_path.check_self(): +            raise AppConfigFileNotFoundError( +                "Config file not found.", +                self.configuration, +                self.config_file_path.full_path_str, +                user_message=f"The config file at " +                f"{self.config_file_path.full_path_str} does not exist. " +                f"You can create an initial one with 'init' command.", +            ) + +        text = self.config_file_path.full_path.read_text() +        try: +            parser = SimpleLineConfigParser() +            return parser.parse(text) +        except ParseError as e: +            raise AppConfigFileParseError( +                "Failed to parse config file.", self.configuration, text +            ) from e + +    def _parse_and_print_config_file(self) -> None: +        parse_result = self._parse_config_file() +        for entry in parse_result: +            print(f"{entry.key}={entry.value}") + +    def _check_duplicate( +        self, +        parse_result: dict[str, list[SimpleLineConfigParser.Entry]], +    ) -> dict[str, SimpleLineConfigParser.Entry]: +        entry_dict: dict[str, SimpleLineConfigParser.Entry] = {} +        duplicate_entries: list[SimpleLineConfigParser.Entry] = [] +        for key, entries in parse_result.items(): +            entry_dict[key] = entries[0] +            if len(entries) > 1: +                duplicate_entries.extend(entries) +        if len(duplicate_entries) > 0: +            raise AppConfigDuplicateEntryError( +                "Duplicate entries found.", self.configuration, duplicate_entries +            ) + +        return entry_dict + +    def _check_type( +        self, entry_dict: dict[str, SimpleLineConfigParser.Entry] +    ) -> dict[str, Any]: +        value_dict: dict[str, Any] = {} +        error_entries: list[SimpleLineConfigParser.Entry] = [] +        errors: list[CruValueTypeError] = [] +        for key, entry in entry_dict.items(): +            config_item = self.configuration.get(key) +            try: +                if entry.value == "": +                    value_dict[key] = None +                else: +                    value_dict[key] = config_item.value_type.convert_str_to_value( +                        entry.value +                    ) +            except CruValueTypeError as e: +                error_entries.append(entry) +                errors.append(e) +        if len(error_entries) > 0: +            raise AppConfigEntryValueFormatError( +                "Entry value format is not correct.", +                self.configuration, +                error_entries, +            ) from ExceptionGroup("Multiple format errors occurred.", errors) +        return value_dict + +    def _read_config_file(self) -> dict[str, Any]: +        parsed = self._parse_config_file() +        entry_groups = parsed.cru_iter().group_by(lambda e: e.key) +        entry_dict = self._check_duplicate(entry_groups) +        value_dict = self._check_type(entry_dict) +        return value_dict + +    def _real_load_config_file(self) -> None: +        self.configuration.reset_all() +        value_dict = self._read_config_file() +        for key, value in value_dict.items(): +            if value is None: +                continue +            self.configuration.set_config_item(key, value) + +    def load_config_file(self, force=False) -> None: +        if force or not self._loaded: +            self._real_load_config_file() +            self._loaded = True + +    def _print_app_config_info(self): +        for item in self.configuration: +            print(item.description_str) + +    def get_command_info(self): +        return "config", "Manage configuration." + +    def setup_arg_parser(self, arg_parser) -> None: +        subparsers = arg_parser.add_subparsers( +            dest="config_command", required=True, metavar="CONFIG_COMMAND" +        ) +        _init_parser = subparsers.add_parser( +            "init", help="create an initial config file" +        ) +        _print_app_parser = subparsers.add_parser( +            "print-app", +            help="print information of the config items defined by app", +        ) +        _print_parser = subparsers.add_parser("print", help="print current config") +        _check_config_parser = subparsers.add_parser( +            "check", +            help="check the validity of the config file", +        ) +        _check_config_parser.add_argument( +            "-f", +            "--format-only", +            action="store_true", +            help="only check content format, not app config item requirements.", +        ) + +    def run_command(self, args) -> None: +        if args.config_command == "init": +            self._create_init_config_file() +        elif args.config_command == "print-app": +            self._print_app_config_info() +        elif args.config_command == "print": +            self._parse_and_print_config_file() +        elif args.config_command == "check": +            if args.format_only: +                self._parse_config_file() +            else: +                self._read_config_file() diff --git a/tools/cru-py/cru/service/_external.py b/tools/cru-py/cru/service/_external.py new file mode 100644 index 0000000..2347e95 --- /dev/null +++ b/tools/cru-py/cru/service/_external.py @@ -0,0 +1,81 @@ +from ._base import AppCommandFeatureProvider +from ._nginx import NginxManager + + +class CliToolCommandProvider(AppCommandFeatureProvider): +    def __init__(self) -> None: +        super().__init__("cli-tool-command-provider") + +    def setup(self): +        pass + +    def get_command_info(self): +        return ("gen-cli", "Get commands of running external cli tools.") + +    def setup_arg_parser(self, arg_parser): +        subparsers = arg_parser.add_subparsers( +            dest="gen_cli_command", required=True, metavar="GEN_CLI_COMMAND" +        ) +        certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") +        certbot_parser.add_argument( +            "-t", "--test", action="store_true", help="run certbot in test mode" +        ) +        _install_docker_parser = subparsers.add_parser( +            "install-docker", help="print docker installation commands" +        ) +        _update_blog_parser = subparsers.add_parser( +            "update-blog", help="print blog update command" +        ) + +    def _print_install_docker_commands(self) -> None: +        output = """ +### COMMAND: uninstall apt docker +for pkg in docker.io docker-doc docker-compose \ +podman-docker containerd runc; \ +do sudo apt-get remove $pkg; done + +### COMMAND: prepare apt certs +sudo apt-get update +sudo apt-get install ca-certificates curl +sudo install -m 0755 -d /etc/apt/keyrings + +### COMMAND: install certs +sudo curl -fsSL https://download.docker.com/linux/debian/gpg \ +-o /etc/apt/keyrings/docker.asc +sudo chmod a+r /etc/apt/keyrings/docker.asc + +### COMMAND: add docker apt source +echo \\ +  "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] \ +https://download.docker.com/linux/debian \\ +  $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \\ +  sudo tee /etc/apt/sources.list.d/docker.list > /dev/null + +### COMMAND: update apt and install docker +sudo apt-get update +sudo apt-get install docker-ce docker-ce-cli containerd.io \ +docker-buildx-plugin docker-compose-plugin + +### COMMAND: setup system for docker +sudo systemctl enable docker +sudo systemctl start docker +sudo groupadd -f docker +sudo usermod -aG docker $USER +# Remember to log out and log back in for the group changes to take effect +""".strip() +        print(output) + +    def _print_update_blog_command(self): +        output = """ +### COMMAND: update blog +docker exec -it blog /scripts/update.bash +""".strip() +        print(output) + +    def run_command(self, args): +        if args.gen_cli_command == "certbot": +            self.app.get_feature(NginxManager).print_all_certbot_commands(args.test) +        elif args.gen_cli_command == "install-docker": +            self._print_install_docker_commands() +        elif args.gen_cli_command == "update-blog": +            self._print_update_blog_command()
\ No newline at end of file diff --git a/tools/cru-py/cru/service/_nginx.py b/tools/cru-py/cru/service/_nginx.py new file mode 100644 index 0000000..e0a9c60 --- /dev/null +++ b/tools/cru-py/cru/service/_nginx.py @@ -0,0 +1,281 @@ +from argparse import Namespace +from enum import Enum, auto +import re +import subprocess +from typing import TypeAlias + +from cru import CruInternalError + +from ._base import AppCommandFeatureProvider +from ._config import ConfigManager +from ._template import TemplateManager + + +class CertbotAction(Enum): +    CREATE = auto() +    EXPAND = auto() +    SHRINK = auto() +    RENEW = auto() + + +class NginxManager(AppCommandFeatureProvider): +    CertbotAction: TypeAlias = CertbotAction + +    def __init__(self) -> None: +        super().__init__("nginx-manager") +        self._domains_cache: list[str] | None = None + +    def setup(self) -> None: +        pass + +    @property +    def _config_manager(self) -> ConfigManager: +        return self.app.get_feature(ConfigManager) + +    @property +    def root_domain(self) -> str: +        return self._config_manager.get_domain_value_str() + +    @property +    def domains(self) -> list[str]: +        if self._domains_cache is None: +            self._domains_cache = self._get_domains() +        return self._domains_cache + +    @property +    def subdomains(self) -> list[str]: +        suffix = "." + self.root_domain +        return [d[: -len(suffix)] for d in self.domains if d.endswith(suffix)] + +    @property +    def _domain_config_name(self) -> str: +        return self._config_manager.domain_item_name + +    def _get_domains_from_text(self, text: str) -> set[str]: +        domains: set[str] = set() +        regex = re.compile(r"server_name\s+(\S+)\s*;") +        domain_variable_str = f"${self._domain_config_name}" +        brace_domain_variable_regex = re.compile( +            r"\$\{\s*" + self._domain_config_name + r"\s*\}" +        ) +        for match in regex.finditer(text): +            domain_part = match.group(1) +            if domain_variable_str in domain_part: +                domains.add(domain_part.replace(domain_variable_str, self.root_domain)) +                continue +            m = brace_domain_variable_regex.search(domain_part) +            if m: +                domains.add(domain_part.replace(m.group(0), self.root_domain)) +                continue +            domains.add(domain_part) +        return domains + +    def _get_nginx_conf_template_text(self) -> str: +        template_manager = self.app.get_feature(TemplateManager) +        text = "" +        for path, template in template_manager.template_tree.templates: +            if path.as_posix().startswith("nginx/"): +                text += template.raw_text +        return text + +    def _get_domains(self) -> list[str]: +        text = self._get_nginx_conf_template_text() +        domains = list(self._get_domains_from_text(text)) +        domains.remove(self.root_domain) +        return [self.root_domain, *domains] + +    def _print_domains(self) -> None: +        for domain in self.domains: +            print(domain) + +    def _certbot_command( +        self, +        action: CertbotAction | str, +        test: bool, +        *, +        docker=True, +        standalone=None, +        email=None, +        agree_tos=True, +    ) -> str: +        if isinstance(action, str): +            action = CertbotAction[action.upper()] + +        command_args = [] + +        add_domain_option = True +        if action is CertbotAction.CREATE: +            if standalone is None: +                standalone = True +            command_action = "certonly" +        elif action in [CertbotAction.EXPAND, CertbotAction.SHRINK]: +            if standalone is None: +                standalone = False +            command_action = "certonly" +        elif action is CertbotAction.RENEW: +            if standalone is None: +                standalone = False +            add_domain_option = False +            command_action = "renew" +        else: +            raise CruInternalError("Invalid certbot action.") + +        data_dir = self.app.data_dir.full_path.as_posix() + +        if not docker: +            command_args.append("certbot") +        else: +            command_args.extend( +                [ +                    "docker run -it --rm --name certbot", +                    f'-v "{data_dir}/certbot/certs:/etc/letsencrypt"', +                    f'-v "{data_dir}/certbot/data:/var/lib/letsencrypt"', +                ] +            ) +            if standalone: +                command_args.append('-p "0.0.0.0:80:80"') +            else: +                command_args.append(f'-v "{data_dir}/certbot/webroot:/var/www/certbot"') + +            command_args.append("certbot/certbot") + +        command_args.append(command_action) + +        command_args.append(f"--cert-name {self.root_domain}") + +        if standalone: +            command_args.append("--standalone") +        else: +            command_args.append("--webroot -w /var/www/certbot") + +        if add_domain_option: +            command_args.append(" ".join([f"-d {domain}" for domain in self.domains])) + +        if email is not None: +            command_args.append(f"--email {email}") + +        if agree_tos: +            command_args.append("--agree-tos") + +        if test: +            command_args.append("--test-cert --dry-run") + +        return " ".join(command_args) + +    def print_all_certbot_commands(self, test: bool): +        print("### COMMAND: (standalone) create certs") +        print( +            self._certbot_command( +                CertbotAction.CREATE, +                test, +                email=self._config_manager.get_email_value_str_optional(), +            ) +        ) +        print() +        print("### COMMAND: (webroot+nginx) expand or shrink certs") +        print( +            self._certbot_command( +                CertbotAction.EXPAND, +                test, +                email=self._config_manager.get_email_value_str_optional(), +            ) +        ) +        print() +        print("### COMMAND: (webroot+nginx) renew certs") +        print( +            self._certbot_command( +                CertbotAction.RENEW, +                test, +                email=self._config_manager.get_email_value_str_optional(), +            ) +        ) + +    @property +    def _cert_path_str(self) -> str: +        return str( +            self.app.data_dir.full_path +            / "certbot/certs/live" +            / self.root_domain +            / "fullchain.pem" +        ) + +    def get_command_info(self): +        return "nginx", "Manage nginx related things." + +    def setup_arg_parser(self, arg_parser): +        subparsers = arg_parser.add_subparsers( +            dest="nginx_command", required=True, metavar="NGINX_COMMAND" +        ) +        _list_parser = subparsers.add_parser("list", help="list domains") +        certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") +        certbot_parser.add_argument( +            "--no-test", +            action="store_true", +            help="remove args making certbot run in test mode", +        ) + +    def run_command(self, args: Namespace) -> None: +        if args.nginx_command == "list": +            self._print_domains() +        elif args.nginx_command == "certbot": +            self.print_all_certbot_commands(not args.no_test) + +    def _generate_dns_zone( +        self, +        ip: str, +        /, +        ttl: str | int = 600, +        *, +        enable_mail: bool = True, +        dkim: str | None = None, +    ) -> str: +        # TODO: Not complete and test now. +        root_domain = self.root_domain +        result = f"$ORIGIN {root_domain}.\n\n" +        result += "; A records\n" +        result += f"@ {ttl} IN A {ip}\n" +        for subdomain in self.subdomains: +            result += f"{subdomain} {ttl} IN A {ip}\n" + +        if enable_mail: +            result += "\n; MX records\n" +            result += f"@ {ttl} IN MX 10 mail.{root_domain}.\n" +            result += "\n; SPF record\n" +            result += f'@ {ttl} IN TXT "v=spf1 mx ~all"\n' +            if dkim is not None: +                result += "\n; DKIM record\n" +                result += f'mail._domainkey {ttl} IN TEXT "{dkim}"' +                result += "\n; DMARC record\n" +                dmarc_options = [ +                    "v=DMARC1", +                    "p=none", +                    f"rua=mailto:dmarc.report@{root_domain}", +                    f"ruf=mailto:dmarc.report@{root_domain}", +                    "sp=none", +                    "ri=86400", +                ] +                result += f'_dmarc {ttl} IN TXT "{"; ".join(dmarc_options)}"\n' +        return result + +    def _get_dkim_from_mailserver(self) -> str | None: +        # TODO: Not complete and test now. +        dkim_path = ( +            self.app.data_dir.full_path +            / "dms/config/opendkim/keys" +            / self.root_domain +            / "mail.txt" +        ) +        if not dkim_path.exists(): +            return None + +        p = subprocess.run(["sudo", "cat", dkim_path], capture_output=True, check=True) +        value = "" +        for match in re.finditer('"(.*)"', p.stdout.decode("utf-8")): +            value += match.group(1) +        return value + +    def _generate_dns_zone_with_dkim(self, ip: str, /, ttl: str | int = 600) -> str: +        # TODO: Not complete and test now. +        return self._generate_dns_zone( +            ip, ttl, enable_mail=True, dkim=self._get_dkim_from_mailserver() +        ) diff --git a/tools/cru-py/cru/service/_template.py b/tools/cru-py/cru/service/_template.py new file mode 100644 index 0000000..170116c --- /dev/null +++ b/tools/cru-py/cru/service/_template.py @@ -0,0 +1,86 @@ +from argparse import Namespace +import shutil + +from cru import CruIterator +from cru.template import TemplateTree + +from ._base import AppCommandFeatureProvider, AppFeaturePath +from ._config import ConfigManager + + +class TemplateManager(AppCommandFeatureProvider): +    def __init__(self, prefix: str | None = None): +        super().__init__("template-manager") +        self._prefix = prefix or self.app.app_id.upper() + +    def setup(self) -> None: +        self._templates_dir = self.app.add_path("templates", True) +        self._generated_dir = self.app.add_path("generated", True) +        self._template_tree: TemplateTree | None = None + +    @property +    def prefix(self) -> str: +        return self._prefix + +    @property +    def templates_dir(self) -> AppFeaturePath: +        return self._templates_dir + +    @property +    def generated_dir(self) -> AppFeaturePath: +        return self._generated_dir + +    @property +    def template_tree(self) -> TemplateTree: +        if self._template_tree is None: +            return self.reload() +        return self._template_tree + +    def reload(self) -> TemplateTree: +        self._template_tree = TemplateTree( +            self.prefix, self.templates_dir.full_path_str +        ) +        return self._template_tree + +    def _print_file_lists(self) -> None: +        for file in CruIterator(self.template_tree.templates).transform(lambda t: t[0]): +            print(file.as_posix()) + +    def _generate_files(self, dry_run: bool) -> None: +        config_manager = self.app.get_feature(ConfigManager) +        if not dry_run and self.generated_dir.full_path.exists(): +            shutil.rmtree(self.generated_dir.full_path) +        self.template_tree.generate_to( +            self.generated_dir.full_path_str, config_manager.get_str_dict(), dry_run +        ) + +    def get_command_info(self): +        return ("template", "Manage templates.") + +    def setup_arg_parser(self, arg_parser): +        subparsers = arg_parser.add_subparsers( +            dest="template_command", required=True, metavar="TEMPLATE_COMMAND" +        ) +        _list_parser = subparsers.add_parser("list", help="list templates") +        _variables_parser = subparsers.add_parser( +            "variables", help="list variables used in all templates" +        ) +        generate_parser = subparsers.add_parser("generate", help="generate templates") +        generate_parser.add_argument( +            "--no-dry-run", action="store_true", help="generate and write target files" +        ) + +    def run_command(self, args: Namespace) -> None: +        if args.template_command == "list": +            self._print_file_lists() +        elif args.template_command == "variables": +            for var in self.template_tree.variables: +                print(var) +        elif args.template_command == "generate": +            dry_run = not args.no_dry_run +            self._generate_files(dry_run) +            if dry_run: +                print("Dry run successfully.") +                print( +                    f"Will delete dir {self.generated_dir.full_path_str} if it exists." +                ) diff --git a/tools/cru-py/cru/system.py b/tools/cru-py/cru/system.py new file mode 100644 index 0000000..f321717 --- /dev/null +++ b/tools/cru-py/cru/system.py @@ -0,0 +1,23 @@ +import os.path +import re + + +def check_debian_derivative_version(name: str) -> None | str: +    if not os.path.isfile("/etc/os-release"): +        return None +    with open("/etc/os-release", "r") as f: +        content = f.read() +        if f"ID={name}" not in content: +            return None +        m = re.search(r'VERSION_ID="(.+)"', content) +        if m is None: +            return None +        return m.group(1) + + +def check_ubuntu_version() -> None | str: +    return check_debian_derivative_version("ubuntu") + + +def check_debian_version() -> None | str: +    return check_debian_derivative_version("debian") diff --git a/tools/cru-py/cru/template.py b/tools/cru-py/cru/template.py new file mode 100644 index 0000000..6749cab --- /dev/null +++ b/tools/cru-py/cru/template.py @@ -0,0 +1,153 @@ +from collections.abc import Mapping +import os +import os.path +from pathlib import Path +from string import Template + +from ._iter import CruIterator +from ._error import CruException + + +class CruTemplateError(CruException): +    pass + + +class CruTemplate: +    def __init__(self, prefix: str, text: str): +        self._prefix = prefix +        self._template = Template(text) +        self._variables = ( +            CruIterator(self._template.get_identifiers()) +            .filter(lambda i: i.startswith(self._prefix)) +            .to_set() +        ) +        self._all_variables = set(self._template.get_identifiers()) + +    @property +    def prefix(self) -> str: +        return self._prefix + +    @property +    def raw_text(self) -> str: +        return self._template.template + +    @property +    def py_template(self) -> Template: +        return self._template + +    @property +    def variables(self) -> set[str]: +        return self._variables + +    @property +    def all_variables(self) -> set[str]: +        return self._all_variables + +    @property +    def has_variables(self) -> bool: +        """ +        If the template does not has any variables that starts with the given prefix, +        it returns False. This usually indicates that the template is not a real +        template and should be copied as is. Otherwise, it returns True. + +        This can be used as a guard to prevent invalid templates created accidentally +        without notice. +        """ +        return len(self.variables) > 0 + +    def generate(self, mapping: Mapping[str, str], allow_extra: bool = True) -> str: +        values = dict(mapping) +        if not self.variables <= set(values.keys()): +            raise CruTemplateError("Missing variables.") +        if not allow_extra and not set(values.keys()) <= self.variables: +            raise CruTemplateError("Extra variables.") +        return self._template.safe_substitute(values) + + +class TemplateTree: +    def __init__( +        self, +        prefix: str, +        source: str, +        template_file_suffix: str | None = ".template", +    ): +        """ +        If template_file_suffix is not None, the files will be checked according to the +        suffix of the file name. If the suffix matches, the file will be regarded as a +        template file. Otherwise, it will be regarded as a non-template file. +        Content of template file must contain variables that need to be replaced, while +        content of non-template file may not contain any variables. +        If either case is false, it generally means whether the file is a template is +        wrongly handled. +        """ +        self._prefix = prefix +        self._files: list[tuple[Path, CruTemplate]] = [] +        self._source = source +        self._template_file_suffix = template_file_suffix +        self._load() + +    @property +    def prefix(self) -> str: +        return self._prefix + +    @property +    def templates(self) -> list[tuple[Path, CruTemplate]]: +        return self._files + +    @property +    def source(self) -> str: +        return self._source + +    @property +    def template_file_suffix(self) -> str | None: +        return self._template_file_suffix + +    @staticmethod +    def _scan_files(root_path: str) -> list[Path]: +        result: list[Path] = [] +        for root, _dirs, files in os.walk(root_path): +            for file in files: +                path = Path(root, file) +                path = path.relative_to(root_path) +                result.append(Path(path)) +        return result + +    def _load(self) -> None: +        files = self._scan_files(self.source) +        for file_path in files: +            template_file = Path(self.source) / file_path +            with open(template_file, "r") as f: +                content = f.read() +            template = CruTemplate(self.prefix, content) +            if self.template_file_suffix is not None: +                should_be_template = file_path.name.endswith(self.template_file_suffix) +                if should_be_template and not template.has_variables: +                    raise CruTemplateError( +                        f"Template file {file_path} has no variables." +                    ) +                elif not should_be_template and template.has_variables: +                    raise CruTemplateError(f"Non-template {file_path} has variables.") +            self._files.append((file_path, template)) + +    @property +    def variables(self) -> set[str]: +        s = set() +        for _, template in self.templates: +            s.update(template.variables) +        return s + +    def generate_to( +        self, destination: str, variables: Mapping[str, str], dry_run: bool +    ) -> None: +        for file, template in self.templates: +            des = Path(destination) / file +            if self.template_file_suffix is not None and des.name.endswith( +                self.template_file_suffix +            ): +                des = des.parent / (des.name[: -len(self.template_file_suffix)]) + +            text = template.generate(variables) +            if not dry_run: +                des.parent.mkdir(parents=True, exist_ok=True) +                with open(des, "w") as f: +                    f.write(text) diff --git a/tools/cru-py/cru/tool.py b/tools/cru-py/cru/tool.py new file mode 100644 index 0000000..377f5d7 --- /dev/null +++ b/tools/cru-py/cru/tool.py @@ -0,0 +1,82 @@ +import shutil +import subprocess +from typing import Any +from collections.abc import Iterable + +from ._error import CruException + + +class CruExternalToolError(CruException): +    def __init__(self, message: str, tool: str, *args, **kwargs) -> None: +        super().__init__(message, *args, **kwargs) +        self._tool = tool + +    @property +    def tool(self) -> str: +        return self._tool + + +class CruExternalToolNotFoundError(CruExternalToolError): +    def __init__(self, message: str | None, tool: str, *args, **kwargs) -> None: +        super().__init__( +            message or f"Could not find binary for {tool}.", tool, *args, **kwargs +        ) + + +class CruExternalToolRunError(CruExternalToolError): +    def __init__( +        self, +        message: str, +        tool: str, +        tool_args: Iterable[str], +        tool_error: Any, +        *args, +        **kwargs, +    ) -> None: +        super().__init__(message, tool, *args, **kwargs) +        self._tool_args = list(tool_args) +        self._tool_error = tool_error + +    @property +    def tool_args(self) -> list[str]: +        return self._tool_args + +    @property +    def tool_error(self) -> Any: +        return self._tool_error + + +class ExternalTool: +    def __init__(self, bin: str) -> None: +        self._bin = bin + +    @property +    def bin(self) -> str: +        return self._bin + +    @bin.setter +    def bin(self, value: str) -> None: +        self._bin = value + +    @property +    def bin_path(self) -> str: +        real_bin = shutil.which(self.bin) +        if not real_bin: +            raise CruExternalToolNotFoundError(None, self.bin) +        return real_bin + +    def run( +        self, *process_args: str, **subprocess_kwargs +    ) -> subprocess.CompletedProcess: +        try: +            return subprocess.run( +                [self.bin_path] + list(process_args), **subprocess_kwargs +            ) +        except subprocess.CalledProcessError as e: +            raise CruExternalToolError("Subprocess failed.", self.bin) from e +        except OSError as e: +            raise CruExternalToolError("Failed to start subprocess", self.bin) from e + +    def run_get_output(self, *process_args: str, **subprocess_kwargs) -> Any: +        process = self.run(*process_args, capture_output=True, **subprocess_kwargs) +        return process.stdout diff --git a/tools/cru-py/cru/value.py b/tools/cru-py/cru/value.py new file mode 100644 index 0000000..9c03219 --- /dev/null +++ b/tools/cru-py/cru/value.py @@ -0,0 +1,292 @@ +from __future__ import annotations + +import random +import secrets +import string +import uuid +from abc import abstractmethod, ABCMeta +from collections.abc import Callable +from typing import Any, ClassVar, TypeVar, Generic + +from ._error import CruException + + +def _str_case_in(s: str, case: bool, str_list: list[str]) -> bool: +    if case: +        return s in str_list +    else: +        return s.lower() in [s.lower() for s in str_list] + + +_T = TypeVar("_T") + + +class CruValueTypeError(CruException): +    def __init__( +        self, +        message: str, +        value: Any, +        value_type: ValueType | None, +        *args, +        **kwargs, +    ): +        super().__init__( +            message, +            *args, +            **kwargs, +        ) +        self._value = value +        self._value_type = value_type + +    @property +    def value(self) -> Any: +        return self._value + +    @property +    def value_type(self) -> ValueType | None: +        return self._value_type + + +class ValueType(Generic[_T], metaclass=ABCMeta): +    def __init__(self, name: str, _type: type[_T]) -> None: +        self._name = name +        self._type = _type + +    @property +    def name(self) -> str: +        return self._name + +    @property +    def type(self) -> type[_T]: +        return self._type + +    def check_value_type(self, value: Any) -> None: +        if not isinstance(value, self.type): +            raise CruValueTypeError("Type of value is wrong.", value, self) + +    def _do_check_value(self, value: Any) -> _T: +        return value + +    def check_value(self, value: Any) -> _T: +        self.check_value_type(value) +        return self._do_check_value(value) + +    @abstractmethod +    def _do_check_str_format(self, s: str) -> None: +        raise NotImplementedError() + +    def check_str_format(self, s: str) -> None: +        if not isinstance(s, str): +            raise CruValueTypeError("Try to check format on a non-str.", s, self) +        self._do_check_str_format(s) + +    @abstractmethod +    def _do_convert_value_to_str(self, value: _T) -> str: +        raise NotImplementedError() + +    def convert_value_to_str(self, value: _T) -> str: +        self.check_value(value) +        return self._do_convert_value_to_str(value) + +    @abstractmethod +    def _do_convert_str_to_value(self, s: str) -> _T: +        raise NotImplementedError() + +    def convert_str_to_value(self, s: str) -> _T: +        self.check_str_format(s) +        return self._do_convert_str_to_value(s) + +    def check_value_or_try_convert_from_str(self, value_or_str: Any) -> _T: +        try: +            return self.check_value(value_or_str) +        except CruValueTypeError: +            if isinstance(value_or_str, str): +                return self.convert_str_to_value(value_or_str) +            else: +                raise + +    def create_default_value(self) -> _T: +        return self.type() + + +class TextValueType(ValueType[str]): +    def __init__(self) -> None: +        super().__init__("text", str) + +    def _do_check_str_format(self, _s): +        return + +    def _do_convert_value_to_str(self, value): +        return value + +    def _do_convert_str_to_value(self, s): +        return s + + +class IntegerValueType(ValueType[int]): +    def __init__(self) -> None: +        super().__init__("integer", int) + +    def _do_check_str_format(self, s): +        try: +            int(s) +        except ValueError as e: +            raise CruValueTypeError("Invalid integer format.", s, self) from e + +    def _do_convert_value_to_str(self, value): +        return str(value) + +    def _do_convert_str_to_value(self, s): +        return int(s) + + +class FloatValueType(ValueType[float]): +    def __init__(self) -> None: +        super().__init__("float", float) + +    def _do_check_str_format(self, s): +        try: +            float(s) +        except ValueError as e: +            raise CruValueTypeError("Invalid float format.", s, self) from e + +    def _do_convert_value_to_str(self, value): +        return str(value) + +    def _do_convert_str_to_value(self, s): +        return float(s) + + +class BooleanValueType(ValueType[bool]): +    DEFAULT_TRUE_LIST: ClassVar[list[str]] = ["true", "yes", "y", "on", "1"] +    DEFAULT_FALSE_LIST: ClassVar[list[str]] = ["false", "no", "n", "off", "0"] + +    def __init__( +        self, +        *, +        case_sensitive=False, +        true_list: None | list[str] = None, +        false_list: None | list[str] = None, +    ) -> None: +        super().__init__("boolean", bool) +        self._case_sensitive = case_sensitive +        self._valid_true_strs: list[str] = ( +            true_list or BooleanValueType.DEFAULT_TRUE_LIST +        ) +        self._valid_false_strs: list[str] = ( +            false_list or BooleanValueType.DEFAULT_FALSE_LIST +        ) + +    @property +    def case_sensitive(self) -> bool: +        return self._case_sensitive + +    @property +    def valid_true_strs(self) -> list[str]: +        return self._valid_true_strs + +    @property +    def valid_false_strs(self) -> list[str]: +        return self._valid_false_strs + +    @property +    def valid_boolean_strs(self) -> list[str]: +        return self._valid_true_strs + self._valid_false_strs + +    def _do_check_str_format(self, s): +        if not _str_case_in(s, self.case_sensitive, self.valid_boolean_strs): +            raise CruValueTypeError("Invalid boolean format.", s, self) + +    def _do_convert_value_to_str(self, value): +        return self._valid_true_strs[0] if value else self._valid_false_strs[0] + +    def _do_convert_str_to_value(self, s): +        return _str_case_in(s, self.case_sensitive, self._valid_true_strs) + +    def create_default_value(self): +        return self.valid_false_strs[0] + + +class EnumValueType(ValueType[str]): +    def __init__(self, valid_values: list[str], /, case_sensitive=False) -> None: +        super().__init__(f"enum({'|'.join(valid_values)})", str) +        self._case_sensitive = case_sensitive +        self._valid_values = valid_values + +    @property +    def case_sensitive(self) -> bool: +        return self._case_sensitive + +    @property +    def valid_values(self) -> list[str]: +        return self._valid_values + +    def _do_check_value(self, value): +        self._do_check_str_format(value) + +    def _do_check_str_format(self, s): +        if not _str_case_in(s, self.case_sensitive, self.valid_values): +            raise CruValueTypeError("Invalid enum value", s, self) + +    def _do_convert_value_to_str(self, value): +        return value + +    def _do_convert_str_to_value(self, s): +        return s + +    def create_default_value(self): +        return self.valid_values[0] + + +TEXT_VALUE_TYPE = TextValueType() +INTEGER_VALUE_TYPE = IntegerValueType() +BOOLEAN_VALUE_TYPE = BooleanValueType() + + +class ValueGeneratorBase(Generic[_T], metaclass=ABCMeta): +    @abstractmethod +    def generate(self) -> _T: +        raise NotImplementedError() + +    def __call__(self) -> _T: +        return self.generate() + + +class ValueGenerator(ValueGeneratorBase[_T]): +    def __init__(self, generate_func: Callable[[], _T]) -> None: +        self._generate_func = generate_func + +    @property +    def generate_func(self) -> Callable[[], _T]: +        return self._generate_func + +    def generate(self) -> _T: +        return self._generate_func() + + +class UuidValueGenerator(ValueGeneratorBase[str]): +    def generate(self): +        return str(uuid.uuid4()) + + +class RandomStringValueGenerator(ValueGeneratorBase[str]): +    def __init__(self, length: int, secure: bool) -> None: +        self._length = length +        self._secure = secure + +    @property +    def length(self) -> int: +        return self._length + +    @property +    def secure(self) -> bool: +        return self._secure + +    def generate(self): +        random_func = secrets.choice if self._secure else random.choice +        characters = string.ascii_letters + string.digits +        random_string = "".join(random_func(characters) for _ in range(self._length)) +        return random_string + + +UUID_VALUE_GENERATOR = UuidValueGenerator() | 
