feat(bd-zci): map OTLP metrics → analytics DB instruments

Add OTLP metric ingestion pipeline that persists Sum/Histogram/Gauge
data points to fabric.db with canonical instrument names:

- MetricAccumulator in workerAnalytics.ts accumulates per-worker
  running totals for tokens, cost, durations, bead counts
- Schema v2 in historicalStore.ts adds metric_samples and
  session_worker_summaries tables with source preference tracking
- flushMetricSamples() in store.ts drains accumulator → SQLite on
  every event, upserting session summaries and live session rows
- docs/schema.md documents instrument names and resolution order

Fix source preference ordering (otlp-metric > otlp-span > log-derived)
using CASE-based SQL sort instead of alphabetical ASC which was
inverted. Fix metricsSource detection to not require costUsd > 0.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-04-21 17:18:32 -04:00
parent e74149f167
commit a6fb5c9289
5 changed files with 685 additions and 7 deletions

View file

@ -177,6 +177,61 @@ Format: `category.action`. Categories group related lifecycle phases.
| `lock.priority_bump_received` | Received priority bump notification | `path`, `from_worker` |
| `lock.expired` | Lock expired | `path` |
## OTLP Metric Instruments
When NEEDLE workers emit OTLP **Metric** payloads (Sum, Histogram, Gauge), FABRIC
normalizes each data point into a `metric.{name}` event (see normalizer). The
canonical instrument names below define the mapping to analytics DB columns in
`fabric.db`.
FABRIC's Analytics Writer **prefers** OTLP metric values over log-derived
estimates when both are present for the same worker + session.
### Token & Cost Instruments
| Instrument Name | Type | Unit | DB Column(s) |
|---|---|---|---|
| `needle.worker.tokens.in` | Sum | tokens | `task_metrics.tokens_in`, `session_worker_summaries.tokens_in` |
| `needle.worker.tokens.out` | Sum | tokens | `task_metrics.tokens_out`, `session_worker_summaries.tokens_out` |
| `needle.worker.cost.usd` | Sum | USD | `task_metrics.cost`, `session_worker_summaries.cost_usd` |
### Duration Instruments
| Instrument Name | Type | Unit | DB Column(s) |
|---|---|---|---|
| `needle.bead.duration` | Histogram | ms | `task_metrics.duration_ms` |
| `needle.worker.uptime` | Gauge | ms | — (informational) |
### Counting Instruments
| Instrument Name | Type | Unit | DB Column(s) |
|---|---|---|---|
| `needle.bead.completed` | Sum | count | `session_worker_summaries.beads_completed` |
| `needle.bead.failed` | Sum | count | `session_worker_summaries.beads_failed` |
| `needle.worker.errors` | Sum | count | `session_worker_summaries.errors` |
### Attribute Requirements
Every metric data point **must** carry these OTLP attributes (namespaced form
preferred, plain form accepted as fallback):
| Attribute | Required | Purpose |
|---|---|---|
| `needle.worker.id` / `worker_id` | yes | Worker identity |
| `needle.session.id` / `session_id` | yes | Session grouping |
| `needle.bead.id` / `bead_id` | for bead-scoped metrics | Task correlation |
### Resolution Order
When querying `fabric.db`, FABRIC resolves conflicting values in this order:
1. **`otlp-metric`** — row sourced from an OTLP metric instrument (authoritative)
2. **`otlp-span`** — duration derived from span start/end times
3. **`log-derived`** — estimated from log message parsing (fallback)
The `metrics_source` column on `sessions` and `session_worker_summaries` records
which source was used.
## TypeScript Reference
The canonical TypeScript definitions live in `src/types.ts`:

View file

@ -629,5 +629,77 @@ describe('HistoricalStore', () => {
expect(gammaPerf!.totalCostUsd).toBeCloseTo(0.092);
expect(gammaPerf!.totalTokens).toBe(1650);
});
it('should prefer otlp-metric source over log-derived in getLatestWorkerMetrics', () => {
store.startSession('source-pref-sess');
// Write a log-derived sample first
store.recordMetricSample({
workerId: 'needle-delta',
metricName: INSTRUMENT_NAMES.TOKENS_IN,
value: 500,
timestamp: Date.now() - 1000,
source: 'log-derived',
});
// Write an OTLP-metric sample for the same instrument
store.recordMetricSample({
workerId: 'needle-delta',
metricName: INSTRUMENT_NAMES.TOKENS_IN,
value: 1200,
timestamp: Date.now(),
source: 'otlp-metric',
});
const latest = store.getLatestWorkerMetrics('needle-delta');
expect(latest[INSTRUMENT_NAMES.TOKENS_IN]).toBeDefined();
expect(latest[INSTRUMENT_NAMES.TOKENS_IN].value).toBe(1200);
expect(latest[INSTRUMENT_NAMES.TOKENS_IN].source).toBe('otlp-metric');
store.endSession({ workerCount: 1, taskCount: 0, totalCost: 0, totalTokens: 0 });
});
it('should mark metricsSource as otlp-metric even without cost data', () => {
store.startSession('no-cost-sess');
const accumulator = new MetricAccumulator();
// Only send token metrics — no cost
const logEvent1 = normalizeToLogEvent({
name: INSTRUMENT_NAMES.TOKENS_IN,
timeUnixNano: String(Date.now() * 1_000_000),
asDouble: 800,
attributes: [
{ key: 'worker_id', value: { stringValue: 'needle-epsilon' } },
{ key: 'session_id', value: { stringValue: 'no-cost-sess' } },
],
}, 'otlp-metric');
accumulator.processEvent(logEvent1!);
const snap = accumulator.getSnapshot('needle-epsilon');
expect(snap).not.toBeNull();
// costUsd should be 0 but source should still be otlp-metric
expect(snap!.costUsd).toBe(0);
expect(snap!.tokensIn).toBe(800);
store.upsertSessionWorkerSummary({
workerId: 'needle-epsilon',
tokensIn: snap!.tokensIn,
tokensOut: snap!.tokensOut,
costUsd: snap!.costUsd,
beadsCompleted: snap!.beadsCompleted,
beadsFailed: snap!.beadsFailed,
errors: snap!.errors,
metricsSource: snap ? 'otlp-metric' : 'log-derived',
});
const summaries = store.getSessionWorkerSummaries({ sessionId: 'no-cost-sess' });
expect(summaries).toHaveLength(1);
expect(summaries[0].metrics_source).toBe('otlp-metric');
expect(summaries[0].tokens_in).toBe(800);
expect(summaries[0].cost_usd).toBe(0);
store.endSession({ workerCount: 1, taskCount: 0, totalCost: 0, totalTokens: 800, metricsSource: 'otlp-metric' });
});
});
});

View file

@ -32,6 +32,7 @@ export interface SessionRecord {
task_count: number;
total_cost: number;
total_tokens: number;
metrics_source?: string;
}
/**
@ -125,7 +126,7 @@ export interface LearnedRecoveryEntry {
// Database Schema
// ============================================
const SCHEMA_VERSION = 1;
const SCHEMA_VERSION = 2;
const CREATE_SESSIONS_TABLE = `
CREATE TABLE IF NOT EXISTS sessions (
@ -190,6 +191,49 @@ CREATE TABLE IF NOT EXISTS schema_version (
);
`;
const CREATE_METRIC_SAMPLES_TABLE = `
CREATE TABLE IF NOT EXISTS metric_samples (
id INTEGER PRIMARY KEY AUTOINCREMENT,
worker_id TEXT NOT NULL,
metric_name TEXT NOT NULL,
value REAL NOT NULL,
timestamp INTEGER NOT NULL,
source TEXT NOT NULL DEFAULT 'otlp-metric',
bead_id TEXT,
session_id TEXT
);
CREATE INDEX IF NOT EXISTS idx_metric_samples_worker ON metric_samples(worker_id);
CREATE INDEX IF NOT EXISTS idx_metric_samples_name ON metric_samples(metric_name);
CREATE INDEX IF NOT EXISTS idx_metric_samples_timestamp ON metric_samples(timestamp);
`;
const CREATE_SESSION_WORKER_SUMMARIES_TABLE = `
CREATE TABLE IF NOT EXISTS session_worker_summaries (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
worker_id TEXT NOT NULL,
tokens_in INTEGER NOT NULL DEFAULT 0,
tokens_out INTEGER NOT NULL DEFAULT 0,
cost_usd REAL NOT NULL DEFAULT 0,
beads_completed INTEGER NOT NULL DEFAULT 0,
beads_failed INTEGER NOT NULL DEFAULT 0,
errors INTEGER NOT NULL DEFAULT 0,
metrics_source TEXT NOT NULL DEFAULT 'log-derived',
updated_at INTEGER NOT NULL,
UNIQUE(session_id, worker_id)
);
CREATE INDEX IF NOT EXISTS idx_sws_session ON session_worker_summaries(session_id);
CREATE INDEX IF NOT EXISTS idx_sws_worker ON session_worker_summaries(worker_id);
CREATE INDEX IF NOT EXISTS idx_sws_source ON session_worker_summaries(metrics_source);
`;
// Schema v2 migration: add metrics_source column to sessions if missing
const MIGRATE_V2_ADD_METRICS_SOURCE = `
ALTER TABLE sessions ADD COLUMN metrics_source TEXT NOT NULL DEFAULT 'log-derived';
`;
// ============================================
// Historical Store Class
// ============================================
@ -239,13 +283,27 @@ export class HistoricalStore {
const versionRow = this.db.prepare('SELECT version FROM schema_version').get() as { version: number } | undefined;
const currentVersion = versionRow?.version || 0;
if (currentVersion < SCHEMA_VERSION) {
// Run schema migrations
if (currentVersion < 1) {
// Initial schema (v1)
this.db.exec(CREATE_SESSIONS_TABLE);
this.db.exec(CREATE_TASK_METRICS_TABLE);
this.db.exec(CREATE_ERROR_HISTORY_TABLE);
}
// Update version
if (currentVersion < 2) {
// v2: metric_samples, session_worker_summaries, metrics_source on sessions
this.db.exec(CREATE_METRIC_SAMPLES_TABLE);
this.db.exec(CREATE_SESSION_WORKER_SUMMARIES_TABLE);
// Add metrics_source column to sessions (may already exist if v1 ran fresh)
try {
this.db.exec(MIGRATE_V2_ADD_METRICS_SOURCE);
} catch {
// Column already exists — safe to ignore
}
}
// Update version
if (currentVersion < SCHEMA_VERSION) {
this.db.prepare('INSERT OR REPLACE INTO schema_version (version) VALUES (?)').run(SCHEMA_VERSION);
}
}
@ -276,14 +334,16 @@ export class HistoricalStore {
taskCount: number;
totalCost: number;
totalTokens: number;
metricsSource?: string;
}): void {
if (!this.currentSessionId) return;
const endTime = Date.now();
const source = metrics.metricsSource || 'log-derived';
this.db.prepare(`
UPDATE sessions
SET ended_at = ?, worker_count = ?, task_count = ?, total_cost = ?, total_tokens = ?
SET ended_at = ?, worker_count = ?, task_count = ?, total_cost = ?, total_tokens = ?, metrics_source = ?
WHERE id = ?
`).run(
endTime,
@ -291,6 +351,7 @@ export class HistoricalStore {
metrics.taskCount,
metrics.totalCost,
metrics.totalTokens,
source,
this.currentSessionId
);
@ -392,6 +453,225 @@ export class HistoricalStore {
`).run(resolution, successful ? 1 : 0, errorId);
}
// ============================================
// OTLP Metric Persistence
// ============================================
/**
* Record a raw metric sample from OTLP ingestion.
*/
recordMetricSample(sample: {
workerId: string;
metricName: string;
value: number;
timestamp: number;
source: string;
beadId?: string;
}): void {
if (!this.currentSessionId) {
this.startSession();
}
this.db.prepare(`
INSERT INTO metric_samples (worker_id, metric_name, value, timestamp, source, bead_id, session_id)
VALUES (?, ?, ?, ?, ?, ?, ?)
`).run(
sample.workerId,
sample.metricName,
sample.value,
sample.timestamp,
sample.source,
sample.beadId || null,
this.currentSessionId,
);
}
/**
* Upsert per-worker session summary from metric accumulator snapshots.
* When metric-sourced data is available it overwrites log-derived estimates.
*/
upsertSessionWorkerSummary(summary: {
workerId: string;
tokensIn: number;
tokensOut: number;
costUsd: number;
beadsCompleted: number;
beadsFailed: number;
errors: number;
metricsSource: string;
}): void {
if (!this.currentSessionId) {
this.startSession();
}
this.db.prepare(`
INSERT INTO session_worker_summaries
(session_id, worker_id, tokens_in, tokens_out, cost_usd, beads_completed, beads_failed, errors, metrics_source, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(session_id, worker_id) DO UPDATE SET
tokens_in = excluded.tokens_in,
tokens_out = excluded.tokens_out,
cost_usd = excluded.cost_usd,
beads_completed = excluded.beads_completed,
beads_failed = excluded.beads_failed,
errors = excluded.errors,
metrics_source = excluded.metrics_source,
updated_at = excluded.updated_at
`).run(
this.currentSessionId,
summary.workerId,
summary.tokensIn,
summary.tokensOut,
summary.costUsd,
summary.beadsCompleted,
summary.beadsFailed,
summary.errors,
summary.metricsSource,
Date.now(),
);
}
/**
* Update the live (current) session row with aggregated metrics.
*/
updateLiveSession(metrics: {
workerCount: number;
taskCount: number;
totalCost: number;
totalTokens: number;
metricsSource: string;
}): void {
if (!this.currentSessionId) return;
this.db.prepare(`
UPDATE sessions
SET worker_count = ?, task_count = ?, total_cost = ?, total_tokens = ?, metrics_source = ?, ended_at = ?
WHERE id = ?
`).run(
metrics.workerCount,
metrics.taskCount,
metrics.totalCost,
metrics.totalTokens,
metrics.metricsSource,
Date.now(),
this.currentSessionId,
);
}
/**
* Get session worker summaries, preferring metric-sourced rows.
*/
getSessionWorkerSummaries(options: { sessionId?: string; workerId?: string } = {}): Array<{
sessionId: string;
workerId: string;
tokensIn: number;
tokensOut: number;
costUsd: number;
beadsCompleted: number;
beadsFailed: number;
errors: number;
metricsSource: string;
}> {
let query = 'SELECT * FROM session_worker_summaries WHERE 1=1';
const params: (string | number)[] = [];
if (options.sessionId) {
query += ' AND session_id = ?';
params.push(options.sessionId);
}
if (options.workerId) {
query += ' AND worker_id = ?';
params.push(options.workerId);
}
query += ' ORDER BY updated_at DESC';
return this.db.prepare(query).all(...params) as Array<{
id: number;
session_id: string;
worker_id: string;
tokens_in: number;
tokens_out: number;
cost_usd: number;
beads_completed: number;
beads_failed: number;
errors: number;
metrics_source: string;
updated_at: number;
}>;
}
/**
* Query metric samples for a given time range / worker / instrument.
* Returns metric-sourced rows first, then log-derived rows.
*/
getMetricSamples(options: {
workerId?: string;
metricName?: string;
startTime?: number;
endTime?: number;
limit?: number;
} = {}): Array<{
workerId: string;
metricName: string;
value: number;
timestamp: number;
source: string;
beadId: string | null;
sessionId: string | null;
}> {
let query = 'SELECT * FROM metric_samples WHERE 1=1';
const params: (string | number)[] = [];
if (options.workerId) {
query += ' AND worker_id = ?';
params.push(options.workerId);
}
if (options.metricName) {
query += ' AND metric_name = ?';
params.push(options.metricName);
}
if (options.startTime) {
query += ' AND timestamp >= ?';
params.push(options.startTime);
}
if (options.endTime) {
query += ' AND timestamp <= ?';
params.push(options.endTime);
}
query += ` ORDER BY CASE source WHEN 'otlp-metric' THEN 0 WHEN 'otlp-span' THEN 1 ELSE 2 END, timestamp DESC LIMIT ?`;
params.push(options.limit || 1000);
return this.db.prepare(query).all(...params) as any[];
}
/**
* Get the latest aggregated values for each instrument for a worker.
* Prefers OTLP-metric-sourced data over log-derived.
*/
getLatestWorkerMetrics(workerId: string): Record<string, { value: number; source: string; timestamp: number }> {
const rows = this.db.prepare(`
SELECT metric_name, value, source, timestamp
FROM metric_samples
WHERE worker_id = ?
ORDER BY CASE source WHEN 'otlp-metric' THEN 0 WHEN 'otlp-span' THEN 1 ELSE 2 END, timestamp DESC
`).all(workerId) as Array<{ metric_name: string; value: number; source: string; timestamp: number }>;
// Keep the first occurrence per metric_name (which is the preferred source)
const result: Record<string, { value: number; source: string; timestamp: number }> = {};
for (const row of rows) {
if (!(row.metric_name in result)) {
result[row.metric_name] = {
value: row.value,
source: row.source,
timestamp: row.timestamp,
};
}
}
return result;
}
// ============================================
// Query Methods
// ============================================
@ -722,8 +1002,49 @@ export class HistoricalStore {
// Get sessions in range
const sessions = this.getSingsInRange(startTime, endTime);
const sessionIds = sessions.map(s => s.id);
// Get task metrics in range
// Fetch metric-sourced worker summaries for these sessions
// These are preferred over log-derived task_metrics when available
const metricWorkerMap = new Map<string, {
tokensIn: number;
tokensOut: number;
costUsd: number;
beadsCompleted: number;
beadsFailed: number;
errors: number;
}>();
if (sessionIds.length > 0) {
const summaries = this.db.prepare(`
SELECT worker_id, tokens_in, tokens_out, cost_usd, beads_completed, beads_failed, errors, metrics_source
FROM session_worker_summaries
WHERE session_id IN (${sessionIds.map(() => '?').join(',')})
AND metrics_source = 'otlp-metric'
`).all(...sessionIds) as Array<{
worker_id: string;
tokens_in: number;
tokens_out: number;
cost_usd: number;
beads_completed: number;
beads_failed: number;
errors: number;
metrics_source: string;
}>;
for (const s of summaries) {
metricWorkerMap.set(s.worker_id, {
tokensIn: s.tokens_in,
tokensOut: s.tokens_out,
costUsd: s.cost_usd,
beadsCompleted: s.beads_completed,
beadsFailed: s.beads_failed,
errors: s.errors,
});
}
}
// Get task metrics in range (fallback / log-derived)
const tasks = this.db.prepare(`
SELECT * FROM task_metrics
WHERE started_at >= ? AND ended_at <= ?
@ -736,6 +1057,7 @@ export class HistoricalStore {
cost: number;
tokens: number;
completionTimes: number[];
hasMetricSource: boolean;
}>();
let totalBeadsCompleted = 0;
@ -754,6 +1076,7 @@ export class HistoricalStore {
cost: 0,
tokens: 0,
completionTimes: [],
hasMetricSource: metricWorkerMap.has(task.worker_id),
};
workerMap.set(task.worker_id, worker);
}
@ -775,6 +1098,48 @@ export class HistoricalStore {
worker.tokens += task.tokens_in + task.tokens_out;
}
// For workers with metric-sourced summaries, override the log-derived values
for (const [workerId, metricData] of metricWorkerMap) {
const worker = workerMap.get(workerId);
if (worker) {
// Replace log-derived estimates with metric-sourced values
worker.tasksCompleted = metricData.beadsCompleted;
worker.errors = metricData.errors;
worker.cost = metricData.costUsd;
worker.tokens = metricData.tokensIn + metricData.tokensOut;
worker.hasMetricSource = true;
} else {
// Worker has metric data but no task_metrics entries yet
workerMap.set(workerId, {
tasksCompleted: metricData.beadsCompleted,
errors: metricData.errors,
cost: metricData.costUsd,
tokens: metricData.tokensIn + metricData.tokensOut,
completionTimes: [],
hasMetricSource: true,
});
}
}
// Recalculate totals from the (possibly overridden) worker map
totalBeadsCompleted = 0;
totalErrors = 0;
totalCostUsd = 0;
totalTokens = 0;
totalCompletionTime = 0;
successCount = 0;
for (const worker of workerMap.values()) {
totalBeadsCompleted += worker.tasksCompleted;
totalErrors += worker.errors;
totalCostUsd += worker.cost;
totalTokens += worker.tokens;
successCount += worker.tasksCompleted;
for (const d of worker.completionTimes) {
totalCompletionTime += d;
}
}
const totalTimeMs = endTime - startTime;
const totalTimeHours = totalTimeMs / 3600000;
const avgBeadsPerHour = totalTimeHours > 0 ? totalBeadsCompleted / totalTimeHours : 0;
@ -898,6 +1263,8 @@ export class HistoricalStore {
* Clear all historical data
*/
clear(): void {
this.db.exec('DELETE FROM metric_samples');
this.db.exec('DELETE FROM session_worker_summaries');
this.db.exec('DELETE FROM error_history');
this.db.exec('DELETE FROM task_metrics');
this.db.exec('DELETE FROM sessions');

View file

@ -299,7 +299,7 @@ export class InMemoryEventStore implements EventStore {
beadsCompleted: metricSnap?.beadsCompleted ?? wm.beadsCompleted,
beadsFailed: metricSnap?.beadsFailed ?? 0,
errors: metricSnap?.errors ?? wm.errorCount,
metricsSource: metricSnap && metricSnap.costUsd > 0 ? 'otlp-metric' : 'log-derived',
metricsSource: metricSnap ? 'otlp-metric' : 'log-derived',
});
}
}

View file

@ -23,6 +23,175 @@ import {
import { CostTracker } from './tui/utils/costTracking.js';
import { getHistoricalStore, HistoricalStore, WorkerComparisonMetrics } from './historicalStore.js';
// ── Canonical OTLP metric instrument names ───────────────────────
export const INSTRUMENT_NAMES = {
TOKENS_IN: 'needle.worker.tokens.in',
TOKENS_OUT: 'needle.worker.tokens.out',
COST_USD: 'needle.worker.cost.usd',
BEAD_DURATION: 'needle.bead.duration',
BEAD_COMPLETED: 'needle.bead.completed',
BEAD_FAILED: 'needle.bead.failed',
WORKER_ERRORS: 'needle.worker.errors',
WORKER_UPTIME: 'needle.worker.uptime',
} as const;
const ALL_INSTRUMENTS = new Set(Object.values(INSTRUMENT_NAMES));
// ── MetricAccumulator ────────────────────────────────────────────
export interface MetricSample {
workerId: string;
metricName: string;
value: number;
timestamp: number;
beadId?: string;
}
export interface WorkerMetricSnapshot {
tokensIn: number;
tokensOut: number;
costUsd: number;
beadsCompleted: number;
beadsFailed: number;
errors: number;
durations: number[];
}
/**
* Accumulates OTLP metric data points, keyed by (worker, instrument).
* drainSamples() is called periodically by the store to flush to SQLite.
*/
export class MetricAccumulator {
private samples: MetricSample[] = [];
private hasData = false;
/** Per-worker running totals for cumulative instruments */
private workerTotals = new Map<string, Map<string, number>>();
/** Per-worker bead duration samples */
private workerDurations = new Map<string, number[]>();
/** Per-worker bead completion/failure counts */
private workerBeadCompleted = new Map<string, number>();
private workerBeadFailed = new Map<string, number>();
private workerErrors = new Map<string, number>();
/**
* Ingest a LogEvent that was produced by normalizing an OTLP metric.
* event.msg is "metric.<name>" and event.value / event.metric_name carry the payload.
*/
processEvent(event: LogEvent): void {
const msg = event.msg || '';
if (!msg.startsWith('metric.')) return;
const metricName = (event.metric_name as string) || msg.slice(7);
if (!metricName) return;
const value = typeof event.value === 'number' ? event.value
: typeof event.metric_value === 'number' ? event.metric_value as number
: undefined;
if (value === undefined) return;
this.hasData = true;
const sample: MetricSample = {
workerId: event.worker,
metricName,
value,
timestamp: event.ts,
};
if (event.bead) sample.beadId = event.bead;
this.samples.push(sample);
// Update per-worker running totals
if (!this.workerTotals.has(event.worker)) {
this.workerTotals.set(event.worker, new Map());
}
const totals = this.workerTotals.get(event.worker)!;
switch (metricName) {
case INSTRUMENT_NAMES.TOKENS_IN:
case INSTRUMENT_NAMES.TOKENS_OUT:
case INSTRUMENT_NAMES.COST_USD:
totals.set(metricName, (totals.get(metricName) || 0) + value);
break;
case INSTRUMENT_NAMES.BEAD_DURATION: {
let durations = this.workerDurations.get(event.worker);
if (!durations) {
durations = [];
this.workerDurations.set(event.worker, durations);
}
durations.push(value);
break;
}
case INSTRUMENT_NAMES.BEAD_COMPLETED:
this.workerBeadCompleted.set(event.worker,
(this.workerBeadCompleted.get(event.worker) || 0) + value);
break;
case INSTRUMENT_NAMES.BEAD_FAILED:
this.workerBeadFailed.set(event.worker,
(this.workerBeadFailed.get(event.worker) || 0) + value);
break;
case INSTRUMENT_NAMES.WORKER_ERRORS:
this.workerErrors.set(event.worker,
(this.workerErrors.get(event.worker) || 0) + value);
break;
}
}
/**
* Drain buffered samples for SQLite persistence.
* Returns the samples and clears the buffer.
*/
drainSamples(): MetricSample[] {
const drained = this.samples;
this.samples = [];
return drained;
}
/**
* Whether any OTLP metric data has been received.
*/
hasMetricData(): boolean {
return this.hasData;
}
/**
* Get a snapshot of accumulated metric values for a specific worker.
* Returns null if no metric data exists for this worker.
*/
getSnapshot(workerId: string): WorkerMetricSnapshot | null {
const totals = this.workerTotals.get(workerId);
if (!totals && !this.workerDurations.has(workerId)
&& !this.workerBeadCompleted.has(workerId)) {
return null;
}
return {
tokensIn: totals?.get(INSTRUMENT_NAMES.TOKENS_IN) || 0,
tokensOut: totals?.get(INSTRUMENT_NAMES.TOKENS_OUT) || 0,
costUsd: totals?.get(INSTRUMENT_NAMES.COST_USD) || 0,
beadsCompleted: this.workerBeadCompleted.get(workerId) || 0,
beadsFailed: this.workerBeadFailed.get(workerId) || 0,
errors: this.workerErrors.get(workerId) || 0,
durations: this.workerDurations.get(workerId) || [],
};
}
/** Reset all accumulated data. */
reset(): void {
this.samples = [];
this.hasData = false;
this.workerTotals.clear();
this.workerDurations.clear();
this.workerBeadCompleted.clear();
this.workerBeadFailed.clear();
this.workerErrors.clear();
}
}
const DEFAULT_OPTIONS: Required<WorkerAnalyticsOptions> = {
timeWindow: 'all',
startTime: 0,
@ -72,10 +241,12 @@ export class WorkerAnalytics implements WorkerAnalyticsStore {
private costTracker: CostTracker;
private timeSeriesInterval: number;
private lastSnapshotTime: number = 0;
private metricAccumulator: MetricAccumulator;
constructor(costTracker?: CostTracker, timeSeriesInterval: number = 3600000) {
this.costTracker = costTracker || new CostTracker();
this.timeSeriesInterval = timeSeriesInterval;
this.metricAccumulator = new MetricAccumulator();
}
/**
@ -116,6 +287,11 @@ export class WorkerAnalytics implements WorkerAnalyticsStore {
worker.totalTokens = workerCost.total;
}
// Feed OTLP metric events to the accumulator
if (event.msg?.startsWith('metric.')) {
this.metricAccumulator.processEvent(event);
}
// Periodic time-series snapshot
this.maybeCreateSnapshot(event.ts);
}
@ -336,6 +512,7 @@ export class WorkerAnalytics implements WorkerAnalyticsStore {
clear(): void {
this.workers.clear();
this.costTracker.reset();
this.metricAccumulator.reset();
this.lastSnapshotTime = 0;
}
@ -346,6 +523,13 @@ export class WorkerAnalytics implements WorkerAnalyticsStore {
return this.costTracker;
}
/**
* Get the MetricAccumulator for OTLP metric data
*/
getMetricAccumulator(): MetricAccumulator {
return this.metricAccumulator;
}
/**
* Get analytics summary as formatted string
*/