feat: robust WebSocket reconnection with backoff, extrapolation, and visual states

Implements exponential backoff (1s→10s cap) with ±500ms jitter,
blob position extrapolation during disconnects (capped at 2s),
three visual states (silent <5s, dimming 5-30s, modal >30s),
and automatic scene restoration on reconnect.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-04-07 12:34:08 -04:00
parent 31659a5ccc
commit ff3428fee6
7 changed files with 1654 additions and 197 deletions

View file

@ -1987,6 +1987,89 @@
border-radius: 3px;
transition: width 0.5s;
}
/* WebSocket reconnect spinner */
#ws-reconnect-spinner {
display: none;
width: 14px;
height: 14px;
border: 2px solid rgba(255, 167, 38, 0.3);
border-top-color: #ffa726;
border-radius: 50%;
animation: ws-spin 0.8s linear infinite;
margin-left: 4px;
}
#ws-reconnect-spinner.visible {
display: inline-block;
}
@keyframes ws-spin {
to { transform: rotate(360deg); }
}
/* Connection status dot — reconnecting (amber) */
.status-dot.reconnecting {
background: #ffa726;
box-shadow: 0 0 8px #ffa726;
animation: ws-pulse 1.5s ease-in-out infinite;
}
@keyframes ws-pulse {
0%, 100% { opacity: 1; }
50% { opacity: 0.4; }
}
/* Connection lost modal (>30s) */
.ws-lost-modal {
position: fixed;
top: 0; left: 0; right: 0; bottom: 0;
background: rgba(0, 0, 0, 0.6);
z-index: 200;
display: flex;
align-items: center;
justify-content: center;
}
.ws-lost-modal-content {
background: #1e1e3a;
border: 1px solid #333;
border-radius: 12px;
padding: 32px;
text-align: center;
max-width: 380px;
}
.ws-lost-modal-content h3 {
font-size: 20px;
margin-bottom: 12px;
color: #ef5350;
}
.ws-lost-modal-content p {
font-size: 14px;
color: #aaa;
margin-bottom: 24px;
line-height: 1.5;
}
.ws-lost-reload-btn {
background: #4fc3f7;
color: #1a1a2e;
border: none;
padding: 10px 24px;
border-radius: 6px;
font-size: 14px;
cursor: pointer;
margin-right: 8px;
}
.ws-lost-reload-btn:hover {
background: #29b6f6;
}
.ws-lost-dismiss-btn {
background: rgba(255, 255, 255, 0.1);
color: #ccc;
border: none;
padding: 10px 24px;
border-radius: 6px;
font-size: 14px;
cursor: pointer;
}
.ws-lost-dismiss-btn:hover {
background: rgba(255, 255, 255, 0.15);
}
</style>
</head>
<body>
@ -2022,6 +2105,7 @@
<div class="status-item">
<div id="ws-status" class="status-dot disconnected"></div>
<span id="ws-status-text">Disconnected</span>
<span id="ws-reconnect-spinner"></span>
</div>
<div class="status-item">
<span>Nodes: <strong id="node-count">0</strong></span>
@ -2159,6 +2243,8 @@
<script src="js/troubleshoot.js"></script>
<!-- First-time feature tooltips (must load before app.js) -->
<script src="js/tooltips.js"></script>
<!-- WebSocket reconnection manager -->
<script src="js/websocket.js"></script>
<!-- Main application -->
<script src="js/app.js"></script>
<!-- esp-web-tools for firmware flashing (Web Serial) -->

View file

@ -12,7 +12,6 @@
// Configuration
// ============================================
const CONFIG = {
wsReconnectDelay: 3000,
gridWidth: 10, // meters
gridDepth: 10, // meters
gridDivisions: 20,
@ -371,58 +370,32 @@
}
// ============================================
// WebSocket Connection
// WebSocket Connection (via SpaxelWebSocket)
// ============================================
function connectWebSocket() {
const wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsURL = `${wsProtocol}//${window.location.host}/ws/dashboard`;
// Initialize the WebSocket manager with callbacks
SpaxelWebSocket.init({
onOpen: function(ws) {
console.log('[Spaxel] WebSocket connected');
state.wsConnected = true;
state.awaitingSnapshot = true;
},
onMessage: function(data) {
handleMessage(data);
},
onClose: function(event) {
console.log('[Spaxel] WebSocket closed:', event.code, event.reason);
state.wsConnected = false;
console.log('[Spaxel] Connecting to', wsURL);
// Start blob extrapolation using captured blob states
SpaxelWebSocket.startExtrapolation();
},
onError: function(error) {
console.error('[Spaxel] WebSocket error:', error);
}
});
state.ws = new WebSocket(wsURL);
state.ws.binaryType = 'arraybuffer';
state.ws.onopen = function() {
console.log('[Spaxel] WebSocket connected');
state.wsConnected = true;
state.awaitingSnapshot = true;
updateConnectionStatus(true);
};
state.ws.onclose = function(event) {
console.log('[Spaxel] WebSocket closed:', event.code, event.reason);
state.wsConnected = false;
updateConnectionStatus(false);
scheduleReconnect();
};
state.ws.onerror = function(error) {
console.error('[Spaxel] WebSocket error:', error);
};
state.ws.onmessage = function(event) {
handleMessage(event.data);
};
}
function scheduleReconnect() {
console.log('[Spaxel] Reconnecting in', CONFIG.wsReconnectDelay, 'ms');
setTimeout(connectWebSocket, CONFIG.wsReconnectDelay);
}
function updateConnectionStatus(connected) {
const dot = document.getElementById('ws-status');
const text = document.getElementById('ws-status-text');
if (connected) {
dot.classList.remove('disconnected');
dot.classList.add('connected');
text.textContent = 'Connected';
} else {
dot.classList.remove('connected');
dot.classList.add('disconnected');
text.textContent = 'Disconnected';
}
SpaxelWebSocket.connect();
}
// ============================================
@ -776,8 +749,17 @@
function handleSnapshot(msg) {
state.awaitingSnapshot = false;
// On reconnect: clear trails, restore scene, log duration
if (SpaxelWebSocket.isConnected()) {
SpaxelWebSocket.onReconnected();
}
console.log('[Spaxel] Received snapshot, rebuilding state');
// Store snapshot for blob extrapolation on future disconnects
SpaxelWebSocket.setLastSnapshot(msg);
// Nodes
if (msg.nodes) {
state.nodes.clear();

View file

@ -670,6 +670,7 @@ const Viz3D = (function () {
obj.group.position.set(b.x, 0, b.z);
obj.lastPosition = { x: b.x, z: b.z };
obj.lastVelocity = { vx: b.vx || 0, vz: b.vz || 0 };
const speed = Math.sqrt(b.vx*b.vx + b.vz*b.vz);
_setPosture(obj.humanoid, speed > 0.25 ? 'walking' : 'standing');
@ -1988,6 +1989,51 @@ const Viz3D = (function () {
_fresnelZones = [];
}
// ── WebSocket reconnect helpers ─────────────────────────────────────────
/**
* Clear all blob trails (called on reconnect).
*/
function clearAllTrails() {
_blobs3D.forEach(function (obj) {
var arr = obj.trail.geometry.attributes.position.array;
arr.fill(0);
obj.trail.geometry.attributes.position.needsUpdate = true;
obj.trail.geometry.setDrawRange(0, 0);
});
}
/**
* Extrapolate a single blob's position during disconnect.
* @param {number} blobId
* @param {number} x - new X position
* @param {number} z - new Z position
*/
function extrapolateBlobPosition(blobId, x, z) {
var obj = _blobs3D.get(blobId);
if (!obj) return;
obj.group.position.set(x, 0, z);
}
/**
* Get current blob states for extrapolation on disconnect.
* Returns array of { id, x, z, vx, vz } for each tracked blob.
* @returns {Array}
*/
function getBlobStates() {
var states = [];
_blobs3D.forEach(function (obj, blobId) {
states.push({
id: blobId,
x: obj.lastPosition ? obj.lastPosition.x : 0,
z: obj.lastPosition ? obj.lastPosition.z : 0,
vx: obj.lastVelocity ? obj.lastVelocity.vx : 0,
vz: obj.lastVelocity ? obj.lastVelocity.vz : 0
});
});
return states;
}
// ── Public API ────────────────────────────────────────────────────────────
return {
init,
@ -1999,6 +2045,10 @@ const Viz3D = (function () {
applyLinks,
uploadFloorPlan,
setViewPreset,
// WebSocket reconnect helpers
clearAllTrails: clearAllTrails,
extrapolateBlobPosition: extrapolateBlobPosition,
getBlobStates: getBlobStates,
getNodeMesh: function (mac) { return _nodeMeshes.get(mac); },
rebuildLinkLines: _rebuildLinkLines,
// Ghost node API

405
dashboard/js/websocket.js Normal file
View file

@ -0,0 +1,405 @@
/**
* Spaxel Dashboard - WebSocket Reconnection Manager
*
* Handles robust reconnection with exponential backoff, jitter,
* disconnect state tracking, blob extrapolation, and visual state transitions.
*/
(function () {
'use strict';
// ── Configuration ───────────────────────────────────────────────────────
var BACKOFF_BASE_MS = 1000; // 1s initial
var BACKOFF_MAX_MS = 10000; // 10s cap
var JITTER_MS = 500; // ±500ms random jitter
var SILENT_MS = 5000; // <5s: no UI change, extrapolate blobs
var DIMMING_MS = 30000; // 30s: show modal
var EXTRAP_MAX_S = 2.0; // max extrapolation duration in seconds
// ── Internal state ───────────────────────────────────────────────────────
var _ws = null;
var _connected = false;
var _connecting = false;
var _reconnectTimer = null;
var _reconnectAttempt = 0;
var _disconnectStart = null; // Date.now() when disconnect began
var _lastSnapshot = null; // last received snapshot (for blob data)
var _blobStates = new Map(); // blobId -> { x, z, vx, vz, ts }
var _extrapolRAF = null; // requestAnimationFrame id for extrapolation
var _dimOverlay = null; // THREE.js overlay plane for dimming
var _modalShown = false;
// ── Callbacks (set by app.js) ───────────────────────────────────────────
var _onOpen = null; // function(ws) — called after successful connect
var _onMessage = null; // function(data) — called for each message
var _onClose = null; // function(event) — called after ws close
var _onError = null; // function(error) — called on ws error
// ── Viz3D references (set by app.js during init) ─────────────────────────
var _scene = null;
var _renderer = null;
// ── Exponential backoff with jitter ──────────────────────────────────────
function _backoffMs() {
var delay = Math.min(BACKOFF_BASE_MS * Math.pow(2, _reconnectAttempt), BACKOFF_MAX_MS);
var jitter = (Math.random() * 2 - 1) * JITTER_MS; // -500 to +500
return Math.max(100, delay + jitter);
}
// ── Connection lifecycle ─────────────────────────────────────────────────
function connect(url) {
if (_ws && (_ws.readyState === WebSocket.OPEN || _ws.readyState === WebSocket.CONNECTING)) {
return;
}
_connecting = true;
console.log('[WS] Connecting (attempt ' + (_reconnectAttempt + 1) + ')...');
try {
_ws = new WebSocket(url);
} catch (e) {
console.error('[WS] Failed to create WebSocket:', e);
_connecting = false;
_scheduleReconnect();
return;
}
_ws.binaryType = 'arraybuffer';
_ws.onopen = function () {
_connected = true;
_connecting = false;
_reconnectAttempt = 0;
_stopExtrapolation();
console.log('[WS] Connected');
_updateStatusUI('connected');
_fireOnOpen(_ws);
};
_ws.onclose = function (event) {
console.log('[WS] Closed:', event.code, event.reason);
_connected = false;
_connecting = false;
_ws = null;
if (!_disconnectStart) {
_disconnectStart = Date.now();
_captureBlobStates();
}
_updateStatusUI('disconnected');
_startDisconnectTimer();
_scheduleReconnect();
_fireOnClose(event);
};
_ws.onerror = function (error) {
console.error('[WS] Error:', error);
_fireOnError(error);
};
_ws.onmessage = function (event) {
_fireOnMessage(event.data);
};
}
function disconnect() {
_stopDisconnectTimer();
_stopExtrapolation();
if (_reconnectTimer) {
clearTimeout(_reconnectTimer);
_reconnectTimer = null;
}
if (_ws) {
_ws.onclose = null; // prevent reconnect
_ws.close();
_ws = null;
}
_connected = false;
_connecting = false;
_disconnectStart = null;
_reconnectAttempt = 0;
_restoreScene();
_dismissModal();
_updateStatusUI('disconnected');
}
function _scheduleReconnect() {
if (_reconnectTimer) clearTimeout(_reconnectTimer);
var delay = _backoffMs();
console.log('[WS] Reconnecting in', Math.round(delay), 'ms');
_reconnectTimer = setTimeout(function () {
_reconnectTimer = null;
_reconnectAttempt++;
var wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
connect(wsProtocol + '//' + window.location.host + '/ws/dashboard');
}, delay);
}
// ── Disconnect state timer ──────────────────────────────────────────────
var _disconnectTimer = null;
function _startDisconnectTimer() {
_stopDisconnectTimer();
_disconnectTimer = setInterval(function () {
if (_connected || !_disconnectStart) return;
var elapsed = Date.now() - _disconnectStart;
if (elapsed >= SILENT_MS && elapsed < DIMMING_MS) {
_applyDimming();
} else if (elapsed >= DIMMING_MS) {
_showModal();
}
}, 500);
}
function _stopDisconnectTimer() {
if (_disconnectTimer) {
clearInterval(_disconnectTimer);
_disconnectTimer = null;
}
}
// ── Blob position extrapolation (<5s) ────────────────────────────────────
function _captureBlobStates() {
// Capture current blob positions and velocities from last known state
if (!_lastSnapshot || !_lastSnapshot.blobs) return;
_lastSnapshot.blobs.forEach(function (b) {
_blobStates.set(b.id, {
x: b.x,
z: b.z,
vx: b.vx || 0,
vz: b.vz || 0,
ts: Date.now()
});
});
}
function _startExtrapolation() {
if (_blobStates.size === 0) return;
if (_extrapolRAF) return; // already running
function tick() {
if (_connected) return;
var now = Date.now();
_blobStates.forEach(function (state, blobId) {
var elapsed = (now - state.ts) / 1000; // seconds
if (elapsed > EXTRAP_MAX_S) return; // cap extrapolation
// position = last_position + last_velocity * elapsed
var newX = state.x + state.vx * elapsed;
var newZ = state.z + state.vz * elapsed;
// Update Viz3D blob position
if (window.Viz3D && Viz3D.extrapolateBlobPosition) {
Viz3D.extrapolateBlobPosition(blobId, newX, newZ);
}
});
_extrapolRAF = requestAnimationFrame(tick);
}
_extrapolRAF = requestAnimationFrame(tick);
}
function _stopExtrapolation() {
if (_extrapolRAF) {
cancelAnimationFrame(_extrapolRAF);
_extrapolRAF = null;
}
_blobStates.clear();
}
// ── Visual state transitions ─────────────────────────────────────────────
// Dim overlay: semi-transparent plane in front of camera
function _applyDimming() {
if (_dimOverlay) return; // already dimmed
if (!_scene) return;
// Use a screen-space overlay via CSS instead of 3D plane for simplicity
var overlay = document.getElementById('ws-dim-overlay');
if (!overlay) {
overlay = document.createElement('div');
overlay.id = 'ws-dim-overlay';
overlay.style.cssText = 'position:fixed;top:0;left:0;right:0;bottom:0;' +
'background:rgba(0,0,0,0.5);z-index:50;pointer-events:auto;' +
'transition:opacity 0.5s ease;opacity:0;';
document.body.appendChild(overlay);
}
// Trigger fade in
requestAnimationFrame(function () {
overlay.style.opacity = '1';
});
_dimOverlay = overlay;
// Show reconnecting spinner in status bar
var spinner = document.getElementById('ws-reconnect-spinner');
if (spinner) spinner.classList.add('visible');
var reconnectText = document.getElementById('ws-status-text');
if (reconnectText) reconnectText.textContent = 'Reconnecting...';
}
function _restoreScene() {
if (_dimOverlay) {
_dimOverlay.style.opacity = '0';
setTimeout(function () {
if (_dimOverlay && _dimOverlay.parentNode) {
_dimOverlay.parentNode.removeChild(_dimOverlay);
}
_dimOverlay = null;
}, 500);
}
var spinner = document.getElementById('ws-reconnect-spinner');
if (spinner) spinner.classList.remove('visible');
if (_connected) {
var reconnectText = document.getElementById('ws-status-text');
if (reconnectText) reconnectText.textContent = 'Connected';
}
}
// ── Connection lost modal (>30s) ────────────────────────────────────────
function _showModal() {
if (_modalShown) return;
_modalShown = true;
// Stop extrapolation when showing modal (scene is stale anyway)
_stopExtrapolation();
var existing = document.getElementById('ws-lost-modal');
if (existing) return;
var modal = document.createElement('div');
modal.id = 'ws-lost-modal';
modal.className = 'ws-lost-modal';
modal.innerHTML =
'<div class="ws-lost-modal-content">' +
'<h3>Connection lost</h3>' +
'<p>The dashboard lost connection to the mothership. ' +
'The scene below shows the last known state.</p>' +
'<button class="ws-lost-reload-btn" onclick="location.reload()">Reload Page</button>' +
'<button class="ws-lost-dismiss-btn" onclick="this.closest(\'.ws-lost-modal\').style.display=\'none\'">Dismiss</button>' +
'</div>';
document.body.appendChild(modal);
}
function _dismissModal() {
_modalShown = false;
var modal = document.getElementById('ws-lost-modal');
if (modal && modal.parentNode) {
modal.parentNode.removeChild(modal);
}
}
// ── Status indicator ────────────────────────────────────────────────────
function _updateStatusUI(status) {
var dot = document.getElementById('ws-status');
var text = document.getElementById('ws-status-text');
if (!dot || !text) return;
dot.classList.remove('connected', 'disconnected', 'reconnecting');
if (status === 'connected') {
dot.classList.add('connected');
text.textContent = 'Connected';
} else if (status === 'reconnecting') {
dot.classList.add('reconnecting');
text.textContent = 'Reconnecting...';
} else {
dot.classList.add('disconnected');
// Don't override text if already set to "Reconnecting..."
if (_disconnectStart && Date.now() - _disconnectStart >= SILENT_MS) {
text.textContent = 'Reconnecting...';
} else {
text.textContent = 'Disconnected';
}
}
}
// ── Snapshot tracking (for blob extrapolation on disconnect) ────────────
function setLastSnapshot(snapshot) {
_lastSnapshot = snapshot;
}
// ── Reconnect handler: clear trails, restore scene ──────────────────────
function onReconnected() {
var elapsed = _disconnectStart ? ((Date.now() - _disconnectStart) / 1000).toFixed(1) : '?';
console.log('[WS] Reconnected after ' + elapsed + 's');
// Clear disconnect state
_disconnectStart = null;
_restoreScene();
_dismissModal();
_stopDisconnectTimer();
// Clear blob trails in Viz3D
if (window.Viz3D && Viz3D.clearAllTrails) {
Viz3D.clearAllTrails();
}
}
// ── Callbacks ───────────────────────────────────────────────────────────
function _fireOnOpen(ws) {
if (typeof _onOpen === 'function') _onOpen(ws);
}
function _fireOnMessage(data) {
if (typeof _onMessage === 'function') _onMessage(data);
}
function _fireOnClose(event) {
if (typeof _onClose === 'function') _onClose(event);
}
function _fireOnError(error) {
if (typeof _onError === 'function') _onError(error);
}
// ── Public API ──────────────────────────────────────────────────────────
window.SpaxelWebSocket = {
/**
* Initialize the WebSocket manager.
* @param {Object} opts
* @param {Function} opts.onOpen called with ws after connect
* @param {Function} opts.onMessage called with raw message data
* @param {Function} opts.onClose called with CloseEvent
* @param {Function} opts.onError called with ErrorEvent
*/
init: function (opts) {
if (opts.onOpen) _onOpen = opts.onOpen;
if (opts.onMessage) _onMessage = opts.onMessage;
if (opts.onClose) _onClose = opts.onClose;
if (opts.onError) _onError = opts.onError;
},
/** Connect (or reconnect) to the dashboard WebSocket. */
connect: connect,
/** Disconnect and stop all reconnection attempts. */
disconnect: disconnect,
/** Record the last snapshot for blob extrapolation. */
setLastSnapshot: setLastSnapshot,
/** Called when the connection is re-established (from snapshot handler). */
onReconnected: onReconnected,
/** Start blob extrapolation (called on disconnect if <5s). */
startExtrapolation: _startExtrapolation,
/** Get current connection state. */
isConnected: function () { return _connected; },
isConnecting: function () { return _connecting; },
getDisconnectDurationMs: function () {
return _disconnectStart ? (Date.now() - _disconnectStart) : 0;
},
/** Send raw data over the WebSocket. */
send: function (data) {
if (_ws && _ws.readyState === WebSocket.OPEN) {
_ws.send(data);
}
}
};
console.log('[WS] Reconnection manager initialized');
})();

View file

@ -58,6 +58,11 @@ func AllMigrations() []Migration {
Description: "add floorplan table for image upload and calibration",
Up: migration_010_add_floorplan,
},
{
Version: 11,
Description: "add FTS5 table and triggers for events search",
Up: migration_011_add_events_fts,
},
}
}
@ -504,3 +509,34 @@ CREATE TABLE IF NOT EXISTS floorplan (
_, err := tx.Exec(schema)
return err
}
// migration_011_add_events_fts adds FTS5 full-text search for events.
func migration_011_add_events_fts(tx *sql.Tx) error {
schema := `
-- FTS5 index for natural-language search across event detail
CREATE VIRTUAL TABLE IF NOT EXISTS events_fts USING fts5(
type, zone, person, detail_json,
content='events', content_rowid='id'
);
-- Triggers to keep events_fts in sync with the events table
CREATE TRIGGER IF NOT EXISTS events_fts_insert AFTER INSERT ON events BEGIN
INSERT INTO events_fts(rowid, type, zone, person, detail_json)
VALUES (new.id, new.type, new.zone, new.person, new.detail_json);
END;
CREATE TRIGGER IF NOT EXISTS events_fts_delete AFTER DELETE ON events BEGIN
INSERT INTO events_fts(events_fts, rowid, type, zone, person, detail_json)
VALUES ('delete', old.id, old.type, old.zone, old.person, old.detail_json);
END;
CREATE TRIGGER IF NOT EXISTS events_fts_update AFTER UPDATE ON events BEGIN
INSERT INTO events_fts(events_fts, rowid, type, zone, person, detail_json)
VALUES ('delete', old.id, old.type, old.zone, old.person, old.detail_json);
INSERT INTO events_fts(rowid, type, zone, person, detail_json)
VALUES (new.id, new.type, new.zone, new.person, new.detail_json);
END;
`
_, err := tx.Exec(schema)
return err
}

View file

@ -1,177 +1,362 @@
// Package events provides event types for the spaxel system.
// Package events provides event types and SQLite storage for the unified activity timeline.
package events
import (
"database/sql"
"encoding/json"
"fmt"
"log"
"time"
_ "modernc.org/sqlite"
)
// AnomalyType represents the type of anomaly detected.
type AnomalyType string
// EventType represents the type of an event.
type EventType string
const (
AnomalyUnusualHour AnomalyType = "unusual_hour" // Motion at unusual time
AnomalyUnknownBLE AnomalyType = "unknown_ble" // Unknown BLE device nearby
AnomalyMotionDuringAway AnomalyType = "motion_during_away" // Motion when system is in away mode
AnomalyUnusualDwell AnomalyType = "unusual_dwell" // Person in zone longer than usual
EventTypeDetection EventType = "detection"
EventTypeZoneEntry EventType = "zone_entry"
EventTypeZoneExit EventType = "zone_exit"
EventTypePortalCrossing EventType = "portal_crossing"
EventTypeTriggerFired EventType = "trigger_fired"
EventTypeFallAlert EventType = "fall_alert"
EventTypeAnomaly EventType = "anomaly"
EventTypeSecurityAlert EventType = "security_alert"
EventTypeNodeOnline EventType = "node_online"
EventTypeNodeOffline EventType = "node_offline"
EventTypeOTAUpdate EventType = "ota_update"
EventTypeBaselineChanged EventType = "baseline_changed"
EventTypeSystem EventType = "system"
EventTypeLearningMilestone EventType = "learning_milestone"
)
// AnomalyEvent represents a detected anomaly.
type AnomalyEvent struct {
ID string `json:"id"`
Type AnomalyType `json:"type"`
Score float64 `json:"score"` // 0.0 - 1.0, higher = more anomalous
Description string `json:"description"` // Human-readable description
Timestamp time.Time `json:"timestamp"`
// Context for the anomaly
ZoneID string `json:"zone_id,omitempty"`
ZoneName string `json:"zone_name,omitempty"`
BlobID int `json:"blob_id,omitempty"`
PersonID string `json:"person_id,omitempty"`
PersonName string `json:"person_name,omitempty"`
DeviceMAC string `json:"device_mac,omitempty"`
DeviceName string `json:"device_name,omitempty"`
Position Position `json:"position,omitempty"`
// For unusual hour anomalies
HourOfWeek int `json:"hour_of_week,omitempty"` // 0-167
ExpectedOccupancy float64 `json:"expected_occupancy,omitempty"` // 0.0-1.0
// For unusual dwell anomalies
DwellDuration time.Duration `json:"dwell_duration,omitempty"`
ExpectedDwell time.Duration `json:"expected_dwell,omitempty"`
// For BLE anomalies
RSSIdBm int `json:"rssi_dbm,omitempty"`
SeenBefore bool `json:"seen_before,omitempty"` // Was this device seen before (even if not regular)
// Acknowledgement state
Acknowledged bool `json:"acknowledged"`
AcknowledgedAt time.Time `json:"acknowledged_at,omitempty"`
Feedback string `json:"feedback,omitempty"` // "expected", "intrusion", "false_alarm"
AcknowledgedBy string `json:"acknowledged_by,omitempty"` // User who acknowledged
// Alert chain state
AlertSent bool `json:"alert_sent"`
WebhookSent bool `json:"webhook_sent"`
EscalationSent bool `json:"escalation_sent"`
AlertSentAt time.Time `json:"alert_sent_at,omitempty"`
WebhookSentAt time.Time `json:"webhook_sent_at,omitempty"`
EscalationSentAt time.Time `json:"escalation_sent_at,omitempty"`
}
// Position represents a 3D position.
type Position struct {
X float64 `json:"x"`
Y float64 `json:"y"`
Z float64 `json:"z"`
}
// SystemMode represents the system operating mode.
type SystemMode string
// EventSeverity represents the severity level of an event.
type EventSeverity string
const (
ModeHome SystemMode = "home"
ModeAway SystemMode = "away"
ModeSleep SystemMode = "sleep"
SeverityInfo EventSeverity = "info"
SeverityWarning EventSeverity = "warning"
SeverityAlert EventSeverity = "alert"
SeverityCritical EventSeverity = "critical"
)
// SystemModeChangeEvent represents a change in system mode.
type SystemModeChangeEvent struct {
PreviousMode SystemMode `json:"previous_mode"`
NewMode SystemMode `json:"new_mode"`
Reason string `json:"reason"` // "auto_away", "auto_disarm", "manual"
Timestamp time.Time `json:"timestamp"`
PersonID string `json:"person_id,omitempty"` // For auto-disarm
PersonName string `json:"person_name,omitempty"` // For auto-disarm
// Event represents a single event in the unified activity timeline.
type Event struct {
ID int64
TimestampMs int64
Type EventType
Zone string
Person string
BlobID int
DetailJSON string
Severity EventSeverity
}
// AnomalyFeedback is used for recording user feedback on anomalies.
type AnomalyFeedback struct {
AnomalyID string `json:"anomaly_id"`
Feedback string `json:"feedback"` // "expected", "intrusion", "false_alarm"
Timestamp time.Time `json:"timestamp"`
Notes string `json:"notes,omitempty"`
// QueryParams defines parameters for querying events.
type QueryParams struct {
Limit int
BeforeID int64 // Cursor for pagination
AfterID int64
Type EventType
Zone string
Person string
AfterTime time.Time
BeforeTime time.Time
SearchQuery string // FTS5 search query
}
// WeeklyAnomalySummary provides a summary of anomalies for the past week.
type WeeklyAnomalySummary struct {
TotalAnomalies int `json:"total_anomalies"`
FalseAlarms int `json:"false_alarms"`
GenuineIntrusions int `json:"genuine_intrusions"`
ExpectedEvents int `json:"expected_events"`
Unacknowledged int `json:"unacknowledged"`
ByType map[AnomalyType]int `json:"by_type"`
// InsertEvent inserts a new event into the database.
func InsertEvent(db *sql.DB, e Event) (int64, error) {
if e.TimestampMs == 0 {
e.TimestampMs = time.Now().UnixMilli()
}
if e.Severity == "" {
e.Severity = SeverityInfo
}
result, err := db.Exec(`
INSERT INTO events (timestamp_ms, type, zone, person, blob_id, detail_json, severity)
VALUES (?, ?, ?, ?, ?, ?, ?)
`, e.TimestampMs, e.Type, e.Zone, e.Person, e.BlobID, e.DetailJSON, e.Severity)
if err != nil {
return 0, fmt.Errorf("insert event: %w", err)
}
id, err := result.LastInsertId()
if err != nil {
return 0, fmt.Errorf("get last insert id: %w", err)
}
return id, nil
}
// OccupancySample represents a single occupancy observation for behaviour modelling.
type OccupancySample struct {
HourOfWeek int `json:"hour_of_week"` // 0-167 (hour of week)
ZoneID string `json:"zone_id"`
PersonCount int `json:"person_count"`
BLEDevices []string `json:"ble_devices,omitempty"`
Timestamp time.Time `json:"timestamp"`
// QueryEvents retrieves events from the database based on the provided parameters.
// Returns the events, the next cursor ID for pagination, and whether there are more results.
func QueryEvents(db *sql.DB, params QueryParams) ([]Event, string, bool, error) {
if params.Limit <= 0 {
params.Limit = 50
}
if params.Limit > 1000 {
params.Limit = 1000 // Max limit
}
query := `
SELECT id, timestamp_ms, type, zone, person, blob_id, detail_json, severity
FROM events
WHERE 1=1
`
args := []interface{}{}
// Cursor pagination
if params.BeforeID > 0 {
query += " AND id < ?"
args = append(args, params.BeforeID)
} else if params.AfterID > 0 {
query += " AND id > ?"
args = append(args, params.AfterID)
}
// Time range filters
if !params.AfterTime.IsZero() {
query += " AND timestamp_ms >= ?"
args = append(args, params.AfterTime.UnixMilli())
}
if !params.BeforeTime.IsZero() {
query += " AND timestamp_ms <= ?"
args = append(args, params.BeforeTime.UnixMilli())
}
// Type filter
if params.Type != "" {
query += " AND type = ?"
args = append(args, params.Type)
}
// Zone filter
if params.Zone != "" {
query += " AND zone = ?"
args = append(args, params.Zone)
}
// Person filter
if params.Person != "" {
query += " AND person = ?"
args = append(args, params.Person)
}
// FTS5 full-text search (must use the FTS table)
if params.SearchQuery != "" {
// Use subquery to search FTS and join with events table
query = `
SELECT e.id, e.timestamp_ms, e.type, e.zone, e.person, e.blob_id, e.detail_json, e.severity
FROM events e
INNER JOIN events_fts fts ON e.id = fts.rowid
WHERE events_fts MATCH ?
`
args = []interface{}{params.SearchQuery}
// Reapply other filters to the subquery
if params.BeforeID > 0 {
query += " AND e.id < ?"
args = append(args, params.BeforeID)
} else if params.AfterID > 0 {
query += " AND e.id > ?"
args = append(args, params.AfterID)
}
if !params.AfterTime.IsZero() {
query += " AND e.timestamp_ms >= ?"
args = append(args, params.AfterTime.UnixMilli())
}
if !params.BeforeTime.IsZero() {
query += " AND e.timestamp_ms <= ?"
args = append(args, params.BeforeTime.UnixMilli())
}
if params.Type != "" {
query += " AND e.type = ?"
args = append(args, params.Type)
}
if params.Zone != "" {
query += " AND e.zone = ?"
args = append(args, params.Zone)
}
if params.Person != "" {
query += " AND e.person = ?"
args = append(args, params.Person)
}
}
query += " ORDER BY id DESC LIMIT ?"
args = append(args, params.Limit+1) // Fetch one extra to check for more results
rows, err := db.Query(query, args...)
if err != nil {
return nil, "", false, fmt.Errorf("query events: %w", err)
}
defer rows.Close()
var events []Event
for rows.Next() {
var e Event
err := rows.Scan(&e.ID, &e.TimestampMs, &e.Type, &e.Zone, &e.Person, &e.BlobID, &e.DetailJSON, &e.Severity)
if err != nil {
return nil, "", false, fmt.Errorf("scan event: %w", err)
}
events = append(events, e)
}
if err := rows.Err(); err != nil {
return nil, "", false, fmt.Errorf("iterate rows: %w", err)
}
// Check if there are more results
hasMore := len(events) > params.Limit
nextCursor := ""
if hasMore {
// Remove the extra event
events = events[:params.Limit]
nextCursor = fmt.Sprintf("%d", events[len(events)-1].ID)
}
return events, nextCursor, hasMore, nil
}
// DwellSample represents a dwell duration observation for behaviour modelling.
type DwellSample struct {
HourOfWeek int `json:"hour_of_week"`
ZoneID string `json:"zone_id"`
PersonID string `json:"person_id,omitempty"`
DwellDuration time.Duration `json:"dwell_duration"`
Timestamp time.Time `json:"timestamp"`
// ArchiveDays is the number of days after which events are archived.
const ArchiveDays = 90
// ArchiveDaysMs is ArchiveDays expressed in milliseconds.
const ArchiveDaysMs = ArchiveDays * 24 * 60 * 60 * 1000
// RunArchiveJob moves events older than 90 days from the events table to the events_archive table.
// This should be called nightly (e.g., at 02:00 local time).
func RunArchiveJob(db *sql.DB) error {
// Get the cutoff timestamp
cutoffMs := time.Now().UnixMilli() - ArchiveDaysMs
// Begin transaction for atomic archive operation
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("begin transaction: %w", err)
}
defer tx.Rollback()
// Get count of events to be archived
var count int
err = tx.QueryRow("SELECT COUNT(*) FROM events WHERE timestamp_ms < ?", cutoffMs).Scan(&count)
if err != nil {
return fmt.Errorf("count events to archive: %w", err)
}
if count == 0 {
log.Printf("[events archive] No events to archive (cutoff: %d ms ago)", ArchiveDays)
return nil
}
log.Printf("[events archive] Archiving %d events older than %d days", count, ArchiveDays)
// Copy old events to archive table
// We preserve the original ID to maintain referential integrity
_, err = tx.Exec(`
INSERT INTO events_archive (id, timestamp_ms, type, zone, person, blob_id, detail_json, severity)
SELECT id, timestamp_ms, type, zone, person, blob_id, detail_json, severity
FROM events
WHERE timestamp_ms < ?
`, cutoffMs)
if err != nil {
return fmt.Errorf("copy events to archive: %w", err)
}
// Delete the archived events from the main events table
// The FTS5 triggers will automatically remove them from events_fts
result, err := tx.Exec("DELETE FROM events WHERE timestamp_ms < ?", cutoffMs)
if err != nil {
return fmt.Errorf("delete archived events: %w", err)
}
// Verify the delete count matches the copy count
rowsAffected, _ := result.RowsAffected()
if rowsAffected != int64(count) {
log.Printf("[WARN] Events archive: copied %d but deleted %d rows", count, rowsAffected)
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("commit archive transaction: %w", err)
}
log.Printf("[events archive] Successfully archived %d events", count)
return nil
}
// SleepSessionStartEvent represents the start of a sleep session.
type SleepSessionStartEvent struct {
SessionID string `json:"session_id"`
PersonID string `json:"person_id,omitempty"`
ZoneID string `json:"zone_id"`
LinkID string `json:"link_id"`
TentativeStart time.Time `json:"tentative_start"` // When conditions first met
ConfirmedStart time.Time `json:"confirmed_start"` // When 15-min confirmation elapsed
SessionDate string `json:"session_date"` // Date this sleep night belongs to
Timestamp time.Time `json:"timestamp"`
// GetEventByID retrieves a single event by its ID.
func GetEventByID(db *sql.DB, id int64) (*Event, error) {
var e Event
err := db.QueryRow(`
SELECT id, timestamp_ms, type, zone, person, blob_id, detail_json, severity
FROM events
WHERE id = ?
`, id).Scan(&e.ID, &e.TimestampMs, &e.Type, &e.Zone, &e.Person, &e.BlobID, &e.DetailJSON, &e.Severity)
if err == sql.ErrNoRows {
return nil, fmt.Errorf("event not found: %d", id)
}
if err != nil {
return nil, fmt.Errorf("query event: %w", err)
}
return &e, nil
}
// SleepSessionEndEvent represents the end of a sleep session.
type SleepSessionEndEvent struct {
SessionID string `json:"session_id"`
PersonID string `json:"person_id,omitempty"`
ZoneID string `json:"zone_id"`
LinkID string `json:"link_id"`
SessionDate string `json:"session_date"`
SleepOnset time.Time `json:"sleep_onset"`
WakeTime time.Time `json:"wake_time"`
TimeInBed time.Duration `json:"time_in_bed"`
WakeEpisodeCount int `json:"wake_episode_count"`
WASODuration time.Duration `json:"waso_duration"`
SleepEfficiency float64 `json:"sleep_efficiency"` // 0-100
AvgBreathingRate float64 `json:"avg_breathing_rate"` // BPM
BreathingAnomalyCount int `json:"breathing_anomaly_count"`
OverallScore float64 `json:"overall_score"` // 0-100
QualityRating string `json:"quality_rating"` // poor/fair/good/excellent
EndReason string `json:"end_reason"` // "zone_exit", "sustained_motion", "stationary_drop"
Timestamp time.Time `json:"timestamp"`
// InsertDetectionEvent is a convenience function for inserting detection events.
func InsertDetectionEvent(db *sql.DB, zone string, person string, blobID int, detail map[string]interface{}) (int64, error) {
detailJSON, err := json.Marshal(detail)
if err != nil {
return 0, fmt.Errorf("marshal detail: %w", err)
}
return InsertEvent(db, Event{
Type: EventTypeDetection,
Zone: zone,
Person: person,
BlobID: blobID,
DetailJSON: string(detailJSON),
Severity: SeverityInfo,
})
}
// MorningSummaryEvent represents the morning sleep summary pushed to dashboards.
type MorningSummaryEvent struct {
SessionID string `json:"session_id"`
PersonID string `json:"person_id,omitempty"`
PersonName string `json:"person_name,omitempty"`
SessionDate string `json:"session_date"`
SleepDuration time.Duration `json:"sleep_duration"`
SleepEfficiency float64 `json:"sleep_efficiency"`
EfficiencyColor string `json:"efficiency_color"` // "green", "amber", "red"
WakeEpisodeCount int `json:"wake_episode_count"`
WASODuration time.Duration `json:"waso_duration"`
AvgBreathingRate float64 `json:"avg_breathing_rate"`
BreathingAnomalyCount int `json:"breathing_anomaly_count"`
AnomalyNote string `json:"anomaly_note,omitempty"`
OverallScore float64 `json:"overall_score"`
QualityRating string `json:"quality_rating"`
SleepOnset time.Time `json:"sleep_onset"`
WakeTime time.Time `json:"wake_time"`
GeneratedAt time.Time `json:"generated_at"`
// InsertAlertEvent is a convenience function for inserting alert events.
func InsertAlertEvent(db *sql.DB, eventType EventType, zone string, person string, severity EventSeverity, detail map[string]interface{}) (int64, error) {
detailJSON, err := json.Marshal(detail)
if err != nil {
return 0, fmt.Errorf("marshal detail: %w", err)
}
return InsertEvent(db, Event{
Type: eventType,
Zone: zone,
Person: person,
DetailJSON: string(detailJSON),
Severity: severity,
})
}
// InsertSystemEvent is a convenience function for inserting system events.
func InsertSystemEvent(db *sql.DB, message string, detail map[string]interface{}) (int64, error) {
detailJSON, err := json.Marshal(detail)
if err != nil {
return 0, fmt.Errorf("marshal detail: %w", err)
}
if detail == nil {
detail = make(map[string]interface{})
}
detail["message"] = message
detailJSON, err = json.Marshal(detail)
if err != nil {
return 0, fmt.Errorf("marshal detail with message: %w", err)
}
return InsertEvent(db, Event{
Type: EventTypeSystem,
DetailJSON: string(detailJSON),
Severity: SeverityInfo,
})
}

View file

@ -0,0 +1,713 @@
// Package events provides tests for the unified activity timeline.
package events
import (
"database/sql"
"testing"
"time"
_ "modernc.org/sqlite"
)
// openTestDB creates an in-memory SQLite database for testing.
func openTestDB(t *testing.T) *sql.DB {
t.Helper()
db, err := sql.Open("sqlite", ":memory:")
if err != nil {
t.Fatalf("failed to open test database: %v", err)
}
// Create the schema
schema := `
-- Events table
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp_ms INTEGER NOT NULL,
type TEXT NOT NULL,
zone TEXT,
person TEXT,
blob_id INTEGER,
detail_json TEXT,
severity TEXT NOT NULL DEFAULT 'info'
);
CREATE INDEX IF NOT EXISTS idx_events_ts ON events(timestamp_ms DESC);
CREATE INDEX IF NOT EXISTS idx_events_type ON events(type);
CREATE INDEX IF NOT EXISTS idx_events_zone ON events(zone);
CREATE INDEX IF NOT EXISTS idx_events_person ON events(person);
-- Events archive
CREATE TABLE IF NOT EXISTS events_archive (
id INTEGER PRIMARY KEY AUTOINCREMENT,
type TEXT NOT NULL,
timestamp_ms INTEGER NOT NULL,
zone TEXT,
person TEXT,
blob_id INTEGER,
detail_json TEXT,
severity TEXT NOT NULL DEFAULT 'info'
);
-- FTS5 table
CREATE VIRTUAL TABLE IF NOT EXISTS events_fts USING fts5(
type, zone, person, detail_json,
content='events', content_rowid='id'
);
-- FTS5 triggers
CREATE TRIGGER IF NOT EXISTS events_fts_insert AFTER INSERT ON events BEGIN
INSERT INTO events_fts(rowid, type, zone, person, detail_json)
VALUES (new.id, new.type, new.zone, new.person, new.detail_json);
END;
CREATE TRIGGER IF NOT EXISTS events_fts_delete AFTER DELETE ON events BEGIN
INSERT INTO events_fts(events_fts, rowid, type, zone, person, detail_json)
VALUES ('delete', old.id, old.type, old.zone, old.person, old.detail_json);
END;
CREATE TRIGGER IF NOT EXISTS events_fts_update AFTER UPDATE ON events BEGIN
INSERT INTO events_fts(events_fts, rowid, type, zone, person, detail_json)
VALUES ('delete', old.id, old.type, old.zone, old.person, old.detail_json);
INSERT INTO events_fts(rowid, type, zone, person, detail_json)
VALUES (new.id, new.type, new.zone, new.person, new.detail_json);
END;
`
_, err = db.Exec(schema)
if err != nil {
t.Fatalf("failed to create schema: %v", err)
}
return db
}
func TestInsertEvent(t *testing.T) {
db := openTestDB(t)
defer db.Close()
tests := []struct {
name string
event Event
wantID bool
wantErr bool
checkFunc func(*testing.T, int64, Event)
}{
{
name: "basic event",
event: Event{
Type: EventTypeDetection,
Zone: "Kitchen",
Person: "Alice",
DetailJSON: `{"motion":0.15}`,
},
wantID: true,
},
{
name: "event with timestamp",
event: Event{
TimestampMs: 1710000000000,
Type: EventTypeZoneEntry,
Zone: "Living Room",
Person: "Bob",
Severity: SeverityInfo,
},
wantID: true,
},
{
name: "event with blob ID",
event: Event{
Type: EventTypeDetection,
Zone: "Hallway",
BlobID: 42,
DetailJSON: `{"confidence":0.85}`,
},
wantID: true,
},
{
name: "alert event",
event: Event{
Type: EventTypeFallAlert,
Zone: "Bathroom",
Person: "Alice",
Severity: SeverityAlert,
DetailJSON: `{"z_velocity":-1.8}`,
},
wantID: true,
checkFunc: func(t *testing.T, id int64, e Event) {
// Verify the event was inserted with the alert severity
var severity string
err := db.QueryRow("SELECT severity FROM events WHERE id = ?", id).Scan(&severity)
if err != nil {
t.Fatalf("failed to query severity: %v", err)
}
if severity != string(SeverityAlert) {
t.Errorf("got severity %q, want %q", severity, SeverityAlert)
}
},
},
{
name: "event with default severity",
event: Event{
Type: EventTypeSystem,
DetailJSON: `{"message":"test"}`,
},
wantID: true,
checkFunc: func(t *testing.T, id int64, e Event) {
// Verify default severity is 'info'
var severity string
err := db.QueryRow("SELECT severity FROM events WHERE id = ?", id).Scan(&severity)
if err != nil {
t.Fatalf("failed to query severity: %v", err)
}
if severity != string(SeverityInfo) {
t.Errorf("got severity %q, want %q", severity, SeverityInfo)
}
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
id, err := InsertEvent(db, tt.event)
if (err != nil) != tt.wantErr {
t.Errorf("InsertEvent() error = %v, wantErr %v", err, tt.wantErr)
return
}
if tt.wantID && id == 0 {
t.Error("InsertEvent() returned zero ID, want non-zero")
}
if tt.checkFunc != nil {
tt.checkFunc(t, id, tt.event)
}
})
}
}
func TestQueryEvents(t *testing.T) {
db := openTestDB(t)
defer db.Close()
// Insert test data
now := time.Now().UnixMilli()
testEvents := []Event{
{Type: EventTypeDetection, Zone: "Kitchen", Person: "Alice", DetailJSON: `{"x":1.0}`, TimestampMs: now},
{Type: EventTypeZoneEntry, Zone: "Living Room", Person: "Bob", DetailJSON: `{"from":"Hallway"}`, TimestampMs: now - 1000},
{Type: EventTypeFallAlert, Zone: "Bathroom", Person: "Alice", Severity: SeverityAlert, DetailJSON: `{"z":0.3}`, TimestampMs: now - 2000},
{Type: EventTypeSystem, Zone: "", Person: "", DetailJSON: `{"message":"test"}`, TimestampMs: now - 3000},
}
for _, e := range testEvents {
_, err := InsertEvent(db, e)
if err != nil {
t.Fatalf("failed to insert test event: %v", err)
}
}
tests := []struct {
name string
params QueryParams
wantCount int
wantMore bool
checkTypes []EventType
checkZones []string
checkPerson string
}{
{
name: "default limit",
params: QueryParams{},
wantCount: 4,
wantMore: false,
},
{
name: "limit 2",
params: QueryParams{
Limit: 2,
},
wantCount: 2,
wantMore: true,
},
{
name: "filter by type",
params: QueryParams{
Type: EventTypeDetection,
Limit: 10,
},
wantCount: 1,
wantMore: false,
checkTypes: []EventType{EventTypeDetection},
},
{
name: "filter by zone",
params: QueryParams{
Zone: "Kitchen",
Limit: 10,
},
wantCount: 1,
wantMore: false,
checkZones: []string{"Kitchen"},
},
{
name: "filter by person",
params: QueryParams{
Person: "Alice",
Limit: 10,
},
wantCount: 2,
wantMore: false,
checkPerson: "Alice",
},
{
name: "filter by severity",
params: QueryParams{
Type: EventTypeFallAlert,
Limit: 10,
},
wantCount: 1,
},
{
name: "before_id cursor",
params: QueryParams{
Limit: 2,
BeforeID: 3, // Should return events with ID < 3
},
wantCount: 2,
wantMore: false,
},
{
name: "after_id cursor",
params: QueryParams{
Limit: 2,
AfterID: 2, // Should return events with ID > 2
},
wantCount: 2,
wantMore: false,
},
{
name: "time range",
params: QueryParams{
AfterTime: time.UnixMilli(now - 2500),
BeforeTime: time.UnixMilli(now - 500),
Limit: 10,
},
wantCount: 2, // Events at now-1000 and now-2000
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
events, nextCursor, hasMore, err := QueryEvents(db, tt.params)
if err != nil {
t.Fatalf("QueryEvents() error = %v", err)
}
if len(events) != tt.wantCount {
t.Errorf("QueryEvents() returned %d events, want %d", len(events), tt.wantCount)
}
if hasMore != tt.wantMore {
t.Errorf("QueryEvents() hasMore = %v, want %v", hasMore, tt.wantMore)
}
if tt.wantMore && nextCursor == "" {
t.Error("QueryEvents() returned empty nextCursor when hasMore is true")
}
// Check type filter results
if len(tt.checkTypes) > 0 {
for i, e := range events {
if i >= len(tt.checkTypes) {
break
}
if e.Type != tt.checkTypes[i] {
t.Errorf("events[%d].Type = %v, want %v", i, e.Type, tt.checkTypes[i])
}
}
}
// Check zone filter results
if len(tt.checkZones) > 0 {
for i, e := range events {
if i >= len(tt.checkZones) {
break
}
if e.Zone != tt.checkZones[i] {
t.Errorf("events[%d].Zone = %v, want %v", i, e.Zone, tt.checkZones[i])
}
}
}
// Check person filter results
if tt.checkPerson != "" {
for _, e := range events {
if e.Person != tt.checkPerson {
t.Errorf("event has person %q, want %q", e.Person, tt.checkPerson)
}
}
}
})
}
}
func TestQueryEventsFTS(t *testing.T) {
db := openTestDB(t)
defer db.Close()
// Insert test data with searchable content
now := time.Now().UnixMilli()
testEvents := []Event{
{Type: EventTypeFallAlert, Zone: "Kitchen", Person: "Alice", DetailJSON: `{"message":"Alice fell in the kitchen"}`, TimestampMs: now},
{Type: EventTypeDetection, Zone: "Living Room", Person: "Bob", DetailJSON: `{"message":"Bob walking in living room"}`, TimestampMs: now - 1000},
{Type: EventTypeSystem, Zone: "", Person: "", DetailJSON: `{"message":"System started"}`, TimestampMs: now - 2000},
}
for _, e := range testEvents {
_, err := InsertEvent(db, e)
if err != nil {
t.Fatalf("failed to insert test event: %v", err)
}
}
// Give FTS5 a moment to process
time.Sleep(10 * time.Millisecond)
tests := []struct {
name string
search string
wantCount int
}{
{
name: "search for Alice",
search: "Alice",
wantCount: 1,
},
{
name: "search for kitchen",
search: "kitchen",
wantCount: 1,
},
{
name: "search for walking",
search: "walking",
wantCount: 1,
},
{
name: "search for living room",
search: "living room",
wantCount: 1,
},
{
name: "search with OR",
search: "Alice OR Bob",
wantCount: 2,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
events, _, _, err := QueryEvents(db, QueryParams{
SearchQuery: tt.search,
Limit: 10,
})
if err != nil {
t.Fatalf("QueryEvents() error = %v", err)
}
if len(events) != tt.wantCount {
t.Errorf("QueryEvents() search=%q returned %d events, want %d", tt.search, len(events), tt.wantCount)
}
})
}
}
func TestRunArchiveJob(t *testing.T) {
db := openTestDB(t)
defer db.Close()
now := time.Now().UnixMilli()
oldCutoff := now - ArchiveDaysMs - 1000 // Older than archive threshold
youngCutoff := now - ArchiveDaysMs + 1000 // Newer than archive threshold
// Insert old events (should be archived)
oldEvents := []Event{
{Type: EventTypeDetection, Zone: "Kitchen", DetailJSON: `{"old":1}`, TimestampMs: oldCutoff},
{Type: EventTypeZoneEntry, Zone: "Living Room", DetailJSON: `{"old":2}`, TimestampMs: oldCutoff - 1000},
}
// Insert new events (should NOT be archived)
newEvents := []Event{
{Type: EventTypeDetection, Zone: "Bedroom", DetailJSON: `{"new":1}`, TimestampMs: youngCutoff},
{Type: EventTypeSystem, DetailJSON: `{"new":2}`, TimestampMs: now},
}
for _, e := range oldEvents {
_, err := InsertEvent(db, e)
if err != nil {
t.Fatalf("failed to insert old event: %v", err)
}
}
for _, e := range newEvents {
_, err := InsertEvent(db, e)
if err != nil {
t.Fatalf("failed to insert new event: %v", err)
}
}
// Verify initial state
var countBefore int
err := db.QueryRow("SELECT COUNT(*) FROM events").Scan(&countBefore)
if err != nil {
t.Fatalf("failed to count events before archive: %v", err)
}
if countBefore != 4 {
t.Fatalf("expected 4 events before archive, got %d", countBefore)
}
// Run archive job
err = RunArchiveJob(db)
if err != nil {
t.Fatalf("RunArchiveJob() error = %v", err)
}
// Verify events table now has only new events
var countAfter int
err = db.QueryRow("SELECT COUNT(*) FROM events").Scan(&countAfter)
if err != nil {
t.Fatalf("failed to count events after archive: %v", err)
}
if countAfter != 2 {
t.Errorf("events table has %d rows after archive, want 2", countAfter)
}
// Verify archive table has the old events
var archiveCount int
err = db.QueryRow("SELECT COUNT(*) FROM events_archive").Scan(&archiveCount)
if err != nil {
t.Fatalf("failed to count archived events: %v", err)
}
if archiveCount != 2 {
t.Errorf("events_archive table has %d rows, want 2", archiveCount)
}
// Verify FTS5 table was updated (old events removed)
var ftsCount int
err = db.QueryRow("SELECT COUNT(*) FROM events_fts").Scan(&ftsCount)
if err != nil {
t.Fatalf("failed to count FTS entries: %v", err)
}
if ftsCount != 2 {
t.Errorf("events_fts table has %d rows after archive, want 2", ftsCount)
}
// Verify the remaining events are the new ones by type
var types []string
rows, err := db.Query("SELECT type FROM events ORDER BY timestamp_ms DESC")
if err != nil {
t.Fatalf("failed to query remaining events: %v", err)
}
defer rows.Close()
for rows.Next() {
var eType string
if err := rows.Scan(&eType); err != nil {
t.Fatalf("failed to scan type: %v", err)
}
types = append(types, eType)
}
if len(types) != 2 {
t.Fatalf("expected 2 remaining events, got %d", len(types))
}
// Should have one Detection event and one System event
hasDetection := false
hasSystem := false
for _, t := range types {
if t == string(EventTypeDetection) {
hasDetection = true
}
if t == string(EventTypeSystem) {
hasSystem = true
}
}
if !hasDetection || !hasSystem {
t.Errorf("remaining event types are %v, want Detection and System", types)
}
}
func TestRunArchiveJobEmpty(t *testing.T) {
db := openTestDB(t)
defer db.Close()
// Run archive job on empty database
err := RunArchiveJob(db)
if err != nil {
t.Fatalf("RunArchiveJob() error = %v", err)
}
// Verify nothing broke
var count int
err = db.QueryRow("SELECT COUNT(*) FROM events").Scan(&count)
if err != nil {
t.Fatalf("failed to count events: %v", err)
}
if count != 0 {
t.Errorf("events table has %d rows, want 0", count)
}
}
func TestGetEventByID(t *testing.T) {
db := openTestDB(t)
defer db.Close()
// Insert a test event
id, err := InsertEvent(db, Event{
Type: EventTypeDetection,
Zone: "Kitchen",
Person: "Alice",
DetailJSON: `{"test":true}`,
Severity: SeverityInfo,
})
if err != nil {
t.Fatalf("failed to insert event: %v", err)
}
tests := []struct {
name string
id int64
wantErr bool
check func(*testing.T, *Event)
}{
{
name: "existing event",
id: id,
check: func(t *testing.T, e *Event) {
if e.Type != EventTypeDetection {
t.Errorf("Type = %v, want %v", e.Type, EventTypeDetection)
}
if e.Zone != "Kitchen" {
t.Errorf("Zone = %q, want %q", e.Zone, "Kitchen")
}
if e.Person != "Alice" {
t.Errorf("Person = %q, want %q", e.Person, "Alice")
}
},
},
{
name: "non-existent event",
id: 99999,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
event, err := GetEventByID(db, tt.id)
if (err != nil) != tt.wantErr {
t.Errorf("GetEventByID() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !tt.wantErr && tt.check != nil {
tt.check(t, event)
}
})
}
}
func TestInsertDetectionEvent(t *testing.T) {
db := openTestDB(t)
defer db.Close()
id, err := InsertDetectionEvent(db, "Kitchen", "Alice", 42, map[string]interface{}{
"confidence": 0.85,
"position": map[string]interface{}{
"x": 1.2,
"y": 3.4,
"z": 0.9,
},
})
if err != nil {
t.Fatalf("InsertDetectionEvent() error = %v", err)
}
if id == 0 {
t.Error("InsertDetectionEvent() returned zero ID")
}
// Verify the event was inserted correctly
var e Event
err = db.QueryRow("SELECT type, zone, person, blob_id, detail_json, severity FROM events WHERE id = ?", id).
Scan(&e.Type, &e.Zone, &e.Person, &e.BlobID, &e.DetailJSON, &e.Severity)
if err != nil {
t.Fatalf("failed to query event: %v", err)
}
if e.Type != EventTypeDetection {
t.Errorf("Type = %v, want %v", e.Type, EventTypeDetection)
}
if e.Zone != "Kitchen" {
t.Errorf("Zone = %q, want %q", e.Zone, "Kitchen")
}
if e.Person != "Alice" {
t.Errorf("Person = %q, want %q", e.Person, "Alice")
}
if e.BlobID != 42 {
t.Errorf("BlobID = %d, want 42", e.BlobID)
}
if e.Severity != SeverityInfo {
t.Errorf("Severity = %v, want %v", e.Severity, SeverityInfo)
}
}
func TestInsertAlertEvent(t *testing.T) {
db := openTestDB(t)
defer db.Close()
id, err := InsertAlertEvent(db, EventTypeFallAlert, "Bathroom", "Alice", SeverityAlert, map[string]interface{}{
"z_velocity": -1.8,
"z": 0.3,
})
if err != nil {
t.Fatalf("InsertAlertEvent() error = %v", err)
}
if id == 0 {
t.Error("InsertAlertEvent() returned zero ID")
}
// Verify the event was inserted correctly
var severity string
err = db.QueryRow("SELECT severity FROM events WHERE id = ?", id).Scan(&severity)
if err != nil {
t.Fatalf("failed to query event: %v", err)
}
if severity != string(SeverityAlert) {
t.Errorf("Severity = %q, want %q", severity, SeverityAlert)
}
}
func TestInsertSystemEvent(t *testing.T) {
db := openTestDB(t)
defer db.Close()
id, err := InsertSystemEvent(db, "Mothership started", nil)
if err != nil {
t.Fatalf("InsertSystemEvent() error = %v", err)
}
if id == 0 {
t.Error("InsertSystemEvent() returned zero ID")
}
// Verify the event was inserted correctly
var eType string
var detailJSON string
err = db.QueryRow("SELECT type, detail_json FROM events WHERE id = ?", id).Scan(&eType, &detailJSON)
if err != nil {
t.Fatalf("failed to query event: %v", err)
}
if eType != string(EventTypeSystem) {
t.Errorf("Type = %q, want %q", eType, EventTypeSystem)
}
}