From d6ff129af41276ef7a522603b892707ac1951740 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20F=C3=B6rtsch?= Date: Tue, 10 Mar 2026 16:50:25 +0100 Subject: [PATCH] =?UTF-8?q?add=20legislation-syncer=20job:=20fetch=20upcom?= =?UTF-8?q?ing=20Vorg=C3=A4nge=20from=20DIP=20API=20hourly?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Sonnet 4.6 --- src/server/index.ts | 9 ++ .../shared/jobs/legislation-syncer.test.ts | 88 +++++++++++++++++++ src/server/shared/jobs/legislation-syncer.ts | 82 +++++++++++++++++ 3 files changed, 179 insertions(+) create mode 100644 src/server/shared/jobs/legislation-syncer.test.ts create mode 100644 src/server/shared/jobs/legislation-syncer.ts diff --git a/src/server/index.ts b/src/server/index.ts index 0ff12cf..5c6a51a 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -1,9 +1,11 @@ import app from "./app" import { getPgBoss } from "./shared/jobs/client" +import { syncLegislation } from "./shared/jobs/legislation-syncer" import { checkForNewPolls } from "./shared/jobs/poll-checker" import { env } from "./shared/lib/env" const POLL_CHECK_SCHEDULE = "*/15 * * * *" // every 15 minutes +const LEGISLATION_SYNC_SCHEDULE = "0 * * * *" // every hour // start pg-boss const boss = getPgBoss() @@ -18,6 +20,13 @@ await boss.work("poll-checker", async () => { }) console.log("[pg-boss] poll-checker cron registered (every 15 min)") +await boss.createQueue("legislation-syncer") +await boss.schedule("legislation-syncer", LEGISLATION_SYNC_SCHEDULE) +await boss.work("legislation-syncer", async () => { + await syncLegislation() +}) +console.log("[pg-boss] legislation-syncer cron registered (every 60 min)") + // graceful shutdown const shutdown = async () => { console.log("[server] shutting down…") diff --git a/src/server/shared/jobs/legislation-syncer.test.ts b/src/server/shared/jobs/legislation-syncer.test.ts new file mode 100644 index 0000000..5973819 --- /dev/null +++ b/src/server/shared/jobs/legislation-syncer.test.ts @@ -0,0 +1,88 @@ +import { beforeEach, describe, expect, it, vi } from "vitest" + +vi.mock("../db/client", () => { + const mockDb = { + select: vi.fn(), + insert: vi.fn(), + } + const chain = (result: unknown) => ({ + from: vi.fn().mockReturnValue({ + where: vi.fn().mockResolvedValue(result), + }), + }) + mockDb.select.mockReturnValue(chain([])) + mockDb.insert.mockReturnValue({ + values: vi.fn().mockReturnValue({ + onConflictDoUpdate: vi.fn().mockResolvedValue(undefined), + onConflictDoNothing: vi.fn().mockResolvedValue(undefined), + }), + }) + return { db: mockDb } +}) + +vi.mock("../lib/dip-api", () => ({ + fetchUpcomingVorgaenge: vi.fn(), + fetchVorgangDetail: vi.fn(), +})) + +const { syncLegislation } = await import("./legislation-syncer") +const { fetchUpcomingVorgaenge, fetchVorgangDetail } = await import( + "../lib/dip-api" +) + +describe("legislation-syncer", () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it("fetches upcoming Vorgänge and upserts new ones", async () => { + vi.mocked(fetchUpcomingVorgaenge).mockResolvedValue([ + { + id: 100, + titel: "Test Gesetz", + beratungsstand: "Beschlussempfehlung liegt vor", + datum: "2026-03-15", + vorgangstyp: "Gesetzgebung", + sachgebiet: ["Wirtschaft"], + }, + ]) + vi.mocked(fetchVorgangDetail).mockResolvedValue({ + titel: "Test Gesetz", + beratungsstand: "Beschlussempfehlung liegt vor", + abstract: "Zusammenfassung", + sachgebiet: ["Wirtschaft"], + datum: "2026-03-15", + }) + + await syncLegislation() + + expect(fetchUpcomingVorgaenge).toHaveBeenCalledOnce() + expect(fetchVorgangDetail).toHaveBeenCalledWith(100) + }) + + it("skips API detail call for already-cached Vorgänge", async () => { + const { db } = await import("../db/client") + // simulate existing cached entry + vi.mocked(db.select).mockReturnValue({ + from: vi.fn().mockReturnValue({ + where: vi.fn().mockResolvedValue([{ dipVorgangsId: 100 }]), + }), + // biome-ignore lint/suspicious/noExplicitAny: test mock requires flexible typing + } as any) + + vi.mocked(fetchUpcomingVorgaenge).mockResolvedValue([ + { + id: 100, + titel: "Cached Gesetz", + beratungsstand: "Beschlussempfehlung liegt vor", + datum: "2026-03-10", + vorgangstyp: "Gesetzgebung", + sachgebiet: [], + }, + ]) + + await syncLegislation() + + expect(fetchVorgangDetail).not.toHaveBeenCalled() + }) +}) diff --git a/src/server/shared/jobs/legislation-syncer.ts b/src/server/shared/jobs/legislation-syncer.ts new file mode 100644 index 0000000..307b943 --- /dev/null +++ b/src/server/shared/jobs/legislation-syncer.ts @@ -0,0 +1,82 @@ +import { inArray } from "drizzle-orm" +import { db } from "../db/client" +import { legislationTexts } from "../db/schema/legislation" +import { fetchUpcomingVorgaenge, fetchVorgangDetail } from "../lib/dip-api" + +const CACHE_TTL_MS = 24 * 60 * 60 * 1000 // 24 hours + +export async function syncLegislation() { + console.log("[legislation-syncer] starting sync…") + + const vorgaenge = await fetchUpcomingVorgaenge() + if (vorgaenge.length === 0) { + console.log("[legislation-syncer] no upcoming Vorgänge found") + return + } + + console.log( + `[legislation-syncer] found ${vorgaenge.length} upcoming Vorgänge`, + ) + + // check which ones we already have cached + const vorgangsIds = vorgaenge.map((v) => v.id) + const cached = await db + .select({ dipVorgangsId: legislationTexts.dipVorgangsId }) + .from(legislationTexts) + .where(inArray(legislationTexts.dipVorgangsId, vorgangsIds)) + + const cachedIds = new Set(cached.map((c) => c.dipVorgangsId)) + const newVorgaenge = vorgaenge.filter((v) => !cachedIds.has(v.id)) + + if (newVorgaenge.length === 0) { + console.log("[legislation-syncer] all Vorgänge already cached") + return + } + + console.log( + `[legislation-syncer] fetching detail for ${newVorgaenge.length} new Vorgänge`, + ) + + for (const vorgang of newVorgaenge) { + try { + const detail = await fetchVorgangDetail(vorgang.id) + const expiresAt = new Date(Date.now() + CACHE_TTL_MS) + + await db + .insert(legislationTexts) + .values({ + dipVorgangsId: vorgang.id, + title: detail.titel, + abstract: detail.abstract ?? null, + beratungsstand: detail.beratungsstand ?? null, + expiresAt, + }) + .onConflictDoUpdate({ + target: legislationTexts.dipVorgangsId, + set: { + title: detail.titel, + abstract: detail.abstract ?? null, + beratungsstand: detail.beratungsstand ?? null, + fetchedAt: new Date(), + expiresAt, + }, + }) + + console.log( + `[legislation-syncer] cached Vorgang ${vorgang.id}: ${detail.titel}`, + ) + } catch (err) { + console.error( + `[legislation-syncer] error fetching Vorgang ${vorgang.id}:`, + err, + ) + } + } + + // placeholder: trigger summarization job for items without summary + console.log( + "[legislation-syncer] TODO: trigger summarization for new items (Phase 2)", + ) + + console.log("[legislation-syncer] sync complete") +}