aboutsummaryrefslogtreecommitdiff
path: root/tools/cru-py
diff options
context:
space:
mode:
authorcrupest <crupest@outlook.com>2023-05-31 22:56:15 +0800
committerYuqian Yang <crupest@crupest.life>2024-12-18 18:31:27 +0800
commit723c9a963a96b25a7498f3e0417307e89c8bb684 (patch)
tree3eff901a5b96eb4ff88d272bed4bb964c6397f1f /tools/cru-py
parent6e60bb06bd7a5052a7d6c2b5b8df0ab084697fdd (diff)
downloadcrupest-723c9a963a96b25a7498f3e0417307e89c8bb684.tar.gz
crupest-723c9a963a96b25a7498f3e0417307e89c8bb684.tar.bz2
crupest-723c9a963a96b25a7498f3e0417307e89c8bb684.zip
HALF WORK: for sync.
Diffstat (limited to 'tools/cru-py')
-rw-r--r--tools/cru-py/.gitignore4
-rw-r--r--tools/cru-py/.python-version1
-rw-r--r--tools/cru-py/__init__.py0
-rw-r--r--tools/cru-py/aio2
-rw-r--r--tools/cru-py/aio.py319
-rw-r--r--tools/cru-py/cru/__init__.py12
-rw-r--r--tools/cru-py/cru/attr.py125
-rw-r--r--tools/cru-py/cru/config.py128
-rw-r--r--tools/cru-py/cru/excp.py137
-rw-r--r--tools/cru-py/cru/parsing.py70
-rw-r--r--tools/cru-py/cru/paths.py63
-rw-r--r--tools/cru-py/cru/service/__init__.py0
-rw-r--r--tools/cru-py/cru/service/docker.py15
-rw-r--r--tools/cru-py/cru/service/nginx.py377
-rw-r--r--tools/cru-py/cru/system.py22
-rw-r--r--tools/cru-py/cru/value.py309
-rw-r--r--tools/cru-py/crupest/__init__.py0
-rw-r--r--tools/cru-py/crupest/__main__.py0
-rw-r--r--tools/cru-py/crupest/backup.py41
-rw-r--r--tools/cru-py/crupest/certbot.py119
-rw-r--r--tools/cru-py/crupest/config.py134
-rw-r--r--tools/cru-py/crupest/dns.py42
-rw-r--r--tools/cru-py/crupest/download_tools.py47
-rw-r--r--tools/cru-py/crupest/helper.py18
-rw-r--r--tools/cru-py/crupest/install_docker.py16
-rw-r--r--tools/cru-py/crupest/nginx.py246
-rw-r--r--tools/cru-py/crupest/path.py57
-rw-r--r--tools/cru-py/crupest/setup.py233
-rw-r--r--tools/cru-py/crupest/template.py32
-rw-r--r--tools/cru-py/crupest/template2.py45
-rw-r--r--tools/cru-py/crupest/test.py31
-rw-r--r--tools/cru-py/crupest/tui.py7
-rw-r--r--tools/cru-py/crupest/ui_base.py19
-rw-r--r--tools/cru-py/requirements.txt3
-rw-r--r--tools/cru-py/update-blog2
-rw-r--r--tools/cru-py/www-dev8
36 files changed, 2684 insertions, 0 deletions
diff --git a/tools/cru-py/.gitignore b/tools/cru-py/.gitignore
new file mode 100644
index 0000000..259058c
--- /dev/null
+++ b/tools/cru-py/.gitignore
@@ -0,0 +1,4 @@
+.idea
+venv
+
+__pycache__
diff --git a/tools/cru-py/.python-version b/tools/cru-py/.python-version
new file mode 100644
index 0000000..37504c5
--- /dev/null
+++ b/tools/cru-py/.python-version
@@ -0,0 +1 @@
+3.11
diff --git a/tools/cru-py/__init__.py b/tools/cru-py/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tools/cru-py/__init__.py
diff --git a/tools/cru-py/aio b/tools/cru-py/aio
new file mode 100644
index 0000000..f74877a
--- /dev/null
+++ b/tools/cru-py/aio
@@ -0,0 +1,2 @@
+#! /usr/bin/env sh
+exec python3 "$(dirname "$0")/aio.py" "$@"
diff --git a/tools/cru-py/aio.py b/tools/cru-py/aio.py
new file mode 100644
index 0000000..d5386f1
--- /dev/null
+++ b/tools/cru-py/aio.py
@@ -0,0 +1,319 @@
+#!/usr/bin/env python3
+
+try:
+ import rich
+ import jsonschema
+ import cryptography
+except ImportError:
+ print("Some necessary crupest can't be imported. Please run `pip install -r requirements.txt` to install them.")
+ exit(1)
+
+from os.path import *
+import argparse
+import subprocess
+from rich.prompt import Confirm
+from crupest.install_docker import *
+from crupest.path import *
+from crupest.nginx import *
+from crupest.config import *
+from crupest.check import *
+from crupest.backup import *
+from crupest.download_tools import *
+from crupest.test import *
+from crupest.dns import *
+from crupest.setup import *
+
+from crupest.tui import console
+
+
+parser = argparse.ArgumentParser(
+ description="Crupest server all-in-one setup script. Have fun play with it!")
+parser.add_argument("--no-hello", action="store_true",
+ default=False, help="Do not print hello message.")
+parser.add_argument("--no-bye-bye", action="store_true",
+ default=False, help="Do not print bye-bye message.")
+
+parser.add_argument("--no-check-python-version", action="store_true",
+ default=False, help="Do not check python version.")
+parser.add_argument("--no-check-system", action="store_true",
+ default=False, help="Do not check system type.")
+parser.add_argument("-y", "--yes", action="store_true",
+ default=False, help="Yes to all confirmation.")
+
+subparsers = parser.add_subparsers(dest="action")
+
+setup_parser = subparsers.add_parser(
+ "setup", help="Do everything necessary to setup the server.")
+
+print_path_parser = subparsers.add_parser(
+ "print-path", help="Print the paths of all related files and dirs.")
+
+download_tools_parser = subparsers.add_parser(
+ "download-tools", help="Download some extra tools to manage the server.")
+
+list_domain_parser = subparsers.add_parser(
+ "list-domain", help="Misc things about domains.")
+
+nginx_parser = subparsers.add_parser(
+ "nginx", help="Generate nginx config.")
+
+certbot_parser = subparsers.add_parser(
+ "certbot", help="Get some common certbot commands.")
+
+certbot_command_group = certbot_parser.add_mutually_exclusive_group()
+
+certbot_command_group.add_argument(
+ "-C", "--create", action="store_true", default=False, help="Only print the command for 'create' action.")
+certbot_command_group.add_argument(
+ "-E", "--expand", action="store_true", default=False, help="Only print the command for 'expand' action.")
+certbot_command_group.add_argument(
+ "-R", "--renew", action="store_true", default=False, help="Only print the command for 'renew' action.")
+
+certbot_parser.add_argument(
+ "-t", "--test", action="store_true", default=False, help="Make the commands for test use.")
+
+clear_parser = subparsers.add_parser(
+ "clear", help="Delete existing data so you can make a fresh start.")
+clear_parser.add_argument("-D", "--include-data-dir", action="store_true",
+ default=False, help="Also delete the data directory.")
+
+install_docker_parser = subparsers.add_parser(
+ "install-docker", help="Install docker and docker-compose.")
+
+backup_parser = subparsers.add_parser(
+ "backup", help="Backup related things."
+)
+
+backup_subparsers = backup_parser.add_subparsers(dest="backup_action")
+backup_restore_parser = backup_subparsers.add_parser(
+ "restore", help="Restore data from url.")
+backup_restore_parser.add_argument(
+ "restore_url", help="Restore archive url. Can be local path or http/https.")
+backup_backup_parser = backup_subparsers.add_parser(
+ "backup", help="Backup data to specified path.")
+backup_backup_parser.add_argument(
+ "backup_path", nargs="?", help="Backup path. Can be empty for a timestamp as name. Must be local path.")
+
+docker_parser = subparsers.add_parser("docker", help="Docker related things.")
+docker_subparsers = docker_parser.add_subparsers(dest="docker_action")
+docker_subparsers.add_parser("up", help="Run docker compose up -d.")
+docker_subparsers.add_parser("down", help="Run docker compose down.")
+docker_subparsers.add_parser(
+ "prune", help="Run docker system prune -a -f.")
+
+test_parser = subparsers.add_parser("test", help="Test things.")
+test_parser.add_argument(
+ "test_action", help="Test action.", choices=["crupest-api"])
+
+dns_parser = subparsers.add_parser("dns", help="Generate dns zone.")
+
+dns_parser.add_argument("-i", "--ip", help="IP address of the server.")
+
+git_update_parser = subparsers.add_parser(
+ "git-update", help="Update git submodules.")
+
+update_blog_parser = subparsers.add_parser(
+ "update-blog", help="Update and regenerate blog.")
+
+up_parser = subparsers.add_parser(
+ "up", help="Do something necessary and then docker compose up.")
+
+down_parser = subparsers.add_parser(
+ "down", help="Do something necessary and then docker compose down.")
+
+args = parser.parse_args()
+
+if args.yes:
+ old_ask = Confirm.ask
+
+ def new_ask(prompt, *args, console=console, default=None, **kwargs):
+ default_text = ""
+ if default is not None:
+ default_text = "(y)" if default else "(n)"
+ text = f"[prompt]{prompt}[/] [prompt.choices]\\[y/n][/] [prompt.default]{default_text}[/]"
+ console.print(text)
+ return True
+
+ Confirm.ask = new_ask
+
+if (args.action == "certbot" and (args.create or args.renew or args.expand)) or (args.action == "dns" and args.ip is not None):
+ args.no_hello = True
+ args.no_bye_bye = True
+
+
+if not args.no_check_python_version:
+ if check_python_version():
+ console.print("This script works well on python 3.10. Otherwise you may encounter some problems. But I would like to improve some rational compatibility.", style="yellow")
+
+if not args.no_check_system:
+ if not check_ubuntu():
+ console.print("This script works well on Ubuntu 22.04. Otherwise you may encounter some problems. But I would like to improve some rational compatibility.", style="yellow")
+
+
+if not args.no_hello:
+ console.print("Nice to see you! :waving_hand:", style="cyan")
+
+
+def check_domain_is_defined():
+ try:
+ return get_domain()
+ except Exception as e:
+ console.print(e.args[0], style="red")
+ raise e
+
+
+def git_update():
+ def do_it():
+ subprocess.run(["git", "pull"], check=True)
+ run_in_project_dir(do_it)
+
+
+def update_blog():
+ def do_it():
+ subprocess.run(["docker", "compose", "exec",
+ "crupest-blog", "/scripts/update.bash"], check=True)
+ run_in_project_dir(do_it)
+
+
+def docker_compose_up():
+ def do_docker_compose_up():
+ subprocess.run(["docker", "compose", "up", "-d"], check=True)
+ run_in_dir(project_abs_path, do_docker_compose_up)
+
+
+def docker_compose_down():
+ def do_docker_compose_down():
+ subprocess.run(
+ ["docker", "compose", "down"], check=True)
+ run_in_dir(project_abs_path, do_docker_compose_down)
+
+
+action = args.action
+
+
+def run():
+ match action:
+ case "install-docker":
+ install_docker()
+ console.print(
+ "Succeeded to install docker. Please re-login to take effect.", style="green")
+
+ case "docker":
+ docker_action = args.docker_action
+
+ match docker_action:
+ case "up":
+ docker_compose_up()
+ case "down":
+ docker_compose_down()
+ case "prune":
+ to_do = Confirm.ask(
+ "[yellow]Are you sure to prune docker?[/]", console=console)
+ if to_do:
+ subprocess.run(
+ ["docker", "system", "prune", "-a", "-f"], check=True)
+ case _:
+ raise ValueError("Unknown docker action.")
+
+ case "backup":
+ backup_action = args.backup_action
+ match backup_action:
+ case "backup":
+ backup_backup(args.backup_path, console)
+ console.print("Succeeded to restore data.", style="green")
+ case "restore":
+ backup_restore(args.restore_path, console)
+ console.print("Succeeded to backup data.", style="green")
+
+ case 'print-path':
+ console.print("Project path =", project_dir)
+ console.print("Project absolute path =", project_abs_path)
+ console.print("Data path =", data_dir)
+
+ case "download-tools":
+ download_tools(console)
+
+ case "list-domain":
+ domain = check_domain_is_defined()
+ domains = list_domains(domain)
+ for domain in domains:
+ console.print(domain)
+
+ case "nginx":
+ raise Exception("This command is deprecated.")
+
+ case "certbot":
+ domain = check_domain_is_defined()
+ is_test = args.test
+ if args.create:
+ console.print(certbot_command_gen(domain, "create",
+ test=is_test), soft_wrap=True, highlight=False)
+ elif args.expand:
+ console.print(certbot_command_gen(domain, "expand",
+ test=is_test), soft_wrap=True, highlight=False)
+ elif args.renew:
+ console.print(certbot_command_gen(domain, "renew",
+ test=is_test), soft_wrap=True, highlight=False)
+ else:
+ console.print(
+ "Here is some commands you can use to do certbot related work.")
+ if is_test:
+ console.print(
+ "Note you specified --test, so the commands are for test use.", style="yellow")
+ console.print(
+ "To create certs for init (standalone):", style="cyan")
+ console.print(certbot_command_gen(
+ domain, 'create', test=is_test), soft_wrap=True)
+ console.print("To expand certs (nginx):", style="cyan")
+ console.print(certbot_command_gen(
+ domain, 'create', test=is_test), soft_wrap=True)
+ console.print(
+ "To renew certs previously created (nginx):", style="cyan")
+ console.print(certbot_command_gen(
+ domain, 'renew', test=is_test), soft_wrap=True)
+ case "test":
+ match args.test_action:
+ case "crupest-api":
+ test_crupest_api(console)
+ case _:
+ console.print("Test action invalid.", style="red")
+
+ case "dns":
+ domain = check_domain_is_defined()
+ if domain is not None:
+ if args.ip is None:
+ ip = Prompt.ask(
+ "Please enter your server ip", console=console)
+ else:
+ ip = args.ip
+ console.print(generate_dns_zone_with_dkim(
+ domain, ip), soft_wrap=True, highlight=False)
+
+ case "git-update":
+ git_update()
+
+ case "update-blog":
+ update_blog()
+
+ case "up":
+ git_update()
+ template_generate(console)
+ docker_compose_up()
+
+ case "down":
+ docker_compose_down()
+
+ case "clear":
+ clear(console, args.include_data_dir)
+
+ case _:
+ template_generate(console)
+ if Confirm.ask(
+ "By the way, would you like to download some scripts to do some extra setup like creating email user?", console=console, default=True):
+ download_tools(console)
+
+
+run()
+
+if not args.no_bye_bye:
+ console.print(":beers: All done! Bye bye!", style="green")
diff --git a/tools/cru-py/cru/__init__.py b/tools/cru-py/cru/__init__.py
new file mode 100644
index 0000000..e36a778
--- /dev/null
+++ b/tools/cru-py/cru/__init__.py
@@ -0,0 +1,12 @@
+import sys
+
+
+class CruInitError(Exception):
+ pass
+
+def check_python_version(required_version=(3, 11)):
+ if sys.version_info < required_version:
+ raise CruInitError(f"Python version must be >= {required_version}!")
+
+
+check_python_version()
diff --git a/tools/cru-py/cru/attr.py b/tools/cru-py/cru/attr.py
new file mode 100644
index 0000000..d07cc55
--- /dev/null
+++ b/tools/cru-py/cru/attr.py
@@ -0,0 +1,125 @@
+from collections.abc import Callable
+from dataclasses import dataclass
+from types import NoneType
+from typing import Any
+from copy import deepcopy
+
+
+@dataclass
+class CruAttr:
+ name: str
+ value: Any
+ description: str
+
+
+@dataclass
+class CruAttrDef:
+ name: str
+ default_description: str
+ allow_types: None | set[type]
+ allow_none: bool
+ default_value: Any
+ transformer: Callable[[Any], Any] | None
+ validator: Callable[[Any], None]
+
+ def __init__(self, name: str, default_description: str, *,
+ allow_types: list[type] | type | None, allow_none: bool, default_value: Any = None,
+ transformer: Callable[[Any], Any] | None = None,
+ validator: Callable[[Any], None] | None = None) -> None:
+ self.name = name
+ self.default_description = default_description
+ self.default_value = default_value
+ #TODO: CONTINUE TOMORROW
+ if allow_types is None:
+ allow_types = []
+ elif isinstance(allow_types, type):
+ allow_types = [allow_types]
+ else:
+ for t in allow_types:
+ if not isinstance(t, type):
+ raise TypeError(f"Invalid value of python type : {t}")
+ self.allow_types = set(filter(lambda tt: tt is not NoneType, allow_types))
+ self.allow_none = allow_none
+ self.transformer = transformer
+ self.validator = validator
+ self.default_value = self.transform_and_validate(self.default_value)
+ self.default_value = deepcopy(self.default_value)
+
+ def transform(self, value: Any) -> Any:
+ if self.transformer is not None:
+ return self.transformer(value)
+ return value
+
+ def validate(self, value: Any, /, override_allow_none: bool | None = None) -> None:
+ allow_none = override_allow_none if override_allow_none is not None else self.allow_none
+ if value is None and not allow_none:
+ raise TypeError(f"None is not allowed!")
+ if len(self.allow_types) != 0 and type(value) not in self.allow_types:
+ raise TypeError(f"Type of {value} is not allowed!")
+ if self.validator is not None:
+ return self.validator(value)
+ return None
+
+ def transform_and_validate(self, value: Any, /, override_allow_none: bool | None = None) -> Any:
+ value = self.transform(value)
+ self.validate(value, override_allow_none)
+ return value
+
+ def make(self, value: Any, description: None | str = None) -> CruAttr:
+ value = self.transform_and_validate(value)
+ return CruAttr(self.name, value if value is not None else deepcopy(self.default_value),
+ description if description is not None else self.default_description)
+
+
+class CruAttrDefRegistry:
+
+ def __init__(self) -> None:
+ self._def_list = []
+
+ @property
+ def items(self) -> list[CruAttrDef]:
+ return self._def_list
+
+ def register(self, def_: CruAttrDef):
+ for i in self._def_list:
+ if i.name == def_.name:
+ raise ValueError(f"Attribute {def_.name} already exists!")
+ self._def_list.append(def_)
+
+ def register_with(self, name: str, default_description: str, *,
+ allow_types: list[type] | type | None, allow_none: bool,
+ default_value: Any = None,
+ transformer: Callable[[Any], Any] | None = None,
+ validator: Callable[[Any], None] | None = None
+ ) -> CruAttrDef:
+ def_ = CruAttrDef(name, default_description, default_value=default_value, allow_types=allow_types,
+ allow_none=allow_none, transformer=transformer, validator=validator)
+ self.register(def_)
+ return def_
+
+ def register_required(self, name: str, default_description: str, *,
+ allow_types: list[type] | type | None,
+ default_value: Any = None,
+ transformer: Callable[[Any], Any] | None = None,
+ validator: Callable[[Any], None] | None = None
+ ) -> CruAttrDef:
+ return self.register_with(name, default_description, default_value=default_value, allow_types=allow_types,
+ allow_none=False, transformer=transformer, validator=validator)
+
+ def register_optional(self, name: str, default_description: str, *,
+ allow_types: list[type] | type | None,
+ default_value: Any = None,
+ transformer: Callable[[Any], Any] | None = None,
+ validator: Callable[[Any], None] | None = None
+ ) -> CruAttrDef:
+ return self.register_with(name, default_description, default_value=default_value, allow_types=allow_types,
+ allow_none=True, transformer=transformer, validator=validator)
+
+ def get_item_optional(self, name: str) -> CruAttrDef | None:
+ for i in self._def_list:
+ if i.name == name:
+ return i
+ return None
+
+ def __getitem__(self, item) -> CruAttrDef | None:
+ return self.get_item_optional(item)
diff --git a/tools/cru-py/cru/config.py b/tools/cru-py/cru/config.py
new file mode 100644
index 0000000..b0c83d5
--- /dev/null
+++ b/tools/cru-py/cru/config.py
@@ -0,0 +1,128 @@
+from typing import Any, TypeVar, Generic
+
+from .excp import CruInternalLogicError
+from .value import ValueType, ValueGenerator, ValidationError
+
+T = TypeVar("T")
+
+
+class ConfigItem(Generic[T]):
+ OptionalValueGenerator = ValueGenerator[T, []] | None
+
+ def __init__(self, name: str, description: str, value_type: ValueType[T], value: T | None, default_value: T, *,
+ value_generator: OptionalValueGenerator = None) -> None:
+ self._name = name
+ self._description = description
+ self._value_type = value_type
+ self._default_value = default_value
+ self._value_generator = value_generator
+ self._value: T | None = value
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ @property
+ def description(self) -> str:
+ return self._description
+
+ @property
+ def value_type(self) -> ValueType[T]:
+ return self._value_type
+
+ @property
+ def default_value(self) -> T:
+ return self._default_value
+
+ @property
+ def is_default(self) -> bool:
+ return self._value is None
+
+ @property
+ def is_set(self) -> bool:
+ return not self.is_default
+
+ @property
+ def value(self) -> T:
+ return self._value or self._default_value
+
+ def set_value(self, v: T | str, /, allow_convert_from_str=False):
+ if allow_convert_from_str:
+ self._value = self.value_type.check_value(v)
+ else:
+ self._value = self.value_type.check_value_or_try_convert_from_str(v)
+
+ @value.setter
+ def value(self, v: T) -> None:
+ self.set_value(v)
+
+ @property
+ def value_generator(self) -> OptionalValueGenerator:
+ return self._value_generator
+
+ def generate_value(self, allow_interactive=False) -> T | None:
+ if self.value_generator is None: return None
+ if self.value_generator.interactive and not allow_interactive:
+ return None
+ else:
+ v = self.generate_value()
+ try:
+ self.value_type.check_value(v)
+ return v
+ except ValidationError as e:
+ raise CruInternalLogicError("Config value generator returns invalid value.", name=self.name, inner=e)
+
+ def copy(self) -> "ConfigItem":
+ return ConfigItem(self.name, self.description, self.value_type,
+ self._value.copy() if self._value is not None else None, self._default_value.copy(),
+ value_generator=self.value_generator)
+
+
+class Configuration:
+ def __init__(self, items: None | list[ConfigItem] = None) -> None:
+ self._items: list[ConfigItem] = items or []
+
+ @property
+ def items(self) -> list[ConfigItem]:
+ return self._items
+
+ @property
+ def item_map(self) -> dict[str, ConfigItem]:
+ return {i.name: i for i in self.items}
+
+ def get_optional_item(self, name: str) -> ConfigItem | None:
+ for i in self.items:
+ if i.name == name:
+ return i
+ return None
+
+ def clear(self) -> None:
+ self._items.clear()
+
+ def has_item(self, name: str) -> bool:
+ return self.get_optional_item(name) is not None
+
+ def add_item(self, item: ConfigItem):
+ i = self.get_optional_item(item.name)
+ if i is not None:
+ raise CruInternalLogicError("Config item of the name already exists.", name=item.name)
+ self.items.append(item)
+ return item
+
+ def set_value(self, name: str, v: Any, /, allow_convert_from_str=False):
+ i = self.get_optional_item(name)
+ if i is None:
+ raise CruInternalLogicError("No config item of the name. Can't set value.", name=name)
+ i.set_value(v, allow_convert_from_str)
+
+ def copy(self) -> "Configuration":
+ return Configuration([i.copy() for i in self.items])
+
+ def __getitem__(self, name: str) -> ConfigItem:
+ i = self.get_optional_item(name)
+ if i is not None:
+ return i
+ raise CruInternalLogicError('No config item of the name.', name=name)
+
+ def __contains__(self, name: str):
+ return self.has_item(name)
diff --git a/tools/cru-py/cru/excp.py b/tools/cru-py/cru/excp.py
new file mode 100644
index 0000000..5a5871b
--- /dev/null
+++ b/tools/cru-py/cru/excp.py
@@ -0,0 +1,137 @@
+from collections.abc import Callable
+from dataclasses import dataclass
+from types import NoneType
+from typing import Any
+
+from cru.attr import CruAttrDefRegistry
+
+CRU_EXCEPTION_ATTR_DEF_REGISTRY = CruAttrDefRegistry()
+
+
+class CruException(Exception):
+ @staticmethod
+ def transform_inner(inner: Exception | list[Exception] | None):
+ if inner is None:
+ return None
+ if isinstance(inner, Exception):
+ return [inner]
+ if isinstance(inner, list):
+ return inner
+
+ @staticmethod
+ def validate_inner(inner: list[Exception]):
+ for i in inner:
+ if not isinstance(i, Exception):
+ raise TypeError(f"Invalid inner exception: {i}")
+
+ MESSAGE_DEF = CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_required("message", "Message describing the exception.",
+ allow_types=str, default_value="")
+ INNER_DEF = CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_required("inner", "Inner exception.",
+ allow_types=list, default_value=[],
+ transformer=transform_inner, validator=validate_inner)
+ INTERNAL_DEF = CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_required("internal",
+ "True if the exception is caused by wrong internal logic. False if it is caused by user's wrong input.",
+ allow_types=bool, default_value=False)
+ CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_optional("name", "Name of the object that causes the exception.",
+ allow_types=str)
+ CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_optional("value", "Value that causes the exception.",
+ allow_types=[])
+ CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_with("path", "Path that causes the exception.",)
+ CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_with("type", "Python type related to the exception.")
+
+ def __init__(self, message: str, *args,
+ inner: Exception | list[Exception] | None = None,
+ internal: bool = False,
+ name: str | None = None,
+ value: Any | None = None,
+ path: str | None = None,
+ type_: type | None = None,
+ init_attrs: dict[str, Any] | None = None,
+ attrs: dict[str, Any] | None = None, **kwargs) -> None:
+ super().__init__(message, *args)
+
+ self._attrs = {
+ CruException.MESSAGE_KEY: message,
+ CruException.INTERNAL_KEY: internal,
+ CruException.INNER_KEY: inner,
+ CruException.NAME_KEY: name,
+ CruException.VALUE_KEY: value,
+ CruException.PATH_KEY: path,
+ CruException.TYPE_KEY: type_,
+ }
+ if init_attrs is not None:
+ self._attrs.update(init_attrs)
+ if attrs is not None:
+ self._attrs.update(attrs)
+ self._attrs.update(kwargs)
+
+ @property
+ def message(self) -> str:
+ return self[CruException.MESSAGE_KEY]
+
+ @property
+ def internal(self) -> bool:
+ return self[CruException.INTERNAL_KEY]
+
+ @property
+ def inner(self) -> list[Exception]:
+ return self[CruException.INNER_KEY]
+
+ @property
+ def name(self) -> str | None:
+ return self[CruException.NAME_KEY]
+
+ @property
+ def value(self) -> Any | None:
+ return self[CruException.VALUE_KEY]
+
+ @property
+ def path(self) -> str | None:
+ return self[CruException.PATH_KEY]
+
+ @property
+ def type(self) -> type | None:
+ return self[CruException.TYPE_KEY]
+
+ def _get_attr_list_recursive(self, name: str, depth: int, max_depth: int, l: list[Any]):
+ if 0 < max_depth < depth + 1:
+ return
+ a = self._attrs.get(name, None)
+ if a is not None:
+ l.append(a)
+ for i in self.inner:
+ if isinstance(i, CruException):
+ i._get_attr_list_recursive(name, depth + 1, max_depth, l)
+
+ def get_attr_list_recursive(self, name: str, /, max_depth: int = -1) -> list[Any]:
+ l = []
+ self._get_attr_list_recursive(name, 0, max_depth, l)
+ return l
+
+ def get_optional_attr(self, name: str, max_depth: int = -1) -> Any | None:
+ l = self.get_attr_list_recursive(name, max_depth)
+ return l[0] if len(l) > 0 else None
+
+ def __getitem__(self, name: str) -> Any | None:
+ return self.get_optional_attr(name)
+
+
+class CruInternalLogicError(CruException):
+ def __init__(self, message: str, *args, **kwargs) -> None:
+ super().__init__(message, *args, internal=True, **kwargs)
+
+
+class UserFriendlyException(CruException):
+ USER_MESSAGE_KEY = "user_message"
+
+ CRU_EXCEPTION_ATTR_DEF_REGISTRY.register(
+ CruExceptionAttrDef(USER_MESSAGE_KEY, "Message describing the exception, but with user-friendly language."))
+
+ def __init__(self, message: str, user_message: str | None = None, *args, **kwargs) -> None:
+ if user_message is None:
+ user_message = message
+ super().__init__(message, *args, init_attrs={UserFriendlyException.USER_MESSAGE_KEY: user_message}, **kwargs)
+
+ @property
+ def user_message(self) -> str:
+ return self[UserFriendlyException.USER_MESSAGE_KEY]
diff --git a/tools/cru-py/cru/parsing.py b/tools/cru-py/cru/parsing.py
new file mode 100644
index 0000000..be7bbf4
--- /dev/null
+++ b/tools/cru-py/cru/parsing.py
@@ -0,0 +1,70 @@
+from abc import ABCMeta, abstractmethod
+from typing import TypeVar, Generic, NoReturn, Callable
+
+from cru.excp import CruException, CRU_EXCEPTION_ATTR_DEF_REGISTRY
+
+R = TypeVar("R")
+
+
+class ParseException(CruException):
+ LINE_NUMBER_KEY = "line_number"
+
+ CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_with(LINE_NUMBER_KEY, "Line number of the error.")
+
+
+class Parser(Generic[R], metaclass=ABCMeta):
+ def __init__(self, name: str) -> None:
+ self._name = name
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ @abstractmethod
+ def parse(self, s: str) -> R:
+ raise NotImplementedError()
+
+ def raise_parse_exception(self, s: str, line_number: int | None = None) -> NoReturn:
+ a = f" at line {line_number}" if line_number is not None else ""
+ raise ParseException(f"Parser {self.name} failed{a}, {s}")
+
+
+class SimpleLineConfigParser(Parser[list[tuple[str, str]]]):
+ def __init__(self) -> None:
+ super().__init__(type(self).__name__)
+
+ def _parse(self, s: str, f: Callable[[str, str], None]) -> None:
+ for ln, line in enumerate(s.splitlines()):
+ line_number = ln + 1
+ # check if it's a comment
+ if line.strip().startswith("#"):
+ continue
+ # check if there is a '='
+ if line.find("=") == -1:
+ self.raise_parse_exception(f"There is even no '='!", line_number)
+ # split at first '='
+ key, value = line.split("=", 1)
+ key = key.strip()
+ value = value.strip()
+ f(key, value)
+
+ def parse(self, s: str) -> list[tuple[str, str]]:
+ items = []
+ self._parse(s, lambda key, value: items.append((key, value)))
+ return items
+
+ def parse_to_dict(self, s: str, /, allow_override: bool = False) -> tuple[dict[str, str], list[tuple[str, str]]]:
+ d = {}
+ duplicate = []
+
+ def add(key: str, value: str) -> None:
+ if key in d:
+ if allow_override:
+ duplicate.append((key, d[key]))
+ d[key] = value
+ else:
+ self.raise_parse_exception(f"Key '{key}' already exists!", None)
+ d[key] = value
+
+ self._parse(s, add)
+ return d, duplicate
diff --git a/tools/cru-py/cru/paths.py b/tools/cru-py/cru/paths.py
new file mode 100644
index 0000000..df5042b
--- /dev/null
+++ b/tools/cru-py/cru/paths.py
@@ -0,0 +1,63 @@
+from pathlib import Path
+import os
+
+from .excp import CruException
+
+
+class ApplicationPathError(CruException):
+ def __init__(self, message: str, p: str | Path, *args, **kwargs):
+ super().__init__(message, *args, path=str(p), **kwargs)
+
+
+def check_parents_dir(p: str | Path, /, must_exist: bool = False) -> bool:
+ p = Path(p) if isinstance(p, str) else p
+ for p in reversed(p.parents):
+ if not p.exists() and not must_exist:
+ return False
+ if not p.is_dir():
+ raise ApplicationPathError("Parents path should be a dir.", p)
+ return True
+
+
+class ApplicationPath:
+ def __init__(self, p: str | Path, is_dir: bool) -> None:
+ self._path = Path(p) if isinstance(p, str) else p
+ self._is_dir = is_dir
+
+ @property
+ def path(self) -> Path:
+ return self._path
+
+ @property
+ def is_dir(self) -> bool:
+ return self._is_dir
+
+ def check_parents(self, must_exist: bool = False) -> bool:
+ return check_parents_dir(self._path.parent, must_exist)
+
+ def check_self(self, must_exist: bool = False) -> bool:
+ if not self.check_parents(must_exist):
+ return False
+ if not self.path.exists():
+ if not must_exist: return False
+ raise ApplicationPathError("Mot exist.", self.path)
+ if self.is_dir:
+ if not self.path.is_dir():
+ raise ApplicationPathError("Should be a directory, but not.", self.path)
+ else:
+ return False
+ else:
+ if not self.path.is_file():
+ raise ApplicationPathError("Should be a file, but not.", self.path)
+ else:
+ return False
+
+ def ensure(self, create_file: bool = False) -> None:
+ e = self.check_self(False)
+ if not e:
+ os.makedirs(self.path.parent, exist_ok=True)
+ if self.is_dir:
+ os.mkdir(self.path)
+ elif create_file:
+ with open(self.path, "w") as f:
+ f.write("")
diff --git a/tools/cru-py/cru/service/__init__.py b/tools/cru-py/cru/service/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tools/cru-py/cru/service/__init__.py
diff --git a/tools/cru-py/cru/service/docker.py b/tools/cru-py/cru/service/docker.py
new file mode 100644
index 0000000..42d4a35
--- /dev/null
+++ b/tools/cru-py/cru/service/docker.py
@@ -0,0 +1,15 @@
+import shutil
+
+
+class DockerController:
+ DOCKER_BIN_NAME = "docker"
+
+ def __init__(self, docker_bin: None | str = None) -> None:
+ self._docker_bin = docker_bin
+
+ @property
+ def docker_bin(self) -> str:
+ if self._docker_bin is None:
+ self._docker_bin = shutil.which(self.DOCKER_BIN_NAME)
+ return self._docker_bin
+
diff --git a/tools/cru-py/cru/service/nginx.py b/tools/cru-py/cru/service/nginx.py
new file mode 100644
index 0000000..9298623
--- /dev/null
+++ b/tools/cru-py/cru/service/nginx.py
@@ -0,0 +1,377 @@
+from typing import Literal, Any, cast, ClassVar
+import os
+from os.path import join, basename, dirname
+import subprocess
+import re
+import json
+import jsonschema
+
+from crupest.template2 import Template2
+from crupest.tui import Paths, UserFriendlyException, create_dir_if_not_exists, console, Confirm, ensure_dir
+from crupest.ui_base import file_name_style
+
+
+def restart_nginx(force=False) -> bool:
+ if not force:
+ p = subprocess.run(['docker', "container", "ls",
+ "-f", "name=nginx", "-q"], capture_output=True)
+ container: str = p.stdout.decode("utf-8")
+ if len(container.strip()) == 0:
+ return False
+ subprocess.run(['docker', 'restart', 'nginx'])
+ return True
+
+
+_server_schema_filename = "server.schema.json"
+
+with open(join(Paths.nginx2_template_dir, _server_schema_filename)) as f:
+ server_json_schema = json.load(f)
+
+
+_domain_template_filename = "domain.conf.template"
+
+NginxSourceFileType = Literal["global", "domain", "http", "https"]
+
+
+class NginxSourceFile:
+ def __init__(self, path: str) -> None:
+ """
+ path: relative to nginx2_template_dir
+ """
+ self._path = path
+ is_template = path.endswith(".template")
+ self._is_template = is_template
+ filename = basename(path)
+ self.name = filename[:-len(".template")] if is_template else filename
+ if is_template:
+ self._template = Template2.from_file(
+ join(Paths.nginx2_template_dir, path))
+ else:
+ with open(join(Paths.nginx2_template_dir, path)) as f:
+ self._content = f.read()
+
+ self._scope: NginxSourceFileType = self._calc_scope()
+
+ @property
+ def is_template(self) -> bool:
+ return self._is_template
+
+ @property
+ def content(self) -> str:
+ if self._is_template:
+ raise Exception(f"{self._path} is a template file")
+ return self._content
+
+ @property
+ def template(self) -> Template2:
+ if not self._is_template:
+ raise Exception(f"{self._path} is not a template file")
+ return cast(Template2, self._template)
+
+ @property
+ def global_target_filename(self) -> str:
+ if self.scope != "global":
+ raise Exception(f"{self._path} is not a global file")
+ if self.is_template:
+ return basename(self._path)[:-len(".template")]
+ else:
+ return basename(self._path)
+
+ def _calc_scope(self) -> NginxSourceFileType:
+ f = basename(self._path)
+ d = basename(dirname(self._path))
+ if f == _domain_template_filename:
+ return "domain"
+ elif d in ["global", "http", "https"]:
+ return cast(Literal["global", "http", "https"], d)
+ else:
+ raise Exception(f"Unknown scope for {self._path}")
+
+ @property
+ def scope(self) -> NginxSourceFileType:
+ return self._scope
+
+
+_domain_template_source = NginxSourceFile(_domain_template_filename)
+
+_client_max_body_size_source = NginxSourceFile(
+ "global/client-max-body-size.conf")
+_forbid_unknown_domain_source = NginxSourceFile(
+ "global/forbid-unknown-domain.conf")
+_ssl_template_source = NginxSourceFile("global/ssl.conf.template")
+_websocket_source = NginxSourceFile("global/websocket.conf")
+
+_http_444_source = NginxSourceFile("http/444.segment")
+_http_redirect_to_https_source = NginxSourceFile(
+ "http/redirect-to-https.segment")
+
+_https_redirect_template_source = NginxSourceFile(
+ "https/redirect.segment.template")
+_https_reverse_proxy_template_source = NginxSourceFile(
+ "https/reverse-proxy.segment.template")
+_https_static_file_template_source = NginxSourceFile(
+ "https/static-file.segment.template")
+_https_static_file_no_strip_prefix_template_source = NginxSourceFile(
+ "https/static-file.no-strip-prefix.segment.template")
+
+
+class NginxService:
+ def __init__(self, type: str, path: str) -> None:
+ self.type = type
+ self.path = path
+ self._check_path(path)
+
+ @staticmethod
+ def _check_path(path: str) -> None:
+ assert isinstance(path, str)
+ if path == "" or path == "/":
+ return
+ if not path.startswith("/"):
+ raise UserFriendlyException("Service path should start with '/'.")
+ if path.endswith("/"):
+ raise UserFriendlyException(
+ "Service path should not end with '/'.")
+
+ def generate_https_segment(self) -> str:
+ raise NotImplementedError()
+
+
+class NginxRedirectService(NginxService):
+ def __init__(self, path: str, redirect_url: str, redirect_code: int = 307) -> None:
+ if redirect_url.endswith("/"):
+ raise UserFriendlyException(
+ "Redirect URL should not end with '/'.")
+
+ super().__init__("redirect", path)
+
+ self.redirect_url = redirect_url
+ self.redirect_code = redirect_code
+
+ def generate_https_segment(self) -> str:
+ vars = {
+ "PATH": self.path,
+ "REDIRECT_CODE": self.redirect_code,
+ "REDIRECT_URL": self.redirect_url
+ }
+ return _https_redirect_template_source.template.render(vars)
+
+ @staticmethod
+ def from_json(json: dict[str, Any]) -> "NginxRedirectService":
+ path = json["path"]
+ redirect_url = json["to"]
+ redirect_code = json.get("code", 307)
+ assert isinstance(path, str)
+ assert isinstance(redirect_url, str)
+ assert isinstance(redirect_code, int)
+ return NginxRedirectService(path, redirect_url, redirect_code)
+
+
+class NginxReverseProxyService(NginxService):
+
+ _upstream_regex: ClassVar[re.Pattern[str]] = re.compile(
+ r"^[-_0-9a-zA-Z]+:[0-9]+$")
+
+ def __init__(self, path: str, upstream: str) -> None:
+ if not self._upstream_regex.match(upstream):
+ raise UserFriendlyException(
+ f"Invalid upstream format: {upstream}.")
+
+ super().__init__("reverse-proxy", path)
+
+ self.upstream = upstream
+
+ def generate_https_segment(self) -> str:
+ vars = {
+ "PATH": self.path,
+ "UPSTREAM": self.upstream
+ }
+ return _https_reverse_proxy_template_source.template.render(vars)
+
+ @staticmethod
+ def from_json(json: dict[str, Any]) -> "NginxReverseProxyService":
+ path = json["path"]
+ upstream = json["upstream"]
+ assert isinstance(path, str)
+ assert isinstance(upstream, str)
+ return NginxReverseProxyService(path, upstream)
+
+
+class NginxStaticFileService(NginxService):
+ def __init__(self, path: str, root: str, no_strip_prefix: bool = False) -> None:
+ super().__init__("static-file", path)
+
+ self.root = root
+ self.no_strip_prefix = no_strip_prefix
+
+ def generate_https_segment(self) -> str:
+ vars = {
+ "PATH": self.path,
+ "ROOT": self.root,
+ }
+ if self.no_strip_prefix:
+ return _https_static_file_no_strip_prefix_template_source.template.render(vars)
+ else:
+ return _https_static_file_template_source.template.render(vars)
+
+ @staticmethod
+ def from_json(json: dict[str, Any]) -> "NginxStaticFileService":
+ path = json["path"]
+ root = json["root"]
+ no_strip_prefix = json.get("no_strip_prefix", False)
+ assert isinstance(path, str)
+ assert isinstance(root, str)
+ assert isinstance(no_strip_prefix, bool)
+ return NginxStaticFileService(path, root, no_strip_prefix)
+
+
+def nginx_service_from_json(json: dict[str, Any]) -> NginxService:
+ type = json["type"]
+ if type == "redirect":
+ return NginxRedirectService.from_json(json)
+ elif type == "reverse-proxy":
+ return NginxReverseProxyService.from_json(json)
+ elif type == "static-file":
+ return NginxStaticFileService.from_json(json)
+ else:
+ raise UserFriendlyException(f"Invalid crupest type: {type}.")
+
+
+def _prepend_indent(text: str, indent: str = " " * 4) -> str:
+ lines = text.split("\n")
+ for i in range(len(lines)):
+ if lines[i] != "":
+ lines[i] = indent + lines[i]
+ return "\n".join(lines)
+
+
+class NginxDomain:
+ def __init__(self, domain: str, services: list[NginxService] = []) -> None:
+ self.domain = domain
+ self.services = services
+
+ def add_service(self, service: NginxService) -> None:
+ self.services.append(service)
+
+ def generate_http_segment(self) -> str:
+ if len(self.services) == 0:
+ return _http_444_source.content
+ else:
+ return _http_redirect_to_https_source.content
+
+ def generate_https_segment(self) -> str:
+ return "\n\n".join([s.generate_https_segment() for s in self.services])
+
+ def generate_config(self) -> str:
+ vars = {
+ "DOMAIN": self.domain,
+ "HTTP_SEGMENT": _prepend_indent(self.generate_http_segment()),
+ "HTTPS_SEGMENT": _prepend_indent(self.generate_https_segment()),
+ }
+ return _domain_template_source.template.render(vars)
+
+ def generate_config_file(self, path: str) -> None:
+ with open(path, "w") as f:
+ f.write(self.generate_config())
+
+ @staticmethod
+ def from_json(root_domain: str, json: dict[str, Any]) -> "NginxDomain":
+ name = json["name"]
+ assert isinstance(name, str)
+ if name == "@" or name == "":
+ domain = root_domain
+ else:
+ domain = f"{name}.{root_domain}"
+ assert isinstance(json["services"], list)
+ services = [nginx_service_from_json(s) for s in json["services"]]
+ return NginxDomain(domain, services)
+
+
+def check_nginx_config_schema(json: Any) -> None:
+ jsonschema.validate(json, server_json_schema)
+
+
+class NginxServer:
+ def __init__(self, root_domain: str) -> None:
+ self.root_domain = root_domain
+ self.domains: list[NginxDomain] = []
+
+ def add_sub_domain(self, sub_domain: str, services: list[NginxService]) -> None:
+ if sub_domain == "" or sub_domain == "@":
+ domain = self.root_domain
+ else:
+ domain = f"{sub_domain}.{self.root_domain}"
+ self.domains.append(NginxDomain(domain, services))
+
+ def generate_ssl(self) -> str:
+ return _ssl_template_source.template.render({
+ "ROOT_DOMAIN": self.root_domain
+ })
+
+ def generate_global_files(self, d: str) -> None:
+ for source in [_client_max_body_size_source, _forbid_unknown_domain_source, _websocket_source]:
+ with open(join(d, source.name), "w") as f:
+ f.write(source.content)
+ with open(join(d, _ssl_template_source.name), "w") as f:
+ f.write(self.generate_ssl())
+
+ def generate_domain_files(self, d: str) -> None:
+ for domain in self.domains:
+ domain.generate_config_file(join(d, f"{domain.domain}.conf"))
+
+ def generate_config(self, d: str) -> None:
+ create_dir_if_not_exists(d)
+ self.generate_global_files(d)
+
+ def get_allowed_files(self) -> list[str]:
+ files = []
+ for source in [_client_max_body_size_source, _forbid_unknown_domain_source, _ssl_template_source, _websocket_source]:
+ files.append(source.name)
+ for domain in self.domains:
+ files.append(f"{domain.domain}.conf")
+ return files
+
+ def check_bad_files(self, d: str) -> list[str]:
+ allowed_files = self.get_allowed_files()
+ bad_files = []
+ if not ensure_dir(d, must_exist=False):
+ return []
+ for path in os.listdir(d):
+ if path not in allowed_files:
+ bad_files.append(path)
+ return bad_files
+
+ @staticmethod
+ def from_json(root_domain: str, json: dict[str, Any]) -> "NginxServer":
+ check_nginx_config_schema(json)
+ server = NginxServer(root_domain)
+ sub_domains = json["domains"]
+ assert isinstance(sub_domains, list)
+ server.domains = [NginxDomain.from_json(
+ root_domain, d) for d in sub_domains]
+ return server
+
+ @staticmethod
+ def from_json_str(root_domain: str, json_str: str) -> "NginxServer":
+ return NginxServer.from_json(root_domain, json.loads(json_str))
+
+ def go(self):
+ bad_files = self.check_bad_files(Paths.nginx_generated_dir)
+ if len(bad_files) > 0:
+ console.print(
+ "WARNING: It seems there are some bad conf files in the nginx config directory:", style="yellow")
+ for bad_file in bad_files:
+ console.print(bad_file, style=file_name_style)
+ to_delete = Confirm.ask(
+ "They will affect nginx in a [red]bad[/] way. Do you want to delete them?", default=True, console=console)
+ if to_delete:
+ for file in bad_files:
+ os.remove(join(Paths.nginx_generated_dir, file))
+ create_dir_if_not_exists(Paths.generated_dir)
+ if not ensure_dir(Paths.nginx_generated_dir, must_exist=False):
+ os.mkdir(Paths.nginx_generated_dir)
+ console.print(
+ f"Nginx config directory created at [magenta]{Paths.nginx_generated_dir}[/]", style="green")
+ self.generate_config(Paths.nginx_generated_dir)
+ console.print("Nginx config generated.", style="green")
+ if restart_nginx():
+ console.print('Nginx restarted.', style="green")
diff --git a/tools/cru-py/cru/system.py b/tools/cru-py/cru/system.py
new file mode 100644
index 0000000..2e05cd1
--- /dev/null
+++ b/tools/cru-py/cru/system.py
@@ -0,0 +1,22 @@
+import re
+import os.path
+
+
+def check_debian_derivative_version(name: str) -> None | str:
+ if not os.path.isfile("/etc/os-release"):
+ return None
+ with open("/etc/os-release", "r") as f:
+ content = f.read()
+ if not f"ID={name}" in content:
+ return None
+ m = re.search(r'VERSION_ID="(.+)"', content)
+ if m is None: return None
+ return m.group(1)
+
+
+def check_ubuntu_version() -> None | str:
+ return check_debian_derivative_version("ubuntu")
+
+
+def check_debian_version() -> None | str:
+ return check_debian_derivative_version("debian")
diff --git a/tools/cru-py/cru/value.py b/tools/cru-py/cru/value.py
new file mode 100644
index 0000000..cddbde9
--- /dev/null
+++ b/tools/cru-py/cru/value.py
@@ -0,0 +1,309 @@
+import random
+import secrets
+import string
+import uuid
+from abc import abstractmethod, ABCMeta
+from collections.abc import Mapping, Callable
+from typing import Any, ClassVar, Literal, TypeVar, Generic, ParamSpec
+
+from .excp import CruInternalLogicError, CruException, CRU_EXCEPTION_ATTR_DEF_REGISTRY
+
+
+def _str_case_in(s: str, case: bool, l: list[str]) -> bool:
+ if case:
+ return s in l
+ else:
+ return s.lower() in [s.lower() for s in l]
+
+
+_ValueTypeForward = type["ValueType"]
+
+T = TypeVar("T")
+
+
+class _ValueErrorMixin:
+ VALUE_TYPE_KEY = "value_type"
+
+ CRU_EXCEPTION_ATTR_DEF_REGISTRY.register_with(
+ VALUE_TYPE_KEY,
+ "The type of the value that causes the exception."
+ )
+
+
+class ValidationError(CruException, _ValueErrorMixin):
+ def __init__(self, message: str, value: Any, value_type: _ValueTypeForward[T] | None, *args, **kwargs):
+ super().__init__(message, *args, value=value, type_=value_type.type, init_attrs={
+ ValidationError.VALUE_TYPE_KEY: value_type,
+ }, **kwargs)
+
+ @property
+ def value_type(self) -> _ValueTypeForward[T] | None:
+ return self[ValidationError.VALUE_TYPE_KEY]
+
+
+class ValueStringConvertionError(CruException, _ValueErrorMixin):
+ def __init__(self, message: str, value: Any, value_type: _ValueTypeForward[T] | None, *args,
+ **kwargs):
+ super().__init__(message, *args, value=value, type_=value_type.type, init_attrs={
+ ValueStringConvertionError.VALUE_TYPE_KEY: value_type,
+ }, **kwargs)
+
+ @property
+ def value_type(self) -> _ValueTypeForward[T] | None:
+ return self[ValueStringConvertionError.VALUE_TYPE_KEY]
+
+
+class ValueType(Generic[T], metaclass=ABCMeta):
+ @staticmethod
+ def case_sensitive_to_str(case_sensitive: bool) -> str:
+ return f"case-{'' if case_sensitive else 'in'}sensitive"
+
+ def __init__(self, name: str) -> None:
+ self._name = name
+ self._type = type("T")
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ @property
+ def type(self) -> type:
+ return self._type
+
+ def is_instance_of_value_type(self, value: Any) -> bool:
+ return isinstance(value, self.type)
+
+ def _do_check_value(self, value: Any) -> tuple[True, T] | tuple[False, None | str]:
+ return True, value
+
+ def check_value(self, value: Any) -> T:
+ if not isinstance(value, self.type):
+ raise ValidationError("Value type is wrong.", value, self)
+ ok, v_or_err = self._do_check_value(value)
+ if ok:
+ return v_or_err
+ else:
+ raise ValidationError(v_or_err or "Value is not valid.", value, self)
+
+ @abstractmethod
+ def _do_check_str_format(self, s: str) -> bool | tuple[bool, str]:
+ """
+ Return None for no error. Otherwise, return error message.
+ """
+ raise NotImplementedError()
+
+ def check_str_format(self, s: str) -> None:
+ ok, err = self._do_check_str_format(s)
+ if ok is None: raise CruInternalLogicError("_do_check_str_format should not return None.")
+ if ok: return
+ if err is None:
+ err = "Invalid value str format."
+ raise ValueStringConvertionError(err, s, value_type=self)
+
+ @abstractmethod
+ def _do_convert_value_to_str(self, value: T) -> str:
+ raise NotImplementedError()
+
+ def convert_value_to_str(self, value: T) -> str:
+ self.check_value(value)
+ return self._do_convert_value_to_str(value)
+
+ @abstractmethod
+ def _do_convert_str_to_value(self, s: str) -> T:
+ raise NotImplementedError()
+
+ def convert_str_to_value(self, s: str) -> T:
+ self.check_str_format(s)
+ return self._do_convert_str_to_value(s)
+
+ def check_value_or_try_convert_from_str(self, value_or_str: Any) -> T:
+ try:
+ return self.check_value(value_or_str)
+ except ValidationError as e:
+ if isinstance(value_or_str, str):
+ return self.convert_str_to_value(value_or_str)
+ else:
+ raise ValidationError("Value is not valid and is not a str.", value_or_str, self,
+ inner=e)
+
+
+class TextValueType(ValueType[str]):
+ def __init__(self) -> None:
+ super().__init__("text")
+
+ def _do_check_str_format(self, s):
+ return True
+
+ def _do_convert_value_to_str(self, value):
+ return value
+
+ def _do_convert_str_to_value(self, s):
+ return s
+
+
+class IntegerValueType(ValueType[int]):
+
+ def __init__(self) -> None:
+ super().__init__("integer")
+
+ def _do_check_str_format(self, s):
+ try:
+ int(s)
+ return True
+ except ValueError:
+ return False
+
+ def _do_convert_value_to_str(self, value):
+ return str(value)
+
+ def _do_convert_str_to_value(self, s):
+ return int(s)
+
+
+class FloatValueType(ValueType[float]):
+ def __init__(self) -> None:
+ super().__init__("float")
+
+ def _do_check_str_format(self, s):
+ try:
+ float(s)
+ return True
+ except ValueError:
+ return False
+
+ def _do_convert_value_to_str(self, value):
+ return str(value)
+
+ def _do_convert_str_to_value(self, s):
+ return float(s)
+
+
+class BooleanValueType(ValueType[bool]):
+ DEFAULT_TRUE_LIST: ClassVar[list[str]] = ["true", "yes", "y", "on", "1"]
+ DEFAULT_FALSE_LIST: ClassVar[list[str]] = ["false", "no", "n", "off", "0"]
+
+ def __init__(self, *, case_sensitive=False, true_list: None | list[str] = None,
+ false_list: None | list[str] = None) -> None:
+ super().__init__("boolean")
+ self._case_sensitive = case_sensitive
+ self._valid_true_strs: list[str] = true_list or BooleanValueType.DEFAULT_TRUE_LIST
+ self._valid_false_strs: list[str] = false_list or BooleanValueType.DEFAULT_FALSE_LIST
+
+ @property
+ def case_sensitive(self) -> bool:
+ return self._case_sensitive
+
+ @property
+ def valid_true_strs(self) -> list[str]:
+ return self._valid_true_strs
+
+ @property
+ def valid_false_strs(self) -> list[str]:
+ return self._valid_false_strs
+
+ @property
+ def valid_boolean_strs(self) -> list[str]:
+ return self._valid_true_strs + self._valid_false_strs
+
+ def _do_check_str_format(self, s):
+ if _str_case_in(s, self.case_sensitive, self.valid_boolean_strs): return True
+ return False, f"Not a valid boolean string ({ValueType.case_sensitive_to_str(self.case_sensitive)}). Valid string of true: {' '.join(self._valid_true_strs)}. Valid string of false: {' '.join(self._valid_false_strs)}. All is case insensitive."
+
+ def _do_convert_value_to_str(self, value):
+ return "True" if value else "False"
+
+ def _do_convert_str_to_value(self, s):
+ return _str_case_in(s, self.case_sensitive, self._valid_true_strs)
+
+
+class EnumValueType(ValueType[str]):
+ def __init__(self, valid_values: list[str], /, case_sensitive=False) -> None:
+ s = ' | '.join([f'"{v}"' for v in valid_values])
+ self._valid_value_str = f'[ {s} ]'
+ super().__init__(f"enum{self._valid_value_str}")
+ self._case_sensitive = case_sensitive
+ self._valid_values = valid_values
+
+ @property
+ def case_sensitive(self) -> bool:
+ return self._case_sensitive
+
+ @property
+ def valid_values(self) -> list[str]:
+ return self._valid_values
+
+ def _do_check_value(self, value):
+ ok, err = self._do_check_str_format(value)
+ return ok, (value if ok else err)
+
+ def _do_check_str_format(self, s):
+ if _str_case_in(s, self.case_sensitive, self.valid_values): return True
+ return False, f"Value is not in valid values ({ValueType.case_sensitive_to_str(self.case_sensitive)}): {self._valid_value_str}"
+
+ def _do_convert_value_to_str(self, value):
+ return value
+
+ def _do_convert_str_to_value(self, s):
+ return s
+
+
+TEXT_VALUE_TYPE = TextValueType()
+INTEGER_VALUE_TYPE = IntegerValueType()
+BOOLEAN_VALUE_TYPE = BooleanValueType()
+
+P = ParamSpec('P')
+
+
+class ValueGenerator(Generic[T, P]):
+ INTERACTIVE_KEY: ClassVar[Literal["interactive"]] = "interactive"
+
+ def __init__(self, f: Callable[P, T], /, attributes: None | Mapping[str, Any] = None) -> None:
+ self._f = f
+ self._attributes = attributes or {}
+
+ @property
+ def f(self) -> Callable[P, T]:
+ return self._f
+
+ @property
+ def attributes(self) -> Mapping[str, Any]:
+ return self._attributes
+
+ def generate(self, *args, **kwargs) -> T:
+ return self._f(*args, **kwargs)
+
+ def __call__(self, *args, **kwargs):
+ return self._f(*args, **kwargs)
+
+ @property
+ def interactive(self) -> bool:
+ return self._attributes.get(ValueGenerator.INTERACTIVE_KEY, False)
+
+ @staticmethod
+ def create_interactive(f: Callable[P, T], interactive: bool = True, /,
+ attributes: None | Mapping[str, Any] = None) -> "ValueGenerator[T, P]":
+ return ValueGenerator(f, dict({ValueGenerator.INTERACTIVE_KEY: interactive}, **(attributes or {})))
+
+
+class UuidValueGenerator(ValueGenerator[str, []]):
+ def __init__(self) -> None:
+ super().__init__(lambda: str(uuid.uuid4()))
+
+
+class RandomStringValueGenerator(ValueGenerator[str, []]):
+ @staticmethod
+ def _create_generate_ramdom_func(length: int, secure: bool) -> Callable[str, []]:
+ random_choice = secrets.choice if secure else random.choice
+
+ def generate_random_string():
+ characters = string.ascii_letters + string.digits
+ random_string = ''.join(random_choice(characters) for _ in range(length))
+ return random_string
+
+ return generate_random_string
+
+ def __init__(self, length: int, secure: bool) -> None:
+ super().__init__(RandomStringValueGenerator._create_generate_ramdom_func(length, secure))
+
+
+UUID_VALUE_GENERATOR = UuidValueGenerator()
diff --git a/tools/cru-py/crupest/__init__.py b/tools/cru-py/crupest/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tools/cru-py/crupest/__init__.py
diff --git a/tools/cru-py/crupest/__main__.py b/tools/cru-py/crupest/__main__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tools/cru-py/crupest/__main__.py
diff --git a/tools/cru-py/crupest/backup.py b/tools/cru-py/crupest/backup.py
new file mode 100644
index 0000000..7921d0d
--- /dev/null
+++ b/tools/cru-py/crupest/backup.py
@@ -0,0 +1,41 @@
+from .path import *
+from rich.prompt import Prompt, Confirm
+from urllib.request import urlretrieve
+import subprocess
+from datetime import datetime
+
+
+def backup_restore(http_url_or_path, /, console):
+ url = http_url_or_path
+ if len(url) == 0:
+ raise Exception("You specify an empty url. Abort.")
+ if url.startswith("http://") or url.startswith("https://"):
+ download_path = os.path.join(tmp_dir, "data.tar.xz")
+ if os.path.exists(download_path):
+ to_remove = Confirm.ask(
+ f"I want to download to [cyan]{download_path}[/]. However, there is a file already there. Do you want to remove it first", default=False, console=console)
+ if to_remove:
+ os.remove(download_path)
+ else:
+ raise Exception(
+ "Aborted! Please check the file and try again.")
+ urlretrieve(url, download_path)
+ url = download_path
+ subprocess.run(["sudo", "tar", "-xJf", url, "-C", project_dir], check=True)
+
+
+def backup_backup(path, /, console):
+ ensure_backup_dir()
+ now = datetime.utcnow().isoformat(timespec="seconds") + "Z"
+ if path is None:
+ path = Prompt.ask(
+ "You don't specify the path to backup to. Please specify one. http and https are NOT supported", console=console, default=os.path.join(backup_dir, now + ".tar.xz"))
+ if len(path) == 0:
+ raise Exception("You specify an empty path. Abort!")
+ if os.path.exists(path):
+ raise Exception(
+ "A file is already there. Please remove it first. Abort!")
+ subprocess.run(
+ ["sudo", "tar", "-cJf", path, "data", "-C", project_dir],
+ check=True
+ )
diff --git a/tools/cru-py/crupest/certbot.py b/tools/cru-py/crupest/certbot.py
new file mode 100644
index 0000000..8c89fa7
--- /dev/null
+++ b/tools/cru-py/crupest/certbot.py
@@ -0,0 +1,119 @@
+from typing import Literal, cast
+import os
+from os.path import join
+import subprocess
+from cryptography.x509 import load_pem_x509_certificate, DNSName, SubjectAlternativeName
+from cryptography.x509.oid import ExtensionOID
+from .tui import Paths, ensure_file, create_dir_if_not_exists, console
+
+CertbotAction = Literal['create', 'expand', 'shrink', 'renew']
+
+
+class Certbot:
+ def __init__(self, root_domain: str, subdomains: list[str]) -> None:
+ """
+ subdomain: like ["a", "b.c", ...]
+ """
+ self.root_domain = root_domain
+ self.subdomains = subdomains
+ self.domains = [
+ root_domain, *[f"{subdomain}.{root_domain}" for subdomain in subdomains]]
+
+ def generate_command(self, action: CertbotAction, /, test=False, no_docker=False, *, standalone=None, email=None, agree_tos=False) -> str:
+ add_domain_option = True
+ if action == 'create':
+ if standalone == None:
+ standalone = True
+ certbot_action = "certonly"
+ elif action == 'expand' or action == 'shrink':
+ if standalone == None:
+ standalone = False
+ certbot_action = "certonly"
+ elif action == 'renew':
+ if standalone == None:
+ standalone = False
+ add_domain_option = False
+ certbot_action = "renew"
+ else:
+ raise ValueError('Invalid action')
+
+ if no_docker:
+ command = "certbot "
+ else:
+ expose_segment = ' -p "0.0.0.0:80:80"'
+ web_root_segment = f' -v "{Paths.project_abs_path}/data/certbot/webroot:/var/www/certbot"'
+ command = f'docker run -it --rm --name certbot -v "{Paths.project_abs_path}/data/certbot/certs:/etc/letsencrypt" -v "{Paths.project_abs_path}/data/certbot/data:/var/lib/letsencrypt"{ expose_segment if standalone else web_root_segment} certbot/certbot '
+
+ command += certbot_action
+
+ if standalone:
+ command += " --standalone"
+ else:
+ command += ' --webroot -w /var/www/certbot'
+
+ if add_domain_option:
+ command += f' -d {" -d ".join(self.domains)}'
+
+ if email is not None:
+ command += f' --email {email}'
+
+ if agree_tos:
+ command += ' --agree-tos'
+
+ if test:
+ command += " --test-cert --dry-run"
+
+ return command
+
+ def get_cert_path(self) -> str:
+ return join(Paths.data_dir, "certbot", "certs", "live", self.root_domain, "fullchain.pem")
+
+ def get_cert_actual_domains(self, cert_path: str | None = None) -> None | list[str]:
+ if cert_path is None:
+ cert_path = self.get_cert_path()
+
+ if not ensure_file(cert_path):
+ return None
+
+ with open(cert_path, 'rb') as f:
+ cert = load_pem_x509_certificate(f.read())
+ ext = cert.extensions.get_extension_for_oid(
+ ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
+ domains: list[str] = cast(
+ SubjectAlternativeName, ext.value).get_values_for_type(DNSName)
+
+ # This weird code is to make sure the root domain is the first one
+ if self.root_domain in domains:
+ domains.remove(self.root_domain)
+ domains = [self.root_domain, *domains]
+
+ return domains
+
+ def print_create_cert_message(self):
+ console.print(
+ "Looks like you haven't run certbot to get the init ssl certificates. You may want to run following code to get one:", style="cyan")
+ console.print(self.generate_command("create"),
+ soft_wrap=True, highlight=False)
+
+ def check_ssl_cert(self, tmp_dir: str = Paths.tmp_dir):
+ cert_path = self.get_cert_path()
+ tmp_cert_path = join(tmp_dir, "fullchain.pem")
+ console.print("Temporarily copy cert to tmp...", style="yellow")
+ create_dir_if_not_exists(tmp_dir)
+ subprocess.run(
+ ["sudo", "cp", cert_path, tmp_cert_path], check=True)
+ subprocess.run(["sudo", "chown", str(
+ os.geteuid()), tmp_cert_path], check=True)
+ cert_domains = self.get_cert_actual_domains(tmp_cert_path)
+ if cert_domains is None:
+ self.print_create_cert_message()
+ else:
+ cert_domain_set = set(cert_domains)
+ domains = set(self.domains)
+ if not cert_domain_set == domains:
+ console.print(
+ "Cert domains are not equal to host domains. Run following command to recreate it with nginx stopped.", style="red")
+ console.print(self.generate_command(
+ "create", standalone=True), soft_wrap=True, highlight=False)
+ console.print("Remove tmp cert...", style="yellow")
+ os.remove(tmp_cert_path)
diff --git a/tools/cru-py/crupest/config.py b/tools/cru-py/crupest/config.py
new file mode 100644
index 0000000..7a63e2a
--- /dev/null
+++ b/tools/cru-py/crupest/config.py
@@ -0,0 +1,134 @@
+import os
+import typing
+import uuid
+import random
+import string
+from dataclasses import dataclass
+
+from rich.prompt import Prompt
+
+from cru.config import Configuration
+from cru.parsing import SimpleLineConfigParser
+from .path import config_file_path
+
+
+@dataclass
+class ConfigurationMigrationInfo:
+ duplicate_item_in_old_config: list[str]
+ item
+
+
+class OldConfiguration:
+ def __init__(self, items: None | dict[str, str] = None) -> None:
+ self._items = items or {}
+
+ @staticmethod
+ def load_from_str(s: str) -> tuple["OldConfiguration", list[str, str]]:
+ d, duplicate = SimpleLineConfigParser().parse_to_dict(s, True)
+ return OldConfiguration(d), duplicate
+
+ def convert_to_new_config(self) -> Configuration:
+
+
+class ConfigVar:
+ def __init__(self, name: str, description: str, default_value_generator: typing.Callable[[], str] | str, /,
+ default_value_for_ask=str | None):
+ """Create a config var.
+
+ Args:
+ name (str): The name of the config var.
+ description (str): The description of the config var.
+ default_value_generator (typing.Callable[[], str] | str): The default value generator of the config var. If it is a string, it will be used as the input prompt and let user input the value.
+ """
+ self.name = name
+ self.description = description
+ self.default_value_generator = default_value_generator
+ self.default_value_for_ask = default_value_for_ask
+
+ def get_default_value(self, /, console):
+ if isinstance(self.default_value_generator, str):
+ return Prompt.ask(self.default_value_generator, console=console, default=self.default_value_for_ask)
+ else:
+ return self.default_value_generator()
+
+
+config_var_list: list = [
+ ConfigVar("CRUPEST_DOMAIN", "domain name",
+ "Please input your domain name"),
+ ConfigVar("CRUPEST_EMAIL", "admin email address",
+ "Please input your email address"),
+ ConfigVar("CRUPEST_AUTO_BACKUP_COS_SECRET_ID",
+ "access key id for Tencent COS, used for auto backup",
+ "Please input your Tencent COS access key id for backup"),
+ ConfigVar("CRUPEST_AUTO_BACKUP_COS_SECRET_KEY",
+ "access key secret for Tencent COS, used for auto backup",
+ "Please input your Tencent COS access key for backup"),
+ ConfigVar("CRUPEST_AUTO_BACKUP_COS_REGION",
+ "region for Tencent COS, used for auto backup", "Please input your Tencent COS region for backup",
+ "ap-hongkong"),
+ ConfigVar("CRUPEST_AUTO_BACKUP_BUCKET_NAME",
+ "bucket name for Tencent COS, used for auto backup",
+ "Please input your Tencent COS bucket name for backup"),
+ ConfigVar("CRUPEST_GITHUB_USERNAME",
+ "github username for fetching todos", "Please input your github username for fetching todos", "crupest"),
+ ConfigVar("CRUPEST_GITHUB_PROJECT_NUMBER",
+ "github project number for fetching todos", "Please input your github project number for fetching todos",
+ "2"),
+ ConfigVar("CRUPEST_GITHUB_TOKEN",
+ "github token for fetching todos", "Please input your github token for fetching todos"),
+ ConfigVar("CRUPEST_GITHUB_TODO_COUNT",
+ "github todo count", "Please input your github todo count", 10),
+ ConfigVar("CRUPEST_GITHUB_TODO_COUNT",
+ "github todo count", "Please input your github todo count", 10),
+ ConfigVar("CRUPEST_V2RAY_TOKEN",
+ "v2ray user id", generate_uuid),
+ ConfigVar("CRUPEST_V2RAY_PATH",
+ "v2ray path, which will be prefixed by _", generate_uuid),
+ ConfigVar("CRUPEST_FORGEJO_MAILER_USER",
+ "Forgejo SMTP user.", "Please input your Forgejo SMTP user."),
+ ConfigVar("CRUPEST_FORGEJO_MAILER_PASSWD",
+ "Forgejo SMTP password.", "Please input your Forgejo SMTP password."),
+ ConfigVar("CRUPEST_2FAUTH_APP_KEY",
+ "2FAuth App Key.", generate_random_string_32),
+ ConfigVar("CRUPEST_2FAUTH_MAIL_USERNAME",
+ "2FAuth SMTP user.", "Please input your 2FAuth SMTP user."),
+ ConfigVar("CRUPEST_2FAUTH_MAIL_PASSWORD",
+ "2FAuth SMTP password.", "Please input your 2FAuth SMTP password."),
+]
+
+config_var_name_set = set([config_var.name for config_var in config_var_list])
+
+
+def check_config_var_set(needed_config_var_set: set[str]) -> tuple[bool, list[str], list[str]]:
+ more = []
+ less = []
+ for var_name in needed_config_var_set:
+ if var_name not in config_var_name_set:
+ more.append(var_name)
+ for var_name in config_var_name_set:
+ if var_name not in needed_config_var_set:
+ less.append(var_name)
+ return (True if len(more) == 0 else False, more, less)
+
+
+def config_file_exists():
+ return ensure_file(Paths.config_file_path, must_exist=False)
+
+
+def parse_config(str: str) -> dict[str, str]:
+ return ConfigMap().load_from_str(str).to_dict()
+
+
+def get_domain() -> str:
+ if configuration is None:
+ raise ValueError("Config file not found!")
+ return configuration.get_domain()
+
+
+def config_to_str(config: dict) -> str:
+ return "\n".join([f"{key}={value}" for key, value in config.items()])
+
+
+def print_config(console, config: dict) -> None:
+ for key, value in config.items():
+ console.print(f"[magenta]{key}[/] = [cyan]{value}")
diff --git a/tools/cru-py/crupest/dns.py b/tools/cru-py/crupest/dns.py
new file mode 100644
index 0000000..5006d5f
--- /dev/null
+++ b/tools/cru-py/crupest/dns.py
@@ -0,0 +1,42 @@
+from os.path import *
+from io import StringIO
+import re
+from .nginx import *
+
+
+def generate_dns_zone(domain: str, ip: str, /, ttl: str | int = 600, *, enable_mail: bool = True, dkim: str | None = None) -> str:
+ result = f"$ORIGIN {domain}.\n\n"
+ result += "; A records\n"
+ result += f"@ {ttl} IN A {ip}\n"
+ subdomains = list_subdomain_names()
+ for subdomain in subdomains:
+ result += f"{subdomain} {ttl} IN A {ip}\n"
+
+ if enable_mail:
+ result += "\n; MX records\n"
+ result += f"@ {ttl} IN MX 10 mail.{domain}.\n"
+ result += "\n; SPF record\n"
+ result += f"@ {ttl} IN TXT \"v=spf1 mx ~all\"\n"
+ if dkim is not None:
+ result += "\n; DKIM record\n"
+ result += f"mail._domainkey {ttl} IN TEXT \"{dkim}\""
+ result += "\n; DMARC record\n"
+ result += "_dmarc {ttl} IN TXT \"v=DMARC1; p=none; rua=mailto:dmarc.report@{domain}; ruf=mailto:dmarc.report@{domain}; sp=none; ri=86400\"\n"
+ return result
+
+
+def get_dkim_from_mailserver(domain: str) -> str | None:
+ dkim_path = join(data_dir, "dms/config/opendkim/keys", domain, "mail.txt")
+ if not exists(dkim_path):
+ return None
+
+ p = subprocess.run(["sudo", "cat", dkim_path],
+ capture_output=True, check=True)
+ value = ""
+ for match in re.finditer("\"(.*)\"", p.stdout.decode('utf-8')):
+ value += match.group(1)
+ return value
+
+
+def generate_dns_zone_with_dkim(domain: str, ip: str, /, ttl: str | int = 600) -> str:
+ return generate_dns_zone(domain, ip, ttl, enable_mail=True, dkim=get_dkim_from_mailserver(domain))
diff --git a/tools/cru-py/crupest/download_tools.py b/tools/cru-py/crupest/download_tools.py
new file mode 100644
index 0000000..beb06d4
--- /dev/null
+++ b/tools/cru-py/crupest/download_tools.py
@@ -0,0 +1,47 @@
+import sys
+from os.path import *
+from urllib.request import *
+from rich.prompt import Confirm
+from .path import *
+from .helper import print_order
+
+
+TOOLS = [("docker-mailserver setup script", "docker-mailserver-setup.sh",
+ "https://raw.githubusercontent.com/docker-mailserver/docker-mailserver/master/setup.sh")]
+
+
+def download_tools(console):
+ # if we are not linux, we prompt the user
+ if sys.platform != "linux":
+ console.print(
+ "You are not running this script on linux. The tools will not work.", style="yellow")
+ if not Confirm.ask("Do you want to continue?", default=False, console=console):
+ return
+
+ for index, script in enumerate(TOOLS):
+ number = index + 1
+ total = len(TOOLS)
+ print_order(number, total, console)
+ name, filename, url = script
+ # if url is callable, call it
+ if callable(url):
+ url = url()
+ path = join(tool_dir, filename)
+ skip = False
+ if exists(path):
+ overwrite = Confirm.ask(
+ f"[cyan]{name}[/] already exists, download and overwrite?", default=False, console=console)
+ if not overwrite:
+ skip = True
+ else:
+ download = Confirm.ask(
+ f"Download [cyan]{name}[/] to [magenta]{path}[/]?", default=True, console=console)
+ if not download:
+ skip = True
+ if not skip:
+ console.print(f"Downloading {name}...")
+ urlretrieve(url, path)
+ os.chmod(path, 0o755)
+ console.print(f"Downloaded {name} to {path}.", style="green")
+ else:
+ console.print(f"Skipped {name}.", style="yellow")
diff --git a/tools/cru-py/crupest/helper.py b/tools/cru-py/crupest/helper.py
new file mode 100644
index 0000000..f8fe34a
--- /dev/null
+++ b/tools/cru-py/crupest/helper.py
@@ -0,0 +1,18 @@
+import os
+import os.path
+from .path import *
+
+
+def run_in_dir(dir: str, func: callable):
+ old_dir = os.path.abspath(os.getcwd())
+ os.chdir(dir)
+ func()
+ os.chdir(old_dir)
+
+
+def run_in_project_dir(func: callable):
+ run_in_dir(project_dir, func)
+
+
+def print_order(number: int, total: int, /, console) -> None:
+ console.print(f"\[{number}/{total}]", end=" ", style="green")
diff --git a/tools/cru-py/crupest/install_docker.py b/tools/cru-py/crupest/install_docker.py
new file mode 100644
index 0000000..ac50290
--- /dev/null
+++ b/tools/cru-py/crupest/install_docker.py
@@ -0,0 +1,16 @@
+from os.path import *
+from .path import *
+import urllib
+import subprocess
+
+
+def install_docker():
+ ensure_tmp_dir()
+ get_docker_path = join(tmp_dir, "get-docker.sh")
+ urllib.request.urlretrieve("https://get.docker.com", get_docker_path)
+ os.chmod(get_docker_path, 0o755)
+ subprocess.run(["sudo", "sh", get_docker_path], check=True)
+ subprocess.run(["sudo", "systemctl", "enable",
+ "--now", "docker"], check=True)
+ subprocess.run(["sudo", "usermod", "-aG", "docker",
+ os.getlogin()], check=True)
diff --git a/tools/cru-py/crupest/nginx.py b/tools/cru-py/crupest/nginx.py
new file mode 100644
index 0000000..1ec5c6b
--- /dev/null
+++ b/tools/cru-py/crupest/nginx.py
@@ -0,0 +1,246 @@
+from typing import cast
+import json
+import jsonschema
+import os
+from os.path import *
+import shutil
+import subprocess
+from rich.prompt import Confirm
+from cryptography.x509 import *
+from cryptography.x509.oid import ExtensionOID
+from .template import Template
+from .path import *
+
+with open(join(nginx_template_dir, 'server.json')) as f:
+ server = json.load(f)
+
+with open(join(nginx_template_dir, 'server.schema.json')) as f:
+ schema = json.load(f)
+
+jsonschema.validate(server, schema)
+
+non_template_files = ['forbid_unknown_domain.conf', "websocket.conf"]
+
+ssl_template = Template(join(nginx_template_dir, 'ssl.conf.template'))
+root_template = Template(join(
+ nginx_template_dir, 'root.conf.template'))
+static_file_template = Template(join(
+ nginx_template_dir, 'static-file.conf.template'))
+reverse_proxy_template = Template(join(
+ nginx_template_dir, 'reverse-proxy.conf.template'))
+redirect_template = Template(join(
+ nginx_template_dir, 'redirect.conf.template'))
+cert_only_template = Template(join(
+ nginx_template_dir, 'cert-only.conf.template'))
+
+nginx_var_set = set.union(root_template.var_set,
+ static_file_template.var_set, reverse_proxy_template.var_set)
+
+
+def list_subdomain_names() -> list:
+ return [s["subdomain"] for s in server["sites"]]
+
+
+def list_subdomains(domain: str) -> list:
+ return [f"{s['subdomain']}.{domain}" for s in server["sites"]]
+
+
+def list_domains(domain: str) -> list:
+ return [domain, *list_subdomains(domain)]
+
+
+def generate_nginx_config(domain: str, original_config, dest: str) -> None:
+ if not isdir(dest):
+ raise ValueError('dest must be a directory')
+ # copy ssl.conf and https-redirect.conf which need no variable substitution
+ for filename in non_template_files:
+ src = join(nginx_template_dir, filename)
+ dst = join(dest, filename)
+ shutil.copyfile(src, dst)
+ config = {
+ "CRUPEST_DOMAIN": domain,
+ "CRUPEST_V2RAY_TOKEN": original_config["CRUPEST_V2RAY_TOKEN"],
+ "CRUPEST_V2RAY_PATH": original_config["CRUPEST_V2RAY_PATH"]
+ }
+ # generate ssl.conf
+ with open(join(dest, 'ssl.conf'), 'w') as f:
+ f.write(ssl_template.generate(config))
+ # generate root.conf
+ with open(join(dest, f'{domain}.conf'), 'w') as f:
+ root_config = config.copy()
+ root_config["CRUPEST_V2RAY_TOKEN"] = config["CRUPEST_V2RAY_TOKEN"]
+ root_config["CRUPEST_V2RAY_PATH"] = config["CRUPEST_V2RAY_PATH"]
+ f.write(root_template.generate(config))
+ # generate nginx config for each site
+ sites: list = server["sites"]
+ for site in sites:
+ subdomain = site["subdomain"]
+ local_config = config.copy()
+ local_config['CRUPEST_NGINX_SUBDOMAIN'] = subdomain
+ if site["type"] == 'static-file':
+ template = static_file_template
+ local_config['CRUPEST_NGINX_ROOT'] = site["root"]
+ elif site["type"] == 'reverse-proxy':
+ template = reverse_proxy_template
+ local_config['CRUPEST_NGINX_UPSTREAM_SERVER'] = site["upstream"]
+ elif site["type"] == 'redirect':
+ template = redirect_template
+ local_config['CRUPEST_NGINX_URL'] = site["url"]
+ elif site["type"] == 'cert-only':
+ template = cert_only_template
+ else:
+ raise Exception('Invalid site type')
+ with open(join(dest, f'{subdomain}.{domain}.conf'), 'w') as f:
+ f.write(template.generate(local_config))
+
+
+def check_nginx_config_dir(dir_path: str, domain: str) -> list:
+ if not exists(dir_path):
+ return []
+ good_files = [*non_template_files, "ssl.conf", *
+ [f"{full_domain}.conf" for full_domain in list_domains(domain)]]
+ bad_files = []
+ for path in os.listdir(dir_path):
+ file_name = basename(path)
+ if file_name not in good_files:
+ bad_files.append(file_name)
+ return bad_files
+
+
+def restart_nginx(force=False) -> bool:
+ if not force:
+ p = subprocess.run(['docker', "container", "ls",
+ "-f", "name=nginx", "-q"], capture_output=True)
+ container: str = p.stdout.decode("utf-8")
+ if len(container.strip()) == 0:
+ return False
+ subprocess.run(['docker', 'restart', 'nginx'])
+ return True
+
+
+def nginx(domain: str, config, /, console) -> None:
+ bad_files = check_nginx_config_dir(nginx_config_dir, domain)
+ if len(bad_files) > 0:
+ console.print(
+ "WARNING: It seems there are some bad conf files in the nginx config directory:", style="yellow")
+ for bad_file in bad_files:
+ console.print(bad_file, style="cyan")
+ to_delete = Confirm.ask(
+ "They will affect nginx in a [red]bad[/] way. Do you want to delete them?", default=True, console=console)
+ if to_delete:
+ for file in bad_files:
+ os.remove(join(nginx_config_dir, file))
+ console.print(
+ "I have found following var in nginx templates:", style="green")
+ for var in nginx_var_set:
+ console.print(var, style="magenta")
+ if not exists(nginx_config_dir):
+ os.mkdir(nginx_config_dir)
+ console.print(
+ f"Nginx config directory created at [magenta]{nginx_config_dir}[/]", style="green")
+ generate_nginx_config(domain, config, dest=nginx_config_dir)
+ console.print("Nginx config generated.", style="green")
+ if restart_nginx():
+ console.print('Nginx restarted.', style="green")
+
+
+def certbot_command_gen(domain: str, action, /, test=False, no_docker=False, *, standalone=None, email=None, agree_tos=False) -> str:
+ domains = list_domains(domain)
+
+ add_domain_option = True
+ if action == 'create':
+ if standalone == None:
+ standalone = True
+ certbot_action = "certonly"
+ elif action == 'expand':
+ if standalone == None:
+ standalone = False
+ certbot_action = "certonly"
+ elif action == 'renew':
+ if standalone == None:
+ standalone = False
+ add_domain_option = False
+ certbot_action = "renew"
+ else:
+ raise ValueError('Invalid action')
+
+ if no_docker:
+ command = "certbot "
+ else:
+ expose_segment = ' -p "0.0.0.0:80:80"'
+ web_root_segment = ' -v "{project_abs_path}/data/certbot/webroot:/var/www/certbot"'
+ command = f'docker run -it --rm --name certbot -v "{project_abs_path}/data/certbot/certs:/etc/letsencrypt" -v "{project_abs_path}/data/certbot/data:/var/lib/letsencrypt"{ expose_segment if standalone else web_root_segment} certbot/certbot '
+
+ command += certbot_action
+
+ if standalone:
+ command += " --standalone"
+ else:
+ command += ' --webroot -w /var/www/certbot'
+
+ if add_domain_option:
+ command += f' -d {" -d ".join(domains)}'
+
+ if email is not None:
+ command += f' --email {email}'
+
+ if agree_tos:
+ command += ' --agree-tos'
+
+ if test:
+ command += " --test-cert --dry-run"
+
+ return command
+
+
+def get_cert_path(root_domain):
+ return join(data_dir, "certbot", "certs", "live", root_domain, "fullchain.pem")
+
+
+def get_cert_domains(cert_path, root_domain):
+
+ if not exists(cert_path):
+ return None
+
+ if not isfile(cert_path):
+ return None
+
+ with open(cert_path, 'rb') as f:
+ cert = load_pem_x509_certificate(f.read())
+ ext = cert.extensions.get_extension_for_oid(
+ ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
+ domains: list[str] = cast(
+ SubjectAlternativeName, ext.value).get_values_for_type(DNSName)
+ domains.remove(root_domain)
+ domains = [root_domain, *domains]
+ return domains
+
+
+def print_create_cert_message(domain, console):
+ console.print(
+ "Looks like you haven't run certbot to get the init ssl certificates. You may want to run following code to get one:", style="cyan")
+ console.print(certbot_command_gen(domain, "create"),
+ soft_wrap=True, highlight=False)
+
+
+def check_ssl_cert(domain, console):
+ cert_path = get_cert_path(domain)
+ tmp_cert_path = join(tmp_dir, "fullchain.pem")
+ console.print("Temporarily copy cert to tmp...", style="yellow")
+ subprocess.run(
+ ["sudo", "cp", cert_path, tmp_cert_path], check=True)
+ subprocess.run(["sudo", "chown", str(os.geteuid()),
+ tmp_cert_path], check=True)
+ cert_domains = get_cert_domains(tmp_cert_path, domain)
+ if cert_domains is None:
+ print_create_cert_message(domain, console)
+ else:
+ cert_domain_set = set(cert_domains)
+ domains = set(list_domains(domain))
+ if not cert_domain_set == domains:
+ console.print(
+ "Cert domains are not equal to host domains. Run following command to recreate it with nginx stopped.", style="red")
+ console.print(certbot_command_gen(
+ domain, "create", standalone=True), soft_wrap=True, highlight=False)
+ console.print("Remove tmp cert...", style="yellow")
+ os.remove(tmp_cert_path)
diff --git a/tools/cru-py/crupest/path.py b/tools/cru-py/crupest/path.py
new file mode 100644
index 0000000..0cfcfb8
--- /dev/null
+++ b/tools/cru-py/crupest/path.py
@@ -0,0 +1,57 @@
+import os
+import os.path
+
+script_dir = os.path.relpath(os.path.dirname(__file__))
+project_dir = os.path.normpath(os.path.join(script_dir, "../../../"))
+project_abs_path = os.path.abspath(project_dir)
+template_dir = os.path.join(project_dir, "template")
+nginx_template_dir = os.path.join(template_dir, "nginx")
+data_dir = os.path.join(project_dir, "data")
+tool_dir = os.path.join(project_dir, "tools")
+tmp_dir = os.path.join(project_dir, "tmp")
+backup_dir = os.path.join(project_dir, "backup")
+config_file_path = os.path.join(data_dir, "config")
+nginx_config_dir = os.path.join(project_dir, "nginx-config")
+log_dir = os.path.join(project_dir, "log")
+
+
+def ensure_file(path: str, /, must_exist: bool = True) -> bool:
+ if must_exist and not os.path.exists(path):
+ raise Exception(f"File {path} does not exist!")
+ if not os.path.exists(path):
+ return False
+ if not os.path.isfile(path):
+ raise Exception(f"{path} is not a file!")
+ return True
+
+
+def ensure_dir(path: str, /, must_exist: bool = True) -> bool:
+ if must_exist and not os.path.exists(path):
+ raise Exception(f"Directory {path} does not exist!")
+ if not os.path.exists(path):
+ return False
+ if not os.path.isdir(path):
+ raise Exception(f"{path} is not a directory!")
+ return True
+
+
+class Paths:
+ script_dir = os.path.relpath(os.path.dirname(__file__))
+ project_dir = os.path.normpath(os.path.join(script_dir, "../../"))
+ project_abs_path = os.path.abspath(project_dir)
+ data_dir = os.path.join(project_dir, "data")
+ config_file_path = os.path.join(data_dir, "config")
+ template_dir = os.path.join(project_dir, "template")
+ tool_dir = os.path.join(project_dir, "tool")
+ tmp_dir = os.path.join(project_dir, "tmp")
+ backup_dir = os.path.join(project_dir, "backup")
+ log_dir = os.path.join(project_dir, "log")
+ template2_dir = os.path.join(project_dir, "template2")
+ nginx2_template_dir = os.path.join(template2_dir, "nginx")
+ generated_dir = os.path.join(project_dir, "generated")
+ nginx_generated_dir = os.path.join(generated_dir, "nginx")
+
+
+def create_dir_if_not_exists(path: str) -> None:
+ if not ensure_dir(path, must_exist=False):
+ os.mkdir(path)
diff --git a/tools/cru-py/crupest/setup.py b/tools/cru-py/crupest/setup.py
new file mode 100644
index 0000000..4e91302
--- /dev/null
+++ b/tools/cru-py/crupest/setup.py
@@ -0,0 +1,233 @@
+from os.path import *
+from datetime import datetime
+from rich.prompt import Confirm
+from .path import *
+from .nginx import *
+from .config import *
+from .helper import *
+
+
+def get_template_name_list(console) -> list[str]:
+ console.print("First let's check all the templates...")
+
+ # get all filenames ending with .template
+ template_name_list = [basename(f)[:-len('.template')] for f in os.listdir(
+ template_dir) if f.endswith(".template")]
+ console.print(
+ f"I have found following template files in [magenta]{template_dir}[/]:", style="green")
+ for filename in template_name_list:
+ console.print(f"{filename}.template", style="magenta")
+
+ return template_name_list
+
+
+def data_dir_check(domain, console):
+ if isdir(data_dir):
+ if not exists(join(data_dir, "certbot")):
+ print_create_cert_message(domain, console)
+ else:
+ to_check = Confirm.ask(
+ "I want to check your ssl certs, but I need to sudo. Do you want me check", console=console, default=False)
+ if to_check:
+ check_ssl_cert(domain, console)
+
+
+def template_generate(console):
+ template_name_list = get_template_name_list(console)
+ template_list: list = []
+ config_var_name_set_in_template = set()
+ for template_name in template_name_list:
+ template = Template(join(template_dir, template_name+".template"))
+ template_list.append(template)
+ config_var_name_set_in_template.update(template.var_set)
+
+ console.print(
+ "I have found following variables needed in templates:", style="green")
+ for key in config_var_name_set_in_template:
+ console.print(key, style="magenta")
+
+ # check vars
+ check_success, more, less = check_config_var_set(
+ config_var_name_set_in_template)
+ if len(more) != 0:
+ console.print("There are more variables in templates than in config file:",
+ style="red")
+ for key in more:
+ console.print(key, style="magenta")
+ if len(less) != 0:
+ console.print("Following config vars are not used:",
+ style="yellow")
+ for key in less:
+ console.print(key, style="magenta")
+
+ if not check_success:
+ console.print(
+ "Please check you config vars and make sure the needed ones are defined!", style="red")
+ else:
+ console.print(
+ "Now let's check if they are already generated...")
+
+ conflict = False
+
+ # check if there exists any generated files
+ for filename in template_name_list:
+ if exists(join(project_dir, filename)):
+ console.print(f"Found [magenta]{filename}[/]")
+ conflict = True
+
+ to_gen = True
+ if conflict:
+ to_overwrite = Confirm.ask(
+ "It seems there are some files already generated. Do you want to overwrite them?", console=console, default=False)
+ if not to_overwrite:
+ to_gen = False
+ console.print(
+ "Great! Check the existing files and see you next time!", style="green")
+ else:
+ print("No conflict found. Let's go on!\n")
+
+ if to_gen:
+ console.print("Check for existing config file...")
+
+ # check if there exists a config file
+ if not config_file_exists():
+ config = {}
+ console.print(
+ "No existing config file found. Don't worry. Let's create one!", style="green")
+ for config_var in config_var_list:
+ config[config_var.name] = config_var.get_default_value()
+ config_content = config_to_str(config)
+ # create data dir if not exist
+ if not exists(data_dir):
+ os.mkdir(data_dir)
+ # write config file
+ with open(config_file_path, "w") as f:
+ f.write(config_content)
+ console.print(
+ f"Everything else is auto generated. The config file is written into [magenta]{config_file_path}[/]. You had better keep it safe. And here is the content:", style="green")
+ print_config(console, config)
+ is_ok = Confirm.ask(
+ "If you think it's not ok, you can stop here and edit it. Or let's go on?", console=console, default=True)
+ if not is_ok:
+ console.print(
+ "Great! Check the config file and see you next time!", style="green")
+ to_gen = False
+ else:
+ console.print(
+ "Looks like you have already had a config file. Let's check the content:", style="green")
+ with open(config_file_path, "r") as f:
+ content = f.read()
+ config = parse_config(content)
+ print_config(console, config)
+ missed_config_vars = []
+ for config_var in config_var_list:
+ if config_var.name not in config:
+ missed_config_vars.append(config_var)
+
+ if len(missed_config_vars) > 0:
+ console.print(
+ "Oops! It seems you have missed some keys in your config file. Let's add them!", style="green")
+ for config_var in missed_config_vars:
+ config[config_var.name] = config_var.get_default_value(
+ console)
+ content = config_to_str(config)
+ with open(config_file_path, "w") as f:
+ f.write(content)
+ console.print(
+ f"Here is the new config, it has been written out to [magenta]{config_file_path}[/]:")
+ print_config(console, config)
+ good_enough = Confirm.ask("Is it good enough?",
+ console=console, default=True)
+ if not good_enough:
+ console.print(
+ "Great! Check the config file and see you next time!", style="green")
+ to_gen = False
+
+ domain = get_domain()
+
+ if to_gen:
+ console.print(
+ "Finally, everything is ready. Let's generate the files:", style="green")
+
+ # generate files
+ for index, template in enumerate(template_list):
+ number = index + 1
+ total = len(template_list)
+ print_order(number, total, console)
+ console.print(
+ f"Generating [magenta]{template.template_name}[/]...")
+ content = template.generate(config)
+ with open(join(project_dir, template.template_name), "w") as f:
+ f.write(content)
+
+ # generate nginx config
+ if not exists(nginx_config_dir):
+ to_gen_nginx_conf = Confirm.ask("It seems you haven't generate nginx config. Do you want to generate it?",
+ default=True, console=console)
+ else:
+ # get the latest time of files in nginx template
+ template_time = 0
+ for path in os.listdir(nginx_template_dir):
+ template_time = max(template_time, os.stat(
+ join(nginx_template_dir, path)).st_mtime)
+ console.print(
+ f"Nginx template update time: {datetime.fromtimestamp(template_time)}")
+
+ nginx_config_time = 0
+ for path in os.listdir(nginx_config_dir):
+ nginx_config_time = max(nginx_config_time, os.stat(
+ join(nginx_config_dir, path)).st_mtime)
+ console.print(
+ f"Generated nginx template update time: {datetime.fromtimestamp(nginx_config_time)}")
+ if template_time > nginx_config_time:
+ to_gen_nginx_conf = Confirm.ask("It seems you have updated the nginx template and not regenerate config. Do you want to regenerate the nginx config?",
+ default=True, console=console)
+ else:
+ to_gen_nginx_conf = Confirm.ask("[yellow]It seems you have already generated nginx config. Do you want to overwrite it?[/]",
+ default=False, console=console)
+ if to_gen_nginx_conf:
+ nginx(domain, config, console)
+ data_dir_check(domain, console)
+
+
+def clear(console, /, delete_data_dir=False):
+ template_name_list = get_template_name_list(console)
+ # check root if we have to delete data dir
+ if delete_data_dir and exists(data_dir) and os.geteuid() != 0:
+ console.print(
+ "You need to be root to delete data dir.", style="red")
+ exit(1)
+
+ to_delete = Confirm.ask(
+ "[yellow]Are you sure you want to delete everything? all your data will be lost![/]", default=False, console=console)
+ if to_delete:
+ files_to_delete = []
+ for template_name in template_name_list:
+ f = join(project_dir, template_name)
+ if exists(f):
+ files_to_delete.append(f)
+
+ delete_data_dir = delete_data_dir and exists(
+ data_dir)
+
+ if len(files_to_delete) == 0:
+ console.print(
+ "Nothing to delete. We are safe!", style="green")
+ else:
+ console.print("Here are the files to delete:")
+ for f in files_to_delete:
+ console.print(f, style="magenta")
+ if delete_data_dir:
+ console.print(data_dir + " (data dir)",
+ style="magenta")
+
+ to_delete = Confirm.ask(
+ "[red]Are you sure you want to delete them?[/]", default=False, console=console)
+ if to_delete:
+ for f in files_to_delete:
+ os.remove(f)
+ if delete_data_dir:
+ # recursively delete data dir
+ shutil.rmtree(data_dir)
+ console.print(
+ "Your workspace is clean now!", style="green")
diff --git a/tools/cru-py/crupest/template.py b/tools/cru-py/crupest/template.py
new file mode 100644
index 0000000..9747af1
--- /dev/null
+++ b/tools/cru-py/crupest/template.py
@@ -0,0 +1,32 @@
+import os.path
+import re
+
+
+class Template:
+ def __init__(self, template_path: str, var_prefix: str = "CRUPEST"):
+ if len(var_prefix) != 0 and re.fullmatch(r"^[a-zA-Z_][a-zA-Z0-9_]*$", var_prefix) is None:
+ raise ValueError("Invalid var prefix.")
+ self.template_path = template_path
+ self.template_name = os.path.basename(
+ template_path)[:-len(".template")]
+ with open(template_path, "r") as f:
+ self.template = f.read()
+ self.var_prefix = var_prefix
+ self.__var_regex = re.compile(r"\$(" + var_prefix + r"_[a-zA-Z0-9_]+)")
+ self.__var_brace_regex = re.compile(
+ r"\$\{\s*(" + var_prefix + r"_[a-zA-Z0-9_]+)\s*\}")
+ var_set = set()
+ for match in self.__var_regex.finditer(self.template):
+ var_set.add(match.group(1))
+ for match in self.__var_brace_regex.finditer(self.template):
+ var_set.add(match.group(1))
+ self.var_set = var_set
+
+ def generate(self, config: dict) -> str:
+ result = self.template
+ for var in self.var_set:
+ if var not in config:
+ raise ValueError(f"Missing config var {var}.")
+ result = result.replace("$" + var, config[var])
+ result = re.sub(r"\$\{\s*" + var + r"\s*\}", config[var], result)
+ return result
diff --git a/tools/cru-py/crupest/template2.py b/tools/cru-py/crupest/template2.py
new file mode 100644
index 0000000..ae096df
--- /dev/null
+++ b/tools/cru-py/crupest/template2.py
@@ -0,0 +1,45 @@
+import os.path
+import re
+
+_template_filename_suffix = ".template"
+_template_var_regex = r"\$([-_a-zA-Z0-9]+)"
+_template_var_brace_regex = r"\$\{\s*([-_a-zA-Z0-9]+?)\s*\}"
+
+
+class Template2:
+
+ @staticmethod
+ def from_file(template_path: str) -> "Template2":
+ if not template_path.endswith(_template_filename_suffix):
+ raise Exception(
+ "Template file must have a name ending with .template.")
+ template_name = os.path.basename(
+ template_path)[:-len(_template_filename_suffix)]
+ with open(template_path, "r") as f:
+ template = f.read()
+ return Template2(template_name, template, template_path=template_path)
+
+ def __init__(self, template_name: str, template: str, *, template_path: str | None = None) -> None:
+ self.template_name = template_name
+ self.template = template
+ self.template_path = template_path
+ self.var_set = set()
+ for match in re.finditer(_template_var_regex, self.template):
+ self.var_set.add(match.group(1))
+ for match in re.finditer(_template_var_brace_regex, self.template):
+ self.var_set.add(match.group(1))
+
+ def partial_render(self, vars: dict[str, str]) -> "Template2":
+ t = self.render(vars)
+ return Template2(self.template_name, t, template_path=self.template_path)
+
+ def render(self, vars: dict[str, str]) -> str:
+ for name in vars.keys():
+ if name not in self.var_set:
+ raise ValueError(f"Invalid var name {name}.")
+
+ text = self.template
+ for name, value in vars.items():
+ text = text.replace("$" + name, value)
+ text = re.sub(r"\$\{\s*" + name + r"\s*\}", value, text)
+ return text
diff --git a/tools/cru-py/crupest/test.py b/tools/cru-py/crupest/test.py
new file mode 100644
index 0000000..d6eb778
--- /dev/null
+++ b/tools/cru-py/crupest/test.py
@@ -0,0 +1,31 @@
+import json
+from http.client import *
+from urllib.request import urlopen
+
+
+def test_crupest_api(console):
+ def do_the_test():
+ res: HTTPResponse = urlopen("http://localhost:5188/api/todos")
+ body = res.read()
+
+ if res.status != 200:
+ raise Exception("Status code is not 200.")
+ result = json.loads(body)
+ if not isinstance(result, list):
+ raise Exception("Result is not an array.")
+ if len(result) == 0:
+ raise Exception("Result is an empty array.")
+ if not isinstance(result[0], dict):
+ raise Exception("Result[0] is not an object.")
+ if not isinstance(result[0].get("title"), str):
+ raise Exception("Result[0].title is not a string.")
+ if not isinstance(result[0].get("status"), str):
+ raise Exception("Result[0].status is not a string.")
+
+ try:
+ do_the_test()
+ console.print("Test passed!", style="green")
+ exit(0)
+ except Exception as e:
+ console.print(e)
+ console.print("Test failed!", style="red")
diff --git a/tools/cru-py/crupest/tui.py b/tools/cru-py/crupest/tui.py
new file mode 100644
index 0000000..20ba1dd
--- /dev/null
+++ b/tools/cru-py/crupest/tui.py
@@ -0,0 +1,7 @@
+from rich.console import Console
+from rich.prompt import Prompt, Confirm
+
+Prompt = Prompt
+Confirm = Confirm
+
+console = Console()
diff --git a/tools/cru-py/crupest/ui_base.py b/tools/cru-py/crupest/ui_base.py
new file mode 100644
index 0000000..b26e65b
--- /dev/null
+++ b/tools/cru-py/crupest/ui_base.py
@@ -0,0 +1,19 @@
+from .tui import console
+
+good_style = "green"
+warning_style = "yellow"
+error_style = "red bold"
+file_name_style = "cyan bold"
+var_style = "magenta bold"
+value_style = "cyan bold"
+bye_style = "cyan"
+
+
+def print_with_indent(value: str, style: str, /, indent: int = 0, *, indent_width: int = 2, end='\n'):
+ console.print(
+ f'{" " * indent * indent_width}[{style}]{value}[/]', end=end)
+
+
+def print_var_value(name: str, value: str, /, indent: int = 0, *, indent_width: int = 2, end='\n'):
+ console.print(
+ f'{" " * indent * indent_width}[{var_style}]{name}[/] = [{value_style}]{value}[/]', end=end)
diff --git a/tools/cru-py/requirements.txt b/tools/cru-py/requirements.txt
new file mode 100644
index 0000000..2fb5657
--- /dev/null
+++ b/tools/cru-py/requirements.txt
@@ -0,0 +1,3 @@
+rich
+jsonschema
+cryptography
diff --git a/tools/cru-py/update-blog b/tools/cru-py/update-blog
new file mode 100644
index 0000000..e4a25ab
--- /dev/null
+++ b/tools/cru-py/update-blog
@@ -0,0 +1,2 @@
+#! /usr/bin/env sh
+exec python3 "$(dirname "$0")/aio.py" update-blog "$@"
diff --git a/tools/cru-py/www-dev b/tools/cru-py/www-dev
new file mode 100644
index 0000000..f56d679
--- /dev/null
+++ b/tools/cru-py/www-dev
@@ -0,0 +1,8 @@
+#! /usr/bin/env sh
+
+set -e
+
+cd "$(dirname "$0")/../.."
+
+exec tmux new-session 'cd docker/crupest-nginx/sites/www && pnpm start' \; \
+ split-window -h 'cd docker/crupest-api/CrupestApi/CrupestApi && dotnet run --launch-profile dev'