diff options
Diffstat (limited to 'tools/cru-py')
-rw-r--r-- | tools/cru-py/cru/_const.py (renamed from tools/cru-py/cru/_util/_const.py) | 0 | ||||
-rw-r--r-- | tools/cru-py/cru/_cru.py (renamed from tools/cru-py/cru/_util/_cru.py) | 0 | ||||
-rw-r--r-- | tools/cru-py/cru/_event.py (renamed from tools/cru-py/cru/_util/_event.py) | 2 | ||||
-rw-r--r-- | tools/cru-py/cru/_func.py (renamed from tools/cru-py/cru/_util/_func.py) | 136 | ||||
-rw-r--r-- | tools/cru-py/cru/_iter.py | 506 | ||||
-rw-r--r-- | tools/cru-py/cru/_lang.py (renamed from tools/cru-py/cru/_util/_lang.py) | 0 | ||||
-rw-r--r-- | tools/cru-py/cru/_list.py | 128 | ||||
-rw-r--r-- | tools/cru-py/cru/_meta.py | 92 | ||||
-rw-r--r-- | tools/cru-py/cru/_type.py (renamed from tools/cru-py/cru/_util/_type.py) | 2 | ||||
-rw-r--r-- | tools/cru-py/cru/_util/__init__.py | 63 | ||||
-rw-r--r-- | tools/cru-py/cru/_util/_list.py | 915 |
11 files changed, 794 insertions, 1050 deletions
diff --git a/tools/cru-py/cru/_util/_const.py b/tools/cru-py/cru/_const.py index bc02c3a..bc02c3a 100644 --- a/tools/cru-py/cru/_util/_const.py +++ b/tools/cru-py/cru/_const.py diff --git a/tools/cru-py/cru/_util/_cru.py b/tools/cru-py/cru/_cru.py index 0085a80..0085a80 100644 --- a/tools/cru-py/cru/_util/_cru.py +++ b/tools/cru-py/cru/_cru.py diff --git a/tools/cru-py/cru/_util/_event.py b/tools/cru-py/cru/_event.py index 813e33f..ee914f0 100644 --- a/tools/cru-py/cru/_util/_event.py +++ b/tools/cru-py/cru/_event.py @@ -1,6 +1,6 @@ from typing import ParamSpec, TypeVar, Callable -from ._list import CruInplaceList, CruList +from ._iter import CruInplaceList, CruList P = ParamSpec('P') R = TypeVar('R') diff --git a/tools/cru-py/cru/_util/_func.py b/tools/cru-py/cru/_func.py index 0b8b07a..ef3da72 100644 --- a/tools/cru-py/cru/_util/_func.py +++ b/tools/cru-py/cru/_func.py @@ -20,54 +20,47 @@ _P = ParamSpec("_P") _T = TypeVar("_T") -class CruRawFunctions: - @staticmethod - def none(*_v, **_kwargs) -> None: - return None - - @staticmethod - def true(*_v, **_kwargs) -> Literal[True]: - return True - - @staticmethod - def false(*_v, **_kwargs) -> Literal[False]: - return False - - @staticmethod - def identity(v: _T) -> _T: - return v +_ArgsChainableCallable: TypeAlias = Callable[..., Iterable[Any]] +_KwargsChainableCallable: TypeAlias = Callable[..., Iterable[tuple[str, Any]]] +_ChainableCallable: TypeAlias = Callable[ + ..., tuple[Iterable[Any], Iterable[tuple[str, Any]]] +] - @staticmethod - def only_you(v: _T, *_v, **_kwargs) -> _T: - return v - @staticmethod - def equal(a: Any, b: Any) -> bool: - return a == b +class CruFunctionMeta: + class Base: + @staticmethod + def none(*_v, **_kwargs) -> None: + return None - @staticmethod - def not_equal(a: Any, b: Any) -> bool: - return a != b + @staticmethod + def true(*_v, **_kwargs) -> Literal[True]: + return True - @staticmethod - def not_(v: Any) -> Any: - return not v + @staticmethod + def false(*_v, **_kwargs) -> Literal[False]: + return False + @staticmethod + def identity(v: _T) -> _T: + return v -CruArgsChainableCallable: TypeAlias = Callable[..., Iterable[Any]] -CruKwargsChainableCallable: TypeAlias = Callable[..., Iterable[tuple[str, Any]]] -CruChainableCallable: TypeAlias = Callable[ - ..., tuple[Iterable[Any], Iterable[tuple[str, Any]]] -] + @staticmethod + def only_you(v: _T, *_v, **_kwargs) -> _T: + return v + @staticmethod + def equal(a: Any, b: Any) -> bool: + return a == b -class CruFunctionChainMode(Flag): - ARGS = auto() - KWARGS = auto() - BOTH = ARGS | KWARGS + @staticmethod + def not_equal(a: Any, b: Any) -> bool: + return a != b + @staticmethod + def not_(v: Any) -> Any: + return not v -class CruFunctionMeta: @staticmethod def bind(func: Callable[..., _T], *bind_args, **bind_kwargs) -> Callable[..., _T]: def bound_func(*args, **kwargs): @@ -84,10 +77,19 @@ class CruFunctionMeta: return bound_func + class ChainMode(Flag): + ARGS = auto() + KWARGS = auto() + BOTH = ARGS | KWARGS + + ArgsChainableCallable = _ArgsChainableCallable + KwargsChainableCallable = _KwargsChainableCallable + ChainableCallable = _ChainableCallable + @staticmethod def chain_with_args( - funcs: Iterable[CruArgsChainableCallable], *bind_args, **bind_kwargs - ) -> CruArgsChainableCallable: + funcs: Iterable[_ArgsChainableCallable], *bind_args, **bind_kwargs + ) -> _ArgsChainableCallable: def chained_func(*args): for func in funcs: args = CruFunctionMeta.bind(func, *bind_args, **bind_kwargs)(*args) @@ -97,8 +99,8 @@ class CruFunctionMeta: @staticmethod def chain_with_kwargs( - funcs: Iterable[CruKwargsChainableCallable], *bind_args, **bind_kwargs - ) -> CruKwargsChainableCallable: + funcs: Iterable[_KwargsChainableCallable], *bind_args, **bind_kwargs + ) -> _KwargsChainableCallable: def chained_func(**kwargs): for func in funcs: kwargs = CruFunctionMeta.bind(func, *bind_args, **bind_kwargs)(**kwargs) @@ -108,8 +110,8 @@ class CruFunctionMeta: @staticmethod def chain_with_both( - funcs: Iterable[CruChainableCallable], *bind_args, **bind_kwargs - ) -> CruChainableCallable: + funcs: Iterable[_ChainableCallable], *bind_args, **bind_kwargs + ) -> _ChainableCallable: def chained_func(*args, **kwargs): for func in funcs: args, kwargs = CruFunctionMeta.bind(func, *bind_args, **bind_kwargs)( @@ -121,48 +123,42 @@ class CruFunctionMeta: @staticmethod def chain( - mode: CruFunctionChainMode, + mode: ChainMode, funcs: Iterable[ - CruArgsChainableCallable | CruKwargsChainableCallable | CruChainableCallable + _ArgsChainableCallable | _KwargsChainableCallable | _ChainableCallable ], *bind_args, **bind_kwargs, - ) -> CruArgsChainableCallable | CruKwargsChainableCallable | CruChainableCallable: - if mode == CruFunctionChainMode.ARGS: + ) -> _ArgsChainableCallable | _KwargsChainableCallable | _ChainableCallable: + if mode == CruFunctionMeta.ChainMode.ARGS: return CruFunctionMeta.chain_with_args( - cast(Iterable[CruArgsChainableCallable], funcs), + cast(Iterable[_ArgsChainableCallable], funcs), *bind_args, **bind_kwargs, ) - elif mode == CruFunctionChainMode.KWARGS: + elif mode == CruFunctionMeta.ChainMode.KWARGS: return CruFunctionMeta.chain_with_kwargs( - cast(Iterable[CruKwargsChainableCallable], funcs), + cast(Iterable[_KwargsChainableCallable], funcs), *bind_args, **bind_kwargs, ) - elif mode == CruFunctionChainMode.BOTH: + elif mode == CruFunctionMeta.ChainMode.BOTH: return CruFunctionMeta.chain_with_both( - cast(Iterable[CruChainableCallable], funcs), *bind_args, **bind_kwargs + cast(Iterable[_ChainableCallable], funcs), *bind_args, **bind_kwargs ) -# Advanced Function Wrapper class CruFunction(Generic[_P, _T]): def __init__(self, f: Callable[_P, _T]): self._f = f @property - def f(self) -> Callable[_P, _T]: + def me(self) -> Callable[_P, _T]: return self._f - @property - def func(self) -> Callable[_P, _T]: - return self.f - def bind(self, *bind_args, **bind_kwargs) -> CruFunction[..., _T]: - self._f = CruFunctionMeta.bind(self._f, *bind_args, **bind_kwargs) - return self + return CruFunction(CruFunctionMeta.bind(self._f, *bind_args, **bind_kwargs)) def _iter_with_self( self, funcs: Iterable[Callable[..., Any]] @@ -173,10 +169,10 @@ class CruFunction(Generic[_P, _T]): @staticmethod def chain_with_args( self, - funcs: Iterable[CruArgsChainableCallable], + funcs: Iterable[_ArgsChainableCallable], *bind_args, **bind_kwargs, - ) -> CruArgsChainableCallable: + ) -> _ArgsChainableCallable: return CruFunction( CruFunctionMeta.chain_with_args( self._iter_with_self(funcs), *bind_args, **bind_kwargs @@ -184,8 +180,8 @@ class CruFunction(Generic[_P, _T]): ) def chain_with_kwargs( - self, funcs: Iterable[CruKwargsChainableCallable], *bind_args, **bind_kwargs - ) -> CruKwargsChainableCallable: + self, funcs: Iterable[_KwargsChainableCallable], *bind_args, **bind_kwargs + ) -> _KwargsChainableCallable: return CruFunction( CruFunctionMeta.chain_with_kwargs( self._iter_with_self(funcs), *bind_args, **bind_kwargs @@ -193,8 +189,8 @@ class CruFunction(Generic[_P, _T]): ) def chain_with_both( - self, funcs: Iterable[CruChainableCallable], *bind_args, **bind_kwargs - ) -> CruChainableCallable: + self, funcs: Iterable[_ChainableCallable], *bind_args, **bind_kwargs + ) -> _ChainableCallable: return CruFunction( CruFunctionMeta.chain_with_both( self._iter_with_self(funcs), *bind_args, **bind_kwargs @@ -205,11 +201,11 @@ class CruFunction(Generic[_P, _T]): self, mode: CruFunctionChainMode, funcs: Iterable[ - CruArgsChainableCallable | CruKwargsChainableCallable | CruChainableCallable + _ArgsChainableCallable | _KwargsChainableCallable | _ChainableCallable ], *bind_args, **bind_kwargs, - ) -> CruArgsChainableCallable | CruKwargsChainableCallable | CruChainableCallable: + ) -> _ArgsChainableCallable | _KwargsChainableCallable | _ChainableCallable: return CruFunction( CruFunctionMeta.chain( mode, self._iter_with_self(funcs), *bind_args, **bind_kwargs @@ -223,7 +219,7 @@ class CruFunction(Generic[_P, _T]): def make_chain( mode: CruFunctionChainMode, funcs: Iterable[ - CruArgsChainableCallable | CruKwargsChainableCallable | CruChainableCallable + _ArgsChainableCallable | _KwargsChainableCallable | _ChainableCallable ], *bind_args, **bind_kwargs, diff --git a/tools/cru-py/cru/_iter.py b/tools/cru-py/cru/_iter.py new file mode 100644 index 0000000..da849e8 --- /dev/null +++ b/tools/cru-py/cru/_iter.py @@ -0,0 +1,506 @@ +from __future__ import annotations + +from collections.abc import Iterable, Callable +from dataclasses import dataclass +from enum import Enum +from typing import ( + Concatenate, + Generator, + Iterator, + Literal, + Self, + TypeAlias, + TypeVar, + ParamSpec, + Any, + Generic, + cast, + overload, +) + +from ._cru import CRU +from ._meta import CruDecCreators +from ._const import CruNotFound + +_P = ParamSpec("_P") +_T = TypeVar("_T") +_O = TypeVar("_O") +_V = TypeVar("_V") +_R = TypeVar("_R") + +CanBeList: TypeAlias = Iterable[_T] | _T | None + +ElementOperation: TypeAlias = Callable[[_T], Any] +ElementPredicate: TypeAlias = Callable[[_T], bool] +AnyElementPredicate: TypeAlias = ElementPredicate[Any] +ElementTransformer: TypeAlias = Callable[[_T], _O] +SelfElementTransformer: TypeAlias = ElementTransformer[_T, _T] +AnyElementTransformer: TypeAlias = ElementTransformer[Any, Any] + + +class _StepActionKind(Enum): + SKIP = 0 + PUSH = 1 + STOP = 2 + AGGREGATE = 3 + + +@dataclass +class _StepAction(Generic[_V, _R]): + value: Iterable[Self] | _V | _R | None + kind: _StepActionKind + + @property + def push_value(self) -> _V: + assert self.kind == _StepActionKind.PUSH + return cast(_V, self.value) + + @property + def stop_value(self) -> _R: + assert self.kind == _StepActionKind.STOP + return cast(_R, self.value) + + @staticmethod + def skip() -> _StepAction[_V, _R]: + return _StepAction(None, _StepActionKind.SKIP) + + @staticmethod + def push(value: _V | None) -> _StepAction[_V, _R]: + return _StepAction(value, _StepActionKind.PUSH) + + @staticmethod + def stop(value: _R | None = None) -> _StepAction[_V, _R]: + return _StepAction(value, _StepActionKind.STOP) + + @staticmethod + def aggregate(*results: _StepAction[_V, _R]) -> _StepAction[_V, _R]: + return _StepAction(results, _StepActionKind.AGGREGATE) + + @staticmethod + def push_last(value: _V | None) -> _StepAction[_V, _R]: + return _StepAction.aggregate(_StepAction.push(value), _StepAction.stop()) + + def flatten(self) -> Iterable[Self]: + return CruIterableMeta.flatten( + self, + is_leave=lambda r: r.kind != _StepActionKind.AGGREGATE, + get_children=lambda r: cast(Iterable[Self], r.value), + ) + + +_GeneralStepAction: TypeAlias = _StepAction[_V, _R] | _V | _R | None +_IterateOperation = Callable[[_T, int], _GeneralStepAction[_V, _R]] +_IteratePreHook = Callable[[Iterable[_T]], _GeneralStepAction[_V, _R]] +_IteratePostHook = Callable[[int], _GeneralStepAction[_V, _R]] + + +class CruIterableMeta: + class Generic: + StepActionKind = _StepActionKind + StepAction = _StepAction + GeneralStepAction = _GeneralStepAction + IterateOperation = _IterateOperation + IteratePreHook = _IteratePreHook + IteratePostHook = _IteratePostHook + + class Results: + @staticmethod + def true(_) -> Literal[True]: + return True + + @staticmethod + def false(_) -> Literal[False]: + return False + + @staticmethod + def not_found(_) -> Literal[CruNotFound.VALUE]: + return CruNotFound.VALUE + + @staticmethod + def _non_result_to_push(value: Any) -> _StepAction[_V, _R]: + return _StepAction.push(value) + + @staticmethod + def _non_result_to_stop(value: Any) -> _StepAction[_V, _R]: + return _StepAction.stop(value) + + @staticmethod + def _none_hook(_: Any) -> _StepAction[_V, _R]: + return _StepAction.skip() + + def iterate( + iterable: Iterable[_T], + operation: _IterateOperation[_T, _V, _R], + fallback_return: _R, + pre_iterate: _IteratePreHook[_T, _V, _R], + post_iterate: _IteratePostHook[_V, _R], + convert_value_result: Callable[[_V | _R | None], _StepAction[_V, _R]], + ) -> Generator[_V, None, _R]: + pre_result = pre_iterate(iterable) + if not isinstance(pre_result, _StepAction): + real_pre_result = convert_value_result(pre_result) + for r in real_pre_result.flatten(): + if r.kind == _StepActionKind.STOP: + return r.stop_value + elif r.kind == _StepActionKind.PUSH: + yield r.push_value + else: + assert r.kind == _StepActionKind.SKIP + + for index, element in enumerate(iterable): + result = operation(element, index) + if not isinstance(result, _StepAction): + real_result = convert_value_result(result) + for r in real_result.flatten(): + if r.kind == _StepActionKind.STOP: + return r.stop_value + elif r.kind == _StepActionKind.PUSH: + yield r.push_value + else: + assert r.kind == _StepActionKind.SKIP + continue + + post_result = post_iterate(index + 1) + if not isinstance(post_result, _StepAction): + real_post_result = convert_value_result(post_result) + for r in real_post_result.flatten(): + if r.kind == _StepActionKind.STOP: + return r.stop_value + elif r.kind == _StepActionKind.PUSH: + yield r.push_value + else: + assert r.kind == _StepActionKind.SKIP + + return fallback_return + + def create_new( + iterable: Iterable[_T], + operation: _IterateOperation[_T, _V, _R], + fallback_return: _R, + /, + pre_iterate: _IteratePreHook[_T, _V, _R] | None = None, + post_iterate: _IteratePostHook[_V, _R] | None = None, + ) -> Generator[_V, None, _R]: + return CruIterableMeta.Generic.iterate( + iterable, + operation, + fallback_return, + pre_iterate or CruIterableMeta.Generic._none_hook, + post_iterate or CruIterableMeta.Generic._none_hook, + CruIterableMeta.Generic._non_result_to_push, + ) + + def get_result( + iterable: Iterable[_T], + operation: _IterateOperation[_T, _V, _R], + fallback_return: _R, + /, + pre_iterate: _IteratePreHook[_T, _V, _R] | None = None, + post_iterate: _IteratePostHook[_V, _R] | None = None, + ) -> _R: + try: + for _ in CruIterableMeta.Generic.iterate( + iterable, + operation, + fallback_return, + pre_iterate or CruIterableMeta.Generic._none_hook, + post_iterate or CruIterableMeta.Generic._none_hook, + CruIterableMeta.Generic._non_result_to_stop, + ): + pass + except StopIteration as stop: + return stop.value + raise RuntimeError("Should not reach here") + + class Creators: + @staticmethod + def empty() -> Iterable[_T]: + return iter([]) + + @staticmethod + def range(*args) -> Iterable[int]: + return iter(range(*args)) + + @staticmethod + def unite(*args: _O) -> Iterable[_O]: + return iter(args) + + @staticmethod + def concat(*iterables: Iterable[_T]) -> Iterable[_T]: + for iterable in iterables: + yield from iterable + + @staticmethod + def with_count(c: Callable[Concatenate[int, _P], _O]) -> Callable[_P, _O]: + count = 0 + + def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _O: + nonlocal count + r = c(count, *args, **kwargs) + count += 1 + return r + + return wrapper + + @staticmethod + def to_set(iterable: Iterable[_T], discard: Iterable[Any]) -> set[_T]: + return set(iterable) - set(discard) + + @staticmethod + def to_list(iterable: Iterable[_T], discard: Iterable[Any]) -> list[_T]: + return [v for v in iterable if v not in set(discard)] + + @staticmethod + def all(iterable: Iterable[_T], predicate: ElementPredicate[_T]) -> bool: + for value in iterable: + if not predicate(value): + return False + return True + + @staticmethod + def any(iterable: Iterable[_T], predicate: ElementPredicate[_T]) -> bool: + for value in iterable: + if predicate(value): + return True + return False + + @staticmethod + def foreach(iterable: Iterable[_T], operation: ElementOperation[_T]) -> None: + for value in iterable: + operation(value) + + @staticmethod + def transform( + iterable: Iterable[_T], transformer: ElementTransformer[_T, _O] + ) -> Iterable[_O]: + for value in iterable: + yield transformer(value) + + @staticmethod + def filter(iterable: Iterable[_T], predicate: ElementPredicate[_T]) -> Iterable[_T]: + for value in iterable: + if predicate(value): + yield value + + @staticmethod + def continue_if( + iterable: Iterable[_T], predicate: ElementPredicate[_T] + ) -> Iterable[_T]: + for value in iterable: + yield value + if not predicate(value): + break + + @staticmethod + def first_n(iterable: Iterable[_T], max_count: int) -> Iterable[_T]: + if max_count < 0: + raise ValueError("max_count must be 0 or positive.") + if max_count == 0: + return CruIterableMeta.Creators.empty() + return CruIterableMeta.continue_if( + iterable, _with_count(lambda i, _: i < max_count - 1) + ) + + @staticmethod + def drop_n(iterable: Iterable[_T], n: int) -> Iterable[_T]: + if n < 0: + raise ValueError("n must be 0 or positive.") + if n == 0: + return iterable + return CruIterableMeta.filter(iterable, _with_count(lambda i, _: i < n)) + + @staticmethod + def single_or( + iterable: Iterable[_T], fallback: _O | CruNotFound | None = CruNotFound.VALUE + ) -> _T | _O | Any | CruNotFound: + first_2 = CruIterableMeta.first_n(iterable, 2) + has_value = False + value = None + for element in first_2: + if has_value: + raise ValueError("More than one value found.") + has_value = True + value = element + if has_value: + return value + else: + return fallback + + @staticmethod + def _is_not_iterable(o: Any) -> bool: + return not isinstance(o, Iterable) + + @staticmethod + def _return_self(o): + return o + + @staticmethod + @overload + def flatten(o: Iterable[_T], max_depth: int = -1) -> Iterable[_T]: ... + + @staticmethod + @overload + def flatten( + o: _O, + max_depth: int = -1, + *, + is_leave: ElementPredicate[_O], + get_children: ElementTransformer[_O, Iterable[_O]], + ) -> Iterable[_O]: ... + + @staticmethod + def flatten( + o: _O, + max_depth: int = -1, + *, + is_leave: ElementPredicate[_O] = _is_not_iterable, + get_children: ElementTransformer[_O, Iterable[_O]] = _return_self, + _depth: int = 0, + ) -> Iterable[Any]: + if _depth == max_depth or is_leave(o): + yield o + return + for child in get_children(o): + yield from CruIterableMeta.flatten( + child, + max_depth, + is_leave=is_leave, + get_children=get_children, + _depth=_depth + 1, + ) # type: ignore + + +_with_count = CruIterableMeta.with_count + + +class CruIterableWrapper(Generic[_T]): + Meta = CruIterableMeta + + class Dec: + @staticmethod + def _wrap(iterable: Iterable[_O]) -> CruIterableWrapper[_O]: + return CruIterableWrapper(iterable) + + wrap = CruDecCreators.convert_result(_wrap) + + @staticmethod + def _as_iterable(_self: CruIterableWrapper[_O]) -> CruIterableWrapper[_O]: + return _self + + meta = CruDecCreators.create_implement_by(_as_iterable) + meta_no_self = CruDecCreators.create_implement_by_no_self() + + def __init__( + self, + iterable: Iterable[_T], + ) -> None: + self._iterable = iterable + + def __iter__(self) -> Iterator[_T]: + return self._iterable.__iter__() + + @property + def me(self) -> Iterable[_T]: + return self._iterable + + def replace_me(self, iterable: Iterable[_O]) -> CruIterableWrapper[_O]: + return CruIterableWrapper(iterable) + + @Dec.wrap + @Dec.meta_no_self(CruIterableMeta.Creators.empty) + def replace_me_with_empty(self) -> None: + pass + + @Dec.wrap + @Dec.meta_no_self(CruIterableMeta.Creators.range) + def replace_me_with_range(self) -> None: + pass + + @Dec.wrap + @Dec.meta_no_self(CruIterableMeta.Creators.unite) + def replace_me_with_unite(self) -> None: + pass + + @Dec.wrap + @Dec.meta_no_self(CruIterableMeta.Creators.concat) + def replace_me_with_concat(self) -> None: + pass + + @Dec.meta(CruIterableMeta.to_set) + def to_set(self) -> None: + pass + + @Dec.meta(CruIterableMeta.to_list) + def to_list(self) -> None: + pass + + @Dec.meta(CruIterableMeta.all) + def all(self) -> None: + pass + + @Dec.meta(CruIterableMeta.any) + def any(self) -> None: + pass + + @Dec.meta(CruIterableMeta.foreach) + def foreach(self) -> None: + pass + + @Dec.wrap + @Dec.meta(CruIterableMeta.transform) + def transform(self) -> None: + pass + + map = transform + + @Dec.wrap + @Dec.meta(CruIterableMeta.filter) + def filter(self) -> None: + pass + + @Dec.wrap + @Dec.meta(CruIterableMeta.continue_if) + def continue_if(self) -> None: + pass + + @Dec.wrap + @Dec.meta(CruIterableMeta.first_n) + def first_n(self) -> None: + pass + + @Dec.wrap + @Dec.meta(CruIterableMeta.drop_n) + def drop_n(self) -> None: + pass + + @Dec.wrap + @Dec.meta(CruIterableMeta.flatten) + def flatten(self) -> None: + pass + + @Dec.meta(CruIterableMeta.single_or) + def single_or(self) -> None: + pass + + @Dec.wrap + def select_by_indices(self, indices: Iterable[int]) -> Iterable[_T]: + index_set = set(indices) + max_index = max(index_set) + return self.first_n(max_index + 1).filter( + _with_count(lambda i, _: i in index_set) + ) + + @Dec.wrap + def remove_values(self, values: Iterable[Any]) -> Iterable[_V]: + value_set = set(values) + return self.filter(lambda v: v not in value_set) + + @Dec.wrap + def replace_values( + self, old_values: Iterable[Any], new_value: _O + ) -> Iterable[_V | _O]: + value_set = set(old_values) + return self.map(lambda v: new_value if v in value_set else v) + + +CRU.add_objects(CruIterableMeta, CruIterableWrapper) diff --git a/tools/cru-py/cru/_util/_lang.py b/tools/cru-py/cru/_lang.py index 925ba00..925ba00 100644 --- a/tools/cru-py/cru/_util/_lang.py +++ b/tools/cru-py/cru/_lang.py diff --git a/tools/cru-py/cru/_list.py b/tools/cru-py/cru/_list.py new file mode 100644 index 0000000..f1965db --- /dev/null +++ b/tools/cru-py/cru/_list.py @@ -0,0 +1,128 @@ + + +class CruInplaceList(CruList, Generic[_V]): + + def clear(self) -> "CruInplaceList[_V]": + self.clear() + return self + + def extend(self, *l: Iterable[_V]) -> "CruInplaceList[_V]": + self.extend(l) + return self + + def reset(self, *l: Iterable[_V]) -> "CruInplaceList[_V]": + self.clear() + self.extend(l) + return self + + def transform(self, *f: OptionalElementTransformer) -> "CruInplaceList"[Any]: + return self.reset(super().transform(*f)) + + def transform_if( + self, f: OptionalElementTransformer, p: ElementPredicate[_V] + ) -> "CruInplaceList"[Any]: + return self.reset(super().transform_if(f, p)) + + def remove_by_indices(self, *index: int) -> "CruInplaceList"[_V]: + return self.reset(super().remove_by_indices(*index)) + + def remove_all_if(self, p: ElementPredicate[_V]) -> "CruInplaceList"[_V]: + return self.reset(super().remove_all_if(p)) + + def remove_all_value(self, *r: Any) -> "CruInplaceList"[_V]: + return self.reset(super().remove_all_value(*r)) + + def replace_all_value( + self, old_value: Any, new_value: R + ) -> "CruInplaceList"[_V | R]: + return self.reset(super().replace_all_value(old_value, new_value)) + + @staticmethod + def make(l: CanBeList[_V]) -> "CruInplaceList"[_V]: + return CruInplaceList(ListOperations.make(l)) + + + +K = TypeVar("K") + + +class CruUniqueKeyInplaceList(Generic[_V, K]): + KeyGetter = Callable[[_V], K] + + def __init__( + self, get_key: KeyGetter, *, before_add: Callable[[_V], _V] | None = None + ): + super().__init__() + self._get_key = get_key + self._before_add = before_add + self._l: CruInplaceList[_V] = CruInplaceList() + + @property + def object_key_getter(self) -> KeyGetter: + return self._get_key + + @property + def internal_list(self) -> CruInplaceList[_V]: + return self._l + + def validate_self(self): + keys = self._l.transform(self._get_key) + if len(keys) != len(set(keys)): + raise ValueError("Duplicate keys!") + + def get_or(self, k: K, fallback: Any = CRU_NOT_FOUND) -> _V | Any: + r = self._l.find_if(lambda i: k == self._get_key(i)) + return r if r is not CRU_NOT_FOUND else fallback + + def get(self, k: K) -> _V: + v = self.get_or(k, CRU_NOT_FOUND) + if v is CRU_NOT_FOUND: + raise KeyError(f"Key not found!") + return v + + def has_key(self, k: K) -> bool: + return self.get_or(k, CRU_NOT_FOUND) is not CRU_NOT_FOUND + + def has_any_key(self, *k: K) -> bool: + return self._l.any(lambda i: self._get_key(i) in k) + + def try_remove(self, k: K) -> bool: + i = self._l.find_index_if(lambda v: k == self._get_key(v)) + if i is CRU_NOT_FOUND: + return False + self._l.remove_by_indices(i) + return True + + def remove(self, k: K, allow_absense: bool = False) -> None: + if not self.try_remove(k) and not allow_absense: + raise KeyError(f"Key {k} not found!") + + def add(self, v: _V, /, replace: bool = False) -> None: + if self.has_key(self._get_key(v)): + if replace: + self.remove(self._get_key(v)) + else: + raise ValueError(f"Key {self._get_key(v)} already exists!") + if self._before_add is not None: + v = self._before_add(v) + self._l.append(v) + + def set(self, v: _V) -> None: + self.add(v, True) + + def extend(self, l: Iterable[_V], /, replace: bool = False) -> None: + if not replace and self.has_any_key([self._get_key(i) for i in l]): + raise ValueError("Keys already exists!") + if self._before_add is not None: + l = [self._before_add(i) for i in l] + keys = [self._get_key(i) for i in l] + self._l.remove_all_if(lambda i: self._get_key(i) in keys).extend(l) + + def clear(self) -> None: + self._l.clear() + + def __iter__(self): + return iter(self._l) + + def __len__(self): + return len(self._l) diff --git a/tools/cru-py/cru/_meta.py b/tools/cru-py/cru/_meta.py new file mode 100644 index 0000000..86763cf --- /dev/null +++ b/tools/cru-py/cru/_meta.py @@ -0,0 +1,92 @@ +from collections.abc import Callable +from typing import Concatenate, Generic, ParamSpec, TypeVar + +_P = ParamSpec("_P") +_T = TypeVar("_T") +_O = TypeVar("_O") +_R = TypeVar("_R") + + +class CruDecorator: + + class ImplementedBy(Generic[_O, _P, _R]): + # TODO: Continue here tomorrow! + def __init__( + self, impl: Callable[Concatenate[_O, _P], _R], pass_self: bool + ) -> None: + self.impl = impl + self.pass_self = pass_self + + def __call__( + self, _origin: Callable[[_T], None] + ) -> Callable[Concatenate[_T, _P], _R]: + def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R: + if self.pass_self: + return self.impl(_self, *args, **kwargs) + else: + return self.impl(*args, **kwargs) + + return real_impl + + @staticmethod + def convert_result( + converter: Callable[[_T], _O] + ) -> Callable[[Callable[_P, _T]], Callable[_P, _O]]: + def dec(original: Callable[_P, _T]) -> Callable[_P, _O]: + def wrapped(*args: _P.args, **kwargs: _P.kwargs) -> _O: + return converter(original(*args, **kwargs)) + + return wrapped + + return dec + + @staticmethod + def create_implement_by(converter: Callable[[_T], _O]) -> Callable[ + [Callable[Concatenate[_O, _P], _R]], + Callable[ + [Callable[[_T], None]], + Callable[Concatenate[_T, _P], _R], + ], + ]: + def implement_by( + m: Callable[Concatenate[_O, _P], _R], + ) -> Callable[ + [Callable[[_T], None]], + Callable[Concatenate[_T, _P], _R], + ]: + def implementation(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R: + return m(converter(_self), *args, **kwargs) + + def decorator(_: Callable[[_T], None]) -> Callable[Concatenate[_T, _P], _R]: + return implementation + + return decorator + + return implement_by + + @staticmethod + def create_implement_by_no_self() -> Callable[ + [Callable[_P, _R]], + Callable[ + [Callable[[_T], None]], + Callable[Concatenate[_T, _P], _R], + ], + ]: + def implement_by_no_self( + m: Callable[_P, _R], + ) -> Callable[ + [Callable[[_T], None]], + Callable[Concatenate[_T, _P], _R], + ]: + def implementation(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R: + return m(*args, **kwargs) + + def decorator(_: Callable[[_T], None]) -> Callable[Concatenate[_T, _P], _R]: + return implementation + + return decorator + + return implement_by_no_self + + +CRU.add diff --git a/tools/cru-py/cru/_util/_type.py b/tools/cru-py/cru/_type.py index dc50def..b67ee9a 100644 --- a/tools/cru-py/cru/_util/_type.py +++ b/tools/cru-py/cru/_type.py @@ -1,7 +1,7 @@ from types import NoneType from typing import Any -from ._list import CanBeList, CruList +from ._iter import CanBeList, CruList DEFAULT_NONE_ERR = ValueError DEFAULT_NONE_ERR_MSG = "None is not allowed here." diff --git a/tools/cru-py/cru/_util/__init__.py b/tools/cru-py/cru/_util/__init__.py deleted file mode 100644 index 481502c..0000000 --- a/tools/cru-py/cru/_util/__init__.py +++ /dev/null @@ -1,63 +0,0 @@ -from ._const import ( - CruNotFound, - CruUseDefault, - CruDontChange, - CruNoValue, - CruPlaceholder, -) -from ._func import ( - CruFunction, - CruFunctionMeta, - CruRawFunctions, - CruWrappedFunctions, - CruFunctionGenerators, -) -from ._list import ( - CruList, - CruInplaceList, - CruUniqueKeyInplaceList, - ListOperations, - CanBeList, - ElementOperation, - ElementPredicate, - ElementTransformer, - OptionalElementOperation, - ElementPredicate, - OptionalElementTransformer, -) -from ._type import TypeSet - -F = CruFunction -WF = CruWrappedFunctions -FG = CruFunctionGenerators -L = CruList - - -__all__ = [ - "CruNotFound", - "CruUseDefault", - "CruDontChange", - "CruNoValue", - "CruPlaceholder", - "CruFunction", - "CruFunctionMeta", - "CruRawFunctions", - "CruWrappedFunctions", - "CruFunctionGenerators", - "CruList", - "CruInplaceList", - "CruUniqueKeyInplaceList", - "ListOperations", - "CanBeList", - "ElementOperation", - "ElementPredicate", - "ElementTransformer", - "OptionalElementOperation", - "ElementPredicate", - "OptionalElementTransformer", - "TypeSet", - "F", - "WF", - "FG", - "L", -] diff --git a/tools/cru-py/cru/_util/_list.py b/tools/cru-py/cru/_util/_list.py deleted file mode 100644 index 711d5f1..0000000 --- a/tools/cru-py/cru/_util/_list.py +++ /dev/null @@ -1,915 +0,0 @@ -from __future__ import annotations - -from collections.abc import Iterable, Callable -from dataclasses import dataclass -from enum import Enum -from typing import ( - Generator, - Literal, - Self, - TypeAlias, - TypeVar, - ParamSpec, - Any, - Generic, - ClassVar, - Optional, - Union, - assert_never, - cast, - overload, - override, -) - -from ._const import CruNoValue, CruNotFound - -P = ParamSpec("P") -T = TypeVar("T") -O = TypeVar("O") -F = TypeVar("F") - -CanBeList: TypeAlias = Iterable[T] | T | None - -OptionalIndex: TypeAlias = int | None -OptionalType: TypeAlias = type | None -ElementOperation: TypeAlias = Callable[[T], Any] -ElementPredicate: TypeAlias = Callable[[T], bool] -ElementTransformer: TypeAlias = Callable[[T], O] -SelfElementTransformer: TypeAlias = ElementTransformer[T, T] -AnyElementTransformer: TypeAlias = ElementTransformer[Any, Any] - - -def flatten_with_func( - o: T, - max_depth: int, - is_leave: ElementPredicate[T], - get_children: ElementTransformer[T, Iterable[T]], - depth: int = 0, -) -> Iterable[T]: - if depth == max_depth or is_leave(o): - yield o - return - for child in get_children(o): - yield from flatten_with_func( - child, max_depth, is_leave, get_children, depth + 1 - ) - - -class _StepActionKind(Enum): - SKIP = 0 - # TODO: Rename this - SEND = 1 - STOP = 2 - AGGREGATE = 3 - - -@dataclass -class _StepAction(Generic[T]): - value: Iterable[_StepAction[T]] | T | None - kind: _StepActionKind - - @property - def non_aggregate_value(self) -> T: - assert self.kind != _StepActionKind.AGGREGATE - return cast(T, self.value) - - @staticmethod - def skip() -> _StepAction[T]: - return _StepAction(None, _StepActionKind.SKIP) - - @staticmethod - def send(value: T | None) -> _StepAction[T]: - return _StepAction(value, _StepActionKind.SEND) - - @staticmethod - def stop(value: T | None = None) -> _StepAction[T]: - return _StepAction(value, _StepActionKind.STOP) - - @staticmethod - def aggregate(*results: _StepAction[T]) -> _StepAction[T]: - return _StepAction(results, _StepActionKind.AGGREGATE) - - @staticmethod - def send_last(value: Any) -> _StepAction[T]: - return _StepAction.aggregate(_StepAction.send(value), _StepAction.stop()) - - def flatten(self) -> Iterable[_StepAction[T]]: - return flatten_with_func( - self, - -1, - lambda r: r.kind != _StepActionKind.AGGREGATE, - lambda r: cast(Iterable[_StepAction[T]], r.value), - ) - - -_r_skip = _StepAction.skip -_r_send = _StepAction.send -_r_stop = _StepAction.stop -_r_send_last = _StepAction.send_last -_r_aggregate = _StepAction.aggregate - - -_GeneralStepAction: TypeAlias = _StepAction[T] | T | None -_GeneralStepActionConverter: TypeAlias = Callable[ - [_GeneralStepAction[T]], _StepAction[T] -] -_IterateOperation = Callable[[T, int], _GeneralStepAction[O]] -_IteratePreHook = Callable[[Iterable[T]], _GeneralStepAction[O]] -_IteratePostHook = Callable[[int], _GeneralStepAction[O]] - - -class CruGenericIterableMeta: - StepActionKind = _StepActionKind - StepAction = _StepAction - GeneralStepAction = _GeneralStepAction - GeneralStepActionConverter = _GeneralStepActionConverter - IterateOperation = _IterateOperation - IteratePreHook = _IteratePreHook - IteratePostHook = _IteratePostHook - - @staticmethod - def _non_result_to_send(value: O | None) -> _StepAction[O]: - return _StepAction.send(value) - - @staticmethod - def _non_result_to_stop(value: O | None) -> _StepAction[O]: - return _StepAction.stop(value) - - @staticmethod - def _none_pre_iterate() -> _StepAction[O]: - return _r_skip() - - @staticmethod - def _none_post_iterate( - _index: int, - ) -> _StepAction[O]: - return _r_skip() - - def iterate( - self, - operation: _IterateOperation[T, O], - fallback_return: O, - pre_iterate: _IteratePreHook[T, O], - post_iterate: _IteratePostHook[O], - convert_non_result: Callable[[O | None], _StepAction[O]], - ) -> Generator[O, None, O]: - pre_result = pre_iterate(self._iterable) - if not isinstance(pre_result, _StepAction): - real_pre_result = convert_non_result(pre_result) - for r in real_pre_result.flatten(): - if r.kind == _StepActionKind.STOP: - return r.non_aggregate_value - elif r.kind == _StepActionKind.SEND: - yield r.non_aggregate_value - - for index, element in enumerate(self._iterable): - result = operation(element, index) - if not isinstance(result, _StepAction): - real_result = convert_non_result(result) - for r in real_result.flatten(): - if r.kind == _StepActionKind.STOP: - return r.non_aggregate_value - elif r.kind == _StepActionKind.SEND: - yield r.non_aggregate_value - else: - continue - - post_result = post_iterate(index + 1) - if not isinstance(post_result, _StepAction): - real_post_result = convert_non_result(post_result) - for r in real_post_result.flatten(): - if r.kind == _StepActionKind.STOP: - return r.non_aggregate_value - elif r.kind == _StepActionKind.SEND: - yield r.non_aggregate_value - - return fallback_return - - def _new( - self, - operation: _IterateOperation, - fallback_return: O, - /, - pre_iterate: _IteratePreHook[T, O] | None = None, - post_iterate: _IteratePostHook[O] | None = None, - ) -> CruIterableWrapper: - return CruIterableWrapper( - self.iterate( - operation, - fallback_return, - pre_iterate or CruIterableWrapper._none_pre_iterate, - post_iterate or CruIterableWrapper._none_post_iterate, - CruIterableWrapper._non_result_to_send, - ), - self._create_new_upstream(), - ) - - def _result( - self, - operation: _IterateOperation, - fallback_return: O, - /, - result_transform: SelfElementTransformer[O] | None = None, - pre_iterate: _IteratePreHook[T, O] | None = None, - post_iterate: _IteratePostHook[O] | None = None, - ) -> O: - try: - for _ in self.iterate( - operation, - fallback_return, - pre_iterate or CruIterableWrapper._none_pre_iterate, - post_iterate or CruIterableWrapper._none_post_iterate, - CruIterableWrapper._non_result_to_stop, - ): - pass - except StopIteration as stop: - return ( - stop.value if result_transform is None else result_transform(stop.value) - ) - raise RuntimeError("Should not reach here") - - -class IterDefaultResults: - @staticmethod - def true(_): - return True - - @staticmethod - def false(_): - return False - - @staticmethod - def not_found(_): - return CruNotFound.VALUE - - -class CruIterableCreators: - @staticmethod - def with_(o: Any) -> CruIterableWrapper: - return CruIterableWrapper(iter(o)) - - @staticmethod - def empty() -> CruIterableWrapper: - return CruIterableCreators.with_([]) - - @staticmethod - def range( - a, - b=None, - c=None, - ) -> CruIterableWrapper[int]: - args = [arg for arg in [a, b, c] if arg is not None] - return CruIterableCreators.with_(range(*args)) - - @staticmethod - def unite(*args: T) -> CruIterableWrapper[T]: - return CruIterableCreators.with_(args) - - @staticmethod - def _concat(*iterables: Iterable) -> Iterable: - for iterable in iterables: - yield from iterable - - @staticmethod - def concat(*iterables: Iterable) -> CruIterableWrapper: - return CruIterableWrapper(CruIterableCreators._concat(*iterables)) - - -class CruIterableWrapper(Generic[T]): - - def __init__( - self, - iterable: Iterable[T], - ) -> None: - self._iterable = iterable - - def __iter__(self): - return self._iterable.__iter__() - - @property - def me(self) -> Iterable[T]: - return self._iterable - - def replace_me_with(self, iterable: Iterable[O]) -> CruIterableWrapper[O]: - return CruIterableCreators.with_(iterable) - - def replace_me_with_empty(self) -> CruIterableWrapper[O]: - return CruIterableCreators.empty() - - def replace_me_with_range(self, a, b=None, c=None) -> CruIterableWrapper[int]: - return CruIterableCreators.range(a, b, c) - - def replace_me_with_unite(self, *args: O) -> CruIterableWrapper[O]: - return CruIterableCreators.unite(*args) - - def replace_me_with_concat(self, *iterables: Iterable) -> CruIterableWrapper: - return CruIterableCreators.concat(*iterables) - - @staticmethod - def _make_set(iterable: Iterable[O], discard: Iterable[Any] | None) -> set[O]: - s = set(iterable) - if discard is not None: - s = s - set(discard) - return s - - @staticmethod - def _make_list(iterable: Iterable[O], discard: Iterable[Any] | None) -> list[O]: - if discard is None: - return list(iterable) - return [v for v in iterable if v not in discard] - - def _help_make_set( - self, iterable: Iterable[O], discard: Iterable[Any] | None - ) -> set[O]: - return CruIterableWrapper._make_set(iterable, discard) - - def _help_make_list( - self, iterable: Iterable[O], discard: Iterable[Any] | None - ) -> list[O]: - return CruIterableWrapper._make_list(iterable, discard) - - def to_set(self, discard: Iterable[Any] | None = None) -> set[T]: - return CruIterableWrapper._make_set(self.me, discard) - - def to_list(self, discard: Iterable[Any] | None = None) -> list[T]: - return CruIterableWrapper._make_list(self.me, discard) - - def copy(self) -> CruIterableWrapper: - return CruIterableWrapper(iter(self.to_list()), self._create_new_upstream()) - - def new_start( - self, other: Iterable[O], /, clear_upstream: bool = False - ) -> CruIterableWrapper[O]: - return CruIterableWrapper( - other, None if clear_upstream else self._create_new_upstream() - ) - - @overload - def concat(self) -> Self: ... - - @overload - def concat( - self, *iterable: Iterable[Any], last: Iterable[O] - ) -> CruIterableWrapper[O]: ... - - def concat(self, *iterable: Iterable[Any]) -> CruIterableWrapper[Any]: # type: ignore - return self.new_start(CruIterableCreators.concat(self.me, *iterable)) - - def all(self, predicate: ElementPredicate[T]) -> bool: - """ - partial - """ - return self._result(lambda v, _: predicate(v) and None, IterDefaultResults.true) - - def all_isinstance(self, *types: OptionalType) -> bool: - """ - partial - """ - types = self._help_make_set(types) - return self.all(lambda v: type(v) in types) - - def any(self, predicate: ElementPredicate[T]) -> bool: - """ - partial - """ - return self._result(lambda v, _: predicate(v) or None, IterDefaultResults.false) - - def number(self) -> CruIterableWrapper: - """ - partial - """ - return self._new(lambda _, i: i) - - def take(self, predicate: ElementPredicate[T]) -> CruIterableWrapper: - """ - complete - """ - return self._new(lambda v, _: _r_send(v) if predicate(v) else None) - - def transform( - self, *transformers: OptionalElementTransformer - ) -> CruIterableWrapper: - """ - complete - """ - - def _transform_element(element, _): - for transformer in self._help_make_list(transformers): - if transformer is not None: - element = transformer(element) - return _r_send(element) - - return self._new(_transform_element) - - def take_n(self, max_count: int, neg_is_clone: bool = True) -> CruIterableWrapper: - """ - partial - """ - if max_count < 0: - if neg_is_clone: - return self.clone_me() - else: - raise ValueError("max_count must be 0 or positive.") - elif max_count == 0: - return self.drop_all() - return self._new( - lambda v, i: _r_send(v) if i < max_count - 1 else _r_send_last(v) - ) - - def take_by_indices(self, *indices: OptionalIndex) -> CruIterableWrapper: - """ - partial - """ - indices = self._help_make_set(indices) - max_index = max(indices) - return self.take_n(max_index + 1)._new( - lambda v, i: _r_send(v) if i in indices else None - ) - - def single_or( - self, fallback: Any | None = CRU_NOT_FOUND - ) -> T | Any | CRU_NOT_FOUND: - """ - partial - """ - first_2 = self.take_n(2) - has_value = False - value = None - for element in first_2.me: - if has_value: - raise ValueError("More than one value found.") - has_value = True - value = element - if has_value: - return value - else: - return fallback - - def first_or( - self, predicate: ElementPredicate[T], fallback: Any | None = CRU_NOT_FOUND - ) -> T | CRU_NOT_FOUND: - """ - partial - """ - result_iterable = self.take_n(1).single_or() - - @staticmethod - def first_index( - iterable: Iterable[T], predicate: ElementPredicate[T] - ) -> int | CRU_NOT_FOUND: - """ - partial - """ - for index, element in enumerate(iterable): - if predicate(element): - return index - - @staticmethod - def take_indices( - iterable: Iterable[T], predicate: ElementPredicate[T] - ) -> Iterable[int]: - """ - complete - """ - for index, element in enumerate(iterable): - if predicate(element): - yield index - - @staticmethod - def flatten( - o, - max_depth=-1, - is_leave: ElementPredicate | None = None, - get_children: OptionalElementTransformer = None, - ) -> Iterable: - """ - complete - """ - if is_leave is None: - is_leave = lambda v: not isinstance(v, Iterable) - if get_children is None: - get_children = lambda v: v - return CruIterableWrapper._flatten_with_func( - o, max_depth, is_leave, get_children - ) - - @staticmethod - def skip_by_indices(iterable: Iterable[T], *indices: OptionalIndex) -> Iterable[T]: - """ - complete - """ - indices = set(indices) - {None} - for index, element in enumerate(iterable): - if index not in indices: - yield element - - @staticmethod - def skip_if(iterable: Iterable[T], predicate: ElementPredicate[T]) -> list[T]: - """ - complete - """ - for element in iterable: - if not predicate(element): - yield element - - def drop_all(self) -> CruIterableWrapper: - return self.replace_me_with_empty() - - @staticmethod - def remove_all_if(l: Iterable[T], p: ElementPredicate[T]) -> list[T]: - return [v for v in l if not p(v)] - - @staticmethod - def remove_all_value(l: Iterable[T], *r: Any) -> list[T]: - return [v for v in l if v not in r] - - @staticmethod - def replace_all_value(l: Iterable[T], old_value: Any, new_value: R) -> list[T | R]: - return [new_value if v == old_value else v for v in l] - - @staticmethod - def foreach(iterable: Iterable[T], *f: OptionalElementOperation[T]) -> None: - if len(f) == 0: - return - for v in iterable: - for f_ in f: - if f_ is not None: - f_(v) - - @staticmethod - def make(v: CanBeList[T], /, none_to_empty_list: bool = True) -> list[T]: - if v is None and none_to_empty_list: - return [] - return list(v) if isinstance(v, Iterable) else [v] - - -class ListOperations: - @staticmethod - def all(iterable: Iterable[T], predicate: ElementPredicate[T]) -> bool: - """ - partial - """ - return _God.spy(iterable, lambda v, _: predicate(v) and None, _God.Default.true) - - @staticmethod - def all_isinstance(iterable: Iterable[T], *types: OptionalType) -> bool: - """ - partial - """ - types = _God.help_make_set(types) - return ListOperations.all(iterable, lambda v: type(v) in types) - - @staticmethod - def any(iterable: Iterable[T], predicate: ElementPredicate[T]) -> bool: - """ - partial - """ - return _God.spy(iterable, lambda v, _: predicate(v) or None, _God.Default.false) - - @staticmethod - def indices(iterable: Iterable[T]) -> Iterable[int]: - """ - partial - """ - return _God.new(iterable, lambda _, i: i) - - @staticmethod - def take(iterable: Iterable[T], predicate: ElementPredicate[T]) -> Iterable[T]: - """ - complete - """ - return _God.new(iterable, lambda v, _: _God.yield_(v) if predicate(v) else None) - - @staticmethod - def transform( - iterable: Iterable[T], *transformers: OptionalElementTransformer - ) -> Iterable: - """ - complete - """ - - def _transform_element(element, _): - for transformer in transformers: - if transformer is not None: - element = transformer(element) - return element - - return _God.new(iterable, _transform_element) - - @staticmethod - def take_n(iterable: Iterable[T], n: int) -> Iterable[T]: - """ - partial - """ - if n < 0: - return iterable - elif n == 0: - return [] - return range(n)._god_yield( - iterable, lambda v, i: _yield(v) if i < n else _return() - ) - - @staticmethod - def take_by_indices(iterable: Iterable[T], *indices: OptionalIndex) -> Iterable[T]: - """ - partial - """ - indices = set(indices) - {None} - max_index = max(indices) - iterable = ListOperations.take_n(iterable, max_index + 1) - return _god_yield(iterable, lambda v, i: _yield(v) if i in indices else None) - - @staticmethod - def first(iterable: Iterable[T]) -> T | CRU_NOT_FOUND: - """ - partial - """ - result_iterable = ListOperations.take_n(iterable, 1) - for element in result_iterable: - return element - return CRU_NOT_FOUND - - @staticmethod - def first_index( - iterable: Iterable[T], predicate: ElementPredicate[T] - ) -> int | CRU_NOT_FOUND: - """ - partial - """ - for index, element in enumerate(iterable): - if predicate(element): - return index - - @staticmethod - def take_indices( - iterable: Iterable[T], predicate: ElementPredicate[T] - ) -> Iterable[int]: - """ - complete - """ - for index, element in enumerate(iterable): - if predicate(element): - yield index - - @staticmethod - def _flatten(o, depth: int, max_depth: int) -> Iterable: - if depth == max_depth or not isinstance(o, Iterable): - yield o - return - for v in o: - yield from ListOperations._flatten(v, depth + 1, max_depth) - - @staticmethod - def flatten(o, max_depth=-1) -> Iterable: - """ - complete - """ - return ListOperations._flatten(o, 0, max_depth) - - @staticmethod - def skip_by_indices(iterable: Iterable[T], *indices: OptionalIndex) -> Iterable[T]: - """ - complete - """ - indices = set(indices) - {None} - for index, element in enumerate(iterable): - if index not in indices: - yield element - - @staticmethod - def skip_if(iterable: Iterable[T], predicate: ElementPredicate[T]) -> list[T]: - """ - complete - """ - for element in iterable: - if not predicate(element): - yield element - - @staticmethod - def remove_all_if(l: Iterable[T], p: ElementPredicate[T]) -> list[T]: - return [v for v in l if not p(v)] - - @staticmethod - def remove_all_value(l: Iterable[T], *r: Any) -> list[T]: - return [v for v in l if v not in r] - - @staticmethod - def replace_all_value(l: Iterable[T], old_value: Any, new_value: R) -> list[T | R]: - return [new_value if v == old_value else v for v in l] - - @staticmethod - def foreach(iterable: Iterable[T], *f: OptionalElementOperation[T]) -> None: - if len(f) == 0: - return - for v in iterable: - for f_ in f: - if f_ is not None: - f_(v) - - @staticmethod - def make(v: CanBeList[T], /, none_to_empty_list: bool = True) -> list[T]: - if v is None and none_to_empty_list: - return [] - return list(v) if isinstance(v, Iterable) else [v] - - -class CruList(list, Generic[T]): - @property - def is_empty(self) -> bool: - return len(self) == 0 - - def sub_by_indices(self, *index: int) -> "CruList"[T]: - return CruList(ListOperations.sub_by_indices(self, *index)) - - def split_by_indices(self, *index: int) -> tuple["CruList"[T], "CruList"[T]]: - l1, l2 = ListOperations.split_by_indices(self, *index) - return CruList(l1), CruList(l2) - - def complement_indices(self, *index: int) -> list[int]: - return ListOperations.complement_indices(len(self), *index) - - def foreach(self, *f: OptionalElementOperation[T]) -> None: - ListOperations.foreach(self, *f) - - def all(self, p: ElementPredicate[T]) -> bool: - return ListOperations.all(self, p) - - def all_is_instance(self, *t: type) -> bool: - return ListOperations.all_isinstance(self, *t) - - def any(self, p: ElementPredicate[T]) -> bool: - return ListOperations.any(self, p) - - def find_all_if(self, p: ElementPredicate[T]) -> "CruList"[T]: - return CruList(ListOperations.take(self, p)) - - def find_if(self, p: ElementPredicate[T]) -> T | CRU_NOT_FOUND: - return ListOperations.first(self, p) - - def find_all_indices_if(self, p: ElementPredicate[T]) -> "CruList"[int]: - return CruList(ListOperations.take_indices(self, p)) - - def find_index_if(self, p: ElementPredicate[T]) -> int | CRU_NOT_FOUND: - return ListOperations.first_index(self, p) - - def split_if(self, p: ElementPredicate[T]) -> tuple["CruList"[T], "CruList"[T]]: - l1, l2 = ListOperations.split_if(self, p) - return CruList(l1), CruList(l2) - - def split_by_types(self, *t: type) -> tuple["CruList"[T], "CruList"[T]]: - l1, l2 = ListOperations.split_by_types(self, *t) - return CruList(l1), CruList(l2) - - def transform(self, *f: OptionalElementTransformer) -> "CruList"[Any]: - return CruList(ListOperations.transform(self, *f)) - - def transform_if( - self, f: OptionalElementTransformer, p: ElementPredicate[T] - ) -> "CruList"[Any]: - return CruList(ListOperations.transform_if(self, f, p)) - - def remove_by_indices(self, *index: int) -> "CruList"[T]: - return CruList(ListOperations.skip_by_indices(self, *index)) - - def remove_if(self, p: ElementPredicate[T]) -> "CruList"[T]: - return CruList(ListOperations.remove_if(self, p)) - - def remove_all_if(self, p: ElementPredicate[T]) -> "CruList"[T]: - return CruList(ListOperations.remove_all_if(self, p)) - - def remove_all_value(self, *r: Any) -> "CruList"[T]: - return CruList(ListOperations.remove_all_value(self, *r)) - - def replace_all_value(self, old_value: Any, new_value: R) -> "CruList"[T | R]: - return CruList(ListOperations.replace_all_value(self, old_value, new_value)) - - @staticmethod - def make(l: CanBeList[T]) -> "CruList"[T]: - return CruList(ListOperations.make(l)) - - -class CruInplaceList(CruList, Generic[T]): - - def clear(self) -> "CruInplaceList[T]": - self.clear() - return self - - def extend(self, *l: Iterable[T]) -> "CruInplaceList[T]": - self.extend(l) - return self - - def reset(self, *l: Iterable[T]) -> "CruInplaceList[T]": - self.clear() - self.extend(l) - return self - - def transform(self, *f: OptionalElementTransformer) -> "CruInplaceList"[Any]: - return self.reset(super().transform(*f)) - - def transform_if( - self, f: OptionalElementTransformer, p: ElementPredicate[T] - ) -> "CruInplaceList"[Any]: - return self.reset(super().transform_if(f, p)) - - def remove_by_indices(self, *index: int) -> "CruInplaceList"[T]: - return self.reset(super().remove_by_indices(*index)) - - def remove_all_if(self, p: ElementPredicate[T]) -> "CruInplaceList"[T]: - return self.reset(super().remove_all_if(p)) - - def remove_all_value(self, *r: Any) -> "CruInplaceList"[T]: - return self.reset(super().remove_all_value(*r)) - - def replace_all_value( - self, old_value: Any, new_value: R - ) -> "CruInplaceList"[T | R]: - return self.reset(super().replace_all_value(old_value, new_value)) - - @staticmethod - def make(l: CanBeList[T]) -> "CruInplaceList"[T]: - return CruInplaceList(ListOperations.make(l)) - - -K = TypeVar("K") - - -class CruUniqueKeyInplaceList(Generic[T, K]): - KeyGetter = Callable[[T], K] - - def __init__( - self, get_key: KeyGetter, *, before_add: Callable[[T], T] | None = None - ): - super().__init__() - self._get_key = get_key - self._before_add = before_add - self._l: CruInplaceList[T] = CruInplaceList() - - @property - def object_key_getter(self) -> KeyGetter: - return self._get_key - - @property - def internal_list(self) -> CruInplaceList[T]: - return self._l - - def validate_self(self): - keys = self._l.transform(self._get_key) - if len(keys) != len(set(keys)): - raise ValueError("Duplicate keys!") - - def get_or(self, k: K, fallback: Any = CRU_NOT_FOUND) -> T | Any: - r = self._l.find_if(lambda i: k == self._get_key(i)) - return r if r is not CRU_NOT_FOUND else fallback - - def get(self, k: K) -> T: - v = self.get_or(k, CRU_NOT_FOUND) - if v is CRU_NOT_FOUND: - raise KeyError(f"Key not found!") - return v - - def has_key(self, k: K) -> bool: - return self.get_or(k, CRU_NOT_FOUND) is not CRU_NOT_FOUND - - def has_any_key(self, *k: K) -> bool: - return self._l.any(lambda i: self._get_key(i) in k) - - def try_remove(self, k: K) -> bool: - i = self._l.find_index_if(lambda v: k == self._get_key(v)) - if i is CRU_NOT_FOUND: - return False - self._l.remove_by_indices(i) - return True - - def remove(self, k: K, allow_absense: bool = False) -> None: - if not self.try_remove(k) and not allow_absense: - raise KeyError(f"Key {k} not found!") - - def add(self, v: T, /, replace: bool = False) -> None: - if self.has_key(self._get_key(v)): - if replace: - self.remove(self._get_key(v)) - else: - raise ValueError(f"Key {self._get_key(v)} already exists!") - if self._before_add is not None: - v = self._before_add(v) - self._l.append(v) - - def set(self, v: T) -> None: - self.add(v, True) - - def extend(self, l: Iterable[T], /, replace: bool = False) -> None: - if not replace and self.has_any_key([self._get_key(i) for i in l]): - raise ValueError("Keys already exists!") - if self._before_add is not None: - l = [self._before_add(i) for i in l] - keys = [self._get_key(i) for i in l] - self._l.remove_all_if(lambda i: self._get_key(i) in keys).extend(l) - - def clear(self) -> None: - self._l.clear() - - def __iter__(self): - return iter(self._l) - - def __len__(self): - return len(self._l) |