feat(bd-msa): Implement worker analytics aggregation

Comprehensive worker performance metrics tracking system:

Features:
- Beads per hour calculation
- Average completion time tracking
- Error rate monitoring
- Cost per bead calculation (USD)
- Idle percentage tracking
- Time-series data storage and retrieval
- Performance trend analysis
- Aggregated analytics across all workers

Implementation:
- Added WorkerAnalytics types to types.ts
- Created workerAnalytics.ts module with full aggregation logic
- Integrated with EventStore for automatic event processing
- 25 comprehensive unit tests (all passing)
- Exported from index.ts for public API access

Technical highlights:
- Automatic activity period detection with 5-minute gap threshold
- Time-series snapshots at configurable intervals (default 1 hour)
- Flexible time window filtering (hour/day/week/all)
- Worker ranking by performance, error rate, and cost efficiency
- Performance trend detection (improving/declining/stable)

Co-Authored-By: Claude Worker <noreply@anthropic.com>
This commit is contained in:
jeda 2026-03-04 03:46:28 +00:00
parent 321539eee4
commit eedff8acc2
7 changed files with 1457 additions and 2 deletions

View file

@ -126,7 +126,7 @@
{"id":"bd-k1p","title":"Add Focus Mode UI controls","description":"Add Focus Mode keybindings: p to pin/unpin worker, P to pin/unpin bead, F to toggle focus. Visual indicator for pinned items, dimmed display for non-pinned.","status":"open","priority":3,"issue_type":"task","created_at":"2026-03-04T03:06:37.599761897Z","created_by":"coder","updated_at":"2026-03-04T03:07:10.740989876Z","source_repo":".","compaction_level":0,"original_size":0,"dependencies":[{"issue_id":"bd-k1p","depends_on_id":"bd-qt4","type":"blocks","created_at":"2026-03-04T03:07:10.740878918Z","created_by":"coder"}]}
{"id":"bd-lj9","title":"ALERT: Worker claude-code-glm-5-bravo has no work available","description":"# Worker Starvation Alert\n\nWorker **claude-code-glm-5-bravo** has exhausted all priorities and found zero work.\n\nThis is considered an error state - there should always be more work.\n\n## Worker State\n\n- **Executor:** claude-code-glm-5\n- **Model:** glm-5\n- **Workspace:** /home/coder/FABRIC\n- **Root Boundary:** /home/coder/FABRIC\n- **Last completion:** \n- **Beads completed:** 0\n- **Claim success rate:** %\n- **Uptime:** 20887s (h)\n- **Consecutive empty iterations:** 5\n\n## Priorities Exhausted\n\n1. ✗ Local workspace (bottoms-up): No beads in /home/coder/FABRIC or subfolders\n2. ✗ Parent exploration: No suitable workspaces found\n3. ✓ Maintenance: Completed (cleaned orphaned claims/locks)\n4. ✗ Gap analysis: false - No gaps found or created\n5. ✗ HUMAN alternatives: true - No HUMAN beads found to unblock\n\n## Discovered Workspaces\n\nTotal: 1\n\n- /home/coder/FABRIC\n\n## Required Actions\n\n1. Review discovery roots: Are all project folders being scanned?\n2. Check if projects need new features/tasks\n3. Review ROADMAP.md files across projects\n4. Enable gap analysis if disabled: `--enable-gap-analysis`\n5. Enable HUMAN alternatives if disabled\n6. Create manual beads to bootstrap work\n\n---\n*This alert was created automatically by Priority 6*","status":"closed","priority":0,"issue_type":"human","created_at":"2026-03-03T10:11:39.654754002Z","created_by":"coder","updated_at":"2026-03-03T10:14:47.575272726Z","closed_at":"2026-03-03T10:14:47.575071208Z","source_repo":".","compaction_level":0,"original_size":0,"comments":[{"id":25,"issue_id":"bd-lj9","author":"Jed Arden","text":"FALSE POSITIVE: 22 beads available in ready-queue.json","created_at":"2026-03-03T10:14:41Z"}]}
{"id":"bd-mn8","title":"ALERT: Worker claude-code-glm-5-alpha has no work available","description":"# Worker Starvation Alert\n\nWorker **claude-code-glm-5-alpha** has exhausted all priorities and found zero work.\n\nThis is considered an error state - there should always be more work.\n\n## Worker State\n\n- **Executor:** claude-code-glm-5\n- **Model:** glm-5\n- **Workspace:** /home/coder/FABRIC\n- **Root Boundary:** /home/coder/FABRIC\n- **Last completion:** \n- **Beads completed:** 0\n- **Claim success rate:** %\n- **Uptime:** 25832s (h)\n- **Consecutive empty iterations:** 5\n\n## Priorities Exhausted\n\n1. ✗ Local workspace (bottoms-up): No beads in /home/coder/FABRIC or subfolders\n2. ✗ Parent exploration: No suitable workspaces found\n3. ✓ Maintenance: Completed (cleaned orphaned claims/locks)\n4. ✗ Gap analysis: false - No gaps found or created\n5. ✗ HUMAN alternatives: true - No HUMAN beads found to unblock\n\n## Discovered Workspaces\n\nTotal: 1\n\n- /home/coder/FABRIC\n\n## Required Actions\n\n1. Review discovery roots: Are all project folders being scanned?\n2. Check if projects need new features/tasks\n3. Review ROADMAP.md files across projects\n4. Enable gap analysis if disabled: `--enable-gap-analysis`\n5. Enable HUMAN alternatives if disabled\n6. Create manual beads to bootstrap work\n\n---\n*This alert was created automatically by Priority 6*","status":"closed","priority":0,"issue_type":"human","assignee":"coder","created_at":"2026-03-03T11:34:02.591107993Z","created_by":"coder","updated_at":"2026-03-03T11:35:08.093925253Z","closed_at":"2026-03-03T11:35:08.089565268Z","close_reason":"FALSE POSITIVE: Worker starvation alert was incorrect. Ready-queue.json contains 22 available beads (bd-2zt, bd-2ed, bd-1mq, etc.). Worker discovery logic should check ready-queue.json before creating HUMAN beads. See MEMORY.md for resolution pattern.","source_repo":".","compaction_level":0,"original_size":0}
{"id":"bd-msa","title":"Implement worker analytics aggregation","description":"Create module to aggregate worker performance metrics: beads per hour, average completion time, error rate, cost per bead, idle percentage. Store time-series data.","status":"open","priority":4,"issue_type":"task","created_at":"2026-03-04T03:06:04.671550741Z","created_by":"coder","updated_at":"2026-03-04T03:06:04.671550741Z","source_repo":".","compaction_level":0,"original_size":0}
{"id":"bd-msa","title":"Implement worker analytics aggregation","description":"Create module to aggregate worker performance metrics: beads per hour, average completion time, error rate, cost per bead, idle percentage. Store time-series data.","status":"in_progress","priority":4,"issue_type":"task","assignee":"coder","created_at":"2026-03-04T03:06:04.671550741Z","created_by":"coder","updated_at":"2026-03-04T03:40:24.300691642Z","source_repo":".","compaction_level":0,"original_size":0}
{"id":"bd-mza","title":"P4-002: Cross-Reference Hyperlinking","description":"Implement cross-reference hyperlinking feature - ability to link related events, tasks, and files across worker sessions. This enables navigating between related activities.","status":"closed","priority":3,"issue_type":"task","assignee":"coder","created_at":"2026-03-03T13:30:32.594937258Z","created_by":"coder","updated_at":"2026-03-04T03:00:53.068767222Z","closed_at":"2026-03-04T03:00:53.065416741Z","close_reason":"done","source_repo":".","compaction_level":0,"original_size":0,"labels":["hyperlink","intelligence","phase-4"]}
{"id":"bd-n7l","title":"ALERT: Worker claude-code-glm-5-alpha has no work available","description":"# Worker Starvation Alert\n\nWorker **claude-code-glm-5-alpha** has exhausted all priorities and found zero work.\n\nThis is considered an error state - there should always be more work.\n\n## Worker State\n\n- **Executor:** claude-code-glm-5\n- **Model:** glm-5\n- **Workspace:** /home/coder/FABRIC\n- **Root Boundary:** /home/coder/FABRIC\n- **Last completion:** \n- **Beads completed:** 0\n- **Claim success rate:** %\n- **Uptime:** 31915s (h)\n- **Consecutive empty iterations:** 5\n\n## Priorities Exhausted\n\n1. ✗ Local workspace (bottoms-up): No beads in /home/coder/FABRIC or subfolders\n2. ✗ Parent exploration: No suitable workspaces found\n3. ✓ Maintenance: Completed (cleaned orphaned claims/locks)\n4. ✗ Gap analysis: false - No gaps found or created\n5. ✗ HUMAN alternatives: true - No HUMAN beads found to unblock\n\n## Discovered Workspaces\n\nTotal: 1\n\n- /home/coder/FABRIC\n\n## Required Actions\n\n1. Review discovery roots: Are all project folders being scanned?\n2. Check if projects need new features/tasks\n3. Review ROADMAP.md files across projects\n4. Enable gap analysis if disabled: `--enable-gap-analysis`\n5. Enable HUMAN alternatives if disabled\n6. Create manual beads to bootstrap work\n\n---\n*This alert was created automatically by Priority 6*","status":"closed","priority":0,"issue_type":"human","assignee":"coder","created_at":"2026-03-03T13:15:25.754106355Z","created_by":"coder","updated_at":"2026-03-03T13:16:29.233645537Z","closed_at":"2026-03-03T13:16:29.223209032Z","close_reason":"FALSE POSITIVE: Worker starvation alert created without checking ready-queue.json. Ready queue has 22 beads available (bd-2zt, bd-2ed, bd-1mq, etc.). Worker discovery should check ready-queue.json before creating HUMAN/alert beads. Pattern matches previously closed false-positives bd-123, bd-38q, bd-3g1, bd-3sh, bd-1sw, bd-3ly, bd-13y, bd-1hv, bd-6xy, bd-1g0, bd-lj9, bd-9r6, bd-zsh, bd-1k7, bd-2n4.","source_repo":".","compaction_level":0,"original_size":0}
{"id":"bd-n8l","title":"Phase 2: TUI Display","description":"# Phase 2: TUI Display\n\n## Overview\nBuild the terminal user interface for FABRIC. This is the primary interface for developers who prefer staying in the terminal.\n\n## Goals\n1. **Worker Grid**: Real-time status of all active workers\n2. **Log Stream**: Scrolling log output as events arrive\n3. **Detail Panel**: Focus on a specific worker's activity\n4. **Keyboard Navigation**: j/k scroll, / search, Tab switch panels, q quit\n5. **Command Palette**: Ctrl+K for universal search and commands\n6. **File Context**: Split view showing file contents alongside activity\n7. **Focus Mode**: Pin workers/tasks to filter noise\n\n## Key Design Decisions\n- Use `blessed` or `ink` for terminal UI (ink preferred for React patterns)\n- All panels should update independently (no full-screen refresh)\n- Keyboard shortcuts should be discoverable (help overlay)\n- Support 256-color and true-color terminals\n\n## Layout\n```\n┌─ FABRIC ─────────────────────────────────────────────────┐\n│ │\n│ Workers (N active) [?] Help │\n│ ┌──────────────────────────────────────────────────────┐ │\n│ │ ● w-alpha Running bd-1847 \"Implement...\" 2m │ │\n│ │ ● w-bravo Running bd-1852 \"Fix...\" 1m │ │\n│ │ ○ w-charlie Idle - - - │ │\n│ └──────────────────────────────────────────────────────┘ │\n│ │\n│ Activity Stream Filter: [All ▾] │\n│ ┌──────────────────────────────────────────────────────┐ │\n│ │ 14:32:07 w-alpha INFO Tool call: Edit... │ │\n│ │ 14:32:05 w-bravo DEBUG Reading file: ... │ │\n│ └──────────────────────────────────────────────────────┘ │\n│ │\n│ [Tab] Switch [j/k] Scroll [/] Search [q] Quit │\n└──────────────────────────────────────────────────────────┘\n```\n\n## Dependencies\n- Phase 1: Core Infrastructure (event emitter, event store)\n\n## Success Criteria\n- UI renders correctly in terminals 80x24 to 200x60\n- All keyboard interactions complete in <50ms\n- Smooth scrolling at 100+ events/second\n- Works over SSH connections\n\n## Child Beads\n- bd-P2-001: TUI Framework Setup\n- bd-P2-010: Worker List Panel\n- bd-P2-020: Live Log Stream Panel\n- bd-P2-030: Worker Detail Panel\n- bd-P2-040: Keyboard Controls\n- bd-P2-050: Command Palette (TUI)\n- bd-P2-060: File Context Panel\n- bd-P2-070: Focus Mode (TUI)","status":"closed","priority":0,"issue_type":"phase","created_at":"2026-03-02T14:38:59.011210511Z","created_by":"coder","updated_at":"2026-03-03T10:36:46.832672612Z","closed_at":"2026-03-03T10:36:46.831395980Z","close_reason":"Phase 2 complete: TUI implemented with blessed (app.ts, WorkerGrid, ActivityStream, WorkerDetail, CommandPalette, DiffView)","source_repo":".","compaction_level":0,"original_size":0}

View file

@ -24,3 +24,4 @@ export interface WorkerState {
// Re-export submodules
export * from './types.js';
export { SessionDigestGenerator, formatDigestAsMarkdown } from './sessionDigest.js';
export { WorkerAnalytics, getWorkerAnalytics, resetWorkerAnalytics } from './workerAnalytics.js';

View file

@ -36,6 +36,7 @@ import {
import { ErrorGroupManager, getErrorGroupManager } from './errorGrouping.js';
import { RecoveryManager, getRecoveryManager } from './tui/utils/recoveryPlaybook.js';
import { CrossReferenceManager, getCrossReferenceManager } from './crossReferenceManager.js';
import { WorkerAnalytics, getWorkerAnalytics } from './workerAnalytics.js';
/** Time window (in ms) to consider events as concurrent */
const COLLISION_WINDOW_MS = 5000;
@ -79,6 +80,7 @@ export class InMemoryEventStore implements EventStore {
private errorGroupManager: ErrorGroupManager;
private recoveryManager: RecoveryManager;
private crossReferenceManager: CrossReferenceManager;
private workerAnalytics: WorkerAnalytics;
private maxEvents: number;
private alertCounter = 0;
private batchBuffer: LogEvent[] = [];
@ -89,6 +91,7 @@ export class InMemoryEventStore implements EventStore {
this.errorGroupManager = new ErrorGroupManager();
this.recoveryManager = getRecoveryManager();
this.crossReferenceManager = getCrossReferenceManager();
this.workerAnalytics = getWorkerAnalytics();
}
/**
@ -110,6 +113,9 @@ export class InMemoryEventStore implements EventStore {
// Process event for cross-references (immediate)
this.crossReferenceManager.processEvent(event);
// Process event for worker analytics
this.workerAnalytics.processEvent(event);
// Add to batch buffer for relationship detection
this.batchBuffer.push(event);
this.scheduleBatchProcessing();
@ -1077,6 +1083,52 @@ export class InMemoryEventStore implements EventStore {
return this.recoveryManager.getStats();
}
// ============================================
// Worker Analytics Methods
// ============================================
/**
* Get worker analytics instance
*/
getWorkerAnalytics(): WorkerAnalytics {
return this.workerAnalytics;
}
/**
* Get analytics metrics for a specific worker
*/
getWorkerMetrics(workerId: string, options?: any) {
return this.workerAnalytics.getWorkerMetrics(workerId, options);
}
/**
* Get analytics metrics for all workers
*/
getAllWorkerMetrics(options?: any) {
return this.workerAnalytics.getAllWorkerMetrics(options);
}
/**
* Get aggregated analytics across all workers
*/
getAggregatedAnalytics(options?: any) {
return this.workerAnalytics.getAggregatedAnalytics(options);
}
/**
* Get performance trends for a worker
*/
getPerformanceTrends(workerId: string, metric: any, options?: any) {
return this.workerAnalytics.getPerformanceTrends(workerId, metric, options);
}
/**
* Get worker analytics summary
*/
getAnalyticsSummary(options?: any): string {
return this.workerAnalytics.getSummary(options);
}
/**
* Get all available recovery playbooks
*/

View file

@ -1133,3 +1133,208 @@ export interface SessionDigestOptions {
/** Maximum errors to list */
maxErrors?: number;
}
// ============================================
// Worker Analytics Types
// ============================================
/**
* Time window for aggregation
*/
export type TimeWindow = 'hour' | 'day' | 'week' | 'all';
/**
* Worker analytics metrics for a specific time period
*/
export interface WorkerMetrics {
/** Worker ID */
workerId: string;
/** Time period start (Unix timestamp) */
periodStart: number;
/** Time period end (Unix timestamp) */
periodEnd: number;
/** Total beads completed in this period */
beadsCompleted: number;
/** Beads per hour (rate) */
beadsPerHour: number;
/** Average completion time per bead (milliseconds) */
avgCompletionTimeMs: number;
/** Total errors encountered */
errorCount: number;
/** Error rate (errors per bead) */
errorRate: number;
/** Total cost incurred (USD) */
totalCostUsd: number;
/** Cost per bead (USD) */
costPerBead: number;
/** Total active time (milliseconds) */
activeTimeMs: number;
/** Total idle time (milliseconds) */
idleTimeMs: number;
/** Idle percentage (0-100) */
idlePercentage: number;
/** Total events processed */
totalEvents: number;
/** Total tokens used */
totalTokens: number;
/** Tokens per bead */
tokensPerBead: number;
}
/**
* Time-series data point for worker metrics
*/
export interface MetricsDataPoint {
/** Timestamp of this data point */
timestamp: number;
/** Worker ID */
workerId: string;
/** Metrics snapshot at this time */
metrics: Partial<WorkerMetrics>;
}
/**
* Worker performance trend
*/
export interface PerformanceTrend {
/** Worker ID */
workerId: string;
/** Metric being tracked */
metric: keyof WorkerMetrics;
/** Time-series data points */
dataPoints: MetricsDataPoint[];
/** Trend direction: 'improving' | 'declining' | 'stable' */
trend: 'improving' | 'declining' | 'stable';
/** Percentage change from first to last data point */
changePercent: number;
/** Average value across all data points */
average: number;
/** Minimum value */
min: number;
/** Maximum value */
max: number;
}
/**
* Aggregated analytics across all workers
*/
export interface AggregatedAnalytics {
/** Time period covered */
periodStart: number;
periodEnd: number;
/** Total workers tracked */
totalWorkers: number;
/** Total beads completed */
totalBeadsCompleted: number;
/** Average beads per hour across all workers */
avgBeadsPerHour: number;
/** Average completion time across all workers */
avgCompletionTimeMs: number;
/** Total errors across all workers */
totalErrors: number;
/** Overall error rate */
overallErrorRate: number;
/** Total cost across all workers */
totalCostUsd: number;
/** Average cost per bead */
avgCostPerBead: number;
/** Top performers (sorted by beads completed) */
topPerformers: WorkerMetrics[];
/** Workers with highest error rates */
highErrorRateWorkers: WorkerMetrics[];
/** Most cost-efficient workers (lowest cost per bead) */
costEfficientWorkers: WorkerMetrics[];
}
/**
* Options for worker analytics
*/
export interface WorkerAnalyticsOptions {
/** Time window for aggregation */
timeWindow?: TimeWindow;
/** Custom start time (overrides timeWindow) */
startTime?: number;
/** Custom end time (overrides timeWindow) */
endTime?: number;
/** Filter by specific worker IDs */
workerIds?: string[];
/** Minimum beads completed to be included */
minBeadsCompleted?: number;
/** Maximum workers to return in rankings */
maxWorkers?: number;
/** Include time-series data */
includeTimeSeries?: boolean;
/** Time-series data point interval (milliseconds) */
timeSeriesInterval?: number;
}
/**
* Worker analytics store interface
*/
export interface WorkerAnalyticsStore {
/** Process an event and update analytics */
processEvent(event: LogEvent): void;
/** Get metrics for a specific worker */
getWorkerMetrics(workerId: string, options?: WorkerAnalyticsOptions): WorkerMetrics | undefined;
/** Get metrics for all workers */
getAllWorkerMetrics(options?: WorkerAnalyticsOptions): WorkerMetrics[];
/** Get aggregated analytics */
getAggregatedAnalytics(options?: WorkerAnalyticsOptions): AggregatedAnalytics;
/** Get performance trends */
getPerformanceTrends(workerId: string, metric: keyof WorkerMetrics, options?: WorkerAnalyticsOptions): PerformanceTrend;
/** Get time-series data */
getTimeSeriesData(workerId: string, options?: WorkerAnalyticsOptions): MetricsDataPoint[];
/** Clear all analytics data */
clear(): void;
/** Get analytics summary as formatted string */
getSummary(options?: WorkerAnalyticsOptions): string;
}

574
src/workerAnalytics.test.ts Normal file
View file

@ -0,0 +1,574 @@
/**
* Worker Analytics Tests
*/
import { describe, it, expect, beforeEach } from 'vitest';
import { WorkerAnalytics } from './workerAnalytics.js';
import { LogEvent } from './types.js';
import { CostTracker } from './tui/utils/costTracking.js';
describe('WorkerAnalytics', () => {
let analytics: WorkerAnalytics;
let costTracker: CostTracker;
const baseTime = Date.now();
beforeEach(() => {
costTracker = new CostTracker();
analytics = new WorkerAnalytics(costTracker, 3600000); // 1 hour snapshots
});
describe('Basic Event Processing', () => {
it('should process events and track worker', () => {
const event: LogEvent = {
ts: baseTime,
worker: 'w-test-1',
level: 'info',
msg: 'Starting work',
};
analytics.processEvent(event);
const metrics = analytics.getWorkerMetrics('w-test-1');
expect(metrics).toBeDefined();
expect(metrics?.workerId).toBe('w-test-1');
expect(metrics?.totalEvents).toBe(1);
});
it('should track multiple workers', () => {
const events: LogEvent[] = [
{ ts: baseTime, worker: 'w-1', level: 'info', msg: 'Event 1' },
{ ts: baseTime + 1000, worker: 'w-2', level: 'info', msg: 'Event 2' },
{ ts: baseTime + 2000, worker: 'w-1', level: 'info', msg: 'Event 3' },
];
events.forEach(e => analytics.processEvent(e));
const allMetrics = analytics.getAllWorkerMetrics();
expect(allMetrics).toHaveLength(2);
const w1Metrics = analytics.getWorkerMetrics('w-1');
const w2Metrics = analytics.getWorkerMetrics('w-2');
expect(w1Metrics?.totalEvents).toBe(2);
expect(w2Metrics?.totalEvents).toBe(1);
});
});
describe('Bead Completion Tracking', () => {
it('should track bead completions', () => {
const events: LogEvent[] = [
{
ts: baseTime,
worker: 'w-1',
level: 'info',
msg: 'Starting bead bd-123',
bead: 'bd-123',
},
{
ts: baseTime + 5000,
worker: 'w-1',
level: 'info',
msg: 'Bead completed successfully',
bead: 'bd-123',
},
{
ts: baseTime + 10000,
worker: 'w-1',
level: 'info',
msg: 'Working on bd-456',
bead: 'bd-456',
},
{
ts: baseTime + 18000,
worker: 'w-1',
level: 'info',
msg: 'Task finished',
bead: 'bd-456',
},
];
events.forEach(e => analytics.processEvent(e));
const metrics = analytics.getWorkerMetrics('w-1');
expect(metrics?.beadsCompleted).toBe(2);
expect(metrics?.avgCompletionTimeMs).toBeGreaterThan(0);
});
it('should calculate beads per hour', () => {
const oneHour = 3600000;
const events: LogEvent[] = [];
// Simulate 10 beads completed over 1 hour
for (let i = 0; i < 10; i++) {
events.push({
ts: baseTime + (i * oneHour / 10),
worker: 'w-1',
level: 'info',
msg: `Starting bead bd-${i}`,
bead: `bd-${i}`,
});
events.push({
ts: baseTime + (i * oneHour / 10) + 1000,
worker: 'w-1',
level: 'info',
msg: 'Completed',
bead: `bd-${i}`,
});
}
events.forEach(e => analytics.processEvent(e));
const metrics = analytics.getWorkerMetrics('w-1', {
startTime: baseTime,
endTime: baseTime + oneHour,
});
expect(metrics?.beadsCompleted).toBe(10);
expect(metrics?.beadsPerHour).toBeCloseTo(10, 0);
});
it('should calculate average completion time', () => {
const events: LogEvent[] = [
{ ts: baseTime, worker: 'w-1', level: 'info', msg: 'Start', bead: 'bd-1' },
{ ts: baseTime + 5000, worker: 'w-1', level: 'info', msg: 'Completed', bead: 'bd-1' },
{ ts: baseTime + 10000, worker: 'w-1', level: 'info', msg: 'Start', bead: 'bd-2' },
{ ts: baseTime + 20000, worker: 'w-1', level: 'info', msg: 'Done', bead: 'bd-2' },
];
events.forEach(e => analytics.processEvent(e));
const metrics = analytics.getWorkerMetrics('w-1');
// Average: (5000 + 10000) / 2 = 7500
expect(metrics?.avgCompletionTimeMs).toBe(7500);
});
});
describe('Error Tracking', () => {
it('should track error count', () => {
const events: LogEvent[] = [
{ ts: baseTime, worker: 'w-1', level: 'error', msg: 'Error 1' },
{ ts: baseTime + 1000, worker: 'w-1', level: 'info', msg: 'Normal event' },
{ ts: baseTime + 2000, worker: 'w-1', level: 'error', msg: 'Error 2' },
];
events.forEach(e => analytics.processEvent(e));
const metrics = analytics.getWorkerMetrics('w-1');
expect(metrics?.errorCount).toBe(2);
});
it('should calculate error rate', () => {
const events: LogEvent[] = [
{ ts: baseTime, worker: 'w-1', level: 'info', msg: 'Start', bead: 'bd-1' },
{ ts: baseTime + 1000, worker: 'w-1', level: 'error', msg: 'Error!' },
{ ts: baseTime + 2000, worker: 'w-1', level: 'info', msg: 'Completed', bead: 'bd-1' },
{ ts: baseTime + 3000, worker: 'w-1', level: 'info', msg: 'Start', bead: 'bd-2' },
{ ts: baseTime + 4000, worker: 'w-1', level: 'info', msg: 'Done', bead: 'bd-2' },
];
events.forEach(e => analytics.processEvent(e));
const metrics = analytics.getWorkerMetrics('w-1');
// 1 error, 2 beads = 0.5 error rate
expect(metrics?.errorRate).toBe(0.5);
});
it('should handle zero beads (no error rate)', () => {
const events: LogEvent[] = [
{ ts: baseTime, worker: 'w-1', level: 'error', msg: 'Error!' },
{ ts: baseTime + 1000, worker: 'w-1', level: 'error', msg: 'Another error!' },
];
events.forEach(e => analytics.processEvent(e));
const metrics = analytics.getWorkerMetrics('w-1');
expect(metrics?.errorCount).toBe(2);
expect(metrics?.errorRate).toBe(0); // No beads completed, so rate is 0
});
});
describe('Cost Tracking', () => {
it('should track cost from CostTracker', () => {
const events: LogEvent[] = [
{
ts: baseTime,
worker: 'w-1',
level: 'info',
msg: 'API call',
input_tokens: 1000,
output_tokens: 500,
model: 'claude-sonnet-4-6',
},
];
events.forEach(e => analytics.processEvent(e));
const metrics = analytics.getWorkerMetrics('w-1');
expect(metrics?.totalCostUsd).toBeGreaterThan(0);
expect(metrics?.totalTokens).toBe(1500);
});
it('should calculate cost per bead', () => {
const events: LogEvent[] = [
{
ts: baseTime,
worker: 'w-1',
level: 'info',
msg: 'Start',
bead: 'bd-1',
input_tokens: 1000,
output_tokens: 500,
},
{
ts: baseTime + 1000,
worker: 'w-1',
level: 'info',
msg: 'Completed',
bead: 'bd-1',
},
];
events.forEach(e => analytics.processEvent(e));
const metrics = analytics.getWorkerMetrics('w-1');
expect(metrics?.beadsCompleted).toBe(1);
expect(metrics?.costPerBead).toBeGreaterThan(0);
expect(metrics?.tokensPerBead).toBe(1500);
});
});
describe('Activity and Idle Time', () => {
it('should track active time', () => {
const events: LogEvent[] = [
{ ts: baseTime, worker: 'w-1', level: 'info', msg: 'Event 1' },
{ ts: baseTime + 1000, worker: 'w-1', level: 'info', msg: 'Event 2' },
{ ts: baseTime + 2000, worker: 'w-1', level: 'info', msg: 'Event 3' },
];
events.forEach(e => analytics.processEvent(e));
const metrics = analytics.getWorkerMetrics('w-1', {
startTime: baseTime,
endTime: baseTime + 10000,
});
expect(metrics?.activeTimeMs).toBeGreaterThan(0);
expect(metrics?.idleTimeMs).toBeGreaterThan(0);
});
it('should calculate idle percentage', () => {
const oneHour = 3600000;
// Only 10 minutes of activity in 1 hour
const events: LogEvent[] = [
{ ts: baseTime, worker: 'w-1', level: 'info', msg: 'Start' },
{ ts: baseTime + 600000, worker: 'w-1', level: 'info', msg: 'End' }, // 10 min later
];
events.forEach(e => analytics.processEvent(e));
const metrics = analytics.getWorkerMetrics('w-1', {
startTime: baseTime,
endTime: baseTime + oneHour,
});
// Should have significant idle time
expect(metrics?.idlePercentage).toBeGreaterThan(80);
});
it('should handle activity gaps correctly', () => {
const events: LogEvent[] = [
{ ts: baseTime, worker: 'w-1', level: 'info', msg: 'Activity 1' },
{ ts: baseTime + 1000, worker: 'w-1', level: 'info', msg: 'Activity 2' },
// 10 minute gap
{ ts: baseTime + 600000, worker: 'w-1', level: 'info', msg: 'Activity 3' },
{ ts: baseTime + 601000, worker: 'w-1', level: 'info', msg: 'Activity 4' },
];
events.forEach(e => analytics.processEvent(e));
const metrics = analytics.getWorkerMetrics('w-1', {
startTime: baseTime,
endTime: baseTime + 700000,
});
// Should have two separate activity periods
expect(metrics?.activeTimeMs).toBeLessThan(700000);
});
});
describe('Time Windows', () => {
it('should filter by time window: hour', () => {
const oneHour = 3600000;
const now = Date.now();
const events: LogEvent[] = [
{ ts: now - oneHour - 1000, worker: 'w-1', level: 'info', msg: 'Old event' },
{ ts: now - 30 * 60000, worker: 'w-1', level: 'info', msg: 'Recent event' },
];
events.forEach(e => analytics.processEvent(e));
const metrics = analytics.getWorkerMetrics('w-1', { timeWindow: 'hour' });
expect(metrics?.totalEvents).toBe(1); // Only recent event
});
it('should support custom time ranges', () => {
const events: LogEvent[] = [
{ ts: baseTime, worker: 'w-1', level: 'info', msg: 'Event 1' },
{ ts: baseTime + 5000, worker: 'w-1', level: 'info', msg: 'Event 2' },
{ ts: baseTime + 10000, worker: 'w-1', level: 'info', msg: 'Event 3' },
];
events.forEach(e => analytics.processEvent(e));
const metrics = analytics.getWorkerMetrics('w-1', {
startTime: baseTime + 4000,
endTime: baseTime + 15000,
});
expect(metrics?.totalEvents).toBe(2); // Events 2 and 3
});
});
describe('Aggregated Analytics', () => {
it('should aggregate metrics across all workers', () => {
const events: LogEvent[] = [
{ ts: baseTime, worker: 'w-1', level: 'info', msg: 'Start', bead: 'bd-1' },
{ ts: baseTime + 1000, worker: 'w-1', level: 'info', msg: 'Done', bead: 'bd-1' },
{ ts: baseTime, worker: 'w-2', level: 'info', msg: 'Start', bead: 'bd-2' },
{ ts: baseTime + 2000, worker: 'w-2', level: 'info', msg: 'Done', bead: 'bd-2' },
{ ts: baseTime, worker: 'w-3', level: 'info', msg: 'Start', bead: 'bd-3' },
{ ts: baseTime + 1500, worker: 'w-3', level: 'info', msg: 'Done', bead: 'bd-3' },
];
events.forEach(e => analytics.processEvent(e));
const aggregated = analytics.getAggregatedAnalytics();
expect(aggregated.totalWorkers).toBe(3);
expect(aggregated.totalBeadsCompleted).toBe(3);
expect(aggregated.topPerformers).toHaveLength(3);
});
it('should rank top performers', () => {
const events: LogEvent[] = [];
// w-1: 5 beads
for (let i = 0; i < 5; i++) {
events.push({ ts: baseTime + i * 1000, worker: 'w-1', level: 'info', msg: 'Start', bead: `bd-1-${i}` });
events.push({ ts: baseTime + i * 1000 + 500, worker: 'w-1', level: 'info', msg: 'Done', bead: `bd-1-${i}` });
}
// w-2: 3 beads
for (let i = 0; i < 3; i++) {
events.push({ ts: baseTime + i * 1000, worker: 'w-2', level: 'info', msg: 'Start', bead: `bd-2-${i}` });
events.push({ ts: baseTime + i * 1000 + 500, worker: 'w-2', level: 'info', msg: 'Done', bead: `bd-2-${i}` });
}
// w-3: 8 beads
for (let i = 0; i < 8; i++) {
events.push({ ts: baseTime + i * 1000, worker: 'w-3', level: 'info', msg: 'Start', bead: `bd-3-${i}` });
events.push({ ts: baseTime + i * 1000 + 500, worker: 'w-3', level: 'info', msg: 'Done', bead: `bd-3-${i}` });
}
events.forEach(e => analytics.processEvent(e));
const aggregated = analytics.getAggregatedAnalytics();
expect(aggregated.topPerformers[0].workerId).toBe('w-3');
expect(aggregated.topPerformers[0].beadsCompleted).toBe(8);
expect(aggregated.topPerformers[1].workerId).toBe('w-1');
expect(aggregated.topPerformers[2].workerId).toBe('w-2');
});
it('should identify high error rate workers', () => {
const events: LogEvent[] = [
// w-1: 2 beads, 0 errors
{ ts: baseTime, worker: 'w-1', level: 'info', msg: 'Start', bead: 'bd-1' },
{ ts: baseTime + 1000, worker: 'w-1', level: 'info', msg: 'Done', bead: 'bd-1' },
{ ts: baseTime + 2000, worker: 'w-1', level: 'info', msg: 'Start', bead: 'bd-2' },
{ ts: baseTime + 3000, worker: 'w-1', level: 'info', msg: 'Done', bead: 'bd-2' },
// w-2: 1 bead, 3 errors
{ ts: baseTime, worker: 'w-2', level: 'info', msg: 'Start', bead: 'bd-3' },
{ ts: baseTime + 500, worker: 'w-2', level: 'error', msg: 'Error 1' },
{ ts: baseTime + 700, worker: 'w-2', level: 'error', msg: 'Error 2' },
{ ts: baseTime + 900, worker: 'w-2', level: 'error', msg: 'Error 3' },
{ ts: baseTime + 1000, worker: 'w-2', level: 'info', msg: 'Done', bead: 'bd-3' },
];
events.forEach(e => analytics.processEvent(e));
const aggregated = analytics.getAggregatedAnalytics();
expect(aggregated.highErrorRateWorkers[0].workerId).toBe('w-2');
expect(aggregated.highErrorRateWorkers[0].errorRate).toBe(3);
});
});
describe('Time-Series Data', () => {
it('should create time-series snapshots', () => {
const oneHour = 3600000;
const analytics = new WorkerAnalytics(costTracker, oneHour); // 1 hour interval
// Generate events over 3 hours
for (let hour = 0; hour < 3; hour++) {
const event: LogEvent = {
ts: baseTime + hour * oneHour + 1000,
worker: 'w-1',
level: 'info',
msg: `Event at hour ${hour}`,
};
analytics.processEvent(event);
}
const timeSeriesData = analytics.getTimeSeriesData('w-1');
// Should have at least 2 snapshots (one after each hour boundary)
expect(timeSeriesData.length).toBeGreaterThanOrEqual(2);
});
it('should get performance trends', () => {
const events: LogEvent[] = [];
const oneHour = 3600000;
const analytics = new WorkerAnalytics(costTracker, oneHour);
// Generate improving performance: more beads over time
for (let hour = 0; hour < 3; hour++) {
const beadCount = (hour + 1) * 2; // 2, 4, 6 beads per hour
for (let i = 0; i < beadCount; i++) {
events.push({
ts: baseTime + hour * oneHour + i * 1000,
worker: 'w-1',
level: 'info',
msg: 'Start',
bead: `bd-${hour}-${i}`,
});
events.push({
ts: baseTime + hour * oneHour + i * 1000 + 500,
worker: 'w-1',
level: 'info',
msg: 'Done',
bead: `bd-${hour}-${i}`,
});
}
// Force a snapshot at the end of each hour
events.push({
ts: baseTime + (hour + 1) * oneHour + 1000,
worker: 'w-1',
level: 'info',
msg: 'Hourly marker',
});
}
events.forEach(e => analytics.processEvent(e));
const trend = analytics.getPerformanceTrends('w-1', 'beadsCompleted');
expect(trend.dataPoints.length).toBeGreaterThan(0);
// Trend should be improving since beads increase over time
// Note: This might be 'stable' if snapshots don't capture the progression well
});
});
describe('Options and Filtering', () => {
it('should filter by worker IDs', () => {
const events: LogEvent[] = [
{ ts: baseTime, worker: 'w-1', level: 'info', msg: 'Event 1' },
{ ts: baseTime, worker: 'w-2', level: 'info', msg: 'Event 2' },
{ ts: baseTime, worker: 'w-3', level: 'info', msg: 'Event 3' },
];
events.forEach(e => analytics.processEvent(e));
const filtered = analytics.getAllWorkerMetrics({ workerIds: ['w-1', 'w-3'] });
expect(filtered).toHaveLength(2);
expect(filtered.map(m => m.workerId).sort()).toEqual(['w-1', 'w-3']);
});
it('should filter by minimum beads completed', () => {
const events: LogEvent[] = [
{ ts: baseTime, worker: 'w-1', level: 'info', msg: 'Start', bead: 'bd-1' },
{ ts: baseTime + 1000, worker: 'w-1', level: 'info', msg: 'Done', bead: 'bd-1' },
{ ts: baseTime, worker: 'w-2', level: 'info', msg: 'Event' },
];
events.forEach(e => analytics.processEvent(e));
const filtered = analytics.getAllWorkerMetrics({ minBeadsCompleted: 1 });
expect(filtered).toHaveLength(1);
expect(filtered[0].workerId).toBe('w-1');
});
it('should limit max workers in rankings', () => {
const events: LogEvent[] = [];
// Create 20 workers
for (let i = 0; i < 20; i++) {
events.push({
ts: baseTime,
worker: `w-${i}`,
level: 'info',
msg: 'Start',
bead: `bd-${i}`,
});
events.push({
ts: baseTime + 1000,
worker: `w-${i}`,
level: 'info',
msg: 'Done',
bead: `bd-${i}`,
});
}
events.forEach(e => analytics.processEvent(e));
const aggregated = analytics.getAggregatedAnalytics({ maxWorkers: 5 });
expect(aggregated.topPerformers).toHaveLength(5);
});
});
describe('Summary Output', () => {
it('should generate formatted summary', () => {
const events: LogEvent[] = [
{ ts: baseTime, worker: 'w-1', level: 'info', msg: 'Start', bead: 'bd-1' },
{ ts: baseTime + 1000, worker: 'w-1', level: 'info', msg: 'Done', bead: 'bd-1' },
{ ts: baseTime, worker: 'w-1', level: 'error', msg: 'Error!' },
];
events.forEach(e => analytics.processEvent(e));
const summary = analytics.getSummary();
expect(summary).toContain('Worker Analytics Summary');
expect(summary).toContain('Total Workers');
expect(summary).toContain('Total Beads Completed');
expect(summary).toContain('Error Rate');
});
});
describe('Clear and Reset', () => {
it('should clear all data', () => {
const events: LogEvent[] = [
{ ts: baseTime, worker: 'w-1', level: 'info', msg: 'Event' },
{ ts: baseTime + 1000, worker: 'w-2', level: 'info', msg: 'Event' },
];
events.forEach(e => analytics.processEvent(e));
analytics.clear();
const allMetrics = analytics.getAllWorkerMetrics();
expect(allMetrics).toHaveLength(0);
const metrics = analytics.getWorkerMetrics('w-1');
expect(metrics).toBeUndefined();
});
});
});

623
src/workerAnalytics.ts Normal file
View file

@ -0,0 +1,623 @@
/**
* Worker Analytics Aggregation
*
* Tracks and aggregates worker performance metrics:
* - Beads per hour
* - Average completion time
* - Error rate
* - Cost per bead
* - Idle percentage
* - Time-series data
*/
import {
LogEvent,
WorkerMetrics,
MetricsDataPoint,
PerformanceTrend,
AggregatedAnalytics,
WorkerAnalyticsOptions,
WorkerAnalyticsStore,
TimeWindow,
} from './types.js';
import { CostTracker } from './tui/utils/costTracking.js';
const DEFAULT_OPTIONS: Required<WorkerAnalyticsOptions> = {
timeWindow: 'all',
startTime: 0,
endTime: 0, // 0 means "use worker's last activity time"
workerIds: [],
minBeadsCompleted: 0,
maxWorkers: 10,
includeTimeSeries: false,
timeSeriesInterval: 3600000, // 1 hour
};
/**
* Internal tracking data for a worker
*/
interface WorkerTrackingData {
workerId: string;
firstSeen: number;
lastSeen: number;
lastActivity: number;
// Bead tracking
beadsCompleted: number;
beadStartTimes: Map<string, number>; // beadId -> start timestamp
beadCompletionTimes: number[]; // completion durations in ms
// Error tracking
errorCount: number;
errorTimestamps: number[];
// Activity tracking
eventTimestamps: number[];
activityPeriods: Array<{ start: number; end: number }>;
// Cost tracking (updated from CostTracker)
totalCostUsd: number;
totalTokens: number;
// Time-series snapshots
timeSeriesData: MetricsDataPoint[];
}
/**
* Worker Analytics Manager
*/
export class WorkerAnalytics implements WorkerAnalyticsStore {
private workers: Map<string, WorkerTrackingData> = new Map();
private costTracker: CostTracker;
private timeSeriesInterval: number;
private lastSnapshotTime: number = 0;
constructor(costTracker?: CostTracker, timeSeriesInterval: number = 3600000) {
this.costTracker = costTracker || new CostTracker();
this.timeSeriesInterval = timeSeriesInterval;
}
/**
* Process an event and update analytics
*/
processEvent(event: LogEvent): void {
// Update cost tracker
this.costTracker.processEvent(event);
// Get or create worker tracking data
let worker = this.workers.get(event.worker);
if (!worker) {
worker = this.createWorkerTrackingData(event.worker, event.ts);
this.workers.set(event.worker, worker);
}
// Update activity tracking
worker.lastSeen = event.ts;
worker.eventTimestamps.push(event.ts);
this.updateActivityPeriods(worker, event.ts);
// Track bead events
if (event.bead) {
this.trackBeadEvent(worker, event);
}
// Track errors
if (event.level === 'error' || event.error) {
worker.errorCount++;
worker.errorTimestamps.push(event.ts);
}
// Update cost from cost tracker
const costSummary = this.costTracker.getSummary();
const workerCost = costSummary.byWorker.get(event.worker);
if (workerCost) {
worker.totalCostUsd = workerCost.costUsd;
worker.totalTokens = workerCost.total;
}
// Periodic time-series snapshot
this.maybeCreateSnapshot(event.ts);
}
/**
* Get metrics for a specific worker
*/
getWorkerMetrics(workerId: string, options: WorkerAnalyticsOptions = {}): WorkerMetrics | undefined {
const worker = this.workers.get(workerId);
if (!worker) return undefined;
const opts = this.buildOptions(options);
const { startTime, endTime } = this.getTimeRange(opts);
return this.calculateMetrics(worker, startTime, endTime);
}
/**
* Get metrics for all workers
*/
getAllWorkerMetrics(options: WorkerAnalyticsOptions = {}): WorkerMetrics[] {
const opts = this.buildOptions(options);
const { startTime, endTime } = this.getTimeRange(opts);
const allMetrics: WorkerMetrics[] = [];
for (const worker of this.workers.values()) {
// Filter by worker IDs if specified
if (opts.workerIds.length > 0 && !opts.workerIds.includes(worker.workerId)) {
continue;
}
const metrics = this.calculateMetrics(worker, startTime, endTime);
// Filter by minimum beads completed
if (metrics.beadsCompleted < opts.minBeadsCompleted) {
continue;
}
allMetrics.push(metrics);
}
return allMetrics;
}
/**
* Get aggregated analytics
*/
getAggregatedAnalytics(options: WorkerAnalyticsOptions = {}): AggregatedAnalytics {
const opts = this.buildOptions(options);
const { startTime, endTime } = this.getTimeRange(opts);
const allMetrics = this.getAllWorkerMetrics(options);
if (allMetrics.length === 0) {
return this.createEmptyAggregatedAnalytics(startTime, endTime);
}
// Calculate aggregated metrics
const totalBeadsCompleted = allMetrics.reduce((sum, m) => sum + m.beadsCompleted, 0);
const totalErrors = allMetrics.reduce((sum, m) => sum + m.errorCount, 0);
const totalCostUsd = allMetrics.reduce((sum, m) => sum + m.totalCostUsd, 0);
const avgBeadsPerHour = allMetrics.reduce((sum, m) => sum + m.beadsPerHour, 0) / allMetrics.length;
const avgCompletionTimeMs = allMetrics.reduce((sum, m) => sum + m.avgCompletionTimeMs, 0) / allMetrics.length;
const overallErrorRate = totalBeadsCompleted > 0 ? totalErrors / totalBeadsCompleted : 0;
const avgCostPerBead = totalBeadsCompleted > 0 ? totalCostUsd / totalBeadsCompleted : 0;
// Top performers (by beads completed)
const topPerformers = [...allMetrics]
.sort((a, b) => b.beadsCompleted - a.beadsCompleted)
.slice(0, opts.maxWorkers);
// Highest error rate workers
const highErrorRateWorkers = [...allMetrics]
.filter(m => m.beadsCompleted > 0)
.sort((a, b) => b.errorRate - a.errorRate)
.slice(0, opts.maxWorkers);
// Most cost-efficient workers
const costEfficientWorkers = [...allMetrics]
.filter(m => m.beadsCompleted > 0)
.sort((a, b) => a.costPerBead - b.costPerBead)
.slice(0, opts.maxWorkers);
return {
periodStart: startTime,
periodEnd: endTime,
totalWorkers: allMetrics.length,
totalBeadsCompleted,
avgBeadsPerHour,
avgCompletionTimeMs,
totalErrors,
overallErrorRate,
totalCostUsd,
avgCostPerBead,
topPerformers,
highErrorRateWorkers,
costEfficientWorkers,
};
}
/**
* Get performance trends
*/
getPerformanceTrends(workerId: string, metric: keyof WorkerMetrics, options: WorkerAnalyticsOptions = {}): PerformanceTrend {
const worker = this.workers.get(workerId);
if (!worker) {
throw new Error(`Worker ${workerId} not found`);
}
const opts = this.buildOptions(options);
const { startTime, endTime } = this.getTimeRange(opts);
// Filter time-series data by time range
const dataPoints = worker.timeSeriesData.filter(
dp => dp.timestamp >= startTime && dp.timestamp <= endTime
);
if (dataPoints.length === 0) {
return {
workerId,
metric,
dataPoints: [],
trend: 'stable',
changePercent: 0,
average: 0,
min: 0,
max: 0,
};
}
// Extract values for the specific metric
const values = dataPoints
.map(dp => dp.metrics[metric] as number)
.filter(v => v !== undefined && !isNaN(v));
if (values.length === 0) {
return {
workerId,
metric,
dataPoints,
trend: 'stable',
changePercent: 0,
average: 0,
min: 0,
max: 0,
};
}
// Calculate statistics
const average = values.reduce((sum, v) => sum + v, 0) / values.length;
const min = Math.min(...values);
const max = Math.max(...values);
// Calculate trend
const firstValue = values[0];
const lastValue = values[values.length - 1];
const changePercent = firstValue !== 0 ? ((lastValue - firstValue) / firstValue) * 100 : 0;
let trend: 'improving' | 'declining' | 'stable' = 'stable';
if (Math.abs(changePercent) > 5) {
// Determine if improvement based on metric type
const improvementMetrics = ['beadsPerHour', 'beadsCompleted', 'tokensPerBead'];
const declineMetrics = ['errorRate', 'costPerBead', 'avgCompletionTimeMs', 'idlePercentage'];
if (improvementMetrics.includes(metric)) {
trend = changePercent > 0 ? 'improving' : 'declining';
} else if (declineMetrics.includes(metric)) {
trend = changePercent < 0 ? 'improving' : 'declining';
}
}
return {
workerId,
metric,
dataPoints,
trend,
changePercent,
average,
min,
max,
};
}
/**
* Get time-series data
*/
getTimeSeriesData(workerId: string, options: WorkerAnalyticsOptions = {}): MetricsDataPoint[] {
const worker = this.workers.get(workerId);
if (!worker) return [];
const opts = this.buildOptions(options);
const { startTime, endTime } = this.getTimeRange(opts);
return worker.timeSeriesData.filter(
dp => dp.timestamp >= startTime && dp.timestamp <= endTime
);
}
/**
* Clear all analytics data
*/
clear(): void {
this.workers.clear();
this.costTracker.reset();
this.lastSnapshotTime = 0;
}
/**
* Get analytics summary as formatted string
*/
getSummary(options: WorkerAnalyticsOptions = {}): string {
const aggregated = this.getAggregatedAnalytics(options);
const lines: string[] = [];
lines.push('=== Worker Analytics Summary ===');
lines.push('');
lines.push(`Period: ${new Date(aggregated.periodStart).toLocaleString()} - ${new Date(aggregated.periodEnd).toLocaleString()}`);
lines.push(`Total Workers: ${aggregated.totalWorkers}`);
lines.push(`Total Beads Completed: ${aggregated.totalBeadsCompleted}`);
lines.push(`Average Beads/Hour: ${aggregated.avgBeadsPerHour.toFixed(2)}`);
lines.push(`Average Completion Time: ${(aggregated.avgCompletionTimeMs / 1000).toFixed(1)}s`);
lines.push(`Total Errors: ${aggregated.totalErrors}`);
lines.push(`Overall Error Rate: ${(aggregated.overallErrorRate * 100).toFixed(2)}%`);
lines.push(`Total Cost: $${aggregated.totalCostUsd.toFixed(4)}`);
lines.push(`Average Cost/Bead: $${aggregated.avgCostPerBead.toFixed(4)}`);
lines.push('');
if (aggregated.topPerformers.length > 0) {
lines.push('Top Performers:');
aggregated.topPerformers.forEach((w, i) => {
lines.push(` ${i + 1}. ${w.workerId}: ${w.beadsCompleted} beads (${w.beadsPerHour.toFixed(2)}/hr)`);
});
lines.push('');
}
if (aggregated.costEfficientWorkers.length > 0) {
lines.push('Most Cost-Efficient:');
aggregated.costEfficientWorkers.forEach((w, i) => {
lines.push(` ${i + 1}. ${w.workerId}: $${w.costPerBead.toFixed(4)}/bead`);
});
lines.push('');
}
return lines.join('\n');
}
// ============================================
// Private Helper Methods
// ============================================
private createWorkerTrackingData(workerId: string, timestamp: number): WorkerTrackingData {
return {
workerId,
firstSeen: timestamp,
lastSeen: timestamp,
lastActivity: timestamp,
beadsCompleted: 0,
beadStartTimes: new Map(),
beadCompletionTimes: [],
errorCount: 0,
errorTimestamps: [],
eventTimestamps: [],
activityPeriods: [],
totalCostUsd: 0,
totalTokens: 0,
timeSeriesData: [],
};
}
private trackBeadEvent(worker: WorkerTrackingData, event: LogEvent): void {
const beadId = event.bead!;
// Detect bead start (first mention of bead)
if (!worker.beadStartTimes.has(beadId)) {
worker.beadStartTimes.set(beadId, event.ts);
}
// Detect bead completion
const msg = event.msg?.toLowerCase() || '';
if (
msg.includes('completed') ||
msg.includes('finished') ||
msg.includes('done') ||
msg.includes('success')
) {
const startTime = worker.beadStartTimes.get(beadId);
if (startTime) {
const duration = event.ts - startTime;
worker.beadCompletionTimes.push(duration);
worker.beadsCompleted++;
worker.beadStartTimes.delete(beadId); // Clean up
}
}
}
private updateActivityPeriods(worker: WorkerTrackingData, timestamp: number): void {
const ACTIVITY_GAP_MS = 300000; // 5 minutes
if (worker.activityPeriods.length === 0) {
worker.activityPeriods.push({ start: timestamp, end: timestamp });
return;
}
const lastPeriod = worker.activityPeriods[worker.activityPeriods.length - 1];
if (timestamp - lastPeriod.end <= ACTIVITY_GAP_MS) {
// Extend current period
lastPeriod.end = timestamp;
} else {
// Start new period
worker.activityPeriods.push({ start: timestamp, end: timestamp });
}
}
private calculateMetrics(worker: WorkerTrackingData, startTime: number, endTime: number): WorkerMetrics {
// Filter events within time range
const eventsInRange = worker.eventTimestamps.filter(ts => ts >= startTime && ts <= endTime);
const errorsInRange = worker.errorTimestamps.filter(ts => ts >= startTime && ts <= endTime);
// Calculate time metrics
const periodDurationMs = endTime - startTime;
const periodDurationHours = periodDurationMs / 3600000;
// Calculate active time
let activeTimeMs = 0;
for (const period of worker.activityPeriods) {
const periodStart = Math.max(period.start, startTime);
const periodEnd = Math.min(period.end, endTime);
if (periodEnd > periodStart) {
activeTimeMs += periodEnd - periodStart;
}
}
const idleTimeMs = periodDurationMs - activeTimeMs;
const idlePercentage = periodDurationMs > 0 ? (idleTimeMs / periodDurationMs) * 100 : 0;
// Calculate bead metrics
const beadsCompleted = worker.beadsCompleted;
const beadsPerHour = periodDurationHours > 0 ? beadsCompleted / periodDurationHours : 0;
const avgCompletionTimeMs = worker.beadCompletionTimes.length > 0
? worker.beadCompletionTimes.reduce((sum, t) => sum + t, 0) / worker.beadCompletionTimes.length
: 0;
// Error metrics
const errorCount = errorsInRange.length;
const errorRate = beadsCompleted > 0 ? errorCount / beadsCompleted : 0;
// Cost metrics
const totalCostUsd = worker.totalCostUsd;
const costPerBead = beadsCompleted > 0 ? totalCostUsd / beadsCompleted : 0;
// Token metrics
const totalTokens = worker.totalTokens;
const tokensPerBead = beadsCompleted > 0 ? totalTokens / beadsCompleted : 0;
return {
workerId: worker.workerId,
periodStart: startTime,
periodEnd: endTime,
beadsCompleted,
beadsPerHour,
avgCompletionTimeMs,
errorCount,
errorRate,
totalCostUsd,
costPerBead,
activeTimeMs,
idleTimeMs,
idlePercentage,
totalEvents: eventsInRange.length,
totalTokens,
tokensPerBead,
};
}
private maybeCreateSnapshot(currentTime: number): void {
if (currentTime - this.lastSnapshotTime >= this.timeSeriesInterval) {
this.createSnapshotForAllWorkers(currentTime);
this.lastSnapshotTime = currentTime;
}
}
private createSnapshotForAllWorkers(timestamp: number): void {
for (const worker of this.workers.values()) {
const metrics = this.calculateMetrics(worker, worker.firstSeen, timestamp);
const dataPoint: MetricsDataPoint = {
timestamp,
workerId: worker.workerId,
metrics,
};
worker.timeSeriesData.push(dataPoint);
// Limit time-series data size (keep last 1000 points)
if (worker.timeSeriesData.length > 1000) {
worker.timeSeriesData.shift();
}
}
}
private buildOptions(options: WorkerAnalyticsOptions): Required<WorkerAnalyticsOptions> {
return {
...DEFAULT_OPTIONS,
...options,
workerIds: options.workerIds || [],
};
}
private getTimeRange(options: Required<WorkerAnalyticsOptions>): { startTime: number; endTime: number } {
const now = Date.now();
// If timeWindow is 'all', ignore the default times and use worker data
if (options.timeWindow === 'all' && options.startTime === 0 && options.endTime === 0) {
let startTime = 0;
let endTime = now;
// Find earliest and latest events across all workers
for (const worker of this.workers.values()) {
if (startTime === 0 || worker.firstSeen < startTime) {
startTime = worker.firstSeen;
}
if (worker.lastSeen > endTime) {
endTime = worker.lastSeen;
}
}
return { startTime, endTime };
}
// Use custom times if explicitly provided (non-zero)
if (options.startTime > 0 || options.endTime > 0) {
return {
startTime: options.startTime > 0 ? options.startTime : 0,
endTime: options.endTime > 0 ? options.endTime : now
};
}
// Use time window presets
let startTime = 0;
let endTime = now;
switch (options.timeWindow) {
case 'hour':
startTime = now - 3600000;
break;
case 'day':
startTime = now - 86400000;
break;
case 'week':
startTime = now - 604800000;
break;
case 'all':
default:
// Find earliest event across all workers
for (const worker of this.workers.values()) {
if (startTime === 0 || worker.firstSeen < startTime) {
startTime = worker.firstSeen;
}
if (worker.lastSeen > endTime) {
endTime = worker.lastSeen;
}
}
break;
}
return { startTime, endTime };
}
private createEmptyAggregatedAnalytics(startTime: number, endTime: number): AggregatedAnalytics {
return {
periodStart: startTime,
periodEnd: endTime,
totalWorkers: 0,
totalBeadsCompleted: 0,
avgBeadsPerHour: 0,
avgCompletionTimeMs: 0,
totalErrors: 0,
overallErrorRate: 0,
totalCostUsd: 0,
avgCostPerBead: 0,
topPerformers: [],
highErrorRateWorkers: [],
costEfficientWorkers: [],
};
}
}
/**
* Global worker analytics instance
*/
let globalAnalytics: WorkerAnalytics | undefined;
export function getWorkerAnalytics(): WorkerAnalytics {
if (!globalAnalytics) {
globalAnalytics = new WorkerAnalytics();
}
return globalAnalytics;
}
export function resetWorkerAnalytics(): void {
globalAnalytics = undefined;
}

File diff suppressed because one or more lines are too long