/** * Integration test for OTLP/HTTP receiver * * Starts the OTLP/HTTP router on a random port, sends OTLP/JSON payloads * via HTTP, and asserts that normalised LogEvents arrive through the * onEvent callback (i.e. on the bus). */ import { describe, it, expect, beforeEach, afterEach } from 'vitest'; import { createServer, Server } from 'http'; import express, { Express } from 'express'; import { createOtlpHttpRouter } from './otlpHttpReceiver.js'; import { LogEvent } from './types.js'; describe('OTLP/HTTP receiver', () => { let app: Express; let server: Server; let collectedEvents: LogEvent[]; let port: number; beforeEach(async () => { collectedEvents = []; app = express(); const router = createOtlpHttpRouter({ onEvent: (event) => collectedEvents.push(event), }); app.use(router); server = createServer(app); await new Promise((resolve) => { server.listen(0, () => resolve()); }); port = (server.address() as { port: number }).port; }); afterEach(async () => { await new Promise((resolve) => server.close(() => resolve())); }); const baseUrl = () => `http://127.0.0.1:${port}`; // ── POST /v1/logs (JSON) ───────────────────────────────────── it('produces a LogEvent when curl posts an OTLP/JSON logs payload', async () => { const nowNs = String(Date.now() * 1_000_000); const payload = { resourceLogs: [{ scopeLogs: [{ logRecords: [{ timeUnixNano: nowNs, attributes: [ { key: 'event_type', value: { stringValue: 'worker.started' } }, { key: 'worker_id', value: { stringValue: 'curl-worker' } }, { key: 'session_id', value: { stringValue: 'sess-42' } }, { key: 'sequence', value: { intValue: 1 } }, ], }], }], }], }; const res = await fetch(`${baseUrl()}/v1/logs`, { method: 'POST', headers: { 'content-type': 'application/json' }, body: JSON.stringify(payload), }); expect(res.status).toBe(200); expect(collectedEvents).toHaveLength(1); const ev = collectedEvents[0]; expect(ev.worker).toBe('curl-worker'); expect(ev.msg).toBe('worker.started'); expect(ev.session).toBe('sess-42'); }); it('returns 200 for an empty logs payload', async () => { const res = await fetch(`${baseUrl()}/v1/logs`, { method: 'POST', headers: { 'content-type': 'application/json' }, body: JSON.stringify({}), }); expect(res.status).toBe(200); expect(collectedEvents).toHaveLength(0); }); // ── POST /v1/traces (JSON) ──────────────────────────────────── it('produces span-start and span-end events from traces', async () => { const nowNs = String(Date.now() * 1_000_000); const startNs = String((Date.now() - 5000) * 1_000_000); const payload = { resourceSpans: [{ scopeSpans: [{ spans: [{ traceId: 'abc123', spanId: 'def456', startTimeUnixNano: startNs, endTimeUnixNano: nowNs, status: { code: 'OK' }, attributes: [ { key: 'worker_id', value: { stringValue: 'trace-worker' } }, { key: 'bead_id', value: { stringValue: 'bd-100' } }, ], }], }], }], }; const res = await fetch(`${baseUrl()}/v1/traces`, { method: 'POST', headers: { 'content-type': 'application/json' }, body: JSON.stringify(payload), }); expect(res.status).toBe(200); // One span-end + one span-start expect(collectedEvents).toHaveLength(2); expect(collectedEvents.map((e) => e.msg).sort()).toEqual( ['bead.claimed', 'bead.completed'].sort(), ); }); // ── POST /v1/metrics (JSON) ─────────────────────────────────── it('produces a metric event from gauge data points', async () => { const nowNs = String(Date.now() * 1_000_000); const payload = { resourceMetrics: [{ scopeMetrics: [{ metrics: [{ name: 'tokens.used', gauge: { dataPoints: [{ timeUnixNano: nowNs, asDouble: 1234.5, attributes: [ { key: 'worker_id', value: { stringValue: 'metric-worker' } }, ], }], }, }], }], }], }; const res = await fetch(`${baseUrl()}/v1/metrics`, { method: 'POST', headers: { 'content-type': 'application/json' }, body: JSON.stringify(payload), }); expect(res.status).toBe(200); expect(collectedEvents).toHaveLength(1); expect(collectedEvents[0].msg).toBe('metric.tokens.used'); }); // ── Content type handling ───────────────────────────────────── it('rejects payloads exceeding maxBodyBytes', async () => { // Default maxBodyBytes is 5MB; send content-type but huge body hint const bigPayload = { resourceLogs: new Array(100000).fill({ scopeLogs: [] }) }; // This JSON will be well over 5MB const body = JSON.stringify(bigPayload); if (body.length < 5 * 1024 * 1024) { // If somehow it's not big enough, just pass — the intent is clear return; } const res = await fetch(`${baseUrl()}/v1/logs`, { method: 'POST', headers: { 'content-type': 'application/json' }, body, }); expect(res.status).toBe(413); }); it('accepts application/x-protobuf content type', async () => { // Send empty protobuf payload — should decode to empty object, no events const res = await fetch(`${baseUrl()}/v1/logs`, { method: 'POST', headers: { 'content-type': 'application/x-protobuf' }, body: Buffer.alloc(0), }); // Empty protobuf body decodes to {}; resourceLogs is undefined → no events expect(res.status).toBe(200); expect(collectedEvents).toHaveLength(0); }); });