- Implemented a new API endpoint for retrieving and managing user favorites in /api/users. - Added functionality for admins to edit the shared "main" user's favorites. - Created a one-shot DB smoke test script for verifying multi-user kiosk migrations. - Introduced a RoomClock class for synchronizing server time across clients using WebSocket.
147 lines
5.3 KiB
JavaScript
147 lines
5.3 KiB
JavaScript
// NTP-lite room clock. Exchanges clock-ping/clock-pong pairs over the existing
|
||
// WebSocket and keeps a server/local offset so all clients can agree on a
|
||
// wall-clock for stream-sync purposes.
|
||
//
|
||
// Filter: drop samples whose RTT > 2× rolling median, then take the median
|
||
// offset of the remainder. Lowest-RTT alone is too vulnerable to one lucky
|
||
// packet that happens to skew the offset.
|
||
//
|
||
// Adaptive ping rate: 1 s while still converging (offsetStd > 5 ms or < 8
|
||
// accepted samples), 5 s after stabilising. The original flat 15 s heartbeat
|
||
// was too slow to recover from a WiFi RTT spike.
|
||
//
|
||
// Usage:
|
||
// const clock = new RoomClock();
|
||
// clock.attachWs(ws); // start handshake
|
||
// clock.now(); // returns estimated server epoch ms
|
||
// clock.onUpdate((info) => ...) // get notified when offset moves
|
||
|
||
const FAST_PING_MS = 1000;
|
||
const SLOW_PING_MS = 5000;
|
||
const STABLE_STD_MS = 5;
|
||
const STABLE_MIN_SAMPLES = 8;
|
||
const SAMPLE_WINDOW = 16;
|
||
|
||
function median(nums) {
|
||
if (!nums.length) return 0;
|
||
const s = nums.slice().sort((a, b) => a - b);
|
||
const mid = s.length >> 1;
|
||
return s.length % 2 ? s[mid] : (s[mid - 1] + s[mid]) / 2;
|
||
}
|
||
|
||
export class RoomClock {
|
||
constructor() {
|
||
this.offset = 0; // ms to add to Date.now() to get server time
|
||
this.rtt = Infinity; // RTT of the latest accepted sample (ms)
|
||
this.offsetStd = Infinity; // std-dev (ms) of accepted offsets in window
|
||
this.samples = []; // recent { offset, rtt } pairs
|
||
this.synced = false;
|
||
this._pending = new Map(); // t1 -> sent timestamp
|
||
this._listeners = new Set();
|
||
this._timeoutId = null;
|
||
this._ws = null;
|
||
}
|
||
|
||
attachWs(wsClient) {
|
||
this._ws = wsClient;
|
||
this.reset();
|
||
// Burst of 5 pings ~150ms apart, then adaptive heartbeat.
|
||
let n = 0;
|
||
const burst = () => {
|
||
if (n++ >= 5) {
|
||
this._scheduleNext();
|
||
return;
|
||
}
|
||
this._sendPing();
|
||
setTimeout(burst, 150);
|
||
};
|
||
burst();
|
||
}
|
||
|
||
detach() {
|
||
if (this._timeoutId) clearTimeout(this._timeoutId);
|
||
this._timeoutId = null;
|
||
this._pending.clear();
|
||
this._ws = null;
|
||
}
|
||
|
||
reset() {
|
||
this.samples = [];
|
||
this.synced = false;
|
||
this.offsetStd = Infinity;
|
||
this._pending.clear();
|
||
if (this._timeoutId) { clearTimeout(this._timeoutId); this._timeoutId = null; }
|
||
}
|
||
|
||
/** Server epoch ms estimate. */
|
||
now() { return Date.now() + this.offset; }
|
||
|
||
/** True once the clock has stabilised: enough samples and low jitter. */
|
||
isStable() {
|
||
return this.synced
|
||
&& this.samples.length >= STABLE_MIN_SAMPLES
|
||
&& this.offsetStd <= STABLE_STD_MS;
|
||
}
|
||
|
||
onUpdate(fn) { this._listeners.add(fn); return () => this._listeners.delete(fn); }
|
||
|
||
/** Called by the WS dispatcher when a `clock-pong` arrives. */
|
||
handlePong(msg) {
|
||
const sent = this._pending.get(msg.t1);
|
||
if (sent == null) return;
|
||
this._pending.delete(msg.t1);
|
||
const t4 = Date.now();
|
||
const rtt = t4 - msg.t1;
|
||
// Symmetric one-way latency assumption: server-clock at midpoint == t2,
|
||
// local-clock at midpoint == (t1+t4)/2, so offset = t2 - (t1+t4)/2.
|
||
const offset = msg.t2 - (msg.t1 + t4) / 2;
|
||
this.samples.push({ offset, rtt });
|
||
if (this.samples.length > SAMPLE_WINDOW) this.samples.shift();
|
||
|
||
// Drop samples whose RTT is > 2× rolling median RTT — those are
|
||
// bufferbloat / WiFi-burst outliers and tend to carry a skewed offset.
|
||
const rttMed = median(this.samples.map((s) => s.rtt));
|
||
const cutoff = Math.max(rttMed * 2, rttMed + 10);
|
||
const good = this.samples.filter((s) => s.rtt <= cutoff);
|
||
const offsets = good.length ? good.map((s) => s.offset) : this.samples.map((s) => s.offset);
|
||
const medOffset = median(offsets);
|
||
// Std-dev of the accepted offsets — clock-quality metric.
|
||
const mean = offsets.reduce((a, b) => a + b, 0) / offsets.length;
|
||
const variance = offsets.reduce((a, b) => a + (b - mean) ** 2, 0) / offsets.length;
|
||
this.offsetStd = Math.sqrt(variance);
|
||
this.offset = medOffset;
|
||
this.rtt = rtt;
|
||
this.synced = true;
|
||
for (const fn of this._listeners) {
|
||
fn({
|
||
offset: this.offset,
|
||
rtt: this.rtt,
|
||
offsetStd: this.offsetStd,
|
||
samples: this.samples.length,
|
||
accepted: good.length,
|
||
stable: this.isStable()
|
||
});
|
||
}
|
||
}
|
||
|
||
_sendPing() {
|
||
if (!this._ws) return;
|
||
const t1 = Date.now();
|
||
this._pending.set(t1, t1);
|
||
// Drop very old pending entries to avoid leaking memory if pongs are lost.
|
||
for (const k of this._pending.keys()) {
|
||
if (t1 - k > 5000) this._pending.delete(k);
|
||
}
|
||
this._ws.send({ type: 'clock-ping', t1 });
|
||
}
|
||
|
||
_scheduleNext() {
|
||
if (this._timeoutId) clearTimeout(this._timeoutId);
|
||
const delay = this.isStable() ? SLOW_PING_MS : FAST_PING_MS;
|
||
this._timeoutId = setTimeout(() => {
|
||
this._sendPing();
|
||
this._scheduleNext();
|
||
}, delay);
|
||
}
|
||
}
|