feat(bd-ch6.1): bound DirectoryTailer memory and fd usage via LRU active set

- Add startPosition option and currentPosition getter to LogTailer so
  evicted tailers can be resumed from their last read byte offset
- Rewrite DirectoryTailer with a bounded active set (maxActiveFiles=200):
  only the N most-recently-modified files have open watchers; older files
  are tracked in a fileInfo Map but not watched
- LRU eviction: when the active set is full and a new file needs activation,
  the least-recently-active tailer is stopped (position checkpointed) and
  replaced
- Re-activation: the poll loop (default 30 s) detects mtime changes in
  inactive files and opens them from their saved position so no bytes are
  missed or replayed
- RSS back-pressure: skip new activations when process.memoryUsage().rss
  exceeds maxRssBytes (default 400 MB)
- Hot-add new files via fs.watch rename, always reading from position 0
- Add three new tests: 10k-file cap assertion, LRU eviction+reactivation,
  and position-checkpoint correctness

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-04-23 22:13:18 -04:00
parent 19450d3047
commit 755669e73a
3 changed files with 370 additions and 23 deletions

View file

@ -187,4 +187,124 @@ describe('DirectoryTailer', () => {
const err = await errorPromise;
expect(err.message).toContain('Directory not found');
});
it('caps active-file count and inotify watches with many files', async () => {
const COUNT = 10_000;
const MAX_ACTIVE = 100;
// Create COUNT empty *.jsonl files synchronously.
for (let i = 0; i < COUNT; i++) {
fs.writeFileSync(path.join(tempDir, `worker-${String(i).padStart(5, '0')}.jsonl`), '');
}
const tailer = new DirectoryTailer({
directory: tempDir,
maxActiveFiles: MAX_ACTIVE,
recentMtimeMs: 86_400_000,
});
tailer.start();
await new Promise((r) => setTimeout(r, 1500));
// Active set must be bounded.
expect(tailer.activeFiles.length).toBeLessThanOrEqual(MAX_ACTIVE);
// All files must be tracked in fileInfo.
expect(tailer.knownFileCount).toBe(COUNT);
// On Linux, each fs.FSWatcher corresponds to one inotify watch. Check the
// open-fd count as a proxy (Node.js uses one inotify fd shared by all
// watches, so the actual fd count stays very small regardless of watch count,
// but inotify watches are bounded by maxActiveFiles + 1 for the dir watcher).
if (fs.existsSync('/proc/self/fd')) {
const fdCount = fs.readdirSync('/proc/self/fd').length;
// Generous ceiling: baseline fds (~20) + MAX_ACTIVE + dir watcher + some slack.
expect(fdCount).toBeLessThan(MAX_ACTIVE + 60);
}
tailer.stop();
}, 60_000);
it('evicts LRU and re-activates a file on mtime change', async () => {
// maxActiveFiles=2 with 3 files forces an eviction.
const fileA = path.join(tempDir, 'a.jsonl');
const fileB = path.join(tempDir, 'b.jsonl');
const fileC = path.join(tempDir, 'c.jsonl');
fs.writeFileSync(fileA, '');
fs.writeFileSync(fileB, '');
fs.writeFileSync(fileC, '');
const tailer = new DirectoryTailer({
directory: tempDir,
maxActiveFiles: 2,
recentMtimeMs: 86_400_000,
inactiveCheckIntervalMs: 200, // fast poll for test
});
const received: string[] = [];
tailer.on('event', (event) => {
received.push(event.msg);
});
tailer.start();
await new Promise((r) => setTimeout(r, 300));
// Only 2 of 3 files should be active.
expect(tailer.activeFiles.length).toBe(2);
// Find the evicted file and write content to it.
const evicted = [fileA, fileB, fileC].find(
(f) => !tailer.activeFiles.includes(f),
)!;
expect(evicted).toBeDefined();
// Append an event to the evicted file — the poll should re-activate it.
fs.appendFileSync(evicted, makeEvent('w-evicted', 'reactivated-event', 1) + '\n');
// Wait for the poll interval to fire and re-activate.
await new Promise((r) => setTimeout(r, 800));
tailer.stop();
expect(received).toContain('reactivated-event');
});
it('resumes from saved position when a file is re-activated after eviction', async () => {
const fileA = path.join(tempDir, 'a.jsonl');
const fileB = path.join(tempDir, 'b.jsonl');
fs.writeFileSync(fileA, makeEvent('w-a', 'before-eviction', 1) + '\n');
fs.writeFileSync(fileB, '');
// maxActiveFiles=1 so opening fileB will evict fileA.
const tailer = new DirectoryTailer({
directory: tempDir,
maxActiveFiles: 1,
recentMtimeMs: 86_400_000,
inactiveCheckIntervalMs: 200,
});
const received: string[] = [];
tailer.on('event', (event) => received.push(event.msg));
tailer.start();
await new Promise((r) => setTimeout(r, 400));
// Exactly one file is active; the other is inactive.
expect(tailer.activeFiles.length).toBe(1);
// Write to the inactive file to trigger re-activation.
const inactive = tailer.activeFiles[0] === fileA ? fileB : fileA;
fs.appendFileSync(inactive, makeEvent('w-inactive', 'after-reactivation', 2) + '\n');
await new Promise((r) => setTimeout(r, 800));
tailer.stop();
// The event written after re-activation must have been received.
expect(received).toContain('after-reactivation');
// The event written before eviction (to fileA at start time) should NOT
// have been re-emitted when fileA was re-activated (position is checkpointed).
const beforeCount = received.filter((m) => m === 'before-eviction').length;
expect(beforeCount).toBe(0);
});
});

View file

@ -1,8 +1,16 @@
/**
* FABRIC Directory Tailer
*
* Watches a directory for *.jsonl files, spawning a LogTailer per file.
* Hot-adds new *.jsonl files via fs.watch rename events.
* Watches a directory for *.jsonl files and tails a bounded active set of
* them (LRU, capped at maxActiveFiles). Inactive files are tracked but not
* watched; when their mtime changes they are re-opened from the last saved
* byte position. This keeps file-descriptor and heap usage O(maxActiveFiles)
* regardless of how many total *.jsonl files exist in the directory.
*
* Memory / fd ceiling (documented contract):
* - Open inotify watches maxActiveFiles + 1 (1 for the dir watcher)
* - Heap per active tailer 15 KB (FSWatcher + EventEmitter + buffers)
* - fileInfo Map entries = total discovered *.jsonl count (~200 B each)
*/
import * as fs from 'fs';
@ -17,19 +25,69 @@ export interface DirectoryTailerOptions {
/** Shared deduplicator for cross-source dedup. */
deduplicator?: EventDeduplicator;
/**
* Maximum number of concurrently open file watchers.
* LRU eviction kicks in when the active set would exceed this.
* Default: 200.
*/
maxActiveFiles?: number;
/**
* At startup, only activate files whose mtime is within this many
* milliseconds of now. Older files are registered in fileInfo but left
* inactive until their mtime changes. Default: 86_400_000 (24 h).
*/
recentMtimeMs?: number;
/**
* How often (ms) the inactive-file poll runs to detect mtime changes.
* Default: 30_000 (30 s).
*/
inactiveCheckIntervalMs?: number;
/**
* Process RSS bytes threshold above which new activations are skipped
* (back-pressure). The LRU eviction path still runs so that the most
* recently used file can displace a stale one. Default: 400 MB.
*/
maxRssBytes?: number;
}
interface FileInfo {
/** Last observed mtime (ms since epoch). */
mtime: number;
/** Byte offset of the last confirmed read — used to resume after eviction. */
position: number;
/** Wall-clock ms of the last 'event' emitted from this file. */
lastActivity: number;
}
export class DirectoryTailer extends EventEmitter {
private directory: string;
private deduplicator?: EventDeduplicator;
private maxActiveFiles: number;
private recentMtimeMs: number;
private inactiveCheckIntervalMs: number;
private maxRssBytes: number;
/** Metadata for every discovered *.jsonl file (active and inactive). */
private fileInfo: Map<string, FileInfo> = new Map();
/** Currently open tailers (subset of fileInfo, size ≤ maxActiveFiles). */
private children: Map<string, LogTailer> = new Map();
private dirWatcher?: fs.FSWatcher;
private pollInterval?: ReturnType<typeof setInterval>;
private stopped: boolean = false;
constructor(options: DirectoryTailerOptions) {
super();
this.directory = options.directory;
this.deduplicator = options.deduplicator;
this.maxActiveFiles = options.maxActiveFiles ?? 200;
this.recentMtimeMs = options.recentMtimeMs ?? 86_400_000;
this.inactiveCheckIntervalMs = options.inactiveCheckIntervalMs ?? 30_000;
this.maxRssBytes = options.maxRssBytes ?? 400 * 1024 * 1024;
}
start(): void {
@ -38,37 +96,79 @@ export class DirectoryTailer extends EventEmitter {
return;
}
// Spawn tailers for existing *.jsonl files
const entries = fs.readdirSync(this.directory);
for (const entry of entries) {
if (entry.endsWith('.jsonl')) {
this.spawnTailer(path.join(this.directory, entry));
const now = Date.now();
const candidates: Array<{ fullPath: string; mtime: number; size: number }> = [];
for (const entry of fs.readdirSync(this.directory)) {
if (!entry.endsWith('.jsonl')) continue;
const fullPath = path.join(this.directory, entry);
try {
const stat = fs.statSync(fullPath);
// Register all files; position = stat.size so initial activation
// starts from the current end (don't re-emit historical events).
this.fileInfo.set(fullPath, {
mtime: stat.mtimeMs,
position: stat.size,
lastActivity: 0,
});
if (now - stat.mtimeMs <= this.recentMtimeMs) {
candidates.push({ fullPath, mtime: stat.mtimeMs, size: stat.size });
}
} catch {
// Skip files we cannot stat (race with deletion, permissions, etc.)
}
}
// Watch for new files
// Activate the most recently modified files first, up to the cap.
candidates.sort((a, b) => b.mtime - a.mtime);
for (let i = 0; i < Math.min(candidates.length, this.maxActiveFiles); i++) {
this.activateFile(candidates[i].fullPath);
}
// Watch for new files appearing in the directory.
this.dirWatcher = fs.watch(this.directory, (eventType, filename) => {
if (this.stopped) return;
if (!filename || !filename.endsWith('.jsonl')) return;
if (eventType === 'rename') {
const fullPath = path.join(this.directory, filename);
// Small delay — the file may not be fully created yet
setTimeout(() => {
if (this.stopped) return;
if (fs.existsSync(fullPath) && !this.children.has(fullPath)) {
this.spawnTailer(fullPath);
if (eventType !== 'rename') return;
const fullPath = path.join(this.directory, filename);
// Small delay — the file may not be fully created yet.
setTimeout(() => {
if (this.stopped) return;
if (!fs.existsSync(fullPath)) return;
if (this.children.has(fullPath)) return;
try {
const stat = fs.statSync(fullPath);
if (!this.fileInfo.has(fullPath)) {
// Brand-new file: start reading from byte 0 so we capture all content.
this.fileInfo.set(fullPath, {
mtime: stat.mtimeMs,
position: 0,
lastActivity: 0,
});
}
}, 50);
}
} catch {
return;
}
this.activateWithEviction(fullPath);
}, 50);
});
this.dirWatcher.on('error', (err) => {
this.emit('error', err);
});
this.dirWatcher.on('error', (err) => this.emit('error', err));
// Periodically check inactive files for mtime changes.
this.pollInterval = setInterval(
() => this.pollInactiveFiles(),
this.inactiveCheckIntervalMs,
);
}
stop(): void {
this.stopped = true;
if (this.pollInterval) {
clearInterval(this.pollInterval);
this.pollInterval = undefined;
}
if (this.dirWatcher) {
this.dirWatcher.close();
this.dirWatcher = undefined;
@ -84,26 +184,129 @@ export class DirectoryTailer extends EventEmitter {
return !this.stopped;
}
/** Paths of files with an open watcher right now. */
get activeFiles(): string[] {
return [...this.children.keys()];
}
private spawnTailer(filePath: string): void {
/** Total number of *.jsonl files discovered (active + inactive). */
get knownFileCount(): number {
return this.fileInfo.size;
}
// ---------------------------------------------------------------------------
// Internal helpers
// ---------------------------------------------------------------------------
private activateFile(filePath: string): void {
if (this.children.has(filePath)) return;
const info = this.fileInfo.get(filePath);
const tailer = new LogTailer({
path: filePath,
follow: true,
lines: 0,
deduplicator: this.deduplicator,
// info.position drives the start offset:
// • stat.size → initial activation, starts at EOF (no history replayed)
// • 0 → new hot-added file, reads all existing content
// • savedPos → re-activation after eviction, resumes from checkpoint
startPosition: info?.position,
});
// Forward child events
tailer.on('line', (line: string) => this.emit('line', line, filePath));
tailer.on('event', (event) => this.emit('event', event, filePath));
tailer.on('event', (event) => {
const fi = this.fileInfo.get(filePath);
if (fi) fi.lastActivity = Date.now();
this.emit('event', event, filePath);
});
tailer.on('error', (err: Error) => this.emit('error', err));
this.children.set(filePath, tailer);
tailer.start();
}
/**
* Activate a file, evicting the least-recently-active tailer first if the
* active set is at capacity. Also applies RSS back-pressure.
*/
private activateWithEviction(filePath: string): void {
if (this.children.has(filePath)) return;
// RSS back-pressure: if memory is tight and we're already at the cap, only
// proceed by evicting an existing tailer (don't grow the set further).
const underMemPressure = process.memoryUsage().rss > this.maxRssBytes;
if (underMemPressure && this.children.size >= this.maxActiveFiles) {
this.evictLRU();
} else if (!underMemPressure && this.children.size >= this.maxActiveFiles) {
this.evictLRU();
} else if (underMemPressure) {
// Memory pressure but still room in active set — skip activation.
return;
}
this.activateFile(filePath);
}
/**
* Save the current read position of the least-recently-active tailer, stop
* it, and remove it from the active set so a new file can take its slot.
*/
private evictLRU(): void {
let lruPath: string | undefined;
let lruTime = Infinity;
for (const filePath of this.children.keys()) {
const info = this.fileInfo.get(filePath);
const lastActivity = info?.lastActivity ?? 0;
if (lastActivity < lruTime) {
lruTime = lastActivity;
lruPath = filePath;
}
}
if (!lruPath) return;
const tailer = this.children.get(lruPath);
if (tailer) {
const info = this.fileInfo.get(lruPath);
if (info) info.position = tailer.currentPosition;
tailer.stop();
this.children.delete(lruPath);
}
}
/**
* Iterate inactive files and re-activate any whose mtime has advanced since
* we last observed it. Also evicts under memory pressure.
*/
private pollInactiveFiles(): void {
if (this.stopped) return;
// Opportunistic eviction when RSS is high.
if (process.memoryUsage().rss > this.maxRssBytes && this.children.size > 0) {
this.evictLRU();
}
for (const [filePath, info] of this.fileInfo) {
if (this.children.has(filePath)) continue;
try {
const stat = fs.statSync(filePath);
if (stat.mtimeMs > info.mtime) {
info.mtime = stat.mtimeMs;
if (this.children.size >= this.maxActiveFiles) {
this.evictLRU();
}
if (this.children.size < this.maxActiveFiles) {
this.activateFile(filePath);
}
}
} catch {
// File was deleted — remove from tracking.
this.fileInfo.delete(filePath);
}
}
}
}

View file

@ -25,6 +25,14 @@ export interface TailerOptions {
/** Shared deduplicator for cross-source dedup (JSONL + OTLP). */
deduplicator?: EventDeduplicator;
/**
* Start reading from this byte offset instead of end-of-file.
* When set, any content between startPosition and current EOF is read
* immediately on start (catch-up read), then the tailer follows new writes.
* Use 0 to read the whole file from the beginning.
*/
startPosition?: number;
}
export interface TailerEvents {
@ -40,6 +48,7 @@ export class LogTailer extends EventEmitter {
private follow: boolean;
private lines: number;
private deduplicator?: EventDeduplicator;
private startPositionOpt?: number;
private watcher?: fs.FSWatcher;
private position: number = 0;
private buffer: string = '';
@ -52,6 +61,12 @@ export class LogTailer extends EventEmitter {
this.follow = options.follow ?? true;
this.lines = options.lines ?? 0;
this.deduplicator = options.deduplicator;
this.startPositionOpt = options.startPosition;
}
/** Current read position in bytes (useful for checkpointing before eviction). */
get currentPosition(): number {
return this.position;
}
/**
@ -77,6 +92,8 @@ export class LogTailer extends EventEmitter {
// Read existing content if requested
if (this.lines > 0) {
this.readExistingLines();
} else if (this.startPositionOpt !== undefined) {
this.position = this.startPositionOpt;
} else {
// Start from end of file
const stats = fs.statSync(this.filePath);
@ -87,6 +104,13 @@ export class LogTailer extends EventEmitter {
if (this.follow) {
this.watch();
}
// When given an explicit start position, do an immediate catch-up read to
// consume any bytes written between the checkpoint and now, then rely on
// the watcher for future writes.
if (this.startPositionOpt !== undefined) {
this.readNewContent();
}
}
/**