feat(bd-qtg): map OTLP spans → paired started/finished events for DAG

- normalizeOtlpSpanStart: emit {name}.started with span_id/parent_span_id/trace_id
- normalizeOtlpSpanEnd: emit {name}.finished with duration_ms and span attributes
- needleEventToLogEvent: promote span_id, parent_span_id, trace_id, span_name
- dagUtils: add buildSpanDag() using parent_span_id for parent-child linkage
- dagUtils: add findSpansForBead() for bead-to-span lookup
- Add integration test confirming bead lifecycle renders as DAG node with children
- Add namespaced OTLP attribute resolution (needle.worker.id etc.)
- Add OTLP body (AnyValue) extraction for logs

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-04-21 14:34:28 -04:00
parent cf7f727210
commit 6805bff904
4 changed files with 931 additions and 36 deletions

355
src/dagUtils.test.ts Normal file
View file

@ -0,0 +1,355 @@
/**
* Tests for FABRIC DAG Utilities — Span DAG
*
* Covers buildSpanDag() which reconstructs OTLP span hierarchy from
* paired .started/.finished LogEvents carrying span_id / parent_span_id.
*/
import { describe, it, expect } from 'vitest';
import { buildSpanDag, findSpansForBead } from './dagUtils.js';
import { normalizeToLogEvent } from './normalizer.js';
import { LogEvent } from './types.js';
// ── Helpers ──────────────────────────────────────────────────────
/**
 * Test fixture: builds a `.started` LogEvent for a span.
 *
 * Defaults describe a `bead.lifecycle` span on worker `tcb-alpha` at ts=1000;
 * every field supplied in `overrides` is spread last and therefore wins over
 * the defaults.
 */
function spanStartedEvent(overrides: Partial<LogEvent> & { span_id: string }): LogEvent {
  const { span_id, trace_id, parent_span_id, span_name, bead } = overrides;
  const base: LogEvent = {
    ts: 1000,
    worker: 'tcb-alpha',
    level: 'info',
    msg: 'bead.lifecycle.started',
    span_id,
    trace_id: trace_id ?? 'trace-1',
    parent_span_id,
    span_name: span_name ?? 'bead.lifecycle',
    bead,
    session: 'sess-1',
  };
  return { ...base, ...overrides };
}
/**
 * Test fixture: builds a `.finished` LogEvent for a span.
 *
 * Defaults describe a `bead.lifecycle` span on worker `tcb-alpha` closing at
 * ts=2000 with a 1000 ms duration; every field supplied in `overrides` is
 * spread last and therefore wins over the defaults.
 */
function spanFinishedEvent(overrides: Partial<LogEvent> & { span_id: string }): LogEvent {
  const { span_id, trace_id, parent_span_id, span_name, bead, duration_ms } = overrides;
  const base: LogEvent = {
    ts: 2000,
    worker: 'tcb-alpha',
    level: 'info',
    msg: 'bead.lifecycle.finished',
    span_id,
    trace_id: trace_id ?? 'trace-1',
    parent_span_id,
    span_name: span_name ?? 'bead.lifecycle',
    bead,
    duration_ms: duration_ms ?? 1000,
    session: 'sess-1',
  };
  return { ...base, ...overrides };
}
// ── buildSpanDag ──────────────────────────────────────────────────
describe('buildSpanDag', () => {
it('builds a single root span from started/finished pair', () => {
const events = [
spanStartedEvent({ span_id: 'span-1', trace_id: 'trace-1' }),
spanFinishedEvent({ span_id: 'span-1', trace_id: 'trace-1', duration_ms: 500 }),
];
const dag = buildSpanDag(events);
expect(dag.roots).toHaveLength(1);
expect(dag.roots[0].span_id).toBe('span-1');
expect(dag.roots[0].start_ts).toBe(1000);
expect(dag.roots[0].end_ts).toBe(2000);
expect(dag.roots[0].duration_ms).toBe(500);
expect(dag.roots[0].status).toBe('ok');
expect(dag.roots[0].trace_id).toBe('trace-1');
expect(dag.allSpans.size).toBe(1);
});
it('builds parent-child relationship from parent_span_id', () => {
const events = [
spanStartedEvent({ span_id: 'parent', trace_id: 'trace-1', span_name: 'bead.lifecycle' }),
spanStartedEvent({ span_id: 'child-1', trace_id: 'trace-1', parent_span_id: 'parent', span_name: 'tool.call', msg: 'tool.call.started' }),
spanFinishedEvent({ span_id: 'child-1', trace_id: 'trace-1', parent_span_id: 'parent', span_name: 'tool.call', msg: 'tool.call.finished', duration_ms: 200 }),
spanStartedEvent({ span_id: 'child-2', trace_id: 'trace-1', parent_span_id: 'parent', span_name: 'llm.request', msg: 'llm.request.started' }),
spanFinishedEvent({ span_id: 'child-2', trace_id: 'trace-1', parent_span_id: 'parent', span_name: 'llm.request', msg: 'llm.request.finished', duration_ms: 800 }),
spanFinishedEvent({ span_id: 'parent', trace_id: 'trace-1', span_name: 'bead.lifecycle', duration_ms: 1000 }),
];
const dag = buildSpanDag(events);
expect(dag.roots).toHaveLength(1);
const root = dag.roots[0];
expect(root.span_id).toBe('parent');
expect(root.children).toHaveLength(2);
expect(root.children.map(c => c.span_id)).toEqual(expect.arrayContaining(['child-1', 'child-2']));
expect(root.children.map(c => c.name)).toEqual(expect.arrayContaining(['tool.call', 'llm.request']));
expect(dag.allSpans.size).toBe(3);
});
it('groups spans by trace_id', () => {
const events = [
spanStartedEvent({ span_id: 's1', trace_id: 'trace-a' }),
spanFinishedEvent({ span_id: 's1', trace_id: 'trace-a' }),
spanStartedEvent({ span_id: 's2', trace_id: 'trace-b' }),
spanFinishedEvent({ span_id: 's2', trace_id: 'trace-b' }),
];
const dag = buildSpanDag(events);
expect(dag.traces.size).toBe(2);
expect(dag.traces.get('trace-a')!.map(s => s.span_id)).toEqual(['s1']);
expect(dag.traces.get('trace-b')!.map(s => s.span_id)).toEqual(['s2']);
});
it('marks error status from error field', () => {
const events = [
spanStartedEvent({ span_id: 's-err', trace_id: 'trace-1' }),
spanFinishedEvent({ span_id: 's-err', trace_id: 'trace-1', error: 'timeout' }),
];
const dag = buildSpanDag(events);
expect(dag.roots[0].status).toBe('error');
});
it('handles orphaned spans (parent not found)', () => {
const events = [
spanStartedEvent({ span_id: 'orphan', trace_id: 'trace-1', parent_span_id: 'missing-parent' }),
spanFinishedEvent({ span_id: 'orphan', trace_id: 'trace-1', parent_span_id: 'missing-parent' }),
];
const dag = buildSpanDag(events);
// Orphan with missing parent becomes a root
expect(dag.roots).toHaveLength(1);
expect(dag.roots[0].span_id).toBe('orphan');
});
it('returns empty dag for events without span_id', () => {
const events: LogEvent[] = [
{ ts: 1000, worker: 'w-1', level: 'info', msg: 'worker.started' },
];
const dag = buildSpanDag(events);
expect(dag.roots).toHaveLength(0);
expect(dag.allSpans.size).toBe(0);
expect(dag.traces.size).toBe(0);
});
it('handles nested span hierarchy (grandchild)', () => {
const events = [
spanStartedEvent({ span_id: 'root', trace_id: 't1', span_name: 'bead.lifecycle' }),
spanStartedEvent({ span_id: 'child', trace_id: 't1', parent_span_id: 'root', span_name: 'tool.call', msg: 'tool.call.started' }),
spanStartedEvent({ span_id: 'grandchild', trace_id: 't1', parent_span_id: 'child', span_name: 'llm.request', msg: 'llm.request.started' }),
spanFinishedEvent({ span_id: 'grandchild', trace_id: 't1', parent_span_id: 'child', span_name: 'llm.request', msg: 'llm.request.finished', duration_ms: 100 }),
spanFinishedEvent({ span_id: 'child', trace_id: 't1', parent_span_id: 'root', span_name: 'tool.call', msg: 'tool.call.finished', duration_ms: 300 }),
spanFinishedEvent({ span_id: 'root', trace_id: 't1', span_name: 'bead.lifecycle', duration_ms: 1000 }),
];
const dag = buildSpanDag(events);
expect(dag.roots).toHaveLength(1);
const root = dag.roots[0];
expect(root.span_id).toBe('root');
expect(root.children).toHaveLength(1);
expect(root.children[0].span_id).toBe('child');
expect(root.children[0].children).toHaveLength(1);
expect(root.children[0].children[0].span_id).toBe('grandchild');
});
it('copies non-reserved event fields into span attributes and omits reserved ones', () => {
  const events = [
    spanStartedEvent({
      span_id: 's-attrs',
      trace_id: 'trace-1',
      tool: 'Bash',
      custom_field: 'kept',
    } as any),
  ];
  const dag = buildSpanDag(events);
  const attributes = dag.allSpans.get('s-attrs')!.attributes;
  // 'tool' is in the reserved set, so it should NOT be in attributes
  expect(attributes).not.toHaveProperty('tool');
  // A field outside the reserved set is carried over verbatim
  expect(attributes).toHaveProperty('custom_field', 'kept');
});
});
// ── findSpansForBead ──────────────────────────────────────────────
describe('findSpansForBead', () => {
it('finds spans associated with a bead ID', () => {
const events = [
spanStartedEvent({ span_id: 's1', trace_id: 'trace-1', bead: 'bd-abc' }),
spanFinishedEvent({ span_id: 's1', trace_id: 'trace-1', bead: 'bd-abc' }),
spanStartedEvent({ span_id: 's2', trace_id: 'trace-1', bead: 'bd-xyz' }),
];
const dag = buildSpanDag(events);
const spans = findSpansForBead(dag, 'bd-abc');
expect(spans).toHaveLength(1);
expect(spans[0].span_id).toBe('s1');
});
it('returns empty array when no spans match', () => {
const events = [
spanStartedEvent({ span_id: 's1', trace_id: 'trace-1' }),
];
const dag = buildSpanDag(events);
const spans = findSpansForBead(dag, 'bd-nonexistent');
expect(spans).toHaveLength(0);
});
});
// ── Full pipeline: OTLP span → normalizer → buildSpanDag ──────────
describe('OTLP span → DAG integration', () => {
it('bead lifecycle span renders as DAG node with tool-call children', () => {
// Simulate an OTLP trace export: a bead.lifecycle root span with
// two child spans (tool.call and llm.request) as NEEDLE would emit.
// 1. Root span: bead.lifecycle for bd-integ
const rootStartSpan = {
name: 'bead.lifecycle',
traceId: 'trace-integ-001',
spanId: 'root-span',
startTimeUnixNano: '1772641054008000000',
attributes: [
{ key: 'worker_id', value: { stringValue: 'tcb-alpha' } },
{ key: 'session_id', value: { stringValue: 'sess-integ' } },
{ key: 'bead_id', value: { stringValue: 'bd-integ' } },
],
};
const rootEndSpan = {
name: 'bead.lifecycle',
traceId: 'trace-integ-001',
spanId: 'root-span',
startTimeUnixNano: '1772641054008000000',
endTimeUnixNano: '1772641060000000000',
status: { code: 'OK' },
attributes: [
{ key: 'worker_id', value: { stringValue: 'tcb-alpha' } },
{ key: 'session_id', value: { stringValue: 'sess-integ' } },
{ key: 'bead_id', value: { stringValue: 'bd-integ' } },
],
};
// 2. Child span: tool.call
const toolStartSpan = {
name: 'tool.call',
traceId: 'trace-integ-001',
spanId: 'tool-span-1',
parentSpanId: 'root-span',
startTimeUnixNano: '1772641055000000000',
attributes: [
{ key: 'worker_id', value: { stringValue: 'tcb-alpha' } },
{ key: 'tool', value: { stringValue: 'Bash' } },
],
};
const toolEndSpan = {
name: 'tool.call',
traceId: 'trace-integ-001',
spanId: 'tool-span-1',
parentSpanId: 'root-span',
startTimeUnixNano: '1772641055000000000',
endTimeUnixNano: '1772641057000000000',
status: { code: 'OK' },
attributes: [
{ key: 'worker_id', value: { stringValue: 'tcb-alpha' } },
{ key: 'tool', value: { stringValue: 'Bash' } },
],
};
// 3. Child span: llm.request
const llmStartSpan = {
name: 'llm.request',
traceId: 'trace-integ-001',
spanId: 'llm-span-1',
parentSpanId: 'root-span',
startTimeUnixNano: '1772641057100000000',
attributes: [
{ key: 'worker_id', value: { stringValue: 'tcb-alpha' } },
{ key: 'provider', value: { stringValue: 'anthropic' } },
{ key: 'model', value: { stringValue: 'sonnet' } },
],
};
const llmEndSpan = {
name: 'llm.request',
traceId: 'trace-integ-001',
spanId: 'llm-span-1',
parentSpanId: 'root-span',
startTimeUnixNano: '1772641057100000000',
endTimeUnixNano: '1772641059000000000',
status: { code: 'OK' },
attributes: [
{ key: 'worker_id', value: { stringValue: 'tcb-alpha' } },
{ key: 'provider', value: { stringValue: 'anthropic' } },
{ key: 'model', value: { stringValue: 'sonnet' } },
],
};
// Normalize all spans through the OTLP pipeline
const events: LogEvent[] = [];
for (const [startRaw, endRaw] of [
[rootStartSpan, rootEndSpan],
[toolStartSpan, toolEndSpan],
[llmStartSpan, llmEndSpan],
]) {
const startEvent = normalizeToLogEvent(startRaw, 'otlp-span-start');
const endEvent = normalizeToLogEvent(endRaw, 'otlp-span-end');
if (startEvent) events.push(startEvent);
if (endEvent) events.push(endEvent);
}
// Build the span DAG from the normalized events
const dag = buildSpanDag(events);
// Verify: one root node (bead.lifecycle)
expect(dag.roots).toHaveLength(1);
const root = dag.roots[0];
expect(root.span_id).toBe('root-span');
expect(root.name).toBe('bead.lifecycle');
expect(root.trace_id).toBe('trace-integ-001');
expect(root.bead_id).toBe('bd-integ');
expect(root.worker_id).toBe('tcb-alpha');
expect(root.status).toBe('ok');
expect(root.duration_ms).toBe(5992);
// Verify: root has two children (tool.call and llm.request)
expect(root.children).toHaveLength(2);
const toolChild = root.children.find(c => c.name === 'tool.call');
const llmChild = root.children.find(c => c.name === 'llm.request');
expect(toolChild).toBeDefined();
expect(llmChild).toBeDefined();
expect(toolChild!.parent_span_id).toBe('root-span');
expect(llmChild!.parent_span_id).toBe('root-span');
// Verify: trace grouping
expect(dag.traces.size).toBe(1);
expect(dag.traces.get('trace-integ-001')!.length).toBe(3);
// Verify: findSpansForBead works
const beadSpans = findSpansForBead(dag, 'bd-integ');
expect(beadSpans).toHaveLength(1);
expect(beadSpans[0].span_id).toBe('root-span');
});
it('event_type is {name}.started/.finished through full pipeline', () => {
const span = {
name: 'bead.lifecycle',
traceId: 'trace-et',
spanId: 'span-et',
startTimeUnixNano: '1772641054008000000',
endTimeUnixNano: '1772641058000000000',
status: { code: 'OK' },
attributes: [
{ key: 'worker_id', value: { stringValue: 'tcb-alpha' } },
{ key: 'bead_id', value: { stringValue: 'bd-et' } },
],
};
const startEvent = normalizeToLogEvent(span, 'otlp-span-start')!;
const endEvent = normalizeToLogEvent(span, 'otlp-span-end')!;
expect(startEvent.msg).toBe('bead.lifecycle.started');
expect(endEvent.msg).toBe('bead.lifecycle.finished');
expect(startEvent.span_id).toBe('span-et');
expect(endEvent.span_id).toBe('span-et');
});
});

View file

@ -14,6 +14,9 @@ import {
DagOptions,
DagStats,
BeadStatus,
SpanNode,
SpanDag,
LogEvent,
} from './types.js';
/**
@ -508,3 +511,105 @@ export function refreshDependencyGraph(options: DagOptions = {}): DependencyGrap
const rawGraph = getBrGraphJson(options);
return parseDependencyGraph(rawGraph, options);
}
// ── Span-based DAG (OTLP parent_span_id linkage) ──────────────
/**
 * LogEvent keys that buildSpanDag treats as standard fields; any key listed
 * here is skipped when event fields are copied into SpanNode.attributes.
 */
const SPAN_RESERVED_KEYS = new Set([
  // core log-event envelope
  'ts',
  'worker',
  'level',
  'msg',
  'sequence',
  'session',
  'bead',
  // well-known payload fields
  'duration_ms',
  'error',
  'tool',
  'path',
  'provider',
  'model',
  // span linkage
  'span_id',
  'parent_span_id',
  'trace_id',
  'span_name',
]);
/**
 * Build a span hierarchy DAG from a stream of LogEvents that carry
 * `span_id` / `parent_span_id` (set by the OTLP normalizer).
 *
 * Each span_id produces one SpanNode. The `.started` event seeds the node
 * (timestamp, name), and the matching `.finished` event closes it
 * (duration, status). Parent-child edges come from `parent_span_id`.
 *
 * Events without a `span_id` are ignored. Linkage fields (`trace_id`,
 * `parent_span_id`, `bead_id`) that were missing on the first event seen for a
 * span are filled in from later events for the same span, so out-of-order or
 * partially-populated start/finish pairs still link up.
 *
 * @param events LogEvents in arrival order.
 * @returns `roots` (spans with no parent, an unknown parent, or a
 *   self-referencing parent), `allSpans` keyed by span_id, and `traces`
 *   keyed by trace_id in first-seen order.
 */
export function buildSpanDag(events: LogEvent[]): SpanDag {
  const allSpans = new Map<string, SpanNode>();
  const traces = new Map<string, SpanNode[]>();
  // span_ids already registered in `traces`; replaces a per-event linear scan.
  const tracedSpanIds = new Set<string>();

  for (const event of events) {
    const spanId = event.span_id as string | undefined;
    if (!spanId) continue;

    let node = allSpans.get(spanId);
    if (!node) {
      node = {
        span_id: spanId,
        trace_id: (event.trace_id as string) || '',
        parent_span_id: event.parent_span_id as string | undefined,
        name: (event.span_name as string) || event.msg.replace(/\.(started|finished)$/, ''),
        worker_id: event.worker,
        bead_id: event.bead,
        status: 'unknown',
        children: [],
        attributes: {},
      };
      allSpans.set(spanId, node);
    } else {
      // Backfill linkage the first event did not carry; never overwrite a known value.
      if (!node.trace_id && event.trace_id) node.trace_id = event.trace_id as string;
      if (!node.parent_span_id && event.parent_span_id) {
        node.parent_span_id = event.parent_span_id as string;
      }
      if (!node.bead_id && event.bead) node.bead_id = event.bead;
    }

    // Update from .started / .finished events
    if (event.msg.endsWith('.started')) {
      node.start_ts = event.ts;
      if (event.span_name) node.name = event.span_name as string;
    } else if (event.msg.endsWith('.finished')) {
      node.end_ts = event.ts;
      node.duration_ms = event.duration_ms;
      node.status = event.error ? 'error' : 'ok';
    }

    // Track by trace_id (once per span)
    if (node.trace_id && !tracedSpanIds.has(spanId)) {
      tracedSpanIds.add(spanId);
      const traceSpans = traces.get(node.trace_id);
      if (traceSpans) {
        traceSpans.push(node);
      } else {
        traces.set(node.trace_id, [node]);
      }
    }

    // Copy non-reserved fields into attributes; the first value seen for a key wins.
    // Own-property check so keys such as "constructor" are not mistaken for
    // already-present attributes via the prototype chain.
    for (const [key, value] of Object.entries(event)) {
      if (
        !SPAN_RESERVED_KEYS.has(key) &&
        !Object.prototype.hasOwnProperty.call(node.attributes, key)
      ) {
        node.attributes[key] = value;
      }
    }
  }

  // Build parent-child tree
  const roots: SpanNode[] = [];
  for (const node of allSpans.values()) {
    // A span naming itself as parent would become its own child and vanish
    // from `roots`; treat it like a span without a resolvable parent.
    const parent =
      node.parent_span_id && node.parent_span_id !== node.span_id
        ? allSpans.get(node.parent_span_id)
        : undefined;
    if (parent) {
      parent.children.push(node);
    } else {
      roots.push(node);
    }
  }

  return { roots, allSpans, traces };
}
/**
 * Collect every span in `dag` whose `bead_id` equals `beadId`.
 *
 * @param dag Span DAG to search (all spans are inspected, not only roots).
 * @param beadId Bead identifier to match exactly.
 * @returns Matching spans in `allSpans` iteration order; empty when none match.
 */
export function findSpansForBead(dag: SpanDag, beadId: string): SpanNode[] {
  return Array.from(dag.allSpans.values()).filter(span => span.bead_id === beadId);
}

View file

@ -384,6 +384,143 @@ describe('normalize otlp-log source', () => {
expect(result!.data.rate).toBe(3.14);
expect(result!.data.active).toBe(true);
});
// ── namespaced OTLP attributes ──────────────────────────────────
it('resolves namespaced attribute keys (needle.worker.id etc.)', () => {
const record = {
timeUnixNano: '1772641054008000000',
attributes: [
{ key: 'event_type', value: { stringValue: 'bead.claimed' } },
{ key: 'needle.worker.id', value: { stringValue: 'tcb-alpha' } },
{ key: 'needle.session.id', value: { stringValue: 'sess-ns' } },
{ key: 'needle.sequence', value: { intValue: '42' } },
{ key: 'needle.bead.id', value: { stringValue: 'bd-ns' } },
],
};
const result = normalize(record, 'otlp-log');
expect(result).not.toBeNull();
expect(result!.worker_id).toBe('tcb-alpha');
expect(result!.session_id).toBe('sess-ns');
expect(result!.sequence).toBe(42);
expect(result!.bead_id).toBe('bd-ns');
});
it('prefers namespaced keys over non-namespaced when both present', () => {
const record = {
timeUnixNano: '1772641054008000000',
attributes: [
{ key: 'event_type', value: { stringValue: 'test' } },
{ key: 'worker_id', value: { stringValue: 'plain-worker' } },
{ key: 'needle.worker.id', value: { stringValue: 'ns-worker' } },
{ key: 'session_id', value: { stringValue: 'plain-sess' } },
{ key: 'needle.session.id', value: { stringValue: 'ns-sess' } },
],
};
const result = normalize(record, 'otlp-log');
expect(result!.worker_id).toBe('ns-worker');
expect(result!.session_id).toBe('ns-sess');
});
it('falls back to non-namespaced keys when namespaced absent', () => {
const record = {
timeUnixNano: '1772641054008000000',
attributes: [
{ key: 'event_type', value: { stringValue: 'test' } },
{ key: 'worker_id', value: { stringValue: 'fallback-w' } },
{ key: 'session_id', value: { stringValue: 'fallback-s' } },
],
};
const result = normalize(record, 'otlp-log');
expect(result!.worker_id).toBe('fallback-w');
expect(result!.session_id).toBe('fallback-s');
});
it('excludes namespaced keys from data payload', () => {
const record = {
timeUnixNano: '1772641054008000000',
attributes: [
{ key: 'event_type', value: { stringValue: 'test' } },
{ key: 'needle.worker.id', value: { stringValue: 'w-1' } },
{ key: 'needle.session.id', value: { stringValue: 's-1' } },
{ key: 'needle.sequence', value: { intValue: '1' } },
{ key: 'needle.bead.id', value: { stringValue: 'bd-1' } },
{ key: 'extra', value: { stringValue: 'kept' } },
],
};
const result = normalize(record, 'otlp-log');
expect(result!.data['needle.worker.id']).toBeUndefined();
expect(result!.data['needle.session.id']).toBeUndefined();
expect(result!.data['needle.sequence']).toBeUndefined();
expect(result!.data['needle.bead.id']).toBeUndefined();
expect(result!.data.extra).toBe('kept');
});
// ── body field mapping ──────────────────────────────────────────
it('lifts body (kvlistValue) into data', () => {
const record = {
timeUnixNano: '1772641054008000000',
body: {
kvlistValue: {
values: [
{ key: 'version', value: { stringValue: '1.2.3' } },
{ key: 'count', value: { intValue: '7' } },
],
},
},
attributes: [
{ key: 'event_type', value: { stringValue: 'worker.started' } },
{ key: 'worker_id', value: { stringValue: 'w-1' } },
],
};
const result = normalize(record, 'otlp-log');
expect(result).not.toBeNull();
expect(result!.data.version).toBe('1.2.3');
expect(result!.data.count).toBe(7);
});
it('lifts body (plain JSON object) into data', () => {
const record = {
timeUnixNano: '1772641054008000000',
body: { worker_name: 'tcb-alpha', version: '2.0' },
attributes: [
{ key: 'event_type', value: { stringValue: 'worker.started' } },
{ key: 'worker_id', value: { stringValue: 'w-1' } },
],
};
const result = normalize(record, 'otlp-log');
expect(result!.data.worker_name).toBe('tcb-alpha');
expect(result!.data.version).toBe('2.0');
});
it('wraps scalar body into data.value', () => {
const record = {
timeUnixNano: '1772641054008000000',
body: { stringValue: 'hello world' },
attributes: [
{ key: 'event_type', value: { stringValue: 'test' } },
{ key: 'worker_id', value: { stringValue: 'w-1' } },
],
};
const result = normalize(record, 'otlp-log');
expect(result!.data.value).toBe('hello world');
});
it('merges attributes into data on top of body', () => {
const record = {
timeUnixNano: '1772641054008000000',
body: { version: '1.0' },
attributes: [
{ key: 'event_type', value: { stringValue: 'test' } },
{ key: 'worker_id', value: { stringValue: 'w-1' } },
{ key: 'extra_attr', value: { stringValue: 'from-attrs' } },
],
};
const result = normalize(record, 'otlp-log');
expect(result!.data.version).toBe('1.0');
expect(result!.data.extra_attr).toBe('from-attrs');
});
});
// ── OTLP-span-start source ───────────────────────────────────────
@ -454,6 +591,87 @@ describe('normalize otlp-span-start source', () => {
it('returns null for null input', () => {
expect(normalize(null, 'otlp-span-start')).toBeNull();
});
// ── span structural fields (span_id, parent_span_id, trace_id) ──
it('extracts span structural fields into data', () => {
const span = {
name: 'bead.lifecycle',
traceId: 'abc123',
spanId: 'span001',
parentSpanId: 'parent001',
startTimeUnixNano: '1772641054008000000',
attributes: [
{ key: 'worker_id', value: { stringValue: 'tcb-alpha' } },
{ key: 'session_id', value: { stringValue: 'sess-1' } },
],
};
const result = normalize(span, 'otlp-span-start');
expect(result).not.toBeNull();
expect(result!.data.span_id).toBe('span001');
expect(result!.data.parent_span_id).toBe('parent001');
expect(result!.data.trace_id).toBe('abc123');
expect(result!.data.span_name).toBe('bead.lifecycle');
});
it('uses span name for event_type as {name}.started', () => {
const span = {
name: 'tool.call',
spanId: 'span002',
startTimeUnixNano: '1772641054008000000',
attributes: [
{ key: 'worker_id', value: { stringValue: 'tcb-alpha' } },
],
};
const result = normalize(span, 'otlp-span-start');
expect(result).not.toBeNull();
expect(result!.event_type).toBe('tool.call.started');
});
it('prefers span name over explicit event_type for .started', () => {
const span = {
name: 'llm.request',
spanId: 'span003',
startTimeUnixNano: '1772641054008000000',
attributes: [
{ key: 'event_type', value: { stringValue: 'bead.claimed' } },
{ key: 'worker_id', value: { stringValue: 'tcb-alpha' } },
],
};
const result = normalize(span, 'otlp-span-start');
expect(result!.event_type).toBe('llm.request.started');
});
it('handles spans without structural fields (backward compat)', () => {
const span = {
attributes: [
{ key: 'event_type', value: { stringValue: 'bead.claimed' } },
{ key: 'worker_id', value: { stringValue: 'tcb-alpha' } },
],
};
const result = normalize(span, 'otlp-span-start');
expect(result).not.toBeNull();
expect(result!.event_type).toBe('bead.claimed');
expect(result!.data.span_id).toBeUndefined();
expect(result!.data.parent_span_id).toBeUndefined();
expect(result!.data.trace_id).toBeUndefined();
});
it('accepts snake_case span_id / parent_span_id / trace_id', () => {
const span = {
trace_id: 'trace-ns',
span_id: 'span-ns',
parent_span_id: 'parent-ns',
startTimeUnixNano: '1772641054008000000',
attributes: [
{ key: 'worker_id', value: { stringValue: 'tcb-alpha' } },
],
};
const result = normalize(span, 'otlp-span-start');
expect(result!.data.span_id).toBe('span-ns');
expect(result!.data.parent_span_id).toBe('parent-ns');
expect(result!.data.trace_id).toBe('trace-ns');
});
});
// ── OTLP-span-end source ─────────────────────────────────────────
@ -551,6 +769,85 @@ describe('normalize otlp-span-end source', () => {
it('returns null for null input', () => {
expect(normalize(null, 'otlp-span-end')).toBeNull();
});
// ── span structural fields (span_id, parent_span_id, trace_id) ──
it('extracts span structural fields into data on span end', () => {
const span = {
name: 'bead.lifecycle',
traceId: 'trace-abc',
spanId: 'span-end-001',
parentSpanId: 'parent-end-001',
startTimeUnixNano: '1772641054008000000',
endTimeUnixNano: '1772641058000000000',
status: { code: 'OK' },
attributes: [
{ key: 'worker_id', value: { stringValue: 'tcb-alpha' } },
{ key: 'bead_id', value: { stringValue: 'bd-end' } },
],
};
const result = normalize(span, 'otlp-span-end');
expect(result).not.toBeNull();
expect(result!.data.span_id).toBe('span-end-001');
expect(result!.data.parent_span_id).toBe('parent-end-001');
expect(result!.data.trace_id).toBe('trace-abc');
expect(result!.data.span_name).toBe('bead.lifecycle');
expect(result!.data.duration_ms).toBe(3992);
});
it('uses span name for event_type as {name}.finished', () => {
const span = {
name: 'tool.call',
spanId: 'span-tool-001',
endTimeUnixNano: '1772641058000000000',
status: { code: 'OK' },
attributes: [
{ key: 'worker_id', value: { stringValue: 'tcb-alpha' } },
],
};
const result = normalize(span, 'otlp-span-end');
expect(result).not.toBeNull();
expect(result!.event_type).toBe('tool.call.finished');
});
it('uses {name}.finished even on ERROR status', () => {
const span = {
name: 'llm.request',
spanId: 'span-llm-err',
endTimeUnixNano: '1772641058000000000',
status: { code: 'ERROR', message: 'rate limited' },
attributes: [
{ key: 'worker_id', value: { stringValue: 'tcb-alpha' } },
],
};
const result = normalize(span, 'otlp-span-end');
expect(result!.event_type).toBe('llm.request.finished');
expect(result!.data.error).toBe('rate limited');
});
it('promotes span fields through to LogEvent via normalizeToLogEvent', () => {
const span = {
name: 'bead.lifecycle',
traceId: 'trace-log',
spanId: 'span-log-001',
parentSpanId: 'parent-log-001',
startTimeUnixNano: '1772641054008000000',
endTimeUnixNano: '1772641058000000000',
status: { code: 'OK' },
attributes: [
{ key: 'worker_id', value: { stringValue: 'tcb-alpha' } },
{ key: 'bead_id', value: { stringValue: 'bd-log' } },
],
};
const logEvent = normalizeToLogEvent(span, 'otlp-span-end');
expect(logEvent).not.toBeNull();
expect(logEvent!.span_id).toBe('span-log-001');
expect(logEvent!.parent_span_id).toBe('parent-log-001');
expect(logEvent!.trace_id).toBe('trace-log');
expect(logEvent!.span_name).toBe('bead.lifecycle');
expect(logEvent!.bead).toBe('bd-log');
expect(logEvent!.duration_ms).toBe(3992);
});
});
// ── OTLP-metric source ───────────────────────────────────────────

View file

@ -240,29 +240,59 @@ function normalizeLegacyLogEntry(parsed: unknown): NeedleEvent | null {
/**
* Normalize an OTLP LogRecord into NeedleEvent.
*
* Expected input shape (already JSON-parsed from the OTLP/HTTP JSON export
* or decoded from OTLP/gRPC protobuf):
* Supports both namespaced OTLP semantic-convention keys and the
* non-namespaced keys that NEEDLE currently emits. Namespaced keys
* take priority when present.
*
* Namespaced: needle.worker.id, needle.session.id,
* needle.sequence, needle.bead.id
* Fallback: worker_id, session_id, sequence, bead_id
* Shared: event_type (never namespaced)
*
* The OTLP `body` field (AnyValue) is lifted to `data`. Any remaining
* attributes that are not structural fields are merged into `data`.
*
* Expected input shape (JSON-parsed from OTLP/HTTP or decoded from gRPC):
*
* {
* timeUnixNano: string,
* body: { stringValue: string },
* body: { kvlistValue: { values: [{key, value}] } }
* | { stringValue: "..." }
* | { ...plain JSON object in HTTP mode },
* attributes: [
* { key: "event_type", value: { stringValue: "worker.started" } },
* { key: "worker_id", value: { stringValue: "tcb-alpha" } },
* { key: "event_type", value: { stringValue: "worker.started" } },
* { key: "needle.worker.id", value: { stringValue: "tcb-alpha" } },
*
* ]
* }
*/
/**
 * Namespaced OTLP attribute key → canonical (non-namespaced) NeedleEvent field.
 * resolveAttr() consults this table so the namespaced spelling is tried
 * before the plain one.
 */
const OTLP_ATTR_ALIASES: ReadonlyMap<string, string> = new Map(
  Object.entries({
    'needle.worker.id': 'worker_id',
    'needle.session.id': 'session_id',
    'needle.sequence': 'sequence',
    'needle.bead.id': 'bead_id',
  }),
);
/**
 * Every attribute key (plain and namespaced) that maps onto a structural
 * NeedleEvent field. The normalizers skip these keys when copying attributes
 * into `data`.
 */
const STRUCTURAL_ATTR_KEYS = new Set(
  ['event_type', 'worker_id', 'session_id', 'sequence', 'bead_id'].concat(
    Array.from(OTLP_ATTR_ALIASES.keys()),
  ),
);
function normalizeOtlpLog(raw: unknown): NeedleEvent | null {
if (typeof raw !== 'object' || raw === null) return null;
const record = raw as Record<string, unknown>;
const attrs = otlpAttrs(record);
// Resolve structural fields, preferring namespaced keys
const event_type = attrs.get('event_type');
const worker_id = attrs.get('worker_id');
const session_id = attrs.get('session_id');
const sequence = attrs.get('sequence');
const worker_id = resolveAttr(attrs, 'worker_id');
const session_id = resolveAttr(attrs, 'session_id');
const sequence = resolveAttr(attrs, 'sequence');
const bead_id = resolveAttr(attrs, 'bead_id');
if (!event_type || !worker_id) return null;
@ -270,9 +300,10 @@ function normalizeOtlpLog(raw: unknown): NeedleEvent | null {
? otlpNanoToISO(record.timeUnixNano as string | number)
: new Date().toISOString();
const data: Record<string, unknown> = {};
// Start data from body (if present), then merge non-structural attributes
const data = extractBody(record);
for (const [k, v] of attrs) {
if (k !== 'event_type' && k !== 'worker_id' && k !== 'session_id' && k !== 'sequence' && k !== 'bead_id') {
if (!STRUCTURAL_ATTR_KEYS.has(k)) {
data[k] = v;
}
}
@ -286,25 +317,92 @@ function normalizeOtlpLog(raw: unknown): NeedleEvent | null {
data,
};
const bead_id = attrs.get('bead_id');
if (bead_id) ne.bead_id = String(bead_id);
return ne;
}
/**
 * Look up a structural attribute, preferring its namespaced alias.
 *
 * @param attrs Attribute map for the record or span.
 * @param canonicalKey Plain key such as `worker_id`.
 * @returns The first defined value found under a namespaced alias of
 *   `canonicalKey`; otherwise whatever is stored under `canonicalKey` itself
 *   (possibly `undefined`).
 */
function resolveAttr(attrs: Map<string, unknown>, canonicalKey: string): unknown {
  const namespaced = [...OTLP_ATTR_ALIASES]
    .filter(([, plain]) => plain === canonicalKey)
    .map(([alias]) => attrs.get(alias))
    .find(candidate => candidate !== undefined);
  return namespaced !== undefined ? namespaced : attrs.get(canonicalKey);
}
/**
 * Extract the OTLP `body` AnyValue into a plain object for `data`.
 *
 * Recognised shapes, checked in this order:
 *  1. `{ kvlistValue: { values: [{ key, value }] } }` → one property per entry
 *  2. scalar wrapper (`stringValue` / `intValue` / `doubleValue` / `boolValue`)
 *     → `{ value: <scalar> }`
 *  3. any other object → shallow copy (plain JSON body)
 *
 * A missing, null or non-object body yields `{}`.
 *
 * Malformed kvlist entries are tolerated rather than allowed to throw and
 * abort normalisation of the whole record: entries that are not objects or
 * have no string `key` are skipped, and an entry whose `value` is absent or
 * not an object is stored as-is instead of being unwrapped.
 *
 * @param record The raw OTLP log record.
 * @returns A fresh object; the input is never mutated.
 */
function extractBody(record: Record<string, unknown>): Record<string, unknown> {
  const body = record.body;
  if (body == null || typeof body !== 'object') return {};
  const b = body as Record<string, unknown>;

  // Key/value list body
  const kvEntries = (b.kvlistValue as { values?: unknown } | null | undefined)?.values;
  if (Array.isArray(kvEntries)) {
    const out: Record<string, unknown> = {};
    for (const entry of kvEntries as unknown[]) {
      if (typeof entry !== 'object' || entry === null) continue;
      const { key, value } = entry as { key?: unknown; value?: unknown };
      if (typeof key !== 'string') continue;
      // Only hand real objects to unwrapAnyValue; absent/primitive values pass through.
      out[key] =
        typeof value === 'object' && value !== null
          ? unwrapAnyValue(value as Record<string, unknown>)
          : value;
    }
    return out;
  }

  // Scalar wrappers: { stringValue: "…" }, { intValue: 5 }, etc.
  if ('stringValue' in b || 'intValue' in b || 'doubleValue' in b || 'boolValue' in b) {
    return { value: unwrapAnyValue(b) };
  }

  // Plain JSON object (OTLP/HTTP JSON mode)
  return { ...b };
}
/**
 * Unwrap an OTLP AnyValue to a JS primitive / array / object.
 *
 * - `stringValue`, `doubleValue`, `boolValue` → the wrapped value as-is
 * - `intValue` → `Number(...)` (the value may arrive as a string)
 * - `arrayValue.values` → array with each element unwrapped recursively
 * - `kvlistValue.values` → object with each entry's value unwrapped recursively
 * - anything else → returned unchanged
 *
 * Non-object input (`undefined`, `null`, primitives) is returned unchanged
 * instead of throwing: the `in` operator raises a TypeError on non-objects,
 * which previously happened for a key/value entry without a `value` or a null
 * array element. kvlist entries that are not objects or lack a string `key`
 * are skipped.
 *
 * @param v The AnyValue wrapper (or an already-plain value).
 * @returns The unwrapped value.
 */
function unwrapAnyValue(v: unknown): unknown {
  if (typeof v !== 'object' || v === null) return v;
  const wrapper = v as Record<string, unknown>;

  if ('stringValue' in wrapper) return wrapper.stringValue;
  if ('intValue' in wrapper) return Number(wrapper.intValue);
  if ('doubleValue' in wrapper) return wrapper.doubleValue;
  if ('boolValue' in wrapper) return wrapper.boolValue;

  const arrayItems = (wrapper.arrayValue as { values?: unknown } | null | undefined)?.values;
  if ('arrayValue' in wrapper && Array.isArray(arrayItems)) {
    return (arrayItems as unknown[]).map(item => unwrapAnyValue(item));
  }

  const kvEntries = (wrapper.kvlistValue as { values?: unknown } | null | undefined)?.values;
  if ('kvlistValue' in wrapper && Array.isArray(kvEntries)) {
    const out: Record<string, unknown> = {};
    for (const entry of kvEntries as unknown[]) {
      if (typeof entry !== 'object' || entry === null) continue;
      const { key, value } = entry as { key?: unknown; value?: unknown };
      if (typeof key !== 'string') continue;
      out[key] = unwrapAnyValue(value);
    }
    return out;
  }

  // Plain object (HTTP JSON mode)
  return v;
}
/**
* Normalize an OTLP Span start event into NeedleEvent.
*
* A span start maps to a bead.claimed or worker.started event depending on
* the span's attributes.
* When the span carries a `name` field, the event_type becomes `{name}.started`.
* Otherwise falls back to explicit `event_type` attribute or `bead.claimed`.
*
* OTLP span structural fields (spanId, parentSpanId, traceId) are promoted
* into `data` as `span_id`, `parent_span_id`, `trace_id` so downstream
* consumers (DAG view, cost-per-task) can reconstruct hierarchy without
* re-parsing logs.
*/
function normalizeOtlpSpanStart(raw: unknown): NeedleEvent | null {
if (typeof raw !== 'object' || raw === null) return null;
const span = raw as Record<string, unknown>;
const attrs = otlpAttrs(span);
const event_type = attrs.get('event_type') || 'bead.claimed';
const worker_id = attrs.get('worker_id');
const spanName = (span.name || span.span_name) as string | undefined;
const explicitEventType = attrs.get('event_type');
const event_type = spanName
? `${spanName}.started`
: explicitEventType
? String(explicitEventType)
: 'bead.claimed';
const worker_id = resolveAttr(attrs, 'worker_id');
if (!worker_id) return null;
const startTime = span.startTimeUnixNano || span.start_time_unix_nano;
@ -312,23 +410,36 @@ function normalizeOtlpSpanStart(raw: unknown): NeedleEvent | null {
? otlpNanoToISO(startTime as string | number)
: new Date().toISOString();
const session_id = resolveAttr(attrs, 'session_id');
const sequence = resolveAttr(attrs, 'sequence');
const bead_id = resolveAttr(attrs, 'bead_id');
// OTLP span structural fields → data for DAG linkage
const spanId = (span.spanId || span.span_id) as string | undefined;
const parentSpanId = (span.parentSpanId || span.parent_span_id) as string | undefined;
const traceId = (span.traceId || span.trace_id) as string | undefined;
const data: Record<string, unknown> = {};
for (const [k, v] of attrs) {
if (k !== 'event_type' && k !== 'worker_id' && k !== 'session_id' && k !== 'sequence' && k !== 'bead_id') {
if (!STRUCTURAL_ATTR_KEYS.has(k)) {
data[k] = v;
}
}
if (spanId) data.span_id = spanId;
if (parentSpanId) data.parent_span_id = parentSpanId;
if (traceId) data.trace_id = traceId;
if (spanName) data.span_name = spanName;
const ne: NeedleEvent = {
timestamp,
event_type: String(event_type),
worker_id: String(worker_id),
session_id: attrs.get('session_id') ? String(attrs.get('session_id')) : '',
sequence: typeof attrs.get('sequence') === 'number' ? attrs.get('sequence') as number : -1,
session_id: session_id ? String(session_id) : '',
sequence: typeof sequence === 'number' ? sequence : -1,
data,
};
const bead_id = attrs.get('bead_id');
if (bead_id) ne.bead_id = String(bead_id);
return ne;
@ -337,8 +448,12 @@ function normalizeOtlpSpanStart(raw: unknown): NeedleEvent | null {
/**
* Normalize an OTLP Span end event into NeedleEvent.
*
* A span end maps to a bead.completed or bead.failed event depending on
* the span's status.
* When the span carries a `name` field, the event_type becomes `{name}.finished`.
* Otherwise falls back to explicit `event_type` attribute or status-based
* inference (bead.completed / bead.failed).
*
* OTLP span structural fields (spanId, parentSpanId, traceId) are promoted
* into `data` as `span_id`, `parent_span_id`, `trace_id`.
*/
function normalizeOtlpSpanEnd(raw: unknown): NeedleEvent | null {
if (typeof raw !== 'object' || raw === null) return null;
@ -346,29 +461,46 @@ function normalizeOtlpSpanEnd(raw: unknown): NeedleEvent | null {
const attrs = otlpAttrs(span);
// Determine event_type from span status
// Determine event_type: prefer {name}.finished, then explicit attribute, then status
const spanName = (span.name || span.span_name) as string | undefined;
const explicitEventType = attrs.get('event_type');
const status = span.status as Record<string, unknown> | undefined;
const code = status?.code;
const spanEventType = attrs.get('event_type');
const event_type = spanEventType
? String(spanEventType)
: (code === 'ERROR' || code === 2) ? 'bead.failed' : 'bead.completed';
const event_type = spanName
? `${spanName}.finished`
: explicitEventType
? String(explicitEventType)
: (code === 'ERROR' || code === 2) ? 'bead.failed' : 'bead.completed';
const worker_id = attrs.get('worker_id');
const worker_id = resolveAttr(attrs, 'worker_id');
if (!worker_id) return null;
const session_id = resolveAttr(attrs, 'session_id');
const sequence = resolveAttr(attrs, 'sequence');
const bead_id = resolveAttr(attrs, 'bead_id');
const endTime = span.endTimeUnixNano || span.end_time_unix_nano;
const timestamp = endTime
? otlpNanoToISO(endTime as string | number)
: new Date().toISOString();
// OTLP span structural fields → data for DAG linkage
const spanId = (span.spanId || span.span_id) as string | undefined;
const parentSpanId = (span.parentSpanId || span.parent_span_id) as string | undefined;
const traceId = (span.traceId || span.trace_id) as string | undefined;
const data: Record<string, unknown> = {};
for (const [k, v] of attrs) {
if (k !== 'event_type' && k !== 'worker_id' && k !== 'session_id' && k !== 'sequence' && k !== 'bead_id') {
if (!STRUCTURAL_ATTR_KEYS.has(k)) {
data[k] = v;
}
}
if (spanId) data.span_id = spanId;
if (parentSpanId) data.parent_span_id = parentSpanId;
if (traceId) data.trace_id = traceId;
if (spanName) data.span_name = spanName;
// Include duration if both start and end times are available
const startTime = span.startTimeUnixNano || span.start_time_unix_nano;
if (startTime && endTime) {
@ -386,12 +518,11 @@ function normalizeOtlpSpanEnd(raw: unknown): NeedleEvent | null {
timestamp,
event_type,
worker_id: String(worker_id),
session_id: attrs.get('session_id') ? String(attrs.get('session_id')) : '',
sequence: typeof attrs.get('sequence') === 'number' ? attrs.get('sequence') as number : -1,
session_id: session_id ? String(session_id) : '',
sequence: typeof sequence === 'number' ? sequence : -1,
data,
};
const bead_id = attrs.get('bead_id');
if (bead_id) ne.bead_id = String(bead_id);
return ne;
@ -410,9 +541,12 @@ function normalizeOtlpMetric(raw: unknown): NeedleEvent | null {
const attrs = otlpAttrs(metricPoint);
const metricName = metricPoint.name as string | undefined;
const worker_id = attrs.get('worker_id');
const worker_id = resolveAttr(attrs, 'worker_id');
if (!worker_id) return null;
const session_id = resolveAttr(attrs, 'session_id');
const bead_id = resolveAttr(attrs, 'bead_id');
const timeUnixNano = metricPoint.timeUnixNano || metricPoint.time_unix_nano;
const timestamp = timeUnixNano
? otlpNanoToISO(timeUnixNano as string | number)
@ -420,7 +554,7 @@ function normalizeOtlpMetric(raw: unknown): NeedleEvent | null {
const data: Record<string, unknown> = {};
for (const [k, v] of attrs) {
if (k !== 'worker_id' && k !== 'session_id' && k !== 'bead_id') {
if (!STRUCTURAL_ATTR_KEYS.has(k)) {
data[k] = v;
}
}
@ -435,12 +569,11 @@ function normalizeOtlpMetric(raw: unknown): NeedleEvent | null {
timestamp,
event_type: `metric.${metricName || 'unknown'}`,
worker_id: String(worker_id),
session_id: attrs.get('session_id') ? String(attrs.get('session_id')) : '',
session_id: session_id ? String(session_id) : '',
sequence: -1,
data,
};
const bead_id = attrs.get('bead_id');
if (bead_id) ne.bead_id = String(bead_id);
return ne;
@ -462,6 +595,7 @@ export function needleEventToLogEvent(ne: NeedleEvent): LogEvent {
worker: ne.worker_id,
level,
msg: ne.event_type,
sequence: ne.sequence >= 0 ? ne.sequence : undefined,
};
if (ne.session_id) event.session = ne.session_id;
@ -474,10 +608,14 @@ export function needleEventToLogEvent(ne: NeedleEvent): LogEvent {
if (typeof data.path === 'string') event.path = data.path;
if (typeof data.provider === 'string') event.provider = data.provider;
if (typeof data.model === 'string') event.model = data.model;
if (typeof data.span_id === 'string') event.span_id = data.span_id;
if (typeof data.parent_span_id === 'string') event.parent_span_id = data.parent_span_id;
if (typeof data.trace_id === 'string') event.trace_id = data.trace_id;
if (typeof data.span_name === 'string') event.span_name = data.span_name;
const reserved = new Set([
'duration_ms', 'error', 'tool', 'path', 'provider', 'model', 'level',
'bead_id',
'bead_id', 'span_id', 'parent_span_id', 'trace_id', 'span_name',
]);
for (const key of Object.keys(data)) {
if (!reserved.has(key) && !(key in event)) {