diff options
Diffstat (limited to 'python/cru')
| -rw-r--r-- | python/cru/__init__.py | 60 | ||||
| -rw-r--r-- | python/cru/_base.py | 101 | ||||
| -rw-r--r-- | python/cru/_const.py | 49 | ||||
| -rw-r--r-- | python/cru/_decorator.py | 97 | ||||
| -rw-r--r-- | python/cru/_error.py | 89 | ||||
| -rw-r--r-- | python/cru/_event.py | 61 | ||||
| -rw-r--r-- | python/cru/_func.py | 172 | ||||
| -rw-r--r-- | python/cru/_helper.py | 16 | ||||
| -rw-r--r-- | python/cru/_iter.py | 469 | ||||
| -rw-r--r-- | python/cru/_type.py | 52 | ||||
| -rw-r--r-- | python/cru/attr.py | 364 | ||||
| -rw-r--r-- | python/cru/config.py | 196 | ||||
| -rw-r--r-- | python/cru/list.py | 160 | ||||
| -rw-r--r-- | python/cru/parsing.py | 290 | ||||
| -rw-r--r-- | python/cru/service/__init__.py | 0 | ||||
| -rw-r--r-- | python/cru/service/__main__.py | 27 | ||||
| -rw-r--r-- | python/cru/service/_app.py | 30 | ||||
| -rw-r--r-- | python/cru/service/_base.py | 400 | ||||
| -rw-r--r-- | python/cru/service/_gen_cmd.py | 200 | ||||
| -rw-r--r-- | python/cru/service/_nginx.py | 263 | ||||
| -rw-r--r-- | python/cru/service/_template.py | 228 | ||||
| -rw-r--r-- | python/cru/system.py | 23 | ||||
| -rw-r--r-- | python/cru/template.py | 209 | ||||
| -rw-r--r-- | python/cru/tool.py | 82 | ||||
| -rw-r--r-- | python/cru/value.py | 292 | 
25 files changed, 3930 insertions, 0 deletions
| diff --git a/python/cru/__init__.py b/python/cru/__init__.py new file mode 100644 index 0000000..17799a9 --- /dev/null +++ b/python/cru/__init__.py @@ -0,0 +1,60 @@ +import sys + +from ._base import CRU, CruNamespaceError, CRU_NAME_PREFIXES +from ._error import ( +    CruException, +    CruLogicError, +    CruInternalError, +    CruUnreachableError, +    cru_unreachable, +) +from ._const import ( +    CruConstantBase, +    CruDontChange, +    CruNotFound, +    CruNoValue, +    CruPlaceholder, +    CruUseDefault, +) +from ._func import CruFunction +from ._iter import CruIterable, CruIterator +from ._event import CruEvent, CruEventHandlerToken +from ._type import CruTypeSet, CruTypeCheckError + + +class CruInitError(CruException): +    pass + + +def check_python_version(required_version=(3, 11)): +    if sys.version_info < required_version: +        raise CruInitError(f"Python version must be >= {required_version}!") + + +check_python_version() + +__all__ = [ +    "CRU", +    "CruNamespaceError", +    "CRU_NAME_PREFIXES", +    "check_python_version", +    "CruException", +    "CruInternalError", +    "CruLogicError", +    "CruUnreachableError", +    "cru_unreachable", +    "CruInitError", +    "CruConstantBase", +    "CruDontChange", +    "CruNotFound", +    "CruNoValue", +    "CruPlaceholder", +    "CruUseDefault", +    "CruFunction", +    "CruIterable", +    "CruIterator", +    "CruEvent", +    "CruEventHandlerToken", +    "CruTypeSet", +    "CruTypeCheckError", +] diff --git a/python/cru/_base.py b/python/cru/_base.py new file mode 100644 index 0000000..2599d8f --- /dev/null +++ b/python/cru/_base.py @@ -0,0 +1,101 @@ +from typing import Any + +from ._helper import remove_none +from ._error import CruException + + +class CruNamespaceError(CruException): +    """Raised when a namespace is not found.""" + + +class _Cru: +    NAME_PREFIXES = ("CRU_", "Cru", "cru_") + +    def __init__(self) -> None: +        self._d: dict[str, Any] = {} + +    def all_names(self) -> list[str]: +        return list(self._d.keys()) + +    def get(self, name: str) -> Any: +        return self._d[name] + +    def has_name(self, name: str) -> bool: +        return name in self._d + +    @staticmethod +    def _maybe_remove_prefix(name: str) -> str | None: +        for prefix in _Cru.NAME_PREFIXES: +            if name.startswith(prefix): +                return name[len(prefix) :] +        return None + +    def _check_name_exist(self, *names: str | None) -> None: +        for name in names: +            if name is None: +                continue +            if self.has_name(name): +                raise CruNamespaceError(f"Name {name} exists in CRU.") + +    @staticmethod +    def check_name_format(name: str) -> tuple[str, str]: +        no_prefix_name = _Cru._maybe_remove_prefix(name) +        if no_prefix_name is None: +            raise CruNamespaceError( +                f"Name {name} is not prefixed with any of {_Cru.NAME_PREFIXES}." +            ) +        return name, no_prefix_name + +    @staticmethod +    def _check_object_name(o) -> tuple[str, str]: +        return _Cru.check_name_format(o.__name__) + +    def _do_add(self, o, *names: str | None) -> list[str]: +        name_list: list[str] = remove_none(names) +        for name in name_list: +            self._d[name] = o +        return name_list + +    def add(self, o, name: str | None) -> tuple[str, str | None]: +        no_prefix_name: str | None +        if name is None: +            name, no_prefix_name = self._check_object_name(o) +        else: +            no_prefix_name = self._maybe_remove_prefix(name) + +        self._check_name_exist(name, no_prefix_name) +        self._do_add(o, name, no_prefix_name) +        return name, no_prefix_name + +    def add_with_alias(self, o, name: str | None = None, *aliases: str) -> list[str]: +        final_names: list[str | None] = [] +        no_prefix_name: str | None +        if name is None: +            name, no_prefix_name = self._check_object_name(o) +            self._check_name_exist(name, no_prefix_name) +            final_names.extend([name, no_prefix_name]) +        for alias in aliases: +            no_prefix_name = self._maybe_remove_prefix(alias) +            self._check_name_exist(alias, no_prefix_name) +            final_names.extend([alias, no_prefix_name]) + +        return self._do_add(o, *final_names) + +    def add_objects(self, *objects): +        final_list = [] +        for o in objects: +            name, no_prefix_name = self._check_object_name(o) +            self._check_name_exist(name, no_prefix_name) +            final_list.append((o, name, no_prefix_name)) +        for o, name, no_prefix_name in final_list: +            self._do_add(o, name, no_prefix_name) + +    def __getitem__(self, item): +        return self.get(item) + +    def __getattr__(self, item): +        return self.get(item) + + +CRU_NAME_PREFIXES = _Cru.NAME_PREFIXES +CRU = _Cru() diff --git a/python/cru/_const.py b/python/cru/_const.py new file mode 100644 index 0000000..8246b35 --- /dev/null +++ b/python/cru/_const.py @@ -0,0 +1,49 @@ +from enum import Enum, auto +from typing import Self, TypeGuard, TypeVar + +from ._base import CRU + +_T = TypeVar("_T") + + +class CruConstantBase(Enum): +    @classmethod +    def check(cls, v: _T | Self) -> TypeGuard[Self]: +        return isinstance(v, cls) + +    @classmethod +    def check_not(cls, v: _T | Self) -> TypeGuard[_T]: +        return not cls.check(v) + +    @classmethod +    def value(cls) -> Self: +        return cls.VALUE  # type: ignore + + +class CruNotFound(CruConstantBase): +    VALUE = auto() + + +class CruUseDefault(CruConstantBase): +    VALUE = auto() + + +class CruDontChange(CruConstantBase): +    VALUE = auto() + + +class CruNoValue(CruConstantBase): +    VALUE = auto() + + +class CruPlaceholder(CruConstantBase): +    VALUE = auto() + + +CRU.add_objects( +    CruNotFound, +    CruUseDefault, +    CruDontChange, +    CruNoValue, +    CruPlaceholder, +) diff --git a/python/cru/_decorator.py b/python/cru/_decorator.py new file mode 100644 index 0000000..137fc05 --- /dev/null +++ b/python/cru/_decorator.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +from collections.abc import Callable +from typing import ( +    Concatenate, +    Generic, +    ParamSpec, +    TypeVar, +    cast, +) + +from ._base import CRU + +_P = ParamSpec("_P") +_T = TypeVar("_T") +_O = TypeVar("_O") +_R = TypeVar("_R") + + +class CruDecorator: + +    class ConvertResult(Generic[_T, _O]): +        def __init__( +            self, +            converter: Callable[[_T], _O], +        ) -> None: +            self.converter = converter + +        def __call__(self, origin: Callable[_P, _T]) -> Callable[_P, _O]: +            converter = self.converter + +            def real_impl(*args: _P.args, **kwargs: _P.kwargs) -> _O: +                return converter(origin(*args, **kwargs)) + +            return real_impl + +    class ImplementedBy(Generic[_T, _O, _P, _R]): +        def __init__( +            self, +            impl: Callable[Concatenate[_O, _P], _R], +            converter: Callable[[_T], _O], +        ) -> None: +            self.impl = impl +            self.converter = converter + +        def __call__( +            self, _origin: Callable[[_T], None] +        ) -> Callable[Concatenate[_T, _P], _R]: +            converter = self.converter +            impl = self.impl + +            def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R: +                return cast(Callable[Concatenate[_O, _P], _R], impl)( +                    converter(_self), *args, **kwargs +                ) + +            return real_impl + +        @staticmethod +        def create_factory(converter: Callable[[_T], _O]) -> Callable[ +            [Callable[Concatenate[_O, _P], _R]], +            CruDecorator.ImplementedBy[_T, _O, _P, _R], +        ]: +            def create( +                m: Callable[Concatenate[_O, _P], _R], +            ) -> CruDecorator.ImplementedBy[_T, _O, _P, _R]: +                return CruDecorator.ImplementedBy(m, converter) + +            return create + +    class ImplementedByNoSelf(Generic[_P, _R]): +        def __init__(self, impl: Callable[_P, _R]) -> None: +            self.impl = impl + +        def __call__( +            self, _origin: Callable[[_T], None] +        ) -> Callable[Concatenate[_T, _P], _R]: +            impl = self.impl + +            def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R: +                return cast(Callable[_P, _R], impl)(*args, **kwargs) + +            return real_impl + +        @staticmethod +        def create_factory() -> ( +            Callable[[Callable[_P, _R]], CruDecorator.ImplementedByNoSelf[_P, _R]] +        ): +            def create( +                m: Callable[_P, _R], +            ) -> CruDecorator.ImplementedByNoSelf[_P, _R]: +                return CruDecorator.ImplementedByNoSelf(m) + +            return create + + +CRU.add_objects(CruDecorator) diff --git a/python/cru/_error.py b/python/cru/_error.py new file mode 100644 index 0000000..e53c787 --- /dev/null +++ b/python/cru/_error.py @@ -0,0 +1,89 @@ +from __future__ import annotations + +from typing import NoReturn, cast, overload + + +class CruException(Exception): +    """Base exception class of all exceptions in cru.""" + +    @overload +    def __init__( +        self, +        message: None = None, +        *args, +        user_message: str, +        **kwargs, +    ): ... + +    @overload +    def __init__( +        self, +        message: str, +        *args, +        user_message: str | None = None, +        **kwargs, +    ): ... + +    def __init__( +        self, +        message: str | None = None, +        *args, +        user_message: str | None = None, +        **kwargs, +    ): +        if message is None: +            message = user_message + +        super().__init__( +            message, +            *args, +            **kwargs, +        ) +        self._message: str +        self._message = cast(str, message) +        self._user_message = user_message + +    @property +    def message(self) -> str: +        return self._message + +    def get_user_message(self) -> str | None: +        return self._user_message + +    def get_message(self, use_user: bool = True) -> str: +        if use_user and self._user_message is not None: +            return self._user_message +        else: +            return self._message + +    @property +    def is_internal(self) -> bool: +        return False + +    @property +    def is_logic_error(self) -> bool: +        return False + + +class CruLogicError(CruException): +    """Raised when a logic error occurs.""" + +    @property +    def is_logic_error(self) -> bool: +        return True + + +class CruInternalError(CruException): +    """Raised when an internal error occurs.""" + +    @property +    def is_internal(self) -> bool: +        return True + + +class CruUnreachableError(CruInternalError): +    """Raised when a code path is unreachable.""" + + +def cru_unreachable() -> NoReturn: +    raise CruUnreachableError("Code should not reach here!") diff --git a/python/cru/_event.py b/python/cru/_event.py new file mode 100644 index 0000000..51a794c --- /dev/null +++ b/python/cru/_event.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +from collections.abc import Callable +from typing import Generic, ParamSpec, TypeVar + +from .list import CruList + +_P = ParamSpec("_P") +_R = TypeVar("_R") + + +class CruEventHandlerToken(Generic[_P, _R]): +    def __init__( +        self, event: CruEvent, handler: Callable[_P, _R], once: bool = False +    ) -> None: +        self._event = event +        self._handler = handler +        self._once = once + +    @property +    def event(self) -> CruEvent: +        return self._event + +    @property +    def handler(self) -> Callable[_P, _R]: +        return self._handler + +    @property +    def once(self) -> bool: +        return self._once + + +class CruEvent(Generic[_P, _R]): +    def __init__(self, name: str) -> None: +        self._name = name +        self._tokens: CruList[CruEventHandlerToken] = CruList() + +    def register( +        self, handler: Callable[_P, _R], once: bool = False +    ) -> CruEventHandlerToken: +        token = CruEventHandlerToken(self, handler, once) +        self._tokens.append(token) +        return token + +    def unregister(self, *handlers: CruEventHandlerToken | Callable[_P, _R]) -> int: +        old_length = len(self._tokens) +        self._tokens.reset( +            self._tokens.as_cru_iterator().filter( +                (lambda t: t in handlers or t.handler in handlers) +            ) +        ) +        return old_length - len(self._tokens) + +    def trigger(self, *args: _P.args, **kwargs: _P.kwargs) -> CruList[_R]: +        results = CruList( +            self._tokens.as_cru_iterator() +            .transform(lambda t: t.handler(*args, **kwargs)) +            .to_list() +        ) +        self._tokens.reset(self._tokens.as_cru_iterator().filter(lambda t: not t.once)) +        return results diff --git a/python/cru/_func.py b/python/cru/_func.py new file mode 100644 index 0000000..fc57802 --- /dev/null +++ b/python/cru/_func.py @@ -0,0 +1,172 @@ +from __future__ import annotations + +from collections.abc import Callable, Iterable +from enum import Flag, auto +from typing import ( +    Any, +    Generic, +    Literal, +    ParamSpec, +    TypeAlias, +    TypeVar, +) + + +from ._base import CRU +from ._const import CruPlaceholder + +_P = ParamSpec("_P") +_P1 = ParamSpec("_P1") +_T = TypeVar("_T") + + +class _Dec: +    @staticmethod +    def wrap( +        origin: Callable[_P, Callable[_P1, _T]] +    ) -> Callable[_P, _Wrapper[_P1, _T]]: +        def _wrapped(*args: _P.args, **kwargs: _P.kwargs) -> _Wrapper[_P1, _T]: +            return _Wrapper(origin(*args, **kwargs)) + +        return _wrapped + + +class _RawBase: +    @staticmethod +    def none(*_v, **_kwargs) -> None: +        return None + +    @staticmethod +    def true(*_v, **_kwargs) -> Literal[True]: +        return True + +    @staticmethod +    def false(*_v, **_kwargs) -> Literal[False]: +        return False + +    @staticmethod +    def identity(v: _T) -> _T: +        return v + +    @staticmethod +    def only_you(v: _T, *_v, **_kwargs) -> _T: +        return v + +    @staticmethod +    def equal(a: Any, b: Any) -> bool: +        return a == b + +    @staticmethod +    def not_equal(a: Any, b: Any) -> bool: +        return a != b + +    @staticmethod +    def not_(v: Any) -> Any: +        return not v + + +class _Wrapper(Generic[_P, _T]): +    def __init__(self, f: Callable[_P, _T]): +        self._f = f + +    @property +    def me(self) -> Callable[_P, _T]: +        return self._f + +    def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T: +        return self._f(*args, **kwargs) + +    @_Dec.wrap +    def bind(self, *bind_args, **bind_kwargs) -> Callable[..., _T]: +        func = self.me + +        def bound_func(*args, **kwargs): +            popped = 0 +            real_args = [] +            for arg in bind_args: +                if CruPlaceholder.check(arg): +                    real_args.append(args[popped]) +                    popped += 1 +                else: +                    real_args.append(arg) +            real_args.extend(args[popped:]) +            return func(*real_args, **(bind_kwargs | kwargs)) + +        return bound_func + +    class ChainMode(Flag): +        ARGS = auto() +        KWARGS = auto() +        BOTH = ARGS | KWARGS + +    ArgsChainableCallable: TypeAlias = Callable[..., Iterable[Any]] +    KwargsChainableCallable: TypeAlias = Callable[..., Iterable[tuple[str, Any]]] +    ChainableCallable: TypeAlias = Callable[ +        ..., tuple[Iterable[Any], Iterable[tuple[str, Any]]] +    ] + +    @_Dec.wrap +    def chain_with_args( +        self, funcs: Iterable[ArgsChainableCallable], *bind_args, **bind_kwargs +    ) -> ArgsChainableCallable: +        def chained_func(*args): +            args = self.bind(*bind_args, **bind_kwargs)(*args) + +            for func in funcs: +                args = _Wrapper(func).bind(*bind_args, **bind_kwargs)(*args) +            return args + +        return chained_func + +    @_Dec.wrap +    def chain_with_kwargs( +        self, funcs: Iterable[KwargsChainableCallable], *bind_args, **bind_kwargs +    ) -> KwargsChainableCallable: +        def chained_func(**kwargs): +            kwargs = self.bind(*bind_args, **bind_kwargs)(**kwargs) +            for func in funcs: +                kwargs = _Wrapper(func).bind(func, *bind_args, **bind_kwargs)(**kwargs) +            return kwargs + +        return chained_func + +    @_Dec.wrap +    def chain_with_both( +        self, funcs: Iterable[ChainableCallable], *bind_args, **bind_kwargs +    ) -> ChainableCallable: +        def chained_func(*args, **kwargs): +            for func in funcs: +                args, kwargs = _Wrapper(func).bind(func, *bind_args, **bind_kwargs)( +                    *args, **kwargs +                ) +            return args, kwargs + +        return chained_func + + +class _Base: +    none = _Wrapper(_RawBase.none) +    true = _Wrapper(_RawBase.true) +    false = _Wrapper(_RawBase.false) +    identity = _Wrapper(_RawBase.identity) +    only_you = _Wrapper(_RawBase.only_you) +    equal = _Wrapper(_RawBase.equal) +    not_equal = _Wrapper(_RawBase.not_equal) +    not_ = _Wrapper(_RawBase.not_) + + +class _Creators: +    @staticmethod +    def make_isinstance_of_types(*types: type) -> Callable: +        return _Wrapper(lambda v: type(v) in types) + + +class CruFunction: +    RawBase: TypeAlias = _RawBase +    Base: TypeAlias = _Base +    Creators: TypeAlias = _Creators +    Wrapper: TypeAlias = _Wrapper +    Decorators: TypeAlias = _Dec + + +CRU.add_objects(CruFunction) diff --git a/python/cru/_helper.py b/python/cru/_helper.py new file mode 100644 index 0000000..43baf46 --- /dev/null +++ b/python/cru/_helper.py @@ -0,0 +1,16 @@ +from collections.abc import Callable +from typing import Any, Iterable, TypeVar, cast + +_T = TypeVar("_T") +_D = TypeVar("_D") + + +def remove_element( +    iterable: Iterable[_T | None], to_rm: Iterable[Any], des: type[_D] | None = None +) -> _D: +    to_rm = set(to_rm) +    return cast(Callable[..., _D], des or list)(v for v in iterable if v not in to_rm) + + +def remove_none(iterable: Iterable[_T | None], des: type[_D] | None = None) -> _D: +    return cast(Callable[..., _D], des or list)(v for v in iterable if v is not None) diff --git a/python/cru/_iter.py b/python/cru/_iter.py new file mode 100644 index 0000000..f9683ca --- /dev/null +++ b/python/cru/_iter.py @@ -0,0 +1,469 @@ +from __future__ import annotations + +from collections.abc import Iterable, Callable, Generator, Iterator +from dataclasses import dataclass +from enum import Enum +from typing import ( +    Concatenate, +    Literal, +    Never, +    Self, +    TypeAlias, +    TypeVar, +    ParamSpec, +    Any, +    Generic, +    cast, +) + +from ._base import CRU +from ._const import CruNotFound +from ._error import cru_unreachable + +_P = ParamSpec("_P") +_T = TypeVar("_T") +_O = TypeVar("_O") +_V = TypeVar("_V") +_R = TypeVar("_R") + + +class _Generic: +    class StepActionKind(Enum): +        SKIP = 0 +        PUSH = 1 +        STOP = 2 +        AGGREGATE = 3 + +    @dataclass +    class StepAction(Generic[_V, _R]): +        value: Iterable[Self] | _V | _R | None +        kind: _Generic.StepActionKind + +        @property +        def push_value(self) -> _V: +            assert self.kind == _Generic.StepActionKind.PUSH +            return cast(_V, self.value) + +        @property +        def stop_value(self) -> _R: +            assert self.kind == _Generic.StepActionKind.STOP +            return cast(_R, self.value) + +        @staticmethod +        def skip() -> _Generic.StepAction[_V, _R]: +            return _Generic.StepAction(None, _Generic.StepActionKind.SKIP) + +        @staticmethod +        def push(value: _V | None) -> _Generic.StepAction[_V, _R]: +            return _Generic.StepAction(value, _Generic.StepActionKind.PUSH) + +        @staticmethod +        def stop(value: _R | None = None) -> _Generic.StepAction[_V, _R]: +            return _Generic.StepAction(value, _Generic.StepActionKind.STOP) + +        @staticmethod +        def aggregate( +            *results: _Generic.StepAction[_V, _R], +        ) -> _Generic.StepAction[_V, _R]: +            return _Generic.StepAction(results, _Generic.StepActionKind.AGGREGATE) + +        @staticmethod +        def push_last(value: _V | None) -> _Generic.StepAction[_V, _R]: +            return _Generic.StepAction.aggregate( +                _Generic.StepAction.push(value), _Generic.StepAction.stop() +            ) + +        def flatten(self) -> Iterable[Self]: +            return _Generic.flatten( +                self, +                is_leave=lambda r: r.kind != _Generic.StepActionKind.AGGREGATE, +                get_children=lambda r: cast(Iterable[Self], r.value), +            ) + +    GeneralStepAction: TypeAlias = StepAction[_V, _R] | _V | _R | None +    IterateOperation: TypeAlias = Callable[[_T, int], GeneralStepAction[_V, _R]] +    IteratePreHook: TypeAlias = Callable[[Iterable[_T]], GeneralStepAction[_V, _R]] +    IteratePostHook: TypeAlias = Callable[[int], GeneralStepAction[_V, _R]] + +    @staticmethod +    def _is_not_iterable(o: Any) -> bool: +        return not isinstance(o, Iterable) + +    @staticmethod +    def _return_self(o): +        return o + +    @staticmethod +    def iterable_flatten( +        maybe_iterable: Iterable[_T] | _T, max_depth: int = -1, *, _depth: int = 0 +    ) -> Iterable[Iterable[_T] | _T]: +        if _depth == max_depth or not isinstance(maybe_iterable, Iterable): +            yield maybe_iterable +            return + +        for child in maybe_iterable: +            yield from _Generic.iterable_flatten( +                child, +                max_depth, +                _depth=_depth + 1, +            ) + +    @staticmethod +    def flatten( +        o: _O, +        max_depth: int = -1, +        /, +        is_leave: CruIterator.ElementPredicate[_O] = _is_not_iterable, +        get_children: CruIterator.ElementTransformer[_O, Iterable[_O]] = _return_self, +        *, +        _depth: int = 0, +    ) -> Iterable[_O]: +        if _depth == max_depth or is_leave(o): +            yield o +            return +        for child in get_children(o): +            yield from _Generic.flatten( +                child, +                max_depth, +                is_leave, +                get_children, +                _depth=_depth + 1, +            ) + +    class Results: +        @staticmethod +        def true(_) -> Literal[True]: +            return True + +        @staticmethod +        def false(_) -> Literal[False]: +            return False + +        @staticmethod +        def not_found(_) -> Literal[CruNotFound.VALUE]: +            return CruNotFound.VALUE + +    @staticmethod +    def _non_result_to_push(value: Any) -> StepAction[_V, _R]: +        return _Generic.StepAction.push(value) + +    @staticmethod +    def _non_result_to_stop(value: Any) -> StepAction[_V, _R]: +        return _Generic.StepAction.stop(value) + +    @staticmethod +    def _none_hook(_: Any) -> StepAction[_V, _R]: +        return _Generic.StepAction.skip() + +    def iterate( +        iterable: Iterable[_T], +        operation: IterateOperation[_T, _V, _R], +        fallback_return: _R, +        pre_iterate: IteratePreHook[_T, _V, _R], +        post_iterate: IteratePostHook[_V, _R], +        convert_value_result: Callable[[_V | _R | None], StepAction[_V, _R]], +    ) -> Generator[_V, None, _R]: +        pre_result = pre_iterate(iterable) +        if not isinstance(pre_result, _Generic.StepAction): +            real_pre_result = convert_value_result(pre_result) +        for r in real_pre_result.flatten(): +            if r.kind == _Generic.StepActionKind.STOP: +                return r.stop_value +            elif r.kind == _Generic.StepActionKind.PUSH: +                yield r.push_value +            else: +                assert r.kind == _Generic.StepActionKind.SKIP + +        for index, element in enumerate(iterable): +            result = operation(element, index) +            if not isinstance(result, _Generic.StepAction): +                real_result = convert_value_result(result) +            for r in real_result.flatten(): +                if r.kind == _Generic.StepActionKind.STOP: +                    return r.stop_value +                elif r.kind == _Generic.StepActionKind.PUSH: +                    yield r.push_value +                else: +                    assert r.kind == _Generic.StepActionKind.SKIP +                    continue + +        post_result = post_iterate(index + 1) +        if not isinstance(post_result, _Generic.StepAction): +            real_post_result = convert_value_result(post_result) +        for r in real_post_result.flatten(): +            if r.kind == _Generic.StepActionKind.STOP: +                return r.stop_value +            elif r.kind == _Generic.StepActionKind.PUSH: +                yield r.push_value +            else: +                assert r.kind == _Generic.StepActionKind.SKIP + +        return fallback_return + +    def create_new( +        iterable: Iterable[_T], +        operation: IterateOperation[_T, _V, _R], +        fallback_return: _R, +        /, +        pre_iterate: IteratePreHook[_T, _V, _R] | None = None, +        post_iterate: IteratePostHook[_V, _R] | None = None, +    ) -> Generator[_V, None, _R]: +        return _Generic.iterate( +            iterable, +            operation, +            fallback_return, +            pre_iterate or _Generic._none_hook, +            post_iterate or _Generic._none_hook, +            _Generic._non_result_to_push, +        ) + +    def get_result( +        iterable: Iterable[_T], +        operation: IterateOperation[_T, _V, _R], +        fallback_return: _R, +        /, +        pre_iterate: IteratePreHook[_T, _V, _R] | None = None, +        post_iterate: IteratePostHook[_V, _R] | None = None, +    ) -> _R: +        try: +            for _ in _Generic.iterate( +                iterable, +                operation, +                fallback_return, +                pre_iterate or _Generic._none_hook, +                post_iterate or _Generic._none_hook, +                _Generic._non_result_to_stop, +            ): +                pass +        except StopIteration as stop: +            return stop.value +        cru_unreachable() + + +class _Helpers: +    @staticmethod +    def auto_count(c: Callable[Concatenate[int, _P], _O]) -> Callable[_P, _O]: +        count = 0 + +        def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _O: +            nonlocal count +            r = c(count, *args, **kwargs) +            count += 1 +            return r + +        return wrapper + + +class _Creators: +    class Raw: +        @staticmethod +        def empty() -> Iterator[Never]: +            return iter([]) + +        @staticmethod +        def range(*args) -> Iterator[int]: +            return iter(range(*args)) + +        @staticmethod +        def unite(*args: _T) -> Iterator[_T]: +            return iter(args) + +        @staticmethod +        def _concat(*iterables: Iterable[_T]) -> Iterable[_T]: +            for iterable in iterables: +                yield from iterable + +        @staticmethod +        def concat(*iterables: Iterable[_T]) -> Iterator[_T]: +            return iter(_Creators.Raw._concat(*iterables)) + +    @staticmethod +    def _wrap(f: Callable[_P, Iterable[_O]]) -> Callable[_P, CruIterator[_O]]: +        def _wrapped(*args: _P.args, **kwargs: _P.kwargs) -> CruIterator[_O]: +            return CruIterator(f(*args, **kwargs)) + +        return _wrapped + +    empty = _wrap(Raw.empty) +    range = _wrap(Raw.range) +    unite = _wrap(Raw.unite) +    concat = _wrap(Raw.concat) + + +class CruIterator(Generic[_T]): +    ElementOperation: TypeAlias = Callable[[_V], Any] +    ElementPredicate: TypeAlias = Callable[[_V], bool] +    AnyElementPredicate: TypeAlias = ElementPredicate[Any] +    ElementTransformer: TypeAlias = Callable[[_V], _O] +    SelfElementTransformer: TypeAlias = ElementTransformer[_V, _V] +    AnyElementTransformer: TypeAlias = ElementTransformer[Any, Any] + +    Creators: TypeAlias = _Creators +    Helpers: TypeAlias = _Helpers + +    def __init__(self, iterable: Iterable[_T]) -> None: +        self._iterator = iter(iterable) + +    def __iter__(self) -> Iterator[_T]: +        return self._iterator + +    def create_new_me(self, iterable: Iterable[_O]) -> CruIterator[_O]: +        return type(self)(iterable)  # type: ignore + +    @staticmethod +    def _wrap( +        f: Callable[Concatenate[CruIterator[_T], _P], Iterable[_O]], +    ) -> Callable[Concatenate[CruIterator[_T], _P], CruIterator[_O]]: +        def _wrapped( +            self: CruIterator[_T], *args: _P.args, **kwargs: _P.kwargs +        ) -> CruIterator[_O]: +            return self.create_new_me(f(self, *args, **kwargs)) + +        return _wrapped + +    @_wrap +    def replace_me(self, iterable: Iterable[_O]) -> Iterable[_O]: +        return iterable + +    def replace_me_with_empty(self) -> CruIterator[Never]: +        return self.create_new_me(_Creators.Raw.empty()) + +    def replace_me_with_range(self, *args) -> CruIterator[int]: +        return self.create_new_me(_Creators.Raw.range(*args)) + +    def replace_me_with_unite(self, *args: _O) -> CruIterator[_O]: +        return self.create_new_me(_Creators.Raw.unite(*args)) + +    def replace_me_with_concat(self, *iterables: Iterable[_T]) -> CruIterator[_T]: +        return self.create_new_me(_Creators.Raw.concat(*iterables)) + +    def to_set(self) -> set[_T]: +        return set(self) + +    def to_list(self) -> list[_T]: +        return list(self) + +    def all(self, predicate: ElementPredicate[_T]) -> bool: +        for value in self: +            if not predicate(value): +                return False +        return True + +    def any(self, predicate: ElementPredicate[_T]) -> bool: +        for value in self: +            if predicate(value): +                return True +        return False + +    def foreach(self, operation: ElementOperation[_T]) -> None: +        for value in self: +            operation(value) + +    @_wrap +    def transform(self, transformer: ElementTransformer[_T, _O]) -> Iterable[_O]: +        for value in self: +            yield transformer(value) + +    map = transform + +    @_wrap +    def filter(self, predicate: ElementPredicate[_T]) -> Iterable[_T]: +        for value in self: +            if predicate(value): +                yield value + +    @_wrap +    def continue_if(self, predicate: ElementPredicate[_T]) -> Iterable[_T]: +        for value in self: +            yield value +            if not predicate(value): +                break + +    def first_n(self, max_count: int) -> CruIterator[_T]: +        if max_count < 0: +            raise ValueError("max_count must be 0 or positive.") +        if max_count == 0: +            return self.replace_me_with_empty()  # type: ignore +        return self.continue_if(_Helpers.auto_count(lambda i, _: i < max_count - 1)) + +    def drop_n(self, n: int) -> CruIterator[_T]: +        if n < 0: +            raise ValueError("n must be 0 or positive.") +        if n == 0: +            return self +        return self.filter(_Helpers.auto_count(lambda i, _: i < n)) + +    def single_or( +        self, fallback: _O | CruNotFound = CruNotFound.VALUE +    ) -> _T | _O | CruNotFound: +        first_2 = self.first_n(2) +        has_value = False +        for element in first_2: +            if has_value: +                raise ValueError("More than one value found.") +            has_value = True +            value = element +        if has_value: +            return value +        else: +            return fallback + +    def first_or( +        self, fallback: _O | CruNotFound = CruNotFound.VALUE +    ) -> _T | _O | CruNotFound: +        return self.first_n(1).single_or(fallback) + +    @_wrap +    def flatten(self, max_depth: int = -1) -> Iterable[Any]: +        return _Generic.iterable_flatten(self, max_depth) + +    def select_by_indices(self, indices: Iterable[int]) -> CruIterator[_T]: +        index_set = set(indices) +        max_index = max(index_set) +        return self.first_n(max_index + 1).filter( +            _Helpers.auto_count(lambda i, _: i in index_set) +        ) + +    def remove_values(self, values: Iterable[Any]) -> CruIterator[_T]: +        value_set = set(values) +        return self.filter(lambda v: v not in value_set) + +    def replace_values( +        self, old_values: Iterable[Any], new_value: _O +    ) -> Iterable[_T | _O]: +        value_set = set(old_values) +        return self.transform(lambda v: new_value if v in value_set else v) + +    def group_by(self, key_getter: Callable[[_T], _O]) -> dict[_O, list[_T]]: +        result: dict[_O, list[_T]] = {} + +        for item in self: +            key = key_getter(item) +            if key not in result: +                result[key] = [] +            result[key].append(item) + +        return result + +    def join_str(self: CruIterator[str], separator: str) -> str: +        return separator.join(self) + + +class CruIterMixin(Generic[_T]): +    def cru_iter(self: Iterable[_T]) -> CruIterator[_T]: +        return CruIterator(self) + + +class CruIterList(list[_T], CruIterMixin[_T]): +    pass + + +class CruIterable: +    Generic: TypeAlias = _Generic +    Iterator: TypeAlias = CruIterator[_T] +    Helpers: TypeAlias = _Helpers +    Mixin: TypeAlias = CruIterMixin[_T] +    IterList: TypeAlias = CruIterList[_T] + + +CRU.add_objects(CruIterable, CruIterator) diff --git a/python/cru/_type.py b/python/cru/_type.py new file mode 100644 index 0000000..1f81da3 --- /dev/null +++ b/python/cru/_type.py @@ -0,0 +1,52 @@ +from collections.abc import Iterable +from typing import Any + +from ._error import CruException, CruLogicError +from ._iter import CruIterator + + +class CruTypeCheckError(CruException): +    pass + + +DEFAULT_NONE_ERR_MSG = "None is not allowed here." +DEFAULT_TYPE_ERR_MSG = "Object of this type is not allowed here." + + +class CruTypeSet(set[type]): +    def __init__(self, *types: type): +        type_set = CruIterator(types).filter(lambda t: t is not None).to_set() +        if not CruIterator(type_set).all(lambda t: isinstance(t, type)): +            raise CruLogicError("TypeSet can only contain type.") +        super().__init__(type_set) + +    def check_value( +        self, +        value: Any, +        /, +        allow_none: bool = False, +        empty_allow_all: bool = True, +    ) -> None: +        if value is None: +            if allow_none: +                return +            else: +                raise CruTypeCheckError(DEFAULT_NONE_ERR_MSG) +        if len(self) == 0 and empty_allow_all: +            return +        if not CruIterator(self).any(lambda t: isinstance(value, t)): +            raise CruTypeCheckError(DEFAULT_TYPE_ERR_MSG) + +    def check_value_list( +        self, +        values: Iterable[Any], +        /, +        allow_none: bool = False, +        empty_allow_all: bool = True, +    ) -> None: +        for value in values: +            self.check_value( +                value, +                allow_none, +                empty_allow_all, +            ) diff --git a/python/cru/attr.py b/python/cru/attr.py new file mode 100644 index 0000000..d4cc86a --- /dev/null +++ b/python/cru/attr.py @@ -0,0 +1,364 @@ +from __future__ import annotations + +import copy +from collections.abc import Callable, Iterable +from dataclasses import dataclass, field +from typing import Any + +from .list import CruUniqueKeyList +from ._type import CruTypeSet +from ._const import CruNotFound, CruUseDefault, CruDontChange +from ._iter import CruIterator + + +@dataclass +class CruAttr: + +    name: str +    value: Any +    description: str | None + +    @staticmethod +    def make( +        name: str, value: Any = CruUseDefault.VALUE, description: str | None = None +    ) -> CruAttr: +        return CruAttr(name, value, description) + + +CruAttrDefaultFactory = Callable[["CruAttrDef"], Any] +CruAttrTransformer = Callable[[Any, "CruAttrDef"], Any] +CruAttrValidator = Callable[[Any, "CruAttrDef"], None] + + +@dataclass +class CruAttrDef: +    name: str +    description: str +    default_factory: CruAttrDefaultFactory +    transformer: CruAttrTransformer +    validator: CruAttrValidator + +    def __init__( +        self, +        name: str, +        description: str, +        default_factory: CruAttrDefaultFactory, +        transformer: CruAttrTransformer, +        validator: CruAttrValidator, +    ) -> None: +        self.name = name +        self.description = description +        self.default_factory = default_factory +        self.transformer = transformer +        self.validator = validator + +    def transform(self, value: Any) -> Any: +        if self.transformer is not None: +            return self.transformer(value, self) +        return value + +    def validate(self, value: Any, /, force_allow_none: bool = False) -> None: +        if force_allow_none is value is None: +            return +        if self.validator is not None: +            self.validator(value, self) + +    def transform_and_validate( +        self, value: Any, /, force_allow_none: bool = False +    ) -> Any: +        value = self.transform(value) +        self.validate(value, force_allow_none) +        return value + +    def make_default_value(self) -> Any: +        return self.transform_and_validate(self.default_factory(self)) + +    def adopt(self, attr: CruAttr) -> CruAttr: +        attr = copy.deepcopy(attr) + +        if attr.name is None: +            attr.name = self.name +        elif attr.name != self.name: +            raise ValueError(f"Attr name is not match: {attr.name} != {self.name}") + +        if attr.value is CruUseDefault.VALUE: +            attr.value = self.make_default_value() +        else: +            attr.value = self.transform_and_validate(attr.value) + +        if attr.description is None: +            attr.description = self.description + +        return attr + +    def make( +        self, value: Any = CruUseDefault.VALUE, description: None | str = None +    ) -> CruAttr: +        value = self.make_default_value() if value is CruUseDefault.VALUE else value +        value = self.transform_and_validate(value) +        return CruAttr( +            self.name, +            value, +            description if description is not None else self.description, +        ) + + +@dataclass +class CruAttrDefBuilder: + +    name: str +    description: str +    types: list[type] | None = field(default=None) +    allow_none: bool = field(default=False) +    default: Any = field(default=CruUseDefault.VALUE) +    default_factory: CruAttrDefaultFactory | None = field(default=None) +    auto_list: bool = field(default=False) +    transformers: list[CruAttrTransformer] = field(default_factory=list) +    validators: list[CruAttrValidator] = field(default_factory=list) +    override_transformer: CruAttrTransformer | None = field(default=None) +    override_validator: CruAttrValidator | None = field(default=None) + +    build_hook: Callable[[CruAttrDef], None] | None = field(default=None) + +    def __init__(self, name: str, description: str) -> None: +        super().__init__() +        self.name = name +        self.description = description + +    def auto_adjust_default(self) -> None: +        if self.default is not CruUseDefault.VALUE and self.default is not None: +            return +        if self.allow_none and self.default is CruUseDefault.VALUE: +            self.default = None +        if not self.allow_none and self.default is None: +            self.default = CruUseDefault.VALUE +        if self.auto_list and not self.allow_none: +            self.default = [] + +    def with_name(self, name: str | CruDontChange) -> CruAttrDefBuilder: +        if name is not CruDontChange.VALUE: +            self.name = name +        return self + +    def with_description( +        self, default_description: str | CruDontChange +    ) -> CruAttrDefBuilder: +        if default_description is not CruDontChange.VALUE: +            self.description = default_description +        return self + +    def with_default(self, default: Any) -> CruAttrDefBuilder: +        if default is not CruDontChange.VALUE: +            self.default = default +        return self + +    def with_default_factory( +        self, +        default_factory: CruAttrDefaultFactory | CruDontChange, +    ) -> CruAttrDefBuilder: +        if default_factory is not CruDontChange.VALUE: +            self.default_factory = default_factory +        return self + +    def with_types( +        self, +        types: Iterable[type] | None | CruDontChange, +    ) -> CruAttrDefBuilder: +        if types is not CruDontChange.VALUE: +            self.types = None if types is None else list(types) +        return self + +    def with_allow_none(self, allow_none: bool | CruDontChange) -> CruAttrDefBuilder: +        if allow_none is not CruDontChange.VALUE: +            self.allow_none = allow_none +        return self + +    def with_auto_list( +        self, auto_list: bool | CruDontChange = True +    ) -> CruAttrDefBuilder: +        if auto_list is not CruDontChange.VALUE: +            self.auto_list = auto_list +        return self + +    def with_constraint( +        self, +        /, +        allow_none: bool | CruDontChange = CruDontChange.VALUE, +        types: Iterable[type] | None | CruDontChange = CruDontChange.VALUE, +        default: Any = CruDontChange.VALUE, +        default_factory: CruAttrDefaultFactory | CruDontChange = CruDontChange.VALUE, +        auto_list: bool | CruDontChange = CruDontChange.VALUE, +    ) -> CruAttrDefBuilder: +        return ( +            self.with_allow_none(allow_none) +            .with_types(types) +            .with_default(default) +            .with_default_factory(default_factory) +            .with_auto_list(auto_list) +        ) + +    def add_transformer(self, transformer: CruAttrTransformer) -> CruAttrDefBuilder: +        self.transformers.append(transformer) +        return self + +    def clear_transformers(self) -> CruAttrDefBuilder: +        self.transformers.clear() +        return self + +    def add_validator(self, validator: CruAttrValidator) -> CruAttrDefBuilder: +        self.validators.append(validator) +        return self + +    def clear_validators(self) -> CruAttrDefBuilder: +        self.validators.clear() +        return self + +    def with_override_transformer( +        self, override_transformer: CruAttrTransformer | None | CruDontChange +    ) -> CruAttrDefBuilder: +        if override_transformer is not CruDontChange.VALUE: +            self.override_transformer = override_transformer +        return self + +    def with_override_validator( +        self, override_validator: CruAttrValidator | None | CruDontChange +    ) -> CruAttrDefBuilder: +        if override_validator is not CruDontChange.VALUE: +            self.override_validator = override_validator +        return self + +    def is_valid(self) -> tuple[bool, str]: +        if not isinstance(self.name, str): +            return False, "Name must be a string!" +        if not isinstance(self.description, str): +            return False, "Default description must be a string!" +        if ( +            not self.allow_none +            and self.default is None +            and self.default_factory is None +        ): +            return False, "Default must be set if allow_none is False!" +        return True, "" + +    @staticmethod +    def _build( +        builder: CruAttrDefBuilder, auto_adjust_default: bool = True +    ) -> CruAttrDef: +        if auto_adjust_default: +            builder.auto_adjust_default() + +        valid, err = builder.is_valid() +        if not valid: +            raise ValueError(err) + +        def composed_transformer(value: Any, attr_def: CruAttrDef) -> Any: +            def transform_value(single_value: Any) -> Any: +                for transformer in builder.transformers: +                    single_value = transformer(single_value, attr_def) +                return single_value + +            if builder.auto_list: +                if not isinstance(value, list): +                    value = [value] +                value = CruIterator(value).transform(transform_value).to_list() + +            else: +                value = transform_value(value) +            return value + +        type_set = None if builder.types is None else CruTypeSet(*builder.types) + +        def composed_validator(value: Any, attr_def: CruAttrDef): +            def validate_value(single_value: Any) -> None: +                if type_set is not None: +                    type_set.check_value(single_value, allow_none=builder.allow_none) +                for validator in builder.validators: +                    validator(single_value, attr_def) + +            if builder.auto_list: +                CruIterator(value).foreach(validate_value) +            else: +                validate_value(value) + +        real_transformer = builder.override_transformer or composed_transformer +        real_validator = builder.override_validator or composed_validator + +        default_factory = builder.default_factory +        if default_factory is None: + +            def default_factory(_d): +                return copy.deepcopy(builder.default) + +        d = CruAttrDef( +            builder.name, +            builder.description, +            default_factory, +            real_transformer, +            real_validator, +        ) +        if builder.build_hook: +            builder.build_hook(d) +        return d + +    def build(self, auto_adjust_default=True) -> CruAttrDef: +        c = copy.deepcopy(self) +        self.build_hook = None +        return CruAttrDefBuilder._build(c, auto_adjust_default) + + +class CruAttrDefRegistry(CruUniqueKeyList[CruAttrDef, str]): + +    def __init__(self) -> None: +        super().__init__(lambda d: d.name) + +    def make_builder(self, name: str, default_description: str) -> CruAttrDefBuilder: +        b = CruAttrDefBuilder(name, default_description) +        b.build_hook = lambda a: self.add(a) +        return b + +    def adopt(self, attr: CruAttr) -> CruAttr: +        d = self.get(attr.name) +        return d.adopt(attr) + + +class CruAttrTable(CruUniqueKeyList[CruAttr, str]): +    def __init__(self, registry: CruAttrDefRegistry) -> None: +        self._registry: CruAttrDefRegistry = registry +        super().__init__(lambda a: a.name, before_add=registry.adopt) + +    @property +    def registry(self) -> CruAttrDefRegistry: +        return self._registry + +    def get_value_or(self, name: str, fallback: Any = CruNotFound.VALUE) -> Any: +        a = self.get_or(name, CruNotFound.VALUE) +        if a is CruNotFound.VALUE: +            return fallback +        return a.value + +    def get_value(self, name: str) -> Any: +        a = self.get(name) +        return a.value + +    def make_attr( +        self, +        name: str, +        value: Any = CruUseDefault.VALUE, +        /, +        description: str | None = None, +    ) -> CruAttr: +        d = self._registry.get(name) +        return d.make(value, description or d.description) + +    def add_value( +        self, +        name: str, +        value: Any = CruUseDefault.VALUE, +        /, +        description: str | None = None, +        *, +        replace: bool = False, +    ) -> CruAttr: +        attr = self.make_attr(name, value, description) +        self.add(attr, replace) +        return attr diff --git a/python/cru/config.py b/python/cru/config.py new file mode 100644 index 0000000..0f6f0d0 --- /dev/null +++ b/python/cru/config.py @@ -0,0 +1,196 @@ +from __future__ import annotations + +from typing import Any, TypeVar, Generic + +from ._error import CruException +from .list import CruUniqueKeyList +from .value import ( +    INTEGER_VALUE_TYPE, +    TEXT_VALUE_TYPE, +    CruValueTypeError, +    ValueGeneratorBase, +    ValueType, +) + +_T = TypeVar("_T") + + +class CruConfigError(CruException): +    def __init__(self, message: str, item: ConfigItem, *args, **kwargs): +        super().__init__(message, *args, **kwargs) +        self._item = item + +    @property +    def item(self) -> ConfigItem: +        return self._item + + +class ConfigItem(Generic[_T]): +    def __init__( +        self, +        name: str, +        description: str, +        value_type: ValueType[_T], +        value: _T | None = None, +        /, +        default: ValueGeneratorBase[_T] | _T | None = None, +    ) -> None: +        self._name = name +        self._description = description +        self._value_type = value_type +        self._value = value +        self._default = default + +    @property +    def name(self) -> str: +        return self._name + +    @property +    def description(self) -> str: +        return self._description + +    @property +    def value_type(self) -> ValueType[_T]: +        return self._value_type + +    @property +    def is_set(self) -> bool: +        return self._value is not None + +    @property +    def value(self) -> _T: +        if self._value is None: +            raise CruConfigError( +                "Config value is not set.", +                self, +                user_message=f"Config item {self.name} is not set.", +            ) +        return self._value + +    @property +    def value_str(self) -> str: +        return self.value_type.convert_value_to_str(self.value) + +    def set_value(self, v: _T | str, allow_convert_from_str=False): +        if allow_convert_from_str: +            self._value = self.value_type.check_value_or_try_convert_from_str(v) +        else: +            self._value = self.value_type.check_value(v) + +    def reset(self): +        self._value = None + +    @property +    def default(self) -> ValueGeneratorBase[_T] | _T | None: +        return self._default + +    @property +    def can_generate_default(self) -> bool: +        return self.default is not None + +    def generate_default_value(self) -> _T: +        if self.default is None: +            raise CruConfigError( +                "Config item does not support default value generation.", self +            ) +        elif isinstance(self.default, ValueGeneratorBase): +            v = self.default.generate() +        else: +            v = self.default +        try: +            self.value_type.check_value(v) +            return v +        except CruValueTypeError as e: +            raise CruConfigError( +                "Config value generator returns an invalid value.", self +            ) from e + +    def copy(self) -> "ConfigItem": +        return ConfigItem( +            self.name, +            self.description, +            self.value_type, +            self.value, +            self.default, +        ) + +    @property +    def description_str(self) -> str: +        return f"{self.name} ({self.value_type.name}): {self.description}" + + +class Configuration(CruUniqueKeyList[ConfigItem[Any], str]): +    def __init__(self): +        super().__init__(lambda c: c.name) + +    def get_set_items(self) -> list[ConfigItem[Any]]: +        return [item for item in self if item.is_set] + +    def get_unset_items(self) -> list[ConfigItem[Any]]: +        return [item for item in self if not item.is_set] + +    @property +    def all_set(self) -> bool: +        return len(self.get_unset_items()) == 0 + +    @property +    def all_not_set(self) -> bool: +        return len(self.get_set_items()) == 0 + +    def add_text_config( +        self, +        name: str, +        description: str, +        value: str | None = None, +        default: ValueGeneratorBase[str] | str | None = None, +    ) -> ConfigItem[str]: +        item = ConfigItem(name, description, TEXT_VALUE_TYPE, value, default) +        self.add(item) +        return item + +    def add_int_config( +        self, +        name: str, +        description: str, +        value: int | None = None, +        default: ValueGeneratorBase[int] | int | None = None, +    ) -> ConfigItem[int]: +        item = ConfigItem(name, description, INTEGER_VALUE_TYPE, value, default) +        self.add(item) +        return item + +    def set_config_item( +        self, +        name: str, +        value: Any | str, +        allow_convert_from_str=True, +    ) -> None: +        item = self.get(name) +        item.set_value( +            value, +            allow_convert_from_str=allow_convert_from_str, +        ) + +    def reset_all(self) -> None: +        for item in self: +            item.reset() + +    def to_dict(self) -> dict[str, Any]: +        return {item.name: item.value for item in self} + +    def to_str_dict(self) -> dict[str, str]: +        return { +            item.name: item.value_type.convert_value_to_str(item.value) for item in self +        } + +    def set_value_dict( +        self, +        value_dict: dict[str, Any], +        allow_convert_from_str: bool = False, +    ) -> None: +        for name, value in value_dict.items(): +            item = self.get(name) +            item.set_value( +                value, +                allow_convert_from_str=allow_convert_from_str, +            ) diff --git a/python/cru/list.py b/python/cru/list.py new file mode 100644 index 0000000..216a561 --- /dev/null +++ b/python/cru/list.py @@ -0,0 +1,160 @@ +from __future__ import annotations + +from collections.abc import Callable, Iterator +from typing import Any, Generic, Iterable, TypeAlias, TypeVar, overload + +from ._error import CruInternalError +from ._iter import CruIterator +from ._const import CruNotFound + +_T = TypeVar("_T") +_O = TypeVar("_O") + + +class CruListEdit(CruIterator[_T]): +    def __init__(self, iterable: Iterable[_T], _list: CruList[Any]) -> None: +        super().__init__(iterable) +        self._list = _list + +    def create_me(self, iterable: Iterable[_O]) -> CruListEdit[_O]: +        return CruListEdit(iterable, self._list) + +    @property +    def list(self) -> CruList[Any]: +        return self._list + +    def done(self) -> CruList[Any]: +        self._list.reset(self) +        return self._list + + +class CruList(list[_T]): +    def reset(self, new_values: Iterable[_T]): +        if self is new_values: +            new_values = list(new_values) +        self.clear() +        self.extend(new_values) +        return self + +    def as_cru_iterator(self) -> CruIterator[_T]: +        return CruIterator(self) + +    @staticmethod +    def make(maybe_list: Iterable[_T] | _T | None) -> CruList[_T]: +        if maybe_list is None: +            return CruList() +        if isinstance(maybe_list, Iterable): +            return CruList(maybe_list) +        return CruList([maybe_list]) + + +_K = TypeVar("_K") + +_KeyGetter: TypeAlias = Callable[[_T], _K] + + +class CruUniqueKeyList(Generic[_T, _K]): +    def __init__( +        self, +        key_getter: _KeyGetter[_T, _K], +        *, +        before_add: Callable[[_T], _T] | None = None, +    ): +        super().__init__() +        self._key_getter = key_getter +        self._before_add = before_add +        self._list: CruList[_T] = CruList() + +    @property +    def key_getter(self) -> _KeyGetter[_T, _K]: +        return self._key_getter + +    @property +    def internal_list(self) -> CruList[_T]: +        return self._list + +    def validate_self(self): +        keys = self._list.transform(self._key_getter) +        if len(keys) != len(set(keys)): +            raise CruInternalError("Duplicate keys!") + +    @overload +    def get_or( +        self, key: _K, fallback: CruNotFound = CruNotFound.VALUE +    ) -> _T | CruNotFound: ... + +    @overload +    def get_or(self, key: _K, fallback: _O) -> _T | _O: ... + +    def get_or( +        self, key: _K, fallback: _O | CruNotFound = CruNotFound.VALUE +    ) -> _T | _O | CruNotFound: +        return ( +            self._list.as_cru_iterator() +            .filter(lambda v: key == self._key_getter(v)) +            .first_or(fallback) +        ) + +    def get(self, key: _K) -> _T: +        value = self.get_or(key) +        if value is CruNotFound.VALUE: +            raise KeyError(f"Key {key} not found!") +        return value  # type: ignore + +    @property +    def keys(self) -> Iterable[_K]: +        return self._list.as_cru_iterator().map(self._key_getter) + +    def has_key(self, key: _K) -> bool: +        return self.get_or(key) != CruNotFound.VALUE + +    def try_remove(self, key: _K) -> bool: +        value = self.get_or(key) +        if value is CruNotFound.VALUE: +            return False +        self._list.remove(value) +        return True + +    def remove(self, key: _K, allow_absence: bool = False) -> None: +        if not self.try_remove(key) and not allow_absence: +            raise KeyError(f"Key {key} not found!") + +    def add(self, value: _T, /, replace: bool = False) -> None: +        v = self.get_or(self._key_getter(value)) +        if v is not CruNotFound.VALUE: +            if not replace: +                raise KeyError(f"Key {self._key_getter(v)} already exists!") +            self._list.remove(v) +        if self._before_add is not None: +            value = self._before_add(value) +        self._list.append(value) + +    def set(self, value: _T) -> None: +        self.add(value, True) + +    def extend(self, iterable: Iterable[_T], /, replace: bool = False) -> None: +        values = list(iterable) +        to_remove = [] +        for value in values: +            v = self.get_or(self._key_getter(value)) +            if v is not CruNotFound.VALUE: +                if not replace: +                    raise KeyError(f"Key {self._key_getter(v)} already exists!") +                to_remove.append(v) +        for value in to_remove: +            self._list.remove(value) +        if self._before_add is not None: +            values = [self._before_add(value) for value in values] +        self._list.extend(values) + +    def clear(self) -> None: +        self._list.reset([]) + +    def __iter__(self) -> Iterator[_T]: +        return iter(self._list) + +    def __len__(self) -> int: +        return len(self._list) + +    def cru_iter(self) -> CruIterator[_T]: +        return CruIterator(self._list) diff --git a/python/cru/parsing.py b/python/cru/parsing.py new file mode 100644 index 0000000..0e9239d --- /dev/null +++ b/python/cru/parsing.py @@ -0,0 +1,290 @@ +from __future__ import annotations + +from abc import ABCMeta, abstractmethod +from dataclasses import dataclass +from enum import Enum +from typing import NamedTuple, TypeAlias, TypeVar, Generic, NoReturn, Callable + +from ._error import CruException +from ._iter import CruIterable + +_T = TypeVar("_T") + + +class StrParseStream: +    class MemStackEntry(NamedTuple): +        pos: int +        lineno: int + +    class MemStackPopStr(NamedTuple): +        text: str +        lineno: int + +    def __init__(self, text: str) -> None: +        self._text = text +        self._pos = 0 +        self._lineno = 1 +        self._length = len(self._text) +        self._valid_pos_range = range(0, self.length + 1) +        self._valid_offset_range = range(-self.length, self.length + 1) +        self._mem_stack: CruIterable.IterList[StrParseStream.MemStackEntry] = ( +            CruIterable.IterList() +        ) + +    @property +    def text(self) -> str: +        return self._text + +    @property +    def length(self) -> int: +        return self._length + +    @property +    def valid_pos_range(self) -> range: +        return self._valid_pos_range + +    @property +    def valid_offset_range(self) -> range: +        return self._valid_offset_range + +    @property +    def pos(self) -> int: +        return self._pos + +    @property +    def lineno(self) -> int: +        return self._lineno + +    @property +    def eof(self) -> bool: +        return self._pos == self.length + +    def peek(self, length: int) -> str: +        real_length = min(length, self.length - self._pos) +        new_position = self._pos + real_length +        text = self._text[self._pos : new_position] +        return text + +    def read(self, length: int) -> str: +        text = self.peek(length) +        self._pos += len(text) +        self._lineno += text.count("\n") +        return text + +    def skip(self, length: int) -> None: +        self.read(length) + +    def peek_str(self, text: str) -> bool: +        if self.pos + len(text) > self.length: +            return False +        for offset in range(len(text)): +            if self._text[self.pos + offset] != text[offset]: +                return False +        return True + +    def read_str(self, text: str) -> bool: +        if not self.peek_str(text): +            return False +        self._pos += len(text) +        self._lineno += text.count("\n") +        return True + +    @property +    def mem_stack(self) -> CruIterable.IterList[MemStackEntry]: +        return self._mem_stack + +    def push_mem(self) -> None: +        self.mem_stack.append(self.MemStackEntry(self.pos, self.lineno)) + +    def pop_mem(self) -> MemStackEntry: +        return self.mem_stack.pop() + +    def pop_mem_str(self, strip_end: int = 0) -> MemStackPopStr: +        old = self.pop_mem() +        assert self.pos >= old.pos +        return self.MemStackPopStr( +            self._text[old.pos : self.pos - strip_end], old.lineno +        ) + + +class ParseError(CruException, Generic[_T]): +    def __init__( +        self, +        message, +        parser: Parser[_T], +        text: str, +        line_number: int | None = None, +        *args, +        **kwargs, +    ): +        super().__init__(message, *args, **kwargs) +        self._parser = parser +        self._text = text +        self._line_number = line_number + +    @property +    def parser(self) -> Parser[_T]: +        return self._parser + +    @property +    def text(self) -> str: +        return self._text + +    @property +    def line_number(self) -> int | None: +        return self._line_number + + +class Parser(Generic[_T], metaclass=ABCMeta): +    def __init__(self, name: str) -> None: +        self._name = name + +    @property +    def name(self) -> str: +        return self._name + +    @abstractmethod +    def parse(self, s: str) -> _T: +        raise NotImplementedError() + +    def raise_parse_exception( +        self, text: str, line_number: int | None = None +    ) -> NoReturn: +        a = line_number and f" at line {line_number}" or "" +        raise ParseError(f"Parser {self.name} failed{a}.", self, text, line_number) + + +class _SimpleLineVarParserEntry(NamedTuple): +    key: str +    value: str +    line_number: int | None = None + + +class _SimpleLineVarParserResult(CruIterable.IterList[_SimpleLineVarParserEntry]): +    pass + + +class SimpleLineVarParser(Parser[_SimpleLineVarParserResult]): +    """ +    The parsing result is a list of tuples (key, value, line number). +    """ + +    Entry: TypeAlias = _SimpleLineVarParserEntry +    Result: TypeAlias = _SimpleLineVarParserResult + +    def __init__(self) -> None: +        super().__init__(type(self).__name__) + +    def _parse(self, text: str, callback: Callable[[Entry], None]) -> None: +        for ln, line in enumerate(text.splitlines()): +            line_number = ln + 1 +            # check if it's a comment +            if line.strip().startswith("#"): +                continue +            # check if there is a '=' +            if line.find("=") == -1: +                self.raise_parse_exception("There is even no '='!", line_number) +            # split at first '=' +            key, value = line.split("=", 1) +            key = key.strip() +            value = value.strip() +            callback(_SimpleLineVarParserEntry(key, value, line_number)) + +    def parse(self, text: str) -> Result: +        result = _SimpleLineVarParserResult() +        self._parse(text, lambda item: result.append(item)) +        return result + + +class _StrWrapperVarParserTokenKind(Enum): +    TEXT = "TEXT" +    VAR = "VAR" + + +@dataclass +class _StrWrapperVarParserToken: +    kind: _StrWrapperVarParserTokenKind +    value: str +    line_number: int + +    @property +    def is_text(self) -> bool: +        return self.kind is _StrWrapperVarParserTokenKind.TEXT + +    @property +    def is_var(self) -> bool: +        return self.kind is _StrWrapperVarParserTokenKind.VAR + +    @staticmethod +    def from_mem_str( +        kind: _StrWrapperVarParserTokenKind, mem_str: StrParseStream.MemStackPopStr +    ) -> _StrWrapperVarParserToken: +        return _StrWrapperVarParserToken(kind, mem_str.text, mem_str.lineno) + +    def __repr__(self) -> str: +        return f"VAR: {self.value}" if self.is_var else "TEXT: ..." + + +class _StrWrapperVarParserResult(CruIterable.IterList[_StrWrapperVarParserToken]): +    pass + + +class StrWrapperVarParser(Parser[_StrWrapperVarParserResult]): +    TokenKind: TypeAlias = _StrWrapperVarParserTokenKind +    Token: TypeAlias = _StrWrapperVarParserToken +    Result: TypeAlias = _StrWrapperVarParserResult + +    def __init__(self, wrapper: str): +        super().__init__(f"StrWrapperVarParser({wrapper})") +        self._wrapper = wrapper + +    @property +    def wrapper(self) -> str: +        return self._wrapper + +    def parse(self, text: str) -> Result: +        result = self.Result() + +        class _State(Enum): +            TEXT = "TEXT" +            VAR = "VAR" + +        state = _State.TEXT +        stream = StrParseStream(text) +        stream.push_mem() + +        while True: +            if stream.eof: +                break + +            if stream.read_str(self.wrapper): +                if state is _State.TEXT: +                    result.append( +                        self.Token.from_mem_str( +                            self.TokenKind.TEXT, stream.pop_mem_str(len(self.wrapper)) +                        ) +                    ) +                    state = _State.VAR +                    stream.push_mem() +                else: +                    result.append( +                        self.Token.from_mem_str( +                            self.TokenKind.VAR, +                            stream.pop_mem_str(len(self.wrapper)), +                        ) +                    ) +                    state = _State.TEXT +                    stream.push_mem() + +                continue + +            stream.skip(1) + +        if state is _State.VAR: +            raise ParseError("Text ended without closing variable.", self, text) + +        mem_str = stream.pop_mem_str() +        if len(mem_str.text) != 0: +            result.append(self.Token.from_mem_str(self.TokenKind.TEXT, mem_str)) + +        return result diff --git a/python/cru/service/__init__.py b/python/cru/service/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/python/cru/service/__init__.py diff --git a/python/cru/service/__main__.py b/python/cru/service/__main__.py new file mode 100644 index 0000000..2a0268b --- /dev/null +++ b/python/cru/service/__main__.py @@ -0,0 +1,27 @@ +import sys + +from cru import CruException + +from ._app import create_app + + +def main(): +    app = create_app() +    app.run_command() + + +if __name__ == "__main__": +    version_info = sys.version_info +    if not (version_info.major == 3 and version_info.minor >= 11): +        print("This application requires Python 3.11 or later.", file=sys.stderr) +        sys.exit(1) + +    try: +        main() +    except CruException as e: +        user_message = e.get_user_message() +        if user_message is not None: +            print(f"Error: {user_message}") +            exit(1) +        else: +            raise diff --git a/python/cru/service/_app.py b/python/cru/service/_app.py new file mode 100644 index 0000000..b4c6271 --- /dev/null +++ b/python/cru/service/_app.py @@ -0,0 +1,30 @@ +from ._base import ( +    AppBase, +    CommandDispatcher, +    PathCommandProvider, +) +from ._template import TemplateManager +from ._nginx import NginxManager +from ._gen_cmd import GenCmdProvider + +APP_ID = "crupest" + + +class App(AppBase): +    def __init__(self): +        super().__init__(APP_ID, f"{APP_ID}-service") +        self.add_feature(PathCommandProvider()) +        self.add_feature(TemplateManager()) +        self.add_feature(NginxManager()) +        self.add_feature(GenCmdProvider()) +        self.add_feature(CommandDispatcher()) + +    def run_command(self): +        command_dispatcher = self.get_feature(CommandDispatcher) +        command_dispatcher.run_command() + + +def create_app() -> App: +    app = App() +    app.setup() +    return app diff --git a/python/cru/service/_base.py b/python/cru/service/_base.py new file mode 100644 index 0000000..e1eee70 --- /dev/null +++ b/python/cru/service/_base.py @@ -0,0 +1,400 @@ +from __future__ import annotations + +from argparse import ArgumentParser, Namespace +from abc import ABC, abstractmethod +import argparse +import os +from pathlib import Path +from typing import TypeVar, overload + +from cru import CruException, CruLogicError + +_Feature = TypeVar("_Feature", bound="AppFeatureProvider") + + +class AppError(CruException): +    pass + + +class AppFeatureError(AppError): +    def __init__(self, message, feature: type | str, *args, **kwargs): +        super().__init__(message, *args, **kwargs) +        self._feature = feature + +    @property +    def feature(self) -> type | str: +        return self._feature + + +class AppPathError(CruException): +    def __init__(self, message, _path: str | Path, *args, **kwargs): +        super().__init__(message, *args, **kwargs) +        self._path = str(_path) + +    @property +    def path(self) -> str: +        return self._path + + +class AppPath(ABC): +    def __init__(self, id: str, is_dir: bool, description: str) -> None: +        self._is_dir = is_dir +        self._id = id +        self._description = description + +    @property +    @abstractmethod +    def parent(self) -> AppPath | None: ... + +    @property +    @abstractmethod +    def app(self) -> AppBase: ... + +    @property +    def id(self) -> str: +        return self._id + +    @property +    def description(self) -> str: +        return self._description + +    @property +    def is_dir(self) -> bool: +        return self._is_dir + +    @property +    @abstractmethod +    def full_path(self) -> Path: ... + +    @property +    def full_path_str(self) -> str: +        return str(self.full_path) + +    def check_parents(self, must_exist: bool = False) -> bool: +        for p in reversed(self.full_path.parents): +            if not p.exists() and not must_exist: +                return False +            if not p.is_dir(): +                raise AppPathError("Parents' path must be a dir.", self.full_path) +        return True + +    def check_self(self, must_exist: bool = False) -> bool: +        if not self.check_parents(must_exist): +            return False +        if not self.full_path.exists(): +            if not must_exist: +                return False +            raise AppPathError("Not exist.", self.full_path) +        if self.is_dir: +            if not self.full_path.is_dir(): +                raise AppPathError("Should be a directory, but not.", self.full_path) +            else: +                return True +        else: +            if not self.full_path.is_file(): +                raise AppPathError("Should be a file, but not.", self.full_path) +            else: +                return True + +    def ensure(self, create_file: bool = False) -> None: +        e = self.check_self(False) +        if not e: +            os.makedirs(self.full_path.parent, exist_ok=True) +            if self.is_dir: +                os.mkdir(self.full_path) +            elif create_file: +                with open(self.full_path, "w") as f: +                    f.write("") + +    def read_text(self) -> str: +        if self.is_dir: +            raise AppPathError("Can't read text of a dir.", self.full_path) +        self.check_self() +        return self.full_path.read_text() + +    def add_subpath( +        self, +        name: str, +        is_dir: bool, +        /, +        id: str | None = None, +        description: str = "", +    ) -> AppFeaturePath: +        return self.app._add_path(name, is_dir, self, id, description) + +    @property +    def app_relative_path(self) -> Path: +        return self.full_path.relative_to(self.app.root.full_path) + + +class AppFeaturePath(AppPath): +    def __init__( +        self, +        parent: AppPath, +        name: str, +        is_dir: bool, +        /, +        id: str | None = None, +        description: str = "", +    ) -> None: +        super().__init__(id or name, is_dir, description) +        self._name = name +        self._parent = parent + +    @property +    def name(self) -> str: +        return self._name + +    @property +    def parent(self) -> AppPath: +        return self._parent + +    @property +    def app(self) -> AppBase: +        return self.parent.app + +    @property +    def full_path(self) -> Path: +        return Path(self.parent.full_path, self.name).resolve() + + +class AppRootPath(AppPath): +    def __init__(self, app: AppBase, path: Path): +        super().__init__(f"/{id}", True, f"Application {id} root path.") +        self._app = app +        self._full_path = path.resolve() + +    @property +    def parent(self) -> None: +        return None + +    @property +    def app(self) -> AppBase: +        return self._app + +    @property +    def full_path(self) -> Path: +        return self._full_path + + +class AppFeatureProvider(ABC): +    def __init__(self, name: str, /, app: AppBase | None = None): +        super().__init__() +        self._name = name +        self._app = app if app else AppBase.get_instance() + +    @property +    def app(self) -> AppBase: +        return self._app + +    @property +    def name(self) -> str: +        return self._name + +    @abstractmethod +    def setup(self) -> None: ... + + +class AppCommandFeatureProvider(AppFeatureProvider): +    @abstractmethod +    def get_command_info(self) -> tuple[str, str]: ... + +    @abstractmethod +    def setup_arg_parser(self, arg_parser: ArgumentParser): ... + +    @abstractmethod +    def run_command(self, args: Namespace) -> None: ... + + +class PathCommandProvider(AppCommandFeatureProvider): +    def __init__(self) -> None: +        super().__init__("path-command-provider") + +    def setup(self): +        pass + +    def get_command_info(self): +        return ("path", "Get information about paths used by app.") + +    def setup_arg_parser(self, arg_parser: ArgumentParser) -> None: +        subparsers = arg_parser.add_subparsers( +            dest="path_command", metavar="PATH_COMMAND" +        ) +        _list_parser = subparsers.add_parser( +            "list", help="list special paths used by app" +        ) + +    def run_command(self, args: Namespace) -> None: +        if args.path_command is None or args.path_command == "list": +            for path in self.app.paths: +                print( +                    f"{path.app_relative_path.as_posix()}{'/' if path.is_dir else ''}: {path.description}" +                ) + + +class CommandDispatcher(AppFeatureProvider): +    def __init__(self) -> None: +        super().__init__("command-dispatcher") + +    def setup_arg_parser(self) -> None: +        self._map: dict[str, AppCommandFeatureProvider] = {} +        arg_parser = argparse.ArgumentParser( +            description="Service management", +            formatter_class=argparse.RawDescriptionHelpFormatter, +        ) +        subparsers = arg_parser.add_subparsers( +            dest="command", +            help="The management command to execute.", +            metavar="COMMAND", +        ) +        for feature in self.app.features: +            if isinstance(feature, AppCommandFeatureProvider): +                info = feature.get_command_info() +                command_subparser = subparsers.add_parser(info[0], help=info[1]) +                feature.setup_arg_parser(command_subparser) +                self._map[info[0]] = feature +        self._arg_parser = arg_parser + +    def setup(self): +        self._parsed_args = self.arg_parser.parse_args() + +    @property +    def arg_parser(self) -> argparse.ArgumentParser: +        return self._arg_parser + +    @property +    def command_map(self) -> dict[str, AppCommandFeatureProvider]: +        return self._map + +    @property +    def program_args(self) -> argparse.Namespace: +        return self._parsed_args + +    def run_command(self) -> None: +        args = self.program_args +        if args.command is None: +            self.arg_parser.print_help() +            return +        self.command_map[args.command].run_command(args) + + +class AppBase: +    _instance: AppBase | None = None + +    @staticmethod +    def get_instance() -> AppBase: +        if AppBase._instance is None: +            raise AppError("App instance not initialized") +        return AppBase._instance + +    def __init__(self, app_id: str, name: str): +        AppBase._instance = self +        self._app_id = app_id +        self._name = name +        self._features: list[AppFeatureProvider] = [] +        self._paths: list[AppFeaturePath] = [] + +    def setup(self) -> None: +        command_dispatcher = self.get_feature(CommandDispatcher) +        command_dispatcher.setup_arg_parser() +        self._root = AppRootPath(self, Path(self._ensure_env("CRUPEST_PROJECT_DIR"))) +        self._data_dir = self._root.add_subpath( +            self._ensure_env("CRUPEST_DATA_DIR"), True, id="data" +        ) +        self._services_dir = self._root.add_subpath( +            self._ensure_env("CRUPEST_SERVICES_DIR"), True, id="CRUPEST_SERVICES_DIR" +        ) +        for feature in self.features: +            feature.setup() +        for path in self.paths: +            path.check_self() + +    @property +    def app_id(self) -> str: +        return self._app_id + +    @property +    def name(self) -> str: +        return self._name + +    def _ensure_env(self, env_name: str) -> str: +        value = os.getenv(env_name) +        if value is None: +            raise AppError(f"Environment variable {env_name} not set") +        return value + +    @property +    def root(self) -> AppRootPath: +        return self._root + +    @property +    def data_dir(self) -> AppFeaturePath: +        return self._data_dir + +    @property +    def services_dir(self) -> AppFeaturePath: +        return self._services_dir + +    @property +    def app_initialized(self) -> bool: +        return self.data_dir.check_self() + +    @property +    def features(self) -> list[AppFeatureProvider]: +        return self._features + +    @property +    def paths(self) -> list[AppFeaturePath]: +        return self._paths + +    def add_feature(self, feature: _Feature) -> _Feature: +        for f in self.features: +            if f.name == feature.name: +                raise AppFeatureError( +                    f"Duplicate feature name: {feature.name}.", feature.name +                ) +        self._features.append(feature) +        return feature + +    def _add_path( +        self, +        name: str, +        is_dir: bool, +        /, +        parent: AppPath | None = None, +        id: str | None = None, +        description: str = "", +    ) -> AppFeaturePath: +        p = AppFeaturePath( +            parent or self.root, name, is_dir, id=id, description=description +        ) +        self._paths.append(p) +        return p + +    @overload +    def get_feature(self, feature: str) -> AppFeatureProvider: ... + +    @overload +    def get_feature(self, feature: type[_Feature]) -> _Feature: ... + +    def get_feature( +        self, feature: str | type[_Feature] +    ) -> AppFeatureProvider | _Feature: +        if isinstance(feature, str): +            for f in self._features: +                if f.name == feature: +                    return f +        elif isinstance(feature, type): +            for f in self._features: +                if isinstance(f, feature): +                    return f +        else: +            raise CruLogicError("Argument must be the name of feature or its class.") + +        raise AppFeatureError(f"Feature {feature} not found.", feature) + +    def get_path(self, name: str) -> AppFeaturePath: +        for p in self._paths: +            if p.id == name or p.name == name: +                return p +        raise AppPathError(f"Application path {name} not found.", name) diff --git a/python/cru/service/_gen_cmd.py b/python/cru/service/_gen_cmd.py new file mode 100644 index 0000000..f51d65f --- /dev/null +++ b/python/cru/service/_gen_cmd.py @@ -0,0 +1,200 @@ +from dataclasses import dataclass, replace +from typing import TypeAlias + +from ._base import AppCommandFeatureProvider +from ._nginx import NginxManager + +_Str_Or_Cmd_List: TypeAlias = str | list["_Cmd"] + + +@dataclass +class _Cmd: +    name: str +    desc: str +    cmd: _Str_Or_Cmd_List + +    def clean(self) -> "_Cmd": +        if isinstance(self.cmd, list): +            return replace( +                self, +                cmd=[cmd.clean() for cmd in self.cmd], +            ) +        elif isinstance(self.cmd, str): +            return replace(self, cmd=self.cmd.strip()) +        else: +            raise ValueError("Unexpected type for cmd.") + +    def generate_text( +        self, +        info_only: bool, +        *, +        parent: str | None = None, +    ) -> str: +        if parent is None: +            tag = "COMMAND" +            long_name = self.name +            indent = "" +        else: +            tag = "SUBCOMMAND" +            long_name = f"{parent}.{self.name}" +            indent = "  " + +        if info_only: +            return f"{indent}[{long_name}]: {self.desc}" + +        text = f"--- {tag}[{long_name}]: {self.desc}" +        if isinstance(self.cmd, str): +            text += "\n" + self.cmd +        elif isinstance(self.cmd, list): +            for sub in self.cmd: +                text += "\n" * 2 + sub.generate_text(info_only, parent=self.name) +        else: +            raise ValueError("Unexpected type for cmd.") + +        lines: list[str] = [] +        for line in text.splitlines(): +            if len(line) == 0: +                lines.append("") +            else: +                lines.append(indent + line) +        text = "\n".join(lines) + +        return text + + +_docker_uninstall = _Cmd( +    "uninstall", +    "uninstall apt docker", +    """ +for pkg in docker.io docker-doc docker-compose \ +podman-docker containerd runc; \ +do sudo apt-get remove $pkg; done +""", +) + +_docker_apt_certs = _Cmd( +    "apt-certs", +    "prepare apt certs", +    """ +sudo apt-get update +sudo apt-get install ca-certificates curl +sudo install -m 0755 -d /etc/apt/keyrings +""", +) + +_docker_docker_certs = _Cmd( +    "docker-certs", +    "add docker certs", +    """ +sudo curl -fsSL https://download.docker.com/linux/debian/gpg \ +-o /etc/apt/keyrings/docker.asc +sudo chmod a+r /etc/apt/keyrings/docker.asc +""", +) + +_docker_apt_repo = _Cmd( +    "apt-repo", +    "add docker apt repo", +    """ +echo \\ +  "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] \ +https://download.docker.com/linux/debian \\ +  $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \\ +  sudo tee /etc/apt/sources.list.d/docker.list > /dev/null +""", +) + +_docker_install = _Cmd( +    "install", +    "update apt and install docker", +    """ +sudo apt-get update +sudo apt-get install docker-ce docker-ce-cli containerd.io \ +docker-buildx-plugin docker-compose-plugin +""", +) + +_docker_setup = _Cmd( +    "setup", +    "setup system for docker", +    """ +sudo systemctl enable docker +sudo systemctl start docker +sudo groupadd -f docker +sudo usermod -aG docker $USER +# Remember to log out and log back in for the group changes to take effect +""", +) + +_docker = _Cmd( +    "install-docker", +    "install docker for a fresh new system", +    [ +        _docker_uninstall, +        _docker_apt_certs, +        _docker_docker_certs, +        _docker_apt_repo, +        _docker_install, +        _docker_setup, +    ], +) + +_update_blog = _Cmd( +    "update-blog", +    "re-generate blog pages", +    """ +docker exec -it blog /scripts/update.bash +""", +) + +_git_user = _Cmd( +    "git-user", +    "add/set git server user and password", +    """ +docker run -it --rm -v "$ps_file:/user-info" httpd htpasswd "/user-info" [username] +""", +) + + +class GenCmdProvider(AppCommandFeatureProvider): +    def __init__(self) -> None: +        super().__init__("gen-cmd-provider") +        self._cmds: dict[str, _Cmd] = {} +        self._register_cmds(_docker, _update_blog, _git_user) + +    def _register_cmd(self, cmd: "_Cmd"): +        self._cmds[cmd.name] = cmd.clean() + +    def _register_cmds(self, *cmds: "_Cmd"): +        for c in cmds: +            self._register_cmd(c) + +    def setup(self): +        pass + +    def get_command_info(self): +        return ("gen-cmd", "Get commands of running external cli tools.") + +    def setup_arg_parser(self, arg_parser): +        subparsers = arg_parser.add_subparsers( +            dest="gen_cmd", metavar="GEN_CMD_COMMAND" +        ) +        certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") +        certbot_parser.add_argument( +            "-t", "--test", action="store_true", help="run certbot in test mode" +        ) +        for cmd in self._cmds.values(): +            subparsers.add_parser(cmd.name, help=cmd.desc) + +    def _print_cmd(self, name: str): +        print(self._cmds[name].generate_text(False)) + +    def run_command(self, args): +        if args.gen_cmd is None or args.gen_cmd == "list": +            print("[certbot]: certbot ssl cert commands") +            for cmd in self._cmds.values(): +                print(cmd.generate_text(True)) +        elif args.gen_cmd == "certbot": +            self.app.get_feature(NginxManager).print_all_certbot_commands(args.test) +        else: +            self._print_cmd(args.gen_cmd) diff --git a/python/cru/service/_nginx.py b/python/cru/service/_nginx.py new file mode 100644 index 0000000..87cff6d --- /dev/null +++ b/python/cru/service/_nginx.py @@ -0,0 +1,263 @@ +from argparse import Namespace +from enum import Enum, auto +import re +import subprocess +from typing import TypeAlias + +from cru import CruInternalError + +from ._base import AppCommandFeatureProvider +from ._template import TemplateManager + + +class CertbotAction(Enum): +    CREATE = auto() +    EXPAND = auto() +    SHRINK = auto() +    RENEW = auto() + + +class NginxManager(AppCommandFeatureProvider): +    CertbotAction: TypeAlias = CertbotAction + +    def __init__(self) -> None: +        super().__init__("nginx-manager") +        self._domains_cache: list[str] | None = None + +    def setup(self) -> None: +        pass + +    @property +    def _template_manager(self) -> TemplateManager: +        return self.app.get_feature(TemplateManager) + +    @property +    def root_domain(self) -> str: +        return self._template_manager.get_domain() + +    @property +    def domains(self) -> list[str]: +        if self._domains_cache is None: +            self._domains_cache = self._get_domains() +        return self._domains_cache + +    @property +    def subdomains(self) -> list[str]: +        suffix = "." + self.root_domain +        return [d[: -len(suffix)] for d in self.domains if d.endswith(suffix)] + +    def _get_domains_from_text(self, text: str) -> set[str]: +        domains: set[str] = set() +        regex = re.compile(r"server_name\s+(\S+)\s*;") +        for match in regex.finditer(text): +            domains.add(match[1]) +        return domains + +    def _join_generated_nginx_conf_text(self) -> str: +        result = "" +        for path, text in self._template_manager.generate(): +            if "nginx" in str(path): +                result += text +        return result + +    def _get_domains(self) -> list[str]: +        text = self._join_generated_nginx_conf_text() +        domains = self._get_domains_from_text(text) +        domains.remove(self.root_domain) +        return [self.root_domain, *domains] + +    def _print_domains(self) -> None: +        for domain in self.domains: +            print(domain) + +    def _certbot_command( +        self, +        action: CertbotAction | str, +        test: bool, +        *, +        docker=True, +        standalone=None, +        email=None, +        agree_tos=True, +    ) -> str: +        if isinstance(action, str): +            action = CertbotAction[action.upper()] + +        command_args = [] + +        add_domain_option = True +        if action is CertbotAction.CREATE: +            if standalone is None: +                standalone = True +            command_action = "certonly" +        elif action in [CertbotAction.EXPAND, CertbotAction.SHRINK]: +            if standalone is None: +                standalone = False +            command_action = "certonly" +        elif action is CertbotAction.RENEW: +            if standalone is None: +                standalone = False +            add_domain_option = False +            command_action = "renew" +        else: +            raise CruInternalError("Invalid certbot action.") + +        data_dir = self.app.data_dir.full_path.as_posix() + +        if not docker: +            command_args.append("certbot") +        else: +            command_args.extend( +                [ +                    "docker run -it --rm --name certbot", +                    f'-v "{data_dir}/certbot/certs:/etc/letsencrypt"', +                    f'-v "{data_dir}/certbot/data:/var/lib/letsencrypt"', +                ] +            ) +            if standalone: +                command_args.append('-p "0.0.0.0:80:80"') +            else: +                command_args.append(f'-v "{data_dir}/certbot/webroot:/var/www/certbot"') + +            command_args.append("certbot/certbot") + +        command_args.append(command_action) + +        command_args.append(f"--cert-name {self.root_domain}") + +        if standalone: +            command_args.append("--standalone") +        else: +            command_args.append("--webroot -w /var/www/certbot") + +        if add_domain_option: +            command_args.append(" ".join([f"-d {domain}" for domain in self.domains])) + +        if email is not None: +            command_args.append(f"--email {email}") + +        if agree_tos: +            command_args.append("--agree-tos") + +        if test: +            command_args.append("--test-cert --dry-run") + +        return " ".join(command_args) + +    def print_all_certbot_commands(self, test: bool): +        print("### COMMAND: (standalone) create certs") +        print( +            self._certbot_command( +                CertbotAction.CREATE, +                test, +                email=self._template_manager.get_email(), +            ) +        ) +        print() +        print("### COMMAND: (webroot+nginx) expand or shrink certs") +        print( +            self._certbot_command( +                CertbotAction.EXPAND, +                test, +                email=self._template_manager.get_email(), +            ) +        ) +        print() +        print("### COMMAND: (webroot+nginx) renew certs") +        print( +            self._certbot_command( +                CertbotAction.RENEW, +                test, +                email=self._template_manager.get_email(), +            ) +        ) + +    @property +    def _cert_path_str(self) -> str: +        return str( +            self.app.data_dir.full_path +            / "certbot/certs/live" +            / self.root_domain +            / "fullchain.pem" +        ) + +    def get_command_info(self): +        return "nginx", "Manage nginx related things." + +    def setup_arg_parser(self, arg_parser): +        subparsers = arg_parser.add_subparsers( +            dest="nginx_command", required=True, metavar="NGINX_COMMAND" +        ) +        _list_parser = subparsers.add_parser("list", help="list domains") +        certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") +        certbot_parser.add_argument( +            "--no-test", +            action="store_true", +            help="remove args making certbot run in test mode", +        ) + +    def run_command(self, args: Namespace) -> None: +        if args.nginx_command == "list": +            self._print_domains() +        elif args.nginx_command == "certbot": +            self.print_all_certbot_commands(not args.no_test) + +    def _generate_dns_zone( +        self, +        ip: str, +        /, +        ttl: str | int = 600, +        *, +        enable_mail: bool = True, +        dkim: str | None = None, +    ) -> str: +        # TODO: Not complete and test now. +        root_domain = self.root_domain +        result = f"$ORIGIN {root_domain}.\n\n" +        result += "; A records\n" +        result += f"@ {ttl} IN A {ip}\n" +        for subdomain in self.subdomains: +            result += f"{subdomain} {ttl} IN A {ip}\n" + +        if enable_mail: +            result += "\n; MX records\n" +            result += f"@ {ttl} IN MX 10 mail.{root_domain}.\n" +            result += "\n; SPF record\n" +            result += f'@ {ttl} IN TXT "v=spf1 mx ~all"\n' +            if dkim is not None: +                result += "\n; DKIM record\n" +                result += f'mail._domainkey {ttl} IN TEXT "{dkim}"' +                result += "\n; DMARC record\n" +                dmarc_options = [ +                    "v=DMARC1", +                    "p=none", +                    f"rua=mailto:dmarc.report@{root_domain}", +                    f"ruf=mailto:dmarc.report@{root_domain}", +                    "sp=none", +                    "ri=86400", +                ] +                result += f'_dmarc {ttl} IN TXT "{"; ".join(dmarc_options)}"\n' +        return result + +    def _get_dkim_from_mailserver(self) -> str | None: +        # TODO: Not complete and test now. +        dkim_path = ( +            self.app.data_dir.full_path +            / "dms/config/opendkim/keys" +            / self.root_domain +            / "mail.txt" +        ) +        if not dkim_path.exists(): +            return None + +        p = subprocess.run(["sudo", "cat", dkim_path], capture_output=True, check=True) +        value = "" +        for match in re.finditer('"(.*)"', p.stdout.decode("utf-8")): +            value += match.group(1) +        return value + +    def _generate_dns_zone_with_dkim(self, ip: str, /, ttl: str | int = 600) -> str: +        # TODO: Not complete and test now. +        return self._generate_dns_zone( +            ip, ttl, enable_mail=True, dkim=self._get_dkim_from_mailserver() +        ) diff --git a/python/cru/service/_template.py b/python/cru/service/_template.py new file mode 100644 index 0000000..22c1d21 --- /dev/null +++ b/python/cru/service/_template.py @@ -0,0 +1,228 @@ +from argparse import Namespace +from pathlib import Path +import shutil +from typing import NamedTuple +import graphlib + +from cru import CruException +from cru.parsing import SimpleLineVarParser +from cru.template import TemplateTree, CruStrWrapperTemplate + +from ._base import AppCommandFeatureProvider, AppFeaturePath + + +class _Config(NamedTuple): +    text: str +    config: dict[str, str] + + +class _GeneratedConfig(NamedTuple): +    base: _Config +    private: _Config +    merged: _Config + + +class _PreConfig(NamedTuple): +    base: _Config +    private: _Config +    config: dict[str, str] + +    @staticmethod +    def create(base: _Config, private: _Config) -> "_PreConfig": +        return _PreConfig(base, private, {**base.config, **private.config}) + +    def _merge(self, generated: _Config): +        text = ( +            "\n".join( +                [ +                    self.private.text.strip(), +                    self.base.text.strip(), +                    generated.text.strip(), +                ] +            ) +            + "\n" +        ) +        config = {**self.config, **generated.config} +        return _GeneratedConfig(self.base, self.private, _Config(text, config)) + + +class _Template(NamedTuple): +    config: CruStrWrapperTemplate +    config_vars: set[str] +    tree: TemplateTree + + +class TemplateManager(AppCommandFeatureProvider): +    def __init__(self): +        super().__init__("template-manager") + +    def setup(self) -> None: +        self._base_config_file = self.app.services_dir.add_subpath("base-config", False) +        self._private_config_file = self.app.data_dir.add_subpath("config", False) +        self._template_config_file = self.app.services_dir.add_subpath( +            "config.template", False +        ) +        self._templates_dir = self.app.services_dir.add_subpath("templates", True) +        self._generated_dir = self.app.services_dir.add_subpath("generated", True) + +        self._config_parser = SimpleLineVarParser() + +        def _read_pre(app_path: AppFeaturePath) -> _Config: +            text = app_path.read_text() +            config = self._read_config(text) +            return _Config(text, config) + +        base = _read_pre(self._base_config_file) +        private = _read_pre(self._private_config_file) +        self._preconfig = _PreConfig.create(base, private) + +        self._generated: _GeneratedConfig | None = None + +        template_config_text = self._template_config_file.read_text() +        self._template_config = self._read_config(template_config_text) + +        self._template = _Template( +            CruStrWrapperTemplate(template_config_text), +            set(self._template_config.keys()), +            TemplateTree( +                lambda text: CruStrWrapperTemplate(text), +                self.templates_dir.full_path_str, +            ), +        ) + +        self._real_required_vars = ( +            self._template.config_vars | self._template.tree.variables +        ) - self._template.config_vars +        lacks = self._real_required_vars - self._preconfig.config.keys() +        self._lack_vars = lacks if len(lacks) > 0 else None + +    def _read_config_entry_names(self, text: str) -> set[str]: +        return set(entry.key for entry in self._config_parser.parse(text)) + +    def _read_config(self, text: str) -> dict[str, str]: +        return {entry.key: entry.value for entry in self._config_parser.parse(text)} + +    @property +    def templates_dir(self) -> AppFeaturePath: +        return self._templates_dir + +    @property +    def generated_dir(self) -> AppFeaturePath: +        return self._generated_dir + +    def get_domain(self) -> str: +        return self._preconfig.config["CRUPEST_DOMAIN"] + +    def get_email(self) -> str: +        return self._preconfig.config["CRUPEST_EMAIL"] + +    def _generate_template_config(self, config: dict[str, str]) -> dict[str, str]: +        entry_templates = { +            key: CruStrWrapperTemplate(value) +            for key, value in self._template_config.items() +        } +        sorter = graphlib.TopologicalSorter( +            config +            | {key: template.variables for key, template in entry_templates.items()} +        ) + +        vars: dict[str, str] = config.copy() +        for _ in sorter.static_order(): +            del_keys = [] +            for key, template in entry_templates.items(): +                new = template.generate_partial(vars) +                if not new.has_variables: +                    vars[key] = new.generate({}) +                    del_keys.append(key) +                else: +                    entry_templates[key] = new +            for key in del_keys: +                del entry_templates[key] +        assert len(entry_templates) == 0 +        return {key: value for key, value in vars.items() if key not in config} + +    def _generate_config(self) -> _GeneratedConfig: +        if self._generated is not None: +            return self._generated +        if self._lack_vars is not None: +            raise CruException(f"Required vars are not defined: {self._lack_vars}.") +        config = self._generate_template_config(self._preconfig.config) +        text = self._template.config.generate(self._preconfig.config | config) +        self._generated = self._preconfig._merge(_Config(text, config)) +        return self._generated + +    def generate(self) -> list[tuple[Path, str]]: +        config = self._generate_config() +        return [ +            (Path("config"), config.merged.text), +            *self._template.tree.generate(config.merged.config), +        ] + +    def _generate_files(self, dry_run: bool) -> None: +        result = self.generate() +        if not dry_run: +            if self.generated_dir.full_path.exists(): +                shutil.rmtree(self.generated_dir.full_path) +            for path, text in result: +                des = self.generated_dir.full_path / path +                des.parent.mkdir(parents=True, exist_ok=True) +                with open(des, "w") as f: +                    f.write(text) + +    def get_command_info(self): +        return ("template", "Manage templates.") + +    def _print_file_lists(self) -> None: +        print(f"[{self._template.config.variable_count}]", "config") +        for path, template in self._template.tree.templates: +            print(f"[{template.variable_count}]", path.as_posix()) + +    def _print_vars(self, required: bool) -> None: +        for var in self._template.config.variables: +            print(f"[config] {var}") +        for var in self._template.tree.variables: +            if not (required and var in self._template.config_vars): +                print(f"[template] {var}") + +    def _run_check_vars(self) -> None: +        if self._lack_vars is not None: +            print("Lacks:") +            for var in self._lack_vars: +                print(var) + +    def setup_arg_parser(self, arg_parser): +        subparsers = arg_parser.add_subparsers( +            dest="template_command", required=True, metavar="TEMPLATE_COMMAND" +        ) +        _list_parser = subparsers.add_parser("list", help="list templates") +        vars_parser = subparsers.add_parser( +            "vars", help="list variables used in all templates" +        ) +        vars_parser.add_argument( +            "-r", +            "--required", +            help="only list really required one.", +            action="store_true", +        ) +        _check_vars_parser = subparsers.add_parser( +            "check-vars", +            help="check if required vars are set", +        ) +        generate_parser = subparsers.add_parser("generate", help="generate templates") +        generate_parser.add_argument( +            "--no-dry-run", action="store_true", help="generate and write target files" +        ) + +    def run_command(self, args: Namespace) -> None: +        if args.template_command == "list": +            self._print_file_lists() +        elif args.template_command == "vars": +            self._print_vars(args.required) +        elif args.template_command == "generate": +            dry_run = not args.no_dry_run +            self._generate_files(dry_run) +            if dry_run: +                print("Dry run successfully.") +                print( +                    f"Will delete dir {self.generated_dir.full_path_str} if it exists." +                ) diff --git a/python/cru/system.py b/python/cru/system.py new file mode 100644 index 0000000..f321717 --- /dev/null +++ b/python/cru/system.py @@ -0,0 +1,23 @@ +import os.path +import re + + +def check_debian_derivative_version(name: str) -> None | str: +    if not os.path.isfile("/etc/os-release"): +        return None +    with open("/etc/os-release", "r") as f: +        content = f.read() +        if f"ID={name}" not in content: +            return None +        m = re.search(r'VERSION_ID="(.+)"', content) +        if m is None: +            return None +        return m.group(1) + + +def check_ubuntu_version() -> None | str: +    return check_debian_derivative_version("ubuntu") + + +def check_debian_version() -> None | str: +    return check_debian_derivative_version("debian") diff --git a/python/cru/template.py b/python/cru/template.py new file mode 100644 index 0000000..3a70337 --- /dev/null +++ b/python/cru/template.py @@ -0,0 +1,209 @@ +from abc import ABCMeta, abstractmethod +from collections.abc import Callable, Mapping +from pathlib import Path +from string import Template +from typing import Generic, Self, TypeVar + +from ._iter import CruIterator +from ._error import CruException + +from .parsing import StrWrapperVarParser + + +class CruTemplateError(CruException): +    pass + + +class CruTemplateBase(metaclass=ABCMeta): +    def __init__(self, text: str): +        self._text = text +        self._variables: set[str] | None = None + +    @abstractmethod +    def _get_variables(self) -> set[str]: +        raise NotImplementedError() + +    @property +    def text(self) -> str: +        return self._text + +    @property +    def variables(self) -> set[str]: +        if self._variables is None: +            self._variables = self._get_variables() +        return self._variables + +    @property +    def variable_count(self) -> int: +        return len(self.variables) + +    @property +    def has_variables(self) -> bool: +        return self.variable_count > 0 + +    @abstractmethod +    def _do_generate(self, mapping: dict[str, str]) -> str: +        raise NotImplementedError() + +    def _generate_partial( +        self, mapping: Mapping[str, str], allow_unused: bool = True +    ) -> str: +        values = dict(mapping) +        if not allow_unused and not len(set(values.keys() - self.variables)) != 0: +            raise CruTemplateError("Unused variables.") +        return self._do_generate(values) + +    def generate_partial( +        self, mapping: Mapping[str, str], allow_unused: bool = True +    ) -> Self: +        return self.__class__(self._generate_partial(mapping, allow_unused)) + +    def generate(self, mapping: Mapping[str, str], allow_unused: bool = True) -> str: +        values = dict(mapping) +        if len(self.variables - values.keys()) != 0: +            raise CruTemplateError( +                f"Missing variables: {self.variables - values.keys()} ." +            ) +        return self._generate_partial(values, allow_unused) + + +class CruTemplate(CruTemplateBase): +    def __init__(self, prefix: str, text: str): +        super().__init__(text) +        self._prefix = prefix +        self._template = Template(text) + +    def _get_variables(self) -> set[str]: +        return ( +            CruIterator(self._template.get_identifiers()) +            .filter(lambda i: i.startswith(self.prefix)) +            .to_set() +        ) + +    @property +    def prefix(self) -> str: +        return self._prefix + +    @property +    def py_template(self) -> Template: +        return self._template + +    @property +    def all_variables(self) -> set[str]: +        return set(self._template.get_identifiers()) + +    def _do_generate(self, mapping: dict[str, str]) -> str: +        return self._template.safe_substitute(mapping) + + +class CruStrWrapperTemplate(CruTemplateBase): +    def __init__(self, text: str, wrapper: str = "@@"): +        super().__init__(text) +        self._wrapper = wrapper +        self._tokens: StrWrapperVarParser.Result + +    @property +    def wrapper(self) -> str: +        return self._wrapper + +    def _get_variables(self): +        self._tokens = StrWrapperVarParser(self.wrapper).parse(self.text) +        return ( +            self._tokens.cru_iter() +            .filter(lambda t: t.is_var) +            .map(lambda t: t.value) +            .to_set() +        ) + +    def _do_generate(self, mapping): +        return ( +            self._tokens.cru_iter() +            .map(lambda t: mapping[t.value] if t.is_var else t.value) +            .join_str("") +        ) + + +_Template = TypeVar("_Template", bound=CruTemplateBase) + + +class TemplateTree(Generic[_Template]): +    def __init__( +        self, +        template_generator: Callable[[str], _Template], +        source: str, +        *, +        template_file_suffix: str | None = ".template", +    ): +        """ +        If template_file_suffix is not None, the files will be checked according to the +        suffix of the file name. If the suffix matches, the file will be regarded as a +        template file. Otherwise, it will be regarded as a non-template file. +        Content of template file must contain variables that need to be replaced, while +        content of non-template file may not contain any variables. +        If either case is false, it generally means whether the file is a template is +        wrongly handled. +        """ +        self._template_generator = template_generator +        self._files: list[tuple[Path, _Template]] = [] +        self._source = source +        self._template_file_suffix = template_file_suffix +        self._load() + +    @property +    def templates(self) -> list[tuple[Path, _Template]]: +        return self._files + +    @property +    def source(self) -> str: +        return self._source + +    @property +    def template_file_suffix(self) -> str | None: +        return self._template_file_suffix + +    @staticmethod +    def _scan_files(root: str) -> list[Path]: +        root_path = Path(root) +        result: list[Path] = [] +        for path in root_path.glob("**/*"): +            if not path.is_file(): +                continue +            path = path.relative_to(root_path) +            result.append(Path(path)) +        return result + +    def _load(self) -> None: +        files = self._scan_files(self.source) +        for file_path in files: +            template_file = Path(self.source) / file_path +            with open(template_file, "r") as f: +                content = f.read() +            template = self._template_generator(content) +            if self.template_file_suffix is not None: +                should_be_template = file_path.name.endswith(self.template_file_suffix) +                if should_be_template and not template.has_variables: +                    raise CruTemplateError( +                        f"Template file {file_path} has no variables." +                    ) +                elif not should_be_template and template.has_variables: +                    raise CruTemplateError(f"Non-template {file_path} has variables.") +            self._files.append((file_path, template)) + +    @property +    def variables(self) -> set[str]: +        s = set() +        for _, template in self.templates: +            s.update(template.variables) +        return s + +    def generate(self, variables: Mapping[str, str]) -> list[tuple[Path, str]]: +        result: list[tuple[Path, str]] = [] +        for path, template in self.templates: +            if self.template_file_suffix is not None and path.name.endswith( +                self.template_file_suffix +            ): +                path = path.parent / (path.name[: -len(self.template_file_suffix)]) + +            text = template.generate(variables) +            result.append((path, text)) +        return result diff --git a/python/cru/tool.py b/python/cru/tool.py new file mode 100644 index 0000000..377f5d7 --- /dev/null +++ b/python/cru/tool.py @@ -0,0 +1,82 @@ +import shutil +import subprocess +from typing import Any +from collections.abc import Iterable + +from ._error import CruException + + +class CruExternalToolError(CruException): +    def __init__(self, message: str, tool: str, *args, **kwargs) -> None: +        super().__init__(message, *args, **kwargs) +        self._tool = tool + +    @property +    def tool(self) -> str: +        return self._tool + + +class CruExternalToolNotFoundError(CruExternalToolError): +    def __init__(self, message: str | None, tool: str, *args, **kwargs) -> None: +        super().__init__( +            message or f"Could not find binary for {tool}.", tool, *args, **kwargs +        ) + + +class CruExternalToolRunError(CruExternalToolError): +    def __init__( +        self, +        message: str, +        tool: str, +        tool_args: Iterable[str], +        tool_error: Any, +        *args, +        **kwargs, +    ) -> None: +        super().__init__(message, tool, *args, **kwargs) +        self._tool_args = list(tool_args) +        self._tool_error = tool_error + +    @property +    def tool_args(self) -> list[str]: +        return self._tool_args + +    @property +    def tool_error(self) -> Any: +        return self._tool_error + + +class ExternalTool: +    def __init__(self, bin: str) -> None: +        self._bin = bin + +    @property +    def bin(self) -> str: +        return self._bin + +    @bin.setter +    def bin(self, value: str) -> None: +        self._bin = value + +    @property +    def bin_path(self) -> str: +        real_bin = shutil.which(self.bin) +        if not real_bin: +            raise CruExternalToolNotFoundError(None, self.bin) +        return real_bin + +    def run( +        self, *process_args: str, **subprocess_kwargs +    ) -> subprocess.CompletedProcess: +        try: +            return subprocess.run( +                [self.bin_path] + list(process_args), **subprocess_kwargs +            ) +        except subprocess.CalledProcessError as e: +            raise CruExternalToolError("Subprocess failed.", self.bin) from e +        except OSError as e: +            raise CruExternalToolError("Failed to start subprocess", self.bin) from e + +    def run_get_output(self, *process_args: str, **subprocess_kwargs) -> Any: +        process = self.run(*process_args, capture_output=True, **subprocess_kwargs) +        return process.stdout diff --git a/python/cru/value.py b/python/cru/value.py new file mode 100644 index 0000000..9c03219 --- /dev/null +++ b/python/cru/value.py @@ -0,0 +1,292 @@ +from __future__ import annotations + +import random +import secrets +import string +import uuid +from abc import abstractmethod, ABCMeta +from collections.abc import Callable +from typing import Any, ClassVar, TypeVar, Generic + +from ._error import CruException + + +def _str_case_in(s: str, case: bool, str_list: list[str]) -> bool: +    if case: +        return s in str_list +    else: +        return s.lower() in [s.lower() for s in str_list] + + +_T = TypeVar("_T") + + +class CruValueTypeError(CruException): +    def __init__( +        self, +        message: str, +        value: Any, +        value_type: ValueType | None, +        *args, +        **kwargs, +    ): +        super().__init__( +            message, +            *args, +            **kwargs, +        ) +        self._value = value +        self._value_type = value_type + +    @property +    def value(self) -> Any: +        return self._value + +    @property +    def value_type(self) -> ValueType | None: +        return self._value_type + + +class ValueType(Generic[_T], metaclass=ABCMeta): +    def __init__(self, name: str, _type: type[_T]) -> None: +        self._name = name +        self._type = _type + +    @property +    def name(self) -> str: +        return self._name + +    @property +    def type(self) -> type[_T]: +        return self._type + +    def check_value_type(self, value: Any) -> None: +        if not isinstance(value, self.type): +            raise CruValueTypeError("Type of value is wrong.", value, self) + +    def _do_check_value(self, value: Any) -> _T: +        return value + +    def check_value(self, value: Any) -> _T: +        self.check_value_type(value) +        return self._do_check_value(value) + +    @abstractmethod +    def _do_check_str_format(self, s: str) -> None: +        raise NotImplementedError() + +    def check_str_format(self, s: str) -> None: +        if not isinstance(s, str): +            raise CruValueTypeError("Try to check format on a non-str.", s, self) +        self._do_check_str_format(s) + +    @abstractmethod +    def _do_convert_value_to_str(self, value: _T) -> str: +        raise NotImplementedError() + +    def convert_value_to_str(self, value: _T) -> str: +        self.check_value(value) +        return self._do_convert_value_to_str(value) + +    @abstractmethod +    def _do_convert_str_to_value(self, s: str) -> _T: +        raise NotImplementedError() + +    def convert_str_to_value(self, s: str) -> _T: +        self.check_str_format(s) +        return self._do_convert_str_to_value(s) + +    def check_value_or_try_convert_from_str(self, value_or_str: Any) -> _T: +        try: +            return self.check_value(value_or_str) +        except CruValueTypeError: +            if isinstance(value_or_str, str): +                return self.convert_str_to_value(value_or_str) +            else: +                raise + +    def create_default_value(self) -> _T: +        return self.type() + + +class TextValueType(ValueType[str]): +    def __init__(self) -> None: +        super().__init__("text", str) + +    def _do_check_str_format(self, _s): +        return + +    def _do_convert_value_to_str(self, value): +        return value + +    def _do_convert_str_to_value(self, s): +        return s + + +class IntegerValueType(ValueType[int]): +    def __init__(self) -> None: +        super().__init__("integer", int) + +    def _do_check_str_format(self, s): +        try: +            int(s) +        except ValueError as e: +            raise CruValueTypeError("Invalid integer format.", s, self) from e + +    def _do_convert_value_to_str(self, value): +        return str(value) + +    def _do_convert_str_to_value(self, s): +        return int(s) + + +class FloatValueType(ValueType[float]): +    def __init__(self) -> None: +        super().__init__("float", float) + +    def _do_check_str_format(self, s): +        try: +            float(s) +        except ValueError as e: +            raise CruValueTypeError("Invalid float format.", s, self) from e + +    def _do_convert_value_to_str(self, value): +        return str(value) + +    def _do_convert_str_to_value(self, s): +        return float(s) + + +class BooleanValueType(ValueType[bool]): +    DEFAULT_TRUE_LIST: ClassVar[list[str]] = ["true", "yes", "y", "on", "1"] +    DEFAULT_FALSE_LIST: ClassVar[list[str]] = ["false", "no", "n", "off", "0"] + +    def __init__( +        self, +        *, +        case_sensitive=False, +        true_list: None | list[str] = None, +        false_list: None | list[str] = None, +    ) -> None: +        super().__init__("boolean", bool) +        self._case_sensitive = case_sensitive +        self._valid_true_strs: list[str] = ( +            true_list or BooleanValueType.DEFAULT_TRUE_LIST +        ) +        self._valid_false_strs: list[str] = ( +            false_list or BooleanValueType.DEFAULT_FALSE_LIST +        ) + +    @property +    def case_sensitive(self) -> bool: +        return self._case_sensitive + +    @property +    def valid_true_strs(self) -> list[str]: +        return self._valid_true_strs + +    @property +    def valid_false_strs(self) -> list[str]: +        return self._valid_false_strs + +    @property +    def valid_boolean_strs(self) -> list[str]: +        return self._valid_true_strs + self._valid_false_strs + +    def _do_check_str_format(self, s): +        if not _str_case_in(s, self.case_sensitive, self.valid_boolean_strs): +            raise CruValueTypeError("Invalid boolean format.", s, self) + +    def _do_convert_value_to_str(self, value): +        return self._valid_true_strs[0] if value else self._valid_false_strs[0] + +    def _do_convert_str_to_value(self, s): +        return _str_case_in(s, self.case_sensitive, self._valid_true_strs) + +    def create_default_value(self): +        return self.valid_false_strs[0] + + +class EnumValueType(ValueType[str]): +    def __init__(self, valid_values: list[str], /, case_sensitive=False) -> None: +        super().__init__(f"enum({'|'.join(valid_values)})", str) +        self._case_sensitive = case_sensitive +        self._valid_values = valid_values + +    @property +    def case_sensitive(self) -> bool: +        return self._case_sensitive + +    @property +    def valid_values(self) -> list[str]: +        return self._valid_values + +    def _do_check_value(self, value): +        self._do_check_str_format(value) + +    def _do_check_str_format(self, s): +        if not _str_case_in(s, self.case_sensitive, self.valid_values): +            raise CruValueTypeError("Invalid enum value", s, self) + +    def _do_convert_value_to_str(self, value): +        return value + +    def _do_convert_str_to_value(self, s): +        return s + +    def create_default_value(self): +        return self.valid_values[0] + + +TEXT_VALUE_TYPE = TextValueType() +INTEGER_VALUE_TYPE = IntegerValueType() +BOOLEAN_VALUE_TYPE = BooleanValueType() + + +class ValueGeneratorBase(Generic[_T], metaclass=ABCMeta): +    @abstractmethod +    def generate(self) -> _T: +        raise NotImplementedError() + +    def __call__(self) -> _T: +        return self.generate() + + +class ValueGenerator(ValueGeneratorBase[_T]): +    def __init__(self, generate_func: Callable[[], _T]) -> None: +        self._generate_func = generate_func + +    @property +    def generate_func(self) -> Callable[[], _T]: +        return self._generate_func + +    def generate(self) -> _T: +        return self._generate_func() + + +class UuidValueGenerator(ValueGeneratorBase[str]): +    def generate(self): +        return str(uuid.uuid4()) + + +class RandomStringValueGenerator(ValueGeneratorBase[str]): +    def __init__(self, length: int, secure: bool) -> None: +        self._length = length +        self._secure = secure + +    @property +    def length(self) -> int: +        return self._length + +    @property +    def secure(self) -> bool: +        return self._secure + +    def generate(self): +        random_func = secrets.choice if self._secure else random.choice +        characters = string.ascii_letters + string.digits +        random_string = "".join(random_func(characters) for _ in range(self._length)) +        return random_string + + +UUID_VALUE_GENERATOR = UuidValueGenerator() | 
