execute: drain newly-approved jobs mid-run, use strict shared parseId

Queue previously processed a snapshot of pending jobs — anything approved
after Run-all clicked sat idle until the user clicked again. Now, when
the local queue drains, re-poll the DB once for newly-approved jobs
before exiting.

Also swap the looser local parseId (Number.parseInt accepted '42abc')
for the strict shared parseId in server/lib/validate.ts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-15 08:12:59 +02:00
parent e49a04c576
commit b0d06a1d8c

View File

@@ -3,6 +3,7 @@ import { Hono } from "hono";
import { stream } from "hono/streaming"; import { stream } from "hono/streaming";
import { getDb } from "../db/index"; import { getDb } from "../db/index";
import { log, error as logError, warn } from "../lib/log"; import { log, error as logError, warn } from "../lib/log";
import { parseId } from "../lib/validate";
import { predictExtractedFiles } from "../services/ffmpeg"; import { predictExtractedFiles } from "../services/ffmpeg";
import { import {
getScheduleConfig, getScheduleConfig,
@@ -31,12 +32,18 @@ function emitQueueStatus(
for (const l of jobListeners) l(line); for (const l of jobListeners) l(line);
} }
async function runSequential(jobs: Job[]): Promise<void> { async function runSequential(initial: Job[]): Promise<void> {
if (queueRunning) return; if (queueRunning) return;
queueRunning = true; queueRunning = true;
try { try {
let first = true; let first = true;
for (const job of jobs) { const queue: Job[] = [...initial];
const seen = new Set<number>(queue.map((j) => j.id));
while (queue.length > 0) {
// biome-ignore lint/style/noNonNullAssertion: length checked above
const job = queue.shift()!;
// Pause outside the processing window // Pause outside the processing window
if (!isInProcessWindow()) { if (!isInProcessWindow()) {
emitQueueStatus("paused", { emitQueueStatus("paused", {
@@ -70,6 +77,19 @@ async function runSequential(jobs: Job[]): Promise<void> {
} catch (err) { } catch (err) {
logError(`Job ${job.id} failed:`, err); logError(`Job ${job.id} failed:`, err);
} }
// When the local queue drains, re-check the DB for jobs that were
// approved mid-run. Without this they'd sit pending until the user
// manually clicks "Run all" again.
if (queue.length === 0) {
const more = db.prepare("SELECT * FROM jobs WHERE status = 'pending' ORDER BY created_at").all() as Job[];
for (const m of more) {
if (!seen.has(m.id)) {
queue.push(m);
seen.add(m.id);
}
}
}
} }
} finally { } finally {
queueRunning = false; queueRunning = false;
@@ -137,14 +157,6 @@ function loadJobRow(jobId: number) {
return { job: row as unknown as Job, item }; return { job: row as unknown as Job, item };
} }
// ─── Param helpers ────────────────────────────────────────────────────────────
function parseId(raw: string | undefined): number | null {
if (!raw) return null;
const n = Number.parseInt(raw, 10);
return Number.isFinite(n) && n > 0 ? n : null;
}
// ─── Start all pending ──────────────────────────────────────────────────────── // ─── Start all pending ────────────────────────────────────────────────────────
app.post("/start", (c) => { app.post("/start", (c) => {