Add live sync heartbeat for status decay
This commit is contained in:
@@ -17,19 +17,31 @@ export interface LiveSyncPublisher {
|
||||
|
||||
interface LiveSyncOptions {
|
||||
auth: AuthService;
|
||||
heartbeatMs?: number;
|
||||
runtime: RuntimeController;
|
||||
runtimePaths: RuntimePaths;
|
||||
snapshotReader?: () => Promise<DashboardSnapshot>;
|
||||
store: StateStore;
|
||||
}
|
||||
|
||||
const DEFAULT_HEARTBEAT_MS = 60 * 1000;
|
||||
|
||||
export class LiveSyncServer implements LiveSyncPublisher {
|
||||
private readonly wss = new WebSocketServer({ noServer: true });
|
||||
private readonly watchers: FSWatcher[] = [];
|
||||
private readonly clients = new Set<WebSocket>();
|
||||
private lastSnapshot: DashboardSnapshot | null = null;
|
||||
private readonly heartbeatMs: number;
|
||||
private heartbeatTimer: NodeJS.Timeout | null = null;
|
||||
private refreshTimer: NodeJS.Timeout | null = null;
|
||||
private refreshPromise: Promise<void> | null = null;
|
||||
|
||||
constructor(private readonly options: LiveSyncOptions) {
|
||||
this.heartbeatMs =
|
||||
Number.isFinite(options.heartbeatMs) && (options.heartbeatMs ?? 0) > 0
|
||||
? options.heartbeatMs!
|
||||
: DEFAULT_HEARTBEAT_MS;
|
||||
|
||||
this.wss.on('connection', (socket) => {
|
||||
this.clients.add(socket);
|
||||
socket.on('close', () => {
|
||||
@@ -76,6 +88,12 @@ export class LiveSyncServer implements LiveSyncPublisher {
|
||||
await fs.mkdir(directory, { recursive: true });
|
||||
this.watchers.push(watch(directory, { recursive: false }, () => this.notifyPotentialChange()));
|
||||
}
|
||||
|
||||
if (!this.heartbeatTimer) {
|
||||
this.heartbeatTimer = setInterval(() => {
|
||||
void this.refreshAndBroadcast();
|
||||
}, this.heartbeatMs);
|
||||
}
|
||||
}
|
||||
|
||||
notifyPotentialChange() {
|
||||
@@ -90,30 +108,23 @@ export class LiveSyncServer implements LiveSyncPublisher {
|
||||
}
|
||||
|
||||
async refreshAndBroadcast() {
|
||||
if (this.clients.size === 0) {
|
||||
this.lastSnapshot = await this.readSnapshot();
|
||||
return;
|
||||
if (this.refreshPromise) {
|
||||
return this.refreshPromise;
|
||||
}
|
||||
|
||||
const nextSnapshot = await this.readSnapshot();
|
||||
const patch = buildSnapshotPatch(this.lastSnapshot, nextSnapshot);
|
||||
this.lastSnapshot = nextSnapshot;
|
||||
|
||||
if (!patch) {
|
||||
return;
|
||||
this.refreshPromise = this.performRefreshAndBroadcast();
|
||||
try {
|
||||
await this.refreshPromise;
|
||||
} finally {
|
||||
this.refreshPromise = null;
|
||||
}
|
||||
|
||||
const message: DashboardSyncMessage = { type: 'snapshot.patch', patch };
|
||||
const payload = JSON.stringify(message);
|
||||
|
||||
this.clients.forEach((client) => {
|
||||
if (client.readyState === client.OPEN) {
|
||||
client.send(payload);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
close() {
|
||||
if (this.heartbeatTimer) {
|
||||
clearInterval(this.heartbeatTimer);
|
||||
this.heartbeatTimer = null;
|
||||
}
|
||||
if (this.refreshTimer) {
|
||||
clearTimeout(this.refreshTimer);
|
||||
this.refreshTimer = null;
|
||||
@@ -140,6 +151,10 @@ export class LiveSyncServer implements LiveSyncPublisher {
|
||||
}
|
||||
|
||||
private readSnapshot() {
|
||||
if (this.options.snapshotReader) {
|
||||
return this.options.snapshotReader();
|
||||
}
|
||||
|
||||
return getDashboardSnapshot(this.options.store, this.options.runtime, this.options.runtimePaths);
|
||||
}
|
||||
|
||||
@@ -160,6 +175,30 @@ export class LiveSyncServer implements LiveSyncPublisher {
|
||||
|
||||
socket.on('close', () => clearTimeout(timer));
|
||||
}
|
||||
|
||||
private async performRefreshAndBroadcast() {
|
||||
if (this.clients.size === 0) {
|
||||
this.lastSnapshot = await this.readSnapshot();
|
||||
return;
|
||||
}
|
||||
|
||||
const nextSnapshot = await this.readSnapshot();
|
||||
const patch = buildSnapshotPatch(this.lastSnapshot, nextSnapshot);
|
||||
this.lastSnapshot = nextSnapshot;
|
||||
|
||||
if (!patch) {
|
||||
return;
|
||||
}
|
||||
|
||||
const message: DashboardSyncMessage = { type: 'snapshot.patch', patch };
|
||||
const payload = JSON.stringify(message);
|
||||
|
||||
this.clients.forEach((client) => {
|
||||
if (client.readyState === client.OPEN) {
|
||||
client.send(payload);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export function buildSnapshotPatch(
|
||||
|
||||
Reference in New Issue
Block a user