diff options
Diffstat (limited to 'tools/cru-py/cru/_iter.py')
-rw-r--r-- | tools/cru-py/cru/_iter.py | 674 |
1 files changed, 300 insertions, 374 deletions
diff --git a/tools/cru-py/cru/_iter.py b/tools/cru-py/cru/_iter.py index da849e8..83a9513 100644 --- a/tools/cru-py/cru/_iter.py +++ b/tools/cru-py/cru/_iter.py @@ -8,6 +8,7 @@ from typing import ( Generator, Iterator, Literal, + Never, Self, TypeAlias, TypeVar, @@ -15,11 +16,9 @@ from typing import ( Any, Generic, cast, - overload, ) from ._cru import CRU -from ._meta import CruDecCreators from ._const import CruNotFound _P = ParamSpec("_P") @@ -28,208 +27,221 @@ _O = TypeVar("_O") _V = TypeVar("_V") _R = TypeVar("_R") -CanBeList: TypeAlias = Iterable[_T] | _T | None -ElementOperation: TypeAlias = Callable[[_T], Any] -ElementPredicate: TypeAlias = Callable[[_T], bool] -AnyElementPredicate: TypeAlias = ElementPredicate[Any] -ElementTransformer: TypeAlias = Callable[[_T], _O] -SelfElementTransformer: TypeAlias = ElementTransformer[_T, _T] -AnyElementTransformer: TypeAlias = ElementTransformer[Any, Any] +class _Generic: + class StepActionKind(Enum): + SKIP = 0 + PUSH = 1 + STOP = 2 + AGGREGATE = 3 + @dataclass + class StepAction(Generic[_V, _R]): + value: Iterable[Self] | _V | _R | None + kind: _Generic.StepActionKind -class _StepActionKind(Enum): - SKIP = 0 - PUSH = 1 - STOP = 2 - AGGREGATE = 3 + @property + def push_value(self) -> _V: + assert self.kind == _Generic.StepActionKind.PUSH + return cast(_V, self.value) + @property + def stop_value(self) -> _R: + assert self.kind == _Generic.StepActionKind.STOP + return cast(_R, self.value) -@dataclass -class _StepAction(Generic[_V, _R]): - value: Iterable[Self] | _V | _R | None - kind: _StepActionKind + @staticmethod + def skip() -> _Generic.StepAction[_V, _R]: + return _Generic.StepAction(None, _Generic.StepActionKind.SKIP) - @property - def push_value(self) -> _V: - assert self.kind == _StepActionKind.PUSH - return cast(_V, self.value) + @staticmethod + def push(value: _V | None) -> _Generic.StepAction[_V, _R]: + return _Generic.StepAction(value, _Generic.StepActionKind.PUSH) - @property - def stop_value(self) -> _R: - assert self.kind == _StepActionKind.STOP - return cast(_R, self.value) + @staticmethod + def stop(value: _R | None = None) -> _Generic.StepAction[_V, _R]: + return _Generic.StepAction(value, _Generic.StepActionKind.STOP) - @staticmethod - def skip() -> _StepAction[_V, _R]: - return _StepAction(None, _StepActionKind.SKIP) + @staticmethod + def aggregate( + *results: _Generic.StepAction[_V, _R] + ) -> _Generic.StepAction[_V, _R]: + return _Generic.StepAction(results, _Generic.StepActionKind.AGGREGATE) - @staticmethod - def push(value: _V | None) -> _StepAction[_V, _R]: - return _StepAction(value, _StepActionKind.PUSH) + @staticmethod + def push_last(value: _V | None) -> _Generic.StepAction[_V, _R]: + return _Generic.StepAction.aggregate( + _Generic.StepAction.push(value), _Generic.StepAction.stop() + ) - @staticmethod - def stop(value: _R | None = None) -> _StepAction[_V, _R]: - return _StepAction(value, _StepActionKind.STOP) + def flatten(self) -> Iterable[Self]: + return _Generic.flatten( + self, + is_leave=lambda r: r.kind != _Generic.StepActionKind.AGGREGATE, + get_children=lambda r: cast(Iterable[Self], r.value), + ) + + GeneralStepAction: TypeAlias = StepAction[_V, _R] | _V | _R | None + IterateOperation: TypeAlias = Callable[[_T, int], GeneralStepAction[_V, _R]] + IteratePreHook: TypeAlias = Callable[[Iterable[_T]], GeneralStepAction[_V, _R]] + IteratePostHook: TypeAlias = Callable[[int], GeneralStepAction[_V, _R]] @staticmethod - def aggregate(*results: _StepAction[_V, _R]) -> _StepAction[_V, _R]: - return _StepAction(results, _StepActionKind.AGGREGATE) + def _is_not_iterable(o: Any) -> bool: + return not isinstance(o, Iterable) @staticmethod - def push_last(value: _V | None) -> _StepAction[_V, _R]: - return _StepAction.aggregate(_StepAction.push(value), _StepAction.stop()) - - def flatten(self) -> Iterable[Self]: - return CruIterableMeta.flatten( - self, - is_leave=lambda r: r.kind != _StepActionKind.AGGREGATE, - get_children=lambda r: cast(Iterable[Self], r.value), - ) + def _return_self(o): + return o + @staticmethod + def iterable_flatten( + maybe_iterable: Iterable[_T] | _T, max_depth: int = -1, *, _depth: int = 0 + ) -> Iterable[Iterable[_T] | _T]: + if _depth == max_depth or not isinstance(maybe_iterable, Iterable): + yield maybe_iterable + return -_GeneralStepAction: TypeAlias = _StepAction[_V, _R] | _V | _R | None -_IterateOperation = Callable[[_T, int], _GeneralStepAction[_V, _R]] -_IteratePreHook = Callable[[Iterable[_T]], _GeneralStepAction[_V, _R]] -_IteratePostHook = Callable[[int], _GeneralStepAction[_V, _R]] + for child in maybe_iterable: + yield from _Generic.iterable_flatten( + child, + max_depth, + _depth=_depth + 1, + ) + @staticmethod + def flatten( + o: _O, + max_depth: int = -1, + /, + is_leave: _Wrapper.ElementPredicate[_O] = _is_not_iterable, + get_children: _Wrapper.ElementTransformer[_O, Iterable[_O]] = _return_self, + *, + _depth: int = 0, + ) -> Iterable[_O]: + if _depth == max_depth or is_leave(o): + yield o + return + for child in get_children(o): + yield from _Generic.flatten( + child, + max_depth, + is_leave, + get_children, + _depth=_depth + 1, + ) -class CruIterableMeta: - class Generic: - StepActionKind = _StepActionKind - StepAction = _StepAction - GeneralStepAction = _GeneralStepAction - IterateOperation = _IterateOperation - IteratePreHook = _IteratePreHook - IteratePostHook = _IteratePostHook + class Results: + @staticmethod + def true(_) -> Literal[True]: + return True - class Results: - @staticmethod - def true(_) -> Literal[True]: - return True + @staticmethod + def false(_) -> Literal[False]: + return False - @staticmethod - def false(_) -> Literal[False]: - return False + @staticmethod + def not_found(_) -> Literal[CruNotFound.VALUE]: + return CruNotFound.VALUE - @staticmethod - def not_found(_) -> Literal[CruNotFound.VALUE]: - return CruNotFound.VALUE + @staticmethod + def _non_result_to_push(value: Any) -> StepAction[_V, _R]: + return _Generic.StepAction.push(value) - @staticmethod - def _non_result_to_push(value: Any) -> _StepAction[_V, _R]: - return _StepAction.push(value) + @staticmethod + def _non_result_to_stop(value: Any) -> StepAction[_V, _R]: + return _Generic.StepAction.stop(value) - @staticmethod - def _non_result_to_stop(value: Any) -> _StepAction[_V, _R]: - return _StepAction.stop(value) + @staticmethod + def _none_hook(_: Any) -> StepAction[_V, _R]: + return _Generic.StepAction.skip() - @staticmethod - def _none_hook(_: Any) -> _StepAction[_V, _R]: - return _StepAction.skip() - - def iterate( - iterable: Iterable[_T], - operation: _IterateOperation[_T, _V, _R], - fallback_return: _R, - pre_iterate: _IteratePreHook[_T, _V, _R], - post_iterate: _IteratePostHook[_V, _R], - convert_value_result: Callable[[_V | _R | None], _StepAction[_V, _R]], - ) -> Generator[_V, None, _R]: - pre_result = pre_iterate(iterable) - if not isinstance(pre_result, _StepAction): - real_pre_result = convert_value_result(pre_result) - for r in real_pre_result.flatten(): - if r.kind == _StepActionKind.STOP: - return r.stop_value - elif r.kind == _StepActionKind.PUSH: - yield r.push_value - else: - assert r.kind == _StepActionKind.SKIP - - for index, element in enumerate(iterable): - result = operation(element, index) - if not isinstance(result, _StepAction): - real_result = convert_value_result(result) - for r in real_result.flatten(): - if r.kind == _StepActionKind.STOP: - return r.stop_value - elif r.kind == _StepActionKind.PUSH: - yield r.push_value - else: - assert r.kind == _StepActionKind.SKIP - continue - - post_result = post_iterate(index + 1) - if not isinstance(post_result, _StepAction): - real_post_result = convert_value_result(post_result) - for r in real_post_result.flatten(): - if r.kind == _StepActionKind.STOP: + def iterate( + iterable: Iterable[_T], + operation: IterateOperation[_T, _V, _R], + fallback_return: _R, + pre_iterate: IteratePreHook[_T, _V, _R], + post_iterate: IteratePostHook[_V, _R], + convert_value_result: Callable[[_V | _R | None], StepAction[_V, _R]], + ) -> Generator[_V, None, _R]: + pre_result = pre_iterate(iterable) + if not isinstance(pre_result, _Generic.StepAction): + real_pre_result = convert_value_result(pre_result) + for r in real_pre_result.flatten(): + if r.kind == _Generic.StepActionKind.STOP: + return r.stop_value + elif r.kind == _Generic.StepActionKind.PUSH: + yield r.push_value + else: + assert r.kind == _Generic.StepActionKind.SKIP + + for index, element in enumerate(iterable): + result = operation(element, index) + if not isinstance(result, _Generic.StepAction): + real_result = convert_value_result(result) + for r in real_result.flatten(): + if r.kind == _Generic.StepActionKind.STOP: return r.stop_value - elif r.kind == _StepActionKind.PUSH: + elif r.kind == _Generic.StepActionKind.PUSH: yield r.push_value else: - assert r.kind == _StepActionKind.SKIP - - return fallback_return - - def create_new( - iterable: Iterable[_T], - operation: _IterateOperation[_T, _V, _R], - fallback_return: _R, - /, - pre_iterate: _IteratePreHook[_T, _V, _R] | None = None, - post_iterate: _IteratePostHook[_V, _R] | None = None, - ) -> Generator[_V, None, _R]: - return CruIterableMeta.Generic.iterate( + assert r.kind == _Generic.StepActionKind.SKIP + continue + + post_result = post_iterate(index + 1) + if not isinstance(post_result, _Generic.StepAction): + real_post_result = convert_value_result(post_result) + for r in real_post_result.flatten(): + if r.kind == _Generic.StepActionKind.STOP: + return r.stop_value + elif r.kind == _Generic.StepActionKind.PUSH: + yield r.push_value + else: + assert r.kind == _Generic.StepActionKind.SKIP + + return fallback_return + + def create_new( + iterable: Iterable[_T], + operation: IterateOperation[_T, _V, _R], + fallback_return: _R, + /, + pre_iterate: IteratePreHook[_T, _V, _R] | None = None, + post_iterate: IteratePostHook[_V, _R] | None = None, + ) -> Generator[_V, None, _R]: + return _Generic.iterate( + iterable, + operation, + fallback_return, + pre_iterate or _Generic._none_hook, + post_iterate or _Generic._none_hook, + _Generic._non_result_to_push, + ) + + def get_result( + iterable: Iterable[_T], + operation: IterateOperation[_T, _V, _R], + fallback_return: _R, + /, + pre_iterate: IteratePreHook[_T, _V, _R] | None = None, + post_iterate: IteratePostHook[_V, _R] | None = None, + ) -> _R: + try: + for _ in _Generic.iterate( iterable, operation, fallback_return, - pre_iterate or CruIterableMeta.Generic._none_hook, - post_iterate or CruIterableMeta.Generic._none_hook, - CruIterableMeta.Generic._non_result_to_push, - ) + pre_iterate or _Generic._none_hook, + post_iterate or _Generic._none_hook, + _Generic._non_result_to_stop, + ): + pass + except StopIteration as stop: + return stop.value + raise RuntimeError("Should not reach here") - def get_result( - iterable: Iterable[_T], - operation: _IterateOperation[_T, _V, _R], - fallback_return: _R, - /, - pre_iterate: _IteratePreHook[_T, _V, _R] | None = None, - post_iterate: _IteratePostHook[_V, _R] | None = None, - ) -> _R: - try: - for _ in CruIterableMeta.Generic.iterate( - iterable, - operation, - fallback_return, - pre_iterate or CruIterableMeta.Generic._none_hook, - post_iterate or CruIterableMeta.Generic._none_hook, - CruIterableMeta.Generic._non_result_to_stop, - ): - pass - except StopIteration as stop: - return stop.value - raise RuntimeError("Should not reach here") - - class Creators: - @staticmethod - def empty() -> Iterable[_T]: - return iter([]) - - @staticmethod - def range(*args) -> Iterable[int]: - return iter(range(*args)) - - @staticmethod - def unite(*args: _O) -> Iterable[_O]: - return iter(args) - - @staticmethod - def concat(*iterables: Iterable[_T]) -> Iterable[_T]: - for iterable in iterables: - yield from iterable +class _Helper: @staticmethod def with_count(c: Callable[Concatenate[int, _P], _O]) -> Callable[_P, _O]: count = 0 @@ -242,78 +254,139 @@ class CruIterableMeta: return wrapper + +class _Dec: + + @staticmethod + def wrap(origin: Callable[_P, Iterable[_T]]) -> Callable[_P, _Wrapper[_T]]: + def _wrapped(*args: _P.args, **kwargs: _P.kwargs) -> _Wrapper[_T]: + return _Wrapper(origin(*args, **kwargs)) + + return _wrapped + + +class _Creators: @staticmethod - def to_set(iterable: Iterable[_T], discard: Iterable[Any]) -> set[_T]: - return set(iterable) - set(discard) + @_Dec.wrap + def empty() -> Iterable[Never]: + return iter([]) @staticmethod - def to_list(iterable: Iterable[_T], discard: Iterable[Any]) -> list[_T]: - return [v for v in iterable if v not in set(discard)] + @_Dec.wrap + def range(*args) -> Iterable[int]: + return iter(range(*args)) @staticmethod - def all(iterable: Iterable[_T], predicate: ElementPredicate[_T]) -> bool: - for value in iterable: + @_Dec.wrap + def unite(*args: _O) -> Iterable[_O]: + return iter(args) + + @staticmethod + @_Dec.wrap + def concat(*iterables: Iterable[_T]) -> Iterable[_T]: + for iterable in iterables: + yield from iterable + + +class _Wrapper(Generic[_T]): + ElementOperation: TypeAlias = Callable[[_V], Any] + ElementPredicate: TypeAlias = Callable[[_V], bool] + AnyElementPredicate: TypeAlias = ElementPredicate[Any] + ElementTransformer: TypeAlias = Callable[[_V], _O] + SelfElementTransformer: TypeAlias = ElementTransformer[_V, _V] + AnyElementTransformer: TypeAlias = ElementTransformer[Any, Any] + + def __init__( + self, + iterable: Iterable[_T], + ) -> None: + self._iterable = iterable + + def __iter__(self) -> Iterator[_T]: + return self._iterable.__iter__() + + @property + def me(self) -> Iterable[_T]: + return self._iterable + + _wrap = _Dec.wrap + + @_wrap + def replace_me(self, iterable: Iterable[_O]) -> Iterable[_O]: + return iterable + + def replace_me_with_empty(self) -> _Wrapper[Never]: + return _Creators.empty() + + def replace_me_with_range(self, *args) -> _Wrapper[int]: + return _Creators.range(*args) + + def replace_me_with_unite(self, *args: _O) -> _Wrapper[_O]: + return _Creators.unite(*args) + + def replace_me_with_concat(self, *iterables: Iterable[_T]) -> _Wrapper[_T]: + return _Creators.concat(*iterables) + + def to_set(self, discard: Iterable[Any]) -> set[_T]: + return set(self.me) - set(discard) + + def to_list(self, discard: Iterable[Any]) -> list[_T]: + return [v for v in self.me if v not in set(discard)] + + def all(self, predicate: ElementPredicate[_T]) -> bool: + for value in self.me: if not predicate(value): return False return True - @staticmethod - def any(iterable: Iterable[_T], predicate: ElementPredicate[_T]) -> bool: - for value in iterable: + def any(self, predicate: ElementPredicate[_T]) -> bool: + for value in self.me: if predicate(value): return True return False - @staticmethod - def foreach(iterable: Iterable[_T], operation: ElementOperation[_T]) -> None: - for value in iterable: + def foreach(self, operation: ElementOperation[_T]) -> None: + for value in self.me: operation(value) - @staticmethod - def transform( - iterable: Iterable[_T], transformer: ElementTransformer[_T, _O] - ) -> Iterable[_O]: - for value in iterable: + @_wrap + def transform(self, transformer: ElementTransformer[_T, _O]) -> Iterable[_O]: + for value in self.me: yield transformer(value) - @staticmethod - def filter(iterable: Iterable[_T], predicate: ElementPredicate[_T]) -> Iterable[_T]: - for value in iterable: + map = transform + + @_wrap + def filter(self, predicate: ElementPredicate[_T]) -> Iterable[_T]: + for value in self.me: if predicate(value): yield value - @staticmethod - def continue_if( - iterable: Iterable[_T], predicate: ElementPredicate[_T] - ) -> Iterable[_T]: - for value in iterable: + @_wrap + def continue_if(self, predicate: ElementPredicate[_T]) -> Iterable[_T]: + for value in self.me: yield value if not predicate(value): break - @staticmethod - def first_n(iterable: Iterable[_T], max_count: int) -> Iterable[_T]: + def first_n(self, max_count: int) -> _Wrapper[_T]: if max_count < 0: raise ValueError("max_count must be 0 or positive.") if max_count == 0: - return CruIterableMeta.Creators.empty() - return CruIterableMeta.continue_if( - iterable, _with_count(lambda i, _: i < max_count - 1) - ) + return self.replace_me_with_empty() # type: ignore + return self.continue_if(_Helper.with_count(lambda i, _: i < max_count - 1)) - @staticmethod - def drop_n(iterable: Iterable[_T], n: int) -> Iterable[_T]: + def drop_n(self, n: int) -> _Wrapper[_T]: if n < 0: raise ValueError("n must be 0 or positive.") if n == 0: - return iterable - return CruIterableMeta.filter(iterable, _with_count(lambda i, _: i < n)) + return self + return self.filter(_Helper.with_count(lambda i, _: i < n)) - @staticmethod def single_or( - iterable: Iterable[_T], fallback: _O | CruNotFound | None = CruNotFound.VALUE + self, fallback: _O | CruNotFound | None = CruNotFound.VALUE ) -> _T | _O | Any | CruNotFound: - first_2 = CruIterableMeta.first_n(iterable, 2) + first_2 = self.first_n(2) has_value = False value = None for element in first_2: @@ -326,181 +399,34 @@ class CruIterableMeta: else: return fallback - @staticmethod - def _is_not_iterable(o: Any) -> bool: - return not isinstance(o, Iterable) - - @staticmethod - def _return_self(o): - return o - - @staticmethod - @overload - def flatten(o: Iterable[_T], max_depth: int = -1) -> Iterable[_T]: ... - - @staticmethod - @overload - def flatten( - o: _O, - max_depth: int = -1, - *, - is_leave: ElementPredicate[_O], - get_children: ElementTransformer[_O, Iterable[_O]], - ) -> Iterable[_O]: ... - - @staticmethod - def flatten( - o: _O, - max_depth: int = -1, - *, - is_leave: ElementPredicate[_O] = _is_not_iterable, - get_children: ElementTransformer[_O, Iterable[_O]] = _return_self, - _depth: int = 0, - ) -> Iterable[Any]: - if _depth == max_depth or is_leave(o): - yield o - return - for child in get_children(o): - yield from CruIterableMeta.flatten( - child, - max_depth, - is_leave=is_leave, - get_children=get_children, - _depth=_depth + 1, - ) # type: ignore - - -_with_count = CruIterableMeta.with_count - - -class CruIterableWrapper(Generic[_T]): - Meta = CruIterableMeta - - class Dec: - @staticmethod - def _wrap(iterable: Iterable[_O]) -> CruIterableWrapper[_O]: - return CruIterableWrapper(iterable) - - wrap = CruDecCreators.convert_result(_wrap) - - @staticmethod - def _as_iterable(_self: CruIterableWrapper[_O]) -> CruIterableWrapper[_O]: - return _self - - meta = CruDecCreators.create_implement_by(_as_iterable) - meta_no_self = CruDecCreators.create_implement_by_no_self() - - def __init__( - self, - iterable: Iterable[_T], - ) -> None: - self._iterable = iterable - - def __iter__(self) -> Iterator[_T]: - return self._iterable.__iter__() - - @property - def me(self) -> Iterable[_T]: - return self._iterable - - def replace_me(self, iterable: Iterable[_O]) -> CruIterableWrapper[_O]: - return CruIterableWrapper(iterable) - - @Dec.wrap - @Dec.meta_no_self(CruIterableMeta.Creators.empty) - def replace_me_with_empty(self) -> None: - pass - - @Dec.wrap - @Dec.meta_no_self(CruIterableMeta.Creators.range) - def replace_me_with_range(self) -> None: - pass - - @Dec.wrap - @Dec.meta_no_self(CruIterableMeta.Creators.unite) - def replace_me_with_unite(self) -> None: - pass + @_wrap + def flatten(self) -> Iterable[_T | Iterable[_T]]: + return _Generic.iterable_flatten(self.me) - @Dec.wrap - @Dec.meta_no_self(CruIterableMeta.Creators.concat) - def replace_me_with_concat(self) -> None: - pass - - @Dec.meta(CruIterableMeta.to_set) - def to_set(self) -> None: - pass - - @Dec.meta(CruIterableMeta.to_list) - def to_list(self) -> None: - pass - - @Dec.meta(CruIterableMeta.all) - def all(self) -> None: - pass - - @Dec.meta(CruIterableMeta.any) - def any(self) -> None: - pass - - @Dec.meta(CruIterableMeta.foreach) - def foreach(self) -> None: - pass - - @Dec.wrap - @Dec.meta(CruIterableMeta.transform) - def transform(self) -> None: - pass - - map = transform - - @Dec.wrap - @Dec.meta(CruIterableMeta.filter) - def filter(self) -> None: - pass - - @Dec.wrap - @Dec.meta(CruIterableMeta.continue_if) - def continue_if(self) -> None: - pass - - @Dec.wrap - @Dec.meta(CruIterableMeta.first_n) - def first_n(self) -> None: - pass - - @Dec.wrap - @Dec.meta(CruIterableMeta.drop_n) - def drop_n(self) -> None: - pass - - @Dec.wrap - @Dec.meta(CruIterableMeta.flatten) - def flatten(self) -> None: - pass - - @Dec.meta(CruIterableMeta.single_or) - def single_or(self) -> None: - pass - - @Dec.wrap - def select_by_indices(self, indices: Iterable[int]) -> Iterable[_T]: + def select_by_indices(self, indices: Iterable[int]) -> _Wrapper[_T]: index_set = set(indices) max_index = max(index_set) return self.first_n(max_index + 1).filter( - _with_count(lambda i, _: i in index_set) + _Helper.with_count(lambda i, _: i in index_set) ) - @Dec.wrap - def remove_values(self, values: Iterable[Any]) -> Iterable[_V]: + def remove_values(self, values: Iterable[Any]) -> _Wrapper[_T]: value_set = set(values) return self.filter(lambda v: v not in value_set) - @Dec.wrap def replace_values( self, old_values: Iterable[Any], new_value: _O - ) -> Iterable[_V | _O]: + ) -> Iterable[_T | _O]: value_set = set(old_values) - return self.map(lambda v: new_value if v in value_set else v) + return self.transform(lambda v: new_value if v in value_set else v) + + +class CruIterable: + Decorators = _Dec + Generic = _Generic + Helper = _Helper + Creators = _Creators + Wrapper = _Wrapper -CRU.add_objects(CruIterableMeta, CruIterableWrapper) +CRU.add_objects(CruIterable, _Wrapper) |