add sse event bus, real-time event endpoint
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,9 +1,12 @@
|
||||
import { Hono } from "hono";
|
||||
import { cors } from "hono/cors";
|
||||
import { emailRoutes } from "./routes/emails";
|
||||
import { eventRoutes } from "./routes/events";
|
||||
import { createDatabase } from "./db/index";
|
||||
import { EventBus } from "./services/eventbus";
|
||||
|
||||
const db = createDatabase(process.env.DB_PATH ?? "magnumopus.db");
|
||||
const bus = new EventBus();
|
||||
|
||||
const app = new Hono();
|
||||
|
||||
@@ -12,6 +15,7 @@ app.use("*", cors());
|
||||
app.get("/health", (c) => c.json({ status: "ok" }));
|
||||
|
||||
app.route("/api", emailRoutes(db));
|
||||
app.route("/api", eventRoutes(bus));
|
||||
|
||||
export default {
|
||||
port: Number(process.env.PORT ?? 3000),
|
||||
|
||||
31
backend/src/routes/events.ts
Normal file
31
backend/src/routes/events.ts
Normal file
@@ -0,0 +1,31 @@
|
||||
import { Hono } from "hono";
|
||||
import { streamSSE } from "hono/streaming";
|
||||
import type { EventBus } from "../services/eventbus";
|
||||
|
||||
export function eventRoutes(bus: EventBus): Hono {
|
||||
const router = new Hono();
|
||||
|
||||
router.get("/events", (c) => {
|
||||
return streamSSE(c, async (stream) => {
|
||||
let id = 0;
|
||||
const unsubscribe = bus.subscribe(async (event) => {
|
||||
await stream.writeSSE({
|
||||
data: JSON.stringify(event),
|
||||
event: event.type as string,
|
||||
id: String(id++),
|
||||
});
|
||||
});
|
||||
|
||||
stream.onAbort(() => {
|
||||
unsubscribe();
|
||||
});
|
||||
|
||||
// keep connection alive
|
||||
while (true) {
|
||||
await stream.sleep(30000);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
return router;
|
||||
}
|
||||
27
backend/src/services/eventbus.test.ts
Normal file
27
backend/src/services/eventbus.test.ts
Normal file
@@ -0,0 +1,27 @@
|
||||
import { describe, expect, test } from "bun:test";
|
||||
import { EventBus } from "./eventbus";
|
||||
|
||||
describe("EventBus", () => {
|
||||
test("subscribers receive published events", () => {
|
||||
const bus = new EventBus();
|
||||
const received: unknown[] = [];
|
||||
|
||||
bus.subscribe((event) => received.push(event));
|
||||
bus.publish({ type: "new_mail", threadId: "t001" });
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0]).toEqual({ type: "new_mail", threadId: "t001" });
|
||||
});
|
||||
|
||||
test("unsubscribe stops receiving events", () => {
|
||||
const bus = new EventBus();
|
||||
const received: unknown[] = [];
|
||||
|
||||
const unsubscribe = bus.subscribe((event) => received.push(event));
|
||||
bus.publish({ type: "a" });
|
||||
unsubscribe();
|
||||
bus.publish({ type: "b" });
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
});
|
||||
});
|
||||
18
backend/src/services/eventbus.ts
Normal file
18
backend/src/services/eventbus.ts
Normal file
@@ -0,0 +1,18 @@
|
||||
export type EventHandler = (event: Record<string, unknown>) => void;
|
||||
|
||||
export class EventBus {
|
||||
private subscribers = new Set<EventHandler>();
|
||||
|
||||
subscribe(handler: EventHandler): () => void {
|
||||
this.subscribers.add(handler);
|
||||
return () => {
|
||||
this.subscribers.delete(handler);
|
||||
};
|
||||
}
|
||||
|
||||
publish(event: Record<string, unknown>): void {
|
||||
for (const handler of this.subscribers) {
|
||||
handler(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user