From 6466eb468ddac95c1c685de7d9c2369e72a14294 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20F=C3=B6rtsch?= Date: Tue, 10 Mar 2026 11:56:23 +0100 Subject: [PATCH] add sync-to-cache pipeline, publish events on new threads Co-Authored-By: Claude Opus 4.6 --- backend/src/services/cache-sync.test.ts | 71 +++++++++++++++++++++++++ backend/src/services/cache-sync.ts | 51 ++++++++++++++++++ 2 files changed, 122 insertions(+) create mode 100644 backend/src/services/cache-sync.test.ts create mode 100644 backend/src/services/cache-sync.ts diff --git a/backend/src/services/cache-sync.test.ts b/backend/src/services/cache-sync.test.ts new file mode 100644 index 0000000..088fce4 --- /dev/null +++ b/backend/src/services/cache-sync.test.ts @@ -0,0 +1,71 @@ +import { describe, expect, test, beforeEach } from "bun:test"; +import { syncThreadsToCache } from "./cache-sync"; +import { createDatabase, getThreads, getMessagesForThread } from "../db/index"; +import { EventBus } from "./eventbus"; +import type { ThreadSummary, Message } from "./notmuch"; +import type { Database } from "bun:sqlite"; + +let db: Database; +let bus: EventBus; + +beforeEach(() => { + db = createDatabase(":memory:"); + bus = new EventBus(); +}); + +const mockThreads: ThreadSummary[] = [ + { + threadId: "t001", + subject: "Q1 Planning", + authors: "Alice, Bob", + totalMessages: 2, + tags: ["inbox", "unread"], + timestamp: 1709884532, + }, +]; + +const mockMessages: Message[] = [ + { + messageId: "msg001@example.com", + from: "Alice ", + to: "user@example.com", + subject: "Q1 Planning", + date: "Fri, 08 Mar 2024 10:15:32 +0100", + inReplyTo: "", + references: "", + body: "Hey, let's plan Q1.\n", + tags: ["inbox", "unread"], + timestamp: 1709884532, + }, + { + messageId: "msg003@example.com", + from: "Bob ", + to: "alice@example.com", + subject: "Re: Q1 Planning", + date: "Fri, 08 Mar 2024 10:16:40 +0100", + inReplyTo: "msg001@example.com", + references: "msg001@example.com", + body: "Sounds good.\n", + tags: ["inbox"], + timestamp: 1709884600, + }, +]; + +describe("syncThreadsToCache", () => { + test("populates database from thread/message data", () => { + const events: unknown[] = []; + bus.subscribe((e) => events.push(e)); + + syncThreadsToCache(db, bus, "personal", mockThreads, new Map([["t001", mockMessages]])); + + const threads = getThreads(db, "personal"); + expect(threads).toHaveLength(1); + expect(threads[0].subject).toBe("Q1 Planning"); + + const messages = getMessagesForThread(db, "t001"); + expect(messages).toHaveLength(2); + + expect(events.length).toBeGreaterThan(0); + expect(events[0]).toEqual(expect.objectContaining({ type: "threads_updated" })); + }); +}); diff --git a/backend/src/services/cache-sync.ts b/backend/src/services/cache-sync.ts new file mode 100644 index 0000000..c309060 --- /dev/null +++ b/backend/src/services/cache-sync.ts @@ -0,0 +1,51 @@ +import type { Database } from "bun:sqlite"; +import type { EventBus } from "./eventbus"; +import type { ThreadSummary, Message } from "./notmuch"; +import { insertThread, insertMessage } from "../db/index"; + +export function syncThreadsToCache( + db: Database, + bus: EventBus, + accountId: string, + threads: ThreadSummary[], + messagesMap: Map, +): void { + const transaction = db.transaction(() => { + for (const thread of threads) { + insertThread(db, { + threadId: thread.threadId, + subject: thread.subject, + authors: thread.authors, + totalMessages: thread.totalMessages, + tags: thread.tags.join(","), + timestamp: thread.timestamp, + accountId, + }); + + const messages = messagesMap.get(thread.threadId) ?? []; + for (const msg of messages) { + insertMessage(db, { + messageId: msg.messageId, + threadId: thread.threadId, + fromHeader: msg.from, + toHeader: msg.to, + subject: msg.subject, + date: msg.date, + inReplyTo: msg.inReplyTo, + body: msg.body, + tags: msg.tags.join(","), + timestamp: msg.timestamp, + accountId, + }); + } + } + }); + + transaction(); + + bus.publish({ + type: "threads_updated", + accountId, + threadCount: threads.length, + }); +}