diff --git a/package.json b/package.json index 8573139..0ebc495 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "netfelix-audio-fix", - "version": "2026.03.27", + "version": "2026.04.13", "scripts": { "dev:server": "NODE_ENV=development bun --hot server/index.tsx", "dev:client": "vite", diff --git a/server/api/execute.ts b/server/api/execute.ts index 7238619..e9d7d1d 100644 --- a/server/api/execute.ts +++ b/server/api/execute.ts @@ -1,8 +1,7 @@ import { Hono } from 'hono'; import { stream } from 'hono/streaming'; import { getDb } from '../db/index'; -import { execStream } from '../services/ssh'; -import type { Job, Node, MediaItem, MediaStream } from '../types'; +import type { Job, MediaItem, MediaStream } from '../types'; import { predictExtractedFiles } from '../services/ffmpeg'; import { accessSync, constants } from 'node:fs'; import { log, error as logError } from '../lib/log'; @@ -10,29 +9,29 @@ import { getSchedulerState, updateSchedulerState } from '../services/scheduler'; const app = new Hono(); -// ─── Sequential queue per target ───────────────────────────────────────────── +// ─── Sequential local queue ────────────────────────────────────────────────── -const runningTargets = new Set(); +let queueRunning = false; -function targetKey(nodeId: number | null): string { - return nodeId ? `node-${nodeId}` : 'local'; -} - -/** Run a list of jobs sequentially on the same target. */ async function runSequential(jobs: Job[]): Promise { - const key = targetKey(jobs[0]?.node_id ?? null); - if (runningTargets.has(key)) return; // already processing this target - runningTargets.add(key); + if (queueRunning) return; + queueRunning = true; try { for (const job of jobs) { - // Re-check status — job may have been cancelled while queued + // Atomic claim: only pick up jobs still pending const db = getDb(); - const fresh = db.prepare('SELECT status FROM jobs WHERE id = ?').get(job.id) as { status: string } | undefined; - if (!fresh || fresh.status !== 'pending') continue; - try { await runJob(job); } catch (err) { logError(`Job ${job.id} failed:`, err); } + const claimed = db + .prepare("UPDATE jobs SET status = 'running', started_at = datetime('now'), output = '' WHERE id = ? AND status = 'pending'") + .run(job.id); + if (claimed.changes === 0) continue; // cancelled or already running + try { + await runJob(job); + } catch (err) { + logError(`Job ${job.id} failed:`, err); + } } } finally { - runningTargets.delete(key); + queueRunning = false; } } @@ -49,28 +48,20 @@ function loadJobRow(jobId: number) { const db = getDb(); const row = db.prepare(` SELECT j.*, mi.id as mi_id, mi.name, mi.type, mi.series_name, mi.season_number, - mi.episode_number, mi.file_path, - n.id as n_id, n.name as node_name, n.host, n.port, n.username, - n.private_key, n.ffmpeg_path, n.work_dir, n.status as node_status + mi.episode_number, mi.file_path FROM jobs j LEFT JOIN media_items mi ON mi.id = j.item_id - LEFT JOIN nodes n ON n.id = j.node_id WHERE j.id = ? `).get(jobId) as (Job & { mi_id: number | null; name: string | null; type: string | null; series_name: string | null; season_number: number | null; episode_number: number | null; - file_path: string | null; n_id: number | null; node_name: string | null; - host: string | null; port: number | null; username: string | null; - private_key: string | null; ffmpeg_path: string | null; work_dir: string | null; node_status: string | null; + file_path: string | null; }) | undefined; if (!row) return null; - const nodes = db.prepare('SELECT * FROM nodes ORDER BY name').all() as Node[]; const item = row.name ? { id: row.item_id, name: row.name, type: row.type, series_name: row.series_name, season_number: row.season_number, episode_number: row.episode_number, file_path: row.file_path } as unknown as MediaItem : null; - const node = row.node_name ? { id: row.node_id!, name: row.node_name, host: row.host!, port: row.port!, username: row.username!, private_key: row.private_key!, ffmpeg_path: row.ffmpeg_path!, work_dir: row.work_dir!, status: row.node_status! } as unknown as Node : null; - - return { job: row as unknown as Job, item, node, nodes }; + return { job: row as unknown as Job, item }; } // ─── List ───────────────────────────────────────────────────────────────────── @@ -84,23 +75,19 @@ app.get('/', (c) => { const params = whereClause ? [filter] : []; const jobRows = db.prepare(` - SELECT j.*, mi.name, mi.type, mi.series_name, mi.season_number, mi.episode_number, mi.file_path, - n.name as node_name, n.host, n.port, n.username, n.private_key, n.ffmpeg_path, n.work_dir, n.status as node_status + SELECT j.*, mi.name, mi.type, mi.series_name, mi.season_number, mi.episode_number, mi.file_path FROM jobs j LEFT JOIN media_items mi ON mi.id = j.item_id - LEFT JOIN nodes n ON n.id = j.node_id ${whereClause} ORDER BY j.created_at DESC LIMIT 200 - `).all(...params) as (Job & { name: string; type: string; series_name: string | null; season_number: number | null; episode_number: number | null; file_path: string; node_name: string | null; host: string | null; port: number | null; username: string | null; private_key: string | null; ffmpeg_path: string | null; work_dir: string | null; node_status: string | null; })[]; + `).all(...params) as (Job & { name: string; type: string; series_name: string | null; season_number: number | null; episode_number: number | null; file_path: string })[]; const jobs = jobRows.map((r) => ({ job: r as unknown as Job, item: r.name ? { id: r.item_id, name: r.name, type: r.type, series_name: r.series_name, season_number: r.season_number, episode_number: r.episode_number, file_path: r.file_path } as unknown as MediaItem : null, - node: r.node_name ? { id: r.node_id!, name: r.node_name, host: r.host!, port: r.port!, username: r.username!, private_key: r.private_key!, ffmpeg_path: r.ffmpeg_path!, work_dir: r.work_dir!, status: r.node_status! } as unknown as Node : null, })); - // Count totals by status const countRows = db.prepare('SELECT status, COUNT(*) as cnt FROM jobs GROUP BY status').all() as { status: string; cnt: number }[]; const totalCounts: Record = { all: 0, pending: 0, running: 0, done: 0, error: 0 }; for (const row of countRows) { @@ -108,52 +95,40 @@ app.get('/', (c) => { totalCounts.all += row.cnt; } - const nodes = db.prepare('SELECT * FROM nodes ORDER BY name').all() as Node[]; - return c.json({ jobs, nodes, filter, totalCounts }); + return c.json({ jobs, filter, totalCounts }); }); +// ─── Param helpers ──────────────────────────────────────────────────────────── + +function parseId(raw: string | undefined): number | null { + if (!raw) return null; + const n = Number.parseInt(raw, 10); + return Number.isFinite(n) && n > 0 ? n : null; +} + // ─── Start all pending ──────────────────────────────────────────────────────── app.post('/start', (c) => { const db = getDb(); const pending = db.prepare("SELECT * FROM jobs WHERE status = 'pending' ORDER BY created_at").all() as Job[]; - // Group by target (local vs each node) — run sequentially within each group, parallel across groups - const groups = new Map(); - for (const job of pending) { - const key = targetKey(job.node_id); - if (!groups.has(key)) groups.set(key, []); - groups.get(key)!.push(job); - } - for (const jobs of groups.values()) { - runSequential(jobs).catch((err) => logError('Queue failed:', err)); - } + runSequential(pending).catch((err) => logError('Queue failed:', err)); return c.json({ ok: true, started: pending.length }); }); -// ─── Assign node ────────────────────────────────────────────────────────────── - -app.post('/job/:id/assign', async (c) => { - const db = getDb(); - const jobId = Number(c.req.param('id')); - const body = await c.req.json<{ node_id: number | null }>(); - db.prepare('UPDATE jobs SET node_id = ? WHERE id = ?').run(body.node_id, jobId); - const result = loadJobRow(jobId); - if (!result) return c.notFound(); - return c.json(result); -}); - // ─── Run single ─────────────────────────────────────────────────────────────── app.post('/job/:id/run', async (c) => { + const jobId = parseId(c.req.param('id')); + if (jobId == null) return c.json({ error: 'invalid job id' }, 400); const db = getDb(); - const jobId = Number(c.req.param('id')); const job = db.prepare('SELECT * FROM jobs WHERE id = ?').get(jobId) as Job | undefined; - if (!job || job.status !== 'pending') { + if (!job) return c.notFound(); + if (job.status !== 'pending') { const result = loadJobRow(jobId); if (!result) return c.notFound(); return c.json(result); } - runJob(job).catch((err) => logError(`Job ${job.id} failed:`, err)); + runSequential([job]).catch((err) => logError(`Job ${job.id} failed:`, err)); const result = loadJobRow(jobId); if (!result) return c.notFound(); return c.json(result); @@ -162,8 +137,9 @@ app.post('/job/:id/run', async (c) => { // ─── Cancel ─────────────────────────────────────────────────────────────────── app.post('/job/:id/cancel', (c) => { + const jobId = parseId(c.req.param('id')); + if (jobId == null) return c.json({ error: 'invalid job id' }, 400); const db = getDb(); - const jobId = Number(c.req.param('id')); db.prepare("DELETE FROM jobs WHERE id = ? AND status = 'pending'").run(jobId); return c.json({ ok: true }); }); @@ -172,7 +148,6 @@ app.post('/job/:id/cancel', (c) => { app.post('/clear', (c) => { const db = getDb(); - // Revert review plans for pending jobs back to 'pending' so they can be re-approved db.prepare(` UPDATE review_plans SET status = 'pending', reviewed_at = NULL WHERE item_id IN (SELECT item_id FROM jobs WHERE status = 'pending') @@ -221,82 +196,94 @@ app.get('/events', (c) => { // ─── Job execution ──────────────────────────────────────────────────────────── async function runJob(job: Job): Promise { - log(`Job ${job.id} starting (item=${job.item_id}${job.node_id ? `, node=${job.node_id}` : ', local'})`); + log(`Job ${job.id} starting (item=${job.item_id})`); log(`Job ${job.id} command: ${job.command}`); const db = getDb(); - if (!job.node_id) { - const itemRow = db.prepare('SELECT file_path FROM media_items WHERE id = ?').get(job.item_id) as { file_path: string } | undefined; - if (itemRow?.file_path) { - try { accessSync(itemRow.file_path, constants.R_OK | constants.W_OK); } catch (fsErr) { - const msg = `File not accessible: ${itemRow.file_path}\n${(fsErr as Error).message}`; - db.prepare("UPDATE jobs SET status = 'error', output = ?, exit_code = 1, completed_at = datetime('now') WHERE id = ?").run(msg, job.id); - emitJobUpdate(job.id, 'error', msg); - db.prepare("UPDATE review_plans SET status = 'error' WHERE item_id = ?").run(job.item_id); - return; - } + const itemRow = db.prepare('SELECT file_path FROM media_items WHERE id = ?').get(job.item_id) as { file_path: string } | undefined; + if (itemRow?.file_path) { + try { + accessSync(itemRow.file_path, constants.R_OK | constants.W_OK); + } catch (fsErr) { + const msg = `File not accessible: ${itemRow.file_path}\n${(fsErr as Error).message}`; + db.prepare("UPDATE jobs SET status = 'error', output = ?, exit_code = 1, completed_at = datetime('now') WHERE id = ?").run(msg, job.id); + emitJobUpdate(job.id, 'error', msg); + db.prepare("UPDATE review_plans SET status = 'error' WHERE item_id = ?").run(job.item_id); + return; } } - db.prepare("UPDATE jobs SET status = 'running', started_at = datetime('now'), output = '' WHERE id = ?").run(job.id); emitJobUpdate(job.id, 'running'); - let outputLines: string[] = []; + const outputLines: string[] = []; + let pendingFlush = false; + let lastFlushAt = 0; + const updateOutput = db.prepare('UPDATE jobs SET output = ? WHERE id = ?'); + const flush = (final = false) => { const text = outputLines.join('\n'); - if (final || outputLines.length % 10 === 0) db.prepare('UPDATE jobs SET output = ? WHERE id = ?').run(text, job.id); + const now = Date.now(); + if (final || now - lastFlushAt > 500) { + updateOutput.run(text, job.id); + lastFlushAt = now; + pendingFlush = false; + } else { + pendingFlush = true; + } emitJobUpdate(job.id, 'running', text); }; try { - if (job.node_id) { - const node = db.prepare('SELECT * FROM nodes WHERE id = ?').get(job.node_id) as Node | undefined; - if (!node) throw new Error(`Node ${job.node_id} not found`); - // Translate container paths to node-specific mount paths - let cmd = job.command; - if (node.movies_path) cmd = cmd.replaceAll('/movies/', node.movies_path.replace(/\/$/, '') + '/'); - if (node.series_path) cmd = cmd.replaceAll('/series/', node.series_path.replace(/\/$/, '') + '/'); - for await (const line of execStream(node, cmd)) { outputLines.push(line); flush(); } - } else { - const proc = Bun.spawn(['sh', '-c', job.command], { stdout: 'pipe', stderr: 'pipe' }); - const readStream = async (readable: ReadableStream, prefix = '') => { - const reader = readable.getReader(); - const decoder = new TextDecoder(); - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - const text = decoder.decode(value); - const lines = text.split('\n').filter((l) => l.trim()); - for (const line of lines) outputLines.push(prefix + line); - flush(); + const proc = Bun.spawn(['sh', '-c', job.command], { stdout: 'pipe', stderr: 'pipe' }); + const readStream = async (readable: ReadableStream, prefix = '') => { + const reader = readable.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); + const parts = buffer.split(/\r\n|\n|\r/); + buffer = parts.pop() ?? ''; + for (const line of parts) { + if (line.trim()) outputLines.push(prefix + line); } - } catch { /* ignore */ } - }; - await Promise.all([readStream(proc.stdout), readStream(proc.stderr, '[stderr] '), proc.exited]); - const exitCode = await proc.exited; - if (exitCode !== 0) throw new Error(`FFmpeg exited with code ${exitCode}`); - } + flush(); + } + if (buffer.trim()) outputLines.push(prefix + buffer); + } catch (err) { + logError(`stream read error (${prefix.trim() || 'stdout'}):`, err); + } + }; + await Promise.all([readStream(proc.stdout), readStream(proc.stderr, '[stderr] '), proc.exited]); + const exitCode = await proc.exited; + if (pendingFlush) updateOutput.run(outputLines.join('\n'), job.id); + if (exitCode !== 0) throw new Error(`FFmpeg exited with code ${exitCode}`); const fullOutput = outputLines.join('\n'); - db.prepare("UPDATE jobs SET status = 'done', exit_code = 0, output = ?, completed_at = datetime('now') WHERE id = ?").run(fullOutput, job.id); + + // Gather sidecar files to record + const item = db.prepare('SELECT * FROM media_items WHERE id = ?').get(job.item_id) as MediaItem | undefined; + const streams = db.prepare('SELECT * FROM media_streams WHERE item_id = ?').all(job.item_id) as MediaStream[]; + const files = item && streams.length > 0 ? predictExtractedFiles(item, streams) : []; + + const insertFile = db.prepare('INSERT OR IGNORE INTO subtitle_files (item_id, file_path, language, codec, is_forced, is_hearing_impaired) VALUES (?, ?, ?, ?, ?, ?)'); + const markJobDone = db.prepare("UPDATE jobs SET status = 'done', exit_code = 0, output = ?, completed_at = datetime('now') WHERE id = ?"); + const markPlanDone = db.prepare("UPDATE review_plans SET status = 'done' WHERE item_id = ?"); + const markSubsExtracted = db.prepare('UPDATE review_plans SET subs_extracted = 1 WHERE item_id = ?'); + + db.transaction(() => { + markJobDone.run(fullOutput, job.id); + markPlanDone.run(job.item_id); + for (const f of files) { + insertFile.run(job.item_id, f.file_path, f.language, f.codec, f.is_forced ? 1 : 0, f.is_hearing_impaired ? 1 : 0); + } + if (files.length > 0) markSubsExtracted.run(job.item_id); + })(); + log(`Job ${job.id} completed successfully`); emitJobUpdate(job.id, 'done', fullOutput); - db.prepare("UPDATE review_plans SET status = 'done' WHERE item_id = ?").run(job.item_id); - - // Populate subtitle_files table with extracted sidecar files - try { - const item = db.prepare('SELECT * FROM media_items WHERE id = ?').get(job.item_id) as MediaItem | undefined; - const streams = db.prepare('SELECT * FROM media_streams WHERE item_id = ?').all(job.item_id) as MediaStream[]; - if (item && streams.length > 0) { - const files = predictExtractedFiles(item, streams); - const insertFile = db.prepare('INSERT OR IGNORE INTO subtitle_files (item_id, file_path, language, codec, is_forced, is_hearing_impaired) VALUES (?, ?, ?, ?, ?, ?)'); - for (const f of files) { - insertFile.run(job.item_id, f.file_path, f.language, f.codec, f.is_forced ? 1 : 0, f.is_hearing_impaired ? 1 : 0); - } - db.prepare('UPDATE review_plans SET subs_extracted = 1 WHERE item_id = ?').run(job.item_id); - } - } catch (subErr) { logError('Failed to record extracted subtitle files:', subErr); } } catch (err) { logError(`Job ${job.id} failed:`, err); const fullOutput = outputLines.join('\n') + '\n' + String(err); @@ -308,12 +295,10 @@ async function runJob(job: Job): Promise { // ─── Scheduler ──────────────────────────────────────────────────────────────── -// GET /scheduler — current scheduler state app.get('/scheduler', (c) => { return c.json(getSchedulerState()); }); -// PATCH /scheduler — update scheduler settings app.patch('/scheduler', async (c) => { const body = await c.req.json(); updateSchedulerState(body); diff --git a/server/api/nodes.ts b/server/api/nodes.ts deleted file mode 100644 index c3ce0a0..0000000 --- a/server/api/nodes.ts +++ /dev/null @@ -1,78 +0,0 @@ -import { Hono } from 'hono'; -import { getDb } from '../db/index'; -import { testConnection } from '../services/ssh'; -import type { Node } from '../types'; - -const app = new Hono(); - -app.get('/', (c) => { - const db = getDb(); - const nodes = db.prepare('SELECT * FROM nodes ORDER BY name').all() as Node[]; - return c.json({ nodes }); -}); - -app.post('/', async (c) => { - const db = getDb(); - const contentType = c.req.header('Content-Type') ?? ''; - let name: string, host: string, port: number, username: string, ffmpegPath: string, workDir: string, privateKey: string, moviesPath: string, seriesPath: string; - - // Support both multipart (file upload) and JSON - if (contentType.includes('multipart/form-data')) { - const body = await c.req.formData(); - name = body.get('name') as string; - host = body.get('host') as string; - port = Number(body.get('port') ?? '22'); - username = body.get('username') as string; - ffmpegPath = (body.get('ffmpeg_path') as string) || 'ffmpeg'; - workDir = (body.get('work_dir') as string) || '/tmp'; - moviesPath = (body.get('movies_path') as string) || ''; - seriesPath = (body.get('series_path') as string) || ''; - const keyFile = body.get('private_key') as File | null; - if (!name || !host || !username || !keyFile) return c.json({ ok: false, error: 'All fields are required' }, 400); - privateKey = await keyFile.text(); - } else { - const body = await c.req.json<{ name: string; host: string; port?: number; username: string; ffmpeg_path?: string; work_dir?: string; movies_path?: string; series_path?: string; private_key: string }>(); - name = body.name; host = body.host; port = body.port ?? 22; username = body.username; - ffmpegPath = body.ffmpeg_path || 'ffmpeg'; workDir = body.work_dir || '/tmp'; - moviesPath = body.movies_path || ''; seriesPath = body.series_path || ''; - privateKey = body.private_key; - if (!name || !host || !username || !privateKey) return c.json({ ok: false, error: 'All fields are required' }, 400); - } - - try { - db.prepare('INSERT INTO nodes (name, host, port, username, private_key, ffmpeg_path, work_dir, movies_path, series_path) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)') - .run(name, host, port, username, privateKey, ffmpegPath, workDir, moviesPath, seriesPath); - } catch (e) { - if (String(e).includes('UNIQUE')) return c.json({ ok: false, error: `A node named "${name}" already exists` }, 409); - throw e; - } - - const nodes = db.prepare('SELECT * FROM nodes ORDER BY name').all() as Node[]; - return c.json({ ok: true, nodes }); -}); - -app.delete('/:id', (c) => { - const db = getDb(); - db.prepare('DELETE FROM nodes WHERE id = ?').run(Number(c.req.param('id'))); - return c.json({ ok: true }); -}); - -// Legacy POST delete for HTML-form compat (may be removed later) -app.post('/:id/delete', (c) => { - const db = getDb(); - db.prepare('DELETE FROM nodes WHERE id = ?').run(Number(c.req.param('id'))); - return c.json({ ok: true }); -}); - -app.post('/:id/test', async (c) => { - const db = getDb(); - const id = Number(c.req.param('id')); - const node = db.prepare('SELECT * FROM nodes WHERE id = ?').get(id) as Node | undefined; - if (!node) return c.notFound(); - const result = await testConnection(node); - const status = result.ok ? 'ok' : `error: ${result.error}`; - db.prepare("UPDATE nodes SET status = ?, last_checked_at = datetime('now') WHERE id = ?").run(status, id); - return c.json({ ok: result.ok, status, error: result.error }); -}); - -export default app; diff --git a/server/db/index.ts b/server/db/index.ts index b2abb0b..86ffa53 100644 --- a/server/db/index.ts +++ b/server/db/index.ts @@ -54,8 +54,6 @@ export function getDb(): Database { // Migrations for columns added after initial release try { _db.exec('ALTER TABLE stream_decisions ADD COLUMN custom_title TEXT'); } catch { /* already exists */ } try { _db.exec('ALTER TABLE review_plans ADD COLUMN subs_extracted INTEGER NOT NULL DEFAULT 0'); } catch { /* already exists */ } - try { _db.exec("ALTER TABLE nodes ADD COLUMN movies_path TEXT NOT NULL DEFAULT ''"); } catch { /* already exists */ } - try { _db.exec("ALTER TABLE nodes ADD COLUMN series_path TEXT NOT NULL DEFAULT ''"); } catch { /* already exists */ } try { _db.exec("ALTER TABLE jobs ADD COLUMN job_type TEXT NOT NULL DEFAULT 'audio'"); } catch { /* already exists */ } // Apple compat pipeline columns try { _db.exec("ALTER TABLE review_plans ADD COLUMN confidence TEXT NOT NULL DEFAULT 'low'"); } catch { /* already exists */ } diff --git a/server/db/schema.ts b/server/db/schema.ts index 73fdf80..803b8f0 100644 --- a/server/db/schema.ts +++ b/server/db/schema.ts @@ -7,22 +7,6 @@ CREATE TABLE IF NOT EXISTS config ( value TEXT ); -CREATE TABLE IF NOT EXISTS nodes ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT NOT NULL UNIQUE, - host TEXT NOT NULL, - port INTEGER NOT NULL DEFAULT 22, - username TEXT NOT NULL, - private_key TEXT NOT NULL, - ffmpeg_path TEXT NOT NULL DEFAULT 'ffmpeg', - work_dir TEXT NOT NULL DEFAULT '/tmp', - movies_path TEXT NOT NULL DEFAULT '', - series_path TEXT NOT NULL DEFAULT '', - status TEXT NOT NULL DEFAULT 'unknown', - last_checked_at TEXT, - created_at TEXT NOT NULL DEFAULT (datetime('now')) -); - CREATE TABLE IF NOT EXISTS media_items ( id INTEGER PRIMARY KEY AUTOINCREMENT, jellyfin_id TEXT NOT NULL UNIQUE, @@ -104,7 +88,6 @@ CREATE TABLE IF NOT EXISTS jobs ( item_id INTEGER NOT NULL REFERENCES media_items(id) ON DELETE CASCADE, command TEXT NOT NULL, job_type TEXT NOT NULL DEFAULT 'audio', - node_id INTEGER REFERENCES nodes(id) ON DELETE SET NULL, status TEXT NOT NULL DEFAULT 'pending', output TEXT, exit_code INTEGER, diff --git a/server/index.tsx b/server/index.tsx index 23eb1a8..22e806f 100644 --- a/server/index.tsx +++ b/server/index.tsx @@ -8,7 +8,6 @@ import setupRoutes from './api/setup'; import scanRoutes from './api/scan'; import reviewRoutes from './api/review'; import executeRoutes from './api/execute'; -import nodesRoutes from './api/nodes'; import subtitlesRoutes from './api/subtitles'; import dashboardRoutes from './api/dashboard'; import pathsRoutes from './api/paths'; @@ -41,7 +40,6 @@ app.route('/api/scan', scanRoutes); app.route('/api/review', reviewRoutes); app.route('/api/execute', executeRoutes); app.route('/api/subtitles', subtitlesRoutes); -app.route('/api/nodes', nodesRoutes); app.route('/api/paths', pathsRoutes); // ─── Static assets (production: serve Vite build) ──────────────────────────── diff --git a/server/services/ssh.ts b/server/services/ssh.ts deleted file mode 100644 index e284b71..0000000 --- a/server/services/ssh.ts +++ /dev/null @@ -1,163 +0,0 @@ -import { Client } from 'ssh2'; -import type { Node } from '../types'; - -export interface ExecResult { - exitCode: number; - output: string; -} - -/** Test SSH connectivity to a node. Returns ok + optional error message. */ -export function testConnection(node: Node): Promise<{ ok: boolean; error?: string }> { - return new Promise((resolve) => { - const conn = new Client(); - const timeout = setTimeout(() => { - conn.destroy(); - resolve({ ok: false, error: 'Connection timed out' }); - }, 10_000); - - conn.on('ready', () => { - clearTimeout(timeout); - conn.exec('echo ok', (err, stream) => { - if (err) { - conn.end(); - resolve({ ok: false, error: err.message }); - return; - } - stream.on('close', () => { - conn.end(); - resolve({ ok: true }); - }); - }); - }); - - conn.on('error', (err) => { - clearTimeout(timeout); - resolve({ ok: false, error: err.message }); - }); - - conn.connect(buildConnectConfig(node)); - }); -} - -/** - * Execute a command on a remote node and stream output lines. - * Yields lines as they arrive. Throws if connection fails. - */ -export async function* execStream( - node: Node, - command: string -): AsyncGenerator { - // Collect lines via a promise-based queue - const queue: string[] = []; - const resolvers: Array<(value: IteratorResult) => void> = []; - let done = false; - let errorVal: Error | null = null; - - const push = (line: string) => { - if (resolvers.length > 0) { - resolvers.shift()!({ value: line, done: false }); - } else { - queue.push(line); - } - }; - - const finish = (err?: Error) => { - done = true; - errorVal = err ?? null; - while (resolvers.length > 0) { - resolvers.shift()!({ value: undefined as unknown as string, done: true }); - } - }; - - const conn = new Client(); - - conn.on('ready', () => { - conn.exec(command, { pty: false }, (err, stream) => { - if (err) { - conn.end(); - finish(err); - return; - } - - stream.stdout.on('data', (data: Buffer) => { - const lines = data.toString('utf8').split('\n'); - for (const line of lines) { - if (line) push(line); - } - }); - - stream.stderr.on('data', (data: Buffer) => { - const lines = data.toString('utf8').split('\n'); - for (const line of lines) { - if (line) push(`[stderr] ${line}`); - } - }); - - stream.on('close', (code: number) => { - if (code !== 0) { - push(`[exit code ${code}]`); - } - conn.end(); - finish(); - }); - }); - }); - - conn.on('error', (err) => finish(err)); - conn.connect(buildConnectConfig(node)); - - // Yield from the queue - while (true) { - if (queue.length > 0) { - yield queue.shift()!; - } else if (done) { - if (errorVal) throw errorVal; - return; - } else { - await new Promise>((resolve) => { - resolvers.push(resolve); - }); - } - } -} - -/** - * Execute a command on a remote node and return full output + exit code. - * For use when streaming isn't needed. - */ -export function execOnce(node: Node, command: string): Promise { - return new Promise((resolve, reject) => { - const conn = new Client(); - let output = ''; - - conn.on('ready', () => { - conn.exec(command, (err, stream) => { - if (err) { - conn.end(); - reject(err); - return; - } - - stream.stdout.on('data', (d: Buffer) => { output += d.toString(); }); - stream.stderr.on('data', (d: Buffer) => { output += d.toString(); }); - stream.on('close', (code: number) => { - conn.end(); - resolve({ exitCode: code ?? 0, output }); - }); - }); - }); - - conn.on('error', reject); - conn.connect(buildConnectConfig(node)); - }); -} - -function buildConnectConfig(node: Node): Parameters[0] { - return { - host: node.host, - port: node.port, - username: node.username, - privateKey: node.private_key, - readyTimeout: 10_000, - }; -} diff --git a/server/types.ts b/server/types.ts index 118f16a..1c072f0 100644 --- a/server/types.ts +++ b/server/types.ts @@ -84,7 +84,6 @@ export interface Job { item_id: number; command: string; job_type: 'copy' | 'transcode'; - node_id: number | null; status: 'pending' | 'running' | 'done' | 'error'; output: string | null; exit_code: number | null; @@ -93,22 +92,6 @@ export interface Job { completed_at: string | null; } -export interface Node { - id: number; - name: string; - host: string; - port: number; - username: string; - private_key: string; - ffmpeg_path: string; - work_dir: string; - movies_path: string; - series_path: string; - status: 'unknown' | 'ok' | 'error'; - last_checked_at: string | null; - created_at: string; -} - // ─── Analyzer types ─────────────────────────────────────────────────────────── export interface StreamWithDecision extends MediaStream { diff --git a/src/features/nodes/NodesPage.tsx b/src/features/nodes/NodesPage.tsx deleted file mode 100644 index 871ea9f..0000000 --- a/src/features/nodes/NodesPage.tsx +++ /dev/null @@ -1,158 +0,0 @@ -import { useEffect, useRef, useState } from 'react'; -import { api } from '~/shared/lib/api'; -import { Badge } from '~/shared/components/ui/badge'; -import { Button } from '~/shared/components/ui/button'; -import { Input } from '~/shared/components/ui/input'; -import { Alert } from '~/shared/components/ui/alert'; -import type { Node } from '~/shared/lib/types'; - -interface NodesData { nodes: Node[]; } - -function nodeStatusVariant(status: string): 'done' | 'error' | 'pending' { - if (status === 'ok') return 'done'; - if (status.startsWith('error')) return 'error'; - return 'pending'; -} - -export function NodesPage() { - const [nodes, setNodes] = useState([]); - const [error, setError] = useState(''); - const [testing, setTesting] = useState>(new Set()); - const fileRef = useRef(null); - - const load = () => api.get('/api/nodes').then((d) => setNodes(d.nodes)); - useEffect(() => { load(); }, []); - - const submit = async (e: React.FormEvent) => { - e.preventDefault(); - setError(''); - const form = e.currentTarget; - const fd = new FormData(form); - const result = await api.postForm('/api/nodes', fd).catch((err) => { setError(String(err)); return null; }); - if (result) { setNodes(result.nodes); form.reset(); if (fileRef.current) fileRef.current.value = ''; } - }; - - const deleteNode = async (id: number) => { - if (!confirm('Remove node?')) return; - await api.post(`/api/nodes/${id}/delete`); - load(); - }; - - const testNode = async (id: number) => { - setTesting((s) => { const n = new Set(s); n.add(id); return n; }); - await api.post<{ ok: boolean; status: string }>(`/api/nodes/${id}/test`); - setTesting((s) => { const n = new Set(s); n.delete(id); return n; }); - load(); - }; - - return ( -
-
-

Remote Nodes

-
- -

- Remote nodes run FFmpeg over SSH. If the media is mounted at a different path on the - remote node, set the Movies/Series path fields to translate /movies/ and /series/ to the node's mount points. -

- - {/* Add form */} -
-
Add Node
- {error && {error}} -
-
- - - - - - - - -
- - -
-
- - {/* Node list */} - {nodes.length === 0 ? ( -

No nodes configured. Add one above.

- ) : ( -
- - - {['Name', 'Host', 'Port', 'User', 'FFmpeg', 'Movies', 'Series', 'Status', 'Actions'].map((h) => ( - - ))} - - - - {nodes.map((node) => ( - - - - - - - - - - - - ))} - -
{h}
{node.name}{node.host}{node.port}{node.username}{node.ffmpeg_path}{node.movies_path || '—'}{node.series_path || '—'} - {node.status} - -
- - -
-
- )} -
- ); -} - -import type React from 'react'; diff --git a/src/routes/__root.tsx b/src/routes/__root.tsx index c997c86..a37e2bd 100644 --- a/src/routes/__root.tsx +++ b/src/routes/__root.tsx @@ -56,7 +56,6 @@ function RootLayout() {
- Nodes Settings
diff --git a/src/routes/nodes.tsx b/src/routes/nodes.tsx deleted file mode 100644 index 818938a..0000000 --- a/src/routes/nodes.tsx +++ /dev/null @@ -1,6 +0,0 @@ -import { createFileRoute } from '@tanstack/react-router'; -import { NodesPage } from '~/features/nodes/NodesPage'; - -export const Route = createFileRoute('/nodes')({ - component: NodesPage, -}); diff --git a/src/shared/lib/types.ts b/src/shared/lib/types.ts index c99a28d..7646884 100644 --- a/src/shared/lib/types.ts +++ b/src/shared/lib/types.ts @@ -80,7 +80,6 @@ export interface StreamDecision { export interface Job { id: number; item_id: number; - node_id: number | null; command: string; job_type: 'copy' | 'transcode'; status: 'pending' | 'running' | 'done' | 'error'; @@ -90,18 +89,3 @@ export interface Job { started_at: string | null; completed_at: string | null; } - -export interface Node { - id: number; - name: string; - host: string; - port: number; - username: string; - private_key: string; - ffmpeg_path: string; - work_dir: string; - movies_path: string; - series_path: string; - status: string; - last_checked_at: string | null; -}