feat(bd-2bt): add POST /api/events endpoint to ingest NEEDLE telemetry

Add HTTP POST endpoint to receive NEEDLE telemetry events from the
fabric.sh forwarder. This bridges NEEDLE and FABRIC, enabling real-time
event ingestion via HTTP.

Changes:
- Add parseEventObject() to parser.ts for parsing JSON objects directly
- Add POST /api/events endpoint with JSON body parser (64KB limit)
- Validate required fields (ts, event) before processing
- Store events and broadcast to WebSocket clients in real-time
- Return 201 Created on success, 400 for invalid payloads

Acceptance criteria met:
- NEEDLE events sent via curl POST arrive in FABRIC's event store
- Events are broadcast to WebSocket clients in real-time
- Invalid payloads return appropriate error codes (400)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
default 2026-03-09 15:06:55 +00:00
parent 3d6f19ec43
commit ee90eb05a3
2 changed files with 107 additions and 0 deletions

View file

@ -219,6 +219,64 @@ function inferLogLevel(eventName: string): LogLevel {
return 'info';
}
/**
* Parse a JSON object directly into a LogEvent
*
* Used for HTTP-ingested events that are already parsed as JSON objects.
*
* @param obj - Parsed JSON object
* @returns Parsed LogEvent or null if invalid
*/
export function parseEventObject(obj: unknown): LogEvent | null {
if (typeof obj !== 'object' || obj === null) {
return null;
}
// Check for NEEDLE format
if (isNeedleFormat(obj)) {
return parseNeedleFormat(obj);
}
// Try as legacy format - validate required fields
const parsed = obj as Record<string, unknown>;
if (typeof parsed.ts !== 'number') {
return null;
}
if (typeof parsed.worker !== 'string') {
return null;
}
if (!isValidLogLevel(parsed.level)) {
return null;
}
if (typeof parsed.msg !== 'string') {
return null;
}
// Construct LogEvent with validated fields
const event: LogEvent = {
ts: parsed.ts,
worker: parsed.worker,
level: parsed.level,
msg: parsed.msg,
};
// Copy optional fields if present
if (typeof parsed.tool === 'string') event.tool = parsed.tool;
if (typeof parsed.path === 'string') event.path = parsed.path;
if (typeof parsed.bead === 'string') event.bead = parsed.bead;
if (typeof parsed.duration_ms === 'number') event.duration_ms = parsed.duration_ms;
if (typeof parsed.error === 'string') event.error = parsed.error;
// Copy any additional fields
for (const key of Object.keys(parsed)) {
if (!isStandardField(key) && !(key in event)) {
event[key] = parsed[key];
}
}
return event;
}
/**
* Parse multiple log lines
*

View file

@ -13,6 +13,10 @@ import { WebSocketServer, WebSocket } from 'ws';
import { LogEvent, EventFilter, CrossReferenceEntityType, CrossReferenceRelationship, DagOptions, BeadStatus } from '../types.js';
import { InMemoryEventStore } from '../store.js';
import { refreshDependencyGraph, getDagStats } from '../tui/dagUtils.js';
import { parseEventObject } from '../parser.js';
/** Maximum payload size for POST requests (64KB) */
const MAX_PAYLOAD_SIZE = 64 * 1024;
const __dirname = dirname(fileURLToPath(import.meta.url));
@ -50,6 +54,9 @@ export function createWebServer(options: WebServerOptions): WebServer {
httpServer = createServer(app);
wsServer = new WebSocketServer({ server: httpServer });
// JSON body parser for POST requests
app.use(express.json({ limit: MAX_PAYLOAD_SIZE }));
// WebSocket connection handling
wsServer.on('connection', (ws: WebSocket) => {
clients.add(ws);
@ -101,6 +108,48 @@ export function createWebServer(options: WebServerOptions): WebServer {
res.json(events);
});
// POST endpoint to ingest NEEDLE telemetry events
app.post('/api/events', (req: Request, res: Response) => {
try {
const eventObj = req.body;
// Validate request body exists
if (!eventObj || typeof eventObj !== 'object') {
res.status(400).json({ error: 'Invalid request body', message: 'Expected JSON object' });
return;
}
// Validate required fields for NEEDLE format
if (!eventObj.ts) {
res.status(400).json({ error: 'Missing required field', message: 'Field "ts" is required' });
return;
}
if (!eventObj.event) {
res.status(400).json({ error: 'Missing required field', message: 'Field "event" is required' });
return;
}
// Parse the event object
const logEvent = parseEventObject(eventObj);
if (!logEvent) {
res.status(400).json({ error: 'Invalid event format', message: 'Failed to parse event object' });
return;
}
// Store the event
store.add(logEvent);
// Broadcast to all connected WebSocket clients
broadcast(logEvent);
// Return success
res.status(201).json({ success: true, event: logEvent });
} catch (err) {
console.error('Error processing POST /api/events:', err);
res.status(500).json({ error: 'Internal server error', message: err instanceof Error ? err.message : 'Unknown error' });
}
});
// Get worker details
app.get('/api/workers/:id', (req: Request, res: Response) => {
const workerId = Array.isArray(req.params.id) ? req.params.id[0] : req.params.id;