All checks were successful
Build and Push Docker Image / build (push) Successful in 1m11s
The scalar subquery I added in 7d30e6c ran one aggregate scan of
media_streams per row. On a real library (33k items / 212k streams)
a single page took 500+ seconds synchronously, blocking the event
loop and timing out every other request — Library AND Pipeline both
stopped loading.
Swap it for a single batched `GROUP_CONCAT ... WHERE item_id IN (?...)`
query over the current page's ids (max 25), then merge back into rows.
v2026.04.15.10
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
422 lines
16 KiB
TypeScript
422 lines
16 KiB
TypeScript
import { Hono } from "hono";
|
||
import { stream } from "hono/streaming";
|
||
import { getAllConfig, getConfig, getDb, setConfig } from "../db/index";
|
||
import { log, error as logError, warn } from "../lib/log";
|
||
import { getAllItems, getDevItems } from "../services/jellyfin";
|
||
import { loadLibrary as loadRadarrLibrary, isUsable as radarrUsable } from "../services/radarr";
|
||
import { upsertJellyfinItem } from "../services/rescan";
|
||
import { isInScanWindow, msUntilScanWindow, nextScanWindowTime, waitForScanWindow } from "../services/scheduler";
|
||
import { loadLibrary as loadSonarrLibrary, isUsable as sonarrUsable } from "../services/sonarr";
|
||
|
||
const app = new Hono();
|
||
|
||
/**
|
||
* Validate a scan `limit` input. Must be a positive integer or absent —
|
||
* NaN/negatives/non-numerics would disable the progress cap
|
||
* (`processed >= NaN` never trips) or produce bogus totals via
|
||
* `Math.min(NaN, …)`. Exported for unit tests.
|
||
*/
|
||
export function parseScanLimit(raw: unknown): { ok: true; value: number | null } | { ok: false } {
|
||
if (raw == null || raw === "") return { ok: true, value: null };
|
||
const n = typeof raw === "number" ? raw : Number(raw);
|
||
if (!Number.isInteger(n) || n <= 0) return { ok: false };
|
||
return { ok: true, value: n };
|
||
}
|
||
|
||
// Allowed values for the /items filter params; "all" disables that filter.
type ScanStatusFilter = "all" | "pending" | "scanned" | "error";
type ScanTypeFilter = "all" | "movie" | "episode";
type ScanSourceFilter = "all" | "scan" | "webhook";

/** Fully-normalised /items query, as produced by `parseScanItemsQuery`. */
export interface ScanItemsQuery {
  // Row offset into the filtered result set (>= 0).
  offset: number;
  // Page size, clamped to [1, 200] by the parser.
  limit: number;
  // Trimmed free-text search; matched case-insensitively against name and file_path.
  search: string;
  status: ScanStatusFilter;
  type: ScanTypeFilter;
  source: ScanSourceFilter;
}
|
||
|
||
function parsePositiveInt(raw: unknown, fallback: number): number {
|
||
const n = typeof raw === "number" ? raw : Number(raw);
|
||
if (!Number.isFinite(n)) return fallback;
|
||
if (!Number.isInteger(n)) return fallback;
|
||
return n;
|
||
}
|
||
|
||
function clamp(n: number, min: number, max: number): number {
|
||
if (n < min) return min;
|
||
if (n > max) return max;
|
||
return n;
|
||
}
|
||
|
||
function parseOneOf<T extends readonly string[]>(raw: unknown, allowed: T, fallback: T[number]): T[number] {
|
||
if (typeof raw !== "string") return fallback;
|
||
const lowered = raw.toLowerCase();
|
||
return (allowed as readonly string[]).includes(lowered) ? (lowered as T[number]) : fallback;
|
||
}
|
||
|
||
export function parseScanItemsQuery(raw: Record<string, unknown>): ScanItemsQuery {
|
||
const limit = clamp(parsePositiveInt(raw.limit, 50), 1, 200);
|
||
const offset = Math.max(0, parsePositiveInt(raw.offset, 0));
|
||
const search = typeof raw.q === "string" ? raw.q.trim() : "";
|
||
return {
|
||
offset,
|
||
limit,
|
||
search,
|
||
status: parseOneOf(raw.status, ["all", "pending", "scanned", "error"] as const, "all"),
|
||
type: parseOneOf(raw.type, ["all", "movie", "episode"] as const, "all"),
|
||
source: parseOneOf(raw.source, ["all", "scan", "webhook"] as const, "all"),
|
||
};
|
||
}
|
||
|
||
export function buildScanItemsWhere(query: ScanItemsQuery): { sql: string; args: string[] } {
|
||
const clauses: string[] = [];
|
||
const args: string[] = [];
|
||
if (query.status !== "all") {
|
||
clauses.push("scan_status = ?");
|
||
args.push(query.status);
|
||
}
|
||
if (query.type !== "all") {
|
||
clauses.push("lower(type) = ?");
|
||
args.push(query.type);
|
||
}
|
||
if (query.source !== "all") {
|
||
clauses.push("ingest_source = ?");
|
||
args.push(query.source);
|
||
}
|
||
if (query.search.length > 0) {
|
||
clauses.push("(lower(name) LIKE ? OR lower(file_path) LIKE ?)");
|
||
const needle = `%${query.search.toLowerCase()}%`;
|
||
args.push(needle, needle);
|
||
}
|
||
return {
|
||
sql: clauses.length > 0 ? `WHERE ${clauses.join(" AND ")}` : "",
|
||
args,
|
||
};
|
||
}
|
||
|
||
// ─── State ────────────────────────────────────────────────────────────────────

// Abort handle for the in-flight scan; set by runScan, triggered by POST /stop.
let scanAbort: AbortController | null = null;
// Connected /events subscribers; each receives pre-formatted SSE frame strings.
const scanListeners = new Set<(data: string) => void>();
|
||
|
||
function emitSse(type: string, data: unknown): void {
|
||
const line = `event: ${type}\ndata: ${JSON.stringify(data)}\n\n`;
|
||
for (const listener of scanListeners) listener(line);
|
||
}
|
||
|
||
function currentScanLimit(): number | null {
|
||
const v = getConfig("scan_limit");
|
||
return v ? Number(v) : null;
|
||
}
|
||
|
||
function parseLanguageList(raw: string | null, fallback: string[]): string[] {
|
||
if (!raw) return fallback;
|
||
try {
|
||
const parsed = JSON.parse(raw);
|
||
return Array.isArray(parsed) ? parsed.filter((v): v is string => typeof v === "string") : fallback;
|
||
} catch {
|
||
return fallback;
|
||
}
|
||
}
|
||
|
||
// ─── Status ───────────────────────────────────────────────────────────────────
|
||
|
||
// Status snapshot for the dashboard: running flag, aggregate scan counters,
// the five most recently touched items, and the active scan limit.
app.get("/", (c) => {
  const db = getDb();
  // 'scan_running' is a config-table flag set/cleared by /start, /stop and runScan.
  const running = getConfig("scan_running") === "1";
  const total = (db.prepare("SELECT COUNT(*) as n FROM media_items").get() as { n: number }).n;
  const scanned = (
    db.prepare("SELECT COUNT(*) as n FROM media_items WHERE scan_status = 'scanned'").get() as { n: number }
  ).n;
  const errors = (db.prepare("SELECT COUNT(*) as n FROM media_items WHERE scan_status = 'error'").get() as { n: number })
    .n;
  // Most recently scanned items first; falls back to created_at for
  // never-scanned rows, with id as a stable tie-breaker.
  const recentItems = db
    .prepare(
      "SELECT name, type, scan_status, file_path, last_scanned_at, ingest_source FROM media_items ORDER BY COALESCE(last_scanned_at, created_at) DESC, id DESC LIMIT 5",
    )
    .all() as {
    name: string;
    type: string;
    scan_status: string;
    file_path: string;
    last_scanned_at: string | null;
    ingest_source: string | null;
  }[];

  return c.json({ running, progress: { scanned, total, errors }, recentItems, scanLimit: currentScanLimit() });
});
|
||
|
||
// Paged listing of media items with optional status/type/source/search
// filters. Each row is decorated with a comma-separated `audio_codecs`
// string via a single batched media_streams query (see comment below).
app.get("/items", (c) => {
  const db = getDb();
  const query = parseScanItemsQuery({
    offset: c.req.query("offset"),
    limit: c.req.query("limit"),
    q: c.req.query("q"),
    status: c.req.query("status"),
    type: c.req.query("type"),
    source: c.req.query("source"),
  });
  const where = buildScanItemsWhere(query);
  // Page query. `audio_codecs` is NOT selected here — the cast merely
  // reserves the field; it is filled in from the batched lookup below.
  const rows = db
    .prepare(
      `
      SELECT id, jellyfin_id, name, type, series_name, season_number, episode_number,
             scan_status, original_language, orig_lang_source, container, file_size, file_path,
             last_scanned_at, ingest_source
      FROM media_items
      ${where.sql}
      ORDER BY COALESCE(last_scanned_at, created_at) DESC, id DESC
      LIMIT ? OFFSET ?
      `,
    )
    .all(...where.args, query.limit, query.offset) as Array<{
    id: number;
    jellyfin_id: string;
    name: string;
    type: string;
    series_name: string | null;
    season_number: number | null;
    episode_number: number | null;
    scan_status: string;
    original_language: string | null;
    orig_lang_source: string | null;
    container: string | null;
    file_size: number | null;
    file_path: string;
    last_scanned_at: string | null;
    ingest_source: string | null;
    // Merged in below; null until then (and for items with no audio streams).
    audio_codecs: string | null;
  }>;

  // Audio codecs per item, batched into one query for the current page.
  // A per-row scalar subquery over media_streams was O(page × streams)
  // and could block the event loop for minutes on large libraries.
  if (rows.length > 0) {
    // Only "?" placeholders are interpolated here — ids are bound, not inlined.
    const placeholders = rows.map(() => "?").join(",");
    const codecRows = db
      .prepare(
        `SELECT item_id, GROUP_CONCAT(DISTINCT LOWER(codec)) AS codecs
         FROM media_streams
         WHERE item_id IN (${placeholders}) AND type = 'Audio' AND codec IS NOT NULL
         GROUP BY item_id`,
      )
      .all(...rows.map((r) => r.id)) as { item_id: number; codecs: string | null }[];
    const byItem = new Map(codecRows.map((r) => [r.item_id, r.codecs]));
    // Items absent from codecRows (no audio streams) default to null.
    for (const row of rows) row.audio_codecs = byItem.get(row.id) ?? null;
  }

  // Filtered total for pagination — same WHERE, no LIMIT/OFFSET.
  const total = (db.prepare(`SELECT COUNT(*) as n FROM media_items ${where.sql}`).get(...where.args) as { n: number }).n;
  return c.json({ rows, total, hasMore: query.offset + rows.length < total, query });
});
|
||
|
||
// ─── Start ────────────────────────────────────────────────────────────────────
|
||
|
||
// Kick off a library scan. Guards against concurrent scans with an atomic
// config-row claim, validates the optional `limit` (request body beats the
// SCAN_LIMIT env var), then launches the scan in the background — the HTTP
// response does not wait for it.
app.post("/start", async (c) => {
  const db = getDb();
  // Atomic claim: only succeed if scan_running is not already '1'.
  const claim = db.prepare("UPDATE config SET value = '1' WHERE key = 'scan_running' AND value != '1'").run();
  if (claim.changes === 0) {
    return c.json({ ok: false, error: "Scan already running" }, 409);
  }

  // A missing or malformed JSON body is treated as "no limit supplied".
  const body = await c.req.json<{ limit?: unknown }>().catch(() => ({ limit: undefined }));
  const formLimit = parseScanLimit(body.limit);
  const envLimit = parseScanLimit(process.env.SCAN_LIMIT);
  if (!formLimit.ok || !envLimit.ok) {
    // Release the claim taken above before rejecting.
    db.prepare("UPDATE config SET value = '0' WHERE key = 'scan_running'").run();
    return c.json({ ok: false, error: "limit must be a positive integer" }, 400);
  }
  // Body limit wins over env limit; both absent → unlimited scan.
  const limit: number | null = formLimit.value ?? envLimit.value ?? null;
  // Persist (empty string clears) so /status can report the active limit.
  setConfig("scan_limit", limit != null ? String(limit) : "");

  // Fire-and-forget: on failure, clear the running flag and tell SSE clients.
  runScan(limit).catch((err) => {
    logError("Scan failed:", err);
    setConfig("scan_running", "0");
    emitSse("error", { message: String(err) });
  });

  return c.json({ ok: true });
});
|
||
|
||
// ─── Stop ─────────────────────────────────────────────────────────────────────
|
||
|
||
// Stop the running scan: signal the abort controller (checked between items
// inside runScan) and clear the running flag immediately so status reflects
// the stop even before the scan loop notices the signal.
app.post("/stop", (c) => {
  scanAbort?.abort();
  setConfig("scan_running", "0");
  return c.json({ ok: true });
});
|
||
|
||
// ─── SSE ──────────────────────────────────────────────────────────────────────
|
||
|
||
// Server-sent events stream of scan progress. Each connection registers a
// listener that enqueues pre-formatted SSE frames from emitSse; the write
// loop drains the queue, and after ~25s of silence emits an SSE comment
// keepalive so idle proxies don't drop the connection.
app.get("/events", (c) => {
  return stream(c, async (s) => {
    c.header("Content-Type", "text/event-stream");
    c.header("Cache-Control", "no-cache");
    c.header("Connection", "keep-alive");

    const queue: string[] = [];
    // While the loop is parked below, `resolve` lets a new event wake it early.
    let resolve: (() => void) | null = null;

    const listener = (data: string) => {
      queue.push(data);
      resolve?.();
    };

    scanListeners.add(listener);
    s.onAbort(() => {
      scanListeners.delete(listener);
    });

    try {
      while (!s.closed) {
        if (queue.length > 0) {
          await s.write(queue.shift()!);
        } else {
          // Park until a new event arrives or the 25s timer fires — whichever
          // comes first (resolving an already-settled promise is harmless).
          await new Promise<void>((res) => {
            resolve = res;
            setTimeout(res, 25_000);
          });
          resolve = null;
          // Timer fired with nothing queued → send a comment-only keepalive.
          if (queue.length === 0) await s.write(": keepalive\n\n");
        }
      }
    } finally {
      // Belt-and-braces cleanup; onAbort above normally removes it already.
      scanListeners.delete(listener);
    }
  });
});
|
||
|
||
// ─── Core scan logic ──────────────────────────────────────────────────────────
|
||
|
||
/**
 * Core scan loop: stream items from Jellyfin (a small dev fixture set in
 * development, after wiping all scan tables), upsert each item into the DB,
 * and publish progress over SSE.
 *
 * Honours `limit` (max items, non-dev only), the abort controller created
 * here (triggered via POST /stop), and the configured scan window. Always
 * clears the `scan_running` flag and logs hit/miss statistics on completion.
 * Errors thrown out of this function are handled by the /start route.
 */
async function runScan(limit: number | null = null): Promise<void> {
  log(`Scan started${limit ? ` (limit: ${limit})` : ""}`);
  scanAbort = new AbortController();
  const { signal } = scanAbort;
  const isDev = process.env.NODE_ENV === "development";
  const db = getDb();

  if (isDev) {
    // Order matters only if foreign keys are enforced without CASCADE; we
    // have ON DELETE CASCADE on media_streams/review_plans/stream_decisions/
    // subtitle_files/jobs, so deleting media_items would be enough. List
    // them explicitly for clarity and to survive future schema drift.
    db.prepare("DELETE FROM jobs").run();
    db.prepare("DELETE FROM subtitle_files").run();
    db.prepare("DELETE FROM stream_decisions").run();
    db.prepare("DELETE FROM review_plans").run();
    db.prepare("DELETE FROM media_streams").run();
    db.prepare("DELETE FROM media_items").run();
  }

  const cfg = getAllConfig();
  const jellyfinCfg = { url: cfg.jellyfin_url, apiKey: cfg.jellyfin_api_key, userId: cfg.jellyfin_user_id };
  const audioLanguages = parseLanguageList(cfg.audio_languages ?? null, []);
  const radarrCfg = { url: cfg.radarr_url, apiKey: cfg.radarr_api_key };
  const sonarrCfg = { url: cfg.sonarr_url, apiKey: cfg.sonarr_api_key };
  // 'enabled' in config means the user toggled it on. Only actually use it
  // if the URL+key pass URL parsing — otherwise we'd hit ERR_INVALID_URL on
  // every item. Refuse to call invalid endpoints rather than spamming logs.
  const radarrEnabled = cfg.radarr_enabled === "1" && radarrUsable(radarrCfg);
  const sonarrEnabled = cfg.sonarr_enabled === "1" && sonarrUsable(sonarrCfg);

  if (cfg.radarr_enabled === "1" && !radarrEnabled) {
    warn(`Radarr is enabled in config but URL/API key is invalid (url='${cfg.radarr_url}') — skipping Radarr lookups`);
  }
  if (cfg.sonarr_enabled === "1" && !sonarrEnabled) {
    warn(`Sonarr is enabled in config but URL/API key is invalid (url='${cfg.sonarr_url}') — skipping Sonarr lookups`);
  }

  // Pre-load both libraries once so per-item lookups are O(1) cache hits
  // instead of HTTP round-trips. The previous code called /api/v3/movie
  // (the entire library!) once per item that didn't match by tmdbId.
  const [radarrLibrary, sonarrLibrary] = await Promise.all([
    radarrEnabled ? loadRadarrLibrary(radarrCfg) : Promise.resolve(null),
    sonarrEnabled ? loadSonarrLibrary(sonarrCfg) : Promise.resolve(null),
  ]);

  log(
    `External language sources: radarr=${radarrEnabled ? `enabled (${cfg.radarr_url}, ${radarrLibrary?.byTmdbId.size ?? 0} movies in library)` : "disabled"}, sonarr=${sonarrEnabled ? `enabled (${cfg.sonarr_url}, ${sonarrLibrary?.byTvdbId.size ?? 0} series in library)` : "disabled"}`,
  );
  // Progress counters surfaced via SSE and the completion log line.
  let processed = 0;
  let errors = 0;
  let total = 0;

  const rescanCfg = {
    audioLanguages,
    radarr: radarrEnabled ? radarrCfg : null,
    sonarr: sonarrEnabled ? sonarrCfg : null,
    radarrLibrary,
    sonarrLibrary,
  };

  // Per-source hit/miss stats for the end-of-scan summary log.
  let radarrMisses = 0;
  let radarrHits = 0;
  let sonarrMisses = 0;
  let sonarrHits = 0;
  let missingProviderIds = 0;

  // getAllItems reports Jellyfin's total via callback; cap the displayed
  // total at `limit` so the progress bar can actually reach 100%.
  const itemSource = isDev
    ? getDevItems(jellyfinCfg)
    : getAllItems(jellyfinCfg, (_fetched, jellyfinTotal) => {
        total = limit != null ? Math.min(limit, jellyfinTotal) : jellyfinTotal;
      });
  for await (const jellyfinItem of itemSource) {
    if (signal.aborted) break;
    // NOTE(review): the limit is only enforced outside dev mode — presumably
    // because dev fixtures are tiny; confirm if that's intentional.
    if (!isDev && limit != null && processed >= limit) break;
    if (!jellyfinItem.Name || !jellyfinItem.Path) {
      warn(`Skipping item without name/path: id=${jellyfinItem.Id}`);
      continue;
    }

    // Honour the scan window between items so overnight-only setups don't hog
    // Jellyfin during the day. Checked between items rather than mid-item so
    // we don't leave a partial upsert sitting in flight.
    if (!isInScanWindow()) {
      emitSse("paused", {
        until: nextScanWindowTime(),
        seconds: Math.round(msUntilScanWindow() / 1000),
      });
      await waitForScanWindow();
      if (signal.aborted) break;
      emitSse("resumed", {});
    }

    processed++;
    emitSse("progress", { scanned: processed, total, current_item: jellyfinItem.Name, errors, running: true });

    try {
      const result = await upsertJellyfinItem(db, jellyfinItem, rescanCfg);
      if (result.radarrHit) radarrHits++;
      if (result.radarrMiss) radarrMisses++;
      if (result.sonarrHit) sonarrHits++;
      if (result.sonarrMiss) sonarrMisses++;
      if (result.missingProviderId) missingProviderIds++;
      emitSse("log", { name: jellyfinItem.Name, type: jellyfinItem.Type, status: "scanned", file: jellyfinItem.Path });
    } catch (err) {
      errors++;
      logError(`Error scanning ${jellyfinItem.Name}:`, err);
      // Best-effort: mark the row as errored so it shows up in /items filters.
      try {
        db
          .prepare("UPDATE media_items SET scan_status = 'error', scan_error = ? WHERE jellyfin_id = ?")
          .run(String(err), jellyfinItem.Id);
      } catch (dbErr) {
        // Failed to persist the error status — log it so the incident
        // doesn't disappear silently. We can't do much more; the outer
        // loop continues so the scan still finishes.
        logError(`Failed to record scan error for ${jellyfinItem.Id}:`, dbErr);
      }
      emitSse("log", { name: jellyfinItem.Name, type: jellyfinItem.Type, status: "error", file: jellyfinItem.Path });
    }
  }

  // Reached on normal completion and on abort — either way the scan is over.
  setConfig("scan_running", "0");
  log(`Scan complete: ${processed} scanned, ${errors} errors`);
  log(
    `  language sources: radarr hits=${radarrHits} misses=${radarrMisses}, sonarr hits=${sonarrHits} misses=${sonarrMisses}, no provider id=${missingProviderIds}`,
  );
  emitSse("complete", { scanned: processed, total, errors });
}
|
||
|
||
export default app;
|