aboutsummaryrefslogtreecommitdiff
path: root/tools/cru-py/cru/service/_nginx.py
diff options
context:
space:
mode:
authorcrupest <crupest@outlook.com>2024-11-11 01:12:29 +0800
committerYuqian Yang <crupest@crupest.life>2025-01-20 22:34:18 +0800
commit5c76a1257b4a058bf919af3e31cc9461a39c2f33 (patch)
treecb32f0c22e5438a0ed9de4b29f58d0b7f142a58d /tools/cru-py/cru/service/_nginx.py
parent12e1272508ba0b5909069319007d677c1c76e355 (diff)
downloadcrupest-5c76a1257b4a058bf919af3e31cc9461a39c2f33.tar.gz
crupest-5c76a1257b4a058bf919af3e31cc9461a39c2f33.tar.bz2
crupest-5c76a1257b4a058bf919af3e31cc9461a39c2f33.zip
HALF WORK: 2024.1.20 - 2
Diffstat (limited to 'tools/cru-py/cru/service/_nginx.py')
-rw-r--r--tools/cru-py/cru/service/_nginx.py227
1 files changed, 208 insertions, 19 deletions
diff --git a/tools/cru-py/cru/service/_nginx.py b/tools/cru-py/cru/service/_nginx.py
index ad29d21..a9013e2 100644
--- a/tools/cru-py/cru/service/_nginx.py
+++ b/tools/cru-py/cru/service/_nginx.py
@@ -1,36 +1,55 @@
from argparse import Namespace
+from enum import Enum, auto
import re
+import subprocess
+from typing import TypeAlias
+
+from cru import CruInternalError
from ._base import AppCommandFeatureProvider
from ._config import ConfigManager
from ._template import TemplateManager
+class CertbotAction(Enum):
+ CREATE = auto()
+ EXPAND = auto()
+ SHRINK = auto()
+ RENEW = auto()
+
+
class NginxManager(AppCommandFeatureProvider):
+ CertbotAction: TypeAlias = CertbotAction
+
def __init__(self) -> None:
super().__init__("nginx-manager")
self._domains_cache: list[str] | None = None
- self._domain_config_value_cache: str | None = None
def setup(self) -> None:
pass
@property
+ def _config_manager(self) -> ConfigManager:
+ return self.app.get_feature(ConfigManager)
+
+ @property
+ def root_domain(self) -> str:
+ return self._config_manager.get_domain_value_str()
+
+ @property
def domains(self) -> list[str]:
if self._domains_cache is None:
self._domains_cache = self._get_domains()
return self._domains_cache
@property
- def _domain_config_name(self) -> str:
- return self.app.get_feature(ConfigManager).get_domain_item_name()
+ def subdomains(self) -> list[str]:
+ suffix = "." + self.root_domain
+ return [d[: -len(suffix)] for d in self.domains if d.endswith(suffix)]
- def _get_domain_config_value(self) -> str:
- if self._domain_config_value_cache is None:
- self._domain_config_value_cache = self.app.get_feature(
- ConfigManager
- ).get_item_value_str(self._domain_config_name)
- return self._domain_config_value_cache
+ @property
+ def _domain_config_name(self) -> str:
+ return self._config_manager.domain_item_name
def _get_domains_from_text(self, text: str) -> set[str]:
domains: set[str] = set()
@@ -42,17 +61,11 @@ class NginxManager(AppCommandFeatureProvider):
for match in regex.finditer(text):
domain_part = match.group(1)
if domain_variable_str in domain_part:
- domains.add(
- domain_part.replace(
- domain_variable_str, self._get_domain_config_value()
- )
- )
+ domains.add(domain_part.replace(domain_variable_str, self.root_domain))
continue
m = brace_domain_variable_regex.search(domain_part)
if m:
- domains.add(
- domain_part.replace(m.group(0), self._get_domain_config_value())
- )
+ domains.add(domain_part.replace(m.group(0), self.root_domain))
continue
domains.add(domain_part)
return domains
@@ -68,13 +81,123 @@ class NginxManager(AppCommandFeatureProvider):
def _get_domains(self) -> list[str]:
text = self._get_nginx_conf_template_text()
domains = list(self._get_domains_from_text(text))
- domains.remove(self._get_domain_config_value())
- return [self._get_domain_config_value(), *domains]
+ domains.remove(self.root_domain)
+ return [self.root_domain, *domains]
def _print_domains(self) -> None:
for domain in self.domains:
print(domain)
+ def _certbot_command(
+ self,
+ action: CertbotAction | str,
+ /,
+ test=False,
+ no_docker=False,
+ *,
+ standalone=None,
+ email=None,
+ agree_tos=True,
+ ) -> str:
+ if isinstance(action, str):
+ action = CertbotAction[action.upper()]
+
+ command_args = []
+
+ add_domain_option = True
+ if action is CertbotAction.CREATE:
+ if standalone is None:
+ standalone = True
+ command_action = "certonly"
+ elif action in [CertbotAction.EXPAND, CertbotAction.SHRINK]:
+ if standalone is None:
+ standalone = False
+ command_action = "certonly"
+ elif action is CertbotAction.RENEW:
+ if standalone is None:
+ standalone = False
+ add_domain_option = False
+ command_action = "renew"
+ else:
+ raise CruInternalError("Invalid certbot action.")
+
+ data_dir = self.app.data_dir.full_path.as_posix()
+
+ if no_docker:
+ command_args.append("certbot")
+ else:
+ command_args.extend(
+ [
+ "docker run -it --rm --name certbot",
+ f'-v "{data_dir}/certbot/certs:/etc/letsencrypt"',
+ f'-v "{data_dir}/certbot/data:/var/lib/letsencrypt"',
+ ]
+ )
+ if standalone:
+ command_args.append('-p "0.0.0.0:80:80"')
+ else:
+ command_args.append(f'-v "{data_dir}/certbot/webroot:/var/www/certbot"')
+
+ command_args.append("certbot/certbot")
+
+ command_args.append(command_action)
+
+ if standalone:
+ command_args.append("--standalone")
+ else:
+ command_args.append("--webroot -w /var/www/certbot")
+
+ if add_domain_option:
+ command_args.append(" ".join([f"-d {domain}" for domain in self.domains]))
+
+ if email is not None:
+ command_args.append(f"--email {email}")
+
+ if agree_tos:
+ command_args.append("--agree-tos")
+
+ if test:
+ command_args.append("--test-cert --dry-run")
+
+ return " ".join(command_args)
+
+ def print_all_certbot_commands(self, test: bool):
+ print("### COMMAND: (standalone) create certs")
+ print(
+ self._certbot_command(
+ CertbotAction.CREATE,
+ test,
+ email=self._config_manager.get_email_value_str_optional(),
+ )
+ )
+ print()
+ print("### COMMAND: (webroot+nginx) expand or shrink certs")
+ print(
+ self._certbot_command(
+ CertbotAction.EXPAND,
+ test,
+ email=self._config_manager.get_email_value_str_optional(),
+ )
+ )
+ print()
+ print("### COMMAND: (webroot+nginx) renew certs")
+ print(
+ self._certbot_command(
+ CertbotAction.RENEW,
+ test,
+ email=self._config_manager.get_email_value_str_optional(),
+ )
+ )
+
+ @property
+ def _cert_path_str(self) -> str:
+ return str(
+ self.app.data_dir.full_path
+ / "certbot/certs/live"
+ / self.root_domain
+ / "fullchain.pem"
+ )
+
def get_command_info(self):
return "nginx", "Manage nginx related things."
@@ -83,7 +206,73 @@ class NginxManager(AppCommandFeatureProvider):
dest="nginx_command", required=True, metavar="NGINX_COMMAND"
)
_list_parser = subparsers.add_parser("list", help="list domains")
+ certbot_parser = subparsers.add_parser("certbot", help="print certbot commands")
+ certbot_parser.add_argument(
+ "-t", "--test", action="store_true", help="run certbot in test mode"
+ )
def run_command(self, args: Namespace) -> None:
if args.nginx_command == "list":
self._print_domains()
+ elif args.nginx_command == "certbot":
+ self.print_all_certbot_commands(args.test)
+
+ def _generate_dns_zone(
+ self,
+ ip: str,
+ /,
+ ttl: str | int = 600,
+ *,
+ enable_mail: bool = True,
+ dkim: str | None = None,
+ ) -> str:
+ # TODO: Not complete and test now.
+ root_domain = self.root_domain
+ result = f"$ORIGIN {root_domain}.\n\n"
+ result += "; A records\n"
+ result += f"@ {ttl} IN A {ip}\n"
+ for subdomain in self.subdomains:
+ result += f"{subdomain} {ttl} IN A {ip}\n"
+
+ if enable_mail:
+ result += "\n; MX records\n"
+ result += f"@ {ttl} IN MX 10 mail.{root_domain}.\n"
+ result += "\n; SPF record\n"
+ result += f'@ {ttl} IN TXT "v=spf1 mx ~all"\n'
+ if dkim is not None:
+ result += "\n; DKIM record\n"
+ result += f'mail._domainkey {ttl} IN TEXT "{dkim}"'
+ result += "\n; DMARC record\n"
+ dmarc_options = [
+ "v=DMARC1",
+ "p=none",
+ f"rua=mailto:dmarc.report@{root_domain}",
+ f"ruf=mailto:dmarc.report@{root_domain}",
+ "sp=none",
+ "ri=86400",
+ ]
+ result += f'_dmarc {ttl} IN TXT "{"; ".join(dmarc_options)}"\n'
+ return result
+
+ def _get_dkim_from_mailserver(self) -> str | None:
+ # TODO: Not complete and test now.
+ dkim_path = (
+ self.app.data_dir.full_path
+ / "dms/config/opendkim/keys"
+ / self.root_domain
+ / "mail.txt"
+ )
+ if not dkim_path.exists():
+ return None
+
+ p = subprocess.run(["sudo", "cat", dkim_path], capture_output=True, check=True)
+ value = ""
+ for match in re.finditer('"(.*)"', p.stdout.decode("utf-8")):
+ value += match.group(1)
+ return value
+
+ def _generate_dns_zone_with_dkim(self, ip: str, /, ttl: str | int = 600) -> str:
+ # TODO: Not complete and test now.
+ return self._generate_dns_zone(
+ ip, ttl, enable_mail=True, dkim=self._get_dkim_from_mailserver()
+ )