From e74149f167de955b7c8186e3623d2a4f5cc33b63 Mon Sep 17 00:00:00 2001 From: jedarden Date: Tue, 21 Apr 2026 17:06:45 -0400 Subject: [PATCH] =?UTF-8?q?feat(bd-zci):=20map=20OTLP=20metrics=20?= =?UTF-8?q?=E2=86=92=20analytics=20DB=20instruments?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix histogram data point value extraction in normalizer — OTLP histogram points carry sum/count instead of asDouble/asInt, so needle.bead.duration was silently dropped. Add MetricAccumulator tests and end-to-end tests validating OTLP metrics flow through to session_worker_summaries in fabric.db with otlp-metric source preference. Co-Authored-By: Claude Opus 4.7 --- src/historicalStore.test.ts | 191 ++++++++++++++++++++++++++++++++++++ src/normalizer.test.ts | 20 ++++ src/normalizer.ts | 2 + src/workerAnalytics.test.ts | 162 ++++++++++++++++++++++++++++++ 4 files changed, 375 insertions(+) diff --git a/src/historicalStore.test.ts b/src/historicalStore.test.ts index a5c3e9a..4198354 100644 --- a/src/historicalStore.test.ts +++ b/src/historicalStore.test.ts @@ -14,6 +14,8 @@ import { TaskMetricsRecord, ErrorHistoryRecord, } from './historicalStore.js'; +import { MetricAccumulator, INSTRUMENT_NAMES } from './workerAnalytics.js'; +import { normalizeToLogEvent } from './normalizer.js'; // Test database path const TEST_DB_DIR = path.join(os.tmpdir(), 'fabric-test-' + Date.now()); @@ -439,4 +441,193 @@ describe('HistoricalStore', () => { defaultStore.close(); }); }); + + describe('OTLP Metric Persistence', () => { + it('should record metric samples and upsert session worker summaries from OTLP metrics', () => { + store.startSession('otlp-metric-sess'); + + const accumulator = new MetricAccumulator(); + + // Simulate OTLP metric events flowing through the normalizer → accumulator → store + const metrics = [ + { name: INSTRUMENT_NAMES.TOKENS_IN, value: 1500, worker: 'needle-alpha' }, + { name: INSTRUMENT_NAMES.TOKENS_OUT, value: 600, worker: 'needle-alpha' }, + { name: INSTRUMENT_NAMES.COST_USD, value: 0.084, worker: 'needle-alpha' }, + { name: INSTRUMENT_NAMES.BEAD_COMPLETED, value: 2, worker: 'needle-alpha' }, + { name: INSTRUMENT_NAMES.BEAD_FAILED, value: 1, worker: 'needle-alpha' }, + { name: INSTRUMENT_NAMES.WORKER_ERRORS, value: 1, worker: 'needle-alpha' }, + { name: INSTRUMENT_NAMES.BEAD_DURATION, value: 4500, worker: 'needle-alpha' }, + ]; + + for (const m of metrics) { + // Simulate what extractDataPoints + normalizer produces + const rawPoint = { + name: m.name, + timeUnixNano: String(Date.now() * 1_000_000), + asDouble: m.value, + attributes: [ + { key: 'worker_id', value: { stringValue: m.worker } }, + { key: 'session_id', value: { stringValue: 'otlp-metric-sess' } }, + ], + }; + const logEvent = normalizeToLogEvent(rawPoint, 'otlp-metric'); + expect(logEvent).not.toBeNull(); + accumulator.processEvent(logEvent!); + } + + // Drain and persist samples (mirrors store.ts flushMetricSamples) + const samples = accumulator.drainSamples(); + for (const s of samples) { + store.recordMetricSample({ + workerId: s.workerId, + metricName: s.metricName, + value: s.value, + timestamp: s.timestamp, + source: 'otlp-metric', + beadId: s.beadId, + }); + } + + // Upsert session worker summary from accumulator snapshot + const snap = accumulator.getSnapshot('needle-alpha'); + expect(snap).not.toBeNull(); + store.upsertSessionWorkerSummary({ + workerId: 'needle-alpha', + tokensIn: snap!.tokensIn, + tokensOut: snap!.tokensOut, + costUsd: snap!.costUsd, + beadsCompleted: snap!.beadsCompleted, + beadsFailed: snap!.beadsFailed, + errors: snap!.errors, + metricsSource: 'otlp-metric', + }); + + // Verify metric_samples were persisted + const metricRows = store.getMetricSamples({ workerId: 'needle-alpha' }); + expect(metricRows).toHaveLength(7); + expect(metricRows.every(r => r.source === 'otlp-metric')).toBe(true); + + // Verify session_worker_summaries were persisted with correct values + const summaries = store.getSessionWorkerSummaries({ sessionId: 'otlp-metric-sess' }); + expect(summaries).toHaveLength(1); + const summary = summaries[0]; + expect(summary.worker_id).toBe('needle-alpha'); + expect(summary.tokens_in).toBe(1500); + expect(summary.tokens_out).toBe(600); + expect(summary.cost_usd).toBeCloseTo(0.084); + expect(summary.beads_completed).toBe(2); + expect(summary.beads_failed).toBe(1); + expect(summary.errors).toBe(1); + expect(summary.metrics_source).toBe('otlp-metric'); + + // Verify getAggregatedAnalytics prefers metric-sourced data + store.endSession({ + workerCount: 1, + taskCount: 3, + totalCost: 0.084, + totalTokens: 2100, + metricsSource: 'otlp-metric', + }); + }); + + it('should handle histogram metric data points via sum field', () => { + store.startSession('histogram-sess'); + + // Simulate a histogram data point — no asDouble/asInt, only sum/count + const rawPoint = { + name: INSTRUMENT_NAMES.BEAD_DURATION, + timeUnixNano: String(Date.now() * 1_000_000), + sum: 12345, + count: '1', + attributes: [ + { key: 'worker_id', value: { stringValue: 'needle-beta' } }, + { key: 'session_id', value: { stringValue: 'histogram-sess' } }, + ], + }; + + const logEvent = normalizeToLogEvent(rawPoint, 'otlp-metric'); + expect(logEvent).not.toBeNull(); + expect(logEvent!.value).toBe(12345); + + const accumulator = new MetricAccumulator(); + accumulator.processEvent(logEvent!); + + const snap = accumulator.getSnapshot('needle-beta'); + expect(snap).not.toBeNull(); + expect(snap!.durations).toEqual([12345]); + + // Persist + const samples = accumulator.drainSamples(); + for (const s of samples) { + store.recordMetricSample({ + workerId: s.workerId, + metricName: s.metricName, + value: s.value, + timestamp: s.timestamp, + source: 'otlp-metric', + }); + } + + const metricRows = store.getMetricSamples({ metricName: INSTRUMENT_NAMES.BEAD_DURATION }); + expect(metricRows).toHaveLength(1); + expect(metricRows[0].value).toBe(12345); + expect(metricRows[0].source).toBe('otlp-metric'); + + store.endSession({ workerCount: 1, taskCount: 0, totalCost: 0, totalTokens: 0 }); + }); + + it('should prefer otlp-metric summaries over log-derived task_metrics in aggregated analytics', () => { + const now = Date.now(); + + // Session with both log-derived task_metrics AND otlp-metric summaries + store.startSession('mixed-sess'); + + // Log-derived task (coarse estimates) + store.recordTask({ + workerId: 'needle-gamma', + taskType: 'bead', + startedAt: now - 10000, + endedAt: now - 5000, + cost: 0.1, + tokensIn: 700, + tokensOut: 300, + success: true, + retryCount: 0, + }); + + // OTLP-metric summary (authoritative) + store.upsertSessionWorkerSummary({ + workerId: 'needle-gamma', + tokensIn: 1200, + tokensOut: 450, + costUsd: 0.092, + beadsCompleted: 5, + beadsFailed: 0, + errors: 0, + metricsSource: 'otlp-metric', + }); + + store.endSession({ + workerCount: 1, + taskCount: 5, + totalCost: 0.092, + totalTokens: 1650, + metricsSource: 'otlp-metric', + }); + + // Query aggregated analytics — metric-sourced data should win + const analytics = store.getAggregatedAnalytics({ + startTime: now - 60000, + endTime: now + 60000, + }); + + // The worker should appear with metric-sourced values + const gammaPerf = analytics.topPerformers.find(p => p.workerId === 'needle-gamma'); + expect(gammaPerf).toBeDefined(); + // OTLP metric data: 5 completed, 0.092 cost, 1650 tokens + expect(gammaPerf!.beadsCompleted).toBe(5); + expect(gammaPerf!.totalCostUsd).toBeCloseTo(0.092); + expect(gammaPerf!.totalTokens).toBe(1650); + }); + }); }); diff --git a/src/normalizer.test.ts b/src/normalizer.test.ts index 6533c73..7eabfde 100644 --- a/src/normalizer.test.ts +++ b/src/normalizer.test.ts @@ -903,6 +903,26 @@ describe('normalize – otlp-metric source', () => { expect(result!.data.value).toBe(0.05); }); + it('extracts value from histogram data point (sum field)', () => { + const metric = { + name: 'needle.bead.duration', + timeUnixNano: '1772641054008000000', + sum: 5230, + count: '1', + min: 5230, + max: 5230, + attributes: [ + { key: 'worker_id', value: { stringValue: 'tcb-alpha' } }, + { key: 'session_id', value: { stringValue: 'sess-1' } }, + ], + }; + const result = normalize(metric, 'otlp-metric'); + expect(result).not.toBeNull(); + expect(result!.event_type).toBe('metric.needle.bead.duration'); + expect(result!.data.value).toBe(5230); + expect(result!.worker_id).toBe('tcb-alpha'); + }); + it('defaults event_type to metric.unknown when name is missing', () => { const metric = { timeUnixNano: '1772641054008000000', diff --git a/src/normalizer.ts b/src/normalizer.ts index b5e299d..84a1987 100644 --- a/src/normalizer.ts +++ b/src/normalizer.ts @@ -625,6 +625,8 @@ function normalizeOtlpMetric(raw: unknown): NeedleEvent | null { if (metricPoint.asDouble !== undefined) data.value = metricPoint.asDouble; else if (metricPoint.asInt !== undefined) data.value = metricPoint.asInt; else if (typeof metricPoint.value === 'number') data.value = metricPoint.value; + // Histogram data points carry sum/count instead of asDouble/asInt + else if (typeof metricPoint.sum === 'number') data.value = metricPoint.sum; const ne: NeedleEvent = { timestamp, diff --git a/src/workerAnalytics.test.ts b/src/workerAnalytics.test.ts index 2584be5..2a8d069 100644 --- a/src/workerAnalytics.test.ts +++ b/src/workerAnalytics.test.ts @@ -4,6 +4,7 @@ import { describe, it, expect, beforeEach } from 'vitest'; import { WorkerAnalytics } from './workerAnalytics.js'; +import { MetricAccumulator, INSTRUMENT_NAMES } from './workerAnalytics.js'; import { LogEvent } from './types.js'; import { CostTracker } from './tui/utils/costTracking.js'; @@ -572,3 +573,164 @@ describe('WorkerAnalytics', () => { }); }); }); + +describe('MetricAccumulator', () => { + let accumulator: MetricAccumulator; + const baseTime = Date.now(); + + beforeEach(() => { + accumulator = new MetricAccumulator(); + }); + + function makeMetricEvent(worker: string, metricName: string, value: number, ts?: number): LogEvent { + return { + ts: ts ?? baseTime, + worker, + level: 'info', + msg: `metric.${metricName}`, + metric_name: metricName, + value, + }; + } + + it('accumulates token-in counts', () => { + accumulator.processEvent(makeMetricEvent('w-1', INSTRUMENT_NAMES.TOKENS_IN, 100)); + accumulator.processEvent(makeMetricEvent('w-1', INSTRUMENT_NAMES.TOKENS_IN, 50)); + + const snap = accumulator.getSnapshot('w-1'); + expect(snap).not.toBeNull(); + expect(snap!.tokensIn).toBe(150); + }); + + it('accumulates token-out counts', () => { + accumulator.processEvent(makeMetricEvent('w-1', INSTRUMENT_NAMES.TOKENS_OUT, 200)); + accumulator.processEvent(makeMetricEvent('w-1', INSTRUMENT_NAMES.TOKENS_OUT, 75)); + + const snap = accumulator.getSnapshot('w-1'); + expect(snap!.tokensOut).toBe(275); + }); + + it('accumulates cost USD', () => { + accumulator.processEvent(makeMetricEvent('w-1', INSTRUMENT_NAMES.COST_USD, 0.05)); + accumulator.processEvent(makeMetricEvent('w-1', INSTRUMENT_NAMES.COST_USD, 0.03)); + + const snap = accumulator.getSnapshot('w-1'); + expect(snap!.costUsd).toBeCloseTo(0.08); + }); + + it('tracks bead completions', () => { + accumulator.processEvent(makeMetricEvent('w-1', INSTRUMENT_NAMES.BEAD_COMPLETED, 1)); + accumulator.processEvent(makeMetricEvent('w-1', INSTRUMENT_NAMES.BEAD_COMPLETED, 1)); + accumulator.processEvent(makeMetricEvent('w-1', INSTRUMENT_NAMES.BEAD_COMPLETED, 1)); + + const snap = accumulator.getSnapshot('w-1'); + expect(snap!.beadsCompleted).toBe(3); + }); + + it('tracks bead failures', () => { + accumulator.processEvent(makeMetricEvent('w-1', INSTRUMENT_NAMES.BEAD_FAILED, 1)); + + const snap = accumulator.getSnapshot('w-1'); + expect(snap!.beadsFailed).toBe(1); + }); + + it('tracks worker errors', () => { + accumulator.processEvent(makeMetricEvent('w-1', INSTRUMENT_NAMES.WORKER_ERRORS, 2)); + + const snap = accumulator.getSnapshot('w-1'); + expect(snap!.errors).toBe(2); + }); + + it('records bead duration samples', () => { + accumulator.processEvent(makeMetricEvent('w-1', INSTRUMENT_NAMES.BEAD_DURATION, 5000)); + accumulator.processEvent(makeMetricEvent('w-1', INSTRUMENT_NAMES.BEAD_DURATION, 3200)); + + const snap = accumulator.getSnapshot('w-1'); + expect(snap!.durations).toEqual([5000, 3200]); + }); + + it('isolates workers', () => { + accumulator.processEvent(makeMetricEvent('w-1', INSTRUMENT_NAMES.TOKENS_IN, 100)); + accumulator.processEvent(makeMetricEvent('w-2', INSTRUMENT_NAMES.TOKENS_IN, 200)); + + expect(accumulator.getSnapshot('w-1')!.tokensIn).toBe(100); + expect(accumulator.getSnapshot('w-2')!.tokensIn).toBe(200); + }); + + it('drains samples and clears buffer', () => { + accumulator.processEvent(makeMetricEvent('w-1', INSTRUMENT_NAMES.TOKENS_IN, 100)); + accumulator.processEvent(makeMetricEvent('w-2', INSTRUMENT_NAMES.COST_USD, 0.05)); + + const samples = accumulator.drainSamples(); + expect(samples).toHaveLength(2); + expect(samples[0].metricName).toBe(INSTRUMENT_NAMES.TOKENS_IN); + expect(samples[1].metricName).toBe(INSTRUMENT_NAMES.COST_USD); + + // Second drain should be empty + expect(accumulator.drainSamples()).toHaveLength(0); + }); + + it('hasMetricData returns false before any metric event', () => { + expect(accumulator.hasMetricData()).toBe(false); + }); + + it('hasMetricData returns true after processing a metric', () => { + accumulator.processEvent(makeMetricEvent('w-1', INSTRUMENT_NAMES.TOKENS_IN, 100)); + expect(accumulator.hasMetricData()).toBe(true); + }); + + it('ignores non-metric events', () => { + const event: LogEvent = { + ts: baseTime, + worker: 'w-1', + level: 'info', + msg: 'bead.completed', + }; + accumulator.processEvent(event); + expect(accumulator.hasMetricData()).toBe(false); + expect(accumulator.getSnapshot('w-1')).toBeNull(); + }); + + it('resets all state', () => { + accumulator.processEvent(makeMetricEvent('w-1', INSTRUMENT_NAMES.TOKENS_IN, 100)); + accumulator.reset(); + + expect(accumulator.hasMetricData()).toBe(false); + expect(accumulator.getSnapshot('w-1')).toBeNull(); + expect(accumulator.drainSamples()).toHaveLength(0); + }); + + it('returns full snapshot with all fields', () => { + accumulator.processEvent(makeMetricEvent('w-1', INSTRUMENT_NAMES.TOKENS_IN, 1000)); + accumulator.processEvent(makeMetricEvent('w-1', INSTRUMENT_NAMES.TOKENS_OUT, 500)); + accumulator.processEvent(makeMetricEvent('w-1', INSTRUMENT_NAMES.COST_USD, 0.12)); + accumulator.processEvent(makeMetricEvent('w-1', INSTRUMENT_NAMES.BEAD_COMPLETED, 3)); + accumulator.processEvent(makeMetricEvent('w-1', INSTRUMENT_NAMES.BEAD_FAILED, 1)); + accumulator.processEvent(makeMetricEvent('w-1', INSTRUMENT_NAMES.WORKER_ERRORS, 2)); + accumulator.processEvent(makeMetricEvent('w-1', INSTRUMENT_NAMES.BEAD_DURATION, 4000)); + + const snap = accumulator.getSnapshot('w-1'); + expect(snap).toEqual({ + tokensIn: 1000, + tokensOut: 500, + costUsd: 0.12, + beadsCompleted: 3, + beadsFailed: 1, + errors: 2, + durations: [4000], + }); + }); + + it('uses metric_value fallback when value is missing', () => { + const event: LogEvent = { + ts: baseTime, + worker: 'w-1', + level: 'info', + msg: `metric.${INSTRUMENT_NAMES.TOKENS_IN}`, + metric_name: INSTRUMENT_NAMES.TOKENS_IN, + metric_value: 42, + }; + accumulator.processEvent(event); + expect(accumulator.getSnapshot('w-1')!.tokensIn).toBe(42); + }); +});