aboutsummaryrefslogtreecommitdiff
path: root/tools/cru-py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/cru-py')
-rw-r--r--tools/cru-py/.gitignore2
-rw-r--r--tools/cru-py/.python-version1
-rw-r--r--tools/cru-py/cru/__init__.py60
-rw-r--r--tools/cru-py/cru/_base.py101
-rw-r--r--tools/cru-py/cru/_const.py49
-rw-r--r--tools/cru-py/cru/_decorator.py97
-rw-r--r--tools/cru-py/cru/_error.py89
-rw-r--r--tools/cru-py/cru/_event.py61
-rw-r--r--tools/cru-py/cru/_func.py172
-rw-r--r--tools/cru-py/cru/_helper.py16
-rw-r--r--tools/cru-py/cru/_iter.py466
-rw-r--r--tools/cru-py/cru/_type.py52
-rw-r--r--tools/cru-py/cru/attr.py364
-rw-r--r--tools/cru-py/cru/config.py196
-rw-r--r--tools/cru-py/cru/list.py160
-rw-r--r--tools/cru-py/cru/parsing.py98
-rw-r--r--tools/cru-py/cru/service/__init__.py0
-rw-r--r--tools/cru-py/cru/service/__main__.py20
-rw-r--r--tools/cru-py/cru/service/_app.py34
-rw-r--r--tools/cru-py/cru/service/_base.py449
-rw-r--r--tools/cru-py/cru/service/_config.py446
-rw-r--r--tools/cru-py/cru/service/_external.py81
-rw-r--r--tools/cru-py/cru/service/_nginx.py281
-rw-r--r--tools/cru-py/cru/service/_template.py86
-rw-r--r--tools/cru-py/cru/system.py23
-rw-r--r--tools/cru-py/cru/template.py153
-rw-r--r--tools/cru-py/cru/tool.py82
-rw-r--r--tools/cru-py/cru/value.py292
-rw-r--r--tools/cru-py/poetry.lock80
-rw-r--r--tools/cru-py/pyproject.toml26
-rw-r--r--tools/cru-py/www-dev8
31 files changed, 4045 insertions, 0 deletions
diff --git a/tools/cru-py/.gitignore b/tools/cru-py/.gitignore
new file mode 100644
index 0000000..9f7550b
--- /dev/null
+++ b/tools/cru-py/.gitignore
@@ -0,0 +1,2 @@
+__pycache__
+.venv
diff --git a/tools/cru-py/.python-version b/tools/cru-py/.python-version
new file mode 100644
index 0000000..37504c5
--- /dev/null
+++ b/tools/cru-py/.python-version
@@ -0,0 +1 @@
+3.11
diff --git a/tools/cru-py/cru/__init__.py b/tools/cru-py/cru/__init__.py
new file mode 100644
index 0000000..17799a9
--- /dev/null
+++ b/tools/cru-py/cru/__init__.py
@@ -0,0 +1,60 @@
+import sys
+
+from ._base import CRU, CruNamespaceError, CRU_NAME_PREFIXES
+from ._error import (
+ CruException,
+ CruLogicError,
+ CruInternalError,
+ CruUnreachableError,
+ cru_unreachable,
+)
+from ._const import (
+ CruConstantBase,
+ CruDontChange,
+ CruNotFound,
+ CruNoValue,
+ CruPlaceholder,
+ CruUseDefault,
+)
+from ._func import CruFunction
+from ._iter import CruIterable, CruIterator
+from ._event import CruEvent, CruEventHandlerToken
+from ._type import CruTypeSet, CruTypeCheckError
+
+
+class CruInitError(CruException):
+ pass
+
+
+def check_python_version(required_version=(3, 11)):
+ if sys.version_info < required_version:
+ raise CruInitError(f"Python version must be >= {required_version}!")
+
+
+check_python_version()
+
+__all__ = [
+ "CRU",
+ "CruNamespaceError",
+ "CRU_NAME_PREFIXES",
+ "check_python_version",
+ "CruException",
+ "CruInternalError",
+ "CruLogicError",
+ "CruUnreachableError",
+ "cru_unreachable",
+ "CruInitError",
+ "CruConstantBase",
+ "CruDontChange",
+ "CruNotFound",
+ "CruNoValue",
+ "CruPlaceholder",
+ "CruUseDefault",
+ "CruFunction",
+ "CruIterable",
+ "CruIterator",
+ "CruEvent",
+ "CruEventHandlerToken",
+ "CruTypeSet",
+ "CruTypeCheckError",
+]
diff --git a/tools/cru-py/cru/_base.py b/tools/cru-py/cru/_base.py
new file mode 100644
index 0000000..2599d8f
--- /dev/null
+++ b/tools/cru-py/cru/_base.py
@@ -0,0 +1,101 @@
+from typing import Any
+
+from ._helper import remove_none
+from ._error import CruException
+
+
+class CruNamespaceError(CruException):
+ """Raised when a namespace is not found."""
+
+
+class _Cru:
+ NAME_PREFIXES = ("CRU_", "Cru", "cru_")
+
+ def __init__(self) -> None:
+ self._d: dict[str, Any] = {}
+
+ def all_names(self) -> list[str]:
+ return list(self._d.keys())
+
+ def get(self, name: str) -> Any:
+ return self._d[name]
+
+ def has_name(self, name: str) -> bool:
+ return name in self._d
+
+ @staticmethod
+ def _maybe_remove_prefix(name: str) -> str | None:
+ for prefix in _Cru.NAME_PREFIXES:
+ if name.startswith(prefix):
+ return name[len(prefix) :]
+ return None
+
+ def _check_name_exist(self, *names: str | None) -> None:
+ for name in names:
+ if name is None:
+ continue
+ if self.has_name(name):
+ raise CruNamespaceError(f"Name {name} exists in CRU.")
+
+ @staticmethod
+ def check_name_format(name: str) -> tuple[str, str]:
+ no_prefix_name = _Cru._maybe_remove_prefix(name)
+ if no_prefix_name is None:
+ raise CruNamespaceError(
+ f"Name {name} is not prefixed with any of {_Cru.NAME_PREFIXES}."
+ )
+ return name, no_prefix_name
+
+ @staticmethod
+ def _check_object_name(o) -> tuple[str, str]:
+ return _Cru.check_name_format(o.__name__)
+
+ def _do_add(self, o, *names: str | None) -> list[str]:
+ name_list: list[str] = remove_none(names)
+ for name in name_list:
+ self._d[name] = o
+ return name_list
+
+ def add(self, o, name: str | None) -> tuple[str, str | None]:
+ no_prefix_name: str | None
+ if name is None:
+ name, no_prefix_name = self._check_object_name(o)
+ else:
+ no_prefix_name = self._maybe_remove_prefix(name)
+
+ self._check_name_exist(name, no_prefix_name)
+ self._do_add(o, name, no_prefix_name)
+ return name, no_prefix_name
+
+ def add_with_alias(self, o, name: str | None = None, *aliases: str) -> list[str]:
+ final_names: list[str | None] = []
+ no_prefix_name: str | None
+ if name is None:
+ name, no_prefix_name = self._check_object_name(o)
+ self._check_name_exist(name, no_prefix_name)
+ final_names.extend([name, no_prefix_name])
+ for alias in aliases:
+ no_prefix_name = self._maybe_remove_prefix(alias)
+ self._check_name_exist(alias, no_prefix_name)
+ final_names.extend([alias, no_prefix_name])
+
+ return self._do_add(o, *final_names)
+
+ def add_objects(self, *objects):
+ final_list = []
+ for o in objects:
+ name, no_prefix_name = self._check_object_name(o)
+ self._check_name_exist(name, no_prefix_name)
+ final_list.append((o, name, no_prefix_name))
+ for o, name, no_prefix_name in final_list:
+ self._do_add(o, name, no_prefix_name)
+
+ def __getitem__(self, item):
+ return self.get(item)
+
+ def __getattr__(self, item):
+ return self.get(item)
+
+
+CRU_NAME_PREFIXES = _Cru.NAME_PREFIXES
+CRU = _Cru()
diff --git a/tools/cru-py/cru/_const.py b/tools/cru-py/cru/_const.py
new file mode 100644
index 0000000..8246b35
--- /dev/null
+++ b/tools/cru-py/cru/_const.py
@@ -0,0 +1,49 @@
+from enum import Enum, auto
+from typing import Self, TypeGuard, TypeVar
+
+from ._base import CRU
+
+_T = TypeVar("_T")
+
+
+class CruConstantBase(Enum):
+ @classmethod
+ def check(cls, v: _T | Self) -> TypeGuard[Self]:
+ return isinstance(v, cls)
+
+ @classmethod
+ def check_not(cls, v: _T | Self) -> TypeGuard[_T]:
+ return not cls.check(v)
+
+ @classmethod
+ def value(cls) -> Self:
+ return cls.VALUE # type: ignore
+
+
+class CruNotFound(CruConstantBase):
+ VALUE = auto()
+
+
+class CruUseDefault(CruConstantBase):
+ VALUE = auto()
+
+
+class CruDontChange(CruConstantBase):
+ VALUE = auto()
+
+
+class CruNoValue(CruConstantBase):
+ VALUE = auto()
+
+
+class CruPlaceholder(CruConstantBase):
+ VALUE = auto()
+
+
+CRU.add_objects(
+ CruNotFound,
+ CruUseDefault,
+ CruDontChange,
+ CruNoValue,
+ CruPlaceholder,
+)
diff --git a/tools/cru-py/cru/_decorator.py b/tools/cru-py/cru/_decorator.py
new file mode 100644
index 0000000..137fc05
--- /dev/null
+++ b/tools/cru-py/cru/_decorator.py
@@ -0,0 +1,97 @@
+from __future__ import annotations
+
+from collections.abc import Callable
+from typing import (
+ Concatenate,
+ Generic,
+ ParamSpec,
+ TypeVar,
+ cast,
+)
+
+from ._base import CRU
+
+_P = ParamSpec("_P")
+_T = TypeVar("_T")
+_O = TypeVar("_O")
+_R = TypeVar("_R")
+
+
+class CruDecorator:
+
+ class ConvertResult(Generic[_T, _O]):
+ def __init__(
+ self,
+ converter: Callable[[_T], _O],
+ ) -> None:
+ self.converter = converter
+
+ def __call__(self, origin: Callable[_P, _T]) -> Callable[_P, _O]:
+ converter = self.converter
+
+ def real_impl(*args: _P.args, **kwargs: _P.kwargs) -> _O:
+ return converter(origin(*args, **kwargs))
+
+ return real_impl
+
+ class ImplementedBy(Generic[_T, _O, _P, _R]):
+ def __init__(
+ self,
+ impl: Callable[Concatenate[_O, _P], _R],
+ converter: Callable[[_T], _O],
+ ) -> None:
+ self.impl = impl
+ self.converter = converter
+
+ def __call__(
+ self, _origin: Callable[[_T], None]
+ ) -> Callable[Concatenate[_T, _P], _R]:
+ converter = self.converter
+ impl = self.impl
+
+ def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R:
+ return cast(Callable[Concatenate[_O, _P], _R], impl)(
+ converter(_self), *args, **kwargs
+ )
+
+ return real_impl
+
+ @staticmethod
+ def create_factory(converter: Callable[[_T], _O]) -> Callable[
+ [Callable[Concatenate[_O, _P], _R]],
+ CruDecorator.ImplementedBy[_T, _O, _P, _R],
+ ]:
+ def create(
+ m: Callable[Concatenate[_O, _P], _R],
+ ) -> CruDecorator.ImplementedBy[_T, _O, _P, _R]:
+ return CruDecorator.ImplementedBy(m, converter)
+
+ return create
+
+ class ImplementedByNoSelf(Generic[_P, _R]):
+ def __init__(self, impl: Callable[_P, _R]) -> None:
+ self.impl = impl
+
+ def __call__(
+ self, _origin: Callable[[_T], None]
+ ) -> Callable[Concatenate[_T, _P], _R]:
+ impl = self.impl
+
+ def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R:
+ return cast(Callable[_P, _R], impl)(*args, **kwargs)
+
+ return real_impl
+
+ @staticmethod
+ def create_factory() -> (
+ Callable[[Callable[_P, _R]], CruDecorator.ImplementedByNoSelf[_P, _R]]
+ ):
+ def create(
+ m: Callable[_P, _R],
+ ) -> CruDecorator.ImplementedByNoSelf[_P, _R]:
+ return CruDecorator.ImplementedByNoSelf(m)
+
+ return create
+
+
+CRU.add_objects(CruDecorator)
diff --git a/tools/cru-py/cru/_error.py b/tools/cru-py/cru/_error.py
new file mode 100644
index 0000000..e53c787
--- /dev/null
+++ b/tools/cru-py/cru/_error.py
@@ -0,0 +1,89 @@
+from __future__ import annotations
+
+from typing import NoReturn, cast, overload
+
+
+class CruException(Exception):
+ """Base exception class of all exceptions in cru."""
+
+ @overload
+ def __init__(
+ self,
+ message: None = None,
+ *args,
+ user_message: str,
+ **kwargs,
+ ): ...
+
+ @overload
+ def __init__(
+ self,
+ message: str,
+ *args,
+ user_message: str | None = None,
+ **kwargs,
+ ): ...
+
+ def __init__(
+ self,
+ message: str | None = None,
+ *args,
+ user_message: str | None = None,
+ **kwargs,
+ ):
+ if message is None:
+ message = user_message
+
+ super().__init__(
+ message,
+ *args,
+ **kwargs,
+ )
+ self._message: str
+ self._message = cast(str, message)
+ self._user_message = user_message
+
+ @property
+ def message(self) -> str:
+ return self._message
+
+ def get_user_message(self) -> str | None:
+ return self._user_message
+
+ def get_message(self, use_user: bool = True) -> str:
+ if use_user and self._user_message is not None:
+ return self._user_message
+ else:
+ return self._message
+
+ @property
+ def is_internal(self) -> bool:
+ return False
+
+ @property
+ def is_logic_error(self) -> bool:
+ return False
+
+
+class CruLogicError(CruException):
+ """Raised when a logic error occurs."""
+
+ @property
+ def is_logic_error(self) -> bool:
+ return True
+
+
+class CruInternalError(CruException):
+ """Raised when an internal error occurs."""
+
+ @property
+ def is_internal(self) -> bool:
+ return True
+
+
+class CruUnreachableError(CruInternalError):
+ """Raised when a code path is unreachable."""
+
+
+def cru_unreachable() -> NoReturn:
+ raise CruUnreachableError("Code should not reach here!")
diff --git a/tools/cru-py/cru/_event.py b/tools/cru-py/cru/_event.py
new file mode 100644
index 0000000..51a794c
--- /dev/null
+++ b/tools/cru-py/cru/_event.py
@@ -0,0 +1,61 @@
+from __future__ import annotations
+
+from collections.abc import Callable
+from typing import Generic, ParamSpec, TypeVar
+
+from .list import CruList
+
+_P = ParamSpec("_P")
+_R = TypeVar("_R")
+
+
+class CruEventHandlerToken(Generic[_P, _R]):
+ def __init__(
+ self, event: CruEvent, handler: Callable[_P, _R], once: bool = False
+ ) -> None:
+ self._event = event
+ self._handler = handler
+ self._once = once
+
+ @property
+ def event(self) -> CruEvent:
+ return self._event
+
+ @property
+ def handler(self) -> Callable[_P, _R]:
+ return self._handler
+
+ @property
+ def once(self) -> bool:
+ return self._once
+
+
+class CruEvent(Generic[_P, _R]):
+ def __init__(self, name: str) -> None:
+ self._name = name
+ self._tokens: CruList[CruEventHandlerToken] = CruList()
+
+ def register(
+ self, handler: Callable[_P, _R], once: bool = False
+ ) -> CruEventHandlerToken:
+ token = CruEventHandlerToken(self, handler, once)
+ self._tokens.append(token)
+ return token
+
+ def unregister(self, *handlers: CruEventHandlerToken | Callable[_P, _R]) -> int:
+ old_length = len(self._tokens)
+ self._tokens.reset(
+ self._tokens.as_cru_iterator().filter(
+ (lambda t: t in handlers or t.handler in handlers)
+ )
+ )
+ return old_length - len(self._tokens)
+
+ def trigger(self, *args: _P.args, **kwargs: _P.kwargs) -> CruList[_R]:
+ results = CruList(
+ self._tokens.as_cru_iterator()
+ .transform(lambda t: t.handler(*args, **kwargs))
+ .to_list()
+ )
+ self._tokens.reset(self._tokens.as_cru_iterator().filter(lambda t: not t.once))
+ return results
diff --git a/tools/cru-py/cru/_func.py b/tools/cru-py/cru/_func.py
new file mode 100644
index 0000000..fc57802
--- /dev/null
+++ b/tools/cru-py/cru/_func.py
@@ -0,0 +1,172 @@
+from __future__ import annotations
+
+from collections.abc import Callable, Iterable
+from enum import Flag, auto
+from typing import (
+ Any,
+ Generic,
+ Literal,
+ ParamSpec,
+ TypeAlias,
+ TypeVar,
+)
+
+
+from ._base import CRU
+from ._const import CruPlaceholder
+
+_P = ParamSpec("_P")
+_P1 = ParamSpec("_P1")
+_T = TypeVar("_T")
+
+
+class _Dec:
+ @staticmethod
+ def wrap(
+ origin: Callable[_P, Callable[_P1, _T]]
+ ) -> Callable[_P, _Wrapper[_P1, _T]]:
+ def _wrapped(*args: _P.args, **kwargs: _P.kwargs) -> _Wrapper[_P1, _T]:
+ return _Wrapper(origin(*args, **kwargs))
+
+ return _wrapped
+
+
+class _RawBase:
+ @staticmethod
+ def none(*_v, **_kwargs) -> None:
+ return None
+
+ @staticmethod
+ def true(*_v, **_kwargs) -> Literal[True]:
+ return True
+
+ @staticmethod
+ def false(*_v, **_kwargs) -> Literal[False]:
+ return False
+
+ @staticmethod
+ def identity(v: _T) -> _T:
+ return v
+
+ @staticmethod
+ def only_you(v: _T, *_v, **_kwargs) -> _T:
+ return v
+
+ @staticmethod
+ def equal(a: Any, b: Any) -> bool:
+ return a == b
+
+ @staticmethod
+ def not_equal(a: Any, b: Any) -> bool:
+ return a != b
+
+ @staticmethod
+ def not_(v: Any) -> Any:
+ return not v
+
+
+class _Wrapper(Generic[_P, _T]):
+ def __init__(self, f: Callable[_P, _T]):
+ self._f = f
+
+ @property
+ def me(self) -> Callable[_P, _T]:
+ return self._f
+
+ def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T:
+ return self._f(*args, **kwargs)
+
+ @_Dec.wrap
+ def bind(self, *bind_args, **bind_kwargs) -> Callable[..., _T]:
+ func = self.me
+
+ def bound_func(*args, **kwargs):
+ popped = 0
+ real_args = []
+ for arg in bind_args:
+ if CruPlaceholder.check(arg):
+ real_args.append(args[popped])
+ popped += 1
+ else:
+ real_args.append(arg)
+ real_args.extend(args[popped:])
+ return func(*real_args, **(bind_kwargs | kwargs))
+
+ return bound_func
+
+ class ChainMode(Flag):
+ ARGS = auto()
+ KWARGS = auto()
+ BOTH = ARGS | KWARGS
+
+ ArgsChainableCallable: TypeAlias = Callable[..., Iterable[Any]]
+ KwargsChainableCallable: TypeAlias = Callable[..., Iterable[tuple[str, Any]]]
+ ChainableCallable: TypeAlias = Callable[
+ ..., tuple[Iterable[Any], Iterable[tuple[str, Any]]]
+ ]
+
+ @_Dec.wrap
+ def chain_with_args(
+ self, funcs: Iterable[ArgsChainableCallable], *bind_args, **bind_kwargs
+ ) -> ArgsChainableCallable:
+ def chained_func(*args):
+ args = self.bind(*bind_args, **bind_kwargs)(*args)
+
+ for func in funcs:
+ args = _Wrapper(func).bind(*bind_args, **bind_kwargs)(*args)
+ return args
+
+ return chained_func
+
+ @_Dec.wrap
+ def chain_with_kwargs(
+ self, funcs: Iterable[KwargsChainableCallable], *bind_args, **bind_kwargs
+ ) -> KwargsChainableCallable:
+ def chained_func(**kwargs):
+ kwargs = self.bind(*bind_args, **bind_kwargs)(**kwargs)
+ for func in funcs:
+ kwargs = _Wrapper(func).bind(func, *bind_args, **bind_kwargs)(**kwargs)
+ return kwargs
+
+ return chained_func
+
+ @_Dec.wrap
+ def chain_with_both(
+ self, funcs: Iterable[ChainableCallable], *bind_args, **bind_kwargs
+ ) -> ChainableCallable:
+ def chained_func(*args, **kwargs):
+ for func in funcs:
+ args, kwargs = _Wrapper(func).bind(func, *bind_args, **bind_kwargs)(
+ *args, **kwargs
+ )
+ return args, kwargs
+
+ return chained_func
+
+
+class _Base:
+ none = _Wrapper(_RawBase.none)
+ true = _Wrapper(_RawBase.true)
+ false = _Wrapper(_RawBase.false)
+ identity = _Wrapper(_RawBase.identity)
+ only_you = _Wrapper(_RawBase.only_you)
+ equal = _Wrapper(_RawBase.equal)
+ not_equal = _Wrapper(_RawBase.not_equal)
+ not_ = _Wrapper(_RawBase.not_)
+
+
+class _Creators:
+ @staticmethod
+ def make_isinstance_of_types(*types: type) -> Callable:
+ return _Wrapper(lambda v: type(v) in types)
+
+
+class CruFunction:
+ RawBase: TypeAlias = _RawBase
+ Base: TypeAlias = _Base
+ Creators: TypeAlias = _Creators
+ Wrapper: TypeAlias = _Wrapper
+ Decorators: TypeAlias = _Dec
+
+
+CRU.add_objects(CruFunction)
diff --git a/tools/cru-py/cru/_helper.py b/tools/cru-py/cru/_helper.py
new file mode 100644
index 0000000..43baf46
--- /dev/null
+++ b/tools/cru-py/cru/_helper.py
@@ -0,0 +1,16 @@
+from collections.abc import Callable
+from typing import Any, Iterable, TypeVar, cast
+
+_T = TypeVar("_T")
+_D = TypeVar("_D")
+
+
+def remove_element(
+ iterable: Iterable[_T | None], to_rm: Iterable[Any], des: type[_D] | None = None
+) -> _D:
+ to_rm = set(to_rm)
+ return cast(Callable[..., _D], des or list)(v for v in iterable if v not in to_rm)
+
+
+def remove_none(iterable: Iterable[_T | None], des: type[_D] | None = None) -> _D:
+ return cast(Callable[..., _D], des or list)(v for v in iterable if v is not None)
diff --git a/tools/cru-py/cru/_iter.py b/tools/cru-py/cru/_iter.py
new file mode 100644
index 0000000..8f58561
--- /dev/null
+++ b/tools/cru-py/cru/_iter.py
@@ -0,0 +1,466 @@
+from __future__ import annotations
+
+from collections.abc import Iterable, Callable, Generator, Iterator
+from dataclasses import dataclass
+from enum import Enum
+from typing import (
+ Concatenate,
+ Literal,
+ Never,
+ Self,
+ TypeAlias,
+ TypeVar,
+ ParamSpec,
+ Any,
+ Generic,
+ cast,
+)
+
+from ._base import CRU
+from ._const import CruNotFound
+from ._error import cru_unreachable
+
+_P = ParamSpec("_P")
+_T = TypeVar("_T")
+_O = TypeVar("_O")
+_V = TypeVar("_V")
+_R = TypeVar("_R")
+
+
+class _Generic:
+ class StepActionKind(Enum):
+ SKIP = 0
+ PUSH = 1
+ STOP = 2
+ AGGREGATE = 3
+
+ @dataclass
+ class StepAction(Generic[_V, _R]):
+ value: Iterable[Self] | _V | _R | None
+ kind: _Generic.StepActionKind
+
+ @property
+ def push_value(self) -> _V:
+ assert self.kind == _Generic.StepActionKind.PUSH
+ return cast(_V, self.value)
+
+ @property
+ def stop_value(self) -> _R:
+ assert self.kind == _Generic.StepActionKind.STOP
+ return cast(_R, self.value)
+
+ @staticmethod
+ def skip() -> _Generic.StepAction[_V, _R]:
+ return _Generic.StepAction(None, _Generic.StepActionKind.SKIP)
+
+ @staticmethod
+ def push(value: _V | None) -> _Generic.StepAction[_V, _R]:
+ return _Generic.StepAction(value, _Generic.StepActionKind.PUSH)
+
+ @staticmethod
+ def stop(value: _R | None = None) -> _Generic.StepAction[_V, _R]:
+ return _Generic.StepAction(value, _Generic.StepActionKind.STOP)
+
+ @staticmethod
+ def aggregate(
+ *results: _Generic.StepAction[_V, _R],
+ ) -> _Generic.StepAction[_V, _R]:
+ return _Generic.StepAction(results, _Generic.StepActionKind.AGGREGATE)
+
+ @staticmethod
+ def push_last(value: _V | None) -> _Generic.StepAction[_V, _R]:
+ return _Generic.StepAction.aggregate(
+ _Generic.StepAction.push(value), _Generic.StepAction.stop()
+ )
+
+ def flatten(self) -> Iterable[Self]:
+ return _Generic.flatten(
+ self,
+ is_leave=lambda r: r.kind != _Generic.StepActionKind.AGGREGATE,
+ get_children=lambda r: cast(Iterable[Self], r.value),
+ )
+
+ GeneralStepAction: TypeAlias = StepAction[_V, _R] | _V | _R | None
+ IterateOperation: TypeAlias = Callable[[_T, int], GeneralStepAction[_V, _R]]
+ IteratePreHook: TypeAlias = Callable[[Iterable[_T]], GeneralStepAction[_V, _R]]
+ IteratePostHook: TypeAlias = Callable[[int], GeneralStepAction[_V, _R]]
+
+ @staticmethod
+ def _is_not_iterable(o: Any) -> bool:
+ return not isinstance(o, Iterable)
+
+ @staticmethod
+ def _return_self(o):
+ return o
+
+ @staticmethod
+ def iterable_flatten(
+ maybe_iterable: Iterable[_T] | _T, max_depth: int = -1, *, _depth: int = 0
+ ) -> Iterable[Iterable[_T] | _T]:
+ if _depth == max_depth or not isinstance(maybe_iterable, Iterable):
+ yield maybe_iterable
+ return
+
+ for child in maybe_iterable:
+ yield from _Generic.iterable_flatten(
+ child,
+ max_depth,
+ _depth=_depth + 1,
+ )
+
+ @staticmethod
+ def flatten(
+ o: _O,
+ max_depth: int = -1,
+ /,
+ is_leave: CruIterator.ElementPredicate[_O] = _is_not_iterable,
+ get_children: CruIterator.ElementTransformer[_O, Iterable[_O]] = _return_self,
+ *,
+ _depth: int = 0,
+ ) -> Iterable[_O]:
+ if _depth == max_depth or is_leave(o):
+ yield o
+ return
+ for child in get_children(o):
+ yield from _Generic.flatten(
+ child,
+ max_depth,
+ is_leave,
+ get_children,
+ _depth=_depth + 1,
+ )
+
+ class Results:
+ @staticmethod
+ def true(_) -> Literal[True]:
+ return True
+
+ @staticmethod
+ def false(_) -> Literal[False]:
+ return False
+
+ @staticmethod
+ def not_found(_) -> Literal[CruNotFound.VALUE]:
+ return CruNotFound.VALUE
+
+ @staticmethod
+ def _non_result_to_push(value: Any) -> StepAction[_V, _R]:
+ return _Generic.StepAction.push(value)
+
+ @staticmethod
+ def _non_result_to_stop(value: Any) -> StepAction[_V, _R]:
+ return _Generic.StepAction.stop(value)
+
+ @staticmethod
+ def _none_hook(_: Any) -> StepAction[_V, _R]:
+ return _Generic.StepAction.skip()
+
+ def iterate(
+ iterable: Iterable[_T],
+ operation: IterateOperation[_T, _V, _R],
+ fallback_return: _R,
+ pre_iterate: IteratePreHook[_T, _V, _R],
+ post_iterate: IteratePostHook[_V, _R],
+ convert_value_result: Callable[[_V | _R | None], StepAction[_V, _R]],
+ ) -> Generator[_V, None, _R]:
+ pre_result = pre_iterate(iterable)
+ if not isinstance(pre_result, _Generic.StepAction):
+ real_pre_result = convert_value_result(pre_result)
+ for r in real_pre_result.flatten():
+ if r.kind == _Generic.StepActionKind.STOP:
+ return r.stop_value
+ elif r.kind == _Generic.StepActionKind.PUSH:
+ yield r.push_value
+ else:
+ assert r.kind == _Generic.StepActionKind.SKIP
+
+ for index, element in enumerate(iterable):
+ result = operation(element, index)
+ if not isinstance(result, _Generic.StepAction):
+ real_result = convert_value_result(result)
+ for r in real_result.flatten():
+ if r.kind == _Generic.StepActionKind.STOP:
+ return r.stop_value
+ elif r.kind == _Generic.StepActionKind.PUSH:
+ yield r.push_value
+ else:
+ assert r.kind == _Generic.StepActionKind.SKIP
+ continue
+
+ post_result = post_iterate(index + 1)
+ if not isinstance(post_result, _Generic.StepAction):
+ real_post_result = convert_value_result(post_result)
+ for r in real_post_result.flatten():
+ if r.kind == _Generic.StepActionKind.STOP:
+ return r.stop_value
+ elif r.kind == _Generic.StepActionKind.PUSH:
+ yield r.push_value
+ else:
+ assert r.kind == _Generic.StepActionKind.SKIP
+
+ return fallback_return
+
+ def create_new(
+ iterable: Iterable[_T],
+ operation: IterateOperation[_T, _V, _R],
+ fallback_return: _R,
+ /,
+ pre_iterate: IteratePreHook[_T, _V, _R] | None = None,
+ post_iterate: IteratePostHook[_V, _R] | None = None,
+ ) -> Generator[_V, None, _R]:
+ return _Generic.iterate(
+ iterable,
+ operation,
+ fallback_return,
+ pre_iterate or _Generic._none_hook,
+ post_iterate or _Generic._none_hook,
+ _Generic._non_result_to_push,
+ )
+
+ def get_result(
+ iterable: Iterable[_T],
+ operation: IterateOperation[_T, _V, _R],
+ fallback_return: _R,
+ /,
+ pre_iterate: IteratePreHook[_T, _V, _R] | None = None,
+ post_iterate: IteratePostHook[_V, _R] | None = None,
+ ) -> _R:
+ try:
+ for _ in _Generic.iterate(
+ iterable,
+ operation,
+ fallback_return,
+ pre_iterate or _Generic._none_hook,
+ post_iterate or _Generic._none_hook,
+ _Generic._non_result_to_stop,
+ ):
+ pass
+ except StopIteration as stop:
+ return stop.value
+ cru_unreachable()
+
+
+class _Helpers:
+ @staticmethod
+ def auto_count(c: Callable[Concatenate[int, _P], _O]) -> Callable[_P, _O]:
+ count = 0
+
+ def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _O:
+ nonlocal count
+ r = c(count, *args, **kwargs)
+ count += 1
+ return r
+
+ return wrapper
+
+
+class _Creators:
+ class Raw:
+ @staticmethod
+ def empty() -> Iterator[Never]:
+ return iter([])
+
+ @staticmethod
+ def range(*args) -> Iterator[int]:
+ return iter(range(*args))
+
+ @staticmethod
+ def unite(*args: _T) -> Iterator[_T]:
+ return iter(args)
+
+ @staticmethod
+ def _concat(*iterables: Iterable[_T]) -> Iterable[_T]:
+ for iterable in iterables:
+ yield from iterable
+
+ @staticmethod
+ def concat(*iterables: Iterable[_T]) -> Iterator[_T]:
+ return iter(_Creators.Raw._concat(*iterables))
+
+ @staticmethod
+ def _wrap(f: Callable[_P, Iterable[_O]]) -> Callable[_P, CruIterator[_O]]:
+ def _wrapped(*args: _P.args, **kwargs: _P.kwargs) -> CruIterator[_O]:
+ return CruIterator(f(*args, **kwargs))
+
+ return _wrapped
+
+ empty = _wrap(Raw.empty)
+ range = _wrap(Raw.range)
+ unite = _wrap(Raw.unite)
+ concat = _wrap(Raw.concat)
+
+
+class CruIterator(Generic[_T]):
+ ElementOperation: TypeAlias = Callable[[_V], Any]
+ ElementPredicate: TypeAlias = Callable[[_V], bool]
+ AnyElementPredicate: TypeAlias = ElementPredicate[Any]
+ ElementTransformer: TypeAlias = Callable[[_V], _O]
+ SelfElementTransformer: TypeAlias = ElementTransformer[_V, _V]
+ AnyElementTransformer: TypeAlias = ElementTransformer[Any, Any]
+
+ Creators: TypeAlias = _Creators
+ Helpers: TypeAlias = _Helpers
+
+ def __init__(self, iterable: Iterable[_T]) -> None:
+ self._iterator = iter(iterable)
+
+ def __iter__(self) -> Iterator[_T]:
+ return self._iterator
+
+ def create_new_me(self, iterable: Iterable[_O]) -> CruIterator[_O]:
+ return type(self)(iterable) # type: ignore
+
+ @staticmethod
+ def _wrap(
+ f: Callable[Concatenate[CruIterator[_T], _P], Iterable[_O]],
+ ) -> Callable[Concatenate[CruIterator[_T], _P], CruIterator[_O]]:
+ def _wrapped(
+ self: CruIterator[_T], *args: _P.args, **kwargs: _P.kwargs
+ ) -> CruIterator[_O]:
+ return self.create_new_me(f(self, *args, **kwargs))
+
+ return _wrapped
+
+ @_wrap
+ def replace_me(self, iterable: Iterable[_O]) -> Iterable[_O]:
+ return iterable
+
+ def replace_me_with_empty(self) -> CruIterator[Never]:
+ return self.create_new_me(_Creators.Raw.empty())
+
+ def replace_me_with_range(self, *args) -> CruIterator[int]:
+ return self.create_new_me(_Creators.Raw.range(*args))
+
+ def replace_me_with_unite(self, *args: _O) -> CruIterator[_O]:
+ return self.create_new_me(_Creators.Raw.unite(*args))
+
+ def replace_me_with_concat(self, *iterables: Iterable[_T]) -> CruIterator[_T]:
+ return self.create_new_me(_Creators.Raw.concat(*iterables))
+
+ def to_set(self) -> set[_T]:
+ return set(self)
+
+ def to_list(self) -> list[_T]:
+ return list(self)
+
+ def all(self, predicate: ElementPredicate[_T]) -> bool:
+ for value in self:
+ if not predicate(value):
+ return False
+ return True
+
+ def any(self, predicate: ElementPredicate[_T]) -> bool:
+ for value in self:
+ if predicate(value):
+ return True
+ return False
+
+ def foreach(self, operation: ElementOperation[_T]) -> None:
+ for value in self:
+ operation(value)
+
+ @_wrap
+ def transform(self, transformer: ElementTransformer[_T, _O]) -> Iterable[_O]:
+ for value in self:
+ yield transformer(value)
+
+ map = transform
+
+ @_wrap
+ def filter(self, predicate: ElementPredicate[_T]) -> Iterable[_T]:
+ for value in self:
+ if predicate(value):
+ yield value
+
+ @_wrap
+ def continue_if(self, predicate: ElementPredicate[_T]) -> Iterable[_T]:
+ for value in self:
+ yield value
+ if not predicate(value):
+ break
+
+ def first_n(self, max_count: int) -> CruIterator[_T]:
+ if max_count < 0:
+ raise ValueError("max_count must be 0 or positive.")
+ if max_count == 0:
+ return self.replace_me_with_empty() # type: ignore
+ return self.continue_if(_Helpers.auto_count(lambda i, _: i < max_count - 1))
+
+ def drop_n(self, n: int) -> CruIterator[_T]:
+ if n < 0:
+ raise ValueError("n must be 0 or positive.")
+ if n == 0:
+ return self
+ return self.filter(_Helpers.auto_count(lambda i, _: i < n))
+
+ def single_or(
+ self, fallback: _O | CruNotFound = CruNotFound.VALUE
+ ) -> _T | _O | CruNotFound:
+ first_2 = self.first_n(2)
+ has_value = False
+ for element in first_2:
+ if has_value:
+ raise ValueError("More than one value found.")
+ has_value = True
+ value = element
+ if has_value:
+ return value
+ else:
+ return fallback
+
+ def first_or(
+ self, fallback: _O | CruNotFound = CruNotFound.VALUE
+ ) -> _T | _O | CruNotFound:
+ return self.first_n(1).single_or(fallback)
+
+ @_wrap
+ def flatten(self, max_depth: int = -1) -> Iterable[Any]:
+ return _Generic.iterable_flatten(self, max_depth)
+
+ def select_by_indices(self, indices: Iterable[int]) -> CruIterator[_T]:
+ index_set = set(indices)
+ max_index = max(index_set)
+ return self.first_n(max_index + 1).filter(
+ _Helpers.auto_count(lambda i, _: i in index_set)
+ )
+
+ def remove_values(self, values: Iterable[Any]) -> CruIterator[_T]:
+ value_set = set(values)
+ return self.filter(lambda v: v not in value_set)
+
+ def replace_values(
+ self, old_values: Iterable[Any], new_value: _O
+ ) -> Iterable[_T | _O]:
+ value_set = set(old_values)
+ return self.transform(lambda v: new_value if v in value_set else v)
+
+ def group_by(self, key_getter: Callable[[_T], _O]) -> dict[_O, list[_T]]:
+ result: dict[_O, list[_T]] = {}
+
+ for item in self:
+ key = key_getter(item)
+ if key not in result:
+ result[key] = []
+ result[key].append(item)
+
+ return result
+
+
+class CruIterMixin(Generic[_T]):
+ def cru_iter(self: Iterable[_T]) -> CruIterator[_T]:
+ return CruIterator(self)
+
+
+class CruIterList(list[_T], CruIterMixin[_T]):
+ pass
+
+
+class CruIterable:
+ Generic: TypeAlias = _Generic
+ Iterator: TypeAlias = CruIterator[_T]
+ Helpers: TypeAlias = _Helpers
+ Mixin: TypeAlias = CruIterMixin[_T]
+ IterList: TypeAlias = CruIterList[_T]
+
+
+CRU.add_objects(CruIterable, CruIterator)
diff --git a/tools/cru-py/cru/_type.py b/tools/cru-py/cru/_type.py
new file mode 100644
index 0000000..1f81da3
--- /dev/null
+++ b/tools/cru-py/cru/_type.py
@@ -0,0 +1,52 @@
+from collections.abc import Iterable
+from typing import Any
+
+from ._error import CruException, CruLogicError
+from ._iter import CruIterator
+
+
+class CruTypeCheckError(CruException):
+ pass
+
+
+DEFAULT_NONE_ERR_MSG = "None is not allowed here."
+DEFAULT_TYPE_ERR_MSG = "Object of this type is not allowed here."
+
+
+class CruTypeSet(set[type]):
+ def __init__(self, *types: type):
+ type_set = CruIterator(types).filter(lambda t: t is not None).to_set()
+ if not CruIterator(type_set).all(lambda t: isinstance(t, type)):
+ raise CruLogicError("TypeSet can only contain type.")
+ super().__init__(type_set)
+
+ def check_value(
+ self,
+ value: Any,
+ /,
+ allow_none: bool = False,
+ empty_allow_all: bool = True,
+ ) -> None:
+ if value is None:
+ if allow_none:
+ return
+ else:
+ raise CruTypeCheckError(DEFAULT_NONE_ERR_MSG)
+ if len(self) == 0 and empty_allow_all:
+ return
+ if not CruIterator(self).any(lambda t: isinstance(value, t)):
+ raise CruTypeCheckError(DEFAULT_TYPE_ERR_MSG)
+
+ def check_value_list(
+ self,
+ values: Iterable[Any],
+ /,
+ allow_none: bool = False,
+ empty_allow_all: bool = True,
+ ) -> None:
+ for value in values:
+ self.check_value(
+ value,
+ allow_none,
+ empty_allow_all,
+ )
diff --git a/tools/cru-py/cru/attr.py b/tools/cru-py/cru/attr.py
new file mode 100644
index 0000000..d4cc86a
--- /dev/null
+++ b/tools/cru-py/cru/attr.py
@@ -0,0 +1,364 @@
+from __future__ import annotations
+
+import copy
+from collections.abc import Callable, Iterable
+from dataclasses import dataclass, field
+from typing import Any
+
+from .list import CruUniqueKeyList
+from ._type import CruTypeSet
+from ._const import CruNotFound, CruUseDefault, CruDontChange
+from ._iter import CruIterator
+
+
+@dataclass
+class CruAttr:
+
+ name: str
+ value: Any
+ description: str | None
+
+ @staticmethod
+ def make(
+ name: str, value: Any = CruUseDefault.VALUE, description: str | None = None
+ ) -> CruAttr:
+ return CruAttr(name, value, description)
+
+
+CruAttrDefaultFactory = Callable[["CruAttrDef"], Any]
+CruAttrTransformer = Callable[[Any, "CruAttrDef"], Any]
+CruAttrValidator = Callable[[Any, "CruAttrDef"], None]
+
+
+@dataclass
+class CruAttrDef:
+ name: str
+ description: str
+ default_factory: CruAttrDefaultFactory
+ transformer: CruAttrTransformer
+ validator: CruAttrValidator
+
+ def __init__(
+ self,
+ name: str,
+ description: str,
+ default_factory: CruAttrDefaultFactory,
+ transformer: CruAttrTransformer,
+ validator: CruAttrValidator,
+ ) -> None:
+ self.name = name
+ self.description = description
+ self.default_factory = default_factory
+ self.transformer = transformer
+ self.validator = validator
+
+ def transform(self, value: Any) -> Any:
+ if self.transformer is not None:
+ return self.transformer(value, self)
+ return value
+
+ def validate(self, value: Any, /, force_allow_none: bool = False) -> None:
+ if force_allow_none is value is None:
+ return
+ if self.validator is not None:
+ self.validator(value, self)
+
+ def transform_and_validate(
+ self, value: Any, /, force_allow_none: bool = False
+ ) -> Any:
+ value = self.transform(value)
+ self.validate(value, force_allow_none)
+ return value
+
+ def make_default_value(self) -> Any:
+ return self.transform_and_validate(self.default_factory(self))
+
+ def adopt(self, attr: CruAttr) -> CruAttr:
+ attr = copy.deepcopy(attr)
+
+ if attr.name is None:
+ attr.name = self.name
+ elif attr.name != self.name:
+ raise ValueError(f"Attr name is not match: {attr.name} != {self.name}")
+
+ if attr.value is CruUseDefault.VALUE:
+ attr.value = self.make_default_value()
+ else:
+ attr.value = self.transform_and_validate(attr.value)
+
+ if attr.description is None:
+ attr.description = self.description
+
+ return attr
+
+ def make(
+ self, value: Any = CruUseDefault.VALUE, description: None | str = None
+ ) -> CruAttr:
+ value = self.make_default_value() if value is CruUseDefault.VALUE else value
+ value = self.transform_and_validate(value)
+ return CruAttr(
+ self.name,
+ value,
+ description if description is not None else self.description,
+ )
+
+
+@dataclass
+class CruAttrDefBuilder:
+
+ name: str
+ description: str
+ types: list[type] | None = field(default=None)
+ allow_none: bool = field(default=False)
+ default: Any = field(default=CruUseDefault.VALUE)
+ default_factory: CruAttrDefaultFactory | None = field(default=None)
+ auto_list: bool = field(default=False)
+ transformers: list[CruAttrTransformer] = field(default_factory=list)
+ validators: list[CruAttrValidator] = field(default_factory=list)
+ override_transformer: CruAttrTransformer | None = field(default=None)
+ override_validator: CruAttrValidator | None = field(default=None)
+
+ build_hook: Callable[[CruAttrDef], None] | None = field(default=None)
+
+ def __init__(self, name: str, description: str) -> None:
+ super().__init__()
+ self.name = name
+ self.description = description
+
+ def auto_adjust_default(self) -> None:
+ if self.default is not CruUseDefault.VALUE and self.default is not None:
+ return
+ if self.allow_none and self.default is CruUseDefault.VALUE:
+ self.default = None
+ if not self.allow_none and self.default is None:
+ self.default = CruUseDefault.VALUE
+ if self.auto_list and not self.allow_none:
+ self.default = []
+
+ def with_name(self, name: str | CruDontChange) -> CruAttrDefBuilder:
+ if name is not CruDontChange.VALUE:
+ self.name = name
+ return self
+
+ def with_description(
+ self, default_description: str | CruDontChange
+ ) -> CruAttrDefBuilder:
+ if default_description is not CruDontChange.VALUE:
+ self.description = default_description
+ return self
+
+ def with_default(self, default: Any) -> CruAttrDefBuilder:
+ if default is not CruDontChange.VALUE:
+ self.default = default
+ return self
+
+ def with_default_factory(
+ self,
+ default_factory: CruAttrDefaultFactory | CruDontChange,
+ ) -> CruAttrDefBuilder:
+ if default_factory is not CruDontChange.VALUE:
+ self.default_factory = default_factory
+ return self
+
+ def with_types(
+ self,
+ types: Iterable[type] | None | CruDontChange,
+ ) -> CruAttrDefBuilder:
+ if types is not CruDontChange.VALUE:
+ self.types = None if types is None else list(types)
+ return self
+
+ def with_allow_none(self, allow_none: bool | CruDontChange) -> CruAttrDefBuilder:
+ if allow_none is not CruDontChange.VALUE:
+ self.allow_none = allow_none
+ return self
+
+ def with_auto_list(
+ self, auto_list: bool | CruDontChange = True
+ ) -> CruAttrDefBuilder:
+ if auto_list is not CruDontChange.VALUE:
+ self.auto_list = auto_list
+ return self
+
+ def with_constraint(
+ self,
+ /,
+ allow_none: bool | CruDontChange = CruDontChange.VALUE,
+ types: Iterable[type] | None | CruDontChange = CruDontChange.VALUE,
+ default: Any = CruDontChange.VALUE,
+ default_factory: CruAttrDefaultFactory | CruDontChange = CruDontChange.VALUE,
+ auto_list: bool | CruDontChange = CruDontChange.VALUE,
+ ) -> CruAttrDefBuilder:
+ return (
+ self.with_allow_none(allow_none)
+ .with_types(types)
+ .with_default(default)
+ .with_default_factory(default_factory)
+ .with_auto_list(auto_list)
+ )
+
+ def add_transformer(self, transformer: CruAttrTransformer) -> CruAttrDefBuilder:
+ self.transformers.append(transformer)
+ return self
+
+ def clear_transformers(self) -> CruAttrDefBuilder:
+ self.transformers.clear()
+ return self
+
+ def add_validator(self, validator: CruAttrValidator) -> CruAttrDefBuilder:
+ self.validators.append(validator)
+ return self
+
+ def clear_validators(self) -> CruAttrDefBuilder:
+ self.validators.clear()
+ return self
+
+ def with_override_transformer(
+ self, override_transformer: CruAttrTransformer | None | CruDontChange
+ ) -> CruAttrDefBuilder:
+ if override_transformer is not CruDontChange.VALUE:
+ self.override_transformer = override_transformer
+ return self
+
+ def with_override_validator(
+ self, override_validator: CruAttrValidator | None | CruDontChange
+ ) -> CruAttrDefBuilder:
+ if override_validator is not CruDontChange.VALUE:
+ self.override_validator = override_validator
+ return self
+
+ def is_valid(self) -> tuple[bool, str]:
+ if not isinstance(self.name, str):
+ return False, "Name must be a string!"
+ if not isinstance(self.description, str):
+ return False, "Default description must be a string!"
+ if (
+ not self.allow_none
+ and self.default is None
+ and self.default_factory is None
+ ):
+ return False, "Default must be set if allow_none is False!"
+ return True, ""
+
+ @staticmethod
+ def _build(
+ builder: CruAttrDefBuilder, auto_adjust_default: bool = True
+ ) -> CruAttrDef:
+ if auto_adjust_default:
+ builder.auto_adjust_default()
+
+ valid, err = builder.is_valid()
+ if not valid:
+ raise ValueError(err)
+
+ def composed_transformer(value: Any, attr_def: CruAttrDef) -> Any:
+ def transform_value(single_value: Any) -> Any:
+ for transformer in builder.transformers:
+ single_value = transformer(single_value, attr_def)
+ return single_value
+
+ if builder.auto_list:
+ if not isinstance(value, list):
+ value = [value]
+ value = CruIterator(value).transform(transform_value).to_list()
+
+ else:
+ value = transform_value(value)
+ return value
+
+ type_set = None if builder.types is None else CruTypeSet(*builder.types)
+
+ def composed_validator(value: Any, attr_def: CruAttrDef):
+ def validate_value(single_value: Any) -> None:
+ if type_set is not None:
+ type_set.check_value(single_value, allow_none=builder.allow_none)
+ for validator in builder.validators:
+ validator(single_value, attr_def)
+
+ if builder.auto_list:
+ CruIterator(value).foreach(validate_value)
+ else:
+ validate_value(value)
+
+ real_transformer = builder.override_transformer or composed_transformer
+ real_validator = builder.override_validator or composed_validator
+
+ default_factory = builder.default_factory
+ if default_factory is None:
+
+ def default_factory(_d):
+ return copy.deepcopy(builder.default)
+
+ d = CruAttrDef(
+ builder.name,
+ builder.description,
+ default_factory,
+ real_transformer,
+ real_validator,
+ )
+ if builder.build_hook:
+ builder.build_hook(d)
+ return d
+
+ def build(self, auto_adjust_default=True) -> CruAttrDef:
+ c = copy.deepcopy(self)
+ self.build_hook = None
+ return CruAttrDefBuilder._build(c, auto_adjust_default)
+
+
+class CruAttrDefRegistry(CruUniqueKeyList[CruAttrDef, str]):
+
+ def __init__(self) -> None:
+ super().__init__(lambda d: d.name)
+
+ def make_builder(self, name: str, default_description: str) -> CruAttrDefBuilder:
+ b = CruAttrDefBuilder(name, default_description)
+ b.build_hook = lambda a: self.add(a)
+ return b
+
+ def adopt(self, attr: CruAttr) -> CruAttr:
+ d = self.get(attr.name)
+ return d.adopt(attr)
+
+
+class CruAttrTable(CruUniqueKeyList[CruAttr, str]):
+ def __init__(self, registry: CruAttrDefRegistry) -> None:
+ self._registry: CruAttrDefRegistry = registry
+ super().__init__(lambda a: a.name, before_add=registry.adopt)
+
+ @property
+ def registry(self) -> CruAttrDefRegistry:
+ return self._registry
+
+ def get_value_or(self, name: str, fallback: Any = CruNotFound.VALUE) -> Any:
+ a = self.get_or(name, CruNotFound.VALUE)
+ if a is CruNotFound.VALUE:
+ return fallback
+ return a.value
+
+ def get_value(self, name: str) -> Any:
+ a = self.get(name)
+ return a.value
+
+ def make_attr(
+ self,
+ name: str,
+ value: Any = CruUseDefault.VALUE,
+ /,
+ description: str | None = None,
+ ) -> CruAttr:
+ d = self._registry.get(name)
+ return d.make(value, description or d.description)
+
+ def add_value(
+ self,
+ name: str,
+ value: Any = CruUseDefault.VALUE,
+ /,
+ description: str | None = None,
+ *,
+ replace: bool = False,
+ ) -> CruAttr:
+ attr = self.make_attr(name, value, description)
+ self.add(attr, replace)
+ return attr
diff --git a/tools/cru-py/cru/config.py b/tools/cru-py/cru/config.py
new file mode 100644
index 0000000..0f6f0d0
--- /dev/null
+++ b/tools/cru-py/cru/config.py
@@ -0,0 +1,196 @@
+from __future__ import annotations
+
+from typing import Any, TypeVar, Generic
+
+from ._error import CruException
+from .list import CruUniqueKeyList
+from .value import (
+ INTEGER_VALUE_TYPE,
+ TEXT_VALUE_TYPE,
+ CruValueTypeError,
+ ValueGeneratorBase,
+ ValueType,
+)
+
+_T = TypeVar("_T")
+
+
+class CruConfigError(CruException):
+ def __init__(self, message: str, item: ConfigItem, *args, **kwargs):
+ super().__init__(message, *args, **kwargs)
+ self._item = item
+
+ @property
+ def item(self) -> ConfigItem:
+ return self._item
+
+
+class ConfigItem(Generic[_T]):
+ def __init__(
+ self,
+ name: str,
+ description: str,
+ value_type: ValueType[_T],
+ value: _T | None = None,
+ /,
+ default: ValueGeneratorBase[_T] | _T | None = None,
+ ) -> None:
+ self._name = name
+ self._description = description
+ self._value_type = value_type
+ self._value = value
+ self._default = default
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ @property
+ def description(self) -> str:
+ return self._description
+
+ @property
+ def value_type(self) -> ValueType[_T]:
+ return self._value_type
+
+ @property
+ def is_set(self) -> bool:
+ return self._value is not None
+
+ @property
+ def value(self) -> _T:
+ if self._value is None:
+ raise CruConfigError(
+ "Config value is not set.",
+ self,
+ user_message=f"Config item {self.name} is not set.",
+ )
+ return self._value
+
+ @property
+ def value_str(self) -> str:
+ return self.value_type.convert_value_to_str(self.value)
+
+ def set_value(self, v: _T | str, allow_convert_from_str=False):
+ if allow_convert_from_str:
+ self._value = self.value_type.check_value_or_try_convert_from_str(v)
+ else:
+ self._value = self.value_type.check_value(v)
+
+ def reset(self):
+ self._value = None
+
+ @property
+ def default(self) -> ValueGeneratorBase[_T] | _T | None:
+ return self._default
+
+ @property
+ def can_generate_default(self) -> bool:
+ return self.default is not None
+
+ def generate_default_value(self) -> _T:
+ if self.default is None:
+ raise CruConfigError(
+ "Config item does not support default value generation.", self
+ )
+ elif isinstance(self.default, ValueGeneratorBase):
+ v = self.default.generate()
+ else:
+ v = self.default
+ try:
+ self.value_type.check_value(v)
+ return v
+ except CruValueTypeError as e:
+ raise CruConfigError(
+ "Config value generator returns an invalid value.", self
+ ) from e
+
+ def copy(self) -> "ConfigItem":
+ return ConfigItem(
+ self.name,
+ self.description,
+ self.value_type,
+ self.value,
+ self.default,
+ )
+
+ @property
+ def description_str(self) -> str:
+ return f"{self.name} ({self.value_type.name}): {self.description}"
+
+
+class Configuration(CruUniqueKeyList[ConfigItem[Any], str]):
+ def __init__(self):
+ super().__init__(lambda c: c.name)
+
+ def get_set_items(self) -> list[ConfigItem[Any]]:
+ return [item for item in self if item.is_set]
+
+ def get_unset_items(self) -> list[ConfigItem[Any]]:
+ return [item for item in self if not item.is_set]
+
+ @property
+ def all_set(self) -> bool:
+ return len(self.get_unset_items()) == 0
+
+ @property
+ def all_not_set(self) -> bool:
+ return len(self.get_set_items()) == 0
+
+ def add_text_config(
+ self,
+ name: str,
+ description: str,
+ value: str | None = None,
+ default: ValueGeneratorBase[str] | str | None = None,
+ ) -> ConfigItem[str]:
+ item = ConfigItem(name, description, TEXT_VALUE_TYPE, value, default)
+ self.add(item)
+ return item
+
+ def add_int_config(
+ self,
+ name: str,
+ description: str,
+ value: int | None = None,
+ default: ValueGeneratorBase[int] | int | None = None,
+ ) -> ConfigItem[int]:
+ item = ConfigItem(name, description, INTEGER_VALUE_TYPE, value, default)
+ self.add(item)
+ return item
+
+ def set_config_item(
+ self,
+ name: str,
+ value: Any | str,
+ allow_convert_from_str=True,
+ ) -> None:
+ item = self.get(name)
+ item.set_value(
+ value,
+ allow_convert_from_str=allow_convert_from_str,
+ )
+
+ def reset_all(self) -> None:
+ for item in self:
+ item.reset()
+
+ def to_dict(self) -> dict[str, Any]:
+ return {item.name: item.value for item in self}
+
+ def to_str_dict(self) -> dict[str, str]:
+ return {
+ item.name: item.value_type.convert_value_to_str(item.value) for item in self
+ }
+
+ def set_value_dict(
+ self,
+ value_dict: dict[str, Any],
+ allow_convert_from_str: bool = False,
+ ) -> None:
+ for name, value in value_dict.items():
+ item = self.get(name)
+ item.set_value(
+ value,
+ allow_convert_from_str=allow_convert_from_str,
+ )
diff --git a/tools/cru-py/cru/list.py b/tools/cru-py/cru/list.py
new file mode 100644
index 0000000..9d210b7
--- /dev/null
+++ b/tools/cru-py/cru/list.py
@@ -0,0 +1,160 @@
+from __future__ import annotations
+
+from collections.abc import Callable, Iterator
+from typing import Any, Generic, Iterable, TypeAlias, TypeVar, overload
+
+from ._error import CruInternalError
+from ._iter import CruIterator
+from ._const import CruNotFound
+
+_T = TypeVar("_T")
+_O = TypeVar("_O")
+
+
+class CruListEdit(CruIterator[_T]):
+ def __init__(self, iterable: Iterable[_T], _list: CruList[Any]) -> None:
+ super().__init__(iterable)
+ self._list = _list
+
+ def create_me(self, iterable: Iterable[_O]) -> CruListEdit[_O]:
+ return CruListEdit(iterable, self._list)
+
+ @property
+ def list(self) -> CruList[Any]:
+ return self._list
+
+ def done(self) -> CruList[Any]:
+ self._list.reset(self)
+ return self._list
+
+
+class CruList(list[_T]):
+ def reset(self, new_values: Iterable[_T]):
+ if self is new_values:
+ new_values = list(new_values)
+ self.clear()
+ self.extend(new_values)
+ return self
+
+ def as_cru_iterator(self) -> CruIterator[_T]:
+ return CruIterator(self)
+
+ @staticmethod
+ def make(maybe_list: Iterable[_T] | _T | None) -> CruList[_T]:
+ if maybe_list is None:
+ return CruList()
+ if isinstance(maybe_list, Iterable):
+ return CruList(maybe_list)
+ return CruList([maybe_list])
+
+
+_K = TypeVar("_K")
+
+_KeyGetter: TypeAlias = Callable[[_T], _K]
+
+
+class CruUniqueKeyList(Generic[_T, _K]):
+ def __init__(
+ self,
+ key_getter: _KeyGetter[_T, _K],
+ *,
+ before_add: Callable[[_T], _T] | None = None,
+ ):
+ super().__init__()
+ self._key_getter = key_getter
+ self._before_add = before_add
+ self._list: CruList[_T] = CruList()
+
+ @property
+ def key_getter(self) -> _KeyGetter[_T, _K]:
+ return self._key_getter
+
+ @property
+ def internal_list(self) -> CruList[_T]:
+ return self._list
+
+ def validate_self(self):
+ keys = self._list.transform(self._key_getter)
+ if len(keys) != len(set(keys)):
+ raise CruInternalError("Duplicate keys!")
+
+ @overload
+ def get_or(
+ self, key: _K, fallback: CruNotFound = CruNotFound.VALUE
+ ) -> _T | CruNotFound: ...
+
+ @overload
+ def get_or(self, key: _K, fallback: _O) -> _T | _O: ...
+
+ def get_or(
+ self, key: _K, fallback: _O | CruNotFound = CruNotFound.VALUE
+ ) -> _T | _O | CruNotFound:
+ return (
+ self._list.as_cru_iterator()
+ .filter(lambda v: key == self._key_getter(v))
+ .first_or(fallback)
+ )
+
+ def get(self, key: _K) -> _T:
+ value = self.get_or(key)
+ if value is CruNotFound:
+ raise KeyError(f"Key {key} not found!")
+ return value # type: ignore
+
+ @property
+ def keys(self) -> Iterable[_K]:
+ return self._list.as_cru_iterator().map(self._key_getter)
+
+ def has_key(self, key: _K) -> bool:
+ return self.get_or(key) != CruNotFound.VALUE
+
+ def try_remove(self, key: _K) -> bool:
+ value = self.get_or(key)
+ if value is CruNotFound.VALUE:
+ return False
+ self._list.remove(value)
+ return True
+
+ def remove(self, key: _K, allow_absence: bool = False) -> None:
+ if not self.try_remove(key) and not allow_absence:
+ raise KeyError(f"Key {key} not found!")
+
+ def add(self, value: _T, /, replace: bool = False) -> None:
+ v = self.get_or(self._key_getter(value))
+ if v is not CruNotFound.VALUE:
+ if not replace:
+ raise KeyError(f"Key {self._key_getter(v)} already exists!")
+ self._list.remove(v)
+ if self._before_add is not None:
+ value = self._before_add(value)
+ self._list.append(value)
+
+ def set(self, value: _T) -> None:
+ self.add(value, True)
+
+ def extend(self, iterable: Iterable[_T], /, replace: bool = False) -> None:
+ values = list(iterable)
+ to_remove = []
+ for value in values:
+ v = self.get_or(self._key_getter(value))
+ if v is not CruNotFound.VALUE:
+ if not replace:
+ raise KeyError(f"Key {self._key_getter(v)} already exists!")
+ to_remove.append(v)
+ for value in to_remove:
+ self._list.remove(value)
+ if self._before_add is not None:
+ values = [self._before_add(value) for value in values]
+ self._list.extend(values)
+
+ def clear(self) -> None:
+ self._list.reset([])
+
+ def __iter__(self) -> Iterator[_T]:
+ return iter(self._list)
+
+ def __len__(self) -> int:
+ return len(self._list)
+
+ def cru_iter(self) -> CruIterator[_T]:
+ return CruIterator(self._list)
diff --git a/tools/cru-py/cru/parsing.py b/tools/cru-py/cru/parsing.py
new file mode 100644
index 0000000..1d2fa7f
--- /dev/null
+++ b/tools/cru-py/cru/parsing.py
@@ -0,0 +1,98 @@
+from __future__ import annotations
+
+from abc import ABCMeta, abstractmethod
+from typing import NamedTuple, TypeAlias, TypeVar, Generic, NoReturn, Callable
+
+from ._error import CruException
+from ._iter import CruIterable
+
+_T = TypeVar("_T")
+
+
+class ParseError(CruException, Generic[_T]):
+ def __init__(
+ self,
+ message,
+ parser: Parser[_T],
+ text: str,
+ line_number: int | None = None,
+ *args,
+ **kwargs,
+ ):
+ super().__init__(message, *args, **kwargs)
+ self._parser = parser
+ self._text = text
+ self._line_number = line_number
+
+ @property
+ def parser(self) -> Parser[_T]:
+ return self._parser
+
+ @property
+ def text(self) -> str:
+ return self._text
+
+ @property
+ def line_number(self) -> int | None:
+ return self._line_number
+
+
+class Parser(Generic[_T], metaclass=ABCMeta):
+ def __init__(self, name: str) -> None:
+ self._name = name
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ @abstractmethod
+ def parse(self, s: str) -> _T:
+ raise NotImplementedError()
+
+ def raise_parse_exception(
+ self, text: str, line_number: int | None = None
+ ) -> NoReturn:
+ a = line_number and f" at line {line_number}" or ""
+ raise ParseError(f"Parser {self.name} failed{a}.", self, text, line_number)
+
+
+class SimpleLineConfigParserEntry(NamedTuple):
+ key: str
+ value: str
+ line_number: int | None = None
+
+
+class SimpleLineConfigParserResult(CruIterable.IterList[SimpleLineConfigParserEntry]):
+ pass
+
+
+class SimpleLineConfigParser(Parser[SimpleLineConfigParserResult]):
+ """
+ The parsing result is a list of tuples (key, value, line number).
+ """
+
+ Entry: TypeAlias = SimpleLineConfigParserEntry
+ Result: TypeAlias = SimpleLineConfigParserResult
+
+ def __init__(self) -> None:
+ super().__init__(type(self).__name__)
+
+ def _parse(self, text: str, callback: Callable[[Entry], None]) -> None:
+ for ln, line in enumerate(text.splitlines()):
+ line_number = ln + 1
+ # check if it's a comment
+ if line.strip().startswith("#"):
+ continue
+ # check if there is a '='
+ if line.find("=") == -1:
+ self.raise_parse_exception("There is even no '='!", line_number)
+ # split at first '='
+ key, value = line.split("=", 1)
+ key = key.strip()
+ value = value.strip()
+ callback(SimpleLineConfigParserEntry(key, value, line_number))
+
+ def parse(self, text: str) -> Result:
+ result = SimpleLineConfigParserResult()
+ self._parse(text, lambda item: result.append(item))
+ return result
diff --git a/tools/cru-py/cru/service/__init__.py b/tools/cru-py/cru/service/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tools/cru-py/cru/service/__init__.py
diff --git a/tools/cru-py/cru/service/__main__.py b/tools/cru-py/cru/service/__main__.py
new file mode 100644
index 0000000..1c10e82
--- /dev/null
+++ b/tools/cru-py/cru/service/__main__.py
@@ -0,0 +1,20 @@
+from cru import CruException
+
+from ._app import create_app
+
+
+def main():
+ app = create_app()
+ app.run_command()
+
+
+if __name__ == "__main__":
+ try:
+ main()
+ except CruException as e:
+ user_message = e.get_user_message()
+ if user_message is not None:
+ print(f"Error: {user_message}")
+ exit(1)
+ else:
+ raise
diff --git a/tools/cru-py/cru/service/_app.py b/tools/cru-py/cru/service/_app.py
new file mode 100644
index 0000000..6030dad
--- /dev/null
+++ b/tools/cru-py/cru/service/_app.py
@@ -0,0 +1,34 @@
+from ._base import (
+ AppBase,
+ CommandDispatcher,
+ AppInitializer,
+ PathCommandProvider,
+)
+from ._config import ConfigManager
+from ._template import TemplateManager
+from ._nginx import NginxManager
+from ._external import CliToolCommandProvider
+
+APP_ID = "crupest"
+
+
+class App(AppBase):
+ def __init__(self):
+ super().__init__(APP_ID, f"{APP_ID}-service")
+ self.add_feature(PathCommandProvider())
+ self.add_feature(AppInitializer())
+ self.add_feature(ConfigManager())
+ self.add_feature(TemplateManager())
+ self.add_feature(NginxManager())
+ self.add_feature(CliToolCommandProvider())
+ self.add_feature(CommandDispatcher())
+
+ def run_command(self):
+ command_dispatcher = self.get_feature(CommandDispatcher)
+ command_dispatcher.run_command()
+
+
+def create_app() -> App:
+ app = App()
+ app.setup()
+ return app
diff --git a/tools/cru-py/cru/service/_base.py b/tools/cru-py/cru/service/_base.py
new file mode 100644
index 0000000..ad813c9
--- /dev/null
+++ b/tools/cru-py/cru/service/_base.py
@@ -0,0 +1,449 @@
+from __future__ import annotations
+
+from argparse import ArgumentParser, Namespace
+from abc import ABC, abstractmethod
+import argparse
+import os
+from pathlib import Path
+from typing import TypeVar, overload
+
+from cru import CruException, CruLogicError
+
+_Feature = TypeVar("_Feature", bound="AppFeatureProvider")
+
+
+class AppError(CruException):
+ pass
+
+
+class AppFeatureError(AppError):
+ def __init__(self, message, feature: type | str, *args, **kwargs):
+ super().__init__(message, *args, **kwargs)
+ self._feature = feature
+
+ @property
+ def feature(self) -> type | str:
+ return self._feature
+
+
+class AppPathError(CruException):
+ def __init__(self, message, _path: str | Path, *args, **kwargs):
+ super().__init__(message, *args, **kwargs)
+ self._path = str(_path)
+
+ @property
+ def path(self) -> str:
+ return self._path
+
+
+class AppPath(ABC):
+ def __init__(self, id: str, is_dir: bool, description: str) -> None:
+ self._is_dir = is_dir
+ self._id = id
+ self._description = description
+
+ @property
+ @abstractmethod
+ def parent(self) -> AppPath | None: ...
+
+ @property
+ @abstractmethod
+ def app(self) -> AppBase: ...
+
+ @property
+ def id(self) -> str:
+ return self._id
+
+ @property
+ def description(self) -> str:
+ return self._description
+
+ @property
+ def is_dir(self) -> bool:
+ return self._is_dir
+
+ @property
+ @abstractmethod
+ def full_path(self) -> Path: ...
+
+ @property
+ def full_path_str(self) -> str:
+ return str(self.full_path)
+
+ def check_parents(self, must_exist: bool = False) -> bool:
+ for p in reversed(self.full_path.parents):
+ if not p.exists() and not must_exist:
+ return False
+ if not p.is_dir():
+ raise AppPathError("Parents' path must be a dir.", self.full_path)
+ return True
+
+ def check_self(self, must_exist: bool = False) -> bool:
+ if not self.check_parents(must_exist):
+ return False
+ if not self.full_path.exists():
+ if not must_exist:
+ return False
+ raise AppPathError("Not exist.", self.full_path)
+ if self.is_dir:
+ if not self.full_path.is_dir():
+ raise AppPathError("Should be a directory, but not.", self.full_path)
+ else:
+ return True
+ else:
+ if not self.full_path.is_file():
+ raise AppPathError("Should be a file, but not.", self.full_path)
+ else:
+ return True
+
+ def ensure(self, create_file: bool = False) -> None:
+ e = self.check_self(False)
+ if not e:
+ os.makedirs(self.full_path.parent, exist_ok=True)
+ if self.is_dir:
+ os.mkdir(self.full_path)
+ elif create_file:
+ with open(self.full_path, "w") as f:
+ f.write("")
+
+ def add_subpath(
+ self,
+ name: str,
+ is_dir: bool,
+ /,
+ id: str | None = None,
+ description: str = "",
+ ) -> AppFeaturePath:
+ return self.app.add_path(name, is_dir, self, id, description)
+
+ @property
+ def app_relative_path(self) -> Path:
+ return self.full_path.relative_to(self.app.root.full_path)
+
+
+class AppFeaturePath(AppPath):
+ def __init__(
+ self,
+ parent: AppPath,
+ name: str,
+ is_dir: bool,
+ /,
+ id: str | None = None,
+ description: str = "",
+ ) -> None:
+ super().__init__(id or name, is_dir, description)
+ self._name = name
+ self._parent = parent
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ @property
+ def parent(self) -> AppPath:
+ return self._parent
+
+ @property
+ def app(self) -> AppBase:
+ return self.parent.app
+
+ @property
+ def full_path(self) -> Path:
+ return Path(self.parent.full_path, self.name).resolve()
+
+
+class AppRootPath(AppPath):
+ def __init__(self, app: AppBase):
+ super().__init__("root", True, "Application root path.")
+ self._app = app
+ self._full_path: Path | None = None
+
+ @property
+ def parent(self) -> None:
+ return None
+
+ @property
+ def app(self) -> AppBase:
+ return self._app
+
+ @property
+ def full_path(self) -> Path:
+ if self._full_path is None:
+ raise AppError("App root path is not set yet.")
+ return self._full_path
+
+ def setup(self, path: os.PathLike) -> None:
+ if self._full_path is not None:
+ raise AppError("App root path is already set.")
+ self._full_path = Path(path).resolve()
+
+
+class AppFeatureProvider(ABC):
+ def __init__(self, name: str, /, app: AppBase | None = None):
+ super().__init__()
+ self._name = name
+ self._app = app if app else AppBase.get_instance()
+
+ @property
+ def app(self) -> AppBase:
+ return self._app
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ @abstractmethod
+ def setup(self) -> None: ...
+
+
+class AppCommandFeatureProvider(AppFeatureProvider):
+ @abstractmethod
+ def get_command_info(self) -> tuple[str, str]: ...
+
+ @abstractmethod
+ def setup_arg_parser(self, arg_parser: ArgumentParser): ...
+
+ @abstractmethod
+ def run_command(self, args: Namespace) -> None: ...
+
+
+DATA_DIR_NAME = "data"
+
+
+class PathCommandProvider(AppCommandFeatureProvider):
+ def __init__(self) -> None:
+ super().__init__("path-command-provider")
+
+ def setup(self):
+ pass
+
+ def get_command_info(self):
+ return ("path", "Get information about paths used by app.")
+
+ def setup_arg_parser(self, arg_parser: ArgumentParser) -> None:
+ subparsers = arg_parser.add_subparsers(
+ dest="path_command", required=True, metavar="PATH_COMMAND"
+ )
+ _list_parser = subparsers.add_parser(
+ "list", help="list special paths used by app"
+ )
+
+ def run_command(self, args: Namespace) -> None:
+ if args.path_command == "list":
+ for path in self.app.paths:
+ print(f"{path.app_relative_path.as_posix()}: {path.description}")
+
+
+class CommandDispatcher(AppFeatureProvider):
+ def __init__(self) -> None:
+ super().__init__("command-dispatcher")
+ self._parsed_args: argparse.Namespace | None = None
+
+ def setup_arg_parser(self) -> None:
+ epilog = """
+==> to start,
+./tools/manage init
+./tools/manage config init
+ln -s generated/docker-compose.yaml .
+# Then edit config file.
+
+==> to update
+git pull
+./tools/manage template generate --no-dry-run
+docker compose up
+ """.strip()
+
+ self._map: dict[str, AppCommandFeatureProvider] = {}
+ arg_parser = argparse.ArgumentParser(
+ description="Service management",
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ epilog=epilog,
+ )
+ arg_parser.add_argument(
+ "--project-dir",
+ help="The path of the project directory.",
+ required=True,
+ type=str,
+ )
+ subparsers = arg_parser.add_subparsers(
+ dest="command",
+ help="The management command to execute.",
+ metavar="COMMAND",
+ )
+ for feature in self.app.features:
+ if isinstance(feature, AppCommandFeatureProvider):
+ info = feature.get_command_info()
+ command_subparser = subparsers.add_parser(info[0], help=info[1])
+ feature.setup_arg_parser(command_subparser)
+ self._map[info[0]] = feature
+ self._arg_parser = arg_parser
+
+ def setup(self):
+ pass
+
+ @property
+ def arg_parser(self) -> argparse.ArgumentParser:
+ return self._arg_parser
+
+ @property
+ def map(self) -> dict[str, AppCommandFeatureProvider]:
+ return self._map
+
+ def get_program_parsed_args(self) -> argparse.Namespace:
+ if self._parsed_args is None:
+ self._parsed_args = self.arg_parser.parse_args()
+ return self._parsed_args
+
+ def run_command(self, args: argparse.Namespace | None = None) -> None:
+ real_args = args or self.get_program_parsed_args()
+ if real_args.command is None:
+ self.arg_parser.print_help()
+ return
+ self.map[real_args.command].run_command(real_args)
+
+
+class AppInitializer(AppCommandFeatureProvider):
+ def __init__(self) -> None:
+ super().__init__("app-initializer")
+
+ def _init_app(self) -> bool:
+ if self.app.app_initialized:
+ return False
+ self.app.data_dir.ensure()
+ return True
+
+ def setup(self):
+ pass
+
+ def get_command_info(self):
+ return ("init", "Initialize the app.")
+
+ def setup_arg_parser(self, arg_parser):
+ pass
+
+ def run_command(self, args):
+ init = self._init_app()
+ if init:
+ print("App initialized successfully.")
+ else:
+ print("App is already initialized. Do nothing.")
+
+
+class AppBase:
+ _instance: AppBase | None = None
+
+ @staticmethod
+ def get_instance() -> AppBase:
+ if AppBase._instance is None:
+ raise AppError("App instance not initialized")
+ return AppBase._instance
+
+ def __init__(self, app_id: str, name: str):
+ AppBase._instance = self
+ self._app_id = app_id
+ self._name = name
+ self._root = AppRootPath(self)
+ self._paths: list[AppFeaturePath] = []
+ self._features: list[AppFeatureProvider] = []
+
+ def setup(self) -> None:
+ command_dispatcher = self.get_feature(CommandDispatcher)
+ command_dispatcher.setup_arg_parser()
+ program_args = command_dispatcher.get_program_parsed_args()
+ self.setup_root(program_args.project_dir)
+ self._data_dir = self.add_path(DATA_DIR_NAME, True, id="data")
+ for feature in self.features:
+ feature.setup()
+ for path in self.paths:
+ path.check_self()
+
+ @property
+ def app_id(self) -> str:
+ return self._app_id
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ @property
+ def root(self) -> AppRootPath:
+ return self._root
+
+ def setup_root(self, path: os.PathLike) -> None:
+ self._root.setup(path)
+
+ @property
+ def data_dir(self) -> AppFeaturePath:
+ return self._data_dir
+
+ @property
+ def app_initialized(self) -> bool:
+ return self.data_dir.check_self()
+
+ def ensure_app_initialized(self) -> AppRootPath:
+ if not self.app_initialized:
+ raise AppError(
+ user_message="Root directory does not exist. "
+ "Please run 'init' to create one."
+ )
+ return self.root
+
+ @property
+ def features(self) -> list[AppFeatureProvider]:
+ return self._features
+
+ @property
+ def paths(self) -> list[AppFeaturePath]:
+ return self._paths
+
+ def add_feature(self, feature: _Feature) -> _Feature:
+ for f in self.features:
+ if f.name == feature.name:
+ raise AppFeatureError(
+ f"Duplicate feature name: {feature.name}.", feature.name
+ )
+ self._features.append(feature)
+ return feature
+
+ def add_path(
+ self,
+ name: str,
+ is_dir: bool,
+ /,
+ parent: AppPath | None = None,
+ id: str | None = None,
+ description: str = "",
+ ) -> AppFeaturePath:
+ p = AppFeaturePath(
+ parent or self.root, name, is_dir, id=id, description=description
+ )
+ self._paths.append(p)
+ return p
+
+ @overload
+ def get_feature(self, feature: str) -> AppFeatureProvider: ...
+
+ @overload
+ def get_feature(self, feature: type[_Feature]) -> _Feature: ...
+
+ def get_feature(
+ self, feature: str | type[_Feature]
+ ) -> AppFeatureProvider | _Feature:
+ if isinstance(feature, str):
+ for f in self._features:
+ if f.name == feature:
+ return f
+ elif isinstance(feature, type):
+ for f in self._features:
+ if isinstance(f, feature):
+ return f
+ else:
+ raise CruLogicError("Argument must be the name of feature or its class.")
+
+ raise AppFeatureError(f"Feature {feature} not found.", feature)
+
+ def get_path(self, name: str) -> AppFeaturePath:
+ for p in self._paths:
+ if p.id == name or p.name == name:
+ return p
+ raise AppPathError(f"Application path {name} not found.", name)
diff --git a/tools/cru-py/cru/service/_config.py b/tools/cru-py/cru/service/_config.py
new file mode 100644
index 0000000..b51e21c
--- /dev/null
+++ b/tools/cru-py/cru/service/_config.py
@@ -0,0 +1,446 @@
+from collections.abc import Iterable
+from typing import Any, Literal, overload
+
+from cru import CruException
+from cru.config import Configuration, ConfigItem
+from cru.value import (
+ INTEGER_VALUE_TYPE,
+ TEXT_VALUE_TYPE,
+ CruValueTypeError,
+ RandomStringValueGenerator,
+ UuidValueGenerator,
+)
+from cru.parsing import ParseError, SimpleLineConfigParser
+
+from ._base import AppFeaturePath, AppCommandFeatureProvider
+
+
+class AppConfigError(CruException):
+ def __init__(
+ self, message: str, configuration: Configuration, *args, **kwargs
+ ) -> None:
+ super().__init__(message, *args, **kwargs)
+ self._configuration = configuration
+
+ @property
+ def configuration(self) -> Configuration:
+ return self._configuration
+
+
+class AppConfigFileError(AppConfigError):
+ def __init__(
+ self,
+ message: str,
+ configuration: Configuration,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(message, configuration, *args, **kwargs)
+
+
+class AppConfigFileNotFoundError(AppConfigFileError):
+ def __init__(
+ self,
+ message: str,
+ configuration: Configuration,
+ file_path: str,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(message, configuration, *args, **kwargs)
+ self._file_path = file_path
+
+ @property
+ def file_path(self) -> str:
+ return self._file_path
+
+
+class AppConfigFileParseError(AppConfigFileError):
+ def __init__(
+ self,
+ message: str,
+ configuration: Configuration,
+ file_content: str,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(message, configuration, *args, **kwargs)
+ self._file_content = file_content
+ self.__cause__: ParseError
+
+ @property
+ def file_content(self) -> str:
+ return self._file_content
+
+ def get_user_message(self) -> str:
+ return f"Error while parsing config file at line {self.__cause__.line_number}."
+
+
+class AppConfigFileEntryError(AppConfigFileError):
+ def __init__(
+ self,
+ message: str,
+ configuration: Configuration,
+ entries: Iterable[SimpleLineConfigParser.Entry],
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(message, configuration, *args, **kwargs)
+ self._entries = list(entries)
+
+ @property
+ def error_entries(self) -> list[SimpleLineConfigParser.Entry]:
+ return self._entries
+
+ @staticmethod
+ def entries_to_friendly_message(
+ entries: Iterable[SimpleLineConfigParser.Entry],
+ ) -> str:
+ return "\n".join(
+ f"line {entry.line_number}: {entry.key}={entry.value}" for entry in entries
+ )
+
+ @property
+ def friendly_message_head(self) -> str:
+ return "Error entries found in config file"
+
+ def get_user_message(self) -> str:
+ return (
+ f"{self.friendly_message_head}:\n"
+ f"{self.entries_to_friendly_message(self.error_entries)}"
+ )
+
+
+class AppConfigDuplicateEntryError(AppConfigFileEntryError):
+ @property
+ def friendly_message_head(self) -> str:
+ return "Duplicate entries found in config file"
+
+
+class AppConfigEntryValueFormatError(AppConfigFileEntryError):
+ @property
+ def friendly_message_head(self) -> str:
+ return "Invalid value format for entries"
+
+
+class AppConfigItemNotSetError(AppConfigError):
+ def __init__(
+ self,
+ message: str,
+ configuration: Configuration,
+ items: list[ConfigItem],
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(message, configuration, *args, **kwargs)
+ self._items = items
+
+
+class ConfigManager(AppCommandFeatureProvider):
+ def __init__(self) -> None:
+ super().__init__("config-manager")
+ configuration = Configuration()
+ self._configuration = configuration
+ self._loaded: bool = False
+ self._init_app_defined_items()
+
+ def _init_app_defined_items(self) -> None:
+ prefix = self.config_name_prefix
+
+ def _add_text(name: str, description: str) -> ConfigItem:
+ item = ConfigItem(f"{prefix}_{name}", description, TEXT_VALUE_TYPE)
+ self.configuration.add(item)
+ return item
+
+ def _add_uuid(name: str, description: str) -> ConfigItem:
+ item = ConfigItem(
+ f"{prefix}_{name}",
+ description,
+ TEXT_VALUE_TYPE,
+ default=UuidValueGenerator(),
+ )
+ self.configuration.add(item)
+ return item
+
+ def _add_random_string(
+ name: str, description: str, length: int = 32, secure: bool = True
+ ) -> ConfigItem:
+ item = ConfigItem(
+ f"{prefix}_{name}",
+ description,
+ TEXT_VALUE_TYPE,
+ default=RandomStringValueGenerator(length, secure),
+ )
+ self.configuration.add(item)
+ return item
+
+ def _add_int(name: str, description: str) -> ConfigItem:
+ item = ConfigItem(f"{prefix}_{name}", description, INTEGER_VALUE_TYPE)
+ self.configuration.add(item)
+ return item
+
+ self._domain = _add_text("DOMAIN", "domain name")
+ self._email = _add_text("EMAIL", "admin email address")
+ _add_text(
+ "AUTO_BACKUP_COS_SECRET_ID",
+ "access key id for Tencent COS, used for auto backup",
+ )
+ _add_text(
+ "AUTO_BACKUP_COS_SECRET_KEY",
+ "access key secret for Tencent COS, used for auto backup",
+ )
+ _add_text(
+ "AUTO_BACKUP_COS_REGION", "region for Tencent COS, used for auto backup"
+ )
+ _add_text(
+ "AUTO_BACKUP_BUCKET_NAME",
+ "bucket name for Tencent COS, used for auto backup",
+ )
+ _add_text("GITHUB_USERNAME", "github username for fetching todos")
+ _add_int("GITHUB_PROJECT_NUMBER", "github project number for fetching todos")
+ _add_text("GITHUB_TOKEN", "github token for fetching todos")
+ _add_text("GITHUB_TODO_COUNT", "github todo count")
+ _add_uuid("V2RAY_TOKEN", "v2ray user id")
+ _add_uuid("V2RAY_PATH", "v2ray path, which will be prefixed by _")
+ _add_text("FORGEJO_MAILER_USER", "Forgejo SMTP user")
+ _add_text("FORGEJO_MAILER_PASSWD", "Forgejo SMTP password")
+ _add_random_string("2FAUTH_APP_KEY", "2FAuth App Key")
+ _add_text("2FAUTH_MAIL_USERNAME", "2FAuth SMTP user")
+ _add_text("2FAUTH_MAIL_PASSWORD", "2FAuth SMTP password")
+
+ def setup(self) -> None:
+ self._config_file_path = self.app.data_dir.add_subpath(
+ "config", False, description="Configuration file path."
+ )
+
+ @property
+ def config_name_prefix(self) -> str:
+ return self.app.app_id.upper()
+
+ @property
+ def configuration(self) -> Configuration:
+ return self._configuration
+
+ @property
+ def config_file_path(self) -> AppFeaturePath:
+ return self._config_file_path
+
+ @property
+ def all_set(self) -> bool:
+ return self.configuration.all_set
+
+ def get_item(self, name: str) -> ConfigItem[Any]:
+ if not name.startswith(self.config_name_prefix + "_"):
+ name = f"{self.config_name_prefix}_{name}"
+
+ item = self.configuration.get_or(name, None)
+ if item is None:
+ raise AppConfigError(f"Config item '{name}' not found.", self.configuration)
+ return item
+
+ @overload
+ def get_item_value_str(self, name: str) -> str: ...
+
+ @overload
+ def get_item_value_str(self, name: str, ensure_set: Literal[True]) -> str: ...
+
+ @overload
+ def get_item_value_str(self, name: str, ensure_set: bool = True) -> str | None: ...
+
+ def get_item_value_str(self, name: str, ensure_set: bool = True) -> str | None:
+ self.load_config_file()
+ item = self.get_item(name)
+ if not item.is_set:
+ if ensure_set:
+ raise AppConfigItemNotSetError(
+ f"Config item '{name}' is not set.", self.configuration, [item]
+ )
+ return None
+ return item.value_str
+
+ def get_str_dict(self, ensure_all_set: bool = True) -> dict[str, str]:
+ self.load_config_file()
+ if ensure_all_set and not self.configuration.all_set:
+ raise AppConfigItemNotSetError(
+ "Some config items are not set.",
+ self.configuration,
+ self.configuration.get_unset_items(),
+ )
+ return self.configuration.to_str_dict()
+
+ @property
+ def domain_item_name(self) -> str:
+ return self._domain.name
+
+ def get_domain_value_str(self) -> str:
+ return self.get_item_value_str(self._domain.name)
+
+ def get_email_value_str_optional(self) -> str | None:
+ return self.get_item_value_str(self._email.name, ensure_set=False)
+
+ def _set_with_default(self) -> None:
+ if not self.configuration.all_not_set:
+ raise AppConfigError(
+ "Config is not clean. "
+ "Some config items are already set. "
+ "Can't set again with default value.",
+ self.configuration,
+ )
+ for item in self.configuration:
+ if item.can_generate_default:
+ item.set_value(item.generate_default_value())
+
+ def _to_config_file_content(self) -> str:
+ content = "".join(
+ [
+ f"{item.name}={item.value_str if item.is_set else ''}\n"
+ for item in self.configuration
+ ]
+ )
+ return content
+
+ def _create_init_config_file(self) -> None:
+ if self.config_file_path.check_self():
+ raise AppConfigError(
+ "Config file already exists.",
+ self.configuration,
+ user_message=f"The config file at "
+ f"{self.config_file_path.full_path_str} already exists.",
+ )
+ self._set_with_default()
+ self.config_file_path.ensure()
+ with open(
+ self.config_file_path.full_path, "w", encoding="utf-8", newline="\n"
+ ) as file:
+ file.write(self._to_config_file_content())
+
+ def _parse_config_file(self) -> SimpleLineConfigParser.Result:
+ if not self.config_file_path.check_self():
+ raise AppConfigFileNotFoundError(
+ "Config file not found.",
+ self.configuration,
+ self.config_file_path.full_path_str,
+ user_message=f"The config file at "
+ f"{self.config_file_path.full_path_str} does not exist. "
+ f"You can create an initial one with 'init' command.",
+ )
+
+ text = self.config_file_path.full_path.read_text()
+ try:
+ parser = SimpleLineConfigParser()
+ return parser.parse(text)
+ except ParseError as e:
+ raise AppConfigFileParseError(
+ "Failed to parse config file.", self.configuration, text
+ ) from e
+
+ def _parse_and_print_config_file(self) -> None:
+ parse_result = self._parse_config_file()
+ for entry in parse_result:
+ print(f"{entry.key}={entry.value}")
+
+ def _check_duplicate(
+ self,
+ parse_result: dict[str, list[SimpleLineConfigParser.Entry]],
+ ) -> dict[str, SimpleLineConfigParser.Entry]:
+ entry_dict: dict[str, SimpleLineConfigParser.Entry] = {}
+ duplicate_entries: list[SimpleLineConfigParser.Entry] = []
+ for key, entries in parse_result.items():
+ entry_dict[key] = entries[0]
+ if len(entries) > 1:
+ duplicate_entries.extend(entries)
+ if len(duplicate_entries) > 0:
+ raise AppConfigDuplicateEntryError(
+ "Duplicate entries found.", self.configuration, duplicate_entries
+ )
+
+ return entry_dict
+
+ def _check_type(
+ self, entry_dict: dict[str, SimpleLineConfigParser.Entry]
+ ) -> dict[str, Any]:
+ value_dict: dict[str, Any] = {}
+ error_entries: list[SimpleLineConfigParser.Entry] = []
+ errors: list[CruValueTypeError] = []
+ for key, entry in entry_dict.items():
+ config_item = self.configuration.get(key)
+ try:
+ if entry.value == "":
+ value_dict[key] = None
+ else:
+ value_dict[key] = config_item.value_type.convert_str_to_value(
+ entry.value
+ )
+ except CruValueTypeError as e:
+ error_entries.append(entry)
+ errors.append(e)
+ if len(error_entries) > 0:
+ raise AppConfigEntryValueFormatError(
+ "Entry value format is not correct.",
+ self.configuration,
+ error_entries,
+ ) from ExceptionGroup("Multiple format errors occurred.", errors)
+ return value_dict
+
+ def _read_config_file(self) -> dict[str, Any]:
+ parsed = self._parse_config_file()
+ entry_groups = parsed.cru_iter().group_by(lambda e: e.key)
+ entry_dict = self._check_duplicate(entry_groups)
+ value_dict = self._check_type(entry_dict)
+ return value_dict
+
+ def _real_load_config_file(self) -> None:
+ self.configuration.reset_all()
+ value_dict = self._read_config_file()
+ for key, value in value_dict.items():
+ if value is None:
+ continue
+ self.configuration.set_config_item(key, value)
+
+ def load_config_file(self, force=False) -> None:
+ if force or not self._loaded:
+ self._real_load_config_file()
+ self._loaded = True
+
+ def _print_app_config_info(self):
+ for item in self.configuration:
+ print(item.description_str)
+
+ def get_command_info(self):
+ return "config", "Manage configuration."
+
+ def setup_arg_parser(self, arg_parser) -> None:
+ subparsers = arg_parser.add_subparsers(
+ dest="config_command", required=True, metavar="CONFIG_COMMAND"
+ )
+ _init_parser = subparsers.add_parser(
+ "init", help="create an initial config file"
+ )
+ _print_app_parser = subparsers.add_parser(
+ "print-app",
+ help="print information of the config items defined by app",
+ )
+ _print_parser = subparsers.add_parser("print", help="print current config")
+ _check_config_parser = subparsers.add_parser(
+ "check",
+ help="check the validity of the config file",
+ )
+ _check_config_parser.add_argument(
+ "-f",
+ "--format-only",
+ action="store_true",
+ help="only check content format, not app config item requirements.",
+ )
+
+ def run_command(self, args) -> None:
+ if args.config_command == "init":
+ self._create_init_config_file()
+ elif args.config_command == "print-app":
+ self._print_app_config_info()
+ elif args.config_command == "print":
+ self._parse_and_print_config_file()
+ elif args.config_command == "check":
+ if args.format_only:
+ self._parse_config_file()
+ else:
+ self._read_config_file()
diff --git a/tools/cru-py/cru/service/_external.py b/tools/cru-py/cru/service/_external.py
new file mode 100644
index 0000000..2347e95
--- /dev/null
+++ b/tools/cru-py/cru/service/_external.py
@@ -0,0 +1,81 @@
+from ._base import AppCommandFeatureProvider
+from ._nginx import NginxManager
+
+
+class CliToolCommandProvider(AppCommandFeatureProvider):
+ def __init__(self) -> None:
+ super().__init__("cli-tool-command-provider")
+
+ def setup(self):
+ pass
+
+ def get_command_info(self):
+ return ("gen-cli", "Get commands of running external cli tools.")
+
+ def setup_arg_parser(self, arg_parser):
+ subparsers = arg_parser.add_subparsers(
+ dest="gen_cli_command", required=True, metavar="GEN_CLI_COMMAND"
+ )
+ certbot_parser = subparsers.add_parser("certbot", help="print certbot commands")
+ certbot_parser.add_argument(
+ "-t", "--test", action="store_true", help="run certbot in test mode"
+ )
+ _install_docker_parser = subparsers.add_parser(
+ "install-docker", help="print docker installation commands"
+ )
+ _update_blog_parser = subparsers.add_parser(
+ "update-blog", help="print blog update command"
+ )
+
+ def _print_install_docker_commands(self) -> None:
+ output = """
+### COMMAND: uninstall apt docker
+for pkg in docker.io docker-doc docker-compose \
+podman-docker containerd runc; \
+do sudo apt-get remove $pkg; done
+
+### COMMAND: prepare apt certs
+sudo apt-get update
+sudo apt-get install ca-certificates curl
+sudo install -m 0755 -d /etc/apt/keyrings
+
+### COMMAND: install certs
+sudo curl -fsSL https://download.docker.com/linux/debian/gpg \
+-o /etc/apt/keyrings/docker.asc
+sudo chmod a+r /etc/apt/keyrings/docker.asc
+
+### COMMAND: add docker apt source
+echo \\
+ "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] \
+https://download.docker.com/linux/debian \\
+ $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \\
+ sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
+
+### COMMAND: update apt and install docker
+sudo apt-get update
+sudo apt-get install docker-ce docker-ce-cli containerd.io \
+docker-buildx-plugin docker-compose-plugin
+
+### COMMAND: setup system for docker
+sudo systemctl enable docker
+sudo systemctl start docker
+sudo groupadd -f docker
+sudo usermod -aG docker $USER
+# Remember to log out and log back in for the group changes to take effect
+""".strip()
+ print(output)
+
+ def _print_update_blog_command(self):
+ output = """
+### COMMAND: update blog
+docker exec -it blog /scripts/update.bash
+""".strip()
+ print(output)
+
+ def run_command(self, args):
+ if args.gen_cli_command == "certbot":
+ self.app.get_feature(NginxManager).print_all_certbot_commands(args.test)
+ elif args.gen_cli_command == "install-docker":
+ self._print_install_docker_commands()
+ elif args.gen_cli_command == "update-blog":
+ self._print_update_blog_command() \ No newline at end of file
diff --git a/tools/cru-py/cru/service/_nginx.py b/tools/cru-py/cru/service/_nginx.py
new file mode 100644
index 0000000..e0a9c60
--- /dev/null
+++ b/tools/cru-py/cru/service/_nginx.py
@@ -0,0 +1,281 @@
+from argparse import Namespace
+from enum import Enum, auto
+import re
+import subprocess
+from typing import TypeAlias
+
+from cru import CruInternalError
+
+from ._base import AppCommandFeatureProvider
+from ._config import ConfigManager
+from ._template import TemplateManager
+
+
+class CertbotAction(Enum):
+ CREATE = auto()
+ EXPAND = auto()
+ SHRINK = auto()
+ RENEW = auto()
+
+
+class NginxManager(AppCommandFeatureProvider):
+ CertbotAction: TypeAlias = CertbotAction
+
+ def __init__(self) -> None:
+ super().__init__("nginx-manager")
+ self._domains_cache: list[str] | None = None
+
+ def setup(self) -> None:
+ pass
+
+ @property
+ def _config_manager(self) -> ConfigManager:
+ return self.app.get_feature(ConfigManager)
+
+ @property
+ def root_domain(self) -> str:
+ return self._config_manager.get_domain_value_str()
+
+ @property
+ def domains(self) -> list[str]:
+ if self._domains_cache is None:
+ self._domains_cache = self._get_domains()
+ return self._domains_cache
+
+ @property
+ def subdomains(self) -> list[str]:
+ suffix = "." + self.root_domain
+ return [d[: -len(suffix)] for d in self.domains if d.endswith(suffix)]
+
+ @property
+ def _domain_config_name(self) -> str:
+ return self._config_manager.domain_item_name
+
+ def _get_domains_from_text(self, text: str) -> set[str]:
+ domains: set[str] = set()
+ regex = re.compile(r"server_name\s+(\S+)\s*;")
+ domain_variable_str = f"${self._domain_config_name}"
+ brace_domain_variable_regex = re.compile(
+ r"\$\{\s*" + self._domain_config_name + r"\s*\}"
+ )
+ for match in regex.finditer(text):
+ domain_part = match.group(1)
+ if domain_variable_str in domain_part:
+ domains.add(domain_part.replace(domain_variable_str, self.root_domain))
+ continue
+ m = brace_domain_variable_regex.search(domain_part)
+ if m:
+ domains.add(domain_part.replace(m.group(0), self.root_domain))
+ continue
+ domains.add(domain_part)
+ return domains
+
+ def _get_nginx_conf_template_text(self) -> str:
+ template_manager = self.app.get_feature(TemplateManager)
+ text = ""
+ for path, template in template_manager.template_tree.templates:
+ if path.as_posix().startswith("nginx/"):
+ text += template.raw_text
+ return text
+
+ def _get_domains(self) -> list[str]:
+ text = self._get_nginx_conf_template_text()
+ domains = list(self._get_domains_from_text(text))
+ domains.remove(self.root_domain)
+ return [self.root_domain, *domains]
+
+ def _print_domains(self) -> None:
+ for domain in self.domains:
+ print(domain)
+
+ def _certbot_command(
+ self,
+ action: CertbotAction | str,
+ test: bool,
+ *,
+ docker=True,
+ standalone=None,
+ email=None,
+ agree_tos=True,
+ ) -> str:
+ if isinstance(action, str):
+ action = CertbotAction[action.upper()]
+
+ command_args = []
+
+ add_domain_option = True
+ if action is CertbotAction.CREATE:
+ if standalone is None:
+ standalone = True
+ command_action = "certonly"
+ elif action in [CertbotAction.EXPAND, CertbotAction.SHRINK]:
+ if standalone is None:
+ standalone = False
+ command_action = "certonly"
+ elif action is CertbotAction.RENEW:
+ if standalone is None:
+ standalone = False
+ add_domain_option = False
+ command_action = "renew"
+ else:
+ raise CruInternalError("Invalid certbot action.")
+
+ data_dir = self.app.data_dir.full_path.as_posix()
+
+ if not docker:
+ command_args.append("certbot")
+ else:
+ command_args.extend(
+ [
+ "docker run -it --rm --name certbot",
+ f'-v "{data_dir}/certbot/certs:/etc/letsencrypt"',
+ f'-v "{data_dir}/certbot/data:/var/lib/letsencrypt"',
+ ]
+ )
+ if standalone:
+ command_args.append('-p "0.0.0.0:80:80"')
+ else:
+ command_args.append(f'-v "{data_dir}/certbot/webroot:/var/www/certbot"')
+
+ command_args.append("certbot/certbot")
+
+ command_args.append(command_action)
+
+ command_args.append(f"--cert-name {self.root_domain}")
+
+ if standalone:
+ command_args.append("--standalone")
+ else:
+ command_args.append("--webroot -w /var/www/certbot")
+
+ if add_domain_option:
+ command_args.append(" ".join([f"-d {domain}" for domain in self.domains]))
+
+ if email is not None:
+ command_args.append(f"--email {email}")
+
+ if agree_tos:
+ command_args.append("--agree-tos")
+
+ if test:
+ command_args.append("--test-cert --dry-run")
+
+ return " ".join(command_args)
+
+ def print_all_certbot_commands(self, test: bool):
+ print("### COMMAND: (standalone) create certs")
+ print(
+ self._certbot_command(
+ CertbotAction.CREATE,
+ test,
+ email=self._config_manager.get_email_value_str_optional(),
+ )
+ )
+ print()
+ print("### COMMAND: (webroot+nginx) expand or shrink certs")
+ print(
+ self._certbot_command(
+ CertbotAction.EXPAND,
+ test,
+ email=self._config_manager.get_email_value_str_optional(),
+ )
+ )
+ print()
+ print("### COMMAND: (webroot+nginx) renew certs")
+ print(
+ self._certbot_command(
+ CertbotAction.RENEW,
+ test,
+ email=self._config_manager.get_email_value_str_optional(),
+ )
+ )
+
+ @property
+ def _cert_path_str(self) -> str:
+ return str(
+ self.app.data_dir.full_path
+ / "certbot/certs/live"
+ / self.root_domain
+ / "fullchain.pem"
+ )
+
+ def get_command_info(self):
+ return "nginx", "Manage nginx related things."
+
+ def setup_arg_parser(self, arg_parser):
+ subparsers = arg_parser.add_subparsers(
+ dest="nginx_command", required=True, metavar="NGINX_COMMAND"
+ )
+ _list_parser = subparsers.add_parser("list", help="list domains")
+ certbot_parser = subparsers.add_parser("certbot", help="print certbot commands")
+ certbot_parser.add_argument(
+ "--no-test",
+ action="store_true",
+ help="remove args making certbot run in test mode",
+ )
+
+ def run_command(self, args: Namespace) -> None:
+ if args.nginx_command == "list":
+ self._print_domains()
+ elif args.nginx_command == "certbot":
+ self.print_all_certbot_commands(not args.no_test)
+
+ def _generate_dns_zone(
+ self,
+ ip: str,
+ /,
+ ttl: str | int = 600,
+ *,
+ enable_mail: bool = True,
+ dkim: str | None = None,
+ ) -> str:
+ # TODO: Not complete and test now.
+ root_domain = self.root_domain
+ result = f"$ORIGIN {root_domain}.\n\n"
+ result += "; A records\n"
+ result += f"@ {ttl} IN A {ip}\n"
+ for subdomain in self.subdomains:
+ result += f"{subdomain} {ttl} IN A {ip}\n"
+
+ if enable_mail:
+ result += "\n; MX records\n"
+ result += f"@ {ttl} IN MX 10 mail.{root_domain}.\n"
+ result += "\n; SPF record\n"
+ result += f'@ {ttl} IN TXT "v=spf1 mx ~all"\n'
+ if dkim is not None:
+ result += "\n; DKIM record\n"
+ result += f'mail._domainkey {ttl} IN TEXT "{dkim}"'
+ result += "\n; DMARC record\n"
+ dmarc_options = [
+ "v=DMARC1",
+ "p=none",
+ f"rua=mailto:dmarc.report@{root_domain}",
+ f"ruf=mailto:dmarc.report@{root_domain}",
+ "sp=none",
+ "ri=86400",
+ ]
+ result += f'_dmarc {ttl} IN TXT "{"; ".join(dmarc_options)}"\n'
+ return result
+
+ def _get_dkim_from_mailserver(self) -> str | None:
+ # TODO: Not complete and test now.
+ dkim_path = (
+ self.app.data_dir.full_path
+ / "dms/config/opendkim/keys"
+ / self.root_domain
+ / "mail.txt"
+ )
+ if not dkim_path.exists():
+ return None
+
+ p = subprocess.run(["sudo", "cat", dkim_path], capture_output=True, check=True)
+ value = ""
+ for match in re.finditer('"(.*)"', p.stdout.decode("utf-8")):
+ value += match.group(1)
+ return value
+
+ def _generate_dns_zone_with_dkim(self, ip: str, /, ttl: str | int = 600) -> str:
+ # TODO: Not complete and test now.
+ return self._generate_dns_zone(
+ ip, ttl, enable_mail=True, dkim=self._get_dkim_from_mailserver()
+ )
diff --git a/tools/cru-py/cru/service/_template.py b/tools/cru-py/cru/service/_template.py
new file mode 100644
index 0000000..170116c
--- /dev/null
+++ b/tools/cru-py/cru/service/_template.py
@@ -0,0 +1,86 @@
+from argparse import Namespace
+import shutil
+
+from cru import CruIterator
+from cru.template import TemplateTree
+
+from ._base import AppCommandFeatureProvider, AppFeaturePath
+from ._config import ConfigManager
+
+
+class TemplateManager(AppCommandFeatureProvider):
+ def __init__(self, prefix: str | None = None):
+ super().__init__("template-manager")
+ self._prefix = prefix or self.app.app_id.upper()
+
+ def setup(self) -> None:
+ self._templates_dir = self.app.add_path("templates", True)
+ self._generated_dir = self.app.add_path("generated", True)
+ self._template_tree: TemplateTree | None = None
+
+ @property
+ def prefix(self) -> str:
+ return self._prefix
+
+ @property
+ def templates_dir(self) -> AppFeaturePath:
+ return self._templates_dir
+
+ @property
+ def generated_dir(self) -> AppFeaturePath:
+ return self._generated_dir
+
+ @property
+ def template_tree(self) -> TemplateTree:
+ if self._template_tree is None:
+ return self.reload()
+ return self._template_tree
+
+ def reload(self) -> TemplateTree:
+ self._template_tree = TemplateTree(
+ self.prefix, self.templates_dir.full_path_str
+ )
+ return self._template_tree
+
+ def _print_file_lists(self) -> None:
+ for file in CruIterator(self.template_tree.templates).transform(lambda t: t[0]):
+ print(file.as_posix())
+
+ def _generate_files(self, dry_run: bool) -> None:
+ config_manager = self.app.get_feature(ConfigManager)
+ if not dry_run and self.generated_dir.full_path.exists():
+ shutil.rmtree(self.generated_dir.full_path)
+ self.template_tree.generate_to(
+ self.generated_dir.full_path_str, config_manager.get_str_dict(), dry_run
+ )
+
+ def get_command_info(self):
+ return ("template", "Manage templates.")
+
+ def setup_arg_parser(self, arg_parser):
+ subparsers = arg_parser.add_subparsers(
+ dest="template_command", required=True, metavar="TEMPLATE_COMMAND"
+ )
+ _list_parser = subparsers.add_parser("list", help="list templates")
+ _variables_parser = subparsers.add_parser(
+ "variables", help="list variables used in all templates"
+ )
+ generate_parser = subparsers.add_parser("generate", help="generate templates")
+ generate_parser.add_argument(
+ "--no-dry-run", action="store_true", help="generate and write target files"
+ )
+
+ def run_command(self, args: Namespace) -> None:
+ if args.template_command == "list":
+ self._print_file_lists()
+ elif args.template_command == "variables":
+ for var in self.template_tree.variables:
+ print(var)
+ elif args.template_command == "generate":
+ dry_run = not args.no_dry_run
+ self._generate_files(dry_run)
+ if dry_run:
+ print("Dry run successfully.")
+ print(
+ f"Will delete dir {self.generated_dir.full_path_str} if it exists."
+ )
diff --git a/tools/cru-py/cru/system.py b/tools/cru-py/cru/system.py
new file mode 100644
index 0000000..f321717
--- /dev/null
+++ b/tools/cru-py/cru/system.py
@@ -0,0 +1,23 @@
+import os.path
+import re
+
+
+def check_debian_derivative_version(name: str) -> None | str:
+ if not os.path.isfile("/etc/os-release"):
+ return None
+ with open("/etc/os-release", "r") as f:
+ content = f.read()
+ if f"ID={name}" not in content:
+ return None
+ m = re.search(r'VERSION_ID="(.+)"', content)
+ if m is None:
+ return None
+ return m.group(1)
+
+
+def check_ubuntu_version() -> None | str:
+ return check_debian_derivative_version("ubuntu")
+
+
+def check_debian_version() -> None | str:
+ return check_debian_derivative_version("debian")
diff --git a/tools/cru-py/cru/template.py b/tools/cru-py/cru/template.py
new file mode 100644
index 0000000..6749cab
--- /dev/null
+++ b/tools/cru-py/cru/template.py
@@ -0,0 +1,153 @@
+from collections.abc import Mapping
+import os
+import os.path
+from pathlib import Path
+from string import Template
+
+from ._iter import CruIterator
+from ._error import CruException
+
+
+class CruTemplateError(CruException):
+ pass
+
+
+class CruTemplate:
+ def __init__(self, prefix: str, text: str):
+ self._prefix = prefix
+ self._template = Template(text)
+ self._variables = (
+ CruIterator(self._template.get_identifiers())
+ .filter(lambda i: i.startswith(self._prefix))
+ .to_set()
+ )
+ self._all_variables = set(self._template.get_identifiers())
+
+ @property
+ def prefix(self) -> str:
+ return self._prefix
+
+ @property
+ def raw_text(self) -> str:
+ return self._template.template
+
+ @property
+ def py_template(self) -> Template:
+ return self._template
+
+ @property
+ def variables(self) -> set[str]:
+ return self._variables
+
+ @property
+ def all_variables(self) -> set[str]:
+ return self._all_variables
+
+ @property
+ def has_variables(self) -> bool:
+ """
+ If the template does not has any variables that starts with the given prefix,
+ it returns False. This usually indicates that the template is not a real
+ template and should be copied as is. Otherwise, it returns True.
+
+ This can be used as a guard to prevent invalid templates created accidentally
+ without notice.
+ """
+ return len(self.variables) > 0
+
+ def generate(self, mapping: Mapping[str, str], allow_extra: bool = True) -> str:
+ values = dict(mapping)
+ if not self.variables <= set(values.keys()):
+ raise CruTemplateError("Missing variables.")
+ if not allow_extra and not set(values.keys()) <= self.variables:
+ raise CruTemplateError("Extra variables.")
+ return self._template.safe_substitute(values)
+
+
+class TemplateTree:
+ def __init__(
+ self,
+ prefix: str,
+ source: str,
+ template_file_suffix: str | None = ".template",
+ ):
+ """
+ If template_file_suffix is not None, the files will be checked according to the
+ suffix of the file name. If the suffix matches, the file will be regarded as a
+ template file. Otherwise, it will be regarded as a non-template file.
+ Content of template file must contain variables that need to be replaced, while
+ content of non-template file may not contain any variables.
+ If either case is false, it generally means whether the file is a template is
+ wrongly handled.
+ """
+ self._prefix = prefix
+ self._files: list[tuple[Path, CruTemplate]] = []
+ self._source = source
+ self._template_file_suffix = template_file_suffix
+ self._load()
+
+ @property
+ def prefix(self) -> str:
+ return self._prefix
+
+ @property
+ def templates(self) -> list[tuple[Path, CruTemplate]]:
+ return self._files
+
+ @property
+ def source(self) -> str:
+ return self._source
+
+ @property
+ def template_file_suffix(self) -> str | None:
+ return self._template_file_suffix
+
+ @staticmethod
+ def _scan_files(root_path: str) -> list[Path]:
+ result: list[Path] = []
+ for root, _dirs, files in os.walk(root_path):
+ for file in files:
+ path = Path(root, file)
+ path = path.relative_to(root_path)
+ result.append(Path(path))
+ return result
+
+ def _load(self) -> None:
+ files = self._scan_files(self.source)
+ for file_path in files:
+ template_file = Path(self.source) / file_path
+ with open(template_file, "r") as f:
+ content = f.read()
+ template = CruTemplate(self.prefix, content)
+ if self.template_file_suffix is not None:
+ should_be_template = file_path.name.endswith(self.template_file_suffix)
+ if should_be_template and not template.has_variables:
+ raise CruTemplateError(
+ f"Template file {file_path} has no variables."
+ )
+ elif not should_be_template and template.has_variables:
+ raise CruTemplateError(f"Non-template {file_path} has variables.")
+ self._files.append((file_path, template))
+
+ @property
+ def variables(self) -> set[str]:
+ s = set()
+ for _, template in self.templates:
+ s.update(template.variables)
+ return s
+
+ def generate_to(
+ self, destination: str, variables: Mapping[str, str], dry_run: bool
+ ) -> None:
+ for file, template in self.templates:
+ des = Path(destination) / file
+ if self.template_file_suffix is not None and des.name.endswith(
+ self.template_file_suffix
+ ):
+ des = des.parent / (des.name[: -len(self.template_file_suffix)])
+
+ text = template.generate(variables)
+ if not dry_run:
+ des.parent.mkdir(parents=True, exist_ok=True)
+ with open(des, "w") as f:
+ f.write(text)
diff --git a/tools/cru-py/cru/tool.py b/tools/cru-py/cru/tool.py
new file mode 100644
index 0000000..377f5d7
--- /dev/null
+++ b/tools/cru-py/cru/tool.py
@@ -0,0 +1,82 @@
+import shutil
+import subprocess
+from typing import Any
+from collections.abc import Iterable
+
+from ._error import CruException
+
+
+class CruExternalToolError(CruException):
+ def __init__(self, message: str, tool: str, *args, **kwargs) -> None:
+ super().__init__(message, *args, **kwargs)
+ self._tool = tool
+
+ @property
+ def tool(self) -> str:
+ return self._tool
+
+
+class CruExternalToolNotFoundError(CruExternalToolError):
+ def __init__(self, message: str | None, tool: str, *args, **kwargs) -> None:
+ super().__init__(
+ message or f"Could not find binary for {tool}.", tool, *args, **kwargs
+ )
+
+
+class CruExternalToolRunError(CruExternalToolError):
+ def __init__(
+ self,
+ message: str,
+ tool: str,
+ tool_args: Iterable[str],
+ tool_error: Any,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(message, tool, *args, **kwargs)
+ self._tool_args = list(tool_args)
+ self._tool_error = tool_error
+
+ @property
+ def tool_args(self) -> list[str]:
+ return self._tool_args
+
+ @property
+ def tool_error(self) -> Any:
+ return self._tool_error
+
+
+class ExternalTool:
+ def __init__(self, bin: str) -> None:
+ self._bin = bin
+
+ @property
+ def bin(self) -> str:
+ return self._bin
+
+ @bin.setter
+ def bin(self, value: str) -> None:
+ self._bin = value
+
+ @property
+ def bin_path(self) -> str:
+ real_bin = shutil.which(self.bin)
+ if not real_bin:
+ raise CruExternalToolNotFoundError(None, self.bin)
+ return real_bin
+
+ def run(
+ self, *process_args: str, **subprocess_kwargs
+ ) -> subprocess.CompletedProcess:
+ try:
+ return subprocess.run(
+ [self.bin_path] + list(process_args), **subprocess_kwargs
+ )
+ except subprocess.CalledProcessError as e:
+ raise CruExternalToolError("Subprocess failed.", self.bin) from e
+ except OSError as e:
+ raise CruExternalToolError("Failed to start subprocess", self.bin) from e
+
+ def run_get_output(self, *process_args: str, **subprocess_kwargs) -> Any:
+ process = self.run(*process_args, capture_output=True, **subprocess_kwargs)
+ return process.stdout
diff --git a/tools/cru-py/cru/value.py b/tools/cru-py/cru/value.py
new file mode 100644
index 0000000..9c03219
--- /dev/null
+++ b/tools/cru-py/cru/value.py
@@ -0,0 +1,292 @@
+from __future__ import annotations
+
+import random
+import secrets
+import string
+import uuid
+from abc import abstractmethod, ABCMeta
+from collections.abc import Callable
+from typing import Any, ClassVar, TypeVar, Generic
+
+from ._error import CruException
+
+
+def _str_case_in(s: str, case: bool, str_list: list[str]) -> bool:
+ if case:
+ return s in str_list
+ else:
+ return s.lower() in [s.lower() for s in str_list]
+
+
+_T = TypeVar("_T")
+
+
+class CruValueTypeError(CruException):
+ def __init__(
+ self,
+ message: str,
+ value: Any,
+ value_type: ValueType | None,
+ *args,
+ **kwargs,
+ ):
+ super().__init__(
+ message,
+ *args,
+ **kwargs,
+ )
+ self._value = value
+ self._value_type = value_type
+
+ @property
+ def value(self) -> Any:
+ return self._value
+
+ @property
+ def value_type(self) -> ValueType | None:
+ return self._value_type
+
+
+class ValueType(Generic[_T], metaclass=ABCMeta):
+ def __init__(self, name: str, _type: type[_T]) -> None:
+ self._name = name
+ self._type = _type
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ @property
+ def type(self) -> type[_T]:
+ return self._type
+
+ def check_value_type(self, value: Any) -> None:
+ if not isinstance(value, self.type):
+ raise CruValueTypeError("Type of value is wrong.", value, self)
+
+ def _do_check_value(self, value: Any) -> _T:
+ return value
+
+ def check_value(self, value: Any) -> _T:
+ self.check_value_type(value)
+ return self._do_check_value(value)
+
+ @abstractmethod
+ def _do_check_str_format(self, s: str) -> None:
+ raise NotImplementedError()
+
+ def check_str_format(self, s: str) -> None:
+ if not isinstance(s, str):
+ raise CruValueTypeError("Try to check format on a non-str.", s, self)
+ self._do_check_str_format(s)
+
+ @abstractmethod
+ def _do_convert_value_to_str(self, value: _T) -> str:
+ raise NotImplementedError()
+
+ def convert_value_to_str(self, value: _T) -> str:
+ self.check_value(value)
+ return self._do_convert_value_to_str(value)
+
+ @abstractmethod
+ def _do_convert_str_to_value(self, s: str) -> _T:
+ raise NotImplementedError()
+
+ def convert_str_to_value(self, s: str) -> _T:
+ self.check_str_format(s)
+ return self._do_convert_str_to_value(s)
+
+ def check_value_or_try_convert_from_str(self, value_or_str: Any) -> _T:
+ try:
+ return self.check_value(value_or_str)
+ except CruValueTypeError:
+ if isinstance(value_or_str, str):
+ return self.convert_str_to_value(value_or_str)
+ else:
+ raise
+
+ def create_default_value(self) -> _T:
+ return self.type()
+
+
+class TextValueType(ValueType[str]):
+ def __init__(self) -> None:
+ super().__init__("text", str)
+
+ def _do_check_str_format(self, _s):
+ return
+
+ def _do_convert_value_to_str(self, value):
+ return value
+
+ def _do_convert_str_to_value(self, s):
+ return s
+
+
+class IntegerValueType(ValueType[int]):
+ def __init__(self) -> None:
+ super().__init__("integer", int)
+
+ def _do_check_str_format(self, s):
+ try:
+ int(s)
+ except ValueError as e:
+ raise CruValueTypeError("Invalid integer format.", s, self) from e
+
+ def _do_convert_value_to_str(self, value):
+ return str(value)
+
+ def _do_convert_str_to_value(self, s):
+ return int(s)
+
+
+class FloatValueType(ValueType[float]):
+ def __init__(self) -> None:
+ super().__init__("float", float)
+
+ def _do_check_str_format(self, s):
+ try:
+ float(s)
+ except ValueError as e:
+ raise CruValueTypeError("Invalid float format.", s, self) from e
+
+ def _do_convert_value_to_str(self, value):
+ return str(value)
+
+ def _do_convert_str_to_value(self, s):
+ return float(s)
+
+
+class BooleanValueType(ValueType[bool]):
+ DEFAULT_TRUE_LIST: ClassVar[list[str]] = ["true", "yes", "y", "on", "1"]
+ DEFAULT_FALSE_LIST: ClassVar[list[str]] = ["false", "no", "n", "off", "0"]
+
+ def __init__(
+ self,
+ *,
+ case_sensitive=False,
+ true_list: None | list[str] = None,
+ false_list: None | list[str] = None,
+ ) -> None:
+ super().__init__("boolean", bool)
+ self._case_sensitive = case_sensitive
+ self._valid_true_strs: list[str] = (
+ true_list or BooleanValueType.DEFAULT_TRUE_LIST
+ )
+ self._valid_false_strs: list[str] = (
+ false_list or BooleanValueType.DEFAULT_FALSE_LIST
+ )
+
+ @property
+ def case_sensitive(self) -> bool:
+ return self._case_sensitive
+
+ @property
+ def valid_true_strs(self) -> list[str]:
+ return self._valid_true_strs
+
+ @property
+ def valid_false_strs(self) -> list[str]:
+ return self._valid_false_strs
+
+ @property
+ def valid_boolean_strs(self) -> list[str]:
+ return self._valid_true_strs + self._valid_false_strs
+
+ def _do_check_str_format(self, s):
+ if not _str_case_in(s, self.case_sensitive, self.valid_boolean_strs):
+ raise CruValueTypeError("Invalid boolean format.", s, self)
+
+ def _do_convert_value_to_str(self, value):
+ return self._valid_true_strs[0] if value else self._valid_false_strs[0]
+
+ def _do_convert_str_to_value(self, s):
+ return _str_case_in(s, self.case_sensitive, self._valid_true_strs)
+
+ def create_default_value(self):
+ return self.valid_false_strs[0]
+
+
+class EnumValueType(ValueType[str]):
+ def __init__(self, valid_values: list[str], /, case_sensitive=False) -> None:
+ super().__init__(f"enum({'|'.join(valid_values)})", str)
+ self._case_sensitive = case_sensitive
+ self._valid_values = valid_values
+
+ @property
+ def case_sensitive(self) -> bool:
+ return self._case_sensitive
+
+ @property
+ def valid_values(self) -> list[str]:
+ return self._valid_values
+
+ def _do_check_value(self, value):
+ self._do_check_str_format(value)
+
+ def _do_check_str_format(self, s):
+ if not _str_case_in(s, self.case_sensitive, self.valid_values):
+ raise CruValueTypeError("Invalid enum value", s, self)
+
+ def _do_convert_value_to_str(self, value):
+ return value
+
+ def _do_convert_str_to_value(self, s):
+ return s
+
+ def create_default_value(self):
+ return self.valid_values[0]
+
+
+TEXT_VALUE_TYPE = TextValueType()
+INTEGER_VALUE_TYPE = IntegerValueType()
+BOOLEAN_VALUE_TYPE = BooleanValueType()
+
+
+class ValueGeneratorBase(Generic[_T], metaclass=ABCMeta):
+ @abstractmethod
+ def generate(self) -> _T:
+ raise NotImplementedError()
+
+ def __call__(self) -> _T:
+ return self.generate()
+
+
+class ValueGenerator(ValueGeneratorBase[_T]):
+ def __init__(self, generate_func: Callable[[], _T]) -> None:
+ self._generate_func = generate_func
+
+ @property
+ def generate_func(self) -> Callable[[], _T]:
+ return self._generate_func
+
+ def generate(self) -> _T:
+ return self._generate_func()
+
+
+class UuidValueGenerator(ValueGeneratorBase[str]):
+ def generate(self):
+ return str(uuid.uuid4())
+
+
+class RandomStringValueGenerator(ValueGeneratorBase[str]):
+ def __init__(self, length: int, secure: bool) -> None:
+ self._length = length
+ self._secure = secure
+
+ @property
+ def length(self) -> int:
+ return self._length
+
+ @property
+ def secure(self) -> bool:
+ return self._secure
+
+ def generate(self):
+ random_func = secrets.choice if self._secure else random.choice
+ characters = string.ascii_letters + string.digits
+ random_string = "".join(random_func(characters) for _ in range(self._length))
+ return random_string
+
+
+UUID_VALUE_GENERATOR = UuidValueGenerator()
diff --git a/tools/cru-py/poetry.lock b/tools/cru-py/poetry.lock
new file mode 100644
index 0000000..305aaee
--- /dev/null
+++ b/tools/cru-py/poetry.lock
@@ -0,0 +1,80 @@
+# This file is automatically @generated by Poetry 1.8.4 and should not be changed by hand.
+
+[[package]]
+name = "mypy"
+version = "1.14.0"
+description = "Optional static typing for Python"
+optional = false
+python-versions = ">=3.8"
+files = [
+ {file = "mypy-1.14.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e971c1c667007f9f2b397ffa80fa8e1e0adccff336e5e77e74cb5f22868bee87"},
+ {file = "mypy-1.14.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:e86aaeaa3221a278c66d3d673b297232947d873773d61ca3ee0e28b2ff027179"},
+ {file = "mypy-1.14.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1628c5c3ce823d296e41e2984ff88c5861499041cb416a8809615d0c1f41740e"},
+ {file = "mypy-1.14.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:7fadb29b77fc14a0dd81304ed73c828c3e5cde0016c7e668a86a3e0dfc9f3af3"},
+ {file = "mypy-1.14.0-cp310-cp310-win_amd64.whl", hash = "sha256:3fa76988dc760da377c1e5069200a50d9eaaccf34f4ea18428a3337034ab5a44"},
+ {file = "mypy-1.14.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:6e73c8a154eed31db3445fe28f63ad2d97b674b911c00191416cf7f6459fd49a"},
+ {file = "mypy-1.14.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:273e70fcb2e38c5405a188425aa60b984ffdcef65d6c746ea5813024b68c73dc"},
+ {file = "mypy-1.14.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1daca283d732943731a6a9f20fdbcaa927f160bc51602b1d4ef880a6fb252015"},
+ {file = "mypy-1.14.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:7e68047bedb04c1c25bba9901ea46ff60d5eaac2d71b1f2161f33107e2b368eb"},
+ {file = "mypy-1.14.0-cp311-cp311-win_amd64.whl", hash = "sha256:7a52f26b9c9b1664a60d87675f3bae00b5c7f2806e0c2800545a32c325920bcc"},
+ {file = "mypy-1.14.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:d5326ab70a6db8e856d59ad4cb72741124950cbbf32e7b70e30166ba7bbf61dd"},
+ {file = "mypy-1.14.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:bf4ec4980bec1e0e24e5075f449d014011527ae0055884c7e3abc6a99cd2c7f1"},
+ {file = "mypy-1.14.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:390dfb898239c25289495500f12fa73aa7f24a4c6d90ccdc165762462b998d63"},
+ {file = "mypy-1.14.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:7e026d55ddcd76e29e87865c08cbe2d0104e2b3153a523c529de584759379d3d"},
+ {file = "mypy-1.14.0-cp312-cp312-win_amd64.whl", hash = "sha256:585ed36031d0b3ee362e5107ef449a8b5dfd4e9c90ccbe36414ee405ee6b32ba"},
+ {file = "mypy-1.14.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:e9f6f4c0b27401d14c483c622bc5105eff3911634d576bbdf6695b9a7c1ba741"},
+ {file = "mypy-1.14.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:56b2280cedcb312c7a79f5001ae5325582d0d339bce684e4a529069d0e7ca1e7"},
+ {file = "mypy-1.14.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:342de51c48bab326bfc77ce056ba08c076d82ce4f5a86621f972ed39970f94d8"},
+ {file = "mypy-1.14.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:00df23b42e533e02a6f0055e54de9a6ed491cd8b7ea738647364fd3a39ea7efc"},
+ {file = "mypy-1.14.0-cp313-cp313-win_amd64.whl", hash = "sha256:e8c8387e5d9dff80e7daf961df357c80e694e942d9755f3ad77d69b0957b8e3f"},
+ {file = "mypy-1.14.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:0b16738b1d80ec4334654e89e798eb705ac0c36c8a5c4798496cd3623aa02286"},
+ {file = "mypy-1.14.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:10065fcebb7c66df04b05fc799a854b1ae24d9963c8bb27e9064a9bdb43aa8ad"},
+ {file = "mypy-1.14.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:fbb7d683fa6bdecaa106e8368aa973ecc0ddb79a9eaeb4b821591ecd07e9e03c"},
+ {file = "mypy-1.14.0-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:3498cb55448dc5533e438cd13d6ddd28654559c8c4d1fd4b5ca57a31b81bac01"},
+ {file = "mypy-1.14.0-cp38-cp38-win_amd64.whl", hash = "sha256:c7b243408ea43755f3a21a0a08e5c5ae30eddb4c58a80f415ca6b118816e60aa"},
+ {file = "mypy-1.14.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:14117b9da3305b39860d0aa34b8f1ff74d209a368829a584eb77524389a9c13e"},
+ {file = "mypy-1.14.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:af98c5a958f9c37404bd4eef2f920b94874507e146ed6ee559f185b8809c44cc"},
+ {file = "mypy-1.14.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:f0b343a1d3989547024377c2ba0dca9c74a2428ad6ed24283c213af8dbb0710b"},
+ {file = "mypy-1.14.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:cdb5563c1726c85fb201be383168f8c866032db95e1095600806625b3a648cb7"},
+ {file = "mypy-1.14.0-cp39-cp39-win_amd64.whl", hash = "sha256:74e925649c1ee0a79aa7448baf2668d81cc287dc5782cff6a04ee93f40fb8d3f"},
+ {file = "mypy-1.14.0-py3-none-any.whl", hash = "sha256:2238d7f93fc4027ed1efc944507683df3ba406445a2b6c96e79666a045aadfab"},
+ {file = "mypy-1.14.0.tar.gz", hash = "sha256:822dbd184d4a9804df5a7d5335a68cf7662930e70b8c1bc976645d1509f9a9d6"},
+]
+
+[package.dependencies]
+mypy_extensions = ">=1.0.0"
+typing_extensions = ">=4.6.0"
+
+[package.extras]
+dmypy = ["psutil (>=4.0)"]
+faster-cache = ["orjson"]
+install-types = ["pip"]
+mypyc = ["setuptools (>=50)"]
+reports = ["lxml"]
+
+[[package]]
+name = "mypy-extensions"
+version = "1.0.0"
+description = "Type system extensions for programs checked with the mypy type checker."
+optional = false
+python-versions = ">=3.5"
+files = [
+ {file = "mypy_extensions-1.0.0-py3-none-any.whl", hash = "sha256:4392f6c0eb8a5668a69e23d168ffa70f0be9ccfd32b5cc2d26a34ae5b844552d"},
+ {file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"},
+]
+
+[[package]]
+name = "typing-extensions"
+version = "4.12.2"
+description = "Backported and Experimental Type Hints for Python 3.8+"
+optional = false
+python-versions = ">=3.8"
+files = [
+ {file = "typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d"},
+ {file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"},
+]
+
+[metadata]
+lock-version = "2.0"
+python-versions = "^3.11"
+content-hash = "34a84c9f444021c048be3a70dbb3246bb73c4e7e8f0cc980b8050debcf21a6f9"
diff --git a/tools/cru-py/pyproject.toml b/tools/cru-py/pyproject.toml
new file mode 100644
index 0000000..e5e7f09
--- /dev/null
+++ b/tools/cru-py/pyproject.toml
@@ -0,0 +1,26 @@
+[project]
+name = "cru-py"
+version = "0.1.0"
+requires-python = ">=3.11"
+
+[tool.poetry]
+package-mode = false
+name = "cru"
+version = "0.1.0"
+description = ""
+authors = ["Yuqian Yang <crupest@crupest.life>"]
+license = "MIT"
+readme = "README.md"
+
+[tool.poetry.dependencies]
+python = "^3.11"
+
+[tool.poetry.group.dev.dependencies]
+mypy = "^1.13.0"
+
+[tool.ruff.lint]
+select = ["E", "F", "B"]
+
+[build-system]
+requires = ["poetry-core"]
+build-backend = "poetry.core.masonry.api"
diff --git a/tools/cru-py/www-dev b/tools/cru-py/www-dev
new file mode 100644
index 0000000..f56d679
--- /dev/null
+++ b/tools/cru-py/www-dev
@@ -0,0 +1,8 @@
+#! /usr/bin/env sh
+
+set -e
+
+cd "$(dirname "$0")/../.."
+
+exec tmux new-session 'cd docker/crupest-nginx/sites/www && pnpm start' \; \
+ split-window -h 'cd docker/crupest-api/CrupestApi/CrupestApi && dotnet run --launch-profile dev'