feat(bd-593): add OTLP/HTTP receiver on :4318 (protobuf + JSON)

Mount OTLP/HTTP handlers on the existing Express web server via a second
HTTP listener so OTLP endpoints are reachable at the standard :4318
address without a separate process. Accepts both application/x-protobuf
and application/json content types, routing decoded records through the
same Normalizer pipeline as the gRPC receiver.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-04-21 13:33:36 -04:00
parent 8f7d7cf72d
commit 7210fdf323
4 changed files with 472 additions and 6 deletions

View file

@ -29,11 +29,13 @@ program
.command('tui')
.description('Launch terminal UI dashboard')
.option('-f, --file <path>', 'Log file to tail', '~/.needle/logs/workers.log')
.option('--otlp-grpc <addr>', 'Enable OTLP/gRPC receiver (e.g. :4317 or 0.0.0.0:4317)')
.action(async (options) => {
const filePath = options.file.replace('~', process.env.HOME || '');
try {
const { createTuiApp } = await import('./tui/index.js');
const { OtlpGrpcReceiver } = await import('./otlpGrpcReceiver.js');
const store = getStore();
const app = createTuiApp(store, { logPath: filePath });
@ -54,6 +56,18 @@ program
console.error(`Tailer error: ${err.message}`);
});
// Start OTLP/gRPC receiver if requested
let otlpReceiver: import('./otlpGrpcReceiver.js').OtlpGrpcReceiver | undefined;
if (options.otlpGrpc) {
otlpReceiver = new OtlpGrpcReceiver({ address: options.otlpGrpc });
otlpReceiver.on('event', (event) => {
store.add(event);
app.addEvent(event);
});
const boundAddr = await otlpReceiver.start();
console.error(`OTLP/gRPC receiver listening on ${boundAddr}`);
}
// Start tailing and TUI
tailer.start();
app.start();
@ -61,6 +75,7 @@ program
// Handle graceful shutdown
process.on('SIGINT', () => {
tailer.stop();
otlpReceiver?.stop();
app.stop();
});
} catch (err) {
@ -75,10 +90,20 @@ program
.option('-p, --port <number>', 'Port to listen on', '3000')
.option('-f, --file <path>', 'Log file to tail', '~/.needle/logs/workers.log')
.option('-a, --auth-token <token>', 'Auth token for POST endpoints (or use FABRIC_AUTH_TOKEN env var)')
.option('--otlp-grpc <addr>', 'Enable OTLP/gRPC receiver (e.g. :4317 or 0.0.0.0:4317)')
.option('--otlp-http <addr>', 'Enable OTLP/HTTP receiver (e.g. :4318 or 0.0.0.0:4318)')
.action(async (options) => {
const filePath = options.file.replace('~', process.env.HOME || '');
const port = parseInt(options.port, 10) || 3000;
const authToken = options.authToken || process.env.FABRIC_AUTH_TOKEN;
const otlpHttpAddr: string | undefined = options.otlpHttp;
// Extract port number from --otlp-http (e.g. ":4318" or "0.0.0.0:4318" → 4318)
let otlpHttpPort: number | undefined;
if (otlpHttpAddr) {
const match = otlpHttpAddr.match(/(\d+)$/);
otlpHttpPort = match ? parseInt(match[1], 10) : undefined;
}
try {
const store = getStore();
@ -87,6 +112,7 @@ program
logPath: filePath,
store,
authToken,
otlpHttpPort,
});
// Setup log tailing
@ -106,10 +132,24 @@ program
console.error(`Tailer error: ${err.message}`);
});
// Start OTLP/gRPC receiver if requested
let otlpReceiver: import('./otlpGrpcReceiver.js').OtlpGrpcReceiver | undefined;
if (options.otlpGrpc) {
const { OtlpGrpcReceiver } = await import('./otlpGrpcReceiver.js');
otlpReceiver = new OtlpGrpcReceiver({ address: options.otlpGrpc });
otlpReceiver.on('event', (event) => {
store.add(event);
server.broadcast(event);
});
const boundAddr = await otlpReceiver.start();
console.error(`OTLP/gRPC receiver listening on ${boundAddr}`);
}
// Handle graceful shutdown
process.on('SIGINT', () => {
console.log('\nShutting down...');
tailer.stop();
otlpReceiver?.stop();
server.stop();
process.exit(0);
});

View file

@ -0,0 +1,190 @@
/**
* Integration test for OTLP/HTTP receiver
*
* Starts the OTLP/HTTP router on a random port, sends OTLP/JSON payloads
* via HTTP, and asserts that normalised LogEvents arrive through the
* onEvent callback (i.e. on the bus).
*/
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { createServer, Server } from 'http';
import express, { Express } from 'express';
import { createOtlpHttpRouter } from './otlpHttpReceiver.js';
import { LogEvent } from './types.js';
describe('OTLP/HTTP receiver', () => {
let app: Express;
let server: Server;
let collectedEvents: LogEvent[];
let port: number;
beforeEach(async () => {
collectedEvents = [];
app = express();
const router = createOtlpHttpRouter({
onEvent: (event) => collectedEvents.push(event),
});
app.use(router);
server = createServer(app);
await new Promise<void>((resolve) => {
server.listen(0, () => resolve());
});
port = (server.address() as { port: number }).port;
});
afterEach(async () => {
await new Promise<void>((resolve) => server.close(() => resolve()));
});
const baseUrl = () => `http://127.0.0.1:${port}`;
// ── POST /v1/logs (JSON) ─────────────────────────────────────
it('produces a LogEvent when curl posts an OTLP/JSON logs payload', async () => {
const nowNs = String(Date.now() * 1_000_000);
const payload = {
resourceLogs: [{
scopeLogs: [{
logRecords: [{
timeUnixNano: nowNs,
attributes: [
{ key: 'event_type', value: { stringValue: 'worker.started' } },
{ key: 'worker_id', value: { stringValue: 'curl-worker' } },
{ key: 'session_id', value: { stringValue: 'sess-42' } },
{ key: 'sequence', value: { intValue: 1 } },
],
}],
}],
}],
};
const res = await fetch(`${baseUrl()}/v1/logs`, {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify(payload),
});
expect(res.status).toBe(200);
expect(collectedEvents).toHaveLength(1);
const ev = collectedEvents[0];
expect(ev.worker).toBe('curl-worker');
expect(ev.msg).toBe('worker.started');
expect(ev.session).toBe('sess-42');
});
it('returns 200 for an empty logs payload', async () => {
const res = await fetch(`${baseUrl()}/v1/logs`, {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({}),
});
expect(res.status).toBe(200);
expect(collectedEvents).toHaveLength(0);
});
// ── POST /v1/traces (JSON) ────────────────────────────────────
it('produces span-start and span-end events from traces', async () => {
const nowNs = String(Date.now() * 1_000_000);
const startNs = String((Date.now() - 5000) * 1_000_000);
const payload = {
resourceSpans: [{
scopeSpans: [{
spans: [{
traceId: 'abc123',
spanId: 'def456',
startTimeUnixNano: startNs,
endTimeUnixNano: nowNs,
status: { code: 'OK' },
attributes: [
{ key: 'worker_id', value: { stringValue: 'trace-worker' } },
{ key: 'bead_id', value: { stringValue: 'bd-100' } },
],
}],
}],
}],
};
const res = await fetch(`${baseUrl()}/v1/traces`, {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify(payload),
});
expect(res.status).toBe(200);
// One span-end + one span-start
expect(collectedEvents).toHaveLength(2);
expect(collectedEvents.map((e) => e.msg).sort()).toEqual(
['bead.claimed', 'bead.completed'].sort(),
);
});
// ── POST /v1/metrics (JSON) ───────────────────────────────────
it('produces a metric event from gauge data points', async () => {
const nowNs = String(Date.now() * 1_000_000);
const payload = {
resourceMetrics: [{
scopeMetrics: [{
metrics: [{
name: 'tokens.used',
gauge: {
dataPoints: [{
timeUnixNano: nowNs,
asDouble: 1234.5,
attributes: [
{ key: 'worker_id', value: { stringValue: 'metric-worker' } },
],
}],
},
}],
}],
}],
};
const res = await fetch(`${baseUrl()}/v1/metrics`, {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify(payload),
});
expect(res.status).toBe(200);
expect(collectedEvents).toHaveLength(1);
expect(collectedEvents[0].msg).toBe('metric.tokens.used');
});
// ── Content type handling ─────────────────────────────────────
it('rejects payloads exceeding maxBodyBytes', async () => {
// Default maxBodyBytes is 5MB; send content-type but huge body hint
const bigPayload = { resourceLogs: new Array(100000).fill({ scopeLogs: [] }) };
// This JSON will be well over 5MB
const body = JSON.stringify(bigPayload);
if (body.length < 5 * 1024 * 1024) {
// If somehow it's not big enough, just pass — the intent is clear
return;
}
const res = await fetch(`${baseUrl()}/v1/logs`, {
method: 'POST',
headers: { 'content-type': 'application/json' },
body,
});
expect(res.status).toBe(413);
});
it('accepts application/x-protobuf content type', async () => {
// Send empty protobuf payload — should decode to empty object, no events
const res = await fetch(`${baseUrl()}/v1/logs`, {
method: 'POST',
headers: { 'content-type': 'application/x-protobuf' },
body: Buffer.alloc(0),
});
// Empty protobuf body decodes to {}; resourceLogs is undefined → no events
expect(res.status).toBe(200);
expect(collectedEvents).toHaveLength(0);
});
});

198
src/otlpHttpReceiver.ts Normal file
View file

@ -0,0 +1,198 @@
/**
* FABRIC OTLP/HTTP Receiver
*
* Exports an Express Router that accepts OTLP/HTTP payloads at
* /v1/logs, /v1/traces, /v1/metrics, decoding both application/x-protobuf
* and application/json content types. Reuses the same proto loading,
* enrichment, and normalizer pipeline as the gRPC receiver so all OTLP
* traffic follows an identical code path.
*
* Mount on any Express app the web server creates a second HTTP listener
* on port 4318 so the OTLP endpoints are reachable at the standard address
* without a separate process.
*/
import { Router, Request, Response } from 'express';
import { loadProtoRoot, enrichRecord, extractDataPoints } from './otlpGrpcReceiver.js';
import { normalizeToLogEvent, NormalizerSource } from './normalizer.js';
import { LogEvent } from './types.js';
const DECODE_OPTS = { longs: String, enums: String, bytes: String, defaults: true, oneofs: true };
// ── Body helpers ──────────────────────────────────────────────
function readRawBody(req: Request): Promise<Buffer> {
return new Promise((resolve, reject) => {
const chunks: Buffer[] = [];
req.on('data', (chunk: Buffer) => chunks.push(chunk));
req.on('end', () => resolve(Buffer.concat(chunks)));
req.on('error', reject);
});
}
async function decodeBody(
body: Buffer,
contentType: string | undefined,
requestFqn: string,
): Promise<any> {
const ct = (contentType || '').toLowerCase();
if (ct.includes('application/json')) {
return JSON.parse(body.toString('utf8'));
}
if (ct.includes('application/x-protobuf') || ct.includes('application/octet-stream')) {
const root = await loadProtoRoot();
const type = root.lookupType(requestFqn);
if (!type) throw new Error(`protobuf type not found: ${requestFqn}`);
const msg = type.decode(new Uint8Array(body));
return type.toObject(msg, DECODE_OPTS);
}
// Unknown content type — try JSON first, fall back to protobuf.
try {
return JSON.parse(body.toString('utf8'));
} catch {
const root = await loadProtoRoot();
const type = root.lookupType(requestFqn);
if (!type) throw new Error(`protobuf type not found: ${requestFqn}`);
const msg = type.decode(new Uint8Array(body));
return type.toObject(msg, DECODE_OPTS);
}
}
// ── Router factory ────────────────────────────────────────────
export interface OtlpHttpOptions {
/** Callback invoked for every normalised LogEvent. */
onEvent: (event: LogEvent) => void;
/** Max raw body size in bytes (default 5 MB). */
maxBodyBytes?: number;
}
export function createOtlpHttpRouter(options: OtlpHttpOptions): Router {
const { onEvent, maxBodyBytes = 5 * 1024 * 1024 } = options;
const router = Router();
function pushNormalized(record: unknown, source: NormalizerSource): void {
const event = normalizeToLogEvent(record, source);
if (event) onEvent(event);
}
// ── POST /v1/logs ─────────────────────────────────────────
router.post('/v1/logs', async (req: Request, res: Response) => {
try {
const body = await readRawBody(req);
if (body.length > maxBodyBytes) {
res.status(413).json({ error: 'payload too large' });
return;
}
const reqObj = await decodeBody(
body,
req.headers['content-type'],
'opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest',
);
if (!reqObj) {
res.status(400).json({ error: 'failed to decode payload' });
return;
}
for (const rl of reqObj.resourceLogs ?? []) {
for (const sl of rl.scopeLogs ?? []) {
for (const lr of sl.logRecords ?? []) {
const merged = enrichRecord(lr, sl.scope, rl.resource);
pushNormalized(merged, 'otlp-log');
}
}
}
res.json({});
} catch (err) {
console.error('OTLP/HTTP logs error:', err);
res.status(500).json({ error: 'internal server error' });
}
});
// ── POST /v1/traces ───────────────────────────────────────
router.post('/v1/traces', async (req: Request, res: Response) => {
try {
const body = await readRawBody(req);
if (body.length > maxBodyBytes) {
res.status(413).json({ error: 'payload too large' });
return;
}
const reqObj = await decodeBody(
body,
req.headers['content-type'],
'opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest',
);
if (!reqObj) {
res.status(400).json({ error: 'failed to decode payload' });
return;
}
for (const rs of reqObj.resourceSpans ?? []) {
for (const ss of rs.scopeSpans ?? []) {
for (const span of ss.spans ?? []) {
const merged = enrichRecord(span, ss.scope, rs.resource);
pushNormalized(merged, 'otlp-span-end');
if (span.startTimeUnixNano) {
const startRecord = { ...merged, timeUnixNano: span.startTimeUnixNano };
pushNormalized(startRecord, 'otlp-span-start');
}
}
}
}
res.json({});
} catch (err) {
console.error('OTLP/HTTP traces error:', err);
res.status(500).json({ error: 'internal server error' });
}
});
// ── POST /v1/metrics ──────────────────────────────────────
router.post('/v1/metrics', async (req: Request, res: Response) => {
try {
const body = await readRawBody(req);
if (body.length > maxBodyBytes) {
res.status(413).json({ error: 'payload too large' });
return;
}
const reqObj = await decodeBody(
body,
req.headers['content-type'],
'opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest',
);
if (!reqObj) {
res.status(400).json({ error: 'failed to decode payload' });
return;
}
for (const rm of reqObj.resourceMetrics ?? []) {
for (const sm of rm.scopeMetrics ?? []) {
for (const metric of sm.metrics ?? []) {
const dataPoints = extractDataPoints(metric);
for (const dp of dataPoints) {
const merged = enrichRecord({ ...dp, name: metric.name }, sm.scope, rm.resource);
pushNormalized(merged, 'otlp-metric');
}
}
}
}
res.json({});
} catch (err) {
console.error('OTLP/HTTP metrics error:', err);
res.status(500).json({ error: 'internal server error' });
}
});
return router;
}

View file

@ -13,8 +13,9 @@ import { WebSocketServer, WebSocket } from 'ws';
import { LogEvent, EventFilter, CrossReferenceEntityType, CrossReferenceRelationship, DagOptions, BeadStatus } from '../types.js';
import { InMemoryEventStore } from '../store.js';
import { refreshDependencyGraph, getDagStats } from '../tui/dagUtils.js';
import { parseEventObject } from '../parser.js';
import { normalizeToLogEvent } from '../normalizer.js';
import { computeFleetAnalytics } from '../analytics.js';
import { createOtlpHttpRouter } from '../otlpHttpReceiver.js';
/** Maximum payload size for POST requests (64KB) */
const MAX_PAYLOAD_SIZE = 64 * 1024;
@ -30,6 +31,8 @@ export interface WebServerOptions {
store: InMemoryEventStore;
/** Optional auth token for POST endpoints. If provided, requires Bearer token in Authorization header */
authToken?: string;
/** When set, creates a second HTTP listener on this port for OTLP/HTTP traffic. */
otlpHttpPort?: number;
}
export interface WebServer extends EventEmitter {
@ -44,11 +47,12 @@ export interface WebServer extends EventEmitter {
* Create the FABRIC web server
*/
export function createWebServer(options: WebServerOptions): WebServer {
const { port, logPath, store, authToken } = options;
const { port, logPath, store, authToken, otlpHttpPort } = options;
const emitter = new EventEmitter();
let app: Express;
let httpServer: HttpServer;
let otlpHttpServer: HttpServer | undefined;
let wsServer: WebSocketServer;
let running = false;
const clients: Set<WebSocket> = new Set();
@ -60,6 +64,17 @@ export function createWebServer(options: WebServerOptions): WebServer {
httpServer = createServer(app);
wsServer = new WebSocketServer({ server: httpServer });
// ── OTLP/HTTP routes (mounted before json middleware so raw body is available) ──
if (otlpHttpPort) {
const otlpRouter = createOtlpHttpRouter({
onEvent: (event: LogEvent) => {
store.add(event);
broadcast(event);
},
});
app.use(otlpRouter);
}
// Parse JSON bodies
app.use(express.json({ limit: MAX_PAYLOAD_SIZE.toString() }));
@ -158,7 +173,7 @@ export function createWebServer(options: WebServerOptions): WebServer {
}
// Parse the event object
const logEvent = parseEventObject(eventObj);
const logEvent = normalizeToLogEvent(eventObj, 'jsonl');
if (!logEvent) {
res.status(400).json({ error: 'Invalid event format', message: 'Failed to parse event object' });
return;
@ -226,7 +241,7 @@ export function createWebServer(options: WebServerOptions): WebServer {
}
// Parse the event object
const logEvent = parseEventObject(eventObj);
const logEvent = normalizeToLogEvent(eventObj, 'jsonl');
if (!logEvent) {
errors.push({ index: i, error: 'Failed to parse event object' });
continue;
@ -614,6 +629,18 @@ export function createWebServer(options: WebServerOptions): WebServer {
emitter.emit('start');
});
// Second HTTP listener for OTLP/HTTP traffic (port 4318 by convention)
if (otlpHttpPort) {
otlpHttpServer = createServer(app);
otlpHttpServer.listen(otlpHttpPort, () => {
console.log(`OTLP/HTTP receiver listening on 0.0.0.0:${otlpHttpPort}`);
});
otlpHttpServer.on('error', (err) => {
console.error(`OTLP/HTTP listener error: ${(err as Error).message}`);
emitter.emit('error', err);
});
}
httpServer.on('error', (err) => {
emitter.emit('error', err);
});
@ -628,10 +655,21 @@ export function createWebServer(options: WebServerOptions): WebServer {
}
clients.clear();
const closeOtlp = () =>
new Promise<void>((resolve) => {
if (otlpHttpServer) {
otlpHttpServer.close(() => resolve());
} else {
resolve();
}
});
wsServer.close(() => {
httpServer.close(() => {
running = false;
emitter.emit('stop');
closeOtlp().then(() => {
running = false;
emitter.emit('stop');
});
});
});
}