aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcrupest <crupest@outlook.com>2024-11-11 01:12:29 +0800
committerYuqian Yang <crupest@crupest.life>2024-12-18 18:31:27 +0800
commite710a36af2775118c514954252537ae37f11996c (patch)
tree2195f5fe971a62fc253d021983753a18f9c81d10
parent7fbc091ae8ed2a4bac215ee04d7c4b5d0218af4e (diff)
downloadcrupest-e710a36af2775118c514954252537ae37f11996c.tar.gz
crupest-e710a36af2775118c514954252537ae37f11996c.tar.bz2
crupest-e710a36af2775118c514954252537ae37f11996c.zip
HALF WORK: 2024.11.29
-rw-r--r--tools/cru-py/cru/_decorator.py97
-rw-r--r--tools/cru-py/cru/_func.py262
-rw-r--r--tools/cru-py/cru/_iter.py674
-rw-r--r--tools/cru-py/cru/_meta.py92
4 files changed, 487 insertions, 638 deletions
diff --git a/tools/cru-py/cru/_decorator.py b/tools/cru-py/cru/_decorator.py
new file mode 100644
index 0000000..432ceca
--- /dev/null
+++ b/tools/cru-py/cru/_decorator.py
@@ -0,0 +1,97 @@
+from __future__ import annotations
+
+from collections.abc import Callable
+from typing import (
+ Concatenate,
+ Generic,
+ ParamSpec,
+ TypeVar,
+ cast,
+)
+
+from ._cru import CRU
+
+_P = ParamSpec("_P")
+_T = TypeVar("_T")
+_O = TypeVar("_O")
+_R = TypeVar("_R")
+
+
+class CruDecorator:
+
+ class ConvertResult(Generic[_T, _O]):
+ def __init__(
+ self,
+ converter: Callable[[_T], _O],
+ ) -> None:
+ self.converter = converter
+
+ def __call__(self, origin: Callable[_P, _T]) -> Callable[_P, _O]:
+ converter = self.converter
+
+ def real_impl(*args: _P.args, **kwargs: _P.kwargs) -> _O:
+ return converter(origin(*args, **kwargs))
+
+ return real_impl
+
+ class ImplementedBy(Generic[_T, _O, _P, _R]):
+ def __init__(
+ self,
+ impl: Callable[Concatenate[_O, _P], _R],
+ converter: Callable[[_T], _O],
+ ) -> None:
+ self.impl = impl
+ self.converter = converter
+
+ def __call__(
+ self, _origin: Callable[[_T], None]
+ ) -> Callable[Concatenate[_T, _P], _R]:
+ converter = self.converter
+ impl = self.impl
+
+ def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R:
+ return cast(Callable[Concatenate[_O, _P], _R], impl)(
+ converter(_self), *args, **kwargs
+ )
+
+ return real_impl
+
+ @staticmethod
+ def create_factory(converter: Callable[[_T], _O]) -> Callable[
+ [Callable[Concatenate[_O, _P], _R]],
+ CruDecorator.ImplementedBy[_T, _O, _P, _R],
+ ]:
+ def create(
+ m: Callable[Concatenate[_O, _P], _R],
+ ) -> CruDecorator.ImplementedBy[_T, _O, _P, _R]:
+ return CruDecorator.ImplementedBy(m, converter)
+
+ return create
+
+ class ImplementedByNoSelf(Generic[_P, _R]):
+ def __init__(self, impl: Callable[_P, _R]) -> None:
+ self.impl = impl
+
+ def __call__(
+ self, _origin: Callable[[_T], None]
+ ) -> Callable[Concatenate[_T, _P], _R]:
+ impl = self.impl
+
+ def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R:
+ return cast(Callable[_P, _R], impl)(*args, **kwargs)
+
+ return real_impl
+
+ @staticmethod
+ def create_factory() -> (
+ Callable[[Callable[_P, _R]], CruDecorator.ImplementedByNoSelf[_P, _R]]
+ ):
+ def create(
+ m: Callable[_P, _R],
+ ) -> CruDecorator.ImplementedByNoSelf[_P, _R]:
+ return CruDecorator.ImplementedByNoSelf(m)
+
+ return create
+
+
+CRU.add_objects(CruDecorator)
diff --git a/tools/cru-py/cru/_func.py b/tools/cru-py/cru/_func.py
index ef3da72..1b437be 100644
--- a/tools/cru-py/cru/_func.py
+++ b/tools/cru-py/cru/_func.py
@@ -10,59 +10,77 @@ from typing import (
ParamSpec,
TypeAlias,
TypeVar,
- cast,
)
+
from ._cru import CRU
from ._const import CruPlaceholder
_P = ParamSpec("_P")
+_P1 = ParamSpec("_P1")
_T = TypeVar("_T")
-_ArgsChainableCallable: TypeAlias = Callable[..., Iterable[Any]]
-_KwargsChainableCallable: TypeAlias = Callable[..., Iterable[tuple[str, Any]]]
-_ChainableCallable: TypeAlias = Callable[
- ..., tuple[Iterable[Any], Iterable[tuple[str, Any]]]
-]
+class _Dec:
+ @staticmethod
+ def wrap(
+ origin: Callable[_P, Callable[_P1, _T]]
+ ) -> Callable[_P, _Wrapper[_P1, _T]]:
+ def _wrapped(*args: _P.args, **kwargs: _P.kwargs) -> _Wrapper[_P1, _T]:
+ return _Wrapper(origin(*args, **kwargs))
+ return _wrapped
-class CruFunctionMeta:
- class Base:
- @staticmethod
- def none(*_v, **_kwargs) -> None:
- return None
- @staticmethod
- def true(*_v, **_kwargs) -> Literal[True]:
- return True
+class _RawBase:
+ @staticmethod
+ def none(*_v, **_kwargs) -> None:
+ return None
- @staticmethod
- def false(*_v, **_kwargs) -> Literal[False]:
- return False
+ @staticmethod
+ def true(*_v, **_kwargs) -> Literal[True]:
+ return True
- @staticmethod
- def identity(v: _T) -> _T:
- return v
+ @staticmethod
+ def false(*_v, **_kwargs) -> Literal[False]:
+ return False
- @staticmethod
- def only_you(v: _T, *_v, **_kwargs) -> _T:
- return v
+ @staticmethod
+ def identity(v: _T) -> _T:
+ return v
- @staticmethod
- def equal(a: Any, b: Any) -> bool:
- return a == b
+ @staticmethod
+ def only_you(v: _T, *_v, **_kwargs) -> _T:
+ return v
- @staticmethod
- def not_equal(a: Any, b: Any) -> bool:
- return a != b
+ @staticmethod
+ def equal(a: Any, b: Any) -> bool:
+ return a == b
- @staticmethod
- def not_(v: Any) -> Any:
- return not v
+ @staticmethod
+ def not_equal(a: Any, b: Any) -> bool:
+ return a != b
@staticmethod
- def bind(func: Callable[..., _T], *bind_args, **bind_kwargs) -> Callable[..., _T]:
+ def not_(v: Any) -> Any:
+ return not v
+
+
+class _Wrapper(Generic[_P, _T]):
+ def __init__(self, f: Callable[_P, _T]):
+ self._f = f
+
+ @property
+ def me(self) -> Callable[_P, _T]:
+ return self._f
+
+ def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T:
+ return self._f(*args, **kwargs)
+
+ @_Dec.wrap
+ def bind(self, *bind_args, **bind_kwargs) -> Callable[..., _T]:
+ func = self.me
+
def bound_func(*args, **kwargs):
popped = 0
real_args = []
@@ -82,174 +100,74 @@ class CruFunctionMeta:
KWARGS = auto()
BOTH = ARGS | KWARGS
- ArgsChainableCallable = _ArgsChainableCallable
- KwargsChainableCallable = _KwargsChainableCallable
- ChainableCallable = _ChainableCallable
+ ArgsChainableCallable: TypeAlias = Callable[..., Iterable[Any]]
+ KwargsChainableCallable: TypeAlias = Callable[..., Iterable[tuple[str, Any]]]
+ ChainableCallable: TypeAlias = Callable[
+ ..., tuple[Iterable[Any], Iterable[tuple[str, Any]]]
+ ]
- @staticmethod
+ @_Dec.wrap
def chain_with_args(
- funcs: Iterable[_ArgsChainableCallable], *bind_args, **bind_kwargs
- ) -> _ArgsChainableCallable:
+ self, funcs: Iterable[ArgsChainableCallable], *bind_args, **bind_kwargs
+ ) -> ArgsChainableCallable:
def chained_func(*args):
+ args = self.bind(*bind_args, **bind_kwargs)(*args)
+
for func in funcs:
- args = CruFunctionMeta.bind(func, *bind_args, **bind_kwargs)(*args)
+ args = _Wrapper(func).bind(*bind_args, **bind_kwargs)(*args)
return args
return chained_func
- @staticmethod
+ @_Dec.wrap
def chain_with_kwargs(
- funcs: Iterable[_KwargsChainableCallable], *bind_args, **bind_kwargs
- ) -> _KwargsChainableCallable:
+ self, funcs: Iterable[KwargsChainableCallable], *bind_args, **bind_kwargs
+ ) -> KwargsChainableCallable:
def chained_func(**kwargs):
+ kwargs = self.bind(*bind_args, **bind_kwargs)(**kwargs)
for func in funcs:
- kwargs = CruFunctionMeta.bind(func, *bind_args, **bind_kwargs)(**kwargs)
+ kwargs = _Wrapper(func).bind(func, *bind_args, **bind_kwargs)(**kwargs)
return kwargs
return chained_func
- @staticmethod
+ @_Dec.wrap
def chain_with_both(
- funcs: Iterable[_ChainableCallable], *bind_args, **bind_kwargs
- ) -> _ChainableCallable:
+ self, funcs: Iterable[ChainableCallable], *bind_args, **bind_kwargs
+ ) -> ChainableCallable:
def chained_func(*args, **kwargs):
for func in funcs:
- args, kwargs = CruFunctionMeta.bind(func, *bind_args, **bind_kwargs)(
+ args, kwargs = _Wrapper(func).bind(func, *bind_args, **bind_kwargs)(
*args, **kwargs
)
return args, kwargs
return chained_func
- @staticmethod
- def chain(
- mode: ChainMode,
- funcs: Iterable[
- _ArgsChainableCallable | _KwargsChainableCallable | _ChainableCallable
- ],
- *bind_args,
- **bind_kwargs,
- ) -> _ArgsChainableCallable | _KwargsChainableCallable | _ChainableCallable:
- if mode == CruFunctionMeta.ChainMode.ARGS:
- return CruFunctionMeta.chain_with_args(
- cast(Iterable[_ArgsChainableCallable], funcs),
- *bind_args,
- **bind_kwargs,
- )
- elif mode == CruFunctionMeta.ChainMode.KWARGS:
- return CruFunctionMeta.chain_with_kwargs(
- cast(Iterable[_KwargsChainableCallable], funcs),
- *bind_args,
- **bind_kwargs,
- )
- elif mode == CruFunctionMeta.ChainMode.BOTH:
- return CruFunctionMeta.chain_with_both(
- cast(Iterable[_ChainableCallable], funcs), *bind_args, **bind_kwargs
- )
-
-
-class CruFunction(Generic[_P, _T]):
-
- def __init__(self, f: Callable[_P, _T]):
- self._f = f
-
- @property
- def me(self) -> Callable[_P, _T]:
- return self._f
- def bind(self, *bind_args, **bind_kwargs) -> CruFunction[..., _T]:
- return CruFunction(CruFunctionMeta.bind(self._f, *bind_args, **bind_kwargs))
+class _Base:
+ none = _Wrapper(_RawBase.none)
+ true = _Wrapper(_RawBase.true)
+ false = _Wrapper(_RawBase.false)
+ identity = _Wrapper(_RawBase.identity)
+ only_you = _Wrapper(_RawBase.only_you)
+ equal = _Wrapper(_RawBase.equal)
+ not_equal = _Wrapper(_RawBase.not_equal)
+ not_ = _Wrapper(_RawBase.not_)
- def _iter_with_self(
- self, funcs: Iterable[Callable[..., Any]]
- ) -> Iterable[Callable[..., Any]]:
- yield self
- yield from funcs
+class _Creators:
@staticmethod
- def chain_with_args(
- self,
- funcs: Iterable[_ArgsChainableCallable],
- *bind_args,
- **bind_kwargs,
- ) -> _ArgsChainableCallable:
- return CruFunction(
- CruFunctionMeta.chain_with_args(
- self._iter_with_self(funcs), *bind_args, **bind_kwargs
- )
- )
+ def make_isinstance_of_types(*types: type) -> Callable:
+ return _Wrapper(lambda v: type(v) in types)
- def chain_with_kwargs(
- self, funcs: Iterable[_KwargsChainableCallable], *bind_args, **bind_kwargs
- ) -> _KwargsChainableCallable:
- return CruFunction(
- CruFunctionMeta.chain_with_kwargs(
- self._iter_with_self(funcs), *bind_args, **bind_kwargs
- )
- )
- def chain_with_both(
- self, funcs: Iterable[_ChainableCallable], *bind_args, **bind_kwargs
- ) -> _ChainableCallable:
- return CruFunction(
- CruFunctionMeta.chain_with_both(
- self._iter_with_self(funcs), *bind_args, **bind_kwargs
- )
- )
-
- def chain(
- self,
- mode: CruFunctionChainMode,
- funcs: Iterable[
- _ArgsChainableCallable | _KwargsChainableCallable | _ChainableCallable
- ],
- *bind_args,
- **bind_kwargs,
- ) -> _ArgsChainableCallable | _KwargsChainableCallable | _ChainableCallable:
- return CruFunction(
- CruFunctionMeta.chain(
- mode, self._iter_with_self(funcs), *bind_args, **bind_kwargs
- )
- )
+class CruFunction:
+ RawBase = _RawBase
+ Base = _Base
+ Creators = _Creators
+ Wrapper = _Wrapper
+ Decorators = _Dec
- def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T:
- return self._f(*args, **kwargs)
- @staticmethod
- def make_chain(
- mode: CruFunctionChainMode,
- funcs: Iterable[
- _ArgsChainableCallable | _KwargsChainableCallable | _ChainableCallable
- ],
- *bind_args,
- **bind_kwargs,
- ) -> CruFunction:
- return CruFunction(
- CruFunctionMeta.chain(mode, funcs, *bind_args, **bind_kwargs)
- )
-
-
-class CruWrappedFunctions:
- none = CruFunction(CruRawFunctions.none)
- true = CruFunction(CruRawFunctions.true)
- false = CruFunction(CruRawFunctions.false)
- identity = CruFunction(CruRawFunctions.identity)
- only_you = CruFunction(CruRawFunctions.only_you)
- equal = CruFunction(CruRawFunctions.equal)
- not_equal = CruFunction(CruRawFunctions.not_equal)
- not_ = CruFunction(CruRawFunctions.not_)
-
-
-class CruFunctionGenerators:
- @staticmethod
- def make_isinstance_of_types(*types: type) -> Callable:
- return CruFunction(lambda v: type(v) in types)
-
-
-CRU.add_objects(
- CruRawFunctions,
- CruFunctionMeta,
- CruFunction,
- CruWrappedFunctions,
- CruFunctionGenerators,
-)
+CRU.add_objects(CruFunction)
diff --git a/tools/cru-py/cru/_iter.py b/tools/cru-py/cru/_iter.py
index da849e8..83a9513 100644
--- a/tools/cru-py/cru/_iter.py
+++ b/tools/cru-py/cru/_iter.py
@@ -8,6 +8,7 @@ from typing import (
Generator,
Iterator,
Literal,
+ Never,
Self,
TypeAlias,
TypeVar,
@@ -15,11 +16,9 @@ from typing import (
Any,
Generic,
cast,
- overload,
)
from ._cru import CRU
-from ._meta import CruDecCreators
from ._const import CruNotFound
_P = ParamSpec("_P")
@@ -28,208 +27,221 @@ _O = TypeVar("_O")
_V = TypeVar("_V")
_R = TypeVar("_R")
-CanBeList: TypeAlias = Iterable[_T] | _T | None
-ElementOperation: TypeAlias = Callable[[_T], Any]
-ElementPredicate: TypeAlias = Callable[[_T], bool]
-AnyElementPredicate: TypeAlias = ElementPredicate[Any]
-ElementTransformer: TypeAlias = Callable[[_T], _O]
-SelfElementTransformer: TypeAlias = ElementTransformer[_T, _T]
-AnyElementTransformer: TypeAlias = ElementTransformer[Any, Any]
+class _Generic:
+ class StepActionKind(Enum):
+ SKIP = 0
+ PUSH = 1
+ STOP = 2
+ AGGREGATE = 3
+ @dataclass
+ class StepAction(Generic[_V, _R]):
+ value: Iterable[Self] | _V | _R | None
+ kind: _Generic.StepActionKind
-class _StepActionKind(Enum):
- SKIP = 0
- PUSH = 1
- STOP = 2
- AGGREGATE = 3
+ @property
+ def push_value(self) -> _V:
+ assert self.kind == _Generic.StepActionKind.PUSH
+ return cast(_V, self.value)
+ @property
+ def stop_value(self) -> _R:
+ assert self.kind == _Generic.StepActionKind.STOP
+ return cast(_R, self.value)
-@dataclass
-class _StepAction(Generic[_V, _R]):
- value: Iterable[Self] | _V | _R | None
- kind: _StepActionKind
+ @staticmethod
+ def skip() -> _Generic.StepAction[_V, _R]:
+ return _Generic.StepAction(None, _Generic.StepActionKind.SKIP)
- @property
- def push_value(self) -> _V:
- assert self.kind == _StepActionKind.PUSH
- return cast(_V, self.value)
+ @staticmethod
+ def push(value: _V | None) -> _Generic.StepAction[_V, _R]:
+ return _Generic.StepAction(value, _Generic.StepActionKind.PUSH)
- @property
- def stop_value(self) -> _R:
- assert self.kind == _StepActionKind.STOP
- return cast(_R, self.value)
+ @staticmethod
+ def stop(value: _R | None = None) -> _Generic.StepAction[_V, _R]:
+ return _Generic.StepAction(value, _Generic.StepActionKind.STOP)
- @staticmethod
- def skip() -> _StepAction[_V, _R]:
- return _StepAction(None, _StepActionKind.SKIP)
+ @staticmethod
+ def aggregate(
+ *results: _Generic.StepAction[_V, _R]
+ ) -> _Generic.StepAction[_V, _R]:
+ return _Generic.StepAction(results, _Generic.StepActionKind.AGGREGATE)
- @staticmethod
- def push(value: _V | None) -> _StepAction[_V, _R]:
- return _StepAction(value, _StepActionKind.PUSH)
+ @staticmethod
+ def push_last(value: _V | None) -> _Generic.StepAction[_V, _R]:
+ return _Generic.StepAction.aggregate(
+ _Generic.StepAction.push(value), _Generic.StepAction.stop()
+ )
- @staticmethod
- def stop(value: _R | None = None) -> _StepAction[_V, _R]:
- return _StepAction(value, _StepActionKind.STOP)
+ def flatten(self) -> Iterable[Self]:
+ return _Generic.flatten(
+ self,
+ is_leave=lambda r: r.kind != _Generic.StepActionKind.AGGREGATE,
+ get_children=lambda r: cast(Iterable[Self], r.value),
+ )
+
+ GeneralStepAction: TypeAlias = StepAction[_V, _R] | _V | _R | None
+ IterateOperation: TypeAlias = Callable[[_T, int], GeneralStepAction[_V, _R]]
+ IteratePreHook: TypeAlias = Callable[[Iterable[_T]], GeneralStepAction[_V, _R]]
+ IteratePostHook: TypeAlias = Callable[[int], GeneralStepAction[_V, _R]]
@staticmethod
- def aggregate(*results: _StepAction[_V, _R]) -> _StepAction[_V, _R]:
- return _StepAction(results, _StepActionKind.AGGREGATE)
+ def _is_not_iterable(o: Any) -> bool:
+ return not isinstance(o, Iterable)
@staticmethod
- def push_last(value: _V | None) -> _StepAction[_V, _R]:
- return _StepAction.aggregate(_StepAction.push(value), _StepAction.stop())
-
- def flatten(self) -> Iterable[Self]:
- return CruIterableMeta.flatten(
- self,
- is_leave=lambda r: r.kind != _StepActionKind.AGGREGATE,
- get_children=lambda r: cast(Iterable[Self], r.value),
- )
+ def _return_self(o):
+ return o
+ @staticmethod
+ def iterable_flatten(
+ maybe_iterable: Iterable[_T] | _T, max_depth: int = -1, *, _depth: int = 0
+ ) -> Iterable[Iterable[_T] | _T]:
+ if _depth == max_depth or not isinstance(maybe_iterable, Iterable):
+ yield maybe_iterable
+ return
-_GeneralStepAction: TypeAlias = _StepAction[_V, _R] | _V | _R | None
-_IterateOperation = Callable[[_T, int], _GeneralStepAction[_V, _R]]
-_IteratePreHook = Callable[[Iterable[_T]], _GeneralStepAction[_V, _R]]
-_IteratePostHook = Callable[[int], _GeneralStepAction[_V, _R]]
+ for child in maybe_iterable:
+ yield from _Generic.iterable_flatten(
+ child,
+ max_depth,
+ _depth=_depth + 1,
+ )
+ @staticmethod
+ def flatten(
+ o: _O,
+ max_depth: int = -1,
+ /,
+ is_leave: _Wrapper.ElementPredicate[_O] = _is_not_iterable,
+ get_children: _Wrapper.ElementTransformer[_O, Iterable[_O]] = _return_self,
+ *,
+ _depth: int = 0,
+ ) -> Iterable[_O]:
+ if _depth == max_depth or is_leave(o):
+ yield o
+ return
+ for child in get_children(o):
+ yield from _Generic.flatten(
+ child,
+ max_depth,
+ is_leave,
+ get_children,
+ _depth=_depth + 1,
+ )
-class CruIterableMeta:
- class Generic:
- StepActionKind = _StepActionKind
- StepAction = _StepAction
- GeneralStepAction = _GeneralStepAction
- IterateOperation = _IterateOperation
- IteratePreHook = _IteratePreHook
- IteratePostHook = _IteratePostHook
+ class Results:
+ @staticmethod
+ def true(_) -> Literal[True]:
+ return True
- class Results:
- @staticmethod
- def true(_) -> Literal[True]:
- return True
+ @staticmethod
+ def false(_) -> Literal[False]:
+ return False
- @staticmethod
- def false(_) -> Literal[False]:
- return False
+ @staticmethod
+ def not_found(_) -> Literal[CruNotFound.VALUE]:
+ return CruNotFound.VALUE
- @staticmethod
- def not_found(_) -> Literal[CruNotFound.VALUE]:
- return CruNotFound.VALUE
+ @staticmethod
+ def _non_result_to_push(value: Any) -> StepAction[_V, _R]:
+ return _Generic.StepAction.push(value)
- @staticmethod
- def _non_result_to_push(value: Any) -> _StepAction[_V, _R]:
- return _StepAction.push(value)
+ @staticmethod
+ def _non_result_to_stop(value: Any) -> StepAction[_V, _R]:
+ return _Generic.StepAction.stop(value)
- @staticmethod
- def _non_result_to_stop(value: Any) -> _StepAction[_V, _R]:
- return _StepAction.stop(value)
+ @staticmethod
+ def _none_hook(_: Any) -> StepAction[_V, _R]:
+ return _Generic.StepAction.skip()
- @staticmethod
- def _none_hook(_: Any) -> _StepAction[_V, _R]:
- return _StepAction.skip()
-
- def iterate(
- iterable: Iterable[_T],
- operation: _IterateOperation[_T, _V, _R],
- fallback_return: _R,
- pre_iterate: _IteratePreHook[_T, _V, _R],
- post_iterate: _IteratePostHook[_V, _R],
- convert_value_result: Callable[[_V | _R | None], _StepAction[_V, _R]],
- ) -> Generator[_V, None, _R]:
- pre_result = pre_iterate(iterable)
- if not isinstance(pre_result, _StepAction):
- real_pre_result = convert_value_result(pre_result)
- for r in real_pre_result.flatten():
- if r.kind == _StepActionKind.STOP:
- return r.stop_value
- elif r.kind == _StepActionKind.PUSH:
- yield r.push_value
- else:
- assert r.kind == _StepActionKind.SKIP
-
- for index, element in enumerate(iterable):
- result = operation(element, index)
- if not isinstance(result, _StepAction):
- real_result = convert_value_result(result)
- for r in real_result.flatten():
- if r.kind == _StepActionKind.STOP:
- return r.stop_value
- elif r.kind == _StepActionKind.PUSH:
- yield r.push_value
- else:
- assert r.kind == _StepActionKind.SKIP
- continue
-
- post_result = post_iterate(index + 1)
- if not isinstance(post_result, _StepAction):
- real_post_result = convert_value_result(post_result)
- for r in real_post_result.flatten():
- if r.kind == _StepActionKind.STOP:
+ def iterate(
+ iterable: Iterable[_T],
+ operation: IterateOperation[_T, _V, _R],
+ fallback_return: _R,
+ pre_iterate: IteratePreHook[_T, _V, _R],
+ post_iterate: IteratePostHook[_V, _R],
+ convert_value_result: Callable[[_V | _R | None], StepAction[_V, _R]],
+ ) -> Generator[_V, None, _R]:
+ pre_result = pre_iterate(iterable)
+ if not isinstance(pre_result, _Generic.StepAction):
+ real_pre_result = convert_value_result(pre_result)
+ for r in real_pre_result.flatten():
+ if r.kind == _Generic.StepActionKind.STOP:
+ return r.stop_value
+ elif r.kind == _Generic.StepActionKind.PUSH:
+ yield r.push_value
+ else:
+ assert r.kind == _Generic.StepActionKind.SKIP
+
+ for index, element in enumerate(iterable):
+ result = operation(element, index)
+ if not isinstance(result, _Generic.StepAction):
+ real_result = convert_value_result(result)
+ for r in real_result.flatten():
+ if r.kind == _Generic.StepActionKind.STOP:
return r.stop_value
- elif r.kind == _StepActionKind.PUSH:
+ elif r.kind == _Generic.StepActionKind.PUSH:
yield r.push_value
else:
- assert r.kind == _StepActionKind.SKIP
-
- return fallback_return
-
- def create_new(
- iterable: Iterable[_T],
- operation: _IterateOperation[_T, _V, _R],
- fallback_return: _R,
- /,
- pre_iterate: _IteratePreHook[_T, _V, _R] | None = None,
- post_iterate: _IteratePostHook[_V, _R] | None = None,
- ) -> Generator[_V, None, _R]:
- return CruIterableMeta.Generic.iterate(
+ assert r.kind == _Generic.StepActionKind.SKIP
+ continue
+
+ post_result = post_iterate(index + 1)
+ if not isinstance(post_result, _Generic.StepAction):
+ real_post_result = convert_value_result(post_result)
+ for r in real_post_result.flatten():
+ if r.kind == _Generic.StepActionKind.STOP:
+ return r.stop_value
+ elif r.kind == _Generic.StepActionKind.PUSH:
+ yield r.push_value
+ else:
+ assert r.kind == _Generic.StepActionKind.SKIP
+
+ return fallback_return
+
+ def create_new(
+ iterable: Iterable[_T],
+ operation: IterateOperation[_T, _V, _R],
+ fallback_return: _R,
+ /,
+ pre_iterate: IteratePreHook[_T, _V, _R] | None = None,
+ post_iterate: IteratePostHook[_V, _R] | None = None,
+ ) -> Generator[_V, None, _R]:
+ return _Generic.iterate(
+ iterable,
+ operation,
+ fallback_return,
+ pre_iterate or _Generic._none_hook,
+ post_iterate or _Generic._none_hook,
+ _Generic._non_result_to_push,
+ )
+
+ def get_result(
+ iterable: Iterable[_T],
+ operation: IterateOperation[_T, _V, _R],
+ fallback_return: _R,
+ /,
+ pre_iterate: IteratePreHook[_T, _V, _R] | None = None,
+ post_iterate: IteratePostHook[_V, _R] | None = None,
+ ) -> _R:
+ try:
+ for _ in _Generic.iterate(
iterable,
operation,
fallback_return,
- pre_iterate or CruIterableMeta.Generic._none_hook,
- post_iterate or CruIterableMeta.Generic._none_hook,
- CruIterableMeta.Generic._non_result_to_push,
- )
+ pre_iterate or _Generic._none_hook,
+ post_iterate or _Generic._none_hook,
+ _Generic._non_result_to_stop,
+ ):
+ pass
+ except StopIteration as stop:
+ return stop.value
+ raise RuntimeError("Should not reach here")
- def get_result(
- iterable: Iterable[_T],
- operation: _IterateOperation[_T, _V, _R],
- fallback_return: _R,
- /,
- pre_iterate: _IteratePreHook[_T, _V, _R] | None = None,
- post_iterate: _IteratePostHook[_V, _R] | None = None,
- ) -> _R:
- try:
- for _ in CruIterableMeta.Generic.iterate(
- iterable,
- operation,
- fallback_return,
- pre_iterate or CruIterableMeta.Generic._none_hook,
- post_iterate or CruIterableMeta.Generic._none_hook,
- CruIterableMeta.Generic._non_result_to_stop,
- ):
- pass
- except StopIteration as stop:
- return stop.value
- raise RuntimeError("Should not reach here")
-
- class Creators:
- @staticmethod
- def empty() -> Iterable[_T]:
- return iter([])
-
- @staticmethod
- def range(*args) -> Iterable[int]:
- return iter(range(*args))
-
- @staticmethod
- def unite(*args: _O) -> Iterable[_O]:
- return iter(args)
-
- @staticmethod
- def concat(*iterables: Iterable[_T]) -> Iterable[_T]:
- for iterable in iterables:
- yield from iterable
+class _Helper:
@staticmethod
def with_count(c: Callable[Concatenate[int, _P], _O]) -> Callable[_P, _O]:
count = 0
@@ -242,78 +254,139 @@ class CruIterableMeta:
return wrapper
+
+class _Dec:
+
+ @staticmethod
+ def wrap(origin: Callable[_P, Iterable[_T]]) -> Callable[_P, _Wrapper[_T]]:
+ def _wrapped(*args: _P.args, **kwargs: _P.kwargs) -> _Wrapper[_T]:
+ return _Wrapper(origin(*args, **kwargs))
+
+ return _wrapped
+
+
+class _Creators:
@staticmethod
- def to_set(iterable: Iterable[_T], discard: Iterable[Any]) -> set[_T]:
- return set(iterable) - set(discard)
+ @_Dec.wrap
+ def empty() -> Iterable[Never]:
+ return iter([])
@staticmethod
- def to_list(iterable: Iterable[_T], discard: Iterable[Any]) -> list[_T]:
- return [v for v in iterable if v not in set(discard)]
+ @_Dec.wrap
+ def range(*args) -> Iterable[int]:
+ return iter(range(*args))
@staticmethod
- def all(iterable: Iterable[_T], predicate: ElementPredicate[_T]) -> bool:
- for value in iterable:
+ @_Dec.wrap
+ def unite(*args: _O) -> Iterable[_O]:
+ return iter(args)
+
+ @staticmethod
+ @_Dec.wrap
+ def concat(*iterables: Iterable[_T]) -> Iterable[_T]:
+ for iterable in iterables:
+ yield from iterable
+
+
+class _Wrapper(Generic[_T]):
+ ElementOperation: TypeAlias = Callable[[_V], Any]
+ ElementPredicate: TypeAlias = Callable[[_V], bool]
+ AnyElementPredicate: TypeAlias = ElementPredicate[Any]
+ ElementTransformer: TypeAlias = Callable[[_V], _O]
+ SelfElementTransformer: TypeAlias = ElementTransformer[_V, _V]
+ AnyElementTransformer: TypeAlias = ElementTransformer[Any, Any]
+
+ def __init__(
+ self,
+ iterable: Iterable[_T],
+ ) -> None:
+ self._iterable = iterable
+
+ def __iter__(self) -> Iterator[_T]:
+ return self._iterable.__iter__()
+
+ @property
+ def me(self) -> Iterable[_T]:
+ return self._iterable
+
+ _wrap = _Dec.wrap
+
+ @_wrap
+ def replace_me(self, iterable: Iterable[_O]) -> Iterable[_O]:
+ return iterable
+
+ def replace_me_with_empty(self) -> _Wrapper[Never]:
+ return _Creators.empty()
+
+ def replace_me_with_range(self, *args) -> _Wrapper[int]:
+ return _Creators.range(*args)
+
+ def replace_me_with_unite(self, *args: _O) -> _Wrapper[_O]:
+ return _Creators.unite(*args)
+
+ def replace_me_with_concat(self, *iterables: Iterable[_T]) -> _Wrapper[_T]:
+ return _Creators.concat(*iterables)
+
+ def to_set(self, discard: Iterable[Any]) -> set[_T]:
+ return set(self.me) - set(discard)
+
+ def to_list(self, discard: Iterable[Any]) -> list[_T]:
+ return [v for v in self.me if v not in set(discard)]
+
+ def all(self, predicate: ElementPredicate[_T]) -> bool:
+ for value in self.me:
if not predicate(value):
return False
return True
- @staticmethod
- def any(iterable: Iterable[_T], predicate: ElementPredicate[_T]) -> bool:
- for value in iterable:
+ def any(self, predicate: ElementPredicate[_T]) -> bool:
+ for value in self.me:
if predicate(value):
return True
return False
- @staticmethod
- def foreach(iterable: Iterable[_T], operation: ElementOperation[_T]) -> None:
- for value in iterable:
+ def foreach(self, operation: ElementOperation[_T]) -> None:
+ for value in self.me:
operation(value)
- @staticmethod
- def transform(
- iterable: Iterable[_T], transformer: ElementTransformer[_T, _O]
- ) -> Iterable[_O]:
- for value in iterable:
+ @_wrap
+ def transform(self, transformer: ElementTransformer[_T, _O]) -> Iterable[_O]:
+ for value in self.me:
yield transformer(value)
- @staticmethod
- def filter(iterable: Iterable[_T], predicate: ElementPredicate[_T]) -> Iterable[_T]:
- for value in iterable:
+ map = transform
+
+ @_wrap
+ def filter(self, predicate: ElementPredicate[_T]) -> Iterable[_T]:
+ for value in self.me:
if predicate(value):
yield value
- @staticmethod
- def continue_if(
- iterable: Iterable[_T], predicate: ElementPredicate[_T]
- ) -> Iterable[_T]:
- for value in iterable:
+ @_wrap
+ def continue_if(self, predicate: ElementPredicate[_T]) -> Iterable[_T]:
+ for value in self.me:
yield value
if not predicate(value):
break
- @staticmethod
- def first_n(iterable: Iterable[_T], max_count: int) -> Iterable[_T]:
+ def first_n(self, max_count: int) -> _Wrapper[_T]:
if max_count < 0:
raise ValueError("max_count must be 0 or positive.")
if max_count == 0:
- return CruIterableMeta.Creators.empty()
- return CruIterableMeta.continue_if(
- iterable, _with_count(lambda i, _: i < max_count - 1)
- )
+ return self.replace_me_with_empty() # type: ignore
+ return self.continue_if(_Helper.with_count(lambda i, _: i < max_count - 1))
- @staticmethod
- def drop_n(iterable: Iterable[_T], n: int) -> Iterable[_T]:
+ def drop_n(self, n: int) -> _Wrapper[_T]:
if n < 0:
raise ValueError("n must be 0 or positive.")
if n == 0:
- return iterable
- return CruIterableMeta.filter(iterable, _with_count(lambda i, _: i < n))
+ return self
+ return self.filter(_Helper.with_count(lambda i, _: i < n))
- @staticmethod
def single_or(
- iterable: Iterable[_T], fallback: _O | CruNotFound | None = CruNotFound.VALUE
+ self, fallback: _O | CruNotFound | None = CruNotFound.VALUE
) -> _T | _O | Any | CruNotFound:
- first_2 = CruIterableMeta.first_n(iterable, 2)
+ first_2 = self.first_n(2)
has_value = False
value = None
for element in first_2:
@@ -326,181 +399,34 @@ class CruIterableMeta:
else:
return fallback
- @staticmethod
- def _is_not_iterable(o: Any) -> bool:
- return not isinstance(o, Iterable)
-
- @staticmethod
- def _return_self(o):
- return o
-
- @staticmethod
- @overload
- def flatten(o: Iterable[_T], max_depth: int = -1) -> Iterable[_T]: ...
-
- @staticmethod
- @overload
- def flatten(
- o: _O,
- max_depth: int = -1,
- *,
- is_leave: ElementPredicate[_O],
- get_children: ElementTransformer[_O, Iterable[_O]],
- ) -> Iterable[_O]: ...
-
- @staticmethod
- def flatten(
- o: _O,
- max_depth: int = -1,
- *,
- is_leave: ElementPredicate[_O] = _is_not_iterable,
- get_children: ElementTransformer[_O, Iterable[_O]] = _return_self,
- _depth: int = 0,
- ) -> Iterable[Any]:
- if _depth == max_depth or is_leave(o):
- yield o
- return
- for child in get_children(o):
- yield from CruIterableMeta.flatten(
- child,
- max_depth,
- is_leave=is_leave,
- get_children=get_children,
- _depth=_depth + 1,
- ) # type: ignore
-
-
-_with_count = CruIterableMeta.with_count
-
-
-class CruIterableWrapper(Generic[_T]):
- Meta = CruIterableMeta
-
- class Dec:
- @staticmethod
- def _wrap(iterable: Iterable[_O]) -> CruIterableWrapper[_O]:
- return CruIterableWrapper(iterable)
-
- wrap = CruDecCreators.convert_result(_wrap)
-
- @staticmethod
- def _as_iterable(_self: CruIterableWrapper[_O]) -> CruIterableWrapper[_O]:
- return _self
-
- meta = CruDecCreators.create_implement_by(_as_iterable)
- meta_no_self = CruDecCreators.create_implement_by_no_self()
-
- def __init__(
- self,
- iterable: Iterable[_T],
- ) -> None:
- self._iterable = iterable
-
- def __iter__(self) -> Iterator[_T]:
- return self._iterable.__iter__()
-
- @property
- def me(self) -> Iterable[_T]:
- return self._iterable
-
- def replace_me(self, iterable: Iterable[_O]) -> CruIterableWrapper[_O]:
- return CruIterableWrapper(iterable)
-
- @Dec.wrap
- @Dec.meta_no_self(CruIterableMeta.Creators.empty)
- def replace_me_with_empty(self) -> None:
- pass
-
- @Dec.wrap
- @Dec.meta_no_self(CruIterableMeta.Creators.range)
- def replace_me_with_range(self) -> None:
- pass
-
- @Dec.wrap
- @Dec.meta_no_self(CruIterableMeta.Creators.unite)
- def replace_me_with_unite(self) -> None:
- pass
+ @_wrap
+ def flatten(self) -> Iterable[_T | Iterable[_T]]:
+ return _Generic.iterable_flatten(self.me)
- @Dec.wrap
- @Dec.meta_no_self(CruIterableMeta.Creators.concat)
- def replace_me_with_concat(self) -> None:
- pass
-
- @Dec.meta(CruIterableMeta.to_set)
- def to_set(self) -> None:
- pass
-
- @Dec.meta(CruIterableMeta.to_list)
- def to_list(self) -> None:
- pass
-
- @Dec.meta(CruIterableMeta.all)
- def all(self) -> None:
- pass
-
- @Dec.meta(CruIterableMeta.any)
- def any(self) -> None:
- pass
-
- @Dec.meta(CruIterableMeta.foreach)
- def foreach(self) -> None:
- pass
-
- @Dec.wrap
- @Dec.meta(CruIterableMeta.transform)
- def transform(self) -> None:
- pass
-
- map = transform
-
- @Dec.wrap
- @Dec.meta(CruIterableMeta.filter)
- def filter(self) -> None:
- pass
-
- @Dec.wrap
- @Dec.meta(CruIterableMeta.continue_if)
- def continue_if(self) -> None:
- pass
-
- @Dec.wrap
- @Dec.meta(CruIterableMeta.first_n)
- def first_n(self) -> None:
- pass
-
- @Dec.wrap
- @Dec.meta(CruIterableMeta.drop_n)
- def drop_n(self) -> None:
- pass
-
- @Dec.wrap
- @Dec.meta(CruIterableMeta.flatten)
- def flatten(self) -> None:
- pass
-
- @Dec.meta(CruIterableMeta.single_or)
- def single_or(self) -> None:
- pass
-
- @Dec.wrap
- def select_by_indices(self, indices: Iterable[int]) -> Iterable[_T]:
+ def select_by_indices(self, indices: Iterable[int]) -> _Wrapper[_T]:
index_set = set(indices)
max_index = max(index_set)
return self.first_n(max_index + 1).filter(
- _with_count(lambda i, _: i in index_set)
+ _Helper.with_count(lambda i, _: i in index_set)
)
- @Dec.wrap
- def remove_values(self, values: Iterable[Any]) -> Iterable[_V]:
+ def remove_values(self, values: Iterable[Any]) -> _Wrapper[_T]:
value_set = set(values)
return self.filter(lambda v: v not in value_set)
- @Dec.wrap
def replace_values(
self, old_values: Iterable[Any], new_value: _O
- ) -> Iterable[_V | _O]:
+ ) -> Iterable[_T | _O]:
value_set = set(old_values)
- return self.map(lambda v: new_value if v in value_set else v)
+ return self.transform(lambda v: new_value if v in value_set else v)
+
+
+class CruIterable:
+ Decorators = _Dec
+ Generic = _Generic
+ Helper = _Helper
+ Creators = _Creators
+ Wrapper = _Wrapper
-CRU.add_objects(CruIterableMeta, CruIterableWrapper)
+CRU.add_objects(CruIterable, _Wrapper)
diff --git a/tools/cru-py/cru/_meta.py b/tools/cru-py/cru/_meta.py
deleted file mode 100644
index 86763cf..0000000
--- a/tools/cru-py/cru/_meta.py
+++ /dev/null
@@ -1,92 +0,0 @@
-from collections.abc import Callable
-from typing import Concatenate, Generic, ParamSpec, TypeVar
-
-_P = ParamSpec("_P")
-_T = TypeVar("_T")
-_O = TypeVar("_O")
-_R = TypeVar("_R")
-
-
-class CruDecorator:
-
- class ImplementedBy(Generic[_O, _P, _R]):
- # TODO: Continue here tomorrow!
- def __init__(
- self, impl: Callable[Concatenate[_O, _P], _R], pass_self: bool
- ) -> None:
- self.impl = impl
- self.pass_self = pass_self
-
- def __call__(
- self, _origin: Callable[[_T], None]
- ) -> Callable[Concatenate[_T, _P], _R]:
- def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R:
- if self.pass_self:
- return self.impl(_self, *args, **kwargs)
- else:
- return self.impl(*args, **kwargs)
-
- return real_impl
-
- @staticmethod
- def convert_result(
- converter: Callable[[_T], _O]
- ) -> Callable[[Callable[_P, _T]], Callable[_P, _O]]:
- def dec(original: Callable[_P, _T]) -> Callable[_P, _O]:
- def wrapped(*args: _P.args, **kwargs: _P.kwargs) -> _O:
- return converter(original(*args, **kwargs))
-
- return wrapped
-
- return dec
-
- @staticmethod
- def create_implement_by(converter: Callable[[_T], _O]) -> Callable[
- [Callable[Concatenate[_O, _P], _R]],
- Callable[
- [Callable[[_T], None]],
- Callable[Concatenate[_T, _P], _R],
- ],
- ]:
- def implement_by(
- m: Callable[Concatenate[_O, _P], _R],
- ) -> Callable[
- [Callable[[_T], None]],
- Callable[Concatenate[_T, _P], _R],
- ]:
- def implementation(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R:
- return m(converter(_self), *args, **kwargs)
-
- def decorator(_: Callable[[_T], None]) -> Callable[Concatenate[_T, _P], _R]:
- return implementation
-
- return decorator
-
- return implement_by
-
- @staticmethod
- def create_implement_by_no_self() -> Callable[
- [Callable[_P, _R]],
- Callable[
- [Callable[[_T], None]],
- Callable[Concatenate[_T, _P], _R],
- ],
- ]:
- def implement_by_no_self(
- m: Callable[_P, _R],
- ) -> Callable[
- [Callable[[_T], None]],
- Callable[Concatenate[_T, _P], _R],
- ]:
- def implementation(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R:
- return m(*args, **kwargs)
-
- def decorator(_: Callable[[_T], None]) -> Callable[Concatenate[_T, _P], _R]:
- return implementation
-
- return decorator
-
- return implement_by_no_self
-
-
-CRU.add