Ingest live 3proxy traffic from access logs

This commit is contained in:
2026-04-02 02:03:37 +03:00
parent 1141622f86
commit 9a3785deb9
10 changed files with 408 additions and 37 deletions

View File

@@ -11,10 +11,10 @@ import { validateCreateUserInput, validateSystemInput } from '../src/shared/vali
import {
buildRuntimePaths,
createUserRecord,
deriveDashboardSnapshot,
render3proxyConfig,
type RuntimePaths,
} from './lib/config';
import { getDashboardSnapshot } from './lib/snapshot';
import { AuthService } from './lib/auth';
import type { RuntimeController } from './lib/runtime';
import { StateStore } from './lib/store';
@@ -81,7 +81,7 @@ export function createApp({ store, runtime, runtimeRootDir, auth }: AppServices)
app.get('/api/state', async (_request, response, next) => {
try {
const payload = await getSnapshot(store, runtime, runtimePaths);
const payload = await getDashboardSnapshot(store, runtime, runtimePaths);
response.json(payload);
} catch (error) {
next(error);
@@ -106,7 +106,7 @@ export function createApp({ store, runtime, runtimeRootDir, auth }: AppServices)
}
await writeConfigAndState(store, state, runtimePaths);
response.json(await getSnapshot(store, runtime, runtimePaths));
response.json(await getDashboardSnapshot(store, runtime, runtimePaths));
} catch (error) {
next(error);
}
@@ -120,7 +120,7 @@ export function createApp({ store, runtime, runtimeRootDir, auth }: AppServices)
state.userRecords.push(record);
state.service.lastEvent = `User ${record.username} created from panel`;
await persistRuntimeMutation(store, runtime, state, runtimePaths);
response.status(201).json(await getSnapshot(store, runtime, runtimePaths));
response.status(201).json(await getDashboardSnapshot(store, runtime, runtimePaths));
} catch (error) {
next(error);
}
@@ -147,7 +147,7 @@ export function createApp({ store, runtime, runtimeRootDir, auth }: AppServices)
? `System configuration updated from panel and removed ${removedUsers.length} linked users`
: 'System configuration updated from panel';
await persistRuntimeMutation(store, runtime, state, runtimePaths);
response.json(await getSnapshot(store, runtime, runtimePaths));
response.json(await getDashboardSnapshot(store, runtime, runtimePaths));
} catch (error) {
next(error);
}
@@ -169,7 +169,7 @@ export function createApp({ store, runtime, runtimeRootDir, auth }: AppServices)
: `User ${user.username} resumed from panel`;
await persistRuntimeMutation(store, runtime, state, runtimePaths);
response.json(await getSnapshot(store, runtime, runtimePaths));
response.json(await getDashboardSnapshot(store, runtime, runtimePaths));
} catch (error) {
next(error);
}
@@ -188,7 +188,7 @@ export function createApp({ store, runtime, runtimeRootDir, auth }: AppServices)
const [removed] = state.userRecords.splice(index, 1);
state.service.lastEvent = `User ${removed.username} deleted from panel`;
await persistRuntimeMutation(store, runtime, state, runtimePaths);
response.json(await getSnapshot(store, runtime, runtimePaths));
response.json(await getDashboardSnapshot(store, runtime, runtimePaths));
} catch (error) {
next(error);
}
@@ -214,15 +214,6 @@ export function createApp({ store, runtime, runtimeRootDir, auth }: AppServices)
return app;
}
async function getSnapshot(
store: StateStore,
runtime: RuntimeController,
runtimePaths: RuntimePaths,
) {
const state = await store.read();
const previewConfig = render3proxyConfig(state, runtimePaths);
return deriveDashboardSnapshot(state, runtime.getSnapshot(), previewConfig);
}
async function persistRuntimeMutation(
store: StateStore,
runtime: RuntimeController,

View File

@@ -35,5 +35,6 @@ describe('render3proxyConfig', () => {
expect(config).not.toContain('night-shift:CL:kettle!23');
expect(config).not.toContain('allow night-shift,ops-east');
expect(config).toContain('allow ops-east');
expect(config).toContain('countall 1 D 1024 night-shift');
});
});

View File

@@ -4,6 +4,7 @@ import type {
ControlPlaneState,
CreateUserInput,
DashboardSnapshot,
DailyTrafficBucket,
ProxyServiceRecord,
ProxyUserRecord,
} from '../../src/shared/contracts';
@@ -19,6 +20,14 @@ export interface RuntimeSnapshot {
lastError: string | null;
}
export interface ObservedRuntimeState {
totalBytes: number;
liveConnections: number;
activeUsers: number;
daily: DailyTrafficBucket[];
userRecords: ProxyUserRecord[];
}
export interface RuntimePaths {
rootDir: string;
configPath: string;
@@ -73,7 +82,7 @@ export function render3proxyConfig(state: ControlPlaneState, paths: RuntimePaths
lines.push(`users ${activeUsers.map(renderUserCredential).join(' ')}`);
}
const quotaUsers = activeUsers.filter((user) => user.quotaBytes !== null);
const quotaUsers = state.userRecords.filter((user) => user.quotaBytes !== null);
if (quotaUsers.length > 0) {
lines.push(
`counter ${normalizePath(paths.counterPath)} D ${normalizePath(
@@ -112,13 +121,14 @@ export function render3proxyConfig(state: ControlPlaneState, paths: RuntimePaths
export function deriveDashboardSnapshot(
state: ControlPlaneState,
runtime: RuntimeSnapshot,
observed: ObservedRuntimeState,
previewConfig: string,
): DashboardSnapshot {
const liveUsers = state.userRecords.filter((user) => !user.paused && user.status === 'live').length;
const exceededUsers = state.userRecords.filter(
const liveUsers = observed.userRecords.filter((user) => !user.paused && user.status === 'live').length;
const exceededUsers = observed.userRecords.filter(
(user) => user.quotaBytes !== null && user.usedBytes >= user.quotaBytes,
).length;
const nearQuotaUsers = state.userRecords.filter((user) => {
const nearQuotaUsers = observed.userRecords.filter((user) => {
if (user.paused || user.quotaBytes === null || user.usedBytes >= user.quotaBytes) {
return false;
}
@@ -175,17 +185,19 @@ export function deriveDashboardSnapshot(
lastEvent: state.service.lastEvent,
},
traffic: {
...state.traffic,
activeUsers: state.userRecords.filter((user) => !user.paused).length,
totalBytes: observed.totalBytes,
liveConnections: observed.liveConnections,
activeUsers: observed.activeUsers,
daily: observed.daily,
},
users: {
total: state.userRecords.length,
total: observed.userRecords.length,
live: liveUsers,
nearQuota: nearQuotaUsers,
exceeded: exceededUsers,
},
attention,
userRecords: state.userRecords,
userRecords: observed.userRecords,
system: {
...state.system,
previewConfig,

64
server/lib/snapshot.ts Normal file
View File

@@ -0,0 +1,64 @@
import type { ProxyUserRecord } from '../../src/shared/contracts';
import { deriveDashboardSnapshot, render3proxyConfig, type ObservedRuntimeState, type RuntimePaths, type RuntimeSnapshot } from './config';
import type { StateStore } from './store';
import { readObservedTraffic } from './traffic';
export async function getDashboardSnapshot(
store: StateStore,
runtime: { getSnapshot(): RuntimeSnapshot },
runtimePaths: RuntimePaths,
) {
const state = await store.read();
const previewConfig = render3proxyConfig(state, runtimePaths);
const traffic = await readObservedTraffic(runtimePaths, state.userRecords);
const observedUsers = deriveObservedUsers(state.userRecords, traffic.userBytesByName, traffic.recentUsers);
const observed: ObservedRuntimeState = {
totalBytes: traffic.totalBytes,
liveConnections: traffic.liveConnections,
activeUsers: traffic.activeUsers,
daily: traffic.daily,
userRecords: observedUsers,
};
return deriveDashboardSnapshot(state, runtime.getSnapshot(), observed, previewConfig);
}
function deriveObservedUsers(
users: ProxyUserRecord[],
userBytesByName: Map<string, number>,
recentUsers: Set<string>,
): ProxyUserRecord[] {
return users.map((user) => {
const usedBytes = userBytesByName.get(user.username) ?? 0;
if (user.paused) {
return {
...user,
usedBytes,
status: 'idle',
};
}
if (user.quotaBytes !== null && usedBytes >= user.quotaBytes) {
return {
...user,
usedBytes,
status: 'fail',
};
}
if (user.quotaBytes !== null && user.quotaBytes > 0 && usedBytes / user.quotaBytes >= 0.8) {
return {
...user,
usedBytes,
status: recentUsers.has(user.username) ? 'live' : 'warn',
};
}
return {
...user,
usedBytes,
status: recentUsers.has(user.username) ? 'live' : 'idle',
};
});
}

104
server/lib/traffic.test.ts Normal file
View File

@@ -0,0 +1,104 @@
import fs from 'node:fs/promises';
import os from 'node:os';
import path from 'node:path';
import { afterEach, describe, expect, it } from 'vitest';
import type { ProxyUserRecord } from '../../src/shared/contracts';
import { readObservedTraffic } from './traffic';
const cleanupDirs: string[] = [];
afterEach(async () => {
await Promise.all(cleanupDirs.splice(0).map((dir) => fs.rm(dir, { recursive: true, force: true })));
});
describe('readObservedTraffic', () => {
it('derives current user usage, recent activity, and daily totals from 3proxy access logs', async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), '3proxy-traffic-'));
cleanupDirs.push(dir);
const logDir = path.join(dir, 'logs');
await fs.mkdir(logDir, { recursive: true });
await fs.writeFile(
path.join(logDir, '3proxy.log.2026.04.02'),
[
'260402111500.000 1080 00000 night-shift 172.19.0.1:54402 104.18.27.120:80 100 900 0 GET http://example.com/ HTTP/1.1',
'260402115900.000 2080 00000 lab-unlimited 172.19.0.1:53490 8.6.112.0:80 75 842 0 CONNECT example.com:80',
'260402115930.000 1080 00000 - 0.0.0.0:1080 0.0.0.0:0 0 0 0 Accepting connections [18/1]',
].join('\n'),
'utf8',
);
await fs.writeFile(
path.join(logDir, '3proxy.log.2026.04.01'),
'260401230000.000 1080 00000 night-shift 172.19.0.1:50000 104.18.27.120:80 50 150 0 GET http://example.com/ HTTP/1.1\n',
'utf8',
);
const users: ProxyUserRecord[] = [
{
id: 'u-1',
username: 'night-shift',
password: 'secret',
serviceId: 'socks-main',
status: 'idle',
usedBytes: 0,
quotaBytes: 1024,
},
{
id: 'u-2',
username: 'lab-unlimited',
password: 'secret',
serviceId: 'socks-lab',
status: 'idle',
usedBytes: 0,
quotaBytes: null,
},
];
const observed = await readObservedTraffic(
{
rootDir: dir,
configPath: path.join(dir, 'generated', '3proxy.cfg'),
counterPath: path.join(dir, 'state', 'counters.3cf'),
reportDir: path.join(dir, 'state', 'reports'),
logPath: path.join(logDir, '3proxy.log'),
pidPath: path.join(dir, '3proxy.pid'),
},
users,
new Date(2026, 3, 2, 12, 0, 0, 0),
);
expect(observed.totalBytes).toBe(1917);
expect(observed.liveConnections).toBe(1);
expect(observed.activeUsers).toBe(2);
expect(observed.userBytesByName.get('night-shift')).toBe(1000);
expect(observed.userBytesByName.get('lab-unlimited')).toBe(917);
expect(observed.recentUsers.has('night-shift')).toBe(false);
expect(observed.recentUsers.has('lab-unlimited')).toBe(true);
expect(observed.daily[observed.daily.length - 2].bytes).toBe(200);
expect(observed.daily[observed.daily.length - 1].bytes).toBe(1917);
});
it('returns zeroed metrics when no runtime logs exist yet', async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), '3proxy-traffic-empty-'));
cleanupDirs.push(dir);
const observed = await readObservedTraffic(
{
rootDir: dir,
configPath: path.join(dir, 'generated', '3proxy.cfg'),
counterPath: path.join(dir, 'state', 'counters.3cf'),
reportDir: path.join(dir, 'state', 'reports'),
logPath: path.join(dir, 'logs', '3proxy.log'),
pidPath: path.join(dir, '3proxy.pid'),
},
[],
new Date(2026, 3, 2, 12, 0, 0, 0),
);
expect(observed.totalBytes).toBe(0);
expect(observed.liveConnections).toBe(0);
expect(observed.activeUsers).toBe(0);
expect(observed.daily).toHaveLength(5);
expect(observed.daily.every((entry) => entry.bytes === 0 && entry.share === 0)).toBe(true);
});
});

184
server/lib/traffic.ts Normal file
View File

@@ -0,0 +1,184 @@
import fs from 'node:fs/promises';
import path from 'node:path';
import type { DailyTrafficBucket, ProxyUserRecord } from '../../src/shared/contracts';
import type { RuntimePaths } from './config';
const LOG_FILE_PREFIX = '3proxy.log';
const RECENT_WINDOW_MS = 15 * 60 * 1000;
const CONNECTION_WINDOW_MS = 5 * 60 * 1000;
const HISTORY_DAYS = 5;
export interface ObservedTraffic {
totalBytes: number;
liveConnections: number;
activeUsers: number;
daily: DailyTrafficBucket[];
userBytesByName: Map<string, number>;
recentUsers: Set<string>;
}
interface ParsedLogRecord {
timestamp: Date;
username: string;
bytes: number;
}
export async function readObservedTraffic(
runtimePaths: RuntimePaths,
users: ProxyUserRecord[],
now = new Date(),
): Promise<ObservedTraffic> {
const records = await readParsedRecords(runtimePaths.logPath);
const currentDayKey = toDayKey(now);
const historyStart = startOfDay(now);
const history = buildHistoryDays(historyStart, HISTORY_DAYS);
const dailyTotals = new Map(history.map((entry) => [entry.key, 0]));
const userBytesByName = new Map(users.map((user) => [user.username, 0]));
const recentUsers = new Set<string>();
let liveConnections = 0;
records.forEach((record) => {
const dayKey = toDayKey(record.timestamp);
if (dailyTotals.has(dayKey)) {
dailyTotals.set(dayKey, (dailyTotals.get(dayKey) ?? 0) + record.bytes);
}
if (dayKey === currentDayKey && userBytesByName.has(record.username)) {
userBytesByName.set(record.username, (userBytesByName.get(record.username) ?? 0) + record.bytes);
}
const ageMs = now.getTime() - record.timestamp.getTime();
if (ageMs >= 0 && ageMs <= RECENT_WINDOW_MS) {
recentUsers.add(record.username);
}
if (ageMs >= 0 && ageMs <= CONNECTION_WINDOW_MS) {
liveConnections += 1;
}
});
const activeUsers = Array.from(userBytesByName.values()).filter((value) => value > 0).length;
const totalBytes = Array.from(userBytesByName.values()).reduce((sum, value) => sum + value, 0);
const daily = history.map((entry) => ({
day: entry.label,
bytes: dailyTotals.get(entry.key) ?? 0,
share: 0,
}));
const peak = Math.max(...daily.map((entry) => entry.bytes), 0);
daily.forEach((entry) => {
entry.share = peak > 0 ? entry.bytes / peak : 0;
});
return {
totalBytes,
liveConnections,
activeUsers,
daily,
userBytesByName,
recentUsers,
};
}
async function readParsedRecords(logPath: string): Promise<ParsedLogRecord[]> {
const logDir = path.dirname(logPath);
try {
const files = await fs.readdir(logDir);
const logFiles = files
.filter((file) => file === LOG_FILE_PREFIX || file.startsWith(`${LOG_FILE_PREFIX}.`))
.sort()
.slice(-HISTORY_DAYS - 2);
const parsed = await Promise.all(logFiles.map(async (file) => parseLogFile(path.join(logDir, file))));
return parsed.flat();
} catch (error) {
if ((error as NodeJS.ErrnoException).code === 'ENOENT') {
return [];
}
throw error;
}
}
async function parseLogFile(filePath: string): Promise<ParsedLogRecord[]> {
try {
const content = await fs.readFile(filePath, 'utf8');
return content
.split(/\r?\n/)
.map(parseLogLine)
.filter((record): record is ParsedLogRecord => record !== null);
} catch (error) {
if ((error as NodeJS.ErrnoException).code === 'ENOENT') {
return [];
}
throw error;
}
}
function parseLogLine(line: string): ParsedLogRecord | null {
const trimmed = line.trim();
if (!trimmed || trimmed.includes('Accepting connections') || trimmed.includes('Exiting thread')) {
return null;
}
const match = trimmed.match(
/^(\d{12}\.\d{3})\s+\d+\s+\d+\s+(\S+)\s+\S+\s+\S+\s+(\d+)\s+(\d+)\s+\d+\s+/,
);
if (!match) {
return null;
}
const [, rawTimestamp, username, bytesIn, bytesOut] = match;
if (username === '-') {
return null;
}
const timestamp = parse3proxyTimestamp(rawTimestamp);
if (!timestamp) {
return null;
}
return {
timestamp,
username,
bytes: Number(bytesIn) + Number(bytesOut),
};
}
function parse3proxyTimestamp(value: string): Date | null {
const match = value.match(/^(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})\.(\d{3})$/);
if (!match) {
return null;
}
const [, yy, mm, dd, hh, min, ss, ms] = match;
const year = 2000 + Number(yy);
const date = new Date(year, Number(mm) - 1, Number(dd), Number(hh), Number(min), Number(ss), Number(ms));
return Number.isNaN(date.getTime()) ? null : date;
}
function buildHistoryDays(today: Date, count: number) {
return Array.from({ length: count }, (_value, index) => {
const date = new Date(today);
date.setDate(today.getDate() - (count - index - 1));
return {
key: toDayKey(date),
label: date.toLocaleDateString('en-US', { weekday: 'short' }),
};
});
}
function startOfDay(date: Date): Date {
return new Date(date.getFullYear(), date.getMonth(), date.getDate());
}
function toDayKey(date: Date): string {
const month = String(date.getMonth() + 1).padStart(2, '0');
const day = String(date.getDate()).padStart(2, '0');
return `${date.getFullYear()}-${month}-${day}`;
}