Files
netfelix-audio-fix/server/services/mqtt.ts
Felix Förtsch 425ee751ce
All checks were successful
Build and Push Docker Image / build (push) Successful in 48s
mqtt test: use playback start as reliable trigger, drop auto-prefix
three changes based on the actual behavior of the jellyfin webhook plugin:

- 'Webhook Url' setup value no longer re-serialized with mqtt://. show
  the user's broker url verbatim so whatever protocol they use (ws://,
  http://, etc.) survives the round trip
- dropped the server-side 'trigger a jellyfin rescan during the test'
  machinery. a refresh that doesn't mutate metadata won't fire Item
  Added, so relying on it produced false negatives. now we just wait
  for any message on the topic; ui instructs the user to hit play on a
  movie in jellyfin while the test runs — playback start is a
  deterministic trigger, unlike library events
- setup panel now lists Notification Types as 'Item Added, Playback
  Start'. playback start is for the test only; the production handler
  still filters events down to item added / updated
2026-04-14 09:55:32 +02:00

207 lines
6.1 KiB
TypeScript

import mqtt, { type MqttClient } from "mqtt";
import { getConfig } from "../db/index";
import { log, error as logError, warn } from "../lib/log";
import { handleWebhookMessage } from "./webhook";
/**
 * State of the long-lived MQTT subscriber as reported to the rest of the app.
 *
 * - `connected`: the broker accepted the connection and the topic subscription succeeded.
 * - `disconnected`: the client closed or is attempting to reconnect.
 * - `error`: the client emitted an error or the subscription failed.
 * - `not_configured`: initial state; also set when MQTT is disabled, has no URL, or was stopped.
 */
export type MqttStatus =
  | "connected"
  | "disconnected"
  | "error"
  | "not_configured";
/** Broker settings needed to open an MQTT connection and subscribe to a topic. */
interface MqttConfig {
  /** Broker URL, handed to `mqtt.connect` unchanged. */
  url: string;
  /** Topic the client subscribes to after connecting. */
  topic: string;
  /** Broker username; an empty string is sent as "no username". */
  username: string;
  /** Broker password; an empty string is sent as "no password". */
  password: string;
}
/** Signature of the callbacks stored in `statusListeners`. */
type MqttStatusListener = (status: MqttStatus, error: string | null) => void;

// The long-lived subscriber client; null while no client is running.
let client: MqttClient | null = null;

// Most recent status and its accompanying error text (null when there is none).
let currentStatus: MqttStatus = "not_configured";
let currentError: string | null = null;

// Callbacks notified on every status change. A Set, so registering the same
// function twice keeps a single entry.
const statusListeners = new Set<MqttStatusListener>();
/**
 * Report the most recently recorded subscriber status.
 *
 * @returns A fresh object holding the current status and its error text
 *   (null when no error accompanies the status).
 */
export function getMqttStatus(): { status: MqttStatus; error: string | null } {
  const snapshot = { status: currentStatus, error: currentError };
  return snapshot;
}
/**
 * Register a callback that is invoked on every subsequent status change.
 * The current status is not replayed at registration time; use
 * `getMqttStatus` for that.
 *
 * @param fn Callback receiving the new status and its error text (or null).
 * @returns A function that removes `fn` from the listener set again.
 */
export function onMqttStatus(fn: (status: MqttStatus, error: string | null) => void): () => void {
  statusListeners.add(fn);
  const unsubscribe = (): void => {
    statusListeners.delete(fn);
  };
  return unsubscribe;
}
/**
 * Record a new subscriber status and notify every registered listener.
 *
 * @param next The status to store.
 * @param err Error text that accompanies the status; defaults to null.
 */
function setStatus(next: MqttStatus, err: string | null = null): void {
  currentStatus = next;
  currentError = err;
  // Listeners are called synchronously, in registration order.
  statusListeners.forEach((listener) => {
    listener(next, err);
  });
}
/**
 * Load the MQTT settings from the config store.
 *
 * @returns The broker settings, or null when `mqtt_enabled` is not "1" or
 *   `mqtt_url` is missing/empty. A missing topic falls back to
 *   "jellyfin/events"; missing credentials fall back to empty strings.
 */
function readConfig(): MqttConfig | null {
  const enabled = getConfig("mqtt_enabled") === "1";
  if (!enabled) return null;

  const url = getConfig("mqtt_url") ?? "";
  if (!url) return null;

  const topic = getConfig("mqtt_topic") ?? "jellyfin/events";
  const username = getConfig("mqtt_username") ?? "";
  const password = getConfig("mqtt_password") ?? "";
  return { url, topic, username, password };
}
/**
 * (Re)start the long-lived MQTT subscriber.
 *
 * Any existing client is stopped first, so calling this repeatedly is safe.
 * If MQTT is disabled or no broker URL is stored, the status becomes
 * 'not_configured' and nothing else happens. Otherwise a client is created,
 * subscribes to the configured topic on every (re)connect, and forwards each
 * incoming payload to the webhook handler. The function returns as soon as
 * the client is created; connection progress is reported via the status.
 */
export async function startMqttClient(): Promise<void> {
  await stopMqttClient();

  const cfg = readConfig();
  if (cfg === null) {
    setStatus("not_configured");
    return;
  }
  const { url, topic } = cfg;

  log(`MQTT: connecting to ${url} (topic=${topic})`);
  // Random suffix so each client instance gets its own client id.
  const idSuffix = Math.random().toString(16).slice(2, 10);
  const subscriber = mqtt.connect(url, {
    // Empty credentials are passed as undefined, i.e. not sent at all.
    username: cfg.username || undefined,
    password: cfg.password || undefined,
    reconnectPeriod: 5000,
    connectTimeout: 15_000,
    clientId: `netfelix-audio-fix-${idSuffix}`,
  });
  client = subscriber;

  subscriber.on("connect", () => {
    // Status only flips to 'connected' once the subscription is confirmed.
    subscriber.subscribe(topic, { qos: 0 }, (subscribeErr) => {
      if (!subscribeErr) {
        log(`MQTT: connected, subscribed to ${topic}`);
        setStatus("connected");
        return;
      }
      logError(`MQTT subscribe to ${topic} failed:`, subscribeErr);
      setStatus("error", String(subscribeErr));
    });
  });

  subscriber.on("reconnect", () => {
    setStatus("disconnected", "reconnecting");
  });

  // NOTE(review): if the library emits "close" right after "error", this
  // replaces the 'error' status (and its message) with a bare
  // 'disconnected'. Confirm that is intended.
  subscriber.on("close", () => {
    setStatus("disconnected", null);
  });

  subscriber.on("error", (clientErr) => {
    const description = String(clientErr);
    warn(`MQTT error: ${description}`);
    setStatus("error", description);
  });

  subscriber.on("message", (_topic, payload) => {
    // Best-effort: the handler owns its own error handling. A failure on one
    // malformed message is logged and must not tear the subscriber down.
    const body = payload.toString("utf8");
    handleWebhookMessage(body).catch((err) => logError("webhook handler threw:", err));
  });
}
/**
 * Shut down the long-lived subscriber, if one exists.
 *
 * The module-level reference is cleared before the client is ended, and the
 * returned promise resolves once the client's end callback fires. The status
 * is then set to 'not_configured'. Does nothing when no client is active.
 */
export async function stopMqttClient(): Promise<void> {
  const active = client;
  if (active === null) return;

  client = null;
  await new Promise<void>((resolve) => {
    // force=false: request a regular (non-forced) shutdown.
    active.end(false, {}, () => resolve());
  });
  setStatus("not_configured");
}
/** Outcome of a single `testMqttConnection` run. */
export interface MqttTestResult {
  /** True once the test client received its "connect" event. */
  brokerConnected: boolean;
  /** True when the trigger callback returned an item (i.e. did not return null). */
  jellyfinTriggered: boolean;
  /** True when a message arrived on the topic before the test ended. */
  receivedMessage: boolean;
  /** Name of the item returned by the trigger callback, if any. */
  itemName?: string;
  /** Id of the item returned by the trigger callback, if any. */
  expectedItemId?: string;
  /** Leading part of the received payload (at most 400 characters). */
  samplePayload?: string;
  /** Description of the failure, when the test ended because of one. */
  error?: string;
}
/**
 * End-to-end test of the MQTT loop: connect to the broker with a throwaway
 * client, subscribe to the topic, invoke `triggerRefresh`, and wait for ANY
 * message to arrive on the topic. A pass proves the whole chain is wired up:
 * broker creds, Jellyfin webhook plugin config, and network reachability
 * between Jellyfin and broker.
 *
 * Messages are deliberately not matched against the triggered item: a
 * rescan of an unchanged item won't fire Item Added, so an "itemId matches"
 * filter would cause false failures. The item returned by `triggerRefresh`
 * is only reported back in the result.
 *
 * The returned promise always resolves and never rejects; failures are
 * described in the result's `error` field. The test client is force-closed
 * as soon as the outcome is known.
 *
 * @param cfg Broker URL, topic and credentials to test with.
 * @param triggerRefresh Called once after the subscription succeeds. Returns
 *   the item it acted on, or null if nothing was triggered. A rejection ends
 *   the test with a "jellyfin trigger: ..." error.
 * @param timeoutMs How long to wait for a message, counted from the moment
 *   the broker connection is established.
 * @returns Flags for each stage reached, plus a payload sample or an error.
 */
export async function testMqttConnection(
  cfg: MqttConfig,
  triggerRefresh: () => Promise<{ itemId: string; itemName: string } | null>,
  timeoutMs = 30_000,
): Promise<MqttTestResult> {
  return new Promise((resolve) => {
    const c = mqtt.connect(cfg.url, {
      username: cfg.username || undefined,
      password: cfg.password || undefined,
      // One-shot client: no automatic reconnects during a test.
      reconnectPeriod: 0,
      connectTimeout: 10_000,
      clientId: `netfelix-audio-fix-test-${Math.random().toString(16).slice(2, 10)}`,
    });

    let settled = false;
    let waitTimer: ReturnType<typeof setTimeout> | undefined;
    let expectedItemId: string | null = null;
    let itemName: string | undefined;
    let jellyfinTriggered = false;
    let brokerConnected = false;

    // Single exit point: the first caller wins, later calls are ignored.
    const done = (result: Omit<MqttTestResult, "expectedItemId" | "jellyfinTriggered" | "brokerConnected">) => {
      if (settled) return;
      settled = true;
      // Don't leave the message-wait timer pending after the outcome is known.
      if (waitTimer !== undefined) clearTimeout(waitTimer);
      c.end(true);
      resolve({
        brokerConnected,
        jellyfinTriggered,
        expectedItemId: expectedItemId ?? undefined,
        itemName,
        ...result,
      });
    };

    c.on("connect", () => {
      brokerConnected = true;
      c.subscribe(cfg.topic, { qos: 0 }, async (err) => {
        if (err) {
          done({ receivedMessage: false, error: `subscribe: ${String(err)}` });
          return;
        }
        // Subscribed. Run the trigger so the webhook has a chance to
        // publish something; its result only annotates the test outcome.
        try {
          const trigger = await triggerRefresh();
          if (trigger) {
            expectedItemId = trigger.itemId;
            itemName = trigger.itemName;
            jellyfinTriggered = true;
          }
        } catch (triggerErr) {
          done({ receivedMessage: false, error: `jellyfin trigger: ${String(triggerErr)}` });
        }
      });
      if (waitTimer !== undefined) clearTimeout(waitTimer);
      waitTimer = setTimeout(() => done({ receivedMessage: false }), timeoutMs);
    });

    c.on("message", (_topic, payload) => {
      // Any message on the configured topic is enough (see the function
      // docs). The sample is capped so a large payload doesn't bloat the result.
      done({ receivedMessage: true, samplePayload: payload.toString("utf8").slice(0, 400) });
    });

    c.on("error", (err) => {
      done({ receivedMessage: false, error: String(err) });
    });

    // With reconnects disabled, a connection that closes without an "error"
    // event (before or after connecting) would otherwise leave this promise
    // pending forever or until the message timeout. The close caused by
    // done() itself arrives after `settled` is set and is ignored.
    c.on("close", () => {
      done({ receivedMessage: false, error: "connection closed before a message arrived" });
    });
  });
}