diff options
Diffstat (limited to 'tools/aio/modules')
-rw-r--r-- | tools/aio/modules/backup.py | 41 | ||||
-rw-r--r-- | tools/aio/modules/check.py | 20 | ||||
-rw-r--r-- | tools/aio/modules/config.py | 134 | ||||
-rw-r--r-- | tools/aio/modules/dns.py | 42 | ||||
-rw-r--r-- | tools/aio/modules/download_tools.py | 47 | ||||
-rw-r--r-- | tools/aio/modules/helper.py | 18 | ||||
-rw-r--r-- | tools/aio/modules/install_docker.py | 16 | ||||
-rwxr-xr-x | tools/aio/modules/nginx.py | 247 | ||||
-rw-r--r-- | tools/aio/modules/path.py | 30 | ||||
-rw-r--r-- | tools/aio/modules/setup.py | 233 | ||||
-rw-r--r-- | tools/aio/modules/template.py | 32 | ||||
-rw-r--r-- | tools/aio/modules/test.py | 31 |
12 files changed, 0 insertions, 891 deletions
diff --git a/tools/aio/modules/backup.py b/tools/aio/modules/backup.py deleted file mode 100644 index 7921d0d..0000000 --- a/tools/aio/modules/backup.py +++ /dev/null @@ -1,41 +0,0 @@ -from .path import * -from rich.prompt import Prompt, Confirm -from urllib.request import urlretrieve -import subprocess -from datetime import datetime - - -def backup_restore(http_url_or_path, /, console): - url = http_url_or_path - if len(url) == 0: - raise Exception("You specify an empty url. Abort.") - if url.startswith("http://") or url.startswith("https://"): - download_path = os.path.join(tmp_dir, "data.tar.xz") - if os.path.exists(download_path): - to_remove = Confirm.ask( - f"I want to download to [cyan]{download_path}[/]. However, there is a file already there. Do you want to remove it first", default=False, console=console) - if to_remove: - os.remove(download_path) - else: - raise Exception( - "Aborted! Please check the file and try again.") - urlretrieve(url, download_path) - url = download_path - subprocess.run(["sudo", "tar", "-xJf", url, "-C", project_dir], check=True) - - -def backup_backup(path, /, console): - ensure_backup_dir() - now = datetime.utcnow().isoformat(timespec="seconds") + "Z" - if path is None: - path = Prompt.ask( - "You don't specify the path to backup to. Please specify one. http and https are NOT supported", console=console, default=os.path.join(backup_dir, now + ".tar.xz")) - if len(path) == 0: - raise Exception("You specify an empty path. Abort!") - if os.path.exists(path): - raise Exception( - "A file is already there. Please remove it first. Abort!") - subprocess.run( - ["sudo", "tar", "-cJf", path, "data", "-C", project_dir], - check=True - ) diff --git a/tools/aio/modules/check.py b/tools/aio/modules/check.py deleted file mode 100644 index 2a082f6..0000000 --- a/tools/aio/modules/check.py +++ /dev/null @@ -1,20 +0,0 @@ -import sys -import re -from os.path import * - - -def check_python_version(required_version=(3, 10)): - return sys.version_info < required_version - - -def check_ubuntu(): - if not exists("/etc/os-release"): - return False - else: - with open("/etc/os-release", "r") as f: - content = f.read() - if re.search(r"NAME=\"?Ubuntu\"?", content, re.IGNORECASE) is None: - return False - if re.search(r"VERSION_ID=\"?22.04\"?", content, re.IGNORECASE) is None: - return False - return True diff --git a/tools/aio/modules/config.py b/tools/aio/modules/config.py deleted file mode 100644 index 4faa8a3..0000000 --- a/tools/aio/modules/config.py +++ /dev/null @@ -1,134 +0,0 @@ -import os -import typing -import uuid -import random -import string -from rich.prompt import Prompt -from .path import config_file_path - -def generate_uuid(): - return str(uuid.uuid4()) - -# generate random characters of digits and alphabets -def generate_random_string(length: int): - characters = string.ascii_letters + string.digits - random_string = ''.join(random.choice(characters) for _ in range(length)) - return random_string - -def generate_random_string_32(): - return generate_random_string(32) - -class ConfigVar: - def __init__(self, name: str, description: str, default_value_generator: typing.Callable[[], str] | str, /, default_value_for_ask=str | None): - """Create a config var. - - Args: - name (str): The name of the config var. - description (str): The description of the config var. - default_value_generator (typing.Callable[[], str] | str): The default value generator of the config var. If it is a string, it will be used as the input prompt and let user input the value. - """ - self.name = name - self.description = description - self.default_value_generator = default_value_generator - self.default_value_for_ask = default_value_for_ask - - def get_default_value(self, /, console): - if isinstance(self.default_value_generator, str): - return Prompt.ask(self.default_value_generator, console=console, default=self.default_value_for_ask) - else: - return self.default_value_generator() - - -config_var_list: list = [ - ConfigVar("CRUPEST_DOMAIN", "domain name", - "Please input your domain name"), - ConfigVar("CRUPEST_EMAIL", "admin email address", - "Please input your email address"), - ConfigVar("CRUPEST_AUTO_BACKUP_COS_SECRET_ID", - "access key id for Tencent COS, used for auto backup", "Please input your Tencent COS access key id for backup"), - ConfigVar("CRUPEST_AUTO_BACKUP_COS_SECRET_KEY", - "access key secret for Tencent COS, used for auto backup", "Please input your Tencent COS access key for backup"), - ConfigVar("CRUPEST_AUTO_BACKUP_COS_REGION", - "region for Tencent COS, used for auto backup", "Please input your Tencent COS region for backup", "ap-hongkong"), - ConfigVar("CRUPEST_AUTO_BACKUP_BUCKET_NAME", - "bucket name for Tencent COS, used for auto backup", "Please input your Tencent COS bucket name for backup"), - ConfigVar("CRUPEST_GITHUB_USERNAME", - "github username for fetching todos", "Please input your github username for fetching todos", "crupest"), - ConfigVar("CRUPEST_GITHUB_PROJECT_NUMBER", - "github project number for fetching todos", "Please input your github project number for fetching todos", "2"), - ConfigVar("CRUPEST_GITHUB_TOKEN", - "github token for fetching todos", "Please input your github token for fetching todos"), - ConfigVar("CRUPEST_GITHUB_TODO_COUNT", - "github todo count", "Please input your github todo count", 10), - ConfigVar("CRUPEST_GITHUB_TODO_COUNT", - "github todo count", "Please input your github todo count", 10), - ConfigVar("CRUPEST_V2RAY_TOKEN", - "v2ray user id", generate_uuid), - ConfigVar("CRUPEST_V2RAY_PATH", - "v2ray path, which will be prefixed by _", generate_uuid), - ConfigVar("CRUPEST_FORGEJO_MAILER_USER", - "Forgejo SMTP user.", "Please input your Forgejo SMTP user."), - ConfigVar("CRUPEST_FORGEJO_MAILER_PASSWD", - "Forgejo SMTP password.", "Please input your Forgejo SMTP password."), - ConfigVar("CRUPEST_2FAUTH_APP_KEY", - "2FAuth App Key.", generate_random_string_32), - ConfigVar("CRUPEST_2FAUTH_MAIL_USERNAME", - "2FAuth SMTP user.", "Please input your 2FAuth SMTP user."), - ConfigVar("CRUPEST_2FAUTH_MAIL_PASSWORD", - "2FAuth SMTP password.", "Please input your 2FAuth SMTP password."), -] - -config_var_name_set = set([config_var.name for config_var in config_var_list]) - - -def check_config_var_set(needed_config_var_set: set): - more = [] - less = [] - for var_name in needed_config_var_set: - if var_name not in config_var_name_set: - more.append(var_name) - for var_name in config_var_name_set: - if var_name not in needed_config_var_set: - less.append(var_name) - return (True if len(more) == 0 else False, more, less) - - -def config_file_exists(): - return os.path.isfile(config_file_path) - - -def parse_config(str: str) -> dict: - config = {} - for line_number, line in enumerate(str.splitlines()): - # check if it's a comment - if line.startswith("#"): - continue - # check if there is a '=' - if line.find("=") == -1: - raise ValueError( - f"Invalid config string. Please check line {line_number + 1}. There is even no '='!") - # split at first '=' - key, value = line.split("=", 1) - key = key.strip() - value = value.strip() - config[key] = value - return config - - -def get_domain() -> str: - if not config_file_exists(): - raise ValueError("Config file not found!") - with open(config_file_path) as f: - config = parse_config(f.read()) - if "CRUPEST_DOMAIN" not in config: - raise ValueError("Domain not found in config file!") - return config["CRUPEST_DOMAIN"] - - -def config_to_str(config: dict) -> str: - return "\n".join([f"{key}={value}" for key, value in config.items()]) - - -def print_config(console, config: dict) -> None: - for key, value in config.items(): - console.print(f"[magenta]{key}[/] = [cyan]{value}") diff --git a/tools/aio/modules/dns.py b/tools/aio/modules/dns.py deleted file mode 100644 index 5006d5f..0000000 --- a/tools/aio/modules/dns.py +++ /dev/null @@ -1,42 +0,0 @@ -from os.path import * -from io import StringIO -import re -from .nginx import * - - -def generate_dns_zone(domain: str, ip: str, /, ttl: str | int = 600, *, enable_mail: bool = True, dkim: str | None = None) -> str: - result = f"$ORIGIN {domain}.\n\n" - result += "; A records\n" - result += f"@ {ttl} IN A {ip}\n" - subdomains = list_subdomain_names() - for subdomain in subdomains: - result += f"{subdomain} {ttl} IN A {ip}\n" - - if enable_mail: - result += "\n; MX records\n" - result += f"@ {ttl} IN MX 10 mail.{domain}.\n" - result += "\n; SPF record\n" - result += f"@ {ttl} IN TXT \"v=spf1 mx ~all\"\n" - if dkim is not None: - result += "\n; DKIM record\n" - result += f"mail._domainkey {ttl} IN TEXT \"{dkim}\"" - result += "\n; DMARC record\n" - result += "_dmarc {ttl} IN TXT \"v=DMARC1; p=none; rua=mailto:dmarc.report@{domain}; ruf=mailto:dmarc.report@{domain}; sp=none; ri=86400\"\n" - return result - - -def get_dkim_from_mailserver(domain: str) -> str | None: - dkim_path = join(data_dir, "dms/config/opendkim/keys", domain, "mail.txt") - if not exists(dkim_path): - return None - - p = subprocess.run(["sudo", "cat", dkim_path], - capture_output=True, check=True) - value = "" - for match in re.finditer("\"(.*)\"", p.stdout.decode('utf-8')): - value += match.group(1) - return value - - -def generate_dns_zone_with_dkim(domain: str, ip: str, /, ttl: str | int = 600) -> str: - return generate_dns_zone(domain, ip, ttl, enable_mail=True, dkim=get_dkim_from_mailserver(domain)) diff --git a/tools/aio/modules/download_tools.py b/tools/aio/modules/download_tools.py deleted file mode 100644 index beb06d4..0000000 --- a/tools/aio/modules/download_tools.py +++ /dev/null @@ -1,47 +0,0 @@ -import sys -from os.path import * -from urllib.request import * -from rich.prompt import Confirm -from .path import * -from .helper import print_order - - -TOOLS = [("docker-mailserver setup script", "docker-mailserver-setup.sh", - "https://raw.githubusercontent.com/docker-mailserver/docker-mailserver/master/setup.sh")] - - -def download_tools(console): - # if we are not linux, we prompt the user - if sys.platform != "linux": - console.print( - "You are not running this script on linux. The tools will not work.", style="yellow") - if not Confirm.ask("Do you want to continue?", default=False, console=console): - return - - for index, script in enumerate(TOOLS): - number = index + 1 - total = len(TOOLS) - print_order(number, total, console) - name, filename, url = script - # if url is callable, call it - if callable(url): - url = url() - path = join(tool_dir, filename) - skip = False - if exists(path): - overwrite = Confirm.ask( - f"[cyan]{name}[/] already exists, download and overwrite?", default=False, console=console) - if not overwrite: - skip = True - else: - download = Confirm.ask( - f"Download [cyan]{name}[/] to [magenta]{path}[/]?", default=True, console=console) - if not download: - skip = True - if not skip: - console.print(f"Downloading {name}...") - urlretrieve(url, path) - os.chmod(path, 0o755) - console.print(f"Downloaded {name} to {path}.", style="green") - else: - console.print(f"Skipped {name}.", style="yellow") diff --git a/tools/aio/modules/helper.py b/tools/aio/modules/helper.py deleted file mode 100644 index f8fe34a..0000000 --- a/tools/aio/modules/helper.py +++ /dev/null @@ -1,18 +0,0 @@ -import os -import os.path -from .path import * - - -def run_in_dir(dir: str, func: callable): - old_dir = os.path.abspath(os.getcwd()) - os.chdir(dir) - func() - os.chdir(old_dir) - - -def run_in_project_dir(func: callable): - run_in_dir(project_dir, func) - - -def print_order(number: int, total: int, /, console) -> None: - console.print(f"\[{number}/{total}]", end=" ", style="green") diff --git a/tools/aio/modules/install_docker.py b/tools/aio/modules/install_docker.py deleted file mode 100644 index ac50290..0000000 --- a/tools/aio/modules/install_docker.py +++ /dev/null @@ -1,16 +0,0 @@ -from os.path import * -from .path import * -import urllib -import subprocess - - -def install_docker(): - ensure_tmp_dir() - get_docker_path = join(tmp_dir, "get-docker.sh") - urllib.request.urlretrieve("https://get.docker.com", get_docker_path) - os.chmod(get_docker_path, 0o755) - subprocess.run(["sudo", "sh", get_docker_path], check=True) - subprocess.run(["sudo", "systemctl", "enable", - "--now", "docker"], check=True) - subprocess.run(["sudo", "usermod", "-aG", "docker", - os.getlogin()], check=True) diff --git a/tools/aio/modules/nginx.py b/tools/aio/modules/nginx.py deleted file mode 100755 index f69c5df..0000000 --- a/tools/aio/modules/nginx.py +++ /dev/null @@ -1,247 +0,0 @@ -#!/usr/bin/env python3 - -import json -import jsonschema -import os -from os.path import * -import shutil -import subprocess -from rich.prompt import Confirm -from cryptography.x509 import * -from cryptography.x509.oid import ExtensionOID -from .template import Template -from .path import * - -with open(join(nginx_template_dir, 'server.json')) as f: - server = json.load(f) - -with open(join(nginx_template_dir, 'server.schema.json')) as f: - schema = json.load(f) - -jsonschema.validate(server, schema) - -non_template_files = ['forbid_unknown_domain.conf', "websocket.conf"] - -ssl_template = Template(join(nginx_template_dir, 'ssl.conf.template')) -root_template = Template(join( - nginx_template_dir, 'root.conf.template')) -static_file_template = Template(join( - nginx_template_dir, 'static-file.conf.template')) -reverse_proxy_template = Template(join( - nginx_template_dir, 'reverse-proxy.conf.template')) -redirect_template = Template(join( - nginx_template_dir, 'redirect.conf.template')) -cert_only_template = Template(join( - nginx_template_dir, 'cert-only.conf.template')) - -nginx_var_set = set.union(root_template.var_set, - static_file_template.var_set, reverse_proxy_template.var_set) - - -def list_subdomain_names() -> list: - return [s["subdomain"] for s in server["sites"]] - - -def list_subdomains(domain: str) -> list: - return [f"{s['subdomain']}.{domain}" for s in server["sites"]] - - -def list_domains(domain: str) -> list: - return [domain, *list_subdomains(domain)] - - -def generate_nginx_config(domain: str, original_config, dest: str) -> None: - if not isdir(dest): - raise ValueError('dest must be a directory') - # copy ssl.conf and https-redirect.conf which need no variable substitution - for filename in non_template_files: - src = join(nginx_template_dir, filename) - dst = join(dest, filename) - shutil.copyfile(src, dst) - config = { - "CRUPEST_DOMAIN": domain, - "CRUPEST_V2RAY_TOKEN": original_config["CRUPEST_V2RAY_TOKEN"], - "CRUPEST_V2RAY_PATH": original_config["CRUPEST_V2RAY_PATH"] - } - # generate ssl.conf - with open(join(dest, 'ssl.conf'), 'w') as f: - f.write(ssl_template.generate(config)) - # generate root.conf - with open(join(dest, f'{domain}.conf'), 'w') as f: - root_config = config.copy() - root_config["CRUPEST_V2RAY_TOKEN"] = config["CRUPEST_V2RAY_TOKEN"] - root_config["CRUPEST_V2RAY_PATH"] = config["CRUPEST_V2RAY_PATH"] - f.write(root_template.generate(config)) - # generate nginx config for each site - sites: list = server["sites"] - for site in sites: - subdomain = site["subdomain"] - local_config = config.copy() - local_config['CRUPEST_NGINX_SUBDOMAIN'] = subdomain - if site["type"] == 'static-file': - template = static_file_template - local_config['CRUPEST_NGINX_ROOT'] = site["root"] - elif site["type"] == 'reverse-proxy': - template = reverse_proxy_template - local_config['CRUPEST_NGINX_UPSTREAM_SERVER'] = site["upstream"] - elif site["type"] == 'redirect': - template = redirect_template - local_config['CRUPEST_NGINX_URL'] = site["url"] - elif site["type"] == 'cert-only': - template = cert_only_template - else: - raise Exception('Invalid site type') - with open(join(dest, f'{subdomain}.{domain}.conf'), 'w') as f: - f.write(template.generate(local_config)) - - -def check_nginx_config_dir(dir_path: str, domain: str) -> list: - if not exists(dir_path): - return [] - good_files = [*non_template_files, "ssl.conf", * - [f"{full_domain}.conf" for full_domain in list_domains(domain)]] - bad_files = [] - for path in os.listdir(dir_path): - file_name = basename(path) - if file_name not in good_files: - bad_files.append(file_name) - return bad_files - - -def restart_nginx(force=False) -> bool: - if not force: - p = subprocess.run(['docker', "container", "ls", - "-f", "name=nginx", "-q"], capture_output=True) - container: str = p.stdout.decode("utf-8") - if len(container.strip()) == 0: - return False - subprocess.run(['docker', 'restart', 'nginx']) - return True - - -def nginx(domain: str, config, /, console) -> None: - bad_files = check_nginx_config_dir(nginx_config_dir, domain) - if len(bad_files) > 0: - console.print( - "WARNING: It seems there are some bad conf files in the nginx config directory:", style="yellow") - for bad_file in bad_files: - console.print(bad_file, style="cyan") - to_delete = Confirm.ask( - "They will affect nginx in a [red]bad[/] way. Do you want to delete them?", default=True, console=console) - if to_delete: - for file in bad_files: - os.remove(join(nginx_config_dir, file)) - console.print( - "I have found following var in nginx templates:", style="green") - for var in nginx_var_set: - console.print(var, style="magenta") - if not exists(nginx_config_dir): - os.mkdir(nginx_config_dir) - console.print( - f"Nginx config directory created at [magenta]{nginx_config_dir}[/]", style="green") - generate_nginx_config(domain, config, dest=nginx_config_dir) - console.print("Nginx config generated.", style="green") - if restart_nginx(): - console.print('Nginx restarted.', style="green") - - -def certbot_command_gen(domain: str, action, /, test=False, no_docker=False, *, standalone=None, email=None, agree_tos=False) -> str: - domains = list_domains(domain) - - add_domain_option = True - if action == 'create': - if standalone == None: - standalone = True - certbot_action = "certonly" - elif action == 'expand': - if standalone == None: - standalone = False - certbot_action = "certonly" - elif action == 'renew': - if standalone == None: - standalone = False - add_domain_option = False - certbot_action = "renew" - else: - raise ValueError('Invalid action') - - if no_docker: - command = "certbot " - else: - expose_segment = ' -p "0.0.0.0:80:80"' - web_root_segment = ' -v "{project_abs_path}/data/certbot/webroot:/var/www/certbot"' - command = f'docker run -it --rm --name certbot -v "{project_abs_path}/data/certbot/certs:/etc/letsencrypt" -v "{project_abs_path}/data/certbot/data:/var/lib/letsencrypt"{ expose_segment if standalone else web_root_segment} certbot/certbot ' - - command += certbot_action - - if standalone: - command += " --standalone" - else: - command += ' --webroot -w /var/www/certbot' - - if add_domain_option: - command += f' -d {" -d ".join(domains)}' - - if email is not None: - command += f' --email {email}' - - if agree_tos: - command += ' --agree-tos' - - if test: - command += " --test-cert --dry-run" - - return command - - -def get_cert_path(root_domain): - return join(data_dir, "certbot", "certs", "live", root_domain, "fullchain.pem") - - -def get_cert_domains(cert_path, root_domain): - - if not exists(cert_path): - return None - - if not isfile(cert_path): - return None - - with open(cert_path, 'rb') as f: - cert = load_pem_x509_certificate(f.read()) - ext = cert.extensions.get_extension_for_oid( - ExtensionOID.SUBJECT_ALTERNATIVE_NAME) - domains: list = ext.value.get_values_for_type(DNSName) - domains.remove(root_domain) - domains = [root_domain, *domains] - return domains - - -def print_create_cert_message(domain, console): - console.print( - "Looks like you haven't run certbot to get the init ssl certificates. You may want to run following code to get one:", style="cyan") - console.print(certbot_command_gen(domain, "create"), - soft_wrap=True, highlight=False) - - -def check_ssl_cert(domain, console): - cert_path = get_cert_path(domain) - tmp_cert_path = join(tmp_dir, "fullchain.pem") - console.print("Temporarily copy cert to tmp...", style="yellow") - ensure_tmp_dir() - subprocess.run( - ["sudo", "cp", cert_path, tmp_cert_path], check=True) - subprocess.run(["sudo", "chown", str(os.geteuid()), - tmp_cert_path], check=True) - cert_domains = get_cert_domains(tmp_cert_path, domain) - if cert_domains is None: - print_create_cert_message(domain, console) - else: - cert_domain_set = set(cert_domains) - domains = set(list_domains(domain)) - if not cert_domain_set == domains: - console.print( - "Cert domains are not equal to host domains. Run following command to recreate it with nginx stopped.", style="red") - console.print(certbot_command_gen( - domain, "create", standalone=True), soft_wrap=True, highlight=False) - console.print("Remove tmp cert...", style="yellow") - os.remove(tmp_cert_path) diff --git a/tools/aio/modules/path.py b/tools/aio/modules/path.py deleted file mode 100644 index ac969d3..0000000 --- a/tools/aio/modules/path.py +++ /dev/null @@ -1,30 +0,0 @@ -import os -import os.path - -script_dir = os.path.relpath(os.path.dirname(__file__)) -project_dir = os.path.normpath(os.path.join(script_dir, "../../../")) -project_abs_path = os.path.abspath(project_dir) -template_dir = os.path.join(project_dir, "template") -nginx_template_dir = os.path.join(template_dir, "nginx") -data_dir = os.path.join(project_dir, "data") -tool_dir = os.path.join(project_dir, "tools") -tmp_dir = os.path.join(project_dir, "tmp") -backup_dir = os.path.join(project_dir, "backup") -config_file_path = os.path.join(data_dir, "config") -nginx_config_dir = os.path.join(project_dir, "nginx-config") -log_dir = os.path.join(project_dir, "log") - - -def ensure_log_dir(): - if not os.path.exists(log_dir): - os.mkdir(log_dir) - - -def ensure_tmp_dir(): - if not os.path.exists(tmp_dir): - os.mkdir(tmp_dir) - - -def ensure_backup_dir(): - if not os.path.exists(backup_dir): - os.mkdir(backup_dir) diff --git a/tools/aio/modules/setup.py b/tools/aio/modules/setup.py deleted file mode 100644 index 4e91302..0000000 --- a/tools/aio/modules/setup.py +++ /dev/null @@ -1,233 +0,0 @@ -from os.path import * -from datetime import datetime -from rich.prompt import Confirm -from .path import * -from .nginx import * -from .config import * -from .helper import * - - -def get_template_name_list(console) -> list[str]: - console.print("First let's check all the templates...") - - # get all filenames ending with .template - template_name_list = [basename(f)[:-len('.template')] for f in os.listdir( - template_dir) if f.endswith(".template")] - console.print( - f"I have found following template files in [magenta]{template_dir}[/]:", style="green") - for filename in template_name_list: - console.print(f"{filename}.template", style="magenta") - - return template_name_list - - -def data_dir_check(domain, console): - if isdir(data_dir): - if not exists(join(data_dir, "certbot")): - print_create_cert_message(domain, console) - else: - to_check = Confirm.ask( - "I want to check your ssl certs, but I need to sudo. Do you want me check", console=console, default=False) - if to_check: - check_ssl_cert(domain, console) - - -def template_generate(console): - template_name_list = get_template_name_list(console) - template_list: list = [] - config_var_name_set_in_template = set() - for template_name in template_name_list: - template = Template(join(template_dir, template_name+".template")) - template_list.append(template) - config_var_name_set_in_template.update(template.var_set) - - console.print( - "I have found following variables needed in templates:", style="green") - for key in config_var_name_set_in_template: - console.print(key, style="magenta") - - # check vars - check_success, more, less = check_config_var_set( - config_var_name_set_in_template) - if len(more) != 0: - console.print("There are more variables in templates than in config file:", - style="red") - for key in more: - console.print(key, style="magenta") - if len(less) != 0: - console.print("Following config vars are not used:", - style="yellow") - for key in less: - console.print(key, style="magenta") - - if not check_success: - console.print( - "Please check you config vars and make sure the needed ones are defined!", style="red") - else: - console.print( - "Now let's check if they are already generated...") - - conflict = False - - # check if there exists any generated files - for filename in template_name_list: - if exists(join(project_dir, filename)): - console.print(f"Found [magenta]{filename}[/]") - conflict = True - - to_gen = True - if conflict: - to_overwrite = Confirm.ask( - "It seems there are some files already generated. Do you want to overwrite them?", console=console, default=False) - if not to_overwrite: - to_gen = False - console.print( - "Great! Check the existing files and see you next time!", style="green") - else: - print("No conflict found. Let's go on!\n") - - if to_gen: - console.print("Check for existing config file...") - - # check if there exists a config file - if not config_file_exists(): - config = {} - console.print( - "No existing config file found. Don't worry. Let's create one!", style="green") - for config_var in config_var_list: - config[config_var.name] = config_var.get_default_value() - config_content = config_to_str(config) - # create data dir if not exist - if not exists(data_dir): - os.mkdir(data_dir) - # write config file - with open(config_file_path, "w") as f: - f.write(config_content) - console.print( - f"Everything else is auto generated. The config file is written into [magenta]{config_file_path}[/]. You had better keep it safe. And here is the content:", style="green") - print_config(console, config) - is_ok = Confirm.ask( - "If you think it's not ok, you can stop here and edit it. Or let's go on?", console=console, default=True) - if not is_ok: - console.print( - "Great! Check the config file and see you next time!", style="green") - to_gen = False - else: - console.print( - "Looks like you have already had a config file. Let's check the content:", style="green") - with open(config_file_path, "r") as f: - content = f.read() - config = parse_config(content) - print_config(console, config) - missed_config_vars = [] - for config_var in config_var_list: - if config_var.name not in config: - missed_config_vars.append(config_var) - - if len(missed_config_vars) > 0: - console.print( - "Oops! It seems you have missed some keys in your config file. Let's add them!", style="green") - for config_var in missed_config_vars: - config[config_var.name] = config_var.get_default_value( - console) - content = config_to_str(config) - with open(config_file_path, "w") as f: - f.write(content) - console.print( - f"Here is the new config, it has been written out to [magenta]{config_file_path}[/]:") - print_config(console, config) - good_enough = Confirm.ask("Is it good enough?", - console=console, default=True) - if not good_enough: - console.print( - "Great! Check the config file and see you next time!", style="green") - to_gen = False - - domain = get_domain() - - if to_gen: - console.print( - "Finally, everything is ready. Let's generate the files:", style="green") - - # generate files - for index, template in enumerate(template_list): - number = index + 1 - total = len(template_list) - print_order(number, total, console) - console.print( - f"Generating [magenta]{template.template_name}[/]...") - content = template.generate(config) - with open(join(project_dir, template.template_name), "w") as f: - f.write(content) - - # generate nginx config - if not exists(nginx_config_dir): - to_gen_nginx_conf = Confirm.ask("It seems you haven't generate nginx config. Do you want to generate it?", - default=True, console=console) - else: - # get the latest time of files in nginx template - template_time = 0 - for path in os.listdir(nginx_template_dir): - template_time = max(template_time, os.stat( - join(nginx_template_dir, path)).st_mtime) - console.print( - f"Nginx template update time: {datetime.fromtimestamp(template_time)}") - - nginx_config_time = 0 - for path in os.listdir(nginx_config_dir): - nginx_config_time = max(nginx_config_time, os.stat( - join(nginx_config_dir, path)).st_mtime) - console.print( - f"Generated nginx template update time: {datetime.fromtimestamp(nginx_config_time)}") - if template_time > nginx_config_time: - to_gen_nginx_conf = Confirm.ask("It seems you have updated the nginx template and not regenerate config. Do you want to regenerate the nginx config?", - default=True, console=console) - else: - to_gen_nginx_conf = Confirm.ask("[yellow]It seems you have already generated nginx config. Do you want to overwrite it?[/]", - default=False, console=console) - if to_gen_nginx_conf: - nginx(domain, config, console) - data_dir_check(domain, console) - - -def clear(console, /, delete_data_dir=False): - template_name_list = get_template_name_list(console) - # check root if we have to delete data dir - if delete_data_dir and exists(data_dir) and os.geteuid() != 0: - console.print( - "You need to be root to delete data dir.", style="red") - exit(1) - - to_delete = Confirm.ask( - "[yellow]Are you sure you want to delete everything? all your data will be lost![/]", default=False, console=console) - if to_delete: - files_to_delete = [] - for template_name in template_name_list: - f = join(project_dir, template_name) - if exists(f): - files_to_delete.append(f) - - delete_data_dir = delete_data_dir and exists( - data_dir) - - if len(files_to_delete) == 0: - console.print( - "Nothing to delete. We are safe!", style="green") - else: - console.print("Here are the files to delete:") - for f in files_to_delete: - console.print(f, style="magenta") - if delete_data_dir: - console.print(data_dir + " (data dir)", - style="magenta") - - to_delete = Confirm.ask( - "[red]Are you sure you want to delete them?[/]", default=False, console=console) - if to_delete: - for f in files_to_delete: - os.remove(f) - if delete_data_dir: - # recursively delete data dir - shutil.rmtree(data_dir) - console.print( - "Your workspace is clean now!", style="green") diff --git a/tools/aio/modules/template.py b/tools/aio/modules/template.py deleted file mode 100644 index 9747af1..0000000 --- a/tools/aio/modules/template.py +++ /dev/null @@ -1,32 +0,0 @@ -import os.path -import re - - -class Template: - def __init__(self, template_path: str, var_prefix: str = "CRUPEST"): - if len(var_prefix) != 0 and re.fullmatch(r"^[a-zA-Z_][a-zA-Z0-9_]*$", var_prefix) is None: - raise ValueError("Invalid var prefix.") - self.template_path = template_path - self.template_name = os.path.basename( - template_path)[:-len(".template")] - with open(template_path, "r") as f: - self.template = f.read() - self.var_prefix = var_prefix - self.__var_regex = re.compile(r"\$(" + var_prefix + r"_[a-zA-Z0-9_]+)") - self.__var_brace_regex = re.compile( - r"\$\{\s*(" + var_prefix + r"_[a-zA-Z0-9_]+)\s*\}") - var_set = set() - for match in self.__var_regex.finditer(self.template): - var_set.add(match.group(1)) - for match in self.__var_brace_regex.finditer(self.template): - var_set.add(match.group(1)) - self.var_set = var_set - - def generate(self, config: dict) -> str: - result = self.template - for var in self.var_set: - if var not in config: - raise ValueError(f"Missing config var {var}.") - result = result.replace("$" + var, config[var]) - result = re.sub(r"\$\{\s*" + var + r"\s*\}", config[var], result) - return result diff --git a/tools/aio/modules/test.py b/tools/aio/modules/test.py deleted file mode 100644 index d6eb778..0000000 --- a/tools/aio/modules/test.py +++ /dev/null @@ -1,31 +0,0 @@ -import json -from http.client import * -from urllib.request import urlopen - - -def test_crupest_api(console): - def do_the_test(): - res: HTTPResponse = urlopen("http://localhost:5188/api/todos") - body = res.read() - - if res.status != 200: - raise Exception("Status code is not 200.") - result = json.loads(body) - if not isinstance(result, list): - raise Exception("Result is not an array.") - if len(result) == 0: - raise Exception("Result is an empty array.") - if not isinstance(result[0], dict): - raise Exception("Result[0] is not an object.") - if not isinstance(result[0].get("title"), str): - raise Exception("Result[0].title is not a string.") - if not isinstance(result[0].get("status"), str): - raise Exception("Result[0].status is not a string.") - - try: - do_the_test() - console.print("Test passed!", style="green") - exit(0) - except Exception as e: - console.print(e) - console.print("Test failed!", style="red") |