aboutsummaryrefslogtreecommitdiff
path: root/deno/mail-relay/aws
diff options
context:
space:
mode:
Diffstat (limited to 'deno/mail-relay/aws')
-rw-r--r--deno/mail-relay/aws/app.ts60
-rw-r--r--deno/mail-relay/aws/fetch.ts41
2 files changed, 54 insertions, 47 deletions
diff --git a/deno/mail-relay/aws/app.ts b/deno/mail-relay/aws/app.ts
index c8a90c8..e3f1114 100644
--- a/deno/mail-relay/aws/app.ts
+++ b/deno/mail-relay/aws/app.ts
@@ -10,15 +10,16 @@ import { ConfigDefinition, ConfigProvider } from "@crupest/base/config";
import { CronTask } from "@crupest/base/cron";
import { DbService } from "../db.ts";
-import { Mail } from "../mail.ts";
+import { createHono, createInbound, createSmtp, sendMail } from "../app.ts";
+import { DovecotMailDeliverer } from "../dovecot.ts";
+import { MailDeliverer } from "../mail.ts";
import {
AwsMailMessageIdRewriteHook,
AwsMailMessageIdSaveHook,
} from "./mail.ts";
import { AwsMailDeliverer } from "./deliver.ts";
-import { AwsMailFetcher, AwsS3MailConsumer } from "./fetch.ts";
-import { createHono, createInbound, createSmtp, sendMail } from "../app.ts";
-import { DovecotMailDeliverer } from "../dovecot.ts";
+import { AwsMailFetcher, LiveMailNotFoundError } from "./fetch.ts";
+
const PREFIX = "crupest-mail-server";
const CONFIG_DEFINITIONS = {
@@ -122,15 +123,18 @@ function setupAwsHono(
options: {
path: string;
auth: string;
- callback: (s3Key: string, recipients?: string[]) => Promise<void>;
+ fetcher: AwsMailFetcher;
+ deliverer: MailDeliverer;
},
) {
+ let counter = 1;
+
hono.post(
`/${options.path}`,
async (ctx, next) => {
const auth = ctx.req.header("Authorization");
if (auth !== options.auth) {
- return ctx.json({ msg: "Bad auth!" }, 403);
+ return ctx.json({ message: "Bad auth!" }, 403);
}
await next();
},
@@ -142,19 +146,32 @@ function setupAwsHono(
}),
),
async (ctx) => {
+ const { fetcher, deliverer } = options;
const { key, recipients } = ctx.req.valid("json");
- await options.callback(key, recipients);
- return ctx.json({ msg: "Done!" });
+ try {
+ await fetcher.deliverLiveMail(
+ `[inbound ${counter++}]`,
+ key,
+ deliverer,
+ recipients,
+ );
+ } catch (e) {
+ if (e instanceof LiveMailNotFoundError) {
+ return ctx.json({ message: e.message });
+ }
+ throw e;
+ }
+ return ctx.json({ message: "Done!" });
},
);
}
-function createCron(fetcher: AwsMailFetcher, consumer: AwsS3MailConsumer) {
+function createCron(fetcher: AwsMailFetcher, deliverer: MailDeliverer) {
return new CronTask({
name: "live-mail-recycler",
interval: 6 * 3600 * 1000,
callback: () => {
- return fetcher.recycleLiveMails(consumer);
+ return fetcher.recycleLiveMails(deliverer);
},
startNow: true,
});
@@ -191,10 +208,8 @@ function createAwsRecycleOnlyServices() {
aliasFile: join(config.get("dataPath"), "aliases.csv"),
mailDomain: config.get("mailDomain"),
});
- const recycler = (rawMail: string, _: unknown): Promise<void> =>
- inbound.deliver({ mail: new Mail(rawMail) }).then();
- return { ...services, inbound, recycler };
+ return { ...services, inbound };
}
function createAwsServices() {
@@ -214,25 +229,18 @@ function createServerServices() {
const smtp = createSmtp(outbound);
const hono = createHono(outbound, inbound);
- let counter = 1;
setupAwsHono(hono, {
path: config.get("awsInboundPath"),
auth: config.get("awsInboundKey"),
- callback: (s3Key, recipients) => {
- return fetcher.consumeS3Mail(
- `[inbound ${counter++}]`,
- s3Key,
- (rawMail, _) =>
- inbound.deliver({ mail: new Mail(rawMail), recipients }).then(),
- );
- },
+ fetcher,
+ deliverer: inbound,
});
return { ...services, smtp, hono };
}
function serve(cron: boolean = false) {
- const { config, fetcher, recycler, smtp, hono } = createServerServices();
+ const { config, fetcher, inbound, smtp, hono } = createServerServices();
smtp.serve({
hostname: config.get("smtpHost"),
port: config.getInt("smtpPort"),
@@ -246,7 +254,7 @@ function serve(cron: boolean = false) {
);
if (cron) {
- createCron(fetcher, recycler);
+ createCron(fetcher, inbound);
}
}
@@ -260,8 +268,8 @@ async function listLives() {
}
async function recycleLives() {
- const { fetcher, recycler } = createAwsRecycleOnlyServices();
- await fetcher.recycleLiveMails(recycler);
+ const { fetcher, inbound } = createAwsRecycleOnlyServices();
+ await fetcher.recycleLiveMails(inbound);
}
if (import.meta.main) {
diff --git a/deno/mail-relay/aws/fetch.ts b/deno/mail-relay/aws/fetch.ts
index 34948d4..2154972 100644
--- a/deno/mail-relay/aws/fetch.ts
+++ b/deno/mail-relay/aws/fetch.ts
@@ -3,7 +3,7 @@ import {
DeleteObjectCommand,
GetObjectCommand,
ListObjectsV2Command,
- NoSuchBucket,
+ NoSuchKey,
S3Client,
S3ClientConfig,
} from "@aws-sdk/client-s3";
@@ -11,6 +11,9 @@ import {
import { DateUtils } from "@crupest/base";
import { Mail } from "../mail.ts";
+import { MailDeliverer } from "../mail.ts";
+
+export class LiveMailNotFoundError extends Error {}
async function s3MoveObject(
client: S3Client,
@@ -34,11 +37,6 @@ async function s3MoveObject(
const AWS_SES_S3_SETUP_TAG = "AMAZON_SES_SETUP_NOTIFICATION";
-export type AwsS3MailConsumer = (
- rawMail: string,
- s3Key: string,
-) => Promise<void>;
-
export class AwsMailFetcher {
readonly #livePrefix = "mail/live/";
readonly #archivePrefix = "mail/archive/";
@@ -76,12 +74,13 @@ export class AwsMailFetcher {
return result;
}
- async consumeS3Mail(
+ async deliverLiveMail(
logTag: string,
s3Key: string,
- consumer: AwsS3MailConsumer,
+ deliverer: MailDeliverer,
+ recipients?: string[],
) {
- console.info(logTag, `Fetching s3 mail ${s3Key}...`);
+ console.info(logTag, `Fetching live mail ${s3Key}...`);
const mailPath = `${this.#livePrefix}${s3Key}`;
const command = new GetObjectCommand({
Bucket: this.#bucket,
@@ -97,17 +96,17 @@ export class AwsMailFetcher {
}
rawMail = await res.Body.transformToString();
} catch (cause) {
- if (cause instanceof NoSuchBucket) {
- console.error(
- `S3 mail key ${s3Key} not found. Perhaps already consumed?`,
- );
- return;
+ if (cause instanceof NoSuchKey) {
+ const message =
+ `Live mail ${s3Key} is not found. Perhaps already delivered?`;
+ console.error(message, cause);
+ throw new LiveMailNotFoundError(message);
}
throw cause;
}
- console.info(logTag, `Calling consumer...`);
- await consumer(rawMail, s3Key);
+ const mail = new Mail(rawMail);
+ await deliverer.deliver({ mail, recipients });
const { date } = new Mail(rawMail).parsed;
const dateString = date != null
@@ -115,22 +114,22 @@ export class AwsMailFetcher {
: "invalid-date";
const newPath = `${this.#archivePrefix}${dateString}/${s3Key}`;
- console.info(logTag, `Archiving s3 mail ${s3Key} to ${newPath}...`);
+ console.info(logTag, `Archiving live mail ${s3Key} to ${newPath}...`);
await s3MoveObject(this.#s3, this.#bucket, mailPath, newPath);
- console.info(logTag, `Done consuming s3 mail ${s3Key}.`);
+ console.info(logTag, `Done deliver live mail ${s3Key}.`);
}
- async recycleLiveMails(consumer: AwsS3MailConsumer) {
+ async recycleLiveMails(deliverer: MailDeliverer) {
console.info("Begin to recycle live mails...");
const mails = await this.listLiveMails();
console.info(`Found ${mails.length} live mails`);
let counter = 1;
for (const s3Key of mails) {
- await this.consumeS3Mail(
+ await this.deliverLiveMail(
`[${counter++}/${mails.length}]`,
s3Key,
- consumer,
+ deliverer,
);
}
}