diff options
| author | crupest <crupest@outlook.com> | 2024-11-11 01:12:29 +0800 | 
|---|---|---|
| committer | Yuqian Yang <crupest@crupest.life> | 2024-12-18 18:31:27 +0800 | 
| commit | ea0ad38152d7ebad5daf9f6c2f5053168a63645d (patch) | |
| tree | 2195f5fe971a62fc253d021983753a18f9c81d10 | |
| parent | 6f285760e1432a6c7f60d94c15af4bb3f29ce8fa (diff) | |
| download | crupest-ea0ad38152d7ebad5daf9f6c2f5053168a63645d.tar.gz crupest-ea0ad38152d7ebad5daf9f6c2f5053168a63645d.tar.bz2 crupest-ea0ad38152d7ebad5daf9f6c2f5053168a63645d.zip | |
HALF WORK: 2024.11.29
| -rw-r--r-- | tools/cru-py/cru/_decorator.py | 97 | ||||
| -rw-r--r-- | tools/cru-py/cru/_func.py | 262 | ||||
| -rw-r--r-- | tools/cru-py/cru/_iter.py | 674 | ||||
| -rw-r--r-- | tools/cru-py/cru/_meta.py | 92 | 
4 files changed, 487 insertions, 638 deletions
| diff --git a/tools/cru-py/cru/_decorator.py b/tools/cru-py/cru/_decorator.py new file mode 100644 index 0000000..432ceca --- /dev/null +++ b/tools/cru-py/cru/_decorator.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +from collections.abc import Callable +from typing import ( +    Concatenate, +    Generic, +    ParamSpec, +    TypeVar, +    cast, +) + +from ._cru import CRU + +_P = ParamSpec("_P") +_T = TypeVar("_T") +_O = TypeVar("_O") +_R = TypeVar("_R") + + +class CruDecorator: + +    class ConvertResult(Generic[_T, _O]): +        def __init__( +            self, +            converter: Callable[[_T], _O], +        ) -> None: +            self.converter = converter + +        def __call__(self, origin: Callable[_P, _T]) -> Callable[_P, _O]: +            converter = self.converter + +            def real_impl(*args: _P.args, **kwargs: _P.kwargs) -> _O: +                return converter(origin(*args, **kwargs)) + +            return real_impl + +    class ImplementedBy(Generic[_T, _O, _P, _R]): +        def __init__( +            self, +            impl: Callable[Concatenate[_O, _P], _R], +            converter: Callable[[_T], _O], +        ) -> None: +            self.impl = impl +            self.converter = converter + +        def __call__( +            self, _origin: Callable[[_T], None] +        ) -> Callable[Concatenate[_T, _P], _R]: +            converter = self.converter +            impl = self.impl + +            def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R: +                return cast(Callable[Concatenate[_O, _P], _R], impl)( +                    converter(_self), *args, **kwargs +                ) + +            return real_impl + +        @staticmethod +        def create_factory(converter: Callable[[_T], _O]) -> Callable[ +            [Callable[Concatenate[_O, _P], _R]], +            CruDecorator.ImplementedBy[_T, _O, _P, _R], +        ]: +            def create( +                m: Callable[Concatenate[_O, _P], _R], +            ) -> CruDecorator.ImplementedBy[_T, _O, _P, _R]: +                return CruDecorator.ImplementedBy(m, converter) + +            return create + +    class ImplementedByNoSelf(Generic[_P, _R]): +        def __init__(self, impl: Callable[_P, _R]) -> None: +            self.impl = impl + +        def __call__( +            self, _origin: Callable[[_T], None] +        ) -> Callable[Concatenate[_T, _P], _R]: +            impl = self.impl + +            def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R: +                return cast(Callable[_P, _R], impl)(*args, **kwargs) + +            return real_impl + +        @staticmethod +        def create_factory() -> ( +            Callable[[Callable[_P, _R]], CruDecorator.ImplementedByNoSelf[_P, _R]] +        ): +            def create( +                m: Callable[_P, _R], +            ) -> CruDecorator.ImplementedByNoSelf[_P, _R]: +                return CruDecorator.ImplementedByNoSelf(m) + +            return create + + +CRU.add_objects(CruDecorator) diff --git a/tools/cru-py/cru/_func.py b/tools/cru-py/cru/_func.py index ef3da72..1b437be 100644 --- a/tools/cru-py/cru/_func.py +++ b/tools/cru-py/cru/_func.py @@ -10,59 +10,77 @@ from typing import (      ParamSpec,      TypeAlias,      TypeVar, -    cast,  ) +  from ._cru import CRU  from ._const import CruPlaceholder  _P = ParamSpec("_P") +_P1 = ParamSpec("_P1")  _T = TypeVar("_T") -_ArgsChainableCallable: TypeAlias = Callable[..., Iterable[Any]] -_KwargsChainableCallable: TypeAlias = Callable[..., Iterable[tuple[str, Any]]] -_ChainableCallable: TypeAlias = Callable[ -    ..., tuple[Iterable[Any], Iterable[tuple[str, Any]]] -] +class _Dec: +    @staticmethod +    def wrap( +        origin: Callable[_P, Callable[_P1, _T]] +    ) -> Callable[_P, _Wrapper[_P1, _T]]: +        def _wrapped(*args: _P.args, **kwargs: _P.kwargs) -> _Wrapper[_P1, _T]: +            return _Wrapper(origin(*args, **kwargs)) +        return _wrapped -class CruFunctionMeta: -    class Base: -        @staticmethod -        def none(*_v, **_kwargs) -> None: -            return None -        @staticmethod -        def true(*_v, **_kwargs) -> Literal[True]: -            return True +class _RawBase: +    @staticmethod +    def none(*_v, **_kwargs) -> None: +        return None -        @staticmethod -        def false(*_v, **_kwargs) -> Literal[False]: -            return False +    @staticmethod +    def true(*_v, **_kwargs) -> Literal[True]: +        return True -        @staticmethod -        def identity(v: _T) -> _T: -            return v +    @staticmethod +    def false(*_v, **_kwargs) -> Literal[False]: +        return False -        @staticmethod -        def only_you(v: _T, *_v, **_kwargs) -> _T: -            return v +    @staticmethod +    def identity(v: _T) -> _T: +        return v -        @staticmethod -        def equal(a: Any, b: Any) -> bool: -            return a == b +    @staticmethod +    def only_you(v: _T, *_v, **_kwargs) -> _T: +        return v -        @staticmethod -        def not_equal(a: Any, b: Any) -> bool: -            return a != b +    @staticmethod +    def equal(a: Any, b: Any) -> bool: +        return a == b -        @staticmethod -        def not_(v: Any) -> Any: -            return not v +    @staticmethod +    def not_equal(a: Any, b: Any) -> bool: +        return a != b      @staticmethod -    def bind(func: Callable[..., _T], *bind_args, **bind_kwargs) -> Callable[..., _T]: +    def not_(v: Any) -> Any: +        return not v + + +class _Wrapper(Generic[_P, _T]): +    def __init__(self, f: Callable[_P, _T]): +        self._f = f + +    @property +    def me(self) -> Callable[_P, _T]: +        return self._f + +    def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T: +        return self._f(*args, **kwargs) + +    @_Dec.wrap +    def bind(self, *bind_args, **bind_kwargs) -> Callable[..., _T]: +        func = self.me +          def bound_func(*args, **kwargs):              popped = 0              real_args = [] @@ -82,174 +100,74 @@ class CruFunctionMeta:          KWARGS = auto()          BOTH = ARGS | KWARGS -    ArgsChainableCallable = _ArgsChainableCallable -    KwargsChainableCallable = _KwargsChainableCallable -    ChainableCallable = _ChainableCallable +    ArgsChainableCallable: TypeAlias = Callable[..., Iterable[Any]] +    KwargsChainableCallable: TypeAlias = Callable[..., Iterable[tuple[str, Any]]] +    ChainableCallable: TypeAlias = Callable[ +        ..., tuple[Iterable[Any], Iterable[tuple[str, Any]]] +    ] -    @staticmethod +    @_Dec.wrap      def chain_with_args( -        funcs: Iterable[_ArgsChainableCallable], *bind_args, **bind_kwargs -    ) -> _ArgsChainableCallable: +        self, funcs: Iterable[ArgsChainableCallable], *bind_args, **bind_kwargs +    ) -> ArgsChainableCallable:          def chained_func(*args): +            args = self.bind(*bind_args, **bind_kwargs)(*args) +              for func in funcs: -                args = CruFunctionMeta.bind(func, *bind_args, **bind_kwargs)(*args) +                args = _Wrapper(func).bind(*bind_args, **bind_kwargs)(*args)              return args          return chained_func -    @staticmethod +    @_Dec.wrap      def chain_with_kwargs( -        funcs: Iterable[_KwargsChainableCallable], *bind_args, **bind_kwargs -    ) -> _KwargsChainableCallable: +        self, funcs: Iterable[KwargsChainableCallable], *bind_args, **bind_kwargs +    ) -> KwargsChainableCallable:          def chained_func(**kwargs): +            kwargs = self.bind(*bind_args, **bind_kwargs)(**kwargs)              for func in funcs: -                kwargs = CruFunctionMeta.bind(func, *bind_args, **bind_kwargs)(**kwargs) +                kwargs = _Wrapper(func).bind(func, *bind_args, **bind_kwargs)(**kwargs)              return kwargs          return chained_func -    @staticmethod +    @_Dec.wrap      def chain_with_both( -        funcs: Iterable[_ChainableCallable], *bind_args, **bind_kwargs -    ) -> _ChainableCallable: +        self, funcs: Iterable[ChainableCallable], *bind_args, **bind_kwargs +    ) -> ChainableCallable:          def chained_func(*args, **kwargs):              for func in funcs: -                args, kwargs = CruFunctionMeta.bind(func, *bind_args, **bind_kwargs)( +                args, kwargs = _Wrapper(func).bind(func, *bind_args, **bind_kwargs)(                      *args, **kwargs                  )              return args, kwargs          return chained_func -    @staticmethod -    def chain( -        mode: ChainMode, -        funcs: Iterable[ -            _ArgsChainableCallable | _KwargsChainableCallable | _ChainableCallable -        ], -        *bind_args, -        **bind_kwargs, -    ) -> _ArgsChainableCallable | _KwargsChainableCallable | _ChainableCallable: -        if mode == CruFunctionMeta.ChainMode.ARGS: -            return CruFunctionMeta.chain_with_args( -                cast(Iterable[_ArgsChainableCallable], funcs), -                *bind_args, -                **bind_kwargs, -            ) -        elif mode == CruFunctionMeta.ChainMode.KWARGS: -            return CruFunctionMeta.chain_with_kwargs( -                cast(Iterable[_KwargsChainableCallable], funcs), -                *bind_args, -                **bind_kwargs, -            ) -        elif mode == CruFunctionMeta.ChainMode.BOTH: -            return CruFunctionMeta.chain_with_both( -                cast(Iterable[_ChainableCallable], funcs), *bind_args, **bind_kwargs -            ) - - -class CruFunction(Generic[_P, _T]): - -    def __init__(self, f: Callable[_P, _T]): -        self._f = f - -    @property -    def me(self) -> Callable[_P, _T]: -        return self._f -    def bind(self, *bind_args, **bind_kwargs) -> CruFunction[..., _T]: -        return CruFunction(CruFunctionMeta.bind(self._f, *bind_args, **bind_kwargs)) +class _Base: +    none = _Wrapper(_RawBase.none) +    true = _Wrapper(_RawBase.true) +    false = _Wrapper(_RawBase.false) +    identity = _Wrapper(_RawBase.identity) +    only_you = _Wrapper(_RawBase.only_you) +    equal = _Wrapper(_RawBase.equal) +    not_equal = _Wrapper(_RawBase.not_equal) +    not_ = _Wrapper(_RawBase.not_) -    def _iter_with_self( -        self, funcs: Iterable[Callable[..., Any]] -    ) -> Iterable[Callable[..., Any]]: -        yield self -        yield from funcs +class _Creators:      @staticmethod -    def chain_with_args( -        self, -        funcs: Iterable[_ArgsChainableCallable], -        *bind_args, -        **bind_kwargs, -    ) -> _ArgsChainableCallable: -        return CruFunction( -            CruFunctionMeta.chain_with_args( -                self._iter_with_self(funcs), *bind_args, **bind_kwargs -            ) -        ) +    def make_isinstance_of_types(*types: type) -> Callable: +        return _Wrapper(lambda v: type(v) in types) -    def chain_with_kwargs( -        self, funcs: Iterable[_KwargsChainableCallable], *bind_args, **bind_kwargs -    ) -> _KwargsChainableCallable: -        return CruFunction( -            CruFunctionMeta.chain_with_kwargs( -                self._iter_with_self(funcs), *bind_args, **bind_kwargs -            ) -        ) -    def chain_with_both( -        self, funcs: Iterable[_ChainableCallable], *bind_args, **bind_kwargs -    ) -> _ChainableCallable: -        return CruFunction( -            CruFunctionMeta.chain_with_both( -                self._iter_with_self(funcs), *bind_args, **bind_kwargs -            ) -        ) - -    def chain( -        self, -        mode: CruFunctionChainMode, -        funcs: Iterable[ -            _ArgsChainableCallable | _KwargsChainableCallable | _ChainableCallable -        ], -        *bind_args, -        **bind_kwargs, -    ) -> _ArgsChainableCallable | _KwargsChainableCallable | _ChainableCallable: -        return CruFunction( -            CruFunctionMeta.chain( -                mode, self._iter_with_self(funcs), *bind_args, **bind_kwargs -            ) -        ) +class CruFunction: +    RawBase = _RawBase +    Base = _Base +    Creators = _Creators +    Wrapper = _Wrapper +    Decorators = _Dec -    def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T: -        return self._f(*args, **kwargs) -    @staticmethod -    def make_chain( -        mode: CruFunctionChainMode, -        funcs: Iterable[ -            _ArgsChainableCallable | _KwargsChainableCallable | _ChainableCallable -        ], -        *bind_args, -        **bind_kwargs, -    ) -> CruFunction: -        return CruFunction( -            CruFunctionMeta.chain(mode, funcs, *bind_args, **bind_kwargs) -        ) - - -class CruWrappedFunctions: -    none = CruFunction(CruRawFunctions.none) -    true = CruFunction(CruRawFunctions.true) -    false = CruFunction(CruRawFunctions.false) -    identity = CruFunction(CruRawFunctions.identity) -    only_you = CruFunction(CruRawFunctions.only_you) -    equal = CruFunction(CruRawFunctions.equal) -    not_equal = CruFunction(CruRawFunctions.not_equal) -    not_ = CruFunction(CruRawFunctions.not_) - - -class CruFunctionGenerators: -    @staticmethod -    def make_isinstance_of_types(*types: type) -> Callable: -        return CruFunction(lambda v: type(v) in types) - - -CRU.add_objects( -    CruRawFunctions, -    CruFunctionMeta, -    CruFunction, -    CruWrappedFunctions, -    CruFunctionGenerators, -) +CRU.add_objects(CruFunction) diff --git a/tools/cru-py/cru/_iter.py b/tools/cru-py/cru/_iter.py index da849e8..83a9513 100644 --- a/tools/cru-py/cru/_iter.py +++ b/tools/cru-py/cru/_iter.py @@ -8,6 +8,7 @@ from typing import (      Generator,      Iterator,      Literal, +    Never,      Self,      TypeAlias,      TypeVar, @@ -15,11 +16,9 @@ from typing import (      Any,      Generic,      cast, -    overload,  )  from ._cru import CRU -from ._meta import CruDecCreators  from ._const import CruNotFound  _P = ParamSpec("_P") @@ -28,208 +27,221 @@ _O = TypeVar("_O")  _V = TypeVar("_V")  _R = TypeVar("_R") -CanBeList: TypeAlias = Iterable[_T] | _T | None -ElementOperation: TypeAlias = Callable[[_T], Any] -ElementPredicate: TypeAlias = Callable[[_T], bool] -AnyElementPredicate: TypeAlias = ElementPredicate[Any] -ElementTransformer: TypeAlias = Callable[[_T], _O] -SelfElementTransformer: TypeAlias = ElementTransformer[_T, _T] -AnyElementTransformer: TypeAlias = ElementTransformer[Any, Any] +class _Generic: +    class StepActionKind(Enum): +        SKIP = 0 +        PUSH = 1 +        STOP = 2 +        AGGREGATE = 3 +    @dataclass +    class StepAction(Generic[_V, _R]): +        value: Iterable[Self] | _V | _R | None +        kind: _Generic.StepActionKind -class _StepActionKind(Enum): -    SKIP = 0 -    PUSH = 1 -    STOP = 2 -    AGGREGATE = 3 +        @property +        def push_value(self) -> _V: +            assert self.kind == _Generic.StepActionKind.PUSH +            return cast(_V, self.value) +        @property +        def stop_value(self) -> _R: +            assert self.kind == _Generic.StepActionKind.STOP +            return cast(_R, self.value) -@dataclass -class _StepAction(Generic[_V, _R]): -    value: Iterable[Self] | _V | _R | None -    kind: _StepActionKind +        @staticmethod +        def skip() -> _Generic.StepAction[_V, _R]: +            return _Generic.StepAction(None, _Generic.StepActionKind.SKIP) -    @property -    def push_value(self) -> _V: -        assert self.kind == _StepActionKind.PUSH -        return cast(_V, self.value) +        @staticmethod +        def push(value: _V | None) -> _Generic.StepAction[_V, _R]: +            return _Generic.StepAction(value, _Generic.StepActionKind.PUSH) -    @property -    def stop_value(self) -> _R: -        assert self.kind == _StepActionKind.STOP -        return cast(_R, self.value) +        @staticmethod +        def stop(value: _R | None = None) -> _Generic.StepAction[_V, _R]: +            return _Generic.StepAction(value, _Generic.StepActionKind.STOP) -    @staticmethod -    def skip() -> _StepAction[_V, _R]: -        return _StepAction(None, _StepActionKind.SKIP) +        @staticmethod +        def aggregate( +            *results: _Generic.StepAction[_V, _R] +        ) -> _Generic.StepAction[_V, _R]: +            return _Generic.StepAction(results, _Generic.StepActionKind.AGGREGATE) -    @staticmethod -    def push(value: _V | None) -> _StepAction[_V, _R]: -        return _StepAction(value, _StepActionKind.PUSH) +        @staticmethod +        def push_last(value: _V | None) -> _Generic.StepAction[_V, _R]: +            return _Generic.StepAction.aggregate( +                _Generic.StepAction.push(value), _Generic.StepAction.stop() +            ) -    @staticmethod -    def stop(value: _R | None = None) -> _StepAction[_V, _R]: -        return _StepAction(value, _StepActionKind.STOP) +        def flatten(self) -> Iterable[Self]: +            return _Generic.flatten( +                self, +                is_leave=lambda r: r.kind != _Generic.StepActionKind.AGGREGATE, +                get_children=lambda r: cast(Iterable[Self], r.value), +            ) + +    GeneralStepAction: TypeAlias = StepAction[_V, _R] | _V | _R | None +    IterateOperation: TypeAlias = Callable[[_T, int], GeneralStepAction[_V, _R]] +    IteratePreHook: TypeAlias = Callable[[Iterable[_T]], GeneralStepAction[_V, _R]] +    IteratePostHook: TypeAlias = Callable[[int], GeneralStepAction[_V, _R]]      @staticmethod -    def aggregate(*results: _StepAction[_V, _R]) -> _StepAction[_V, _R]: -        return _StepAction(results, _StepActionKind.AGGREGATE) +    def _is_not_iterable(o: Any) -> bool: +        return not isinstance(o, Iterable)      @staticmethod -    def push_last(value: _V | None) -> _StepAction[_V, _R]: -        return _StepAction.aggregate(_StepAction.push(value), _StepAction.stop()) - -    def flatten(self) -> Iterable[Self]: -        return CruIterableMeta.flatten( -            self, -            is_leave=lambda r: r.kind != _StepActionKind.AGGREGATE, -            get_children=lambda r: cast(Iterable[Self], r.value), -        ) +    def _return_self(o): +        return o +    @staticmethod +    def iterable_flatten( +        maybe_iterable: Iterable[_T] | _T, max_depth: int = -1, *, _depth: int = 0 +    ) -> Iterable[Iterable[_T] | _T]: +        if _depth == max_depth or not isinstance(maybe_iterable, Iterable): +            yield maybe_iterable +            return -_GeneralStepAction: TypeAlias = _StepAction[_V, _R] | _V | _R | None -_IterateOperation = Callable[[_T, int], _GeneralStepAction[_V, _R]] -_IteratePreHook = Callable[[Iterable[_T]], _GeneralStepAction[_V, _R]] -_IteratePostHook = Callable[[int], _GeneralStepAction[_V, _R]] +        for child in maybe_iterable: +            yield from _Generic.iterable_flatten( +                child, +                max_depth, +                _depth=_depth + 1, +            ) +    @staticmethod +    def flatten( +        o: _O, +        max_depth: int = -1, +        /, +        is_leave: _Wrapper.ElementPredicate[_O] = _is_not_iterable, +        get_children: _Wrapper.ElementTransformer[_O, Iterable[_O]] = _return_self, +        *, +        _depth: int = 0, +    ) -> Iterable[_O]: +        if _depth == max_depth or is_leave(o): +            yield o +            return +        for child in get_children(o): +            yield from _Generic.flatten( +                child, +                max_depth, +                is_leave, +                get_children, +                _depth=_depth + 1, +            ) -class CruIterableMeta: -    class Generic: -        StepActionKind = _StepActionKind -        StepAction = _StepAction -        GeneralStepAction = _GeneralStepAction -        IterateOperation = _IterateOperation -        IteratePreHook = _IteratePreHook -        IteratePostHook = _IteratePostHook +    class Results: +        @staticmethod +        def true(_) -> Literal[True]: +            return True -        class Results: -            @staticmethod -            def true(_) -> Literal[True]: -                return True +        @staticmethod +        def false(_) -> Literal[False]: +            return False -            @staticmethod -            def false(_) -> Literal[False]: -                return False +        @staticmethod +        def not_found(_) -> Literal[CruNotFound.VALUE]: +            return CruNotFound.VALUE -            @staticmethod -            def not_found(_) -> Literal[CruNotFound.VALUE]: -                return CruNotFound.VALUE +    @staticmethod +    def _non_result_to_push(value: Any) -> StepAction[_V, _R]: +        return _Generic.StepAction.push(value) -        @staticmethod -        def _non_result_to_push(value: Any) -> _StepAction[_V, _R]: -            return _StepAction.push(value) +    @staticmethod +    def _non_result_to_stop(value: Any) -> StepAction[_V, _R]: +        return _Generic.StepAction.stop(value) -        @staticmethod -        def _non_result_to_stop(value: Any) -> _StepAction[_V, _R]: -            return _StepAction.stop(value) +    @staticmethod +    def _none_hook(_: Any) -> StepAction[_V, _R]: +        return _Generic.StepAction.skip() -        @staticmethod -        def _none_hook(_: Any) -> _StepAction[_V, _R]: -            return _StepAction.skip() - -        def iterate( -            iterable: Iterable[_T], -            operation: _IterateOperation[_T, _V, _R], -            fallback_return: _R, -            pre_iterate: _IteratePreHook[_T, _V, _R], -            post_iterate: _IteratePostHook[_V, _R], -            convert_value_result: Callable[[_V | _R | None], _StepAction[_V, _R]], -        ) -> Generator[_V, None, _R]: -            pre_result = pre_iterate(iterable) -            if not isinstance(pre_result, _StepAction): -                real_pre_result = convert_value_result(pre_result) -            for r in real_pre_result.flatten(): -                if r.kind == _StepActionKind.STOP: -                    return r.stop_value -                elif r.kind == _StepActionKind.PUSH: -                    yield r.push_value -                else: -                    assert r.kind == _StepActionKind.SKIP - -            for index, element in enumerate(iterable): -                result = operation(element, index) -                if not isinstance(result, _StepAction): -                    real_result = convert_value_result(result) -                for r in real_result.flatten(): -                    if r.kind == _StepActionKind.STOP: -                        return r.stop_value -                    elif r.kind == _StepActionKind.PUSH: -                        yield r.push_value -                    else: -                        assert r.kind == _StepActionKind.SKIP -                        continue - -            post_result = post_iterate(index + 1) -            if not isinstance(post_result, _StepAction): -                real_post_result = convert_value_result(post_result) -            for r in real_post_result.flatten(): -                if r.kind == _StepActionKind.STOP: +    def iterate( +        iterable: Iterable[_T], +        operation: IterateOperation[_T, _V, _R], +        fallback_return: _R, +        pre_iterate: IteratePreHook[_T, _V, _R], +        post_iterate: IteratePostHook[_V, _R], +        convert_value_result: Callable[[_V | _R | None], StepAction[_V, _R]], +    ) -> Generator[_V, None, _R]: +        pre_result = pre_iterate(iterable) +        if not isinstance(pre_result, _Generic.StepAction): +            real_pre_result = convert_value_result(pre_result) +        for r in real_pre_result.flatten(): +            if r.kind == _Generic.StepActionKind.STOP: +                return r.stop_value +            elif r.kind == _Generic.StepActionKind.PUSH: +                yield r.push_value +            else: +                assert r.kind == _Generic.StepActionKind.SKIP + +        for index, element in enumerate(iterable): +            result = operation(element, index) +            if not isinstance(result, _Generic.StepAction): +                real_result = convert_value_result(result) +            for r in real_result.flatten(): +                if r.kind == _Generic.StepActionKind.STOP:                      return r.stop_value -                elif r.kind == _StepActionKind.PUSH: +                elif r.kind == _Generic.StepActionKind.PUSH:                      yield r.push_value                  else: -                    assert r.kind == _StepActionKind.SKIP - -            return fallback_return - -        def create_new( -            iterable: Iterable[_T], -            operation: _IterateOperation[_T, _V, _R], -            fallback_return: _R, -            /, -            pre_iterate: _IteratePreHook[_T, _V, _R] | None = None, -            post_iterate: _IteratePostHook[_V, _R] | None = None, -        ) -> Generator[_V, None, _R]: -            return CruIterableMeta.Generic.iterate( +                    assert r.kind == _Generic.StepActionKind.SKIP +                    continue + +        post_result = post_iterate(index + 1) +        if not isinstance(post_result, _Generic.StepAction): +            real_post_result = convert_value_result(post_result) +        for r in real_post_result.flatten(): +            if r.kind == _Generic.StepActionKind.STOP: +                return r.stop_value +            elif r.kind == _Generic.StepActionKind.PUSH: +                yield r.push_value +            else: +                assert r.kind == _Generic.StepActionKind.SKIP + +        return fallback_return + +    def create_new( +        iterable: Iterable[_T], +        operation: IterateOperation[_T, _V, _R], +        fallback_return: _R, +        /, +        pre_iterate: IteratePreHook[_T, _V, _R] | None = None, +        post_iterate: IteratePostHook[_V, _R] | None = None, +    ) -> Generator[_V, None, _R]: +        return _Generic.iterate( +            iterable, +            operation, +            fallback_return, +            pre_iterate or _Generic._none_hook, +            post_iterate or _Generic._none_hook, +            _Generic._non_result_to_push, +        ) + +    def get_result( +        iterable: Iterable[_T], +        operation: IterateOperation[_T, _V, _R], +        fallback_return: _R, +        /, +        pre_iterate: IteratePreHook[_T, _V, _R] | None = None, +        post_iterate: IteratePostHook[_V, _R] | None = None, +    ) -> _R: +        try: +            for _ in _Generic.iterate(                  iterable,                  operation,                  fallback_return, -                pre_iterate or CruIterableMeta.Generic._none_hook, -                post_iterate or CruIterableMeta.Generic._none_hook, -                CruIterableMeta.Generic._non_result_to_push, -            ) +                pre_iterate or _Generic._none_hook, +                post_iterate or _Generic._none_hook, +                _Generic._non_result_to_stop, +            ): +                pass +        except StopIteration as stop: +            return stop.value +        raise RuntimeError("Should not reach here") -        def get_result( -            iterable: Iterable[_T], -            operation: _IterateOperation[_T, _V, _R], -            fallback_return: _R, -            /, -            pre_iterate: _IteratePreHook[_T, _V, _R] | None = None, -            post_iterate: _IteratePostHook[_V, _R] | None = None, -        ) -> _R: -            try: -                for _ in CruIterableMeta.Generic.iterate( -                    iterable, -                    operation, -                    fallback_return, -                    pre_iterate or CruIterableMeta.Generic._none_hook, -                    post_iterate or CruIterableMeta.Generic._none_hook, -                    CruIterableMeta.Generic._non_result_to_stop, -                ): -                    pass -            except StopIteration as stop: -                return stop.value -            raise RuntimeError("Should not reach here") - -    class Creators: -        @staticmethod -        def empty() -> Iterable[_T]: -            return iter([]) - -        @staticmethod -        def range(*args) -> Iterable[int]: -            return iter(range(*args)) - -        @staticmethod -        def unite(*args: _O) -> Iterable[_O]: -            return iter(args) - -        @staticmethod -        def concat(*iterables: Iterable[_T]) -> Iterable[_T]: -            for iterable in iterables: -                yield from iterable +class _Helper:      @staticmethod      def with_count(c: Callable[Concatenate[int, _P], _O]) -> Callable[_P, _O]:          count = 0 @@ -242,78 +254,139 @@ class CruIterableMeta:          return wrapper + +class _Dec: + +    @staticmethod +    def wrap(origin: Callable[_P, Iterable[_T]]) -> Callable[_P, _Wrapper[_T]]: +        def _wrapped(*args: _P.args, **kwargs: _P.kwargs) -> _Wrapper[_T]: +            return _Wrapper(origin(*args, **kwargs)) + +        return _wrapped + + +class _Creators:      @staticmethod -    def to_set(iterable: Iterable[_T], discard: Iterable[Any]) -> set[_T]: -        return set(iterable) - set(discard) +    @_Dec.wrap +    def empty() -> Iterable[Never]: +        return iter([])      @staticmethod -    def to_list(iterable: Iterable[_T], discard: Iterable[Any]) -> list[_T]: -        return [v for v in iterable if v not in set(discard)] +    @_Dec.wrap +    def range(*args) -> Iterable[int]: +        return iter(range(*args))      @staticmethod -    def all(iterable: Iterable[_T], predicate: ElementPredicate[_T]) -> bool: -        for value in iterable: +    @_Dec.wrap +    def unite(*args: _O) -> Iterable[_O]: +        return iter(args) + +    @staticmethod +    @_Dec.wrap +    def concat(*iterables: Iterable[_T]) -> Iterable[_T]: +        for iterable in iterables: +            yield from iterable + + +class _Wrapper(Generic[_T]): +    ElementOperation: TypeAlias = Callable[[_V], Any] +    ElementPredicate: TypeAlias = Callable[[_V], bool] +    AnyElementPredicate: TypeAlias = ElementPredicate[Any] +    ElementTransformer: TypeAlias = Callable[[_V], _O] +    SelfElementTransformer: TypeAlias = ElementTransformer[_V, _V] +    AnyElementTransformer: TypeAlias = ElementTransformer[Any, Any] + +    def __init__( +        self, +        iterable: Iterable[_T], +    ) -> None: +        self._iterable = iterable + +    def __iter__(self) -> Iterator[_T]: +        return self._iterable.__iter__() + +    @property +    def me(self) -> Iterable[_T]: +        return self._iterable + +    _wrap = _Dec.wrap + +    @_wrap +    def replace_me(self, iterable: Iterable[_O]) -> Iterable[_O]: +        return iterable + +    def replace_me_with_empty(self) -> _Wrapper[Never]: +        return _Creators.empty() + +    def replace_me_with_range(self, *args) -> _Wrapper[int]: +        return _Creators.range(*args) + +    def replace_me_with_unite(self, *args: _O) -> _Wrapper[_O]: +        return _Creators.unite(*args) + +    def replace_me_with_concat(self, *iterables: Iterable[_T]) -> _Wrapper[_T]: +        return _Creators.concat(*iterables) + +    def to_set(self, discard: Iterable[Any]) -> set[_T]: +        return set(self.me) - set(discard) + +    def to_list(self, discard: Iterable[Any]) -> list[_T]: +        return [v for v in self.me if v not in set(discard)] + +    def all(self, predicate: ElementPredicate[_T]) -> bool: +        for value in self.me:              if not predicate(value):                  return False          return True -    @staticmethod -    def any(iterable: Iterable[_T], predicate: ElementPredicate[_T]) -> bool: -        for value in iterable: +    def any(self, predicate: ElementPredicate[_T]) -> bool: +        for value in self.me:              if predicate(value):                  return True          return False -    @staticmethod -    def foreach(iterable: Iterable[_T], operation: ElementOperation[_T]) -> None: -        for value in iterable: +    def foreach(self, operation: ElementOperation[_T]) -> None: +        for value in self.me:              operation(value) -    @staticmethod -    def transform( -        iterable: Iterable[_T], transformer: ElementTransformer[_T, _O] -    ) -> Iterable[_O]: -        for value in iterable: +    @_wrap +    def transform(self, transformer: ElementTransformer[_T, _O]) -> Iterable[_O]: +        for value in self.me:              yield transformer(value) -    @staticmethod -    def filter(iterable: Iterable[_T], predicate: ElementPredicate[_T]) -> Iterable[_T]: -        for value in iterable: +    map = transform + +    @_wrap +    def filter(self, predicate: ElementPredicate[_T]) -> Iterable[_T]: +        for value in self.me:              if predicate(value):                  yield value -    @staticmethod -    def continue_if( -        iterable: Iterable[_T], predicate: ElementPredicate[_T] -    ) -> Iterable[_T]: -        for value in iterable: +    @_wrap +    def continue_if(self, predicate: ElementPredicate[_T]) -> Iterable[_T]: +        for value in self.me:              yield value              if not predicate(value):                  break -    @staticmethod -    def first_n(iterable: Iterable[_T], max_count: int) -> Iterable[_T]: +    def first_n(self, max_count: int) -> _Wrapper[_T]:          if max_count < 0:              raise ValueError("max_count must be 0 or positive.")          if max_count == 0: -            return CruIterableMeta.Creators.empty() -        return CruIterableMeta.continue_if( -            iterable, _with_count(lambda i, _: i < max_count - 1) -        ) +            return self.replace_me_with_empty()  # type: ignore +        return self.continue_if(_Helper.with_count(lambda i, _: i < max_count - 1)) -    @staticmethod -    def drop_n(iterable: Iterable[_T], n: int) -> Iterable[_T]: +    def drop_n(self, n: int) -> _Wrapper[_T]:          if n < 0:              raise ValueError("n must be 0 or positive.")          if n == 0: -            return iterable -        return CruIterableMeta.filter(iterable, _with_count(lambda i, _: i < n)) +            return self +        return self.filter(_Helper.with_count(lambda i, _: i < n)) -    @staticmethod      def single_or( -        iterable: Iterable[_T], fallback: _O | CruNotFound | None = CruNotFound.VALUE +        self, fallback: _O | CruNotFound | None = CruNotFound.VALUE      ) -> _T | _O | Any | CruNotFound: -        first_2 = CruIterableMeta.first_n(iterable, 2) +        first_2 = self.first_n(2)          has_value = False          value = None          for element in first_2: @@ -326,181 +399,34 @@ class CruIterableMeta:          else:              return fallback -    @staticmethod -    def _is_not_iterable(o: Any) -> bool: -        return not isinstance(o, Iterable) - -    @staticmethod -    def _return_self(o): -        return o - -    @staticmethod -    @overload -    def flatten(o: Iterable[_T], max_depth: int = -1) -> Iterable[_T]: ... - -    @staticmethod -    @overload -    def flatten( -        o: _O, -        max_depth: int = -1, -        *, -        is_leave: ElementPredicate[_O], -        get_children: ElementTransformer[_O, Iterable[_O]], -    ) -> Iterable[_O]: ... - -    @staticmethod -    def flatten( -        o: _O, -        max_depth: int = -1, -        *, -        is_leave: ElementPredicate[_O] = _is_not_iterable, -        get_children: ElementTransformer[_O, Iterable[_O]] = _return_self, -        _depth: int = 0, -    ) -> Iterable[Any]: -        if _depth == max_depth or is_leave(o): -            yield o -            return -        for child in get_children(o): -            yield from CruIterableMeta.flatten( -                child, -                max_depth, -                is_leave=is_leave, -                get_children=get_children, -                _depth=_depth + 1, -            )  # type: ignore - - -_with_count = CruIterableMeta.with_count - - -class CruIterableWrapper(Generic[_T]): -    Meta = CruIterableMeta - -    class Dec: -        @staticmethod -        def _wrap(iterable: Iterable[_O]) -> CruIterableWrapper[_O]: -            return CruIterableWrapper(iterable) - -        wrap = CruDecCreators.convert_result(_wrap) - -        @staticmethod -        def _as_iterable(_self: CruIterableWrapper[_O]) -> CruIterableWrapper[_O]: -            return _self - -        meta = CruDecCreators.create_implement_by(_as_iterable) -        meta_no_self = CruDecCreators.create_implement_by_no_self() - -    def __init__( -        self, -        iterable: Iterable[_T], -    ) -> None: -        self._iterable = iterable - -    def __iter__(self) -> Iterator[_T]: -        return self._iterable.__iter__() - -    @property -    def me(self) -> Iterable[_T]: -        return self._iterable - -    def replace_me(self, iterable: Iterable[_O]) -> CruIterableWrapper[_O]: -        return CruIterableWrapper(iterable) - -    @Dec.wrap -    @Dec.meta_no_self(CruIterableMeta.Creators.empty) -    def replace_me_with_empty(self) -> None: -        pass - -    @Dec.wrap -    @Dec.meta_no_self(CruIterableMeta.Creators.range) -    def replace_me_with_range(self) -> None: -        pass - -    @Dec.wrap -    @Dec.meta_no_self(CruIterableMeta.Creators.unite) -    def replace_me_with_unite(self) -> None: -        pass +    @_wrap +    def flatten(self) -> Iterable[_T | Iterable[_T]]: +        return _Generic.iterable_flatten(self.me) -    @Dec.wrap -    @Dec.meta_no_self(CruIterableMeta.Creators.concat) -    def replace_me_with_concat(self) -> None: -        pass - -    @Dec.meta(CruIterableMeta.to_set) -    def to_set(self) -> None: -        pass - -    @Dec.meta(CruIterableMeta.to_list) -    def to_list(self) -> None: -        pass - -    @Dec.meta(CruIterableMeta.all) -    def all(self) -> None: -        pass - -    @Dec.meta(CruIterableMeta.any) -    def any(self) -> None: -        pass - -    @Dec.meta(CruIterableMeta.foreach) -    def foreach(self) -> None: -        pass - -    @Dec.wrap -    @Dec.meta(CruIterableMeta.transform) -    def transform(self) -> None: -        pass - -    map = transform - -    @Dec.wrap -    @Dec.meta(CruIterableMeta.filter) -    def filter(self) -> None: -        pass - -    @Dec.wrap -    @Dec.meta(CruIterableMeta.continue_if) -    def continue_if(self) -> None: -        pass - -    @Dec.wrap -    @Dec.meta(CruIterableMeta.first_n) -    def first_n(self) -> None: -        pass - -    @Dec.wrap -    @Dec.meta(CruIterableMeta.drop_n) -    def drop_n(self) -> None: -        pass - -    @Dec.wrap -    @Dec.meta(CruIterableMeta.flatten) -    def flatten(self) -> None: -        pass - -    @Dec.meta(CruIterableMeta.single_or) -    def single_or(self) -> None: -        pass - -    @Dec.wrap -    def select_by_indices(self, indices: Iterable[int]) -> Iterable[_T]: +    def select_by_indices(self, indices: Iterable[int]) -> _Wrapper[_T]:          index_set = set(indices)          max_index = max(index_set)          return self.first_n(max_index + 1).filter( -            _with_count(lambda i, _: i in index_set) +            _Helper.with_count(lambda i, _: i in index_set)          ) -    @Dec.wrap -    def remove_values(self, values: Iterable[Any]) -> Iterable[_V]: +    def remove_values(self, values: Iterable[Any]) -> _Wrapper[_T]:          value_set = set(values)          return self.filter(lambda v: v not in value_set) -    @Dec.wrap      def replace_values(          self, old_values: Iterable[Any], new_value: _O -    ) -> Iterable[_V | _O]: +    ) -> Iterable[_T | _O]:          value_set = set(old_values) -        return self.map(lambda v: new_value if v in value_set else v) +        return self.transform(lambda v: new_value if v in value_set else v) + + +class CruIterable: +    Decorators = _Dec +    Generic = _Generic +    Helper = _Helper +    Creators = _Creators +    Wrapper = _Wrapper -CRU.add_objects(CruIterableMeta, CruIterableWrapper) +CRU.add_objects(CruIterable, _Wrapper) diff --git a/tools/cru-py/cru/_meta.py b/tools/cru-py/cru/_meta.py deleted file mode 100644 index 86763cf..0000000 --- a/tools/cru-py/cru/_meta.py +++ /dev/null @@ -1,92 +0,0 @@ -from collections.abc import Callable -from typing import Concatenate, Generic, ParamSpec, TypeVar - -_P = ParamSpec("_P") -_T = TypeVar("_T") -_O = TypeVar("_O") -_R = TypeVar("_R") - - -class CruDecorator: - -    class ImplementedBy(Generic[_O, _P, _R]): -        # TODO: Continue here tomorrow! -        def __init__( -            self, impl: Callable[Concatenate[_O, _P], _R], pass_self: bool -        ) -> None: -            self.impl = impl -            self.pass_self = pass_self - -        def __call__( -            self, _origin: Callable[[_T], None] -        ) -> Callable[Concatenate[_T, _P], _R]: -            def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R: -                if self.pass_self: -                    return self.impl(_self, *args, **kwargs) -                else: -                    return self.impl(*args, **kwargs) - -            return real_impl - -    @staticmethod -    def convert_result( -        converter: Callable[[_T], _O] -    ) -> Callable[[Callable[_P, _T]], Callable[_P, _O]]: -        def dec(original: Callable[_P, _T]) -> Callable[_P, _O]: -            def wrapped(*args: _P.args, **kwargs: _P.kwargs) -> _O: -                return converter(original(*args, **kwargs)) - -            return wrapped - -        return dec - -    @staticmethod -    def create_implement_by(converter: Callable[[_T], _O]) -> Callable[ -        [Callable[Concatenate[_O, _P], _R]], -        Callable[ -            [Callable[[_T], None]], -            Callable[Concatenate[_T, _P], _R], -        ], -    ]: -        def implement_by( -            m: Callable[Concatenate[_O, _P], _R], -        ) -> Callable[ -            [Callable[[_T], None]], -            Callable[Concatenate[_T, _P], _R], -        ]: -            def implementation(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R: -                return m(converter(_self), *args, **kwargs) - -            def decorator(_: Callable[[_T], None]) -> Callable[Concatenate[_T, _P], _R]: -                return implementation - -            return decorator - -        return implement_by - -    @staticmethod -    def create_implement_by_no_self() -> Callable[ -        [Callable[_P, _R]], -        Callable[ -            [Callable[[_T], None]], -            Callable[Concatenate[_T, _P], _R], -        ], -    ]: -        def implement_by_no_self( -            m: Callable[_P, _R], -        ) -> Callable[ -            [Callable[[_T], None]], -            Callable[Concatenate[_T, _P], _R], -        ]: -            def implementation(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R: -                return m(*args, **kwargs) - -            def decorator(_: Callable[[_T], None]) -> Callable[Concatenate[_T, _P], _R]: -                return implementation - -            return decorator - -        return implement_by_no_self - - -CRU.add | 
