aboutsummaryrefslogtreecommitdiff
path: root/tools/cru-py/cru
diff options
context:
space:
mode:
authorYuqian Yang <crupest@crupest.life>2025-02-20 17:52:32 +0800
committerYuqian Yang <crupest@crupest.life>2025-02-20 18:02:19 +0800
commite870972428794f51912dfa955c6de0d712c74db1 (patch)
tree39d5a6118e31f211c3cc511d73618bca4082578e /tools/cru-py/cru
parent75df0f7c4eaec0d50157ea8dc048241465da9c6c (diff)
downloadcrupest-e870972428794f51912dfa955c6de0d712c74db1.tar.gz
crupest-e870972428794f51912dfa955c6de0d712c74db1.tar.bz2
crupest-e870972428794f51912dfa955c6de0d712c74db1.zip
feat(cru-py): use new template format.
Diffstat (limited to 'tools/cru-py/cru')
-rw-r--r--tools/cru-py/cru/_iter.py3
-rw-r--r--tools/cru-py/cru/parsing.py192
-rw-r--r--tools/cru-py/cru/service/_nginx.py25
-rw-r--r--tools/cru-py/cru/service/_template.py18
-rw-r--r--tools/cru-py/cru/template.py166
5 files changed, 322 insertions, 82 deletions
diff --git a/tools/cru-py/cru/_iter.py b/tools/cru-py/cru/_iter.py
index 8f58561..f9683ca 100644
--- a/tools/cru-py/cru/_iter.py
+++ b/tools/cru-py/cru/_iter.py
@@ -445,6 +445,9 @@ class CruIterator(Generic[_T]):
return result
+ def join_str(self: CruIterator[str], separator: str) -> str:
+ return separator.join(self)
+
class CruIterMixin(Generic[_T]):
def cru_iter(self: Iterable[_T]) -> CruIterator[_T]:
diff --git a/tools/cru-py/cru/parsing.py b/tools/cru-py/cru/parsing.py
index 1d2fa7f..c31ce35 100644
--- a/tools/cru-py/cru/parsing.py
+++ b/tools/cru-py/cru/parsing.py
@@ -1,6 +1,8 @@
from __future__ import annotations
from abc import ABCMeta, abstractmethod
+from dataclasses import dataclass
+from enum import Enum
from typing import NamedTuple, TypeAlias, TypeVar, Generic, NoReturn, Callable
from ._error import CruException
@@ -9,6 +11,102 @@ from ._iter import CruIterable
_T = TypeVar("_T")
+class StrParseStream:
+ class MemStackEntry(NamedTuple):
+ pos: int
+ lineno: int
+
+ class MemStackPopStr(NamedTuple):
+ text: str
+ lineno: int
+
+ def __init__(self, text: str) -> None:
+ self._text = text
+ self._pos = 0
+ self._lineno = 1
+ self._length = len(self._text)
+ self._valid_pos_range = range(0, self.length + 1)
+ self._valid_offset_range = range(-self.length, self.length + 1)
+ self._mem_stack: CruIterable.IterList[StrParseStream.MemStackEntry] = (
+ CruIterable.IterList()
+ )
+
+ @property
+ def text(self) -> str:
+ return self._text
+
+ @property
+ def length(self) -> int:
+ return self._length
+
+ @property
+ def valid_pos_range(self) -> range:
+ return self._valid_pos_range
+
+ @property
+ def valid_offset_range(self) -> range:
+ return self._valid_offset_range
+
+ @property
+ def pos(self) -> int:
+ return self._pos
+
+ @property
+ def lineno(self) -> int:
+ return self._lineno
+
+ @property
+ def eof(self) -> bool:
+ return self._pos == self.length
+
+ def peek(self, length: int) -> str:
+ real_length = min(length, self.length - self._pos)
+ new_position = self._pos + real_length
+ text = self._text[self._pos : new_position]
+ return text
+
+ def read(self, length: int) -> str:
+ text = self.peek(length)
+ self._pos += len(text)
+ self._lineno += text.count("\n")
+ return text
+
+ def skip(self, length: int) -> None:
+ self.read(length)
+
+ def peek_str(self, text: str) -> bool:
+ if self.pos + len(text) > self.length:
+ return False
+ for offset in range(len(text)):
+ if self._text[self.pos + offset] != text[offset]:
+ return False
+ return True
+
+ def read_str(self, text: str) -> bool:
+ if not self.peek_str(text):
+ return False
+ self._pos += len(text)
+ self._lineno += text.count("\n")
+ return True
+
+ @property
+ def mem_stack(self) -> CruIterable.IterList[MemStackEntry]:
+ return self._mem_stack
+
+ def push_mem(self) -> None:
+ self.mem_stack.append(self.MemStackEntry(self.pos, self.lineno))
+
+ def pop_mem(self) -> MemStackEntry:
+ return self.mem_stack.pop()
+
+ def pop_mem_str(self, strip_end: int = 0) -> MemStackPopStr:
+ old = self.pop_mem()
+ assert self.pos >= old.pos
+ return self.MemStackPopStr(
+ self._text[old.pos : self.pos - strip_end], old.lineno
+ )
+
+
class ParseError(CruException, Generic[_T]):
def __init__(
self,
@@ -96,3 +194,97 @@ class SimpleLineConfigParser(Parser[SimpleLineConfigParserResult]):
result = SimpleLineConfigParserResult()
self._parse(text, lambda item: result.append(item))
return result
+
+
+class _StrWrapperVarParserTokenKind(Enum):
+ TEXT = "TEXT"
+ VAR = "VAR"
+
+
+@dataclass
+class _StrWrapperVarParserToken:
+ kind: _StrWrapperVarParserTokenKind
+ value: str
+ line_number: int
+
+ @property
+ def is_text(self) -> bool:
+ return self.kind is _StrWrapperVarParserTokenKind.TEXT
+
+ @property
+ def is_var(self) -> bool:
+ return self.kind is _StrWrapperVarParserTokenKind.VAR
+
+ @staticmethod
+ def from_mem_str(
+ kind: _StrWrapperVarParserTokenKind, mem_str: StrParseStream.MemStackPopStr
+ ) -> _StrWrapperVarParserToken:
+ return _StrWrapperVarParserToken(kind, mem_str.text, mem_str.lineno)
+
+ def __repr__(self) -> str:
+ return f"VAR: {self.value}" if self.is_var else "TEXT: ..."
+
+
+class _StrWrapperVarParserResult(CruIterable.IterList[_StrWrapperVarParserToken]):
+ pass
+
+
+class StrWrapperVarParser(Parser[_StrWrapperVarParserResult]):
+ TokenKind: TypeAlias = _StrWrapperVarParserTokenKind
+ Token: TypeAlias = _StrWrapperVarParserToken
+ Result: TypeAlias = _StrWrapperVarParserResult
+
+ def __init__(self, wrapper: str):
+ super().__init__(f"StrWrapperVarParser({wrapper})")
+ self._wrapper = wrapper
+
+ @property
+ def wrapper(self) -> str:
+ return self._wrapper
+
+ def parse(self, text: str) -> Result:
+ result = self.Result()
+
+ class _State(Enum):
+ TEXT = "TEXT"
+ VAR = "VAR"
+
+ state = _State.TEXT
+ stream = StrParseStream(text)
+ stream.push_mem()
+
+ while True:
+ if stream.eof:
+ break
+
+ if stream.read_str(self.wrapper):
+ if state is _State.TEXT:
+ result.append(
+ self.Token.from_mem_str(
+ self.TokenKind.TEXT, stream.pop_mem_str(len(self.wrapper))
+ )
+ )
+ state = _State.VAR
+ stream.push_mem()
+ else:
+ result.append(
+ self.Token.from_mem_str(
+ self.TokenKind.VAR,
+ stream.pop_mem_str(len(self.wrapper)),
+ )
+ )
+ state = _State.TEXT
+ stream.push_mem()
+
+ continue
+
+ stream.skip(1)
+
+ if state is _State.VAR:
+ raise ParseError("Text ended without closing variable.", self, text)
+
+ mem_str = stream.pop_mem_str()
+ if len(mem_str.text) != 0:
+ result.append(self.Token.from_mem_str(self.TokenKind.TEXT, mem_str))
+
+ return result
diff --git a/tools/cru-py/cru/service/_nginx.py b/tools/cru-py/cru/service/_nginx.py
index e0a9c60..6c77971 100644
--- a/tools/cru-py/cru/service/_nginx.py
+++ b/tools/cru-py/cru/service/_nginx.py
@@ -54,32 +54,19 @@ class NginxManager(AppCommandFeatureProvider):
def _get_domains_from_text(self, text: str) -> set[str]:
domains: set[str] = set()
regex = re.compile(r"server_name\s+(\S+)\s*;")
- domain_variable_str = f"${self._domain_config_name}"
- brace_domain_variable_regex = re.compile(
- r"\$\{\s*" + self._domain_config_name + r"\s*\}"
- )
for match in regex.finditer(text):
- domain_part = match.group(1)
- if domain_variable_str in domain_part:
- domains.add(domain_part.replace(domain_variable_str, self.root_domain))
- continue
- m = brace_domain_variable_regex.search(domain_part)
- if m:
- domains.add(domain_part.replace(m.group(0), self.root_domain))
- continue
- domains.add(domain_part)
+ domains.add(match[1])
return domains
- def _get_nginx_conf_template_text(self) -> str:
- template_manager = self.app.get_feature(TemplateManager)
+ def _join_generated_nginx_conf_text(self) -> str:
text = ""
- for path, template in template_manager.template_tree.templates:
- if path.as_posix().startswith("nginx/"):
- text += template.raw_text
+ template_manager = self.app.get_feature(TemplateManager)
+ for nginx_conf in template_manager.generate():
+ text += nginx_conf[1]
return text
def _get_domains(self) -> list[str]:
- text = self._get_nginx_conf_template_text()
+ text = self._join_generated_nginx_conf_text()
domains = list(self._get_domains_from_text(text))
domains.remove(self.root_domain)
return [self.root_domain, *domains]
diff --git a/tools/cru-py/cru/service/_template.py b/tools/cru-py/cru/service/_template.py
index 170116c..1381700 100644
--- a/tools/cru-py/cru/service/_template.py
+++ b/tools/cru-py/cru/service/_template.py
@@ -1,8 +1,8 @@
from argparse import Namespace
+from pathlib import Path
import shutil
-from cru import CruIterator
-from cru.template import TemplateTree
+from cru.template import TemplateTree, CruStrWrapperTemplate
from ._base import AppCommandFeatureProvider, AppFeaturePath
from ._config import ConfigManager
@@ -16,7 +16,7 @@ class TemplateManager(AppCommandFeatureProvider):
def setup(self) -> None:
self._templates_dir = self.app.add_path("templates", True)
self._generated_dir = self.app.add_path("generated", True)
- self._template_tree: TemplateTree | None = None
+ self._template_tree: TemplateTree[CruStrWrapperTemplate] | None = None
@property
def prefix(self) -> str:
@@ -31,20 +31,24 @@ class TemplateManager(AppCommandFeatureProvider):
return self._generated_dir
@property
- def template_tree(self) -> TemplateTree:
+ def template_tree(self) -> TemplateTree[CruStrWrapperTemplate]:
if self._template_tree is None:
return self.reload()
return self._template_tree
def reload(self) -> TemplateTree:
self._template_tree = TemplateTree(
- self.prefix, self.templates_dir.full_path_str
+ lambda text: CruStrWrapperTemplate(text), self.templates_dir.full_path_str
)
return self._template_tree
def _print_file_lists(self) -> None:
- for file in CruIterator(self.template_tree.templates).transform(lambda t: t[0]):
- print(file.as_posix())
+ for path, template in self.template_tree.templates:
+ print(f"[{template.variable_count}]", path.as_posix())
+
+ def generate(self) -> list[tuple[Path, str]]:
+ config_manager = self.app.get_feature(ConfigManager)
+ return self.template_tree.generate(config_manager.get_str_dict())
def _generate_files(self, dry_run: bool) -> None:
config_manager = self.app.get_feature(ConfigManager)
diff --git a/tools/cru-py/cru/template.py b/tools/cru-py/cru/template.py
index 6749cab..35d68ac 100644
--- a/tools/cru-py/cru/template.py
+++ b/tools/cru-py/cru/template.py
@@ -1,74 +1,124 @@
-from collections.abc import Mapping
-import os
-import os.path
+from abc import ABCMeta, abstractmethod
+from collections.abc import Callable, Mapping
from pathlib import Path
from string import Template
+from typing import Generic, TypeVar
from ._iter import CruIterator
from ._error import CruException
+from .parsing import StrWrapperVarParser
+
class CruTemplateError(CruException):
pass
-class CruTemplate:
+class CruTemplateBase(metaclass=ABCMeta):
+ def __init__(self, text: str):
+ self._text = text
+ self._variables: set[str] | None = None
+
+ @abstractmethod
+ def _get_variables(self) -> set[str]:
+ raise NotImplementedError()
+
+ @property
+ def text(self) -> str:
+ return self._text
+
+ @property
+ def variables(self) -> set[str]:
+ if self._variables is None:
+ self._variables = self._get_variables()
+ return self._variables
+
+ @property
+ def variable_count(self) -> int:
+ return len(self.variables)
+
+ @property
+ def has_variables(self) -> bool:
+ return self.variable_count > 0
+
+ @abstractmethod
+ def _do_generate(self, mapping: dict[str, str]) -> str:
+ raise NotImplementedError()
+
+ def generate(self, mapping: Mapping[str, str], allow_extra: bool = True) -> str:
+ values = dict(mapping)
+ if not self.variables <= set(values.keys()):
+ raise CruTemplateError("Missing variables.")
+ if not allow_extra and not set(values.keys()) <= self.variables:
+ raise CruTemplateError("Extra variables.")
+ return self._do_generate(values)
+
+
+class CruTemplate(CruTemplateBase):
def __init__(self, prefix: str, text: str):
+ super().__init__(text)
self._prefix = prefix
self._template = Template(text)
- self._variables = (
+
+ def _get_variables(self) -> set[str]:
+ return (
CruIterator(self._template.get_identifiers())
- .filter(lambda i: i.startswith(self._prefix))
+ .filter(lambda i: i.startswith(self.prefix))
.to_set()
)
- self._all_variables = set(self._template.get_identifiers())
@property
def prefix(self) -> str:
return self._prefix
@property
- def raw_text(self) -> str:
- return self._template.template
-
- @property
def py_template(self) -> Template:
return self._template
@property
- def variables(self) -> set[str]:
- return self._variables
-
- @property
def all_variables(self) -> set[str]:
- return self._all_variables
+ return set(self._template.get_identifiers())
+
+ def _do_generate(self, mapping: dict[str, str]) -> str:
+ return self._template.safe_substitute(mapping)
+
+
+class CruStrWrapperTemplate(CruTemplateBase):
+ def __init__(self, text: str, wrapper: str = "@@"):
+ super().__init__(text)
+ self._wrapper = wrapper
+ self._tokens: StrWrapperVarParser.Result
@property
- def has_variables(self) -> bool:
- """
- If the template does not has any variables that starts with the given prefix,
- it returns False. This usually indicates that the template is not a real
- template and should be copied as is. Otherwise, it returns True.
+ def wrapper(self) -> str:
+ return self._wrapper
+
+ def _get_variables(self):
+ self._tokens = StrWrapperVarParser(self.wrapper).parse(self.text)
+ return (
+ self._tokens.cru_iter()
+ .filter(lambda t: t.is_var)
+ .map(lambda t: t.value)
+ .to_set()
+ )
- This can be used as a guard to prevent invalid templates created accidentally
- without notice.
- """
- return len(self.variables) > 0
+ def _do_generate(self, mapping):
+ return (
+ self._tokens.cru_iter()
+ .map(lambda t: mapping[t.value] if t.is_var else t.value)
+ .join_str("")
+ )
- def generate(self, mapping: Mapping[str, str], allow_extra: bool = True) -> str:
- values = dict(mapping)
- if not self.variables <= set(values.keys()):
- raise CruTemplateError("Missing variables.")
- if not allow_extra and not set(values.keys()) <= self.variables:
- raise CruTemplateError("Extra variables.")
- return self._template.safe_substitute(values)
+_Template = TypeVar("_Template", bound=CruTemplateBase)
-class TemplateTree:
+
+class TemplateTree(Generic[_Template]):
def __init__(
self,
- prefix: str,
+ template_generator: Callable[[str], _Template],
source: str,
+ *,
template_file_suffix: str | None = ".template",
):
"""
@@ -80,18 +130,14 @@ class TemplateTree:
If either case is false, it generally means whether the file is a template is
wrongly handled.
"""
- self._prefix = prefix
- self._files: list[tuple[Path, CruTemplate]] = []
+ self._template_generator = template_generator
+ self._files: list[tuple[Path, _Template]] = []
self._source = source
self._template_file_suffix = template_file_suffix
self._load()
@property
- def prefix(self) -> str:
- return self._prefix
-
- @property
- def templates(self) -> list[tuple[Path, CruTemplate]]:
+ def templates(self) -> list[tuple[Path, _Template]]:
return self._files
@property
@@ -103,13 +149,14 @@ class TemplateTree:
return self._template_file_suffix
@staticmethod
- def _scan_files(root_path: str) -> list[Path]:
+ def _scan_files(root: str) -> list[Path]:
+ root_path = Path(root)
result: list[Path] = []
- for root, _dirs, files in os.walk(root_path):
- for file in files:
- path = Path(root, file)
- path = path.relative_to(root_path)
- result.append(Path(path))
+ for path in root_path.glob("**/*"):
+ if not path.is_file():
+ continue
+ path = path.relative_to(root_path)
+ result.append(Path(path))
return result
def _load(self) -> None:
@@ -118,7 +165,7 @@ class TemplateTree:
template_file = Path(self.source) / file_path
with open(template_file, "r") as f:
content = f.read()
- template = CruTemplate(self.prefix, content)
+ template = self._template_generator(content)
if self.template_file_suffix is not None:
should_be_template = file_path.name.endswith(self.template_file_suffix)
if should_be_template and not template.has_variables:
@@ -136,18 +183,25 @@ class TemplateTree:
s.update(template.variables)
return s
- def generate_to(
- self, destination: str, variables: Mapping[str, str], dry_run: bool
- ) -> None:
- for file, template in self.templates:
- des = Path(destination) / file
- if self.template_file_suffix is not None and des.name.endswith(
+ def generate(self, variables: Mapping[str, str]) -> list[tuple[Path, str]]:
+ result: list[tuple[Path, str]] = []
+ for path, template in self.templates:
+ if self.template_file_suffix is not None and path.name.endswith(
self.template_file_suffix
):
- des = des.parent / (des.name[: -len(self.template_file_suffix)])
+ path = path.parent / (path.name[: -len(self.template_file_suffix)])
text = template.generate(variables)
- if not dry_run:
+ result.append((path, text))
+ return result
+
+ def generate_to(
+ self, destination: str, variables: Mapping[str, str], dry_run: bool
+ ) -> None:
+ generated = self.generate(variables)
+ if not dry_run:
+ for path, text in generated:
+ des = Path(destination) / path
des.parent.mkdir(parents=True, exist_ok=True)
with open(des, "w") as f:
f.write(text)