import { unlinkSync } from "node:fs"; import { Hono } from "hono"; import { getAllConfig, getConfig, getDb } from "../db/index"; import { log, error as logError } from "../lib/log"; import { isOneOf, parseId } from "../lib/validate"; import { analyzeItem, assignTargetOrder } from "../services/analyzer"; import { buildCommand, LANG_NAMES } from "../services/ffmpeg"; import { type LanguageResolverConfig, resolveLanguage } from "../services/language-resolver"; import { normalizeLanguage } from "../services/language-utils"; import { loadLibrary as loadRadarrLibrary, type RadarrLibrary, isUsable as radarrUsable, triggerMovieRefetch, } from "../services/radarr"; import { loadLibrary as loadSonarrLibrary, type SonarrLibrary, isUsable as sonarrUsable, triggerEpisodeRefetch, } from "../services/sonarr"; import type { Job, MediaItem, MediaStream, ReviewPlan, StreamDecision } from "../types"; import { emitInboxSorted, emitInboxSortProgress, emitInboxSortStart, emitPipelineChanged, maybeStartQueueProcessor, } from "./execute"; const app = new Hono(); // ─── Helpers ────────────────────────────────────────────────────────────────── export function getAudioLanguages(): string[] { return parseLanguageList(getConfig("audio_languages"), []); } function parseLanguageList(raw: string | null, fallback: string[]): string[] { if (!raw) return fallback; try { const parsed = JSON.parse(raw); return Array.isArray(parsed) ? parsed.filter((v): v is string => typeof v === "string") : fallback; } catch { return fallback; } } /** * Insert a pending audio job for the given item only if no pending job * already exists for it. Guards against duplicate jobs from rapid-fire * approve clicks, overlapping individual + bulk approvals, or any other * path that could race two POSTs for the same item. Returns true if a * job was actually inserted. */ export function enqueueAudioJob(db: ReturnType, itemId: number, command: string): boolean { const result = db .prepare(` INSERT INTO jobs (item_id, command, job_type, status) SELECT ?, ?, 'audio', 'pending' WHERE NOT EXISTS (SELECT 1 FROM jobs WHERE item_id = ? AND status = 'pending') `) .run(itemId, command, itemId); if (result.changes > 0) { // Kick the processor if the user has opted into hands-off queue // draining. No-ops when the runner is already active or the toggle // is off, so every enqueue path gets the behaviour for free. maybeStartQueueProcessor(); } return result.changes > 0; } export interface ProcessInboxResult { moved_to_queue: number; moved_to_review: number; } export interface ProcessInboxHooks { onStart?: (total: number) => void; onProgress?: (processed: number, total: number) => void; signal?: AbortSignal; } export interface ProcessInboxConfig { radarr?: { url: string; apiKey: string } | null; sonarr?: { url: string; apiKey: string } | null; radarrLibrary?: RadarrLibrary | null; sonarrLibrary?: SonarrLibrary | null; } // Yield to the event loop every N items so SSE writes flush and other // requests get a turn. 10 lands around 20 yields/second at typical // reanalyze speed — smooth progress without thrashing. const PROCESS_INBOX_YIELD_EVERY = 10; /** * Distribute every unsorted (sorted=0) pending plan to its final bucket: * auto → sorted=1, status='approved', job enqueued (→ Queue) * auto_heuristic → sorted=1 (→ Review, badge ⚡) * manual → sorted=1 (→ Review, badge ✋) * * Before distributing, resolves the original language via Radarr/Sonarr * (when configured) and persists it so reanalyze() picks it up. * * Emits progress via the optional hooks so the UI can render a live * counter during long sorts; ticks after every item (including noops * that get skipped) so the progress bar tracks real work done. */ export async function processInbox( db: ReturnType, audioLanguages: string[], externalConfig?: ProcessInboxConfig, hooks?: ProcessInboxHooks, ): Promise { // Build the language resolver config. When no external config is provided // (HTTP route, auto-process), load from DB config and fetch libraries. let resolverCfg: LanguageResolverConfig; if (externalConfig) { resolverCfg = { radarr: externalConfig.radarr ?? null, sonarr: externalConfig.sonarr ?? null, radarrLibrary: externalConfig.radarrLibrary ?? null, sonarrLibrary: externalConfig.sonarrLibrary ?? null, }; } else { const cfg = getAllConfig(); const radarrCfg = { url: cfg.radarr_url, apiKey: cfg.radarr_api_key }; const sonarrCfg = { url: cfg.sonarr_url, apiKey: cfg.sonarr_api_key }; const radarrEnabled = cfg.radarr_enabled === "1" && radarrUsable(radarrCfg); const sonarrEnabled = cfg.sonarr_enabled === "1" && sonarrUsable(sonarrCfg); const [radarrLibrary, sonarrLibrary] = await Promise.all([ radarrEnabled ? loadRadarrLibrary(radarrCfg) : Promise.resolve(null), sonarrEnabled ? loadSonarrLibrary(sonarrCfg) : Promise.resolve(null), ]); resolverCfg = { radarr: radarrEnabled ? radarrCfg : null, sonarr: sonarrEnabled ? sonarrCfg : null, radarrLibrary, sonarrLibrary, }; } // Snapshot the ids first — reanalyze() below rewrites stream_decisions and // the plan's auto_class/is_noop so we must re-read the plan after each // reanalysis rather than trusting this initial select. const unsortedIds = db .prepare(` SELECT rp.item_id FROM review_plans rp WHERE rp.status = 'pending' AND rp.is_noop = 0 AND rp.sorted = 0 `) .all() as { item_id: number }[]; const total = unsortedIds.length; log(`processInbox: ${total} unsorted items to classify`); hooks?.onStart?.(total); let movedToQueue = 0; let movedToReview = 0; let processed = 0; for (const { item_id } of unsortedIds) { if (hooks?.signal?.aborted) break; // Resolve the original language via Radarr/Sonarr before reanalysis // so the analyzer sees the authoritative OG language. Only persist // when the resolver actually succeeded via an external source (radarr // or sonarr); the probe fallback path just echoes existing DB // values and should not overwrite anything. const langResult = await resolveLanguage(db, item_id, resolverCfg); if (langResult.externalRaw != null) { db .prepare(` UPDATE media_items SET original_language = ?, orig_lang_source = ?, needs_review = ? WHERE id = ? `) .run(langResult.origLang, langResult.origLangSource, langResult.needsReview, item_id); } // Re-run the analyzer so settings changes made after the scan (e.g. // toggling an audio_languages entry) shape the decisions and class // before distribution. Without this the user's only path to apply a // config change to already-scanned items is a full per-item rescan. reanalyze(db, item_id, audioLanguages); const plan = db.prepare("SELECT id, auto_class, is_noop FROM review_plans WHERE item_id = ?").get(item_id) as | { id: number; auto_class: string | null; is_noop: number } | undefined; if (plan && !plan.is_noop) { if (plan.auto_class === "auto") { db .prepare("UPDATE review_plans SET status = 'approved', reviewed_at = datetime('now'), sorted = 1 WHERE id = ?") .run(plan.id); const { item, streams, decisions } = loadItemDetail(db, item_id); if (item) enqueueAudioJob(db, item_id, buildCommand(item, streams, decisions)); movedToQueue += 1; } else { db.prepare("UPDATE review_plans SET sorted = 1 WHERE id = ?").run(plan.id); movedToReview += 1; } } // plans that vanished (!plan) or became noops fall through — the // is_noop filter already excludes them from both Inbox and Review. processed += 1; hooks?.onProgress?.(processed, total); if (processed % PROCESS_INBOX_YIELD_EVERY === 0 && processed < total) { await Bun.sleep(0); } } log(`processInbox complete: ${movedToQueue} → queue, ${movedToReview} → review, ${total - processed} skipped`); return { moved_to_queue: movedToQueue, moved_to_review: movedToReview }; } /** * Bulk-approve every ⚡ Ready (auto_heuristic) plan currently in the Review * column (sorted=1, status='pending'). Manual items are untouched. */ export function approveReady(db: ReturnType): number { const ready = db .prepare(` SELECT rp.*, mi.id as item_id FROM review_plans rp JOIN media_items mi ON mi.id = rp.item_id WHERE rp.status = 'pending' AND rp.is_noop = 0 AND rp.sorted = 1 AND rp.auto_class = 'auto_heuristic' `) .all() as (ReviewPlan & { item_id: number })[]; let count = 0; for (const plan of ready) { db.prepare("UPDATE review_plans SET status = 'approved', reviewed_at = datetime('now') WHERE id = ?").run(plan.id); const { item, streams, decisions } = loadItemDetail(db, plan.item_id); if (item) { enqueueAudioJob(db, plan.item_id, buildCommand(item, streams, decisions)); count += 1; } } return count; } function countsByFilter(db: ReturnType): Record { const total = (db.prepare("SELECT COUNT(*) as n FROM review_plans").get() as { n: number }).n; const noops = (db.prepare("SELECT COUNT(*) as n FROM review_plans WHERE is_noop = 1").get() as { n: number }).n; const pending = ( db.prepare("SELECT COUNT(*) as n FROM review_plans WHERE status = 'pending' AND is_noop = 0").get() as { n: number } ).n; const approved = ( db.prepare("SELECT COUNT(*) as n FROM review_plans WHERE status = 'approved'").get() as { n: number } ).n; const skipped = (db.prepare("SELECT COUNT(*) as n FROM review_plans WHERE status = 'skipped'").get() as { n: number }) .n; const done = (db.prepare("SELECT COUNT(*) as n FROM review_plans WHERE status = 'done'").get() as { n: number }).n; const error = (db.prepare("SELECT COUNT(*) as n FROM review_plans WHERE status = 'error'").get() as { n: number }).n; const manual = ( db.prepare("SELECT COUNT(*) as n FROM media_items WHERE needs_review = 1 AND original_language IS NULL").get() as { n: number; } ).n; return { all: total, needs_action: pending, noop: noops, approved, skipped, done, error, manual }; } function buildWhereClause(filter: string): string { switch (filter) { case "needs_action": return "rp.status = 'pending' AND rp.is_noop = 0"; case "noop": return "rp.is_noop = 1"; case "manual": return "mi.needs_review = 1 AND mi.original_language IS NULL"; case "approved": return "rp.status = 'approved'"; case "skipped": return "rp.status = 'skipped'"; case "done": return "rp.status = 'done'"; case "error": return "rp.status = 'error'"; default: return "1=1"; } } type RawRow = MediaItem & { plan_id: number | null; plan_status: string | null; is_noop: number | null; plan_notes: string | null; reviewed_at: string | null; plan_created_at: string | null; remove_count: number; keep_count: number; }; function rowToPlan(r: RawRow): ReviewPlan | null { if (r.plan_id == null) return null; return { id: r.plan_id, item_id: r.id, status: r.plan_status ?? "pending", is_noop: r.is_noop ?? 0, notes: r.plan_notes, reviewed_at: r.reviewed_at, created_at: r.plan_created_at ?? "", } as ReviewPlan; } function loadItemDetail(db: ReturnType, itemId: number) { const item = db.prepare("SELECT * FROM media_items WHERE id = ?").get(itemId) as MediaItem | undefined; if (!item) return { item: null, streams: [], plan: null, decisions: [], command: null, job: null }; const streams = db .prepare("SELECT * FROM media_streams WHERE item_id = ? ORDER BY stream_index") .all(itemId) as MediaStream[]; const plan = db.prepare("SELECT * FROM review_plans WHERE item_id = ?").get(itemId) as ReviewPlan | undefined | null; const decisions = plan ? (db.prepare("SELECT * FROM stream_decisions WHERE plan_id = ?").all(plan.id) as StreamDecision[]) : []; const command = plan && !plan.is_noop ? buildCommand(item, streams, decisions) : null; const job = db .prepare( `SELECT id, item_id, command, job_type, status, output, exit_code, created_at, started_at, completed_at FROM jobs WHERE item_id = ? ORDER BY created_at DESC LIMIT 1`, ) .get(itemId) as Job | undefined; return { item, streams, plan: plan ?? null, decisions, command, job: job ?? null }; } /** * Match old custom_titles to new stream IDs after rescan. Keys by a * composite of (type, language, stream_index, title) so user overrides * survive stream-id changes when Jellyfin re-probes metadata. */ export function titleKey(s: { type: string; language: string | null; stream_index: number; title: string | null; }): string { return `${s.type}|${s.language ?? ""}|${s.stream_index}|${s.title ?? ""}`; } export function reanalyze( db: ReturnType, itemId: number, audioLanguages: string[], preservedTitles?: Map, ): void { const item = db.prepare("SELECT * FROM media_items WHERE id = ?").get(itemId) as MediaItem; if (!item) return; const streams = db .prepare("SELECT * FROM media_streams WHERE item_id = ? ORDER BY stream_index") .all(itemId) as MediaStream[]; // Pull prior decisions once so we can pass any custom_language overrides // into the analyzer (so reanalysis respects them) and re-attach them + // custom_title onto the freshly-written decision rows below. Keyed by // stream_id; survives rescan as long as the stream_id is stable. const priorPlan = db.prepare("SELECT id FROM review_plans WHERE item_id = ?").get(itemId) as | { id: number } | undefined; const priorDecisions = priorPlan ? (db .prepare("SELECT stream_id, custom_title, custom_language FROM stream_decisions WHERE plan_id = ?") .all(priorPlan.id) as { stream_id: number; custom_title: string | null; custom_language: string | null }[]) : []; const titleByStreamId = new Map(priorDecisions.map((r) => [r.stream_id, r.custom_title])); const languageByStreamId = new Map(priorDecisions.map((r) => [r.stream_id, r.custom_language])); const languageOverrides = new Map(); for (const [streamId, lang] of languageByStreamId) { if (lang) languageOverrides.set(streamId, lang); } const analysis = analyzeItem( { original_language: item.original_language, orig_lang_source: item.orig_lang_source, needs_review: item.needs_review, container: item.container, }, streams, { audioLanguages }, languageOverrides.size > 0 ? languageOverrides : undefined, ); db .prepare(` INSERT INTO review_plans (item_id, status, is_noop, auto_class, apple_compat, job_type, notes) VALUES (?, 'pending', ?, ?, ?, ?, ?) ON CONFLICT(item_id) DO UPDATE SET status = 'pending', is_noop = excluded.is_noop, auto_class = excluded.auto_class, apple_compat = excluded.apple_compat, job_type = excluded.job_type, notes = excluded.notes `) .run( itemId, analysis.is_noop ? 1 : 0, analysis.auto_class, analysis.apple_compat, analysis.job_type, analysis.notes.length > 0 ? analysis.notes.join("\n") : null, ); const plan = db.prepare("SELECT id FROM review_plans WHERE item_id = ?").get(itemId) as { id: number }; // Preserve existing custom_title/custom_language: prefer by stream_id // (streams unchanged); fall back to titleKey match (streams regenerated // after rescan — only applies to titles since custom_language didn't // exist at the time of the original title snapshot API). const streamById = new Map(streams.map((s) => [s.id, s] as const)); db.prepare("DELETE FROM stream_decisions WHERE plan_id = ?").run(plan.id); const insertDecision = db.prepare( "INSERT INTO stream_decisions (plan_id, stream_id, action, target_index, custom_title, custom_language, transcode_codec) VALUES (?, ?, ?, ?, ?, ?, ?)", ); for (const dec of analysis.decisions) { let customTitle = titleByStreamId.get(dec.stream_id) ?? null; if (!customTitle && preservedTitles) { const s = streamById.get(dec.stream_id); if (s) customTitle = preservedTitles.get(titleKey(s)) ?? null; } const customLanguage = languageByStreamId.get(dec.stream_id) ?? null; insertDecision.run( plan.id, dec.stream_id, dec.action, dec.target_index, customTitle, customLanguage, dec.transcode_codec, ); } } /** * After the user toggles a stream action, re-run assignTargetOrder and * recompute is_noop without wiping user-chosen actions or custom_titles. */ function recomputePlanAfterToggle(db: ReturnType, itemId: number): void { const item = db.prepare("SELECT * FROM media_items WHERE id = ?").get(itemId) as MediaItem | undefined; if (!item) return; const streams = db .prepare("SELECT * FROM media_streams WHERE item_id = ? ORDER BY stream_index") .all(itemId) as MediaStream[]; const plan = db.prepare("SELECT id FROM review_plans WHERE item_id = ?").get(itemId) as { id: number } | undefined; if (!plan) return; const decisions = db .prepare( "SELECT stream_id, action, target_index, custom_language, transcode_codec FROM stream_decisions WHERE plan_id = ?", ) .all(plan.id) as { stream_id: number; action: "keep" | "remove"; target_index: number | null; custom_language: string | null; transcode_codec: string | null; }[]; const origLang = item.original_language ? normalizeLanguage(item.original_language) : null; const audioLanguages = getAudioLanguages(); // Per-stream language overrides drive track ordering (OG first, then // configured keep-languages) so a "und → spa" rename reorders the output // correctly on the next pass. const languageOverrides = new Map(); for (const d of decisions) { if (d.custom_language) languageOverrides.set(d.stream_id, d.custom_language); } // Re-assign target_index based on current actions const decWithIdx = decisions.map((d) => ({ stream_id: d.stream_id, action: d.action, target_index: null as number | null, transcode_codec: d.transcode_codec, })); assignTargetOrder(streams, decWithIdx, origLang, audioLanguages, languageOverrides); const updateIdx = db.prepare("UPDATE stream_decisions SET target_index = ? WHERE plan_id = ? AND stream_id = ?"); for (const d of decWithIdx) updateIdx.run(d.target_index, plan.id, d.stream_id); // Recompute is_noop: audio removed OR reordered OR subs exist OR transcode needed const anyAudioRemoved = streams.some( (s) => s.type === "Audio" && decWithIdx.find((d) => d.stream_id === s.id)?.action === "remove", ); const hasSubs = streams.some((s) => s.type === "Subtitle"); const needsTranscode = decWithIdx.some((d) => d.transcode_codec != null && d.action === "keep"); const keptAudio = streams .filter((s) => s.type === "Audio" && decWithIdx.find((d) => d.stream_id === s.id)?.action === "keep") .sort((a, b) => a.stream_index - b.stream_index); let audioOrderChanged = false; for (let i = 0; i < keptAudio.length; i++) { const dec = decWithIdx.find((d) => d.stream_id === keptAudio[i].id); if (dec?.target_index !== i) { audioOrderChanged = true; break; } } const isNoop = !anyAudioRemoved && !audioOrderChanged && !hasSubs && !needsTranscode; db.prepare("UPDATE review_plans SET is_noop = ? WHERE id = ?").run(isNoop ? 1 : 0, plan.id); } // ─── Pipeline: summary ─────────────────────────────────────────────────────── interface PipelineAudioStream { id: number; language: string | null; codec: string | null; channels: number | null; title: string | null; is_default: number; action: "keep" | "remove"; } type EnrichableRow = { id?: number; plan_id?: number; item_id: number } & { transcode_reasons?: string[]; audio_streams?: PipelineAudioStream[]; }; /** * Enrich review/queued rows with transcode-reason badges and pre-checked audio * streams. Works for both the Review column (where `id` is the plan id) and * the Queued column (where `plan_id` is explicit and `id` is the job id). */ function enrichWithStreamsAndReasons(db: ReturnType, rows: EnrichableRow[]): void { if (rows.length === 0) return; const planIdFor = (r: EnrichableRow): number => (r.plan_id ?? r.id) as number; const planIds = rows.map(planIdFor); const itemIds = rows.map((r) => r.item_id); const reasonPh = planIds.map(() => "?").join(","); const allReasons = db .prepare(` SELECT DISTINCT sd.plan_id, ms.codec, sd.transcode_codec FROM stream_decisions sd JOIN media_streams ms ON ms.id = sd.stream_id WHERE sd.plan_id IN (${reasonPh}) AND sd.transcode_codec IS NOT NULL `) .all(...planIds) as { plan_id: number; codec: string | null; transcode_codec: string }[]; const reasonsByPlan = new Map(); for (const r of allReasons) { if (!reasonsByPlan.has(r.plan_id)) reasonsByPlan.set(r.plan_id, []); reasonsByPlan.get(r.plan_id)!.push(`${(r.codec ?? "").toUpperCase()} → ${r.transcode_codec.toUpperCase()}`); } const streamPh = itemIds.map(() => "?").join(","); const streamRows = db .prepare(` SELECT ms.id, ms.item_id, ms.language, ms.codec, ms.channels, ms.title, ms.is_default, sd.action FROM media_streams ms JOIN review_plans rp ON rp.item_id = ms.item_id LEFT JOIN stream_decisions sd ON sd.plan_id = rp.id AND sd.stream_id = ms.id WHERE ms.item_id IN (${streamPh}) AND ms.type = 'Audio' ORDER BY ms.item_id, ms.stream_index `) .all(...itemIds) as { id: number; item_id: number; language: string | null; codec: string | null; channels: number | null; title: string | null; is_default: number; action: "keep" | "remove" | null; }[]; const streamsByItem = new Map(); for (const r of streamRows) { if (!streamsByItem.has(r.item_id)) streamsByItem.set(r.item_id, []); streamsByItem.get(r.item_id)!.push({ id: r.id, language: r.language, codec: r.codec, channels: r.channels, title: r.title, is_default: r.is_default, action: r.action ?? "keep", }); } for (const r of rows) { r.transcode_reasons = reasonsByPlan.get(planIdFor(r)) ?? []; r.audio_streams = streamsByItem.get(r.item_id) ?? []; } } // ─── Review groups (paginated, always returns complete series) ────────────── interface ReviewItemRow { id: number; item_id: number; status: string; is_noop: number; auto_class: "auto" | "auto_heuristic" | "manual" | null; apple_compat: ReviewPlan["apple_compat"]; job_type: "copy" | "transcode"; name: string; series_name: string | null; series_key: string | null; season_number: number | null; episode_number: number | null; type: "Movie" | "Episode"; container: string | null; original_language: string | null; orig_lang_source: string | null; file_path: string; transcode_reasons?: string[]; audio_streams?: PipelineAudioStream[]; } type ReviewGroup = | { kind: "movie"; item: ReviewItemRow } | { kind: "series"; seriesKey: string; seriesName: string; episodeCount: number; readyCount: number; originalLanguage: string | null; seasons: Array<{ season: number | null; episodes: ReviewItemRow[] }>; }; export type InboxSort = "scan_asc" | "scan_desc" | "name_asc" | "name_desc"; export interface BuildReviewGroupsOpts { bucket: "inbox" | "review"; sort?: InboxSort; } function orderClause(bucket: "inbox" | "review", sort?: InboxSort): string { if (bucket === "review") { return `CASE rp.auto_class WHEN 'auto_heuristic' THEN 0 WHEN 'manual' THEN 1 ELSE 2 END, COALESCE(mi.series_name, mi.name), mi.season_number, mi.episode_number`; } if (sort === "scan_desc") return "mi.last_scanned_at DESC, mi.id DESC"; if (sort === "name_asc") return "COALESCE(mi.series_name, mi.name) ASC, mi.season_number, mi.episode_number"; if (sort === "name_desc") return "COALESCE(mi.series_name, mi.name) DESC, mi.season_number, mi.episode_number"; return "mi.last_scanned_at ASC, mi.id ASC"; } export function buildReviewGroups( db: ReturnType, opts: BuildReviewGroupsOpts, ): { groups: ReviewGroup[]; totalItems: number } { const sortedFilter = opts.bucket === "inbox" ? "rp.sorted = 0" : "rp.sorted = 1"; const order = orderClause(opts.bucket, opts.sort); const rows = db .prepare(` SELECT rp.*, mi.name, mi.series_name, mi.series_key, mi.season_number, mi.episode_number, mi.type, mi.container, mi.original_language, mi.orig_lang_source, mi.file_path, mi.last_scanned_at FROM review_plans rp JOIN media_items mi ON mi.id = rp.item_id WHERE rp.status = 'pending' AND rp.is_noop = 0 AND ${sortedFilter} ORDER BY ${order} `) .all() as ReviewItemRow[]; const movieGroups: ReviewGroup[] = []; interface SeriesAccum { seriesName: string; seasons: Map; originalLanguage: string | null; readyCount: number; } const seriesMap = new Map(); for (const row of rows) { if (row.type === "Movie") { movieGroups.push({ kind: "movie", item: row }); continue; } const key = row.series_key ?? row.series_name ?? String(row.item_id); let entry = seriesMap.get(key); if (!entry) { entry = { seriesName: row.series_name ?? "", seasons: new Map(), originalLanguage: row.original_language, readyCount: 0, }; seriesMap.set(key, entry); } let bucket = entry.seasons.get(row.season_number); if (!bucket) { bucket = []; entry.seasons.set(row.season_number, bucket); } bucket.push(row); if (row.auto_class === "auto_heuristic") entry.readyCount += 1; } const seriesGroups: ReviewGroup[] = []; for (const [seriesKey, entry] of seriesMap) { const seasonKeys = [...entry.seasons.keys()].sort((a, b) => { if (a === null) return 1; if (b === null) return -1; return a - b; }); const seasons = seasonKeys.map((season) => ({ season, episodes: (entry.seasons.get(season) ?? []).sort((a, b) => (a.episode_number ?? 0) - (b.episode_number ?? 0)), })); const episodeCount = seasons.reduce((sum, s) => sum + s.episodes.length, 0); seriesGroups.push({ kind: "series", seriesKey, seriesName: entry.seriesName, episodeCount, readyCount: entry.readyCount, originalLanguage: entry.originalLanguage, seasons, }); } // For inbox, preserve the SQL order (scan time or name). For review, // rank by auto_class so auto-approvable items float to the top. let allGroups: ReviewGroup[]; if (opts.bucket === "inbox") { // Interleave movies and series in the order their first row appeared // in the SQL result set so scan-time ordering stays intact. const groupOrder: ReviewGroup[] = []; const seen = new Set(); for (const row of rows) { if (row.type === "Movie") { const key = `movie:${row.item_id}`; if (!seen.has(key)) { seen.add(key); const mg = movieGroups.find((g) => g.kind === "movie" && g.item.item_id === row.item_id); if (mg) groupOrder.push(mg); } } else { const sk = row.series_key ?? row.series_name ?? String(row.item_id); const key = `series:${sk}`; if (!seen.has(key)) { seen.add(key); const sg = seriesGroups.find((g) => g.kind === "series" && g.seriesKey === sk); if (sg) groupOrder.push(sg); } } } allGroups = groupOrder; } else { allGroups = [...movieGroups, ...seriesGroups].sort((a, b) => { const rankA = a.kind === "movie" ? autoClassRank(a.item.auto_class) : a.readyCount > 0 ? 0 : 1; const rankB = b.kind === "movie" ? autoClassRank(b.item.auto_class) : b.readyCount > 0 ? 0 : 1; if (rankA !== rankB) return rankA - rankB; const nameA = a.kind === "movie" ? a.item.name : a.seriesName; const nameB = b.kind === "movie" ? b.item.name : b.seriesName; return nameA.localeCompare(nameB); }); } const totalItems = movieGroups.length + seriesGroups.reduce((sum, g) => sum + (g.kind === "series" ? g.episodeCount : 0), 0); return { groups: allGroups, totalItems }; } function autoClassRank(cls: string | null): number { if (cls === "auto_heuristic") return 0; if (cls === "manual") return 1; return 2; } app.get("/groups", (c) => { const db = getDb(); const offset = Math.max(0, Number.parseInt(c.req.query("offset") ?? "0", 10) || 0); const limit = Math.max(1, Math.min(200, Number.parseInt(c.req.query("limit") ?? "25", 10) || 25)); const bucketParam = c.req.query("bucket") ?? "review"; const bucket = bucketParam === "inbox" ? "inbox" : "review"; const sortParam = c.req.query("sort") as InboxSort | undefined; const sort = bucket === "inbox" ? (sortParam ?? "scan_asc") : undefined; const { groups, totalItems } = buildReviewGroups(db, { bucket, sort }); const page = groups.slice(offset, offset + limit); const flat: EnrichableRow[] = []; for (const g of page) { if (g.kind === "movie") flat.push(g.item as EnrichableRow); else for (const s of g.seasons) for (const ep of s.episodes) flat.push(ep as EnrichableRow); } enrichWithStreamsAndReasons(db, flat); return c.json({ groups: page, totalGroups: groups.length, totalItems, hasMore: offset + limit < groups.length, }); }); app.get("/pipeline", (c) => { const db = getDb(); const inboxTotal = ( db .prepare("SELECT COUNT(*) as n FROM review_plans WHERE status = 'pending' AND is_noop = 0 AND sorted = 0") .get() as { n: number } ).n; const reviewItemsTotal = ( db .prepare("SELECT COUNT(*) as n FROM review_plans WHERE status = 'pending' AND is_noop = 0 AND sorted = 1") .get() as { n: number } ).n; const reviewReadyCount = ( db .prepare( "SELECT COUNT(*) as n FROM review_plans WHERE status = 'pending' AND is_noop = 0 AND sorted = 1 AND auto_class = 'auto_heuristic'", ) .get() as { n: number } ).n; const reviewManualCount = reviewItemsTotal - reviewReadyCount; const autoProcessing = getConfig("auto_processing") === "1"; const autoProcessQueue = getConfig("auto_process_queue") === "1"; // Queued carries stream + transcode-reason enrichment so the card renders // read-only with a "Back to review" button. const queued = db .prepare(` SELECT j.id, j.item_id, j.status, j.started_at, j.completed_at, mi.name, mi.series_name, mi.series_key, mi.season_number, mi.episode_number, mi.type, mi.container, mi.original_language, mi.orig_lang_source, mi.file_path, rp.id as plan_id, rp.job_type, rp.apple_compat, rp.auto_class, rp.is_noop FROM jobs j JOIN media_items mi ON mi.id = j.item_id JOIN review_plans rp ON rp.item_id = j.item_id WHERE j.status = 'pending' ORDER BY j.created_at `) .all(); const processing = db .prepare(` SELECT j.*, mi.name, mi.series_name, mi.type, rp.job_type, rp.apple_compat FROM jobs j JOIN media_items mi ON mi.id = j.item_id JOIN review_plans rp ON rp.item_id = j.item_id WHERE j.status = 'running' `) .all(); const done = db .prepare(` SELECT j.*, mi.name, mi.series_name, mi.type, rp.job_type, rp.apple_compat FROM jobs j JOIN media_items mi ON mi.id = j.item_id JOIN review_plans rp ON rp.item_id = j.item_id WHERE j.status IN ('done', 'error') ORDER BY j.completed_at DESC `) .all(); // "Done" = files already in the desired end state. Either the analyzer // says nothing to do (is_noop=1) or a job finished. Use two indexable // counts and add — the OR form (is_noop=1 OR status='done') can't use // our single-column indexes and gets slow on large libraries. const noopRow = db.prepare("SELECT COUNT(*) as n FROM review_plans WHERE is_noop = 1").get() as { n: number }; const doneRow = db.prepare("SELECT COUNT(*) as n FROM review_plans WHERE status = 'done' AND is_noop = 0").get() as { n: number; }; const doneCount = noopRow.n + doneRow.n; enrichWithStreamsAndReasons(db, queued as EnrichableRow[]); return c.json({ inboxTotal, reviewItemsTotal, reviewReadyCount, reviewManualCount, autoProcessing, autoProcessQueue, queued, processing, done, doneCount, }); }); // ─── List ───────────────────────────────────────────────────────────────────── app.get("/", (c) => { const db = getDb(); const filter = c.req.query("filter") ?? "all"; const where = buildWhereClause(filter); const movieRows = db .prepare(` SELECT mi.*, rp.id as plan_id, rp.status as plan_status, rp.is_noop, rp.notes as plan_notes, rp.reviewed_at, rp.created_at as plan_created_at, COUNT(CASE WHEN sd.action = 'remove' THEN 1 END) as remove_count, COUNT(CASE WHEN sd.action = 'keep' THEN 1 END) as keep_count FROM media_items mi LEFT JOIN review_plans rp ON rp.item_id = mi.id LEFT JOIN stream_decisions sd ON sd.plan_id = rp.id WHERE mi.type = 'Movie' AND ${where} GROUP BY mi.id ORDER BY mi.name LIMIT 500 `) .all() as RawRow[]; const movies = movieRows.map((r) => ({ item: r as unknown as MediaItem, plan: rowToPlan(r), removeCount: r.remove_count, keepCount: r.keep_count, })); const series = db .prepare(` SELECT COALESCE(mi.series_key, mi.series_name) as series_key, mi.series_name, MAX(mi.original_language) as original_language, COUNT(DISTINCT mi.season_number) as season_count, COUNT(mi.id) as episode_count, SUM(CASE WHEN rp.is_noop = 1 THEN 1 ELSE 0 END) as noop_count, SUM(CASE WHEN rp.status = 'pending' AND rp.is_noop = 0 THEN 1 ELSE 0 END) as needs_action_count, SUM(CASE WHEN rp.status = 'approved' THEN 1 ELSE 0 END) as approved_count, SUM(CASE WHEN rp.status = 'skipped' THEN 1 ELSE 0 END) as skipped_count, SUM(CASE WHEN rp.status = 'done' THEN 1 ELSE 0 END) as done_count, SUM(CASE WHEN rp.status = 'error' THEN 1 ELSE 0 END) as error_count, SUM(CASE WHEN mi.needs_review = 1 AND mi.original_language IS NULL THEN 1 ELSE 0 END) as manual_count FROM media_items mi LEFT JOIN review_plans rp ON rp.item_id = mi.id WHERE mi.type = 'Episode' AND ${where} GROUP BY series_key ORDER BY mi.series_name `) .all(); const totalCounts = countsByFilter(db); return c.json({ movies, series, filter, totalCounts }); }); // ─── Series episodes ────────────────────────────────────────────────────────── app.get("/series/:seriesKey/episodes", (c) => { const db = getDb(); const seriesKey = decodeURIComponent(c.req.param("seriesKey")); const rows = db .prepare(` SELECT mi.*, rp.id as plan_id, rp.status as plan_status, rp.is_noop, rp.notes as plan_notes, rp.reviewed_at, rp.created_at as plan_created_at, COUNT(CASE WHEN sd.action = 'remove' THEN 1 END) as remove_count, 0 as keep_count FROM media_items mi LEFT JOIN review_plans rp ON rp.item_id = mi.id LEFT JOIN stream_decisions sd ON sd.plan_id = rp.id WHERE mi.type = 'Episode' AND (mi.series_key = ? OR (mi.series_key IS NULL AND mi.series_name = ?)) GROUP BY mi.id ORDER BY mi.season_number, mi.episode_number `) .all(seriesKey, seriesKey) as RawRow[]; const seasonMap = new Map(); for (const r of rows) { const season = (r as unknown as { season_number: number | null }).season_number ?? null; if (!seasonMap.has(season)) seasonMap.set(season, []); seasonMap.get(season)!.push({ item: r as unknown as MediaItem, plan: rowToPlan(r), removeCount: r.remove_count }); } const seasons = Array.from(seasonMap.entries()) .sort(([a], [b]) => (a ?? -1) - (b ?? -1)) .map(([season, episodes]) => ({ season, episodes, noopCount: (episodes as { plan: ReviewPlan | null }[]).filter((e) => e.plan?.is_noop).length, actionCount: (episodes as { plan: ReviewPlan | null }[]).filter( (e) => e.plan?.status === "pending" && !e.plan.is_noop, ).length, approvedCount: (episodes as { plan: ReviewPlan | null }[]).filter((e) => e.plan?.status === "approved").length, doneCount: (episodes as { plan: ReviewPlan | null }[]).filter((e) => e.plan?.status === "done").length, })); return c.json({ seasons }); }); // ─── Approve series ─────────────────────────────────────────────────────────── app.post("/series/:seriesKey/approve-all", (c) => { const db = getDb(); const seriesKey = decodeURIComponent(c.req.param("seriesKey")); const pending = db .prepare(` SELECT rp.*, mi.id as item_id FROM review_plans rp JOIN media_items mi ON mi.id = rp.item_id WHERE mi.type = 'Episode' AND (mi.series_key = ? OR (mi.series_key IS NULL AND mi.series_name = ?)) AND rp.status = 'pending' AND rp.is_noop = 0 `) .all(seriesKey, seriesKey) as (ReviewPlan & { item_id: number })[]; for (const plan of pending) { db.prepare("UPDATE review_plans SET status = 'approved', reviewed_at = datetime('now') WHERE id = ?").run(plan.id); const { item, streams, decisions } = loadItemDetail(db, plan.item_id); if (item) enqueueAudioJob(db, plan.item_id, buildCommand(item, streams, decisions)); } return c.json({ ok: true, count: pending.length }); }); // ─── Approve season ─────────────────────────────────────────────────────────── app.post("/season/:seriesKey/:season/approve-all", (c) => { const db = getDb(); const seriesKey = decodeURIComponent(c.req.param("seriesKey")); const season = Number.parseInt(c.req.param("season") ?? "", 10); if (!Number.isFinite(season)) return c.json({ error: "invalid season" }, 400); const pending = db .prepare(` SELECT rp.*, mi.id as item_id FROM review_plans rp JOIN media_items mi ON mi.id = rp.item_id WHERE mi.type = 'Episode' AND (mi.series_key = ? OR (mi.series_key IS NULL AND mi.series_name = ?)) AND mi.season_number = ? AND rp.status = 'pending' AND rp.is_noop = 0 `) .all(seriesKey, seriesKey, season) as (ReviewPlan & { item_id: number })[]; for (const plan of pending) { db.prepare("UPDATE review_plans SET status = 'approved', reviewed_at = datetime('now') WHERE id = ?").run(plan.id); const { item, streams, decisions } = loadItemDetail(db, plan.item_id); if (item) enqueueAudioJob(db, plan.item_id, buildCommand(item, streams, decisions)); } return c.json({ ok: true, count: pending.length }); }); // ─── Approve all ────────────────────────────────────────────────────────────── app.post("/approve-all", (c) => { const db = getDb(); const pending = db .prepare( "SELECT rp.*, mi.id as item_id FROM review_plans rp JOIN media_items mi ON mi.id = rp.item_id WHERE rp.status = 'pending' AND rp.is_noop = 0", ) .all() as (ReviewPlan & { item_id: number })[]; for (const plan of pending) { db.prepare("UPDATE review_plans SET status = 'approved', reviewed_at = datetime('now') WHERE id = ?").run(plan.id); const { item, streams, decisions } = loadItemDetail(db, plan.item_id); if (item) enqueueAudioJob(db, plan.item_id, buildCommand(item, streams, decisions)); } return c.json({ ok: true, count: pending.length }); }); // ─── Batch approve (by item id list) ───────────────────────────────────────── // Used by the "approve up to here" affordance in the review column. The // client knows the visible order (movies + series sort-key) and passes in // the prefix of item ids it wants approved in one round-trip. Items that // aren't pending (already approved / skipped / done) are silently ignored // so the endpoint is idempotent against stale client state. app.post("/approve-batch", async (c) => { const db = getDb(); const body = await c.req.json<{ itemIds?: unknown }>().catch(() => ({ itemIds: undefined })); if ( !Array.isArray(body.itemIds) || !body.itemIds.every((v) => typeof v === "number" && Number.isInteger(v) && v > 0) ) { return c.json({ ok: false, error: "itemIds must be an array of positive integers" }, 400); } const ids = body.itemIds as number[]; if (ids.length === 0) return c.json({ ok: true, count: 0 }); const placeholders = ids.map(() => "?").join(","); const pending = db .prepare( `SELECT rp.*, mi.id as item_id FROM review_plans rp JOIN media_items mi ON mi.id = rp.item_id WHERE rp.status = 'pending' AND rp.is_noop = 0 AND mi.id IN (${placeholders})`, ) .all(...ids) as (ReviewPlan & { item_id: number })[]; let count = 0; for (const plan of pending) { db.prepare("UPDATE review_plans SET status = 'approved', reviewed_at = datetime('now') WHERE id = ?").run(plan.id); const { item, streams, decisions } = loadItemDetail(db, plan.item_id); if (item) { enqueueAudioJob(db, plan.item_id, buildCommand(item, streams, decisions)); count++; } } return c.json({ ok: true, count }); }); // ─── Process inbox ────────────────────────────────────────────────────────── // Distributor: walks every unsorted plan, resolves language, reanalyzes, and // distributes to Queue/Review. Runs in the background so the frontend can // update progressively via SSE events. let processInboxAbort: AbortController | null = null; app.post("/process-inbox", async (c) => { if (processInboxAbort) { return c.json({ ok: false, error: "processing already running" }, 409); } processInboxAbort = new AbortController(); const { signal } = processInboxAbort; // Fire and forget — the frontend tracks progress via SSE events. const db = getDb(); processInbox(db, getAudioLanguages(), undefined, { onStart: emitInboxSortStart, onProgress: emitInboxSortProgress, signal, }) .then((result) => emitInboxSorted(result)) .catch(() => emitInboxSorted({ moved_to_queue: 0, moved_to_review: 0 })) .finally(() => { processInboxAbort = null; }); return c.json({ ok: true }); }); app.post("/process-inbox/stop", (c) => { if (processInboxAbort) { processInboxAbort.abort(); return c.json({ ok: true }); } return c.json({ ok: true, message: "not running" }); }); // ─── Approve all ready ─────────────────────────────────────────────────────── // Bulk-approves every auto_heuristic-classified plan currently in Review. app.post("/approve-ready", (c) => { const db = getDb(); const count = approveReady(db); return c.json({ ok: true, count }); }); // ─── Send to inbox (shared logic) ──────────────────────────────────────────── // Resets a plan to inbox state: pending, unsorted. Deletes any non-running job. // Used by all "← Back to inbox" actions regardless of which column they come from. export function sendToInbox(db: ReturnType, itemId: number): void { db .prepare( "UPDATE review_plans SET status = 'pending', reviewed_at = NULL, sorted = 0, auto_class = NULL WHERE item_id = ? AND status != 'running'", ) .run(itemId); db.prepare("DELETE FROM jobs WHERE item_id = ? AND status IN ('pending', 'done', 'error')").run(itemId); } /** Send all items in Review back to inbox. */ export function unsortAll(db: ReturnType): number { const rows = db .prepare("SELECT item_id FROM review_plans WHERE status = 'pending' AND is_noop = 0 AND sorted = 1") .all() as { item_id: number }[]; for (const { item_id } of rows) sendToInbox(db, item_id); return rows.length; } app.post("/unsort-all", (c) => { const count = unsortAll(getDb()); return c.json({ ok: true, count }); }); /** Send all done/errored items back to inbox. */ export function reopenAllDone(db: ReturnType): number { const rows = db.prepare("SELECT item_id FROM review_plans WHERE status IN ('done', 'error')").all() as { item_id: number; }[]; for (const { item_id } of rows) sendToInbox(db, item_id); return rows.length; } app.post("/reopen-all", (c) => { const count = reopenAllDone(getDb()); return c.json({ ok: true, count }); }); // ─── Detail ─────────────────────────────────────────────────────────────────── app.get("/:id", (c) => { const db = getDb(); const id = parseId(c.req.param("id")); if (id == null) return c.json({ error: "invalid id" }, 400); const detail = loadItemDetail(db, id); if (!detail.item) return c.notFound(); return c.json(detail); }); // ─── Override language ──────────────────────────────────────────────────────── app.patch("/:id/language", async (c) => { const db = getDb(); const id = parseId(c.req.param("id")); if (id == null) return c.json({ error: "invalid id" }, 400); const body = await c.req.json<{ language: string | null }>(); const lang = body.language || null; db .prepare("UPDATE media_items SET original_language = ?, orig_lang_source = 'manual', needs_review = 0 WHERE id = ?") .run(lang ? normalizeLanguage(lang) : null, id); reanalyze(db, id, getAudioLanguages()); const detail = loadItemDetail(db, id); if (!detail.item) return c.notFound(); return c.json(detail); }); // ─── Edit stream title ──────────────────────────────────────────────────────── app.patch("/:id/stream/:streamId/title", async (c) => { const db = getDb(); const itemId = parseId(c.req.param("id")); const streamId = parseId(c.req.param("streamId")); if (itemId == null || streamId == null) return c.json({ error: "invalid id" }, 400); const body = await c.req.json<{ title: string }>(); const title = (body.title ?? "").trim() || null; const plan = db.prepare("SELECT id FROM review_plans WHERE item_id = ?").get(itemId) as { id: number } | undefined; if (!plan) return c.notFound(); db .prepare("UPDATE stream_decisions SET custom_title = ? WHERE plan_id = ? AND stream_id = ?") .run(title, plan.id, streamId); const detail = loadItemDetail(db, itemId); if (!detail.item) return c.notFound(); return c.json(detail); }); // ─── Override stream language ──────────────────────────────────────────────── // Per-stream language override. Used to correct an "und" or mislabeled audio // track without going through Jellyfin. Pass `language: null` to clear the // override. The value is normalized (e.g. "es"/"spa"/"spanish" → "spa") before // storage; invalid codes are rejected. Reanalysis runs after the update so // keep/remove decisions, track ordering, and is_noop reflect the new language // immediately. app.patch("/:id/stream/:streamId/language", async (c) => { const db = getDb(); const itemId = parseId(c.req.param("id")); const streamId = parseId(c.req.param("streamId")); if (itemId == null || streamId == null) return c.json({ error: "invalid id" }, 400); const body = await c.req.json<{ language: unknown }>().catch(() => ({ language: undefined })); let language: string | null; if (body.language === null || body.language === "") { language = null; } else if (typeof body.language === "string") { const normalized = normalizeLanguage(body.language); // Guard against typos and arbitrary strings — only accept codes the // app's lang dictionary knows about so downstream display + ffmpeg // metadata stays consistent. if (!(normalized in LANG_NAMES)) { return c.json({ error: `unknown language code: ${body.language}` }, 400); } language = normalized; } else { return c.json({ error: "language must be a string or null" }, 400); } // Only audio streams carry a meaningful language override; video/data // streams have no language semantics, and subtitle streams are always // removed from the container (managed separately from this app). const stream = db.prepare("SELECT type, item_id FROM media_streams WHERE id = ?").get(streamId) as | { type: string; item_id: number } | undefined; if (!stream || stream.item_id !== itemId) return c.json({ error: "stream not found on item" }, 404); if (stream.type !== "Audio") return c.json({ error: "language override only applies to audio streams" }, 400); const plan = db.prepare("SELECT id FROM review_plans WHERE item_id = ?").get(itemId) as { id: number } | undefined; if (!plan) return c.notFound(); db .prepare("UPDATE stream_decisions SET custom_language = ? WHERE plan_id = ? AND stream_id = ?") .run(language, plan.id, streamId); // Full reanalysis: a language change can flip the track's keep/remove // status (if the new language isn't in the keep list), shuffle target // indices (OG-match goes first), or flip is_noop. Cheaper and more // predictable than trying to patch each derived field in place. reanalyze(db, itemId, getAudioLanguages()); const detail = loadItemDetail(db, itemId); if (!detail.item) return c.notFound(); return c.json(detail); }); // ─── Toggle stream action ───────────────────────────────────────────────────── app.patch("/:id/stream/:streamId", async (c) => { const db = getDb(); const itemId = parseId(c.req.param("id")); const streamId = parseId(c.req.param("streamId")); if (itemId == null || streamId == null) return c.json({ error: "invalid id" }, 400); const body = await c.req.json<{ action: unknown }>().catch(() => ({ action: null })); if (!isOneOf(body.action, ["keep", "remove"] as const)) { return c.json({ error: 'action must be "keep" or "remove"' }, 400); } const action: "keep" | "remove" = body.action; // Only audio streams can be toggled — subtitles are always removed (extracted to sidecar) const stream = db.prepare("SELECT type, item_id FROM media_streams WHERE id = ?").get(streamId) as | { type: string; item_id: number } | undefined; if (!stream || stream.item_id !== itemId) return c.json({ error: "stream not found on item" }, 404); if (stream.type === "Subtitle") return c.json({ error: "Subtitle streams cannot be toggled" }, 400); const plan = db.prepare("SELECT id FROM review_plans WHERE item_id = ?").get(itemId) as { id: number } | undefined; if (!plan) return c.notFound(); db .prepare("UPDATE stream_decisions SET action = ? WHERE plan_id = ? AND stream_id = ?") .run(action, plan.id, streamId); recomputePlanAfterToggle(db, itemId); const detail = loadItemDetail(db, itemId); if (!detail.item) return c.notFound(); return c.json(detail); }); // ─── Approve ────────────────────────────────────────────────────────────────── app.post("/:id/approve", (c) => { const db = getDb(); const id = parseId(c.req.param("id")); if (id == null) return c.json({ error: "invalid id" }, 400); const plan = db.prepare("SELECT * FROM review_plans WHERE item_id = ?").get(id) as ReviewPlan | undefined; if (!plan) return c.notFound(); db.prepare("UPDATE review_plans SET status = 'approved', reviewed_at = datetime('now') WHERE id = ?").run(plan.id); if (!plan.is_noop) { const { item, streams, decisions } = loadItemDetail(db, id); if (item) enqueueAudioJob(db, id, buildCommand(item, streams, decisions)); } return c.json({ ok: true }); }); // ─── Unapprove ─────────────────────────────────────────────────────────────── // ─── Retry failed job ───────────────────────────────────────────────────────── app.post("/:id/retry", (c) => { const db = getDb(); const id = parseId(c.req.param("id")); if (id == null) return c.json({ error: "invalid id" }, 400); const plan = db.prepare("SELECT * FROM review_plans WHERE item_id = ?").get(id) as ReviewPlan | undefined; if (!plan) return c.notFound(); if (plan.status !== "error") return c.json({ ok: false, error: "Only failed plans can be retried" }, 409); // Clear old errored/done jobs for this item so the queue starts clean db.prepare("DELETE FROM jobs WHERE item_id = ? AND status IN ('error', 'done')").run(id); // Rebuild the command from the current decisions (streams may have been edited) const { item, command } = loadItemDetail(db, id); if (!item || !command) return c.json({ ok: false, error: "Cannot rebuild command" }, 400); enqueueAudioJob(db, id, command); db.prepare("UPDATE review_plans SET status = 'approved', reviewed_at = datetime('now') WHERE id = ?").run(plan.id); return c.json({ ok: true }); }); // Reopen a completed or errored plan: flip it back to pending and send it to // the Inbox (sorted=0) so the user can adjust settings and have Auto Review // redo the classification. Used by the Done column's hover "Back to inbox" // affordance. Unlike /unapprove (which rolls back an approved-but-not-yet- // running plan), this handles the post-job states and drops the lingering // job row so the pipeline doesn't show leftover history for an item that's // about to be re-sorted. app.post("/:id/reopen", (c) => { const db = getDb(); const id = parseId(c.req.param("id")); if (id == null) return c.json({ error: "invalid id" }, 400); const plan = db.prepare("SELECT * FROM review_plans WHERE item_id = ?").get(id) as ReviewPlan | undefined; if (!plan) return c.notFound(); sendToInbox(db, id); return c.json({ ok: true }); }); app.post("/:id/unapprove", (c) => { const db = getDb(); const id = parseId(c.req.param("id")); if (id == null) return c.json({ error: "invalid id" }, 400); const plan = db.prepare("SELECT * FROM review_plans WHERE item_id = ?").get(id) as ReviewPlan | undefined; if (!plan) return c.notFound(); // Only block if a job is actively running const job = db.prepare("SELECT status FROM jobs WHERE item_id = ? ORDER BY created_at DESC LIMIT 1").get(id) as | { status: string } | undefined; if (job?.status === "running") return c.json({ ok: false, error: "Job is running — cannot send back to inbox" }, 409); sendToInbox(db, id); return c.json({ ok: true }); }); // ─── Skip / Unskip ─────────────────────────────────────────────────────────── app.post("/skip-all", (c) => { const db = getDb(); const result = db .prepare( "UPDATE review_plans SET status = 'skipped', reviewed_at = datetime('now') WHERE status = 'pending' AND is_noop = 0", ) .run(); return c.json({ ok: true, skipped: result.changes }); }); app.post("/:id/skip", (c) => { const db = getDb(); const id = parseId(c.req.param("id")); if (id == null) return c.json({ error: "invalid id" }, 400); db.prepare("UPDATE review_plans SET status = 'skipped', reviewed_at = datetime('now') WHERE item_id = ?").run(id); return c.json({ ok: true }); }); app.post("/:id/unskip", (c) => { const db = getDb(); const id = parseId(c.req.param("id")); if (id == null) return c.json({ error: "invalid id" }, 400); db .prepare("UPDATE review_plans SET status = 'pending', reviewed_at = NULL WHERE item_id = ? AND status = 'skipped'") .run(id); return c.json({ ok: true }); }); // ─── Rescan ─────────────────────────────────────────────────────────────────── app.post("/:id/rescan", async (c) => { const db = getDb(); const id = parseId(c.req.param("id")); if (id == null) return c.json({ error: "invalid id" }, 400); const item = db.prepare("SELECT * FROM media_items WHERE id = ?").get(id) as MediaItem | undefined; if (!item) return c.notFound(); // Reset plan to inbox state so processInbox handles language resolution + analysis db .prepare( "UPDATE review_plans SET status = 'pending', sorted = 0, auto_class = NULL WHERE item_id = ? AND status != 'running'", ) .run(id); // Delete pending jobs db.prepare("DELETE FROM jobs WHERE item_id = ? AND status = 'pending'").run(id); // Auto-process if enabled (processInbox handles language resolution + reanalysis) if (getConfig("auto_processing") === "1") { await processInbox(db, getAudioLanguages()); } emitPipelineChanged(); return c.json({ ok: true, inInbox: true }); }); // ─── Rescan series / season ────────────────────────────────────────────────── app.post("/rescan-series", async (c) => { const body = await c.req.json<{ seriesKey: string; seasonNumber?: number }>(); if (!body.seriesKey) return c.json({ error: "seriesKey required" }, 400); const db = getDb(); // Get all item IDs for this series (+ optional season filter) let query = "SELECT id FROM media_items WHERE series_key = ?"; const params: (string | number)[] = [body.seriesKey]; if (body.seasonNumber != null) { query += " AND season_number = ?"; params.push(body.seasonNumber); } const items = db.prepare(query).all(...params) as { id: number }[]; if (items.length === 0) return c.json({ ok: true, count: 0 }); // Reset each item to inbox for (const item of items) { db .prepare( "UPDATE review_plans SET status = 'pending', sorted = 0, auto_class = NULL WHERE item_id = ? AND status != 'running'", ) .run(item.id); db.prepare("DELETE FROM jobs WHERE item_id = ? AND status = 'pending'").run(item.id); } // Auto-process if enabled if (getConfig("auto_processing") === "1") { await processInbox(db, getAudioLanguages()); } emitPipelineChanged(); return c.json({ ok: true, count: items.length }); }); // ─── Delete item (+ optional Radarr/Sonarr re-fetch) ───────────────────────── // Escape hatch for files that this app can't usefully fix — e.g. a release // whose only audio track is commentary. Deletes the local file and the item's // DB rows (cascades to streams, plans, decisions, jobs); when `refetch` is // set and the matching *arr is configured, also asks Radarr/Sonarr to rescan // (so they notice the deletion) and to search for a replacement release. The // response always carries the file + db outcome; refetch is best-effort and // its own status lives under `refetch` so the UI can surface partial // successes ("file gone, but Radarr didn't have this movie"). app.post("/:id/delete", async (c) => { const db = getDb(); const id = parseId(c.req.param("id")); if (id == null) return c.json({ error: "invalid id" }, 400); const body = await c.req.json<{ refetch?: unknown }>().catch(() => ({ refetch: false })); const refetch = body.refetch === true; const item = db.prepare("SELECT * FROM media_items WHERE id = ?").get(id) as MediaItem | undefined; if (!item) return c.notFound(); let fileDeleted = false; let fileError: string | null = null; try { unlinkSync(item.file_path); fileDeleted = true; } catch (err) { const msg = (err as NodeJS.ErrnoException).code === "ENOENT" ? "file already gone" : String(err); fileError = msg; logError(`delete item ${id}: unlink ${item.file_path} failed:`, msg); } // DB row first — cascade drops streams, decisions, plans, jobs. Kept // separate from the unlink so a disk-write failure doesn't leave us with // half a cleanup; the user can still re-delete the orphan file manually. db.prepare("DELETE FROM media_items WHERE id = ?").run(id); let refetchResult: { triggered: boolean; service: "radarr" | "sonarr" | null; error?: string } = { triggered: false, service: null, }; if (refetch) { if (item.type === "Movie") { const cfg = { url: getConfig("radarr_url") ?? "", apiKey: getConfig("radarr_api_key") ?? "" }; const result = await triggerMovieRefetch(cfg, { tmdbId: item.tmdb_id, imdbId: item.imdb_id }); refetchResult = { triggered: result.ok, service: "radarr", error: result.error }; } else if (item.type === "Episode") { const cfg = { url: getConfig("sonarr_url") ?? "", apiKey: getConfig("sonarr_api_key") ?? "" }; // Episodes look up via the *series* tvdbId — individual episode // rows don't carry their own tvdb id. The season/episode number // pins down which episode Sonarr should re-search. const tvdbId = item.tvdb_id; const result = await triggerEpisodeRefetch(cfg, { tvdbId, seasonNumber: item.season_number, episodeNumber: item.episode_number, }); refetchResult = { triggered: result.ok, service: "sonarr", error: result.error }; } } return c.json({ ok: true, file: { deleted: fileDeleted, path: item.file_path, error: fileError }, db: { deleted: true }, refetch: refetchResult, }); }); // ─── Pipeline: series language ─────────────────────────────────────────────── app.patch("/series/:seriesKey/language", async (c) => { const seriesKey = decodeURIComponent(c.req.param("seriesKey")); const { language } = await c.req.json<{ language: string }>(); const db = getDb(); const items = db .prepare("SELECT id FROM media_items WHERE series_key = ? OR (series_key IS NULL AND series_name = ?)") .all(seriesKey, seriesKey) as { id: number }[]; const normalizedLang = language ? normalizeLanguage(language) : null; for (const item of items) { db .prepare("UPDATE media_items SET original_language = ?, orig_lang_source = 'manual', needs_review = 0 WHERE id = ?") .run(normalizedLang, item.id); } // Re-analyze all episodes const audioLanguages = getAudioLanguages(); for (const item of items) { reanalyze(db, item.id, audioLanguages); } return c.json({ updated: items.length }); }); export default app;