diff options
Diffstat (limited to 'tools/cru-py/cru/_iter.py')
-rw-r--r-- | tools/cru-py/cru/_iter.py | 180 |
1 files changed, 83 insertions, 97 deletions
diff --git a/tools/cru-py/cru/_iter.py b/tools/cru-py/cru/_iter.py index bf1dfde..b91195b 100644 --- a/tools/cru-py/cru/_iter.py +++ b/tools/cru-py/cru/_iter.py @@ -1,12 +1,10 @@ from __future__ import annotations -from collections.abc import Iterable, Callable +from collections.abc import Iterable, Callable, Generator, Iterator from dataclasses import dataclass from enum import Enum from typing import ( Concatenate, - Generator, - Iterator, Literal, Never, Self, @@ -114,8 +112,8 @@ class _Generic: o: _O, max_depth: int = -1, /, - is_leave: _Wrapper.ElementPredicate[_O] = _is_not_iterable, - get_children: _Wrapper.ElementTransformer[_O, Iterable[_O]] = _return_self, + is_leave: CruIterator.ElementPredicate[_O] = _is_not_iterable, + get_children: CruIterator.ElementTransformer[_O, Iterable[_O]] = _return_self, *, _depth: int = 0, ) -> Iterable[_O]: @@ -241,9 +239,9 @@ class _Generic: cru_unreachable() -class _Helper: +class _Helpers: @staticmethod - def with_count(c: Callable[Concatenate[int, _P], _O]) -> Callable[_P, _O]: + def auto_count(c: Callable[Concatenate[int, _P], _O]) -> Callable[_P, _O]: count = 0 def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _O: @@ -257,49 +255,42 @@ class _Helper: class _Creators: - @staticmethod - def empty( - *, - wrapper_type: type["_Wrapper[Never]"] | None = None, - attr: dict[str, Any] | None = None, - ) -> _Wrapper[Never]: - wrapper_type = wrapper_type or _Wrapper - return wrapper_type(iter([]), attr) + class Raw: + @staticmethod + def empty() -> Iterator[Never]: + return iter([]) - @staticmethod - def range( - *args, - wrapper_type: type["_Wrapper[int]"] | None = None, - attr: dict[str, Any] | None = None, - ) -> _Wrapper[int]: - wrapper_type = wrapper_type or _Wrapper - return wrapper_type(iter(range(*args)), attr) + @staticmethod + def range(*args) -> Iterator[int]: + return iter(range(*args)) - @staticmethod - def unite( - *args: _O, - wrapper_type: type["_Wrapper[_O]"] | None = None, - attr: dict[str, Any] | None = None, - ) -> _Wrapper[_O]: - wrapper_type = wrapper_type or _Wrapper - return wrapper_type(iter(args), attr) + @staticmethod + def unite(*args: _T) -> Iterator[_T]: + return iter(args) - @staticmethod - def _concat(*iterables: Iterable[_T]) -> Iterable[_T]: - for iterable in iterables: - yield from iterable + @staticmethod + def _concat(*iterables: Iterable[_T]) -> Iterable[_T]: + for iterable in iterables: + yield from iterable + + @staticmethod + def concat(*iterables: Iterable[_T]) -> Iterator[_T]: + return iter(_Creators.Raw._concat(*iterables)) @staticmethod - def concat( - *iterables: Iterable[_T], - wrapper_type: type["_Wrapper[_T]"] | None = None, - attr: dict[str, Any] | None = None, - ) -> _Wrapper[_T]: - wrapper_type = wrapper_type or _Wrapper - return wrapper_type(_Creators._concat(*iterables), attr) + def _wrap(f: Callable[_P, Iterable[_O]]) -> Callable[_P, CruIterator[_O]]: + def _wrapped(*args: _P.args, **kwargs: _P.kwargs) -> CruIterator[_O]: + return CruIterator(f(*args, **kwargs)) + + return _wrapped + empty = _wrap(Raw.empty) + range = _wrap(Raw.range) + unite = _wrap(Raw.unite) + concat = _wrap(Raw.concat) -class _Wrapper(Generic[_T]): + +class CruIterator(Generic[_T]): ElementOperation: TypeAlias = Callable[[_V], Any] ElementPredicate: TypeAlias = Callable[[_V], bool] AnyElementPredicate: TypeAlias = ElementPredicate[Any] @@ -307,34 +298,26 @@ class _Wrapper(Generic[_T]): SelfElementTransformer: TypeAlias = ElementTransformer[_V, _V] AnyElementTransformer: TypeAlias = ElementTransformer[Any, Any] - def __init__( - self, iterable: Iterable[_T], attr: dict[str, Any] | None = None - ) -> None: - self._iterable = iterable - self._attr = attr or {} - - def __iter__(self) -> Iterator[_T]: - return self._iterable.__iter__() + Creators: TypeAlias = _Creators + Helpers: TypeAlias = _Helpers - @property - def me(self) -> Iterable[_T]: - return self._iterable + def __init__(self, iterable: Iterable[_T]) -> None: + self._iterator = iter(iterable) - @property - def my_attr(self) -> dict[str, Any]: - return self._attr + def __iter__(self) -> Iterator[_T]: + return self._iterator - def create_with_me(self, iterable: Iterable[_O]) -> _Wrapper[_O]: - return type(self)(iterable, self._attr) # type: ignore + def create_new_me(self, iterable: Iterable[_O]) -> CruIterator[_O]: + return type(self)(iterable) # type: ignore @staticmethod def _wrap( - f: Callable[Concatenate[_Wrapper[_T], _P], Iterable[_O]] - ) -> Callable[Concatenate[_Wrapper[_T], _P], _Wrapper[_O]]: + f: Callable[Concatenate[CruIterator[_T], _P], Iterable[_O]] + ) -> Callable[Concatenate[CruIterator[_T], _P], CruIterator[_O]]: def _wrapped( - self: _Wrapper[_T], *args: _P.args, **kwargs: _P.kwargs - ) -> _Wrapper[_O]: - return self.create_with_me(f(self, *args, **kwargs)) + self: CruIterator[_T], *args: _P.args, **kwargs: _P.kwargs + ) -> CruIterator[_O]: + return self.create_new_me(f(self, *args, **kwargs)) return _wrapped @@ -342,80 +325,79 @@ class _Wrapper(Generic[_T]): def replace_me(self, iterable: Iterable[_O]) -> Iterable[_O]: return iterable - def replace_me_with_empty(self) -> _Wrapper[Never]: - return _Creators.empty(wrapper_type=type(self), attr=self._attr) # type: ignore + def replace_me_with_empty(self) -> CruIterator[Never]: + return self.create_new_me(_Creators.Raw.empty()) - def replace_me_with_range(self, *args) -> _Wrapper[int]: - return _Creators.range(*args, attr=self._attr) + def replace_me_with_range(self, *args) -> CruIterator[int]: + return self.create_new_me(_Creators.Raw.range(*args)) - def replace_me_with_unite(self, *args: _O) -> _Wrapper[_O]: - return _Creators.unite(*args, attr=self._attr) + def replace_me_with_unite(self, *args: _O) -> CruIterator[_O]: + return self.create_new_me(_Creators.Raw.unite(*args)) - def replace_me_with_concat(self, *iterables: Iterable[_T]) -> _Wrapper[_T]: - return _Creators.concat(*iterables, attr=self._attr) + def replace_me_with_concat(self, *iterables: Iterable[_T]) -> CruIterator[_T]: + return self.create_new_me(_Creators.Raw.concat(*iterables)) - def to_set(self, discard: Iterable[Any]) -> set[_T]: - return set(self.me) - set(discard) + def to_set(self) -> set[_T]: + return set(self) - def to_list(self, discard: Iterable[Any]) -> list[_T]: - return [v for v in self.me if v not in set(discard)] + def to_list(self) -> list[_T]: + return list(self) def all(self, predicate: ElementPredicate[_T]) -> bool: - for value in self.me: + for value in self: if not predicate(value): return False return True def any(self, predicate: ElementPredicate[_T]) -> bool: - for value in self.me: + for value in self: if predicate(value): return True return False def foreach(self, operation: ElementOperation[_T]) -> None: - for value in self.me: + for value in self: operation(value) @_wrap def transform(self, transformer: ElementTransformer[_T, _O]) -> Iterable[_O]: - for value in self.me: + for value in self: yield transformer(value) map = transform @_wrap def filter(self, predicate: ElementPredicate[_T]) -> Iterable[_T]: - for value in self.me: + for value in self: if predicate(value): yield value @_wrap def continue_if(self, predicate: ElementPredicate[_T]) -> Iterable[_T]: - for value in self.me: + for value in self: yield value if not predicate(value): break - def first_n(self, max_count: int) -> _Wrapper[_T]: + def first_n(self, max_count: int) -> CruIterator[_T]: if max_count < 0: raise ValueError("max_count must be 0 or positive.") if max_count == 0: return self.replace_me_with_empty() # type: ignore - return self.continue_if(_Helper.with_count(lambda i, _: i < max_count - 1)) + return self.continue_if(_Helpers.auto_count(lambda i, _: i < max_count - 1)) - def drop_n(self, n: int) -> _Wrapper[_T]: + def drop_n(self, n: int) -> CruIterator[_T]: if n < 0: raise ValueError("n must be 0 or positive.") if n == 0: return self - return self.filter(_Helper.with_count(lambda i, _: i < n)) + return self.filter(_Helpers.auto_count(lambda i, _: i < n)) def single_or( - self, fallback: _O | CruNotFound | None = CruNotFound.VALUE - ) -> _T | _O | Any | CruNotFound: + self, fallback: _O | CruNotFound = CruNotFound.VALUE + ) -> _T | _O | CruNotFound: first_2 = self.first_n(2) has_value = False - value = None for element in first_2: if has_value: raise ValueError("More than one value found.") @@ -426,18 +408,23 @@ class _Wrapper(Generic[_T]): else: return fallback + def first_or( + self, fallback: _O | CruNotFound = CruNotFound.VALUE + ) -> _T | _O | CruNotFound: + return self.first_n(1).single_or(fallback) + @_wrap def flatten(self) -> Iterable[_T | Iterable[_T]]: - return _Generic.iterable_flatten(self.me) + return _Generic.iterable_flatten(self) - def select_by_indices(self, indices: Iterable[int]) -> _Wrapper[_T]: + def select_by_indices(self, indices: Iterable[int]) -> CruIterator[_T]: index_set = set(indices) max_index = max(index_set) return self.first_n(max_index + 1).filter( - _Helper.with_count(lambda i, _: i in index_set) + _Helpers.auto_count(lambda i, _: i in index_set) ) - def remove_values(self, values: Iterable[Any]) -> _Wrapper[_T]: + def remove_values(self, values: Iterable[Any]) -> CruIterator[_T]: value_set = set(values) return self.filter(lambda v: v not in value_set) @@ -450,9 +437,8 @@ class _Wrapper(Generic[_T]): class CruIterable: Generic: TypeAlias = _Generic - Helper: TypeAlias = _Helper - Creators: TypeAlias = _Creators - Wrapper: TypeAlias = _Wrapper + Iterator: TypeAlias = CruIterator + Helpers: TypeAlias = _Helpers -CRU.add_objects(CruIterable, _Wrapper) +CRU.add_objects(CruIterable, CruIterator) |