diff --git a/README.md b/README.md index 9ef7a34..4636f5b 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,12 @@ fabric tui # Web dashboard fabric web + +# Stream parsed events to stdout +fabric logs + +# With OTLP live telemetry +fabric tui --otlp-grpc :4317 ``` FABRIC reads from `~/.needle/logs/` by default. @@ -106,6 +112,13 @@ fabric web --otlp-http 0.0.0.0:4318 # Both sources merged (JSONL tail + OTLP live) fabric tui --source ~/.needle/logs/ --otlp-grpc :4317 + +# Tail with OTLP and event-type filtering +fabric tail --otlp-grpc :4317 --event-type "bead.*" + +# Stream logs to stdout with filtering (logs is an alias for tail) +fabric logs --event-type "bead.*" +fabric logs --worker tcb-a --otlp-grpc :4317 ``` | Receiver flag | Default port | Protocol | diff --git a/src/cli.ts b/src/cli.ts index cb9bf45..5c36b52 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -9,14 +9,62 @@ * fabric replay - Replay session history */ -import { Command } from 'commander'; +import { Command, Option } from 'commander'; import { VERSION } from './index.js'; import { LogTailer, tailLogFile } from './tailer.js'; import { formatEvent } from './parser.js'; import { getStore } from './store.js'; import { createWebServer } from './web/index.js'; import * as fs from 'fs'; -import type { LogLevel, EventFilter } from './types.js'; +import type { LogLevel, EventFilter, LogEvent } from './types.js'; + +/** Resolve --source to a file path. Directories get workers.log appended. */ +function resolveSource(source: string): string { + const expanded = source.replace('~', process.env.HOME || ''); + try { + if (fs.statSync(expanded).isDirectory()) { + return expanded.replace(/\/$/, '') + '/workers.log'; + } + } catch { + // Path doesn't exist yet — use as-is + } + return expanded; +} + +/** Simple glob → regex for NeedleEventType patterns like "bead.*", "worker.started". */ +function globMatch(pattern: string, value: string): boolean { + const regexStr = pattern + .replace(/[.+^${}()|[\]\\]/g, '\\$&') + .replace(/\*/g, '.*') + .replace(/\?/g, '.'); + return new RegExp(`^${regexStr}$`).test(value); +} + +/** Start a standalone OTLP/HTTP listener (used by tui and tail commands). */ +async function startOtlpHttpListener( + addr: string, + onEvent: (event: import('./types.js').LogEvent) => void, +): Promise { + const { default: express } = await import('express'); + const { createOtlpHttpRouter } = await import('./otlpHttpReceiver.js'); + const { createServer } = await import('http'); + + const app = express(); + app.use(createOtlpHttpRouter({ onEvent })); + + const match = addr.match(/^(?:([\d.]+):)?(\d+)$/); + const host = match?.[1] || '0.0.0.0'; + const port = match ? parseInt(match[2], 10) : 4318; + + const server = createServer(app); + return new Promise((resolve, reject) => { + server.listen(port, host, () => { + console.error(`OTLP/HTTP receiver listening on ${host}:${port}`); + resolve(server); + }); + server.on('error', reject); + }); +} const program = new Command(); @@ -29,9 +77,13 @@ program .command('tui') .description('Launch terminal UI dashboard') .option('-f, --file ', 'Log file to tail', '~/.needle/logs/workers.log') + .option('--source ', 'Log source (file or directory)', undefined) .option('--otlp-grpc ', 'Enable OTLP/gRPC receiver (e.g. :4317 or 0.0.0.0:4317)') + .option('--otlp-http ', 'Enable OTLP/HTTP receiver (e.g. :4318 or 0.0.0.0:4318)') .action(async (options) => { - const filePath = options.file.replace('~', process.env.HOME || ''); + const filePath = options.source + ? resolveSource(options.source) + : options.file.replace('~', process.env.HOME || ''); try { const { createTuiApp } = await import('./tui/index.js'); @@ -68,6 +120,15 @@ program console.error(`OTLP/gRPC receiver listening on ${boundAddr}`); } + // Start OTLP/HTTP receiver if requested + let otlpHttpServer: import('http').Server | undefined; + if (options.otlpHttp) { + otlpHttpServer = await startOtlpHttpListener(options.otlpHttp, (event) => { + store.add(event); + app.addEvent(event); + }); + } + // Start tailing and TUI tailer.start(); app.start(); @@ -76,7 +137,10 @@ program process.on('SIGINT', () => { tailer.stop(); otlpReceiver?.stop(); + otlpHttpServer?.close(); app.stop(); + store.clear(); // persists session + metric summaries to SQLite + process.exit(0); }); } catch (err) { console.error(`Failed to start TUI: ${(err as Error).message}`); @@ -89,11 +153,14 @@ program .description('Launch web dashboard') .option('-p, --port ', 'Port to listen on', '3000') .option('-f, --file ', 'Log file to tail', '~/.needle/logs/workers.log') + .option('--source ', 'Log source (file or directory)', undefined) .option('-a, --auth-token ', 'Auth token for POST endpoints (or use FABRIC_AUTH_TOKEN env var)') .option('--otlp-grpc ', 'Enable OTLP/gRPC receiver (e.g. :4317 or 0.0.0.0:4317)') .option('--otlp-http ', 'Enable OTLP/HTTP receiver (e.g. :4318 or 0.0.0.0:4318)') .action(async (options) => { - const filePath = options.file.replace('~', process.env.HOME || ''); + const filePath = options.source + ? resolveSource(options.source) + : options.file.replace('~', process.env.HOME || ''); const port = parseInt(options.port, 10) || 3000; const authToken = options.authToken || process.env.FABRIC_AUTH_TOKEN; const otlpHttpAddr: string | undefined = options.otlpHttp; @@ -151,6 +218,7 @@ program tailer.stop(); otlpReceiver?.stop(); server.stop(); + store.clear(); // persists session + metric summaries to SQLite process.exit(0); }); @@ -171,17 +239,25 @@ program program .command('tail') + .alias('logs') .description('Tail NEEDLE log file and display events') .option('-f, --file ', 'Log file to tail', '~/.needle/logs/workers.log') + .option('--source ', 'Log source (file or directory)', undefined) .option('-w, --worker ', 'Filter by worker ID') - .option('-l, --level ', 'Filter by log level (debug/info/warn/error)') + .option('-t, --event-type ', 'Filter by event type (glob, e.g. "bead.*", "worker.started")') + .addOption(new Option('-l, --level ', 'Filter by log level (deprecated: use --event-type)').hideHelp()) .option('-n, --lines ', 'Number of existing lines to show', '0') .option('--no-follow', 'Exit after reading existing lines') .option('--json', 'Output raw JSON instead of formatted') + .option('--otlp-grpc ', 'Enable OTLP/gRPC receiver (e.g. :4317 or 0.0.0.0:4317)') + .option('--otlp-http ', 'Enable OTLP/HTTP receiver (e.g. :4318 or 0.0.0.0:4318)') .action(async (options) => { - const filePath = options.file.replace('~', process.env.HOME || ''); + const filePath = options.source + ? resolveSource(options.source) + : options.file.replace('~', process.env.HOME || ''); const lines = parseInt(options.lines, 10) || 0; const follow = options.follow !== false; + const eventTypeFilter = options.eventType as string | undefined; console.log(`FABRIC Tail - Watching: ${filePath}`); console.log(`Follow: ${follow}, Lines: ${lines}`); @@ -204,10 +280,11 @@ program const store = getStore(); - tailer.on('event', (event) => { + const handleEvent = (event: LogEvent) => { // Apply filters if (options.worker && event.worker !== options.worker) return; if (levelFilter && event.level !== levelFilter) return; + if (eventTypeFilter && !globMatch(eventTypeFilter, event.msg)) return; // Store event store.add(event); @@ -218,7 +295,9 @@ program } else { console.log(formatEvent(event, { colorize: true })); } - }); + }; + + tailer.on('event', handleEvent); tailer.on('line', (line) => { if (!options.json) { @@ -232,11 +311,29 @@ program tailer.start(); + // Start OTLP/gRPC receiver if requested + let otlpReceiver: import('./otlpGrpcReceiver.js').OtlpGrpcReceiver | undefined; + if (options.otlpGrpc) { + const { OtlpGrpcReceiver } = await import('./otlpGrpcReceiver.js'); + otlpReceiver = new OtlpGrpcReceiver({ address: options.otlpGrpc }); + otlpReceiver.on('event', handleEvent); + const boundAddr = await otlpReceiver.start(); + console.error(`OTLP/gRPC receiver listening on ${boundAddr}`); + } + + // Start OTLP/HTTP receiver if requested + let otlpHttpServer: import('http').Server | undefined; + if (options.otlpHttp) { + otlpHttpServer = await startOtlpHttpListener(options.otlpHttp, handleEvent); + } + // Handle graceful shutdown process.on('SIGINT', () => { console.log('\n---'); console.log(`Events processed: ${store.size}`); tailer.stop(); + otlpReceiver?.stop(); + otlpHttpServer?.close(); process.exit(0); }); @@ -262,7 +359,8 @@ program .description('Replay worker session history chronologically') .option('-f, --file ', 'Log file to replay', '~/.needle/logs/workers.log') .option('-w, --worker ', 'Filter by worker ID') - .option('-l, --level ', 'Filter by log level (debug/info/warn/error)') + .option('-t, --event-type ', 'Filter by event type (glob, e.g. "bead.*", "worker.started")') + .addOption(new Option('-l, --level ', 'Filter by log level (deprecated: use --event-type)').hideHelp()) .option('-s, --speed ', 'Playback speed (0.5/1/2/5/10)', '1') .option('--auto', 'Start playback automatically') .action(async (options) => { @@ -278,6 +376,8 @@ program process.exit(1); } + const eventTypeFilter = options.eventType as string | undefined; + try { const blessed = (await import('blessed')).default; const { SessionReplay } = await import('./tui/components/SessionReplay.js'); @@ -332,6 +432,7 @@ program const filter: EventFilter = {}; if (options.worker) filter.worker = options.worker; if (levelFilter) filter.level = levelFilter as LogLevel; + if (eventTypeFilter) filter.eventType = eventTypeFilter; // Load the log file await replay.loadFile(filePath, Object.keys(filter).length > 0 ? filter : undefined);