diff options
author | Yuqian Yang <crupest@crupest.life> | 2025-06-30 14:25:02 +0800 |
---|---|---|
committer | Yuqian Yang <crupest@crupest.life> | 2025-06-30 14:25:02 +0800 |
commit | 5f00fdd06fa51bb0c82928e060d3f8811bc171e9 (patch) | |
tree | 4aaf446eeb120ccf62bbbd2696d576a3235c7f4f /deno/mail/aws | |
parent | 7178983a5b8a27d5f0f5b447528f0e23b50d75c1 (diff) | |
download | crupest-5f00fdd06fa51bb0c82928e060d3f8811bc171e9.tar.gz crupest-5f00fdd06fa51bb0c82928e060d3f8811bc171e9.tar.bz2 crupest-5f00fdd06fa51bb0c82928e060d3f8811bc171e9.zip |
mail: revert removing.
Diffstat (limited to 'deno/mail/aws')
-rw-r--r-- | deno/mail/aws/app.ts | 315 | ||||
-rw-r--r-- | deno/mail/aws/deliver.ts | 63 | ||||
-rw-r--r-- | deno/mail/aws/fetch.ts | 136 | ||||
-rw-r--r-- | deno/mail/aws/mail.ts | 59 |
4 files changed, 573 insertions, 0 deletions
diff --git a/deno/mail/aws/app.ts b/deno/mail/aws/app.ts new file mode 100644 index 0000000..3c8305d --- /dev/null +++ b/deno/mail/aws/app.ts @@ -0,0 +1,315 @@ +import { join } from "@std/path"; +import { z } from "zod"; +import { Hono } from "hono"; +import { zValidator } from "@hono/zod-validator"; +import { FetchHttpHandler } from "@smithy/fetch-http-handler"; +// @ts-types="npm:@types/yargs" +import yargs from "yargs"; + +import { ConfigDefinition, ConfigProvider } from "@crupest/base/config"; +import { CronTask } from "@crupest/base/cron"; + +import { DbService } from "../db.ts"; +import { createHono, createInbound, createSmtp, sendMail } from "../app.ts"; +import { DovecotMailDeliverer } from "../dovecot.ts"; +import { MailDeliverer } from "../mail.ts"; +import { + AwsMailMessageIdRewriteHook, + AwsMailMessageIdSaveHook, +} from "./mail.ts"; +import { AwsMailDeliverer } from "./deliver.ts"; +import { AwsMailFetcher, LiveMailNotFoundError } from "./fetch.ts"; + + +const PREFIX = "crupest-mail-server"; +const CONFIG_DEFINITIONS = { + dataPath: { + description: "Path to save app persistent data.", + default: ".", + }, + mailDomain: { + description: + "The part after `@` of an address. Used to determine local recipients.", + }, + httpHost: { + description: "Listening address for http server.", + default: "0.0.0.0", + }, + httpPort: { description: "Listening port for http server.", default: "2345" }, + smtpHost: { + description: "Listening address for dumb smtp server.", + default: "127.0.0.1", + }, + smtpPort: { + description: "Listening port for dumb smtp server.", + default: "2346", + }, + ldaPath: { + description: "full path of lda executable", + default: "/dovecot/libexec/dovecot/dovecot-lda", + }, + doveadmPath: { + description: "full path of doveadm executable", + default: "/dovecot/bin/doveadm", + }, + inboundFallback: { + description: "comma separated addresses used as fallback recipients", + default: "", + }, + awsInboundPath: { + description: "(random set) path for aws sns", + }, + awsInboundKey: { + description: "(random set) http header Authorization for aws sns", + }, + awsRegion: { + description: "aws region", + }, + awsUser: { + description: "aws access key id", + }, + awsPassword: { + description: "aws secret access key", + secret: true, + }, + awsMailBucket: { + description: "aws s3 bucket saving raw mails", + secret: true, + }, +} as const satisfies ConfigDefinition; + +function createAwsOptions({ + user, + password, + region, +}: { + user: string; + password: string; + region: string; +}) { + return { + credentials: () => + Promise.resolve({ + accessKeyId: user, + secretAccessKey: password, + }), + requestHandler: new FetchHttpHandler(), + region, + }; +} + +function createOutbound( + awsOptions: ReturnType<typeof createAwsOptions>, + db: DbService, + local?: DovecotMailDeliverer, +) { + const deliverer = new AwsMailDeliverer(awsOptions); + deliverer.preHooks.push( + new AwsMailMessageIdRewriteHook(db.messageIdToAws.bind(db)), + ); + deliverer.postHooks.push( + new AwsMailMessageIdSaveHook( + async (original, aws, context) => { + await db.addMessageIdMap({ message_id: original, aws_message_id: aws }); + void local?.saveNewSent(context.logTag, context.mail, original); + }, + ), + ); + return deliverer; +} + +function setupAwsHono( + hono: Hono, + options: { + path: string; + auth: string; + fetcher: AwsMailFetcher; + deliverer: MailDeliverer; + }, +) { + let counter = 1; + + hono.post( + `/${options.path}`, + async (ctx, next) => { + const auth = ctx.req.header("Authorization"); + if (auth !== options.auth) { + return ctx.json({ message: "Bad auth!" }, 403); + } + await next(); + }, + zValidator( + "json", + z.object({ + key: z.string(), + recipients: z.optional(z.array(z.string())), + }), + ), + async (ctx) => { + const { fetcher, deliverer } = options; + const { key, recipients } = ctx.req.valid("json"); + try { + await fetcher.deliverLiveMail( + `[inbound ${counter++}]`, + key, + deliverer, + recipients, + ); + } catch (e) { + if (e instanceof LiveMailNotFoundError) { + return ctx.json({ message: e.message }); + } + throw e; + } + return ctx.json({ message: "Done!" }); + }, + ); +} + +function createCron(fetcher: AwsMailFetcher, deliverer: MailDeliverer) { + return new CronTask({ + name: "live-mail-recycler", + interval: 6 * 3600 * 1000, + callback: () => { + return fetcher.recycleLiveMails(deliverer); + }, + startNow: true, + }); +} + +function createBaseServices() { + const config = new ConfigProvider(PREFIX, CONFIG_DEFINITIONS); + Deno.mkdirSync(config.get("dataPath"), { recursive: true }); + return { config }; +} + +function createAwsFetchOnlyServices() { + const services = createBaseServices(); + const { config } = services; + + const awsOptions = createAwsOptions({ + user: config.get("awsUser"), + password: config.get("awsPassword"), + region: config.get("awsRegion"), + }); + const fetcher = new AwsMailFetcher(awsOptions, config.get("awsMailBucket")); + + return { ...services, awsOptions, fetcher }; +} + +function createAwsRecycleOnlyServices() { + const services = createAwsFetchOnlyServices(); + const { config } = services; + + const inbound = createInbound({ + fallback: config.getList("inboundFallback"), + ldaPath: config.get("ldaPath"), + doveadmPath: config.get("doveadmPath"), + aliasFile: join(config.get("dataPath"), "aliases.csv"), + mailDomain: config.get("mailDomain"), + }); + + return { ...services, inbound }; +} + +function createAwsServices() { + const services = createAwsRecycleOnlyServices(); + const { config, awsOptions, inbound } = services; + + const dbService = new DbService(join(config.get("dataPath"), "db.sqlite")); + const outbound = createOutbound(awsOptions, dbService, inbound); + + return { ...services, dbService, outbound }; +} + +function createServerServices() { + const services = createAwsServices(); + const { config, outbound, inbound, fetcher } = services; + + const smtp = createSmtp(outbound); + const hono = createHono(outbound, inbound); + + setupAwsHono(hono, { + path: config.get("awsInboundPath"), + auth: config.get("awsInboundKey"), + fetcher, + deliverer: inbound, + }); + + return { ...services, smtp, hono }; +} + +function serve(cron: boolean = false) { + const { config, fetcher, inbound, smtp, hono } = createServerServices(); + smtp.serve({ + hostname: config.get("smtpHost"), + port: config.getInt("smtpPort"), + }); + Deno.serve( + { + hostname: config.get("httpHost"), + port: config.getInt("httpPort"), + }, + hono.fetch, + ); + + if (cron) { + createCron(fetcher, inbound); + } +} + +async function listLives() { + const { fetcher } = createAwsFetchOnlyServices(); + const liveMails = await fetcher.listLiveMails(); + console.info(`Total ${liveMails.length}:`); + if (liveMails.length !== 0) { + console.info(liveMails.join("\n")); + } +} + +async function recycleLives() { + const { fetcher, inbound } = createAwsRecycleOnlyServices(); + await fetcher.recycleLiveMails(inbound); +} + +if (import.meta.main) { + await yargs(Deno.args) + .scriptName("mail") + .command({ + command: "sendmail", + describe: "send mail via this server's endpoint", + handler: async (_argv) => { + const { config } = createBaseServices(); + await sendMail(config.getInt("httpPort")); + }, + }) + .command({ + command: "live", + describe: "work with live mails", + builder: (builder) => { + return builder + .command({ + command: "list", + describe: "list live mails", + handler: listLives, + }) + .command({ + command: "recycle", + describe: "recycle all live mails", + handler: recycleLives, + }) + .demandCommand(1, "One command must be specified."); + }, + handler: () => {}, + }) + .command({ + command: "serve", + describe: "start the http and smtp servers", + builder: (builder) => builder.option("real", { type: "boolean" }), + handler: (argv) => serve(argv.real), + }) + .demandCommand(1, "One command must be specified.") + .help() + .strict() + .parse(); +} diff --git a/deno/mail/aws/deliver.ts b/deno/mail/aws/deliver.ts new file mode 100644 index 0000000..0195369 --- /dev/null +++ b/deno/mail/aws/deliver.ts @@ -0,0 +1,63 @@ +import { + SendEmailCommand, + SESv2Client, + SESv2ClientConfig, +} from "@aws-sdk/client-sesv2"; + +import { Mail, MailDeliverContext, MailDeliverer } from "../mail.ts"; + +declare module "../mail.ts" { + interface MailDeliverResult { + awsMessageId?: string; + } +} + +export class AwsMailDeliverer extends MailDeliverer { + readonly name = "aws"; + readonly #aws; + readonly #ses; + + constructor(aws: SESv2ClientConfig) { + super(true); + this.#aws = aws; + this.#ses = new SESv2Client(aws); + } + + protected override async doDeliver( + mail: Mail, + context: MailDeliverContext, + ): Promise<void> { + try { + const sendCommand = new SendEmailCommand({ + Content: { + Raw: { Data: mail.toUtf8Bytes() }, + }, + }); + + console.info(context.logTag, "Calling aws send-email api..."); + const res = await this.#ses.send(sendCommand); + if (res.MessageId == null) { + console.warn( + context.logTag, + "AWS send-email returned null message id.", + ); + } else { + context.result.awsMessageId = + `${res.MessageId}@${this.#aws.region}.amazonses.com`; + } + + context.result.smtpMessage = + `AWS Message ID: ${context.result.awsMessageId}`; + context.result.recipients.set("*", { + kind: "success", + message: `Succeeded to call aws send-email api.`, + }); + } catch (cause) { + context.result.recipients.set("*", { + kind: "failure", + message: "A JS error was thrown when calling aws send-email." + cause, + cause, + }); + } + } +} diff --git a/deno/mail/aws/fetch.ts b/deno/mail/aws/fetch.ts new file mode 100644 index 0000000..2154972 --- /dev/null +++ b/deno/mail/aws/fetch.ts @@ -0,0 +1,136 @@ +import { + CopyObjectCommand, + DeleteObjectCommand, + GetObjectCommand, + ListObjectsV2Command, + NoSuchKey, + S3Client, + S3ClientConfig, +} from "@aws-sdk/client-s3"; + +import { DateUtils } from "@crupest/base"; + +import { Mail } from "../mail.ts"; +import { MailDeliverer } from "../mail.ts"; + +export class LiveMailNotFoundError extends Error {} + +async function s3MoveObject( + client: S3Client, + bucket: string, + path: string, + newPath: string, +): Promise<void> { + const copyCommand = new CopyObjectCommand({ + Bucket: bucket, + Key: newPath, + CopySource: `${bucket}/${path}`, + }); + await client.send(copyCommand); + + const deleteCommand = new DeleteObjectCommand({ + Bucket: bucket, + Key: path, + }); + await client.send(deleteCommand); +} + +const AWS_SES_S3_SETUP_TAG = "AMAZON_SES_SETUP_NOTIFICATION"; + +export class AwsMailFetcher { + readonly #livePrefix = "mail/live/"; + readonly #archivePrefix = "mail/archive/"; + readonly #s3; + readonly #bucket; + + constructor(aws: S3ClientConfig, bucket: string) { + this.#s3 = new S3Client(aws); + this.#bucket = bucket; + } + + async listLiveMails(): Promise<string[]> { + const listCommand = new ListObjectsV2Command({ + Bucket: this.#bucket, + Prefix: this.#livePrefix, + }); + const res = await this.#s3.send(listCommand); + + if (res.Contents == null) { + console.warn("S3 API returned null Content."); + return []; + } + + const result: string[] = []; + for (const object of res.Contents) { + if (object.Key == null) { + console.warn("S3 API returned null Key."); + continue; + } + + if (object.Key.endsWith(AWS_SES_S3_SETUP_TAG)) continue; + + result.push(object.Key.slice(this.#livePrefix.length)); + } + return result; + } + + async deliverLiveMail( + logTag: string, + s3Key: string, + deliverer: MailDeliverer, + recipients?: string[], + ) { + console.info(logTag, `Fetching live mail ${s3Key}...`); + const mailPath = `${this.#livePrefix}${s3Key}`; + const command = new GetObjectCommand({ + Bucket: this.#bucket, + Key: mailPath, + }); + + let rawMail; + + try { + const res = await this.#s3.send(command); + if (res.Body == null) { + throw new Error("S3 API returns a null body."); + } + rawMail = await res.Body.transformToString(); + } catch (cause) { + if (cause instanceof NoSuchKey) { + const message = + `Live mail ${s3Key} is not found. Perhaps already delivered?`; + console.error(message, cause); + throw new LiveMailNotFoundError(message); + } + throw cause; + } + + const mail = new Mail(rawMail); + await deliverer.deliver({ mail, recipients }); + + const { date } = new Mail(rawMail).parsed; + const dateString = date != null + ? DateUtils.toFileNameString(date, true) + : "invalid-date"; + const newPath = `${this.#archivePrefix}${dateString}/${s3Key}`; + + console.info(logTag, `Archiving live mail ${s3Key} to ${newPath}...`); + await s3MoveObject(this.#s3, this.#bucket, mailPath, newPath); + + console.info(logTag, `Done deliver live mail ${s3Key}.`); + } + + async recycleLiveMails(deliverer: MailDeliverer) { + console.info("Begin to recycle live mails..."); + const mails = await this.listLiveMails(); + console.info(`Found ${mails.length} live mails`); + let counter = 1; + for (const s3Key of mails) { + await this.deliverLiveMail( + `[${counter++}/${mails.length}]`, + s3Key, + deliverer, + ); + } + } +} diff --git a/deno/mail/aws/mail.ts b/deno/mail/aws/mail.ts new file mode 100644 index 0000000..26f3ea0 --- /dev/null +++ b/deno/mail/aws/mail.ts @@ -0,0 +1,59 @@ +import { MailDeliverContext, MailDeliverHook } from "../mail.ts"; + +export class AwsMailMessageIdRewriteHook implements MailDeliverHook { + readonly #lookup; + + constructor(lookup: (origin: string) => Promise<string | null>) { + this.#lookup = lookup; + } + + async callback(context: MailDeliverContext): Promise<void> { + const addresses = context.mail.simpleFindAllAddresses(); + for (const address of addresses) { + const awsMessageId = await this.#lookup(address); + if (awsMessageId != null && awsMessageId.length !== 0) { + console.info( + context.logTag, + `Rewrite address-line string in mail: ${address} => ${awsMessageId}.`, + ); + context.mail.raw = context.mail.raw.replaceAll(address, awsMessageId); + } + } + } +} + +export class AwsMailMessageIdSaveHook implements MailDeliverHook { + readonly #record; + + constructor( + record: ( + original: string, + aws: string, + context: MailDeliverContext, + ) => Promise<void>, + ) { + this.#record = record; + } + + async callback(context: MailDeliverContext): Promise<void> { + const { messageId } = context.mail.parsed; + if (messageId == null) { + console.warn( + context.logTag, + "Original mail doesn't have message id, skip saving message id map.", + ); + return; + } + if (context.result.awsMessageId != null) { + console.info( + context.logTag, + `Save message id map: ${messageId} => ${context.result.awsMessageId}.`, + ); + context.mail.raw = context.mail.raw.replaceAll( + messageId, + context.result.awsMessageId, + ); + await this.#record(messageId, context.result.awsMessageId, context); + } + } +} |