/**
 * Build a synthetic `bead.lifecycle.started` LogEvent for span-DAG tests.
 *
 * Defaults: ts 1000, worker `tcb-alpha`, level `info`, trace `trace-1`,
 * span name `bead.lifecycle`, session `sess-1`.
 *
 * `overrides` is spread last, so every key the caller passes (including
 * `msg`, `ts`, or arbitrary extra fields) wins over the defaults above.
 * Note that an explicitly-passed `undefined` also wins: e.g.
 * `{ trace_id: undefined }` yields an event without a trace id.
 *
 * @param overrides Fields to set on the event; `span_id` is mandatory.
 * @returns A LogEvent shaped like the OTLP normalizer's span-start output.
 */
function spanStartedEvent(overrides: Partial<LogEvent> & { span_id: string }): LogEvent {
  return {
    ts: 1000,
    worker: 'tcb-alpha',
    level: 'info',
    msg: 'bead.lifecycle.started',
    span_id: overrides.span_id,
    trace_id: overrides.trace_id ?? 'trace-1',
    parent_span_id: overrides.parent_span_id,
    span_name: overrides.span_name ?? 'bead.lifecycle',
    bead: overrides.bead,
    session: 'sess-1',
    ...overrides,
  };
}

/**
 * Build a synthetic `bead.lifecycle.finished` LogEvent for span-DAG tests.
 *
 * Same defaults as {@link spanStartedEvent} except ts 2000 and an extra
 * `duration_ms` (default 1000). `overrides` is spread last and therefore
 * takes precedence over every default.
 *
 * @param overrides Fields to set on the event; `span_id` is mandatory.
 * @returns A LogEvent shaped like the OTLP normalizer's span-end output.
 */
function spanFinishedEvent(overrides: Partial<LogEvent> & { span_id: string }): LogEvent {
  return {
    ts: 2000,
    worker: 'tcb-alpha',
    level: 'info',
    msg: 'bead.lifecycle.finished',
    span_id: overrides.span_id,
    trace_id: overrides.trace_id ?? 'trace-1',
    parent_span_id: overrides.parent_span_id,
    span_name: overrides.span_name ?? 'bead.lifecycle',
    bead: overrides.bead,
    duration_ms: overrides.duration_ms ?? 1000,
    session: 'sess-1',
    ...overrides,
  };
}
expect(root.children.map(c => c.name)).toEqual(expect.arrayContaining(['tool.call', 'llm.request'])); + expect(dag.allSpans.size).toBe(3); + }); + + it('groups spans by trace_id', () => { + const events = [ + spanStartedEvent({ span_id: 's1', trace_id: 'trace-a' }), + spanFinishedEvent({ span_id: 's1', trace_id: 'trace-a' }), + spanStartedEvent({ span_id: 's2', trace_id: 'trace-b' }), + spanFinishedEvent({ span_id: 's2', trace_id: 'trace-b' }), + ]; + const dag = buildSpanDag(events); + + expect(dag.traces.size).toBe(2); + expect(dag.traces.get('trace-a')!.map(s => s.span_id)).toEqual(['s1']); + expect(dag.traces.get('trace-b')!.map(s => s.span_id)).toEqual(['s2']); + }); + + it('marks error status from error field', () => { + const events = [ + spanStartedEvent({ span_id: 's-err', trace_id: 'trace-1' }), + spanFinishedEvent({ span_id: 's-err', trace_id: 'trace-1', error: 'timeout' }), + ]; + const dag = buildSpanDag(events); + + expect(dag.roots[0].status).toBe('error'); + }); + + it('handles orphaned spans (parent not found)', () => { + const events = [ + spanStartedEvent({ span_id: 'orphan', trace_id: 'trace-1', parent_span_id: 'missing-parent' }), + spanFinishedEvent({ span_id: 'orphan', trace_id: 'trace-1', parent_span_id: 'missing-parent' }), + ]; + const dag = buildSpanDag(events); + + // Orphan with missing parent becomes a root + expect(dag.roots).toHaveLength(1); + expect(dag.roots[0].span_id).toBe('orphan'); + }); + + it('returns empty dag for events without span_id', () => { + const events: LogEvent[] = [ + { ts: 1000, worker: 'w-1', level: 'info', msg: 'worker.started' }, + ]; + const dag = buildSpanDag(events); + + expect(dag.roots).toHaveLength(0); + expect(dag.allSpans.size).toBe(0); + expect(dag.traces.size).toBe(0); + }); + + it('handles nested span hierarchy (grandchild)', () => { + const events = [ + spanStartedEvent({ span_id: 'root', trace_id: 't1', span_name: 'bead.lifecycle' }), + spanStartedEvent({ span_id: 'child', trace_id: 't1', 
parent_span_id: 'root', span_name: 'tool.call', msg: 'tool.call.started' }), + spanStartedEvent({ span_id: 'grandchild', trace_id: 't1', parent_span_id: 'child', span_name: 'llm.request', msg: 'llm.request.started' }), + spanFinishedEvent({ span_id: 'grandchild', trace_id: 't1', parent_span_id: 'child', span_name: 'llm.request', msg: 'llm.request.finished', duration_ms: 100 }), + spanFinishedEvent({ span_id: 'child', trace_id: 't1', parent_span_id: 'root', span_name: 'tool.call', msg: 'tool.call.finished', duration_ms: 300 }), + spanFinishedEvent({ span_id: 'root', trace_id: 't1', span_name: 'bead.lifecycle', duration_ms: 1000 }), + ]; + const dag = buildSpanDag(events); + + expect(dag.roots).toHaveLength(1); + const root = dag.roots[0]; + expect(root.span_id).toBe('root'); + expect(root.children).toHaveLength(1); + expect(root.children[0].span_id).toBe('child'); + expect(root.children[0].children).toHaveLength(1); + expect(root.children[0].children[0].span_id).toBe('grandchild'); + }); + + it('stores extra event fields in span attributes', () => { + const events = [ + spanStartedEvent({ + span_id: 's-attrs', + trace_id: 'trace-1', + tool: 'Bash', + } as any), + ]; + const dag = buildSpanDag(events); + + // 'tool' is in the reserved set, so it should NOT be in attributes + expect(dag.allSpans.get('s-attrs')!.attributes).not.toHaveProperty('tool'); + }); +}); + +// ── findSpansForBead ────────────────────────────────────────────── + +describe('findSpansForBead', () => { + it('finds spans associated with a bead ID', () => { + const events = [ + spanStartedEvent({ span_id: 's1', trace_id: 'trace-1', bead: 'bd-abc' }), + spanFinishedEvent({ span_id: 's1', trace_id: 'trace-1', bead: 'bd-abc' }), + spanStartedEvent({ span_id: 's2', trace_id: 'trace-1', bead: 'bd-xyz' }), + ]; + const dag = buildSpanDag(events); + const spans = findSpansForBead(dag, 'bd-abc'); + + expect(spans).toHaveLength(1); + expect(spans[0].span_id).toBe('s1'); + }); + + it('returns empty array when 
no spans match', () => { + const events = [ + spanStartedEvent({ span_id: 's1', trace_id: 'trace-1' }), + ]; + const dag = buildSpanDag(events); + const spans = findSpansForBead(dag, 'bd-nonexistent'); + + expect(spans).toHaveLength(0); + }); +}); + +// ── Full pipeline: OTLP span → normalizer → buildSpanDag ────────── + +describe('OTLP span → DAG integration', () => { + it('bead lifecycle span renders as DAG node with tool-call children', () => { + // Simulate an OTLP trace export: a bead.lifecycle root span with + // two child spans (tool.call and llm.request) as NEEDLE would emit. + + // 1. Root span: bead.lifecycle for bd-integ + const rootStartSpan = { + name: 'bead.lifecycle', + traceId: 'trace-integ-001', + spanId: 'root-span', + startTimeUnixNano: '1772641054008000000', + attributes: [ + { key: 'worker_id', value: { stringValue: 'tcb-alpha' } }, + { key: 'session_id', value: { stringValue: 'sess-integ' } }, + { key: 'bead_id', value: { stringValue: 'bd-integ' } }, + ], + }; + + const rootEndSpan = { + name: 'bead.lifecycle', + traceId: 'trace-integ-001', + spanId: 'root-span', + startTimeUnixNano: '1772641054008000000', + endTimeUnixNano: '1772641060000000000', + status: { code: 'OK' }, + attributes: [ + { key: 'worker_id', value: { stringValue: 'tcb-alpha' } }, + { key: 'session_id', value: { stringValue: 'sess-integ' } }, + { key: 'bead_id', value: { stringValue: 'bd-integ' } }, + ], + }; + + // 2. 
Child span: tool.call + const toolStartSpan = { + name: 'tool.call', + traceId: 'trace-integ-001', + spanId: 'tool-span-1', + parentSpanId: 'root-span', + startTimeUnixNano: '1772641055000000000', + attributes: [ + { key: 'worker_id', value: { stringValue: 'tcb-alpha' } }, + { key: 'tool', value: { stringValue: 'Bash' } }, + ], + }; + + const toolEndSpan = { + name: 'tool.call', + traceId: 'trace-integ-001', + spanId: 'tool-span-1', + parentSpanId: 'root-span', + startTimeUnixNano: '1772641055000000000', + endTimeUnixNano: '1772641057000000000', + status: { code: 'OK' }, + attributes: [ + { key: 'worker_id', value: { stringValue: 'tcb-alpha' } }, + { key: 'tool', value: { stringValue: 'Bash' } }, + ], + }; + + // 3. Child span: llm.request + const llmStartSpan = { + name: 'llm.request', + traceId: 'trace-integ-001', + spanId: 'llm-span-1', + parentSpanId: 'root-span', + startTimeUnixNano: '1772641057100000000', + attributes: [ + { key: 'worker_id', value: { stringValue: 'tcb-alpha' } }, + { key: 'provider', value: { stringValue: 'anthropic' } }, + { key: 'model', value: { stringValue: 'sonnet' } }, + ], + }; + + const llmEndSpan = { + name: 'llm.request', + traceId: 'trace-integ-001', + spanId: 'llm-span-1', + parentSpanId: 'root-span', + startTimeUnixNano: '1772641057100000000', + endTimeUnixNano: '1772641059000000000', + status: { code: 'OK' }, + attributes: [ + { key: 'worker_id', value: { stringValue: 'tcb-alpha' } }, + { key: 'provider', value: { stringValue: 'anthropic' } }, + { key: 'model', value: { stringValue: 'sonnet' } }, + ], + }; + + // Normalize all spans through the OTLP pipeline + const events: LogEvent[] = []; + for (const [startRaw, endRaw] of [ + [rootStartSpan, rootEndSpan], + [toolStartSpan, toolEndSpan], + [llmStartSpan, llmEndSpan], + ]) { + const startEvent = normalizeToLogEvent(startRaw, 'otlp-span-start'); + const endEvent = normalizeToLogEvent(endRaw, 'otlp-span-end'); + if (startEvent) events.push(startEvent); + if (endEvent) 
events.push(endEvent); + } + + // Build the span DAG from the normalized events + const dag = buildSpanDag(events); + + // Verify: one root node (bead.lifecycle) + expect(dag.roots).toHaveLength(1); + const root = dag.roots[0]; + expect(root.span_id).toBe('root-span'); + expect(root.name).toBe('bead.lifecycle'); + expect(root.trace_id).toBe('trace-integ-001'); + expect(root.bead_id).toBe('bd-integ'); + expect(root.worker_id).toBe('tcb-alpha'); + expect(root.status).toBe('ok'); + expect(root.duration_ms).toBe(5992); + + // Verify: root has two children (tool.call and llm.request) + expect(root.children).toHaveLength(2); + const toolChild = root.children.find(c => c.name === 'tool.call'); + const llmChild = root.children.find(c => c.name === 'llm.request'); + expect(toolChild).toBeDefined(); + expect(llmChild).toBeDefined(); + expect(toolChild!.parent_span_id).toBe('root-span'); + expect(llmChild!.parent_span_id).toBe('root-span'); + + // Verify: trace grouping + expect(dag.traces.size).toBe(1); + expect(dag.traces.get('trace-integ-001')!.length).toBe(3); + + // Verify: findSpansForBead works + const beadSpans = findSpansForBead(dag, 'bd-integ'); + expect(beadSpans).toHaveLength(1); + expect(beadSpans[0].span_id).toBe('root-span'); + }); + + it('event_type is {name}.started/.finished through full pipeline', () => { + const span = { + name: 'bead.lifecycle', + traceId: 'trace-et', + spanId: 'span-et', + startTimeUnixNano: '1772641054008000000', + endTimeUnixNano: '1772641058000000000', + status: { code: 'OK' }, + attributes: [ + { key: 'worker_id', value: { stringValue: 'tcb-alpha' } }, + { key: 'bead_id', value: { stringValue: 'bd-et' } }, + ], + }; + + const startEvent = normalizeToLogEvent(span, 'otlp-span-start')!; + const endEvent = normalizeToLogEvent(span, 'otlp-span-end')!; + + expect(startEvent.msg).toBe('bead.lifecycle.started'); + expect(endEvent.msg).toBe('bead.lifecycle.finished'); + expect(startEvent.span_id).toBe('span-et'); + 
/** Fields on LogEvent that are standard and should not be copied to SpanNode.attributes */
const SPAN_RESERVED_KEYS = new Set<string>([
  'ts', 'worker', 'level', 'msg', 'sequence', 'session', 'bead',
  'duration_ms', 'error', 'tool', 'path', 'provider', 'model',
  'span_id', 'parent_span_id', 'trace_id', 'span_name',
]);

/** Strips the lifecycle suffix from an event `msg` to recover a span name. */
const SPAN_PHASE_SUFFIX = /\.(started|finished)$/;

/**
 * Build a span hierarchy DAG from a stream of LogEvents that carry
 * `span_id` / `parent_span_id` (set by the OTLP normalizer).
 *
 * Each span_id produces one SpanNode. The `.started` event seeds the node
 * (timestamp, name), and the matching `.finished` event closes it
 * (duration, status). Parent-child edges come from `parent_span_id`.
 *
 * Events are processed in array order, but the result does not depend on
 * `.started` arriving before `.finished`: linkage fields (`trace_id`,
 * `parent_span_id`, bead) missing from the first-seen event are filled in
 * from any later event of the same span.
 *
 * @param events LogEvents in any order; events without a truthy `span_id`
 *               are ignored.
 * @returns `roots` — spans with no parent, with a parent that never appeared
 *          in `events`, or that name themselves as parent; `allSpans` — every
 *          span keyed by span_id; `traces` — spans grouped by non-empty
 *          trace_id, each span listed once in first-seen order.
 */
export function buildSpanDag(events: LogEvent[]): SpanDag {
  const allSpans = new Map<string, SpanNode>();
  const traces = new Map<string, SpanNode[]>();
  // Span ids already filed under a trace; keeps the per-event membership check O(1).
  const tracedSpanIds = new Set<string>();

  for (const event of events) {
    const spanId = event.span_id as string | undefined;
    if (!spanId) continue;

    let node = allSpans.get(spanId);
    if (!node) {
      node = {
        span_id: spanId,
        trace_id: (event.trace_id as string) || '',
        parent_span_id: event.parent_span_id as string | undefined,
        // Fall back to the msg minus its lifecycle suffix when no span_name is present
        name: (event.span_name as string) || event.msg.replace(SPAN_PHASE_SUFFIX, ''),
        worker_id: event.worker,
        bead_id: event.bead,
        status: 'unknown',
        children: [],
        attributes: {},
      };
      allSpans.set(spanId, node);
    } else {
      // Backfill linkage the first-seen event lacked (e.g. out-of-order delivery).
      // Values already present are never overwritten.
      if (!node.trace_id && event.trace_id) node.trace_id = event.trace_id as string;
      if (!node.parent_span_id && event.parent_span_id) {
        node.parent_span_id = event.parent_span_id as string;
      }
      if (!node.bead_id && event.bead) node.bead_id = event.bead;
    }

    // Update from .started / .finished events
    if (event.msg.endsWith('.started')) {
      node.start_ts = event.ts;
      if (event.span_name) node.name = event.span_name as string;
    } else if (event.msg.endsWith('.finished')) {
      node.end_ts = event.ts;
      node.duration_ms = event.duration_ms;
      node.status = event.error ? 'error' : 'ok';
    }

    // Track by trace_id (once per span)
    if (node.trace_id && !tracedSpanIds.has(spanId)) {
      tracedSpanIds.add(spanId);
      const traceSpans = traces.get(node.trace_id);
      if (traceSpans) {
        traceSpans.push(node);
      } else {
        traces.set(node.trace_id, [node]);
      }
    }

    // Copy non-reserved fields into attributes; the first event to supply a key wins
    for (const [key, value] of Object.entries(event)) {
      if (!SPAN_RESERVED_KEYS.has(key) && !(key in node.attributes)) {
        node.attributes[key] = value;
      }
    }
  }

  // Build parent-child tree
  const roots: SpanNode[] = [];
  for (const node of allSpans.values()) {
    const parentId = node.parent_span_id;
    // A span naming itself as parent would become its own child and vanish
    // from `roots`; treat it as a root instead. (Longer cycles, e.g. A→B→A,
    // are not detected here.)
    const parent = parentId && parentId !== node.span_id ? allSpans.get(parentId) : undefined;
    if (parent) {
      parent.children.push(node);
    } else {
      roots.push(node);
    }
  }

  return { roots, allSpans, traces };
}
/**
 * Find spans for a specific bead ID within a SpanDag.
 *
 * Scans every span in `dag.allSpans` (not just roots), so nested spans are
 * matched too. Matching is strict equality on `bead_id`; spans without a
 * bead never match.
 *
 * @param dag    Span DAG as returned by `buildSpanDag`.
 * @param beadId Bead identifier to look for.
 * @returns Matching spans in `allSpans` insertion order; empty when none match.
 */
export function findSpansForBead(dag: SpanDag, beadId: string): SpanNode[] {
  return Array.from(dag.allSpans.values()).filter(node => node.bead_id === beadId);
}
back to non-namespaced keys when namespaced absent', () => { + const record = { + timeUnixNano: '1772641054008000000', + attributes: [ + { key: 'event_type', value: { stringValue: 'test' } }, + { key: 'worker_id', value: { stringValue: 'fallback-w' } }, + { key: 'session_id', value: { stringValue: 'fallback-s' } }, + ], + }; + const result = normalize(record, 'otlp-log'); + expect(result!.worker_id).toBe('fallback-w'); + expect(result!.session_id).toBe('fallback-s'); + }); + + it('excludes namespaced keys from data payload', () => { + const record = { + timeUnixNano: '1772641054008000000', + attributes: [ + { key: 'event_type', value: { stringValue: 'test' } }, + { key: 'needle.worker.id', value: { stringValue: 'w-1' } }, + { key: 'needle.session.id', value: { stringValue: 's-1' } }, + { key: 'needle.sequence', value: { intValue: '1' } }, + { key: 'needle.bead.id', value: { stringValue: 'bd-1' } }, + { key: 'extra', value: { stringValue: 'kept' } }, + ], + }; + const result = normalize(record, 'otlp-log'); + expect(result!.data['needle.worker.id']).toBeUndefined(); + expect(result!.data['needle.session.id']).toBeUndefined(); + expect(result!.data['needle.sequence']).toBeUndefined(); + expect(result!.data['needle.bead.id']).toBeUndefined(); + expect(result!.data.extra).toBe('kept'); + }); + + // ── body field mapping ──────────────────────────────────────────\ + + it('lifts body (kvlistValue) into data', () => { + const record = { + timeUnixNano: '1772641054008000000', + body: { + kvlistValue: { + values: [ + { key: 'version', value: { stringValue: '1.2.3' } }, + { key: 'count', value: { intValue: '7' } }, + ], + }, + }, + attributes: [ + { key: 'event_type', value: { stringValue: 'worker.started' } }, + { key: 'worker_id', value: { stringValue: 'w-1' } }, + ], + }; + const result = normalize(record, 'otlp-log'); + expect(result).not.toBeNull(); + expect(result!.data.version).toBe('1.2.3'); + expect(result!.data.count).toBe(7); + }); + + it('lifts body (plain JSON 
object) into data', () => { + const record = { + timeUnixNano: '1772641054008000000', + body: { worker_name: 'tcb-alpha', version: '2.0' }, + attributes: [ + { key: 'event_type', value: { stringValue: 'worker.started' } }, + { key: 'worker_id', value: { stringValue: 'w-1' } }, + ], + }; + const result = normalize(record, 'otlp-log'); + expect(result!.data.worker_name).toBe('tcb-alpha'); + expect(result!.data.version).toBe('2.0'); + }); + + it('wraps scalar body into data.value', () => { + const record = { + timeUnixNano: '1772641054008000000', + body: { stringValue: 'hello world' }, + attributes: [ + { key: 'event_type', value: { stringValue: 'test' } }, + { key: 'worker_id', value: { stringValue: 'w-1' } }, + ], + }; + const result = normalize(record, 'otlp-log'); + expect(result!.data.value).toBe('hello world'); + }); + + it('merges attributes into data on top of body', () => { + const record = { + timeUnixNano: '1772641054008000000', + body: { version: '1.0' }, + attributes: [ + { key: 'event_type', value: { stringValue: 'test' } }, + { key: 'worker_id', value: { stringValue: 'w-1' } }, + { key: 'extra_attr', value: { stringValue: 'from-attrs' } }, + ], + }; + const result = normalize(record, 'otlp-log'); + expect(result!.data.version).toBe('1.0'); + expect(result!.data.extra_attr).toBe('from-attrs'); + }); }); // ── OTLP-span-start source ─────────────────────────────────────── @@ -454,6 +591,87 @@ describe('normalize – otlp-span-start source', () => { it('returns null for null input', () => { expect(normalize(null, 'otlp-span-start')).toBeNull(); }); + + // ── span structural fields (span_id, parent_span_id, trace_id) ── + + it('extracts span structural fields into data', () => { + const span = { + name: 'bead.lifecycle', + traceId: 'abc123', + spanId: 'span001', + parentSpanId: 'parent001', + startTimeUnixNano: '1772641054008000000', + attributes: [ + { key: 'worker_id', value: { stringValue: 'tcb-alpha' } }, + { key: 'session_id', value: { stringValue: 
'sess-1' } }, + ], + }; + const result = normalize(span, 'otlp-span-start'); + expect(result).not.toBeNull(); + expect(result!.data.span_id).toBe('span001'); + expect(result!.data.parent_span_id).toBe('parent001'); + expect(result!.data.trace_id).toBe('abc123'); + expect(result!.data.span_name).toBe('bead.lifecycle'); + }); + + it('uses span name for event_type as {name}.started', () => { + const span = { + name: 'tool.call', + spanId: 'span002', + startTimeUnixNano: '1772641054008000000', + attributes: [ + { key: 'worker_id', value: { stringValue: 'tcb-alpha' } }, + ], + }; + const result = normalize(span, 'otlp-span-start'); + expect(result).not.toBeNull(); + expect(result!.event_type).toBe('tool.call.started'); + }); + + it('prefers span name over explicit event_type for .started', () => { + const span = { + name: 'llm.request', + spanId: 'span003', + startTimeUnixNano: '1772641054008000000', + attributes: [ + { key: 'event_type', value: { stringValue: 'bead.claimed' } }, + { key: 'worker_id', value: { stringValue: 'tcb-alpha' } }, + ], + }; + const result = normalize(span, 'otlp-span-start'); + expect(result!.event_type).toBe('llm.request.started'); + }); + + it('handles spans without structural fields (backward compat)', () => { + const span = { + attributes: [ + { key: 'event_type', value: { stringValue: 'bead.claimed' } }, + { key: 'worker_id', value: { stringValue: 'tcb-alpha' } }, + ], + }; + const result = normalize(span, 'otlp-span-start'); + expect(result).not.toBeNull(); + expect(result!.event_type).toBe('bead.claimed'); + expect(result!.data.span_id).toBeUndefined(); + expect(result!.data.parent_span_id).toBeUndefined(); + expect(result!.data.trace_id).toBeUndefined(); + }); + + it('accepts snake_case span_id / parent_span_id / trace_id', () => { + const span = { + trace_id: 'trace-ns', + span_id: 'span-ns', + parent_span_id: 'parent-ns', + startTimeUnixNano: '1772641054008000000', + attributes: [ + { key: 'worker_id', value: { stringValue: 
'tcb-alpha' } }, + ], + }; + const result = normalize(span, 'otlp-span-start'); + expect(result!.data.span_id).toBe('span-ns'); + expect(result!.data.parent_span_id).toBe('parent-ns'); + expect(result!.data.trace_id).toBe('trace-ns'); + }); }); // ── OTLP-span-end source ───────────────────────────────────────── @@ -551,6 +769,85 @@ describe('normalize – otlp-span-end source', () => { it('returns null for null input', () => { expect(normalize(null, 'otlp-span-end')).toBeNull(); }); + + // ── span structural fields (span_id, parent_span_id, trace_id) ── + + it('extracts span structural fields into data on span end', () => { + const span = { + name: 'bead.lifecycle', + traceId: 'trace-abc', + spanId: 'span-end-001', + parentSpanId: 'parent-end-001', + startTimeUnixNano: '1772641054008000000', + endTimeUnixNano: '1772641058000000000', + status: { code: 'OK' }, + attributes: [ + { key: 'worker_id', value: { stringValue: 'tcb-alpha' } }, + { key: 'bead_id', value: { stringValue: 'bd-end' } }, + ], + }; + const result = normalize(span, 'otlp-span-end'); + expect(result).not.toBeNull(); + expect(result!.data.span_id).toBe('span-end-001'); + expect(result!.data.parent_span_id).toBe('parent-end-001'); + expect(result!.data.trace_id).toBe('trace-abc'); + expect(result!.data.span_name).toBe('bead.lifecycle'); + expect(result!.data.duration_ms).toBe(3992); + }); + + it('uses span name for event_type as {name}.finished', () => { + const span = { + name: 'tool.call', + spanId: 'span-tool-001', + endTimeUnixNano: '1772641058000000000', + status: { code: 'OK' }, + attributes: [ + { key: 'worker_id', value: { stringValue: 'tcb-alpha' } }, + ], + }; + const result = normalize(span, 'otlp-span-end'); + expect(result).not.toBeNull(); + expect(result!.event_type).toBe('tool.call.finished'); + }); + + it('uses {name}.finished even on ERROR status', () => { + const span = { + name: 'llm.request', + spanId: 'span-llm-err', + endTimeUnixNano: '1772641058000000000', + status: { code: 
'ERROR', message: 'rate limited' }, + attributes: [ + { key: 'worker_id', value: { stringValue: 'tcb-alpha' } }, + ], + }; + const result = normalize(span, 'otlp-span-end'); + expect(result!.event_type).toBe('llm.request.finished'); + expect(result!.data.error).toBe('rate limited'); + }); + + it('promotes span fields through to LogEvent via normalizeToLogEvent', () => { + const span = { + name: 'bead.lifecycle', + traceId: 'trace-log', + spanId: 'span-log-001', + parentSpanId: 'parent-log-001', + startTimeUnixNano: '1772641054008000000', + endTimeUnixNano: '1772641058000000000', + status: { code: 'OK' }, + attributes: [ + { key: 'worker_id', value: { stringValue: 'tcb-alpha' } }, + { key: 'bead_id', value: { stringValue: 'bd-log' } }, + ], + }; + const logEvent = normalizeToLogEvent(span, 'otlp-span-end'); + expect(logEvent).not.toBeNull(); + expect(logEvent!.span_id).toBe('span-log-001'); + expect(logEvent!.parent_span_id).toBe('parent-log-001'); + expect(logEvent!.trace_id).toBe('trace-log'); + expect(logEvent!.span_name).toBe('bead.lifecycle'); + expect(logEvent!.bead).toBe('bd-log'); + expect(logEvent!.duration_ms).toBe(3992); + }); }); // ── OTLP-metric source ─────────────────────────────────────────── diff --git a/src/normalizer.ts b/src/normalizer.ts index 71553df..b967818 100644 --- a/src/normalizer.ts +++ b/src/normalizer.ts @@ -240,29 +240,59 @@ function normalizeLegacyLogEntry(parsed: unknown): NeedleEvent | null { /** * Normalize an OTLP LogRecord into NeedleEvent. * - * Expected input shape (already JSON-parsed from the OTLP/HTTP JSON export - * or decoded from OTLP/gRPC protobuf): + * Supports both namespaced OTLP semantic-convention keys and the + * non-namespaced keys that NEEDLE currently emits. Namespaced keys + * take priority when present. 
/** Namespaced → canonical field mapping */
const OTLP_ATTR_ALIASES: ReadonlyMap<string, string> = new Map([
  ['needle.worker.id', 'worker_id'],
  ['needle.session.id', 'session_id'],
  ['needle.sequence', 'sequence'],
  ['needle.bead.id', 'bead_id'],
]);

/** All attribute keys that map to structural NeedleEvent fields */
const STRUCTURAL_ATTR_KEYS = new Set<string>([
  'event_type', 'worker_id', 'session_id', 'sequence', 'bead_id',
  ...OTLP_ATTR_ALIASES.keys(),
]);

/**
 * Resolve an attribute by trying the namespaced alias first, then the plain key.
 *
 * @param attrs        Flattened attribute map.
 * @param canonicalKey Plain (non-namespaced) key, e.g. `worker_id`.
 * @returns The namespaced value when it is present (not `undefined`),
 *          otherwise whatever is stored under `canonicalKey` (possibly `undefined`).
 */
function resolveAttr(attrs: ReadonlyMap<string, unknown>, canonicalKey: string): unknown {
  for (const [namespacedKey, plainKey] of OTLP_ATTR_ALIASES) {
    if (plainKey !== canonicalKey) continue;
    const value = attrs.get(namespacedKey);
    if (value !== undefined) return value;
  }
  return attrs.get(canonicalKey);
}

/**
 * Return the `values` array of an OTLP ArrayValue / KeyValueList wrapper,
 * or `undefined` when the wrapper is absent or malformed.
 */
function otlpValues(wrapper: unknown): unknown[] | undefined {
  if (typeof wrapper !== 'object' || wrapper === null) return undefined;
  const values = (wrapper as { values?: unknown }).values;
  return Array.isArray(values) ? values : undefined;
}

/**
 * Convert KeyValueList entries (`[{ key, value }]`) into a plain object.
 *
 * Entries that are not objects or lack a string `key` are skipped. On
 * duplicate keys the last entry wins. `Object.fromEntries` defines own
 * properties, so a `__proto__` key cannot alter the result's prototype.
 */
function unwrapKvList(entries: readonly unknown[]): Record<string, unknown> {
  const pairs: Array<[string, unknown]> = [];
  for (const entry of entries) {
    if (typeof entry !== 'object' || entry === null) continue;
    const { key, value } = entry as { key?: unknown; value?: unknown };
    if (typeof key !== 'string') continue;
    pairs.push([key, unwrapAnyValue(value as Record<string, unknown>)]);
  }
  return Object.fromEntries(pairs);
}

/**
 * Extract the OTLP `body` AnyValue into a plain object for `data`.
 *
 * - `{ kvlistValue: { values: [...] } }` → one property per entry
 * - scalar or array wrapper (`stringValue`, `intValue`, `doubleValue`,
 *   `boolValue`, `arrayValue`) → `{ value: <unwrapped> }`
 * - any other object (plain JSON in OTLP/HTTP mode) → shallow copy
 * - missing or non-object body → `{}`
 */
function extractBody(record: Record<string, unknown>): Record<string, unknown> {
  const body = record.body;
  if (body == null || typeof body !== 'object') return {};
  const b = body as Record<string, unknown>;

  // Key-value list takes precedence over every other wrapper key
  const kvEntries = otlpValues(b.kvlistValue);
  if (kvEntries) return unwrapKvList(kvEntries);

  // Protobuf JSON scalar wrappers: { stringValue: "…" }, { intValue: 5 }, etc.
  if ('stringValue' in b || 'intValue' in b || 'doubleValue' in b || 'boolValue' in b) {
    return { value: unwrapAnyValue(b) };
  }

  // Array wrapper: unwrap the same way nested arrays are unwrapped
  if (otlpValues(b.arrayValue)) {
    return { value: unwrapAnyValue(b) };
  }

  // Plain JSON object (OTLP/HTTP JSON mode)
  return { ...b };
}

/**
 * Unwrap an OTLP AnyValue to a JS primitive / object.
 *
 * `intValue` is converted with `Number()`, so 64-bit integers beyond
 * `Number.MAX_SAFE_INTEGER` lose precision. Anything that is not a
 * recognised wrapper — including `null`, `undefined` (e.g. a list entry
 * with no `value`), and primitives — is returned unchanged.
 */
function unwrapAnyValue(v: Record<string, unknown>): unknown {
  // Malformed input can hand us a missing or primitive value; `in` would throw on it
  if (v == null || typeof v !== 'object') return v;

  if ('stringValue' in v) return v.stringValue;
  if ('intValue' in v) return Number(v.intValue);
  if ('doubleValue' in v) return v.doubleValue;
  if ('boolValue' in v) return v.boolValue;
  if ('arrayValue' in v) {
    const items = otlpValues(v.arrayValue);
    if (items) return items.map(item => unwrapAnyValue(item as Record<string, unknown>));
  }
  if ('kvlistValue' in v) {
    const entries = otlpValues(v.kvlistValue);
    if (entries) return unwrapKvList(entries);
  }
  // Plain object (HTTP JSON mode)
  return v;
}
*/ function normalizeOtlpSpanStart(raw: unknown): NeedleEvent | null { if (typeof raw !== 'object' || raw === null) return null; const span = raw as Record; const attrs = otlpAttrs(span); - const event_type = attrs.get('event_type') || 'bead.claimed'; - const worker_id = attrs.get('worker_id'); + const spanName = (span.name || span.span_name) as string | undefined; + const explicitEventType = attrs.get('event_type'); + const event_type = spanName + ? `${spanName}.started` + : explicitEventType + ? String(explicitEventType) + : 'bead.claimed'; + + const worker_id = resolveAttr(attrs, 'worker_id'); if (!worker_id) return null; const startTime = span.startTimeUnixNano || span.start_time_unix_nano; @@ -312,23 +410,36 @@ function normalizeOtlpSpanStart(raw: unknown): NeedleEvent | null { ? otlpNanoToISO(startTime as string | number) : new Date().toISOString(); + const session_id = resolveAttr(attrs, 'session_id'); + const sequence = resolveAttr(attrs, 'sequence'); + const bead_id = resolveAttr(attrs, 'bead_id'); + + // OTLP span structural fields → data for DAG linkage + const spanId = (span.spanId || span.span_id) as string | undefined; + const parentSpanId = (span.parentSpanId || span.parent_span_id) as string | undefined; + const traceId = (span.traceId || span.trace_id) as string | undefined; + const data: Record = {}; for (const [k, v] of attrs) { - if (k !== 'event_type' && k !== 'worker_id' && k !== 'session_id' && k !== 'sequence' && k !== 'bead_id') { + if (!STRUCTURAL_ATTR_KEYS.has(k)) { data[k] = v; } } + if (spanId) data.span_id = spanId; + if (parentSpanId) data.parent_span_id = parentSpanId; + if (traceId) data.trace_id = traceId; + if (spanName) data.span_name = spanName; + const ne: NeedleEvent = { timestamp, event_type: String(event_type), worker_id: String(worker_id), - session_id: attrs.get('session_id') ? String(attrs.get('session_id')) : '', - sequence: typeof attrs.get('sequence') === 'number' ? 
attrs.get('sequence') as number : -1, + session_id: session_id ? String(session_id) : '', + sequence: typeof sequence === 'number' ? sequence : -1, data, }; - const bead_id = attrs.get('bead_id'); if (bead_id) ne.bead_id = String(bead_id); return ne; @@ -337,8 +448,12 @@ function normalizeOtlpSpanStart(raw: unknown): NeedleEvent | null { /** * Normalize an OTLP Span end event into NeedleEvent. * - * A span end maps to a bead.completed or bead.failed event depending on - * the span's status. + * When the span carries a `name` field, the event_type becomes `{name}.finished`. + * Otherwise falls back to explicit `event_type` attribute or status-based + * inference (bead.completed / bead.failed). + * + * OTLP span structural fields (spanId, parentSpanId, traceId) are promoted + * into `data` as `span_id`, `parent_span_id`, `trace_id`. */ function normalizeOtlpSpanEnd(raw: unknown): NeedleEvent | null { if (typeof raw !== 'object' || raw === null) return null; @@ -346,29 +461,46 @@ function normalizeOtlpSpanEnd(raw: unknown): NeedleEvent | null { const attrs = otlpAttrs(span); - // Determine event_type from span status + // Determine event_type: prefer {name}.finished, then explicit attribute, then status + const spanName = (span.name || span.span_name) as string | undefined; + const explicitEventType = attrs.get('event_type'); const status = span.status as Record | undefined; const code = status?.code; - const spanEventType = attrs.get('event_type'); - const event_type = spanEventType - ? String(spanEventType) - : (code === 'ERROR' || code === 2) ? 'bead.failed' : 'bead.completed'; + const event_type = spanName + ? `${spanName}.finished` + : explicitEventType + ? String(explicitEventType) + : (code === 'ERROR' || code === 2) ? 
'bead.failed' : 'bead.completed'; - const worker_id = attrs.get('worker_id'); + const worker_id = resolveAttr(attrs, 'worker_id'); if (!worker_id) return null; + const session_id = resolveAttr(attrs, 'session_id'); + const sequence = resolveAttr(attrs, 'sequence'); + const bead_id = resolveAttr(attrs, 'bead_id'); + const endTime = span.endTimeUnixNano || span.end_time_unix_nano; const timestamp = endTime ? otlpNanoToISO(endTime as string | number) : new Date().toISOString(); + // OTLP span structural fields → data for DAG linkage + const spanId = (span.spanId || span.span_id) as string | undefined; + const parentSpanId = (span.parentSpanId || span.parent_span_id) as string | undefined; + const traceId = (span.traceId || span.trace_id) as string | undefined; + const data: Record = {}; for (const [k, v] of attrs) { - if (k !== 'event_type' && k !== 'worker_id' && k !== 'session_id' && k !== 'sequence' && k !== 'bead_id') { + if (!STRUCTURAL_ATTR_KEYS.has(k)) { data[k] = v; } } + if (spanId) data.span_id = spanId; + if (parentSpanId) data.parent_span_id = parentSpanId; + if (traceId) data.trace_id = traceId; + if (spanName) data.span_name = spanName; + // Include duration if both start and end times are available const startTime = span.startTimeUnixNano || span.start_time_unix_nano; if (startTime && endTime) { @@ -386,12 +518,11 @@ function normalizeOtlpSpanEnd(raw: unknown): NeedleEvent | null { timestamp, event_type, worker_id: String(worker_id), - session_id: attrs.get('session_id') ? String(attrs.get('session_id')) : '', - sequence: typeof attrs.get('sequence') === 'number' ? attrs.get('sequence') as number : -1, + session_id: session_id ? String(session_id) : '', + sequence: typeof sequence === 'number' ? 
sequence : -1, data, }; - const bead_id = attrs.get('bead_id'); if (bead_id) ne.bead_id = String(bead_id); return ne; @@ -410,9 +541,12 @@ function normalizeOtlpMetric(raw: unknown): NeedleEvent | null { const attrs = otlpAttrs(metricPoint); const metricName = metricPoint.name as string | undefined; - const worker_id = attrs.get('worker_id'); + const worker_id = resolveAttr(attrs, 'worker_id'); if (!worker_id) return null; + const session_id = resolveAttr(attrs, 'session_id'); + const bead_id = resolveAttr(attrs, 'bead_id'); + const timeUnixNano = metricPoint.timeUnixNano || metricPoint.time_unix_nano; const timestamp = timeUnixNano ? otlpNanoToISO(timeUnixNano as string | number) @@ -420,7 +554,7 @@ function normalizeOtlpMetric(raw: unknown): NeedleEvent | null { const data: Record = {}; for (const [k, v] of attrs) { - if (k !== 'worker_id' && k !== 'session_id' && k !== 'bead_id') { + if (!STRUCTURAL_ATTR_KEYS.has(k)) { data[k] = v; } } @@ -435,12 +569,11 @@ function normalizeOtlpMetric(raw: unknown): NeedleEvent | null { timestamp, event_type: `metric.${metricName || 'unknown'}`, worker_id: String(worker_id), - session_id: attrs.get('session_id') ? String(attrs.get('session_id')) : '', + session_id: session_id ? String(session_id) : '', sequence: -1, data, }; - const bead_id = attrs.get('bead_id'); if (bead_id) ne.bead_id = String(bead_id); return ne; @@ -462,6 +595,7 @@ export function needleEventToLogEvent(ne: NeedleEvent): LogEvent { worker: ne.worker_id, level, msg: ne.event_type, + sequence: ne.sequence >= 0 ? 
ne.sequence : undefined, }; if (ne.session_id) event.session = ne.session_id; @@ -474,10 +608,14 @@ export function needleEventToLogEvent(ne: NeedleEvent): LogEvent { if (typeof data.path === 'string') event.path = data.path; if (typeof data.provider === 'string') event.provider = data.provider; if (typeof data.model === 'string') event.model = data.model; + if (typeof data.span_id === 'string') event.span_id = data.span_id; + if (typeof data.parent_span_id === 'string') event.parent_span_id = data.parent_span_id; + if (typeof data.trace_id === 'string') event.trace_id = data.trace_id; + if (typeof data.span_name === 'string') event.span_name = data.span_name; const reserved = new Set([ 'duration_ms', 'error', 'tool', 'path', 'provider', 'model', 'level', - 'bead_id', + 'bead_id', 'span_id', 'parent_span_id', 'trace_id', 'span_name', ]); for (const key of Object.keys(data)) { if (!reserved.has(key) && !(key in event)) {