feat(bd-zci): add instrument alias resolution and source-priority upserts

- Add INSTRUMENT_ALIASES map resolving NEEDLE's plural naming
  (needle.worker.beads.*) to canonical singular (needle.bead.*)
- Source-priority SQL in upsertSessionWorkerSummary: otlp-metric rows
  survive lower-priority log-derived overwrites via CASE expressions
- Prefer OTLP metric snapshots over log-derived estimates in task
  recording (flushMetricSamples + persistSession)
- Document accepted aliases in docs/schema.md
- Add tests for alias resolution, coaccumulation, and priority protection

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-04-21 18:16:39 -04:00
parent f661430fce
commit af1560fba1
6 changed files with 213 additions and 32 deletions

View file

@ -210,6 +210,17 @@ estimates when both are present for the same worker + session.
| `needle.bead.failed` | Sum | count | `session_worker_summaries.beads_failed` |
| `needle.worker.errors` | Sum | count | `session_worker_summaries.errors` |
### Accepted Aliases
NEEDLE's telemetry module (`OtlpMetricSink`) emits some instruments under
slightly different names. FABRIC resolves these to the canonical names above
via the `INSTRUMENT_ALIASES` map in `src/workerAnalytics.ts`:
| NEEDLE Emitted Name | Canonical Name |
|---|---|
| `needle.worker.beads.completed` | `needle.bead.completed` |
| `needle.worker.beads.failed` | `needle.bead.failed` |
### Attribute Requirements
Every metric data point **must** carry these OTLP attributes (namespaced form

View file

@ -703,5 +703,61 @@ describe('HistoricalStore', () => {
store.endSession({ workerCount: 1, taskCount: 0, totalCost: 0, totalTokens: 800, metricsSource: 'otlp-metric' });
});
it('should protect otlp-metric rows from log-derived overwrites', () => {
store.startSession('priority-sess');
// Write the authoritative metric-sourced row first
store.upsertSessionWorkerSummary({
workerId: 'needle-zeta',
tokensIn: 2000,
tokensOut: 800,
costUsd: 0.12,
beadsCompleted: 6,
beadsFailed: 0,
errors: 0,
metricsSource: 'otlp-metric',
});
// Attempt to overwrite with lower-priority log-derived data
store.upsertSessionWorkerSummary({
workerId: 'needle-zeta',
tokensIn: 500,
tokensOut: 200,
costUsd: 0.05,
beadsCompleted: 2,
beadsFailed: 1,
errors: 1,
metricsSource: 'log-derived',
});
const summaries = store.getSessionWorkerSummaries({ sessionId: 'priority-sess' });
expect(summaries).toHaveLength(1);
const s = summaries[0];
// Metric-sourced values must survive the log-derived overwrite
expect(s.metricsSource).toBe('otlp-metric');
expect(s.tokensIn).toBe(2000);
expect(s.tokensOut).toBe(800);
expect(s.costUsd).toBeCloseTo(0.12);
expect(s.beadsCompleted).toBe(6);
// A second metric-sourced write should still update (equal priority)
store.upsertSessionWorkerSummary({
workerId: 'needle-zeta',
tokensIn: 2500,
tokensOut: 1000,
costUsd: 0.15,
beadsCompleted: 8,
beadsFailed: 0,
errors: 0,
metricsSource: 'otlp-metric',
});
const updated = store.getSessionWorkerSummaries({ sessionId: 'priority-sess' });
expect(updated[0].tokensIn).toBe(2500);
expect(updated[0].beadsCompleted).toBe(8);
store.endSession({ workerCount: 1, taskCount: 8, totalCost: 0.15, totalTokens: 3500, metricsSource: 'otlp-metric' });
});
});
});

View file

@ -504,18 +504,23 @@ export class HistoricalStore {
this.startSession();
}
// Source priority: otlp-metric (0) > otlp-span (1) > log-derived (2).
// Only overwrite existing row when the incoming source has equal or higher priority.
const incomingRank = summary.metricsSource === 'otlp-metric' ? 0
: summary.metricsSource === 'otlp-span' ? 1 : 2;
this.db.prepare(`
INSERT INTO session_worker_summaries
(session_id, worker_id, tokens_in, tokens_out, cost_usd, beads_completed, beads_failed, errors, metrics_source, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(session_id, worker_id) DO UPDATE SET
tokens_in = excluded.tokens_in,
tokens_out = excluded.tokens_out,
cost_usd = excluded.cost_usd,
beads_completed = excluded.beads_completed,
beads_failed = excluded.beads_failed,
errors = excluded.errors,
metrics_source = excluded.metrics_source,
tokens_in = CASE WHEN ${incomingRank} <= CASE metrics_source WHEN 'otlp-metric' THEN 0 WHEN 'otlp-span' THEN 1 ELSE 2 END THEN excluded.tokens_in ELSE tokens_in END,
tokens_out = CASE WHEN ${incomingRank} <= CASE metrics_source WHEN 'otlp-metric' THEN 0 WHEN 'otlp-span' THEN 1 ELSE 2 END THEN excluded.tokens_out ELSE tokens_out END,
cost_usd = CASE WHEN ${incomingRank} <= CASE metrics_source WHEN 'otlp-metric' THEN 0 WHEN 'otlp-span' THEN 1 ELSE 2 END THEN excluded.cost_usd ELSE cost_usd END,
beads_completed = CASE WHEN ${incomingRank} <= CASE metrics_source WHEN 'otlp-metric' THEN 0 WHEN 'otlp-span' THEN 1 ELSE 2 END THEN excluded.beads_completed ELSE beads_completed END,
beads_failed = CASE WHEN ${incomingRank} <= CASE metrics_source WHEN 'otlp-metric' THEN 0 WHEN 'otlp-span' THEN 1 ELSE 2 END THEN excluded.beads_failed ELSE beads_failed END,
errors = CASE WHEN ${incomingRank} <= CASE metrics_source WHEN 'otlp-metric' THEN 0 WHEN 'otlp-span' THEN 1 ELSE 2 END THEN excluded.errors ELSE errors END,
metrics_source = CASE WHEN ${incomingRank} <= CASE metrics_source WHEN 'otlp-metric' THEN 0 WHEN 'otlp-span' THEN 1 ELSE 2 END THEN excluded.metrics_source ELSE metrics_source END,
updated_at = excluded.updated_at
`).run(
this.currentSessionId,
@ -655,7 +660,26 @@ export class HistoricalStore {
query += ` ORDER BY CASE source WHEN 'otlp-metric' THEN 0 WHEN 'otlp-span' THEN 1 ELSE 2 END, timestamp DESC LIMIT ?`;
params.push(options.limit || 1000);
return this.db.prepare(query).all(...params) as any[];
const rows = this.db.prepare(query).all(...params) as Array<{
id: number;
worker_id: string;
metric_name: string;
value: number;
timestamp: number;
source: string;
bead_id: string | null;
session_id: string | null;
}>;
return rows.map(row => ({
workerId: row.worker_id,
metricName: row.metric_name,
value: row.value,
timestamp: row.timestamp,
source: row.source,
beadId: row.bead_id,
sessionId: row.session_id,
}));
}
/**

View file

@ -284,22 +284,25 @@ export class InMemoryEventStore implements EventStore {
});
}
// Upsert per-worker session summaries from the accumulator snapshots
// Upsert per-worker session summaries from the accumulator snapshots.
// Only write rows for workers that have actual OTLP metric snapshots —
// the upsert will protect any existing metric-sourced rows from
// lower-priority log-derived overwrites.
const hasMetricData = accumulator.hasMetricData();
if (hasMetricData) {
// Get all workers that have metric snapshots
const allWorkerMetrics = this.workerAnalytics.getAllWorkerMetrics({ timeWindow: 'all' });
for (const wm of allWorkerMetrics) {
const metricSnap = accumulator.getSnapshot(wm.workerId);
if (!metricSnap) continue;
this.historicalStore.upsertSessionWorkerSummary({
workerId: wm.workerId,
tokensIn: metricSnap?.tokensIn ?? Math.floor(wm.totalTokens * 0.7),
tokensOut: metricSnap?.tokensOut ?? Math.floor(wm.totalTokens * 0.3),
costUsd: metricSnap?.costUsd ?? wm.totalCostUsd,
beadsCompleted: metricSnap?.beadsCompleted ?? wm.beadsCompleted,
beadsFailed: metricSnap?.beadsFailed ?? 0,
errors: metricSnap?.errors ?? wm.errorCount,
metricsSource: metricSnap ? 'otlp-metric' : 'log-derived',
tokensIn: metricSnap.tokensIn,
tokensOut: metricSnap.tokensOut,
costUsd: metricSnap.costUsd,
beadsCompleted: metricSnap.beadsCompleted,
beadsFailed: metricSnap.beadsFailed,
errors: metricSnap.errors,
metricsSource: 'otlp-metric',
});
}
}
@ -349,19 +352,34 @@ export class InMemoryEventStore implements EventStore {
const workerEvents = this.events.filter(e => e.bead === beadId);
const workerId = workerEvents[0]?.worker || 'unknown';
// Get cost info for this task
const costSummary = this.workerAnalytics.getAllWorkerMetrics({ workerIds: [workerId] });
const workerCost = costSummary[0]?.totalCostUsd || 0;
const workerTokens = costSummary[0]?.totalTokens || 0;
// Prefer OTLP metric snapshot over log-derived estimates
const accumulator = this.workerAnalytics.getMetricAccumulator();
const metricSnap = accumulator.getSnapshot(workerId);
let tokensIn: number;
let tokensOut: number;
let cost: number;
if (metricSnap) {
tokensIn = metricSnap.tokensIn;
tokensOut = metricSnap.tokensOut;
cost = metricSnap.costUsd;
} else {
const costSummary = this.workerAnalytics.getAllWorkerMetrics({ workerIds: [workerId] });
cost = costSummary[0]?.totalCostUsd || 0;
const workerTokens = costSummary[0]?.totalTokens || 0;
tokensIn = Math.floor(workerTokens * 0.7);
tokensOut = Math.floor(workerTokens * 0.3);
}
this.historicalStore.recordTask({
workerId,
taskType: 'bead',
startedAt: startTime,
endedAt: completionEvent.ts,
cost: workerCost,
tokensIn: Math.floor(workerTokens * 0.7), // Estimate
tokensOut: Math.floor(workerTokens * 0.3), // Estimate
cost,
tokensIn,
tokensOut,
success: completionEvent.level !== 'error',
retryCount: 0,
});
@ -574,11 +592,25 @@ export class InMemoryEventStore implements EventStore {
if (startTime) {
const durationMs = event.ts - startTime;
// Get cost info for this worker
const workerMetrics = this.workerAnalytics.getWorkerMetrics(event.worker);
const cost = workerMetrics?.costPerBead || 0;
const tokensIn = Math.floor((workerMetrics?.totalTokens || 0) * 0.7);
const tokensOut = Math.floor((workerMetrics?.totalTokens || 0) * 0.3);
// Prefer OTLP metric snapshot over log-derived estimates
const accumulator = this.workerAnalytics.getMetricAccumulator();
const metricSnap = accumulator.getSnapshot(event.worker);
let tokensIn: number;
let tokensOut: number;
let cost: number;
if (metricSnap) {
tokensIn = metricSnap.tokensIn;
tokensOut = metricSnap.tokensOut;
cost = metricSnap.costUsd;
} else {
// Fallback: log-derived estimate with 70/30 split
const workerMetrics = this.workerAnalytics.getWorkerMetrics(event.worker);
cost = workerMetrics?.costPerBead || 0;
tokensIn = Math.floor((workerMetrics?.totalTokens || 0) * 0.7);
tokensOut = Math.floor((workerMetrics?.totalTokens || 0) * 0.3);
}
this.historicalStore.recordTask({
workerId: event.worker,

View file

@ -4,7 +4,7 @@
import { describe, it, expect, beforeEach } from 'vitest';
import { WorkerAnalytics } from './workerAnalytics.js';
import { MetricAccumulator, INSTRUMENT_NAMES } from './workerAnalytics.js';
import { MetricAccumulator, INSTRUMENT_NAMES, resolveInstrumentName } from './workerAnalytics.js';
import { LogEvent } from './types.js';
import { CostTracker } from './tui/utils/costTracking.js';
@ -733,4 +733,44 @@ describe('MetricAccumulator', () => {
accumulator.processEvent(event);
expect(accumulator.getSnapshot('w-1')!.tokensIn).toBe(42);
});
describe('alias resolution (NEEDLE naming convention)', () => {
it('resolves needle.worker.beads.completed to needle.bead.completed', () => {
accumulator.processEvent(makeMetricEvent('w-1', 'needle.worker.beads.completed', 1));
accumulator.processEvent(makeMetricEvent('w-1', 'needle.worker.beads.completed', 1));
const snap = accumulator.getSnapshot('w-1');
expect(snap!.beadsCompleted).toBe(2);
});
it('resolves needle.worker.beads.failed to needle.bead.failed', () => {
accumulator.processEvent(makeMetricEvent('w-1', 'needle.worker.beads.failed', 1));
const snap = accumulator.getSnapshot('w-1');
expect(snap!.beadsFailed).toBe(1);
});
it('drains alias-resolved samples with canonical name', () => {
accumulator.processEvent(makeMetricEvent('w-1', 'needle.worker.beads.completed', 1));
const samples = accumulator.drainSamples();
expect(samples).toHaveLength(1);
expect(samples[0].metricName).toBe('needle.bead.completed');
});
it('coaccumulates alias and canonical names for the same instrument', () => {
// NEEDLE emits alias form
accumulator.processEvent(makeMetricEvent('w-1', 'needle.worker.beads.completed', 2));
// Direct canonical form
accumulator.processEvent(makeMetricEvent('w-1', 'needle.bead.completed', 3));
const snap = accumulator.getSnapshot('w-1');
expect(snap!.beadsCompleted).toBe(5);
});
it('passes through unknown names unchanged', () => {
expect(resolveInstrumentName('needle.worker.tokens.in')).toBe('needle.worker.tokens.in');
expect(resolveInstrumentName('custom.metric')).toBe('custom.metric');
});
});
});

View file

@ -38,6 +38,21 @@ export const INSTRUMENT_NAMES = {
const ALL_INSTRUMENTS = new Set(Object.values(INSTRUMENT_NAMES));
/**
* Alias map: NEEDLE's OTLP emitter uses `needle.worker.beads.*` (plural)
* while the canonical schema uses `needle.bead.*` (singular). Both forms
* are accepted and resolved to the canonical name before accumulation.
*/
const INSTRUMENT_ALIASES: Record<string, string> = {
'needle.worker.beads.completed': INSTRUMENT_NAMES.BEAD_COMPLETED,
'needle.worker.beads.failed': INSTRUMENT_NAMES.BEAD_FAILED,
};
/** Resolve an incoming metric name to its canonical instrument name. */
export function resolveInstrumentName(name: string): string {
return INSTRUMENT_ALIASES[name] || name;
}
// ── MetricAccumulator ────────────────────────────────────────────
export interface MetricSample {
@ -85,8 +100,11 @@ export class MetricAccumulator {
const msg = event.msg || '';
if (!msg.startsWith('metric.')) return;
const metricName = (event.metric_name as string) || msg.slice(7);
if (!metricName) return;
const rawName = (event.metric_name as string) || msg.slice(7);
if (!rawName) return;
// Resolve NEEDLE's naming convention to canonical instrument name
const metricName = resolveInstrumentName(rawName);
const value = typeof event.value === 'number' ? event.value
: typeof event.metric_value === 'number' ? event.metric_value as number