feat(bd-3ip): Add POST /api/events/batch endpoint for batched NEEDLE telemetry
- Add MAX_BATCH_SIZE constant (100 events limit) - Implement POST /api/events/batch endpoint that accepts JSON array of events - Validate array format, empty batches, and batch size limits - Validate each event has required fields (ts, event) - Store all valid events via store.add() - Broadcast all ingested events via WebSocket - Return 201 with ingested count, total count, and errors array - Handle partial success (valid events processed, errors reported) 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude Worker <noreply@anthropic.com>
This commit is contained in:
parent
ee90eb05a3
commit
b21df31ea4
1 changed files with 80 additions and 0 deletions
|
|
@ -18,6 +18,9 @@ import { parseEventObject } from '../parser.js';
|
|||
/** Maximum payload size for POST requests (64KB) */
|
||||
const MAX_PAYLOAD_SIZE = 64 * 1024;
|
||||
|
||||
/** Maximum number of events in a batch request */
|
||||
const MAX_BATCH_SIZE = 100;
|
||||
|
||||
const __dirname = dirname(fileURLToPath(import.meta.url));
|
||||
|
||||
export interface WebServerOptions {
|
||||
|
|
@ -150,6 +153,83 @@ export function createWebServer(options: WebServerOptions): WebServer {
|
|||
}
|
||||
});
|
||||
|
||||
// POST endpoint to ingest batched NEEDLE telemetry events
|
||||
app.post('/api/events/batch', (req: Request, res: Response) => {
|
||||
try {
|
||||
const eventsArray = req.body;
|
||||
|
||||
// Validate request body is an array
|
||||
if (!Array.isArray(eventsArray)) {
|
||||
res.status(400).json({ error: 'Invalid request body', message: 'Expected JSON array of events' });
|
||||
return;
|
||||
}
|
||||
|
||||
// Check batch size limit
|
||||
if (eventsArray.length === 0) {
|
||||
res.status(400).json({ error: 'Empty batch', message: 'Batch must contain at least one event' });
|
||||
return;
|
||||
}
|
||||
|
||||
if (eventsArray.length > MAX_BATCH_SIZE) {
|
||||
res.status(400).json({
|
||||
error: 'Batch too large',
|
||||
message: `Batch exceeds maximum size of ${MAX_BATCH_SIZE} events (received ${eventsArray.length})`
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const ingestedEvents: LogEvent[] = [];
|
||||
const errors: { index: number; error: string }[] = [];
|
||||
|
||||
// Process each event
|
||||
for (let i = 0; i < eventsArray.length; i++) {
|
||||
const eventObj = eventsArray[i];
|
||||
|
||||
// Validate each event has required fields
|
||||
if (!eventObj || typeof eventObj !== 'object') {
|
||||
errors.push({ index: i, error: 'Invalid event object' });
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!eventObj.ts) {
|
||||
errors.push({ index: i, error: 'Missing required field "ts"' });
|
||||
continue;
|
||||
}
|
||||
if (!eventObj.event) {
|
||||
errors.push({ index: i, error: 'Missing required field "event"' });
|
||||
continue;
|
||||
}
|
||||
|
||||
// Parse the event object
|
||||
const logEvent = parseEventObject(eventObj);
|
||||
if (!logEvent) {
|
||||
errors.push({ index: i, error: 'Failed to parse event object' });
|
||||
continue;
|
||||
}
|
||||
|
||||
// Store the event
|
||||
store.add(logEvent);
|
||||
ingestedEvents.push(logEvent);
|
||||
}
|
||||
|
||||
// Broadcast all ingested events to WebSocket clients
|
||||
for (const event of ingestedEvents) {
|
||||
broadcast(event);
|
||||
}
|
||||
|
||||
// Return success with count
|
||||
res.status(201).json({
|
||||
success: true,
|
||||
ingested: ingestedEvents.length,
|
||||
total: eventsArray.length,
|
||||
errors: errors.length > 0 ? errors : undefined
|
||||
});
|
||||
} catch (err) {
|
||||
console.error('Error processing POST /api/events/batch:', err);
|
||||
res.status(500).json({ error: 'Internal server error', message: err instanceof Error ? err.message : 'Unknown error' });
|
||||
}
|
||||
});
|
||||
|
||||
// Get worker details
|
||||
app.get('/api/workers/:id', (req: Request, res: Response) => {
|
||||
const workerId = Array.isArray(req.params.id) ? req.params.id[0] : req.params.id;
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue