diff options
author | Yuqian Yang <crupest@crupest.life> | 2025-02-20 17:52:32 +0800 |
---|---|---|
committer | Yuqian Yang <crupest@crupest.life> | 2025-02-20 18:02:19 +0800 |
commit | e870972428794f51912dfa955c6de0d712c74db1 (patch) | |
tree | 39d5a6118e31f211c3cc511d73618bca4082578e /tools/cru-py/cru | |
parent | 75df0f7c4eaec0d50157ea8dc048241465da9c6c (diff) | |
download | crupest-e870972428794f51912dfa955c6de0d712c74db1.tar.gz crupest-e870972428794f51912dfa955c6de0d712c74db1.tar.bz2 crupest-e870972428794f51912dfa955c6de0d712c74db1.zip |
feat(cru-py): use new template format.
Diffstat (limited to 'tools/cru-py/cru')
-rw-r--r-- | tools/cru-py/cru/_iter.py | 3 | ||||
-rw-r--r-- | tools/cru-py/cru/parsing.py | 192 | ||||
-rw-r--r-- | tools/cru-py/cru/service/_nginx.py | 25 | ||||
-rw-r--r-- | tools/cru-py/cru/service/_template.py | 18 | ||||
-rw-r--r-- | tools/cru-py/cru/template.py | 166 |
5 files changed, 322 insertions, 82 deletions
diff --git a/tools/cru-py/cru/_iter.py b/tools/cru-py/cru/_iter.py index 8f58561..f9683ca 100644 --- a/tools/cru-py/cru/_iter.py +++ b/tools/cru-py/cru/_iter.py @@ -445,6 +445,9 @@ class CruIterator(Generic[_T]): return result + def join_str(self: CruIterator[str], separator: str) -> str: + return separator.join(self) + class CruIterMixin(Generic[_T]): def cru_iter(self: Iterable[_T]) -> CruIterator[_T]: diff --git a/tools/cru-py/cru/parsing.py b/tools/cru-py/cru/parsing.py index 1d2fa7f..c31ce35 100644 --- a/tools/cru-py/cru/parsing.py +++ b/tools/cru-py/cru/parsing.py @@ -1,6 +1,8 @@ from __future__ import annotations from abc import ABCMeta, abstractmethod +from dataclasses import dataclass +from enum import Enum from typing import NamedTuple, TypeAlias, TypeVar, Generic, NoReturn, Callable from ._error import CruException @@ -9,6 +11,102 @@ from ._iter import CruIterable _T = TypeVar("_T") +class StrParseStream: + class MemStackEntry(NamedTuple): + pos: int + lineno: int + + class MemStackPopStr(NamedTuple): + text: str + lineno: int + + def __init__(self, text: str) -> None: + self._text = text + self._pos = 0 + self._lineno = 1 + self._length = len(self._text) + self._valid_pos_range = range(0, self.length + 1) + self._valid_offset_range = range(-self.length, self.length + 1) + self._mem_stack: CruIterable.IterList[StrParseStream.MemStackEntry] = ( + CruIterable.IterList() + ) + + @property + def text(self) -> str: + return self._text + + @property + def length(self) -> int: + return self._length + + @property + def valid_pos_range(self) -> range: + return self._valid_pos_range + + @property + def valid_offset_range(self) -> range: + return self._valid_offset_range + + @property + def pos(self) -> int: + return self._pos + + @property + def lineno(self) -> int: + return self._lineno + + @property + def eof(self) -> bool: + return self._pos == self.length + + def peek(self, length: int) -> str: + real_length = min(length, self.length - self._pos) + new_position = self._pos + real_length + text = self._text[self._pos : new_position] + return text + + def read(self, length: int) -> str: + text = self.peek(length) + self._pos += len(text) + self._lineno += text.count("\n") + return text + + def skip(self, length: int) -> None: + self.read(length) + + def peek_str(self, text: str) -> bool: + if self.pos + len(text) > self.length: + return False + for offset in range(len(text)): + if self._text[self.pos + offset] != text[offset]: + return False + return True + + def read_str(self, text: str) -> bool: + if not self.peek_str(text): + return False + self._pos += len(text) + self._lineno += text.count("\n") + return True + + @property + def mem_stack(self) -> CruIterable.IterList[MemStackEntry]: + return self._mem_stack + + def push_mem(self) -> None: + self.mem_stack.append(self.MemStackEntry(self.pos, self.lineno)) + + def pop_mem(self) -> MemStackEntry: + return self.mem_stack.pop() + + def pop_mem_str(self, strip_end: int = 0) -> MemStackPopStr: + old = self.pop_mem() + assert self.pos >= old.pos + return self.MemStackPopStr( + self._text[old.pos : self.pos - strip_end], old.lineno + ) + + class ParseError(CruException, Generic[_T]): def __init__( self, @@ -96,3 +194,97 @@ class SimpleLineConfigParser(Parser[SimpleLineConfigParserResult]): result = SimpleLineConfigParserResult() self._parse(text, lambda item: result.append(item)) return result + + +class _StrWrapperVarParserTokenKind(Enum): + TEXT = "TEXT" + VAR = "VAR" + + +@dataclass +class _StrWrapperVarParserToken: + kind: _StrWrapperVarParserTokenKind + value: str + line_number: int + + @property + def is_text(self) -> bool: + return self.kind is _StrWrapperVarParserTokenKind.TEXT + + @property + def is_var(self) -> bool: + return self.kind is _StrWrapperVarParserTokenKind.VAR + + @staticmethod + def from_mem_str( + kind: _StrWrapperVarParserTokenKind, mem_str: StrParseStream.MemStackPopStr + ) -> _StrWrapperVarParserToken: + return _StrWrapperVarParserToken(kind, mem_str.text, mem_str.lineno) + + def __repr__(self) -> str: + return f"VAR: {self.value}" if self.is_var else "TEXT: ..." + + +class _StrWrapperVarParserResult(CruIterable.IterList[_StrWrapperVarParserToken]): + pass + + +class StrWrapperVarParser(Parser[_StrWrapperVarParserResult]): + TokenKind: TypeAlias = _StrWrapperVarParserTokenKind + Token: TypeAlias = _StrWrapperVarParserToken + Result: TypeAlias = _StrWrapperVarParserResult + + def __init__(self, wrapper: str): + super().__init__(f"StrWrapperVarParser({wrapper})") + self._wrapper = wrapper + + @property + def wrapper(self) -> str: + return self._wrapper + + def parse(self, text: str) -> Result: + result = self.Result() + + class _State(Enum): + TEXT = "TEXT" + VAR = "VAR" + + state = _State.TEXT + stream = StrParseStream(text) + stream.push_mem() + + while True: + if stream.eof: + break + + if stream.read_str(self.wrapper): + if state is _State.TEXT: + result.append( + self.Token.from_mem_str( + self.TokenKind.TEXT, stream.pop_mem_str(len(self.wrapper)) + ) + ) + state = _State.VAR + stream.push_mem() + else: + result.append( + self.Token.from_mem_str( + self.TokenKind.VAR, + stream.pop_mem_str(len(self.wrapper)), + ) + ) + state = _State.TEXT + stream.push_mem() + + continue + + stream.skip(1) + + if state is _State.VAR: + raise ParseError("Text ended without closing variable.", self, text) + + mem_str = stream.pop_mem_str() + if len(mem_str.text) != 0: + result.append(self.Token.from_mem_str(self.TokenKind.TEXT, mem_str)) + + return result diff --git a/tools/cru-py/cru/service/_nginx.py b/tools/cru-py/cru/service/_nginx.py index e0a9c60..6c77971 100644 --- a/tools/cru-py/cru/service/_nginx.py +++ b/tools/cru-py/cru/service/_nginx.py @@ -54,32 +54,19 @@ class NginxManager(AppCommandFeatureProvider): def _get_domains_from_text(self, text: str) -> set[str]: domains: set[str] = set() regex = re.compile(r"server_name\s+(\S+)\s*;") - domain_variable_str = f"${self._domain_config_name}" - brace_domain_variable_regex = re.compile( - r"\$\{\s*" + self._domain_config_name + r"\s*\}" - ) for match in regex.finditer(text): - domain_part = match.group(1) - if domain_variable_str in domain_part: - domains.add(domain_part.replace(domain_variable_str, self.root_domain)) - continue - m = brace_domain_variable_regex.search(domain_part) - if m: - domains.add(domain_part.replace(m.group(0), self.root_domain)) - continue - domains.add(domain_part) + domains.add(match[1]) return domains - def _get_nginx_conf_template_text(self) -> str: - template_manager = self.app.get_feature(TemplateManager) + def _join_generated_nginx_conf_text(self) -> str: text = "" - for path, template in template_manager.template_tree.templates: - if path.as_posix().startswith("nginx/"): - text += template.raw_text + template_manager = self.app.get_feature(TemplateManager) + for nginx_conf in template_manager.generate(): + text += nginx_conf[1] return text def _get_domains(self) -> list[str]: - text = self._get_nginx_conf_template_text() + text = self._join_generated_nginx_conf_text() domains = list(self._get_domains_from_text(text)) domains.remove(self.root_domain) return [self.root_domain, *domains] diff --git a/tools/cru-py/cru/service/_template.py b/tools/cru-py/cru/service/_template.py index 170116c..1381700 100644 --- a/tools/cru-py/cru/service/_template.py +++ b/tools/cru-py/cru/service/_template.py @@ -1,8 +1,8 @@ from argparse import Namespace +from pathlib import Path import shutil -from cru import CruIterator -from cru.template import TemplateTree +from cru.template import TemplateTree, CruStrWrapperTemplate from ._base import AppCommandFeatureProvider, AppFeaturePath from ._config import ConfigManager @@ -16,7 +16,7 @@ class TemplateManager(AppCommandFeatureProvider): def setup(self) -> None: self._templates_dir = self.app.add_path("templates", True) self._generated_dir = self.app.add_path("generated", True) - self._template_tree: TemplateTree | None = None + self._template_tree: TemplateTree[CruStrWrapperTemplate] | None = None @property def prefix(self) -> str: @@ -31,20 +31,24 @@ class TemplateManager(AppCommandFeatureProvider): return self._generated_dir @property - def template_tree(self) -> TemplateTree: + def template_tree(self) -> TemplateTree[CruStrWrapperTemplate]: if self._template_tree is None: return self.reload() return self._template_tree def reload(self) -> TemplateTree: self._template_tree = TemplateTree( - self.prefix, self.templates_dir.full_path_str + lambda text: CruStrWrapperTemplate(text), self.templates_dir.full_path_str ) return self._template_tree def _print_file_lists(self) -> None: - for file in CruIterator(self.template_tree.templates).transform(lambda t: t[0]): - print(file.as_posix()) + for path, template in self.template_tree.templates: + print(f"[{template.variable_count}]", path.as_posix()) + + def generate(self) -> list[tuple[Path, str]]: + config_manager = self.app.get_feature(ConfigManager) + return self.template_tree.generate(config_manager.get_str_dict()) def _generate_files(self, dry_run: bool) -> None: config_manager = self.app.get_feature(ConfigManager) diff --git a/tools/cru-py/cru/template.py b/tools/cru-py/cru/template.py index 6749cab..35d68ac 100644 --- a/tools/cru-py/cru/template.py +++ b/tools/cru-py/cru/template.py @@ -1,74 +1,124 @@ -from collections.abc import Mapping -import os -import os.path +from abc import ABCMeta, abstractmethod +from collections.abc import Callable, Mapping from pathlib import Path from string import Template +from typing import Generic, TypeVar from ._iter import CruIterator from ._error import CruException +from .parsing import StrWrapperVarParser + class CruTemplateError(CruException): pass -class CruTemplate: +class CruTemplateBase(metaclass=ABCMeta): + def __init__(self, text: str): + self._text = text + self._variables: set[str] | None = None + + @abstractmethod + def _get_variables(self) -> set[str]: + raise NotImplementedError() + + @property + def text(self) -> str: + return self._text + + @property + def variables(self) -> set[str]: + if self._variables is None: + self._variables = self._get_variables() + return self._variables + + @property + def variable_count(self) -> int: + return len(self.variables) + + @property + def has_variables(self) -> bool: + return self.variable_count > 0 + + @abstractmethod + def _do_generate(self, mapping: dict[str, str]) -> str: + raise NotImplementedError() + + def generate(self, mapping: Mapping[str, str], allow_extra: bool = True) -> str: + values = dict(mapping) + if not self.variables <= set(values.keys()): + raise CruTemplateError("Missing variables.") + if not allow_extra and not set(values.keys()) <= self.variables: + raise CruTemplateError("Extra variables.") + return self._do_generate(values) + + +class CruTemplate(CruTemplateBase): def __init__(self, prefix: str, text: str): + super().__init__(text) self._prefix = prefix self._template = Template(text) - self._variables = ( + + def _get_variables(self) -> set[str]: + return ( CruIterator(self._template.get_identifiers()) - .filter(lambda i: i.startswith(self._prefix)) + .filter(lambda i: i.startswith(self.prefix)) .to_set() ) - self._all_variables = set(self._template.get_identifiers()) @property def prefix(self) -> str: return self._prefix @property - def raw_text(self) -> str: - return self._template.template - - @property def py_template(self) -> Template: return self._template @property - def variables(self) -> set[str]: - return self._variables - - @property def all_variables(self) -> set[str]: - return self._all_variables + return set(self._template.get_identifiers()) + + def _do_generate(self, mapping: dict[str, str]) -> str: + return self._template.safe_substitute(mapping) + + +class CruStrWrapperTemplate(CruTemplateBase): + def __init__(self, text: str, wrapper: str = "@@"): + super().__init__(text) + self._wrapper = wrapper + self._tokens: StrWrapperVarParser.Result @property - def has_variables(self) -> bool: - """ - If the template does not has any variables that starts with the given prefix, - it returns False. This usually indicates that the template is not a real - template and should be copied as is. Otherwise, it returns True. + def wrapper(self) -> str: + return self._wrapper + + def _get_variables(self): + self._tokens = StrWrapperVarParser(self.wrapper).parse(self.text) + return ( + self._tokens.cru_iter() + .filter(lambda t: t.is_var) + .map(lambda t: t.value) + .to_set() + ) - This can be used as a guard to prevent invalid templates created accidentally - without notice. - """ - return len(self.variables) > 0 + def _do_generate(self, mapping): + return ( + self._tokens.cru_iter() + .map(lambda t: mapping[t.value] if t.is_var else t.value) + .join_str("") + ) - def generate(self, mapping: Mapping[str, str], allow_extra: bool = True) -> str: - values = dict(mapping) - if not self.variables <= set(values.keys()): - raise CruTemplateError("Missing variables.") - if not allow_extra and not set(values.keys()) <= self.variables: - raise CruTemplateError("Extra variables.") - return self._template.safe_substitute(values) +_Template = TypeVar("_Template", bound=CruTemplateBase) -class TemplateTree: + +class TemplateTree(Generic[_Template]): def __init__( self, - prefix: str, + template_generator: Callable[[str], _Template], source: str, + *, template_file_suffix: str | None = ".template", ): """ @@ -80,18 +130,14 @@ class TemplateTree: If either case is false, it generally means whether the file is a template is wrongly handled. """ - self._prefix = prefix - self._files: list[tuple[Path, CruTemplate]] = [] + self._template_generator = template_generator + self._files: list[tuple[Path, _Template]] = [] self._source = source self._template_file_suffix = template_file_suffix self._load() @property - def prefix(self) -> str: - return self._prefix - - @property - def templates(self) -> list[tuple[Path, CruTemplate]]: + def templates(self) -> list[tuple[Path, _Template]]: return self._files @property @@ -103,13 +149,14 @@ class TemplateTree: return self._template_file_suffix @staticmethod - def _scan_files(root_path: str) -> list[Path]: + def _scan_files(root: str) -> list[Path]: + root_path = Path(root) result: list[Path] = [] - for root, _dirs, files in os.walk(root_path): - for file in files: - path = Path(root, file) - path = path.relative_to(root_path) - result.append(Path(path)) + for path in root_path.glob("**/*"): + if not path.is_file(): + continue + path = path.relative_to(root_path) + result.append(Path(path)) return result def _load(self) -> None: @@ -118,7 +165,7 @@ class TemplateTree: template_file = Path(self.source) / file_path with open(template_file, "r") as f: content = f.read() - template = CruTemplate(self.prefix, content) + template = self._template_generator(content) if self.template_file_suffix is not None: should_be_template = file_path.name.endswith(self.template_file_suffix) if should_be_template and not template.has_variables: @@ -136,18 +183,25 @@ class TemplateTree: s.update(template.variables) return s - def generate_to( - self, destination: str, variables: Mapping[str, str], dry_run: bool - ) -> None: - for file, template in self.templates: - des = Path(destination) / file - if self.template_file_suffix is not None and des.name.endswith( + def generate(self, variables: Mapping[str, str]) -> list[tuple[Path, str]]: + result: list[tuple[Path, str]] = [] + for path, template in self.templates: + if self.template_file_suffix is not None and path.name.endswith( self.template_file_suffix ): - des = des.parent / (des.name[: -len(self.template_file_suffix)]) + path = path.parent / (path.name[: -len(self.template_file_suffix)]) text = template.generate(variables) - if not dry_run: + result.append((path, text)) + return result + + def generate_to( + self, destination: str, variables: Mapping[str, str], dry_run: bool + ) -> None: + generated = self.generate(variables) + if not dry_run: + for path, text in generated: + des = Path(destination) / path des.parent.mkdir(parents=True, exist_ok=True) with open(des, "w") as f: f.write(text) |