5d0af08b79
Build and Push Docker Image / build (push) Successful in 1m52s
Three threads: 1. Drop the "⚡ Auto-approve" / "✋ Needs decision" pills on PipelineCard and the "N auto · M need decisions" subtitle on ReviewColumn — noise for a workflow that wants to be unattended. Card amber tint stays as a softer cue. Remove the now-unused reviewManualCount field on the pipeline payload. 2. New /help route in the nav. Documents what netfelix actually does end-to-end, the folder/SxxExx/ID brackets we require, and that the codec/quality/audio brackets are *arr's job — we trigger their rename API instead of parsing them ourselves. Links to TRaSH guides. 3. Refactor triggerMovieRename / triggerSeriesRename to return a basename → new-basename map instead of one path. Add a batched triggerRenameFor in execute.ts that dedupes by movie and by series (one Sonarr call covers every episode of a series). Hook into processInbox: when an item becomes noop, fire a rename pass so lying filenames on already-clean files self-heal. Idempotent — *arr returns no work to do when names already match. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
673 lines
25 KiB
TypeScript
673 lines
25 KiB
TypeScript
import { accessSync, constants } from "node:fs";
|
|
import { Hono } from "hono";
|
|
import { stream } from "hono/streaming";
|
|
import { getConfig, getDb } from "../db/index";
|
|
import { log, error as logError, warn } from "../lib/log";
|
|
import { parseId } from "../lib/validate";
|
|
import { isExtractableSubtitle } from "../services/ffmpeg";
|
|
import * as radarr from "../services/radarr";
|
|
import * as sonarr from "../services/sonarr";
|
|
import {
|
|
getScheduleConfig,
|
|
isInProcessWindow,
|
|
msUntilProcessWindow,
|
|
nextProcessWindowTime,
|
|
sleepBetweenJobs,
|
|
waitForProcessWindow,
|
|
} from "../services/scheduler";
|
|
import { verifyDesiredState } from "../services/verify";
|
|
import type { Job, MediaItem, MediaStream } from "../types";
|
|
|
|
const app = new Hono();

// ─── Sequential local queue ──────────────────────────────────────────────────

// Single-flight guard: true while runSequential() is draining jobs.
let queueRunning = false;
// Abort handle for the active runSequential() loop; null when idle.
let queueAbort: AbortController | null = null;
// The child process currently executing (killed via POST /stop); null between jobs.
let runningProc: ReturnType<typeof Bun.spawn> | null = null;
// Id of the job that owns runningProc; null when nothing is mid-flight.
let runningJobId: number | null = null;
// Live reference to the in-memory queue so POST /start can append while a run is active.
let activeQueue: Job[] | null = null;
// Every job id ever enqueued during the active run — prevents double-enqueueing.
let activeSeen: Set<number> | null = null;

// Throttle (ms) for DB output writes and SSE live updates.
const LIVE_UPDATE_INTERVAL_MS = 500;
// How many stream chunks to consume before yielding the event loop once.
const STREAM_CHUNKS_BEFORE_YIELD = 24;
|
|
|
|
export function shouldSendLiveUpdate(now: number, lastSentAt: number, intervalMs = LIVE_UPDATE_INTERVAL_MS): boolean {
|
|
return now - lastSentAt > intervalMs;
|
|
}
|
|
|
|
export async function yieldAfterChunk(
|
|
chunksSinceYield: number,
|
|
chunksBeforeYield = STREAM_CHUNKS_BEFORE_YIELD,
|
|
sleep: (ms: number) => Promise<unknown> = (ms) => Bun.sleep(ms),
|
|
): Promise<number> {
|
|
const next = chunksSinceYield + 1;
|
|
if (next < chunksBeforeYield) return next;
|
|
await sleep(0);
|
|
return 0;
|
|
}
|
|
|
|
export function enqueueUnseenJobs<T extends { id: number }>(queue: T[], seen: Set<number>, jobs: T[]): number {
|
|
let added = 0;
|
|
for (const job of jobs) {
|
|
if (seen.has(job.id)) continue;
|
|
queue.push(job);
|
|
seen.add(job.id);
|
|
added += 1;
|
|
}
|
|
return added;
|
|
}
|
|
|
|
function emitQueueStatus(
|
|
status: "running" | "paused" | "sleeping" | "idle",
|
|
extra: { until?: string; seconds?: number } = {},
|
|
): void {
|
|
const line = `event: queue_status\ndata: ${JSON.stringify({ status, ...extra })}\n\n`;
|
|
for (const l of jobListeners) l(line);
|
|
}
|
|
|
|
/**
 * Drain `initial` jobs one at a time, honoring the processing window and the
 * configured between-job sleep, and publishing queue_status SSE frames along
 * the way. When `drain` is true (default), jobs that become pending mid-run
 * are appended so the queue keeps draining without another /start.
 * Single-flight: a second call while a run is active returns immediately.
 */
async function runSequential(initial: Job[], { drain = true } = {}): Promise<void> {
  if (queueRunning) return;
  queueRunning = true;
  queueAbort = new AbortController();
  const { signal } = queueAbort;
  try {
    let first = true;
    const queue: Job[] = [...initial];
    const seen = new Set<number>(queue.map((j) => j.id));
    // Expose the live queue/seen set so POST /start can top up this run.
    activeQueue = queue;
    activeSeen = seen;

    while (queue.length > 0) {
      if (signal.aborted) break;
      const job = queue.shift() as Job;

      // Pause outside the processing window
      if (!isInProcessWindow()) {
        emitQueueStatus("paused", {
          until: nextProcessWindowTime(),
          seconds: Math.round(msUntilProcessWindow() / 1000),
        });
        await waitForProcessWindow();
      }

      // Sleep between jobs (but not before the first one)
      if (!first) {
        const cfg = getScheduleConfig();
        if (cfg.job_sleep_seconds > 0) {
          emitQueueStatus("sleeping", { seconds: cfg.job_sleep_seconds });
          await sleepBetweenJobs();
        }
      }
      first = false;

      // Atomic claim: only pick up jobs still pending
      const db = getDb();
      const claimed = db
        .prepare(
          "UPDATE jobs SET status = 'running', started_at = datetime('now'), output = '' WHERE id = ? AND status = 'pending'",
        )
        .run(job.id);
      if (claimed.changes === 0) continue; // cancelled or already running
      emitQueueStatus("running");
      try {
        // runJob records its own success/error state; a throw here is logged
        // and the loop moves on to the next job.
        await runJob(job);
      } catch (err) {
        logError(`Job ${job.id} failed:`, err);
      }

      // When the local queue drains, re-check the DB for jobs that were
      // approved mid-run. Without this they'd sit pending until the user
      // manually clicks "Run all" again. Skip if aborted or drain=false.
      if (drain && queue.length === 0 && !signal.aborted) {
        const more = db.prepare("SELECT * FROM jobs WHERE status = 'pending' ORDER BY created_at").all() as Job[];
        enqueueUnseenJobs(queue, seen, more);
      }
    }
  } finally {
    // Reset all single-flight state so the next /start can spin up a fresh run.
    activeQueue = null;
    activeSeen = null;
    queueRunning = false;
    queueAbort = null;
    emitQueueStatus("idle");
  }
}
|
|
|
|
// ─── SSE state ────────────────────────────────────────────────────────────────

// Connected /events subscribers; each callback receives raw SSE frame strings.
const jobListeners = new Set<(data: string) => void>();
|
|
|
|
function emitJobUpdate(jobId: number, status: string, output?: string): void {
|
|
const line = `event: job_update\ndata: ${JSON.stringify({ id: jobId, status, output })}\n\n`;
|
|
for (const l of jobListeners) l(line);
|
|
}
|
|
|
|
function emitJobProgress(jobId: number, seconds: number, total: number): void {
|
|
const line = `event: job_progress\ndata: ${JSON.stringify({ id: jobId, seconds, total })}\n\n`;
|
|
for (const l of jobListeners) l(line);
|
|
}
|
|
|
|
export function emitInboxSorted(result: { moved_to_queue: number; moved_to_review: number }): void {
|
|
const line = `event: inbox_sorted\ndata: ${JSON.stringify(result)}\n\n`;
|
|
for (const l of jobListeners) l(line);
|
|
}
|
|
|
|
export function emitInboxSortStart(total: number): void {
|
|
const line = `event: inbox_sort_start\ndata: ${JSON.stringify({ total })}\n\n`;
|
|
for (const l of jobListeners) l(line);
|
|
}
|
|
|
|
export function emitInboxSortProgress(processed: number, total: number): void {
|
|
const line = `event: inbox_sort_progress\ndata: ${JSON.stringify({ processed, total })}\n\n`;
|
|
for (const l of jobListeners) l(line);
|
|
}
|
|
|
|
/** Lightweight nudge so the pipeline page refreshes column data. */
|
|
export function emitPipelineChanged(): void {
|
|
const line = "event: pipeline_changed\ndata: {}\n\n";
|
|
for (const l of jobListeners) l(line);
|
|
}
|
|
|
|
/**
|
|
* If the queue runner is idle and the auto_process_queue config is on, kick
|
|
* off a sequential pass over whatever's currently pending. Used by the
|
|
* enqueue path (so a fresh approval immediately starts draining) and by the
|
|
* settings toggle (so flipping the checkbox drains existing queue items).
|
|
* Returns true when a new run was started.
|
|
*/
|
|
export function maybeStartQueueProcessor(): boolean {
|
|
if (queueRunning) return false;
|
|
if (getConfig("auto_process_queue") !== "1") return false;
|
|
const pending = getDb().prepare("SELECT * FROM jobs WHERE status = 'pending' ORDER BY created_at").all() as Job[];
|
|
if (pending.length === 0) return false;
|
|
runSequential(pending).catch((err) => logError("Queue failed:", err));
|
|
return true;
|
|
}
|
|
|
|
/**
|
|
* Abort the queue loop so no more jobs start after the current one. The
|
|
* currently running ffmpeg keeps going — use POST /stop to also kill it.
|
|
* Mirrors stopAutoProcessLoop() for the inbox sorter.
|
|
*/
|
|
export function stopQueueProcessor(): boolean {
|
|
if (queueAbort) {
|
|
queueAbort.abort();
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
/** Parse "Duration: HH:MM:SS.MS" from ffmpeg startup output. */
|
|
function parseFFmpegDuration(line: string): number | null {
|
|
const match = line.match(/Duration:\s*(\d+):(\d+):(\d+)\.(\d+)/);
|
|
if (!match) return null;
|
|
const [, h, m, s] = match.map(Number);
|
|
return h * 3600 + m * 60 + s;
|
|
}
|
|
|
|
/**
 * Fetch one job joined with its media item. Returns { job, item } where
 * `item` is null when the LEFT JOIN found no media_items row (mi.name NULL),
 * or null overall when the job id doesn't exist.
 */
function loadJobRow(jobId: number) {
  const db = getDb();
  // LEFT JOIN so a job whose media item is missing still loads.
  const row = db
    .prepare(`
      SELECT j.*, mi.id as mi_id, mi.name, mi.type, mi.series_name, mi.season_number,
             mi.episode_number, mi.file_path
      FROM jobs j
      LEFT JOIN media_items mi ON mi.id = j.item_id
      WHERE j.id = ?
    `)
    .get(jobId) as
    | (Job & {
        mi_id: number | null;
        name: string | null;
        type: string | null;
        series_name: string | null;
        season_number: number | null;
        episode_number: number | null;
        file_path: string | null;
      })
    | undefined;

  if (!row) return null;

  // A non-null name means the JOIN matched; project the mi.* columns back
  // into a MediaItem-shaped object for the response payload.
  // NOTE(review): id comes from jobs.item_id rather than mi_id — same value
  // whenever the join matched, but confirm if mi_id ever diverges.
  const item = row.name
    ? ({
        id: row.item_id,
        name: row.name,
        type: row.type,
        series_name: row.series_name,
        season_number: row.season_number,
        episode_number: row.episode_number,
        file_path: row.file_path,
      } as unknown as MediaItem)
    : null;
  return { job: row as unknown as Job, item };
}
|
|
|
|
// ─── Start all pending ────────────────────────────────────────────────────────
|
|
|
|
app.post("/start", (c) => {
|
|
const db = getDb();
|
|
const pending = db.prepare("SELECT * FROM jobs WHERE status = 'pending' ORDER BY created_at").all() as Job[];
|
|
if (queueRunning && activeQueue && activeSeen) {
|
|
const queued = enqueueUnseenJobs(activeQueue, activeSeen, pending);
|
|
return c.json({ ok: true, started: 0, queued });
|
|
}
|
|
runSequential(pending).catch((err) => logError("Queue failed:", err));
|
|
return c.json({ ok: true, started: pending.length, queued: pending.length });
|
|
});
|
|
|
|
// ─── Run single ───────────────────────────────────────────────────────────────
|
|
|
|
app.post("/job/:id/run", async (c) => {
|
|
const jobId = parseId(c.req.param("id"));
|
|
if (jobId == null) return c.json({ error: "invalid job id" }, 400);
|
|
const db = getDb();
|
|
const job = db.prepare("SELECT * FROM jobs WHERE id = ?").get(jobId) as Job | undefined;
|
|
if (!job) return c.notFound();
|
|
if (job.status !== "pending") {
|
|
const result = loadJobRow(jobId);
|
|
if (!result) return c.notFound();
|
|
return c.json(result);
|
|
}
|
|
runSequential([job], { drain: false }).catch((err) => logError(`Job ${job.id} failed:`, err));
|
|
const result = loadJobRow(jobId);
|
|
if (!result) return c.notFound();
|
|
return c.json(result);
|
|
});
|
|
|
|
// ─── Cancel ───────────────────────────────────────────────────────────────────
|
|
|
|
app.post("/job/:id/cancel", (c) => {
|
|
const jobId = parseId(c.req.param("id"));
|
|
if (jobId == null) return c.json({ error: "invalid job id" }, 400);
|
|
const db = getDb();
|
|
db.prepare("DELETE FROM jobs WHERE id = ? AND status = 'pending'").run(jobId);
|
|
return c.json({ ok: true });
|
|
});
|
|
|
|
// ─── Clear queue ──────────────────────────────────────────────────────────────
|
|
|
|
/**
|
|
* Cancel every pending job and send its plan back to the Inbox so the
|
|
* distributor can re-classify it. Without `sorted = 0` the plan stays in
|
|
* Review with `auto_class='auto'` — where "Approve all ready" (auto_heuristic
|
|
* only) can't re-queue it and "Auto Review" (sort-inbox, sorted=0 only) can't
|
|
* see it, leaving the item stranded until the user manually approves.
|
|
*/
|
|
export function clearQueue(db: ReturnType<typeof getDb>): number {
|
|
const rows = db.prepare("SELECT item_id FROM jobs WHERE status = 'pending'").all() as { item_id: number }[];
|
|
for (const { item_id } of rows) {
|
|
db
|
|
.prepare(
|
|
"UPDATE review_plans SET status = 'pending', reviewed_at = NULL, sorted = 0, auto_class = NULL WHERE item_id = ? AND status != 'running'",
|
|
)
|
|
.run(item_id);
|
|
db.prepare("DELETE FROM jobs WHERE item_id = ? AND status IN ('pending', 'done', 'error')").run(item_id);
|
|
}
|
|
return rows.length;
|
|
}
|
|
|
|
app.post("/clear", (c) => {
|
|
const cleared = clearQueue(getDb());
|
|
return c.json({ ok: true, cleared });
|
|
});
|
|
|
|
app.post("/clear-completed", (c) => {
|
|
const db = getDb();
|
|
const result = db.prepare("DELETE FROM jobs WHERE status IN ('done', 'error')").run();
|
|
return c.json({ ok: true, cleared: result.changes });
|
|
});
|
|
|
|
// ─── Stop running job ─────────────────────────────────────────────────────────
|
|
|
|
app.post("/stop", (c) => {
|
|
// Abort the queue loop so no more jobs start after the current one
|
|
if (queueAbort) queueAbort.abort();
|
|
|
|
if (!runningProc || runningJobId == null) {
|
|
// No active ffmpeg but queue loop might be between jobs — abort is enough
|
|
return c.json({ ok: true, stopped: null });
|
|
}
|
|
const stoppedId = runningJobId;
|
|
try {
|
|
runningProc.kill("SIGTERM");
|
|
} catch (err) {
|
|
logError(`Failed to kill job ${stoppedId}:`, err);
|
|
return c.json({ ok: false, error: String(err) }, 500);
|
|
}
|
|
// runJob's catch path will mark the job error and clean up runningProc.
|
|
return c.json({ ok: true, stopped: stoppedId });
|
|
});
|
|
|
|
// ─── SSE ──────────────────────────────────────────────────────────────────────

// Server-sent-events endpoint. Each subscriber gets a private frame queue;
// emitters push pre-formatted SSE frames into every registered listener,
// and this loop drains that queue to the response. A 15s timer doubles as
// the keepalive tick when nothing arrives.
app.get("/events", (c) => {
  return stream(c, async (s) => {
    c.header("Content-Type", "text/event-stream");
    c.header("Cache-Control", "no-cache");

    // `resolve` wakes the drain loop when a frame lands while it is parked.
    const queue: string[] = [];
    let resolve: (() => void) | null = null;
    const listener = (data: string) => {
      queue.push(data);
      resolve?.();
    };

    jobListeners.add(listener);
    s.onAbort(() => {
      jobListeners.delete(listener);
    });

    try {
      while (!s.closed) {
        if (queue.length > 0) {
          await s.write(queue.shift()!);
        } else {
          // Park until a frame arrives or 15s elapses, whichever is first.
          await new Promise<void>((res) => {
            resolve = res;
            setTimeout(res, 15_000);
          });
          resolve = null;
          // Timer fired with nothing queued → send an SSE comment as keepalive.
          if (queue.length === 0) await s.write(": keepalive\n\n");
        }
      }
    } finally {
      // Belt-and-braces: onAbort should have removed us, but make sure.
      jobListeners.delete(listener);
    }
  });
});
|
|
|
|
// ─── Job execution ────────────────────────────────────────────────────────────

/**
 * Execute one job's shell command and record the outcome.
 *
 * Flow: preflight the source file (must be readable/writable; skip entirely
 * when verifyDesiredState says the file already matches the plan), spawn the
 * command, stream stdout/stderr into the job's output with throttled DB
 * flushes and SSE progress events, then mark job + review plan done/error.
 * On success, a background *arr rename pass is kicked off (non-blocking).
 */
async function runJob(job: Job): Promise<void> {
  log(`Job ${job.id} starting (item=${job.item_id})`);
  log(`Job ${job.id} command: ${job.command}`);
  const db = getDb();

  const itemRow = db.prepare("SELECT file_path FROM media_items WHERE id = ?").get(job.item_id) as
    | { file_path: string }
    | undefined;
  if (itemRow?.file_path) {
    // Fail fast if the file can't be read/written — mark job and plan error.
    try {
      accessSync(itemRow.file_path, constants.R_OK | constants.W_OK);
    } catch (fsErr) {
      const msg = `File not accessible: ${itemRow.file_path}\n${(fsErr as Error).message}`;
      db
        .prepare("UPDATE jobs SET status = 'error', output = ?, exit_code = 1, completed_at = datetime('now') WHERE id = ?")
        .run(msg, job.id);
      emitJobUpdate(job.id, "error", msg);
      db.prepare("UPDATE review_plans SET status = 'error' WHERE item_id = ?").run(job.item_id);
      return;
    }

    // Preflight: if the file already matches the plan, skip ffmpeg. Cheap
    // guard against re-running a stream-index-based command against a file
    // that's already been processed — which would either error out or
    // silently corrupt the file.
    try {
      const verify = await verifyDesiredState(db, job.item_id, itemRow.file_path);
      if (verify.matches) {
        const msg = `Preflight check: ${verify.reason}\nSkipping FFmpeg — no work needed.`;
        log(`Job ${job.id} ${msg.replace(/\n/g, " ")}`);
        // Job and plan flip to done atomically.
        db.transaction(() => {
          db
            .prepare(
              "UPDATE jobs SET status = 'done', exit_code = 0, output = ?, completed_at = datetime('now') WHERE id = ?",
            )
            .run(msg, job.id);
          db.prepare("UPDATE review_plans SET status = 'done' WHERE item_id = ?").run(job.item_id);
        })();
        emitJobUpdate(job.id, "done", msg);
        return;
      }
      log(`Job ${job.id} preflight: ${verify.reason} — running FFmpeg`);
    } catch (err) {
      // Preflight is best-effort — a verify failure never blocks the job.
      warn(`Job ${job.id} preflight check errored: ${String(err)} — proceeding with FFmpeg`);
    }
  }

  emitJobUpdate(job.id, "running");

  // Accumulated output lines plus throttle state for DB writes / SSE emits.
  const outputLines: string[] = [];
  let pendingFlush = false;      // true when a throttled flush still owes a DB write
  let lastFlushAt = 0;           // last DB write timestamp (ms)
  let totalSeconds = 0;          // media duration parsed from ffmpeg's banner
  let lastProgressEmit = 0;      // last job_progress SSE timestamp (ms)
  const updateOutput = db.prepare("UPDATE jobs SET output = ? WHERE id = ?");

  // Push current output to DB + SSE, rate-limited by LIVE_UPDATE_INTERVAL_MS.
  // `final` forces the write regardless of the throttle.
  const flush = (final = false) => {
    const now = Date.now();
    if (!final && !shouldSendLiveUpdate(now, lastFlushAt)) {
      pendingFlush = true;
      return;
    }
    const text = outputLines.join("\n");
    if (final || shouldSendLiveUpdate(now, lastFlushAt)) {
      updateOutput.run(text, job.id);
      lastFlushAt = now;
      pendingFlush = false;
    }
    emitJobUpdate(job.id, "running", text);
  };

  // Scrape duration (once) and time= progress from an ffmpeg output line,
  // emitting a throttled job_progress SSE event.
  const consumeProgress = (line: string) => {
    if (totalSeconds === 0) {
      const d = parseFFmpegDuration(line);
      if (d != null) totalSeconds = d;
    }
    const progressed = parseFFmpegProgress(line);
    if (progressed != null && totalSeconds > 0) {
      const now = Date.now();
      if (shouldSendLiveUpdate(now, lastProgressEmit)) {
        emitJobProgress(job.id, progressed, totalSeconds);
        lastProgressEmit = now;
      }
    }
  };

  try {
    const proc = Bun.spawn(["sh", "-c", job.command], { stdout: "pipe", stderr: "pipe" });
    // Expose the child so POST /stop can SIGTERM it.
    runningProc = proc;
    runningJobId = job.id;
    // Read one pipe to completion, splitting on \r\n, \n or \r (ffmpeg uses
    // bare \r for progress updates), feeding complete lines to the
    // accumulators above.
    const readStream = async (readable: ReadableStream<Uint8Array>, prefix = "") => {
      const reader = readable.getReader();
      const decoder = new TextDecoder();
      let buffer = "";
      let chunksSinceYield = 0;
      try {
        while (true) {
          const { done, value } = await reader.read();
          if (done) break;
          buffer += decoder.decode(value, { stream: true });
          const parts = buffer.split(/\r\n|\n|\r/);
          buffer = parts.pop() ?? ""; // keep the trailing partial line
          for (const line of parts) {
            if (!line.trim()) continue;
            outputLines.push(prefix + line);
            consumeProgress(line);
          }
          flush();
          // Let pending HTTP requests run even when ffmpeg floods stdout/stderr.
          chunksSinceYield = await yieldAfterChunk(chunksSinceYield);
        }
        if (buffer.trim()) {
          outputLines.push(prefix + buffer);
          consumeProgress(buffer);
        }
      } catch (err) {
        logError(`stream read error (${prefix.trim() || "stdout"}):`, err);
      }
    };
    // Wait for both pipes to close AND the process to exit.
    await Promise.all([readStream(proc.stdout), readStream(proc.stderr, "[stderr] "), proc.exited]);
    const exitCode = await proc.exited;
    // Write the last throttled chunk, if one is still owed.
    if (pendingFlush) updateOutput.run(outputLines.join("\n"), job.id);
    if (exitCode !== 0) throw new Error(`FFmpeg exited with code ${exitCode}`);

    const fullOutput = outputLines.join("\n");

    // Post-job bookkeeping. The extraction itself is driven by the ffmpeg
    // command (see buildExtractionOutputs) — sidecars are already on disk.
    // Here we just need to flip subs_extracted on the plan so verify.ts
    // can tell the extraction step has run.
    const streams = db.prepare("SELECT * FROM media_streams WHERE item_id = ?").all(job.item_id) as MediaStream[];
    const hadExtractableSubs = streams.some((s) => s.type === "Subtitle" && isExtractableSubtitle(s.codec));

    const markJobDone = db.prepare(
      "UPDATE jobs SET status = 'done', exit_code = 0, output = ?, completed_at = datetime('now') WHERE id = ?",
    );
    const markPlanDone = db.prepare("UPDATE review_plans SET status = 'done' WHERE item_id = ?");
    const markSubsExtracted = db.prepare("UPDATE review_plans SET subs_extracted = 1 WHERE item_id = ?");

    db.transaction(() => {
      markJobDone.run(fullOutput, job.id);
      markPlanDone.run(job.item_id);
      if (hadExtractableSubs) markSubsExtracted.run(job.item_id);
    })();

    log(`Job ${job.id} completed successfully`);
    emitJobUpdate(job.id, "done", fullOutput);

    // Trigger Radarr/Sonarr rescan + rename in the background.
    // Non-blocking: rename failures must not affect job status.
    triggerPostJobRename(job.item_id).catch((e) => warn(`Post-job rename for item ${job.item_id}: ${e}`));
  } catch (err) {
    logError(`Job ${job.id} failed:`, err);
    const fullOutput = `${outputLines.join("\n")}\n${String(err)}`;
    const summary = extractErrorSummary(outputLines, err);
    // Prepend the scraped summary so the job log starts with what broke.
    // ffmpeg's 200-line stream+config banner buries the real error; this
    // gives the UI a crisp hook for the failure cause.
    const annotatedOutput = summary ? `${summary}\n\n---\n\n${fullOutput}` : fullOutput;
    db
      .prepare("UPDATE jobs SET status = 'error', exit_code = 1, output = ?, completed_at = datetime('now') WHERE id = ?")
      .run(annotatedOutput, job.id);
    emitJobUpdate(job.id, "error", annotatedOutput);
    db
      .prepare("UPDATE review_plans SET status = 'error', notes = ? WHERE item_id = ?")
      .run(summary ?? String(err), job.item_id);
  } finally {
    // Always release the /stop handles, success or failure.
    runningProc = null;
    runningJobId = null;
  }
}
|
|
|
|
/**
|
|
* Extract a short, human-readable reason from a failed job's stderr.
|
|
*
|
|
* ffmpeg prints a ~200-line banner (version, config, every stream in the
|
|
* input file) before the real error shows up. We scan the tail of the
|
|
* output for the last line matching fatal keywords, plus anything ffmpeg
|
|
* explicitly labels "Error:" or "Conversion failed!". Returns up to three
|
|
* lines so the UI can show a crisp summary without users scrolling the
|
|
* full log.
|
|
*/
|
|
export function extractErrorSummary(outputLines: string[], thrown?: unknown): string | null {
|
|
const FATAL =
|
|
/(Error:|Conversion failed!|Unsupported\b|Invalid argument|Permission denied|No such file|Cannot allocate|No space left|Killed|Segmentation fault)/;
|
|
// Only scan the last 60 lines — anything earlier is the banner or stream
|
|
// mapping. The real cause sits near the end.
|
|
const tail = outputLines.slice(-60).filter((l) => l.trim());
|
|
const hits: string[] = [];
|
|
for (const line of tail) {
|
|
if (FATAL.test(line)) hits.push(line.replace(/^\[stderr]\s*/, ""));
|
|
}
|
|
const unique = [...new Set(hits)].slice(-3);
|
|
if (unique.length === 0) {
|
|
// Fell off the end with no recognisable fatal line — fall back to the
|
|
// thrown error (usually "FFmpeg exited with code N"). Better than
|
|
// showing nothing, since the exit code at least tells someone *where*
|
|
// to look.
|
|
return thrown ? String(thrown) : null;
|
|
}
|
|
return unique.join("\n");
|
|
}
|
|
|
|
// Scheduler endpoints live on /api/settings/schedule now — see server/api/settings.ts.
|
|
|
|
// ─── FFmpeg progress parsing ───────────────────────────────────────────────────
|
|
|
|
/** Parse FFmpeg stderr line for progress. Returns seconds processed or null. */
|
|
export function parseFFmpegProgress(line: string): number | null {
|
|
const match = line.match(/time=(\d+):(\d+):(\d+)\.(\d+)/);
|
|
if (!match) return null;
|
|
const [, h, m, s] = match.map(Number);
|
|
return h * 3600 + m * 60 + s;
|
|
}
|
|
|
|
// ─── Radarr/Sonarr rename ─────────────────────────────────────────────────────
|
|
|
|
/**
|
|
* Trigger Radarr/Sonarr rename for the given items, deduplicated: one call
|
|
* per movie, one call per series (covers every episode of that series in
|
|
* one shot). After each call, updates file_path on every media_items row
|
|
* whose basename was renamed.
|
|
*
|
|
* Idempotent — *arr returns no work to do when filenames already match.
|
|
* Used after a job completes (fix-up post-transcode) and after processInbox
|
|
* classifies items as noop (catch lying filenames on already-clean files).
|
|
*/
|
|
async function triggerRenameFor(itemIds: number[]): Promise<void> {
|
|
if (itemIds.length === 0) return;
|
|
const db = getDb();
|
|
const placeholders = itemIds.map(() => "?").join(",");
|
|
const items = db
|
|
.prepare(`SELECT id, type, file_path, tmdb_id, imdb_id, tvdb_id FROM media_items WHERE id IN (${placeholders})`)
|
|
.all(...itemIds) as Pick<MediaItem, "id" | "type" | "file_path" | "tmdb_id" | "imdb_id" | "tvdb_id">[];
|
|
|
|
const movies = items.filter((i) => i.type === "Movie");
|
|
const seriesByTvdb = new Map<string, typeof items>();
|
|
for (const ep of items.filter((i) => i.type === "Episode")) {
|
|
if (!ep.tvdb_id) continue;
|
|
const list = seriesByTvdb.get(ep.tvdb_id) ?? [];
|
|
list.push(ep);
|
|
seriesByTvdb.set(ep.tvdb_id, list);
|
|
}
|
|
|
|
const radarrCfg: radarr.RadarrConfig = { url: getConfig("radarr_url") ?? "", apiKey: getConfig("radarr_api_key") ?? "" };
|
|
const sonarrCfg: sonarr.SonarrConfig = { url: getConfig("sonarr_url") ?? "", apiKey: getConfig("sonarr_api_key") ?? "" };
|
|
|
|
for (const movie of movies) {
|
|
const result = await radarr.triggerMovieRename(radarrCfg, { tmdbId: movie.tmdb_id, imdbId: movie.imdb_id });
|
|
if (!result.ok) {
|
|
warn(`Rename for movie ${movie.id}: ${result.error}`);
|
|
continue;
|
|
}
|
|
applyRenamesToDb(db, result.renames);
|
|
}
|
|
|
|
for (const tvdbId of seriesByTvdb.keys()) {
|
|
const result = await sonarr.triggerSeriesRename(sonarrCfg, { tvdbId });
|
|
if (!result.ok) {
|
|
warn(`Rename for series tvdb=${tvdbId}: ${result.error}`);
|
|
continue;
|
|
}
|
|
applyRenamesToDb(db, result.renames);
|
|
}
|
|
}
|
|
|
|
function applyRenamesToDb(db: ReturnType<typeof getDb>, renames: Map<string, string>): void {
|
|
if (renames.size === 0) return;
|
|
const sel = db.prepare("SELECT id, file_path FROM media_items WHERE file_path LIKE ?");
|
|
const upd = db.prepare("UPDATE media_items SET file_path = ? WHERE id = ?");
|
|
for (const [oldName, newName] of renames) {
|
|
if (oldName === newName) continue;
|
|
const rows = sel.all(`%/${oldName}`) as { id: number; file_path: string }[];
|
|
for (const r of rows) {
|
|
const newPath = `${r.file_path.slice(0, r.file_path.length - oldName.length)}${newName}`;
|
|
upd.run(newPath, r.id);
|
|
log(`Item ${r.id} renamed: ${r.file_path} → ${newPath}`);
|
|
}
|
|
}
|
|
}
|
|
|
|
/** Fire-and-forget rename trigger. Errors are logged, never thrown. */
|
|
export function triggerRenameForItems(itemIds: number[]): void {
|
|
triggerRenameFor(itemIds).catch((e) => warn(`Rename batch failed: ${e}`));
|
|
}
|
|
|
|
/** Single-item helper used by the per-job post-success path. */
|
|
function triggerPostJobRename(itemId: number): Promise<void> {
|
|
return triggerRenameFor([itemId]);
|
|
}
|
|
|
|
export default app;
|