From d2ed5fa3e26c3a6d0ad700791bc7de955b8d68e0 Mon Sep 17 00:00:00 2001 From: crupest Date: Mon, 11 Nov 2024 01:12:29 +0800 Subject: HALF WORK: 2024.11.27 --- tools/cru-py/cru/__init__.py | 1 + tools/cru-py/cru/_util/__init__.py | 63 +++ tools/cru-py/cru/_util/_const.py | 49 ++ tools/cru-py/cru/_util/_cru.py | 94 ++++ tools/cru-py/cru/_util/_event.py | 41 ++ tools/cru-py/cru/_util/_func.py | 259 +++++++++++ tools/cru-py/cru/_util/_lang.py | 16 + tools/cru-py/cru/_util/_list.py | 915 +++++++++++++++++++++++++++++++++++++ tools/cru-py/cru/_util/_type.py | 42 ++ tools/cru-py/cru/attr.py | 2 +- tools/cru-py/cru/excp.py | 2 +- tools/cru-py/cru/service/docker.py | 2 +- tools/cru-py/cru/util/__init__.py | 25 - tools/cru-py/cru/util/_const.py | 56 --- tools/cru-py/cru/util/_cru.py | 90 ---- tools/cru-py/cru/util/_event.py | 41 -- tools/cru-py/cru/util/_func.py | 126 ----- tools/cru-py/cru/util/_list.py | 772 ------------------------------- tools/cru-py/cru/util/_type.py | 42 -- 19 files changed, 1483 insertions(+), 1155 deletions(-) create mode 100644 tools/cru-py/cru/_util/__init__.py create mode 100644 tools/cru-py/cru/_util/_const.py create mode 100644 tools/cru-py/cru/_util/_cru.py create mode 100644 tools/cru-py/cru/_util/_event.py create mode 100644 tools/cru-py/cru/_util/_func.py create mode 100644 tools/cru-py/cru/_util/_lang.py create mode 100644 tools/cru-py/cru/_util/_list.py create mode 100644 tools/cru-py/cru/_util/_type.py delete mode 100644 tools/cru-py/cru/util/__init__.py delete mode 100644 tools/cru-py/cru/util/_const.py delete mode 100644 tools/cru-py/cru/util/_cru.py delete mode 100644 tools/cru-py/cru/util/_event.py delete mode 100644 tools/cru-py/cru/util/_func.py delete mode 100644 tools/cru-py/cru/util/_list.py delete mode 100644 tools/cru-py/cru/util/_type.py (limited to 'tools/cru-py/cru') diff --git a/tools/cru-py/cru/__init__.py b/tools/cru-py/cru/__init__.py index e36a778..2ae241e 100644 --- a/tools/cru-py/cru/__init__.py +++ b/tools/cru-py/cru/__init__.py @@ -4,6 +4,7 @@ import sys class CruInitError(Exception): pass + def check_python_version(required_version=(3, 11)): if sys.version_info < required_version: raise CruInitError(f"Python version must be >= {required_version}!") diff --git a/tools/cru-py/cru/_util/__init__.py b/tools/cru-py/cru/_util/__init__.py new file mode 100644 index 0000000..481502c --- /dev/null +++ b/tools/cru-py/cru/_util/__init__.py @@ -0,0 +1,63 @@ +from ._const import ( + CruNotFound, + CruUseDefault, + CruDontChange, + CruNoValue, + CruPlaceholder, +) +from ._func import ( + CruFunction, + CruFunctionMeta, + CruRawFunctions, + CruWrappedFunctions, + CruFunctionGenerators, +) +from ._list import ( + CruList, + CruInplaceList, + CruUniqueKeyInplaceList, + ListOperations, + CanBeList, + ElementOperation, + ElementPredicate, + ElementTransformer, + OptionalElementOperation, + ElementPredicate, + OptionalElementTransformer, +) +from ._type import TypeSet + +F = CruFunction +WF = CruWrappedFunctions +FG = CruFunctionGenerators +L = CruList + + +__all__ = [ + "CruNotFound", + "CruUseDefault", + "CruDontChange", + "CruNoValue", + "CruPlaceholder", + "CruFunction", + "CruFunctionMeta", + "CruRawFunctions", + "CruWrappedFunctions", + "CruFunctionGenerators", + "CruList", + "CruInplaceList", + "CruUniqueKeyInplaceList", + "ListOperations", + "CanBeList", + "ElementOperation", + "ElementPredicate", + "ElementTransformer", + "OptionalElementOperation", + "ElementPredicate", + "OptionalElementTransformer", + "TypeSet", + "F", + "WF", + "FG", + "L", +] diff --git a/tools/cru-py/cru/_util/_const.py b/tools/cru-py/cru/_util/_const.py new file mode 100644 index 0000000..bc02c3a --- /dev/null +++ b/tools/cru-py/cru/_util/_const.py @@ -0,0 +1,49 @@ +from enum import Enum, auto +from typing import Self, TypeGuard, TypeVar + +from ._cru import CRU + +_T = TypeVar("_T") + + +class CruConstantBase(Enum): + @classmethod + def check(cls, v: _T | Self) -> TypeGuard[Self]: + return isinstance(v, cls) + + @classmethod + def check_not(cls, v: _T | Self) -> TypeGuard[_T]: + return not cls.check(v) + + @classmethod + def value(cls) -> Self: + return cls.VALUE # type: ignore + + +class CruNotFound(CruConstantBase): + VALUE = auto() + + +class CruUseDefault(CruConstantBase): + VALUE = auto() + + +class CruDontChange(CruConstantBase): + VALUE = auto() + + +class CruNoValue(CruConstantBase): + VALUE = auto() + + +class CruPlaceholder(CruConstantBase): + VALUE = auto() + + +CRU.add_objects( + CruNotFound, + CruUseDefault, + CruDontChange, + CruNoValue, + CruPlaceholder, +) diff --git a/tools/cru-py/cru/_util/_cru.py b/tools/cru-py/cru/_util/_cru.py new file mode 100644 index 0000000..0085a80 --- /dev/null +++ b/tools/cru-py/cru/_util/_cru.py @@ -0,0 +1,94 @@ +from typing import Any + +from ._lang import remove_none + + +class _Cru: + NAME_PREFIXES = ("CRU_", "Cru", "cru_") + + def __init__(self) -> None: + self._d: dict[str, Any] = {} + + def all_names(self) -> list[str]: + return list(self._d.keys()) + + def get(self, name: str) -> Any: + return self._d[name] + + def has_name(self, name: str) -> bool: + return name in self._d + + @staticmethod + def _maybe_remove_prefix(name: str) -> str | None: + for prefix in _Cru.NAME_PREFIXES: + if name.startswith(prefix): + return name[len(prefix) :] + return None + + def _check_name_exist(self, *names: str | None) -> None: + for name in names: + if name is None: + continue + if self.has_name(name): + raise ValueError(f"Name {name} exists in CRU.") + + @staticmethod + def check_name_format(name: str) -> tuple[str, str]: + no_prefix_name = _Cru._maybe_remove_prefix(name) + if no_prefix_name is None: + raise ValueError(f"Name {name} is not prefixed with {_Cru.NAME_PREFIXES}.") + return name, no_prefix_name + + @staticmethod + def _check_object_name(o) -> tuple[str, str]: + return _Cru.check_name_format(o.__name__) + + def _do_add(self, o, *names: str | None) -> list[str]: + name_list: list[str] = remove_none(names) + for name in name_list: + self._d[name] = o + return name_list + + def add(self, o, name: str | None) -> tuple[str, str | None]: + no_prefix_name: str | None + if name is None: + name, no_prefix_name = self._check_object_name(o) + else: + no_prefix_name = self._maybe_remove_prefix(name) + + self._check_name_exist(name, no_prefix_name) + self._do_add(o, name, no_prefix_name) + return name, no_prefix_name + + def add_with_alias(self, o, name: str | None = None, *aliases: str) -> list[str]: + final_names: list[str | None] = [] + no_prefix_name: str | None + if name is None: + name, no_prefix_name = self._check_object_name(o) + self._check_name_exist(name, no_prefix_name) + final_names.extend([name, no_prefix_name]) + for alias in aliases: + no_prefix_name = self._maybe_remove_prefix(alias) + self._check_name_exist(alias, no_prefix_name) + final_names.extend([alias, no_prefix_name]) + + return self._do_add(o, *final_names) + + def add_objects(self, *objects): + final_list = [] + for o in objects: + name, no_prefix_name = self._check_object_name(o) + self._check_name_exist(name, no_prefix_name) + final_list.append((o, name, no_prefix_name)) + for o, name, no_prefix_name in final_list: + self._do_add(o, name, no_prefix_name) + + def __getitem__(self, item): + return self.get(item) + + def __getattr__(self, item): + return self.get(item) + + +CRU_NAME_PREFIXES = _Cru.NAME_PREFIXES +CRU = _Cru() diff --git a/tools/cru-py/cru/_util/_event.py b/tools/cru-py/cru/_util/_event.py new file mode 100644 index 0000000..813e33f --- /dev/null +++ b/tools/cru-py/cru/_util/_event.py @@ -0,0 +1,41 @@ +from typing import ParamSpec, TypeVar, Callable + +from ._list import CruInplaceList, CruList + +P = ParamSpec('P') +R = TypeVar('R') +F = Callable[P, R] + + +class EventHandlerToken: + def __init__(self, event: "Event", handler: F, once: bool = False) -> None: + self._event = event + self._handler = handler + self._once = once + + @property + def event(self) -> "Event": + return self._event + + @property + def handler(self) -> F: + return self._handler + + @property + def once(self) -> bool: + return self._once + + +class Event: + def __init__(self, name: str) -> None: + self._name = name + self._tokens: CruInplaceList[EventHandlerToken] = CruInplaceList() + + def register(self, handler: F, once: bool = False) -> EventHandlerToken: + token = EventHandlerToken(self, handler, once) + self._tokens.append(token) + return token + + def unregister(self, *h: EventHandlerToken | F) -> int: + + self._tokens.find_all_indices_if(lambda t: ) diff --git a/tools/cru-py/cru/_util/_func.py b/tools/cru-py/cru/_util/_func.py new file mode 100644 index 0000000..0b8b07a --- /dev/null +++ b/tools/cru-py/cru/_util/_func.py @@ -0,0 +1,259 @@ +from __future__ import annotations + +from collections.abc import Callable +from enum import Flag, auto +from typing import ( + Any, + Generic, + Iterable, + Literal, + ParamSpec, + TypeAlias, + TypeVar, + cast, +) + +from ._cru import CRU +from ._const import CruPlaceholder + +_P = ParamSpec("_P") +_T = TypeVar("_T") + + +class CruRawFunctions: + @staticmethod + def none(*_v, **_kwargs) -> None: + return None + + @staticmethod + def true(*_v, **_kwargs) -> Literal[True]: + return True + + @staticmethod + def false(*_v, **_kwargs) -> Literal[False]: + return False + + @staticmethod + def identity(v: _T) -> _T: + return v + + @staticmethod + def only_you(v: _T, *_v, **_kwargs) -> _T: + return v + + @staticmethod + def equal(a: Any, b: Any) -> bool: + return a == b + + @staticmethod + def not_equal(a: Any, b: Any) -> bool: + return a != b + + @staticmethod + def not_(v: Any) -> Any: + return not v + + +CruArgsChainableCallable: TypeAlias = Callable[..., Iterable[Any]] +CruKwargsChainableCallable: TypeAlias = Callable[..., Iterable[tuple[str, Any]]] +CruChainableCallable: TypeAlias = Callable[ + ..., tuple[Iterable[Any], Iterable[tuple[str, Any]]] +] + + +class CruFunctionChainMode(Flag): + ARGS = auto() + KWARGS = auto() + BOTH = ARGS | KWARGS + + +class CruFunctionMeta: + @staticmethod + def bind(func: Callable[..., _T], *bind_args, **bind_kwargs) -> Callable[..., _T]: + def bound_func(*args, **kwargs): + popped = 0 + real_args = [] + for arg in bind_args: + if CruPlaceholder.check(arg): + real_args.append(args[popped]) + popped += 1 + else: + real_args.append(arg) + real_args.extend(args[popped:]) + return func(*real_args, **(bind_kwargs | kwargs)) + + return bound_func + + @staticmethod + def chain_with_args( + funcs: Iterable[CruArgsChainableCallable], *bind_args, **bind_kwargs + ) -> CruArgsChainableCallable: + def chained_func(*args): + for func in funcs: + args = CruFunctionMeta.bind(func, *bind_args, **bind_kwargs)(*args) + return args + + return chained_func + + @staticmethod + def chain_with_kwargs( + funcs: Iterable[CruKwargsChainableCallable], *bind_args, **bind_kwargs + ) -> CruKwargsChainableCallable: + def chained_func(**kwargs): + for func in funcs: + kwargs = CruFunctionMeta.bind(func, *bind_args, **bind_kwargs)(**kwargs) + return kwargs + + return chained_func + + @staticmethod + def chain_with_both( + funcs: Iterable[CruChainableCallable], *bind_args, **bind_kwargs + ) -> CruChainableCallable: + def chained_func(*args, **kwargs): + for func in funcs: + args, kwargs = CruFunctionMeta.bind(func, *bind_args, **bind_kwargs)( + *args, **kwargs + ) + return args, kwargs + + return chained_func + + @staticmethod + def chain( + mode: CruFunctionChainMode, + funcs: Iterable[ + CruArgsChainableCallable | CruKwargsChainableCallable | CruChainableCallable + ], + *bind_args, + **bind_kwargs, + ) -> CruArgsChainableCallable | CruKwargsChainableCallable | CruChainableCallable: + if mode == CruFunctionChainMode.ARGS: + return CruFunctionMeta.chain_with_args( + cast(Iterable[CruArgsChainableCallable], funcs), + *bind_args, + **bind_kwargs, + ) + elif mode == CruFunctionChainMode.KWARGS: + return CruFunctionMeta.chain_with_kwargs( + cast(Iterable[CruKwargsChainableCallable], funcs), + *bind_args, + **bind_kwargs, + ) + elif mode == CruFunctionChainMode.BOTH: + return CruFunctionMeta.chain_with_both( + cast(Iterable[CruChainableCallable], funcs), *bind_args, **bind_kwargs + ) + + +# Advanced Function Wrapper +class CruFunction(Generic[_P, _T]): + + def __init__(self, f: Callable[_P, _T]): + self._f = f + + @property + def f(self) -> Callable[_P, _T]: + return self._f + + @property + def func(self) -> Callable[_P, _T]: + return self.f + + def bind(self, *bind_args, **bind_kwargs) -> CruFunction[..., _T]: + self._f = CruFunctionMeta.bind(self._f, *bind_args, **bind_kwargs) + return self + + def _iter_with_self( + self, funcs: Iterable[Callable[..., Any]] + ) -> Iterable[Callable[..., Any]]: + yield self + yield from funcs + + @staticmethod + def chain_with_args( + self, + funcs: Iterable[CruArgsChainableCallable], + *bind_args, + **bind_kwargs, + ) -> CruArgsChainableCallable: + return CruFunction( + CruFunctionMeta.chain_with_args( + self._iter_with_self(funcs), *bind_args, **bind_kwargs + ) + ) + + def chain_with_kwargs( + self, funcs: Iterable[CruKwargsChainableCallable], *bind_args, **bind_kwargs + ) -> CruKwargsChainableCallable: + return CruFunction( + CruFunctionMeta.chain_with_kwargs( + self._iter_with_self(funcs), *bind_args, **bind_kwargs + ) + ) + + def chain_with_both( + self, funcs: Iterable[CruChainableCallable], *bind_args, **bind_kwargs + ) -> CruChainableCallable: + return CruFunction( + CruFunctionMeta.chain_with_both( + self._iter_with_self(funcs), *bind_args, **bind_kwargs + ) + ) + + def chain( + self, + mode: CruFunctionChainMode, + funcs: Iterable[ + CruArgsChainableCallable | CruKwargsChainableCallable | CruChainableCallable + ], + *bind_args, + **bind_kwargs, + ) -> CruArgsChainableCallable | CruKwargsChainableCallable | CruChainableCallable: + return CruFunction( + CruFunctionMeta.chain( + mode, self._iter_with_self(funcs), *bind_args, **bind_kwargs + ) + ) + + def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T: + return self._f(*args, **kwargs) + + @staticmethod + def make_chain( + mode: CruFunctionChainMode, + funcs: Iterable[ + CruArgsChainableCallable | CruKwargsChainableCallable | CruChainableCallable + ], + *bind_args, + **bind_kwargs, + ) -> CruFunction: + return CruFunction( + CruFunctionMeta.chain(mode, funcs, *bind_args, **bind_kwargs) + ) + + +class CruWrappedFunctions: + none = CruFunction(CruRawFunctions.none) + true = CruFunction(CruRawFunctions.true) + false = CruFunction(CruRawFunctions.false) + identity = CruFunction(CruRawFunctions.identity) + only_you = CruFunction(CruRawFunctions.only_you) + equal = CruFunction(CruRawFunctions.equal) + not_equal = CruFunction(CruRawFunctions.not_equal) + not_ = CruFunction(CruRawFunctions.not_) + + +class CruFunctionGenerators: + @staticmethod + def make_isinstance_of_types(*types: type) -> Callable: + return CruFunction(lambda v: type(v) in types) + + +CRU.add_objects( + CruRawFunctions, + CruFunctionMeta, + CruFunction, + CruWrappedFunctions, + CruFunctionGenerators, +) diff --git a/tools/cru-py/cru/_util/_lang.py b/tools/cru-py/cru/_util/_lang.py new file mode 100644 index 0000000..925ba00 --- /dev/null +++ b/tools/cru-py/cru/_util/_lang.py @@ -0,0 +1,16 @@ +from collections.abc import Callable +from typing import Any, Iterable, TypeVar, cast + +T = TypeVar("T") +D = TypeVar("D") + + +def remove_element( + iterable: Iterable[T | None], to_rm: Iterable[Any], des: type[D] | None = None +) -> D: + to_rm = set(to_rm) + return cast(Callable[..., D], des or list)(v for v in iterable if v not in to_rm) + + +def remove_none(iterable: Iterable[T | None], des: type[D] | None = None) -> D: + return cast(Callable[..., D], des or list)(v for v in iterable if v is not None) diff --git a/tools/cru-py/cru/_util/_list.py b/tools/cru-py/cru/_util/_list.py new file mode 100644 index 0000000..711d5f1 --- /dev/null +++ b/tools/cru-py/cru/_util/_list.py @@ -0,0 +1,915 @@ +from __future__ import annotations + +from collections.abc import Iterable, Callable +from dataclasses import dataclass +from enum import Enum +from typing import ( + Generator, + Literal, + Self, + TypeAlias, + TypeVar, + ParamSpec, + Any, + Generic, + ClassVar, + Optional, + Union, + assert_never, + cast, + overload, + override, +) + +from ._const import CruNoValue, CruNotFound + +P = ParamSpec("P") +T = TypeVar("T") +O = TypeVar("O") +F = TypeVar("F") + +CanBeList: TypeAlias = Iterable[T] | T | None + +OptionalIndex: TypeAlias = int | None +OptionalType: TypeAlias = type | None +ElementOperation: TypeAlias = Callable[[T], Any] +ElementPredicate: TypeAlias = Callable[[T], bool] +ElementTransformer: TypeAlias = Callable[[T], O] +SelfElementTransformer: TypeAlias = ElementTransformer[T, T] +AnyElementTransformer: TypeAlias = ElementTransformer[Any, Any] + + +def flatten_with_func( + o: T, + max_depth: int, + is_leave: ElementPredicate[T], + get_children: ElementTransformer[T, Iterable[T]], + depth: int = 0, +) -> Iterable[T]: + if depth == max_depth or is_leave(o): + yield o + return + for child in get_children(o): + yield from flatten_with_func( + child, max_depth, is_leave, get_children, depth + 1 + ) + + +class _StepActionKind(Enum): + SKIP = 0 + # TODO: Rename this + SEND = 1 + STOP = 2 + AGGREGATE = 3 + + +@dataclass +class _StepAction(Generic[T]): + value: Iterable[_StepAction[T]] | T | None + kind: _StepActionKind + + @property + def non_aggregate_value(self) -> T: + assert self.kind != _StepActionKind.AGGREGATE + return cast(T, self.value) + + @staticmethod + def skip() -> _StepAction[T]: + return _StepAction(None, _StepActionKind.SKIP) + + @staticmethod + def send(value: T | None) -> _StepAction[T]: + return _StepAction(value, _StepActionKind.SEND) + + @staticmethod + def stop(value: T | None = None) -> _StepAction[T]: + return _StepAction(value, _StepActionKind.STOP) + + @staticmethod + def aggregate(*results: _StepAction[T]) -> _StepAction[T]: + return _StepAction(results, _StepActionKind.AGGREGATE) + + @staticmethod + def send_last(value: Any) -> _StepAction[T]: + return _StepAction.aggregate(_StepAction.send(value), _StepAction.stop()) + + def flatten(self) -> Iterable[_StepAction[T]]: + return flatten_with_func( + self, + -1, + lambda r: r.kind != _StepActionKind.AGGREGATE, + lambda r: cast(Iterable[_StepAction[T]], r.value), + ) + + +_r_skip = _StepAction.skip +_r_send = _StepAction.send +_r_stop = _StepAction.stop +_r_send_last = _StepAction.send_last +_r_aggregate = _StepAction.aggregate + + +_GeneralStepAction: TypeAlias = _StepAction[T] | T | None +_GeneralStepActionConverter: TypeAlias = Callable[ + [_GeneralStepAction[T]], _StepAction[T] +] +_IterateOperation = Callable[[T, int], _GeneralStepAction[O]] +_IteratePreHook = Callable[[Iterable[T]], _GeneralStepAction[O]] +_IteratePostHook = Callable[[int], _GeneralStepAction[O]] + + +class CruGenericIterableMeta: + StepActionKind = _StepActionKind + StepAction = _StepAction + GeneralStepAction = _GeneralStepAction + GeneralStepActionConverter = _GeneralStepActionConverter + IterateOperation = _IterateOperation + IteratePreHook = _IteratePreHook + IteratePostHook = _IteratePostHook + + @staticmethod + def _non_result_to_send(value: O | None) -> _StepAction[O]: + return _StepAction.send(value) + + @staticmethod + def _non_result_to_stop(value: O | None) -> _StepAction[O]: + return _StepAction.stop(value) + + @staticmethod + def _none_pre_iterate() -> _StepAction[O]: + return _r_skip() + + @staticmethod + def _none_post_iterate( + _index: int, + ) -> _StepAction[O]: + return _r_skip() + + def iterate( + self, + operation: _IterateOperation[T, O], + fallback_return: O, + pre_iterate: _IteratePreHook[T, O], + post_iterate: _IteratePostHook[O], + convert_non_result: Callable[[O | None], _StepAction[O]], + ) -> Generator[O, None, O]: + pre_result = pre_iterate(self._iterable) + if not isinstance(pre_result, _StepAction): + real_pre_result = convert_non_result(pre_result) + for r in real_pre_result.flatten(): + if r.kind == _StepActionKind.STOP: + return r.non_aggregate_value + elif r.kind == _StepActionKind.SEND: + yield r.non_aggregate_value + + for index, element in enumerate(self._iterable): + result = operation(element, index) + if not isinstance(result, _StepAction): + real_result = convert_non_result(result) + for r in real_result.flatten(): + if r.kind == _StepActionKind.STOP: + return r.non_aggregate_value + elif r.kind == _StepActionKind.SEND: + yield r.non_aggregate_value + else: + continue + + post_result = post_iterate(index + 1) + if not isinstance(post_result, _StepAction): + real_post_result = convert_non_result(post_result) + for r in real_post_result.flatten(): + if r.kind == _StepActionKind.STOP: + return r.non_aggregate_value + elif r.kind == _StepActionKind.SEND: + yield r.non_aggregate_value + + return fallback_return + + def _new( + self, + operation: _IterateOperation, + fallback_return: O, + /, + pre_iterate: _IteratePreHook[T, O] | None = None, + post_iterate: _IteratePostHook[O] | None = None, + ) -> CruIterableWrapper: + return CruIterableWrapper( + self.iterate( + operation, + fallback_return, + pre_iterate or CruIterableWrapper._none_pre_iterate, + post_iterate or CruIterableWrapper._none_post_iterate, + CruIterableWrapper._non_result_to_send, + ), + self._create_new_upstream(), + ) + + def _result( + self, + operation: _IterateOperation, + fallback_return: O, + /, + result_transform: SelfElementTransformer[O] | None = None, + pre_iterate: _IteratePreHook[T, O] | None = None, + post_iterate: _IteratePostHook[O] | None = None, + ) -> O: + try: + for _ in self.iterate( + operation, + fallback_return, + pre_iterate or CruIterableWrapper._none_pre_iterate, + post_iterate or CruIterableWrapper._none_post_iterate, + CruIterableWrapper._non_result_to_stop, + ): + pass + except StopIteration as stop: + return ( + stop.value if result_transform is None else result_transform(stop.value) + ) + raise RuntimeError("Should not reach here") + + +class IterDefaultResults: + @staticmethod + def true(_): + return True + + @staticmethod + def false(_): + return False + + @staticmethod + def not_found(_): + return CruNotFound.VALUE + + +class CruIterableCreators: + @staticmethod + def with_(o: Any) -> CruIterableWrapper: + return CruIterableWrapper(iter(o)) + + @staticmethod + def empty() -> CruIterableWrapper: + return CruIterableCreators.with_([]) + + @staticmethod + def range( + a, + b=None, + c=None, + ) -> CruIterableWrapper[int]: + args = [arg for arg in [a, b, c] if arg is not None] + return CruIterableCreators.with_(range(*args)) + + @staticmethod + def unite(*args: T) -> CruIterableWrapper[T]: + return CruIterableCreators.with_(args) + + @staticmethod + def _concat(*iterables: Iterable) -> Iterable: + for iterable in iterables: + yield from iterable + + @staticmethod + def concat(*iterables: Iterable) -> CruIterableWrapper: + return CruIterableWrapper(CruIterableCreators._concat(*iterables)) + + +class CruIterableWrapper(Generic[T]): + + def __init__( + self, + iterable: Iterable[T], + ) -> None: + self._iterable = iterable + + def __iter__(self): + return self._iterable.__iter__() + + @property + def me(self) -> Iterable[T]: + return self._iterable + + def replace_me_with(self, iterable: Iterable[O]) -> CruIterableWrapper[O]: + return CruIterableCreators.with_(iterable) + + def replace_me_with_empty(self) -> CruIterableWrapper[O]: + return CruIterableCreators.empty() + + def replace_me_with_range(self, a, b=None, c=None) -> CruIterableWrapper[int]: + return CruIterableCreators.range(a, b, c) + + def replace_me_with_unite(self, *args: O) -> CruIterableWrapper[O]: + return CruIterableCreators.unite(*args) + + def replace_me_with_concat(self, *iterables: Iterable) -> CruIterableWrapper: + return CruIterableCreators.concat(*iterables) + + @staticmethod + def _make_set(iterable: Iterable[O], discard: Iterable[Any] | None) -> set[O]: + s = set(iterable) + if discard is not None: + s = s - set(discard) + return s + + @staticmethod + def _make_list(iterable: Iterable[O], discard: Iterable[Any] | None) -> list[O]: + if discard is None: + return list(iterable) + return [v for v in iterable if v not in discard] + + def _help_make_set( + self, iterable: Iterable[O], discard: Iterable[Any] | None + ) -> set[O]: + return CruIterableWrapper._make_set(iterable, discard) + + def _help_make_list( + self, iterable: Iterable[O], discard: Iterable[Any] | None + ) -> list[O]: + return CruIterableWrapper._make_list(iterable, discard) + + def to_set(self, discard: Iterable[Any] | None = None) -> set[T]: + return CruIterableWrapper._make_set(self.me, discard) + + def to_list(self, discard: Iterable[Any] | None = None) -> list[T]: + return CruIterableWrapper._make_list(self.me, discard) + + def copy(self) -> CruIterableWrapper: + return CruIterableWrapper(iter(self.to_list()), self._create_new_upstream()) + + def new_start( + self, other: Iterable[O], /, clear_upstream: bool = False + ) -> CruIterableWrapper[O]: + return CruIterableWrapper( + other, None if clear_upstream else self._create_new_upstream() + ) + + @overload + def concat(self) -> Self: ... + + @overload + def concat( + self, *iterable: Iterable[Any], last: Iterable[O] + ) -> CruIterableWrapper[O]: ... + + def concat(self, *iterable: Iterable[Any]) -> CruIterableWrapper[Any]: # type: ignore + return self.new_start(CruIterableCreators.concat(self.me, *iterable)) + + def all(self, predicate: ElementPredicate[T]) -> bool: + """ + partial + """ + return self._result(lambda v, _: predicate(v) and None, IterDefaultResults.true) + + def all_isinstance(self, *types: OptionalType) -> bool: + """ + partial + """ + types = self._help_make_set(types) + return self.all(lambda v: type(v) in types) + + def any(self, predicate: ElementPredicate[T]) -> bool: + """ + partial + """ + return self._result(lambda v, _: predicate(v) or None, IterDefaultResults.false) + + def number(self) -> CruIterableWrapper: + """ + partial + """ + return self._new(lambda _, i: i) + + def take(self, predicate: ElementPredicate[T]) -> CruIterableWrapper: + """ + complete + """ + return self._new(lambda v, _: _r_send(v) if predicate(v) else None) + + def transform( + self, *transformers: OptionalElementTransformer + ) -> CruIterableWrapper: + """ + complete + """ + + def _transform_element(element, _): + for transformer in self._help_make_list(transformers): + if transformer is not None: + element = transformer(element) + return _r_send(element) + + return self._new(_transform_element) + + def take_n(self, max_count: int, neg_is_clone: bool = True) -> CruIterableWrapper: + """ + partial + """ + if max_count < 0: + if neg_is_clone: + return self.clone_me() + else: + raise ValueError("max_count must be 0 or positive.") + elif max_count == 0: + return self.drop_all() + return self._new( + lambda v, i: _r_send(v) if i < max_count - 1 else _r_send_last(v) + ) + + def take_by_indices(self, *indices: OptionalIndex) -> CruIterableWrapper: + """ + partial + """ + indices = self._help_make_set(indices) + max_index = max(indices) + return self.take_n(max_index + 1)._new( + lambda v, i: _r_send(v) if i in indices else None + ) + + def single_or( + self, fallback: Any | None = CRU_NOT_FOUND + ) -> T | Any | CRU_NOT_FOUND: + """ + partial + """ + first_2 = self.take_n(2) + has_value = False + value = None + for element in first_2.me: + if has_value: + raise ValueError("More than one value found.") + has_value = True + value = element + if has_value: + return value + else: + return fallback + + def first_or( + self, predicate: ElementPredicate[T], fallback: Any | None = CRU_NOT_FOUND + ) -> T | CRU_NOT_FOUND: + """ + partial + """ + result_iterable = self.take_n(1).single_or() + + @staticmethod + def first_index( + iterable: Iterable[T], predicate: ElementPredicate[T] + ) -> int | CRU_NOT_FOUND: + """ + partial + """ + for index, element in enumerate(iterable): + if predicate(element): + return index + + @staticmethod + def take_indices( + iterable: Iterable[T], predicate: ElementPredicate[T] + ) -> Iterable[int]: + """ + complete + """ + for index, element in enumerate(iterable): + if predicate(element): + yield index + + @staticmethod + def flatten( + o, + max_depth=-1, + is_leave: ElementPredicate | None = None, + get_children: OptionalElementTransformer = None, + ) -> Iterable: + """ + complete + """ + if is_leave is None: + is_leave = lambda v: not isinstance(v, Iterable) + if get_children is None: + get_children = lambda v: v + return CruIterableWrapper._flatten_with_func( + o, max_depth, is_leave, get_children + ) + + @staticmethod + def skip_by_indices(iterable: Iterable[T], *indices: OptionalIndex) -> Iterable[T]: + """ + complete + """ + indices = set(indices) - {None} + for index, element in enumerate(iterable): + if index not in indices: + yield element + + @staticmethod + def skip_if(iterable: Iterable[T], predicate: ElementPredicate[T]) -> list[T]: + """ + complete + """ + for element in iterable: + if not predicate(element): + yield element + + def drop_all(self) -> CruIterableWrapper: + return self.replace_me_with_empty() + + @staticmethod + def remove_all_if(l: Iterable[T], p: ElementPredicate[T]) -> list[T]: + return [v for v in l if not p(v)] + + @staticmethod + def remove_all_value(l: Iterable[T], *r: Any) -> list[T]: + return [v for v in l if v not in r] + + @staticmethod + def replace_all_value(l: Iterable[T], old_value: Any, new_value: R) -> list[T | R]: + return [new_value if v == old_value else v for v in l] + + @staticmethod + def foreach(iterable: Iterable[T], *f: OptionalElementOperation[T]) -> None: + if len(f) == 0: + return + for v in iterable: + for f_ in f: + if f_ is not None: + f_(v) + + @staticmethod + def make(v: CanBeList[T], /, none_to_empty_list: bool = True) -> list[T]: + if v is None and none_to_empty_list: + return [] + return list(v) if isinstance(v, Iterable) else [v] + + +class ListOperations: + @staticmethod + def all(iterable: Iterable[T], predicate: ElementPredicate[T]) -> bool: + """ + partial + """ + return _God.spy(iterable, lambda v, _: predicate(v) and None, _God.Default.true) + + @staticmethod + def all_isinstance(iterable: Iterable[T], *types: OptionalType) -> bool: + """ + partial + """ + types = _God.help_make_set(types) + return ListOperations.all(iterable, lambda v: type(v) in types) + + @staticmethod + def any(iterable: Iterable[T], predicate: ElementPredicate[T]) -> bool: + """ + partial + """ + return _God.spy(iterable, lambda v, _: predicate(v) or None, _God.Default.false) + + @staticmethod + def indices(iterable: Iterable[T]) -> Iterable[int]: + """ + partial + """ + return _God.new(iterable, lambda _, i: i) + + @staticmethod + def take(iterable: Iterable[T], predicate: ElementPredicate[T]) -> Iterable[T]: + """ + complete + """ + return _God.new(iterable, lambda v, _: _God.yield_(v) if predicate(v) else None) + + @staticmethod + def transform( + iterable: Iterable[T], *transformers: OptionalElementTransformer + ) -> Iterable: + """ + complete + """ + + def _transform_element(element, _): + for transformer in transformers: + if transformer is not None: + element = transformer(element) + return element + + return _God.new(iterable, _transform_element) + + @staticmethod + def take_n(iterable: Iterable[T], n: int) -> Iterable[T]: + """ + partial + """ + if n < 0: + return iterable + elif n == 0: + return [] + return range(n)._god_yield( + iterable, lambda v, i: _yield(v) if i < n else _return() + ) + + @staticmethod + def take_by_indices(iterable: Iterable[T], *indices: OptionalIndex) -> Iterable[T]: + """ + partial + """ + indices = set(indices) - {None} + max_index = max(indices) + iterable = ListOperations.take_n(iterable, max_index + 1) + return _god_yield(iterable, lambda v, i: _yield(v) if i in indices else None) + + @staticmethod + def first(iterable: Iterable[T]) -> T | CRU_NOT_FOUND: + """ + partial + """ + result_iterable = ListOperations.take_n(iterable, 1) + for element in result_iterable: + return element + return CRU_NOT_FOUND + + @staticmethod + def first_index( + iterable: Iterable[T], predicate: ElementPredicate[T] + ) -> int | CRU_NOT_FOUND: + """ + partial + """ + for index, element in enumerate(iterable): + if predicate(element): + return index + + @staticmethod + def take_indices( + iterable: Iterable[T], predicate: ElementPredicate[T] + ) -> Iterable[int]: + """ + complete + """ + for index, element in enumerate(iterable): + if predicate(element): + yield index + + @staticmethod + def _flatten(o, depth: int, max_depth: int) -> Iterable: + if depth == max_depth or not isinstance(o, Iterable): + yield o + return + for v in o: + yield from ListOperations._flatten(v, depth + 1, max_depth) + + @staticmethod + def flatten(o, max_depth=-1) -> Iterable: + """ + complete + """ + return ListOperations._flatten(o, 0, max_depth) + + @staticmethod + def skip_by_indices(iterable: Iterable[T], *indices: OptionalIndex) -> Iterable[T]: + """ + complete + """ + indices = set(indices) - {None} + for index, element in enumerate(iterable): + if index not in indices: + yield element + + @staticmethod + def skip_if(iterable: Iterable[T], predicate: ElementPredicate[T]) -> list[T]: + """ + complete + """ + for element in iterable: + if not predicate(element): + yield element + + @staticmethod + def remove_all_if(l: Iterable[T], p: ElementPredicate[T]) -> list[T]: + return [v for v in l if not p(v)] + + @staticmethod + def remove_all_value(l: Iterable[T], *r: Any) -> list[T]: + return [v for v in l if v not in r] + + @staticmethod + def replace_all_value(l: Iterable[T], old_value: Any, new_value: R) -> list[T | R]: + return [new_value if v == old_value else v for v in l] + + @staticmethod + def foreach(iterable: Iterable[T], *f: OptionalElementOperation[T]) -> None: + if len(f) == 0: + return + for v in iterable: + for f_ in f: + if f_ is not None: + f_(v) + + @staticmethod + def make(v: CanBeList[T], /, none_to_empty_list: bool = True) -> list[T]: + if v is None and none_to_empty_list: + return [] + return list(v) if isinstance(v, Iterable) else [v] + + +class CruList(list, Generic[T]): + @property + def is_empty(self) -> bool: + return len(self) == 0 + + def sub_by_indices(self, *index: int) -> "CruList"[T]: + return CruList(ListOperations.sub_by_indices(self, *index)) + + def split_by_indices(self, *index: int) -> tuple["CruList"[T], "CruList"[T]]: + l1, l2 = ListOperations.split_by_indices(self, *index) + return CruList(l1), CruList(l2) + + def complement_indices(self, *index: int) -> list[int]: + return ListOperations.complement_indices(len(self), *index) + + def foreach(self, *f: OptionalElementOperation[T]) -> None: + ListOperations.foreach(self, *f) + + def all(self, p: ElementPredicate[T]) -> bool: + return ListOperations.all(self, p) + + def all_is_instance(self, *t: type) -> bool: + return ListOperations.all_isinstance(self, *t) + + def any(self, p: ElementPredicate[T]) -> bool: + return ListOperations.any(self, p) + + def find_all_if(self, p: ElementPredicate[T]) -> "CruList"[T]: + return CruList(ListOperations.take(self, p)) + + def find_if(self, p: ElementPredicate[T]) -> T | CRU_NOT_FOUND: + return ListOperations.first(self, p) + + def find_all_indices_if(self, p: ElementPredicate[T]) -> "CruList"[int]: + return CruList(ListOperations.take_indices(self, p)) + + def find_index_if(self, p: ElementPredicate[T]) -> int | CRU_NOT_FOUND: + return ListOperations.first_index(self, p) + + def split_if(self, p: ElementPredicate[T]) -> tuple["CruList"[T], "CruList"[T]]: + l1, l2 = ListOperations.split_if(self, p) + return CruList(l1), CruList(l2) + + def split_by_types(self, *t: type) -> tuple["CruList"[T], "CruList"[T]]: + l1, l2 = ListOperations.split_by_types(self, *t) + return CruList(l1), CruList(l2) + + def transform(self, *f: OptionalElementTransformer) -> "CruList"[Any]: + return CruList(ListOperations.transform(self, *f)) + + def transform_if( + self, f: OptionalElementTransformer, p: ElementPredicate[T] + ) -> "CruList"[Any]: + return CruList(ListOperations.transform_if(self, f, p)) + + def remove_by_indices(self, *index: int) -> "CruList"[T]: + return CruList(ListOperations.skip_by_indices(self, *index)) + + def remove_if(self, p: ElementPredicate[T]) -> "CruList"[T]: + return CruList(ListOperations.remove_if(self, p)) + + def remove_all_if(self, p: ElementPredicate[T]) -> "CruList"[T]: + return CruList(ListOperations.remove_all_if(self, p)) + + def remove_all_value(self, *r: Any) -> "CruList"[T]: + return CruList(ListOperations.remove_all_value(self, *r)) + + def replace_all_value(self, old_value: Any, new_value: R) -> "CruList"[T | R]: + return CruList(ListOperations.replace_all_value(self, old_value, new_value)) + + @staticmethod + def make(l: CanBeList[T]) -> "CruList"[T]: + return CruList(ListOperations.make(l)) + + +class CruInplaceList(CruList, Generic[T]): + + def clear(self) -> "CruInplaceList[T]": + self.clear() + return self + + def extend(self, *l: Iterable[T]) -> "CruInplaceList[T]": + self.extend(l) + return self + + def reset(self, *l: Iterable[T]) -> "CruInplaceList[T]": + self.clear() + self.extend(l) + return self + + def transform(self, *f: OptionalElementTransformer) -> "CruInplaceList"[Any]: + return self.reset(super().transform(*f)) + + def transform_if( + self, f: OptionalElementTransformer, p: ElementPredicate[T] + ) -> "CruInplaceList"[Any]: + return self.reset(super().transform_if(f, p)) + + def remove_by_indices(self, *index: int) -> "CruInplaceList"[T]: + return self.reset(super().remove_by_indices(*index)) + + def remove_all_if(self, p: ElementPredicate[T]) -> "CruInplaceList"[T]: + return self.reset(super().remove_all_if(p)) + + def remove_all_value(self, *r: Any) -> "CruInplaceList"[T]: + return self.reset(super().remove_all_value(*r)) + + def replace_all_value( + self, old_value: Any, new_value: R + ) -> "CruInplaceList"[T | R]: + return self.reset(super().replace_all_value(old_value, new_value)) + + @staticmethod + def make(l: CanBeList[T]) -> "CruInplaceList"[T]: + return CruInplaceList(ListOperations.make(l)) + + +K = TypeVar("K") + + +class CruUniqueKeyInplaceList(Generic[T, K]): + KeyGetter = Callable[[T], K] + + def __init__( + self, get_key: KeyGetter, *, before_add: Callable[[T], T] | None = None + ): + super().__init__() + self._get_key = get_key + self._before_add = before_add + self._l: CruInplaceList[T] = CruInplaceList() + + @property + def object_key_getter(self) -> KeyGetter: + return self._get_key + + @property + def internal_list(self) -> CruInplaceList[T]: + return self._l + + def validate_self(self): + keys = self._l.transform(self._get_key) + if len(keys) != len(set(keys)): + raise ValueError("Duplicate keys!") + + def get_or(self, k: K, fallback: Any = CRU_NOT_FOUND) -> T | Any: + r = self._l.find_if(lambda i: k == self._get_key(i)) + return r if r is not CRU_NOT_FOUND else fallback + + def get(self, k: K) -> T: + v = self.get_or(k, CRU_NOT_FOUND) + if v is CRU_NOT_FOUND: + raise KeyError(f"Key not found!") + return v + + def has_key(self, k: K) -> bool: + return self.get_or(k, CRU_NOT_FOUND) is not CRU_NOT_FOUND + + def has_any_key(self, *k: K) -> bool: + return self._l.any(lambda i: self._get_key(i) in k) + + def try_remove(self, k: K) -> bool: + i = self._l.find_index_if(lambda v: k == self._get_key(v)) + if i is CRU_NOT_FOUND: + return False + self._l.remove_by_indices(i) + return True + + def remove(self, k: K, allow_absense: bool = False) -> None: + if not self.try_remove(k) and not allow_absense: + raise KeyError(f"Key {k} not found!") + + def add(self, v: T, /, replace: bool = False) -> None: + if self.has_key(self._get_key(v)): + if replace: + self.remove(self._get_key(v)) + else: + raise ValueError(f"Key {self._get_key(v)} already exists!") + if self._before_add is not None: + v = self._before_add(v) + self._l.append(v) + + def set(self, v: T) -> None: + self.add(v, True) + + def extend(self, l: Iterable[T], /, replace: bool = False) -> None: + if not replace and self.has_any_key([self._get_key(i) for i in l]): + raise ValueError("Keys already exists!") + if self._before_add is not None: + l = [self._before_add(i) for i in l] + keys = [self._get_key(i) for i in l] + self._l.remove_all_if(lambda i: self._get_key(i) in keys).extend(l) + + def clear(self) -> None: + self._l.clear() + + def __iter__(self): + return iter(self._l) + + def __len__(self): + return len(self._l) diff --git a/tools/cru-py/cru/_util/_type.py b/tools/cru-py/cru/_util/_type.py new file mode 100644 index 0000000..dc50def --- /dev/null +++ b/tools/cru-py/cru/_util/_type.py @@ -0,0 +1,42 @@ +from types import NoneType +from typing import Any + +from ._list import CanBeList, CruList + +DEFAULT_NONE_ERR = ValueError +DEFAULT_NONE_ERR_MSG = "None is not allowed here." +DEFAULT_TYPE_ERR = ValueError +DEFAULT_TYPE_ERR_MSG = "Type of object is not allowed here." + + +class TypeSet(set[type]): + def __init__(self, *l: type): + l = CruList.make(l).remove_all_value(None, NoneType) + if not l.all_is_instance(type): + raise TypeError("t must be a type or None.") + super().__init__(l) + + def check_value(self, v: Any, /, allow_none: bool, empty_allow_all: bool = True, *, + none_err: type[Exception] = DEFAULT_NONE_ERR, + none_err_msg: str = DEFAULT_NONE_ERR_MSG, + type_err: type[Exception] = DEFAULT_TYPE_ERR, + type_err_msg: str = DEFAULT_TYPE_ERR_MSG) -> None: + if v is None: + if allow_none: + return + else: + raise none_err(none_err_msg) + if len(self) == 0 and empty_allow_all: + return + if type(v) not in self: + raise type_err(type_err_msg) + + def check_value_list(self, l: CanBeList, /, allow_none: bool, empty_allow_all: bool = True, *, + none_err: type[Exception] = DEFAULT_NONE_ERR, + none_err_msg: str = DEFAULT_NONE_ERR_MSG, + type_err: type[Exception] = DEFAULT_TYPE_ERR, + type_err_msg: str = DEFAULT_TYPE_ERR_MSG) -> None: + l = CruList.make(l) + for v in l: + self.check_value(v, allow_none, empty_allow_all, none_err=none_err, none_err_msg=none_err_msg, + type_err=type_err, type_err_msg=type_err_msg) diff --git a/tools/cru-py/cru/attr.py b/tools/cru-py/cru/attr.py index 32cca8d..a52585a 100644 --- a/tools/cru-py/cru/attr.py +++ b/tools/cru-py/cru/attr.py @@ -3,7 +3,7 @@ from collections.abc import Callable, Iterable from dataclasses import dataclass, field from typing import Any, ClassVar -from .util import CanBeList, TypeSet, F, L, WF, CruUniqueKeyInplaceList, CRU_NOT_FOUND, CRU_USE_DEFAULT, \ +from ._util import CanBeList, TypeSet, F, L, WF, CruUniqueKeyInplaceList, CRU_NOT_FOUND, CRU_USE_DEFAULT, \ CRU_DONT_CHANGE, CRU_PLACEHOLDER diff --git a/tools/cru-py/cru/excp.py b/tools/cru-py/cru/excp.py index 358ad90..9ea204e 100644 --- a/tools/cru-py/cru/excp.py +++ b/tools/cru-py/cru/excp.py @@ -1,7 +1,7 @@ from typing import Any from .attr import CruAttrDefRegistry, CruAttr, CruAttrTable -from .util import CRU_NOT_FOUND, CruList, CRU_USE_DEFAULT +from ._util import CRU_NOT_FOUND, CruList, CRU_USE_DEFAULT CRU_EXCEPTION_ATTR_DEF_REGISTRY = CruAttrDefRegistry() diff --git a/tools/cru-py/cru/service/docker.py b/tools/cru-py/cru/service/docker.py index a57a246..5958f4f 100644 --- a/tools/cru-py/cru/service/docker.py +++ b/tools/cru-py/cru/service/docker.py @@ -1,7 +1,7 @@ import shutil import subprocess -from ..util import L +from .._util import L class DockerController: diff --git a/tools/cru-py/cru/util/__init__.py b/tools/cru-py/cru/util/__init__.py deleted file mode 100644 index ecd9673..0000000 --- a/tools/cru-py/cru/util/__init__.py +++ /dev/null @@ -1,25 +0,0 @@ -from typing import Any - -from ._const import cru_make_unique_object, cru_make_bool_unique_object, CRU_NOT_FOUND, CRU_USE_DEFAULT, \ - CRU_DONT_CHANGE, \ - CRU_PLACEHOLDER -from ._func import CruFunction, CruFunctionMeta, CruRawFunctions, CruWrappedFunctions, CruFunctionGenerators -from ._list import CruList, CruInplaceList, CruUniqueKeyInplaceList, ListOperations, CanBeList, ElementOperation, \ - ElementPredicate, ElementTransformer, OptionalElementOperation, ElementPredicate, OptionalElementTransformer -from ._type import TypeSet - -F = CruFunction -WF = CruWrappedFunctions -FG = CruFunctionGenerators -L = CruList - - -__all__ = [ - "CRU_NOT_FOUND", "CRU_USE_DEFAULT", "CRU_DONT_CHANGE", "CRU_PLACEHOLDER", - "CruFunction", "CruFunctionMeta", "CruRawFunctions", "CruWrappedFunctions", "CruFunctionGenerators", - "CruList", "CruInplaceList", "CruUniqueKeyInplaceList", "ListOperations", - "CanBeList", "ElementOperation", "ElementPredicate", "ElementTransformer", - "OptionalElementOperation", "ElementPredicate", "OptionalElementTransformer", - "TypeSet", - "F", "WF", "FG", "L" -] diff --git a/tools/cru-py/cru/util/_const.py b/tools/cru-py/cru/util/_const.py deleted file mode 100644 index 8140988..0000000 --- a/tools/cru-py/cru/util/_const.py +++ /dev/null @@ -1,56 +0,0 @@ -from typing import Any - -from ._cru import CRU - - -def cru_make_unique_object() -> Any: - class _CruUnique: - _i = False - - def __init__(self): - if self._i: - raise ValueError("_CruAttrNotSet is a singleton!") - self._i = True - - def __copy__(self): - return self - - def __eq__(self, other): - return isinstance(other, _CruUnique) - - v = _CruUnique() - - return v - - -def cru_make_bool_unique_object(b: bool) -> Any: - class _CruBoolUnique: - _i = False - - def __init__(self): - super().__init__(b) - if self._i: - raise ValueError("_CruAttrNotSet is a singleton!") - self._i = True - - def __copy__(self): - return self - - def __eq__(self, other): - return isinstance(other, _CruBoolUnique) or b == other - - def __bool__(self): - return b - - v = _CruBoolUnique() - - return v - - -CRU_NOT_FOUND = cru_make_bool_unique_object(False) -CRU_USE_DEFAULT = cru_make_unique_object() -CRU_DONT_CHANGE = cru_make_unique_object() -CRU_PLACEHOLDER = cru_make_unique_object() - -CRU.add_objects(cru_make_unique_object, cru_make_bool_unique_object, CRU_NOT_FOUND, CRU_USE_DEFAULT, - CRU_DONT_CHANGE, CRU_PLACEHOLDER) diff --git a/tools/cru-py/cru/util/_cru.py b/tools/cru-py/cru/util/_cru.py deleted file mode 100644 index 61a0ee1..0000000 --- a/tools/cru-py/cru/util/_cru.py +++ /dev/null @@ -1,90 +0,0 @@ -from typing import Any - - -class _Cru: - NAME_PREFIXES = ("CRU_", "Cru", "cru_") - - def __init__(self): - self._d: dict[str, Any] = {} - - def all_names(self) -> list[str]: - return list(self._d.keys()) - - def get(self, name: str) -> Any: - return self._d[name] - - def has_name(self, name: str) -> bool: - return name in self._d - - @staticmethod - def _maybe_remove_prefix(name: str) -> str | None: - for prefix in _Cru.NAME_PREFIXES: - if name.startswith(prefix): - return name[len(prefix):] - return None - - def _check_name_exist(self, *names: str) -> None: - for name in names: - if name is None: continue - if self.has_name(name): - raise ValueError(f"Name {name} exists in CRU.") - - @staticmethod - def check_name_format(name: str) -> tuple[str, str]: - no_prefix_name = _Cru._maybe_remove_prefix(name) - if no_prefix_name is None: - raise ValueError(f"Name {name} is not prefixed with {_Cru.NAME_PREFIXES}.") - return name, no_prefix_name - - @staticmethod - def _check_object_name(o) -> tuple[str, str]: - return _Cru.check_name_format(o.__name__) - - def _do_add(self, o, *names: str | None) -> list[str]: - names = set(names) - names.remove(None) - for name in names: - self._d[name] = o - return list(names) - - def add(self, o, name: str | None) -> tuple[str, str | None]: - if name is None: - name, no_prefix_name = self._check_object_name(o) - else: - no_prefix_name = self._maybe_remove_prefix(name) - - self._check_name_exist(name, no_prefix_name) - self._do_add(o, name, no_prefix_name) - return name, no_prefix_name - - def add_with_alias(self, o, name: str | None = None, *aliases: str) -> list[str]: - final_names = [] - if name is None: - name, no_prefix_name = self._check_object_name(o) - self._check_name_exist(name, no_prefix_name) - final_names.extend([name, no_prefix_name]) - for alias in aliases: - no_prefix_name = self._maybe_remove_prefix(alias) - self._check_name_exist(alias, no_prefix_name) - final_names.extend([alias, no_prefix_name]) - - return self._do_add(o, *final_names) - - def add_objects(self, *objects): - final_list = [] - for o in objects: - name, no_prefix_name = self._check_object_name(o) - self._check_name_exist(name, no_prefix_name) - final_list.append((o, name, no_prefix_name)) - for o, name, no_prefix_name in final_list: - self._do_add(o, name, no_prefix_name) - - def __getitem__(self, item): - return self.get(item) - - def __getattr__(self, item): - return self.get(item) - - -CRU_NAME_PREFIXES = _Cru.NAME_PREFIXES -CRU = _Cru() diff --git a/tools/cru-py/cru/util/_event.py b/tools/cru-py/cru/util/_event.py deleted file mode 100644 index 813e33f..0000000 --- a/tools/cru-py/cru/util/_event.py +++ /dev/null @@ -1,41 +0,0 @@ -from typing import ParamSpec, TypeVar, Callable - -from ._list import CruInplaceList, CruList - -P = ParamSpec('P') -R = TypeVar('R') -F = Callable[P, R] - - -class EventHandlerToken: - def __init__(self, event: "Event", handler: F, once: bool = False) -> None: - self._event = event - self._handler = handler - self._once = once - - @property - def event(self) -> "Event": - return self._event - - @property - def handler(self) -> F: - return self._handler - - @property - def once(self) -> bool: - return self._once - - -class Event: - def __init__(self, name: str) -> None: - self._name = name - self._tokens: CruInplaceList[EventHandlerToken] = CruInplaceList() - - def register(self, handler: F, once: bool = False) -> EventHandlerToken: - token = EventHandlerToken(self, handler, once) - self._tokens.append(token) - return token - - def unregister(self, *h: EventHandlerToken | F) -> int: - - self._tokens.find_all_indices_if(lambda t: ) diff --git a/tools/cru-py/cru/util/_func.py b/tools/cru-py/cru/util/_func.py deleted file mode 100644 index d9f8044..0000000 --- a/tools/cru-py/cru/util/_func.py +++ /dev/null @@ -1,126 +0,0 @@ -from collections.abc import Callable -from typing import TypeVar - -from ._cru import CRU -from ._const import CRU_PLACEHOLDER - -T = TypeVar("T") - -_PLACEHOLDER = CRU_PLACEHOLDER - -class CruRawFunctions: - @staticmethod - def none(*_v, **_kwargs) -> None: - return None - - @staticmethod - def true(*_v, **_kwargs) -> True: - return True - - @staticmethod - def false(*_v, **_kwargs) -> False: - return False - - @staticmethod - def identity(v: T) -> T: - return v - - @staticmethod - def only_you(v: T, *_v, **_kwargs) -> T: - return v - - @staticmethod - def equal(a, b) -> bool: - return a == b - - @staticmethod - def not_equal(a, b) -> bool: - return a != b - - @staticmethod - def not_(v): - return not v - - -class CruFunctionMeta: - @staticmethod - def bind(func: Callable, *bind_args, **bind_kwargs) -> Callable: - def bound_func(*args, **kwargs): - popped = 0 - real_args = [] - for arg in bind_args: - if arg is _PLACEHOLDER: - real_args.append(args[popped]) - popped += 1 - else: - real_args.append(arg) - real_args.extend(args[popped:]) - return func(*real_args, **(bind_kwargs | kwargs)) - - return bound_func - - @staticmethod - def chain(*funcs: Callable) -> Callable: - if len(funcs) == 0: - raise ValueError("At least one function is required!") - - final_func = funcs[0] - for func in funcs[1:]: - func_copy = func - - def chained_func(*args, **kwargs): - results = final_func(*args, **kwargs) - results = results if isinstance(results, tuple) else (results,) - return func_copy(*results) - - final_func = chained_func - - return final_func - - -# Advanced Function Wrapper -class CruFunction: - def __init__(self, f: Callable): - self._f = f - - @property - def f(self) -> Callable: - return self._f - - @property - def func(self) -> Callable: - return self.f - - def bind(self, *bind_args, **bind_kwargs) -> "CruFunction": - self._f = CruFunctionMeta.bind(self._f, *bind_args, **bind_kwargs) - return self - - def chain(self, *funcs: Callable) -> "CruFunction": - self._f = CruFunctionMeta.chain(self._f, *funcs) - return self - - def __call__(self, *args, **kwargs): - return self._f(*args, **kwargs) - - @staticmethod - def make_chain(base_func: Callable, *funcs: Callable) -> "CruFunction": - return CruFunction(base_func).chain(*funcs) - - -class CruWrappedFunctions: - none = CruFunction(CruRawFunctions.none) - true = CruFunction(CruRawFunctions.true) - false = CruFunction(CruRawFunctions.false) - identity = CruFunction(CruRawFunctions.identity) - only_you = CruFunction(CruRawFunctions.only_you) - equal = CruFunction(CruRawFunctions.equal) - not_equal = CruFunction(CruRawFunctions.not_equal) - not_ = CruFunction(CruRawFunctions.not_) - - -class CruFunctionGenerators: - @staticmethod - def make_isinstance_of_types(*types: type) -> Callable: - return CruFunction(lambda v: type(v) in types) - -CRU.add_objects(CruRawFunctions, CruFunctionMeta, CruFunction, CruWrappedFunctions, CruFunctionGenerators) diff --git a/tools/cru-py/cru/util/_list.py b/tools/cru-py/cru/util/_list.py deleted file mode 100644 index e1c8373..0000000 --- a/tools/cru-py/cru/util/_list.py +++ /dev/null @@ -1,772 +0,0 @@ -from collections.abc import Iterable, Callable -from dataclasses import dataclass -from enum import Enum -from typing import TypeVar, ParamSpec, Any, Generic, ClassVar, Optional, Union - -from ._const import CRU_NOT_FOUND - -T = TypeVar("T") -O = TypeVar("O") -R = TypeVar("R") -F = TypeVar("F") - -CanBeList = T | Iterable[T] | None - -OptionalIndex = int | None -OptionalType = type | None -ElementOperation = Callable[[T], Any] | None -ElementPredicate = Callable[[T], bool] -ElementTransformer = Callable[[T], R] -SelfElementTransformer = ElementTransformer[T, T] -AnyElementTransformer = ElementTransformer[Any, Any] -OptionalElementOperation = ElementOperation | None -OptionalElementTransformer = ElementTransformer | None -OptionalSelfElementTransformer = ElementTransformer[T, T] -OptionalAnyElementTransformer = AnyElementTransformer | None - - -def _flatten_with_func(o: T, max_depth: int, is_leave: ElementPredicate[T], - get_children: SelfElementTransformer[T], depth: int = 0) -> Iterable[T]: - if depth == max_depth or is_leave(o): - yield o - return - for child in get_children(o): - yield from _flatten_with_func(child, max_depth, is_leave, get_children, depth + 1) - - -class _Action(Enum): - SKIP = 0 - SEND = 1 - STOP = 2 - AGGREGATE = 3 - - -@dataclass -class _Result(Generic[T]): - Action: ClassVar[type[_Action]] = _Action - - value: T | O | None - action: Action - - @staticmethod - def skip() -> "_Result"[T]: - return _Result(None, _Action.SKIP) - - @staticmethod - def send(value: Any) -> "_Result"[T]: - return _Result(value, _Action.SEND) - - @staticmethod - def stop(value: Any = None) -> "_Result"[T]: - return _Result(value, _Action.STOP) - - @staticmethod - def aggregate(*result: "_Result"[T]) -> "_Result"[T]: - return _Result(result, _Action.AGGREGATE) - - @staticmethod - def send_last(value: Any) -> "_Result"[T]: - return _Result.aggregate(_Result.send(value), _Result.stop()) - - def flatten(self) -> Iterable["_Result"[T]]: - return _flatten_with_func(self, -1, lambda r: r.action != _Action.AGGREGATE, lambda r: r.value) - - -_r_skip = _Result.skip -_r_send = _Result.send -_r_stop = _Result.stop -_r_send_last = _Result.send_last -_r_aggregate = _Result.aggregate - - -class _Defaults: - @staticmethod - def true(_): - return True - - @staticmethod - def false(_): - return False - - @staticmethod - def not_found(_): - return CRU_NOT_FOUND - - -def _default_upstream() -> Iterable[Iterable]: - return iter([]) - - -CruIterableUpstream = Iterable[Iterable] -CruIterableOptionalUpstream = CruIterableUpstream | None - - -class CruIterableCreators: - @staticmethod - def with_(o: Any, /, upstreams: CruIterableOptionalUpstream = _default_upstream()) -> "CruIterableWrapper": - return CruIterableWrapper(iter(o), upstreams) - - @staticmethod - def empty(upstreams: CruIterableOptionalUpstream = _default_upstream()) -> "CruIterableWrapper": - return CruIterableCreators.with_([], upstreams) - - @staticmethod - def range(a, b=None, c=None, /, upstreams: CruIterableOptionalUpstream = _default_upstream()) -> \ - "CruIterableWrapper"[int]: - args = [arg for arg in [a, b, c] if arg is not None] - return CruIterableCreators.with_(range(*args), upstreams) - - @staticmethod - def unite(*args: T, upstreams: CruIterableOptionalUpstream = _default_upstream()) -> "CruIterableWrapper"[T]: - return CruIterableCreators.with_(args, upstreams) - - @staticmethod - def _concat(*iterables: Iterable) -> Iterable: - for iterable in iterables: - yield from iterable - - @staticmethod - def concat(*iterables: Iterable, - upstreams: CruIterableOptionalUpstream = _default_upstream()) -> "CruIterableWrapper": - return CruIterableWrapper(CruIterableCreators._concat(*iterables), upstreams) - - -class CruIterableWrapper(Generic[T]): - Upstream = CruIterableUpstream - OptionalUpstream = CruIterableOptionalUpstream - _Result = _Result[T] - _Operation = Callable[[T, int], _Result | Any | None] - - def __init__(self, iterable: Iterable[T], /, upstreams: OptionalUpstream = _default_upstream()) -> None: - self._iterable = iterable - self._upstreams = None if upstreams is None else list(upstreams) - - @property - def me(self) -> Iterable[T]: - return self._iterable - - # TODO: Return Type - @property - def my_upstreams(self) -> Optional["CruIterableWrapper"]: - if self._upstreams is None: - return None - return CruIterableWrapper(iter(self._upstreams)) - - def disable_my_upstreams(self) -> "CruIterableWrapper"[T]: - return CruIterableWrapper(self._iterable, None) - - def clear_my_upstreams(self) -> "CruIterableWrapper"[T]: - return CruIterableWrapper(self._iterable) - - def _create_upstreams_prepend_self(self) -> Upstream: - yield self._iterable - yield self.my_upstreams - - # TODO: Return Type - def _create_new_upstreams(self, append: bool = True) -> Optional["CruIterableWrapper"]: - if not append: return self.my_upstreams - if self.my_upstreams is None: - return None - return CruIterableWrapper(self._create_upstreams_prepend_self()) - - def clone_me(self, /, update_upstreams: bool = True) -> "CruIterableWrapper"[T]: - return CruIterableWrapper(self._iterable, self._create_new_upstreams(update_upstreams)) - - def replace_me_with(self, iterable: Iterable[O], /, update_upstreams: bool = True) -> "CruIterableWrapper"[O]: - return CruIterableCreators.with_(iterable, upstreams=self._create_new_upstreams(update_upstreams)) - - def replace_me_with_empty(self, /, update_upstreams: bool = True) -> "CruIterableWrapper"[O]: - return CruIterableCreators.empty(upstreams=self._create_new_upstreams(update_upstreams)) - - def replace_me_with_range(self, a, b=None, c=None, /, update_upstreams: bool = True) -> "CruIterableWrapper"[int]: - return CruIterableCreators.range(a, b, c, upstreams=self._create_new_upstreams(update_upstreams)) - - def replace_me_with_unite(self, *args: O, update_upstreams: bool = True) -> "CruIterableWrapper"[O]: - return CruIterableCreators.unite(*args, upstreams=self._create_new_upstreams(update_upstreams)) - - def replace_me_with_concat(self, *iterables: Iterable, update_upstreams: bool = True) -> "CruIterableWrapper": - return CruIterableCreators.concat(*iterables, upstreams=self._create_new_upstreams(update_upstreams)) - - @staticmethod - def _non_result_to_yield(value: Any | None) -> _Result: - return _Result.stop(value) - - @staticmethod - def _non_result_to_return(value: Any | None) -> _Result: - return _Result.stop(value) - - def _real_iterate(self, operation: _Operation, - convert_non_result: Callable[[Any | None], _Result]) -> Iterable: - - for index, element in enumerate(self._iterable): - result = operation(element, index) - if not isinstance(result, _Result): - result = convert_non_result(result) - for result in result.flatten(): - if result.action == _Result.Action.STOP: - return result.value - elif result.action == _Result.Action.SEND: - yield result.value - else: - continue - - def _new(self, operation: _Operation) -> "CruIterableWrapper": - return CruIterableWrapper(self._real_iterate(operation, CruIterableWrapper._non_result_to_yield), - self._create_new_upstreams()) - - def _result(self, operation: _Operation, - result_transform: OptionalElementTransformer[T, T | O] = None) -> T | O: - try: - self._real_iterate(operation, CruIterableWrapper._non_result_to_return) - except StopIteration as stop: - return stop.value if result_transform is None else result_transform(stop.value) - - @staticmethod - def _make_set(iterable: Iterable, discard: Iterable | None) -> set: - s = set(iterable) - if discard is not None: - s = s - set(discard) - return s - - @staticmethod - def _make_list(iterable: Iterable, discard: Iterable | None) -> list: - if discard is None: return list(iterable) - return [v for v in iterable if v not in discard] - - # noinspection PyMethodMayBeStatic - def _help_make_set(self, iterable: Iterable, discard: Iterable | None = iter([None])) -> set: - return CruIterableWrapper._make_set(iterable, discard) - - # noinspection PyMethodMayBeStatic - def _help_make_list(self, iterable: Iterable, discard: Iterable | None = iter([None])) -> list: - return CruIterableWrapper._make_list(iterable, discard) - - def to_set(self, discard: Iterable | None = None) -> set[T]: - return CruIterableWrapper._make_set(self.me, discard) - - def to_list(self, discard: Iterable | None = None) -> list[T]: - return CruIterableWrapper._make_list(self.me, discard) - - def copy(self) -> "CruIterableWrapper": - return CruIterableWrapper(iter(self.to_list()), self._create_new_upstreams()) - - def concat(self, *iterable: Iterable[T]) -> "CruIterableWrapper": - return self.replace_me_with_concat(self.me, *iterable) - - def all(self, predicate: ElementPredicate[T]) -> bool: - """ - partial - """ - return self._result(lambda v, _: predicate(v) and None, _Defaults.true) - - def all_isinstance(self, *types: OptionalType) -> bool: - """ - partial - """ - types = self._help_make_set(types) - return self.all(lambda v: type(v) in types) - - def any(self, predicate: ElementPredicate[T]) -> bool: - """ - partial - """ - return self._result(lambda v, _: predicate(v) or None, _Defaults.false) - - def number(self) -> "CruIterableWrapper": - """ - partial - """ - return self._new(lambda _, i: i) - - def take(self, predicate: ElementPredicate[T]) -> "CruIterableWrapper": - """ - complete - """ - return self._new(lambda v, _: _r_send(v) if predicate(v) else None) - - def transform(self, *transformers: OptionalElementTransformer) -> "CruIterableWrapper": - """ - complete - """ - - def _transform_element(element, _): - for transformer in self._help_make_list(transformers): - if transformer is not None: - element = transformer(element) - return _r_send(element) - - return self._new(_transform_element) - - def take_n(self, max_count: int, neg_is_clone: bool = True) -> "CruIterableWrapper": - """ - partial - """ - if max_count < 0: - if neg_is_clone: - return self.clone_me() - else: - raise ValueError("max_count must be 0 or positive.") - elif max_count == 0: - return self.drop_all() - return self._new(lambda v, i: _r_send(v) if i < max_count - 1 else _r_send_last(v)) - - def take_by_indices(self, *indices: OptionalIndex) -> "CruIterableWrapper": - """ - partial - """ - indices = self._help_make_set(indices) - max_index = max(indices) - return self.take_n(max_index + 1)._new(lambda v, i: _r_send(v) if i in indices else None) - - def single_or(self, fallback: Any | None = CRU_NOT_FOUND) -> T | Any | CRU_NOT_FOUND: - """ - partial - """ - first_2 = self.take_n(2) - has_value = False - value = None - for element in first_2.me: - if has_value: - raise ValueError("More than one value found.") - has_value = True - value = element - if has_value: - return value - else: - return fallback - - def first_or(self, predicate: ElementPredicate[T], fallback: Any | None = CRU_NOT_FOUND) -> T | CRU_NOT_FOUND: - """ - partial - """ - result_iterable = self.take_n(1).single_or() - - @staticmethod - def first_index(iterable: Iterable[T], predicate: ElementPredicate[T]) -> int | CRU_NOT_FOUND: - """ - partial - """ - for index, element in enumerate(iterable): - if predicate(element): - return index - - @staticmethod - def take_indices(iterable: Iterable[T], predicate: ElementPredicate[T]) -> Iterable[int]: - """ - complete - """ - for index, element in enumerate(iterable): - if predicate(element): - yield index - - @staticmethod - def flatten(o, max_depth=-1, is_leave: ElementPredicate | None = None, - get_children: OptionalElementTransformer = None) -> Iterable: - """ - complete - """ - if is_leave is None: - is_leave = lambda v: not isinstance(v, Iterable) - if get_children is None: - get_children = lambda v: v - return CruIterableWrapper._flatten_with_func(o, max_depth, is_leave, get_children) - - @staticmethod - def skip_by_indices(iterable: Iterable[T], *indices: OptionalIndex) -> Iterable[T]: - """ - complete - """ - indices = set(indices) - {None} - for index, element in enumerate(iterable): - if index not in indices: - yield element - - @staticmethod - def skip_if(iterable: Iterable[T], predicate: ElementPredicate[T]) -> list[T]: - """ - complete - """ - for element in iterable: - if not predicate(element): - yield element - - def drop_all(self) -> "CruIterableWrapper": - return self.replace_me_with_empty() - - @staticmethod - def remove_all_if(l: Iterable[T], p: ElementPredicate[T]) -> list[T]: - return [v for v in l if not p(v)] - - @staticmethod - def remove_all_value(l: Iterable[T], *r: Any) -> list[T]: - return [v for v in l if v not in r] - - @staticmethod - def replace_all_value(l: Iterable[T], old_value: Any, new_value: R) -> list[T | R]: - return [new_value if v == old_value else v for v in l] - - @staticmethod - def foreach(iterable: Iterable[T], *f: OptionalElementOperation[T]) -> None: - if len(f) == 0: return - for v in iterable: - for f_ in f: - if f_ is not None: - f_(v) - - @staticmethod - def make(v: CanBeList[T], /, none_to_empty_list: bool = True) -> list[T]: - if v is None and none_to_empty_list: return [] - return list(v) if isinstance(v, Iterable) else [v] - - -class ListOperations: - @staticmethod - def all(iterable: Iterable[T], predicate: ElementPredicate[T]) -> bool: - """ - partial - """ - return _God.spy(iterable, lambda v, _: predicate(v) and None, _God.Default.true) - - @staticmethod - def all_isinstance(iterable: Iterable[T], *types: OptionalType) -> bool: - """ - partial - """ - types = _God.help_make_set(types) - return ListOperations.all(iterable, lambda v: type(v) in types) - - @staticmethod - def any(iterable: Iterable[T], predicate: ElementPredicate[T]) -> bool: - """ - partial - """ - return _God.spy(iterable, lambda v, _: predicate(v) or None, _God.Default.false) - - @staticmethod - def indices(iterable: Iterable[T]) -> Iterable[int]: - """ - partial - """ - return _God.new(iterable, lambda _, i: i) - - @staticmethod - def take(iterable: Iterable[T], predicate: ElementPredicate[T]) -> Iterable[T]: - """ - complete - """ - return _God.new(iterable, lambda v, _: _God.yield_(v) if predicate(v) else None) - - @staticmethod - def transform(iterable: Iterable[T], *transformers: OptionalElementTransformer) -> Iterable: - """ - complete - """ - - def _transform_element(element, _): - for transformer in transformers: - if transformer is not None: - element = transformer(element) - return element - - return _God.new(iterable, _transform_element) - - @staticmethod - def take_n(iterable: Iterable[T], n: int) -> Iterable[T]: - """ - partial - """ - if n < 0: - return iterable - elif n == 0: - return [] - return range(n)._god_yield(iterable, lambda v, i: _yield(v) if i < n else _return()) - - @staticmethod - def take_by_indices(iterable: Iterable[T], *indices: OptionalIndex) -> Iterable[T]: - """ - partial - """ - indices = set(indices) - {None} - max_index = max(indices) - iterable = ListOperations.take_n(iterable, max_index + 1) - return _god_yield(iterable, lambda v, i: _yield(v) if i in indices else None) - - @staticmethod - def first(iterable: Iterable[T]) -> T | CRU_NOT_FOUND: - """ - partial - """ - result_iterable = ListOperations.take_n(iterable, 1) - for element in result_iterable: - return element - return CRU_NOT_FOUND - - @staticmethod - def first_index(iterable: Iterable[T], predicate: ElementPredicate[T]) -> int | CRU_NOT_FOUND: - """ - partial - """ - for index, element in enumerate(iterable): - if predicate(element): - return index - - @staticmethod - def take_indices(iterable: Iterable[T], predicate: ElementPredicate[T]) -> Iterable[int]: - """ - complete - """ - for index, element in enumerate(iterable): - if predicate(element): - yield index - - @staticmethod - def _flatten(o, depth: int, max_depth: int) -> Iterable: - if depth == max_depth or not isinstance(o, Iterable): - yield o - return - for v in o: - yield from ListOperations._flatten(v, depth + 1, max_depth) - - @staticmethod - def flatten(o, max_depth=-1) -> Iterable: - """ - complete - """ - return ListOperations._flatten(o, 0, max_depth) - - @staticmethod - def skip_by_indices(iterable: Iterable[T], *indices: OptionalIndex) -> Iterable[T]: - """ - complete - """ - indices = set(indices) - {None} - for index, element in enumerate(iterable): - if index not in indices: - yield element - - @staticmethod - def skip_if(iterable: Iterable[T], predicate: ElementPredicate[T]) -> list[T]: - """ - complete - """ - for element in iterable: - if not predicate(element): - yield element - - @staticmethod - def remove_all_if(l: Iterable[T], p: ElementPredicate[T]) -> list[T]: - return [v for v in l if not p(v)] - - @staticmethod - def remove_all_value(l: Iterable[T], *r: Any) -> list[T]: - return [v for v in l if v not in r] - - @staticmethod - def replace_all_value(l: Iterable[T], old_value: Any, new_value: R) -> list[T | R]: - return [new_value if v == old_value else v for v in l] - - @staticmethod - def foreach(iterable: Iterable[T], *f: OptionalElementOperation[T]) -> None: - if len(f) == 0: return - for v in iterable: - for f_ in f: - if f_ is not None: - f_(v) - - @staticmethod - def make(v: CanBeList[T], /, none_to_empty_list: bool = True) -> list[T]: - if v is None and none_to_empty_list: return [] - return list(v) if isinstance(v, Iterable) else [v] - - -class CruList(list, Generic[T]): - @property - def is_empty(self) -> bool: - return len(self) == 0 - - def sub_by_indices(self, *index: int) -> "CruList"[T]: - return CruList(ListOperations.sub_by_indices(self, *index)) - - def split_by_indices(self, *index: int) -> tuple["CruList"[T], "CruList"[T]]: - l1, l2 = ListOperations.split_by_indices(self, *index) - return CruList(l1), CruList(l2) - - def complement_indices(self, *index: int) -> list[int]: - return ListOperations.complement_indices(len(self), *index) - - def foreach(self, *f: OptionalElementOperation[T]) -> None: - ListOperations.foreach(self, *f) - - def all(self, p: ElementPredicate[T]) -> bool: - return ListOperations.all(self, p) - - def all_is_instance(self, *t: type) -> bool: - return ListOperations.all_isinstance(self, *t) - - def any(self, p: ElementPredicate[T]) -> bool: - return ListOperations.any(self, p) - - def find_all_if(self, p: ElementPredicate[T]) -> "CruList"[T]: - return CruList(ListOperations.take(self, p)) - - def find_if(self, p: ElementPredicate[T]) -> T | CRU_NOT_FOUND: - return ListOperations.first(self, p) - - def find_all_indices_if(self, p: ElementPredicate[T]) -> "CruList"[int]: - return CruList(ListOperations.take_indices(self, p)) - - def find_index_if(self, p: ElementPredicate[T]) -> int | CRU_NOT_FOUND: - return ListOperations.first_index(self, p) - - def split_if(self, p: ElementPredicate[T]) -> tuple["CruList"[T], "CruList"[T]]: - l1, l2 = ListOperations.split_if(self, p) - return CruList(l1), CruList(l2) - - def split_by_types(self, *t: type) -> tuple["CruList"[T], "CruList"[T]]: - l1, l2 = ListOperations.split_by_types(self, *t) - return CruList(l1), CruList(l2) - - def transform(self, *f: OptionalElementTransformer) -> "CruList"[Any]: - return CruList(ListOperations.transform(self, *f)) - - def transform_if(self, f: OptionalElementTransformer, p: ElementPredicate[T]) -> "CruList"[Any]: - return CruList(ListOperations.transform_if(self, f, p)) - - def remove_by_indices(self, *index: int) -> "CruList"[T]: - return CruList(ListOperations.skip_by_indices(self, *index)) - - def remove_if(self, p: ElementPredicate[T]) -> "CruList"[T]: - return CruList(ListOperations.remove_if(self, p)) - - def remove_all_if(self, p: ElementPredicate[T]) -> "CruList"[T]: - return CruList(ListOperations.remove_all_if(self, p)) - - def remove_all_value(self, *r: Any) -> "CruList"[T]: - return CruList(ListOperations.remove_all_value(self, *r)) - - def replace_all_value(self, old_value: Any, new_value: R) -> "CruList"[T | R]: - return CruList(ListOperations.replace_all_value(self, old_value, new_value)) - - @staticmethod - def make(l: CanBeList[T]) -> "CruList"[T]: - return CruList(ListOperations.make(l)) - - -class CruInplaceList(CruList, Generic[T]): - - def clear(self) -> "CruInplaceList[T]": - self.clear() - return self - - def extend(self, *l: Iterable[T]) -> "CruInplaceList[T]": - self.extend(l) - return self - - def reset(self, *l: Iterable[T]) -> "CruInplaceList[T]": - self.clear() - self.extend(l) - return self - - def transform(self, *f: OptionalElementTransformer) -> "CruInplaceList"[Any]: - return self.reset(super().transform(*f)) - - def transform_if(self, f: OptionalElementTransformer, p: ElementPredicate[T]) -> "CruInplaceList"[Any]: - return self.reset(super().transform_if(f, p)) - - def remove_by_indices(self, *index: int) -> "CruInplaceList"[T]: - return self.reset(super().remove_by_indices(*index)) - - def remove_all_if(self, p: ElementPredicate[T]) -> "CruInplaceList"[T]: - return self.reset(super().remove_all_if(p)) - - def remove_all_value(self, *r: Any) -> "CruInplaceList"[T]: - return self.reset(super().remove_all_value(*r)) - - def replace_all_value(self, old_value: Any, new_value: R) -> "CruInplaceList"[T | R]: - return self.reset(super().replace_all_value(old_value, new_value)) - - @staticmethod - def make(l: CanBeList[T]) -> "CruInplaceList"[T]: - return CruInplaceList(ListOperations.make(l)) - - -K = TypeVar("K") - - -class CruUniqueKeyInplaceList(Generic[T, K]): - KeyGetter = Callable[[T], K] - - def __init__(self, get_key: KeyGetter, *, before_add: Callable[[T], T] | None = None): - super().__init__() - self._get_key = get_key - self._before_add = before_add - self._l: CruInplaceList[T] = CruInplaceList() - - @property - def object_key_getter(self) -> KeyGetter: - return self._get_key - - @property - def internal_list(self) -> CruInplaceList[T]: - return self._l - - def validate_self(self): - keys = self._l.transform(self._get_key) - if len(keys) != len(set(keys)): - raise ValueError("Duplicate keys!") - - def get_or(self, k: K, fallback: Any = CRU_NOT_FOUND) -> T | Any: - r = self._l.find_if(lambda i: k == self._get_key(i)) - return r if r is not CRU_NOT_FOUND else fallback - - def get(self, k: K) -> T: - v = self.get_or(k, CRU_NOT_FOUND) - if v is CRU_NOT_FOUND: - raise KeyError(f"Key not found!") - return v - - def has_key(self, k: K) -> bool: - return self.get_or(k, CRU_NOT_FOUND) is not CRU_NOT_FOUND - - def has_any_key(self, *k: K) -> bool: - return self._l.any(lambda i: self._get_key(i) in k) - - def try_remove(self, k: K) -> bool: - i = self._l.find_index_if(lambda v: k == self._get_key(v)) - if i is CRU_NOT_FOUND: return False - self._l.remove_by_indices(i) - return True - - def remove(self, k: K, allow_absense: bool = False) -> None: - if not self.try_remove(k) and not allow_absense: - raise KeyError(f"Key {k} not found!") - - def add(self, v: T, /, replace: bool = False) -> None: - if self.has_key(self._get_key(v)): - if replace: - self.remove(self._get_key(v)) - else: - raise ValueError(f"Key {self._get_key(v)} already exists!") - if self._before_add is not None: - v = self._before_add(v) - self._l.append(v) - - def set(self, v: T) -> None: - self.add(v, True) - - def extend(self, l: Iterable[T], /, replace: bool = False) -> None: - if not replace and self.has_any_key([self._get_key(i) for i in l]): - raise ValueError("Keys already exists!") - if self._before_add is not None: - l = [self._before_add(i) for i in l] - keys = [self._get_key(i) for i in l] - self._l.remove_all_if(lambda i: self._get_key(i) in keys).extend(l) - - def clear(self) -> None: - self._l.clear() - - def __iter__(self): - return iter(self._l) - - def __len__(self): - return len(self._l) diff --git a/tools/cru-py/cru/util/_type.py b/tools/cru-py/cru/util/_type.py deleted file mode 100644 index dc50def..0000000 --- a/tools/cru-py/cru/util/_type.py +++ /dev/null @@ -1,42 +0,0 @@ -from types import NoneType -from typing import Any - -from ._list import CanBeList, CruList - -DEFAULT_NONE_ERR = ValueError -DEFAULT_NONE_ERR_MSG = "None is not allowed here." -DEFAULT_TYPE_ERR = ValueError -DEFAULT_TYPE_ERR_MSG = "Type of object is not allowed here." - - -class TypeSet(set[type]): - def __init__(self, *l: type): - l = CruList.make(l).remove_all_value(None, NoneType) - if not l.all_is_instance(type): - raise TypeError("t must be a type or None.") - super().__init__(l) - - def check_value(self, v: Any, /, allow_none: bool, empty_allow_all: bool = True, *, - none_err: type[Exception] = DEFAULT_NONE_ERR, - none_err_msg: str = DEFAULT_NONE_ERR_MSG, - type_err: type[Exception] = DEFAULT_TYPE_ERR, - type_err_msg: str = DEFAULT_TYPE_ERR_MSG) -> None: - if v is None: - if allow_none: - return - else: - raise none_err(none_err_msg) - if len(self) == 0 and empty_allow_all: - return - if type(v) not in self: - raise type_err(type_err_msg) - - def check_value_list(self, l: CanBeList, /, allow_none: bool, empty_allow_all: bool = True, *, - none_err: type[Exception] = DEFAULT_NONE_ERR, - none_err_msg: str = DEFAULT_NONE_ERR_MSG, - type_err: type[Exception] = DEFAULT_TYPE_ERR, - type_err_msg: str = DEFAULT_TYPE_ERR_MSG) -> None: - l = CruList.make(l) - for v in l: - self.check_value(v, allow_none, empty_allow_all, none_err=none_err, none_err_msg=none_err_msg, - type_err=type_err, type_err_msg=type_err_msg) -- cgit v1.2.3