import { accessSync, constants } from "node:fs";
import { Hono } from "hono";
import { stream } from "hono/streaming";
import { getDb } from "../db/index";
import { log, error as logError, warn } from "../lib/log";
import { parseId } from "../lib/validate";
import { predictExtractedFiles } from "../services/ffmpeg";
import {
  getScheduleConfig,
  isInProcessWindow,
  msUntilProcessWindow,
  nextProcessWindowTime,
  sleepBetweenJobs,
  waitForProcessWindow,
} from "../services/scheduler";
import { verifyDesiredState } from "../services/verify";
import type { Job, MediaItem, MediaStream } from "../types";

const app = new Hono();

// ─── SSE fan-out ──────────────────────────────────────────────────────────────

/** One callback per connected `/events` client; each receives fully formatted SSE frames. */
const jobListeners = new Set<(data: string) => void>();

/** Format a single SSE frame (`event:` + JSON `data:`) and hand it to every connected client. */
function broadcast(event: string, payload: Record<string, unknown>): void {
  const frame = `event: ${event}\ndata: ${JSON.stringify(payload)}\n\n`;
  for (const listener of jobListeners) listener(frame);
}

function emitQueueStatus(
  status: "running" | "paused" | "sleeping" | "idle",
  extra: { until?: string; seconds?: number } = {},
): void {
  broadcast("queue_status", { status, ...extra });
}

function emitJobUpdate(jobId: number, status: string, output?: string): void {
  // `output: undefined` is dropped by JSON.stringify, so status-only updates carry no output key.
  broadcast("job_update", { id: jobId, status, output });
}

function emitJobProgress(jobId: number, seconds: number, total: number): void {
  broadcast("job_progress", { id: jobId, seconds, total });
}

// ─── Sequential local queue ──────────────────────────────────────────────────

let queueRunning = false;
let runningProc: ReturnType<typeof Bun.spawn> | null = null;
let runningJobId: number | null = null;
/**
 * While runSequential is active these point at its in-memory queue and the set
 * of ids it has already accepted, so HTTP handlers can append to the live run.
 */
let activeQueue: Job[] | null = null;
let activeSeen: Set<number> | null = null;

const LIVE_UPDATE_INTERVAL_MS = 500;
const STREAM_CHUNKS_BEFORE_YIELD = 24;

/**
 * Throttle predicate: true when strictly more than `intervalMs` has elapsed
 * since `lastSentAt`. Used to rate-limit output persistence and SSE frames.
 */
export function shouldSendLiveUpdate(now: number, lastSentAt: number, intervalMs = LIVE_UPDATE_INTERVAL_MS): boolean {
  return now - lastSentAt > intervalMs;
}

/**
 * Count one processed stream chunk. Once `chunksBeforeYield` chunks have been
 * counted, await `sleep(0)` to give the event loop a turn and reset the count.
 *
 * @returns the updated counter (0 immediately after a yield).
 */
export async function yieldAfterChunk(
  chunksSinceYield: number,
  chunksBeforeYield = STREAM_CHUNKS_BEFORE_YIELD,
  sleep: (ms: number) => Promise<void> = (ms) => Bun.sleep(ms),
): Promise<number> {
  const next = chunksSinceYield + 1;
  if (next < chunksBeforeYield) return next;
  await sleep(0);
  return 0;
}

/**
 * Append every job whose id is not yet in `seen` to `queue`, recording its id.
 * Mutates both `queue` and `seen`.
 *
 * @returns the number of jobs appended.
 */
export function enqueueUnseenJobs<T extends { id: number }>(queue: T[], seen: Set<number>, jobs: T[]): number {
  let added = 0;
  for (const job of jobs) {
    if (seen.has(job.id)) continue;
    queue.push(job);
    seen.add(job.id);
    added += 1;
  }
  return added;
}

/** All jobs still in 'pending' status, oldest first. */
function loadPendingJobs(): Job[] {
  return getDb().prepare("SELECT * FROM jobs WHERE status = 'pending' ORDER BY created_at").all() as Job[];
}

/**
 * Run `initial` one job at a time. Only one instance runs at once (a second
 * call while `queueRunning` is set returns immediately).
 *
 * Before each job it waits for the processing window and, once at least one job
 * has actually run, sleeps for the configured inter-job delay. Each job is
 * claimed atomically (pending → running); jobs that can no longer be claimed
 * (cancelled, or already running) are skipped. Whenever the local queue
 * drains — including right after a skipped job — the DB is re-checked for
 * pending jobs approved mid-run.
 */
async function runSequential(initial: Job[]): Promise<void> {
  if (queueRunning) return;
  queueRunning = true;
  try {
    const queue: Job[] = [...initial];
    const seen = new Set<number>(queue.map((j) => j.id));
    activeQueue = queue;
    activeSeen = seen;
    let ranAny = false;

    while (queue.length > 0) {
      const job = queue.shift();
      if (!job) break;

      // Pause outside the processing window
      if (!isInProcessWindow()) {
        emitQueueStatus("paused", {
          until: nextProcessWindowTime(),
          seconds: Math.round(msUntilProcessWindow() / 1000),
        });
        await waitForProcessWindow();
      }

      // Sleep between jobs, but not before the first job that actually runs.
      if (ranAny) {
        const cfg = getScheduleConfig();
        if (cfg.job_sleep_seconds > 0) {
          emitQueueStatus("sleeping", { seconds: cfg.job_sleep_seconds });
          await sleepBetweenJobs();
        }
      }

      // Atomic claim: only pick up jobs still pending.
      const claimed = getDb()
        .prepare(
          "UPDATE jobs SET status = 'running', started_at = datetime('now'), output = '' WHERE id = ? AND status = 'pending'",
        )
        .run(job.id);

      if (claimed.changes > 0) {
        ranAny = true;
        emitQueueStatus("running");
        try {
          await runJob(job);
        } catch (err) {
          logError(`Job ${job.id} failed:`, err);
        }
      }
      // A job that could not be claimed (cancelled or already running) is
      // skipped, but we still fall through to the drain check below —
      // otherwise jobs approved mid-run would be missed whenever the last
      // queued job had been cancelled.

      if (queue.length === 0) {
        enqueueUnseenJobs(queue, seen, loadPendingJobs());
      }
    }
  } finally {
    activeQueue = null;
    activeSeen = null;
    queueRunning = false;
    emitQueueStatus("idle");
  }
}

// ─── Job lookup ───────────────────────────────────────────────────────────────

/** The media-item columns returned alongside a job by loadJobRow. */
interface JobItemSummary {
  id: Job["item_id"];
  name: string | null;
  type: string | null;
  series_name: string | null;
  season_number: number | null;
  episode_number: number | null;
  file_path: string | null;
}

type JobRowWithItem = Job & {
  mi_id: number | null;
  name: string | null;
  type: string | null;
  series_name: string | null;
  season_number: number | null;
  episode_number: number | null;
  file_path: string | null;
};

/**
 * Load a job plus a summary of its media item (LEFT JOIN on item_id).
 *
 * @returns null when the job does not exist; `item` is null when no media row matched.
 */
function loadJobRow(jobId: number): { job: Job; item: JobItemSummary | null } | null {
  const row = getDb()
    .prepare(`
      SELECT j.*, mi.id as mi_id, mi.name, mi.type, mi.series_name, mi.season_number, mi.episode_number, mi.file_path
      FROM jobs j
      LEFT JOIN media_items mi ON mi.id = j.item_id
      WHERE j.id = ?
    `)
    .get(jobId) as JobRowWithItem | undefined;
  if (!row) return null;

  // mi_id is non-null exactly when the LEFT JOIN matched; a media item with an
  // empty/null name is still a media item.
  const item: JobItemSummary | null =
    row.mi_id != null
      ? {
          id: row.item_id,
          name: row.name,
          type: row.type,
          series_name: row.series_name,
          season_number: row.season_number,
          episode_number: row.episode_number,
          file_path: row.file_path,
        }
      : null;
  return { job: row, item };
}

// ─── Start all pending ────────────────────────────────────────────────────────

app.post("/start", (c) => {
  const pending = loadPendingJobs();
  if (queueRunning && activeQueue && activeSeen) {
    // A run is already in progress: append anything it hasn't seen yet.
    const queued = enqueueUnseenJobs(activeQueue, activeSeen, pending);
    return c.json({ ok: true, started: 0, queued });
  }
  void runSequential(pending).catch((err) => logError("Queue failed:", err));
  return c.json({ ok: true, started: pending.length, queued: pending.length });
});

// ─── Run single ───────────────────────────────────────────────────────────────

app.post("/job/:id/run", async (c) => {
  const jobId = parseId(c.req.param("id"));
  if (jobId == null) return c.json({ error: "invalid job id" }, 400);
  const job = getDb().prepare("SELECT * FROM jobs WHERE id = ?").get(jobId) as Job | undefined;
  if (!job) return c.notFound();

  if (job.status === "pending") {
    if (queueRunning && activeQueue && activeSeen) {
      // runSequential would return immediately while a run is active, so hand
      // the job to the live queue instead.
      enqueueUnseenJobs(activeQueue, activeSeen, [job]);
    } else {
      void runSequential([job]).catch((err) => logError(`Job ${job.id} failed:`, err));
    }
  }

  const result = loadJobRow(jobId);
  if (!result) return c.notFound();
  return c.json(result);
});

// ─── Cancel ───────────────────────────────────────────────────────────────────

app.post("/job/:id/cancel", (c) => {
  const jobId = parseId(c.req.param("id"));
  if (jobId == null) return c.json({ error: "invalid job id" }, 400);
  // NOTE(review): unlike /clear, this does not move the item's review_plans row
  // back from 'approved' to 'pending' — confirm that is intended.
  getDb().prepare("DELETE FROM jobs WHERE id = ? AND status = 'pending'").run(jobId);
  return c.json({ ok: true });
});

// ─── Clear queue ──────────────────────────────────────────────────────────────

app.post("/clear", (c) => {
  const db = getDb();
  // Return approved plans for the about-to-be-deleted pending jobs to 'pending'.
  db
    .prepare(`
    UPDATE review_plans SET status = 'pending', reviewed_at = NULL
    WHERE item_id IN (SELECT item_id FROM jobs WHERE status = 'pending')
      AND status = 'approved'
  `)
    .run();
  const result = db.prepare("DELETE FROM jobs WHERE status = 'pending'").run();
  return c.json({ ok: true, cleared: result.changes });
});

app.post("/clear-completed", (c) => {
  const result = getDb().prepare("DELETE FROM jobs WHERE status IN ('done', 'error')").run();
  return c.json({ ok: true, cleared: result.changes });
});

// ─── Stop running job ─────────────────────────────────────────────────────────

app.post("/stop", (c) => {
  if (!runningProc || runningJobId == null) {
    return c.json({ ok: false, error: "No job is currently running" }, 409);
  }
  const stoppedId = runningJobId;
  try {
    // NOTE(review): this signals the `sh -c` process. If the shell does not exec
    // the command directly (e.g. a compound command), ffmpeg may keep running
    // and holding the pipes open — verify against the generated commands.
    runningProc.kill("SIGTERM");
  } catch (err) {
    logError(`Failed to kill job ${stoppedId}:`, err);
    return c.json({ ok: false, error: String(err) }, 500);
  }
  // runJob's catch path will mark the job error and clean up runningProc.
  return c.json({ ok: true, stopped: stoppedId });
});

// ─── SSE ──────────────────────────────────────────────────────────────────────

app.get("/events", (c) => {
  // Set headers before the response object is created by stream().
  c.header("Content-Type", "text/event-stream");
  c.header("Cache-Control", "no-cache");
  return stream(c, async (s) => {
    const queue: string[] = [];
    // Resolves the current idle wait early; null while not waiting.
    let wake: (() => void) | null = null;
    // Client disconnects fire onAbort but do not set `s.closed`, so track it
    // locally; otherwise this loop would keep running after the client left.
    let aborted = false;

    const listener = (data: string) => {
      queue.push(data);
      wake?.();
    };
    jobListeners.add(listener);
    s.onAbort(() => {
      aborted = true;
      jobListeners.delete(listener);
      wake?.();
    });

    try {
      while (!aborted && !s.closed) {
        const frame = queue.shift();
        if (frame !== undefined) {
          await s.write(frame);
          continue;
        }
        // Wait for the next frame, an abort, or 15s of silence (keepalive).
        await new Promise<void>((res) => {
          const timer = setTimeout(res, 15_000);
          wake = () => {
            clearTimeout(timer);
            res();
          };
        });
        wake = null;
        if (!aborted && queue.length === 0) await s.write(": keepalive\n\n");
      }
    } finally {
      jobListeners.delete(listener);
    }
  });
});

// ─── FFmpeg output parsing ────────────────────────────────────────────────────

const FFMPEG_DURATION_RE = /Duration:\s*(\d+):(\d+):(\d+)\.(\d+)/;
const FFMPEG_TIME_RE = /time=(\d+):(\d+):(\d+)\.(\d+)/;
const LINE_BREAK_RE = /\r\n|\n|\r/;

/** Convert an `HH:MM:SS.frac` regex match to whole seconds (the fraction is discarded). */
function hmsMatchToSeconds(match: RegExpMatchArray | null): number | null {
  if (!match) return null;
  return Number(match[1]) * 3600 + Number(match[2]) * 60 + Number(match[3]);
}

/** Parse "Duration: HH:MM:SS.MS" from ffmpeg startup output; whole seconds or null. */
function parseFFmpegDuration(line: string): number | null {
  return hmsMatchToSeconds(line.match(FFMPEG_DURATION_RE));
}

/** Parse an FFmpeg stderr line for progress (`time=HH:MM:SS.MS`). Returns whole seconds processed or null. */
export function parseFFmpegProgress(line: string): number | null {
  return hmsMatchToSeconds(line.match(FFMPEG_TIME_RE));
}

// ─── Job execution ────────────────────────────────────────────────────────────

/**
 * Check that the media file is readable and writable. If it is not, mark the
 * job and its review plan as errored and broadcast the failure.
 *
 * @returns true when the job was failed (caller should stop), false when the file is accessible.
 */
function failIfFileInaccessible(job: Job, filePath: string): boolean {
  try {
    accessSync(filePath, constants.R_OK | constants.W_OK);
    return false;
  } catch (fsErr) {
    const reason = fsErr instanceof Error ? fsErr.message : String(fsErr);
    const msg = `File not accessible: ${filePath}\n${reason}`;
    const db = getDb();
    db.transaction(() => {
      db
        .prepare("UPDATE jobs SET status = 'error', output = ?, exit_code = 1, completed_at = datetime('now') WHERE id = ?")
        .run(msg, job.id);
      db.prepare("UPDATE review_plans SET status = 'error' WHERE item_id = ?").run(job.item_id);
    })();
    emitJobUpdate(job.id, "error", msg);
    return true;
  }
}

/**
 * Preflight: if the file already matches the plan, mark job and plan done
 * without running ffmpeg. Cheap guard against re-running a
 * stream-index-based command against a file that's already been processed —
 * which would either error out or silently corrupt the file.
 *
 * Any error during the check is logged and treated as "not done", so ffmpeg runs.
 *
 * @returns true when the job was completed here (caller should stop).
 */
async function completeIfAlreadyApplied(job: Job, filePath: string): Promise<boolean> {
  try {
    const db = getDb();
    const verify = await verifyDesiredState(db, job.item_id, filePath);
    if (!verify.matches) {
      log(`Job ${job.id} preflight: ${verify.reason} — running FFmpeg`);
      return false;
    }
    const msg = `Preflight check: ${verify.reason}\nSkipping FFmpeg — no work needed.`;
    log(`Job ${job.id} ${msg.replace(/\n/g, " ")}`);
    db.transaction(() => {
      db
        .prepare(
          "UPDATE jobs SET status = 'done', exit_code = 0, output = ?, completed_at = datetime('now') WHERE id = ?",
        )
        .run(msg, job.id);
      db.prepare("UPDATE review_plans SET status = 'done' WHERE item_id = ?").run(job.item_id);
    })();
    emitJobUpdate(job.id, "done", msg);
    return true;
  } catch (err) {
    warn(`Job ${job.id} preflight check errored: ${String(err)} — proceeding with FFmpeg`);
    return false;
  }
}

/**
 * Atomically mark the job and its plan done and record the sidecar subtitle
 * files that predictExtractedFiles expects the command to have produced.
 */
function recordJobSuccess(job: Job, fullOutput: string): void {
  const db = getDb();
  const item = db.prepare("SELECT * FROM media_items WHERE id = ?").get(job.item_id) as MediaItem | undefined;
  const streams = db.prepare("SELECT * FROM media_streams WHERE item_id = ?").all(job.item_id) as MediaStream[];
  const files = item && streams.length > 0 ? predictExtractedFiles(item, streams) : [];

  const insertFile = db.prepare(
    "INSERT OR IGNORE INTO subtitle_files (item_id, file_path, language, codec, is_forced, is_hearing_impaired) VALUES (?, ?, ?, ?, ?, ?)",
  );
  const markJobDone = db.prepare(
    "UPDATE jobs SET status = 'done', exit_code = 0, output = ?, completed_at = datetime('now') WHERE id = ?",
  );
  const markPlanDone = db.prepare("UPDATE review_plans SET status = 'done' WHERE item_id = ?");
  const markSubsExtracted = db.prepare("UPDATE review_plans SET subs_extracted = 1 WHERE item_id = ?");

  db.transaction(() => {
    markJobDone.run(fullOutput, job.id);
    markPlanDone.run(job.item_id);
    for (const f of files) {
      insertFile.run(job.item_id, f.file_path, f.language, f.codec, f.is_forced ? 1 : 0, f.is_hearing_impaired ? 1 : 0);
    }
    if (files.length > 0) markSubsExtracted.run(job.item_id);
  })();
}

/**
 * Atomically mark the job and its plan as errored. The stored job output
 * starts with a scraped error summary (when one is found) because ffmpeg's
 * long stream/config banner buries the real error; the plan's notes get the
 * summary, or the thrown error's text.
 */
function recordJobFailure(job: Job, outputLines: string[], err: unknown): void {
  logError(`Job ${job.id} failed:`, err);
  const fullOutput = `${outputLines.join("\n")}\n${String(err)}`;
  const summary = extractErrorSummary(outputLines, err);
  const annotatedOutput = summary ? `${summary}\n\n---\n\n${fullOutput}` : fullOutput;
  const db = getDb();
  db.transaction(() => {
    db
      .prepare("UPDATE jobs SET status = 'error', exit_code = 1, output = ?, completed_at = datetime('now') WHERE id = ?")
      .run(annotatedOutput, job.id);
    db.prepare("UPDATE review_plans SET status = 'error', notes = ? WHERE item_id = ?").run(summary ?? String(err), job.item_id);
  })();
  emitJobUpdate(job.id, "error", annotatedOutput);
}

/**
 * Execute one claimed job. Steps:
 * 1. Run the file access check and the preflight check.
 * 2. Run `job.command` through `sh -c`, streaming stdout/stderr into the job
 *    log (throttled DB writes and SSE updates, plus progress events parsed
 *    from ffmpeg output).
 * 3. Record success or failure.
 *
 * Exposes the child process via runningProc/runningJobId so /stop can signal it.
 */
async function runJob(job: Job): Promise<void> {
  log(`Job ${job.id} starting (item=${job.item_id})`);
  log(`Job ${job.id} command: ${job.command}`);
  const db = getDb();

  const itemRow = db.prepare("SELECT file_path FROM media_items WHERE id = ?").get(job.item_id) as
    | { file_path: string }
    | undefined;
  if (itemRow?.file_path) {
    if (failIfFileInaccessible(job, itemRow.file_path)) return;
    if (await completeIfAlreadyApplied(job, itemRow.file_path)) return;
  }

  emitJobUpdate(job.id, "running");

  const outputLines: string[] = [];
  let pendingFlush = false; // output exists that has not been written to the DB yet
  let lastFlushAt = 0;
  let totalSeconds = 0;
  let lastProgressEmit = 0;
  const updateOutput = db.prepare("UPDATE jobs SET output = ? WHERE id = ?");

  // Persist and broadcast the log at most once per LIVE_UPDATE_INTERVAL_MS;
  // in between, only remember that there is unsaved output.
  const flush = (): void => {
    const now = Date.now();
    if (!shouldSendLiveUpdate(now, lastFlushAt)) {
      pendingFlush = true;
      return;
    }
    const text = outputLines.join("\n");
    updateOutput.run(text, job.id);
    lastFlushAt = now;
    pendingFlush = false;
    emitJobUpdate(job.id, "running", text);
  };

  // Learn the total duration from the first "Duration:" line, then emit
  // throttled progress events from "time=" lines.
  const consumeProgress = (line: string): void => {
    if (totalSeconds === 0) {
      const duration = parseFFmpegDuration(line);
      if (duration != null) totalSeconds = duration;
    }
    const processed = parseFFmpegProgress(line);
    if (processed == null || totalSeconds <= 0) return;
    const now = Date.now();
    if (shouldSendLiveUpdate(now, lastProgressEmit)) {
      emitJobProgress(job.id, processed, totalSeconds);
      lastProgressEmit = now;
    }
  };

  try {
    // NOTE(review): job.command is interpreted by a shell; it must be fully
    // escaped wherever it is generated.
    const proc = Bun.spawn(["sh", "-c", job.command], { stdout: "pipe", stderr: "pipe" });
    runningProc = proc;
    runningJobId = job.id;

    const readStream = async (readable: ReadableStream<Uint8Array>, prefix = ""): Promise<void> => {
      const reader = readable.getReader();
      const decoder = new TextDecoder();
      let buffer = "";
      let chunksSinceYield = 0;
      const pushLine = (line: string): void => {
        outputLines.push(prefix + line);
        consumeProgress(line);
      };
      try {
        while (true) {
          const { done, value } = await reader.read();
          if (done) break;
          buffer += decoder.decode(value, { stream: true });
          // ffmpeg redraws progress with bare "\r", so treat it as a line break too.
          const parts = buffer.split(LINE_BREAK_RE);
          buffer = parts.pop() ?? "";
          for (const line of parts) {
            if (line.trim()) pushLine(line);
          }
          flush();
          // Let pending HTTP requests run even when ffmpeg floods stdout/stderr.
          chunksSinceYield = await yieldAfterChunk(chunksSinceYield);
        }
        // Flush any bytes the decoder held back (an incomplete trailing UTF-8 sequence).
        buffer += decoder.decode();
        if (buffer.trim()) {
          pushLine(buffer);
          pendingFlush = true;
        }
      } catch (err) {
        logError(`stream read error (${prefix.trim() || "stdout"}):`, err);
      }
    };

    const [, , exitCode] = await Promise.all([
      readStream(proc.stdout),
      readStream(proc.stderr, "[stderr] "),
      proc.exited,
    ]);
    if (pendingFlush) updateOutput.run(outputLines.join("\n"), job.id);
    if (exitCode !== 0) throw new Error(`FFmpeg exited with code ${exitCode}`);

    const fullOutput = outputLines.join("\n");
    recordJobSuccess(job, fullOutput);
    log(`Job ${job.id} completed successfully`);
    emitJobUpdate(job.id, "done", fullOutput);
  } catch (err) {
    recordJobFailure(job, outputLines, err);
  } finally {
    runningProc = null;
    runningJobId = null;
  }
}

// ─── Error summary ────────────────────────────────────────────────────────────

const FATAL_LINE_RE =
  /(Error:|Conversion failed!|Unsupported\b|Invalid argument|Permission denied|No such file|Cannot allocate|No space left|Killed|Segmentation fault)/;
const ERROR_SCAN_TAIL_LINES = 60;
const MAX_SUMMARY_LINES = 3;

/**
 * Extract a short, human-readable reason from a failed job's output.
 *
 * ffmpeg prints a long banner (version, config, every stream in the input
 * file) before the real error shows up. Only the last 60 lines are scanned,
 * because anything earlier is the banner or stream mapping. Each non-blank
 * line in that tail matching a fatal keyword (e.g. "Error:", "Conversion
 * failed!", "Permission denied") is kept, with any "[stderr]" prefix removed.
 * Returns the last three distinct matches joined by newlines.
 *
 * @param outputLines captured job output, oldest first.
 * @param thrown the error that ended the job; used as the fallback summary.
 * @returns the summary, `String(thrown)` when nothing matched, or null when
 *   nothing matched and `thrown` is falsy.
 */
export function extractErrorSummary(outputLines: string[], thrown?: unknown): string | null {
  const hits = outputLines
    .slice(-ERROR_SCAN_TAIL_LINES)
    .filter((line) => line.trim() && FATAL_LINE_RE.test(line))
    .map((line) => line.replace(/^\[stderr]\s*/, ""));
  const unique = [...new Set(hits)].slice(-MAX_SUMMARY_LINES);
  if (unique.length === 0) {
    // Fell off the end with no recognisable fatal line — fall back to the
    // thrown error (usually "FFmpeg exited with code N"). Better than showing
    // nothing, since the exit code at least tells someone *where* to look.
    return thrown ? String(thrown) : null;
  }
  return unique.join("\n");
}

// Scheduler endpoints live on /api/settings/schedule now — see server/api/settings.ts.

export default app;