aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcrupest <crupest@outlook.com>2024-11-11 01:12:29 +0800
committerYuqian Yang <crupest@crupest.life>2025-01-20 22:34:18 +0800
commit5c76a1257b4a058bf919af3e31cc9461a39c2f33 (patch)
treecb32f0c22e5438a0ed9de4b29f58d0b7f142a58d
parent12e1272508ba0b5909069319007d677c1c76e355 (diff)
downloadcrupest-5c76a1257b4a058bf919af3e31cc9461a39c2f33.tar.gz
crupest-5c76a1257b4a058bf919af3e31cc9461a39c2f33.tar.bz2
crupest-5c76a1257b4a058bf919af3e31cc9461a39c2f33.zip
HALF WORK: 2024.1.20 - 2
-rw-r--r--crupest-words.txt1
-rw-r--r--tools/cru-py/aio2
-rw-r--r--tools/cru-py/cru/service/_app.py4
-rw-r--r--tools/cru-py/cru/service/_config.py87
-rw-r--r--tools/cru-py/cru/service/_data.py9
-rw-r--r--tools/cru-py/cru/service/_docker.py19
-rw-r--r--tools/cru-py/cru/service/_external.py69
-rw-r--r--tools/cru-py/cru/service/_manager.py4
-rw-r--r--tools/cru-py/cru/service/_nginx.py227
-rw-r--r--tools/cru-py/cru/service/_template.py14
-rw-r--r--tools/cru-py/cru/service/nginx.py17
-rw-r--r--tools/cru-py/cru/template.py12
-rw-r--r--tools/cru-py/crupest/__init__.py0
-rw-r--r--tools/cru-py/crupest/__main__.py0
-rw-r--r--tools/cru-py/crupest/aio.py319
-rw-r--r--tools/cru-py/crupest/backup.py41
-rw-r--r--tools/cru-py/crupest/certbot.py119
-rw-r--r--tools/cru-py/crupest/config.py134
-rw-r--r--tools/cru-py/crupest/dns.py42
-rw-r--r--tools/cru-py/crupest/download_tools.py47
-rw-r--r--tools/cru-py/crupest/helper.py18
-rw-r--r--tools/cru-py/crupest/install_docker.py16
-rw-r--r--tools/cru-py/crupest/nginx.py246
-rw-r--r--tools/cru-py/crupest/path.py57
-rw-r--r--tools/cru-py/crupest/setup.py233
-rw-r--r--tools/cru-py/crupest/template.py32
-rw-r--r--tools/cru-py/crupest/test.py31
-rw-r--r--tools/cru-py/crupest/tui.py7
-rw-r--r--tools/cru-py/crupest/ui_base.py19
29 files changed, 350 insertions, 1476 deletions
diff --git a/crupest-words.txt b/crupest-words.txt
index 80992d1..378ea81 100644
--- a/crupest-words.txt
+++ b/crupest-words.txt
@@ -1,6 +1,7 @@
2fauth
aarch64
buildpackage
+certbot
chroot
confdir
cpio
diff --git a/tools/cru-py/aio b/tools/cru-py/aio
deleted file mode 100644
index f74877a..0000000
--- a/tools/cru-py/aio
+++ /dev/null
@@ -1,2 +0,0 @@
-#! /usr/bin/env sh
-exec python3 "$(dirname "$0")/aio.py" "$@"
diff --git a/tools/cru-py/cru/service/_app.py b/tools/cru-py/cru/service/_app.py
index e72baec..6030dad 100644
--- a/tools/cru-py/cru/service/_app.py
+++ b/tools/cru-py/cru/service/_app.py
@@ -5,9 +5,9 @@ from ._base import (
PathCommandProvider,
)
from ._config import ConfigManager
-from ._data import DataManager
from ._template import TemplateManager
from ._nginx import NginxManager
+from ._external import CliToolCommandProvider
APP_ID = "crupest"
@@ -17,10 +17,10 @@ class App(AppBase):
super().__init__(APP_ID, f"{APP_ID}-service")
self.add_feature(PathCommandProvider())
self.add_feature(AppInitializer())
- self.add_feature(DataManager())
self.add_feature(ConfigManager())
self.add_feature(TemplateManager())
self.add_feature(NginxManager())
+ self.add_feature(CliToolCommandProvider())
self.add_feature(CommandDispatcher())
def run_command(self):
diff --git a/tools/cru-py/cru/service/_config.py b/tools/cru-py/cru/service/_config.py
index 52fed34..b51e21c 100644
--- a/tools/cru-py/cru/service/_config.py
+++ b/tools/cru-py/cru/service/_config.py
@@ -141,45 +141,46 @@ class ConfigManager(AppCommandFeatureProvider):
super().__init__("config-manager")
configuration = Configuration()
self._configuration = configuration
+ self._loaded: bool = False
self._init_app_defined_items()
def _init_app_defined_items(self) -> None:
prefix = self.config_name_prefix
- def _add_text(name: str, description: str) -> None:
- self.configuration.add(
- ConfigItem(f"{prefix}_{name}", description, TEXT_VALUE_TYPE)
- )
-
- def _add_uuid(name: str, description: str) -> None:
- self.configuration.add(
- ConfigItem(
- f"{prefix}_{name}",
- description,
- TEXT_VALUE_TYPE,
- default=UuidValueGenerator(),
- )
+ def _add_text(name: str, description: str) -> ConfigItem:
+ item = ConfigItem(f"{prefix}_{name}", description, TEXT_VALUE_TYPE)
+ self.configuration.add(item)
+ return item
+
+ def _add_uuid(name: str, description: str) -> ConfigItem:
+ item = ConfigItem(
+ f"{prefix}_{name}",
+ description,
+ TEXT_VALUE_TYPE,
+ default=UuidValueGenerator(),
)
+ self.configuration.add(item)
+ return item
def _add_random_string(
name: str, description: str, length: int = 32, secure: bool = True
- ) -> None:
- self.configuration.add(
- ConfigItem(
- f"{prefix}_{name}",
- description,
- TEXT_VALUE_TYPE,
- default=RandomStringValueGenerator(length, secure),
- )
+ ) -> ConfigItem:
+ item = ConfigItem(
+ f"{prefix}_{name}",
+ description,
+ TEXT_VALUE_TYPE,
+ default=RandomStringValueGenerator(length, secure),
)
+ self.configuration.add(item)
+ return item
- def _add_int(name: str, description: str) -> None:
- self.configuration.add(
- ConfigItem(f"{prefix}_{name}", description, INTEGER_VALUE_TYPE)
- )
+ def _add_int(name: str, description: str) -> ConfigItem:
+ item = ConfigItem(f"{prefix}_{name}", description, INTEGER_VALUE_TYPE)
+ self.configuration.add(item)
+ return item
- _add_text("DOMAIN", "domain name")
- _add_text("EMAIL", "admin email address")
+ self._domain = _add_text("DOMAIN", "domain name")
+ self._email = _add_text("EMAIL", "admin email address")
_add_text(
"AUTO_BACKUP_COS_SECRET_ID",
"access key id for Tencent COS, used for auto backup",
@@ -247,16 +248,18 @@ class ConfigManager(AppCommandFeatureProvider):
def get_item_value_str(self, name: str, ensure_set: bool = True) -> str | None: ...
def get_item_value_str(self, name: str, ensure_set: bool = True) -> str | None:
- self.reload_config_file()
+ self.load_config_file()
item = self.get_item(name)
- if ensure_set and not item.is_set:
- raise AppConfigItemNotSetError(
- f"Config item '{name}' is not set.", self.configuration, [item]
- )
+ if not item.is_set:
+ if ensure_set:
+ raise AppConfigItemNotSetError(
+ f"Config item '{name}' is not set.", self.configuration, [item]
+ )
+ return None
return item.value_str
def get_str_dict(self, ensure_all_set: bool = True) -> dict[str, str]:
- self.reload_config_file()
+ self.load_config_file()
if ensure_all_set and not self.configuration.all_set:
raise AppConfigItemNotSetError(
"Some config items are not set.",
@@ -265,8 +268,15 @@ class ConfigManager(AppCommandFeatureProvider):
)
return self.configuration.to_str_dict()
- def get_domain_item_name(self) -> str:
- return f"{self.config_name_prefix}_DOMAIN"
+ @property
+ def domain_item_name(self) -> str:
+ return self._domain.name
+
+ def get_domain_value_str(self) -> str:
+ return self.get_item_value_str(self._domain.name)
+
+ def get_email_value_str_optional(self) -> str | None:
+ return self.get_item_value_str(self._email.name, ensure_set=False)
def _set_with_default(self) -> None:
if not self.configuration.all_not_set:
@@ -379,7 +389,7 @@ class ConfigManager(AppCommandFeatureProvider):
value_dict = self._check_type(entry_dict)
return value_dict
- def reload_config_file(self):
+ def _real_load_config_file(self) -> None:
self.configuration.reset_all()
value_dict = self._read_config_file()
for key, value in value_dict.items():
@@ -387,6 +397,11 @@ class ConfigManager(AppCommandFeatureProvider):
continue
self.configuration.set_config_item(key, value)
+ def load_config_file(self, force=False) -> None:
+ if force or not self._loaded:
+ self._real_load_config_file()
+ self._loaded = True
+
def _print_app_config_info(self):
for item in self.configuration:
print(item.description_str)
diff --git a/tools/cru-py/cru/service/_data.py b/tools/cru-py/cru/service/_data.py
deleted file mode 100644
index 885c8e8..0000000
--- a/tools/cru-py/cru/service/_data.py
+++ /dev/null
@@ -1,9 +0,0 @@
-from ._base import AppFeatureProvider
-
-
-class DataManager(AppFeatureProvider):
- def __init__(self) -> None:
- super().__init__("data-manager")
-
- def setup(self) -> None:
- pass
diff --git a/tools/cru-py/cru/service/_docker.py b/tools/cru-py/cru/service/_docker.py
deleted file mode 100644
index 9b801c4..0000000
--- a/tools/cru-py/cru/service/_docker.py
+++ /dev/null
@@ -1,19 +0,0 @@
-import subprocess
-
-from cru.tool import ExternalTool
-
-
-class DockerController(ExternalTool):
- DOCKER_BIN_NAME = "docker"
-
- def __init__(self, docker_bin: None | str = None) -> None:
- super().__init__(docker_bin or self.DOCKER_BIN_NAME)
-
- def list_containers(self) -> L[str]:
- p = subprocess.run(
- [self.docker_bin, "container", "ls", ""], capture_output=True
- )
- return p.stdout.decode("utf-8").splitlines()
-
- def restart_container(self, container_name: str) -> None:
- subprocess.run([self.docker_bin, "restart", container_name])
diff --git a/tools/cru-py/cru/service/_external.py b/tools/cru-py/cru/service/_external.py
new file mode 100644
index 0000000..418316a
--- /dev/null
+++ b/tools/cru-py/cru/service/_external.py
@@ -0,0 +1,69 @@
+from ._base import AppCommandFeatureProvider
+from ._nginx import NginxManager
+
+
+class CliToolCommandProvider(AppCommandFeatureProvider):
+ def __init__(self) -> None:
+ super().__init__("cli-tool-command-provider")
+
+ def setup(self):
+ pass
+
+ def get_command_info(self):
+ return ("gen-cli", "Get commands of running external cli tools.")
+
+ def setup_arg_parser(self, arg_parser):
+ subparsers = arg_parser.add_subparsers(
+ dest="gen_cli_command", required=True, metavar="GEN_CLI_COMMAND"
+ )
+ certbot_parser = subparsers.add_parser("certbot", help="print certbot commands")
+ certbot_parser.add_argument(
+ "-t", "--test", action="store_true", help="run certbot in test mode"
+ )
+ _install_docker_parser = subparsers.add_parser(
+ "install-docker", help="print docker commands"
+ )
+
+ def _print_install_docker_commands(self) -> None:
+ output = """
+### COMMAND: uninstall apt docker
+for pkg in docker.io docker-doc docker-compose \
+podman-docker containerd runc; \
+do sudo apt-get remove $pkg; done
+
+### COMMAND: prepare apt certs
+sudo apt-get update
+sudo apt-get install ca-certificates curl
+sudo install -m 0755 -d /etc/apt/keyrings
+
+### COMMAND: install certs
+sudo curl -fsSL https://download.docker.com/linux/debian/gpg \
+-o /etc/apt/keyrings/docker.asc
+sudo chmod a+r /etc/apt/keyrings/docker.asc
+
+### COMMAND: add docker apt source
+echo \\
+ "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] \
+https://download.docker.com/linux/debian \\
+ $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \\
+ sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
+
+### COMMAND: update apt and install docker
+sudo apt-get update
+sudo apt-get install docker-ce docker-ce-cli containerd.io \
+docker-buildx-plugin docker-compose-plugin
+
+### COMMAND: setup system for docker
+sudo systemctl enable docker
+sudo systemctl start docker
+sudo groupadd -f docker
+sudo usermod -aG docker $USER
+# Remember to log out and log back in for the group changes to take effect
+""".strip()
+ print(output)
+
+ def run_command(self, args):
+ if args.gen_cli_command == "certbot":
+ self.app.get_feature(NginxManager).print_all_certbot_commands(args.test)
+ elif args.gen_cli_command == "install-docker":
+ self._print_install_docker_commands()
diff --git a/tools/cru-py/cru/service/_manager.py b/tools/cru-py/cru/service/_manager.py
deleted file mode 100644
index c1af428..0000000
--- a/tools/cru-py/cru/service/_manager.py
+++ /dev/null
@@ -1,4 +0,0 @@
-class CruServiceManager:
- "TODO: Continue here tomorrow!"
- def __init__(self):
- \ No newline at end of file
diff --git a/tools/cru-py/cru/service/_nginx.py b/tools/cru-py/cru/service/_nginx.py
index ad29d21..a9013e2 100644
--- a/tools/cru-py/cru/service/_nginx.py
+++ b/tools/cru-py/cru/service/_nginx.py
@@ -1,36 +1,55 @@
from argparse import Namespace
+from enum import Enum, auto
import re
+import subprocess
+from typing import TypeAlias
+
+from cru import CruInternalError
from ._base import AppCommandFeatureProvider
from ._config import ConfigManager
from ._template import TemplateManager
+class CertbotAction(Enum):
+ CREATE = auto()
+ EXPAND = auto()
+ SHRINK = auto()
+ RENEW = auto()
+
+
class NginxManager(AppCommandFeatureProvider):
+ CertbotAction: TypeAlias = CertbotAction
+
def __init__(self) -> None:
super().__init__("nginx-manager")
self._domains_cache: list[str] | None = None
- self._domain_config_value_cache: str | None = None
def setup(self) -> None:
pass
@property
+ def _config_manager(self) -> ConfigManager:
+ return self.app.get_feature(ConfigManager)
+
+ @property
+ def root_domain(self) -> str:
+ return self._config_manager.get_domain_value_str()
+
+ @property
def domains(self) -> list[str]:
if self._domains_cache is None:
self._domains_cache = self._get_domains()
return self._domains_cache
@property
- def _domain_config_name(self) -> str:
- return self.app.get_feature(ConfigManager).get_domain_item_name()
+ def subdomains(self) -> list[str]:
+ suffix = "." + self.root_domain
+ return [d[: -len(suffix)] for d in self.domains if d.endswith(suffix)]
- def _get_domain_config_value(self) -> str:
- if self._domain_config_value_cache is None:
- self._domain_config_value_cache = self.app.get_feature(
- ConfigManager
- ).get_item_value_str(self._domain_config_name)
- return self._domain_config_value_cache
+ @property
+ def _domain_config_name(self) -> str:
+ return self._config_manager.domain_item_name
def _get_domains_from_text(self, text: str) -> set[str]:
domains: set[str] = set()
@@ -42,17 +61,11 @@ class NginxManager(AppCommandFeatureProvider):
for match in regex.finditer(text):
domain_part = match.group(1)
if domain_variable_str in domain_part:
- domains.add(
- domain_part.replace(
- domain_variable_str, self._get_domain_config_value()
- )
- )
+ domains.add(domain_part.replace(domain_variable_str, self.root_domain))
continue
m = brace_domain_variable_regex.search(domain_part)
if m:
- domains.add(
- domain_part.replace(m.group(0), self._get_domain_config_value())
- )
+ domains.add(domain_part.replace(m.group(0), self.root_domain))
continue
domains.add(domain_part)
return domains
@@ -68,13 +81,123 @@ class NginxManager(AppCommandFeatureProvider):
def _get_domains(self) -> list[str]:
text = self._get_nginx_conf_template_text()
domains = list(self._get_domains_from_text(text))
- domains.remove(self._get_domain_config_value())
- return [self._get_domain_config_value(), *domains]
+ domains.remove(self.root_domain)
+ return [self.root_domain, *domains]
def _print_domains(self) -> None:
for domain in self.domains:
print(domain)
+ def _certbot_command(
+ self,
+ action: CertbotAction | str,
+ /,
+ test=False,
+ no_docker=False,
+ *,
+ standalone=None,
+ email=None,
+ agree_tos=True,
+ ) -> str:
+ if isinstance(action, str):
+ action = CertbotAction[action.upper()]
+
+ command_args = []
+
+ add_domain_option = True
+ if action is CertbotAction.CREATE:
+ if standalone is None:
+ standalone = True
+ command_action = "certonly"
+ elif action in [CertbotAction.EXPAND, CertbotAction.SHRINK]:
+ if standalone is None:
+ standalone = False
+ command_action = "certonly"
+ elif action is CertbotAction.RENEW:
+ if standalone is None:
+ standalone = False
+ add_domain_option = False
+ command_action = "renew"
+ else:
+ raise CruInternalError("Invalid certbot action.")
+
+ data_dir = self.app.data_dir.full_path.as_posix()
+
+ if no_docker:
+ command_args.append("certbot")
+ else:
+ command_args.extend(
+ [
+ "docker run -it --rm --name certbot",
+ f'-v "{data_dir}/certbot/certs:/etc/letsencrypt"',
+ f'-v "{data_dir}/certbot/data:/var/lib/letsencrypt"',
+ ]
+ )
+ if standalone:
+ command_args.append('-p "0.0.0.0:80:80"')
+ else:
+ command_args.append(f'-v "{data_dir}/certbot/webroot:/var/www/certbot"')
+
+ command_args.append("certbot/certbot")
+
+ command_args.append(command_action)
+
+ if standalone:
+ command_args.append("--standalone")
+ else:
+ command_args.append("--webroot -w /var/www/certbot")
+
+ if add_domain_option:
+ command_args.append(" ".join([f"-d {domain}" for domain in self.domains]))
+
+ if email is not None:
+ command_args.append(f"--email {email}")
+
+ if agree_tos:
+ command_args.append("--agree-tos")
+
+ if test:
+ command_args.append("--test-cert --dry-run")
+
+ return " ".join(command_args)
+
+ def print_all_certbot_commands(self, test: bool):
+ print("### COMMAND: (standalone) create certs")
+ print(
+ self._certbot_command(
+ CertbotAction.CREATE,
+ test,
+ email=self._config_manager.get_email_value_str_optional(),
+ )
+ )
+ print()
+ print("### COMMAND: (webroot+nginx) expand or shrink certs")
+ print(
+ self._certbot_command(
+ CertbotAction.EXPAND,
+ test,
+ email=self._config_manager.get_email_value_str_optional(),
+ )
+ )
+ print()
+ print("### COMMAND: (webroot+nginx) renew certs")
+ print(
+ self._certbot_command(
+ CertbotAction.RENEW,
+ test,
+ email=self._config_manager.get_email_value_str_optional(),
+ )
+ )
+
+ @property
+ def _cert_path_str(self) -> str:
+ return str(
+ self.app.data_dir.full_path
+ / "certbot/certs/live"
+ / self.root_domain
+ / "fullchain.pem"
+ )
+
def get_command_info(self):
return "nginx", "Manage nginx related things."
@@ -83,7 +206,73 @@ class NginxManager(AppCommandFeatureProvider):
dest="nginx_command", required=True, metavar="NGINX_COMMAND"
)
_list_parser = subparsers.add_parser("list", help="list domains")
+ certbot_parser = subparsers.add_parser("certbot", help="print certbot commands")
+ certbot_parser.add_argument(
+ "-t", "--test", action="store_true", help="run certbot in test mode"
+ )
def run_command(self, args: Namespace) -> None:
if args.nginx_command == "list":
self._print_domains()
+ elif args.nginx_command == "certbot":
+ self.print_all_certbot_commands(args.test)
+
+ def _generate_dns_zone(
+ self,
+ ip: str,
+ /,
+ ttl: str | int = 600,
+ *,
+ enable_mail: bool = True,
+ dkim: str | None = None,
+ ) -> str:
+ # TODO: Not complete and test now.
+ root_domain = self.root_domain
+ result = f"$ORIGIN {root_domain}.\n\n"
+ result += "; A records\n"
+ result += f"@ {ttl} IN A {ip}\n"
+ for subdomain in self.subdomains:
+ result += f"{subdomain} {ttl} IN A {ip}\n"
+
+ if enable_mail:
+ result += "\n; MX records\n"
+ result += f"@ {ttl} IN MX 10 mail.{root_domain}.\n"
+ result += "\n; SPF record\n"
+ result += f'@ {ttl} IN TXT "v=spf1 mx ~all"\n'
+ if dkim is not None:
+ result += "\n; DKIM record\n"
+ result += f'mail._domainkey {ttl} IN TEXT "{dkim}"'
+ result += "\n; DMARC record\n"
+ dmarc_options = [
+ "v=DMARC1",
+ "p=none",
+ f"rua=mailto:dmarc.report@{root_domain}",
+ f"ruf=mailto:dmarc.report@{root_domain}",
+ "sp=none",
+ "ri=86400",
+ ]
+ result += f'_dmarc {ttl} IN TXT "{"; ".join(dmarc_options)}"\n'
+ return result
+
+ def _get_dkim_from_mailserver(self) -> str | None:
+ # TODO: Not complete and test now.
+ dkim_path = (
+ self.app.data_dir.full_path
+ / "dms/config/opendkim/keys"
+ / self.root_domain
+ / "mail.txt"
+ )
+ if not dkim_path.exists():
+ return None
+
+ p = subprocess.run(["sudo", "cat", dkim_path], capture_output=True, check=True)
+ value = ""
+ for match in re.finditer('"(.*)"', p.stdout.decode("utf-8")):
+ value += match.group(1)
+ return value
+
+ def _generate_dns_zone_with_dkim(self, ip: str, /, ttl: str | int = 600) -> str:
+ # TODO: Not complete and test now.
+ return self._generate_dns_zone(
+ ip, ttl, enable_mail=True, dkim=self._get_dkim_from_mailserver()
+ )
diff --git a/tools/cru-py/cru/service/_template.py b/tools/cru-py/cru/service/_template.py
index 9241a1f..ca2135f 100644
--- a/tools/cru-py/cru/service/_template.py
+++ b/tools/cru-py/cru/service/_template.py
@@ -45,10 +45,10 @@ class TemplateManager(AppCommandFeatureProvider):
for file in CruIterator(self.template_tree.templates).transform(lambda t: t[0]):
print(file.as_posix())
- def _generate_files(self) -> None:
+ def _generate_files(self, dry_run: bool) -> None:
config_manager = self.app.get_feature(ConfigManager)
self.template_tree.generate_to(
- self.generated_dir.full_path_str, config_manager.get_str_dict()
+ self.generated_dir.full_path_str, config_manager.get_str_dict(), dry_run
)
def get_command_info(self):
@@ -62,7 +62,10 @@ class TemplateManager(AppCommandFeatureProvider):
_variables_parser = subparsers.add_parser(
"variables", help="list variables used in all templates"
)
- _generate_parser = subparsers.add_parser("generate", help="generate templates")
+ generate_parser = subparsers.add_parser("generate", help="generate templates")
+ generate_parser.add_argument(
+ "--no-dry-run", action="store_true", help="generate and write target files"
+ )
def run_command(self, args: Namespace) -> None:
if args.template_command == "list":
@@ -71,4 +74,7 @@ class TemplateManager(AppCommandFeatureProvider):
for var in self.template_tree.variables:
print(var)
elif args.template_command == "generate":
- self._generate_files()
+ dry_run = not args.no_dry_run
+ self._generate_files(dry_run)
+ if dry_run:
+ print("Dry run successfully.")
diff --git a/tools/cru-py/cru/service/nginx.py b/tools/cru-py/cru/service/nginx.py
deleted file mode 100644
index ad32cb9..0000000
--- a/tools/cru-py/cru/service/nginx.py
+++ /dev/null
@@ -1,17 +0,0 @@
-import json
-import os
-import re
-import subprocess
-from typing import Literal, Any, cast, ClassVar
-
-
-
-def restart_nginx(force=False) -> bool:
- if not force:
- p = subprocess.run(['docker', "container", "ls",
- "-f", "name=nginx", "-q"], capture_output=True)
- container: str = p.stdout.decode("utf-8")
- if len(container.strip()) == 0:
- return False
- subprocess.run(['docker', 'restart', 'nginx'])
- return True
diff --git a/tools/cru-py/cru/template.py b/tools/cru-py/cru/template.py
index 2b0f1bc..74a5c9a 100644
--- a/tools/cru-py/cru/template.py
+++ b/tools/cru-py/cru/template.py
@@ -137,7 +137,13 @@ class TemplateTree:
s.update(template.variables)
return s
- def generate_to(self, destination: str, variables: Mapping[str, str]) -> None:
+ def generate_to(
+ self, destination: str, variables: Mapping[str, str], dry_run: bool
+ ) -> None:
for file, template in self.templates:
- with open(os.path.join(destination, file), "w") as f:
- f.write(template.generate(variables))
+ des = CruPath(destination) / file
+ text = template.generate(variables)
+ if not dry_run:
+ des.parent.mkdir(parents=True, exist_ok=True)
+ with open(des, "w") as f:
+ f.write(text)
diff --git a/tools/cru-py/crupest/__init__.py b/tools/cru-py/crupest/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/tools/cru-py/crupest/__init__.py
+++ /dev/null
diff --git a/tools/cru-py/crupest/__main__.py b/tools/cru-py/crupest/__main__.py
deleted file mode 100644
index e69de29..0000000
--- a/tools/cru-py/crupest/__main__.py
+++ /dev/null
diff --git a/tools/cru-py/crupest/aio.py b/tools/cru-py/crupest/aio.py
deleted file mode 100644
index 0a26146..0000000
--- a/tools/cru-py/crupest/aio.py
+++ /dev/null
@@ -1,319 +0,0 @@
-#!/usr/bin/env python3
-
-try:
- import rich
- import jsonschema
- import cryptography
-except ImportError:
- print("Some necessary crupest can't be imported. Please run `pip install -r requirements.txt` to install them.")
- exit(1)
-
-from os.path import *
-import argparse
-import subprocess
-from rich.prompt import Confirm
-from install_docker import *
-from path import *
-from nginx import *
-from config import *
-from check import *
-from backup import *
-from download_tools import *
-from test import *
-from dns import *
-from setup import *
-
-from tui import console
-
-
-parser = argparse.ArgumentParser(
- description="Crupest server all-in-one setup script. Have fun play with it!")
-parser.add_argument("--no-hello", action="store_true",
- default=False, help="Do not print hello message.")
-parser.add_argument("--no-bye-bye", action="store_true",
- default=False, help="Do not print bye-bye message.")
-
-parser.add_argument("--no-check-python-version", action="store_true",
- default=False, help="Do not check python version.")
-parser.add_argument("--no-check-system", action="store_true",
- default=False, help="Do not check system type.")
-parser.add_argument("-y", "--yes", action="store_true",
- default=False, help="Yes to all confirmation.")
-
-subparsers = parser.add_subparsers(dest="action")
-
-setup_parser = subparsers.add_parser(
- "setup", help="Do everything necessary to setup the server.")
-
-print_path_parser = subparsers.add_parser(
- "print-path", help="Print the paths of all related files and dirs.")
-
-download_tools_parser = subparsers.add_parser(
- "download-tools", help="Download some extra tools to manage the server.")
-
-list_domain_parser = subparsers.add_parser(
- "list-domain", help="Misc things about domains.")
-
-nginx_parser = subparsers.add_parser(
- "nginx", help="Generate nginx config.")
-
-certbot_parser = subparsers.add_parser(
- "certbot", help="Get some common certbot commands.")
-
-certbot_command_group = certbot_parser.add_mutually_exclusive_group()
-
-certbot_command_group.add_argument(
- "-C", "--create", action="store_true", default=False, help="Only print the command for 'create' action.")
-certbot_command_group.add_argument(
- "-E", "--expand", action="store_true", default=False, help="Only print the command for 'expand' action.")
-certbot_command_group.add_argument(
- "-R", "--renew", action="store_true", default=False, help="Only print the command for 'renew' action.")
-
-certbot_parser.add_argument(
- "-t", "--test", action="store_true", default=False, help="Make the commands for test use.")
-
-clear_parser = subparsers.add_parser(
- "clear", help="Delete existing data so you can make a fresh start.")
-clear_parser.add_argument("-D", "--include-data-dir", action="store_true",
- default=False, help="Also delete the data directory.")
-
-install_docker_parser = subparsers.add_parser(
- "install-docker", help="Install docker and docker-compose.")
-
-backup_parser = subparsers.add_parser(
- "backup", help="Backup related things."
-)
-
-backup_subparsers = backup_parser.add_subparsers(dest="backup_action")
-backup_restore_parser = backup_subparsers.add_parser(
- "restore", help="Restore data from url.")
-backup_restore_parser.add_argument(
- "restore_url", help="Restore archive url. Can be local path or http/https.")
-backup_backup_parser = backup_subparsers.add_parser(
- "backup", help="Backup data to specified path.")
-backup_backup_parser.add_argument(
- "backup_path", nargs="?", help="Backup path. Can be empty for a timestamp as name. Must be local path.")
-
-docker_parser = subparsers.add_parser("docker", help="Docker related things.")
-docker_subparsers = docker_parser.add_subparsers(dest="docker_action")
-docker_subparsers.add_parser("up", help="Run docker compose up -d.")
-docker_subparsers.add_parser("down", help="Run docker compose down.")
-docker_subparsers.add_parser(
- "prune", help="Run docker system prune -a -f.")
-
-test_parser = subparsers.add_parser("test", help="Test things.")
-test_parser.add_argument(
- "test_action", help="Test action.", choices=["crupest-api"])
-
-dns_parser = subparsers.add_parser("dns", help="Generate dns zone.")
-
-dns_parser.add_argument("-i", "--ip", help="IP address of the server.")
-
-git_update_parser = subparsers.add_parser(
- "git-update", help="Update git submodules.")
-
-update_blog_parser = subparsers.add_parser(
- "update-blog", help="Update and regenerate blog.")
-
-up_parser = subparsers.add_parser(
- "up", help="Do something necessary and then docker compose up.")
-
-down_parser = subparsers.add_parser(
- "down", help="Do something necessary and then docker compose down.")
-
-args = parser.parse_args()
-
-if args.yes:
- old_ask = Confirm.ask
-
- def new_ask(prompt, *args, console=console, default=None, **kwargs):
- default_text = ""
- if default is not None:
- default_text = "(y)" if default else "(n)"
- text = f"[prompt]{prompt}[/] [prompt.choices]\\[y/n][/] [prompt.default]{default_text}[/]"
- console.print(text)
- return True
-
- Confirm.ask = new_ask
-
-if (args.action == "certbot" and (args.create or args.renew or args.expand)) or (args.action == "dns" and args.ip is not None):
- args.no_hello = True
- args.no_bye_bye = True
-
-
-if not args.no_check_python_version:
- if check_python_version():
- console.print("This script works well on python 3.10. Otherwise you may encounter some problems. But I would like to improve some rational compatibility.", style="yellow")
-
-if not args.no_check_system:
- if not check_ubuntu():
- console.print("This script works well on Ubuntu 22.04. Otherwise you may encounter some problems. But I would like to improve some rational compatibility.", style="yellow")
-
-
-if not args.no_hello:
- console.print("Nice to see you! :waving_hand:", style="cyan")
-
-
-def check_domain_is_defined():
- try:
- return get_domain()
- except Exception as e:
- console.print(e.args[0], style="red")
- raise e
-
-
-def git_update():
- def do_it():
- subprocess.run(["git", "pull"], check=True)
- run_in_project_dir(do_it)
-
-
-def update_blog():
- def do_it():
- subprocess.run(["docker", "compose", "exec",
- "crupest-blog", "/scripts/update.bash"], check=True)
- run_in_project_dir(do_it)
-
-
-def docker_compose_up():
- def do_docker_compose_up():
- subprocess.run(["docker", "compose", "up", "-d"], check=True)
- run_in_dir(project_abs_path, do_docker_compose_up)
-
-
-def docker_compose_down():
- def do_docker_compose_down():
- subprocess.run(
- ["docker", "compose", "down"], check=True)
- run_in_dir(project_abs_path, do_docker_compose_down)
-
-
-action = args.action
-
-
-def run():
- match action:
- case "install-docker":
- install_docker()
- console.print(
- "Succeeded to install docker. Please re-login to take effect.", style="green")
-
- case "docker":
- docker_action = args.docker_action
-
- match docker_action:
- case "up":
- docker_compose_up()
- case "down":
- docker_compose_down()
- case "prune":
- to_do = Confirm.ask(
- "[yellow]Are you sure to prune docker?[/]", console=console)
- if to_do:
- subprocess.run(
- ["docker", "system", "prune", "-a", "-f"], check=True)
- case _:
- raise ValueError("Unknown docker action.")
-
- case "backup":
- backup_action = args.backup_action
- match backup_action:
- case "backup":
- backup_backup(args.backup_path, console)
- console.print("Succeeded to restore data.", style="green")
- case "restore":
- backup_restore(args.restore_path, console)
- console.print("Succeeded to backup data.", style="green")
-
- case 'print-path':
- console.print("Project path =", project_dir)
- console.print("Project absolute path =", project_abs_path)
- console.print("Data path =", data_dir)
-
- case "download-tools":
- download_tools(console)
-
- case "list-domain":
- domain = check_domain_is_defined()
- domains = list_domains(domain)
- for domain in domains:
- console.print(domain)
-
- case "nginx":
- raise Exception("This command is deprecated.")
-
- case "certbot":
- domain = check_domain_is_defined()
- is_test = args.test
- if args.create:
- console.print(certbot_command_gen(domain, "create",
- test=is_test), soft_wrap=True, highlight=False)
- elif args.expand:
- console.print(certbot_command_gen(domain, "expand",
- test=is_test), soft_wrap=True, highlight=False)
- elif args.renew:
- console.print(certbot_command_gen(domain, "renew",
- test=is_test), soft_wrap=True, highlight=False)
- else:
- console.print(
- "Here is some commands you can use to do certbot related work.")
- if is_test:
- console.print(
- "Note you specified --test, so the commands are for test use.", style="yellow")
- console.print(
- "To create certs for init (standalone):", style="cyan")
- console.print(certbot_command_gen(
- domain, 'create', test=is_test), soft_wrap=True)
- console.print("To expand certs (nginx):", style="cyan")
- console.print(certbot_command_gen(
- domain, 'create', test=is_test), soft_wrap=True)
- console.print(
- "To renew certs previously created (nginx):", style="cyan")
- console.print(certbot_command_gen(
- domain, 'renew', test=is_test), soft_wrap=True)
- case "test":
- match args.test_action:
- case "crupest-api":
- test_crupest_api(console)
- case _:
- console.print("Test action invalid.", style="red")
-
- case "dns":
- domain = check_domain_is_defined()
- if domain is not None:
- if args.ip is None:
- ip = Prompt.ask(
- "Please enter your server ip", console=console)
- else:
- ip = args.ip
- console.print(generate_dns_zone_with_dkim(
- domain, ip), soft_wrap=True, highlight=False)
-
- case "git-update":
- git_update()
-
- case "update-blog":
- update_blog()
-
- case "up":
- git_update()
- template_generate(console)
- docker_compose_up()
-
- case "down":
- docker_compose_down()
-
- case "clear":
- clear(console, args.include_data_dir)
-
- case _:
- template_generate(console)
- if Confirm.ask(
- "By the way, would you like to download some scripts to do some extra setup like creating email user?", console=console, default=True):
- download_tools(console)
-
-
-run()
-
-if not args.no_bye_bye:
- console.print(":beers: All done! Bye bye!", style="green")
diff --git a/tools/cru-py/crupest/backup.py b/tools/cru-py/crupest/backup.py
deleted file mode 100644
index 7921d0d..0000000
--- a/tools/cru-py/crupest/backup.py
+++ /dev/null
@@ -1,41 +0,0 @@
-from .path import *
-from rich.prompt import Prompt, Confirm
-from urllib.request import urlretrieve
-import subprocess
-from datetime import datetime
-
-
-def backup_restore(http_url_or_path, /, console):
- url = http_url_or_path
- if len(url) == 0:
- raise Exception("You specify an empty url. Abort.")
- if url.startswith("http://") or url.startswith("https://"):
- download_path = os.path.join(tmp_dir, "data.tar.xz")
- if os.path.exists(download_path):
- to_remove = Confirm.ask(
- f"I want to download to [cyan]{download_path}[/]. However, there is a file already there. Do you want to remove it first", default=False, console=console)
- if to_remove:
- os.remove(download_path)
- else:
- raise Exception(
- "Aborted! Please check the file and try again.")
- urlretrieve(url, download_path)
- url = download_path
- subprocess.run(["sudo", "tar", "-xJf", url, "-C", project_dir], check=True)
-
-
-def backup_backup(path, /, console):
- ensure_backup_dir()
- now = datetime.utcnow().isoformat(timespec="seconds") + "Z"
- if path is None:
- path = Prompt.ask(
- "You don't specify the path to backup to. Please specify one. http and https are NOT supported", console=console, default=os.path.join(backup_dir, now + ".tar.xz"))
- if len(path) == 0:
- raise Exception("You specify an empty path. Abort!")
- if os.path.exists(path):
- raise Exception(
- "A file is already there. Please remove it first. Abort!")
- subprocess.run(
- ["sudo", "tar", "-cJf", path, "data", "-C", project_dir],
- check=True
- )
diff --git a/tools/cru-py/crupest/certbot.py b/tools/cru-py/crupest/certbot.py
deleted file mode 100644
index 8c89fa7..0000000
--- a/tools/cru-py/crupest/certbot.py
+++ /dev/null
@@ -1,119 +0,0 @@
-from typing import Literal, cast
-import os
-from os.path import join
-import subprocess
-from cryptography.x509 import load_pem_x509_certificate, DNSName, SubjectAlternativeName
-from cryptography.x509.oid import ExtensionOID
-from .tui import Paths, ensure_file, create_dir_if_not_exists, console
-
-CertbotAction = Literal['create', 'expand', 'shrink', 'renew']
-
-
-class Certbot:
- def __init__(self, root_domain: str, subdomains: list[str]) -> None:
- """
- subdomain: like ["a", "b.c", ...]
- """
- self.root_domain = root_domain
- self.subdomains = subdomains
- self.domains = [
- root_domain, *[f"{subdomain}.{root_domain}" for subdomain in subdomains]]
-
- def generate_command(self, action: CertbotAction, /, test=False, no_docker=False, *, standalone=None, email=None, agree_tos=False) -> str:
- add_domain_option = True
- if action == 'create':
- if standalone == None:
- standalone = True
- certbot_action = "certonly"
- elif action == 'expand' or action == 'shrink':
- if standalone == None:
- standalone = False
- certbot_action = "certonly"
- elif action == 'renew':
- if standalone == None:
- standalone = False
- add_domain_option = False
- certbot_action = "renew"
- else:
- raise ValueError('Invalid action')
-
- if no_docker:
- command = "certbot "
- else:
- expose_segment = ' -p "0.0.0.0:80:80"'
- web_root_segment = f' -v "{Paths.project_abs_path}/data/certbot/webroot:/var/www/certbot"'
- command = f'docker run -it --rm --name certbot -v "{Paths.project_abs_path}/data/certbot/certs:/etc/letsencrypt" -v "{Paths.project_abs_path}/data/certbot/data:/var/lib/letsencrypt"{ expose_segment if standalone else web_root_segment} certbot/certbot '
-
- command += certbot_action
-
- if standalone:
- command += " --standalone"
- else:
- command += ' --webroot -w /var/www/certbot'
-
- if add_domain_option:
- command += f' -d {" -d ".join(self.domains)}'
-
- if email is not None:
- command += f' --email {email}'
-
- if agree_tos:
- command += ' --agree-tos'
-
- if test:
- command += " --test-cert --dry-run"
-
- return command
-
- def get_cert_path(self) -> str:
- return join(Paths.data_dir, "certbot", "certs", "live", self.root_domain, "fullchain.pem")
-
- def get_cert_actual_domains(self, cert_path: str | None = None) -> None | list[str]:
- if cert_path is None:
- cert_path = self.get_cert_path()
-
- if not ensure_file(cert_path):
- return None
-
- with open(cert_path, 'rb') as f:
- cert = load_pem_x509_certificate(f.read())
- ext = cert.extensions.get_extension_for_oid(
- ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
- domains: list[str] = cast(
- SubjectAlternativeName, ext.value).get_values_for_type(DNSName)
-
- # This weird code is to make sure the root domain is the first one
- if self.root_domain in domains:
- domains.remove(self.root_domain)
- domains = [self.root_domain, *domains]
-
- return domains
-
- def print_create_cert_message(self):
- console.print(
- "Looks like you haven't run certbot to get the init ssl certificates. You may want to run following code to get one:", style="cyan")
- console.print(self.generate_command("create"),
- soft_wrap=True, highlight=False)
-
- def check_ssl_cert(self, tmp_dir: str = Paths.tmp_dir):
- cert_path = self.get_cert_path()
- tmp_cert_path = join(tmp_dir, "fullchain.pem")
- console.print("Temporarily copy cert to tmp...", style="yellow")
- create_dir_if_not_exists(tmp_dir)
- subprocess.run(
- ["sudo", "cp", cert_path, tmp_cert_path], check=True)
- subprocess.run(["sudo", "chown", str(
- os.geteuid()), tmp_cert_path], check=True)
- cert_domains = self.get_cert_actual_domains(tmp_cert_path)
- if cert_domains is None:
- self.print_create_cert_message()
- else:
- cert_domain_set = set(cert_domains)
- domains = set(self.domains)
- if not cert_domain_set == domains:
- console.print(
- "Cert domains are not equal to host domains. Run following command to recreate it with nginx stopped.", style="red")
- console.print(self.generate_command(
- "create", standalone=True), soft_wrap=True, highlight=False)
- console.print("Remove tmp cert...", style="yellow")
- os.remove(tmp_cert_path)
diff --git a/tools/cru-py/crupest/config.py b/tools/cru-py/crupest/config.py
deleted file mode 100644
index 7a63e2a..0000000
--- a/tools/cru-py/crupest/config.py
+++ /dev/null
@@ -1,134 +0,0 @@
-import os
-import typing
-import uuid
-import random
-import string
-from dataclasses import dataclass
-
-from rich.prompt import Prompt
-
-from cru.config import Configuration
-from cru.parsing import SimpleLineConfigParser
-from .path import config_file_path
-
-
-@dataclass
-class ConfigurationMigrationInfo:
- duplicate_item_in_old_config: list[str]
- item
-
-
-class OldConfiguration:
- def __init__(self, items: None | dict[str, str] = None) -> None:
- self._items = items or {}
-
- @staticmethod
- def load_from_str(s: str) -> tuple["OldConfiguration", list[str, str]]:
- d, duplicate = SimpleLineConfigParser().parse_to_dict(s, True)
- return OldConfiguration(d), duplicate
-
- def convert_to_new_config(self) -> Configuration:
-
-
-class ConfigVar:
- def __init__(self, name: str, description: str, default_value_generator: typing.Callable[[], str] | str, /,
- default_value_for_ask=str | None):
- """Create a config var.
-
- Args:
- name (str): The name of the config var.
- description (str): The description of the config var.
- default_value_generator (typing.Callable[[], str] | str): The default value generator of the config var. If it is a string, it will be used as the input prompt and let user input the value.
- """
- self.name = name
- self.description = description
- self.default_value_generator = default_value_generator
- self.default_value_for_ask = default_value_for_ask
-
- def get_default_value(self, /, console):
- if isinstance(self.default_value_generator, str):
- return Prompt.ask(self.default_value_generator, console=console, default=self.default_value_for_ask)
- else:
- return self.default_value_generator()
-
-
-config_var_list: list = [
- ConfigVar("CRUPEST_DOMAIN", "domain name",
- "Please input your domain name"),
- ConfigVar("CRUPEST_EMAIL", "admin email address",
- "Please input your email address"),
- ConfigVar("CRUPEST_AUTO_BACKUP_COS_SECRET_ID",
- "access key id for Tencent COS, used for auto backup",
- "Please input your Tencent COS access key id for backup"),
- ConfigVar("CRUPEST_AUTO_BACKUP_COS_SECRET_KEY",
- "access key secret for Tencent COS, used for auto backup",
- "Please input your Tencent COS access key for backup"),
- ConfigVar("CRUPEST_AUTO_BACKUP_COS_REGION",
- "region for Tencent COS, used for auto backup", "Please input your Tencent COS region for backup",
- "ap-hongkong"),
- ConfigVar("CRUPEST_AUTO_BACKUP_BUCKET_NAME",
- "bucket name for Tencent COS, used for auto backup",
- "Please input your Tencent COS bucket name for backup"),
- ConfigVar("CRUPEST_GITHUB_USERNAME",
- "github username for fetching todos", "Please input your github username for fetching todos", "crupest"),
- ConfigVar("CRUPEST_GITHUB_PROJECT_NUMBER",
- "github project number for fetching todos", "Please input your github project number for fetching todos",
- "2"),
- ConfigVar("CRUPEST_GITHUB_TOKEN",
- "github token for fetching todos", "Please input your github token for fetching todos"),
- ConfigVar("CRUPEST_GITHUB_TODO_COUNT",
- "github todo count", "Please input your github todo count", 10),
- ConfigVar("CRUPEST_GITHUB_TODO_COUNT",
- "github todo count", "Please input your github todo count", 10),
- ConfigVar("CRUPEST_V2RAY_TOKEN",
- "v2ray user id", generate_uuid),
- ConfigVar("CRUPEST_V2RAY_PATH",
- "v2ray path, which will be prefixed by _", generate_uuid),
- ConfigVar("CRUPEST_FORGEJO_MAILER_USER",
- "Forgejo SMTP user.", "Please input your Forgejo SMTP user."),
- ConfigVar("CRUPEST_FORGEJO_MAILER_PASSWD",
- "Forgejo SMTP password.", "Please input your Forgejo SMTP password."),
- ConfigVar("CRUPEST_2FAUTH_APP_KEY",
- "2FAuth App Key.", generate_random_string_32),
- ConfigVar("CRUPEST_2FAUTH_MAIL_USERNAME",
- "2FAuth SMTP user.", "Please input your 2FAuth SMTP user."),
- ConfigVar("CRUPEST_2FAUTH_MAIL_PASSWORD",
- "2FAuth SMTP password.", "Please input your 2FAuth SMTP password."),
-]
-
-config_var_name_set = set([config_var.name for config_var in config_var_list])
-
-
-def check_config_var_set(needed_config_var_set: set[str]) -> tuple[bool, list[str], list[str]]:
- more = []
- less = []
- for var_name in needed_config_var_set:
- if var_name not in config_var_name_set:
- more.append(var_name)
- for var_name in config_var_name_set:
- if var_name not in needed_config_var_set:
- less.append(var_name)
- return (True if len(more) == 0 else False, more, less)
-
-
-def config_file_exists():
- return ensure_file(Paths.config_file_path, must_exist=False)
-
-
-def parse_config(str: str) -> dict[str, str]:
- return ConfigMap().load_from_str(str).to_dict()
-
-
-def get_domain() -> str:
- if configuration is None:
- raise ValueError("Config file not found!")
- return configuration.get_domain()
-
-
-def config_to_str(config: dict) -> str:
- return "\n".join([f"{key}={value}" for key, value in config.items()])
-
-
-def print_config(console, config: dict) -> None:
- for key, value in config.items():
- console.print(f"[magenta]{key}[/] = [cyan]{value}")
diff --git a/tools/cru-py/crupest/dns.py b/tools/cru-py/crupest/dns.py
deleted file mode 100644
index 5006d5f..0000000
--- a/tools/cru-py/crupest/dns.py
+++ /dev/null
@@ -1,42 +0,0 @@
-from os.path import *
-from io import StringIO
-import re
-from .nginx import *
-
-
-def generate_dns_zone(domain: str, ip: str, /, ttl: str | int = 600, *, enable_mail: bool = True, dkim: str | None = None) -> str:
- result = f"$ORIGIN {domain}.\n\n"
- result += "; A records\n"
- result += f"@ {ttl} IN A {ip}\n"
- subdomains = list_subdomain_names()
- for subdomain in subdomains:
- result += f"{subdomain} {ttl} IN A {ip}\n"
-
- if enable_mail:
- result += "\n; MX records\n"
- result += f"@ {ttl} IN MX 10 mail.{domain}.\n"
- result += "\n; SPF record\n"
- result += f"@ {ttl} IN TXT \"v=spf1 mx ~all\"\n"
- if dkim is not None:
- result += "\n; DKIM record\n"
- result += f"mail._domainkey {ttl} IN TEXT \"{dkim}\""
- result += "\n; DMARC record\n"
- result += "_dmarc {ttl} IN TXT \"v=DMARC1; p=none; rua=mailto:dmarc.report@{domain}; ruf=mailto:dmarc.report@{domain}; sp=none; ri=86400\"\n"
- return result
-
-
-def get_dkim_from_mailserver(domain: str) -> str | None:
- dkim_path = join(data_dir, "dms/config/opendkim/keys", domain, "mail.txt")
- if not exists(dkim_path):
- return None
-
- p = subprocess.run(["sudo", "cat", dkim_path],
- capture_output=True, check=True)
- value = ""
- for match in re.finditer("\"(.*)\"", p.stdout.decode('utf-8')):
- value += match.group(1)
- return value
-
-
-def generate_dns_zone_with_dkim(domain: str, ip: str, /, ttl: str | int = 600) -> str:
- return generate_dns_zone(domain, ip, ttl, enable_mail=True, dkim=get_dkim_from_mailserver(domain))
diff --git a/tools/cru-py/crupest/download_tools.py b/tools/cru-py/crupest/download_tools.py
deleted file mode 100644
index beb06d4..0000000
--- a/tools/cru-py/crupest/download_tools.py
+++ /dev/null
@@ -1,47 +0,0 @@
-import sys
-from os.path import *
-from urllib.request import *
-from rich.prompt import Confirm
-from .path import *
-from .helper import print_order
-
-
-TOOLS = [("docker-mailserver setup script", "docker-mailserver-setup.sh",
- "https://raw.githubusercontent.com/docker-mailserver/docker-mailserver/master/setup.sh")]
-
-
-def download_tools(console):
- # if we are not linux, we prompt the user
- if sys.platform != "linux":
- console.print(
- "You are not running this script on linux. The tools will not work.", style="yellow")
- if not Confirm.ask("Do you want to continue?", default=False, console=console):
- return
-
- for index, script in enumerate(TOOLS):
- number = index + 1
- total = len(TOOLS)
- print_order(number, total, console)
- name, filename, url = script
- # if url is callable, call it
- if callable(url):
- url = url()
- path = join(tool_dir, filename)
- skip = False
- if exists(path):
- overwrite = Confirm.ask(
- f"[cyan]{name}[/] already exists, download and overwrite?", default=False, console=console)
- if not overwrite:
- skip = True
- else:
- download = Confirm.ask(
- f"Download [cyan]{name}[/] to [magenta]{path}[/]?", default=True, console=console)
- if not download:
- skip = True
- if not skip:
- console.print(f"Downloading {name}...")
- urlretrieve(url, path)
- os.chmod(path, 0o755)
- console.print(f"Downloaded {name} to {path}.", style="green")
- else:
- console.print(f"Skipped {name}.", style="yellow")
diff --git a/tools/cru-py/crupest/helper.py b/tools/cru-py/crupest/helper.py
deleted file mode 100644
index f8fe34a..0000000
--- a/tools/cru-py/crupest/helper.py
+++ /dev/null
@@ -1,18 +0,0 @@
-import os
-import os.path
-from .path import *
-
-
-def run_in_dir(dir: str, func: callable):
- old_dir = os.path.abspath(os.getcwd())
- os.chdir(dir)
- func()
- os.chdir(old_dir)
-
-
-def run_in_project_dir(func: callable):
- run_in_dir(project_dir, func)
-
-
-def print_order(number: int, total: int, /, console) -> None:
- console.print(f"\[{number}/{total}]", end=" ", style="green")
diff --git a/tools/cru-py/crupest/install_docker.py b/tools/cru-py/crupest/install_docker.py
deleted file mode 100644
index ac50290..0000000
--- a/tools/cru-py/crupest/install_docker.py
+++ /dev/null
@@ -1,16 +0,0 @@
-from os.path import *
-from .path import *
-import urllib
-import subprocess
-
-
-def install_docker():
- ensure_tmp_dir()
- get_docker_path = join(tmp_dir, "get-docker.sh")
- urllib.request.urlretrieve("https://get.docker.com", get_docker_path)
- os.chmod(get_docker_path, 0o755)
- subprocess.run(["sudo", "sh", get_docker_path], check=True)
- subprocess.run(["sudo", "systemctl", "enable",
- "--now", "docker"], check=True)
- subprocess.run(["sudo", "usermod", "-aG", "docker",
- os.getlogin()], check=True)
diff --git a/tools/cru-py/crupest/nginx.py b/tools/cru-py/crupest/nginx.py
deleted file mode 100644
index 1ec5c6b..0000000
--- a/tools/cru-py/crupest/nginx.py
+++ /dev/null
@@ -1,246 +0,0 @@
-from typing import cast
-import json
-import jsonschema
-import os
-from os.path import *
-import shutil
-import subprocess
-from rich.prompt import Confirm
-from cryptography.x509 import *
-from cryptography.x509.oid import ExtensionOID
-from .template import Template
-from .path import *
-
-with open(join(nginx_template_dir, 'server.json')) as f:
- server = json.load(f)
-
-with open(join(nginx_template_dir, 'server.schema.json')) as f:
- schema = json.load(f)
-
-jsonschema.validate(server, schema)
-
-non_template_files = ['forbid_unknown_domain.conf', "websocket.conf"]
-
-ssl_template = Template(join(nginx_template_dir, 'ssl.conf.template'))
-root_template = Template(join(
- nginx_template_dir, 'root.conf.template'))
-static_file_template = Template(join(
- nginx_template_dir, 'static-file.conf.template'))
-reverse_proxy_template = Template(join(
- nginx_template_dir, 'reverse-proxy.conf.template'))
-redirect_template = Template(join(
- nginx_template_dir, 'redirect.conf.template'))
-cert_only_template = Template(join(
- nginx_template_dir, 'cert-only.conf.template'))
-
-nginx_var_set = set.union(root_template.var_set,
- static_file_template.var_set, reverse_proxy_template.var_set)
-
-
-def list_subdomain_names() -> list:
- return [s["subdomain"] for s in server["sites"]]
-
-
-def list_subdomains(domain: str) -> list:
- return [f"{s['subdomain']}.{domain}" for s in server["sites"]]
-
-
-def list_domains(domain: str) -> list:
- return [domain, *list_subdomains(domain)]
-
-
-def generate_nginx_config(domain: str, original_config, dest: str) -> None:
- if not isdir(dest):
- raise ValueError('dest must be a directory')
- # copy ssl.conf and https-redirect.conf which need no variable substitution
- for filename in non_template_files:
- src = join(nginx_template_dir, filename)
- dst = join(dest, filename)
- shutil.copyfile(src, dst)
- config = {
- "CRUPEST_DOMAIN": domain,
- "CRUPEST_V2RAY_TOKEN": original_config["CRUPEST_V2RAY_TOKEN"],
- "CRUPEST_V2RAY_PATH": original_config["CRUPEST_V2RAY_PATH"]
- }
- # generate ssl.conf
- with open(join(dest, 'ssl.conf'), 'w') as f:
- f.write(ssl_template.generate(config))
- # generate root.conf
- with open(join(dest, f'{domain}.conf'), 'w') as f:
- root_config = config.copy()
- root_config["CRUPEST_V2RAY_TOKEN"] = config["CRUPEST_V2RAY_TOKEN"]
- root_config["CRUPEST_V2RAY_PATH"] = config["CRUPEST_V2RAY_PATH"]
- f.write(root_template.generate(config))
- # generate nginx config for each site
- sites: list = server["sites"]
- for site in sites:
- subdomain = site["subdomain"]
- local_config = config.copy()
- local_config['CRUPEST_NGINX_SUBDOMAIN'] = subdomain
- if site["type"] == 'static-file':
- template = static_file_template
- local_config['CRUPEST_NGINX_ROOT'] = site["root"]
- elif site["type"] == 'reverse-proxy':
- template = reverse_proxy_template
- local_config['CRUPEST_NGINX_UPSTREAM_SERVER'] = site["upstream"]
- elif site["type"] == 'redirect':
- template = redirect_template
- local_config['CRUPEST_NGINX_URL'] = site["url"]
- elif site["type"] == 'cert-only':
- template = cert_only_template
- else:
- raise Exception('Invalid site type')
- with open(join(dest, f'{subdomain}.{domain}.conf'), 'w') as f:
- f.write(template.generate(local_config))
-
-
-def check_nginx_config_dir(dir_path: str, domain: str) -> list:
- if not exists(dir_path):
- return []
- good_files = [*non_template_files, "ssl.conf", *
- [f"{full_domain}.conf" for full_domain in list_domains(domain)]]
- bad_files = []
- for path in os.listdir(dir_path):
- file_name = basename(path)
- if file_name not in good_files:
- bad_files.append(file_name)
- return bad_files
-
-
-def restart_nginx(force=False) -> bool:
- if not force:
- p = subprocess.run(['docker', "container", "ls",
- "-f", "name=nginx", "-q"], capture_output=True)
- container: str = p.stdout.decode("utf-8")
- if len(container.strip()) == 0:
- return False
- subprocess.run(['docker', 'restart', 'nginx'])
- return True
-
-
-def nginx(domain: str, config, /, console) -> None:
- bad_files = check_nginx_config_dir(nginx_config_dir, domain)
- if len(bad_files) > 0:
- console.print(
- "WARNING: It seems there are some bad conf files in the nginx config directory:", style="yellow")
- for bad_file in bad_files:
- console.print(bad_file, style="cyan")
- to_delete = Confirm.ask(
- "They will affect nginx in a [red]bad[/] way. Do you want to delete them?", default=True, console=console)
- if to_delete:
- for file in bad_files:
- os.remove(join(nginx_config_dir, file))
- console.print(
- "I have found following var in nginx templates:", style="green")
- for var in nginx_var_set:
- console.print(var, style="magenta")
- if not exists(nginx_config_dir):
- os.mkdir(nginx_config_dir)
- console.print(
- f"Nginx config directory created at [magenta]{nginx_config_dir}[/]", style="green")
- generate_nginx_config(domain, config, dest=nginx_config_dir)
- console.print("Nginx config generated.", style="green")
- if restart_nginx():
- console.print('Nginx restarted.', style="green")
-
-
-def certbot_command_gen(domain: str, action, /, test=False, no_docker=False, *, standalone=None, email=None, agree_tos=False) -> str:
- domains = list_domains(domain)
-
- add_domain_option = True
- if action == 'create':
- if standalone == None:
- standalone = True
- certbot_action = "certonly"
- elif action == 'expand':
- if standalone == None:
- standalone = False
- certbot_action = "certonly"
- elif action == 'renew':
- if standalone == None:
- standalone = False
- add_domain_option = False
- certbot_action = "renew"
- else:
- raise ValueError('Invalid action')
-
- if no_docker:
- command = "certbot "
- else:
- expose_segment = ' -p "0.0.0.0:80:80"'
- web_root_segment = ' -v "{project_abs_path}/data/certbot/webroot:/var/www/certbot"'
- command = f'docker run -it --rm --name certbot -v "{project_abs_path}/data/certbot/certs:/etc/letsencrypt" -v "{project_abs_path}/data/certbot/data:/var/lib/letsencrypt"{ expose_segment if standalone else web_root_segment} certbot/certbot '
-
- command += certbot_action
-
- if standalone:
- command += " --standalone"
- else:
- command += ' --webroot -w /var/www/certbot'
-
- if add_domain_option:
- command += f' -d {" -d ".join(domains)}'
-
- if email is not None:
- command += f' --email {email}'
-
- if agree_tos:
- command += ' --agree-tos'
-
- if test:
- command += " --test-cert --dry-run"
-
- return command
-
-
-def get_cert_path(root_domain):
- return join(data_dir, "certbot", "certs", "live", root_domain, "fullchain.pem")
-
-
-def get_cert_domains(cert_path, root_domain):
-
- if not exists(cert_path):
- return None
-
- if not isfile(cert_path):
- return None
-
- with open(cert_path, 'rb') as f:
- cert = load_pem_x509_certificate(f.read())
- ext = cert.extensions.get_extension_for_oid(
- ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
- domains: list[str] = cast(
- SubjectAlternativeName, ext.value).get_values_for_type(DNSName)
- domains.remove(root_domain)
- domains = [root_domain, *domains]
- return domains
-
-
-def print_create_cert_message(domain, console):
- console.print(
- "Looks like you haven't run certbot to get the init ssl certificates. You may want to run following code to get one:", style="cyan")
- console.print(certbot_command_gen(domain, "create"),
- soft_wrap=True, highlight=False)
-
-
-def check_ssl_cert(domain, console):
- cert_path = get_cert_path(domain)
- tmp_cert_path = join(tmp_dir, "fullchain.pem")
- console.print("Temporarily copy cert to tmp...", style="yellow")
- subprocess.run(
- ["sudo", "cp", cert_path, tmp_cert_path], check=True)
- subprocess.run(["sudo", "chown", str(os.geteuid()),
- tmp_cert_path], check=True)
- cert_domains = get_cert_domains(tmp_cert_path, domain)
- if cert_domains is None:
- print_create_cert_message(domain, console)
- else:
- cert_domain_set = set(cert_domains)
- domains = set(list_domains(domain))
- if not cert_domain_set == domains:
- console.print(
- "Cert domains are not equal to host domains. Run following command to recreate it with nginx stopped.", style="red")
- console.print(certbot_command_gen(
- domain, "create", standalone=True), soft_wrap=True, highlight=False)
- console.print("Remove tmp cert...", style="yellow")
- os.remove(tmp_cert_path)
diff --git a/tools/cru-py/crupest/path.py b/tools/cru-py/crupest/path.py
deleted file mode 100644
index 0cfcfb8..0000000
--- a/tools/cru-py/crupest/path.py
+++ /dev/null
@@ -1,57 +0,0 @@
-import os
-import os.path
-
-script_dir = os.path.relpath(os.path.dirname(__file__))
-project_dir = os.path.normpath(os.path.join(script_dir, "../../../"))
-project_abs_path = os.path.abspath(project_dir)
-template_dir = os.path.join(project_dir, "template")
-nginx_template_dir = os.path.join(template_dir, "nginx")
-data_dir = os.path.join(project_dir, "data")
-tool_dir = os.path.join(project_dir, "tools")
-tmp_dir = os.path.join(project_dir, "tmp")
-backup_dir = os.path.join(project_dir, "backup")
-config_file_path = os.path.join(data_dir, "config")
-nginx_config_dir = os.path.join(project_dir, "nginx-config")
-log_dir = os.path.join(project_dir, "log")
-
-
-def ensure_file(path: str, /, must_exist: bool = True) -> bool:
- if must_exist and not os.path.exists(path):
- raise Exception(f"File {path} does not exist!")
- if not os.path.exists(path):
- return False
- if not os.path.isfile(path):
- raise Exception(f"{path} is not a file!")
- return True
-
-
-def ensure_dir(path: str, /, must_exist: bool = True) -> bool:
- if must_exist and not os.path.exists(path):
- raise Exception(f"Directory {path} does not exist!")
- if not os.path.exists(path):
- return False
- if not os.path.isdir(path):
- raise Exception(f"{path} is not a directory!")
- return True
-
-
-class Paths:
- script_dir = os.path.relpath(os.path.dirname(__file__))
- project_dir = os.path.normpath(os.path.join(script_dir, "../../"))
- project_abs_path = os.path.abspath(project_dir)
- data_dir = os.path.join(project_dir, "data")
- config_file_path = os.path.join(data_dir, "config")
- template_dir = os.path.join(project_dir, "template")
- tool_dir = os.path.join(project_dir, "tool")
- tmp_dir = os.path.join(project_dir, "tmp")
- backup_dir = os.path.join(project_dir, "backup")
- log_dir = os.path.join(project_dir, "log")
- template2_dir = os.path.join(project_dir, "template2")
- nginx2_template_dir = os.path.join(template2_dir, "nginx")
- generated_dir = os.path.join(project_dir, "generated")
- nginx_generated_dir = os.path.join(generated_dir, "nginx")
-
-
-def create_dir_if_not_exists(path: str) -> None:
- if not ensure_dir(path, must_exist=False):
- os.mkdir(path)
diff --git a/tools/cru-py/crupest/setup.py b/tools/cru-py/crupest/setup.py
deleted file mode 100644
index 4e91302..0000000
--- a/tools/cru-py/crupest/setup.py
+++ /dev/null
@@ -1,233 +0,0 @@
-from os.path import *
-from datetime import datetime
-from rich.prompt import Confirm
-from .path import *
-from .nginx import *
-from .config import *
-from .helper import *
-
-
-def get_template_name_list(console) -> list[str]:
- console.print("First let's check all the templates...")
-
- # get all filenames ending with .template
- template_name_list = [basename(f)[:-len('.template')] for f in os.listdir(
- template_dir) if f.endswith(".template")]
- console.print(
- f"I have found following template files in [magenta]{template_dir}[/]:", style="green")
- for filename in template_name_list:
- console.print(f"{filename}.template", style="magenta")
-
- return template_name_list
-
-
-def data_dir_check(domain, console):
- if isdir(data_dir):
- if not exists(join(data_dir, "certbot")):
- print_create_cert_message(domain, console)
- else:
- to_check = Confirm.ask(
- "I want to check your ssl certs, but I need to sudo. Do you want me check", console=console, default=False)
- if to_check:
- check_ssl_cert(domain, console)
-
-
-def template_generate(console):
- template_name_list = get_template_name_list(console)
- template_list: list = []
- config_var_name_set_in_template = set()
- for template_name in template_name_list:
- template = Template(join(template_dir, template_name+".template"))
- template_list.append(template)
- config_var_name_set_in_template.update(template.var_set)
-
- console.print(
- "I have found following variables needed in templates:", style="green")
- for key in config_var_name_set_in_template:
- console.print(key, style="magenta")
-
- # check vars
- check_success, more, less = check_config_var_set(
- config_var_name_set_in_template)
- if len(more) != 0:
- console.print("There are more variables in templates than in config file:",
- style="red")
- for key in more:
- console.print(key, style="magenta")
- if len(less) != 0:
- console.print("Following config vars are not used:",
- style="yellow")
- for key in less:
- console.print(key, style="magenta")
-
- if not check_success:
- console.print(
- "Please check you config vars and make sure the needed ones are defined!", style="red")
- else:
- console.print(
- "Now let's check if they are already generated...")
-
- conflict = False
-
- # check if there exists any generated files
- for filename in template_name_list:
- if exists(join(project_dir, filename)):
- console.print(f"Found [magenta]{filename}[/]")
- conflict = True
-
- to_gen = True
- if conflict:
- to_overwrite = Confirm.ask(
- "It seems there are some files already generated. Do you want to overwrite them?", console=console, default=False)
- if not to_overwrite:
- to_gen = False
- console.print(
- "Great! Check the existing files and see you next time!", style="green")
- else:
- print("No conflict found. Let's go on!\n")
-
- if to_gen:
- console.print("Check for existing config file...")
-
- # check if there exists a config file
- if not config_file_exists():
- config = {}
- console.print(
- "No existing config file found. Don't worry. Let's create one!", style="green")
- for config_var in config_var_list:
- config[config_var.name] = config_var.get_default_value()
- config_content = config_to_str(config)
- # create data dir if not exist
- if not exists(data_dir):
- os.mkdir(data_dir)
- # write config file
- with open(config_file_path, "w") as f:
- f.write(config_content)
- console.print(
- f"Everything else is auto generated. The config file is written into [magenta]{config_file_path}[/]. You had better keep it safe. And here is the content:", style="green")
- print_config(console, config)
- is_ok = Confirm.ask(
- "If you think it's not ok, you can stop here and edit it. Or let's go on?", console=console, default=True)
- if not is_ok:
- console.print(
- "Great! Check the config file and see you next time!", style="green")
- to_gen = False
- else:
- console.print(
- "Looks like you have already had a config file. Let's check the content:", style="green")
- with open(config_file_path, "r") as f:
- content = f.read()
- config = parse_config(content)
- print_config(console, config)
- missed_config_vars = []
- for config_var in config_var_list:
- if config_var.name not in config:
- missed_config_vars.append(config_var)
-
- if len(missed_config_vars) > 0:
- console.print(
- "Oops! It seems you have missed some keys in your config file. Let's add them!", style="green")
- for config_var in missed_config_vars:
- config[config_var.name] = config_var.get_default_value(
- console)
- content = config_to_str(config)
- with open(config_file_path, "w") as f:
- f.write(content)
- console.print(
- f"Here is the new config, it has been written out to [magenta]{config_file_path}[/]:")
- print_config(console, config)
- good_enough = Confirm.ask("Is it good enough?",
- console=console, default=True)
- if not good_enough:
- console.print(
- "Great! Check the config file and see you next time!", style="green")
- to_gen = False
-
- domain = get_domain()
-
- if to_gen:
- console.print(
- "Finally, everything is ready. Let's generate the files:", style="green")
-
- # generate files
- for index, template in enumerate(template_list):
- number = index + 1
- total = len(template_list)
- print_order(number, total, console)
- console.print(
- f"Generating [magenta]{template.template_name}[/]...")
- content = template.generate(config)
- with open(join(project_dir, template.template_name), "w") as f:
- f.write(content)
-
- # generate nginx config
- if not exists(nginx_config_dir):
- to_gen_nginx_conf = Confirm.ask("It seems you haven't generate nginx config. Do you want to generate it?",
- default=True, console=console)
- else:
- # get the latest time of files in nginx template
- template_time = 0
- for path in os.listdir(nginx_template_dir):
- template_time = max(template_time, os.stat(
- join(nginx_template_dir, path)).st_mtime)
- console.print(
- f"Nginx template update time: {datetime.fromtimestamp(template_time)}")
-
- nginx_config_time = 0
- for path in os.listdir(nginx_config_dir):
- nginx_config_time = max(nginx_config_time, os.stat(
- join(nginx_config_dir, path)).st_mtime)
- console.print(
- f"Generated nginx template update time: {datetime.fromtimestamp(nginx_config_time)}")
- if template_time > nginx_config_time:
- to_gen_nginx_conf = Confirm.ask("It seems you have updated the nginx template and not regenerate config. Do you want to regenerate the nginx config?",
- default=True, console=console)
- else:
- to_gen_nginx_conf = Confirm.ask("[yellow]It seems you have already generated nginx config. Do you want to overwrite it?[/]",
- default=False, console=console)
- if to_gen_nginx_conf:
- nginx(domain, config, console)
- data_dir_check(domain, console)
-
-
-def clear(console, /, delete_data_dir=False):
- template_name_list = get_template_name_list(console)
- # check root if we have to delete data dir
- if delete_data_dir and exists(data_dir) and os.geteuid() != 0:
- console.print(
- "You need to be root to delete data dir.", style="red")
- exit(1)
-
- to_delete = Confirm.ask(
- "[yellow]Are you sure you want to delete everything? all your data will be lost![/]", default=False, console=console)
- if to_delete:
- files_to_delete = []
- for template_name in template_name_list:
- f = join(project_dir, template_name)
- if exists(f):
- files_to_delete.append(f)
-
- delete_data_dir = delete_data_dir and exists(
- data_dir)
-
- if len(files_to_delete) == 0:
- console.print(
- "Nothing to delete. We are safe!", style="green")
- else:
- console.print("Here are the files to delete:")
- for f in files_to_delete:
- console.print(f, style="magenta")
- if delete_data_dir:
- console.print(data_dir + " (data dir)",
- style="magenta")
-
- to_delete = Confirm.ask(
- "[red]Are you sure you want to delete them?[/]", default=False, console=console)
- if to_delete:
- for f in files_to_delete:
- os.remove(f)
- if delete_data_dir:
- # recursively delete data dir
- shutil.rmtree(data_dir)
- console.print(
- "Your workspace is clean now!", style="green")
diff --git a/tools/cru-py/crupest/template.py b/tools/cru-py/crupest/template.py
deleted file mode 100644
index 9747af1..0000000
--- a/tools/cru-py/crupest/template.py
+++ /dev/null
@@ -1,32 +0,0 @@
-import os.path
-import re
-
-
-class Template:
- def __init__(self, template_path: str, var_prefix: str = "CRUPEST"):
- if len(var_prefix) != 0 and re.fullmatch(r"^[a-zA-Z_][a-zA-Z0-9_]*$", var_prefix) is None:
- raise ValueError("Invalid var prefix.")
- self.template_path = template_path
- self.template_name = os.path.basename(
- template_path)[:-len(".template")]
- with open(template_path, "r") as f:
- self.template = f.read()
- self.var_prefix = var_prefix
- self.__var_regex = re.compile(r"\$(" + var_prefix + r"_[a-zA-Z0-9_]+)")
- self.__var_brace_regex = re.compile(
- r"\$\{\s*(" + var_prefix + r"_[a-zA-Z0-9_]+)\s*\}")
- var_set = set()
- for match in self.__var_regex.finditer(self.template):
- var_set.add(match.group(1))
- for match in self.__var_brace_regex.finditer(self.template):
- var_set.add(match.group(1))
- self.var_set = var_set
-
- def generate(self, config: dict) -> str:
- result = self.template
- for var in self.var_set:
- if var not in config:
- raise ValueError(f"Missing config var {var}.")
- result = result.replace("$" + var, config[var])
- result = re.sub(r"\$\{\s*" + var + r"\s*\}", config[var], result)
- return result
diff --git a/tools/cru-py/crupest/test.py b/tools/cru-py/crupest/test.py
deleted file mode 100644
index d6eb778..0000000
--- a/tools/cru-py/crupest/test.py
+++ /dev/null
@@ -1,31 +0,0 @@
-import json
-from http.client import *
-from urllib.request import urlopen
-
-
-def test_crupest_api(console):
- def do_the_test():
- res: HTTPResponse = urlopen("http://localhost:5188/api/todos")
- body = res.read()
-
- if res.status != 200:
- raise Exception("Status code is not 200.")
- result = json.loads(body)
- if not isinstance(result, list):
- raise Exception("Result is not an array.")
- if len(result) == 0:
- raise Exception("Result is an empty array.")
- if not isinstance(result[0], dict):
- raise Exception("Result[0] is not an object.")
- if not isinstance(result[0].get("title"), str):
- raise Exception("Result[0].title is not a string.")
- if not isinstance(result[0].get("status"), str):
- raise Exception("Result[0].status is not a string.")
-
- try:
- do_the_test()
- console.print("Test passed!", style="green")
- exit(0)
- except Exception as e:
- console.print(e)
- console.print("Test failed!", style="red")
diff --git a/tools/cru-py/crupest/tui.py b/tools/cru-py/crupest/tui.py
deleted file mode 100644
index 20ba1dd..0000000
--- a/tools/cru-py/crupest/tui.py
+++ /dev/null
@@ -1,7 +0,0 @@
-from rich.console import Console
-from rich.prompt import Prompt, Confirm
-
-Prompt = Prompt
-Confirm = Confirm
-
-console = Console()
diff --git a/tools/cru-py/crupest/ui_base.py b/tools/cru-py/crupest/ui_base.py
deleted file mode 100644
index b26e65b..0000000
--- a/tools/cru-py/crupest/ui_base.py
+++ /dev/null
@@ -1,19 +0,0 @@
-from .tui import console
-
-good_style = "green"
-warning_style = "yellow"
-error_style = "red bold"
-file_name_style = "cyan bold"
-var_style = "magenta bold"
-value_style = "cyan bold"
-bye_style = "cyan"
-
-
-def print_with_indent(value: str, style: str, /, indent: int = 0, *, indent_width: int = 2, end='\n'):
- console.print(
- f'{" " * indent * indent_width}[{style}]{value}[/]', end=end)
-
-
-def print_var_value(name: str, value: str, /, indent: int = 0, *, indent_width: int = 2, end='\n'):
- console.print(
- f'{" " * indent * indent_width}[{var_style}]{name}[/] = [{value_style}]{value}[/]', end=end)