diff options
author | Yuqian Yang <crupest@crupest.life> | 2025-02-23 16:40:32 +0800 |
---|---|---|
committer | Yuqian Yang <crupest@crupest.life> | 2025-02-23 16:40:32 +0800 |
commit | 90868bf85dc295f70620dbcbd5790999fe239550 (patch) | |
tree | 08c0f73597a751acff14a4d224446e87b2d8775d /python | |
parent | 1e9b2436eaffa4130f6a69c3a108f6feb9dd4ac8 (diff) | |
download | crupest-90868bf85dc295f70620dbcbd5790999fe239550.tar.gz crupest-90868bf85dc295f70620dbcbd5790999fe239550.tar.bz2 crupest-90868bf85dc295f70620dbcbd5790999fe239550.zip |
feat(python): move python codes.
Diffstat (limited to 'python')
29 files changed, 4064 insertions, 0 deletions
diff --git a/python/.gitignore b/python/.gitignore new file mode 100644 index 0000000..f5833b1 --- /dev/null +++ b/python/.gitignore @@ -0,0 +1,3 @@ +__pycache__ +.venv +.mypy_cache diff --git a/python/.python-version b/python/.python-version new file mode 100644 index 0000000..2c07333 --- /dev/null +++ b/python/.python-version @@ -0,0 +1 @@ +3.11 diff --git a/python/cru/__init__.py b/python/cru/__init__.py new file mode 100644 index 0000000..17799a9 --- /dev/null +++ b/python/cru/__init__.py @@ -0,0 +1,60 @@ +import sys + +from ._base import CRU, CruNamespaceError, CRU_NAME_PREFIXES +from ._error import ( + CruException, + CruLogicError, + CruInternalError, + CruUnreachableError, + cru_unreachable, +) +from ._const import ( + CruConstantBase, + CruDontChange, + CruNotFound, + CruNoValue, + CruPlaceholder, + CruUseDefault, +) +from ._func import CruFunction +from ._iter import CruIterable, CruIterator +from ._event import CruEvent, CruEventHandlerToken +from ._type import CruTypeSet, CruTypeCheckError + + +class CruInitError(CruException): + pass + + +def check_python_version(required_version=(3, 11)): + if sys.version_info < required_version: + raise CruInitError(f"Python version must be >= {required_version}!") + + +check_python_version() + +__all__ = [ + "CRU", + "CruNamespaceError", + "CRU_NAME_PREFIXES", + "check_python_version", + "CruException", + "CruInternalError", + "CruLogicError", + "CruUnreachableError", + "cru_unreachable", + "CruInitError", + "CruConstantBase", + "CruDontChange", + "CruNotFound", + "CruNoValue", + "CruPlaceholder", + "CruUseDefault", + "CruFunction", + "CruIterable", + "CruIterator", + "CruEvent", + "CruEventHandlerToken", + "CruTypeSet", + "CruTypeCheckError", +] diff --git a/python/cru/_base.py b/python/cru/_base.py new file mode 100644 index 0000000..2599d8f --- /dev/null +++ b/python/cru/_base.py @@ -0,0 +1,101 @@ +from typing import Any + +from ._helper import remove_none +from ._error import CruException + + +class CruNamespaceError(CruException): + """Raised when a namespace is not found.""" + + +class _Cru: + NAME_PREFIXES = ("CRU_", "Cru", "cru_") + + def __init__(self) -> None: + self._d: dict[str, Any] = {} + + def all_names(self) -> list[str]: + return list(self._d.keys()) + + def get(self, name: str) -> Any: + return self._d[name] + + def has_name(self, name: str) -> bool: + return name in self._d + + @staticmethod + def _maybe_remove_prefix(name: str) -> str | None: + for prefix in _Cru.NAME_PREFIXES: + if name.startswith(prefix): + return name[len(prefix) :] + return None + + def _check_name_exist(self, *names: str | None) -> None: + for name in names: + if name is None: + continue + if self.has_name(name): + raise CruNamespaceError(f"Name {name} exists in CRU.") + + @staticmethod + def check_name_format(name: str) -> tuple[str, str]: + no_prefix_name = _Cru._maybe_remove_prefix(name) + if no_prefix_name is None: + raise CruNamespaceError( + f"Name {name} is not prefixed with any of {_Cru.NAME_PREFIXES}." + ) + return name, no_prefix_name + + @staticmethod + def _check_object_name(o) -> tuple[str, str]: + return _Cru.check_name_format(o.__name__) + + def _do_add(self, o, *names: str | None) -> list[str]: + name_list: list[str] = remove_none(names) + for name in name_list: + self._d[name] = o + return name_list + + def add(self, o, name: str | None) -> tuple[str, str | None]: + no_prefix_name: str | None + if name is None: + name, no_prefix_name = self._check_object_name(o) + else: + no_prefix_name = self._maybe_remove_prefix(name) + + self._check_name_exist(name, no_prefix_name) + self._do_add(o, name, no_prefix_name) + return name, no_prefix_name + + def add_with_alias(self, o, name: str | None = None, *aliases: str) -> list[str]: + final_names: list[str | None] = [] + no_prefix_name: str | None + if name is None: + name, no_prefix_name = self._check_object_name(o) + self._check_name_exist(name, no_prefix_name) + final_names.extend([name, no_prefix_name]) + for alias in aliases: + no_prefix_name = self._maybe_remove_prefix(alias) + self._check_name_exist(alias, no_prefix_name) + final_names.extend([alias, no_prefix_name]) + + return self._do_add(o, *final_names) + + def add_objects(self, *objects): + final_list = [] + for o in objects: + name, no_prefix_name = self._check_object_name(o) + self._check_name_exist(name, no_prefix_name) + final_list.append((o, name, no_prefix_name)) + for o, name, no_prefix_name in final_list: + self._do_add(o, name, no_prefix_name) + + def __getitem__(self, item): + return self.get(item) + + def __getattr__(self, item): + return self.get(item) + + +CRU_NAME_PREFIXES = _Cru.NAME_PREFIXES +CRU = _Cru() diff --git a/python/cru/_const.py b/python/cru/_const.py new file mode 100644 index 0000000..8246b35 --- /dev/null +++ b/python/cru/_const.py @@ -0,0 +1,49 @@ +from enum import Enum, auto +from typing import Self, TypeGuard, TypeVar + +from ._base import CRU + +_T = TypeVar("_T") + + +class CruConstantBase(Enum): + @classmethod + def check(cls, v: _T | Self) -> TypeGuard[Self]: + return isinstance(v, cls) + + @classmethod + def check_not(cls, v: _T | Self) -> TypeGuard[_T]: + return not cls.check(v) + + @classmethod + def value(cls) -> Self: + return cls.VALUE # type: ignore + + +class CruNotFound(CruConstantBase): + VALUE = auto() + + +class CruUseDefault(CruConstantBase): + VALUE = auto() + + +class CruDontChange(CruConstantBase): + VALUE = auto() + + +class CruNoValue(CruConstantBase): + VALUE = auto() + + +class CruPlaceholder(CruConstantBase): + VALUE = auto() + + +CRU.add_objects( + CruNotFound, + CruUseDefault, + CruDontChange, + CruNoValue, + CruPlaceholder, +) diff --git a/python/cru/_decorator.py b/python/cru/_decorator.py new file mode 100644 index 0000000..137fc05 --- /dev/null +++ b/python/cru/_decorator.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +from collections.abc import Callable +from typing import ( + Concatenate, + Generic, + ParamSpec, + TypeVar, + cast, +) + +from ._base import CRU + +_P = ParamSpec("_P") +_T = TypeVar("_T") +_O = TypeVar("_O") +_R = TypeVar("_R") + + +class CruDecorator: + + class ConvertResult(Generic[_T, _O]): + def __init__( + self, + converter: Callable[[_T], _O], + ) -> None: + self.converter = converter + + def __call__(self, origin: Callable[_P, _T]) -> Callable[_P, _O]: + converter = self.converter + + def real_impl(*args: _P.args, **kwargs: _P.kwargs) -> _O: + return converter(origin(*args, **kwargs)) + + return real_impl + + class ImplementedBy(Generic[_T, _O, _P, _R]): + def __init__( + self, + impl: Callable[Concatenate[_O, _P], _R], + converter: Callable[[_T], _O], + ) -> None: + self.impl = impl + self.converter = converter + + def __call__( + self, _origin: Callable[[_T], None] + ) -> Callable[Concatenate[_T, _P], _R]: + converter = self.converter + impl = self.impl + + def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R: + return cast(Callable[Concatenate[_O, _P], _R], impl)( + converter(_self), *args, **kwargs + ) + + return real_impl + + @staticmethod + def create_factory(converter: Callable[[_T], _O]) -> Callable[ + [Callable[Concatenate[_O, _P], _R]], + CruDecorator.ImplementedBy[_T, _O, _P, _R], + ]: + def create( + m: Callable[Concatenate[_O, _P], _R], + ) -> CruDecorator.ImplementedBy[_T, _O, _P, _R]: + return CruDecorator.ImplementedBy(m, converter) + + return create + + class ImplementedByNoSelf(Generic[_P, _R]): + def __init__(self, impl: Callable[_P, _R]) -> None: + self.impl = impl + + def __call__( + self, _origin: Callable[[_T], None] + ) -> Callable[Concatenate[_T, _P], _R]: + impl = self.impl + + def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R: + return cast(Callable[_P, _R], impl)(*args, **kwargs) + + return real_impl + + @staticmethod + def create_factory() -> ( + Callable[[Callable[_P, _R]], CruDecorator.ImplementedByNoSelf[_P, _R]] + ): + def create( + m: Callable[_P, _R], + ) -> CruDecorator.ImplementedByNoSelf[_P, _R]: + return CruDecorator.ImplementedByNoSelf(m) + + return create + + +CRU.add_objects(CruDecorator) diff --git a/python/cru/_error.py b/python/cru/_error.py new file mode 100644 index 0000000..e53c787 --- /dev/null +++ b/python/cru/_error.py @@ -0,0 +1,89 @@ +from __future__ import annotations + +from typing import NoReturn, cast, overload + + +class CruException(Exception): + """Base exception class of all exceptions in cru.""" + + @overload + def __init__( + self, + message: None = None, + *args, + user_message: str, + **kwargs, + ): ... + + @overload + def __init__( + self, + message: str, + *args, + user_message: str | None = None, + **kwargs, + ): ... + + def __init__( + self, + message: str | None = None, + *args, + user_message: str | None = None, + **kwargs, + ): + if message is None: + message = user_message + + super().__init__( + message, + *args, + **kwargs, + ) + self._message: str + self._message = cast(str, message) + self._user_message = user_message + + @property + def message(self) -> str: + return self._message + + def get_user_message(self) -> str | None: + return self._user_message + + def get_message(self, use_user: bool = True) -> str: + if use_user and self._user_message is not None: + return self._user_message + else: + return self._message + + @property + def is_internal(self) -> bool: + return False + + @property + def is_logic_error(self) -> bool: + return False + + +class CruLogicError(CruException): + """Raised when a logic error occurs.""" + + @property + def is_logic_error(self) -> bool: + return True + + +class CruInternalError(CruException): + """Raised when an internal error occurs.""" + + @property + def is_internal(self) -> bool: + return True + + +class CruUnreachableError(CruInternalError): + """Raised when a code path is unreachable.""" + + +def cru_unreachable() -> NoReturn: + raise CruUnreachableError("Code should not reach here!") diff --git a/python/cru/_event.py b/python/cru/_event.py new file mode 100644 index 0000000..51a794c --- /dev/null +++ b/python/cru/_event.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +from collections.abc import Callable +from typing import Generic, ParamSpec, TypeVar + +from .list import CruList + +_P = ParamSpec("_P") +_R = TypeVar("_R") + + +class CruEventHandlerToken(Generic[_P, _R]): + def __init__( + self, event: CruEvent, handler: Callable[_P, _R], once: bool = False + ) -> None: + self._event = event + self._handler = handler + self._once = once + + @property + def event(self) -> CruEvent: + return self._event + + @property + def handler(self) -> Callable[_P, _R]: + return self._handler + + @property + def once(self) -> bool: + return self._once + + +class CruEvent(Generic[_P, _R]): + def __init__(self, name: str) -> None: + self._name = name + self._tokens: CruList[CruEventHandlerToken] = CruList() + + def register( + self, handler: Callable[_P, _R], once: bool = False + ) -> CruEventHandlerToken: + token = CruEventHandlerToken(self, handler, once) + self._tokens.append(token) + return token + + def unregister(self, *handlers: CruEventHandlerToken | Callable[_P, _R]) -> int: + old_length = len(self._tokens) + self._tokens.reset( + self._tokens.as_cru_iterator().filter( + (lambda t: t in handlers or t.handler in handlers) + ) + ) + return old_length - len(self._tokens) + + def trigger(self, *args: _P.args, **kwargs: _P.kwargs) -> CruList[_R]: + results = CruList( + self._tokens.as_cru_iterator() + .transform(lambda t: t.handler(*args, **kwargs)) + .to_list() + ) + self._tokens.reset(self._tokens.as_cru_iterator().filter(lambda t: not t.once)) + return results diff --git a/python/cru/_func.py b/python/cru/_func.py new file mode 100644 index 0000000..fc57802 --- /dev/null +++ b/python/cru/_func.py @@ -0,0 +1,172 @@ +from __future__ import annotations + +from collections.abc import Callable, Iterable +from enum import Flag, auto +from typing import ( + Any, + Generic, + Literal, + ParamSpec, + TypeAlias, + TypeVar, +) + + +from ._base import CRU +from ._const import CruPlaceholder + +_P = ParamSpec("_P") +_P1 = ParamSpec("_P1") +_T = TypeVar("_T") + + +class _Dec: + @staticmethod + def wrap( + origin: Callable[_P, Callable[_P1, _T]] + ) -> Callable[_P, _Wrapper[_P1, _T]]: + def _wrapped(*args: _P.args, **kwargs: _P.kwargs) -> _Wrapper[_P1, _T]: + return _Wrapper(origin(*args, **kwargs)) + + return _wrapped + + +class _RawBase: + @staticmethod + def none(*_v, **_kwargs) -> None: + return None + + @staticmethod + def true(*_v, **_kwargs) -> Literal[True]: + return True + + @staticmethod + def false(*_v, **_kwargs) -> Literal[False]: + return False + + @staticmethod + def identity(v: _T) -> _T: + return v + + @staticmethod + def only_you(v: _T, *_v, **_kwargs) -> _T: + return v + + @staticmethod + def equal(a: Any, b: Any) -> bool: + return a == b + + @staticmethod + def not_equal(a: Any, b: Any) -> bool: + return a != b + + @staticmethod + def not_(v: Any) -> Any: + return not v + + +class _Wrapper(Generic[_P, _T]): + def __init__(self, f: Callable[_P, _T]): + self._f = f + + @property + def me(self) -> Callable[_P, _T]: + return self._f + + def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T: + return self._f(*args, **kwargs) + + @_Dec.wrap + def bind(self, *bind_args, **bind_kwargs) -> Callable[..., _T]: + func = self.me + + def bound_func(*args, **kwargs): + popped = 0 + real_args = [] + for arg in bind_args: + if CruPlaceholder.check(arg): + real_args.append(args[popped]) + popped += 1 + else: + real_args.append(arg) + real_args.extend(args[popped:]) + return func(*real_args, **(bind_kwargs | kwargs)) + + return bound_func + + class ChainMode(Flag): + ARGS = auto() + KWARGS = auto() + BOTH = ARGS | KWARGS + + ArgsChainableCallable: TypeAlias = Callable[..., Iterable[Any]] + KwargsChainableCallable: TypeAlias = Callable[..., Iterable[tuple[str, Any]]] + ChainableCallable: TypeAlias = Callable[ + ..., tuple[Iterable[Any], Iterable[tuple[str, Any]]] + ] + + @_Dec.wrap + def chain_with_args( + self, funcs: Iterable[ArgsChainableCallable], *bind_args, **bind_kwargs + ) -> ArgsChainableCallable: + def chained_func(*args): + args = self.bind(*bind_args, **bind_kwargs)(*args) + + for func in funcs: + args = _Wrapper(func).bind(*bind_args, **bind_kwargs)(*args) + return args + + return chained_func + + @_Dec.wrap + def chain_with_kwargs( + self, funcs: Iterable[KwargsChainableCallable], *bind_args, **bind_kwargs + ) -> KwargsChainableCallable: + def chained_func(**kwargs): + kwargs = self.bind(*bind_args, **bind_kwargs)(**kwargs) + for func in funcs: + kwargs = _Wrapper(func).bind(func, *bind_args, **bind_kwargs)(**kwargs) + return kwargs + + return chained_func + + @_Dec.wrap + def chain_with_both( + self, funcs: Iterable[ChainableCallable], *bind_args, **bind_kwargs + ) -> ChainableCallable: + def chained_func(*args, **kwargs): + for func in funcs: + args, kwargs = _Wrapper(func).bind(func, *bind_args, **bind_kwargs)( + *args, **kwargs + ) + return args, kwargs + + return chained_func + + +class _Base: + none = _Wrapper(_RawBase.none) + true = _Wrapper(_RawBase.true) + false = _Wrapper(_RawBase.false) + identity = _Wrapper(_RawBase.identity) + only_you = _Wrapper(_RawBase.only_you) + equal = _Wrapper(_RawBase.equal) + not_equal = _Wrapper(_RawBase.not_equal) + not_ = _Wrapper(_RawBase.not_) + + +class _Creators: + @staticmethod + def make_isinstance_of_types(*types: type) -> Callable: + return _Wrapper(lambda v: type(v) in types) + + +class CruFunction: + RawBase: TypeAlias = _RawBase + Base: TypeAlias = _Base + Creators: TypeAlias = _Creators + Wrapper: TypeAlias = _Wrapper + Decorators: TypeAlias = _Dec + + +CRU.add_objects(CruFunction) diff --git a/python/cru/_helper.py b/python/cru/_helper.py new file mode 100644 index 0000000..43baf46 --- /dev/null +++ b/python/cru/_helper.py @@ -0,0 +1,16 @@ +from collections.abc import Callable +from typing import Any, Iterable, TypeVar, cast + +_T = TypeVar("_T") +_D = TypeVar("_D") + + +def remove_element( + iterable: Iterable[_T | None], to_rm: Iterable[Any], des: type[_D] | None = None +) -> _D: + to_rm = set(to_rm) + return cast(Callable[..., _D], des or list)(v for v in iterable if v not in to_rm) + + +def remove_none(iterable: Iterable[_T | None], des: type[_D] | None = None) -> _D: + return cast(Callable[..., _D], des or list)(v for v in iterable if v is not None) diff --git a/python/cru/_iter.py b/python/cru/_iter.py new file mode 100644 index 0000000..f9683ca --- /dev/null +++ b/python/cru/_iter.py @@ -0,0 +1,469 @@ +from __future__ import annotations + +from collections.abc import Iterable, Callable, Generator, Iterator +from dataclasses import dataclass +from enum import Enum +from typing import ( + Concatenate, + Literal, + Never, + Self, + TypeAlias, + TypeVar, + ParamSpec, + Any, + Generic, + cast, +) + +from ._base import CRU +from ._const import CruNotFound +from ._error import cru_unreachable + +_P = ParamSpec("_P") +_T = TypeVar("_T") +_O = TypeVar("_O") +_V = TypeVar("_V") +_R = TypeVar("_R") + + +class _Generic: + class StepActionKind(Enum): + SKIP = 0 + PUSH = 1 + STOP = 2 + AGGREGATE = 3 + + @dataclass + class StepAction(Generic[_V, _R]): + value: Iterable[Self] | _V | _R | None + kind: _Generic.StepActionKind + + @property + def push_value(self) -> _V: + assert self.kind == _Generic.StepActionKind.PUSH + return cast(_V, self.value) + + @property + def stop_value(self) -> _R: + assert self.kind == _Generic.StepActionKind.STOP + return cast(_R, self.value) + + @staticmethod + def skip() -> _Generic.StepAction[_V, _R]: + return _Generic.StepAction(None, _Generic.StepActionKind.SKIP) + + @staticmethod + def push(value: _V | None) -> _Generic.StepAction[_V, _R]: + return _Generic.StepAction(value, _Generic.StepActionKind.PUSH) + + @staticmethod + def stop(value: _R | None = None) -> _Generic.StepAction[_V, _R]: + return _Generic.StepAction(value, _Generic.StepActionKind.STOP) + + @staticmethod + def aggregate( + *results: _Generic.StepAction[_V, _R], + ) -> _Generic.StepAction[_V, _R]: + return _Generic.StepAction(results, _Generic.StepActionKind.AGGREGATE) + + @staticmethod + def push_last(value: _V | None) -> _Generic.StepAction[_V, _R]: + return _Generic.StepAction.aggregate( + _Generic.StepAction.push(value), _Generic.StepAction.stop() + ) + + def flatten(self) -> Iterable[Self]: + return _Generic.flatten( + self, + is_leave=lambda r: r.kind != _Generic.StepActionKind.AGGREGATE, + get_children=lambda r: cast(Iterable[Self], r.value), + ) + + GeneralStepAction: TypeAlias = StepAction[_V, _R] | _V | _R | None + IterateOperation: TypeAlias = Callable[[_T, int], GeneralStepAction[_V, _R]] + IteratePreHook: TypeAlias = Callable[[Iterable[_T]], GeneralStepAction[_V, _R]] + IteratePostHook: TypeAlias = Callable[[int], GeneralStepAction[_V, _R]] + + @staticmethod + def _is_not_iterable(o: Any) -> bool: + return not isinstance(o, Iterable) + + @staticmethod + def _return_self(o): + return o + + @staticmethod + def iterable_flatten( + maybe_iterable: Iterable[_T] | _T, max_depth: int = -1, *, _depth: int = 0 + ) -> Iterable[Iterable[_T] | _T]: + if _depth == max_depth or not isinstance(maybe_iterable, Iterable): + yield maybe_iterable + return + + for child in maybe_iterable: + yield from _Generic.iterable_flatten( + child, + max_depth, + _depth=_depth + 1, + ) + + @staticmethod + def flatten( + o: _O, + max_depth: int = -1, + /, + is_leave: CruIterator.ElementPredicate[_O] = _is_not_iterable, + get_children: CruIterator.ElementTransformer[_O, Iterable[_O]] = _return_self, + *, + _depth: int = 0, + ) -> Iterable[_O]: + if _depth == max_depth or is_leave(o): + yield o + return + for child in get_children(o): + yield from _Generic.flatten( + child, + max_depth, + is_leave, + get_children, + _depth=_depth + 1, + ) + + class Results: + @staticmethod + def true(_) -> Literal[True]: + return True + + @staticmethod + def false(_) -> Literal[False]: + return False + + @staticmethod + def not_found(_) -> Literal[CruNotFound.VALUE]: + return CruNotFound.VALUE + + @staticmethod + def _non_result_to_push(value: Any) -> StepAction[_V, _R]: + return _Generic.StepAction.push(value) + + @staticmethod + def _non_result_to_stop(value: Any) -> StepAction[_V, _R]: + return _Generic.StepAction.stop(value) + + @staticmethod + def _none_hook(_: Any) -> StepAction[_V, _R]: + return _Generic.StepAction.skip() + + def iterate( + iterable: Iterable[_T], + operation: IterateOperation[_T, _V, _R], + fallback_return: _R, + pre_iterate: IteratePreHook[_T, _V, _R], + post_iterate: IteratePostHook[_V, _R], + convert_value_result: Callable[[_V | _R | None], StepAction[_V, _R]], + ) -> Generator[_V, None, _R]: + pre_result = pre_iterate(iterable) + if not isinstance(pre_result, _Generic.StepAction): + real_pre_result = convert_value_result(pre_result) + for r in real_pre_result.flatten(): + if r.kind == _Generic.StepActionKind.STOP: + return r.stop_value + elif r.kind == _Generic.StepActionKind.PUSH: + yield r.push_value + else: + assert r.kind == _Generic.StepActionKind.SKIP + + for index, element in enumerate(iterable): + result = operation(element, index) + if not isinstance(result, _Generic.StepAction): + real_result = convert_value_result(result) + for r in real_result.flatten(): + if r.kind == _Generic.StepActionKind.STOP: + return r.stop_value + elif r.kind == _Generic.StepActionKind.PUSH: + yield r.push_value + else: + assert r.kind == _Generic.StepActionKind.SKIP + continue + + post_result = post_iterate(index + 1) + if not isinstance(post_result, _Generic.StepAction): + real_post_result = convert_value_result(post_result) + for r in real_post_result.flatten(): + if r.kind == _Generic.StepActionKind.STOP: + return r.stop_value + elif r.kind == _Generic.StepActionKind.PUSH: + yield r.push_value + else: + assert r.kind == _Generic.StepActionKind.SKIP + + return fallback_return + + def create_new( + iterable: Iterable[_T], + operation: IterateOperation[_T, _V, _R], + fallback_return: _R, + /, + pre_iterate: IteratePreHook[_T, _V, _R] | None = None, + post_iterate: IteratePostHook[_V, _R] | None = None, + ) -> Generator[_V, None, _R]: + return _Generic.iterate( + iterable, + operation, + fallback_return, + pre_iterate or _Generic._none_hook, + post_iterate or _Generic._none_hook, + _Generic._non_result_to_push, + ) + + def get_result( + iterable: Iterable[_T], + operation: IterateOperation[_T, _V, _R], + fallback_return: _R, + /, + pre_iterate: IteratePreHook[_T, _V, _R] | None = None, + post_iterate: IteratePostHook[_V, _R] | None = None, + ) -> _R: + try: + for _ in _Generic.iterate( + iterable, + operation, + fallback_return, + pre_iterate or _Generic._none_hook, + post_iterate or _Generic._none_hook, + _Generic._non_result_to_stop, + ): + pass + except StopIteration as stop: + return stop.value + cru_unreachable() + + +class _Helpers: + @staticmethod + def auto_count(c: Callable[Concatenate[int, _P], _O]) -> Callable[_P, _O]: + count = 0 + + def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _O: + nonlocal count + r = c(count, *args, **kwargs) + count += 1 + return r + + return wrapper + + +class _Creators: + class Raw: + @staticmethod + def empty() -> Iterator[Never]: + return iter([]) + + @staticmethod + def range(*args) -> Iterator[int]: + return iter(range(*args)) + + @staticmethod + def unite(*args: _T) -> Iterator[_T]: + return iter(args) + + @staticmethod + def _concat(*iterables: Iterable[_T]) -> Iterable[_T]: + for iterable in iterables: + yield from iterable + + @staticmethod + def concat(*iterables: Iterable[_T]) -> Iterator[_T]: + return iter(_Creators.Raw._concat(*iterables)) + + @staticmethod + def _wrap(f: Callable[_P, Iterable[_O]]) -> Callable[_P, CruIterator[_O]]: + def _wrapped(*args: _P.args, **kwargs: _P.kwargs) -> CruIterator[_O]: + return CruIterator(f(*args, **kwargs)) + + return _wrapped + + empty = _wrap(Raw.empty) + range = _wrap(Raw.range) + unite = _wrap(Raw.unite) + concat = _wrap(Raw.concat) + + +class CruIterator(Generic[_T]): + ElementOperation: TypeAlias = Callable[[_V], Any] + ElementPredicate: TypeAlias = Callable[[_V], bool] + AnyElementPredicate: TypeAlias = ElementPredicate[Any] + ElementTransformer: TypeAlias = Callable[[_V], _O] + SelfElementTransformer: TypeAlias = ElementTransformer[_V, _V] + AnyElementTransformer: TypeAlias = ElementTransformer[Any, Any] + + Creators: TypeAlias = _Creators + Helpers: TypeAlias = _Helpers + + def __init__(self, iterable: Iterable[_T]) -> None: + self._iterator = iter(iterable) + + def __iter__(self) -> Iterator[_T]: + return self._iterator + + def create_new_me(self, iterable: Iterable[_O]) -> CruIterator[_O]: + return type(self)(iterable) # type: ignore + + @staticmethod + def _wrap( + f: Callable[Concatenate[CruIterator[_T], _P], Iterable[_O]], + ) -> Callable[Concatenate[CruIterator[_T], _P], CruIterator[_O]]: + def _wrapped( + self: CruIterator[_T], *args: _P.args, **kwargs: _P.kwargs + ) -> CruIterator[_O]: + return self.create_new_me(f(self, *args, **kwargs)) + + return _wrapped + + @_wrap + def replace_me(self, iterable: Iterable[_O]) -> Iterable[_O]: + return iterable + + def replace_me_with_empty(self) -> CruIterator[Never]: + return self.create_new_me(_Creators.Raw.empty()) + + def replace_me_with_range(self, *args) -> CruIterator[int]: + return self.create_new_me(_Creators.Raw.range(*args)) + + def replace_me_with_unite(self, *args: _O) -> CruIterator[_O]: + return self.create_new_me(_Creators.Raw.unite(*args)) + + def replace_me_with_concat(self, *iterables: Iterable[_T]) -> CruIterator[_T]: + return self.create_new_me(_Creators.Raw.concat(*iterables)) + + def to_set(self) -> set[_T]: + return set(self) + + def to_list(self) -> list[_T]: + return list(self) + + def all(self, predicate: ElementPredicate[_T]) -> bool: + for value in self: + if not predicate(value): + return False + return True + + def any(self, predicate: ElementPredicate[_T]) -> bool: + for value in self: + if predicate(value): + return True + return False + + def foreach(self, operation: ElementOperation[_T]) -> None: + for value in self: + operation(value) + + @_wrap + def transform(self, transformer: ElementTransformer[_T, _O]) -> Iterable[_O]: + for value in self: + yield transformer(value) + + map = transform + + @_wrap + def filter(self, predicate: ElementPredicate[_T]) -> Iterable[_T]: + for value in self: + if predicate(value): + yield value + + @_wrap + def continue_if(self, predicate: ElementPredicate[_T]) -> Iterable[_T]: + for value in self: + yield value + if not predicate(value): + break + + def first_n(self, max_count: int) -> CruIterator[_T]: + if max_count < 0: + raise ValueError("max_count must be 0 or positive.") + if max_count == 0: + return self.replace_me_with_empty() # type: ignore + return self.continue_if(_Helpers.auto_count(lambda i, _: i < max_count - 1)) + + def drop_n(self, n: int) -> CruIterator[_T]: + if n < 0: + raise ValueError("n must be 0 or positive.") + if n == 0: + return self + return self.filter(_Helpers.auto_count(lambda i, _: i < n)) + + def single_or( + self, fallback: _O | CruNotFound = CruNotFound.VALUE + ) -> _T | _O | CruNotFound: + first_2 = self.first_n(2) + has_value = False + for element in first_2: + if has_value: + raise ValueError("More than one value found.") + has_value = True + value = element + if has_value: + return value + else: + return fallback + + def first_or( + self, fallback: _O | CruNotFound = CruNotFound.VALUE + ) -> _T | _O | CruNotFound: + return self.first_n(1).single_or(fallback) + + @_wrap + def flatten(self, max_depth: int = -1) -> Iterable[Any]: + return _Generic.iterable_flatten(self, max_depth) + + def select_by_indices(self, indices: Iterable[int]) -> CruIterator[_T]: + index_set = set(indices) + max_index = max(index_set) + return self.first_n(max_index + 1).filter( + _Helpers.auto_count(lambda i, _: i in index_set) + ) + + def remove_values(self, values: Iterable[Any]) -> CruIterator[_T]: + value_set = set(values) + return self.filter(lambda v: v not in value_set) + + def replace_values( + self, old_values: Iterable[Any], new_value: _O + ) -> Iterable[_T | _O]: + value_set = set(old_values) + return self.transform(lambda v: new_value if v in value_set else v) + + def group_by(self, key_getter: Callable[[_T], _O]) -> dict[_O, list[_T]]: + result: dict[_O, list[_T]] = {} + + for item in self: + key = key_getter(item) + if key not in result: + result[key] = [] + result[key].append(item) + + return result + + def join_str(self: CruIterator[str], separator: str) -> str: + return separator.join(self) + + +class CruIterMixin(Generic[_T]): + def cru_iter(self: Iterable[_T]) -> CruIterator[_T]: + return CruIterator(self) + + +class CruIterList(list[_T], CruIterMixin[_T]): + pass + + +class CruIterable: + Generic: TypeAlias = _Generic + Iterator: TypeAlias = CruIterator[_T] + Helpers: TypeAlias = _Helpers + Mixin: TypeAlias = CruIterMixin[_T] + IterList: TypeAlias = CruIterList[_T] + + +CRU.add_objects(CruIterable, CruIterator) diff --git a/python/cru/_type.py b/python/cru/_type.py new file mode 100644 index 0000000..1f81da3 --- /dev/null +++ b/python/cru/_type.py @@ -0,0 +1,52 @@ +from collections.abc import Iterable +from typing import Any + +from ._error import CruException, CruLogicError +from ._iter import CruIterator + + +class CruTypeCheckError(CruException): + pass + + +DEFAULT_NONE_ERR_MSG = "None is not allowed here." +DEFAULT_TYPE_ERR_MSG = "Object of this type is not allowed here." + + +class CruTypeSet(set[type]): + def __init__(self, *types: type): + type_set = CruIterator(types).filter(lambda t: t is not None).to_set() + if not CruIterator(type_set).all(lambda t: isinstance(t, type)): + raise CruLogicError("TypeSet can only contain type.") + super().__init__(type_set) + + def check_value( + self, + value: Any, + /, + allow_none: bool = False, + empty_allow_all: bool = True, + ) -> None: + if value is None: + if allow_none: + return + else: + raise CruTypeCheckError(DEFAULT_NONE_ERR_MSG) + if len(self) == 0 and empty_allow_all: + return + if not CruIterator(self).any(lambda t: isinstance(value, t)): + raise CruTypeCheckError(DEFAULT_TYPE_ERR_MSG) + + def check_value_list( + self, + values: Iterable[Any], + /, + allow_none: bool = False, + empty_allow_all: bool = True, + ) -> None: + for value in values: + self.check_value( + value, + allow_none, + empty_allow_all, + ) diff --git a/python/cru/attr.py b/python/cru/attr.py new file mode 100644 index 0000000..d4cc86a --- /dev/null +++ b/python/cru/attr.py @@ -0,0 +1,364 @@ +from __future__ import annotations + +import copy +from collections.abc import Callable, Iterable +from dataclasses import dataclass, field +from typing import Any + +from .list import CruUniqueKeyList +from ._type import CruTypeSet +from ._const import CruNotFound, CruUseDefault, CruDontChange +from ._iter import CruIterator + + +@dataclass +class CruAttr: + + name: str + value: Any + description: str | None + + @staticmethod + def make( + name: str, value: Any = CruUseDefault.VALUE, description: str | None = None + ) -> CruAttr: + return CruAttr(name, value, description) + + +CruAttrDefaultFactory = Callable[["CruAttrDef"], Any] +CruAttrTransformer = Callable[[Any, "CruAttrDef"], Any] +CruAttrValidator = Callable[[Any, "CruAttrDef"], None] + + +@dataclass +class CruAttrDef: + name: str + description: str + default_factory: CruAttrDefaultFactory + transformer: CruAttrTransformer + validator: CruAttrValidator + + def __init__( + self, + name: str, + description: str, + default_factory: CruAttrDefaultFactory, + transformer: CruAttrTransformer, + validator: CruAttrValidator, + ) -> None: + self.name = name + self.description = description + self.default_factory = default_factory + self.transformer = transformer + self.validator = validator + + def transform(self, value: Any) -> Any: + if self.transformer is not None: + return self.transformer(value, self) + return value + + def validate(self, value: Any, /, force_allow_none: bool = False) -> None: + if force_allow_none is value is None: + return + if self.validator is not None: + self.validator(value, self) + + def transform_and_validate( + self, value: Any, /, force_allow_none: bool = False + ) -> Any: + value = self.transform(value) + self.validate(value, force_allow_none) + return value + + def make_default_value(self) -> Any: + return self.transform_and_validate(self.default_factory(self)) + + def adopt(self, attr: CruAttr) -> CruAttr: + attr = copy.deepcopy(attr) + + if attr.name is None: + attr.name = self.name + elif attr.name != self.name: + raise ValueError(f"Attr name is not match: {attr.name} != {self.name}") + + if attr.value is CruUseDefault.VALUE: + attr.value = self.make_default_value() + else: + attr.value = self.transform_and_validate(attr.value) + + if attr.description is None: + attr.description = self.description + + return attr + + def make( + self, value: Any = CruUseDefault.VALUE, description: None | str = None + ) -> CruAttr: + value = self.make_default_value() if value is CruUseDefault.VALUE else value + value = self.transform_and_validate(value) + return CruAttr( + self.name, + value, + description if description is not None else self.description, + ) + + +@dataclass +class CruAttrDefBuilder: + + name: str + description: str + types: list[type] | None = field(default=None) + allow_none: bool = field(default=False) + default: Any = field(default=CruUseDefault.VALUE) + default_factory: CruAttrDefaultFactory | None = field(default=None) + auto_list: bool = field(default=False) + transformers: list[CruAttrTransformer] = field(default_factory=list) + validators: list[CruAttrValidator] = field(default_factory=list) + override_transformer: CruAttrTransformer | None = field(default=None) + override_validator: CruAttrValidator | None = field(default=None) + + build_hook: Callable[[CruAttrDef], None] | None = field(default=None) + + def __init__(self, name: str, description: str) -> None: + super().__init__() + self.name = name + self.description = description + + def auto_adjust_default(self) -> None: + if self.default is not CruUseDefault.VALUE and self.default is not None: + return + if self.allow_none and self.default is CruUseDefault.VALUE: + self.default = None + if not self.allow_none and self.default is None: + self.default = CruUseDefault.VALUE + if self.auto_list and not self.allow_none: + self.default = [] + + def with_name(self, name: str | CruDontChange) -> CruAttrDefBuilder: + if name is not CruDontChange.VALUE: + self.name = name + return self + + def with_description( + self, default_description: str | CruDontChange + ) -> CruAttrDefBuilder: + if default_description is not CruDontChange.VALUE: + self.description = default_description + return self + + def with_default(self, default: Any) -> CruAttrDefBuilder: + if default is not CruDontChange.VALUE: + self.default = default + return self + + def with_default_factory( + self, + default_factory: CruAttrDefaultFactory | CruDontChange, + ) -> CruAttrDefBuilder: + if default_factory is not CruDontChange.VALUE: + self.default_factory = default_factory + return self + + def with_types( + self, + types: Iterable[type] | None | CruDontChange, + ) -> CruAttrDefBuilder: + if types is not CruDontChange.VALUE: + self.types = None if types is None else list(types) + return self + + def with_allow_none(self, allow_none: bool | CruDontChange) -> CruAttrDefBuilder: + if allow_none is not CruDontChange.VALUE: + self.allow_none = allow_none + return self + + def with_auto_list( + self, auto_list: bool | CruDontChange = True + ) -> CruAttrDefBuilder: + if auto_list is not CruDontChange.VALUE: + self.auto_list = auto_list + return self + + def with_constraint( + self, + /, + allow_none: bool | CruDontChange = CruDontChange.VALUE, + types: Iterable[type] | None | CruDontChange = CruDontChange.VALUE, + default: Any = CruDontChange.VALUE, + default_factory: CruAttrDefaultFactory | CruDontChange = CruDontChange.VALUE, + auto_list: bool | CruDontChange = CruDontChange.VALUE, + ) -> CruAttrDefBuilder: + return ( + self.with_allow_none(allow_none) + .with_types(types) + .with_default(default) + .with_default_factory(default_factory) + .with_auto_list(auto_list) + ) + + def add_transformer(self, transformer: CruAttrTransformer) -> CruAttrDefBuilder: + self.transformers.append(transformer) + return self + + def clear_transformers(self) -> CruAttrDefBuilder: + self.transformers.clear() + return self + + def add_validator(self, validator: CruAttrValidator) -> CruAttrDefBuilder: + self.validators.append(validator) + return self + + def clear_validators(self) -> CruAttrDefBuilder: + self.validators.clear() + return self + + def with_override_transformer( + self, override_transformer: CruAttrTransformer | None | CruDontChange + ) -> CruAttrDefBuilder: + if override_transformer is not CruDontChange.VALUE: + self.override_transformer = override_transformer + return self + + def with_override_validator( + self, override_validator: CruAttrValidator | None | CruDontChange + ) -> CruAttrDefBuilder: + if override_validator is not CruDontChange.VALUE: + self.override_validator = override_validator + return self + + def is_valid(self) -> tuple[bool, str]: + if not isinstance(self.name, str): + return False, "Name must be a string!" + if not isinstance(self.description, str): + return False, "Default description must be a string!" + if ( + not self.allow_none + and self.default is None + and self.default_factory is None + ): + return False, "Default must be set if allow_none is False!" + return True, "" + + @staticmethod + def _build( + builder: CruAttrDefBuilder, auto_adjust_default: bool = True + ) -> CruAttrDef: + if auto_adjust_default: + builder.auto_adjust_default() + + valid, err = builder.is_valid() + if not valid: + raise ValueError(err) + + def composed_transformer(value: Any, attr_def: CruAttrDef) -> Any: + def transform_value(single_value: Any) -> Any: + for transformer in builder.transformers: + single_value = transformer(single_value, attr_def) + return single_value + + if builder.auto_list: + if not isinstance(value, list): + value = [value] + value = CruIterator(value).transform(transform_value).to_list() + + else: + value = transform_value(value) + return value + + type_set = None if builder.types is None else CruTypeSet(*builder.types) + + def composed_validator(value: Any, attr_def: CruAttrDef): + def validate_value(single_value: Any) -> None: + if type_set is not None: + type_set.check_value(single_value, allow_none=builder.allow_none) + for validator in builder.validators: + validator(single_value, attr_def) + + if builder.auto_list: + CruIterator(value).foreach(validate_value) + else: + validate_value(value) + + real_transformer = builder.override_transformer or composed_transformer + real_validator = builder.override_validator or composed_validator + + default_factory = builder.default_factory + if default_factory is None: + + def default_factory(_d): + return copy.deepcopy(builder.default) + + d = CruAttrDef( + builder.name, + builder.description, + default_factory, + real_transformer, + real_validator, + ) + if builder.build_hook: + builder.build_hook(d) + return d + + def build(self, auto_adjust_default=True) -> CruAttrDef: + c = copy.deepcopy(self) + self.build_hook = None + return CruAttrDefBuilder._build(c, auto_adjust_default) + + +class CruAttrDefRegistry(CruUniqueKeyList[CruAttrDef, str]): + + def __init__(self) -> None: + super().__init__(lambda d: d.name) + + def make_builder(self, name: str, default_description: str) -> CruAttrDefBuilder: + b = CruAttrDefBuilder(name, default_description) + b.build_hook = lambda a: self.add(a) + return b + + def adopt(self, attr: CruAttr) -> CruAttr: + d = self.get(attr.name) + return d.adopt(attr) + + +class CruAttrTable(CruUniqueKeyList[CruAttr, str]): + def __init__(self, registry: CruAttrDefRegistry) -> None: + self._registry: CruAttrDefRegistry = registry + super().__init__(lambda a: a.name, before_add=registry.adopt) + + @property + def registry(self) -> CruAttrDefRegistry: + return self._registry + + def get_value_or(self, name: str, fallback: Any = CruNotFound.VALUE) -> Any: + a = self.get_or(name, CruNotFound.VALUE) + if a is CruNotFound.VALUE: + return fallback + return a.value + + def get_value(self, name: str) -> Any: + a = self.get(name) + return a.value + + def make_attr( + self, + name: str, + value: Any = CruUseDefault.VALUE, + /, + description: str | None = None, + ) -> CruAttr: + d = self._registry.get(name) + return d.make(value, description or d.description) + + def add_value( + self, + name: str, + value: Any = CruUseDefault.VALUE, + /, + description: str | None = None, + *, + replace: bool = False, + ) -> CruAttr: + attr = self.make_attr(name, value, description) + self.add(attr, replace) + return attr diff --git a/python/cru/config.py b/python/cru/config.py new file mode 100644 index 0000000..0f6f0d0 --- /dev/null +++ b/python/cru/config.py @@ -0,0 +1,196 @@ +from __future__ import annotations + +from typing import Any, TypeVar, Generic + +from ._error import CruException +from .list import CruUniqueKeyList +from .value import ( + INTEGER_VALUE_TYPE, + TEXT_VALUE_TYPE, + CruValueTypeError, + ValueGeneratorBase, + ValueType, +) + +_T = TypeVar("_T") + + +class CruConfigError(CruException): + def __init__(self, message: str, item: ConfigItem, *args, **kwargs): + super().__init__(message, *args, **kwargs) + self._item = item + + @property + def item(self) -> ConfigItem: + return self._item + + +class ConfigItem(Generic[_T]): + def __init__( + self, + name: str, + description: str, + value_type: ValueType[_T], + value: _T | None = None, + /, + default: ValueGeneratorBase[_T] | _T | None = None, + ) -> None: + self._name = name + self._description = description + self._value_type = value_type + self._value = value + self._default = default + + @property + def name(self) -> str: + return self._name + + @property + def description(self) -> str: + return self._description + + @property + def value_type(self) -> ValueType[_T]: + return self._value_type + + @property + def is_set(self) -> bool: + return self._value is not None + + @property + def value(self) -> _T: + if self._value is None: + raise CruConfigError( + "Config value is not set.", + self, + user_message=f"Config item {self.name} is not set.", + ) + return self._value + + @property + def value_str(self) -> str: + return self.value_type.convert_value_to_str(self.value) + + def set_value(self, v: _T | str, allow_convert_from_str=False): + if allow_convert_from_str: + self._value = self.value_type.check_value_or_try_convert_from_str(v) + else: + self._value = self.value_type.check_value(v) + + def reset(self): + self._value = None + + @property + def default(self) -> ValueGeneratorBase[_T] | _T | None: + return self._default + + @property + def can_generate_default(self) -> bool: + return self.default is not None + + def generate_default_value(self) -> _T: + if self.default is None: + raise CruConfigError( + "Config item does not support default value generation.", self + ) + elif isinstance(self.default, ValueGeneratorBase): + v = self.default.generate() + else: + v = self.default + try: + self.value_type.check_value(v) + return v + except CruValueTypeError as e: + raise CruConfigError( + "Config value generator returns an invalid value.", self + ) from e + + def copy(self) -> "ConfigItem": + return ConfigItem( + self.name, + self.description, + self.value_type, + self.value, + self.default, + ) + + @property + def description_str(self) -> str: + return f"{self.name} ({self.value_type.name}): {self.description}" + + +class Configuration(CruUniqueKeyList[ConfigItem[Any], str]): + def __init__(self): + super().__init__(lambda c: c.name) + + def get_set_items(self) -> list[ConfigItem[Any]]: + return [item for item in self if item.is_set] + + def get_unset_items(self) -> list[ConfigItem[Any]]: + return [item for item in self if not item.is_set] + + @property + def all_set(self) -> bool: + return len(self.get_unset_items()) == 0 + + @property + def all_not_set(self) -> bool: + return len(self.get_set_items()) == 0 + + def add_text_config( + self, + name: str, + description: str, + value: str | None = None, + default: ValueGeneratorBase[str] | str | None = None, + ) -> ConfigItem[str]: + item = ConfigItem(name, description, TEXT_VALUE_TYPE, value, default) + self.add(item) + return item + + def add_int_config( + self, + name: str, + description: str, + value: int | None = None, + default: ValueGeneratorBase[int] | int | None = None, + ) -> ConfigItem[int]: + item = ConfigItem(name, description, INTEGER_VALUE_TYPE, value, default) + self.add(item) + return item + + def set_config_item( + self, + name: str, + value: Any | str, + allow_convert_from_str=True, + ) -> None: + item = self.get(name) + item.set_value( + value, + allow_convert_from_str=allow_convert_from_str, + ) + + def reset_all(self) -> None: + for item in self: + item.reset() + + def to_dict(self) -> dict[str, Any]: + return {item.name: item.value for item in self} + + def to_str_dict(self) -> dict[str, str]: + return { + item.name: item.value_type.convert_value_to_str(item.value) for item in self + } + + def set_value_dict( + self, + value_dict: dict[str, Any], + allow_convert_from_str: bool = False, + ) -> None: + for name, value in value_dict.items(): + item = self.get(name) + item.set_value( + value, + allow_convert_from_str=allow_convert_from_str, + ) diff --git a/python/cru/list.py b/python/cru/list.py new file mode 100644 index 0000000..216a561 --- /dev/null +++ b/python/cru/list.py @@ -0,0 +1,160 @@ +from __future__ import annotations + +from collections.abc import Callable, Iterator +from typing import Any, Generic, Iterable, TypeAlias, TypeVar, overload + +from ._error import CruInternalError +from ._iter import CruIterator +from ._const import CruNotFound + +_T = TypeVar("_T") +_O = TypeVar("_O") + + +class CruListEdit(CruIterator[_T]): + def __init__(self, iterable: Iterable[_T], _list: CruList[Any]) -> None: + super().__init__(iterable) + self._list = _list + + def create_me(self, iterable: Iterable[_O]) -> CruListEdit[_O]: + return CruListEdit(iterable, self._list) + + @property + def list(self) -> CruList[Any]: + return self._list + + def done(self) -> CruList[Any]: + self._list.reset(self) + return self._list + + +class CruList(list[_T]): + def reset(self, new_values: Iterable[_T]): + if self is new_values: + new_values = list(new_values) + self.clear() + self.extend(new_values) + return self + + def as_cru_iterator(self) -> CruIterator[_T]: + return CruIterator(self) + + @staticmethod + def make(maybe_list: Iterable[_T] | _T | None) -> CruList[_T]: + if maybe_list is None: + return CruList() + if isinstance(maybe_list, Iterable): + return CruList(maybe_list) + return CruList([maybe_list]) + + +_K = TypeVar("_K") + +_KeyGetter: TypeAlias = Callable[[_T], _K] + + +class CruUniqueKeyList(Generic[_T, _K]): + def __init__( + self, + key_getter: _KeyGetter[_T, _K], + *, + before_add: Callable[[_T], _T] | None = None, + ): + super().__init__() + self._key_getter = key_getter + self._before_add = before_add + self._list: CruList[_T] = CruList() + + @property + def key_getter(self) -> _KeyGetter[_T, _K]: + return self._key_getter + + @property + def internal_list(self) -> CruList[_T]: + return self._list + + def validate_self(self): + keys = self._list.transform(self._key_getter) + if len(keys) != len(set(keys)): + raise CruInternalError("Duplicate keys!") + + @overload + def get_or( + self, key: _K, fallback: CruNotFound = CruNotFound.VALUE + ) -> _T | CruNotFound: ... + + @overload + def get_or(self, key: _K, fallback: _O) -> _T | _O: ... + + def get_or( + self, key: _K, fallback: _O | CruNotFound = CruNotFound.VALUE + ) -> _T | _O | CruNotFound: + return ( + self._list.as_cru_iterator() + .filter(lambda v: key == self._key_getter(v)) + .first_or(fallback) + ) + + def get(self, key: _K) -> _T: + value = self.get_or(key) + if value is CruNotFound.VALUE: + raise KeyError(f"Key {key} not found!") + return value # type: ignore + + @property + def keys(self) -> Iterable[_K]: + return self._list.as_cru_iterator().map(self._key_getter) + + def has_key(self, key: _K) -> bool: + return self.get_or(key) != CruNotFound.VALUE + + def try_remove(self, key: _K) -> bool: + value = self.get_or(key) + if value is CruNotFound.VALUE: + return False + self._list.remove(value) + return True + + def remove(self, key: _K, allow_absence: bool = False) -> None: + if not self.try_remove(key) and not allow_absence: + raise KeyError(f"Key {key} not found!") + + def add(self, value: _T, /, replace: bool = False) -> None: + v = self.get_or(self._key_getter(value)) + if v is not CruNotFound.VALUE: + if not replace: + raise KeyError(f"Key {self._key_getter(v)} already exists!") + self._list.remove(v) + if self._before_add is not None: + value = self._before_add(value) + self._list.append(value) + + def set(self, value: _T) -> None: + self.add(value, True) + + def extend(self, iterable: Iterable[_T], /, replace: bool = False) -> None: + values = list(iterable) + to_remove = [] + for value in values: + v = self.get_or(self._key_getter(value)) + if v is not CruNotFound.VALUE: + if not replace: + raise KeyError(f"Key {self._key_getter(v)} already exists!") + to_remove.append(v) + for value in to_remove: + self._list.remove(value) + if self._before_add is not None: + values = [self._before_add(value) for value in values] + self._list.extend(values) + + def clear(self) -> None: + self._list.reset([]) + + def __iter__(self) -> Iterator[_T]: + return iter(self._list) + + def __len__(self) -> int: + return len(self._list) + + def cru_iter(self) -> CruIterator[_T]: + return CruIterator(self._list) diff --git a/python/cru/parsing.py b/python/cru/parsing.py new file mode 100644 index 0000000..0e9239d --- /dev/null +++ b/python/cru/parsing.py @@ -0,0 +1,290 @@ +from __future__ import annotations + +from abc import ABCMeta, abstractmethod +from dataclasses import dataclass +from enum import Enum +from typing import NamedTuple, TypeAlias, TypeVar, Generic, NoReturn, Callable + +from ._error import CruException +from ._iter import CruIterable + +_T = TypeVar("_T") + + +class StrParseStream: + class MemStackEntry(NamedTuple): + pos: int + lineno: int + + class MemStackPopStr(NamedTuple): + text: str + lineno: int + + def __init__(self, text: str) -> None: + self._text = text + self._pos = 0 + self._lineno = 1 + self._length = len(self._text) + self._valid_pos_range = range(0, self.length + 1) + self._valid_offset_range = range(-self.length, self.length + 1) + self._mem_stack: CruIterable.IterList[StrParseStream.MemStackEntry] = ( + CruIterable.IterList() + ) + + @property + def text(self) -> str: + return self._text + + @property + def length(self) -> int: + return self._length + + @property + def valid_pos_range(self) -> range: + return self._valid_pos_range + + @property + def valid_offset_range(self) -> range: + return self._valid_offset_range + + @property + def pos(self) -> int: + return self._pos + + @property + def lineno(self) -> int: + return self._lineno + + @property + def eof(self) -> bool: + return self._pos == self.length + + def peek(self, length: int) -> str: + real_length = min(length, self.length - self._pos) + new_position = self._pos + real_length + text = self._text[self._pos : new_position] + return text + + def read(self, length: int) -> str: + text = self.peek(length) + self._pos += len(text) + self._lineno += text.count("\n") + return text + + def skip(self, length: int) -> None: + self.read(length) + + def peek_str(self, text: str) -> bool: + if self.pos + len(text) > self.length: + return False + for offset in range(len(text)): + if self._text[self.pos + offset] != text[offset]: + return False + return True + + def read_str(self, text: str) -> bool: + if not self.peek_str(text): + return False + self._pos += len(text) + self._lineno += text.count("\n") + return True + + @property + def mem_stack(self) -> CruIterable.IterList[MemStackEntry]: + return self._mem_stack + + def push_mem(self) -> None: + self.mem_stack.append(self.MemStackEntry(self.pos, self.lineno)) + + def pop_mem(self) -> MemStackEntry: + return self.mem_stack.pop() + + def pop_mem_str(self, strip_end: int = 0) -> MemStackPopStr: + old = self.pop_mem() + assert self.pos >= old.pos + return self.MemStackPopStr( + self._text[old.pos : self.pos - strip_end], old.lineno + ) + + +class ParseError(CruException, Generic[_T]): + def __init__( + self, + message, + parser: Parser[_T], + text: str, + line_number: int | None = None, + *args, + **kwargs, + ): + super().__init__(message, *args, **kwargs) + self._parser = parser + self._text = text + self._line_number = line_number + + @property + def parser(self) -> Parser[_T]: + return self._parser + + @property + def text(self) -> str: + return self._text + + @property + def line_number(self) -> int | None: + return self._line_number + + +class Parser(Generic[_T], metaclass=ABCMeta): + def __init__(self, name: str) -> None: + self._name = name + + @property + def name(self) -> str: + return self._name + + @abstractmethod + def parse(self, s: str) -> _T: + raise NotImplementedError() + + def raise_parse_exception( + self, text: str, line_number: int | None = None + ) -> NoReturn: + a = line_number and f" at line {line_number}" or "" + raise ParseError(f"Parser {self.name} failed{a}.", self, text, line_number) + + +class _SimpleLineVarParserEntry(NamedTuple): + key: str + value: str + line_number: int | None = None + + +class _SimpleLineVarParserResult(CruIterable.IterList[_SimpleLineVarParserEntry]): + pass + + +class SimpleLineVarParser(Parser[_SimpleLineVarParserResult]): + """ + The parsing result is a list of tuples (key, value, line number). + """ + + Entry: TypeAlias = _SimpleLineVarParserEntry + Result: TypeAlias = _SimpleLineVarParserResult + + def __init__(self) -> None: + super().__init__(type(self).__name__) + + def _parse(self, text: str, callback: Callable[[Entry], None]) -> None: + for ln, line in enumerate(text.splitlines()): + line_number = ln + 1 + # check if it's a comment + if line.strip().startswith("#"): + continue + # check if there is a '=' + if line.find("=") == -1: + self.raise_parse_exception("There is even no '='!", line_number) + # split at first '=' + key, value = line.split("=", 1) + key = key.strip() + value = value.strip() + callback(_SimpleLineVarParserEntry(key, value, line_number)) + + def parse(self, text: str) -> Result: + result = _SimpleLineVarParserResult() + self._parse(text, lambda item: result.append(item)) + return result + + +class _StrWrapperVarParserTokenKind(Enum): + TEXT = "TEXT" + VAR = "VAR" + + +@dataclass +class _StrWrapperVarParserToken: + kind: _StrWrapperVarParserTokenKind + value: str + line_number: int + + @property + def is_text(self) -> bool: + return self.kind is _StrWrapperVarParserTokenKind.TEXT + + @property + def is_var(self) -> bool: + return self.kind is _StrWrapperVarParserTokenKind.VAR + + @staticmethod + def from_mem_str( + kind: _StrWrapperVarParserTokenKind, mem_str: StrParseStream.MemStackPopStr + ) -> _StrWrapperVarParserToken: + return _StrWrapperVarParserToken(kind, mem_str.text, mem_str.lineno) + + def __repr__(self) -> str: + return f"VAR: {self.value}" if self.is_var else "TEXT: ..." + + +class _StrWrapperVarParserResult(CruIterable.IterList[_StrWrapperVarParserToken]): + pass + + +class StrWrapperVarParser(Parser[_StrWrapperVarParserResult]): + TokenKind: TypeAlias = _StrWrapperVarParserTokenKind + Token: TypeAlias = _StrWrapperVarParserToken + Result: TypeAlias = _StrWrapperVarParserResult + + def __init__(self, wrapper: str): + super().__init__(f"StrWrapperVarParser({wrapper})") + self._wrapper = wrapper + + @property + def wrapper(self) -> str: + return self._wrapper + + def parse(self, text: str) -> Result: + result = self.Result() + + class _State(Enum): + TEXT = "TEXT" + VAR = "VAR" + + state = _State.TEXT + stream = StrParseStream(text) + stream.push_mem() + + while True: + if stream.eof: + break + + if stream.read_str(self.wrapper): + if state is _State.TEXT: + result.append( + self.Token.from_mem_str( + self.TokenKind.TEXT, stream.pop_mem_str(len(self.wrapper)) + ) + ) + state = _State.VAR + stream.push_mem() + else: + result.append( + self.Token.from_mem_str( + self.TokenKind.VAR, + stream.pop_mem_str(len(self.wrapper)), + ) + ) + state = _State.TEXT + stream.push_mem() + + continue + + stream.skip(1) + + if state is _State.VAR: + raise ParseError("Text ended without closing variable.", self, text) + + mem_str = stream.pop_mem_str() + if len(mem_str.text) != 0: + result.append(self.Token.from_mem_str(self.TokenKind.TEXT, mem_str)) + + return result diff --git a/python/cru/service/__init__.py b/python/cru/service/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/python/cru/service/__init__.py diff --git a/python/cru/service/__main__.py b/python/cru/service/__main__.py new file mode 100644 index 0000000..2a0268b --- /dev/null +++ b/python/cru/service/__main__.py @@ -0,0 +1,27 @@ +import sys + +from cru import CruException + +from ._app import create_app + + +def main(): + app = create_app() + app.run_command() + + +if __name__ == "__main__": + version_info = sys.version_info + if not (version_info.major == 3 and version_info.minor >= 11): + print("This application requires Python 3.11 or later.", file=sys.stderr) + sys.exit(1) + + try: + main() + except CruException as e: + user_message = e.get_user_message() + if user_message is not None: + print(f"Error: {user_message}") + exit(1) + else: + raise diff --git a/python/cru/service/_app.py b/python/cru/service/_app.py new file mode 100644 index 0000000..b4c6271 --- /dev/null +++ b/python/cru/service/_app.py @@ -0,0 +1,30 @@ +from ._base import ( + AppBase, + CommandDispatcher, + PathCommandProvider, +) +from ._template import TemplateManager +from ._nginx import NginxManager +from ._gen_cmd import GenCmdProvider + +APP_ID = "crupest" + + +class App(AppBase): + def __init__(self): + super().__init__(APP_ID, f"{APP_ID}-service") + self.add_feature(PathCommandProvider()) + self.add_feature(TemplateManager()) + self.add_feature(NginxManager()) + self.add_feature(GenCmdProvider()) + self.add_feature(CommandDispatcher()) + + def run_command(self): + command_dispatcher = self.get_feature(CommandDispatcher) + command_dispatcher.run_command() + + +def create_app() -> App: + app = App() + app.setup() + return app diff --git a/python/cru/service/_base.py b/python/cru/service/_base.py new file mode 100644 index 0000000..e1eee70 --- /dev/null +++ b/python/cru/service/_base.py @@ -0,0 +1,400 @@ +from __future__ import annotations + +from argparse import ArgumentParser, Namespace +from abc import ABC, abstractmethod +import argparse +import os +from pathlib import Path +from typing import TypeVar, overload + +from cru import CruException, CruLogicError + +_Feature = TypeVar("_Feature", bound="AppFeatureProvider") + + +class AppError(CruException): + pass + + +class AppFeatureError(AppError): + def __init__(self, message, feature: type | str, *args, **kwargs): + super().__init__(message, *args, **kwargs) + self._feature = feature + + @property + def feature(self) -> type | str: + return self._feature + + +class AppPathError(CruException): + def __init__(self, message, _path: str | Path, *args, **kwargs): + super().__init__(message, *args, **kwargs) + self._path = str(_path) + + @property + def path(self) -> str: + return self._path + + +class AppPath(ABC): + def __init__(self, id: str, is_dir: bool, description: str) -> None: + self._is_dir = is_dir + self._id = id + self._description = description + + @property + @abstractmethod + def parent(self) -> AppPath | None: ... + + @property + @abstractmethod + def app(self) -> AppBase: ... + + @property + def id(self) -> str: + return self._id + + @property + def description(self) -> str: + return self._description + + @property + def is_dir(self) -> bool: + return self._is_dir + + @property + @abstractmethod + def full_path(self) -> Path: ... + + @property + def full_path_str(self) -> str: + return str(self.full_path) + + def check_parents(self, must_exist: bool = False) -> bool: + for p in reversed(self.full_path.parents): + if not p.exists() and not must_exist: + return False + if not p.is_dir(): + raise AppPathError("Parents' path must be a dir.", self.full_path) + return True + + def check_self(self, must_exist: bool = False) -> bool: + if not self.check_parents(must_exist): + return False + if not self.full_path.exists(): + if not must_exist: + return False + raise AppPathError("Not exist.", self.full_path) + if self.is_dir: + if not self.full_path.is_dir(): + raise AppPathError("Should be a directory, but not.", self.full_path) + else: + return True + else: + if not self.full_path.is_file(): + raise AppPathError("Should be a file, but not.", self.full_path) + else: + return True + + def ensure(self, create_file: bool = False) -> None: + e = self.check_self(False) + if not e: + os.makedirs(self.full_path.parent, exist_ok=True) + if self.is_dir: + os.mkdir(self.full_path) + elif create_file: + with open(self.full_path, "w") as f: + f.write("") + + def read_text(self) -> str: + if self.is_dir: + raise AppPathError("Can't read text of a dir.", self.full_path) + self.check_self() + return self.full_path.read_text() + + def add_subpath( + self, + name: str, + is_dir: bool, + /, + id: str | None = None, + description: str = "", + ) -> AppFeaturePath: + return self.app._add_path(name, is_dir, self, id, description) + + @property + def app_relative_path(self) -> Path: + return self.full_path.relative_to(self.app.root.full_path) + + +class AppFeaturePath(AppPath): + def __init__( + self, + parent: AppPath, + name: str, + is_dir: bool, + /, + id: str | None = None, + description: str = "", + ) -> None: + super().__init__(id or name, is_dir, description) + self._name = name + self._parent = parent + + @property + def name(self) -> str: + return self._name + + @property + def parent(self) -> AppPath: + return self._parent + + @property + def app(self) -> AppBase: + return self.parent.app + + @property + def full_path(self) -> Path: + return Path(self.parent.full_path, self.name).resolve() + + +class AppRootPath(AppPath): + def __init__(self, app: AppBase, path: Path): + super().__init__(f"/{id}", True, f"Application {id} root path.") + self._app = app + self._full_path = path.resolve() + + @property + def parent(self) -> None: + return None + + @property + def app(self) -> AppBase: + return self._app + + @property + def full_path(self) -> Path: + return self._full_path + + +class AppFeatureProvider(ABC): + def __init__(self, name: str, /, app: AppBase | None = None): + super().__init__() + self._name = name + self._app = app if app else AppBase.get_instance() + + @property + def app(self) -> AppBase: + return self._app + + @property + def name(self) -> str: + return self._name + + @abstractmethod + def setup(self) -> None: ... + + +class AppCommandFeatureProvider(AppFeatureProvider): + @abstractmethod + def get_command_info(self) -> tuple[str, str]: ... + + @abstractmethod + def setup_arg_parser(self, arg_parser: ArgumentParser): ... + + @abstractmethod + def run_command(self, args: Namespace) -> None: ... + + +class PathCommandProvider(AppCommandFeatureProvider): + def __init__(self) -> None: + super().__init__("path-command-provider") + + def setup(self): + pass + + def get_command_info(self): + return ("path", "Get information about paths used by app.") + + def setup_arg_parser(self, arg_parser: ArgumentParser) -> None: + subparsers = arg_parser.add_subparsers( + dest="path_command", metavar="PATH_COMMAND" + ) + _list_parser = subparsers.add_parser( + "list", help="list special paths used by app" + ) + + def run_command(self, args: Namespace) -> None: + if args.path_command is None or args.path_command == "list": + for path in self.app.paths: + print( + f"{path.app_relative_path.as_posix()}{'/' if path.is_dir else ''}: {path.description}" + ) + + +class CommandDispatcher(AppFeatureProvider): + def __init__(self) -> None: + super().__init__("command-dispatcher") + + def setup_arg_parser(self) -> None: + self._map: dict[str, AppCommandFeatureProvider] = {} + arg_parser = argparse.ArgumentParser( + description="Service management", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + subparsers = arg_parser.add_subparsers( + dest="command", + help="The management command to execute.", + metavar="COMMAND", + ) + for feature in self.app.features: + if isinstance(feature, AppCommandFeatureProvider): + info = feature.get_command_info() + command_subparser = subparsers.add_parser(info[0], help=info[1]) + feature.setup_arg_parser(command_subparser) + self._map[info[0]] = feature + self._arg_parser = arg_parser + + def setup(self): + self._parsed_args = self.arg_parser.parse_args() + + @property + def arg_parser(self) -> argparse.ArgumentParser: + return self._arg_parser + + @property + def command_map(self) -> dict[str, AppCommandFeatureProvider]: + return self._map + + @property + def program_args(self) -> argparse.Namespace: + return self._parsed_args + + def run_command(self) -> None: + args = self.program_args + if args.command is None: + self.arg_parser.print_help() + return + self.command_map[args.command].run_command(args) + + +class AppBase: + _instance: AppBase | None = None + + @staticmethod + def get_instance() -> AppBase: + if AppBase._instance is None: + raise AppError("App instance not initialized") + return AppBase._instance + + def __init__(self, app_id: str, name: str): + AppBase._instance = self + self._app_id = app_id + self._name = name + self._features: list[AppFeatureProvider] = [] + self._paths: list[AppFeaturePath] = [] + + def setup(self) -> None: + command_dispatcher = self.get_feature(CommandDispatcher) + command_dispatcher.setup_arg_parser() + self._root = AppRootPath(self, Path(self._ensure_env("CRUPEST_PROJECT_DIR"))) + self._data_dir = self._root.add_subpath( + self._ensure_env("CRUPEST_DATA_DIR"), True, id="data" + ) + self._services_dir = self._root.add_subpath( + self._ensure_env("CRUPEST_SERVICES_DIR"), True, id="CRUPEST_SERVICES_DIR" + ) + for feature in self.features: + feature.setup() + for path in self.paths: + path.check_self() + + @property + def app_id(self) -> str: + return self._app_id + + @property + def name(self) -> str: + return self._name + + def _ensure_env(self, env_name: str) -> str: + value = os.getenv(env_name) + if value is None: + raise AppError(f"Environment variable {env_name} not set") + return value + + @property + def root(self) -> AppRootPath: + return self._root + + @property + def data_dir(self) -> AppFeaturePath: + return self._data_dir + + @property + def services_dir(self) -> AppFeaturePath: + return self._services_dir + + @property + def app_initialized(self) -> bool: + return self.data_dir.check_self() + + @property + def features(self) -> list[AppFeatureProvider]: + return self._features + + @property + def paths(self) -> list[AppFeaturePath]: + return self._paths + + def add_feature(self, feature: _Feature) -> _Feature: + for f in self.features: + if f.name == feature.name: + raise AppFeatureError( + f"Duplicate feature name: {feature.name}.", feature.name + ) + self._features.append(feature) + return feature + + def _add_path( + self, + name: str, + is_dir: bool, + /, + parent: AppPath | None = None, + id: str | None = None, + description: str = "", + ) -> AppFeaturePath: + p = AppFeaturePath( + parent or self.root, name, is_dir, id=id, description=description + ) + self._paths.append(p) + return p + + @overload + def get_feature(self, feature: str) -> AppFeatureProvider: ... + + @overload + def get_feature(self, feature: type[_Feature]) -> _Feature: ... + + def get_feature( + self, feature: str | type[_Feature] + ) -> AppFeatureProvider | _Feature: + if isinstance(feature, str): + for f in self._features: + if f.name == feature: + return f + elif isinstance(feature, type): + for f in self._features: + if isinstance(f, feature): + return f + else: + raise CruLogicError("Argument must be the name of feature or its class.") + + raise AppFeatureError(f"Feature {feature} not found.", feature) + + def get_path(self, name: str) -> AppFeaturePath: + for p in self._paths: + if p.id == name or p.name == name: + return p + raise AppPathError(f"Application path {name} not found.", name) diff --git a/python/cru/service/_gen_cmd.py b/python/cru/service/_gen_cmd.py new file mode 100644 index 0000000..f51d65f --- /dev/null +++ b/python/cru/service/_gen_cmd.py @@ -0,0 +1,200 @@ +from dataclasses import dataclass, replace +from typing import TypeAlias + +from ._base import AppCommandFeatureProvider +from ._nginx import NginxManager + +_Str_Or_Cmd_List: TypeAlias = str | list["_Cmd"] + + +@dataclass +class _Cmd: + name: str + desc: str + cmd: _Str_Or_Cmd_List + + def clean(self) -> "_Cmd": + if isinstance(self.cmd, list): + return replace( + self, + cmd=[cmd.clean() for cmd in self.cmd], + ) + elif isinstance(self.cmd, str): + return replace(self, cmd=self.cmd.strip()) + else: + raise ValueError("Unexpected type for cmd.") + + def generate_text( + self, + info_only: bool, + *, + parent: str | None = None, + ) -> str: + if parent is None: + tag = "COMMAND" + long_name = self.name + indent = "" + else: + tag = "SUBCOMMAND" + long_name = f"{parent}.{self.name}" + indent = " " + + if info_only: + return f"{indent}[{long_name}]: {self.desc}" + + text = f"--- {tag}[{long_name}]: {self.desc}" + if isinstance(self.cmd, str): + text += "\n" + self.cmd + elif isinstance(self.cmd, list): + for sub in self.cmd: + text += "\n" * 2 + sub.generate_text(info_only, parent=self.name) + else: + raise ValueError("Unexpected type for cmd.") + + lines: list[str] = [] + for line in text.splitlines(): + if len(line) == 0: + lines.append("") + else: + lines.append(indent + line) + text = "\n".join(lines) + + return text + + +_docker_uninstall = _Cmd( + "uninstall", + "uninstall apt docker", + """ +for pkg in docker.io docker-doc docker-compose \ +podman-docker containerd runc; \ +do sudo apt-get remove $pkg; done +""", +) + +_docker_apt_certs = _Cmd( + "apt-certs", + "prepare apt certs", + """ +sudo apt-get update +sudo apt-get install ca-certificates curl +sudo install -m 0755 -d /etc/apt/keyrings +""", +) + +_docker_docker_certs = _Cmd( + "docker-certs", + "add docker certs", + """ +sudo curl -fsSL https://download.docker.com/linux/debian/gpg \ +-o /etc/apt/keyrings/docker.asc +sudo chmod a+r /etc/apt/keyrings/docker.asc +""", +) + +_docker_apt_repo = _Cmd( + "apt-repo", + "add docker apt repo", + """ +echo \\ + "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] \ +https://download.docker.com/linux/debian \\ + $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \\ + sudo tee /etc/apt/sources.list.d/docker.list > /dev/null +""", +) + +_docker_install = _Cmd( + "install", + "update apt and install docker", + """ +sudo apt-get update +sudo apt-get install docker-ce docker-ce-cli containerd.io \ +docker-buildx-plugin docker-compose-plugin +""", +) + +_docker_setup = _Cmd( + "setup", + "setup system for docker", + """ +sudo systemctl enable docker +sudo systemctl start docker +sudo groupadd -f docker +sudo usermod -aG docker $USER +# Remember to log out and log back in for the group changes to take effect +""", +) + +_docker = _Cmd( + "install-docker", + "install docker for a fresh new system", + [ + _docker_uninstall, + _docker_apt_certs, + _docker_docker_certs, + _docker_apt_repo, + _docker_install, + _docker_setup, + ], +) + +_update_blog = _Cmd( + "update-blog", + "re-generate blog pages", + """ +docker exec -it blog /scripts/update.bash +""", +) + +_git_user = _Cmd( + "git-user", + "add/set git server user and password", + """ +docker run -it --rm -v "$ps_file:/user-info" httpd htpasswd "/user-info" [username] +""", +) + + +class GenCmdProvider(AppCommandFeatureProvider): + def __init__(self) -> None: + super().__init__("gen-cmd-provider") + self._cmds: dict[str, _Cmd] = {} + self._register_cmds(_docker, _update_blog, _git_user) + + def _register_cmd(self, cmd: "_Cmd"): + self._cmds[cmd.name] = cmd.clean() + + def _register_cmds(self, *cmds: "_Cmd"): + for c in cmds: + self._register_cmd(c) + + def setup(self): + pass + + def get_command_info(self): + return ("gen-cmd", "Get commands of running external cli tools.") + + def setup_arg_parser(self, arg_parser): + subparsers = arg_parser.add_subparsers( + dest="gen_cmd", metavar="GEN_CMD_COMMAND" + ) + certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") + certbot_parser.add_argument( + "-t", "--test", action="store_true", help="run certbot in test mode" + ) + for cmd in self._cmds.values(): + subparsers.add_parser(cmd.name, help=cmd.desc) + + def _print_cmd(self, name: str): + print(self._cmds[name].generate_text(False)) + + def run_command(self, args): + if args.gen_cmd is None or args.gen_cmd == "list": + print("[certbot]: certbot ssl cert commands") + for cmd in self._cmds.values(): + print(cmd.generate_text(True)) + elif args.gen_cmd == "certbot": + self.app.get_feature(NginxManager).print_all_certbot_commands(args.test) + else: + self._print_cmd(args.gen_cmd) diff --git a/python/cru/service/_nginx.py b/python/cru/service/_nginx.py new file mode 100644 index 0000000..87cff6d --- /dev/null +++ b/python/cru/service/_nginx.py @@ -0,0 +1,263 @@ +from argparse import Namespace +from enum import Enum, auto +import re +import subprocess +from typing import TypeAlias + +from cru import CruInternalError + +from ._base import AppCommandFeatureProvider +from ._template import TemplateManager + + +class CertbotAction(Enum): + CREATE = auto() + EXPAND = auto() + SHRINK = auto() + RENEW = auto() + + +class NginxManager(AppCommandFeatureProvider): + CertbotAction: TypeAlias = CertbotAction + + def __init__(self) -> None: + super().__init__("nginx-manager") + self._domains_cache: list[str] | None = None + + def setup(self) -> None: + pass + + @property + def _template_manager(self) -> TemplateManager: + return self.app.get_feature(TemplateManager) + + @property + def root_domain(self) -> str: + return self._template_manager.get_domain() + + @property + def domains(self) -> list[str]: + if self._domains_cache is None: + self._domains_cache = self._get_domains() + return self._domains_cache + + @property + def subdomains(self) -> list[str]: + suffix = "." + self.root_domain + return [d[: -len(suffix)] for d in self.domains if d.endswith(suffix)] + + def _get_domains_from_text(self, text: str) -> set[str]: + domains: set[str] = set() + regex = re.compile(r"server_name\s+(\S+)\s*;") + for match in regex.finditer(text): + domains.add(match[1]) + return domains + + def _join_generated_nginx_conf_text(self) -> str: + result = "" + for path, text in self._template_manager.generate(): + if "nginx" in str(path): + result += text + return result + + def _get_domains(self) -> list[str]: + text = self._join_generated_nginx_conf_text() + domains = self._get_domains_from_text(text) + domains.remove(self.root_domain) + return [self.root_domain, *domains] + + def _print_domains(self) -> None: + for domain in self.domains: + print(domain) + + def _certbot_command( + self, + action: CertbotAction | str, + test: bool, + *, + docker=True, + standalone=None, + email=None, + agree_tos=True, + ) -> str: + if isinstance(action, str): + action = CertbotAction[action.upper()] + + command_args = [] + + add_domain_option = True + if action is CertbotAction.CREATE: + if standalone is None: + standalone = True + command_action = "certonly" + elif action in [CertbotAction.EXPAND, CertbotAction.SHRINK]: + if standalone is None: + standalone = False + command_action = "certonly" + elif action is CertbotAction.RENEW: + if standalone is None: + standalone = False + add_domain_option = False + command_action = "renew" + else: + raise CruInternalError("Invalid certbot action.") + + data_dir = self.app.data_dir.full_path.as_posix() + + if not docker: + command_args.append("certbot") + else: + command_args.extend( + [ + "docker run -it --rm --name certbot", + f'-v "{data_dir}/certbot/certs:/etc/letsencrypt"', + f'-v "{data_dir}/certbot/data:/var/lib/letsencrypt"', + ] + ) + if standalone: + command_args.append('-p "0.0.0.0:80:80"') + else: + command_args.append(f'-v "{data_dir}/certbot/webroot:/var/www/certbot"') + + command_args.append("certbot/certbot") + + command_args.append(command_action) + + command_args.append(f"--cert-name {self.root_domain}") + + if standalone: + command_args.append("--standalone") + else: + command_args.append("--webroot -w /var/www/certbot") + + if add_domain_option: + command_args.append(" ".join([f"-d {domain}" for domain in self.domains])) + + if email is not None: + command_args.append(f"--email {email}") + + if agree_tos: + command_args.append("--agree-tos") + + if test: + command_args.append("--test-cert --dry-run") + + return " ".join(command_args) + + def print_all_certbot_commands(self, test: bool): + print("### COMMAND: (standalone) create certs") + print( + self._certbot_command( + CertbotAction.CREATE, + test, + email=self._template_manager.get_email(), + ) + ) + print() + print("### COMMAND: (webroot+nginx) expand or shrink certs") + print( + self._certbot_command( + CertbotAction.EXPAND, + test, + email=self._template_manager.get_email(), + ) + ) + print() + print("### COMMAND: (webroot+nginx) renew certs") + print( + self._certbot_command( + CertbotAction.RENEW, + test, + email=self._template_manager.get_email(), + ) + ) + + @property + def _cert_path_str(self) -> str: + return str( + self.app.data_dir.full_path + / "certbot/certs/live" + / self.root_domain + / "fullchain.pem" + ) + + def get_command_info(self): + return "nginx", "Manage nginx related things." + + def setup_arg_parser(self, arg_parser): + subparsers = arg_parser.add_subparsers( + dest="nginx_command", required=True, metavar="NGINX_COMMAND" + ) + _list_parser = subparsers.add_parser("list", help="list domains") + certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") + certbot_parser.add_argument( + "--no-test", + action="store_true", + help="remove args making certbot run in test mode", + ) + + def run_command(self, args: Namespace) -> None: + if args.nginx_command == "list": + self._print_domains() + elif args.nginx_command == "certbot": + self.print_all_certbot_commands(not args.no_test) + + def _generate_dns_zone( + self, + ip: str, + /, + ttl: str | int = 600, + *, + enable_mail: bool = True, + dkim: str | None = None, + ) -> str: + # TODO: Not complete and test now. + root_domain = self.root_domain + result = f"$ORIGIN {root_domain}.\n\n" + result += "; A records\n" + result += f"@ {ttl} IN A {ip}\n" + for subdomain in self.subdomains: + result += f"{subdomain} {ttl} IN A {ip}\n" + + if enable_mail: + result += "\n; MX records\n" + result += f"@ {ttl} IN MX 10 mail.{root_domain}.\n" + result += "\n; SPF record\n" + result += f'@ {ttl} IN TXT "v=spf1 mx ~all"\n' + if dkim is not None: + result += "\n; DKIM record\n" + result += f'mail._domainkey {ttl} IN TEXT "{dkim}"' + result += "\n; DMARC record\n" + dmarc_options = [ + "v=DMARC1", + "p=none", + f"rua=mailto:dmarc.report@{root_domain}", + f"ruf=mailto:dmarc.report@{root_domain}", + "sp=none", + "ri=86400", + ] + result += f'_dmarc {ttl} IN TXT "{"; ".join(dmarc_options)}"\n' + return result + + def _get_dkim_from_mailserver(self) -> str | None: + # TODO: Not complete and test now. + dkim_path = ( + self.app.data_dir.full_path + / "dms/config/opendkim/keys" + / self.root_domain + / "mail.txt" + ) + if not dkim_path.exists(): + return None + + p = subprocess.run(["sudo", "cat", dkim_path], capture_output=True, check=True) + value = "" + for match in re.finditer('"(.*)"', p.stdout.decode("utf-8")): + value += match.group(1) + return value + + def _generate_dns_zone_with_dkim(self, ip: str, /, ttl: str | int = 600) -> str: + # TODO: Not complete and test now. + return self._generate_dns_zone( + ip, ttl, enable_mail=True, dkim=self._get_dkim_from_mailserver() + ) diff --git a/python/cru/service/_template.py b/python/cru/service/_template.py new file mode 100644 index 0000000..22c1d21 --- /dev/null +++ b/python/cru/service/_template.py @@ -0,0 +1,228 @@ +from argparse import Namespace +from pathlib import Path +import shutil +from typing import NamedTuple +import graphlib + +from cru import CruException +from cru.parsing import SimpleLineVarParser +from cru.template import TemplateTree, CruStrWrapperTemplate + +from ._base import AppCommandFeatureProvider, AppFeaturePath + + +class _Config(NamedTuple): + text: str + config: dict[str, str] + + +class _GeneratedConfig(NamedTuple): + base: _Config + private: _Config + merged: _Config + + +class _PreConfig(NamedTuple): + base: _Config + private: _Config + config: dict[str, str] + + @staticmethod + def create(base: _Config, private: _Config) -> "_PreConfig": + return _PreConfig(base, private, {**base.config, **private.config}) + + def _merge(self, generated: _Config): + text = ( + "\n".join( + [ + self.private.text.strip(), + self.base.text.strip(), + generated.text.strip(), + ] + ) + + "\n" + ) + config = {**self.config, **generated.config} + return _GeneratedConfig(self.base, self.private, _Config(text, config)) + + +class _Template(NamedTuple): + config: CruStrWrapperTemplate + config_vars: set[str] + tree: TemplateTree + + +class TemplateManager(AppCommandFeatureProvider): + def __init__(self): + super().__init__("template-manager") + + def setup(self) -> None: + self._base_config_file = self.app.services_dir.add_subpath("base-config", False) + self._private_config_file = self.app.data_dir.add_subpath("config", False) + self._template_config_file = self.app.services_dir.add_subpath( + "config.template", False + ) + self._templates_dir = self.app.services_dir.add_subpath("templates", True) + self._generated_dir = self.app.services_dir.add_subpath("generated", True) + + self._config_parser = SimpleLineVarParser() + + def _read_pre(app_path: AppFeaturePath) -> _Config: + text = app_path.read_text() + config = self._read_config(text) + return _Config(text, config) + + base = _read_pre(self._base_config_file) + private = _read_pre(self._private_config_file) + self._preconfig = _PreConfig.create(base, private) + + self._generated: _GeneratedConfig | None = None + + template_config_text = self._template_config_file.read_text() + self._template_config = self._read_config(template_config_text) + + self._template = _Template( + CruStrWrapperTemplate(template_config_text), + set(self._template_config.keys()), + TemplateTree( + lambda text: CruStrWrapperTemplate(text), + self.templates_dir.full_path_str, + ), + ) + + self._real_required_vars = ( + self._template.config_vars | self._template.tree.variables + ) - self._template.config_vars + lacks = self._real_required_vars - self._preconfig.config.keys() + self._lack_vars = lacks if len(lacks) > 0 else None + + def _read_config_entry_names(self, text: str) -> set[str]: + return set(entry.key for entry in self._config_parser.parse(text)) + + def _read_config(self, text: str) -> dict[str, str]: + return {entry.key: entry.value for entry in self._config_parser.parse(text)} + + @property + def templates_dir(self) -> AppFeaturePath: + return self._templates_dir + + @property + def generated_dir(self) -> AppFeaturePath: + return self._generated_dir + + def get_domain(self) -> str: + return self._preconfig.config["CRUPEST_DOMAIN"] + + def get_email(self) -> str: + return self._preconfig.config["CRUPEST_EMAIL"] + + def _generate_template_config(self, config: dict[str, str]) -> dict[str, str]: + entry_templates = { + key: CruStrWrapperTemplate(value) + for key, value in self._template_config.items() + } + sorter = graphlib.TopologicalSorter( + config + | {key: template.variables for key, template in entry_templates.items()} + ) + + vars: dict[str, str] = config.copy() + for _ in sorter.static_order(): + del_keys = [] + for key, template in entry_templates.items(): + new = template.generate_partial(vars) + if not new.has_variables: + vars[key] = new.generate({}) + del_keys.append(key) + else: + entry_templates[key] = new + for key in del_keys: + del entry_templates[key] + assert len(entry_templates) == 0 + return {key: value for key, value in vars.items() if key not in config} + + def _generate_config(self) -> _GeneratedConfig: + if self._generated is not None: + return self._generated + if self._lack_vars is not None: + raise CruException(f"Required vars are not defined: {self._lack_vars}.") + config = self._generate_template_config(self._preconfig.config) + text = self._template.config.generate(self._preconfig.config | config) + self._generated = self._preconfig._merge(_Config(text, config)) + return self._generated + + def generate(self) -> list[tuple[Path, str]]: + config = self._generate_config() + return [ + (Path("config"), config.merged.text), + *self._template.tree.generate(config.merged.config), + ] + + def _generate_files(self, dry_run: bool) -> None: + result = self.generate() + if not dry_run: + if self.generated_dir.full_path.exists(): + shutil.rmtree(self.generated_dir.full_path) + for path, text in result: + des = self.generated_dir.full_path / path + des.parent.mkdir(parents=True, exist_ok=True) + with open(des, "w") as f: + f.write(text) + + def get_command_info(self): + return ("template", "Manage templates.") + + def _print_file_lists(self) -> None: + print(f"[{self._template.config.variable_count}]", "config") + for path, template in self._template.tree.templates: + print(f"[{template.variable_count}]", path.as_posix()) + + def _print_vars(self, required: bool) -> None: + for var in self._template.config.variables: + print(f"[config] {var}") + for var in self._template.tree.variables: + if not (required and var in self._template.config_vars): + print(f"[template] {var}") + + def _run_check_vars(self) -> None: + if self._lack_vars is not None: + print("Lacks:") + for var in self._lack_vars: + print(var) + + def setup_arg_parser(self, arg_parser): + subparsers = arg_parser.add_subparsers( + dest="template_command", required=True, metavar="TEMPLATE_COMMAND" + ) + _list_parser = subparsers.add_parser("list", help="list templates") + vars_parser = subparsers.add_parser( + "vars", help="list variables used in all templates" + ) + vars_parser.add_argument( + "-r", + "--required", + help="only list really required one.", + action="store_true", + ) + _check_vars_parser = subparsers.add_parser( + "check-vars", + help="check if required vars are set", + ) + generate_parser = subparsers.add_parser("generate", help="generate templates") + generate_parser.add_argument( + "--no-dry-run", action="store_true", help="generate and write target files" + ) + + def run_command(self, args: Namespace) -> None: + if args.template_command == "list": + self._print_file_lists() + elif args.template_command == "vars": + self._print_vars(args.required) + elif args.template_command == "generate": + dry_run = not args.no_dry_run + self._generate_files(dry_run) + if dry_run: + print("Dry run successfully.") + print( + f"Will delete dir {self.generated_dir.full_path_str} if it exists." + ) diff --git a/python/cru/system.py b/python/cru/system.py new file mode 100644 index 0000000..f321717 --- /dev/null +++ b/python/cru/system.py @@ -0,0 +1,23 @@ +import os.path +import re + + +def check_debian_derivative_version(name: str) -> None | str: + if not os.path.isfile("/etc/os-release"): + return None + with open("/etc/os-release", "r") as f: + content = f.read() + if f"ID={name}" not in content: + return None + m = re.search(r'VERSION_ID="(.+)"', content) + if m is None: + return None + return m.group(1) + + +def check_ubuntu_version() -> None | str: + return check_debian_derivative_version("ubuntu") + + +def check_debian_version() -> None | str: + return check_debian_derivative_version("debian") diff --git a/python/cru/template.py b/python/cru/template.py new file mode 100644 index 0000000..3a70337 --- /dev/null +++ b/python/cru/template.py @@ -0,0 +1,209 @@ +from abc import ABCMeta, abstractmethod +from collections.abc import Callable, Mapping +from pathlib import Path +from string import Template +from typing import Generic, Self, TypeVar + +from ._iter import CruIterator +from ._error import CruException + +from .parsing import StrWrapperVarParser + + +class CruTemplateError(CruException): + pass + + +class CruTemplateBase(metaclass=ABCMeta): + def __init__(self, text: str): + self._text = text + self._variables: set[str] | None = None + + @abstractmethod + def _get_variables(self) -> set[str]: + raise NotImplementedError() + + @property + def text(self) -> str: + return self._text + + @property + def variables(self) -> set[str]: + if self._variables is None: + self._variables = self._get_variables() + return self._variables + + @property + def variable_count(self) -> int: + return len(self.variables) + + @property + def has_variables(self) -> bool: + return self.variable_count > 0 + + @abstractmethod + def _do_generate(self, mapping: dict[str, str]) -> str: + raise NotImplementedError() + + def _generate_partial( + self, mapping: Mapping[str, str], allow_unused: bool = True + ) -> str: + values = dict(mapping) + if not allow_unused and not len(set(values.keys() - self.variables)) != 0: + raise CruTemplateError("Unused variables.") + return self._do_generate(values) + + def generate_partial( + self, mapping: Mapping[str, str], allow_unused: bool = True + ) -> Self: + return self.__class__(self._generate_partial(mapping, allow_unused)) + + def generate(self, mapping: Mapping[str, str], allow_unused: bool = True) -> str: + values = dict(mapping) + if len(self.variables - values.keys()) != 0: + raise CruTemplateError( + f"Missing variables: {self.variables - values.keys()} ." + ) + return self._generate_partial(values, allow_unused) + + +class CruTemplate(CruTemplateBase): + def __init__(self, prefix: str, text: str): + super().__init__(text) + self._prefix = prefix + self._template = Template(text) + + def _get_variables(self) -> set[str]: + return ( + CruIterator(self._template.get_identifiers()) + .filter(lambda i: i.startswith(self.prefix)) + .to_set() + ) + + @property + def prefix(self) -> str: + return self._prefix + + @property + def py_template(self) -> Template: + return self._template + + @property + def all_variables(self) -> set[str]: + return set(self._template.get_identifiers()) + + def _do_generate(self, mapping: dict[str, str]) -> str: + return self._template.safe_substitute(mapping) + + +class CruStrWrapperTemplate(CruTemplateBase): + def __init__(self, text: str, wrapper: str = "@@"): + super().__init__(text) + self._wrapper = wrapper + self._tokens: StrWrapperVarParser.Result + + @property + def wrapper(self) -> str: + return self._wrapper + + def _get_variables(self): + self._tokens = StrWrapperVarParser(self.wrapper).parse(self.text) + return ( + self._tokens.cru_iter() + .filter(lambda t: t.is_var) + .map(lambda t: t.value) + .to_set() + ) + + def _do_generate(self, mapping): + return ( + self._tokens.cru_iter() + .map(lambda t: mapping[t.value] if t.is_var else t.value) + .join_str("") + ) + + +_Template = TypeVar("_Template", bound=CruTemplateBase) + + +class TemplateTree(Generic[_Template]): + def __init__( + self, + template_generator: Callable[[str], _Template], + source: str, + *, + template_file_suffix: str | None = ".template", + ): + """ + If template_file_suffix is not None, the files will be checked according to the + suffix of the file name. If the suffix matches, the file will be regarded as a + template file. Otherwise, it will be regarded as a non-template file. + Content of template file must contain variables that need to be replaced, while + content of non-template file may not contain any variables. + If either case is false, it generally means whether the file is a template is + wrongly handled. + """ + self._template_generator = template_generator + self._files: list[tuple[Path, _Template]] = [] + self._source = source + self._template_file_suffix = template_file_suffix + self._load() + + @property + def templates(self) -> list[tuple[Path, _Template]]: + return self._files + + @property + def source(self) -> str: + return self._source + + @property + def template_file_suffix(self) -> str | None: + return self._template_file_suffix + + @staticmethod + def _scan_files(root: str) -> list[Path]: + root_path = Path(root) + result: list[Path] = [] + for path in root_path.glob("**/*"): + if not path.is_file(): + continue + path = path.relative_to(root_path) + result.append(Path(path)) + return result + + def _load(self) -> None: + files = self._scan_files(self.source) + for file_path in files: + template_file = Path(self.source) / file_path + with open(template_file, "r") as f: + content = f.read() + template = self._template_generator(content) + if self.template_file_suffix is not None: + should_be_template = file_path.name.endswith(self.template_file_suffix) + if should_be_template and not template.has_variables: + raise CruTemplateError( + f"Template file {file_path} has no variables." + ) + elif not should_be_template and template.has_variables: + raise CruTemplateError(f"Non-template {file_path} has variables.") + self._files.append((file_path, template)) + + @property + def variables(self) -> set[str]: + s = set() + for _, template in self.templates: + s.update(template.variables) + return s + + def generate(self, variables: Mapping[str, str]) -> list[tuple[Path, str]]: + result: list[tuple[Path, str]] = [] + for path, template in self.templates: + if self.template_file_suffix is not None and path.name.endswith( + self.template_file_suffix + ): + path = path.parent / (path.name[: -len(self.template_file_suffix)]) + + text = template.generate(variables) + result.append((path, text)) + return result diff --git a/python/cru/tool.py b/python/cru/tool.py new file mode 100644 index 0000000..377f5d7 --- /dev/null +++ b/python/cru/tool.py @@ -0,0 +1,82 @@ +import shutil +import subprocess +from typing import Any +from collections.abc import Iterable + +from ._error import CruException + + +class CruExternalToolError(CruException): + def __init__(self, message: str, tool: str, *args, **kwargs) -> None: + super().__init__(message, *args, **kwargs) + self._tool = tool + + @property + def tool(self) -> str: + return self._tool + + +class CruExternalToolNotFoundError(CruExternalToolError): + def __init__(self, message: str | None, tool: str, *args, **kwargs) -> None: + super().__init__( + message or f"Could not find binary for {tool}.", tool, *args, **kwargs + ) + + +class CruExternalToolRunError(CruExternalToolError): + def __init__( + self, + message: str, + tool: str, + tool_args: Iterable[str], + tool_error: Any, + *args, + **kwargs, + ) -> None: + super().__init__(message, tool, *args, **kwargs) + self._tool_args = list(tool_args) + self._tool_error = tool_error + + @property + def tool_args(self) -> list[str]: + return self._tool_args + + @property + def tool_error(self) -> Any: + return self._tool_error + + +class ExternalTool: + def __init__(self, bin: str) -> None: + self._bin = bin + + @property + def bin(self) -> str: + return self._bin + + @bin.setter + def bin(self, value: str) -> None: + self._bin = value + + @property + def bin_path(self) -> str: + real_bin = shutil.which(self.bin) + if not real_bin: + raise CruExternalToolNotFoundError(None, self.bin) + return real_bin + + def run( + self, *process_args: str, **subprocess_kwargs + ) -> subprocess.CompletedProcess: + try: + return subprocess.run( + [self.bin_path] + list(process_args), **subprocess_kwargs + ) + except subprocess.CalledProcessError as e: + raise CruExternalToolError("Subprocess failed.", self.bin) from e + except OSError as e: + raise CruExternalToolError("Failed to start subprocess", self.bin) from e + + def run_get_output(self, *process_args: str, **subprocess_kwargs) -> Any: + process = self.run(*process_args, capture_output=True, **subprocess_kwargs) + return process.stdout diff --git a/python/cru/value.py b/python/cru/value.py new file mode 100644 index 0000000..9c03219 --- /dev/null +++ b/python/cru/value.py @@ -0,0 +1,292 @@ +from __future__ import annotations + +import random +import secrets +import string +import uuid +from abc import abstractmethod, ABCMeta +from collections.abc import Callable +from typing import Any, ClassVar, TypeVar, Generic + +from ._error import CruException + + +def _str_case_in(s: str, case: bool, str_list: list[str]) -> bool: + if case: + return s in str_list + else: + return s.lower() in [s.lower() for s in str_list] + + +_T = TypeVar("_T") + + +class CruValueTypeError(CruException): + def __init__( + self, + message: str, + value: Any, + value_type: ValueType | None, + *args, + **kwargs, + ): + super().__init__( + message, + *args, + **kwargs, + ) + self._value = value + self._value_type = value_type + + @property + def value(self) -> Any: + return self._value + + @property + def value_type(self) -> ValueType | None: + return self._value_type + + +class ValueType(Generic[_T], metaclass=ABCMeta): + def __init__(self, name: str, _type: type[_T]) -> None: + self._name = name + self._type = _type + + @property + def name(self) -> str: + return self._name + + @property + def type(self) -> type[_T]: + return self._type + + def check_value_type(self, value: Any) -> None: + if not isinstance(value, self.type): + raise CruValueTypeError("Type of value is wrong.", value, self) + + def _do_check_value(self, value: Any) -> _T: + return value + + def check_value(self, value: Any) -> _T: + self.check_value_type(value) + return self._do_check_value(value) + + @abstractmethod + def _do_check_str_format(self, s: str) -> None: + raise NotImplementedError() + + def check_str_format(self, s: str) -> None: + if not isinstance(s, str): + raise CruValueTypeError("Try to check format on a non-str.", s, self) + self._do_check_str_format(s) + + @abstractmethod + def _do_convert_value_to_str(self, value: _T) -> str: + raise NotImplementedError() + + def convert_value_to_str(self, value: _T) -> str: + self.check_value(value) + return self._do_convert_value_to_str(value) + + @abstractmethod + def _do_convert_str_to_value(self, s: str) -> _T: + raise NotImplementedError() + + def convert_str_to_value(self, s: str) -> _T: + self.check_str_format(s) + return self._do_convert_str_to_value(s) + + def check_value_or_try_convert_from_str(self, value_or_str: Any) -> _T: + try: + return self.check_value(value_or_str) + except CruValueTypeError: + if isinstance(value_or_str, str): + return self.convert_str_to_value(value_or_str) + else: + raise + + def create_default_value(self) -> _T: + return self.type() + + +class TextValueType(ValueType[str]): + def __init__(self) -> None: + super().__init__("text", str) + + def _do_check_str_format(self, _s): + return + + def _do_convert_value_to_str(self, value): + return value + + def _do_convert_str_to_value(self, s): + return s + + +class IntegerValueType(ValueType[int]): + def __init__(self) -> None: + super().__init__("integer", int) + + def _do_check_str_format(self, s): + try: + int(s) + except ValueError as e: + raise CruValueTypeError("Invalid integer format.", s, self) from e + + def _do_convert_value_to_str(self, value): + return str(value) + + def _do_convert_str_to_value(self, s): + return int(s) + + +class FloatValueType(ValueType[float]): + def __init__(self) -> None: + super().__init__("float", float) + + def _do_check_str_format(self, s): + try: + float(s) + except ValueError as e: + raise CruValueTypeError("Invalid float format.", s, self) from e + + def _do_convert_value_to_str(self, value): + return str(value) + + def _do_convert_str_to_value(self, s): + return float(s) + + +class BooleanValueType(ValueType[bool]): + DEFAULT_TRUE_LIST: ClassVar[list[str]] = ["true", "yes", "y", "on", "1"] + DEFAULT_FALSE_LIST: ClassVar[list[str]] = ["false", "no", "n", "off", "0"] + + def __init__( + self, + *, + case_sensitive=False, + true_list: None | list[str] = None, + false_list: None | list[str] = None, + ) -> None: + super().__init__("boolean", bool) + self._case_sensitive = case_sensitive + self._valid_true_strs: list[str] = ( + true_list or BooleanValueType.DEFAULT_TRUE_LIST + ) + self._valid_false_strs: list[str] = ( + false_list or BooleanValueType.DEFAULT_FALSE_LIST + ) + + @property + def case_sensitive(self) -> bool: + return self._case_sensitive + + @property + def valid_true_strs(self) -> list[str]: + return self._valid_true_strs + + @property + def valid_false_strs(self) -> list[str]: + return self._valid_false_strs + + @property + def valid_boolean_strs(self) -> list[str]: + return self._valid_true_strs + self._valid_false_strs + + def _do_check_str_format(self, s): + if not _str_case_in(s, self.case_sensitive, self.valid_boolean_strs): + raise CruValueTypeError("Invalid boolean format.", s, self) + + def _do_convert_value_to_str(self, value): + return self._valid_true_strs[0] if value else self._valid_false_strs[0] + + def _do_convert_str_to_value(self, s): + return _str_case_in(s, self.case_sensitive, self._valid_true_strs) + + def create_default_value(self): + return self.valid_false_strs[0] + + +class EnumValueType(ValueType[str]): + def __init__(self, valid_values: list[str], /, case_sensitive=False) -> None: + super().__init__(f"enum({'|'.join(valid_values)})", str) + self._case_sensitive = case_sensitive + self._valid_values = valid_values + + @property + def case_sensitive(self) -> bool: + return self._case_sensitive + + @property + def valid_values(self) -> list[str]: + return self._valid_values + + def _do_check_value(self, value): + self._do_check_str_format(value) + + def _do_check_str_format(self, s): + if not _str_case_in(s, self.case_sensitive, self.valid_values): + raise CruValueTypeError("Invalid enum value", s, self) + + def _do_convert_value_to_str(self, value): + return value + + def _do_convert_str_to_value(self, s): + return s + + def create_default_value(self): + return self.valid_values[0] + + +TEXT_VALUE_TYPE = TextValueType() +INTEGER_VALUE_TYPE = IntegerValueType() +BOOLEAN_VALUE_TYPE = BooleanValueType() + + +class ValueGeneratorBase(Generic[_T], metaclass=ABCMeta): + @abstractmethod + def generate(self) -> _T: + raise NotImplementedError() + + def __call__(self) -> _T: + return self.generate() + + +class ValueGenerator(ValueGeneratorBase[_T]): + def __init__(self, generate_func: Callable[[], _T]) -> None: + self._generate_func = generate_func + + @property + def generate_func(self) -> Callable[[], _T]: + return self._generate_func + + def generate(self) -> _T: + return self._generate_func() + + +class UuidValueGenerator(ValueGeneratorBase[str]): + def generate(self): + return str(uuid.uuid4()) + + +class RandomStringValueGenerator(ValueGeneratorBase[str]): + def __init__(self, length: int, secure: bool) -> None: + self._length = length + self._secure = secure + + @property + def length(self) -> int: + return self._length + + @property + def secure(self) -> bool: + return self._secure + + def generate(self): + random_func = secrets.choice if self._secure else random.choice + characters = string.ascii_letters + string.digits + random_string = "".join(random_func(characters) for _ in range(self._length)) + return random_string + + +UUID_VALUE_GENERATOR = UuidValueGenerator() diff --git a/python/poetry.lock b/python/poetry.lock new file mode 100644 index 0000000..4338200 --- /dev/null +++ b/python/poetry.lock @@ -0,0 +1,111 @@ +# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. + +[[package]] +name = "mypy" +version = "1.15.0" +description = "Optional static typing for Python" +optional = false +python-versions = ">=3.9" +groups = ["dev"] +files = [ + {file = "mypy-1.15.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:979e4e1a006511dacf628e36fadfecbcc0160a8af6ca7dad2f5025529e082c13"}, + {file = "mypy-1.15.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c4bb0e1bd29f7d34efcccd71cf733580191e9a264a2202b0239da95984c5b559"}, + {file = "mypy-1.15.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:be68172e9fd9ad8fb876c6389f16d1c1b5f100ffa779f77b1fb2176fcc9ab95b"}, + {file = "mypy-1.15.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c7be1e46525adfa0d97681432ee9fcd61a3964c2446795714699a998d193f1a3"}, + {file = "mypy-1.15.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:2e2c2e6d3593f6451b18588848e66260ff62ccca522dd231cd4dd59b0160668b"}, + {file = "mypy-1.15.0-cp310-cp310-win_amd64.whl", hash = "sha256:6983aae8b2f653e098edb77f893f7b6aca69f6cffb19b2cc7443f23cce5f4828"}, + {file = "mypy-1.15.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:2922d42e16d6de288022e5ca321cd0618b238cfc5570e0263e5ba0a77dbef56f"}, + {file = "mypy-1.15.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2ee2d57e01a7c35de00f4634ba1bbf015185b219e4dc5909e281016df43f5ee5"}, + {file = "mypy-1.15.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:973500e0774b85d9689715feeffcc980193086551110fd678ebe1f4342fb7c5e"}, + {file = "mypy-1.15.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:5a95fb17c13e29d2d5195869262f8125dfdb5c134dc8d9a9d0aecf7525b10c2c"}, + {file = "mypy-1.15.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:1905f494bfd7d85a23a88c5d97840888a7bd516545fc5aaedff0267e0bb54e2f"}, + {file = "mypy-1.15.0-cp311-cp311-win_amd64.whl", hash = "sha256:c9817fa23833ff189db061e6d2eff49b2f3b6ed9856b4a0a73046e41932d744f"}, + {file = "mypy-1.15.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:aea39e0583d05124836ea645f412e88a5c7d0fd77a6d694b60d9b6b2d9f184fd"}, + {file = "mypy-1.15.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:2f2147ab812b75e5b5499b01ade1f4a81489a147c01585cda36019102538615f"}, + {file = "mypy-1.15.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ce436f4c6d218a070048ed6a44c0bbb10cd2cc5e272b29e7845f6a2f57ee4464"}, + {file = "mypy-1.15.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8023ff13985661b50a5928fc7a5ca15f3d1affb41e5f0a9952cb68ef090b31ee"}, + {file = "mypy-1.15.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:1124a18bc11a6a62887e3e137f37f53fbae476dc36c185d549d4f837a2a6a14e"}, + {file = "mypy-1.15.0-cp312-cp312-win_amd64.whl", hash = "sha256:171a9ca9a40cd1843abeca0e405bc1940cd9b305eaeea2dda769ba096932bb22"}, + {file = "mypy-1.15.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:93faf3fdb04768d44bf28693293f3904bbb555d076b781ad2530214ee53e3445"}, + {file = "mypy-1.15.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:811aeccadfb730024c5d3e326b2fbe9249bb7413553f15499a4050f7c30e801d"}, + {file = "mypy-1.15.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:98b7b9b9aedb65fe628c62a6dc57f6d5088ef2dfca37903a7d9ee374d03acca5"}, + {file = "mypy-1.15.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c43a7682e24b4f576d93072216bf56eeff70d9140241f9edec0c104d0c515036"}, + {file = "mypy-1.15.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:baefc32840a9f00babd83251560e0ae1573e2f9d1b067719479bfb0e987c6357"}, + {file = "mypy-1.15.0-cp313-cp313-win_amd64.whl", hash = "sha256:b9378e2c00146c44793c98b8d5a61039a048e31f429fb0eb546d93f4b000bedf"}, + {file = "mypy-1.15.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:e601a7fa172c2131bff456bb3ee08a88360760d0d2f8cbd7a75a65497e2df078"}, + {file = "mypy-1.15.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:712e962a6357634fef20412699a3655c610110e01cdaa6180acec7fc9f8513ba"}, + {file = "mypy-1.15.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:f95579473af29ab73a10bada2f9722856792a36ec5af5399b653aa28360290a5"}, + {file = "mypy-1.15.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8f8722560a14cde92fdb1e31597760dc35f9f5524cce17836c0d22841830fd5b"}, + {file = "mypy-1.15.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:1fbb8da62dc352133d7d7ca90ed2fb0e9d42bb1a32724c287d3c76c58cbaa9c2"}, + {file = "mypy-1.15.0-cp39-cp39-win_amd64.whl", hash = "sha256:d10d994b41fb3497719bbf866f227b3489048ea4bbbb5015357db306249f7980"}, + {file = "mypy-1.15.0-py3-none-any.whl", hash = "sha256:5469affef548bd1895d86d3bf10ce2b44e33d86923c29e4d675b3e323437ea3e"}, + {file = "mypy-1.15.0.tar.gz", hash = "sha256:404534629d51d3efea5c800ee7c42b72a6554d6c400e6a79eafe15d11341fd43"}, +] + +[package.dependencies] +mypy_extensions = ">=1.0.0" +typing_extensions = ">=4.6.0" + +[package.extras] +dmypy = ["psutil (>=4.0)"] +faster-cache = ["orjson"] +install-types = ["pip"] +mypyc = ["setuptools (>=50)"] +reports = ["lxml"] + +[[package]] +name = "mypy-extensions" +version = "1.0.0" +description = "Type system extensions for programs checked with the mypy type checker." +optional = false +python-versions = ">=3.5" +groups = ["dev"] +files = [ + {file = "mypy_extensions-1.0.0-py3-none-any.whl", hash = "sha256:4392f6c0eb8a5668a69e23d168ffa70f0be9ccfd32b5cc2d26a34ae5b844552d"}, + {file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"}, +] + +[[package]] +name = "ruff" +version = "0.9.6" +description = "An extremely fast Python linter and code formatter, written in Rust." +optional = false +python-versions = ">=3.7" +groups = ["dev"] +files = [ + {file = "ruff-0.9.6-py3-none-linux_armv6l.whl", hash = "sha256:2f218f356dd2d995839f1941322ff021c72a492c470f0b26a34f844c29cdf5ba"}, + {file = "ruff-0.9.6-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:b908ff4df65dad7b251c9968a2e4560836d8f5487c2f0cc238321ed951ea0504"}, + {file = "ruff-0.9.6-py3-none-macosx_11_0_arm64.whl", hash = "sha256:b109c0ad2ececf42e75fa99dc4043ff72a357436bb171900714a9ea581ddef83"}, + {file = "ruff-0.9.6-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1de4367cca3dac99bcbd15c161404e849bb0bfd543664db39232648dc00112dc"}, + {file = "ruff-0.9.6-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:ac3ee4d7c2c92ddfdaedf0bf31b2b176fa7aa8950efc454628d477394d35638b"}, + {file = "ruff-0.9.6-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:5dc1edd1775270e6aa2386119aea692039781429f0be1e0949ea5884e011aa8e"}, + {file = "ruff-0.9.6-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:4a091729086dffa4bd070aa5dab7e39cc6b9d62eb2bef8f3d91172d30d599666"}, + {file = "ruff-0.9.6-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:d1bbc6808bf7b15796cef0815e1dfb796fbd383e7dbd4334709642649625e7c5"}, + {file = "ruff-0.9.6-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:589d1d9f25b5754ff230dce914a174a7c951a85a4e9270613a2b74231fdac2f5"}, + {file = "ruff-0.9.6-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dc61dd5131742e21103fbbdcad683a8813be0e3c204472d520d9a5021ca8b217"}, + {file = "ruff-0.9.6-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:5e2d9126161d0357e5c8f30b0bd6168d2c3872372f14481136d13de9937f79b6"}, + {file = "ruff-0.9.6-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:68660eab1a8e65babb5229a1f97b46e3120923757a68b5413d8561f8a85d4897"}, + {file = "ruff-0.9.6-py3-none-musllinux_1_2_i686.whl", hash = "sha256:c4cae6c4cc7b9b4017c71114115db0445b00a16de3bcde0946273e8392856f08"}, + {file = "ruff-0.9.6-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:19f505b643228b417c1111a2a536424ddde0db4ef9023b9e04a46ed8a1cb4656"}, + {file = "ruff-0.9.6-py3-none-win32.whl", hash = "sha256:194d8402bceef1b31164909540a597e0d913c0e4952015a5b40e28c146121b5d"}, + {file = "ruff-0.9.6-py3-none-win_amd64.whl", hash = "sha256:03482d5c09d90d4ee3f40d97578423698ad895c87314c4de39ed2af945633caa"}, + {file = "ruff-0.9.6-py3-none-win_arm64.whl", hash = "sha256:0e2bb706a2be7ddfea4a4af918562fdc1bcb16df255e5fa595bbd800ce322a5a"}, + {file = "ruff-0.9.6.tar.gz", hash = "sha256:81761592f72b620ec8fa1068a6fd00e98a5ebee342a3642efd84454f3031dca9"}, +] + +[[package]] +name = "typing-extensions" +version = "4.12.2" +description = "Backported and Experimental Type Hints for Python 3.8+" +optional = false +python-versions = ">=3.8" +groups = ["dev"] +files = [ + {file = "typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d"}, + {file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"}, +] + +[metadata] +lock-version = "2.1" +python-versions = "^3.11" +content-hash = "674a21dbda993a1ee761e2e6e2f13ccece8289336a83fd0a154285eac48f3a76" diff --git a/python/pyproject.toml b/python/pyproject.toml new file mode 100644 index 0000000..28c753e --- /dev/null +++ b/python/pyproject.toml @@ -0,0 +1,19 @@ +[project] +name = "cru" +version = "0.1.0" +requires-python = ">=3.11" +license = "MIT" + +[tool.poetry] +package-mode = false + +[tool.poetry.group.dev.dependencies] +mypy = "^1.13.0" +ruff = "^0.9.6" + +[tool.ruff.lint] +select = ["E", "F", "B"] + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" |