diff --git a/src/directoryTailer.test.ts b/src/directoryTailer.test.ts
index 6c4ba24..569ea6a 100644
--- a/src/directoryTailer.test.ts
+++ b/src/directoryTailer.test.ts
@@ -187,4 +187,124 @@ describe('DirectoryTailer', () => {
     const err = await errorPromise;
     expect(err.message).toContain('Directory not found');
   });
+
+  it('caps active-file count and inotify watches with many files', async () => {
+    const COUNT = 10_000;
+    const MAX_ACTIVE = 100;
+
+    // Create COUNT empty *.jsonl files synchronously.
+    for (let i = 0; i < COUNT; i++) {
+      fs.writeFileSync(path.join(tempDir, `worker-${String(i).padStart(5, '0')}.jsonl`), '');
+    }
+
+    const tailer = new DirectoryTailer({
+      directory: tempDir,
+      maxActiveFiles: MAX_ACTIVE,
+      recentMtimeMs: 86_400_000,
+    });
+
+    tailer.start();
+    await new Promise((r) => setTimeout(r, 1500));
+
+    // Active set must be bounded.
+    expect(tailer.activeFiles.length).toBeLessThanOrEqual(MAX_ACTIVE);
+    // All files must be tracked in fileInfo.
+    expect(tailer.knownFileCount).toBe(COUNT);
+
+    // On Linux, Node.js multiplexes every fs.watch() watcher over one shared
+    // inotify fd, so the open-fd count cannot measure watches directly. It is
+    // still a cheap leak check: whatever fds the tailers hold must scale with
+    // MAX_ACTIVE (plus the dir watcher), never with COUNT.
+    if (fs.existsSync('/proc/self/fd')) {
+      const fdCount = fs.readdirSync('/proc/self/fd').length;
+      // Generous ceiling: baseline fds (~20) + MAX_ACTIVE + dir watcher + some slack.
+      expect(fdCount).toBeLessThan(MAX_ACTIVE + 60);
+    }
+
+    tailer.stop();
+  }, 60_000);
+
+  it('evicts LRU and re-activates a file on mtime change', async () => {
+    // maxActiveFiles=2 with 3 files forces an eviction.
+    const fileA = path.join(tempDir, 'a.jsonl');
+    const fileB = path.join(tempDir, 'b.jsonl');
+    const fileC = path.join(tempDir, 'c.jsonl');
+
+    fs.writeFileSync(fileA, '');
+    fs.writeFileSync(fileB, '');
+    fs.writeFileSync(fileC, '');
+
+    const tailer = new DirectoryTailer({
+      directory: tempDir,
+      maxActiveFiles: 2,
+      recentMtimeMs: 86_400_000,
+      inactiveCheckIntervalMs: 200, // fast poll for test
+    });
+
+    const received: string[] = [];
+    tailer.on('event', (event) => {
+      received.push(event.msg);
+    });
+
+    tailer.start();
+    await new Promise((r) => setTimeout(r, 300));
+
+    // Only 2 of 3 files should be active.
+    expect(tailer.activeFiles.length).toBe(2);
+
+    // Find the evicted file and write content to it.
+    const evicted = [fileA, fileB, fileC].find(
+      (f) => !tailer.activeFiles.includes(f),
+    )!;
+    expect(evicted).toBeDefined();
+
+    // Append an event to the evicted file — the poll should re-activate it.
+    fs.appendFileSync(evicted, makeEvent('w-evicted', 'reactivated-event', 1) + '\n');
+
+    // Wait for the poll interval to fire and re-activate.
+    await new Promise((r) => setTimeout(r, 800));
+
+    tailer.stop();
+
+    expect(received).toContain('reactivated-event');
+  });
+
+  it('resumes from saved position when a file is re-activated after eviction', async () => {
+    const fileA = path.join(tempDir, 'a.jsonl');
+    const fileB = path.join(tempDir, 'b.jsonl');
+
+    fs.writeFileSync(fileA, makeEvent('w-a', 'before-eviction', 1) + '\n');
+    fs.writeFileSync(fileB, '');
+
+    // maxActiveFiles=1 so opening fileB will evict fileA.
+    const tailer = new DirectoryTailer({
+      directory: tempDir,
+      maxActiveFiles: 1,
+      recentMtimeMs: 86_400_000,
+      inactiveCheckIntervalMs: 200,
+    });
+
+    const received: string[] = [];
+    tailer.on('event', (event) => received.push(event.msg));
+
+    tailer.start();
+    await new Promise((r) => setTimeout(r, 400));
+
+    // Exactly one file is active; the other is inactive.
+    expect(tailer.activeFiles.length).toBe(1);
+
+    // Write to the inactive file to trigger re-activation.
+    const inactive = tailer.activeFiles[0] === fileA ? fileB : fileA;
+    fs.appendFileSync(inactive, makeEvent('w-inactive', 'after-reactivation', 2) + '\n');
+
+    await new Promise((r) => setTimeout(r, 800));
+    tailer.stop();
+
+    // The event written after re-activation must have been received.
+    expect(received).toContain('after-reactivation');
+    // The event written before eviction (to fileA at start time) should NOT
+    // have been re-emitted when fileA was re-activated (position is checkpointed).
+    const beforeCount = received.filter((m) => m === 'before-eviction').length;
+    expect(beforeCount).toBe(0);
+  });
 });
diff --git a/src/directoryTailer.ts b/src/directoryTailer.ts
index 6764e28..1390937 100644
--- a/src/directoryTailer.ts
+++ b/src/directoryTailer.ts
@@ -1,8 +1,16 @@
 /**
  * FABRIC Directory Tailer
  *
- * Watches a directory for *.jsonl files, spawning a LogTailer per file.
- * Hot-adds new *.jsonl files via fs.watch rename events.
+ * Watches a directory for *.jsonl files and tails a bounded active set of
+ * them (LRU, capped at maxActiveFiles). Inactive files are tracked but not
+ * watched; when their mtime changes they are re-opened from the last saved
+ * byte position. This keeps file-descriptor and heap usage O(maxActiveFiles)
+ * regardless of how many total *.jsonl files exist in the directory.
+ *
+ * Memory / fd ceiling (documented contract):
+ *   - Open inotify watches ≤ maxActiveFiles + 1 (1 for the dir watcher)
+ *   - Heap per active tailer ≈ 1–5 KB (FSWatcher + EventEmitter + buffers)
+ *   - fileInfo Map entries = total discovered *.jsonl count (~200 B each)
  */
 
 import * as fs from 'fs';
@@ -17,19 +25,69 @@ export interface DirectoryTailerOptions {
 
   /** Shared deduplicator for cross-source dedup. */
   deduplicator?: EventDeduplicator;
+
+  /**
+   * Maximum number of concurrently open file watchers.
+   * LRU eviction kicks in when the active set would exceed this.
+   * Default: 200.
+   */
+  maxActiveFiles?: number;
+
+  /**
+   * At startup, only activate files whose mtime is within this many
+   * milliseconds of now. Older files are registered in fileInfo but left
+   * inactive until their mtime changes. Default: 86_400_000 (24 h).
+   */
+  recentMtimeMs?: number;
+
+  /**
+   * How often (ms) the inactive-file poll runs to detect mtime changes.
+   * Default: 30_000 (30 s).
+   */
+  inactiveCheckIntervalMs?: number;
+
+  /**
+   * Process RSS bytes threshold above which new activations are skipped
+   * (back-pressure). The LRU eviction path still runs so that the most
+   * recently used file can displace a stale one. Default: 400 MB.
+   */
+  maxRssBytes?: number;
+}
+
+interface FileInfo {
+  /** Last observed mtime (ms since epoch). */
+  mtime: number;
+  /** Byte offset of the last confirmed read — used to resume after eviction. */
+  position: number;
+  /** Wall-clock ms of the last 'event' emitted from this file. */
+  lastActivity: number;
 }
 
 export class DirectoryTailer extends EventEmitter {
   private directory: string;
   private deduplicator?: EventDeduplicator;
+  private maxActiveFiles: number;
+  private recentMtimeMs: number;
+  private inactiveCheckIntervalMs: number;
+  private maxRssBytes: number;
+
+  /** Metadata for every discovered *.jsonl file (active and inactive). */
+  private fileInfo: Map<string, FileInfo> = new Map();
+  /** Currently open tailers (subset of fileInfo, size ≤ maxActiveFiles). */
   private children: Map<string, LogTailer> = new Map();
+
   private dirWatcher?: fs.FSWatcher;
+  private pollInterval?: ReturnType<typeof setInterval>;
   private stopped: boolean = false;
 
   constructor(options: DirectoryTailerOptions) {
     super();
     this.directory = options.directory;
     this.deduplicator = options.deduplicator;
+    this.maxActiveFiles = options.maxActiveFiles ?? 200;
+    this.recentMtimeMs = options.recentMtimeMs ?? 86_400_000;
+    this.inactiveCheckIntervalMs = options.inactiveCheckIntervalMs ?? 30_000;
+    this.maxRssBytes = options.maxRssBytes ?? 400 * 1024 * 1024;
   }
 
   start(): void {
@@ -38,37 +96,79 @@ export class DirectoryTailer extends EventEmitter {
       return;
     }
 
-    // Spawn tailers for existing *.jsonl files
-    const entries = fs.readdirSync(this.directory);
-    for (const entry of entries) {
-      if (entry.endsWith('.jsonl')) {
-        this.spawnTailer(path.join(this.directory, entry));
+    const now = Date.now();
+    const candidates: Array<{ fullPath: string; mtime: number; size: number }> = [];
+
+    for (const entry of fs.readdirSync(this.directory)) {
+      if (!entry.endsWith('.jsonl')) continue;
+      const fullPath = path.join(this.directory, entry);
+      try {
+        const stat = fs.statSync(fullPath);
+        // Register all files; position = stat.size so initial activation
+        // starts from the current end (don't re-emit historical events).
+        this.fileInfo.set(fullPath, {
+          mtime: stat.mtimeMs,
+          position: stat.size,
+          lastActivity: 0,
+        });
+        if (now - stat.mtimeMs <= this.recentMtimeMs) {
+          candidates.push({ fullPath, mtime: stat.mtimeMs, size: stat.size });
+        }
+      } catch {
+        // Skip files we cannot stat (race with deletion, permissions, etc.)
       }
     }
 
-    // Watch for new files
+    // Activate the most recently modified files first, up to the cap.
+    candidates.sort((a, b) => b.mtime - a.mtime);
+    for (let i = 0; i < Math.min(candidates.length, this.maxActiveFiles); i++) {
+      this.activateFile(candidates[i].fullPath);
+    }
+
+    // Watch for new files appearing in the directory.
     this.dirWatcher = fs.watch(this.directory, (eventType, filename) => {
       if (this.stopped) return;
       if (!filename || !filename.endsWith('.jsonl')) return;
-      if (eventType === 'rename') {
-        const fullPath = path.join(this.directory, filename);
-        // Small delay — the file may not be fully created yet
-        setTimeout(() => {
-          if (this.stopped) return;
-          if (fs.existsSync(fullPath) && !this.children.has(fullPath)) {
-            this.spawnTailer(fullPath);
+      if (eventType !== 'rename') return;
+
+      const fullPath = path.join(this.directory, filename);
+      // Small delay — the file may not be fully created yet.
+      setTimeout(() => {
+        if (this.stopped) return;
+        if (!fs.existsSync(fullPath)) return;
+        if (this.children.has(fullPath)) return;
+        try {
+          const stat = fs.statSync(fullPath);
+          if (!this.fileInfo.has(fullPath)) {
+            // Brand-new file: start reading from byte 0 so we capture all content.
+            this.fileInfo.set(fullPath, {
+              mtime: stat.mtimeMs,
+              position: 0,
+              lastActivity: 0,
+            });
           }
-        }, 50);
-      }
+        } catch {
+          return;
+        }
+        this.activateWithEviction(fullPath);
+      }, 50);
     });
 
-    this.dirWatcher.on('error', (err) => {
-      this.emit('error', err);
-    });
+    this.dirWatcher.on('error', (err) => this.emit('error', err));
+
+    // Periodically check inactive files for mtime changes.
+    this.pollInterval = setInterval(
+      () => this.pollInactiveFiles(),
+      this.inactiveCheckIntervalMs,
+    );
   }
 
   stop(): void {
     this.stopped = true;
+    if (this.pollInterval) {
+      clearInterval(this.pollInterval);
+      this.pollInterval = undefined;
+    }
     if (this.dirWatcher) {
       this.dirWatcher.close();
       this.dirWatcher = undefined;
@@ -84,26 +184,134 @@ export class DirectoryTailer extends EventEmitter {
     return !this.stopped;
   }
 
+  /** Paths of files with an open watcher right now. */
   get activeFiles(): string[] {
     return [...this.children.keys()];
   }
 
-  private spawnTailer(filePath: string): void {
+  /** Total number of *.jsonl files discovered (active + inactive). */
+  get knownFileCount(): number {
+    return this.fileInfo.size;
+  }
+
+  // ---------------------------------------------------------------------------
+  // Internal helpers
+  // ---------------------------------------------------------------------------
+
+  private activateFile(filePath: string): void {
     if (this.children.has(filePath)) return;
 
+    const info = this.fileInfo.get(filePath);
+
     const tailer = new LogTailer({
       path: filePath,
       follow: true,
       lines: 0,
       deduplicator: this.deduplicator,
+      // info.position drives the start offset:
+      //   • stat.size → initial activation, starts at EOF (no history replayed)
+      //   • 0         → new hot-added file, reads all existing content
+      //   • savedPos  → re-activation after eviction, resumes from checkpoint
+      startPosition: info?.position,
     });
 
-    // Forward child events
     tailer.on('line', (line: string) => this.emit('line', line, filePath));
-    tailer.on('event', (event) => this.emit('event', event, filePath));
+    tailer.on('event', (event) => {
+      const fi = this.fileInfo.get(filePath);
+      if (fi) fi.lastActivity = Date.now();
+      this.emit('event', event, filePath);
+    });
     tailer.on('error', (err: Error) => this.emit('error', err));
 
     this.children.set(filePath, tailer);
     tailer.start();
   }
+
+  /**
+   * Activate a file, evicting the least-recently-active tailer first if the
+   * active set is at capacity. Also applies RSS back-pressure.
+   */
+  private activateWithEviction(filePath: string): void {
+    if (this.children.has(filePath)) return;
+
+    if (this.children.size >= this.maxActiveFiles) {
+      // At capacity: swap out the LRU tailer. This never grows the active set,
+      // so it is allowed regardless of memory pressure.
+      this.evictLRU();
+      // Nothing could be evicted (e.g. maxActiveFiles <= 0): stay within the cap.
+      if (this.children.size >= this.maxActiveFiles) return;
+    } else if (process.memoryUsage().rss > this.maxRssBytes) {
+      // Memory pressure but still room in active set — skip activation.
+      return;
+    }
+
+    this.activateFile(filePath);
+  }
+
+  /**
+   * Save the current read position of the least-recently-active tailer, stop
+   * it, and remove it from the active set so a new file can take its slot.
+   */
+  private evictLRU(): void {
+    let lruPath: string | undefined;
+    let lruTime = Infinity;
+
+    for (const filePath of this.children.keys()) {
+      const info = this.fileInfo.get(filePath);
+      const lastActivity = info?.lastActivity ?? 0;
+      if (lastActivity < lruTime) {
+        lruTime = lastActivity;
+        lruPath = filePath;
+      }
+    }
+
+    if (!lruPath) return;
+
+    const tailer = this.children.get(lruPath);
+    if (tailer) {
+      const info = this.fileInfo.get(lruPath);
+      if (info) info.position = tailer.currentPosition;
+      tailer.stop();
+      this.children.delete(lruPath);
+    }
+  }
+
+  /**
+   * Iterate inactive files and re-activate any whose mtime has advanced since
+   * we last observed it. Also evicts under memory pressure.
+   */
+  private pollInactiveFiles(): void {
+    if (this.stopped) return;
+
+    // Opportunistic eviction when RSS is high.
+    if (process.memoryUsage().rss > this.maxRssBytes && this.children.size > 0) {
+      this.evictLRU();
+    }
+
+    for (const [filePath, info] of this.fileInfo) {
+      if (this.children.has(filePath)) continue;
+
+      try {
+        const stat = fs.statSync(filePath);
+        if (stat.mtimeMs <= info.mtime) continue;
+        info.mtime = stat.mtimeMs;
+        // A file evicted after being written to still carries its pre-activation
+        // mtime; if everything up to EOF was already consumed there is nothing
+        // to resume, so don't burn an active slot (and an eviction) on it.
+        if (stat.size === info.position) continue;
+        if (this.children.size >= this.maxActiveFiles) {
+          this.evictLRU();
+        }
+        if (this.children.size < this.maxActiveFiles) {
+          this.activateFile(filePath);
+        }
+      } catch (err: unknown) {
+        // Only a missing file means "deleted"; keep tracking on transient or
+        // permission errors so the file can be picked up again later.
+        if ((err as NodeJS.ErrnoException | undefined)?.code === 'ENOENT') {
+          this.fileInfo.delete(filePath);
+        }
+      }
+    }
+  }
 }
diff --git a/src/tailer.ts b/src/tailer.ts
index f2a2176..764c5b3 100644
--- a/src/tailer.ts
+++ b/src/tailer.ts
@@ -25,6 +25,14 @@ export interface TailerOptions {
 
   /** Shared deduplicator for cross-source dedup (JSONL + OTLP). */
   deduplicator?: EventDeduplicator;
+
+  /**
+   * Start reading from this byte offset instead of end-of-file.
+   * When set, any content between startPosition and current EOF is read
+   * immediately on start (catch-up read), then the tailer follows new writes.
+   * Use 0 to read the whole file from the beginning.
+   */
+  startPosition?: number;
 }
 
 export interface TailerEvents {
@@ -40,6 +48,7 @@ export class LogTailer extends EventEmitter {
   private follow: boolean;
   private lines: number;
   private deduplicator?: EventDeduplicator;
+  private startPositionOpt?: number;
   private watcher?: fs.FSWatcher;
   private position: number = 0;
   private buffer: string = '';
@@ -52,6 +61,12 @@ export class LogTailer extends EventEmitter {
     this.follow = options.follow ?? true;
     this.lines = options.lines ?? 0;
     this.deduplicator = options.deduplicator;
+    this.startPositionOpt = options.startPosition;
+  }
+
+  /** Current read position in bytes (useful for checkpointing before eviction). */
+  get currentPosition(): number {
+    return this.position;
   }
 
   /**
@@ -77,6 +92,8 @@ export class LogTailer extends EventEmitter {
     // Read existing content if requested
     if (this.lines > 0) {
       this.readExistingLines();
+    } else if (this.startPositionOpt !== undefined) {
+      this.position = this.startPositionOpt;
     } else {
       // Start from end of file
       const stats = fs.statSync(this.filePath);
@@ -87,6 +104,13 @@ export class LogTailer extends EventEmitter {
     if (this.follow) {
       this.watch();
     }
+
+    // When given an explicit start position, do an immediate catch-up read to
+    // consume any bytes written between the checkpoint and now, then rely on
+    // the watcher for future writes.
+    if (this.startPositionOpt !== undefined) {
+      this.readNewContent();
+    }
   }
 
   /**