diff --git a/src/memorySampler.test.ts b/src/memorySampler.test.ts
new file mode 100644
index 0000000..39ca778
--- /dev/null
+++ b/src/memorySampler.test.ts
@@ -0,0 +1,305 @@
+/**
+ * FABRIC MemorySampler Tests
+ *
+ * Note: Full fs mocking is not possible in ESM. These tests focus on:
+ * 1. Public API behavior (register/unregister/sampling)
+ * 2. Parse logic with direct function testing
+ * 3. Real-world behavior with actual /proc reads (if available)
+ */
+
+import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
+import { MemorySampler, getMemorySampler, resetMemorySampler, type WorkerMemorySample } from './memorySampler.js';
+
+/**
+ * Test double that replaces only the /proc read: sampleProcStatus() parses
+ * injected text (or simulates an unreadable file) instead of touching the
+ * filesystem. Everything else runs the production MemorySampler code.
+ */
+class TestableMemorySampler extends MemorySampler {
+  private mockContent: string | null = null;
+
+  /** Set the fake status text; null simulates a failed read. */
+  injectMockStatus(content: string | null): void {
+    this.mockContent = content;
+  }
+
+  protected override sampleProcStatus(_pid: number): WorkerMemorySample {
+    if (this.mockContent === null) {
+      // Simulate read failure - return null values
+      return {
+        rssKb: null,
+        peakRssKb: null,
+        swapKb: null,
+        sampledAt: Date.now(),
+      };
+    }
+    // parseProcStatus is private; bracket access reaches it without widening the API.
+    return this['parseProcStatus'](this.mockContent);
+  }
+}
+
+describe('MemorySampler', () => {
+  let sampler: MemorySampler;
+
+  beforeEach(() => {
+    resetMemorySampler();
+    sampler = new MemorySampler(100, 30000); // 100ms interval for testing
+  });
+
+  afterEach(() => {
+    vi.restoreAllMocks(); // undo the Date.now spy so it cannot leak into later tests
+    sampler.stop();
+    sampler.clear();
+  });
+
+  describe('registerWorkerPid', () => {
+    it('should register a worker PID', () => {
+      sampler.registerWorkerPid('worker-1', 12345);
+      expect(sampler.workerCount).toBe(1);
+      expect(sampler.getWorkerIds()).toEqual(['worker-1']);
+    });
+
+    it('should update existing worker PID', () => {
+      sampler.registerWorkerPid('worker-1', 12345);
+      sampler.registerWorkerPid('worker-1', 67890); // New PID
+      expect(sampler.workerCount).toBe(1);
+    });
+
+    it('should track multiple workers', () => {
+      sampler.registerWorkerPid('worker-1', 12345);
+      sampler.registerWorkerPid('worker-2', 67890);
+      expect(sampler.workerCount).toBe(2);
+      expect(new Set(sampler.getWorkerIds())).toEqual(new Set(['worker-1', 'worker-2']));
+    });
+  });
+
+  describe('unregisterWorker', () => {
+    it('should unregister a worker', () => {
+      sampler.registerWorkerPid('worker-1', 12345);
+      sampler.unregisterWorker('worker-1');
+      expect(sampler.workerCount).toBe(0);
+      expect(sampler.getWorkerIds()).toEqual([]);
+    });
+
+    it('should handle unregistering non-existent worker', () => {
+      sampler.unregisterWorker('worker-1');
+      expect(sampler.workerCount).toBe(0);
+    });
+  });
+
+  describe('parseProcStatus', () => {
+    it('should parse valid /proc/<pid>/status content', () => {
+      const testSampler = new TestableMemorySampler();
+      const mockStatus = `
+Name: test
+State: S (sleeping)
+VmPeak: 12345 kB
+VmSize: 23456 kB
+VmRSS: 34567 kB
+VmData: 45678 kB
+VmStk: 1234 kB
+VmExe: 567 kB
+VmLib: 890 kB
+VmSwap: 9876 kB
+Threads: 1
+`;
+      testSampler.injectMockStatus(mockStatus);
+      testSampler.registerWorkerPid('worker-1', 12345);
+      const result = testSampler.getWorkerMemory('worker-1');
+
+      expect(result).not.toBeNull();
+      expect(result?.rssKb).toBe(34567);
+      expect(result?.peakRssKb).toBe(12345);
+      expect(result?.swapKb).toBe(9876);
+      expect(result?.sampledAt).toBeGreaterThan(0);
+    });
+
+    it('should handle partial /proc/<pid>/status content', () => {
+      const testSampler = new TestableMemorySampler();
+      const mockStatus = `
+Name: test
+State: S (sleeping)
+VmPeak: 12345 kB
+VmSize: 23456 kB
+VmRSS: 34567 kB
+`;
+      testSampler.injectMockStatus(mockStatus);
+      testSampler.registerWorkerPid('worker-1', 12345);
+      const result = testSampler.getWorkerMemory('worker-1');
+
+      expect(result).not.toBeNull();
+      expect(result?.rssKb).toBe(34567);
+      expect(result?.peakRssKb).toBe(12345);
+      expect(result?.swapKb).toBeNull();
+    });
+
+    it('should return null values when /proc is unreadable', () => {
+      const testSampler = new TestableMemorySampler();
+      testSampler.injectMockStatus(null);
+      testSampler.registerWorkerPid('worker-1', 12345);
+      const result = testSampler.getWorkerMemory('worker-1');
+
+      expect(result).not.toBeNull();
+      expect(result?.rssKb).toBeNull();
+      expect(result?.peakRssKb).toBeNull();
+      expect(result?.swapKb).toBeNull();
+    });
+
+    it('should handle malformed /proc/<pid>/status lines', () => {
+      const testSampler = new TestableMemorySampler();
+      const mockStatus = `
+Name: test
+VmPeak: invalid kB
+VmRSS: not-a-number kB
+VmSwap: 9876 kB
+`;
+      testSampler.injectMockStatus(mockStatus);
+      testSampler.registerWorkerPid('worker-1', 12345);
+      const result = testSampler.getWorkerMemory('worker-1');
+
+      expect(result).not.toBeNull();
+      expect(result?.rssKb).toBeNull();
+      expect(result?.peakRssKb).toBeNull();
+      // Only VmSwap is valid
+      expect(result?.swapKb).toBe(9876);
+    });
+  });
+
+  describe('sampleAllWorkers', () => {
+    it('should sample all registered workers', () => {
+      const testSampler = new TestableMemorySampler();
+      const mockStatus = `
+VmPeak: 12345 kB
+VmRSS: 34567 kB
+VmSwap: 9876 kB
+`;
+      testSampler.injectMockStatus(mockStatus);
+      testSampler.registerWorkerPid('worker-1', 12345);
+      testSampler.registerWorkerPid('worker-2', 67890);
+
+      const results = testSampler.sampleAllWorkers();
+
+      expect(results.size).toBe(2);
+      expect(results.get('worker-1')?.rssKb).toBe(34567);
+      expect(results.get('worker-2')?.rssKb).toBe(34567);
+    });
+
+    it('should remove stale workers during sampling', () => {
+      const shortLivedSampler = new MemorySampler(100, 50);
+      shortLivedSampler.registerWorkerPid('worker-1', 12345);
+
+      // Simulate time passing
+      const now = Date.now();
+      vi.spyOn(Date, 'now').mockReturnValue(now + 100);
+
+      const results = shortLivedSampler.sampleAllWorkers();
+
+      expect(results.size).toBe(0); // Stale worker removed
+      expect(shortLivedSampler.workerCount).toBe(0);
+
+      shortLivedSampler.stop();
+    });
+  });
+
+  describe('start and stop', () => {
+    it('should start periodic sampling', () => {
+      const testSampler = new TestableMemorySampler();
+      const mockStatus = `
+VmPeak: 12345 kB
+VmRSS: 34567 kB
+VmSwap: 9876 kB
+`;
+      testSampler.injectMockStatus(mockStatus);
+      testSampler.registerWorkerPid('worker-1', 12345);
+      testSampler.start();
+
+      // Sample immediately after start
+      const results1 = testSampler.sampleAllWorkers();
+      expect(results1.size).toBe(1);
+
+      testSampler.stop();
+    });
+
+    it('should stop periodic sampling', () => {
+      sampler.start();
+      sampler.stop();
+
+      // Should not throw when stopped
+      sampler.stop();
+    });
+
+    it('should not start twice', () => {
+      const testSampler = new TestableMemorySampler();
+      const mockStatus = `
+VmPeak: 12345 kB
+VmRSS: 34567 kB
+VmSwap: 9876 kB
+`;
+      testSampler.injectMockStatus(mockStatus);
+      testSampler.registerWorkerPid('worker-1', 12345);
+      testSampler.start();
+      testSampler.start(); // Should not create second interval
+
+      testSampler.stop();
+    });
+  });
+
+  describe('clear', () => {
+    it('should clear all worker registrations', () => {
+      sampler.registerWorkerPid('worker-1', 12345);
+      sampler.registerWorkerPid('worker-2', 67890);
+
+      expect(sampler.workerCount).toBe(2);
+
+      sampler.clear();
+
+      expect(sampler.workerCount).toBe(0);
+      expect(sampler.getWorkerIds()).toEqual([]);
+    });
+  });
+
+  describe('getWorkerMemory', () => {
+    it('should return null for unregistered worker', () => {
+      const result = sampler.getWorkerMemory('worker-1');
+      expect(result).toBeNull();
+    });
+
+    it('should return memory stats for registered worker', () => {
+      const testSampler = new TestableMemorySampler();
+      const mockStatus = `
+VmPeak: 12345 kB
+VmRSS: 34567 kB
+VmSwap: 9876 kB
+`;
+      testSampler.injectMockStatus(mockStatus);
+      testSampler.registerWorkerPid('worker-1', 12345);
+      const result = testSampler.getWorkerMemory('worker-1');
+
+      expect(result).not.toBeNull();
+      expect(result?.rssKb).toBe(34567);
+    });
+  });
+});
+
+describe('getMemorySampler singleton', () => {
+  afterEach(() => {
+    resetMemorySampler();
+  });
+
+  it('should return the same instance', () => {
+    const sampler1 = getMemorySampler();
+    const sampler2 = getMemorySampler();
+    expect(sampler1).toBe(sampler2);
+  });
+
+  it('should reset to new instance', () => {
+    const sampler1 = getMemorySampler();
+    sampler1.registerWorkerPid('worker-1', 12345);
+
+    resetMemorySampler();
+
+    const sampler2 = getMemorySampler();
+    expect(sampler2).not.toBe(sampler1);
+    expect(sampler2.workerCount).toBe(0);
+  });
+});
diff --git a/src/memorySampler.ts b/src/memorySampler.ts
new file mode 100644
index 0000000..7c7aa29
--- /dev/null
+++ b/src/memorySampler.ts
@@ -0,0 +1,264 @@
+/**
+ * FABRIC Memory Sampler
+ *
+ * Polls /proc/<pid>/status for active worker processes to sample
+ * VmRSS, VmPeak, and VmSwap memory metrics. Provides per-worker memory
+ * statistics from the kernel's process accounting.
+ *
+ * Integration:
+ * - Call registerWorkerPid(workerId, pid) when a new worker PID is detected
+ * - Call start() to begin periodic sampling (every 10 seconds)
+ * - Call getWorkerMemory(workerId) to get current memory stats
+ */
+
+import * as fs from 'fs';
+import * as path from 'path';
+
+/** Memory statistics for a worker process */
+export interface WorkerMemoryStats {
+  /** Current RSS in KB */
+  rssKb: number;
+  /** Peak RSS in KB */
+  peakRssKb: number;
+  /** Swap usage in KB */
+  swapKb: number;
+  /** Timestamp when sampled */
+  sampledAt: number;
+}
+
+/** Internal worker PID tracking */
+interface WorkerPidEntry {
+  workerId: string;
+  pid: number;
+  lastSeen: number; // Date.now() at the most recent registerWorkerPid() call
+}
+
+/** Sample result with optional null values for missing/unreadable data */
+export interface WorkerMemorySample {
+  rssKb: number | null;
+  peakRssKb: number | null;
+  swapKb: number | null;
+  sampledAt: number;
+}
+
+/**
+ * Tracks worker PIDs and samples /proc/<pid>/status for memory statistics.
+ * Constructor: intervalMs is the start() timer period (default 10 s);
+ * maxWorkerAgeMs is how long a worker may go without re-registering before
+ * sampleAllWorkers() drops it (default 1 h).
+ */
+export class MemorySampler {
+  private workers: Map<string, WorkerPidEntry> = new Map();
+  private sampleInterval: NodeJS.Timeout | null = null;
+  private readonly intervalMs: number;
+  private readonly maxWorkerAgeMs: number;
+
+  constructor(intervalMs: number = 10000, maxWorkerAgeMs: number = 3600000) {
+    this.intervalMs = intervalMs;
+    this.maxWorkerAgeMs = maxWorkerAgeMs;
+  }
+
+  /**
+   * Register or update a worker's PID; also refreshes its lastSeen time.
+   * @param workerId Worker identifier
+   * @param pid Process ID
+   */
+  registerWorkerPid(workerId: string, pid: number): void {
+    this.workers.set(workerId, {
+      workerId,
+      pid,
+      lastSeen: Date.now(),
+    });
+  }
+
+  /**
+   * Unregister a worker (e.g., when it exits). Unknown IDs are ignored.
+   * @param workerId Worker identifier
+   */
+  unregisterWorker(workerId: string): void {
+    this.workers.delete(workerId);
+  }
+
+  /**
+   * Get current memory stats for a worker.
+   * @param workerId Worker identifier
+   * @returns Fresh sample for the worker's PID, or null if the worker is not registered
+   */
+  getWorkerMemory(workerId: string): WorkerMemorySample | null {
+    const entry = this.workers.get(workerId);
+    if (!entry) {
+      return null;
+    }
+
+    return this.sampleProcStatus(entry.pid);
+  }
+
+  /**
+   * Get all currently tracked worker IDs.
+   */
+  getWorkerIds(): string[] {
+    return Array.from(this.workers.keys());
+  }
+
+  /**
+   * Start periodic sampling. No-op if already running.
+   *
+   * Each tick calls sampleAllWorkers() and discards the returned samples, so
+   * the timer's only lasting effect is pruning stale workers.
+   */
+  start(): void {
+    if (this.sampleInterval) {
+      return; // Already running
+    }
+
+    this.sampleInterval = setInterval(() => {
+      this.sampleAllWorkers();
+    }, this.intervalMs);
+  }
+
+  /**
+   * Stop periodic sampling. Safe to call when not running.
+   */
+  stop(): void {
+    if (this.sampleInterval) {
+      clearInterval(this.sampleInterval);
+      this.sampleInterval = null;
+    }
+  }
+
+  /**
+   * Sample all registered workers and clean up stale entries.
+   *
+   * A worker is stale when registerWorkerPid() has not been called for it
+   * within maxWorkerAgeMs; stale workers are removed and not sampled.
+   * @returns Map from workerId to its sample (null fields if unreadable)
+   */
+  sampleAllWorkers(): Map<string, WorkerMemorySample> {
+    const results = new Map<string, WorkerMemorySample>();
+    const now = Date.now();
+
+    for (const [workerId, entry] of this.workers) {
+      if (now - entry.lastSeen > this.maxWorkerAgeMs) {
+        // Deleting the entry being visited is safe during Map iteration.
+        this.workers.delete(workerId);
+        continue;
+      }
+
+      results.set(workerId, this.sampleProcStatus(entry.pid));
+    }
+
+    return results;
+  }
+
+  /**
+   * Sample /proc/<pid>/status for a specific PID.
+   * Protected for test override.
+   * @param pid Process ID
+   * @returns Memory sample with null values if unreadable
+   */
+  protected sampleProcStatus(pid: number): WorkerMemorySample {
+    const statusPath = path.join('/proc', pid.toString(), 'status');
+
+    try {
+      const content = fs.readFileSync(statusPath, 'utf-8');
+      return this.parseProcStatus(content);
+    } catch {
+      // Process may have exited or /proc not readable
+      return {
+        rssKb: null,
+        peakRssKb: null,
+        swapKb: null,
+        sampledAt: Date.now(),
+      };
+    }
+  }
+
+  /**
+   * Parse /proc/<pid>/status content to extract memory fields.
+   *
+   * peakRssKb comes from VmHWM, the peak resident set size. VmPeak is the peak
+   * virtual memory size, so it is only a fallback for content without VmHWM.
+   * @param content File content
+   * @returns Parsed memory sample; fields that are missing or malformed are null
+   */
+  private parseProcStatus(content: string): WorkerMemorySample {
+    let rssKb: number | null = null;
+    let hwmKb: number | null = null;
+    let vmPeakKb: number | null = null;
+    let swapKb: number | null = null;
+
+    for (const line of content.split('\n')) {
+      if (line.startsWith('VmRSS:')) {
+        rssKb = this.extractKbValue(line);
+      } else if (line.startsWith('VmHWM:')) {
+        hwmKb = this.extractKbValue(line);
+      } else if (line.startsWith('VmPeak:')) {
+        vmPeakKb = this.extractKbValue(line);
+      } else if (line.startsWith('VmSwap:')) {
+        swapKb = this.extractKbValue(line);
+      }
+    }
+
+    return {
+      rssKb,
+      peakRssKb: hwmKb ?? vmPeakKb,
+      swapKb,
+      sampledAt: Date.now(),
+    };
+  }
+
+  /**
+   * Extract KB value from a /proc/<pid>/status line.
+   * Expected format: "VmRSS: 12345 kB"
+   * @param line Status line
+   * @returns KB value, or null unless the second field is a plain non-negative integer
+   */
+  private extractKbValue(line: string): number | null {
+    const token = line.trim().split(/\s+/)[1];
+    // Require an all-digit token: parseInt alone would accept "12abc" as 12.
+    if (token === undefined || !/^\d+$/.test(token)) {
+      return null;
+    }
+    return Number(token);
+  }
+
+  /**
+   * Get the number of actively tracked workers.
+   */
+  get workerCount(): number {
+    return this.workers.size;
+  }
+
+  /**
+   * Clear all worker registrations.
+   */
+  clear(): void {
+    this.workers.clear();
+  }
+}
+
+/**
+ * Global singleton instance; undefined until getMemorySampler() first runs.
+ */
+let globalSampler: MemorySampler | undefined;
+
+/**
+ * Get or create the global MemorySampler instance (default constructor args).
+ */
+export function getMemorySampler(): MemorySampler {
+  if (!globalSampler) {
+    globalSampler = new MemorySampler();
+  }
+  return globalSampler;
+}
+
+/**
+ * Reset the global sampler (mainly for testing): stops and clears it, then drops it.
+ */
+export function resetMemorySampler(): void {
+  if (globalSampler) {
+    globalSampler.stop();
+    globalSampler.clear();
+  }
+  globalSampler = undefined;
+}
diff --git a/src/store.ts b/src/store.ts
index 7e1d2f3..95c1871 100644
--- a/src/store.ts
+++ b/src/store.ts
@@ -641,6 +641,18 @@ export class InMemoryEventStore implements EventStore {
     // Update last activity
     worker.lastActivity = event.ts;
 
+    // Track PID from event if available
+    const pid = (event as Record<string, unknown>).pid;
+    if (typeof pid === 'number' && Number.isInteger(pid) && pid > 0) {
+      worker.pid = pid;
+      // Register with MemorySampler. Dynamic import() because `require` is
+      // not defined in ES modules; registration completes asynchronously.
+      const workerId = event.worker;
+      void import('./memorySampler.js')
+        .then(({ getMemorySampler }) => getMemorySampler().registerWorkerPid(workerId, pid))
+        .catch((err: unknown) => console.error('MemorySampler registration failed:', err));
+    }
+
     // Track active bead
     if (event.bead) {
       worker.activeBead = event.bead;
diff --git a/src/types.ts b/src/types.ts
index 8fc0551..f0dd282 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -598,6 +598,18 @@ export interface WorkerInfo {
 
   /** RSS usage as percentage of limit (0-100) */
   rssPercent?: number;
+
+  /** Process ID for this worker (from OTLP telemetry or needle state) */
+  pid?: number;
+
+  /** Current RSS from /proc/<pid>/status in KB */
+  rssKb?: number;
+
+  /** Peak RSS from /proc/<pid>/status in KB */
+  peakRssKb?: number;
+
+  /** Swap usage from /proc/<pid>/status in KB */
+  swapKb?: number;
 }
 
 export interface EventFilter {
diff --git a/src/web/server.ts b/src/web/server.ts
index 10e19d6..9ddbc23 100644
--- a/src/web/server.ts
+++ b/src/web/server.ts
@@ -25,6 +25,7 @@ import { getMemoryProfiler } from '../memoryProfiler.js';
 import {
getRecentHeapDiff, analyzeTrend, formatTrendAsMarkdown, saveTrendReport } from '../heapDiff.js';
 import { computeRetentionState, pruneLogs, formatPruneResult, PruneOptions } from '../logPruner.js';
 import { scanBeadWorkspaces } from '../beadWorkspaceScanner.js';
+import { getMemorySampler } from '../memorySampler.js';
 
 /** Get the v8 module (available in Node.js) */
 function getV8() {
@@ -104,6 +105,7 @@ export function createWebServer(options: WebServerOptions): WebServer {
   let wsServer: WebSocketServer;
   let running = false;
   const clients: Set<WebSocket> = new Set();
+  let memoryUpdateInterval: NodeJS.Timeout | null = null;
 
   function start() {
     if (running) return;
@@ -184,6 +186,52 @@ export function createWebServer(options: WebServerOptions): WebServer {
       });
     });
 
+    // ── Memory Sampler Setup ──
+    // memorySampler.start() is intentionally not called: its timer discards
+    // what it reads, and the interval below already samples every 10 s.
+    const memorySampler = getMemorySampler();
+
+    // Periodic memory updates broadcast (every 10 seconds)
+    memoryUpdateInterval = setInterval(() => {
+      const samples = memorySampler.sampleAllWorkers();
+      if (samples.size === 0) return;
+
+      // Update worker memory stats in store (skipped when RSS was unreadable)
+      for (const [workerId, sample] of samples) {
+        const worker = store.getWorker(workerId);
+        if (worker && sample.rssKb !== null) {
+          worker.rssKb = sample.rssKb;
+          worker.peakRssKb = sample.peakRssKb ?? undefined;
+          worker.swapKb = sample.swapKb ?? undefined;
+        }
+      }
+
+      // Broadcast memory updates to WebSocket clients
+      const message = JSON.stringify({
+        type: 'memory',
+        data: {
+          workers: store.getWorkers(),
+        },
+      });
+
+      const terminatedClients: WebSocket[] = [];
+      for (const client of clients) {
+        if (client.readyState === WebSocket.OPEN) {
+          if (client.bufferedAmount > WS_MAX_BUFFERED_BYTES) {
+            client.close(1013, 'Send buffer overflow');
+            terminatedClients.push(client);
+            continue;
+          }
+          client.send(message);
+        }
+      }
+
+      // Clean up terminated clients (after the loop, not while iterating)
+      for (const client of terminatedClients) {
+        clients.delete(client);
+      }
+    }, 10000);
+
     // Health check endpoint
     app.get('/api/health', (_req: Request, res: Response) => {
       metrics.wsClients = clients.size;
@@ -1735,6 +1783,14 @@ export function createWebServer(options: WebServerOptions): WebServer {
   function stop() {
     if (!running || !httpServer) return;
 
+    // Stop memory sampler
+    if (memoryUpdateInterval) {
+      clearInterval(memoryUpdateInterval);
+      memoryUpdateInterval = null;
+    }
+    const memorySampler = getMemorySampler();
+    memorySampler.stop();
+
     // Close all WebSocket connections
     for (const client of clients) {
       client.close();