aboutsummaryrefslogtreecommitdiff
path: root/tools/cru-py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/cru-py')
-rw-r--r--tools/cru-py/cru/_const.py (renamed from tools/cru-py/cru/_util/_const.py)0
-rw-r--r--tools/cru-py/cru/_cru.py (renamed from tools/cru-py/cru/_util/_cru.py)0
-rw-r--r--tools/cru-py/cru/_event.py (renamed from tools/cru-py/cru/_util/_event.py)2
-rw-r--r--tools/cru-py/cru/_func.py (renamed from tools/cru-py/cru/_util/_func.py)136
-rw-r--r--tools/cru-py/cru/_iter.py506
-rw-r--r--tools/cru-py/cru/_lang.py (renamed from tools/cru-py/cru/_util/_lang.py)0
-rw-r--r--tools/cru-py/cru/_list.py128
-rw-r--r--tools/cru-py/cru/_meta.py92
-rw-r--r--tools/cru-py/cru/_type.py (renamed from tools/cru-py/cru/_util/_type.py)2
-rw-r--r--tools/cru-py/cru/_util/__init__.py63
-rw-r--r--tools/cru-py/cru/_util/_list.py915
11 files changed, 794 insertions, 1050 deletions
diff --git a/tools/cru-py/cru/_util/_const.py b/tools/cru-py/cru/_const.py
index bc02c3a..bc02c3a 100644
--- a/tools/cru-py/cru/_util/_const.py
+++ b/tools/cru-py/cru/_const.py
diff --git a/tools/cru-py/cru/_util/_cru.py b/tools/cru-py/cru/_cru.py
index 0085a80..0085a80 100644
--- a/tools/cru-py/cru/_util/_cru.py
+++ b/tools/cru-py/cru/_cru.py
diff --git a/tools/cru-py/cru/_util/_event.py b/tools/cru-py/cru/_event.py
index 813e33f..ee914f0 100644
--- a/tools/cru-py/cru/_util/_event.py
+++ b/tools/cru-py/cru/_event.py
@@ -1,6 +1,6 @@
from typing import ParamSpec, TypeVar, Callable
-from ._list import CruInplaceList, CruList
+from ._iter import CruInplaceList, CruList
P = ParamSpec('P')
R = TypeVar('R')
diff --git a/tools/cru-py/cru/_util/_func.py b/tools/cru-py/cru/_func.py
index 0b8b07a..ef3da72 100644
--- a/tools/cru-py/cru/_util/_func.py
+++ b/tools/cru-py/cru/_func.py
@@ -20,54 +20,47 @@ _P = ParamSpec("_P")
_T = TypeVar("_T")
-class CruRawFunctions:
- @staticmethod
- def none(*_v, **_kwargs) -> None:
- return None
-
- @staticmethod
- def true(*_v, **_kwargs) -> Literal[True]:
- return True
-
- @staticmethod
- def false(*_v, **_kwargs) -> Literal[False]:
- return False
-
- @staticmethod
- def identity(v: _T) -> _T:
- return v
+_ArgsChainableCallable: TypeAlias = Callable[..., Iterable[Any]]
+_KwargsChainableCallable: TypeAlias = Callable[..., Iterable[tuple[str, Any]]]
+_ChainableCallable: TypeAlias = Callable[
+ ..., tuple[Iterable[Any], Iterable[tuple[str, Any]]]
+]
- @staticmethod
- def only_you(v: _T, *_v, **_kwargs) -> _T:
- return v
- @staticmethod
- def equal(a: Any, b: Any) -> bool:
- return a == b
+class CruFunctionMeta:
+ class Base:
+ @staticmethod
+ def none(*_v, **_kwargs) -> None:
+ return None
- @staticmethod
- def not_equal(a: Any, b: Any) -> bool:
- return a != b
+ @staticmethod
+ def true(*_v, **_kwargs) -> Literal[True]:
+ return True
- @staticmethod
- def not_(v: Any) -> Any:
- return not v
+ @staticmethod
+ def false(*_v, **_kwargs) -> Literal[False]:
+ return False
+ @staticmethod
+ def identity(v: _T) -> _T:
+ return v
-CruArgsChainableCallable: TypeAlias = Callable[..., Iterable[Any]]
-CruKwargsChainableCallable: TypeAlias = Callable[..., Iterable[tuple[str, Any]]]
-CruChainableCallable: TypeAlias = Callable[
- ..., tuple[Iterable[Any], Iterable[tuple[str, Any]]]
-]
+ @staticmethod
+ def only_you(v: _T, *_v, **_kwargs) -> _T:
+ return v
+ @staticmethod
+ def equal(a: Any, b: Any) -> bool:
+ return a == b
-class CruFunctionChainMode(Flag):
- ARGS = auto()
- KWARGS = auto()
- BOTH = ARGS | KWARGS
+ @staticmethod
+ def not_equal(a: Any, b: Any) -> bool:
+ return a != b
+ @staticmethod
+ def not_(v: Any) -> Any:
+ return not v
-class CruFunctionMeta:
@staticmethod
def bind(func: Callable[..., _T], *bind_args, **bind_kwargs) -> Callable[..., _T]:
def bound_func(*args, **kwargs):
@@ -84,10 +77,19 @@ class CruFunctionMeta:
return bound_func
+ class ChainMode(Flag):
+ ARGS = auto()
+ KWARGS = auto()
+ BOTH = ARGS | KWARGS
+
+ ArgsChainableCallable = _ArgsChainableCallable
+ KwargsChainableCallable = _KwargsChainableCallable
+ ChainableCallable = _ChainableCallable
+
@staticmethod
def chain_with_args(
- funcs: Iterable[CruArgsChainableCallable], *bind_args, **bind_kwargs
- ) -> CruArgsChainableCallable:
+ funcs: Iterable[_ArgsChainableCallable], *bind_args, **bind_kwargs
+ ) -> _ArgsChainableCallable:
def chained_func(*args):
for func in funcs:
args = CruFunctionMeta.bind(func, *bind_args, **bind_kwargs)(*args)
@@ -97,8 +99,8 @@ class CruFunctionMeta:
@staticmethod
def chain_with_kwargs(
- funcs: Iterable[CruKwargsChainableCallable], *bind_args, **bind_kwargs
- ) -> CruKwargsChainableCallable:
+ funcs: Iterable[_KwargsChainableCallable], *bind_args, **bind_kwargs
+ ) -> _KwargsChainableCallable:
def chained_func(**kwargs):
for func in funcs:
kwargs = CruFunctionMeta.bind(func, *bind_args, **bind_kwargs)(**kwargs)
@@ -108,8 +110,8 @@ class CruFunctionMeta:
@staticmethod
def chain_with_both(
- funcs: Iterable[CruChainableCallable], *bind_args, **bind_kwargs
- ) -> CruChainableCallable:
+ funcs: Iterable[_ChainableCallable], *bind_args, **bind_kwargs
+ ) -> _ChainableCallable:
def chained_func(*args, **kwargs):
for func in funcs:
args, kwargs = CruFunctionMeta.bind(func, *bind_args, **bind_kwargs)(
@@ -121,48 +123,42 @@ class CruFunctionMeta:
@staticmethod
def chain(
- mode: CruFunctionChainMode,
+ mode: ChainMode,
funcs: Iterable[
- CruArgsChainableCallable | CruKwargsChainableCallable | CruChainableCallable
+ _ArgsChainableCallable | _KwargsChainableCallable | _ChainableCallable
],
*bind_args,
**bind_kwargs,
- ) -> CruArgsChainableCallable | CruKwargsChainableCallable | CruChainableCallable:
- if mode == CruFunctionChainMode.ARGS:
+ ) -> _ArgsChainableCallable | _KwargsChainableCallable | _ChainableCallable:
+ if mode == CruFunctionMeta.ChainMode.ARGS:
return CruFunctionMeta.chain_with_args(
- cast(Iterable[CruArgsChainableCallable], funcs),
+ cast(Iterable[_ArgsChainableCallable], funcs),
*bind_args,
**bind_kwargs,
)
- elif mode == CruFunctionChainMode.KWARGS:
+ elif mode == CruFunctionMeta.ChainMode.KWARGS:
return CruFunctionMeta.chain_with_kwargs(
- cast(Iterable[CruKwargsChainableCallable], funcs),
+ cast(Iterable[_KwargsChainableCallable], funcs),
*bind_args,
**bind_kwargs,
)
- elif mode == CruFunctionChainMode.BOTH:
+ elif mode == CruFunctionMeta.ChainMode.BOTH:
return CruFunctionMeta.chain_with_both(
- cast(Iterable[CruChainableCallable], funcs), *bind_args, **bind_kwargs
+ cast(Iterable[_ChainableCallable], funcs), *bind_args, **bind_kwargs
)
-# Advanced Function Wrapper
class CruFunction(Generic[_P, _T]):
def __init__(self, f: Callable[_P, _T]):
self._f = f
@property
- def f(self) -> Callable[_P, _T]:
+ def me(self) -> Callable[_P, _T]:
return self._f
- @property
- def func(self) -> Callable[_P, _T]:
- return self.f
-
def bind(self, *bind_args, **bind_kwargs) -> CruFunction[..., _T]:
- self._f = CruFunctionMeta.bind(self._f, *bind_args, **bind_kwargs)
- return self
+ return CruFunction(CruFunctionMeta.bind(self._f, *bind_args, **bind_kwargs))
def _iter_with_self(
self, funcs: Iterable[Callable[..., Any]]
@@ -173,10 +169,10 @@ class CruFunction(Generic[_P, _T]):
@staticmethod
def chain_with_args(
self,
- funcs: Iterable[CruArgsChainableCallable],
+ funcs: Iterable[_ArgsChainableCallable],
*bind_args,
**bind_kwargs,
- ) -> CruArgsChainableCallable:
+ ) -> _ArgsChainableCallable:
return CruFunction(
CruFunctionMeta.chain_with_args(
self._iter_with_self(funcs), *bind_args, **bind_kwargs
@@ -184,8 +180,8 @@ class CruFunction(Generic[_P, _T]):
)
def chain_with_kwargs(
- self, funcs: Iterable[CruKwargsChainableCallable], *bind_args, **bind_kwargs
- ) -> CruKwargsChainableCallable:
+ self, funcs: Iterable[_KwargsChainableCallable], *bind_args, **bind_kwargs
+ ) -> _KwargsChainableCallable:
return CruFunction(
CruFunctionMeta.chain_with_kwargs(
self._iter_with_self(funcs), *bind_args, **bind_kwargs
@@ -193,8 +189,8 @@ class CruFunction(Generic[_P, _T]):
)
def chain_with_both(
- self, funcs: Iterable[CruChainableCallable], *bind_args, **bind_kwargs
- ) -> CruChainableCallable:
+ self, funcs: Iterable[_ChainableCallable], *bind_args, **bind_kwargs
+ ) -> _ChainableCallable:
return CruFunction(
CruFunctionMeta.chain_with_both(
self._iter_with_self(funcs), *bind_args, **bind_kwargs
@@ -205,11 +201,11 @@ class CruFunction(Generic[_P, _T]):
self,
mode: CruFunctionChainMode,
funcs: Iterable[
- CruArgsChainableCallable | CruKwargsChainableCallable | CruChainableCallable
+ _ArgsChainableCallable | _KwargsChainableCallable | _ChainableCallable
],
*bind_args,
**bind_kwargs,
- ) -> CruArgsChainableCallable | CruKwargsChainableCallable | CruChainableCallable:
+ ) -> _ArgsChainableCallable | _KwargsChainableCallable | _ChainableCallable:
return CruFunction(
CruFunctionMeta.chain(
mode, self._iter_with_self(funcs), *bind_args, **bind_kwargs
@@ -223,7 +219,7 @@ class CruFunction(Generic[_P, _T]):
def make_chain(
mode: CruFunctionChainMode,
funcs: Iterable[
- CruArgsChainableCallable | CruKwargsChainableCallable | CruChainableCallable
+ _ArgsChainableCallable | _KwargsChainableCallable | _ChainableCallable
],
*bind_args,
**bind_kwargs,
diff --git a/tools/cru-py/cru/_iter.py b/tools/cru-py/cru/_iter.py
new file mode 100644
index 0000000..da849e8
--- /dev/null
+++ b/tools/cru-py/cru/_iter.py
@@ -0,0 +1,506 @@
+from __future__ import annotations
+
+from collections.abc import Iterable, Callable
+from dataclasses import dataclass
+from enum import Enum
+from typing import (
+ Concatenate,
+ Generator,
+ Iterator,
+ Literal,
+ Self,
+ TypeAlias,
+ TypeVar,
+ ParamSpec,
+ Any,
+ Generic,
+ cast,
+ overload,
+)
+
+from ._cru import CRU
+from ._meta import CruDecCreators
+from ._const import CruNotFound
+
+_P = ParamSpec("_P")
+_T = TypeVar("_T")
+_O = TypeVar("_O")
+_V = TypeVar("_V")
+_R = TypeVar("_R")
+
+CanBeList: TypeAlias = Iterable[_T] | _T | None
+
+ElementOperation: TypeAlias = Callable[[_T], Any]
+ElementPredicate: TypeAlias = Callable[[_T], bool]
+AnyElementPredicate: TypeAlias = ElementPredicate[Any]
+ElementTransformer: TypeAlias = Callable[[_T], _O]
+SelfElementTransformer: TypeAlias = ElementTransformer[_T, _T]
+AnyElementTransformer: TypeAlias = ElementTransformer[Any, Any]
+
+
+class _StepActionKind(Enum):
+ SKIP = 0
+ PUSH = 1
+ STOP = 2
+ AGGREGATE = 3
+
+
+@dataclass
+class _StepAction(Generic[_V, _R]):
+ value: Iterable[Self] | _V | _R | None
+ kind: _StepActionKind
+
+ @property
+ def push_value(self) -> _V:
+ assert self.kind == _StepActionKind.PUSH
+ return cast(_V, self.value)
+
+ @property
+ def stop_value(self) -> _R:
+ assert self.kind == _StepActionKind.STOP
+ return cast(_R, self.value)
+
+ @staticmethod
+ def skip() -> _StepAction[_V, _R]:
+ return _StepAction(None, _StepActionKind.SKIP)
+
+ @staticmethod
+ def push(value: _V | None) -> _StepAction[_V, _R]:
+ return _StepAction(value, _StepActionKind.PUSH)
+
+ @staticmethod
+ def stop(value: _R | None = None) -> _StepAction[_V, _R]:
+ return _StepAction(value, _StepActionKind.STOP)
+
+ @staticmethod
+ def aggregate(*results: _StepAction[_V, _R]) -> _StepAction[_V, _R]:
+ return _StepAction(results, _StepActionKind.AGGREGATE)
+
+ @staticmethod
+ def push_last(value: _V | None) -> _StepAction[_V, _R]:
+ return _StepAction.aggregate(_StepAction.push(value), _StepAction.stop())
+
+ def flatten(self) -> Iterable[Self]:
+ return CruIterableMeta.flatten(
+ self,
+ is_leave=lambda r: r.kind != _StepActionKind.AGGREGATE,
+ get_children=lambda r: cast(Iterable[Self], r.value),
+ )
+
+
+_GeneralStepAction: TypeAlias = _StepAction[_V, _R] | _V | _R | None
+_IterateOperation = Callable[[_T, int], _GeneralStepAction[_V, _R]]
+_IteratePreHook = Callable[[Iterable[_T]], _GeneralStepAction[_V, _R]]
+_IteratePostHook = Callable[[int], _GeneralStepAction[_V, _R]]
+
+
+class CruIterableMeta:
+ class Generic:
+ StepActionKind = _StepActionKind
+ StepAction = _StepAction
+ GeneralStepAction = _GeneralStepAction
+ IterateOperation = _IterateOperation
+ IteratePreHook = _IteratePreHook
+ IteratePostHook = _IteratePostHook
+
+ class Results:
+ @staticmethod
+ def true(_) -> Literal[True]:
+ return True
+
+ @staticmethod
+ def false(_) -> Literal[False]:
+ return False
+
+ @staticmethod
+ def not_found(_) -> Literal[CruNotFound.VALUE]:
+ return CruNotFound.VALUE
+
+ @staticmethod
+ def _non_result_to_push(value: Any) -> _StepAction[_V, _R]:
+ return _StepAction.push(value)
+
+ @staticmethod
+ def _non_result_to_stop(value: Any) -> _StepAction[_V, _R]:
+ return _StepAction.stop(value)
+
+ @staticmethod
+ def _none_hook(_: Any) -> _StepAction[_V, _R]:
+ return _StepAction.skip()
+
+ def iterate(
+ iterable: Iterable[_T],
+ operation: _IterateOperation[_T, _V, _R],
+ fallback_return: _R,
+ pre_iterate: _IteratePreHook[_T, _V, _R],
+ post_iterate: _IteratePostHook[_V, _R],
+ convert_value_result: Callable[[_V | _R | None], _StepAction[_V, _R]],
+ ) -> Generator[_V, None, _R]:
+ pre_result = pre_iterate(iterable)
+ if not isinstance(pre_result, _StepAction):
+ real_pre_result = convert_value_result(pre_result)
+ for r in real_pre_result.flatten():
+ if r.kind == _StepActionKind.STOP:
+ return r.stop_value
+ elif r.kind == _StepActionKind.PUSH:
+ yield r.push_value
+ else:
+ assert r.kind == _StepActionKind.SKIP
+
+ for index, element in enumerate(iterable):
+ result = operation(element, index)
+ if not isinstance(result, _StepAction):
+ real_result = convert_value_result(result)
+ for r in real_result.flatten():
+ if r.kind == _StepActionKind.STOP:
+ return r.stop_value
+ elif r.kind == _StepActionKind.PUSH:
+ yield r.push_value
+ else:
+ assert r.kind == _StepActionKind.SKIP
+ continue
+
+ post_result = post_iterate(index + 1)
+ if not isinstance(post_result, _StepAction):
+ real_post_result = convert_value_result(post_result)
+ for r in real_post_result.flatten():
+ if r.kind == _StepActionKind.STOP:
+ return r.stop_value
+ elif r.kind == _StepActionKind.PUSH:
+ yield r.push_value
+ else:
+ assert r.kind == _StepActionKind.SKIP
+
+ return fallback_return
+
+ def create_new(
+ iterable: Iterable[_T],
+ operation: _IterateOperation[_T, _V, _R],
+ fallback_return: _R,
+ /,
+ pre_iterate: _IteratePreHook[_T, _V, _R] | None = None,
+ post_iterate: _IteratePostHook[_V, _R] | None = None,
+ ) -> Generator[_V, None, _R]:
+ return CruIterableMeta.Generic.iterate(
+ iterable,
+ operation,
+ fallback_return,
+ pre_iterate or CruIterableMeta.Generic._none_hook,
+ post_iterate or CruIterableMeta.Generic._none_hook,
+ CruIterableMeta.Generic._non_result_to_push,
+ )
+
+ def get_result(
+ iterable: Iterable[_T],
+ operation: _IterateOperation[_T, _V, _R],
+ fallback_return: _R,
+ /,
+ pre_iterate: _IteratePreHook[_T, _V, _R] | None = None,
+ post_iterate: _IteratePostHook[_V, _R] | None = None,
+ ) -> _R:
+ try:
+ for _ in CruIterableMeta.Generic.iterate(
+ iterable,
+ operation,
+ fallback_return,
+ pre_iterate or CruIterableMeta.Generic._none_hook,
+ post_iterate or CruIterableMeta.Generic._none_hook,
+ CruIterableMeta.Generic._non_result_to_stop,
+ ):
+ pass
+ except StopIteration as stop:
+ return stop.value
+ raise RuntimeError("Should not reach here")
+
+ class Creators:
+ @staticmethod
+ def empty() -> Iterable[_T]:
+ return iter([])
+
+ @staticmethod
+ def range(*args) -> Iterable[int]:
+ return iter(range(*args))
+
+ @staticmethod
+ def unite(*args: _O) -> Iterable[_O]:
+ return iter(args)
+
+ @staticmethod
+ def concat(*iterables: Iterable[_T]) -> Iterable[_T]:
+ for iterable in iterables:
+ yield from iterable
+
+ @staticmethod
+ def with_count(c: Callable[Concatenate[int, _P], _O]) -> Callable[_P, _O]:
+ count = 0
+
+ def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _O:
+ nonlocal count
+ r = c(count, *args, **kwargs)
+ count += 1
+ return r
+
+ return wrapper
+
+ @staticmethod
+ def to_set(iterable: Iterable[_T], discard: Iterable[Any]) -> set[_T]:
+ return set(iterable) - set(discard)
+
+ @staticmethod
+ def to_list(iterable: Iterable[_T], discard: Iterable[Any]) -> list[_T]:
+ return [v for v in iterable if v not in set(discard)]
+
+ @staticmethod
+ def all(iterable: Iterable[_T], predicate: ElementPredicate[_T]) -> bool:
+ for value in iterable:
+ if not predicate(value):
+ return False
+ return True
+
+ @staticmethod
+ def any(iterable: Iterable[_T], predicate: ElementPredicate[_T]) -> bool:
+ for value in iterable:
+ if predicate(value):
+ return True
+ return False
+
+ @staticmethod
+ def foreach(iterable: Iterable[_T], operation: ElementOperation[_T]) -> None:
+ for value in iterable:
+ operation(value)
+
+ @staticmethod
+ def transform(
+ iterable: Iterable[_T], transformer: ElementTransformer[_T, _O]
+ ) -> Iterable[_O]:
+ for value in iterable:
+ yield transformer(value)
+
+ @staticmethod
+ def filter(iterable: Iterable[_T], predicate: ElementPredicate[_T]) -> Iterable[_T]:
+ for value in iterable:
+ if predicate(value):
+ yield value
+
+ @staticmethod
+ def continue_if(
+ iterable: Iterable[_T], predicate: ElementPredicate[_T]
+ ) -> Iterable[_T]:
+ for value in iterable:
+ yield value
+ if not predicate(value):
+ break
+
+ @staticmethod
+ def first_n(iterable: Iterable[_T], max_count: int) -> Iterable[_T]:
+ if max_count < 0:
+ raise ValueError("max_count must be 0 or positive.")
+ if max_count == 0:
+ return CruIterableMeta.Creators.empty()
+ return CruIterableMeta.continue_if(
+ iterable, _with_count(lambda i, _: i < max_count - 1)
+ )
+
+ @staticmethod
+ def drop_n(iterable: Iterable[_T], n: int) -> Iterable[_T]:
+ if n < 0:
+ raise ValueError("n must be 0 or positive.")
+ if n == 0:
+ return iterable
+ return CruIterableMeta.filter(iterable, _with_count(lambda i, _: i < n))
+
+ @staticmethod
+ def single_or(
+ iterable: Iterable[_T], fallback: _O | CruNotFound | None = CruNotFound.VALUE
+ ) -> _T | _O | Any | CruNotFound:
+ first_2 = CruIterableMeta.first_n(iterable, 2)
+ has_value = False
+ value = None
+ for element in first_2:
+ if has_value:
+ raise ValueError("More than one value found.")
+ has_value = True
+ value = element
+ if has_value:
+ return value
+ else:
+ return fallback
+
+ @staticmethod
+ def _is_not_iterable(o: Any) -> bool:
+ return not isinstance(o, Iterable)
+
+ @staticmethod
+ def _return_self(o):
+ return o
+
+ @staticmethod
+ @overload
+ def flatten(o: Iterable[_T], max_depth: int = -1) -> Iterable[_T]: ...
+
+ @staticmethod
+ @overload
+ def flatten(
+ o: _O,
+ max_depth: int = -1,
+ *,
+ is_leave: ElementPredicate[_O],
+ get_children: ElementTransformer[_O, Iterable[_O]],
+ ) -> Iterable[_O]: ...
+
+ @staticmethod
+ def flatten(
+ o: _O,
+ max_depth: int = -1,
+ *,
+ is_leave: ElementPredicate[_O] = _is_not_iterable,
+ get_children: ElementTransformer[_O, Iterable[_O]] = _return_self,
+ _depth: int = 0,
+ ) -> Iterable[Any]:
+ if _depth == max_depth or is_leave(o):
+ yield o
+ return
+ for child in get_children(o):
+ yield from CruIterableMeta.flatten(
+ child,
+ max_depth,
+ is_leave=is_leave,
+ get_children=get_children,
+ _depth=_depth + 1,
+ ) # type: ignore
+
+
+_with_count = CruIterableMeta.with_count
+
+
+class CruIterableWrapper(Generic[_T]):
+ Meta = CruIterableMeta
+
+ class Dec:
+ @staticmethod
+ def _wrap(iterable: Iterable[_O]) -> CruIterableWrapper[_O]:
+ return CruIterableWrapper(iterable)
+
+ wrap = CruDecCreators.convert_result(_wrap)
+
+ @staticmethod
+ def _as_iterable(_self: CruIterableWrapper[_O]) -> CruIterableWrapper[_O]:
+ return _self
+
+ meta = CruDecCreators.create_implement_by(_as_iterable)
+ meta_no_self = CruDecCreators.create_implement_by_no_self()
+
+ def __init__(
+ self,
+ iterable: Iterable[_T],
+ ) -> None:
+ self._iterable = iterable
+
+ def __iter__(self) -> Iterator[_T]:
+ return self._iterable.__iter__()
+
+ @property
+ def me(self) -> Iterable[_T]:
+ return self._iterable
+
+ def replace_me(self, iterable: Iterable[_O]) -> CruIterableWrapper[_O]:
+ return CruIterableWrapper(iterable)
+
+ @Dec.wrap
+ @Dec.meta_no_self(CruIterableMeta.Creators.empty)
+ def replace_me_with_empty(self) -> None:
+ pass
+
+ @Dec.wrap
+ @Dec.meta_no_self(CruIterableMeta.Creators.range)
+ def replace_me_with_range(self) -> None:
+ pass
+
+ @Dec.wrap
+ @Dec.meta_no_self(CruIterableMeta.Creators.unite)
+ def replace_me_with_unite(self) -> None:
+ pass
+
+ @Dec.wrap
+ @Dec.meta_no_self(CruIterableMeta.Creators.concat)
+ def replace_me_with_concat(self) -> None:
+ pass
+
+ @Dec.meta(CruIterableMeta.to_set)
+ def to_set(self) -> None:
+ pass
+
+ @Dec.meta(CruIterableMeta.to_list)
+ def to_list(self) -> None:
+ pass
+
+ @Dec.meta(CruIterableMeta.all)
+ def all(self) -> None:
+ pass
+
+ @Dec.meta(CruIterableMeta.any)
+ def any(self) -> None:
+ pass
+
+ @Dec.meta(CruIterableMeta.foreach)
+ def foreach(self) -> None:
+ pass
+
+ @Dec.wrap
+ @Dec.meta(CruIterableMeta.transform)
+ def transform(self) -> None:
+ pass
+
+ map = transform
+
+ @Dec.wrap
+ @Dec.meta(CruIterableMeta.filter)
+ def filter(self) -> None:
+ pass
+
+ @Dec.wrap
+ @Dec.meta(CruIterableMeta.continue_if)
+ def continue_if(self) -> None:
+ pass
+
+ @Dec.wrap
+ @Dec.meta(CruIterableMeta.first_n)
+ def first_n(self) -> None:
+ pass
+
+ @Dec.wrap
+ @Dec.meta(CruIterableMeta.drop_n)
+ def drop_n(self) -> None:
+ pass
+
+ @Dec.wrap
+ @Dec.meta(CruIterableMeta.flatten)
+ def flatten(self) -> None:
+ pass
+
+ @Dec.meta(CruIterableMeta.single_or)
+ def single_or(self) -> None:
+ pass
+
+ @Dec.wrap
+ def select_by_indices(self, indices: Iterable[int]) -> Iterable[_T]:
+ index_set = set(indices)
+ max_index = max(index_set)
+ return self.first_n(max_index + 1).filter(
+ _with_count(lambda i, _: i in index_set)
+ )
+
+ @Dec.wrap
+ def remove_values(self, values: Iterable[Any]) -> Iterable[_V]:
+ value_set = set(values)
+ return self.filter(lambda v: v not in value_set)
+
+ @Dec.wrap
+ def replace_values(
+ self, old_values: Iterable[Any], new_value: _O
+ ) -> Iterable[_V | _O]:
+ value_set = set(old_values)
+ return self.map(lambda v: new_value if v in value_set else v)
+
+
+CRU.add_objects(CruIterableMeta, CruIterableWrapper)
diff --git a/tools/cru-py/cru/_util/_lang.py b/tools/cru-py/cru/_lang.py
index 925ba00..925ba00 100644
--- a/tools/cru-py/cru/_util/_lang.py
+++ b/tools/cru-py/cru/_lang.py
diff --git a/tools/cru-py/cru/_list.py b/tools/cru-py/cru/_list.py
new file mode 100644
index 0000000..f1965db
--- /dev/null
+++ b/tools/cru-py/cru/_list.py
@@ -0,0 +1,128 @@
+
+
+class CruInplaceList(CruList, Generic[_V]):
+
+ def clear(self) -> "CruInplaceList[_V]":
+ self.clear()
+ return self
+
+ def extend(self, *l: Iterable[_V]) -> "CruInplaceList[_V]":
+ self.extend(l)
+ return self
+
+ def reset(self, *l: Iterable[_V]) -> "CruInplaceList[_V]":
+ self.clear()
+ self.extend(l)
+ return self
+
+ def transform(self, *f: OptionalElementTransformer) -> "CruInplaceList"[Any]:
+ return self.reset(super().transform(*f))
+
+ def transform_if(
+ self, f: OptionalElementTransformer, p: ElementPredicate[_V]
+ ) -> "CruInplaceList"[Any]:
+ return self.reset(super().transform_if(f, p))
+
+ def remove_by_indices(self, *index: int) -> "CruInplaceList"[_V]:
+ return self.reset(super().remove_by_indices(*index))
+
+ def remove_all_if(self, p: ElementPredicate[_V]) -> "CruInplaceList"[_V]:
+ return self.reset(super().remove_all_if(p))
+
+ def remove_all_value(self, *r: Any) -> "CruInplaceList"[_V]:
+ return self.reset(super().remove_all_value(*r))
+
+ def replace_all_value(
+ self, old_value: Any, new_value: R
+ ) -> "CruInplaceList"[_V | R]:
+ return self.reset(super().replace_all_value(old_value, new_value))
+
+ @staticmethod
+ def make(l: CanBeList[_V]) -> "CruInplaceList"[_V]:
+ return CruInplaceList(ListOperations.make(l))
+
+
+
+K = TypeVar("K")
+
+
+class CruUniqueKeyInplaceList(Generic[_V, K]):
+ KeyGetter = Callable[[_V], K]
+
+ def __init__(
+ self, get_key: KeyGetter, *, before_add: Callable[[_V], _V] | None = None
+ ):
+ super().__init__()
+ self._get_key = get_key
+ self._before_add = before_add
+ self._l: CruInplaceList[_V] = CruInplaceList()
+
+ @property
+ def object_key_getter(self) -> KeyGetter:
+ return self._get_key
+
+ @property
+ def internal_list(self) -> CruInplaceList[_V]:
+ return self._l
+
+ def validate_self(self):
+ keys = self._l.transform(self._get_key)
+ if len(keys) != len(set(keys)):
+ raise ValueError("Duplicate keys!")
+
+ def get_or(self, k: K, fallback: Any = CRU_NOT_FOUND) -> _V | Any:
+ r = self._l.find_if(lambda i: k == self._get_key(i))
+ return r if r is not CRU_NOT_FOUND else fallback
+
+ def get(self, k: K) -> _V:
+ v = self.get_or(k, CRU_NOT_FOUND)
+ if v is CRU_NOT_FOUND:
+ raise KeyError(f"Key not found!")
+ return v
+
+ def has_key(self, k: K) -> bool:
+ return self.get_or(k, CRU_NOT_FOUND) is not CRU_NOT_FOUND
+
+ def has_any_key(self, *k: K) -> bool:
+ return self._l.any(lambda i: self._get_key(i) in k)
+
+ def try_remove(self, k: K) -> bool:
+ i = self._l.find_index_if(lambda v: k == self._get_key(v))
+ if i is CRU_NOT_FOUND:
+ return False
+ self._l.remove_by_indices(i)
+ return True
+
+ def remove(self, k: K, allow_absense: bool = False) -> None:
+ if not self.try_remove(k) and not allow_absense:
+ raise KeyError(f"Key {k} not found!")
+
+ def add(self, v: _V, /, replace: bool = False) -> None:
+ if self.has_key(self._get_key(v)):
+ if replace:
+ self.remove(self._get_key(v))
+ else:
+ raise ValueError(f"Key {self._get_key(v)} already exists!")
+ if self._before_add is not None:
+ v = self._before_add(v)
+ self._l.append(v)
+
+ def set(self, v: _V) -> None:
+ self.add(v, True)
+
+ def extend(self, l: Iterable[_V], /, replace: bool = False) -> None:
+ if not replace and self.has_any_key([self._get_key(i) for i in l]):
+ raise ValueError("Keys already exists!")
+ if self._before_add is not None:
+ l = [self._before_add(i) for i in l]
+ keys = [self._get_key(i) for i in l]
+ self._l.remove_all_if(lambda i: self._get_key(i) in keys).extend(l)
+
+ def clear(self) -> None:
+ self._l.clear()
+
+ def __iter__(self):
+ return iter(self._l)
+
+ def __len__(self):
+ return len(self._l)
diff --git a/tools/cru-py/cru/_meta.py b/tools/cru-py/cru/_meta.py
new file mode 100644
index 0000000..86763cf
--- /dev/null
+++ b/tools/cru-py/cru/_meta.py
@@ -0,0 +1,92 @@
+from collections.abc import Callable
+from typing import Concatenate, Generic, ParamSpec, TypeVar
+
+_P = ParamSpec("_P")
+_T = TypeVar("_T")
+_O = TypeVar("_O")
+_R = TypeVar("_R")
+
+
+class CruDecorator:
+
+ class ImplementedBy(Generic[_O, _P, _R]):
+ # TODO: Continue here tomorrow!
+ def __init__(
+ self, impl: Callable[Concatenate[_O, _P], _R], pass_self: bool
+ ) -> None:
+ self.impl = impl
+ self.pass_self = pass_self
+
+ def __call__(
+ self, _origin: Callable[[_T], None]
+ ) -> Callable[Concatenate[_T, _P], _R]:
+ def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R:
+ if self.pass_self:
+ return self.impl(_self, *args, **kwargs)
+ else:
+ return self.impl(*args, **kwargs)
+
+ return real_impl
+
+ @staticmethod
+ def convert_result(
+ converter: Callable[[_T], _O]
+ ) -> Callable[[Callable[_P, _T]], Callable[_P, _O]]:
+ def dec(original: Callable[_P, _T]) -> Callable[_P, _O]:
+ def wrapped(*args: _P.args, **kwargs: _P.kwargs) -> _O:
+ return converter(original(*args, **kwargs))
+
+ return wrapped
+
+ return dec
+
+ @staticmethod
+ def create_implement_by(converter: Callable[[_T], _O]) -> Callable[
+ [Callable[Concatenate[_O, _P], _R]],
+ Callable[
+ [Callable[[_T], None]],
+ Callable[Concatenate[_T, _P], _R],
+ ],
+ ]:
+ def implement_by(
+ m: Callable[Concatenate[_O, _P], _R],
+ ) -> Callable[
+ [Callable[[_T], None]],
+ Callable[Concatenate[_T, _P], _R],
+ ]:
+ def implementation(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R:
+ return m(converter(_self), *args, **kwargs)
+
+ def decorator(_: Callable[[_T], None]) -> Callable[Concatenate[_T, _P], _R]:
+ return implementation
+
+ return decorator
+
+ return implement_by
+
+ @staticmethod
+ def create_implement_by_no_self() -> Callable[
+ [Callable[_P, _R]],
+ Callable[
+ [Callable[[_T], None]],
+ Callable[Concatenate[_T, _P], _R],
+ ],
+ ]:
+ def implement_by_no_self(
+ m: Callable[_P, _R],
+ ) -> Callable[
+ [Callable[[_T], None]],
+ Callable[Concatenate[_T, _P], _R],
+ ]:
+ def implementation(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R:
+ return m(*args, **kwargs)
+
+ def decorator(_: Callable[[_T], None]) -> Callable[Concatenate[_T, _P], _R]:
+ return implementation
+
+ return decorator
+
+ return implement_by_no_self
+
+
+CRU.add
diff --git a/tools/cru-py/cru/_util/_type.py b/tools/cru-py/cru/_type.py
index dc50def..b67ee9a 100644
--- a/tools/cru-py/cru/_util/_type.py
+++ b/tools/cru-py/cru/_type.py
@@ -1,7 +1,7 @@
from types import NoneType
from typing import Any
-from ._list import CanBeList, CruList
+from ._iter import CanBeList, CruList
DEFAULT_NONE_ERR = ValueError
DEFAULT_NONE_ERR_MSG = "None is not allowed here."
diff --git a/tools/cru-py/cru/_util/__init__.py b/tools/cru-py/cru/_util/__init__.py
deleted file mode 100644
index 481502c..0000000
--- a/tools/cru-py/cru/_util/__init__.py
+++ /dev/null
@@ -1,63 +0,0 @@
-from ._const import (
- CruNotFound,
- CruUseDefault,
- CruDontChange,
- CruNoValue,
- CruPlaceholder,
-)
-from ._func import (
- CruFunction,
- CruFunctionMeta,
- CruRawFunctions,
- CruWrappedFunctions,
- CruFunctionGenerators,
-)
-from ._list import (
- CruList,
- CruInplaceList,
- CruUniqueKeyInplaceList,
- ListOperations,
- CanBeList,
- ElementOperation,
- ElementPredicate,
- ElementTransformer,
- OptionalElementOperation,
- ElementPredicate,
- OptionalElementTransformer,
-)
-from ._type import TypeSet
-
-F = CruFunction
-WF = CruWrappedFunctions
-FG = CruFunctionGenerators
-L = CruList
-
-
-__all__ = [
- "CruNotFound",
- "CruUseDefault",
- "CruDontChange",
- "CruNoValue",
- "CruPlaceholder",
- "CruFunction",
- "CruFunctionMeta",
- "CruRawFunctions",
- "CruWrappedFunctions",
- "CruFunctionGenerators",
- "CruList",
- "CruInplaceList",
- "CruUniqueKeyInplaceList",
- "ListOperations",
- "CanBeList",
- "ElementOperation",
- "ElementPredicate",
- "ElementTransformer",
- "OptionalElementOperation",
- "ElementPredicate",
- "OptionalElementTransformer",
- "TypeSet",
- "F",
- "WF",
- "FG",
- "L",
-]
diff --git a/tools/cru-py/cru/_util/_list.py b/tools/cru-py/cru/_util/_list.py
deleted file mode 100644
index 711d5f1..0000000
--- a/tools/cru-py/cru/_util/_list.py
+++ /dev/null
@@ -1,915 +0,0 @@
-from __future__ import annotations
-
-from collections.abc import Iterable, Callable
-from dataclasses import dataclass
-from enum import Enum
-from typing import (
- Generator,
- Literal,
- Self,
- TypeAlias,
- TypeVar,
- ParamSpec,
- Any,
- Generic,
- ClassVar,
- Optional,
- Union,
- assert_never,
- cast,
- overload,
- override,
-)
-
-from ._const import CruNoValue, CruNotFound
-
-P = ParamSpec("P")
-T = TypeVar("T")
-O = TypeVar("O")
-F = TypeVar("F")
-
-CanBeList: TypeAlias = Iterable[T] | T | None
-
-OptionalIndex: TypeAlias = int | None
-OptionalType: TypeAlias = type | None
-ElementOperation: TypeAlias = Callable[[T], Any]
-ElementPredicate: TypeAlias = Callable[[T], bool]
-ElementTransformer: TypeAlias = Callable[[T], O]
-SelfElementTransformer: TypeAlias = ElementTransformer[T, T]
-AnyElementTransformer: TypeAlias = ElementTransformer[Any, Any]
-
-
-def flatten_with_func(
- o: T,
- max_depth: int,
- is_leave: ElementPredicate[T],
- get_children: ElementTransformer[T, Iterable[T]],
- depth: int = 0,
-) -> Iterable[T]:
- if depth == max_depth or is_leave(o):
- yield o
- return
- for child in get_children(o):
- yield from flatten_with_func(
- child, max_depth, is_leave, get_children, depth + 1
- )
-
-
-class _StepActionKind(Enum):
- SKIP = 0
- # TODO: Rename this
- SEND = 1
- STOP = 2
- AGGREGATE = 3
-
-
-@dataclass
-class _StepAction(Generic[T]):
- value: Iterable[_StepAction[T]] | T | None
- kind: _StepActionKind
-
- @property
- def non_aggregate_value(self) -> T:
- assert self.kind != _StepActionKind.AGGREGATE
- return cast(T, self.value)
-
- @staticmethod
- def skip() -> _StepAction[T]:
- return _StepAction(None, _StepActionKind.SKIP)
-
- @staticmethod
- def send(value: T | None) -> _StepAction[T]:
- return _StepAction(value, _StepActionKind.SEND)
-
- @staticmethod
- def stop(value: T | None = None) -> _StepAction[T]:
- return _StepAction(value, _StepActionKind.STOP)
-
- @staticmethod
- def aggregate(*results: _StepAction[T]) -> _StepAction[T]:
- return _StepAction(results, _StepActionKind.AGGREGATE)
-
- @staticmethod
- def send_last(value: Any) -> _StepAction[T]:
- return _StepAction.aggregate(_StepAction.send(value), _StepAction.stop())
-
- def flatten(self) -> Iterable[_StepAction[T]]:
- return flatten_with_func(
- self,
- -1,
- lambda r: r.kind != _StepActionKind.AGGREGATE,
- lambda r: cast(Iterable[_StepAction[T]], r.value),
- )
-
-
-_r_skip = _StepAction.skip
-_r_send = _StepAction.send
-_r_stop = _StepAction.stop
-_r_send_last = _StepAction.send_last
-_r_aggregate = _StepAction.aggregate
-
-
-_GeneralStepAction: TypeAlias = _StepAction[T] | T | None
-_GeneralStepActionConverter: TypeAlias = Callable[
- [_GeneralStepAction[T]], _StepAction[T]
-]
-_IterateOperation = Callable[[T, int], _GeneralStepAction[O]]
-_IteratePreHook = Callable[[Iterable[T]], _GeneralStepAction[O]]
-_IteratePostHook = Callable[[int], _GeneralStepAction[O]]
-
-
-class CruGenericIterableMeta:
- StepActionKind = _StepActionKind
- StepAction = _StepAction
- GeneralStepAction = _GeneralStepAction
- GeneralStepActionConverter = _GeneralStepActionConverter
- IterateOperation = _IterateOperation
- IteratePreHook = _IteratePreHook
- IteratePostHook = _IteratePostHook
-
- @staticmethod
- def _non_result_to_send(value: O | None) -> _StepAction[O]:
- return _StepAction.send(value)
-
- @staticmethod
- def _non_result_to_stop(value: O | None) -> _StepAction[O]:
- return _StepAction.stop(value)
-
- @staticmethod
- def _none_pre_iterate() -> _StepAction[O]:
- return _r_skip()
-
- @staticmethod
- def _none_post_iterate(
- _index: int,
- ) -> _StepAction[O]:
- return _r_skip()
-
- def iterate(
- self,
- operation: _IterateOperation[T, O],
- fallback_return: O,
- pre_iterate: _IteratePreHook[T, O],
- post_iterate: _IteratePostHook[O],
- convert_non_result: Callable[[O | None], _StepAction[O]],
- ) -> Generator[O, None, O]:
- pre_result = pre_iterate(self._iterable)
- if not isinstance(pre_result, _StepAction):
- real_pre_result = convert_non_result(pre_result)
- for r in real_pre_result.flatten():
- if r.kind == _StepActionKind.STOP:
- return r.non_aggregate_value
- elif r.kind == _StepActionKind.SEND:
- yield r.non_aggregate_value
-
- for index, element in enumerate(self._iterable):
- result = operation(element, index)
- if not isinstance(result, _StepAction):
- real_result = convert_non_result(result)
- for r in real_result.flatten():
- if r.kind == _StepActionKind.STOP:
- return r.non_aggregate_value
- elif r.kind == _StepActionKind.SEND:
- yield r.non_aggregate_value
- else:
- continue
-
- post_result = post_iterate(index + 1)
- if not isinstance(post_result, _StepAction):
- real_post_result = convert_non_result(post_result)
- for r in real_post_result.flatten():
- if r.kind == _StepActionKind.STOP:
- return r.non_aggregate_value
- elif r.kind == _StepActionKind.SEND:
- yield r.non_aggregate_value
-
- return fallback_return
-
- def _new(
- self,
- operation: _IterateOperation,
- fallback_return: O,
- /,
- pre_iterate: _IteratePreHook[T, O] | None = None,
- post_iterate: _IteratePostHook[O] | None = None,
- ) -> CruIterableWrapper:
- return CruIterableWrapper(
- self.iterate(
- operation,
- fallback_return,
- pre_iterate or CruIterableWrapper._none_pre_iterate,
- post_iterate or CruIterableWrapper._none_post_iterate,
- CruIterableWrapper._non_result_to_send,
- ),
- self._create_new_upstream(),
- )
-
- def _result(
- self,
- operation: _IterateOperation,
- fallback_return: O,
- /,
- result_transform: SelfElementTransformer[O] | None = None,
- pre_iterate: _IteratePreHook[T, O] | None = None,
- post_iterate: _IteratePostHook[O] | None = None,
- ) -> O:
- try:
- for _ in self.iterate(
- operation,
- fallback_return,
- pre_iterate or CruIterableWrapper._none_pre_iterate,
- post_iterate or CruIterableWrapper._none_post_iterate,
- CruIterableWrapper._non_result_to_stop,
- ):
- pass
- except StopIteration as stop:
- return (
- stop.value if result_transform is None else result_transform(stop.value)
- )
- raise RuntimeError("Should not reach here")
-
-
-class IterDefaultResults:
- @staticmethod
- def true(_):
- return True
-
- @staticmethod
- def false(_):
- return False
-
- @staticmethod
- def not_found(_):
- return CruNotFound.VALUE
-
-
-class CruIterableCreators:
- @staticmethod
- def with_(o: Any) -> CruIterableWrapper:
- return CruIterableWrapper(iter(o))
-
- @staticmethod
- def empty() -> CruIterableWrapper:
- return CruIterableCreators.with_([])
-
- @staticmethod
- def range(
- a,
- b=None,
- c=None,
- ) -> CruIterableWrapper[int]:
- args = [arg for arg in [a, b, c] if arg is not None]
- return CruIterableCreators.with_(range(*args))
-
- @staticmethod
- def unite(*args: T) -> CruIterableWrapper[T]:
- return CruIterableCreators.with_(args)
-
- @staticmethod
- def _concat(*iterables: Iterable) -> Iterable:
- for iterable in iterables:
- yield from iterable
-
- @staticmethod
- def concat(*iterables: Iterable) -> CruIterableWrapper:
- return CruIterableWrapper(CruIterableCreators._concat(*iterables))
-
-
-class CruIterableWrapper(Generic[T]):
-
- def __init__(
- self,
- iterable: Iterable[T],
- ) -> None:
- self._iterable = iterable
-
- def __iter__(self):
- return self._iterable.__iter__()
-
- @property
- def me(self) -> Iterable[T]:
- return self._iterable
-
- def replace_me_with(self, iterable: Iterable[O]) -> CruIterableWrapper[O]:
- return CruIterableCreators.with_(iterable)
-
- def replace_me_with_empty(self) -> CruIterableWrapper[O]:
- return CruIterableCreators.empty()
-
- def replace_me_with_range(self, a, b=None, c=None) -> CruIterableWrapper[int]:
- return CruIterableCreators.range(a, b, c)
-
- def replace_me_with_unite(self, *args: O) -> CruIterableWrapper[O]:
- return CruIterableCreators.unite(*args)
-
- def replace_me_with_concat(self, *iterables: Iterable) -> CruIterableWrapper:
- return CruIterableCreators.concat(*iterables)
-
- @staticmethod
- def _make_set(iterable: Iterable[O], discard: Iterable[Any] | None) -> set[O]:
- s = set(iterable)
- if discard is not None:
- s = s - set(discard)
- return s
-
- @staticmethod
- def _make_list(iterable: Iterable[O], discard: Iterable[Any] | None) -> list[O]:
- if discard is None:
- return list(iterable)
- return [v for v in iterable if v not in discard]
-
- def _help_make_set(
- self, iterable: Iterable[O], discard: Iterable[Any] | None
- ) -> set[O]:
- return CruIterableWrapper._make_set(iterable, discard)
-
- def _help_make_list(
- self, iterable: Iterable[O], discard: Iterable[Any] | None
- ) -> list[O]:
- return CruIterableWrapper._make_list(iterable, discard)
-
- def to_set(self, discard: Iterable[Any] | None = None) -> set[T]:
- return CruIterableWrapper._make_set(self.me, discard)
-
- def to_list(self, discard: Iterable[Any] | None = None) -> list[T]:
- return CruIterableWrapper._make_list(self.me, discard)
-
- def copy(self) -> CruIterableWrapper:
- return CruIterableWrapper(iter(self.to_list()), self._create_new_upstream())
-
- def new_start(
- self, other: Iterable[O], /, clear_upstream: bool = False
- ) -> CruIterableWrapper[O]:
- return CruIterableWrapper(
- other, None if clear_upstream else self._create_new_upstream()
- )
-
- @overload
- def concat(self) -> Self: ...
-
- @overload
- def concat(
- self, *iterable: Iterable[Any], last: Iterable[O]
- ) -> CruIterableWrapper[O]: ...
-
- def concat(self, *iterable: Iterable[Any]) -> CruIterableWrapper[Any]: # type: ignore
- return self.new_start(CruIterableCreators.concat(self.me, *iterable))
-
- def all(self, predicate: ElementPredicate[T]) -> bool:
- """
- partial
- """
- return self._result(lambda v, _: predicate(v) and None, IterDefaultResults.true)
-
- def all_isinstance(self, *types: OptionalType) -> bool:
- """
- partial
- """
- types = self._help_make_set(types)
- return self.all(lambda v: type(v) in types)
-
- def any(self, predicate: ElementPredicate[T]) -> bool:
- """
- partial
- """
- return self._result(lambda v, _: predicate(v) or None, IterDefaultResults.false)
-
- def number(self) -> CruIterableWrapper:
- """
- partial
- """
- return self._new(lambda _, i: i)
-
- def take(self, predicate: ElementPredicate[T]) -> CruIterableWrapper:
- """
- complete
- """
- return self._new(lambda v, _: _r_send(v) if predicate(v) else None)
-
- def transform(
- self, *transformers: OptionalElementTransformer
- ) -> CruIterableWrapper:
- """
- complete
- """
-
- def _transform_element(element, _):
- for transformer in self._help_make_list(transformers):
- if transformer is not None:
- element = transformer(element)
- return _r_send(element)
-
- return self._new(_transform_element)
-
- def take_n(self, max_count: int, neg_is_clone: bool = True) -> CruIterableWrapper:
- """
- partial
- """
- if max_count < 0:
- if neg_is_clone:
- return self.clone_me()
- else:
- raise ValueError("max_count must be 0 or positive.")
- elif max_count == 0:
- return self.drop_all()
- return self._new(
- lambda v, i: _r_send(v) if i < max_count - 1 else _r_send_last(v)
- )
-
- def take_by_indices(self, *indices: OptionalIndex) -> CruIterableWrapper:
- """
- partial
- """
- indices = self._help_make_set(indices)
- max_index = max(indices)
- return self.take_n(max_index + 1)._new(
- lambda v, i: _r_send(v) if i in indices else None
- )
-
- def single_or(
- self, fallback: Any | None = CRU_NOT_FOUND
- ) -> T | Any | CRU_NOT_FOUND:
- """
- partial
- """
- first_2 = self.take_n(2)
- has_value = False
- value = None
- for element in first_2.me:
- if has_value:
- raise ValueError("More than one value found.")
- has_value = True
- value = element
- if has_value:
- return value
- else:
- return fallback
-
- def first_or(
- self, predicate: ElementPredicate[T], fallback: Any | None = CRU_NOT_FOUND
- ) -> T | CRU_NOT_FOUND:
- """
- partial
- """
- result_iterable = self.take_n(1).single_or()
-
- @staticmethod
- def first_index(
- iterable: Iterable[T], predicate: ElementPredicate[T]
- ) -> int | CRU_NOT_FOUND:
- """
- partial
- """
- for index, element in enumerate(iterable):
- if predicate(element):
- return index
-
- @staticmethod
- def take_indices(
- iterable: Iterable[T], predicate: ElementPredicate[T]
- ) -> Iterable[int]:
- """
- complete
- """
- for index, element in enumerate(iterable):
- if predicate(element):
- yield index
-
- @staticmethod
- def flatten(
- o,
- max_depth=-1,
- is_leave: ElementPredicate | None = None,
- get_children: OptionalElementTransformer = None,
- ) -> Iterable:
- """
- complete
- """
- if is_leave is None:
- is_leave = lambda v: not isinstance(v, Iterable)
- if get_children is None:
- get_children = lambda v: v
- return CruIterableWrapper._flatten_with_func(
- o, max_depth, is_leave, get_children
- )
-
- @staticmethod
- def skip_by_indices(iterable: Iterable[T], *indices: OptionalIndex) -> Iterable[T]:
- """
- complete
- """
- indices = set(indices) - {None}
- for index, element in enumerate(iterable):
- if index not in indices:
- yield element
-
- @staticmethod
- def skip_if(iterable: Iterable[T], predicate: ElementPredicate[T]) -> list[T]:
- """
- complete
- """
- for element in iterable:
- if not predicate(element):
- yield element
-
- def drop_all(self) -> CruIterableWrapper:
- return self.replace_me_with_empty()
-
- @staticmethod
- def remove_all_if(l: Iterable[T], p: ElementPredicate[T]) -> list[T]:
- return [v for v in l if not p(v)]
-
- @staticmethod
- def remove_all_value(l: Iterable[T], *r: Any) -> list[T]:
- return [v for v in l if v not in r]
-
- @staticmethod
- def replace_all_value(l: Iterable[T], old_value: Any, new_value: R) -> list[T | R]:
- return [new_value if v == old_value else v for v in l]
-
- @staticmethod
- def foreach(iterable: Iterable[T], *f: OptionalElementOperation[T]) -> None:
- if len(f) == 0:
- return
- for v in iterable:
- for f_ in f:
- if f_ is not None:
- f_(v)
-
- @staticmethod
- def make(v: CanBeList[T], /, none_to_empty_list: bool = True) -> list[T]:
- if v is None and none_to_empty_list:
- return []
- return list(v) if isinstance(v, Iterable) else [v]
-
-
-class ListOperations:
- @staticmethod
- def all(iterable: Iterable[T], predicate: ElementPredicate[T]) -> bool:
- """
- partial
- """
- return _God.spy(iterable, lambda v, _: predicate(v) and None, _God.Default.true)
-
- @staticmethod
- def all_isinstance(iterable: Iterable[T], *types: OptionalType) -> bool:
- """
- partial
- """
- types = _God.help_make_set(types)
- return ListOperations.all(iterable, lambda v: type(v) in types)
-
- @staticmethod
- def any(iterable: Iterable[T], predicate: ElementPredicate[T]) -> bool:
- """
- partial
- """
- return _God.spy(iterable, lambda v, _: predicate(v) or None, _God.Default.false)
-
- @staticmethod
- def indices(iterable: Iterable[T]) -> Iterable[int]:
- """
- partial
- """
- return _God.new(iterable, lambda _, i: i)
-
- @staticmethod
- def take(iterable: Iterable[T], predicate: ElementPredicate[T]) -> Iterable[T]:
- """
- complete
- """
- return _God.new(iterable, lambda v, _: _God.yield_(v) if predicate(v) else None)
-
- @staticmethod
- def transform(
- iterable: Iterable[T], *transformers: OptionalElementTransformer
- ) -> Iterable:
- """
- complete
- """
-
- def _transform_element(element, _):
- for transformer in transformers:
- if transformer is not None:
- element = transformer(element)
- return element
-
- return _God.new(iterable, _transform_element)
-
- @staticmethod
- def take_n(iterable: Iterable[T], n: int) -> Iterable[T]:
- """
- partial
- """
- if n < 0:
- return iterable
- elif n == 0:
- return []
- return range(n)._god_yield(
- iterable, lambda v, i: _yield(v) if i < n else _return()
- )
-
- @staticmethod
- def take_by_indices(iterable: Iterable[T], *indices: OptionalIndex) -> Iterable[T]:
- """
- partial
- """
- indices = set(indices) - {None}
- max_index = max(indices)
- iterable = ListOperations.take_n(iterable, max_index + 1)
- return _god_yield(iterable, lambda v, i: _yield(v) if i in indices else None)
-
- @staticmethod
- def first(iterable: Iterable[T]) -> T | CRU_NOT_FOUND:
- """
- partial
- """
- result_iterable = ListOperations.take_n(iterable, 1)
- for element in result_iterable:
- return element
- return CRU_NOT_FOUND
-
- @staticmethod
- def first_index(
- iterable: Iterable[T], predicate: ElementPredicate[T]
- ) -> int | CRU_NOT_FOUND:
- """
- partial
- """
- for index, element in enumerate(iterable):
- if predicate(element):
- return index
-
- @staticmethod
- def take_indices(
- iterable: Iterable[T], predicate: ElementPredicate[T]
- ) -> Iterable[int]:
- """
- complete
- """
- for index, element in enumerate(iterable):
- if predicate(element):
- yield index
-
- @staticmethod
- def _flatten(o, depth: int, max_depth: int) -> Iterable:
- if depth == max_depth or not isinstance(o, Iterable):
- yield o
- return
- for v in o:
- yield from ListOperations._flatten(v, depth + 1, max_depth)
-
- @staticmethod
- def flatten(o, max_depth=-1) -> Iterable:
- """
- complete
- """
- return ListOperations._flatten(o, 0, max_depth)
-
- @staticmethod
- def skip_by_indices(iterable: Iterable[T], *indices: OptionalIndex) -> Iterable[T]:
- """
- complete
- """
- indices = set(indices) - {None}
- for index, element in enumerate(iterable):
- if index not in indices:
- yield element
-
- @staticmethod
- def skip_if(iterable: Iterable[T], predicate: ElementPredicate[T]) -> list[T]:
- """
- complete
- """
- for element in iterable:
- if not predicate(element):
- yield element
-
- @staticmethod
- def remove_all_if(l: Iterable[T], p: ElementPredicate[T]) -> list[T]:
- return [v for v in l if not p(v)]
-
- @staticmethod
- def remove_all_value(l: Iterable[T], *r: Any) -> list[T]:
- return [v for v in l if v not in r]
-
- @staticmethod
- def replace_all_value(l: Iterable[T], old_value: Any, new_value: R) -> list[T | R]:
- return [new_value if v == old_value else v for v in l]
-
- @staticmethod
- def foreach(iterable: Iterable[T], *f: OptionalElementOperation[T]) -> None:
- if len(f) == 0:
- return
- for v in iterable:
- for f_ in f:
- if f_ is not None:
- f_(v)
-
- @staticmethod
- def make(v: CanBeList[T], /, none_to_empty_list: bool = True) -> list[T]:
- if v is None and none_to_empty_list:
- return []
- return list(v) if isinstance(v, Iterable) else [v]
-
-
-class CruList(list, Generic[T]):
- @property
- def is_empty(self) -> bool:
- return len(self) == 0
-
- def sub_by_indices(self, *index: int) -> "CruList"[T]:
- return CruList(ListOperations.sub_by_indices(self, *index))
-
- def split_by_indices(self, *index: int) -> tuple["CruList"[T], "CruList"[T]]:
- l1, l2 = ListOperations.split_by_indices(self, *index)
- return CruList(l1), CruList(l2)
-
- def complement_indices(self, *index: int) -> list[int]:
- return ListOperations.complement_indices(len(self), *index)
-
- def foreach(self, *f: OptionalElementOperation[T]) -> None:
- ListOperations.foreach(self, *f)
-
- def all(self, p: ElementPredicate[T]) -> bool:
- return ListOperations.all(self, p)
-
- def all_is_instance(self, *t: type) -> bool:
- return ListOperations.all_isinstance(self, *t)
-
- def any(self, p: ElementPredicate[T]) -> bool:
- return ListOperations.any(self, p)
-
- def find_all_if(self, p: ElementPredicate[T]) -> "CruList"[T]:
- return CruList(ListOperations.take(self, p))
-
- def find_if(self, p: ElementPredicate[T]) -> T | CRU_NOT_FOUND:
- return ListOperations.first(self, p)
-
- def find_all_indices_if(self, p: ElementPredicate[T]) -> "CruList"[int]:
- return CruList(ListOperations.take_indices(self, p))
-
- def find_index_if(self, p: ElementPredicate[T]) -> int | CRU_NOT_FOUND:
- return ListOperations.first_index(self, p)
-
- def split_if(self, p: ElementPredicate[T]) -> tuple["CruList"[T], "CruList"[T]]:
- l1, l2 = ListOperations.split_if(self, p)
- return CruList(l1), CruList(l2)
-
- def split_by_types(self, *t: type) -> tuple["CruList"[T], "CruList"[T]]:
- l1, l2 = ListOperations.split_by_types(self, *t)
- return CruList(l1), CruList(l2)
-
- def transform(self, *f: OptionalElementTransformer) -> "CruList"[Any]:
- return CruList(ListOperations.transform(self, *f))
-
- def transform_if(
- self, f: OptionalElementTransformer, p: ElementPredicate[T]
- ) -> "CruList"[Any]:
- return CruList(ListOperations.transform_if(self, f, p))
-
- def remove_by_indices(self, *index: int) -> "CruList"[T]:
- return CruList(ListOperations.skip_by_indices(self, *index))
-
- def remove_if(self, p: ElementPredicate[T]) -> "CruList"[T]:
- return CruList(ListOperations.remove_if(self, p))
-
- def remove_all_if(self, p: ElementPredicate[T]) -> "CruList"[T]:
- return CruList(ListOperations.remove_all_if(self, p))
-
- def remove_all_value(self, *r: Any) -> "CruList"[T]:
- return CruList(ListOperations.remove_all_value(self, *r))
-
- def replace_all_value(self, old_value: Any, new_value: R) -> "CruList"[T | R]:
- return CruList(ListOperations.replace_all_value(self, old_value, new_value))
-
- @staticmethod
- def make(l: CanBeList[T]) -> "CruList"[T]:
- return CruList(ListOperations.make(l))
-
-
-class CruInplaceList(CruList, Generic[T]):
-
- def clear(self) -> "CruInplaceList[T]":
- self.clear()
- return self
-
- def extend(self, *l: Iterable[T]) -> "CruInplaceList[T]":
- self.extend(l)
- return self
-
- def reset(self, *l: Iterable[T]) -> "CruInplaceList[T]":
- self.clear()
- self.extend(l)
- return self
-
- def transform(self, *f: OptionalElementTransformer) -> "CruInplaceList"[Any]:
- return self.reset(super().transform(*f))
-
- def transform_if(
- self, f: OptionalElementTransformer, p: ElementPredicate[T]
- ) -> "CruInplaceList"[Any]:
- return self.reset(super().transform_if(f, p))
-
- def remove_by_indices(self, *index: int) -> "CruInplaceList"[T]:
- return self.reset(super().remove_by_indices(*index))
-
- def remove_all_if(self, p: ElementPredicate[T]) -> "CruInplaceList"[T]:
- return self.reset(super().remove_all_if(p))
-
- def remove_all_value(self, *r: Any) -> "CruInplaceList"[T]:
- return self.reset(super().remove_all_value(*r))
-
- def replace_all_value(
- self, old_value: Any, new_value: R
- ) -> "CruInplaceList"[T | R]:
- return self.reset(super().replace_all_value(old_value, new_value))
-
- @staticmethod
- def make(l: CanBeList[T]) -> "CruInplaceList"[T]:
- return CruInplaceList(ListOperations.make(l))
-
-
-K = TypeVar("K")
-
-
-class CruUniqueKeyInplaceList(Generic[T, K]):
- KeyGetter = Callable[[T], K]
-
- def __init__(
- self, get_key: KeyGetter, *, before_add: Callable[[T], T] | None = None
- ):
- super().__init__()
- self._get_key = get_key
- self._before_add = before_add
- self._l: CruInplaceList[T] = CruInplaceList()
-
- @property
- def object_key_getter(self) -> KeyGetter:
- return self._get_key
-
- @property
- def internal_list(self) -> CruInplaceList[T]:
- return self._l
-
- def validate_self(self):
- keys = self._l.transform(self._get_key)
- if len(keys) != len(set(keys)):
- raise ValueError("Duplicate keys!")
-
- def get_or(self, k: K, fallback: Any = CRU_NOT_FOUND) -> T | Any:
- r = self._l.find_if(lambda i: k == self._get_key(i))
- return r if r is not CRU_NOT_FOUND else fallback
-
- def get(self, k: K) -> T:
- v = self.get_or(k, CRU_NOT_FOUND)
- if v is CRU_NOT_FOUND:
- raise KeyError(f"Key not found!")
- return v
-
- def has_key(self, k: K) -> bool:
- return self.get_or(k, CRU_NOT_FOUND) is not CRU_NOT_FOUND
-
- def has_any_key(self, *k: K) -> bool:
- return self._l.any(lambda i: self._get_key(i) in k)
-
- def try_remove(self, k: K) -> bool:
- i = self._l.find_index_if(lambda v: k == self._get_key(v))
- if i is CRU_NOT_FOUND:
- return False
- self._l.remove_by_indices(i)
- return True
-
- def remove(self, k: K, allow_absense: bool = False) -> None:
- if not self.try_remove(k) and not allow_absense:
- raise KeyError(f"Key {k} not found!")
-
- def add(self, v: T, /, replace: bool = False) -> None:
- if self.has_key(self._get_key(v)):
- if replace:
- self.remove(self._get_key(v))
- else:
- raise ValueError(f"Key {self._get_key(v)} already exists!")
- if self._before_add is not None:
- v = self._before_add(v)
- self._l.append(v)
-
- def set(self, v: T) -> None:
- self.add(v, True)
-
- def extend(self, l: Iterable[T], /, replace: bool = False) -> None:
- if not replace and self.has_any_key([self._get_key(i) for i in l]):
- raise ValueError("Keys already exists!")
- if self._before_add is not None:
- l = [self._before_add(i) for i in l]
- keys = [self._get_key(i) for i in l]
- self._l.remove_all_if(lambda i: self._get_key(i) in keys).extend(l)
-
- def clear(self) -> None:
- self._l.clear()
-
- def __iter__(self):
- return iter(self._l)
-
- def __len__(self):
- return len(self._l)