feat(bf-5cdj): sample per-worker process RSS from /proc and expose via API
Some checks are pending
CI / test (18.x) (push) Waiting to run
CI / test (20.x) (push) Waiting to run
CI / test (22.x) (push) Waiting to run

Add MemorySampler that polls active worker PIDs every 10s to sample
/proc/<pid>/status for VmRSS, VmPeak, and VmSwap memory metrics.

Changes:
- Add MemorySampler class with periodic sampling (10s interval)
- Attach rssKb, peakRssKb, swapKb to the WorkerInfo interface in types.ts
- Integrate with InMemoryEventStore to register PIDs from events
- Expose memory fields on GET /api/workers response
- Broadcast updated memory fields via WebSocket
- Add comprehensive test suite

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-06-07 09:57:41 -04:00
parent 64aa3bd11b
commit 77b1cd72c3
5 changed files with 668 additions and 0 deletions

327
src/memorySampler.test.ts Normal file
View file

@ -0,0 +1,327 @@
/**
* FABRIC MemorySampler Tests
*
* Note: Full fs mocking is not possible in ESM. These tests focus on:
* 1. Public API behavior (register/unregister/sampling)
* 2. Parse logic with direct function testing
* 3. Real-world behavior with actual /proc reads (if available)
*/
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import { MemorySampler, getMemorySampler, resetMemorySampler } from './memorySampler.js';
// Test double: swaps the real /proc read for injected status text (or a simulated read failure)
type InjectedSample = import('./memorySampler.js').WorkerMemorySample;

class TestableMemorySampler extends MemorySampler {
  /** Status text fed to the parser; null simulates an unreadable /proc entry. */
  private mockContent: string | null = null;

  /** Choose what every subsequent "read" sees. */
  injectMockStatus(content: string | null) {
    this.mockContent = content;
  }

  /** Replaces the filesystem read; the pid is ignored because content is injected. */
  sampleProcStatus(pid: number): InjectedSample {
    const injected = this.mockContent;
    return injected === null ? this.unreadableSample() : this['parseProcStatus'](injected);
  }

  /** Produces one entry per registered worker without touching /proc or pruning stale entries. */
  sampleAllWorkers(): Map<string, InjectedSample> {
    // Parse once up front; on success every worker shares the same parsed object.
    const shared = this.mockContent === null ? null : this['parseProcStatus'](this.mockContent);
    const byWorker = new Map<string, InjectedSample>();
    for (const id of this.getWorkerIds()) {
      // On simulated failure each worker gets its own freshly timestamped null sample.
      byWorker.set(id, shared ?? this.unreadableSample());
    }
    return byWorker;
  }

  /** Null for unknown workers; otherwise whatever the injected content parses to. */
  getWorkerMemory(workerId: string): InjectedSample | null {
    return this['workers'].has(workerId) ? this.sampleProcStatus(0) : null;
  }

  /** All-null sample mirroring what a failed status read yields. */
  private unreadableSample(): InjectedSample {
    return {
      rssKb: null,
      peakRssKb: null,
      swapKb: null,
      sampledAt: Date.now(),
    };
  }
}
describe('MemorySampler', () => {
  let sampler: MemorySampler;

  /**
   * Build /proc/<pid>/status-shaped text, one field per line.
   * Joining explicit lines (instead of an indented template literal) guarantees
   * no leading whitespace sneaks into the fixture.
   */
  const statusOf = (...lines: string[]): string => lines.join('\n');

  /** Minimal fixture carrying the three memory fields the sampler reads. */
  const BASIC_STATUS = statusOf('VmPeak: 12345 kB', 'VmRSS: 34567 kB', 'VmSwap: 9876 kB');

  beforeEach(() => {
    resetMemorySampler();
    sampler = new MemorySampler(100, 30000); // 100ms interval for testing
  });

  afterEach(() => {
    // Undo any vi.spyOn (e.g. the frozen Date.now in the stale-worker test)
    // so a mocked clock cannot leak into later tests.
    vi.restoreAllMocks();
    sampler.stop();
    sampler.clear();
  });

  describe('registerWorkerPid', () => {
    it('should register a worker PID', () => {
      sampler.registerWorkerPid('worker-1', 12345);
      expect(sampler.workerCount).toBe(1);
      expect(sampler.getWorkerIds()).toEqual(['worker-1']);
    });

    it('should update existing worker PID', () => {
      sampler.registerWorkerPid('worker-1', 12345);
      sampler.registerWorkerPid('worker-1', 67890); // New PID
      expect(sampler.workerCount).toBe(1);
    });

    it('should track multiple workers', () => {
      sampler.registerWorkerPid('worker-1', 12345);
      sampler.registerWorkerPid('worker-2', 67890);
      expect(sampler.workerCount).toBe(2);
      expect(new Set(sampler.getWorkerIds())).toEqual(new Set(['worker-1', 'worker-2']));
    });
  });

  describe('unregisterWorker', () => {
    it('should unregister a worker', () => {
      sampler.registerWorkerPid('worker-1', 12345);
      sampler.unregisterWorker('worker-1');
      expect(sampler.workerCount).toBe(0);
      expect(sampler.getWorkerIds()).toEqual([]);
    });

    it('should handle unregistering non-existent worker', () => {
      sampler.unregisterWorker('worker-1');
      expect(sampler.workerCount).toBe(0);
    });
  });

  describe('parseProcStatus', () => {
    it('should parse valid /proc/<pid>/status content', () => {
      const testSampler = new TestableMemorySampler();
      const mockStatus = statusOf(
        'Name: test',
        'State: S (sleeping)',
        'VmPeak: 12345 kB',
        'VmSize: 23456 kB',
        'VmRSS: 34567 kB',
        'VmData: 45678 kB',
        'VmStk: 1234 kB',
        'VmExe: 567 kB',
        'VmLib: 890 kB',
        'VmSwap: 9876 kB',
        'Threads: 1',
      );
      testSampler.injectMockStatus(mockStatus);
      testSampler.registerWorkerPid('worker-1', 12345);
      const result = testSampler.getWorkerMemory('worker-1');
      expect(result).not.toBeNull();
      expect(result?.rssKb).toBe(34567);
      expect(result?.peakRssKb).toBe(12345);
      expect(result?.swapKb).toBe(9876);
      expect(result?.sampledAt).toBeGreaterThan(0);
    });

    it('should handle partial /proc/<pid>/status content', () => {
      const testSampler = new TestableMemorySampler();
      const mockStatus = statusOf(
        'Name: test',
        'State: S (sleeping)',
        'VmPeak: 12345 kB',
        'VmSize: 23456 kB',
        'VmRSS: 34567 kB',
      );
      testSampler.injectMockStatus(mockStatus);
      testSampler.registerWorkerPid('worker-1', 12345);
      const result = testSampler.getWorkerMemory('worker-1');
      expect(result).not.toBeNull();
      expect(result?.rssKb).toBe(34567);
      expect(result?.peakRssKb).toBe(12345);
      expect(result?.swapKb).toBeNull();
    });

    it('should return null values when /proc is unreadable', () => {
      const testSampler = new TestableMemorySampler();
      testSampler.injectMockStatus(null);
      testSampler.registerWorkerPid('worker-1', 12345);
      const result = testSampler.getWorkerMemory('worker-1');
      expect(result).not.toBeNull();
      expect(result?.rssKb).toBeNull();
      expect(result?.peakRssKb).toBeNull();
      expect(result?.swapKb).toBeNull();
    });

    it('should handle malformed /proc/<pid>/status lines', () => {
      const testSampler = new TestableMemorySampler();
      const mockStatus = statusOf(
        'Name: test',
        'VmPeak: invalid kB',
        'VmRSS: not-a-number kB',
        'VmSwap: 9876 kB',
      );
      testSampler.injectMockStatus(mockStatus);
      testSampler.registerWorkerPid('worker-1', 12345);
      const result = testSampler.getWorkerMemory('worker-1');
      expect(result).not.toBeNull();
      expect(result?.rssKb).toBeNull();
      expect(result?.peakRssKb).toBeNull();
      // Only VmSwap is valid
      expect(result?.swapKb).toBe(9876);
    });
  });

  describe('sampleAllWorkers', () => {
    it('should sample all registered workers', () => {
      const testSampler = new TestableMemorySampler();
      testSampler.injectMockStatus(BASIC_STATUS);
      testSampler.registerWorkerPid('worker-1', 12345);
      testSampler.registerWorkerPid('worker-2', 67890);
      const results = testSampler.sampleAllWorkers();
      expect(results.size).toBe(2);
      expect(results.get('worker-1')?.rssKb).toBe(34567);
      expect(results.get('worker-2')?.rssKb).toBe(34567);
    });

    it('should remove stale workers during sampling', () => {
      const shortLivedSampler = new MemorySampler(100, 50);
      shortLivedSampler.registerWorkerPid('worker-1', 12345);
      // Simulate time passing (the spy is restored in afterEach)
      const now = Date.now();
      vi.spyOn(Date, 'now').mockReturnValue(now + 100);
      const results = shortLivedSampler.sampleAllWorkers();
      expect(results.size).toBe(0); // Stale worker removed
      expect(shortLivedSampler.workerCount).toBe(0);
      shortLivedSampler.stop();
    });
  });

  describe('start and stop', () => {
    it('should start periodic sampling', () => {
      const testSampler = new TestableMemorySampler();
      testSampler.injectMockStatus(BASIC_STATUS);
      testSampler.registerWorkerPid('worker-1', 12345);
      testSampler.start();
      // Sample immediately after start
      const results1 = testSampler.sampleAllWorkers();
      expect(results1.size).toBe(1);
      testSampler.stop();
    });

    it('should stop periodic sampling', () => {
      sampler.start();
      sampler.stop();
      // Should not throw when stopped
      sampler.stop();
    });

    it('should not start twice', () => {
      const testSampler = new TestableMemorySampler();
      testSampler.injectMockStatus(BASIC_STATUS);
      testSampler.registerWorkerPid('worker-1', 12345);
      testSampler.start();
      testSampler.start(); // Should not create second interval
      testSampler.stop();
    });
  });

  describe('clear', () => {
    it('should clear all worker registrations', () => {
      sampler.registerWorkerPid('worker-1', 12345);
      sampler.registerWorkerPid('worker-2', 67890);
      expect(sampler.workerCount).toBe(2);
      sampler.clear();
      expect(sampler.workerCount).toBe(0);
      expect(sampler.getWorkerIds()).toEqual([]);
    });
  });

  describe('getWorkerMemory', () => {
    it('should return null for unregistered worker', () => {
      const result = sampler.getWorkerMemory('worker-1');
      expect(result).toBeNull();
    });

    it('should return memory stats for registered worker', () => {
      const testSampler = new TestableMemorySampler();
      testSampler.injectMockStatus(BASIC_STATUS);
      testSampler.registerWorkerPid('worker-1', 12345);
      const result = testSampler.getWorkerMemory('worker-1');
      expect(result).not.toBeNull();
      expect(result?.rssKb).toBe(34567);
    });
  });
});
describe('getMemorySampler singleton', () => {
  // Discard the shared instance after every case so registrations never carry over
  afterEach(() => {
    resetMemorySampler();
  });

  it('should return the same instance', () => {
    const first = getMemorySampler();
    const second = getMemorySampler();
    expect(first).toBe(second);
  });

  it('should reset to new instance', () => {
    const beforeReset = getMemorySampler();
    beforeReset.registerWorkerPid('worker-1', 12345);

    resetMemorySampler();

    const afterReset = getMemorySampler();
    // A reset must hand out a different object with no inherited registrations
    expect(afterReset).not.toBe(beforeReset);
    expect(afterReset.workerCount).toBe(0);
  });
});

264
src/memorySampler.ts Normal file
View file

@ -0,0 +1,264 @@
/**
* FABRIC Memory Sampler
*
* Polls /proc/<pid>/status for active worker processes to sample
* VmRSS, VmPeak, and VmSwap memory metrics. Provides per-worker memory
* statistics from the kernel's process accounting.
*
* Integration:
* - Call registerWorkerPid(workerId, pid) when a new worker PID is detected
* - Call start() to begin periodic sampling (every 10 seconds)
* - Call getWorkerMemory(workerId) to get current memory stats
*/
import * as fs from 'fs';
import * as path from 'path';
/**
 * Memory statistics for a worker process in which every metric is present.
 *
 * Carries the same fields as WorkerMemorySample, but none of them may be null.
 * NOTE(review): no code visible in this module references this interface —
 * confirm whether external consumers rely on it.
 */
export interface WorkerMemoryStats {
/** Current RSS in KB */
rssKb: number;
/** Peak RSS in KB */
peakRssKb: number;
/** Swap usage in KB */
swapKb: number;
/** Timestamp when sampled (presumably Date.now() milliseconds, as in WorkerMemorySample — confirm) */
sampledAt: number;
}
/** Internal worker PID tracking */
interface WorkerPidEntry {
  /** Worker identifier (same value as the key in the workers map) */
  workerId: string;
  /** Process ID whose /proc/<pid>/status is read */
  pid: number;
  /** Date.now() at the most recent registerWorkerPid call; drives stale eviction */
  lastSeen: number;
}

/** Sample result; a metric is null when its status line is missing or unparsable */
export interface WorkerMemorySample {
  /** Current resident set size in KB (VmRSS) */
  rssKb: number | null;
  /** Peak resident set size in KB (VmHWM; VmPeak only when VmHWM is absent) */
  peakRssKb: number | null;
  /** Swapped-out size in KB (VmSwap) */
  swapKb: number | null;
  /** Date.now() at the moment the sample object was built */
  sampledAt: number;
}

/**
 * MemorySampler class
 *
 * Tracks worker PIDs and reads /proc/<pid>/status (synchronously) to report
 * per-worker memory metrics. Reads never throw: an unreadable or vanished
 * process yields a sample whose metrics are all null.
 */
export class MemorySampler {
  private workers: Map<string, WorkerPidEntry> = new Map();
  // ReturnType<typeof setInterval> instead of NodeJS.Timeout keeps the field
  // correct regardless of which timer typings are loaded.
  private sampleInterval: ReturnType<typeof setInterval> | null = null;
  private intervalMs: number;
  private readonly maxWorkerAgeMs: number;

  /**
   * @param intervalMs Period of the timer started by start() (default 10 s)
   * @param maxWorkerAgeMs Entries not re-registered within this window are
   *   dropped by sampleAllWorkers() (default 1 h)
   */
  constructor(intervalMs: number = 10000, maxWorkerAgeMs: number = 3600000) {
    this.intervalMs = intervalMs;
    this.maxWorkerAgeMs = maxWorkerAgeMs;
  }

  /**
   * Register or update a worker's PID. Re-registering refreshes lastSeen,
   * which postpones stale eviction.
   * @param workerId Worker identifier
   * @param pid Process ID
   */
  registerWorkerPid(workerId: string, pid: number): void {
    this.workers.set(workerId, {
      workerId,
      pid,
      lastSeen: Date.now(),
    });
  }

  /**
   * Unregister a worker (e.g., when it exits). Unknown ids are ignored.
   * @param workerId Worker identifier
   */
  unregisterWorker(workerId: string): void {
    this.workers.delete(workerId);
  }

  /**
   * Read fresh memory stats for a worker (performs a /proc read on every call).
   * @param workerId Worker identifier
   * @returns Memory sample, or null if the worker is not registered
   */
  getWorkerMemory(workerId: string): WorkerMemorySample | null {
    const entry = this.workers.get(workerId);
    if (!entry) {
      return null;
    }
    return this.sampleProcStatus(entry.pid);
  }

  /**
   * Get all currently tracked worker IDs.
   */
  getWorkerIds(): string[] {
    return Array.from(this.workers.keys());
  }

  /**
   * Start periodic sampling. Calling it while already running is a no-op.
   *
   * NOTE: each tick calls sampleAllWorkers() and discards the returned map, so
   * the only lasting effect of a tick is stale-entry cleanup. Callers that need
   * the values must call sampleAllWorkers() or getWorkerMemory() themselves.
   */
  start(): void {
    if (this.sampleInterval) {
      return; // Already running
    }
    this.sampleInterval = setInterval(() => {
      this.sampleAllWorkers();
    }, this.intervalMs);
  }

  /**
   * Stop periodic sampling. Safe to call when not running.
   */
  stop(): void {
    if (this.sampleInterval) {
      clearInterval(this.sampleInterval);
      this.sampleInterval = null;
    }
  }

  /**
   * Sample all registered workers and clean up stale entries.
   * @returns One sample per non-stale worker, keyed by worker id
   */
  sampleAllWorkers(): Map<string, WorkerMemorySample> {
    const results = new Map<string, WorkerMemorySample>();
    const now = Date.now();
    const stale: string[] = [];

    for (const [workerId, entry] of this.workers) {
      // Stale entries (not re-registered recently) are skipped and evicted below
      if (now - entry.lastSeen > this.maxWorkerAgeMs) {
        stale.push(workerId);
        continue;
      }
      results.set(workerId, this.sampleProcStatus(entry.pid));
    }

    // Evict after the loop so the map is not mutated while being iterated
    for (const workerId of stale) {
      this.workers.delete(workerId);
    }
    return results;
  }

  /**
   * Sample /proc/<pid>/status for a specific PID.
   * Protected for test override.
   * @param pid Process ID
   * @returns Memory sample with null values if unreadable
   */
  protected sampleProcStatus(pid: number): WorkerMemorySample {
    const statusPath = path.join('/proc', pid.toString(), 'status');
    try {
      const content = fs.readFileSync(statusPath, 'utf-8');
      return this.parseProcStatus(content);
    } catch {
      // Deliberate best-effort: the process may have exited, or /proc may be
      // absent (non-Linux) or unreadable. Report "no data" instead of throwing.
      return {
        rssKb: null,
        peakRssKb: null,
        swapKb: null,
        sampledAt: Date.now(),
      };
    }
  }

  /**
   * Parse /proc/<pid>/status content to extract memory fields.
   *
   * Peak RSS is taken from VmHWM (the resident-set high-water mark). VmPeak is
   * the peak *virtual* size and is used only as a fallback when VmHWM is not
   * present in the input.
   * @param content File content
   * @returns Parsed memory sample
   */
  private parseProcStatus(content: string): WorkerMemorySample {
    let rssKb: number | null = null;
    let hwmKb: number | null = null;
    let vmPeakKb: number | null = null;
    let swapKb: number | null = null;

    for (const rawLine of content.split('\n')) {
      // Trim first so incidental leading whitespace cannot defeat prefix matching
      const line = rawLine.trim();
      if (line.startsWith('VmRSS:')) {
        rssKb = this.extractKbValue(line);
      } else if (line.startsWith('VmHWM:')) {
        hwmKb = this.extractKbValue(line);
      } else if (line.startsWith('VmPeak:')) {
        vmPeakKb = this.extractKbValue(line);
      } else if (line.startsWith('VmSwap:')) {
        swapKb = this.extractKbValue(line);
      }
    }

    return {
      rssKb,
      peakRssKb: hwmKb ?? vmPeakKb,
      swapKb,
      sampledAt: Date.now(),
    };
  }

  /**
   * Extract KB value from a /proc/<pid>/status line.
   * Expected format: "VmRSS: 12345 kB"
   *
   * The value token must consist solely of decimal digits; partially numeric
   * tokens such as "12abc" and signed values are rejected rather than truncated.
   * @param line Status line
   * @returns KB value or null if parse fails
   */
  private extractKbValue(line: string): number | null {
    const token = line.trim().split(/\s+/)[1];
    if (token === undefined || !/^\d+$/.test(token)) {
      return null;
    }
    return Number.parseInt(token, 10);
  }

  /**
   * Get the number of actively tracked workers.
   */
  get workerCount(): number {
    return this.workers.size;
  }

  /**
   * Clear all worker registrations. Does not stop a running timer.
   */
  clear(): void {
    this.workers.clear();
  }
}
/**
 * Process-wide sampler instance; undefined until first requested and again
 * after resetMemorySampler().
 */
let globalSampler: MemorySampler | undefined;

/**
 * Return the shared MemorySampler, building it with default settings the
 * first time it is asked for.
 */
export function getMemorySampler(): MemorySampler {
  const existing = globalSampler;
  if (existing) {
    return existing;
  }
  const created = new MemorySampler();
  globalSampler = created;
  return created;
}
/**
 * Tear down the shared sampler (mainly for testing): halt its timer, forget
 * its workers, and drop the instance so the next getMemorySampler() call
 * builds a fresh one.
 */
export function resetMemorySampler(): void {
  globalSampler?.stop();
  globalSampler?.clear();
  globalSampler = undefined;
}

View file

@ -641,6 +641,16 @@ export class InMemoryEventStore implements EventStore {
// Update last activity
worker.lastActivity = event.ts;
// Track PID from event if available
const pid = (event as Record<string, unknown>).pid as number | undefined;
if (pid && typeof pid === 'number') {
worker.pid = pid;
// Register with MemorySampler
const { getMemorySampler } = require('./memorySampler.js');
const sampler = getMemorySampler();
sampler.registerWorkerPid(event.worker, pid);
}
// Track active bead
if (event.bead) {
worker.activeBead = event.bead;

View file

@ -598,6 +598,18 @@ export interface WorkerInfo {
/** RSS usage as percentage of limit (0-100) */
rssPercent?: number;
/** Process ID for this worker (from OTLP telemetry or needle state) */
pid?: number;
/** Current RSS from /proc/<pid>/status in KB */
rssKb?: number;
/** Peak RSS from /proc/<pid>/status in KB */
peakRssKb?: number;
/** Swap usage from /proc/<pid>/status in KB */
swapKb?: number;
}
export interface EventFilter {

View file

@ -25,6 +25,7 @@ import { getMemoryProfiler } from '../memoryProfiler.js';
import { getRecentHeapDiff, analyzeTrend, formatTrendAsMarkdown, saveTrendReport } from '../heapDiff.js';
import { computeRetentionState, pruneLogs, formatPruneResult, PruneOptions } from '../logPruner.js';
import { scanBeadWorkspaces } from '../beadWorkspaceScanner.js';
import { getMemorySampler, type WorkerMemorySample } from '../memorySampler.js';
/** Get the v8 module (available in Node.js) */
function getV8() {
@ -104,6 +105,7 @@ export function createWebServer(options: WebServerOptions): WebServer {
let wsServer: WebSocketServer;
let running = false;
const clients: Set<WebSocket> = new Set();
let memoryUpdateInterval: NodeJS.Timeout | null = null;
function start() {
if (running) return;
@ -184,6 +186,51 @@ export function createWebServer(options: WebServerOptions): WebServer {
});
});
// ── Memory Sampler Setup ──
const memorySampler = getMemorySampler();
memorySampler.start();
// Periodic memory updates broadcast (every 10 seconds)
memoryUpdateInterval = setInterval(() => {
const samples = memorySampler.sampleAllWorkers();
if (samples.size === 0) return;
// Update worker memory stats in store
for (const [workerId, sample] of samples) {
const worker = store.getWorker(workerId);
if (worker && sample.rssKb !== null) {
worker.rssKb = sample.rssKb;
worker.peakRssKb = sample.peakRssKb ?? undefined;
worker.swapKb = sample.swapKb ?? undefined;
}
}
// Broadcast memory updates to WebSocket clients
const message = JSON.stringify({
type: 'memory',
data: {
workers: store.getWorkers(),
},
});
const terminatedClients: WebSocket[] = [];
for (const client of clients) {
if (client.readyState === WebSocket.OPEN) {
if (client.bufferedAmount > WS_MAX_BUFFERED_BYTES) {
client.close(1013, 'Send buffer overflow');
terminatedClients.push(client);
continue;
}
client.send(message);
}
}
// Clean up terminated clients
for (const client of terminatedClients) {
clients.delete(client);
}
}, 10000);
// Health check endpoint
app.get('/api/health', (_req: Request, res: Response) => {
metrics.wsClients = clients.size;
@ -1735,6 +1782,14 @@ export function createWebServer(options: WebServerOptions): WebServer {
function stop() {
if (!running || !httpServer) return;
// Stop memory sampler
if (memoryUpdateInterval) {
clearInterval(memoryUpdateInterval);
memoryUpdateInterval = null;
}
const memorySampler = getMemorySampler();
memorySampler.stop();
// Close all WebSocket connections
for (const client of clients) {
client.close();