341 lines
14 KiB
JavaScript
341 lines
14 KiB
JavaScript
// Room-aware WebSocket hub.
//
// Connect to `/ws?room=<slug>&kind=display|controller|panel`. Auth via the
// session cookie. The server is the single source of truth for room state
// (now-playing, volume); clients send `command` (intent) and the elected
// display client emits `state` (truth) which is persisted to `room_state`
// and rebroadcast.
//
// Message envelope: `{ type, ...payload }`. Types:
//   - hello       server->client:  { you, room, state, server_now, peers, last_sync_pos }
//                                  (`you` carries id, username, role and the granted kind)
//   - presence    server->room:    { peers }
//   - command     client->server:  { action: play|pause|stop|volume|setSink, ... }
//                 server->room:    forwarded as-is
//                                  NOTE: only play|pause|stop|volume are persisted and
//                                  forwarded by dispatchRoomCommand; `setSyncBuffer` is
//                                  relayed with a clamped value and `peerVolume` is
//                                  delivered as a `peerVolume` frame to the target user.
//   - state       display->server: ground-truth playback snapshot
//                 server->room:    persisted snapshot
//   - sync-pos    display->server: { stationId, masterCT, atServerNow }
//                 server->room:    forwarded; latest cached for hello replay
//   - devices     display->server: { list, current }   server->room: same
//   - clock-ping  client->server:  { t1 }   server->client: clock-pong { t1, t2 }
//   - ping        client->server:  {}       server->client: pong { t }
//   - vote        server->room:    { stationId, stats }   emitted after castVote
//   - plays       server->room:    { stationId, plays }   emitted after recordPlay
|
|
|
|
import { WebSocketServer } from 'ws';
|
|
import { URL } from 'node:url';
|
|
import { getUserBySession, readSessionToken } from './auth.js';
|
|
import {
|
|
getRoomBySlug, ensurePersonalRoom, isMember,
|
|
getRoomState, setRoomState
|
|
} from './rooms.js';
|
|
import { getStation } from './stations.js';
|
|
import { notifyCompanion } from './companion.js';
|
|
|
|
// Verbose tracing is opt-in: any non-empty ORADIO_DEBUG_SYNC value enables it.
const DEBUG = Boolean(process.env.ORADIO_DEBUG_SYNC);

/** Writes its arguments to stdout behind a `[ws]` tag, but only when DEBUG is on. */
function dlog(...args) {
  if (!DEBUG) return;
  console.log('[ws]', ...args);
}
|
|
|
|
// Connection indexes. Both map a key to the Set of live sockets filed under it.

// Keyed by room slug.
const rooms = new Map();

// Keyed by user id.
const byUser = new Map();

// Keyed by room slug: the most recent `sync-pos` payload relayed from that
// room's display (it includes a server-clock timestamp so receivers can age
// it). Kept in memory only — it is refreshed every couple of seconds and has
// no reason to survive a restart.
const lastSyncPos = new Map();
|
|
|
|
/**
 * Files `ws` under `key` in a Map-of-Sets index, creating the Set on first use.
 *
 * @param {Map<any, Set<any>>} map - index to update
 * @param {any} key - bucket key (room slug or user id)
 * @param {any} ws - socket to register
 */
function addToIndex(map, key, ws) {
  if (map.has(key)) {
    map.get(key).add(ws);
    return;
  }
  map.set(key, new Set([ws]));
}
|
|
/**
 * Removes `ws` from the bucket stored under `key`; the bucket itself is
 * dropped from the index once it becomes empty. Unknown keys are a no-op.
 *
 * @param {Map<any, Set<any>>} map - index to update
 * @param {any} key - bucket key (room slug or user id)
 * @param {any} ws - socket to unregister
 */
function removeFromIndex(map, key, ws) {
  const bucket = map.get(key);
  if (bucket) {
    bucket.delete(ws);
    if (bucket.size === 0) map.delete(key);
  }
}
|
|
|
|
/**
 * Builds the presence list for a room: one entry per connected socket with
 * the owner's id and username, the socket's kind and its `since` timestamp.
 *
 * @param {string} roomSlug
 * @returns {Array<{user: {id: any, username: any}, kind: any, since: any}>}
 *   empty when nobody is connected to the room
 */
function presenceFor(roomSlug) {
  const sockets = rooms.get(roomSlug);
  if (!sockets) return [];
  return Array.from(sockets, (peer) => ({
    user: { id: peer.user.id, username: peer.user.username },
    kind: peer.kind,
    since: peer.since,
  }));
}
|
|
|
|
/**
 * JSON-encodes `msg` and writes it to a single socket. Sockets that are not
 * in the OPEN state are skipped (and `msg` is not serialized for them).
 */
function send(ws, msg) {
  if (ws.readyState !== ws.OPEN) return;
  ws.send(JSON.stringify(msg));
}
|
|
|
|
/**
 * Sends `msg` to every open socket in a room, optionally skipping one.
 * The message is serialized once and the same string is reused per peer.
 *
 * @param {string} roomSlug
 * @param {object} msg - JSON-serializable frame
 * @param {object} [except] - socket to leave out (typically the sender)
 */
export function broadcastToRoom(roomSlug, msg, except) {
  const sockets = rooms.get(roomSlug);
  if (!sockets) return;
  const frame = JSON.stringify(msg);
  sockets.forEach((peer) => {
    if (peer !== except && peer.readyState === peer.OPEN) peer.send(frame);
  });
}
|
|
|
|
/**
 * Delivers `msg` to all open sockets owned by one user, whichever rooms they
 * are attached to. `except`, when given, is skipped.
 */
export function broadcastToUser(userId, msg, except) {
  const sockets = byUser.get(userId);
  if (!sockets) return;
  const frame = JSON.stringify(msg);
  for (const peer of sockets) {
    const deliverable = peer !== except && peer.readyState === peer.OPEN;
    if (deliverable) peer.send(frame);
  }
}
|
|
|
|
/**
 * Pushes `msg` to every open socket in every room — used for events that
 * are not scoped to a room. The frame is serialized once up front.
 */
export function broadcastGlobal(msg) {
  const frame = JSON.stringify(msg);
  rooms.forEach((sockets) => {
    sockets.forEach((peer) => {
      if (peer.readyState === peer.OPEN) peer.send(frame);
    });
  });
}
|
|
|
|
/**
 * Reports whether at least one socket currently connected to the room has
 * kind `display`.
 *
 * @param {string} roomSlug
 * @returns {boolean}
 */
export function hasDisplay(roomSlug) {
  const sockets = rooms.get(roomSlug);
  return sockets ? [...sockets].some((peer) => peer.kind === 'display') : false;
}
|
|
|
|
/**
 * Applies a play|pause|stop|volume intent to `room_state` and broadcasts both
 * the command (so the display actually changes audio) and the resulting state
 * (so every other peer's UI mirrors it). Shared by the WS `command` handler
 * and the HTTP `POST /api/rooms/:slug/command` route so both paths produce
 * identical broadcasts.
 *
 * @param {{id: any, slug: string}} room - room whose state is being changed
 * @param {object} msg - command envelope; `msg.action` selects the branch.
 *   It is forwarded to peers unchanged.
 * @param {{except?: object|null}} [options] - `except` lets the WS path
 *   exclude the original sender from the command rebroadcast (kiosks in
 *   linked mode would otherwise loop play→stop→play). The state broadcast
 *   always goes to everyone — including the sender — so UIs stay in sync.
 * @returns {{state: object, station: object|null}|null} the new state and its
 *   station record, or `null` when the action is not recognised or its
 *   argument is unusable (nothing is persisted or broadcast in that case).
 */
export function dispatchRoomCommand(room, msg, { except = null } = {}) {
  const slug = room.slug;
  const cur = getRoomState(room.id);
  const displayPresent = hasDisplay(slug);

  if (msg.action === 'play' && Number.isFinite(msg.stationId)) {
    const newStation = Number(msg.stationId);
    notifyCompanion(newStation);
    const sameAndPlaying = cur.playing && cur.station_id === newStation && cur.started_at;
    const patch = { station_id: newStation, playing: true };
    if (!displayPresent) {
      // No display connected: anchor started_at here, keeping the existing
      // anchor when this is a repeat `play` for the station already playing.
      patch.started_at = sameAndPlaying ? cur.started_at : Date.now();
    } else if (cur.station_id !== newStation) {
      // A display is connected and the station changed: clear the anchor.
      // The display's own `state` message may set started_at again.
      patch.started_at = null;
    }
    setRoomState(room.id, patch);
  } else if (msg.action === 'pause') {
    setRoomState(room.id, { playing: false });
    notifyCompanion(null);
  } else if (msg.action === 'stop') {
    setRoomState(room.id, { playing: false, started_at: null });
    notifyCompanion(null);
  } else if (
    msg.action === 'volume' &&
    typeof msg.value === 'number' &&
    !Number.isNaN(msg.value)
  ) {
    // NaN has to be rejected explicitly: Math.max/Math.min propagate it, so
    // the clamp below would otherwise persist NaN as the room volume.
    setRoomState(room.id, { volume: Math.max(0, Math.min(1, msg.value)) });
  } else {
    return null;
  }

  const next = getRoomState(room.id);
  const station = next.station_id ? getStation(next.station_id) : null;
  broadcastToRoom(slug, msg, except);
  broadcastToRoom(slug, { type: 'state', ...next, station, server_now: Date.now() });
  return { state: next, station };
}
|
|
|
|
/**
 * Wires the room WebSocket hub onto an HTTP server's `upgrade` event.
 *
 * Every upgrade request whose URL does not start with `/ws` has its socket
 * destroyed. Otherwise the request is authenticated from its session token,
 * the target room is resolved (`?room=<slug>`, defaulting to the caller's
 * personal `u-<id>` room) and membership is checked; failures answer with a
 * bare 401 / 404 / 403 status line and close the socket. Accepted sockets are
 * indexed by room and by user, receive a `hello` snapshot, and the rest of
 * the room gets a `presence` update.
 *
 * @param {import('node:http').Server} server - server emitting `upgrade`
 * @returns {WebSocketServer} the `noServer` WebSocketServer handling upgrades
 */
export function attachWs(server) {
  const wss = new WebSocketServer({ noServer: true });

  server.on('upgrade', (req, socket, head) => {
    if (!req.url.startsWith('/ws')) return socket.destroy();
    const token = readSessionToken(req);
    const user = getUserBySession(token);
    if (!user) {
      socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
      socket.destroy();
      return;
    }

    const url = new URL(req.url, 'http://x');
    const slug = url.searchParams.get('room') || `u-${user.id}`;
    let kindRaw = url.searchParams.get('kind') || 'controller';
    if (!['display', 'controller', 'panel'].includes(kindRaw)) kindRaw = 'controller';

    // Resolve room. The personal room is auto-provisioned on demand.
    let room = getRoomBySlug(slug);
    if (!room && slug === `u-${user.id}`) room = ensurePersonalRoom(user);
    if (!room) {
      socket.write('HTTP/1.1 404 Not Found\r\n\r\n');
      socket.destroy();
      return;
    }
    if (!isMember(room.id, user.id)) {
      socket.write('HTTP/1.1 403 Forbidden\r\n\r\n');
      socket.destroy();
      return;
    }

    // One display per room: subsequent claimers are silently demoted
    // to passive panels (no audio, no device picker).
    let kind = kindRaw;
    if (kind === 'display' && hasDisplay(room.slug)) kind = 'panel';

    wss.handleUpgrade(req, socket, head, (ws) => {
      ws.user = user;
      ws.kind = kind;
      ws.room = room;
      ws.since = Date.now();
      addToIndex(rooms, room.slug, ws);
      addToIndex(byUser, user.id, ws);

      // An EventEmitter with no 'error' listener throws when 'error' is
      // emitted, which would surface as an uncaught exception. Index cleanup
      // is left to the 'close' handler below.
      ws.on('error', (err) => {
        dlog('socket error', user.username, err && err.message);
      });

      ws.on('close', () => {
        removeFromIndex(rooms, room.slug, ws);
        removeFromIndex(byUser, user.id, ws);
        // If the departing socket was the display, its sync-pos cache
        // is stale (no source until the new display starts emitting).
        if (ws.kind === 'display' && !hasDisplay(room.slug)) {
          lastSyncPos.delete(room.slug);
        }
        broadcastToRoom(room.slug, { type: 'presence', peers: presenceFor(room.slug) });
      });

      ws.on('message', (raw) => {
        let msg;
        // Frames that are not valid JSON are dropped on purpose.
        try { msg = JSON.parse(raw.toString()); } catch { return; }
        // A failure while handling one frame must not escape the listener:
        // it would propagate out of emit() and take the process down.
        try {
          handleClientMessage(ws, msg);
        } catch (err) {
          console.error('[ws] message handler failed', err);
        }
      });

      // Send hello snapshot.
      const state = getRoomState(room.id);
      const station = state.station_id ? getStation(state.station_id) : null;
      send(ws, {
        type: 'hello',
        you: { id: user.id, username: user.username, role: user.role, kind },
        room: { id: room.id, slug: room.slug, name: room.name },
        state: { ...state, station },
        server_now: Date.now(),
        peers: presenceFor(room.slug),
        last_sync_pos: lastSyncPos.get(room.slug) || null
      });
      // Everyone else learns about the newcomer; the newcomer already has
      // the peer list from `hello`.
      broadcastToRoom(room.slug, { type: 'presence', peers: presenceFor(room.slug) }, ws);
    });
  });

  return wss;
}
|
|
|
|
/**
 * Routes one decoded client frame by `msg.type`.
 *
 * - `command`    any peer; `setSyncBuffer` and `peerVolume` are relayed
 *                directly, everything else goes through dispatchRoomCommand.
 * - `state`      display only; persisted to room state and rebroadcast.
 * - `clock-ping` answered with `clock-pong` carrying the server time.
 * - `sync-pos`   display only; sanitised, cached and forwarded to peers.
 * - `devices`    display only; rebroadcast to the room.
 * - `ping`       answered with `pong`.
 *
 * Non-object frames and unknown types are ignored.
 *
 * @param {object} ws - sender socket; `ws.room`, `ws.user`, `ws.kind` are read
 * @param {any} msg - the JSON-decoded frame
 */
function handleClientMessage(ws, msg) {
  if (!msg || typeof msg !== 'object') return;
  const slug = ws.room.slug;

  switch (msg.type) {
    case 'command': {
      // Controllers express intent. Forward to all peers; the display
      // is responsible for actually changing audio output. We also
      // optimistically reflect simple intents into room_state so a
      // late-joining peer sees the latest target station/volume even
      // before the display emits a confirmation `state`.
      dlog('cmd', msg.action, 'from', ws.user.username, 'kind', ws.kind, 'display?', hasDisplay(slug));
      if (msg.action === 'setSyncBuffer' && Number.isFinite(msg.value)) {
        // Room-wide sync buffer, clamped to 500..60000 and rounded to an
        // integer. Relayed to every peer except the sender. Not persisted
        // to room_state — it's ephemeral per session.
        const v = Math.max(500, Math.min(60000, Math.round(msg.value)));
        broadcastToRoom(slug, { type: 'command', action: 'setSyncBuffer', value: v }, ws);
        return;
      }
      if (msg.action === 'peerVolume' && Number.isFinite(msg.userId) && typeof msg.value === 'number') {
        // Targeted per-zone hint: do NOT persist in room_state. Just
        // forward to that user's connections in this room. Receiving
        // clients apply it as their LOCAL volume.
        const v = Math.max(0, Math.min(1, msg.value));
        const set = rooms.get(slug);
        if (set) {
          const payload = JSON.stringify({ type: 'peerVolume', userId: msg.userId, value: v, from: ws.user.id });
          for (const peer of set) {
            if (peer.user.id === msg.userId && peer.readyState === peer.OPEN) {
              peer.send(payload);
            }
          }
        }
        return;
      }
      // play/pause/stop/volume share their effect with the HTTP route,
      // so the mutation + broadcasts live in dispatchRoomCommand. The
      // command itself is NOT echoed back to `ws`: the sender already
      // acted locally, and echoing would re-trigger their command
      // handler (kiosks in linked mode would loop play→stop→play).
      // Other clients still see the command, and EVERYONE — including
      // the sender — sees the resulting `state` so UIs stay in sync.
      dispatchRoomCommand(ws.room, msg, { except: ws });
      return;
    }
    case 'state': {
      // Only the display's state messages are persisted as truth.
      if (ws.kind !== 'display') return;
      const patch = {};
      if ('stationId' in msg) patch.station_id = msg.stationId ?? null;
      if ('playing' in msg) patch.playing = !!msg.playing;
      // Clamp to 0..1, the same range the `volume` command persists, so an
      // out-of-range or infinite value (e.g. JSON `1e999`) cannot be stored.
      if (typeof msg.volume === 'number' && !Number.isNaN(msg.volume)) {
        patch.volume = Math.max(0, Math.min(1, msg.volume));
      }
      // Display can re-anchor started_at (e.g. when it begins playback).
      if ('started_at' in msg) patch.started_at = msg.started_at ?? null;
      dlog('state from display', patch);
      const prev = getRoomState(ws.room.id);
      const next = setRoomState(ws.room.id, patch);
      // Station change invalidates any cached master position.
      if (prev.station_id !== next.station_id) lastSyncPos.delete(slug);
      // Notify Companion whenever the effective now-playing state changes.
      if (prev.station_id !== next.station_id || prev.playing !== next.playing) {
        notifyCompanion(next.playing && next.station_id ? next.station_id : null);
      }
      const station = next.station_id ? getStation(next.station_id) : null;
      broadcastToRoom(slug, { type: 'state', ...next, station, server_now: Date.now() });
      return;
    }
    case 'clock-ping': {
      // NTP-lite: echo client's t1 + server t2; client computes offset.
      send(ws, { type: 'clock-pong', t1: msg.t1, t2: Date.now() });
      return;
    }
    case 'sync-pos': {
      // Only the room's display has an authoritative stream position.
      // Forward to every other peer and cache so reconnecting peers can
      // anchor immediately from `hello.last_sync_pos`.
      if (ws.kind !== 'display') return;
      const payload = {
        stationId: Number.isFinite(msg.stationId) ? Number(msg.stationId) : null,
        masterCT: Number.isFinite(msg.masterCT) ? Number(msg.masterCT) : null,
        // Fall back to the server clock when the display sent no usable stamp.
        atServerNow: Number.isFinite(msg.atServerNow) ? Number(msg.atServerNow) : Date.now(),
        pdtMs: Number.isFinite(msg.pdtMs) ? Number(msg.pdtMs) : null,
        bufferMs: Number.isFinite(msg.bufferMs) ? Number(msg.bufferMs) : null
      };
      // Without a station and a master position there is nothing to anchor to.
      if (payload.masterCT == null || payload.stationId == null) return;
      lastSyncPos.set(slug, payload);
      dlog('sync-pos', payload);
      broadcastToRoom(slug, { type: 'sync-pos', ...payload }, ws);
      return;
    }
    case 'devices': {
      if (ws.kind !== 'display') return;
      // Peers are promised a list; anything that is not an array becomes [].
      const list = Array.isArray(msg.list) ? msg.list : [];
      broadcastToRoom(slug, { type: 'devices', list, current: msg.current || null });
      return;
    }
    case 'ping':
      send(ws, { type: 'pong', t: Date.now() });
      return;
    default:
      return;
  }
}
|