import { watch, type FSWatcher } from 'node:fs';
import fs from 'node:fs/promises';
import type { Server as HttpServer } from 'node:http';
import path from 'node:path';
import type { Socket } from 'node:net';
import { WebSocketServer, type WebSocket } from 'ws';
import type { DashboardSnapshot, DashboardSnapshotPatch, DashboardSyncMessage } from '../../src/shared/contracts';
import { AuthService } from './auth';
import { getDashboardSnapshot } from './snapshot';
import type { RuntimePaths } from './config';
import type { RuntimeController } from './runtime';
import type { StateStore } from './store';

/** Quiet period after the last change notification before a snapshot refresh runs. */
const REFRESH_DEBOUNCE_MS = 150;

/**
 * Largest delay Node's `setTimeout` honours. Larger values are clamped to 1 ms
 * (with a TimeoutOverflowWarning), which would expire long sessions immediately.
 */
const MAX_TIMER_DELAY_MS = 2 ** 31 - 1;

/** Something that can be told "state may have changed, re-check and publish". */
export interface LiveSyncPublisher {
  notifyPotentialChange(): void;
}

interface LiveSyncOptions {
  auth: AuthService;
  runtime: RuntimeController;
  runtimePaths: RuntimePaths;
  store: StateStore;
}

/** Logs a failure from a fire-and-forget operation instead of leaving the rejection unhandled. */
function reportError(action: string, error: unknown): void {
  console.error(`[live-sync] failed to ${action}:`, error);
}

/**
 * Pushes dashboard snapshots to authenticated WebSocket clients.
 *
 * Each client receives a full `snapshot.init` message on connect and
 * `snapshot.patch` messages afterwards. Patches are computed against
 * `lastSnapshot`, the most recent snapshot that was published.
 */
export class LiveSyncServer implements LiveSyncPublisher {
  private readonly wss = new WebSocketServer({ noServer: true });
  private readonly watchers: FSWatcher[] = [];
  private readonly clients = new Set<WebSocket>();
  private lastSnapshot: DashboardSnapshot | null = null;
  private refreshTimer: NodeJS.Timeout | null = null;
  /** Tail of the chain that serializes every read-and-publish of a snapshot. */
  private snapshotQueue: Promise<void> = Promise.resolve();

  constructor(private readonly options: LiveSyncOptions) {
    this.wss.on('connection', (socket) => {
      this.clients.add(socket);
      socket.on('close', () => {
        this.clients.delete(socket);
      });
      // An 'error' event with no listener is thrown as an uncaught exception.
      socket.on('error', (error) => reportError('communicate with a client', error));
      this.sendInit(socket).catch((error: unknown) => reportError('send the initial snapshot', error));
    });
  }

  /**
   * Handles WebSocket upgrades on `/ws` for the given HTTP server.
   *
   * Requests for other paths are left untouched (presumably so other upgrade
   * listeners can serve them — confirm against the server setup). Requests
   * without a token accepted by `auth.verify` are answered with 401.
   */
  attach(server: HttpServer): void {
    server.on('upgrade', (request, socket, head) => {
      let url: URL;
      try {
        // Only the path and query are needed, so parse against a fixed base
        // instead of the client-controlled Host header; either one can be
        // malformed, and an uncaught throw here would take the process down.
        url = new URL(request.url ?? '/', 'http://localhost');
      } catch {
        socket.write('HTTP/1.1 400 Bad Request\r\n\r\n');
        socket.destroy();
        return;
      }

      if (url.pathname !== '/ws') {
        return;
      }

      const token = url.searchParams.get('token');
      const session = token ? this.options.auth.verify(token) : null;
      if (!session) {
        socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
        socket.destroy();
        return;
      }

      this.wss.handleUpgrade(request, socket as Socket, head, (ws) => {
        this.wss.emit('connection', ws, request);
        this.scheduleSessionExpiry(ws, session.exp);
      });
    });
  }

  /**
   * Creates (if missing) and watches the runtime directories; any file-system
   * event in one of them triggers a debounced refresh.
   */
  async initialize(): Promise<void> {
    // A Set de-duplicates paths that resolve to the same directory string.
    const directories = new Set([
      this.options.runtimePaths.rootDir,
      path.dirname(this.options.runtimePaths.configPath),
      path.dirname(this.options.runtimePaths.logPath),
      path.dirname(this.options.runtimePaths.counterPath),
      this.options.runtimePaths.reportDir,
    ]);

    for (const directory of directories) {
      await fs.mkdir(directory, { recursive: true });
      const watcher = watch(directory, { recursive: false }, () => this.notifyPotentialChange());
      // Without a listener a watcher 'error' would surface as an uncaught exception.
      watcher.on('error', (error) => reportError(`watch ${directory}`, error));
      this.watchers.push(watcher);
    }
  }

  /** Debounces change notifications: a refresh runs once no notification has arrived for 150 ms. */
  notifyPotentialChange(): void {
    if (this.refreshTimer) {
      clearTimeout(this.refreshTimer);
    }

    this.refreshTimer = setTimeout(() => {
      this.refreshTimer = null;
      this.refreshAndBroadcast().catch((error: unknown) => reportError('refresh the snapshot', error));
    }, REFRESH_DEBOUNCE_MS);
  }

  /**
   * Reads a fresh snapshot, stores it as the new baseline and, when clients
   * are connected and something changed, broadcasts the patch.
   *
   * @returns a promise that rejects if reading the snapshot fails.
   */
  refreshAndBroadcast(): Promise<void> {
    return this.runExclusive(async () => {
      const nextSnapshot = await this.readSnapshot();
      const previous = this.lastSnapshot;
      this.lastSnapshot = nextSnapshot;

      if (this.clients.size === 0) {
        return;
      }

      const patch = buildSnapshotPatch(previous, nextSnapshot);
      if (patch) {
        this.broadcastPatch(patch);
      }
    });
  }

  /** Cancels the pending refresh, stops all watchers, closes every client and the WebSocket server. */
  close(): void {
    if (this.refreshTimer) {
      clearTimeout(this.refreshTimer);
      this.refreshTimer = null;
    }

    this.watchers.forEach((watcher) => watcher.close());
    this.watchers.length = 0;
    this.clients.forEach((client) => client.close());
    this.clients.clear();
    this.wss.close();
  }

  /**
   * Sends a full snapshot to a newly connected client.
   *
   * The fresh read also becomes the baseline for later patches, so any
   * difference from the previous baseline is first pushed to the other
   * clients; otherwise a change that landed inside the debounce window would
   * never reach them.
   */
  private sendInit(socket: WebSocket): Promise<void> {
    return this.runExclusive(async () => {
      const snapshot = await this.readSnapshot();
      const previous = this.lastSnapshot;
      this.lastSnapshot = snapshot;

      // With no previous baseline nothing has been published yet, so there is
      // no older state for other clients to be patched from.
      if (previous) {
        const patch = buildSnapshotPatch(previous, snapshot);
        if (patch) {
          this.broadcastPatch(patch, socket);
        }
      }

      const message: DashboardSyncMessage = {
        type: 'snapshot.init',
        snapshot,
      };
      if (socket.readyState === socket.OPEN) {
        socket.send(JSON.stringify(message));
      }
    });
  }

  /** Sends a `snapshot.patch` message to every open client except `exclude`. */
  private broadcastPatch(patch: DashboardSnapshotPatch, exclude?: WebSocket): void {
    const message: DashboardSyncMessage = { type: 'snapshot.patch', patch };
    const payload = JSON.stringify(message);
    for (const client of this.clients) {
      if (client !== exclude && client.readyState === client.OPEN) {
        client.send(payload);
      }
    }
  }

  /**
   * Runs `task` after every previously queued task has settled, so snapshot
   * reads cannot complete out of order and move the baseline backwards.
   * A failing task rejects its own promise but does not block later tasks.
   */
  private runExclusive(task: () => Promise<void>): Promise<void> {
    const run = this.snapshotQueue.then(task);
    this.snapshotQueue = run.catch(() => undefined);
    return run;
  }

  private readSnapshot() {
    return getDashboardSnapshot(this.options.store, this.options.runtime, this.options.runtimePaths);
  }

  /**
   * Closes the socket once the session expires, sending `session.expired`
   * first if the socket is still open.
   *
   * @param expiresAt expiry instant, compared directly against `Date.now()`.
   */
  private scheduleSessionExpiry(socket: WebSocket, expiresAt: number): void {
    let timer: NodeJS.Timeout | undefined;

    const expire = () => {
      if (socket.readyState === socket.OPEN) {
        socket.send(
          JSON.stringify({
            type: 'session.expired',
            error: this.options.auth.invalidTokenError().error,
          } satisfies DashboardSyncMessage),
        );
      }
      socket.close();
    };

    const arm = () => {
      const remainingMs = Math.max(expiresAt - Date.now(), 0);
      // Delays beyond the timer limit are waited out in chunks and re-armed.
      timer = remainingMs > MAX_TIMER_DELAY_MS ? setTimeout(arm, MAX_TIMER_DELAY_MS) : setTimeout(expire, remainingMs);
    };

    arm();
    socket.on('close', () => clearTimeout(timer));
  }
}

/**
 * Computes which top-level sections differ between two snapshots.
 *
 * @param previous the baseline, or `null` when nothing has been published yet.
 * @param next the current snapshot.
 * @returns a patch holding every section of `next` that differs from
 *   `previous` (all six sections when `previous` is `null`), or `null` when
 *   nothing changed.
 */
export function buildSnapshotPatch(
  previous: DashboardSnapshot | null,
  next: DashboardSnapshot,
): DashboardSnapshotPatch | null {
  if (!previous) {
    return {
      service: next.service,
      traffic: next.traffic,
      users: next.users,
      attention: next.attention,
      userRecords: next.userRecords,
      system: next.system,
    };
  }

  const patch: DashboardSnapshotPatch = {};

  if (!isEqual(previous.service, next.service)) {
    patch.service = next.service;
  }
  if (!isEqual(previous.traffic, next.traffic)) {
    patch.traffic = next.traffic;
  }
  if (!isEqual(previous.users, next.users)) {
    patch.users = next.users;
  }
  if (!isEqual(previous.attention, next.attention)) {
    patch.attention = next.attention;
  }
  if (!isEqual(previous.userRecords, next.userRecords)) {
    patch.userRecords = next.userRecords;
  }
  if (!isEqual(previous.system, next.system)) {
    patch.system = next.system;
  }

  return Object.keys(patch).length > 0 ? patch : null;
}

/**
 * Structural equality by comparing JSON serializations. This is sensitive to
 * property order: the same data with keys in a different order counts as
 * different, which at worst yields a redundant patch section.
 */
function isEqual(left: unknown, right: unknown): boolean {
  return JSON.stringify(left) === JSON.stringify(right);
}