make done plans terminal, add ffprobe preflight to skip already-processed files
All checks were successful
Build and Push Docker Image / build (push) Successful in 1m34s
All checks were successful
Build and Push Docker Image / build (push) Successful in 1m34s
root cause of duplicate pipeline entries: rescan.ts flipped done plans back to pending whenever a post-job jellyfin refresh returned stale metadata, putting the item back in review and letting a second jobs row pile up in done. done is now sticky across rescans (error still re-opens for retries). second line of defense: before spawning ffmpeg, ffprobe the file and compare audio count/language/codec order + embedded subtitle count against the plan. if it already matches, mark the job done with the reason in jobs.output and skip the spawn. prevents corrupting a post-processed file with a stale stream-index command.
This commit is contained in:
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "netfelix-audio-fix",
|
"name": "netfelix-audio-fix",
|
||||||
"version": "2026.04.13.10",
|
"version": "2026.04.13.11",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
"dev:server": "NODE_ENV=development bun --hot server/index.tsx",
|
"dev:server": "NODE_ENV=development bun --hot server/index.tsx",
|
||||||
"dev:client": "vite",
|
"dev:client": "vite",
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ import {
|
|||||||
waitForProcessWindow,
|
waitForProcessWindow,
|
||||||
} from "../services/scheduler";
|
} from "../services/scheduler";
|
||||||
import { loadLibrary as loadSonarrLibrary, isUsable as sonarrUsable } from "../services/sonarr";
|
import { loadLibrary as loadSonarrLibrary, isUsable as sonarrUsable } from "../services/sonarr";
|
||||||
|
import { verifyDesiredState } from "../services/verify";
|
||||||
import type { Job, MediaItem, MediaStream } from "../types";
|
import type { Job, MediaItem, MediaStream } from "../types";
|
||||||
|
|
||||||
function parseLanguageList(raw: string | null | undefined, fallback: string[]): string[] {
|
function parseLanguageList(raw: string | null | undefined, fallback: string[]): string[] {
|
||||||
@@ -403,6 +404,31 @@ async function runJob(job: Job): Promise<void> {
|
|||||||
db.prepare("UPDATE review_plans SET status = 'error' WHERE item_id = ?").run(job.item_id);
|
db.prepare("UPDATE review_plans SET status = 'error' WHERE item_id = ?").run(job.item_id);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Preflight: if the file already matches the plan, skip ffmpeg. Cheap
|
||||||
|
// guard against re-running a stream-index-based command against a file
|
||||||
|
// that's already been processed — which would either error out or
|
||||||
|
// silently corrupt the file.
|
||||||
|
try {
|
||||||
|
const verify = await verifyDesiredState(db, job.item_id, itemRow.file_path);
|
||||||
|
if (verify.matches) {
|
||||||
|
const msg = `Preflight check: ${verify.reason}\nSkipping FFmpeg — no work needed.`;
|
||||||
|
log(`Job ${job.id} ${msg.replace(/\n/g, " ")}`);
|
||||||
|
db.transaction(() => {
|
||||||
|
db
|
||||||
|
.prepare(
|
||||||
|
"UPDATE jobs SET status = 'done', exit_code = 0, output = ?, completed_at = datetime('now') WHERE id = ?",
|
||||||
|
)
|
||||||
|
.run(msg, job.id);
|
||||||
|
db.prepare("UPDATE review_plans SET status = 'done' WHERE item_id = ?").run(job.item_id);
|
||||||
|
})();
|
||||||
|
emitJobUpdate(job.id, "done", msg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
log(`Job ${job.id} preflight: ${verify.reason} — running FFmpeg`);
|
||||||
|
} catch (err) {
|
||||||
|
warn(`Job ${job.id} preflight check errored: ${String(err)} — proceeding with FFmpeg`);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
emitJobUpdate(job.id, "running");
|
emitJobUpdate(job.id, "running");
|
||||||
|
|||||||
@@ -225,8 +225,9 @@ export async function upsertJellyfinItem(
|
|||||||
VALUES (?, 'pending', ?, ?, ?, ?, ?)
|
VALUES (?, 'pending', ?, ?, ?, ?, ?)
|
||||||
ON CONFLICT(item_id) DO UPDATE SET
|
ON CONFLICT(item_id) DO UPDATE SET
|
||||||
status = CASE
|
status = CASE
|
||||||
|
WHEN review_plans.status = 'done' THEN 'done'
|
||||||
WHEN excluded.is_noop = 1 THEN 'done'
|
WHEN excluded.is_noop = 1 THEN 'done'
|
||||||
WHEN review_plans.status IN ('done','error') THEN 'pending'
|
WHEN review_plans.status = 'error' THEN 'pending'
|
||||||
ELSE review_plans.status
|
ELSE review_plans.status
|
||||||
END,
|
END,
|
||||||
is_noop = excluded.is_noop,
|
is_noop = excluded.is_noop,
|
||||||
|
|||||||
139
server/services/verify.ts
Normal file
139
server/services/verify.ts
Normal file
@@ -0,0 +1,139 @@
|
|||||||
|
import type { Database } from "bun:sqlite";
|
||||||
|
|
||||||
|
interface ProbedStream {
|
||||||
|
type: "Audio" | "Video" | "Subtitle" | "Data" | "Attachment" | "Unknown";
|
||||||
|
codec: string | null;
|
||||||
|
language: string | null;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function ffprobeStreams(filePath: string): Promise<ProbedStream[]> {
|
||||||
|
const proc = Bun.spawn(["ffprobe", "-v", "error", "-print_format", "json", "-show_streams", filePath], {
|
||||||
|
stdout: "pipe",
|
||||||
|
stderr: "pipe",
|
||||||
|
});
|
||||||
|
const [stdout, stderr] = await Promise.all([new Response(proc.stdout).text(), new Response(proc.stderr).text()]);
|
||||||
|
const exitCode = await proc.exited;
|
||||||
|
if (exitCode !== 0) throw new Error(`ffprobe exited ${exitCode}: ${stderr.trim() || "<no stderr>"}`);
|
||||||
|
|
||||||
|
const data = JSON.parse(stdout) as {
|
||||||
|
streams?: Array<{ codec_type?: string; codec_name?: string; tags?: { language?: string } }>;
|
||||||
|
};
|
||||||
|
return (data.streams ?? []).map((s) => ({
|
||||||
|
type: codecTypeToType(s.codec_type),
|
||||||
|
codec: s.codec_name ?? null,
|
||||||
|
language: s.tags?.language ?? null,
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
function codecTypeToType(t: string | undefined): ProbedStream["type"] {
|
||||||
|
switch (t) {
|
||||||
|
case "audio":
|
||||||
|
return "Audio";
|
||||||
|
case "video":
|
||||||
|
return "Video";
|
||||||
|
case "subtitle":
|
||||||
|
return "Subtitle";
|
||||||
|
case "data":
|
||||||
|
return "Data";
|
||||||
|
case "attachment":
|
||||||
|
return "Attachment";
|
||||||
|
default:
|
||||||
|
return "Unknown";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface VerifyResult {
|
||||||
|
matches: boolean;
|
||||||
|
reason: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check whether the on-disk file already matches the plan's desired state.
|
||||||
|
* Comparison is conservative: any uncertainty falls back to "run the job".
|
||||||
|
*
|
||||||
|
* Matches when:
|
||||||
|
* - audio stream count, language order, and codec match the `keep` decisions
|
||||||
|
* - no subtitle streams remain in the container
|
||||||
|
* - either subs_extracted=1 or the plan has no subtitle decisions to extract
|
||||||
|
*/
|
||||||
|
export async function verifyDesiredState(db: Database, itemId: number, filePath: string): Promise<VerifyResult> {
|
||||||
|
const plan = db.prepare("SELECT id, subs_extracted FROM review_plans WHERE item_id = ?").get(itemId) as
|
||||||
|
| { id: number; subs_extracted: number }
|
||||||
|
| undefined;
|
||||||
|
if (!plan) return { matches: false, reason: "no review plan found" };
|
||||||
|
|
||||||
|
const expected = db
|
||||||
|
.prepare(`
|
||||||
|
SELECT sd.target_index, sd.transcode_codec, ms.language, ms.codec
|
||||||
|
FROM stream_decisions sd
|
||||||
|
JOIN media_streams ms ON ms.id = sd.stream_id
|
||||||
|
WHERE sd.plan_id = ? AND sd.action = 'keep' AND ms.type = 'Audio'
|
||||||
|
ORDER BY sd.target_index
|
||||||
|
`)
|
||||||
|
.all(plan.id) as {
|
||||||
|
target_index: number;
|
||||||
|
transcode_codec: string | null;
|
||||||
|
language: string | null;
|
||||||
|
codec: string | null;
|
||||||
|
}[];
|
||||||
|
|
||||||
|
let probed: ProbedStream[];
|
||||||
|
try {
|
||||||
|
probed = await ffprobeStreams(filePath);
|
||||||
|
} catch (err) {
|
||||||
|
return { matches: false, reason: `ffprobe failed: ${(err as Error).message}` };
|
||||||
|
}
|
||||||
|
|
||||||
|
const probedAudio = probed.filter((s) => s.type === "Audio");
|
||||||
|
const probedSubs = probed.filter((s) => s.type === "Subtitle");
|
||||||
|
|
||||||
|
if (probedSubs.length > 0) {
|
||||||
|
return { matches: false, reason: `file still contains ${probedSubs.length} subtitle stream(s) in the container` };
|
||||||
|
}
|
||||||
|
|
||||||
|
if (probedAudio.length !== expected.length) {
|
||||||
|
return {
|
||||||
|
matches: false,
|
||||||
|
reason: `audio stream count mismatch (file: ${probedAudio.length}, expected: ${expected.length})`,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
for (let i = 0; i < expected.length; i++) {
|
||||||
|
const want = expected[i];
|
||||||
|
const got = probedAudio[i];
|
||||||
|
const wantCodec = (want.transcode_codec ?? want.codec ?? "").toLowerCase();
|
||||||
|
const gotCodec = (got.codec ?? "").toLowerCase();
|
||||||
|
const wantLang = (want.language ?? "").toLowerCase();
|
||||||
|
const gotLang = (got.language ?? "").toLowerCase();
|
||||||
|
if (wantLang && wantLang !== gotLang) {
|
||||||
|
return {
|
||||||
|
matches: false,
|
||||||
|
reason: `audio track ${i}: language ${gotLang || "<none>"} ≠ expected ${wantLang}`,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
if (wantCodec && gotCodec && wantCodec !== gotCodec) {
|
||||||
|
return {
|
||||||
|
matches: false,
|
||||||
|
reason: `audio track ${i}: codec ${gotCodec} ≠ expected ${wantCodec}`,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (plan.subs_extracted === 0) {
|
||||||
|
const pendingSubs = db
|
||||||
|
.prepare(`
|
||||||
|
SELECT COUNT(*) as n FROM stream_decisions sd
|
||||||
|
JOIN media_streams ms ON ms.id = sd.stream_id
|
||||||
|
WHERE sd.plan_id = ? AND ms.type = 'Subtitle'
|
||||||
|
`)
|
||||||
|
.get(plan.id) as { n: number };
|
||||||
|
if (pendingSubs.n > 0) {
|
||||||
|
return { matches: false, reason: "subtitles not yet extracted to sidecar files" };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
matches: true,
|
||||||
|
reason: `file already matches desired layout (${probedAudio.length} audio track(s), no embedded subtitles)`,
|
||||||
|
};
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user