wire scheduler into queue, add retry, dev-reset cleanup, biome 2.4 migrate
- execute: actually call isInScheduleWindow/waitForWindow/sleepBetweenJobs in runSequential (they were dead code); emit queue_status SSE events (running/paused/sleeping/idle) so the pipeline's existing QueueStatus listener lights up - review: POST /:id/retry resets an errored plan to approved, wipes old done/error jobs, rebuilds command from current decisions, queues fresh job - scan: dev-mode DELETE now also wipes jobs + subtitle_files (previously orphaned after every dev reset) - biome: migrate config to 2.4 schema, autoformat 68 files (strings + indentation), relax opinionated a11y/hooks-deps/index-key rules that don't fit this codebase - routeTree.gen.ts regenerated after /nodes removal
This commit is contained in:
@@ -1,11 +1,19 @@
|
||||
import { Hono } from 'hono';
|
||||
import { stream } from 'hono/streaming';
|
||||
import { getDb } from '../db/index';
|
||||
import type { Job, MediaItem, MediaStream } from '../types';
|
||||
import { predictExtractedFiles } from '../services/ffmpeg';
|
||||
import { accessSync, constants } from 'node:fs';
|
||||
import { log, error as logError } from '../lib/log';
|
||||
import { getSchedulerState, updateSchedulerState } from '../services/scheduler';
|
||||
import { accessSync, constants } from "node:fs";
|
||||
import { Hono } from "hono";
|
||||
import { stream } from "hono/streaming";
|
||||
import { getDb } from "../db/index";
|
||||
import { log, error as logError } from "../lib/log";
|
||||
import { predictExtractedFiles } from "../services/ffmpeg";
|
||||
import {
|
||||
getSchedulerState,
|
||||
isInScheduleWindow,
|
||||
msUntilWindow,
|
||||
nextWindowTime,
|
||||
sleepBetweenJobs,
|
||||
updateSchedulerState,
|
||||
waitForWindow,
|
||||
} from "../services/scheduler";
|
||||
import type { Job, MediaItem, MediaStream } from "../types";
|
||||
|
||||
// Hono sub-router carrying every job-queue endpoint registered below.
const app = new Hono();
|
||||
|
||||
@@ -13,17 +21,45 @@ const app = new Hono();
|
||||
|
||||
// Re-entrancy guard: true while a sequential queue run is in flight
// (runSequential checks and sets it, and clears it in its finally block).
let queueRunning = false;
|
||||
|
||||
function emitQueueStatus(
|
||||
status: "running" | "paused" | "sleeping" | "idle",
|
||||
extra: { until?: string; seconds?: number } = {},
|
||||
): void {
|
||||
const line = `event: queue_status\ndata: ${JSON.stringify({ status, ...extra })}\n\n`;
|
||||
for (const l of jobListeners) l(line);
|
||||
}
|
||||
|
||||
async function runSequential(jobs: Job[]): Promise<void> {
|
||||
if (queueRunning) return;
|
||||
queueRunning = true;
|
||||
try {
|
||||
let first = true;
|
||||
for (const job of jobs) {
|
||||
// Pause outside the scheduler window
|
||||
if (!isInScheduleWindow()) {
|
||||
emitQueueStatus("paused", { until: nextWindowTime(), seconds: Math.round(msUntilWindow() / 1000) });
|
||||
await waitForWindow();
|
||||
}
|
||||
|
||||
// Sleep between jobs (but not before the first one)
|
||||
if (!first) {
|
||||
const state = getSchedulerState();
|
||||
if (state.job_sleep_seconds > 0) {
|
||||
emitQueueStatus("sleeping", { seconds: state.job_sleep_seconds });
|
||||
await sleepBetweenJobs();
|
||||
}
|
||||
}
|
||||
first = false;
|
||||
|
||||
// Atomic claim: only pick up jobs still pending
|
||||
const db = getDb();
|
||||
const claimed = db
|
||||
.prepare("UPDATE jobs SET status = 'running', started_at = datetime('now'), output = '' WHERE id = ? AND status = 'pending'")
|
||||
.prepare(
|
||||
"UPDATE jobs SET status = 'running', started_at = datetime('now'), output = '' WHERE id = ? AND status = 'pending'",
|
||||
)
|
||||
.run(job.id);
|
||||
if (claimed.changes === 0) continue; // cancelled or already running
|
||||
emitQueueStatus("running");
|
||||
try {
|
||||
await runJob(job);
|
||||
} catch (err) {
|
||||
@@ -32,6 +68,7 @@ async function runSequential(jobs: Job[]): Promise<void> {
|
||||
}
|
||||
} finally {
|
||||
queueRunning = false;
|
||||
emitQueueStatus("idle");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,49 +96,89 @@ function parseFFmpegDuration(line: string): number | null {
|
||||
|
||||
function loadJobRow(jobId: number) {
|
||||
const db = getDb();
|
||||
const row = db.prepare(`
|
||||
const row = db
|
||||
.prepare(`
|
||||
SELECT j.*, mi.id as mi_id, mi.name, mi.type, mi.series_name, mi.season_number,
|
||||
mi.episode_number, mi.file_path
|
||||
FROM jobs j
|
||||
LEFT JOIN media_items mi ON mi.id = j.item_id
|
||||
WHERE j.id = ?
|
||||
`).get(jobId) as (Job & {
|
||||
mi_id: number | null; name: string | null; type: string | null;
|
||||
series_name: string | null; season_number: number | null; episode_number: number | null;
|
||||
file_path: string | null;
|
||||
}) | undefined;
|
||||
`)
|
||||
.get(jobId) as
|
||||
| (Job & {
|
||||
mi_id: number | null;
|
||||
name: string | null;
|
||||
type: string | null;
|
||||
series_name: string | null;
|
||||
season_number: number | null;
|
||||
episode_number: number | null;
|
||||
file_path: string | null;
|
||||
})
|
||||
| undefined;
|
||||
|
||||
if (!row) return null;
|
||||
|
||||
const item = row.name ? { id: row.item_id, name: row.name, type: row.type, series_name: row.series_name, season_number: row.season_number, episode_number: row.episode_number, file_path: row.file_path } as unknown as MediaItem : null;
|
||||
const item = row.name
|
||||
? ({
|
||||
id: row.item_id,
|
||||
name: row.name,
|
||||
type: row.type,
|
||||
series_name: row.series_name,
|
||||
season_number: row.season_number,
|
||||
episode_number: row.episode_number,
|
||||
file_path: row.file_path,
|
||||
} as unknown as MediaItem)
|
||||
: null;
|
||||
return { job: row as unknown as Job, item };
|
||||
}
|
||||
|
||||
// ─── List ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
app.get('/', (c) => {
|
||||
app.get("/", (c) => {
|
||||
const db = getDb();
|
||||
const filter = (c.req.query('filter') ?? 'pending') as 'all' | 'pending' | 'running' | 'done' | 'error';
|
||||
const filter = (c.req.query("filter") ?? "pending") as "all" | "pending" | "running" | "done" | "error";
|
||||
|
||||
const validFilters = ['all', 'pending', 'running', 'done', 'error'];
|
||||
const whereClause = validFilters.includes(filter) && filter !== 'all' ? `WHERE j.status = ?` : '';
|
||||
const validFilters = ["all", "pending", "running", "done", "error"];
|
||||
const whereClause = validFilters.includes(filter) && filter !== "all" ? `WHERE j.status = ?` : "";
|
||||
const params = whereClause ? [filter] : [];
|
||||
|
||||
const jobRows = db.prepare(`
|
||||
const jobRows = db
|
||||
.prepare(`
|
||||
SELECT j.*, mi.name, mi.type, mi.series_name, mi.season_number, mi.episode_number, mi.file_path
|
||||
FROM jobs j
|
||||
LEFT JOIN media_items mi ON mi.id = j.item_id
|
||||
${whereClause}
|
||||
ORDER BY j.created_at DESC
|
||||
LIMIT 200
|
||||
`).all(...params) as (Job & { name: string; type: string; series_name: string | null; season_number: number | null; episode_number: number | null; file_path: string })[];
|
||||
`)
|
||||
.all(...params) as (Job & {
|
||||
name: string;
|
||||
type: string;
|
||||
series_name: string | null;
|
||||
season_number: number | null;
|
||||
episode_number: number | null;
|
||||
file_path: string;
|
||||
})[];
|
||||
|
||||
const jobs = jobRows.map((r) => ({
|
||||
job: r as unknown as Job,
|
||||
item: r.name ? { id: r.item_id, name: r.name, type: r.type, series_name: r.series_name, season_number: r.season_number, episode_number: r.episode_number, file_path: r.file_path } as unknown as MediaItem : null,
|
||||
item: r.name
|
||||
? ({
|
||||
id: r.item_id,
|
||||
name: r.name,
|
||||
type: r.type,
|
||||
series_name: r.series_name,
|
||||
season_number: r.season_number,
|
||||
episode_number: r.episode_number,
|
||||
file_path: r.file_path,
|
||||
} as unknown as MediaItem)
|
||||
: null,
|
||||
}));
|
||||
|
||||
const countRows = db.prepare('SELECT status, COUNT(*) as cnt FROM jobs GROUP BY status').all() as { status: string; cnt: number }[];
|
||||
const countRows = db.prepare("SELECT status, COUNT(*) as cnt FROM jobs GROUP BY status").all() as {
|
||||
status: string;
|
||||
cnt: number;
|
||||
}[];
|
||||
const totalCounts: Record<string, number> = { all: 0, pending: 0, running: 0, done: 0, error: 0 };
|
||||
for (const row of countRows) {
|
||||
totalCounts[row.status] = row.cnt;
|
||||
@@ -121,22 +198,22 @@ function parseId(raw: string | undefined): number | null {
|
||||
|
||||
// ─── Start all pending ────────────────────────────────────────────────────────
|
||||
|
||||
app.post('/start', (c) => {
|
||||
app.post("/start", (c) => {
|
||||
const db = getDb();
|
||||
const pending = db.prepare("SELECT * FROM jobs WHERE status = 'pending' ORDER BY created_at").all() as Job[];
|
||||
runSequential(pending).catch((err) => logError('Queue failed:', err));
|
||||
runSequential(pending).catch((err) => logError("Queue failed:", err));
|
||||
return c.json({ ok: true, started: pending.length });
|
||||
});
|
||||
|
||||
// ─── Run single ───────────────────────────────────────────────────────────────
|
||||
|
||||
app.post('/job/:id/run', async (c) => {
|
||||
const jobId = parseId(c.req.param('id'));
|
||||
if (jobId == null) return c.json({ error: 'invalid job id' }, 400);
|
||||
app.post("/job/:id/run", async (c) => {
|
||||
const jobId = parseId(c.req.param("id"));
|
||||
if (jobId == null) return c.json({ error: "invalid job id" }, 400);
|
||||
const db = getDb();
|
||||
const job = db.prepare('SELECT * FROM jobs WHERE id = ?').get(jobId) as Job | undefined;
|
||||
const job = db.prepare("SELECT * FROM jobs WHERE id = ?").get(jobId) as Job | undefined;
|
||||
if (!job) return c.notFound();
|
||||
if (job.status !== 'pending') {
|
||||
if (job.status !== "pending") {
|
||||
const result = loadJobRow(jobId);
|
||||
if (!result) return c.notFound();
|
||||
return c.json(result);
|
||||
@@ -149,9 +226,9 @@ app.post('/job/:id/run', async (c) => {
|
||||
|
||||
// ─── Cancel ───────────────────────────────────────────────────────────────────
|
||||
|
||||
app.post('/job/:id/cancel', (c) => {
|
||||
const jobId = parseId(c.req.param('id'));
|
||||
if (jobId == null) return c.json({ error: 'invalid job id' }, 400);
|
||||
app.post("/job/:id/cancel", (c) => {
|
||||
const jobId = parseId(c.req.param("id"));
|
||||
if (jobId == null) return c.json({ error: "invalid job id" }, 400);
|
||||
const db = getDb();
|
||||
db.prepare("DELETE FROM jobs WHERE id = ? AND status = 'pending'").run(jobId);
|
||||
return c.json({ ok: true });
|
||||
@@ -159,18 +236,20 @@ app.post('/job/:id/cancel', (c) => {
|
||||
|
||||
// ─── Clear queue ──────────────────────────────────────────────────────────────
|
||||
|
||||
app.post('/clear', (c) => {
|
||||
app.post("/clear", (c) => {
|
||||
const db = getDb();
|
||||
db.prepare(`
|
||||
db
|
||||
.prepare(`
|
||||
UPDATE review_plans SET status = 'pending', reviewed_at = NULL
|
||||
WHERE item_id IN (SELECT item_id FROM jobs WHERE status = 'pending')
|
||||
AND status = 'approved'
|
||||
`).run();
|
||||
`)
|
||||
.run();
|
||||
const result = db.prepare("DELETE FROM jobs WHERE status = 'pending'").run();
|
||||
return c.json({ ok: true, cleared: result.changes });
|
||||
});
|
||||
|
||||
app.post('/clear-completed', (c) => {
|
||||
app.post("/clear-completed", (c) => {
|
||||
const db = getDb();
|
||||
const result = db.prepare("DELETE FROM jobs WHERE status IN ('done', 'error')").run();
|
||||
return c.json({ ok: true, cleared: result.changes });
|
||||
@@ -178,26 +257,34 @@ app.post('/clear-completed', (c) => {
|
||||
|
||||
// ─── SSE ──────────────────────────────────────────────────────────────────────
|
||||
|
||||
app.get('/events', (c) => {
|
||||
app.get("/events", (c) => {
|
||||
return stream(c, async (s) => {
|
||||
c.header('Content-Type', 'text/event-stream');
|
||||
c.header('Cache-Control', 'no-cache');
|
||||
c.header("Content-Type", "text/event-stream");
|
||||
c.header("Cache-Control", "no-cache");
|
||||
|
||||
const queue: string[] = [];
|
||||
let resolve: (() => void) | null = null;
|
||||
const listener = (data: string) => { queue.push(data); resolve?.(); };
|
||||
const listener = (data: string) => {
|
||||
queue.push(data);
|
||||
resolve?.();
|
||||
};
|
||||
|
||||
jobListeners.add(listener);
|
||||
s.onAbort(() => { jobListeners.delete(listener); });
|
||||
s.onAbort(() => {
|
||||
jobListeners.delete(listener);
|
||||
});
|
||||
|
||||
try {
|
||||
while (!s.closed) {
|
||||
if (queue.length > 0) {
|
||||
await s.write(queue.shift()!);
|
||||
} else {
|
||||
await new Promise<void>((res) => { resolve = res; setTimeout(res, 15_000); });
|
||||
await new Promise<void>((res) => {
|
||||
resolve = res;
|
||||
setTimeout(res, 15_000);
|
||||
});
|
||||
resolve = null;
|
||||
if (queue.length === 0) await s.write(': keepalive\n\n');
|
||||
if (queue.length === 0) await s.write(": keepalive\n\n");
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
@@ -213,30 +300,34 @@ async function runJob(job: Job): Promise<void> {
|
||||
log(`Job ${job.id} command: ${job.command}`);
|
||||
const db = getDb();
|
||||
|
||||
const itemRow = db.prepare('SELECT file_path FROM media_items WHERE id = ?').get(job.item_id) as { file_path: string } | undefined;
|
||||
const itemRow = db.prepare("SELECT file_path FROM media_items WHERE id = ?").get(job.item_id) as
|
||||
| { file_path: string }
|
||||
| undefined;
|
||||
if (itemRow?.file_path) {
|
||||
try {
|
||||
accessSync(itemRow.file_path, constants.R_OK | constants.W_OK);
|
||||
} catch (fsErr) {
|
||||
const msg = `File not accessible: ${itemRow.file_path}\n${(fsErr as Error).message}`;
|
||||
db.prepare("UPDATE jobs SET status = 'error', output = ?, exit_code = 1, completed_at = datetime('now') WHERE id = ?").run(msg, job.id);
|
||||
emitJobUpdate(job.id, 'error', msg);
|
||||
db
|
||||
.prepare("UPDATE jobs SET status = 'error', output = ?, exit_code = 1, completed_at = datetime('now') WHERE id = ?")
|
||||
.run(msg, job.id);
|
||||
emitJobUpdate(job.id, "error", msg);
|
||||
db.prepare("UPDATE review_plans SET status = 'error' WHERE item_id = ?").run(job.item_id);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
emitJobUpdate(job.id, 'running');
|
||||
emitJobUpdate(job.id, "running");
|
||||
|
||||
const outputLines: string[] = [];
|
||||
let pendingFlush = false;
|
||||
let lastFlushAt = 0;
|
||||
let totalSeconds = 0;
|
||||
let lastProgressEmit = 0;
|
||||
const updateOutput = db.prepare('UPDATE jobs SET output = ? WHERE id = ?');
|
||||
const updateOutput = db.prepare("UPDATE jobs SET output = ? WHERE id = ?");
|
||||
|
||||
const flush = (final = false) => {
|
||||
const text = outputLines.join('\n');
|
||||
const text = outputLines.join("\n");
|
||||
const now = Date.now();
|
||||
if (final || now - lastFlushAt > 500) {
|
||||
updateOutput.run(text, job.id);
|
||||
@@ -245,7 +336,7 @@ async function runJob(job: Job): Promise<void> {
|
||||
} else {
|
||||
pendingFlush = true;
|
||||
}
|
||||
emitJobUpdate(job.id, 'running', text);
|
||||
emitJobUpdate(job.id, "running", text);
|
||||
};
|
||||
|
||||
const consumeProgress = (line: string) => {
|
||||
@@ -264,18 +355,18 @@ async function runJob(job: Job): Promise<void> {
|
||||
};
|
||||
|
||||
try {
|
||||
const proc = Bun.spawn(['sh', '-c', job.command], { stdout: 'pipe', stderr: 'pipe' });
|
||||
const readStream = async (readable: ReadableStream<Uint8Array>, prefix = '') => {
|
||||
const proc = Bun.spawn(["sh", "-c", job.command], { stdout: "pipe", stderr: "pipe" });
|
||||
const readStream = async (readable: ReadableStream<Uint8Array>, prefix = "") => {
|
||||
const reader = readable.getReader();
|
||||
const decoder = new TextDecoder();
|
||||
let buffer = '';
|
||||
let buffer = "";
|
||||
try {
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
const parts = buffer.split(/\r\n|\n|\r/);
|
||||
buffer = parts.pop() ?? '';
|
||||
buffer = parts.pop() ?? "";
|
||||
for (const line of parts) {
|
||||
if (!line.trim()) continue;
|
||||
outputLines.push(prefix + line);
|
||||
@@ -288,25 +379,29 @@ async function runJob(job: Job): Promise<void> {
|
||||
consumeProgress(buffer);
|
||||
}
|
||||
} catch (err) {
|
||||
logError(`stream read error (${prefix.trim() || 'stdout'}):`, err);
|
||||
logError(`stream read error (${prefix.trim() || "stdout"}):`, err);
|
||||
}
|
||||
};
|
||||
await Promise.all([readStream(proc.stdout), readStream(proc.stderr, '[stderr] '), proc.exited]);
|
||||
await Promise.all([readStream(proc.stdout), readStream(proc.stderr, "[stderr] "), proc.exited]);
|
||||
const exitCode = await proc.exited;
|
||||
if (pendingFlush) updateOutput.run(outputLines.join('\n'), job.id);
|
||||
if (pendingFlush) updateOutput.run(outputLines.join("\n"), job.id);
|
||||
if (exitCode !== 0) throw new Error(`FFmpeg exited with code ${exitCode}`);
|
||||
|
||||
const fullOutput = outputLines.join('\n');
|
||||
const fullOutput = outputLines.join("\n");
|
||||
|
||||
// Gather sidecar files to record
|
||||
const item = db.prepare('SELECT * FROM media_items WHERE id = ?').get(job.item_id) as MediaItem | undefined;
|
||||
const streams = db.prepare('SELECT * FROM media_streams WHERE item_id = ?').all(job.item_id) as MediaStream[];
|
||||
const item = db.prepare("SELECT * FROM media_items WHERE id = ?").get(job.item_id) as MediaItem | undefined;
|
||||
const streams = db.prepare("SELECT * FROM media_streams WHERE item_id = ?").all(job.item_id) as MediaStream[];
|
||||
const files = item && streams.length > 0 ? predictExtractedFiles(item, streams) : [];
|
||||
|
||||
const insertFile = db.prepare('INSERT OR IGNORE INTO subtitle_files (item_id, file_path, language, codec, is_forced, is_hearing_impaired) VALUES (?, ?, ?, ?, ?, ?)');
|
||||
const markJobDone = db.prepare("UPDATE jobs SET status = 'done', exit_code = 0, output = ?, completed_at = datetime('now') WHERE id = ?");
|
||||
const insertFile = db.prepare(
|
||||
"INSERT OR IGNORE INTO subtitle_files (item_id, file_path, language, codec, is_forced, is_hearing_impaired) VALUES (?, ?, ?, ?, ?, ?)",
|
||||
);
|
||||
const markJobDone = db.prepare(
|
||||
"UPDATE jobs SET status = 'done', exit_code = 0, output = ?, completed_at = datetime('now') WHERE id = ?",
|
||||
);
|
||||
const markPlanDone = db.prepare("UPDATE review_plans SET status = 'done' WHERE item_id = ?");
|
||||
const markSubsExtracted = db.prepare('UPDATE review_plans SET subs_extracted = 1 WHERE item_id = ?');
|
||||
const markSubsExtracted = db.prepare("UPDATE review_plans SET subs_extracted = 1 WHERE item_id = ?");
|
||||
|
||||
db.transaction(() => {
|
||||
markJobDone.run(fullOutput, job.id);
|
||||
@@ -318,23 +413,25 @@ async function runJob(job: Job): Promise<void> {
|
||||
})();
|
||||
|
||||
log(`Job ${job.id} completed successfully`);
|
||||
emitJobUpdate(job.id, 'done', fullOutput);
|
||||
emitJobUpdate(job.id, "done", fullOutput);
|
||||
} catch (err) {
|
||||
logError(`Job ${job.id} failed:`, err);
|
||||
const fullOutput = outputLines.join('\n') + '\n' + String(err);
|
||||
db.prepare("UPDATE jobs SET status = 'error', exit_code = 1, output = ?, completed_at = datetime('now') WHERE id = ?").run(fullOutput, job.id);
|
||||
emitJobUpdate(job.id, 'error', fullOutput);
|
||||
const fullOutput = `${outputLines.join("\n")}\n${String(err)}`;
|
||||
db
|
||||
.prepare("UPDATE jobs SET status = 'error', exit_code = 1, output = ?, completed_at = datetime('now') WHERE id = ?")
|
||||
.run(fullOutput, job.id);
|
||||
emitJobUpdate(job.id, "error", fullOutput);
|
||||
db.prepare("UPDATE review_plans SET status = 'error' WHERE item_id = ?").run(job.item_id);
|
||||
}
|
||||
}
|
||||
|
||||
// ─── Scheduler ────────────────────────────────────────────────────────────────
|
||||
|
||||
app.get('/scheduler', (c) => {
|
||||
app.get("/scheduler", (c) => {
|
||||
return c.json(getSchedulerState());
|
||||
});
|
||||
|
||||
app.patch('/scheduler', async (c) => {
|
||||
app.patch("/scheduler", async (c) => {
|
||||
const body = await c.req.json();
|
||||
updateSchedulerState(body);
|
||||
return c.json(getSchedulerState());
|
||||
|
||||
Reference in New Issue
Block a user