From 4f1433437b721efb371969f9af300a1881e91aa8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20F=C3=B6rtsch?= Date: Tue, 14 Apr 2026 07:36:15 +0200 Subject: [PATCH] dedupe pending jobs to stop rapid-fire approvals from spawning ghost ffmpeg runs root cause: all five job-insert sites in review.ts blindly inserted a 'pending' row, so a double-click on approve (or an overlap between /approve-all and individual /approve) wrote N jobs for the same item. job 1 stripped subtitles + reordered audio; jobs 2..N then ran the same stale stream-index command against the already-processed file and ffmpeg bailed with 'Stream map matches no streams'. fix: funnel every insert through enqueueAudioJob(), which only writes when no pending job already exists for that item. covers approve, retry, approve-all, season approve-all, series approve-all. --- package.json | 2 +- server/api/__tests__/review.test.ts | 50 +++++++++++++++++++++++++++++ server/api/review.ts | 40 +++++++++++++---------- 3 files changed, 74 insertions(+), 18 deletions(-) create mode 100644 server/api/__tests__/review.test.ts diff --git a/package.json b/package.json index a00d2b1..30582c8 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "netfelix-audio-fix", - "version": "2026.04.13.11", + "version": "2026.04.14", "scripts": { "dev:server": "NODE_ENV=development bun --hot server/index.tsx", "dev:client": "vite", diff --git a/server/api/__tests__/review.test.ts b/server/api/__tests__/review.test.ts new file mode 100644 index 0000000..1a21ec4 --- /dev/null +++ b/server/api/__tests__/review.test.ts @@ -0,0 +1,50 @@ +import { Database } from "bun:sqlite"; +import { describe, expect, test } from "bun:test"; +import { SCHEMA } from "../../db/schema"; +import { enqueueAudioJob } from "../review"; + +function makeDb(): Database { + const db = new Database(":memory:"); + for (const stmt of SCHEMA.split(";")) { + const trimmed = stmt.trim(); + if (trimmed) db.run(trimmed); + } + db + .prepare("INSERT INTO media_items (id, jellyfin_id, type, name, file_path) VALUES (?, ?, 'Movie', 'T', '/x.mkv')") + .run(1, "jf-1"); + return db; +} + +describe("enqueueAudioJob dedup", () => { + test("inserts a job when none exists", () => { + const db = makeDb(); + expect(enqueueAudioJob(db, 1, "ffmpeg a")).toBe(true); + const { n } = db.prepare("SELECT COUNT(*) as n FROM jobs WHERE item_id = 1").get() as { n: number }; + expect(n).toBe(1); + }); + + test("no-ops when a pending job already exists", () => { + const db = makeDb(); + enqueueAudioJob(db, 1, "ffmpeg a"); + expect(enqueueAudioJob(db, 1, "ffmpeg b")).toBe(false); + expect(enqueueAudioJob(db, 1, "ffmpeg c")).toBe(false); + const rows = db.prepare("SELECT command FROM jobs WHERE item_id = 1").all() as { command: string }[]; + expect(rows.length).toBe(1); + expect(rows[0].command).toBe("ffmpeg a"); + }); + + test("allows a new pending job once the previous one is done or errored", () => { + const db = makeDb(); + enqueueAudioJob(db, 1, "ffmpeg a"); + db.prepare("UPDATE jobs SET status = 'done' WHERE item_id = 1").run(); + expect(enqueueAudioJob(db, 1, "ffmpeg b")).toBe(true); + + db.prepare("UPDATE jobs SET status = 'error' WHERE command = 'ffmpeg b'").run(); + expect(enqueueAudioJob(db, 1, "ffmpeg c")).toBe(true); + + const pending = db.prepare("SELECT COUNT(*) as n FROM jobs WHERE item_id = 1 AND status = 'pending'").get() as { + n: number; + }; + expect(pending.n).toBe(1); + }); +}); diff --git a/server/api/review.ts b/server/api/review.ts index 964f447..bfeecca 100644 --- a/server/api/review.ts +++ b/server/api/review.ts @@ -24,6 +24,24 @@ function parseLanguageList(raw: string | null, fallback: string[]): string[] { } } +/** + * Insert a pending audio job for the given item only if no pending job + * already exists for it. Guards against duplicate jobs from rapid-fire + * approve clicks, overlapping individual + bulk approvals, or any other + * path that could race two POSTs for the same item. Returns true if a + * job was actually inserted. + */ +export function enqueueAudioJob(db: ReturnType, itemId: number, command: string): boolean { + const result = db + .prepare(` + INSERT INTO jobs (item_id, command, job_type, status) + SELECT ?, ?, 'audio', 'pending' + WHERE NOT EXISTS (SELECT 1 FROM jobs WHERE item_id = ? AND status = 'pending') + `) + .run(itemId, command, itemId); + return result.changes > 0; +} + function countsByFilter(db: ReturnType): Record { const total = (db.prepare("SELECT COUNT(*) as n FROM review_plans").get() as { n: number }).n; const noops = (db.prepare("SELECT COUNT(*) as n FROM review_plans WHERE is_noop = 1").get() as { n: number }).n; @@ -442,10 +460,7 @@ app.post("/series/:seriesKey/approve-all", (c) => { for (const plan of pending) { db.prepare("UPDATE review_plans SET status = 'approved', reviewed_at = datetime('now') WHERE id = ?").run(plan.id); const { item, streams, decisions } = loadItemDetail(db, plan.item_id); - if (item) - db - .prepare("INSERT INTO jobs (item_id, command, job_type, status) VALUES (?, ?, 'audio', 'pending')") - .run(plan.item_id, buildCommand(item, streams, decisions)); + if (item) enqueueAudioJob(db, plan.item_id, buildCommand(item, streams, decisions)); } return c.json({ ok: true, count: pending.length }); }); @@ -467,10 +482,7 @@ app.post("/season/:seriesKey/:season/approve-all", (c) => { for (const plan of pending) { db.prepare("UPDATE review_plans SET status = 'approved', reviewed_at = datetime('now') WHERE id = ?").run(plan.id); const { item, streams, decisions } = loadItemDetail(db, plan.item_id); - if (item) - db - .prepare("INSERT INTO jobs (item_id, command, job_type, status) VALUES (?, ?, 'audio', 'pending')") - .run(plan.item_id, buildCommand(item, streams, decisions)); + if (item) enqueueAudioJob(db, plan.item_id, buildCommand(item, streams, decisions)); } return c.json({ ok: true, count: pending.length }); }); @@ -487,10 +499,7 @@ app.post("/approve-all", (c) => { for (const plan of pending) { db.prepare("UPDATE review_plans SET status = 'approved', reviewed_at = datetime('now') WHERE id = ?").run(plan.id); const { item, streams, decisions } = loadItemDetail(db, plan.item_id); - if (item) - db - .prepare("INSERT INTO jobs (item_id, command, job_type, status) VALUES (?, ?, 'audio', 'pending')") - .run(plan.item_id, buildCommand(item, streams, decisions)); + if (item) enqueueAudioJob(db, plan.item_id, buildCommand(item, streams, decisions)); } return c.json({ ok: true, count: pending.length }); }); @@ -587,10 +596,7 @@ app.post("/:id/approve", (c) => { db.prepare("UPDATE review_plans SET status = 'approved', reviewed_at = datetime('now') WHERE id = ?").run(plan.id); if (!plan.is_noop) { const { item, streams, decisions } = loadItemDetail(db, id); - if (item) - db - .prepare("INSERT INTO jobs (item_id, command, job_type, status) VALUES (?, ?, 'audio', 'pending')") - .run(id, buildCommand(item, streams, decisions)); + if (item) enqueueAudioJob(db, id, buildCommand(item, streams, decisions)); } return c.json({ ok: true }); }); @@ -614,7 +620,7 @@ app.post("/:id/retry", (c) => { const { item, command } = loadItemDetail(db, id); if (!item || !command) return c.json({ ok: false, error: "Cannot rebuild command" }, 400); - db.prepare("INSERT INTO jobs (item_id, command, job_type, status) VALUES (?, ?, 'audio', 'pending')").run(id, command); + enqueueAudioJob(db, id, command); db.prepare("UPDATE review_plans SET status = 'approved', reviewed_at = datetime('now') WHERE id = ?").run(plan.id); return c.json({ ok: true }); });