aboutsummaryrefslogtreecommitdiff
path: root/tools/cru-py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/cru-py')
-rw-r--r--tools/cru-py/cru/__init__.py50
-rw-r--r--tools/cru-py/cru/_base.py4
-rw-r--r--tools/cru-py/cru/_error.py (renamed from tools/cru-py/cru/error.py)2
-rw-r--r--tools/cru-py/cru/_event.py18
-rw-r--r--tools/cru-py/cru/_helper.py16
-rw-r--r--tools/cru-py/cru/_iter.py3
-rw-r--r--tools/cru-py/cru/_lang.py16
-rw-r--r--tools/cru-py/cru/_path.py23
-rw-r--r--tools/cru-py/cru/_type.py2
-rw-r--r--tools/cru-py/cru/app.py (renamed from tools/cru-py/cru/paths.py)33
-rw-r--r--tools/cru-py/cru/attr.py2
-rw-r--r--tools/cru-py/cru/config.py147
-rw-r--r--tools/cru-py/cru/list.py (renamed from tools/cru-py/cru/_list.py)0
-rw-r--r--tools/cru-py/cru/parsing.py155
-rw-r--r--tools/cru-py/cru/property.py24
-rw-r--r--tools/cru-py/cru/service/nginx.py356
-rw-r--r--tools/cru-py/cru/system.py5
-rw-r--r--tools/cru-py/cru/template.py142
-rw-r--r--tools/cru-py/cru/value.py201
19 files changed, 487 insertions, 712 deletions
diff --git a/tools/cru-py/cru/__init__.py b/tools/cru-py/cru/__init__.py
index 94d0d69..7c1a5f1 100644
--- a/tools/cru-py/cru/__init__.py
+++ b/tools/cru-py/cru/__init__.py
@@ -1,6 +1,26 @@
import sys
-from ._base import CruException
+from ._base import CRU, CruNamespaceError, CRU_NAME_PREFIXES
+from ._error import (
+ cru_unreachable,
+ CruException,
+ CruUserFriendlyException,
+ CruInternalError,
+ CruUnreachableError,
+)
+from ._const import (
+ CruConstantBase,
+ CruDontChange,
+ CruNotFound,
+ CruNoValue,
+ CruPlaceholder,
+ CruUseDefault,
+)
+from ._func import CruFunction
+from ._iter import CruIterable, CruIterator
+from ._event import CruEvent, CruEventHandlerToken
+from ._path import CruPath, CruPathError
+from ._type import CruTypeSet, CruTypeCheckError
class CruInitError(CruException):
@@ -13,3 +33,31 @@ def check_python_version(required_version=(3, 11)):
check_python_version()
+
+__all__ = [
+ "CRU",
+ "CruNamespaceError",
+ "CRU_NAME_PREFIXES",
+ "check_python_version",
+ "CruException",
+ "cru_unreachable",
+ "CruInitError",
+ "CruUserFriendlyException",
+ "CruInternalError",
+ "CruUnreachableError",
+ "CruConstantBase",
+ "CruDontChange",
+ "CruNotFound",
+ "CruNoValue",
+ "CruPlaceholder",
+ "CruUseDefault",
+ "CruFunction",
+ "CruIterable",
+ "CruIterator",
+ "CruEvent",
+ "CruEventHandlerToken",
+ "CruPath",
+ "CruPathError",
+ "CruTypeSet",
+ "CruTypeCheckError",
+]
diff --git a/tools/cru-py/cru/_base.py b/tools/cru-py/cru/_base.py
index 2310bfb..0a22df4 100644
--- a/tools/cru-py/cru/_base.py
+++ b/tools/cru-py/cru/_base.py
@@ -1,7 +1,7 @@
from typing import Any
-from ._lang import remove_none
-from .error import CruInternalError
+from ._helper import remove_none
+from ._error import CruInternalError
class CruNamespaceError(CruInternalError):
diff --git a/tools/cru-py/cru/error.py b/tools/cru-py/cru/_error.py
index 95edbd3..0d2bf79 100644
--- a/tools/cru-py/cru/error.py
+++ b/tools/cru-py/cru/_error.py
@@ -17,7 +17,7 @@ class CruInternalError(CruException):
"""Raised when an internal logic error occurs."""
-class UserFriendlyException(CruException):
+class CruUserFriendlyException(CruException):
def __init__(self, message: str, user_message: str, *args, **kwargs) -> None:
super().__init__(message, *args, **kwargs)
self._user_message = user_message
diff --git a/tools/cru-py/cru/_event.py b/tools/cru-py/cru/_event.py
index 65265fd..51a794c 100644
--- a/tools/cru-py/cru/_event.py
+++ b/tools/cru-py/cru/_event.py
@@ -3,22 +3,22 @@ from __future__ import annotations
from collections.abc import Callable
from typing import Generic, ParamSpec, TypeVar
-from ._list import CruList
+from .list import CruList
_P = ParamSpec("_P")
_R = TypeVar("_R")
-class EventHandlerToken(Generic[_P, _R]):
+class CruEventHandlerToken(Generic[_P, _R]):
def __init__(
- self, event: Event, handler: Callable[_P, _R], once: bool = False
+ self, event: CruEvent, handler: Callable[_P, _R], once: bool = False
) -> None:
self._event = event
self._handler = handler
self._once = once
@property
- def event(self) -> Event:
+ def event(self) -> CruEvent:
return self._event
@property
@@ -30,19 +30,19 @@ class EventHandlerToken(Generic[_P, _R]):
return self._once
-class Event(Generic[_P, _R]):
+class CruEvent(Generic[_P, _R]):
def __init__(self, name: str) -> None:
self._name = name
- self._tokens: CruList[EventHandlerToken] = CruList()
+ self._tokens: CruList[CruEventHandlerToken] = CruList()
def register(
self, handler: Callable[_P, _R], once: bool = False
- ) -> EventHandlerToken:
- token = EventHandlerToken(self, handler, once)
+ ) -> CruEventHandlerToken:
+ token = CruEventHandlerToken(self, handler, once)
self._tokens.append(token)
return token
- def unregister(self, *handlers: EventHandlerToken | Callable[_P, _R]) -> int:
+ def unregister(self, *handlers: CruEventHandlerToken | Callable[_P, _R]) -> int:
old_length = len(self._tokens)
self._tokens.reset(
self._tokens.as_cru_iterator().filter(
diff --git a/tools/cru-py/cru/_helper.py b/tools/cru-py/cru/_helper.py
new file mode 100644
index 0000000..43baf46
--- /dev/null
+++ b/tools/cru-py/cru/_helper.py
@@ -0,0 +1,16 @@
+from collections.abc import Callable
+from typing import Any, Iterable, TypeVar, cast
+
+_T = TypeVar("_T")
+_D = TypeVar("_D")
+
+
+def remove_element(
+ iterable: Iterable[_T | None], to_rm: Iterable[Any], des: type[_D] | None = None
+) -> _D:
+ to_rm = set(to_rm)
+ return cast(Callable[..., _D], des or list)(v for v in iterable if v not in to_rm)
+
+
+def remove_none(iterable: Iterable[_T | None], des: type[_D] | None = None) -> _D:
+ return cast(Callable[..., _D], des or list)(v for v in iterable if v is not None)
diff --git a/tools/cru-py/cru/_iter.py b/tools/cru-py/cru/_iter.py
index b91195b..12d1d1f 100644
--- a/tools/cru-py/cru/_iter.py
+++ b/tools/cru-py/cru/_iter.py
@@ -16,8 +16,9 @@ from typing import (
cast,
)
-from ._base import CRU, cru_unreachable
+from ._base import CRU
from ._const import CruNotFound
+from ._error import cru_unreachable
_P = ParamSpec("_P")
_T = TypeVar("_T")
diff --git a/tools/cru-py/cru/_lang.py b/tools/cru-py/cru/_lang.py
deleted file mode 100644
index 925ba00..0000000
--- a/tools/cru-py/cru/_lang.py
+++ /dev/null
@@ -1,16 +0,0 @@
-from collections.abc import Callable
-from typing import Any, Iterable, TypeVar, cast
-
-T = TypeVar("T")
-D = TypeVar("D")
-
-
-def remove_element(
- iterable: Iterable[T | None], to_rm: Iterable[Any], des: type[D] | None = None
-) -> D:
- to_rm = set(to_rm)
- return cast(Callable[..., D], des or list)(v for v in iterable if v not in to_rm)
-
-
-def remove_none(iterable: Iterable[T | None], des: type[D] | None = None) -> D:
- return cast(Callable[..., D], des or list)(v for v in iterable if v is not None)
diff --git a/tools/cru-py/cru/_path.py b/tools/cru-py/cru/_path.py
new file mode 100644
index 0000000..a131c41
--- /dev/null
+++ b/tools/cru-py/cru/_path.py
@@ -0,0 +1,23 @@
+from pathlib import Path
+
+from ._error import CruException
+
+
+class CruPathError(CruException):
+ def __init__(self, message, _path: Path, *args, **kwargs):
+ super().__init__(message, *args, **kwargs)
+ self._path = _path
+
+ @property
+ def path(self) -> Path:
+ return self._path
+
+
+class CruPath(Path):
+ def check_parents_dir(self, must_exist: bool = False) -> bool:
+ for p in reversed(self.parents):
+ if not p.exists() and not must_exist:
+ return False
+ if not p.is_dir():
+ raise CruPathError("Parents path must be a dir.", self)
+ return True
diff --git a/tools/cru-py/cru/_type.py b/tools/cru-py/cru/_type.py
index 0fd86a4..96d5d4b 100644
--- a/tools/cru-py/cru/_type.py
+++ b/tools/cru-py/cru/_type.py
@@ -1,7 +1,7 @@
from collections.abc import Iterable
from typing import Any
-from ._base import CruException, CruInternalError
+from ._error import CruException, CruInternalError
from ._iter import CruIterator
diff --git a/tools/cru-py/cru/paths.py b/tools/cru-py/cru/app.py
index cdd97fe..6a60926 100644
--- a/tools/cru-py/cru/paths.py
+++ b/tools/cru-py/cru/app.py
@@ -1,31 +1,32 @@
import os
from pathlib import Path
-from .error import CruException
+from ._error import CruException
+from ._path import CruPath
-class ApplicationPathError(CruException):
- def __init__(self, message: str, p: str | Path, *args, **kwargs):
- super().__init__(message, *args, path=str(p), **kwargs)
+class CruApplication:
+ def __init__(self, name: str) -> None:
+ self._name = name
-def check_parents_dir(p: str | Path, /, must_exist: bool = False) -> bool:
- p = Path(p) if isinstance(p, str) else p
- for p in reversed(p.parents):
- if not p.exists() and not must_exist:
- return False
- if not p.is_dir():
- raise ApplicationPathError("Parents path should be a dir.", p)
- return True
+class ApplicationPathError(CruException):
+ def __init__(self, message, _path: Path, *args, **kwargs):
+ super().__init__(message, *args, **kwargs)
+ self._path = _path
+
+ @property
+ def path(self) -> Path:
+ return self._path
class ApplicationPath:
def __init__(self, p: str | Path, is_dir: bool) -> None:
- self._path = Path(p) if isinstance(p, str) else p
+ self._path = CruPath(p)
self._is_dir = is_dir
@property
- def path(self) -> Path:
+ def path(self) -> CruPath:
return self._path
@property
@@ -33,7 +34,7 @@ class ApplicationPath:
return self._is_dir
def check_parents(self, must_exist: bool = False) -> bool:
- return check_parents_dir(self._path.parent, must_exist)
+ return self._path.check_parents_dir(must_exist)
def check_self(self, must_exist: bool = False) -> bool:
if not self.check_parents(must_exist):
@@ -41,7 +42,7 @@ class ApplicationPath:
if not self.path.exists():
if not must_exist:
return False
- raise ApplicationPathError("Mot exist.", self.path)
+ raise ApplicationPathError("Not exist.", self.path)
if self.is_dir:
if not self.path.is_dir():
raise ApplicationPathError("Should be a directory, but not.", self.path)
diff --git a/tools/cru-py/cru/attr.py b/tools/cru-py/cru/attr.py
index 82f1eba..d4cc86a 100644
--- a/tools/cru-py/cru/attr.py
+++ b/tools/cru-py/cru/attr.py
@@ -5,7 +5,7 @@ from collections.abc import Callable, Iterable
from dataclasses import dataclass, field
from typing import Any
-from ._list import CruUniqueKeyList
+from .list import CruUniqueKeyList
from ._type import CruTypeSet
from ._const import CruNotFound, CruUseDefault, CruDontChange
from ._iter import CruIterator
diff --git a/tools/cru-py/cru/config.py b/tools/cru-py/cru/config.py
index 843f30a..3fd994b 100644
--- a/tools/cru-py/cru/config.py
+++ b/tools/cru-py/cru/config.py
@@ -1,22 +1,35 @@
-from typing import Any, TypeVar, Generic
-
-from .error import CruInternalLogicError
-from .value import ValueType, ValueGenerator, ValidationError
-
-T = TypeVar("T")
-
-
-class ConfigItem(Generic[T]):
- OptionalValueGenerator = ValueGenerator[T, []] | None
-
- def __init__(self, name: str, description: str, value_type: ValueType[T], value: T | None, default_value: T, *,
- value_generator: OptionalValueGenerator = None) -> None:
+from typing import TypeVar, Generic
+import copy
+
+
+from .list import CruUniqueKeyList
+from ._error import CruInternalError
+from .value import (
+ CruValueTypeError,
+ ValueGeneratorBase,
+ ValueType,
+)
+
+_T = TypeVar("_T")
+
+
+class ConfigItem(Generic[_T]):
+ def __init__(
+ self,
+ name: str,
+ description: str,
+ value_type: ValueType[_T],
+ value: _T | None,
+ default_value: _T,
+ *,
+ value_generator: ValueGeneratorBase | None = None,
+ ) -> None:
self._name = name
self._description = description
self._value_type = value_type
self._default_value = default_value
self._value_generator = value_generator
- self._value: T | None = value
+ self._value = value
@property
def name(self) -> str:
@@ -27,11 +40,11 @@ class ConfigItem(Generic[T]):
return self._description
@property
- def value_type(self) -> ValueType[T]:
+ def value_type(self) -> ValueType[_T]:
return self._value_type
@property
- def default_value(self) -> T:
+ def default_value(self) -> _T:
return self._default_value
@property
@@ -43,86 +56,42 @@ class ConfigItem(Generic[T]):
return not self.is_default
@property
- def value(self) -> T:
+ def value(self) -> _T:
return self._value or self._default_value
- def set_value(self, v: T | str, /, allow_convert_from_str=False):
+ @property
+ def value_generator(self) -> ValueGeneratorBase | None:
+ return self._value_generator
+
+ def set_value(self, v: _T | str, /, allow_convert_from_str=False):
if allow_convert_from_str:
self._value = self.value_type.check_value(v)
else:
self._value = self.value_type.check_value_or_try_convert_from_str(v)
- @value.setter
- def value(self, v: T) -> None:
- self.set_value(v)
-
- @property
- def value_generator(self) -> OptionalValueGenerator:
- return self._value_generator
-
- def generate_value(self, allow_interactive=False) -> T | None:
- if self.value_generator is None: return None
- if self.value_generator.interactive and not allow_interactive:
+ def generate_value(self) -> _T | None:
+ if self.value_generator is None:
return None
- else:
- v = self.generate_value()
- try:
- self.value_type.check_value(v)
- return v
- except ValidationError as e:
- raise CruInternalLogicError("Config value generator returns invalid value.", name=self.name, inner=e)
+ v = self.generate_value()
+ try:
+ self.value_type.check_value(v)
+ return v
+ except CruValueTypeError as e:
+ raise CruInternalError(
+ "Config value generator returns invalid value."
+ ) from e
def copy(self) -> "ConfigItem":
- return ConfigItem(self.name, self.description, self.value_type,
- self._value.copy() if self._value is not None else None, self._default_value.copy(),
- value_generator=self.value_generator)
-
-
-class Configuration:
- def __init__(self, items: None | list[ConfigItem] = None) -> None:
- self._items: list[ConfigItem] = items or []
-
- @property
- def items(self) -> list[ConfigItem]:
- return self._items
-
- @property
- def item_map(self) -> dict[str, ConfigItem]:
- return {i.name: i for i in self.items}
-
- def get_optional_item(self, name: str) -> ConfigItem | None:
- for i in self.items:
- if i.name == name:
- return i
- return None
-
- def clear(self) -> None:
- self._items.clear()
-
- def has_item(self, name: str) -> bool:
- return self.get_optional_item(name) is not None
-
- def add_item(self, item: ConfigItem):
- i = self.get_optional_item(item.name)
- if i is not None:
- raise CruInternalLogicError("Config item of the name already exists.", name=item.name)
- self.items.append(item)
- return item
-
- def set_value(self, name: str, v: Any, /, allow_convert_from_str=False):
- i = self.get_optional_item(name)
- if i is None:
- raise CruInternalLogicError("No config item of the name. Can't set value.", name=name)
- i.set_value(v, allow_convert_from_str)
-
- def copy(self) -> "Configuration":
- return Configuration([i.copy() for i in self.items])
-
- def __getitem__(self, name: str) -> ConfigItem:
- i = self.get_optional_item(name)
- if i is not None:
- return i
- raise CruInternalLogicError('No config item of the name.', name=name)
-
- def __contains__(self, name: str):
- return self.has_item(name)
+ return ConfigItem(
+ self.name,
+ self.description,
+ self.value_type,
+ copy.deepcopy(self._value) if self._value is not None else None,
+ copy.deepcopy(self._default_value),
+ value_generator=self.value_generator,
+ )
+
+
+class Configuration(CruUniqueKeyList[ConfigItem, str]):
+ def __init__(self):
+ super().__init__(lambda c: c.name)
diff --git a/tools/cru-py/cru/_list.py b/tools/cru-py/cru/list.py
index c65c793..c65c793 100644
--- a/tools/cru-py/cru/_list.py
+++ b/tools/cru-py/cru/list.py
diff --git a/tools/cru-py/cru/parsing.py b/tools/cru-py/cru/parsing.py
index be7bbf4..a9eee04 100644
--- a/tools/cru-py/cru/parsing.py
+++ b/tools/cru-py/cru/parsing.py
@@ -1,70 +1,85 @@
-from abc import ABCMeta, abstractmethod
-from typing import TypeVar, Generic, NoReturn, Callable
-
-from cru.excp import CruException, CRU_EXCEPTION_ATTR_DEF_REGISTRY
-
-R = TypeVar("R")
-
-
-class ParseException(CruException):
- LINE_NUMBER_KEY = "line_number"
-
- CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_with(LINE_NUMBER_KEY, "Line number of the error.")
-
-
-class Parser(Generic[R], metaclass=ABCMeta):
- def __init__(self, name: str) -> None:
- self._name = name
-
- @property
- def name(self) -> str:
- return self._name
-
- @abstractmethod
- def parse(self, s: str) -> R:
- raise NotImplementedError()
-
- def raise_parse_exception(self, s: str, line_number: int | None = None) -> NoReturn:
- a = f" at line {line_number}" if line_number is not None else ""
- raise ParseException(f"Parser {self.name} failed{a}, {s}")
-
-
-class SimpleLineConfigParser(Parser[list[tuple[str, str]]]):
- def __init__(self) -> None:
- super().__init__(type(self).__name__)
-
- def _parse(self, s: str, f: Callable[[str, str], None]) -> None:
- for ln, line in enumerate(s.splitlines()):
- line_number = ln + 1
- # check if it's a comment
- if line.strip().startswith("#"):
- continue
- # check if there is a '='
- if line.find("=") == -1:
- self.raise_parse_exception(f"There is even no '='!", line_number)
- # split at first '='
- key, value = line.split("=", 1)
- key = key.strip()
- value = value.strip()
- f(key, value)
-
- def parse(self, s: str) -> list[tuple[str, str]]:
- items = []
- self._parse(s, lambda key, value: items.append((key, value)))
- return items
-
- def parse_to_dict(self, s: str, /, allow_override: bool = False) -> tuple[dict[str, str], list[tuple[str, str]]]:
- d = {}
- duplicate = []
-
- def add(key: str, value: str) -> None:
- if key in d:
- if allow_override:
- duplicate.append((key, d[key]))
- d[key] = value
- else:
- self.raise_parse_exception(f"Key '{key}' already exists!", None)
- d[key] = value
-
- self._parse(s, add)
- return d, duplicate
+from abc import ABCMeta, abstractmethod
+from typing import TypeVar, Generic, NoReturn, Callable
+
+from ._error import CruException
+
+_T = TypeVar("_T")
+
+
+class ParseException(CruException):
+ def __init__(
+ self, message, text: str, line_number: int | None = None, *args, **kwargs
+ ):
+ super().__init__(message, *args, **kwargs)
+ self._text = text
+ self._line_number = line_number
+
+ @property
+ def text(self) -> str:
+ return self._text
+
+ @property
+ def line_number(self) -> int | None:
+ return self._line_number
+
+
+class Parser(Generic[_T], metaclass=ABCMeta):
+ def __init__(self, name: str) -> None:
+ self._name = name
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ @abstractmethod
+ def parse(self, s: str) -> _T:
+ raise NotImplementedError()
+
+ def raise_parse_exception(
+ self, text: str, line_number: int | None = None
+ ) -> NoReturn:
+ a = f" at line {line_number}" if line_number is not None else ""
+ raise ParseException(f"Parser {self.name} failed{a}.", text, line_number)
+
+
+class SimpleLineConfigParser(Parser[list[tuple[str, str]]]):
+ def __init__(self) -> None:
+ super().__init__(type(self).__name__)
+
+ def _parse(self, s: str, callback: Callable[[str, str], None]) -> None:
+ for ln, line in enumerate(s.splitlines()):
+ line_number = ln + 1
+ # check if it's a comment
+ if line.strip().startswith("#"):
+ continue
+ # check if there is a '='
+ if line.find("=") == -1:
+ self.raise_parse_exception("There is even no '='!", line_number)
+ # split at first '='
+ key, value = line.split("=", 1)
+ key = key.strip()
+ value = value.strip()
+ callback(key, value)
+
+ def parse(self, s: str) -> list[tuple[str, str]]:
+ items = []
+ self._parse(s, lambda key, value: items.append((key, value)))
+ return items
+
+ def parse_to_dict(
+ self, s: str, /, allow_override: bool = False
+ ) -> tuple[dict[str, str], list[tuple[str, str]]]:
+ result: dict[str, str] = {}
+ duplicate: list[tuple[str, str]] = []
+
+ def add(key: str, value: str) -> None:
+ if key in result:
+ if allow_override:
+ duplicate.append((key, result[key]))
+ result[key] = value
+ else:
+ self.raise_parse_exception(f"Key '{key}' already exists!", None)
+ result[key] = value
+
+ self._parse(s, add)
+ return result, duplicate
diff --git a/tools/cru-py/cru/property.py b/tools/cru-py/cru/property.py
deleted file mode 100644
index 9549731..0000000
--- a/tools/cru-py/cru/property.py
+++ /dev/null
@@ -1,24 +0,0 @@
-import json
-from typing import Any
-
-
-class PropertyItem:
- def __init__(self, value: Any):
- self._value = value
-
- @property
- def value(self) -> Any:
- return self._value
-
- @value.setter
- def value(self, value: Any):
- self._value = value
-
-
-class PropertyTreeSection:
- def __init__(self, data: dict[str, Any] | None = None) -> None:
- self._data = data or {}
-
-class PropertyTree:
- def __init__(self, data: dict[str, Any] | None = None) -> None:
- self._data = data or {} \ No newline at end of file
diff --git a/tools/cru-py/cru/service/nginx.py b/tools/cru-py/cru/service/nginx.py
index 94c4375..ad32cb9 100644
--- a/tools/cru-py/cru/service/nginx.py
+++ b/tools/cru-py/cru/service/nginx.py
@@ -4,7 +4,6 @@ import re
import subprocess
from typing import Literal, Any, cast, ClassVar
-import jsonschema
def restart_nginx(force=False) -> bool:
@@ -16,358 +15,3 @@ def restart_nginx(force=False) -> bool:
return False
subprocess.run(['docker', 'restart', 'nginx'])
return True
-
-
-_server_schema_filename = "server.schema.json"
-
-with open(join(Paths.nginx2_template_dir, _server_schema_filename)) as f:
- server_json_schema = json.load(f)
-
-
-_domain_template_filename = "domain.conf.template"
-
-NginxSourceFileType = Literal["global", "domain", "http", "https"]
-
-
-class NginxSourceFile:
- def __init__(self, path: str) -> None:
- """
- path: relative to nginx2_template_dir
- """
- self._path = path
- is_template = path.endswith(".template")
- self._is_template = is_template
- filename = basename(path)
- self.name = filename[:-len(".template")] if is_template else filename
- if is_template:
- self._template = Template2.from_file(
- join(Paths.nginx2_template_dir, path))
- else:
- with open(join(Paths.nginx2_template_dir, path)) as f:
- self._content = f.read()
-
- self._scope: NginxSourceFileType = self._calc_scope()
-
- @property
- def is_template(self) -> bool:
- return self._is_template
-
- @property
- def content(self) -> str:
- if self._is_template:
- raise Exception(f"{self._path} is a template file")
- return self._content
-
- @property
- def template(self) -> Template2:
- if not self._is_template:
- raise Exception(f"{self._path} is not a template file")
- return cast(Template2, self._template)
-
- @property
- def global_target_filename(self) -> str:
- if self.scope != "global":
- raise Exception(f"{self._path} is not a global file")
- if self.is_template:
- return basename(self._path)[:-len(".template")]
- else:
- return basename(self._path)
-
- def _calc_scope(self) -> NginxSourceFileType:
- f = basename(self._path)
- d = basename(dirname(self._path))
- if f == _domain_template_filename:
- return "domain"
- elif d in ["global", "http", "https"]:
- return cast(Literal["global", "http", "https"], d)
- else:
- raise Exception(f"Unknown scope for {self._path}")
-
- @property
- def scope(self) -> NginxSourceFileType:
- return self._scope
-
-
-_domain_template_source = NginxSourceFile(_domain_template_filename)
-
-_client_max_body_size_source = NginxSourceFile(
- "global/client-max-body-size.conf")
-_forbid_unknown_domain_source = NginxSourceFile(
- "global/forbid-unknown-domain.conf")
-_ssl_template_source = NginxSourceFile("global/ssl.conf.template")
-_websocket_source = NginxSourceFile("global/websocket.conf")
-
-_http_444_source = NginxSourceFile("http/444.segment")
-_http_redirect_to_https_source = NginxSourceFile(
- "http/redirect-to-https.segment")
-
-_https_redirect_template_source = NginxSourceFile(
- "https/redirect.segment.template")
-_https_reverse_proxy_template_source = NginxSourceFile(
- "https/reverse-proxy.segment.template")
-_https_static_file_template_source = NginxSourceFile(
- "https/static-file.segment.template")
-_https_static_file_no_strip_prefix_template_source = NginxSourceFile(
- "https/static-file.no-strip-prefix.segment.template")
-
-
-class NginxService:
- def __init__(self, type: str, path: str) -> None:
- self.type = type
- self.path = path
- self._check_path(path)
-
- @staticmethod
- def _check_path(path: str) -> None:
- assert isinstance(path, str)
- if path == "" or path == "/":
- return
- if not path.startswith("/"):
- raise UserFriendlyException("Service path should start with '/'.")
- if path.endswith("/"):
- raise UserFriendlyException(
- "Service path should not end with '/'.")
-
- def generate_https_segment(self) -> str:
- raise NotImplementedError()
-
-
-class NginxRedirectService(NginxService):
- def __init__(self, path: str, redirect_url: str, redirect_code: int = 307) -> None:
- if redirect_url.endswith("/"):
- raise UserFriendlyException(
- "Redirect URL should not end with '/'.")
-
- super().__init__("redirect", path)
-
- self.redirect_url = redirect_url
- self.redirect_code = redirect_code
-
- def generate_https_segment(self) -> str:
- vars = {
- "PATH": self.path,
- "REDIRECT_CODE": self.redirect_code,
- "REDIRECT_URL": self.redirect_url
- }
- return _https_redirect_template_source.template.render(vars)
-
- @staticmethod
- def from_json(json: dict[str, Any]) -> "NginxRedirectService":
- path = json["path"]
- redirect_url = json["to"]
- redirect_code = json.get("code", 307)
- assert isinstance(path, str)
- assert isinstance(redirect_url, str)
- assert isinstance(redirect_code, int)
- return NginxRedirectService(path, redirect_url, redirect_code)
-
-
-class NginxReverseProxyService(NginxService):
-
- _upstream_regex: ClassVar[re.Pattern[str]] = re.compile(
- r"^[-_0-9a-zA-Z]+:[0-9]+$")
-
- def __init__(self, path: str, upstream: str) -> None:
- if not self._upstream_regex.match(upstream):
- raise UserFriendlyException(
- f"Invalid upstream format: {upstream}.")
-
- super().__init__("reverse-proxy", path)
-
- self.upstream = upstream
-
- def generate_https_segment(self) -> str:
- vars = {
- "PATH": self.path,
- "UPSTREAM": self.upstream
- }
- return _https_reverse_proxy_template_source.template.render(vars)
-
- @staticmethod
- def from_json(json: dict[str, Any]) -> "NginxReverseProxyService":
- path = json["path"]
- upstream = json["upstream"]
- assert isinstance(path, str)
- assert isinstance(upstream, str)
- return NginxReverseProxyService(path, upstream)
-
-
-class NginxStaticFileService(NginxService):
- def __init__(self, path: str, root: str, no_strip_prefix: bool = False) -> None:
- super().__init__("static-file", path)
-
- self.root = root
- self.no_strip_prefix = no_strip_prefix
-
- def generate_https_segment(self) -> str:
- vars = {
- "PATH": self.path,
- "ROOT": self.root,
- }
- if self.no_strip_prefix:
- return _https_static_file_no_strip_prefix_template_source.template.render(vars)
- else:
- return _https_static_file_template_source.template.render(vars)
-
- @staticmethod
- def from_json(json: dict[str, Any]) -> "NginxStaticFileService":
- path = json["path"]
- root = json["root"]
- no_strip_prefix = json.get("no_strip_prefix", False)
- assert isinstance(path, str)
- assert isinstance(root, str)
- assert isinstance(no_strip_prefix, bool)
- return NginxStaticFileService(path, root, no_strip_prefix)
-
-
-def nginx_service_from_json(json: dict[str, Any]) -> NginxService:
- type = json["type"]
- if type == "redirect":
- return NginxRedirectService.from_json(json)
- elif type == "reverse-proxy":
- return NginxReverseProxyService.from_json(json)
- elif type == "static-file":
- return NginxStaticFileService.from_json(json)
- else:
- raise UserFriendlyException(f"Invalid crupest type: {type}.")
-
-
-def _prepend_indent(text: str, indent: str = " " * 4) -> str:
- lines = text.split("\n")
- for i in range(len(lines)):
- if lines[i] != "":
- lines[i] = indent + lines[i]
- return "\n".join(lines)
-
-
-class NginxDomain:
- def __init__(self, domain: str, services: list[NginxService] = []) -> None:
- self.domain = domain
- self.services = services
-
- def add_service(self, service: NginxService) -> None:
- self.services.append(service)
-
- def generate_http_segment(self) -> str:
- if len(self.services) == 0:
- return _http_444_source.content
- else:
- return _http_redirect_to_https_source.content
-
- def generate_https_segment(self) -> str:
- return "\n\n".join([s.generate_https_segment() for s in self.services])
-
- def generate_config(self) -> str:
- vars = {
- "DOMAIN": self.domain,
- "HTTP_SEGMENT": _prepend_indent(self.generate_http_segment()),
- "HTTPS_SEGMENT": _prepend_indent(self.generate_https_segment()),
- }
- return _domain_template_source.template.render(vars)
-
- def generate_config_file(self, path: str) -> None:
- with open(path, "w") as f:
- f.write(self.generate_config())
-
- @staticmethod
- def from_json(root_domain: str, json: dict[str, Any]) -> "NginxDomain":
- name = json["name"]
- assert isinstance(name, str)
- if name == "@" or name == "":
- domain = root_domain
- else:
- domain = f"{name}.{root_domain}"
- assert isinstance(json["services"], list)
- services = [nginx_service_from_json(s) for s in json["services"]]
- return NginxDomain(domain, services)
-
-
-def check_nginx_config_schema(json: Any) -> None:
- jsonschema.validate(json, server_json_schema)
-
-
-class NginxServer:
- def __init__(self, root_domain: str) -> None:
- self.root_domain = root_domain
- self.domains: list[NginxDomain] = []
-
- def add_sub_domain(self, sub_domain: str, services: list[NginxService]) -> None:
- if sub_domain == "" or sub_domain == "@":
- domain = self.root_domain
- else:
- domain = f"{sub_domain}.{self.root_domain}"
- self.domains.append(NginxDomain(domain, services))
-
- def generate_ssl(self) -> str:
- return _ssl_template_source.template.render({
- "ROOT_DOMAIN": self.root_domain
- })
-
- def generate_global_files(self, d: str) -> None:
- for source in [_client_max_body_size_source, _forbid_unknown_domain_source, _websocket_source]:
- with open(join(d, source.name), "w") as f:
- f.write(source.content)
- with open(join(d, _ssl_template_source.name), "w") as f:
- f.write(self.generate_ssl())
-
- def generate_domain_files(self, d: str) -> None:
- for domain in self.domains:
- domain.generate_config_file(join(d, f"{domain.domain}.conf"))
-
- def generate_config(self, d: str) -> None:
- create_dir_if_not_exists(d)
- self.generate_global_files(d)
-
- def get_allowed_files(self) -> list[str]:
- files = []
- for source in [_client_max_body_size_source, _forbid_unknown_domain_source, _ssl_template_source, _websocket_source]:
- files.append(source.name)
- for domain in self.domains:
- files.append(f"{domain.domain}.conf")
- return files
-
- def check_bad_files(self, d: str) -> list[str]:
- allowed_files = self.get_allowed_files()
- bad_files = []
- if not ensure_dir(d, must_exist=False):
- return []
- for path in os.listdir(d):
- if path not in allowed_files:
- bad_files.append(path)
- return bad_files
-
- @staticmethod
- def from_json(root_domain: str, json: dict[str, Any]) -> "NginxServer":
- check_nginx_config_schema(json)
- server = NginxServer(root_domain)
- sub_domains = json["domains"]
- assert isinstance(sub_domains, list)
- server.domains = [NginxDomain.from_json(
- root_domain, d) for d in sub_domains]
- return server
-
- @staticmethod
- def from_json_str(root_domain: str, json_str: str) -> "NginxServer":
- return NginxServer.from_json(root_domain, json.loads(json_str))
-
- def go(self):
- bad_files = self.check_bad_files(Paths.nginx_generated_dir)
- if len(bad_files) > 0:
- console.print(
- "WARNING: It seems there are some bad conf files in the nginx config directory:", style="yellow")
- for bad_file in bad_files:
- console.print(bad_file, style=file_name_style)
- to_delete = Confirm.ask(
- "They will affect nginx in a [red]bad[/] way. Do you want to delete them?", default=True, console=console)
- if to_delete:
- for file in bad_files:
- os.remove(join(Paths.nginx_generated_dir, file))
- create_dir_if_not_exists(Paths.generated_dir)
- if not ensure_dir(Paths.nginx_generated_dir, must_exist=False):
- os.mkdir(Paths.nginx_generated_dir)
- console.print(
- f"Nginx config directory created at [magenta]{Paths.nginx_generated_dir}[/]", style="green")
- self.generate_config(Paths.nginx_generated_dir)
- console.print("Nginx config generated.", style="green")
- if restart_nginx():
- console.print('Nginx restarted.', style="green")
diff --git a/tools/cru-py/cru/system.py b/tools/cru-py/cru/system.py
index 4c9de01..f321717 100644
--- a/tools/cru-py/cru/system.py
+++ b/tools/cru-py/cru/system.py
@@ -7,10 +7,11 @@ def check_debian_derivative_version(name: str) -> None | str:
return None
with open("/etc/os-release", "r") as f:
content = f.read()
- if not f"ID={name}" in content:
+ if f"ID={name}" not in content:
return None
m = re.search(r'VERSION_ID="(.+)"', content)
- if m is None: return None
+ if m is None:
+ return None
return m.group(1)
diff --git a/tools/cru-py/cru/template.py b/tools/cru-py/cru/template.py
new file mode 100644
index 0000000..8e06418
--- /dev/null
+++ b/tools/cru-py/cru/template.py
@@ -0,0 +1,142 @@
+from collections.abc import Iterable, Mapping
+import os
+import os.path
+from string import Template
+
+from ._error import CruException
+
+
+class CruTemplateError(CruException):
+ pass
+
+
+class TemplateFile:
+ def __init__(self, source: str, destination_path: str | None):
+ self._source = source
+ self._destination = destination_path
+ self._template: Template | None = None
+
+ @property
+ def source(self) -> str:
+ return self._source
+
+ @property
+ def destination(self) -> str | None:
+ return self._destination
+
+ @destination.setter
+ def destination(self, value: str | None) -> None:
+ self._destination = value
+
+ @property
+ def template(self) -> Template:
+ if self._template is None:
+ return self.reload_template()
+ return self._template
+
+ def reload_template(self) -> Template:
+ with open(self._source, "r") as f:
+ self._template = Template(f.read())
+ return self._template
+
+ @property
+ def variables(self) -> set[str]:
+ return set(self.template.get_identifiers())
+
+ def generate(self, variables: Mapping[str, str]) -> str:
+ return self.template.substitute(variables)
+
+ def generate_to_destination(self, variables: Mapping[str, str]) -> None:
+ if self._destination is None:
+ raise CruTemplateError("No destination specified for this template.")
+ with open(self._destination, "w") as f:
+ f.write(self.generate(variables))
+
+
+class TemplateDirectory:
+ def __init__(
+ self,
+ source: str,
+ destination: str,
+ exclude: Iterable[str],
+ file_suffix: str = ".template",
+ ):
+ self._files: list[TemplateFile] | None = None
+ self._source = source
+ self._destination = destination
+ self._exclude = [os.path.normpath(p) for p in exclude]
+ self._file_suffix = file_suffix
+
+ @property
+ def files(self) -> list[TemplateFile]:
+ if self._files is None:
+ return self.reload()
+ else:
+ return self._files
+
+ @property
+ def source(self) -> str:
+ return self._source
+
+ @property
+ def destination(self) -> str:
+ return self._destination
+
+ @property
+ def exclude(self) -> list[str]:
+ return self._exclude
+
+ @property
+ def file_suffix(self) -> str:
+ return self._file_suffix
+
+ @staticmethod
+ def _scan_files(
+ root_path: str, exclude: list[str], suffix: str | None
+ ) -> Iterable[str]:
+ for root, _dirs, files in os.walk(root_path):
+ for file in files:
+ if suffix is None or file.endswith(suffix):
+ path = os.path.join(root, file)
+ path = os.path.relpath(path, root_path)
+ if suffix is not None:
+ path = path[: -len(suffix)]
+ is_exclude = False
+ for exclude_path in exclude:
+ if path.startswith(exclude_path):
+ is_exclude = True
+ break
+ if not is_exclude:
+ yield path
+
+ def reload(self) -> list[TemplateFile]:
+ if not os.path.isdir(self.source):
+ raise CruTemplateError(
+ f"Source directory {self.source} does not exist or is not a directory."
+ )
+ files = self._scan_files(self.source, self.exclude, self.file_suffix)
+ self._files = [
+ TemplateFile(
+ os.path.join(self._source, file + self.file_suffix),
+ os.path.join(self._destination, file),
+ )
+ for file in files
+ ]
+ return self._files
+
+ @property
+ def variables(self) -> set[str]:
+ s = set()
+ for file in self.files:
+ s.update(file.variables)
+ return s
+
+ def generate_to_destination(self, variables: Mapping[str, str]) -> None:
+ for file in self.files:
+ file.generate_to_destination(variables)
+
+ def extra_files_in_destination(self) -> Iterable[str]:
+ source_files = set(os.path.relpath(f.source, self.source) for f in self.files)
+ for file in self._scan_files(self.destination, self.exclude, None):
+ if file not in source_files:
+ yield file
diff --git a/tools/cru-py/cru/value.py b/tools/cru-py/cru/value.py
index 189f44f..4096362 100644
--- a/tools/cru-py/cru/value.py
+++ b/tools/cru-py/cru/value.py
@@ -5,23 +5,23 @@ import secrets
import string
import uuid
from abc import abstractmethod, ABCMeta
-from collections.abc import Mapping, Callable
-from typing import Any, ClassVar, Literal, TypeVar, Generic, ParamSpec
+from collections.abc import Callable
+from typing import Any, ClassVar, TypeVar, Generic
-from .error import CruInternalError, CruException
+from ._error import CruException
-def _str_case_in(s: str, case: bool, l: list[str]) -> bool:
+def _str_case_in(s: str, case: bool, str_list: list[str]) -> bool:
if case:
- return s in l
+ return s in str_list
else:
- return s.lower() in [s.lower() for s in l]
+ return s.lower() in [s.lower() for s in str_list]
-T = TypeVar("T")
+_T = TypeVar("_T")
-class CruValueError(CruException):
+class CruValueTypeError(CruException):
def __init__(
self,
message: str,
@@ -47,17 +47,8 @@ class CruValueError(CruException):
return self._value_type
-class CruValueValidationError(CruValueError):
- pass
-
-
-class CruValueStringConversionError(CruValueError):
- pass
-
-
-# TODO: Continue here tomorrow!
-class ValueType(Generic[T], metaclass=ABCMeta):
- def __init__(self, name: str, _type: type[T]) -> None:
+class ValueType(Generic[_T], metaclass=ABCMeta):
+ def __init__(self, name: str, _type: type[_T]) -> None:
self._name = name
self._type = _type
@@ -66,67 +57,61 @@ class ValueType(Generic[T], metaclass=ABCMeta):
return self._name
@property
- def type(self) -> type[T]:
+ def type(self) -> type[_T]:
return self._type
- def check_value_type(self, value: Any) -> bool:
- return isinstance(value, self.type)
+ def check_value_type(self, value: Any) -> None:
+ if not isinstance(value, self.type):
+ raise CruValueTypeError("Type of value is wrong.", value, self)
- def _do_check_value(self, value: Any) -> T:
+ def _do_check_value(self, value: Any) -> _T:
return value
- def check_value(self, value: Any) -> T:
- if not isinstance(value, self.type):
- raise CruValueValidationError("Value type is wrong.", value, self)
+ def check_value(self, value: Any) -> _T:
+ self.check_value_type(value)
return self._do_check_value(value)
- def _do_check_str_format(self, s: str) -> bool | tuple[bool, str]:
+ @abstractmethod
+ def _do_check_str_format(self, s: str) -> None:
raise NotImplementedError()
def check_str_format(self, s: str) -> None:
- ok, err = self._do_check_str_format(s)
- if ok is None:
- raise CruInternalLogicError("_do_check_str_format should not return None.")
- if ok:
- return
- if err is None:
- err = "Invalid value str format."
- raise ValueStringConvertionError(err, s, value_type=self)
+ if not isinstance(s, str):
+ raise CruValueTypeError("Try to check format on a non-str.", s, self)
+ self._do_check_str_format(s)
@abstractmethod
- def _do_convert_value_to_str(self, value: T) -> str:
+ def _do_convert_value_to_str(self, value: _T) -> str:
raise NotImplementedError()
- def convert_value_to_str(self, value: T) -> str:
+ def convert_value_to_str(self, value: _T) -> str:
self.check_value(value)
return self._do_convert_value_to_str(value)
@abstractmethod
- def _do_convert_str_to_value(self, s: str) -> T:
+ def _do_convert_str_to_value(self, s: str) -> _T:
raise NotImplementedError()
- def convert_str_to_value(self, s: str) -> T:
+ def convert_str_to_value(self, s: str) -> _T:
self.check_str_format(s)
return self._do_convert_str_to_value(s)
- def check_value_or_try_convert_from_str(self, value_or_str: Any) -> T:
+ def check_value_or_try_convert_from_str(self, value_or_str: Any) -> _T:
try:
return self.check_value(value_or_str)
- except ValidationError as e:
+ except CruValueTypeError:
if isinstance(value_or_str, str):
return self.convert_str_to_value(value_or_str)
else:
- raise ValidationError(
- "Value is not valid and is not a str.", value_or_str, self, inner=e
- )
+ raise
class TextValueType(ValueType[str]):
def __init__(self) -> None:
- super().__init__("text")
+ super().__init__("text", str)
- def _do_check_str_format(self, s):
- return True
+ def _do_check_str_format(self, _s):
+ return
def _do_convert_value_to_str(self, value):
return value
@@ -138,14 +123,13 @@ class TextValueType(ValueType[str]):
class IntegerValueType(ValueType[int]):
def __init__(self) -> None:
- super().__init__("integer")
+ super().__init__("integer", int)
def _do_check_str_format(self, s):
try:
int(s)
- return True
- except ValueError:
- return False
+ except ValueError as e:
+ raise CruValueTypeError("Invalid integer format.", s, self) from e
def _do_convert_value_to_str(self, value):
return str(value)
@@ -156,14 +140,13 @@ class IntegerValueType(ValueType[int]):
class FloatValueType(ValueType[float]):
def __init__(self) -> None:
- super().__init__("float")
+ super().__init__("float", float)
def _do_check_str_format(self, s):
try:
float(s)
- return True
- except ValueError:
- return False
+ except ValueError as e:
+ raise CruValueTypeError("Invalid float format.", s, self) from e
def _do_convert_value_to_str(self, value):
return str(value)
@@ -183,7 +166,7 @@ class BooleanValueType(ValueType[bool]):
true_list: None | list[str] = None,
false_list: None | list[str] = None,
) -> None:
- super().__init__("boolean")
+ super().__init__("boolean", bool)
self._case_sensitive = case_sensitive
self._valid_true_strs: list[str] = (
true_list or BooleanValueType.DEFAULT_TRUE_LIST
@@ -209,15 +192,11 @@ class BooleanValueType(ValueType[bool]):
return self._valid_true_strs + self._valid_false_strs
def _do_check_str_format(self, s):
- if _str_case_in(s, self.case_sensitive, self.valid_boolean_strs):
- return True
- return (
- False,
- f"Not a valid boolean string ({ValueType.case_sensitive_to_str(self.case_sensitive)}). Valid string of true: {' '.join(self._valid_true_strs)}. Valid string of false: {' '.join(self._valid_false_strs)}. All is case insensitive.",
- )
+ if not _str_case_in(s, self.case_sensitive, self.valid_boolean_strs):
+ raise CruValueTypeError("Invalid boolean format.", s, self)
def _do_convert_value_to_str(self, value):
- return "True" if value else "False"
+ return self._valid_true_strs[0] if value else self._valid_false_strs[0]
def _do_convert_str_to_value(self, s):
return _str_case_in(s, self.case_sensitive, self._valid_true_strs)
@@ -225,9 +204,7 @@ class BooleanValueType(ValueType[bool]):
class EnumValueType(ValueType[str]):
def __init__(self, valid_values: list[str], /, case_sensitive=False) -> None:
- s = " | ".join([f'"{v}"' for v in valid_values])
- self._valid_value_str = f"[ {s} ]"
- super().__init__(f"enum{self._valid_value_str}")
+ super().__init__(f"enum({'|'.join(valid_values)})", str)
self._case_sensitive = case_sensitive
self._valid_values = valid_values
@@ -240,16 +217,11 @@ class EnumValueType(ValueType[str]):
return self._valid_values
def _do_check_value(self, value):
- ok, err = self._do_check_str_format(value)
- return ok, (value if ok else err)
+ self._do_check_str_format(value)
def _do_check_str_format(self, s):
- if _str_case_in(s, self.case_sensitive, self.valid_values):
- return True
- return (
- False,
- f"Value is not in valid values ({ValueType.case_sensitive_to_str(self.case_sensitive)}): {self._valid_value_str}",
- )
+ if not _str_case_in(s, self.case_sensitive, self.valid_values):
+ raise CruValueTypeError("Invalid enum value", s, self)
def _do_convert_value_to_str(self, value):
return value
@@ -262,69 +234,52 @@ TEXT_VALUE_TYPE = TextValueType()
INTEGER_VALUE_TYPE = IntegerValueType()
BOOLEAN_VALUE_TYPE = BooleanValueType()
-P = ParamSpec("P")
+class ValueGeneratorBase(Generic[_T], metaclass=ABCMeta):
+ @abstractmethod
+ def generate(self) -> _T:
+ raise NotImplementedError()
-class ValueGenerator(Generic[T, P]):
- INTERACTIVE_KEY: ClassVar[Literal["interactive"]] = "interactive"
+ def __call__(self) -> _T:
+ return self.generate()
- def __init__(
- self, f: Callable[P, T], /, attributes: None | Mapping[str, Any] = None
- ) -> None:
- self._f = f
- self._attributes = attributes or {}
- @property
- def f(self) -> Callable[P, T]:
- return self._f
+class ValueGenerator(ValueGeneratorBase[_T]):
+ def __init__(self, generate_func: Callable[[], _T]) -> None:
+ self._generate_func = generate_func
@property
- def attributes(self) -> Mapping[str, Any]:
- return self._attributes
+ def generate_func(self) -> Callable[[], _T]:
+ return self._generate_func
- def generate(self, *args, **kwargs) -> T:
- return self._f(*args, **kwargs)
+ def generate(self) -> _T:
+ return self._generate_func()
- def __call__(self, *args, **kwargs):
- return self._f(*args, **kwargs)
- @property
- def interactive(self) -> bool:
- return self._attributes.get(ValueGenerator.INTERACTIVE_KEY, False)
-
- @staticmethod
- def create_interactive(
- f: Callable[P, T],
- interactive: bool = True,
- /,
- attributes: None | Mapping[str, Any] = None,
- ) -> "ValueGenerator[T, P]":
- return ValueGenerator(
- f, dict({ValueGenerator.INTERACTIVE_KEY: interactive}, **(attributes or {}))
- )
-
-
-class UuidValueGenerator(ValueGenerator[str, []]):
- def __init__(self) -> None:
- super().__init__(lambda: str(uuid.uuid4()))
+class UuidValueGenerator(ValueGeneratorBase[str]):
+ def generate(self):
+ return str(uuid.uuid4())
-class RandomStringValueGenerator(ValueGenerator[str, []]):
- @staticmethod
- def _create_generate_ramdom_func(length: int, secure: bool) -> Callable[str, []]:
- random_choice = secrets.choice if secure else random.choice
- def generate_random_string():
- characters = string.ascii_letters + string.digits
- random_string = "".join(random_choice(characters) for _ in range(length))
- return random_string
+class RandomStringValueGenerator(ValueGeneratorBase[str]):
+ def __init__(self, length: int, secure: bool) -> None:
+ self._length = length
+ self._secure = secure
- return generate_random_string
+ @property
+ def length(self) -> int:
+ return self._length
- def __init__(self, length: int, secure: bool) -> None:
- super().__init__(
- RandomStringValueGenerator._create_generate_ramdom_func(length, secure)
- )
+ @property
+ def secure(self) -> bool:
+ return self._secure
+
+ def generate(self):
+ random_func = secrets.choice if self._secure else random.choice
+ characters = string.ascii_letters + string.digits
+ random_string = "".join(random_func(characters) for _ in range(self._length))
+ return random_string
UUID_VALUE_GENERATOR = UuidValueGenerator()