aboutsummaryrefslogtreecommitdiff
path: root/tools
diff options
context:
space:
mode:
Diffstat (limited to 'tools')
-rw-r--r--tools/cru-py/.gitignore3
-rw-r--r--tools/cru-py/.python-version1
-rw-r--r--tools/cru-py/cru/__init__.py60
-rw-r--r--tools/cru-py/cru/_base.py101
-rw-r--r--tools/cru-py/cru/_const.py49
-rw-r--r--tools/cru-py/cru/_decorator.py97
-rw-r--r--tools/cru-py/cru/_error.py89
-rw-r--r--tools/cru-py/cru/_event.py61
-rw-r--r--tools/cru-py/cru/_func.py172
-rw-r--r--tools/cru-py/cru/_helper.py16
-rw-r--r--tools/cru-py/cru/_iter.py469
-rw-r--r--tools/cru-py/cru/_type.py52
-rw-r--r--tools/cru-py/cru/attr.py364
-rw-r--r--tools/cru-py/cru/config.py196
-rw-r--r--tools/cru-py/cru/list.py160
-rw-r--r--tools/cru-py/cru/parsing.py290
-rw-r--r--tools/cru-py/cru/service/__init__.py0
-rw-r--r--tools/cru-py/cru/service/__main__.py20
-rw-r--r--tools/cru-py/cru/service/_app.py34
-rw-r--r--tools/cru-py/cru/service/_base.py449
-rw-r--r--tools/cru-py/cru/service/_config.py444
-rw-r--r--tools/cru-py/cru/service/_external.py81
-rw-r--r--tools/cru-py/cru/service/_nginx.py268
-rw-r--r--tools/cru-py/cru/service/_template.py90
-rw-r--r--tools/cru-py/cru/system.py23
-rw-r--r--tools/cru-py/cru/template.py207
-rw-r--r--tools/cru-py/cru/tool.py82
-rw-r--r--tools/cru-py/cru/value.py292
-rw-r--r--tools/cru-py/poetry.lock111
-rw-r--r--tools/cru-py/pyproject.toml27
-rw-r--r--tools/cru-py/www-dev8
-rwxr-xr-xtools/manage16
-rw-r--r--tools/manage.cmd15
-rwxr-xr-xtools/update-blog5
34 files changed, 0 insertions, 4352 deletions
diff --git a/tools/cru-py/.gitignore b/tools/cru-py/.gitignore
deleted file mode 100644
index f5833b1..0000000
--- a/tools/cru-py/.gitignore
+++ /dev/null
@@ -1,3 +0,0 @@
-__pycache__
-.venv
-.mypy_cache
diff --git a/tools/cru-py/.python-version b/tools/cru-py/.python-version
deleted file mode 100644
index 2c07333..0000000
--- a/tools/cru-py/.python-version
+++ /dev/null
@@ -1 +0,0 @@
-3.11
diff --git a/tools/cru-py/cru/__init__.py b/tools/cru-py/cru/__init__.py
deleted file mode 100644
index 17799a9..0000000
--- a/tools/cru-py/cru/__init__.py
+++ /dev/null
@@ -1,60 +0,0 @@
-import sys
-
-from ._base import CRU, CruNamespaceError, CRU_NAME_PREFIXES
-from ._error import (
- CruException,
- CruLogicError,
- CruInternalError,
- CruUnreachableError,
- cru_unreachable,
-)
-from ._const import (
- CruConstantBase,
- CruDontChange,
- CruNotFound,
- CruNoValue,
- CruPlaceholder,
- CruUseDefault,
-)
-from ._func import CruFunction
-from ._iter import CruIterable, CruIterator
-from ._event import CruEvent, CruEventHandlerToken
-from ._type import CruTypeSet, CruTypeCheckError
-
-
-class CruInitError(CruException):
- pass
-
-
-def check_python_version(required_version=(3, 11)):
- if sys.version_info < required_version:
- raise CruInitError(f"Python version must be >= {required_version}!")
-
-
-check_python_version()
-
-__all__ = [
- "CRU",
- "CruNamespaceError",
- "CRU_NAME_PREFIXES",
- "check_python_version",
- "CruException",
- "CruInternalError",
- "CruLogicError",
- "CruUnreachableError",
- "cru_unreachable",
- "CruInitError",
- "CruConstantBase",
- "CruDontChange",
- "CruNotFound",
- "CruNoValue",
- "CruPlaceholder",
- "CruUseDefault",
- "CruFunction",
- "CruIterable",
- "CruIterator",
- "CruEvent",
- "CruEventHandlerToken",
- "CruTypeSet",
- "CruTypeCheckError",
-]
diff --git a/tools/cru-py/cru/_base.py b/tools/cru-py/cru/_base.py
deleted file mode 100644
index 2599d8f..0000000
--- a/tools/cru-py/cru/_base.py
+++ /dev/null
@@ -1,101 +0,0 @@
-from typing import Any
-
-from ._helper import remove_none
-from ._error import CruException
-
-
-class CruNamespaceError(CruException):
- """Raised when a namespace is not found."""
-
-
-class _Cru:
- NAME_PREFIXES = ("CRU_", "Cru", "cru_")
-
- def __init__(self) -> None:
- self._d: dict[str, Any] = {}
-
- def all_names(self) -> list[str]:
- return list(self._d.keys())
-
- def get(self, name: str) -> Any:
- return self._d[name]
-
- def has_name(self, name: str) -> bool:
- return name in self._d
-
- @staticmethod
- def _maybe_remove_prefix(name: str) -> str | None:
- for prefix in _Cru.NAME_PREFIXES:
- if name.startswith(prefix):
- return name[len(prefix) :]
- return None
-
- def _check_name_exist(self, *names: str | None) -> None:
- for name in names:
- if name is None:
- continue
- if self.has_name(name):
- raise CruNamespaceError(f"Name {name} exists in CRU.")
-
- @staticmethod
- def check_name_format(name: str) -> tuple[str, str]:
- no_prefix_name = _Cru._maybe_remove_prefix(name)
- if no_prefix_name is None:
- raise CruNamespaceError(
- f"Name {name} is not prefixed with any of {_Cru.NAME_PREFIXES}."
- )
- return name, no_prefix_name
-
- @staticmethod
- def _check_object_name(o) -> tuple[str, str]:
- return _Cru.check_name_format(o.__name__)
-
- def _do_add(self, o, *names: str | None) -> list[str]:
- name_list: list[str] = remove_none(names)
- for name in name_list:
- self._d[name] = o
- return name_list
-
- def add(self, o, name: str | None) -> tuple[str, str | None]:
- no_prefix_name: str | None
- if name is None:
- name, no_prefix_name = self._check_object_name(o)
- else:
- no_prefix_name = self._maybe_remove_prefix(name)
-
- self._check_name_exist(name, no_prefix_name)
- self._do_add(o, name, no_prefix_name)
- return name, no_prefix_name
-
- def add_with_alias(self, o, name: str | None = None, *aliases: str) -> list[str]:
- final_names: list[str | None] = []
- no_prefix_name: str | None
- if name is None:
- name, no_prefix_name = self._check_object_name(o)
- self._check_name_exist(name, no_prefix_name)
- final_names.extend([name, no_prefix_name])
- for alias in aliases:
- no_prefix_name = self._maybe_remove_prefix(alias)
- self._check_name_exist(alias, no_prefix_name)
- final_names.extend([alias, no_prefix_name])
-
- return self._do_add(o, *final_names)
-
- def add_objects(self, *objects):
- final_list = []
- for o in objects:
- name, no_prefix_name = self._check_object_name(o)
- self._check_name_exist(name, no_prefix_name)
- final_list.append((o, name, no_prefix_name))
- for o, name, no_prefix_name in final_list:
- self._do_add(o, name, no_prefix_name)
-
- def __getitem__(self, item):
- return self.get(item)
-
- def __getattr__(self, item):
- return self.get(item)
-
-
-CRU_NAME_PREFIXES = _Cru.NAME_PREFIXES
-CRU = _Cru()
diff --git a/tools/cru-py/cru/_const.py b/tools/cru-py/cru/_const.py
deleted file mode 100644
index 8246b35..0000000
--- a/tools/cru-py/cru/_const.py
+++ /dev/null
@@ -1,49 +0,0 @@
-from enum import Enum, auto
-from typing import Self, TypeGuard, TypeVar
-
-from ._base import CRU
-
-_T = TypeVar("_T")
-
-
-class CruConstantBase(Enum):
- @classmethod
- def check(cls, v: _T | Self) -> TypeGuard[Self]:
- return isinstance(v, cls)
-
- @classmethod
- def check_not(cls, v: _T | Self) -> TypeGuard[_T]:
- return not cls.check(v)
-
- @classmethod
- def value(cls) -> Self:
- return cls.VALUE # type: ignore
-
-
-class CruNotFound(CruConstantBase):
- VALUE = auto()
-
-
-class CruUseDefault(CruConstantBase):
- VALUE = auto()
-
-
-class CruDontChange(CruConstantBase):
- VALUE = auto()
-
-
-class CruNoValue(CruConstantBase):
- VALUE = auto()
-
-
-class CruPlaceholder(CruConstantBase):
- VALUE = auto()
-
-
-CRU.add_objects(
- CruNotFound,
- CruUseDefault,
- CruDontChange,
- CruNoValue,
- CruPlaceholder,
-)
diff --git a/tools/cru-py/cru/_decorator.py b/tools/cru-py/cru/_decorator.py
deleted file mode 100644
index 137fc05..0000000
--- a/tools/cru-py/cru/_decorator.py
+++ /dev/null
@@ -1,97 +0,0 @@
-from __future__ import annotations
-
-from collections.abc import Callable
-from typing import (
- Concatenate,
- Generic,
- ParamSpec,
- TypeVar,
- cast,
-)
-
-from ._base import CRU
-
-_P = ParamSpec("_P")
-_T = TypeVar("_T")
-_O = TypeVar("_O")
-_R = TypeVar("_R")
-
-
-class CruDecorator:
-
- class ConvertResult(Generic[_T, _O]):
- def __init__(
- self,
- converter: Callable[[_T], _O],
- ) -> None:
- self.converter = converter
-
- def __call__(self, origin: Callable[_P, _T]) -> Callable[_P, _O]:
- converter = self.converter
-
- def real_impl(*args: _P.args, **kwargs: _P.kwargs) -> _O:
- return converter(origin(*args, **kwargs))
-
- return real_impl
-
- class ImplementedBy(Generic[_T, _O, _P, _R]):
- def __init__(
- self,
- impl: Callable[Concatenate[_O, _P], _R],
- converter: Callable[[_T], _O],
- ) -> None:
- self.impl = impl
- self.converter = converter
-
- def __call__(
- self, _origin: Callable[[_T], None]
- ) -> Callable[Concatenate[_T, _P], _R]:
- converter = self.converter
- impl = self.impl
-
- def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R:
- return cast(Callable[Concatenate[_O, _P], _R], impl)(
- converter(_self), *args, **kwargs
- )
-
- return real_impl
-
- @staticmethod
- def create_factory(converter: Callable[[_T], _O]) -> Callable[
- [Callable[Concatenate[_O, _P], _R]],
- CruDecorator.ImplementedBy[_T, _O, _P, _R],
- ]:
- def create(
- m: Callable[Concatenate[_O, _P], _R],
- ) -> CruDecorator.ImplementedBy[_T, _O, _P, _R]:
- return CruDecorator.ImplementedBy(m, converter)
-
- return create
-
- class ImplementedByNoSelf(Generic[_P, _R]):
- def __init__(self, impl: Callable[_P, _R]) -> None:
- self.impl = impl
-
- def __call__(
- self, _origin: Callable[[_T], None]
- ) -> Callable[Concatenate[_T, _P], _R]:
- impl = self.impl
-
- def real_impl(_self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R:
- return cast(Callable[_P, _R], impl)(*args, **kwargs)
-
- return real_impl
-
- @staticmethod
- def create_factory() -> (
- Callable[[Callable[_P, _R]], CruDecorator.ImplementedByNoSelf[_P, _R]]
- ):
- def create(
- m: Callable[_P, _R],
- ) -> CruDecorator.ImplementedByNoSelf[_P, _R]:
- return CruDecorator.ImplementedByNoSelf(m)
-
- return create
-
-
-CRU.add_objects(CruDecorator)
diff --git a/tools/cru-py/cru/_error.py b/tools/cru-py/cru/_error.py
deleted file mode 100644
index e53c787..0000000
--- a/tools/cru-py/cru/_error.py
+++ /dev/null
@@ -1,89 +0,0 @@
-from __future__ import annotations
-
-from typing import NoReturn, cast, overload
-
-
-class CruException(Exception):
- """Base exception class of all exceptions in cru."""
-
- @overload
- def __init__(
- self,
- message: None = None,
- *args,
- user_message: str,
- **kwargs,
- ): ...
-
- @overload
- def __init__(
- self,
- message: str,
- *args,
- user_message: str | None = None,
- **kwargs,
- ): ...
-
- def __init__(
- self,
- message: str | None = None,
- *args,
- user_message: str | None = None,
- **kwargs,
- ):
- if message is None:
- message = user_message
-
- super().__init__(
- message,
- *args,
- **kwargs,
- )
- self._message: str
- self._message = cast(str, message)
- self._user_message = user_message
-
- @property
- def message(self) -> str:
- return self._message
-
- def get_user_message(self) -> str | None:
- return self._user_message
-
- def get_message(self, use_user: bool = True) -> str:
- if use_user and self._user_message is not None:
- return self._user_message
- else:
- return self._message
-
- @property
- def is_internal(self) -> bool:
- return False
-
- @property
- def is_logic_error(self) -> bool:
- return False
-
-
-class CruLogicError(CruException):
- """Raised when a logic error occurs."""
-
- @property
- def is_logic_error(self) -> bool:
- return True
-
-
-class CruInternalError(CruException):
- """Raised when an internal error occurs."""
-
- @property
- def is_internal(self) -> bool:
- return True
-
-
-class CruUnreachableError(CruInternalError):
- """Raised when a code path is unreachable."""
-
-
-def cru_unreachable() -> NoReturn:
- raise CruUnreachableError("Code should not reach here!")
diff --git a/tools/cru-py/cru/_event.py b/tools/cru-py/cru/_event.py
deleted file mode 100644
index 51a794c..0000000
--- a/tools/cru-py/cru/_event.py
+++ /dev/null
@@ -1,61 +0,0 @@
-from __future__ import annotations
-
-from collections.abc import Callable
-from typing import Generic, ParamSpec, TypeVar
-
-from .list import CruList
-
-_P = ParamSpec("_P")
-_R = TypeVar("_R")
-
-
-class CruEventHandlerToken(Generic[_P, _R]):
- def __init__(
- self, event: CruEvent, handler: Callable[_P, _R], once: bool = False
- ) -> None:
- self._event = event
- self._handler = handler
- self._once = once
-
- @property
- def event(self) -> CruEvent:
- return self._event
-
- @property
- def handler(self) -> Callable[_P, _R]:
- return self._handler
-
- @property
- def once(self) -> bool:
- return self._once
-
-
-class CruEvent(Generic[_P, _R]):
- def __init__(self, name: str) -> None:
- self._name = name
- self._tokens: CruList[CruEventHandlerToken] = CruList()
-
- def register(
- self, handler: Callable[_P, _R], once: bool = False
- ) -> CruEventHandlerToken:
- token = CruEventHandlerToken(self, handler, once)
- self._tokens.append(token)
- return token
-
- def unregister(self, *handlers: CruEventHandlerToken | Callable[_P, _R]) -> int:
- old_length = len(self._tokens)
- self._tokens.reset(
- self._tokens.as_cru_iterator().filter(
- (lambda t: t in handlers or t.handler in handlers)
- )
- )
- return old_length - len(self._tokens)
-
- def trigger(self, *args: _P.args, **kwargs: _P.kwargs) -> CruList[_R]:
- results = CruList(
- self._tokens.as_cru_iterator()
- .transform(lambda t: t.handler(*args, **kwargs))
- .to_list()
- )
- self._tokens.reset(self._tokens.as_cru_iterator().filter(lambda t: not t.once))
- return results
diff --git a/tools/cru-py/cru/_func.py b/tools/cru-py/cru/_func.py
deleted file mode 100644
index fc57802..0000000
--- a/tools/cru-py/cru/_func.py
+++ /dev/null
@@ -1,172 +0,0 @@
-from __future__ import annotations
-
-from collections.abc import Callable, Iterable
-from enum import Flag, auto
-from typing import (
- Any,
- Generic,
- Literal,
- ParamSpec,
- TypeAlias,
- TypeVar,
-)
-
-
-from ._base import CRU
-from ._const import CruPlaceholder
-
-_P = ParamSpec("_P")
-_P1 = ParamSpec("_P1")
-_T = TypeVar("_T")
-
-
-class _Dec:
- @staticmethod
- def wrap(
- origin: Callable[_P, Callable[_P1, _T]]
- ) -> Callable[_P, _Wrapper[_P1, _T]]:
- def _wrapped(*args: _P.args, **kwargs: _P.kwargs) -> _Wrapper[_P1, _T]:
- return _Wrapper(origin(*args, **kwargs))
-
- return _wrapped
-
-
-class _RawBase:
- @staticmethod
- def none(*_v, **_kwargs) -> None:
- return None
-
- @staticmethod
- def true(*_v, **_kwargs) -> Literal[True]:
- return True
-
- @staticmethod
- def false(*_v, **_kwargs) -> Literal[False]:
- return False
-
- @staticmethod
- def identity(v: _T) -> _T:
- return v
-
- @staticmethod
- def only_you(v: _T, *_v, **_kwargs) -> _T:
- return v
-
- @staticmethod
- def equal(a: Any, b: Any) -> bool:
- return a == b
-
- @staticmethod
- def not_equal(a: Any, b: Any) -> bool:
- return a != b
-
- @staticmethod
- def not_(v: Any) -> Any:
- return not v
-
-
-class _Wrapper(Generic[_P, _T]):
- def __init__(self, f: Callable[_P, _T]):
- self._f = f
-
- @property
- def me(self) -> Callable[_P, _T]:
- return self._f
-
- def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T:
- return self._f(*args, **kwargs)
-
- @_Dec.wrap
- def bind(self, *bind_args, **bind_kwargs) -> Callable[..., _T]:
- func = self.me
-
- def bound_func(*args, **kwargs):
- popped = 0
- real_args = []
- for arg in bind_args:
- if CruPlaceholder.check(arg):
- real_args.append(args[popped])
- popped += 1
- else:
- real_args.append(arg)
- real_args.extend(args[popped:])
- return func(*real_args, **(bind_kwargs | kwargs))
-
- return bound_func
-
- class ChainMode(Flag):
- ARGS = auto()
- KWARGS = auto()
- BOTH = ARGS | KWARGS
-
- ArgsChainableCallable: TypeAlias = Callable[..., Iterable[Any]]
- KwargsChainableCallable: TypeAlias = Callable[..., Iterable[tuple[str, Any]]]
- ChainableCallable: TypeAlias = Callable[
- ..., tuple[Iterable[Any], Iterable[tuple[str, Any]]]
- ]
-
- @_Dec.wrap
- def chain_with_args(
- self, funcs: Iterable[ArgsChainableCallable], *bind_args, **bind_kwargs
- ) -> ArgsChainableCallable:
- def chained_func(*args):
- args = self.bind(*bind_args, **bind_kwargs)(*args)
-
- for func in funcs:
- args = _Wrapper(func).bind(*bind_args, **bind_kwargs)(*args)
- return args
-
- return chained_func
-
- @_Dec.wrap
- def chain_with_kwargs(
- self, funcs: Iterable[KwargsChainableCallable], *bind_args, **bind_kwargs
- ) -> KwargsChainableCallable:
- def chained_func(**kwargs):
- kwargs = self.bind(*bind_args, **bind_kwargs)(**kwargs)
- for func in funcs:
- kwargs = _Wrapper(func).bind(func, *bind_args, **bind_kwargs)(**kwargs)
- return kwargs
-
- return chained_func
-
- @_Dec.wrap
- def chain_with_both(
- self, funcs: Iterable[ChainableCallable], *bind_args, **bind_kwargs
- ) -> ChainableCallable:
- def chained_func(*args, **kwargs):
- for func in funcs:
- args, kwargs = _Wrapper(func).bind(func, *bind_args, **bind_kwargs)(
- *args, **kwargs
- )
- return args, kwargs
-
- return chained_func
-
-
-class _Base:
- none = _Wrapper(_RawBase.none)
- true = _Wrapper(_RawBase.true)
- false = _Wrapper(_RawBase.false)
- identity = _Wrapper(_RawBase.identity)
- only_you = _Wrapper(_RawBase.only_you)
- equal = _Wrapper(_RawBase.equal)
- not_equal = _Wrapper(_RawBase.not_equal)
- not_ = _Wrapper(_RawBase.not_)
-
-
-class _Creators:
- @staticmethod
- def make_isinstance_of_types(*types: type) -> Callable:
- return _Wrapper(lambda v: type(v) in types)
-
-
-class CruFunction:
- RawBase: TypeAlias = _RawBase
- Base: TypeAlias = _Base
- Creators: TypeAlias = _Creators
- Wrapper: TypeAlias = _Wrapper
- Decorators: TypeAlias = _Dec
-
-
-CRU.add_objects(CruFunction)
diff --git a/tools/cru-py/cru/_helper.py b/tools/cru-py/cru/_helper.py
deleted file mode 100644
index 43baf46..0000000
--- a/tools/cru-py/cru/_helper.py
+++ /dev/null
@@ -1,16 +0,0 @@
-from collections.abc import Callable
-from typing import Any, Iterable, TypeVar, cast
-
-_T = TypeVar("_T")
-_D = TypeVar("_D")
-
-
-def remove_element(
- iterable: Iterable[_T | None], to_rm: Iterable[Any], des: type[_D] | None = None
-) -> _D:
- to_rm = set(to_rm)
- return cast(Callable[..., _D], des or list)(v for v in iterable if v not in to_rm)
-
-
-def remove_none(iterable: Iterable[_T | None], des: type[_D] | None = None) -> _D:
- return cast(Callable[..., _D], des or list)(v for v in iterable if v is not None)
diff --git a/tools/cru-py/cru/_iter.py b/tools/cru-py/cru/_iter.py
deleted file mode 100644
index f9683ca..0000000
--- a/tools/cru-py/cru/_iter.py
+++ /dev/null
@@ -1,469 +0,0 @@
-from __future__ import annotations
-
-from collections.abc import Iterable, Callable, Generator, Iterator
-from dataclasses import dataclass
-from enum import Enum
-from typing import (
- Concatenate,
- Literal,
- Never,
- Self,
- TypeAlias,
- TypeVar,
- ParamSpec,
- Any,
- Generic,
- cast,
-)
-
-from ._base import CRU
-from ._const import CruNotFound
-from ._error import cru_unreachable
-
-_P = ParamSpec("_P")
-_T = TypeVar("_T")
-_O = TypeVar("_O")
-_V = TypeVar("_V")
-_R = TypeVar("_R")
-
-
-class _Generic:
- class StepActionKind(Enum):
- SKIP = 0
- PUSH = 1
- STOP = 2
- AGGREGATE = 3
-
- @dataclass
- class StepAction(Generic[_V, _R]):
- value: Iterable[Self] | _V | _R | None
- kind: _Generic.StepActionKind
-
- @property
- def push_value(self) -> _V:
- assert self.kind == _Generic.StepActionKind.PUSH
- return cast(_V, self.value)
-
- @property
- def stop_value(self) -> _R:
- assert self.kind == _Generic.StepActionKind.STOP
- return cast(_R, self.value)
-
- @staticmethod
- def skip() -> _Generic.StepAction[_V, _R]:
- return _Generic.StepAction(None, _Generic.StepActionKind.SKIP)
-
- @staticmethod
- def push(value: _V | None) -> _Generic.StepAction[_V, _R]:
- return _Generic.StepAction(value, _Generic.StepActionKind.PUSH)
-
- @staticmethod
- def stop(value: _R | None = None) -> _Generic.StepAction[_V, _R]:
- return _Generic.StepAction(value, _Generic.StepActionKind.STOP)
-
- @staticmethod
- def aggregate(
- *results: _Generic.StepAction[_V, _R],
- ) -> _Generic.StepAction[_V, _R]:
- return _Generic.StepAction(results, _Generic.StepActionKind.AGGREGATE)
-
- @staticmethod
- def push_last(value: _V | None) -> _Generic.StepAction[_V, _R]:
- return _Generic.StepAction.aggregate(
- _Generic.StepAction.push(value), _Generic.StepAction.stop()
- )
-
- def flatten(self) -> Iterable[Self]:
- return _Generic.flatten(
- self,
- is_leave=lambda r: r.kind != _Generic.StepActionKind.AGGREGATE,
- get_children=lambda r: cast(Iterable[Self], r.value),
- )
-
- GeneralStepAction: TypeAlias = StepAction[_V, _R] | _V | _R | None
- IterateOperation: TypeAlias = Callable[[_T, int], GeneralStepAction[_V, _R]]
- IteratePreHook: TypeAlias = Callable[[Iterable[_T]], GeneralStepAction[_V, _R]]
- IteratePostHook: TypeAlias = Callable[[int], GeneralStepAction[_V, _R]]
-
- @staticmethod
- def _is_not_iterable(o: Any) -> bool:
- return not isinstance(o, Iterable)
-
- @staticmethod
- def _return_self(o):
- return o
-
- @staticmethod
- def iterable_flatten(
- maybe_iterable: Iterable[_T] | _T, max_depth: int = -1, *, _depth: int = 0
- ) -> Iterable[Iterable[_T] | _T]:
- if _depth == max_depth or not isinstance(maybe_iterable, Iterable):
- yield maybe_iterable
- return
-
- for child in maybe_iterable:
- yield from _Generic.iterable_flatten(
- child,
- max_depth,
- _depth=_depth + 1,
- )
-
- @staticmethod
- def flatten(
- o: _O,
- max_depth: int = -1,
- /,
- is_leave: CruIterator.ElementPredicate[_O] = _is_not_iterable,
- get_children: CruIterator.ElementTransformer[_O, Iterable[_O]] = _return_self,
- *,
- _depth: int = 0,
- ) -> Iterable[_O]:
- if _depth == max_depth or is_leave(o):
- yield o
- return
- for child in get_children(o):
- yield from _Generic.flatten(
- child,
- max_depth,
- is_leave,
- get_children,
- _depth=_depth + 1,
- )
-
- class Results:
- @staticmethod
- def true(_) -> Literal[True]:
- return True
-
- @staticmethod
- def false(_) -> Literal[False]:
- return False
-
- @staticmethod
- def not_found(_) -> Literal[CruNotFound.VALUE]:
- return CruNotFound.VALUE
-
- @staticmethod
- def _non_result_to_push(value: Any) -> StepAction[_V, _R]:
- return _Generic.StepAction.push(value)
-
- @staticmethod
- def _non_result_to_stop(value: Any) -> StepAction[_V, _R]:
- return _Generic.StepAction.stop(value)
-
- @staticmethod
- def _none_hook(_: Any) -> StepAction[_V, _R]:
- return _Generic.StepAction.skip()
-
- def iterate(
- iterable: Iterable[_T],
- operation: IterateOperation[_T, _V, _R],
- fallback_return: _R,
- pre_iterate: IteratePreHook[_T, _V, _R],
- post_iterate: IteratePostHook[_V, _R],
- convert_value_result: Callable[[_V | _R | None], StepAction[_V, _R]],
- ) -> Generator[_V, None, _R]:
- pre_result = pre_iterate(iterable)
- if not isinstance(pre_result, _Generic.StepAction):
- real_pre_result = convert_value_result(pre_result)
- for r in real_pre_result.flatten():
- if r.kind == _Generic.StepActionKind.STOP:
- return r.stop_value
- elif r.kind == _Generic.StepActionKind.PUSH:
- yield r.push_value
- else:
- assert r.kind == _Generic.StepActionKind.SKIP
-
- for index, element in enumerate(iterable):
- result = operation(element, index)
- if not isinstance(result, _Generic.StepAction):
- real_result = convert_value_result(result)
- for r in real_result.flatten():
- if r.kind == _Generic.StepActionKind.STOP:
- return r.stop_value
- elif r.kind == _Generic.StepActionKind.PUSH:
- yield r.push_value
- else:
- assert r.kind == _Generic.StepActionKind.SKIP
- continue
-
- post_result = post_iterate(index + 1)
- if not isinstance(post_result, _Generic.StepAction):
- real_post_result = convert_value_result(post_result)
- for r in real_post_result.flatten():
- if r.kind == _Generic.StepActionKind.STOP:
- return r.stop_value
- elif r.kind == _Generic.StepActionKind.PUSH:
- yield r.push_value
- else:
- assert r.kind == _Generic.StepActionKind.SKIP
-
- return fallback_return
-
- def create_new(
- iterable: Iterable[_T],
- operation: IterateOperation[_T, _V, _R],
- fallback_return: _R,
- /,
- pre_iterate: IteratePreHook[_T, _V, _R] | None = None,
- post_iterate: IteratePostHook[_V, _R] | None = None,
- ) -> Generator[_V, None, _R]:
- return _Generic.iterate(
- iterable,
- operation,
- fallback_return,
- pre_iterate or _Generic._none_hook,
- post_iterate or _Generic._none_hook,
- _Generic._non_result_to_push,
- )
-
- def get_result(
- iterable: Iterable[_T],
- operation: IterateOperation[_T, _V, _R],
- fallback_return: _R,
- /,
- pre_iterate: IteratePreHook[_T, _V, _R] | None = None,
- post_iterate: IteratePostHook[_V, _R] | None = None,
- ) -> _R:
- try:
- for _ in _Generic.iterate(
- iterable,
- operation,
- fallback_return,
- pre_iterate or _Generic._none_hook,
- post_iterate or _Generic._none_hook,
- _Generic._non_result_to_stop,
- ):
- pass
- except StopIteration as stop:
- return stop.value
- cru_unreachable()
-
-
-class _Helpers:
- @staticmethod
- def auto_count(c: Callable[Concatenate[int, _P], _O]) -> Callable[_P, _O]:
- count = 0
-
- def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _O:
- nonlocal count
- r = c(count, *args, **kwargs)
- count += 1
- return r
-
- return wrapper
-
-
-class _Creators:
- class Raw:
- @staticmethod
- def empty() -> Iterator[Never]:
- return iter([])
-
- @staticmethod
- def range(*args) -> Iterator[int]:
- return iter(range(*args))
-
- @staticmethod
- def unite(*args: _T) -> Iterator[_T]:
- return iter(args)
-
- @staticmethod
- def _concat(*iterables: Iterable[_T]) -> Iterable[_T]:
- for iterable in iterables:
- yield from iterable
-
- @staticmethod
- def concat(*iterables: Iterable[_T]) -> Iterator[_T]:
- return iter(_Creators.Raw._concat(*iterables))
-
- @staticmethod
- def _wrap(f: Callable[_P, Iterable[_O]]) -> Callable[_P, CruIterator[_O]]:
- def _wrapped(*args: _P.args, **kwargs: _P.kwargs) -> CruIterator[_O]:
- return CruIterator(f(*args, **kwargs))
-
- return _wrapped
-
- empty = _wrap(Raw.empty)
- range = _wrap(Raw.range)
- unite = _wrap(Raw.unite)
- concat = _wrap(Raw.concat)
-
-
-class CruIterator(Generic[_T]):
- ElementOperation: TypeAlias = Callable[[_V], Any]
- ElementPredicate: TypeAlias = Callable[[_V], bool]
- AnyElementPredicate: TypeAlias = ElementPredicate[Any]
- ElementTransformer: TypeAlias = Callable[[_V], _O]
- SelfElementTransformer: TypeAlias = ElementTransformer[_V, _V]
- AnyElementTransformer: TypeAlias = ElementTransformer[Any, Any]
-
- Creators: TypeAlias = _Creators
- Helpers: TypeAlias = _Helpers
-
- def __init__(self, iterable: Iterable[_T]) -> None:
- self._iterator = iter(iterable)
-
- def __iter__(self) -> Iterator[_T]:
- return self._iterator
-
- def create_new_me(self, iterable: Iterable[_O]) -> CruIterator[_O]:
- return type(self)(iterable) # type: ignore
-
- @staticmethod
- def _wrap(
- f: Callable[Concatenate[CruIterator[_T], _P], Iterable[_O]],
- ) -> Callable[Concatenate[CruIterator[_T], _P], CruIterator[_O]]:
- def _wrapped(
- self: CruIterator[_T], *args: _P.args, **kwargs: _P.kwargs
- ) -> CruIterator[_O]:
- return self.create_new_me(f(self, *args, **kwargs))
-
- return _wrapped
-
- @_wrap
- def replace_me(self, iterable: Iterable[_O]) -> Iterable[_O]:
- return iterable
-
- def replace_me_with_empty(self) -> CruIterator[Never]:
- return self.create_new_me(_Creators.Raw.empty())
-
- def replace_me_with_range(self, *args) -> CruIterator[int]:
- return self.create_new_me(_Creators.Raw.range(*args))
-
- def replace_me_with_unite(self, *args: _O) -> CruIterator[_O]:
- return self.create_new_me(_Creators.Raw.unite(*args))
-
- def replace_me_with_concat(self, *iterables: Iterable[_T]) -> CruIterator[_T]:
- return self.create_new_me(_Creators.Raw.concat(*iterables))
-
- def to_set(self) -> set[_T]:
- return set(self)
-
- def to_list(self) -> list[_T]:
- return list(self)
-
- def all(self, predicate: ElementPredicate[_T]) -> bool:
- for value in self:
- if not predicate(value):
- return False
- return True
-
- def any(self, predicate: ElementPredicate[_T]) -> bool:
- for value in self:
- if predicate(value):
- return True
- return False
-
- def foreach(self, operation: ElementOperation[_T]) -> None:
- for value in self:
- operation(value)
-
- @_wrap
- def transform(self, transformer: ElementTransformer[_T, _O]) -> Iterable[_O]:
- for value in self:
- yield transformer(value)
-
- map = transform
-
- @_wrap
- def filter(self, predicate: ElementPredicate[_T]) -> Iterable[_T]:
- for value in self:
- if predicate(value):
- yield value
-
- @_wrap
- def continue_if(self, predicate: ElementPredicate[_T]) -> Iterable[_T]:
- for value in self:
- yield value
- if not predicate(value):
- break
-
- def first_n(self, max_count: int) -> CruIterator[_T]:
- if max_count < 0:
- raise ValueError("max_count must be 0 or positive.")
- if max_count == 0:
- return self.replace_me_with_empty() # type: ignore
- return self.continue_if(_Helpers.auto_count(lambda i, _: i < max_count - 1))
-
- def drop_n(self, n: int) -> CruIterator[_T]:
- if n < 0:
- raise ValueError("n must be 0 or positive.")
- if n == 0:
- return self
- return self.filter(_Helpers.auto_count(lambda i, _: i < n))
-
- def single_or(
- self, fallback: _O | CruNotFound = CruNotFound.VALUE
- ) -> _T | _O | CruNotFound:
- first_2 = self.first_n(2)
- has_value = False
- for element in first_2:
- if has_value:
- raise ValueError("More than one value found.")
- has_value = True
- value = element
- if has_value:
- return value
- else:
- return fallback
-
- def first_or(
- self, fallback: _O | CruNotFound = CruNotFound.VALUE
- ) -> _T | _O | CruNotFound:
- return self.first_n(1).single_or(fallback)
-
- @_wrap
- def flatten(self, max_depth: int = -1) -> Iterable[Any]:
- return _Generic.iterable_flatten(self, max_depth)
-
- def select_by_indices(self, indices: Iterable[int]) -> CruIterator[_T]:
- index_set = set(indices)
- max_index = max(index_set)
- return self.first_n(max_index + 1).filter(
- _Helpers.auto_count(lambda i, _: i in index_set)
- )
-
- def remove_values(self, values: Iterable[Any]) -> CruIterator[_T]:
- value_set = set(values)
- return self.filter(lambda v: v not in value_set)
-
- def replace_values(
- self, old_values: Iterable[Any], new_value: _O
- ) -> Iterable[_T | _O]:
- value_set = set(old_values)
- return self.transform(lambda v: new_value if v in value_set else v)
-
- def group_by(self, key_getter: Callable[[_T], _O]) -> dict[_O, list[_T]]:
- result: dict[_O, list[_T]] = {}
-
- for item in self:
- key = key_getter(item)
- if key not in result:
- result[key] = []
- result[key].append(item)
-
- return result
-
- def join_str(self: CruIterator[str], separator: str) -> str:
- return separator.join(self)
-
-
-class CruIterMixin(Generic[_T]):
- def cru_iter(self: Iterable[_T]) -> CruIterator[_T]:
- return CruIterator(self)
-
-
-class CruIterList(list[_T], CruIterMixin[_T]):
- pass
-
-
-class CruIterable:
- Generic: TypeAlias = _Generic
- Iterator: TypeAlias = CruIterator[_T]
- Helpers: TypeAlias = _Helpers
- Mixin: TypeAlias = CruIterMixin[_T]
- IterList: TypeAlias = CruIterList[_T]
-
-
-CRU.add_objects(CruIterable, CruIterator)
diff --git a/tools/cru-py/cru/_type.py b/tools/cru-py/cru/_type.py
deleted file mode 100644
index 1f81da3..0000000
--- a/tools/cru-py/cru/_type.py
+++ /dev/null
@@ -1,52 +0,0 @@
-from collections.abc import Iterable
-from typing import Any
-
-from ._error import CruException, CruLogicError
-from ._iter import CruIterator
-
-
-class CruTypeCheckError(CruException):
- pass
-
-
-DEFAULT_NONE_ERR_MSG = "None is not allowed here."
-DEFAULT_TYPE_ERR_MSG = "Object of this type is not allowed here."
-
-
-class CruTypeSet(set[type]):
- def __init__(self, *types: type):
- type_set = CruIterator(types).filter(lambda t: t is not None).to_set()
- if not CruIterator(type_set).all(lambda t: isinstance(t, type)):
- raise CruLogicError("TypeSet can only contain type.")
- super().__init__(type_set)
-
- def check_value(
- self,
- value: Any,
- /,
- allow_none: bool = False,
- empty_allow_all: bool = True,
- ) -> None:
- if value is None:
- if allow_none:
- return
- else:
- raise CruTypeCheckError(DEFAULT_NONE_ERR_MSG)
- if len(self) == 0 and empty_allow_all:
- return
- if not CruIterator(self).any(lambda t: isinstance(value, t)):
- raise CruTypeCheckError(DEFAULT_TYPE_ERR_MSG)
-
- def check_value_list(
- self,
- values: Iterable[Any],
- /,
- allow_none: bool = False,
- empty_allow_all: bool = True,
- ) -> None:
- for value in values:
- self.check_value(
- value,
- allow_none,
- empty_allow_all,
- )
diff --git a/tools/cru-py/cru/attr.py b/tools/cru-py/cru/attr.py
deleted file mode 100644
index d4cc86a..0000000
--- a/tools/cru-py/cru/attr.py
+++ /dev/null
@@ -1,364 +0,0 @@
-from __future__ import annotations
-
-import copy
-from collections.abc import Callable, Iterable
-from dataclasses import dataclass, field
-from typing import Any
-
-from .list import CruUniqueKeyList
-from ._type import CruTypeSet
-from ._const import CruNotFound, CruUseDefault, CruDontChange
-from ._iter import CruIterator
-
-
-@dataclass
-class CruAttr:
-
- name: str
- value: Any
- description: str | None
-
- @staticmethod
- def make(
- name: str, value: Any = CruUseDefault.VALUE, description: str | None = None
- ) -> CruAttr:
- return CruAttr(name, value, description)
-
-
-CruAttrDefaultFactory = Callable[["CruAttrDef"], Any]
-CruAttrTransformer = Callable[[Any, "CruAttrDef"], Any]
-CruAttrValidator = Callable[[Any, "CruAttrDef"], None]
-
-
-@dataclass
-class CruAttrDef:
- name: str
- description: str
- default_factory: CruAttrDefaultFactory
- transformer: CruAttrTransformer
- validator: CruAttrValidator
-
- def __init__(
- self,
- name: str,
- description: str,
- default_factory: CruAttrDefaultFactory,
- transformer: CruAttrTransformer,
- validator: CruAttrValidator,
- ) -> None:
- self.name = name
- self.description = description
- self.default_factory = default_factory
- self.transformer = transformer
- self.validator = validator
-
- def transform(self, value: Any) -> Any:
- if self.transformer is not None:
- return self.transformer(value, self)
- return value
-
- def validate(self, value: Any, /, force_allow_none: bool = False) -> None:
- if force_allow_none is value is None:
- return
- if self.validator is not None:
- self.validator(value, self)
-
- def transform_and_validate(
- self, value: Any, /, force_allow_none: bool = False
- ) -> Any:
- value = self.transform(value)
- self.validate(value, force_allow_none)
- return value
-
- def make_default_value(self) -> Any:
- return self.transform_and_validate(self.default_factory(self))
-
- def adopt(self, attr: CruAttr) -> CruAttr:
- attr = copy.deepcopy(attr)
-
- if attr.name is None:
- attr.name = self.name
- elif attr.name != self.name:
- raise ValueError(f"Attr name is not match: {attr.name} != {self.name}")
-
- if attr.value is CruUseDefault.VALUE:
- attr.value = self.make_default_value()
- else:
- attr.value = self.transform_and_validate(attr.value)
-
- if attr.description is None:
- attr.description = self.description
-
- return attr
-
- def make(
- self, value: Any = CruUseDefault.VALUE, description: None | str = None
- ) -> CruAttr:
- value = self.make_default_value() if value is CruUseDefault.VALUE else value
- value = self.transform_and_validate(value)
- return CruAttr(
- self.name,
- value,
- description if description is not None else self.description,
- )
-
-
-@dataclass
-class CruAttrDefBuilder:
-
- name: str
- description: str
- types: list[type] | None = field(default=None)
- allow_none: bool = field(default=False)
- default: Any = field(default=CruUseDefault.VALUE)
- default_factory: CruAttrDefaultFactory | None = field(default=None)
- auto_list: bool = field(default=False)
- transformers: list[CruAttrTransformer] = field(default_factory=list)
- validators: list[CruAttrValidator] = field(default_factory=list)
- override_transformer: CruAttrTransformer | None = field(default=None)
- override_validator: CruAttrValidator | None = field(default=None)
-
- build_hook: Callable[[CruAttrDef], None] | None = field(default=None)
-
- def __init__(self, name: str, description: str) -> None:
- super().__init__()
- self.name = name
- self.description = description
-
- def auto_adjust_default(self) -> None:
- if self.default is not CruUseDefault.VALUE and self.default is not None:
- return
- if self.allow_none and self.default is CruUseDefault.VALUE:
- self.default = None
- if not self.allow_none and self.default is None:
- self.default = CruUseDefault.VALUE
- if self.auto_list and not self.allow_none:
- self.default = []
-
- def with_name(self, name: str | CruDontChange) -> CruAttrDefBuilder:
- if name is not CruDontChange.VALUE:
- self.name = name
- return self
-
- def with_description(
- self, default_description: str | CruDontChange
- ) -> CruAttrDefBuilder:
- if default_description is not CruDontChange.VALUE:
- self.description = default_description
- return self
-
- def with_default(self, default: Any) -> CruAttrDefBuilder:
- if default is not CruDontChange.VALUE:
- self.default = default
- return self
-
- def with_default_factory(
- self,
- default_factory: CruAttrDefaultFactory | CruDontChange,
- ) -> CruAttrDefBuilder:
- if default_factory is not CruDontChange.VALUE:
- self.default_factory = default_factory
- return self
-
- def with_types(
- self,
- types: Iterable[type] | None | CruDontChange,
- ) -> CruAttrDefBuilder:
- if types is not CruDontChange.VALUE:
- self.types = None if types is None else list(types)
- return self
-
- def with_allow_none(self, allow_none: bool | CruDontChange) -> CruAttrDefBuilder:
- if allow_none is not CruDontChange.VALUE:
- self.allow_none = allow_none
- return self
-
- def with_auto_list(
- self, auto_list: bool | CruDontChange = True
- ) -> CruAttrDefBuilder:
- if auto_list is not CruDontChange.VALUE:
- self.auto_list = auto_list
- return self
-
- def with_constraint(
- self,
- /,
- allow_none: bool | CruDontChange = CruDontChange.VALUE,
- types: Iterable[type] | None | CruDontChange = CruDontChange.VALUE,
- default: Any = CruDontChange.VALUE,
- default_factory: CruAttrDefaultFactory | CruDontChange = CruDontChange.VALUE,
- auto_list: bool | CruDontChange = CruDontChange.VALUE,
- ) -> CruAttrDefBuilder:
- return (
- self.with_allow_none(allow_none)
- .with_types(types)
- .with_default(default)
- .with_default_factory(default_factory)
- .with_auto_list(auto_list)
- )
-
- def add_transformer(self, transformer: CruAttrTransformer) -> CruAttrDefBuilder:
- self.transformers.append(transformer)
- return self
-
- def clear_transformers(self) -> CruAttrDefBuilder:
- self.transformers.clear()
- return self
-
- def add_validator(self, validator: CruAttrValidator) -> CruAttrDefBuilder:
- self.validators.append(validator)
- return self
-
- def clear_validators(self) -> CruAttrDefBuilder:
- self.validators.clear()
- return self
-
- def with_override_transformer(
- self, override_transformer: CruAttrTransformer | None | CruDontChange
- ) -> CruAttrDefBuilder:
- if override_transformer is not CruDontChange.VALUE:
- self.override_transformer = override_transformer
- return self
-
- def with_override_validator(
- self, override_validator: CruAttrValidator | None | CruDontChange
- ) -> CruAttrDefBuilder:
- if override_validator is not CruDontChange.VALUE:
- self.override_validator = override_validator
- return self
-
- def is_valid(self) -> tuple[bool, str]:
- if not isinstance(self.name, str):
- return False, "Name must be a string!"
- if not isinstance(self.description, str):
- return False, "Default description must be a string!"
- if (
- not self.allow_none
- and self.default is None
- and self.default_factory is None
- ):
- return False, "Default must be set if allow_none is False!"
- return True, ""
-
- @staticmethod
- def _build(
- builder: CruAttrDefBuilder, auto_adjust_default: bool = True
- ) -> CruAttrDef:
- if auto_adjust_default:
- builder.auto_adjust_default()
-
- valid, err = builder.is_valid()
- if not valid:
- raise ValueError(err)
-
- def composed_transformer(value: Any, attr_def: CruAttrDef) -> Any:
- def transform_value(single_value: Any) -> Any:
- for transformer in builder.transformers:
- single_value = transformer(single_value, attr_def)
- return single_value
-
- if builder.auto_list:
- if not isinstance(value, list):
- value = [value]
- value = CruIterator(value).transform(transform_value).to_list()
-
- else:
- value = transform_value(value)
- return value
-
- type_set = None if builder.types is None else CruTypeSet(*builder.types)
-
- def composed_validator(value: Any, attr_def: CruAttrDef):
- def validate_value(single_value: Any) -> None:
- if type_set is not None:
- type_set.check_value(single_value, allow_none=builder.allow_none)
- for validator in builder.validators:
- validator(single_value, attr_def)
-
- if builder.auto_list:
- CruIterator(value).foreach(validate_value)
- else:
- validate_value(value)
-
- real_transformer = builder.override_transformer or composed_transformer
- real_validator = builder.override_validator or composed_validator
-
- default_factory = builder.default_factory
- if default_factory is None:
-
- def default_factory(_d):
- return copy.deepcopy(builder.default)
-
- d = CruAttrDef(
- builder.name,
- builder.description,
- default_factory,
- real_transformer,
- real_validator,
- )
- if builder.build_hook:
- builder.build_hook(d)
- return d
-
- def build(self, auto_adjust_default=True) -> CruAttrDef:
- c = copy.deepcopy(self)
- self.build_hook = None
- return CruAttrDefBuilder._build(c, auto_adjust_default)
-
-
-class CruAttrDefRegistry(CruUniqueKeyList[CruAttrDef, str]):
-
- def __init__(self) -> None:
- super().__init__(lambda d: d.name)
-
- def make_builder(self, name: str, default_description: str) -> CruAttrDefBuilder:
- b = CruAttrDefBuilder(name, default_description)
- b.build_hook = lambda a: self.add(a)
- return b
-
- def adopt(self, attr: CruAttr) -> CruAttr:
- d = self.get(attr.name)
- return d.adopt(attr)
-
-
-class CruAttrTable(CruUniqueKeyList[CruAttr, str]):
- def __init__(self, registry: CruAttrDefRegistry) -> None:
- self._registry: CruAttrDefRegistry = registry
- super().__init__(lambda a: a.name, before_add=registry.adopt)
-
- @property
- def registry(self) -> CruAttrDefRegistry:
- return self._registry
-
- def get_value_or(self, name: str, fallback: Any = CruNotFound.VALUE) -> Any:
- a = self.get_or(name, CruNotFound.VALUE)
- if a is CruNotFound.VALUE:
- return fallback
- return a.value
-
- def get_value(self, name: str) -> Any:
- a = self.get(name)
- return a.value
-
- def make_attr(
- self,
- name: str,
- value: Any = CruUseDefault.VALUE,
- /,
- description: str | None = None,
- ) -> CruAttr:
- d = self._registry.get(name)
- return d.make(value, description or d.description)
-
- def add_value(
- self,
- name: str,
- value: Any = CruUseDefault.VALUE,
- /,
- description: str | None = None,
- *,
- replace: bool = False,
- ) -> CruAttr:
- attr = self.make_attr(name, value, description)
- self.add(attr, replace)
- return attr
diff --git a/tools/cru-py/cru/config.py b/tools/cru-py/cru/config.py
deleted file mode 100644
index 0f6f0d0..0000000
--- a/tools/cru-py/cru/config.py
+++ /dev/null
@@ -1,196 +0,0 @@
-from __future__ import annotations
-
-from typing import Any, TypeVar, Generic
-
-from ._error import CruException
-from .list import CruUniqueKeyList
-from .value import (
- INTEGER_VALUE_TYPE,
- TEXT_VALUE_TYPE,
- CruValueTypeError,
- ValueGeneratorBase,
- ValueType,
-)
-
-_T = TypeVar("_T")
-
-
-class CruConfigError(CruException):
- def __init__(self, message: str, item: ConfigItem, *args, **kwargs):
- super().__init__(message, *args, **kwargs)
- self._item = item
-
- @property
- def item(self) -> ConfigItem:
- return self._item
-
-
-class ConfigItem(Generic[_T]):
- def __init__(
- self,
- name: str,
- description: str,
- value_type: ValueType[_T],
- value: _T | None = None,
- /,
- default: ValueGeneratorBase[_T] | _T | None = None,
- ) -> None:
- self._name = name
- self._description = description
- self._value_type = value_type
- self._value = value
- self._default = default
-
- @property
- def name(self) -> str:
- return self._name
-
- @property
- def description(self) -> str:
- return self._description
-
- @property
- def value_type(self) -> ValueType[_T]:
- return self._value_type
-
- @property
- def is_set(self) -> bool:
- return self._value is not None
-
- @property
- def value(self) -> _T:
- if self._value is None:
- raise CruConfigError(
- "Config value is not set.",
- self,
- user_message=f"Config item {self.name} is not set.",
- )
- return self._value
-
- @property
- def value_str(self) -> str:
- return self.value_type.convert_value_to_str(self.value)
-
- def set_value(self, v: _T | str, allow_convert_from_str=False):
- if allow_convert_from_str:
- self._value = self.value_type.check_value_or_try_convert_from_str(v)
- else:
- self._value = self.value_type.check_value(v)
-
- def reset(self):
- self._value = None
-
- @property
- def default(self) -> ValueGeneratorBase[_T] | _T | None:
- return self._default
-
- @property
- def can_generate_default(self) -> bool:
- return self.default is not None
-
- def generate_default_value(self) -> _T:
- if self.default is None:
- raise CruConfigError(
- "Config item does not support default value generation.", self
- )
- elif isinstance(self.default, ValueGeneratorBase):
- v = self.default.generate()
- else:
- v = self.default
- try:
- self.value_type.check_value(v)
- return v
- except CruValueTypeError as e:
- raise CruConfigError(
- "Config value generator returns an invalid value.", self
- ) from e
-
- def copy(self) -> "ConfigItem":
- return ConfigItem(
- self.name,
- self.description,
- self.value_type,
- self.value,
- self.default,
- )
-
- @property
- def description_str(self) -> str:
- return f"{self.name} ({self.value_type.name}): {self.description}"
-
-
-class Configuration(CruUniqueKeyList[ConfigItem[Any], str]):
- def __init__(self):
- super().__init__(lambda c: c.name)
-
- def get_set_items(self) -> list[ConfigItem[Any]]:
- return [item for item in self if item.is_set]
-
- def get_unset_items(self) -> list[ConfigItem[Any]]:
- return [item for item in self if not item.is_set]
-
- @property
- def all_set(self) -> bool:
- return len(self.get_unset_items()) == 0
-
- @property
- def all_not_set(self) -> bool:
- return len(self.get_set_items()) == 0
-
- def add_text_config(
- self,
- name: str,
- description: str,
- value: str | None = None,
- default: ValueGeneratorBase[str] | str | None = None,
- ) -> ConfigItem[str]:
- item = ConfigItem(name, description, TEXT_VALUE_TYPE, value, default)
- self.add(item)
- return item
-
- def add_int_config(
- self,
- name: str,
- description: str,
- value: int | None = None,
- default: ValueGeneratorBase[int] | int | None = None,
- ) -> ConfigItem[int]:
- item = ConfigItem(name, description, INTEGER_VALUE_TYPE, value, default)
- self.add(item)
- return item
-
- def set_config_item(
- self,
- name: str,
- value: Any | str,
- allow_convert_from_str=True,
- ) -> None:
- item = self.get(name)
- item.set_value(
- value,
- allow_convert_from_str=allow_convert_from_str,
- )
-
- def reset_all(self) -> None:
- for item in self:
- item.reset()
-
- def to_dict(self) -> dict[str, Any]:
- return {item.name: item.value for item in self}
-
- def to_str_dict(self) -> dict[str, str]:
- return {
- item.name: item.value_type.convert_value_to_str(item.value) for item in self
- }
-
- def set_value_dict(
- self,
- value_dict: dict[str, Any],
- allow_convert_from_str: bool = False,
- ) -> None:
- for name, value in value_dict.items():
- item = self.get(name)
- item.set_value(
- value,
- allow_convert_from_str=allow_convert_from_str,
- )
diff --git a/tools/cru-py/cru/list.py b/tools/cru-py/cru/list.py
deleted file mode 100644
index 216a561..0000000
--- a/tools/cru-py/cru/list.py
+++ /dev/null
@@ -1,160 +0,0 @@
-from __future__ import annotations
-
-from collections.abc import Callable, Iterator
-from typing import Any, Generic, Iterable, TypeAlias, TypeVar, overload
-
-from ._error import CruInternalError
-from ._iter import CruIterator
-from ._const import CruNotFound
-
-_T = TypeVar("_T")
-_O = TypeVar("_O")
-
-
-class CruListEdit(CruIterator[_T]):
- def __init__(self, iterable: Iterable[_T], _list: CruList[Any]) -> None:
- super().__init__(iterable)
- self._list = _list
-
- def create_me(self, iterable: Iterable[_O]) -> CruListEdit[_O]:
- return CruListEdit(iterable, self._list)
-
- @property
- def list(self) -> CruList[Any]:
- return self._list
-
- def done(self) -> CruList[Any]:
- self._list.reset(self)
- return self._list
-
-
-class CruList(list[_T]):
- def reset(self, new_values: Iterable[_T]):
- if self is new_values:
- new_values = list(new_values)
- self.clear()
- self.extend(new_values)
- return self
-
- def as_cru_iterator(self) -> CruIterator[_T]:
- return CruIterator(self)
-
- @staticmethod
- def make(maybe_list: Iterable[_T] | _T | None) -> CruList[_T]:
- if maybe_list is None:
- return CruList()
- if isinstance(maybe_list, Iterable):
- return CruList(maybe_list)
- return CruList([maybe_list])
-
-
-_K = TypeVar("_K")
-
-_KeyGetter: TypeAlias = Callable[[_T], _K]
-
-
-class CruUniqueKeyList(Generic[_T, _K]):
- def __init__(
- self,
- key_getter: _KeyGetter[_T, _K],
- *,
- before_add: Callable[[_T], _T] | None = None,
- ):
- super().__init__()
- self._key_getter = key_getter
- self._before_add = before_add
- self._list: CruList[_T] = CruList()
-
- @property
- def key_getter(self) -> _KeyGetter[_T, _K]:
- return self._key_getter
-
- @property
- def internal_list(self) -> CruList[_T]:
- return self._list
-
- def validate_self(self):
- keys = self._list.transform(self._key_getter)
- if len(keys) != len(set(keys)):
- raise CruInternalError("Duplicate keys!")
-
- @overload
- def get_or(
- self, key: _K, fallback: CruNotFound = CruNotFound.VALUE
- ) -> _T | CruNotFound: ...
-
- @overload
- def get_or(self, key: _K, fallback: _O) -> _T | _O: ...
-
- def get_or(
- self, key: _K, fallback: _O | CruNotFound = CruNotFound.VALUE
- ) -> _T | _O | CruNotFound:
- return (
- self._list.as_cru_iterator()
- .filter(lambda v: key == self._key_getter(v))
- .first_or(fallback)
- )
-
- def get(self, key: _K) -> _T:
- value = self.get_or(key)
- if value is CruNotFound.VALUE:
- raise KeyError(f"Key {key} not found!")
- return value # type: ignore
-
- @property
- def keys(self) -> Iterable[_K]:
- return self._list.as_cru_iterator().map(self._key_getter)
-
- def has_key(self, key: _K) -> bool:
- return self.get_or(key) != CruNotFound.VALUE
-
- def try_remove(self, key: _K) -> bool:
- value = self.get_or(key)
- if value is CruNotFound.VALUE:
- return False
- self._list.remove(value)
- return True
-
- def remove(self, key: _K, allow_absence: bool = False) -> None:
- if not self.try_remove(key) and not allow_absence:
- raise KeyError(f"Key {key} not found!")
-
- def add(self, value: _T, /, replace: bool = False) -> None:
- v = self.get_or(self._key_getter(value))
- if v is not CruNotFound.VALUE:
- if not replace:
- raise KeyError(f"Key {self._key_getter(v)} already exists!")
- self._list.remove(v)
- if self._before_add is not None:
- value = self._before_add(value)
- self._list.append(value)
-
- def set(self, value: _T) -> None:
- self.add(value, True)
-
- def extend(self, iterable: Iterable[_T], /, replace: bool = False) -> None:
- values = list(iterable)
- to_remove = []
- for value in values:
- v = self.get_or(self._key_getter(value))
- if v is not CruNotFound.VALUE:
- if not replace:
- raise KeyError(f"Key {self._key_getter(v)} already exists!")
- to_remove.append(v)
- for value in to_remove:
- self._list.remove(value)
- if self._before_add is not None:
- values = [self._before_add(value) for value in values]
- self._list.extend(values)
-
- def clear(self) -> None:
- self._list.reset([])
-
- def __iter__(self) -> Iterator[_T]:
- return iter(self._list)
-
- def __len__(self) -> int:
- return len(self._list)
-
- def cru_iter(self) -> CruIterator[_T]:
- return CruIterator(self._list)
diff --git a/tools/cru-py/cru/parsing.py b/tools/cru-py/cru/parsing.py
deleted file mode 100644
index c31ce35..0000000
--- a/tools/cru-py/cru/parsing.py
+++ /dev/null
@@ -1,290 +0,0 @@
-from __future__ import annotations
-
-from abc import ABCMeta, abstractmethod
-from dataclasses import dataclass
-from enum import Enum
-from typing import NamedTuple, TypeAlias, TypeVar, Generic, NoReturn, Callable
-
-from ._error import CruException
-from ._iter import CruIterable
-
-_T = TypeVar("_T")
-
-
-class StrParseStream:
- class MemStackEntry(NamedTuple):
- pos: int
- lineno: int
-
- class MemStackPopStr(NamedTuple):
- text: str
- lineno: int
-
- def __init__(self, text: str) -> None:
- self._text = text
- self._pos = 0
- self._lineno = 1
- self._length = len(self._text)
- self._valid_pos_range = range(0, self.length + 1)
- self._valid_offset_range = range(-self.length, self.length + 1)
- self._mem_stack: CruIterable.IterList[StrParseStream.MemStackEntry] = (
- CruIterable.IterList()
- )
-
- @property
- def text(self) -> str:
- return self._text
-
- @property
- def length(self) -> int:
- return self._length
-
- @property
- def valid_pos_range(self) -> range:
- return self._valid_pos_range
-
- @property
- def valid_offset_range(self) -> range:
- return self._valid_offset_range
-
- @property
- def pos(self) -> int:
- return self._pos
-
- @property
- def lineno(self) -> int:
- return self._lineno
-
- @property
- def eof(self) -> bool:
- return self._pos == self.length
-
- def peek(self, length: int) -> str:
- real_length = min(length, self.length - self._pos)
- new_position = self._pos + real_length
- text = self._text[self._pos : new_position]
- return text
-
- def read(self, length: int) -> str:
- text = self.peek(length)
- self._pos += len(text)
- self._lineno += text.count("\n")
- return text
-
- def skip(self, length: int) -> None:
- self.read(length)
-
- def peek_str(self, text: str) -> bool:
- if self.pos + len(text) > self.length:
- return False
- for offset in range(len(text)):
- if self._text[self.pos + offset] != text[offset]:
- return False
- return True
-
- def read_str(self, text: str) -> bool:
- if not self.peek_str(text):
- return False
- self._pos += len(text)
- self._lineno += text.count("\n")
- return True
-
- @property
- def mem_stack(self) -> CruIterable.IterList[MemStackEntry]:
- return self._mem_stack
-
- def push_mem(self) -> None:
- self.mem_stack.append(self.MemStackEntry(self.pos, self.lineno))
-
- def pop_mem(self) -> MemStackEntry:
- return self.mem_stack.pop()
-
- def pop_mem_str(self, strip_end: int = 0) -> MemStackPopStr:
- old = self.pop_mem()
- assert self.pos >= old.pos
- return self.MemStackPopStr(
- self._text[old.pos : self.pos - strip_end], old.lineno
- )
-
-
-class ParseError(CruException, Generic[_T]):
- def __init__(
- self,
- message,
- parser: Parser[_T],
- text: str,
- line_number: int | None = None,
- *args,
- **kwargs,
- ):
- super().__init__(message, *args, **kwargs)
- self._parser = parser
- self._text = text
- self._line_number = line_number
-
- @property
- def parser(self) -> Parser[_T]:
- return self._parser
-
- @property
- def text(self) -> str:
- return self._text
-
- @property
- def line_number(self) -> int | None:
- return self._line_number
-
-
-class Parser(Generic[_T], metaclass=ABCMeta):
- def __init__(self, name: str) -> None:
- self._name = name
-
- @property
- def name(self) -> str:
- return self._name
-
- @abstractmethod
- def parse(self, s: str) -> _T:
- raise NotImplementedError()
-
- def raise_parse_exception(
- self, text: str, line_number: int | None = None
- ) -> NoReturn:
- a = line_number and f" at line {line_number}" or ""
- raise ParseError(f"Parser {self.name} failed{a}.", self, text, line_number)
-
-
-class SimpleLineConfigParserEntry(NamedTuple):
- key: str
- value: str
- line_number: int | None = None
-
-
-class SimpleLineConfigParserResult(CruIterable.IterList[SimpleLineConfigParserEntry]):
- pass
-
-
-class SimpleLineConfigParser(Parser[SimpleLineConfigParserResult]):
- """
- The parsing result is a list of tuples (key, value, line number).
- """
-
- Entry: TypeAlias = SimpleLineConfigParserEntry
- Result: TypeAlias = SimpleLineConfigParserResult
-
- def __init__(self) -> None:
- super().__init__(type(self).__name__)
-
- def _parse(self, text: str, callback: Callable[[Entry], None]) -> None:
- for ln, line in enumerate(text.splitlines()):
- line_number = ln + 1
- # check if it's a comment
- if line.strip().startswith("#"):
- continue
- # check if there is a '='
- if line.find("=") == -1:
- self.raise_parse_exception("There is even no '='!", line_number)
- # split at first '='
- key, value = line.split("=", 1)
- key = key.strip()
- value = value.strip()
- callback(SimpleLineConfigParserEntry(key, value, line_number))
-
- def parse(self, text: str) -> Result:
- result = SimpleLineConfigParserResult()
- self._parse(text, lambda item: result.append(item))
- return result
-
-
-class _StrWrapperVarParserTokenKind(Enum):
- TEXT = "TEXT"
- VAR = "VAR"
-
-
-@dataclass
-class _StrWrapperVarParserToken:
- kind: _StrWrapperVarParserTokenKind
- value: str
- line_number: int
-
- @property
- def is_text(self) -> bool:
- return self.kind is _StrWrapperVarParserTokenKind.TEXT
-
- @property
- def is_var(self) -> bool:
- return self.kind is _StrWrapperVarParserTokenKind.VAR
-
- @staticmethod
- def from_mem_str(
- kind: _StrWrapperVarParserTokenKind, mem_str: StrParseStream.MemStackPopStr
- ) -> _StrWrapperVarParserToken:
- return _StrWrapperVarParserToken(kind, mem_str.text, mem_str.lineno)
-
- def __repr__(self) -> str:
- return f"VAR: {self.value}" if self.is_var else "TEXT: ..."
-
-
-class _StrWrapperVarParserResult(CruIterable.IterList[_StrWrapperVarParserToken]):
- pass
-
-
-class StrWrapperVarParser(Parser[_StrWrapperVarParserResult]):
- TokenKind: TypeAlias = _StrWrapperVarParserTokenKind
- Token: TypeAlias = _StrWrapperVarParserToken
- Result: TypeAlias = _StrWrapperVarParserResult
-
- def __init__(self, wrapper: str):
- super().__init__(f"StrWrapperVarParser({wrapper})")
- self._wrapper = wrapper
-
- @property
- def wrapper(self) -> str:
- return self._wrapper
-
- def parse(self, text: str) -> Result:
- result = self.Result()
-
- class _State(Enum):
- TEXT = "TEXT"
- VAR = "VAR"
-
- state = _State.TEXT
- stream = StrParseStream(text)
- stream.push_mem()
-
- while True:
- if stream.eof:
- break
-
- if stream.read_str(self.wrapper):
- if state is _State.TEXT:
- result.append(
- self.Token.from_mem_str(
- self.TokenKind.TEXT, stream.pop_mem_str(len(self.wrapper))
- )
- )
- state = _State.VAR
- stream.push_mem()
- else:
- result.append(
- self.Token.from_mem_str(
- self.TokenKind.VAR,
- stream.pop_mem_str(len(self.wrapper)),
- )
- )
- state = _State.TEXT
- stream.push_mem()
-
- continue
-
- stream.skip(1)
-
- if state is _State.VAR:
- raise ParseError("Text ended without closing variable.", self, text)
-
- mem_str = stream.pop_mem_str()
- if len(mem_str.text) != 0:
- result.append(self.Token.from_mem_str(self.TokenKind.TEXT, mem_str))
-
- return result
diff --git a/tools/cru-py/cru/service/__init__.py b/tools/cru-py/cru/service/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/tools/cru-py/cru/service/__init__.py
+++ /dev/null
diff --git a/tools/cru-py/cru/service/__main__.py b/tools/cru-py/cru/service/__main__.py
deleted file mode 100644
index 1c10e82..0000000
--- a/tools/cru-py/cru/service/__main__.py
+++ /dev/null
@@ -1,20 +0,0 @@
-from cru import CruException
-
-from ._app import create_app
-
-
-def main():
- app = create_app()
- app.run_command()
-
-
-if __name__ == "__main__":
- try:
- main()
- except CruException as e:
- user_message = e.get_user_message()
- if user_message is not None:
- print(f"Error: {user_message}")
- exit(1)
- else:
- raise
diff --git a/tools/cru-py/cru/service/_app.py b/tools/cru-py/cru/service/_app.py
deleted file mode 100644
index 6030dad..0000000
--- a/tools/cru-py/cru/service/_app.py
+++ /dev/null
@@ -1,34 +0,0 @@
-from ._base import (
- AppBase,
- CommandDispatcher,
- AppInitializer,
- PathCommandProvider,
-)
-from ._config import ConfigManager
-from ._template import TemplateManager
-from ._nginx import NginxManager
-from ._external import CliToolCommandProvider
-
-APP_ID = "crupest"
-
-
-class App(AppBase):
- def __init__(self):
- super().__init__(APP_ID, f"{APP_ID}-service")
- self.add_feature(PathCommandProvider())
- self.add_feature(AppInitializer())
- self.add_feature(ConfigManager())
- self.add_feature(TemplateManager())
- self.add_feature(NginxManager())
- self.add_feature(CliToolCommandProvider())
- self.add_feature(CommandDispatcher())
-
- def run_command(self):
- command_dispatcher = self.get_feature(CommandDispatcher)
- command_dispatcher.run_command()
-
-
-def create_app() -> App:
- app = App()
- app.setup()
- return app
diff --git a/tools/cru-py/cru/service/_base.py b/tools/cru-py/cru/service/_base.py
deleted file mode 100644
index ad813c9..0000000
--- a/tools/cru-py/cru/service/_base.py
+++ /dev/null
@@ -1,449 +0,0 @@
-from __future__ import annotations
-
-from argparse import ArgumentParser, Namespace
-from abc import ABC, abstractmethod
-import argparse
-import os
-from pathlib import Path
-from typing import TypeVar, overload
-
-from cru import CruException, CruLogicError
-
-_Feature = TypeVar("_Feature", bound="AppFeatureProvider")
-
-
-class AppError(CruException):
- pass
-
-
-class AppFeatureError(AppError):
- def __init__(self, message, feature: type | str, *args, **kwargs):
- super().__init__(message, *args, **kwargs)
- self._feature = feature
-
- @property
- def feature(self) -> type | str:
- return self._feature
-
-
-class AppPathError(CruException):
- def __init__(self, message, _path: str | Path, *args, **kwargs):
- super().__init__(message, *args, **kwargs)
- self._path = str(_path)
-
- @property
- def path(self) -> str:
- return self._path
-
-
-class AppPath(ABC):
- def __init__(self, id: str, is_dir: bool, description: str) -> None:
- self._is_dir = is_dir
- self._id = id
- self._description = description
-
- @property
- @abstractmethod
- def parent(self) -> AppPath | None: ...
-
- @property
- @abstractmethod
- def app(self) -> AppBase: ...
-
- @property
- def id(self) -> str:
- return self._id
-
- @property
- def description(self) -> str:
- return self._description
-
- @property
- def is_dir(self) -> bool:
- return self._is_dir
-
- @property
- @abstractmethod
- def full_path(self) -> Path: ...
-
- @property
- def full_path_str(self) -> str:
- return str(self.full_path)
-
- def check_parents(self, must_exist: bool = False) -> bool:
- for p in reversed(self.full_path.parents):
- if not p.exists() and not must_exist:
- return False
- if not p.is_dir():
- raise AppPathError("Parents' path must be a dir.", self.full_path)
- return True
-
- def check_self(self, must_exist: bool = False) -> bool:
- if not self.check_parents(must_exist):
- return False
- if not self.full_path.exists():
- if not must_exist:
- return False
- raise AppPathError("Not exist.", self.full_path)
- if self.is_dir:
- if not self.full_path.is_dir():
- raise AppPathError("Should be a directory, but not.", self.full_path)
- else:
- return True
- else:
- if not self.full_path.is_file():
- raise AppPathError("Should be a file, but not.", self.full_path)
- else:
- return True
-
- def ensure(self, create_file: bool = False) -> None:
- e = self.check_self(False)
- if not e:
- os.makedirs(self.full_path.parent, exist_ok=True)
- if self.is_dir:
- os.mkdir(self.full_path)
- elif create_file:
- with open(self.full_path, "w") as f:
- f.write("")
-
- def add_subpath(
- self,
- name: str,
- is_dir: bool,
- /,
- id: str | None = None,
- description: str = "",
- ) -> AppFeaturePath:
- return self.app.add_path(name, is_dir, self, id, description)
-
- @property
- def app_relative_path(self) -> Path:
- return self.full_path.relative_to(self.app.root.full_path)
-
-
-class AppFeaturePath(AppPath):
- def __init__(
- self,
- parent: AppPath,
- name: str,
- is_dir: bool,
- /,
- id: str | None = None,
- description: str = "",
- ) -> None:
- super().__init__(id or name, is_dir, description)
- self._name = name
- self._parent = parent
-
- @property
- def name(self) -> str:
- return self._name
-
- @property
- def parent(self) -> AppPath:
- return self._parent
-
- @property
- def app(self) -> AppBase:
- return self.parent.app
-
- @property
- def full_path(self) -> Path:
- return Path(self.parent.full_path, self.name).resolve()
-
-
-class AppRootPath(AppPath):
- def __init__(self, app: AppBase):
- super().__init__("root", True, "Application root path.")
- self._app = app
- self._full_path: Path | None = None
-
- @property
- def parent(self) -> None:
- return None
-
- @property
- def app(self) -> AppBase:
- return self._app
-
- @property
- def full_path(self) -> Path:
- if self._full_path is None:
- raise AppError("App root path is not set yet.")
- return self._full_path
-
- def setup(self, path: os.PathLike) -> None:
- if self._full_path is not None:
- raise AppError("App root path is already set.")
- self._full_path = Path(path).resolve()
-
-
-class AppFeatureProvider(ABC):
- def __init__(self, name: str, /, app: AppBase | None = None):
- super().__init__()
- self._name = name
- self._app = app if app else AppBase.get_instance()
-
- @property
- def app(self) -> AppBase:
- return self._app
-
- @property
- def name(self) -> str:
- return self._name
-
- @abstractmethod
- def setup(self) -> None: ...
-
-
-class AppCommandFeatureProvider(AppFeatureProvider):
- @abstractmethod
- def get_command_info(self) -> tuple[str, str]: ...
-
- @abstractmethod
- def setup_arg_parser(self, arg_parser: ArgumentParser): ...
-
- @abstractmethod
- def run_command(self, args: Namespace) -> None: ...
-
-
-DATA_DIR_NAME = "data"
-
-
-class PathCommandProvider(AppCommandFeatureProvider):
- def __init__(self) -> None:
- super().__init__("path-command-provider")
-
- def setup(self):
- pass
-
- def get_command_info(self):
- return ("path", "Get information about paths used by app.")
-
- def setup_arg_parser(self, arg_parser: ArgumentParser) -> None:
- subparsers = arg_parser.add_subparsers(
- dest="path_command", required=True, metavar="PATH_COMMAND"
- )
- _list_parser = subparsers.add_parser(
- "list", help="list special paths used by app"
- )
-
- def run_command(self, args: Namespace) -> None:
- if args.path_command == "list":
- for path in self.app.paths:
- print(f"{path.app_relative_path.as_posix()}: {path.description}")
-
-
-class CommandDispatcher(AppFeatureProvider):
- def __init__(self) -> None:
- super().__init__("command-dispatcher")
- self._parsed_args: argparse.Namespace | None = None
-
- def setup_arg_parser(self) -> None:
- epilog = """
-==> to start,
-./tools/manage init
-./tools/manage config init
-ln -s generated/docker-compose.yaml .
-# Then edit config file.
-
-==> to update
-git pull
-./tools/manage template generate --no-dry-run
-docker compose up
- """.strip()
-
- self._map: dict[str, AppCommandFeatureProvider] = {}
- arg_parser = argparse.ArgumentParser(
- description="Service management",
- formatter_class=argparse.RawDescriptionHelpFormatter,
- epilog=epilog,
- )
- arg_parser.add_argument(
- "--project-dir",
- help="The path of the project directory.",
- required=True,
- type=str,
- )
- subparsers = arg_parser.add_subparsers(
- dest="command",
- help="The management command to execute.",
- metavar="COMMAND",
- )
- for feature in self.app.features:
- if isinstance(feature, AppCommandFeatureProvider):
- info = feature.get_command_info()
- command_subparser = subparsers.add_parser(info[0], help=info[1])
- feature.setup_arg_parser(command_subparser)
- self._map[info[0]] = feature
- self._arg_parser = arg_parser
-
- def setup(self):
- pass
-
- @property
- def arg_parser(self) -> argparse.ArgumentParser:
- return self._arg_parser
-
- @property
- def map(self) -> dict[str, AppCommandFeatureProvider]:
- return self._map
-
- def get_program_parsed_args(self) -> argparse.Namespace:
- if self._parsed_args is None:
- self._parsed_args = self.arg_parser.parse_args()
- return self._parsed_args
-
- def run_command(self, args: argparse.Namespace | None = None) -> None:
- real_args = args or self.get_program_parsed_args()
- if real_args.command is None:
- self.arg_parser.print_help()
- return
- self.map[real_args.command].run_command(real_args)
-
-
-class AppInitializer(AppCommandFeatureProvider):
- def __init__(self) -> None:
- super().__init__("app-initializer")
-
- def _init_app(self) -> bool:
- if self.app.app_initialized:
- return False
- self.app.data_dir.ensure()
- return True
-
- def setup(self):
- pass
-
- def get_command_info(self):
- return ("init", "Initialize the app.")
-
- def setup_arg_parser(self, arg_parser):
- pass
-
- def run_command(self, args):
- init = self._init_app()
- if init:
- print("App initialized successfully.")
- else:
- print("App is already initialized. Do nothing.")
-
-
-class AppBase:
- _instance: AppBase | None = None
-
- @staticmethod
- def get_instance() -> AppBase:
- if AppBase._instance is None:
- raise AppError("App instance not initialized")
- return AppBase._instance
-
- def __init__(self, app_id: str, name: str):
- AppBase._instance = self
- self._app_id = app_id
- self._name = name
- self._root = AppRootPath(self)
- self._paths: list[AppFeaturePath] = []
- self._features: list[AppFeatureProvider] = []
-
- def setup(self) -> None:
- command_dispatcher = self.get_feature(CommandDispatcher)
- command_dispatcher.setup_arg_parser()
- program_args = command_dispatcher.get_program_parsed_args()
- self.setup_root(program_args.project_dir)
- self._data_dir = self.add_path(DATA_DIR_NAME, True, id="data")
- for feature in self.features:
- feature.setup()
- for path in self.paths:
- path.check_self()
-
- @property
- def app_id(self) -> str:
- return self._app_id
-
- @property
- def name(self) -> str:
- return self._name
-
- @property
- def root(self) -> AppRootPath:
- return self._root
-
- def setup_root(self, path: os.PathLike) -> None:
- self._root.setup(path)
-
- @property
- def data_dir(self) -> AppFeaturePath:
- return self._data_dir
-
- @property
- def app_initialized(self) -> bool:
- return self.data_dir.check_self()
-
- def ensure_app_initialized(self) -> AppRootPath:
- if not self.app_initialized:
- raise AppError(
- user_message="Root directory does not exist. "
- "Please run 'init' to create one."
- )
- return self.root
-
- @property
- def features(self) -> list[AppFeatureProvider]:
- return self._features
-
- @property
- def paths(self) -> list[AppFeaturePath]:
- return self._paths
-
- def add_feature(self, feature: _Feature) -> _Feature:
- for f in self.features:
- if f.name == feature.name:
- raise AppFeatureError(
- f"Duplicate feature name: {feature.name}.", feature.name
- )
- self._features.append(feature)
- return feature
-
- def add_path(
- self,
- name: str,
- is_dir: bool,
- /,
- parent: AppPath | None = None,
- id: str | None = None,
- description: str = "",
- ) -> AppFeaturePath:
- p = AppFeaturePath(
- parent or self.root, name, is_dir, id=id, description=description
- )
- self._paths.append(p)
- return p
-
- @overload
- def get_feature(self, feature: str) -> AppFeatureProvider: ...
-
- @overload
- def get_feature(self, feature: type[_Feature]) -> _Feature: ...
-
- def get_feature(
- self, feature: str | type[_Feature]
- ) -> AppFeatureProvider | _Feature:
- if isinstance(feature, str):
- for f in self._features:
- if f.name == feature:
- return f
- elif isinstance(feature, type):
- for f in self._features:
- if isinstance(f, feature):
- return f
- else:
- raise CruLogicError("Argument must be the name of feature or its class.")
-
- raise AppFeatureError(f"Feature {feature} not found.", feature)
-
- def get_path(self, name: str) -> AppFeaturePath:
- for p in self._paths:
- if p.id == name or p.name == name:
- return p
- raise AppPathError(f"Application path {name} not found.", name)
diff --git a/tools/cru-py/cru/service/_config.py b/tools/cru-py/cru/service/_config.py
deleted file mode 100644
index cbb9533..0000000
--- a/tools/cru-py/cru/service/_config.py
+++ /dev/null
@@ -1,444 +0,0 @@
-from collections.abc import Iterable
-from typing import Any, Literal, overload
-
-from cru import CruException, CruNotFound
-from cru.config import Configuration, ConfigItem
-from cru.value import (
- INTEGER_VALUE_TYPE,
- TEXT_VALUE_TYPE,
- CruValueTypeError,
- RandomStringValueGenerator,
- UuidValueGenerator,
-)
-from cru.parsing import ParseError, SimpleLineConfigParser
-
-from ._base import AppFeaturePath, AppCommandFeatureProvider
-
-
-class AppConfigError(CruException):
- def __init__(
- self, message: str, configuration: Configuration, *args, **kwargs
- ) -> None:
- super().__init__(message, *args, **kwargs)
- self._configuration = configuration
-
- @property
- def configuration(self) -> Configuration:
- return self._configuration
-
-
-class AppConfigFileError(AppConfigError):
- def __init__(
- self,
- message: str,
- configuration: Configuration,
- *args,
- **kwargs,
- ) -> None:
- super().__init__(message, configuration, *args, **kwargs)
-
-
-class AppConfigFileNotFoundError(AppConfigFileError):
- def __init__(
- self,
- message: str,
- configuration: Configuration,
- file_path: str,
- *args,
- **kwargs,
- ) -> None:
- super().__init__(message, configuration, *args, **kwargs)
- self._file_path = file_path
-
- @property
- def file_path(self) -> str:
- return self._file_path
-
-
-class AppConfigFileParseError(AppConfigFileError):
- def __init__(
- self,
- message: str,
- configuration: Configuration,
- file_content: str,
- *args,
- **kwargs,
- ) -> None:
- super().__init__(message, configuration, *args, **kwargs)
- self._file_content = file_content
- self.__cause__: ParseError
-
- @property
- def file_content(self) -> str:
- return self._file_content
-
- def get_user_message(self) -> str:
- return f"Error while parsing config file at line {self.__cause__.line_number}."
-
-
-class AppConfigFileEntryError(AppConfigFileError):
- def __init__(
- self,
- message: str,
- configuration: Configuration,
- entries: Iterable[SimpleLineConfigParser.Entry],
- *args,
- **kwargs,
- ) -> None:
- super().__init__(message, configuration, *args, **kwargs)
- self._entries = list(entries)
-
- @property
- def error_entries(self) -> list[SimpleLineConfigParser.Entry]:
- return self._entries
-
- @staticmethod
- def entries_to_friendly_message(
- entries: Iterable[SimpleLineConfigParser.Entry],
- ) -> str:
- return "\n".join(
- f"line {entry.line_number}: {entry.key}={entry.value}" for entry in entries
- )
-
- @property
- def friendly_message_head(self) -> str:
- return "Error entries found in config file"
-
- def get_user_message(self) -> str:
- return (
- f"{self.friendly_message_head}:\n"
- f"{self.entries_to_friendly_message(self.error_entries)}"
- )
-
-
-class AppConfigDuplicateEntryError(AppConfigFileEntryError):
- @property
- def friendly_message_head(self) -> str:
- return "Duplicate entries found in config file"
-
-
-class AppConfigEntryValueFormatError(AppConfigFileEntryError):
- @property
- def friendly_message_head(self) -> str:
- return "Invalid value format for entries"
-
-
-class AppConfigItemNotSetError(AppConfigError):
- def __init__(
- self,
- message: str,
- configuration: Configuration,
- items: list[ConfigItem],
- *args,
- **kwargs,
- ) -> None:
- super().__init__(message, configuration, *args, **kwargs)
- self._items = items
-
-
-class ConfigManager(AppCommandFeatureProvider):
- def __init__(self) -> None:
- super().__init__("config-manager")
- configuration = Configuration()
- self._configuration = configuration
- self._loaded: bool = False
- self._init_app_defined_items()
-
- def _init_app_defined_items(self) -> None:
- prefix = self.config_name_prefix
-
- def _add_text(name: str, description: str) -> ConfigItem:
- item = ConfigItem(f"{prefix}_{name}", description, TEXT_VALUE_TYPE)
- self.configuration.add(item)
- return item
-
- def _add_uuid(name: str, description: str) -> ConfigItem:
- item = ConfigItem(
- f"{prefix}_{name}",
- description,
- TEXT_VALUE_TYPE,
- default=UuidValueGenerator(),
- )
- self.configuration.add(item)
- return item
-
- def _add_random_string(
- name: str, description: str, length: int = 32, secure: bool = True
- ) -> ConfigItem:
- item = ConfigItem(
- f"{prefix}_{name}",
- description,
- TEXT_VALUE_TYPE,
- default=RandomStringValueGenerator(length, secure),
- )
- self.configuration.add(item)
- return item
-
- def _add_int(name: str, description: str) -> ConfigItem:
- item = ConfigItem(f"{prefix}_{name}", description, INTEGER_VALUE_TYPE)
- self.configuration.add(item)
- return item
-
- self._domain = _add_text("DOMAIN", "domain name")
- self._email = _add_text("EMAIL", "admin email address")
- _add_text(
- "AUTO_BACKUP_COS_SECRET_ID",
- "access key id for Tencent COS, used for auto backup",
- )
- _add_text(
- "AUTO_BACKUP_COS_SECRET_KEY",
- "access key secret for Tencent COS, used for auto backup",
- )
- _add_text(
- "AUTO_BACKUP_COS_ENDPOINT",
- "endpoint (cos.*.myqcloud.com) for Tencent COS, used for auto backup",
- )
- _add_text(
- "AUTO_BACKUP_COS_BUCKET",
- "bucket name for Tencent COS, used for auto backup",
- )
- _add_uuid("V2RAY_TOKEN", "v2ray user id")
- _add_uuid("V2RAY_PATH", "v2ray path, which will be prefixed by _")
- _add_random_string("2FAUTH_APP_KEY", "2FAuth App Key")
- _add_text("2FAUTH_MAIL_USERNAME", "2FAuth SMTP user")
- _add_text("2FAUTH_MAIL_PASSWORD", "2FAuth SMTP password")
- _add_text("GIT_SERVER_USERNAME", "Git server username")
- _add_text("GIT_SERVER_PASSWORD", "Git server password")
-
- def setup(self) -> None:
- self._config_file_path = self.app.data_dir.add_subpath(
- "config", False, description="Configuration file path."
- )
-
- @property
- def config_name_prefix(self) -> str:
- return self.app.app_id.upper()
-
- @property
- def configuration(self) -> Configuration:
- return self._configuration
-
- @property
- def config_file_path(self) -> AppFeaturePath:
- return self._config_file_path
-
- @property
- def all_set(self) -> bool:
- return self.configuration.all_set
-
- def get_item(self, name: str) -> ConfigItem[Any]:
- if not name.startswith(self.config_name_prefix + "_"):
- name = f"{self.config_name_prefix}_{name}"
-
- item = self.configuration.get_or(name, None)
- if item is None:
- raise AppConfigError(f"Config item '{name}' not found.", self.configuration)
- return item
-
- @overload
- def get_item_value_str(self, name: str) -> str: ...
-
- @overload
- def get_item_value_str(self, name: str, ensure_set: Literal[True]) -> str: ...
-
- @overload
- def get_item_value_str(self, name: str, ensure_set: bool = True) -> str | None: ...
-
- def get_item_value_str(self, name: str, ensure_set: bool = True) -> str | None:
- self.load_config_file()
- item = self.get_item(name)
- if not item.is_set:
- if ensure_set:
- raise AppConfigItemNotSetError(
- f"Config item '{name}' is not set.", self.configuration, [item]
- )
- return None
- return item.value_str
-
- def get_str_dict(self, ensure_all_set: bool = True) -> dict[str, str]:
- self.load_config_file()
- if ensure_all_set and not self.configuration.all_set:
- raise AppConfigItemNotSetError(
- "Some config items are not set.",
- self.configuration,
- self.configuration.get_unset_items(),
- )
- return self.configuration.to_str_dict()
-
- @property
- def domain_item_name(self) -> str:
- return self._domain.name
-
- def get_domain_value_str(self) -> str:
- return self.get_item_value_str(self._domain.name)
-
- def get_email_value_str_optional(self) -> str | None:
- return self.get_item_value_str(self._email.name, ensure_set=False)
-
- def _set_with_default(self) -> None:
- if not self.configuration.all_not_set:
- raise AppConfigError(
- "Config is not clean. "
- "Some config items are already set. "
- "Can't set again with default value.",
- self.configuration,
- )
- for item in self.configuration:
- if item.can_generate_default:
- item.set_value(item.generate_default_value())
-
- def _to_config_file_content(self) -> str:
- content = "".join(
- [
- f"{item.name}={item.value_str if item.is_set else ''}\n"
- for item in self.configuration
- ]
- )
- return content
-
- def _create_init_config_file(self) -> None:
- if self.config_file_path.check_self():
- raise AppConfigError(
- "Config file already exists.",
- self.configuration,
- user_message=f"The config file at "
- f"{self.config_file_path.full_path_str} already exists.",
- )
- self._set_with_default()
- self.config_file_path.ensure()
- with open(
- self.config_file_path.full_path, "w", encoding="utf-8", newline="\n"
- ) as file:
- file.write(self._to_config_file_content())
-
- def _parse_config_file(self) -> SimpleLineConfigParser.Result:
- if not self.config_file_path.check_self():
- raise AppConfigFileNotFoundError(
- "Config file not found.",
- self.configuration,
- self.config_file_path.full_path_str,
- user_message=f"The config file at "
- f"{self.config_file_path.full_path_str} does not exist. "
- f"You can create an initial one with 'init' command.",
- )
-
- text = self.config_file_path.full_path.read_text()
- try:
- parser = SimpleLineConfigParser()
- return parser.parse(text)
- except ParseError as e:
- raise AppConfigFileParseError(
- "Failed to parse config file.", self.configuration, text
- ) from e
-
- def _parse_and_print_config_file(self) -> None:
- parse_result = self._parse_config_file()
- for entry in parse_result:
- print(f"{entry.key}={entry.value}")
-
- def _check_duplicate(
- self,
- parse_result: dict[str, list[SimpleLineConfigParser.Entry]],
- ) -> dict[str, SimpleLineConfigParser.Entry]:
- entry_dict: dict[str, SimpleLineConfigParser.Entry] = {}
- duplicate_entries: list[SimpleLineConfigParser.Entry] = []
- for key, entries in parse_result.items():
- entry_dict[key] = entries[0]
- if len(entries) > 1:
- duplicate_entries.extend(entries)
- if len(duplicate_entries) > 0:
- raise AppConfigDuplicateEntryError(
- "Duplicate entries found.", self.configuration, duplicate_entries
- )
-
- return entry_dict
-
- def _check_type(
- self, entry_dict: dict[str, SimpleLineConfigParser.Entry]
- ) -> dict[str, Any]:
- value_dict: dict[str, Any] = {}
- error_entries: list[SimpleLineConfigParser.Entry] = []
- errors: list[CruValueTypeError] = []
- for key, entry in entry_dict.items():
- try:
- if entry.value == "":
- value_dict[key] = None
- else:
- value = entry.value
- config_item = self.configuration.get_or(key)
- if config_item is not CruNotFound.VALUE:
- value = config_item.value_type.convert_str_to_value(value)
- value_dict[key] = value
- except CruValueTypeError as e:
- error_entries.append(entry)
- errors.append(e)
- if len(error_entries) > 0:
- raise AppConfigEntryValueFormatError(
- "Entry value format is not correct.",
- self.configuration,
- error_entries,
- ) from ExceptionGroup("Multiple format errors occurred.", errors)
- return value_dict
-
- def _read_config_file(self) -> dict[str, Any]:
- parsed = self._parse_config_file()
- entry_groups = parsed.cru_iter().group_by(lambda e: e.key)
- entry_dict = self._check_duplicate(entry_groups)
- value_dict = self._check_type(entry_dict)
- return value_dict
-
- def _real_load_config_file(self) -> None:
- self.configuration.reset_all()
- value_dict = self._read_config_file()
- for key, value in value_dict.items():
- if value is None:
- continue
- self.configuration.set_config_item(key, value)
-
- def load_config_file(self, force=False) -> None:
- if force or not self._loaded:
- self._real_load_config_file()
- self._loaded = True
-
- def _print_app_config_info(self):
- for item in self.configuration:
- print(item.description_str)
-
- def get_command_info(self):
- return "config", "Manage configuration."
-
- def setup_arg_parser(self, arg_parser) -> None:
- subparsers = arg_parser.add_subparsers(
- dest="config_command", required=True, metavar="CONFIG_COMMAND"
- )
- _init_parser = subparsers.add_parser(
- "init", help="create an initial config file"
- )
- _print_app_parser = subparsers.add_parser(
- "print-app",
- help="print information of the config items defined by app",
- )
- _print_parser = subparsers.add_parser("print", help="print current config")
- _check_config_parser = subparsers.add_parser(
- "check",
- help="check the validity of the config file",
- )
- _check_config_parser.add_argument(
- "-f",
- "--format-only",
- action="store_true",
- help="only check content format, not app config item requirements.",
- )
-
- def run_command(self, args) -> None:
- if args.config_command == "init":
- self._create_init_config_file()
- elif args.config_command == "print-app":
- self._print_app_config_info()
- elif args.config_command == "print":
- self._parse_and_print_config_file()
- elif args.config_command == "check":
- if args.format_only:
- self._parse_config_file()
- else:
- self._read_config_file()
diff --git a/tools/cru-py/cru/service/_external.py b/tools/cru-py/cru/service/_external.py
deleted file mode 100644
index 2347e95..0000000
--- a/tools/cru-py/cru/service/_external.py
+++ /dev/null
@@ -1,81 +0,0 @@
-from ._base import AppCommandFeatureProvider
-from ._nginx import NginxManager
-
-
-class CliToolCommandProvider(AppCommandFeatureProvider):
- def __init__(self) -> None:
- super().__init__("cli-tool-command-provider")
-
- def setup(self):
- pass
-
- def get_command_info(self):
- return ("gen-cli", "Get commands of running external cli tools.")
-
- def setup_arg_parser(self, arg_parser):
- subparsers = arg_parser.add_subparsers(
- dest="gen_cli_command", required=True, metavar="GEN_CLI_COMMAND"
- )
- certbot_parser = subparsers.add_parser("certbot", help="print certbot commands")
- certbot_parser.add_argument(
- "-t", "--test", action="store_true", help="run certbot in test mode"
- )
- _install_docker_parser = subparsers.add_parser(
- "install-docker", help="print docker installation commands"
- )
- _update_blog_parser = subparsers.add_parser(
- "update-blog", help="print blog update command"
- )
-
- def _print_install_docker_commands(self) -> None:
- output = """
-### COMMAND: uninstall apt docker
-for pkg in docker.io docker-doc docker-compose \
-podman-docker containerd runc; \
-do sudo apt-get remove $pkg; done
-
-### COMMAND: prepare apt certs
-sudo apt-get update
-sudo apt-get install ca-certificates curl
-sudo install -m 0755 -d /etc/apt/keyrings
-
-### COMMAND: install certs
-sudo curl -fsSL https://download.docker.com/linux/debian/gpg \
--o /etc/apt/keyrings/docker.asc
-sudo chmod a+r /etc/apt/keyrings/docker.asc
-
-### COMMAND: add docker apt source
-echo \\
- "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] \
-https://download.docker.com/linux/debian \\
- $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \\
- sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
-
-### COMMAND: update apt and install docker
-sudo apt-get update
-sudo apt-get install docker-ce docker-ce-cli containerd.io \
-docker-buildx-plugin docker-compose-plugin
-
-### COMMAND: setup system for docker
-sudo systemctl enable docker
-sudo systemctl start docker
-sudo groupadd -f docker
-sudo usermod -aG docker $USER
-# Remember to log out and log back in for the group changes to take effect
-""".strip()
- print(output)
-
- def _print_update_blog_command(self):
- output = """
-### COMMAND: update blog
-docker exec -it blog /scripts/update.bash
-""".strip()
- print(output)
-
- def run_command(self, args):
- if args.gen_cli_command == "certbot":
- self.app.get_feature(NginxManager).print_all_certbot_commands(args.test)
- elif args.gen_cli_command == "install-docker":
- self._print_install_docker_commands()
- elif args.gen_cli_command == "update-blog":
- self._print_update_blog_command() \ No newline at end of file
diff --git a/tools/cru-py/cru/service/_nginx.py b/tools/cru-py/cru/service/_nginx.py
deleted file mode 100644
index 6c77971..0000000
--- a/tools/cru-py/cru/service/_nginx.py
+++ /dev/null
@@ -1,268 +0,0 @@
-from argparse import Namespace
-from enum import Enum, auto
-import re
-import subprocess
-from typing import TypeAlias
-
-from cru import CruInternalError
-
-from ._base import AppCommandFeatureProvider
-from ._config import ConfigManager
-from ._template import TemplateManager
-
-
-class CertbotAction(Enum):
- CREATE = auto()
- EXPAND = auto()
- SHRINK = auto()
- RENEW = auto()
-
-
-class NginxManager(AppCommandFeatureProvider):
- CertbotAction: TypeAlias = CertbotAction
-
- def __init__(self) -> None:
- super().__init__("nginx-manager")
- self._domains_cache: list[str] | None = None
-
- def setup(self) -> None:
- pass
-
- @property
- def _config_manager(self) -> ConfigManager:
- return self.app.get_feature(ConfigManager)
-
- @property
- def root_domain(self) -> str:
- return self._config_manager.get_domain_value_str()
-
- @property
- def domains(self) -> list[str]:
- if self._domains_cache is None:
- self._domains_cache = self._get_domains()
- return self._domains_cache
-
- @property
- def subdomains(self) -> list[str]:
- suffix = "." + self.root_domain
- return [d[: -len(suffix)] for d in self.domains if d.endswith(suffix)]
-
- @property
- def _domain_config_name(self) -> str:
- return self._config_manager.domain_item_name
-
- def _get_domains_from_text(self, text: str) -> set[str]:
- domains: set[str] = set()
- regex = re.compile(r"server_name\s+(\S+)\s*;")
- for match in regex.finditer(text):
- domains.add(match[1])
- return domains
-
- def _join_generated_nginx_conf_text(self) -> str:
- text = ""
- template_manager = self.app.get_feature(TemplateManager)
- for nginx_conf in template_manager.generate():
- text += nginx_conf[1]
- return text
-
- def _get_domains(self) -> list[str]:
- text = self._join_generated_nginx_conf_text()
- domains = list(self._get_domains_from_text(text))
- domains.remove(self.root_domain)
- return [self.root_domain, *domains]
-
- def _print_domains(self) -> None:
- for domain in self.domains:
- print(domain)
-
- def _certbot_command(
- self,
- action: CertbotAction | str,
- test: bool,
- *,
- docker=True,
- standalone=None,
- email=None,
- agree_tos=True,
- ) -> str:
- if isinstance(action, str):
- action = CertbotAction[action.upper()]
-
- command_args = []
-
- add_domain_option = True
- if action is CertbotAction.CREATE:
- if standalone is None:
- standalone = True
- command_action = "certonly"
- elif action in [CertbotAction.EXPAND, CertbotAction.SHRINK]:
- if standalone is None:
- standalone = False
- command_action = "certonly"
- elif action is CertbotAction.RENEW:
- if standalone is None:
- standalone = False
- add_domain_option = False
- command_action = "renew"
- else:
- raise CruInternalError("Invalid certbot action.")
-
- data_dir = self.app.data_dir.full_path.as_posix()
-
- if not docker:
- command_args.append("certbot")
- else:
- command_args.extend(
- [
- "docker run -it --rm --name certbot",
- f'-v "{data_dir}/certbot/certs:/etc/letsencrypt"',
- f'-v "{data_dir}/certbot/data:/var/lib/letsencrypt"',
- ]
- )
- if standalone:
- command_args.append('-p "0.0.0.0:80:80"')
- else:
- command_args.append(f'-v "{data_dir}/certbot/webroot:/var/www/certbot"')
-
- command_args.append("certbot/certbot")
-
- command_args.append(command_action)
-
- command_args.append(f"--cert-name {self.root_domain}")
-
- if standalone:
- command_args.append("--standalone")
- else:
- command_args.append("--webroot -w /var/www/certbot")
-
- if add_domain_option:
- command_args.append(" ".join([f"-d {domain}" for domain in self.domains]))
-
- if email is not None:
- command_args.append(f"--email {email}")
-
- if agree_tos:
- command_args.append("--agree-tos")
-
- if test:
- command_args.append("--test-cert --dry-run")
-
- return " ".join(command_args)
-
- def print_all_certbot_commands(self, test: bool):
- print("### COMMAND: (standalone) create certs")
- print(
- self._certbot_command(
- CertbotAction.CREATE,
- test,
- email=self._config_manager.get_email_value_str_optional(),
- )
- )
- print()
- print("### COMMAND: (webroot+nginx) expand or shrink certs")
- print(
- self._certbot_command(
- CertbotAction.EXPAND,
- test,
- email=self._config_manager.get_email_value_str_optional(),
- )
- )
- print()
- print("### COMMAND: (webroot+nginx) renew certs")
- print(
- self._certbot_command(
- CertbotAction.RENEW,
- test,
- email=self._config_manager.get_email_value_str_optional(),
- )
- )
-
- @property
- def _cert_path_str(self) -> str:
- return str(
- self.app.data_dir.full_path
- / "certbot/certs/live"
- / self.root_domain
- / "fullchain.pem"
- )
-
- def get_command_info(self):
- return "nginx", "Manage nginx related things."
-
- def setup_arg_parser(self, arg_parser):
- subparsers = arg_parser.add_subparsers(
- dest="nginx_command", required=True, metavar="NGINX_COMMAND"
- )
- _list_parser = subparsers.add_parser("list", help="list domains")
- certbot_parser = subparsers.add_parser("certbot", help="print certbot commands")
- certbot_parser.add_argument(
- "--no-test",
- action="store_true",
- help="remove args making certbot run in test mode",
- )
-
- def run_command(self, args: Namespace) -> None:
- if args.nginx_command == "list":
- self._print_domains()
- elif args.nginx_command == "certbot":
- self.print_all_certbot_commands(not args.no_test)
-
- def _generate_dns_zone(
- self,
- ip: str,
- /,
- ttl: str | int = 600,
- *,
- enable_mail: bool = True,
- dkim: str | None = None,
- ) -> str:
- # TODO: Not complete and test now.
- root_domain = self.root_domain
- result = f"$ORIGIN {root_domain}.\n\n"
- result += "; A records\n"
- result += f"@ {ttl} IN A {ip}\n"
- for subdomain in self.subdomains:
- result += f"{subdomain} {ttl} IN A {ip}\n"
-
- if enable_mail:
- result += "\n; MX records\n"
- result += f"@ {ttl} IN MX 10 mail.{root_domain}.\n"
- result += "\n; SPF record\n"
- result += f'@ {ttl} IN TXT "v=spf1 mx ~all"\n'
- if dkim is not None:
- result += "\n; DKIM record\n"
- result += f'mail._domainkey {ttl} IN TEXT "{dkim}"'
- result += "\n; DMARC record\n"
- dmarc_options = [
- "v=DMARC1",
- "p=none",
- f"rua=mailto:dmarc.report@{root_domain}",
- f"ruf=mailto:dmarc.report@{root_domain}",
- "sp=none",
- "ri=86400",
- ]
- result += f'_dmarc {ttl} IN TXT "{"; ".join(dmarc_options)}"\n'
- return result
-
- def _get_dkim_from_mailserver(self) -> str | None:
- # TODO: Not complete and test now.
- dkim_path = (
- self.app.data_dir.full_path
- / "dms/config/opendkim/keys"
- / self.root_domain
- / "mail.txt"
- )
- if not dkim_path.exists():
- return None
-
- p = subprocess.run(["sudo", "cat", dkim_path], capture_output=True, check=True)
- value = ""
- for match in re.finditer('"(.*)"', p.stdout.decode("utf-8")):
- value += match.group(1)
- return value
-
- def _generate_dns_zone_with_dkim(self, ip: str, /, ttl: str | int = 600) -> str:
- # TODO: Not complete and test now.
- return self._generate_dns_zone(
- ip, ttl, enable_mail=True, dkim=self._get_dkim_from_mailserver()
- )
diff --git a/tools/cru-py/cru/service/_template.py b/tools/cru-py/cru/service/_template.py
deleted file mode 100644
index 1381700..0000000
--- a/tools/cru-py/cru/service/_template.py
+++ /dev/null
@@ -1,90 +0,0 @@
-from argparse import Namespace
-from pathlib import Path
-import shutil
-
-from cru.template import TemplateTree, CruStrWrapperTemplate
-
-from ._base import AppCommandFeatureProvider, AppFeaturePath
-from ._config import ConfigManager
-
-
-class TemplateManager(AppCommandFeatureProvider):
- def __init__(self, prefix: str | None = None):
- super().__init__("template-manager")
- self._prefix = prefix or self.app.app_id.upper()
-
- def setup(self) -> None:
- self._templates_dir = self.app.add_path("templates", True)
- self._generated_dir = self.app.add_path("generated", True)
- self._template_tree: TemplateTree[CruStrWrapperTemplate] | None = None
-
- @property
- def prefix(self) -> str:
- return self._prefix
-
- @property
- def templates_dir(self) -> AppFeaturePath:
- return self._templates_dir
-
- @property
- def generated_dir(self) -> AppFeaturePath:
- return self._generated_dir
-
- @property
- def template_tree(self) -> TemplateTree[CruStrWrapperTemplate]:
- if self._template_tree is None:
- return self.reload()
- return self._template_tree
-
- def reload(self) -> TemplateTree:
- self._template_tree = TemplateTree(
- lambda text: CruStrWrapperTemplate(text), self.templates_dir.full_path_str
- )
- return self._template_tree
-
- def _print_file_lists(self) -> None:
- for path, template in self.template_tree.templates:
- print(f"[{template.variable_count}]", path.as_posix())
-
- def generate(self) -> list[tuple[Path, str]]:
- config_manager = self.app.get_feature(ConfigManager)
- return self.template_tree.generate(config_manager.get_str_dict())
-
- def _generate_files(self, dry_run: bool) -> None:
- config_manager = self.app.get_feature(ConfigManager)
- if not dry_run and self.generated_dir.full_path.exists():
- shutil.rmtree(self.generated_dir.full_path)
- self.template_tree.generate_to(
- self.generated_dir.full_path_str, config_manager.get_str_dict(), dry_run
- )
-
- def get_command_info(self):
- return ("template", "Manage templates.")
-
- def setup_arg_parser(self, arg_parser):
- subparsers = arg_parser.add_subparsers(
- dest="template_command", required=True, metavar="TEMPLATE_COMMAND"
- )
- _list_parser = subparsers.add_parser("list", help="list templates")
- _variables_parser = subparsers.add_parser(
- "variables", help="list variables used in all templates"
- )
- generate_parser = subparsers.add_parser("generate", help="generate templates")
- generate_parser.add_argument(
- "--no-dry-run", action="store_true", help="generate and write target files"
- )
-
- def run_command(self, args: Namespace) -> None:
- if args.template_command == "list":
- self._print_file_lists()
- elif args.template_command == "variables":
- for var in self.template_tree.variables:
- print(var)
- elif args.template_command == "generate":
- dry_run = not args.no_dry_run
- self._generate_files(dry_run)
- if dry_run:
- print("Dry run successfully.")
- print(
- f"Will delete dir {self.generated_dir.full_path_str} if it exists."
- )
diff --git a/tools/cru-py/cru/system.py b/tools/cru-py/cru/system.py
deleted file mode 100644
index f321717..0000000
--- a/tools/cru-py/cru/system.py
+++ /dev/null
@@ -1,23 +0,0 @@
-import os.path
-import re
-
-
-def check_debian_derivative_version(name: str) -> None | str:
- if not os.path.isfile("/etc/os-release"):
- return None
- with open("/etc/os-release", "r") as f:
- content = f.read()
- if f"ID={name}" not in content:
- return None
- m = re.search(r'VERSION_ID="(.+)"', content)
- if m is None:
- return None
- return m.group(1)
-
-
-def check_ubuntu_version() -> None | str:
- return check_debian_derivative_version("ubuntu")
-
-
-def check_debian_version() -> None | str:
- return check_debian_derivative_version("debian")
diff --git a/tools/cru-py/cru/template.py b/tools/cru-py/cru/template.py
deleted file mode 100644
index 35d68ac..0000000
--- a/tools/cru-py/cru/template.py
+++ /dev/null
@@ -1,207 +0,0 @@
-from abc import ABCMeta, abstractmethod
-from collections.abc import Callable, Mapping
-from pathlib import Path
-from string import Template
-from typing import Generic, TypeVar
-
-from ._iter import CruIterator
-from ._error import CruException
-
-from .parsing import StrWrapperVarParser
-
-
-class CruTemplateError(CruException):
- pass
-
-
-class CruTemplateBase(metaclass=ABCMeta):
- def __init__(self, text: str):
- self._text = text
- self._variables: set[str] | None = None
-
- @abstractmethod
- def _get_variables(self) -> set[str]:
- raise NotImplementedError()
-
- @property
- def text(self) -> str:
- return self._text
-
- @property
- def variables(self) -> set[str]:
- if self._variables is None:
- self._variables = self._get_variables()
- return self._variables
-
- @property
- def variable_count(self) -> int:
- return len(self.variables)
-
- @property
- def has_variables(self) -> bool:
- return self.variable_count > 0
-
- @abstractmethod
- def _do_generate(self, mapping: dict[str, str]) -> str:
- raise NotImplementedError()
-
- def generate(self, mapping: Mapping[str, str], allow_extra: bool = True) -> str:
- values = dict(mapping)
- if not self.variables <= set(values.keys()):
- raise CruTemplateError("Missing variables.")
- if not allow_extra and not set(values.keys()) <= self.variables:
- raise CruTemplateError("Extra variables.")
- return self._do_generate(values)
-
-
-class CruTemplate(CruTemplateBase):
- def __init__(self, prefix: str, text: str):
- super().__init__(text)
- self._prefix = prefix
- self._template = Template(text)
-
- def _get_variables(self) -> set[str]:
- return (
- CruIterator(self._template.get_identifiers())
- .filter(lambda i: i.startswith(self.prefix))
- .to_set()
- )
-
- @property
- def prefix(self) -> str:
- return self._prefix
-
- @property
- def py_template(self) -> Template:
- return self._template
-
- @property
- def all_variables(self) -> set[str]:
- return set(self._template.get_identifiers())
-
- def _do_generate(self, mapping: dict[str, str]) -> str:
- return self._template.safe_substitute(mapping)
-
-
-class CruStrWrapperTemplate(CruTemplateBase):
- def __init__(self, text: str, wrapper: str = "@@"):
- super().__init__(text)
- self._wrapper = wrapper
- self._tokens: StrWrapperVarParser.Result
-
- @property
- def wrapper(self) -> str:
- return self._wrapper
-
- def _get_variables(self):
- self._tokens = StrWrapperVarParser(self.wrapper).parse(self.text)
- return (
- self._tokens.cru_iter()
- .filter(lambda t: t.is_var)
- .map(lambda t: t.value)
- .to_set()
- )
-
- def _do_generate(self, mapping):
- return (
- self._tokens.cru_iter()
- .map(lambda t: mapping[t.value] if t.is_var else t.value)
- .join_str("")
- )
-
-
-_Template = TypeVar("_Template", bound=CruTemplateBase)
-
-
-class TemplateTree(Generic[_Template]):
- def __init__(
- self,
- template_generator: Callable[[str], _Template],
- source: str,
- *,
- template_file_suffix: str | None = ".template",
- ):
- """
- If template_file_suffix is not None, the files will be checked according to the
- suffix of the file name. If the suffix matches, the file will be regarded as a
- template file. Otherwise, it will be regarded as a non-template file.
- Content of template file must contain variables that need to be replaced, while
- content of non-template file may not contain any variables.
- If either case is false, it generally means whether the file is a template is
- wrongly handled.
- """
- self._template_generator = template_generator
- self._files: list[tuple[Path, _Template]] = []
- self._source = source
- self._template_file_suffix = template_file_suffix
- self._load()
-
- @property
- def templates(self) -> list[tuple[Path, _Template]]:
- return self._files
-
- @property
- def source(self) -> str:
- return self._source
-
- @property
- def template_file_suffix(self) -> str | None:
- return self._template_file_suffix
-
- @staticmethod
- def _scan_files(root: str) -> list[Path]:
- root_path = Path(root)
- result: list[Path] = []
- for path in root_path.glob("**/*"):
- if not path.is_file():
- continue
- path = path.relative_to(root_path)
- result.append(Path(path))
- return result
-
- def _load(self) -> None:
- files = self._scan_files(self.source)
- for file_path in files:
- template_file = Path(self.source) / file_path
- with open(template_file, "r") as f:
- content = f.read()
- template = self._template_generator(content)
- if self.template_file_suffix is not None:
- should_be_template = file_path.name.endswith(self.template_file_suffix)
- if should_be_template and not template.has_variables:
- raise CruTemplateError(
- f"Template file {file_path} has no variables."
- )
- elif not should_be_template and template.has_variables:
- raise CruTemplateError(f"Non-template {file_path} has variables.")
- self._files.append((file_path, template))
-
- @property
- def variables(self) -> set[str]:
- s = set()
- for _, template in self.templates:
- s.update(template.variables)
- return s
-
- def generate(self, variables: Mapping[str, str]) -> list[tuple[Path, str]]:
- result: list[tuple[Path, str]] = []
- for path, template in self.templates:
- if self.template_file_suffix is not None and path.name.endswith(
- self.template_file_suffix
- ):
- path = path.parent / (path.name[: -len(self.template_file_suffix)])
-
- text = template.generate(variables)
- result.append((path, text))
- return result
-
- def generate_to(
- self, destination: str, variables: Mapping[str, str], dry_run: bool
- ) -> None:
- generated = self.generate(variables)
- if not dry_run:
- for path, text in generated:
- des = Path(destination) / path
- des.parent.mkdir(parents=True, exist_ok=True)
- with open(des, "w") as f:
- f.write(text)
diff --git a/tools/cru-py/cru/tool.py b/tools/cru-py/cru/tool.py
deleted file mode 100644
index 377f5d7..0000000
--- a/tools/cru-py/cru/tool.py
+++ /dev/null
@@ -1,82 +0,0 @@
-import shutil
-import subprocess
-from typing import Any
-from collections.abc import Iterable
-
-from ._error import CruException
-
-
-class CruExternalToolError(CruException):
- def __init__(self, message: str, tool: str, *args, **kwargs) -> None:
- super().__init__(message, *args, **kwargs)
- self._tool = tool
-
- @property
- def tool(self) -> str:
- return self._tool
-
-
-class CruExternalToolNotFoundError(CruExternalToolError):
- def __init__(self, message: str | None, tool: str, *args, **kwargs) -> None:
- super().__init__(
- message or f"Could not find binary for {tool}.", tool, *args, **kwargs
- )
-
-
-class CruExternalToolRunError(CruExternalToolError):
- def __init__(
- self,
- message: str,
- tool: str,
- tool_args: Iterable[str],
- tool_error: Any,
- *args,
- **kwargs,
- ) -> None:
- super().__init__(message, tool, *args, **kwargs)
- self._tool_args = list(tool_args)
- self._tool_error = tool_error
-
- @property
- def tool_args(self) -> list[str]:
- return self._tool_args
-
- @property
- def tool_error(self) -> Any:
- return self._tool_error
-
-
-class ExternalTool:
- def __init__(self, bin: str) -> None:
- self._bin = bin
-
- @property
- def bin(self) -> str:
- return self._bin
-
- @bin.setter
- def bin(self, value: str) -> None:
- self._bin = value
-
- @property
- def bin_path(self) -> str:
- real_bin = shutil.which(self.bin)
- if not real_bin:
- raise CruExternalToolNotFoundError(None, self.bin)
- return real_bin
-
- def run(
- self, *process_args: str, **subprocess_kwargs
- ) -> subprocess.CompletedProcess:
- try:
- return subprocess.run(
- [self.bin_path] + list(process_args), **subprocess_kwargs
- )
- except subprocess.CalledProcessError as e:
- raise CruExternalToolError("Subprocess failed.", self.bin) from e
- except OSError as e:
- raise CruExternalToolError("Failed to start subprocess", self.bin) from e
-
- def run_get_output(self, *process_args: str, **subprocess_kwargs) -> Any:
- process = self.run(*process_args, capture_output=True, **subprocess_kwargs)
- return process.stdout
diff --git a/tools/cru-py/cru/value.py b/tools/cru-py/cru/value.py
deleted file mode 100644
index 9c03219..0000000
--- a/tools/cru-py/cru/value.py
+++ /dev/null
@@ -1,292 +0,0 @@
-from __future__ import annotations
-
-import random
-import secrets
-import string
-import uuid
-from abc import abstractmethod, ABCMeta
-from collections.abc import Callable
-from typing import Any, ClassVar, TypeVar, Generic
-
-from ._error import CruException
-
-
-def _str_case_in(s: str, case: bool, str_list: list[str]) -> bool:
- if case:
- return s in str_list
- else:
- return s.lower() in [s.lower() for s in str_list]
-
-
-_T = TypeVar("_T")
-
-
-class CruValueTypeError(CruException):
- def __init__(
- self,
- message: str,
- value: Any,
- value_type: ValueType | None,
- *args,
- **kwargs,
- ):
- super().__init__(
- message,
- *args,
- **kwargs,
- )
- self._value = value
- self._value_type = value_type
-
- @property
- def value(self) -> Any:
- return self._value
-
- @property
- def value_type(self) -> ValueType | None:
- return self._value_type
-
-
-class ValueType(Generic[_T], metaclass=ABCMeta):
- def __init__(self, name: str, _type: type[_T]) -> None:
- self._name = name
- self._type = _type
-
- @property
- def name(self) -> str:
- return self._name
-
- @property
- def type(self) -> type[_T]:
- return self._type
-
- def check_value_type(self, value: Any) -> None:
- if not isinstance(value, self.type):
- raise CruValueTypeError("Type of value is wrong.", value, self)
-
- def _do_check_value(self, value: Any) -> _T:
- return value
-
- def check_value(self, value: Any) -> _T:
- self.check_value_type(value)
- return self._do_check_value(value)
-
- @abstractmethod
- def _do_check_str_format(self, s: str) -> None:
- raise NotImplementedError()
-
- def check_str_format(self, s: str) -> None:
- if not isinstance(s, str):
- raise CruValueTypeError("Try to check format on a non-str.", s, self)
- self._do_check_str_format(s)
-
- @abstractmethod
- def _do_convert_value_to_str(self, value: _T) -> str:
- raise NotImplementedError()
-
- def convert_value_to_str(self, value: _T) -> str:
- self.check_value(value)
- return self._do_convert_value_to_str(value)
-
- @abstractmethod
- def _do_convert_str_to_value(self, s: str) -> _T:
- raise NotImplementedError()
-
- def convert_str_to_value(self, s: str) -> _T:
- self.check_str_format(s)
- return self._do_convert_str_to_value(s)
-
- def check_value_or_try_convert_from_str(self, value_or_str: Any) -> _T:
- try:
- return self.check_value(value_or_str)
- except CruValueTypeError:
- if isinstance(value_or_str, str):
- return self.convert_str_to_value(value_or_str)
- else:
- raise
-
- def create_default_value(self) -> _T:
- return self.type()
-
-
-class TextValueType(ValueType[str]):
- def __init__(self) -> None:
- super().__init__("text", str)
-
- def _do_check_str_format(self, _s):
- return
-
- def _do_convert_value_to_str(self, value):
- return value
-
- def _do_convert_str_to_value(self, s):
- return s
-
-
-class IntegerValueType(ValueType[int]):
- def __init__(self) -> None:
- super().__init__("integer", int)
-
- def _do_check_str_format(self, s):
- try:
- int(s)
- except ValueError as e:
- raise CruValueTypeError("Invalid integer format.", s, self) from e
-
- def _do_convert_value_to_str(self, value):
- return str(value)
-
- def _do_convert_str_to_value(self, s):
- return int(s)
-
-
-class FloatValueType(ValueType[float]):
- def __init__(self) -> None:
- super().__init__("float", float)
-
- def _do_check_str_format(self, s):
- try:
- float(s)
- except ValueError as e:
- raise CruValueTypeError("Invalid float format.", s, self) from e
-
- def _do_convert_value_to_str(self, value):
- return str(value)
-
- def _do_convert_str_to_value(self, s):
- return float(s)
-
-
-class BooleanValueType(ValueType[bool]):
- DEFAULT_TRUE_LIST: ClassVar[list[str]] = ["true", "yes", "y", "on", "1"]
- DEFAULT_FALSE_LIST: ClassVar[list[str]] = ["false", "no", "n", "off", "0"]
-
- def __init__(
- self,
- *,
- case_sensitive=False,
- true_list: None | list[str] = None,
- false_list: None | list[str] = None,
- ) -> None:
- super().__init__("boolean", bool)
- self._case_sensitive = case_sensitive
- self._valid_true_strs: list[str] = (
- true_list or BooleanValueType.DEFAULT_TRUE_LIST
- )
- self._valid_false_strs: list[str] = (
- false_list or BooleanValueType.DEFAULT_FALSE_LIST
- )
-
- @property
- def case_sensitive(self) -> bool:
- return self._case_sensitive
-
- @property
- def valid_true_strs(self) -> list[str]:
- return self._valid_true_strs
-
- @property
- def valid_false_strs(self) -> list[str]:
- return self._valid_false_strs
-
- @property
- def valid_boolean_strs(self) -> list[str]:
- return self._valid_true_strs + self._valid_false_strs
-
- def _do_check_str_format(self, s):
- if not _str_case_in(s, self.case_sensitive, self.valid_boolean_strs):
- raise CruValueTypeError("Invalid boolean format.", s, self)
-
- def _do_convert_value_to_str(self, value):
- return self._valid_true_strs[0] if value else self._valid_false_strs[0]
-
- def _do_convert_str_to_value(self, s):
- return _str_case_in(s, self.case_sensitive, self._valid_true_strs)
-
- def create_default_value(self):
- return self.valid_false_strs[0]
-
-
-class EnumValueType(ValueType[str]):
- def __init__(self, valid_values: list[str], /, case_sensitive=False) -> None:
- super().__init__(f"enum({'|'.join(valid_values)})", str)
- self._case_sensitive = case_sensitive
- self._valid_values = valid_values
-
- @property
- def case_sensitive(self) -> bool:
- return self._case_sensitive
-
- @property
- def valid_values(self) -> list[str]:
- return self._valid_values
-
- def _do_check_value(self, value):
- self._do_check_str_format(value)
-
- def _do_check_str_format(self, s):
- if not _str_case_in(s, self.case_sensitive, self.valid_values):
- raise CruValueTypeError("Invalid enum value", s, self)
-
- def _do_convert_value_to_str(self, value):
- return value
-
- def _do_convert_str_to_value(self, s):
- return s
-
- def create_default_value(self):
- return self.valid_values[0]
-
-
-TEXT_VALUE_TYPE = TextValueType()
-INTEGER_VALUE_TYPE = IntegerValueType()
-BOOLEAN_VALUE_TYPE = BooleanValueType()
-
-
-class ValueGeneratorBase(Generic[_T], metaclass=ABCMeta):
- @abstractmethod
- def generate(self) -> _T:
- raise NotImplementedError()
-
- def __call__(self) -> _T:
- return self.generate()
-
-
-class ValueGenerator(ValueGeneratorBase[_T]):
- def __init__(self, generate_func: Callable[[], _T]) -> None:
- self._generate_func = generate_func
-
- @property
- def generate_func(self) -> Callable[[], _T]:
- return self._generate_func
-
- def generate(self) -> _T:
- return self._generate_func()
-
-
-class UuidValueGenerator(ValueGeneratorBase[str]):
- def generate(self):
- return str(uuid.uuid4())
-
-
-class RandomStringValueGenerator(ValueGeneratorBase[str]):
- def __init__(self, length: int, secure: bool) -> None:
- self._length = length
- self._secure = secure
-
- @property
- def length(self) -> int:
- return self._length
-
- @property
- def secure(self) -> bool:
- return self._secure
-
- def generate(self):
- random_func = secrets.choice if self._secure else random.choice
- characters = string.ascii_letters + string.digits
- random_string = "".join(random_func(characters) for _ in range(self._length))
- return random_string
-
-
-UUID_VALUE_GENERATOR = UuidValueGenerator()
diff --git a/tools/cru-py/poetry.lock b/tools/cru-py/poetry.lock
deleted file mode 100644
index 4338200..0000000
--- a/tools/cru-py/poetry.lock
+++ /dev/null
@@ -1,111 +0,0 @@
-# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
-
-[[package]]
-name = "mypy"
-version = "1.15.0"
-description = "Optional static typing for Python"
-optional = false
-python-versions = ">=3.9"
-groups = ["dev"]
-files = [
- {file = "mypy-1.15.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:979e4e1a006511dacf628e36fadfecbcc0160a8af6ca7dad2f5025529e082c13"},
- {file = "mypy-1.15.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c4bb0e1bd29f7d34efcccd71cf733580191e9a264a2202b0239da95984c5b559"},
- {file = "mypy-1.15.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:be68172e9fd9ad8fb876c6389f16d1c1b5f100ffa779f77b1fb2176fcc9ab95b"},
- {file = "mypy-1.15.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c7be1e46525adfa0d97681432ee9fcd61a3964c2446795714699a998d193f1a3"},
- {file = "mypy-1.15.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:2e2c2e6d3593f6451b18588848e66260ff62ccca522dd231cd4dd59b0160668b"},
- {file = "mypy-1.15.0-cp310-cp310-win_amd64.whl", hash = "sha256:6983aae8b2f653e098edb77f893f7b6aca69f6cffb19b2cc7443f23cce5f4828"},
- {file = "mypy-1.15.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:2922d42e16d6de288022e5ca321cd0618b238cfc5570e0263e5ba0a77dbef56f"},
- {file = "mypy-1.15.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2ee2d57e01a7c35de00f4634ba1bbf015185b219e4dc5909e281016df43f5ee5"},
- {file = "mypy-1.15.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:973500e0774b85d9689715feeffcc980193086551110fd678ebe1f4342fb7c5e"},
- {file = "mypy-1.15.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:5a95fb17c13e29d2d5195869262f8125dfdb5c134dc8d9a9d0aecf7525b10c2c"},
- {file = "mypy-1.15.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:1905f494bfd7d85a23a88c5d97840888a7bd516545fc5aaedff0267e0bb54e2f"},
- {file = "mypy-1.15.0-cp311-cp311-win_amd64.whl", hash = "sha256:c9817fa23833ff189db061e6d2eff49b2f3b6ed9856b4a0a73046e41932d744f"},
- {file = "mypy-1.15.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:aea39e0583d05124836ea645f412e88a5c7d0fd77a6d694b60d9b6b2d9f184fd"},
- {file = "mypy-1.15.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:2f2147ab812b75e5b5499b01ade1f4a81489a147c01585cda36019102538615f"},
- {file = "mypy-1.15.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ce436f4c6d218a070048ed6a44c0bbb10cd2cc5e272b29e7845f6a2f57ee4464"},
- {file = "mypy-1.15.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8023ff13985661b50a5928fc7a5ca15f3d1affb41e5f0a9952cb68ef090b31ee"},
- {file = "mypy-1.15.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:1124a18bc11a6a62887e3e137f37f53fbae476dc36c185d549d4f837a2a6a14e"},
- {file = "mypy-1.15.0-cp312-cp312-win_amd64.whl", hash = "sha256:171a9ca9a40cd1843abeca0e405bc1940cd9b305eaeea2dda769ba096932bb22"},
- {file = "mypy-1.15.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:93faf3fdb04768d44bf28693293f3904bbb555d076b781ad2530214ee53e3445"},
- {file = "mypy-1.15.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:811aeccadfb730024c5d3e326b2fbe9249bb7413553f15499a4050f7c30e801d"},
- {file = "mypy-1.15.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:98b7b9b9aedb65fe628c62a6dc57f6d5088ef2dfca37903a7d9ee374d03acca5"},
- {file = "mypy-1.15.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c43a7682e24b4f576d93072216bf56eeff70d9140241f9edec0c104d0c515036"},
- {file = "mypy-1.15.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:baefc32840a9f00babd83251560e0ae1573e2f9d1b067719479bfb0e987c6357"},
- {file = "mypy-1.15.0-cp313-cp313-win_amd64.whl", hash = "sha256:b9378e2c00146c44793c98b8d5a61039a048e31f429fb0eb546d93f4b000bedf"},
- {file = "mypy-1.15.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:e601a7fa172c2131bff456bb3ee08a88360760d0d2f8cbd7a75a65497e2df078"},
- {file = "mypy-1.15.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:712e962a6357634fef20412699a3655c610110e01cdaa6180acec7fc9f8513ba"},
- {file = "mypy-1.15.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:f95579473af29ab73a10bada2f9722856792a36ec5af5399b653aa28360290a5"},
- {file = "mypy-1.15.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8f8722560a14cde92fdb1e31597760dc35f9f5524cce17836c0d22841830fd5b"},
- {file = "mypy-1.15.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:1fbb8da62dc352133d7d7ca90ed2fb0e9d42bb1a32724c287d3c76c58cbaa9c2"},
- {file = "mypy-1.15.0-cp39-cp39-win_amd64.whl", hash = "sha256:d10d994b41fb3497719bbf866f227b3489048ea4bbbb5015357db306249f7980"},
- {file = "mypy-1.15.0-py3-none-any.whl", hash = "sha256:5469affef548bd1895d86d3bf10ce2b44e33d86923c29e4d675b3e323437ea3e"},
- {file = "mypy-1.15.0.tar.gz", hash = "sha256:404534629d51d3efea5c800ee7c42b72a6554d6c400e6a79eafe15d11341fd43"},
-]
-
-[package.dependencies]
-mypy_extensions = ">=1.0.0"
-typing_extensions = ">=4.6.0"
-
-[package.extras]
-dmypy = ["psutil (>=4.0)"]
-faster-cache = ["orjson"]
-install-types = ["pip"]
-mypyc = ["setuptools (>=50)"]
-reports = ["lxml"]
-
-[[package]]
-name = "mypy-extensions"
-version = "1.0.0"
-description = "Type system extensions for programs checked with the mypy type checker."
-optional = false
-python-versions = ">=3.5"
-groups = ["dev"]
-files = [
- {file = "mypy_extensions-1.0.0-py3-none-any.whl", hash = "sha256:4392f6c0eb8a5668a69e23d168ffa70f0be9ccfd32b5cc2d26a34ae5b844552d"},
- {file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"},
-]
-
-[[package]]
-name = "ruff"
-version = "0.9.6"
-description = "An extremely fast Python linter and code formatter, written in Rust."
-optional = false
-python-versions = ">=3.7"
-groups = ["dev"]
-files = [
- {file = "ruff-0.9.6-py3-none-linux_armv6l.whl", hash = "sha256:2f218f356dd2d995839f1941322ff021c72a492c470f0b26a34f844c29cdf5ba"},
- {file = "ruff-0.9.6-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:b908ff4df65dad7b251c9968a2e4560836d8f5487c2f0cc238321ed951ea0504"},
- {file = "ruff-0.9.6-py3-none-macosx_11_0_arm64.whl", hash = "sha256:b109c0ad2ececf42e75fa99dc4043ff72a357436bb171900714a9ea581ddef83"},
- {file = "ruff-0.9.6-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1de4367cca3dac99bcbd15c161404e849bb0bfd543664db39232648dc00112dc"},
- {file = "ruff-0.9.6-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:ac3ee4d7c2c92ddfdaedf0bf31b2b176fa7aa8950efc454628d477394d35638b"},
- {file = "ruff-0.9.6-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:5dc1edd1775270e6aa2386119aea692039781429f0be1e0949ea5884e011aa8e"},
- {file = "ruff-0.9.6-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:4a091729086dffa4bd070aa5dab7e39cc6b9d62eb2bef8f3d91172d30d599666"},
- {file = "ruff-0.9.6-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:d1bbc6808bf7b15796cef0815e1dfb796fbd383e7dbd4334709642649625e7c5"},
- {file = "ruff-0.9.6-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:589d1d9f25b5754ff230dce914a174a7c951a85a4e9270613a2b74231fdac2f5"},
- {file = "ruff-0.9.6-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dc61dd5131742e21103fbbdcad683a8813be0e3c204472d520d9a5021ca8b217"},
- {file = "ruff-0.9.6-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:5e2d9126161d0357e5c8f30b0bd6168d2c3872372f14481136d13de9937f79b6"},
- {file = "ruff-0.9.6-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:68660eab1a8e65babb5229a1f97b46e3120923757a68b5413d8561f8a85d4897"},
- {file = "ruff-0.9.6-py3-none-musllinux_1_2_i686.whl", hash = "sha256:c4cae6c4cc7b9b4017c71114115db0445b00a16de3bcde0946273e8392856f08"},
- {file = "ruff-0.9.6-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:19f505b643228b417c1111a2a536424ddde0db4ef9023b9e04a46ed8a1cb4656"},
- {file = "ruff-0.9.6-py3-none-win32.whl", hash = "sha256:194d8402bceef1b31164909540a597e0d913c0e4952015a5b40e28c146121b5d"},
- {file = "ruff-0.9.6-py3-none-win_amd64.whl", hash = "sha256:03482d5c09d90d4ee3f40d97578423698ad895c87314c4de39ed2af945633caa"},
- {file = "ruff-0.9.6-py3-none-win_arm64.whl", hash = "sha256:0e2bb706a2be7ddfea4a4af918562fdc1bcb16df255e5fa595bbd800ce322a5a"},
- {file = "ruff-0.9.6.tar.gz", hash = "sha256:81761592f72b620ec8fa1068a6fd00e98a5ebee342a3642efd84454f3031dca9"},
-]
-
-[[package]]
-name = "typing-extensions"
-version = "4.12.2"
-description = "Backported and Experimental Type Hints for Python 3.8+"
-optional = false
-python-versions = ">=3.8"
-groups = ["dev"]
-files = [
- {file = "typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d"},
- {file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"},
-]
-
-[metadata]
-lock-version = "2.1"
-python-versions = "^3.11"
-content-hash = "674a21dbda993a1ee761e2e6e2f13ccece8289336a83fd0a154285eac48f3a76"
diff --git a/tools/cru-py/pyproject.toml b/tools/cru-py/pyproject.toml
deleted file mode 100644
index 0ce2c60..0000000
--- a/tools/cru-py/pyproject.toml
+++ /dev/null
@@ -1,27 +0,0 @@
-[project]
-name = "cru-py"
-version = "0.1.0"
-requires-python = ">=3.11"
-
-[tool.poetry]
-package-mode = false
-name = "cru"
-version = "0.1.0"
-description = ""
-authors = ["Yuqian Yang <crupest@crupest.life>"]
-license = "MIT"
-readme = "README.md"
-
-[tool.poetry.dependencies]
-python = "^3.11"
-
-[tool.poetry.group.dev.dependencies]
-mypy = "^1.13.0"
-ruff = "^0.9.6"
-
-[tool.ruff.lint]
-select = ["E", "F", "B"]
-
-[build-system]
-requires = ["poetry-core"]
-build-backend = "poetry.core.masonry.api"
diff --git a/tools/cru-py/www-dev b/tools/cru-py/www-dev
deleted file mode 100644
index f56d679..0000000
--- a/tools/cru-py/www-dev
+++ /dev/null
@@ -1,8 +0,0 @@
-#! /usr/bin/env sh
-
-set -e
-
-cd "$(dirname "$0")/../.."
-
-exec tmux new-session 'cd docker/crupest-nginx/sites/www && pnpm start' \; \
- split-window -h 'cd docker/crupest-api/CrupestApi/CrupestApi && dotnet run --launch-profile dev'
diff --git a/tools/manage b/tools/manage
deleted file mode 100755
index dc7f64b..0000000
--- a/tools/manage
+++ /dev/null
@@ -1,16 +0,0 @@
-#!/usr/bin/env bash
-
-set -e
-
-python3.11 --version > /dev/null 2>&1 || (
- echo Error: failed to run Python with python3.11 --version.
- exit 1
-)
-
-script_dir=$(dirname "$0")
-project_dir=$(realpath "$script_dir/..")
-
-cd "$project_dir"
-
-export PYTHONPATH="$project_dir/tools/cru-py:$PYTHONPATH"
-python3.11 -m cru.service --project-dir "$project_dir" "$@"
diff --git a/tools/manage.cmd b/tools/manage.cmd
deleted file mode 100644
index fce913d..0000000
--- a/tools/manage.cmd
+++ /dev/null
@@ -1,15 +0,0 @@
-@echo off
-
-set PYTHON=py -3
-%PYTHON% --version >NUL 2>&1 || (
- echo Error: failed to run Python with py -3 --version.
- exit 1
-)
-
-set TOOLS_DIR=%~dp0
-set PROJECT_DIR=%TOOLS_DIR%..
-
-cd /d "%PROJECT_DIR%"
-
-set PYTHONPATH=%PROJECT_DIR%\tools\cru-py;%PYTHONPATH%
-%PYTHON% -m cru.service --project-dir "%PROJECT_DIR%" %*
diff --git a/tools/update-blog b/tools/update-blog
deleted file mode 100755
index 5314f47..0000000
--- a/tools/update-blog
+++ /dev/null
@@ -1,5 +0,0 @@
-#!/usr/bin/env bash
-
-set -e
-
-exec docker exec -it blog /scripts/update.bash