// Room-aware WebSocket hub. // // Connect to `/ws?room=&kind=display|controller|panel`. Auth via the // session cookie. The server is the single source of truth for room state // (now-playing, volume); clients send `command` (intent) and the elected // display client emits `state` (truth) which is persisted to `room_state` // and rebroadcast. // // Message envelope: `{ type, ...payload }`. Types: // - hello server->client: { room, peers, state, role, you } // - presence server->room: { peers } // - command client->server: { action: play|pause|stop|volume|setSink, ... } // server->room: forwarded as-is // - state display->server: ground-truth playback snapshot // server->room: persisted snapshot // - sync-pos display->server: { stationId, masterCT, atServerNow } // server->room: forwarded; latest cached for hello replay // - devices display->server: { list, current } server->room: same // - vote server->room: { stationId, stats } emitted after castVote // - plays server->room: { stationId, plays } emitted after recordPlay import { WebSocketServer } from 'ws'; import { URL } from 'node:url'; import { getUserBySession, readSessionToken } from './auth.js'; import { getRoomBySlug, ensurePersonalRoom, isMember, getRoomState, setRoomState } from './rooms.js'; import { getStation } from './stations.js'; const DEBUG = !!process.env.ORADIO_DEBUG_SYNC; function dlog(...args) { if (DEBUG) console.log('[ws]', ...args); } // roomSlug -> Set const rooms = new Map(); // userId -> Set const byUser = new Map(); // roomSlug -> last `sync-pos` payload from this room's display, plus a // server-clock timestamp so receivers can age it. In-memory only; this churns // every couple of seconds and never needs to outlive a process. const lastSyncPos = new Map(); function addToIndex(map, key, ws) { if (!map.has(key)) map.set(key, new Set()); map.get(key).add(ws); } function removeFromIndex(map, key, ws) { const set = map.get(key); if (!set) return; set.delete(ws); if (!set.size) map.delete(key); } function presenceFor(roomSlug) { const set = rooms.get(roomSlug); if (!set) return []; const out = []; for (const ws of set) { out.push({ user: { id: ws.user.id, username: ws.user.username }, kind: ws.kind, since: ws.since }); } return out; } function send(ws, msg) { if (ws.readyState === ws.OPEN) ws.send(JSON.stringify(msg)); } export function broadcastToRoom(roomSlug, msg, except) { const set = rooms.get(roomSlug); if (!set) return; const payload = JSON.stringify(msg); for (const ws of set) { if (ws === except) continue; if (ws.readyState === ws.OPEN) ws.send(payload); } } /** Send to every connection of the given user, across all their rooms. */ export function broadcastToUser(userId, msg, except) { const set = byUser.get(userId); if (!set) return; const payload = JSON.stringify(msg); for (const ws of set) { if (ws === except) continue; if (ws.readyState === ws.OPEN) ws.send(payload); } } /** Broadcast to every open WS, regardless of room. Used for global events. */ export function broadcastGlobal(msg) { const payload = JSON.stringify(msg); for (const set of rooms.values()) { for (const ws of set) { if (ws.readyState === ws.OPEN) ws.send(payload); } } } export function hasDisplay(roomSlug) { const set = rooms.get(roomSlug); if (!set) return false; for (const ws of set) if (ws.kind === 'display') return true; return false; } // Mutates room_state for a play|pause|stop|volume intent and broadcasts both // the command (so the display actually changes audio) and the resulting state // (so every other peer's UI mirrors). Shared by the WS `command` handler and // the HTTP `POST /api/rooms/:slug/command` route so both paths produce // byte-identical broadcasts. // // `except` lets the WS path exclude the original sender from the command // rebroadcast (kiosks in linked mode would otherwise loop play→stop→play). // The state broadcast always goes to everyone — including the sender — so // UIs stay in sync. export function dispatchRoomCommand(room, msg, { except = null } = {}) { const slug = room.slug; const cur = getRoomState(room.id); const displayPresent = hasDisplay(slug); if (msg.action === 'play' && Number.isFinite(msg.stationId)) { const newStation = Number(msg.stationId); const sameAndPlaying = cur.playing && cur.station_id === newStation && cur.started_at; const patch = { station_id: newStation, playing: true }; if (!displayPresent) { patch.started_at = sameAndPlaying ? cur.started_at : Date.now(); } else if (cur.station_id !== newStation) { patch.started_at = null; } setRoomState(room.id, patch); } else if (msg.action === 'pause') { setRoomState(room.id, { playing: false }); } else if (msg.action === 'stop') { setRoomState(room.id, { playing: false, started_at: null }); } else if (msg.action === 'volume' && typeof msg.value === 'number') { setRoomState(room.id, { volume: Math.max(0, Math.min(1, msg.value)) }); } else { return null; } const next = getRoomState(room.id); const station = next.station_id ? getStation(next.station_id) : null; broadcastToRoom(slug, msg, except); broadcastToRoom(slug, { type: 'state', ...next, station, server_now: Date.now() }); return { state: next, station }; } export function attachWs(server) { const wss = new WebSocketServer({ noServer: true }); server.on('upgrade', (req, socket, head) => { if (!req.url.startsWith('/ws')) return socket.destroy(); const token = readSessionToken(req); const user = getUserBySession(token); if (!user) { socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n'); socket.destroy(); return; } const url = new URL(req.url, 'http://x'); const slug = url.searchParams.get('room') || `u-${user.id}`; let kindRaw = url.searchParams.get('kind') || 'controller'; if (!['display', 'controller', 'panel'].includes(kindRaw)) kindRaw = 'controller'; // Resolve room. The personal room is auto-provisioned on demand. let room = getRoomBySlug(slug); if (!room && slug === `u-${user.id}`) room = ensurePersonalRoom(user); if (!room) { socket.write('HTTP/1.1 404 Not Found\r\n\r\n'); socket.destroy(); return; } if (!isMember(room.id, user.id)) { socket.write('HTTP/1.1 403 Forbidden\r\n\r\n'); socket.destroy(); return; } // One display per room: subsequent claimers are silently demoted // to passive panels (no audio, no device picker). let kind = kindRaw; if (kind === 'display' && hasDisplay(room.slug)) kind = 'panel'; wss.handleUpgrade(req, socket, head, (ws) => { ws.user = user; ws.kind = kind; ws.room = room; ws.since = Date.now(); addToIndex(rooms, room.slug, ws); addToIndex(byUser, user.id, ws); ws.on('close', () => { removeFromIndex(rooms, room.slug, ws); removeFromIndex(byUser, user.id, ws); // If the departing socket was the display, its sync-pos cache // is stale (no source until the new display starts emitting). if (ws.kind === 'display' && !hasDisplay(room.slug)) { lastSyncPos.delete(room.slug); } broadcastToRoom(room.slug, { type: 'presence', peers: presenceFor(room.slug) }); }); ws.on('message', (raw) => { let msg; try { msg = JSON.parse(raw.toString()); } catch { return; } handleClientMessage(ws, msg); }); // Send hello snapshot. const state = getRoomState(room.id); const station = state.station_id ? getStation(state.station_id) : null; send(ws, { type: 'hello', you: { id: user.id, username: user.username, role: user.role, kind }, room: { id: room.id, slug: room.slug, name: room.name }, state: { ...state, station }, server_now: Date.now(), peers: presenceFor(room.slug), last_sync_pos: lastSyncPos.get(room.slug) || null }); broadcastToRoom(room.slug, { type: 'presence', peers: presenceFor(room.slug) }, ws); }); }); return wss; } function handleClientMessage(ws, msg) { if (!msg || typeof msg !== 'object') return; const slug = ws.room.slug; switch (msg.type) { case 'command': { // Controllers express intent. Forward to all peers; the display // is responsible for actually changing audio output. We also // optimistically reflect simple intents into room_state so a // late-joining peer sees the latest target station/volume even // before the display emits a confirmation `state`. dlog('cmd', msg.action, 'from', ws.user.username, 'kind', ws.kind, 'display?', hasDisplay(slug)); if (msg.action === 'setSyncBuffer' && Number.isFinite(msg.value)) { // Room-wide sync buffer. Forward to everyone (including the // master, which will adopt it and include in the next // sync-pos so late joiners pick it up). Not persisted to // room_state — it's ephemeral per session. const v = Math.max(500, Math.min(60000, Math.round(msg.value))); broadcastToRoom(slug, { type: 'command', action: 'setSyncBuffer', value: v }, ws); return; } if (msg.action === 'peerVolume' && Number.isFinite(msg.userId) && typeof msg.value === 'number') { // Targeted per-zone hint: do NOT persist in room_state. Just // forward to that user's connections in this room. Receiving // clients apply it as their LOCAL volume. const v = Math.max(0, Math.min(1, msg.value)); const set = rooms.get(slug); if (set) { const payload = JSON.stringify({ type: 'peerVolume', userId: msg.userId, value: v, from: ws.user.id }); for (const peer of set) { if (peer.user.id === msg.userId && peer.readyState === peer.OPEN) { peer.send(payload); } } } return; } // play/pause/stop/volume share their effect with the HTTP route, // so the mutation + broadcasts live in dispatchRoomCommand. The // command itself is NOT echoed back to `ws`: the sender already // acted locally, and echoing would re-trigger their command // handler (kiosks in linked mode would loop play→stop→play). // Other clients still see the command, and EVERYONE — including // the sender — sees the resulting `state` so UIs stay in sync. dispatchRoomCommand(ws.room, msg, { except: ws }); return; } case 'state': { // Only the display's state messages are persisted as truth. if (ws.kind !== 'display') return; const patch = {}; if ('stationId' in msg) patch.station_id = msg.stationId ?? null; if ('playing' in msg) patch.playing = !!msg.playing; if (typeof msg.volume === 'number') patch.volume = msg.volume; // Display can re-anchor started_at (e.g. when it begins playback). if ('started_at' in msg) patch.started_at = msg.started_at ?? null; dlog('state from display', patch); const prev = getRoomState(ws.room.id); const next = setRoomState(ws.room.id, patch); // Station change invalidates any cached master position. if (prev.station_id !== next.station_id) lastSyncPos.delete(slug); const station = next.station_id ? getStation(next.station_id) : null; broadcastToRoom(slug, { type: 'state', ...next, station, server_now: Date.now() }); return; } case 'clock-ping': { // NTP-lite: echo client's t1 + server t2; client computes offset. send(ws, { type: 'clock-pong', t1: msg.t1, t2: Date.now() }); return; } case 'sync-pos': { // Only the room's display has an authoritative stream position. // Forward to every other peer and cache so reconnecting peers can // anchor immediately from `hello.last_sync_pos`. if (ws.kind !== 'display') return; const payload = { stationId: Number.isFinite(msg.stationId) ? Number(msg.stationId) : null, masterCT: Number.isFinite(msg.masterCT) ? Number(msg.masterCT) : null, atServerNow: Number.isFinite(msg.atServerNow) ? Number(msg.atServerNow) : Date.now(), pdtMs: Number.isFinite(msg.pdtMs) ? Number(msg.pdtMs) : null, bufferMs: Number.isFinite(msg.bufferMs) ? Number(msg.bufferMs) : null }; if (payload.masterCT == null || payload.stationId == null) return; lastSyncPos.set(slug, payload); dlog('sync-pos', payload); broadcastToRoom(slug, { type: 'sync-pos', ...payload }, ws); return; } case 'devices': { if (ws.kind !== 'display') return; broadcastToRoom(slug, { type: 'devices', list: msg.list || [], current: msg.current || null }); return; } case 'ping': send(ws, { type: 'pong', t: Date.now() }); return; default: return; } }