diff options
author | crupest <crupest@outlook.com> | 2024-11-11 01:12:29 +0800 |
---|---|---|
committer | Yuqian Yang <crupest@crupest.life> | 2024-12-18 18:31:27 +0800 |
commit | 7b4d49e4bbdff6ddf1f8f7e937130e700024d5e9 (patch) | |
tree | b45e6cb4312b050ac4e1ccf51106d44af9201b40 /tools/cru-py/cru | |
parent | ca88aee42b741110d42683db826caf61b642abea (diff) | |
download | crupest-7b4d49e4bbdff6ddf1f8f7e937130e700024d5e9.tar.gz crupest-7b4d49e4bbdff6ddf1f8f7e937130e700024d5e9.tar.bz2 crupest-7b4d49e4bbdff6ddf1f8f7e937130e700024d5e9.zip |
HALF WORK: 2024.12.17
Diffstat (limited to 'tools/cru-py/cru')
-rw-r--r-- | tools/cru-py/cru/__init__.py | 50 | ||||
-rw-r--r-- | tools/cru-py/cru/_base.py | 4 | ||||
-rw-r--r-- | tools/cru-py/cru/_error.py (renamed from tools/cru-py/cru/error.py) | 2 | ||||
-rw-r--r-- | tools/cru-py/cru/_event.py | 18 | ||||
-rw-r--r-- | tools/cru-py/cru/_helper.py | 16 | ||||
-rw-r--r-- | tools/cru-py/cru/_iter.py | 3 | ||||
-rw-r--r-- | tools/cru-py/cru/_lang.py | 16 | ||||
-rw-r--r-- | tools/cru-py/cru/_path.py | 23 | ||||
-rw-r--r-- | tools/cru-py/cru/_type.py | 2 | ||||
-rw-r--r-- | tools/cru-py/cru/app.py (renamed from tools/cru-py/cru/paths.py) | 33 | ||||
-rw-r--r-- | tools/cru-py/cru/attr.py | 2 | ||||
-rw-r--r-- | tools/cru-py/cru/config.py | 147 | ||||
-rw-r--r-- | tools/cru-py/cru/list.py (renamed from tools/cru-py/cru/_list.py) | 0 | ||||
-rw-r--r-- | tools/cru-py/cru/parsing.py | 155 | ||||
-rw-r--r-- | tools/cru-py/cru/property.py | 24 | ||||
-rw-r--r-- | tools/cru-py/cru/service/nginx.py | 356 | ||||
-rw-r--r-- | tools/cru-py/cru/system.py | 5 | ||||
-rw-r--r-- | tools/cru-py/cru/template.py | 142 | ||||
-rw-r--r-- | tools/cru-py/cru/value.py | 201 |
19 files changed, 487 insertions, 712 deletions
diff --git a/tools/cru-py/cru/__init__.py b/tools/cru-py/cru/__init__.py index 94d0d69..7c1a5f1 100644 --- a/tools/cru-py/cru/__init__.py +++ b/tools/cru-py/cru/__init__.py @@ -1,6 +1,26 @@ import sys -from ._base import CruException +from ._base import CRU, CruNamespaceError, CRU_NAME_PREFIXES +from ._error import ( + cru_unreachable, + CruException, + CruUserFriendlyException, + CruInternalError, + CruUnreachableError, +) +from ._const import ( + CruConstantBase, + CruDontChange, + CruNotFound, + CruNoValue, + CruPlaceholder, + CruUseDefault, +) +from ._func import CruFunction +from ._iter import CruIterable, CruIterator +from ._event import CruEvent, CruEventHandlerToken +from ._path import CruPath, CruPathError +from ._type import CruTypeSet, CruTypeCheckError class CruInitError(CruException): @@ -13,3 +33,31 @@ def check_python_version(required_version=(3, 11)): check_python_version() + +__all__ = [ + "CRU", + "CruNamespaceError", + "CRU_NAME_PREFIXES", + "check_python_version", + "CruException", + "cru_unreachable", + "CruInitError", + "CruUserFriendlyException", + "CruInternalError", + "CruUnreachableError", + "CruConstantBase", + "CruDontChange", + "CruNotFound", + "CruNoValue", + "CruPlaceholder", + "CruUseDefault", + "CruFunction", + "CruIterable", + "CruIterator", + "CruEvent", + "CruEventHandlerToken", + "CruPath", + "CruPathError", + "CruTypeSet", + "CruTypeCheckError", +] diff --git a/tools/cru-py/cru/_base.py b/tools/cru-py/cru/_base.py index 2310bfb..0a22df4 100644 --- a/tools/cru-py/cru/_base.py +++ b/tools/cru-py/cru/_base.py @@ -1,7 +1,7 @@ from typing import Any -from ._lang import remove_none -from .error import CruInternalError +from ._helper import remove_none +from ._error import CruInternalError class CruNamespaceError(CruInternalError): diff --git a/tools/cru-py/cru/error.py b/tools/cru-py/cru/_error.py index 95edbd3..0d2bf79 100644 --- a/tools/cru-py/cru/error.py +++ b/tools/cru-py/cru/_error.py @@ -17,7 +17,7 @@ class CruInternalError(CruException): """Raised when an internal logic error occurs.""" -class UserFriendlyException(CruException): +class CruUserFriendlyException(CruException): def __init__(self, message: str, user_message: str, *args, **kwargs) -> None: super().__init__(message, *args, **kwargs) self._user_message = user_message diff --git a/tools/cru-py/cru/_event.py b/tools/cru-py/cru/_event.py index 65265fd..51a794c 100644 --- a/tools/cru-py/cru/_event.py +++ b/tools/cru-py/cru/_event.py @@ -3,22 +3,22 @@ from __future__ import annotations from collections.abc import Callable from typing import Generic, ParamSpec, TypeVar -from ._list import CruList +from .list import CruList _P = ParamSpec("_P") _R = TypeVar("_R") -class EventHandlerToken(Generic[_P, _R]): +class CruEventHandlerToken(Generic[_P, _R]): def __init__( - self, event: Event, handler: Callable[_P, _R], once: bool = False + self, event: CruEvent, handler: Callable[_P, _R], once: bool = False ) -> None: self._event = event self._handler = handler self._once = once @property - def event(self) -> Event: + def event(self) -> CruEvent: return self._event @property @@ -30,19 +30,19 @@ class EventHandlerToken(Generic[_P, _R]): return self._once -class Event(Generic[_P, _R]): +class CruEvent(Generic[_P, _R]): def __init__(self, name: str) -> None: self._name = name - self._tokens: CruList[EventHandlerToken] = CruList() + self._tokens: CruList[CruEventHandlerToken] = CruList() def register( self, handler: Callable[_P, _R], once: bool = False - ) -> EventHandlerToken: - token = EventHandlerToken(self, handler, once) + ) -> CruEventHandlerToken: + token = CruEventHandlerToken(self, handler, once) self._tokens.append(token) return token - def unregister(self, *handlers: EventHandlerToken | Callable[_P, _R]) -> int: + def unregister(self, *handlers: CruEventHandlerToken | Callable[_P, _R]) -> int: old_length = len(self._tokens) self._tokens.reset( self._tokens.as_cru_iterator().filter( diff --git a/tools/cru-py/cru/_helper.py b/tools/cru-py/cru/_helper.py new file mode 100644 index 0000000..43baf46 --- /dev/null +++ b/tools/cru-py/cru/_helper.py @@ -0,0 +1,16 @@ +from collections.abc import Callable +from typing import Any, Iterable, TypeVar, cast + +_T = TypeVar("_T") +_D = TypeVar("_D") + + +def remove_element( + iterable: Iterable[_T | None], to_rm: Iterable[Any], des: type[_D] | None = None +) -> _D: + to_rm = set(to_rm) + return cast(Callable[..., _D], des or list)(v for v in iterable if v not in to_rm) + + +def remove_none(iterable: Iterable[_T | None], des: type[_D] | None = None) -> _D: + return cast(Callable[..., _D], des or list)(v for v in iterable if v is not None) diff --git a/tools/cru-py/cru/_iter.py b/tools/cru-py/cru/_iter.py index b91195b..12d1d1f 100644 --- a/tools/cru-py/cru/_iter.py +++ b/tools/cru-py/cru/_iter.py @@ -16,8 +16,9 @@ from typing import ( cast, ) -from ._base import CRU, cru_unreachable +from ._base import CRU from ._const import CruNotFound +from ._error import cru_unreachable _P = ParamSpec("_P") _T = TypeVar("_T") diff --git a/tools/cru-py/cru/_lang.py b/tools/cru-py/cru/_lang.py deleted file mode 100644 index 925ba00..0000000 --- a/tools/cru-py/cru/_lang.py +++ /dev/null @@ -1,16 +0,0 @@ -from collections.abc import Callable -from typing import Any, Iterable, TypeVar, cast - -T = TypeVar("T") -D = TypeVar("D") - - -def remove_element( - iterable: Iterable[T | None], to_rm: Iterable[Any], des: type[D] | None = None -) -> D: - to_rm = set(to_rm) - return cast(Callable[..., D], des or list)(v for v in iterable if v not in to_rm) - - -def remove_none(iterable: Iterable[T | None], des: type[D] | None = None) -> D: - return cast(Callable[..., D], des or list)(v for v in iterable if v is not None) diff --git a/tools/cru-py/cru/_path.py b/tools/cru-py/cru/_path.py new file mode 100644 index 0000000..a131c41 --- /dev/null +++ b/tools/cru-py/cru/_path.py @@ -0,0 +1,23 @@ +from pathlib import Path + +from ._error import CruException + + +class CruPathError(CruException): + def __init__(self, message, _path: Path, *args, **kwargs): + super().__init__(message, *args, **kwargs) + self._path = _path + + @property + def path(self) -> Path: + return self._path + + +class CruPath(Path): + def check_parents_dir(self, must_exist: bool = False) -> bool: + for p in reversed(self.parents): + if not p.exists() and not must_exist: + return False + if not p.is_dir(): + raise CruPathError("Parents path must be a dir.", self) + return True diff --git a/tools/cru-py/cru/_type.py b/tools/cru-py/cru/_type.py index 0fd86a4..96d5d4b 100644 --- a/tools/cru-py/cru/_type.py +++ b/tools/cru-py/cru/_type.py @@ -1,7 +1,7 @@ from collections.abc import Iterable from typing import Any -from ._base import CruException, CruInternalError +from ._error import CruException, CruInternalError from ._iter import CruIterator diff --git a/tools/cru-py/cru/paths.py b/tools/cru-py/cru/app.py index cdd97fe..6a60926 100644 --- a/tools/cru-py/cru/paths.py +++ b/tools/cru-py/cru/app.py @@ -1,31 +1,32 @@ import os from pathlib import Path -from .error import CruException +from ._error import CruException +from ._path import CruPath -class ApplicationPathError(CruException): - def __init__(self, message: str, p: str | Path, *args, **kwargs): - super().__init__(message, *args, path=str(p), **kwargs) +class CruApplication: + def __init__(self, name: str) -> None: + self._name = name -def check_parents_dir(p: str | Path, /, must_exist: bool = False) -> bool: - p = Path(p) if isinstance(p, str) else p - for p in reversed(p.parents): - if not p.exists() and not must_exist: - return False - if not p.is_dir(): - raise ApplicationPathError("Parents path should be a dir.", p) - return True +class ApplicationPathError(CruException): + def __init__(self, message, _path: Path, *args, **kwargs): + super().__init__(message, *args, **kwargs) + self._path = _path + + @property + def path(self) -> Path: + return self._path class ApplicationPath: def __init__(self, p: str | Path, is_dir: bool) -> None: - self._path = Path(p) if isinstance(p, str) else p + self._path = CruPath(p) self._is_dir = is_dir @property - def path(self) -> Path: + def path(self) -> CruPath: return self._path @property @@ -33,7 +34,7 @@ class ApplicationPath: return self._is_dir def check_parents(self, must_exist: bool = False) -> bool: - return check_parents_dir(self._path.parent, must_exist) + return self._path.check_parents_dir(must_exist) def check_self(self, must_exist: bool = False) -> bool: if not self.check_parents(must_exist): @@ -41,7 +42,7 @@ class ApplicationPath: if not self.path.exists(): if not must_exist: return False - raise ApplicationPathError("Mot exist.", self.path) + raise ApplicationPathError("Not exist.", self.path) if self.is_dir: if not self.path.is_dir(): raise ApplicationPathError("Should be a directory, but not.", self.path) diff --git a/tools/cru-py/cru/attr.py b/tools/cru-py/cru/attr.py index 82f1eba..d4cc86a 100644 --- a/tools/cru-py/cru/attr.py +++ b/tools/cru-py/cru/attr.py @@ -5,7 +5,7 @@ from collections.abc import Callable, Iterable from dataclasses import dataclass, field from typing import Any -from ._list import CruUniqueKeyList +from .list import CruUniqueKeyList from ._type import CruTypeSet from ._const import CruNotFound, CruUseDefault, CruDontChange from ._iter import CruIterator diff --git a/tools/cru-py/cru/config.py b/tools/cru-py/cru/config.py index 843f30a..3fd994b 100644 --- a/tools/cru-py/cru/config.py +++ b/tools/cru-py/cru/config.py @@ -1,22 +1,35 @@ -from typing import Any, TypeVar, Generic - -from .error import CruInternalLogicError -from .value import ValueType, ValueGenerator, ValidationError - -T = TypeVar("T") - - -class ConfigItem(Generic[T]): - OptionalValueGenerator = ValueGenerator[T, []] | None - - def __init__(self, name: str, description: str, value_type: ValueType[T], value: T | None, default_value: T, *, - value_generator: OptionalValueGenerator = None) -> None: +from typing import TypeVar, Generic +import copy + + +from .list import CruUniqueKeyList +from ._error import CruInternalError +from .value import ( + CruValueTypeError, + ValueGeneratorBase, + ValueType, +) + +_T = TypeVar("_T") + + +class ConfigItem(Generic[_T]): + def __init__( + self, + name: str, + description: str, + value_type: ValueType[_T], + value: _T | None, + default_value: _T, + *, + value_generator: ValueGeneratorBase | None = None, + ) -> None: self._name = name self._description = description self._value_type = value_type self._default_value = default_value self._value_generator = value_generator - self._value: T | None = value + self._value = value @property def name(self) -> str: @@ -27,11 +40,11 @@ class ConfigItem(Generic[T]): return self._description @property - def value_type(self) -> ValueType[T]: + def value_type(self) -> ValueType[_T]: return self._value_type @property - def default_value(self) -> T: + def default_value(self) -> _T: return self._default_value @property @@ -43,86 +56,42 @@ class ConfigItem(Generic[T]): return not self.is_default @property - def value(self) -> T: + def value(self) -> _T: return self._value or self._default_value - def set_value(self, v: T | str, /, allow_convert_from_str=False): + @property + def value_generator(self) -> ValueGeneratorBase | None: + return self._value_generator + + def set_value(self, v: _T | str, /, allow_convert_from_str=False): if allow_convert_from_str: self._value = self.value_type.check_value(v) else: self._value = self.value_type.check_value_or_try_convert_from_str(v) - @value.setter - def value(self, v: T) -> None: - self.set_value(v) - - @property - def value_generator(self) -> OptionalValueGenerator: - return self._value_generator - - def generate_value(self, allow_interactive=False) -> T | None: - if self.value_generator is None: return None - if self.value_generator.interactive and not allow_interactive: + def generate_value(self) -> _T | None: + if self.value_generator is None: return None - else: - v = self.generate_value() - try: - self.value_type.check_value(v) - return v - except ValidationError as e: - raise CruInternalLogicError("Config value generator returns invalid value.", name=self.name, inner=e) + v = self.generate_value() + try: + self.value_type.check_value(v) + return v + except CruValueTypeError as e: + raise CruInternalError( + "Config value generator returns invalid value." + ) from e def copy(self) -> "ConfigItem": - return ConfigItem(self.name, self.description, self.value_type, - self._value.copy() if self._value is not None else None, self._default_value.copy(), - value_generator=self.value_generator) - - -class Configuration: - def __init__(self, items: None | list[ConfigItem] = None) -> None: - self._items: list[ConfigItem] = items or [] - - @property - def items(self) -> list[ConfigItem]: - return self._items - - @property - def item_map(self) -> dict[str, ConfigItem]: - return {i.name: i for i in self.items} - - def get_optional_item(self, name: str) -> ConfigItem | None: - for i in self.items: - if i.name == name: - return i - return None - - def clear(self) -> None: - self._items.clear() - - def has_item(self, name: str) -> bool: - return self.get_optional_item(name) is not None - - def add_item(self, item: ConfigItem): - i = self.get_optional_item(item.name) - if i is not None: - raise CruInternalLogicError("Config item of the name already exists.", name=item.name) - self.items.append(item) - return item - - def set_value(self, name: str, v: Any, /, allow_convert_from_str=False): - i = self.get_optional_item(name) - if i is None: - raise CruInternalLogicError("No config item of the name. Can't set value.", name=name) - i.set_value(v, allow_convert_from_str) - - def copy(self) -> "Configuration": - return Configuration([i.copy() for i in self.items]) - - def __getitem__(self, name: str) -> ConfigItem: - i = self.get_optional_item(name) - if i is not None: - return i - raise CruInternalLogicError('No config item of the name.', name=name) - - def __contains__(self, name: str): - return self.has_item(name) + return ConfigItem( + self.name, + self.description, + self.value_type, + copy.deepcopy(self._value) if self._value is not None else None, + copy.deepcopy(self._default_value), + value_generator=self.value_generator, + ) + + +class Configuration(CruUniqueKeyList[ConfigItem, str]): + def __init__(self): + super().__init__(lambda c: c.name) diff --git a/tools/cru-py/cru/_list.py b/tools/cru-py/cru/list.py index c65c793..c65c793 100644 --- a/tools/cru-py/cru/_list.py +++ b/tools/cru-py/cru/list.py diff --git a/tools/cru-py/cru/parsing.py b/tools/cru-py/cru/parsing.py index be7bbf4..a9eee04 100644 --- a/tools/cru-py/cru/parsing.py +++ b/tools/cru-py/cru/parsing.py @@ -1,70 +1,85 @@ -from abc import ABCMeta, abstractmethod
-from typing import TypeVar, Generic, NoReturn, Callable
-
-from cru.excp import CruException, CRU_EXCEPTION_ATTR_DEF_REGISTRY
-
-R = TypeVar("R")
-
-
-class ParseException(CruException):
- LINE_NUMBER_KEY = "line_number"
-
- CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_with(LINE_NUMBER_KEY, "Line number of the error.")
-
-
-class Parser(Generic[R], metaclass=ABCMeta):
- def __init__(self, name: str) -> None:
- self._name = name
-
- @property
- def name(self) -> str:
- return self._name
-
- @abstractmethod
- def parse(self, s: str) -> R:
- raise NotImplementedError()
-
- def raise_parse_exception(self, s: str, line_number: int | None = None) -> NoReturn:
- a = f" at line {line_number}" if line_number is not None else ""
- raise ParseException(f"Parser {self.name} failed{a}, {s}")
-
-
-class SimpleLineConfigParser(Parser[list[tuple[str, str]]]):
- def __init__(self) -> None:
- super().__init__(type(self).__name__)
-
- def _parse(self, s: str, f: Callable[[str, str], None]) -> None:
- for ln, line in enumerate(s.splitlines()):
- line_number = ln + 1
- # check if it's a comment
- if line.strip().startswith("#"):
- continue
- # check if there is a '='
- if line.find("=") == -1:
- self.raise_parse_exception(f"There is even no '='!", line_number)
- # split at first '='
- key, value = line.split("=", 1)
- key = key.strip()
- value = value.strip()
- f(key, value)
-
- def parse(self, s: str) -> list[tuple[str, str]]:
- items = []
- self._parse(s, lambda key, value: items.append((key, value)))
- return items
-
- def parse_to_dict(self, s: str, /, allow_override: bool = False) -> tuple[dict[str, str], list[tuple[str, str]]]:
- d = {}
- duplicate = []
-
- def add(key: str, value: str) -> None:
- if key in d:
- if allow_override:
- duplicate.append((key, d[key]))
- d[key] = value
- else:
- self.raise_parse_exception(f"Key '{key}' already exists!", None)
- d[key] = value
-
- self._parse(s, add)
- return d, duplicate
+from abc import ABCMeta, abstractmethod +from typing import TypeVar, Generic, NoReturn, Callable + +from ._error import CruException + +_T = TypeVar("_T") + + +class ParseException(CruException): + def __init__( + self, message, text: str, line_number: int | None = None, *args, **kwargs + ): + super().__init__(message, *args, **kwargs) + self._text = text + self._line_number = line_number + + @property + def text(self) -> str: + return self._text + + @property + def line_number(self) -> int | None: + return self._line_number + + +class Parser(Generic[_T], metaclass=ABCMeta): + def __init__(self, name: str) -> None: + self._name = name + + @property + def name(self) -> str: + return self._name + + @abstractmethod + def parse(self, s: str) -> _T: + raise NotImplementedError() + + def raise_parse_exception( + self, text: str, line_number: int | None = None + ) -> NoReturn: + a = f" at line {line_number}" if line_number is not None else "" + raise ParseException(f"Parser {self.name} failed{a}.", text, line_number) + + +class SimpleLineConfigParser(Parser[list[tuple[str, str]]]): + def __init__(self) -> None: + super().__init__(type(self).__name__) + + def _parse(self, s: str, callback: Callable[[str, str], None]) -> None: + for ln, line in enumerate(s.splitlines()): + line_number = ln + 1 + # check if it's a comment + if line.strip().startswith("#"): + continue + # check if there is a '=' + if line.find("=") == -1: + self.raise_parse_exception("There is even no '='!", line_number) + # split at first '=' + key, value = line.split("=", 1) + key = key.strip() + value = value.strip() + callback(key, value) + + def parse(self, s: str) -> list[tuple[str, str]]: + items = [] + self._parse(s, lambda key, value: items.append((key, value))) + return items + + def parse_to_dict( + self, s: str, /, allow_override: bool = False + ) -> tuple[dict[str, str], list[tuple[str, str]]]: + result: dict[str, str] = {} + duplicate: list[tuple[str, str]] = [] + + def add(key: str, value: str) -> None: + if key in result: + if allow_override: + duplicate.append((key, result[key])) + result[key] = value + else: + self.raise_parse_exception(f"Key '{key}' already exists!", None) + result[key] = value + + self._parse(s, add) + return result, duplicate diff --git a/tools/cru-py/cru/property.py b/tools/cru-py/cru/property.py deleted file mode 100644 index 9549731..0000000 --- a/tools/cru-py/cru/property.py +++ /dev/null @@ -1,24 +0,0 @@ -import json -from typing import Any - - -class PropertyItem: - def __init__(self, value: Any): - self._value = value - - @property - def value(self) -> Any: - return self._value - - @value.setter - def value(self, value: Any): - self._value = value - - -class PropertyTreeSection: - def __init__(self, data: dict[str, Any] | None = None) -> None: - self._data = data or {} - -class PropertyTree: - def __init__(self, data: dict[str, Any] | None = None) -> None: - self._data = data or {}
\ No newline at end of file diff --git a/tools/cru-py/cru/service/nginx.py b/tools/cru-py/cru/service/nginx.py index 94c4375..ad32cb9 100644 --- a/tools/cru-py/cru/service/nginx.py +++ b/tools/cru-py/cru/service/nginx.py @@ -4,7 +4,6 @@ import re import subprocess from typing import Literal, Any, cast, ClassVar -import jsonschema def restart_nginx(force=False) -> bool: @@ -16,358 +15,3 @@ def restart_nginx(force=False) -> bool: return False subprocess.run(['docker', 'restart', 'nginx']) return True - - -_server_schema_filename = "server.schema.json" - -with open(join(Paths.nginx2_template_dir, _server_schema_filename)) as f: - server_json_schema = json.load(f) - - -_domain_template_filename = "domain.conf.template" - -NginxSourceFileType = Literal["global", "domain", "http", "https"] - - -class NginxSourceFile: - def __init__(self, path: str) -> None: - """ - path: relative to nginx2_template_dir - """ - self._path = path - is_template = path.endswith(".template") - self._is_template = is_template - filename = basename(path) - self.name = filename[:-len(".template")] if is_template else filename - if is_template: - self._template = Template2.from_file( - join(Paths.nginx2_template_dir, path)) - else: - with open(join(Paths.nginx2_template_dir, path)) as f: - self._content = f.read() - - self._scope: NginxSourceFileType = self._calc_scope() - - @property - def is_template(self) -> bool: - return self._is_template - - @property - def content(self) -> str: - if self._is_template: - raise Exception(f"{self._path} is a template file") - return self._content - - @property - def template(self) -> Template2: - if not self._is_template: - raise Exception(f"{self._path} is not a template file") - return cast(Template2, self._template) - - @property - def global_target_filename(self) -> str: - if self.scope != "global": - raise Exception(f"{self._path} is not a global file") - if self.is_template: - return basename(self._path)[:-len(".template")] - else: - return basename(self._path) - - def _calc_scope(self) -> NginxSourceFileType: - f = basename(self._path) - d = basename(dirname(self._path)) - if f == _domain_template_filename: - return "domain" - elif d in ["global", "http", "https"]: - return cast(Literal["global", "http", "https"], d) - else: - raise Exception(f"Unknown scope for {self._path}") - - @property - def scope(self) -> NginxSourceFileType: - return self._scope - - -_domain_template_source = NginxSourceFile(_domain_template_filename) - -_client_max_body_size_source = NginxSourceFile( - "global/client-max-body-size.conf") -_forbid_unknown_domain_source = NginxSourceFile( - "global/forbid-unknown-domain.conf") -_ssl_template_source = NginxSourceFile("global/ssl.conf.template") -_websocket_source = NginxSourceFile("global/websocket.conf") - -_http_444_source = NginxSourceFile("http/444.segment") -_http_redirect_to_https_source = NginxSourceFile( - "http/redirect-to-https.segment") - -_https_redirect_template_source = NginxSourceFile( - "https/redirect.segment.template") -_https_reverse_proxy_template_source = NginxSourceFile( - "https/reverse-proxy.segment.template") -_https_static_file_template_source = NginxSourceFile( - "https/static-file.segment.template") -_https_static_file_no_strip_prefix_template_source = NginxSourceFile( - "https/static-file.no-strip-prefix.segment.template") - - -class NginxService: - def __init__(self, type: str, path: str) -> None: - self.type = type - self.path = path - self._check_path(path) - - @staticmethod - def _check_path(path: str) -> None: - assert isinstance(path, str) - if path == "" or path == "/": - return - if not path.startswith("/"): - raise UserFriendlyException("Service path should start with '/'.") - if path.endswith("/"): - raise UserFriendlyException( - "Service path should not end with '/'.") - - def generate_https_segment(self) -> str: - raise NotImplementedError() - - -class NginxRedirectService(NginxService): - def __init__(self, path: str, redirect_url: str, redirect_code: int = 307) -> None: - if redirect_url.endswith("/"): - raise UserFriendlyException( - "Redirect URL should not end with '/'.") - - super().__init__("redirect", path) - - self.redirect_url = redirect_url - self.redirect_code = redirect_code - - def generate_https_segment(self) -> str: - vars = { - "PATH": self.path, - "REDIRECT_CODE": self.redirect_code, - "REDIRECT_URL": self.redirect_url - } - return _https_redirect_template_source.template.render(vars) - - @staticmethod - def from_json(json: dict[str, Any]) -> "NginxRedirectService": - path = json["path"] - redirect_url = json["to"] - redirect_code = json.get("code", 307) - assert isinstance(path, str) - assert isinstance(redirect_url, str) - assert isinstance(redirect_code, int) - return NginxRedirectService(path, redirect_url, redirect_code) - - -class NginxReverseProxyService(NginxService): - - _upstream_regex: ClassVar[re.Pattern[str]] = re.compile( - r"^[-_0-9a-zA-Z]+:[0-9]+$") - - def __init__(self, path: str, upstream: str) -> None: - if not self._upstream_regex.match(upstream): - raise UserFriendlyException( - f"Invalid upstream format: {upstream}.") - - super().__init__("reverse-proxy", path) - - self.upstream = upstream - - def generate_https_segment(self) -> str: - vars = { - "PATH": self.path, - "UPSTREAM": self.upstream - } - return _https_reverse_proxy_template_source.template.render(vars) - - @staticmethod - def from_json(json: dict[str, Any]) -> "NginxReverseProxyService": - path = json["path"] - upstream = json["upstream"] - assert isinstance(path, str) - assert isinstance(upstream, str) - return NginxReverseProxyService(path, upstream) - - -class NginxStaticFileService(NginxService): - def __init__(self, path: str, root: str, no_strip_prefix: bool = False) -> None: - super().__init__("static-file", path) - - self.root = root - self.no_strip_prefix = no_strip_prefix - - def generate_https_segment(self) -> str: - vars = { - "PATH": self.path, - "ROOT": self.root, - } - if self.no_strip_prefix: - return _https_static_file_no_strip_prefix_template_source.template.render(vars) - else: - return _https_static_file_template_source.template.render(vars) - - @staticmethod - def from_json(json: dict[str, Any]) -> "NginxStaticFileService": - path = json["path"] - root = json["root"] - no_strip_prefix = json.get("no_strip_prefix", False) - assert isinstance(path, str) - assert isinstance(root, str) - assert isinstance(no_strip_prefix, bool) - return NginxStaticFileService(path, root, no_strip_prefix) - - -def nginx_service_from_json(json: dict[str, Any]) -> NginxService: - type = json["type"] - if type == "redirect": - return NginxRedirectService.from_json(json) - elif type == "reverse-proxy": - return NginxReverseProxyService.from_json(json) - elif type == "static-file": - return NginxStaticFileService.from_json(json) - else: - raise UserFriendlyException(f"Invalid crupest type: {type}.") - - -def _prepend_indent(text: str, indent: str = " " * 4) -> str: - lines = text.split("\n") - for i in range(len(lines)): - if lines[i] != "": - lines[i] = indent + lines[i] - return "\n".join(lines) - - -class NginxDomain: - def __init__(self, domain: str, services: list[NginxService] = []) -> None: - self.domain = domain - self.services = services - - def add_service(self, service: NginxService) -> None: - self.services.append(service) - - def generate_http_segment(self) -> str: - if len(self.services) == 0: - return _http_444_source.content - else: - return _http_redirect_to_https_source.content - - def generate_https_segment(self) -> str: - return "\n\n".join([s.generate_https_segment() for s in self.services]) - - def generate_config(self) -> str: - vars = { - "DOMAIN": self.domain, - "HTTP_SEGMENT": _prepend_indent(self.generate_http_segment()), - "HTTPS_SEGMENT": _prepend_indent(self.generate_https_segment()), - } - return _domain_template_source.template.render(vars) - - def generate_config_file(self, path: str) -> None: - with open(path, "w") as f: - f.write(self.generate_config()) - - @staticmethod - def from_json(root_domain: str, json: dict[str, Any]) -> "NginxDomain": - name = json["name"] - assert isinstance(name, str) - if name == "@" or name == "": - domain = root_domain - else: - domain = f"{name}.{root_domain}" - assert isinstance(json["services"], list) - services = [nginx_service_from_json(s) for s in json["services"]] - return NginxDomain(domain, services) - - -def check_nginx_config_schema(json: Any) -> None: - jsonschema.validate(json, server_json_schema) - - -class NginxServer: - def __init__(self, root_domain: str) -> None: - self.root_domain = root_domain - self.domains: list[NginxDomain] = [] - - def add_sub_domain(self, sub_domain: str, services: list[NginxService]) -> None: - if sub_domain == "" or sub_domain == "@": - domain = self.root_domain - else: - domain = f"{sub_domain}.{self.root_domain}" - self.domains.append(NginxDomain(domain, services)) - - def generate_ssl(self) -> str: - return _ssl_template_source.template.render({ - "ROOT_DOMAIN": self.root_domain - }) - - def generate_global_files(self, d: str) -> None: - for source in [_client_max_body_size_source, _forbid_unknown_domain_source, _websocket_source]: - with open(join(d, source.name), "w") as f: - f.write(source.content) - with open(join(d, _ssl_template_source.name), "w") as f: - f.write(self.generate_ssl()) - - def generate_domain_files(self, d: str) -> None: - for domain in self.domains: - domain.generate_config_file(join(d, f"{domain.domain}.conf")) - - def generate_config(self, d: str) -> None: - create_dir_if_not_exists(d) - self.generate_global_files(d) - - def get_allowed_files(self) -> list[str]: - files = [] - for source in [_client_max_body_size_source, _forbid_unknown_domain_source, _ssl_template_source, _websocket_source]: - files.append(source.name) - for domain in self.domains: - files.append(f"{domain.domain}.conf") - return files - - def check_bad_files(self, d: str) -> list[str]: - allowed_files = self.get_allowed_files() - bad_files = [] - if not ensure_dir(d, must_exist=False): - return [] - for path in os.listdir(d): - if path not in allowed_files: - bad_files.append(path) - return bad_files - - @staticmethod - def from_json(root_domain: str, json: dict[str, Any]) -> "NginxServer": - check_nginx_config_schema(json) - server = NginxServer(root_domain) - sub_domains = json["domains"] - assert isinstance(sub_domains, list) - server.domains = [NginxDomain.from_json( - root_domain, d) for d in sub_domains] - return server - - @staticmethod - def from_json_str(root_domain: str, json_str: str) -> "NginxServer": - return NginxServer.from_json(root_domain, json.loads(json_str)) - - def go(self): - bad_files = self.check_bad_files(Paths.nginx_generated_dir) - if len(bad_files) > 0: - console.print( - "WARNING: It seems there are some bad conf files in the nginx config directory:", style="yellow") - for bad_file in bad_files: - console.print(bad_file, style=file_name_style) - to_delete = Confirm.ask( - "They will affect nginx in a [red]bad[/] way. Do you want to delete them?", default=True, console=console) - if to_delete: - for file in bad_files: - os.remove(join(Paths.nginx_generated_dir, file)) - create_dir_if_not_exists(Paths.generated_dir) - if not ensure_dir(Paths.nginx_generated_dir, must_exist=False): - os.mkdir(Paths.nginx_generated_dir) - console.print( - f"Nginx config directory created at [magenta]{Paths.nginx_generated_dir}[/]", style="green") - self.generate_config(Paths.nginx_generated_dir) - console.print("Nginx config generated.", style="green") - if restart_nginx(): - console.print('Nginx restarted.', style="green") diff --git a/tools/cru-py/cru/system.py b/tools/cru-py/cru/system.py index 4c9de01..f321717 100644 --- a/tools/cru-py/cru/system.py +++ b/tools/cru-py/cru/system.py @@ -7,10 +7,11 @@ def check_debian_derivative_version(name: str) -> None | str: return None with open("/etc/os-release", "r") as f: content = f.read() - if not f"ID={name}" in content: + if f"ID={name}" not in content: return None m = re.search(r'VERSION_ID="(.+)"', content) - if m is None: return None + if m is None: + return None return m.group(1) diff --git a/tools/cru-py/cru/template.py b/tools/cru-py/cru/template.py new file mode 100644 index 0000000..8e06418 --- /dev/null +++ b/tools/cru-py/cru/template.py @@ -0,0 +1,142 @@ +from collections.abc import Iterable, Mapping +import os +import os.path +from string import Template + +from ._error import CruException + + +class CruTemplateError(CruException): + pass + + +class TemplateFile: + def __init__(self, source: str, destination_path: str | None): + self._source = source + self._destination = destination_path + self._template: Template | None = None + + @property + def source(self) -> str: + return self._source + + @property + def destination(self) -> str | None: + return self._destination + + @destination.setter + def destination(self, value: str | None) -> None: + self._destination = value + + @property + def template(self) -> Template: + if self._template is None: + return self.reload_template() + return self._template + + def reload_template(self) -> Template: + with open(self._source, "r") as f: + self._template = Template(f.read()) + return self._template + + @property + def variables(self) -> set[str]: + return set(self.template.get_identifiers()) + + def generate(self, variables: Mapping[str, str]) -> str: + return self.template.substitute(variables) + + def generate_to_destination(self, variables: Mapping[str, str]) -> None: + if self._destination is None: + raise CruTemplateError("No destination specified for this template.") + with open(self._destination, "w") as f: + f.write(self.generate(variables)) + + +class TemplateDirectory: + def __init__( + self, + source: str, + destination: str, + exclude: Iterable[str], + file_suffix: str = ".template", + ): + self._files: list[TemplateFile] | None = None + self._source = source + self._destination = destination + self._exclude = [os.path.normpath(p) for p in exclude] + self._file_suffix = file_suffix + + @property + def files(self) -> list[TemplateFile]: + if self._files is None: + return self.reload() + else: + return self._files + + @property + def source(self) -> str: + return self._source + + @property + def destination(self) -> str: + return self._destination + + @property + def exclude(self) -> list[str]: + return self._exclude + + @property + def file_suffix(self) -> str: + return self._file_suffix + + @staticmethod + def _scan_files( + root_path: str, exclude: list[str], suffix: str | None + ) -> Iterable[str]: + for root, _dirs, files in os.walk(root_path): + for file in files: + if suffix is None or file.endswith(suffix): + path = os.path.join(root, file) + path = os.path.relpath(path, root_path) + if suffix is not None: + path = path[: -len(suffix)] + is_exclude = False + for exclude_path in exclude: + if path.startswith(exclude_path): + is_exclude = True + break + if not is_exclude: + yield path + + def reload(self) -> list[TemplateFile]: + if not os.path.isdir(self.source): + raise CruTemplateError( + f"Source directory {self.source} does not exist or is not a directory." + ) + files = self._scan_files(self.source, self.exclude, self.file_suffix) + self._files = [ + TemplateFile( + os.path.join(self._source, file + self.file_suffix), + os.path.join(self._destination, file), + ) + for file in files + ] + return self._files + + @property + def variables(self) -> set[str]: + s = set() + for file in self.files: + s.update(file.variables) + return s + + def generate_to_destination(self, variables: Mapping[str, str]) -> None: + for file in self.files: + file.generate_to_destination(variables) + + def extra_files_in_destination(self) -> Iterable[str]: + source_files = set(os.path.relpath(f.source, self.source) for f in self.files) + for file in self._scan_files(self.destination, self.exclude, None): + if file not in source_files: + yield file diff --git a/tools/cru-py/cru/value.py b/tools/cru-py/cru/value.py index 189f44f..4096362 100644 --- a/tools/cru-py/cru/value.py +++ b/tools/cru-py/cru/value.py @@ -5,23 +5,23 @@ import secrets import string import uuid from abc import abstractmethod, ABCMeta -from collections.abc import Mapping, Callable -from typing import Any, ClassVar, Literal, TypeVar, Generic, ParamSpec +from collections.abc import Callable +from typing import Any, ClassVar, TypeVar, Generic -from .error import CruInternalError, CruException +from ._error import CruException -def _str_case_in(s: str, case: bool, l: list[str]) -> bool: +def _str_case_in(s: str, case: bool, str_list: list[str]) -> bool: if case: - return s in l + return s in str_list else: - return s.lower() in [s.lower() for s in l] + return s.lower() in [s.lower() for s in str_list] -T = TypeVar("T") +_T = TypeVar("_T") -class CruValueError(CruException): +class CruValueTypeError(CruException): def __init__( self, message: str, @@ -47,17 +47,8 @@ class CruValueError(CruException): return self._value_type -class CruValueValidationError(CruValueError): - pass - - -class CruValueStringConversionError(CruValueError): - pass - - -# TODO: Continue here tomorrow! -class ValueType(Generic[T], metaclass=ABCMeta): - def __init__(self, name: str, _type: type[T]) -> None: +class ValueType(Generic[_T], metaclass=ABCMeta): + def __init__(self, name: str, _type: type[_T]) -> None: self._name = name self._type = _type @@ -66,67 +57,61 @@ class ValueType(Generic[T], metaclass=ABCMeta): return self._name @property - def type(self) -> type[T]: + def type(self) -> type[_T]: return self._type - def check_value_type(self, value: Any) -> bool: - return isinstance(value, self.type) + def check_value_type(self, value: Any) -> None: + if not isinstance(value, self.type): + raise CruValueTypeError("Type of value is wrong.", value, self) - def _do_check_value(self, value: Any) -> T: + def _do_check_value(self, value: Any) -> _T: return value - def check_value(self, value: Any) -> T: - if not isinstance(value, self.type): - raise CruValueValidationError("Value type is wrong.", value, self) + def check_value(self, value: Any) -> _T: + self.check_value_type(value) return self._do_check_value(value) - def _do_check_str_format(self, s: str) -> bool | tuple[bool, str]: + @abstractmethod + def _do_check_str_format(self, s: str) -> None: raise NotImplementedError() def check_str_format(self, s: str) -> None: - ok, err = self._do_check_str_format(s) - if ok is None: - raise CruInternalLogicError("_do_check_str_format should not return None.") - if ok: - return - if err is None: - err = "Invalid value str format." - raise ValueStringConvertionError(err, s, value_type=self) + if not isinstance(s, str): + raise CruValueTypeError("Try to check format on a non-str.", s, self) + self._do_check_str_format(s) @abstractmethod - def _do_convert_value_to_str(self, value: T) -> str: + def _do_convert_value_to_str(self, value: _T) -> str: raise NotImplementedError() - def convert_value_to_str(self, value: T) -> str: + def convert_value_to_str(self, value: _T) -> str: self.check_value(value) return self._do_convert_value_to_str(value) @abstractmethod - def _do_convert_str_to_value(self, s: str) -> T: + def _do_convert_str_to_value(self, s: str) -> _T: raise NotImplementedError() - def convert_str_to_value(self, s: str) -> T: + def convert_str_to_value(self, s: str) -> _T: self.check_str_format(s) return self._do_convert_str_to_value(s) - def check_value_or_try_convert_from_str(self, value_or_str: Any) -> T: + def check_value_or_try_convert_from_str(self, value_or_str: Any) -> _T: try: return self.check_value(value_or_str) - except ValidationError as e: + except CruValueTypeError: if isinstance(value_or_str, str): return self.convert_str_to_value(value_or_str) else: - raise ValidationError( - "Value is not valid and is not a str.", value_or_str, self, inner=e - ) + raise class TextValueType(ValueType[str]): def __init__(self) -> None: - super().__init__("text") + super().__init__("text", str) - def _do_check_str_format(self, s): - return True + def _do_check_str_format(self, _s): + return def _do_convert_value_to_str(self, value): return value @@ -138,14 +123,13 @@ class TextValueType(ValueType[str]): class IntegerValueType(ValueType[int]): def __init__(self) -> None: - super().__init__("integer") + super().__init__("integer", int) def _do_check_str_format(self, s): try: int(s) - return True - except ValueError: - return False + except ValueError as e: + raise CruValueTypeError("Invalid integer format.", s, self) from e def _do_convert_value_to_str(self, value): return str(value) @@ -156,14 +140,13 @@ class IntegerValueType(ValueType[int]): class FloatValueType(ValueType[float]): def __init__(self) -> None: - super().__init__("float") + super().__init__("float", float) def _do_check_str_format(self, s): try: float(s) - return True - except ValueError: - return False + except ValueError as e: + raise CruValueTypeError("Invalid float format.", s, self) from e def _do_convert_value_to_str(self, value): return str(value) @@ -183,7 +166,7 @@ class BooleanValueType(ValueType[bool]): true_list: None | list[str] = None, false_list: None | list[str] = None, ) -> None: - super().__init__("boolean") + super().__init__("boolean", bool) self._case_sensitive = case_sensitive self._valid_true_strs: list[str] = ( true_list or BooleanValueType.DEFAULT_TRUE_LIST @@ -209,15 +192,11 @@ class BooleanValueType(ValueType[bool]): return self._valid_true_strs + self._valid_false_strs def _do_check_str_format(self, s): - if _str_case_in(s, self.case_sensitive, self.valid_boolean_strs): - return True - return ( - False, - f"Not a valid boolean string ({ValueType.case_sensitive_to_str(self.case_sensitive)}). Valid string of true: {' '.join(self._valid_true_strs)}. Valid string of false: {' '.join(self._valid_false_strs)}. All is case insensitive.", - ) + if not _str_case_in(s, self.case_sensitive, self.valid_boolean_strs): + raise CruValueTypeError("Invalid boolean format.", s, self) def _do_convert_value_to_str(self, value): - return "True" if value else "False" + return self._valid_true_strs[0] if value else self._valid_false_strs[0] def _do_convert_str_to_value(self, s): return _str_case_in(s, self.case_sensitive, self._valid_true_strs) @@ -225,9 +204,7 @@ class BooleanValueType(ValueType[bool]): class EnumValueType(ValueType[str]): def __init__(self, valid_values: list[str], /, case_sensitive=False) -> None: - s = " | ".join([f'"{v}"' for v in valid_values]) - self._valid_value_str = f"[ {s} ]" - super().__init__(f"enum{self._valid_value_str}") + super().__init__(f"enum({'|'.join(valid_values)})", str) self._case_sensitive = case_sensitive self._valid_values = valid_values @@ -240,16 +217,11 @@ class EnumValueType(ValueType[str]): return self._valid_values def _do_check_value(self, value): - ok, err = self._do_check_str_format(value) - return ok, (value if ok else err) + self._do_check_str_format(value) def _do_check_str_format(self, s): - if _str_case_in(s, self.case_sensitive, self.valid_values): - return True - return ( - False, - f"Value is not in valid values ({ValueType.case_sensitive_to_str(self.case_sensitive)}): {self._valid_value_str}", - ) + if not _str_case_in(s, self.case_sensitive, self.valid_values): + raise CruValueTypeError("Invalid enum value", s, self) def _do_convert_value_to_str(self, value): return value @@ -262,69 +234,52 @@ TEXT_VALUE_TYPE = TextValueType() INTEGER_VALUE_TYPE = IntegerValueType() BOOLEAN_VALUE_TYPE = BooleanValueType() -P = ParamSpec("P") +class ValueGeneratorBase(Generic[_T], metaclass=ABCMeta): + @abstractmethod + def generate(self) -> _T: + raise NotImplementedError() -class ValueGenerator(Generic[T, P]): - INTERACTIVE_KEY: ClassVar[Literal["interactive"]] = "interactive" + def __call__(self) -> _T: + return self.generate() - def __init__( - self, f: Callable[P, T], /, attributes: None | Mapping[str, Any] = None - ) -> None: - self._f = f - self._attributes = attributes or {} - @property - def f(self) -> Callable[P, T]: - return self._f +class ValueGenerator(ValueGeneratorBase[_T]): + def __init__(self, generate_func: Callable[[], _T]) -> None: + self._generate_func = generate_func @property - def attributes(self) -> Mapping[str, Any]: - return self._attributes + def generate_func(self) -> Callable[[], _T]: + return self._generate_func - def generate(self, *args, **kwargs) -> T: - return self._f(*args, **kwargs) + def generate(self) -> _T: + return self._generate_func() - def __call__(self, *args, **kwargs): - return self._f(*args, **kwargs) - @property - def interactive(self) -> bool: - return self._attributes.get(ValueGenerator.INTERACTIVE_KEY, False) - - @staticmethod - def create_interactive( - f: Callable[P, T], - interactive: bool = True, - /, - attributes: None | Mapping[str, Any] = None, - ) -> "ValueGenerator[T, P]": - return ValueGenerator( - f, dict({ValueGenerator.INTERACTIVE_KEY: interactive}, **(attributes or {})) - ) - - -class UuidValueGenerator(ValueGenerator[str, []]): - def __init__(self) -> None: - super().__init__(lambda: str(uuid.uuid4())) +class UuidValueGenerator(ValueGeneratorBase[str]): + def generate(self): + return str(uuid.uuid4()) -class RandomStringValueGenerator(ValueGenerator[str, []]): - @staticmethod - def _create_generate_ramdom_func(length: int, secure: bool) -> Callable[str, []]: - random_choice = secrets.choice if secure else random.choice - def generate_random_string(): - characters = string.ascii_letters + string.digits - random_string = "".join(random_choice(characters) for _ in range(length)) - return random_string +class RandomStringValueGenerator(ValueGeneratorBase[str]): + def __init__(self, length: int, secure: bool) -> None: + self._length = length + self._secure = secure - return generate_random_string + @property + def length(self) -> int: + return self._length - def __init__(self, length: int, secure: bool) -> None: - super().__init__( - RandomStringValueGenerator._create_generate_ramdom_func(length, secure) - ) + @property + def secure(self) -> bool: + return self._secure + + def generate(self): + random_func = secrets.choice if self._secure else random.choice + characters = string.ascii_letters + string.digits + random_string = "".join(random_func(characters) for _ in range(self._length)) + return random_string UUID_VALUE_GENERATOR = UuidValueGenerator() |