diff options
| author | Yuqian Yang <crupest@crupest.life> | 2025-02-22 18:11:35 +0800 | 
|---|---|---|
| committer | Yuqian Yang <crupest@crupest.life> | 2025-02-23 01:36:11 +0800 | 
| commit | 34704b2090c48d9ab9e7458475fd701a38706ae7 (patch) | |
| tree | 2aff2885ecfbef32095daf205bf2ebb32a7f2c34 /tools/cru-py | |
| parent | 594691a66ceb00536e133ece321846f7ba49f881 (diff) | |
| download | crupest-34704b2090c48d9ab9e7458475fd701a38706ae7.tar.gz crupest-34704b2090c48d9ab9e7458475fd701a38706ae7.tar.bz2 crupest-34704b2090c48d9ab9e7458475fd701a38706ae7.zip | |
feat(services): refactor structure.
Diffstat (limited to 'tools/cru-py')
31 files changed, 0 insertions, 4316 deletions
| diff --git a/tools/cru-py/.gitignore b/tools/cru-py/.gitignore deleted file mode 100644 index f5833b1..0000000 --- a/tools/cru-py/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -__pycache__ -.venv -.mypy_cache diff --git a/tools/cru-py/.python-version b/tools/cru-py/.python-version deleted file mode 100644 index 2c07333..0000000 --- a/tools/cru-py/.python-version +++ /dev/null @@ -1 +0,0 @@ -3.11 diff --git a/tools/cru-py/cru/__init__.py b/tools/cru-py/cru/__init__.py deleted file mode 100644 index 17799a9..0000000 --- a/tools/cru-py/cru/__init__.py +++ /dev/null @@ -1,60 +0,0 @@ -import sys - -from ._base import CRU, CruNamespaceError, CRU_NAME_PREFIXES -from ._error import ( -    CruException, -    CruLogicError, -    CruInternalError, -    CruUnreachableError, -    cru_unreachable, -) -from ._const import ( -    CruConstantBase, -    CruDontChange, -    CruNotFound, -    CruNoValue, -    CruPlaceholder, -    CruUseDefault, -) -from ._func import CruFunction -from ._iter import CruIterable, CruIterator -from ._event import CruEvent, CruEventHandlerToken -from ._type import CruTypeSet, CruTypeCheckError - - -class CruInitError(CruException): -    pass - - -def check_python_version(required_version=(3, 11)): -    if sys.version_info < required_version: -        raise CruInitError(f"Python version must be >= {required_version}!") - - -check_python_version() - -__all__ = [ -    "CRU", -    "CruNamespaceError", -    "CRU_NAME_PREFIXES", -    "check_python_version", -    "CruException", -    "CruInternalError", -    "CruLogicError", -    "CruUnreachableError", -    "cru_unreachable", -    "CruInitError", -    "CruConstantBase", -    "CruDontChange", -    "CruNotFound", -    "CruNoValue", -    "CruPlaceholder", -    "CruUseDefault", -    "CruFunction", -    "CruIterable", -    "CruIterator", -    "CruEvent", -    "CruEventHandlerToken", -    "CruTypeSet", -    "CruTypeCheckError", -] diff --git a/tools/cru-py/cru/_base.py b/tools/cru-py/cru/_base.py deleted file mode 100644 index 2599d8f..0000000 --- a/tools/cru-py/cru/_base.py +++ /dev/null @@ -1,101 +0,0 @@ -from typing import Any - -from ._helper import remove_none -from ._error import CruException - - -class CruNamespaceError(CruException): -    """Raised when a namespace is not found.""" - - -class _Cru: -    NAME_PREFIXES = ("CRU_", "Cru", "cru_") - -    def __init__(self) -> None: -        self._d: dict[str, Any] = {} - -    def all_names(self) -> list[str]: -        return list(self._d.keys()) - -    def get(self, name: str) -> Any: -        return self._d[name] - -    def has_name(self, name: str) -> bool: -        return name in self._d - -    @staticmethod -    def _maybe_remove_prefix(name: str) -> str | None: -        for prefix in _Cru.NAME_PREFIXES: -            if name.startswith(prefix): -                return name[len(prefix) :] -        return None - -    def _check_name_exist(self, *names: str | None) -> None: -        for name in names: -            if name is None: -                continue -            if self.has_name(name): -                raise CruNamespaceError(f"Name {name} exists in CRU.") - -    @staticmethod -    def check_name_format(name: str) -> tuple[str, str]: -        no_prefix_name = _Cru._maybe_remove_prefix(name) -        if no_prefix_name is None: -            raise CruNamespaceError( -                f"Name {name} is not prefixed with any of {_Cru.NAME_PREFIXES}." -            ) -        return name, no_prefix_name - -    @staticmethod -    def _check_object_name(o) -> tuple[str, str]: -        return _Cru.check_name_format(o.__name__) - -    def _do_add(self, o, *names: str | None) -> list[str]: -        name_list: list[str] = remove_none(names) -        for name in name_list: -            self._d[name] = o -        return name_list - -    def add(self, o, name: str | None) -> tuple[str, str | None]: -        no_prefix_name: str | None -        if name is None: -            name, no_prefix_name = self._check_object_name(o) -        else: -            no_prefix_name = self._maybe_remove_prefix(name) - -        self._check_name_exist(name, no_prefix_name) -        self._do_add(o, name, no_prefix_name) -        return name, no_prefix_name - -    def add_with_alias(self, o, name: str | None = None, *aliases: str) -> list[str]: -        final_names: list[str | None] = [] -        no_prefix_name: str | None -        if name is None: -            name, no_prefix_name = self._check_object_name(o) -            self._check_name_exist(name, no_prefix_name) -            final_names.extend([name, no_prefix_name]) -        for alias in aliases: -            no_prefix_name = self._maybe_remove_prefix(alias) -            self._check_name_exist(alias, no_prefix_name) -            final_names.extend([alias, no_prefix_name]) - -        return self._do_add(o, *final_names) - -    def add_objects(self, *objects): -        final_list = [] -        for o in objects: -            name, no_prefix_name = self._check_object_name(o) -            self._check_name_exist(name, no_prefix_name) -            final_list.append((o, name, no_prefix_name)) -        for o, name, no_prefix_name in final_list: -            self._do_add(o, name, no_prefix_name) - -    def __getitem__(self, item): -        return self.get(item) - -    def __getattr__(self, item): -        return self.get(item) - - -CRU_NAME_PREFIXES = _Cru.NAME_PREFIXES -CRU = _Cru() diff --git a/tools/cru-py/cru/_const.py b/tools/cru-py/cru/_const.py deleted file mode 100644 index 8246b35..0000000 --- a/tools/cru-py/cru/_const.py +++ /dev/null @@ -1,49 +0,0 @@ -from enum import Enum, auto -from typing import Self, TypeGuard, TypeVar - -from ._base import CRU - -_T = TypeVar("_T") - - -class CruConstantBase(Enum): -    @classmethod -    def check(cls, v: _T | Self) -> TypeGuard[Self]: -        return isinstance(v, cls) - -    @classmethod -    def check_not(cls, v: _T | Self) -> TypeGuard[_T]: -        return not cls.check(v) - -    @classmethod -    def value(cls) -> Self: -        return cls.VALUE  # type: ignore - - -class CruNotFound(CruConstantBase): -    VALUE = auto() - - -class CruUseDefault(CruConstantBase): -    VALUE = auto() - - -class CruDontChange(CruConstantBase): -    VALUE = auto() - - -class CruNoValue(CruConstantBase): -    VALUE = auto() - - -class CruPlaceholder(CruConstantBase): -    VALUE = auto() - - -CRU.add_objects( -    CruNotFound, -    CruUseDefault, -    CruDontChange, -    CruNoValue, -    CruPlaceholder, -) diff --git a/tools/cru-py/cru/_decorator.py b/tools/cru-py/cru/_decorator.py deleted file mode 100644 index 137fc05..0000000 --- a/tools/cru-py/cru/_decorator.py +++ /dev/null @@ -1,97 +0,0 @@ -from __future__ import annotations - -from collections.abc import Callable -from typing import ( -    Concatenate, -    Generic, -    ParamSpec, -    TypeVar, -    cast, -) - -from ._base import CRU - -_P = ParamSpec("_P") -_T = TypeVar("_T") -_O = TypeVar("_O") -_R = TypeVar("_R") - - -class CruDecorator: - -    class ConvertResult(Generic[_T, _O]): -        def __init__( -            self, -            converter: Callable[[_T], _O], -        ) -> None: -            self.converter = converter - -        def __call__(self, origin: Callable[_P, _T]) -> Callable[_P, _O]: -            converter = self.converter - -            def real_impl(*args: _P.args, **kwargs: _P.kwargs) -> _O: -                return converter(origin(*args, **kwargs)) - -            return real_impl - -    class ImplementedBy(Generic[_T, _O, _P, _R]): -        def __init__( -            self, -            impl: Callable[Concatenate[_O, _P], _R], -            converter: Callable[[_T], _O], -        ) -> None: -            self.impl = impl -            self.converter = converter - -        def __call__( -            self, _origin: Callable[[_T], None] -        ) -> Callable[Concatenate[_T, _P], _R]: -            converter = self.converter -            impl = self.impl - -            def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R: -                return cast(Callable[Concatenate[_O, _P], _R], impl)( -                    converter(_self), *args, **kwargs -                ) - -            return real_impl - -        @staticmethod -        def create_factory(converter: Callable[[_T], _O]) -> Callable[ -            [Callable[Concatenate[_O, _P], _R]], -            CruDecorator.ImplementedBy[_T, _O, _P, _R], -        ]: -            def create( -                m: Callable[Concatenate[_O, _P], _R], -            ) -> CruDecorator.ImplementedBy[_T, _O, _P, _R]: -                return CruDecorator.ImplementedBy(m, converter) - -            return create - -    class ImplementedByNoSelf(Generic[_P, _R]): -        def __init__(self, impl: Callable[_P, _R]) -> None: -            self.impl = impl - -        def __call__( -            self, _origin: Callable[[_T], None] -        ) -> Callable[Concatenate[_T, _P], _R]: -            impl = self.impl - -            def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R: -                return cast(Callable[_P, _R], impl)(*args, **kwargs) - -            return real_impl - -        @staticmethod -        def create_factory() -> ( -            Callable[[Callable[_P, _R]], CruDecorator.ImplementedByNoSelf[_P, _R]] -        ): -            def create( -                m: Callable[_P, _R], -            ) -> CruDecorator.ImplementedByNoSelf[_P, _R]: -                return CruDecorator.ImplementedByNoSelf(m) - -            return create - - -CRU.add_objects(CruDecorator) diff --git a/tools/cru-py/cru/_error.py b/tools/cru-py/cru/_error.py deleted file mode 100644 index e53c787..0000000 --- a/tools/cru-py/cru/_error.py +++ /dev/null @@ -1,89 +0,0 @@ -from __future__ import annotations - -from typing import NoReturn, cast, overload - - -class CruException(Exception): -    """Base exception class of all exceptions in cru.""" - -    @overload -    def __init__( -        self, -        message: None = None, -        *args, -        user_message: str, -        **kwargs, -    ): ... - -    @overload -    def __init__( -        self, -        message: str, -        *args, -        user_message: str | None = None, -        **kwargs, -    ): ... - -    def __init__( -        self, -        message: str | None = None, -        *args, -        user_message: str | None = None, -        **kwargs, -    ): -        if message is None: -            message = user_message - -        super().__init__( -            message, -            *args, -            **kwargs, -        ) -        self._message: str -        self._message = cast(str, message) -        self._user_message = user_message - -    @property -    def message(self) -> str: -        return self._message - -    def get_user_message(self) -> str | None: -        return self._user_message - -    def get_message(self, use_user: bool = True) -> str: -        if use_user and self._user_message is not None: -            return self._user_message -        else: -            return self._message - -    @property -    def is_internal(self) -> bool: -        return False - -    @property -    def is_logic_error(self) -> bool: -        return False - - -class CruLogicError(CruException): -    """Raised when a logic error occurs.""" - -    @property -    def is_logic_error(self) -> bool: -        return True - - -class CruInternalError(CruException): -    """Raised when an internal error occurs.""" - -    @property -    def is_internal(self) -> bool: -        return True - - -class CruUnreachableError(CruInternalError): -    """Raised when a code path is unreachable.""" - - -def cru_unreachable() -> NoReturn: -    raise CruUnreachableError("Code should not reach here!") diff --git a/tools/cru-py/cru/_event.py b/tools/cru-py/cru/_event.py deleted file mode 100644 index 51a794c..0000000 --- a/tools/cru-py/cru/_event.py +++ /dev/null @@ -1,61 +0,0 @@ -from __future__ import annotations - -from collections.abc import Callable -from typing import Generic, ParamSpec, TypeVar - -from .list import CruList - -_P = ParamSpec("_P") -_R = TypeVar("_R") - - -class CruEventHandlerToken(Generic[_P, _R]): -    def __init__( -        self, event: CruEvent, handler: Callable[_P, _R], once: bool = False -    ) -> None: -        self._event = event -        self._handler = handler -        self._once = once - -    @property -    def event(self) -> CruEvent: -        return self._event - -    @property -    def handler(self) -> Callable[_P, _R]: -        return self._handler - -    @property -    def once(self) -> bool: -        return self._once - - -class CruEvent(Generic[_P, _R]): -    def __init__(self, name: str) -> None: -        self._name = name -        self._tokens: CruList[CruEventHandlerToken] = CruList() - -    def register( -        self, handler: Callable[_P, _R], once: bool = False -    ) -> CruEventHandlerToken: -        token = CruEventHandlerToken(self, handler, once) -        self._tokens.append(token) -        return token - -    def unregister(self, *handlers: CruEventHandlerToken | Callable[_P, _R]) -> int: -        old_length = len(self._tokens) -        self._tokens.reset( -            self._tokens.as_cru_iterator().filter( -                (lambda t: t in handlers or t.handler in handlers) -            ) -        ) -        return old_length - len(self._tokens) - -    def trigger(self, *args: _P.args, **kwargs: _P.kwargs) -> CruList[_R]: -        results = CruList( -            self._tokens.as_cru_iterator() -            .transform(lambda t: t.handler(*args, **kwargs)) -            .to_list() -        ) -        self._tokens.reset(self._tokens.as_cru_iterator().filter(lambda t: not t.once)) -        return results diff --git a/tools/cru-py/cru/_func.py b/tools/cru-py/cru/_func.py deleted file mode 100644 index fc57802..0000000 --- a/tools/cru-py/cru/_func.py +++ /dev/null @@ -1,172 +0,0 @@ -from __future__ import annotations - -from collections.abc import Callable, Iterable -from enum import Flag, auto -from typing import ( -    Any, -    Generic, -    Literal, -    ParamSpec, -    TypeAlias, -    TypeVar, -) - - -from ._base import CRU -from ._const import CruPlaceholder - -_P = ParamSpec("_P") -_P1 = ParamSpec("_P1") -_T = TypeVar("_T") - - -class _Dec: -    @staticmethod -    def wrap( -        origin: Callable[_P, Callable[_P1, _T]] -    ) -> Callable[_P, _Wrapper[_P1, _T]]: -        def _wrapped(*args: _P.args, **kwargs: _P.kwargs) -> _Wrapper[_P1, _T]: -            return _Wrapper(origin(*args, **kwargs)) - -        return _wrapped - - -class _RawBase: -    @staticmethod -    def none(*_v, **_kwargs) -> None: -        return None - -    @staticmethod -    def true(*_v, **_kwargs) -> Literal[True]: -        return True - -    @staticmethod -    def false(*_v, **_kwargs) -> Literal[False]: -        return False - -    @staticmethod -    def identity(v: _T) -> _T: -        return v - -    @staticmethod -    def only_you(v: _T, *_v, **_kwargs) -> _T: -        return v - -    @staticmethod -    def equal(a: Any, b: Any) -> bool: -        return a == b - -    @staticmethod -    def not_equal(a: Any, b: Any) -> bool: -        return a != b - -    @staticmethod -    def not_(v: Any) -> Any: -        return not v - - -class _Wrapper(Generic[_P, _T]): -    def __init__(self, f: Callable[_P, _T]): -        self._f = f - -    @property -    def me(self) -> Callable[_P, _T]: -        return self._f - -    def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T: -        return self._f(*args, **kwargs) - -    @_Dec.wrap -    def bind(self, *bind_args, **bind_kwargs) -> Callable[..., _T]: -        func = self.me - -        def bound_func(*args, **kwargs): -            popped = 0 -            real_args = [] -            for arg in bind_args: -                if CruPlaceholder.check(arg): -                    real_args.append(args[popped]) -                    popped += 1 -                else: -                    real_args.append(arg) -            real_args.extend(args[popped:]) -            return func(*real_args, **(bind_kwargs | kwargs)) - -        return bound_func - -    class ChainMode(Flag): -        ARGS = auto() -        KWARGS = auto() -        BOTH = ARGS | KWARGS - -    ArgsChainableCallable: TypeAlias = Callable[..., Iterable[Any]] -    KwargsChainableCallable: TypeAlias = Callable[..., Iterable[tuple[str, Any]]] -    ChainableCallable: TypeAlias = Callable[ -        ..., tuple[Iterable[Any], Iterable[tuple[str, Any]]] -    ] - -    @_Dec.wrap -    def chain_with_args( -        self, funcs: Iterable[ArgsChainableCallable], *bind_args, **bind_kwargs -    ) -> ArgsChainableCallable: -        def chained_func(*args): -            args = self.bind(*bind_args, **bind_kwargs)(*args) - -            for func in funcs: -                args = _Wrapper(func).bind(*bind_args, **bind_kwargs)(*args) -            return args - -        return chained_func - -    @_Dec.wrap -    def chain_with_kwargs( -        self, funcs: Iterable[KwargsChainableCallable], *bind_args, **bind_kwargs -    ) -> KwargsChainableCallable: -        def chained_func(**kwargs): -            kwargs = self.bind(*bind_args, **bind_kwargs)(**kwargs) -            for func in funcs: -                kwargs = _Wrapper(func).bind(func, *bind_args, **bind_kwargs)(**kwargs) -            return kwargs - -        return chained_func - -    @_Dec.wrap -    def chain_with_both( -        self, funcs: Iterable[ChainableCallable], *bind_args, **bind_kwargs -    ) -> ChainableCallable: -        def chained_func(*args, **kwargs): -            for func in funcs: -                args, kwargs = _Wrapper(func).bind(func, *bind_args, **bind_kwargs)( -                    *args, **kwargs -                ) -            return args, kwargs - -        return chained_func - - -class _Base: -    none = _Wrapper(_RawBase.none) -    true = _Wrapper(_RawBase.true) -    false = _Wrapper(_RawBase.false) -    identity = _Wrapper(_RawBase.identity) -    only_you = _Wrapper(_RawBase.only_you) -    equal = _Wrapper(_RawBase.equal) -    not_equal = _Wrapper(_RawBase.not_equal) -    not_ = _Wrapper(_RawBase.not_) - - -class _Creators: -    @staticmethod -    def make_isinstance_of_types(*types: type) -> Callable: -        return _Wrapper(lambda v: type(v) in types) - - -class CruFunction: -    RawBase: TypeAlias = _RawBase -    Base: TypeAlias = _Base -    Creators: TypeAlias = _Creators -    Wrapper: TypeAlias = _Wrapper -    Decorators: TypeAlias = _Dec - - -CRU.add_objects(CruFunction) diff --git a/tools/cru-py/cru/_helper.py b/tools/cru-py/cru/_helper.py deleted file mode 100644 index 43baf46..0000000 --- a/tools/cru-py/cru/_helper.py +++ /dev/null @@ -1,16 +0,0 @@ -from collections.abc import Callable -from typing import Any, Iterable, TypeVar, cast - -_T = TypeVar("_T") -_D = TypeVar("_D") - - -def remove_element( -    iterable: Iterable[_T | None], to_rm: Iterable[Any], des: type[_D] | None = None -) -> _D: -    to_rm = set(to_rm) -    return cast(Callable[..., _D], des or list)(v for v in iterable if v not in to_rm) - - -def remove_none(iterable: Iterable[_T | None], des: type[_D] | None = None) -> _D: -    return cast(Callable[..., _D], des or list)(v for v in iterable if v is not None) diff --git a/tools/cru-py/cru/_iter.py b/tools/cru-py/cru/_iter.py deleted file mode 100644 index f9683ca..0000000 --- a/tools/cru-py/cru/_iter.py +++ /dev/null @@ -1,469 +0,0 @@ -from __future__ import annotations - -from collections.abc import Iterable, Callable, Generator, Iterator -from dataclasses import dataclass -from enum import Enum -from typing import ( -    Concatenate, -    Literal, -    Never, -    Self, -    TypeAlias, -    TypeVar, -    ParamSpec, -    Any, -    Generic, -    cast, -) - -from ._base import CRU -from ._const import CruNotFound -from ._error import cru_unreachable - -_P = ParamSpec("_P") -_T = TypeVar("_T") -_O = TypeVar("_O") -_V = TypeVar("_V") -_R = TypeVar("_R") - - -class _Generic: -    class StepActionKind(Enum): -        SKIP = 0 -        PUSH = 1 -        STOP = 2 -        AGGREGATE = 3 - -    @dataclass -    class StepAction(Generic[_V, _R]): -        value: Iterable[Self] | _V | _R | None -        kind: _Generic.StepActionKind - -        @property -        def push_value(self) -> _V: -            assert self.kind == _Generic.StepActionKind.PUSH -            return cast(_V, self.value) - -        @property -        def stop_value(self) -> _R: -            assert self.kind == _Generic.StepActionKind.STOP -            return cast(_R, self.value) - -        @staticmethod -        def skip() -> _Generic.StepAction[_V, _R]: -            return _Generic.StepAction(None, _Generic.StepActionKind.SKIP) - -        @staticmethod -        def push(value: _V | None) -> _Generic.StepAction[_V, _R]: -            return _Generic.StepAction(value, _Generic.StepActionKind.PUSH) - -        @staticmethod -        def stop(value: _R | None = None) -> _Generic.StepAction[_V, _R]: -            return _Generic.StepAction(value, _Generic.StepActionKind.STOP) - -        @staticmethod -        def aggregate( -            *results: _Generic.StepAction[_V, _R], -        ) -> _Generic.StepAction[_V, _R]: -            return _Generic.StepAction(results, _Generic.StepActionKind.AGGREGATE) - -        @staticmethod -        def push_last(value: _V | None) -> _Generic.StepAction[_V, _R]: -            return _Generic.StepAction.aggregate( -                _Generic.StepAction.push(value), _Generic.StepAction.stop() -            ) - -        def flatten(self) -> Iterable[Self]: -            return _Generic.flatten( -                self, -                is_leave=lambda r: r.kind != _Generic.StepActionKind.AGGREGATE, -                get_children=lambda r: cast(Iterable[Self], r.value), -            ) - -    GeneralStepAction: TypeAlias = StepAction[_V, _R] | _V | _R | None -    IterateOperation: TypeAlias = Callable[[_T, int], GeneralStepAction[_V, _R]] -    IteratePreHook: TypeAlias = Callable[[Iterable[_T]], GeneralStepAction[_V, _R]] -    IteratePostHook: TypeAlias = Callable[[int], GeneralStepAction[_V, _R]] - -    @staticmethod -    def _is_not_iterable(o: Any) -> bool: -        return not isinstance(o, Iterable) - -    @staticmethod -    def _return_self(o): -        return o - -    @staticmethod -    def iterable_flatten( -        maybe_iterable: Iterable[_T] | _T, max_depth: int = -1, *, _depth: int = 0 -    ) -> Iterable[Iterable[_T] | _T]: -        if _depth == max_depth or not isinstance(maybe_iterable, Iterable): -            yield maybe_iterable -            return - -        for child in maybe_iterable: -            yield from _Generic.iterable_flatten( -                child, -                max_depth, -                _depth=_depth + 1, -            ) - -    @staticmethod -    def flatten( -        o: _O, -        max_depth: int = -1, -        /, -        is_leave: CruIterator.ElementPredicate[_O] = _is_not_iterable, -        get_children: CruIterator.ElementTransformer[_O, Iterable[_O]] = _return_self, -        *, -        _depth: int = 0, -    ) -> Iterable[_O]: -        if _depth == max_depth or is_leave(o): -            yield o -            return -        for child in get_children(o): -            yield from _Generic.flatten( -                child, -                max_depth, -                is_leave, -                get_children, -                _depth=_depth + 1, -            ) - -    class Results: -        @staticmethod -        def true(_) -> Literal[True]: -            return True - -        @staticmethod -        def false(_) -> Literal[False]: -            return False - -        @staticmethod -        def not_found(_) -> Literal[CruNotFound.VALUE]: -            return CruNotFound.VALUE - -    @staticmethod -    def _non_result_to_push(value: Any) -> StepAction[_V, _R]: -        return _Generic.StepAction.push(value) - -    @staticmethod -    def _non_result_to_stop(value: Any) -> StepAction[_V, _R]: -        return _Generic.StepAction.stop(value) - -    @staticmethod -    def _none_hook(_: Any) -> StepAction[_V, _R]: -        return _Generic.StepAction.skip() - -    def iterate( -        iterable: Iterable[_T], -        operation: IterateOperation[_T, _V, _R], -        fallback_return: _R, -        pre_iterate: IteratePreHook[_T, _V, _R], -        post_iterate: IteratePostHook[_V, _R], -        convert_value_result: Callable[[_V | _R | None], StepAction[_V, _R]], -    ) -> Generator[_V, None, _R]: -        pre_result = pre_iterate(iterable) -        if not isinstance(pre_result, _Generic.StepAction): -            real_pre_result = convert_value_result(pre_result) -        for r in real_pre_result.flatten(): -            if r.kind == _Generic.StepActionKind.STOP: -                return r.stop_value -            elif r.kind == _Generic.StepActionKind.PUSH: -                yield r.push_value -            else: -                assert r.kind == _Generic.StepActionKind.SKIP - -        for index, element in enumerate(iterable): -            result = operation(element, index) -            if not isinstance(result, _Generic.StepAction): -                real_result = convert_value_result(result) -            for r in real_result.flatten(): -                if r.kind == _Generic.StepActionKind.STOP: -                    return r.stop_value -                elif r.kind == _Generic.StepActionKind.PUSH: -                    yield r.push_value -                else: -                    assert r.kind == _Generic.StepActionKind.SKIP -                    continue - -        post_result = post_iterate(index + 1) -        if not isinstance(post_result, _Generic.StepAction): -            real_post_result = convert_value_result(post_result) -        for r in real_post_result.flatten(): -            if r.kind == _Generic.StepActionKind.STOP: -                return r.stop_value -            elif r.kind == _Generic.StepActionKind.PUSH: -                yield r.push_value -            else: -                assert r.kind == _Generic.StepActionKind.SKIP - -        return fallback_return - -    def create_new( -        iterable: Iterable[_T], -        operation: IterateOperation[_T, _V, _R], -        fallback_return: _R, -        /, -        pre_iterate: IteratePreHook[_T, _V, _R] | None = None, -        post_iterate: IteratePostHook[_V, _R] | None = None, -    ) -> Generator[_V, None, _R]: -        return _Generic.iterate( -            iterable, -            operation, -            fallback_return, -            pre_iterate or _Generic._none_hook, -            post_iterate or _Generic._none_hook, -            _Generic._non_result_to_push, -        ) - -    def get_result( -        iterable: Iterable[_T], -        operation: IterateOperation[_T, _V, _R], -        fallback_return: _R, -        /, -        pre_iterate: IteratePreHook[_T, _V, _R] | None = None, -        post_iterate: IteratePostHook[_V, _R] | None = None, -    ) -> _R: -        try: -            for _ in _Generic.iterate( -                iterable, -                operation, -                fallback_return, -                pre_iterate or _Generic._none_hook, -                post_iterate or _Generic._none_hook, -                _Generic._non_result_to_stop, -            ): -                pass -        except StopIteration as stop: -            return stop.value -        cru_unreachable() - - -class _Helpers: -    @staticmethod -    def auto_count(c: Callable[Concatenate[int, _P], _O]) -> Callable[_P, _O]: -        count = 0 - -        def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _O: -            nonlocal count -            r = c(count, *args, **kwargs) -            count += 1 -            return r - -        return wrapper - - -class _Creators: -    class Raw: -        @staticmethod -        def empty() -> Iterator[Never]: -            return iter([]) - -        @staticmethod -        def range(*args) -> Iterator[int]: -            return iter(range(*args)) - -        @staticmethod -        def unite(*args: _T) -> Iterator[_T]: -            return iter(args) - -        @staticmethod -        def _concat(*iterables: Iterable[_T]) -> Iterable[_T]: -            for iterable in iterables: -                yield from iterable - -        @staticmethod -        def concat(*iterables: Iterable[_T]) -> Iterator[_T]: -            return iter(_Creators.Raw._concat(*iterables)) - -    @staticmethod -    def _wrap(f: Callable[_P, Iterable[_O]]) -> Callable[_P, CruIterator[_O]]: -        def _wrapped(*args: _P.args, **kwargs: _P.kwargs) -> CruIterator[_O]: -            return CruIterator(f(*args, **kwargs)) - -        return _wrapped - -    empty = _wrap(Raw.empty) -    range = _wrap(Raw.range) -    unite = _wrap(Raw.unite) -    concat = _wrap(Raw.concat) - - -class CruIterator(Generic[_T]): -    ElementOperation: TypeAlias = Callable[[_V], Any] -    ElementPredicate: TypeAlias = Callable[[_V], bool] -    AnyElementPredicate: TypeAlias = ElementPredicate[Any] -    ElementTransformer: TypeAlias = Callable[[_V], _O] -    SelfElementTransformer: TypeAlias = ElementTransformer[_V, _V] -    AnyElementTransformer: TypeAlias = ElementTransformer[Any, Any] - -    Creators: TypeAlias = _Creators -    Helpers: TypeAlias = _Helpers - -    def __init__(self, iterable: Iterable[_T]) -> None: -        self._iterator = iter(iterable) - -    def __iter__(self) -> Iterator[_T]: -        return self._iterator - -    def create_new_me(self, iterable: Iterable[_O]) -> CruIterator[_O]: -        return type(self)(iterable)  # type: ignore - -    @staticmethod -    def _wrap( -        f: Callable[Concatenate[CruIterator[_T], _P], Iterable[_O]], -    ) -> Callable[Concatenate[CruIterator[_T], _P], CruIterator[_O]]: -        def _wrapped( -            self: CruIterator[_T], *args: _P.args, **kwargs: _P.kwargs -        ) -> CruIterator[_O]: -            return self.create_new_me(f(self, *args, **kwargs)) - -        return _wrapped - -    @_wrap -    def replace_me(self, iterable: Iterable[_O]) -> Iterable[_O]: -        return iterable - -    def replace_me_with_empty(self) -> CruIterator[Never]: -        return self.create_new_me(_Creators.Raw.empty()) - -    def replace_me_with_range(self, *args) -> CruIterator[int]: -        return self.create_new_me(_Creators.Raw.range(*args)) - -    def replace_me_with_unite(self, *args: _O) -> CruIterator[_O]: -        return self.create_new_me(_Creators.Raw.unite(*args)) - -    def replace_me_with_concat(self, *iterables: Iterable[_T]) -> CruIterator[_T]: -        return self.create_new_me(_Creators.Raw.concat(*iterables)) - -    def to_set(self) -> set[_T]: -        return set(self) - -    def to_list(self) -> list[_T]: -        return list(self) - -    def all(self, predicate: ElementPredicate[_T]) -> bool: -        for value in self: -            if not predicate(value): -                return False -        return True - -    def any(self, predicate: ElementPredicate[_T]) -> bool: -        for value in self: -            if predicate(value): -                return True -        return False - -    def foreach(self, operation: ElementOperation[_T]) -> None: -        for value in self: -            operation(value) - -    @_wrap -    def transform(self, transformer: ElementTransformer[_T, _O]) -> Iterable[_O]: -        for value in self: -            yield transformer(value) - -    map = transform - -    @_wrap -    def filter(self, predicate: ElementPredicate[_T]) -> Iterable[_T]: -        for value in self: -            if predicate(value): -                yield value - -    @_wrap -    def continue_if(self, predicate: ElementPredicate[_T]) -> Iterable[_T]: -        for value in self: -            yield value -            if not predicate(value): -                break - -    def first_n(self, max_count: int) -> CruIterator[_T]: -        if max_count < 0: -            raise ValueError("max_count must be 0 or positive.") -        if max_count == 0: -            return self.replace_me_with_empty()  # type: ignore -        return self.continue_if(_Helpers.auto_count(lambda i, _: i < max_count - 1)) - -    def drop_n(self, n: int) -> CruIterator[_T]: -        if n < 0: -            raise ValueError("n must be 0 or positive.") -        if n == 0: -            return self -        return self.filter(_Helpers.auto_count(lambda i, _: i < n)) - -    def single_or( -        self, fallback: _O | CruNotFound = CruNotFound.VALUE -    ) -> _T | _O | CruNotFound: -        first_2 = self.first_n(2) -        has_value = False -        for element in first_2: -            if has_value: -                raise ValueError("More than one value found.") -            has_value = True -            value = element -        if has_value: -            return value -        else: -            return fallback - -    def first_or( -        self, fallback: _O | CruNotFound = CruNotFound.VALUE -    ) -> _T | _O | CruNotFound: -        return self.first_n(1).single_or(fallback) - -    @_wrap -    def flatten(self, max_depth: int = -1) -> Iterable[Any]: -        return _Generic.iterable_flatten(self, max_depth) - -    def select_by_indices(self, indices: Iterable[int]) -> CruIterator[_T]: -        index_set = set(indices) -        max_index = max(index_set) -        return self.first_n(max_index + 1).filter( -            _Helpers.auto_count(lambda i, _: i in index_set) -        ) - -    def remove_values(self, values: Iterable[Any]) -> CruIterator[_T]: -        value_set = set(values) -        return self.filter(lambda v: v not in value_set) - -    def replace_values( -        self, old_values: Iterable[Any], new_value: _O -    ) -> Iterable[_T | _O]: -        value_set = set(old_values) -        return self.transform(lambda v: new_value if v in value_set else v) - -    def group_by(self, key_getter: Callable[[_T], _O]) -> dict[_O, list[_T]]: -        result: dict[_O, list[_T]] = {} - -        for item in self: -            key = key_getter(item) -            if key not in result: -                result[key] = [] -            result[key].append(item) - -        return result - -    def join_str(self: CruIterator[str], separator: str) -> str: -        return separator.join(self) - - -class CruIterMixin(Generic[_T]): -    def cru_iter(self: Iterable[_T]) -> CruIterator[_T]: -        return CruIterator(self) - - -class CruIterList(list[_T], CruIterMixin[_T]): -    pass - - -class CruIterable: -    Generic: TypeAlias = _Generic -    Iterator: TypeAlias = CruIterator[_T] -    Helpers: TypeAlias = _Helpers -    Mixin: TypeAlias = CruIterMixin[_T] -    IterList: TypeAlias = CruIterList[_T] - - -CRU.add_objects(CruIterable, CruIterator) diff --git a/tools/cru-py/cru/_type.py b/tools/cru-py/cru/_type.py deleted file mode 100644 index 1f81da3..0000000 --- a/tools/cru-py/cru/_type.py +++ /dev/null @@ -1,52 +0,0 @@ -from collections.abc import Iterable -from typing import Any - -from ._error import CruException, CruLogicError -from ._iter import CruIterator - - -class CruTypeCheckError(CruException): -    pass - - -DEFAULT_NONE_ERR_MSG = "None is not allowed here." -DEFAULT_TYPE_ERR_MSG = "Object of this type is not allowed here." - - -class CruTypeSet(set[type]): -    def __init__(self, *types: type): -        type_set = CruIterator(types).filter(lambda t: t is not None).to_set() -        if not CruIterator(type_set).all(lambda t: isinstance(t, type)): -            raise CruLogicError("TypeSet can only contain type.") -        super().__init__(type_set) - -    def check_value( -        self, -        value: Any, -        /, -        allow_none: bool = False, -        empty_allow_all: bool = True, -    ) -> None: -        if value is None: -            if allow_none: -                return -            else: -                raise CruTypeCheckError(DEFAULT_NONE_ERR_MSG) -        if len(self) == 0 and empty_allow_all: -            return -        if not CruIterator(self).any(lambda t: isinstance(value, t)): -            raise CruTypeCheckError(DEFAULT_TYPE_ERR_MSG) - -    def check_value_list( -        self, -        values: Iterable[Any], -        /, -        allow_none: bool = False, -        empty_allow_all: bool = True, -    ) -> None: -        for value in values: -            self.check_value( -                value, -                allow_none, -                empty_allow_all, -            ) diff --git a/tools/cru-py/cru/attr.py b/tools/cru-py/cru/attr.py deleted file mode 100644 index d4cc86a..0000000 --- a/tools/cru-py/cru/attr.py +++ /dev/null @@ -1,364 +0,0 @@ -from __future__ import annotations - -import copy -from collections.abc import Callable, Iterable -from dataclasses import dataclass, field -from typing import Any - -from .list import CruUniqueKeyList -from ._type import CruTypeSet -from ._const import CruNotFound, CruUseDefault, CruDontChange -from ._iter import CruIterator - - -@dataclass -class CruAttr: - -    name: str -    value: Any -    description: str | None - -    @staticmethod -    def make( -        name: str, value: Any = CruUseDefault.VALUE, description: str | None = None -    ) -> CruAttr: -        return CruAttr(name, value, description) - - -CruAttrDefaultFactory = Callable[["CruAttrDef"], Any] -CruAttrTransformer = Callable[[Any, "CruAttrDef"], Any] -CruAttrValidator = Callable[[Any, "CruAttrDef"], None] - - -@dataclass -class CruAttrDef: -    name: str -    description: str -    default_factory: CruAttrDefaultFactory -    transformer: CruAttrTransformer -    validator: CruAttrValidator - -    def __init__( -        self, -        name: str, -        description: str, -        default_factory: CruAttrDefaultFactory, -        transformer: CruAttrTransformer, -        validator: CruAttrValidator, -    ) -> None: -        self.name = name -        self.description = description -        self.default_factory = default_factory -        self.transformer = transformer -        self.validator = validator - -    def transform(self, value: Any) -> Any: -        if self.transformer is not None: -            return self.transformer(value, self) -        return value - -    def validate(self, value: Any, /, force_allow_none: bool = False) -> None: -        if force_allow_none is value is None: -            return -        if self.validator is not None: -            self.validator(value, self) - -    def transform_and_validate( -        self, value: Any, /, force_allow_none: bool = False -    ) -> Any: -        value = self.transform(value) -        self.validate(value, force_allow_none) -        return value - -    def make_default_value(self) -> Any: -        return self.transform_and_validate(self.default_factory(self)) - -    def adopt(self, attr: CruAttr) -> CruAttr: -        attr = copy.deepcopy(attr) - -        if attr.name is None: -            attr.name = self.name -        elif attr.name != self.name: -            raise ValueError(f"Attr name is not match: {attr.name} != {self.name}") - -        if attr.value is CruUseDefault.VALUE: -            attr.value = self.make_default_value() -        else: -            attr.value = self.transform_and_validate(attr.value) - -        if attr.description is None: -            attr.description = self.description - -        return attr - -    def make( -        self, value: Any = CruUseDefault.VALUE, description: None | str = None -    ) -> CruAttr: -        value = self.make_default_value() if value is CruUseDefault.VALUE else value -        value = self.transform_and_validate(value) -        return CruAttr( -            self.name, -            value, -            description if description is not None else self.description, -        ) - - -@dataclass -class CruAttrDefBuilder: - -    name: str -    description: str -    types: list[type] | None = field(default=None) -    allow_none: bool = field(default=False) -    default: Any = field(default=CruUseDefault.VALUE) -    default_factory: CruAttrDefaultFactory | None = field(default=None) -    auto_list: bool = field(default=False) -    transformers: list[CruAttrTransformer] = field(default_factory=list) -    validators: list[CruAttrValidator] = field(default_factory=list) -    override_transformer: CruAttrTransformer | None = field(default=None) -    override_validator: CruAttrValidator | None = field(default=None) - -    build_hook: Callable[[CruAttrDef], None] | None = field(default=None) - -    def __init__(self, name: str, description: str) -> None: -        super().__init__() -        self.name = name -        self.description = description - -    def auto_adjust_default(self) -> None: -        if self.default is not CruUseDefault.VALUE and self.default is not None: -            return -        if self.allow_none and self.default is CruUseDefault.VALUE: -            self.default = None -        if not self.allow_none and self.default is None: -            self.default = CruUseDefault.VALUE -        if self.auto_list and not self.allow_none: -            self.default = [] - -    def with_name(self, name: str | CruDontChange) -> CruAttrDefBuilder: -        if name is not CruDontChange.VALUE: -            self.name = name -        return self - -    def with_description( -        self, default_description: str | CruDontChange -    ) -> CruAttrDefBuilder: -        if default_description is not CruDontChange.VALUE: -            self.description = default_description -        return self - -    def with_default(self, default: Any) -> CruAttrDefBuilder: -        if default is not CruDontChange.VALUE: -            self.default = default -        return self - -    def with_default_factory( -        self, -        default_factory: CruAttrDefaultFactory | CruDontChange, -    ) -> CruAttrDefBuilder: -        if default_factory is not CruDontChange.VALUE: -            self.default_factory = default_factory -        return self - -    def with_types( -        self, -        types: Iterable[type] | None | CruDontChange, -    ) -> CruAttrDefBuilder: -        if types is not CruDontChange.VALUE: -            self.types = None if types is None else list(types) -        return self - -    def with_allow_none(self, allow_none: bool | CruDontChange) -> CruAttrDefBuilder: -        if allow_none is not CruDontChange.VALUE: -            self.allow_none = allow_none -        return self - -    def with_auto_list( -        self, auto_list: bool | CruDontChange = True -    ) -> CruAttrDefBuilder: -        if auto_list is not CruDontChange.VALUE: -            self.auto_list = auto_list -        return self - -    def with_constraint( -        self, -        /, -        allow_none: bool | CruDontChange = CruDontChange.VALUE, -        types: Iterable[type] | None | CruDontChange = CruDontChange.VALUE, -        default: Any = CruDontChange.VALUE, -        default_factory: CruAttrDefaultFactory | CruDontChange = CruDontChange.VALUE, -        auto_list: bool | CruDontChange = CruDontChange.VALUE, -    ) -> CruAttrDefBuilder: -        return ( -            self.with_allow_none(allow_none) -            .with_types(types) -            .with_default(default) -            .with_default_factory(default_factory) -            .with_auto_list(auto_list) -        ) - -    def add_transformer(self, transformer: CruAttrTransformer) -> CruAttrDefBuilder: -        self.transformers.append(transformer) -        return self - -    def clear_transformers(self) -> CruAttrDefBuilder: -        self.transformers.clear() -        return self - -    def add_validator(self, validator: CruAttrValidator) -> CruAttrDefBuilder: -        self.validators.append(validator) -        return self - -    def clear_validators(self) -> CruAttrDefBuilder: -        self.validators.clear() -        return self - -    def with_override_transformer( -        self, override_transformer: CruAttrTransformer | None | CruDontChange -    ) -> CruAttrDefBuilder: -        if override_transformer is not CruDontChange.VALUE: -            self.override_transformer = override_transformer -        return self - -    def with_override_validator( -        self, override_validator: CruAttrValidator | None | CruDontChange -    ) -> CruAttrDefBuilder: -        if override_validator is not CruDontChange.VALUE: -            self.override_validator = override_validator -        return self - -    def is_valid(self) -> tuple[bool, str]: -        if not isinstance(self.name, str): -            return False, "Name must be a string!" -        if not isinstance(self.description, str): -            return False, "Default description must be a string!" -        if ( -            not self.allow_none -            and self.default is None -            and self.default_factory is None -        ): -            return False, "Default must be set if allow_none is False!" -        return True, "" - -    @staticmethod -    def _build( -        builder: CruAttrDefBuilder, auto_adjust_default: bool = True -    ) -> CruAttrDef: -        if auto_adjust_default: -            builder.auto_adjust_default() - -        valid, err = builder.is_valid() -        if not valid: -            raise ValueError(err) - -        def composed_transformer(value: Any, attr_def: CruAttrDef) -> Any: -            def transform_value(single_value: Any) -> Any: -                for transformer in builder.transformers: -                    single_value = transformer(single_value, attr_def) -                return single_value - -            if builder.auto_list: -                if not isinstance(value, list): -                    value = [value] -                value = CruIterator(value).transform(transform_value).to_list() - -            else: -                value = transform_value(value) -            return value - -        type_set = None if builder.types is None else CruTypeSet(*builder.types) - -        def composed_validator(value: Any, attr_def: CruAttrDef): -            def validate_value(single_value: Any) -> None: -                if type_set is not None: -                    type_set.check_value(single_value, allow_none=builder.allow_none) -                for validator in builder.validators: -                    validator(single_value, attr_def) - -            if builder.auto_list: -                CruIterator(value).foreach(validate_value) -            else: -                validate_value(value) - -        real_transformer = builder.override_transformer or composed_transformer -        real_validator = builder.override_validator or composed_validator - -        default_factory = builder.default_factory -        if default_factory is None: - -            def default_factory(_d): -                return copy.deepcopy(builder.default) - -        d = CruAttrDef( -            builder.name, -            builder.description, -            default_factory, -            real_transformer, -            real_validator, -        ) -        if builder.build_hook: -            builder.build_hook(d) -        return d - -    def build(self, auto_adjust_default=True) -> CruAttrDef: -        c = copy.deepcopy(self) -        self.build_hook = None -        return CruAttrDefBuilder._build(c, auto_adjust_default) - - -class CruAttrDefRegistry(CruUniqueKeyList[CruAttrDef, str]): - -    def __init__(self) -> None: -        super().__init__(lambda d: d.name) - -    def make_builder(self, name: str, default_description: str) -> CruAttrDefBuilder: -        b = CruAttrDefBuilder(name, default_description) -        b.build_hook = lambda a: self.add(a) -        return b - -    def adopt(self, attr: CruAttr) -> CruAttr: -        d = self.get(attr.name) -        return d.adopt(attr) - - -class CruAttrTable(CruUniqueKeyList[CruAttr, str]): -    def __init__(self, registry: CruAttrDefRegistry) -> None: -        self._registry: CruAttrDefRegistry = registry -        super().__init__(lambda a: a.name, before_add=registry.adopt) - -    @property -    def registry(self) -> CruAttrDefRegistry: -        return self._registry - -    def get_value_or(self, name: str, fallback: Any = CruNotFound.VALUE) -> Any: -        a = self.get_or(name, CruNotFound.VALUE) -        if a is CruNotFound.VALUE: -            return fallback -        return a.value - -    def get_value(self, name: str) -> Any: -        a = self.get(name) -        return a.value - -    def make_attr( -        self, -        name: str, -        value: Any = CruUseDefault.VALUE, -        /, -        description: str | None = None, -    ) -> CruAttr: -        d = self._registry.get(name) -        return d.make(value, description or d.description) - -    def add_value( -        self, -        name: str, -        value: Any = CruUseDefault.VALUE, -        /, -        description: str | None = None, -        *, -        replace: bool = False, -    ) -> CruAttr: -        attr = self.make_attr(name, value, description) -        self.add(attr, replace) -        return attr diff --git a/tools/cru-py/cru/config.py b/tools/cru-py/cru/config.py deleted file mode 100644 index 0f6f0d0..0000000 --- a/tools/cru-py/cru/config.py +++ /dev/null @@ -1,196 +0,0 @@ -from __future__ import annotations - -from typing import Any, TypeVar, Generic - -from ._error import CruException -from .list import CruUniqueKeyList -from .value import ( -    INTEGER_VALUE_TYPE, -    TEXT_VALUE_TYPE, -    CruValueTypeError, -    ValueGeneratorBase, -    ValueType, -) - -_T = TypeVar("_T") - - -class CruConfigError(CruException): -    def __init__(self, message: str, item: ConfigItem, *args, **kwargs): -        super().__init__(message, *args, **kwargs) -        self._item = item - -    @property -    def item(self) -> ConfigItem: -        return self._item - - -class ConfigItem(Generic[_T]): -    def __init__( -        self, -        name: str, -        description: str, -        value_type: ValueType[_T], -        value: _T | None = None, -        /, -        default: ValueGeneratorBase[_T] | _T | None = None, -    ) -> None: -        self._name = name -        self._description = description -        self._value_type = value_type -        self._value = value -        self._default = default - -    @property -    def name(self) -> str: -        return self._name - -    @property -    def description(self) -> str: -        return self._description - -    @property -    def value_type(self) -> ValueType[_T]: -        return self._value_type - -    @property -    def is_set(self) -> bool: -        return self._value is not None - -    @property -    def value(self) -> _T: -        if self._value is None: -            raise CruConfigError( -                "Config value is not set.", -                self, -                user_message=f"Config item {self.name} is not set.", -            ) -        return self._value - -    @property -    def value_str(self) -> str: -        return self.value_type.convert_value_to_str(self.value) - -    def set_value(self, v: _T | str, allow_convert_from_str=False): -        if allow_convert_from_str: -            self._value = self.value_type.check_value_or_try_convert_from_str(v) -        else: -            self._value = self.value_type.check_value(v) - -    def reset(self): -        self._value = None - -    @property -    def default(self) -> ValueGeneratorBase[_T] | _T | None: -        return self._default - -    @property -    def can_generate_default(self) -> bool: -        return self.default is not None - -    def generate_default_value(self) -> _T: -        if self.default is None: -            raise CruConfigError( -                "Config item does not support default value generation.", self -            ) -        elif isinstance(self.default, ValueGeneratorBase): -            v = self.default.generate() -        else: -            v = self.default -        try: -            self.value_type.check_value(v) -            return v -        except CruValueTypeError as e: -            raise CruConfigError( -                "Config value generator returns an invalid value.", self -            ) from e - -    def copy(self) -> "ConfigItem": -        return ConfigItem( -            self.name, -            self.description, -            self.value_type, -            self.value, -            self.default, -        ) - -    @property -    def description_str(self) -> str: -        return f"{self.name} ({self.value_type.name}): {self.description}" - - -class Configuration(CruUniqueKeyList[ConfigItem[Any], str]): -    def __init__(self): -        super().__init__(lambda c: c.name) - -    def get_set_items(self) -> list[ConfigItem[Any]]: -        return [item for item in self if item.is_set] - -    def get_unset_items(self) -> list[ConfigItem[Any]]: -        return [item for item in self if not item.is_set] - -    @property -    def all_set(self) -> bool: -        return len(self.get_unset_items()) == 0 - -    @property -    def all_not_set(self) -> bool: -        return len(self.get_set_items()) == 0 - -    def add_text_config( -        self, -        name: str, -        description: str, -        value: str | None = None, -        default: ValueGeneratorBase[str] | str | None = None, -    ) -> ConfigItem[str]: -        item = ConfigItem(name, description, TEXT_VALUE_TYPE, value, default) -        self.add(item) -        return item - -    def add_int_config( -        self, -        name: str, -        description: str, -        value: int | None = None, -        default: ValueGeneratorBase[int] | int | None = None, -    ) -> ConfigItem[int]: -        item = ConfigItem(name, description, INTEGER_VALUE_TYPE, value, default) -        self.add(item) -        return item - -    def set_config_item( -        self, -        name: str, -        value: Any | str, -        allow_convert_from_str=True, -    ) -> None: -        item = self.get(name) -        item.set_value( -            value, -            allow_convert_from_str=allow_convert_from_str, -        ) - -    def reset_all(self) -> None: -        for item in self: -            item.reset() - -    def to_dict(self) -> dict[str, Any]: -        return {item.name: item.value for item in self} - -    def to_str_dict(self) -> dict[str, str]: -        return { -            item.name: item.value_type.convert_value_to_str(item.value) for item in self -        } - -    def set_value_dict( -        self, -        value_dict: dict[str, Any], -        allow_convert_from_str: bool = False, -    ) -> None: -        for name, value in value_dict.items(): -            item = self.get(name) -            item.set_value( -                value, -                allow_convert_from_str=allow_convert_from_str, -            ) diff --git a/tools/cru-py/cru/list.py b/tools/cru-py/cru/list.py deleted file mode 100644 index 216a561..0000000 --- a/tools/cru-py/cru/list.py +++ /dev/null @@ -1,160 +0,0 @@ -from __future__ import annotations - -from collections.abc import Callable, Iterator -from typing import Any, Generic, Iterable, TypeAlias, TypeVar, overload - -from ._error import CruInternalError -from ._iter import CruIterator -from ._const import CruNotFound - -_T = TypeVar("_T") -_O = TypeVar("_O") - - -class CruListEdit(CruIterator[_T]): -    def __init__(self, iterable: Iterable[_T], _list: CruList[Any]) -> None: -        super().__init__(iterable) -        self._list = _list - -    def create_me(self, iterable: Iterable[_O]) -> CruListEdit[_O]: -        return CruListEdit(iterable, self._list) - -    @property -    def list(self) -> CruList[Any]: -        return self._list - -    def done(self) -> CruList[Any]: -        self._list.reset(self) -        return self._list - - -class CruList(list[_T]): -    def reset(self, new_values: Iterable[_T]): -        if self is new_values: -            new_values = list(new_values) -        self.clear() -        self.extend(new_values) -        return self - -    def as_cru_iterator(self) -> CruIterator[_T]: -        return CruIterator(self) - -    @staticmethod -    def make(maybe_list: Iterable[_T] | _T | None) -> CruList[_T]: -        if maybe_list is None: -            return CruList() -        if isinstance(maybe_list, Iterable): -            return CruList(maybe_list) -        return CruList([maybe_list]) - - -_K = TypeVar("_K") - -_KeyGetter: TypeAlias = Callable[[_T], _K] - - -class CruUniqueKeyList(Generic[_T, _K]): -    def __init__( -        self, -        key_getter: _KeyGetter[_T, _K], -        *, -        before_add: Callable[[_T], _T] | None = None, -    ): -        super().__init__() -        self._key_getter = key_getter -        self._before_add = before_add -        self._list: CruList[_T] = CruList() - -    @property -    def key_getter(self) -> _KeyGetter[_T, _K]: -        return self._key_getter - -    @property -    def internal_list(self) -> CruList[_T]: -        return self._list - -    def validate_self(self): -        keys = self._list.transform(self._key_getter) -        if len(keys) != len(set(keys)): -            raise CruInternalError("Duplicate keys!") - -    @overload -    def get_or( -        self, key: _K, fallback: CruNotFound = CruNotFound.VALUE -    ) -> _T | CruNotFound: ... - -    @overload -    def get_or(self, key: _K, fallback: _O) -> _T | _O: ... - -    def get_or( -        self, key: _K, fallback: _O | CruNotFound = CruNotFound.VALUE -    ) -> _T | _O | CruNotFound: -        return ( -            self._list.as_cru_iterator() -            .filter(lambda v: key == self._key_getter(v)) -            .first_or(fallback) -        ) - -    def get(self, key: _K) -> _T: -        value = self.get_or(key) -        if value is CruNotFound.VALUE: -            raise KeyError(f"Key {key} not found!") -        return value  # type: ignore - -    @property -    def keys(self) -> Iterable[_K]: -        return self._list.as_cru_iterator().map(self._key_getter) - -    def has_key(self, key: _K) -> bool: -        return self.get_or(key) != CruNotFound.VALUE - -    def try_remove(self, key: _K) -> bool: -        value = self.get_or(key) -        if value is CruNotFound.VALUE: -            return False -        self._list.remove(value) -        return True - -    def remove(self, key: _K, allow_absence: bool = False) -> None: -        if not self.try_remove(key) and not allow_absence: -            raise KeyError(f"Key {key} not found!") - -    def add(self, value: _T, /, replace: bool = False) -> None: -        v = self.get_or(self._key_getter(value)) -        if v is not CruNotFound.VALUE: -            if not replace: -                raise KeyError(f"Key {self._key_getter(v)} already exists!") -            self._list.remove(v) -        if self._before_add is not None: -            value = self._before_add(value) -        self._list.append(value) - -    def set(self, value: _T) -> None: -        self.add(value, True) - -    def extend(self, iterable: Iterable[_T], /, replace: bool = False) -> None: -        values = list(iterable) -        to_remove = [] -        for value in values: -            v = self.get_or(self._key_getter(value)) -            if v is not CruNotFound.VALUE: -                if not replace: -                    raise KeyError(f"Key {self._key_getter(v)} already exists!") -                to_remove.append(v) -        for value in to_remove: -            self._list.remove(value) -        if self._before_add is not None: -            values = [self._before_add(value) for value in values] -        self._list.extend(values) - -    def clear(self) -> None: -        self._list.reset([]) - -    def __iter__(self) -> Iterator[_T]: -        return iter(self._list) - -    def __len__(self) -> int: -        return len(self._list) - -    def cru_iter(self) -> CruIterator[_T]: -        return CruIterator(self._list) diff --git a/tools/cru-py/cru/parsing.py b/tools/cru-py/cru/parsing.py deleted file mode 100644 index c31ce35..0000000 --- a/tools/cru-py/cru/parsing.py +++ /dev/null @@ -1,290 +0,0 @@ -from __future__ import annotations - -from abc import ABCMeta, abstractmethod -from dataclasses import dataclass -from enum import Enum -from typing import NamedTuple, TypeAlias, TypeVar, Generic, NoReturn, Callable - -from ._error import CruException -from ._iter import CruIterable - -_T = TypeVar("_T") - - -class StrParseStream: -    class MemStackEntry(NamedTuple): -        pos: int -        lineno: int - -    class MemStackPopStr(NamedTuple): -        text: str -        lineno: int - -    def __init__(self, text: str) -> None: -        self._text = text -        self._pos = 0 -        self._lineno = 1 -        self._length = len(self._text) -        self._valid_pos_range = range(0, self.length + 1) -        self._valid_offset_range = range(-self.length, self.length + 1) -        self._mem_stack: CruIterable.IterList[StrParseStream.MemStackEntry] = ( -            CruIterable.IterList() -        ) - -    @property -    def text(self) -> str: -        return self._text - -    @property -    def length(self) -> int: -        return self._length - -    @property -    def valid_pos_range(self) -> range: -        return self._valid_pos_range - -    @property -    def valid_offset_range(self) -> range: -        return self._valid_offset_range - -    @property -    def pos(self) -> int: -        return self._pos - -    @property -    def lineno(self) -> int: -        return self._lineno - -    @property -    def eof(self) -> bool: -        return self._pos == self.length - -    def peek(self, length: int) -> str: -        real_length = min(length, self.length - self._pos) -        new_position = self._pos + real_length -        text = self._text[self._pos : new_position] -        return text - -    def read(self, length: int) -> str: -        text = self.peek(length) -        self._pos += len(text) -        self._lineno += text.count("\n") -        return text - -    def skip(self, length: int) -> None: -        self.read(length) - -    def peek_str(self, text: str) -> bool: -        if self.pos + len(text) > self.length: -            return False -        for offset in range(len(text)): -            if self._text[self.pos + offset] != text[offset]: -                return False -        return True - -    def read_str(self, text: str) -> bool: -        if not self.peek_str(text): -            return False -        self._pos += len(text) -        self._lineno += text.count("\n") -        return True - -    @property -    def mem_stack(self) -> CruIterable.IterList[MemStackEntry]: -        return self._mem_stack - -    def push_mem(self) -> None: -        self.mem_stack.append(self.MemStackEntry(self.pos, self.lineno)) - -    def pop_mem(self) -> MemStackEntry: -        return self.mem_stack.pop() - -    def pop_mem_str(self, strip_end: int = 0) -> MemStackPopStr: -        old = self.pop_mem() -        assert self.pos >= old.pos -        return self.MemStackPopStr( -            self._text[old.pos : self.pos - strip_end], old.lineno -        ) - - -class ParseError(CruException, Generic[_T]): -    def __init__( -        self, -        message, -        parser: Parser[_T], -        text: str, -        line_number: int | None = None, -        *args, -        **kwargs, -    ): -        super().__init__(message, *args, **kwargs) -        self._parser = parser -        self._text = text -        self._line_number = line_number - -    @property -    def parser(self) -> Parser[_T]: -        return self._parser - -    @property -    def text(self) -> str: -        return self._text - -    @property -    def line_number(self) -> int | None: -        return self._line_number - - -class Parser(Generic[_T], metaclass=ABCMeta): -    def __init__(self, name: str) -> None: -        self._name = name - -    @property -    def name(self) -> str: -        return self._name - -    @abstractmethod -    def parse(self, s: str) -> _T: -        raise NotImplementedError() - -    def raise_parse_exception( -        self, text: str, line_number: int | None = None -    ) -> NoReturn: -        a = line_number and f" at line {line_number}" or "" -        raise ParseError(f"Parser {self.name} failed{a}.", self, text, line_number) - - -class SimpleLineConfigParserEntry(NamedTuple): -    key: str -    value: str -    line_number: int | None = None - - -class SimpleLineConfigParserResult(CruIterable.IterList[SimpleLineConfigParserEntry]): -    pass - - -class SimpleLineConfigParser(Parser[SimpleLineConfigParserResult]): -    """ -    The parsing result is a list of tuples (key, value, line number). -    """ - -    Entry: TypeAlias = SimpleLineConfigParserEntry -    Result: TypeAlias = SimpleLineConfigParserResult - -    def __init__(self) -> None: -        super().__init__(type(self).__name__) - -    def _parse(self, text: str, callback: Callable[[Entry], None]) -> None: -        for ln, line in enumerate(text.splitlines()): -            line_number = ln + 1 -            # check if it's a comment -            if line.strip().startswith("#"): -                continue -            # check if there is a '=' -            if line.find("=") == -1: -                self.raise_parse_exception("There is even no '='!", line_number) -            # split at first '=' -            key, value = line.split("=", 1) -            key = key.strip() -            value = value.strip() -            callback(SimpleLineConfigParserEntry(key, value, line_number)) - -    def parse(self, text: str) -> Result: -        result = SimpleLineConfigParserResult() -        self._parse(text, lambda item: result.append(item)) -        return result - - -class _StrWrapperVarParserTokenKind(Enum): -    TEXT = "TEXT" -    VAR = "VAR" - - -@dataclass -class _StrWrapperVarParserToken: -    kind: _StrWrapperVarParserTokenKind -    value: str -    line_number: int - -    @property -    def is_text(self) -> bool: -        return self.kind is _StrWrapperVarParserTokenKind.TEXT - -    @property -    def is_var(self) -> bool: -        return self.kind is _StrWrapperVarParserTokenKind.VAR - -    @staticmethod -    def from_mem_str( -        kind: _StrWrapperVarParserTokenKind, mem_str: StrParseStream.MemStackPopStr -    ) -> _StrWrapperVarParserToken: -        return _StrWrapperVarParserToken(kind, mem_str.text, mem_str.lineno) - -    def __repr__(self) -> str: -        return f"VAR: {self.value}" if self.is_var else "TEXT: ..." - - -class _StrWrapperVarParserResult(CruIterable.IterList[_StrWrapperVarParserToken]): -    pass - - -class StrWrapperVarParser(Parser[_StrWrapperVarParserResult]): -    TokenKind: TypeAlias = _StrWrapperVarParserTokenKind -    Token: TypeAlias = _StrWrapperVarParserToken -    Result: TypeAlias = _StrWrapperVarParserResult - -    def __init__(self, wrapper: str): -        super().__init__(f"StrWrapperVarParser({wrapper})") -        self._wrapper = wrapper - -    @property -    def wrapper(self) -> str: -        return self._wrapper - -    def parse(self, text: str) -> Result: -        result = self.Result() - -        class _State(Enum): -            TEXT = "TEXT" -            VAR = "VAR" - -        state = _State.TEXT -        stream = StrParseStream(text) -        stream.push_mem() - -        while True: -            if stream.eof: -                break - -            if stream.read_str(self.wrapper): -                if state is _State.TEXT: -                    result.append( -                        self.Token.from_mem_str( -                            self.TokenKind.TEXT, stream.pop_mem_str(len(self.wrapper)) -                        ) -                    ) -                    state = _State.VAR -                    stream.push_mem() -                else: -                    result.append( -                        self.Token.from_mem_str( -                            self.TokenKind.VAR, -                            stream.pop_mem_str(len(self.wrapper)), -                        ) -                    ) -                    state = _State.TEXT -                    stream.push_mem() - -                continue - -            stream.skip(1) - -        if state is _State.VAR: -            raise ParseError("Text ended without closing variable.", self, text) - -        mem_str = stream.pop_mem_str() -        if len(mem_str.text) != 0: -            result.append(self.Token.from_mem_str(self.TokenKind.TEXT, mem_str)) - -        return result diff --git a/tools/cru-py/cru/service/__init__.py b/tools/cru-py/cru/service/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/tools/cru-py/cru/service/__init__.py +++ /dev/null diff --git a/tools/cru-py/cru/service/__main__.py b/tools/cru-py/cru/service/__main__.py deleted file mode 100644 index 1c10e82..0000000 --- a/tools/cru-py/cru/service/__main__.py +++ /dev/null @@ -1,20 +0,0 @@ -from cru import CruException - -from ._app import create_app - - -def main(): -    app = create_app() -    app.run_command() - - -if __name__ == "__main__": -    try: -        main() -    except CruException as e: -        user_message = e.get_user_message() -        if user_message is not None: -            print(f"Error: {user_message}") -            exit(1) -        else: -            raise diff --git a/tools/cru-py/cru/service/_app.py b/tools/cru-py/cru/service/_app.py deleted file mode 100644 index 6030dad..0000000 --- a/tools/cru-py/cru/service/_app.py +++ /dev/null @@ -1,34 +0,0 @@ -from ._base import ( -    AppBase, -    CommandDispatcher, -    AppInitializer, -    PathCommandProvider, -) -from ._config import ConfigManager -from ._template import TemplateManager -from ._nginx import NginxManager -from ._external import CliToolCommandProvider - -APP_ID = "crupest" - - -class App(AppBase): -    def __init__(self): -        super().__init__(APP_ID, f"{APP_ID}-service") -        self.add_feature(PathCommandProvider()) -        self.add_feature(AppInitializer()) -        self.add_feature(ConfigManager()) -        self.add_feature(TemplateManager()) -        self.add_feature(NginxManager()) -        self.add_feature(CliToolCommandProvider()) -        self.add_feature(CommandDispatcher()) - -    def run_command(self): -        command_dispatcher = self.get_feature(CommandDispatcher) -        command_dispatcher.run_command() - - -def create_app() -> App: -    app = App() -    app.setup() -    return app diff --git a/tools/cru-py/cru/service/_base.py b/tools/cru-py/cru/service/_base.py deleted file mode 100644 index ad813c9..0000000 --- a/tools/cru-py/cru/service/_base.py +++ /dev/null @@ -1,449 +0,0 @@ -from __future__ import annotations - -from argparse import ArgumentParser, Namespace -from abc import ABC, abstractmethod -import argparse -import os -from pathlib import Path -from typing import TypeVar, overload - -from cru import CruException, CruLogicError - -_Feature = TypeVar("_Feature", bound="AppFeatureProvider") - - -class AppError(CruException): -    pass - - -class AppFeatureError(AppError): -    def __init__(self, message, feature: type | str, *args, **kwargs): -        super().__init__(message, *args, **kwargs) -        self._feature = feature - -    @property -    def feature(self) -> type | str: -        return self._feature - - -class AppPathError(CruException): -    def __init__(self, message, _path: str | Path, *args, **kwargs): -        super().__init__(message, *args, **kwargs) -        self._path = str(_path) - -    @property -    def path(self) -> str: -        return self._path - - -class AppPath(ABC): -    def __init__(self, id: str, is_dir: bool, description: str) -> None: -        self._is_dir = is_dir -        self._id = id -        self._description = description - -    @property -    @abstractmethod -    def parent(self) -> AppPath | None: ... - -    @property -    @abstractmethod -    def app(self) -> AppBase: ... - -    @property -    def id(self) -> str: -        return self._id - -    @property -    def description(self) -> str: -        return self._description - -    @property -    def is_dir(self) -> bool: -        return self._is_dir - -    @property -    @abstractmethod -    def full_path(self) -> Path: ... - -    @property -    def full_path_str(self) -> str: -        return str(self.full_path) - -    def check_parents(self, must_exist: bool = False) -> bool: -        for p in reversed(self.full_path.parents): -            if not p.exists() and not must_exist: -                return False -            if not p.is_dir(): -                raise AppPathError("Parents' path must be a dir.", self.full_path) -        return True - -    def check_self(self, must_exist: bool = False) -> bool: -        if not self.check_parents(must_exist): -            return False -        if not self.full_path.exists(): -            if not must_exist: -                return False -            raise AppPathError("Not exist.", self.full_path) -        if self.is_dir: -            if not self.full_path.is_dir(): -                raise AppPathError("Should be a directory, but not.", self.full_path) -            else: -                return True -        else: -            if not self.full_path.is_file(): -                raise AppPathError("Should be a file, but not.", self.full_path) -            else: -                return True - -    def ensure(self, create_file: bool = False) -> None: -        e = self.check_self(False) -        if not e: -            os.makedirs(self.full_path.parent, exist_ok=True) -            if self.is_dir: -                os.mkdir(self.full_path) -            elif create_file: -                with open(self.full_path, "w") as f: -                    f.write("") - -    def add_subpath( -        self, -        name: str, -        is_dir: bool, -        /, -        id: str | None = None, -        description: str = "", -    ) -> AppFeaturePath: -        return self.app.add_path(name, is_dir, self, id, description) - -    @property -    def app_relative_path(self) -> Path: -        return self.full_path.relative_to(self.app.root.full_path) - - -class AppFeaturePath(AppPath): -    def __init__( -        self, -        parent: AppPath, -        name: str, -        is_dir: bool, -        /, -        id: str | None = None, -        description: str = "", -    ) -> None: -        super().__init__(id or name, is_dir, description) -        self._name = name -        self._parent = parent - -    @property -    def name(self) -> str: -        return self._name - -    @property -    def parent(self) -> AppPath: -        return self._parent - -    @property -    def app(self) -> AppBase: -        return self.parent.app - -    @property -    def full_path(self) -> Path: -        return Path(self.parent.full_path, self.name).resolve() - - -class AppRootPath(AppPath): -    def __init__(self, app: AppBase): -        super().__init__("root", True, "Application root path.") -        self._app = app -        self._full_path: Path | None = None - -    @property -    def parent(self) -> None: -        return None - -    @property -    def app(self) -> AppBase: -        return self._app - -    @property -    def full_path(self) -> Path: -        if self._full_path is None: -            raise AppError("App root path is not set yet.") -        return self._full_path - -    def setup(self, path: os.PathLike) -> None: -        if self._full_path is not None: -            raise AppError("App root path is already set.") -        self._full_path = Path(path).resolve() - - -class AppFeatureProvider(ABC): -    def __init__(self, name: str, /, app: AppBase | None = None): -        super().__init__() -        self._name = name -        self._app = app if app else AppBase.get_instance() - -    @property -    def app(self) -> AppBase: -        return self._app - -    @property -    def name(self) -> str: -        return self._name - -    @abstractmethod -    def setup(self) -> None: ... - - -class AppCommandFeatureProvider(AppFeatureProvider): -    @abstractmethod -    def get_command_info(self) -> tuple[str, str]: ... - -    @abstractmethod -    def setup_arg_parser(self, arg_parser: ArgumentParser): ... - -    @abstractmethod -    def run_command(self, args: Namespace) -> None: ... - - -DATA_DIR_NAME = "data" - - -class PathCommandProvider(AppCommandFeatureProvider): -    def __init__(self) -> None: -        super().__init__("path-command-provider") - -    def setup(self): -        pass - -    def get_command_info(self): -        return ("path", "Get information about paths used by app.") - -    def setup_arg_parser(self, arg_parser: ArgumentParser) -> None: -        subparsers = arg_parser.add_subparsers( -            dest="path_command", required=True, metavar="PATH_COMMAND" -        ) -        _list_parser = subparsers.add_parser( -            "list", help="list special paths used by app" -        ) - -    def run_command(self, args: Namespace) -> None: -        if args.path_command == "list": -            for path in self.app.paths: -                print(f"{path.app_relative_path.as_posix()}: {path.description}") - - -class CommandDispatcher(AppFeatureProvider): -    def __init__(self) -> None: -        super().__init__("command-dispatcher") -        self._parsed_args: argparse.Namespace | None = None - -    def setup_arg_parser(self) -> None: -        epilog = """ -==> to start, -./tools/manage init -./tools/manage config init -ln -s generated/docker-compose.yaml . -# Then edit config file. - -==> to update -git pull -./tools/manage template generate --no-dry-run -docker compose up -        """.strip() - -        self._map: dict[str, AppCommandFeatureProvider] = {} -        arg_parser = argparse.ArgumentParser( -            description="Service management", -            formatter_class=argparse.RawDescriptionHelpFormatter, -            epilog=epilog, -        ) -        arg_parser.add_argument( -            "--project-dir", -            help="The path of the project directory.", -            required=True, -            type=str, -        ) -        subparsers = arg_parser.add_subparsers( -            dest="command", -            help="The management command to execute.", -            metavar="COMMAND", -        ) -        for feature in self.app.features: -            if isinstance(feature, AppCommandFeatureProvider): -                info = feature.get_command_info() -                command_subparser = subparsers.add_parser(info[0], help=info[1]) -                feature.setup_arg_parser(command_subparser) -                self._map[info[0]] = feature -        self._arg_parser = arg_parser - -    def setup(self): -        pass - -    @property -    def arg_parser(self) -> argparse.ArgumentParser: -        return self._arg_parser - -    @property -    def map(self) -> dict[str, AppCommandFeatureProvider]: -        return self._map - -    def get_program_parsed_args(self) -> argparse.Namespace: -        if self._parsed_args is None: -            self._parsed_args = self.arg_parser.parse_args() -        return self._parsed_args - -    def run_command(self, args: argparse.Namespace | None = None) -> None: -        real_args = args or self.get_program_parsed_args() -        if real_args.command is None: -            self.arg_parser.print_help() -            return -        self.map[real_args.command].run_command(real_args) - - -class AppInitializer(AppCommandFeatureProvider): -    def __init__(self) -> None: -        super().__init__("app-initializer") - -    def _init_app(self) -> bool: -        if self.app.app_initialized: -            return False -        self.app.data_dir.ensure() -        return True - -    def setup(self): -        pass - -    def get_command_info(self): -        return ("init", "Initialize the app.") - -    def setup_arg_parser(self, arg_parser): -        pass - -    def run_command(self, args): -        init = self._init_app() -        if init: -            print("App initialized successfully.") -        else: -            print("App is already initialized. Do nothing.") - - -class AppBase: -    _instance: AppBase | None = None - -    @staticmethod -    def get_instance() -> AppBase: -        if AppBase._instance is None: -            raise AppError("App instance not initialized") -        return AppBase._instance - -    def __init__(self, app_id: str, name: str): -        AppBase._instance = self -        self._app_id = app_id -        self._name = name -        self._root = AppRootPath(self) -        self._paths: list[AppFeaturePath] = [] -        self._features: list[AppFeatureProvider] = [] - -    def setup(self) -> None: -        command_dispatcher = self.get_feature(CommandDispatcher) -        command_dispatcher.setup_arg_parser() -        program_args = command_dispatcher.get_program_parsed_args() -        self.setup_root(program_args.project_dir) -        self._data_dir = self.add_path(DATA_DIR_NAME, True, id="data") -        for feature in self.features: -            feature.setup() -        for path in self.paths: -            path.check_self() - -    @property -    def app_id(self) -> str: -        return self._app_id - -    @property -    def name(self) -> str: -        return self._name - -    @property -    def root(self) -> AppRootPath: -        return self._root - -    def setup_root(self, path: os.PathLike) -> None: -        self._root.setup(path) - -    @property -    def data_dir(self) -> AppFeaturePath: -        return self._data_dir - -    @property -    def app_initialized(self) -> bool: -        return self.data_dir.check_self() - -    def ensure_app_initialized(self) -> AppRootPath: -        if not self.app_initialized: -            raise AppError( -                user_message="Root directory does not exist. " -                "Please run 'init' to create one." -            ) -        return self.root - -    @property -    def features(self) -> list[AppFeatureProvider]: -        return self._features - -    @property -    def paths(self) -> list[AppFeaturePath]: -        return self._paths - -    def add_feature(self, feature: _Feature) -> _Feature: -        for f in self.features: -            if f.name == feature.name: -                raise AppFeatureError( -                    f"Duplicate feature name: {feature.name}.", feature.name -                ) -        self._features.append(feature) -        return feature - -    def add_path( -        self, -        name: str, -        is_dir: bool, -        /, -        parent: AppPath | None = None, -        id: str | None = None, -        description: str = "", -    ) -> AppFeaturePath: -        p = AppFeaturePath( -            parent or self.root, name, is_dir, id=id, description=description -        ) -        self._paths.append(p) -        return p - -    @overload -    def get_feature(self, feature: str) -> AppFeatureProvider: ... - -    @overload -    def get_feature(self, feature: type[_Feature]) -> _Feature: ... - -    def get_feature( -        self, feature: str | type[_Feature] -    ) -> AppFeatureProvider | _Feature: -        if isinstance(feature, str): -            for f in self._features: -                if f.name == feature: -                    return f -        elif isinstance(feature, type): -            for f in self._features: -                if isinstance(f, feature): -                    return f -        else: -            raise CruLogicError("Argument must be the name of feature or its class.") - -        raise AppFeatureError(f"Feature {feature} not found.", feature) - -    def get_path(self, name: str) -> AppFeaturePath: -        for p in self._paths: -            if p.id == name or p.name == name: -                return p -        raise AppPathError(f"Application path {name} not found.", name) diff --git a/tools/cru-py/cru/service/_config.py b/tools/cru-py/cru/service/_config.py deleted file mode 100644 index cbb9533..0000000 --- a/tools/cru-py/cru/service/_config.py +++ /dev/null @@ -1,444 +0,0 @@ -from collections.abc import Iterable -from typing import Any, Literal, overload - -from cru import CruException, CruNotFound -from cru.config import Configuration, ConfigItem -from cru.value import ( -    INTEGER_VALUE_TYPE, -    TEXT_VALUE_TYPE, -    CruValueTypeError, -    RandomStringValueGenerator, -    UuidValueGenerator, -) -from cru.parsing import ParseError, SimpleLineConfigParser - -from ._base import AppFeaturePath, AppCommandFeatureProvider - - -class AppConfigError(CruException): -    def __init__( -        self, message: str, configuration: Configuration, *args, **kwargs -    ) -> None: -        super().__init__(message, *args, **kwargs) -        self._configuration = configuration - -    @property -    def configuration(self) -> Configuration: -        return self._configuration - - -class AppConfigFileError(AppConfigError): -    def __init__( -        self, -        message: str, -        configuration: Configuration, -        *args, -        **kwargs, -    ) -> None: -        super().__init__(message, configuration, *args, **kwargs) - - -class AppConfigFileNotFoundError(AppConfigFileError): -    def __init__( -        self, -        message: str, -        configuration: Configuration, -        file_path: str, -        *args, -        **kwargs, -    ) -> None: -        super().__init__(message, configuration, *args, **kwargs) -        self._file_path = file_path - -    @property -    def file_path(self) -> str: -        return self._file_path - - -class AppConfigFileParseError(AppConfigFileError): -    def __init__( -        self, -        message: str, -        configuration: Configuration, -        file_content: str, -        *args, -        **kwargs, -    ) -> None: -        super().__init__(message, configuration, *args, **kwargs) -        self._file_content = file_content -        self.__cause__: ParseError - -    @property -    def file_content(self) -> str: -        return self._file_content - -    def get_user_message(self) -> str: -        return f"Error while parsing config file at line {self.__cause__.line_number}." - - -class AppConfigFileEntryError(AppConfigFileError): -    def __init__( -        self, -        message: str, -        configuration: Configuration, -        entries: Iterable[SimpleLineConfigParser.Entry], -        *args, -        **kwargs, -    ) -> None: -        super().__init__(message, configuration, *args, **kwargs) -        self._entries = list(entries) - -    @property -    def error_entries(self) -> list[SimpleLineConfigParser.Entry]: -        return self._entries - -    @staticmethod -    def entries_to_friendly_message( -        entries: Iterable[SimpleLineConfigParser.Entry], -    ) -> str: -        return "\n".join( -            f"line {entry.line_number}: {entry.key}={entry.value}" for entry in entries -        ) - -    @property -    def friendly_message_head(self) -> str: -        return "Error entries found in config file" - -    def get_user_message(self) -> str: -        return ( -            f"{self.friendly_message_head}:\n" -            f"{self.entries_to_friendly_message(self.error_entries)}" -        ) - - -class AppConfigDuplicateEntryError(AppConfigFileEntryError): -    @property -    def friendly_message_head(self) -> str: -        return "Duplicate entries found in config file" - - -class AppConfigEntryValueFormatError(AppConfigFileEntryError): -    @property -    def friendly_message_head(self) -> str: -        return "Invalid value format for entries" - - -class AppConfigItemNotSetError(AppConfigError): -    def __init__( -        self, -        message: str, -        configuration: Configuration, -        items: list[ConfigItem], -        *args, -        **kwargs, -    ) -> None: -        super().__init__(message, configuration, *args, **kwargs) -        self._items = items - - -class ConfigManager(AppCommandFeatureProvider): -    def __init__(self) -> None: -        super().__init__("config-manager") -        configuration = Configuration() -        self._configuration = configuration -        self._loaded: bool = False -        self._init_app_defined_items() - -    def _init_app_defined_items(self) -> None: -        prefix = self.config_name_prefix - -        def _add_text(name: str, description: str) -> ConfigItem: -            item = ConfigItem(f"{prefix}_{name}", description, TEXT_VALUE_TYPE) -            self.configuration.add(item) -            return item - -        def _add_uuid(name: str, description: str) -> ConfigItem: -            item = ConfigItem( -                f"{prefix}_{name}", -                description, -                TEXT_VALUE_TYPE, -                default=UuidValueGenerator(), -            ) -            self.configuration.add(item) -            return item - -        def _add_random_string( -            name: str, description: str, length: int = 32, secure: bool = True -        ) -> ConfigItem: -            item = ConfigItem( -                f"{prefix}_{name}", -                description, -                TEXT_VALUE_TYPE, -                default=RandomStringValueGenerator(length, secure), -            ) -            self.configuration.add(item) -            return item - -        def _add_int(name: str, description: str) -> ConfigItem: -            item = ConfigItem(f"{prefix}_{name}", description, INTEGER_VALUE_TYPE) -            self.configuration.add(item) -            return item - -        self._domain = _add_text("DOMAIN", "domain name") -        self._email = _add_text("EMAIL", "admin email address") -        _add_text( -            "AUTO_BACKUP_COS_SECRET_ID", -            "access key id for Tencent COS, used for auto backup", -        ) -        _add_text( -            "AUTO_BACKUP_COS_SECRET_KEY", -            "access key secret for Tencent COS, used for auto backup", -        ) -        _add_text( -            "AUTO_BACKUP_COS_ENDPOINT", -            "endpoint (cos.*.myqcloud.com) for Tencent COS, used for auto backup", -        ) -        _add_text( -            "AUTO_BACKUP_COS_BUCKET", -            "bucket name for Tencent COS, used for auto backup", -        ) -        _add_uuid("V2RAY_TOKEN", "v2ray user id") -        _add_uuid("V2RAY_PATH", "v2ray path, which will be prefixed by _") -        _add_random_string("2FAUTH_APP_KEY", "2FAuth App Key") -        _add_text("2FAUTH_MAIL_USERNAME", "2FAuth SMTP user") -        _add_text("2FAUTH_MAIL_PASSWORD", "2FAuth SMTP password") -        _add_text("GIT_SERVER_USERNAME", "Git server username") -        _add_text("GIT_SERVER_PASSWORD", "Git server password") - -    def setup(self) -> None: -        self._config_file_path = self.app.data_dir.add_subpath( -            "config", False, description="Configuration file path." -        ) - -    @property -    def config_name_prefix(self) -> str: -        return self.app.app_id.upper() - -    @property -    def configuration(self) -> Configuration: -        return self._configuration - -    @property -    def config_file_path(self) -> AppFeaturePath: -        return self._config_file_path - -    @property -    def all_set(self) -> bool: -        return self.configuration.all_set - -    def get_item(self, name: str) -> ConfigItem[Any]: -        if not name.startswith(self.config_name_prefix + "_"): -            name = f"{self.config_name_prefix}_{name}" - -        item = self.configuration.get_or(name, None) -        if item is None: -            raise AppConfigError(f"Config item '{name}' not found.", self.configuration) -        return item - -    @overload -    def get_item_value_str(self, name: str) -> str: ... - -    @overload -    def get_item_value_str(self, name: str, ensure_set: Literal[True]) -> str: ... - -    @overload -    def get_item_value_str(self, name: str, ensure_set: bool = True) -> str | None: ... - -    def get_item_value_str(self, name: str, ensure_set: bool = True) -> str | None: -        self.load_config_file() -        item = self.get_item(name) -        if not item.is_set: -            if ensure_set: -                raise AppConfigItemNotSetError( -                    f"Config item '{name}' is not set.", self.configuration, [item] -                ) -            return None -        return item.value_str - -    def get_str_dict(self, ensure_all_set: bool = True) -> dict[str, str]: -        self.load_config_file() -        if ensure_all_set and not self.configuration.all_set: -            raise AppConfigItemNotSetError( -                "Some config items are not set.", -                self.configuration, -                self.configuration.get_unset_items(), -            ) -        return self.configuration.to_str_dict() - -    @property -    def domain_item_name(self) -> str: -        return self._domain.name - -    def get_domain_value_str(self) -> str: -        return self.get_item_value_str(self._domain.name) - -    def get_email_value_str_optional(self) -> str | None: -        return self.get_item_value_str(self._email.name, ensure_set=False) - -    def _set_with_default(self) -> None: -        if not self.configuration.all_not_set: -            raise AppConfigError( -                "Config is not clean. " -                "Some config items are already set. " -                "Can't set again with default value.", -                self.configuration, -            ) -        for item in self.configuration: -            if item.can_generate_default: -                item.set_value(item.generate_default_value()) - -    def _to_config_file_content(self) -> str: -        content = "".join( -            [ -                f"{item.name}={item.value_str if item.is_set else ''}\n" -                for item in self.configuration -            ] -        ) -        return content - -    def _create_init_config_file(self) -> None: -        if self.config_file_path.check_self(): -            raise AppConfigError( -                "Config file already exists.", -                self.configuration, -                user_message=f"The config file at " -                f"{self.config_file_path.full_path_str} already exists.", -            ) -        self._set_with_default() -        self.config_file_path.ensure() -        with open( -            self.config_file_path.full_path, "w", encoding="utf-8", newline="\n" -        ) as file: -            file.write(self._to_config_file_content()) - -    def _parse_config_file(self) -> SimpleLineConfigParser.Result: -        if not self.config_file_path.check_self(): -            raise AppConfigFileNotFoundError( -                "Config file not found.", -                self.configuration, -                self.config_file_path.full_path_str, -                user_message=f"The config file at " -                f"{self.config_file_path.full_path_str} does not exist. " -                f"You can create an initial one with 'init' command.", -            ) - -        text = self.config_file_path.full_path.read_text() -        try: -            parser = SimpleLineConfigParser() -            return parser.parse(text) -        except ParseError as e: -            raise AppConfigFileParseError( -                "Failed to parse config file.", self.configuration, text -            ) from e - -    def _parse_and_print_config_file(self) -> None: -        parse_result = self._parse_config_file() -        for entry in parse_result: -            print(f"{entry.key}={entry.value}") - -    def _check_duplicate( -        self, -        parse_result: dict[str, list[SimpleLineConfigParser.Entry]], -    ) -> dict[str, SimpleLineConfigParser.Entry]: -        entry_dict: dict[str, SimpleLineConfigParser.Entry] = {} -        duplicate_entries: list[SimpleLineConfigParser.Entry] = [] -        for key, entries in parse_result.items(): -            entry_dict[key] = entries[0] -            if len(entries) > 1: -                duplicate_entries.extend(entries) -        if len(duplicate_entries) > 0: -            raise AppConfigDuplicateEntryError( -                "Duplicate entries found.", self.configuration, duplicate_entries -            ) - -        return entry_dict - -    def _check_type( -        self, entry_dict: dict[str, SimpleLineConfigParser.Entry] -    ) -> dict[str, Any]: -        value_dict: dict[str, Any] = {} -        error_entries: list[SimpleLineConfigParser.Entry] = [] -        errors: list[CruValueTypeError] = [] -        for key, entry in entry_dict.items(): -            try: -                if entry.value == "": -                    value_dict[key] = None -                else: -                    value = entry.value -                    config_item = self.configuration.get_or(key) -                    if config_item is not CruNotFound.VALUE: -                        value = config_item.value_type.convert_str_to_value(value) -                    value_dict[key] = value -            except CruValueTypeError as e: -                error_entries.append(entry) -                errors.append(e) -        if len(error_entries) > 0: -            raise AppConfigEntryValueFormatError( -                "Entry value format is not correct.", -                self.configuration, -                error_entries, -            ) from ExceptionGroup("Multiple format errors occurred.", errors) -        return value_dict - -    def _read_config_file(self) -> dict[str, Any]: -        parsed = self._parse_config_file() -        entry_groups = parsed.cru_iter().group_by(lambda e: e.key) -        entry_dict = self._check_duplicate(entry_groups) -        value_dict = self._check_type(entry_dict) -        return value_dict - -    def _real_load_config_file(self) -> None: -        self.configuration.reset_all() -        value_dict = self._read_config_file() -        for key, value in value_dict.items(): -            if value is None: -                continue -            self.configuration.set_config_item(key, value) - -    def load_config_file(self, force=False) -> None: -        if force or not self._loaded: -            self._real_load_config_file() -            self._loaded = True - -    def _print_app_config_info(self): -        for item in self.configuration: -            print(item.description_str) - -    def get_command_info(self): -        return "config", "Manage configuration." - -    def setup_arg_parser(self, arg_parser) -> None: -        subparsers = arg_parser.add_subparsers( -            dest="config_command", required=True, metavar="CONFIG_COMMAND" -        ) -        _init_parser = subparsers.add_parser( -            "init", help="create an initial config file" -        ) -        _print_app_parser = subparsers.add_parser( -            "print-app", -            help="print information of the config items defined by app", -        ) -        _print_parser = subparsers.add_parser("print", help="print current config") -        _check_config_parser = subparsers.add_parser( -            "check", -            help="check the validity of the config file", -        ) -        _check_config_parser.add_argument( -            "-f", -            "--format-only", -            action="store_true", -            help="only check content format, not app config item requirements.", -        ) - -    def run_command(self, args) -> None: -        if args.config_command == "init": -            self._create_init_config_file() -        elif args.config_command == "print-app": -            self._print_app_config_info() -        elif args.config_command == "print": -            self._parse_and_print_config_file() -        elif args.config_command == "check": -            if args.format_only: -                self._parse_config_file() -            else: -                self._read_config_file() diff --git a/tools/cru-py/cru/service/_external.py b/tools/cru-py/cru/service/_external.py deleted file mode 100644 index 2347e95..0000000 --- a/tools/cru-py/cru/service/_external.py +++ /dev/null @@ -1,81 +0,0 @@ -from ._base import AppCommandFeatureProvider -from ._nginx import NginxManager - - -class CliToolCommandProvider(AppCommandFeatureProvider): -    def __init__(self) -> None: -        super().__init__("cli-tool-command-provider") - -    def setup(self): -        pass - -    def get_command_info(self): -        return ("gen-cli", "Get commands of running external cli tools.") - -    def setup_arg_parser(self, arg_parser): -        subparsers = arg_parser.add_subparsers( -            dest="gen_cli_command", required=True, metavar="GEN_CLI_COMMAND" -        ) -        certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") -        certbot_parser.add_argument( -            "-t", "--test", action="store_true", help="run certbot in test mode" -        ) -        _install_docker_parser = subparsers.add_parser( -            "install-docker", help="print docker installation commands" -        ) -        _update_blog_parser = subparsers.add_parser( -            "update-blog", help="print blog update command" -        ) - -    def _print_install_docker_commands(self) -> None: -        output = """ -### COMMAND: uninstall apt docker -for pkg in docker.io docker-doc docker-compose \ -podman-docker containerd runc; \ -do sudo apt-get remove $pkg; done - -### COMMAND: prepare apt certs -sudo apt-get update -sudo apt-get install ca-certificates curl -sudo install -m 0755 -d /etc/apt/keyrings - -### COMMAND: install certs -sudo curl -fsSL https://download.docker.com/linux/debian/gpg \ --o /etc/apt/keyrings/docker.asc -sudo chmod a+r /etc/apt/keyrings/docker.asc - -### COMMAND: add docker apt source -echo \\ -  "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] \ -https://download.docker.com/linux/debian \\ -  $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \\ -  sudo tee /etc/apt/sources.list.d/docker.list > /dev/null - -### COMMAND: update apt and install docker -sudo apt-get update -sudo apt-get install docker-ce docker-ce-cli containerd.io \ -docker-buildx-plugin docker-compose-plugin - -### COMMAND: setup system for docker -sudo systemctl enable docker -sudo systemctl start docker -sudo groupadd -f docker -sudo usermod -aG docker $USER -# Remember to log out and log back in for the group changes to take effect -""".strip() -        print(output) - -    def _print_update_blog_command(self): -        output = """ -### COMMAND: update blog -docker exec -it blog /scripts/update.bash -""".strip() -        print(output) - -    def run_command(self, args): -        if args.gen_cli_command == "certbot": -            self.app.get_feature(NginxManager).print_all_certbot_commands(args.test) -        elif args.gen_cli_command == "install-docker": -            self._print_install_docker_commands() -        elif args.gen_cli_command == "update-blog": -            self._print_update_blog_command()
\ No newline at end of file diff --git a/tools/cru-py/cru/service/_nginx.py b/tools/cru-py/cru/service/_nginx.py deleted file mode 100644 index 6c77971..0000000 --- a/tools/cru-py/cru/service/_nginx.py +++ /dev/null @@ -1,268 +0,0 @@ -from argparse import Namespace -from enum import Enum, auto -import re -import subprocess -from typing import TypeAlias - -from cru import CruInternalError - -from ._base import AppCommandFeatureProvider -from ._config import ConfigManager -from ._template import TemplateManager - - -class CertbotAction(Enum): -    CREATE = auto() -    EXPAND = auto() -    SHRINK = auto() -    RENEW = auto() - - -class NginxManager(AppCommandFeatureProvider): -    CertbotAction: TypeAlias = CertbotAction - -    def __init__(self) -> None: -        super().__init__("nginx-manager") -        self._domains_cache: list[str] | None = None - -    def setup(self) -> None: -        pass - -    @property -    def _config_manager(self) -> ConfigManager: -        return self.app.get_feature(ConfigManager) - -    @property -    def root_domain(self) -> str: -        return self._config_manager.get_domain_value_str() - -    @property -    def domains(self) -> list[str]: -        if self._domains_cache is None: -            self._domains_cache = self._get_domains() -        return self._domains_cache - -    @property -    def subdomains(self) -> list[str]: -        suffix = "." + self.root_domain -        return [d[: -len(suffix)] for d in self.domains if d.endswith(suffix)] - -    @property -    def _domain_config_name(self) -> str: -        return self._config_manager.domain_item_name - -    def _get_domains_from_text(self, text: str) -> set[str]: -        domains: set[str] = set() -        regex = re.compile(r"server_name\s+(\S+)\s*;") -        for match in regex.finditer(text): -            domains.add(match[1]) -        return domains - -    def _join_generated_nginx_conf_text(self) -> str: -        text = "" -        template_manager = self.app.get_feature(TemplateManager) -        for nginx_conf in template_manager.generate(): -            text += nginx_conf[1] -        return text - -    def _get_domains(self) -> list[str]: -        text = self._join_generated_nginx_conf_text() -        domains = list(self._get_domains_from_text(text)) -        domains.remove(self.root_domain) -        return [self.root_domain, *domains] - -    def _print_domains(self) -> None: -        for domain in self.domains: -            print(domain) - -    def _certbot_command( -        self, -        action: CertbotAction | str, -        test: bool, -        *, -        docker=True, -        standalone=None, -        email=None, -        agree_tos=True, -    ) -> str: -        if isinstance(action, str): -            action = CertbotAction[action.upper()] - -        command_args = [] - -        add_domain_option = True -        if action is CertbotAction.CREATE: -            if standalone is None: -                standalone = True -            command_action = "certonly" -        elif action in [CertbotAction.EXPAND, CertbotAction.SHRINK]: -            if standalone is None: -                standalone = False -            command_action = "certonly" -        elif action is CertbotAction.RENEW: -            if standalone is None: -                standalone = False -            add_domain_option = False -            command_action = "renew" -        else: -            raise CruInternalError("Invalid certbot action.") - -        data_dir = self.app.data_dir.full_path.as_posix() - -        if not docker: -            command_args.append("certbot") -        else: -            command_args.extend( -                [ -                    "docker run -it --rm --name certbot", -                    f'-v "{data_dir}/certbot/certs:/etc/letsencrypt"', -                    f'-v "{data_dir}/certbot/data:/var/lib/letsencrypt"', -                ] -            ) -            if standalone: -                command_args.append('-p "0.0.0.0:80:80"') -            else: -                command_args.append(f'-v "{data_dir}/certbot/webroot:/var/www/certbot"') - -            command_args.append("certbot/certbot") - -        command_args.append(command_action) - -        command_args.append(f"--cert-name {self.root_domain}") - -        if standalone: -            command_args.append("--standalone") -        else: -            command_args.append("--webroot -w /var/www/certbot") - -        if add_domain_option: -            command_args.append(" ".join([f"-d {domain}" for domain in self.domains])) - -        if email is not None: -            command_args.append(f"--email {email}") - -        if agree_tos: -            command_args.append("--agree-tos") - -        if test: -            command_args.append("--test-cert --dry-run") - -        return " ".join(command_args) - -    def print_all_certbot_commands(self, test: bool): -        print("### COMMAND: (standalone) create certs") -        print( -            self._certbot_command( -                CertbotAction.CREATE, -                test, -                email=self._config_manager.get_email_value_str_optional(), -            ) -        ) -        print() -        print("### COMMAND: (webroot+nginx) expand or shrink certs") -        print( -            self._certbot_command( -                CertbotAction.EXPAND, -                test, -                email=self._config_manager.get_email_value_str_optional(), -            ) -        ) -        print() -        print("### COMMAND: (webroot+nginx) renew certs") -        print( -            self._certbot_command( -                CertbotAction.RENEW, -                test, -                email=self._config_manager.get_email_value_str_optional(), -            ) -        ) - -    @property -    def _cert_path_str(self) -> str: -        return str( -            self.app.data_dir.full_path -            / "certbot/certs/live" -            / self.root_domain -            / "fullchain.pem" -        ) - -    def get_command_info(self): -        return "nginx", "Manage nginx related things." - -    def setup_arg_parser(self, arg_parser): -        subparsers = arg_parser.add_subparsers( -            dest="nginx_command", required=True, metavar="NGINX_COMMAND" -        ) -        _list_parser = subparsers.add_parser("list", help="list domains") -        certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") -        certbot_parser.add_argument( -            "--no-test", -            action="store_true", -            help="remove args making certbot run in test mode", -        ) - -    def run_command(self, args: Namespace) -> None: -        if args.nginx_command == "list": -            self._print_domains() -        elif args.nginx_command == "certbot": -            self.print_all_certbot_commands(not args.no_test) - -    def _generate_dns_zone( -        self, -        ip: str, -        /, -        ttl: str | int = 600, -        *, -        enable_mail: bool = True, -        dkim: str | None = None, -    ) -> str: -        # TODO: Not complete and test now. -        root_domain = self.root_domain -        result = f"$ORIGIN {root_domain}.\n\n" -        result += "; A records\n" -        result += f"@ {ttl} IN A {ip}\n" -        for subdomain in self.subdomains: -            result += f"{subdomain} {ttl} IN A {ip}\n" - -        if enable_mail: -            result += "\n; MX records\n" -            result += f"@ {ttl} IN MX 10 mail.{root_domain}.\n" -            result += "\n; SPF record\n" -            result += f'@ {ttl} IN TXT "v=spf1 mx ~all"\n' -            if dkim is not None: -                result += "\n; DKIM record\n" -                result += f'mail._domainkey {ttl} IN TEXT "{dkim}"' -                result += "\n; DMARC record\n" -                dmarc_options = [ -                    "v=DMARC1", -                    "p=none", -                    f"rua=mailto:dmarc.report@{root_domain}", -                    f"ruf=mailto:dmarc.report@{root_domain}", -                    "sp=none", -                    "ri=86400", -                ] -                result += f'_dmarc {ttl} IN TXT "{"; ".join(dmarc_options)}"\n' -        return result - -    def _get_dkim_from_mailserver(self) -> str | None: -        # TODO: Not complete and test now. -        dkim_path = ( -            self.app.data_dir.full_path -            / "dms/config/opendkim/keys" -            / self.root_domain -            / "mail.txt" -        ) -        if not dkim_path.exists(): -            return None - -        p = subprocess.run(["sudo", "cat", dkim_path], capture_output=True, check=True) -        value = "" -        for match in re.finditer('"(.*)"', p.stdout.decode("utf-8")): -            value += match.group(1) -        return value - -    def _generate_dns_zone_with_dkim(self, ip: str, /, ttl: str | int = 600) -> str: -        # TODO: Not complete and test now. -        return self._generate_dns_zone( -            ip, ttl, enable_mail=True, dkim=self._get_dkim_from_mailserver() -        ) diff --git a/tools/cru-py/cru/service/_template.py b/tools/cru-py/cru/service/_template.py deleted file mode 100644 index 1381700..0000000 --- a/tools/cru-py/cru/service/_template.py +++ /dev/null @@ -1,90 +0,0 @@ -from argparse import Namespace -from pathlib import Path -import shutil - -from cru.template import TemplateTree, CruStrWrapperTemplate - -from ._base import AppCommandFeatureProvider, AppFeaturePath -from ._config import ConfigManager - - -class TemplateManager(AppCommandFeatureProvider): -    def __init__(self, prefix: str | None = None): -        super().__init__("template-manager") -        self._prefix = prefix or self.app.app_id.upper() - -    def setup(self) -> None: -        self._templates_dir = self.app.add_path("templates", True) -        self._generated_dir = self.app.add_path("generated", True) -        self._template_tree: TemplateTree[CruStrWrapperTemplate] | None = None - -    @property -    def prefix(self) -> str: -        return self._prefix - -    @property -    def templates_dir(self) -> AppFeaturePath: -        return self._templates_dir - -    @property -    def generated_dir(self) -> AppFeaturePath: -        return self._generated_dir - -    @property -    def template_tree(self) -> TemplateTree[CruStrWrapperTemplate]: -        if self._template_tree is None: -            return self.reload() -        return self._template_tree - -    def reload(self) -> TemplateTree: -        self._template_tree = TemplateTree( -            lambda text: CruStrWrapperTemplate(text), self.templates_dir.full_path_str -        ) -        return self._template_tree - -    def _print_file_lists(self) -> None: -        for path, template in self.template_tree.templates: -            print(f"[{template.variable_count}]", path.as_posix()) - -    def generate(self) -> list[tuple[Path, str]]: -        config_manager = self.app.get_feature(ConfigManager) -        return self.template_tree.generate(config_manager.get_str_dict()) - -    def _generate_files(self, dry_run: bool) -> None: -        config_manager = self.app.get_feature(ConfigManager) -        if not dry_run and self.generated_dir.full_path.exists(): -            shutil.rmtree(self.generated_dir.full_path) -        self.template_tree.generate_to( -            self.generated_dir.full_path_str, config_manager.get_str_dict(), dry_run -        ) - -    def get_command_info(self): -        return ("template", "Manage templates.") - -    def setup_arg_parser(self, arg_parser): -        subparsers = arg_parser.add_subparsers( -            dest="template_command", required=True, metavar="TEMPLATE_COMMAND" -        ) -        _list_parser = subparsers.add_parser("list", help="list templates") -        _variables_parser = subparsers.add_parser( -            "variables", help="list variables used in all templates" -        ) -        generate_parser = subparsers.add_parser("generate", help="generate templates") -        generate_parser.add_argument( -            "--no-dry-run", action="store_true", help="generate and write target files" -        ) - -    def run_command(self, args: Namespace) -> None: -        if args.template_command == "list": -            self._print_file_lists() -        elif args.template_command == "variables": -            for var in self.template_tree.variables: -                print(var) -        elif args.template_command == "generate": -            dry_run = not args.no_dry_run -            self._generate_files(dry_run) -            if dry_run: -                print("Dry run successfully.") -                print( -                    f"Will delete dir {self.generated_dir.full_path_str} if it exists." -                ) diff --git a/tools/cru-py/cru/system.py b/tools/cru-py/cru/system.py deleted file mode 100644 index f321717..0000000 --- a/tools/cru-py/cru/system.py +++ /dev/null @@ -1,23 +0,0 @@ -import os.path -import re - - -def check_debian_derivative_version(name: str) -> None | str: -    if not os.path.isfile("/etc/os-release"): -        return None -    with open("/etc/os-release", "r") as f: -        content = f.read() -        if f"ID={name}" not in content: -            return None -        m = re.search(r'VERSION_ID="(.+)"', content) -        if m is None: -            return None -        return m.group(1) - - -def check_ubuntu_version() -> None | str: -    return check_debian_derivative_version("ubuntu") - - -def check_debian_version() -> None | str: -    return check_debian_derivative_version("debian") diff --git a/tools/cru-py/cru/template.py b/tools/cru-py/cru/template.py deleted file mode 100644 index 35d68ac..0000000 --- a/tools/cru-py/cru/template.py +++ /dev/null @@ -1,207 +0,0 @@ -from abc import ABCMeta, abstractmethod -from collections.abc import Callable, Mapping -from pathlib import Path -from string import Template -from typing import Generic, TypeVar - -from ._iter import CruIterator -from ._error import CruException - -from .parsing import StrWrapperVarParser - - -class CruTemplateError(CruException): -    pass - - -class CruTemplateBase(metaclass=ABCMeta): -    def __init__(self, text: str): -        self._text = text -        self._variables: set[str] | None = None - -    @abstractmethod -    def _get_variables(self) -> set[str]: -        raise NotImplementedError() - -    @property -    def text(self) -> str: -        return self._text - -    @property -    def variables(self) -> set[str]: -        if self._variables is None: -            self._variables = self._get_variables() -        return self._variables - -    @property -    def variable_count(self) -> int: -        return len(self.variables) - -    @property -    def has_variables(self) -> bool: -        return self.variable_count > 0 - -    @abstractmethod -    def _do_generate(self, mapping: dict[str, str]) -> str: -        raise NotImplementedError() - -    def generate(self, mapping: Mapping[str, str], allow_extra: bool = True) -> str: -        values = dict(mapping) -        if not self.variables <= set(values.keys()): -            raise CruTemplateError("Missing variables.") -        if not allow_extra and not set(values.keys()) <= self.variables: -            raise CruTemplateError("Extra variables.") -        return self._do_generate(values) - - -class CruTemplate(CruTemplateBase): -    def __init__(self, prefix: str, text: str): -        super().__init__(text) -        self._prefix = prefix -        self._template = Template(text) - -    def _get_variables(self) -> set[str]: -        return ( -            CruIterator(self._template.get_identifiers()) -            .filter(lambda i: i.startswith(self.prefix)) -            .to_set() -        ) - -    @property -    def prefix(self) -> str: -        return self._prefix - -    @property -    def py_template(self) -> Template: -        return self._template - -    @property -    def all_variables(self) -> set[str]: -        return set(self._template.get_identifiers()) - -    def _do_generate(self, mapping: dict[str, str]) -> str: -        return self._template.safe_substitute(mapping) - - -class CruStrWrapperTemplate(CruTemplateBase): -    def __init__(self, text: str, wrapper: str = "@@"): -        super().__init__(text) -        self._wrapper = wrapper -        self._tokens: StrWrapperVarParser.Result - -    @property -    def wrapper(self) -> str: -        return self._wrapper - -    def _get_variables(self): -        self._tokens = StrWrapperVarParser(self.wrapper).parse(self.text) -        return ( -            self._tokens.cru_iter() -            .filter(lambda t: t.is_var) -            .map(lambda t: t.value) -            .to_set() -        ) - -    def _do_generate(self, mapping): -        return ( -            self._tokens.cru_iter() -            .map(lambda t: mapping[t.value] if t.is_var else t.value) -            .join_str("") -        ) - - -_Template = TypeVar("_Template", bound=CruTemplateBase) - - -class TemplateTree(Generic[_Template]): -    def __init__( -        self, -        template_generator: Callable[[str], _Template], -        source: str, -        *, -        template_file_suffix: str | None = ".template", -    ): -        """ -        If template_file_suffix is not None, the files will be checked according to the -        suffix of the file name. If the suffix matches, the file will be regarded as a -        template file. Otherwise, it will be regarded as a non-template file. -        Content of template file must contain variables that need to be replaced, while -        content of non-template file may not contain any variables. -        If either case is false, it generally means whether the file is a template is -        wrongly handled. -        """ -        self._template_generator = template_generator -        self._files: list[tuple[Path, _Template]] = [] -        self._source = source -        self._template_file_suffix = template_file_suffix -        self._load() - -    @property -    def templates(self) -> list[tuple[Path, _Template]]: -        return self._files - -    @property -    def source(self) -> str: -        return self._source - -    @property -    def template_file_suffix(self) -> str | None: -        return self._template_file_suffix - -    @staticmethod -    def _scan_files(root: str) -> list[Path]: -        root_path = Path(root) -        result: list[Path] = [] -        for path in root_path.glob("**/*"): -            if not path.is_file(): -                continue -            path = path.relative_to(root_path) -            result.append(Path(path)) -        return result - -    def _load(self) -> None: -        files = self._scan_files(self.source) -        for file_path in files: -            template_file = Path(self.source) / file_path -            with open(template_file, "r") as f: -                content = f.read() -            template = self._template_generator(content) -            if self.template_file_suffix is not None: -                should_be_template = file_path.name.endswith(self.template_file_suffix) -                if should_be_template and not template.has_variables: -                    raise CruTemplateError( -                        f"Template file {file_path} has no variables." -                    ) -                elif not should_be_template and template.has_variables: -                    raise CruTemplateError(f"Non-template {file_path} has variables.") -            self._files.append((file_path, template)) - -    @property -    def variables(self) -> set[str]: -        s = set() -        for _, template in self.templates: -            s.update(template.variables) -        return s - -    def generate(self, variables: Mapping[str, str]) -> list[tuple[Path, str]]: -        result: list[tuple[Path, str]] = [] -        for path, template in self.templates: -            if self.template_file_suffix is not None and path.name.endswith( -                self.template_file_suffix -            ): -                path = path.parent / (path.name[: -len(self.template_file_suffix)]) - -            text = template.generate(variables) -            result.append((path, text)) -        return result - -    def generate_to( -        self, destination: str, variables: Mapping[str, str], dry_run: bool -    ) -> None: -        generated = self.generate(variables) -        if not dry_run: -            for path, text in generated: -                des = Path(destination) / path -                des.parent.mkdir(parents=True, exist_ok=True) -                with open(des, "w") as f: -                    f.write(text) diff --git a/tools/cru-py/cru/tool.py b/tools/cru-py/cru/tool.py deleted file mode 100644 index 377f5d7..0000000 --- a/tools/cru-py/cru/tool.py +++ /dev/null @@ -1,82 +0,0 @@ -import shutil -import subprocess -from typing import Any -from collections.abc import Iterable - -from ._error import CruException - - -class CruExternalToolError(CruException): -    def __init__(self, message: str, tool: str, *args, **kwargs) -> None: -        super().__init__(message, *args, **kwargs) -        self._tool = tool - -    @property -    def tool(self) -> str: -        return self._tool - - -class CruExternalToolNotFoundError(CruExternalToolError): -    def __init__(self, message: str | None, tool: str, *args, **kwargs) -> None: -        super().__init__( -            message or f"Could not find binary for {tool}.", tool, *args, **kwargs -        ) - - -class CruExternalToolRunError(CruExternalToolError): -    def __init__( -        self, -        message: str, -        tool: str, -        tool_args: Iterable[str], -        tool_error: Any, -        *args, -        **kwargs, -    ) -> None: -        super().__init__(message, tool, *args, **kwargs) -        self._tool_args = list(tool_args) -        self._tool_error = tool_error - -    @property -    def tool_args(self) -> list[str]: -        return self._tool_args - -    @property -    def tool_error(self) -> Any: -        return self._tool_error - - -class ExternalTool: -    def __init__(self, bin: str) -> None: -        self._bin = bin - -    @property -    def bin(self) -> str: -        return self._bin - -    @bin.setter -    def bin(self, value: str) -> None: -        self._bin = value - -    @property -    def bin_path(self) -> str: -        real_bin = shutil.which(self.bin) -        if not real_bin: -            raise CruExternalToolNotFoundError(None, self.bin) -        return real_bin - -    def run( -        self, *process_args: str, **subprocess_kwargs -    ) -> subprocess.CompletedProcess: -        try: -            return subprocess.run( -                [self.bin_path] + list(process_args), **subprocess_kwargs -            ) -        except subprocess.CalledProcessError as e: -            raise CruExternalToolError("Subprocess failed.", self.bin) from e -        except OSError as e: -            raise CruExternalToolError("Failed to start subprocess", self.bin) from e - -    def run_get_output(self, *process_args: str, **subprocess_kwargs) -> Any: -        process = self.run(*process_args, capture_output=True, **subprocess_kwargs) -        return process.stdout diff --git a/tools/cru-py/cru/value.py b/tools/cru-py/cru/value.py deleted file mode 100644 index 9c03219..0000000 --- a/tools/cru-py/cru/value.py +++ /dev/null @@ -1,292 +0,0 @@ -from __future__ import annotations - -import random -import secrets -import string -import uuid -from abc import abstractmethod, ABCMeta -from collections.abc import Callable -from typing import Any, ClassVar, TypeVar, Generic - -from ._error import CruException - - -def _str_case_in(s: str, case: bool, str_list: list[str]) -> bool: -    if case: -        return s in str_list -    else: -        return s.lower() in [s.lower() for s in str_list] - - -_T = TypeVar("_T") - - -class CruValueTypeError(CruException): -    def __init__( -        self, -        message: str, -        value: Any, -        value_type: ValueType | None, -        *args, -        **kwargs, -    ): -        super().__init__( -            message, -            *args, -            **kwargs, -        ) -        self._value = value -        self._value_type = value_type - -    @property -    def value(self) -> Any: -        return self._value - -    @property -    def value_type(self) -> ValueType | None: -        return self._value_type - - -class ValueType(Generic[_T], metaclass=ABCMeta): -    def __init__(self, name: str, _type: type[_T]) -> None: -        self._name = name -        self._type = _type - -    @property -    def name(self) -> str: -        return self._name - -    @property -    def type(self) -> type[_T]: -        return self._type - -    def check_value_type(self, value: Any) -> None: -        if not isinstance(value, self.type): -            raise CruValueTypeError("Type of value is wrong.", value, self) - -    def _do_check_value(self, value: Any) -> _T: -        return value - -    def check_value(self, value: Any) -> _T: -        self.check_value_type(value) -        return self._do_check_value(value) - -    @abstractmethod -    def _do_check_str_format(self, s: str) -> None: -        raise NotImplementedError() - -    def check_str_format(self, s: str) -> None: -        if not isinstance(s, str): -            raise CruValueTypeError("Try to check format on a non-str.", s, self) -        self._do_check_str_format(s) - -    @abstractmethod -    def _do_convert_value_to_str(self, value: _T) -> str: -        raise NotImplementedError() - -    def convert_value_to_str(self, value: _T) -> str: -        self.check_value(value) -        return self._do_convert_value_to_str(value) - -    @abstractmethod -    def _do_convert_str_to_value(self, s: str) -> _T: -        raise NotImplementedError() - -    def convert_str_to_value(self, s: str) -> _T: -        self.check_str_format(s) -        return self._do_convert_str_to_value(s) - -    def check_value_or_try_convert_from_str(self, value_or_str: Any) -> _T: -        try: -            return self.check_value(value_or_str) -        except CruValueTypeError: -            if isinstance(value_or_str, str): -                return self.convert_str_to_value(value_or_str) -            else: -                raise - -    def create_default_value(self) -> _T: -        return self.type() - - -class TextValueType(ValueType[str]): -    def __init__(self) -> None: -        super().__init__("text", str) - -    def _do_check_str_format(self, _s): -        return - -    def _do_convert_value_to_str(self, value): -        return value - -    def _do_convert_str_to_value(self, s): -        return s - - -class IntegerValueType(ValueType[int]): -    def __init__(self) -> None: -        super().__init__("integer", int) - -    def _do_check_str_format(self, s): -        try: -            int(s) -        except ValueError as e: -            raise CruValueTypeError("Invalid integer format.", s, self) from e - -    def _do_convert_value_to_str(self, value): -        return str(value) - -    def _do_convert_str_to_value(self, s): -        return int(s) - - -class FloatValueType(ValueType[float]): -    def __init__(self) -> None: -        super().__init__("float", float) - -    def _do_check_str_format(self, s): -        try: -            float(s) -        except ValueError as e: -            raise CruValueTypeError("Invalid float format.", s, self) from e - -    def _do_convert_value_to_str(self, value): -        return str(value) - -    def _do_convert_str_to_value(self, s): -        return float(s) - - -class BooleanValueType(ValueType[bool]): -    DEFAULT_TRUE_LIST: ClassVar[list[str]] = ["true", "yes", "y", "on", "1"] -    DEFAULT_FALSE_LIST: ClassVar[list[str]] = ["false", "no", "n", "off", "0"] - -    def __init__( -        self, -        *, -        case_sensitive=False, -        true_list: None | list[str] = None, -        false_list: None | list[str] = None, -    ) -> None: -        super().__init__("boolean", bool) -        self._case_sensitive = case_sensitive -        self._valid_true_strs: list[str] = ( -            true_list or BooleanValueType.DEFAULT_TRUE_LIST -        ) -        self._valid_false_strs: list[str] = ( -            false_list or BooleanValueType.DEFAULT_FALSE_LIST -        ) - -    @property -    def case_sensitive(self) -> bool: -        return self._case_sensitive - -    @property -    def valid_true_strs(self) -> list[str]: -        return self._valid_true_strs - -    @property -    def valid_false_strs(self) -> list[str]: -        return self._valid_false_strs - -    @property -    def valid_boolean_strs(self) -> list[str]: -        return self._valid_true_strs + self._valid_false_strs - -    def _do_check_str_format(self, s): -        if not _str_case_in(s, self.case_sensitive, self.valid_boolean_strs): -            raise CruValueTypeError("Invalid boolean format.", s, self) - -    def _do_convert_value_to_str(self, value): -        return self._valid_true_strs[0] if value else self._valid_false_strs[0] - -    def _do_convert_str_to_value(self, s): -        return _str_case_in(s, self.case_sensitive, self._valid_true_strs) - -    def create_default_value(self): -        return self.valid_false_strs[0] - - -class EnumValueType(ValueType[str]): -    def __init__(self, valid_values: list[str], /, case_sensitive=False) -> None: -        super().__init__(f"enum({'|'.join(valid_values)})", str) -        self._case_sensitive = case_sensitive -        self._valid_values = valid_values - -    @property -    def case_sensitive(self) -> bool: -        return self._case_sensitive - -    @property -    def valid_values(self) -> list[str]: -        return self._valid_values - -    def _do_check_value(self, value): -        self._do_check_str_format(value) - -    def _do_check_str_format(self, s): -        if not _str_case_in(s, self.case_sensitive, self.valid_values): -            raise CruValueTypeError("Invalid enum value", s, self) - -    def _do_convert_value_to_str(self, value): -        return value - -    def _do_convert_str_to_value(self, s): -        return s - -    def create_default_value(self): -        return self.valid_values[0] - - -TEXT_VALUE_TYPE = TextValueType() -INTEGER_VALUE_TYPE = IntegerValueType() -BOOLEAN_VALUE_TYPE = BooleanValueType() - - -class ValueGeneratorBase(Generic[_T], metaclass=ABCMeta): -    @abstractmethod -    def generate(self) -> _T: -        raise NotImplementedError() - -    def __call__(self) -> _T: -        return self.generate() - - -class ValueGenerator(ValueGeneratorBase[_T]): -    def __init__(self, generate_func: Callable[[], _T]) -> None: -        self._generate_func = generate_func - -    @property -    def generate_func(self) -> Callable[[], _T]: -        return self._generate_func - -    def generate(self) -> _T: -        return self._generate_func() - - -class UuidValueGenerator(ValueGeneratorBase[str]): -    def generate(self): -        return str(uuid.uuid4()) - - -class RandomStringValueGenerator(ValueGeneratorBase[str]): -    def __init__(self, length: int, secure: bool) -> None: -        self._length = length -        self._secure = secure - -    @property -    def length(self) -> int: -        return self._length - -    @property -    def secure(self) -> bool: -        return self._secure - -    def generate(self): -        random_func = secrets.choice if self._secure else random.choice -        characters = string.ascii_letters + string.digits -        random_string = "".join(random_func(characters) for _ in range(self._length)) -        return random_string - - -UUID_VALUE_GENERATOR = UuidValueGenerator() diff --git a/tools/cru-py/poetry.lock b/tools/cru-py/poetry.lock deleted file mode 100644 index 4338200..0000000 --- a/tools/cru-py/poetry.lock +++ /dev/null @@ -1,111 +0,0 @@ -# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. - -[[package]] -name = "mypy" -version = "1.15.0" -description = "Optional static typing for Python" -optional = false -python-versions = ">=3.9" -groups = ["dev"] -files = [ -    {file = "mypy-1.15.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:979e4e1a006511dacf628e36fadfecbcc0160a8af6ca7dad2f5025529e082c13"}, -    {file = "mypy-1.15.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c4bb0e1bd29f7d34efcccd71cf733580191e9a264a2202b0239da95984c5b559"}, -    {file = "mypy-1.15.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:be68172e9fd9ad8fb876c6389f16d1c1b5f100ffa779f77b1fb2176fcc9ab95b"}, -    {file = "mypy-1.15.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c7be1e46525adfa0d97681432ee9fcd61a3964c2446795714699a998d193f1a3"}, -    {file = "mypy-1.15.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:2e2c2e6d3593f6451b18588848e66260ff62ccca522dd231cd4dd59b0160668b"}, -    {file = "mypy-1.15.0-cp310-cp310-win_amd64.whl", hash = "sha256:6983aae8b2f653e098edb77f893f7b6aca69f6cffb19b2cc7443f23cce5f4828"}, -    {file = "mypy-1.15.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:2922d42e16d6de288022e5ca321cd0618b238cfc5570e0263e5ba0a77dbef56f"}, -    {file = "mypy-1.15.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2ee2d57e01a7c35de00f4634ba1bbf015185b219e4dc5909e281016df43f5ee5"}, -    {file = "mypy-1.15.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:973500e0774b85d9689715feeffcc980193086551110fd678ebe1f4342fb7c5e"}, -    {file = "mypy-1.15.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:5a95fb17c13e29d2d5195869262f8125dfdb5c134dc8d9a9d0aecf7525b10c2c"}, -    {file = "mypy-1.15.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:1905f494bfd7d85a23a88c5d97840888a7bd516545fc5aaedff0267e0bb54e2f"}, -    {file = "mypy-1.15.0-cp311-cp311-win_amd64.whl", hash = "sha256:c9817fa23833ff189db061e6d2eff49b2f3b6ed9856b4a0a73046e41932d744f"}, -    {file = "mypy-1.15.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:aea39e0583d05124836ea645f412e88a5c7d0fd77a6d694b60d9b6b2d9f184fd"}, -    {file = "mypy-1.15.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:2f2147ab812b75e5b5499b01ade1f4a81489a147c01585cda36019102538615f"}, -    {file = "mypy-1.15.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ce436f4c6d218a070048ed6a44c0bbb10cd2cc5e272b29e7845f6a2f57ee4464"}, -    {file = "mypy-1.15.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8023ff13985661b50a5928fc7a5ca15f3d1affb41e5f0a9952cb68ef090b31ee"}, -    {file = "mypy-1.15.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:1124a18bc11a6a62887e3e137f37f53fbae476dc36c185d549d4f837a2a6a14e"}, -    {file = "mypy-1.15.0-cp312-cp312-win_amd64.whl", hash = "sha256:171a9ca9a40cd1843abeca0e405bc1940cd9b305eaeea2dda769ba096932bb22"}, -    {file = "mypy-1.15.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:93faf3fdb04768d44bf28693293f3904bbb555d076b781ad2530214ee53e3445"}, -    {file = "mypy-1.15.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:811aeccadfb730024c5d3e326b2fbe9249bb7413553f15499a4050f7c30e801d"}, -    {file = "mypy-1.15.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:98b7b9b9aedb65fe628c62a6dc57f6d5088ef2dfca37903a7d9ee374d03acca5"}, -    {file = "mypy-1.15.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c43a7682e24b4f576d93072216bf56eeff70d9140241f9edec0c104d0c515036"}, -    {file = "mypy-1.15.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:baefc32840a9f00babd83251560e0ae1573e2f9d1b067719479bfb0e987c6357"}, -    {file = "mypy-1.15.0-cp313-cp313-win_amd64.whl", hash = "sha256:b9378e2c00146c44793c98b8d5a61039a048e31f429fb0eb546d93f4b000bedf"}, -    {file = "mypy-1.15.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:e601a7fa172c2131bff456bb3ee08a88360760d0d2f8cbd7a75a65497e2df078"}, -    {file = "mypy-1.15.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:712e962a6357634fef20412699a3655c610110e01cdaa6180acec7fc9f8513ba"}, -    {file = "mypy-1.15.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:f95579473af29ab73a10bada2f9722856792a36ec5af5399b653aa28360290a5"}, -    {file = "mypy-1.15.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8f8722560a14cde92fdb1e31597760dc35f9f5524cce17836c0d22841830fd5b"}, -    {file = "mypy-1.15.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:1fbb8da62dc352133d7d7ca90ed2fb0e9d42bb1a32724c287d3c76c58cbaa9c2"}, -    {file = "mypy-1.15.0-cp39-cp39-win_amd64.whl", hash = "sha256:d10d994b41fb3497719bbf866f227b3489048ea4bbbb5015357db306249f7980"}, -    {file = "mypy-1.15.0-py3-none-any.whl", hash = "sha256:5469affef548bd1895d86d3bf10ce2b44e33d86923c29e4d675b3e323437ea3e"}, -    {file = "mypy-1.15.0.tar.gz", hash = "sha256:404534629d51d3efea5c800ee7c42b72a6554d6c400e6a79eafe15d11341fd43"}, -] - -[package.dependencies] -mypy_extensions = ">=1.0.0" -typing_extensions = ">=4.6.0" - -[package.extras] -dmypy = ["psutil (>=4.0)"] -faster-cache = ["orjson"] -install-types = ["pip"] -mypyc = ["setuptools (>=50)"] -reports = ["lxml"] - -[[package]] -name = "mypy-extensions" -version = "1.0.0" -description = "Type system extensions for programs checked with the mypy type checker." -optional = false -python-versions = ">=3.5" -groups = ["dev"] -files = [ -    {file = "mypy_extensions-1.0.0-py3-none-any.whl", hash = "sha256:4392f6c0eb8a5668a69e23d168ffa70f0be9ccfd32b5cc2d26a34ae5b844552d"}, -    {file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"}, -] - -[[package]] -name = "ruff" -version = "0.9.6" -description = "An extremely fast Python linter and code formatter, written in Rust." -optional = false -python-versions = ">=3.7" -groups = ["dev"] -files = [ -    {file = "ruff-0.9.6-py3-none-linux_armv6l.whl", hash = "sha256:2f218f356dd2d995839f1941322ff021c72a492c470f0b26a34f844c29cdf5ba"}, -    {file = "ruff-0.9.6-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:b908ff4df65dad7b251c9968a2e4560836d8f5487c2f0cc238321ed951ea0504"}, -    {file = "ruff-0.9.6-py3-none-macosx_11_0_arm64.whl", hash = "sha256:b109c0ad2ececf42e75fa99dc4043ff72a357436bb171900714a9ea581ddef83"}, -    {file = "ruff-0.9.6-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1de4367cca3dac99bcbd15c161404e849bb0bfd543664db39232648dc00112dc"}, -    {file = "ruff-0.9.6-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:ac3ee4d7c2c92ddfdaedf0bf31b2b176fa7aa8950efc454628d477394d35638b"}, -    {file = "ruff-0.9.6-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:5dc1edd1775270e6aa2386119aea692039781429f0be1e0949ea5884e011aa8e"}, -    {file = "ruff-0.9.6-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:4a091729086dffa4bd070aa5dab7e39cc6b9d62eb2bef8f3d91172d30d599666"}, -    {file = "ruff-0.9.6-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:d1bbc6808bf7b15796cef0815e1dfb796fbd383e7dbd4334709642649625e7c5"}, -    {file = "ruff-0.9.6-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:589d1d9f25b5754ff230dce914a174a7c951a85a4e9270613a2b74231fdac2f5"}, -    {file = "ruff-0.9.6-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dc61dd5131742e21103fbbdcad683a8813be0e3c204472d520d9a5021ca8b217"}, -    {file = "ruff-0.9.6-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:5e2d9126161d0357e5c8f30b0bd6168d2c3872372f14481136d13de9937f79b6"}, -    {file = "ruff-0.9.6-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:68660eab1a8e65babb5229a1f97b46e3120923757a68b5413d8561f8a85d4897"}, -    {file = "ruff-0.9.6-py3-none-musllinux_1_2_i686.whl", hash = "sha256:c4cae6c4cc7b9b4017c71114115db0445b00a16de3bcde0946273e8392856f08"}, -    {file = "ruff-0.9.6-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:19f505b643228b417c1111a2a536424ddde0db4ef9023b9e04a46ed8a1cb4656"}, -    {file = "ruff-0.9.6-py3-none-win32.whl", hash = "sha256:194d8402bceef1b31164909540a597e0d913c0e4952015a5b40e28c146121b5d"}, -    {file = "ruff-0.9.6-py3-none-win_amd64.whl", hash = "sha256:03482d5c09d90d4ee3f40d97578423698ad895c87314c4de39ed2af945633caa"}, -    {file = "ruff-0.9.6-py3-none-win_arm64.whl", hash = "sha256:0e2bb706a2be7ddfea4a4af918562fdc1bcb16df255e5fa595bbd800ce322a5a"}, -    {file = "ruff-0.9.6.tar.gz", hash = "sha256:81761592f72b620ec8fa1068a6fd00e98a5ebee342a3642efd84454f3031dca9"}, -] - -[[package]] -name = "typing-extensions" -version = "4.12.2" -description = "Backported and Experimental Type Hints for Python 3.8+" -optional = false -python-versions = ">=3.8" -groups = ["dev"] -files = [ -    {file = "typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d"}, -    {file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"}, -] - -[metadata] -lock-version = "2.1" -python-versions = "^3.11" -content-hash = "674a21dbda993a1ee761e2e6e2f13ccece8289336a83fd0a154285eac48f3a76" diff --git a/tools/cru-py/pyproject.toml b/tools/cru-py/pyproject.toml deleted file mode 100644 index 0ce2c60..0000000 --- a/tools/cru-py/pyproject.toml +++ /dev/null @@ -1,27 +0,0 @@ -[project] -name = "cru-py" -version = "0.1.0" -requires-python = ">=3.11" - -[tool.poetry] -package-mode = false -name = "cru" -version = "0.1.0" -description = "" -authors = ["Yuqian Yang <crupest@crupest.life>"] -license = "MIT" -readme = "README.md" - -[tool.poetry.dependencies] -python = "^3.11" - -[tool.poetry.group.dev.dependencies] -mypy = "^1.13.0" -ruff = "^0.9.6" - -[tool.ruff.lint] -select = ["E", "F", "B"] - -[build-system] -requires = ["poetry-core"] -build-backend = "poetry.core.masonry.api" diff --git a/tools/cru-py/www-dev b/tools/cru-py/www-dev deleted file mode 100644 index f56d679..0000000 --- a/tools/cru-py/www-dev +++ /dev/null @@ -1,8 +0,0 @@ -#! /usr/bin/env sh - -set -e - -cd "$(dirname "$0")/../.." - -exec tmux new-session 'cd docker/crupest-nginx/sites/www && pnpm start' \; \ -    split-window -h 'cd docker/crupest-api/CrupestApi/CrupestApi && dotnet run --launch-profile dev' | 
