feat(cli): add worker and level filtering flags to tui and web commands

Add --worker <id> and --level <level> filtering flags to both "fabric tui"
and "fabric web" commands. Filters are applied at the tailer level for
efficiency, before events are added to the store.

- Add --worker <id> option to filter by specific worker ID
- Add --level <level> option to filter by log level (debug, info, warn, error)
- Validate level filter against valid levels
- Pass filter to TUI app for header indicator display
- Pass cliFilter to web server for UI indicator display
- Apply filters in tailer, OTLP/gRPC, and OTLP/HTTP event handlers

Also adds heap snapshot options to web command for leak detection:
- Add --heap-snapshots flag to enable automatic heap snapshots
- Add --snapshot-interval <minutes> option for snapshot frequency

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-04-27 01:34:37 -04:00
parent 5dd5dce0f0
commit 429d831556

View file

@ -95,16 +95,31 @@ program
.description('Launch terminal UI dashboard')
.option('-f, --file <path>', 'Log file to tail (single-file mode)')
.option('--source <path>', 'Log source (file or directory)', undefined)
.option('-w, --worker <id>', 'Filter to specific worker ID')
.option('-l, --level <level>', 'Filter by log level (debug, info, warn, error)')
.option('--otlp-grpc <addr>', 'Enable OTLP/gRPC receiver (e.g. :4317 or 0.0.0.0:4317)')
.option('--otlp-http <addr>', 'Enable OTLP/HTTP receiver (e.g. :4318 or 0.0.0.0:4318)')
.action(async (options) => {
const resolved = resolveFromOptions(options.source, options.file);
// Validate level filter if provided
const validLevels = ['debug', 'info', 'warn', 'error'];
const levelFilter = options.level?.toLowerCase();
if (levelFilter && !validLevels.includes(levelFilter)) {
console.error(`Invalid level: ${options.level}. Must be one of: ${validLevels.join(', ')}`);
process.exit(1);
}
// Build filter object
const filter: EventFilter = {};
if (options.worker) filter.worker = options.worker;
if (levelFilter) filter.level = levelFilter as LogLevel;
try {
const { createTuiApp } = await import('./tui/index.js');
const { OtlpGrpcReceiver } = await import('./otlpGrpcReceiver.js');
const store = getStore();
const app = createTuiApp(store, { logPath: resolved.path });
const app = createTuiApp(store, { logPath: resolved.path, filter });
// Shared deduplicator for cross-source dedup when OTLP is active
const needsDedup = !!(options.otlpGrpc || options.otlpHttp);
@ -122,6 +137,10 @@ program
});
tailer.on('event', (event) => {
// Apply filters before processing event
if (filter.worker && event.worker !== filter.worker) return;
if (filter.level && event.level !== filter.level) return;
store.add(event);
app.addEvent(event);
});
@ -135,6 +154,10 @@ program
if (options.otlpGrpc) {
otlpReceiver = new OtlpGrpcReceiver({ address: options.otlpGrpc, deduplicator });
otlpReceiver.on('event', (event) => {
// Apply filters before processing event
if (filter.worker && event.worker !== filter.worker) return;
if (filter.level && event.level !== filter.level) return;
store.add(event);
app.addEvent(event);
});
@ -146,6 +169,10 @@ program
let otlpHttpServer: import('http').Server | undefined;
if (options.otlpHttp) {
otlpHttpServer = await startOtlpHttpListener(options.otlpHttp, (event) => {
// Apply filters before processing event
if (filter.worker && event.worker !== filter.worker) return;
if (filter.level && event.level !== filter.level) return;
store.add(event);
app.addEvent(event);
}, deduplicator);
@ -176,10 +203,14 @@ program
.option('-p, --port <number>', 'Port to listen on', '3000')
.option('-f, --file <path>', 'Log file to tail (single-file mode)')
.option('--source <path>', 'Log source (file or directory)', undefined)
.option('-w, --worker <id>', 'Filter to specific worker ID')
.option('-l, --level <level>', 'Filter by log level (debug, info, warn, error)')
.option('-a, --auth-token <token>', 'Auth token for POST endpoints (or use FABRIC_AUTH_TOKEN env var)')
.option('--otlp-grpc <addr>', 'Enable OTLP/gRPC receiver (e.g. :4317 or 0.0.0.0:4317)')
.option('--otlp-http <addr>', 'Enable OTLP/HTTP receiver (e.g. :4318 or 0.0.0.0:4318)')
.option('--max-events <number>', 'Max events in store before liveness guard exits (memory-bomb guard)')
.option('--heap-snapshots', 'Enable automatic heap snapshot capture for leak detection (default: true in production)')
.option('--snapshot-interval <minutes>', 'Interval between heap snapshots (default: 30)', '30')
.action(async (options) => {
const resolved = resolveFromOptions(options.source, options.file);
const port = parseInt(options.port, 10) || 3000;
@ -187,6 +218,19 @@ program
const otlpHttpAddr: string | undefined = options.otlpHttp;
const maxEventCount = options.maxEvents ? parseInt(options.maxEvents, 10) : undefined;
// Validate level filter if provided
const validLevels = ['debug', 'info', 'warn', 'error'];
const levelFilter = options.level?.toLowerCase();
if (levelFilter && !validLevels.includes(levelFilter)) {
console.error(`Invalid level: ${options.level}. Must be one of: ${validLevels.join(', ')}`);
process.exit(1);
}
// Build filter object
const filter: EventFilter = {};
if (options.worker) filter.worker = options.worker;
if (levelFilter) filter.level = levelFilter as LogLevel;
// Extract port number from --otlp-http (e.g. ":4318" or "0.0.0.0:4318" → 4318)
let otlpHttpPort: number | undefined;
if (otlpHttpAddr) {
@ -194,6 +238,21 @@ program
otlpHttpPort = match ? parseInt(match[1], 10) : undefined;
}
// Enable heap snapshots for leak detection in production (NODE_ENV=production)
const enableHeapSnapshots = options.heapSnapshots ?? (process.env.NODE_ENV === 'production');
const snapshotIntervalMinutes = parseInt(options.snapshotInterval, 10) || 30;
// Initialize memory profiler for leak detection
const { getMemoryProfiler } = await import('./memoryProfiler.js');
const profiler = getMemoryProfiler();
if (enableHeapSnapshots) {
profiler.writeSnapshots = true;
profiler.autoSnapshot = true;
profiler.snapshotIntervalMs = snapshotIntervalMinutes * 60 * 1000;
profiler.startPeriodicCapture();
console.error(`Heap snapshots enabled: every ${snapshotIntervalMinutes} minutes → ~/.needle/snapshots/`);
}
// Shared deduplicator for cross-source dedup when OTLP is active.
// Always created so dedup_dropped is reported accurately in /api/health.
const deduplicator = new EventDeduplicator();
@ -208,6 +267,7 @@ program
otlpHttpPort,
maxEventCount,
deduplicator,
cliFilter: Object.keys(filter).length > 0 ? filter : undefined,
});
// Setup log tailing
@ -222,6 +282,10 @@ program
});
tailer.on('event', (event) => {
// Apply filters before processing event
if (filter.worker && event.worker !== filter.worker) return;
if (filter.level && event.level !== filter.level) return;
store.add(event);
server.recordEvent();
server.broadcast(event);
@ -241,6 +305,10 @@ program
const { OtlpGrpcReceiver } = await import('./otlpGrpcReceiver.js');
otlpReceiver = new OtlpGrpcReceiver({ address: options.otlpGrpc, deduplicator });
otlpReceiver.on('event', (event) => {
// Apply filters before processing event
if (filter.worker && event.worker !== filter.worker) return;
if (filter.level && event.level !== filter.level) return;
store.add(event);
server.recordEvent();
server.broadcast(event);
@ -252,6 +320,7 @@ program
// Handle graceful shutdown
process.on('SIGINT', () => {
console.log('\nShutting down...');
profiler.stopPeriodicCapture();
tailer.stop();
otlpReceiver?.stop();
server.stop();