diff options
Diffstat (limited to 'tools/cru-py/cru/service/_nginx.py')
-rw-r--r-- | tools/cru-py/cru/service/_nginx.py | 268 |
1 files changed, 0 insertions, 268 deletions
diff --git a/tools/cru-py/cru/service/_nginx.py b/tools/cru-py/cru/service/_nginx.py deleted file mode 100644 index 6c77971..0000000 --- a/tools/cru-py/cru/service/_nginx.py +++ /dev/null @@ -1,268 +0,0 @@ -from argparse import Namespace -from enum import Enum, auto -import re -import subprocess -from typing import TypeAlias - -from cru import CruInternalError - -from ._base import AppCommandFeatureProvider -from ._config import ConfigManager -from ._template import TemplateManager - - -class CertbotAction(Enum): - CREATE = auto() - EXPAND = auto() - SHRINK = auto() - RENEW = auto() - - -class NginxManager(AppCommandFeatureProvider): - CertbotAction: TypeAlias = CertbotAction - - def __init__(self) -> None: - super().__init__("nginx-manager") - self._domains_cache: list[str] | None = None - - def setup(self) -> None: - pass - - @property - def _config_manager(self) -> ConfigManager: - return self.app.get_feature(ConfigManager) - - @property - def root_domain(self) -> str: - return self._config_manager.get_domain_value_str() - - @property - def domains(self) -> list[str]: - if self._domains_cache is None: - self._domains_cache = self._get_domains() - return self._domains_cache - - @property - def subdomains(self) -> list[str]: - suffix = "." + self.root_domain - return [d[: -len(suffix)] for d in self.domains if d.endswith(suffix)] - - @property - def _domain_config_name(self) -> str: - return self._config_manager.domain_item_name - - def _get_domains_from_text(self, text: str) -> set[str]: - domains: set[str] = set() - regex = re.compile(r"server_name\s+(\S+)\s*;") - for match in regex.finditer(text): - domains.add(match[1]) - return domains - - def _join_generated_nginx_conf_text(self) -> str: - text = "" - template_manager = self.app.get_feature(TemplateManager) - for nginx_conf in template_manager.generate(): - text += nginx_conf[1] - return text - - def _get_domains(self) -> list[str]: - text = self._join_generated_nginx_conf_text() - domains = list(self._get_domains_from_text(text)) - domains.remove(self.root_domain) - return [self.root_domain, *domains] - - def _print_domains(self) -> None: - for domain in self.domains: - print(domain) - - def _certbot_command( - self, - action: CertbotAction | str, - test: bool, - *, - docker=True, - standalone=None, - email=None, - agree_tos=True, - ) -> str: - if isinstance(action, str): - action = CertbotAction[action.upper()] - - command_args = [] - - add_domain_option = True - if action is CertbotAction.CREATE: - if standalone is None: - standalone = True - command_action = "certonly" - elif action in [CertbotAction.EXPAND, CertbotAction.SHRINK]: - if standalone is None: - standalone = False - command_action = "certonly" - elif action is CertbotAction.RENEW: - if standalone is None: - standalone = False - add_domain_option = False - command_action = "renew" - else: - raise CruInternalError("Invalid certbot action.") - - data_dir = self.app.data_dir.full_path.as_posix() - - if not docker: - command_args.append("certbot") - else: - command_args.extend( - [ - "docker run -it --rm --name certbot", - f'-v "{data_dir}/certbot/certs:/etc/letsencrypt"', - f'-v "{data_dir}/certbot/data:/var/lib/letsencrypt"', - ] - ) - if standalone: - command_args.append('-p "0.0.0.0:80:80"') - else: - command_args.append(f'-v "{data_dir}/certbot/webroot:/var/www/certbot"') - - command_args.append("certbot/certbot") - - command_args.append(command_action) - - command_args.append(f"--cert-name {self.root_domain}") - - if standalone: - command_args.append("--standalone") - else: - command_args.append("--webroot -w /var/www/certbot") - - if add_domain_option: - command_args.append(" ".join([f"-d {domain}" for domain in self.domains])) - - if email is not None: - command_args.append(f"--email {email}") - - if agree_tos: - command_args.append("--agree-tos") - - if test: - command_args.append("--test-cert --dry-run") - - return " ".join(command_args) - - def print_all_certbot_commands(self, test: bool): - print("### COMMAND: (standalone) create certs") - print( - self._certbot_command( - CertbotAction.CREATE, - test, - email=self._config_manager.get_email_value_str_optional(), - ) - ) - print() - print("### COMMAND: (webroot+nginx) expand or shrink certs") - print( - self._certbot_command( - CertbotAction.EXPAND, - test, - email=self._config_manager.get_email_value_str_optional(), - ) - ) - print() - print("### COMMAND: (webroot+nginx) renew certs") - print( - self._certbot_command( - CertbotAction.RENEW, - test, - email=self._config_manager.get_email_value_str_optional(), - ) - ) - - @property - def _cert_path_str(self) -> str: - return str( - self.app.data_dir.full_path - / "certbot/certs/live" - / self.root_domain - / "fullchain.pem" - ) - - def get_command_info(self): - return "nginx", "Manage nginx related things." - - def setup_arg_parser(self, arg_parser): - subparsers = arg_parser.add_subparsers( - dest="nginx_command", required=True, metavar="NGINX_COMMAND" - ) - _list_parser = subparsers.add_parser("list", help="list domains") - certbot_parser = subparsers.add_parser("certbot", help="print certbot commands") - certbot_parser.add_argument( - "--no-test", - action="store_true", - help="remove args making certbot run in test mode", - ) - - def run_command(self, args: Namespace) -> None: - if args.nginx_command == "list": - self._print_domains() - elif args.nginx_command == "certbot": - self.print_all_certbot_commands(not args.no_test) - - def _generate_dns_zone( - self, - ip: str, - /, - ttl: str | int = 600, - *, - enable_mail: bool = True, - dkim: str | None = None, - ) -> str: - # TODO: Not complete and test now. - root_domain = self.root_domain - result = f"$ORIGIN {root_domain}.\n\n" - result += "; A records\n" - result += f"@ {ttl} IN A {ip}\n" - for subdomain in self.subdomains: - result += f"{subdomain} {ttl} IN A {ip}\n" - - if enable_mail: - result += "\n; MX records\n" - result += f"@ {ttl} IN MX 10 mail.{root_domain}.\n" - result += "\n; SPF record\n" - result += f'@ {ttl} IN TXT "v=spf1 mx ~all"\n' - if dkim is not None: - result += "\n; DKIM record\n" - result += f'mail._domainkey {ttl} IN TEXT "{dkim}"' - result += "\n; DMARC record\n" - dmarc_options = [ - "v=DMARC1", - "p=none", - f"rua=mailto:dmarc.report@{root_domain}", - f"ruf=mailto:dmarc.report@{root_domain}", - "sp=none", - "ri=86400", - ] - result += f'_dmarc {ttl} IN TXT "{"; ".join(dmarc_options)}"\n' - return result - - def _get_dkim_from_mailserver(self) -> str | None: - # TODO: Not complete and test now. - dkim_path = ( - self.app.data_dir.full_path - / "dms/config/opendkim/keys" - / self.root_domain - / "mail.txt" - ) - if not dkim_path.exists(): - return None - - p = subprocess.run(["sudo", "cat", dkim_path], capture_output=True, check=True) - value = "" - for match in re.finditer('"(.*)"', p.stdout.decode("utf-8")): - value += match.group(1) - return value - - def _generate_dns_zone_with_dkim(self, ip: str, /, ttl: str | int = 600) -> str: - # TODO: Not complete and test now. - return self._generate_dns_zone( - ip, ttl, enable_mail=True, dkim=self._get_dkim_from_mailserver() - ) |