diff options
| author | crupest <crupest@outlook.com> | 2024-11-11 01:12:29 +0800 | 
|---|---|---|
| committer | Yuqian Yang <crupest@crupest.life> | 2024-12-18 18:31:27 +0800 | 
| commit | 453a1efefcd5088b34331cad3df91a0b906d0254 (patch) | |
| tree | d2bf4e40bf625c75768d5ccda2ca49dbaba97a9a | |
| parent | 3d2ed6821112179743083364aac705c6e78c08b6 (diff) | |
| download | crupest-453a1efefcd5088b34331cad3df91a0b906d0254.tar.gz crupest-453a1efefcd5088b34331cad3df91a0b906d0254.tar.bz2 crupest-453a1efefcd5088b34331cad3df91a0b906d0254.zip | |
HALF WORK: 2024.11.12
| -rw-r--r-- | tools/cru-py/cru/property.py | 24 | ||||
| -rw-r--r-- | tools/cru-py/cru/service/docker.py | 9 | ||||
| -rw-r--r-- | tools/cru-py/cru/service/nginx.py | 5 | ||||
| -rw-r--r-- | tools/cru-py/cru/util/__init__.py | 15 | ||||
| -rw-r--r-- | tools/cru-py/cru/util/_const.py | 17 | ||||
| -rw-r--r-- | tools/cru-py/cru/util/_cru.py | 90 | ||||
| -rw-r--r-- | tools/cru-py/cru/util/_event.py | 41 | ||||
| -rw-r--r-- | tools/cru-py/cru/util/_func.py | 122 | ||||
| -rw-r--r-- | tools/cru-py/cru/util/_list.py | 591 | ||||
| -rw-r--r-- | tools/cru-py/crupest/template2.py | 45 | 
10 files changed, 777 insertions, 182 deletions
| diff --git a/tools/cru-py/cru/property.py b/tools/cru-py/cru/property.py new file mode 100644 index 0000000..9549731 --- /dev/null +++ b/tools/cru-py/cru/property.py @@ -0,0 +1,24 @@ +import json +from typing import Any + + +class PropertyItem: +    def __init__(self, value: Any): +        self._value = value + +    @property +    def value(self) -> Any: +        return self._value + +    @value.setter +    def value(self, value: Any): +        self._value = value + + +class PropertyTreeSection: +    def __init__(self, data: dict[str, Any] | None = None) -> None: +        self._data = data or {} + +class PropertyTree: +    def __init__(self, data: dict[str, Any] | None = None) -> None: +        self._data = data or {}
\ No newline at end of file diff --git a/tools/cru-py/cru/service/docker.py b/tools/cru-py/cru/service/docker.py index 42d4a35..a57a246 100644 --- a/tools/cru-py/cru/service/docker.py +++ b/tools/cru-py/cru/service/docker.py @@ -1,4 +1,7 @@  import shutil +import subprocess + +from ..util import L  class DockerController: @@ -13,3 +16,9 @@ class DockerController:              self._docker_bin = shutil.which(self.DOCKER_BIN_NAME)          return self._docker_bin +    def list_containers(self) -> L[str]: +        p = subprocess.run([self.docker_bin, "container", "ls", ""], capture_output=True) +        return p.stdout.decode("utf-8").splitlines() + +    def restart_container(self, container_name: str) -> None: +        subprocess.run([self.docker_bin, "restart", container_name])
\ No newline at end of file diff --git a/tools/cru-py/cru/service/nginx.py b/tools/cru-py/cru/service/nginx.py index f9120de..94c4375 100644 --- a/tools/cru-py/cru/service/nginx.py +++ b/tools/cru-py/cru/service/nginx.py @@ -2,15 +2,10 @@ import json  import os  import re  import subprocess -from os.path import join, basename, dirname  from typing import Literal, Any, cast, ClassVar  import jsonschema -from crupest.template2 import Template2 -from crupest.tui import Paths, UserFriendlyException, create_dir_if_not_exists, console, Confirm, ensure_dir -from crupest.ui_base import file_name_style -  def restart_nginx(force=False) -> bool:      if not force: diff --git a/tools/cru-py/cru/util/__init__.py b/tools/cru-py/cru/util/__init__.py index 2cbb8f4..ecd9673 100644 --- a/tools/cru-py/cru/util/__init__.py +++ b/tools/cru-py/cru/util/__init__.py @@ -1,20 +1,25 @@ -from ._const import make_unique_object, make_bool_unique_object, CRU_NOT_FOUND, CRU_USE_DEFAULT, CRU_DONT_CHANGE, \ +from typing import Any + +from ._const import cru_make_unique_object, cru_make_bool_unique_object, CRU_NOT_FOUND, CRU_USE_DEFAULT, \ +    CRU_DONT_CHANGE, \      CRU_PLACEHOLDER -from ._func import CruFunction, MetaFunction, RawFunctions, WrappedFunctions +from ._func import CruFunction, CruFunctionMeta, CruRawFunctions, CruWrappedFunctions, CruFunctionGenerators  from ._list import CruList, CruInplaceList, CruUniqueKeyInplaceList, ListOperations, CanBeList, ElementOperation, \      ElementPredicate, ElementTransformer, OptionalElementOperation, ElementPredicate, OptionalElementTransformer  from ._type import TypeSet  F = CruFunction -WF = WrappedFunctions +WF = CruWrappedFunctions +FG = CruFunctionGenerators  L = CruList +  __all__ = [      "CRU_NOT_FOUND", "CRU_USE_DEFAULT", "CRU_DONT_CHANGE", "CRU_PLACEHOLDER", -    "CruFunction", "MetaFunction", "RawFunctions", "WrappedFunctions", +    "CruFunction", "CruFunctionMeta", "CruRawFunctions", "CruWrappedFunctions", "CruFunctionGenerators",      "CruList", "CruInplaceList", "CruUniqueKeyInplaceList", "ListOperations",      "CanBeList", "ElementOperation", "ElementPredicate", "ElementTransformer",      "OptionalElementOperation", "ElementPredicate", "OptionalElementTransformer",      "TypeSet", -    "F", "WF", "L" +    "F", "WF", "FG", "L"  ] diff --git a/tools/cru-py/cru/util/_const.py b/tools/cru-py/cru/util/_const.py index ea67450..8140988 100644 --- a/tools/cru-py/cru/util/_const.py +++ b/tools/cru-py/cru/util/_const.py @@ -1,7 +1,9 @@  from typing import Any +from ._cru import CRU -def make_unique_object() -> Any: + +def cru_make_unique_object() -> Any:      class _CruUnique:          _i = False @@ -21,7 +23,7 @@ def make_unique_object() -> Any:      return v -def make_bool_unique_object(b: bool) -> Any: +def cru_make_bool_unique_object(b: bool) -> Any:      class _CruBoolUnique:          _i = False @@ -45,7 +47,10 @@ def make_bool_unique_object(b: bool) -> Any:      return v -CRU_NOT_FOUND = make_bool_unique_object(False) -CRU_USE_DEFAULT = make_unique_object() -CRU_DONT_CHANGE = make_unique_object() -CRU_PLACEHOLDER = make_unique_object() +CRU_NOT_FOUND = cru_make_bool_unique_object(False) +CRU_USE_DEFAULT = cru_make_unique_object() +CRU_DONT_CHANGE = cru_make_unique_object() +CRU_PLACEHOLDER = cru_make_unique_object() + +CRU.add_objects(cru_make_unique_object, cru_make_bool_unique_object, CRU_NOT_FOUND, CRU_USE_DEFAULT, +                CRU_DONT_CHANGE, CRU_PLACEHOLDER) diff --git a/tools/cru-py/cru/util/_cru.py b/tools/cru-py/cru/util/_cru.py new file mode 100644 index 0000000..61a0ee1 --- /dev/null +++ b/tools/cru-py/cru/util/_cru.py @@ -0,0 +1,90 @@ +from typing import Any + + +class _Cru: +    NAME_PREFIXES = ("CRU_", "Cru", "cru_") + +    def __init__(self): +        self._d: dict[str, Any] = {} + +    def all_names(self) -> list[str]: +        return list(self._d.keys()) + +    def get(self, name: str) -> Any: +        return self._d[name] + +    def has_name(self, name: str) -> bool: +        return name in self._d + +    @staticmethod +    def _maybe_remove_prefix(name: str) -> str | None: +        for prefix in _Cru.NAME_PREFIXES: +            if name.startswith(prefix): +                return name[len(prefix):] +        return None + +    def _check_name_exist(self, *names: str) -> None: +        for name in names: +            if name is None: continue +            if self.has_name(name): +                raise ValueError(f"Name {name} exists in CRU.") + +    @staticmethod +    def check_name_format(name: str) -> tuple[str, str]: +        no_prefix_name = _Cru._maybe_remove_prefix(name) +        if no_prefix_name is None: +            raise ValueError(f"Name {name} is not prefixed with {_Cru.NAME_PREFIXES}.") +        return name, no_prefix_name + +    @staticmethod +    def _check_object_name(o) -> tuple[str, str]: +        return _Cru.check_name_format(o.__name__) + +    def _do_add(self, o, *names: str | None) -> list[str]: +        names = set(names) +        names.remove(None) +        for name in names: +            self._d[name] = o +        return list(names) + +    def add(self, o, name: str | None) -> tuple[str, str | None]: +        if name is None: +            name, no_prefix_name = self._check_object_name(o) +        else: +            no_prefix_name = self._maybe_remove_prefix(name) + +        self._check_name_exist(name, no_prefix_name) +        self._do_add(o, name, no_prefix_name) +        return name, no_prefix_name + +    def add_with_alias(self, o, name: str | None = None, *aliases: str) -> list[str]: +        final_names = [] +        if name is None: +            name, no_prefix_name = self._check_object_name(o) +            self._check_name_exist(name, no_prefix_name) +            final_names.extend([name, no_prefix_name]) +        for alias in aliases: +            no_prefix_name = self._maybe_remove_prefix(alias) +            self._check_name_exist(alias, no_prefix_name) +            final_names.extend([alias, no_prefix_name]) + +        return self._do_add(o, *final_names) + +    def add_objects(self, *objects): +        final_list = [] +        for o in objects: +            name, no_prefix_name = self._check_object_name(o) +            self._check_name_exist(name, no_prefix_name) +            final_list.append((o, name, no_prefix_name)) +        for o, name, no_prefix_name in final_list: +            self._do_add(o, name, no_prefix_name) + +    def __getitem__(self, item): +        return self.get(item) + +    def __getattr__(self, item): +        return self.get(item) + + +CRU_NAME_PREFIXES = _Cru.NAME_PREFIXES +CRU = _Cru() diff --git a/tools/cru-py/cru/util/_event.py b/tools/cru-py/cru/util/_event.py new file mode 100644 index 0000000..813e33f --- /dev/null +++ b/tools/cru-py/cru/util/_event.py @@ -0,0 +1,41 @@ +from typing import ParamSpec, TypeVar, Callable + +from ._list import CruInplaceList, CruList + +P = ParamSpec('P') +R = TypeVar('R') +F = Callable[P, R] + + +class EventHandlerToken: +    def __init__(self, event: "Event", handler: F, once: bool = False) -> None: +        self._event = event +        self._handler = handler +        self._once = once + +    @property +    def event(self) -> "Event": +        return self._event + +    @property +    def handler(self) -> F: +        return self._handler + +    @property +    def once(self) -> bool: +        return self._once + + +class Event: +    def __init__(self, name: str) -> None: +        self._name = name +        self._tokens: CruInplaceList[EventHandlerToken] = CruInplaceList() + +    def register(self, handler: F, once: bool = False) -> EventHandlerToken: +        token = EventHandlerToken(self, handler, once) +        self._tokens.append(token) +        return token + +    def unregister(self, *h: EventHandlerToken | F) -> int: + +        self._tokens.find_all_indices_if(lambda t: ) diff --git a/tools/cru-py/cru/util/_func.py b/tools/cru-py/cru/util/_func.py index 3221c94..d9f8044 100644 --- a/tools/cru-py/cru/util/_func.py +++ b/tools/cru-py/cru/util/_func.py @@ -1,16 +1,14 @@  from collections.abc import Callable -from typing import TypeVar, Any, ParamSpec +from typing import TypeVar +from ._cru import CRU  from ._const import CRU_PLACEHOLDER  T = TypeVar("T") -R = TypeVar("R") -R1 = TypeVar("R1") -P = ParamSpec("P") -P1 = ParamSpec("P1") +_PLACEHOLDER = CRU_PLACEHOLDER -class RawFunctions: +class CruRawFunctions:      @staticmethod      def none(*_v, **_kwargs) -> None:          return None @@ -28,15 +26,15 @@ class RawFunctions:          return v      @staticmethod -    def only_you(r: T, *_v, **_kwargs) -> T: -        return r +    def only_you(v: T, *_v, **_kwargs) -> T: +        return v      @staticmethod -    def equal(a: Any, b: Any) -> bool: +    def equal(a, b) -> bool:          return a == b      @staticmethod -    def not_equal(a: Any, b: Any) -> bool: +    def not_equal(a, b) -> bool:          return a != b      @staticmethod @@ -44,95 +42,85 @@ class RawFunctions:          return not v -class MetaFunction: +class CruFunctionMeta:      @staticmethod -    def bind(f: Callable[P, R], *bind_args, **bind_kwargs) -> Callable[P1, R1]: -        def bound(*args, **kwargs): +    def bind(func: Callable, *bind_args, **bind_kwargs) -> Callable: +        def bound_func(*args, **kwargs):              popped = 0              real_args = [] -            for a in bind_args: -                if a is CRU_PLACEHOLDER: +            for arg in bind_args: +                if arg is _PLACEHOLDER:                      real_args.append(args[popped])                      popped += 1                  else: -                    real_args.append(a) +                    real_args.append(arg)              real_args.extend(args[popped:]) -            return f(*real_args, **(bind_kwargs | kwargs)) +            return func(*real_args, **(bind_kwargs | kwargs)) -        return bound +        return bound_func      @staticmethod -    def chain(*fs: Callable) -> Callable: -        if len(fs) == 0: +    def chain(*funcs: Callable) -> Callable: +        if len(funcs) == 0:              raise ValueError("At least one function is required!") -        rf = fs[0] -        for f in fs[1:]: -            def n(*args, **kwargs): -                r = rf(*args, **kwargs) -                r = r if isinstance(r, tuple) else (r,) -                return f(*r) -            rf = n -        return rf +        final_func = funcs[0] +        for func in funcs[1:]: +            func_copy = func -    @staticmethod -    def chain_single(f: Callable[P, R], f1: Callable[P1, R1], *bind_args, **bind_kwargs) -> \ -            Callable[ -                P, R1]: -        return MetaFunction.chain(f, MetaFunction.bind(f1, *bind_args, **bind_kwargs)) +            def chained_func(*args, **kwargs): +                results = final_func(*args, **kwargs) +                results = results if isinstance(results, tuple) else (results,) +                return func_copy(*results) -    convert_r = chain_single +            final_func = chained_func -    @staticmethod -    def neg(f: Callable[P, bool]) -> Callable[P, bool]: -        return MetaFunction.convert_r(f, RawFunctions.not_) +        return final_func  # Advanced Function Wrapper  class CruFunction: -    def __init__(self, f): +    def __init__(self, f: Callable):          self._f = f      @property      def f(self) -> Callable:          return self._f +    @property +    def func(self) -> Callable: +        return self.f +      def bind(self, *bind_args, **bind_kwargs) -> "CruFunction": -        self._f = MetaFunction.bind(self._f, *bind_args, **bind_kwargs) +        self._f = CruFunctionMeta.bind(self._f, *bind_args, **bind_kwargs)          return self -    def chain(self, *fs: Callable) -> "CruFunction": -        self._f = MetaFunction.chain(self._f, *fs) +    def chain(self, *funcs: Callable) -> "CruFunction": +        self._f = CruFunctionMeta.chain(self._f, *funcs)          return self -    def chain_single(self, f: Callable[P, R], f1: Callable[P1, R1], *bind_args, -                     **bind_kwargs) -> "CruFunction": -        self._f = MetaFunction.chain_single(self._f, f, f1, *bind_args, **bind_kwargs) -        return self +    def __call__(self, *args, **kwargs): +        return self._f(*args, **kwargs) -    def convert_r(self, f: Callable[P, R], f1: Callable[P1, R1], *bind_args, -                  **bind_kwargs) -> "CruFunction": -        self._f = MetaFunction.convert_r(self._f, f, f1, *bind_args, **bind_kwargs) -        return self +    @staticmethod +    def make_chain(base_func: Callable, *funcs: Callable) -> "CruFunction": +        return CruFunction(base_func).chain(*funcs) -    def neg(self) -> "CruFunction": -        self._f = MetaFunction.neg(self._f) -        return self -    def __call__(self, *args, **kwargs): -        return self._f(*args, **kwargs) +class CruWrappedFunctions: +    none = CruFunction(CruRawFunctions.none) +    true = CruFunction(CruRawFunctions.true) +    false = CruFunction(CruRawFunctions.false) +    identity = CruFunction(CruRawFunctions.identity) +    only_you = CruFunction(CruRawFunctions.only_you) +    equal = CruFunction(CruRawFunctions.equal) +    not_equal = CruFunction(CruRawFunctions.not_equal) +    not_ = CruFunction(CruRawFunctions.not_) + +class CruFunctionGenerators:      @staticmethod -    def make_chain(*fs: Callable) -> Callable[P, R1]: -        return CruFunction(MetaFunction.chain(*fs)) - - -class WrappedFunctions: -    none = CruFunction(RawFunctions.none) -    true = CruFunction(RawFunctions.true) -    false = CruFunction(RawFunctions.false) -    identity = CruFunction(RawFunctions.identity) -    only_you = CruFunction(RawFunctions.only_you) -    equal = CruFunction(RawFunctions.equal) -    not_equal = CruFunction(RawFunctions.not_equal) -    not_ = CruFunction(RawFunctions.not_) +    def make_isinstance_of_types(*types: type) -> Callable: +        return CruFunction(lambda v: type(v) in types) + +CRU.add_objects(CruRawFunctions, CruFunctionMeta, CruFunction, CruWrappedFunctions, CruFunctionGenerators) diff --git a/tools/cru-py/cru/util/_list.py b/tools/cru-py/cru/util/_list.py index 92cd88c..e1c8373 100644 --- a/tools/cru-py/cru/util/_list.py +++ b/tools/cru-py/cru/util/_list.py @@ -1,94 +1,557 @@  from collections.abc import Iterable, Callable -from typing import TypeVar, ParamSpec, Any, Generic +from dataclasses import dataclass +from enum import Enum +from typing import TypeVar, ParamSpec, Any, Generic, ClassVar, Optional, Union  from ._const import CRU_NOT_FOUND  T = TypeVar("T") +O = TypeVar("O")  R = TypeVar("R") -P = ParamSpec("P") +F = TypeVar("F")  CanBeList = T | Iterable[T] | None -ElementOperation = Callable[[T], Any] + +OptionalIndex = int | None +OptionalType = type | None +ElementOperation = Callable[[T], Any] | None  ElementPredicate = Callable[[T], bool] -ElementTransformer = Callable[[Any], Any] -OptionalElementOperation = Callable[[T], Any] | None -OptionalElementTransformer = Callable[[Any], Any] | None +ElementTransformer = Callable[[T], R] +SelfElementTransformer = ElementTransformer[T, T] +AnyElementTransformer = ElementTransformer[Any, Any] +OptionalElementOperation = ElementOperation | None +OptionalElementTransformer = ElementTransformer | None +OptionalSelfElementTransformer = ElementTransformer[T, T] +OptionalAnyElementTransformer = AnyElementTransformer | None -class ListOperations: +def _flatten_with_func(o: T, max_depth: int, is_leave: ElementPredicate[T], +                       get_children: SelfElementTransformer[T], depth: int = 0) -> Iterable[T]: +    if depth == max_depth or is_leave(o): +        yield o +        return +    for child in get_children(o): +        yield from _flatten_with_func(child, max_depth, is_leave, get_children, depth + 1) + + +class _Action(Enum): +    SKIP = 0 +    SEND = 1 +    STOP = 2 +    AGGREGATE = 3 + + +@dataclass +class _Result(Generic[T]): +    Action: ClassVar[type[_Action]] = _Action + +    value: T | O | None +    action: Action + +    @staticmethod +    def skip() -> "_Result"[T]: +        return _Result(None, _Action.SKIP) + +    @staticmethod +    def send(value: Any) -> "_Result"[T]: +        return _Result(value, _Action.SEND) + +    @staticmethod +    def stop(value: Any = None) -> "_Result"[T]: +        return _Result(value, _Action.STOP) + +    @staticmethod +    def aggregate(*result: "_Result"[T]) -> "_Result"[T]: +        return _Result(result, _Action.AGGREGATE)      @staticmethod -    def sub_by_indices(l: Iterable[T], *index: int) -> list[T]: -        return [v for i, v in enumerate(l) if i in index] +    def send_last(value: Any) -> "_Result"[T]: +        return _Result.aggregate(_Result.send(value), _Result.stop()) + +    def flatten(self) -> Iterable["_Result"[T]]: +        return _flatten_with_func(self, -1, lambda r: r.action != _Action.AGGREGATE, lambda r: r.value) + + +_r_skip = _Result.skip +_r_send = _Result.send +_r_stop = _Result.stop +_r_send_last = _Result.send_last +_r_aggregate = _Result.aggregate + + +class _Defaults: +    @staticmethod +    def true(_): +        return True + +    @staticmethod +    def false(_): +        return False + +    @staticmethod +    def not_found(_): +        return CRU_NOT_FOUND + + +def _default_upstream() -> Iterable[Iterable]: +    return iter([]) + + +CruIterableUpstream = Iterable[Iterable] +CruIterableOptionalUpstream = CruIterableUpstream | None + + +class CruIterableCreators: +    @staticmethod +    def with_(o: Any, /, upstreams: CruIterableOptionalUpstream = _default_upstream()) -> "CruIterableWrapper": +        return CruIterableWrapper(iter(o), upstreams) + +    @staticmethod +    def empty(upstreams: CruIterableOptionalUpstream = _default_upstream()) -> "CruIterableWrapper": +        return CruIterableCreators.with_([], upstreams) + +    @staticmethod +    def range(a, b=None, c=None, /, upstreams: CruIterableOptionalUpstream = _default_upstream()) -> \ +    "CruIterableWrapper"[int]: +        args = [arg for arg in [a, b, c] if arg is not None] +        return CruIterableCreators.with_(range(*args), upstreams) + +    @staticmethod +    def unite(*args: T, upstreams: CruIterableOptionalUpstream = _default_upstream()) -> "CruIterableWrapper"[T]: +        return CruIterableCreators.with_(args, upstreams) + +    @staticmethod +    def _concat(*iterables: Iterable) -> Iterable: +        for iterable in iterables: +            yield from iterable + +    @staticmethod +    def concat(*iterables: Iterable, +               upstreams: CruIterableOptionalUpstream = _default_upstream()) -> "CruIterableWrapper": +        return CruIterableWrapper(CruIterableCreators._concat(*iterables), upstreams) + + +class CruIterableWrapper(Generic[T]): +    Upstream = CruIterableUpstream +    OptionalUpstream = CruIterableOptionalUpstream +    _Result = _Result[T] +    _Operation = Callable[[T, int], _Result | Any | None] + +    def __init__(self, iterable: Iterable[T], /, upstreams: OptionalUpstream = _default_upstream()) -> None: +        self._iterable = iterable +        self._upstreams = None if upstreams is None else list(upstreams) + +    @property +    def me(self) -> Iterable[T]: +        return self._iterable + +    # TODO: Return Type +    @property +    def my_upstreams(self) -> Optional["CruIterableWrapper"]: +        if self._upstreams is None: +            return None +        return CruIterableWrapper(iter(self._upstreams)) + +    def disable_my_upstreams(self) -> "CruIterableWrapper"[T]: +        return CruIterableWrapper(self._iterable, None) + +    def clear_my_upstreams(self) -> "CruIterableWrapper"[T]: +        return CruIterableWrapper(self._iterable) + +    def _create_upstreams_prepend_self(self) -> Upstream: +        yield self._iterable +        yield self.my_upstreams + +    # TODO: Return Type +    def _create_new_upstreams(self, append: bool = True) -> Optional["CruIterableWrapper"]: +        if not append: return self.my_upstreams +        if self.my_upstreams is None: +            return None +        return CruIterableWrapper(self._create_upstreams_prepend_self()) + +    def clone_me(self, /, update_upstreams: bool = True) -> "CruIterableWrapper"[T]: +        return CruIterableWrapper(self._iterable, self._create_new_upstreams(update_upstreams)) + +    def replace_me_with(self, iterable: Iterable[O], /, update_upstreams: bool = True) -> "CruIterableWrapper"[O]: +        return CruIterableCreators.with_(iterable, upstreams=self._create_new_upstreams(update_upstreams)) + +    def replace_me_with_empty(self, /, update_upstreams: bool = True) -> "CruIterableWrapper"[O]: +        return CruIterableCreators.empty(upstreams=self._create_new_upstreams(update_upstreams)) + +    def replace_me_with_range(self, a, b=None, c=None, /, update_upstreams: bool = True) -> "CruIterableWrapper"[int]: +        return CruIterableCreators.range(a, b, c, upstreams=self._create_new_upstreams(update_upstreams)) + +    def replace_me_with_unite(self, *args: O, update_upstreams: bool = True) -> "CruIterableWrapper"[O]: +        return CruIterableCreators.unite(*args, upstreams=self._create_new_upstreams(update_upstreams)) + +    def replace_me_with_concat(self, *iterables: Iterable, update_upstreams: bool = True) -> "CruIterableWrapper": +        return CruIterableCreators.concat(*iterables, upstreams=self._create_new_upstreams(update_upstreams))      @staticmethod -    def complement_indices(length: int, *index: int) -> list[int]: -        return [i for i in range(length) if i not in index] +    def _non_result_to_yield(value: Any | None) -> _Result: +        return _Result.stop(value)      @staticmethod -    def foreach(l: Iterable[T], *f: OptionalElementOperation[T]) -> None: +    def _non_result_to_return(value: Any | None) -> _Result: +        return _Result.stop(value) + +    def _real_iterate(self, operation: _Operation, +                      convert_non_result: Callable[[Any | None], _Result]) -> Iterable: + +        for index, element in enumerate(self._iterable): +            result = operation(element, index) +            if not isinstance(result, _Result): +                result = convert_non_result(result) +            for result in result.flatten(): +                if result.action == _Result.Action.STOP: +                    return result.value +                elif result.action == _Result.Action.SEND: +                    yield result.value +                else: +                    continue + +    def _new(self, operation: _Operation) -> "CruIterableWrapper": +        return CruIterableWrapper(self._real_iterate(operation, CruIterableWrapper._non_result_to_yield), +                                  self._create_new_upstreams()) + +    def _result(self, operation: _Operation, +                result_transform: OptionalElementTransformer[T, T | O] = None) -> T | O: +        try: +            self._real_iterate(operation, CruIterableWrapper._non_result_to_return) +        except StopIteration as stop: +            return stop.value if result_transform is None else result_transform(stop.value) + +    @staticmethod +    def _make_set(iterable: Iterable, discard: Iterable | None) -> set: +        s = set(iterable) +        if discard is not None: +            s = s - set(discard) +        return s + +    @staticmethod +    def _make_list(iterable: Iterable, discard: Iterable | None) -> list: +        if discard is None: return list(iterable) +        return [v for v in iterable if v not in discard] + +    # noinspection PyMethodMayBeStatic +    def _help_make_set(self, iterable: Iterable, discard: Iterable | None = iter([None])) -> set: +        return CruIterableWrapper._make_set(iterable, discard) + +    # noinspection PyMethodMayBeStatic +    def _help_make_list(self, iterable: Iterable, discard: Iterable | None = iter([None])) -> list: +        return CruIterableWrapper._make_list(iterable, discard) + +    def to_set(self, discard: Iterable | None = None) -> set[T]: +        return CruIterableWrapper._make_set(self.me, discard) + +    def to_list(self, discard: Iterable | None = None) -> list[T]: +        return CruIterableWrapper._make_list(self.me, discard) + +    def copy(self) -> "CruIterableWrapper": +        return CruIterableWrapper(iter(self.to_list()), self._create_new_upstreams()) + +    def concat(self, *iterable: Iterable[T]) -> "CruIterableWrapper": +        return self.replace_me_with_concat(self.me, *iterable) + +    def all(self, predicate: ElementPredicate[T]) -> bool: +        """ +        partial +        """ +        return self._result(lambda v, _: predicate(v) and None, _Defaults.true) + +    def all_isinstance(self, *types: OptionalType) -> bool: +        """ +        partial +        """ +        types = self._help_make_set(types) +        return self.all(lambda v: type(v) in types) + +    def any(self, predicate: ElementPredicate[T]) -> bool: +        """ +        partial +        """ +        return self._result(lambda v, _: predicate(v) or None, _Defaults.false) + +    def number(self) -> "CruIterableWrapper": +        """ +        partial +        """ +        return self._new(lambda _, i: i) + +    def take(self, predicate: ElementPredicate[T]) -> "CruIterableWrapper": +        """ +        complete +        """ +        return self._new(lambda v, _: _r_send(v) if predicate(v) else None) + +    def transform(self, *transformers: OptionalElementTransformer) -> "CruIterableWrapper": +        """ +        complete +        """ + +        def _transform_element(element, _): +            for transformer in self._help_make_list(transformers): +                if transformer is not None: +                    element = transformer(element) +            return _r_send(element) + +        return self._new(_transform_element) + +    def take_n(self, max_count: int, neg_is_clone: bool = True) -> "CruIterableWrapper": +        """ +        partial +        """ +        if max_count < 0: +            if neg_is_clone: +                return self.clone_me() +            else: +                raise ValueError("max_count must be 0 or positive.") +        elif max_count == 0: +            return self.drop_all() +        return self._new(lambda v, i: _r_send(v) if i < max_count - 1 else _r_send_last(v)) + +    def take_by_indices(self, *indices: OptionalIndex) -> "CruIterableWrapper": +        """ +        partial +        """ +        indices = self._help_make_set(indices) +        max_index = max(indices) +        return self.take_n(max_index + 1)._new(lambda v, i: _r_send(v) if i in indices else None) + +    def single_or(self, fallback: Any | None = CRU_NOT_FOUND) -> T | Any | CRU_NOT_FOUND: +        """ +        partial +        """ +        first_2 = self.take_n(2) +        has_value = False +        value = None +        for element in first_2.me: +            if has_value: +                raise ValueError("More than one value found.") +            has_value = True +            value = element +        if has_value: +            return value +        else: +            return fallback + +    def first_or(self, predicate: ElementPredicate[T], fallback: Any | None = CRU_NOT_FOUND) -> T | CRU_NOT_FOUND: +        """ +        partial +        """ +        result_iterable = self.take_n(1).single_or() + +    @staticmethod +    def first_index(iterable: Iterable[T], predicate: ElementPredicate[T]) -> int | CRU_NOT_FOUND: +        """ +        partial +        """ +        for index, element in enumerate(iterable): +            if predicate(element): +                return index + +    @staticmethod +    def take_indices(iterable: Iterable[T], predicate: ElementPredicate[T]) -> Iterable[int]: +        """ +        complete +        """ +        for index, element in enumerate(iterable): +            if predicate(element): +                yield index + +    @staticmethod +    def flatten(o, max_depth=-1, is_leave: ElementPredicate | None = None, +                get_children: OptionalElementTransformer = None) -> Iterable: +        """ +        complete +        """ +        if is_leave is None: +            is_leave = lambda v: not isinstance(v, Iterable) +        if get_children is None: +            get_children = lambda v: v +        return CruIterableWrapper._flatten_with_func(o, max_depth, is_leave, get_children) + +    @staticmethod +    def skip_by_indices(iterable: Iterable[T], *indices: OptionalIndex) -> Iterable[T]: +        """ +        complete +        """ +        indices = set(indices) - {None} +        for index, element in enumerate(iterable): +            if index not in indices: +                yield element + +    @staticmethod +    def skip_if(iterable: Iterable[T], predicate: ElementPredicate[T]) -> list[T]: +        """ +        complete +        """ +        for element in iterable: +            if not predicate(element): +                yield element + +    def drop_all(self) -> "CruIterableWrapper": +        return self.replace_me_with_empty() + +    @staticmethod +    def remove_all_if(l: Iterable[T], p: ElementPredicate[T]) -> list[T]: +        return [v for v in l if not p(v)] + +    @staticmethod +    def remove_all_value(l: Iterable[T], *r: Any) -> list[T]: +        return [v for v in l if v not in r] + +    @staticmethod +    def replace_all_value(l: Iterable[T], old_value: Any, new_value: R) -> list[T | R]: +        return [new_value if v == old_value else v for v in l] + +    @staticmethod +    def foreach(iterable: Iterable[T], *f: OptionalElementOperation[T]) -> None:          if len(f) == 0: return -        for v in l: +        for v in iterable:              for f_ in f:                  if f_ is not None:                      f_(v)      @staticmethod -    def all(l: Iterable[T], p: ElementPredicate[T]) -> bool: -        for v in l: -            if not p(v): return False -        return True +    def make(v: CanBeList[T], /, none_to_empty_list: bool = True) -> list[T]: +        if v is None and none_to_empty_list: return [] +        return list(v) if isinstance(v, Iterable) else [v] + +class ListOperations:      @staticmethod -    def all_is_instance(l: Iterable[T], *t: type) -> bool: -        return all(type(v) in t for v in l) +    def all(iterable: Iterable[T], predicate: ElementPredicate[T]) -> bool: +        """ +        partial +        """ +        return _God.spy(iterable, lambda v, _: predicate(v) and None, _God.Default.true)      @staticmethod -    def any(l: Iterable[T], p: ElementPredicate[T]) -> bool: -        for v in l: -            if p(v): return True -        return False +    def all_isinstance(iterable: Iterable[T], *types: OptionalType) -> bool: +        """ +        partial +        """ +        types = _God.help_make_set(types) +        return ListOperations.all(iterable, lambda v: type(v) in types)      @staticmethod -    def find_all_if(l: Iterable[T], p: ElementPredicate[T]) -> list[T]: -        return [v for v in l if p(v)] +    def any(iterable: Iterable[T], predicate: ElementPredicate[T]) -> bool: +        """ +        partial +        """ +        return _God.spy(iterable, lambda v, _: predicate(v) or None, _God.Default.false)      @staticmethod -    def find_if(l: Iterable[T], p: ElementPredicate[T]) -> T | CRU_NOT_FOUND: -        r = ListOperations.find_all_if(l, p) -        return r[0] if len(r) > 0 else CRU_NOT_FOUND +    def indices(iterable: Iterable[T]) -> Iterable[int]: +        """ +        partial +        """ +        return _God.new(iterable, lambda _, i: i)      @staticmethod -    def find_all_indices_if(l: Iterable[T], p: ElementPredicate[T]) -> list[int]: -        return [i for i, v in enumerate(l) if p(v)] +    def take(iterable: Iterable[T], predicate: ElementPredicate[T]) -> Iterable[T]: +        """ +        complete +        """ +        return _God.new(iterable, lambda v, _: _God.yield_(v) if predicate(v) else None)      @staticmethod -    def find_index_if(l: Iterable[T], p: ElementPredicate[T]) -> int | CRU_NOT_FOUND: -        r = ListOperations.find_all_indices_if(l, p) -        return r[0] if len(r) > 0 else CRU_NOT_FOUND +    def transform(iterable: Iterable[T], *transformers: OptionalElementTransformer) -> Iterable: +        """ +        complete +        """ + +        def _transform_element(element, _): +            for transformer in transformers: +                if transformer is not None: +                    element = transformer(element) +            return element + +        return _God.new(iterable, _transform_element)      @staticmethod -    def transform(l: Iterable[T], *f: OptionalElementTransformer) -> list[Any]: -        r = [] -        for v in l: -            for f_ in f: -                if f_ is not None: -                    v = f_(v) -            r.append(v) -        return r +    def take_n(iterable: Iterable[T], n: int) -> Iterable[T]: +        """ +        partial +        """ +        if n < 0: +            return iterable +        elif n == 0: +            return [] +        return range(n)._god_yield(iterable, lambda v, i: _yield(v) if i < n else _return()) + +    @staticmethod +    def take_by_indices(iterable: Iterable[T], *indices: OptionalIndex) -> Iterable[T]: +        """ +        partial +        """ +        indices = set(indices) - {None} +        max_index = max(indices) +        iterable = ListOperations.take_n(iterable, max_index + 1) +        return _god_yield(iterable, lambda v, i: _yield(v) if i in indices else None) + +    @staticmethod +    def first(iterable: Iterable[T]) -> T | CRU_NOT_FOUND: +        """ +        partial +        """ +        result_iterable = ListOperations.take_n(iterable, 1) +        for element in result_iterable: +            return element +        return CRU_NOT_FOUND      @staticmethod -    def transform_if(l: Iterable[T], f: OptionalElementTransformer, p: ElementPredicate[T]) -> list[Any]: -        return [(f(v) if f else v) for v in l if p(v)] +    def first_index(iterable: Iterable[T], predicate: ElementPredicate[T]) -> int | CRU_NOT_FOUND: +        """ +        partial +        """ +        for index, element in enumerate(iterable): +            if predicate(element): +                return index      @staticmethod -    def remove_by_indices(l: Iterable[T], *index: int | None) -> list[T]: -        return [v for i, v in enumerate(l) if i not in index] +    def take_indices(iterable: Iterable[T], predicate: ElementPredicate[T]) -> Iterable[int]: +        """ +        complete +        """ +        for index, element in enumerate(iterable): +            if predicate(element): +                yield index      @staticmethod -    def remove_if(l: Iterable[T], p: ElementPredicate[T]) -> list[T]: -        i = ListOperations.find_index_if(l, p) -        return ListOperations.remove_by_indices(l, i) +    def _flatten(o, depth: int, max_depth: int) -> Iterable: +        if depth == max_depth or not isinstance(o, Iterable): +            yield o +            return +        for v in o: +            yield from ListOperations._flatten(v, depth + 1, max_depth) + +    @staticmethod +    def flatten(o, max_depth=-1) -> Iterable: +        """ +        complete +        """ +        return ListOperations._flatten(o, 0, max_depth) + +    @staticmethod +    def skip_by_indices(iterable: Iterable[T], *indices: OptionalIndex) -> Iterable[T]: +        """ +        complete +        """ +        indices = set(indices) - {None} +        for index, element in enumerate(iterable): +            if index not in indices: +                yield element + +    @staticmethod +    def skip_if(iterable: Iterable[T], predicate: ElementPredicate[T]) -> list[T]: +        """ +        complete +        """ +        for element in iterable: +            if not predicate(element): +                yield element      @staticmethod      def remove_all_if(l: Iterable[T], p: ElementPredicate[T]) -> list[T]: @@ -103,6 +566,14 @@ class ListOperations:          return [new_value if v == old_value else v for v in l]      @staticmethod +    def foreach(iterable: Iterable[T], *f: OptionalElementOperation[T]) -> None: +        if len(f) == 0: return +        for v in iterable: +            for f_ in f: +                if f_ is not None: +                    f_(v) + +    @staticmethod      def make(v: CanBeList[T], /, none_to_empty_list: bool = True) -> list[T]:          if v is None and none_to_empty_list: return []          return list(v) if isinstance(v, Iterable) else [v] @@ -116,6 +587,10 @@ class CruList(list, Generic[T]):      def sub_by_indices(self, *index: int) -> "CruList"[T]:          return CruList(ListOperations.sub_by_indices(self, *index)) +    def split_by_indices(self, *index: int) -> tuple["CruList"[T], "CruList"[T]]: +        l1, l2 = ListOperations.split_by_indices(self, *index) +        return CruList(l1), CruList(l2) +      def complement_indices(self, *index: int) -> list[int]:          return ListOperations.complement_indices(len(self), *index) @@ -126,22 +601,30 @@ class CruList(list, Generic[T]):          return ListOperations.all(self, p)      def all_is_instance(self, *t: type) -> bool: -        return ListOperations.all_is_instance(self, *t) +        return ListOperations.all_isinstance(self, *t)      def any(self, p: ElementPredicate[T]) -> bool:          return ListOperations.any(self, p)      def find_all_if(self, p: ElementPredicate[T]) -> "CruList"[T]: -        return CruList(ListOperations.find_all_if(self, p)) +        return CruList(ListOperations.take(self, p))      def find_if(self, p: ElementPredicate[T]) -> T | CRU_NOT_FOUND: -        return ListOperations.find_if(self, p) +        return ListOperations.first(self, p)      def find_all_indices_if(self, p: ElementPredicate[T]) -> "CruList"[int]: -        return CruList(ListOperations.find_all_indices_if(self, p)) +        return CruList(ListOperations.take_indices(self, p))      def find_index_if(self, p: ElementPredicate[T]) -> int | CRU_NOT_FOUND: -        return ListOperations.find_index_if(self, p) +        return ListOperations.first_index(self, p) + +    def split_if(self, p: ElementPredicate[T]) -> tuple["CruList"[T], "CruList"[T]]: +        l1, l2 = ListOperations.split_if(self, p) +        return CruList(l1), CruList(l2) + +    def split_by_types(self, *t: type) -> tuple["CruList"[T], "CruList"[T]]: +        l1, l2 = ListOperations.split_by_types(self, *t) +        return CruList(l1), CruList(l2)      def transform(self, *f: OptionalElementTransformer) -> "CruList"[Any]:          return CruList(ListOperations.transform(self, *f)) @@ -150,7 +633,7 @@ class CruList(list, Generic[T]):          return CruList(ListOperations.transform_if(self, f, p))      def remove_by_indices(self, *index: int) -> "CruList"[T]: -        return CruList(ListOperations.remove_by_indices(self, *index)) +        return CruList(ListOperations.skip_by_indices(self, *index))      def remove_if(self, p: ElementPredicate[T]) -> "CruList"[T]:          return CruList(ListOperations.remove_if(self, p)) diff --git a/tools/cru-py/crupest/template2.py b/tools/cru-py/crupest/template2.py deleted file mode 100644 index ae096df..0000000 --- a/tools/cru-py/crupest/template2.py +++ /dev/null @@ -1,45 +0,0 @@ -import os.path -import re - -_template_filename_suffix = ".template" -_template_var_regex = r"\$([-_a-zA-Z0-9]+)" -_template_var_brace_regex = r"\$\{\s*([-_a-zA-Z0-9]+?)\s*\}" - - -class Template2: - -    @staticmethod -    def from_file(template_path: str) -> "Template2": -        if not template_path.endswith(_template_filename_suffix): -            raise Exception( -                "Template file must have a name ending with .template.") -        template_name = os.path.basename( -            template_path)[:-len(_template_filename_suffix)] -        with open(template_path, "r") as f: -            template = f.read() -        return Template2(template_name, template, template_path=template_path) - -    def __init__(self, template_name: str, template: str, *, template_path: str | None = None) -> None: -        self.template_name = template_name -        self.template = template -        self.template_path = template_path -        self.var_set = set() -        for match in re.finditer(_template_var_regex, self.template): -            self.var_set.add(match.group(1)) -        for match in re.finditer(_template_var_brace_regex, self.template): -            self.var_set.add(match.group(1)) - -    def partial_render(self, vars: dict[str, str]) -> "Template2": -        t = self.render(vars) -        return Template2(self.template_name, t, template_path=self.template_path) - -    def render(self, vars: dict[str, str]) -> str: -        for name in vars.keys(): -            if name not in self.var_set: -                raise ValueError(f"Invalid var name {name}.") - -        text = self.template -        for name, value in vars.items(): -            text = text.replace("$" + name, value) -            text = re.sub(r"\$\{\s*" + name + r"\s*\}", value, text) -        return text | 
