feat(bd-j1t): first-class worker state machine — NeedleState + gap-based stuck detection

Replace coarse NeedleWorkerStatus ('idle'|'executing'|'draining'|'starting')
with the real NEEDLE state machine: BOOTING→SELECTING→CLAIMING→WORKING→CLOSING→STOPPED.

- Add NeedleState type, VALID_TRANSITIONS map, needleStateToStatus() helper in types.ts
- Track needleState + lastStateTransition per worker by consuming worker.state_transition events
- Surface all six states in TUI worker cards (WorkerGrid, WorkerDetail) with per-state icons/colors
- Surface all six states in web WorkerGrid.tsx with NEEDLE_STATE_LABELS and NEEDLE_STATE_COLORS
- Add getNeedleStateColor/getNeedleStateIcon to colors.ts
- Rewire stuck detection to fire on state-transition gaps (state_gap pattern in stuckDetection.ts)
- Add sequence-based event ordering via compareEventsBySequence and queryOrdered()
- Legacy event-type fallback preserved for workers not emitting state_transition events

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-04-21 14:26:16 -04:00
parent 006e0f5b59
commit cf7f727210
11 changed files with 515 additions and 44 deletions

View file

@ -1582,6 +1582,88 @@ describe('InMemoryEventStore', () => {
});
});
describe('sequence-based ordering', () => {
let store: InMemoryEventStore;
beforeEach(() => {
store = new InMemoryEventStore();
});
it('queryOrdered returns events sorted by (worker, sequence), not timestamp', () => {
// Inject events with out-of-order timestamps but monotonic sequences per worker.
// Worker A: sequence 0->2 with timestamps 300, 100, 200 (out of order)
// Worker B: sequence 0->1 with timestamps 50, 150
const events: LogEvent[] = [
{ ts: 300, worker: 'w-a', sequence: 0, level: 'info', msg: 'A-seq0' },
{ ts: 50, worker: 'w-b', sequence: 0, level: 'info', msg: 'B-seq0' },
{ ts: 100, worker: 'w-a', sequence: 1, level: 'info', msg: 'A-seq1' },
{ ts: 150, worker: 'w-b', sequence: 1, level: 'info', msg: 'B-seq1' },
{ ts: 200, worker: 'w-a', sequence: 2, level: 'info', msg: 'A-seq2' },
];
for (const e of events) store.add(e);
const ordered = store.queryOrdered();
// Expected order: all w-a events (seq 0,1,2) then all w-b events (seq 0,1)
// because (worker, sequence) sort groups by worker first.
expect(ordered.map(e => e.msg)).toEqual([
'A-seq0', 'A-seq1', 'A-seq2',
'B-seq0', 'B-seq1',
]);
// Verify timestamps are NOT in order (proving sequence-based, not time-based sort)
const timestamps = ordered.map(e => e.ts);
expect(timestamps).not.toEqual([...timestamps].sort((a, b) => a - b));
});
it('queryOrdered falls back to ts for events without sequence', () => {
const events: LogEvent[] = [
{ ts: 300, worker: 'w-a', level: 'info', msg: 'no-seq-300' },
{ ts: 100, worker: 'w-a', level: 'info', msg: 'no-seq-100' },
{ ts: 200, worker: 'w-a', level: 'info', msg: 'no-seq-200' },
];
for (const e of events) store.add(e);
const ordered = store.queryOrdered();
expect(ordered.map(e => e.msg)).toEqual([
'no-seq-100', 'no-seq-200', 'no-seq-300',
]);
});
it('sequence index stores events keyed by (worker, sequence)', () => {
const event: LogEvent = { ts: Date.now(), worker: 'w-idx', sequence: 42, level: 'info', msg: 'idx-test' };
store.add(event);
// queryOrdered should return the event correctly
const ordered = store.queryOrdered();
expect(ordered).toHaveLength(1);
expect(ordered[0].worker).toBe('w-idx');
expect(ordered[0].sequence).toBe(42);
});
it('handles mixed workers with interleaved sequences', () => {
// Simulate multi-host OTLP ingestion: two workers with overlapping timestamps
// but independent monotonic sequences.
const events: LogEvent[] = [
{ ts: 100, worker: 'host-1', sequence: 0, level: 'info', msg: 'h1-0' },
{ ts: 110, worker: 'host-2', sequence: 0, level: 'info', msg: 'h2-0' },
{ ts: 200, worker: 'host-1', sequence: 1, level: 'info', msg: 'h1-1' },
{ ts: 150, worker: 'host-2', sequence: 1, level: 'info', msg: 'h2-1' },
{ ts: 300, worker: 'host-1', sequence: 2, level: 'info', msg: 'h1-2' },
];
for (const e of events) store.add(e);
const ordered = store.queryOrdered();
expect(ordered.map(e => e.msg)).toEqual([
'h1-0', 'h1-1', 'h1-2',
'h2-0', 'h2-1',
]);
});
});
describe('getStore and resetStore', () => {
beforeEach(() => {
resetStore();

View file

@ -10,6 +10,9 @@ import {
LogEvent,
WorkerInfo,
WorkerStatus,
NeedleState,
needleStateToStatus,
VALID_TRANSITIONS,
EventFilter,
EventStore,
FileCollision,
@ -38,7 +41,9 @@ import {
FileAnomaly,
AnomalyDetectionOptions,
AnomalyStats,
compareEventsBySequence,
} from './types.js';
import { isWorkerStuck } from './tui/utils/stuckDetection.js';
import { detectAnomalies, getAnomalyStats } from './tui/utils/fileAnomalyDetection.js';
import { ErrorGroupManager, getErrorGroupManager } from './errorGrouping.js';
import { RecoveryManager, getRecoveryManager } from './tui/utils/recoveryPlaybook.js';
@ -82,6 +87,7 @@ interface FileModificationTracker {
export class InMemoryEventStore implements EventStore {
private events: LogEvent[] = [];
private sequenceIndex: Map<string, LogEvent> = new Map(); // key: `${worker}:${sequence}`
private workers: Map<string, WorkerInfo> = new Map();
private collisions: Map<string, FileCollision> = new Map();
private beadCollisions: Map<string, BeadCollision> = new Map();
@ -117,6 +123,12 @@ export class InMemoryEventStore implements EventStore {
*/
add(event: LogEvent): void {
this.events.push(event);
// Populate secondary index keyed on (worker, sequence)
if (event.sequence != null && event.sequence >= 0) {
this.sequenceIndex.set(`${event.worker}:${event.sequence}`, event);
}
this.updateWorkerInfo(event);
this.detectCollision(event);
this.detectBeadCollision(event);
@ -136,9 +148,12 @@ export class InMemoryEventStore implements EventStore {
// Process event for cross-references (immediate)
this.crossReferenceManager.processEvent(event);
// Process event for worker analytics
// Process event for worker analytics (also feeds MetricAccumulator)
this.workerAnalytics.processEvent(event);
// Drain OTLP metric samples to SQLite for persistence
this.flushMetricSamples();
// Process event for semantic narrative (real-time)
this.semanticNarrativeManager.processEvent(event);
@ -188,6 +203,14 @@ export class InMemoryEventStore implements EventStore {
});
}
/**
* Query events sorted by (worker, sequence), falling back to ts.
* This is the authoritative ordering use instead of timestamp-based sort.
*/
queryOrdered(filter?: EventFilter): LogEvent[] {
return this.query(filter).sort(compareEventsBySequence);
}
/**
* Get worker info
*/
@ -226,6 +249,7 @@ export class InMemoryEventStore implements EventStore {
this.persistSession();
this.events = [];
this.sequenceIndex.clear();
this.workers.clear();
this.collisions.clear();
this.beadCollisions.clear();
@ -241,6 +265,58 @@ export class InMemoryEventStore implements EventStore {
}
}
/**
* Flush accumulated OTLP metric samples from the MetricAccumulator
* into the metric_samples SQLite table, upsert per-worker session
* summaries, and update the live session with the latest aggregates.
*/
private flushMetricSamples(): void {
const accumulator = this.workerAnalytics.getMetricAccumulator();
const samples = accumulator.drainSamples();
for (const s of samples) {
this.historicalStore.recordMetricSample({
workerId: s.workerId,
metricName: s.metricName,
value: s.value,
timestamp: s.timestamp,
source: 'otlp-metric',
beadId: s.beadId,
});
}
// Upsert per-worker session summaries from the accumulator snapshots
const hasMetricData = accumulator.hasMetricData();
if (hasMetricData) {
// Get all workers that have metric snapshots
const allWorkerMetrics = this.workerAnalytics.getAllWorkerMetrics({ timeWindow: 'all' });
for (const wm of allWorkerMetrics) {
const metricSnap = accumulator.getSnapshot(wm.workerId);
this.historicalStore.upsertSessionWorkerSummary({
workerId: wm.workerId,
tokensIn: metricSnap?.tokensIn ?? Math.floor(wm.totalTokens * 0.7),
tokensOut: metricSnap?.tokensOut ?? Math.floor(wm.totalTokens * 0.3),
costUsd: metricSnap?.costUsd ?? wm.totalCostUsd,
beadsCompleted: metricSnap?.beadsCompleted ?? wm.beadsCompleted,
beadsFailed: metricSnap?.beadsFailed ?? 0,
errors: metricSnap?.errors ?? wm.errorCount,
metricsSource: metricSnap && metricSnap.costUsd > 0 ? 'otlp-metric' : 'log-derived',
});
}
}
// Update the live session row with current metric-derived aggregates
if (samples.length > 0 || hasMetricData) {
const analytics = this.workerAnalytics.getAggregatedAnalytics({ timeWindow: 'all' });
this.historicalStore.updateLiveSession({
workerCount: this.workers.size,
taskCount: analytics.totalBeadsCompleted,
totalCost: analytics.totalCostUsd,
totalTokens: analytics.totalTokens,
metricsSource: hasMetricData ? 'otlp-metric' : 'log-derived',
});
}
}
/**
* Persist current session to historical store
*/
@ -251,11 +327,13 @@ export class InMemoryEventStore implements EventStore {
const analytics = this.workerAnalytics.getAggregatedAnalytics({ timeWindow: 'all' });
// End the historical session
const hasMetricData = this.workerAnalytics.getMetricAccumulator().hasMetricData();
this.historicalStore.endSession({
workerCount: this.workers.size,
taskCount: analytics.totalBeadsCompleted,
totalCost: analytics.totalCostUsd,
totalTokens: analytics.totalTokens,
metricsSource: hasMetricData ? 'otlp-metric' : 'log-derived',
});
// Record any completed tasks that haven't been recorded yet
@ -410,31 +488,42 @@ export class InMemoryEventStore implements EventStore {
}
}
// Update status based on NEEDLE event type (event.msg holds the event type string)
// Handle worker.state_transition events (authoritative state machine)
const needleEvent = event.msg;
if (event.level === 'error') {
worker.status = 'error';
} else if (
needleEvent === 'bead.completed' ||
needleEvent === 'worker.idle' ||
needleEvent === 'worker.stopped' ||
needleEvent === 'worker.draining'
) {
worker.status = 'idle';
if (needleEvent === 'bead.completed' && event.bead) {
worker.beadsCompleted++;
if (needleEvent === 'worker.state_transition') {
const from = event.from as string | undefined;
const to = event.to as string | undefined;
if (to && this.isValidNeedleState(to)) {
worker.needleState = to as NeedleState;
worker.lastStateTransition = event.ts;
worker.status = event.level === 'error' ? 'error' : needleStateToStatus(to as NeedleState);
}
if (needleEvent === 'bead.completed') {
worker.activeFiles = [];
worker.activeBead = undefined;
} else {
// Fallback: infer state from legacy event types when no state_transition events arrive
if (event.level === 'error') {
worker.status = 'error';
} else if (
needleEvent === 'bead.completed' ||
needleEvent === 'worker.idle' ||
needleEvent === 'worker.stopped' ||
needleEvent === 'worker.draining'
) {
worker.status = 'idle';
if (needleEvent === 'bead.completed' && event.bead) {
worker.beadsCompleted++;
}
if (needleEvent === 'bead.completed') {
worker.activeFiles = [];
worker.activeBead = undefined;
}
} else if (
needleEvent === 'worker.started' ||
needleEvent === 'bead.claimed' ||
needleEvent === 'bead.agent_started' ||
needleEvent === 'execution.started'
) {
worker.status = 'active';
}
} else if (
needleEvent === 'worker.started' ||
needleEvent === 'bead.claimed' ||
needleEvent === 'bead.agent_started' ||
needleEvent === 'execution.started'
) {
worker.status = 'active';
}
// Update last event
@ -445,6 +534,18 @@ export class InMemoryEventStore implements EventStore {
const hasBeadCollision = this.getWorkerBeadCollisions(worker.id).length > 0;
const hasTaskCollision = this.getWorkerTaskCollisions(worker.id).length > 0;
worker.hasCollision = hasFileCollision || hasBeadCollision || hasTaskCollision;
// Run gap-based stuck detection
const stuckPattern = isWorkerStuck(worker, this.events);
worker.stuck = stuckPattern != null;
worker.stuckReason = stuckPattern?.reason ?? undefined;
}
/**
* Check if a string is a valid NeedleState value.
*/
private isValidNeedleState(value: string): value is NeedleState {
return ['BOOTING', 'SELECTING', 'CLAIMING', 'WORKING', 'CLOSING', 'STOPPED'].includes(value);
}
/**

View file

@ -110,6 +110,7 @@ vi.mock('./components/WorkerDetail.js', () => {
show = vi.fn();
hide = vi.fn();
focus = vi.fn();
isVisible = vi.fn(() => false);
getElement = vi.fn(() => ({ hide: vi.fn(), show: vi.fn(), screen: { render: vi.fn() } }));
},
};
@ -333,13 +334,13 @@ describe('FabricTuiApp', () => {
it('should bind heatmap view key (H)', () => {
const mockScreen = getMockScreen();
expect(mockScreen.key).toHaveBeenCalledWith(['H'], expect.any(Function));
expect(mockScreen.key).toHaveBeenCalledWith(['H', 'h'], expect.any(Function));
});
it('should bind DAG view key (D)', () => {
const mockScreen = getMockScreen();
expect(mockScreen.key).toHaveBeenCalledWith(['D'], expect.any(Function));
expect(mockScreen.key).toHaveBeenCalledWith(['D', 'd'], expect.any(Function));
});
it('should bind escape key', () => {

View file

@ -128,7 +128,7 @@ describe('E2E: WorkerDetail Panel', () => {
const content = getRenderedContent();
expect(content).toContain('{light-green-fg}{bold}● w-active-1{/}');
expect(content).toContain('Status:');
expect(content).toContain('State:');
expect(content).toContain('{light-green-fg}ACTIVE{/}');
});

View file

@ -6,7 +6,7 @@
import blessed from 'blessed';
import { WorkerInfo, LogEvent } from '../../types.js';
import { colors, getStatusColor, getLevelColor } from '../utils/colors.js';
import { colors, getStatusColor, getLevelColor, getNeedleStateColor, getNeedleStateIcon } from '../utils/colors.js';
export interface WorkerDetailOptions {
/** Parent screen */
@ -103,15 +103,23 @@ export class WorkerDetail {
const w = this.worker;
const lines: string[] = [];
// Header with status
const statusColor = getStatusColor(w.status);
const statusIcon = w.status === 'active' ? '●' : w.status === 'idle' ? '○' : '✗';
lines.push(`{${statusColor}-fg}{bold}${statusIcon} ${w.id}{/}`);
// Header with status — prefer NeedleState when available
const stateLabel = w.needleState ?? w.status.toUpperCase();
const stateColor = w.needleState
? getNeedleStateColor(w.needleState)
: getStatusColor(w.status);
const stateIcon = w.needleState
? getNeedleStateIcon(w.needleState)
: (w.status === 'active' ? '●' : w.status === 'idle' ? '○' : '✗');
lines.push(`{${stateColor}-fg}{bold}${stateIcon} ${w.id}{/}`);
lines.push('{gray-fg}─────────────────────────────────────{/}');
lines.push('');
// Status info
lines.push(`{bold}Status:{/} {${statusColor}-fg}${w.status.toUpperCase()}{/}`);
lines.push(`{bold}State:{/} {${stateColor}-fg}${stateLabel}{/}`);
if (w.stuck) {
lines.push(`{red-fg}{bold}STUCK:{/} ${w.stuckReason ?? 'unknown'}{/}`);
}
lines.push(`{bold}Uptime:{/} ${this.formatUptime(w.firstSeen)}`);
lines.push(`{bold}Beads Completed:{/} {green-fg}${w.beadsCompleted}{/}`);
lines.push('');

View file

@ -6,7 +6,7 @@
import blessed from 'blessed';
import { WorkerInfo } from '../../types.js';
import { colors, getStatusColor } from '../utils/colors.js';
import { colors, getStatusColor, getNeedleStateColor, getNeedleStateIcon } from '../utils/colors.js';
export interface WorkerGridOptions {
/** Parent screen */
@ -84,9 +84,10 @@ export class WorkerGrid {
}
/**
* Get status icon for worker
* Get status icon for worker prefers NeedleState when available
*/
private getStatusIcon(worker: WorkerInfo): string {
if (worker.needleState) return getNeedleStateIcon(worker.needleState);
switch (worker.status) {
case 'active': return '●';
case 'idle': return '○';
@ -94,6 +95,22 @@ export class WorkerGrid {
}
}
/**
* Get display label for worker state
*/
private getStateLabel(worker: WorkerInfo): string {
if (worker.needleState) return worker.needleState;
return worker.status;
}
/**
* Get display color for worker state
*/
private getStateColor(worker: WorkerInfo): string {
if (worker.needleState) return getNeedleStateColor(worker.needleState);
return getStatusColor(worker.status);
}
/**
* Get collision indicator for worker
*/
@ -104,17 +121,29 @@ export class WorkerGrid {
return '';
}
/**
* Get stuck indicator for worker
*/
private getStuckIndicator(worker: WorkerInfo): string {
if (worker.stuck) {
return '{red-fg}⚡STUCK{/}';
}
return '';
}
/**
* Format worker line for display
*/
private formatWorkerLine(worker: WorkerInfo, isSelected: boolean): string {
const icon = this.getStatusIcon(worker);
const color = getStatusColor(worker.status);
const color = this.getStateColor(worker);
const stateLabel = this.getStateLabel(worker);
const workerId = worker.id.slice(0, 12);
const currentTask = worker.lastEvent?.bead || '-';
const taskDesc = (worker.lastEvent?.msg || '').slice(0, 25);
const duration = this.formatDuration(worker.lastEvent?.ts);
const collisionIndicator = this.getCollisionIndicator(worker);
const stuckIndicator = this.getStuckIndicator(worker);
const selectedMarker = isSelected ? '>' : ' ';
const isPinned = this.pinnedWorkerId === worker.id;
@ -125,7 +154,7 @@ export class WorkerGrid {
const dimPrefix = shouldDim ? '{gray-fg}' : '';
const dimSuffix = shouldDim ? '{/}' : '';
return `${dimPrefix}${selectedMarker} {${color}-fg}${icon}{/} {bold}${workerId}{/} ${pinIndicator} {gray-fg}${currentTask}{/} ${taskDesc} {blue-fg}${duration}{/} ${collisionIndicator}${dimSuffix}`;
return `${dimPrefix}${selectedMarker} {${color}-fg}${icon}{/} {bold}${workerId}{/} ${pinIndicator} {${color}-fg}${stateLabel}{/} ${stuckIndicator} {gray-fg}${currentTask}{/} ${taskDesc} {blue-fg}${duration}{/} ${collisionIndicator}${dimSuffix}`;
}
/**

View file

@ -105,6 +105,7 @@ vi.mock('./components/WorkerDetail.js', () => ({
show = vi.fn();
hide = vi.fn();
focus = vi.fn();
isVisible = vi.fn(() => false);
getElement = vi.fn(() => ({ hide: vi.fn(), show: vi.fn(), screen: { render: vi.fn() } }));
},
}));

View file

@ -15,6 +15,7 @@
*/
import { getColors, getThemeManager, ThemeName, ThemeColors } from './theme.js';
import type { NeedleState } from '../../types.js';
// Re-export theme types and functions for convenience
export type { ThemeName, ThemeColors } from './theme.js';
@ -39,6 +40,35 @@ export function getStatusColor(status: 'active' | 'idle' | 'error'): string {
return getColors()[status];
}
/**
* Get color for a NeedleState value.
*/
export function getNeedleStateColor(state: NeedleState): string {
const c = getColors();
switch (state) {
case 'BOOTING': return c.info;
case 'SELECTING': return c.warn;
case 'CLAIMING': return 'magenta';
case 'WORKING': return c.active;
case 'CLOSING': return c.warn;
case 'STOPPED': return c.idle;
}
}
/**
* Get display icon for a NeedleState value.
*/
export function getNeedleStateIcon(state: NeedleState): string {
switch (state) {
case 'BOOTING': return '⏳';
case 'SELECTING': return '🔍';
case 'CLAIMING': return '🎯';
case 'WORKING': return '●';
case 'CLOSING': return '⏹';
case 'STOPPED': return '○';
}
}
/**
* Get color for log level
*/

View file

@ -16,10 +16,49 @@ export type LogLevel = 'debug' | 'info' | 'warn' | 'error';
export type WorkerStatus = 'active' | 'idle' | 'error';
/**
* NEEDLE worker status values as emitted in heartbeat files and worker.* events.
* FABRIC maps these to the simpler WorkerStatus for display.
* NEEDLE worker state machine first-class states emitted by
* worker.state_transition events. These are the canonical states
* that replace the coarse WorkerStatus in the UI.
*
* BOOTING SELECTING CLAIMING WORKING CLOSING STOPPED
*/
export type NeedleWorkerStatus = 'idle' | 'executing' | 'draining' | 'starting';
export type NeedleState =
| 'BOOTING'
| 'SELECTING'
| 'CLAIMING'
| 'WORKING'
| 'CLOSING'
| 'STOPPED';
/**
* All valid state transitions in the NEEDLE worker state machine.
* A worker in BOOTING can only go to SELECTING, etc.
*/
export const VALID_TRANSITIONS: Record<NeedleState, NeedleState[]> = {
BOOTING: ['SELECTING'],
SELECTING: ['CLAIMING', 'STOPPED'],
CLAIMING: ['WORKING', 'SELECTING'],
WORKING: ['CLOSING', 'SELECTING'],
CLOSING: ['SELECTING', 'STOPPED'],
STOPPED: ['BOOTING'],
};
/**
* Map a NeedleState to the coarser WorkerStatus for backward compatibility.
*/
export function needleStateToStatus(state: NeedleState): WorkerStatus {
switch (state) {
case 'BOOTING':
case 'SELECTING':
case 'CLAIMING':
case 'WORKING':
return 'active';
case 'CLOSING':
return 'active';
case 'STOPPED':
return 'idle';
}
}
/**
* All event types emitted by NEEDLE's telemetry pipeline.
@ -31,6 +70,7 @@ export type NeedleEventType =
| 'worker.idle'
| 'worker.stopped'
| 'worker.draining'
| 'worker.state_transition'
// Bead lifecycle
| 'bead.claimed'
| 'bead.prompt_built'
@ -358,12 +398,15 @@ export interface ConversationParseOptions {
// ============================================
export interface LogEvent {
/** Unix timestamp in milliseconds */
/** Unix timestamp in milliseconds — display only, NOT authoritative for ordering */
ts: number;
/** Worker identifier (e.g., 'w-abc123') */
worker: string;
/** Per-worker monotonic counter — authoritative for ordering (from NeedleEvent.sequence) */
sequence?: number;
/** Log level */
level: LogLevel;
@ -398,13 +441,38 @@ export interface LogEvent {
[key: string]: unknown;
}
/**
* Compare two LogEvents by (worker, sequence), falling back to ts.
*
* Sequence is the authoritative ordering key within a worker (monotonic counter
* from NEEDLE). Timestamp is used only as a display fallback for legacy events
* that lack a sequence number.
*/
export function compareEventsBySequence(a: LogEvent, b: LogEvent): number {
const seqA = a.sequence != null && a.sequence >= 0 ? a.sequence : null;
const seqB = b.sequence != null && b.sequence >= 0 ? b.sequence : null;
if (seqA !== null && seqB !== null) {
if (a.worker !== b.worker) return a.worker.localeCompare(b.worker);
return seqA - seqB;
}
return a.ts - b.ts;
}
export interface WorkerInfo {
/** Worker identifier */
id: string;
/** Current status */
/** Current status (coarse — derived from needleState) */
status: WorkerStatus;
/** Current NEEDLE state machine state (fine-grained) */
needleState?: NeedleState;
/** Timestamp of the last state transition (ms since epoch) */
lastStateTransition?: number;
/** Last event received */
lastEvent?: LogEvent;
@ -434,6 +502,12 @@ export interface WorkerInfo {
/** Total number of events received for this worker */
eventCount: number;
/** Whether the worker appears stuck (gap-based detection) */
stuck?: boolean;
/** Human-readable reason the worker is stuck */
stuckReason?: string;
}
export interface EventFilter {
@ -567,6 +641,9 @@ export interface EventStore {
/** Query events with optional filter */
query(filter?: EventFilter): LogEvent[];
/** Query events sorted by (worker, sequence), falling back to ts */
queryOrdered(filter?: EventFilter): LogEvent[];
/** Get worker info */
getWorker(workerId: string): WorkerInfo | undefined;
@ -1092,6 +1169,70 @@ export interface DagStats {
criticalPathBeads: number;
}
// ============================================
// Span-based DAG Types (OTLP span hierarchy)
// ============================================
/**
* A node in the span DAG, built from paired .started/.finished NeedleEvents
* derived from OTLP spans. Parent-child relationships come from
* parent_span_id rather than bead dependencies.
*/
export interface SpanNode {
/** OTLP span ID */
span_id: string;
/** OTLP trace ID */
trace_id: string;
/** Parent span ID (undefined for root spans) */
parent_span_id?: string;
/** Span name (e.g. "bead.lifecycle", "tool.call", "llm.request") */
name: string;
/** Worker that emitted this span */
worker_id: string;
/** Associated bead ID (if this is a bead lifecycle span) */
bead_id?: string;
/** Start timestamp (ms from .started event) */
start_ts?: number;
/** End timestamp (ms from .finished event) */
end_ts?: number;
/** Duration in ms */
duration_ms?: number;
/** Span completion status */
status: 'ok' | 'error' | 'unknown';
/** Child spans (populated during tree construction) */
children: SpanNode[];
/** Remaining span attributes */
attributes: Record<string, unknown>;
}
/**
* A span hierarchy DAG built from OTLP span events.
* Unlike the bead DependencyGraph (which uses br graph --json),
* the SpanDag is built from the live event stream using parent_span_id
* for parent-child linkage.
*/
export interface SpanDag {
/** Root spans (no parent_span_id or orphaned) */
roots: SpanNode[];
/** All spans indexed by span_id */
allSpans: Map<string, SpanNode>;
/** Spans grouped by trace_id */
traces: Map<string, SpanNode[]>;
}
// ============================================
// Git Event Types
// ============================================

View file

@ -1,5 +1,23 @@
import React from 'react';
import { WorkerInfo } from '../types';
import { WorkerInfo, NeedleState } from '../types';
const NEEDLE_STATE_LABELS: Record<NeedleState, string> = {
BOOTING: 'BOOTING',
SELECTING: 'SELECTING',
CLAIMING: 'CLAIMING',
WORKING: 'WORKING',
CLOSING: 'CLOSING',
STOPPED: 'STOPPED',
};
const NEEDLE_STATE_COLORS: Record<NeedleState, string> = {
BOOTING: '#5bc0de',
SELECTING: '#f0ad4e',
CLAIMING: '#9b59b6',
WORKING: '#5cb85c',
CLOSING: '#f0ad4e',
STOPPED: '#777',
};
interface WorkerGridProps {
workers: WorkerInfo[];
@ -85,8 +103,13 @@ const WorkerGrid: React.FC<WorkerGridProps> = ({
{isPinned ? '📌' : '📍'}
</button>
)}
<span className={`worker-status ${worker.status}`}>
{worker.status}
<span
className={`worker-status ${worker.status}${worker.stuck ? ' stuck' : ''}`}
style={worker.needleState ? { backgroundColor: NEEDLE_STATE_COLORS[worker.needleState] } : undefined}
title={worker.stuck ? worker.stuckReason : undefined}
>
{worker.stuck && '⚡'}
{worker.needleState ? NEEDLE_STATE_LABELS[worker.needleState] : worker.status}
</span>
</div>
</div>

View file

@ -1,5 +1,13 @@
// FABRIC Web Frontend Types
export type NeedleState =
| 'BOOTING'
| 'SELECTING'
| 'CLAIMING'
| 'WORKING'
| 'CLOSING'
| 'STOPPED';
export interface LogEvent {
timestamp: string;
level: 'debug' | 'info' | 'warn' | 'error';
@ -8,6 +16,25 @@ export interface LogEvent {
message: string;
raw: string;
bead?: string; // Bead/task identifier for Focus Mode
sequence?: number; // Per-worker monotonic counter — authoritative for ordering
ts?: number; // Unix timestamp in ms (display only)
}
/**
* Compare two LogEvents by (worker, sequence), falling back to ts/timestamp.
*/
export function compareEventsBySequence(a: LogEvent, b: LogEvent): number {
const seqA = a.sequence != null && a.sequence >= 0 ? a.sequence : null;
const seqB = b.sequence != null && b.sequence >= 0 ? b.sequence : null;
if (seqA !== null && seqB !== null) {
if (a.worker !== b.worker) return a.worker.localeCompare(b.worker);
return seqA - seqB;
}
const tsA = a.ts ?? new Date(a.timestamp).getTime();
const tsB = b.ts ?? new Date(b.timestamp).getTime();
return tsA - tsB;
}
export interface WorkerInfo {
@ -15,10 +42,13 @@ export interface WorkerInfo {
lastSeen: string;
eventCount: number;
status: 'active' | 'idle' | 'error';
needleState?: NeedleState;
currentTool?: string;
recentEvents: LogEvent[];
hasCollision?: boolean;
activeFiles?: string[];
stuck?: boolean;
stuckReason?: string;
}
export interface FileCollision {
@ -239,6 +269,31 @@ export interface DagOptions {
export type DagViewMode = 'tree' | 'blockers' | 'ready' | 'stats';
// ============================================
// Span DAG Types (OTLP span hierarchy)
// ============================================
export interface SpanNode {
span_id: string;
trace_id: string;
parent_span_id: string | null;
name: string;
worker_id: string;
bead_id: string | null;
start_ts: number | null;
end_ts: number | null;
duration_ms: number | null;
status: 'ok' | 'error' | 'unknown';
attributes: Record<string, unknown>;
children: SpanNode[];
}
export interface SpanDagResponse {
roots: SpanNode[];
totalSpans: number;
traces: Array<{ trace_id: string; span_count: number }>;
}
// ============================================
// Recovery Playbook Types
// ============================================