diff options
Diffstat (limited to 'tools')
-rw-r--r-- | tools/cru-py/cru/_decorator.py | 97 | ||||
-rw-r--r-- | tools/cru-py/cru/_func.py | 262 | ||||
-rw-r--r-- | tools/cru-py/cru/_iter.py | 674 | ||||
-rw-r--r-- | tools/cru-py/cru/_meta.py | 92 |
4 files changed, 487 insertions, 638 deletions
diff --git a/tools/cru-py/cru/_decorator.py b/tools/cru-py/cru/_decorator.py new file mode 100644 index 0000000..432ceca --- /dev/null +++ b/tools/cru-py/cru/_decorator.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +from collections.abc import Callable +from typing import ( + Concatenate, + Generic, + ParamSpec, + TypeVar, + cast, +) + +from ._cru import CRU + +_P = ParamSpec("_P") +_T = TypeVar("_T") +_O = TypeVar("_O") +_R = TypeVar("_R") + + +class CruDecorator: + + class ConvertResult(Generic[_T, _O]): + def __init__( + self, + converter: Callable[[_T], _O], + ) -> None: + self.converter = converter + + def __call__(self, origin: Callable[_P, _T]) -> Callable[_P, _O]: + converter = self.converter + + def real_impl(*args: _P.args, **kwargs: _P.kwargs) -> _O: + return converter(origin(*args, **kwargs)) + + return real_impl + + class ImplementedBy(Generic[_T, _O, _P, _R]): + def __init__( + self, + impl: Callable[Concatenate[_O, _P], _R], + converter: Callable[[_T], _O], + ) -> None: + self.impl = impl + self.converter = converter + + def __call__( + self, _origin: Callable[[_T], None] + ) -> Callable[Concatenate[_T, _P], _R]: + converter = self.converter + impl = self.impl + + def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R: + return cast(Callable[Concatenate[_O, _P], _R], impl)( + converter(_self), *args, **kwargs + ) + + return real_impl + + @staticmethod + def create_factory(converter: Callable[[_T], _O]) -> Callable[ + [Callable[Concatenate[_O, _P], _R]], + CruDecorator.ImplementedBy[_T, _O, _P, _R], + ]: + def create( + m: Callable[Concatenate[_O, _P], _R], + ) -> CruDecorator.ImplementedBy[_T, _O, _P, _R]: + return CruDecorator.ImplementedBy(m, converter) + + return create + + class ImplementedByNoSelf(Generic[_P, _R]): + def __init__(self, impl: Callable[_P, _R]) -> None: + self.impl = impl + + def __call__( + self, _origin: Callable[[_T], None] + ) -> Callable[Concatenate[_T, _P], _R]: + impl = self.impl + + def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R: + return cast(Callable[_P, _R], impl)(*args, **kwargs) + + return real_impl + + @staticmethod + def create_factory() -> ( + Callable[[Callable[_P, _R]], CruDecorator.ImplementedByNoSelf[_P, _R]] + ): + def create( + m: Callable[_P, _R], + ) -> CruDecorator.ImplementedByNoSelf[_P, _R]: + return CruDecorator.ImplementedByNoSelf(m) + + return create + + +CRU.add_objects(CruDecorator) diff --git a/tools/cru-py/cru/_func.py b/tools/cru-py/cru/_func.py index ef3da72..1b437be 100644 --- a/tools/cru-py/cru/_func.py +++ b/tools/cru-py/cru/_func.py @@ -10,59 +10,77 @@ from typing import ( ParamSpec, TypeAlias, TypeVar, - cast, ) + from ._cru import CRU from ._const import CruPlaceholder _P = ParamSpec("_P") +_P1 = ParamSpec("_P1") _T = TypeVar("_T") -_ArgsChainableCallable: TypeAlias = Callable[..., Iterable[Any]] -_KwargsChainableCallable: TypeAlias = Callable[..., Iterable[tuple[str, Any]]] -_ChainableCallable: TypeAlias = Callable[ - ..., tuple[Iterable[Any], Iterable[tuple[str, Any]]] -] +class _Dec: + @staticmethod + def wrap( + origin: Callable[_P, Callable[_P1, _T]] + ) -> Callable[_P, _Wrapper[_P1, _T]]: + def _wrapped(*args: _P.args, **kwargs: _P.kwargs) -> _Wrapper[_P1, _T]: + return _Wrapper(origin(*args, **kwargs)) + return _wrapped -class CruFunctionMeta: - class Base: - @staticmethod - def none(*_v, **_kwargs) -> None: - return None - @staticmethod - def true(*_v, **_kwargs) -> Literal[True]: - return True +class _RawBase: + @staticmethod + def none(*_v, **_kwargs) -> None: + return None - @staticmethod - def false(*_v, **_kwargs) -> Literal[False]: - return False + @staticmethod + def true(*_v, **_kwargs) -> Literal[True]: + return True - @staticmethod - def identity(v: _T) -> _T: - return v + @staticmethod + def false(*_v, **_kwargs) -> Literal[False]: + return False - @staticmethod - def only_you(v: _T, *_v, **_kwargs) -> _T: - return v + @staticmethod + def identity(v: _T) -> _T: + return v - @staticmethod - def equal(a: Any, b: Any) -> bool: - return a == b + @staticmethod + def only_you(v: _T, *_v, **_kwargs) -> _T: + return v - @staticmethod - def not_equal(a: Any, b: Any) -> bool: - return a != b + @staticmethod + def equal(a: Any, b: Any) -> bool: + return a == b - @staticmethod - def not_(v: Any) -> Any: - return not v + @staticmethod + def not_equal(a: Any, b: Any) -> bool: + return a != b @staticmethod - def bind(func: Callable[..., _T], *bind_args, **bind_kwargs) -> Callable[..., _T]: + def not_(v: Any) -> Any: + return not v + + +class _Wrapper(Generic[_P, _T]): + def __init__(self, f: Callable[_P, _T]): + self._f = f + + @property + def me(self) -> Callable[_P, _T]: + return self._f + + def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T: + return self._f(*args, **kwargs) + + @_Dec.wrap + def bind(self, *bind_args, **bind_kwargs) -> Callable[..., _T]: + func = self.me + def bound_func(*args, **kwargs): popped = 0 real_args = [] @@ -82,174 +100,74 @@ class CruFunctionMeta: KWARGS = auto() BOTH = ARGS | KWARGS - ArgsChainableCallable = _ArgsChainableCallable - KwargsChainableCallable = _KwargsChainableCallable - ChainableCallable = _ChainableCallable + ArgsChainableCallable: TypeAlias = Callable[..., Iterable[Any]] + KwargsChainableCallable: TypeAlias = Callable[..., Iterable[tuple[str, Any]]] + ChainableCallable: TypeAlias = Callable[ + ..., tuple[Iterable[Any], Iterable[tuple[str, Any]]] + ] - @staticmethod + @_Dec.wrap def chain_with_args( - funcs: Iterable[_ArgsChainableCallable], *bind_args, **bind_kwargs - ) -> _ArgsChainableCallable: + self, funcs: Iterable[ArgsChainableCallable], *bind_args, **bind_kwargs + ) -> ArgsChainableCallable: def chained_func(*args): + args = self.bind(*bind_args, **bind_kwargs)(*args) + for func in funcs: - args = CruFunctionMeta.bind(func, *bind_args, **bind_kwargs)(*args) + args = _Wrapper(func).bind(*bind_args, **bind_kwargs)(*args) return args return chained_func - @staticmethod + @_Dec.wrap def chain_with_kwargs( - funcs: Iterable[_KwargsChainableCallable], *bind_args, **bind_kwargs - ) -> _KwargsChainableCallable: + self, funcs: Iterable[KwargsChainableCallable], *bind_args, **bind_kwargs + ) -> KwargsChainableCallable: def chained_func(**kwargs): + kwargs = self.bind(*bind_args, **bind_kwargs)(**kwargs) for func in funcs: - kwargs = CruFunctionMeta.bind(func, *bind_args, **bind_kwargs)(**kwargs) + kwargs = _Wrapper(func).bind(func, *bind_args, **bind_kwargs)(**kwargs) return kwargs return chained_func - @staticmethod + @_Dec.wrap def chain_with_both( - funcs: Iterable[_ChainableCallable], *bind_args, **bind_kwargs - ) -> _ChainableCallable: + self, funcs: Iterable[ChainableCallable], *bind_args, **bind_kwargs + ) -> ChainableCallable: def chained_func(*args, **kwargs): for func in funcs: - args, kwargs = CruFunctionMeta.bind(func, *bind_args, **bind_kwargs)( + args, kwargs = _Wrapper(func).bind(func, *bind_args, **bind_kwargs)( *args, **kwargs ) return args, kwargs return chained_func - @staticmethod - def chain( - mode: ChainMode, - funcs: Iterable[ - _ArgsChainableCallable | _KwargsChainableCallable | _ChainableCallable - ], - *bind_args, - **bind_kwargs, - ) -> _ArgsChainableCallable | _KwargsChainableCallable | _ChainableCallable: - if mode == CruFunctionMeta.ChainMode.ARGS: - return CruFunctionMeta.chain_with_args( - cast(Iterable[_ArgsChainableCallable], funcs), - *bind_args, - **bind_kwargs, - ) - elif mode == CruFunctionMeta.ChainMode.KWARGS: - return CruFunctionMeta.chain_with_kwargs( - cast(Iterable[_KwargsChainableCallable], funcs), - *bind_args, - **bind_kwargs, - ) - elif mode == CruFunctionMeta.ChainMode.BOTH: - return CruFunctionMeta.chain_with_both( - cast(Iterable[_ChainableCallable], funcs), *bind_args, **bind_kwargs - ) - - -class CruFunction(Generic[_P, _T]): - - def __init__(self, f: Callable[_P, _T]): - self._f = f - - @property - def me(self) -> Callable[_P, _T]: - return self._f - def bind(self, *bind_args, **bind_kwargs) -> CruFunction[..., _T]: - return CruFunction(CruFunctionMeta.bind(self._f, *bind_args, **bind_kwargs)) +class _Base: + none = _Wrapper(_RawBase.none) + true = _Wrapper(_RawBase.true) + false = _Wrapper(_RawBase.false) + identity = _Wrapper(_RawBase.identity) + only_you = _Wrapper(_RawBase.only_you) + equal = _Wrapper(_RawBase.equal) + not_equal = _Wrapper(_RawBase.not_equal) + not_ = _Wrapper(_RawBase.not_) - def _iter_with_self( - self, funcs: Iterable[Callable[..., Any]] - ) -> Iterable[Callable[..., Any]]: - yield self - yield from funcs +class _Creators: @staticmethod - def chain_with_args( - self, - funcs: Iterable[_ArgsChainableCallable], - *bind_args, - **bind_kwargs, - ) -> _ArgsChainableCallable: - return CruFunction( - CruFunctionMeta.chain_with_args( - self._iter_with_self(funcs), *bind_args, **bind_kwargs - ) - ) + def make_isinstance_of_types(*types: type) -> Callable: + return _Wrapper(lambda v: type(v) in types) - def chain_with_kwargs( - self, funcs: Iterable[_KwargsChainableCallable], *bind_args, **bind_kwargs - ) -> _KwargsChainableCallable: - return CruFunction( - CruFunctionMeta.chain_with_kwargs( - self._iter_with_self(funcs), *bind_args, **bind_kwargs - ) - ) - def chain_with_both( - self, funcs: Iterable[_ChainableCallable], *bind_args, **bind_kwargs - ) -> _ChainableCallable: - return CruFunction( - CruFunctionMeta.chain_with_both( - self._iter_with_self(funcs), *bind_args, **bind_kwargs - ) - ) - - def chain( - self, - mode: CruFunctionChainMode, - funcs: Iterable[ - _ArgsChainableCallable | _KwargsChainableCallable | _ChainableCallable - ], - *bind_args, - **bind_kwargs, - ) -> _ArgsChainableCallable | _KwargsChainableCallable | _ChainableCallable: - return CruFunction( - CruFunctionMeta.chain( - mode, self._iter_with_self(funcs), *bind_args, **bind_kwargs - ) - ) +class CruFunction: + RawBase = _RawBase + Base = _Base + Creators = _Creators + Wrapper = _Wrapper + Decorators = _Dec - def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T: - return self._f(*args, **kwargs) - @staticmethod - def make_chain( - mode: CruFunctionChainMode, - funcs: Iterable[ - _ArgsChainableCallable | _KwargsChainableCallable | _ChainableCallable - ], - *bind_args, - **bind_kwargs, - ) -> CruFunction: - return CruFunction( - CruFunctionMeta.chain(mode, funcs, *bind_args, **bind_kwargs) - ) - - -class CruWrappedFunctions: - none = CruFunction(CruRawFunctions.none) - true = CruFunction(CruRawFunctions.true) - false = CruFunction(CruRawFunctions.false) - identity = CruFunction(CruRawFunctions.identity) - only_you = CruFunction(CruRawFunctions.only_you) - equal = CruFunction(CruRawFunctions.equal) - not_equal = CruFunction(CruRawFunctions.not_equal) - not_ = CruFunction(CruRawFunctions.not_) - - -class CruFunctionGenerators: - @staticmethod - def make_isinstance_of_types(*types: type) -> Callable: - return CruFunction(lambda v: type(v) in types) - - -CRU.add_objects( - CruRawFunctions, - CruFunctionMeta, - CruFunction, - CruWrappedFunctions, - CruFunctionGenerators, -) +CRU.add_objects(CruFunction) diff --git a/tools/cru-py/cru/_iter.py b/tools/cru-py/cru/_iter.py index da849e8..83a9513 100644 --- a/tools/cru-py/cru/_iter.py +++ b/tools/cru-py/cru/_iter.py @@ -8,6 +8,7 @@ from typing import ( Generator, Iterator, Literal, + Never, Self, TypeAlias, TypeVar, @@ -15,11 +16,9 @@ from typing import ( Any, Generic, cast, - overload, ) from ._cru import CRU -from ._meta import CruDecCreators from ._const import CruNotFound _P = ParamSpec("_P") @@ -28,208 +27,221 @@ _O = TypeVar("_O") _V = TypeVar("_V") _R = TypeVar("_R") -CanBeList: TypeAlias = Iterable[_T] | _T | None -ElementOperation: TypeAlias = Callable[[_T], Any] -ElementPredicate: TypeAlias = Callable[[_T], bool] -AnyElementPredicate: TypeAlias = ElementPredicate[Any] -ElementTransformer: TypeAlias = Callable[[_T], _O] -SelfElementTransformer: TypeAlias = ElementTransformer[_T, _T] -AnyElementTransformer: TypeAlias = ElementTransformer[Any, Any] +class _Generic: + class StepActionKind(Enum): + SKIP = 0 + PUSH = 1 + STOP = 2 + AGGREGATE = 3 + @dataclass + class StepAction(Generic[_V, _R]): + value: Iterable[Self] | _V | _R | None + kind: _Generic.StepActionKind -class _StepActionKind(Enum): - SKIP = 0 - PUSH = 1 - STOP = 2 - AGGREGATE = 3 + @property + def push_value(self) -> _V: + assert self.kind == _Generic.StepActionKind.PUSH + return cast(_V, self.value) + @property + def stop_value(self) -> _R: + assert self.kind == _Generic.StepActionKind.STOP + return cast(_R, self.value) -@dataclass -class _StepAction(Generic[_V, _R]): - value: Iterable[Self] | _V | _R | None - kind: _StepActionKind + @staticmethod + def skip() -> _Generic.StepAction[_V, _R]: + return _Generic.StepAction(None, _Generic.StepActionKind.SKIP) - @property - def push_value(self) -> _V: - assert self.kind == _StepActionKind.PUSH - return cast(_V, self.value) + @staticmethod + def push(value: _V | None) -> _Generic.StepAction[_V, _R]: + return _Generic.StepAction(value, _Generic.StepActionKind.PUSH) - @property - def stop_value(self) -> _R: - assert self.kind == _StepActionKind.STOP - return cast(_R, self.value) + @staticmethod + def stop(value: _R | None = None) -> _Generic.StepAction[_V, _R]: + return _Generic.StepAction(value, _Generic.StepActionKind.STOP) - @staticmethod - def skip() -> _StepAction[_V, _R]: - return _StepAction(None, _StepActionKind.SKIP) + @staticmethod + def aggregate( + *results: _Generic.StepAction[_V, _R] + ) -> _Generic.StepAction[_V, _R]: + return _Generic.StepAction(results, _Generic.StepActionKind.AGGREGATE) - @staticmethod - def push(value: _V | None) -> _StepAction[_V, _R]: - return _StepAction(value, _StepActionKind.PUSH) + @staticmethod + def push_last(value: _V | None) -> _Generic.StepAction[_V, _R]: + return _Generic.StepAction.aggregate( + _Generic.StepAction.push(value), _Generic.StepAction.stop() + ) - @staticmethod - def stop(value: _R | None = None) -> _StepAction[_V, _R]: - return _StepAction(value, _StepActionKind.STOP) + def flatten(self) -> Iterable[Self]: + return _Generic.flatten( + self, + is_leave=lambda r: r.kind != _Generic.StepActionKind.AGGREGATE, + get_children=lambda r: cast(Iterable[Self], r.value), + ) + + GeneralStepAction: TypeAlias = StepAction[_V, _R] | _V | _R | None + IterateOperation: TypeAlias = Callable[[_T, int], GeneralStepAction[_V, _R]] + IteratePreHook: TypeAlias = Callable[[Iterable[_T]], GeneralStepAction[_V, _R]] + IteratePostHook: TypeAlias = Callable[[int], GeneralStepAction[_V, _R]] @staticmethod - def aggregate(*results: _StepAction[_V, _R]) -> _StepAction[_V, _R]: - return _StepAction(results, _StepActionKind.AGGREGATE) + def _is_not_iterable(o: Any) -> bool: + return not isinstance(o, Iterable) @staticmethod - def push_last(value: _V | None) -> _StepAction[_V, _R]: - return _StepAction.aggregate(_StepAction.push(value), _StepAction.stop()) - - def flatten(self) -> Iterable[Self]: - return CruIterableMeta.flatten( - self, - is_leave=lambda r: r.kind != _StepActionKind.AGGREGATE, - get_children=lambda r: cast(Iterable[Self], r.value), - ) + def _return_self(o): + return o + @staticmethod + def iterable_flatten( + maybe_iterable: Iterable[_T] | _T, max_depth: int = -1, *, _depth: int = 0 + ) -> Iterable[Iterable[_T] | _T]: + if _depth == max_depth or not isinstance(maybe_iterable, Iterable): + yield maybe_iterable + return -_GeneralStepAction: TypeAlias = _StepAction[_V, _R] | _V | _R | None -_IterateOperation = Callable[[_T, int], _GeneralStepAction[_V, _R]] -_IteratePreHook = Callable[[Iterable[_T]], _GeneralStepAction[_V, _R]] -_IteratePostHook = Callable[[int], _GeneralStepAction[_V, _R]] + for child in maybe_iterable: + yield from _Generic.iterable_flatten( + child, + max_depth, + _depth=_depth + 1, + ) + @staticmethod + def flatten( + o: _O, + max_depth: int = -1, + /, + is_leave: _Wrapper.ElementPredicate[_O] = _is_not_iterable, + get_children: _Wrapper.ElementTransformer[_O, Iterable[_O]] = _return_self, + *, + _depth: int = 0, + ) -> Iterable[_O]: + if _depth == max_depth or is_leave(o): + yield o + return + for child in get_children(o): + yield from _Generic.flatten( + child, + max_depth, + is_leave, + get_children, + _depth=_depth + 1, + ) -class CruIterableMeta: - class Generic: - StepActionKind = _StepActionKind - StepAction = _StepAction - GeneralStepAction = _GeneralStepAction - IterateOperation = _IterateOperation - IteratePreHook = _IteratePreHook - IteratePostHook = _IteratePostHook + class Results: + @staticmethod + def true(_) -> Literal[True]: + return True - class Results: - @staticmethod - def true(_) -> Literal[True]: - return True + @staticmethod + def false(_) -> Literal[False]: + return False - @staticmethod - def false(_) -> Literal[False]: - return False + @staticmethod + def not_found(_) -> Literal[CruNotFound.VALUE]: + return CruNotFound.VALUE - @staticmethod - def not_found(_) -> Literal[CruNotFound.VALUE]: - return CruNotFound.VALUE + @staticmethod + def _non_result_to_push(value: Any) -> StepAction[_V, _R]: + return _Generic.StepAction.push(value) - @staticmethod - def _non_result_to_push(value: Any) -> _StepAction[_V, _R]: - return _StepAction.push(value) + @staticmethod + def _non_result_to_stop(value: Any) -> StepAction[_V, _R]: + return _Generic.StepAction.stop(value) - @staticmethod - def _non_result_to_stop(value: Any) -> _StepAction[_V, _R]: - return _StepAction.stop(value) + @staticmethod + def _none_hook(_: Any) -> StepAction[_V, _R]: + return _Generic.StepAction.skip() - @staticmethod - def _none_hook(_: Any) -> _StepAction[_V, _R]: - return _StepAction.skip() - - def iterate( - iterable: Iterable[_T], - operation: _IterateOperation[_T, _V, _R], - fallback_return: _R, - pre_iterate: _IteratePreHook[_T, _V, _R], - post_iterate: _IteratePostHook[_V, _R], - convert_value_result: Callable[[_V | _R | None], _StepAction[_V, _R]], - ) -> Generator[_V, None, _R]: - pre_result = pre_iterate(iterable) - if not isinstance(pre_result, _StepAction): - real_pre_result = convert_value_result(pre_result) - for r in real_pre_result.flatten(): - if r.kind == _StepActionKind.STOP: - return r.stop_value - elif r.kind == _StepActionKind.PUSH: - yield r.push_value - else: - assert r.kind == _StepActionKind.SKIP - - for index, element in enumerate(iterable): - result = operation(element, index) - if not isinstance(result, _StepAction): - real_result = convert_value_result(result) - for r in real_result.flatten(): - if r.kind == _StepActionKind.STOP: - return r.stop_value - elif r.kind == _StepActionKind.PUSH: - yield r.push_value - else: - assert r.kind == _StepActionKind.SKIP - continue - - post_result = post_iterate(index + 1) - if not isinstance(post_result, _StepAction): - real_post_result = convert_value_result(post_result) - for r in real_post_result.flatten(): - if r.kind == _StepActionKind.STOP: + def iterate( + iterable: Iterable[_T], + operation: IterateOperation[_T, _V, _R], + fallback_return: _R, + pre_iterate: IteratePreHook[_T, _V, _R], + post_iterate: IteratePostHook[_V, _R], + convert_value_result: Callable[[_V | _R | None], StepAction[_V, _R]], + ) -> Generator[_V, None, _R]: + pre_result = pre_iterate(iterable) + if not isinstance(pre_result, _Generic.StepAction): + real_pre_result = convert_value_result(pre_result) + for r in real_pre_result.flatten(): + if r.kind == _Generic.StepActionKind.STOP: + return r.stop_value + elif r.kind == _Generic.StepActionKind.PUSH: + yield r.push_value + else: + assert r.kind == _Generic.StepActionKind.SKIP + + for index, element in enumerate(iterable): + result = operation(element, index) + if not isinstance(result, _Generic.StepAction): + real_result = convert_value_result(result) + for r in real_result.flatten(): + if r.kind == _Generic.StepActionKind.STOP: return r.stop_value - elif r.kind == _StepActionKind.PUSH: + elif r.kind == _Generic.StepActionKind.PUSH: yield r.push_value else: - assert r.kind == _StepActionKind.SKIP - - return fallback_return - - def create_new( - iterable: Iterable[_T], - operation: _IterateOperation[_T, _V, _R], - fallback_return: _R, - /, - pre_iterate: _IteratePreHook[_T, _V, _R] | None = None, - post_iterate: _IteratePostHook[_V, _R] | None = None, - ) -> Generator[_V, None, _R]: - return CruIterableMeta.Generic.iterate( + assert r.kind == _Generic.StepActionKind.SKIP + continue + + post_result = post_iterate(index + 1) + if not isinstance(post_result, _Generic.StepAction): + real_post_result = convert_value_result(post_result) + for r in real_post_result.flatten(): + if r.kind == _Generic.StepActionKind.STOP: + return r.stop_value + elif r.kind == _Generic.StepActionKind.PUSH: + yield r.push_value + else: + assert r.kind == _Generic.StepActionKind.SKIP + + return fallback_return + + def create_new( + iterable: Iterable[_T], + operation: IterateOperation[_T, _V, _R], + fallback_return: _R, + /, + pre_iterate: IteratePreHook[_T, _V, _R] | None = None, + post_iterate: IteratePostHook[_V, _R] | None = None, + ) -> Generator[_V, None, _R]: + return _Generic.iterate( + iterable, + operation, + fallback_return, + pre_iterate or _Generic._none_hook, + post_iterate or _Generic._none_hook, + _Generic._non_result_to_push, + ) + + def get_result( + iterable: Iterable[_T], + operation: IterateOperation[_T, _V, _R], + fallback_return: _R, + /, + pre_iterate: IteratePreHook[_T, _V, _R] | None = None, + post_iterate: IteratePostHook[_V, _R] | None = None, + ) -> _R: + try: + for _ in _Generic.iterate( iterable, operation, fallback_return, - pre_iterate or CruIterableMeta.Generic._none_hook, - post_iterate or CruIterableMeta.Generic._none_hook, - CruIterableMeta.Generic._non_result_to_push, - ) + pre_iterate or _Generic._none_hook, + post_iterate or _Generic._none_hook, + _Generic._non_result_to_stop, + ): + pass + except StopIteration as stop: + return stop.value + raise RuntimeError("Should not reach here") - def get_result( - iterable: Iterable[_T], - operation: _IterateOperation[_T, _V, _R], - fallback_return: _R, - /, - pre_iterate: _IteratePreHook[_T, _V, _R] | None = None, - post_iterate: _IteratePostHook[_V, _R] | None = None, - ) -> _R: - try: - for _ in CruIterableMeta.Generic.iterate( - iterable, - operation, - fallback_return, - pre_iterate or CruIterableMeta.Generic._none_hook, - post_iterate or CruIterableMeta.Generic._none_hook, - CruIterableMeta.Generic._non_result_to_stop, - ): - pass - except StopIteration as stop: - return stop.value - raise RuntimeError("Should not reach here") - - class Creators: - @staticmethod - def empty() -> Iterable[_T]: - return iter([]) - - @staticmethod - def range(*args) -> Iterable[int]: - return iter(range(*args)) - - @staticmethod - def unite(*args: _O) -> Iterable[_O]: - return iter(args) - - @staticmethod - def concat(*iterables: Iterable[_T]) -> Iterable[_T]: - for iterable in iterables: - yield from iterable +class _Helper: @staticmethod def with_count(c: Callable[Concatenate[int, _P], _O]) -> Callable[_P, _O]: count = 0 @@ -242,78 +254,139 @@ class CruIterableMeta: return wrapper + +class _Dec: + + @staticmethod + def wrap(origin: Callable[_P, Iterable[_T]]) -> Callable[_P, _Wrapper[_T]]: + def _wrapped(*args: _P.args, **kwargs: _P.kwargs) -> _Wrapper[_T]: + return _Wrapper(origin(*args, **kwargs)) + + return _wrapped + + +class _Creators: @staticmethod - def to_set(iterable: Iterable[_T], discard: Iterable[Any]) -> set[_T]: - return set(iterable) - set(discard) + @_Dec.wrap + def empty() -> Iterable[Never]: + return iter([]) @staticmethod - def to_list(iterable: Iterable[_T], discard: Iterable[Any]) -> list[_T]: - return [v for v in iterable if v not in set(discard)] + @_Dec.wrap + def range(*args) -> Iterable[int]: + return iter(range(*args)) @staticmethod - def all(iterable: Iterable[_T], predicate: ElementPredicate[_T]) -> bool: - for value in iterable: + @_Dec.wrap + def unite(*args: _O) -> Iterable[_O]: + return iter(args) + + @staticmethod + @_Dec.wrap + def concat(*iterables: Iterable[_T]) -> Iterable[_T]: + for iterable in iterables: + yield from iterable + + +class _Wrapper(Generic[_T]): + ElementOperation: TypeAlias = Callable[[_V], Any] + ElementPredicate: TypeAlias = Callable[[_V], bool] + AnyElementPredicate: TypeAlias = ElementPredicate[Any] + ElementTransformer: TypeAlias = Callable[[_V], _O] + SelfElementTransformer: TypeAlias = ElementTransformer[_V, _V] + AnyElementTransformer: TypeAlias = ElementTransformer[Any, Any] + + def __init__( + self, + iterable: Iterable[_T], + ) -> None: + self._iterable = iterable + + def __iter__(self) -> Iterator[_T]: + return self._iterable.__iter__() + + @property + def me(self) -> Iterable[_T]: + return self._iterable + + _wrap = _Dec.wrap + + @_wrap + def replace_me(self, iterable: Iterable[_O]) -> Iterable[_O]: + return iterable + + def replace_me_with_empty(self) -> _Wrapper[Never]: + return _Creators.empty() + + def replace_me_with_range(self, *args) -> _Wrapper[int]: + return _Creators.range(*args) + + def replace_me_with_unite(self, *args: _O) -> _Wrapper[_O]: + return _Creators.unite(*args) + + def replace_me_with_concat(self, *iterables: Iterable[_T]) -> _Wrapper[_T]: + return _Creators.concat(*iterables) + + def to_set(self, discard: Iterable[Any]) -> set[_T]: + return set(self.me) - set(discard) + + def to_list(self, discard: Iterable[Any]) -> list[_T]: + return [v for v in self.me if v not in set(discard)] + + def all(self, predicate: ElementPredicate[_T]) -> bool: + for value in self.me: if not predicate(value): return False return True - @staticmethod - def any(iterable: Iterable[_T], predicate: ElementPredicate[_T]) -> bool: - for value in iterable: + def any(self, predicate: ElementPredicate[_T]) -> bool: + for value in self.me: if predicate(value): return True return False - @staticmethod - def foreach(iterable: Iterable[_T], operation: ElementOperation[_T]) -> None: - for value in iterable: + def foreach(self, operation: ElementOperation[_T]) -> None: + for value in self.me: operation(value) - @staticmethod - def transform( - iterable: Iterable[_T], transformer: ElementTransformer[_T, _O] - ) -> Iterable[_O]: - for value in iterable: + @_wrap + def transform(self, transformer: ElementTransformer[_T, _O]) -> Iterable[_O]: + for value in self.me: yield transformer(value) - @staticmethod - def filter(iterable: Iterable[_T], predicate: ElementPredicate[_T]) -> Iterable[_T]: - for value in iterable: + map = transform + + @_wrap + def filter(self, predicate: ElementPredicate[_T]) -> Iterable[_T]: + for value in self.me: if predicate(value): yield value - @staticmethod - def continue_if( - iterable: Iterable[_T], predicate: ElementPredicate[_T] - ) -> Iterable[_T]: - for value in iterable: + @_wrap + def continue_if(self, predicate: ElementPredicate[_T]) -> Iterable[_T]: + for value in self.me: yield value if not predicate(value): break - @staticmethod - def first_n(iterable: Iterable[_T], max_count: int) -> Iterable[_T]: + def first_n(self, max_count: int) -> _Wrapper[_T]: if max_count < 0: raise ValueError("max_count must be 0 or positive.") if max_count == 0: - return CruIterableMeta.Creators.empty() - return CruIterableMeta.continue_if( - iterable, _with_count(lambda i, _: i < max_count - 1) - ) + return self.replace_me_with_empty() # type: ignore + return self.continue_if(_Helper.with_count(lambda i, _: i < max_count - 1)) - @staticmethod - def drop_n(iterable: Iterable[_T], n: int) -> Iterable[_T]: + def drop_n(self, n: int) -> _Wrapper[_T]: if n < 0: raise ValueError("n must be 0 or positive.") if n == 0: - return iterable - return CruIterableMeta.filter(iterable, _with_count(lambda i, _: i < n)) + return self + return self.filter(_Helper.with_count(lambda i, _: i < n)) - @staticmethod def single_or( - iterable: Iterable[_T], fallback: _O | CruNotFound | None = CruNotFound.VALUE + self, fallback: _O | CruNotFound | None = CruNotFound.VALUE ) -> _T | _O | Any | CruNotFound: - first_2 = CruIterableMeta.first_n(iterable, 2) + first_2 = self.first_n(2) has_value = False value = None for element in first_2: @@ -326,181 +399,34 @@ class CruIterableMeta: else: return fallback - @staticmethod - def _is_not_iterable(o: Any) -> bool: - return not isinstance(o, Iterable) - - @staticmethod - def _return_self(o): - return o - - @staticmethod - @overload - def flatten(o: Iterable[_T], max_depth: int = -1) -> Iterable[_T]: ... - - @staticmethod - @overload - def flatten( - o: _O, - max_depth: int = -1, - *, - is_leave: ElementPredicate[_O], - get_children: ElementTransformer[_O, Iterable[_O]], - ) -> Iterable[_O]: ... - - @staticmethod - def flatten( - o: _O, - max_depth: int = -1, - *, - is_leave: ElementPredicate[_O] = _is_not_iterable, - get_children: ElementTransformer[_O, Iterable[_O]] = _return_self, - _depth: int = 0, - ) -> Iterable[Any]: - if _depth == max_depth or is_leave(o): - yield o - return - for child in get_children(o): - yield from CruIterableMeta.flatten( - child, - max_depth, - is_leave=is_leave, - get_children=get_children, - _depth=_depth + 1, - ) # type: ignore - - -_with_count = CruIterableMeta.with_count - - -class CruIterableWrapper(Generic[_T]): - Meta = CruIterableMeta - - class Dec: - @staticmethod - def _wrap(iterable: Iterable[_O]) -> CruIterableWrapper[_O]: - return CruIterableWrapper(iterable) - - wrap = CruDecCreators.convert_result(_wrap) - - @staticmethod - def _as_iterable(_self: CruIterableWrapper[_O]) -> CruIterableWrapper[_O]: - return _self - - meta = CruDecCreators.create_implement_by(_as_iterable) - meta_no_self = CruDecCreators.create_implement_by_no_self() - - def __init__( - self, - iterable: Iterable[_T], - ) -> None: - self._iterable = iterable - - def __iter__(self) -> Iterator[_T]: - return self._iterable.__iter__() - - @property - def me(self) -> Iterable[_T]: - return self._iterable - - def replace_me(self, iterable: Iterable[_O]) -> CruIterableWrapper[_O]: - return CruIterableWrapper(iterable) - - @Dec.wrap - @Dec.meta_no_self(CruIterableMeta.Creators.empty) - def replace_me_with_empty(self) -> None: - pass - - @Dec.wrap - @Dec.meta_no_self(CruIterableMeta.Creators.range) - def replace_me_with_range(self) -> None: - pass - - @Dec.wrap - @Dec.meta_no_self(CruIterableMeta.Creators.unite) - def replace_me_with_unite(self) -> None: - pass + @_wrap + def flatten(self) -> Iterable[_T | Iterable[_T]]: + return _Generic.iterable_flatten(self.me) - @Dec.wrap - @Dec.meta_no_self(CruIterableMeta.Creators.concat) - def replace_me_with_concat(self) -> None: - pass - - @Dec.meta(CruIterableMeta.to_set) - def to_set(self) -> None: - pass - - @Dec.meta(CruIterableMeta.to_list) - def to_list(self) -> None: - pass - - @Dec.meta(CruIterableMeta.all) - def all(self) -> None: - pass - - @Dec.meta(CruIterableMeta.any) - def any(self) -> None: - pass - - @Dec.meta(CruIterableMeta.foreach) - def foreach(self) -> None: - pass - - @Dec.wrap - @Dec.meta(CruIterableMeta.transform) - def transform(self) -> None: - pass - - map = transform - - @Dec.wrap - @Dec.meta(CruIterableMeta.filter) - def filter(self) -> None: - pass - - @Dec.wrap - @Dec.meta(CruIterableMeta.continue_if) - def continue_if(self) -> None: - pass - - @Dec.wrap - @Dec.meta(CruIterableMeta.first_n) - def first_n(self) -> None: - pass - - @Dec.wrap - @Dec.meta(CruIterableMeta.drop_n) - def drop_n(self) -> None: - pass - - @Dec.wrap - @Dec.meta(CruIterableMeta.flatten) - def flatten(self) -> None: - pass - - @Dec.meta(CruIterableMeta.single_or) - def single_or(self) -> None: - pass - - @Dec.wrap - def select_by_indices(self, indices: Iterable[int]) -> Iterable[_T]: + def select_by_indices(self, indices: Iterable[int]) -> _Wrapper[_T]: index_set = set(indices) max_index = max(index_set) return self.first_n(max_index + 1).filter( - _with_count(lambda i, _: i in index_set) + _Helper.with_count(lambda i, _: i in index_set) ) - @Dec.wrap - def remove_values(self, values: Iterable[Any]) -> Iterable[_V]: + def remove_values(self, values: Iterable[Any]) -> _Wrapper[_T]: value_set = set(values) return self.filter(lambda v: v not in value_set) - @Dec.wrap def replace_values( self, old_values: Iterable[Any], new_value: _O - ) -> Iterable[_V | _O]: + ) -> Iterable[_T | _O]: value_set = set(old_values) - return self.map(lambda v: new_value if v in value_set else v) + return self.transform(lambda v: new_value if v in value_set else v) + + +class CruIterable: + Decorators = _Dec + Generic = _Generic + Helper = _Helper + Creators = _Creators + Wrapper = _Wrapper -CRU.add_objects(CruIterableMeta, CruIterableWrapper) +CRU.add_objects(CruIterable, _Wrapper) diff --git a/tools/cru-py/cru/_meta.py b/tools/cru-py/cru/_meta.py deleted file mode 100644 index 86763cf..0000000 --- a/tools/cru-py/cru/_meta.py +++ /dev/null @@ -1,92 +0,0 @@ -from collections.abc import Callable -from typing import Concatenate, Generic, ParamSpec, TypeVar - -_P = ParamSpec("_P") -_T = TypeVar("_T") -_O = TypeVar("_O") -_R = TypeVar("_R") - - -class CruDecorator: - - class ImplementedBy(Generic[_O, _P, _R]): - # TODO: Continue here tomorrow! - def __init__( - self, impl: Callable[Concatenate[_O, _P], _R], pass_self: bool - ) -> None: - self.impl = impl - self.pass_self = pass_self - - def __call__( - self, _origin: Callable[[_T], None] - ) -> Callable[Concatenate[_T, _P], _R]: - def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R: - if self.pass_self: - return self.impl(_self, *args, **kwargs) - else: - return self.impl(*args, **kwargs) - - return real_impl - - @staticmethod - def convert_result( - converter: Callable[[_T], _O] - ) -> Callable[[Callable[_P, _T]], Callable[_P, _O]]: - def dec(original: Callable[_P, _T]) -> Callable[_P, _O]: - def wrapped(*args: _P.args, **kwargs: _P.kwargs) -> _O: - return converter(original(*args, **kwargs)) - - return wrapped - - return dec - - @staticmethod - def create_implement_by(converter: Callable[[_T], _O]) -> Callable[ - [Callable[Concatenate[_O, _P], _R]], - Callable[ - [Callable[[_T], None]], - Callable[Concatenate[_T, _P], _R], - ], - ]: - def implement_by( - m: Callable[Concatenate[_O, _P], _R], - ) -> Callable[ - [Callable[[_T], None]], - Callable[Concatenate[_T, _P], _R], - ]: - def implementation(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R: - return m(converter(_self), *args, **kwargs) - - def decorator(_: Callable[[_T], None]) -> Callable[Concatenate[_T, _P], _R]: - return implementation - - return decorator - - return implement_by - - @staticmethod - def create_implement_by_no_self() -> Callable[ - [Callable[_P, _R]], - Callable[ - [Callable[[_T], None]], - Callable[Concatenate[_T, _P], _R], - ], - ]: - def implement_by_no_self( - m: Callable[_P, _R], - ) -> Callable[ - [Callable[[_T], None]], - Callable[Concatenate[_T, _P], _R], - ]: - def implementation(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R: - return m(*args, **kwargs) - - def decorator(_: Callable[[_T], None]) -> Callable[Concatenate[_T, _P], _R]: - return implementation - - return decorator - - return implement_by_no_self - - -CRU.add |