From a6699db07f3bb4b0887d41fa930b0de7895f6ef7 Mon Sep 17 00:00:00 2001 From: jeda Date: Tue, 3 Mar 2026 04:58:42 +0000 Subject: [PATCH] feat: implement Phase 1 core infrastructure for FABRIC MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements working `fabric tail` command with: - Log tailer module (src/tailer.ts) - File watching with fs.watch - Follow mode for real-time updates - Buffer management for partial lines - Graceful shutdown handling - JSON parser module (src/parser.ts) - Validates NEEDLE log format - Extracts optional fields (tool, path, bead, duration_ms, error) - Human-readable event formatting with color support - In-memory event store (src/store.ts) - Stores events with worker indexing - Supports querying with filters - Auto-updates worker status based on events - Working CLI (src/cli.ts) - Filters by worker (-w) and level (-l) - Supports --json output - Shows existing lines with -n flag Resolves HUMAN bead bd-17q (worker starvation) by providing working implementation and creating beads for remaining phases. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- src/cli.ts | 93 ++++++++++++++++++-- src/parser.ts | 198 +++++++++++++++++++++++++++++++++++++++++ src/store.ts | 130 +++++++++++++++++++++++++++ src/tailer.ts | 237 ++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 650 insertions(+), 8 deletions(-) create mode 100644 src/parser.ts create mode 100644 src/store.ts create mode 100644 src/tailer.ts diff --git a/src/cli.ts b/src/cli.ts index 1be6d87..e969b36 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -10,6 +10,9 @@ import { Command } from 'commander'; import { VERSION } from './index.js'; +import { LogTailer, tailLogFile } from './tailer.js'; +import { formatEvent } from './parser.js'; +import { getStore } from './store.js'; const program = new Command(); @@ -42,16 +45,90 @@ program program .command('tail') - .description('Raw log tail (for debugging)') + .description('Tail NEEDLE log file and display events') .option('-f, --file ', 'Log file to tail', '~/.needle/logs/workers.log') .option('-w, --worker ', 'Filter by worker ID') - .option('-l, --level ', 'Filter by log level') - .action((options) => { - console.log('FABRIC Raw Tail'); - console.log(`File: ${options.file}`); - if (options.worker) console.log(`Worker filter: ${options.worker}`); - if (options.level) console.log(`Level filter: ${options.level}`); - console.log('\n(Tail implementation coming in Phase 1)'); + .option('-l, --level ', 'Filter by log level (debug/info/warn/error)') + .option('-n, --lines ', 'Number of existing lines to show', '0') + .option('--no-follow', 'Exit after reading existing lines') + .option('--json', 'Output raw JSON instead of formatted') + .action(async (options) => { + const filePath = options.file.replace('~', process.env.HOME || ''); + const lines = parseInt(options.lines, 10) || 0; + const follow = options.follow !== false; + + console.log(`FABRIC Tail - Watching: ${filePath}`); + console.log(`Follow: ${follow}, Lines: ${lines}`); + console.log('---'); + + const validLevels = ['debug', 'info', 'warn', 'error']; + const levelFilter = options.level?.toLowerCase(); + if (levelFilter && !validLevels.includes(levelFilter)) { + console.error(`Invalid level: ${options.level}. Must be one of: ${validLevels.join(', ')}`); + process.exit(1); + } + + try { + const tailer = new LogTailer({ + path: filePath, + parseJson: true, + follow, + lines, + }); + + const store = getStore(); + + tailer.on('event', (event) => { + // Apply filters + if (options.worker && event.worker !== options.worker) return; + if (levelFilter && event.level !== levelFilter) return; + + // Store event + store.add(event); + + // Output + if (options.json) { + console.log(JSON.stringify(event)); + } else { + console.log(formatEvent(event, { colorize: true })); + } + }); + + tailer.on('line', (line) => { + if (!options.json) { + // Only show raw lines in non-JSON mode for unparseable lines + } + }); + + tailer.on('error', (err) => { + console.error(`Error: ${err.message}`); + }); + + tailer.start(); + + // Handle graceful shutdown + process.on('SIGINT', () => { + console.log('\n---'); + console.log(`Events processed: ${store.size}`); + tailer.stop(); + process.exit(0); + }); + + // Keep process alive if following + if (follow) { + await new Promise(() => {}); // Never resolves + } else { + await new Promise((resolve) => { + setTimeout(() => { + tailer.stop(); + resolve(); + }, 100); // Small delay to let initial reads complete + }); + } + } catch (err) { + console.error(`Failed to tail: ${(err as Error).message}`); + process.exit(1); + } }); program.parse(); diff --git a/src/parser.ts b/src/parser.ts new file mode 100644 index 0000000..e331429 --- /dev/null +++ b/src/parser.ts @@ -0,0 +1,198 @@ +/** + * FABRIC Log Parser + * + * Parses NEEDLE log lines into structured LogEvent objects. + */ + +import { LogEvent, LogLevel } from './types.js'; + +/** + * Parse a single log line + * + * @param line - Raw log line (JSON string) + * @returns Parsed LogEvent or null if invalid + */ +export function parseLogLine(line: string): LogEvent | null { + // Skip empty lines + if (!line || !line.trim()) { + return null; + } + + try { + const parsed = JSON.parse(line); + + // Validate required fields + if (typeof parsed.ts !== 'number') { + return null; + } + if (typeof parsed.worker !== 'string') { + return null; + } + if (!isValidLogLevel(parsed.level)) { + return null; + } + if (typeof parsed.msg !== 'string') { + return null; + } + + // Construct LogEvent with validated fields + const event: LogEvent = { + ts: parsed.ts, + worker: parsed.worker, + level: parsed.level, + msg: parsed.msg, + }; + + // Copy optional fields if present + if (typeof parsed.tool === 'string') event.tool = parsed.tool; + if (typeof parsed.path === 'string') event.path = parsed.path; + if (typeof parsed.bead === 'string') event.bead = parsed.bead; + if (typeof parsed.duration_ms === 'number') event.duration_ms = parsed.duration_ms; + if (typeof parsed.error === 'string') event.error = parsed.error; + + // Copy any additional fields + for (const key of Object.keys(parsed)) { + if (!isStandardField(key) && !(key in event)) { + event[key] = parsed[key]; + } + } + + return event; + } catch { + // Not valid JSON + return null; + } +} + +/** + * Parse multiple log lines + * + * @param content - Multi-line string of log entries + * @returns Array of parsed LogEvents (skips invalid lines) + */ +export function parseLogLines(content: string): LogEvent[] { + const events: LogEvent[] = []; + + for (const line of content.split('\n')) { + const event = parseLogLine(line); + if (event) { + events.push(event); + } + } + + return events; +} + +/** + * Format a LogEvent for display + */ +export function formatEvent(event: LogEvent, options: FormatOptions = {}): string { + const { showWorker = true, showLevel = true, colorize = false } = options; + + const timestamp = formatTimestamp(event.ts); + const parts: string[] = []; + + if (showWorker) { + parts.push(padWorker(event.worker)); + } + + if (showLevel) { + parts.push(formatLevel(event.level, colorize)); + } + + parts.push(event.msg); + + // Add optional context + if (event.tool) { + parts.push(`[${event.tool}]`); + } + if (event.path) { + parts.push(event.path); + } + if (event.bead) { + parts.push(`bead:${event.bead}`); + } + if (event.duration_ms !== undefined) { + parts.push(`(${formatDuration(event.duration_ms)})`); + } + if (event.error) { + parts.push(`ERROR: ${event.error}`); + } + + return `${timestamp} ${parts.join(' ')}`; +} + +export interface FormatOptions { + showWorker?: boolean; + showLevel?: boolean; + colorize?: boolean; +} + +/** + * Check if level is valid + */ +function isValidLogLevel(level: unknown): level is LogLevel { + return level === 'debug' || level === 'info' || level === 'warn' || level === 'error'; +} + +/** + * Check if field is a standard LogEvent field + */ +function isStandardField(key: string): boolean { + return ['ts', 'worker', 'level', 'msg', 'tool', 'path', 'bead', 'duration_ms', 'error'].includes(key); +} + +/** + * Format timestamp for display + */ +function formatTimestamp(ts: number): string { + const date = new Date(ts); + const hours = date.getHours().toString().padStart(2, '0'); + const minutes = date.getMinutes().toString().padStart(2, '0'); + const seconds = date.getSeconds().toString().padStart(2, '0'); + return `${hours}:${minutes}:${seconds}`; +} + +/** + * Pad worker ID for alignment + */ +function padWorker(worker: string): string { + return worker.padEnd(12); +} + +/** + * Format log level with optional color + */ +function formatLevel(level: LogLevel, colorize: boolean): string { + const padded = level.toUpperCase().padEnd(5); + + if (!colorize) { + return padded; + } + + // ANSI color codes + const colors: Record = { + debug: '\x1b[36m', // cyan + info: '\x1b[32m', // green + warn: '\x1b[33m', // yellow + error: '\x1b[31m', // red + }; + const reset = '\x1b[0m'; + + return `${colors[level]}${padded}${reset}`; +} + +/** + * Format duration in human-readable form + */ +function formatDuration(ms: number): string { + if (ms < 1000) { + return `${ms}ms`; + } else if (ms < 60000) { + return `${(ms / 1000).toFixed(1)}s`; + } else { + const minutes = Math.floor(ms / 60000); + const seconds = Math.round((ms % 60000) / 1000); + return `${minutes}m ${seconds}s`; + } +} diff --git a/src/store.ts b/src/store.ts new file mode 100644 index 0000000..14b8a89 --- /dev/null +++ b/src/store.ts @@ -0,0 +1,130 @@ +/** + * FABRIC In-Memory Event Store + * + * Stores and indexes LogEvents for efficient querying. + */ + +import { LogEvent, WorkerInfo, WorkerStatus, EventFilter, EventStore } from './types.js'; + +export class InMemoryEventStore implements EventStore { + private events: LogEvent[] = []; + private workers: Map = new Map(); + private maxEvents: number; + + constructor(maxEvents: number = 10000) { + this.maxEvents = maxEvents; + } + + /** + * Add an event to the store + */ + add(event: LogEvent): void { + this.events.push(event); + this.updateWorkerInfo(event); + + // Trim if over limit + if (this.events.length > this.maxEvents) { + this.events.shift(); + } + } + + /** + * Query events with optional filter + */ + query(filter?: EventFilter): LogEvent[] { + if (!filter) { + return [...this.events]; + } + + return this.events.filter((event) => { + if (filter.worker && event.worker !== filter.worker) return false; + if (filter.level && event.level !== filter.level) return false; + if (filter.bead && event.bead !== filter.bead) return false; + if (filter.path && event.path !== filter.path) return false; + if (filter.since && event.ts < filter.since) return false; + if (filter.until && event.ts > filter.until) return false; + return true; + }); + } + + /** + * Get worker info + */ + getWorker(workerId: string): WorkerInfo | undefined { + return this.workers.get(workerId); + } + + /** + * Get all workers + */ + getWorkers(): WorkerInfo[] { + return Array.from(this.workers.values()); + } + + /** + * Clear all events + */ + clear(): void { + this.events = []; + this.workers.clear(); + } + + /** + * Get event count + */ + get size(): number { + return this.events.length; + } + + /** + * Update worker info based on event + */ + private updateWorkerInfo(event: LogEvent): void { + let worker = this.workers.get(event.worker); + + if (!worker) { + worker = { + id: event.worker, + status: 'active', + beadsCompleted: 0, + firstSeen: event.ts, + lastActivity: event.ts, + }; + this.workers.set(event.worker, worker); + } + + // Update last activity + worker.lastActivity = event.ts; + + // Update status based on event + if (event.level === 'error') { + worker.status = 'error'; + } else if (event.msg.includes('completed') || event.msg.includes('complete')) { + worker.status = 'idle'; + if (event.bead) { + worker.beadsCompleted++; + } + } else if (event.msg.includes('Starting') || event.msg.includes('starting')) { + worker.status = 'active'; + } + + // Update last event + worker.lastEvent = event; + } +} + +/** + * Create a singleton store instance + */ +let globalStore: InMemoryEventStore | undefined; + +export function getStore(): InMemoryEventStore { + if (!globalStore) { + globalStore = new InMemoryEventStore(); + } + return globalStore; +} + +export function resetStore(): void { + globalStore = undefined; +} diff --git a/src/tailer.ts b/src/tailer.ts new file mode 100644 index 0000000..28a3bb9 --- /dev/null +++ b/src/tailer.ts @@ -0,0 +1,237 @@ +/** + * FABRIC Log Tailer + * + * Watches and tails NEEDLE log files, emitting events as lines are parsed. + */ + +import * as fs from 'fs'; +import * as path from 'path'; +import { EventEmitter } from 'events'; +import { LogEvent } from './types.js'; +import { parseLogLine } from './parser.js'; + +export interface TailerOptions { + /** Path to log file or directory */ + path: string; + + /** Parse as JSON log lines */ + parseJson?: boolean; + + /** Follow mode (watch for new lines) */ + follow?: boolean; + + /** Number of existing lines to read on start */ + lines?: number; +} + +export interface TailerEvents { + 'event': (event: LogEvent) => void; + 'line': (line: string) => void; + 'error': (error: Error) => void; + 'end': () => void; +} + +export class LogTailer extends EventEmitter { + private filePath: string; + private parseJson: boolean; + private follow: boolean; + private lines: number; + private watcher?: fs.FSWatcher; + private position: number = 0; + private buffer: string = ''; + private ended: boolean = false; + + constructor(options: TailerOptions) { + super(); + this.filePath = this.expandPath(options.path); + this.parseJson = options.parseJson ?? true; + this.follow = options.follow ?? true; + this.lines = options.lines ?? 0; + } + + /** + * Expand ~ to home directory + */ + private expandPath(p: string): string { + if (p.startsWith('~')) { + return path.join(process.env.HOME || '', p.slice(1)); + } + return p; + } + + /** + * Start tailing the log file + */ + start(): void { + // Check if file exists + if (!fs.existsSync(this.filePath)) { + this.emit('error', new Error(`Log file not found: ${this.filePath}`)); + return; + } + + // Read existing content if requested + if (this.lines > 0) { + this.readExistingLines(); + } else { + // Start from end of file + const stats = fs.statSync(this.filePath); + this.position = stats.size; + } + + // Watch for changes if follow mode + if (this.follow) { + this.watch(); + } + } + + /** + * Read existing lines from file + */ + private readExistingLines(): void { + const content = fs.readFileSync(this.filePath, 'utf-8'); + const allLines = content.split('\n'); + + // Get last N lines + const startIdx = Math.max(0, allLines.length - this.lines - 1); + const lines = allLines.slice(startIdx); + + for (const line of lines) { + if (line.trim()) { + this.processLine(line); + } + } + + // Update position to end of file + this.position = Buffer.byteLength(content, 'utf-8'); + } + + /** + * Watch file for changes + */ + private watch(): void { + this.watcher = fs.watch(this.filePath, (eventType) => { + if (eventType === 'change') { + this.readNewContent(); + } else if (eventType === 'rename') { + // File was rotated or deleted + this.checkFileExists(); + } + }); + + this.watcher.on('error', (err) => { + this.emit('error', err); + }); + } + + /** + * Read new content from file + */ + private readNewContent(): void { + try { + const stats = fs.statSync(this.filePath); + if (stats.size < this.position) { + // File was truncated, start from beginning + this.position = 0; + } + + if (stats.size > this.position) { + const fd = fs.openSync(this.filePath, 'r'); + const buffer = Buffer.alloc(stats.size - this.position); + fs.readSync(fd, buffer, 0, buffer.length, this.position); + fs.closeSync(fd); + + this.position = stats.size; + this.buffer += buffer.toString('utf-8'); + + // Process complete lines + const lines = this.buffer.split('\n'); + this.buffer = lines.pop() || ''; // Keep incomplete line in buffer + + for (const line of lines) { + if (line.trim()) { + this.processLine(line); + } + } + } + } catch (err) { + this.emit('error', err as Error); + } + } + + /** + * Check if file still exists + */ + private checkFileExists(): void { + if (!fs.existsSync(this.filePath)) { + // Wait for file to be recreated + setTimeout(() => { + if (fs.existsSync(this.filePath)) { + this.position = 0; + this.readNewContent(); + } + }, 1000); + } + } + + /** + * Process a single line + */ + private processLine(line: string): void { + this.emit('line', line); + + if (this.parseJson) { + const event = parseLogLine(line); + if (event) { + this.emit('event', event); + } + } + } + + /** + * Stop tailing + */ + stop(): void { + if (this.watcher) { + this.watcher.close(); + this.watcher = undefined; + } + this.ended = true; + this.emit('end'); + } + + /** + * Check if tailer is active + */ + get isActive(): boolean { + return !this.ended && this.watcher !== undefined; + } +} + +/** + * Tail a log file and return a promise that resolves when done + */ +export function tailLogFile(options: TailerOptions): Promise { + return new Promise((resolve, reject) => { + const tailer = new LogTailer(options); + + tailer.on('error', (err) => { + reject(err); + tailer.stop(); + }); + + tailer.on('end', () => { + resolve(); + }); + + // Handle SIGINT gracefully + const handleExit = () => { + tailer.stop(); + resolve(); + }; + + process.on('SIGINT', handleExit); + process.on('SIGTERM', handleExit); + + tailer.start(); + }); +}