diff options
author | Yuqian Yang <crupest@crupest.life> | 2025-06-05 22:30:51 +0800 |
---|---|---|
committer | Yuqian Yang <crupest@crupest.life> | 2025-06-09 21:48:00 +0800 |
commit | 3bdca0b90cf8bf5dfd6ff1ab482d857abb4acd2d (patch) | |
tree | 42fd1bf1f0119910c09542fbf475c012404658fd /deno/mail-relay/aws/app.ts | |
parent | 543fc733da074751e1750603df6931089efab465 (diff) | |
download | crupest-3bdca0b90cf8bf5dfd6ff1ab482d857abb4acd2d.tar.gz crupest-3bdca0b90cf8bf5dfd6ff1ab482d857abb4acd2d.tar.bz2 crupest-3bdca0b90cf8bf5dfd6ff1ab482d857abb4acd2d.zip |
feat(deno): move deno (mail-server) to top level.
Diffstat (limited to 'deno/mail-relay/aws/app.ts')
-rw-r--r-- | deno/mail-relay/aws/app.ts | 380 |
1 files changed, 272 insertions, 108 deletions
diff --git a/deno/mail-relay/aws/app.ts b/deno/mail-relay/aws/app.ts index 1fda64e..685d7a9 100644 --- a/deno/mail-relay/aws/app.ts +++ b/deno/mail-relay/aws/app.ts @@ -1,113 +1,266 @@ +import { join } from "@std/path"; import { parseArgs } from "@std/cli"; import { z } from "zod"; +import { Hono } from "hono"; import { zValidator } from "@hono/zod-validator"; +import { FetchHttpHandler } from "@smithy/fetch-http-handler"; -import log from "../log.ts"; -import config from "../config.ts"; -import { AppBase } from "../app.ts"; -import { AwsContext } from "./context.ts"; +import { Logger } from "@crupest/base/log"; +import { ConfigDefinition, ConfigProvider } from "@crupest/base/config"; +import { CronTask } from "@crupest/base/cron"; + +import { DbService } from "../db.ts"; +import { Mail } from "../mail.ts"; import { - AwsMailDeliverer, AwsMailMessageIdRewriteHook, AwsMailMessageIdSaveHook, -} from "./deliver.ts"; -import { AwsMailRetriever } from "./retriever.ts"; - -export class AwsRelayApp extends AppBase { - readonly #aws = new AwsContext(); - readonly #retriever; - protected readonly outboundDeliverer = new AwsMailDeliverer(this.#aws); - - constructor() { - super(); - this.#retriever = new AwsMailRetriever(this.#aws, this.inboundDeliverer); - - this.outboundDeliverer.preHooks.push( - new AwsMailMessageIdRewriteHook(this.db), - ); - this.outboundDeliverer.postHooks.push( - new AwsMailMessageIdSaveHook(this.db), - ); - - this.hono.post( - `/${config.get("awsInboundPath")}`, - async (ctx, next) => { - const auth = ctx.req.header("Authorization"); - if (auth !== config.get("awsInboundKey")) { - return ctx.json({ "msg": "Bad auth!" }, 403); - } - await next(); - }, - zValidator( - "json", - z.object({ - key: z.string(), - recipients: z.optional(z.array(z.string())), - }), - ), - async (ctx) => { - const { key, recipients } = ctx.req.valid("json"); - await this.#retriever.deliverS3Mail(key, recipients); - return ctx.json({ "msg": "Done!" }); - }, - ); - } +} from "./mail.ts"; +import { AwsMailDeliverer } from "./deliver.ts"; +import { AwsMailFetcher, AwsS3MailConsumer } from "./fetch.ts"; +import { createInbound, createHono, sendMail, createSmtp } from "../app.ts"; - realServe() { - this.createCron({ - name: "live-mail-recycler", - interval: 6 * 3600 * 1000, - callback: () => { - return this.#retriever.recycleLiveMails(); - }, - startNow: true, - }); - - return this.serve(); - } +const PREFIX = "crupest-mail-server"; +const CONFIG_DEFINITIONS = { + dataPath: { + description: "Path to save app persistent data.", + default: ".", + }, + mailDomain: { + description: + "The part after `@` of an address. Used to determine local recipients.", + }, + httpHost: { + description: "Listening address for http server.", + default: "0.0.0.0", + }, + httpPort: { description: "Listening port for http server.", default: "2345" }, + smtpHost: { + description: "Listening address for dumb smtp server.", + default: "127.0.0.1", + }, + smtpPort: { + description: "Listening port for dumb smtp server.", + default: "2346", + }, + ldaPath: { + description: "full path of lda executable", + default: "/dovecot/libexec/dovecot/dovecot-lda", + }, + inboundFallback: { + description: "comma separated addresses used as fallback recipients", + default: "", + }, + awsInboundPath: { + description: "(random set) path for aws sns", + }, + awsInboundKey: { + description: "(random set) http header Authorization for aws sns", + }, + awsRegion: { + description: "aws region", + }, + awsUser: { + description: "aws access key id", + }, + awsPassword: { + description: "aws secret access key", + secret: true, + }, + awsMailBucket: { + description: "aws s3 bucket saving raw mails", + secret: true, + }, +} as const satisfies ConfigDefinition; + +function createAwsOptions({ + user, + password, + region, +}: { + user: string; + password: string; + region: string; +}) { + return { + credentials: () => + Promise.resolve({ + accessKeyId: user, + secretAccessKey: password, + }), + requestHandler: new FetchHttpHandler(), + region, + }; +} - readonly cli = { - "init": (_: unknown) => { - log.info("Just init!"); - return Promise.resolve(); +function createOutbound( + logger: Logger, + awsOptions: ReturnType<typeof createAwsOptions>, + db: DbService, +) { + const deliverer = new AwsMailDeliverer(logger, awsOptions); + deliverer.preHooks.push( + new AwsMailMessageIdRewriteHook(db.messageIdToAws.bind(db)), + ); + deliverer.postHooks.push( + new AwsMailMessageIdSaveHook((original, aws) => + db.addMessageIdMap({ message_id: original, aws_message_id: aws }).then(), + ), + ); + return deliverer; +} + +function setupAwsHono( + hono: Hono, + options: { + path: string; + auth: string; + callback: (s3Key: string, recipients?: string[]) => Promise<void>; + }, +) { + hono.post( + `/${options.path}`, + async (ctx, next) => { + const auth = ctx.req.header("Authorization"); + if (auth !== options.auth) { + return ctx.json({ msg: "Bad auth!" }, 403); + } + await next(); }, - "list-lives": async (_: unknown) => { - const liveMails = await this.#retriever.listLiveMails(); - log.info(`Total ${liveMails.length}:`); - log.info(liveMails.join("\n")); + zValidator( + "json", + z.object({ + key: z.string(), + recipients: z.optional(z.array(z.string())), + }), + ), + async (ctx) => { + const { key, recipients } = ctx.req.valid("json"); + await options.callback(key, recipients); + return ctx.json({ msg: "Done!" }); }, - "recycle-lives": async (_: unknown) => { - await this.#retriever.recycleLiveMails(); + ); +} + +function createCron(fetcher: AwsMailFetcher, consumer: AwsS3MailConsumer) { + return new CronTask({ + name: "live-mail-recycler", + interval: 6 * 3600 * 1000, + callback: () => { + return fetcher.recycleLiveMails(consumer); }, - "serve": async (_: unknown) => { - await this.serve().http.finished; + startNow: true, + }); +} + +function createBaseServices() { + const config = new ConfigProvider(PREFIX, CONFIG_DEFINITIONS); + Deno.mkdirSync(config.get("dataPath"), { recursive: true }); + const logger = new Logger(); + logger.externalLogDir = join(config.get("dataPath"), "log"); + return { config, logger }; +} + +function createAwsFetchOnlyServices() { + const { config, logger } = createBaseServices(); + const awsOptions = createAwsOptions({ + user: config.get("awsUser"), + password: config.get("awsPassword"), + region: config.get("awsRegion"), + }); + const fetcher = new AwsMailFetcher( + logger, + awsOptions, + config.get("awsMailBucket"), + ); + return { config, logger, awsOptions, fetcher }; +} + +function createAwsRecycleOnlyServices() { + const { config, logger, awsOptions, fetcher } = createAwsFetchOnlyServices(); + + const inbound = createInbound(logger, { + fallback: config.getList("inboundFallback"), + ldaPath: config.get("ldaPath"), + aliasFile: join(config.get("dataPath"), "aliases.csv"), + mailDomain: config.get("mailDomain"), + }); + + const recycler = (rawMail: string, _: unknown): Promise<void> => + inbound.deliver({ mail: new Mail(rawMail) }).then(); + + return { config, logger, awsOptions, fetcher, inbound, recycler }; +} +function createAwsServices() { + const { config, logger, inbound, awsOptions, fetcher, recycler } = + createAwsRecycleOnlyServices(); + const dbService = new DbService(join(config.get("dataPath"), "db.sqlite")); + const outbound = createOutbound(logger, awsOptions, dbService); + + return { + config, + logger, + inbound, + dbService, + awsOptions, + fetcher, + recycler, + outbound, + }; +} + +function createServerServices() { + const services = createAwsServices(); + const { logger, config, outbound, inbound, fetcher } = services; + const smtp = createSmtp(logger, outbound); + + const hono = createHono(logger, outbound, inbound); + setupAwsHono(hono, { + path: config.get("awsInboundPath"), + auth: config.get("awsInboundKey"), + callback: (s3Key, recipients) => { + return fetcher.consumeS3Mail(s3Key, (rawMail, _) => + inbound.deliver({ mail: new Mail(rawMail), recipients }).then(), + ); }, - "real-serve": async (_: unknown) => { - await this.realServe().http.finished; + }); + + return { + ...services, + smtp, + hono, + }; +} + +function serve(cron: boolean = false) { + const { config, fetcher, recycler, smtp, hono } = createServerServices(); + smtp.serve({ + hostname: config.get("smtpHost"), + port: config.getInt("smtpPort"), + }); + Deno.serve( + { + hostname: config.get("httpHost"), + port: config.getInt("httpPort"), }, - } as const; + hono.fetch, + ); + + if (cron) { + createCron(fetcher, recycler); + } } -const nonServerCli = { - "sendmail": async (_: unknown) => { - const decoder = new TextDecoder(); - let text = ""; - for await (const chunk of Deno.stdin.readable) { - text += decoder.decode(chunk); - } +async function listLives() { + const { logger, fetcher } = createAwsFetchOnlyServices(); + const liveMails = await fetcher.listLiveMails(); + logger.info(`Total ${liveMails.length}:`); + logger.info(liveMails.join("\n")); +} - const res = await fetch( - `http://localhost:${config.HTTP_PORT}/send/raw`, - { - method: "post", - body: text, - }, - ); - log.infoOrError(!res.ok, res); - log.infoOrError(!res.ok, "Body\n" + await res.text()); - if (!res.ok) Deno.exit(-1); - }, -} as const; +async function recycleLives() { + const { fetcher, recycler } = createAwsRecycleOnlyServices(); + await fetcher.recycleLiveMails(recycler); +} if (import.meta.main) { const args = parseArgs(Deno.args); @@ -116,21 +269,32 @@ if (import.meta.main) { throw new Error("You must specify a command."); } - const command = args._[0]; - - if (command in nonServerCli) { - log.info(`Run non-server command ${command}.`); - await nonServerCli[command as keyof typeof nonServerCli](args); - Deno.exit(0); - } + const command = String(args._[0]); - const app = new AwsRelayApp(); - await app.setup(); - if (command in app.cli) { - log.info(`Run command ${command}.`); - await app.cli[command as keyof AwsRelayApp["cli"]](args); - Deno.exit(0); - } else { - throw new Error(command + " is not a valid command."); + switch (command) { + case "sendmail": { + const { logger, config } = createBaseServices(); + await sendMail(logger, config.getInt("httpPort")); + break; + } + case "list-lives": { + await listLives(); + break; + } + case "recycle-lives": { + await recycleLives(); + break; + } + case "serve": { + serve(); + break; + } + case "real-serve": { + serve(true); + break; + } + default: { + throw new Error(command + " is not a valid command."); + } } } |