aboutsummaryrefslogtreecommitdiff
path: root/tools/cru-py
diff options
context:
space:
mode:
authorcrupest <crupest@outlook.com>2024-11-11 01:12:29 +0800
committerYuqian Yang <crupest@crupest.life>2024-12-18 18:31:27 +0800
commit95da3ade5bfa6ef39923cd3fc2a551ad983c1537 (patch)
treed2bf4e40bf625c75768d5ccda2ca49dbaba97a9a /tools/cru-py
parenteff33fcbc8e78b1cd15332c229cd39ae9befbe5e (diff)
downloadcrupest-95da3ade5bfa6ef39923cd3fc2a551ad983c1537.tar.gz
crupest-95da3ade5bfa6ef39923cd3fc2a551ad983c1537.tar.bz2
crupest-95da3ade5bfa6ef39923cd3fc2a551ad983c1537.zip
HALF WORK: 2024.11.12
Diffstat (limited to 'tools/cru-py')
-rw-r--r--tools/cru-py/cru/property.py24
-rw-r--r--tools/cru-py/cru/service/docker.py9
-rw-r--r--tools/cru-py/cru/service/nginx.py5
-rw-r--r--tools/cru-py/cru/util/__init__.py15
-rw-r--r--tools/cru-py/cru/util/_const.py17
-rw-r--r--tools/cru-py/cru/util/_cru.py90
-rw-r--r--tools/cru-py/cru/util/_event.py41
-rw-r--r--tools/cru-py/cru/util/_func.py122
-rw-r--r--tools/cru-py/cru/util/_list.py591
-rw-r--r--tools/cru-py/crupest/template2.py45
10 files changed, 777 insertions, 182 deletions
diff --git a/tools/cru-py/cru/property.py b/tools/cru-py/cru/property.py
new file mode 100644
index 0000000..9549731
--- /dev/null
+++ b/tools/cru-py/cru/property.py
@@ -0,0 +1,24 @@
+import json
+from typing import Any
+
+
+class PropertyItem:
+ def __init__(self, value: Any):
+ self._value = value
+
+ @property
+ def value(self) -> Any:
+ return self._value
+
+ @value.setter
+ def value(self, value: Any):
+ self._value = value
+
+
+class PropertyTreeSection:
+ def __init__(self, data: dict[str, Any] | None = None) -> None:
+ self._data = data or {}
+
+class PropertyTree:
+ def __init__(self, data: dict[str, Any] | None = None) -> None:
+ self._data = data or {} \ No newline at end of file
diff --git a/tools/cru-py/cru/service/docker.py b/tools/cru-py/cru/service/docker.py
index 42d4a35..a57a246 100644
--- a/tools/cru-py/cru/service/docker.py
+++ b/tools/cru-py/cru/service/docker.py
@@ -1,4 +1,7 @@
import shutil
+import subprocess
+
+from ..util import L
class DockerController:
@@ -13,3 +16,9 @@ class DockerController:
self._docker_bin = shutil.which(self.DOCKER_BIN_NAME)
return self._docker_bin
+ def list_containers(self) -> L[str]:
+ p = subprocess.run([self.docker_bin, "container", "ls", ""], capture_output=True)
+ return p.stdout.decode("utf-8").splitlines()
+
+ def restart_container(self, container_name: str) -> None:
+ subprocess.run([self.docker_bin, "restart", container_name]) \ No newline at end of file
diff --git a/tools/cru-py/cru/service/nginx.py b/tools/cru-py/cru/service/nginx.py
index f9120de..94c4375 100644
--- a/tools/cru-py/cru/service/nginx.py
+++ b/tools/cru-py/cru/service/nginx.py
@@ -2,15 +2,10 @@ import json
import os
import re
import subprocess
-from os.path import join, basename, dirname
from typing import Literal, Any, cast, ClassVar
import jsonschema
-from crupest.template2 import Template2
-from crupest.tui import Paths, UserFriendlyException, create_dir_if_not_exists, console, Confirm, ensure_dir
-from crupest.ui_base import file_name_style
-
def restart_nginx(force=False) -> bool:
if not force:
diff --git a/tools/cru-py/cru/util/__init__.py b/tools/cru-py/cru/util/__init__.py
index 2cbb8f4..ecd9673 100644
--- a/tools/cru-py/cru/util/__init__.py
+++ b/tools/cru-py/cru/util/__init__.py
@@ -1,20 +1,25 @@
-from ._const import make_unique_object, make_bool_unique_object, CRU_NOT_FOUND, CRU_USE_DEFAULT, CRU_DONT_CHANGE, \
+from typing import Any
+
+from ._const import cru_make_unique_object, cru_make_bool_unique_object, CRU_NOT_FOUND, CRU_USE_DEFAULT, \
+ CRU_DONT_CHANGE, \
CRU_PLACEHOLDER
-from ._func import CruFunction, MetaFunction, RawFunctions, WrappedFunctions
+from ._func import CruFunction, CruFunctionMeta, CruRawFunctions, CruWrappedFunctions, CruFunctionGenerators
from ._list import CruList, CruInplaceList, CruUniqueKeyInplaceList, ListOperations, CanBeList, ElementOperation, \
ElementPredicate, ElementTransformer, OptionalElementOperation, ElementPredicate, OptionalElementTransformer
from ._type import TypeSet
F = CruFunction
-WF = WrappedFunctions
+WF = CruWrappedFunctions
+FG = CruFunctionGenerators
L = CruList
+
__all__ = [
"CRU_NOT_FOUND", "CRU_USE_DEFAULT", "CRU_DONT_CHANGE", "CRU_PLACEHOLDER",
- "CruFunction", "MetaFunction", "RawFunctions", "WrappedFunctions",
+ "CruFunction", "CruFunctionMeta", "CruRawFunctions", "CruWrappedFunctions", "CruFunctionGenerators",
"CruList", "CruInplaceList", "CruUniqueKeyInplaceList", "ListOperations",
"CanBeList", "ElementOperation", "ElementPredicate", "ElementTransformer",
"OptionalElementOperation", "ElementPredicate", "OptionalElementTransformer",
"TypeSet",
- "F", "WF", "L"
+ "F", "WF", "FG", "L"
]
diff --git a/tools/cru-py/cru/util/_const.py b/tools/cru-py/cru/util/_const.py
index ea67450..8140988 100644
--- a/tools/cru-py/cru/util/_const.py
+++ b/tools/cru-py/cru/util/_const.py
@@ -1,7 +1,9 @@
from typing import Any
+from ._cru import CRU
-def make_unique_object() -> Any:
+
+def cru_make_unique_object() -> Any:
class _CruUnique:
_i = False
@@ -21,7 +23,7 @@ def make_unique_object() -> Any:
return v
-def make_bool_unique_object(b: bool) -> Any:
+def cru_make_bool_unique_object(b: bool) -> Any:
class _CruBoolUnique:
_i = False
@@ -45,7 +47,10 @@ def make_bool_unique_object(b: bool) -> Any:
return v
-CRU_NOT_FOUND = make_bool_unique_object(False)
-CRU_USE_DEFAULT = make_unique_object()
-CRU_DONT_CHANGE = make_unique_object()
-CRU_PLACEHOLDER = make_unique_object()
+CRU_NOT_FOUND = cru_make_bool_unique_object(False)
+CRU_USE_DEFAULT = cru_make_unique_object()
+CRU_DONT_CHANGE = cru_make_unique_object()
+CRU_PLACEHOLDER = cru_make_unique_object()
+
+CRU.add_objects(cru_make_unique_object, cru_make_bool_unique_object, CRU_NOT_FOUND, CRU_USE_DEFAULT,
+ CRU_DONT_CHANGE, CRU_PLACEHOLDER)
diff --git a/tools/cru-py/cru/util/_cru.py b/tools/cru-py/cru/util/_cru.py
new file mode 100644
index 0000000..61a0ee1
--- /dev/null
+++ b/tools/cru-py/cru/util/_cru.py
@@ -0,0 +1,90 @@
+from typing import Any
+
+
+class _Cru:
+ NAME_PREFIXES = ("CRU_", "Cru", "cru_")
+
+ def __init__(self):
+ self._d: dict[str, Any] = {}
+
+ def all_names(self) -> list[str]:
+ return list(self._d.keys())
+
+ def get(self, name: str) -> Any:
+ return self._d[name]
+
+ def has_name(self, name: str) -> bool:
+ return name in self._d
+
+ @staticmethod
+ def _maybe_remove_prefix(name: str) -> str | None:
+ for prefix in _Cru.NAME_PREFIXES:
+ if name.startswith(prefix):
+ return name[len(prefix):]
+ return None
+
+ def _check_name_exist(self, *names: str) -> None:
+ for name in names:
+ if name is None: continue
+ if self.has_name(name):
+ raise ValueError(f"Name {name} exists in CRU.")
+
+ @staticmethod
+ def check_name_format(name: str) -> tuple[str, str]:
+ no_prefix_name = _Cru._maybe_remove_prefix(name)
+ if no_prefix_name is None:
+ raise ValueError(f"Name {name} is not prefixed with {_Cru.NAME_PREFIXES}.")
+ return name, no_prefix_name
+
+ @staticmethod
+ def _check_object_name(o) -> tuple[str, str]:
+ return _Cru.check_name_format(o.__name__)
+
+ def _do_add(self, o, *names: str | None) -> list[str]:
+ names = set(names)
+ names.remove(None)
+ for name in names:
+ self._d[name] = o
+ return list(names)
+
+ def add(self, o, name: str | None) -> tuple[str, str | None]:
+ if name is None:
+ name, no_prefix_name = self._check_object_name(o)
+ else:
+ no_prefix_name = self._maybe_remove_prefix(name)
+
+ self._check_name_exist(name, no_prefix_name)
+ self._do_add(o, name, no_prefix_name)
+ return name, no_prefix_name
+
+ def add_with_alias(self, o, name: str | None = None, *aliases: str) -> list[str]:
+ final_names = []
+ if name is None:
+ name, no_prefix_name = self._check_object_name(o)
+ self._check_name_exist(name, no_prefix_name)
+ final_names.extend([name, no_prefix_name])
+ for alias in aliases:
+ no_prefix_name = self._maybe_remove_prefix(alias)
+ self._check_name_exist(alias, no_prefix_name)
+ final_names.extend([alias, no_prefix_name])
+
+ return self._do_add(o, *final_names)
+
+ def add_objects(self, *objects):
+ final_list = []
+ for o in objects:
+ name, no_prefix_name = self._check_object_name(o)
+ self._check_name_exist(name, no_prefix_name)
+ final_list.append((o, name, no_prefix_name))
+ for o, name, no_prefix_name in final_list:
+ self._do_add(o, name, no_prefix_name)
+
+ def __getitem__(self, item):
+ return self.get(item)
+
+ def __getattr__(self, item):
+ return self.get(item)
+
+
+CRU_NAME_PREFIXES = _Cru.NAME_PREFIXES
+CRU = _Cru()
diff --git a/tools/cru-py/cru/util/_event.py b/tools/cru-py/cru/util/_event.py
new file mode 100644
index 0000000..813e33f
--- /dev/null
+++ b/tools/cru-py/cru/util/_event.py
@@ -0,0 +1,41 @@
+from typing import ParamSpec, TypeVar, Callable
+
+from ._list import CruInplaceList, CruList
+
+P = ParamSpec('P')
+R = TypeVar('R')
+F = Callable[P, R]
+
+
+class EventHandlerToken:
+ def __init__(self, event: "Event", handler: F, once: bool = False) -> None:
+ self._event = event
+ self._handler = handler
+ self._once = once
+
+ @property
+ def event(self) -> "Event":
+ return self._event
+
+ @property
+ def handler(self) -> F:
+ return self._handler
+
+ @property
+ def once(self) -> bool:
+ return self._once
+
+
+class Event:
+ def __init__(self, name: str) -> None:
+ self._name = name
+ self._tokens: CruInplaceList[EventHandlerToken] = CruInplaceList()
+
+ def register(self, handler: F, once: bool = False) -> EventHandlerToken:
+ token = EventHandlerToken(self, handler, once)
+ self._tokens.append(token)
+ return token
+
+ def unregister(self, *h: EventHandlerToken | F) -> int:
+
+ self._tokens.find_all_indices_if(lambda t: )
diff --git a/tools/cru-py/cru/util/_func.py b/tools/cru-py/cru/util/_func.py
index 3221c94..d9f8044 100644
--- a/tools/cru-py/cru/util/_func.py
+++ b/tools/cru-py/cru/util/_func.py
@@ -1,16 +1,14 @@
from collections.abc import Callable
-from typing import TypeVar, Any, ParamSpec
+from typing import TypeVar
+from ._cru import CRU
from ._const import CRU_PLACEHOLDER
T = TypeVar("T")
-R = TypeVar("R")
-R1 = TypeVar("R1")
-P = ParamSpec("P")
-P1 = ParamSpec("P1")
+_PLACEHOLDER = CRU_PLACEHOLDER
-class RawFunctions:
+class CruRawFunctions:
@staticmethod
def none(*_v, **_kwargs) -> None:
return None
@@ -28,15 +26,15 @@ class RawFunctions:
return v
@staticmethod
- def only_you(r: T, *_v, **_kwargs) -> T:
- return r
+ def only_you(v: T, *_v, **_kwargs) -> T:
+ return v
@staticmethod
- def equal(a: Any, b: Any) -> bool:
+ def equal(a, b) -> bool:
return a == b
@staticmethod
- def not_equal(a: Any, b: Any) -> bool:
+ def not_equal(a, b) -> bool:
return a != b
@staticmethod
@@ -44,95 +42,85 @@ class RawFunctions:
return not v
-class MetaFunction:
+class CruFunctionMeta:
@staticmethod
- def bind(f: Callable[P, R], *bind_args, **bind_kwargs) -> Callable[P1, R1]:
- def bound(*args, **kwargs):
+ def bind(func: Callable, *bind_args, **bind_kwargs) -> Callable:
+ def bound_func(*args, **kwargs):
popped = 0
real_args = []
- for a in bind_args:
- if a is CRU_PLACEHOLDER:
+ for arg in bind_args:
+ if arg is _PLACEHOLDER:
real_args.append(args[popped])
popped += 1
else:
- real_args.append(a)
+ real_args.append(arg)
real_args.extend(args[popped:])
- return f(*real_args, **(bind_kwargs | kwargs))
+ return func(*real_args, **(bind_kwargs | kwargs))
- return bound
+ return bound_func
@staticmethod
- def chain(*fs: Callable) -> Callable:
- if len(fs) == 0:
+ def chain(*funcs: Callable) -> Callable:
+ if len(funcs) == 0:
raise ValueError("At least one function is required!")
- rf = fs[0]
- for f in fs[1:]:
- def n(*args, **kwargs):
- r = rf(*args, **kwargs)
- r = r if isinstance(r, tuple) else (r,)
- return f(*r)
- rf = n
- return rf
+ final_func = funcs[0]
+ for func in funcs[1:]:
+ func_copy = func
- @staticmethod
- def chain_single(f: Callable[P, R], f1: Callable[P1, R1], *bind_args, **bind_kwargs) -> \
- Callable[
- P, R1]:
- return MetaFunction.chain(f, MetaFunction.bind(f1, *bind_args, **bind_kwargs))
+ def chained_func(*args, **kwargs):
+ results = final_func(*args, **kwargs)
+ results = results if isinstance(results, tuple) else (results,)
+ return func_copy(*results)
- convert_r = chain_single
+ final_func = chained_func
- @staticmethod
- def neg(f: Callable[P, bool]) -> Callable[P, bool]:
- return MetaFunction.convert_r(f, RawFunctions.not_)
+ return final_func
# Advanced Function Wrapper
class CruFunction:
- def __init__(self, f):
+ def __init__(self, f: Callable):
self._f = f
@property
def f(self) -> Callable:
return self._f
+ @property
+ def func(self) -> Callable:
+ return self.f
+
def bind(self, *bind_args, **bind_kwargs) -> "CruFunction":
- self._f = MetaFunction.bind(self._f, *bind_args, **bind_kwargs)
+ self._f = CruFunctionMeta.bind(self._f, *bind_args, **bind_kwargs)
return self
- def chain(self, *fs: Callable) -> "CruFunction":
- self._f = MetaFunction.chain(self._f, *fs)
+ def chain(self, *funcs: Callable) -> "CruFunction":
+ self._f = CruFunctionMeta.chain(self._f, *funcs)
return self
- def chain_single(self, f: Callable[P, R], f1: Callable[P1, R1], *bind_args,
- **bind_kwargs) -> "CruFunction":
- self._f = MetaFunction.chain_single(self._f, f, f1, *bind_args, **bind_kwargs)
- return self
+ def __call__(self, *args, **kwargs):
+ return self._f(*args, **kwargs)
- def convert_r(self, f: Callable[P, R], f1: Callable[P1, R1], *bind_args,
- **bind_kwargs) -> "CruFunction":
- self._f = MetaFunction.convert_r(self._f, f, f1, *bind_args, **bind_kwargs)
- return self
+ @staticmethod
+ def make_chain(base_func: Callable, *funcs: Callable) -> "CruFunction":
+ return CruFunction(base_func).chain(*funcs)
- def neg(self) -> "CruFunction":
- self._f = MetaFunction.neg(self._f)
- return self
- def __call__(self, *args, **kwargs):
- return self._f(*args, **kwargs)
+class CruWrappedFunctions:
+ none = CruFunction(CruRawFunctions.none)
+ true = CruFunction(CruRawFunctions.true)
+ false = CruFunction(CruRawFunctions.false)
+ identity = CruFunction(CruRawFunctions.identity)
+ only_you = CruFunction(CruRawFunctions.only_you)
+ equal = CruFunction(CruRawFunctions.equal)
+ not_equal = CruFunction(CruRawFunctions.not_equal)
+ not_ = CruFunction(CruRawFunctions.not_)
+
+class CruFunctionGenerators:
@staticmethod
- def make_chain(*fs: Callable) -> Callable[P, R1]:
- return CruFunction(MetaFunction.chain(*fs))
-
-
-class WrappedFunctions:
- none = CruFunction(RawFunctions.none)
- true = CruFunction(RawFunctions.true)
- false = CruFunction(RawFunctions.false)
- identity = CruFunction(RawFunctions.identity)
- only_you = CruFunction(RawFunctions.only_you)
- equal = CruFunction(RawFunctions.equal)
- not_equal = CruFunction(RawFunctions.not_equal)
- not_ = CruFunction(RawFunctions.not_)
+ def make_isinstance_of_types(*types: type) -> Callable:
+ return CruFunction(lambda v: type(v) in types)
+
+CRU.add_objects(CruRawFunctions, CruFunctionMeta, CruFunction, CruWrappedFunctions, CruFunctionGenerators)
diff --git a/tools/cru-py/cru/util/_list.py b/tools/cru-py/cru/util/_list.py
index 92cd88c..e1c8373 100644
--- a/tools/cru-py/cru/util/_list.py
+++ b/tools/cru-py/cru/util/_list.py
@@ -1,94 +1,557 @@
from collections.abc import Iterable, Callable
-from typing import TypeVar, ParamSpec, Any, Generic
+from dataclasses import dataclass
+from enum import Enum
+from typing import TypeVar, ParamSpec, Any, Generic, ClassVar, Optional, Union
from ._const import CRU_NOT_FOUND
T = TypeVar("T")
+O = TypeVar("O")
R = TypeVar("R")
-P = ParamSpec("P")
+F = TypeVar("F")
CanBeList = T | Iterable[T] | None
-ElementOperation = Callable[[T], Any]
+
+OptionalIndex = int | None
+OptionalType = type | None
+ElementOperation = Callable[[T], Any] | None
ElementPredicate = Callable[[T], bool]
-ElementTransformer = Callable[[Any], Any]
-OptionalElementOperation = Callable[[T], Any] | None
-OptionalElementTransformer = Callable[[Any], Any] | None
+ElementTransformer = Callable[[T], R]
+SelfElementTransformer = ElementTransformer[T, T]
+AnyElementTransformer = ElementTransformer[Any, Any]
+OptionalElementOperation = ElementOperation | None
+OptionalElementTransformer = ElementTransformer | None
+OptionalSelfElementTransformer = ElementTransformer[T, T]
+OptionalAnyElementTransformer = AnyElementTransformer | None
-class ListOperations:
+def _flatten_with_func(o: T, max_depth: int, is_leave: ElementPredicate[T],
+ get_children: SelfElementTransformer[T], depth: int = 0) -> Iterable[T]:
+ if depth == max_depth or is_leave(o):
+ yield o
+ return
+ for child in get_children(o):
+ yield from _flatten_with_func(child, max_depth, is_leave, get_children, depth + 1)
+
+
+class _Action(Enum):
+ SKIP = 0
+ SEND = 1
+ STOP = 2
+ AGGREGATE = 3
+
+
+@dataclass
+class _Result(Generic[T]):
+ Action: ClassVar[type[_Action]] = _Action
+
+ value: T | O | None
+ action: Action
+
+ @staticmethod
+ def skip() -> "_Result"[T]:
+ return _Result(None, _Action.SKIP)
+
+ @staticmethod
+ def send(value: Any) -> "_Result"[T]:
+ return _Result(value, _Action.SEND)
+
+ @staticmethod
+ def stop(value: Any = None) -> "_Result"[T]:
+ return _Result(value, _Action.STOP)
+
+ @staticmethod
+ def aggregate(*result: "_Result"[T]) -> "_Result"[T]:
+ return _Result(result, _Action.AGGREGATE)
@staticmethod
- def sub_by_indices(l: Iterable[T], *index: int) -> list[T]:
- return [v for i, v in enumerate(l) if i in index]
+ def send_last(value: Any) -> "_Result"[T]:
+ return _Result.aggregate(_Result.send(value), _Result.stop())
+
+ def flatten(self) -> Iterable["_Result"[T]]:
+ return _flatten_with_func(self, -1, lambda r: r.action != _Action.AGGREGATE, lambda r: r.value)
+
+
+_r_skip = _Result.skip
+_r_send = _Result.send
+_r_stop = _Result.stop
+_r_send_last = _Result.send_last
+_r_aggregate = _Result.aggregate
+
+
+class _Defaults:
+ @staticmethod
+ def true(_):
+ return True
+
+ @staticmethod
+ def false(_):
+ return False
+
+ @staticmethod
+ def not_found(_):
+ return CRU_NOT_FOUND
+
+
+def _default_upstream() -> Iterable[Iterable]:
+ return iter([])
+
+
+CruIterableUpstream = Iterable[Iterable]
+CruIterableOptionalUpstream = CruIterableUpstream | None
+
+
+class CruIterableCreators:
+ @staticmethod
+ def with_(o: Any, /, upstreams: CruIterableOptionalUpstream = _default_upstream()) -> "CruIterableWrapper":
+ return CruIterableWrapper(iter(o), upstreams)
+
+ @staticmethod
+ def empty(upstreams: CruIterableOptionalUpstream = _default_upstream()) -> "CruIterableWrapper":
+ return CruIterableCreators.with_([], upstreams)
+
+ @staticmethod
+ def range(a, b=None, c=None, /, upstreams: CruIterableOptionalUpstream = _default_upstream()) -> \
+ "CruIterableWrapper"[int]:
+ args = [arg for arg in [a, b, c] if arg is not None]
+ return CruIterableCreators.with_(range(*args), upstreams)
+
+ @staticmethod
+ def unite(*args: T, upstreams: CruIterableOptionalUpstream = _default_upstream()) -> "CruIterableWrapper"[T]:
+ return CruIterableCreators.with_(args, upstreams)
+
+ @staticmethod
+ def _concat(*iterables: Iterable) -> Iterable:
+ for iterable in iterables:
+ yield from iterable
+
+ @staticmethod
+ def concat(*iterables: Iterable,
+ upstreams: CruIterableOptionalUpstream = _default_upstream()) -> "CruIterableWrapper":
+ return CruIterableWrapper(CruIterableCreators._concat(*iterables), upstreams)
+
+
+class CruIterableWrapper(Generic[T]):
+ Upstream = CruIterableUpstream
+ OptionalUpstream = CruIterableOptionalUpstream
+ _Result = _Result[T]
+ _Operation = Callable[[T, int], _Result | Any | None]
+
+ def __init__(self, iterable: Iterable[T], /, upstreams: OptionalUpstream = _default_upstream()) -> None:
+ self._iterable = iterable
+ self._upstreams = None if upstreams is None else list(upstreams)
+
+ @property
+ def me(self) -> Iterable[T]:
+ return self._iterable
+
+ # TODO: Return Type
+ @property
+ def my_upstreams(self) -> Optional["CruIterableWrapper"]:
+ if self._upstreams is None:
+ return None
+ return CruIterableWrapper(iter(self._upstreams))
+
+ def disable_my_upstreams(self) -> "CruIterableWrapper"[T]:
+ return CruIterableWrapper(self._iterable, None)
+
+ def clear_my_upstreams(self) -> "CruIterableWrapper"[T]:
+ return CruIterableWrapper(self._iterable)
+
+ def _create_upstreams_prepend_self(self) -> Upstream:
+ yield self._iterable
+ yield self.my_upstreams
+
+ # TODO: Return Type
+ def _create_new_upstreams(self, append: bool = True) -> Optional["CruIterableWrapper"]:
+ if not append: return self.my_upstreams
+ if self.my_upstreams is None:
+ return None
+ return CruIterableWrapper(self._create_upstreams_prepend_self())
+
+ def clone_me(self, /, update_upstreams: bool = True) -> "CruIterableWrapper"[T]:
+ return CruIterableWrapper(self._iterable, self._create_new_upstreams(update_upstreams))
+
+ def replace_me_with(self, iterable: Iterable[O], /, update_upstreams: bool = True) -> "CruIterableWrapper"[O]:
+ return CruIterableCreators.with_(iterable, upstreams=self._create_new_upstreams(update_upstreams))
+
+ def replace_me_with_empty(self, /, update_upstreams: bool = True) -> "CruIterableWrapper"[O]:
+ return CruIterableCreators.empty(upstreams=self._create_new_upstreams(update_upstreams))
+
+ def replace_me_with_range(self, a, b=None, c=None, /, update_upstreams: bool = True) -> "CruIterableWrapper"[int]:
+ return CruIterableCreators.range(a, b, c, upstreams=self._create_new_upstreams(update_upstreams))
+
+ def replace_me_with_unite(self, *args: O, update_upstreams: bool = True) -> "CruIterableWrapper"[O]:
+ return CruIterableCreators.unite(*args, upstreams=self._create_new_upstreams(update_upstreams))
+
+ def replace_me_with_concat(self, *iterables: Iterable, update_upstreams: bool = True) -> "CruIterableWrapper":
+ return CruIterableCreators.concat(*iterables, upstreams=self._create_new_upstreams(update_upstreams))
@staticmethod
- def complement_indices(length: int, *index: int) -> list[int]:
- return [i for i in range(length) if i not in index]
+ def _non_result_to_yield(value: Any | None) -> _Result:
+ return _Result.stop(value)
@staticmethod
- def foreach(l: Iterable[T], *f: OptionalElementOperation[T]) -> None:
+ def _non_result_to_return(value: Any | None) -> _Result:
+ return _Result.stop(value)
+
+ def _real_iterate(self, operation: _Operation,
+ convert_non_result: Callable[[Any | None], _Result]) -> Iterable:
+
+ for index, element in enumerate(self._iterable):
+ result = operation(element, index)
+ if not isinstance(result, _Result):
+ result = convert_non_result(result)
+ for result in result.flatten():
+ if result.action == _Result.Action.STOP:
+ return result.value
+ elif result.action == _Result.Action.SEND:
+ yield result.value
+ else:
+ continue
+
+ def _new(self, operation: _Operation) -> "CruIterableWrapper":
+ return CruIterableWrapper(self._real_iterate(operation, CruIterableWrapper._non_result_to_yield),
+ self._create_new_upstreams())
+
+ def _result(self, operation: _Operation,
+ result_transform: OptionalElementTransformer[T, T | O] = None) -> T | O:
+ try:
+ self._real_iterate(operation, CruIterableWrapper._non_result_to_return)
+ except StopIteration as stop:
+ return stop.value if result_transform is None else result_transform(stop.value)
+
+ @staticmethod
+ def _make_set(iterable: Iterable, discard: Iterable | None) -> set:
+ s = set(iterable)
+ if discard is not None:
+ s = s - set(discard)
+ return s
+
+ @staticmethod
+ def _make_list(iterable: Iterable, discard: Iterable | None) -> list:
+ if discard is None: return list(iterable)
+ return [v for v in iterable if v not in discard]
+
+ # noinspection PyMethodMayBeStatic
+ def _help_make_set(self, iterable: Iterable, discard: Iterable | None = iter([None])) -> set:
+ return CruIterableWrapper._make_set(iterable, discard)
+
+ # noinspection PyMethodMayBeStatic
+ def _help_make_list(self, iterable: Iterable, discard: Iterable | None = iter([None])) -> list:
+ return CruIterableWrapper._make_list(iterable, discard)
+
+ def to_set(self, discard: Iterable | None = None) -> set[T]:
+ return CruIterableWrapper._make_set(self.me, discard)
+
+ def to_list(self, discard: Iterable | None = None) -> list[T]:
+ return CruIterableWrapper._make_list(self.me, discard)
+
+ def copy(self) -> "CruIterableWrapper":
+ return CruIterableWrapper(iter(self.to_list()), self._create_new_upstreams())
+
+ def concat(self, *iterable: Iterable[T]) -> "CruIterableWrapper":
+ return self.replace_me_with_concat(self.me, *iterable)
+
+ def all(self, predicate: ElementPredicate[T]) -> bool:
+ """
+ partial
+ """
+ return self._result(lambda v, _: predicate(v) and None, _Defaults.true)
+
+ def all_isinstance(self, *types: OptionalType) -> bool:
+ """
+ partial
+ """
+ types = self._help_make_set(types)
+ return self.all(lambda v: type(v) in types)
+
+ def any(self, predicate: ElementPredicate[T]) -> bool:
+ """
+ partial
+ """
+ return self._result(lambda v, _: predicate(v) or None, _Defaults.false)
+
+ def number(self) -> "CruIterableWrapper":
+ """
+ partial
+ """
+ return self._new(lambda _, i: i)
+
+ def take(self, predicate: ElementPredicate[T]) -> "CruIterableWrapper":
+ """
+ complete
+ """
+ return self._new(lambda v, _: _r_send(v) if predicate(v) else None)
+
+ def transform(self, *transformers: OptionalElementTransformer) -> "CruIterableWrapper":
+ """
+ complete
+ """
+
+ def _transform_element(element, _):
+ for transformer in self._help_make_list(transformers):
+ if transformer is not None:
+ element = transformer(element)
+ return _r_send(element)
+
+ return self._new(_transform_element)
+
+ def take_n(self, max_count: int, neg_is_clone: bool = True) -> "CruIterableWrapper":
+ """
+ partial
+ """
+ if max_count < 0:
+ if neg_is_clone:
+ return self.clone_me()
+ else:
+ raise ValueError("max_count must be 0 or positive.")
+ elif max_count == 0:
+ return self.drop_all()
+ return self._new(lambda v, i: _r_send(v) if i < max_count - 1 else _r_send_last(v))
+
+ def take_by_indices(self, *indices: OptionalIndex) -> "CruIterableWrapper":
+ """
+ partial
+ """
+ indices = self._help_make_set(indices)
+ max_index = max(indices)
+ return self.take_n(max_index + 1)._new(lambda v, i: _r_send(v) if i in indices else None)
+
+ def single_or(self, fallback: Any | None = CRU_NOT_FOUND) -> T | Any | CRU_NOT_FOUND:
+ """
+ partial
+ """
+ first_2 = self.take_n(2)
+ has_value = False
+ value = None
+ for element in first_2.me:
+ if has_value:
+ raise ValueError("More than one value found.")
+ has_value = True
+ value = element
+ if has_value:
+ return value
+ else:
+ return fallback
+
+ def first_or(self, predicate: ElementPredicate[T], fallback: Any | None = CRU_NOT_FOUND) -> T | CRU_NOT_FOUND:
+ """
+ partial
+ """
+ result_iterable = self.take_n(1).single_or()
+
+ @staticmethod
+ def first_index(iterable: Iterable[T], predicate: ElementPredicate[T]) -> int | CRU_NOT_FOUND:
+ """
+ partial
+ """
+ for index, element in enumerate(iterable):
+ if predicate(element):
+ return index
+
+ @staticmethod
+ def take_indices(iterable: Iterable[T], predicate: ElementPredicate[T]) -> Iterable[int]:
+ """
+ complete
+ """
+ for index, element in enumerate(iterable):
+ if predicate(element):
+ yield index
+
+ @staticmethod
+ def flatten(o, max_depth=-1, is_leave: ElementPredicate | None = None,
+ get_children: OptionalElementTransformer = None) -> Iterable:
+ """
+ complete
+ """
+ if is_leave is None:
+ is_leave = lambda v: not isinstance(v, Iterable)
+ if get_children is None:
+ get_children = lambda v: v
+ return CruIterableWrapper._flatten_with_func(o, max_depth, is_leave, get_children)
+
+ @staticmethod
+ def skip_by_indices(iterable: Iterable[T], *indices: OptionalIndex) -> Iterable[T]:
+ """
+ complete
+ """
+ indices = set(indices) - {None}
+ for index, element in enumerate(iterable):
+ if index not in indices:
+ yield element
+
+ @staticmethod
+ def skip_if(iterable: Iterable[T], predicate: ElementPredicate[T]) -> list[T]:
+ """
+ complete
+ """
+ for element in iterable:
+ if not predicate(element):
+ yield element
+
+ def drop_all(self) -> "CruIterableWrapper":
+ return self.replace_me_with_empty()
+
+ @staticmethod
+ def remove_all_if(l: Iterable[T], p: ElementPredicate[T]) -> list[T]:
+ return [v for v in l if not p(v)]
+
+ @staticmethod
+ def remove_all_value(l: Iterable[T], *r: Any) -> list[T]:
+ return [v for v in l if v not in r]
+
+ @staticmethod
+ def replace_all_value(l: Iterable[T], old_value: Any, new_value: R) -> list[T | R]:
+ return [new_value if v == old_value else v for v in l]
+
+ @staticmethod
+ def foreach(iterable: Iterable[T], *f: OptionalElementOperation[T]) -> None:
if len(f) == 0: return
- for v in l:
+ for v in iterable:
for f_ in f:
if f_ is not None:
f_(v)
@staticmethod
- def all(l: Iterable[T], p: ElementPredicate[T]) -> bool:
- for v in l:
- if not p(v): return False
- return True
+ def make(v: CanBeList[T], /, none_to_empty_list: bool = True) -> list[T]:
+ if v is None and none_to_empty_list: return []
+ return list(v) if isinstance(v, Iterable) else [v]
+
+class ListOperations:
@staticmethod
- def all_is_instance(l: Iterable[T], *t: type) -> bool:
- return all(type(v) in t for v in l)
+ def all(iterable: Iterable[T], predicate: ElementPredicate[T]) -> bool:
+ """
+ partial
+ """
+ return _God.spy(iterable, lambda v, _: predicate(v) and None, _God.Default.true)
@staticmethod
- def any(l: Iterable[T], p: ElementPredicate[T]) -> bool:
- for v in l:
- if p(v): return True
- return False
+ def all_isinstance(iterable: Iterable[T], *types: OptionalType) -> bool:
+ """
+ partial
+ """
+ types = _God.help_make_set(types)
+ return ListOperations.all(iterable, lambda v: type(v) in types)
@staticmethod
- def find_all_if(l: Iterable[T], p: ElementPredicate[T]) -> list[T]:
- return [v for v in l if p(v)]
+ def any(iterable: Iterable[T], predicate: ElementPredicate[T]) -> bool:
+ """
+ partial
+ """
+ return _God.spy(iterable, lambda v, _: predicate(v) or None, _God.Default.false)
@staticmethod
- def find_if(l: Iterable[T], p: ElementPredicate[T]) -> T | CRU_NOT_FOUND:
- r = ListOperations.find_all_if(l, p)
- return r[0] if len(r) > 0 else CRU_NOT_FOUND
+ def indices(iterable: Iterable[T]) -> Iterable[int]:
+ """
+ partial
+ """
+ return _God.new(iterable, lambda _, i: i)
@staticmethod
- def find_all_indices_if(l: Iterable[T], p: ElementPredicate[T]) -> list[int]:
- return [i for i, v in enumerate(l) if p(v)]
+ def take(iterable: Iterable[T], predicate: ElementPredicate[T]) -> Iterable[T]:
+ """
+ complete
+ """
+ return _God.new(iterable, lambda v, _: _God.yield_(v) if predicate(v) else None)
@staticmethod
- def find_index_if(l: Iterable[T], p: ElementPredicate[T]) -> int | CRU_NOT_FOUND:
- r = ListOperations.find_all_indices_if(l, p)
- return r[0] if len(r) > 0 else CRU_NOT_FOUND
+ def transform(iterable: Iterable[T], *transformers: OptionalElementTransformer) -> Iterable:
+ """
+ complete
+ """
+
+ def _transform_element(element, _):
+ for transformer in transformers:
+ if transformer is not None:
+ element = transformer(element)
+ return element
+
+ return _God.new(iterable, _transform_element)
@staticmethod
- def transform(l: Iterable[T], *f: OptionalElementTransformer) -> list[Any]:
- r = []
- for v in l:
- for f_ in f:
- if f_ is not None:
- v = f_(v)
- r.append(v)
- return r
+ def take_n(iterable: Iterable[T], n: int) -> Iterable[T]:
+ """
+ partial
+ """
+ if n < 0:
+ return iterable
+ elif n == 0:
+ return []
+ return range(n)._god_yield(iterable, lambda v, i: _yield(v) if i < n else _return())
+
+ @staticmethod
+ def take_by_indices(iterable: Iterable[T], *indices: OptionalIndex) -> Iterable[T]:
+ """
+ partial
+ """
+ indices = set(indices) - {None}
+ max_index = max(indices)
+ iterable = ListOperations.take_n(iterable, max_index + 1)
+ return _god_yield(iterable, lambda v, i: _yield(v) if i in indices else None)
+
+ @staticmethod
+ def first(iterable: Iterable[T]) -> T | CRU_NOT_FOUND:
+ """
+ partial
+ """
+ result_iterable = ListOperations.take_n(iterable, 1)
+ for element in result_iterable:
+ return element
+ return CRU_NOT_FOUND
@staticmethod
- def transform_if(l: Iterable[T], f: OptionalElementTransformer, p: ElementPredicate[T]) -> list[Any]:
- return [(f(v) if f else v) for v in l if p(v)]
+ def first_index(iterable: Iterable[T], predicate: ElementPredicate[T]) -> int | CRU_NOT_FOUND:
+ """
+ partial
+ """
+ for index, element in enumerate(iterable):
+ if predicate(element):
+ return index
@staticmethod
- def remove_by_indices(l: Iterable[T], *index: int | None) -> list[T]:
- return [v for i, v in enumerate(l) if i not in index]
+ def take_indices(iterable: Iterable[T], predicate: ElementPredicate[T]) -> Iterable[int]:
+ """
+ complete
+ """
+ for index, element in enumerate(iterable):
+ if predicate(element):
+ yield index
@staticmethod
- def remove_if(l: Iterable[T], p: ElementPredicate[T]) -> list[T]:
- i = ListOperations.find_index_if(l, p)
- return ListOperations.remove_by_indices(l, i)
+ def _flatten(o, depth: int, max_depth: int) -> Iterable:
+ if depth == max_depth or not isinstance(o, Iterable):
+ yield o
+ return
+ for v in o:
+ yield from ListOperations._flatten(v, depth + 1, max_depth)
+
+ @staticmethod
+ def flatten(o, max_depth=-1) -> Iterable:
+ """
+ complete
+ """
+ return ListOperations._flatten(o, 0, max_depth)
+
+ @staticmethod
+ def skip_by_indices(iterable: Iterable[T], *indices: OptionalIndex) -> Iterable[T]:
+ """
+ complete
+ """
+ indices = set(indices) - {None}
+ for index, element in enumerate(iterable):
+ if index not in indices:
+ yield element
+
+ @staticmethod
+ def skip_if(iterable: Iterable[T], predicate: ElementPredicate[T]) -> list[T]:
+ """
+ complete
+ """
+ for element in iterable:
+ if not predicate(element):
+ yield element
@staticmethod
def remove_all_if(l: Iterable[T], p: ElementPredicate[T]) -> list[T]:
@@ -103,6 +566,14 @@ class ListOperations:
return [new_value if v == old_value else v for v in l]
@staticmethod
+ def foreach(iterable: Iterable[T], *f: OptionalElementOperation[T]) -> None:
+ if len(f) == 0: return
+ for v in iterable:
+ for f_ in f:
+ if f_ is not None:
+ f_(v)
+
+ @staticmethod
def make(v: CanBeList[T], /, none_to_empty_list: bool = True) -> list[T]:
if v is None and none_to_empty_list: return []
return list(v) if isinstance(v, Iterable) else [v]
@@ -116,6 +587,10 @@ class CruList(list, Generic[T]):
def sub_by_indices(self, *index: int) -> "CruList"[T]:
return CruList(ListOperations.sub_by_indices(self, *index))
+ def split_by_indices(self, *index: int) -> tuple["CruList"[T], "CruList"[T]]:
+ l1, l2 = ListOperations.split_by_indices(self, *index)
+ return CruList(l1), CruList(l2)
+
def complement_indices(self, *index: int) -> list[int]:
return ListOperations.complement_indices(len(self), *index)
@@ -126,22 +601,30 @@ class CruList(list, Generic[T]):
return ListOperations.all(self, p)
def all_is_instance(self, *t: type) -> bool:
- return ListOperations.all_is_instance(self, *t)
+ return ListOperations.all_isinstance(self, *t)
def any(self, p: ElementPredicate[T]) -> bool:
return ListOperations.any(self, p)
def find_all_if(self, p: ElementPredicate[T]) -> "CruList"[T]:
- return CruList(ListOperations.find_all_if(self, p))
+ return CruList(ListOperations.take(self, p))
def find_if(self, p: ElementPredicate[T]) -> T | CRU_NOT_FOUND:
- return ListOperations.find_if(self, p)
+ return ListOperations.first(self, p)
def find_all_indices_if(self, p: ElementPredicate[T]) -> "CruList"[int]:
- return CruList(ListOperations.find_all_indices_if(self, p))
+ return CruList(ListOperations.take_indices(self, p))
def find_index_if(self, p: ElementPredicate[T]) -> int | CRU_NOT_FOUND:
- return ListOperations.find_index_if(self, p)
+ return ListOperations.first_index(self, p)
+
+ def split_if(self, p: ElementPredicate[T]) -> tuple["CruList"[T], "CruList"[T]]:
+ l1, l2 = ListOperations.split_if(self, p)
+ return CruList(l1), CruList(l2)
+
+ def split_by_types(self, *t: type) -> tuple["CruList"[T], "CruList"[T]]:
+ l1, l2 = ListOperations.split_by_types(self, *t)
+ return CruList(l1), CruList(l2)
def transform(self, *f: OptionalElementTransformer) -> "CruList"[Any]:
return CruList(ListOperations.transform(self, *f))
@@ -150,7 +633,7 @@ class CruList(list, Generic[T]):
return CruList(ListOperations.transform_if(self, f, p))
def remove_by_indices(self, *index: int) -> "CruList"[T]:
- return CruList(ListOperations.remove_by_indices(self, *index))
+ return CruList(ListOperations.skip_by_indices(self, *index))
def remove_if(self, p: ElementPredicate[T]) -> "CruList"[T]:
return CruList(ListOperations.remove_if(self, p))
diff --git a/tools/cru-py/crupest/template2.py b/tools/cru-py/crupest/template2.py
deleted file mode 100644
index ae096df..0000000
--- a/tools/cru-py/crupest/template2.py
+++ /dev/null
@@ -1,45 +0,0 @@
-import os.path
-import re
-
-_template_filename_suffix = ".template"
-_template_var_regex = r"\$([-_a-zA-Z0-9]+)"
-_template_var_brace_regex = r"\$\{\s*([-_a-zA-Z0-9]+?)\s*\}"
-
-
-class Template2:
-
- @staticmethod
- def from_file(template_path: str) -> "Template2":
- if not template_path.endswith(_template_filename_suffix):
- raise Exception(
- "Template file must have a name ending with .template.")
- template_name = os.path.basename(
- template_path)[:-len(_template_filename_suffix)]
- with open(template_path, "r") as f:
- template = f.read()
- return Template2(template_name, template, template_path=template_path)
-
- def __init__(self, template_name: str, template: str, *, template_path: str | None = None) -> None:
- self.template_name = template_name
- self.template = template
- self.template_path = template_path
- self.var_set = set()
- for match in re.finditer(_template_var_regex, self.template):
- self.var_set.add(match.group(1))
- for match in re.finditer(_template_var_brace_regex, self.template):
- self.var_set.add(match.group(1))
-
- def partial_render(self, vars: dict[str, str]) -> "Template2":
- t = self.render(vars)
- return Template2(self.template_name, t, template_path=self.template_path)
-
- def render(self, vars: dict[str, str]) -> str:
- for name in vars.keys():
- if name not in self.var_set:
- raise ValueError(f"Invalid var name {name}.")
-
- text = self.template
- for name, value in vars.items():
- text = text.replace("$" + name, value)
- text = re.sub(r"\$\{\s*" + name + r"\s*\}", value, text)
- return text