94610d05b7
RescanConfig now only carries audioLanguages. Radarr/Sonarr library loading, language resolution, and resolveSeriesTvdb callback removed from rescan.ts, scan.ts, and webhook.ts. RescanResult no longer tracks radarrHit/sonarrHit/missingProviderId counters. Tests updated: removed authoritative-source and resolved-TVDB-enables-Sonarr tests (moving to processInbox in a later task), added assertion that scan never sets sonarr/radarr as language source. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
132 lines
4.5 KiB
TypeScript
132 lines
4.5 KiB
TypeScript
import type { Database } from "bun:sqlite";
|
|
import { getAllConfig, getDb } from "../db/index";
|
|
import { log, warn } from "../lib/log";
|
|
import { getItem, type JellyfinConfig } from "./jellyfin";
|
|
import { type RescanConfig, type RescanResult, upsertJellyfinItem } from "./rescan";
|
|
|
|
/**
 * Events we care about. Jellyfin's Webhook plugin (jellyfin-plugin-webhook)
 * only exposes ItemAdded as a library-side notification — there is no
 * ItemUpdated or Library.ItemUpdated. File-rewrites on existing items
 * produce zero MQTT traffic, so we can't observe them here; the UI's
 * post-job verification runs off our own ffprobe instead.
 *
 * Payload fields are PascalCase (NotificationType, ItemId, ItemType) — the
 * earlier camelCase in this handler matched nothing the plugin ever sends.
 */
const ACCEPTED_EVENTS: ReadonlySet<string> = new Set(["ItemAdded"]);

/** Item types the handler processes; any other ItemType is rejected up front. */
const ACCEPTED_TYPES: ReadonlySet<string> = new Set(["Movie", "Episode"]);

/** 5-second dedupe window: Jellyfin can fire the same ItemAdded twice when multiple libraries share a path. */
const DEDUPE_WINDOW_MS = 5000;

/** ItemId → clock reading (ms) of the most recent event that was let through. */
const dedupe = new Map<string, number>();
|
|
|
|
/**
 * Decode a JSON-encoded list of language strings.
 *
 * @param raw - JSON text expected to hold an array; may be null, undefined or empty.
 * @param fallback - Returned when `raw` is falsy, is not valid JSON, or does
 *   not decode to an array.
 * @returns The string entries of the decoded array (non-string entries are
 *   dropped), or `fallback`.
 */
function parseLanguageList(raw: string | null | undefined, fallback: string[]): string[] {
  if (!raw) return fallback;
  try {
    // Keep the decoded value as `unknown` so it must be narrowed before use.
    const parsed: unknown = JSON.parse(raw);
    if (!Array.isArray(parsed)) return fallback;
    return parsed.filter((v): v is string => typeof v === "string");
  } catch {
    // Invalid JSON is treated the same as a missing value.
    return fallback;
  }
}
|
|
|
|
/**
 * Subset of the webhook message body this handler reads. Field names are
 * PascalCase, and every field is optional because the payload arrives as
 * untrusted JSON.
 */
export interface WebhookPayload {
  /** Event name; checked against the accepted-events set. */
  NotificationType?: string;
  /** Jellyfin item id; used as the dedupe key and for the item lookup. */
  ItemId?: string;
  /** Item kind; checked against the accepted-types set. */
  ItemType?: string;
}
|
|
|
|
/**
 * Collaborators injected into processWebhookEvent so the logic can be driven
 * without a live Jellyfin server or the real clock.
 */
export interface WebhookHandlerDeps {
  /** Database handle forwarded to upsertJellyfinItem. */
  db: Database;
  /** Connection settings forwarded to the item lookup. */
  jellyfin: JellyfinConfig;
  /** Rescan settings forwarded to upsertJellyfinItem. */
  rescanCfg: RescanConfig;
  /** Item lookup override; defaults to the real `getItem`. */
  getItemFn?: typeof getItem;
  /** Millisecond clock used for the dedupe window; defaults to `Date.now`. */
  now?: () => number;
}
|
|
|
|
/** Outcome of handling one webhook message. */
export interface WebhookResult {
  /** True when the item was fetched and handed to upsertJellyfinItem. */
  accepted: boolean;
  /** Short explanation, set whenever `accepted` is false. */
  reason?: string;
  /** Rescan outcome, set whenever `accepted` is true. */
  result?: RescanResult;
}
|
|
|
|
/**
 * Parse an incoming webhook payload and, if it describes a relevant Jellyfin
 * library event for a Movie/Episode, re-analyze the item and let rescan's
 * webhook-override flip stale 'done' plans back to 'pending'.
 *
 * Errors from Jellyfin are logged and swallowed: one bad message must not
 * take down the MQTT subscriber. A failed lookup is reported as a
 * non-accepted result instead of a rejected promise.
 *
 * @param payload - Decoded webhook body.
 * @param deps - Database, Jellyfin/rescan config and optional test overrides
 *   for the item lookup and the clock.
 * @returns `accepted: false` with a `reason` when the event is filtered,
 *   deduped, or the item cannot be fetched; otherwise `accepted: true` with
 *   the rescan result.
 */
export async function processWebhookEvent(payload: WebhookPayload, deps: WebhookHandlerDeps): Promise<WebhookResult> {
  const { db, jellyfin, rescanCfg, getItemFn = getItem, now = Date.now } = deps;

  if (!payload.NotificationType || !ACCEPTED_EVENTS.has(payload.NotificationType)) {
    return { accepted: false, reason: `NotificationType '${payload.NotificationType}' not accepted` };
  }
  if (!payload.ItemType || !ACCEPTED_TYPES.has(payload.ItemType)) {
    return { accepted: false, reason: `ItemType '${payload.ItemType}' not accepted` };
  }
  if (!payload.ItemId) {
    return { accepted: false, reason: "missing ItemId" };
  }

  // Debounce: drop bursts within the window, always evict stale entries.
  const ts = now();
  for (const [id, seen] of dedupe) {
    if (ts - seen > DEDUPE_WINDOW_MS) dedupe.delete(id);
  }
  const last = dedupe.get(payload.ItemId);
  if (last != null && ts - last <= DEDUPE_WINDOW_MS) {
    return { accepted: false, reason: "deduped" };
  }
  // Recorded before the lookup, so a failed lookup still suppresses repeats
  // of the same ItemId for the rest of the window.
  dedupe.set(payload.ItemId, ts);

  let fresh: Awaited<ReturnType<typeof getItem>>;
  try {
    fresh = await getItemFn(jellyfin, payload.ItemId);
  } catch (err) {
    // Honour the documented contract: a Jellyfin failure is logged and
    // reported, never propagated to the subscriber.
    warn(`Webhook: Jellyfin lookup failed for ${payload.ItemId}: ${String(err)}`);
    return { accepted: false, reason: "jellyfin request failed" };
  }
  if (!fresh) {
    warn(`Webhook: Jellyfin returned no item for ${payload.ItemId}`);
    return { accepted: false, reason: "jellyfin returned no item" };
  }

  const result = await upsertJellyfinItem(db, fresh, rescanCfg, { source: "webhook" });
  log(`Webhook: ingested ${payload.ItemType} ${payload.ItemId} is_noop=${result.isNoop}`);
  return { accepted: true, result };
}
|
|
|
|
/**
 * MQTT-facing adapter: parses the raw payload text, pulls config, calls
 * processWebhookEvent. Exposed so server/services/mqtt.ts can stay purely
 * about transport, and tests can drive the logic without spinning up MQTT.
 *
 * @param rawPayload - Message body as received from the transport.
 * @returns A non-accepted result when the body is not valid JSON, is not a
 *   JSON object, or Jellyfin is not configured; otherwise whatever
 *   processWebhookEvent returns.
 */
export async function handleWebhookMessage(rawPayload: string): Promise<WebhookResult> {
  let parsed: unknown;
  try {
    parsed = JSON.parse(rawPayload);
  } catch (err) {
    warn(`Webhook: malformed JSON payload: ${String(err)}`);
    return { accepted: false, reason: "malformed JSON" };
  }

  // JSON.parse also succeeds for `null`, numbers, strings and arrays. A `null`
  // payload would throw on the first property read, so reject anything that
  // is not a plain object before treating it as a WebhookPayload.
  if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
    warn("Webhook: payload is not a JSON object");
    return { accepted: false, reason: "payload is not a JSON object" };
  }
  const payload = parsed as WebhookPayload;

  const cfg = getAllConfig();
  const jellyfin: JellyfinConfig = {
    url: cfg.jellyfin_url,
    apiKey: cfg.jellyfin_api_key,
    userId: cfg.jellyfin_user_id,
  };
  if (!jellyfin.url || !jellyfin.apiKey) {
    return { accepted: false, reason: "jellyfin not configured" };
  }

  const rescanCfg: RescanConfig = {
    audioLanguages: parseLanguageList(cfg.audio_languages, []),
  };

  return processWebhookEvent(payload, { db: getDb(), jellyfin, rescanCfg });
}
|
|
|
|
/** Exposed for tests: empties the module-level dedupe map. */
export function _resetDedupe(): void {
  dedupe.clear();
}
|