Files
netfelix-audio-fix/server/api/scan.ts
T
felixfoertsch 78d569189f
Build and Push Docker Image / build (push) Successful in 1m18s
fix stop buttons: centralize processInbox launch through abort-aware startProcessInbox
All three processInbox callers (manual button, auto-processing toggle,
post-scan auto-process) now go through startProcessInbox() which manages
the shared abort controller. Previously only the manual button set the
abort controller, so Stop Sorting had no effect when processing was
triggered from the settings toggle or after scan completion.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-21 10:18:57 +02:00

244 lines
9.1 KiB
TypeScript

import { Hono } from "hono";
import { stream } from "hono/streaming";
import { getAllConfig, getConfig, getDb, setConfig } from "../db/index";
import { log, error as logError } from "../lib/log";
import { discoverVideoFiles } from "../services/discover";
import { parsePath } from "../services/path-parser";
import { probeFile } from "../services/probe";
import { upsertScannedItem } from "../services/rescan";
import { isInScanWindow, msUntilScanWindow, nextScanWindowTime, waitForScanWindow } from "../services/scheduler";
import { emitPipelineChanged } from "./execute";
const app = new Hono();
/**
* Validate a scan `limit` input. Must be a positive integer or absent —
* NaN/negatives/non-numerics would disable the progress cap
* (`processed >= NaN` never trips) or produce bogus totals via
* `Math.min(NaN, …)`. Exported for unit tests.
*/
export function parseScanLimit(raw: unknown): { ok: true; value: number | null } | { ok: false } {
if (raw == null || raw === "") return { ok: true, value: null };
const n = typeof raw === "number" ? raw : Number(raw);
if (!Number.isInteger(n) || n <= 0) return { ok: false };
return { ok: true, value: n };
}
// ─── State ────────────────────────────────────────────────────────────────────
let scanAbort: AbortController | null = null;
const scanListeners = new Set<(data: string) => void>();
function emitSse(type: string, data: unknown): void {
const line = `event: ${type}\ndata: ${JSON.stringify(data)}\n\n`;
for (const listener of scanListeners) listener(line);
}
function currentScanLimit(): number | null {
const v = getConfig("scan_limit");
return v ? Number(v) : null;
}
// ─── Status ───────────────────────────────────────────────────────────────────
app.get("/", (c) => {
const db = getDb();
const running = getConfig("scan_running") === "1";
const total = (db.prepare("SELECT COUNT(*) as n FROM media_items").get() as { n: number }).n;
const scanned = (
db.prepare("SELECT COUNT(*) as n FROM media_items WHERE scan_status = 'scanned'").get() as { n: number }
).n;
const errors = (db.prepare("SELECT COUNT(*) as n FROM media_items WHERE scan_status = 'error'").get() as { n: number })
.n;
const recentItems = db
.prepare(
"SELECT name, type, scan_status, file_path, last_scanned_at FROM media_items ORDER BY COALESCE(last_scanned_at, created_at) DESC, id DESC LIMIT 5",
)
.all() as {
name: string;
type: string;
scan_status: string;
file_path: string;
last_scanned_at: string | null;
}[];
return c.json({ running, progress: { scanned, total, errors }, recentItems, scanLimit: currentScanLimit() });
});
app.get("/errors", (c) => {
const db = getDb();
const rows = db
.prepare("SELECT name, file_path, scan_error FROM media_items WHERE scan_status = 'error' ORDER BY name LIMIT 100")
.all() as { name: string; file_path: string; scan_error: string | null }[];
return c.json({ errors: rows });
});
// ─── Start ────────────────────────────────────────────────────────────────────
app.post("/start", async (c) => {
const db = getDb();
// Atomic claim: only succeed if scan_running is not already '1'.
const claim = db.prepare("UPDATE config SET value = '1' WHERE key = 'scan_running' AND value != '1'").run();
if (claim.changes === 0) {
return c.json({ ok: false, error: "Scan already running" }, 409);
}
const body = await c.req.json<{ limit?: unknown }>().catch(() => ({ limit: undefined }));
const formLimit = parseScanLimit(body.limit);
const envLimit = parseScanLimit(process.env.SCAN_LIMIT);
if (!formLimit.ok || !envLimit.ok) {
db.prepare("UPDATE config SET value = '0' WHERE key = 'scan_running'").run();
return c.json({ ok: false, error: "limit must be a positive integer" }, 400);
}
const limit: number | null = formLimit.value ?? envLimit.value ?? null;
setConfig("scan_limit", limit != null ? String(limit) : "");
runScan(limit).catch((err) => {
logError("Scan failed:", err);
setConfig("scan_running", "0");
emitSse("error", { message: String(err) });
});
return c.json({ ok: true });
});
// ─── Stop ─────────────────────────────────────────────────────────────────────
app.post("/stop", (c) => {
scanAbort?.abort();
setConfig("scan_running", "0");
return c.json({ ok: true });
});
// ─── SSE ──────────────────────────────────────────────────────────────────────
app.get("/events", (c) => {
return stream(c, async (s) => {
c.header("Content-Type", "text/event-stream");
c.header("Cache-Control", "no-cache");
c.header("Connection", "keep-alive");
const queue: string[] = [];
let resolve: (() => void) | null = null;
const listener = (data: string) => {
queue.push(data);
resolve?.();
};
scanListeners.add(listener);
s.onAbort(() => {
scanListeners.delete(listener);
});
try {
while (!s.closed) {
if (queue.length > 0) {
await s.write(queue.shift()!);
} else {
await new Promise<void>((res) => {
resolve = res;
setTimeout(res, 25_000);
});
resolve = null;
if (queue.length === 0) await s.write(": keepalive\n\n");
}
}
} finally {
scanListeners.delete(listener);
}
});
});
// ─── Core scan logic ──────────────────────────────────────────────────────────
async function runScan(limit: number | null = null): Promise<void> {
log(`Scan started${limit ? ` (limit: ${limit})` : ""}`);
scanAbort = new AbortController();
const { signal } = scanAbort;
const db = getDb();
const cfg = getAllConfig();
const moviesRoot = cfg.movies_root || "/movies";
const tvRoot = cfg.tv_root || "/tv";
let processed = 0;
let errors = 0;
emitSse("log", { message: "Discovering files..." });
const allFiles = await discoverVideoFiles([moviesRoot, tvRoot]);
// Skip files already scanned — makes the scan resumable across stops/restarts.
const alreadyScanned = new Set(
(db.prepare("SELECT file_path FROM media_items WHERE scan_status = 'scanned'").all() as { file_path: string }[]).map(
(r) => r.file_path,
),
);
const pending = allFiles.filter((f) => !alreadyScanned.has(f));
const skipped = allFiles.length - pending.length;
if (skipped > 0) log(`Scan: skipping ${skipped} already-scanned files, ${pending.length} remaining`);
const total = limit != null ? Math.min(limit, pending.length) : pending.length;
emitSse("progress", { scanned: 0, total, current_item: null, errors, running: true });
for (const filePath of pending) {
if (signal.aborted) break;
if (limit != null && processed >= limit) break;
// Honour the scan window between items so overnight-only setups don't
// hog the filesystem during the day. Checked between items rather than
// mid-item so we don't leave a partial upsert in flight.
if (!isInScanWindow()) {
emitSse("paused", {
until: nextScanWindowTime(),
seconds: Math.round(msUntilScanWindow() / 1000),
});
await waitForScanWindow();
if (signal.aborted) break;
emitSse("resumed", {});
}
const parsed = parsePath(filePath, moviesRoot, tvRoot);
if (!parsed) continue;
processed++;
emitSse("progress", { scanned: processed, total, current_item: parsed.name, errors, running: true });
try {
const probe = await probeFile(filePath);
upsertScannedItem(db, filePath, parsed, probe);
if (processed % 25 === 0) emitPipelineChanged();
emitSse("log", { name: parsed.name, type: parsed.type, status: "scanned", file: filePath });
} catch (err) {
errors++;
logError(`Error scanning ${filePath}:`, err);
try {
db
.prepare(`
INSERT INTO media_items (file_path, type, name, scan_status, scan_error, last_scanned_at)
VALUES (?, ?, ?, 'error', ?, datetime('now'))
ON CONFLICT(file_path) DO UPDATE SET scan_status = 'error', scan_error = ?, last_scanned_at = datetime('now')
`)
.run(filePath, parsed.type, parsed.name, String(err), String(err));
} catch (dbErr) {
// Failed to persist the error status — log it so the incident
// doesn't disappear silently.
logError(`Failed to record scan error for ${filePath}:`, dbErr);
}
emitSse("log", { name: parsed.name, type: parsed.type, status: "error", file: filePath });
}
}
setConfig("scan_running", "0");
log(`Scan complete: ${processed} scanned, ${errors} errors`);
emitSse("complete", { scanned: processed, total, errors });
if (getConfig("auto_processing") === "1") {
const { startProcessInbox } = await import("./review");
startProcessInbox();
}
}
export default app;