From 0f7aced61bccf70bd301ed472ccfb496cd8109b7 Mon Sep 17 00:00:00 2001 From: jedarden Date: Tue, 21 Apr 2026 14:45:31 -0400 Subject: [PATCH] feat(bd-9jm): add LRU event dedup keyed on (session_id, worker_id, sequence) When FABRIC ingests events from both JSONL and OTLP sources, the same logical event can arrive twice. EventDeduplicator keeps the first arrival and silently drops duplicates with a counter for observability. Events with sequence < 0 (legacy formats) always pass through. Co-Authored-By: Claude Opus 4.7 --- src/normalizer.test.ts | 166 +++++++++++++++++++++++++++++++++++++++++ src/normalizer.ts | 61 +++++++++++++++ 2 files changed, 227 insertions(+) diff --git a/src/normalizer.test.ts b/src/normalizer.test.ts index e2311aa..6533c73 100644 --- a/src/normalizer.test.ts +++ b/src/normalizer.test.ts @@ -15,6 +15,7 @@ import { normalizeToLogEvent, needleEventToLogEvent, NormalizerSource, + EventDeduplicator, } from './normalizer.js'; import { NeedleEvent, NEEDLE_EVENT_SCHEMA_VERSION, LogEvent } from './types.js'; @@ -1154,3 +1155,168 @@ describe('cross-source parity', () => { expect(otlpResult!.duration_ms).toBe(1000); }); }); + +// ── EventDeduplicator ──────────────────────────────────────────── + +describe('EventDeduplicator', () => { + it('passes through the first occurrence of an event', () => { + const dedup = new EventDeduplicator(); + const event = canonicalNeedleEvent(); + expect(dedup.check(event)).toBe(true); + }); + + it('drops a duplicate event with the same (session_id, worker_id, sequence)', () => { + const dedup = new EventDeduplicator(); + const event = canonicalNeedleEvent(); + expect(dedup.check(event)).toBe(true); + expect(dedup.check(event)).toBe(false); + expect(dedup.droppedCount).toBe(1); + }); + + it('allows events with different sequences from the same worker', () => { + const dedup = new EventDeduplicator(); + expect(dedup.check(canonicalNeedleEvent({ sequence: 1 }))).toBe(true); + expect(dedup.check(canonicalNeedleEvent({ 
sequence: 2 }))).toBe(true); + expect(dedup.check(canonicalNeedleEvent({ sequence: 3 }))).toBe(true); + expect(dedup.droppedCount).toBe(0); + }); + + it('allows events with different worker_ids', () => { + const dedup = new EventDeduplicator(); + expect(dedup.check(canonicalNeedleEvent({ worker_id: 'w1' }))).toBe(true); + expect(dedup.check(canonicalNeedleEvent({ worker_id: 'w2' }))).toBe(true); + expect(dedup.droppedCount).toBe(0); + }); + + it('allows events with different session_ids', () => { + const dedup = new EventDeduplicator(); + expect(dedup.check(canonicalNeedleEvent({ session_id: 's1' }))).toBe(true); + expect(dedup.check(canonicalNeedleEvent({ session_id: 's2' }))).toBe(true); + expect(dedup.droppedCount).toBe(0); + }); + + it('passes through events with sequence < 0 (legacy, no dedup)', () => { + const dedup = new EventDeduplicator(); + const event = canonicalNeedleEvent({ sequence: -1 }); + expect(dedup.check(event)).toBe(true); + expect(dedup.check(event)).toBe(true); + expect(dedup.droppedCount).toBe(0); + }); + + it('increments droppedCount for each duplicate', () => { + const dedup = new EventDeduplicator(); + const event = canonicalNeedleEvent(); + dedup.check(event); + dedup.check(event); + dedup.check(event); + dedup.check(event); + expect(dedup.droppedCount).toBe(3); + }); + + it('resets state on reset()', () => { + const dedup = new EventDeduplicator(); + const event = canonicalNeedleEvent(); + dedup.check(event); + dedup.check(event); + expect(dedup.droppedCount).toBe(1); + expect(dedup.size).toBe(1); + + dedup.reset(); + expect(dedup.droppedCount).toBe(0); + expect(dedup.size).toBe(0); + + // Same event should be allowed again after reset + expect(dedup.check(event)).toBe(true); + }); + + it('evicts oldest entries when exceeding maxSize', () => { + const dedup = new EventDeduplicator(5); + // Fill with 5 entries + for (let i = 0; i < 5; i++) { + expect(dedup.check(canonicalNeedleEvent({ sequence: i }))).toBe(true); + } + 
expect(dedup.size).toBe(5); + + // Adding a 6th should trigger eviction + dedup.check(canonicalNeedleEvent({ sequence: 5 })); + expect(dedup.size).toBe(5); + + // The oldest (seq=0) should have been evicted, so re-adding it is allowed + expect(dedup.check(canonicalNeedleEvent({ sequence: 0 }))).toBe(true); + }); + + it('deduplicates same event arriving from JSONL and OTLP sources', () => { + const dedup = new EventDeduplicator(); + + // Same logical event via JSONL (canonical format) + const jsonlRaw = JSON.stringify(canonicalNeedleEvent({ + session_id: 'sess-dedup', + worker_id: 'worker-dedup', + sequence: 42, + })); + + // Same logical event via OTLP-log + const otlpRecord = { + timeUnixNano: '1772641054008000000', + attributes: [ + { key: 'event_type', value: { stringValue: 'worker.started' } }, + { key: 'worker_id', value: { stringValue: 'worker-dedup' } }, + { key: 'session_id', value: { stringValue: 'sess-dedup' } }, + { key: 'sequence', value: { intValue: '42' } }, + ], + }; + + // JSONL arrives first → keep + const jsonlNe = normalize(jsonlRaw, 'jsonl')!; + expect(jsonlNe).not.toBeNull(); + expect(dedup.check(jsonlNe)).toBe(true); + + // OTLP arrives second → drop (duplicate) + const otlpNe = normalize(otlpRecord, 'otlp-log')!; + expect(otlpNe).not.toBeNull(); + expect(dedup.check(otlpNe)).toBe(false); + expect(dedup.droppedCount).toBe(1); + }); + + it('deduplicates OTLP arriving before JSONL (order does not matter)', () => { + const dedup = new EventDeduplicator(); + + const otlpRecord = { + timeUnixNano: '1772641054008000000', + attributes: [ + { key: 'event_type', value: { stringValue: 'bead.claimed' } }, + { key: 'worker_id', value: { stringValue: 'w-reverse' } }, + { key: 'session_id', value: { stringValue: 's-reverse' } }, + { key: 'sequence', value: { intValue: '7' } }, + ], + }; + + const jsonlRaw = JSON.stringify(canonicalNeedleEvent({ + event_type: 'bead.claimed', + worker_id: 'w-reverse', + session_id: 's-reverse', + sequence: 7, + })); + + // 
// ── Event deduplication ───────────────────────────────────────

/**
 * Bounded first-arrival dedup set keyed on (session_id, worker_id, sequence).
 *
 * When FABRIC ingests events from both JSONL and OTLP sources,
 * the same logical event can arrive twice. The deduplicator keeps
 * the first arrival and silently drops duplicates, incrementing
 * `droppedCount` for observability.
 *
 * Events whose `sequence` is negative (legacy formats without sequence)
 * or is not a comparable number (`NaN`, `undefined`) are always passed
 * through — they cannot be deduped.
 *
 * Eviction is by insertion order: once more than `maxSize` keys are held,
 * the keys that were first seen earliest are removed. A duplicate hit does
 * NOT refresh a key's position, so this is FIFO eviction rather than a
 * strict LRU.
 */
export class EventDeduplicator {
  /** Keys already seen; a `Set` iterates in insertion order, oldest first. */
  private readonly seen = new Set<string>();
  /** Maximum number of keys retained. `0` means no key is ever remembered. */
  private readonly maxSize: number;
  /** Number of events rejected as duplicates since construction or the last `reset()`. */
  droppedCount = 0;

  /**
   * @param maxSize Upper bound on remembered keys. Fractions are floored and
   *   negative values are clamped to `0` (nothing is remembered, so nothing
   *   is ever dropped).
   * @throws RangeError if `maxSize` is `NaN`, because `size > NaN` is always
   *   false and the set would otherwise grow without bound.
   */
  constructor(maxSize = 10_000) {
    if (Number.isNaN(maxSize)) {
      throw new RangeError('EventDeduplicator: maxSize must not be NaN');
    }
    this.maxSize = Math.max(0, Math.floor(maxSize));
  }

  /**
   * Returns `true` if this is the first occurrence (keep the event),
   * `false` if it is a duplicate (drop it).
   */
  check(event: NeedleEvent): boolean {
    // Written as a negated `>=` rather than `< 0` so that NaN / undefined
    // sequences also pass through instead of all collapsing onto one key.
    if (!(event.sequence >= 0)) return true;

    // NUL-separated so that ids cannot run into each other ("a"+"bc" vs "ab"+"c").
    const key = `${event.session_id}\0${event.worker_id}\0${event.sequence}`;
    if (this.seen.has(key)) {
      this.droppedCount++;
      return false;
    }

    this.seen.add(key);

    // Evict oldest entries while over capacity. Deleting the element currently
    // being visited is safe during Set iteration.
    for (const oldest of this.seen) {
      if (this.seen.size <= this.maxSize) break;
      this.seen.delete(oldest);
    }

    return true;
  }

  /** Forgets every remembered key and zeroes `droppedCount`. */
  reset(): void {
    this.seen.clear();
    this.droppedCount = 0;
  }

  /** Number of keys currently remembered. */
  get size(): number {
    return this.seen.size;
  }
}