Files
netfelix-audio-fix/docs/superpowers/plans/2026-03-27-unified-pipeline.md
Felix Förtsch 93ed0ac33c fix analyzer + api boundary + perf + scheduler hardening
- analyzer: rewrite checkAudioOrderChanged to compare actual output order, unify assignTargetOrder with a shared sortKeptStreams util in ffmpeg builder
- review: recompute is_noop via full audio removed/reordered/transcode/subs check on toggle, preserve custom_title across rescan by matching (type,lang,stream_index,title), batch pipeline transcode-reasons query to avoid N+1
- validate: add lib/validate.ts with parseId + isOneOf helpers; replace bare Number(c.req.param('id')) with 400 on invalid ids across review/subtitles
- scan: atomic CAS on scan_running config to prevent concurrent scans
- subtitles: path-traversal guard — only unlink sidecars within the media item's directory; log-and-orphan DB entries pointing outside
- schedule: include end minute in window (<= vs <)
- db: add indexes on review_plans(status,is_noop), stream_decisions(plan_id), media_items(series_jellyfin_id,series_name,type), media_streams(item_id,type), subtitle_files(item_id), jobs(status,item_id)
2026-04-13 07:31:48 +02:00

1586 lines
49 KiB
Markdown

# Unified Media Processing Pipeline — Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Unify subtitle extraction, audio cleanup, and Apple compatibility transcoding into a single pipeline per file with a Kanban board UI, confidence-based auto-approval, and job scheduling.
**Architecture:** The analyzer evaluates all three pipeline steps and produces one review plan with one FFmpeg command per file. A new Apple compatibility service maps codecs to transcode targets. The Kanban board replaces the separate scan/review/execute pages. A scheduler controls job timing (sleep between jobs, time window).
**Tech Stack:** Bun + Hono (backend), React 19 + TanStack Router + Tailwind v4 (frontend), bun:sqlite, FFmpeg
---
## File Structure
### New files
| File | Responsibility |
|------|---------------|
| `server/services/apple-compat.ts` | Codec compatibility checks, transcode target mapping |
| `server/services/scheduler.ts` | Job sleep and schedule window logic |
| `src/routes/pipeline.tsx` | TanStack Router route for pipeline page |
| `src/features/pipeline/PipelinePage.tsx` | Kanban board layout with 5 columns |
| `src/features/pipeline/ReviewColumn.tsx` | Review column with series grouping, batch approve |
| `src/features/pipeline/QueueColumn.tsx` | Queued jobs waiting for execution |
| `src/features/pipeline/ProcessingColumn.tsx` | Active job with progress bar, schedule status |
| `src/features/pipeline/DoneColumn.tsx` | Completed items |
| `src/features/pipeline/PipelineCard.tsx` | Card component for movies/episodes |
| `src/features/pipeline/SeriesCard.tsx` | Collapsible series card with episode list |
| `src/features/pipeline/ScheduleControls.tsx` | Sleep/schedule config UI |
### Modified files
| File | Changes |
|------|---------|
| `server/db/schema.ts` | New columns on review_plans, stream_decisions; new config defaults |
| `server/db/index.ts` | Migrations for new columns |
| `server/types.ts` | Updated interfaces: ReviewPlan, StreamDecision, Job, new AppleCompat types |
| `server/services/analyzer.ts` | Unified 3-step analysis with confidence scoring |
| `server/services/ffmpeg.ts` | Per-stream codec args (copy vs transcode), combined sub+audio command |
| `server/api/scan.ts` | Confidence scoring during scan |
| `server/api/review.ts` | Batch "approve up to here", series-level language |
| `server/api/execute.ts` | Scheduler integration, FFmpeg progress parsing |
| `server/index.tsx` | Add pipeline API route |
| `src/shared/lib/types.ts` | Mirror backend type changes |
| `src/routes/__root.tsx` | Replace nav links with Pipeline tab |
---
### Task 1: Schema Migrations and Type Updates
**Files:**
- Modify: `server/db/schema.ts`
- Modify: `server/db/index.ts`
- Modify: `server/types.ts`
- Modify: `src/shared/lib/types.ts`
- [ ] **Step 1: Add new columns to schema migrations in `server/db/index.ts`**
Add migrations after the existing ones (around line 40 in `getDb()`). Find the existing migration block that adds `subs_extracted` and append:
```typescript
// Apple compat pipeline columns
db.run(`ALTER TABLE review_plans ADD COLUMN confidence TEXT NOT NULL DEFAULT 'low'`);
db.run(`ALTER TABLE review_plans ADD COLUMN apple_compat TEXT`);
db.run(`ALTER TABLE review_plans ADD COLUMN job_type TEXT NOT NULL DEFAULT 'copy'`);
db.run(`ALTER TABLE stream_decisions ADD COLUMN transcode_codec TEXT`);
```
Wrap each in try/catch like the existing migrations (SQLite throws if column already exists).
- [ ] **Step 2: Add new config defaults in `server/db/schema.ts`**
Add to `DEFAULT_CONFIG` (after `scan_running`):
```typescript
job_sleep_seconds: '0',
schedule_enabled: '0',
schedule_start: '01:00',
schedule_end: '07:00',
```
- [ ] **Step 3: Update `server/types.ts` — ReviewPlan interface**
Add new fields to `ReviewPlan` (around line 46):
```typescript
export interface ReviewPlan {
id: number;
item_id: number;
status: 'pending' | 'approved' | 'skipped' | 'done' | 'error';
is_noop: number;
confidence: 'high' | 'low';
apple_compat: 'direct_play' | 'remux' | 'audio_transcode' | null;
job_type: 'copy' | 'transcode';
subs_extracted: number;
notes: string | null;
reviewed_at: string | null;
created_at: string;
}
```
- [ ] **Step 4: Update `server/types.ts` — StreamDecision interface**
Add `transcode_codec` to `StreamDecision` (around line 69):
```typescript
export interface StreamDecision {
id: number;
plan_id: number;
stream_id: number;
action: 'keep' | 'remove';
target_index: number | null;
custom_title: string | null;
transcode_codec: string | null;
}
```
- [ ] **Step 5: Update `server/types.ts` — PlanResult interface**
Extend `PlanResult` (around line 115):
```typescript
export interface PlanResult {
is_noop: boolean;
has_subs: boolean;
confidence: 'high' | 'low';
apple_compat: 'direct_play' | 'remux' | 'audio_transcode' | null;
job_type: 'copy' | 'transcode';
decisions: { stream_id: number; action: 'keep' | 'remove'; target_index: number | null; transcode_codec: string | null }[];
notes: string[];
}
```
- [ ] **Step 6: Mirror type changes in `src/shared/lib/types.ts`**
Update `ReviewPlan`, `StreamDecision` to match server types. Add `confidence`, `apple_compat`, `job_type` to ReviewPlan and `transcode_codec` to StreamDecision.
- [ ] **Step 7: Verify migrations run**
Run: `bun run dev` — check that the server starts without errors and the database has the new columns.
- [ ] **Step 8: Commit**
```bash
git add server/db/schema.ts server/db/index.ts server/types.ts src/shared/lib/types.ts
git commit -m "add schema migrations for unified pipeline: confidence, apple_compat, job_type, transcode_codec"
```
---
### Task 2: Apple Compatibility Service
**Files:**
- Create: `server/services/apple-compat.ts`
- [ ] **Step 1: Create `server/services/apple-compat.ts`**
```typescript
// Codec sets and transcode target mapping for Apple device compatibility.
// Apple natively decodes: AAC, AC3, EAC3, ALAC, FLAC, MP3, PCM, Opus
// Everything else (DTS family, TrueHD family) needs transcoding.
const APPLE_COMPATIBLE_AUDIO = new Set([
'aac', 'ac3', 'eac3', 'alac', 'flac', 'mp3',
'pcm_s16le', 'pcm_s24le', 'pcm_s32le', 'pcm_f32le',
'pcm_s16be', 'pcm_s24be', 'pcm_s32be', 'pcm_f64le',
'opus',
]);
// Lossless source codecs — get FLAC in MKV, EAC3 in MP4
const LOSSLESS_CODECS = new Set([
'dts', // DTS-HD MA reports as 'dts' with profile
'truehd',
]);
// Codec strings Jellyfin may report for DTS variants
const DTS_CODECS = new Set([
'dts', 'dca',
]);
const TRUEHD_CODECS = new Set([
'truehd',
]);
export function isAppleCompatible(codec: string): boolean {
return APPLE_COMPATIBLE_AUDIO.has(codec.toLowerCase());
}
/** Maps (codec, profile, container) → target codec for transcoding. */
export function transcodeTarget(
codec: string,
profile: string | null,
container: string | null,
): string | null {
const c = codec.toLowerCase();
const isMkv = !container || container.toLowerCase() === 'mkv' || container.toLowerCase() === 'matroska';
if (isAppleCompatible(c)) return null; // no transcode needed
// DTS-HD MA and DTS:X are lossless → FLAC in MKV, EAC3 in MP4
if (DTS_CODECS.has(c)) {
const p = (profile ?? '').toLowerCase();
const isLossless = p.includes('ma') || p.includes('hd ma') || p.includes('x');
if (isLossless) return isMkv ? 'flac' : 'eac3';
// Lossy DTS variants → EAC3
return 'eac3';
}
// TrueHD (including Atmos) → FLAC in MKV, EAC3 in MP4
if (TRUEHD_CODECS.has(c)) {
return isMkv ? 'flac' : 'eac3';
}
// Any other incompatible codec → EAC3 as safe fallback
return 'eac3';
}
/** Determine overall Apple compatibility for a set of kept audio streams. */
export function computeAppleCompat(
keptAudioCodecs: string[],
container: string | null,
): 'direct_play' | 'remux' | 'audio_transcode' {
const hasIncompatible = keptAudioCodecs.some(c => !isAppleCompatible(c));
if (hasIncompatible) return 'audio_transcode';
const isMkv = !container || container.toLowerCase() === 'mkv' || container.toLowerCase() === 'matroska';
if (isMkv) return 'remux';
return 'direct_play';
}
```
- [ ] **Step 2: Commit**
```bash
git add server/services/apple-compat.ts
git commit -m "add apple compatibility service: codec checks, transcode target mapping"
```
---
### Task 3: Unified Analyzer
**Files:**
- Modify: `server/services/analyzer.ts`
- [ ] **Step 1: Read the current analyzer**
Read `server/services/analyzer.ts` fully to understand the existing `analyzeItem()`, `decideAction()`, `assignTargetOrder()`, `langRank()`, and `checkAudioOrderChanged()` functions.
- [ ] **Step 2: Add apple-compat imports and update `PlanResult`**
At the top of `server/services/analyzer.ts`, add:
```typescript
import { isAppleCompatible, transcodeTarget, computeAppleCompat } from './apple-compat';
```
- [ ] **Step 3: Update `analyzeItem()` to compute transcode decisions**
After `assignTargetOrder()` runs (which determines which streams are kept and their order), add a new step that iterates over kept audio streams and computes `transcode_codec`:
```typescript
// Step 3: Apple compatibility — compute transcode targets for kept audio
for (const d of decisions) {
if (d.action === 'keep') {
const stream = streams.find(s => s.id === d.stream_id);
if (stream && stream.type === 'Audio') {
d.transcode_codec = transcodeTarget(
stream.codec ?? '',
stream.title, // Jellyfin sometimes puts DTS profile info in title
item.container,
);
}
}
}
```
- [ ] **Step 4: Compute confidence, apple_compat, and job_type on the PlanResult**
Before returning from `analyzeItem()`, compute the new fields:
```typescript
const keptAudioCodecs = decisions
.filter(d => d.action === 'keep')
.map(d => streams.find(s => s.id === d.stream_id))
.filter(s => s && s.type === 'Audio')
.map(s => s!.codec ?? '');
const needsTranscode = decisions.some(d => d.transcode_codec != null);
const apple_compat = computeAppleCompat(keptAudioCodecs, item.container);
const job_type = needsTranscode ? 'transcode' as const : 'copy' as const;
// Confidence is computed during scan (depends on source agreement),
// but we can detect the "zero audio tracks match OG" case here
const ogLang = item.original_language;
const hasOgAudio = ogLang
? streams.some(s => s.type === 'Audio' && s.language === ogLang)
: false;
const noOgMatch = ogLang && !hasOgAudio;
```
- [ ] **Step 5: Update is_noop to include Apple compatibility**
Update the `is_noop` computation to also check that no audio transcoding is needed:
```typescript
const is_noop = !anyAudioRemoved && !audioOrderChanged && !has_subs && !needsTranscode;
```
Where `has_subs` means there are embedded subtitles to extract (already computed). Previously `is_noop` only checked audio; now it checks all three pipeline steps.
- [ ] **Step 6: Return extended PlanResult**
```typescript
return {
is_noop,
has_subs,
confidence: noOgMatch ? 'low' : 'low', // base confidence, scan upgrades to 'high'
apple_compat,
job_type,
decisions: decisions.map(d => ({
stream_id: d.stream_id,
action: d.action,
target_index: d.target_index,
transcode_codec: d.transcode_codec ?? null,
})),
notes,
};
```
- [ ] **Step 7: Verify existing scan still works**
Run: `bun run dev`, trigger a scan with a few items, check that `review_plans` rows now have `confidence`, `apple_compat`, and `job_type` populated.
- [ ] **Step 8: Commit**
```bash
git add server/services/analyzer.ts
git commit -m "unify analyzer: 3-step pipeline with apple compat, transcode decisions, extended is_noop"
```
---
### Task 4: Update Scan to Store New Fields and Compute Confidence
**Files:**
- Modify: `server/api/scan.ts`
- [ ] **Step 1: Read `server/api/scan.ts`**
Read the full file to understand the `runScan()` function and how it stores `review_plans` and `stream_decisions`.
- [ ] **Step 2: Update review_plans INSERT to include new columns**
Find the INSERT INTO review_plans statement and add the new columns:
```sql
INSERT INTO review_plans (item_id, status, is_noop, confidence, apple_compat, job_type, notes)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(item_id) DO UPDATE SET
status = CASE WHEN review_plans.status IN ('done','error') THEN 'pending' ELSE review_plans.status END,
is_noop = excluded.is_noop,
confidence = excluded.confidence,
apple_compat = excluded.apple_compat,
job_type = excluded.job_type,
notes = excluded.notes
```
Pass `plan.confidence`, `plan.apple_compat`, `plan.job_type` as bind params.
- [ ] **Step 3: Update stream_decisions INSERT to include transcode_codec**
Find the INSERT INTO stream_decisions statement and add:
```sql
INSERT INTO stream_decisions (plan_id, stream_id, action, target_index, transcode_codec)
VALUES (?, ?, ?, ?, ?)
```
Pass `d.transcode_codec` as the fifth bind param.
- [ ] **Step 4: Compute confidence during scan**
After the language resolution logic (Jellyfin vs Radarr vs Sonarr), compute confidence based on source agreement. Add this before the `analyzeItem()` call:
```typescript
// Compute confidence from source agreement
let confidence: 'high' | 'low' = 'low';
const jellyfinLang = jellyfinOrigLang; // from Jellyfin metadata
const arrLang = radarrLang ?? sonarrLang ?? null; // from *arr services
if (!origLang) {
confidence = 'low'; // unknown language
} else if (needsReview) {
confidence = 'low'; // sources disagree
} else if (jellyfinLang && arrLang && jellyfinLang === arrLang) {
confidence = 'high'; // two sources agree
} else if ((jellyfinLang || arrLang) && !needsReview) {
confidence = 'high'; // single source, no conflict
}
```
Override the analyzer's base confidence with this computed value before storing.
- [ ] **Step 5: Verify scan populates new fields**
Run scan, then query:
```sql
SELECT confidence, apple_compat, job_type FROM review_plans LIMIT 10;
SELECT transcode_codec FROM stream_decisions WHERE transcode_codec IS NOT NULL LIMIT 10;
```
- [ ] **Step 6: Commit**
```bash
git add server/api/scan.ts
git commit -m "store confidence, apple_compat, job_type, transcode_codec during scan"
```
---
### Task 5: Unified FFmpeg Command Builder
**Files:**
- Modify: `server/services/ffmpeg.ts`
- [ ] **Step 1: Read `server/services/ffmpeg.ts`**
Read the full file, especially `buildCommand()`, `buildExtractOnlyCommand()`, `buildExtractionOutputs()`, and `buildStreamFlags()`.
- [ ] **Step 2: Create `buildPipelineCommand()` function**
This replaces the separate `buildCommand()` and `buildExtractOnlyCommand()` with a single function that handles all three pipeline steps:
```typescript
/**
* Build a single FFmpeg command that:
* 1. Extracts subtitles to sidecar files
* 2. Remuxes with reordered/filtered audio
* 3. Transcodes incompatible audio codecs
*/
export function buildPipelineCommand(
item: { file_path: string; container: string | null },
streams: MediaStream[],
decisions: (StreamDecision & { stream: MediaStream })[],
): { command: string; extractedFiles: { path: string; language: string | null; codec: string | null; is_forced: number; is_hearing_impaired: number }[] } {
const inputPath = item.file_path;
const ext = item.container?.toLowerCase() === 'mp4' ? 'mp4' : 'mkv';
const tmpPath = inputPath.replace(/\.[^.]+$/, `.tmp.${ext}`);
// --- Subtitle extraction outputs ---
const subStreams = streams.filter(s => s.type === 'Subtitle');
const extraction = computeExtractionEntries(inputPath, subStreams);
const subOutputs = buildExtractionOutputs(extraction, streams);
// --- Kept streams for remuxed output ---
const kept = decisions
.filter(d => d.action === 'keep')
.sort((a, b) => (a.target_index ?? 999) - (b.target_index ?? 999));
// Build -map flags using type-relative indices
const maps = buildMaps(kept.map(d => d.stream), streams);
// Build per-stream codec flags
const codecFlags: string[] = ['-c:v copy'];
let audioIdx = 0;
for (const d of kept) {
if (d.stream.type === 'Audio') {
if (d.transcode_codec) {
codecFlags.push(`-c:a:${audioIdx} ${d.transcode_codec}`);
// For EAC3, set a reasonable bitrate based on channel count
if (d.transcode_codec === 'eac3') {
const bitrate = (d.stream.channels ?? 2) >= 6 ? '640k' : '256k';
codecFlags.push(`-b:a:${audioIdx} ${bitrate}`);
}
} else {
codecFlags.push(`-c:a:${audioIdx} copy`);
}
audioIdx++;
}
}
// Disposition + metadata flags for audio
const streamFlags = buildStreamFlags(kept, streams);
// Assemble command parts
const parts: string[] = ['ffmpeg -y', `-i ${shellQuote(inputPath)}`];
// Subtitle extraction outputs first
for (const out of subOutputs) {
parts.push(out);
}
// Map flags for remuxed output
parts.push(...maps);
// Codec flags
parts.push(...codecFlags);
// Stream flags (disposition, metadata)
parts.push(...streamFlags);
// Output file
parts.push(shellQuote(tmpPath));
const command = parts.join(' \\\n ')
+ ` && mv ${shellQuote(tmpPath)} ${shellQuote(inputPath)}`;
return {
command,
extractedFiles: extraction.map(e => ({
path: e.outputPath,
language: e.language,
codec: e.codec,
is_forced: e.is_forced ? 1 : 0,
is_hearing_impaired: e.is_hearing_impaired ? 1 : 0,
})),
};
}
```
- [ ] **Step 3: Update `buildStreamFlags()` to accept decisions with transcode info**
The existing function builds disposition and metadata flags. Update it to also handle title metadata for transcoded streams (e.g., append " (Transcoded)" to title if desired, or keep original title). No change needed if we keep the same title logic.
- [ ] **Step 4: Update review.ts `loadItemDetail()` to use `buildPipelineCommand()`**
In `server/api/review.ts`, update the `loadItemDetail()` function to call `buildPipelineCommand()` instead of the separate `buildCommand()` / `buildExtractOnlyCommand()`. This ensures the detail page shows the unified command.
- [ ] **Step 5: Update execute.ts `runJob()` post-processing**
After a job completes successfully, the current code calls `predictExtractedFiles()`. Update to use the `extractedFiles` from the command builder (stored alongside the job, or recomputed from decisions).
- [ ] **Step 6: Verify commands generate correctly**
Scan a file with DTS audio and embedded subtitles. Check the review detail page shows a single FFmpeg command with subtitle extraction, audio reordering, and `-c:a:N eac3` for the DTS stream.
- [ ] **Step 7: Commit**
```bash
git add server/services/ffmpeg.ts server/api/review.ts server/api/execute.ts
git commit -m "unified FFmpeg command builder: single command for sub extraction, audio cleanup, transcode"
```
---
### Task 6: Job Scheduler Service
**Files:**
- Create: `server/services/scheduler.ts`
- Modify: `server/api/execute.ts`
- [ ] **Step 1: Create `server/services/scheduler.ts`**
```typescript
import { getConfig, setConfig } from '../db';
export interface SchedulerState {
job_sleep_seconds: number;
schedule_enabled: boolean;
schedule_start: string; // "HH:MM"
schedule_end: string; // "HH:MM"
}
export function getSchedulerState(): SchedulerState {
return {
job_sleep_seconds: parseInt(getConfig('job_sleep_seconds') ?? '0', 10),
schedule_enabled: getConfig('schedule_enabled') === '1',
schedule_start: getConfig('schedule_start') ?? '01:00',
schedule_end: getConfig('schedule_end') ?? '07:00',
};
}
export function updateSchedulerState(updates: Partial<SchedulerState>): void {
if (updates.job_sleep_seconds != null) setConfig('job_sleep_seconds', String(updates.job_sleep_seconds));
if (updates.schedule_enabled != null) setConfig('schedule_enabled', updates.schedule_enabled ? '1' : '0');
if (updates.schedule_start != null) setConfig('schedule_start', updates.schedule_start);
if (updates.schedule_end != null) setConfig('schedule_end', updates.schedule_end);
}
/** Check if current time is within the schedule window. */
export function isInScheduleWindow(): boolean {
const state = getSchedulerState();
if (!state.schedule_enabled) return true; // no schedule = always allowed
const now = new Date();
const minutes = now.getHours() * 60 + now.getMinutes();
const start = parseTime(state.schedule_start);
const end = parseTime(state.schedule_end);
// Handle overnight windows (e.g., 23:00 → 07:00)
if (start <= end) {
return minutes >= start && minutes < end;
} else {
return minutes >= start || minutes < end;
}
}
/** Returns milliseconds until the next schedule window opens. */
export function msUntilWindow(): number {
const state = getSchedulerState();
const now = new Date();
const minutes = now.getHours() * 60 + now.getMinutes();
const start = parseTime(state.schedule_start);
if (minutes < start) {
return (start - minutes) * 60_000;
} else {
// Next day
return (24 * 60 - minutes + start) * 60_000;
}
}
/** Returns the schedule_start time as "HH:MM" for display. */
export function nextWindowTime(): string {
return getSchedulerState().schedule_start;
}
function parseTime(hhmm: string): number {
const [h, m] = hhmm.split(':').map(Number);
return h * 60 + m;
}
/** Sleep for the configured duration between jobs. */
export function sleepBetweenJobs(): Promise<void> {
const seconds = getSchedulerState().job_sleep_seconds;
if (seconds <= 0) return Promise.resolve();
return new Promise(resolve => setTimeout(resolve, seconds * 1000));
}
/** Wait until the schedule window opens. Resolves immediately if already in window. */
export function waitForWindow(): Promise<void> {
if (isInScheduleWindow()) return Promise.resolve();
const ms = msUntilWindow();
return new Promise(resolve => setTimeout(resolve, ms));
}
```
- [ ] **Step 2: Integrate scheduler into `server/api/execute.ts`**
Update `runSequential()` to check the schedule window before each job and sleep between jobs:
```typescript
import { sleepBetweenJobs, waitForWindow, isInScheduleWindow, getSchedulerState, nextWindowTime } from '../services/scheduler';
async function runSequential(jobs: Job[], nodeId: string | null) {
const key = targetKey(nodeId);
if (runningTargets.has(key)) return;
runningTargets.add(key);
try {
for (const job of jobs) {
// Wait for schedule window
if (!isInScheduleWindow()) {
emitJobUpdate({ type: 'queue_status', data: { status: 'paused', until: nextWindowTime() } });
await waitForWindow();
}
await runJob(job);
// Sleep between jobs
const state = getSchedulerState();
if (state.job_sleep_seconds > 0) {
emitJobUpdate({ type: 'queue_status', data: { status: 'sleeping', seconds: state.job_sleep_seconds } });
await sleepBetweenJobs();
}
}
} finally {
runningTargets.delete(key);
}
}
```
- [ ] **Step 3: Add scheduler API endpoints**
Add to `server/api/execute.ts`:
```typescript
// GET /scheduler — current scheduler state
app.get('/scheduler', (c) => {
return c.json(getSchedulerState());
});
// PATCH /scheduler — update scheduler settings
app.patch('/scheduler', async (c) => {
const body = await c.req.json();
updateSchedulerState(body);
return c.json(getSchedulerState());
});
```
- [ ] **Step 4: Add FFmpeg progress parsing**
Add a helper to parse FFmpeg's stderr for transcode progress:
```typescript
/** Parse FFmpeg stderr line for progress. Returns seconds processed or null. */
export function parseFFmpegProgress(line: string): number | null {
const match = line.match(/time=(\d+):(\d+):(\d+)\.(\d+)/);
if (!match) return null;
const [, h, m, s] = match.map(Number);
return h * 3600 + m * 60 + s;
}
```
Use this in `runJob()` to emit progress events for transcode jobs. The total duration comes from the media item's metadata (or from FFmpeg's initial output).
- [ ] **Step 5: Emit progress SSE events**
Add new SSE event types for the job runner:
```typescript
// In runJob(), when processing stderr:
const progress = parseFFmpegProgress(line);
if (progress != null) {
emitJobUpdate({
type: 'job_progress',
data: { id: job.id, seconds: progress, total: totalDuration },
});
}
```
- [ ] **Step 6: Commit**
```bash
git add server/services/scheduler.ts server/api/execute.ts
git commit -m "add job scheduler: sleep between jobs, schedule window, FFmpeg progress parsing"
```
---
### Task 7: Pipeline API Endpoint
**Files:**
- Modify: `server/api/review.ts`
- Modify: `server/index.tsx`
- [ ] **Step 1: Add "approve up to here" endpoint**
Add to `server/api/review.ts`:
```typescript
// POST /approve-up-to/:id — approve this plan and all plans above it
// "Above" means: higher confidence first, then by item name, then by id
app.post('/approve-up-to/:id', async (c) => {
const targetId = Number(c.req.param('id'));
const db = getDb();
// Get the target plan's sort position
const target = db.query(`SELECT id, item_id FROM review_plans WHERE id = ?`).get(targetId) as any;
if (!target) return c.json({ error: 'Plan not found' }, 404);
// Get all pending plans sorted by confidence (high first), then name
const pendingPlans = db.query(`
SELECT rp.id, rp.confidence, mi.name, mi.series_name
FROM review_plans rp
JOIN media_items mi ON mi.id = rp.item_id
WHERE rp.status = 'pending' AND rp.is_noop = 0
ORDER BY
CASE rp.confidence WHEN 'high' THEN 0 ELSE 1 END,
COALESCE(mi.series_name, mi.name),
mi.season_number,
mi.episode_number,
mi.name
`).all() as any[];
// Find the target and approve everything up to and including it
const toApprove: number[] = [];
for (const plan of pendingPlans) {
toApprove.push(plan.id);
if (plan.id === targetId) break;
}
// Batch approve
const stmt = db.prepare(`UPDATE review_plans SET status = 'approved', reviewed_at = datetime('now') WHERE id = ?`);
for (const id of toApprove) {
stmt.run(id);
}
// Create jobs for approved plans
for (const id of toApprove) {
const detail = loadItemDetail(id);
if (detail?.command) {
db.run(
`INSERT INTO jobs (item_id, command, job_type, status) VALUES (?, ?, ?, 'pending')`,
[detail.item.id, detail.command, detail.plan.job_type],
);
}
}
return c.json({ approved: toApprove.length });
});
```
- [ ] **Step 2: Add series-level language endpoint**
Add to `server/api/review.ts`:
```typescript
// PATCH /series/:seriesKey/language — set OG language for all episodes in a series
app.patch('/series/:seriesKey/language', async (c) => {
const seriesKey = decodeURIComponent(c.req.param('seriesKey'));
const { language } = await c.req.json();
const db = getDb();
// Update all items in the series
const items = db.query(
`SELECT id FROM media_items WHERE series_jellyfin_id = ? OR series_name = ?`
).all(seriesKey, seriesKey) as { id: number }[];
for (const item of items) {
db.run(`UPDATE media_items SET original_language = ?, orig_lang_source = 'manual', needs_review = 0 WHERE id = ?`, [language, item.id]);
}
// Re-analyze all episodes
const config = {
subtitleLanguages: JSON.parse(getConfig('subtitle_languages') ?? '[]'),
audioLanguages: JSON.parse(getConfig('audio_languages') ?? '[]'),
};
for (const item of items) {
await reanalyze(item.id, config);
}
return c.json({ updated: items.length });
});
```
- [ ] **Step 3: Add pipeline summary endpoint**
Add a new endpoint that returns data grouped by pipeline column:
```typescript
// GET /pipeline — returns items grouped by pipeline stage
app.get('/pipeline', (c) => {
const db = getDb();
const review = db.query(`
SELECT rp.*, mi.name, mi.series_name, mi.series_jellyfin_id,
mi.season_number, mi.episode_number, mi.type, mi.container,
mi.original_language, mi.orig_lang_source, mi.file_path
FROM review_plans rp
JOIN media_items mi ON mi.id = rp.item_id
WHERE rp.status = 'pending' AND rp.is_noop = 0
ORDER BY
CASE rp.confidence WHEN 'high' THEN 0 ELSE 1 END,
COALESCE(mi.series_name, mi.name),
mi.season_number, mi.episode_number
`).all();
const queued = db.query(`
SELECT j.*, mi.name, mi.series_name, mi.type,
rp.job_type, rp.apple_compat
FROM jobs j
JOIN media_items mi ON mi.id = j.item_id
JOIN review_plans rp ON rp.item_id = j.item_id
WHERE j.status = 'pending'
ORDER BY j.created_at
`).all();
const processing = db.query(`
SELECT j.*, mi.name, mi.series_name, mi.type,
rp.job_type, rp.apple_compat
FROM jobs j
JOIN media_items mi ON mi.id = j.item_id
JOIN review_plans rp ON rp.item_id = j.item_id
WHERE j.status = 'running'
`).all();
const done = db.query(`
SELECT j.*, mi.name, mi.series_name, mi.type,
rp.job_type, rp.apple_compat
FROM jobs j
JOIN media_items mi ON mi.id = j.item_id
JOIN review_plans rp ON rp.item_id = j.item_id
WHERE j.status IN ('done', 'error')
ORDER BY j.completed_at DESC
LIMIT 50
`).all();
const noops = db.query(`SELECT COUNT(*) as count FROM review_plans WHERE is_noop = 1`).get() as any;
const scheduler = getSchedulerState();
return c.json({ review, queued, processing, done, noopCount: noops.count, scheduler });
});
```
- [ ] **Step 4: Register pipeline routes in `server/index.tsx`**
The pipeline endpoints live on the existing review and execute routes. No new route registration needed — the endpoints are added to the existing Hono apps.
- [ ] **Step 5: Commit**
```bash
git add server/api/review.ts server/api/execute.ts
git commit -m "add pipeline API: approve-up-to, series language, pipeline summary, scheduler endpoints"
```
---
### Task 8: Kanban Board — Route and Layout
**Files:**
- Create: `src/routes/pipeline.tsx`
- Create: `src/features/pipeline/PipelinePage.tsx`
- Modify: `src/routes/__root.tsx`
- [ ] **Step 1: Create route file `src/routes/pipeline.tsx`**
```typescript
import { createFileRoute } from '@tanstack/react-router';
import { PipelinePage } from '~/features/pipeline/PipelinePage';
export const Route = createFileRoute('/pipeline')({
component: PipelinePage,
});
```
- [ ] **Step 2: Create `src/features/pipeline/PipelinePage.tsx`**
```typescript
import { useCallback, useEffect, useRef, useState } from 'react';
import { api } from '~/shared/lib/api';
import { ReviewColumn } from './ReviewColumn';
import { QueueColumn } from './QueueColumn';
import { ProcessingColumn } from './ProcessingColumn';
import { DoneColumn } from './DoneColumn';
import { ScheduleControls } from './ScheduleControls';
interface PipelineData {
review: any[];
queued: any[];
processing: any[];
done: any[];
noopCount: number;
scheduler: {
job_sleep_seconds: number;
schedule_enabled: boolean;
schedule_start: string;
schedule_end: string;
};
}
export function PipelinePage() {
const [data, setData] = useState<PipelineData | null>(null);
const [loading, setLoading] = useState(true);
const load = useCallback(async () => {
const res = await api.get<PipelineData>('/api/review/pipeline');
setData(res);
setLoading(false);
}, []);
useEffect(() => { load(); }, [load]);
// SSE for live updates
useEffect(() => {
const es = new EventSource('/api/execute/events');
es.addEventListener('job_update', () => load());
es.addEventListener('job_progress', (e) => {
const progress = JSON.parse(e.data);
setData(prev => prev ? { ...prev, _progress: progress } : prev);
});
es.addEventListener('queue_status', (e) => {
const status = JSON.parse(e.data);
setData(prev => prev ? { ...prev, _queueStatus: status } : prev);
});
return () => es.close();
}, [load]);
if (loading || !data) return <div className="p-6 text-gray-500">Loading pipeline...</div>;
return (
<div className="flex flex-col h-[calc(100vh-4rem)]">
<div className="flex items-center justify-between px-6 py-3 border-b">
<h1 className="text-lg font-semibold">Pipeline</h1>
<div className="flex items-center gap-4">
<span className="text-sm text-gray-500">{data.noopCount} files already processed</span>
<ScheduleControls scheduler={data.scheduler} onUpdate={load} />
</div>
</div>
<div className="flex flex-1 gap-4 p-4 overflow-x-auto">
<ReviewColumn items={data.review} onMutate={load} />
<QueueColumn items={data.queued} />
<ProcessingColumn items={data.processing} progress={(data as any)._progress} queueStatus={(data as any)._queueStatus} />
<DoneColumn items={data.done} />
</div>
</div>
);
}
```
- [ ] **Step 3: Update nav in `src/routes/__root.tsx`**
Replace the existing scan/review/execute nav links with a single Pipeline link. Keep the subtitle manager, nodes, and settings links:
```typescript
<NavLink to="/pipeline">Pipeline</NavLink>
<NavLink to="/review/subtitles">Subtitles</NavLink>
<NavLink to="/nodes">Nodes</NavLink>
<NavLink to="/settings">Settings</NavLink>
```
Keep the old routes functional (don't delete them) but remove them from the nav. Users can still access them via URL if needed.
- [ ] **Step 4: Commit**
```bash
git add src/routes/pipeline.tsx src/features/pipeline/PipelinePage.tsx src/routes/__root.tsx
git commit -m "add pipeline route and Kanban board layout with 4 columns"
```
---
### Task 9: Kanban Board — Review Column with Cards
**Files:**
- Create: `src/features/pipeline/ReviewColumn.tsx`
- Create: `src/features/pipeline/PipelineCard.tsx`
- Create: `src/features/pipeline/SeriesCard.tsx`
- [ ] **Step 1: Create `src/features/pipeline/PipelineCard.tsx`**
Card component for a single media item:
```typescript
import { Badge } from '~/shared/components/ui/badge';
import { Select } from '~/shared/components/ui/select';
import { LANG_NAMES, langName } from '~/shared/lib/lang';
interface PipelineCardProps {
item: any;
onLanguageChange?: (lang: string) => void;
showApproveUpTo?: boolean;
onApproveUpTo?: () => void;
}
export function PipelineCard({ item, onLanguageChange, showApproveUpTo, onApproveUpTo }: PipelineCardProps) {
const title = item.type === 'Episode'
? `S${String(item.season_number).padStart(2, '0')}E${String(item.episode_number).padStart(2, '0')}${item.name}`
: item.name;
const confidenceColor = item.confidence === 'high' ? 'bg-green-50 border-green-200' : 'bg-amber-50 border-amber-200';
return (
<div className={`rounded-lg border p-3 ${confidenceColor}`}>
<div className="flex items-start justify-between gap-2">
<div className="min-w-0">
<p className="text-sm font-medium truncate">{title}</p>
<div className="flex items-center gap-1.5 mt-1">
{/* OG language dropdown */}
{onLanguageChange ? (
<Select
className="h-6 text-xs w-20"
value={item.original_language ?? ''}
onChange={(e) => onLanguageChange(e.target.value)}
>
<option value="">unknown</option>
{Object.entries(LANG_NAMES).map(([code, name]) => (
<option key={code} value={code}>{name}</option>
))}
</Select>
) : (
<Badge variant="default">{langName(item.original_language)}</Badge>
)}
{/* Pipeline step badges */}
{item.apple_compat === 'audio_transcode' && (
<Badge variant="manual">transcode</Badge>
)}
{item.job_type === 'copy' && item.apple_compat !== 'audio_transcode' && (
<Badge variant="noop">copy</Badge>
)}
</div>
</div>
</div>
{showApproveUpTo && onApproveUpTo && (
<button
onClick={onApproveUpTo}
className="mt-2 w-full text-xs py-1 rounded bg-blue-600 text-white hover:bg-blue-700"
>
Approve up to here
</button>
)}
</div>
);
}
```
- [ ] **Step 2: Create `src/features/pipeline/SeriesCard.tsx`**
Collapsible series card:
```typescript
import { useState } from 'react';
import { api } from '~/shared/lib/api';
import { Select } from '~/shared/components/ui/select';
import { LANG_NAMES, langName } from '~/shared/lib/lang';
import { PipelineCard } from './PipelineCard';
interface SeriesCardProps {
seriesKey: string;
seriesName: string;
episodes: any[];
onMutate: () => void;
}
export function SeriesCard({ seriesKey, seriesName, episodes, onMutate }: SeriesCardProps) {
const [expanded, setExpanded] = useState(false);
// Use the first episode's language as the series language
const seriesLang = episodes[0]?.original_language ?? '';
const setSeriesLanguage = async (lang: string) => {
await api.patch(`/api/review/series/${encodeURIComponent(seriesKey)}/language`, { language: lang });
onMutate();
};
const approveSeries = async () => {
await api.post(`/api/review/series/${encodeURIComponent(seriesKey)}/approve-all`);
onMutate();
};
const highCount = episodes.filter((e: any) => e.confidence === 'high').length;
const lowCount = episodes.filter((e: any) => e.confidence === 'low').length;
return (
<div className="rounded-lg border bg-white">
<div
className="flex items-center justify-between p-3 cursor-pointer hover:bg-gray-50"
onClick={() => setExpanded(!expanded)}
>
<div className="flex items-center gap-2 min-w-0">
<span className="text-xs text-gray-400">{expanded ? '▼' : '▶'}</span>
<p className="text-sm font-medium truncate">{seriesName}</p>
<span className="text-xs text-gray-500">{episodes.length} eps</span>
{highCount > 0 && <span className="text-xs text-green-600">{highCount} ready</span>}
{lowCount > 0 && <span className="text-xs text-amber-600">{lowCount} review</span>}
</div>
<div className="flex items-center gap-2" onClick={(e) => e.stopPropagation()}>
<Select
className="h-6 text-xs w-20"
value={seriesLang}
onChange={(e) => setSeriesLanguage(e.target.value)}
>
<option value="">unknown</option>
{Object.entries(LANG_NAMES).map(([code, name]) => (
<option key={code} value={code}>{name}</option>
))}
</Select>
<button
onClick={approveSeries}
className="text-xs px-2 py-1 rounded bg-blue-600 text-white hover:bg-blue-700"
>
Approve all
</button>
</div>
</div>
{expanded && (
<div className="border-t px-3 pb-3 space-y-2 pt-2">
{episodes.map((ep: any) => (
<PipelineCard
key={ep.id}
item={ep}
onLanguageChange={async (lang) => {
await api.patch(`/api/review/${ep.item_id}/language`, { language: lang });
onMutate();
}}
/>
))}
</div>
)}
</div>
);
}
```
- [ ] **Step 3: Create `src/features/pipeline/ReviewColumn.tsx`**
```typescript
import { api } from '~/shared/lib/api';
import { PipelineCard } from './PipelineCard';
import { SeriesCard } from './SeriesCard';
interface ReviewColumnProps {
items: any[];
onMutate: () => void;
}
export function ReviewColumn({ items, onMutate }: ReviewColumnProps) {
// Group by series (movies are standalone)
const movies = items.filter(i => i.type === 'Movie');
const seriesMap = new Map<string, { name: string; key: string; episodes: any[] }>();
for (const item of items.filter(i => i.type === 'Episode')) {
const key = item.series_jellyfin_id ?? item.series_name;
if (!seriesMap.has(key)) {
seriesMap.set(key, { name: item.series_name, key, episodes: [] });
}
seriesMap.get(key)!.episodes.push(item);
}
const approveUpTo = async (planId: number) => {
await api.post(`/api/review/approve-up-to/${planId}`);
onMutate();
};
// Interleave movies and series, sorted by confidence (high first)
const allItems = [
...movies.map(m => ({ type: 'movie' as const, item: m, sortKey: m.confidence === 'high' ? 0 : 1 })),
...[...seriesMap.values()].map(s => ({
type: 'series' as const,
item: s,
sortKey: s.episodes.every((e: any) => e.confidence === 'high') ? 0 : 1,
})),
].sort((a, b) => a.sortKey - b.sortKey);
return (
<div className="flex flex-col w-80 min-w-80 bg-gray-50 rounded-lg">
<div className="px-3 py-2 border-b font-medium text-sm">
Review <span className="text-gray-400">({items.length})</span>
</div>
<div className="flex-1 overflow-y-auto p-2 space-y-2">
{allItems.map((entry, idx) => {
if (entry.type === 'movie') {
return (
<PipelineCard
key={entry.item.id}
item={entry.item}
onLanguageChange={async (lang) => {
await api.patch(`/api/review/${entry.item.item_id}/language`, { language: lang });
onMutate();
}}
showApproveUpTo
onApproveUpTo={() => approveUpTo(entry.item.id)}
/>
);
} else {
return (
<SeriesCard
key={entry.item.key}
seriesKey={entry.item.key}
seriesName={entry.item.name}
episodes={entry.item.episodes}
onMutate={onMutate}
/>
);
}
})}
{allItems.length === 0 && (
<p className="text-sm text-gray-400 text-center py-8">No items to review</p>
)}
</div>
</div>
);
}
```
- [ ] **Step 4: Commit**
```bash
git add src/features/pipeline/PipelineCard.tsx src/features/pipeline/SeriesCard.tsx src/features/pipeline/ReviewColumn.tsx
git commit -m "add pipeline review column with cards, series grouping, approve-up-to"
```
---
### Task 10: Kanban Board — Queue, Processing, Done Columns
**Files:**
- Create: `src/features/pipeline/QueueColumn.tsx`
- Create: `src/features/pipeline/ProcessingColumn.tsx`
- Create: `src/features/pipeline/DoneColumn.tsx`
- Create: `src/features/pipeline/ScheduleControls.tsx`
- [ ] **Step 1: Create `src/features/pipeline/QueueColumn.tsx`**
```typescript
import { Badge } from '~/shared/components/ui/badge';
interface QueueColumnProps {
items: any[];
}
export function QueueColumn({ items }: QueueColumnProps) {
return (
<div className="flex flex-col w-64 min-w-64 bg-gray-50 rounded-lg">
<div className="px-3 py-2 border-b font-medium text-sm">
Queued <span className="text-gray-400">({items.length})</span>
</div>
<div className="flex-1 overflow-y-auto p-2 space-y-1">
{items.map((item: any) => (
<div key={item.id} className="rounded border bg-white p-2">
<p className="text-xs font-medium truncate">{item.name}</p>
<Badge variant={item.job_type === 'transcode' ? 'manual' : 'noop'}>
{item.job_type}
</Badge>
</div>
))}
{items.length === 0 && (
<p className="text-sm text-gray-400 text-center py-8">Queue empty</p>
)}
</div>
</div>
);
}
```
- [ ] **Step 2: Create `src/features/pipeline/ProcessingColumn.tsx`**
```typescript
import { Badge } from '~/shared/components/ui/badge';
interface ProcessingColumnProps {
items: any[];
progress?: { id: number; seconds: number; total: number } | null;
queueStatus?: { status: string; until?: string; seconds?: number } | null;
}
export function ProcessingColumn({ items, progress, queueStatus }: ProcessingColumnProps) {
const job = items[0]; // at most one running job
const formatTime = (s: number) => {
const m = Math.floor(s / 60);
const sec = Math.floor(s % 60);
return `${m}:${String(sec).padStart(2, '0')}`;
};
return (
<div className="flex flex-col w-72 min-w-72 bg-gray-50 rounded-lg">
<div className="px-3 py-2 border-b font-medium text-sm">Processing</div>
<div className="flex-1 p-3">
{/* Queue status */}
{queueStatus && queueStatus.status !== 'running' && (
<div className="mb-3 text-xs text-gray-500 bg-white rounded border p-2">
{queueStatus.status === 'paused' && <>Paused until {queueStatus.until}</>}
{queueStatus.status === 'sleeping' && <>Sleeping {queueStatus.seconds}s between jobs</>}
{queueStatus.status === 'idle' && <>Idle</>}
</div>
)}
{job ? (
<div className="rounded border bg-white p-3">
<p className="text-sm font-medium truncate">{job.name}</p>
<div className="flex items-center gap-2 mt-1">
<Badge variant="running">running</Badge>
<Badge variant={job.job_type === 'transcode' ? 'manual' : 'noop'}>
{job.job_type}
</Badge>
</div>
{/* Progress bar for transcode jobs */}
{progress && progress.total > 0 && (
<div className="mt-3">
<div className="flex justify-between text-xs text-gray-500 mb-1">
<span>{formatTime(progress.seconds)}</span>
<span>{Math.round((progress.seconds / progress.total) * 100)}%</span>
<span>{formatTime(progress.total)}</span>
</div>
<div className="h-2 bg-gray-200 rounded-full overflow-hidden">
<div
className="h-full bg-blue-500 rounded-full transition-all"
style={{ width: `${Math.min(100, (progress.seconds / progress.total) * 100)}%` }}
/>
</div>
</div>
)}
</div>
) : (
<p className="text-sm text-gray-400 text-center py-8">No active job</p>
)}
</div>
</div>
);
}
```
- [ ] **Step 3: Create `src/features/pipeline/DoneColumn.tsx`**
```typescript
import { Badge } from '~/shared/components/ui/badge';
interface DoneColumnProps {
items: any[];
}
export function DoneColumn({ items }: DoneColumnProps) {
return (
<div className="flex flex-col w-64 min-w-64 bg-gray-50 rounded-lg">
<div className="px-3 py-2 border-b font-medium text-sm">
Done <span className="text-gray-400">({items.length})</span>
</div>
<div className="flex-1 overflow-y-auto p-2 space-y-1">
{items.map((item: any) => (
<div key={item.id} className="rounded border bg-white p-2">
<p className="text-xs font-medium truncate">{item.name}</p>
<Badge variant={item.status === 'done' ? 'done' : 'error'}>
{item.status}
</Badge>
</div>
))}
{items.length === 0 && (
<p className="text-sm text-gray-400 text-center py-8">No completed items</p>
)}
</div>
</div>
);
}
```
- [ ] **Step 4: Create `src/features/pipeline/ScheduleControls.tsx`**
```typescript
import { useState } from 'react';
import { api } from '~/shared/lib/api';
import { Input } from '~/shared/components/ui/input';
import { Button } from '~/shared/components/ui/button';
interface ScheduleControlsProps {
scheduler: {
job_sleep_seconds: number;
schedule_enabled: boolean;
schedule_start: string;
schedule_end: string;
};
onUpdate: () => void;
}
export function ScheduleControls({ scheduler, onUpdate }: ScheduleControlsProps) {
const [open, setOpen] = useState(false);
const [state, setState] = useState(scheduler);
const save = async () => {
await api.patch('/api/execute/scheduler', state);
onUpdate();
setOpen(false);
};
const startAll = async () => {
await api.post('/api/execute/start');
onUpdate();
};
return (
<div className="flex items-center gap-2">
<Button variant="primary" size="sm" onClick={startAll}>
Start queue
</Button>
<button
onClick={() => setOpen(!open)}
className="text-xs text-gray-500 hover:text-gray-700"
>
Schedule settings
</button>
{open && (
<div className="absolute right-4 top-16 z-50 bg-white border rounded-lg shadow-lg p-4 w-72">
<h3 className="text-sm font-medium mb-3">Schedule Settings</h3>
<label className="block text-xs text-gray-600 mb-1">Sleep between jobs (seconds)</label>
<Input
type="number"
min={0}
value={state.job_sleep_seconds}
onChange={(e) => setState({ ...state, job_sleep_seconds: parseInt(e.target.value) || 0 })}
className="mb-3"
/>
<label className="flex items-center gap-2 text-xs text-gray-600 mb-2">
<input
type="checkbox"
checked={state.schedule_enabled}
onChange={(e) => setState({ ...state, schedule_enabled: e.target.checked })}
/>
Enable time window
</label>
{state.schedule_enabled && (
<div className="flex items-center gap-2 mb-3">
<Input
type="time"
value={state.schedule_start}
onChange={(e) => setState({ ...state, schedule_start: e.target.value })}
className="w-24"
/>
<span className="text-xs text-gray-500">to</span>
<Input
type="time"
value={state.schedule_end}
onChange={(e) => setState({ ...state, schedule_end: e.target.value })}
className="w-24"
/>
</div>
)}
<Button variant="primary" size="sm" onClick={save}>Save</Button>
</div>
)}
</div>
);
}
```
- [ ] **Step 5: Commit**
```bash
git add src/features/pipeline/QueueColumn.tsx src/features/pipeline/ProcessingColumn.tsx src/features/pipeline/DoneColumn.tsx src/features/pipeline/ScheduleControls.tsx
git commit -m "add pipeline queue, processing, done columns, schedule controls"
```
---
### Task 11: Integration and Scan Page Update
**Files:**
- Modify: `src/routes/__root.tsx`
- Modify: `src/features/scan/ScanPage.tsx`
- [ ] **Step 1: Update the Scan page to link to Pipeline**
After a scan completes, show a link to the Pipeline page instead of the review page. Update the "complete" SSE handler to show:
```typescript
// After scan completes, show link to pipeline
<a href="/pipeline" className="text-blue-600 hover:underline">
Review in Pipeline
</a>
```
- [ ] **Step 2: Keep old routes accessible but not in nav**
Don't delete the old route files (`src/routes/review.tsx`, etc.) — they still work for direct URL access and for the subtitle manager. Just remove them from the nav bar in `__root.tsx`.
- [ ] **Step 3: Verify end-to-end flow**
1. Run a scan
2. Navigate to /pipeline
3. See items in Review column with confidence badges
4. Set OG language on a series
5. Click "Approve up to here" on an item
6. See items move to Queued
7. Click "Start queue"
8. See job in Processing with progress
9. See completed items in Done
- [ ] **Step 4: Commit**
```bash
git add src/features/scan/ScanPage.tsx src/routes/__root.tsx
git commit -m "wire pipeline into nav, link scan completion to pipeline page"
```
---
### Task 12: Cleanup and Polish
**Files:**
- Modify: Various
- [ ] **Step 1: Add `duration` to media_items if not present**
For FFmpeg progress, we need total duration. Check if Jellyfin provides `RunTimeTicks` and store it during scan. If already available, use it in the progress calculation.
- [ ] **Step 2: Handle edge cases in apple-compat**
- Files with no audio streams → skip audio analysis
- Files with only video streams → `is_noop` for audio steps
- Files where all embedded subs are already extracted → `subs_extracted = 1`
- [ ] **Step 3: Verify subtitle manager still works**
Navigate to /review/subtitles. Verify:
- Browse sidecar files
- Delete a sidecar file
- Language summary view works
- [ ] **Step 4: Bump version**
Update `package.json` version to `2026.03.27` (CalVer).
- [ ] **Step 5: Final commit**
```bash
git add -A
git commit -m "polish unified pipeline: edge cases, duration tracking, version bump"
```