Files
netfelix-audio-fix/server/api/execute.ts
Felix Förtsch afd95f06df
All checks were successful
Build and Push Docker Image / build (push) Successful in 1m11s
✓✓ is now jellyfin-corroborated, not a self-confirming ffprobe
user reported ad astra got the double checkmark instantly after
transcode — correct, and correct to flag: the post-execute
verifyDesiredState ran ffprobe on the file we had just written, so it
tautologically matched the plan every time. not a second opinion.

replaced the flow with the semantics we actually wanted:

1. refreshItem now returns { refreshed: boolean } — true when jellyfin's
   DateLastRefreshed actually advanced within the timeout, false when it
   didn't. callers can tell 'jellyfin really re-probed' apart from
   'we timed out waiting'.

2. handOffToJellyfin post-job: refresh → (only if refreshed=true) fetch
   fresh streams → upsertJellyfinItem(source='webhook'). the rescan SQL
   sets verified=1 exactly when the fresh analysis sees is_noop=1, so
   ✓✓ now means 'jellyfin independently re-probed the file we wrote
   and agrees it matches the plan'. if jellyfin sees a drifted layout
   the plan flips back to pending so the user notices instead of the
   job silently rubber-stamping a bad output.

3. dropped the post-execute ffprobe block. the preflight-skipped branch
   no longer self-awards verified=1 either; it now does the same hand-
   off so jellyfin's re-probe drives the ✓✓ in that branch too.

refreshItem's other two callers (review /rescan, subtitles /rescan)
ignore the return value — their semantics haven't changed.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-14 18:29:00 +02:00

589 lines
22 KiB
TypeScript

import { accessSync, constants } from "node:fs";
import { Hono } from "hono";
import { stream } from "hono/streaming";
import { getAllConfig, getDb } from "../db/index";
import { log, error as logError, warn } from "../lib/log";
import { predictExtractedFiles } from "../services/ffmpeg";
import { getItem, refreshItem } from "../services/jellyfin";
import { loadLibrary as loadRadarrLibrary, isUsable as radarrUsable } from "../services/radarr";
import { type RescanConfig, upsertJellyfinItem } from "../services/rescan";
import {
getScheduleConfig,
isInProcessWindow,
msUntilProcessWindow,
nextProcessWindowTime,
sleepBetweenJobs,
waitForProcessWindow,
} from "../services/scheduler";
import { loadLibrary as loadSonarrLibrary, isUsable as sonarrUsable } from "../services/sonarr";
import { verifyDesiredState } from "../services/verify";
import type { Job, MediaItem, MediaStream } from "../types";
/**
* Post-job hand-off to Jellyfin. Three phases:
* 1. refreshItem — ask Jellyfin to re-probe the file on disk and wait
* until its DateLastRefreshed advances (or 15s timeout).
* 2. getItem — pull back the freshly-probed metadata.
* 3. upsertJellyfinItem(source='webhook') — re-run the analyzer against
* Jellyfin's view. If it matches the plan (is_noop=1), sets verified=1
* — the ✓✓ in the Done column. If Jellyfin sees a different layout
* (is_noop=0) the plan flips back to 'pending' so the user notices.
*
* This closes the previously-dangling "we asked Jellyfin to refresh but
* never checked what it saw" gap. Earlier attempt used our own ffprobe of
* the output, but that was tautological — ffmpeg just wrote the file to
* match the plan, so the check always passed immediately. Jellyfin is the
* independent observer that matters.
*/
async function handOffToJellyfin(itemId: number): Promise<void> {
const db = getDb();
const row = db.prepare("SELECT jellyfin_id FROM media_items WHERE id = ?").get(itemId) as
| { jellyfin_id: string }
| undefined;
if (!row) return;
const cfg = getAllConfig();
const jellyfinCfg = { url: cfg.jellyfin_url, apiKey: cfg.jellyfin_api_key, userId: cfg.jellyfin_user_id };
if (!jellyfinCfg.url || !jellyfinCfg.apiKey) return;
let refreshResult: { refreshed: boolean };
try {
refreshResult = await refreshItem(jellyfinCfg, row.jellyfin_id);
} catch (err) {
warn(`Jellyfin refresh for item ${itemId} failed: ${String(err)} — skipping verification`);
return;
}
if (!refreshResult.refreshed) {
// DateLastRefreshed never advanced within the timeout — Jellyfin may
// still be probing asynchronously. We can't trust the item data we'd
// fetch right now, so skip the verify step; the plan stays verified=0
// (single ✓) rather than risk flipping it based on stale metadata.
warn(`Jellyfin refresh for item ${itemId} timed out — leaving plan unverified`);
return;
}
try {
const fresh = await getItem(jellyfinCfg, row.jellyfin_id);
if (!fresh) {
warn(`Jellyfin returned no item for ${row.jellyfin_id} during verification`);
return;
}
const radarrCfg = { url: cfg.radarr_url, apiKey: cfg.radarr_api_key };
const sonarrCfg = { url: cfg.sonarr_url, apiKey: cfg.sonarr_api_key };
const radarrEnabled = cfg.radarr_enabled === "1" && radarrUsable(radarrCfg);
const sonarrEnabled = cfg.sonarr_enabled === "1" && sonarrUsable(sonarrCfg);
const [radarrLibrary, sonarrLibrary] = await Promise.all([
radarrEnabled ? loadRadarrLibrary(radarrCfg) : Promise.resolve(null),
sonarrEnabled ? loadSonarrLibrary(sonarrCfg) : Promise.resolve(null),
]);
const audioLanguages = JSON.parse(cfg.audio_languages || "[]") as string[];
const rescanCfg: RescanConfig = {
audioLanguages,
radarr: radarrEnabled ? radarrCfg : null,
sonarr: sonarrEnabled ? sonarrCfg : null,
radarrLibrary,
sonarrLibrary,
};
const result = await upsertJellyfinItem(db, fresh, rescanCfg, { source: "webhook" });
log(`Post-job verify for item ${itemId}: is_noop=${result.isNoop}`);
} catch (err) {
warn(`Post-job verification for item ${itemId} failed: ${String(err)}`);
}
}
const app = new Hono();
// ─── Sequential local queue ──────────────────────────────────────────────────
let queueRunning = false;
let runningProc: ReturnType<typeof Bun.spawn> | null = null;
let runningJobId: number | null = null;
function emitQueueStatus(
status: "running" | "paused" | "sleeping" | "idle",
extra: { until?: string; seconds?: number } = {},
): void {
const line = `event: queue_status\ndata: ${JSON.stringify({ status, ...extra })}\n\n`;
for (const l of jobListeners) l(line);
}
async function runSequential(jobs: Job[]): Promise<void> {
if (queueRunning) return;
queueRunning = true;
try {
let first = true;
for (const job of jobs) {
// Pause outside the processing window
if (!isInProcessWindow()) {
emitQueueStatus("paused", {
until: nextProcessWindowTime(),
seconds: Math.round(msUntilProcessWindow() / 1000),
});
await waitForProcessWindow();
}
// Sleep between jobs (but not before the first one)
if (!first) {
const cfg = getScheduleConfig();
if (cfg.job_sleep_seconds > 0) {
emitQueueStatus("sleeping", { seconds: cfg.job_sleep_seconds });
await sleepBetweenJobs();
}
}
first = false;
// Atomic claim: only pick up jobs still pending
const db = getDb();
const claimed = db
.prepare(
"UPDATE jobs SET status = 'running', started_at = datetime('now'), output = '' WHERE id = ? AND status = 'pending'",
)
.run(job.id);
if (claimed.changes === 0) continue; // cancelled or already running
emitQueueStatus("running");
try {
await runJob(job);
} catch (err) {
logError(`Job ${job.id} failed:`, err);
}
}
} finally {
queueRunning = false;
emitQueueStatus("idle");
}
}
// ─── SSE state ────────────────────────────────────────────────────────────────
const jobListeners = new Set<(data: string) => void>();
function emitJobUpdate(jobId: number, status: string, output?: string): void {
const line = `event: job_update\ndata: ${JSON.stringify({ id: jobId, status, output })}\n\n`;
for (const l of jobListeners) l(line);
}
function emitJobProgress(jobId: number, seconds: number, total: number): void {
const line = `event: job_progress\ndata: ${JSON.stringify({ id: jobId, seconds, total })}\n\n`;
for (const l of jobListeners) l(line);
}
/** Parse "Duration: HH:MM:SS.MS" from ffmpeg startup output. */
function parseFFmpegDuration(line: string): number | null {
const match = line.match(/Duration:\s*(\d+):(\d+):(\d+)\.(\d+)/);
if (!match) return null;
const [, h, m, s] = match.map(Number);
return h * 3600 + m * 60 + s;
}
function loadJobRow(jobId: number) {
const db = getDb();
const row = db
.prepare(`
SELECT j.*, mi.id as mi_id, mi.name, mi.type, mi.series_name, mi.season_number,
mi.episode_number, mi.file_path
FROM jobs j
LEFT JOIN media_items mi ON mi.id = j.item_id
WHERE j.id = ?
`)
.get(jobId) as
| (Job & {
mi_id: number | null;
name: string | null;
type: string | null;
series_name: string | null;
season_number: number | null;
episode_number: number | null;
file_path: string | null;
})
| undefined;
if (!row) return null;
const item = row.name
? ({
id: row.item_id,
name: row.name,
type: row.type,
series_name: row.series_name,
season_number: row.season_number,
episode_number: row.episode_number,
file_path: row.file_path,
} as unknown as MediaItem)
: null;
return { job: row as unknown as Job, item };
}
// ─── List ─────────────────────────────────────────────────────────────────────
app.get("/", (c) => {
const db = getDb();
const filter = (c.req.query("filter") ?? "pending") as "all" | "pending" | "running" | "done" | "error";
const validFilters = ["all", "pending", "running", "done", "error"];
const whereClause = validFilters.includes(filter) && filter !== "all" ? `WHERE j.status = ?` : "";
const params = whereClause ? [filter] : [];
const jobRows = db
.prepare(`
SELECT j.*, mi.name, mi.type, mi.series_name, mi.season_number, mi.episode_number, mi.file_path
FROM jobs j
LEFT JOIN media_items mi ON mi.id = j.item_id
${whereClause}
ORDER BY j.created_at DESC
LIMIT 200
`)
.all(...params) as (Job & {
name: string;
type: string;
series_name: string | null;
season_number: number | null;
episode_number: number | null;
file_path: string;
})[];
const jobs = jobRows.map((r) => ({
job: r as unknown as Job,
item: r.name
? ({
id: r.item_id,
name: r.name,
type: r.type,
series_name: r.series_name,
season_number: r.season_number,
episode_number: r.episode_number,
file_path: r.file_path,
} as unknown as MediaItem)
: null,
}));
const countRows = db.prepare("SELECT status, COUNT(*) as cnt FROM jobs GROUP BY status").all() as {
status: string;
cnt: number;
}[];
const totalCounts: Record<string, number> = { all: 0, pending: 0, running: 0, done: 0, error: 0 };
for (const row of countRows) {
totalCounts[row.status] = row.cnt;
totalCounts.all += row.cnt;
}
return c.json({ jobs, filter, totalCounts });
});
// ─── Param helpers ────────────────────────────────────────────────────────────
function parseId(raw: string | undefined): number | null {
if (!raw) return null;
const n = Number.parseInt(raw, 10);
return Number.isFinite(n) && n > 0 ? n : null;
}
// ─── Start all pending ────────────────────────────────────────────────────────
app.post("/start", (c) => {
const db = getDb();
const pending = db.prepare("SELECT * FROM jobs WHERE status = 'pending' ORDER BY created_at").all() as Job[];
runSequential(pending).catch((err) => logError("Queue failed:", err));
return c.json({ ok: true, started: pending.length });
});
// ─── Run single ───────────────────────────────────────────────────────────────
app.post("/job/:id/run", async (c) => {
const jobId = parseId(c.req.param("id"));
if (jobId == null) return c.json({ error: "invalid job id" }, 400);
const db = getDb();
const job = db.prepare("SELECT * FROM jobs WHERE id = ?").get(jobId) as Job | undefined;
if (!job) return c.notFound();
if (job.status !== "pending") {
const result = loadJobRow(jobId);
if (!result) return c.notFound();
return c.json(result);
}
runSequential([job]).catch((err) => logError(`Job ${job.id} failed:`, err));
const result = loadJobRow(jobId);
if (!result) return c.notFound();
return c.json(result);
});
// ─── Cancel ───────────────────────────────────────────────────────────────────
app.post("/job/:id/cancel", (c) => {
const jobId = parseId(c.req.param("id"));
if (jobId == null) return c.json({ error: "invalid job id" }, 400);
const db = getDb();
db.prepare("DELETE FROM jobs WHERE id = ? AND status = 'pending'").run(jobId);
return c.json({ ok: true });
});
// ─── Clear queue ──────────────────────────────────────────────────────────────
app.post("/clear", (c) => {
const db = getDb();
db
.prepare(`
UPDATE review_plans SET status = 'pending', reviewed_at = NULL
WHERE item_id IN (SELECT item_id FROM jobs WHERE status = 'pending')
AND status = 'approved'
`)
.run();
const result = db.prepare("DELETE FROM jobs WHERE status = 'pending'").run();
return c.json({ ok: true, cleared: result.changes });
});
app.post("/clear-completed", (c) => {
const db = getDb();
const result = db.prepare("DELETE FROM jobs WHERE status IN ('done', 'error')").run();
return c.json({ ok: true, cleared: result.changes });
});
// ─── Stop running job ─────────────────────────────────────────────────────────
app.post("/stop", (c) => {
if (!runningProc || runningJobId == null) {
return c.json({ ok: false, error: "No job is currently running" }, 409);
}
const stoppedId = runningJobId;
try {
runningProc.kill("SIGTERM");
} catch (err) {
logError(`Failed to kill job ${stoppedId}:`, err);
return c.json({ ok: false, error: String(err) }, 500);
}
// runJob's catch path will mark the job error and clean up runningProc.
return c.json({ ok: true, stopped: stoppedId });
});
// ─── SSE ──────────────────────────────────────────────────────────────────────
app.get("/events", (c) => {
return stream(c, async (s) => {
c.header("Content-Type", "text/event-stream");
c.header("Cache-Control", "no-cache");
const queue: string[] = [];
let resolve: (() => void) | null = null;
const listener = (data: string) => {
queue.push(data);
resolve?.();
};
jobListeners.add(listener);
s.onAbort(() => {
jobListeners.delete(listener);
});
try {
while (!s.closed) {
if (queue.length > 0) {
await s.write(queue.shift()!);
} else {
await new Promise<void>((res) => {
resolve = res;
setTimeout(res, 15_000);
});
resolve = null;
if (queue.length === 0) await s.write(": keepalive\n\n");
}
}
} finally {
jobListeners.delete(listener);
}
});
});
// ─── Job execution ────────────────────────────────────────────────────────────
async function runJob(job: Job): Promise<void> {
log(`Job ${job.id} starting (item=${job.item_id})`);
log(`Job ${job.id} command: ${job.command}`);
const db = getDb();
const itemRow = db.prepare("SELECT file_path FROM media_items WHERE id = ?").get(job.item_id) as
| { file_path: string }
| undefined;
if (itemRow?.file_path) {
try {
accessSync(itemRow.file_path, constants.R_OK | constants.W_OK);
} catch (fsErr) {
const msg = `File not accessible: ${itemRow.file_path}\n${(fsErr as Error).message}`;
db
.prepare("UPDATE jobs SET status = 'error', output = ?, exit_code = 1, completed_at = datetime('now') WHERE id = ?")
.run(msg, job.id);
emitJobUpdate(job.id, "error", msg);
db.prepare("UPDATE review_plans SET status = 'error' WHERE item_id = ?").run(job.item_id);
return;
}
// Preflight: if the file already matches the plan, skip ffmpeg. Cheap
// guard against re-running a stream-index-based command against a file
// that's already been processed — which would either error out or
// silently corrupt the file.
try {
const verify = await verifyDesiredState(db, job.item_id, itemRow.file_path);
if (verify.matches) {
const msg = `Preflight check: ${verify.reason}\nSkipping FFmpeg — no work needed.`;
log(`Job ${job.id} ${msg.replace(/\n/g, " ")}`);
db.transaction(() => {
db
.prepare(
"UPDATE jobs SET status = 'done', exit_code = 0, output = ?, completed_at = datetime('now') WHERE id = ?",
)
.run(msg, job.id);
// Preflight matched → file is already correct per our own ffprobe.
// We still hand off to Jellyfin below so its independent re-probe
// drives the ✓✓ verified flag, rather than trusting our check of
// our own output.
db.prepare("UPDATE review_plans SET status = 'done' WHERE item_id = ?").run(job.item_id);
})();
emitJobUpdate(job.id, "done", msg);
// Hand off so Jellyfin re-probes and can corroborate the ✓✓.
handOffToJellyfin(job.item_id).catch((err) =>
warn(`Jellyfin hand-off for item ${job.item_id} failed: ${String(err)}`),
);
return;
}
log(`Job ${job.id} preflight: ${verify.reason} — running FFmpeg`);
} catch (err) {
warn(`Job ${job.id} preflight check errored: ${String(err)} — proceeding with FFmpeg`);
}
}
emitJobUpdate(job.id, "running");
const outputLines: string[] = [];
let pendingFlush = false;
let lastFlushAt = 0;
let totalSeconds = 0;
let lastProgressEmit = 0;
const updateOutput = db.prepare("UPDATE jobs SET output = ? WHERE id = ?");
const flush = (final = false) => {
const text = outputLines.join("\n");
const now = Date.now();
if (final || now - lastFlushAt > 500) {
updateOutput.run(text, job.id);
lastFlushAt = now;
pendingFlush = false;
} else {
pendingFlush = true;
}
emitJobUpdate(job.id, "running", text);
};
const consumeProgress = (line: string) => {
if (totalSeconds === 0) {
const d = parseFFmpegDuration(line);
if (d != null) totalSeconds = d;
}
const progressed = parseFFmpegProgress(line);
if (progressed != null && totalSeconds > 0) {
const now = Date.now();
if (now - lastProgressEmit > 500) {
emitJobProgress(job.id, progressed, totalSeconds);
lastProgressEmit = now;
}
}
};
try {
const proc = Bun.spawn(["sh", "-c", job.command], { stdout: "pipe", stderr: "pipe" });
runningProc = proc;
runningJobId = job.id;
const readStream = async (readable: ReadableStream<Uint8Array>, prefix = "") => {
const reader = readable.getReader();
const decoder = new TextDecoder();
let buffer = "";
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const parts = buffer.split(/\r\n|\n|\r/);
buffer = parts.pop() ?? "";
for (const line of parts) {
if (!line.trim()) continue;
outputLines.push(prefix + line);
consumeProgress(line);
}
flush();
}
if (buffer.trim()) {
outputLines.push(prefix + buffer);
consumeProgress(buffer);
}
} catch (err) {
logError(`stream read error (${prefix.trim() || "stdout"}):`, err);
}
};
await Promise.all([readStream(proc.stdout), readStream(proc.stderr, "[stderr] "), proc.exited]);
const exitCode = await proc.exited;
if (pendingFlush) updateOutput.run(outputLines.join("\n"), job.id);
if (exitCode !== 0) throw new Error(`FFmpeg exited with code ${exitCode}`);
const fullOutput = outputLines.join("\n");
// Gather sidecar files to record
const item = db.prepare("SELECT * FROM media_items WHERE id = ?").get(job.item_id) as MediaItem | undefined;
const streams = db.prepare("SELECT * FROM media_streams WHERE item_id = ?").all(job.item_id) as MediaStream[];
const files = item && streams.length > 0 ? predictExtractedFiles(item, streams) : [];
const insertFile = db.prepare(
"INSERT OR IGNORE INTO subtitle_files (item_id, file_path, language, codec, is_forced, is_hearing_impaired) VALUES (?, ?, ?, ?, ?, ?)",
);
const markJobDone = db.prepare(
"UPDATE jobs SET status = 'done', exit_code = 0, output = ?, completed_at = datetime('now') WHERE id = ?",
);
const markPlanDone = db.prepare("UPDATE review_plans SET status = 'done' WHERE item_id = ?");
const markSubsExtracted = db.prepare("UPDATE review_plans SET subs_extracted = 1 WHERE item_id = ?");
db.transaction(() => {
markJobDone.run(fullOutput, job.id);
markPlanDone.run(job.item_id);
for (const f of files) {
insertFile.run(job.item_id, f.file_path, f.language, f.codec, f.is_forced ? 1 : 0, f.is_hearing_impaired ? 1 : 0);
}
if (files.length > 0) markSubsExtracted.run(job.item_id);
})();
log(`Job ${job.id} completed successfully`);
emitJobUpdate(job.id, "done", fullOutput);
// Fire-and-forget hand-off. Jellyfin re-probes the file we just wrote,
// we wait for DateLastRefreshed to advance, then re-analyze its fresh
// view. Setting verified=1 only happens when Jellyfin's independent
// probe confirms is_noop=1. If its view disagrees the plan flips back
// to 'pending' so the user notices — better than silently rubber-
// stamping a bad output as ✓✓.
handOffToJellyfin(job.item_id).catch((err) =>
warn(`Jellyfin hand-off for item ${job.item_id} failed: ${String(err)}`),
);
} catch (err) {
logError(`Job ${job.id} failed:`, err);
const fullOutput = `${outputLines.join("\n")}\n${String(err)}`;
db
.prepare("UPDATE jobs SET status = 'error', exit_code = 1, output = ?, completed_at = datetime('now') WHERE id = ?")
.run(fullOutput, job.id);
emitJobUpdate(job.id, "error", fullOutput);
db.prepare("UPDATE review_plans SET status = 'error' WHERE item_id = ?").run(job.item_id);
} finally {
runningProc = null;
runningJobId = null;
}
}
// Scheduler endpoints live on /api/settings/schedule now — see server/api/settings.ts.
// ─── FFmpeg progress parsing ───────────────────────────────────────────────────
/** Parse FFmpeg stderr line for progress. Returns seconds processed or null. */
export function parseFFmpegProgress(line: string): number | null {
const match = line.match(/time=(\d+):(\d+):(\d+)\.(\d+)/);
if (!match) return null;
const [, h, m, s] = match.map(Number);
return h * 3600 + m * 60 + s;
}
export default app;