import { Hono } from 'hono';
import { stream } from 'hono/streaming';
import { getDb } from '../db/index';
import type { Job, MediaItem, MediaStream } from '../types';
import { predictExtractedFiles } from '../services/ffmpeg';
import { accessSync, constants } from 'node:fs';
import { log, error as logError } from '../lib/log';
import { getSchedulerState, updateSchedulerState } from '../services/scheduler';

const app = new Hono();

// ─── Sequential local queue ──────────────────────────────────────────────────

/** True while a `runSequential` loop is active in this process. */
let queueRunning = false;

/**
 * Run the given jobs one after another.
 *
 * Only one loop may be active at a time: if a loop is already running, this
 * call returns immediately and the passed jobs are NOT queued. Each job is
 * claimed with a conditional UPDATE so that jobs which are no longer
 * `pending` (cancelled, or already picked up) are skipped.
 *
 * @param jobs Jobs to attempt, in execution order.
 */
async function runSequential(jobs: Job[]): Promise<void> {
  if (queueRunning) return;
  queueRunning = true;
  try {
    for (const job of jobs) {
      // Atomic claim: only pick up jobs still pending
      const db = getDb();
      const claimed = db
        .prepare("UPDATE jobs SET status = 'running', started_at = datetime('now'), output = '' WHERE id = ? AND status = 'pending'")
        .run(job.id);
      if (claimed.changes === 0) continue; // cancelled or already running
      try {
        await runJob(job);
      } catch (err) {
        logError(`Job ${job.id} failed:`, err);
      }
    }
  } finally {
    queueRunning = false;
  }
}

// ─── SSE state ────────────────────────────────────────────────────────────────

/** Callbacks of the currently connected `/events` clients. */
const jobListeners = new Set<(data: string) => void>();

/**
 * Broadcast a `job_update` server-sent event to every registered listener.
 *
 * @param jobId  Id of the job the update refers to.
 * @param status Status string to report.
 * @param output Optional job output to include in the payload.
 */
function emitJobUpdate(jobId: number, status: string, output?: string): void {
  const line = `event: job_update\ndata: ${JSON.stringify({ id: jobId, status, output })}\n\n`;
  for (const l of jobListeners) l(line);
}

/**
 * Load one job row together with the media item columns joined onto it.
 *
 * @param jobId Id of the job to load.
 * @returns `null` when no such job exists; otherwise the job row plus an
 *          `item` built from the joined columns (`null` when the joined
 *          `name` is empty or missing).
 */
function loadJobRow(jobId: number) {
  const db = getDb();
  const row = db.prepare(`
    SELECT j.*, mi.id as mi_id, mi.name, mi.type, mi.series_name, mi.season_number, mi.episode_number, mi.file_path
    FROM jobs j
    LEFT JOIN media_items mi ON mi.id = j.item_id
    WHERE j.id = ?
  `).get(jobId) as (Job & {
    mi_id: number | null;
    name: string | null;
    type: string | null;
    series_name: string | null;
    season_number: number | null;
    episode_number: number | null;
    file_path: string | null;
  }) | undefined;
  if (!row) return null;

  const item = row.name
    ? { id: row.item_id, name: row.name, type: row.type, series_name: row.series_name, season_number: row.season_number, episode_number: row.episode_number, file_path: row.file_path } as unknown as MediaItem
    : null;
  return { job: row as unknown as Job, item };
}

// ─── List ─────────────────────────────────────────────────────────────────────

/**
 * GET / — list up to 200 jobs, newest first, optionally filtered by status
 * (`?filter=`, default `pending`), together with per-status totals.
 * An unrecognised filter value applies no status filter.
 */
app.get('/', (c) => {
  const db = getDb();
  const filter = (c.req.query('filter') ?? 'pending') as 'all' | 'pending' | 'running' | 'done' | 'error';
  const validFilters = ['all', 'pending', 'running', 'done', 'error'];
  const whereClause = validFilters.includes(filter) && filter !== 'all' ? `WHERE j.status = ?` : '';
  const params = whereClause ? [filter] : [];

  const jobRows = db.prepare(`
    SELECT j.*, mi.name, mi.type, mi.series_name, mi.season_number, mi.episode_number, mi.file_path
    FROM jobs j
    LEFT JOIN media_items mi ON mi.id = j.item_id
    ${whereClause}
    ORDER BY j.created_at DESC
    LIMIT 200
  `).all(...params) as (Job & {
    name: string;
    type: string;
    series_name: string | null;
    season_number: number | null;
    episode_number: number | null;
    file_path: string;
  })[];

  const jobs = jobRows.map((r) => ({
    job: r as unknown as Job,
    item: r.name
      ? { id: r.item_id, name: r.name, type: r.type, series_name: r.series_name, season_number: r.season_number, episode_number: r.episode_number, file_path: r.file_path } as unknown as MediaItem
      : null,
  }));

  const countRows = db.prepare('SELECT status, COUNT(*) as cnt FROM jobs GROUP BY status').all() as { status: string; cnt: number }[];
  const totalCounts: Record<string, number> = { all: 0, pending: 0, running: 0, done: 0, error: 0 };
  for (const row of countRows) {
    totalCounts[row.status] = row.cnt;
    totalCounts.all += row.cnt;
  }

  return c.json({ jobs, filter, totalCounts });
});

// ─── Param helpers ────────────────────────────────────────────────────────────

/**
 * Parse a route parameter as a positive integer id.
 *
 * Only plain decimal digit strings are accepted, so values with trailing
 * garbage such as `"12abc"` are rejected instead of being truncated to 12.
 *
 * @param raw Raw parameter value, possibly undefined.
 * @returns The id, or `null` when `raw` is missing or not a positive safe integer.
 */
function parseId(raw: string | undefined): number | null {
  if (!raw || !/^\d+$/.test(raw)) return null;
  const n = Number(raw);
  return Number.isSafeInteger(n) && n > 0 ? n : null;
}

// ─── Start all pending ────────────────────────────────────────────────────────

/**
 * POST /start — kick off all pending jobs (oldest first) in the background.
 * `started` is the number of pending jobs found; if a queue loop is already
 * active, `runSequential` ignores this call.
 */
app.post('/start', (c) => {
  const db = getDb();
  const pending = db.prepare("SELECT * FROM jobs WHERE status = 'pending' ORDER BY created_at").all() as Job[];
  runSequential(pending).catch((err) => logError('Queue failed:', err));
  return c.json({ ok: true, started: pending.length });
});

// ─── Run single ───────────────────────────────────────────────────────────────

/**
 * POST /job/:id/run — start a single job in the background if it is still
 * pending, then respond with the job's current row.
 */
app.post('/job/:id/run', async (c) => {
  const jobId = parseId(c.req.param('id'));
  if (jobId == null) return c.json({ error: 'invalid job id' }, 400);

  const db = getDb();
  const job = db.prepare('SELECT * FROM jobs WHERE id = ?').get(jobId) as Job | undefined;
  if (!job) return c.notFound();

  // Non-pending jobs are not restarted; we just report their current state.
  if (job.status === 'pending') {
    runSequential([job]).catch((err) => logError(`Job ${job.id} failed:`, err));
  }

  const result = loadJobRow(jobId);
  if (!result) return c.notFound();
  return c.json(result);
});

// ─── Cancel ───────────────────────────────────────────────────────────────────

/** POST /job/:id/cancel — delete the job if (and only if) it is still pending. */
app.post('/job/:id/cancel', (c) => {
  const jobId = parseId(c.req.param('id'));
  if (jobId == null) return c.json({ error: 'invalid job id' }, 400);
  const db = getDb();
  db.prepare("DELETE FROM jobs WHERE id = ? AND status = 'pending'").run(jobId);
  return c.json({ ok: true });
});

// ─── Clear queue ──────────────────────────────────────────────────────────────

/**
 * POST /clear — delete every pending job. Approved review plans whose item
 * has a pending job are first reset to `pending`.
 */
app.post('/clear', (c) => {
  const db = getDb();
  db.prepare(`
    UPDATE review_plans SET status = 'pending', reviewed_at = NULL
    WHERE item_id IN (SELECT item_id FROM jobs WHERE status = 'pending')
    AND status = 'approved'
  `).run();
  const result = db.prepare("DELETE FROM jobs WHERE status = 'pending'").run();
  return c.json({ ok: true, cleared: result.changes });
});

/** POST /clear-completed — delete every job in status `done` or `error`. */
app.post('/clear-completed', (c) => {
  const db = getDb();
  const result = db.prepare("DELETE FROM jobs WHERE status IN ('done', 'error')").run();
  return c.json({ ok: true, cleared: result.changes });
});

// ─── SSE ──────────────────────────────────────────────────────────────────────

/** Idle interval after which a keepalive comment is sent to an SSE client. */
const SSE_KEEPALIVE_MS = 15_000;

/**
 * GET /events — server-sent event stream of job updates.
 *
 * Each connection registers a listener in `jobListeners`, forwards queued
 * events in order, and sends a `: keepalive` comment when idle.
 */
app.get('/events', (c) => {
  // Set headers before the streaming response is created.
  c.header('Content-Type', 'text/event-stream');
  c.header('Cache-Control', 'no-cache');

  return stream(c, async (s) => {
    const queue: string[] = [];
    let wake: (() => void) | null = null;
    // Tracked locally: we do not rely on `s.closed` being set when the client
    // disconnects, otherwise the loop below could outlive the connection.
    let aborted = false;

    const listener = (data: string) => {
      queue.push(data);
      wake?.();
    };
    jobListeners.add(listener);

    s.onAbort(() => {
      aborted = true;
      jobListeners.delete(listener);
      wake?.(); // end a pending idle wait immediately
    });

    try {
      while (!s.closed && !aborted) {
        const next = queue.shift();
        if (next !== undefined) {
          await s.write(next);
          continue;
        }

        let timer: ReturnType<typeof setTimeout> | undefined;
        await new Promise<void>((res) => {
          wake = res;
          timer = setTimeout(res, SSE_KEEPALIVE_MS);
        });
        wake = null;
        // Woken by an event or abort: drop the timer instead of leaving it pending.
        clearTimeout(timer);

        if (queue.length === 0 && !s.closed && !aborted) await s.write(': keepalive\n\n');
      }
    } finally {
      jobListeners.delete(listener);
    }
  });
});

// ─── Job execution ────────────────────────────────────────────────────────────

/**
 * Execute a single (already claimed) job via `sh -c` and record the outcome.
 *
 * Steps:
 *  1. If the media item has a file path, check it is readable and writable;
 *     on failure mark job and review plan as `error` and return.
 *  2. Spawn the command, collecting stdout and `[stderr] `-prefixed stderr
 *     lines; persist output at most every 500 ms and emit it on every chunk.
 *  3. On exit code 0, mark job and plan `done` and record predicted sidecar
 *     subtitle files in one transaction; otherwise mark both as `error`.
 *
 * Failures are recorded in the database and emitted to listeners rather than
 * rethrown.
 *
 * @param job The job to run.
 */
async function runJob(job: Job): Promise<void> {
  log(`Job ${job.id} starting (item=${job.item_id})`);
  log(`Job ${job.id} command: ${job.command}`);
  const db = getDb();

  const itemRow = db.prepare('SELECT file_path FROM media_items WHERE id = ?').get(job.item_id) as { file_path: string } | undefined;
  if (itemRow?.file_path) {
    try {
      accessSync(itemRow.file_path, constants.R_OK | constants.W_OK);
    } catch (fsErr) {
      const reason = fsErr instanceof Error ? fsErr.message : String(fsErr);
      const msg = `File not accessible: ${itemRow.file_path}\n${reason}`;
      db.prepare("UPDATE jobs SET status = 'error', output = ?, exit_code = 1, completed_at = datetime('now') WHERE id = ?").run(msg, job.id);
      emitJobUpdate(job.id, 'error', msg);
      db.prepare("UPDATE review_plans SET status = 'error' WHERE item_id = ?").run(job.item_id);
      return;
    }
  }

  emitJobUpdate(job.id, 'running');

  const outputLines: string[] = [];
  let pendingFlush = false;
  let lastFlushAt = 0;
  const updateOutput = db.prepare('UPDATE jobs SET output = ? WHERE id = ?');

  // Persist output at most every 500 ms (or when `final`), but always notify listeners.
  const flush = (final = false) => {
    const text = outputLines.join('\n');
    const now = Date.now();
    if (final || now - lastFlushAt > 500) {
      updateOutput.run(text, job.id);
      lastFlushAt = now;
      pendingFlush = false;
    } else {
      pendingFlush = true;
    }
    emitJobUpdate(job.id, 'running', text);
  };

  try {
    const proc = Bun.spawn(['sh', '-c', job.command], { stdout: 'pipe', stderr: 'pipe' });

    // Split a byte stream into non-blank lines (on \r\n, \n or \r) and append them to outputLines.
    const readStream = async (readable: ReadableStream<Uint8Array>, prefix = '') => {
      const reader = readable.getReader();
      const decoder = new TextDecoder();
      let buffer = '';
      try {
        while (true) {
          const { done, value } = await reader.read();
          if (done) break;
          buffer += decoder.decode(value, { stream: true });
          const parts = buffer.split(/\r\n|\n|\r/);
          buffer = parts.pop() ?? '';
          for (const line of parts) {
            if (line.trim()) outputLines.push(prefix + line);
          }
          flush();
        }
        // Flush bytes the decoder is still holding from an incomplete sequence.
        buffer += decoder.decode();
        if (buffer.trim()) outputLines.push(prefix + buffer);
      } catch (err) {
        logError(`stream read error (${prefix.trim() || 'stdout'}):`, err);
      }
    };

    await Promise.all([readStream(proc.stdout), readStream(proc.stderr, '[stderr] '), proc.exited]);
    const exitCode = await proc.exited;

    if (pendingFlush) updateOutput.run(outputLines.join('\n'), job.id);
    if (exitCode !== 0) throw new Error(`FFmpeg exited with code ${exitCode}`);

    const fullOutput = outputLines.join('\n');

    // Gather sidecar files to record
    const item = db.prepare('SELECT * FROM media_items WHERE id = ?').get(job.item_id) as MediaItem | undefined;
    const streams = db.prepare('SELECT * FROM media_streams WHERE item_id = ?').all(job.item_id) as MediaStream[];
    const files = item && streams.length > 0 ? predictExtractedFiles(item, streams) : [];

    const insertFile = db.prepare('INSERT OR IGNORE INTO subtitle_files (item_id, file_path, language, codec, is_forced, is_hearing_impaired) VALUES (?, ?, ?, ?, ?, ?)');
    const markJobDone = db.prepare("UPDATE jobs SET status = 'done', exit_code = 0, output = ?, completed_at = datetime('now') WHERE id = ?");
    const markPlanDone = db.prepare("UPDATE review_plans SET status = 'done' WHERE item_id = ?");
    const markSubsExtracted = db.prepare('UPDATE review_plans SET subs_extracted = 1 WHERE item_id = ?');

    db.transaction(() => {
      markJobDone.run(fullOutput, job.id);
      markPlanDone.run(job.item_id);
      for (const f of files) {
        insertFile.run(job.item_id, f.file_path, f.language, f.codec, f.is_forced ? 1 : 0, f.is_hearing_impaired ? 1 : 0);
      }
      if (files.length > 0) markSubsExtracted.run(job.item_id);
    })();

    log(`Job ${job.id} completed successfully`);
    emitJobUpdate(job.id, 'done', fullOutput);
  } catch (err) {
    logError(`Job ${job.id} failed:`, err);
    const fullOutput = outputLines.join('\n') + '\n' + String(err);
    db.prepare("UPDATE jobs SET status = 'error', exit_code = 1, output = ?, completed_at = datetime('now') WHERE id = ?").run(fullOutput, job.id);
    emitJobUpdate(job.id, 'error', fullOutput);
    db.prepare("UPDATE review_plans SET status = 'error' WHERE item_id = ?").run(job.item_id);
  }
}

// ─── Scheduler ────────────────────────────────────────────────────────────────

/** GET /scheduler — return the current scheduler state. */
app.get('/scheduler', (c) => {
  return c.json(getSchedulerState());
});

/**
 * PATCH /scheduler — apply the JSON request body to the scheduler state and
 * return the resulting state. A body that is not valid JSON yields HTTP 400.
 */
app.patch('/scheduler', async (c) => {
  let body: Parameters<typeof updateSchedulerState>[0];
  try {
    body = await c.req.json();
  } catch {
    return c.json({ error: 'invalid JSON body' }, 400);
  }
  updateSchedulerState(body);
  return c.json(getSchedulerState());
});

// ─── FFmpeg progress parsing ──────────────────────────────────────────────────

/**
 * Parse an FFmpeg stderr line for progress.
 *
 * Looks for a `time=HH:MM:SS.ff` token. The fractional part is matched but
 * not included in the result.
 *
 * @param line A single line of FFmpeg output.
 * @returns Whole seconds processed, or `null` when the line has no `time=` token.
 */
export function parseFFmpegProgress(line: string): number | null {
  const match = line.match(/time=(\d+):(\d+):(\d+)\.(\d+)/);
  if (!match) return null;
  const hours = Number(match[1]);
  const minutes = Number(match[2]);
  const seconds = Number(match[3]);
  return hours * 3600 + minutes * 60 + seconds;
}

export default app;