feat(bd-zci): map OTLP metrics → analytics DB instruments

Fix histogram data point value extraction in normalizer — OTLP histogram
points carry sum/count instead of asDouble/asInt, so needle.bead.duration
was silently dropped. Add MetricAccumulator tests and end-to-end tests
validating OTLP metrics flow through to session_worker_summaries in
fabric.db with otlp-metric source preference.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-04-21 17:06:45 -04:00
parent 1f392c39d6
commit e74149f167
4 changed files with 375 additions and 0 deletions

View file

@ -14,6 +14,8 @@ import {
TaskMetricsRecord,
ErrorHistoryRecord,
} from './historicalStore.js';
import { MetricAccumulator, INSTRUMENT_NAMES } from './workerAnalytics.js';
import { normalizeToLogEvent } from './normalizer.js';
// Test database path
const TEST_DB_DIR = path.join(os.tmpdir(), 'fabric-test-' + Date.now());
@ -439,4 +441,193 @@ describe('HistoricalStore', () => {
defaultStore.close();
});
});
/**
 * Integration tests for the OTLP metric path: a raw OTLP data point is run
 * through normalizeToLogEvent, accumulated by MetricAccumulator, and then
 * persisted through the store (metric samples and session worker summaries).
 */
describe('OTLP Metric Persistence', () => {
  /** Builds the worker/session attribute list carried by each simulated OTLP data point. */
  function identityAttributes(
    workerId: string,
    sessionId: string,
  ): Array<{ key: string; value: { stringValue: string } }> {
    return [
      { key: 'worker_id', value: { stringValue: workerId } },
      { key: 'session_id', value: { stringValue: sessionId } },
    ];
  }

  /**
   * Drains the accumulator's buffered samples and records each one in the
   * store tagged with the 'otlp-metric' source (mirrors store.ts
   * flushMetricSamples). Shared by the tests below so every test persists
   * samples the same way, including the optional beadId.
   */
  function persistDrainedSamples(accumulator: MetricAccumulator): void {
    for (const sample of accumulator.drainSamples()) {
      store.recordMetricSample({
        workerId: sample.workerId,
        metricName: sample.metricName,
        value: sample.value,
        timestamp: sample.timestamp,
        source: 'otlp-metric',
        beadId: sample.beadId,
      });
    }
  }

  it('should record metric samples and upsert session worker summaries from OTLP metrics', () => {
    const sessionId = 'otlp-metric-sess';
    const workerId = 'needle-alpha';
    store.startSession(sessionId);
    const accumulator = new MetricAccumulator();

    // Simulate OTLP metric events flowing through the normalizer → accumulator → store.
    // One data point per instrument, all reported by the same worker.
    const metrics = [
      { name: INSTRUMENT_NAMES.TOKENS_IN, value: 1500 },
      { name: INSTRUMENT_NAMES.TOKENS_OUT, value: 600 },
      { name: INSTRUMENT_NAMES.COST_USD, value: 0.084 },
      { name: INSTRUMENT_NAMES.BEAD_COMPLETED, value: 2 },
      { name: INSTRUMENT_NAMES.BEAD_FAILED, value: 1 },
      { name: INSTRUMENT_NAMES.WORKER_ERRORS, value: 1 },
      { name: INSTRUMENT_NAMES.BEAD_DURATION, value: 4500 },
    ];
    for (const m of metrics) {
      // Simulate what extractDataPoints + normalizer produces
      const rawPoint = {
        name: m.name,
        timeUnixNano: String(Date.now() * 1_000_000),
        asDouble: m.value,
        attributes: identityAttributes(workerId, sessionId),
      };
      const logEvent = normalizeToLogEvent(rawPoint, 'otlp-metric');
      expect(logEvent).not.toBeNull();
      accumulator.processEvent(logEvent!);
    }

    // Drain and persist samples
    persistDrainedSamples(accumulator);

    // Upsert session worker summary from accumulator snapshot
    const snap = accumulator.getSnapshot(workerId);
    expect(snap).not.toBeNull();
    store.upsertSessionWorkerSummary({
      workerId,
      tokensIn: snap!.tokensIn,
      tokensOut: snap!.tokensOut,
      costUsd: snap!.costUsd,
      beadsCompleted: snap!.beadsCompleted,
      beadsFailed: snap!.beadsFailed,
      errors: snap!.errors,
      metricsSource: 'otlp-metric',
    });

    // Verify metric_samples were persisted (one row per instrument above)
    const metricRows = store.getMetricSamples({ workerId });
    expect(metricRows).toHaveLength(7);
    expect(metricRows.every(r => r.source === 'otlp-metric')).toBe(true);

    // Verify session_worker_summaries were persisted with correct values
    const summaries = store.getSessionWorkerSummaries({ sessionId });
    expect(summaries).toHaveLength(1);
    const summary = summaries[0];
    expect(summary.worker_id).toBe('needle-alpha');
    expect(summary.tokens_in).toBe(1500);
    expect(summary.tokens_out).toBe(600);
    expect(summary.cost_usd).toBeCloseTo(0.084);
    expect(summary.beads_completed).toBe(2);
    expect(summary.beads_failed).toBe(1);
    expect(summary.errors).toBe(1);
    expect(summary.metrics_source).toBe('otlp-metric');

    // Close the session with metric-sourced totals. Nothing is asserted here
    // about getAggregatedAnalytics; the source-preference behaviour is covered
    // by the mixed-source test further down in this suite.
    store.endSession({
      workerCount: 1,
      taskCount: 3,
      totalCost: 0.084,
      totalTokens: 2100,
      metricsSource: 'otlp-metric',
    });
  });

  it('should handle histogram metric data points via sum field', () => {
    const sessionId = 'histogram-sess';
    const workerId = 'needle-beta';
    store.startSession(sessionId);

    // Simulate a histogram data point — no asDouble/asInt, only sum/count
    const rawPoint = {
      name: INSTRUMENT_NAMES.BEAD_DURATION,
      timeUnixNano: String(Date.now() * 1_000_000),
      sum: 12345,
      count: '1',
      attributes: identityAttributes(workerId, sessionId),
    };
    const logEvent = normalizeToLogEvent(rawPoint, 'otlp-metric');
    expect(logEvent).not.toBeNull();
    expect(logEvent!.value).toBe(12345);

    const accumulator = new MetricAccumulator();
    accumulator.processEvent(logEvent!);
    const snap = accumulator.getSnapshot(workerId);
    expect(snap).not.toBeNull();
    expect(snap!.durations).toEqual([12345]);

    // Persist
    persistDrainedSamples(accumulator);

    const metricRows = store.getMetricSamples({ metricName: INSTRUMENT_NAMES.BEAD_DURATION });
    expect(metricRows).toHaveLength(1);
    expect(metricRows[0].value).toBe(12345);
    expect(metricRows[0].source).toBe('otlp-metric');

    store.endSession({ workerCount: 1, taskCount: 0, totalCost: 0, totalTokens: 0 });
  });

  it('should prefer otlp-metric summaries over log-derived task_metrics in aggregated analytics', () => {
    const now = Date.now();

    // Session with both log-derived task_metrics AND otlp-metric summaries
    store.startSession('mixed-sess');

    // Log-derived task (coarse estimates); its numbers deliberately differ
    // from the summary below so the assertions can tell the sources apart.
    store.recordTask({
      workerId: 'needle-gamma',
      taskType: 'bead',
      startedAt: now - 10000,
      endedAt: now - 5000,
      cost: 0.1,
      tokensIn: 700,
      tokensOut: 300,
      success: true,
      retryCount: 0,
    });

    // OTLP-metric summary (authoritative)
    store.upsertSessionWorkerSummary({
      workerId: 'needle-gamma',
      tokensIn: 1200,
      tokensOut: 450,
      costUsd: 0.092,
      beadsCompleted: 5,
      beadsFailed: 0,
      errors: 0,
      metricsSource: 'otlp-metric',
    });
    store.endSession({
      workerCount: 1,
      taskCount: 5,
      totalCost: 0.092,
      totalTokens: 1650,
      metricsSource: 'otlp-metric',
    });

    // Query aggregated analytics — metric-sourced data should win
    const analytics = store.getAggregatedAnalytics({
      startTime: now - 60000,
      endTime: now + 60000,
    });

    // The worker should appear with metric-sourced values
    const gammaPerf = analytics.topPerformers.find(p => p.workerId === 'needle-gamma');
    expect(gammaPerf).toBeDefined();
    // OTLP metric data: 5 completed, 0.092 cost, 1650 tokens
    expect(gammaPerf!.beadsCompleted).toBe(5);
    expect(gammaPerf!.totalCostUsd).toBeCloseTo(0.092);
    expect(gammaPerf!.totalTokens).toBe(1650);
  });
});
});

View file

@ -903,6 +903,26 @@ describe('normalize otlp-metric source', () => {
expect(result!.data.value).toBe(0.05);
});
it('extracts value from histogram data point (sum field)', () => {
  // A histogram-shaped point: it exposes sum/count/min/max and has no
  // asDouble/asInt field, so the value must be taken from `sum`.
  const workerAttr = { key: 'worker_id', value: { stringValue: 'tcb-alpha' } };
  const sessionAttr = { key: 'session_id', value: { stringValue: 'sess-1' } };
  const histogramPoint = {
    name: 'needle.bead.duration',
    timeUnixNano: '1772641054008000000',
    sum: 5230,
    count: '1',
    min: 5230,
    max: 5230,
    attributes: [workerAttr, sessionAttr],
  };

  const event = normalize(histogramPoint, 'otlp-metric');

  expect(event).not.toBeNull();
  const { event_type: eventType, data, worker_id: workerId } = event!;
  expect(eventType).toBe('metric.needle.bead.duration');
  expect(data.value).toBe(5230);
  expect(workerId).toBe('tcb-alpha');
});
it('defaults event_type to metric.unknown when name is missing', () => {
const metric = {
timeUnixNano: '1772641054008000000',

View file

@ -625,6 +625,8 @@ function normalizeOtlpMetric(raw: unknown): NeedleEvent | null {
if (metricPoint.asDouble !== undefined) data.value = metricPoint.asDouble;
else if (metricPoint.asInt !== undefined) data.value = metricPoint.asInt;
else if (typeof metricPoint.value === 'number') data.value = metricPoint.value;
// Histogram data points carry sum/count instead of asDouble/asInt
else if (typeof metricPoint.sum === 'number') data.value = metricPoint.sum;
const ne: NeedleEvent = {
timestamp,

View file

@ -4,6 +4,7 @@
import { describe, it, expect, beforeEach } from 'vitest';
import { WorkerAnalytics } from './workerAnalytics.js';
import { MetricAccumulator, INSTRUMENT_NAMES } from './workerAnalytics.js';
import { LogEvent } from './types.js';
import { CostTracker } from './tui/utils/costTracking.js';
@ -572,3 +573,164 @@ describe('WorkerAnalytics', () => {
});
});
});
/**
 * Unit tests for MetricAccumulator: per-worker accumulation of each
 * instrument, the drainable sample buffer, metric-presence tracking, and reset.
 */
describe('MetricAccumulator', () => {
  const baseTime = Date.now();
  let accumulator: MetricAccumulator;

  // Every test starts from a fresh, empty accumulator.
  beforeEach(() => {
    accumulator = new MetricAccumulator();
  });

  /** Builds a metric-shaped LogEvent; `ts` falls back to the suite's base time. */
  const makeMetricEvent = (worker: string, metricName: string, value: number, ts?: number): LogEvent => ({
    ts: ts ?? baseTime,
    worker,
    level: 'info',
    msg: `metric.${metricName}`,
    metric_name: metricName,
    value,
  });

  /** Feeds each [metricName, value] pair for `worker` into the accumulator, in order. */
  const feed = (worker: string, ...points: Array<[string, number]>): void => {
    for (const [metricName, value] of points) {
      accumulator.processEvent(makeMetricEvent(worker, metricName, value));
    }
  };

  it('accumulates token-in counts', () => {
    feed('w-1', [INSTRUMENT_NAMES.TOKENS_IN, 100], [INSTRUMENT_NAMES.TOKENS_IN, 50]);

    const snapshot = accumulator.getSnapshot('w-1');
    expect(snapshot).not.toBeNull();
    expect(snapshot!.tokensIn).toBe(150);
  });

  it('accumulates token-out counts', () => {
    feed('w-1', [INSTRUMENT_NAMES.TOKENS_OUT, 200], [INSTRUMENT_NAMES.TOKENS_OUT, 75]);

    const snapshot = accumulator.getSnapshot('w-1');
    expect(snapshot!.tokensOut).toBe(275);
  });

  it('accumulates cost USD', () => {
    feed('w-1', [INSTRUMENT_NAMES.COST_USD, 0.05], [INSTRUMENT_NAMES.COST_USD, 0.03]);

    const snapshot = accumulator.getSnapshot('w-1');
    expect(snapshot!.costUsd).toBeCloseTo(0.08);
  });

  it('tracks bead completions', () => {
    feed(
      'w-1',
      [INSTRUMENT_NAMES.BEAD_COMPLETED, 1],
      [INSTRUMENT_NAMES.BEAD_COMPLETED, 1],
      [INSTRUMENT_NAMES.BEAD_COMPLETED, 1],
    );

    const snapshot = accumulator.getSnapshot('w-1');
    expect(snapshot!.beadsCompleted).toBe(3);
  });

  it('tracks bead failures', () => {
    feed('w-1', [INSTRUMENT_NAMES.BEAD_FAILED, 1]);

    const snapshot = accumulator.getSnapshot('w-1');
    expect(snapshot!.beadsFailed).toBe(1);
  });

  it('tracks worker errors', () => {
    feed('w-1', [INSTRUMENT_NAMES.WORKER_ERRORS, 2]);

    const snapshot = accumulator.getSnapshot('w-1');
    expect(snapshot!.errors).toBe(2);
  });

  it('records bead duration samples', () => {
    feed('w-1', [INSTRUMENT_NAMES.BEAD_DURATION, 5000], [INSTRUMENT_NAMES.BEAD_DURATION, 3200]);

    const snapshot = accumulator.getSnapshot('w-1');
    expect(snapshot!.durations).toEqual([5000, 3200]);
  });

  it('isolates workers', () => {
    feed('w-1', [INSTRUMENT_NAMES.TOKENS_IN, 100]);
    feed('w-2', [INSTRUMENT_NAMES.TOKENS_IN, 200]);

    expect(accumulator.getSnapshot('w-1')!.tokensIn).toBe(100);
    expect(accumulator.getSnapshot('w-2')!.tokensIn).toBe(200);
  });

  it('drains samples and clears buffer', () => {
    feed('w-1', [INSTRUMENT_NAMES.TOKENS_IN, 100]);
    feed('w-2', [INSTRUMENT_NAMES.COST_USD, 0.05]);

    const drained = accumulator.drainSamples();
    expect(drained).toHaveLength(2);
    // Samples come back in the order the events were processed.
    expect(drained.map(sample => sample.metricName)).toEqual([
      INSTRUMENT_NAMES.TOKENS_IN,
      INSTRUMENT_NAMES.COST_USD,
    ]);

    // Second drain should be empty
    expect(accumulator.drainSamples()).toHaveLength(0);
  });

  it('hasMetricData returns false before any metric event', () => {
    expect(accumulator.hasMetricData()).toBe(false);
  });

  it('hasMetricData returns true after processing a metric', () => {
    feed('w-1', [INSTRUMENT_NAMES.TOKENS_IN, 100]);

    expect(accumulator.hasMetricData()).toBe(true);
  });

  it('ignores non-metric events', () => {
    // A plain log event: no metric_name and no value fields.
    const plainLogEvent: LogEvent = {
      ts: baseTime,
      worker: 'w-1',
      level: 'info',
      msg: 'bead.completed',
    };

    accumulator.processEvent(plainLogEvent);

    expect(accumulator.hasMetricData()).toBe(false);
    expect(accumulator.getSnapshot('w-1')).toBeNull();
  });

  it('resets all state', () => {
    feed('w-1', [INSTRUMENT_NAMES.TOKENS_IN, 100]);

    accumulator.reset();

    expect(accumulator.hasMetricData()).toBe(false);
    expect(accumulator.getSnapshot('w-1')).toBeNull();
    expect(accumulator.drainSamples()).toHaveLength(0);
  });

  it('returns full snapshot with all fields', () => {
    feed(
      'w-1',
      [INSTRUMENT_NAMES.TOKENS_IN, 1000],
      [INSTRUMENT_NAMES.TOKENS_OUT, 500],
      [INSTRUMENT_NAMES.COST_USD, 0.12],
      [INSTRUMENT_NAMES.BEAD_COMPLETED, 3],
      [INSTRUMENT_NAMES.BEAD_FAILED, 1],
      [INSTRUMENT_NAMES.WORKER_ERRORS, 2],
      [INSTRUMENT_NAMES.BEAD_DURATION, 4000],
    );

    const expectedSnapshot = {
      tokensIn: 1000,
      tokensOut: 500,
      costUsd: 0.12,
      beadsCompleted: 3,
      beadsFailed: 1,
      errors: 2,
      durations: [4000],
    };
    expect(accumulator.getSnapshot('w-1')).toEqual(expectedSnapshot);
  });

  it('uses metric_value fallback when value is missing', () => {
    // Built by hand (not via makeMetricEvent) so that `value` is absent and
    // only `metric_value` carries the number.
    const fallbackEvent: LogEvent = {
      ts: baseTime,
      worker: 'w-1',
      level: 'info',
      msg: `metric.${INSTRUMENT_NAMES.TOKENS_IN}`,
      metric_name: INSTRUMENT_NAMES.TOKENS_IN,
      metric_value: 42,
    };

    accumulator.processEvent(fallbackEvent);

    expect(accumulator.getSnapshot('w-1')!.tokensIn).toBe(42);
  });
});