diff options
| -rw-r--r-- | tools/cru-py/cru/__init__.py | 50 | ||||
| -rw-r--r-- | tools/cru-py/cru/_base.py | 4 | ||||
| -rw-r--r-- | tools/cru-py/cru/_error.py (renamed from tools/cru-py/cru/error.py) | 2 | ||||
| -rw-r--r-- | tools/cru-py/cru/_event.py | 18 | ||||
| -rw-r--r-- | tools/cru-py/cru/_helper.py | 16 | ||||
| -rw-r--r-- | tools/cru-py/cru/_iter.py | 3 | ||||
| -rw-r--r-- | tools/cru-py/cru/_lang.py | 16 | ||||
| -rw-r--r-- | tools/cru-py/cru/_path.py | 23 | ||||
| -rw-r--r-- | tools/cru-py/cru/_type.py | 2 | ||||
| -rw-r--r-- | tools/cru-py/cru/app.py (renamed from tools/cru-py/cru/paths.py) | 33 | ||||
| -rw-r--r-- | tools/cru-py/cru/attr.py | 2 | ||||
| -rw-r--r-- | tools/cru-py/cru/config.py | 147 | ||||
| -rw-r--r-- | tools/cru-py/cru/list.py (renamed from tools/cru-py/cru/_list.py) | 0 | ||||
| -rw-r--r-- | tools/cru-py/cru/parsing.py | 155 | ||||
| -rw-r--r-- | tools/cru-py/cru/property.py | 24 | ||||
| -rw-r--r-- | tools/cru-py/cru/service/nginx.py | 356 | ||||
| -rw-r--r-- | tools/cru-py/cru/system.py | 5 | ||||
| -rw-r--r-- | tools/cru-py/cru/template.py | 142 | ||||
| -rw-r--r-- | tools/cru-py/cru/value.py | 201 | 
19 files changed, 487 insertions, 712 deletions
| diff --git a/tools/cru-py/cru/__init__.py b/tools/cru-py/cru/__init__.py index 94d0d69..7c1a5f1 100644 --- a/tools/cru-py/cru/__init__.py +++ b/tools/cru-py/cru/__init__.py @@ -1,6 +1,26 @@  import sys -from ._base import CruException +from ._base import CRU, CruNamespaceError, CRU_NAME_PREFIXES +from ._error import ( +    cru_unreachable, +    CruException, +    CruUserFriendlyException, +    CruInternalError, +    CruUnreachableError, +) +from ._const import ( +    CruConstantBase, +    CruDontChange, +    CruNotFound, +    CruNoValue, +    CruPlaceholder, +    CruUseDefault, +) +from ._func import CruFunction +from ._iter import CruIterable, CruIterator +from ._event import CruEvent, CruEventHandlerToken +from ._path import CruPath, CruPathError +from ._type import CruTypeSet, CruTypeCheckError  class CruInitError(CruException): @@ -13,3 +33,31 @@ def check_python_version(required_version=(3, 11)):  check_python_version() + +__all__ = [ +    "CRU", +    "CruNamespaceError", +    "CRU_NAME_PREFIXES", +    "check_python_version", +    "CruException", +    "cru_unreachable", +    "CruInitError", +    "CruUserFriendlyException", +    "CruInternalError", +    "CruUnreachableError", +    "CruConstantBase", +    "CruDontChange", +    "CruNotFound", +    "CruNoValue", +    "CruPlaceholder", +    "CruUseDefault", +    "CruFunction", +    "CruIterable", +    "CruIterator", +    "CruEvent", +    "CruEventHandlerToken", +    "CruPath", +    "CruPathError", +    "CruTypeSet", +    "CruTypeCheckError", +] diff --git a/tools/cru-py/cru/_base.py b/tools/cru-py/cru/_base.py index 2310bfb..0a22df4 100644 --- a/tools/cru-py/cru/_base.py +++ b/tools/cru-py/cru/_base.py @@ -1,7 +1,7 @@  from typing import Any -from ._lang import remove_none -from .error import CruInternalError +from ._helper import remove_none +from ._error import CruInternalError  class CruNamespaceError(CruInternalError): diff --git a/tools/cru-py/cru/error.py b/tools/cru-py/cru/_error.py index 95edbd3..0d2bf79 100644 --- a/tools/cru-py/cru/error.py +++ b/tools/cru-py/cru/_error.py @@ -17,7 +17,7 @@ class CruInternalError(CruException):      """Raised when an internal logic error occurs.""" -class UserFriendlyException(CruException): +class CruUserFriendlyException(CruException):      def __init__(self, message: str, user_message: str, *args, **kwargs) -> None:          super().__init__(message, *args, **kwargs)          self._user_message = user_message diff --git a/tools/cru-py/cru/_event.py b/tools/cru-py/cru/_event.py index 65265fd..51a794c 100644 --- a/tools/cru-py/cru/_event.py +++ b/tools/cru-py/cru/_event.py @@ -3,22 +3,22 @@ from __future__ import annotations  from collections.abc import Callable  from typing import Generic, ParamSpec, TypeVar -from ._list import CruList +from .list import CruList  _P = ParamSpec("_P")  _R = TypeVar("_R") -class EventHandlerToken(Generic[_P, _R]): +class CruEventHandlerToken(Generic[_P, _R]):      def __init__( -        self, event: Event, handler: Callable[_P, _R], once: bool = False +        self, event: CruEvent, handler: Callable[_P, _R], once: bool = False      ) -> None:          self._event = event          self._handler = handler          self._once = once      @property -    def event(self) -> Event: +    def event(self) -> CruEvent:          return self._event      @property @@ -30,19 +30,19 @@ class EventHandlerToken(Generic[_P, _R]):          return self._once -class Event(Generic[_P, _R]): +class CruEvent(Generic[_P, _R]):      def __init__(self, name: str) -> None:          self._name = name -        self._tokens: CruList[EventHandlerToken] = CruList() +        self._tokens: CruList[CruEventHandlerToken] = CruList()      def register(          self, handler: Callable[_P, _R], once: bool = False -    ) -> EventHandlerToken: -        token = EventHandlerToken(self, handler, once) +    ) -> CruEventHandlerToken: +        token = CruEventHandlerToken(self, handler, once)          self._tokens.append(token)          return token -    def unregister(self, *handlers: EventHandlerToken | Callable[_P, _R]) -> int: +    def unregister(self, *handlers: CruEventHandlerToken | Callable[_P, _R]) -> int:          old_length = len(self._tokens)          self._tokens.reset(              self._tokens.as_cru_iterator().filter( diff --git a/tools/cru-py/cru/_helper.py b/tools/cru-py/cru/_helper.py new file mode 100644 index 0000000..43baf46 --- /dev/null +++ b/tools/cru-py/cru/_helper.py @@ -0,0 +1,16 @@ +from collections.abc import Callable +from typing import Any, Iterable, TypeVar, cast + +_T = TypeVar("_T") +_D = TypeVar("_D") + + +def remove_element( +    iterable: Iterable[_T | None], to_rm: Iterable[Any], des: type[_D] | None = None +) -> _D: +    to_rm = set(to_rm) +    return cast(Callable[..., _D], des or list)(v for v in iterable if v not in to_rm) + + +def remove_none(iterable: Iterable[_T | None], des: type[_D] | None = None) -> _D: +    return cast(Callable[..., _D], des or list)(v for v in iterable if v is not None) diff --git a/tools/cru-py/cru/_iter.py b/tools/cru-py/cru/_iter.py index b91195b..12d1d1f 100644 --- a/tools/cru-py/cru/_iter.py +++ b/tools/cru-py/cru/_iter.py @@ -16,8 +16,9 @@ from typing import (      cast,  ) -from ._base import CRU, cru_unreachable +from ._base import CRU  from ._const import CruNotFound +from ._error import cru_unreachable  _P = ParamSpec("_P")  _T = TypeVar("_T") diff --git a/tools/cru-py/cru/_lang.py b/tools/cru-py/cru/_lang.py deleted file mode 100644 index 925ba00..0000000 --- a/tools/cru-py/cru/_lang.py +++ /dev/null @@ -1,16 +0,0 @@ -from collections.abc import Callable -from typing import Any, Iterable, TypeVar, cast - -T = TypeVar("T") -D = TypeVar("D") - - -def remove_element( -    iterable: Iterable[T | None], to_rm: Iterable[Any], des: type[D] | None = None -) -> D: -    to_rm = set(to_rm) -    return cast(Callable[..., D], des or list)(v for v in iterable if v not in to_rm) - - -def remove_none(iterable: Iterable[T | None], des: type[D] | None = None) -> D: -    return cast(Callable[..., D], des or list)(v for v in iterable if v is not None) diff --git a/tools/cru-py/cru/_path.py b/tools/cru-py/cru/_path.py new file mode 100644 index 0000000..a131c41 --- /dev/null +++ b/tools/cru-py/cru/_path.py @@ -0,0 +1,23 @@ +from pathlib import Path + +from ._error import CruException + + +class CruPathError(CruException): +    def __init__(self, message, _path: Path, *args, **kwargs): +        super().__init__(message, *args, **kwargs) +        self._path = _path + +    @property +    def path(self) -> Path: +        return self._path + + +class CruPath(Path): +    def check_parents_dir(self, must_exist: bool = False) -> bool: +        for p in reversed(self.parents): +            if not p.exists() and not must_exist: +                return False +            if not p.is_dir(): +                raise CruPathError("Parents path must be a dir.", self) +        return True diff --git a/tools/cru-py/cru/_type.py b/tools/cru-py/cru/_type.py index 0fd86a4..96d5d4b 100644 --- a/tools/cru-py/cru/_type.py +++ b/tools/cru-py/cru/_type.py @@ -1,7 +1,7 @@  from collections.abc import Iterable  from typing import Any -from ._base import CruException, CruInternalError +from ._error import CruException, CruInternalError  from ._iter import CruIterator diff --git a/tools/cru-py/cru/paths.py b/tools/cru-py/cru/app.py index cdd97fe..6a60926 100644 --- a/tools/cru-py/cru/paths.py +++ b/tools/cru-py/cru/app.py @@ -1,31 +1,32 @@  import os  from pathlib import Path -from .error import CruException +from ._error import CruException +from ._path import CruPath -class ApplicationPathError(CruException): -    def __init__(self, message: str, p: str | Path, *args, **kwargs): -        super().__init__(message, *args, path=str(p), **kwargs) +class CruApplication: +    def __init__(self, name: str) -> None: +        self._name = name -def check_parents_dir(p: str | Path, /, must_exist: bool = False) -> bool: -    p = Path(p) if isinstance(p, str) else p -    for p in reversed(p.parents): -        if not p.exists() and not must_exist: -            return False -        if not p.is_dir(): -            raise ApplicationPathError("Parents path should be a dir.", p) -    return True +class ApplicationPathError(CruException): +    def __init__(self, message, _path: Path, *args, **kwargs): +        super().__init__(message, *args, **kwargs) +        self._path = _path + +    @property +    def path(self) -> Path: +        return self._path  class ApplicationPath:      def __init__(self, p: str | Path, is_dir: bool) -> None: -        self._path = Path(p) if isinstance(p, str) else p +        self._path = CruPath(p)          self._is_dir = is_dir      @property -    def path(self) -> Path: +    def path(self) -> CruPath:          return self._path      @property @@ -33,7 +34,7 @@ class ApplicationPath:          return self._is_dir      def check_parents(self, must_exist: bool = False) -> bool: -        return check_parents_dir(self._path.parent, must_exist) +        return self._path.check_parents_dir(must_exist)      def check_self(self, must_exist: bool = False) -> bool:          if not self.check_parents(must_exist): @@ -41,7 +42,7 @@ class ApplicationPath:          if not self.path.exists():              if not must_exist:                  return False -            raise ApplicationPathError("Mot exist.", self.path) +            raise ApplicationPathError("Not exist.", self.path)          if self.is_dir:              if not self.path.is_dir():                  raise ApplicationPathError("Should be a directory, but not.", self.path) diff --git a/tools/cru-py/cru/attr.py b/tools/cru-py/cru/attr.py index 82f1eba..d4cc86a 100644 --- a/tools/cru-py/cru/attr.py +++ b/tools/cru-py/cru/attr.py @@ -5,7 +5,7 @@ from collections.abc import Callable, Iterable  from dataclasses import dataclass, field  from typing import Any -from ._list import CruUniqueKeyList +from .list import CruUniqueKeyList  from ._type import CruTypeSet  from ._const import CruNotFound, CruUseDefault, CruDontChange  from ._iter import CruIterator diff --git a/tools/cru-py/cru/config.py b/tools/cru-py/cru/config.py index 843f30a..3fd994b 100644 --- a/tools/cru-py/cru/config.py +++ b/tools/cru-py/cru/config.py @@ -1,22 +1,35 @@ -from typing import Any, TypeVar, Generic - -from .error import CruInternalLogicError -from .value import ValueType, ValueGenerator, ValidationError - -T = TypeVar("T") - - -class ConfigItem(Generic[T]): -    OptionalValueGenerator = ValueGenerator[T, []] | None - -    def __init__(self, name: str, description: str, value_type: ValueType[T], value: T | None, default_value: T, *, -                 value_generator: OptionalValueGenerator = None) -> None: +from typing import TypeVar, Generic +import copy + + +from .list import CruUniqueKeyList +from ._error import CruInternalError +from .value import ( +    CruValueTypeError, +    ValueGeneratorBase, +    ValueType, +) + +_T = TypeVar("_T") + + +class ConfigItem(Generic[_T]): +    def __init__( +        self, +        name: str, +        description: str, +        value_type: ValueType[_T], +        value: _T | None, +        default_value: _T, +        *, +        value_generator: ValueGeneratorBase | None = None, +    ) -> None:          self._name = name          self._description = description          self._value_type = value_type          self._default_value = default_value          self._value_generator = value_generator -        self._value: T | None = value +        self._value = value      @property      def name(self) -> str: @@ -27,11 +40,11 @@ class ConfigItem(Generic[T]):          return self._description      @property -    def value_type(self) -> ValueType[T]: +    def value_type(self) -> ValueType[_T]:          return self._value_type      @property -    def default_value(self) -> T: +    def default_value(self) -> _T:          return self._default_value      @property @@ -43,86 +56,42 @@ class ConfigItem(Generic[T]):          return not self.is_default      @property -    def value(self) -> T: +    def value(self) -> _T:          return self._value or self._default_value -    def set_value(self, v: T | str, /, allow_convert_from_str=False): +    @property +    def value_generator(self) -> ValueGeneratorBase | None: +        return self._value_generator + +    def set_value(self, v: _T | str, /, allow_convert_from_str=False):          if allow_convert_from_str:              self._value = self.value_type.check_value(v)          else:              self._value = self.value_type.check_value_or_try_convert_from_str(v) -    @value.setter -    def value(self, v: T) -> None: -        self.set_value(v) - -    @property -    def value_generator(self) -> OptionalValueGenerator: -        return self._value_generator - -    def generate_value(self, allow_interactive=False) -> T | None: -        if self.value_generator is None: return None -        if self.value_generator.interactive and not allow_interactive: +    def generate_value(self) -> _T | None: +        if self.value_generator is None:              return None -        else: -            v = self.generate_value() -            try: -                self.value_type.check_value(v) -                return v -            except ValidationError as e: -                raise CruInternalLogicError("Config value generator returns invalid value.", name=self.name, inner=e) +        v = self.generate_value() +        try: +            self.value_type.check_value(v) +            return v +        except CruValueTypeError as e: +            raise CruInternalError( +                "Config value generator returns invalid value." +            ) from e      def copy(self) -> "ConfigItem": -        return ConfigItem(self.name, self.description, self.value_type, -                          self._value.copy() if self._value is not None else None, self._default_value.copy(), -                          value_generator=self.value_generator) - - -class Configuration: -    def __init__(self, items: None | list[ConfigItem] = None) -> None: -        self._items: list[ConfigItem] = items or [] - -    @property -    def items(self) -> list[ConfigItem]: -        return self._items - -    @property -    def item_map(self) -> dict[str, ConfigItem]: -        return {i.name: i for i in self.items} - -    def get_optional_item(self, name: str) -> ConfigItem | None: -        for i in self.items: -            if i.name == name: -                return i -        return None - -    def clear(self) -> None: -        self._items.clear() - -    def has_item(self, name: str) -> bool: -        return self.get_optional_item(name) is not None - -    def add_item(self, item: ConfigItem): -        i = self.get_optional_item(item.name) -        if i is not None: -            raise CruInternalLogicError("Config item of the name already exists.", name=item.name) -        self.items.append(item) -        return item - -    def set_value(self, name: str, v: Any, /, allow_convert_from_str=False): -        i = self.get_optional_item(name) -        if i is None: -            raise CruInternalLogicError("No config item of the name. Can't set value.", name=name) -        i.set_value(v, allow_convert_from_str) - -    def copy(self) -> "Configuration": -        return Configuration([i.copy() for i in self.items]) - -    def __getitem__(self, name: str) -> ConfigItem: -        i = self.get_optional_item(name) -        if i is not None: -            return i -        raise CruInternalLogicError('No config item of the name.', name=name) - -    def __contains__(self, name: str): -        return self.has_item(name) +        return ConfigItem( +            self.name, +            self.description, +            self.value_type, +            copy.deepcopy(self._value) if self._value is not None else None, +            copy.deepcopy(self._default_value), +            value_generator=self.value_generator, +        ) + + +class Configuration(CruUniqueKeyList[ConfigItem, str]): +    def __init__(self): +        super().__init__(lambda c: c.name) diff --git a/tools/cru-py/cru/_list.py b/tools/cru-py/cru/list.py index c65c793..c65c793 100644 --- a/tools/cru-py/cru/_list.py +++ b/tools/cru-py/cru/list.py diff --git a/tools/cru-py/cru/parsing.py b/tools/cru-py/cru/parsing.py index be7bbf4..a9eee04 100644 --- a/tools/cru-py/cru/parsing.py +++ b/tools/cru-py/cru/parsing.py @@ -1,70 +1,85 @@ -from abc import ABCMeta, abstractmethod
 -from typing import TypeVar, Generic, NoReturn, Callable
 -
 -from cru.excp import CruException, CRU_EXCEPTION_ATTR_DEF_REGISTRY
 -
 -R = TypeVar("R")
 -
 -
 -class ParseException(CruException):
 -    LINE_NUMBER_KEY = "line_number"
 -
 -    CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_with(LINE_NUMBER_KEY, "Line number of the error.")
 -
 -
 -class Parser(Generic[R], metaclass=ABCMeta):
 -    def __init__(self, name: str) -> None:
 -        self._name = name
 -
 -    @property
 -    def name(self) -> str:
 -        return self._name
 -
 -    @abstractmethod
 -    def parse(self, s: str) -> R:
 -        raise NotImplementedError()
 -
 -    def raise_parse_exception(self, s: str, line_number: int | None = None) -> NoReturn:
 -        a = f" at line {line_number}" if line_number is not None else ""
 -        raise ParseException(f"Parser {self.name} failed{a}, {s}")
 -
 -
 -class SimpleLineConfigParser(Parser[list[tuple[str, str]]]):
 -    def __init__(self) -> None:
 -        super().__init__(type(self).__name__)
 -
 -    def _parse(self, s: str, f: Callable[[str, str], None]) -> None:
 -        for ln, line in enumerate(s.splitlines()):
 -            line_number = ln + 1
 -            # check if it's a comment
 -            if line.strip().startswith("#"):
 -                continue
 -            # check if there is a '='
 -            if line.find("=") == -1:
 -                self.raise_parse_exception(f"There is even no '='!", line_number)
 -            # split at first '='
 -            key, value = line.split("=", 1)
 -            key = key.strip()
 -            value = value.strip()
 -            f(key, value)
 -
 -    def parse(self, s: str) -> list[tuple[str, str]]:
 -        items = []
 -        self._parse(s, lambda key, value: items.append((key, value)))
 -        return items
 -
 -    def parse_to_dict(self, s: str, /, allow_override: bool = False) -> tuple[dict[str, str], list[tuple[str, str]]]:
 -        d = {}
 -        duplicate = []
 -
 -        def add(key: str, value: str) -> None:
 -            if key in d:
 -                if allow_override:
 -                    duplicate.append((key, d[key]))
 -                    d[key] = value
 -                else:
 -                    self.raise_parse_exception(f"Key '{key}' already exists!", None)
 -            d[key] = value
 -
 -        self._parse(s, add)
 -        return d, duplicate
 +from abc import ABCMeta, abstractmethod +from typing import TypeVar, Generic, NoReturn, Callable + +from ._error import CruException + +_T = TypeVar("_T") + + +class ParseException(CruException): +    def __init__( +        self, message, text: str, line_number: int | None = None, *args, **kwargs +    ): +        super().__init__(message, *args, **kwargs) +        self._text = text +        self._line_number = line_number + +    @property +    def text(self) -> str: +        return self._text + +    @property +    def line_number(self) -> int | None: +        return self._line_number + + +class Parser(Generic[_T], metaclass=ABCMeta): +    def __init__(self, name: str) -> None: +        self._name = name + +    @property +    def name(self) -> str: +        return self._name + +    @abstractmethod +    def parse(self, s: str) -> _T: +        raise NotImplementedError() + +    def raise_parse_exception( +        self, text: str, line_number: int | None = None +    ) -> NoReturn: +        a = f" at line {line_number}" if line_number is not None else "" +        raise ParseException(f"Parser {self.name} failed{a}.", text, line_number) + + +class SimpleLineConfigParser(Parser[list[tuple[str, str]]]): +    def __init__(self) -> None: +        super().__init__(type(self).__name__) + +    def _parse(self, s: str, callback: Callable[[str, str], None]) -> None: +        for ln, line in enumerate(s.splitlines()): +            line_number = ln + 1 +            # check if it's a comment +            if line.strip().startswith("#"): +                continue +            # check if there is a '=' +            if line.find("=") == -1: +                self.raise_parse_exception("There is even no '='!", line_number) +            # split at first '=' +            key, value = line.split("=", 1) +            key = key.strip() +            value = value.strip() +            callback(key, value) + +    def parse(self, s: str) -> list[tuple[str, str]]: +        items = [] +        self._parse(s, lambda key, value: items.append((key, value))) +        return items + +    def parse_to_dict( +        self, s: str, /, allow_override: bool = False +    ) -> tuple[dict[str, str], list[tuple[str, str]]]: +        result: dict[str, str] = {} +        duplicate: list[tuple[str, str]] = [] + +        def add(key: str, value: str) -> None: +            if key in result: +                if allow_override: +                    duplicate.append((key, result[key])) +                    result[key] = value +                else: +                    self.raise_parse_exception(f"Key '{key}' already exists!", None) +            result[key] = value + +        self._parse(s, add) +        return result, duplicate diff --git a/tools/cru-py/cru/property.py b/tools/cru-py/cru/property.py deleted file mode 100644 index 9549731..0000000 --- a/tools/cru-py/cru/property.py +++ /dev/null @@ -1,24 +0,0 @@ -import json -from typing import Any - - -class PropertyItem: -    def __init__(self, value: Any): -        self._value = value - -    @property -    def value(self) -> Any: -        return self._value - -    @value.setter -    def value(self, value: Any): -        self._value = value - - -class PropertyTreeSection: -    def __init__(self, data: dict[str, Any] | None = None) -> None: -        self._data = data or {} - -class PropertyTree: -    def __init__(self, data: dict[str, Any] | None = None) -> None: -        self._data = data or {}
\ No newline at end of file diff --git a/tools/cru-py/cru/service/nginx.py b/tools/cru-py/cru/service/nginx.py index 94c4375..ad32cb9 100644 --- a/tools/cru-py/cru/service/nginx.py +++ b/tools/cru-py/cru/service/nginx.py @@ -4,7 +4,6 @@ import re  import subprocess  from typing import Literal, Any, cast, ClassVar -import jsonschema  def restart_nginx(force=False) -> bool: @@ -16,358 +15,3 @@ def restart_nginx(force=False) -> bool:              return False      subprocess.run(['docker', 'restart', 'nginx'])      return True - - -_server_schema_filename = "server.schema.json" - -with open(join(Paths.nginx2_template_dir, _server_schema_filename)) as f: -    server_json_schema = json.load(f) - - -_domain_template_filename = "domain.conf.template" - -NginxSourceFileType = Literal["global", "domain", "http", "https"] - - -class NginxSourceFile: -    def __init__(self, path: str) -> None: -        """ -        path: relative to nginx2_template_dir -        """ -        self._path = path -        is_template = path.endswith(".template") -        self._is_template = is_template -        filename = basename(path) -        self.name = filename[:-len(".template")] if is_template else filename -        if is_template: -            self._template = Template2.from_file( -                join(Paths.nginx2_template_dir, path)) -        else: -            with open(join(Paths.nginx2_template_dir, path)) as f: -                self._content = f.read() - -        self._scope: NginxSourceFileType = self._calc_scope() - -    @property -    def is_template(self) -> bool: -        return self._is_template - -    @property -    def content(self) -> str: -        if self._is_template: -            raise Exception(f"{self._path} is a template file") -        return self._content - -    @property -    def template(self) -> Template2: -        if not self._is_template: -            raise Exception(f"{self._path} is not a template file") -        return cast(Template2, self._template) - -    @property -    def global_target_filename(self) -> str: -        if self.scope != "global": -            raise Exception(f"{self._path} is not a global file") -        if self.is_template: -            return basename(self._path)[:-len(".template")] -        else: -            return basename(self._path) - -    def _calc_scope(self) -> NginxSourceFileType: -        f = basename(self._path) -        d = basename(dirname(self._path)) -        if f == _domain_template_filename: -            return "domain" -        elif d in ["global", "http", "https"]: -            return cast(Literal["global", "http", "https"], d) -        else: -            raise Exception(f"Unknown scope for {self._path}") - -    @property -    def scope(self) -> NginxSourceFileType: -        return self._scope - - -_domain_template_source = NginxSourceFile(_domain_template_filename) - -_client_max_body_size_source = NginxSourceFile( -    "global/client-max-body-size.conf") -_forbid_unknown_domain_source = NginxSourceFile( -    "global/forbid-unknown-domain.conf") -_ssl_template_source = NginxSourceFile("global/ssl.conf.template") -_websocket_source = NginxSourceFile("global/websocket.conf") - -_http_444_source = NginxSourceFile("http/444.segment") -_http_redirect_to_https_source = NginxSourceFile( -    "http/redirect-to-https.segment") - -_https_redirect_template_source = NginxSourceFile( -    "https/redirect.segment.template") -_https_reverse_proxy_template_source = NginxSourceFile( -    "https/reverse-proxy.segment.template") -_https_static_file_template_source = NginxSourceFile( -    "https/static-file.segment.template") -_https_static_file_no_strip_prefix_template_source = NginxSourceFile( -    "https/static-file.no-strip-prefix.segment.template") - - -class NginxService: -    def __init__(self, type: str, path: str) -> None: -        self.type = type -        self.path = path -        self._check_path(path) - -    @staticmethod -    def _check_path(path: str) -> None: -        assert isinstance(path, str) -        if path == "" or path == "/": -            return -        if not path.startswith("/"): -            raise UserFriendlyException("Service path should start with '/'.") -        if path.endswith("/"): -            raise UserFriendlyException( -                "Service path should not end with '/'.") - -    def generate_https_segment(self) -> str: -        raise NotImplementedError() - - -class NginxRedirectService(NginxService): -    def __init__(self, path: str, redirect_url: str, redirect_code: int = 307) -> None: -        if redirect_url.endswith("/"): -            raise UserFriendlyException( -                "Redirect URL should not end with '/'.") - -        super().__init__("redirect", path) - -        self.redirect_url = redirect_url -        self.redirect_code = redirect_code - -    def generate_https_segment(self) -> str: -        vars = { -            "PATH": self.path, -            "REDIRECT_CODE": self.redirect_code, -            "REDIRECT_URL": self.redirect_url -        } -        return _https_redirect_template_source.template.render(vars) - -    @staticmethod -    def from_json(json: dict[str, Any]) -> "NginxRedirectService": -        path = json["path"] -        redirect_url = json["to"] -        redirect_code = json.get("code", 307) -        assert isinstance(path, str) -        assert isinstance(redirect_url, str) -        assert isinstance(redirect_code, int) -        return NginxRedirectService(path, redirect_url, redirect_code) - - -class NginxReverseProxyService(NginxService): - -    _upstream_regex: ClassVar[re.Pattern[str]] = re.compile( -        r"^[-_0-9a-zA-Z]+:[0-9]+$") - -    def __init__(self, path: str, upstream: str) -> None: -        if not self._upstream_regex.match(upstream): -            raise UserFriendlyException( -                f"Invalid upstream format: {upstream}.") - -        super().__init__("reverse-proxy", path) - -        self.upstream = upstream - -    def generate_https_segment(self) -> str: -        vars = { -            "PATH": self.path, -            "UPSTREAM": self.upstream -        } -        return _https_reverse_proxy_template_source.template.render(vars) - -    @staticmethod -    def from_json(json: dict[str, Any]) -> "NginxReverseProxyService": -        path = json["path"] -        upstream = json["upstream"] -        assert isinstance(path, str) -        assert isinstance(upstream, str) -        return NginxReverseProxyService(path, upstream) - - -class NginxStaticFileService(NginxService): -    def __init__(self, path: str, root: str, no_strip_prefix: bool = False) -> None: -        super().__init__("static-file", path) - -        self.root = root -        self.no_strip_prefix = no_strip_prefix - -    def generate_https_segment(self) -> str: -        vars = { -            "PATH": self.path, -            "ROOT": self.root, -        } -        if self.no_strip_prefix: -            return _https_static_file_no_strip_prefix_template_source.template.render(vars) -        else: -            return _https_static_file_template_source.template.render(vars) - -    @staticmethod -    def from_json(json: dict[str, Any]) -> "NginxStaticFileService": -        path = json["path"] -        root = json["root"] -        no_strip_prefix = json.get("no_strip_prefix", False) -        assert isinstance(path, str) -        assert isinstance(root, str) -        assert isinstance(no_strip_prefix, bool) -        return NginxStaticFileService(path, root, no_strip_prefix) - - -def nginx_service_from_json(json: dict[str, Any]) -> NginxService: -    type = json["type"] -    if type == "redirect": -        return NginxRedirectService.from_json(json) -    elif type == "reverse-proxy": -        return NginxReverseProxyService.from_json(json) -    elif type == "static-file": -        return NginxStaticFileService.from_json(json) -    else: -        raise UserFriendlyException(f"Invalid crupest type: {type}.") - - -def _prepend_indent(text: str, indent: str = " " * 4) -> str: -    lines = text.split("\n") -    for i in range(len(lines)): -        if lines[i] != "": -            lines[i] = indent + lines[i] -    return "\n".join(lines) - - -class NginxDomain: -    def __init__(self, domain: str, services: list[NginxService] = []) -> None: -        self.domain = domain -        self.services = services - -    def add_service(self, service: NginxService) -> None: -        self.services.append(service) - -    def generate_http_segment(self) -> str: -        if len(self.services) == 0: -            return _http_444_source.content -        else: -            return _http_redirect_to_https_source.content - -    def generate_https_segment(self) -> str: -        return "\n\n".join([s.generate_https_segment() for s in self.services]) - -    def generate_config(self) -> str: -        vars = { -            "DOMAIN": self.domain, -            "HTTP_SEGMENT": _prepend_indent(self.generate_http_segment()), -            "HTTPS_SEGMENT": _prepend_indent(self.generate_https_segment()), -        } -        return _domain_template_source.template.render(vars) - -    def generate_config_file(self, path: str) -> None: -        with open(path, "w") as f: -            f.write(self.generate_config()) - -    @staticmethod -    def from_json(root_domain: str, json: dict[str, Any]) -> "NginxDomain": -        name = json["name"] -        assert isinstance(name, str) -        if name == "@" or name == "": -            domain = root_domain -        else: -            domain = f"{name}.{root_domain}" -        assert isinstance(json["services"], list) -        services = [nginx_service_from_json(s) for s in json["services"]] -        return NginxDomain(domain, services) - - -def check_nginx_config_schema(json: Any) -> None: -    jsonschema.validate(json, server_json_schema) - - -class NginxServer: -    def __init__(self, root_domain: str) -> None: -        self.root_domain = root_domain -        self.domains: list[NginxDomain] = [] - -    def add_sub_domain(self, sub_domain: str, services: list[NginxService]) -> None: -        if sub_domain == "" or sub_domain == "@": -            domain = self.root_domain -        else: -            domain = f"{sub_domain}.{self.root_domain}" -        self.domains.append(NginxDomain(domain, services)) - -    def generate_ssl(self) -> str: -        return _ssl_template_source.template.render({ -            "ROOT_DOMAIN": self.root_domain -        }) - -    def generate_global_files(self, d: str) -> None: -        for source in [_client_max_body_size_source, _forbid_unknown_domain_source, _websocket_source]: -            with open(join(d, source.name), "w") as f: -                f.write(source.content) -        with open(join(d, _ssl_template_source.name), "w") as f: -            f.write(self.generate_ssl()) - -    def generate_domain_files(self, d: str) -> None: -        for domain in self.domains: -            domain.generate_config_file(join(d, f"{domain.domain}.conf")) - -    def generate_config(self, d: str) -> None: -        create_dir_if_not_exists(d) -        self.generate_global_files(d) - -    def get_allowed_files(self) -> list[str]: -        files = [] -        for source in [_client_max_body_size_source, _forbid_unknown_domain_source, _ssl_template_source, _websocket_source]: -            files.append(source.name) -        for domain in self.domains: -            files.append(f"{domain.domain}.conf") -        return files - -    def check_bad_files(self, d: str) -> list[str]: -        allowed_files = self.get_allowed_files() -        bad_files = [] -        if not ensure_dir(d, must_exist=False): -            return [] -        for path in os.listdir(d): -            if path not in allowed_files: -                bad_files.append(path) -        return bad_files - -    @staticmethod -    def from_json(root_domain: str, json: dict[str, Any]) -> "NginxServer": -        check_nginx_config_schema(json) -        server = NginxServer(root_domain) -        sub_domains = json["domains"] -        assert isinstance(sub_domains, list) -        server.domains = [NginxDomain.from_json( -            root_domain, d) for d in sub_domains] -        return server - -    @staticmethod -    def from_json_str(root_domain: str, json_str: str) -> "NginxServer": -        return NginxServer.from_json(root_domain, json.loads(json_str)) - -    def go(self): -        bad_files = self.check_bad_files(Paths.nginx_generated_dir) -        if len(bad_files) > 0: -            console.print( -                "WARNING: It seems there are some bad conf files in the nginx config directory:", style="yellow") -            for bad_file in bad_files: -                console.print(bad_file, style=file_name_style) -            to_delete = Confirm.ask( -                "They will affect nginx in a [red]bad[/] way. Do you want to delete them?", default=True, console=console) -            if to_delete: -                for file in bad_files: -                    os.remove(join(Paths.nginx_generated_dir, file)) -        create_dir_if_not_exists(Paths.generated_dir) -        if not ensure_dir(Paths.nginx_generated_dir, must_exist=False): -            os.mkdir(Paths.nginx_generated_dir) -            console.print( -                f"Nginx config directory created at [magenta]{Paths.nginx_generated_dir}[/]", style="green") -        self.generate_config(Paths.nginx_generated_dir) -        console.print("Nginx config generated.", style="green") -        if restart_nginx(): -            console.print('Nginx restarted.', style="green") diff --git a/tools/cru-py/cru/system.py b/tools/cru-py/cru/system.py index 4c9de01..f321717 100644 --- a/tools/cru-py/cru/system.py +++ b/tools/cru-py/cru/system.py @@ -7,10 +7,11 @@ def check_debian_derivative_version(name: str) -> None | str:          return None      with open("/etc/os-release", "r") as f:          content = f.read() -        if not f"ID={name}" in content: +        if f"ID={name}" not in content:              return None          m = re.search(r'VERSION_ID="(.+)"', content) -        if m is None: return None +        if m is None: +            return None          return m.group(1) diff --git a/tools/cru-py/cru/template.py b/tools/cru-py/cru/template.py new file mode 100644 index 0000000..8e06418 --- /dev/null +++ b/tools/cru-py/cru/template.py @@ -0,0 +1,142 @@ +from collections.abc import Iterable, Mapping +import os +import os.path +from string import Template + +from ._error import CruException + + +class CruTemplateError(CruException): +    pass + + +class TemplateFile: +    def __init__(self, source: str, destination_path: str | None): +        self._source = source +        self._destination = destination_path +        self._template: Template | None = None + +    @property +    def source(self) -> str: +        return self._source + +    @property +    def destination(self) -> str | None: +        return self._destination + +    @destination.setter +    def destination(self, value: str | None) -> None: +        self._destination = value + +    @property +    def template(self) -> Template: +        if self._template is None: +            return self.reload_template() +        return self._template + +    def reload_template(self) -> Template: +        with open(self._source, "r") as f: +            self._template = Template(f.read()) +        return self._template + +    @property +    def variables(self) -> set[str]: +        return set(self.template.get_identifiers()) + +    def generate(self, variables: Mapping[str, str]) -> str: +        return self.template.substitute(variables) + +    def generate_to_destination(self, variables: Mapping[str, str]) -> None: +        if self._destination is None: +            raise CruTemplateError("No destination specified for this template.") +        with open(self._destination, "w") as f: +            f.write(self.generate(variables)) + + +class TemplateDirectory: +    def __init__( +        self, +        source: str, +        destination: str, +        exclude: Iterable[str], +        file_suffix: str = ".template", +    ): +        self._files: list[TemplateFile] | None = None +        self._source = source +        self._destination = destination +        self._exclude = [os.path.normpath(p) for p in exclude] +        self._file_suffix = file_suffix + +    @property +    def files(self) -> list[TemplateFile]: +        if self._files is None: +            return self.reload() +        else: +            return self._files + +    @property +    def source(self) -> str: +        return self._source + +    @property +    def destination(self) -> str: +        return self._destination + +    @property +    def exclude(self) -> list[str]: +        return self._exclude + +    @property +    def file_suffix(self) -> str: +        return self._file_suffix + +    @staticmethod +    def _scan_files( +        root_path: str, exclude: list[str], suffix: str | None +    ) -> Iterable[str]: +        for root, _dirs, files in os.walk(root_path): +            for file in files: +                if suffix is None or file.endswith(suffix): +                    path = os.path.join(root, file) +                    path = os.path.relpath(path, root_path) +                    if suffix is not None: +                        path = path[: -len(suffix)] +                    is_exclude = False +                    for exclude_path in exclude: +                        if path.startswith(exclude_path): +                            is_exclude = True +                            break +                    if not is_exclude: +                        yield path + +    def reload(self) -> list[TemplateFile]: +        if not os.path.isdir(self.source): +            raise CruTemplateError( +                f"Source directory {self.source} does not exist or is not a directory." +            ) +        files = self._scan_files(self.source, self.exclude, self.file_suffix) +        self._files = [ +            TemplateFile( +                os.path.join(self._source, file + self.file_suffix), +                os.path.join(self._destination, file), +            ) +            for file in files +        ] +        return self._files + +    @property +    def variables(self) -> set[str]: +        s = set() +        for file in self.files: +            s.update(file.variables) +        return s + +    def generate_to_destination(self, variables: Mapping[str, str]) -> None: +        for file in self.files: +            file.generate_to_destination(variables) + +    def extra_files_in_destination(self) -> Iterable[str]: +        source_files = set(os.path.relpath(f.source, self.source) for f in self.files) +        for file in self._scan_files(self.destination, self.exclude, None): +            if file not in source_files: +                yield file diff --git a/tools/cru-py/cru/value.py b/tools/cru-py/cru/value.py index 189f44f..4096362 100644 --- a/tools/cru-py/cru/value.py +++ b/tools/cru-py/cru/value.py @@ -5,23 +5,23 @@ import secrets  import string  import uuid  from abc import abstractmethod, ABCMeta -from collections.abc import Mapping, Callable -from typing import Any, ClassVar, Literal, TypeVar, Generic, ParamSpec +from collections.abc import Callable +from typing import Any, ClassVar, TypeVar, Generic -from .error import CruInternalError, CruException +from ._error import CruException -def _str_case_in(s: str, case: bool, l: list[str]) -> bool: +def _str_case_in(s: str, case: bool, str_list: list[str]) -> bool:      if case: -        return s in l +        return s in str_list      else: -        return s.lower() in [s.lower() for s in l] +        return s.lower() in [s.lower() for s in str_list] -T = TypeVar("T") +_T = TypeVar("_T") -class CruValueError(CruException): +class CruValueTypeError(CruException):      def __init__(          self,          message: str, @@ -47,17 +47,8 @@ class CruValueError(CruException):          return self._value_type -class CruValueValidationError(CruValueError): -    pass - - -class CruValueStringConversionError(CruValueError): -    pass - - -# TODO: Continue here tomorrow! -class ValueType(Generic[T], metaclass=ABCMeta): -    def __init__(self, name: str, _type: type[T]) -> None: +class ValueType(Generic[_T], metaclass=ABCMeta): +    def __init__(self, name: str, _type: type[_T]) -> None:          self._name = name          self._type = _type @@ -66,67 +57,61 @@ class ValueType(Generic[T], metaclass=ABCMeta):          return self._name      @property -    def type(self) -> type[T]: +    def type(self) -> type[_T]:          return self._type -    def check_value_type(self, value: Any) -> bool: -        return isinstance(value, self.type) +    def check_value_type(self, value: Any) -> None: +        if not isinstance(value, self.type): +            raise CruValueTypeError("Type of value is wrong.", value, self) -    def _do_check_value(self, value: Any) -> T: +    def _do_check_value(self, value: Any) -> _T:          return value -    def check_value(self, value: Any) -> T: -        if not isinstance(value, self.type): -            raise CruValueValidationError("Value type is wrong.", value, self) +    def check_value(self, value: Any) -> _T: +        self.check_value_type(value)          return self._do_check_value(value) -    def _do_check_str_format(self, s: str) -> bool | tuple[bool, str]: +    @abstractmethod +    def _do_check_str_format(self, s: str) -> None:          raise NotImplementedError()      def check_str_format(self, s: str) -> None: -        ok, err = self._do_check_str_format(s) -        if ok is None: -            raise CruInternalLogicError("_do_check_str_format should not return None.") -        if ok: -            return -        if err is None: -            err = "Invalid value str format." -        raise ValueStringConvertionError(err, s, value_type=self) +        if not isinstance(s, str): +            raise CruValueTypeError("Try to check format on a non-str.", s, self) +        self._do_check_str_format(s)      @abstractmethod -    def _do_convert_value_to_str(self, value: T) -> str: +    def _do_convert_value_to_str(self, value: _T) -> str:          raise NotImplementedError() -    def convert_value_to_str(self, value: T) -> str: +    def convert_value_to_str(self, value: _T) -> str:          self.check_value(value)          return self._do_convert_value_to_str(value)      @abstractmethod -    def _do_convert_str_to_value(self, s: str) -> T: +    def _do_convert_str_to_value(self, s: str) -> _T:          raise NotImplementedError() -    def convert_str_to_value(self, s: str) -> T: +    def convert_str_to_value(self, s: str) -> _T:          self.check_str_format(s)          return self._do_convert_str_to_value(s) -    def check_value_or_try_convert_from_str(self, value_or_str: Any) -> T: +    def check_value_or_try_convert_from_str(self, value_or_str: Any) -> _T:          try:              return self.check_value(value_or_str) -        except ValidationError as e: +        except CruValueTypeError:              if isinstance(value_or_str, str):                  return self.convert_str_to_value(value_or_str)              else: -                raise ValidationError( -                    "Value is not valid and is not a str.", value_or_str, self, inner=e -                ) +                raise  class TextValueType(ValueType[str]):      def __init__(self) -> None: -        super().__init__("text") +        super().__init__("text", str) -    def _do_check_str_format(self, s): -        return True +    def _do_check_str_format(self, _s): +        return      def _do_convert_value_to_str(self, value):          return value @@ -138,14 +123,13 @@ class TextValueType(ValueType[str]):  class IntegerValueType(ValueType[int]):      def __init__(self) -> None: -        super().__init__("integer") +        super().__init__("integer", int)      def _do_check_str_format(self, s):          try:              int(s) -            return True -        except ValueError: -            return False +        except ValueError as e: +            raise CruValueTypeError("Invalid integer format.", s, self) from e      def _do_convert_value_to_str(self, value):          return str(value) @@ -156,14 +140,13 @@ class IntegerValueType(ValueType[int]):  class FloatValueType(ValueType[float]):      def __init__(self) -> None: -        super().__init__("float") +        super().__init__("float", float)      def _do_check_str_format(self, s):          try:              float(s) -            return True -        except ValueError: -            return False +        except ValueError as e: +            raise CruValueTypeError("Invalid float format.", s, self) from e      def _do_convert_value_to_str(self, value):          return str(value) @@ -183,7 +166,7 @@ class BooleanValueType(ValueType[bool]):          true_list: None | list[str] = None,          false_list: None | list[str] = None,      ) -> None: -        super().__init__("boolean") +        super().__init__("boolean", bool)          self._case_sensitive = case_sensitive          self._valid_true_strs: list[str] = (              true_list or BooleanValueType.DEFAULT_TRUE_LIST @@ -209,15 +192,11 @@ class BooleanValueType(ValueType[bool]):          return self._valid_true_strs + self._valid_false_strs      def _do_check_str_format(self, s): -        if _str_case_in(s, self.case_sensitive, self.valid_boolean_strs): -            return True -        return ( -            False, -            f"Not a valid boolean string ({ValueType.case_sensitive_to_str(self.case_sensitive)}). Valid string of true: {' '.join(self._valid_true_strs)}. Valid string of false: {' '.join(self._valid_false_strs)}. All is case insensitive.", -        ) +        if not _str_case_in(s, self.case_sensitive, self.valid_boolean_strs): +            raise CruValueTypeError("Invalid boolean format.", s, self)      def _do_convert_value_to_str(self, value): -        return "True" if value else "False" +        return self._valid_true_strs[0] if value else self._valid_false_strs[0]      def _do_convert_str_to_value(self, s):          return _str_case_in(s, self.case_sensitive, self._valid_true_strs) @@ -225,9 +204,7 @@ class BooleanValueType(ValueType[bool]):  class EnumValueType(ValueType[str]):      def __init__(self, valid_values: list[str], /, case_sensitive=False) -> None: -        s = " | ".join([f'"{v}"' for v in valid_values]) -        self._valid_value_str = f"[ {s} ]" -        super().__init__(f"enum{self._valid_value_str}") +        super().__init__(f"enum({'|'.join(valid_values)})", str)          self._case_sensitive = case_sensitive          self._valid_values = valid_values @@ -240,16 +217,11 @@ class EnumValueType(ValueType[str]):          return self._valid_values      def _do_check_value(self, value): -        ok, err = self._do_check_str_format(value) -        return ok, (value if ok else err) +        self._do_check_str_format(value)      def _do_check_str_format(self, s): -        if _str_case_in(s, self.case_sensitive, self.valid_values): -            return True -        return ( -            False, -            f"Value is not in valid values ({ValueType.case_sensitive_to_str(self.case_sensitive)}): {self._valid_value_str}", -        ) +        if not _str_case_in(s, self.case_sensitive, self.valid_values): +            raise CruValueTypeError("Invalid enum value", s, self)      def _do_convert_value_to_str(self, value):          return value @@ -262,69 +234,52 @@ TEXT_VALUE_TYPE = TextValueType()  INTEGER_VALUE_TYPE = IntegerValueType()  BOOLEAN_VALUE_TYPE = BooleanValueType() -P = ParamSpec("P") +class ValueGeneratorBase(Generic[_T], metaclass=ABCMeta): +    @abstractmethod +    def generate(self) -> _T: +        raise NotImplementedError() -class ValueGenerator(Generic[T, P]): -    INTERACTIVE_KEY: ClassVar[Literal["interactive"]] = "interactive" +    def __call__(self) -> _T: +        return self.generate() -    def __init__( -        self, f: Callable[P, T], /, attributes: None | Mapping[str, Any] = None -    ) -> None: -        self._f = f -        self._attributes = attributes or {} -    @property -    def f(self) -> Callable[P, T]: -        return self._f +class ValueGenerator(ValueGeneratorBase[_T]): +    def __init__(self, generate_func: Callable[[], _T]) -> None: +        self._generate_func = generate_func      @property -    def attributes(self) -> Mapping[str, Any]: -        return self._attributes +    def generate_func(self) -> Callable[[], _T]: +        return self._generate_func -    def generate(self, *args, **kwargs) -> T: -        return self._f(*args, **kwargs) +    def generate(self) -> _T: +        return self._generate_func() -    def __call__(self, *args, **kwargs): -        return self._f(*args, **kwargs) -    @property -    def interactive(self) -> bool: -        return self._attributes.get(ValueGenerator.INTERACTIVE_KEY, False) - -    @staticmethod -    def create_interactive( -        f: Callable[P, T], -        interactive: bool = True, -        /, -        attributes: None | Mapping[str, Any] = None, -    ) -> "ValueGenerator[T, P]": -        return ValueGenerator( -            f, dict({ValueGenerator.INTERACTIVE_KEY: interactive}, **(attributes or {})) -        ) - - -class UuidValueGenerator(ValueGenerator[str, []]): -    def __init__(self) -> None: -        super().__init__(lambda: str(uuid.uuid4())) +class UuidValueGenerator(ValueGeneratorBase[str]): +    def generate(self): +        return str(uuid.uuid4()) -class RandomStringValueGenerator(ValueGenerator[str, []]): -    @staticmethod -    def _create_generate_ramdom_func(length: int, secure: bool) -> Callable[str, []]: -        random_choice = secrets.choice if secure else random.choice -        def generate_random_string(): -            characters = string.ascii_letters + string.digits -            random_string = "".join(random_choice(characters) for _ in range(length)) -            return random_string +class RandomStringValueGenerator(ValueGeneratorBase[str]): +    def __init__(self, length: int, secure: bool) -> None: +        self._length = length +        self._secure = secure -        return generate_random_string +    @property +    def length(self) -> int: +        return self._length -    def __init__(self, length: int, secure: bool) -> None: -        super().__init__( -            RandomStringValueGenerator._create_generate_ramdom_func(length, secure) -        ) +    @property +    def secure(self) -> bool: +        return self._secure + +    def generate(self): +        random_func = secrets.choice if self._secure else random.choice +        characters = string.ascii_letters + string.digits +        random_string = "".join(random_func(characters) for _ in range(self._length)) +        return random_string  UUID_VALUE_GENERATOR = UuidValueGenerator() | 
