feat: implement Phase 1 core infrastructure for FABRIC

Implements working `fabric tail` command with:

- Log tailer module (src/tailer.ts)
  - File watching with fs.watch
  - Follow mode for real-time updates
  - Buffer management for partial lines
  - Graceful shutdown handling

- JSON parser module (src/parser.ts)
  - Validates NEEDLE log format
  - Extracts optional fields (tool, path, bead, duration_ms, error)
  - Human-readable event formatting with color support

- In-memory event store (src/store.ts)
  - Stores events with worker indexing
  - Supports querying with filters
  - Auto-updates worker status based on events

- Working CLI (src/cli.ts)
  - Filters by worker (-w) and level (-l)
  - Supports --json output
  - Shows existing lines with -n flag

Resolves HUMAN bead bd-17q (worker starvation) by providing
working implementation and creating beads for remaining phases.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
jeda 2026-03-03 04:58:42 +00:00
parent 69f3c1d5ab
commit a6699db07f
4 changed files with 650 additions and 8 deletions

View file

@ -10,6 +10,9 @@
import { Command } from 'commander';
import { VERSION } from './index.js';
import { LogTailer, tailLogFile } from './tailer.js';
import { formatEvent } from './parser.js';
import { getStore } from './store.js';
const program = new Command();
@ -42,16 +45,90 @@ program
program
.command('tail')
.description('Raw log tail (for debugging)')
.description('Tail NEEDLE log file and display events')
.option('-f, --file <path>', 'Log file to tail', '~/.needle/logs/workers.log')
.option('-w, --worker <id>', 'Filter by worker ID')
.option('-l, --level <level>', 'Filter by log level')
.action((options) => {
console.log('FABRIC Raw Tail');
console.log(`File: ${options.file}`);
if (options.worker) console.log(`Worker filter: ${options.worker}`);
if (options.level) console.log(`Level filter: ${options.level}`);
console.log('\n(Tail implementation coming in Phase 1)');
.option('-l, --level <level>', 'Filter by log level (debug/info/warn/error)')
.option('-n, --lines <number>', 'Number of existing lines to show', '0')
.option('--no-follow', 'Exit after reading existing lines')
.option('--json', 'Output raw JSON instead of formatted')
.action(async (options) => {
const filePath = options.file.replace('~', process.env.HOME || '');
const lines = parseInt(options.lines, 10) || 0;
const follow = options.follow !== false;
console.log(`FABRIC Tail - Watching: ${filePath}`);
console.log(`Follow: ${follow}, Lines: ${lines}`);
console.log('---');
const validLevels = ['debug', 'info', 'warn', 'error'];
const levelFilter = options.level?.toLowerCase();
if (levelFilter && !validLevels.includes(levelFilter)) {
console.error(`Invalid level: ${options.level}. Must be one of: ${validLevels.join(', ')}`);
process.exit(1);
}
try {
const tailer = new LogTailer({
path: filePath,
parseJson: true,
follow,
lines,
});
const store = getStore();
tailer.on('event', (event) => {
// Apply filters
if (options.worker && event.worker !== options.worker) return;
if (levelFilter && event.level !== levelFilter) return;
// Store event
store.add(event);
// Output
if (options.json) {
console.log(JSON.stringify(event));
} else {
console.log(formatEvent(event, { colorize: true }));
}
});
tailer.on('line', (line) => {
if (!options.json) {
// Only show raw lines in non-JSON mode for unparseable lines
}
});
tailer.on('error', (err) => {
console.error(`Error: ${err.message}`);
});
tailer.start();
// Handle graceful shutdown
process.on('SIGINT', () => {
console.log('\n---');
console.log(`Events processed: ${store.size}`);
tailer.stop();
process.exit(0);
});
// Keep process alive if following
if (follow) {
await new Promise(() => {}); // Never resolves
} else {
await new Promise<void>((resolve) => {
setTimeout(() => {
tailer.stop();
resolve();
}, 100); // Small delay to let initial reads complete
});
}
} catch (err) {
console.error(`Failed to tail: ${(err as Error).message}`);
process.exit(1);
}
});
program.parse();

198
src/parser.ts Normal file
View file

@ -0,0 +1,198 @@
/**
* FABRIC Log Parser
*
* Parses NEEDLE log lines into structured LogEvent objects.
*/
import { LogEvent, LogLevel } from './types.js';
/**
* Parse a single log line
*
* @param line - Raw log line (JSON string)
* @returns Parsed LogEvent or null if invalid
*/
export function parseLogLine(line: string): LogEvent | null {
// Skip empty lines
if (!line || !line.trim()) {
return null;
}
try {
const parsed = JSON.parse(line);
// Validate required fields
if (typeof parsed.ts !== 'number') {
return null;
}
if (typeof parsed.worker !== 'string') {
return null;
}
if (!isValidLogLevel(parsed.level)) {
return null;
}
if (typeof parsed.msg !== 'string') {
return null;
}
// Construct LogEvent with validated fields
const event: LogEvent = {
ts: parsed.ts,
worker: parsed.worker,
level: parsed.level,
msg: parsed.msg,
};
// Copy optional fields if present
if (typeof parsed.tool === 'string') event.tool = parsed.tool;
if (typeof parsed.path === 'string') event.path = parsed.path;
if (typeof parsed.bead === 'string') event.bead = parsed.bead;
if (typeof parsed.duration_ms === 'number') event.duration_ms = parsed.duration_ms;
if (typeof parsed.error === 'string') event.error = parsed.error;
// Copy any additional fields
for (const key of Object.keys(parsed)) {
if (!isStandardField(key) && !(key in event)) {
event[key] = parsed[key];
}
}
return event;
} catch {
// Not valid JSON
return null;
}
}
/**
* Parse multiple log lines
*
* @param content - Multi-line string of log entries
* @returns Array of parsed LogEvents (skips invalid lines)
*/
export function parseLogLines(content: string): LogEvent[] {
const events: LogEvent[] = [];
for (const line of content.split('\n')) {
const event = parseLogLine(line);
if (event) {
events.push(event);
}
}
return events;
}
/**
* Format a LogEvent for display
*/
export function formatEvent(event: LogEvent, options: FormatOptions = {}): string {
const { showWorker = true, showLevel = true, colorize = false } = options;
const timestamp = formatTimestamp(event.ts);
const parts: string[] = [];
if (showWorker) {
parts.push(padWorker(event.worker));
}
if (showLevel) {
parts.push(formatLevel(event.level, colorize));
}
parts.push(event.msg);
// Add optional context
if (event.tool) {
parts.push(`[${event.tool}]`);
}
if (event.path) {
parts.push(event.path);
}
if (event.bead) {
parts.push(`bead:${event.bead}`);
}
if (event.duration_ms !== undefined) {
parts.push(`(${formatDuration(event.duration_ms)})`);
}
if (event.error) {
parts.push(`ERROR: ${event.error}`);
}
return `${timestamp} ${parts.join(' ')}`;
}
export interface FormatOptions {
showWorker?: boolean;
showLevel?: boolean;
colorize?: boolean;
}
/**
* Check if level is valid
*/
function isValidLogLevel(level: unknown): level is LogLevel {
return level === 'debug' || level === 'info' || level === 'warn' || level === 'error';
}
/**
* Check if field is a standard LogEvent field
*/
function isStandardField(key: string): boolean {
return ['ts', 'worker', 'level', 'msg', 'tool', 'path', 'bead', 'duration_ms', 'error'].includes(key);
}
/**
* Format timestamp for display
*/
function formatTimestamp(ts: number): string {
const date = new Date(ts);
const hours = date.getHours().toString().padStart(2, '0');
const minutes = date.getMinutes().toString().padStart(2, '0');
const seconds = date.getSeconds().toString().padStart(2, '0');
return `${hours}:${minutes}:${seconds}`;
}
/**
* Pad worker ID for alignment
*/
function padWorker(worker: string): string {
return worker.padEnd(12);
}
/**
* Format log level with optional color
*/
function formatLevel(level: LogLevel, colorize: boolean): string {
const padded = level.toUpperCase().padEnd(5);
if (!colorize) {
return padded;
}
// ANSI color codes
const colors: Record<LogLevel, string> = {
debug: '\x1b[36m', // cyan
info: '\x1b[32m', // green
warn: '\x1b[33m', // yellow
error: '\x1b[31m', // red
};
const reset = '\x1b[0m';
return `${colors[level]}${padded}${reset}`;
}
/**
* Format duration in human-readable form
*/
function formatDuration(ms: number): string {
if (ms < 1000) {
return `${ms}ms`;
} else if (ms < 60000) {
return `${(ms / 1000).toFixed(1)}s`;
} else {
const minutes = Math.floor(ms / 60000);
const seconds = Math.round((ms % 60000) / 1000);
return `${minutes}m ${seconds}s`;
}
}

130
src/store.ts Normal file
View file

@ -0,0 +1,130 @@
/**
* FABRIC In-Memory Event Store
*
* Stores and indexes LogEvents for efficient querying.
*/
import { LogEvent, WorkerInfo, WorkerStatus, EventFilter, EventStore } from './types.js';
export class InMemoryEventStore implements EventStore {
private events: LogEvent[] = [];
private workers: Map<string, WorkerInfo> = new Map();
private maxEvents: number;
constructor(maxEvents: number = 10000) {
this.maxEvents = maxEvents;
}
/**
* Add an event to the store
*/
add(event: LogEvent): void {
this.events.push(event);
this.updateWorkerInfo(event);
// Trim if over limit
if (this.events.length > this.maxEvents) {
this.events.shift();
}
}
/**
* Query events with optional filter
*/
query(filter?: EventFilter): LogEvent[] {
if (!filter) {
return [...this.events];
}
return this.events.filter((event) => {
if (filter.worker && event.worker !== filter.worker) return false;
if (filter.level && event.level !== filter.level) return false;
if (filter.bead && event.bead !== filter.bead) return false;
if (filter.path && event.path !== filter.path) return false;
if (filter.since && event.ts < filter.since) return false;
if (filter.until && event.ts > filter.until) return false;
return true;
});
}
/**
* Get worker info
*/
getWorker(workerId: string): WorkerInfo | undefined {
return this.workers.get(workerId);
}
/**
* Get all workers
*/
getWorkers(): WorkerInfo[] {
return Array.from(this.workers.values());
}
/**
* Clear all events
*/
clear(): void {
this.events = [];
this.workers.clear();
}
/**
* Get event count
*/
get size(): number {
return this.events.length;
}
/**
* Update worker info based on event
*/
private updateWorkerInfo(event: LogEvent): void {
let worker = this.workers.get(event.worker);
if (!worker) {
worker = {
id: event.worker,
status: 'active',
beadsCompleted: 0,
firstSeen: event.ts,
lastActivity: event.ts,
};
this.workers.set(event.worker, worker);
}
// Update last activity
worker.lastActivity = event.ts;
// Update status based on event
if (event.level === 'error') {
worker.status = 'error';
} else if (event.msg.includes('completed') || event.msg.includes('complete')) {
worker.status = 'idle';
if (event.bead) {
worker.beadsCompleted++;
}
} else if (event.msg.includes('Starting') || event.msg.includes('starting')) {
worker.status = 'active';
}
// Update last event
worker.lastEvent = event;
}
}
/**
* Create a singleton store instance
*/
let globalStore: InMemoryEventStore | undefined;
export function getStore(): InMemoryEventStore {
if (!globalStore) {
globalStore = new InMemoryEventStore();
}
return globalStore;
}
export function resetStore(): void {
globalStore = undefined;
}

237
src/tailer.ts Normal file
View file

@ -0,0 +1,237 @@
/**
* FABRIC Log Tailer
*
* Watches and tails NEEDLE log files, emitting events as lines are parsed.
*/
import * as fs from 'fs';
import * as path from 'path';
import { EventEmitter } from 'events';
import { LogEvent } from './types.js';
import { parseLogLine } from './parser.js';
export interface TailerOptions {
/** Path to log file or directory */
path: string;
/** Parse as JSON log lines */
parseJson?: boolean;
/** Follow mode (watch for new lines) */
follow?: boolean;
/** Number of existing lines to read on start */
lines?: number;
}
export interface TailerEvents {
'event': (event: LogEvent) => void;
'line': (line: string) => void;
'error': (error: Error) => void;
'end': () => void;
}
export class LogTailer extends EventEmitter {
private filePath: string;
private parseJson: boolean;
private follow: boolean;
private lines: number;
private watcher?: fs.FSWatcher;
private position: number = 0;
private buffer: string = '';
private ended: boolean = false;
constructor(options: TailerOptions) {
super();
this.filePath = this.expandPath(options.path);
this.parseJson = options.parseJson ?? true;
this.follow = options.follow ?? true;
this.lines = options.lines ?? 0;
}
/**
* Expand ~ to home directory
*/
private expandPath(p: string): string {
if (p.startsWith('~')) {
return path.join(process.env.HOME || '', p.slice(1));
}
return p;
}
/**
* Start tailing the log file
*/
start(): void {
// Check if file exists
if (!fs.existsSync(this.filePath)) {
this.emit('error', new Error(`Log file not found: ${this.filePath}`));
return;
}
// Read existing content if requested
if (this.lines > 0) {
this.readExistingLines();
} else {
// Start from end of file
const stats = fs.statSync(this.filePath);
this.position = stats.size;
}
// Watch for changes if follow mode
if (this.follow) {
this.watch();
}
}
/**
* Read existing lines from file
*/
private readExistingLines(): void {
const content = fs.readFileSync(this.filePath, 'utf-8');
const allLines = content.split('\n');
// Get last N lines
const startIdx = Math.max(0, allLines.length - this.lines - 1);
const lines = allLines.slice(startIdx);
for (const line of lines) {
if (line.trim()) {
this.processLine(line);
}
}
// Update position to end of file
this.position = Buffer.byteLength(content, 'utf-8');
}
/**
* Watch file for changes
*/
private watch(): void {
this.watcher = fs.watch(this.filePath, (eventType) => {
if (eventType === 'change') {
this.readNewContent();
} else if (eventType === 'rename') {
// File was rotated or deleted
this.checkFileExists();
}
});
this.watcher.on('error', (err) => {
this.emit('error', err);
});
}
/**
* Read new content from file
*/
private readNewContent(): void {
try {
const stats = fs.statSync(this.filePath);
if (stats.size < this.position) {
// File was truncated, start from beginning
this.position = 0;
}
if (stats.size > this.position) {
const fd = fs.openSync(this.filePath, 'r');
const buffer = Buffer.alloc(stats.size - this.position);
fs.readSync(fd, buffer, 0, buffer.length, this.position);
fs.closeSync(fd);
this.position = stats.size;
this.buffer += buffer.toString('utf-8');
// Process complete lines
const lines = this.buffer.split('\n');
this.buffer = lines.pop() || ''; // Keep incomplete line in buffer
for (const line of lines) {
if (line.trim()) {
this.processLine(line);
}
}
}
} catch (err) {
this.emit('error', err as Error);
}
}
/**
* Check if file still exists
*/
private checkFileExists(): void {
if (!fs.existsSync(this.filePath)) {
// Wait for file to be recreated
setTimeout(() => {
if (fs.existsSync(this.filePath)) {
this.position = 0;
this.readNewContent();
}
}, 1000);
}
}
/**
* Process a single line
*/
private processLine(line: string): void {
this.emit('line', line);
if (this.parseJson) {
const event = parseLogLine(line);
if (event) {
this.emit('event', event);
}
}
}
/**
* Stop tailing
*/
stop(): void {
if (this.watcher) {
this.watcher.close();
this.watcher = undefined;
}
this.ended = true;
this.emit('end');
}
/**
* Check if tailer is active
*/
get isActive(): boolean {
return !this.ended && this.watcher !== undefined;
}
}
/**
* Tail a log file and return a promise that resolves when done
*/
export function tailLogFile(options: TailerOptions): Promise<void> {
return new Promise((resolve, reject) => {
const tailer = new LogTailer(options);
tailer.on('error', (err) => {
reject(err);
tailer.stop();
});
tailer.on('end', () => {
resolve();
});
// Handle SIGINT gracefully
const handleExit = () => {
tailer.stop();
resolve();
};
process.on('SIGINT', handleExit);
process.on('SIGTERM', handleExit);
tailer.start();
});
}