1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
|
from cru import CruException, CruUserFriendlyException
from cru.config import Configuration, ConfigItem
from cru.value import (
INTEGER_VALUE_TYPE,
TEXT_VALUE_TYPE,
RandomStringValueGenerator,
UuidValueGenerator,
)
from cru.parsing import SimpleLineConfigParser
from ._base import AppFeaturePath, AppCommandFeatureProvider, OWNER_NAME
class AppConfigError(CruException):
    """Base class for the configuration errors defined in this module."""
class AppConfigDuplicateItemsError(AppConfigError):
    """Configuration error that carries the offending duplicate items."""

    def __init__(
        self,
        message: str,
        items: list[SimpleLineConfigParser.Item],
        *args,
        **kwargs,
    ) -> None:
        """Remember ``items`` and forward everything else to the parent class.

        Args:
            message: Error message handed to the parent exception.
            items: Parsed configuration items reported as duplicates.
            *args: Extra positional arguments for the parent exception.
            **kwargs: Extra keyword arguments for the parent exception.
        """
        super().__init__(message, *args, **kwargs)
        self._items = items

    @property
    def duplicate_items(self) -> list[SimpleLineConfigParser.Item]:
        """The items this error was created with."""
        return self._items

    @staticmethod
    def duplicate_items_to_friendly_message(
        items: list[SimpleLineConfigParser.Item],
    ) -> str:
        """Render ``items`` as text, one ``line N: key=value`` row per item.

        Every row, including the last one, ends with a newline; an empty
        list yields an empty string.
        """
        rows = [
            f"line {entry.line_number}: {entry.key}={entry.value}\n"
            for entry in items
        ]
        return "".join(rows)

    def to_friendly_error(self) -> CruUserFriendlyException:
        """Wrap this error in a ``CruUserFriendlyException``.

        The returned exception lists the duplicate items in its message and
        has this error set as its ``__cause__``.
        """
        listing = self.duplicate_items_to_friendly_message(self.duplicate_items)
        friendly = CruUserFriendlyException(
            f"Duplicate configuration items detected:\n{listing}"
        )
        friendly.__cause__ = self
        return friendly
class ConfigManager(AppCommandFeatureProvider):
    """Declare the application's configuration items and serve ``config``.

    The item definitions live in a ``Configuration`` object built in
    ``__init__``; the configuration file location is registered in
    ``setup``.
    """

    def __init__(self) -> None:
        """Register the feature as ``config-manager`` and declare all items."""
        super().__init__("config-manager")
        configuration = Configuration()
        self._configuration = configuration
        self._add_text_item("DOMAIN", "domain name")
        self._add_text_item("EMAIL", "admin email address")
        self._add_text_item(
            "AUTO_BACKUP_COS_SECRET_ID",
            "access key id for Tencent COS, used for auto backup",
        )
        self._add_text_item(
            "AUTO_BACKUP_COS_SECRET_KEY",
            "access key secret for Tencent COS, used for auto backup",
        )
        self._add_text_item(
            "AUTO_BACKUP_COS_REGION", "region for Tencent COS, used for auto backup"
        )
        self._add_text_item(
            "AUTO_BACKUP_BUCKET_NAME",
            "bucket name for Tencent COS, used for auto backup",
        )
        self._add_text_item("GITHUB_USERNAME", "github username for fetching todos")
        self._add_int_item(
            "GITHUB_PROJECT_NUMBER", "github project number for fetching todos"
        )
        self._add_text_item("GITHUB_TOKEN", "github token for fetching todos")
        # NOTE(review): declared as a text item although it is named like a
        # count -- confirm whether an integer item was intended.
        self._add_text_item("GITHUB_TODO_COUNT", "github todo count")
        self._add_uuid_item("V2RAY_TOKEN", "v2ray user id")
        self._add_uuid_item("V2RAY_PATH", "v2ray path, which will be prefixed by _")
        self._add_text_item("FORGEJO_MAILER_USER", "Forgejo SMTP user")
        self._add_text_item("FORGEJO_MAILER_PASSWD", "Forgejo SMTP password")
        self._add_random_string_item("2FAUTH_APP_KEY", "2FAuth App Key")
        self._add_text_item("2FAUTH_MAIL_USERNAME", "2FAuth SMTP user")
        self._add_text_item("2FAUTH_MAIL_PASSWORD", "2FAuth SMTP password")

    def _add_text_item(self, name: str, description: str) -> None:
        """Add a text item named ``{OWNER_NAME}_{name}`` with no default."""
        self.configuration.add(
            ConfigItem(f"{OWNER_NAME}_{name}", description, TEXT_VALUE_TYPE)
        )

    def _add_uuid_item(self, name: str, description: str) -> None:
        """Add a text item whose default is a ``UuidValueGenerator``."""
        self.configuration.add(
            ConfigItem(
                f"{OWNER_NAME}_{name}",
                description,
                TEXT_VALUE_TYPE,
                default=UuidValueGenerator(),
            )
        )

    def _add_random_string_item(
        self, name: str, description: str, length: int = 32, secure: bool = True
    ) -> None:
        """Add a text item whose default is a ``RandomStringValueGenerator``.

        Args:
            name: Item name without the ``OWNER_NAME`` prefix.
            description: Human-readable description of the item.
            length: First argument passed to the generator.
            secure: Second argument passed to the generator.
        """
        self.configuration.add(
            ConfigItem(
                f"{OWNER_NAME}_{name}",
                description,
                TEXT_VALUE_TYPE,
                default=RandomStringValueGenerator(length, secure),
            )
        )

    def _add_int_item(self, name: str, description: str) -> None:
        """Add an integer item named ``{OWNER_NAME}_{name}`` with no default."""
        self.configuration.add(
            ConfigItem(f"{OWNER_NAME}_{name}", description, INTEGER_VALUE_TYPE)
        )

    def setup(self) -> None:
        """Register the ``config`` sub-path under the app data directory."""
        # The positional ``False`` is interpreted by ``add_subpath``;
        # presumably it marks the path as a file rather than a directory --
        # TODO confirm.
        self._config_path = self.app.data_dir.add_subpath(
            "config", False, description="Configuration file path."
        )

    @property
    def config_file_path(self) -> AppFeaturePath:
        """Path object of the configuration file.

        The backing attribute is only assigned in ``setup``, so this is not
        available before ``setup`` has run.
        """
        return self._config_path

    @property
    def configuration(self) -> Configuration:
        """The ``Configuration`` holding all declared items."""
        return self._configuration

    @property
    def config_keys(self) -> list[str]:
        """Names of all declared configuration items."""
        return [item.name for item in self.configuration]

    @property
    def config_map(self) -> dict[str, str]:
        """Not implemented yet; always raises ``NotImplementedError``."""
        raise NotImplementedError()

    def _parse_config_file(self) -> SimpleLineConfigParser.Result | None:
        """Parse the configuration file with ``SimpleLineConfigParser``.

        Returns:
            The parse result, or ``None`` when ``check_self()`` on the
            configuration path is falsy (presumably the file is missing --
            TODO confirm).
        """
        if not self.config_file_path.check_self():
            return None
        parser = SimpleLineConfigParser()
        return parser.parse(self.config_file_path.full_path.read_text())

    def _check_duplicate(
        self,
        result: SimpleLineConfigParser.Result
        | dict[str, list[SimpleLineConfigParser.Item]],
    ) -> dict[str, str]:
        """Collapse parsed items into a key -> value mapping.

        Args:
            result: Either a raw parse result, which is grouped by key
                first, or an already grouped mapping of key to items.

        Returns:
            A mapping from each key to the value of its first item.

        Raises:
            AppConfigDuplicateItemsError: If any key has more than one
                item. The error carries every item after the first one of
                each key.
        """
        if isinstance(result, SimpleLineConfigParser.Result):
            result = result.cru_iter().group_by(lambda i: i.key)

        config: dict[str, str] = {}
        error_items: list[SimpleLineConfigParser.Item] = []
        for key, items in result.items():
            # The first occurrence wins; later ones are reported as duplicates.
            config[key] = items[0].value
            error_items.extend(items[1:])

        if error_items:
            raise AppConfigDuplicateItemsError("Duplicate items found.", error_items)
        return config

    def _check_config_file(self) -> dict[str, str]:
        """Not implemented yet; always raises ``NotImplementedError``."""
        # TODO: Continue here!
        raise NotImplementedError()

    def reload_config_file(self) -> bool:
        """Reset all configuration items and re-read the configuration file.

        Returns:
            ``False`` when ``_parse_config_file`` has nothing to parse,
            ``True`` once the file has been parsed.
        """
        self.configuration.reset_all()
        # NOTE: the parse result is not yet validated or applied to
        # ``self.configuration``; this only reports whether a file was parsed.
        return self._parse_config_file() is not None

    def print_app_config_info(self) -> None:
        """Print name, value type name and description of every item."""
        for item in self.configuration:
            print(f"{item.name} ({item.value_type.name}): {item.description}")

    def get_command_info(self) -> tuple[str, str]:
        """Return the command name and its help text."""
        return "config", "Manage configuration."

    def setup_arg_parser(self, arg_parser) -> None:
        """Register the ``print-app`` and ``check`` sub-commands.

        The chosen sub-command is stored in ``args.config_command``.
        """
        subparsers = arg_parser.add_subparsers(dest="config_command")
        subparsers.add_parser(
            "print-app",
            help="Print application configuration information "
            "of the items defined in the application.",
        )
        _check_config_parser = subparsers.add_parser(
            "check",
            help="Check the validity of the configuration file.",
        )
        _check_config_parser.add_argument(
            "-f",
            "--format-only",
            action="store_true",
            help="Only check content format, not "
            "for application configuration requirements.",
        )

    def run_command(self, args) -> None:
        """Dispatch the parsed ``config`` sub-command."""
        # NOTE: ``check`` is registered in ``setup_arg_parser`` but has no
        # handler here yet, so it currently does nothing.
        if args.config_command == "print-app":
            self.print_app_config_info()
|