diff options
author | crupest <crupest@outlook.com> | 2023-05-31 22:56:15 +0800 |
---|---|---|
committer | Yuqian Yang <crupest@crupest.life> | 2024-12-18 18:31:27 +0800 |
commit | 723c9a963a96b25a7498f3e0417307e89c8bb684 (patch) | |
tree | 3eff901a5b96eb4ff88d272bed4bb964c6397f1f /tools/cru-py/cru | |
parent | 6e60bb06bd7a5052a7d6c2b5b8df0ab084697fdd (diff) | |
download | crupest-723c9a963a96b25a7498f3e0417307e89c8bb684.tar.gz crupest-723c9a963a96b25a7498f3e0417307e89c8bb684.tar.bz2 crupest-723c9a963a96b25a7498f3e0417307e89c8bb684.zip |
HALF WORK: for sync.
Diffstat (limited to 'tools/cru-py/cru')
-rw-r--r-- | tools/cru-py/cru/__init__.py | 12 | ||||
-rw-r--r-- | tools/cru-py/cru/attr.py | 125 | ||||
-rw-r--r-- | tools/cru-py/cru/config.py | 128 | ||||
-rw-r--r-- | tools/cru-py/cru/excp.py | 137 | ||||
-rw-r--r-- | tools/cru-py/cru/parsing.py | 70 | ||||
-rw-r--r-- | tools/cru-py/cru/paths.py | 63 | ||||
-rw-r--r-- | tools/cru-py/cru/service/__init__.py | 0 | ||||
-rw-r--r-- | tools/cru-py/cru/service/docker.py | 15 | ||||
-rw-r--r-- | tools/cru-py/cru/service/nginx.py | 377 | ||||
-rw-r--r-- | tools/cru-py/cru/system.py | 22 | ||||
-rw-r--r-- | tools/cru-py/cru/value.py | 309 |
11 files changed, 1258 insertions, 0 deletions
diff --git a/tools/cru-py/cru/__init__.py b/tools/cru-py/cru/__init__.py new file mode 100644 index 0000000..e36a778 --- /dev/null +++ b/tools/cru-py/cru/__init__.py @@ -0,0 +1,12 @@ +import sys + + +class CruInitError(Exception): + pass + +def check_python_version(required_version=(3, 11)): + if sys.version_info < required_version: + raise CruInitError(f"Python version must be >= {required_version}!") + + +check_python_version() diff --git a/tools/cru-py/cru/attr.py b/tools/cru-py/cru/attr.py new file mode 100644 index 0000000..d07cc55 --- /dev/null +++ b/tools/cru-py/cru/attr.py @@ -0,0 +1,125 @@ +from collections.abc import Callable +from dataclasses import dataclass +from types import NoneType +from typing import Any +from copy import deepcopy + + +@dataclass +class CruAttr: + name: str + value: Any + description: str + + +@dataclass +class CruAttrDef: + name: str + default_description: str + allow_types: None | set[type] + allow_none: bool + default_value: Any + transformer: Callable[[Any], Any] | None + validator: Callable[[Any], None] + + def __init__(self, name: str, default_description: str, *, + allow_types: list[type] | type | None, allow_none: bool, default_value: Any = None, + transformer: Callable[[Any], Any] | None = None, + validator: Callable[[Any], None] | None = None) -> None: + self.name = name + self.default_description = default_description + self.default_value = default_value + #TODO: CONTINUE TOMORROW + if allow_types is None: + allow_types = [] + elif isinstance(allow_types, type): + allow_types = [allow_types] + else: + for t in allow_types: + if not isinstance(t, type): + raise TypeError(f"Invalid value of python type : {t}") + self.allow_types = set(filter(lambda tt: tt is not NoneType, allow_types)) + self.allow_none = allow_none + self.transformer = transformer + self.validator = validator + self.default_value = self.transform_and_validate(self.default_value) + self.default_value = deepcopy(self.default_value) + + def transform(self, value: Any) -> Any: + if self.transformer is not None: + return self.transformer(value) + return value + + def validate(self, value: Any, /, override_allow_none: bool | None = None) -> None: + allow_none = override_allow_none if override_allow_none is not None else self.allow_none + if value is None and not allow_none: + raise TypeError(f"None is not allowed!") + if len(self.allow_types) != 0 and type(value) not in self.allow_types: + raise TypeError(f"Type of {value} is not allowed!") + if self.validator is not None: + return self.validator(value) + return None + + def transform_and_validate(self, value: Any, /, override_allow_none: bool | None = None) -> Any: + value = self.transform(value) + self.validate(value, override_allow_none) + return value + + def make(self, value: Any, description: None | str = None) -> CruAttr: + value = self.transform_and_validate(value) + return CruAttr(self.name, value if value is not None else deepcopy(self.default_value), + description if description is not None else self.default_description) + + +class CruAttrDefRegistry: + + def __init__(self) -> None: + self._def_list = [] + + @property + def items(self) -> list[CruAttrDef]: + return self._def_list + + def register(self, def_: CruAttrDef): + for i in self._def_list: + if i.name == def_.name: + raise ValueError(f"Attribute {def_.name} already exists!") + self._def_list.append(def_) + + def register_with(self, name: str, default_description: str, *, + allow_types: list[type] | type | None, allow_none: bool, + default_value: Any = None, + transformer: Callable[[Any], Any] | None = None, + validator: Callable[[Any], None] | None = None + ) -> CruAttrDef: + def_ = CruAttrDef(name, default_description, default_value=default_value, allow_types=allow_types, + allow_none=allow_none, transformer=transformer, validator=validator) + self.register(def_) + return def_ + + def register_required(self, name: str, default_description: str, *, + allow_types: list[type] | type | None, + default_value: Any = None, + transformer: Callable[[Any], Any] | None = None, + validator: Callable[[Any], None] | None = None + ) -> CruAttrDef: + return self.register_with(name, default_description, default_value=default_value, allow_types=allow_types, + allow_none=False, transformer=transformer, validator=validator) + + def register_optional(self, name: str, default_description: str, *, + allow_types: list[type] | type | None, + default_value: Any = None, + transformer: Callable[[Any], Any] | None = None, + validator: Callable[[Any], None] | None = None + ) -> CruAttrDef: + return self.register_with(name, default_description, default_value=default_value, allow_types=allow_types, + allow_none=True, transformer=transformer, validator=validator) + + def get_item_optional(self, name: str) -> CruAttrDef | None: + for i in self._def_list: + if i.name == name: + return i + return None + + def __getitem__(self, item) -> CruAttrDef | None: + return self.get_item_optional(item) diff --git a/tools/cru-py/cru/config.py b/tools/cru-py/cru/config.py new file mode 100644 index 0000000..b0c83d5 --- /dev/null +++ b/tools/cru-py/cru/config.py @@ -0,0 +1,128 @@ +from typing import Any, TypeVar, Generic + +from .excp import CruInternalLogicError +from .value import ValueType, ValueGenerator, ValidationError + +T = TypeVar("T") + + +class ConfigItem(Generic[T]): + OptionalValueGenerator = ValueGenerator[T, []] | None + + def __init__(self, name: str, description: str, value_type: ValueType[T], value: T | None, default_value: T, *, + value_generator: OptionalValueGenerator = None) -> None: + self._name = name + self._description = description + self._value_type = value_type + self._default_value = default_value + self._value_generator = value_generator + self._value: T | None = value + + @property + def name(self) -> str: + return self._name + + @property + def description(self) -> str: + return self._description + + @property + def value_type(self) -> ValueType[T]: + return self._value_type + + @property + def default_value(self) -> T: + return self._default_value + + @property + def is_default(self) -> bool: + return self._value is None + + @property + def is_set(self) -> bool: + return not self.is_default + + @property + def value(self) -> T: + return self._value or self._default_value + + def set_value(self, v: T | str, /, allow_convert_from_str=False): + if allow_convert_from_str: + self._value = self.value_type.check_value(v) + else: + self._value = self.value_type.check_value_or_try_convert_from_str(v) + + @value.setter + def value(self, v: T) -> None: + self.set_value(v) + + @property + def value_generator(self) -> OptionalValueGenerator: + return self._value_generator + + def generate_value(self, allow_interactive=False) -> T | None: + if self.value_generator is None: return None + if self.value_generator.interactive and not allow_interactive: + return None + else: + v = self.generate_value() + try: + self.value_type.check_value(v) + return v + except ValidationError as e: + raise CruInternalLogicError("Config value generator returns invalid value.", name=self.name, inner=e) + + def copy(self) -> "ConfigItem": + return ConfigItem(self.name, self.description, self.value_type, + self._value.copy() if self._value is not None else None, self._default_value.copy(), + value_generator=self.value_generator) + + +class Configuration: + def __init__(self, items: None | list[ConfigItem] = None) -> None: + self._items: list[ConfigItem] = items or [] + + @property + def items(self) -> list[ConfigItem]: + return self._items + + @property + def item_map(self) -> dict[str, ConfigItem]: + return {i.name: i for i in self.items} + + def get_optional_item(self, name: str) -> ConfigItem | None: + for i in self.items: + if i.name == name: + return i + return None + + def clear(self) -> None: + self._items.clear() + + def has_item(self, name: str) -> bool: + return self.get_optional_item(name) is not None + + def add_item(self, item: ConfigItem): + i = self.get_optional_item(item.name) + if i is not None: + raise CruInternalLogicError("Config item of the name already exists.", name=item.name) + self.items.append(item) + return item + + def set_value(self, name: str, v: Any, /, allow_convert_from_str=False): + i = self.get_optional_item(name) + if i is None: + raise CruInternalLogicError("No config item of the name. Can't set value.", name=name) + i.set_value(v, allow_convert_from_str) + + def copy(self) -> "Configuration": + return Configuration([i.copy() for i in self.items]) + + def __getitem__(self, name: str) -> ConfigItem: + i = self.get_optional_item(name) + if i is not None: + return i + raise CruInternalLogicError('No config item of the name.', name=name) + + def __contains__(self, name: str): + return self.has_item(name) diff --git a/tools/cru-py/cru/excp.py b/tools/cru-py/cru/excp.py new file mode 100644 index 0000000..5a5871b --- /dev/null +++ b/tools/cru-py/cru/excp.py @@ -0,0 +1,137 @@ +from collections.abc import Callable +from dataclasses import dataclass +from types import NoneType +from typing import Any + +from cru.attr import CruAttrDefRegistry + +CRU_EXCEPTION_ATTR_DEF_REGISTRY = CruAttrDefRegistry() + + +class CruException(Exception): + @staticmethod + def transform_inner(inner: Exception | list[Exception] | None): + if inner is None: + return None + if isinstance(inner, Exception): + return [inner] + if isinstance(inner, list): + return inner + + @staticmethod + def validate_inner(inner: list[Exception]): + for i in inner: + if not isinstance(i, Exception): + raise TypeError(f"Invalid inner exception: {i}") + + MESSAGE_DEF = CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_required("message", "Message describing the exception.", + allow_types=str, default_value="") + INNER_DEF = CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_required("inner", "Inner exception.", + allow_types=list, default_value=[], + transformer=transform_inner, validator=validate_inner) + INTERNAL_DEF = CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_required("internal", + "True if the exception is caused by wrong internal logic. False if it is caused by user's wrong input.", + allow_types=bool, default_value=False) + CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_optional("name", "Name of the object that causes the exception.", + allow_types=str) + CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_optional("value", "Value that causes the exception.", + allow_types=[]) + CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_with("path", "Path that causes the exception.",) + CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_with("type", "Python type related to the exception.") + + def __init__(self, message: str, *args, + inner: Exception | list[Exception] | None = None, + internal: bool = False, + name: str | None = None, + value: Any | None = None, + path: str | None = None, + type_: type | None = None, + init_attrs: dict[str, Any] | None = None, + attrs: dict[str, Any] | None = None, **kwargs) -> None: + super().__init__(message, *args) + + self._attrs = { + CruException.MESSAGE_KEY: message, + CruException.INTERNAL_KEY: internal, + CruException.INNER_KEY: inner, + CruException.NAME_KEY: name, + CruException.VALUE_KEY: value, + CruException.PATH_KEY: path, + CruException.TYPE_KEY: type_, + } + if init_attrs is not None: + self._attrs.update(init_attrs) + if attrs is not None: + self._attrs.update(attrs) + self._attrs.update(kwargs) + + @property + def message(self) -> str: + return self[CruException.MESSAGE_KEY] + + @property + def internal(self) -> bool: + return self[CruException.INTERNAL_KEY] + + @property + def inner(self) -> list[Exception]: + return self[CruException.INNER_KEY] + + @property + def name(self) -> str | None: + return self[CruException.NAME_KEY] + + @property + def value(self) -> Any | None: + return self[CruException.VALUE_KEY] + + @property + def path(self) -> str | None: + return self[CruException.PATH_KEY] + + @property + def type(self) -> type | None: + return self[CruException.TYPE_KEY] + + def _get_attr_list_recursive(self, name: str, depth: int, max_depth: int, l: list[Any]): + if 0 < max_depth < depth + 1: + return + a = self._attrs.get(name, None) + if a is not None: + l.append(a) + for i in self.inner: + if isinstance(i, CruException): + i._get_attr_list_recursive(name, depth + 1, max_depth, l) + + def get_attr_list_recursive(self, name: str, /, max_depth: int = -1) -> list[Any]: + l = [] + self._get_attr_list_recursive(name, 0, max_depth, l) + return l + + def get_optional_attr(self, name: str, max_depth: int = -1) -> Any | None: + l = self.get_attr_list_recursive(name, max_depth) + return l[0] if len(l) > 0 else None + + def __getitem__(self, name: str) -> Any | None: + return self.get_optional_attr(name) + + +class CruInternalLogicError(CruException): + def __init__(self, message: str, *args, **kwargs) -> None: + super().__init__(message, *args, internal=True, **kwargs) + + +class UserFriendlyException(CruException): + USER_MESSAGE_KEY = "user_message" + + CRU_EXCEPTION_ATTR_DEF_REGISTRY.register( + CruExceptionAttrDef(USER_MESSAGE_KEY, "Message describing the exception, but with user-friendly language.")) + + def __init__(self, message: str, user_message: str | None = None, *args, **kwargs) -> None: + if user_message is None: + user_message = message + super().__init__(message, *args, init_attrs={UserFriendlyException.USER_MESSAGE_KEY: user_message}, **kwargs) + + @property + def user_message(self) -> str: + return self[UserFriendlyException.USER_MESSAGE_KEY] diff --git a/tools/cru-py/cru/parsing.py b/tools/cru-py/cru/parsing.py new file mode 100644 index 0000000..be7bbf4 --- /dev/null +++ b/tools/cru-py/cru/parsing.py @@ -0,0 +1,70 @@ +from abc import ABCMeta, abstractmethod
+from typing import TypeVar, Generic, NoReturn, Callable
+
+from cru.excp import CruException, CRU_EXCEPTION_ATTR_DEF_REGISTRY
+
+R = TypeVar("R")
+
+
+class ParseException(CruException):
+ LINE_NUMBER_KEY = "line_number"
+
+ CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_with(LINE_NUMBER_KEY, "Line number of the error.")
+
+
+class Parser(Generic[R], metaclass=ABCMeta):
+ def __init__(self, name: str) -> None:
+ self._name = name
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ @abstractmethod
+ def parse(self, s: str) -> R:
+ raise NotImplementedError()
+
+ def raise_parse_exception(self, s: str, line_number: int | None = None) -> NoReturn:
+ a = f" at line {line_number}" if line_number is not None else ""
+ raise ParseException(f"Parser {self.name} failed{a}, {s}")
+
+
+class SimpleLineConfigParser(Parser[list[tuple[str, str]]]):
+ def __init__(self) -> None:
+ super().__init__(type(self).__name__)
+
+ def _parse(self, s: str, f: Callable[[str, str], None]) -> None:
+ for ln, line in enumerate(s.splitlines()):
+ line_number = ln + 1
+ # check if it's a comment
+ if line.strip().startswith("#"):
+ continue
+ # check if there is a '='
+ if line.find("=") == -1:
+ self.raise_parse_exception(f"There is even no '='!", line_number)
+ # split at first '='
+ key, value = line.split("=", 1)
+ key = key.strip()
+ value = value.strip()
+ f(key, value)
+
+ def parse(self, s: str) -> list[tuple[str, str]]:
+ items = []
+ self._parse(s, lambda key, value: items.append((key, value)))
+ return items
+
+ def parse_to_dict(self, s: str, /, allow_override: bool = False) -> tuple[dict[str, str], list[tuple[str, str]]]:
+ d = {}
+ duplicate = []
+
+ def add(key: str, value: str) -> None:
+ if key in d:
+ if allow_override:
+ duplicate.append((key, d[key]))
+ d[key] = value
+ else:
+ self.raise_parse_exception(f"Key '{key}' already exists!", None)
+ d[key] = value
+
+ self._parse(s, add)
+ return d, duplicate
diff --git a/tools/cru-py/cru/paths.py b/tools/cru-py/cru/paths.py new file mode 100644 index 0000000..df5042b --- /dev/null +++ b/tools/cru-py/cru/paths.py @@ -0,0 +1,63 @@ +from pathlib import Path +import os + +from .excp import CruException + + +class ApplicationPathError(CruException): + def __init__(self, message: str, p: str | Path, *args, **kwargs): + super().__init__(message, *args, path=str(p), **kwargs) + + +def check_parents_dir(p: str | Path, /, must_exist: bool = False) -> bool: + p = Path(p) if isinstance(p, str) else p + for p in reversed(p.parents): + if not p.exists() and not must_exist: + return False + if not p.is_dir(): + raise ApplicationPathError("Parents path should be a dir.", p) + return True + + +class ApplicationPath: + def __init__(self, p: str | Path, is_dir: bool) -> None: + self._path = Path(p) if isinstance(p, str) else p + self._is_dir = is_dir + + @property + def path(self) -> Path: + return self._path + + @property + def is_dir(self) -> bool: + return self._is_dir + + def check_parents(self, must_exist: bool = False) -> bool: + return check_parents_dir(self._path.parent, must_exist) + + def check_self(self, must_exist: bool = False) -> bool: + if not self.check_parents(must_exist): + return False + if not self.path.exists(): + if not must_exist: return False + raise ApplicationPathError("Mot exist.", self.path) + if self.is_dir: + if not self.path.is_dir(): + raise ApplicationPathError("Should be a directory, but not.", self.path) + else: + return False + else: + if not self.path.is_file(): + raise ApplicationPathError("Should be a file, but not.", self.path) + else: + return False + + def ensure(self, create_file: bool = False) -> None: + e = self.check_self(False) + if not e: + os.makedirs(self.path.parent, exist_ok=True) + if self.is_dir: + os.mkdir(self.path) + elif create_file: + with open(self.path, "w") as f: + f.write("") diff --git a/tools/cru-py/cru/service/__init__.py b/tools/cru-py/cru/service/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tools/cru-py/cru/service/__init__.py diff --git a/tools/cru-py/cru/service/docker.py b/tools/cru-py/cru/service/docker.py new file mode 100644 index 0000000..42d4a35 --- /dev/null +++ b/tools/cru-py/cru/service/docker.py @@ -0,0 +1,15 @@ +import shutil + + +class DockerController: + DOCKER_BIN_NAME = "docker" + + def __init__(self, docker_bin: None | str = None) -> None: + self._docker_bin = docker_bin + + @property + def docker_bin(self) -> str: + if self._docker_bin is None: + self._docker_bin = shutil.which(self.DOCKER_BIN_NAME) + return self._docker_bin + diff --git a/tools/cru-py/cru/service/nginx.py b/tools/cru-py/cru/service/nginx.py new file mode 100644 index 0000000..9298623 --- /dev/null +++ b/tools/cru-py/cru/service/nginx.py @@ -0,0 +1,377 @@ +from typing import Literal, Any, cast, ClassVar +import os +from os.path import join, basename, dirname +import subprocess +import re +import json +import jsonschema + +from crupest.template2 import Template2 +from crupest.tui import Paths, UserFriendlyException, create_dir_if_not_exists, console, Confirm, ensure_dir +from crupest.ui_base import file_name_style + + +def restart_nginx(force=False) -> bool: + if not force: + p = subprocess.run(['docker', "container", "ls", + "-f", "name=nginx", "-q"], capture_output=True) + container: str = p.stdout.decode("utf-8") + if len(container.strip()) == 0: + return False + subprocess.run(['docker', 'restart', 'nginx']) + return True + + +_server_schema_filename = "server.schema.json" + +with open(join(Paths.nginx2_template_dir, _server_schema_filename)) as f: + server_json_schema = json.load(f) + + +_domain_template_filename = "domain.conf.template" + +NginxSourceFileType = Literal["global", "domain", "http", "https"] + + +class NginxSourceFile: + def __init__(self, path: str) -> None: + """ + path: relative to nginx2_template_dir + """ + self._path = path + is_template = path.endswith(".template") + self._is_template = is_template + filename = basename(path) + self.name = filename[:-len(".template")] if is_template else filename + if is_template: + self._template = Template2.from_file( + join(Paths.nginx2_template_dir, path)) + else: + with open(join(Paths.nginx2_template_dir, path)) as f: + self._content = f.read() + + self._scope: NginxSourceFileType = self._calc_scope() + + @property + def is_template(self) -> bool: + return self._is_template + + @property + def content(self) -> str: + if self._is_template: + raise Exception(f"{self._path} is a template file") + return self._content + + @property + def template(self) -> Template2: + if not self._is_template: + raise Exception(f"{self._path} is not a template file") + return cast(Template2, self._template) + + @property + def global_target_filename(self) -> str: + if self.scope != "global": + raise Exception(f"{self._path} is not a global file") + if self.is_template: + return basename(self._path)[:-len(".template")] + else: + return basename(self._path) + + def _calc_scope(self) -> NginxSourceFileType: + f = basename(self._path) + d = basename(dirname(self._path)) + if f == _domain_template_filename: + return "domain" + elif d in ["global", "http", "https"]: + return cast(Literal["global", "http", "https"], d) + else: + raise Exception(f"Unknown scope for {self._path}") + + @property + def scope(self) -> NginxSourceFileType: + return self._scope + + +_domain_template_source = NginxSourceFile(_domain_template_filename) + +_client_max_body_size_source = NginxSourceFile( + "global/client-max-body-size.conf") +_forbid_unknown_domain_source = NginxSourceFile( + "global/forbid-unknown-domain.conf") +_ssl_template_source = NginxSourceFile("global/ssl.conf.template") +_websocket_source = NginxSourceFile("global/websocket.conf") + +_http_444_source = NginxSourceFile("http/444.segment") +_http_redirect_to_https_source = NginxSourceFile( + "http/redirect-to-https.segment") + +_https_redirect_template_source = NginxSourceFile( + "https/redirect.segment.template") +_https_reverse_proxy_template_source = NginxSourceFile( + "https/reverse-proxy.segment.template") +_https_static_file_template_source = NginxSourceFile( + "https/static-file.segment.template") +_https_static_file_no_strip_prefix_template_source = NginxSourceFile( + "https/static-file.no-strip-prefix.segment.template") + + +class NginxService: + def __init__(self, type: str, path: str) -> None: + self.type = type + self.path = path + self._check_path(path) + + @staticmethod + def _check_path(path: str) -> None: + assert isinstance(path, str) + if path == "" or path == "/": + return + if not path.startswith("/"): + raise UserFriendlyException("Service path should start with '/'.") + if path.endswith("/"): + raise UserFriendlyException( + "Service path should not end with '/'.") + + def generate_https_segment(self) -> str: + raise NotImplementedError() + + +class NginxRedirectService(NginxService): + def __init__(self, path: str, redirect_url: str, redirect_code: int = 307) -> None: + if redirect_url.endswith("/"): + raise UserFriendlyException( + "Redirect URL should not end with '/'.") + + super().__init__("redirect", path) + + self.redirect_url = redirect_url + self.redirect_code = redirect_code + + def generate_https_segment(self) -> str: + vars = { + "PATH": self.path, + "REDIRECT_CODE": self.redirect_code, + "REDIRECT_URL": self.redirect_url + } + return _https_redirect_template_source.template.render(vars) + + @staticmethod + def from_json(json: dict[str, Any]) -> "NginxRedirectService": + path = json["path"] + redirect_url = json["to"] + redirect_code = json.get("code", 307) + assert isinstance(path, str) + assert isinstance(redirect_url, str) + assert isinstance(redirect_code, int) + return NginxRedirectService(path, redirect_url, redirect_code) + + +class NginxReverseProxyService(NginxService): + + _upstream_regex: ClassVar[re.Pattern[str]] = re.compile( + r"^[-_0-9a-zA-Z]+:[0-9]+$") + + def __init__(self, path: str, upstream: str) -> None: + if not self._upstream_regex.match(upstream): + raise UserFriendlyException( + f"Invalid upstream format: {upstream}.") + + super().__init__("reverse-proxy", path) + + self.upstream = upstream + + def generate_https_segment(self) -> str: + vars = { + "PATH": self.path, + "UPSTREAM": self.upstream + } + return _https_reverse_proxy_template_source.template.render(vars) + + @staticmethod + def from_json(json: dict[str, Any]) -> "NginxReverseProxyService": + path = json["path"] + upstream = json["upstream"] + assert isinstance(path, str) + assert isinstance(upstream, str) + return NginxReverseProxyService(path, upstream) + + +class NginxStaticFileService(NginxService): + def __init__(self, path: str, root: str, no_strip_prefix: bool = False) -> None: + super().__init__("static-file", path) + + self.root = root + self.no_strip_prefix = no_strip_prefix + + def generate_https_segment(self) -> str: + vars = { + "PATH": self.path, + "ROOT": self.root, + } + if self.no_strip_prefix: + return _https_static_file_no_strip_prefix_template_source.template.render(vars) + else: + return _https_static_file_template_source.template.render(vars) + + @staticmethod + def from_json(json: dict[str, Any]) -> "NginxStaticFileService": + path = json["path"] + root = json["root"] + no_strip_prefix = json.get("no_strip_prefix", False) + assert isinstance(path, str) + assert isinstance(root, str) + assert isinstance(no_strip_prefix, bool) + return NginxStaticFileService(path, root, no_strip_prefix) + + +def nginx_service_from_json(json: dict[str, Any]) -> NginxService: + type = json["type"] + if type == "redirect": + return NginxRedirectService.from_json(json) + elif type == "reverse-proxy": + return NginxReverseProxyService.from_json(json) + elif type == "static-file": + return NginxStaticFileService.from_json(json) + else: + raise UserFriendlyException(f"Invalid crupest type: {type}.") + + +def _prepend_indent(text: str, indent: str = " " * 4) -> str: + lines = text.split("\n") + for i in range(len(lines)): + if lines[i] != "": + lines[i] = indent + lines[i] + return "\n".join(lines) + + +class NginxDomain: + def __init__(self, domain: str, services: list[NginxService] = []) -> None: + self.domain = domain + self.services = services + + def add_service(self, service: NginxService) -> None: + self.services.append(service) + + def generate_http_segment(self) -> str: + if len(self.services) == 0: + return _http_444_source.content + else: + return _http_redirect_to_https_source.content + + def generate_https_segment(self) -> str: + return "\n\n".join([s.generate_https_segment() for s in self.services]) + + def generate_config(self) -> str: + vars = { + "DOMAIN": self.domain, + "HTTP_SEGMENT": _prepend_indent(self.generate_http_segment()), + "HTTPS_SEGMENT": _prepend_indent(self.generate_https_segment()), + } + return _domain_template_source.template.render(vars) + + def generate_config_file(self, path: str) -> None: + with open(path, "w") as f: + f.write(self.generate_config()) + + @staticmethod + def from_json(root_domain: str, json: dict[str, Any]) -> "NginxDomain": + name = json["name"] + assert isinstance(name, str) + if name == "@" or name == "": + domain = root_domain + else: + domain = f"{name}.{root_domain}" + assert isinstance(json["services"], list) + services = [nginx_service_from_json(s) for s in json["services"]] + return NginxDomain(domain, services) + + +def check_nginx_config_schema(json: Any) -> None: + jsonschema.validate(json, server_json_schema) + + +class NginxServer: + def __init__(self, root_domain: str) -> None: + self.root_domain = root_domain + self.domains: list[NginxDomain] = [] + + def add_sub_domain(self, sub_domain: str, services: list[NginxService]) -> None: + if sub_domain == "" or sub_domain == "@": + domain = self.root_domain + else: + domain = f"{sub_domain}.{self.root_domain}" + self.domains.append(NginxDomain(domain, services)) + + def generate_ssl(self) -> str: + return _ssl_template_source.template.render({ + "ROOT_DOMAIN": self.root_domain + }) + + def generate_global_files(self, d: str) -> None: + for source in [_client_max_body_size_source, _forbid_unknown_domain_source, _websocket_source]: + with open(join(d, source.name), "w") as f: + f.write(source.content) + with open(join(d, _ssl_template_source.name), "w") as f: + f.write(self.generate_ssl()) + + def generate_domain_files(self, d: str) -> None: + for domain in self.domains: + domain.generate_config_file(join(d, f"{domain.domain}.conf")) + + def generate_config(self, d: str) -> None: + create_dir_if_not_exists(d) + self.generate_global_files(d) + + def get_allowed_files(self) -> list[str]: + files = [] + for source in [_client_max_body_size_source, _forbid_unknown_domain_source, _ssl_template_source, _websocket_source]: + files.append(source.name) + for domain in self.domains: + files.append(f"{domain.domain}.conf") + return files + + def check_bad_files(self, d: str) -> list[str]: + allowed_files = self.get_allowed_files() + bad_files = [] + if not ensure_dir(d, must_exist=False): + return [] + for path in os.listdir(d): + if path not in allowed_files: + bad_files.append(path) + return bad_files + + @staticmethod + def from_json(root_domain: str, json: dict[str, Any]) -> "NginxServer": + check_nginx_config_schema(json) + server = NginxServer(root_domain) + sub_domains = json["domains"] + assert isinstance(sub_domains, list) + server.domains = [NginxDomain.from_json( + root_domain, d) for d in sub_domains] + return server + + @staticmethod + def from_json_str(root_domain: str, json_str: str) -> "NginxServer": + return NginxServer.from_json(root_domain, json.loads(json_str)) + + def go(self): + bad_files = self.check_bad_files(Paths.nginx_generated_dir) + if len(bad_files) > 0: + console.print( + "WARNING: It seems there are some bad conf files in the nginx config directory:", style="yellow") + for bad_file in bad_files: + console.print(bad_file, style=file_name_style) + to_delete = Confirm.ask( + "They will affect nginx in a [red]bad[/] way. Do you want to delete them?", default=True, console=console) + if to_delete: + for file in bad_files: + os.remove(join(Paths.nginx_generated_dir, file)) + create_dir_if_not_exists(Paths.generated_dir) + if not ensure_dir(Paths.nginx_generated_dir, must_exist=False): + os.mkdir(Paths.nginx_generated_dir) + console.print( + f"Nginx config directory created at [magenta]{Paths.nginx_generated_dir}[/]", style="green") + self.generate_config(Paths.nginx_generated_dir) + console.print("Nginx config generated.", style="green") + if restart_nginx(): + console.print('Nginx restarted.', style="green") diff --git a/tools/cru-py/cru/system.py b/tools/cru-py/cru/system.py new file mode 100644 index 0000000..2e05cd1 --- /dev/null +++ b/tools/cru-py/cru/system.py @@ -0,0 +1,22 @@ +import re +import os.path + + +def check_debian_derivative_version(name: str) -> None | str: + if not os.path.isfile("/etc/os-release"): + return None + with open("/etc/os-release", "r") as f: + content = f.read() + if not f"ID={name}" in content: + return None + m = re.search(r'VERSION_ID="(.+)"', content) + if m is None: return None + return m.group(1) + + +def check_ubuntu_version() -> None | str: + return check_debian_derivative_version("ubuntu") + + +def check_debian_version() -> None | str: + return check_debian_derivative_version("debian") diff --git a/tools/cru-py/cru/value.py b/tools/cru-py/cru/value.py new file mode 100644 index 0000000..cddbde9 --- /dev/null +++ b/tools/cru-py/cru/value.py @@ -0,0 +1,309 @@ +import random +import secrets +import string +import uuid +from abc import abstractmethod, ABCMeta +from collections.abc import Mapping, Callable +from typing import Any, ClassVar, Literal, TypeVar, Generic, ParamSpec + +from .excp import CruInternalLogicError, CruException, CRU_EXCEPTION_ATTR_DEF_REGISTRY + + +def _str_case_in(s: str, case: bool, l: list[str]) -> bool: + if case: + return s in l + else: + return s.lower() in [s.lower() for s in l] + + +_ValueTypeForward = type["ValueType"] + +T = TypeVar("T") + + +class _ValueErrorMixin: + VALUE_TYPE_KEY = "value_type" + + CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_with( + VALUE_TYPE_KEY, + "The type of the value that causes the exception." + ) + + +class ValidationError(CruException, _ValueErrorMixin): + def __init__(self, message: str, value: Any, value_type: _ValueTypeForward[T] | None, *args, **kwargs): + super().__init__(message, *args, value=value, type_=value_type.type, init_attrs={ + ValidationError.VALUE_TYPE_KEY: value_type, + }, **kwargs) + + @property + def value_type(self) -> _ValueTypeForward[T] | None: + return self[ValidationError.VALUE_TYPE_KEY] + + +class ValueStringConvertionError(CruException, _ValueErrorMixin): + def __init__(self, message: str, value: Any, value_type: _ValueTypeForward[T] | None, *args, + **kwargs): + super().__init__(message, *args, value=value, type_=value_type.type, init_attrs={ + ValueStringConvertionError.VALUE_TYPE_KEY: value_type, + }, **kwargs) + + @property + def value_type(self) -> _ValueTypeForward[T] | None: + return self[ValueStringConvertionError.VALUE_TYPE_KEY] + + +class ValueType(Generic[T], metaclass=ABCMeta): + @staticmethod + def case_sensitive_to_str(case_sensitive: bool) -> str: + return f"case-{'' if case_sensitive else 'in'}sensitive" + + def __init__(self, name: str) -> None: + self._name = name + self._type = type("T") + + @property + def name(self) -> str: + return self._name + + @property + def type(self) -> type: + return self._type + + def is_instance_of_value_type(self, value: Any) -> bool: + return isinstance(value, self.type) + + def _do_check_value(self, value: Any) -> tuple[True, T] | tuple[False, None | str]: + return True, value + + def check_value(self, value: Any) -> T: + if not isinstance(value, self.type): + raise ValidationError("Value type is wrong.", value, self) + ok, v_or_err = self._do_check_value(value) + if ok: + return v_or_err + else: + raise ValidationError(v_or_err or "Value is not valid.", value, self) + + @abstractmethod + def _do_check_str_format(self, s: str) -> bool | tuple[bool, str]: + """ + Return None for no error. Otherwise, return error message. + """ + raise NotImplementedError() + + def check_str_format(self, s: str) -> None: + ok, err = self._do_check_str_format(s) + if ok is None: raise CruInternalLogicError("_do_check_str_format should not return None.") + if ok: return + if err is None: + err = "Invalid value str format." + raise ValueStringConvertionError(err, s, value_type=self) + + @abstractmethod + def _do_convert_value_to_str(self, value: T) -> str: + raise NotImplementedError() + + def convert_value_to_str(self, value: T) -> str: + self.check_value(value) + return self._do_convert_value_to_str(value) + + @abstractmethod + def _do_convert_str_to_value(self, s: str) -> T: + raise NotImplementedError() + + def convert_str_to_value(self, s: str) -> T: + self.check_str_format(s) + return self._do_convert_str_to_value(s) + + def check_value_or_try_convert_from_str(self, value_or_str: Any) -> T: + try: + return self.check_value(value_or_str) + except ValidationError as e: + if isinstance(value_or_str, str): + return self.convert_str_to_value(value_or_str) + else: + raise ValidationError("Value is not valid and is not a str.", value_or_str, self, + inner=e) + + +class TextValueType(ValueType[str]): + def __init__(self) -> None: + super().__init__("text") + + def _do_check_str_format(self, s): + return True + + def _do_convert_value_to_str(self, value): + return value + + def _do_convert_str_to_value(self, s): + return s + + +class IntegerValueType(ValueType[int]): + + def __init__(self) -> None: + super().__init__("integer") + + def _do_check_str_format(self, s): + try: + int(s) + return True + except ValueError: + return False + + def _do_convert_value_to_str(self, value): + return str(value) + + def _do_convert_str_to_value(self, s): + return int(s) + + +class FloatValueType(ValueType[float]): + def __init__(self) -> None: + super().__init__("float") + + def _do_check_str_format(self, s): + try: + float(s) + return True + except ValueError: + return False + + def _do_convert_value_to_str(self, value): + return str(value) + + def _do_convert_str_to_value(self, s): + return float(s) + + +class BooleanValueType(ValueType[bool]): + DEFAULT_TRUE_LIST: ClassVar[list[str]] = ["true", "yes", "y", "on", "1"] + DEFAULT_FALSE_LIST: ClassVar[list[str]] = ["false", "no", "n", "off", "0"] + + def __init__(self, *, case_sensitive=False, true_list: None | list[str] = None, + false_list: None | list[str] = None) -> None: + super().__init__("boolean") + self._case_sensitive = case_sensitive + self._valid_true_strs: list[str] = true_list or BooleanValueType.DEFAULT_TRUE_LIST + self._valid_false_strs: list[str] = false_list or BooleanValueType.DEFAULT_FALSE_LIST + + @property + def case_sensitive(self) -> bool: + return self._case_sensitive + + @property + def valid_true_strs(self) -> list[str]: + return self._valid_true_strs + + @property + def valid_false_strs(self) -> list[str]: + return self._valid_false_strs + + @property + def valid_boolean_strs(self) -> list[str]: + return self._valid_true_strs + self._valid_false_strs + + def _do_check_str_format(self, s): + if _str_case_in(s, self.case_sensitive, self.valid_boolean_strs): return True + return False, f"Not a valid boolean string ({ValueType.case_sensitive_to_str(self.case_sensitive)}). Valid string of true: {' '.join(self._valid_true_strs)}. Valid string of false: {' '.join(self._valid_false_strs)}. All is case insensitive." + + def _do_convert_value_to_str(self, value): + return "True" if value else "False" + + def _do_convert_str_to_value(self, s): + return _str_case_in(s, self.case_sensitive, self._valid_true_strs) + + +class EnumValueType(ValueType[str]): + def __init__(self, valid_values: list[str], /, case_sensitive=False) -> None: + s = ' | '.join([f'"{v}"' for v in valid_values]) + self._valid_value_str = f'[ {s} ]' + super().__init__(f"enum{self._valid_value_str}") + self._case_sensitive = case_sensitive + self._valid_values = valid_values + + @property + def case_sensitive(self) -> bool: + return self._case_sensitive + + @property + def valid_values(self) -> list[str]: + return self._valid_values + + def _do_check_value(self, value): + ok, err = self._do_check_str_format(value) + return ok, (value if ok else err) + + def _do_check_str_format(self, s): + if _str_case_in(s, self.case_sensitive, self.valid_values): return True + return False, f"Value is not in valid values ({ValueType.case_sensitive_to_str(self.case_sensitive)}): {self._valid_value_str}" + + def _do_convert_value_to_str(self, value): + return value + + def _do_convert_str_to_value(self, s): + return s + + +TEXT_VALUE_TYPE = TextValueType() +INTEGER_VALUE_TYPE = IntegerValueType() +BOOLEAN_VALUE_TYPE = BooleanValueType() + +P = ParamSpec('P') + + +class ValueGenerator(Generic[T, P]): + INTERACTIVE_KEY: ClassVar[Literal["interactive"]] = "interactive" + + def __init__(self, f: Callable[P, T], /, attributes: None | Mapping[str, Any] = None) -> None: + self._f = f + self._attributes = attributes or {} + + @property + def f(self) -> Callable[P, T]: + return self._f + + @property + def attributes(self) -> Mapping[str, Any]: + return self._attributes + + def generate(self, *args, **kwargs) -> T: + return self._f(*args, **kwargs) + + def __call__(self, *args, **kwargs): + return self._f(*args, **kwargs) + + @property + def interactive(self) -> bool: + return self._attributes.get(ValueGenerator.INTERACTIVE_KEY, False) + + @staticmethod + def create_interactive(f: Callable[P, T], interactive: bool = True, /, + attributes: None | Mapping[str, Any] = None) -> "ValueGenerator[T, P]": + return ValueGenerator(f, dict({ValueGenerator.INTERACTIVE_KEY: interactive}, **(attributes or {}))) + + +class UuidValueGenerator(ValueGenerator[str, []]): + def __init__(self) -> None: + super().__init__(lambda: str(uuid.uuid4())) + + +class RandomStringValueGenerator(ValueGenerator[str, []]): + @staticmethod + def _create_generate_ramdom_func(length: int, secure: bool) -> Callable[str, []]: + random_choice = secrets.choice if secure else random.choice + + def generate_random_string(): + characters = string.ascii_letters + string.digits + random_string = ''.join(random_choice(characters) for _ in range(length)) + return random_string + + return generate_random_string + + def __init__(self, length: int, secure: bool) -> None: + super().__init__(RandomStringValueGenerator._create_generate_ramdom_func(length, secure)) + + +UUID_VALUE_GENERATOR = UuidValueGenerator() |