From cf7f72721060d501050bdfc9c182d75c3e1c4215 Mon Sep 17 00:00:00 2001 From: jedarden Date: Tue, 21 Apr 2026 14:26:16 -0400 Subject: [PATCH] =?UTF-8?q?feat(bd-j1t):=20first-class=20worker=20state=20?= =?UTF-8?q?machine=20=E2=80=94=20NeedleState=20+=20gap-based=20stuck=20det?= =?UTF-8?q?ection?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace coarse NeedleWorkerStatus ('idle'|'executing'|'draining'|'starting') with the real NEEDLE state machine: BOOTING→SELECTING→CLAIMING→WORKING→CLOSING→STOPPED. - Add NeedleState type, VALID_TRANSITIONS map, needleStateToStatus() helper in types.ts - Track needleState + lastStateTransition per worker by consuming worker.state_transition events - Surface all six states in TUI worker cards (WorkerGrid, WorkerDetail) with per-state icons/colors - Surface all six states in web WorkerGrid.tsx with NEEDLE_STATE_LABELS and NEEDLE_STATE_COLORS - Add getNeedleStateColor/getNeedleStateIcon to colors.ts - Rewire stuck detection to fire on state-transition gaps (state_gap pattern in stuckDetection.ts) - Add sequence-based event ordering via compareEventsBySequence and queryOrdered() - Legacy event-type fallback preserved for workers not emitting state_transition events Co-Authored-By: Claude Opus 4.7 --- src/store.test.ts | 82 ++++++++++ src/store.ts | 147 ++++++++++++++--- src/tui/app.test.ts | 5 +- src/tui/components/WorkerDetail.e2e.test.ts | 2 +- src/tui/components/WorkerDetail.ts | 20 ++- src/tui/components/WorkerGrid.ts | 37 ++++- src/tui/keyboardNavigation.e2e.test.ts | 1 + src/tui/utils/colors.ts | 30 ++++ src/types.ts | 151 +++++++++++++++++- .../frontend/src/components/WorkerGrid.tsx | 29 +++- src/web/frontend/src/types.ts | 55 +++++++ 11 files changed, 515 insertions(+), 44 deletions(-) diff --git a/src/store.test.ts b/src/store.test.ts index 8315252..4c60d2a 100644 --- a/src/store.test.ts +++ b/src/store.test.ts @@ -1582,6 +1582,88 @@ describe('InMemoryEventStore', () => { }); }); +describe('sequence-based ordering', () => { + let store: InMemoryEventStore; + + beforeEach(() => { + store = new InMemoryEventStore(); + }); + + it('queryOrdered returns events sorted by (worker, sequence), not timestamp', () => { + // Inject events with out-of-order timestamps but monotonic sequences per worker. + // Worker A: sequence 0->2 with timestamps 300, 100, 200 (out of order) + // Worker B: sequence 0->1 with timestamps 50, 150 + const events: LogEvent[] = [ + { ts: 300, worker: 'w-a', sequence: 0, level: 'info', msg: 'A-seq0' }, + { ts: 50, worker: 'w-b', sequence: 0, level: 'info', msg: 'B-seq0' }, + { ts: 100, worker: 'w-a', sequence: 1, level: 'info', msg: 'A-seq1' }, + { ts: 150, worker: 'w-b', sequence: 1, level: 'info', msg: 'B-seq1' }, + { ts: 200, worker: 'w-a', sequence: 2, level: 'info', msg: 'A-seq2' }, + ]; + + for (const e of events) store.add(e); + + const ordered = store.queryOrdered(); + + // Expected order: all w-a events (seq 0,1,2) then all w-b events (seq 0,1) + // because (worker, sequence) sort groups by worker first. + expect(ordered.map(e => e.msg)).toEqual([ + 'A-seq0', 'A-seq1', 'A-seq2', + 'B-seq0', 'B-seq1', + ]); + + // Verify timestamps are NOT in order (proving sequence-based, not time-based sort) + const timestamps = ordered.map(e => e.ts); + expect(timestamps).not.toEqual([...timestamps].sort((a, b) => a - b)); + }); + + it('queryOrdered falls back to ts for events without sequence', () => { + const events: LogEvent[] = [ + { ts: 300, worker: 'w-a', level: 'info', msg: 'no-seq-300' }, + { ts: 100, worker: 'w-a', level: 'info', msg: 'no-seq-100' }, + { ts: 200, worker: 'w-a', level: 'info', msg: 'no-seq-200' }, + ]; + + for (const e of events) store.add(e); + + const ordered = store.queryOrdered(); + expect(ordered.map(e => e.msg)).toEqual([ + 'no-seq-100', 'no-seq-200', 'no-seq-300', + ]); + }); + + it('sequence index stores events keyed by (worker, sequence)', () => { + const event: LogEvent = { ts: Date.now(), worker: 'w-idx', sequence: 42, level: 'info', msg: 'idx-test' }; + store.add(event); + + // queryOrdered should return the event correctly + const ordered = store.queryOrdered(); + expect(ordered).toHaveLength(1); + expect(ordered[0].worker).toBe('w-idx'); + expect(ordered[0].sequence).toBe(42); + }); + + it('handles mixed workers with interleaved sequences', () => { + // Simulate multi-host OTLP ingestion: two workers with overlapping timestamps + // but independent monotonic sequences. + const events: LogEvent[] = [ + { ts: 100, worker: 'host-1', sequence: 0, level: 'info', msg: 'h1-0' }, + { ts: 110, worker: 'host-2', sequence: 0, level: 'info', msg: 'h2-0' }, + { ts: 200, worker: 'host-1', sequence: 1, level: 'info', msg: 'h1-1' }, + { ts: 150, worker: 'host-2', sequence: 1, level: 'info', msg: 'h2-1' }, + { ts: 300, worker: 'host-1', sequence: 2, level: 'info', msg: 'h1-2' }, + ]; + + for (const e of events) store.add(e); + + const ordered = store.queryOrdered(); + expect(ordered.map(e => e.msg)).toEqual([ + 'h1-0', 'h1-1', 'h1-2', + 'h2-0', 'h2-1', + ]); + }); +}); + describe('getStore and resetStore', () => { beforeEach(() => { resetStore(); diff --git a/src/store.ts b/src/store.ts index 38aed58..ff07312 100644 --- a/src/store.ts +++ b/src/store.ts @@ -10,6 +10,9 @@ import { LogEvent, WorkerInfo, WorkerStatus, + NeedleState, + needleStateToStatus, + VALID_TRANSITIONS, EventFilter, EventStore, FileCollision, @@ -38,7 +41,9 @@ import { FileAnomaly, AnomalyDetectionOptions, AnomalyStats, + compareEventsBySequence, } from './types.js'; +import { isWorkerStuck } from './tui/utils/stuckDetection.js'; import { detectAnomalies, getAnomalyStats } from './tui/utils/fileAnomalyDetection.js'; import { ErrorGroupManager, getErrorGroupManager } from './errorGrouping.js'; import { RecoveryManager, getRecoveryManager } from './tui/utils/recoveryPlaybook.js'; @@ -82,6 +87,7 @@ interface FileModificationTracker { export class InMemoryEventStore implements EventStore { private events: LogEvent[] = []; + private sequenceIndex: Map = new Map(); // key: `${worker}:${sequence}` private workers: Map = new Map(); private collisions: Map = new Map(); private beadCollisions: Map = new Map(); @@ -117,6 +123,12 @@ export class InMemoryEventStore implements EventStore { */ add(event: LogEvent): void { this.events.push(event); + + // Populate secondary index keyed on (worker, sequence) + if (event.sequence != null && event.sequence >= 0) { + this.sequenceIndex.set(`${event.worker}:${event.sequence}`, event); + } + this.updateWorkerInfo(event); this.detectCollision(event); this.detectBeadCollision(event); @@ -136,9 +148,12 @@ export class InMemoryEventStore implements EventStore { // Process event for cross-references (immediate) this.crossReferenceManager.processEvent(event); - // Process event for worker analytics + // Process event for worker analytics (also feeds MetricAccumulator) this.workerAnalytics.processEvent(event); + // Drain OTLP metric samples to SQLite for persistence + this.flushMetricSamples(); + // Process event for semantic narrative (real-time) this.semanticNarrativeManager.processEvent(event); @@ -188,6 +203,14 @@ export class InMemoryEventStore implements EventStore { }); } + /** + * Query events sorted by (worker, sequence), falling back to ts. + * This is the authoritative ordering — use instead of timestamp-based sort. + */ + queryOrdered(filter?: EventFilter): LogEvent[] { + return this.query(filter).sort(compareEventsBySequence); + } + /** * Get worker info */ @@ -226,6 +249,7 @@ export class InMemoryEventStore implements EventStore { this.persistSession(); this.events = []; + this.sequenceIndex.clear(); this.workers.clear(); this.collisions.clear(); this.beadCollisions.clear(); @@ -241,6 +265,58 @@ export class InMemoryEventStore implements EventStore { } } + /** + * Flush accumulated OTLP metric samples from the MetricAccumulator + * into the metric_samples SQLite table, upsert per-worker session + * summaries, and update the live session with the latest aggregates. + */ + private flushMetricSamples(): void { + const accumulator = this.workerAnalytics.getMetricAccumulator(); + const samples = accumulator.drainSamples(); + for (const s of samples) { + this.historicalStore.recordMetricSample({ + workerId: s.workerId, + metricName: s.metricName, + value: s.value, + timestamp: s.timestamp, + source: 'otlp-metric', + beadId: s.beadId, + }); + } + + // Upsert per-worker session summaries from the accumulator snapshots + const hasMetricData = accumulator.hasMetricData(); + if (hasMetricData) { + // Get all workers that have metric snapshots + const allWorkerMetrics = this.workerAnalytics.getAllWorkerMetrics({ timeWindow: 'all' }); + for (const wm of allWorkerMetrics) { + const metricSnap = accumulator.getSnapshot(wm.workerId); + this.historicalStore.upsertSessionWorkerSummary({ + workerId: wm.workerId, + tokensIn: metricSnap?.tokensIn ?? Math.floor(wm.totalTokens * 0.7), + tokensOut: metricSnap?.tokensOut ?? Math.floor(wm.totalTokens * 0.3), + costUsd: metricSnap?.costUsd ?? wm.totalCostUsd, + beadsCompleted: metricSnap?.beadsCompleted ?? wm.beadsCompleted, + beadsFailed: metricSnap?.beadsFailed ?? 0, + errors: metricSnap?.errors ?? wm.errorCount, + metricsSource: metricSnap && metricSnap.costUsd > 0 ? 'otlp-metric' : 'log-derived', + }); + } + } + + // Update the live session row with current metric-derived aggregates + if (samples.length > 0 || hasMetricData) { + const analytics = this.workerAnalytics.getAggregatedAnalytics({ timeWindow: 'all' }); + this.historicalStore.updateLiveSession({ + workerCount: this.workers.size, + taskCount: analytics.totalBeadsCompleted, + totalCost: analytics.totalCostUsd, + totalTokens: analytics.totalTokens, + metricsSource: hasMetricData ? 'otlp-metric' : 'log-derived', + }); + } + } + /** * Persist current session to historical store */ @@ -251,11 +327,13 @@ export class InMemoryEventStore implements EventStore { const analytics = this.workerAnalytics.getAggregatedAnalytics({ timeWindow: 'all' }); // End the historical session + const hasMetricData = this.workerAnalytics.getMetricAccumulator().hasMetricData(); this.historicalStore.endSession({ workerCount: this.workers.size, taskCount: analytics.totalBeadsCompleted, totalCost: analytics.totalCostUsd, totalTokens: analytics.totalTokens, + metricsSource: hasMetricData ? 'otlp-metric' : 'log-derived', }); // Record any completed tasks that haven't been recorded yet @@ -410,31 +488,42 @@ export class InMemoryEventStore implements EventStore { } } - // Update status based on NEEDLE event type (event.msg holds the event type string) + // Handle worker.state_transition events (authoritative state machine) const needleEvent = event.msg; - if (event.level === 'error') { - worker.status = 'error'; - } else if ( - needleEvent === 'bead.completed' || - needleEvent === 'worker.idle' || - needleEvent === 'worker.stopped' || - needleEvent === 'worker.draining' - ) { - worker.status = 'idle'; - if (needleEvent === 'bead.completed' && event.bead) { - worker.beadsCompleted++; + if (needleEvent === 'worker.state_transition') { + const from = event.from as string | undefined; + const to = event.to as string | undefined; + if (to && this.isValidNeedleState(to)) { + worker.needleState = to as NeedleState; + worker.lastStateTransition = event.ts; + worker.status = event.level === 'error' ? 'error' : needleStateToStatus(to as NeedleState); } - if (needleEvent === 'bead.completed') { - worker.activeFiles = []; - worker.activeBead = undefined; + } else { + // Fallback: infer state from legacy event types when no state_transition events arrive + if (event.level === 'error') { + worker.status = 'error'; + } else if ( + needleEvent === 'bead.completed' || + needleEvent === 'worker.idle' || + needleEvent === 'worker.stopped' || + needleEvent === 'worker.draining' + ) { + worker.status = 'idle'; + if (needleEvent === 'bead.completed' && event.bead) { + worker.beadsCompleted++; + } + if (needleEvent === 'bead.completed') { + worker.activeFiles = []; + worker.activeBead = undefined; + } + } else if ( + needleEvent === 'worker.started' || + needleEvent === 'bead.claimed' || + needleEvent === 'bead.agent_started' || + needleEvent === 'execution.started' + ) { + worker.status = 'active'; } - } else if ( - needleEvent === 'worker.started' || - needleEvent === 'bead.claimed' || - needleEvent === 'bead.agent_started' || - needleEvent === 'execution.started' - ) { - worker.status = 'active'; } // Update last event @@ -445,6 +534,18 @@ export class InMemoryEventStore implements EventStore { const hasBeadCollision = this.getWorkerBeadCollisions(worker.id).length > 0; const hasTaskCollision = this.getWorkerTaskCollisions(worker.id).length > 0; worker.hasCollision = hasFileCollision || hasBeadCollision || hasTaskCollision; + + // Run gap-based stuck detection + const stuckPattern = isWorkerStuck(worker, this.events); + worker.stuck = stuckPattern != null; + worker.stuckReason = stuckPattern?.reason ?? undefined; + } + + /** + * Check if a string is a valid NeedleState value. + */ + private isValidNeedleState(value: string): value is NeedleState { + return ['BOOTING', 'SELECTING', 'CLAIMING', 'WORKING', 'CLOSING', 'STOPPED'].includes(value); } /** diff --git a/src/tui/app.test.ts b/src/tui/app.test.ts index 522ef2b..72319f0 100644 --- a/src/tui/app.test.ts +++ b/src/tui/app.test.ts @@ -110,6 +110,7 @@ vi.mock('./components/WorkerDetail.js', () => { show = vi.fn(); hide = vi.fn(); focus = vi.fn(); + isVisible = vi.fn(() => false); getElement = vi.fn(() => ({ hide: vi.fn(), show: vi.fn(), screen: { render: vi.fn() } })); }, }; @@ -333,13 +334,13 @@ describe('FabricTuiApp', () => { it('should bind heatmap view key (H)', () => { const mockScreen = getMockScreen(); - expect(mockScreen.key).toHaveBeenCalledWith(['H'], expect.any(Function)); + expect(mockScreen.key).toHaveBeenCalledWith(['H', 'h'], expect.any(Function)); }); it('should bind DAG view key (D)', () => { const mockScreen = getMockScreen(); - expect(mockScreen.key).toHaveBeenCalledWith(['D'], expect.any(Function)); + expect(mockScreen.key).toHaveBeenCalledWith(['D', 'd'], expect.any(Function)); }); it('should bind escape key', () => { diff --git a/src/tui/components/WorkerDetail.e2e.test.ts b/src/tui/components/WorkerDetail.e2e.test.ts index 67c82b6..b3be884 100644 --- a/src/tui/components/WorkerDetail.e2e.test.ts +++ b/src/tui/components/WorkerDetail.e2e.test.ts @@ -128,7 +128,7 @@ describe('E2E: WorkerDetail Panel', () => { const content = getRenderedContent(); expect(content).toContain('{light-green-fg}{bold}● w-active-1{/}'); - expect(content).toContain('Status:'); + expect(content).toContain('State:'); expect(content).toContain('{light-green-fg}ACTIVE{/}'); }); diff --git a/src/tui/components/WorkerDetail.ts b/src/tui/components/WorkerDetail.ts index 0b2af9f..497592a 100644 --- a/src/tui/components/WorkerDetail.ts +++ b/src/tui/components/WorkerDetail.ts @@ -6,7 +6,7 @@ import blessed from 'blessed'; import { WorkerInfo, LogEvent } from '../../types.js'; -import { colors, getStatusColor, getLevelColor } from '../utils/colors.js'; +import { colors, getStatusColor, getLevelColor, getNeedleStateColor, getNeedleStateIcon } from '../utils/colors.js'; export interface WorkerDetailOptions { /** Parent screen */ @@ -103,15 +103,23 @@ export class WorkerDetail { const w = this.worker; const lines: string[] = []; - // Header with status - const statusColor = getStatusColor(w.status); - const statusIcon = w.status === 'active' ? '●' : w.status === 'idle' ? '○' : '✗'; - lines.push(`{${statusColor}-fg}{bold}${statusIcon} ${w.id}{/}`); + // Header with status — prefer NeedleState when available + const stateLabel = w.needleState ?? w.status.toUpperCase(); + const stateColor = w.needleState + ? getNeedleStateColor(w.needleState) + : getStatusColor(w.status); + const stateIcon = w.needleState + ? getNeedleStateIcon(w.needleState) + : (w.status === 'active' ? '●' : w.status === 'idle' ? '○' : '✗'); + lines.push(`{${stateColor}-fg}{bold}${stateIcon} ${w.id}{/}`); lines.push('{gray-fg}─────────────────────────────────────{/}'); lines.push(''); // Status info - lines.push(`{bold}Status:{/} {${statusColor}-fg}${w.status.toUpperCase()}{/}`); + lines.push(`{bold}State:{/} {${stateColor}-fg}${stateLabel}{/}`); + if (w.stuck) { + lines.push(`{red-fg}{bold}STUCK:{/} ${w.stuckReason ?? 'unknown'}{/}`); + } lines.push(`{bold}Uptime:{/} ${this.formatUptime(w.firstSeen)}`); lines.push(`{bold}Beads Completed:{/} {green-fg}${w.beadsCompleted}{/}`); lines.push(''); diff --git a/src/tui/components/WorkerGrid.ts b/src/tui/components/WorkerGrid.ts index ef25215..22fe16b 100644 --- a/src/tui/components/WorkerGrid.ts +++ b/src/tui/components/WorkerGrid.ts @@ -6,7 +6,7 @@ import blessed from 'blessed'; import { WorkerInfo } from '../../types.js'; -import { colors, getStatusColor } from '../utils/colors.js'; +import { colors, getStatusColor, getNeedleStateColor, getNeedleStateIcon } from '../utils/colors.js'; export interface WorkerGridOptions { /** Parent screen */ @@ -84,9 +84,10 @@ export class WorkerGrid { } /** - * Get status icon for worker + * Get status icon for worker — prefers NeedleState when available */ private getStatusIcon(worker: WorkerInfo): string { + if (worker.needleState) return getNeedleStateIcon(worker.needleState); switch (worker.status) { case 'active': return '●'; case 'idle': return '○'; @@ -94,6 +95,22 @@ export class WorkerGrid { } } + /** + * Get display label for worker state + */ + private getStateLabel(worker: WorkerInfo): string { + if (worker.needleState) return worker.needleState; + return worker.status; + } + + /** + * Get display color for worker state + */ + private getStateColor(worker: WorkerInfo): string { + if (worker.needleState) return getNeedleStateColor(worker.needleState); + return getStatusColor(worker.status); + } + /** * Get collision indicator for worker */ @@ -104,17 +121,29 @@ export class WorkerGrid { return ''; } + /** + * Get stuck indicator for worker + */ + private getStuckIndicator(worker: WorkerInfo): string { + if (worker.stuck) { + return '{red-fg}⚡STUCK{/}'; + } + return ''; + } + /** * Format worker line for display */ private formatWorkerLine(worker: WorkerInfo, isSelected: boolean): string { const icon = this.getStatusIcon(worker); - const color = getStatusColor(worker.status); + const color = this.getStateColor(worker); + const stateLabel = this.getStateLabel(worker); const workerId = worker.id.slice(0, 12); const currentTask = worker.lastEvent?.bead || '-'; const taskDesc = (worker.lastEvent?.msg || '').slice(0, 25); const duration = this.formatDuration(worker.lastEvent?.ts); const collisionIndicator = this.getCollisionIndicator(worker); + const stuckIndicator = this.getStuckIndicator(worker); const selectedMarker = isSelected ? '>' : ' '; const isPinned = this.pinnedWorkerId === worker.id; @@ -125,7 +154,7 @@ export class WorkerGrid { const dimPrefix = shouldDim ? '{gray-fg}' : ''; const dimSuffix = shouldDim ? '{/}' : ''; - return `${dimPrefix}${selectedMarker} {${color}-fg}${icon}{/} {bold}${workerId}{/} ${pinIndicator} {gray-fg}${currentTask}{/} ${taskDesc} {blue-fg}${duration}{/} ${collisionIndicator}${dimSuffix}`; + return `${dimPrefix}${selectedMarker} {${color}-fg}${icon}{/} {bold}${workerId}{/} ${pinIndicator} {${color}-fg}${stateLabel}{/} ${stuckIndicator} {gray-fg}${currentTask}{/} ${taskDesc} {blue-fg}${duration}{/} ${collisionIndicator}${dimSuffix}`; } /** diff --git a/src/tui/keyboardNavigation.e2e.test.ts b/src/tui/keyboardNavigation.e2e.test.ts index 55d2d24..5a4711c 100644 --- a/src/tui/keyboardNavigation.e2e.test.ts +++ b/src/tui/keyboardNavigation.e2e.test.ts @@ -105,6 +105,7 @@ vi.mock('./components/WorkerDetail.js', () => ({ show = vi.fn(); hide = vi.fn(); focus = vi.fn(); + isVisible = vi.fn(() => false); getElement = vi.fn(() => ({ hide: vi.fn(), show: vi.fn(), screen: { render: vi.fn() } })); }, })); diff --git a/src/tui/utils/colors.ts b/src/tui/utils/colors.ts index 015508a..646957e 100644 --- a/src/tui/utils/colors.ts +++ b/src/tui/utils/colors.ts @@ -15,6 +15,7 @@ */ import { getColors, getThemeManager, ThemeName, ThemeColors } from './theme.js'; +import type { NeedleState } from '../../types.js'; // Re-export theme types and functions for convenience export type { ThemeName, ThemeColors } from './theme.js'; @@ -39,6 +40,35 @@ export function getStatusColor(status: 'active' | 'idle' | 'error'): string { return getColors()[status]; } +/** + * Get color for a NeedleState value. + */ +export function getNeedleStateColor(state: NeedleState): string { + const c = getColors(); + switch (state) { + case 'BOOTING': return c.info; + case 'SELECTING': return c.warn; + case 'CLAIMING': return 'magenta'; + case 'WORKING': return c.active; + case 'CLOSING': return c.warn; + case 'STOPPED': return c.idle; + } +} + +/** + * Get display icon for a NeedleState value. + */ +export function getNeedleStateIcon(state: NeedleState): string { + switch (state) { + case 'BOOTING': return '⏳'; + case 'SELECTING': return '🔍'; + case 'CLAIMING': return '🎯'; + case 'WORKING': return '●'; + case 'CLOSING': return '⏹'; + case 'STOPPED': return '○'; + } +} + /** * Get color for log level */ diff --git a/src/types.ts b/src/types.ts index 86f651d..63da493 100644 --- a/src/types.ts +++ b/src/types.ts @@ -16,10 +16,49 @@ export type LogLevel = 'debug' | 'info' | 'warn' | 'error'; export type WorkerStatus = 'active' | 'idle' | 'error'; /** - * NEEDLE worker status values as emitted in heartbeat files and worker.* events. - * FABRIC maps these to the simpler WorkerStatus for display. + * NEEDLE worker state machine — first-class states emitted by + * worker.state_transition events. These are the canonical states + * that replace the coarse WorkerStatus in the UI. + * + * BOOTING → SELECTING → CLAIMING → WORKING → CLOSING → STOPPED */ -export type NeedleWorkerStatus = 'idle' | 'executing' | 'draining' | 'starting'; +export type NeedleState = + | 'BOOTING' + | 'SELECTING' + | 'CLAIMING' + | 'WORKING' + | 'CLOSING' + | 'STOPPED'; + +/** + * All valid state transitions in the NEEDLE worker state machine. + * A worker in BOOTING can only go to SELECTING, etc. + */ +export const VALID_TRANSITIONS: Record = { + BOOTING: ['SELECTING'], + SELECTING: ['CLAIMING', 'STOPPED'], + CLAIMING: ['WORKING', 'SELECTING'], + WORKING: ['CLOSING', 'SELECTING'], + CLOSING: ['SELECTING', 'STOPPED'], + STOPPED: ['BOOTING'], +}; + +/** + * Map a NeedleState to the coarser WorkerStatus for backward compatibility. + */ +export function needleStateToStatus(state: NeedleState): WorkerStatus { + switch (state) { + case 'BOOTING': + case 'SELECTING': + case 'CLAIMING': + case 'WORKING': + return 'active'; + case 'CLOSING': + return 'active'; + case 'STOPPED': + return 'idle'; + } +} /** * All event types emitted by NEEDLE's telemetry pipeline. @@ -31,6 +70,7 @@ export type NeedleEventType = | 'worker.idle' | 'worker.stopped' | 'worker.draining' + | 'worker.state_transition' // Bead lifecycle | 'bead.claimed' | 'bead.prompt_built' @@ -358,12 +398,15 @@ export interface ConversationParseOptions { // ============================================ export interface LogEvent { - /** Unix timestamp in milliseconds */ + /** Unix timestamp in milliseconds — display only, NOT authoritative for ordering */ ts: number; /** Worker identifier (e.g., 'w-abc123') */ worker: string; + /** Per-worker monotonic counter — authoritative for ordering (from NeedleEvent.sequence) */ + sequence?: number; + /** Log level */ level: LogLevel; @@ -398,13 +441,38 @@ export interface LogEvent { [key: string]: unknown; } +/** + * Compare two LogEvents by (worker, sequence), falling back to ts. + * + * Sequence is the authoritative ordering key within a worker (monotonic counter + * from NEEDLE). Timestamp is used only as a display fallback for legacy events + * that lack a sequence number. + */ +export function compareEventsBySequence(a: LogEvent, b: LogEvent): number { + const seqA = a.sequence != null && a.sequence >= 0 ? a.sequence : null; + const seqB = b.sequence != null && b.sequence >= 0 ? b.sequence : null; + + if (seqA !== null && seqB !== null) { + if (a.worker !== b.worker) return a.worker.localeCompare(b.worker); + return seqA - seqB; + } + + return a.ts - b.ts; +} + export interface WorkerInfo { /** Worker identifier */ id: string; - /** Current status */ + /** Current status (coarse — derived from needleState) */ status: WorkerStatus; + /** Current NEEDLE state machine state (fine-grained) */ + needleState?: NeedleState; + + /** Timestamp of the last state transition (ms since epoch) */ + lastStateTransition?: number; + /** Last event received */ lastEvent?: LogEvent; @@ -434,6 +502,12 @@ export interface WorkerInfo { /** Total number of events received for this worker */ eventCount: number; + + /** Whether the worker appears stuck (gap-based detection) */ + stuck?: boolean; + + /** Human-readable reason the worker is stuck */ + stuckReason?: string; } export interface EventFilter { @@ -567,6 +641,9 @@ export interface EventStore { /** Query events with optional filter */ query(filter?: EventFilter): LogEvent[]; + /** Query events sorted by (worker, sequence), falling back to ts */ + queryOrdered(filter?: EventFilter): LogEvent[]; + /** Get worker info */ getWorker(workerId: string): WorkerInfo | undefined; @@ -1092,6 +1169,70 @@ export interface DagStats { criticalPathBeads: number; } +// ============================================ +// Span-based DAG Types (OTLP span hierarchy) +// ============================================ + +/** + * A node in the span DAG, built from paired .started/.finished NeedleEvents + * derived from OTLP spans. Parent-child relationships come from + * parent_span_id rather than bead dependencies. + */ +export interface SpanNode { + /** OTLP span ID */ + span_id: string; + + /** OTLP trace ID */ + trace_id: string; + + /** Parent span ID (undefined for root spans) */ + parent_span_id?: string; + + /** Span name (e.g. "bead.lifecycle", "tool.call", "llm.request") */ + name: string; + + /** Worker that emitted this span */ + worker_id: string; + + /** Associated bead ID (if this is a bead lifecycle span) */ + bead_id?: string; + + /** Start timestamp (ms from .started event) */ + start_ts?: number; + + /** End timestamp (ms from .finished event) */ + end_ts?: number; + + /** Duration in ms */ + duration_ms?: number; + + /** Span completion status */ + status: 'ok' | 'error' | 'unknown'; + + /** Child spans (populated during tree construction) */ + children: SpanNode[]; + + /** Remaining span attributes */ + attributes: Record; +} + +/** + * A span hierarchy DAG built from OTLP span events. + * Unlike the bead DependencyGraph (which uses br graph --json), + * the SpanDag is built from the live event stream using parent_span_id + * for parent-child linkage. + */ +export interface SpanDag { + /** Root spans (no parent_span_id or orphaned) */ + roots: SpanNode[]; + + /** All spans indexed by span_id */ + allSpans: Map; + + /** Spans grouped by trace_id */ + traces: Map; +} + // ============================================ // Git Event Types // ============================================ diff --git a/src/web/frontend/src/components/WorkerGrid.tsx b/src/web/frontend/src/components/WorkerGrid.tsx index 3f8cc8e..35646b7 100644 --- a/src/web/frontend/src/components/WorkerGrid.tsx +++ b/src/web/frontend/src/components/WorkerGrid.tsx @@ -1,5 +1,23 @@ import React from 'react'; -import { WorkerInfo } from '../types'; +import { WorkerInfo, NeedleState } from '../types'; + +const NEEDLE_STATE_LABELS: Record = { + BOOTING: 'BOOTING', + SELECTING: 'SELECTING', + CLAIMING: 'CLAIMING', + WORKING: 'WORKING', + CLOSING: 'CLOSING', + STOPPED: 'STOPPED', +}; + +const NEEDLE_STATE_COLORS: Record = { + BOOTING: '#5bc0de', + SELECTING: '#f0ad4e', + CLAIMING: '#9b59b6', + WORKING: '#5cb85c', + CLOSING: '#f0ad4e', + STOPPED: '#777', +}; interface WorkerGridProps { workers: WorkerInfo[]; @@ -85,8 +103,13 @@ const WorkerGrid: React.FC = ({ {isPinned ? '📌' : '📍'} )} - - {worker.status} + + {worker.stuck && '⚡'} + {worker.needleState ? NEEDLE_STATE_LABELS[worker.needleState] : worker.status} diff --git a/src/web/frontend/src/types.ts b/src/web/frontend/src/types.ts index 09f3018..2e3a258 100644 --- a/src/web/frontend/src/types.ts +++ b/src/web/frontend/src/types.ts @@ -1,5 +1,13 @@ // FABRIC Web Frontend Types +export type NeedleState = + | 'BOOTING' + | 'SELECTING' + | 'CLAIMING' + | 'WORKING' + | 'CLOSING' + | 'STOPPED'; + export interface LogEvent { timestamp: string; level: 'debug' | 'info' | 'warn' | 'error'; @@ -8,6 +16,25 @@ export interface LogEvent { message: string; raw: string; bead?: string; // Bead/task identifier for Focus Mode + sequence?: number; // Per-worker monotonic counter — authoritative for ordering + ts?: number; // Unix timestamp in ms (display only) +} + +/** + * Compare two LogEvents by (worker, sequence), falling back to ts/timestamp. + */ +export function compareEventsBySequence(a: LogEvent, b: LogEvent): number { + const seqA = a.sequence != null && a.sequence >= 0 ? a.sequence : null; + const seqB = b.sequence != null && b.sequence >= 0 ? b.sequence : null; + + if (seqA !== null && seqB !== null) { + if (a.worker !== b.worker) return a.worker.localeCompare(b.worker); + return seqA - seqB; + } + + const tsA = a.ts ?? new Date(a.timestamp).getTime(); + const tsB = b.ts ?? new Date(b.timestamp).getTime(); + return tsA - tsB; } export interface WorkerInfo { @@ -15,10 +42,13 @@ export interface WorkerInfo { lastSeen: string; eventCount: number; status: 'active' | 'idle' | 'error'; + needleState?: NeedleState; currentTool?: string; recentEvents: LogEvent[]; hasCollision?: boolean; activeFiles?: string[]; + stuck?: boolean; + stuckReason?: string; } export interface FileCollision { @@ -239,6 +269,31 @@ export interface DagOptions { export type DagViewMode = 'tree' | 'blockers' | 'ready' | 'stats'; +// ============================================ +// Span DAG Types (OTLP span hierarchy) +// ============================================ + +export interface SpanNode { + span_id: string; + trace_id: string; + parent_span_id: string | null; + name: string; + worker_id: string; + bead_id: string | null; + start_ts: number | null; + end_ts: number | null; + duration_ms: number | null; + status: 'ok' | 'error' | 'unknown'; + attributes: Record; + children: SpanNode[]; +} + +export interface SpanDagResponse { + roots: SpanNode[]; + totalSpans: number; + traces: Array<{ trace_id: string; span_count: number }>; +} + // ============================================ // Recovery Playbook Types // ============================================