feat(bd-9jm): add LRU event dedup keyed on (session_id, worker_id, sequence)

When FABRIC ingests events from both JSONL and OTLP sources, the same
logical event can arrive twice. EventDeduplicator keeps the first
arrival and silently drops duplicates with a counter for observability.
Events with sequence < 0 (legacy formats) always pass through.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-04-21 14:45:31 -04:00
parent 6805bff904
commit 0f7aced61b
2 changed files with 227 additions and 0 deletions

View file

@ -15,6 +15,7 @@ import {
normalizeToLogEvent,
needleEventToLogEvent,
NormalizerSource,
EventDeduplicator,
} from './normalizer.js';
import { NeedleEvent, NEEDLE_EVENT_SCHEMA_VERSION, LogEvent } from './types.js';
@ -1154,3 +1155,168 @@ describe('cross-source parity', () => {
expect(otlpResult!.duration_ms).toBe(1000);
});
});
// ── EventDeduplicator ────────────────────────────────────────────
describe('EventDeduplicator', () => {
it('passes through the first occurrence of an event', () => {
const dedup = new EventDeduplicator();
const event = canonicalNeedleEvent();
expect(dedup.check(event)).toBe(true);
});
it('drops a duplicate event with the same (session_id, worker_id, sequence)', () => {
const dedup = new EventDeduplicator();
const event = canonicalNeedleEvent();
expect(dedup.check(event)).toBe(true);
expect(dedup.check(event)).toBe(false);
expect(dedup.droppedCount).toBe(1);
});
it('allows events with different sequences from the same worker', () => {
const dedup = new EventDeduplicator();
expect(dedup.check(canonicalNeedleEvent({ sequence: 1 }))).toBe(true);
expect(dedup.check(canonicalNeedleEvent({ sequence: 2 }))).toBe(true);
expect(dedup.check(canonicalNeedleEvent({ sequence: 3 }))).toBe(true);
expect(dedup.droppedCount).toBe(0);
});
it('allows events with different worker_ids', () => {
const dedup = new EventDeduplicator();
expect(dedup.check(canonicalNeedleEvent({ worker_id: 'w1' }))).toBe(true);
expect(dedup.check(canonicalNeedleEvent({ worker_id: 'w2' }))).toBe(true);
expect(dedup.droppedCount).toBe(0);
});
it('allows events with different session_ids', () => {
const dedup = new EventDeduplicator();
expect(dedup.check(canonicalNeedleEvent({ session_id: 's1' }))).toBe(true);
expect(dedup.check(canonicalNeedleEvent({ session_id: 's2' }))).toBe(true);
expect(dedup.droppedCount).toBe(0);
});
it('passes through events with sequence < 0 (legacy, no dedup)', () => {
const dedup = new EventDeduplicator();
const event = canonicalNeedleEvent({ sequence: -1 });
expect(dedup.check(event)).toBe(true);
expect(dedup.check(event)).toBe(true);
expect(dedup.droppedCount).toBe(0);
});
it('increments droppedCount for each duplicate', () => {
const dedup = new EventDeduplicator();
const event = canonicalNeedleEvent();
dedup.check(event);
dedup.check(event);
dedup.check(event);
dedup.check(event);
expect(dedup.droppedCount).toBe(3);
});
it('resets state on reset()', () => {
const dedup = new EventDeduplicator();
const event = canonicalNeedleEvent();
dedup.check(event);
dedup.check(event);
expect(dedup.droppedCount).toBe(1);
expect(dedup.size).toBe(1);
dedup.reset();
expect(dedup.droppedCount).toBe(0);
expect(dedup.size).toBe(0);
// Same event should be allowed again after reset
expect(dedup.check(event)).toBe(true);
});
it('evicts oldest entries when exceeding maxSize', () => {
const dedup = new EventDeduplicator(5);
// Fill with 5 entries
for (let i = 0; i < 5; i++) {
expect(dedup.check(canonicalNeedleEvent({ sequence: i }))).toBe(true);
}
expect(dedup.size).toBe(5);
// Adding a 6th should trigger eviction
dedup.check(canonicalNeedleEvent({ sequence: 5 }));
expect(dedup.size).toBe(5);
// The oldest (seq=0) should have been evicted, so re-adding it is allowed
expect(dedup.check(canonicalNeedleEvent({ sequence: 0 }))).toBe(true);
});
it('deduplicates same event arriving from JSONL and OTLP sources', () => {
const dedup = new EventDeduplicator();
// Same logical event via JSONL (canonical format)
const jsonlRaw = JSON.stringify(canonicalNeedleEvent({
session_id: 'sess-dedup',
worker_id: 'worker-dedup',
sequence: 42,
}));
// Same logical event via OTLP-log
const otlpRecord = {
timeUnixNano: '1772641054008000000',
attributes: [
{ key: 'event_type', value: { stringValue: 'worker.started' } },
{ key: 'worker_id', value: { stringValue: 'worker-dedup' } },
{ key: 'session_id', value: { stringValue: 'sess-dedup' } },
{ key: 'sequence', value: { intValue: '42' } },
],
};
// JSONL arrives first → keep
const jsonlNe = normalize(jsonlRaw, 'jsonl')!;
expect(jsonlNe).not.toBeNull();
expect(dedup.check(jsonlNe)).toBe(true);
// OTLP arrives second → drop (duplicate)
const otlpNe = normalize(otlpRecord, 'otlp-log')!;
expect(otlpNe).not.toBeNull();
expect(dedup.check(otlpNe)).toBe(false);
expect(dedup.droppedCount).toBe(1);
});
it('deduplicates OTLP arriving before JSONL (order does not matter)', () => {
const dedup = new EventDeduplicator();
const otlpRecord = {
timeUnixNano: '1772641054008000000',
attributes: [
{ key: 'event_type', value: { stringValue: 'bead.claimed' } },
{ key: 'worker_id', value: { stringValue: 'w-reverse' } },
{ key: 'session_id', value: { stringValue: 's-reverse' } },
{ key: 'sequence', value: { intValue: '7' } },
],
};
const jsonlRaw = JSON.stringify(canonicalNeedleEvent({
event_type: 'bead.claimed',
worker_id: 'w-reverse',
session_id: 's-reverse',
sequence: 7,
}));
// OTLP arrives first → keep
const otlpNe = normalize(otlpRecord, 'otlp-log')!;
expect(dedup.check(otlpNe)).toBe(true);
// JSONL arrives second → drop
const jsonlNe = normalize(jsonlRaw, 'jsonl')!;
expect(dedup.check(jsonlNe)).toBe(false);
expect(dedup.droppedCount).toBe(1);
});
it('reports size correctly', () => {
const dedup = new EventDeduplicator();
expect(dedup.size).toBe(0);
dedup.check(canonicalNeedleEvent({ sequence: 1 }));
expect(dedup.size).toBe(1);
dedup.check(canonicalNeedleEvent({ sequence: 2 }));
expect(dedup.size).toBe(2);
// Duplicate doesn't increase size
dedup.check(canonicalNeedleEvent({ sequence: 1 }));
expect(dedup.size).toBe(2);
});
});

View file

@ -23,6 +23,67 @@ export type NormalizerSource =
| 'otlp-span-end'
| 'otlp-metric';
// ── Event deduplication ───────────────────────────────────────
/**
* LRU dedup set keyed on (session_id, worker_id, sequence).
*
* When FABRIC ingests events from both JSONL and OTLP sources,
* the same logical event can arrive twice. The deduplicator keeps
* the first arrival and silently drops duplicates, incrementing
* `droppedCount` for observability.
*
* Events with `sequence < 0` (legacy formats without sequence) are
* always passed through they cannot be deduped.
*/
export class EventDeduplicator {
private readonly seen: Map<string, true> = new Map();
private readonly maxSize: number;
droppedCount = 0;
constructor(maxSize = 10_000) {
this.maxSize = maxSize;
}
/**
* Returns `true` if this is the first occurrence (keep the event),
* `false` if it is a duplicate (drop it).
*/
check(event: NeedleEvent): boolean {
if (event.sequence < 0) return true;
const key = `${event.session_id}\0${event.worker_id}\0${event.sequence}`;
if (this.seen.has(key)) {
this.droppedCount++;
return false;
}
this.seen.set(key, true);
// Evict oldest entries when over capacity
if (this.seen.size > this.maxSize) {
const excess = this.seen.size - this.maxSize;
let count = 0;
for (const k of this.seen.keys()) {
if (count >= excess) break;
this.seen.delete(k);
count++;
}
}
return true;
}
reset(): void {
this.seen.clear();
this.droppedCount = 0;
}
get size(): number {
return this.seen.size;
}
}
// ── Internal interfaces for legacy formats ────────────────────
interface NeedleWorkerObject {