From a6fb5c9289003318df84089c8ff9ae296125def6 Mon Sep 17 00:00:00 2001 From: jedarden Date: Tue, 21 Apr 2026 17:18:32 -0400 Subject: [PATCH] =?UTF-8?q?feat(bd-zci):=20map=20OTLP=20metrics=20?= =?UTF-8?q?=E2=86=92=20analytics=20DB=20instruments?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add OTLP metric ingestion pipeline that persists Sum/Histogram/Gauge data points to fabric.db with canonical instrument names: - MetricAccumulator in workerAnalytics.ts accumulates per-worker running totals for tokens, cost, durations, bead counts - Schema v2 in historicalStore.ts adds metric_samples and session_worker_summaries tables with source preference tracking - flushMetricSamples() in store.ts drains accumulator → SQLite on every event, upserting session summaries and live session rows - docs/schema.md documents instrument names and resolution order Fix source preference ordering (otlp-metric > otlp-span > log-derived) using CASE-based SQL sort instead of alphabetical ASC which was inverted. Fix metricsSource detection to not require costUsd > 0. Co-Authored-By: Claude Opus 4.7 --- docs/schema.md | 55 ++++++ src/historicalStore.test.ts | 72 +++++++ src/historicalStore.ts | 379 +++++++++++++++++++++++++++++++++++- src/store.ts | 2 +- src/workerAnalytics.ts | 184 +++++++++++++++++ 5 files changed, 685 insertions(+), 7 deletions(-) diff --git a/docs/schema.md b/docs/schema.md index 9995562..d12ae51 100644 --- a/docs/schema.md +++ b/docs/schema.md @@ -177,6 +177,61 @@ Format: `category.action`. Categories group related lifecycle phases. | `lock.priority_bump_received` | Received priority bump notification | `path`, `from_worker` | | `lock.expired` | Lock expired | `path` | +## OTLP Metric Instruments + +When NEEDLE workers emit OTLP **Metric** payloads (Sum, Histogram, Gauge), FABRIC +normalizes each data point into a `metric.{name}` event (see normalizer). The +canonical instrument names below define the mapping to analytics DB columns in +`fabric.db`. + +FABRIC's Analytics Writer **prefers** OTLP metric values over log-derived +estimates when both are present for the same worker + session. + +### Token & Cost Instruments + +| Instrument Name | Type | Unit | DB Column(s) | +|---|---|---|---| +| `needle.worker.tokens.in` | Sum | tokens | `task_metrics.tokens_in`, `session_worker_summaries.tokens_in` | +| `needle.worker.tokens.out` | Sum | tokens | `task_metrics.tokens_out`, `session_worker_summaries.tokens_out` | +| `needle.worker.cost.usd` | Sum | USD | `task_metrics.cost`, `session_worker_summaries.cost_usd` | + +### Duration Instruments + +| Instrument Name | Type | Unit | DB Column(s) | +|---|---|---|---| +| `needle.bead.duration` | Histogram | ms | `task_metrics.duration_ms` | +| `needle.worker.uptime` | Gauge | ms | — (informational) | + +### Counting Instruments + +| Instrument Name | Type | Unit | DB Column(s) | +|---|---|---|---| +| `needle.bead.completed` | Sum | count | `session_worker_summaries.beads_completed` | +| `needle.bead.failed` | Sum | count | `session_worker_summaries.beads_failed` | +| `needle.worker.errors` | Sum | count | `session_worker_summaries.errors` | + +### Attribute Requirements + +Every metric data point **must** carry these OTLP attributes (namespaced form +preferred, plain form accepted as fallback): + +| Attribute | Required | Purpose | +|---|---|---| +| `needle.worker.id` / `worker_id` | yes | Worker identity | +| `needle.session.id` / `session_id` | yes | Session grouping | +| `needle.bead.id` / `bead_id` | for bead-scoped metrics | Task correlation | + +### Resolution Order + +When querying `fabric.db`, FABRIC resolves conflicting values in this order: + +1. **`otlp-metric`** — row sourced from an OTLP metric instrument (authoritative) +2. **`otlp-span`** — duration derived from span start/end times +3. **`log-derived`** — estimated from log message parsing (fallback) + +The `metrics_source` column on `sessions` and `session_worker_summaries` records +which source was used. + ## TypeScript Reference The canonical TypeScript definitions live in `src/types.ts`: diff --git a/src/historicalStore.test.ts b/src/historicalStore.test.ts index 4198354..7158462 100644 --- a/src/historicalStore.test.ts +++ b/src/historicalStore.test.ts @@ -629,5 +629,77 @@ describe('HistoricalStore', () => { expect(gammaPerf!.totalCostUsd).toBeCloseTo(0.092); expect(gammaPerf!.totalTokens).toBe(1650); }); + + it('should prefer otlp-metric source over log-derived in getLatestWorkerMetrics', () => { + store.startSession('source-pref-sess'); + + // Write a log-derived sample first + store.recordMetricSample({ + workerId: 'needle-delta', + metricName: INSTRUMENT_NAMES.TOKENS_IN, + value: 500, + timestamp: Date.now() - 1000, + source: 'log-derived', + }); + + // Write an OTLP-metric sample for the same instrument + store.recordMetricSample({ + workerId: 'needle-delta', + metricName: INSTRUMENT_NAMES.TOKENS_IN, + value: 1200, + timestamp: Date.now(), + source: 'otlp-metric', + }); + + const latest = store.getLatestWorkerMetrics('needle-delta'); + expect(latest[INSTRUMENT_NAMES.TOKENS_IN]).toBeDefined(); + expect(latest[INSTRUMENT_NAMES.TOKENS_IN].value).toBe(1200); + expect(latest[INSTRUMENT_NAMES.TOKENS_IN].source).toBe('otlp-metric'); + + store.endSession({ workerCount: 1, taskCount: 0, totalCost: 0, totalTokens: 0 }); + }); + + it('should mark metricsSource as otlp-metric even without cost data', () => { + store.startSession('no-cost-sess'); + + const accumulator = new MetricAccumulator(); + + // Only send token metrics — no cost + const logEvent1 = normalizeToLogEvent({ + name: INSTRUMENT_NAMES.TOKENS_IN, + timeUnixNano: String(Date.now() * 1_000_000), + asDouble: 800, + attributes: [ + { key: 'worker_id', value: { stringValue: 'needle-epsilon' } }, + { key: 'session_id', value: { stringValue: 'no-cost-sess' } }, + ], + }, 'otlp-metric'); + accumulator.processEvent(logEvent1!); + + const snap = accumulator.getSnapshot('needle-epsilon'); + expect(snap).not.toBeNull(); + // costUsd should be 0 but source should still be otlp-metric + expect(snap!.costUsd).toBe(0); + expect(snap!.tokensIn).toBe(800); + + store.upsertSessionWorkerSummary({ + workerId: 'needle-epsilon', + tokensIn: snap!.tokensIn, + tokensOut: snap!.tokensOut, + costUsd: snap!.costUsd, + beadsCompleted: snap!.beadsCompleted, + beadsFailed: snap!.beadsFailed, + errors: snap!.errors, + metricsSource: snap ? 'otlp-metric' : 'log-derived', + }); + + const summaries = store.getSessionWorkerSummaries({ sessionId: 'no-cost-sess' }); + expect(summaries).toHaveLength(1); + expect(summaries[0].metrics_source).toBe('otlp-metric'); + expect(summaries[0].tokens_in).toBe(800); + expect(summaries[0].cost_usd).toBe(0); + + store.endSession({ workerCount: 1, taskCount: 0, totalCost: 0, totalTokens: 800, metricsSource: 'otlp-metric' }); + }); }); }); diff --git a/src/historicalStore.ts b/src/historicalStore.ts index c23fc38..f3ea345 100644 --- a/src/historicalStore.ts +++ b/src/historicalStore.ts @@ -32,6 +32,7 @@ export interface SessionRecord { task_count: number; total_cost: number; total_tokens: number; + metrics_source?: string; } /** @@ -125,7 +126,7 @@ export interface LearnedRecoveryEntry { // Database Schema // ============================================ -const SCHEMA_VERSION = 1; +const SCHEMA_VERSION = 2; const CREATE_SESSIONS_TABLE = ` CREATE TABLE IF NOT EXISTS sessions ( @@ -190,6 +191,49 @@ CREATE TABLE IF NOT EXISTS schema_version ( ); `; +const CREATE_METRIC_SAMPLES_TABLE = ` +CREATE TABLE IF NOT EXISTS metric_samples ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + worker_id TEXT NOT NULL, + metric_name TEXT NOT NULL, + value REAL NOT NULL, + timestamp INTEGER NOT NULL, + source TEXT NOT NULL DEFAULT 'otlp-metric', + bead_id TEXT, + session_id TEXT +); + +CREATE INDEX IF NOT EXISTS idx_metric_samples_worker ON metric_samples(worker_id); +CREATE INDEX IF NOT EXISTS idx_metric_samples_name ON metric_samples(metric_name); +CREATE INDEX IF NOT EXISTS idx_metric_samples_timestamp ON metric_samples(timestamp); +`; + +const CREATE_SESSION_WORKER_SUMMARIES_TABLE = ` +CREATE TABLE IF NOT EXISTS session_worker_summaries ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT NOT NULL, + worker_id TEXT NOT NULL, + tokens_in INTEGER NOT NULL DEFAULT 0, + tokens_out INTEGER NOT NULL DEFAULT 0, + cost_usd REAL NOT NULL DEFAULT 0, + beads_completed INTEGER NOT NULL DEFAULT 0, + beads_failed INTEGER NOT NULL DEFAULT 0, + errors INTEGER NOT NULL DEFAULT 0, + metrics_source TEXT NOT NULL DEFAULT 'log-derived', + updated_at INTEGER NOT NULL, + UNIQUE(session_id, worker_id) +); + +CREATE INDEX IF NOT EXISTS idx_sws_session ON session_worker_summaries(session_id); +CREATE INDEX IF NOT EXISTS idx_sws_worker ON session_worker_summaries(worker_id); +CREATE INDEX IF NOT EXISTS idx_sws_source ON session_worker_summaries(metrics_source); +`; + +// Schema v2 migration: add metrics_source column to sessions if missing +const MIGRATE_V2_ADD_METRICS_SOURCE = ` +ALTER TABLE sessions ADD COLUMN metrics_source TEXT NOT NULL DEFAULT 'log-derived'; +`; + // ============================================ // Historical Store Class // ============================================ @@ -239,13 +283,27 @@ export class HistoricalStore { const versionRow = this.db.prepare('SELECT version FROM schema_version').get() as { version: number } | undefined; const currentVersion = versionRow?.version || 0; - if (currentVersion < SCHEMA_VERSION) { - // Run schema migrations + if (currentVersion < 1) { + // Initial schema (v1) this.db.exec(CREATE_SESSIONS_TABLE); this.db.exec(CREATE_TASK_METRICS_TABLE); this.db.exec(CREATE_ERROR_HISTORY_TABLE); + } - // Update version + if (currentVersion < 2) { + // v2: metric_samples, session_worker_summaries, metrics_source on sessions + this.db.exec(CREATE_METRIC_SAMPLES_TABLE); + this.db.exec(CREATE_SESSION_WORKER_SUMMARIES_TABLE); + // Add metrics_source column to sessions (may already exist if v1 ran fresh) + try { + this.db.exec(MIGRATE_V2_ADD_METRICS_SOURCE); + } catch { + // Column already exists — safe to ignore + } + } + + // Update version + if (currentVersion < SCHEMA_VERSION) { this.db.prepare('INSERT OR REPLACE INTO schema_version (version) VALUES (?)').run(SCHEMA_VERSION); } } @@ -276,14 +334,16 @@ export class HistoricalStore { taskCount: number; totalCost: number; totalTokens: number; + metricsSource?: string; }): void { if (!this.currentSessionId) return; const endTime = Date.now(); + const source = metrics.metricsSource || 'log-derived'; this.db.prepare(` UPDATE sessions - SET ended_at = ?, worker_count = ?, task_count = ?, total_cost = ?, total_tokens = ? + SET ended_at = ?, worker_count = ?, task_count = ?, total_cost = ?, total_tokens = ?, metrics_source = ? WHERE id = ? `).run( endTime, @@ -291,6 +351,7 @@ export class HistoricalStore { metrics.taskCount, metrics.totalCost, metrics.totalTokens, + source, this.currentSessionId ); @@ -392,6 +453,225 @@ export class HistoricalStore { `).run(resolution, successful ? 1 : 0, errorId); } + // ============================================ + // OTLP Metric Persistence + // ============================================ + + /** + * Record a raw metric sample from OTLP ingestion. + */ + recordMetricSample(sample: { + workerId: string; + metricName: string; + value: number; + timestamp: number; + source: string; + beadId?: string; + }): void { + if (!this.currentSessionId) { + this.startSession(); + } + + this.db.prepare(` + INSERT INTO metric_samples (worker_id, metric_name, value, timestamp, source, bead_id, session_id) + VALUES (?, ?, ?, ?, ?, ?, ?) + `).run( + sample.workerId, + sample.metricName, + sample.value, + sample.timestamp, + sample.source, + sample.beadId || null, + this.currentSessionId, + ); + } + + /** + * Upsert per-worker session summary from metric accumulator snapshots. + * When metric-sourced data is available it overwrites log-derived estimates. + */ + upsertSessionWorkerSummary(summary: { + workerId: string; + tokensIn: number; + tokensOut: number; + costUsd: number; + beadsCompleted: number; + beadsFailed: number; + errors: number; + metricsSource: string; + }): void { + if (!this.currentSessionId) { + this.startSession(); + } + + this.db.prepare(` + INSERT INTO session_worker_summaries + (session_id, worker_id, tokens_in, tokens_out, cost_usd, beads_completed, beads_failed, errors, metrics_source, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(session_id, worker_id) DO UPDATE SET + tokens_in = excluded.tokens_in, + tokens_out = excluded.tokens_out, + cost_usd = excluded.cost_usd, + beads_completed = excluded.beads_completed, + beads_failed = excluded.beads_failed, + errors = excluded.errors, + metrics_source = excluded.metrics_source, + updated_at = excluded.updated_at + `).run( + this.currentSessionId, + summary.workerId, + summary.tokensIn, + summary.tokensOut, + summary.costUsd, + summary.beadsCompleted, + summary.beadsFailed, + summary.errors, + summary.metricsSource, + Date.now(), + ); + } + + /** + * Update the live (current) session row with aggregated metrics. + */ + updateLiveSession(metrics: { + workerCount: number; + taskCount: number; + totalCost: number; + totalTokens: number; + metricsSource: string; + }): void { + if (!this.currentSessionId) return; + + this.db.prepare(` + UPDATE sessions + SET worker_count = ?, task_count = ?, total_cost = ?, total_tokens = ?, metrics_source = ?, ended_at = ? + WHERE id = ? + `).run( + metrics.workerCount, + metrics.taskCount, + metrics.totalCost, + metrics.totalTokens, + metrics.metricsSource, + Date.now(), + this.currentSessionId, + ); + } + + /** + * Get session worker summaries, preferring metric-sourced rows. + */ + getSessionWorkerSummaries(options: { sessionId?: string; workerId?: string } = {}): Array<{ + sessionId: string; + workerId: string; + tokensIn: number; + tokensOut: number; + costUsd: number; + beadsCompleted: number; + beadsFailed: number; + errors: number; + metricsSource: string; + }> { + let query = 'SELECT * FROM session_worker_summaries WHERE 1=1'; + const params: (string | number)[] = []; + + if (options.sessionId) { + query += ' AND session_id = ?'; + params.push(options.sessionId); + } + if (options.workerId) { + query += ' AND worker_id = ?'; + params.push(options.workerId); + } + + query += ' ORDER BY updated_at DESC'; + + return this.db.prepare(query).all(...params) as Array<{ + id: number; + session_id: string; + worker_id: string; + tokens_in: number; + tokens_out: number; + cost_usd: number; + beads_completed: number; + beads_failed: number; + errors: number; + metrics_source: string; + updated_at: number; + }>; + } + + /** + * Query metric samples for a given time range / worker / instrument. + * Returns metric-sourced rows first, then log-derived rows. + */ + getMetricSamples(options: { + workerId?: string; + metricName?: string; + startTime?: number; + endTime?: number; + limit?: number; + } = {}): Array<{ + workerId: string; + metricName: string; + value: number; + timestamp: number; + source: string; + beadId: string | null; + sessionId: string | null; + }> { + let query = 'SELECT * FROM metric_samples WHERE 1=1'; + const params: (string | number)[] = []; + + if (options.workerId) { + query += ' AND worker_id = ?'; + params.push(options.workerId); + } + if (options.metricName) { + query += ' AND metric_name = ?'; + params.push(options.metricName); + } + if (options.startTime) { + query += ' AND timestamp >= ?'; + params.push(options.startTime); + } + if (options.endTime) { + query += ' AND timestamp <= ?'; + params.push(options.endTime); + } + + query += ` ORDER BY CASE source WHEN 'otlp-metric' THEN 0 WHEN 'otlp-span' THEN 1 ELSE 2 END, timestamp DESC LIMIT ?`; + params.push(options.limit || 1000); + + return this.db.prepare(query).all(...params) as any[]; + } + + /** + * Get the latest aggregated values for each instrument for a worker. + * Prefers OTLP-metric-sourced data over log-derived. + */ + getLatestWorkerMetrics(workerId: string): Record { + const rows = this.db.prepare(` + SELECT metric_name, value, source, timestamp + FROM metric_samples + WHERE worker_id = ? + ORDER BY CASE source WHEN 'otlp-metric' THEN 0 WHEN 'otlp-span' THEN 1 ELSE 2 END, timestamp DESC + `).all(workerId) as Array<{ metric_name: string; value: number; source: string; timestamp: number }>; + + // Keep the first occurrence per metric_name (which is the preferred source) + const result: Record = {}; + for (const row of rows) { + if (!(row.metric_name in result)) { + result[row.metric_name] = { + value: row.value, + source: row.source, + timestamp: row.timestamp, + }; + } + } + return result; + } + // ============================================ // Query Methods // ============================================ @@ -722,8 +1002,49 @@ export class HistoricalStore { // Get sessions in range const sessions = this.getSingsInRange(startTime, endTime); + const sessionIds = sessions.map(s => s.id); - // Get task metrics in range + // Fetch metric-sourced worker summaries for these sessions + // These are preferred over log-derived task_metrics when available + const metricWorkerMap = new Map(); + + if (sessionIds.length > 0) { + const summaries = this.db.prepare(` + SELECT worker_id, tokens_in, tokens_out, cost_usd, beads_completed, beads_failed, errors, metrics_source + FROM session_worker_summaries + WHERE session_id IN (${sessionIds.map(() => '?').join(',')}) + AND metrics_source = 'otlp-metric' + `).all(...sessionIds) as Array<{ + worker_id: string; + tokens_in: number; + tokens_out: number; + cost_usd: number; + beads_completed: number; + beads_failed: number; + errors: number; + metrics_source: string; + }>; + + for (const s of summaries) { + metricWorkerMap.set(s.worker_id, { + tokensIn: s.tokens_in, + tokensOut: s.tokens_out, + costUsd: s.cost_usd, + beadsCompleted: s.beads_completed, + beadsFailed: s.beads_failed, + errors: s.errors, + }); + } + } + + // Get task metrics in range (fallback / log-derived) const tasks = this.db.prepare(` SELECT * FROM task_metrics WHERE started_at >= ? AND ended_at <= ? @@ -736,6 +1057,7 @@ export class HistoricalStore { cost: number; tokens: number; completionTimes: number[]; + hasMetricSource: boolean; }>(); let totalBeadsCompleted = 0; @@ -754,6 +1076,7 @@ export class HistoricalStore { cost: 0, tokens: 0, completionTimes: [], + hasMetricSource: metricWorkerMap.has(task.worker_id), }; workerMap.set(task.worker_id, worker); } @@ -775,6 +1098,48 @@ export class HistoricalStore { worker.tokens += task.tokens_in + task.tokens_out; } + // For workers with metric-sourced summaries, override the log-derived values + for (const [workerId, metricData] of metricWorkerMap) { + const worker = workerMap.get(workerId); + if (worker) { + // Replace log-derived estimates with metric-sourced values + worker.tasksCompleted = metricData.beadsCompleted; + worker.errors = metricData.errors; + worker.cost = metricData.costUsd; + worker.tokens = metricData.tokensIn + metricData.tokensOut; + worker.hasMetricSource = true; + } else { + // Worker has metric data but no task_metrics entries yet + workerMap.set(workerId, { + tasksCompleted: metricData.beadsCompleted, + errors: metricData.errors, + cost: metricData.costUsd, + tokens: metricData.tokensIn + metricData.tokensOut, + completionTimes: [], + hasMetricSource: true, + }); + } + } + + // Recalculate totals from the (possibly overridden) worker map + totalBeadsCompleted = 0; + totalErrors = 0; + totalCostUsd = 0; + totalTokens = 0; + totalCompletionTime = 0; + successCount = 0; + + for (const worker of workerMap.values()) { + totalBeadsCompleted += worker.tasksCompleted; + totalErrors += worker.errors; + totalCostUsd += worker.cost; + totalTokens += worker.tokens; + successCount += worker.tasksCompleted; + for (const d of worker.completionTimes) { + totalCompletionTime += d; + } + } + const totalTimeMs = endTime - startTime; const totalTimeHours = totalTimeMs / 3600000; const avgBeadsPerHour = totalTimeHours > 0 ? totalBeadsCompleted / totalTimeHours : 0; @@ -898,6 +1263,8 @@ export class HistoricalStore { * Clear all historical data */ clear(): void { + this.db.exec('DELETE FROM metric_samples'); + this.db.exec('DELETE FROM session_worker_summaries'); this.db.exec('DELETE FROM error_history'); this.db.exec('DELETE FROM task_metrics'); this.db.exec('DELETE FROM sessions'); diff --git a/src/store.ts b/src/store.ts index ff07312..6fb36cb 100644 --- a/src/store.ts +++ b/src/store.ts @@ -299,7 +299,7 @@ export class InMemoryEventStore implements EventStore { beadsCompleted: metricSnap?.beadsCompleted ?? wm.beadsCompleted, beadsFailed: metricSnap?.beadsFailed ?? 0, errors: metricSnap?.errors ?? wm.errorCount, - metricsSource: metricSnap && metricSnap.costUsd > 0 ? 'otlp-metric' : 'log-derived', + metricsSource: metricSnap ? 'otlp-metric' : 'log-derived', }); } } diff --git a/src/workerAnalytics.ts b/src/workerAnalytics.ts index 9ef1ac8..8c60aa3 100644 --- a/src/workerAnalytics.ts +++ b/src/workerAnalytics.ts @@ -23,6 +23,175 @@ import { import { CostTracker } from './tui/utils/costTracking.js'; import { getHistoricalStore, HistoricalStore, WorkerComparisonMetrics } from './historicalStore.js'; +// ── Canonical OTLP metric instrument names ─────────────────────── + +export const INSTRUMENT_NAMES = { + TOKENS_IN: 'needle.worker.tokens.in', + TOKENS_OUT: 'needle.worker.tokens.out', + COST_USD: 'needle.worker.cost.usd', + BEAD_DURATION: 'needle.bead.duration', + BEAD_COMPLETED: 'needle.bead.completed', + BEAD_FAILED: 'needle.bead.failed', + WORKER_ERRORS: 'needle.worker.errors', + WORKER_UPTIME: 'needle.worker.uptime', +} as const; + +const ALL_INSTRUMENTS = new Set(Object.values(INSTRUMENT_NAMES)); + +// ── MetricAccumulator ──────────────────────────────────────────── + +export interface MetricSample { + workerId: string; + metricName: string; + value: number; + timestamp: number; + beadId?: string; +} + +export interface WorkerMetricSnapshot { + tokensIn: number; + tokensOut: number; + costUsd: number; + beadsCompleted: number; + beadsFailed: number; + errors: number; + durations: number[]; +} + +/** + * Accumulates OTLP metric data points, keyed by (worker, instrument). + * drainSamples() is called periodically by the store to flush to SQLite. + */ +export class MetricAccumulator { + private samples: MetricSample[] = []; + private hasData = false; + + /** Per-worker running totals for cumulative instruments */ + private workerTotals = new Map>(); + + /** Per-worker bead duration samples */ + private workerDurations = new Map(); + + /** Per-worker bead completion/failure counts */ + private workerBeadCompleted = new Map(); + private workerBeadFailed = new Map(); + private workerErrors = new Map(); + + /** + * Ingest a LogEvent that was produced by normalizing an OTLP metric. + * event.msg is "metric." and event.value / event.metric_name carry the payload. + */ + processEvent(event: LogEvent): void { + const msg = event.msg || ''; + if (!msg.startsWith('metric.')) return; + + const metricName = (event.metric_name as string) || msg.slice(7); + if (!metricName) return; + + const value = typeof event.value === 'number' ? event.value + : typeof event.metric_value === 'number' ? event.metric_value as number + : undefined; + if (value === undefined) return; + + this.hasData = true; + + const sample: MetricSample = { + workerId: event.worker, + metricName, + value, + timestamp: event.ts, + }; + if (event.bead) sample.beadId = event.bead; + + this.samples.push(sample); + + // Update per-worker running totals + if (!this.workerTotals.has(event.worker)) { + this.workerTotals.set(event.worker, new Map()); + } + const totals = this.workerTotals.get(event.worker)!; + + switch (metricName) { + case INSTRUMENT_NAMES.TOKENS_IN: + case INSTRUMENT_NAMES.TOKENS_OUT: + case INSTRUMENT_NAMES.COST_USD: + totals.set(metricName, (totals.get(metricName) || 0) + value); + break; + case INSTRUMENT_NAMES.BEAD_DURATION: { + let durations = this.workerDurations.get(event.worker); + if (!durations) { + durations = []; + this.workerDurations.set(event.worker, durations); + } + durations.push(value); + break; + } + case INSTRUMENT_NAMES.BEAD_COMPLETED: + this.workerBeadCompleted.set(event.worker, + (this.workerBeadCompleted.get(event.worker) || 0) + value); + break; + case INSTRUMENT_NAMES.BEAD_FAILED: + this.workerBeadFailed.set(event.worker, + (this.workerBeadFailed.get(event.worker) || 0) + value); + break; + case INSTRUMENT_NAMES.WORKER_ERRORS: + this.workerErrors.set(event.worker, + (this.workerErrors.get(event.worker) || 0) + value); + break; + } + } + + /** + * Drain buffered samples for SQLite persistence. + * Returns the samples and clears the buffer. + */ + drainSamples(): MetricSample[] { + const drained = this.samples; + this.samples = []; + return drained; + } + + /** + * Whether any OTLP metric data has been received. + */ + hasMetricData(): boolean { + return this.hasData; + } + + /** + * Get a snapshot of accumulated metric values for a specific worker. + * Returns null if no metric data exists for this worker. + */ + getSnapshot(workerId: string): WorkerMetricSnapshot | null { + const totals = this.workerTotals.get(workerId); + if (!totals && !this.workerDurations.has(workerId) + && !this.workerBeadCompleted.has(workerId)) { + return null; + } + + return { + tokensIn: totals?.get(INSTRUMENT_NAMES.TOKENS_IN) || 0, + tokensOut: totals?.get(INSTRUMENT_NAMES.TOKENS_OUT) || 0, + costUsd: totals?.get(INSTRUMENT_NAMES.COST_USD) || 0, + beadsCompleted: this.workerBeadCompleted.get(workerId) || 0, + beadsFailed: this.workerBeadFailed.get(workerId) || 0, + errors: this.workerErrors.get(workerId) || 0, + durations: this.workerDurations.get(workerId) || [], + }; + } + + /** Reset all accumulated data. */ + reset(): void { + this.samples = []; + this.hasData = false; + this.workerTotals.clear(); + this.workerDurations.clear(); + this.workerBeadCompleted.clear(); + this.workerBeadFailed.clear(); + this.workerErrors.clear(); + } +} + const DEFAULT_OPTIONS: Required = { timeWindow: 'all', startTime: 0, @@ -72,10 +241,12 @@ export class WorkerAnalytics implements WorkerAnalyticsStore { private costTracker: CostTracker; private timeSeriesInterval: number; private lastSnapshotTime: number = 0; + private metricAccumulator: MetricAccumulator; constructor(costTracker?: CostTracker, timeSeriesInterval: number = 3600000) { this.costTracker = costTracker || new CostTracker(); this.timeSeriesInterval = timeSeriesInterval; + this.metricAccumulator = new MetricAccumulator(); } /** @@ -116,6 +287,11 @@ export class WorkerAnalytics implements WorkerAnalyticsStore { worker.totalTokens = workerCost.total; } + // Feed OTLP metric events to the accumulator + if (event.msg?.startsWith('metric.')) { + this.metricAccumulator.processEvent(event); + } + // Periodic time-series snapshot this.maybeCreateSnapshot(event.ts); } @@ -336,6 +512,7 @@ export class WorkerAnalytics implements WorkerAnalyticsStore { clear(): void { this.workers.clear(); this.costTracker.reset(); + this.metricAccumulator.reset(); this.lastSnapshotTime = 0; } @@ -346,6 +523,13 @@ export class WorkerAnalytics implements WorkerAnalyticsStore { return this.costTracker; } + /** + * Get the MetricAccumulator for OTLP metric data + */ + getMetricAccumulator(): MetricAccumulator { + return this.metricAccumulator; + } + /** * Get analytics summary as formatted string */