From 2602fab6a7eb5220d6ca8f0dc3ad41b7b36f4b2d Mon Sep 17 00:00:00 2001 From: rednakse Date: Thu, 2 Apr 2026 03:03:39 +0300 Subject: [PATCH] Add live sync heartbeat for status decay --- docs/PLAN.md | 1 + docs/PROJECT_INDEX.md | 4 +- server/lib/liveSync.test.ts | 45 +++++++++++++++++++++- server/lib/liveSync.ts | 75 ++++++++++++++++++++++++++++--------- 4 files changed, 103 insertions(+), 22 deletions(-) diff --git a/docs/PLAN.md b/docs/PLAN.md index 0bcbfbf..27e5ff0 100644 --- a/docs/PLAN.md +++ b/docs/PLAN.md @@ -44,3 +44,4 @@ Updated: 2026-04-02 27. Verified websocket delivery in Docker over plain `ws://127.0.0.1:3000/ws` by authenticating, receiving `snapshot.init`, mutating panel state, and observing a follow-up `snapshot.patch`. 28. Reworked Users actions into fixed-width icon buttons, added edit-in-modal flow, generated credentials only for new users, and blocked action buttons while commands are in flight. 29. Added backend user-update support plus runtime stop control, then verified both in Docker by updating `u-1` and stopping the real bundled 3proxy process through the API. +30. Added a websocket heartbeat so time-based status transitions such as `live -> idle/warn` are recalculated predictably even when no new proxy events arrive. diff --git a/docs/PROJECT_INDEX.md b/docs/PROJECT_INDEX.md index ac22085..8cd7b66 100644 --- a/docs/PROJECT_INDEX.md +++ b/docs/PROJECT_INDEX.md @@ -44,8 +44,8 @@ Updated: 2026-04-02 - `server/lib/auth.ts`: expiring token issuance and bearer-token verification for the panel - `server/lib/config.ts`: 3proxy config renderer, validation, and dashboard derivation for SOCKS/HTTP managed services - `server/lib/config.test.ts`: config-generation regression tests -- `server/lib/liveSync.ts`: websocket broadcaster that emits `snapshot.init` and top-level `snapshot.patch` messages from runtime/store changes -- `server/lib/liveSync.test.ts`: regression tests for patch-only websocket payload generation +- `server/lib/liveSync.ts`: websocket broadcaster that emits `snapshot.init` and top-level `snapshot.patch` messages from runtime/store changes plus a heartbeat for time-based status decay +- `server/lib/liveSync.test.ts`: regression tests for patch-only websocket payload generation and heartbeat-driven refreshes - `server/lib/snapshot.ts`: runtime-backed dashboard snapshot assembly that combines stored panel state with parsed 3proxy traffic observations - `server/lib/traffic.ts`: 3proxy access-log reader that derives current user usage, recent activity, daily totals, and lightweight live-connection estimates - `server/lib/traffic.test.ts`: parser and empty-runtime regression tests for log-derived traffic metrics diff --git a/server/lib/liveSync.test.ts b/server/lib/liveSync.test.ts index de2412c..55ad0d7 100644 --- a/server/lib/liveSync.test.ts +++ b/server/lib/liveSync.test.ts @@ -1,6 +1,18 @@ -import { describe, expect, it } from 'vitest'; +import fs from 'node:fs/promises'; +import os from 'node:os'; +import path from 'node:path'; +import { afterEach, describe, expect, it, vi } from 'vitest'; import { fallbackDashboardSnapshot } from '../../src/data/mockDashboard'; -import { buildSnapshotPatch } from './liveSync'; +import { AuthService } from './auth'; +import { buildRuntimePaths } from './config'; +import { LiveSyncServer, buildSnapshotPatch } from './liveSync'; + +const cleanupDirs: string[] = []; + +afterEach(async () => { + vi.useRealTimers(); + await Promise.all(cleanupDirs.splice(0).map((dir) => fs.rm(dir, { recursive: true, force: true }))); +}); describe('buildSnapshotPatch', () => { it('returns only changed top-level dashboard sections', () => { @@ -27,4 +39,33 @@ describe('buildSnapshotPatch', () => { system: fallbackDashboardSnapshot.system, }); }); + + it('refreshes snapshots on the heartbeat even without file changes', async () => { + vi.useFakeTimers(); + + const rootDir = await fs.mkdtemp(path.join(os.tmpdir(), '3proxy-ui-live-sync-')); + cleanupDirs.push(rootDir); + + const snapshotReader = vi.fn().mockResolvedValue(fallbackDashboardSnapshot); + const server = new LiveSyncServer({ + auth: new AuthService({ + login: 'admin', + password: 'proxy-ui-demo', + ttlMs: 60_000, + }), + heartbeatMs: 50, + runtime: {} as never, + runtimePaths: buildRuntimePaths(rootDir), + snapshotReader, + store: {} as never, + }); + + await server.initialize(); + expect(snapshotReader).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(50); + expect(snapshotReader).toHaveBeenCalledTimes(1); + + server.close(); + }); }); diff --git a/server/lib/liveSync.ts b/server/lib/liveSync.ts index 7049b1f..6b53f04 100644 --- a/server/lib/liveSync.ts +++ b/server/lib/liveSync.ts @@ -17,19 +17,31 @@ export interface LiveSyncPublisher { interface LiveSyncOptions { auth: AuthService; + heartbeatMs?: number; runtime: RuntimeController; runtimePaths: RuntimePaths; + snapshotReader?: () => Promise; store: StateStore; } +const DEFAULT_HEARTBEAT_MS = 60 * 1000; + export class LiveSyncServer implements LiveSyncPublisher { private readonly wss = new WebSocketServer({ noServer: true }); private readonly watchers: FSWatcher[] = []; private readonly clients = new Set(); private lastSnapshot: DashboardSnapshot | null = null; + private readonly heartbeatMs: number; + private heartbeatTimer: NodeJS.Timeout | null = null; private refreshTimer: NodeJS.Timeout | null = null; + private refreshPromise: Promise | null = null; constructor(private readonly options: LiveSyncOptions) { + this.heartbeatMs = + Number.isFinite(options.heartbeatMs) && (options.heartbeatMs ?? 0) > 0 + ? options.heartbeatMs! + : DEFAULT_HEARTBEAT_MS; + this.wss.on('connection', (socket) => { this.clients.add(socket); socket.on('close', () => { @@ -76,6 +88,12 @@ export class LiveSyncServer implements LiveSyncPublisher { await fs.mkdir(directory, { recursive: true }); this.watchers.push(watch(directory, { recursive: false }, () => this.notifyPotentialChange())); } + + if (!this.heartbeatTimer) { + this.heartbeatTimer = setInterval(() => { + void this.refreshAndBroadcast(); + }, this.heartbeatMs); + } } notifyPotentialChange() { @@ -90,30 +108,23 @@ export class LiveSyncServer implements LiveSyncPublisher { } async refreshAndBroadcast() { - if (this.clients.size === 0) { - this.lastSnapshot = await this.readSnapshot(); - return; + if (this.refreshPromise) { + return this.refreshPromise; } - const nextSnapshot = await this.readSnapshot(); - const patch = buildSnapshotPatch(this.lastSnapshot, nextSnapshot); - this.lastSnapshot = nextSnapshot; - - if (!patch) { - return; + this.refreshPromise = this.performRefreshAndBroadcast(); + try { + await this.refreshPromise; + } finally { + this.refreshPromise = null; } - - const message: DashboardSyncMessage = { type: 'snapshot.patch', patch }; - const payload = JSON.stringify(message); - - this.clients.forEach((client) => { - if (client.readyState === client.OPEN) { - client.send(payload); - } - }); } close() { + if (this.heartbeatTimer) { + clearInterval(this.heartbeatTimer); + this.heartbeatTimer = null; + } if (this.refreshTimer) { clearTimeout(this.refreshTimer); this.refreshTimer = null; @@ -140,6 +151,10 @@ export class LiveSyncServer implements LiveSyncPublisher { } private readSnapshot() { + if (this.options.snapshotReader) { + return this.options.snapshotReader(); + } + return getDashboardSnapshot(this.options.store, this.options.runtime, this.options.runtimePaths); } @@ -160,6 +175,30 @@ export class LiveSyncServer implements LiveSyncPublisher { socket.on('close', () => clearTimeout(timer)); } + + private async performRefreshAndBroadcast() { + if (this.clients.size === 0) { + this.lastSnapshot = await this.readSnapshot(); + return; + } + + const nextSnapshot = await this.readSnapshot(); + const patch = buildSnapshotPatch(this.lastSnapshot, nextSnapshot); + this.lastSnapshot = nextSnapshot; + + if (!patch) { + return; + } + + const message: DashboardSyncMessage = { type: 'snapshot.patch', patch }; + const payload = JSON.stringify(message); + + this.clients.forEach((client) => { + if (client.readyState === client.OPEN) { + client.send(payload); + } + }); + } } export function buildSnapshotPatch(