add legislation-syncer job: fetch upcoming Vorgänge from DIP API hourly

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-10 16:50:25 +01:00
parent 231eb6f6bc
commit d6ff129af4
3 changed files with 179 additions and 0 deletions

View File

@@ -1,9 +1,11 @@
import app from "./app"
import { getPgBoss } from "./shared/jobs/client"
import { syncLegislation } from "./shared/jobs/legislation-syncer"
import { checkForNewPolls } from "./shared/jobs/poll-checker"
import { env } from "./shared/lib/env"
const POLL_CHECK_SCHEDULE = "*/15 * * * *" // every 15 minutes
const LEGISLATION_SYNC_SCHEDULE = "0 * * * *" // every hour
// start pg-boss
const boss = getPgBoss()
@@ -18,6 +20,13 @@ await boss.work("poll-checker", async () => {
})
console.log("[pg-boss] poll-checker cron registered (every 15 min)")
await boss.createQueue("legislation-syncer")
await boss.schedule("legislation-syncer", LEGISLATION_SYNC_SCHEDULE)
await boss.work("legislation-syncer", async () => {
await syncLegislation()
})
console.log("[pg-boss] legislation-syncer cron registered (every 60 min)")
// graceful shutdown
const shutdown = async () => {
console.log("[server] shutting down…")

View File

@@ -0,0 +1,88 @@
import { beforeEach, describe, expect, it, vi } from "vitest"
vi.mock("../db/client", () => {
const mockDb = {
select: vi.fn(),
insert: vi.fn(),
}
const chain = (result: unknown) => ({
from: vi.fn().mockReturnValue({
where: vi.fn().mockResolvedValue(result),
}),
})
mockDb.select.mockReturnValue(chain([]))
mockDb.insert.mockReturnValue({
values: vi.fn().mockReturnValue({
onConflictDoUpdate: vi.fn().mockResolvedValue(undefined),
onConflictDoNothing: vi.fn().mockResolvedValue(undefined),
}),
})
return { db: mockDb }
})
vi.mock("../lib/dip-api", () => ({
fetchUpcomingVorgaenge: vi.fn(),
fetchVorgangDetail: vi.fn(),
}))
const { syncLegislation } = await import("./legislation-syncer")
const { fetchUpcomingVorgaenge, fetchVorgangDetail } = await import(
"../lib/dip-api"
)
describe("legislation-syncer", () => {
beforeEach(() => {
vi.clearAllMocks()
})
it("fetches upcoming Vorgänge and upserts new ones", async () => {
vi.mocked(fetchUpcomingVorgaenge).mockResolvedValue([
{
id: 100,
titel: "Test Gesetz",
beratungsstand: "Beschlussempfehlung liegt vor",
datum: "2026-03-15",
vorgangstyp: "Gesetzgebung",
sachgebiet: ["Wirtschaft"],
},
])
vi.mocked(fetchVorgangDetail).mockResolvedValue({
titel: "Test Gesetz",
beratungsstand: "Beschlussempfehlung liegt vor",
abstract: "Zusammenfassung",
sachgebiet: ["Wirtschaft"],
datum: "2026-03-15",
})
await syncLegislation()
expect(fetchUpcomingVorgaenge).toHaveBeenCalledOnce()
expect(fetchVorgangDetail).toHaveBeenCalledWith(100)
})
it("skips API detail call for already-cached Vorgänge", async () => {
const { db } = await import("../db/client")
// simulate existing cached entry
vi.mocked(db.select).mockReturnValue({
from: vi.fn().mockReturnValue({
where: vi.fn().mockResolvedValue([{ dipVorgangsId: 100 }]),
}),
// biome-ignore lint/suspicious/noExplicitAny: test mock requires flexible typing
} as any)
vi.mocked(fetchUpcomingVorgaenge).mockResolvedValue([
{
id: 100,
titel: "Cached Gesetz",
beratungsstand: "Beschlussempfehlung liegt vor",
datum: "2026-03-10",
vorgangstyp: "Gesetzgebung",
sachgebiet: [],
},
])
await syncLegislation()
expect(fetchVorgangDetail).not.toHaveBeenCalled()
})
})

View File

@@ -0,0 +1,82 @@
import { inArray } from "drizzle-orm"
import { db } from "../db/client"
import { legislationTexts } from "../db/schema/legislation"
import { fetchUpcomingVorgaenge, fetchVorgangDetail } from "../lib/dip-api"
const CACHE_TTL_MS = 24 * 60 * 60 * 1000 // 24 hours
export async function syncLegislation() {
console.log("[legislation-syncer] starting sync…")
const vorgaenge = await fetchUpcomingVorgaenge()
if (vorgaenge.length === 0) {
console.log("[legislation-syncer] no upcoming Vorgänge found")
return
}
console.log(
`[legislation-syncer] found ${vorgaenge.length} upcoming Vorgänge`,
)
// check which ones we already have cached
const vorgangsIds = vorgaenge.map((v) => v.id)
const cached = await db
.select({ dipVorgangsId: legislationTexts.dipVorgangsId })
.from(legislationTexts)
.where(inArray(legislationTexts.dipVorgangsId, vorgangsIds))
const cachedIds = new Set(cached.map((c) => c.dipVorgangsId))
const newVorgaenge = vorgaenge.filter((v) => !cachedIds.has(v.id))
if (newVorgaenge.length === 0) {
console.log("[legislation-syncer] all Vorgänge already cached")
return
}
console.log(
`[legislation-syncer] fetching detail for ${newVorgaenge.length} new Vorgänge`,
)
for (const vorgang of newVorgaenge) {
try {
const detail = await fetchVorgangDetail(vorgang.id)
const expiresAt = new Date(Date.now() + CACHE_TTL_MS)
await db
.insert(legislationTexts)
.values({
dipVorgangsId: vorgang.id,
title: detail.titel,
abstract: detail.abstract ?? null,
beratungsstand: detail.beratungsstand ?? null,
expiresAt,
})
.onConflictDoUpdate({
target: legislationTexts.dipVorgangsId,
set: {
title: detail.titel,
abstract: detail.abstract ?? null,
beratungsstand: detail.beratungsstand ?? null,
fetchedAt: new Date(),
expiresAt,
},
})
console.log(
`[legislation-syncer] cached Vorgang ${vorgang.id}: ${detail.titel}`,
)
} catch (err) {
console.error(
`[legislation-syncer] error fetching Vorgang ${vorgang.id}:`,
err,
)
}
}
// placeholder: trigger summarization job for items without summary
console.log(
"[legislation-syncer] TODO: trigger summarization for new items (Phase 2)",
)
console.log("[legislation-syncer] sync complete")
}