feat(bd-0nd): add DirectoryTailer for multi-file JSONL tailing
New DirectoryTailer class that watches a directory for *.jsonl files, spawning a LogTailer per file and hot-adding new files via fs.watch. Forwards child events with source file path and deduplicates across sources using a shared EventDeduplicator. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
f37d88de3e
commit
f7483e2502
2 changed files with 299 additions and 0 deletions
190
src/directoryTailer.test.ts
Normal file
190
src/directoryTailer.test.ts
Normal file
|
|
@ -0,0 +1,190 @@
|
|||
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
|
||||
import * as fs from 'fs';
|
||||
import * as path from 'path';
|
||||
import * as os from 'os';
|
||||
import { DirectoryTailer } from './directoryTailer.js';
|
||||
import { EventDeduplicator } from './normalizer.js';
|
||||
|
||||
function makeEvent(worker: string, msg: string, sequence: number) {
|
||||
return JSON.stringify({
|
||||
timestamp: new Date().toISOString(),
|
||||
event_type: msg,
|
||||
worker_id: worker,
|
||||
session_id: 'test-session',
|
||||
sequence,
|
||||
data: {},
|
||||
});
|
||||
}
|
||||
|
||||
describe('DirectoryTailer', () => {
|
||||
let tempDir: string;
|
||||
|
||||
beforeEach(() => {
|
||||
tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'fabric-dir-test-'));
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
fs.rmSync(tempDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it('emits events from multiple pre-existing JSONL files', async () => {
|
||||
const fileA = path.join(tempDir, 'a.jsonl');
|
||||
const fileB = path.join(tempDir, 'b.jsonl');
|
||||
|
||||
// Write initial content then close so size is known
|
||||
fs.writeFileSync(fileA, makeEvent('w-a', 'alpha', 1) + '\n');
|
||||
fs.writeFileSync(fileB, makeEvent('w-b', 'beta', 1) + '\n');
|
||||
|
||||
const tailer = new DirectoryTailer({ directory: tempDir });
|
||||
|
||||
const received: Array<{ msg: string; filePath: string }> = [];
|
||||
tailer.on('event', (event, filePath) => {
|
||||
received.push({ msg: event.msg, filePath });
|
||||
});
|
||||
|
||||
tailer.start();
|
||||
|
||||
// Wait for initial reads to propagate
|
||||
await new Promise((r) => setTimeout(r, 300));
|
||||
|
||||
// Append a new line to each so the watcher fires
|
||||
fs.appendFileSync(fileA, makeEvent('w-a', 'alpha2', 2) + '\n');
|
||||
fs.appendFileSync(fileB, makeEvent('w-b', 'beta2', 2) + '\n');
|
||||
|
||||
await new Promise((r) => setTimeout(r, 500));
|
||||
|
||||
tailer.stop();
|
||||
|
||||
expect(received.length).toBeGreaterThanOrEqual(2);
|
||||
const msgs = received.map((r) => r.msg);
|
||||
expect(msgs).toContain('alpha2');
|
||||
expect(msgs).toContain('beta2');
|
||||
});
|
||||
|
||||
it('hot-adds a new JSONL file and emits its events', async () => {
|
||||
const tailer = new DirectoryTailer({ directory: tempDir });
|
||||
const received: string[] = [];
|
||||
|
||||
tailer.on('event', (event) => {
|
||||
received.push(event.msg);
|
||||
});
|
||||
|
||||
tailer.start();
|
||||
await new Promise((r) => setTimeout(r, 100));
|
||||
|
||||
// Create a new file after the tailer is running
|
||||
const newFile = path.join(tempDir, 'new.jsonl');
|
||||
fs.writeFileSync(newFile, '');
|
||||
await new Promise((r) => setTimeout(r, 200));
|
||||
|
||||
// Append an event
|
||||
fs.appendFileSync(newFile, makeEvent('w-new', 'hot-add-event', 1) + '\n');
|
||||
|
||||
await new Promise((r) => setTimeout(r, 500));
|
||||
|
||||
tailer.stop();
|
||||
|
||||
expect(received).toContain('hot-add-event');
|
||||
});
|
||||
|
||||
it('ignores non-*.jsonl files', async () => {
|
||||
const txtFile = path.join(tempDir, 'notes.txt');
|
||||
fs.writeFileSync(txtFile, makeEvent('w-txt', 'ignored', 1) + '\n');
|
||||
|
||||
const tailer = new DirectoryTailer({ directory: tempDir });
|
||||
|
||||
let eventCount = 0;
|
||||
tailer.on('event', () => {
|
||||
eventCount++;
|
||||
});
|
||||
|
||||
tailer.start();
|
||||
await new Promise((r) => setTimeout(r, 300));
|
||||
|
||||
// Append to the txt file — should be ignored
|
||||
fs.appendFileSync(txtFile, makeEvent('w-txt', 'still-ignored', 2) + '\n');
|
||||
await new Promise((r) => setTimeout(r, 300));
|
||||
|
||||
tailer.stop();
|
||||
|
||||
expect(eventCount).toBe(0);
|
||||
});
|
||||
|
||||
it('stop() closes all child watchers', async () => {
|
||||
fs.writeFileSync(path.join(tempDir, 'a.jsonl'), '');
|
||||
fs.writeFileSync(path.join(tempDir, 'b.jsonl'), '');
|
||||
|
||||
const tailer = new DirectoryTailer({ directory: tempDir });
|
||||
|
||||
const endPromise = new Promise<void>((resolve) => {
|
||||
tailer.on('end', resolve);
|
||||
});
|
||||
|
||||
tailer.start();
|
||||
await new Promise((r) => setTimeout(r, 200));
|
||||
|
||||
expect(tailer.activeFiles.length).toBe(2);
|
||||
expect(tailer.isActive).toBe(true);
|
||||
|
||||
tailer.stop();
|
||||
await endPromise;
|
||||
|
||||
expect(tailer.isActive).toBe(false);
|
||||
expect(tailer.activeFiles.length).toBe(0);
|
||||
});
|
||||
|
||||
it('deduplicates events across files', async () => {
|
||||
const dedup = new EventDeduplicator();
|
||||
|
||||
const fileA = path.join(tempDir, 'a.jsonl');
|
||||
const fileB = path.join(tempDir, 'b.jsonl');
|
||||
|
||||
// Same (worker_id, session_id, sequence) in both files
|
||||
const dupEvent = JSON.stringify({
|
||||
timestamp: new Date().toISOString(),
|
||||
event_type: 'bead.claimed',
|
||||
worker_id: 'w-dup',
|
||||
session_id: 's-dup',
|
||||
sequence: 42,
|
||||
data: {},
|
||||
});
|
||||
|
||||
fs.writeFileSync(fileA, '');
|
||||
fs.writeFileSync(fileB, '');
|
||||
|
||||
const tailer = new DirectoryTailer({ directory: tempDir, deduplicator: dedup });
|
||||
const received: string[] = [];
|
||||
|
||||
tailer.on('event', (event) => {
|
||||
received.push(event.msg);
|
||||
});
|
||||
|
||||
tailer.start();
|
||||
await new Promise((r) => setTimeout(r, 200));
|
||||
|
||||
// Append the same event to both files
|
||||
fs.appendFileSync(fileA, dupEvent + '\n');
|
||||
fs.appendFileSync(fileB, dupEvent + '\n');
|
||||
|
||||
await new Promise((r) => setTimeout(r, 500));
|
||||
|
||||
tailer.stop();
|
||||
|
||||
// Only one should have been emitted (dedup drops the second)
|
||||
const claimedCount = received.filter((m) => m === 'bead.claimed').length;
|
||||
expect(claimedCount).toBe(1);
|
||||
});
|
||||
|
||||
it('emits error when directory does not exist', async () => {
|
||||
const tailer = new DirectoryTailer({ directory: '/nonexistent/dir' });
|
||||
|
||||
const errorPromise = new Promise<Error>((resolve) => {
|
||||
tailer.on('error', resolve);
|
||||
});
|
||||
|
||||
tailer.start();
|
||||
|
||||
const err = await errorPromise;
|
||||
expect(err.message).toContain('Directory not found');
|
||||
});
|
||||
});
|
||||
109
src/directoryTailer.ts
Normal file
109
src/directoryTailer.ts
Normal file
|
|
@ -0,0 +1,109 @@
|
|||
/**
|
||||
* FABRIC Directory Tailer
|
||||
*
|
||||
* Watches a directory for *.jsonl files, spawning a LogTailer per file.
|
||||
* Hot-adds new *.jsonl files via fs.watch rename events.
|
||||
*/
|
||||
|
||||
import * as fs from 'fs';
|
||||
import * as path from 'path';
|
||||
import { EventEmitter } from 'events';
|
||||
import { LogTailer, TailerEvents } from './tailer.js';
|
||||
import { EventDeduplicator } from './normalizer.js';
|
||||
|
||||
export interface DirectoryTailerOptions {
|
||||
/** Directory to watch for *.jsonl files */
|
||||
directory: string;
|
||||
|
||||
/** Shared deduplicator for cross-source dedup. */
|
||||
deduplicator?: EventDeduplicator;
|
||||
}
|
||||
|
||||
export class DirectoryTailer extends EventEmitter {
|
||||
private directory: string;
|
||||
private deduplicator?: EventDeduplicator;
|
||||
private children: Map<string, LogTailer> = new Map();
|
||||
private dirWatcher?: fs.FSWatcher;
|
||||
private stopped: boolean = false;
|
||||
|
||||
constructor(options: DirectoryTailerOptions) {
|
||||
super();
|
||||
this.directory = options.directory;
|
||||
this.deduplicator = options.deduplicator;
|
||||
}
|
||||
|
||||
start(): void {
|
||||
if (!fs.existsSync(this.directory)) {
|
||||
this.emit('error', new Error(`Directory not found: ${this.directory}`));
|
||||
return;
|
||||
}
|
||||
|
||||
// Spawn tailers for existing *.jsonl files
|
||||
const entries = fs.readdirSync(this.directory);
|
||||
for (const entry of entries) {
|
||||
if (entry.endsWith('.jsonl')) {
|
||||
this.spawnTailer(path.join(this.directory, entry));
|
||||
}
|
||||
}
|
||||
|
||||
// Watch for new files
|
||||
this.dirWatcher = fs.watch(this.directory, (eventType, filename) => {
|
||||
if (this.stopped) return;
|
||||
if (!filename || !filename.endsWith('.jsonl')) return;
|
||||
if (eventType === 'rename') {
|
||||
const fullPath = path.join(this.directory, filename);
|
||||
// Small delay — the file may not be fully created yet
|
||||
setTimeout(() => {
|
||||
if (this.stopped) return;
|
||||
if (fs.existsSync(fullPath) && !this.children.has(fullPath)) {
|
||||
this.spawnTailer(fullPath);
|
||||
}
|
||||
}, 50);
|
||||
}
|
||||
});
|
||||
|
||||
this.dirWatcher.on('error', (err) => {
|
||||
this.emit('error', err);
|
||||
});
|
||||
}
|
||||
|
||||
stop(): void {
|
||||
this.stopped = true;
|
||||
if (this.dirWatcher) {
|
||||
this.dirWatcher.close();
|
||||
this.dirWatcher = undefined;
|
||||
}
|
||||
for (const tailer of this.children.values()) {
|
||||
tailer.stop();
|
||||
}
|
||||
this.children.clear();
|
||||
this.emit('end');
|
||||
}
|
||||
|
||||
get isActive(): boolean {
|
||||
return !this.stopped;
|
||||
}
|
||||
|
||||
get activeFiles(): string[] {
|
||||
return [...this.children.keys()];
|
||||
}
|
||||
|
||||
private spawnTailer(filePath: string): void {
|
||||
if (this.children.has(filePath)) return;
|
||||
|
||||
const tailer = new LogTailer({
|
||||
path: filePath,
|
||||
follow: true,
|
||||
lines: 0,
|
||||
deduplicator: this.deduplicator,
|
||||
});
|
||||
|
||||
// Forward child events
|
||||
tailer.on('line', (line: string) => this.emit('line', line, filePath));
|
||||
tailer.on('event', (event) => this.emit('event', event, filePath));
|
||||
tailer.on('error', (err: Error) => this.emit('error', err));
|
||||
|
||||
this.children.set(filePath, tailer);
|
||||
tailer.start();
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue