diff options
| author | crupest <crupest@outlook.com> | 2023-05-31 22:56:15 +0800 | 
|---|---|---|
| committer | Yuqian Yang <crupest@crupest.life> | 2024-12-18 18:31:27 +0800 | 
| commit | 410bc000613b355ffcfbcd282061849b5129bdea (patch) | |
| tree | 3eff901a5b96eb4ff88d272bed4bb964c6397f1f | |
| parent | 889e10a1908c32514edf23518c5ea4c26ffc0ae0 (diff) | |
| download | crupest-410bc000613b355ffcfbcd282061849b5129bdea.tar.gz crupest-410bc000613b355ffcfbcd282061849b5129bdea.tar.bz2 crupest-410bc000613b355ffcfbcd282061849b5129bdea.zip | |
HALF WORK: for sync.
54 files changed, 1867 insertions, 111 deletions
| diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..457f44d --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ +    "python.analysis.typeCheckingMode": "basic" +}
\ No newline at end of file diff --git a/cspell.yaml b/cspell.yaml index 3bdca1f..7a1151c 100644 --- a/cspell.yaml +++ b/cspell.yaml @@ -12,7 +12,6 @@ dictionaries:    - fonts    - filetypes    - npm -  - crupest-words  dictionaryDefinitions:    - name: crupest-words diff --git a/template2/nginx/domain.conf.template b/template2/nginx/domain.conf.template new file mode 100644 index 0000000..7fa2d7a --- /dev/null +++ b/template2/nginx/domain.conf.template @@ -0,0 +1,19 @@ +server { +    listen 443 ssl http2; +    listen [::]:443 ssl http2; +    server_name ${DOMAIN}; + +${HTTPS_SEGMENT} +} + +server { +    listen 80; +    listen [::]:80; +    server_name ${DOMAIN}; + +${HTTP_SEGMENT} + +    location /.well-known/acme-challenge { +        root /srv/acme; +    } +} diff --git a/template2/nginx/global/client-max-body-size.conf b/template2/nginx/global/client-max-body-size.conf new file mode 100644 index 0000000..a2b1c00 --- /dev/null +++ b/template2/nginx/global/client-max-body-size.conf @@ -0,0 +1 @@ +client_max_body_size 5G; diff --git a/template2/nginx/global/forbid-unknown-domain.conf b/template2/nginx/global/forbid-unknown-domain.conf new file mode 100644 index 0000000..ae96393 --- /dev/null +++ b/template2/nginx/global/forbid-unknown-domain.conf @@ -0,0 +1,8 @@ +server { +    listen 80 default_server; +    listen [::]:80 default_server; +    listen 443 ssl http2 default_server; +    listen [::]:443 ssl http2 default_server; + +    return 444; +} diff --git a/template2/nginx/global/ssl.conf.template b/template2/nginx/global/ssl.conf.template new file mode 100644 index 0000000..ff70f5a --- /dev/null +++ b/template2/nginx/global/ssl.conf.template @@ -0,0 +1,17 @@ +# This file contains important security parameters. If you modify this file +# manually, Certbot will be unable to automatically provide future security +# updates. Instead, Certbot will print and log an error message with a path to +# the up-to-date file that you will need to refer to when manually updating +# this file. Contents are based on https://ssl-config.mozilla.org + +ssl_certificate /etc/letsencrypt/live/${ROOT_DOMAIN}/fullchain.pem; +ssl_certificate_key /etc/letsencrypt/live/${ROOT_DOMAIN}/privkey.pem; + +ssl_session_cache shared:le_nginx_SSL:10m; +ssl_session_timeout 1440m; +ssl_session_tickets off; + +ssl_protocols TLSv1.2 TLSv1.3; +ssl_prefer_server_ciphers off; + +ssl_ciphers "ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384"; diff --git a/template2/nginx/global/websocket.conf b/template2/nginx/global/websocket.conf new file mode 100644 index 0000000..32af4c3 --- /dev/null +++ b/template2/nginx/global/websocket.conf @@ -0,0 +1,4 @@ +map $http_upgrade $connection_upgrade { +    default upgrade; +    ''      close; +} diff --git a/template2/nginx/http/444.segment b/template2/nginx/http/444.segment new file mode 100644 index 0000000..fe490d4 --- /dev/null +++ b/template2/nginx/http/444.segment @@ -0,0 +1,3 @@ +location / { +    return 444; +} diff --git a/template2/nginx/http/redirect-to-https.segment b/template2/nginx/http/redirect-to-https.segment new file mode 100644 index 0000000..56d095d --- /dev/null +++ b/template2/nginx/http/redirect-to-https.segment @@ -0,0 +1,3 @@ +location / { +    return 301 https://$host$request_uri; +} diff --git a/template2/nginx/https/redirect.segment.template b/template2/nginx/https/redirect.segment.template new file mode 100644 index 0000000..028f617 --- /dev/null +++ b/template2/nginx/https/redirect.segment.template @@ -0,0 +1,7 @@ +location = ${PATH} { +    return ${REDIRECT_CODE} ${REDIRECT_URL}; +} + +location ^${PATH}/(?<redirect_path>.*)$ { +    return ${REDIRECT_CODE} ${REDIRECT_URL}/$redirect_path; +} diff --git a/template2/nginx/https/reverse-proxy.segment.template b/template2/nginx/https/reverse-proxy.segment.template new file mode 100644 index 0000000..85a942e --- /dev/null +++ b/template2/nginx/https/reverse-proxy.segment.template @@ -0,0 +1,10 @@ +location ${PATH}/ { +    proxy_http_version 1.1; +    proxy_set_header Upgrade $http_upgrade; +    proxy_set_header Connection $connection_upgrade; +    proxy_set_header Host $host; +    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; +    proxy_set_header X-Forwarded-Proto $scheme; +    proxy_set_header X-Real-IP $remote_addr; +    proxy_pass http://${UPSTREAM}; +} diff --git a/template2/nginx/https/static-file.no-strip-prefix.segment.template b/template2/nginx/https/static-file.no-strip-prefix.segment.template new file mode 100644 index 0000000..4e829ba --- /dev/null +++ b/template2/nginx/https/static-file.no-strip-prefix.segment.template @@ -0,0 +1,3 @@ +location ${PATH}/ { +    root ${ROOT}; +} diff --git a/template2/nginx/https/static-file.segment.template b/template2/nginx/https/static-file.segment.template new file mode 100644 index 0000000..683cad3 --- /dev/null +++ b/template2/nginx/https/static-file.segment.template @@ -0,0 +1,3 @@ +location ${PATH}/ { +    alias ${ROOT}; +} diff --git a/template2/nginx/server.schema.json b/template2/nginx/server.schema.json new file mode 100644 index 0000000..a19c131 --- /dev/null +++ b/template2/nginx/server.schema.json @@ -0,0 +1,96 @@ +{ +    "$schema": "http://json-schema.org/draft-07/schema#", +    "definitions": { +        "RedirectService": { +            "properties": { +                "code": { +                    "type": "number" +                }, +                "path": { +                    "type": "string" +                }, +                "to": { +                    "type": "string" +                }, +                "type": { +                    "enum": [ +                        "redirect" +                    ], +                    "type": "string" +                } +            }, +            "type": "object" +        }, +        "ReverseProxyService": { +            "properties": { +                "path": { +                    "type": "string" +                }, +                "type": { +                    "enum": [ +                        "reverse-proxy" +                    ], +                    "type": "string" +                }, +                "upstream": { +                    "type": "string" +                } +            }, +            "type": "object" +        }, +        "StaticFileService": { +            "properties": { +                "no_strip_prefix": { +                    "type": "boolean" +                }, +                "path": { +                    "type": "string" +                }, +                "root": { +                    "type": "string" +                }, +                "type": { +                    "enum": [ +                        "static-file" +                    ], +                    "type": "string" +                } +            }, +            "type": "object" +        }, +        "SubDomain": { +            "properties": { +                "name": { +                    "type": "string" +                }, +                "services": { +                    "items": { +                        "anyOf": [ +                            { +                                "$ref": "#/definitions/RedirectService" +                            }, +                            { +                                "$ref": "#/definitions/StaticFileService" +                            }, +                            { +                                "$ref": "#/definitions/ReverseProxyService" +                            } +                        ] +                    }, +                    "type": "array" +                } +            }, +            "type": "object" +        } +    }, +    "properties": { +        "domains": { +            "items": { +                "$ref": "#/definitions/SubDomain" +            }, +            "type": "array" +        } +    }, +    "type": "object" +} + diff --git a/template2/nginx/server.ts b/template2/nginx/server.ts new file mode 100644 index 0000000..ffd64b7 --- /dev/null +++ b/template2/nginx/server.ts @@ -0,0 +1,66 @@ +// Used to generate json schema. + +// path should start with "/", end without "/" and contain no special characters in regex. +// the special case is root path "/", which is allowed. + +// For example: +// Given +//   path: /a/b +//   to: http://c.com/d +// Then (no_strip_prefix is false) +//   url: /a/b/c +//   redirect to: http://c.com/d/c (/a/b is removed) +// Note: +// Contrary to reverse proxy, you would always want to strip the prefix path. +// Because there is no meaning to redirect to the new page with the original path. +// If you want a domain-only redirect, just specify the path as "/". +export interface RedirectService { +  type: "redirect"; +  path: string; // must be a path, should start with "/", end without "/" +  to: string; // must be a url, should start with scheme (http:// or https://), end without "/" +  code?: number; // default to 307 +} + +// For example: +// Given +//   path: /a/b +//   root: /e/f +// Then (no_strip_prefix is false) +//   url: /a/b/c/d +//   file path: /e/f/c/d (/a/b is removed) +// Or (no_strip_prefix is true) +//   url: /a/b/c/d +//   file path: /e/f/a/b/c/d +export interface StaticFileService { +  type: "static-file"; +  path: string; // must be a path, should start with "/", end without "/" +  root: string; // must be a path (directory), should start with "/", end without "/" +  no_strip_prefix?: boolean; // default to false. If true, the path prefix is not removed from the url when finding the file. +} + +// For example: +// Given +//   path: /a/b +//   upstream: another-server:1234 +// Then +//   url: /a/b/c/d +//   proxy to: another-server:1234/a/b/c/d +// Note: +//   Contrary to redirect, you would always want to keep the prefix path. +//   Because the upstream server will mess up the path handling if the prefix is not kept. +export interface ReverseProxyService { +  type: "reverse-proxy"; +  path: string; // must be a path, should start with "/", end without "/" +  upstream: string; // should be a [host]:[port], like "localhost:1234" +} + +export type Service = RedirectService | StaticFileService | ReverseProxyService; + +export interface SubDomain { +  name: string; // @ for root domain +  services: Service[]; +} + +export interface Server { +  domains: SubDomain[]; +} diff --git a/template2/server.json.template b/template2/server.json.template new file mode 100644 index 0000000..22f1251 --- /dev/null +++ b/template2/server.json.template @@ -0,0 +1,58 @@ +{ +    "$schema": "./server.schema.json", +    "domains": [ +        { +            "name": "@", +            "services": [ +                { +                    "type": "static-file", +                    "path": "/", +                    "root": "/srv/www" +                }, +                { +                    "type": "redirect", +                    "path": "/github", +                    "to": "https://github.com/crupest" +                }, +                { +                    "type": "reverse-proxy", +                    "path": "/_${V2RAY_PATH}", +                    "upstream": "crupest-v2ray:10000" +                } +            ] +        }, +        { +            "name": "code", +            "services": [ +                { +                    "type": "reverse-proxy", +                    "path": "/", +                    "upstream": "code-server:8080" +                } +            ] +        }, +        { +            "name": "timeline", +            "services": [ +                { +                    "type": "reverse-proxy", +                    "path": "/", +                    "upstream": "timeline:5000" +                } +            ] +        }, +        { +            "name": "blog", +            "services": [ +                { +                    "type": "static-file", +                    "path": "/", +                    "root": "/srv/blog" +                } +            ] +        }, +        { +            "name": "mail" +        } +    ] +}
\ No newline at end of file diff --git a/tools/aio/modules/check.py b/tools/aio/modules/check.py deleted file mode 100644 index 2a082f6..0000000 --- a/tools/aio/modules/check.py +++ /dev/null @@ -1,20 +0,0 @@ -import sys -import re -from os.path import * - - -def check_python_version(required_version=(3, 10)): -    return sys.version_info < required_version - - -def check_ubuntu(): -    if not exists("/etc/os-release"): -        return False -    else: -        with open("/etc/os-release", "r") as f: -            content = f.read() -            if re.search(r"NAME=\"?Ubuntu\"?", content, re.IGNORECASE) is None: -                return False -            if re.search(r"VERSION_ID=\"?22.04\"?", content, re.IGNORECASE) is None: -                return False -    return True diff --git a/tools/aio/modules/path.py b/tools/aio/modules/path.py deleted file mode 100644 index ac969d3..0000000 --- a/tools/aio/modules/path.py +++ /dev/null @@ -1,30 +0,0 @@ -import os -import os.path - -script_dir = os.path.relpath(os.path.dirname(__file__)) -project_dir = os.path.normpath(os.path.join(script_dir, "../../../")) -project_abs_path = os.path.abspath(project_dir) -template_dir = os.path.join(project_dir, "template") -nginx_template_dir = os.path.join(template_dir, "nginx") -data_dir = os.path.join(project_dir, "data") -tool_dir = os.path.join(project_dir, "tools") -tmp_dir = os.path.join(project_dir, "tmp") -backup_dir = os.path.join(project_dir, "backup") -config_file_path = os.path.join(data_dir, "config") -nginx_config_dir = os.path.join(project_dir, "nginx-config") -log_dir = os.path.join(project_dir, "log") - - -def ensure_log_dir(): -    if not os.path.exists(log_dir): -        os.mkdir(log_dir) - - -def ensure_tmp_dir(): -    if not os.path.exists(tmp_dir): -        os.mkdir(tmp_dir) - - -def ensure_backup_dir(): -    if not os.path.exists(backup_dir): -        os.mkdir(backup_dir) diff --git a/tools/aio/.gitignore b/tools/cru-py/.gitignore index 259058c..259058c 100644 --- a/tools/aio/.gitignore +++ b/tools/cru-py/.gitignore diff --git a/tools/cru-py/.python-version b/tools/cru-py/.python-version new file mode 100644 index 0000000..37504c5 --- /dev/null +++ b/tools/cru-py/.python-version @@ -0,0 +1 @@ +3.11
 diff --git a/tools/cru-py/__init__.py b/tools/cru-py/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tools/cru-py/__init__.py diff --git a/tools/aio/aio b/tools/cru-py/aio index f74877a..f74877a 100755..100644 --- a/tools/aio/aio +++ b/tools/cru-py/aio diff --git a/tools/aio/aio.py b/tools/cru-py/aio.py index 970c389..d5386f1 100755..100644 --- a/tools/aio/aio.py +++ b/tools/cru-py/aio.py @@ -5,26 +5,26 @@ try:      import jsonschema      import cryptography  except ImportError: -    print("Some necessary modules can't be imported. Please run `pip install -r requirements.txt` to install them.") +    print("Some necessary crupest can't be imported. Please run `pip install -r requirements.txt` to install them.")      exit(1)  from os.path import *  import argparse  import subprocess -from rich.console import Console  from rich.prompt import Confirm -from modules.install_docker import * -from modules.path import * -from modules.nginx import * -from modules.config import * -from modules.check import * -from modules.backup import * -from modules.download_tools import * -from modules.test import * -from modules.dns import * -from modules.setup import * - -console = Console() +from crupest.install_docker import * +from crupest.path import * +from crupest.nginx import * +from crupest.config import * +from crupest.check import * +from crupest.backup import * +from crupest.download_tools import * +from crupest.test import * +from crupest.dns import * +from crupest.setup import * + +from crupest.tui import console +  parser = argparse.ArgumentParser(      description="Crupest server all-in-one setup script. Have fun play with it!") @@ -130,7 +130,7 @@ if args.yes:          default_text = ""          if default is not None:              default_text = "(y)" if default else "(n)" -        text = f"[prompt]{prompt}[/] [prompt.choices]\[y/n][/] [prompt.default]{default_text}[/]" +        text = f"[prompt]{prompt}[/] [prompt.choices]\\[y/n][/] [prompt.default]{default_text}[/]"          console.print(text)          return True @@ -159,6 +159,7 @@ def check_domain_is_defined():          return get_domain()      except Exception as e:          console.print(e.args[0], style="red") +        raise e  def git_update(): diff --git a/tools/cru-py/cru/__init__.py b/tools/cru-py/cru/__init__.py new file mode 100644 index 0000000..e36a778 --- /dev/null +++ b/tools/cru-py/cru/__init__.py @@ -0,0 +1,12 @@ +import sys + + +class CruInitError(Exception): +    pass + +def check_python_version(required_version=(3, 11)): +    if sys.version_info < required_version: +        raise CruInitError(f"Python version must be >= {required_version}!") + + +check_python_version() diff --git a/tools/cru-py/cru/attr.py b/tools/cru-py/cru/attr.py new file mode 100644 index 0000000..d07cc55 --- /dev/null +++ b/tools/cru-py/cru/attr.py @@ -0,0 +1,125 @@ +from collections.abc import Callable +from dataclasses import dataclass +from types import NoneType +from typing import Any +from copy import deepcopy + + +@dataclass +class CruAttr: +    name: str +    value: Any +    description: str + + +@dataclass +class CruAttrDef: +    name: str +    default_description: str +    allow_types: None | set[type] +    allow_none: bool +    default_value: Any +    transformer: Callable[[Any], Any] | None +    validator: Callable[[Any], None] + +    def __init__(self, name: str, default_description: str, *, +                 allow_types: list[type] | type | None, allow_none: bool, default_value: Any = None, +                 transformer: Callable[[Any], Any] | None = None, +                 validator: Callable[[Any], None] | None = None) -> None: +        self.name = name +        self.default_description = default_description +        self.default_value = default_value +        #TODO: CONTINUE TOMORROW +        if allow_types is None: +            allow_types = [] +        elif isinstance(allow_types, type): +            allow_types = [allow_types] +        else: +            for t in allow_types: +                if not isinstance(t, type): +                    raise TypeError(f"Invalid value of python type : {t}") +        self.allow_types = set(filter(lambda tt: tt is not NoneType, allow_types)) +        self.allow_none = allow_none +        self.transformer = transformer +        self.validator = validator +        self.default_value = self.transform_and_validate(self.default_value) +        self.default_value = deepcopy(self.default_value) + +    def transform(self, value: Any) -> Any: +        if self.transformer is not None: +            return self.transformer(value) +        return value + +    def validate(self, value: Any, /, override_allow_none: bool | None = None) -> None: +        allow_none = override_allow_none if override_allow_none is not None else self.allow_none +        if value is None and not allow_none: +            raise TypeError(f"None is not allowed!") +        if len(self.allow_types) != 0 and type(value) not in self.allow_types: +            raise TypeError(f"Type of {value} is not allowed!") +        if self.validator is not None: +            return self.validator(value) +        return None + +    def transform_and_validate(self, value: Any, /, override_allow_none: bool | None = None) -> Any: +        value = self.transform(value) +        self.validate(value, override_allow_none) +        return value + +    def make(self, value: Any, description: None | str = None) -> CruAttr: +        value = self.transform_and_validate(value) +        return CruAttr(self.name, value if value is not None else deepcopy(self.default_value), +                       description if description is not None else self.default_description) + + +class CruAttrDefRegistry: + +    def __init__(self) -> None: +        self._def_list = [] + +    @property +    def items(self) -> list[CruAttrDef]: +        return self._def_list + +    def register(self, def_: CruAttrDef): +        for i in self._def_list: +            if i.name == def_.name: +                raise ValueError(f"Attribute {def_.name} already exists!") +        self._def_list.append(def_) + +    def register_with(self, name: str, default_description: str, *, +                      allow_types: list[type] | type | None, allow_none: bool, +                      default_value: Any = None, +                      transformer: Callable[[Any], Any] | None = None, +                      validator: Callable[[Any], None] | None = None +                      ) -> CruAttrDef: +        def_ = CruAttrDef(name, default_description, default_value=default_value, allow_types=allow_types, +                          allow_none=allow_none, transformer=transformer, validator=validator) +        self.register(def_) +        return def_ + +    def register_required(self, name: str, default_description: str, *, +                      allow_types: list[type] | type | None, +                      default_value: Any = None, +                      transformer: Callable[[Any], Any] | None = None, +                      validator: Callable[[Any], None] | None = None +                      ) -> CruAttrDef: +        return self.register_with(name, default_description, default_value=default_value, allow_types=allow_types, +                          allow_none=False, transformer=transformer, validator=validator) + +    def register_optional(self, name: str, default_description: str, *, +                      allow_types: list[type] | type | None, +                      default_value: Any = None, +                      transformer: Callable[[Any], Any] | None = None, +                      validator: Callable[[Any], None] | None = None +                      ) -> CruAttrDef: +        return self.register_with(name, default_description, default_value=default_value, allow_types=allow_types, +                          allow_none=True, transformer=transformer, validator=validator) + +    def get_item_optional(self, name: str) -> CruAttrDef | None: +        for i in self._def_list: +            if i.name == name: +                return i +        return None + +    def __getitem__(self, item) -> CruAttrDef | None: +        return self.get_item_optional(item) diff --git a/tools/cru-py/cru/config.py b/tools/cru-py/cru/config.py new file mode 100644 index 0000000..b0c83d5 --- /dev/null +++ b/tools/cru-py/cru/config.py @@ -0,0 +1,128 @@ +from typing import Any, TypeVar, Generic + +from .excp import CruInternalLogicError +from .value import ValueType, ValueGenerator, ValidationError + +T = TypeVar("T") + + +class ConfigItem(Generic[T]): +    OptionalValueGenerator = ValueGenerator[T, []] | None + +    def __init__(self, name: str, description: str, value_type: ValueType[T], value: T | None, default_value: T, *, +                 value_generator: OptionalValueGenerator = None) -> None: +        self._name = name +        self._description = description +        self._value_type = value_type +        self._default_value = default_value +        self._value_generator = value_generator +        self._value: T | None = value + +    @property +    def name(self) -> str: +        return self._name + +    @property +    def description(self) -> str: +        return self._description + +    @property +    def value_type(self) -> ValueType[T]: +        return self._value_type + +    @property +    def default_value(self) -> T: +        return self._default_value + +    @property +    def is_default(self) -> bool: +        return self._value is None + +    @property +    def is_set(self) -> bool: +        return not self.is_default + +    @property +    def value(self) -> T: +        return self._value or self._default_value + +    def set_value(self, v: T | str, /, allow_convert_from_str=False): +        if allow_convert_from_str: +            self._value = self.value_type.check_value(v) +        else: +            self._value = self.value_type.check_value_or_try_convert_from_str(v) + +    @value.setter +    def value(self, v: T) -> None: +        self.set_value(v) + +    @property +    def value_generator(self) -> OptionalValueGenerator: +        return self._value_generator + +    def generate_value(self, allow_interactive=False) -> T | None: +        if self.value_generator is None: return None +        if self.value_generator.interactive and not allow_interactive: +            return None +        else: +            v = self.generate_value() +            try: +                self.value_type.check_value(v) +                return v +            except ValidationError as e: +                raise CruInternalLogicError("Config value generator returns invalid value.", name=self.name, inner=e) + +    def copy(self) -> "ConfigItem": +        return ConfigItem(self.name, self.description, self.value_type, +                          self._value.copy() if self._value is not None else None, self._default_value.copy(), +                          value_generator=self.value_generator) + + +class Configuration: +    def __init__(self, items: None | list[ConfigItem] = None) -> None: +        self._items: list[ConfigItem] = items or [] + +    @property +    def items(self) -> list[ConfigItem]: +        return self._items + +    @property +    def item_map(self) -> dict[str, ConfigItem]: +        return {i.name: i for i in self.items} + +    def get_optional_item(self, name: str) -> ConfigItem | None: +        for i in self.items: +            if i.name == name: +                return i +        return None + +    def clear(self) -> None: +        self._items.clear() + +    def has_item(self, name: str) -> bool: +        return self.get_optional_item(name) is not None + +    def add_item(self, item: ConfigItem): +        i = self.get_optional_item(item.name) +        if i is not None: +            raise CruInternalLogicError("Config item of the name already exists.", name=item.name) +        self.items.append(item) +        return item + +    def set_value(self, name: str, v: Any, /, allow_convert_from_str=False): +        i = self.get_optional_item(name) +        if i is None: +            raise CruInternalLogicError("No config item of the name. Can't set value.", name=name) +        i.set_value(v, allow_convert_from_str) + +    def copy(self) -> "Configuration": +        return Configuration([i.copy() for i in self.items]) + +    def __getitem__(self, name: str) -> ConfigItem: +        i = self.get_optional_item(name) +        if i is not None: +            return i +        raise CruInternalLogicError('No config item of the name.', name=name) + +    def __contains__(self, name: str): +        return self.has_item(name) diff --git a/tools/cru-py/cru/excp.py b/tools/cru-py/cru/excp.py new file mode 100644 index 0000000..5a5871b --- /dev/null +++ b/tools/cru-py/cru/excp.py @@ -0,0 +1,137 @@ +from collections.abc import Callable +from dataclasses import dataclass +from types import NoneType +from typing import Any + +from cru.attr import CruAttrDefRegistry + +CRU_EXCEPTION_ATTR_DEF_REGISTRY = CruAttrDefRegistry() + + +class CruException(Exception): +    @staticmethod +    def transform_inner(inner: Exception | list[Exception] | None): +        if inner is None: +            return None +        if isinstance(inner, Exception): +            return [inner] +        if isinstance(inner, list): +            return inner + +    @staticmethod +    def validate_inner(inner: list[Exception]): +        for i in inner: +            if not isinstance(i, Exception): +                raise TypeError(f"Invalid inner exception: {i}") + +    MESSAGE_DEF = CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_required("message", "Message describing the exception.", +                                                                allow_types=str, default_value="") +    INNER_DEF = CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_required("inner", "Inner exception.", +                                                  allow_types=list, default_value=[], +                                                  transformer=transform_inner, validator=validate_inner) +    INTERNAL_DEF = CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_required("internal", +                                                  "True if the exception is caused by wrong internal logic. False if it is caused by user's wrong input.", +                                                  allow_types=bool, default_value=False) +    CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_optional("name", "Name of the object that causes the exception.", +                                                  allow_types=str) +    CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_optional("value", "Value that causes the exception.", +                                                  allow_types=[]) +    CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_with("path", "Path that causes the exception.",) +    CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_with("type", "Python type related to the exception.") + +    def __init__(self, message: str, *args, +                 inner: Exception | list[Exception] | None = None, +                 internal: bool = False, +                 name: str | None = None, +                 value: Any | None = None, +                 path: str | None = None, +                 type_: type | None = None, +                 init_attrs: dict[str, Any] | None = None, +                 attrs: dict[str, Any] | None = None, **kwargs) -> None: +        super().__init__(message, *args) + +        self._attrs = { +            CruException.MESSAGE_KEY: message, +            CruException.INTERNAL_KEY: internal, +            CruException.INNER_KEY: inner, +            CruException.NAME_KEY: name, +            CruException.VALUE_KEY: value, +            CruException.PATH_KEY: path, +            CruException.TYPE_KEY: type_, +        } +        if init_attrs is not None: +            self._attrs.update(init_attrs) +        if attrs is not None: +            self._attrs.update(attrs) +        self._attrs.update(kwargs) + +    @property +    def message(self) -> str: +        return self[CruException.MESSAGE_KEY] + +    @property +    def internal(self) -> bool: +        return self[CruException.INTERNAL_KEY] + +    @property +    def inner(self) -> list[Exception]: +        return self[CruException.INNER_KEY] + +    @property +    def name(self) -> str | None: +        return self[CruException.NAME_KEY] + +    @property +    def value(self) -> Any | None: +        return self[CruException.VALUE_KEY] + +    @property +    def path(self) -> str | None: +        return self[CruException.PATH_KEY] + +    @property +    def type(self) -> type | None: +        return self[CruException.TYPE_KEY] + +    def _get_attr_list_recursive(self, name: str, depth: int, max_depth: int, l: list[Any]): +        if 0 < max_depth < depth + 1: +            return +        a = self._attrs.get(name, None) +        if a is not None: +            l.append(a) +        for i in self.inner: +            if isinstance(i, CruException): +                i._get_attr_list_recursive(name, depth + 1, max_depth, l) + +    def get_attr_list_recursive(self, name: str, /, max_depth: int = -1) -> list[Any]: +        l = [] +        self._get_attr_list_recursive(name, 0, max_depth, l) +        return l + +    def get_optional_attr(self, name: str, max_depth: int = -1) -> Any | None: +        l = self.get_attr_list_recursive(name, max_depth) +        return l[0] if len(l) > 0 else None + +    def __getitem__(self, name: str) -> Any | None: +        return self.get_optional_attr(name) + + +class CruInternalLogicError(CruException): +    def __init__(self, message: str, *args, **kwargs) -> None: +        super().__init__(message, *args, internal=True, **kwargs) + + +class UserFriendlyException(CruException): +    USER_MESSAGE_KEY = "user_message" + +    CRU_EXCEPTION_ATTR_DEF_REGISTRY.register( +        CruExceptionAttrDef(USER_MESSAGE_KEY, "Message describing the exception, but with user-friendly language.")) + +    def __init__(self, message: str, user_message: str | None = None, *args, **kwargs) -> None: +        if user_message is None: +            user_message = message +        super().__init__(message, *args, init_attrs={UserFriendlyException.USER_MESSAGE_KEY: user_message}, **kwargs) + +    @property +    def user_message(self) -> str: +        return self[UserFriendlyException.USER_MESSAGE_KEY] diff --git a/tools/cru-py/cru/parsing.py b/tools/cru-py/cru/parsing.py new file mode 100644 index 0000000..be7bbf4 --- /dev/null +++ b/tools/cru-py/cru/parsing.py @@ -0,0 +1,70 @@ +from abc import ABCMeta, abstractmethod
 +from typing import TypeVar, Generic, NoReturn, Callable
 +
 +from cru.excp import CruException, CRU_EXCEPTION_ATTR_DEF_REGISTRY
 +
 +R = TypeVar("R")
 +
 +
 +class ParseException(CruException):
 +    LINE_NUMBER_KEY = "line_number"
 +
 +    CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_with(LINE_NUMBER_KEY, "Line number of the error.")
 +
 +
 +class Parser(Generic[R], metaclass=ABCMeta):
 +    def __init__(self, name: str) -> None:
 +        self._name = name
 +
 +    @property
 +    def name(self) -> str:
 +        return self._name
 +
 +    @abstractmethod
 +    def parse(self, s: str) -> R:
 +        raise NotImplementedError()
 +
 +    def raise_parse_exception(self, s: str, line_number: int | None = None) -> NoReturn:
 +        a = f" at line {line_number}" if line_number is not None else ""
 +        raise ParseException(f"Parser {self.name} failed{a}, {s}")
 +
 +
 +class SimpleLineConfigParser(Parser[list[tuple[str, str]]]):
 +    def __init__(self) -> None:
 +        super().__init__(type(self).__name__)
 +
 +    def _parse(self, s: str, f: Callable[[str, str], None]) -> None:
 +        for ln, line in enumerate(s.splitlines()):
 +            line_number = ln + 1
 +            # check if it's a comment
 +            if line.strip().startswith("#"):
 +                continue
 +            # check if there is a '='
 +            if line.find("=") == -1:
 +                self.raise_parse_exception(f"There is even no '='!", line_number)
 +            # split at first '='
 +            key, value = line.split("=", 1)
 +            key = key.strip()
 +            value = value.strip()
 +            f(key, value)
 +
 +    def parse(self, s: str) -> list[tuple[str, str]]:
 +        items = []
 +        self._parse(s, lambda key, value: items.append((key, value)))
 +        return items
 +
 +    def parse_to_dict(self, s: str, /, allow_override: bool = False) -> tuple[dict[str, str], list[tuple[str, str]]]:
 +        d = {}
 +        duplicate = []
 +
 +        def add(key: str, value: str) -> None:
 +            if key in d:
 +                if allow_override:
 +                    duplicate.append((key, d[key]))
 +                    d[key] = value
 +                else:
 +                    self.raise_parse_exception(f"Key '{key}' already exists!", None)
 +            d[key] = value
 +
 +        self._parse(s, add)
 +        return d, duplicate
 diff --git a/tools/cru-py/cru/paths.py b/tools/cru-py/cru/paths.py new file mode 100644 index 0000000..df5042b --- /dev/null +++ b/tools/cru-py/cru/paths.py @@ -0,0 +1,63 @@ +from pathlib import Path +import os + +from .excp import CruException + + +class ApplicationPathError(CruException): +    def __init__(self, message: str, p: str | Path, *args, **kwargs): +        super().__init__(message, *args, path=str(p), **kwargs) + + +def check_parents_dir(p: str | Path, /, must_exist: bool = False) -> bool: +    p = Path(p) if isinstance(p, str) else p +    for p in reversed(p.parents): +        if not p.exists() and not must_exist: +            return False +        if not p.is_dir(): +            raise ApplicationPathError("Parents path should be a dir.", p) +    return True + + +class ApplicationPath: +    def __init__(self, p: str | Path, is_dir: bool) -> None: +        self._path = Path(p) if isinstance(p, str) else p +        self._is_dir = is_dir + +    @property +    def path(self) -> Path: +        return self._path + +    @property +    def is_dir(self) -> bool: +        return self._is_dir + +    def check_parents(self, must_exist: bool = False) -> bool: +        return check_parents_dir(self._path.parent, must_exist) + +    def check_self(self, must_exist: bool = False) -> bool: +        if not self.check_parents(must_exist): +            return False +        if not self.path.exists(): +            if not must_exist: return False +            raise ApplicationPathError("Mot exist.", self.path) +        if self.is_dir: +            if not self.path.is_dir(): +                raise ApplicationPathError("Should be a directory, but not.", self.path) +            else: +                return False +        else: +            if not self.path.is_file(): +                raise ApplicationPathError("Should be a file, but not.", self.path) +            else: +                return False + +    def ensure(self, create_file: bool = False) -> None: +        e = self.check_self(False) +        if not e: +            os.makedirs(self.path.parent, exist_ok=True) +            if self.is_dir: +                os.mkdir(self.path) +            elif create_file: +                with open(self.path, "w") as f: +                    f.write("") diff --git a/tools/cru-py/cru/service/__init__.py b/tools/cru-py/cru/service/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tools/cru-py/cru/service/__init__.py diff --git a/tools/cru-py/cru/service/docker.py b/tools/cru-py/cru/service/docker.py new file mode 100644 index 0000000..42d4a35 --- /dev/null +++ b/tools/cru-py/cru/service/docker.py @@ -0,0 +1,15 @@ +import shutil + + +class DockerController: +    DOCKER_BIN_NAME = "docker" + +    def __init__(self, docker_bin: None | str = None) -> None: +        self._docker_bin = docker_bin + +    @property +    def docker_bin(self) -> str: +        if self._docker_bin is None: +            self._docker_bin = shutil.which(self.DOCKER_BIN_NAME) +        return self._docker_bin + diff --git a/tools/cru-py/cru/service/nginx.py b/tools/cru-py/cru/service/nginx.py new file mode 100644 index 0000000..9298623 --- /dev/null +++ b/tools/cru-py/cru/service/nginx.py @@ -0,0 +1,377 @@ +from typing import Literal, Any, cast, ClassVar +import os +from os.path import join, basename, dirname +import subprocess +import re +import json +import jsonschema + +from crupest.template2 import Template2 +from crupest.tui import Paths, UserFriendlyException, create_dir_if_not_exists, console, Confirm, ensure_dir +from crupest.ui_base import file_name_style + + +def restart_nginx(force=False) -> bool: +    if not force: +        p = subprocess.run(['docker', "container", "ls", +                            "-f", "name=nginx", "-q"], capture_output=True) +        container: str = p.stdout.decode("utf-8") +        if len(container.strip()) == 0: +            return False +    subprocess.run(['docker', 'restart', 'nginx']) +    return True + + +_server_schema_filename = "server.schema.json" + +with open(join(Paths.nginx2_template_dir, _server_schema_filename)) as f: +    server_json_schema = json.load(f) + + +_domain_template_filename = "domain.conf.template" + +NginxSourceFileType = Literal["global", "domain", "http", "https"] + + +class NginxSourceFile: +    def __init__(self, path: str) -> None: +        """ +        path: relative to nginx2_template_dir +        """ +        self._path = path +        is_template = path.endswith(".template") +        self._is_template = is_template +        filename = basename(path) +        self.name = filename[:-len(".template")] if is_template else filename +        if is_template: +            self._template = Template2.from_file( +                join(Paths.nginx2_template_dir, path)) +        else: +            with open(join(Paths.nginx2_template_dir, path)) as f: +                self._content = f.read() + +        self._scope: NginxSourceFileType = self._calc_scope() + +    @property +    def is_template(self) -> bool: +        return self._is_template + +    @property +    def content(self) -> str: +        if self._is_template: +            raise Exception(f"{self._path} is a template file") +        return self._content + +    @property +    def template(self) -> Template2: +        if not self._is_template: +            raise Exception(f"{self._path} is not a template file") +        return cast(Template2, self._template) + +    @property +    def global_target_filename(self) -> str: +        if self.scope != "global": +            raise Exception(f"{self._path} is not a global file") +        if self.is_template: +            return basename(self._path)[:-len(".template")] +        else: +            return basename(self._path) + +    def _calc_scope(self) -> NginxSourceFileType: +        f = basename(self._path) +        d = basename(dirname(self._path)) +        if f == _domain_template_filename: +            return "domain" +        elif d in ["global", "http", "https"]: +            return cast(Literal["global", "http", "https"], d) +        else: +            raise Exception(f"Unknown scope for {self._path}") + +    @property +    def scope(self) -> NginxSourceFileType: +        return self._scope + + +_domain_template_source = NginxSourceFile(_domain_template_filename) + +_client_max_body_size_source = NginxSourceFile( +    "global/client-max-body-size.conf") +_forbid_unknown_domain_source = NginxSourceFile( +    "global/forbid-unknown-domain.conf") +_ssl_template_source = NginxSourceFile("global/ssl.conf.template") +_websocket_source = NginxSourceFile("global/websocket.conf") + +_http_444_source = NginxSourceFile("http/444.segment") +_http_redirect_to_https_source = NginxSourceFile( +    "http/redirect-to-https.segment") + +_https_redirect_template_source = NginxSourceFile( +    "https/redirect.segment.template") +_https_reverse_proxy_template_source = NginxSourceFile( +    "https/reverse-proxy.segment.template") +_https_static_file_template_source = NginxSourceFile( +    "https/static-file.segment.template") +_https_static_file_no_strip_prefix_template_source = NginxSourceFile( +    "https/static-file.no-strip-prefix.segment.template") + + +class NginxService: +    def __init__(self, type: str, path: str) -> None: +        self.type = type +        self.path = path +        self._check_path(path) + +    @staticmethod +    def _check_path(path: str) -> None: +        assert isinstance(path, str) +        if path == "" or path == "/": +            return +        if not path.startswith("/"): +            raise UserFriendlyException("Service path should start with '/'.") +        if path.endswith("/"): +            raise UserFriendlyException( +                "Service path should not end with '/'.") + +    def generate_https_segment(self) -> str: +        raise NotImplementedError() + + +class NginxRedirectService(NginxService): +    def __init__(self, path: str, redirect_url: str, redirect_code: int = 307) -> None: +        if redirect_url.endswith("/"): +            raise UserFriendlyException( +                "Redirect URL should not end with '/'.") + +        super().__init__("redirect", path) + +        self.redirect_url = redirect_url +        self.redirect_code = redirect_code + +    def generate_https_segment(self) -> str: +        vars = { +            "PATH": self.path, +            "REDIRECT_CODE": self.redirect_code, +            "REDIRECT_URL": self.redirect_url +        } +        return _https_redirect_template_source.template.render(vars) + +    @staticmethod +    def from_json(json: dict[str, Any]) -> "NginxRedirectService": +        path = json["path"] +        redirect_url = json["to"] +        redirect_code = json.get("code", 307) +        assert isinstance(path, str) +        assert isinstance(redirect_url, str) +        assert isinstance(redirect_code, int) +        return NginxRedirectService(path, redirect_url, redirect_code) + + +class NginxReverseProxyService(NginxService): + +    _upstream_regex: ClassVar[re.Pattern[str]] = re.compile( +        r"^[-_0-9a-zA-Z]+:[0-9]+$") + +    def __init__(self, path: str, upstream: str) -> None: +        if not self._upstream_regex.match(upstream): +            raise UserFriendlyException( +                f"Invalid upstream format: {upstream}.") + +        super().__init__("reverse-proxy", path) + +        self.upstream = upstream + +    def generate_https_segment(self) -> str: +        vars = { +            "PATH": self.path, +            "UPSTREAM": self.upstream +        } +        return _https_reverse_proxy_template_source.template.render(vars) + +    @staticmethod +    def from_json(json: dict[str, Any]) -> "NginxReverseProxyService": +        path = json["path"] +        upstream = json["upstream"] +        assert isinstance(path, str) +        assert isinstance(upstream, str) +        return NginxReverseProxyService(path, upstream) + + +class NginxStaticFileService(NginxService): +    def __init__(self, path: str, root: str, no_strip_prefix: bool = False) -> None: +        super().__init__("static-file", path) + +        self.root = root +        self.no_strip_prefix = no_strip_prefix + +    def generate_https_segment(self) -> str: +        vars = { +            "PATH": self.path, +            "ROOT": self.root, +        } +        if self.no_strip_prefix: +            return _https_static_file_no_strip_prefix_template_source.template.render(vars) +        else: +            return _https_static_file_template_source.template.render(vars) + +    @staticmethod +    def from_json(json: dict[str, Any]) -> "NginxStaticFileService": +        path = json["path"] +        root = json["root"] +        no_strip_prefix = json.get("no_strip_prefix", False) +        assert isinstance(path, str) +        assert isinstance(root, str) +        assert isinstance(no_strip_prefix, bool) +        return NginxStaticFileService(path, root, no_strip_prefix) + + +def nginx_service_from_json(json: dict[str, Any]) -> NginxService: +    type = json["type"] +    if type == "redirect": +        return NginxRedirectService.from_json(json) +    elif type == "reverse-proxy": +        return NginxReverseProxyService.from_json(json) +    elif type == "static-file": +        return NginxStaticFileService.from_json(json) +    else: +        raise UserFriendlyException(f"Invalid crupest type: {type}.") + + +def _prepend_indent(text: str, indent: str = " " * 4) -> str: +    lines = text.split("\n") +    for i in range(len(lines)): +        if lines[i] != "": +            lines[i] = indent + lines[i] +    return "\n".join(lines) + + +class NginxDomain: +    def __init__(self, domain: str, services: list[NginxService] = []) -> None: +        self.domain = domain +        self.services = services + +    def add_service(self, service: NginxService) -> None: +        self.services.append(service) + +    def generate_http_segment(self) -> str: +        if len(self.services) == 0: +            return _http_444_source.content +        else: +            return _http_redirect_to_https_source.content + +    def generate_https_segment(self) -> str: +        return "\n\n".join([s.generate_https_segment() for s in self.services]) + +    def generate_config(self) -> str: +        vars = { +            "DOMAIN": self.domain, +            "HTTP_SEGMENT": _prepend_indent(self.generate_http_segment()), +            "HTTPS_SEGMENT": _prepend_indent(self.generate_https_segment()), +        } +        return _domain_template_source.template.render(vars) + +    def generate_config_file(self, path: str) -> None: +        with open(path, "w") as f: +            f.write(self.generate_config()) + +    @staticmethod +    def from_json(root_domain: str, json: dict[str, Any]) -> "NginxDomain": +        name = json["name"] +        assert isinstance(name, str) +        if name == "@" or name == "": +            domain = root_domain +        else: +            domain = f"{name}.{root_domain}" +        assert isinstance(json["services"], list) +        services = [nginx_service_from_json(s) for s in json["services"]] +        return NginxDomain(domain, services) + + +def check_nginx_config_schema(json: Any) -> None: +    jsonschema.validate(json, server_json_schema) + + +class NginxServer: +    def __init__(self, root_domain: str) -> None: +        self.root_domain = root_domain +        self.domains: list[NginxDomain] = [] + +    def add_sub_domain(self, sub_domain: str, services: list[NginxService]) -> None: +        if sub_domain == "" or sub_domain == "@": +            domain = self.root_domain +        else: +            domain = f"{sub_domain}.{self.root_domain}" +        self.domains.append(NginxDomain(domain, services)) + +    def generate_ssl(self) -> str: +        return _ssl_template_source.template.render({ +            "ROOT_DOMAIN": self.root_domain +        }) + +    def generate_global_files(self, d: str) -> None: +        for source in [_client_max_body_size_source, _forbid_unknown_domain_source, _websocket_source]: +            with open(join(d, source.name), "w") as f: +                f.write(source.content) +        with open(join(d, _ssl_template_source.name), "w") as f: +            f.write(self.generate_ssl()) + +    def generate_domain_files(self, d: str) -> None: +        for domain in self.domains: +            domain.generate_config_file(join(d, f"{domain.domain}.conf")) + +    def generate_config(self, d: str) -> None: +        create_dir_if_not_exists(d) +        self.generate_global_files(d) + +    def get_allowed_files(self) -> list[str]: +        files = [] +        for source in [_client_max_body_size_source, _forbid_unknown_domain_source, _ssl_template_source, _websocket_source]: +            files.append(source.name) +        for domain in self.domains: +            files.append(f"{domain.domain}.conf") +        return files + +    def check_bad_files(self, d: str) -> list[str]: +        allowed_files = self.get_allowed_files() +        bad_files = [] +        if not ensure_dir(d, must_exist=False): +            return [] +        for path in os.listdir(d): +            if path not in allowed_files: +                bad_files.append(path) +        return bad_files + +    @staticmethod +    def from_json(root_domain: str, json: dict[str, Any]) -> "NginxServer": +        check_nginx_config_schema(json) +        server = NginxServer(root_domain) +        sub_domains = json["domains"] +        assert isinstance(sub_domains, list) +        server.domains = [NginxDomain.from_json( +            root_domain, d) for d in sub_domains] +        return server + +    @staticmethod +    def from_json_str(root_domain: str, json_str: str) -> "NginxServer": +        return NginxServer.from_json(root_domain, json.loads(json_str)) + +    def go(self): +        bad_files = self.check_bad_files(Paths.nginx_generated_dir) +        if len(bad_files) > 0: +            console.print( +                "WARNING: It seems there are some bad conf files in the nginx config directory:", style="yellow") +            for bad_file in bad_files: +                console.print(bad_file, style=file_name_style) +            to_delete = Confirm.ask( +                "They will affect nginx in a [red]bad[/] way. Do you want to delete them?", default=True, console=console) +            if to_delete: +                for file in bad_files: +                    os.remove(join(Paths.nginx_generated_dir, file)) +        create_dir_if_not_exists(Paths.generated_dir) +        if not ensure_dir(Paths.nginx_generated_dir, must_exist=False): +            os.mkdir(Paths.nginx_generated_dir) +            console.print( +                f"Nginx config directory created at [magenta]{Paths.nginx_generated_dir}[/]", style="green") +        self.generate_config(Paths.nginx_generated_dir) +        console.print("Nginx config generated.", style="green") +        if restart_nginx(): +            console.print('Nginx restarted.', style="green") diff --git a/tools/cru-py/cru/system.py b/tools/cru-py/cru/system.py new file mode 100644 index 0000000..2e05cd1 --- /dev/null +++ b/tools/cru-py/cru/system.py @@ -0,0 +1,22 @@ +import re +import os.path + + +def check_debian_derivative_version(name: str) -> None | str: +    if not os.path.isfile("/etc/os-release"): +        return None +    with open("/etc/os-release", "r") as f: +        content = f.read() +        if not f"ID={name}" in content: +            return None +        m = re.search(r'VERSION_ID="(.+)"', content) +        if m is None: return None +        return m.group(1) + + +def check_ubuntu_version() -> None | str: +    return check_debian_derivative_version("ubuntu") + + +def check_debian_version() -> None | str: +    return check_debian_derivative_version("debian") diff --git a/tools/cru-py/cru/value.py b/tools/cru-py/cru/value.py new file mode 100644 index 0000000..cddbde9 --- /dev/null +++ b/tools/cru-py/cru/value.py @@ -0,0 +1,309 @@ +import random +import secrets +import string +import uuid +from abc import abstractmethod, ABCMeta +from collections.abc import Mapping, Callable +from typing import Any, ClassVar, Literal, TypeVar, Generic, ParamSpec + +from .excp import CruInternalLogicError, CruException, CRU_EXCEPTION_ATTR_DEF_REGISTRY + + +def _str_case_in(s: str, case: bool, l: list[str]) -> bool: +    if case: +        return s in l +    else: +        return s.lower() in [s.lower() for s in l] + + +_ValueTypeForward = type["ValueType"] + +T = TypeVar("T") + + +class _ValueErrorMixin: +    VALUE_TYPE_KEY = "value_type" + +    CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_with( +        VALUE_TYPE_KEY, +        "The type of the value that causes the exception." +    ) + + +class ValidationError(CruException, _ValueErrorMixin): +    def __init__(self, message: str, value: Any, value_type: _ValueTypeForward[T] | None, *args, **kwargs): +        super().__init__(message, *args, value=value, type_=value_type.type, init_attrs={ +            ValidationError.VALUE_TYPE_KEY: value_type, +        }, **kwargs) + +    @property +    def value_type(self) -> _ValueTypeForward[T] | None: +        return self[ValidationError.VALUE_TYPE_KEY] + + +class ValueStringConvertionError(CruException, _ValueErrorMixin): +    def __init__(self, message: str, value: Any, value_type: _ValueTypeForward[T] | None, *args, +                 **kwargs): +        super().__init__(message, *args, value=value, type_=value_type.type, init_attrs={ +            ValueStringConvertionError.VALUE_TYPE_KEY: value_type, +        }, **kwargs) + +    @property +    def value_type(self) -> _ValueTypeForward[T] | None: +        return self[ValueStringConvertionError.VALUE_TYPE_KEY] + + +class ValueType(Generic[T], metaclass=ABCMeta): +    @staticmethod +    def case_sensitive_to_str(case_sensitive: bool) -> str: +        return f"case-{'' if case_sensitive else 'in'}sensitive" + +    def __init__(self, name: str) -> None: +        self._name = name +        self._type = type("T") + +    @property +    def name(self) -> str: +        return self._name + +    @property +    def type(self) -> type: +        return self._type + +    def is_instance_of_value_type(self, value: Any) -> bool: +        return isinstance(value, self.type) + +    def _do_check_value(self, value: Any) -> tuple[True, T] | tuple[False, None | str]: +        return True, value + +    def check_value(self, value: Any) -> T: +        if not isinstance(value, self.type): +            raise ValidationError("Value type is wrong.", value, self) +        ok, v_or_err = self._do_check_value(value) +        if ok: +            return v_or_err +        else: +            raise ValidationError(v_or_err or "Value is not valid.", value, self) + +    @abstractmethod +    def _do_check_str_format(self, s: str) -> bool | tuple[bool, str]: +        """ +        Return None for no error. Otherwise, return error message. +        """ +        raise NotImplementedError() + +    def check_str_format(self, s: str) -> None: +        ok, err = self._do_check_str_format(s) +        if ok is None: raise CruInternalLogicError("_do_check_str_format should not return None.") +        if ok: return +        if err is None: +            err = "Invalid value str format." +        raise ValueStringConvertionError(err, s, value_type=self) + +    @abstractmethod +    def _do_convert_value_to_str(self, value: T) -> str: +        raise NotImplementedError() + +    def convert_value_to_str(self, value: T) -> str: +        self.check_value(value) +        return self._do_convert_value_to_str(value) + +    @abstractmethod +    def _do_convert_str_to_value(self, s: str) -> T: +        raise NotImplementedError() + +    def convert_str_to_value(self, s: str) -> T: +        self.check_str_format(s) +        return self._do_convert_str_to_value(s) + +    def check_value_or_try_convert_from_str(self, value_or_str: Any) -> T: +        try: +            return self.check_value(value_or_str) +        except ValidationError as e: +            if isinstance(value_or_str, str): +                return self.convert_str_to_value(value_or_str) +            else: +                raise ValidationError("Value is not valid and is not a str.", value_or_str, self, +                                      inner=e) + + +class TextValueType(ValueType[str]): +    def __init__(self) -> None: +        super().__init__("text") + +    def _do_check_str_format(self, s): +        return True + +    def _do_convert_value_to_str(self, value): +        return value + +    def _do_convert_str_to_value(self, s): +        return s + + +class IntegerValueType(ValueType[int]): + +    def __init__(self) -> None: +        super().__init__("integer") + +    def _do_check_str_format(self, s): +        try: +            int(s) +            return True +        except ValueError: +            return False + +    def _do_convert_value_to_str(self, value): +        return str(value) + +    def _do_convert_str_to_value(self, s): +        return int(s) + + +class FloatValueType(ValueType[float]): +    def __init__(self) -> None: +        super().__init__("float") + +    def _do_check_str_format(self, s): +        try: +            float(s) +            return True +        except ValueError: +            return False + +    def _do_convert_value_to_str(self, value): +        return str(value) + +    def _do_convert_str_to_value(self, s): +        return float(s) + + +class BooleanValueType(ValueType[bool]): +    DEFAULT_TRUE_LIST: ClassVar[list[str]] = ["true", "yes", "y", "on", "1"] +    DEFAULT_FALSE_LIST: ClassVar[list[str]] = ["false", "no", "n", "off", "0"] + +    def __init__(self, *, case_sensitive=False, true_list: None | list[str] = None, +                 false_list: None | list[str] = None) -> None: +        super().__init__("boolean") +        self._case_sensitive = case_sensitive +        self._valid_true_strs: list[str] = true_list or BooleanValueType.DEFAULT_TRUE_LIST +        self._valid_false_strs: list[str] = false_list or BooleanValueType.DEFAULT_FALSE_LIST + +    @property +    def case_sensitive(self) -> bool: +        return self._case_sensitive + +    @property +    def valid_true_strs(self) -> list[str]: +        return self._valid_true_strs + +    @property +    def valid_false_strs(self) -> list[str]: +        return self._valid_false_strs + +    @property +    def valid_boolean_strs(self) -> list[str]: +        return self._valid_true_strs + self._valid_false_strs + +    def _do_check_str_format(self, s): +        if _str_case_in(s, self.case_sensitive, self.valid_boolean_strs): return True +        return False, f"Not a valid boolean string ({ValueType.case_sensitive_to_str(self.case_sensitive)}). Valid string of true: {' '.join(self._valid_true_strs)}. Valid string of false: {' '.join(self._valid_false_strs)}. All is case insensitive." + +    def _do_convert_value_to_str(self, value): +        return "True" if value else "False" + +    def _do_convert_str_to_value(self, s): +        return _str_case_in(s, self.case_sensitive, self._valid_true_strs) + + +class EnumValueType(ValueType[str]): +    def __init__(self, valid_values: list[str], /, case_sensitive=False) -> None: +        s = ' | '.join([f'"{v}"' for v in valid_values]) +        self._valid_value_str = f'[ {s} ]' +        super().__init__(f"enum{self._valid_value_str}") +        self._case_sensitive = case_sensitive +        self._valid_values = valid_values + +    @property +    def case_sensitive(self) -> bool: +        return self._case_sensitive + +    @property +    def valid_values(self) -> list[str]: +        return self._valid_values + +    def _do_check_value(self, value): +        ok, err = self._do_check_str_format(value) +        return ok, (value if ok else err) + +    def _do_check_str_format(self, s): +        if _str_case_in(s, self.case_sensitive, self.valid_values): return True +        return False, f"Value is not in valid values ({ValueType.case_sensitive_to_str(self.case_sensitive)}): {self._valid_value_str}" + +    def _do_convert_value_to_str(self, value): +        return value + +    def _do_convert_str_to_value(self, s): +        return s + + +TEXT_VALUE_TYPE = TextValueType() +INTEGER_VALUE_TYPE = IntegerValueType() +BOOLEAN_VALUE_TYPE = BooleanValueType() + +P = ParamSpec('P') + + +class ValueGenerator(Generic[T, P]): +    INTERACTIVE_KEY: ClassVar[Literal["interactive"]] = "interactive" + +    def __init__(self, f: Callable[P, T], /, attributes: None | Mapping[str, Any] = None) -> None: +        self._f = f +        self._attributes = attributes or {} + +    @property +    def f(self) -> Callable[P, T]: +        return self._f + +    @property +    def attributes(self) -> Mapping[str, Any]: +        return self._attributes + +    def generate(self, *args, **kwargs) -> T: +        return self._f(*args, **kwargs) + +    def __call__(self, *args, **kwargs): +        return self._f(*args, **kwargs) + +    @property +    def interactive(self) -> bool: +        return self._attributes.get(ValueGenerator.INTERACTIVE_KEY, False) + +    @staticmethod +    def create_interactive(f: Callable[P, T], interactive: bool = True, /, +                           attributes: None | Mapping[str, Any] = None) -> "ValueGenerator[T, P]": +        return ValueGenerator(f, dict({ValueGenerator.INTERACTIVE_KEY: interactive}, **(attributes or {}))) + + +class UuidValueGenerator(ValueGenerator[str, []]): +    def __init__(self) -> None: +        super().__init__(lambda: str(uuid.uuid4())) + + +class RandomStringValueGenerator(ValueGenerator[str, []]): +    @staticmethod +    def _create_generate_ramdom_func(length: int, secure: bool) -> Callable[str, []]: +        random_choice = secrets.choice if secure else random.choice + +        def generate_random_string(): +            characters = string.ascii_letters + string.digits +            random_string = ''.join(random_choice(characters) for _ in range(length)) +            return random_string + +        return generate_random_string + +    def __init__(self, length: int, secure: bool) -> None: +        super().__init__(RandomStringValueGenerator._create_generate_ramdom_func(length, secure)) + + +UUID_VALUE_GENERATOR = UuidValueGenerator() diff --git a/tools/cru-py/crupest/__init__.py b/tools/cru-py/crupest/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tools/cru-py/crupest/__init__.py diff --git a/tools/cru-py/crupest/__main__.py b/tools/cru-py/crupest/__main__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tools/cru-py/crupest/__main__.py diff --git a/tools/aio/modules/backup.py b/tools/cru-py/crupest/backup.py index 7921d0d..7921d0d 100644 --- a/tools/aio/modules/backup.py +++ b/tools/cru-py/crupest/backup.py diff --git a/tools/cru-py/crupest/certbot.py b/tools/cru-py/crupest/certbot.py new file mode 100644 index 0000000..8c89fa7 --- /dev/null +++ b/tools/cru-py/crupest/certbot.py @@ -0,0 +1,119 @@ +from typing import Literal, cast +import os +from os.path import join +import subprocess +from cryptography.x509 import load_pem_x509_certificate, DNSName, SubjectAlternativeName +from cryptography.x509.oid import ExtensionOID +from .tui import Paths, ensure_file, create_dir_if_not_exists, console + +CertbotAction = Literal['create', 'expand', 'shrink', 'renew'] + + +class Certbot: +    def __init__(self, root_domain: str, subdomains: list[str]) -> None: +        """ +        subdomain: like ["a", "b.c", ...] +        """ +        self.root_domain = root_domain +        self.subdomains = subdomains +        self.domains = [ +            root_domain, *[f"{subdomain}.{root_domain}" for subdomain in subdomains]] + +    def generate_command(self, action: CertbotAction, /, test=False, no_docker=False, *, standalone=None, email=None, agree_tos=False) -> str: +        add_domain_option = True +        if action == 'create': +            if standalone == None: +                standalone = True +            certbot_action = "certonly" +        elif action == 'expand' or action == 'shrink': +            if standalone == None: +                standalone = False +            certbot_action = "certonly" +        elif action == 'renew': +            if standalone == None: +                standalone = False +            add_domain_option = False +            certbot_action = "renew" +        else: +            raise ValueError('Invalid action') + +        if no_docker: +            command = "certbot " +        else: +            expose_segment = ' -p "0.0.0.0:80:80"' +            web_root_segment = f' -v "{Paths.project_abs_path}/data/certbot/webroot:/var/www/certbot"' +            command = f'docker run -it --rm --name certbot -v "{Paths.project_abs_path}/data/certbot/certs:/etc/letsencrypt" -v "{Paths.project_abs_path}/data/certbot/data:/var/lib/letsencrypt"{ expose_segment if  standalone else web_root_segment} certbot/certbot ' + +        command += certbot_action + +        if standalone: +            command += " --standalone" +        else: +            command += ' --webroot -w /var/www/certbot' + +        if add_domain_option: +            command += f' -d {" -d ".join(self.domains)}' + +        if email is not None: +            command += f' --email {email}' + +        if agree_tos: +            command += ' --agree-tos' + +        if test: +            command += " --test-cert --dry-run" + +        return command + +    def get_cert_path(self) -> str: +        return join(Paths.data_dir, "certbot", "certs", "live", self.root_domain, "fullchain.pem") + +    def get_cert_actual_domains(self, cert_path: str | None = None) -> None | list[str]: +        if cert_path is None: +            cert_path = self.get_cert_path() + +        if not ensure_file(cert_path): +            return None + +        with open(cert_path, 'rb') as f: +            cert = load_pem_x509_certificate(f.read()) +            ext = cert.extensions.get_extension_for_oid( +                ExtensionOID.SUBJECT_ALTERNATIVE_NAME) +            domains: list[str] = cast( +                SubjectAlternativeName, ext.value).get_values_for_type(DNSName) + +            # This weird code is to make sure the root domain is the first one +            if self.root_domain in domains: +                domains.remove(self.root_domain) +                domains = [self.root_domain, *domains] + +            return domains + +    def print_create_cert_message(self): +        console.print( +            "Looks like you haven't run certbot to get the init ssl certificates. You may want to run following code to get one:", style="cyan") +        console.print(self.generate_command("create"), +                      soft_wrap=True, highlight=False) + +    def check_ssl_cert(self, tmp_dir: str = Paths.tmp_dir): +        cert_path = self.get_cert_path() +        tmp_cert_path = join(tmp_dir, "fullchain.pem") +        console.print("Temporarily copy cert to tmp...", style="yellow") +        create_dir_if_not_exists(tmp_dir) +        subprocess.run( +            ["sudo", "cp", cert_path, tmp_cert_path], check=True) +        subprocess.run(["sudo", "chown", str( +            os.geteuid()), tmp_cert_path], check=True) +        cert_domains = self.get_cert_actual_domains(tmp_cert_path) +        if cert_domains is None: +            self.print_create_cert_message() +        else: +            cert_domain_set = set(cert_domains) +            domains = set(self.domains) +            if not cert_domain_set == domains: +                console.print( +                    "Cert domains are not equal to host domains. Run following command to recreate it with nginx stopped.", style="red") +                console.print(self.generate_command( +                    "create", standalone=True), soft_wrap=True, highlight=False) +            console.print("Remove tmp cert...", style="yellow") +            os.remove(tmp_cert_path) diff --git a/tools/aio/modules/config.py b/tools/cru-py/crupest/config.py index 4faa8a3..7a63e2a 100644 --- a/tools/aio/modules/config.py +++ b/tools/cru-py/crupest/config.py @@ -3,23 +3,36 @@ import typing  import uuid  import random  import string +from dataclasses import dataclass +  from rich.prompt import Prompt + +from cru.config import Configuration +from cru.parsing import SimpleLineConfigParser  from .path import config_file_path -def generate_uuid(): -    return str(uuid.uuid4()) -# generate random characters of digits and alphabets -def generate_random_string(length: int): -    characters = string.ascii_letters + string.digits -    random_string = ''.join(random.choice(characters) for _ in range(length)) -    return random_string +@dataclass +class ConfigurationMigrationInfo: +    duplicate_item_in_old_config: list[str] +    item + + +class OldConfiguration: +    def __init__(self, items: None | dict[str, str] = None) -> None: +        self._items = items or {} + +    @staticmethod +    def load_from_str(s: str) -> tuple["OldConfiguration", list[str, str]]: +        d, duplicate = SimpleLineConfigParser().parse_to_dict(s, True) +        return OldConfiguration(d), duplicate + +    def convert_to_new_config(self) -> Configuration: -def generate_random_string_32(): -    return generate_random_string(32)  class ConfigVar: -    def __init__(self, name: str, description: str, default_value_generator: typing.Callable[[], str] | str, /, default_value_for_ask=str | None): +    def __init__(self, name: str, description: str, default_value_generator: typing.Callable[[], str] | str, /, +                 default_value_for_ask=str | None):          """Create a config var.          Args: @@ -45,17 +58,22 @@ config_var_list: list = [      ConfigVar("CRUPEST_EMAIL", "admin email address",                "Please input your email address"),      ConfigVar("CRUPEST_AUTO_BACKUP_COS_SECRET_ID", -              "access key id for Tencent COS, used for auto backup", "Please input your Tencent COS access key id for backup"), +              "access key id for Tencent COS, used for auto backup", +              "Please input your Tencent COS access key id for backup"),      ConfigVar("CRUPEST_AUTO_BACKUP_COS_SECRET_KEY", -              "access key secret for Tencent COS, used for auto backup", "Please input your Tencent COS access key for backup"), +              "access key secret for Tencent COS, used for auto backup", +              "Please input your Tencent COS access key for backup"),      ConfigVar("CRUPEST_AUTO_BACKUP_COS_REGION", -              "region for Tencent COS, used for auto backup", "Please input your Tencent COS region for backup", "ap-hongkong"), +              "region for Tencent COS, used for auto backup", "Please input your Tencent COS region for backup", +              "ap-hongkong"),      ConfigVar("CRUPEST_AUTO_BACKUP_BUCKET_NAME", -              "bucket name for Tencent COS, used for auto backup", "Please input your Tencent COS bucket name for backup"), +              "bucket name for Tencent COS, used for auto backup", +              "Please input your Tencent COS bucket name for backup"),      ConfigVar("CRUPEST_GITHUB_USERNAME",                "github username for fetching todos", "Please input your github username for fetching todos", "crupest"),      ConfigVar("CRUPEST_GITHUB_PROJECT_NUMBER", -              "github project number for fetching todos", "Please input your github project number for fetching todos", "2"), +              "github project number for fetching todos", "Please input your github project number for fetching todos", +              "2"),      ConfigVar("CRUPEST_GITHUB_TOKEN",                "github token for fetching todos", "Please input your github token for fetching todos"),      ConfigVar("CRUPEST_GITHUB_TODO_COUNT", @@ -81,7 +99,7 @@ config_var_list: list = [  config_var_name_set = set([config_var.name for config_var in config_var_list]) -def check_config_var_set(needed_config_var_set: set): +def check_config_var_set(needed_config_var_set: set[str]) -> tuple[bool, list[str], list[str]]:      more = []      less = []      for var_name in needed_config_var_set: @@ -94,35 +112,17 @@ def check_config_var_set(needed_config_var_set: set):  def config_file_exists(): -    return os.path.isfile(config_file_path) - - -def parse_config(str: str) -> dict: -    config = {} -    for line_number, line in enumerate(str.splitlines()): -        # check if it's a comment -        if line.startswith("#"): -            continue -        # check if there is a '=' -        if line.find("=") == -1: -            raise ValueError( -                f"Invalid config string. Please check line {line_number + 1}. There is even no '='!") -        # split at first '=' -        key, value = line.split("=", 1) -        key = key.strip() -        value = value.strip() -        config[key] = value -    return config +    return ensure_file(Paths.config_file_path, must_exist=False) + + +def parse_config(str: str) -> dict[str, str]: +    return ConfigMap().load_from_str(str).to_dict()  def get_domain() -> str: -    if not config_file_exists(): +    if configuration is None:          raise ValueError("Config file not found!") -    with open(config_file_path) as f: -        config = parse_config(f.read()) -    if "CRUPEST_DOMAIN" not in config: -        raise ValueError("Domain not found in config file!") -    return config["CRUPEST_DOMAIN"] +    return configuration.get_domain()  def config_to_str(config: dict) -> str: diff --git a/tools/aio/modules/dns.py b/tools/cru-py/crupest/dns.py index 5006d5f..5006d5f 100644 --- a/tools/aio/modules/dns.py +++ b/tools/cru-py/crupest/dns.py diff --git a/tools/aio/modules/download_tools.py b/tools/cru-py/crupest/download_tools.py index beb06d4..beb06d4 100644 --- a/tools/aio/modules/download_tools.py +++ b/tools/cru-py/crupest/download_tools.py diff --git a/tools/aio/modules/helper.py b/tools/cru-py/crupest/helper.py index f8fe34a..f8fe34a 100644 --- a/tools/aio/modules/helper.py +++ b/tools/cru-py/crupest/helper.py diff --git a/tools/aio/modules/install_docker.py b/tools/cru-py/crupest/install_docker.py index ac50290..ac50290 100644 --- a/tools/aio/modules/install_docker.py +++ b/tools/cru-py/crupest/install_docker.py diff --git a/tools/aio/modules/nginx.py b/tools/cru-py/crupest/nginx.py index f69c5df..1ec5c6b 100755..100644 --- a/tools/aio/modules/nginx.py +++ b/tools/cru-py/crupest/nginx.py @@ -1,5 +1,4 @@ -#!/usr/bin/env python3 - +from typing import cast  import json  import jsonschema  import os @@ -210,7 +209,8 @@ def get_cert_domains(cert_path, root_domain):          cert = load_pem_x509_certificate(f.read())          ext = cert.extensions.get_extension_for_oid(              ExtensionOID.SUBJECT_ALTERNATIVE_NAME) -        domains: list = ext.value.get_values_for_type(DNSName) +        domains: list[str] = cast( +                SubjectAlternativeName, ext.value).get_values_for_type(DNSName)          domains.remove(root_domain)          domains = [root_domain, *domains]          return domains @@ -227,7 +227,6 @@ def check_ssl_cert(domain, console):      cert_path = get_cert_path(domain)      tmp_cert_path = join(tmp_dir, "fullchain.pem")      console.print("Temporarily copy cert to tmp...", style="yellow") -    ensure_tmp_dir()      subprocess.run(          ["sudo", "cp", cert_path, tmp_cert_path], check=True)      subprocess.run(["sudo", "chown", str(os.geteuid()), diff --git a/tools/cru-py/crupest/path.py b/tools/cru-py/crupest/path.py new file mode 100644 index 0000000..0cfcfb8 --- /dev/null +++ b/tools/cru-py/crupest/path.py @@ -0,0 +1,57 @@ +import os +import os.path + +script_dir = os.path.relpath(os.path.dirname(__file__)) +project_dir = os.path.normpath(os.path.join(script_dir, "../../../")) +project_abs_path = os.path.abspath(project_dir) +template_dir = os.path.join(project_dir, "template") +nginx_template_dir = os.path.join(template_dir, "nginx") +data_dir = os.path.join(project_dir, "data") +tool_dir = os.path.join(project_dir, "tools") +tmp_dir = os.path.join(project_dir, "tmp") +backup_dir = os.path.join(project_dir, "backup") +config_file_path = os.path.join(data_dir, "config") +nginx_config_dir = os.path.join(project_dir, "nginx-config") +log_dir = os.path.join(project_dir, "log") + + +def ensure_file(path: str, /, must_exist: bool = True) -> bool: +    if must_exist and not os.path.exists(path): +        raise Exception(f"File {path} does not exist!") +    if not os.path.exists(path): +        return False +    if not os.path.isfile(path): +        raise Exception(f"{path} is not a file!") +    return True + + +def ensure_dir(path: str, /, must_exist: bool = True) -> bool: +    if must_exist and not os.path.exists(path): +        raise Exception(f"Directory {path} does not exist!") +    if not os.path.exists(path): +        return False +    if not os.path.isdir(path): +        raise Exception(f"{path} is not a directory!") +    return True + + +class Paths: +    script_dir = os.path.relpath(os.path.dirname(__file__)) +    project_dir = os.path.normpath(os.path.join(script_dir, "../../")) +    project_abs_path = os.path.abspath(project_dir) +    data_dir = os.path.join(project_dir, "data") +    config_file_path = os.path.join(data_dir, "config") +    template_dir = os.path.join(project_dir, "template") +    tool_dir = os.path.join(project_dir, "tool") +    tmp_dir = os.path.join(project_dir, "tmp") +    backup_dir = os.path.join(project_dir, "backup") +    log_dir = os.path.join(project_dir, "log") +    template2_dir = os.path.join(project_dir, "template2") +    nginx2_template_dir = os.path.join(template2_dir, "nginx") +    generated_dir = os.path.join(project_dir, "generated") +    nginx_generated_dir = os.path.join(generated_dir, "nginx") + + +def create_dir_if_not_exists(path: str) -> None: +    if not ensure_dir(path, must_exist=False): +        os.mkdir(path) diff --git a/tools/aio/modules/setup.py b/tools/cru-py/crupest/setup.py index 4e91302..4e91302 100644 --- a/tools/aio/modules/setup.py +++ b/tools/cru-py/crupest/setup.py diff --git a/tools/aio/modules/template.py b/tools/cru-py/crupest/template.py index 9747af1..9747af1 100644 --- a/tools/aio/modules/template.py +++ b/tools/cru-py/crupest/template.py diff --git a/tools/cru-py/crupest/template2.py b/tools/cru-py/crupest/template2.py new file mode 100644 index 0000000..ae096df --- /dev/null +++ b/tools/cru-py/crupest/template2.py @@ -0,0 +1,45 @@ +import os.path +import re + +_template_filename_suffix = ".template" +_template_var_regex = r"\$([-_a-zA-Z0-9]+)" +_template_var_brace_regex = r"\$\{\s*([-_a-zA-Z0-9]+?)\s*\}" + + +class Template2: + +    @staticmethod +    def from_file(template_path: str) -> "Template2": +        if not template_path.endswith(_template_filename_suffix): +            raise Exception( +                "Template file must have a name ending with .template.") +        template_name = os.path.basename( +            template_path)[:-len(_template_filename_suffix)] +        with open(template_path, "r") as f: +            template = f.read() +        return Template2(template_name, template, template_path=template_path) + +    def __init__(self, template_name: str, template: str, *, template_path: str | None = None) -> None: +        self.template_name = template_name +        self.template = template +        self.template_path = template_path +        self.var_set = set() +        for match in re.finditer(_template_var_regex, self.template): +            self.var_set.add(match.group(1)) +        for match in re.finditer(_template_var_brace_regex, self.template): +            self.var_set.add(match.group(1)) + +    def partial_render(self, vars: dict[str, str]) -> "Template2": +        t = self.render(vars) +        return Template2(self.template_name, t, template_path=self.template_path) + +    def render(self, vars: dict[str, str]) -> str: +        for name in vars.keys(): +            if name not in self.var_set: +                raise ValueError(f"Invalid var name {name}.") + +        text = self.template +        for name, value in vars.items(): +            text = text.replace("$" + name, value) +            text = re.sub(r"\$\{\s*" + name + r"\s*\}", value, text) +        return text diff --git a/tools/aio/modules/test.py b/tools/cru-py/crupest/test.py index d6eb778..d6eb778 100644 --- a/tools/aio/modules/test.py +++ b/tools/cru-py/crupest/test.py diff --git a/tools/cru-py/crupest/tui.py b/tools/cru-py/crupest/tui.py new file mode 100644 index 0000000..20ba1dd --- /dev/null +++ b/tools/cru-py/crupest/tui.py @@ -0,0 +1,7 @@ +from rich.console import Console +from rich.prompt import Prompt, Confirm + +Prompt = Prompt +Confirm = Confirm + +console = Console() diff --git a/tools/cru-py/crupest/ui_base.py b/tools/cru-py/crupest/ui_base.py new file mode 100644 index 0000000..b26e65b --- /dev/null +++ b/tools/cru-py/crupest/ui_base.py @@ -0,0 +1,19 @@ +from .tui import console + +good_style = "green" +warning_style = "yellow" +error_style = "red bold" +file_name_style = "cyan bold" +var_style = "magenta bold" +value_style = "cyan bold" +bye_style = "cyan" + + +def print_with_indent(value: str, style: str,  /, indent: int = 0, *, indent_width: int = 2, end='\n'): +    console.print( +        f'{" " * indent * indent_width}[{style}]{value}[/]', end=end) + + +def print_var_value(name: str, value: str, /, indent: int = 0, *, indent_width: int = 2, end='\n'): +    console.print( +        f'{" " * indent * indent_width}[{var_style}]{name}[/] = [{value_style}]{value}[/]', end=end) diff --git a/tools/aio/requirements.txt b/tools/cru-py/requirements.txt index 2fb5657..2fb5657 100644 --- a/tools/aio/requirements.txt +++ b/tools/cru-py/requirements.txt diff --git a/tools/aio/update-blog b/tools/cru-py/update-blog index e4a25ab..e4a25ab 100755..100644 --- a/tools/aio/update-blog +++ b/tools/cru-py/update-blog diff --git a/tools/aio/www-dev b/tools/cru-py/www-dev index f56d679..f56d679 100755..100644 --- a/tools/aio/www-dev +++ b/tools/cru-py/www-dev | 
