dedupe pending jobs to stop rapid-fire approvals from spawning ghost ffmpeg runs
All checks were successful
Build and Push Docker Image / build (push) Successful in 1m35s
All checks were successful
Build and Push Docker Image / build (push) Successful in 1m35s
root cause: all five job-insert sites in review.ts blindly inserted a 'pending' row, so a double-click on approve (or an overlap between /approve-all and individual /approve) wrote N jobs for the same item. job 1 stripped subtitles + reordered audio; jobs 2..N then ran the same stale stream-index command against the already-processed file and ffmpeg bailed with 'Stream map matches no streams'. fix: funnel every insert through enqueueAudioJob(), which only writes when no pending job already exists for that item. covers approve, retry, approve-all, season approve-all, series approve-all.
This commit is contained in:
50
server/api/__tests__/review.test.ts
Normal file
50
server/api/__tests__/review.test.ts
Normal file
@@ -0,0 +1,50 @@
|
||||
import { Database } from "bun:sqlite";
|
||||
import { describe, expect, test } from "bun:test";
|
||||
import { SCHEMA } from "../../db/schema";
|
||||
import { enqueueAudioJob } from "../review";
|
||||
|
||||
function makeDb(): Database {
|
||||
const db = new Database(":memory:");
|
||||
for (const stmt of SCHEMA.split(";")) {
|
||||
const trimmed = stmt.trim();
|
||||
if (trimmed) db.run(trimmed);
|
||||
}
|
||||
db
|
||||
.prepare("INSERT INTO media_items (id, jellyfin_id, type, name, file_path) VALUES (?, ?, 'Movie', 'T', '/x.mkv')")
|
||||
.run(1, "jf-1");
|
||||
return db;
|
||||
}
|
||||
|
||||
describe("enqueueAudioJob dedup", () => {
|
||||
test("inserts a job when none exists", () => {
|
||||
const db = makeDb();
|
||||
expect(enqueueAudioJob(db, 1, "ffmpeg a")).toBe(true);
|
||||
const { n } = db.prepare("SELECT COUNT(*) as n FROM jobs WHERE item_id = 1").get() as { n: number };
|
||||
expect(n).toBe(1);
|
||||
});
|
||||
|
||||
test("no-ops when a pending job already exists", () => {
|
||||
const db = makeDb();
|
||||
enqueueAudioJob(db, 1, "ffmpeg a");
|
||||
expect(enqueueAudioJob(db, 1, "ffmpeg b")).toBe(false);
|
||||
expect(enqueueAudioJob(db, 1, "ffmpeg c")).toBe(false);
|
||||
const rows = db.prepare("SELECT command FROM jobs WHERE item_id = 1").all() as { command: string }[];
|
||||
expect(rows.length).toBe(1);
|
||||
expect(rows[0].command).toBe("ffmpeg a");
|
||||
});
|
||||
|
||||
test("allows a new pending job once the previous one is done or errored", () => {
|
||||
const db = makeDb();
|
||||
enqueueAudioJob(db, 1, "ffmpeg a");
|
||||
db.prepare("UPDATE jobs SET status = 'done' WHERE item_id = 1").run();
|
||||
expect(enqueueAudioJob(db, 1, "ffmpeg b")).toBe(true);
|
||||
|
||||
db.prepare("UPDATE jobs SET status = 'error' WHERE command = 'ffmpeg b'").run();
|
||||
expect(enqueueAudioJob(db, 1, "ffmpeg c")).toBe(true);
|
||||
|
||||
const pending = db.prepare("SELECT COUNT(*) as n FROM jobs WHERE item_id = 1 AND status = 'pending'").get() as {
|
||||
n: number;
|
||||
};
|
||||
expect(pending.n).toBe(1);
|
||||
});
|
||||
});
|
||||
@@ -24,6 +24,24 @@ function parseLanguageList(raw: string | null, fallback: string[]): string[] {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert a pending audio job for the given item only if no pending job
|
||||
* already exists for it. Guards against duplicate jobs from rapid-fire
|
||||
* approve clicks, overlapping individual + bulk approvals, or any other
|
||||
* path that could race two POSTs for the same item. Returns true if a
|
||||
* job was actually inserted.
|
||||
*/
|
||||
export function enqueueAudioJob(db: ReturnType<typeof getDb>, itemId: number, command: string): boolean {
|
||||
const result = db
|
||||
.prepare(`
|
||||
INSERT INTO jobs (item_id, command, job_type, status)
|
||||
SELECT ?, ?, 'audio', 'pending'
|
||||
WHERE NOT EXISTS (SELECT 1 FROM jobs WHERE item_id = ? AND status = 'pending')
|
||||
`)
|
||||
.run(itemId, command, itemId);
|
||||
return result.changes > 0;
|
||||
}
|
||||
|
||||
function countsByFilter(db: ReturnType<typeof getDb>): Record<string, number> {
|
||||
const total = (db.prepare("SELECT COUNT(*) as n FROM review_plans").get() as { n: number }).n;
|
||||
const noops = (db.prepare("SELECT COUNT(*) as n FROM review_plans WHERE is_noop = 1").get() as { n: number }).n;
|
||||
@@ -442,10 +460,7 @@ app.post("/series/:seriesKey/approve-all", (c) => {
|
||||
for (const plan of pending) {
|
||||
db.prepare("UPDATE review_plans SET status = 'approved', reviewed_at = datetime('now') WHERE id = ?").run(plan.id);
|
||||
const { item, streams, decisions } = loadItemDetail(db, plan.item_id);
|
||||
if (item)
|
||||
db
|
||||
.prepare("INSERT INTO jobs (item_id, command, job_type, status) VALUES (?, ?, 'audio', 'pending')")
|
||||
.run(plan.item_id, buildCommand(item, streams, decisions));
|
||||
if (item) enqueueAudioJob(db, plan.item_id, buildCommand(item, streams, decisions));
|
||||
}
|
||||
return c.json({ ok: true, count: pending.length });
|
||||
});
|
||||
@@ -467,10 +482,7 @@ app.post("/season/:seriesKey/:season/approve-all", (c) => {
|
||||
for (const plan of pending) {
|
||||
db.prepare("UPDATE review_plans SET status = 'approved', reviewed_at = datetime('now') WHERE id = ?").run(plan.id);
|
||||
const { item, streams, decisions } = loadItemDetail(db, plan.item_id);
|
||||
if (item)
|
||||
db
|
||||
.prepare("INSERT INTO jobs (item_id, command, job_type, status) VALUES (?, ?, 'audio', 'pending')")
|
||||
.run(plan.item_id, buildCommand(item, streams, decisions));
|
||||
if (item) enqueueAudioJob(db, plan.item_id, buildCommand(item, streams, decisions));
|
||||
}
|
||||
return c.json({ ok: true, count: pending.length });
|
||||
});
|
||||
@@ -487,10 +499,7 @@ app.post("/approve-all", (c) => {
|
||||
for (const plan of pending) {
|
||||
db.prepare("UPDATE review_plans SET status = 'approved', reviewed_at = datetime('now') WHERE id = ?").run(plan.id);
|
||||
const { item, streams, decisions } = loadItemDetail(db, plan.item_id);
|
||||
if (item)
|
||||
db
|
||||
.prepare("INSERT INTO jobs (item_id, command, job_type, status) VALUES (?, ?, 'audio', 'pending')")
|
||||
.run(plan.item_id, buildCommand(item, streams, decisions));
|
||||
if (item) enqueueAudioJob(db, plan.item_id, buildCommand(item, streams, decisions));
|
||||
}
|
||||
return c.json({ ok: true, count: pending.length });
|
||||
});
|
||||
@@ -587,10 +596,7 @@ app.post("/:id/approve", (c) => {
|
||||
db.prepare("UPDATE review_plans SET status = 'approved', reviewed_at = datetime('now') WHERE id = ?").run(plan.id);
|
||||
if (!plan.is_noop) {
|
||||
const { item, streams, decisions } = loadItemDetail(db, id);
|
||||
if (item)
|
||||
db
|
||||
.prepare("INSERT INTO jobs (item_id, command, job_type, status) VALUES (?, ?, 'audio', 'pending')")
|
||||
.run(id, buildCommand(item, streams, decisions));
|
||||
if (item) enqueueAudioJob(db, id, buildCommand(item, streams, decisions));
|
||||
}
|
||||
return c.json({ ok: true });
|
||||
});
|
||||
@@ -614,7 +620,7 @@ app.post("/:id/retry", (c) => {
|
||||
const { item, command } = loadItemDetail(db, id);
|
||||
if (!item || !command) return c.json({ ok: false, error: "Cannot rebuild command" }, 400);
|
||||
|
||||
db.prepare("INSERT INTO jobs (item_id, command, job_type, status) VALUES (?, ?, 'audio', 'pending')").run(id, command);
|
||||
enqueueAudioJob(db, id, command);
|
||||
db.prepare("UPDATE review_plans SET status = 'approved', reviewed_at = datetime('now') WHERE id = ?").run(plan.id);
|
||||
return c.json({ ok: true });
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user