import mqtt, { type MqttClient } from "mqtt";
import { getConfig } from "../db/index";
import { log, error as logError, warn } from "../lib/log";
import { handleWebhookMessage } from "./webhook";

/** Connection state of the module-level MQTT client as reported to listeners. */
export type MqttStatus = "connected" | "disconnected" | "error" | "not_configured";

/** Broker settings needed to open a connection and subscribe. */
interface MqttConfig {
  url: string;
  topic: string;
  username: string;
  password: string;
}

/** Callback signature for status subscribers. */
type MqttStatusListener = (status: MqttStatus, error: string | null) => void;

// Module-level singleton state: at most one live client at a time.
let client: MqttClient | null = null;
let currentStatus: MqttStatus = "not_configured";
let currentError: string | null = null;
const statusListeners = new Set<MqttStatusListener>();

/** Returns a snapshot of the current status and the last error message, if any. */
export function getMqttStatus(): { status: MqttStatus; error: string | null } {
  return { status: currentStatus, error: currentError };
}

/**
 * Registers a listener that is invoked on every status change.
 *
 * @param fn Callback receiving the new status and error message.
 * @returns A function that unregisters the listener.
 */
export function onMqttStatus(fn: MqttStatusListener): () => void {
  statusListeners.add(fn);
  return () => {
    statusListeners.delete(fn);
  };
}

/**
 * Records the new status/error and notifies every registered listener.
 *
 * Each listener is called in isolation: if one throws, the failure is logged
 * and the remaining listeners are still notified, so a faulty subscriber
 * cannot break status propagation or bubble into the MQTT event emitter.
 */
function setStatus(next: MqttStatus, err: string | null = null): void {
  currentStatus = next;
  currentError = err;
  for (const listener of statusListeners) {
    try {
      listener(next, err);
    } catch (listenerError) {
      logError("MQTT status listener threw:", listenerError);
    }
  }
}

/**
 * Reads the broker settings via `getConfig`.
 *
 * @returns The configuration, or `null` when no `mqtt_url` is set.
 */
function readConfig(): MqttConfig | null {
  const url = getConfig("mqtt_url") ?? "";
  if (!url) return null;
  return {
    url,
    // `||` on purpose: an empty stored topic is not a usable subscription
    // filter, so it falls back to the default just like a missing value.
    topic: getConfig("mqtt_topic") || "jellyfin/events",
    username: getConfig("mqtt_username") ?? "",
    password: getConfig("mqtt_password") ?? "",
  };
}

/**
 * Connect to the configured MQTT broker and subscribe to the webhook topic.
 * Safe to call repeatedly: an existing client is torn down first. When no
 * broker is configured, status is set to 'not_configured' and the call is
 * a no-op.
*/
export async function startMqttClient(): Promise<void> {
  await stopMqttClient();

  const cfg = readConfig();
  if (!cfg) {
    setStatus("not_configured");
    return;
  }

  log(`MQTT: connecting to ${cfg.url} (topic=${cfg.topic})`);

  let c: MqttClient;
  try {
    c = mqtt.connect(cfg.url, {
      username: cfg.username || undefined,
      password: cfg.password || undefined,
      reconnectPeriod: 5000,
      connectTimeout: 15_000,
      clientId: `netfelix-audio-fix-${Math.random().toString(16).slice(2, 10)}`,
    });
  } catch (err) {
    // A synchronous failure (e.g. a URL the library rejects) would otherwise
    // leave the status stale; surface it, then let the caller see the error.
    setStatus("error", String(err));
    throw err;
  }
  client = c;

  // Events may still be delivered by a client that has already been stopped
  // or replaced. Only the current client is allowed to drive module status
  // and message handling.
  const isCurrent = (): boolean => client === c;

  c.on("connect", () => {
    if (!isCurrent()) return;
    c.subscribe(cfg.topic, { qos: 0 }, (err) => {
      if (!isCurrent()) return;
      if (err) {
        logError(`MQTT subscribe to ${cfg.topic} failed:`, err);
        setStatus("error", String(err));
        return;
      }
      log(`MQTT: connected, subscribed to ${cfg.topic}`);
      setStatus("connected");
    });
  });

  c.on("reconnect", () => {
    if (!isCurrent()) return;
    setStatus("disconnected", "reconnecting");
  });

  c.on("close", () => {
    if (!isCurrent()) return;
    setStatus("disconnected", null);
  });

  c.on("error", (err) => {
    if (!isCurrent()) return;
    warn(`MQTT error: ${String(err)}`);
    setStatus("error", String(err));
  });

  c.on("message", (_topic, payload) => {
    if (!isCurrent()) return;
    const text = payload.toString("utf8");
    // Best-effort: the handler owns its own error handling. Don't let a
    // single malformed message tear the subscriber down.
    handleWebhookMessage(text).catch((err) => logError("webhook handler threw:", err));
  });
}

/**
 * Disconnect the running client, if any, and set status to 'not_configured'.
 * Resolves once the client's `end` callback has fired. A no-op when no client
 * is active.
 */
export async function stopMqttClient(): Promise<void> {
  if (!client) return;
  const c = client;
  // Detach first so that events emitted during teardown are treated as stale.
  client = null;
  await new Promise<void>((resolve) => {
    c.end(false, {}, () => resolve());
  });
  setStatus("not_configured");
}

/**
 * Test a candidate MQTT configuration without touching the running client.
 * Connects, subscribes to `cfg.topic`, waits up to `timeoutMs` for any
 * message, then disconnects. Returns whether the connection succeeded and
 * whether any traffic arrived.
*/
export async function testMqttConnection(
  cfg: MqttConfig,
  timeoutMs = 30_000,
): Promise<{ connected: boolean; receivedMessage: boolean; error?: string; samplePayload?: string }> {
  type TestResult = { connected: boolean; receivedMessage: boolean; error?: string; samplePayload?: string };

  return new Promise<TestResult>((resolve) => {
    let c: MqttClient;
    try {
      c = mqtt.connect(cfg.url, {
        username: cfg.username || undefined,
        password: cfg.password || undefined,
        // One-shot probe: never retry, fail fast.
        reconnectPeriod: 0,
        connectTimeout: 10_000,
        clientId: `netfelix-audio-fix-test-${Math.random().toString(16).slice(2, 10)}`,
      });
    } catch (err) {
      // Report a synchronous connect failure through the result object
      // rather than rejecting, matching every other failure path here.
      resolve({ connected: false, receivedMessage: false, error: String(err) });
      return;
    }

    let settled = false;
    let connected = false;
    let waitTimer: ReturnType<typeof setTimeout> | undefined;

    // Single exit point: the first outcome wins, later events are ignored.
    const done = (result: TestResult): void => {
      if (settled) return;
      settled = true;
      // Cancel the pending wait so it does not keep the event loop alive.
      if (waitTimer !== undefined) clearTimeout(waitTimer);
      c.end(true);
      resolve(result);
    };

    c.on("connect", () => {
      if (settled) return;
      connected = true;
      c.subscribe(cfg.topic, { qos: 0 }, (err) => {
        if (err) {
          done({ connected: true, receivedMessage: false, error: String(err) });
        }
      });
      // The message wait window starts once the connection is established.
      waitTimer = setTimeout(() => done({ connected: true, receivedMessage: false }), timeoutMs);
    });

    c.on("message", (_topic, payload) => {
      done({ connected: true, receivedMessage: true, samplePayload: payload.toString("utf8").slice(0, 200) });
    });

    c.on("error", (err) => {
      done({ connected: false, receivedMessage: false, error: String(err) });
    });

    c.on("close", () => {
      // With reconnects disabled, a close before the connection was ever
      // established is terminal; without this the promise could hang forever
      // if no "error" event accompanies the close.
      if (!connected) {
        done({ connected: false, receivedMessage: false, error: "connection closed before it was established" });
      }
    });
  });
}