import { Hono } from "hono"; import { stream } from "hono/streaming"; import { getAllConfig, getConfig, getDb, setConfig } from "../db/index"; import { log, error as logError } from "../lib/log"; import { discoverVideoFiles } from "../services/discover"; import { parsePath } from "../services/path-parser"; import { probeFile } from "../services/probe"; import { upsertScannedItem } from "../services/rescan"; import { emitPipelineChanged } from "./execute"; import { isInScanWindow, msUntilScanWindow, nextScanWindowTime, waitForScanWindow } from "../services/scheduler"; const app = new Hono(); /** * Validate a scan `limit` input. Must be a positive integer or absent — * NaN/negatives/non-numerics would disable the progress cap * (`processed >= NaN` never trips) or produce bogus totals via * `Math.min(NaN, …)`. Exported for unit tests. */ export function parseScanLimit(raw: unknown): { ok: true; value: number | null } | { ok: false } { if (raw == null || raw === "") return { ok: true, value: null }; const n = typeof raw === "number" ? raw : Number(raw); if (!Number.isInteger(n) || n <= 0) return { ok: false }; return { ok: true, value: n }; } // ─── State ──────────────────────────────────────────────────────────────────── let scanAbort: AbortController | null = null; const scanListeners = new Set<(data: string) => void>(); function emitSse(type: string, data: unknown): void { const line = `event: ${type}\ndata: ${JSON.stringify(data)}\n\n`; for (const listener of scanListeners) listener(line); } function currentScanLimit(): number | null { const v = getConfig("scan_limit"); return v ? Number(v) : null; } // ─── Status ─────────────────────────────────────────────────────────────────── app.get("/", (c) => { const db = getDb(); const running = getConfig("scan_running") === "1"; const total = (db.prepare("SELECT COUNT(*) as n FROM media_items").get() as { n: number }).n; const scanned = ( db.prepare("SELECT COUNT(*) as n FROM media_items WHERE scan_status = 'scanned'").get() as { n: number } ).n; const errors = (db.prepare("SELECT COUNT(*) as n FROM media_items WHERE scan_status = 'error'").get() as { n: number }) .n; const recentItems = db .prepare( "SELECT name, type, scan_status, file_path, last_scanned_at FROM media_items ORDER BY COALESCE(last_scanned_at, created_at) DESC, id DESC LIMIT 5", ) .all() as { name: string; type: string; scan_status: string; file_path: string; last_scanned_at: string | null; }[]; return c.json({ running, progress: { scanned, total, errors }, recentItems, scanLimit: currentScanLimit() }); }); // ─── Start ──────────────────────────────────────────────────────────────────── app.post("/start", async (c) => { const db = getDb(); // Atomic claim: only succeed if scan_running is not already '1'. const claim = db.prepare("UPDATE config SET value = '1' WHERE key = 'scan_running' AND value != '1'").run(); if (claim.changes === 0) { return c.json({ ok: false, error: "Scan already running" }, 409); } const body = await c.req.json<{ limit?: unknown }>().catch(() => ({ limit: undefined })); const formLimit = parseScanLimit(body.limit); const envLimit = parseScanLimit(process.env.SCAN_LIMIT); if (!formLimit.ok || !envLimit.ok) { db.prepare("UPDATE config SET value = '0' WHERE key = 'scan_running'").run(); return c.json({ ok: false, error: "limit must be a positive integer" }, 400); } const limit: number | null = formLimit.value ?? envLimit.value ?? null; setConfig("scan_limit", limit != null ? String(limit) : ""); runScan(limit).catch((err) => { logError("Scan failed:", err); setConfig("scan_running", "0"); emitSse("error", { message: String(err) }); }); return c.json({ ok: true }); }); // ─── Stop ───────────────────────────────────────────────────────────────────── app.post("/stop", (c) => { scanAbort?.abort(); setConfig("scan_running", "0"); return c.json({ ok: true }); }); // ─── SSE ────────────────────────────────────────────────────────────────────── app.get("/events", (c) => { return stream(c, async (s) => { c.header("Content-Type", "text/event-stream"); c.header("Cache-Control", "no-cache"); c.header("Connection", "keep-alive"); const queue: string[] = []; let resolve: (() => void) | null = null; const listener = (data: string) => { queue.push(data); resolve?.(); }; scanListeners.add(listener); s.onAbort(() => { scanListeners.delete(listener); }); try { while (!s.closed) { if (queue.length > 0) { await s.write(queue.shift()!); } else { await new Promise((res) => { resolve = res; setTimeout(res, 25_000); }); resolve = null; if (queue.length === 0) await s.write(": keepalive\n\n"); } } } finally { scanListeners.delete(listener); } }); }); // ─── Core scan logic ────────────────────────────────────────────────────────── async function runScan(limit: number | null = null): Promise { log(`Scan started${limit ? ` (limit: ${limit})` : ""}`); scanAbort = new AbortController(); const { signal } = scanAbort; const db = getDb(); const cfg = getAllConfig(); const moviesRoot = cfg.movies_root || "/movies"; const tvRoot = cfg.tv_root || "/tv"; let processed = 0; let errors = 0; emitSse("log", { message: "Discovering files..." }); const allFiles = await discoverVideoFiles([moviesRoot, tvRoot]); const total = limit != null ? Math.min(limit, allFiles.length) : allFiles.length; emitSse("progress", { scanned: 0, total, current_item: null, errors, running: true }); for (const filePath of allFiles) { if (signal.aborted) break; if (limit != null && processed >= limit) break; // Honour the scan window between items so overnight-only setups don't // hog the filesystem during the day. Checked between items rather than // mid-item so we don't leave a partial upsert in flight. if (!isInScanWindow()) { emitSse("paused", { until: nextScanWindowTime(), seconds: Math.round(msUntilScanWindow() / 1000), }); await waitForScanWindow(); if (signal.aborted) break; emitSse("resumed", {}); } const parsed = parsePath(filePath, moviesRoot, tvRoot); if (!parsed) continue; processed++; emitSse("progress", { scanned: processed, total, current_item: parsed.name, errors, running: true }); try { const probe = await probeFile(filePath); upsertScannedItem(db, filePath, parsed, probe); if (processed % 25 === 0) emitPipelineChanged(); emitSse("log", { name: parsed.name, type: parsed.type, status: "scanned", file: filePath }); } catch (err) { errors++; logError(`Error scanning ${filePath}:`, err); try { db.prepare(` INSERT INTO media_items (file_path, type, name, scan_status, scan_error, last_scanned_at) VALUES (?, ?, ?, 'error', ?, datetime('now')) ON CONFLICT(file_path) DO UPDATE SET scan_status = 'error', scan_error = ?, last_scanned_at = datetime('now') `).run(filePath, parsed.type, parsed.name, String(err), String(err)); } catch (dbErr) { // Failed to persist the error status — log it so the incident // doesn't disappear silently. logError(`Failed to record scan error for ${filePath}:`, dbErr); } emitSse("log", { name: parsed.name, type: parsed.type, status: "error", file: filePath }); } } setConfig("scan_running", "0"); log(`Scan complete: ${processed} scanned, ${errors} errors`); emitSse("complete", { scanned: processed, total, errors }); if (getConfig("auto_processing") === "1") { const { processInbox, getAudioLanguages } = await import("./review"); const { emitInboxSorted, emitInboxSortStart, emitInboxSortProgress } = await import("./execute"); processInbox(db, getAudioLanguages(), undefined, { onStart: emitInboxSortStart, onProgress: emitInboxSortProgress, }) .then((result) => emitInboxSorted(result)) .catch(() => emitInboxSorted({ moved_to_queue: 0, moved_to_review: 0 })); } } export default app;