diff --git a/backend/src/index.ts b/backend/src/index.ts index ac3cd97..a10c1c8 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -1,9 +1,12 @@ import { Hono } from "hono"; import { cors } from "hono/cors"; import { emailRoutes } from "./routes/emails"; +import { eventRoutes } from "./routes/events"; import { createDatabase } from "./db/index"; +import { EventBus } from "./services/eventbus"; const db = createDatabase(process.env.DB_PATH ?? "magnumopus.db"); +const bus = new EventBus(); const app = new Hono(); @@ -12,6 +15,7 @@ app.use("*", cors()); app.get("/health", (c) => c.json({ status: "ok" })); app.route("/api", emailRoutes(db)); +app.route("/api", eventRoutes(bus)); export default { port: Number(process.env.PORT ?? 3000), diff --git a/backend/src/routes/events.ts b/backend/src/routes/events.ts new file mode 100644 index 0000000..5759739 --- /dev/null +++ b/backend/src/routes/events.ts @@ -0,0 +1,31 @@ +import { Hono } from "hono"; +import { streamSSE } from "hono/streaming"; +import type { EventBus } from "../services/eventbus"; + +export function eventRoutes(bus: EventBus): Hono { + const router = new Hono(); + + router.get("/events", (c) => { + return streamSSE(c, async (stream) => { + let id = 0; + const unsubscribe = bus.subscribe(async (event) => { + await stream.writeSSE({ + data: JSON.stringify(event), + event: event.type as string, + id: String(id++), + }); + }); + + stream.onAbort(() => { + unsubscribe(); + }); + + // keep connection alive + while (true) { + await stream.sleep(30000); + } + }); + }); + + return router; +} diff --git a/backend/src/services/eventbus.test.ts b/backend/src/services/eventbus.test.ts new file mode 100644 index 0000000..0a5f853 --- /dev/null +++ b/backend/src/services/eventbus.test.ts @@ -0,0 +1,27 @@ +import { describe, expect, test } from "bun:test"; +import { EventBus } from "./eventbus"; + +describe("EventBus", () => { + test("subscribers receive published events", () => { + const bus = new EventBus(); + const received: unknown[] = []; + + bus.subscribe((event) => received.push(event)); + bus.publish({ type: "new_mail", threadId: "t001" }); + + expect(received).toHaveLength(1); + expect(received[0]).toEqual({ type: "new_mail", threadId: "t001" }); + }); + + test("unsubscribe stops receiving events", () => { + const bus = new EventBus(); + const received: unknown[] = []; + + const unsubscribe = bus.subscribe((event) => received.push(event)); + bus.publish({ type: "a" }); + unsubscribe(); + bus.publish({ type: "b" }); + + expect(received).toHaveLength(1); + }); +}); diff --git a/backend/src/services/eventbus.ts b/backend/src/services/eventbus.ts new file mode 100644 index 0000000..7f3c52a --- /dev/null +++ b/backend/src/services/eventbus.ts @@ -0,0 +1,18 @@ +export type EventHandler = (event: Record) => void; + +export class EventBus { + private subscribers = new Set(); + + subscribe(handler: EventHandler): () => void { + this.subscribers.add(handler); + return () => { + this.subscribers.delete(handler); + }; + } + + publish(event: Record): void { + for (const handler of this.subscribers) { + handler(event); + } + } +}