Add POST /event/normalized endpoint that accepts pre-normalized stuck/unstuck/ registered/ended events directly from harness-agnostic adapters. Decision rationale: - Keeps adapter layer clean — adapters emit normalized events directly - Avoids coupling non-Claude sources to Claude-specific data structures - The normalized contract is already the internal model — we expose it directly Event schema matches daemon/types.ts NormalizedEvent union type. This enables the tmux detector to post events without wrapping them in Claude hook format, making the adapter layer harness-agnostic.
264 lines
9.9 KiB
TypeScript
264 lines
9.9 KiB
TypeScript
// Trail Boss daemon: ingest endpoint, state, queue, reconcile loop
|
|
import * as http from "http";
|
|
import type { HookEvent, NormalizedEvent } from "./types.ts";
|
|
import { adaptHookEvent, isStuckEvent, isUnstuckEvent, isSessionRegistered, isSessionEnded } from "./claude-adapter.ts";
|
|
import { upsertSession, deleteSession, enqueue, dequeue, dequeueByPaneId, skipHead, getHead, getStuckCount, getAllStuck, cleanupQueue } from "./db.ts";
|
|
import { startReconcileLoop, reconcileStuckDirection } from "./reconcile.ts";
|
|
|
|
const PORT = 4000;
|
|
const HOST = "127.0.0.1"; // Loopback only
|
|
const SKIP_COOLDOWN_MS = 30_000; // 30 seconds
|
|
|
|
// Run stuck-direction reconcile on startup to recover sessions that became stuck while daemon was down
|
|
console.log("[startup] running stuck-direction reconcile...");
|
|
const stuckResult = reconcileStuckDirection();
|
|
if (stuckResult.enqueued > 0) {
|
|
console.log(`[startup] enqueued ${stuckResult.enqueued}/${stuckResult.checked} sessions from transcripts`);
|
|
} else {
|
|
console.log(`[startup] no stuck sessions recovered from transcripts (${stuckResult.checked} checked)`);
|
|
}
|
|
|
|
// Start reconcile loop (runs every 5s by default)
|
|
startReconcileLoop(5000);
|
|
|
|
// Cleanup old queue entries hourly
|
|
setInterval(() => cleanupQueue(), 60 * 60 * 1000);
|
|
|
|
const server = http.createServer(async (req, res) => {
|
|
const url = new URL(req.url || "", `http://${req.headers.host}`);
|
|
|
|
try {
|
|
// POST /event - hook ingest endpoint
|
|
if (req.method === "POST" && url.pathname === "/event") {
|
|
const paneId = req.headers["x-tmux-pane"] as string | undefined;
|
|
if (!paneId) {
|
|
res.writeHead(400, { "Content-Type": "application/json" });
|
|
res.end(JSON.stringify({ error: "Missing X-Tmux-Pane header" }));
|
|
return;
|
|
}
|
|
|
|
const body: string = await new Promise((resolve) => {
|
|
let data = "";
|
|
req.on("data", (chunk) => (data += chunk));
|
|
req.on("end", () => resolve(data));
|
|
});
|
|
|
|
let raw: HookEvent;
|
|
try {
|
|
raw = JSON.parse(body) as HookEvent;
|
|
} catch {
|
|
res.writeHead(400, { "Content-Type": "application/json" });
|
|
res.end(JSON.stringify({ error: "Invalid JSON" }));
|
|
return;
|
|
}
|
|
|
|
const event = adaptHookEvent(raw, paneId);
|
|
|
|
if (isStuckEvent(event)) {
|
|
// Clean up any bootstrap synthetic entry for this pane before registering real session
|
|
if (event.sessionId !== event.paneId) {
|
|
dequeueByPaneId(event.paneId, event.sessionId);
|
|
}
|
|
upsertSession(
|
|
event.sessionId,
|
|
event.paneId,
|
|
event.cwd,
|
|
event.transcriptPath,
|
|
event.timestamp,
|
|
event.reason,
|
|
event.message
|
|
);
|
|
enqueue(event.sessionId, event.reason, event.timestamp);
|
|
console.log(`[event] stuck: ${event.sessionId.slice(0, 8)} (${event.reason})`);
|
|
} else if (isUnstuckEvent(event)) {
|
|
// Dequeue by session_id; also clean up any bootstrap entry for this pane
|
|
dequeue(event.sessionId);
|
|
dequeueByPaneId(event.paneId, event.sessionId);
|
|
console.log(`[event] unstuck: ${event.sessionId.slice(0, 8)}`);
|
|
} else if (isSessionRegistered(event)) {
|
|
upsertSession(
|
|
event.sessionId,
|
|
event.paneId,
|
|
event.cwd,
|
|
event.transcriptPath,
|
|
null,
|
|
null,
|
|
null
|
|
);
|
|
console.log(`[event] registered: ${event.sessionId.slice(0, 8)} -> ${event.paneId}`);
|
|
} else if (isSessionEnded(event)) {
|
|
deleteSession(event.sessionId);
|
|
console.log(`[event] ended: ${event.sessionId.slice(0, 8)}`);
|
|
}
|
|
|
|
res.writeHead(200, { "Content-Type": "application/json" });
|
|
res.end(JSON.stringify({ ok: true }));
|
|
return;
|
|
}
|
|
|
|
// POST /event/normalized - normalized event ingest endpoint
|
|
//
|
|
// This endpoint accepts pre-normalized stuck/unstuck/registered/ended events
|
|
// directly from harness-agnostic adapters (tmux detector, future hooks).
|
|
//
|
|
// Design decision: We chose a separate /event/normalized endpoint over wrapping
|
|
// tmux events in the Claude hook format because:
|
|
// 1. Keeps the adapter layer clean — adapters emit normalized events directly
|
|
// 2. Avoids coupling non-Claude sources to Claude-specific data structures
|
|
// 3. The normalized contract is already the internal model — we expose it directly
|
|
//
|
|
// See docs/notes/decisions.md for full rationale.
|
|
if (req.method === "POST" && url.pathname === "/event/normalized") {
|
|
const body: string = await new Promise((resolve) => {
|
|
let data = "";
|
|
req.on("data", (chunk) => (data += chunk));
|
|
req.on("end", () => resolve(data));
|
|
});
|
|
|
|
let event: NormalizedEvent;
|
|
try {
|
|
event = JSON.parse(body) as NormalizedEvent;
|
|
} catch {
|
|
res.writeHead(400, { "Content-Type": "application/json" });
|
|
res.end(JSON.stringify({ error: "Invalid JSON" }));
|
|
return;
|
|
}
|
|
|
|
// Validate type discriminator
|
|
if (!event.type || !["stuck", "unstuck", "registered", "ended"].includes(event.type)) {
|
|
res.writeHead(400, { "Content-Type": "application/json" });
|
|
res.end(JSON.stringify({ error: "Missing or invalid event type" }));
|
|
return;
|
|
}
|
|
|
|
// Route by event type
|
|
if (event.type === "stuck") {
|
|
// Clean up any bootstrap synthetic entry for this pane before registering real session
|
|
if (event.sessionId !== event.paneId) {
|
|
dequeueByPaneId(event.paneId, event.sessionId);
|
|
}
|
|
upsertSession(
|
|
event.sessionId,
|
|
event.paneId,
|
|
event.cwd,
|
|
event.transcriptPath,
|
|
event.timestamp,
|
|
event.reason,
|
|
event.message
|
|
);
|
|
enqueue(event.sessionId, event.reason, event.timestamp);
|
|
console.log(`[normalized] stuck: ${event.sessionId.slice(0, 8)} (${event.reason})`);
|
|
} else if (event.type === "unstuck") {
|
|
dequeue(event.sessionId);
|
|
console.log(`[normalized] unstuck: ${event.sessionId.slice(0, 8)}`);
|
|
} else if (event.type === "registered") {
|
|
upsertSession(
|
|
event.sessionId,
|
|
event.paneId,
|
|
event.cwd,
|
|
event.transcriptPath,
|
|
null,
|
|
null,
|
|
null
|
|
);
|
|
console.log(`[normalized] registered: ${event.sessionId.slice(0, 8)} -> ${event.paneId}`);
|
|
} else if (event.type === "ended") {
|
|
deleteSession(event.sessionId);
|
|
console.log(`[normalized] ended: ${event.sessionId.slice(0, 8)}`);
|
|
}
|
|
|
|
res.writeHead(200, { "Content-Type": "application/json" });
|
|
res.end(JSON.stringify({ ok: true }));
|
|
return;
|
|
}
|
|
|
|
// GET /next - return the head-of-queue pane id
|
|
if (req.method === "GET" && url.pathname === "/next") {
|
|
const head = getHead();
|
|
if (!head) {
|
|
res.writeHead(200, { "Content-Type": "application/json" });
|
|
res.end(JSON.stringify({ paneId: null, reason: "queue empty" }));
|
|
return;
|
|
}
|
|
|
|
const sess = await getStoredSession(head.session_id);
|
|
if (!sess) {
|
|
// Shouldn't happen due to FK, but handle gracefully
|
|
res.writeHead(200, { "Content-Type": "application/json" });
|
|
res.end(JSON.stringify({ paneId: null, reason: "session not found" }));
|
|
return;
|
|
}
|
|
|
|
res.writeHead(200, { "Content-Type": "application/json" });
|
|
res.end(JSON.stringify({ paneId: sess.pane_id, sessionId: sess.session_id, reason: null }));
|
|
return;
|
|
}
|
|
|
|
// POST /skip - skip current head and move to tail
|
|
if (req.method === "POST" && url.pathname === "/skip") {
|
|
const head = getHead();
|
|
if (!head) {
|
|
res.writeHead(200, { "Content-Type": "application/json" });
|
|
res.end(JSON.stringify({ paneId: null, reason: "queue empty" }));
|
|
return;
|
|
}
|
|
|
|
skipHead(head.session_id, SKIP_COOLDOWN_MS);
|
|
console.log(`[skip] ${head.session_id.slice(0, 8)} moved to tail (cooldown ${SKIP_COOLDOWN_MS}ms)`);
|
|
|
|
// Return the new head
|
|
const newHead = getHead();
|
|
if (!newHead) {
|
|
res.writeHead(200, { "Content-Type": "application/json" });
|
|
res.end(JSON.stringify({ paneId: null, reason: "queue empty after skip" }));
|
|
return;
|
|
}
|
|
|
|
const sess = await getStoredSession(newHead.session_id);
|
|
if (!sess) {
|
|
res.writeHead(200, { "Content-Type": "application/json" });
|
|
res.end(JSON.stringify({ paneId: null, reason: "session not found" }));
|
|
return;
|
|
}
|
|
|
|
res.writeHead(200, { "Content-Type": "application/json" });
|
|
res.end(JSON.stringify({ paneId: sess.pane_id, sessionId: sess.session_id, reason: null }));
|
|
return;
|
|
}
|
|
|
|
// GET /queue - list all stuck items (for popup display)
|
|
if (req.method === "GET" && url.pathname === "/queue") {
|
|
const items = getAllStuck(50);
|
|
res.writeHead(200, { "Content-Type": "application/json" });
|
|
res.end(JSON.stringify({ items, count: items.length }));
|
|
return;
|
|
}
|
|
|
|
// GET /status - simple health/status endpoint
|
|
if (req.method === "GET" && url.pathname === "/status") {
|
|
const stuckCount = getStuckCount();
|
|
res.writeHead(200, { "Content-Type": "application/json" });
|
|
res.end(JSON.stringify({ status: "ok", stuckCount }));
|
|
return;
|
|
}
|
|
|
|
// 404
|
|
res.writeHead(404, { "Content-Type": "application/json" });
|
|
res.end(JSON.stringify({ error: "Not found" }));
|
|
} catch (err) {
|
|
console.error("[request] error:", err);
|
|
res.writeHead(500, { "Content-Type": "application/json" });
|
|
res.end(JSON.stringify({ error: "Internal server error" }));
|
|
}
|
|
});
|
|
|
|
async function getStoredSession(sessionId: string): Promise<{ session_id: string; pane_id: string } | null> {
|
|
// Direct query since db.ts functions return full rows
|
|
const { db } = await import("./db.ts");
|
|
const stmt = db.prepare("SELECT session_id, pane_id FROM sessions WHERE session_id = ?");
|
|
return stmt.get(sessionId) as ReturnType<typeof getStoredSession>;
|
|
}
|
|
|
|
server.listen(PORT, HOST, () => {
|
|
console.log(`[trailboss] daemon listening on http://${HOST}:${PORT}`);
|
|
});
|