aboutsummaryrefslogtreecommitdiff
path: root/services/manager/service/_nginx.py
diff options
context:
space:
mode:
Diffstat (limited to 'services/manager/service/_nginx.py')
-rw-r--r--services/manager/service/_nginx.py263
1 files changed, 263 insertions, 0 deletions
diff --git a/services/manager/service/_nginx.py b/services/manager/service/_nginx.py
new file mode 100644
index 0000000..5dfc3ab
--- /dev/null
+++ b/services/manager/service/_nginx.py
@@ -0,0 +1,263 @@
+from argparse import Namespace
+from enum import Enum, auto
+import re
+import subprocess
+from typing import TypeAlias
+
+from manager import CruInternalError
+
+from ._base import AppCommandFeatureProvider
+from ._template import TemplateManager
+
+
+class CertbotAction(Enum):
+ CREATE = auto()
+ EXPAND = auto()
+ SHRINK = auto()
+ RENEW = auto()
+
+
+class NginxManager(AppCommandFeatureProvider):
+ CertbotAction: TypeAlias = CertbotAction
+
+ def __init__(self) -> None:
+ super().__init__("nginx-manager")
+ self._domains_cache: list[str] | None = None
+
+ def setup(self) -> None:
+ pass
+
+ @property
+ def _template_manager(self) -> TemplateManager:
+ return self.app.get_feature(TemplateManager)
+
+ @property
+ def root_domain(self) -> str:
+ return self._template_manager.get_domain()
+
+ @property
+ def domains(self) -> list[str]:
+ if self._domains_cache is None:
+ self._domains_cache = self._get_domains()
+ return self._domains_cache
+
+ @property
+ def subdomains(self) -> list[str]:
+ suffix = "." + self.root_domain
+ return [d[: -len(suffix)] for d in self.domains if d.endswith(suffix)]
+
+ def _get_domains_from_text(self, text: str) -> set[str]:
+ domains: set[str] = set()
+ regex = re.compile(r"server_name\s+(\S+)\s*;")
+ for match in regex.finditer(text):
+ domains.add(match[1])
+ return domains
+
+ def _join_generated_nginx_conf_text(self) -> str:
+ result = ""
+ for path, text in self._template_manager.generate():
+ if path.parents[-1] == "nginx":
+ result += text
+ return result
+
+ def _get_domains(self) -> list[str]:
+ text = self._join_generated_nginx_conf_text()
+ domains = self._get_domains_from_text(text)
+ domains.remove(self.root_domain)
+ return [self.root_domain, *domains]
+
+ def _print_domains(self) -> None:
+ for domain in self.domains:
+ print(domain)
+
+ def _certbot_command(
+ self,
+ action: CertbotAction | str,
+ test: bool,
+ *,
+ docker=True,
+ standalone=None,
+ email=None,
+ agree_tos=True,
+ ) -> str:
+ if isinstance(action, str):
+ action = CertbotAction[action.upper()]
+
+ command_args = []
+
+ add_domain_option = True
+ if action is CertbotAction.CREATE:
+ if standalone is None:
+ standalone = True
+ command_action = "certonly"
+ elif action in [CertbotAction.EXPAND, CertbotAction.SHRINK]:
+ if standalone is None:
+ standalone = False
+ command_action = "certonly"
+ elif action is CertbotAction.RENEW:
+ if standalone is None:
+ standalone = False
+ add_domain_option = False
+ command_action = "renew"
+ else:
+ raise CruInternalError("Invalid certbot action.")
+
+ data_dir = self.app.data_dir.full_path.as_posix()
+
+ if not docker:
+ command_args.append("certbot")
+ else:
+ command_args.extend(
+ [
+ "docker run -it --rm --name certbot",
+ f'-v "{data_dir}/certbot/certs:/etc/letsencrypt"',
+ f'-v "{data_dir}/certbot/data:/var/lib/letsencrypt"',
+ ]
+ )
+ if standalone:
+ command_args.append('-p "0.0.0.0:80:80"')
+ else:
+ command_args.append(f'-v "{data_dir}/certbot/webroot:/var/www/certbot"')
+
+ command_args.append("certbot/certbot")
+
+ command_args.append(command_action)
+
+ command_args.append(f"--cert-name {self.root_domain}")
+
+ if standalone:
+ command_args.append("--standalone")
+ else:
+ command_args.append("--webroot -w /var/www/certbot")
+
+ if add_domain_option:
+ command_args.append(" ".join([f"-d {domain}" for domain in self.domains]))
+
+ if email is not None:
+ command_args.append(f"--email {email}")
+
+ if agree_tos:
+ command_args.append("--agree-tos")
+
+ if test:
+ command_args.append("--test-cert --dry-run")
+
+ return " ".join(command_args)
+
+ def print_all_certbot_commands(self, test: bool):
+ print("### COMMAND: (standalone) create certs")
+ print(
+ self._certbot_command(
+ CertbotAction.CREATE,
+ test,
+ email=self._template_manager.get_email(),
+ )
+ )
+ print()
+ print("### COMMAND: (webroot+nginx) expand or shrink certs")
+ print(
+ self._certbot_command(
+ CertbotAction.EXPAND,
+ test,
+ email=self._template_manager.get_email(),
+ )
+ )
+ print()
+ print("### COMMAND: (webroot+nginx) renew certs")
+ print(
+ self._certbot_command(
+ CertbotAction.RENEW,
+ test,
+ email=self._template_manager.get_email(),
+ )
+ )
+
+ @property
+ def _cert_path_str(self) -> str:
+ return str(
+ self.app.data_dir.full_path
+ / "certbot/certs/live"
+ / self.root_domain
+ / "fullchain.pem"
+ )
+
+ def get_command_info(self):
+ return "nginx", "Manage nginx related things."
+
+ def setup_arg_parser(self, arg_parser):
+ subparsers = arg_parser.add_subparsers(
+ dest="nginx_command", required=True, metavar="NGINX_COMMAND"
+ )
+ _list_parser = subparsers.add_parser("list", help="list domains")
+ certbot_parser = subparsers.add_parser("certbot", help="print certbot commands")
+ certbot_parser.add_argument(
+ "--no-test",
+ action="store_true",
+ help="remove args making certbot run in test mode",
+ )
+
+ def run_command(self, args: Namespace) -> None:
+ if args.nginx_command == "list":
+ self._print_domains()
+ elif args.nginx_command == "certbot":
+ self.print_all_certbot_commands(not args.no_test)
+
+ def _generate_dns_zone(
+ self,
+ ip: str,
+ /,
+ ttl: str | int = 600,
+ *,
+ enable_mail: bool = True,
+ dkim: str | None = None,
+ ) -> str:
+ # TODO: Not complete and test now.
+ root_domain = self.root_domain
+ result = f"$ORIGIN {root_domain}.\n\n"
+ result += "; A records\n"
+ result += f"@ {ttl} IN A {ip}\n"
+ for subdomain in self.subdomains:
+ result += f"{subdomain} {ttl} IN A {ip}\n"
+
+ if enable_mail:
+ result += "\n; MX records\n"
+ result += f"@ {ttl} IN MX 10 mail.{root_domain}.\n"
+ result += "\n; SPF record\n"
+ result += f'@ {ttl} IN TXT "v=spf1 mx ~all"\n'
+ if dkim is not None:
+ result += "\n; DKIM record\n"
+ result += f'mail._domainkey {ttl} IN TEXT "{dkim}"'
+ result += "\n; DMARC record\n"
+ dmarc_options = [
+ "v=DMARC1",
+ "p=none",
+ f"rua=mailto:dmarc.report@{root_domain}",
+ f"ruf=mailto:dmarc.report@{root_domain}",
+ "sp=none",
+ "ri=86400",
+ ]
+ result += f'_dmarc {ttl} IN TXT "{"; ".join(dmarc_options)}"\n'
+ return result
+
+ def _get_dkim_from_mailserver(self) -> str | None:
+ # TODO: Not complete and test now.
+ dkim_path = (
+ self.app.data_dir.full_path
+ / "dms/config/opendkim/keys"
+ / self.root_domain
+ / "mail.txt"
+ )
+ if not dkim_path.exists():
+ return None
+
+ p = subprocess.run(["sudo", "cat", dkim_path], capture_output=True, check=True)
+ value = ""
+ for match in re.finditer('"(.*)"', p.stdout.decode("utf-8")):
+ value += match.group(1)
+ return value
+
+ def _generate_dns_zone_with_dkim(self, ip: str, /, ttl: str | int = 600) -> str:
+ # TODO: Not complete and test now.
+ return self._generate_dns_zone(
+ ip, ttl, enable_mail=True, dkim=self._get_dkim_from_mailserver()
+ )