aboutsummaryrefslogtreecommitdiff
path: root/tools/cru-py/crupest/nginx.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/cru-py/crupest/nginx.py')
-rw-r--r--tools/cru-py/crupest/nginx.py246
1 files changed, 0 insertions, 246 deletions
diff --git a/tools/cru-py/crupest/nginx.py b/tools/cru-py/crupest/nginx.py
deleted file mode 100644
index 1ec5c6b..0000000
--- a/tools/cru-py/crupest/nginx.py
+++ /dev/null
@@ -1,246 +0,0 @@
-from typing import cast
-import json
-import jsonschema
-import os
-from os.path import *
-import shutil
-import subprocess
-from rich.prompt import Confirm
-from cryptography.x509 import *
-from cryptography.x509.oid import ExtensionOID
-from .template import Template
-from .path import *
-
-with open(join(nginx_template_dir, 'server.json')) as f:
- server = json.load(f)
-
-with open(join(nginx_template_dir, 'server.schema.json')) as f:
- schema = json.load(f)
-
-jsonschema.validate(server, schema)
-
-non_template_files = ['forbid_unknown_domain.conf', "websocket.conf"]
-
-ssl_template = Template(join(nginx_template_dir, 'ssl.conf.template'))
-root_template = Template(join(
- nginx_template_dir, 'root.conf.template'))
-static_file_template = Template(join(
- nginx_template_dir, 'static-file.conf.template'))
-reverse_proxy_template = Template(join(
- nginx_template_dir, 'reverse-proxy.conf.template'))
-redirect_template = Template(join(
- nginx_template_dir, 'redirect.conf.template'))
-cert_only_template = Template(join(
- nginx_template_dir, 'cert-only.conf.template'))
-
-nginx_var_set = set.union(root_template.var_set,
- static_file_template.var_set, reverse_proxy_template.var_set)
-
-
-def list_subdomain_names() -> list:
- return [s["subdomain"] for s in server["sites"]]
-
-
-def list_subdomains(domain: str) -> list:
- return [f"{s['subdomain']}.{domain}" for s in server["sites"]]
-
-
-def list_domains(domain: str) -> list:
- return [domain, *list_subdomains(domain)]
-
-
-def generate_nginx_config(domain: str, original_config, dest: str) -> None:
- if not isdir(dest):
- raise ValueError('dest must be a directory')
- # copy ssl.conf and https-redirect.conf which need no variable substitution
- for filename in non_template_files:
- src = join(nginx_template_dir, filename)
- dst = join(dest, filename)
- shutil.copyfile(src, dst)
- config = {
- "CRUPEST_DOMAIN": domain,
- "CRUPEST_V2RAY_TOKEN": original_config["CRUPEST_V2RAY_TOKEN"],
- "CRUPEST_V2RAY_PATH": original_config["CRUPEST_V2RAY_PATH"]
- }
- # generate ssl.conf
- with open(join(dest, 'ssl.conf'), 'w') as f:
- f.write(ssl_template.generate(config))
- # generate root.conf
- with open(join(dest, f'{domain}.conf'), 'w') as f:
- root_config = config.copy()
- root_config["CRUPEST_V2RAY_TOKEN"] = config["CRUPEST_V2RAY_TOKEN"]
- root_config["CRUPEST_V2RAY_PATH"] = config["CRUPEST_V2RAY_PATH"]
- f.write(root_template.generate(config))
- # generate nginx config for each site
- sites: list = server["sites"]
- for site in sites:
- subdomain = site["subdomain"]
- local_config = config.copy()
- local_config['CRUPEST_NGINX_SUBDOMAIN'] = subdomain
- if site["type"] == 'static-file':
- template = static_file_template
- local_config['CRUPEST_NGINX_ROOT'] = site["root"]
- elif site["type"] == 'reverse-proxy':
- template = reverse_proxy_template
- local_config['CRUPEST_NGINX_UPSTREAM_SERVER'] = site["upstream"]
- elif site["type"] == 'redirect':
- template = redirect_template
- local_config['CRUPEST_NGINX_URL'] = site["url"]
- elif site["type"] == 'cert-only':
- template = cert_only_template
- else:
- raise Exception('Invalid site type')
- with open(join(dest, f'{subdomain}.{domain}.conf'), 'w') as f:
- f.write(template.generate(local_config))
-
-
-def check_nginx_config_dir(dir_path: str, domain: str) -> list:
- if not exists(dir_path):
- return []
- good_files = [*non_template_files, "ssl.conf", *
- [f"{full_domain}.conf" for full_domain in list_domains(domain)]]
- bad_files = []
- for path in os.listdir(dir_path):
- file_name = basename(path)
- if file_name not in good_files:
- bad_files.append(file_name)
- return bad_files
-
-
-def restart_nginx(force=False) -> bool:
- if not force:
- p = subprocess.run(['docker', "container", "ls",
- "-f", "name=nginx", "-q"], capture_output=True)
- container: str = p.stdout.decode("utf-8")
- if len(container.strip()) == 0:
- return False
- subprocess.run(['docker', 'restart', 'nginx'])
- return True
-
-
-def nginx(domain: str, config, /, console) -> None:
- bad_files = check_nginx_config_dir(nginx_config_dir, domain)
- if len(bad_files) > 0:
- console.print(
- "WARNING: It seems there are some bad conf files in the nginx config directory:", style="yellow")
- for bad_file in bad_files:
- console.print(bad_file, style="cyan")
- to_delete = Confirm.ask(
- "They will affect nginx in a [red]bad[/] way. Do you want to delete them?", default=True, console=console)
- if to_delete:
- for file in bad_files:
- os.remove(join(nginx_config_dir, file))
- console.print(
- "I have found following var in nginx templates:", style="green")
- for var in nginx_var_set:
- console.print(var, style="magenta")
- if not exists(nginx_config_dir):
- os.mkdir(nginx_config_dir)
- console.print(
- f"Nginx config directory created at [magenta]{nginx_config_dir}[/]", style="green")
- generate_nginx_config(domain, config, dest=nginx_config_dir)
- console.print("Nginx config generated.", style="green")
- if restart_nginx():
- console.print('Nginx restarted.', style="green")
-
-
-def certbot_command_gen(domain: str, action, /, test=False, no_docker=False, *, standalone=None, email=None, agree_tos=False) -> str:
- domains = list_domains(domain)
-
- add_domain_option = True
- if action == 'create':
- if standalone == None:
- standalone = True
- certbot_action = "certonly"
- elif action == 'expand':
- if standalone == None:
- standalone = False
- certbot_action = "certonly"
- elif action == 'renew':
- if standalone == None:
- standalone = False
- add_domain_option = False
- certbot_action = "renew"
- else:
- raise ValueError('Invalid action')
-
- if no_docker:
- command = "certbot "
- else:
- expose_segment = ' -p "0.0.0.0:80:80"'
- web_root_segment = ' -v "{project_abs_path}/data/certbot/webroot:/var/www/certbot"'
- command = f'docker run -it --rm --name certbot -v "{project_abs_path}/data/certbot/certs:/etc/letsencrypt" -v "{project_abs_path}/data/certbot/data:/var/lib/letsencrypt"{ expose_segment if standalone else web_root_segment} certbot/certbot '
-
- command += certbot_action
-
- if standalone:
- command += " --standalone"
- else:
- command += ' --webroot -w /var/www/certbot'
-
- if add_domain_option:
- command += f' -d {" -d ".join(domains)}'
-
- if email is not None:
- command += f' --email {email}'
-
- if agree_tos:
- command += ' --agree-tos'
-
- if test:
- command += " --test-cert --dry-run"
-
- return command
-
-
-def get_cert_path(root_domain):
- return join(data_dir, "certbot", "certs", "live", root_domain, "fullchain.pem")
-
-
-def get_cert_domains(cert_path, root_domain):
-
- if not exists(cert_path):
- return None
-
- if not isfile(cert_path):
- return None
-
- with open(cert_path, 'rb') as f:
- cert = load_pem_x509_certificate(f.read())
- ext = cert.extensions.get_extension_for_oid(
- ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
- domains: list[str] = cast(
- SubjectAlternativeName, ext.value).get_values_for_type(DNSName)
- domains.remove(root_domain)
- domains = [root_domain, *domains]
- return domains
-
-
-def print_create_cert_message(domain, console):
- console.print(
- "Looks like you haven't run certbot to get the init ssl certificates. You may want to run following code to get one:", style="cyan")
- console.print(certbot_command_gen(domain, "create"),
- soft_wrap=True, highlight=False)
-
-
-def check_ssl_cert(domain, console):
- cert_path = get_cert_path(domain)
- tmp_cert_path = join(tmp_dir, "fullchain.pem")
- console.print("Temporarily copy cert to tmp...", style="yellow")
- subprocess.run(
- ["sudo", "cp", cert_path, tmp_cert_path], check=True)
- subprocess.run(["sudo", "chown", str(os.geteuid()),
- tmp_cert_path], check=True)
- cert_domains = get_cert_domains(tmp_cert_path, domain)
- if cert_domains is None:
- print_create_cert_message(domain, console)
- else:
- cert_domain_set = set(cert_domains)
- domains = set(list_domains(domain))
- if not cert_domain_set == domains:
- console.print(
- "Cert domains are not equal to host domains. Run following command to recreate it with nginx stopped.", style="red")
- console.print(certbot_command_gen(
- domain, "create", standalone=True), soft_wrap=True, highlight=False)
- console.print("Remove tmp cert...", style="yellow")
- os.remove(tmp_cert_path)