import type { Database } from "bun:sqlite";
import { getAllConfig, getDb } from "../db/index";
import { log, warn } from "../lib/log";
import { getItem, type JellyfinConfig } from "./jellyfin";
import { loadLibrary as loadRadarrLibrary, isUsable as radarrUsable } from "./radarr";
import { type RescanConfig, type RescanResult, upsertJellyfinItem } from "./rescan";
import { loadLibrary as loadSonarrLibrary, isUsable as sonarrUsable } from "./sonarr";

/**
 * Events we care about. Jellyfin's Webhook plugin (jellyfin-plugin-webhook)
 * only exposes ItemAdded as a library-side notification — there is no
 * ItemUpdated or Library.ItemUpdated. File rewrites on existing items
 * produce zero MQTT traffic, so we can't observe them here; the UI's
 * post-job verification runs off our own ffprobe instead.
 *
 * Payload fields are PascalCase (NotificationType, ItemId, ItemType) — the
 * earlier camelCase in this handler matched nothing the plugin ever sends.
 */
const ACCEPTED_EVENTS = new Set(["ItemAdded"]);

/** Item kinds that are re-analyzed; every other ItemType is rejected. */
const ACCEPTED_TYPES = new Set(["Movie", "Episode"]);

/** 5-second dedupe window: Jellyfin can fire the same ItemAdded twice when multiple libraries share a path. */
const DEDUPE_WINDOW_MS = 5000;

/** ItemId -> timestamp (ms, from the injected clock) at which the item was last let through. */
const dedupe = new Map<string, number>();

/**
 * Decode a JSON-encoded array of language codes.
 *
 * @param raw JSON text; `null`, `undefined` and `""` yield `fallback`.
 * @param fallback Returned when `raw` is empty, is not valid JSON, or does not decode to an array.
 * @returns The string elements of the decoded array (non-string elements are dropped), or `fallback`.
 */
function parseLanguageList(raw: string | null | undefined, fallback: string[]): string[] {
  if (!raw) return fallback;
  try {
    const parsed: unknown = JSON.parse(raw);
    if (!Array.isArray(parsed)) return fallback;
    return parsed.filter((v): v is string => typeof v === "string");
  } catch {
    return fallback;
  }
}

/** Subset of the webhook payload this handler reads. Field names are PascalCase. */
export interface WebhookPayload {
  NotificationType?: string;
  ItemId?: string;
  ItemType?: string;
}

/** Collaborators for processWebhookEvent; `getItemFn` and `now` are optional overrides. */
export interface WebhookHandlerDeps {
  db: Database;
  jellyfin: JellyfinConfig;
  rescanCfg: RescanConfig;
  /** Defaults to `getItem` from ./jellyfin. */
  getItemFn?: typeof getItem;
  /** Clock in milliseconds; defaults to `Date.now`. */
  now?: () => number;
}

/** Outcome of handling one message: `reason` is set on rejection, `result` on acceptance. */
export interface WebhookResult {
  accepted: boolean;
  reason?: string;
  result?: RescanResult;
}

/**
 * True when `value` is a non-null object, i.e. property reads on it cannot
 * throw. Field types are NOT verified here; processWebhookEvent checks each
 * field it uses.
 */
function isPayloadObject(value: unknown): value is WebhookPayload {
  return typeof value === "object" && value !== null;
}

/**
 * Handle a parsed webhook payload and, if it describes a relevant Jellyfin
 * library event for a Movie/Episode, re-analyze the item and let rescan's
 * webhook-override flip stale 'done' plans back to 'pending'.
 *
 * Rejections (`accepted: false`) are returned for: an unaccepted
 * NotificationType or ItemType, a missing/non-string ItemId, a repeat of the
 * same ItemId within DEDUPE_WINDOW_MS, and Jellyfin returning no item.
 *
 * Rejected promises from `getItemFn` / `upsertJellyfinItem` are not caught
 * here and propagate to the caller.
 * NOTE(review): an earlier comment promised that Jellyfin errors are logged
 * and swallowed so one bad message cannot take down the MQTT subscriber —
 * confirm the transport layer catches them.
 *
 * @param payload Already-parsed webhook payload.
 * @param deps Database handle, Jellyfin/rescan config and optional test overrides.
 * @returns Whether the event was ingested, with the rescan result on success.
 */
export async function processWebhookEvent(
  payload: WebhookPayload,
  deps: WebhookHandlerDeps,
): Promise<WebhookResult> {
  const { db, jellyfin, rescanCfg, getItemFn = getItem, now = Date.now } = deps;

  if (!payload.NotificationType || !ACCEPTED_EVENTS.has(payload.NotificationType)) {
    return { accepted: false, reason: `NotificationType '${payload.NotificationType}' not accepted` };
  }
  if (!payload.ItemType || !ACCEPTED_TYPES.has(payload.ItemType)) {
    return { accepted: false, reason: `ItemType '${payload.ItemType}' not accepted` };
  }
  // The payload comes from untyped JSON, so also reject a truthy non-string
  // ItemId rather than using it as a dedupe key / Jellyfin lookup id.
  const itemId = payload.ItemId;
  if (typeof itemId !== "string" || itemId === "") {
    return { accepted: false, reason: "missing ItemId" };
  }

  // Debounce: drop bursts within the window, always evict stale entries.
  const ts = now();
  for (const [id, seen] of dedupe) {
    if (ts - seen > DEDUPE_WINDOW_MS) dedupe.delete(id);
  }
  const last = dedupe.get(itemId);
  if (last != null && ts - last <= DEDUPE_WINDOW_MS) {
    return { accepted: false, reason: "deduped" };
  }
  // Recorded before the fetch, so a concurrent duplicate is dropped even
  // while this one is still in flight.
  dedupe.set(itemId, ts);

  const fresh = await getItemFn(jellyfin, itemId);
  if (!fresh) {
    warn(`Webhook: Jellyfin returned no item for ${itemId}`);
    return { accepted: false, reason: "jellyfin returned no item" };
  }

  const result = await upsertJellyfinItem(db, fresh, rescanCfg, { source: "webhook" });
  log(`Webhook: ingested ${payload.ItemType} ${itemId} is_noop=${result.isNoop}`);
  return { accepted: true, result };
}

/**
 * MQTT-facing adapter: parses the raw payload text, pulls config, calls
 * processWebhookEvent. Exposed so server/services/mqtt.ts can stay purely
 * about transport, and tests can drive the logic without spinning up MQTT.
 *
 * Returns `accepted: false` without touching Jellyfin when the text is not
 * valid JSON, when it decodes to something other than an object (e.g. the
 * JSON literal `null`), or when the Jellyfin URL / API key is not configured.
 *
 * @param rawPayload Message body as text.
 * @returns The result of processWebhookEvent, or an early rejection.
 */
export async function handleWebhookMessage(rawPayload: string): Promise<WebhookResult> {
  let parsed: unknown;
  try {
    parsed = JSON.parse(rawPayload);
  } catch (err) {
    warn(`Webhook: malformed JSON payload: ${String(err)}`);
    return { accepted: false, reason: "malformed JSON" };
  }
  // `null`, numbers, strings and booleans are valid JSON but not a payload;
  // reading a property off `null` would throw.
  if (!isPayloadObject(parsed)) {
    warn("Webhook: JSON payload is not an object");
    return { accepted: false, reason: "payload is not an object" };
  }
  const payload = parsed;

  const cfg = getAllConfig();
  const jellyfin: JellyfinConfig = {
    url: cfg.jellyfin_url,
    apiKey: cfg.jellyfin_api_key,
    userId: cfg.jellyfin_user_id,
  };
  if (!jellyfin.url || !jellyfin.apiKey) {
    return { accepted: false, reason: "jellyfin not configured" };
  }

  const radarrCfg = { url: cfg.radarr_url, apiKey: cfg.radarr_api_key };
  const sonarrCfg = { url: cfg.sonarr_url, apiKey: cfg.sonarr_api_key };
  // An integration is used only when its flag is "1" and its config is usable.
  const radarrEnabled = cfg.radarr_enabled === "1" && radarrUsable(radarrCfg);
  const sonarrEnabled = cfg.sonarr_enabled === "1" && sonarrUsable(sonarrCfg);

  // Both library loads are started together; a disabled integration yields null.
  const [radarrLibrary, sonarrLibrary] = await Promise.all([
    radarrEnabled ? loadRadarrLibrary(radarrCfg) : Promise.resolve(null),
    sonarrEnabled ? loadSonarrLibrary(sonarrCfg) : Promise.resolve(null),
  ]);

  const rescanCfg: RescanConfig = {
    audioLanguages: parseLanguageList(cfg.audio_languages, []),
    radarr: radarrEnabled ? radarrCfg : null,
    sonarr: sonarrEnabled ? sonarrCfg : null,
    radarrLibrary,
    sonarrLibrary,
  };

  return processWebhookEvent(payload, { db: getDb(), jellyfin, rescanCfg });
}

/** Exposed for tests: forgets every remembered ItemId so dedupe starts clean. */
export function _resetDedupe(): void {
  dedupe.clear();
}