All checks were successful
Build and Push Docker Image / build (push) Successful in 1m54s
- install ffmpeg in dockerfile (fixes exit code 127) - buildCommand() now audio-only remux, no subtitle extraction - add unapprove endpoint + ui button for approved items - add batch extract-all subtitles endpoint + ui button - audio detail page shows only video+audio streams - remove global movies_path/series_path config, add per-node path mapping - remove docker-in-docker command building (buildDockerCommand, buildDockerExtractOnlyCommand) - ssh execution translates /movies/ and /series/ to node-specific paths - remove media paths section from setup page - add unraid-template.xml Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
233 lines
12 KiB
TypeScript
233 lines
12 KiB
TypeScript
import { Hono } from 'hono';
|
|
import { stream } from 'hono/streaming';
|
|
import { getDb } from '../db/index';
|
|
import { execStream } from '../services/ssh';
|
|
import type { Job, Node, MediaItem, MediaStream } from '../types';
|
|
import { predictExtractedFiles } from '../services/ffmpeg';
|
|
import { accessSync, constants } from 'node:fs';
|
|
|
|
// Hono sub-router for the jobs API; mounted by the server entry point.
const app = new Hono();
|
// ─── SSE state ────────────────────────────────────────────────────────────────
|
|
|
|
const jobListeners = new Set<(data: string) => void>();
|
|
|
|
function emitJobUpdate(jobId: number, status: string, output?: string): void {
|
|
const line = `event: job_update\ndata: ${JSON.stringify({ id: jobId, status, output })}\n\n`;
|
|
for (const l of jobListeners) l(line);
|
|
}
|
|
|
|
function loadJobRow(jobId: number) {
|
|
const db = getDb();
|
|
const row = db.prepare(`
|
|
SELECT j.*, mi.id as mi_id, mi.name, mi.type, mi.series_name, mi.season_number,
|
|
mi.episode_number, mi.file_path,
|
|
n.id as n_id, n.name as node_name, n.host, n.port, n.username,
|
|
n.private_key, n.ffmpeg_path, n.work_dir, n.status as node_status
|
|
FROM jobs j
|
|
LEFT JOIN media_items mi ON mi.id = j.item_id
|
|
LEFT JOIN nodes n ON n.id = j.node_id
|
|
WHERE j.id = ?
|
|
`).get(jobId) as (Job & {
|
|
mi_id: number | null; name: string | null; type: string | null;
|
|
series_name: string | null; season_number: number | null; episode_number: number | null;
|
|
file_path: string | null; n_id: number | null; node_name: string | null;
|
|
host: string | null; port: number | null; username: string | null;
|
|
private_key: string | null; ffmpeg_path: string | null; work_dir: string | null; node_status: string | null;
|
|
}) | undefined;
|
|
|
|
if (!row) return null;
|
|
|
|
const nodes = db.prepare('SELECT * FROM nodes ORDER BY name').all() as Node[];
|
|
const item = row.name ? { id: row.item_id, name: row.name, type: row.type, series_name: row.series_name, season_number: row.season_number, episode_number: row.episode_number, file_path: row.file_path } as unknown as MediaItem : null;
|
|
const node = row.node_name ? { id: row.node_id!, name: row.node_name, host: row.host!, port: row.port!, username: row.username!, private_key: row.private_key!, ffmpeg_path: row.ffmpeg_path!, work_dir: row.work_dir!, status: row.node_status! } as unknown as Node : null;
|
|
|
|
return { job: row as unknown as Job, item, node, nodes };
|
|
}
|
|
|
|
// ─── List ─────────────────────────────────────────────────────────────────────
|
|
|
|
app.get('/', (c) => {
|
|
const db = getDb();
|
|
const jobRows = db.prepare(`
|
|
SELECT j.*, mi.name, mi.type, mi.series_name, mi.season_number, mi.episode_number, mi.file_path,
|
|
n.name as node_name, n.host, n.port, n.username, n.private_key, n.ffmpeg_path, n.work_dir, n.status as node_status
|
|
FROM jobs j
|
|
LEFT JOIN media_items mi ON mi.id = j.item_id
|
|
LEFT JOIN nodes n ON n.id = j.node_id
|
|
ORDER BY j.created_at DESC LIMIT 200
|
|
`).all() as (Job & { name: string; type: string; series_name: string | null; season_number: number | null; episode_number: number | null; file_path: string; node_name: string | null; host: string | null; port: number | null; username: string | null; private_key: string | null; ffmpeg_path: string | null; work_dir: string | null; node_status: string | null; })[];
|
|
|
|
const jobs = jobRows.map((r) => ({
|
|
job: r as unknown as Job,
|
|
item: r.name ? { id: r.item_id, name: r.name, type: r.type, series_name: r.series_name, season_number: r.season_number, episode_number: r.episode_number, file_path: r.file_path } as unknown as MediaItem : null,
|
|
node: r.node_name ? { id: r.node_id!, name: r.node_name, host: r.host!, port: r.port!, username: r.username!, private_key: r.private_key!, ffmpeg_path: r.ffmpeg_path!, work_dir: r.work_dir!, status: r.node_status! } as unknown as Node : null,
|
|
}));
|
|
|
|
const nodes = db.prepare('SELECT * FROM nodes ORDER BY name').all() as Node[];
|
|
return c.json({ jobs, nodes });
|
|
});
|
|
|
|
// ─── Start all pending ────────────────────────────────────────────────────────
|
|
|
|
app.post('/start', (c) => {
|
|
const db = getDb();
|
|
const pending = db.prepare("SELECT * FROM jobs WHERE status = 'pending' ORDER BY created_at").all() as Job[];
|
|
for (const job of pending) runJob(job).catch((err) => console.error(`Job ${job.id} failed:`, err));
|
|
return c.json({ ok: true, started: pending.length });
|
|
});
|
|
|
|
// ─── Assign node ──────────────────────────────────────────────────────────────
|
|
|
|
app.post('/job/:id/assign', async (c) => {
|
|
const db = getDb();
|
|
const jobId = Number(c.req.param('id'));
|
|
const body = await c.req.json<{ node_id: number | null }>();
|
|
db.prepare('UPDATE jobs SET node_id = ? WHERE id = ?').run(body.node_id, jobId);
|
|
const result = loadJobRow(jobId);
|
|
if (!result) return c.notFound();
|
|
return c.json(result);
|
|
});
|
|
|
|
// ─── Run single ───────────────────────────────────────────────────────────────
|
|
|
|
app.post('/job/:id/run', async (c) => {
|
|
const db = getDb();
|
|
const jobId = Number(c.req.param('id'));
|
|
const job = db.prepare('SELECT * FROM jobs WHERE id = ?').get(jobId) as Job | undefined;
|
|
if (!job || job.status !== 'pending') {
|
|
const result = loadJobRow(jobId);
|
|
if (!result) return c.notFound();
|
|
return c.json(result);
|
|
}
|
|
runJob(job).catch((err) => console.error(`Job ${job.id} failed:`, err));
|
|
const result = loadJobRow(jobId);
|
|
if (!result) return c.notFound();
|
|
return c.json(result);
|
|
});
|
|
|
|
// ─── Cancel ───────────────────────────────────────────────────────────────────
|
|
|
|
app.post('/job/:id/cancel', (c) => {
|
|
const db = getDb();
|
|
const jobId = Number(c.req.param('id'));
|
|
db.prepare("DELETE FROM jobs WHERE id = ? AND status = 'pending'").run(jobId);
|
|
return c.json({ ok: true });
|
|
});
|
|
|
|
// ─── SSE ──────────────────────────────────────────────────────────────────────
|
|
|
|
app.get('/events', (c) => {
|
|
return stream(c, async (s) => {
|
|
c.header('Content-Type', 'text/event-stream');
|
|
c.header('Cache-Control', 'no-cache');
|
|
|
|
const queue: string[] = [];
|
|
let resolve: (() => void) | null = null;
|
|
const listener = (data: string) => { queue.push(data); resolve?.(); };
|
|
|
|
jobListeners.add(listener);
|
|
s.onAbort(() => { jobListeners.delete(listener); });
|
|
|
|
try {
|
|
while (!s.closed) {
|
|
if (queue.length > 0) {
|
|
await s.write(queue.shift()!);
|
|
} else {
|
|
await new Promise<void>((res) => { resolve = res; setTimeout(res, 15_000); });
|
|
resolve = null;
|
|
if (queue.length === 0) await s.write(': keepalive\n\n');
|
|
}
|
|
}
|
|
} finally {
|
|
jobListeners.delete(listener);
|
|
}
|
|
});
|
|
});
|
|
|
|
// ─── Job execution ────────────────────────────────────────────────────────────
|
|
|
|
async function runJob(job: Job): Promise<void> {
|
|
const db = getDb();
|
|
|
|
if (!job.node_id) {
|
|
const itemRow = db.prepare('SELECT file_path FROM media_items WHERE id = ?').get(job.item_id) as { file_path: string } | undefined;
|
|
if (itemRow?.file_path) {
|
|
try { accessSync(itemRow.file_path, constants.R_OK | constants.W_OK); } catch (fsErr) {
|
|
const msg = `File not accessible: ${itemRow.file_path}\n${(fsErr as Error).message}`;
|
|
db.prepare("UPDATE jobs SET status = 'error', output = ?, exit_code = 1, completed_at = datetime('now') WHERE id = ?").run(msg, job.id);
|
|
emitJobUpdate(job.id, 'error', msg);
|
|
db.prepare("UPDATE review_plans SET status = 'error' WHERE item_id = ?").run(job.item_id);
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
db.prepare("UPDATE jobs SET status = 'running', started_at = datetime('now'), output = '' WHERE id = ?").run(job.id);
|
|
emitJobUpdate(job.id, 'running');
|
|
|
|
let outputLines: string[] = [];
|
|
const flush = (final = false) => {
|
|
const text = outputLines.join('\n');
|
|
if (final || outputLines.length % 10 === 0) db.prepare('UPDATE jobs SET output = ? WHERE id = ?').run(text, job.id);
|
|
emitJobUpdate(job.id, 'running', text);
|
|
};
|
|
|
|
try {
|
|
if (job.node_id) {
|
|
const node = db.prepare('SELECT * FROM nodes WHERE id = ?').get(job.node_id) as Node | undefined;
|
|
if (!node) throw new Error(`Node ${job.node_id} not found`);
|
|
// Translate container paths to node-specific mount paths
|
|
let cmd = job.command;
|
|
if (node.movies_path) cmd = cmd.replaceAll('/movies/', node.movies_path.replace(/\/$/, '') + '/');
|
|
if (node.series_path) cmd = cmd.replaceAll('/series/', node.series_path.replace(/\/$/, '') + '/');
|
|
for await (const line of execStream(node, cmd)) { outputLines.push(line); flush(); }
|
|
} else {
|
|
const proc = Bun.spawn(['sh', '-c', job.command], { stdout: 'pipe', stderr: 'pipe' });
|
|
const readStream = async (readable: ReadableStream<Uint8Array>, prefix = '') => {
|
|
const reader = readable.getReader();
|
|
const decoder = new TextDecoder();
|
|
try {
|
|
while (true) {
|
|
const { done, value } = await reader.read();
|
|
if (done) break;
|
|
const text = decoder.decode(value);
|
|
const lines = text.split('\n').filter((l) => l.trim());
|
|
for (const line of lines) outputLines.push(prefix + line);
|
|
flush();
|
|
}
|
|
} catch { /* ignore */ }
|
|
};
|
|
await Promise.all([readStream(proc.stdout), readStream(proc.stderr, '[stderr] '), proc.exited]);
|
|
const exitCode = await proc.exited;
|
|
if (exitCode !== 0) throw new Error(`FFmpeg exited with code ${exitCode}`);
|
|
}
|
|
|
|
const fullOutput = outputLines.join('\n');
|
|
db.prepare("UPDATE jobs SET status = 'done', exit_code = 0, output = ?, completed_at = datetime('now') WHERE id = ?").run(fullOutput, job.id);
|
|
emitJobUpdate(job.id, 'done', fullOutput);
|
|
db.prepare("UPDATE review_plans SET status = 'done' WHERE item_id = ?").run(job.item_id);
|
|
|
|
// Populate subtitle_files table with extracted sidecar files
|
|
try {
|
|
const item = db.prepare('SELECT * FROM media_items WHERE id = ?').get(job.item_id) as MediaItem | undefined;
|
|
const streams = db.prepare('SELECT * FROM media_streams WHERE item_id = ?').all(job.item_id) as MediaStream[];
|
|
if (item && streams.length > 0) {
|
|
const files = predictExtractedFiles(item, streams);
|
|
const insertFile = db.prepare('INSERT OR IGNORE INTO subtitle_files (item_id, file_path, language, codec, is_forced, is_hearing_impaired) VALUES (?, ?, ?, ?, ?, ?)');
|
|
for (const f of files) {
|
|
insertFile.run(job.item_id, f.file_path, f.language, f.codec, f.is_forced ? 1 : 0, f.is_hearing_impaired ? 1 : 0);
|
|
}
|
|
db.prepare('UPDATE review_plans SET subs_extracted = 1 WHERE item_id = ?').run(job.item_id);
|
|
}
|
|
} catch (subErr) { console.error('Failed to record extracted subtitle files:', subErr); }
|
|
} catch (err) {
|
|
const fullOutput = outputLines.join('\n') + '\n' + String(err);
|
|
db.prepare("UPDATE jobs SET status = 'error', exit_code = 1, output = ?, completed_at = datetime('now') WHERE id = ?").run(fullOutput, job.id);
|
|
emitJobUpdate(job.id, 'error', fullOutput);
|
|
db.prepare("UPDATE review_plans SET status = 'error' WHERE item_id = ?").run(job.item_id);
|
|
}
|
|
}
|
|
|
|
export default app;
|