improve running-job responsiveness, bump version to 2026.04.15.7
All checks were successful
Build and Push Docker Image / build (push) Successful in 1m41s
All checks were successful
Build and Push Docker Image / build (push) Successful in 1m41s
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
import { describe, expect, test } from "bun:test";
|
||||
import { extractErrorSummary } from "../execute";
|
||||
import { extractErrorSummary, shouldSendLiveUpdate, yieldAfterChunk } from "../execute";
|
||||
|
||||
describe("extractErrorSummary", () => {
|
||||
test("pulls the real error line out of ffmpeg's banner", () => {
|
||||
@@ -47,3 +47,27 @@ describe("extractErrorSummary", () => {
|
||||
expect(summary).toBe("Error: no space left on device");
|
||||
});
|
||||
});
|
||||
|
||||
describe("shouldSendLiveUpdate", () => {
|
||||
test("throttles updates until interval passes", () => {
|
||||
expect(shouldSendLiveUpdate(1_000, 800, 500)).toBe(false);
|
||||
expect(shouldSendLiveUpdate(1_301, 800, 500)).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe("yieldAfterChunk", () => {
|
||||
test("yields once threshold is reached, resets chunk counter", async () => {
|
||||
let yieldCalls = 0;
|
||||
const sleep = async (_ms: number) => {
|
||||
yieldCalls += 1;
|
||||
};
|
||||
let chunks = 0;
|
||||
chunks = await yieldAfterChunk(chunks, 3, sleep);
|
||||
expect(chunks).toBe(1);
|
||||
chunks = await yieldAfterChunk(chunks, 3, sleep);
|
||||
expect(chunks).toBe(2);
|
||||
chunks = await yieldAfterChunk(chunks, 3, sleep);
|
||||
expect(chunks).toBe(0);
|
||||
expect(yieldCalls).toBe(1);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -23,6 +23,23 @@ const app = new Hono();
|
||||
let queueRunning = false;
|
||||
let runningProc: ReturnType<typeof Bun.spawn> | null = null;
|
||||
let runningJobId: number | null = null;
|
||||
const LIVE_UPDATE_INTERVAL_MS = 500;
|
||||
const STREAM_CHUNKS_BEFORE_YIELD = 24;
|
||||
|
||||
export function shouldSendLiveUpdate(now: number, lastSentAt: number, intervalMs = LIVE_UPDATE_INTERVAL_MS): boolean {
|
||||
return now - lastSentAt > intervalMs;
|
||||
}
|
||||
|
||||
export async function yieldAfterChunk(
|
||||
chunksSinceYield: number,
|
||||
chunksBeforeYield = STREAM_CHUNKS_BEFORE_YIELD,
|
||||
sleep: (ms: number) => Promise<unknown> = (ms) => Bun.sleep(ms),
|
||||
): Promise<number> {
|
||||
const next = chunksSinceYield + 1;
|
||||
if (next < chunksBeforeYield) return next;
|
||||
await sleep(0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
function emitQueueStatus(
|
||||
status: "running" | "paused" | "sleeping" | "idle",
|
||||
@@ -329,14 +346,16 @@ async function runJob(job: Job): Promise<void> {
|
||||
const updateOutput = db.prepare("UPDATE jobs SET output = ? WHERE id = ?");
|
||||
|
||||
const flush = (final = false) => {
|
||||
const text = outputLines.join("\n");
|
||||
const now = Date.now();
|
||||
if (final || now - lastFlushAt > 500) {
|
||||
if (!final && !shouldSendLiveUpdate(now, lastFlushAt)) {
|
||||
pendingFlush = true;
|
||||
return;
|
||||
}
|
||||
const text = outputLines.join("\n");
|
||||
if (final || shouldSendLiveUpdate(now, lastFlushAt)) {
|
||||
updateOutput.run(text, job.id);
|
||||
lastFlushAt = now;
|
||||
pendingFlush = false;
|
||||
} else {
|
||||
pendingFlush = true;
|
||||
}
|
||||
emitJobUpdate(job.id, "running", text);
|
||||
};
|
||||
@@ -349,7 +368,7 @@ async function runJob(job: Job): Promise<void> {
|
||||
const progressed = parseFFmpegProgress(line);
|
||||
if (progressed != null && totalSeconds > 0) {
|
||||
const now = Date.now();
|
||||
if (now - lastProgressEmit > 500) {
|
||||
if (shouldSendLiveUpdate(now, lastProgressEmit)) {
|
||||
emitJobProgress(job.id, progressed, totalSeconds);
|
||||
lastProgressEmit = now;
|
||||
}
|
||||
@@ -364,6 +383,7 @@ async function runJob(job: Job): Promise<void> {
|
||||
const reader = readable.getReader();
|
||||
const decoder = new TextDecoder();
|
||||
let buffer = "";
|
||||
let chunksSinceYield = 0;
|
||||
try {
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
@@ -377,6 +397,8 @@ async function runJob(job: Job): Promise<void> {
|
||||
consumeProgress(line);
|
||||
}
|
||||
flush();
|
||||
// Let pending HTTP requests run even when ffmpeg floods stdout/stderr.
|
||||
chunksSinceYield = await yieldAfterChunk(chunksSinceYield);
|
||||
}
|
||||
if (buffer.trim()) {
|
||||
outputLines.push(prefix + buffer);
|
||||
|
||||
Reference in New Issue
Block a user