diff options
Diffstat (limited to 'tools/aio/modules')
-rw-r--r-- | tools/aio/modules/backup.py | 41 | ||||
-rw-r--r-- | tools/aio/modules/check.py | 20 | ||||
-rw-r--r-- | tools/aio/modules/config.py | 113 | ||||
-rw-r--r-- | tools/aio/modules/dns.py | 42 | ||||
-rw-r--r-- | tools/aio/modules/download_tools.py | 47 | ||||
-rw-r--r-- | tools/aio/modules/helper.py | 18 | ||||
-rw-r--r-- | tools/aio/modules/install_docker.py | 16 | ||||
-rwxr-xr-x | tools/aio/modules/nginx.py | 247 | ||||
-rw-r--r-- | tools/aio/modules/path.py | 30 | ||||
-rw-r--r-- | tools/aio/modules/setup.py | 233 | ||||
-rw-r--r-- | tools/aio/modules/template.py | 32 | ||||
-rw-r--r-- | tools/aio/modules/test.py | 31 |
12 files changed, 870 insertions, 0 deletions
diff --git a/tools/aio/modules/backup.py b/tools/aio/modules/backup.py new file mode 100644 index 0000000..7921d0d --- /dev/null +++ b/tools/aio/modules/backup.py @@ -0,0 +1,41 @@ +from .path import * +from rich.prompt import Prompt, Confirm +from urllib.request import urlretrieve +import subprocess +from datetime import datetime + + +def backup_restore(http_url_or_path, /, console): + url = http_url_or_path + if len(url) == 0: + raise Exception("You specify an empty url. Abort.") + if url.startswith("http://") or url.startswith("https://"): + download_path = os.path.join(tmp_dir, "data.tar.xz") + if os.path.exists(download_path): + to_remove = Confirm.ask( + f"I want to download to [cyan]{download_path}[/]. However, there is a file already there. Do you want to remove it first", default=False, console=console) + if to_remove: + os.remove(download_path) + else: + raise Exception( + "Aborted! Please check the file and try again.") + urlretrieve(url, download_path) + url = download_path + subprocess.run(["sudo", "tar", "-xJf", url, "-C", project_dir], check=True) + + +def backup_backup(path, /, console): + ensure_backup_dir() + now = datetime.utcnow().isoformat(timespec="seconds") + "Z" + if path is None: + path = Prompt.ask( + "You don't specify the path to backup to. Please specify one. http and https are NOT supported", console=console, default=os.path.join(backup_dir, now + ".tar.xz")) + if len(path) == 0: + raise Exception("You specify an empty path. Abort!") + if os.path.exists(path): + raise Exception( + "A file is already there. Please remove it first. Abort!") + subprocess.run( + ["sudo", "tar", "-cJf", path, "data", "-C", project_dir], + check=True + ) diff --git a/tools/aio/modules/check.py b/tools/aio/modules/check.py new file mode 100644 index 0000000..2a082f6 --- /dev/null +++ b/tools/aio/modules/check.py @@ -0,0 +1,20 @@ +import sys +import re +from os.path import * + + +def check_python_version(required_version=(3, 10)): + return sys.version_info < required_version + + +def check_ubuntu(): + if not exists("/etc/os-release"): + return False + else: + with open("/etc/os-release", "r") as f: + content = f.read() + if re.search(r"NAME=\"?Ubuntu\"?", content, re.IGNORECASE) is None: + return False + if re.search(r"VERSION_ID=\"?22.04\"?", content, re.IGNORECASE) is None: + return False + return True diff --git a/tools/aio/modules/config.py b/tools/aio/modules/config.py new file mode 100644 index 0000000..40b20d1 --- /dev/null +++ b/tools/aio/modules/config.py @@ -0,0 +1,113 @@ +import os +import typing +import uuid +from rich.prompt import Prompt +from .path import config_file_path + +def generate_uuid(): + return str(uuid.uuid4()) + +class ConfigVar: + def __init__(self, name: str, description: str, default_value_generator: typing.Callable[[], str] | str, /, default_value_for_ask=str | None): + """Create a config var. + + Args: + name (str): The name of the config var. + description (str): The description of the config var. + default_value_generator (typing.Callable[[], str] | str): The default value generator of the config var. If it is a string, it will be used as the input prompt and let user input the value. + """ + self.name = name + self.description = description + self.default_value_generator = default_value_generator + self.default_value_for_ask = default_value_for_ask + + def get_default_value(self, /, console): + if isinstance(self.default_value_generator, str): + return Prompt.ask(self.default_value_generator, console=console, default=self.default_value_for_ask) + else: + return self.default_value_generator() + + +config_var_list: list = [ + ConfigVar("CRUPEST_DOMAIN", "domain name", + "Please input your domain name"), + ConfigVar("CRUPEST_EMAIL", "admin email address", + "Please input your email address"), + ConfigVar("CRUPEST_AUTO_BACKUP_COS_SECRET_ID", + "access key id for Tencent COS, used for auto backup", "Please input your Tencent COS access key id for backup"), + ConfigVar("CRUPEST_AUTO_BACKUP_COS_SECRET_KEY", + "access key secret for Tencent COS, used for auto backup", "Please input your Tencent COS access key for backup"), + ConfigVar("CRUPEST_AUTO_BACKUP_COS_REGION", + "region for Tencent COS, used for auto backup", "Please input your Tencent COS region for backup", "ap-hongkong"), + ConfigVar("CRUPEST_AUTO_BACKUP_BUCKET_NAME", + "bucket name for Tencent COS, used for auto backup", "Please input your Tencent COS bucket name for backup"), + ConfigVar("CRUPEST_GITHUB_USERNAME", + "github username for fetching todos", "Please input your github username for fetching todos", "crupest"), + ConfigVar("CRUPEST_GITHUB_PROJECT_NUMBER", + "github project number for fetching todos", "Please input your github project number for fetching todos", "2"), + ConfigVar("CRUPEST_GITHUB_TOKEN", + "github token for fetching todos", "Please input your github token for fetching todos"), + ConfigVar("CRUPEST_GITHUB_TODO_COUNT", + "github todo count", "Please input your github todo count", 10), + ConfigVar("CRUPEST_GITHUB_TODO_COUNT", + "github todo count", "Please input your github todo count", 10), + ConfigVar("CRUPEST_V2RAY_TOKEN", + "v2ray user id", generate_uuid), + ConfigVar("CRUPEST_V2RAY_PATH", + "v2ray path, which will be prefixed by _", generate_uuid), +] + +config_var_name_set = set([config_var.name for config_var in config_var_list]) + + +def check_config_var_set(needed_config_var_set: set): + more = [] + less = [] + for var_name in needed_config_var_set: + if var_name not in config_var_name_set: + more.append(var_name) + for var_name in config_var_name_set: + if var_name not in needed_config_var_set: + less.append(var_name) + return (True if len(more) == 0 else False, more, less) + + +def config_file_exists(): + return os.path.isfile(config_file_path) + + +def parse_config(str: str) -> dict: + config = {} + for line_number, line in enumerate(str.splitlines()): + # check if it's a comment + if line.startswith("#"): + continue + # check if there is a '=' + if line.find("=") == -1: + raise ValueError( + f"Invalid config string. Please check line {line_number + 1}. There is even no '='!") + # split at first '=' + key, value = line.split("=", 1) + key = key.strip() + value = value.strip() + config[key] = value + return config + + +def get_domain() -> str: + if not config_file_exists(): + raise ValueError("Config file not found!") + with open(config_file_path) as f: + config = parse_config(f.read()) + if "CRUPEST_DOMAIN" not in config: + raise ValueError("Domain not found in config file!") + return config["CRUPEST_DOMAIN"] + + +def config_to_str(config: dict) -> str: + return "\n".join([f"{key}={value}" for key, value in config.items()]) + + +def print_config(console, config: dict) -> None: + for key, value in config.items(): + console.print(f"[magenta]{key}[/] = [cyan]{value}") diff --git a/tools/aio/modules/dns.py b/tools/aio/modules/dns.py new file mode 100644 index 0000000..5006d5f --- /dev/null +++ b/tools/aio/modules/dns.py @@ -0,0 +1,42 @@ +from os.path import * +from io import StringIO +import re +from .nginx import * + + +def generate_dns_zone(domain: str, ip: str, /, ttl: str | int = 600, *, enable_mail: bool = True, dkim: str | None = None) -> str: + result = f"$ORIGIN {domain}.\n\n" + result += "; A records\n" + result += f"@ {ttl} IN A {ip}\n" + subdomains = list_subdomain_names() + for subdomain in subdomains: + result += f"{subdomain} {ttl} IN A {ip}\n" + + if enable_mail: + result += "\n; MX records\n" + result += f"@ {ttl} IN MX 10 mail.{domain}.\n" + result += "\n; SPF record\n" + result += f"@ {ttl} IN TXT \"v=spf1 mx ~all\"\n" + if dkim is not None: + result += "\n; DKIM record\n" + result += f"mail._domainkey {ttl} IN TEXT \"{dkim}\"" + result += "\n; DMARC record\n" + result += "_dmarc {ttl} IN TXT \"v=DMARC1; p=none; rua=mailto:dmarc.report@{domain}; ruf=mailto:dmarc.report@{domain}; sp=none; ri=86400\"\n" + return result + + +def get_dkim_from_mailserver(domain: str) -> str | None: + dkim_path = join(data_dir, "dms/config/opendkim/keys", domain, "mail.txt") + if not exists(dkim_path): + return None + + p = subprocess.run(["sudo", "cat", dkim_path], + capture_output=True, check=True) + value = "" + for match in re.finditer("\"(.*)\"", p.stdout.decode('utf-8')): + value += match.group(1) + return value + + +def generate_dns_zone_with_dkim(domain: str, ip: str, /, ttl: str | int = 600) -> str: + return generate_dns_zone(domain, ip, ttl, enable_mail=True, dkim=get_dkim_from_mailserver(domain)) diff --git a/tools/aio/modules/download_tools.py b/tools/aio/modules/download_tools.py new file mode 100644 index 0000000..beb06d4 --- /dev/null +++ b/tools/aio/modules/download_tools.py @@ -0,0 +1,47 @@ +import sys +from os.path import * +from urllib.request import * +from rich.prompt import Confirm +from .path import * +from .helper import print_order + + +TOOLS = [("docker-mailserver setup script", "docker-mailserver-setup.sh", + "https://raw.githubusercontent.com/docker-mailserver/docker-mailserver/master/setup.sh")] + + +def download_tools(console): + # if we are not linux, we prompt the user + if sys.platform != "linux": + console.print( + "You are not running this script on linux. The tools will not work.", style="yellow") + if not Confirm.ask("Do you want to continue?", default=False, console=console): + return + + for index, script in enumerate(TOOLS): + number = index + 1 + total = len(TOOLS) + print_order(number, total, console) + name, filename, url = script + # if url is callable, call it + if callable(url): + url = url() + path = join(tool_dir, filename) + skip = False + if exists(path): + overwrite = Confirm.ask( + f"[cyan]{name}[/] already exists, download and overwrite?", default=False, console=console) + if not overwrite: + skip = True + else: + download = Confirm.ask( + f"Download [cyan]{name}[/] to [magenta]{path}[/]?", default=True, console=console) + if not download: + skip = True + if not skip: + console.print(f"Downloading {name}...") + urlretrieve(url, path) + os.chmod(path, 0o755) + console.print(f"Downloaded {name} to {path}.", style="green") + else: + console.print(f"Skipped {name}.", style="yellow") diff --git a/tools/aio/modules/helper.py b/tools/aio/modules/helper.py new file mode 100644 index 0000000..f8fe34a --- /dev/null +++ b/tools/aio/modules/helper.py @@ -0,0 +1,18 @@ +import os +import os.path +from .path import * + + +def run_in_dir(dir: str, func: callable): + old_dir = os.path.abspath(os.getcwd()) + os.chdir(dir) + func() + os.chdir(old_dir) + + +def run_in_project_dir(func: callable): + run_in_dir(project_dir, func) + + +def print_order(number: int, total: int, /, console) -> None: + console.print(f"\[{number}/{total}]", end=" ", style="green") diff --git a/tools/aio/modules/install_docker.py b/tools/aio/modules/install_docker.py new file mode 100644 index 0000000..ac50290 --- /dev/null +++ b/tools/aio/modules/install_docker.py @@ -0,0 +1,16 @@ +from os.path import * +from .path import * +import urllib +import subprocess + + +def install_docker(): + ensure_tmp_dir() + get_docker_path = join(tmp_dir, "get-docker.sh") + urllib.request.urlretrieve("https://get.docker.com", get_docker_path) + os.chmod(get_docker_path, 0o755) + subprocess.run(["sudo", "sh", get_docker_path], check=True) + subprocess.run(["sudo", "systemctl", "enable", + "--now", "docker"], check=True) + subprocess.run(["sudo", "usermod", "-aG", "docker", + os.getlogin()], check=True) diff --git a/tools/aio/modules/nginx.py b/tools/aio/modules/nginx.py new file mode 100755 index 0000000..f69c5df --- /dev/null +++ b/tools/aio/modules/nginx.py @@ -0,0 +1,247 @@ +#!/usr/bin/env python3 + +import json +import jsonschema +import os +from os.path import * +import shutil +import subprocess +from rich.prompt import Confirm +from cryptography.x509 import * +from cryptography.x509.oid import ExtensionOID +from .template import Template +from .path import * + +with open(join(nginx_template_dir, 'server.json')) as f: + server = json.load(f) + +with open(join(nginx_template_dir, 'server.schema.json')) as f: + schema = json.load(f) + +jsonschema.validate(server, schema) + +non_template_files = ['forbid_unknown_domain.conf', "websocket.conf"] + +ssl_template = Template(join(nginx_template_dir, 'ssl.conf.template')) +root_template = Template(join( + nginx_template_dir, 'root.conf.template')) +static_file_template = Template(join( + nginx_template_dir, 'static-file.conf.template')) +reverse_proxy_template = Template(join( + nginx_template_dir, 'reverse-proxy.conf.template')) +redirect_template = Template(join( + nginx_template_dir, 'redirect.conf.template')) +cert_only_template = Template(join( + nginx_template_dir, 'cert-only.conf.template')) + +nginx_var_set = set.union(root_template.var_set, + static_file_template.var_set, reverse_proxy_template.var_set) + + +def list_subdomain_names() -> list: + return [s["subdomain"] for s in server["sites"]] + + +def list_subdomains(domain: str) -> list: + return [f"{s['subdomain']}.{domain}" for s in server["sites"]] + + +def list_domains(domain: str) -> list: + return [domain, *list_subdomains(domain)] + + +def generate_nginx_config(domain: str, original_config, dest: str) -> None: + if not isdir(dest): + raise ValueError('dest must be a directory') + # copy ssl.conf and https-redirect.conf which need no variable substitution + for filename in non_template_files: + src = join(nginx_template_dir, filename) + dst = join(dest, filename) + shutil.copyfile(src, dst) + config = { + "CRUPEST_DOMAIN": domain, + "CRUPEST_V2RAY_TOKEN": original_config["CRUPEST_V2RAY_TOKEN"], + "CRUPEST_V2RAY_PATH": original_config["CRUPEST_V2RAY_PATH"] + } + # generate ssl.conf + with open(join(dest, 'ssl.conf'), 'w') as f: + f.write(ssl_template.generate(config)) + # generate root.conf + with open(join(dest, f'{domain}.conf'), 'w') as f: + root_config = config.copy() + root_config["CRUPEST_V2RAY_TOKEN"] = config["CRUPEST_V2RAY_TOKEN"] + root_config["CRUPEST_V2RAY_PATH"] = config["CRUPEST_V2RAY_PATH"] + f.write(root_template.generate(config)) + # generate nginx config for each site + sites: list = server["sites"] + for site in sites: + subdomain = site["subdomain"] + local_config = config.copy() + local_config['CRUPEST_NGINX_SUBDOMAIN'] = subdomain + if site["type"] == 'static-file': + template = static_file_template + local_config['CRUPEST_NGINX_ROOT'] = site["root"] + elif site["type"] == 'reverse-proxy': + template = reverse_proxy_template + local_config['CRUPEST_NGINX_UPSTREAM_SERVER'] = site["upstream"] + elif site["type"] == 'redirect': + template = redirect_template + local_config['CRUPEST_NGINX_URL'] = site["url"] + elif site["type"] == 'cert-only': + template = cert_only_template + else: + raise Exception('Invalid site type') + with open(join(dest, f'{subdomain}.{domain}.conf'), 'w') as f: + f.write(template.generate(local_config)) + + +def check_nginx_config_dir(dir_path: str, domain: str) -> list: + if not exists(dir_path): + return [] + good_files = [*non_template_files, "ssl.conf", * + [f"{full_domain}.conf" for full_domain in list_domains(domain)]] + bad_files = [] + for path in os.listdir(dir_path): + file_name = basename(path) + if file_name not in good_files: + bad_files.append(file_name) + return bad_files + + +def restart_nginx(force=False) -> bool: + if not force: + p = subprocess.run(['docker', "container", "ls", + "-f", "name=nginx", "-q"], capture_output=True) + container: str = p.stdout.decode("utf-8") + if len(container.strip()) == 0: + return False + subprocess.run(['docker', 'restart', 'nginx']) + return True + + +def nginx(domain: str, config, /, console) -> None: + bad_files = check_nginx_config_dir(nginx_config_dir, domain) + if len(bad_files) > 0: + console.print( + "WARNING: It seems there are some bad conf files in the nginx config directory:", style="yellow") + for bad_file in bad_files: + console.print(bad_file, style="cyan") + to_delete = Confirm.ask( + "They will affect nginx in a [red]bad[/] way. Do you want to delete them?", default=True, console=console) + if to_delete: + for file in bad_files: + os.remove(join(nginx_config_dir, file)) + console.print( + "I have found following var in nginx templates:", style="green") + for var in nginx_var_set: + console.print(var, style="magenta") + if not exists(nginx_config_dir): + os.mkdir(nginx_config_dir) + console.print( + f"Nginx config directory created at [magenta]{nginx_config_dir}[/]", style="green") + generate_nginx_config(domain, config, dest=nginx_config_dir) + console.print("Nginx config generated.", style="green") + if restart_nginx(): + console.print('Nginx restarted.', style="green") + + +def certbot_command_gen(domain: str, action, /, test=False, no_docker=False, *, standalone=None, email=None, agree_tos=False) -> str: + domains = list_domains(domain) + + add_domain_option = True + if action == 'create': + if standalone == None: + standalone = True + certbot_action = "certonly" + elif action == 'expand': + if standalone == None: + standalone = False + certbot_action = "certonly" + elif action == 'renew': + if standalone == None: + standalone = False + add_domain_option = False + certbot_action = "renew" + else: + raise ValueError('Invalid action') + + if no_docker: + command = "certbot " + else: + expose_segment = ' -p "0.0.0.0:80:80"' + web_root_segment = ' -v "{project_abs_path}/data/certbot/webroot:/var/www/certbot"' + command = f'docker run -it --rm --name certbot -v "{project_abs_path}/data/certbot/certs:/etc/letsencrypt" -v "{project_abs_path}/data/certbot/data:/var/lib/letsencrypt"{ expose_segment if standalone else web_root_segment} certbot/certbot ' + + command += certbot_action + + if standalone: + command += " --standalone" + else: + command += ' --webroot -w /var/www/certbot' + + if add_domain_option: + command += f' -d {" -d ".join(domains)}' + + if email is not None: + command += f' --email {email}' + + if agree_tos: + command += ' --agree-tos' + + if test: + command += " --test-cert --dry-run" + + return command + + +def get_cert_path(root_domain): + return join(data_dir, "certbot", "certs", "live", root_domain, "fullchain.pem") + + +def get_cert_domains(cert_path, root_domain): + + if not exists(cert_path): + return None + + if not isfile(cert_path): + return None + + with open(cert_path, 'rb') as f: + cert = load_pem_x509_certificate(f.read()) + ext = cert.extensions.get_extension_for_oid( + ExtensionOID.SUBJECT_ALTERNATIVE_NAME) + domains: list = ext.value.get_values_for_type(DNSName) + domains.remove(root_domain) + domains = [root_domain, *domains] + return domains + + +def print_create_cert_message(domain, console): + console.print( + "Looks like you haven't run certbot to get the init ssl certificates. You may want to run following code to get one:", style="cyan") + console.print(certbot_command_gen(domain, "create"), + soft_wrap=True, highlight=False) + + +def check_ssl_cert(domain, console): + cert_path = get_cert_path(domain) + tmp_cert_path = join(tmp_dir, "fullchain.pem") + console.print("Temporarily copy cert to tmp...", style="yellow") + ensure_tmp_dir() + subprocess.run( + ["sudo", "cp", cert_path, tmp_cert_path], check=True) + subprocess.run(["sudo", "chown", str(os.geteuid()), + tmp_cert_path], check=True) + cert_domains = get_cert_domains(tmp_cert_path, domain) + if cert_domains is None: + print_create_cert_message(domain, console) + else: + cert_domain_set = set(cert_domains) + domains = set(list_domains(domain)) + if not cert_domain_set == domains: + console.print( + "Cert domains are not equal to host domains. Run following command to recreate it with nginx stopped.", style="red") + console.print(certbot_command_gen( + domain, "create", standalone=True), soft_wrap=True, highlight=False) + console.print("Remove tmp cert...", style="yellow") + os.remove(tmp_cert_path) diff --git a/tools/aio/modules/path.py b/tools/aio/modules/path.py new file mode 100644 index 0000000..b3b12b9 --- /dev/null +++ b/tools/aio/modules/path.py @@ -0,0 +1,30 @@ +import os +import os.path + +script_dir = os.path.relpath(os.path.dirname(__file__)) +project_dir = os.path.normpath(os.path.join(script_dir, "../../../")) +project_abs_path = os.path.abspath(project_dir) +template_dir = os.path.join(project_dir, "template") +nginx_template_dir = os.path.join(template_dir, "nginx") +data_dir = os.path.join(project_dir, "data") +tool_dir = os.path.join(project_dir, "tool") +tmp_dir = os.path.join(project_dir, "tmp") +backup_dir = os.path.join(project_dir, "backup") +config_file_path = os.path.join(data_dir, "config") +nginx_config_dir = os.path.join(project_dir, "nginx-config") +log_dir = os.path.join(project_dir, "log") + + +def ensure_log_dir(): + if not os.path.exists(log_dir): + os.mkdir(log_dir) + + +def ensure_tmp_dir(): + if not os.path.exists(tmp_dir): + os.mkdir(tmp_dir) + + +def ensure_backup_dir(): + if not os.path.exists(backup_dir): + os.mkdir(backup_dir) diff --git a/tools/aio/modules/setup.py b/tools/aio/modules/setup.py new file mode 100644 index 0000000..4e91302 --- /dev/null +++ b/tools/aio/modules/setup.py @@ -0,0 +1,233 @@ +from os.path import * +from datetime import datetime +from rich.prompt import Confirm +from .path import * +from .nginx import * +from .config import * +from .helper import * + + +def get_template_name_list(console) -> list[str]: + console.print("First let's check all the templates...") + + # get all filenames ending with .template + template_name_list = [basename(f)[:-len('.template')] for f in os.listdir( + template_dir) if f.endswith(".template")] + console.print( + f"I have found following template files in [magenta]{template_dir}[/]:", style="green") + for filename in template_name_list: + console.print(f"{filename}.template", style="magenta") + + return template_name_list + + +def data_dir_check(domain, console): + if isdir(data_dir): + if not exists(join(data_dir, "certbot")): + print_create_cert_message(domain, console) + else: + to_check = Confirm.ask( + "I want to check your ssl certs, but I need to sudo. Do you want me check", console=console, default=False) + if to_check: + check_ssl_cert(domain, console) + + +def template_generate(console): + template_name_list = get_template_name_list(console) + template_list: list = [] + config_var_name_set_in_template = set() + for template_name in template_name_list: + template = Template(join(template_dir, template_name+".template")) + template_list.append(template) + config_var_name_set_in_template.update(template.var_set) + + console.print( + "I have found following variables needed in templates:", style="green") + for key in config_var_name_set_in_template: + console.print(key, style="magenta") + + # check vars + check_success, more, less = check_config_var_set( + config_var_name_set_in_template) + if len(more) != 0: + console.print("There are more variables in templates than in config file:", + style="red") + for key in more: + console.print(key, style="magenta") + if len(less) != 0: + console.print("Following config vars are not used:", + style="yellow") + for key in less: + console.print(key, style="magenta") + + if not check_success: + console.print( + "Please check you config vars and make sure the needed ones are defined!", style="red") + else: + console.print( + "Now let's check if they are already generated...") + + conflict = False + + # check if there exists any generated files + for filename in template_name_list: + if exists(join(project_dir, filename)): + console.print(f"Found [magenta]{filename}[/]") + conflict = True + + to_gen = True + if conflict: + to_overwrite = Confirm.ask( + "It seems there are some files already generated. Do you want to overwrite them?", console=console, default=False) + if not to_overwrite: + to_gen = False + console.print( + "Great! Check the existing files and see you next time!", style="green") + else: + print("No conflict found. Let's go on!\n") + + if to_gen: + console.print("Check for existing config file...") + + # check if there exists a config file + if not config_file_exists(): + config = {} + console.print( + "No existing config file found. Don't worry. Let's create one!", style="green") + for config_var in config_var_list: + config[config_var.name] = config_var.get_default_value() + config_content = config_to_str(config) + # create data dir if not exist + if not exists(data_dir): + os.mkdir(data_dir) + # write config file + with open(config_file_path, "w") as f: + f.write(config_content) + console.print( + f"Everything else is auto generated. The config file is written into [magenta]{config_file_path}[/]. You had better keep it safe. And here is the content:", style="green") + print_config(console, config) + is_ok = Confirm.ask( + "If you think it's not ok, you can stop here and edit it. Or let's go on?", console=console, default=True) + if not is_ok: + console.print( + "Great! Check the config file and see you next time!", style="green") + to_gen = False + else: + console.print( + "Looks like you have already had a config file. Let's check the content:", style="green") + with open(config_file_path, "r") as f: + content = f.read() + config = parse_config(content) + print_config(console, config) + missed_config_vars = [] + for config_var in config_var_list: + if config_var.name not in config: + missed_config_vars.append(config_var) + + if len(missed_config_vars) > 0: + console.print( + "Oops! It seems you have missed some keys in your config file. Let's add them!", style="green") + for config_var in missed_config_vars: + config[config_var.name] = config_var.get_default_value( + console) + content = config_to_str(config) + with open(config_file_path, "w") as f: + f.write(content) + console.print( + f"Here is the new config, it has been written out to [magenta]{config_file_path}[/]:") + print_config(console, config) + good_enough = Confirm.ask("Is it good enough?", + console=console, default=True) + if not good_enough: + console.print( + "Great! Check the config file and see you next time!", style="green") + to_gen = False + + domain = get_domain() + + if to_gen: + console.print( + "Finally, everything is ready. Let's generate the files:", style="green") + + # generate files + for index, template in enumerate(template_list): + number = index + 1 + total = len(template_list) + print_order(number, total, console) + console.print( + f"Generating [magenta]{template.template_name}[/]...") + content = template.generate(config) + with open(join(project_dir, template.template_name), "w") as f: + f.write(content) + + # generate nginx config + if not exists(nginx_config_dir): + to_gen_nginx_conf = Confirm.ask("It seems you haven't generate nginx config. Do you want to generate it?", + default=True, console=console) + else: + # get the latest time of files in nginx template + template_time = 0 + for path in os.listdir(nginx_template_dir): + template_time = max(template_time, os.stat( + join(nginx_template_dir, path)).st_mtime) + console.print( + f"Nginx template update time: {datetime.fromtimestamp(template_time)}") + + nginx_config_time = 0 + for path in os.listdir(nginx_config_dir): + nginx_config_time = max(nginx_config_time, os.stat( + join(nginx_config_dir, path)).st_mtime) + console.print( + f"Generated nginx template update time: {datetime.fromtimestamp(nginx_config_time)}") + if template_time > nginx_config_time: + to_gen_nginx_conf = Confirm.ask("It seems you have updated the nginx template and not regenerate config. Do you want to regenerate the nginx config?", + default=True, console=console) + else: + to_gen_nginx_conf = Confirm.ask("[yellow]It seems you have already generated nginx config. Do you want to overwrite it?[/]", + default=False, console=console) + if to_gen_nginx_conf: + nginx(domain, config, console) + data_dir_check(domain, console) + + +def clear(console, /, delete_data_dir=False): + template_name_list = get_template_name_list(console) + # check root if we have to delete data dir + if delete_data_dir and exists(data_dir) and os.geteuid() != 0: + console.print( + "You need to be root to delete data dir.", style="red") + exit(1) + + to_delete = Confirm.ask( + "[yellow]Are you sure you want to delete everything? all your data will be lost![/]", default=False, console=console) + if to_delete: + files_to_delete = [] + for template_name in template_name_list: + f = join(project_dir, template_name) + if exists(f): + files_to_delete.append(f) + + delete_data_dir = delete_data_dir and exists( + data_dir) + + if len(files_to_delete) == 0: + console.print( + "Nothing to delete. We are safe!", style="green") + else: + console.print("Here are the files to delete:") + for f in files_to_delete: + console.print(f, style="magenta") + if delete_data_dir: + console.print(data_dir + " (data dir)", + style="magenta") + + to_delete = Confirm.ask( + "[red]Are you sure you want to delete them?[/]", default=False, console=console) + if to_delete: + for f in files_to_delete: + os.remove(f) + if delete_data_dir: + # recursively delete data dir + shutil.rmtree(data_dir) + console.print( + "Your workspace is clean now!", style="green") diff --git a/tools/aio/modules/template.py b/tools/aio/modules/template.py new file mode 100644 index 0000000..9747af1 --- /dev/null +++ b/tools/aio/modules/template.py @@ -0,0 +1,32 @@ +import os.path +import re + + +class Template: + def __init__(self, template_path: str, var_prefix: str = "CRUPEST"): + if len(var_prefix) != 0 and re.fullmatch(r"^[a-zA-Z_][a-zA-Z0-9_]*$", var_prefix) is None: + raise ValueError("Invalid var prefix.") + self.template_path = template_path + self.template_name = os.path.basename( + template_path)[:-len(".template")] + with open(template_path, "r") as f: + self.template = f.read() + self.var_prefix = var_prefix + self.__var_regex = re.compile(r"\$(" + var_prefix + r"_[a-zA-Z0-9_]+)") + self.__var_brace_regex = re.compile( + r"\$\{\s*(" + var_prefix + r"_[a-zA-Z0-9_]+)\s*\}") + var_set = set() + for match in self.__var_regex.finditer(self.template): + var_set.add(match.group(1)) + for match in self.__var_brace_regex.finditer(self.template): + var_set.add(match.group(1)) + self.var_set = var_set + + def generate(self, config: dict) -> str: + result = self.template + for var in self.var_set: + if var not in config: + raise ValueError(f"Missing config var {var}.") + result = result.replace("$" + var, config[var]) + result = re.sub(r"\$\{\s*" + var + r"\s*\}", config[var], result) + return result diff --git a/tools/aio/modules/test.py b/tools/aio/modules/test.py new file mode 100644 index 0000000..d6eb778 --- /dev/null +++ b/tools/aio/modules/test.py @@ -0,0 +1,31 @@ +import json +from http.client import * +from urllib.request import urlopen + + +def test_crupest_api(console): + def do_the_test(): + res: HTTPResponse = urlopen("http://localhost:5188/api/todos") + body = res.read() + + if res.status != 200: + raise Exception("Status code is not 200.") + result = json.loads(body) + if not isinstance(result, list): + raise Exception("Result is not an array.") + if len(result) == 0: + raise Exception("Result is an empty array.") + if not isinstance(result[0], dict): + raise Exception("Result[0] is not an object.") + if not isinstance(result[0].get("title"), str): + raise Exception("Result[0].title is not a string.") + if not isinstance(result[0].get("status"), str): + raise Exception("Result[0].status is not a string.") + + try: + do_the_test() + console.print("Test passed!", style="green") + exit(0) + except Exception as e: + console.print(e) + console.print("Test failed!", style="red") |