aboutsummaryrefslogtreecommitdiff
path: root/tools/cru-py/crupest/nginx.py
blob: 1ec5c6bad35b05dff0a87af8182a5192a8d1fc8d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
from typing import cast
import json
import jsonschema
import os
from os.path import *
import shutil
import subprocess
from rich.prompt import Confirm
from cryptography.x509 import *
from cryptography.x509.oid import ExtensionOID
from .template import Template
from .path import *

with open(join(nginx_template_dir, 'server.json')) as f:
    server = json.load(f)

with open(join(nginx_template_dir, 'server.schema.json')) as f:
    schema = json.load(f)

jsonschema.validate(server, schema)

non_template_files = ['forbid_unknown_domain.conf', "websocket.conf"]

ssl_template = Template(join(nginx_template_dir, 'ssl.conf.template'))
root_template = Template(join(
    nginx_template_dir, 'root.conf.template'))
static_file_template = Template(join(
    nginx_template_dir, 'static-file.conf.template'))
reverse_proxy_template = Template(join(
    nginx_template_dir, 'reverse-proxy.conf.template'))
redirect_template = Template(join(
    nginx_template_dir, 'redirect.conf.template'))
cert_only_template = Template(join(
    nginx_template_dir, 'cert-only.conf.template'))

nginx_var_set = set.union(root_template.var_set,
                          static_file_template.var_set, reverse_proxy_template.var_set)


def list_subdomain_names() -> list:
    return [s["subdomain"] for s in server["sites"]]


def list_subdomains(domain: str) -> list:
    return [f"{s['subdomain']}.{domain}" for s in server["sites"]]


def list_domains(domain: str) -> list:
    return [domain, *list_subdomains(domain)]


def generate_nginx_config(domain: str, original_config, dest: str) -> None:
    if not isdir(dest):
        raise ValueError('dest must be a directory')
    # copy ssl.conf and https-redirect.conf which need no variable substitution
    for filename in non_template_files:
        src = join(nginx_template_dir, filename)
        dst = join(dest, filename)
        shutil.copyfile(src, dst)
    config = {
        "CRUPEST_DOMAIN": domain,
        "CRUPEST_V2RAY_TOKEN": original_config["CRUPEST_V2RAY_TOKEN"],
        "CRUPEST_V2RAY_PATH": original_config["CRUPEST_V2RAY_PATH"]
    }
    # generate ssl.conf
    with open(join(dest, 'ssl.conf'), 'w') as f:
        f.write(ssl_template.generate(config))
    # generate root.conf
    with open(join(dest, f'{domain}.conf'), 'w') as f:
        root_config = config.copy()
        root_config["CRUPEST_V2RAY_TOKEN"] = config["CRUPEST_V2RAY_TOKEN"]
        root_config["CRUPEST_V2RAY_PATH"] = config["CRUPEST_V2RAY_PATH"]
        f.write(root_template.generate(config))
    # generate nginx config for each site
    sites: list = server["sites"]
    for site in sites:
        subdomain = site["subdomain"]
        local_config = config.copy()
        local_config['CRUPEST_NGINX_SUBDOMAIN'] = subdomain
        if site["type"] == 'static-file':
            template = static_file_template
            local_config['CRUPEST_NGINX_ROOT'] = site["root"]
        elif site["type"] == 'reverse-proxy':
            template = reverse_proxy_template
            local_config['CRUPEST_NGINX_UPSTREAM_SERVER'] = site["upstream"]
        elif site["type"] == 'redirect':
            template = redirect_template
            local_config['CRUPEST_NGINX_URL'] = site["url"]
        elif site["type"] == 'cert-only':
            template = cert_only_template
        else:
            raise Exception('Invalid site type')
        with open(join(dest, f'{subdomain}.{domain}.conf'), 'w') as f:
            f.write(template.generate(local_config))


def check_nginx_config_dir(dir_path: str, domain: str) -> list:
    if not exists(dir_path):
        return []
    good_files = [*non_template_files, "ssl.conf", *
                  [f"{full_domain}.conf" for full_domain in list_domains(domain)]]
    bad_files = []
    for path in os.listdir(dir_path):
        file_name = basename(path)
        if file_name not in good_files:
            bad_files.append(file_name)
    return bad_files


def restart_nginx(force=False) -> bool:
    if not force:
        p = subprocess.run(['docker', "container", "ls",
                           "-f", "name=nginx", "-q"], capture_output=True)
        container: str = p.stdout.decode("utf-8")
        if len(container.strip()) == 0:
            return False
    subprocess.run(['docker', 'restart', 'nginx'])
    return True


def nginx(domain: str, config, /, console) -> None:
    bad_files = check_nginx_config_dir(nginx_config_dir, domain)
    if len(bad_files) > 0:
        console.print(
            "WARNING: It seems there are some bad conf files in the nginx config directory:", style="yellow")
        for bad_file in bad_files:
            console.print(bad_file, style="cyan")
        to_delete = Confirm.ask(
            "They will affect nginx in a [red]bad[/] way. Do you want to delete them?", default=True, console=console)
        if to_delete:
            for file in bad_files:
                os.remove(join(nginx_config_dir, file))
    console.print(
        "I have found following var in nginx templates:", style="green")
    for var in nginx_var_set:
        console.print(var, style="magenta")
    if not exists(nginx_config_dir):
        os.mkdir(nginx_config_dir)
        console.print(
            f"Nginx config directory created at [magenta]{nginx_config_dir}[/]", style="green")
    generate_nginx_config(domain, config, dest=nginx_config_dir)
    console.print("Nginx config generated.", style="green")
    if restart_nginx():
        console.print('Nginx restarted.', style="green")


def certbot_command_gen(domain: str, action, /, test=False, no_docker=False, *, standalone=None, email=None, agree_tos=False) -> str:
    domains = list_domains(domain)

    add_domain_option = True
    if action == 'create':
        if standalone == None:
            standalone = True
        certbot_action = "certonly"
    elif action == 'expand':
        if standalone == None:
            standalone = False
        certbot_action = "certonly"
    elif action == 'renew':
        if standalone == None:
            standalone = False
        add_domain_option = False
        certbot_action = "renew"
    else:
        raise ValueError('Invalid action')

    if no_docker:
        command = "certbot "
    else:
        expose_segment = ' -p "0.0.0.0:80:80"'
        web_root_segment = ' -v "{project_abs_path}/data/certbot/webroot:/var/www/certbot"'
        command = f'docker run -it --rm --name certbot -v "{project_abs_path}/data/certbot/certs:/etc/letsencrypt" -v "{project_abs_path}/data/certbot/data:/var/lib/letsencrypt"{ expose_segment if  standalone else web_root_segment} certbot/certbot '

    command += certbot_action

    if standalone:
        command += " --standalone"
    else:
        command += ' --webroot -w /var/www/certbot'

    if add_domain_option:
        command += f' -d {" -d ".join(domains)}'

    if email is not None:
        command += f' --email {email}'

    if agree_tos:
        command += ' --agree-tos'

    if test:
        command += " --test-cert --dry-run"

    return command


def get_cert_path(root_domain):
    return join(data_dir, "certbot", "certs", "live", root_domain, "fullchain.pem")


def get_cert_domains(cert_path, root_domain):

    if not exists(cert_path):
        return None

    if not isfile(cert_path):
        return None

    with open(cert_path, 'rb') as f:
        cert = load_pem_x509_certificate(f.read())
        ext = cert.extensions.get_extension_for_oid(
            ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
        domains: list[str] = cast(
                SubjectAlternativeName, ext.value).get_values_for_type(DNSName)
        domains.remove(root_domain)
        domains = [root_domain, *domains]
        return domains


def print_create_cert_message(domain, console):
    console.print(
        "Looks like you haven't run certbot to get the init ssl certificates. You may want to run following code to get one:", style="cyan")
    console.print(certbot_command_gen(domain, "create"),
                  soft_wrap=True, highlight=False)


def check_ssl_cert(domain, console):
    cert_path = get_cert_path(domain)
    tmp_cert_path = join(tmp_dir, "fullchain.pem")
    console.print("Temporarily copy cert to tmp...", style="yellow")
    subprocess.run(
        ["sudo", "cp", cert_path, tmp_cert_path], check=True)
    subprocess.run(["sudo", "chown", str(os.geteuid()),
                   tmp_cert_path], check=True)
    cert_domains = get_cert_domains(tmp_cert_path, domain)
    if cert_domains is None:
        print_create_cert_message(domain, console)
    else:
        cert_domain_set = set(cert_domains)
        domains = set(list_domains(domain))
        if not cert_domain_set == domains:
            console.print(
                "Cert domains are not equal to host domains. Run following command to recreate it with nginx stopped.", style="red")
            console.print(certbot_command_gen(
                domain, "create", standalone=True), soft_wrap=True, highlight=False)
        console.print("Remove tmp cert...", style="yellow")
        os.remove(tmp_cert_path)