aboutsummaryrefslogtreecommitdiff
path: root/tools/cru-py/cru/_util
diff options
context:
space:
mode:
Diffstat (limited to 'tools/cru-py/cru/_util')
-rw-r--r--tools/cru-py/cru/_util/__init__.py63
-rw-r--r--tools/cru-py/cru/_util/_const.py49
-rw-r--r--tools/cru-py/cru/_util/_cru.py94
-rw-r--r--tools/cru-py/cru/_util/_event.py41
-rw-r--r--tools/cru-py/cru/_util/_func.py259
-rw-r--r--tools/cru-py/cru/_util/_lang.py16
-rw-r--r--tools/cru-py/cru/_util/_list.py915
-rw-r--r--tools/cru-py/cru/_util/_type.py42
8 files changed, 1479 insertions, 0 deletions
diff --git a/tools/cru-py/cru/_util/__init__.py b/tools/cru-py/cru/_util/__init__.py
new file mode 100644
index 0000000..481502c
--- /dev/null
+++ b/tools/cru-py/cru/_util/__init__.py
@@ -0,0 +1,63 @@
+from ._const import (
+ CruNotFound,
+ CruUseDefault,
+ CruDontChange,
+ CruNoValue,
+ CruPlaceholder,
+)
+from ._func import (
+ CruFunction,
+ CruFunctionMeta,
+ CruRawFunctions,
+ CruWrappedFunctions,
+ CruFunctionGenerators,
+)
+from ._list import (
+ CruList,
+ CruInplaceList,
+ CruUniqueKeyInplaceList,
+ ListOperations,
+ CanBeList,
+ ElementOperation,
+ ElementPredicate,
+ ElementTransformer,
+ OptionalElementOperation,
+ ElementPredicate,
+ OptionalElementTransformer,
+)
+from ._type import TypeSet
+
+F = CruFunction
+WF = CruWrappedFunctions
+FG = CruFunctionGenerators
+L = CruList
+
+
+__all__ = [
+ "CruNotFound",
+ "CruUseDefault",
+ "CruDontChange",
+ "CruNoValue",
+ "CruPlaceholder",
+ "CruFunction",
+ "CruFunctionMeta",
+ "CruRawFunctions",
+ "CruWrappedFunctions",
+ "CruFunctionGenerators",
+ "CruList",
+ "CruInplaceList",
+ "CruUniqueKeyInplaceList",
+ "ListOperations",
+ "CanBeList",
+ "ElementOperation",
+ "ElementPredicate",
+ "ElementTransformer",
+ "OptionalElementOperation",
+ "ElementPredicate",
+ "OptionalElementTransformer",
+ "TypeSet",
+ "F",
+ "WF",
+ "FG",
+ "L",
+]
diff --git a/tools/cru-py/cru/_util/_const.py b/tools/cru-py/cru/_util/_const.py
new file mode 100644
index 0000000..bc02c3a
--- /dev/null
+++ b/tools/cru-py/cru/_util/_const.py
@@ -0,0 +1,49 @@
+from enum import Enum, auto
+from typing import Self, TypeGuard, TypeVar
+
+from ._cru import CRU
+
+_T = TypeVar("_T")
+
+
+class CruConstantBase(Enum):
+ @classmethod
+ def check(cls, v: _T | Self) -> TypeGuard[Self]:
+ return isinstance(v, cls)
+
+ @classmethod
+ def check_not(cls, v: _T | Self) -> TypeGuard[_T]:
+ return not cls.check(v)
+
+ @classmethod
+ def value(cls) -> Self:
+ return cls.VALUE # type: ignore
+
+
+class CruNotFound(CruConstantBase):
+ VALUE = auto()
+
+
+class CruUseDefault(CruConstantBase):
+ VALUE = auto()
+
+
+class CruDontChange(CruConstantBase):
+ VALUE = auto()
+
+
+class CruNoValue(CruConstantBase):
+ VALUE = auto()
+
+
+class CruPlaceholder(CruConstantBase):
+ VALUE = auto()
+
+
+CRU.add_objects(
+ CruNotFound,
+ CruUseDefault,
+ CruDontChange,
+ CruNoValue,
+ CruPlaceholder,
+)
diff --git a/tools/cru-py/cru/_util/_cru.py b/tools/cru-py/cru/_util/_cru.py
new file mode 100644
index 0000000..0085a80
--- /dev/null
+++ b/tools/cru-py/cru/_util/_cru.py
@@ -0,0 +1,94 @@
+from typing import Any
+
+from ._lang import remove_none
+
+
+class _Cru:
+ NAME_PREFIXES = ("CRU_", "Cru", "cru_")
+
+ def __init__(self) -> None:
+ self._d: dict[str, Any] = {}
+
+ def all_names(self) -> list[str]:
+ return list(self._d.keys())
+
+ def get(self, name: str) -> Any:
+ return self._d[name]
+
+ def has_name(self, name: str) -> bool:
+ return name in self._d
+
+ @staticmethod
+ def _maybe_remove_prefix(name: str) -> str | None:
+ for prefix in _Cru.NAME_PREFIXES:
+ if name.startswith(prefix):
+ return name[len(prefix) :]
+ return None
+
+ def _check_name_exist(self, *names: str | None) -> None:
+ for name in names:
+ if name is None:
+ continue
+ if self.has_name(name):
+ raise ValueError(f"Name {name} exists in CRU.")
+
+ @staticmethod
+ def check_name_format(name: str) -> tuple[str, str]:
+ no_prefix_name = _Cru._maybe_remove_prefix(name)
+ if no_prefix_name is None:
+ raise ValueError(f"Name {name} is not prefixed with {_Cru.NAME_PREFIXES}.")
+ return name, no_prefix_name
+
+ @staticmethod
+ def _check_object_name(o) -> tuple[str, str]:
+ return _Cru.check_name_format(o.__name__)
+
+ def _do_add(self, o, *names: str | None) -> list[str]:
+ name_list: list[str] = remove_none(names)
+ for name in name_list:
+ self._d[name] = o
+ return name_list
+
+ def add(self, o, name: str | None) -> tuple[str, str | None]:
+ no_prefix_name: str | None
+ if name is None:
+ name, no_prefix_name = self._check_object_name(o)
+ else:
+ no_prefix_name = self._maybe_remove_prefix(name)
+
+ self._check_name_exist(name, no_prefix_name)
+ self._do_add(o, name, no_prefix_name)
+ return name, no_prefix_name
+
+ def add_with_alias(self, o, name: str | None = None, *aliases: str) -> list[str]:
+ final_names: list[str | None] = []
+ no_prefix_name: str | None
+ if name is None:
+ name, no_prefix_name = self._check_object_name(o)
+ self._check_name_exist(name, no_prefix_name)
+ final_names.extend([name, no_prefix_name])
+ for alias in aliases:
+ no_prefix_name = self._maybe_remove_prefix(alias)
+ self._check_name_exist(alias, no_prefix_name)
+ final_names.extend([alias, no_prefix_name])
+
+ return self._do_add(o, *final_names)
+
+ def add_objects(self, *objects):
+ final_list = []
+ for o in objects:
+ name, no_prefix_name = self._check_object_name(o)
+ self._check_name_exist(name, no_prefix_name)
+ final_list.append((o, name, no_prefix_name))
+ for o, name, no_prefix_name in final_list:
+ self._do_add(o, name, no_prefix_name)
+
+ def __getitem__(self, item):
+ return self.get(item)
+
+ def __getattr__(self, item):
+ return self.get(item)
+
+
+CRU_NAME_PREFIXES = _Cru.NAME_PREFIXES
+CRU = _Cru()
diff --git a/tools/cru-py/cru/_util/_event.py b/tools/cru-py/cru/_util/_event.py
new file mode 100644
index 0000000..813e33f
--- /dev/null
+++ b/tools/cru-py/cru/_util/_event.py
@@ -0,0 +1,41 @@
+from typing import ParamSpec, TypeVar, Callable
+
+from ._list import CruInplaceList, CruList
+
+P = ParamSpec('P')
+R = TypeVar('R')
+F = Callable[P, R]
+
+
+class EventHandlerToken:
+ def __init__(self, event: "Event", handler: F, once: bool = False) -> None:
+ self._event = event
+ self._handler = handler
+ self._once = once
+
+ @property
+ def event(self) -> "Event":
+ return self._event
+
+ @property
+ def handler(self) -> F:
+ return self._handler
+
+ @property
+ def once(self) -> bool:
+ return self._once
+
+
+class Event:
+ def __init__(self, name: str) -> None:
+ self._name = name
+ self._tokens: CruInplaceList[EventHandlerToken] = CruInplaceList()
+
+ def register(self, handler: F, once: bool = False) -> EventHandlerToken:
+ token = EventHandlerToken(self, handler, once)
+ self._tokens.append(token)
+ return token
+
+ def unregister(self, *h: EventHandlerToken | F) -> int:
+
+ self._tokens.find_all_indices_if(lambda t: )
diff --git a/tools/cru-py/cru/_util/_func.py b/tools/cru-py/cru/_util/_func.py
new file mode 100644
index 0000000..0b8b07a
--- /dev/null
+++ b/tools/cru-py/cru/_util/_func.py
@@ -0,0 +1,259 @@
+from __future__ import annotations
+
+from collections.abc import Callable
+from enum import Flag, auto
+from typing import (
+ Any,
+ Generic,
+ Iterable,
+ Literal,
+ ParamSpec,
+ TypeAlias,
+ TypeVar,
+ cast,
+)
+
+from ._cru import CRU
+from ._const import CruPlaceholder
+
+_P = ParamSpec("_P")
+_T = TypeVar("_T")
+
+
+class CruRawFunctions:
+ @staticmethod
+ def none(*_v, **_kwargs) -> None:
+ return None
+
+ @staticmethod
+ def true(*_v, **_kwargs) -> Literal[True]:
+ return True
+
+ @staticmethod
+ def false(*_v, **_kwargs) -> Literal[False]:
+ return False
+
+ @staticmethod
+ def identity(v: _T) -> _T:
+ return v
+
+ @staticmethod
+ def only_you(v: _T, *_v, **_kwargs) -> _T:
+ return v
+
+ @staticmethod
+ def equal(a: Any, b: Any) -> bool:
+ return a == b
+
+ @staticmethod
+ def not_equal(a: Any, b: Any) -> bool:
+ return a != b
+
+ @staticmethod
+ def not_(v: Any) -> Any:
+ return not v
+
+
+CruArgsChainableCallable: TypeAlias = Callable[..., Iterable[Any]]
+CruKwargsChainableCallable: TypeAlias = Callable[..., Iterable[tuple[str, Any]]]
+CruChainableCallable: TypeAlias = Callable[
+ ..., tuple[Iterable[Any], Iterable[tuple[str, Any]]]
+]
+
+
+class CruFunctionChainMode(Flag):
+ ARGS = auto()
+ KWARGS = auto()
+ BOTH = ARGS | KWARGS
+
+
+class CruFunctionMeta:
+ @staticmethod
+ def bind(func: Callable[..., _T], *bind_args, **bind_kwargs) -> Callable[..., _T]:
+ def bound_func(*args, **kwargs):
+ popped = 0
+ real_args = []
+ for arg in bind_args:
+ if CruPlaceholder.check(arg):
+ real_args.append(args[popped])
+ popped += 1
+ else:
+ real_args.append(arg)
+ real_args.extend(args[popped:])
+ return func(*real_args, **(bind_kwargs | kwargs))
+
+ return bound_func
+
+ @staticmethod
+ def chain_with_args(
+ funcs: Iterable[CruArgsChainableCallable], *bind_args, **bind_kwargs
+ ) -> CruArgsChainableCallable:
+ def chained_func(*args):
+ for func in funcs:
+ args = CruFunctionMeta.bind(func, *bind_args, **bind_kwargs)(*args)
+ return args
+
+ return chained_func
+
+ @staticmethod
+ def chain_with_kwargs(
+ funcs: Iterable[CruKwargsChainableCallable], *bind_args, **bind_kwargs
+ ) -> CruKwargsChainableCallable:
+ def chained_func(**kwargs):
+ for func in funcs:
+ kwargs = CruFunctionMeta.bind(func, *bind_args, **bind_kwargs)(**kwargs)
+ return kwargs
+
+ return chained_func
+
+ @staticmethod
+ def chain_with_both(
+ funcs: Iterable[CruChainableCallable], *bind_args, **bind_kwargs
+ ) -> CruChainableCallable:
+ def chained_func(*args, **kwargs):
+ for func in funcs:
+ args, kwargs = CruFunctionMeta.bind(func, *bind_args, **bind_kwargs)(
+ *args, **kwargs
+ )
+ return args, kwargs
+
+ return chained_func
+
+ @staticmethod
+ def chain(
+ mode: CruFunctionChainMode,
+ funcs: Iterable[
+ CruArgsChainableCallable | CruKwargsChainableCallable | CruChainableCallable
+ ],
+ *bind_args,
+ **bind_kwargs,
+ ) -> CruArgsChainableCallable | CruKwargsChainableCallable | CruChainableCallable:
+ if mode == CruFunctionChainMode.ARGS:
+ return CruFunctionMeta.chain_with_args(
+ cast(Iterable[CruArgsChainableCallable], funcs),
+ *bind_args,
+ **bind_kwargs,
+ )
+ elif mode == CruFunctionChainMode.KWARGS:
+ return CruFunctionMeta.chain_with_kwargs(
+ cast(Iterable[CruKwargsChainableCallable], funcs),
+ *bind_args,
+ **bind_kwargs,
+ )
+ elif mode == CruFunctionChainMode.BOTH:
+ return CruFunctionMeta.chain_with_both(
+ cast(Iterable[CruChainableCallable], funcs), *bind_args, **bind_kwargs
+ )
+
+
+# Advanced Function Wrapper
+class CruFunction(Generic[_P, _T]):
+
+ def __init__(self, f: Callable[_P, _T]):
+ self._f = f
+
+ @property
+ def f(self) -> Callable[_P, _T]:
+ return self._f
+
+ @property
+ def func(self) -> Callable[_P, _T]:
+ return self.f
+
+ def bind(self, *bind_args, **bind_kwargs) -> CruFunction[..., _T]:
+ self._f = CruFunctionMeta.bind(self._f, *bind_args, **bind_kwargs)
+ return self
+
+ def _iter_with_self(
+ self, funcs: Iterable[Callable[..., Any]]
+ ) -> Iterable[Callable[..., Any]]:
+ yield self
+ yield from funcs
+
+ @staticmethod
+ def chain_with_args(
+ self,
+ funcs: Iterable[CruArgsChainableCallable],
+ *bind_args,
+ **bind_kwargs,
+ ) -> CruArgsChainableCallable:
+ return CruFunction(
+ CruFunctionMeta.chain_with_args(
+ self._iter_with_self(funcs), *bind_args, **bind_kwargs
+ )
+ )
+
+ def chain_with_kwargs(
+ self, funcs: Iterable[CruKwargsChainableCallable], *bind_args, **bind_kwargs
+ ) -> CruKwargsChainableCallable:
+ return CruFunction(
+ CruFunctionMeta.chain_with_kwargs(
+ self._iter_with_self(funcs), *bind_args, **bind_kwargs
+ )
+ )
+
+ def chain_with_both(
+ self, funcs: Iterable[CruChainableCallable], *bind_args, **bind_kwargs
+ ) -> CruChainableCallable:
+ return CruFunction(
+ CruFunctionMeta.chain_with_both(
+ self._iter_with_self(funcs), *bind_args, **bind_kwargs
+ )
+ )
+
+ def chain(
+ self,
+ mode: CruFunctionChainMode,
+ funcs: Iterable[
+ CruArgsChainableCallable | CruKwargsChainableCallable | CruChainableCallable
+ ],
+ *bind_args,
+ **bind_kwargs,
+ ) -> CruArgsChainableCallable | CruKwargsChainableCallable | CruChainableCallable:
+ return CruFunction(
+ CruFunctionMeta.chain(
+ mode, self._iter_with_self(funcs), *bind_args, **bind_kwargs
+ )
+ )
+
+ def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T:
+ return self._f(*args, **kwargs)
+
+ @staticmethod
+ def make_chain(
+ mode: CruFunctionChainMode,
+ funcs: Iterable[
+ CruArgsChainableCallable | CruKwargsChainableCallable | CruChainableCallable
+ ],
+ *bind_args,
+ **bind_kwargs,
+ ) -> CruFunction:
+ return CruFunction(
+ CruFunctionMeta.chain(mode, funcs, *bind_args, **bind_kwargs)
+ )
+
+
+class CruWrappedFunctions:
+ none = CruFunction(CruRawFunctions.none)
+ true = CruFunction(CruRawFunctions.true)
+ false = CruFunction(CruRawFunctions.false)
+ identity = CruFunction(CruRawFunctions.identity)
+ only_you = CruFunction(CruRawFunctions.only_you)
+ equal = CruFunction(CruRawFunctions.equal)
+ not_equal = CruFunction(CruRawFunctions.not_equal)
+ not_ = CruFunction(CruRawFunctions.not_)
+
+
+class CruFunctionGenerators:
+ @staticmethod
+ def make_isinstance_of_types(*types: type) -> Callable:
+ return CruFunction(lambda v: type(v) in types)
+
+
+CRU.add_objects(
+ CruRawFunctions,
+ CruFunctionMeta,
+ CruFunction,
+ CruWrappedFunctions,
+ CruFunctionGenerators,
+)
diff --git a/tools/cru-py/cru/_util/_lang.py b/tools/cru-py/cru/_util/_lang.py
new file mode 100644
index 0000000..925ba00
--- /dev/null
+++ b/tools/cru-py/cru/_util/_lang.py
@@ -0,0 +1,16 @@
+from collections.abc import Callable
+from typing import Any, Iterable, TypeVar, cast
+
+T = TypeVar("T")
+D = TypeVar("D")
+
+
+def remove_element(
+ iterable: Iterable[T | None], to_rm: Iterable[Any], des: type[D] | None = None
+) -> D:
+ to_rm = set(to_rm)
+ return cast(Callable[..., D], des or list)(v for v in iterable if v not in to_rm)
+
+
+def remove_none(iterable: Iterable[T | None], des: type[D] | None = None) -> D:
+ return cast(Callable[..., D], des or list)(v for v in iterable if v is not None)
diff --git a/tools/cru-py/cru/_util/_list.py b/tools/cru-py/cru/_util/_list.py
new file mode 100644
index 0000000..711d5f1
--- /dev/null
+++ b/tools/cru-py/cru/_util/_list.py
@@ -0,0 +1,915 @@
+from __future__ import annotations
+
+from collections.abc import Iterable, Callable
+from dataclasses import dataclass
+from enum import Enum
+from typing import (
+ Generator,
+ Literal,
+ Self,
+ TypeAlias,
+ TypeVar,
+ ParamSpec,
+ Any,
+ Generic,
+ ClassVar,
+ Optional,
+ Union,
+ assert_never,
+ cast,
+ overload,
+ override,
+)
+
+from ._const import CruNoValue, CruNotFound
+
+P = ParamSpec("P")
+T = TypeVar("T")
+O = TypeVar("O")
+F = TypeVar("F")
+
+CanBeList: TypeAlias = Iterable[T] | T | None
+
+OptionalIndex: TypeAlias = int | None
+OptionalType: TypeAlias = type | None
+ElementOperation: TypeAlias = Callable[[T], Any]
+ElementPredicate: TypeAlias = Callable[[T], bool]
+ElementTransformer: TypeAlias = Callable[[T], O]
+SelfElementTransformer: TypeAlias = ElementTransformer[T, T]
+AnyElementTransformer: TypeAlias = ElementTransformer[Any, Any]
+
+
+def flatten_with_func(
+ o: T,
+ max_depth: int,
+ is_leave: ElementPredicate[T],
+ get_children: ElementTransformer[T, Iterable[T]],
+ depth: int = 0,
+) -> Iterable[T]:
+ if depth == max_depth or is_leave(o):
+ yield o
+ return
+ for child in get_children(o):
+ yield from flatten_with_func(
+ child, max_depth, is_leave, get_children, depth + 1
+ )
+
+
+class _StepActionKind(Enum):
+ SKIP = 0
+ # TODO: Rename this
+ SEND = 1
+ STOP = 2
+ AGGREGATE = 3
+
+
+@dataclass
+class _StepAction(Generic[T]):
+ value: Iterable[_StepAction[T]] | T | None
+ kind: _StepActionKind
+
+ @property
+ def non_aggregate_value(self) -> T:
+ assert self.kind != _StepActionKind.AGGREGATE
+ return cast(T, self.value)
+
+ @staticmethod
+ def skip() -> _StepAction[T]:
+ return _StepAction(None, _StepActionKind.SKIP)
+
+ @staticmethod
+ def send(value: T | None) -> _StepAction[T]:
+ return _StepAction(value, _StepActionKind.SEND)
+
+ @staticmethod
+ def stop(value: T | None = None) -> _StepAction[T]:
+ return _StepAction(value, _StepActionKind.STOP)
+
+ @staticmethod
+ def aggregate(*results: _StepAction[T]) -> _StepAction[T]:
+ return _StepAction(results, _StepActionKind.AGGREGATE)
+
+ @staticmethod
+ def send_last(value: Any) -> _StepAction[T]:
+ return _StepAction.aggregate(_StepAction.send(value), _StepAction.stop())
+
+ def flatten(self) -> Iterable[_StepAction[T]]:
+ return flatten_with_func(
+ self,
+ -1,
+ lambda r: r.kind != _StepActionKind.AGGREGATE,
+ lambda r: cast(Iterable[_StepAction[T]], r.value),
+ )
+
+
+_r_skip = _StepAction.skip
+_r_send = _StepAction.send
+_r_stop = _StepAction.stop
+_r_send_last = _StepAction.send_last
+_r_aggregate = _StepAction.aggregate
+
+
+_GeneralStepAction: TypeAlias = _StepAction[T] | T | None
+_GeneralStepActionConverter: TypeAlias = Callable[
+ [_GeneralStepAction[T]], _StepAction[T]
+]
+_IterateOperation = Callable[[T, int], _GeneralStepAction[O]]
+_IteratePreHook = Callable[[Iterable[T]], _GeneralStepAction[O]]
+_IteratePostHook = Callable[[int], _GeneralStepAction[O]]
+
+
+class CruGenericIterableMeta:
+ StepActionKind = _StepActionKind
+ StepAction = _StepAction
+ GeneralStepAction = _GeneralStepAction
+ GeneralStepActionConverter = _GeneralStepActionConverter
+ IterateOperation = _IterateOperation
+ IteratePreHook = _IteratePreHook
+ IteratePostHook = _IteratePostHook
+
+ @staticmethod
+ def _non_result_to_send(value: O | None) -> _StepAction[O]:
+ return _StepAction.send(value)
+
+ @staticmethod
+ def _non_result_to_stop(value: O | None) -> _StepAction[O]:
+ return _StepAction.stop(value)
+
+ @staticmethod
+ def _none_pre_iterate() -> _StepAction[O]:
+ return _r_skip()
+
+ @staticmethod
+ def _none_post_iterate(
+ _index: int,
+ ) -> _StepAction[O]:
+ return _r_skip()
+
+ def iterate(
+ self,
+ operation: _IterateOperation[T, O],
+ fallback_return: O,
+ pre_iterate: _IteratePreHook[T, O],
+ post_iterate: _IteratePostHook[O],
+ convert_non_result: Callable[[O | None], _StepAction[O]],
+ ) -> Generator[O, None, O]:
+ pre_result = pre_iterate(self._iterable)
+ if not isinstance(pre_result, _StepAction):
+ real_pre_result = convert_non_result(pre_result)
+ for r in real_pre_result.flatten():
+ if r.kind == _StepActionKind.STOP:
+ return r.non_aggregate_value
+ elif r.kind == _StepActionKind.SEND:
+ yield r.non_aggregate_value
+
+ for index, element in enumerate(self._iterable):
+ result = operation(element, index)
+ if not isinstance(result, _StepAction):
+ real_result = convert_non_result(result)
+ for r in real_result.flatten():
+ if r.kind == _StepActionKind.STOP:
+ return r.non_aggregate_value
+ elif r.kind == _StepActionKind.SEND:
+ yield r.non_aggregate_value
+ else:
+ continue
+
+ post_result = post_iterate(index + 1)
+ if not isinstance(post_result, _StepAction):
+ real_post_result = convert_non_result(post_result)
+ for r in real_post_result.flatten():
+ if r.kind == _StepActionKind.STOP:
+ return r.non_aggregate_value
+ elif r.kind == _StepActionKind.SEND:
+ yield r.non_aggregate_value
+
+ return fallback_return
+
+ def _new(
+ self,
+ operation: _IterateOperation,
+ fallback_return: O,
+ /,
+ pre_iterate: _IteratePreHook[T, O] | None = None,
+ post_iterate: _IteratePostHook[O] | None = None,
+ ) -> CruIterableWrapper:
+ return CruIterableWrapper(
+ self.iterate(
+ operation,
+ fallback_return,
+ pre_iterate or CruIterableWrapper._none_pre_iterate,
+ post_iterate or CruIterableWrapper._none_post_iterate,
+ CruIterableWrapper._non_result_to_send,
+ ),
+ self._create_new_upstream(),
+ )
+
+ def _result(
+ self,
+ operation: _IterateOperation,
+ fallback_return: O,
+ /,
+ result_transform: SelfElementTransformer[O] | None = None,
+ pre_iterate: _IteratePreHook[T, O] | None = None,
+ post_iterate: _IteratePostHook[O] | None = None,
+ ) -> O:
+ try:
+ for _ in self.iterate(
+ operation,
+ fallback_return,
+ pre_iterate or CruIterableWrapper._none_pre_iterate,
+ post_iterate or CruIterableWrapper._none_post_iterate,
+ CruIterableWrapper._non_result_to_stop,
+ ):
+ pass
+ except StopIteration as stop:
+ return (
+ stop.value if result_transform is None else result_transform(stop.value)
+ )
+ raise RuntimeError("Should not reach here")
+
+
+class IterDefaultResults:
+ @staticmethod
+ def true(_):
+ return True
+
+ @staticmethod
+ def false(_):
+ return False
+
+ @staticmethod
+ def not_found(_):
+ return CruNotFound.VALUE
+
+
+class CruIterableCreators:
+ @staticmethod
+ def with_(o: Any) -> CruIterableWrapper:
+ return CruIterableWrapper(iter(o))
+
+ @staticmethod
+ def empty() -> CruIterableWrapper:
+ return CruIterableCreators.with_([])
+
+ @staticmethod
+ def range(
+ a,
+ b=None,
+ c=None,
+ ) -> CruIterableWrapper[int]:
+ args = [arg for arg in [a, b, c] if arg is not None]
+ return CruIterableCreators.with_(range(*args))
+
+ @staticmethod
+ def unite(*args: T) -> CruIterableWrapper[T]:
+ return CruIterableCreators.with_(args)
+
+ @staticmethod
+ def _concat(*iterables: Iterable) -> Iterable:
+ for iterable in iterables:
+ yield from iterable
+
+ @staticmethod
+ def concat(*iterables: Iterable) -> CruIterableWrapper:
+ return CruIterableWrapper(CruIterableCreators._concat(*iterables))
+
+
+class CruIterableWrapper(Generic[T]):
+
+ def __init__(
+ self,
+ iterable: Iterable[T],
+ ) -> None:
+ self._iterable = iterable
+
+ def __iter__(self):
+ return self._iterable.__iter__()
+
+ @property
+ def me(self) -> Iterable[T]:
+ return self._iterable
+
+ def replace_me_with(self, iterable: Iterable[O]) -> CruIterableWrapper[O]:
+ return CruIterableCreators.with_(iterable)
+
+ def replace_me_with_empty(self) -> CruIterableWrapper[O]:
+ return CruIterableCreators.empty()
+
+ def replace_me_with_range(self, a, b=None, c=None) -> CruIterableWrapper[int]:
+ return CruIterableCreators.range(a, b, c)
+
+ def replace_me_with_unite(self, *args: O) -> CruIterableWrapper[O]:
+ return CruIterableCreators.unite(*args)
+
+ def replace_me_with_concat(self, *iterables: Iterable) -> CruIterableWrapper:
+ return CruIterableCreators.concat(*iterables)
+
+ @staticmethod
+ def _make_set(iterable: Iterable[O], discard: Iterable[Any] | None) -> set[O]:
+ s = set(iterable)
+ if discard is not None:
+ s = s - set(discard)
+ return s
+
+ @staticmethod
+ def _make_list(iterable: Iterable[O], discard: Iterable[Any] | None) -> list[O]:
+ if discard is None:
+ return list(iterable)
+ return [v for v in iterable if v not in discard]
+
+ def _help_make_set(
+ self, iterable: Iterable[O], discard: Iterable[Any] | None
+ ) -> set[O]:
+ return CruIterableWrapper._make_set(iterable, discard)
+
+ def _help_make_list(
+ self, iterable: Iterable[O], discard: Iterable[Any] | None
+ ) -> list[O]:
+ return CruIterableWrapper._make_list(iterable, discard)
+
+ def to_set(self, discard: Iterable[Any] | None = None) -> set[T]:
+ return CruIterableWrapper._make_set(self.me, discard)
+
+ def to_list(self, discard: Iterable[Any] | None = None) -> list[T]:
+ return CruIterableWrapper._make_list(self.me, discard)
+
+ def copy(self) -> CruIterableWrapper:
+ return CruIterableWrapper(iter(self.to_list()), self._create_new_upstream())
+
+ def new_start(
+ self, other: Iterable[O], /, clear_upstream: bool = False
+ ) -> CruIterableWrapper[O]:
+ return CruIterableWrapper(
+ other, None if clear_upstream else self._create_new_upstream()
+ )
+
+ @overload
+ def concat(self) -> Self: ...
+
+ @overload
+ def concat(
+ self, *iterable: Iterable[Any], last: Iterable[O]
+ ) -> CruIterableWrapper[O]: ...
+
+ def concat(self, *iterable: Iterable[Any]) -> CruIterableWrapper[Any]: # type: ignore
+ return self.new_start(CruIterableCreators.concat(self.me, *iterable))
+
+ def all(self, predicate: ElementPredicate[T]) -> bool:
+ """
+ partial
+ """
+ return self._result(lambda v, _: predicate(v) and None, IterDefaultResults.true)
+
+ def all_isinstance(self, *types: OptionalType) -> bool:
+ """
+ partial
+ """
+ types = self._help_make_set(types)
+ return self.all(lambda v: type(v) in types)
+
+ def any(self, predicate: ElementPredicate[T]) -> bool:
+ """
+ partial
+ """
+ return self._result(lambda v, _: predicate(v) or None, IterDefaultResults.false)
+
+ def number(self) -> CruIterableWrapper:
+ """
+ partial
+ """
+ return self._new(lambda _, i: i)
+
+ def take(self, predicate: ElementPredicate[T]) -> CruIterableWrapper:
+ """
+ complete
+ """
+ return self._new(lambda v, _: _r_send(v) if predicate(v) else None)
+
+ def transform(
+ self, *transformers: OptionalElementTransformer
+ ) -> CruIterableWrapper:
+ """
+ complete
+ """
+
+ def _transform_element(element, _):
+ for transformer in self._help_make_list(transformers):
+ if transformer is not None:
+ element = transformer(element)
+ return _r_send(element)
+
+ return self._new(_transform_element)
+
+ def take_n(self, max_count: int, neg_is_clone: bool = True) -> CruIterableWrapper:
+ """
+ partial
+ """
+ if max_count < 0:
+ if neg_is_clone:
+ return self.clone_me()
+ else:
+ raise ValueError("max_count must be 0 or positive.")
+ elif max_count == 0:
+ return self.drop_all()
+ return self._new(
+ lambda v, i: _r_send(v) if i < max_count - 1 else _r_send_last(v)
+ )
+
+ def take_by_indices(self, *indices: OptionalIndex) -> CruIterableWrapper:
+ """
+ partial
+ """
+ indices = self._help_make_set(indices)
+ max_index = max(indices)
+ return self.take_n(max_index + 1)._new(
+ lambda v, i: _r_send(v) if i in indices else None
+ )
+
+ def single_or(
+ self, fallback: Any | None = CRU_NOT_FOUND
+ ) -> T | Any | CRU_NOT_FOUND:
+ """
+ partial
+ """
+ first_2 = self.take_n(2)
+ has_value = False
+ value = None
+ for element in first_2.me:
+ if has_value:
+ raise ValueError("More than one value found.")
+ has_value = True
+ value = element
+ if has_value:
+ return value
+ else:
+ return fallback
+
+ def first_or(
+ self, predicate: ElementPredicate[T], fallback: Any | None = CRU_NOT_FOUND
+ ) -> T | CRU_NOT_FOUND:
+ """
+ partial
+ """
+ result_iterable = self.take_n(1).single_or()
+
+ @staticmethod
+ def first_index(
+ iterable: Iterable[T], predicate: ElementPredicate[T]
+ ) -> int | CRU_NOT_FOUND:
+ """
+ partial
+ """
+ for index, element in enumerate(iterable):
+ if predicate(element):
+ return index
+
+ @staticmethod
+ def take_indices(
+ iterable: Iterable[T], predicate: ElementPredicate[T]
+ ) -> Iterable[int]:
+ """
+ complete
+ """
+ for index, element in enumerate(iterable):
+ if predicate(element):
+ yield index
+
+ @staticmethod
+ def flatten(
+ o,
+ max_depth=-1,
+ is_leave: ElementPredicate | None = None,
+ get_children: OptionalElementTransformer = None,
+ ) -> Iterable:
+ """
+ complete
+ """
+ if is_leave is None:
+ is_leave = lambda v: not isinstance(v, Iterable)
+ if get_children is None:
+ get_children = lambda v: v
+ return CruIterableWrapper._flatten_with_func(
+ o, max_depth, is_leave, get_children
+ )
+
+ @staticmethod
+ def skip_by_indices(iterable: Iterable[T], *indices: OptionalIndex) -> Iterable[T]:
+ """
+ complete
+ """
+ indices = set(indices) - {None}
+ for index, element in enumerate(iterable):
+ if index not in indices:
+ yield element
+
+ @staticmethod
+ def skip_if(iterable: Iterable[T], predicate: ElementPredicate[T]) -> list[T]:
+ """
+ complete
+ """
+ for element in iterable:
+ if not predicate(element):
+ yield element
+
+ def drop_all(self) -> CruIterableWrapper:
+ return self.replace_me_with_empty()
+
+ @staticmethod
+ def remove_all_if(l: Iterable[T], p: ElementPredicate[T]) -> list[T]:
+ return [v for v in l if not p(v)]
+
+ @staticmethod
+ def remove_all_value(l: Iterable[T], *r: Any) -> list[T]:
+ return [v for v in l if v not in r]
+
+ @staticmethod
+ def replace_all_value(l: Iterable[T], old_value: Any, new_value: R) -> list[T | R]:
+ return [new_value if v == old_value else v for v in l]
+
+ @staticmethod
+ def foreach(iterable: Iterable[T], *f: OptionalElementOperation[T]) -> None:
+ if len(f) == 0:
+ return
+ for v in iterable:
+ for f_ in f:
+ if f_ is not None:
+ f_(v)
+
+ @staticmethod
+ def make(v: CanBeList[T], /, none_to_empty_list: bool = True) -> list[T]:
+ if v is None and none_to_empty_list:
+ return []
+ return list(v) if isinstance(v, Iterable) else [v]
+
+
+class ListOperations:
+ @staticmethod
+ def all(iterable: Iterable[T], predicate: ElementPredicate[T]) -> bool:
+ """
+ partial
+ """
+ return _God.spy(iterable, lambda v, _: predicate(v) and None, _God.Default.true)
+
+ @staticmethod
+ def all_isinstance(iterable: Iterable[T], *types: OptionalType) -> bool:
+ """
+ partial
+ """
+ types = _God.help_make_set(types)
+ return ListOperations.all(iterable, lambda v: type(v) in types)
+
+ @staticmethod
+ def any(iterable: Iterable[T], predicate: ElementPredicate[T]) -> bool:
+ """
+ partial
+ """
+ return _God.spy(iterable, lambda v, _: predicate(v) or None, _God.Default.false)
+
+ @staticmethod
+ def indices(iterable: Iterable[T]) -> Iterable[int]:
+ """
+ partial
+ """
+ return _God.new(iterable, lambda _, i: i)
+
+ @staticmethod
+ def take(iterable: Iterable[T], predicate: ElementPredicate[T]) -> Iterable[T]:
+ """
+ complete
+ """
+ return _God.new(iterable, lambda v, _: _God.yield_(v) if predicate(v) else None)
+
+ @staticmethod
+ def transform(
+ iterable: Iterable[T], *transformers: OptionalElementTransformer
+ ) -> Iterable:
+ """
+ complete
+ """
+
+ def _transform_element(element, _):
+ for transformer in transformers:
+ if transformer is not None:
+ element = transformer(element)
+ return element
+
+ return _God.new(iterable, _transform_element)
+
+ @staticmethod
+ def take_n(iterable: Iterable[T], n: int) -> Iterable[T]:
+ """
+ partial
+ """
+ if n < 0:
+ return iterable
+ elif n == 0:
+ return []
+ return range(n)._god_yield(
+ iterable, lambda v, i: _yield(v) if i < n else _return()
+ )
+
+ @staticmethod
+ def take_by_indices(iterable: Iterable[T], *indices: OptionalIndex) -> Iterable[T]:
+ """
+ partial
+ """
+ indices = set(indices) - {None}
+ max_index = max(indices)
+ iterable = ListOperations.take_n(iterable, max_index + 1)
+ return _god_yield(iterable, lambda v, i: _yield(v) if i in indices else None)
+
+ @staticmethod
+ def first(iterable: Iterable[T]) -> T | CRU_NOT_FOUND:
+ """
+ partial
+ """
+ result_iterable = ListOperations.take_n(iterable, 1)
+ for element in result_iterable:
+ return element
+ return CRU_NOT_FOUND
+
+ @staticmethod
+ def first_index(
+ iterable: Iterable[T], predicate: ElementPredicate[T]
+ ) -> int | CRU_NOT_FOUND:
+ """
+ partial
+ """
+ for index, element in enumerate(iterable):
+ if predicate(element):
+ return index
+
+ @staticmethod
+ def take_indices(
+ iterable: Iterable[T], predicate: ElementPredicate[T]
+ ) -> Iterable[int]:
+ """
+ complete
+ """
+ for index, element in enumerate(iterable):
+ if predicate(element):
+ yield index
+
+ @staticmethod
+ def _flatten(o, depth: int, max_depth: int) -> Iterable:
+ if depth == max_depth or not isinstance(o, Iterable):
+ yield o
+ return
+ for v in o:
+ yield from ListOperations._flatten(v, depth + 1, max_depth)
+
+ @staticmethod
+ def flatten(o, max_depth=-1) -> Iterable:
+ """
+ complete
+ """
+ return ListOperations._flatten(o, 0, max_depth)
+
+ @staticmethod
+ def skip_by_indices(iterable: Iterable[T], *indices: OptionalIndex) -> Iterable[T]:
+ """
+ complete
+ """
+ indices = set(indices) - {None}
+ for index, element in enumerate(iterable):
+ if index not in indices:
+ yield element
+
+ @staticmethod
+ def skip_if(iterable: Iterable[T], predicate: ElementPredicate[T]) -> list[T]:
+ """
+ complete
+ """
+ for element in iterable:
+ if not predicate(element):
+ yield element
+
+ @staticmethod
+ def remove_all_if(l: Iterable[T], p: ElementPredicate[T]) -> list[T]:
+ return [v for v in l if not p(v)]
+
+ @staticmethod
+ def remove_all_value(l: Iterable[T], *r: Any) -> list[T]:
+ return [v for v in l if v not in r]
+
+ @staticmethod
+ def replace_all_value(l: Iterable[T], old_value: Any, new_value: R) -> list[T | R]:
+ return [new_value if v == old_value else v for v in l]
+
+ @staticmethod
+ def foreach(iterable: Iterable[T], *f: OptionalElementOperation[T]) -> None:
+ if len(f) == 0:
+ return
+ for v in iterable:
+ for f_ in f:
+ if f_ is not None:
+ f_(v)
+
+ @staticmethod
+ def make(v: CanBeList[T], /, none_to_empty_list: bool = True) -> list[T]:
+ if v is None and none_to_empty_list:
+ return []
+ return list(v) if isinstance(v, Iterable) else [v]
+
+
+class CruList(list, Generic[T]):
+ @property
+ def is_empty(self) -> bool:
+ return len(self) == 0
+
+ def sub_by_indices(self, *index: int) -> "CruList"[T]:
+ return CruList(ListOperations.sub_by_indices(self, *index))
+
+ def split_by_indices(self, *index: int) -> tuple["CruList"[T], "CruList"[T]]:
+ l1, l2 = ListOperations.split_by_indices(self, *index)
+ return CruList(l1), CruList(l2)
+
+ def complement_indices(self, *index: int) -> list[int]:
+ return ListOperations.complement_indices(len(self), *index)
+
+ def foreach(self, *f: OptionalElementOperation[T]) -> None:
+ ListOperations.foreach(self, *f)
+
+ def all(self, p: ElementPredicate[T]) -> bool:
+ return ListOperations.all(self, p)
+
+ def all_is_instance(self, *t: type) -> bool:
+ return ListOperations.all_isinstance(self, *t)
+
+ def any(self, p: ElementPredicate[T]) -> bool:
+ return ListOperations.any(self, p)
+
+ def find_all_if(self, p: ElementPredicate[T]) -> "CruList"[T]:
+ return CruList(ListOperations.take(self, p))
+
+ def find_if(self, p: ElementPredicate[T]) -> T | CRU_NOT_FOUND:
+ return ListOperations.first(self, p)
+
+ def find_all_indices_if(self, p: ElementPredicate[T]) -> "CruList"[int]:
+ return CruList(ListOperations.take_indices(self, p))
+
+ def find_index_if(self, p: ElementPredicate[T]) -> int | CRU_NOT_FOUND:
+ return ListOperations.first_index(self, p)
+
+ def split_if(self, p: ElementPredicate[T]) -> tuple["CruList"[T], "CruList"[T]]:
+ l1, l2 = ListOperations.split_if(self, p)
+ return CruList(l1), CruList(l2)
+
+ def split_by_types(self, *t: type) -> tuple["CruList"[T], "CruList"[T]]:
+ l1, l2 = ListOperations.split_by_types(self, *t)
+ return CruList(l1), CruList(l2)
+
+ def transform(self, *f: OptionalElementTransformer) -> "CruList"[Any]:
+ return CruList(ListOperations.transform(self, *f))
+
+ def transform_if(
+ self, f: OptionalElementTransformer, p: ElementPredicate[T]
+ ) -> "CruList"[Any]:
+ return CruList(ListOperations.transform_if(self, f, p))
+
+ def remove_by_indices(self, *index: int) -> "CruList"[T]:
+ return CruList(ListOperations.skip_by_indices(self, *index))
+
+ def remove_if(self, p: ElementPredicate[T]) -> "CruList"[T]:
+ return CruList(ListOperations.remove_if(self, p))
+
+ def remove_all_if(self, p: ElementPredicate[T]) -> "CruList"[T]:
+ return CruList(ListOperations.remove_all_if(self, p))
+
+ def remove_all_value(self, *r: Any) -> "CruList"[T]:
+ return CruList(ListOperations.remove_all_value(self, *r))
+
+ def replace_all_value(self, old_value: Any, new_value: R) -> "CruList"[T | R]:
+ return CruList(ListOperations.replace_all_value(self, old_value, new_value))
+
+ @staticmethod
+ def make(l: CanBeList[T]) -> "CruList"[T]:
+ return CruList(ListOperations.make(l))
+
+
+class CruInplaceList(CruList, Generic[T]):
+
+ def clear(self) -> "CruInplaceList[T]":
+ self.clear()
+ return self
+
+ def extend(self, *l: Iterable[T]) -> "CruInplaceList[T]":
+ self.extend(l)
+ return self
+
+ def reset(self, *l: Iterable[T]) -> "CruInplaceList[T]":
+ self.clear()
+ self.extend(l)
+ return self
+
+ def transform(self, *f: OptionalElementTransformer) -> "CruInplaceList"[Any]:
+ return self.reset(super().transform(*f))
+
+ def transform_if(
+ self, f: OptionalElementTransformer, p: ElementPredicate[T]
+ ) -> "CruInplaceList"[Any]:
+ return self.reset(super().transform_if(f, p))
+
+ def remove_by_indices(self, *index: int) -> "CruInplaceList"[T]:
+ return self.reset(super().remove_by_indices(*index))
+
+ def remove_all_if(self, p: ElementPredicate[T]) -> "CruInplaceList"[T]:
+ return self.reset(super().remove_all_if(p))
+
+ def remove_all_value(self, *r: Any) -> "CruInplaceList"[T]:
+ return self.reset(super().remove_all_value(*r))
+
+ def replace_all_value(
+ self, old_value: Any, new_value: R
+ ) -> "CruInplaceList"[T | R]:
+ return self.reset(super().replace_all_value(old_value, new_value))
+
+ @staticmethod
+ def make(l: CanBeList[T]) -> "CruInplaceList"[T]:
+ return CruInplaceList(ListOperations.make(l))
+
+
+K = TypeVar("K")
+
+
+class CruUniqueKeyInplaceList(Generic[T, K]):
+ KeyGetter = Callable[[T], K]
+
+ def __init__(
+ self, get_key: KeyGetter, *, before_add: Callable[[T], T] | None = None
+ ):
+ super().__init__()
+ self._get_key = get_key
+ self._before_add = before_add
+ self._l: CruInplaceList[T] = CruInplaceList()
+
+ @property
+ def object_key_getter(self) -> KeyGetter:
+ return self._get_key
+
+ @property
+ def internal_list(self) -> CruInplaceList[T]:
+ return self._l
+
+ def validate_self(self):
+ keys = self._l.transform(self._get_key)
+ if len(keys) != len(set(keys)):
+ raise ValueError("Duplicate keys!")
+
+ def get_or(self, k: K, fallback: Any = CRU_NOT_FOUND) -> T | Any:
+ r = self._l.find_if(lambda i: k == self._get_key(i))
+ return r if r is not CRU_NOT_FOUND else fallback
+
+ def get(self, k: K) -> T:
+ v = self.get_or(k, CRU_NOT_FOUND)
+ if v is CRU_NOT_FOUND:
+ raise KeyError(f"Key not found!")
+ return v
+
+ def has_key(self, k: K) -> bool:
+ return self.get_or(k, CRU_NOT_FOUND) is not CRU_NOT_FOUND
+
+ def has_any_key(self, *k: K) -> bool:
+ return self._l.any(lambda i: self._get_key(i) in k)
+
+ def try_remove(self, k: K) -> bool:
+ i = self._l.find_index_if(lambda v: k == self._get_key(v))
+ if i is CRU_NOT_FOUND:
+ return False
+ self._l.remove_by_indices(i)
+ return True
+
+ def remove(self, k: K, allow_absense: bool = False) -> None:
+ if not self.try_remove(k) and not allow_absense:
+ raise KeyError(f"Key {k} not found!")
+
+ def add(self, v: T, /, replace: bool = False) -> None:
+ if self.has_key(self._get_key(v)):
+ if replace:
+ self.remove(self._get_key(v))
+ else:
+ raise ValueError(f"Key {self._get_key(v)} already exists!")
+ if self._before_add is not None:
+ v = self._before_add(v)
+ self._l.append(v)
+
+ def set(self, v: T) -> None:
+ self.add(v, True)
+
+ def extend(self, l: Iterable[T], /, replace: bool = False) -> None:
+ if not replace and self.has_any_key([self._get_key(i) for i in l]):
+ raise ValueError("Keys already exists!")
+ if self._before_add is not None:
+ l = [self._before_add(i) for i in l]
+ keys = [self._get_key(i) for i in l]
+ self._l.remove_all_if(lambda i: self._get_key(i) in keys).extend(l)
+
+ def clear(self) -> None:
+ self._l.clear()
+
+ def __iter__(self):
+ return iter(self._l)
+
+ def __len__(self):
+ return len(self._l)
diff --git a/tools/cru-py/cru/_util/_type.py b/tools/cru-py/cru/_util/_type.py
new file mode 100644
index 0000000..dc50def
--- /dev/null
+++ b/tools/cru-py/cru/_util/_type.py
@@ -0,0 +1,42 @@
+from types import NoneType
+from typing import Any
+
+from ._list import CanBeList, CruList
+
+DEFAULT_NONE_ERR = ValueError
+DEFAULT_NONE_ERR_MSG = "None is not allowed here."
+DEFAULT_TYPE_ERR = ValueError
+DEFAULT_TYPE_ERR_MSG = "Type of object is not allowed here."
+
+
+class TypeSet(set[type]):
+ def __init__(self, *l: type):
+ l = CruList.make(l).remove_all_value(None, NoneType)
+ if not l.all_is_instance(type):
+ raise TypeError("t must be a type or None.")
+ super().__init__(l)
+
+ def check_value(self, v: Any, /, allow_none: bool, empty_allow_all: bool = True, *,
+ none_err: type[Exception] = DEFAULT_NONE_ERR,
+ none_err_msg: str = DEFAULT_NONE_ERR_MSG,
+ type_err: type[Exception] = DEFAULT_TYPE_ERR,
+ type_err_msg: str = DEFAULT_TYPE_ERR_MSG) -> None:
+ if v is None:
+ if allow_none:
+ return
+ else:
+ raise none_err(none_err_msg)
+ if len(self) == 0 and empty_allow_all:
+ return
+ if type(v) not in self:
+ raise type_err(type_err_msg)
+
+ def check_value_list(self, l: CanBeList, /, allow_none: bool, empty_allow_all: bool = True, *,
+ none_err: type[Exception] = DEFAULT_NONE_ERR,
+ none_err_msg: str = DEFAULT_NONE_ERR_MSG,
+ type_err: type[Exception] = DEFAULT_TYPE_ERR,
+ type_err_msg: str = DEFAULT_TYPE_ERR_MSG) -> None:
+ l = CruList.make(l)
+ for v in l:
+ self.check_value(v, allow_none, empty_allow_all, none_err=none_err, none_err_msg=none_err_msg,
+ type_err=type_err, type_err_msg=type_err_msg)