// Room-aware WebSocket hub. // // Connect to `/ws?room=&kind=display|controller|panel`. Auth via the // session cookie. The server is the single source of truth for room state // (now-playing, volume); clients send `command` (intent) and the elected // display client emits `state` (truth) which is persisted to `room_state` // and rebroadcast. // // Message envelope: `{ type, ...payload }`. Types: // - hello server->client: { room, peers, state, role, you } // - presence server->room: { peers } // - command client->server: { action: play|pause|stop|volume|setSink, ... } // server->room: forwarded as-is // - state display->server: ground-truth playback snapshot // server->room: persisted snapshot // - devices display->server: { list, current } server->room: same // - vote server->room: { stationId, stats } emitted after castVote // - plays server->room: { stationId, plays } emitted after recordPlay import { WebSocketServer } from 'ws'; import { URL } from 'node:url'; import { getUserBySession, readSessionToken } from './auth.js'; import { getRoomBySlug, ensurePersonalRoom, isMember, getRoomState, setRoomState } from './rooms.js'; import { getStation } from './stations.js'; // roomSlug -> Set const rooms = new Map(); // userId -> Set const byUser = new Map(); function addToIndex(map, key, ws) { if (!map.has(key)) map.set(key, new Set()); map.get(key).add(ws); } function removeFromIndex(map, key, ws) { const set = map.get(key); if (!set) return; set.delete(ws); if (!set.size) map.delete(key); } function presenceFor(roomSlug) { const set = rooms.get(roomSlug); if (!set) return []; const out = []; for (const ws of set) { out.push({ user: { id: ws.user.id, username: ws.user.username }, kind: ws.kind, since: ws.since }); } return out; } function send(ws, msg) { if (ws.readyState === ws.OPEN) ws.send(JSON.stringify(msg)); } export function broadcastToRoom(roomSlug, msg, except) { const set = rooms.get(roomSlug); if (!set) return; const payload = JSON.stringify(msg); for (const ws of set) { if (ws === except) continue; if (ws.readyState === ws.OPEN) ws.send(payload); } } /** Send to every connection of the given user, across all their rooms. */ export function broadcastToUser(userId, msg, except) { const set = byUser.get(userId); if (!set) return; const payload = JSON.stringify(msg); for (const ws of set) { if (ws === except) continue; if (ws.readyState === ws.OPEN) ws.send(payload); } } /** Broadcast to every open WS, regardless of room. Used for global events. */ export function broadcastGlobal(msg) { const payload = JSON.stringify(msg); for (const set of rooms.values()) { for (const ws of set) { if (ws.readyState === ws.OPEN) ws.send(payload); } } } function hasDisplay(roomSlug) { const set = rooms.get(roomSlug); if (!set) return false; for (const ws of set) if (ws.kind === 'display') return true; return false; } export function attachWs(server) { const wss = new WebSocketServer({ noServer: true }); server.on('upgrade', (req, socket, head) => { if (!req.url.startsWith('/ws')) return socket.destroy(); const token = readSessionToken(req); const user = getUserBySession(token); if (!user) { socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n'); socket.destroy(); return; } const url = new URL(req.url, 'http://x'); const slug = url.searchParams.get('room') || `u-${user.id}`; let kindRaw = url.searchParams.get('kind') || 'controller'; if (!['display', 'controller', 'panel'].includes(kindRaw)) kindRaw = 'controller'; // Resolve room. The personal room is auto-provisioned on demand. let room = getRoomBySlug(slug); if (!room && slug === `u-${user.id}`) room = ensurePersonalRoom(user); if (!room) { socket.write('HTTP/1.1 404 Not Found\r\n\r\n'); socket.destroy(); return; } if (!isMember(room.id, user.id)) { socket.write('HTTP/1.1 403 Forbidden\r\n\r\n'); socket.destroy(); return; } // One display per room: subsequent claimers are silently demoted // to passive panels (no audio, no device picker). let kind = kindRaw; if (kind === 'display' && hasDisplay(room.slug)) kind = 'panel'; wss.handleUpgrade(req, socket, head, (ws) => { ws.user = user; ws.kind = kind; ws.room = room; ws.since = Date.now(); addToIndex(rooms, room.slug, ws); addToIndex(byUser, user.id, ws); ws.on('close', () => { removeFromIndex(rooms, room.slug, ws); removeFromIndex(byUser, user.id, ws); broadcastToRoom(room.slug, { type: 'presence', peers: presenceFor(room.slug) }); }); ws.on('message', (raw) => { let msg; try { msg = JSON.parse(raw.toString()); } catch { return; } handleClientMessage(ws, msg); }); // Send hello snapshot. const state = getRoomState(room.id); const station = state.station_id ? getStation(state.station_id) : null; send(ws, { type: 'hello', you: { id: user.id, username: user.username, role: user.role, kind }, room: { id: room.id, slug: room.slug, name: room.name }, state: { ...state, station }, peers: presenceFor(room.slug) }); broadcastToRoom(room.slug, { type: 'presence', peers: presenceFor(room.slug) }, ws); }); }); return wss; } function handleClientMessage(ws, msg) { if (!msg || typeof msg !== 'object') return; const slug = ws.room.slug; switch (msg.type) { case 'command': { // Controllers express intent. Forward to all peers; the display // is responsible for actually changing audio output. We also // optimistically reflect simple intents into room_state so a // late-joining peer sees the latest target station/volume even // before the display emits a confirmation `state`. if (msg.action === 'play' && Number.isFinite(msg.stationId)) { setRoomState(ws.room.id, { station_id: Number(msg.stationId), playing: true }); } else if (msg.action === 'stop') { setRoomState(ws.room.id, { playing: false }); } else if (msg.action === 'volume' && typeof msg.value === 'number') { setRoomState(ws.room.id, { volume: Math.max(0, Math.min(1, msg.value)) }); } broadcastToRoom(slug, msg, null); // include sender so its UI mirrors return; } case 'state': { // Only the display's state messages are persisted as truth. if (ws.kind !== 'display') return; const patch = {}; if ('stationId' in msg) patch.station_id = msg.stationId ?? null; if ('playing' in msg) patch.playing = !!msg.playing; if (typeof msg.volume === 'number') patch.volume = msg.volume; const next = setRoomState(ws.room.id, patch); const station = next.station_id ? getStation(next.station_id) : null; broadcastToRoom(slug, { type: 'state', ...next, station }); return; } case 'devices': { if (ws.kind !== 'display') return; broadcastToRoom(slug, { type: 'devices', list: msg.list || [], current: msg.current || null }); return; } case 'ping': send(ws, { type: 'pong', t: Date.now() }); return; default: return; } }