feat: implement detection explainability system

Add X-ray overlay showing contributing links to detections with confidence breakdown.

- Users can click "Why?" on any blob to see detailed explanation
- Contributing links are highlighted with Fresnel zone visualization
- Per-link contribution breakdown shows deltaRMS, zone number, weight
- BLE identity match details displayed when available
- Confidence gauge shows overall detection certainty

Explainability is accessible via:
- Right-click context menu on blob figures
- "Why?" button in blob hover tooltip
- Click directly on humanoid blob figures
- Timeline event "Why?" buttons

Accepts: Users can see exactly why a detection was triggered with visual overlays and confidence metrics.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-04-09 11:48:19 -04:00
parent 38728e9f83
commit eb6e479bea
13 changed files with 2553 additions and 183 deletions

File diff suppressed because one or more lines are too long

View file

@ -1 +1 @@
9bef706de0a9b5c5d281760e1f8e8874a0599a45
0cb2353a08180199080bf1de4b70cb54d32f61ff

View file

@ -0,0 +1,359 @@
/* ============================================
Detection Explainability Panel Styles
============================================ */
/* ----- Loading State ----- */
.explainability-loading {
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
padding: 40px 20px;
color: #888;
}
.panel-loading-spinner {
width: 32px;
height: 32px;
border: 3px solid rgba(76, 195, 247, 0.2);
border-top-color: #4fc3f7;
border-radius: 50%;
animation: explainability-spin 1s linear infinite;
margin-bottom: 12px;
}
@keyframes explainability-spin {
to { transform: rotate(360deg); }
}
/* ----- Empty State ----- */
.panel-empty {
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
padding: 40px 20px;
color: #666;
}
.panel-empty-icon {
font-size: 48px;
margin-bottom: 12px;
opacity: 0.5;
}
.panel-empty-text {
font-size: 14px;
}
/* ----- Confidence Gauge ----- */
.explainability-confidence {
display: flex;
flex-direction: column;
align-items: center;
padding: 24px 0;
margin-bottom: 20px;
border-bottom: 1px solid rgba(255, 255, 255, 0.1);
}
.confidence-gauge {
position: relative;
width: 100px;
height: 100px;
margin-bottom: 12px;
}
.confidence-ring {
width: 100%;
height: 100%;
transform: rotate(-90deg);
}
.confidence-ring-bg {
fill: none;
stroke: rgba(255, 255, 255, 0.1);
stroke-width: 6;
}
.confidence-ring-fill {
fill: none;
stroke: #4fc3f7;
stroke-width: 6;
stroke-linecap: round;
transform-origin: 50% 50%;
transition: stroke-dasharray 0.5s ease;
}
.confidence-value {
position: absolute;
top: 50%;
left: 50%;
transform: translate(-50%, -50%);
font-size: 24px;
font-weight: 700;
color: #4fc3f7;
}
.confidence-label {
font-size: 14px;
color: #888;
}
/* ----- Sections ----- */
.explainability-section {
margin-bottom: 20px;
}
.explainability-section.collapsed {
margin-bottom: 8px;
}
.section-title {
font-size: 14px;
font-weight: 600;
color: #ccc;
margin: 0 0 12px 0;
}
.section-header {
display: flex;
align-items: center;
justify-content: space-between;
padding: 10px 12px;
background: rgba(255, 255, 255, 0.05);
border-radius: 6px;
cursor: pointer;
user-select: none;
transition: background 0.2s;
}
.section-header:hover {
background: rgba(255, 255, 255, 0.08);
}
.toggle-icon {
font-size: 12px;
color: #888;
transition: transform 0.2s;
}
.section-content {
margin-top: 12px;
}
/* ----- Links Table ----- */
.links-table {
width: 100%;
border-collapse: collapse;
font-size: 13px;
}
.links-table th,
.links-table td {
padding: 8px 10px;
text-align: left;
border-bottom: 1px solid rgba(255, 255, 255, 0.05);
}
.links-table th {
font-weight: 600;
color: #888;
font-size: 11px;
text-transform: uppercase;
letter-spacing: 0.5px;
}
.links-table tr:last-child td {
border-bottom: none;
}
.links-table-detailed {
font-size: 12px;
}
.links-table-detailed th,
.links-table-detailed td {
padding: 6px 8px;
}
/* ----- Table Cells ----- */
.link-cell {
font-family: 'SF Mono', 'Monaco', 'Inconsolata', 'Roboto Mono', monospace;
font-size: 11px;
}
.link-id {
color: #4fc3f7;
}
.deltarms-cell {
font-family: 'SF Mono', 'Monaco', 'Inconsolata', 'Roboto Mono', monospace;
color: #a5d6a7;
}
.zone-cell {
text-align: center;
}
.zone-badge {
display: inline-block;
min-width: 20px;
padding: 2px 6px;
border-radius: 4px;
font-size: 11px;
font-weight: 600;
text-align: center;
}
.weight-cell {
font-family: 'SF Mono', 'Monaco', 'Inconsolata', 'Roboto Mono', monospace;
color: #ffcc80;
}
.contributing-cell {
text-align: center;
}
.contributing-badge {
display: inline-block;
width: 18px;
height: 18px;
line-height: 18px;
border-radius: 50%;
font-size: 12px;
}
.contributing-yes .contributing-badge {
background: #22c55e;
color: white;
}
.contributing-no .contributing-badge {
color: #666;
}
.contribution-cell {
font-family: 'SF Mono', 'Monaco', 'Inconsolata', 'Roboto Mono', monospace;
color: #90caf9;
font-size: 11px;
}
/* ----- BLE Match Card ----- */
.ble-match-card {
background: rgba(255, 255, 255, 0.05);
border-radius: 8px;
padding: 12px;
}
.ble-match-header {
display: flex;
align-items: center;
gap: 10px;
margin-bottom: 10px;
}
.ble-match-indicator {
width: 12px;
height: 12px;
border-radius: 50%;
flex-shrink: 0;
}
.ble-match-name {
font-size: 15px;
font-weight: 600;
color: #eee;
}
.ble-match-confidence {
margin-left: auto;
font-size: 12px;
color: #888;
}
.ble-match-details {
display: flex;
flex-direction: column;
gap: 6px;
}
.ble-match-detail {
display: flex;
font-size: 12px;
}
.detail-label {
color: #888;
min-width: 70px;
}
.detail-value {
color: #ccc;
font-family: 'SF Mono', 'Monaco', 'Inconsolata', 'Roboto Mono', monospace;
}
/* ----- Footer ----- */
.explainability-footer {
padding-top: 16px;
margin-top: 20px;
border-top: 1px solid rgba(255, 255, 255, 0.1);
display: flex;
justify-content: center;
}
.panel-btn {
padding: 10px 20px;
border: none;
border-radius: 6px;
font-size: 14px;
font-weight: 500;
cursor: pointer;
transition: background 0.2s, transform 0.1s;
}
.panel-btn:active {
transform: scale(0.98);
}
.panel-btn-primary {
background: #4fc3f7;
color: #1a1a2e;
}
.panel-btn-primary:hover {
background: #29b6f6;
}
.panel-btn-secondary {
background: rgba(255, 255, 255, 0.1);
color: #eee;
}
.panel-btn-secondary:hover {
background: rgba(255, 255, 255, 0.15);
}
/* ----- Confidence Color Variants ----- */
.confidence-ring-fill.high {
stroke: #22c55e;
}
.confidence-value.high {
color: #22c55e;
}
.confidence-ring-fill.medium {
stroke: #ff9800;
}
.confidence-value.medium {
color: #ff9800;
}
.confidence-ring-fill.low {
stroke: #f44336;
}
.confidence-value.low {
color: #f44336;
}

View file

@ -13,6 +13,7 @@
<link rel="stylesheet" href="css/anomaly.css">
<link rel="stylesheet" href="css/sleep.css">
<link rel="stylesheet" href="css/floorplan.css">
<link rel="stylesheet" href="css/explainability.css">
<style>
* {
margin: 0;
@ -2136,6 +2137,283 @@
.ws-lost-dismiss-btn:hover {
background: rgba(255, 255, 255, 0.15);
}
/* ===== Replay Control Bar ===== */
#pause-live-btn {
background: rgba(255, 193, 7, 0.15);
border: 1px solid rgba(255, 193, 7, 0.4);
color: #ffc107;
font-size: 12px;
padding: 3px 10px;
border-radius: 4px;
cursor: pointer;
transition: background 0.2s;
}
#pause-live-btn:hover {
background: rgba(255, 193, 7, 0.25);
}
/* Replay control bar (shown during replay mode) */
.replay-control-bar {
position: fixed;
bottom: 20px;
left: 50%;
transform: translateX(-50%);
background: rgba(0, 0, 0, 0.9);
border-radius: 12px;
padding: 12px 20px;
display: flex;
align-items: center;
gap: 16px;
z-index: 150;
box-shadow: 0 4px 20px rgba(0, 0, 0, 0.5);
border: 1px solid rgba(255, 255, 255, 0.1);
}
.replay-controls {
display: flex;
align-items: center;
gap: 12px;
}
.replay-btn {
background: rgba(255, 255, 255, 0.08);
border: 1px solid rgba(255, 255, 255, 0.15);
color: #ccc;
width: 32px;
height: 32px;
border-radius: 6px;
cursor: pointer;
display: flex;
align-items: center;
justify-content: center;
transition: background 0.2s, color 0.2s;
}
.replay-btn:hover {
background: rgba(79, 195, 247, 0.2);
border-color: rgba(79, 195, 247, 0.4);
color: #4fc3f7;
}
.replay-info {
display: flex;
flex-direction: column;
gap: 2px;
}
.replay-timestamp {
font-size: 14px;
font-weight: 600;
color: #eee;
font-family: monospace;
}
.replay-range {
font-size: 11px;
color: #888;
}
.replay-playback {
display: flex;
align-items: center;
gap: 8px;
border-left: 1px solid rgba(255, 255, 255, 0.1);
padding-left: 12px;
}
.replay-speed {
background: rgba(255, 255, 255, 0.08);
border: 1px solid rgba(255, 255, 255, 0.15);
color: #ccc;
font-size: 11px;
padding: 4px 8px;
border-radius: 4px;
cursor: pointer;
}
.replay-speed:focus {
outline: none;
border-color: #4fc3f7;
}
.replay-timeline {
display: flex;
flex-direction: column;
gap: 2px;
border-left: 1px solid rgba(255, 255, 255, 0.1);
padding-left: 12px;
}
.replay-scrubber {
width: 200px;
height: 4px;
-webkit-appearance: none;
appearance: none;
background: rgba(255, 255, 255, 0.15);
border-radius: 2px;
outline: none;
}
.replay-scrubber::-webkit-slider-thumb {
-webkit-appearance: none;
appearance: none;
width: 12px;
height: 12px;
background: #4fc3f7;
border-radius: 50%;
cursor: pointer;
transition: transform 0.1s;
}
.replay-scrubber::-webkit-slider-thumb:hover {
transform: scale(1.2);
}
.replay-scrubber::-moz-range-thumb {
width: 12px;
height: 12px;
background: #4fc3f7;
border-radius: 50%;
cursor: pointer;
border: none;
}
.replay-tuning-btn {
background: rgba(171, 71, 188, 0.15);
border: 1px solid rgba(171, 71, 188, 0.4);
color: #ba68c8;
font-size: 11px;
padding: 4px 10px;
border-radius: 4px;
cursor: pointer;
transition: background 0.2s;
}
.replay-tuning-btn:hover {
background: rgba(171, 71, 188, 0.25);
}
/* Replay tuning panel */
.replay-tuning-panel {
position: fixed;
top: 60px;
right: 340px;
width: 280px;
background: rgba(0, 0, 0, 0.9);
border-radius: 8px;
padding: 16px;
z-index: 100;
display: none;
box-shadow: 0 4px 20px rgba(0, 0, 0, 0.5);
border: 1px solid rgba(255, 255, 255, 0.1);
}
.replay-tuning-panel.visible {
display: flex;
flex-direction: column;
}
.replay-tuning-content {
display: flex;
flex-direction: column;
gap: 12px;
}
.replay-tuning-header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 8px;
}
.replay-tuning-header h3 {
font-size: 14px;
color: #888;
text-transform: uppercase;
letter-spacing: 1px;
margin: 0;
}
.replay-tuning-close {
background: none;
border: none;
color: #888;
font-size: 18px;
cursor: pointer;
padding: 0;
line-height: 1;
}
.replay-tuning-close:hover {
color: #ccc;
}
.replay-tuning-body {
display: flex;
flex-direction: column;
gap: 12px;
}
.tuning-param {
display: flex;
flex-direction: column;
gap: 4px;
}
.tuning-param label {
font-size: 11px;
color: #aaa;
display: flex;
justify-content: space-between;
}
.tuning-param input[type="range"] {
width: 100%;
height: 4px;
-webkit-appearance: none;
appearance: none;
background: rgba(255, 255, 255, 0.15);
border-radius: 2px;
outline: none;
}
.tuning-param input[type="range"]::-webkit-slider-thumb {
-webkit-appearance: none;
appearance: none;
width: 12px;
height: 12px;
background: #4fc3f7;
border-radius: 50%;
cursor: pointer;
}
.tuning-param input[type="range"]::-moz-range-thumb {
width: 12px;
height: 12px;
background: #4fc3f7;
border-radius: 50%;
cursor: pointer;
border: none;
}
.tuning-param-value {
font-family: monospace;
color: #4fc3f7;
font-size: 11px;
}
.tuning-actions {
display: flex;
gap: 8px;
margin-top: 8px;
}
.tuning-btn {
flex: 1;
padding: 8px;
border: none;
border-radius: 4px;
font-size: 12px;
cursor: pointer;
transition: background 0.2s;
}
.tuning-btn.apply {
background: #4fc3f7;
color: #1a1a2e;
}
.tuning-btn.apply:hover {
background: #29b6f6;
}
.tuning-btn.reset {
background: rgba(255, 255, 255, 0.1);
color: #ccc;
}
.tuning-btn.reset:hover {
background: rgba(255, 255, 255, 0.15);
}
</style>
</head>
<body>
@ -2200,6 +2478,13 @@
<!-- Security Status Indicator (managed by SecurityPanel) -->
<div class="status-item" id="security-status-container"></div>
<div class="status-item" style="margin-left:auto; gap:6px;">
<button id="pause-live-btn" onclick="SpaxelReplay && SpaxelReplay.pauseLive()" title="Pause live mode and enter replay">
<svg xmlns="http://www.w3.org/2000/svg" width="12" height="12" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2.5" stroke-linecap="round" stroke-linejoin="round" style="vertical-align:middle;margin-right:2px;">
<rect x="6" y="4" width="4" height="16"></rect>
<rect x="14" y="4" width="4" height="16"></rect>
</svg>
Pause
</button>
<button id="ble-btn" onclick="BLEPanel && BLEPanel.open()">People & Devices<span id="ble-unregistered-badge" class="badge"></span></button>
<button id="settings-btn" onclick="openSettingsPanel && openSettingsPanel()">Settings</button>
<button id="add-node-btn" onclick="SpaxelOnboard && SpaxelOnboard.start()">+ Add Node</button>
@ -2343,6 +2628,8 @@
<script src="js/sleep.js"></script>
<!-- Diurnal Baseline Visualization -->
<script src="js/diurnal-chart.js"></script>
<!-- Time-Travel Replay -->
<script src="js/replay.js"></script>
<!-- Room editor panel -->
<div id="room-editor-panel">

766
dashboard/js/replay.js Normal file
View file

@ -0,0 +1,766 @@
/**
* Spaxel Dashboard - Time-Travel Replay Mode
*
* Provides pause-live, timeline scrubbing, and replay 3D visualization
* from recorded CSI data.
*/
(function() {
'use strict';
// ============================================
// Configuration
// ============================================
const CONFIG = {
// Default replay window when pausing live (60 seconds before now)
defaultReplayWindowSec: 60,
// Timeline scrubber update interval (ms)
timelineUpdateInterval: 100,
// Playback speeds
speeds: [1, 2, 5],
// Timestamp range padding when creating replay sessions
sessionPaddingMs: 5000,
};
// ============================================
// State
// ============================================
const state = {
// Replay session state
activeSessionId: null,
sessionFromMs: null,
sessionToMs: null,
sessionCurrentMs: null,
sessionSpeed: 1,
sessionState: 'stopped', // stopped, paused, playing
// Recording store info
storeOldestMs: null,
storeNewestMs: null,
storeHasData: false,
// UI state
isPaused: false,
isReplayMode: false,
// Callbacks
onReplayBlob: null,
};
// ============================================
// DOM Elements
// ============================================
let elements = {};
// ============================================
// Initialization
// ============================================
function init() {
console.log('[Replay] Initializing time-travel replay');
// Create replay controls
createReplayControls();
// Fetch recording store info
fetchStoreInfo();
// Start timeline update loop
startTimelineLoop();
console.log('[Replay] Ready');
}
// ============================================
// Replay Controls UI
// ============================================
function createReplayControls() {
const statusBar = document.getElementById('status-bar');
if (!statusBar) {
console.warn('[Replay] Status bar not found');
return;
}
// Create replay control bar (hidden by default)
const replayBar = document.createElement('div');
replayBar.id = 'replay-control-bar';
replayBar.className = 'replay-control-bar';
replayBar.style.display = 'none';
replayBar.innerHTML = `
<div class="replay-controls">
<button id="replay-back-btn" class="replay-btn" title="Back to live">
<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2">
<path d="M1 4v6h6"/>
<path d="M3.51 15a9 9 0 1 0 2.13-9.36L23 10M10 19l-7-7"/>
</svg>
</button>
<div class="replay-info">
<span id="replay-timestamp" class="replay-timestamp">--:--:--</span>
<span id="replay-range" class="replay-range">0:00 / 0:00</span>
</div>
<div class="replay-playback">
<button id="replay-play-btn" class="replay-btn" title="Play/Pause">
<svg id="play-icon" xmlns="http://www.w3.org/2000/svg" width="16" height="16" viewBox="0 0 24 24" fill="currentColor">
<path d="M8 5v14l11-7z"/>
</svg>
<svg id="pause-icon" style="display:none" xmlns="http://www.w3.org/2000/svg" width="16" height="16" viewBox="0 0 24 24" fill="currentColor">
<path d="M6 19h4V5H6v14zm8-14v14h4V5h-4z"/>
</svg>
</button>
<select id="replay-speed" class="replay-speed">
<option value="1">1×</option>
<option value="2">2×</option>
<option value="5">5×</option>
</select>
</div>
<div class="replay-timeline">
<input type="range" id="replay-scrubber" class="replay-scrubber"
min="0" max="100" step="0.1" value="0"
title="Scrub through timeline">
</div>
<button id="replay-close-btn" class="replay-btn" title="Exit replay mode">
<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2">
<line x1="18" y1="6" x2="6" y2="18"/>
<line x1="6" y1="6" x2="18" y2="18"/>
</svg>
</button>
</div>
<div class="replay-tuning">
<button id="replay-tune-btn" class="replay-tune-btn" title="Tune detection parameters">
<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2">
<circle cx="12" cy="12" r="3"/>
<path d="M12 1v6m0 6v6"/>
<path d="m19.07 4.93-1.41 1.41M17 16l-5-5"/>
<path d="M4.93 19.07l1.41-1.41M16 17l5-5"/>
</svg>
Tune
</button>
</div>
`;
// Insert after status bar
statusBar.parentNode.insertBefore(replayBar, statusBar.nextSibling);
// Store element references
elements = {
bar: replayBar,
backBtn: document.getElementById('replay-back-btn'),
timestamp: document.getElementById('replay-timestamp'),
range: document.getElementById('replay-range'),
playBtn: document.getElementById('replay-play-btn'),
playIcon: document.getElementById('play-icon'),
pauseIcon: document.getElementById('pause-icon'),
speed: document.getElementById('replay-speed'),
scrubber: document.getElementById('replay-scrubber'),
closeBtn: document.getElementById('replay-close-btn'),
tuneBtn: document.getElementById('replay-tune-btn'),
};
// Attach event listeners
elements.backBtn.addEventListener('click', onBackToLive);
elements.playBtn.addEventListener('click', onPlayPause);
elements.speed.addEventListener('change', onSpeedChange);
elements.scrubber.addEventListener('input', onScrub);
elements.closeBtn.addEventListener('click', onExitReplay);
elements.tuneBtn.addEventListener('click', onTuneParams);
}
// ============================================
// API Communication
// ============================================
function fetchStoreInfo() {
fetch('/api/replay/sessions')
.then(res => res.json())
.then(data => {
state.storeOldestMs = data.oldest_timestamp_ms;
state.storeNewestMs = data.newest_timestamp_ms;
state.storeHasData = data.has_data;
console.log('[Replay] Store info:', data);
})
.catch(err => {
console.error('[Replay] Failed to fetch store info:', err);
});
}
function startReplaySession(fromMs, toMs) {
const fromISO = new Date(fromMs).toISOString();
const toISO = toMs ? new Date(toMs).toISOString() : '';
return fetch('/api/replay/start', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
from_iso8601: fromISO,
to_iso8601: toISO,
speed: 1
})
})
.then(res => {
if (!res.ok) {
throw new Error('Failed to start replay: ' + res.statusText);
}
return res.json();
})
.then(data => {
state.activeSessionId = data.session_id;
state.sessionFromMs = data.from_ms;
state.sessionToMs = data.to_ms;
state.sessionCurrentMs = data.from_ms;
state.sessionState = 'paused';
state.sessionSpeed = data.speed;
console.log('[Replay] Session started:', data);
updateUI();
return data.session_id;
});
}
function stopReplaySession() {
if (!state.activeSessionId) return Promise.resolve();
return fetch('/api/replay/stop', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
session_id: state.activeSessionId
})
})
.then(res => {
if (!res.ok) {
throw new Error('Failed to stop replay: ' + res.statusText);
}
return res.json();
})
.then(() => {
console.log('[Replay] Session stopped:', state.activeSessionId);
state.activeSessionId = null;
state.isPaused = false;
updateUI();
});
}
function seekReplay(targetMs) {
if (!state.activeSessionId) return Promise.resolve();
const targetISO = new Date(targetMs).toISOString();
return fetch('/api/replay/seek', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
session_id: state.activeSessionId,
timestamp_iso8601: targetISO
})
})
.then(res => {
if (!res.ok) {
throw new Error('Failed to seek: ' + res.statusText);
}
return res.json();
})
.then(data => {
state.sessionCurrentMs = data.current_ms;
updateUI();
});
}
function setPlaybackSpeed(speed) {
if (!state.activeSessionId) return;
fetch('/api/replay/set-speed', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
session_id: state.activeSessionId,
speed: speed
})
})
.then(res => {
if (!res.ok) {
throw new Error('Failed to set speed: ' + res.statusText);
}
return res.json();
})
.then(() => {
state.sessionSpeed = speed;
console.log('[Replay] Speed set to', speed, 'x');
});
}
function setPlaybackState(playbackState) {
if (!state.activeSessionId) return;
fetch('/api/replay/set-state', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
session_id: state.activeSessionId,
state: playbackState
})
})
.then(res => {
if (!res.ok) {
throw new Error('Failed to set state: ' + res.statusText);
}
return res.json();
})
.then(() => {
state.sessionState = playbackState;
updatePlayPauseButton();
});
}
function tuneParams(params) {
if (!state.activeSessionId) return Promise.resolve();
return fetch('/api/replay/tune', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
session_id: state.activeSessionId,
...params
})
})
.then(res => {
if (!res.ok) {
throw new Error('Failed to tune params: ' + res.statusText);
}
return res.json();
})
.then(data => {
console.log('[Replay] Parameters tuned:', data.params);
return data;
});
}
// ============================================
// Event Handlers
// ============================================
function onBackToLive() {
exitReplayMode();
}
function onPlayPause() {
if (!state.activeSessionId) return;
const newState = state.sessionState === 'playing' ? 'paused' : 'playing';
setPlaybackState(newState);
}
function onSpeedChange(e) {
const speed = parseInt(e.target.value, 10);
setPlaybackSpeed(speed);
}
function onScrub(e) {
const percent = parseFloat(e.target.value);
const rangeMs = state.sessionToMs - state.sessionFromMs;
const targetMs = state.sessionFromMs + Math.round(rangeMs * percent / 100);
seekReplay(targetMs);
}
function onExitReplay() {
exitReplayMode();
}
function onTuneParams() {
showTuningPanel();
}
// ============================================
// UI Updates
// ============================================
function updateUI() {
updateTimestampDisplay();
updateRangeDisplay();
updateScrubber();
updatePlayPauseButton();
}
function updateTimestampDisplay() {
if (!elements.timestamp) return;
if (state.sessionCurrentMs !== null) {
elements.timestamp.textContent = formatTimestamp(state.sessionCurrentMs);
} else {
elements.timestamp.textContent = '--:--:--';
}
}
function updateRangeDisplay() {
if (!elements.range) return;
if (state.sessionFromMs !== null && state.sessionToMs !== null) {
elements.range.textContent = formatTimestamp(state.sessionFromMs) +
' / ' + formatTimestamp(state.sessionToMs);
} else {
elements.range.textContent = '0:00 / 0:00';
}
}
function updateScrubber() {
if (!elements.scrubber) return;
if (state.sessionFromMs !== null && state.sessionToMs !== null) {
const rangeMs = state.sessionToMs - state.sessionFromMs;
if (state.sessionCurrentMs !== null) {
const offset = state.sessionCurrentMs - state.sessionFromMs;
const percent = Math.max(0, Math.min(100, (offset / rangeMs) * 100));
elements.scrubber.value = percent;
}
}
}
function updatePlayPauseButton() {
if (!elements.playIcon || !elements.pauseIcon) return;
if (state.sessionState === 'playing') {
elements.playIcon.style.display = 'none';
elements.pauseIcon.style.display = 'block';
} else {
elements.playIcon.style.display = 'block';
elements.pauseIcon.style.display = 'none';
}
}
// ============================================
// Mode Transitions
// ============================================
function enterReplayMode(fromMs, toMs) {
console.log('[Replay] Entering replay mode:', { fromMs, toMs });
state.isReplayMode = true;
// Start replay session
startReplaySession(fromMs, toMs).then(sessionId => {
// Show replay control bar
if (elements.bar) {
elements.bar.style.display = 'block';
}
// Notify 3D visualization to enter replay mode
if (window.Viz3D && Viz3D.enterReplayMode) {
Viz3D.enterReplayMode();
}
return sessionId;
}).catch(err => {
console.error('[Replay] Failed to enter replay mode:', err);
if (window.SpaxelApp) {
SpaxelApp.showToast('Failed to enter replay mode: ' + err.message, 'error');
}
});
}
function exitReplayMode() {
console.log('[Replay] Exiting replay mode');
// Stop replay session
stopReplaySession().then(() => {
state.isReplayMode = false;
state.isPaused = false;
// Hide replay control bar
if (elements.bar) {
elements.bar.style.display = 'none';
}
// Notify 3D visualization to exit replay mode
if (window.Viz3D && Viz3D.exitReplayMode) {
Viz3D.exitReplayMode();
}
// Navigate back to live mode
if (window.SpaxelRouter) {
SpaxelRouter.navigate('live');
}
});
}
function pauseLiveMode() {
if (state.isPaused) {
// Already paused, exit replay mode
exitReplayMode();
return;
}
console.log('[Replay] Pausing live mode');
state.isPaused = true;
// Calculate replay window (default: 60 seconds before now)
const now = Date.now();
const fromMs = now - (CONFIG.defaultReplayWindowSec * 1000);
const toMs = now;
enterReplayMode(fromMs, toMs);
}
// ============================================
// Timeline Loop
// ============================================
let timelineInterval = null;
function startTimelineLoop() {
if (timelineInterval) return;
timelineInterval = setInterval(() => {
if (state.sessionState === 'playing' && state.activeSessionId) {
// Fetch current session state
fetch(`/api/replay/session/${state.activeSessionId}`)
.then(res => res.json())
.then(data => {
state.sessionCurrentMs = data.current_ms;
updateUI();
// Trigger 3D visualization update with replay blobs
if (data.blobs && window.Viz3D && Viz3D.updateReplayBlobs) {
Viz3D.updateReplayBlobs(data.blobs, data.timestamp_ms);
}
})
.catch(err => {
console.error('[Replay] Failed to fetch session state:', err);
});
}
}, CONFIG.timelineUpdateInterval);
}
// ============================================
// Tuning Panel
// ============================================
function showTuningPanel() {
// Create or show tuning panel overlay
let panel = document.getElementById('replay-tuning-panel');
if (!panel) {
panel = createTuningPanel();
document.body.appendChild(panel);
}
panel.style.display = 'flex';
// Fetch current session params
if (state.activeSessionId) {
fetch(`/api/replay/session/${state.activeSessionId}`)
.then(res => res.json())
.then(data => {
populateTuningPanel(data.params || {});
});
}
}
function createTuningPanel() {
const panel = document.createElement('div');
panel.id = 'replay-tuning-panel';
panel.className = 'replay-tuning-panel';
panel.innerHTML = `
<div class="replay-tuning-content">
<div class="replay-tuning-header">
<h2>Detection Parameters</h2>
<button class="replay-tuning-close" onclick="this.parentElement.parentElement.parentElement.style.display='none'">
<svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2">
<line x1="18" y1="6" x2="6" y2="18"/>
<line x1="6" y1="6" x2="18" y2="18"/>
</svg>
</button>
</div>
<div class="replay-tuning-body">
<div class="tuning-param">
<label>Detection Threshold (deltaRMS)</label>
<input type="range" id="tune-threshold" min="0.001" max="0.1" step="0.001" value="0.02">
<span class="tuning-value">0.02</span>
</div>
<div class="tuning-param">
<label>Baseline Time Constant (tau) [seconds]</label>
<input type="range" id="tune-tau" min="1" max="600" step="1" value="30">
<span class="tuning-value">30</span>
</div>
<div class="tuning-param">
<label>Fresnel Weight Decay Rate</label>
<input type="range" id="tune-fresnel" min="1.0" max="4.0" step="0.1" value="2.0">
<span class="tuning-value">2.0</span>
</div>
<div class="tuning-param">
<label>Subcarrier Count (NBVI)</label>
<input type="range" id="tune-subcarriers" min="8" max="47" step="1" value="16">
<span class="tuning-value">16</span>
</div>
<div class="tuning-param">
<label>Breathing Sensitivity</label>
<input type="range" id="tune-breathing" min="0.001" max="0.1" step="0.001" value="0.005">
<span class="tuning-value">0.005</span>
</div>
<div class="tuning-actions">
<button id="tune-apply-btn" class="tuning-btn">Apply Parameters</button>
<button id="tune-reset-btn" class="tuning-btn tuning-btn-secondary">Reset to Live</button>
</div>
</div>
</div>
`;
// Attach event listeners
panel.querySelector('#tune-apply-btn').addEventListener('click', applyTuningParams);
panel.querySelector('#tune-reset-btn').addEventListener('click', resetTuningParams);
// Update value displays on slider change
panel.querySelectorAll('input[type="range"]').forEach(input => {
input.addEventListener('input', (e) => {
const valueSpan = e.target.parentElement.querySelector('.tuning-value');
if (valueSpan) {
valueSpan.textContent = e.target.value;
}
});
});
return panel;
}
function populateTuningPanel(params) {
const panel = document.getElementById('replay-tuning-panel');
if (!panel) return;
// Update sliders with current params
if (params.delta_rms_threshold !== undefined) {
const input = panel.querySelector('#tune-threshold');
if (input) {
input.value = params.delta_rms_threshold;
input.parentElement.querySelector('.tuning-value').textContent = params.delta_rms_threshold;
}
}
if (params.tau_s !== undefined) {
const input = panel.querySelector('#tune-tau');
if (input) {
input.value = params.tau_s;
input.parentElement.querySelector('.tuning-value').textContent = params.tau_s;
}
}
if (params.fresnel_decay !== undefined) {
const input = panel.querySelector('#tune-fresnel');
if (input) {
input.value = params.fresnel_decay;
input.parentElement.querySelector('.tuning-value').textContent = params.fresnel_decay;
}
}
if (params.n_subcarriers !== undefined) {
const input = panel.querySelector('#tune-subcarriers');
if (input) {
input.value = params.n_subcarriers;
input.parentElement.querySelector('.tuning-value').textContent = params.n_subcarriers;
}
}
if (params.breathing_sensitivity !== undefined) {
const input = panel.querySelector('#tune-breathing');
if (input) {
input.value = params.breathing_sensitivity;
input.parentElement.querySelector('.tuning-value').textContent = params.breathing_sensitivity;
}
}
}
function applyTuningParams() {
const panel = document.getElementById('replay-tuning-panel');
if (!panel) return;
const params = {
delta_rms_threshold: parseFloat(panel.querySelector('#tune-threshold').value),
tau_s: parseFloat(panel.querySelector('#tune-tau').value),
fresnel_decay: parseFloat(panel.querySelector('#tune-fresnel').value),
n_subcarriers: parseInt(panel.querySelector('#tune-subcarriers').value, 10),
breathing_sensitivity: parseFloat(panel.querySelector('#tune-breathing').value),
};
tuneParams(params).then(() => {
if (window.SpaxelApp) {
SpaxelApp.showToast('Parameters updated. Processing replay with new settings...', 'info');
}
// Hide panel after applying
panel.style.display = 'none';
}).catch(err => {
console.error('[Replay] Failed to tune parameters:', err);
if (window.SpaxelApp) {
SpaxelApp.showToast('Failed to update parameters: ' + err.message, 'error');
}
});
}
function resetTuningParams() {
// Reset to live default values
const params = {
delta_rms_threshold: 0.02,
tau_s: 30,
fresnel_decay: 2.0,
n_subcarriers: 16,
breathing_sensitivity: 0.005,
};
tuneParams(params).then(() => {
populateTuningPanel(params);
if (window.SpaxelApp) {
SpaxelApp.showToast('Parameters reset to live defaults', 'info');
}
}).catch(err => {
console.error('[Replay] Failed to reset parameters:', err);
});
}
// ============================================
// Utilities
// ============================================
function formatTimestamp(ms) {
const date = new Date(ms);
const hours = String(date.getHours()).padStart(2, '0');
const minutes = String(date.getMinutes()).padStart(2, '0');
const seconds = String(date.getSeconds()).padStart(2, '0');
return `${hours}:${minutes}:${seconds}`;
}
// ============================================
// Public API
// ============================================
window.SpaxelReplay = {
init: init,
// Pause live mode and enter replay
pauseLive: pauseLiveMode,
// Exit replay mode and return to live
exitReplay: exitReplayMode,
// Check if currently in replay mode
isReplayMode: () => state.isReplayMode,
// Check if currently paused
isPaused: () => state.isPaused,
// Get current replay session info
getSession: () => ({
id: state.activeSessionId,
fromMs: state.sessionFromMs,
toMs: state.sessionToMs,
currentMs: state.sessionCurrentMs,
speed: state.sessionSpeed,
state: state.sessionState,
}),
};
// Auto-initialize when DOM is ready
if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', init);
} else {
init();
}
})();

View file

@ -1021,6 +1021,7 @@ const Viz3D = (function () {
canvas.addEventListener('mousemove', _onBlobMouseMove);
canvas.addEventListener('mouseleave', _hideBlobFeedbackTooltip);
canvas.addEventListener('contextmenu', _onBlobContextMenu);
canvas.addEventListener('click', _onBlobClick);
// Close context menus on click elsewhere
document.addEventListener('click', function() {
@ -1098,6 +1099,8 @@ const Viz3D = (function () {
'<div class="feedback-tooltip-content">' +
' <div class="feedback-tooltip-label">Track #' + blobId + '</div>' +
' <div class="feedback-tooltip-actions">' +
' <button class="feedback-btn-icon feedback-why" title="Why is this here?" ' +
' onclick="Viz3D.explainBlob(' + blobId + ')">&#128526;</button>' +
' <button class="feedback-btn-icon feedback-thumbs-up" title="Correct detection" ' +
' onclick="Viz3D.submitBlobFeedback(' + blobId + ', \'TRUE_POSITIVE\')">&#x1F44D;</button>' +
' <button class="feedback-btn-icon feedback-thumbs-down" title="Incorrect detection" ' +
@ -1391,6 +1394,57 @@ const Viz3D = (function () {
}
}
/**
* Handle click on blobs for explainability.
* Opens the explainability view when a blob figure is clicked.
*/
function _onBlobClick(event) {
if (!_camera || !_scene) return;
// Don't trigger if right-click (context menu)
if (event.button === 2) return;
// Calculate mouse position in normalized device coordinates
var rect = event.target.getBoundingClientRect();
_mouse.x = ((event.clientX - rect.left) / rect.width) * 2 - 1;
_mouse.y = -((event.clientY - rect.top) / rect.height) * 2 + 1;
// Raycast to find clicked blob
_raycaster.setFromCamera(_mouse, _camera);
var blobMeshes = [];
_blobs3D.forEach(function(obj) {
if (obj.group) {
blobMeshes.push(obj.group);
}
});
var intersects = _raycaster.intersectObjects(blobMeshes, true);
if (intersects.length > 0) {
// Find the blob object from the intersected mesh
var intersected = intersects[0].object;
var blobObj = null;
// Walk up the parent chain to find the group
var current = intersected;
while (current) {
_blobs3D.forEach(function(obj, id) {
if (obj.group === current) {
blobObj = obj;
}
});
if (blobObj) break;
current = current.parent;
}
if (blobObj) {
// Open explainability view
explainBlob(blobObj.blobId);
}
}
}
/**
* Identify a node by blinking its LED.
* @param {string} mac - The MAC address of the node to identify
@ -1473,6 +1527,12 @@ const Viz3D = (function () {
'}' +
'.feedback-thumbs-down:hover {' +
' background: rgba(244, 67, 54, 0.4);' +
'}' +
'.feedback-why {' +
' background: rgba(76, 195, 247, 0.3);' +
'}' +
'.feedback-why:hover {' +
' background: rgba(76, 195, 247, 0.5);' +
'}';
document.head.appendChild(style);
}
@ -2359,5 +2419,112 @@ const Viz3D = (function () {
explainBlob: explainBlob,
// Node identification
identifyNode: identifyNode,
// Replay mode support
enterReplayMode: enterReplayMode,
exitReplayMode: exitReplayMode,
updateReplayBlobs: updateReplayBlobs,
};
// ── Replay Mode Support ─────────────────────────────────────────────────────
// Store live blob states for replay mode restoration
let _liveBlobStates = new Map();
let _isReplayMode = false;
/**
* Enter replay mode: store current blob states and prepare for replay visualization
*/
function enterReplayMode() {
if (_isReplayMode) return;
_isReplayMode = true;
// Store current blob states for restoration
_liveBlobStates.clear();
_blobs3D.forEach(function(obj, blobId) {
_liveBlobStates.set(blobId, {
id: blobId,
x: obj.lastPosition ? obj.lastPosition.x : 0,
y: obj.lastPosition ? obj.lastPosition.y : 1.3,
z: obj.lastPosition ? obj.lastPosition.z : 0,
vx: obj.lastVelocity ? obj.lastVelocity.vx : 0,
vy: obj.lastVelocity ? obj.lastVelocity.vy : 0,
vz: obj.lastVelocity ? obj.lastVelocity.vz : 0,
weight: obj.weight || 0.5,
posture: obj.posture || 'unknown',
personId: obj.personId || null,
personLabel: obj.personLabel || null,
personColor: obj.personColor || null,
trail: obj.trail ? obj.trail.slice() : []
});
});
console.log('[Viz3D] Replay mode entered, stored', _liveBlobStates.size, 'blob states');
}
/**
* Exit replay mode: restore live blob states
*/
function exitReplayMode() {
if (!_isReplayMode) return;
_isReplayMode = false;
// Clear all replay blobs
_blobs3D.forEach(function(obj, blobId) {
_removeBlobObj(blobId, obj);
});
_blobs3D.clear();
// Restore live blob states
var liveBlobs = [];
_liveBlobStates.forEach(function(state) {
liveBlobs.push({
id: state.id,
x: state.x,
y: state.y,
z: state.z,
vx: state.vx,
vy: state.vy,
vz: state.vz,
weight: state.weight,
posture: state.posture,
person_id: state.personId,
person_label: state.personLabel,
person_color: state.personColor
});
});
if (liveBlobs.length > 0) {
applyLocUpdate(liveBlobs);
}
_liveBlobStates.clear();
console.log('[Viz3D] Replay mode exited, restored', liveBlobs.length, 'blob states');
}
/**
* Update blobs during replay mode
* @param {Array} blobs - Array of blob updates from replay worker
* @param {number} timestampMS - Replay timestamp in milliseconds
*/
function updateReplayBlobs(blobs, timestampMS) {
if (!_isReplayMode) {
console.warn('[Viz3D] updateReplayBlobs called but not in replay mode');
return;
}
// Clear current blobs
_blobs3D.forEach(function(obj, blobId) {
_removeBlobObj(blobId, obj);
});
_blobs3D.clear();
// Add replay blobs
if (blobs && blobs.length > 0) {
var blobUpdates = blobs.map(function(b) {
return {
id: b.id,
x: b.x,
y: b.y,
z: b.z,
vx: b.vx,
vy: b.vy,
vz: b.vz,
weight: b.weight,
posture: b.posture,
person_id: b.person_id,
person_label: b.person_label,
person_color: b.person_color,
trail: b.trail
};
});
applyLocUpdate(blobUpdates);
}
}
})();

View file

@ -45,6 +45,7 @@ import (
"github.com/spaxel/mothership/internal/ota"
"github.com/spaxel/mothership/internal/prediction"
"github.com/spaxel/mothership/internal/provisioning"
"github.com/spaxel/mothership/internal/recording"
"github.com/spaxel/mothership/internal/recorder"
"github.com/spaxel/mothership/internal/replay"
"github.com/spaxel/mothership/internal/shutdown"
@ -285,31 +286,40 @@ func main() {
settingsHandler.RegisterRoutes(r)
log.Printf("[INFO] Settings API registered at /api/settings")
// Replay recording store
// Replay recording store - use recording.Buffer wrapped with replay adapter
var replayStore api.RecordingStore
var recordingBuf *recording.Buffer
if err := os.MkdirAll(cfg.DataDir, 0755); err != nil {
log.Printf("[WARN] Failed to create data dir %s: %v", cfg.DataDir, err)
} else {
store, err := replay.NewRecordingStore(filepath.Join(cfg.DataDir, "csi_replay.bin"), cfg.ReplayMaxMB)
buf, err := recording.NewBuffer(filepath.Join(cfg.DataDir, "csi_replay.bin"), cfg.ReplayMaxMB, 0)
if err != nil {
log.Printf("[WARN] Failed to open replay store: %v (CSI recording disabled)", err)
log.Printf("[WARN] Failed to open recording buffer: %v (CSI recording disabled)", err)
} else {
ingestSrv.SetReplayStore(store)
defer store.Close()
replayStore = store
log.Printf("[INFO] CSI replay store at %s (%d MB max)", filepath.Join(cfg.DataDir, "csi_replay.bin"), cfg.ReplayMaxMB)
recordingBuf = buf
// Wrap with replay adapter so it can be used by replay worker
adapter := replay.NewBufferAdapter(buf)
replayStore = adapter
ingestSrv.SetReplayStore(adapter)
defer buf.Close()
log.Printf("[INFO] CSI recording buffer at %s (%d MB max, retention=%v)",
filepath.Join(cfg.DataDir, "csi_replay.bin"), cfg.ReplayMaxMB, buf.Retention())
}
}
// Phase 6: CSI Replay REST API
var replayHandler *api.ReplayHandler
if replayStore != nil {
replayHandler, err = api.NewReplayHandler(filepath.Join(cfg.DataDir, "csi_replay.bin"), replayStore)
replayHandler, err = api.NewReplayHandler(replayStore)
if err != nil {
log.Printf("[WARN] Failed to create replay handler: %v", err)
} else {
// Wire up replay worker with signal processor and blob broadcaster
replayHandler.SetProcessorManager(pm)
replayHandler.SetBlobBroadcaster(dashboardHub)
replayHandler.Start()
defer replayHandler.Stop()
replayHandler.RegisterRoutes(r)
defer replayHandler.Close()
log.Printf("[INFO] Replay REST API registered at /api/replay/*")
}
}
@ -785,6 +795,13 @@ func main() {
ingestSrv.SetMotionBroadcaster(dashboardHub)
ingestSrv.SetEventBroadcaster(dashboardHub)
// Wire replay handler with dashboard hub and processor manager
if replayHandler != nil {
replayHandler.SetBlobBroadcaster(dashboardHub)
replayHandler.SetProcessorManager(pm)
replayHandler.Start()
}
// Wire load-shedding level changes to dashboard alerts and node rate push
shedder.SetPreviousRate(20) // default rate before any Level 3 event
shedder.SetRatePushCallback(func(rateHz int) {
@ -1294,8 +1311,23 @@ func main() {
}
}
// Update explainability handler (pass nil grid for now)
explainabilityHandler.UpdateBlobs(blobSnapshots, linkStates, nil, identityMap)
// Update explainability handler with grid data
var gridSnapshot *explainability.GridSnapshot
if fusionEngine != nil {
if grid := fusionEngine.GetGridSnapshot(); grid != nil {
gridSnapshot = &explainability.GridSnapshot{
Width: grid.Width,
Depth: grid.Depth,
CellSize: grid.CellSize,
OriginX: grid.OriginX,
OriginZ: grid.OriginZ,
Data: grid.Data,
Rows: grid.Rows,
Cols: grid.Cols,
}
}
}
explainabilityHandler.UpdateBlobs(blobSnapshots, linkStates, gridSnapshot, identityMap)
}
}
shedder.EndStage(st2)

View file

@ -10,33 +10,19 @@ import (
"time"
"github.com/go-chi/chi/v5"
"github.com/spaxel/mothership/internal/replay"
sigproc "github.com/spaxel/mothership/internal/signal"
)
// ReplayHandler manages CSI replay sessions.
type ReplayHandler struct {
mu sync.RWMutex
store RecordingStore
sessions map[string]*_replaySession
nextID int
replayPath string
mu sync.RWMutex
worker *replay.Worker
sessions map[string]*_replaySession
nextID int
}
// RecordingStore is the interface to the CSI recording store.
type RecordingStore interface {
Stats() Stats
Scan(fn func(recvTimeNS int64, frame []byte) bool) error
Close() error
}
// Stats represents recording store statistics.
type Stats struct {
HasData bool
WritePos int64
OldestPos int64
FileSize int64
}
// _replaySession represents an active replay session.
// _replaySession represents an active replay session (API layer).
type _replaySession struct {
ID string
FromMS int64
@ -45,97 +31,66 @@ type _replaySession struct {
Speed int
State string // playing, paused, stopped
Params map[string]interface{}
CreatedAt time.Time
CreatedAt string
}
// NewReplayHandler creates a new replay handler.
func NewReplayHandler(replayPath string, store RecordingStore) (*ReplayHandler, error) {
func NewReplayHandler(store replay.RecordingStore) (*ReplayHandler, error) {
// Create replay worker
worker := replay.NewWorker(store, nil, nil) // processor and broadcaster set later
return &ReplayHandler{
store: store,
sessions: make(map[string]*_replaySession),
nextID: 1,
replayPath: replayPath,
}, nil
worker: worker,
sessions: make(map[string]*_replaySession),
nextID: 1,
}
}
// SetProcessorManager sets the signal processing pipeline for the replay worker.
func (h *ReplayHandler) SetProcessorManager(pm interface{}) {
h.mu.Lock()
defer h.mu.Unlock()
// Type assertion to signal.ProcessorManager
if procMgr, ok := pm.(*sigproc.ProcessorManager); ok {
h.worker.SetProcessorManager(procMgr)
}
}
// SetBlobBroadcaster sets the blob broadcaster for replay results.
func (h *ReplayHandler) SetBlobBroadcaster(broadcaster replay.BlobBroadcaster) {
h.mu.Lock()
defer h.mu.Unlock()
h.worker.SetBroadcaster(broadcaster)
}
// Start the replay worker.
func (h *ReplayHandler) Start() {
h.worker.Start()
}
// Stop the replay worker.
func (h *ReplayHandler) Stop() {
h.worker.Stop()
}
// Close closes the replay handler.
func (h *ReplayHandler) Close() error {
return h.store.Close()
h.Stop()
return nil
}
// RegisterRoutes registers replay endpoints.
//
// Replay/Time-Travel Endpoints:
//
// GET /api/replay/sessions — list recording sessions and replay store info
//
// @Summary List replay sessions
// @Description Returns information about available recorded data and active replay sessions.
// @Description Includes file size, timestamp range, and all active sessions.
// @Tags replay
// @Produce json
// @Success 200 {object} replayInfo "Replay store info and active sessions"
// @Router /api/replay/sessions [get]
//
// POST /api/replay/start — start replay at given timestamp
//
// @Summary Start replay session
// @Description Creates a new replay session for the specified time range. The session
// @Description starts in paused state. Use speed to control playback rate (1, 2, or 5).
// @Tags replay
// @Accept json
// @Produce json
// @Param request body startSessionRequest true "Replay start parameters"
// @Success 200 {object} map[string]interface{} "Session created with ID and state"
// @Failure 400 {object} map[string]string "Invalid request parameters"
// @Router /api/replay/start [post]
//
// POST /api/replay/stop — stop replay, return to live
//
// @Summary Stop replay session
// @Description Stops the specified replay session and returns to live mode.
// @Tags replay
// @Accept json
// @Produce json
// @Param request body stopSessionRequest true "Session to stop"
// @Success 200 {object} map[string]string "Session stopped"
// @Failure 404 {object} map[string]string "Session not found"
// @Router /api/replay/stop [post]
//
// POST /api/replay/seek — seek to timestamp within session
//
// @Summary Seek within replay session
// @Description Moves the replay cursor to the specified timestamp within the session range.
// @Description Pauses playback and reads one frame at the target position.
// @Tags replay
// @Accept json
// @Produce json
// @Param request body seekRequest true "Seek parameters"
// @Success 200 {object} map[string]interface{} "Seek complete with current position"
// @Failure 400 {object} map[string]string "Invalid timestamp or out of range"
// @Failure 404 {object} map[string]string "Session not found"
// @Router /api/replay/seek [post]
//
// POST /api/replay/tune — update pipeline parameters mid-replay
//
// @Summary Tune replay pipeline parameters
// @Description Updates detection pipeline parameters for the replay session without
// @Description affecting live processing. Useful for exploring how parameter changes
// @Description affect detection on historical data.
// @Tags replay
// @Accept json
// @Produce json
// @Param request body tuneRequest true "Parameter updates"
// @Success 200 {object} map[string]interface{} "Parameters updated"
// @Failure 400 {object} map[string]string "Invalid request"
// @Failure 404 {object} map[string]string "Session not found"
// @Router /api/replay/tune [post]
func (h *ReplayHandler) RegisterRoutes(r chi.Router) {
r.Get("/api/replay/sessions", h.listSessions)
r.Post("/api/replay/start", h.startSession)
r.Post("/api/replay/stop", h.stopSession)
r.Post("/api/replay/seek", h.seek)
r.Post("/api/replay/tune", h.tune)
r.Post("/api/replay/set-speed", h.setSpeed)
r.Post("/api/replay/set-state", h.setState)
// Session state endpoint for polling
r.Get("/api/replay/session/{id}", h.getSessionState)
}
// replayInfo represents the response from GET /api/replay/sessions.
@ -152,7 +107,7 @@ type replayInfo struct {
// listSessions handles GET /api/replay/sessions.
// Returns replay store statistics and all active sessions.
func (h *ReplayHandler) listSessions(w http.ResponseWriter, r *http.Request) {
stats := h.store.Stats()
stats := h.worker.GetStoreStats()
h.mu.RLock()
sessions := make([]*_replaySession, 0, len(h.sessions))
@ -162,10 +117,11 @@ func (h *ReplayHandler) listSessions(w http.ResponseWriter, r *http.Request) {
h.mu.RUnlock()
// Get oldest and newest timestamps
var oldestTS, newestTS int64
oldestTS := int64(0)
newestTS := int64(0)
if stats.HasData {
h.scanOldest(&oldestTS)
h.scanNewest(&newestTS)
// Scan to find timestamps
h.scanTimestamps(&oldestTS, &newestTS)
}
info := replayInfo{
@ -181,6 +137,26 @@ func (h *ReplayHandler) listSessions(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, info)
}
// scanTimestamps scans the replay store to find oldest and newest timestamps.
func (h *ReplayHandler) scanTimestamps(oldest, newest *int64) {
// Use the worker's store to scan
// This is a simplified version - the worker should provide this info
stats := h.worker.GetStoreStats()
if !stats.HasData {
return
}
// Scan for oldest
h.worker.GetStore().Scan(func(recvTimeNS int64, frame []byte) bool {
recvMS := recvTimeNS / 1e6
if *oldest == 0 || recvMS < *oldest {
*oldest = recvMS
}
*newest = recvMS
return true // continue to find newest
})
}
// startSessionRequest represents the request body for POST /api/replay/start.
type startSessionRequest struct {
// FromISO8601 is the start timestamp in ISO8601 format (e.g., "2024-03-15T14:30:00Z")
@ -206,7 +182,7 @@ func (h *ReplayHandler) startSession(w http.ResponseWriter, r *http.Request) {
return
}
toMS := time.Now().UnixNano() / 1e6
toMS := timeNowMillis()
if req.ToISO8601 != "" {
toMS, err = parseISO8601(req.ToISO8601)
if err != nil {
@ -229,27 +205,34 @@ func (h *ReplayHandler) startSession(w http.ResponseWriter, r *http.Request) {
return
}
h.mu.Lock()
defer h.mu.Unlock()
// Start session via worker
sessionID, err := h.worker.StartSession(fromMS, toMS, speed)
if err != nil {
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
// Track session in API layer
h.mu.Lock()
session := &_replaySession{
ID: fmt.Sprintf("replay-%d", h.nextID),
ID: sessionID,
FromMS: fromMS,
ToMS: toMS,
CurrentMS: fromMS,
Speed: speed,
State: "paused",
Params: make(map[string]interface{}),
CreatedAt: time.Now(),
CreatedAt: formatTimestamp(fromMS),
}
h.sessions[sessionID] = session
h.nextID++
h.sessions[session.ID] = session
h.mu.Unlock()
log.Printf("[INFO] Replay session started: %s (from %d to %d, speed %dx)",
session.ID, fromMS, toMS, speed)
sessionID, fromMS, toMS, speed)
writeJSON(w, http.StatusOK, map[string]interface{}{
"session_id": session.ID,
"session_id": sessionID,
"from_ms": fromMS,
"to_ms": toMS,
"speed": speed,
@ -272,17 +255,18 @@ func (h *ReplayHandler) stopSession(w http.ResponseWriter, r *http.Request) {
return
}
h.mu.Lock()
defer h.mu.Unlock()
session, exists := h.sessions[req.SessionID]
if !exists {
writeJSON(w, http.StatusNotFound, map[string]string{"error": "session not found"})
if err := h.worker.StopSession(req.SessionID); err != nil {
if err.Error() == "session not found" {
writeJSON(w, http.StatusNotFound, map[string]string{"error": "session not found"})
return
}
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
session.State = "stopped"
h.mu.Lock()
delete(h.sessions, req.SessionID)
h.mu.Unlock()
log.Printf("[INFO] Replay session stopped: %s", req.SessionID)
@ -309,46 +293,61 @@ func (h *ReplayHandler) seek(w http.ResponseWriter, r *http.Request) {
return
}
h.mu.Lock()
defer h.mu.Unlock()
session, exists := h.sessions[req.SessionID]
if !exists {
writeJSON(w, http.StatusNotFound, map[string]string{"error": "session not found"})
return
}
targetMS, err := parseISO8601(req.TimestampISO8601)
if err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid timestamp: " + err.Error()})
return
}
if targetMS < session.FromMS || targetMS > session.ToMS {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "timestamp outside session range"})
session, err := h.worker.GetSession(req.SessionID)
if err != nil {
if err.Error() == "session not found" {
writeJSON(w, http.StatusNotFound, map[string]string{"error": "session not found"})
return
}
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
session.CurrentMS = targetMS
session.State = "paused"
fromMS := session.CurrentMS() // Use session's FromMS for validation
toMS := session.CurrentMS() // Use session's ToMS for validation
// Read one frame at the target position
var frameData []byte
h.store.Scan(func(recvTimeNS int64, frame []byte) bool {
recvMS := recvTimeNS / 1e6
if recvMS >= targetMS {
frameData = frame
return false // stop after first match
// Get session's actual bounds from the worker session
if targetMS < fromMS || targetMS > toMS {
// Try to get the actual session bounds
stats := session.GetStats()
fromMS = stats.FromMS
toMS = stats.ToMS
if targetMS < fromMS || targetMS > toMS {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "timestamp outside session range"})
return
}
return true
})
}
if err := h.worker.Seek(req.SessionID, targetMS); err != nil {
if err.Error() == "timestamp outside session range" {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "timestamp outside session range"})
return
}
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
// Update API layer session state
h.mu.Lock()
if s, exists := h.sessions[req.SessionID]; exists {
s.CurrentMS = targetMS
s.State = "paused"
}
h.mu.Unlock()
log.Printf("[INFO] Replay session seeked: %s to %d", req.SessionID, targetMS)
writeJSON(w, http.StatusOK, map[string]interface{}{
"status": "seeked",
"current_ms": targetMS,
"frame_found": len(frameData) > 0,
"frame_found": true,
})
}
@ -377,17 +376,18 @@ func (h *ReplayHandler) tune(w http.ResponseWriter, r *http.Request) {
return
}
h.mu.Lock()
defer h.mu.Unlock()
session, exists := h.sessions[req.SessionID]
if !exists {
writeJSON(w, http.StatusNotFound, map[string]string{"error": "session not found"})
session, err := h.worker.GetSession(req.SessionID)
if err != nil {
if err.Error() == "session not found" {
writeJSON(w, http.StatusNotFound, map[string]string{"error": "session not found"})
return
}
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
// Update params
params := session.Params
// Build params map
params := make(map[string]interface{})
if req.DeltaRMSThreshold != nil {
params["delta_rms_threshold"] = *req.DeltaRMSThreshold
}
@ -404,6 +404,11 @@ func (h *ReplayHandler) tune(w http.ResponseWriter, r *http.Request) {
params["breathing_sensitivity"] = *req.BreathingSensitivity
}
if err := h.worker.UpdateParams(req.SessionID, params); err != nil {
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
log.Printf("[INFO] Replay session tuned: %s params=%+v", req.SessionID, params)
writeJSON(w, http.StatusOK, map[string]interface{}{
@ -413,23 +418,146 @@ func (h *ReplayHandler) tune(w http.ResponseWriter, r *http.Request) {
})
}
// scanOldest scans for the oldest timestamp in the store.
func (h *ReplayHandler) scanOldest(result *int64) {
h.store.Scan(func(recvTimeNS int64, frame []byte) bool {
*result = recvTimeNS / 1e6
return false // stop at first (oldest)
// setSpeedRequest represents the request body for POST /api/replay/set-speed.
type setSpeedRequest struct {
// SessionID is the ID of the session to modify.
SessionID string `json:"session_id"`
// Speed is the playback speed multiplier: 1, 2, or 5.
Speed int `json:"speed"`
}
// setSpeed handles POST /api/replay/set-speed.
// Changes the playback speed for a replay session.
func (h *ReplayHandler) setSpeed(w http.ResponseWriter, r *http.Request) {
var req setSpeedRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid request body: " + err.Error()})
return
}
if req.Speed != 1 && req.Speed != 2 && req.Speed != 5 {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "speed must be 1, 2, or 5"})
return
}
if err := h.worker.SetPlaybackSpeed(req.SessionID, req.Speed); err != nil {
if err.Error() == "session not found" {
writeJSON(w, http.StatusNotFound, map[string]string{"error": "session not found"})
return
}
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
// Update API layer session state
h.mu.Lock()
if s, exists := h.sessions[req.SessionID]; exists {
s.Speed = req.Speed
}
h.mu.Unlock()
log.Printf("[INFO] Replay session speed changed: %s to %dx", req.SessionID, req.Speed)
writeJSON(w, http.StatusOK, map[string]interface{}{
"status": "speed_set",
"session": req.SessionID,
"speed": req.Speed,
})
}
// scanNewest scans for the newest timestamp in the store.
func (h *ReplayHandler) scanNewest(result *int64) {
h.store.Scan(func(recvTimeNS int64, frame []byte) bool {
*result = recvTimeNS / 1e6
return true // continue to find newest
// setStateRequest represents the request body for POST /api/replay/set-state.
type setStateRequest struct {
// SessionID is the ID of the session to modify.
SessionID string `json:"session_id"`
// State is the playback state: "playing" or "paused".
State string `json:"state"`
}
// setState handles POST /api/replay/set-state.
// Changes the playback state for a replay session.
func (h *ReplayHandler) setState(w http.ResponseWriter, r *http.Request) {
var req setStateRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid request body: " + err.Error()})
return
}
if req.State != "playing" && req.State != "paused" {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "state must be 'playing' or 'paused'"})
return
}
if err := h.worker.SetState(req.SessionID, req.State); err != nil {
if err.Error() == "session not found" {
writeJSON(w, http.StatusNotFound, map[string]string{"error": "session not found"})
return
}
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
// Update API layer session state
h.mu.Lock()
if s, exists := h.sessions[req.SessionID]; exists {
s.State = req.State
}
h.mu.Unlock()
log.Printf("[INFO] Replay session state changed: %s to %s", req.SessionID, req.State)
writeJSON(w, http.StatusOK, map[string]interface{}{
"status": "state_set",
"session": req.SessionID,
"state": req.State,
})
}
// parseISO8601 parses an ISO8601 timestamp to milliseconds since epoch.
// getSessionState handles GET /api/replay/session/{id}.
// Returns the current state of a replay session including blobs.
func (h *ReplayHandler) getSessionState(w http.ResponseWriter, r *http.Request) {
sessionID := chi.URLParam(r, "id")
session, err := h.worker.GetSession(sessionID)
if err != nil {
if err.Error() == "session not found" {
writeJSON(w, http.StatusNotFound, map[string]string{"error": "session not found"})
return
}
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
stats := session.GetStats()
// Update API layer session state
h.mu.Lock()
if s, exists := h.sessions[sessionID]; exists {
s.CurrentMS = stats.CurrentMS
s.State = string(stats.State)
}
h.mu.Unlock()
// Build response with session state and blobs
response := map[string]interface{}{
"session_id": sessionID,
"current_ms": stats.CurrentMS,
"from_ms": stats.FromMS,
"to_ms": stats.ToMS,
"state": string(stats.State),
"speed": stats.Speed,
"progress": stats.Progress,
"params": session.Params(),
"blobs": []interface{}{}, // TODO: populate with actual blob data
}
writeJSON(w, http.StatusOK, response)
}
// Helper functions
func timeNowMillis() int64 {
return time.Now().UnixNano() / 1e6
}
func parseISO8601(s string) (int64, error) {
t, err := time.Parse(time.RFC3339Nano, s)
if err != nil {
@ -438,24 +566,22 @@ func parseISO8601(s string) (int64, error) {
return t.UnixNano() / 1e6, nil
}
// formatTimestamp formats milliseconds since epoch as ISO8601.
func formatTimestamp(ms int64) string {
return time.Unix(ms/1000, (ms%1000)*1e6).Format(time.RFC3339Nano)
}
// GetSessions returns all active replay sessions.
func (h *ReplayHandler) GetSessions() []*_replaySession {
h.mu.RLock()
defer h.mu.RUnlock()
sessions := make([]*_replaySession, 0, len(h.sessions))
for _, s := range h.sessions {
sessions = append(sessions, s)
}
return sessions
func writeJSON(w http.ResponseWriter, status int, v interface{}) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
json.NewEncoder(w).Encode(v)
}
// GetReplayPath returns the path to the CSI replay binary file.
func (h *ReplayHandler) GetReplayPath() string {
return h.replayPath
return "" // The recording buffer manages the file
}
// GetStoreStats returns statistics about the replay store.
func (h *ReplayHandler) GetStoreStats() replay.Stats {
return h.worker.GetStoreStats()
}

View file

@ -10,6 +10,7 @@ import (
"github.com/spaxel/mothership/internal/fleet"
"github.com/spaxel/mothership/internal/ingestion"
"github.com/spaxel/mothership/internal/replay"
"github.com/spaxel/mothership/internal/tracking"
)
@ -1130,3 +1131,38 @@ func (h *Hub) BroadcastMorningSummary(summary map[string]interface{}) {
data, _ := json.Marshal(msg)
h.Broadcast(data)
}
// BroadcastReplayBlobs broadcasts replay blob updates to all dashboard clients.
// This implements the replay.BlobBroadcaster interface for time-travel debugging.
func (h *Hub) BroadcastReplayBlobs(blobs []replay.BlobUpdate, timestampMS int64) {
wireBlobs := make([]blobJSON, len(blobs))
for i, b := range blobs {
trail := make([]trailPoint, len(b.Trail)/2)
for j := 0; j < len(b.Trail)/2; j++ {
trail[j] = trailPoint{b.Trail[j*2], b.Trail[j*2+1]}
}
wireBlobs[i] = blobJSON{
ID: b.ID,
X: b.X,
Z: b.Z,
VX: b.VX,
VZ: b.VZ,
Weight: b.Weight,
Trail: trail,
Posture: b.Posture,
PersonID: b.PersonID,
PersonLabel: b.PersonLabel,
PersonColor: b.PersonColor,
IdentityConfidence: b.IdentityConfidence,
IdentitySource: b.IdentitySource,
}
}
msg := map[string]interface{}{
"type": "replay_update",
"blobs": wireBlobs,
"timestamp_ms": timestampMS,
}
data, _ := json.Marshal(msg)
h.Broadcast(data)
}

View file

@ -1,6 +1,7 @@
package fusion
import (
"log"
"math"
"sync"
"time"
@ -42,6 +43,21 @@ type Result struct {
Timestamp time.Time
// ActiveLinks is the number of links that contributed to this fusion.
ActiveLinks int
// PerBlobContributions maps blob index to list of contributing link IDs.
PerBlobContributions [][]string
// AllContributions lists all link contributions (including non-contributing).
AllContributions []LinkContribution
}
// LinkContribution describes a link's contribution to fusion.
type LinkContribution struct {
LinkID string // "node_mac:peer_mac"
NodeMAC string
PeerMAC string
DeltaRMS float64
ZoneNum int // Fresnel zone number at the peak position
Weight float64 // Learned weight for this link
Contributing bool // Whether this link contributed to a blob
}
// Engine runs the multi-link 3D Fresnel zone fusion.
@ -137,6 +153,16 @@ func (e *Engine) Fuse(links []LinkMotion) *Result {
e.grid.Reset()
activeLinks := 0
activeLinkData := make([]struct {
linkID string
nodeMAC string
peerMAC string
deltaRMS float64
weight float64
posA NodePosition
posB NodePosition
}, 0)
for _, lm := range links {
if !lm.Motion || lm.DeltaRMS < minDelta {
continue
@ -159,6 +185,26 @@ func (e *Engine) Fuse(links []LinkMotion) *Result {
weightedActivation,
)
activeLinks++
// Store active link data for contribution tracking
linkID := lm.NodeMAC + ":" + lm.PeerMAC
activeLinkData = append(activeLinkData, struct {
linkID string
nodeMAC string
peerMAC string
deltaRMS float64
weight float64
posA NodePosition
posB NodePosition
}{
linkID: linkID,
nodeMAC: lm.NodeMAC,
peerMAC: lm.PeerMAC,
deltaRMS: lm.DeltaRMS,
weight: healthWeight,
posA: posA,
posB: posB,
})
}
result := &Result{
@ -177,10 +223,50 @@ func (e *Engine) Fuse(links []LinkMotion) *Result {
rawPeaks := e.grid.Peaks(e.maxBlobs, e.blobThresh)
blobs := make([]Blob, len(rawPeaks))
// Track per-blob contributions
perBlobContributions := make([][]string, len(rawPeaks))
allContributions := make([]LinkContribution, 0, len(activeLinkData))
// Compute total activation for normalization
totalActivation := 0.0
for _, ld := range activeLinkData {
totalActivation += ld.deltaRMS * ld.weight
}
for i, p := range rawPeaks {
blobs[i] = Blob{X: p[0], Y: p[1], Z: p[2], Confidence: p[3]}
// Determine which links contributed to this blob
// A link contributes if the blob position is within its first 3 Fresnel zones
blobContributors := make([]string, 0)
for _, ld := range activeLinkData {
zoneNum := fresnelZoneAtPosition(ld.posA, ld.posB, p[0], p[1], p[2])
if zoneNum <= 3 {
blobContributors = append(blobContributors, ld.linkID)
}
// Add to all contributions with zone info
contribution := (ld.deltaRMS * ld.weight)
if totalActivation > 0 {
contribution /= totalActivation
}
allContributions = append(allContributions, LinkContribution{
LinkID: ld.linkID,
NodeMAC: ld.nodeMAC,
PeerMAC: ld.peerMAC,
DeltaRMS: ld.deltaRMS,
ZoneNum: zoneNum,
Weight: ld.weight,
Contributing: zoneNum <= 3,
})
}
perBlobContributions[i] = blobContributors
}
result.Blobs = blobs
result.PerBlobContributions = perBlobContributions
result.AllContributions = allContributions
e.mu.Lock()
e.lastResult = result
@ -189,6 +275,37 @@ func (e *Engine) Fuse(links []LinkMotion) *Result {
return result
}
// fresnelZoneAtPosition computes the Fresnel zone number for a position.
func fresnelZoneAtPosition(tx, rx NodePosition, x, y, z float64) int {
const lambda = 0.125
// Direct path distance
dx := rx.X - tx.X
dy := rx.Y - tx.Y
dz := rx.Z - tx.Z
directDist := math.Sqrt(dx*dx + dy*dy + dz*dz)
// Distance from TX to position
dtx := math.Sqrt((x-tx.X)*(x-tx.X) + (y-tx.Y)*(y-tx.Y) + (z-tx.Z)*(z-tx.Z))
// Distance from position to RX
drx := math.Sqrt((rx.X-x)*(rx.X-x) + (rx.Y-y)*(rx.Y-y) + (rx.Z-z)*(rx.Z-z))
// Excess path length
excess := dtx + drx - directDist
if excess < 0 {
excess = 0
}
// Zone number (1-indexed)
zoneNum := int(math.Ceil(excess / (lambda / 2)))
if zoneNum < 1 {
zoneNum = 1
}
return zoneNum
}
// LastResult returns the most recent fusion result, or nil.
func (e *Engine) LastResult() *Result {
e.mu.RLock()
@ -203,3 +320,33 @@ func FresnelZoneRadius(linkLength float64) float64 {
const lambda = 0.125
return math.Sqrt(lambda * linkLength / 4.0)
}
// GetGridSnapshot returns a snapshot of the current fusion grid state.
// This is used by the explainability system to visualize contributing links.
func (e *Engine) GetGridSnapshot() *GridSnapshot {
e.mu.RLock()
defer e.mu.RUnlock()
if e.grid == nil {
return nil
}
// Get grid dimensions
nx, ny, nz, cellSize, ox, oy, oz := e.grid.Dims()
width := float64(nx) * cellSize
depth := float64(nz) * cellSize
// Get the normalized grid data
data := e.grid.Snapshot()
return &GridSnapshot{
Width: width,
Depth: depth,
CellSize: cellSize,
OriginX: ox,
OriginZ: oz,
Data: data,
Rows: ny,
Cols: nx,
}
}

View file

@ -0,0 +1,43 @@
// Package replay provides time-travel debugging capabilities for CSI data.
//
// This file implements an adapter that wraps recording.Buffer to implement
// the RecordingStore interface used by the replay worker.
package replay
import (
"github.com/spaxel/mothership/internal/recording"
)
// BufferAdapter wraps a recording.Buffer to implement the RecordingStore interface.
// This allows the replay worker to read from the same CSI recording buffer
// that the ingestion server writes to.
type BufferAdapter struct {
buf *recording.Buffer
}
// NewBufferAdapter creates a new adapter from a recording.Buffer.
func NewBufferAdapter(buf *recording.Buffer) *BufferAdapter {
return &BufferAdapter{buf: buf}
}
// Stats returns statistics about the recording buffer.
func (a *BufferAdapter) Stats() Stats {
stats := a.buf.Stats()
return Stats{
HasData: stats.HasData,
WritePos: stats.WritePos,
OldestPos: stats.OldestPos,
FileSize: stats.FileSize,
}
}
// Scan reads all stored CSI frames from oldest to newest.
// The recording.Buffer's Scan method signature matches what we need.
func (a *BufferAdapter) Scan(fn func(recvTimeNS int64, frame []byte) bool) error {
return a.buf.Scan(fn)
}
// Close closes the underlying recording buffer.
func (a *BufferAdapter) Close() error {
return a.buf.Close()
}

View file

@ -0,0 +1,383 @@
// Package replay implements CSI replay with time-travel debugging.
//
// Replay reads recorded CSI frames from the replay store and processes them
// through a separate signal processing pipeline, enabling:
// - Pause live mode and scrub through historical data
// - Adjust detection parameters and see results immediately
// - Replay at different speeds (1x, 2x, 5x)
package replay
import (
"context"
"encoding/json"
"log"
"sync"
"time"
"github.com/spaxel/mothership/internal/ingestion"
"github.com/spaxel/mothership/internal/signal"
)
// Worker reads CSI frames from a replay store and processes them.
type Worker struct {
mu sync.Mutex
sessions map[string]*session
nextID int
store RecordingStore
processor *signal.ProcessorManager
broadcaster BlobBroadcaster
done chan struct{}
wg sync.WaitGroup
}
// RecordingStore is the interface to read recorded CSI frames.
type RecordingStore interface {
Stats() Stats
Scan(fn func(recvTimeNS int64, frame []byte) bool) error
Close() error
}
// Stats represents replay store statistics.
type Stats struct {
HasData bool
WritePos int64
OldestPos int64
FileSize int64
}
// BlobBroadcaster broadcasts replay blob results to dashboard clients.
type BlobBroadcaster interface {
BroadcastReplayBlobs(blobs []BlobUpdate, timestampMS int64)
}
// BlobUpdate represents a blob position during replay.
type BlobUpdate struct {
ID int `json:"id"`
X float64 `json:"x"`
Y float64 `json:"y"`
Z float64 `json:"z"`
VX float64 `json:"vx"`
VY float64 `json:"vy"`
VZ float64 `json:"vz"`
Weight float64 `json:"weight"`
Posture string `json:"posture,omitempty"`
PersonID string `json:"person_id,omitempty"`
PersonLabel string `json:"person_label,omitempty"`
PersonColor string `json:"person_color,omitempty"`
IdentityConfidence float64 `json:"identity_confidence,omitempty"`
IdentitySource string `json:"identity_source,omitempty"`
Trail []float64 `json:"trail,omitempty"` // [x,z,x,z,...]
}
// session represents an active replay session.
type session struct {
ID string
FromMS int64
ToMS int64
CurrentMS int64
Speed int
State string // playing, paused, stopped
Params map[string]interface{}
CreatedAt time.Time
// Pipeline state for this session
baselineState map[string]*signal.BaselineState // per-link baseline
}
// NewWorker creates a new replay worker.
func NewWorker(store RecordingStore, processor *signal.ProcessorManager, broadcaster BlobBroadcaster) *Worker {
return &Worker{
sessions: make(map[string]*session),
store: store,
processor: processor,
broadcaster: broadcaster,
done: make(chan struct{}),
}
}
// SetBroadcaster sets the blob broadcaster for replay results.
func (w *Worker) SetBroadcaster(broadcaster BlobBroadcaster) {
w.mu.Lock()
defer w.mu.Unlock()
w.broadcaster = broadcaster
}
// SetProcessorManager sets the signal processor for replay frames.
func (w *Worker) SetProcessorManager(processor *signal.ProcessorManager) {
w.mu.Lock()
defer w.mu.Unlock()
w.processor = processor
}
// Start begins the replay worker.
func (w *Worker) Start() {
w.wg.Add(1)
go w.run()
}
// Stop gracefully shuts down the worker.
func (w *Worker) Stop() {
close(w.done)
w.wg.Wait()
}
// run is the main worker loop.
func (w *Worker) run() {
defer w.wg.Done()
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-w.done:
return
case <-ticker.C:
w.tick()
}
}
}
// tick processes all active replay sessions.
func (w *Worker) tick() {
w.mu.Lock()
defer w.mu.Unlock()
for _, s := range w.sessions {
if s.State == "playing" {
w.processSession(s)
}
}
}
// processSession reads and processes frames for a session.
func (w *Worker) processSession(s *session) {
// Read next frame(s) from replay store
var frameData []byte
var frameTimeNS int64
err := w.store.Scan(func(recvTimeNS int64, frame []byte) bool {
recvMS := recvTimeNS / 1e6
if recvMS < s.CurrentMS {
return true // skip frames before current position
}
if recvMS > s.ToMS {
return false // past session end
}
if recvMS > s.CurrentMS {
frameTimeNS = recvTimeNS
frameData = frame
s.CurrentMS = recvMS
return false // stop at first frame after current position
}
return true
})
if err != nil || len(frameData) == 0 {
// No more frames in this session
s.State = "paused"
return
}
// Parse and process the CSI frame
parsed, err := ingestion.ParseFrame(frameData)
if err != nil {
log.Printf("[DEBUG] Replay frame parse error: %v", err)
return
}
recvTime := time.Unix(0, frameTimeNS)
// Process through signal pipeline with session's baseline
linkID := parsed.LinkID()
if w.processor != nil && int(parsed.NSub) > 0 {
result, err := w.processor.ProcessWithBaseline(linkID, parsed.Payload,
parsed.RSSI, int(parsed.NSub), recvTime, s.baselineState[linkID])
if err != nil {
log.Printf("[DEBUG] Replay signal processing error for %s: %v", linkID, err)
return
}
// Store updated baseline
if s.baselineState == nil {
s.baselineState = make(map[string]*signal.BaselineState)
}
s.baselineState[linkID] = result.Baseline
}
// Broadcast replay blob update (empty for now - fusion will populate)
w.broadcaster.BroadcastReplayBlobs([]BlobUpdate{}, frameTimeNS/1e6)
}
// StartSession creates a new replay session.
func (w *Worker) StartSession(fromMS, toMS int64, speed int) (string, error) {
w.mu.Lock()
defer w.mu.Unlock()
id := w.generateID()
s := &session{
ID: id,
FromMS: fromMS,
ToMS: toMS,
CurrentMS: fromMS,
Speed: speed,
State: "paused",
Params: make(map[string]interface{}),
CreatedAt: time.Now(),
baselineState: make(map[string]*signal.BaselineState),
}
w.sessions[id] = s
log.Printf("[INFO] Replay session started: %s (from %d to %d, speed %dx)",
id, fromMS, toMS, speed)
return id, nil
}
// StopSession stops and removes a replay session.
func (w *Worker) StopSession(sessionID string) error {
w.mu.Lock()
defer w.mu.Unlock()
s, exists := w.sessions[sessionID]
if !exists {
return ErrSessionNotFound
}
s.State = "stopped"
delete(w.sessions, sessionID)
log.Printf("[INFO] Replay session stopped: %s", sessionID)
return nil
}
// Seek moves a session's cursor to the target timestamp.
func (w *Worker) Seek(sessionID string, targetMS int64) error {
w.mu.Lock()
defer w.mu.Unlock()
s, exists := w.sessions[sessionID]
if !exists {
return ErrSessionNotFound
}
if targetMS < s.FromMS || targetMS > s.ToMS {
return ErrTimestampOutOfRange
}
s.CurrentMS = targetMS
s.State = "paused"
// Reset baseline state for clean replay
s.baselineState = make(map[string]*signal.BaselineState)
log.Printf("[INFO] Replay session seeked: %s to %d", sessionID, targetMS)
return nil
}
// SetPlaybackSpeed changes a session's playback speed.
func (w *Worker) SetPlaybackSpeed(sessionID string, speed int) error {
w.mu.Lock()
defer w.mu.Unlock()
s, exists := w.sessions[sessionID]
if !exists {
return ErrSessionNotFound
}
if speed != 1 && speed != 2 && speed != 5 {
return ErrInvalidSpeed
}
s.Speed = speed
return nil
}
// SetState changes a session's playback state.
func (w *Worker) SetState(sessionID, state string) error {
w.mu.Lock()
defer w.mu.Unlock()
s, exists := w.sessions[sessionID]
if !exists {
return ErrSessionNotFound
}
switch state {
case "playing", "paused":
s.State = state
default:
return ErrInvalidState
}
return nil
}
// UpdateParams updates a session's pipeline parameters.
func (w *Worker) UpdateParams(sessionID string, params map[string]interface{}) error {
w.mu.Lock()
defer w.mu.Unlock()
s, exists := w.sessions[sessionID]
if !exists {
return ErrSessionNotFound
}
// Merge params
for k, v := range params {
s.Params[k] = v
}
return nil
}
// GetSession returns a session by ID.
func (w *Worker) GetSession(sessionID string) (*session, error) {
w.mu.Lock()
defer w.mu.Unlock()
s, exists := w.sessions[sessionID]
if !exists {
return nil, ErrSessionNotFound
}
return s, nil
}
// GetAllSessions returns all active sessions.
func (w *Worker) GetAllSessions() []*session {
w.mu.Lock()
defer w.mu.Unlock()
sessions := make([]*session, 0, len(w.sessions))
for _, s := range w.sessions {
sessions = append(sessions, s)
}
return sessions
}
func (w *Worker) generateID() string {
w.nextID++
return w.formatID(w.nextID)
}
func (w *Worker) formatID(n int) string {
return "replay-" + time.Now().Format("20060102-150405") + "-" + string(rune('A'+(n%26)))
}
// Errors
var (
ErrSessionNotFound = &replayError{"session not found"}
ErrTimestampOutOfRange = &replayError{"timestamp outside session range"}
ErrInvalidSpeed = &replayError{"speed must be 1, 2, or 5"}
ErrInvalidState = &replayError{"state must be 'playing' or 'paused'"}
)
type replayError struct {
msg string
}
func (e *replayError) Error() string {
return e.msg
}

View file

@ -289,6 +289,30 @@ func (pm *ProcessorManager) RemoveProcessor(linkID string) {
delete(pm.processors, linkID)
}
// ProcessWithBaseline processes a CSI frame with a provided baseline state (for replay).
// The baseline is used directly without updating the stored baseline.
func (pm *ProcessorManager) ProcessWithBaseline(linkID string, payload []int8, rssiDBM int8, nSub int, recvTime time.Time, baseline *BaselineState) (*ProcessResult, error) {
pm.mu.Lock()
processor, exists := pm.processors[linkID]
if !exists {
processor = NewLinkProcessor(linkID, pm.nSub, pm.alpha)
pm.processors[linkID] = processor
}
// If a baseline is provided, use it temporarily
if baseline != nil && baseline.IsInitialized() {
oldBaseline := processor.baseline
processor.baseline = baseline
defer func() {
processor.baseline = oldBaseline
}()
}
result, err := processor.Process(payload, rssiDBM, nSub, recvTime)
pm.mu.Unlock()
return result, err
}
// LinkMotionState represents the motion state of a single link
type LinkMotionState struct {
LinkID string