spec: jellyfin webhook ping-pong via mqtt
This commit is contained in:
213
docs/superpowers/specs/2026-04-14-jellyfin-webhook-design.md
Normal file
213
docs/superpowers/specs/2026-04-14-jellyfin-webhook-design.md
Normal file
@@ -0,0 +1,213 @@
|
||||
# Jellyfin Webhook Ping-Pong
|
||||
|
||||
**Date:** 2026-04-14
|
||||
**Status:** Design
|
||||
**Scope:** Close the processing loop between netfelix-audio-fix and Jellyfin.
|
||||
|
||||
## Goal
|
||||
|
||||
After ffmpeg finishes, the server currently blocks on a Jellyfin refresh +
|
||||
re-analysis before freeing the job queue. Move that responsibility to a
|
||||
Jellyfin-driven event: we hand off the rescan to Jellyfin and return. When
|
||||
Jellyfin detects the modified (or a newly added) file, it publishes an MQTT
|
||||
message and we react — re-analyzing the item and either confirming the plan
|
||||
as `done` or flipping it back to `pending`. The result is a ping-pong
|
||||
between the two systems that terminates in plans Jellyfin itself has just
|
||||
verified.
|
||||
|
||||
## Non-goals
|
||||
|
||||
- HTTP webhook fallback. One transport.
|
||||
- Persisting missed MQTT messages. If the broker is unreachable while a
|
||||
file changes, the user uses the existing manual rescan.
|
||||
- Programmatic creation/deletion of the Jellyfin-side plugin config.
|
||||
- MQTT TLS client certificates (`mqtts://` with username/password is
|
||||
sufficient).
|
||||
|
||||
## Architecture
|
||||
|
||||
### 1. Outbound hand-off (execute.ts)
|
||||
|
||||
`server/api/execute.ts` currently calls `refreshItemFromJellyfin()` after
|
||||
a successful job, which triggers Jellyfin's rescan, fetches the item, and
|
||||
calls `upsertJellyfinItem()` to re-analyze. Remove the fetch and re-upsert
|
||||
steps. Keep only the fire-and-forget refresh.
|
||||
|
||||
```
|
||||
after ffmpeg exit 0:
|
||||
markJobDone + markPlanDone (unchanged)
|
||||
insertSubtitleFiles (unchanged)
|
||||
refreshItem(jfCfg, jellyfinId) (best-effort; log + swallow)
|
||||
return (do not await analysis)
|
||||
```
|
||||
|
||||
The MQTT subscriber handles everything downstream of Jellyfin's rescan.
|
||||
|
||||
### 2. MQTT subscriber
|
||||
|
||||
A single long-lived MQTT client owned by the server process.
|
||||
|
||||
**Library:** `mqtt` (npm), Bun-compatible.
|
||||
|
||||
**Config (new `config` rows):**
|
||||
- `mqtt_url` — e.g. `mqtt://192.168.1.10:1883`, `mqtts://…` for TLS
|
||||
- `mqtt_topic` — default `jellyfin/events`
|
||||
- `mqtt_username` — optional
|
||||
- `mqtt_password` — optional, stored alongside other credentials and
|
||||
overridable by env var using the existing `getEnvLockedKeys()` pattern
|
||||
|
||||
**Lifecycle:**
|
||||
- On server boot, read config. If `mqtt_url` is set, connect and subscribe
|
||||
to `mqtt_topic`.
|
||||
- `mqtt.js` reconnects automatically. We log `connect`, `close`, and
|
||||
`error` events and publish them through the existing SSE channel as
|
||||
`mqtt_status` events so the UI badge can update live.
|
||||
- When config changes (Settings save), tear down the current client and
|
||||
start a new one.
|
||||
|
||||
**Message handling:** `handleWebhookMessage(db, jfCfg, payload)` is a pure
|
||||
function. The MQTT subscriber's on-message callback parses JSON and calls
|
||||
this. Unit tests drive it directly.
|
||||
|
||||
```
|
||||
parse JSON payload → { event, itemId, itemType }
|
||||
if event not in {'ItemAdded', 'ItemUpdated'}: drop
|
||||
if itemType not in {'Movie', 'Episode'}: drop
|
||||
if dedupeMap.has(itemId): drop (burst filter)
|
||||
dedupeMap.set(itemId, Date.now()) (evict after 5s)
|
||||
|
||||
fresh = getItem(jfCfg, itemId) (one HTTP call)
|
||||
if fresh is null: drop + log
|
||||
upsertJellyfinItem(db, fresh, cfg, { source: 'webhook' })
|
||||
```
|
||||
|
||||
### 3. "done is terminal" override for webhook-driven rescans
|
||||
|
||||
`rescan.ts` currently treats `review_plans.status = 'done'` as terminal
|
||||
so scans don't reopen plans and spawn duplicate jobs (see commit
|
||||
a06ab34). The webhook path wants the opposite: a post-processing event
|
||||
should be able to flip `done` back to `pending` when the on-disk streams
|
||||
no longer satisfy `is_noop`.
|
||||
|
||||
Add an options flag:
|
||||
|
||||
```ts
|
||||
upsertJellyfinItem(db, item, cfg, opts: { executed?, source?: 'scan'|'webhook' })
|
||||
```
|
||||
|
||||
Plan-status transition rules in rescan:
|
||||
|
||||
| Current | is_noop | source | Next |
|
||||
|---------|---------|--------------|----------|
|
||||
| done | 1 | any | done |
|
||||
| done | 0 | `scan` | done | (current safety net)
|
||||
| done | 0 | `webhook` | pending | (authoritative re-open)
|
||||
| other | any | any | pending | (existing behavior)
|
||||
|
||||
Scan-flow callers default to `source: 'scan'` (or omit). Only the MQTT
|
||||
handler passes `source: 'webhook'`.
|
||||
|
||||
### 4. Settings UI + connection status
|
||||
|
||||
**New Settings section — "Jellyfin webhook (MQTT)":**
|
||||
- Broker URL, Topic (default `jellyfin/events`), Username, Password.
|
||||
- "Test connection" button: connects to the broker with the submitted
|
||||
credentials, subscribes to `<topic>/#`, waits up to 30 s. On any message
|
||||
shows green success with a snippet of the payload; on timeout shows an
|
||||
amber "connected but no traffic — trigger a library edit in Jellyfin"
|
||||
message; on connect/auth error shows red with the error text.
|
||||
|
||||
**Webhook setup panel** (on the same Settings section, below the form):
|
||||
- Uses the existing `jellyfin_url` + `jellyfin_api_key` to call `GET
|
||||
/Plugins` and check whether the Webhook plugin is installed. If not,
|
||||
shows install instructions with a deep-link to the Jellyfin plugin
|
||||
catalog. Otherwise shows:
|
||||
- The exact values to paste into the Webhook plugin's MQTT destination:
|
||||
broker URL, port, topic, events (`Item Added`, `Item Updated`), item-type
|
||||
filter (`Movie`, `Episode`), handlebars template:
|
||||
```json
|
||||
{
|
||||
"event": "{{NotificationType}}",
|
||||
"itemId": "{{ItemId}}",
|
||||
"itemType": "{{ItemType}}"
|
||||
}
|
||||
```
|
||||
Each value has a copy-to-clipboard button.
|
||||
|
||||
**Connection status badge** on the dashboard / Scan page: `MQTT:
|
||||
connected | disconnected | not configured`, driven by the
|
||||
`mqtt_status` SSE events above.
|
||||
|
||||
## Data flow
|
||||
|
||||
```
|
||||
┌──────────────┐ ffmpeg done ┌─────────────┐
|
||||
│ execute.ts │ ────────────▶ │ Jellyfin │
|
||||
│ (job queue) │ rescan RPC │ │
|
||||
└──────────────┘ └──────┬──────┘
|
||||
▲ │ library scan finds
|
||||
│ │ changed/new file
|
||||
│ ▼
|
||||
│ ┌─────────────┐
|
||||
│ │ Webhook │
|
||||
│ │ plugin │
|
||||
│ └──────┬──────┘
|
||||
│ │ publishes ItemUpdated
|
||||
│ ▼
|
||||
│ ┌─────────────┐
|
||||
│ mqtt_status SSE ──────▶│ MQTT broker │
|
||||
│ └──────┬──────┘
|
||||
│ │
|
||||
┌──────┴───────────┐ re-analyze │
|
||||
│ mqtt subscriber │◀──────────────────┘
|
||||
│ handler │
|
||||
│ │ upsertJellyfinItem(..., source:'webhook')
|
||||
│ │────────────▶ review_plans.status =
|
||||
│ │ 'done' if is_noop, else 'pending'
|
||||
└──────────────────┘
|
||||
```
|
||||
|
||||
## Testing
|
||||
|
||||
- **`handleWebhookMessage` unit tests** (`server/services/__tests__/`):
|
||||
seeds an in-memory DB with a known plan, feeds synthetic payloads,
|
||||
asserts:
|
||||
- `ItemUpdated` + `is_noop=1` leaves plan `done`.
|
||||
- `ItemUpdated` + `is_noop=0` with `source:'webhook'` flips `done → pending`.
|
||||
- Unknown itemId inserts a new media_item (same path as scan).
|
||||
- Non-Movie/Episode types are ignored.
|
||||
- Duplicate messages within 5 s are dropped.
|
||||
- `getItem` returning null is logged and dropped.
|
||||
- **execute.ts test**: after job success, `refreshItem` is called once and
|
||||
`upsertJellyfinItem` is NOT called synchronously.
|
||||
- No integration test against a real broker — the `mqtt` library itself
|
||||
is not under test.
|
||||
|
||||
## Migration / rollout
|
||||
|
||||
- Database: two new `config` rows. No schema change needed; config is
|
||||
key/value.
|
||||
- Env-var overrides for `MQTT_URL`, `MQTT_USERNAME`, `MQTT_PASSWORD`,
|
||||
`MQTT_TOPIC` via existing `getEnvLockedKeys()`.
|
||||
- First deploy with `mqtt_url` unset: subscriber doesn't start, existing
|
||||
flow (scan → review → approve → execute) continues unchanged. Users opt
|
||||
in by filling in the Settings fields.
|
||||
- Removing the old post-job re-analyze is a behavior change for users
|
||||
who haven't configured MQTT. They lose automatic verification of
|
||||
finished jobs until they set up the webhook. They can still trigger a
|
||||
manual rescan.
|
||||
|
||||
## Risks & mitigations
|
||||
|
||||
- **Burst messages from Jellyfin's scan.** Multiple `ItemUpdated` events
|
||||
per item during a library sweep. Mitigated by 5 s in-memory dedupe map.
|
||||
- **Broker outage at the moment a job finishes.** Message is lost.
|
||||
Accepted; the user can trigger a manual rescan, and the next actual
|
||||
library event will resync.
|
||||
- **Plugin template drift.** The Webhook plugin's handlebars variables
|
||||
are stable across releases; if a future release renames `ItemId`, the
|
||||
Settings panel's copyable template is the single place to update.
|
||||
- **`done → pending` oscillation.** If analysis flaps between noop and
|
||||
non-noop due to a Jellyfin metadata race, the UI could bounce. Same 5 s
|
||||
dedupe protects against burst; a persistent non-noop indicates a real
|
||||
problem worth surfacing.
|
||||
Reference in New Issue
Block a user