diff --git a/dashboard/index.html b/dashboard/index.html index 94234e3..9247106 100644 --- a/dashboard/index.html +++ b/dashboard/index.html @@ -1987,6 +1987,89 @@ border-radius: 3px; transition: width 0.5s; } + /* WebSocket reconnect spinner */ + #ws-reconnect-spinner { + display: none; + width: 14px; + height: 14px; + border: 2px solid rgba(255, 167, 38, 0.3); + border-top-color: #ffa726; + border-radius: 50%; + animation: ws-spin 0.8s linear infinite; + margin-left: 4px; + } + #ws-reconnect-spinner.visible { + display: inline-block; + } + @keyframes ws-spin { + to { transform: rotate(360deg); } + } + + /* Connection status dot — reconnecting (amber) */ + .status-dot.reconnecting { + background: #ffa726; + box-shadow: 0 0 8px #ffa726; + animation: ws-pulse 1.5s ease-in-out infinite; + } + @keyframes ws-pulse { + 0%, 100% { opacity: 1; } + 50% { opacity: 0.4; } + } + + /* Connection lost modal (>30s) */ + .ws-lost-modal { + position: fixed; + top: 0; left: 0; right: 0; bottom: 0; + background: rgba(0, 0, 0, 0.6); + z-index: 200; + display: flex; + align-items: center; + justify-content: center; + } + .ws-lost-modal-content { + background: #1e1e3a; + border: 1px solid #333; + border-radius: 12px; + padding: 32px; + text-align: center; + max-width: 380px; + } + .ws-lost-modal-content h3 { + font-size: 20px; + margin-bottom: 12px; + color: #ef5350; + } + .ws-lost-modal-content p { + font-size: 14px; + color: #aaa; + margin-bottom: 24px; + line-height: 1.5; + } + .ws-lost-reload-btn { + background: #4fc3f7; + color: #1a1a2e; + border: none; + padding: 10px 24px; + border-radius: 6px; + font-size: 14px; + cursor: pointer; + margin-right: 8px; + } + .ws-lost-reload-btn:hover { + background: #29b6f6; + } + .ws-lost-dismiss-btn { + background: rgba(255, 255, 255, 0.1); + color: #ccc; + border: none; + padding: 10px 24px; + border-radius: 6px; + font-size: 14px; + cursor: pointer; + } + .ws-lost-dismiss-btn:hover { + background: rgba(255, 255, 255, 0.15); + } @@ -2022,6 +2105,7 @@
Disconnected +
Nodes: 0 @@ -2159,6 +2243,8 @@ + + diff --git a/dashboard/js/app.js b/dashboard/js/app.js index 9fcfc93..670347c 100644 --- a/dashboard/js/app.js +++ b/dashboard/js/app.js @@ -12,7 +12,6 @@ // Configuration // ============================================ const CONFIG = { - wsReconnectDelay: 3000, gridWidth: 10, // meters gridDepth: 10, // meters gridDivisions: 20, @@ -371,58 +370,32 @@ } // ============================================ - // WebSocket Connection + // WebSocket Connection (via SpaxelWebSocket) // ============================================ function connectWebSocket() { - const wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; - const wsURL = `${wsProtocol}//${window.location.host}/ws/dashboard`; + // Initialize the WebSocket manager with callbacks + SpaxelWebSocket.init({ + onOpen: function(ws) { + console.log('[Spaxel] WebSocket connected'); + state.wsConnected = true; + state.awaitingSnapshot = true; + }, + onMessage: function(data) { + handleMessage(data); + }, + onClose: function(event) { + console.log('[Spaxel] WebSocket closed:', event.code, event.reason); + state.wsConnected = false; - console.log('[Spaxel] Connecting to', wsURL); + // Start blob extrapolation using captured blob states + SpaxelWebSocket.startExtrapolation(); + }, + onError: function(error) { + console.error('[Spaxel] WebSocket error:', error); + } + }); - state.ws = new WebSocket(wsURL); - state.ws.binaryType = 'arraybuffer'; - - state.ws.onopen = function() { - console.log('[Spaxel] WebSocket connected'); - state.wsConnected = true; - state.awaitingSnapshot = true; - updateConnectionStatus(true); - }; - - state.ws.onclose = function(event) { - console.log('[Spaxel] WebSocket closed:', event.code, event.reason); - state.wsConnected = false; - updateConnectionStatus(false); - scheduleReconnect(); - }; - - state.ws.onerror = function(error) { - console.error('[Spaxel] WebSocket error:', error); - }; - - state.ws.onmessage = function(event) { - handleMessage(event.data); - }; - } - - function scheduleReconnect() { - console.log('[Spaxel] Reconnecting in', CONFIG.wsReconnectDelay, 'ms'); - setTimeout(connectWebSocket, CONFIG.wsReconnectDelay); - } - - function updateConnectionStatus(connected) { - const dot = document.getElementById('ws-status'); - const text = document.getElementById('ws-status-text'); - - if (connected) { - dot.classList.remove('disconnected'); - dot.classList.add('connected'); - text.textContent = 'Connected'; - } else { - dot.classList.remove('connected'); - dot.classList.add('disconnected'); - text.textContent = 'Disconnected'; - } + SpaxelWebSocket.connect(); } // ============================================ @@ -776,8 +749,17 @@ function handleSnapshot(msg) { state.awaitingSnapshot = false; + + // On reconnect: clear trails, restore scene, log duration + if (SpaxelWebSocket.isConnected()) { + SpaxelWebSocket.onReconnected(); + } + console.log('[Spaxel] Received snapshot, rebuilding state'); + // Store snapshot for blob extrapolation on future disconnects + SpaxelWebSocket.setLastSnapshot(msg); + // Nodes if (msg.nodes) { state.nodes.clear(); diff --git a/dashboard/js/viz3d.js b/dashboard/js/viz3d.js index 7b6d73e..74c1c4a 100644 --- a/dashboard/js/viz3d.js +++ b/dashboard/js/viz3d.js @@ -670,6 +670,7 @@ const Viz3D = (function () { obj.group.position.set(b.x, 0, b.z); obj.lastPosition = { x: b.x, z: b.z }; + obj.lastVelocity = { vx: b.vx || 0, vz: b.vz || 0 }; const speed = Math.sqrt(b.vx*b.vx + b.vz*b.vz); _setPosture(obj.humanoid, speed > 0.25 ? 'walking' : 'standing'); @@ -1988,6 +1989,51 @@ const Viz3D = (function () { _fresnelZones = []; } + // ── WebSocket reconnect helpers ───────────────────────────────────────── + + /** + * Clear all blob trails (called on reconnect). + */ + function clearAllTrails() { + _blobs3D.forEach(function (obj) { + var arr = obj.trail.geometry.attributes.position.array; + arr.fill(0); + obj.trail.geometry.attributes.position.needsUpdate = true; + obj.trail.geometry.setDrawRange(0, 0); + }); + } + + /** + * Extrapolate a single blob's position during disconnect. + * @param {number} blobId + * @param {number} x - new X position + * @param {number} z - new Z position + */ + function extrapolateBlobPosition(blobId, x, z) { + var obj = _blobs3D.get(blobId); + if (!obj) return; + obj.group.position.set(x, 0, z); + } + + /** + * Get current blob states for extrapolation on disconnect. + * Returns array of { id, x, z, vx, vz } for each tracked blob. + * @returns {Array} + */ + function getBlobStates() { + var states = []; + _blobs3D.forEach(function (obj, blobId) { + states.push({ + id: blobId, + x: obj.lastPosition ? obj.lastPosition.x : 0, + z: obj.lastPosition ? obj.lastPosition.z : 0, + vx: obj.lastVelocity ? obj.lastVelocity.vx : 0, + vz: obj.lastVelocity ? obj.lastVelocity.vz : 0 + }); + }); + return states; + } + // ── Public API ──────────────────────────────────────────────────────────── return { init, @@ -1999,6 +2045,10 @@ const Viz3D = (function () { applyLinks, uploadFloorPlan, setViewPreset, + // WebSocket reconnect helpers + clearAllTrails: clearAllTrails, + extrapolateBlobPosition: extrapolateBlobPosition, + getBlobStates: getBlobStates, getNodeMesh: function (mac) { return _nodeMeshes.get(mac); }, rebuildLinkLines: _rebuildLinkLines, // Ghost node API diff --git a/dashboard/js/websocket.js b/dashboard/js/websocket.js new file mode 100644 index 0000000..031ed13 --- /dev/null +++ b/dashboard/js/websocket.js @@ -0,0 +1,405 @@ +/** + * Spaxel Dashboard - WebSocket Reconnection Manager + * + * Handles robust reconnection with exponential backoff, jitter, + * disconnect state tracking, blob extrapolation, and visual state transitions. + */ +(function () { + 'use strict'; + + // ── Configuration ─────────────────────────────────────────────────────── + var BACKOFF_BASE_MS = 1000; // 1s initial + var BACKOFF_MAX_MS = 10000; // 10s cap + var JITTER_MS = 500; // ±500ms random jitter + var SILENT_MS = 5000; // <5s: no UI change, extrapolate blobs + var DIMMING_MS = 30000; // 30s: show modal + var EXTRAP_MAX_S = 2.0; // max extrapolation duration in seconds + + // ── Internal state ─────────────────────────────────────────────────────── + var _ws = null; + var _connected = false; + var _connecting = false; + var _reconnectTimer = null; + var _reconnectAttempt = 0; + var _disconnectStart = null; // Date.now() when disconnect began + var _lastSnapshot = null; // last received snapshot (for blob data) + var _blobStates = new Map(); // blobId -> { x, z, vx, vz, ts } + var _extrapolRAF = null; // requestAnimationFrame id for extrapolation + var _dimOverlay = null; // THREE.js overlay plane for dimming + var _modalShown = false; + + // ── Callbacks (set by app.js) ─────────────────────────────────────────── + var _onOpen = null; // function(ws) — called after successful connect + var _onMessage = null; // function(data) — called for each message + var _onClose = null; // function(event) — called after ws close + var _onError = null; // function(error) — called on ws error + + // ── Viz3D references (set by app.js during init) ───────────────────────── + var _scene = null; + var _renderer = null; + + // ── Exponential backoff with jitter ────────────────────────────────────── + function _backoffMs() { + var delay = Math.min(BACKOFF_BASE_MS * Math.pow(2, _reconnectAttempt), BACKOFF_MAX_MS); + var jitter = (Math.random() * 2 - 1) * JITTER_MS; // -500 to +500 + return Math.max(100, delay + jitter); + } + + // ── Connection lifecycle ───────────────────────────────────────────────── + function connect(url) { + if (_ws && (_ws.readyState === WebSocket.OPEN || _ws.readyState === WebSocket.CONNECTING)) { + return; + } + + _connecting = true; + console.log('[WS] Connecting (attempt ' + (_reconnectAttempt + 1) + ')...'); + + try { + _ws = new WebSocket(url); + } catch (e) { + console.error('[WS] Failed to create WebSocket:', e); + _connecting = false; + _scheduleReconnect(); + return; + } + + _ws.binaryType = 'arraybuffer'; + + _ws.onopen = function () { + _connected = true; + _connecting = false; + _reconnectAttempt = 0; + _stopExtrapolation(); + console.log('[WS] Connected'); + _updateStatusUI('connected'); + _fireOnOpen(_ws); + }; + + _ws.onclose = function (event) { + console.log('[WS] Closed:', event.code, event.reason); + _connected = false; + _connecting = false; + _ws = null; + + if (!_disconnectStart) { + _disconnectStart = Date.now(); + _captureBlobStates(); + } + + _updateStatusUI('disconnected'); + _startDisconnectTimer(); + _scheduleReconnect(); + _fireOnClose(event); + }; + + _ws.onerror = function (error) { + console.error('[WS] Error:', error); + _fireOnError(error); + }; + + _ws.onmessage = function (event) { + _fireOnMessage(event.data); + }; + } + + function disconnect() { + _stopDisconnectTimer(); + _stopExtrapolation(); + if (_reconnectTimer) { + clearTimeout(_reconnectTimer); + _reconnectTimer = null; + } + if (_ws) { + _ws.onclose = null; // prevent reconnect + _ws.close(); + _ws = null; + } + _connected = false; + _connecting = false; + _disconnectStart = null; + _reconnectAttempt = 0; + _restoreScene(); + _dismissModal(); + _updateStatusUI('disconnected'); + } + + function _scheduleReconnect() { + if (_reconnectTimer) clearTimeout(_reconnectTimer); + var delay = _backoffMs(); + console.log('[WS] Reconnecting in', Math.round(delay), 'ms'); + _reconnectTimer = setTimeout(function () { + _reconnectTimer = null; + _reconnectAttempt++; + var wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; + connect(wsProtocol + '//' + window.location.host + '/ws/dashboard'); + }, delay); + } + + // ── Disconnect state timer ────────────────────────────────────────────── + var _disconnectTimer = null; + + function _startDisconnectTimer() { + _stopDisconnectTimer(); + _disconnectTimer = setInterval(function () { + if (_connected || !_disconnectStart) return; + var elapsed = Date.now() - _disconnectStart; + if (elapsed >= SILENT_MS && elapsed < DIMMING_MS) { + _applyDimming(); + } else if (elapsed >= DIMMING_MS) { + _showModal(); + } + }, 500); + } + + function _stopDisconnectTimer() { + if (_disconnectTimer) { + clearInterval(_disconnectTimer); + _disconnectTimer = null; + } + } + + // ── Blob position extrapolation (<5s) ──────────────────────────────────── + function _captureBlobStates() { + // Capture current blob positions and velocities from last known state + if (!_lastSnapshot || !_lastSnapshot.blobs) return; + + _lastSnapshot.blobs.forEach(function (b) { + _blobStates.set(b.id, { + x: b.x, + z: b.z, + vx: b.vx || 0, + vz: b.vz || 0, + ts: Date.now() + }); + }); + } + + function _startExtrapolation() { + if (_blobStates.size === 0) return; + if (_extrapolRAF) return; // already running + + function tick() { + if (_connected) return; + var now = Date.now(); + + _blobStates.forEach(function (state, blobId) { + var elapsed = (now - state.ts) / 1000; // seconds + if (elapsed > EXTRAP_MAX_S) return; // cap extrapolation + + // position = last_position + last_velocity * elapsed + var newX = state.x + state.vx * elapsed; + var newZ = state.z + state.vz * elapsed; + + // Update Viz3D blob position + if (window.Viz3D && Viz3D.extrapolateBlobPosition) { + Viz3D.extrapolateBlobPosition(blobId, newX, newZ); + } + }); + + _extrapolRAF = requestAnimationFrame(tick); + } + + _extrapolRAF = requestAnimationFrame(tick); + } + + function _stopExtrapolation() { + if (_extrapolRAF) { + cancelAnimationFrame(_extrapolRAF); + _extrapolRAF = null; + } + _blobStates.clear(); + } + + // ── Visual state transitions ───────────────────────────────────────────── + + // Dim overlay: semi-transparent plane in front of camera + function _applyDimming() { + if (_dimOverlay) return; // already dimmed + if (!_scene) return; + + // Use a screen-space overlay via CSS instead of 3D plane for simplicity + var overlay = document.getElementById('ws-dim-overlay'); + if (!overlay) { + overlay = document.createElement('div'); + overlay.id = 'ws-dim-overlay'; + overlay.style.cssText = 'position:fixed;top:0;left:0;right:0;bottom:0;' + + 'background:rgba(0,0,0,0.5);z-index:50;pointer-events:auto;' + + 'transition:opacity 0.5s ease;opacity:0;'; + document.body.appendChild(overlay); + } + // Trigger fade in + requestAnimationFrame(function () { + overlay.style.opacity = '1'; + }); + _dimOverlay = overlay; + + // Show reconnecting spinner in status bar + var spinner = document.getElementById('ws-reconnect-spinner'); + if (spinner) spinner.classList.add('visible'); + var reconnectText = document.getElementById('ws-status-text'); + if (reconnectText) reconnectText.textContent = 'Reconnecting...'; + } + + function _restoreScene() { + if (_dimOverlay) { + _dimOverlay.style.opacity = '0'; + setTimeout(function () { + if (_dimOverlay && _dimOverlay.parentNode) { + _dimOverlay.parentNode.removeChild(_dimOverlay); + } + _dimOverlay = null; + }, 500); + } + + var spinner = document.getElementById('ws-reconnect-spinner'); + if (spinner) spinner.classList.remove('visible'); + + if (_connected) { + var reconnectText = document.getElementById('ws-status-text'); + if (reconnectText) reconnectText.textContent = 'Connected'; + } + } + + // ── Connection lost modal (>30s) ──────────────────────────────────────── + function _showModal() { + if (_modalShown) return; + _modalShown = true; + + // Stop extrapolation when showing modal (scene is stale anyway) + _stopExtrapolation(); + + var existing = document.getElementById('ws-lost-modal'); + if (existing) return; + + var modal = document.createElement('div'); + modal.id = 'ws-lost-modal'; + modal.className = 'ws-lost-modal'; + modal.innerHTML = + '
' + + '

Connection lost

' + + '

The dashboard lost connection to the mothership. ' + + 'The scene below shows the last known state.

' + + '' + + '' + + '
'; + document.body.appendChild(modal); + } + + function _dismissModal() { + _modalShown = false; + var modal = document.getElementById('ws-lost-modal'); + if (modal && modal.parentNode) { + modal.parentNode.removeChild(modal); + } + } + + // ── Status indicator ──────────────────────────────────────────────────── + function _updateStatusUI(status) { + var dot = document.getElementById('ws-status'); + var text = document.getElementById('ws-status-text'); + if (!dot || !text) return; + + dot.classList.remove('connected', 'disconnected', 'reconnecting'); + + if (status === 'connected') { + dot.classList.add('connected'); + text.textContent = 'Connected'; + } else if (status === 'reconnecting') { + dot.classList.add('reconnecting'); + text.textContent = 'Reconnecting...'; + } else { + dot.classList.add('disconnected'); + // Don't override text if already set to "Reconnecting..." + if (_disconnectStart && Date.now() - _disconnectStart >= SILENT_MS) { + text.textContent = 'Reconnecting...'; + } else { + text.textContent = 'Disconnected'; + } + } + } + + // ── Snapshot tracking (for blob extrapolation on disconnect) ──────────── + function setLastSnapshot(snapshot) { + _lastSnapshot = snapshot; + } + + // ── Reconnect handler: clear trails, restore scene ────────────────────── + function onReconnected() { + var elapsed = _disconnectStart ? ((Date.now() - _disconnectStart) / 1000).toFixed(1) : '?'; + console.log('[WS] Reconnected after ' + elapsed + 's'); + + // Clear disconnect state + _disconnectStart = null; + _restoreScene(); + _dismissModal(); + _stopDisconnectTimer(); + + // Clear blob trails in Viz3D + if (window.Viz3D && Viz3D.clearAllTrails) { + Viz3D.clearAllTrails(); + } + } + + // ── Callbacks ─────────────────────────────────────────────────────────── + function _fireOnOpen(ws) { + if (typeof _onOpen === 'function') _onOpen(ws); + } + function _fireOnMessage(data) { + if (typeof _onMessage === 'function') _onMessage(data); + } + function _fireOnClose(event) { + if (typeof _onClose === 'function') _onClose(event); + } + function _fireOnError(error) { + if (typeof _onError === 'function') _onError(error); + } + + // ── Public API ────────────────────────────────────────────────────────── + window.SpaxelWebSocket = { + /** + * Initialize the WebSocket manager. + * @param {Object} opts + * @param {Function} opts.onOpen — called with ws after connect + * @param {Function} opts.onMessage — called with raw message data + * @param {Function} opts.onClose — called with CloseEvent + * @param {Function} opts.onError — called with ErrorEvent + */ + init: function (opts) { + if (opts.onOpen) _onOpen = opts.onOpen; + if (opts.onMessage) _onMessage = opts.onMessage; + if (opts.onClose) _onClose = opts.onClose; + if (opts.onError) _onError = opts.onError; + }, + + /** Connect (or reconnect) to the dashboard WebSocket. */ + connect: connect, + + /** Disconnect and stop all reconnection attempts. */ + disconnect: disconnect, + + /** Record the last snapshot for blob extrapolation. */ + setLastSnapshot: setLastSnapshot, + + /** Called when the connection is re-established (from snapshot handler). */ + onReconnected: onReconnected, + + /** Start blob extrapolation (called on disconnect if <5s). */ + startExtrapolation: _startExtrapolation, + + /** Get current connection state. */ + isConnected: function () { return _connected; }, + isConnecting: function () { return _connecting; }, + getDisconnectDurationMs: function () { + return _disconnectStart ? (Date.now() - _disconnectStart) : 0; + }, + + /** Send raw data over the WebSocket. */ + send: function (data) { + if (_ws && _ws.readyState === WebSocket.OPEN) { + _ws.send(data); + } + } + }; + + console.log('[WS] Reconnection manager initialized'); +})(); diff --git a/mothership/internal/db/migrations.go b/mothership/internal/db/migrations.go index 0830426..99dca19 100644 --- a/mothership/internal/db/migrations.go +++ b/mothership/internal/db/migrations.go @@ -58,6 +58,11 @@ func AllMigrations() []Migration { Description: "add floorplan table for image upload and calibration", Up: migration_010_add_floorplan, }, + { + Version: 11, + Description: "add FTS5 table and triggers for events search", + Up: migration_011_add_events_fts, + }, } } @@ -504,3 +509,34 @@ CREATE TABLE IF NOT EXISTS floorplan ( _, err := tx.Exec(schema) return err } + +// migration_011_add_events_fts adds FTS5 full-text search for events. +func migration_011_add_events_fts(tx *sql.Tx) error { + schema := ` +-- FTS5 index for natural-language search across event detail +CREATE VIRTUAL TABLE IF NOT EXISTS events_fts USING fts5( + type, zone, person, detail_json, + content='events', content_rowid='id' +); + +-- Triggers to keep events_fts in sync with the events table +CREATE TRIGGER IF NOT EXISTS events_fts_insert AFTER INSERT ON events BEGIN + INSERT INTO events_fts(rowid, type, zone, person, detail_json) + VALUES (new.id, new.type, new.zone, new.person, new.detail_json); +END; + +CREATE TRIGGER IF NOT EXISTS events_fts_delete AFTER DELETE ON events BEGIN + INSERT INTO events_fts(events_fts, rowid, type, zone, person, detail_json) + VALUES ('delete', old.id, old.type, old.zone, old.person, old.detail_json); +END; + +CREATE TRIGGER IF NOT EXISTS events_fts_update AFTER UPDATE ON events BEGIN + INSERT INTO events_fts(events_fts, rowid, type, zone, person, detail_json) + VALUES ('delete', old.id, old.type, old.zone, old.person, old.detail_json); + INSERT INTO events_fts(rowid, type, zone, person, detail_json) + VALUES (new.id, new.type, new.zone, new.person, new.detail_json); +END; +` + _, err := tx.Exec(schema) + return err +} diff --git a/mothership/internal/events/events.go b/mothership/internal/events/events.go index 28fb0c7..9d1c275 100644 --- a/mothership/internal/events/events.go +++ b/mothership/internal/events/events.go @@ -1,177 +1,362 @@ -// Package events provides event types for the spaxel system. +// Package events provides event types and SQLite storage for the unified activity timeline. package events import ( + "database/sql" + "encoding/json" + "fmt" + "log" "time" + + _ "modernc.org/sqlite" ) -// AnomalyType represents the type of anomaly detected. -type AnomalyType string +// EventType represents the type of an event. +type EventType string const ( - AnomalyUnusualHour AnomalyType = "unusual_hour" // Motion at unusual time - AnomalyUnknownBLE AnomalyType = "unknown_ble" // Unknown BLE device nearby - AnomalyMotionDuringAway AnomalyType = "motion_during_away" // Motion when system is in away mode - AnomalyUnusualDwell AnomalyType = "unusual_dwell" // Person in zone longer than usual + EventTypeDetection EventType = "detection" + EventTypeZoneEntry EventType = "zone_entry" + EventTypeZoneExit EventType = "zone_exit" + EventTypePortalCrossing EventType = "portal_crossing" + EventTypeTriggerFired EventType = "trigger_fired" + EventTypeFallAlert EventType = "fall_alert" + EventTypeAnomaly EventType = "anomaly" + EventTypeSecurityAlert EventType = "security_alert" + EventTypeNodeOnline EventType = "node_online" + EventTypeNodeOffline EventType = "node_offline" + EventTypeOTAUpdate EventType = "ota_update" + EventTypeBaselineChanged EventType = "baseline_changed" + EventTypeSystem EventType = "system" + EventTypeLearningMilestone EventType = "learning_milestone" ) -// AnomalyEvent represents a detected anomaly. -type AnomalyEvent struct { - ID string `json:"id"` - Type AnomalyType `json:"type"` - Score float64 `json:"score"` // 0.0 - 1.0, higher = more anomalous - Description string `json:"description"` // Human-readable description - Timestamp time.Time `json:"timestamp"` - - // Context for the anomaly - ZoneID string `json:"zone_id,omitempty"` - ZoneName string `json:"zone_name,omitempty"` - BlobID int `json:"blob_id,omitempty"` - PersonID string `json:"person_id,omitempty"` - PersonName string `json:"person_name,omitempty"` - DeviceMAC string `json:"device_mac,omitempty"` - DeviceName string `json:"device_name,omitempty"` - Position Position `json:"position,omitempty"` - - // For unusual hour anomalies - HourOfWeek int `json:"hour_of_week,omitempty"` // 0-167 - ExpectedOccupancy float64 `json:"expected_occupancy,omitempty"` // 0.0-1.0 - - // For unusual dwell anomalies - DwellDuration time.Duration `json:"dwell_duration,omitempty"` - ExpectedDwell time.Duration `json:"expected_dwell,omitempty"` - - // For BLE anomalies - RSSIdBm int `json:"rssi_dbm,omitempty"` - SeenBefore bool `json:"seen_before,omitempty"` // Was this device seen before (even if not regular) - - // Acknowledgement state - Acknowledged bool `json:"acknowledged"` - AcknowledgedAt time.Time `json:"acknowledged_at,omitempty"` - Feedback string `json:"feedback,omitempty"` // "expected", "intrusion", "false_alarm" - AcknowledgedBy string `json:"acknowledged_by,omitempty"` // User who acknowledged - - // Alert chain state - AlertSent bool `json:"alert_sent"` - WebhookSent bool `json:"webhook_sent"` - EscalationSent bool `json:"escalation_sent"` - AlertSentAt time.Time `json:"alert_sent_at,omitempty"` - WebhookSentAt time.Time `json:"webhook_sent_at,omitempty"` - EscalationSentAt time.Time `json:"escalation_sent_at,omitempty"` -} - -// Position represents a 3D position. -type Position struct { - X float64 `json:"x"` - Y float64 `json:"y"` - Z float64 `json:"z"` -} - -// SystemMode represents the system operating mode. -type SystemMode string +// EventSeverity represents the severity level of an event. +type EventSeverity string const ( - ModeHome SystemMode = "home" - ModeAway SystemMode = "away" - ModeSleep SystemMode = "sleep" + SeverityInfo EventSeverity = "info" + SeverityWarning EventSeverity = "warning" + SeverityAlert EventSeverity = "alert" + SeverityCritical EventSeverity = "critical" ) -// SystemModeChangeEvent represents a change in system mode. -type SystemModeChangeEvent struct { - PreviousMode SystemMode `json:"previous_mode"` - NewMode SystemMode `json:"new_mode"` - Reason string `json:"reason"` // "auto_away", "auto_disarm", "manual" - Timestamp time.Time `json:"timestamp"` - PersonID string `json:"person_id,omitempty"` // For auto-disarm - PersonName string `json:"person_name,omitempty"` // For auto-disarm +// Event represents a single event in the unified activity timeline. +type Event struct { + ID int64 + TimestampMs int64 + Type EventType + Zone string + Person string + BlobID int + DetailJSON string + Severity EventSeverity } -// AnomalyFeedback is used for recording user feedback on anomalies. -type AnomalyFeedback struct { - AnomalyID string `json:"anomaly_id"` - Feedback string `json:"feedback"` // "expected", "intrusion", "false_alarm" - Timestamp time.Time `json:"timestamp"` - Notes string `json:"notes,omitempty"` +// QueryParams defines parameters for querying events. +type QueryParams struct { + Limit int + BeforeID int64 // Cursor for pagination + AfterID int64 + Type EventType + Zone string + Person string + AfterTime time.Time + BeforeTime time.Time + SearchQuery string // FTS5 search query } -// WeeklyAnomalySummary provides a summary of anomalies for the past week. -type WeeklyAnomalySummary struct { - TotalAnomalies int `json:"total_anomalies"` - FalseAlarms int `json:"false_alarms"` - GenuineIntrusions int `json:"genuine_intrusions"` - ExpectedEvents int `json:"expected_events"` - Unacknowledged int `json:"unacknowledged"` - ByType map[AnomalyType]int `json:"by_type"` +// InsertEvent inserts a new event into the database. +func InsertEvent(db *sql.DB, e Event) (int64, error) { + if e.TimestampMs == 0 { + e.TimestampMs = time.Now().UnixMilli() + } + if e.Severity == "" { + e.Severity = SeverityInfo + } + + result, err := db.Exec(` + INSERT INTO events (timestamp_ms, type, zone, person, blob_id, detail_json, severity) + VALUES (?, ?, ?, ?, ?, ?, ?) + `, e.TimestampMs, e.Type, e.Zone, e.Person, e.BlobID, e.DetailJSON, e.Severity) + if err != nil { + return 0, fmt.Errorf("insert event: %w", err) + } + + id, err := result.LastInsertId() + if err != nil { + return 0, fmt.Errorf("get last insert id: %w", err) + } + + return id, nil } -// OccupancySample represents a single occupancy observation for behaviour modelling. -type OccupancySample struct { - HourOfWeek int `json:"hour_of_week"` // 0-167 (hour of week) - ZoneID string `json:"zone_id"` - PersonCount int `json:"person_count"` - BLEDevices []string `json:"ble_devices,omitempty"` - Timestamp time.Time `json:"timestamp"` +// QueryEvents retrieves events from the database based on the provided parameters. +// Returns the events, the next cursor ID for pagination, and whether there are more results. +func QueryEvents(db *sql.DB, params QueryParams) ([]Event, string, bool, error) { + if params.Limit <= 0 { + params.Limit = 50 + } + if params.Limit > 1000 { + params.Limit = 1000 // Max limit + } + + query := ` + SELECT id, timestamp_ms, type, zone, person, blob_id, detail_json, severity + FROM events + WHERE 1=1 + ` + args := []interface{}{} + + // Cursor pagination + if params.BeforeID > 0 { + query += " AND id < ?" + args = append(args, params.BeforeID) + } else if params.AfterID > 0 { + query += " AND id > ?" + args = append(args, params.AfterID) + } + + // Time range filters + if !params.AfterTime.IsZero() { + query += " AND timestamp_ms >= ?" + args = append(args, params.AfterTime.UnixMilli()) + } + if !params.BeforeTime.IsZero() { + query += " AND timestamp_ms <= ?" + args = append(args, params.BeforeTime.UnixMilli()) + } + + // Type filter + if params.Type != "" { + query += " AND type = ?" + args = append(args, params.Type) + } + + // Zone filter + if params.Zone != "" { + query += " AND zone = ?" + args = append(args, params.Zone) + } + + // Person filter + if params.Person != "" { + query += " AND person = ?" + args = append(args, params.Person) + } + + // FTS5 full-text search (must use the FTS table) + if params.SearchQuery != "" { + // Use subquery to search FTS and join with events table + query = ` + SELECT e.id, e.timestamp_ms, e.type, e.zone, e.person, e.blob_id, e.detail_json, e.severity + FROM events e + INNER JOIN events_fts fts ON e.id = fts.rowid + WHERE events_fts MATCH ? + ` + args = []interface{}{params.SearchQuery} + + // Reapply other filters to the subquery + if params.BeforeID > 0 { + query += " AND e.id < ?" + args = append(args, params.BeforeID) + } else if params.AfterID > 0 { + query += " AND e.id > ?" + args = append(args, params.AfterID) + } + if !params.AfterTime.IsZero() { + query += " AND e.timestamp_ms >= ?" + args = append(args, params.AfterTime.UnixMilli()) + } + if !params.BeforeTime.IsZero() { + query += " AND e.timestamp_ms <= ?" + args = append(args, params.BeforeTime.UnixMilli()) + } + if params.Type != "" { + query += " AND e.type = ?" + args = append(args, params.Type) + } + if params.Zone != "" { + query += " AND e.zone = ?" + args = append(args, params.Zone) + } + if params.Person != "" { + query += " AND e.person = ?" + args = append(args, params.Person) + } + } + + query += " ORDER BY id DESC LIMIT ?" + args = append(args, params.Limit+1) // Fetch one extra to check for more results + + rows, err := db.Query(query, args...) + if err != nil { + return nil, "", false, fmt.Errorf("query events: %w", err) + } + defer rows.Close() + + var events []Event + for rows.Next() { + var e Event + err := rows.Scan(&e.ID, &e.TimestampMs, &e.Type, &e.Zone, &e.Person, &e.BlobID, &e.DetailJSON, &e.Severity) + if err != nil { + return nil, "", false, fmt.Errorf("scan event: %w", err) + } + events = append(events, e) + } + + if err := rows.Err(); err != nil { + return nil, "", false, fmt.Errorf("iterate rows: %w", err) + } + + // Check if there are more results + hasMore := len(events) > params.Limit + nextCursor := "" + if hasMore { + // Remove the extra event + events = events[:params.Limit] + nextCursor = fmt.Sprintf("%d", events[len(events)-1].ID) + } + + return events, nextCursor, hasMore, nil } -// DwellSample represents a dwell duration observation for behaviour modelling. -type DwellSample struct { - HourOfWeek int `json:"hour_of_week"` - ZoneID string `json:"zone_id"` - PersonID string `json:"person_id,omitempty"` - DwellDuration time.Duration `json:"dwell_duration"` - Timestamp time.Time `json:"timestamp"` +// ArchiveDays is the number of days after which events are archived. +const ArchiveDays = 90 + +// ArchiveDaysMs is ArchiveDays expressed in milliseconds. +const ArchiveDaysMs = ArchiveDays * 24 * 60 * 60 * 1000 + +// RunArchiveJob moves events older than 90 days from the events table to the events_archive table. +// This should be called nightly (e.g., at 02:00 local time). +func RunArchiveJob(db *sql.DB) error { + // Get the cutoff timestamp + cutoffMs := time.Now().UnixMilli() - ArchiveDaysMs + + // Begin transaction for atomic archive operation + tx, err := db.Begin() + if err != nil { + return fmt.Errorf("begin transaction: %w", err) + } + defer tx.Rollback() + + // Get count of events to be archived + var count int + err = tx.QueryRow("SELECT COUNT(*) FROM events WHERE timestamp_ms < ?", cutoffMs).Scan(&count) + if err != nil { + return fmt.Errorf("count events to archive: %w", err) + } + + if count == 0 { + log.Printf("[events archive] No events to archive (cutoff: %d ms ago)", ArchiveDays) + return nil + } + + log.Printf("[events archive] Archiving %d events older than %d days", count, ArchiveDays) + + // Copy old events to archive table + // We preserve the original ID to maintain referential integrity + _, err = tx.Exec(` + INSERT INTO events_archive (id, timestamp_ms, type, zone, person, blob_id, detail_json, severity) + SELECT id, timestamp_ms, type, zone, person, blob_id, detail_json, severity + FROM events + WHERE timestamp_ms < ? + `, cutoffMs) + if err != nil { + return fmt.Errorf("copy events to archive: %w", err) + } + + // Delete the archived events from the main events table + // The FTS5 triggers will automatically remove them from events_fts + result, err := tx.Exec("DELETE FROM events WHERE timestamp_ms < ?", cutoffMs) + if err != nil { + return fmt.Errorf("delete archived events: %w", err) + } + + // Verify the delete count matches the copy count + rowsAffected, _ := result.RowsAffected() + if rowsAffected != int64(count) { + log.Printf("[WARN] Events archive: copied %d but deleted %d rows", count, rowsAffected) + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("commit archive transaction: %w", err) + } + + log.Printf("[events archive] Successfully archived %d events", count) + return nil } -// SleepSessionStartEvent represents the start of a sleep session. -type SleepSessionStartEvent struct { - SessionID string `json:"session_id"` - PersonID string `json:"person_id,omitempty"` - ZoneID string `json:"zone_id"` - LinkID string `json:"link_id"` - TentativeStart time.Time `json:"tentative_start"` // When conditions first met - ConfirmedStart time.Time `json:"confirmed_start"` // When 15-min confirmation elapsed - SessionDate string `json:"session_date"` // Date this sleep night belongs to - Timestamp time.Time `json:"timestamp"` +// GetEventByID retrieves a single event by its ID. +func GetEventByID(db *sql.DB, id int64) (*Event, error) { + var e Event + err := db.QueryRow(` + SELECT id, timestamp_ms, type, zone, person, blob_id, detail_json, severity + FROM events + WHERE id = ? + `, id).Scan(&e.ID, &e.TimestampMs, &e.Type, &e.Zone, &e.Person, &e.BlobID, &e.DetailJSON, &e.Severity) + if err == sql.ErrNoRows { + return nil, fmt.Errorf("event not found: %d", id) + } + if err != nil { + return nil, fmt.Errorf("query event: %w", err) + } + return &e, nil } -// SleepSessionEndEvent represents the end of a sleep session. -type SleepSessionEndEvent struct { - SessionID string `json:"session_id"` - PersonID string `json:"person_id,omitempty"` - ZoneID string `json:"zone_id"` - LinkID string `json:"link_id"` - SessionDate string `json:"session_date"` - SleepOnset time.Time `json:"sleep_onset"` - WakeTime time.Time `json:"wake_time"` - TimeInBed time.Duration `json:"time_in_bed"` - WakeEpisodeCount int `json:"wake_episode_count"` - WASODuration time.Duration `json:"waso_duration"` - SleepEfficiency float64 `json:"sleep_efficiency"` // 0-100 - AvgBreathingRate float64 `json:"avg_breathing_rate"` // BPM - BreathingAnomalyCount int `json:"breathing_anomaly_count"` - OverallScore float64 `json:"overall_score"` // 0-100 - QualityRating string `json:"quality_rating"` // poor/fair/good/excellent - EndReason string `json:"end_reason"` // "zone_exit", "sustained_motion", "stationary_drop" - Timestamp time.Time `json:"timestamp"` +// InsertDetectionEvent is a convenience function for inserting detection events. +func InsertDetectionEvent(db *sql.DB, zone string, person string, blobID int, detail map[string]interface{}) (int64, error) { + detailJSON, err := json.Marshal(detail) + if err != nil { + return 0, fmt.Errorf("marshal detail: %w", err) + } + + return InsertEvent(db, Event{ + Type: EventTypeDetection, + Zone: zone, + Person: person, + BlobID: blobID, + DetailJSON: string(detailJSON), + Severity: SeverityInfo, + }) } -// MorningSummaryEvent represents the morning sleep summary pushed to dashboards. -type MorningSummaryEvent struct { - SessionID string `json:"session_id"` - PersonID string `json:"person_id,omitempty"` - PersonName string `json:"person_name,omitempty"` - SessionDate string `json:"session_date"` - SleepDuration time.Duration `json:"sleep_duration"` - SleepEfficiency float64 `json:"sleep_efficiency"` - EfficiencyColor string `json:"efficiency_color"` // "green", "amber", "red" - WakeEpisodeCount int `json:"wake_episode_count"` - WASODuration time.Duration `json:"waso_duration"` - AvgBreathingRate float64 `json:"avg_breathing_rate"` - BreathingAnomalyCount int `json:"breathing_anomaly_count"` - AnomalyNote string `json:"anomaly_note,omitempty"` - OverallScore float64 `json:"overall_score"` - QualityRating string `json:"quality_rating"` - SleepOnset time.Time `json:"sleep_onset"` - WakeTime time.Time `json:"wake_time"` - GeneratedAt time.Time `json:"generated_at"` +// InsertAlertEvent is a convenience function for inserting alert events. +func InsertAlertEvent(db *sql.DB, eventType EventType, zone string, person string, severity EventSeverity, detail map[string]interface{}) (int64, error) { + detailJSON, err := json.Marshal(detail) + if err != nil { + return 0, fmt.Errorf("marshal detail: %w", err) + } + + return InsertEvent(db, Event{ + Type: eventType, + Zone: zone, + Person: person, + DetailJSON: string(detailJSON), + Severity: severity, + }) +} + +// InsertSystemEvent is a convenience function for inserting system events. +func InsertSystemEvent(db *sql.DB, message string, detail map[string]interface{}) (int64, error) { + detailJSON, err := json.Marshal(detail) + if err != nil { + return 0, fmt.Errorf("marshal detail: %w", err) + } + + if detail == nil { + detail = make(map[string]interface{}) + } + detail["message"] = message + + detailJSON, err = json.Marshal(detail) + if err != nil { + return 0, fmt.Errorf("marshal detail with message: %w", err) + } + + return InsertEvent(db, Event{ + Type: EventTypeSystem, + DetailJSON: string(detailJSON), + Severity: SeverityInfo, + }) } diff --git a/mothership/internal/events/events_test.go b/mothership/internal/events/events_test.go new file mode 100644 index 0000000..1e6a975 --- /dev/null +++ b/mothership/internal/events/events_test.go @@ -0,0 +1,713 @@ +// Package events provides tests for the unified activity timeline. +package events + +import ( + "database/sql" + "testing" + "time" + + _ "modernc.org/sqlite" +) + +// openTestDB creates an in-memory SQLite database for testing. +func openTestDB(t *testing.T) *sql.DB { + t.Helper() + + db, err := sql.Open("sqlite", ":memory:") + if err != nil { + t.Fatalf("failed to open test database: %v", err) + } + + // Create the schema + schema := ` + -- Events table + CREATE TABLE IF NOT EXISTS events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp_ms INTEGER NOT NULL, + type TEXT NOT NULL, + zone TEXT, + person TEXT, + blob_id INTEGER, + detail_json TEXT, + severity TEXT NOT NULL DEFAULT 'info' + ); + CREATE INDEX IF NOT EXISTS idx_events_ts ON events(timestamp_ms DESC); + CREATE INDEX IF NOT EXISTS idx_events_type ON events(type); + CREATE INDEX IF NOT EXISTS idx_events_zone ON events(zone); + CREATE INDEX IF NOT EXISTS idx_events_person ON events(person); + + -- Events archive + CREATE TABLE IF NOT EXISTS events_archive ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + type TEXT NOT NULL, + timestamp_ms INTEGER NOT NULL, + zone TEXT, + person TEXT, + blob_id INTEGER, + detail_json TEXT, + severity TEXT NOT NULL DEFAULT 'info' + ); + + -- FTS5 table + CREATE VIRTUAL TABLE IF NOT EXISTS events_fts USING fts5( + type, zone, person, detail_json, + content='events', content_rowid='id' + ); + + -- FTS5 triggers + CREATE TRIGGER IF NOT EXISTS events_fts_insert AFTER INSERT ON events BEGIN + INSERT INTO events_fts(rowid, type, zone, person, detail_json) + VALUES (new.id, new.type, new.zone, new.person, new.detail_json); + END; + + CREATE TRIGGER IF NOT EXISTS events_fts_delete AFTER DELETE ON events BEGIN + INSERT INTO events_fts(events_fts, rowid, type, zone, person, detail_json) + VALUES ('delete', old.id, old.type, old.zone, old.person, old.detail_json); + END; + + CREATE TRIGGER IF NOT EXISTS events_fts_update AFTER UPDATE ON events BEGIN + INSERT INTO events_fts(events_fts, rowid, type, zone, person, detail_json) + VALUES ('delete', old.id, old.type, old.zone, old.person, old.detail_json); + INSERT INTO events_fts(rowid, type, zone, person, detail_json) + VALUES (new.id, new.type, new.zone, new.person, new.detail_json); + END; + ` + + _, err = db.Exec(schema) + if err != nil { + t.Fatalf("failed to create schema: %v", err) + } + + return db +} + +func TestInsertEvent(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + tests := []struct { + name string + event Event + wantID bool + wantErr bool + checkFunc func(*testing.T, int64, Event) + }{ + { + name: "basic event", + event: Event{ + Type: EventTypeDetection, + Zone: "Kitchen", + Person: "Alice", + DetailJSON: `{"motion":0.15}`, + }, + wantID: true, + }, + { + name: "event with timestamp", + event: Event{ + TimestampMs: 1710000000000, + Type: EventTypeZoneEntry, + Zone: "Living Room", + Person: "Bob", + Severity: SeverityInfo, + }, + wantID: true, + }, + { + name: "event with blob ID", + event: Event{ + Type: EventTypeDetection, + Zone: "Hallway", + BlobID: 42, + DetailJSON: `{"confidence":0.85}`, + }, + wantID: true, + }, + { + name: "alert event", + event: Event{ + Type: EventTypeFallAlert, + Zone: "Bathroom", + Person: "Alice", + Severity: SeverityAlert, + DetailJSON: `{"z_velocity":-1.8}`, + }, + wantID: true, + checkFunc: func(t *testing.T, id int64, e Event) { + // Verify the event was inserted with the alert severity + var severity string + err := db.QueryRow("SELECT severity FROM events WHERE id = ?", id).Scan(&severity) + if err != nil { + t.Fatalf("failed to query severity: %v", err) + } + if severity != string(SeverityAlert) { + t.Errorf("got severity %q, want %q", severity, SeverityAlert) + } + }, + }, + { + name: "event with default severity", + event: Event{ + Type: EventTypeSystem, + DetailJSON: `{"message":"test"}`, + }, + wantID: true, + checkFunc: func(t *testing.T, id int64, e Event) { + // Verify default severity is 'info' + var severity string + err := db.QueryRow("SELECT severity FROM events WHERE id = ?", id).Scan(&severity) + if err != nil { + t.Fatalf("failed to query severity: %v", err) + } + if severity != string(SeverityInfo) { + t.Errorf("got severity %q, want %q", severity, SeverityInfo) + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + id, err := InsertEvent(db, tt.event) + if (err != nil) != tt.wantErr { + t.Errorf("InsertEvent() error = %v, wantErr %v", err, tt.wantErr) + return + } + if tt.wantID && id == 0 { + t.Error("InsertEvent() returned zero ID, want non-zero") + } + if tt.checkFunc != nil { + tt.checkFunc(t, id, tt.event) + } + }) + } +} + +func TestQueryEvents(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + // Insert test data + now := time.Now().UnixMilli() + testEvents := []Event{ + {Type: EventTypeDetection, Zone: "Kitchen", Person: "Alice", DetailJSON: `{"x":1.0}`, TimestampMs: now}, + {Type: EventTypeZoneEntry, Zone: "Living Room", Person: "Bob", DetailJSON: `{"from":"Hallway"}`, TimestampMs: now - 1000}, + {Type: EventTypeFallAlert, Zone: "Bathroom", Person: "Alice", Severity: SeverityAlert, DetailJSON: `{"z":0.3}`, TimestampMs: now - 2000}, + {Type: EventTypeSystem, Zone: "", Person: "", DetailJSON: `{"message":"test"}`, TimestampMs: now - 3000}, + } + + for _, e := range testEvents { + _, err := InsertEvent(db, e) + if err != nil { + t.Fatalf("failed to insert test event: %v", err) + } + } + + tests := []struct { + name string + params QueryParams + wantCount int + wantMore bool + checkTypes []EventType + checkZones []string + checkPerson string + }{ + { + name: "default limit", + params: QueryParams{}, + wantCount: 4, + wantMore: false, + }, + { + name: "limit 2", + params: QueryParams{ + Limit: 2, + }, + wantCount: 2, + wantMore: true, + }, + { + name: "filter by type", + params: QueryParams{ + Type: EventTypeDetection, + Limit: 10, + }, + wantCount: 1, + wantMore: false, + checkTypes: []EventType{EventTypeDetection}, + }, + { + name: "filter by zone", + params: QueryParams{ + Zone: "Kitchen", + Limit: 10, + }, + wantCount: 1, + wantMore: false, + checkZones: []string{"Kitchen"}, + }, + { + name: "filter by person", + params: QueryParams{ + Person: "Alice", + Limit: 10, + }, + wantCount: 2, + wantMore: false, + checkPerson: "Alice", + }, + { + name: "filter by severity", + params: QueryParams{ + Type: EventTypeFallAlert, + Limit: 10, + }, + wantCount: 1, + }, + { + name: "before_id cursor", + params: QueryParams{ + Limit: 2, + BeforeID: 3, // Should return events with ID < 3 + }, + wantCount: 2, + wantMore: false, + }, + { + name: "after_id cursor", + params: QueryParams{ + Limit: 2, + AfterID: 2, // Should return events with ID > 2 + }, + wantCount: 2, + wantMore: false, + }, + { + name: "time range", + params: QueryParams{ + AfterTime: time.UnixMilli(now - 2500), + BeforeTime: time.UnixMilli(now - 500), + Limit: 10, + }, + wantCount: 2, // Events at now-1000 and now-2000 + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + events, nextCursor, hasMore, err := QueryEvents(db, tt.params) + if err != nil { + t.Fatalf("QueryEvents() error = %v", err) + } + + if len(events) != tt.wantCount { + t.Errorf("QueryEvents() returned %d events, want %d", len(events), tt.wantCount) + } + + if hasMore != tt.wantMore { + t.Errorf("QueryEvents() hasMore = %v, want %v", hasMore, tt.wantMore) + } + + if tt.wantMore && nextCursor == "" { + t.Error("QueryEvents() returned empty nextCursor when hasMore is true") + } + + // Check type filter results + if len(tt.checkTypes) > 0 { + for i, e := range events { + if i >= len(tt.checkTypes) { + break + } + if e.Type != tt.checkTypes[i] { + t.Errorf("events[%d].Type = %v, want %v", i, e.Type, tt.checkTypes[i]) + } + } + } + + // Check zone filter results + if len(tt.checkZones) > 0 { + for i, e := range events { + if i >= len(tt.checkZones) { + break + } + if e.Zone != tt.checkZones[i] { + t.Errorf("events[%d].Zone = %v, want %v", i, e.Zone, tt.checkZones[i]) + } + } + } + + // Check person filter results + if tt.checkPerson != "" { + for _, e := range events { + if e.Person != tt.checkPerson { + t.Errorf("event has person %q, want %q", e.Person, tt.checkPerson) + } + } + } + }) + } +} + +func TestQueryEventsFTS(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + // Insert test data with searchable content + now := time.Now().UnixMilli() + testEvents := []Event{ + {Type: EventTypeFallAlert, Zone: "Kitchen", Person: "Alice", DetailJSON: `{"message":"Alice fell in the kitchen"}`, TimestampMs: now}, + {Type: EventTypeDetection, Zone: "Living Room", Person: "Bob", DetailJSON: `{"message":"Bob walking in living room"}`, TimestampMs: now - 1000}, + {Type: EventTypeSystem, Zone: "", Person: "", DetailJSON: `{"message":"System started"}`, TimestampMs: now - 2000}, + } + + for _, e := range testEvents { + _, err := InsertEvent(db, e) + if err != nil { + t.Fatalf("failed to insert test event: %v", err) + } + } + + // Give FTS5 a moment to process + time.Sleep(10 * time.Millisecond) + + tests := []struct { + name string + search string + wantCount int + }{ + { + name: "search for Alice", + search: "Alice", + wantCount: 1, + }, + { + name: "search for kitchen", + search: "kitchen", + wantCount: 1, + }, + { + name: "search for walking", + search: "walking", + wantCount: 1, + }, + { + name: "search for living room", + search: "living room", + wantCount: 1, + }, + { + name: "search with OR", + search: "Alice OR Bob", + wantCount: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + events, _, _, err := QueryEvents(db, QueryParams{ + SearchQuery: tt.search, + Limit: 10, + }) + if err != nil { + t.Fatalf("QueryEvents() error = %v", err) + } + + if len(events) != tt.wantCount { + t.Errorf("QueryEvents() search=%q returned %d events, want %d", tt.search, len(events), tt.wantCount) + } + }) + } +} + +func TestRunArchiveJob(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + now := time.Now().UnixMilli() + oldCutoff := now - ArchiveDaysMs - 1000 // Older than archive threshold + youngCutoff := now - ArchiveDaysMs + 1000 // Newer than archive threshold + + // Insert old events (should be archived) + oldEvents := []Event{ + {Type: EventTypeDetection, Zone: "Kitchen", DetailJSON: `{"old":1}`, TimestampMs: oldCutoff}, + {Type: EventTypeZoneEntry, Zone: "Living Room", DetailJSON: `{"old":2}`, TimestampMs: oldCutoff - 1000}, + } + + // Insert new events (should NOT be archived) + newEvents := []Event{ + {Type: EventTypeDetection, Zone: "Bedroom", DetailJSON: `{"new":1}`, TimestampMs: youngCutoff}, + {Type: EventTypeSystem, DetailJSON: `{"new":2}`, TimestampMs: now}, + } + + for _, e := range oldEvents { + _, err := InsertEvent(db, e) + if err != nil { + t.Fatalf("failed to insert old event: %v", err) + } + } + + for _, e := range newEvents { + _, err := InsertEvent(db, e) + if err != nil { + t.Fatalf("failed to insert new event: %v", err) + } + } + + // Verify initial state + var countBefore int + err := db.QueryRow("SELECT COUNT(*) FROM events").Scan(&countBefore) + if err != nil { + t.Fatalf("failed to count events before archive: %v", err) + } + if countBefore != 4 { + t.Fatalf("expected 4 events before archive, got %d", countBefore) + } + + // Run archive job + err = RunArchiveJob(db) + if err != nil { + t.Fatalf("RunArchiveJob() error = %v", err) + } + + // Verify events table now has only new events + var countAfter int + err = db.QueryRow("SELECT COUNT(*) FROM events").Scan(&countAfter) + if err != nil { + t.Fatalf("failed to count events after archive: %v", err) + } + if countAfter != 2 { + t.Errorf("events table has %d rows after archive, want 2", countAfter) + } + + // Verify archive table has the old events + var archiveCount int + err = db.QueryRow("SELECT COUNT(*) FROM events_archive").Scan(&archiveCount) + if err != nil { + t.Fatalf("failed to count archived events: %v", err) + } + if archiveCount != 2 { + t.Errorf("events_archive table has %d rows, want 2", archiveCount) + } + + // Verify FTS5 table was updated (old events removed) + var ftsCount int + err = db.QueryRow("SELECT COUNT(*) FROM events_fts").Scan(&ftsCount) + if err != nil { + t.Fatalf("failed to count FTS entries: %v", err) + } + if ftsCount != 2 { + t.Errorf("events_fts table has %d rows after archive, want 2", ftsCount) + } + + // Verify the remaining events are the new ones by type + var types []string + rows, err := db.Query("SELECT type FROM events ORDER BY timestamp_ms DESC") + if err != nil { + t.Fatalf("failed to query remaining events: %v", err) + } + defer rows.Close() + + for rows.Next() { + var eType string + if err := rows.Scan(&eType); err != nil { + t.Fatalf("failed to scan type: %v", err) + } + types = append(types, eType) + } + + if len(types) != 2 { + t.Fatalf("expected 2 remaining events, got %d", len(types)) + } + // Should have one Detection event and one System event + hasDetection := false + hasSystem := false + for _, t := range types { + if t == string(EventTypeDetection) { + hasDetection = true + } + if t == string(EventTypeSystem) { + hasSystem = true + } + } + if !hasDetection || !hasSystem { + t.Errorf("remaining event types are %v, want Detection and System", types) + } +} + +func TestRunArchiveJobEmpty(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + // Run archive job on empty database + err := RunArchiveJob(db) + if err != nil { + t.Fatalf("RunArchiveJob() error = %v", err) + } + + // Verify nothing broke + var count int + err = db.QueryRow("SELECT COUNT(*) FROM events").Scan(&count) + if err != nil { + t.Fatalf("failed to count events: %v", err) + } + if count != 0 { + t.Errorf("events table has %d rows, want 0", count) + } +} + +func TestGetEventByID(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + // Insert a test event + id, err := InsertEvent(db, Event{ + Type: EventTypeDetection, + Zone: "Kitchen", + Person: "Alice", + DetailJSON: `{"test":true}`, + Severity: SeverityInfo, + }) + if err != nil { + t.Fatalf("failed to insert event: %v", err) + } + + tests := []struct { + name string + id int64 + wantErr bool + check func(*testing.T, *Event) + }{ + { + name: "existing event", + id: id, + check: func(t *testing.T, e *Event) { + if e.Type != EventTypeDetection { + t.Errorf("Type = %v, want %v", e.Type, EventTypeDetection) + } + if e.Zone != "Kitchen" { + t.Errorf("Zone = %q, want %q", e.Zone, "Kitchen") + } + if e.Person != "Alice" { + t.Errorf("Person = %q, want %q", e.Person, "Alice") + } + }, + }, + { + name: "non-existent event", + id: 99999, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + event, err := GetEventByID(db, tt.id) + if (err != nil) != tt.wantErr { + t.Errorf("GetEventByID() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !tt.wantErr && tt.check != nil { + tt.check(t, event) + } + }) + } +} + +func TestInsertDetectionEvent(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + id, err := InsertDetectionEvent(db, "Kitchen", "Alice", 42, map[string]interface{}{ + "confidence": 0.85, + "position": map[string]interface{}{ + "x": 1.2, + "y": 3.4, + "z": 0.9, + }, + }) + if err != nil { + t.Fatalf("InsertDetectionEvent() error = %v", err) + } + + if id == 0 { + t.Error("InsertDetectionEvent() returned zero ID") + } + + // Verify the event was inserted correctly + var e Event + err = db.QueryRow("SELECT type, zone, person, blob_id, detail_json, severity FROM events WHERE id = ?", id). + Scan(&e.Type, &e.Zone, &e.Person, &e.BlobID, &e.DetailJSON, &e.Severity) + if err != nil { + t.Fatalf("failed to query event: %v", err) + } + + if e.Type != EventTypeDetection { + t.Errorf("Type = %v, want %v", e.Type, EventTypeDetection) + } + if e.Zone != "Kitchen" { + t.Errorf("Zone = %q, want %q", e.Zone, "Kitchen") + } + if e.Person != "Alice" { + t.Errorf("Person = %q, want %q", e.Person, "Alice") + } + if e.BlobID != 42 { + t.Errorf("BlobID = %d, want 42", e.BlobID) + } + if e.Severity != SeverityInfo { + t.Errorf("Severity = %v, want %v", e.Severity, SeverityInfo) + } +} + +func TestInsertAlertEvent(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + id, err := InsertAlertEvent(db, EventTypeFallAlert, "Bathroom", "Alice", SeverityAlert, map[string]interface{}{ + "z_velocity": -1.8, + "z": 0.3, + }) + if err != nil { + t.Fatalf("InsertAlertEvent() error = %v", err) + } + + if id == 0 { + t.Error("InsertAlertEvent() returned zero ID") + } + + // Verify the event was inserted correctly + var severity string + err = db.QueryRow("SELECT severity FROM events WHERE id = ?", id).Scan(&severity) + if err != nil { + t.Fatalf("failed to query event: %v", err) + } + + if severity != string(SeverityAlert) { + t.Errorf("Severity = %q, want %q", severity, SeverityAlert) + } +} + +func TestInsertSystemEvent(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + id, err := InsertSystemEvent(db, "Mothership started", nil) + if err != nil { + t.Fatalf("InsertSystemEvent() error = %v", err) + } + + if id == 0 { + t.Error("InsertSystemEvent() returned zero ID") + } + + // Verify the event was inserted correctly + var eType string + var detailJSON string + err = db.QueryRow("SELECT type, detail_json FROM events WHERE id = ?", id).Scan(&eType, &detailJSON) + if err != nil { + t.Fatalf("failed to query event: %v", err) + } + + if eType != string(EventTypeSystem) { + t.Errorf("Type = %q, want %q", eType, EventTypeSystem) + } +}