diff --git a/docs/schema.md b/docs/schema.md index d12ae51..aa5086e 100644 --- a/docs/schema.md +++ b/docs/schema.md @@ -210,6 +210,17 @@ estimates when both are present for the same worker + session. | `needle.bead.failed` | Sum | count | `session_worker_summaries.beads_failed` | | `needle.worker.errors` | Sum | count | `session_worker_summaries.errors` | +### Accepted Aliases + +NEEDLE's telemetry module (`OtlpMetricSink`) emits some instruments under +slightly different names. FABRIC resolves these to the canonical names above +via the `INSTRUMENT_ALIASES` map in `src/workerAnalytics.ts`: + +| NEEDLE Emitted Name | Canonical Name | +|---|---| +| `needle.worker.beads.completed` | `needle.bead.completed` | +| `needle.worker.beads.failed` | `needle.bead.failed` | + ### Attribute Requirements Every metric data point **must** carry these OTLP attributes (namespaced form diff --git a/src/historicalStore.test.ts b/src/historicalStore.test.ts index 185db9a..1e7d7d5 100644 --- a/src/historicalStore.test.ts +++ b/src/historicalStore.test.ts @@ -703,5 +703,61 @@ describe('HistoricalStore', () => { store.endSession({ workerCount: 1, taskCount: 0, totalCost: 0, totalTokens: 800, metricsSource: 'otlp-metric' }); }); + + it('should protect otlp-metric rows from log-derived overwrites', () => { + store.startSession('priority-sess'); + + // Write the authoritative metric-sourced row first + store.upsertSessionWorkerSummary({ + workerId: 'needle-zeta', + tokensIn: 2000, + tokensOut: 800, + costUsd: 0.12, + beadsCompleted: 6, + beadsFailed: 0, + errors: 0, + metricsSource: 'otlp-metric', + }); + + // Attempt to overwrite with lower-priority log-derived data + store.upsertSessionWorkerSummary({ + workerId: 'needle-zeta', + tokensIn: 500, + tokensOut: 200, + costUsd: 0.05, + beadsCompleted: 2, + beadsFailed: 1, + errors: 1, + metricsSource: 'log-derived', + }); + + const summaries = store.getSessionWorkerSummaries({ sessionId: 'priority-sess' }); + expect(summaries).toHaveLength(1); 
+ const s = summaries[0]; + // Metric-sourced values must survive the log-derived overwrite + expect(s.metricsSource).toBe('otlp-metric'); + expect(s.tokensIn).toBe(2000); + expect(s.tokensOut).toBe(800); + expect(s.costUsd).toBeCloseTo(0.12); + expect(s.beadsCompleted).toBe(6); + + // A second metric-sourced write should still update (equal priority) + store.upsertSessionWorkerSummary({ + workerId: 'needle-zeta', + tokensIn: 2500, + tokensOut: 1000, + costUsd: 0.15, + beadsCompleted: 8, + beadsFailed: 0, + errors: 0, + metricsSource: 'otlp-metric', + }); + + const updated = store.getSessionWorkerSummaries({ sessionId: 'priority-sess' }); + expect(updated[0].tokensIn).toBe(2500); + expect(updated[0].beadsCompleted).toBe(8); + + store.endSession({ workerCount: 1, taskCount: 8, totalCost: 0.15, totalTokens: 3500, metricsSource: 'otlp-metric' }); + }); }); }); diff --git a/src/historicalStore.ts b/src/historicalStore.ts index 1cea2a4..557cbe1 100644 --- a/src/historicalStore.ts +++ b/src/historicalStore.ts @@ -504,18 +504,23 @@ export class HistoricalStore { this.startSession(); } + // Source priority: otlp-metric (0) > otlp-span (1) > log-derived (2). + // Only overwrite existing row when the incoming source has equal or higher priority. + const incomingRank = summary.metricsSource === 'otlp-metric' ? 0 + : summary.metricsSource === 'otlp-span' ? 1 : 2; + this.db.prepare(` INSERT INTO session_worker_summaries (session_id, worker_id, tokens_in, tokens_out, cost_usd, beads_completed, beads_failed, errors, metrics_source, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
ON CONFLICT(session_id, worker_id) DO UPDATE SET - tokens_in = excluded.tokens_in, - tokens_out = excluded.tokens_out, - cost_usd = excluded.cost_usd, - beads_completed = excluded.beads_completed, - beads_failed = excluded.beads_failed, - errors = excluded.errors, - metrics_source = excluded.metrics_source, + tokens_in = CASE WHEN ${incomingRank} <= CASE metrics_source WHEN 'otlp-metric' THEN 0 WHEN 'otlp-span' THEN 1 ELSE 2 END THEN excluded.tokens_in ELSE tokens_in END, + tokens_out = CASE WHEN ${incomingRank} <= CASE metrics_source WHEN 'otlp-metric' THEN 0 WHEN 'otlp-span' THEN 1 ELSE 2 END THEN excluded.tokens_out ELSE tokens_out END, + cost_usd = CASE WHEN ${incomingRank} <= CASE metrics_source WHEN 'otlp-metric' THEN 0 WHEN 'otlp-span' THEN 1 ELSE 2 END THEN excluded.cost_usd ELSE cost_usd END, + beads_completed = CASE WHEN ${incomingRank} <= CASE metrics_source WHEN 'otlp-metric' THEN 0 WHEN 'otlp-span' THEN 1 ELSE 2 END THEN excluded.beads_completed ELSE beads_completed END, + beads_failed = CASE WHEN ${incomingRank} <= CASE metrics_source WHEN 'otlp-metric' THEN 0 WHEN 'otlp-span' THEN 1 ELSE 2 END THEN excluded.beads_failed ELSE beads_failed END, + errors = CASE WHEN ${incomingRank} <= CASE metrics_source WHEN 'otlp-metric' THEN 0 WHEN 'otlp-span' THEN 1 ELSE 2 END THEN excluded.errors ELSE errors END, + metrics_source = CASE WHEN ${incomingRank} <= CASE metrics_source WHEN 'otlp-metric' THEN 0 WHEN 'otlp-span' THEN 1 ELSE 2 END THEN excluded.metrics_source ELSE metrics_source END, updated_at = excluded.updated_at `).run( this.currentSessionId, @@ -655,7 +660,26 @@ export class HistoricalStore { query += ` ORDER BY CASE source WHEN 'otlp-metric' THEN 0 WHEN 'otlp-span' THEN 1 ELSE 2 END, timestamp DESC LIMIT ?`; params.push(options.limit || 1000); - return this.db.prepare(query).all(...params) as any[]; + const rows = this.db.prepare(query).all(...params) as Array<{ + id: number; + worker_id: string; + metric_name: string; + value: number; + timestamp: 
number; + source: string; + bead_id: string | null; + session_id: string | null; + }>; + + return rows.map(row => ({ + workerId: row.worker_id, + metricName: row.metric_name, + value: row.value, + timestamp: row.timestamp, + source: row.source, + beadId: row.bead_id, + sessionId: row.session_id, + })); } /** diff --git a/src/store.ts b/src/store.ts index 6fb36cb..d675933 100644 --- a/src/store.ts +++ b/src/store.ts @@ -284,22 +284,25 @@ export class InMemoryEventStore implements EventStore { }); } - // Upsert per-worker session summaries from the accumulator snapshots + // Upsert per-worker session summaries from the accumulator snapshots. + // Only write rows for workers that have actual OTLP metric snapshots — + // the upsert will protect any existing metric-sourced rows from + // lower-priority log-derived overwrites. const hasMetricData = accumulator.hasMetricData(); if (hasMetricData) { - // Get all workers that have metric snapshots const allWorkerMetrics = this.workerAnalytics.getAllWorkerMetrics({ timeWindow: 'all' }); for (const wm of allWorkerMetrics) { const metricSnap = accumulator.getSnapshot(wm.workerId); + if (!metricSnap) continue; this.historicalStore.upsertSessionWorkerSummary({ workerId: wm.workerId, - tokensIn: metricSnap?.tokensIn ?? Math.floor(wm.totalTokens * 0.7), - tokensOut: metricSnap?.tokensOut ?? Math.floor(wm.totalTokens * 0.3), - costUsd: metricSnap?.costUsd ?? wm.totalCostUsd, - beadsCompleted: metricSnap?.beadsCompleted ?? wm.beadsCompleted, - beadsFailed: metricSnap?.beadsFailed ?? 0, - errors: metricSnap?.errors ?? wm.errorCount, - metricsSource: metricSnap ? 
'otlp-metric' : 'log-derived', + tokensIn: metricSnap.tokensIn, + tokensOut: metricSnap.tokensOut, + costUsd: metricSnap.costUsd, + beadsCompleted: metricSnap.beadsCompleted, + beadsFailed: metricSnap.beadsFailed, + errors: metricSnap.errors, + metricsSource: 'otlp-metric', }); } } @@ -349,19 +352,34 @@ export class InMemoryEventStore implements EventStore { const workerEvents = this.events.filter(e => e.bead === beadId); const workerId = workerEvents[0]?.worker || 'unknown'; - // Get cost info for this task - const costSummary = this.workerAnalytics.getAllWorkerMetrics({ workerIds: [workerId] }); - const workerCost = costSummary[0]?.totalCostUsd || 0; - const workerTokens = costSummary[0]?.totalTokens || 0; + // Prefer OTLP metric snapshot over log-derived estimates + const accumulator = this.workerAnalytics.getMetricAccumulator(); + const metricSnap = accumulator.getSnapshot(workerId); + + let tokensIn: number; + let tokensOut: number; + let cost: number; + + if (metricSnap) { + tokensIn = metricSnap.tokensIn; + tokensOut = metricSnap.tokensOut; + cost = metricSnap.costUsd; + } else { + const costSummary = this.workerAnalytics.getAllWorkerMetrics({ workerIds: [workerId] }); + cost = costSummary[0]?.totalCostUsd || 0; + const workerTokens = costSummary[0]?.totalTokens || 0; + tokensIn = Math.floor(workerTokens * 0.7); + tokensOut = Math.floor(workerTokens * 0.3); + } this.historicalStore.recordTask({ workerId, taskType: 'bead', startedAt: startTime, endedAt: completionEvent.ts, - cost: workerCost, - tokensIn: Math.floor(workerTokens * 0.7), // Estimate - tokensOut: Math.floor(workerTokens * 0.3), // Estimate + cost, + tokensIn, + tokensOut, success: completionEvent.level !== 'error', retryCount: 0, }); @@ -574,11 +592,25 @@ export class InMemoryEventStore implements EventStore { if (startTime) { const durationMs = event.ts - startTime; - // Get cost info for this worker - const workerMetrics = this.workerAnalytics.getWorkerMetrics(event.worker); - const cost = 
workerMetrics?.costPerBead || 0; - const tokensIn = Math.floor((workerMetrics?.totalTokens || 0) * 0.7); - const tokensOut = Math.floor((workerMetrics?.totalTokens || 0) * 0.3); + // Prefer OTLP metric snapshot over log-derived estimates + const accumulator = this.workerAnalytics.getMetricAccumulator(); + const metricSnap = accumulator.getSnapshot(event.worker); + + let tokensIn: number; + let tokensOut: number; + let cost: number; + + if (metricSnap) { + tokensIn = metricSnap.tokensIn; + tokensOut = metricSnap.tokensOut; + cost = metricSnap.costUsd; + } else { + // Fallback: log-derived estimate with 70/30 split + const workerMetrics = this.workerAnalytics.getWorkerMetrics(event.worker); + cost = workerMetrics?.costPerBead || 0; + tokensIn = Math.floor((workerMetrics?.totalTokens || 0) * 0.7); + tokensOut = Math.floor((workerMetrics?.totalTokens || 0) * 0.3); + } this.historicalStore.recordTask({ workerId: event.worker, diff --git a/src/workerAnalytics.test.ts b/src/workerAnalytics.test.ts index 2a8d069..87e3ab2 100644 --- a/src/workerAnalytics.test.ts +++ b/src/workerAnalytics.test.ts @@ -4,7 +4,7 @@ import { describe, it, expect, beforeEach } from 'vitest'; import { WorkerAnalytics } from './workerAnalytics.js'; -import { MetricAccumulator, INSTRUMENT_NAMES } from './workerAnalytics.js'; +import { MetricAccumulator, INSTRUMENT_NAMES, resolveInstrumentName } from './workerAnalytics.js'; import { LogEvent } from './types.js'; import { CostTracker } from './tui/utils/costTracking.js'; @@ -733,4 +733,44 @@ describe('MetricAccumulator', () => { accumulator.processEvent(event); expect(accumulator.getSnapshot('w-1')!.tokensIn).toBe(42); }); + + describe('alias resolution (NEEDLE naming convention)', () => { + it('resolves needle.worker.beads.completed to needle.bead.completed', () => { + accumulator.processEvent(makeMetricEvent('w-1', 'needle.worker.beads.completed', 1)); + accumulator.processEvent(makeMetricEvent('w-1', 'needle.worker.beads.completed', 1)); + + const 
snap = accumulator.getSnapshot('w-1'); + expect(snap!.beadsCompleted).toBe(2); + }); + + it('resolves needle.worker.beads.failed to needle.bead.failed', () => { + accumulator.processEvent(makeMetricEvent('w-1', 'needle.worker.beads.failed', 1)); + + const snap = accumulator.getSnapshot('w-1'); + expect(snap!.beadsFailed).toBe(1); + }); + + it('drains alias-resolved samples with canonical name', () => { + accumulator.processEvent(makeMetricEvent('w-1', 'needle.worker.beads.completed', 1)); + + const samples = accumulator.drainSamples(); + expect(samples).toHaveLength(1); + expect(samples[0].metricName).toBe('needle.bead.completed'); + }); + + it('coaccumulates alias and canonical names for the same instrument', () => { + // NEEDLE emits alias form + accumulator.processEvent(makeMetricEvent('w-1', 'needle.worker.beads.completed', 2)); + // Direct canonical form + accumulator.processEvent(makeMetricEvent('w-1', 'needle.bead.completed', 3)); + + const snap = accumulator.getSnapshot('w-1'); + expect(snap!.beadsCompleted).toBe(5); + }); + + it('passes through unknown names unchanged', () => { + expect(resolveInstrumentName('needle.worker.tokens.in')).toBe('needle.worker.tokens.in'); + expect(resolveInstrumentName('custom.metric')).toBe('custom.metric'); + }); + }); }); diff --git a/src/workerAnalytics.ts b/src/workerAnalytics.ts index 8c60aa3..9510a07 100644 --- a/src/workerAnalytics.ts +++ b/src/workerAnalytics.ts @@ -38,6 +38,21 @@ export const INSTRUMENT_NAMES = { const ALL_INSTRUMENTS = new Set(Object.values(INSTRUMENT_NAMES)); +/** + * Alias map: NEEDLE's OTLP emitter uses `needle.worker.beads.*` (plural) + * while the canonical schema uses `needle.bead.*` (singular). Both forms + * are accepted and resolved to the canonical name before accumulation. 
+ */ +const INSTRUMENT_ALIASES: Record<string, string> = { + 'needle.worker.beads.completed': INSTRUMENT_NAMES.BEAD_COMPLETED, + 'needle.worker.beads.failed': INSTRUMENT_NAMES.BEAD_FAILED, +}; + +/** Resolve an incoming metric name to its canonical instrument name. */ +export function resolveInstrumentName(name: string): string { + return INSTRUMENT_ALIASES[name] || name; +} + // ── MetricAccumulator ──────────────────────────────────────────── export interface MetricSample { @@ -85,8 +100,11 @@ export class MetricAccumulator { const msg = event.msg || ''; if (!msg.startsWith('metric.')) return; - const metricName = (event.metric_name as string) || msg.slice(7); - if (!metricName) return; + const rawName = (event.metric_name as string) || msg.slice(7); + if (!rawName) return; + + // Resolve NEEDLE's naming convention to canonical instrument name + const metricName = resolveInstrumentName(rawName); const value = typeof event.value === 'number' ? event.value : typeof event.metric_value === 'number' ? event.metric_value as number