From d05e037bbc21a3cfb2f8ba35a3fa99abb9040b91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20F=C3=B6rtsch?= Date: Tue, 14 Apr 2026 17:27:22 +0200 Subject: [PATCH] =?UTF-8?q?webhook:=20PascalCase=20payload=20+=20ItemAdded?= =?UTF-8?q?=20only,=20switch=20=E2=9C=93=E2=9C=93=20signal=20to=20ffprobe?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit monitoring the mqtt broker revealed two bugs and one design dead-end: 1. the jellyfin-plugin-webhook publishes pascalcase fields (NotificationType, ItemId, ItemType) and we were reading camelcase (event, itemId, itemType). every real payload was rejected by the first guard — the mqtt path never ingested anything. 2. the plugin has no ItemUpdated / Library.* notifications. file rewrites on existing items produce zero broker traffic (observed: transcode + manual refresh metadata + 'recently added' appearance → no mqtt messages). ✓✓ via webhook is structurally impossible. fix the webhook path so brand-new library items actually get ingested, and narrow ACCEPTED_EVENTS to just 'ItemAdded' (the only library-side event the plugin emits). move the ✓✓ signal from webhook-corroboration to post-execute ffprobe via the existing verifyDesiredState helper: after ffmpeg returns 0 we probe the output file ourselves and flip verified=1 on match. the preflight-skipped path sets verified=1 too. renamed the db column webhook_verified → verified (via idempotent RENAME COLUMN migration) since the signal is no longer webhook-sourced, and updated the Done column tooltip to reflect that ffprobe is doing the verification. Co-Authored-By: Claude Opus 4.6 (1M context) --- package.json | 2 +- server/api/execute.ts | 24 ++++++++++++- server/api/review.ts | 2 +- server/db/index.ts | 4 +++ server/db/schema.ts | 10 +++--- server/services/__tests__/webhook.test.ts | 36 ++++++++++--------- server/services/rescan.ts | 17 ++++----- server/services/webhook.ts | 43 +++++++++++++---------- server/types.ts | 2 +- src/features/pipeline/DoneColumn.tsx | 6 ++-- src/shared/lib/types.ts | 8 ++--- 11 files changed, 94 insertions(+), 60 deletions(-) diff --git a/package.json b/package.json index 6d8564b..d42a325 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "netfelix-audio-fix", - "version": "2026.04.14.15", + "version": "2026.04.14.16", "scripts": { "dev:server": "NODE_ENV=development bun --hot server/index.tsx", "dev:client": "vite", diff --git a/server/api/execute.ts b/server/api/execute.ts index 3700292..c32550d 100644 --- a/server/api/execute.ts +++ b/server/api/execute.ts @@ -377,7 +377,9 @@ async function runJob(job: Job): Promise { "UPDATE jobs SET status = 'done', exit_code = 0, output = ?, completed_at = datetime('now') WHERE id = ?", ) .run(msg, job.id); - db.prepare("UPDATE review_plans SET status = 'done' WHERE item_id = ?").run(job.item_id); + // Preflight matched → file is already correct, so the plan is + // both done AND independently verified. ✓✓ in the Done column. + db.prepare("UPDATE review_plans SET status = 'done', verified = 1 WHERE item_id = ?").run(job.item_id); })(); emitJobUpdate(job.id, "done", msg); return; @@ -488,6 +490,26 @@ async function runJob(job: Job): Promise { log(`Job ${job.id} completed successfully`); emitJobUpdate(job.id, "done", fullOutput); + // Post-execute verification: ffprobe the output file and compare to + // the plan. Independent check that ffmpeg actually produced what we + // asked for — ffmpeg can exit 0 while having dropped a stream or + // muxed into an unexpected layout. Sets verified=1 (the ✓✓ signal) + // on match. If the probe disagrees, we leave verified=0 and log the + // reason; the plan is still 'done' (the job technically succeeded) + // but the UI will surface it as unverified so the user notices. + if (item) { + verifyDesiredState(db, job.item_id, item.file_path) + .then((v) => { + if (v.matches) { + db.prepare("UPDATE review_plans SET verified = 1 WHERE item_id = ?").run(job.item_id); + log(`Job ${job.id} post-verify: ${v.reason}`); + } else { + warn(`Job ${job.id} post-verify FAILED: ${v.reason}`); + } + }) + .catch((err) => warn(`Job ${job.id} post-verify errored: ${String(err)}`)); + } + // Fire-and-forget: tell Jellyfin to rescan the file. The MQTT subscriber // will pick up Jellyfin's resulting Library event and re-analyze the // item — flipping the plan back to 'pending' if the on-disk streams diff --git a/server/api/review.ts b/server/api/review.ts index 5cda94b..3904f9a 100644 --- a/server/api/review.ts +++ b/server/api/review.ts @@ -322,7 +322,7 @@ app.get("/pipeline", (c) => { const done = db .prepare(` SELECT j.*, mi.name, mi.series_name, mi.type, - rp.job_type, rp.apple_compat, rp.webhook_verified + rp.job_type, rp.apple_compat, rp.verified FROM jobs j JOIN media_items mi ON mi.id = j.item_id JOIN review_plans rp ON rp.item_id = j.item_id diff --git a/server/db/index.ts b/server/db/index.ts index 218c0e9..97ef818 100644 --- a/server/db/index.ts +++ b/server/db/index.ts @@ -74,6 +74,10 @@ function migrate(db: Database): void { } }; alter("ALTER TABLE review_plans ADD COLUMN webhook_verified INTEGER NOT NULL DEFAULT 0"); + // 2026-04-14: renamed webhook_verified → verified once we realized the + // signal would come from our own ffprobe, not from a Jellyfin webhook. + // RENAME COLUMN preserves values; both alters are no-ops on fresh DBs. + alter("ALTER TABLE review_plans RENAME COLUMN webhook_verified TO verified"); } function seedDefaults(db: Database): void { diff --git a/server/db/schema.ts b/server/db/schema.ts index 1e4a639..6eeb7ae 100644 --- a/server/db/schema.ts +++ b/server/db/schema.ts @@ -70,11 +70,11 @@ CREATE TABLE IF NOT EXISTS review_plans ( subs_extracted INTEGER NOT NULL DEFAULT 0, notes TEXT, reviewed_at TEXT, - -- Jellyfin has independently re-probed the file and the fresh analysis - -- still says is_noop=1. Set on initial scan for items that never needed - -- work, or on a post-execute webhook that corroborates the on-disk state. - -- Surfaces as the second checkmark on the Done column. - webhook_verified INTEGER NOT NULL DEFAULT 0, + -- An independent post-hoc check has confirmed the on-disk file matches + -- the plan: either the analyzer saw is_noop=1 on first scan, or after + -- a job completed we ffprobed the output file and it agreed with the + -- kept/removed stream decisions. Surfaces as the ✓✓ in the Done column. + verified INTEGER NOT NULL DEFAULT 0, created_at TEXT NOT NULL DEFAULT (datetime('now')) ); diff --git a/server/services/__tests__/webhook.test.ts b/server/services/__tests__/webhook.test.ts index 744ff3d..34520e6 100644 --- a/server/services/__tests__/webhook.test.ts +++ b/server/services/__tests__/webhook.test.ts @@ -43,41 +43,43 @@ describe("processWebhookEvent — acceptance", () => { beforeEach(() => _resetDedupe()); afterEach(() => _resetDedupe()); - test("rejects unknown events", async () => { + test("rejects playback-related NotificationTypes (the plugin publishes many, we only want ItemAdded)", async () => { const db = makeDb(); - const res = await processWebhookEvent( - { event: "PlaybackStart", itemId: "jf-1", itemType: "Movie" }, - { db, jellyfin: JF, rescanCfg: RESCAN_CFG, getItemFn: async () => fakeItem() }, - ); - expect(res.accepted).toBe(false); - expect(res.reason).toContain("event"); + for (const nt of ["PlaybackStart", "PlaybackProgress", "UserDataSaved", "ItemUpdated"]) { + const res = await processWebhookEvent( + { NotificationType: nt, ItemId: "jf-1", ItemType: "Movie" }, + { db, jellyfin: JF, rescanCfg: RESCAN_CFG, getItemFn: async () => fakeItem() }, + ); + expect(res.accepted).toBe(false); + expect(res.reason).toContain("NotificationType"); + } }); test("rejects non-Movie/Episode types", async () => { const db = makeDb(); const res = await processWebhookEvent( - { event: "ItemUpdated", itemId: "jf-1", itemType: "Trailer" }, + { NotificationType: "ItemAdded", ItemId: "jf-1", ItemType: "Trailer" }, { db, jellyfin: JF, rescanCfg: RESCAN_CFG, getItemFn: async () => fakeItem({ Type: "Trailer" }) }, ); expect(res.accepted).toBe(false); - expect(res.reason).toContain("itemType"); + expect(res.reason).toContain("ItemType"); }); - test("rejects missing itemId", async () => { + test("rejects missing ItemId", async () => { const db = makeDb(); const res = await processWebhookEvent( - { event: "ItemUpdated", itemType: "Movie" }, + { NotificationType: "ItemAdded", ItemType: "Movie" }, { db, jellyfin: JF, rescanCfg: RESCAN_CFG, getItemFn: async () => fakeItem() }, ); expect(res.accepted).toBe(false); - expect(res.reason).toContain("itemId"); + expect(res.reason).toContain("ItemId"); }); test("dedupes bursts within 5s and accepts again after", async () => { const db = makeDb(); let fakeNow = 1_000_000; const getItemFn = async () => fakeItem(); - const payload = { event: "ItemUpdated", itemId: "jf-1", itemType: "Movie" }; + const payload = { NotificationType: "ItemAdded", ItemId: "jf-1", ItemType: "Movie" }; const first = await processWebhookEvent(payload, { db, @@ -113,7 +115,7 @@ describe("processWebhookEvent — acceptance", () => { test("drops when Jellyfin returns no item", async () => { const db = makeDb(); const res = await processWebhookEvent( - { event: "ItemUpdated", itemId: "jf-missing", itemType: "Movie" }, + { NotificationType: "ItemAdded", ItemId: "jf-missing", ItemType: "Movie" }, { db, jellyfin: JF, rescanCfg: RESCAN_CFG, getItemFn: async () => null }, ); expect(res.accepted).toBe(false); @@ -126,7 +128,7 @@ describe("processWebhookEvent — done-status override", () => { async function runWebhook(db: Database, item: JellyfinItem, cfg: RescanConfig = RESCAN_CFG) { return processWebhookEvent( - { event: "ItemUpdated", itemId: item.Id, itemType: item.Type as "Movie" | "Episode" }, + { NotificationType: "ItemAdded", ItemId: item.Id, ItemType: item.Type as "Movie" | "Episode" }, { db, jellyfin: JF, rescanCfg: cfg, getItemFn: async () => item }, ); } @@ -186,7 +188,7 @@ describe("processWebhookEvent — webhook_verified flag", () => { async function runWebhook(db: Database, item: JellyfinItem, cfg: RescanConfig = RESCAN_CFG) { return processWebhookEvent( - { event: "ItemUpdated", itemId: item.Id, itemType: item.Type as "Movie" | "Episode" }, + { NotificationType: "ItemAdded", ItemId: item.Id, ItemType: item.Type as "Movie" | "Episode" }, { db, jellyfin: JF, rescanCfg: cfg, getItemFn: async () => item }, ); } @@ -195,7 +197,7 @@ describe("processWebhookEvent — webhook_verified flag", () => { return ( db .prepare( - "SELECT rp.webhook_verified as v FROM review_plans rp JOIN media_items mi ON mi.id = rp.item_id WHERE mi.jellyfin_id = ?", + "SELECT rp.verified as v FROM review_plans rp JOIN media_items mi ON mi.id = rp.item_id WHERE mi.jellyfin_id = ?", ) .get(jellyfinId) as { v: number } ).v; diff --git a/server/services/rescan.ts b/server/services/rescan.ts index d3e8d6b..298ae27 100644 --- a/server/services/rescan.ts +++ b/server/services/rescan.ts @@ -230,14 +230,15 @@ export async function upsertJellyfinItem( // error → pending (retry loop) // else keep current status // - // webhook_verified tracks whether Jellyfin's view of the file agrees - // with ours. It's set to 1 whenever is_noop=1 (scan or webhook both - // count — an unchanged file is already in the desired end state and - // needs no further confirmation). It's cleared back to 0 the moment - // a webhook says the file drifted off-noop. + // `verified` tracks whether we have independent confirmation the file + // matches the plan. Set to 1 whenever is_noop=1 on a fresh analysis + // (an unchanged file is already in its desired end state). Post- + // execute, execute.ts re-runs verifyDesiredState and flips this on + // when ffprobe agrees. Cleared the moment a webhook says the file + // drifted off-noop. db .prepare(` - INSERT INTO review_plans (item_id, status, is_noop, confidence, apple_compat, job_type, notes, webhook_verified) + INSERT INTO review_plans (item_id, status, is_noop, confidence, apple_compat, job_type, notes, verified) VALUES (?, 'pending', ?, ?, ?, ?, ?, ?) ON CONFLICT(item_id) DO UPDATE SET status = CASE @@ -252,10 +253,10 @@ export async function upsertJellyfinItem( apple_compat = excluded.apple_compat, job_type = excluded.job_type, notes = excluded.notes, - webhook_verified = CASE + verified = CASE WHEN excluded.is_noop = 1 THEN 1 WHEN ? = 'webhook' THEN 0 - ELSE review_plans.webhook_verified + ELSE review_plans.verified END `) .run( diff --git a/server/services/webhook.ts b/server/services/webhook.ts index caeda58..0dfeef2 100644 --- a/server/services/webhook.ts +++ b/server/services/webhook.ts @@ -7,14 +7,19 @@ import { type RescanConfig, type RescanResult, upsertJellyfinItem } from "./resc import { loadLibrary as loadSonarrLibrary, isUsable as sonarrUsable } from "./sonarr"; /** - * Events we care about. Jellyfin's Webhook plugin emits many event types; - * Library.ItemAdded and Library.ItemUpdated are the only ones that signal - * an on-disk file mutation. We ignore user-data changes, playback, etc. + * Events we care about. Jellyfin's Webhook plugin (jellyfin-plugin-webhook) + * only exposes ItemAdded as a library-side notification — there is no + * ItemUpdated or Library.ItemUpdated. File-rewrites on existing items + * produce zero MQTT traffic, so we can't observe them here; the UI's + * post-job verification runs off our own ffprobe instead. + * + * Payload fields are PascalCase (NotificationType, ItemId, ItemType) — the + * earlier camelCase in this handler matched nothing the plugin ever sends. */ -const ACCEPTED_EVENTS = new Set(["ItemAdded", "ItemUpdated", "Library.ItemAdded", "Library.ItemUpdated"]); +const ACCEPTED_EVENTS = new Set(["ItemAdded"]); const ACCEPTED_TYPES = new Set(["Movie", "Episode"]); -/** 5-second dedupe window: Jellyfin fires ItemUpdated multiple times per rescan. */ +/** 5-second dedupe window: Jellyfin can fire the same ItemAdded twice when multiple libraries share a path. */ const DEDUPE_WINDOW_MS = 5000; const dedupe = new Map(); @@ -29,9 +34,9 @@ function parseLanguageList(raw: string | null | undefined, fallback: string[]): } export interface WebhookPayload { - event?: string; - itemId?: string; - itemType?: string; + NotificationType?: string; + ItemId?: string; + ItemType?: string; } export interface WebhookHandlerDeps { @@ -59,14 +64,14 @@ export interface WebhookResult { export async function processWebhookEvent(payload: WebhookPayload, deps: WebhookHandlerDeps): Promise { const { db, jellyfin, rescanCfg, getItemFn = getItem, now = Date.now } = deps; - if (!payload.event || !ACCEPTED_EVENTS.has(payload.event)) { - return { accepted: false, reason: `event '${payload.event}' not accepted` }; + if (!payload.NotificationType || !ACCEPTED_EVENTS.has(payload.NotificationType)) { + return { accepted: false, reason: `NotificationType '${payload.NotificationType}' not accepted` }; } - if (!payload.itemType || !ACCEPTED_TYPES.has(payload.itemType)) { - return { accepted: false, reason: `itemType '${payload.itemType}' not accepted` }; + if (!payload.ItemType || !ACCEPTED_TYPES.has(payload.ItemType)) { + return { accepted: false, reason: `ItemType '${payload.ItemType}' not accepted` }; } - if (!payload.itemId) { - return { accepted: false, reason: "missing itemId" }; + if (!payload.ItemId) { + return { accepted: false, reason: "missing ItemId" }; } // Debounce: drop bursts within the window, always evict stale entries. @@ -74,20 +79,20 @@ export async function processWebhookEvent(payload: WebhookPayload, deps: Webhook for (const [id, seen] of dedupe) { if (ts - seen > DEDUPE_WINDOW_MS) dedupe.delete(id); } - const last = dedupe.get(payload.itemId); + const last = dedupe.get(payload.ItemId); if (last != null && ts - last <= DEDUPE_WINDOW_MS) { return { accepted: false, reason: "deduped" }; } - dedupe.set(payload.itemId, ts); + dedupe.set(payload.ItemId, ts); - const fresh = await getItemFn(jellyfin, payload.itemId); + const fresh = await getItemFn(jellyfin, payload.ItemId); if (!fresh) { - warn(`Webhook: Jellyfin returned no item for ${payload.itemId}`); + warn(`Webhook: Jellyfin returned no item for ${payload.ItemId}`); return { accepted: false, reason: "jellyfin returned no item" }; } const result = await upsertJellyfinItem(db, fresh, rescanCfg, { source: "webhook" }); - log(`Webhook: reanalyzed ${payload.itemType} ${payload.itemId} is_noop=${result.isNoop}`); + log(`Webhook: ingested ${payload.ItemType} ${payload.ItemId} is_noop=${result.isNoop}`); return { accepted: true, result }; } diff --git a/server/types.ts b/server/types.ts index 8e21f18..3b1fad1 100644 --- a/server/types.ts +++ b/server/types.ts @@ -65,7 +65,7 @@ export interface ReviewPlan { subs_extracted: number; notes: string | null; reviewed_at: string | null; - webhook_verified: number; + verified: number; created_at: string; } diff --git a/src/features/pipeline/DoneColumn.tsx b/src/features/pipeline/DoneColumn.tsx index 45c5f88..b5af2eb 100644 --- a/src/features/pipeline/DoneColumn.tsx +++ b/src/features/pipeline/DoneColumn.tsx @@ -21,12 +21,12 @@ export function DoneColumn({ items, onMutate }: DoneColumnProps) { actions={items.length > 0 ? [{ label: "Clear", onClick: clear }] : undefined} > {items.map((item) => { - const verified = item.status === "done" && item.webhook_verified === 1; + const verified = item.status === "done" && item.verified === 1; const mark = verified ? "✓✓" : item.status === "done" ? "✓" : "✗"; const markTitle = verified - ? "Done — Jellyfin re-probe confirms the file matches the plan" + ? "Done — ffprobe confirms the on-disk file matches the plan" : item.status === "done" - ? "Done — awaiting Jellyfin webhook confirmation" + ? "Done — awaiting post-job verification" : "Error"; return (
diff --git a/src/shared/lib/types.ts b/src/shared/lib/types.ts index fb9ac01..a3ea5c4 100644 --- a/src/shared/lib/types.ts +++ b/src/shared/lib/types.ts @@ -155,10 +155,10 @@ export interface PipelineJobItem { file_path?: string; confidence?: "high" | "low"; is_noop?: number; - // 1 when Jellyfin's independent re-probe agrees the on-disk file is - // already in the desired end state. Renders as a second checkmark in - // the Done column. - webhook_verified?: number; + // 1 when an independent post-hoc check confirms the on-disk file matches + // the plan (ffprobe after a job, or is_noop=1 on the very first scan). + // Renders as the second checkmark in the Done column. + verified?: number; transcode_reasons?: string[]; audio_streams?: PipelineAudioStream[]; }