All checks were successful
Build and Push Docker Image / build (push) Successful in 19s
"run all" now groups pending jobs by target (local or node), runs them one by one within each group, but runs different targets in parallel. single "run" button still fires immediately. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
294 lines
14 KiB
TypeScript
294 lines
14 KiB
TypeScript
import { Hono } from 'hono';
|
|
import { stream } from 'hono/streaming';
|
|
import { getDb, applyPathMappings, applyPathMappingsToCommand } from '../db/index';
|
|
import { execStream } from '../services/ssh';
|
|
import type { Job, Node, MediaItem, MediaStream } from '../types';
|
|
import { predictExtractedFiles } from '../services/ffmpeg';
|
|
import { accessSync, constants } from 'node:fs';
|
|
import { log, error as logError } from '../lib/log';
|
|
|
|
// Hono sub-app carrying every job-queue route; mounted by the main server.
const app = new Hono();
|
|
|
|
// ─── Sequential queue per target ─────────────────────────────────────────────
|
|
|
|
// Keys of targets ('local' or 'node-<id>') that currently have a sequential
// queue draining; used to prevent two concurrent drains of the same target.
const runningTargets = new Set<string>();
|
|
|
|
function targetKey(nodeId: number | null): string {
|
|
return nodeId ? `node-${nodeId}` : 'local';
|
|
}
|
|
|
|
/** Run a list of jobs sequentially on the same target. */
|
|
async function runSequential(jobs: Job[]): Promise<void> {
|
|
const key = targetKey(jobs[0]?.node_id ?? null);
|
|
if (runningTargets.has(key)) return; // already processing this target
|
|
runningTargets.add(key);
|
|
try {
|
|
for (const job of jobs) {
|
|
// Re-check status — job may have been cancelled while queued
|
|
const db = getDb();
|
|
const fresh = db.prepare('SELECT status FROM jobs WHERE id = ?').get(job.id) as { status: string } | undefined;
|
|
if (!fresh || fresh.status !== 'pending') continue;
|
|
try { await runJob(job); } catch (err) { logError(`Job ${job.id} failed:`, err); }
|
|
}
|
|
} finally {
|
|
runningTargets.delete(key);
|
|
}
|
|
}
|
|
|
|
// ─── SSE state ────────────────────────────────────────────────────────────────
|
|
|
|
// One callback per connected SSE client; each receives raw, pre-formatted
// `event: job_update` frames from emitJobUpdate.
const jobListeners = new Set<(data: string) => void>();
|
|
|
|
function emitJobUpdate(jobId: number, status: string, output?: string): void {
|
|
const line = `event: job_update\ndata: ${JSON.stringify({ id: jobId, status, output })}\n\n`;
|
|
for (const l of jobListeners) l(line);
|
|
}
|
|
|
|
/**
 * Load one job row joined with its media item and assigned node (if any).
 *
 * Returns null when the job id does not exist; otherwise an object with:
 * - job:   the raw jobs row
 * - item:  the joined media_items row mapped to a MediaItem, or null
 * - node:  the joined nodes row mapped to a Node, or null (local job)
 * - nodes: every node (for assignment dropdowns in the UI)
 */
function loadJobRow(jobId: number) {
  const db = getDb();
  // LEFT JOINs so the job still loads when its item or node was deleted.
  // Joined columns are aliased (mi_id, n_id, node_name, node_status) to avoid
  // clobbering same-named columns from j.*.
  const row = db.prepare(`
    SELECT j.*, mi.id as mi_id, mi.name, mi.type, mi.series_name, mi.season_number,
           mi.episode_number, mi.file_path,
           n.id as n_id, n.name as node_name, n.host, n.port, n.username,
           n.private_key, n.ffmpeg_path, n.work_dir, n.status as node_status
    FROM jobs j
    LEFT JOIN media_items mi ON mi.id = j.item_id
    LEFT JOIN nodes n ON n.id = j.node_id
    WHERE j.id = ?
  `).get(jobId) as (Job & {
    mi_id: number | null; name: string | null; type: string | null;
    series_name: string | null; season_number: number | null; episode_number: number | null;
    file_path: string | null; n_id: number | null; node_name: string | null;
    host: string | null; port: number | null; username: string | null;
    private_key: string | null; ffmpeg_path: string | null; work_dir: string | null; node_status: string | null;
  }) | undefined;

  if (!row) return null;

  const nodes = db.prepare('SELECT * FROM nodes ORDER BY name').all() as Node[];
  // A non-null joined name means the LEFT JOIN matched; reshape the flat
  // aliased columns back into MediaItem / Node objects.
  const item = row.name ? { id: row.item_id, name: row.name, type: row.type, series_name: row.series_name, season_number: row.season_number, episode_number: row.episode_number, file_path: row.file_path } as unknown as MediaItem : null;
  const node = row.node_name ? { id: row.node_id!, name: row.node_name, host: row.host!, port: row.port!, username: row.username!, private_key: row.private_key!, ffmpeg_path: row.ffmpeg_path!, work_dir: row.work_dir!, status: row.node_status! } as unknown as Node : null;

  return { job: row as unknown as Job, item, node, nodes };
}
|
|
|
|
// ─── List ─────────────────────────────────────────────────────────────────────
|
|
|
|
// GET / — list the 200 most recent jobs (newest first) with their joined media
// item and node, plus all nodes for the assignment UI.
app.get('/', (c) => {
  const db = getDb();
  // LEFT JOINs keep jobs visible even if their item or node was deleted;
  // node columns are aliased (node_name, node_status) to avoid clashing with j.*.
  const jobRows = db.prepare(`
    SELECT j.*, mi.name, mi.type, mi.series_name, mi.season_number, mi.episode_number, mi.file_path,
           n.name as node_name, n.host, n.port, n.username, n.private_key, n.ffmpeg_path, n.work_dir, n.status as node_status
    FROM jobs j
    LEFT JOIN media_items mi ON mi.id = j.item_id
    LEFT JOIN nodes n ON n.id = j.node_id
    ORDER BY j.created_at DESC LIMIT 200
  `).all() as (Job & { name: string; type: string; series_name: string | null; season_number: number | null; episode_number: number | null; file_path: string; node_name: string | null; host: string | null; port: number | null; username: string | null; private_key: string | null; ffmpeg_path: string | null; work_dir: string | null; node_status: string | null; })[];

  // Reshape each flat joined row into { job, item, node }; a non-null joined
  // name indicates the corresponding LEFT JOIN matched a row.
  const jobs = jobRows.map((r) => ({
    job: r as unknown as Job,
    item: r.name ? { id: r.item_id, name: r.name, type: r.type, series_name: r.series_name, season_number: r.season_number, episode_number: r.episode_number, file_path: r.file_path } as unknown as MediaItem : null,
    node: r.node_name ? { id: r.node_id!, name: r.node_name, host: r.host!, port: r.port!, username: r.username!, private_key: r.private_key!, ffmpeg_path: r.ffmpeg_path!, work_dir: r.work_dir!, status: r.node_status! } as unknown as Node : null,
  }));

  const nodes = db.prepare('SELECT * FROM nodes ORDER BY name').all() as Node[];
  return c.json({ jobs, nodes });
});
|
|
|
|
// ─── Start all pending ────────────────────────────────────────────────────────
|
|
|
|
app.post('/start', (c) => {
|
|
const db = getDb();
|
|
const pending = db.prepare("SELECT * FROM jobs WHERE status = 'pending' ORDER BY created_at").all() as Job[];
|
|
// Group by target (local vs each node) — run sequentially within each group, parallel across groups
|
|
const groups = new Map<string, Job[]>();
|
|
for (const job of pending) {
|
|
const key = targetKey(job.node_id);
|
|
if (!groups.has(key)) groups.set(key, []);
|
|
groups.get(key)!.push(job);
|
|
}
|
|
for (const jobs of groups.values()) {
|
|
runSequential(jobs).catch((err) => logError('Queue failed:', err));
|
|
}
|
|
return c.json({ ok: true, started: pending.length });
|
|
});
|
|
|
|
// ─── Assign node ──────────────────────────────────────────────────────────────
|
|
|
|
app.post('/job/:id/assign', async (c) => {
|
|
const db = getDb();
|
|
const jobId = Number(c.req.param('id'));
|
|
const body = await c.req.json<{ node_id: number | null }>();
|
|
db.prepare('UPDATE jobs SET node_id = ? WHERE id = ?').run(body.node_id, jobId);
|
|
const result = loadJobRow(jobId);
|
|
if (!result) return c.notFound();
|
|
return c.json(result);
|
|
});
|
|
|
|
// ─── Run single ───────────────────────────────────────────────────────────────
|
|
|
|
app.post('/job/:id/run', async (c) => {
|
|
const db = getDb();
|
|
const jobId = Number(c.req.param('id'));
|
|
const job = db.prepare('SELECT * FROM jobs WHERE id = ?').get(jobId) as Job | undefined;
|
|
if (!job || job.status !== 'pending') {
|
|
const result = loadJobRow(jobId);
|
|
if (!result) return c.notFound();
|
|
return c.json(result);
|
|
}
|
|
runJob(job).catch((err) => logError(`Job ${job.id} failed:`, err));
|
|
const result = loadJobRow(jobId);
|
|
if (!result) return c.notFound();
|
|
return c.json(result);
|
|
});
|
|
|
|
// ─── Cancel ───────────────────────────────────────────────────────────────────
|
|
|
|
app.post('/job/:id/cancel', (c) => {
|
|
const db = getDb();
|
|
const jobId = Number(c.req.param('id'));
|
|
db.prepare("DELETE FROM jobs WHERE id = ? AND status = 'pending'").run(jobId);
|
|
return c.json({ ok: true });
|
|
});
|
|
|
|
// ─── Clear queue ──────────────────────────────────────────────────────────────
|
|
|
|
app.post('/clear', (c) => {
|
|
const db = getDb();
|
|
// Revert review plans for pending jobs back to 'pending' so they can be re-approved
|
|
db.prepare(`
|
|
UPDATE review_plans SET status = 'pending', reviewed_at = NULL
|
|
WHERE item_id IN (SELECT item_id FROM jobs WHERE status = 'pending')
|
|
AND status = 'approved'
|
|
`).run();
|
|
const result = db.prepare("DELETE FROM jobs WHERE status = 'pending'").run();
|
|
return c.json({ ok: true, cleared: result.changes });
|
|
});
|
|
|
|
app.post('/clear-completed', (c) => {
|
|
const db = getDb();
|
|
const result = db.prepare("DELETE FROM jobs WHERE status IN ('done', 'error')").run();
|
|
return c.json({ ok: true, cleared: result.changes });
|
|
});
|
|
|
|
// ─── SSE ──────────────────────────────────────────────────────────────────────
|
|
|
|
// GET /events — server-sent-events stream of job_update frames, with a
// 15-second keepalive comment while idle.
app.get('/events', (c) => {
  return stream(c, async (s) => {
    c.header('Content-Type', 'text/event-stream');
    c.header('Cache-Control', 'no-cache');

    // Frames queue here between writes; `resolve` wakes the loop when the
    // listener pushes while we are parked in the idle wait below.
    const queue: string[] = [];
    let resolve: (() => void) | null = null;
    const listener = (data: string) => { queue.push(data); resolve?.(); };

    jobListeners.add(listener);
    s.onAbort(() => { jobListeners.delete(listener); });

    try {
      while (!s.closed) {
        if (queue.length > 0) {
          await s.write(queue.shift()!);
        } else {
          // Park until either a frame arrives (listener calls `resolve`) or
          // the 15s timer fires — whichever settles the promise first.
          await new Promise<void>((res) => { resolve = res; setTimeout(res, 15_000); });
          resolve = null;
          // Still nothing queued → the timer woke us; send an SSE comment so
          // proxies/clients don't drop the idle connection.
          if (queue.length === 0) await s.write(': keepalive\n\n');
        }
      }
    } finally {
      // Also runs on abort — double-delete on a Set is harmless.
      jobListeners.delete(listener);
    }
  });
});
|
|
|
|
// ─── Job execution ────────────────────────────────────────────────────────────
|
|
|
|
/**
 * Execute one job's shell command, either locally (Bun.spawn) or on a remote
 * node over SSH, streaming output into the jobs table and to SSE clients.
 *
 * Side effects: updates jobs.status/output/exit_code/timestamps, mirrors the
 * terminal status onto review_plans, and on success records predicted
 * extracted subtitle sidecar files.
 *
 * @param job - The job row to run; expected to be in 'pending' status.
 * @throws Never intentionally — all failures are caught and recorded as
 *         status 'error' on the job row.
 */
async function runJob(job: Job): Promise<void> {
  log(`Job ${job.id} starting (item=${job.item_id}${job.node_id ? `, node=${job.node_id}` : ', local'})`);
  const db = getDb();

  // Local jobs: pre-flight check that the source file exists and is
  // readable/writable after path mapping, so we fail fast with a clear error
  // instead of an opaque ffmpeg failure.
  if (!job.node_id) {
    const itemRow = db.prepare('SELECT file_path FROM media_items WHERE id = ?').get(job.item_id) as { file_path: string } | undefined;
    if (itemRow?.file_path) {
      const mappedPath = applyPathMappings(itemRow.file_path);
      try { accessSync(mappedPath, constants.R_OK | constants.W_OK); } catch (fsErr) {
        const msg = `File not accessible: ${mappedPath}\n${(fsErr as Error).message}`;
        db.prepare("UPDATE jobs SET status = 'error', output = ?, exit_code = 1, completed_at = datetime('now') WHERE id = ?").run(msg, job.id);
        emitJobUpdate(job.id, 'error', msg);
        db.prepare("UPDATE review_plans SET status = 'error' WHERE item_id = ?").run(job.item_id);
        return;
      }
    }
  }

  db.prepare("UPDATE jobs SET status = 'running', started_at = datetime('now'), output = '' WHERE id = ?").run(job.id);
  emitJobUpdate(job.id, 'running');

  // Accumulated output lines; shared by both local stream readers below.
  let outputLines: string[] = [];
  // Pushes progress to SSE on every call, but only persists to the DB every
  // 10th line (or when `final`) to limit write churn during long encodes.
  const flush = (final = false) => {
    const text = outputLines.join('\n');
    if (final || outputLines.length % 10 === 0) db.prepare('UPDATE jobs SET output = ? WHERE id = ?').run(text, job.id);
    emitJobUpdate(job.id, 'running', text);
  };

  try {
    if (job.node_id) {
      // Remote execution over SSH, streaming stdout/stderr line by line.
      const node = db.prepare('SELECT * FROM nodes WHERE id = ?').get(job.node_id) as Node | undefined;
      if (!node) throw new Error(`Node ${job.node_id} not found`);
      // Translate container paths to node-specific mount paths
      // (strip any trailing slash from the mount so we don't double up '/').
      let cmd = job.command;
      if (node.movies_path) cmd = cmd.replaceAll('/movies/', node.movies_path.replace(/\/$/, '') + '/');
      if (node.series_path) cmd = cmd.replaceAll('/series/', node.series_path.replace(/\/$/, '') + '/');
      for await (const line of execStream(node, cmd)) { outputLines.push(line); flush(); }
    } else {
      // Local execution via the shell so the stored command string can use
      // quoting/redirection as-is.
      const mappedCmd = applyPathMappingsToCommand(job.command);
      const proc = Bun.spawn(['sh', '-c', mappedCmd], { stdout: 'pipe', stderr: 'pipe' });
      // Drain a stream into outputLines; blank lines dropped, each line
      // optionally prefixed (used to tag stderr). Read errors are swallowed —
      // the exit code below is the source of truth for failure.
      const readStream = async (readable: ReadableStream<Uint8Array>, prefix = '') => {
        const reader = readable.getReader();
        const decoder = new TextDecoder();
        try {
          while (true) {
            const { done, value } = await reader.read();
            if (done) break;
            const text = decoder.decode(value);
            const lines = text.split('\n').filter((l) => l.trim());
            for (const line of lines) outputLines.push(prefix + line);
            flush();
          }
        } catch { /* ignore */ }
      };
      // Both pipes are drained concurrently while waiting for process exit.
      await Promise.all([readStream(proc.stdout), readStream(proc.stderr, '[stderr] '), proc.exited]);
      const exitCode = await proc.exited;
      if (exitCode !== 0) throw new Error(`FFmpeg exited with code ${exitCode}`);
    }

    // Success path: persist final output, notify clients, mark plan done.
    const fullOutput = outputLines.join('\n');
    db.prepare("UPDATE jobs SET status = 'done', exit_code = 0, output = ?, completed_at = datetime('now') WHERE id = ?").run(fullOutput, job.id);
    log(`Job ${job.id} completed successfully`);
    emitJobUpdate(job.id, 'done', fullOutput);
    db.prepare("UPDATE review_plans SET status = 'done' WHERE item_id = ?").run(job.item_id);

    // Populate subtitle_files table with extracted sidecar files. Best-effort:
    // a failure here is logged but does not change the job's 'done' status.
    try {
      const item = db.prepare('SELECT * FROM media_items WHERE id = ?').get(job.item_id) as MediaItem | undefined;
      const streams = db.prepare('SELECT * FROM media_streams WHERE item_id = ?').all(job.item_id) as MediaStream[];
      if (item && streams.length > 0) {
        // Predicted (not observed) paths — see predictExtractedFiles.
        const files = predictExtractedFiles(item, streams);
        const insertFile = db.prepare('INSERT OR IGNORE INTO subtitle_files (item_id, file_path, language, codec, is_forced, is_hearing_impaired) VALUES (?, ?, ?, ?, ?, ?)');
        for (const f of files) {
          insertFile.run(job.item_id, f.file_path, f.language, f.codec, f.is_forced ? 1 : 0, f.is_hearing_impaired ? 1 : 0);
        }
        db.prepare('UPDATE review_plans SET subs_extracted = 1 WHERE item_id = ?').run(job.item_id);
      }
    } catch (subErr) { logError('Failed to record extracted subtitle files:', subErr); }
  } catch (err) {
    // Failure path: append the error to whatever output we captured and mark
    // job + review plan as 'error'.
    logError(`Job ${job.id} failed:`, err);
    const fullOutput = outputLines.join('\n') + '\n' + String(err);
    db.prepare("UPDATE jobs SET status = 'error', exit_code = 1, output = ?, completed_at = datetime('now') WHERE id = ?").run(fullOutput, job.id);
    emitJobUpdate(job.id, 'error', fullOutput);
    db.prepare("UPDATE review_plans SET status = 'error' WHERE item_id = ?").run(job.item_id);
  }
}
|
|
|
|
export default app;
|