diff --git a/package.json b/package.json index 9734957..edc063a 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "netfelix-audio-fix", - "version": "2026.04.15.6", + "version": "2026.04.15.7", "scripts": { "dev:server": "NODE_ENV=development bun --hot server/index.tsx", "dev:client": "vite", diff --git a/server/api/__tests__/execute.test.ts b/server/api/__tests__/execute.test.ts index 67fc77d..47f106b 100644 --- a/server/api/__tests__/execute.test.ts +++ b/server/api/__tests__/execute.test.ts @@ -1,5 +1,5 @@ import { describe, expect, test } from "bun:test"; -import { extractErrorSummary } from "../execute"; +import { extractErrorSummary, shouldSendLiveUpdate, yieldAfterChunk } from "../execute"; describe("extractErrorSummary", () => { test("pulls the real error line out of ffmpeg's banner", () => { @@ -47,3 +47,27 @@ describe("extractErrorSummary", () => { expect(summary).toBe("Error: no space left on device"); }); }); + +describe("shouldSendLiveUpdate", () => { + test("throttles updates until interval passes", () => { + expect(shouldSendLiveUpdate(1_000, 800, 500)).toBe(false); + expect(shouldSendLiveUpdate(1_301, 800, 500)).toBe(true); + }); +}); + +describe("yieldAfterChunk", () => { + test("yields once threshold is reached, resets chunk counter", async () => { + let yieldCalls = 0; + const sleep = async (_ms: number) => { + yieldCalls += 1; + }; + let chunks = 0; + chunks = await yieldAfterChunk(chunks, 3, sleep); + expect(chunks).toBe(1); + chunks = await yieldAfterChunk(chunks, 3, sleep); + expect(chunks).toBe(2); + chunks = await yieldAfterChunk(chunks, 3, sleep); + expect(chunks).toBe(0); + expect(yieldCalls).toBe(1); + }); +}); diff --git a/server/api/execute.ts b/server/api/execute.ts index e937bee..929458c 100644 --- a/server/api/execute.ts +++ b/server/api/execute.ts @@ -23,6 +23,23 @@ const app = new Hono(); let queueRunning = false; let runningProc: ReturnType | null = null; let runningJobId: number | null = null; +const LIVE_UPDATE_INTERVAL_MS = 500; +const STREAM_CHUNKS_BEFORE_YIELD = 24; + +export function shouldSendLiveUpdate(now: number, lastSentAt: number, intervalMs = LIVE_UPDATE_INTERVAL_MS): boolean { + return now - lastSentAt > intervalMs; +} + +export async function yieldAfterChunk( + chunksSinceYield: number, + chunksBeforeYield = STREAM_CHUNKS_BEFORE_YIELD, + sleep: (ms: number) => Promise = (ms) => Bun.sleep(ms), +): Promise { + const next = chunksSinceYield + 1; + if (next < chunksBeforeYield) return next; + await sleep(0); + return 0; +} function emitQueueStatus( status: "running" | "paused" | "sleeping" | "idle", @@ -329,14 +346,16 @@ async function runJob(job: Job): Promise { const updateOutput = db.prepare("UPDATE jobs SET output = ? WHERE id = ?"); const flush = (final = false) => { - const text = outputLines.join("\n"); const now = Date.now(); - if (final || now - lastFlushAt > 500) { + if (!final && !shouldSendLiveUpdate(now, lastFlushAt)) { + pendingFlush = true; + return; + } + const text = outputLines.join("\n"); + if (final || shouldSendLiveUpdate(now, lastFlushAt)) { updateOutput.run(text, job.id); lastFlushAt = now; pendingFlush = false; - } else { - pendingFlush = true; } emitJobUpdate(job.id, "running", text); }; @@ -349,7 +368,7 @@ async function runJob(job: Job): Promise { const progressed = parseFFmpegProgress(line); if (progressed != null && totalSeconds > 0) { const now = Date.now(); - if (now - lastProgressEmit > 500) { + if (shouldSendLiveUpdate(now, lastProgressEmit)) { emitJobProgress(job.id, progressed, totalSeconds); lastProgressEmit = now; } @@ -364,6 +383,7 @@ async function runJob(job: Job): Promise { const reader = readable.getReader(); const decoder = new TextDecoder(); let buffer = ""; + let chunksSinceYield = 0; try { while (true) { const { done, value } = await reader.read(); @@ -377,6 +397,8 @@ async function runJob(job: Job): Promise { consumeProgress(line); } flush(); + // Let pending HTTP requests run even when ffmpeg floods stdout/stderr. + chunksSinceYield = await yieldAfterChunk(chunksSinceYield); } if (buffer.trim()) { outputLines.push(prefix + buffer);