webhook: PascalCase payload + ItemAdded only, switch ✓✓ signal to ffprobe
All checks were successful
Build and Push Docker Image / build (push) Successful in 1m56s
All checks were successful
Build and Push Docker Image / build (push) Successful in 1m56s
monitoring the mqtt broker revealed two bugs and one design dead-end: 1. the jellyfin-plugin-webhook publishes pascalcase fields (NotificationType, ItemId, ItemType) and we were reading camelcase (event, itemId, itemType). every real payload was rejected by the first guard — the mqtt path never ingested anything. 2. the plugin has no ItemUpdated / Library.* notifications. file rewrites on existing items produce zero broker traffic (observed: transcode + manual refresh metadata + 'recently added' appearance → no mqtt messages). ✓✓ via webhook is structurally impossible. fix the webhook path so brand-new library items actually get ingested, and narrow ACCEPTED_EVENTS to just 'ItemAdded' (the only library-side event the plugin emits). move the ✓✓ signal from webhook-corroboration to post-execute ffprobe via the existing verifyDesiredState helper: after ffmpeg returns 0 we probe the output file ourselves and flip verified=1 on match. the preflight-skipped path sets verified=1 too. renamed the db column webhook_verified → verified (via idempotent RENAME COLUMN migration) since the signal is no longer webhook-sourced, and updated the Done column tooltip to reflect that ffprobe is doing the verification. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -377,7 +377,9 @@ async function runJob(job: Job): Promise<void> {
|
||||
"UPDATE jobs SET status = 'done', exit_code = 0, output = ?, completed_at = datetime('now') WHERE id = ?",
|
||||
)
|
||||
.run(msg, job.id);
|
||||
db.prepare("UPDATE review_plans SET status = 'done' WHERE item_id = ?").run(job.item_id);
|
||||
// Preflight matched → file is already correct, so the plan is
|
||||
// both done AND independently verified. ✓✓ in the Done column.
|
||||
db.prepare("UPDATE review_plans SET status = 'done', verified = 1 WHERE item_id = ?").run(job.item_id);
|
||||
})();
|
||||
emitJobUpdate(job.id, "done", msg);
|
||||
return;
|
||||
@@ -488,6 +490,26 @@ async function runJob(job: Job): Promise<void> {
|
||||
log(`Job ${job.id} completed successfully`);
|
||||
emitJobUpdate(job.id, "done", fullOutput);
|
||||
|
||||
// Post-execute verification: ffprobe the output file and compare to
|
||||
// the plan. Independent check that ffmpeg actually produced what we
|
||||
// asked for — ffmpeg can exit 0 while having dropped a stream or
|
||||
// muxed into an unexpected layout. Sets verified=1 (the ✓✓ signal)
|
||||
// on match. If the probe disagrees, we leave verified=0 and log the
|
||||
// reason; the plan is still 'done' (the job technically succeeded)
|
||||
// but the UI will surface it as unverified so the user notices.
|
||||
if (item) {
|
||||
verifyDesiredState(db, job.item_id, item.file_path)
|
||||
.then((v) => {
|
||||
if (v.matches) {
|
||||
db.prepare("UPDATE review_plans SET verified = 1 WHERE item_id = ?").run(job.item_id);
|
||||
log(`Job ${job.id} post-verify: ${v.reason}`);
|
||||
} else {
|
||||
warn(`Job ${job.id} post-verify FAILED: ${v.reason}`);
|
||||
}
|
||||
})
|
||||
.catch((err) => warn(`Job ${job.id} post-verify errored: ${String(err)}`));
|
||||
}
|
||||
|
||||
// Fire-and-forget: tell Jellyfin to rescan the file. The MQTT subscriber
|
||||
// will pick up Jellyfin's resulting Library event and re-analyze the
|
||||
// item — flipping the plan back to 'pending' if the on-disk streams
|
||||
|
||||
@@ -322,7 +322,7 @@ app.get("/pipeline", (c) => {
|
||||
const done = db
|
||||
.prepare(`
|
||||
SELECT j.*, mi.name, mi.series_name, mi.type,
|
||||
rp.job_type, rp.apple_compat, rp.webhook_verified
|
||||
rp.job_type, rp.apple_compat, rp.verified
|
||||
FROM jobs j
|
||||
JOIN media_items mi ON mi.id = j.item_id
|
||||
JOIN review_plans rp ON rp.item_id = j.item_id
|
||||
|
||||
@@ -74,6 +74,10 @@ function migrate(db: Database): void {
|
||||
}
|
||||
};
|
||||
alter("ALTER TABLE review_plans ADD COLUMN webhook_verified INTEGER NOT NULL DEFAULT 0");
|
||||
// 2026-04-14: renamed webhook_verified → verified once we realized the
|
||||
// signal would come from our own ffprobe, not from a Jellyfin webhook.
|
||||
// RENAME COLUMN preserves values; both alters are no-ops on fresh DBs.
|
||||
alter("ALTER TABLE review_plans RENAME COLUMN webhook_verified TO verified");
|
||||
}
|
||||
|
||||
function seedDefaults(db: Database): void {
|
||||
|
||||
@@ -70,11 +70,11 @@ CREATE TABLE IF NOT EXISTS review_plans (
|
||||
subs_extracted INTEGER NOT NULL DEFAULT 0,
|
||||
notes TEXT,
|
||||
reviewed_at TEXT,
|
||||
-- Jellyfin has independently re-probed the file and the fresh analysis
|
||||
-- still says is_noop=1. Set on initial scan for items that never needed
|
||||
-- work, or on a post-execute webhook that corroborates the on-disk state.
|
||||
-- Surfaces as the second checkmark on the Done column.
|
||||
webhook_verified INTEGER NOT NULL DEFAULT 0,
|
||||
-- An independent post-hoc check has confirmed the on-disk file matches
|
||||
-- the plan: either the analyzer saw is_noop=1 on first scan, or after
|
||||
-- a job completed we ffprobed the output file and it agreed with the
|
||||
-- kept/removed stream decisions. Surfaces as the ✓✓ in the Done column.
|
||||
verified INTEGER NOT NULL DEFAULT 0,
|
||||
created_at TEXT NOT NULL DEFAULT (datetime('now'))
|
||||
);
|
||||
|
||||
|
||||
@@ -43,41 +43,43 @@ describe("processWebhookEvent — acceptance", () => {
|
||||
beforeEach(() => _resetDedupe());
|
||||
afterEach(() => _resetDedupe());
|
||||
|
||||
test("rejects unknown events", async () => {
|
||||
test("rejects playback-related NotificationTypes (the plugin publishes many, we only want ItemAdded)", async () => {
|
||||
const db = makeDb();
|
||||
const res = await processWebhookEvent(
|
||||
{ event: "PlaybackStart", itemId: "jf-1", itemType: "Movie" },
|
||||
{ db, jellyfin: JF, rescanCfg: RESCAN_CFG, getItemFn: async () => fakeItem() },
|
||||
);
|
||||
expect(res.accepted).toBe(false);
|
||||
expect(res.reason).toContain("event");
|
||||
for (const nt of ["PlaybackStart", "PlaybackProgress", "UserDataSaved", "ItemUpdated"]) {
|
||||
const res = await processWebhookEvent(
|
||||
{ NotificationType: nt, ItemId: "jf-1", ItemType: "Movie" },
|
||||
{ db, jellyfin: JF, rescanCfg: RESCAN_CFG, getItemFn: async () => fakeItem() },
|
||||
);
|
||||
expect(res.accepted).toBe(false);
|
||||
expect(res.reason).toContain("NotificationType");
|
||||
}
|
||||
});
|
||||
|
||||
test("rejects non-Movie/Episode types", async () => {
|
||||
const db = makeDb();
|
||||
const res = await processWebhookEvent(
|
||||
{ event: "ItemUpdated", itemId: "jf-1", itemType: "Trailer" },
|
||||
{ NotificationType: "ItemAdded", ItemId: "jf-1", ItemType: "Trailer" },
|
||||
{ db, jellyfin: JF, rescanCfg: RESCAN_CFG, getItemFn: async () => fakeItem({ Type: "Trailer" }) },
|
||||
);
|
||||
expect(res.accepted).toBe(false);
|
||||
expect(res.reason).toContain("itemType");
|
||||
expect(res.reason).toContain("ItemType");
|
||||
});
|
||||
|
||||
test("rejects missing itemId", async () => {
|
||||
test("rejects missing ItemId", async () => {
|
||||
const db = makeDb();
|
||||
const res = await processWebhookEvent(
|
||||
{ event: "ItemUpdated", itemType: "Movie" },
|
||||
{ NotificationType: "ItemAdded", ItemType: "Movie" },
|
||||
{ db, jellyfin: JF, rescanCfg: RESCAN_CFG, getItemFn: async () => fakeItem() },
|
||||
);
|
||||
expect(res.accepted).toBe(false);
|
||||
expect(res.reason).toContain("itemId");
|
||||
expect(res.reason).toContain("ItemId");
|
||||
});
|
||||
|
||||
test("dedupes bursts within 5s and accepts again after", async () => {
|
||||
const db = makeDb();
|
||||
let fakeNow = 1_000_000;
|
||||
const getItemFn = async () => fakeItem();
|
||||
const payload = { event: "ItemUpdated", itemId: "jf-1", itemType: "Movie" };
|
||||
const payload = { NotificationType: "ItemAdded", ItemId: "jf-1", ItemType: "Movie" };
|
||||
|
||||
const first = await processWebhookEvent(payload, {
|
||||
db,
|
||||
@@ -113,7 +115,7 @@ describe("processWebhookEvent — acceptance", () => {
|
||||
test("drops when Jellyfin returns no item", async () => {
|
||||
const db = makeDb();
|
||||
const res = await processWebhookEvent(
|
||||
{ event: "ItemUpdated", itemId: "jf-missing", itemType: "Movie" },
|
||||
{ NotificationType: "ItemAdded", ItemId: "jf-missing", ItemType: "Movie" },
|
||||
{ db, jellyfin: JF, rescanCfg: RESCAN_CFG, getItemFn: async () => null },
|
||||
);
|
||||
expect(res.accepted).toBe(false);
|
||||
@@ -126,7 +128,7 @@ describe("processWebhookEvent — done-status override", () => {
|
||||
|
||||
async function runWebhook(db: Database, item: JellyfinItem, cfg: RescanConfig = RESCAN_CFG) {
|
||||
return processWebhookEvent(
|
||||
{ event: "ItemUpdated", itemId: item.Id, itemType: item.Type as "Movie" | "Episode" },
|
||||
{ NotificationType: "ItemAdded", ItemId: item.Id, ItemType: item.Type as "Movie" | "Episode" },
|
||||
{ db, jellyfin: JF, rescanCfg: cfg, getItemFn: async () => item },
|
||||
);
|
||||
}
|
||||
@@ -186,7 +188,7 @@ describe("processWebhookEvent — webhook_verified flag", () => {
|
||||
|
||||
async function runWebhook(db: Database, item: JellyfinItem, cfg: RescanConfig = RESCAN_CFG) {
|
||||
return processWebhookEvent(
|
||||
{ event: "ItemUpdated", itemId: item.Id, itemType: item.Type as "Movie" | "Episode" },
|
||||
{ NotificationType: "ItemAdded", ItemId: item.Id, ItemType: item.Type as "Movie" | "Episode" },
|
||||
{ db, jellyfin: JF, rescanCfg: cfg, getItemFn: async () => item },
|
||||
);
|
||||
}
|
||||
@@ -195,7 +197,7 @@ describe("processWebhookEvent — webhook_verified flag", () => {
|
||||
return (
|
||||
db
|
||||
.prepare(
|
||||
"SELECT rp.webhook_verified as v FROM review_plans rp JOIN media_items mi ON mi.id = rp.item_id WHERE mi.jellyfin_id = ?",
|
||||
"SELECT rp.verified as v FROM review_plans rp JOIN media_items mi ON mi.id = rp.item_id WHERE mi.jellyfin_id = ?",
|
||||
)
|
||||
.get(jellyfinId) as { v: number }
|
||||
).v;
|
||||
|
||||
@@ -230,14 +230,15 @@ export async function upsertJellyfinItem(
|
||||
// error → pending (retry loop)
|
||||
// else keep current status
|
||||
//
|
||||
// webhook_verified tracks whether Jellyfin's view of the file agrees
|
||||
// with ours. It's set to 1 whenever is_noop=1 (scan or webhook both
|
||||
// count — an unchanged file is already in the desired end state and
|
||||
// needs no further confirmation). It's cleared back to 0 the moment
|
||||
// a webhook says the file drifted off-noop.
|
||||
// `verified` tracks whether we have independent confirmation the file
|
||||
// matches the plan. Set to 1 whenever is_noop=1 on a fresh analysis
|
||||
// (an unchanged file is already in its desired end state). Post-
|
||||
// execute, execute.ts re-runs verifyDesiredState and flips this on
|
||||
// when ffprobe agrees. Cleared the moment a webhook says the file
|
||||
// drifted off-noop.
|
||||
db
|
||||
.prepare(`
|
||||
INSERT INTO review_plans (item_id, status, is_noop, confidence, apple_compat, job_type, notes, webhook_verified)
|
||||
INSERT INTO review_plans (item_id, status, is_noop, confidence, apple_compat, job_type, notes, verified)
|
||||
VALUES (?, 'pending', ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(item_id) DO UPDATE SET
|
||||
status = CASE
|
||||
@@ -252,10 +253,10 @@ export async function upsertJellyfinItem(
|
||||
apple_compat = excluded.apple_compat,
|
||||
job_type = excluded.job_type,
|
||||
notes = excluded.notes,
|
||||
webhook_verified = CASE
|
||||
verified = CASE
|
||||
WHEN excluded.is_noop = 1 THEN 1
|
||||
WHEN ? = 'webhook' THEN 0
|
||||
ELSE review_plans.webhook_verified
|
||||
ELSE review_plans.verified
|
||||
END
|
||||
`)
|
||||
.run(
|
||||
|
||||
@@ -7,14 +7,19 @@ import { type RescanConfig, type RescanResult, upsertJellyfinItem } from "./resc
|
||||
import { loadLibrary as loadSonarrLibrary, isUsable as sonarrUsable } from "./sonarr";
|
||||
|
||||
/**
|
||||
* Events we care about. Jellyfin's Webhook plugin emits many event types;
|
||||
* Library.ItemAdded and Library.ItemUpdated are the only ones that signal
|
||||
* an on-disk file mutation. We ignore user-data changes, playback, etc.
|
||||
* Events we care about. Jellyfin's Webhook plugin (jellyfin-plugin-webhook)
|
||||
* only exposes ItemAdded as a library-side notification — there is no
|
||||
* ItemUpdated or Library.ItemUpdated. File-rewrites on existing items
|
||||
* produce zero MQTT traffic, so we can't observe them here; the UI's
|
||||
* post-job verification runs off our own ffprobe instead.
|
||||
*
|
||||
* Payload fields are PascalCase (NotificationType, ItemId, ItemType) — the
|
||||
* earlier camelCase in this handler matched nothing the plugin ever sends.
|
||||
*/
|
||||
const ACCEPTED_EVENTS = new Set(["ItemAdded", "ItemUpdated", "Library.ItemAdded", "Library.ItemUpdated"]);
|
||||
const ACCEPTED_EVENTS = new Set(["ItemAdded"]);
|
||||
const ACCEPTED_TYPES = new Set(["Movie", "Episode"]);
|
||||
|
||||
/** 5-second dedupe window: Jellyfin fires ItemUpdated multiple times per rescan. */
|
||||
/** 5-second dedupe window: Jellyfin can fire the same ItemAdded twice when multiple libraries share a path. */
|
||||
const DEDUPE_WINDOW_MS = 5000;
|
||||
const dedupe = new Map<string, number>();
|
||||
|
||||
@@ -29,9 +34,9 @@ function parseLanguageList(raw: string | null | undefined, fallback: string[]):
|
||||
}
|
||||
|
||||
export interface WebhookPayload {
|
||||
event?: string;
|
||||
itemId?: string;
|
||||
itemType?: string;
|
||||
NotificationType?: string;
|
||||
ItemId?: string;
|
||||
ItemType?: string;
|
||||
}
|
||||
|
||||
export interface WebhookHandlerDeps {
|
||||
@@ -59,14 +64,14 @@ export interface WebhookResult {
|
||||
export async function processWebhookEvent(payload: WebhookPayload, deps: WebhookHandlerDeps): Promise<WebhookResult> {
|
||||
const { db, jellyfin, rescanCfg, getItemFn = getItem, now = Date.now } = deps;
|
||||
|
||||
if (!payload.event || !ACCEPTED_EVENTS.has(payload.event)) {
|
||||
return { accepted: false, reason: `event '${payload.event}' not accepted` };
|
||||
if (!payload.NotificationType || !ACCEPTED_EVENTS.has(payload.NotificationType)) {
|
||||
return { accepted: false, reason: `NotificationType '${payload.NotificationType}' not accepted` };
|
||||
}
|
||||
if (!payload.itemType || !ACCEPTED_TYPES.has(payload.itemType)) {
|
||||
return { accepted: false, reason: `itemType '${payload.itemType}' not accepted` };
|
||||
if (!payload.ItemType || !ACCEPTED_TYPES.has(payload.ItemType)) {
|
||||
return { accepted: false, reason: `ItemType '${payload.ItemType}' not accepted` };
|
||||
}
|
||||
if (!payload.itemId) {
|
||||
return { accepted: false, reason: "missing itemId" };
|
||||
if (!payload.ItemId) {
|
||||
return { accepted: false, reason: "missing ItemId" };
|
||||
}
|
||||
|
||||
// Debounce: drop bursts within the window, always evict stale entries.
|
||||
@@ -74,20 +79,20 @@ export async function processWebhookEvent(payload: WebhookPayload, deps: Webhook
|
||||
for (const [id, seen] of dedupe) {
|
||||
if (ts - seen > DEDUPE_WINDOW_MS) dedupe.delete(id);
|
||||
}
|
||||
const last = dedupe.get(payload.itemId);
|
||||
const last = dedupe.get(payload.ItemId);
|
||||
if (last != null && ts - last <= DEDUPE_WINDOW_MS) {
|
||||
return { accepted: false, reason: "deduped" };
|
||||
}
|
||||
dedupe.set(payload.itemId, ts);
|
||||
dedupe.set(payload.ItemId, ts);
|
||||
|
||||
const fresh = await getItemFn(jellyfin, payload.itemId);
|
||||
const fresh = await getItemFn(jellyfin, payload.ItemId);
|
||||
if (!fresh) {
|
||||
warn(`Webhook: Jellyfin returned no item for ${payload.itemId}`);
|
||||
warn(`Webhook: Jellyfin returned no item for ${payload.ItemId}`);
|
||||
return { accepted: false, reason: "jellyfin returned no item" };
|
||||
}
|
||||
|
||||
const result = await upsertJellyfinItem(db, fresh, rescanCfg, { source: "webhook" });
|
||||
log(`Webhook: reanalyzed ${payload.itemType} ${payload.itemId} is_noop=${result.isNoop}`);
|
||||
log(`Webhook: ingested ${payload.ItemType} ${payload.ItemId} is_noop=${result.isNoop}`);
|
||||
return { accepted: true, result };
|
||||
}
|
||||
|
||||
|
||||
@@ -65,7 +65,7 @@ export interface ReviewPlan {
|
||||
subs_extracted: number;
|
||||
notes: string | null;
|
||||
reviewed_at: string | null;
|
||||
webhook_verified: number;
|
||||
verified: number;
|
||||
created_at: string;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user