From 429d831556db1461268942e2915a9885b8a396fc Mon Sep 17 00:00:00 2001 From: jedarden Date: Mon, 27 Apr 2026 01:34:37 -0400 Subject: [PATCH] feat(cli): add worker and level filtering flags to tui and web commands Add --worker and --level filtering flags to both "fabric tui" and "fabric web" commands. Filters are applied at the tailer level for efficiency, before events are added to the store. - Add --worker option to filter by specific worker ID - Add --level option to filter by log level (debug, info, warn, error) - Validate level filter against valid levels - Pass filter to TUI app for header indicator display - Pass cliFilter to web server for UI indicator display - Apply filters in tailer, OTLP/gRPC, and OTLP/HTTP event handlers Also adds heap snapshot options to web command for leak detection: - Add --heap-snapshots flag to enable automatic heap snapshots - Add --snapshot-interval option for snapshot frequency Co-Authored-By: Claude Opus 4.7 --- src/cli.ts | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 70 insertions(+), 1 deletion(-) diff --git a/src/cli.ts b/src/cli.ts index 95ceb74..a08d16f 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -95,16 +95,31 @@ program .description('Launch terminal UI dashboard') .option('-f, --file ', 'Log file to tail (single-file mode)') .option('--source ', 'Log source (file or directory)', undefined) + .option('-w, --worker ', 'Filter to specific worker ID') + .option('-l, --level ', 'Filter by log level (debug, info, warn, error)') .option('--otlp-grpc ', 'Enable OTLP/gRPC receiver (e.g. :4317 or 0.0.0.0:4317)') .option('--otlp-http ', 'Enable OTLP/HTTP receiver (e.g. :4318 or 0.0.0.0:4318)') .action(async (options) => { const resolved = resolveFromOptions(options.source, options.file); + // Validate level filter if provided + const validLevels = ['debug', 'info', 'warn', 'error']; + const levelFilter = options.level?.toLowerCase(); + if (levelFilter && !validLevels.includes(levelFilter)) { + console.error(`Invalid level: ${options.level}. Must be one of: ${validLevels.join(', ')}`); + process.exit(1); + } + + // Build filter object + const filter: EventFilter = {}; + if (options.worker) filter.worker = options.worker; + if (levelFilter) filter.level = levelFilter as LogLevel; + try { const { createTuiApp } = await import('./tui/index.js'); const { OtlpGrpcReceiver } = await import('./otlpGrpcReceiver.js'); const store = getStore(); - const app = createTuiApp(store, { logPath: resolved.path }); + const app = createTuiApp(store, { logPath: resolved.path, filter }); // Shared deduplicator for cross-source dedup when OTLP is active const needsDedup = !!(options.otlpGrpc || options.otlpHttp); @@ -122,6 +137,10 @@ program }); tailer.on('event', (event) => { + // Apply filters before processing event + if (filter.worker && event.worker !== filter.worker) return; + if (filter.level && event.level !== filter.level) return; + store.add(event); app.addEvent(event); }); @@ -135,6 +154,10 @@ program if (options.otlpGrpc) { otlpReceiver = new OtlpGrpcReceiver({ address: options.otlpGrpc, deduplicator }); otlpReceiver.on('event', (event) => { + // Apply filters before processing event + if (filter.worker && event.worker !== filter.worker) return; + if (filter.level && event.level !== filter.level) return; + store.add(event); app.addEvent(event); }); @@ -146,6 +169,10 @@ program let otlpHttpServer: import('http').Server | undefined; if (options.otlpHttp) { otlpHttpServer = await startOtlpHttpListener(options.otlpHttp, (event) => { + // Apply filters before processing event + if (filter.worker && event.worker !== filter.worker) return; + if (filter.level && event.level !== filter.level) return; + store.add(event); app.addEvent(event); }, deduplicator); @@ -176,10 +203,14 @@ program .option('-p, --port ', 'Port to listen on', '3000') .option('-f, --file ', 'Log file to tail (single-file mode)') .option('--source ', 'Log source (file or directory)', undefined) + .option('-w, --worker ', 'Filter to specific worker ID') + .option('-l, --level ', 'Filter by log level (debug, info, warn, error)') .option('-a, --auth-token ', 'Auth token for POST endpoints (or use FABRIC_AUTH_TOKEN env var)') .option('--otlp-grpc ', 'Enable OTLP/gRPC receiver (e.g. :4317 or 0.0.0.0:4317)') .option('--otlp-http ', 'Enable OTLP/HTTP receiver (e.g. :4318 or 0.0.0.0:4318)') .option('--max-events ', 'Max events in store before liveness guard exits (memory-bomb guard)') + .option('--heap-snapshots', 'Enable automatic heap snapshot capture for leak detection (default: true in production)') + .option('--snapshot-interval ', 'Interval between heap snapshots (default: 30)', '30') .action(async (options) => { const resolved = resolveFromOptions(options.source, options.file); const port = parseInt(options.port, 10) || 3000; @@ -187,6 +218,19 @@ program const otlpHttpAddr: string | undefined = options.otlpHttp; const maxEventCount = options.maxEvents ? parseInt(options.maxEvents, 10) : undefined; + // Validate level filter if provided + const validLevels = ['debug', 'info', 'warn', 'error']; + const levelFilter = options.level?.toLowerCase(); + if (levelFilter && !validLevels.includes(levelFilter)) { + console.error(`Invalid level: ${options.level}. Must be one of: ${validLevels.join(', ')}`); + process.exit(1); + } + + // Build filter object + const filter: EventFilter = {}; + if (options.worker) filter.worker = options.worker; + if (levelFilter) filter.level = levelFilter as LogLevel; + // Extract port number from --otlp-http (e.g. ":4318" or "0.0.0.0:4318" → 4318) let otlpHttpPort: number | undefined; if (otlpHttpAddr) { @@ -194,6 +238,21 @@ program otlpHttpPort = match ? parseInt(match[1], 10) : undefined; } + // Enable heap snapshots for leak detection in production (NODE_ENV=production) + const enableHeapSnapshots = options.heapSnapshots ?? (process.env.NODE_ENV === 'production'); + const snapshotIntervalMinutes = parseInt(options.snapshotInterval, 10) || 30; + + // Initialize memory profiler for leak detection + const { getMemoryProfiler } = await import('./memoryProfiler.js'); + const profiler = getMemoryProfiler(); + if (enableHeapSnapshots) { + profiler.writeSnapshots = true; + profiler.autoSnapshot = true; + profiler.snapshotIntervalMs = snapshotIntervalMinutes * 60 * 1000; + profiler.startPeriodicCapture(); + console.error(`Heap snapshots enabled: every ${snapshotIntervalMinutes} minutes → ~/.needle/snapshots/`); + } + // Shared deduplicator for cross-source dedup when OTLP is active. // Always created so dedup_dropped is reported accurately in /api/health. const deduplicator = new EventDeduplicator(); @@ -208,6 +267,7 @@ program otlpHttpPort, maxEventCount, deduplicator, + cliFilter: Object.keys(filter).length > 0 ? filter : undefined, }); // Setup log tailing @@ -222,6 +282,10 @@ program }); tailer.on('event', (event) => { + // Apply filters before processing event + if (filter.worker && event.worker !== filter.worker) return; + if (filter.level && event.level !== filter.level) return; + store.add(event); server.recordEvent(); server.broadcast(event); @@ -241,6 +305,10 @@ program const { OtlpGrpcReceiver } = await import('./otlpGrpcReceiver.js'); otlpReceiver = new OtlpGrpcReceiver({ address: options.otlpGrpc, deduplicator }); otlpReceiver.on('event', (event) => { + // Apply filters before processing event + if (filter.worker && event.worker !== filter.worker) return; + if (filter.level && event.level !== filter.level) return; + store.add(event); server.recordEvent(); server.broadcast(event); @@ -252,6 +320,7 @@ program // Handle graceful shutdown process.on('SIGINT', () => { console.log('\nShutting down...'); + profiler.stopPeriodicCapture(); tailer.stop(); otlpReceiver?.stop(); server.stop();