From 4af8046acda369fdc04b167b68a5cde992b555b3 Mon Sep 17 00:00:00 2001 From: jedarden Date: Sun, 29 Mar 2026 14:50:36 -0400 Subject: [PATCH] feat(learning): implement detection feedback loop and accuracy tracking - Add feedback_store.go with SQLite storage for detection feedback and accuracy metrics - Add feedback_processor.go for background processing of user feedback - Add accuracy.go for weekly precision/recall/F1 metric computation - Add handler.go with REST API routes for feedback submission and accuracy retrieval - Wire learning package into main_phase6.go with background processing - Add dashboard/js/feedback.js with thumbs-up/down UI components - Add dashboard/js/accuracy.js with accuracy panel rendering and sparkline trends - Add comprehensive tests for feedback storage and accuracy computation Feedback UI provides: - Thumbs-up/down buttons for detection events - Feedback form with false positive/negative options - Missed detection reporting with position/zone selection - Motivational counter showing improvement from user corrections Accuracy panel shows: - Circular gauge with F1 score - Week-over-week trend sparkline - Per-zone breakdown of precision/recall - Total corrections count and improvement percentage Co-Authored-By: Claude Opus 4.6 --- dashboard/index.html | 4 + dashboard/js/accuracy.js | 634 +++++++ dashboard/js/feedback.js | 646 +++++++ mothership/cmd/mothership/main_phase6.go | 1570 +++++++++++++++++ mothership/internal/learning/accuracy.go | 358 ++++ .../internal/learning/feedback_processor.go | 228 +++ .../internal/learning/feedback_store.go | 626 +++++++ mothership/internal/learning/feedback_test.go | 422 +++++ mothership/internal/learning/handler.go | 311 ++++ 9 files changed, 4799 insertions(+) create mode 100644 dashboard/js/accuracy.js create mode 100644 dashboard/js/feedback.js create mode 100644 mothership/cmd/mothership/main_phase6.go create mode 100644 mothership/internal/learning/accuracy.go create mode 100644 mothership/internal/learning/feedback_processor.go create mode 100644 mothership/internal/learning/feedback_store.go create mode 100644 mothership/internal/learning/feedback_test.go create mode 100644 mothership/internal/learning/handler.go diff --git a/dashboard/index.html b/dashboard/index.html index 1eb6161..41bd37b 100644 --- a/dashboard/index.html +++ b/dashboard/index.html @@ -1618,6 +1618,10 @@ + + + +
diff --git a/dashboard/js/accuracy.js b/dashboard/js/accuracy.js new file mode 100644 index 0000000..f059032 --- /dev/null +++ b/dashboard/js/accuracy.js @@ -0,0 +1,634 @@ +/** + * Accuracy Panel for Detection Quality Metrics + * Displays precision, recall, F1 scores, and improvement trends + */ +(function() { + 'use strict'; + + var Accuracy = { + // State + panelVisible: false, + currentData: null, + historyData: null, + improvementData: null, + + // Config + config: { + pollIntervalMs: 60000, // 1 minute + historyWeeks: 8 + }, + + /** + * Initialize the accuracy panel + */ + init: function() { + this.createPanel(); + this.addStyles(); + this.startPolling(); + console.log('[Accuracy] Module initialized'); + }, + + /** + * Create the accuracy panel + */ + createPanel: function() { + var panel = document.createElement('div'); + panel.id = 'accuracy-panel'; + panel.className = 'accuracy-panel'; + panel.style.display = 'none'; + panel.innerHTML = '\ +
\ +

Detection Accuracy

\ + \ +
\ +
\ +
\ +
\ + \ + \ + \ + \ +
\ + --\ + F1 Score\ +
\ +
\ +
\ +
\ +
\ + Precision\ + --\ +
\ +
\ + Recall\ + --\ +
\ +
\ + F1 Score\ + --\ +
\ +
\ +
\ +
\ + You\'ve provided 0 corrections.\ +
\ +
\ +
\ +
\ +
\ + F1 Score Trend (8 weeks)\ +
\ + \ +
\ +
\ +
Per-Zone Breakdown
\ +
\ +
Loading...
\ +
\ +
\ +
\ +
\ + Pending corrections\ + 0\ +
\ +
\ + Processed corrections\ + 0\ +
\ +
\ +
'; + document.body.appendChild(panel); + + // Add toggle button to status bar + this.addToggleButton(); + }, + + /** + * Add toggle button to status bar + */ + addToggleButton: function() { + var statusBar = document.getElementById('status-bar'); + if (!statusBar) return; + + var btn = document.createElement('div'); + btn.className = 'status-item accuracy-toggle'; + btn.id = 'accuracy-toggle'; + btn.innerHTML = '\ +
\ + \ + \ + \ + \ + --\ +
\ + Accuracy'; + btn.onclick = function() { Accuracy.togglePanel(); }; + btn.style.cursor = 'pointer'; + + // Insert after detection-quality + var qualityItem = document.getElementById('detection-quality'); + if (qualityItem && qualityItem.nextSibling) { + statusBar.insertBefore(btn, qualityItem.nextSibling); + } else { + statusBar.appendChild(btn); + } + }, + + /** + * Toggle panel visibility + */ + togglePanel: function() { + var panel = document.getElementById('accuracy-panel'); + if (!panel) return; + + if (this.panelVisible) { + panel.style.display = 'none'; + this.panelVisible = false; + } else { + panel.style.display = 'block'; + this.panelVisible = true; + this.refresh(); + } + }, + + /** + * Start polling for accuracy updates + */ + startPolling: function() { + var self = this; + this.refresh(); + + setInterval(function() { + self.refresh(); + }, this.config.pollIntervalMs); + }, + + /** + * Refresh all accuracy data + */ + refresh: function() { + this.fetchAccuracy(); + this.fetchHistory(); + this.fetchImprovement(); + this.fetchStats(); + }, + + /** + * Fetch current accuracy metrics + */ + fetchAccuracy: function() { + var self = this; + fetch('/api/learning/accuracy') + .then(function(res) { return res.json(); }) + .then(function(data) { + self.currentData = data; + self.updateDisplay(); + }) + .catch(function(err) { + console.error('[Accuracy] Failed to fetch accuracy:', err); + }); + }, + + /** + * Fetch accuracy history for sparkline + */ + fetchHistory: function() { + var self = this; + fetch('/api/learning/accuracy/history?weeks=' + this.config.historyWeeks) + .then(function(res) { return res.json(); }) + .then(function(data) { + self.historyData = data; + self.drawSparkline(); + }) + .catch(function(err) { + console.error('[Accuracy] Failed to fetch history:', err); + }); + }, + + /** + * Fetch improvement statistics + */ + fetchImprovement: function() { + var self = this; + fetch('/api/learning/accuracy/improvement') + .then(function(res) { return res.json(); }) + .then(function(data) { + self.improvementData = data; + self.updateMotivation(); + }) + .catch(function(err) { + console.error('[Accuracy] Failed to fetch improvement:', err); + }); + }, + + /** + * Fetch feedback stats + */ + fetchStats: function() { + fetch('/api/learning/stats') + .then(function(res) { return res.json(); }) + .then(function(data) { + document.getElementById('unprocessed-count').textContent = data.unprocessed_count || 0; + document.getElementById('processed-count').textContent = data.processed_count || 0; + }) + .catch(function(err) { + console.error('[Accuracy] Failed to fetch stats:', err); + }); + }, + + /** + * Update display with current data + */ + updateDisplay: function() { + if (!this.currentData) return; + + var precision = this.currentData.precision; + var recall = this.currentData.recall; + var f1 = this.currentData.f1; + + // Update metrics + document.getElementById('accuracy-precision').textContent = this.formatPercent(precision); + document.getElementById('accuracy-recall').textContent = this.formatPercent(recall); + document.getElementById('accuracy-f1').textContent = this.formatPercent(f1); + + // Update gauge + var gaugeValue = document.querySelector('.gauge-number'); + if (gaugeValue) { + gaugeValue.textContent = this.formatPercent(f1); + } + + // Update gauge fill + var gaugeFill = document.querySelector('.gauge-fill'); + if (gaugeFill && f1 !== null) { + var circumference = 2 * Math.PI * 40; + var offset = circumference * (1 - f1); + gaugeFill.style.strokeDasharray = (circumference - offset) + ' ' + circumference; + gaugeFill.style.stroke = this.getColorForScore(f1); + } + + // Update mini gauge in status bar + this.updateMiniGauge(f1); + }, + + /** + * Update mini gauge in status bar + */ + updateMiniGauge: function(f1) { + var miniFill = document.querySelector('.mini-gauge-fill'); + var miniValue = document.querySelector('.mini-gauge-value'); + + if (miniFill && f1 !== null) { + var circumference = 2 * Math.PI * 12; + var offset = circumference * (1 - f1); + miniFill.style.strokeDasharray = (circumference - offset) + ' ' + circumference; + miniFill.style.stroke = this.getColorForScore(f1); + } + + if (miniValue) { + miniValue.textContent = this.formatPercent(f1); + } + }, + + /** + * Update motivation section + */ + updateMotivation: function() { + if (!this.improvementData) return; + + var feedbackCount = document.getElementById('feedback-count'); + var improvementText = document.getElementById('improvement-text'); + + if (feedbackCount) { + feedbackCount.textContent = this.improvementData.total_feedback || 0; + } + + if (improvementText) { + var improvement = this.improvementData.improvement_pct || 0; + if (improvement > 0) { + improvementText.innerHTML = 'Accuracy improved ' + + improvement.toFixed(0) + '% this week!'; + } else if (improvement < 0) { + improvementText.innerHTML = 'Accuracy decreased ' + + Math.abs(improvement).toFixed(0) + '% this week.'; + } else { + improvementText.innerHTML = 'Keep providing feedback to improve accuracy!'; + } + } + }, + + /** + * Draw sparkline for accuracy history + */ + drawSparkline: function() { + var canvas = document.getElementById('accuracy-sparkline'); + if (!canvas || !this.historyData || this.historyData.length === 0) return; + + var ctx = canvas.getContext('2d'); + var width = canvas.width; + var height = canvas.height; + var padding = 4; + + ctx.clearRect(0, 0, width, height); + + // Sort by week + var data = this.historyData.slice().sort(function(a, b) { + return a.week.localeCompare(b.week); + }); + + if (data.length < 2) { + ctx.fillStyle = '#666'; + ctx.font = '11px sans-serif'; + ctx.textAlign = 'center'; + ctx.fillText('Need more data...', width / 2, height / 2); + return; + } + + // Get F1 values + var values = data.map(function(d) { return d.f1 || 0; }); + var min = Math.min.apply(null, values); + var max = Math.max.apply(null, values); + if (max === min) max = min + 0.1; + + // Draw line + ctx.beginPath(); + ctx.strokeStyle = '#4fc3f7'; + ctx.lineWidth = 2; + + var stepX = (width - padding * 2) / (data.length - 1); + + for (var i = 0; i < data.length; i++) { + var x = padding + i * stepX; + var y = height - padding - ((values[i] - min) / (max - min)) * (height - padding * 2); + + if (i === 0) { + ctx.moveTo(x, y); + } else { + ctx.lineTo(x, y); + } + } + + ctx.stroke(); + + // Draw points + ctx.fillStyle = '#4fc3f7'; + for (var i = 0; i < data.length; i++) { + var x = padding + i * stepX; + var y = height - padding - ((values[i] - min) / (max - min)) * (height - padding * 2); + ctx.beginPath(); + ctx.arc(x, y, 3, 0, Math.PI * 2); + ctx.fill(); + } + }, + + /** + * Format a decimal as percentage + */ + formatPercent: function(value) { + if (value === null || value === undefined) return '--'; + return (value * 100).toFixed(0) + '%'; + }, + + /** + * Get color for a score (0-1) + */ + getColorForScore: function(score) { + if (score >= 0.8) return '#66bb6a'; + if (score >= 0.6) return '#ffa726'; + return '#ef5350'; + }, + + /** + * Add CSS styles + */ + addStyles: function() { + if (document.getElementById('accuracy-styles')) return; + + var style = document.createElement('style'); + style.id = 'accuracy-styles'; + style.textContent = '\ + .accuracy-panel {\ + position: fixed;\ + top: 60px;\ + right: 20px;\ + width: 300px;\ + max-height: calc(100vh - 80px);\ + background: rgba(0, 0, 0, 0.9);\ + border-radius: 8px;\ + box-shadow: 0 4px 20px rgba(0, 0, 0, 0.5);\ + z-index: 150;\ + overflow-y: auto;\ + }\ + .accuracy-header {\ + display: flex;\ + justify-content: space-between;\ + align-items: center;\ + padding: 12px 16px;\ + border-bottom: 1px solid rgba(255, 255, 255, 0.1);\ + }\ + .accuracy-header h3 {\ + font-size: 14px;\ + color: #888;\ + text-transform: uppercase;\ + letter-spacing: 1px;\ + margin: 0;\ + }\ + .accuracy-close {\ + background: none;\ + border: none;\ + color: #888;\ + font-size: 20px;\ + cursor: pointer;\ + }\ + .accuracy-close:hover { color: #fff; }\ + .accuracy-content {\ + padding: 16px;\ + }\ + .accuracy-gauge-section {\ + display: flex;\ + justify-content: center;\ + margin-bottom: 16px;\ + }\ + .accuracy-gauge-container {\ + position: relative;\ + width: 120px;\ + height: 120px;\ + }\ + .accuracy-gauge {\ + width: 100%;\ + height: 100%;\ + transform: rotate(-90deg);\ + }\ + .gauge-bg {\ + fill: none;\ + stroke: rgba(255, 255, 255, 0.1);\ + stroke-width: 8;\ + }\ + .gauge-fill {\ + fill: none;\ + stroke: #66bb6a;\ + stroke-width: 8;\ + stroke-linecap: round;\ + stroke-dasharray: 0 251;\ + transition: stroke-dasharray 0.5s, stroke 0.3s;\ + }\ + .gauge-value {\ + position: absolute;\ + top: 50%;\ + left: 50%;\ + transform: translate(-50%, -50%);\ + text-align: center;\ + }\ + .gauge-number {\ + display: block;\ + font-size: 24px;\ + font-weight: 600;\ + color: #fff;\ + }\ + .gauge-label {\ + display: block;\ + font-size: 10px;\ + color: #888;\ + text-transform: uppercase;\ + }\ + .accuracy-metrics {\ + display: flex;\ + justify-content: space-around;\ + padding: 12px 0;\ + border-top: 1px solid rgba(255, 255, 255, 0.1);\ + border-bottom: 1px solid rgba(255, 255, 255, 0.1);\ + margin-bottom: 12px;\ + }\ + .metric-item {\ + text-align: center;\ + }\ + .metric-label {\ + display: block;\ + font-size: 10px;\ + color: #888;\ + margin-bottom: 2px;\ + }\ + .metric-value {\ + font-size: 16px;\ + font-weight: 600;\ + color: #eee;\ + }\ + .accuracy-motivation {\ + background: rgba(79, 195, 247, 0.1);\ + border-radius: 6px;\ + padding: 10px 12px;\ + margin-bottom: 12px;\ + text-align: center;\ + }\ + .motivation-text {\ + font-size: 12px;\ + color: #bbb;\ + }\ + .motivation-text span {\ + font-weight: 600;\ + color: #4fc3f7;\ + }\ + .motivation-improvement {\ + font-size: 11px;\ + margin-top: 4px;\ + }\ + .improvement-positive { color: #66bb6a; }\ + .improvement-negative { color: #ef5350; }\ + .improvement-neutral { color: #888; }\ + .accuracy-trend-section {\ + margin-bottom: 12px;\ + }\ + .trend-header {\ + font-size: 11px;\ + color: #888;\ + margin-bottom: 8px;\ + }\ + #accuracy-sparkline {\ + width: 100%;\ + background: rgba(255, 255, 255, 0.03);\ + border-radius: 4px;\ + }\ + .accuracy-breakdown {\ + margin-bottom: 12px;\ + }\ + .breakdown-header {\ + font-size: 11px;\ + color: #888;\ + margin-bottom: 8px;\ + }\ + .zone-breakdown {\ + display: flex;\ + flex-direction: column;\ + gap: 4px;\ + }\ + .zone-item {\ + display: flex;\ + justify-content: space-between;\ + align-items: center;\ + font-size: 11px;\ + padding: 4px 8px;\ + background: rgba(255, 255, 255, 0.03);\ + border-radius: 3px;\ + }\ + .zone-name {\ + color: #bbb;\ + }\ + .zone-score {\ + font-weight: 500;\ + }\ + .accuracy-stats {\ + font-size: 11px;\ + }\ + .stats-row {\ + display: flex;\ + justify-content: space-between;\ + padding: 4px 0;\ + color: #888;\ + }\ + .stats-row span:last-child {\ + color: #ccc;\ + }\ + .accuracy-toggle {\ + padding: 2px 10px;\ + background: rgba(255, 255, 255, 0.05);\ + border-radius: 4px;\ + }\ + .accuracy-toggle:hover {\ + background: rgba(255, 255, 255, 0.1);\ + }\ + .accuracy-mini-gauge {\ + position: relative;\ + width: 32px;\ + height: 32px;\ + }\ + .accuracy-mini-gauge svg {\ + width: 32px;\ + height: 32px;\ + }\ + .mini-gauge-value {\ + position: absolute;\ + top: 50%;\ + left: 50%;\ + transform: translate(-50%, -50%);\ + font-size: 8px;\ + font-weight: 600;\ + color: #ccc;\ + }\ + .accuracy-label {\ + font-size: 11px;\ + color: #888;\ + }'; + document.head.appendChild(style); + } + }; + + // Expose globally + window.Accuracy = Accuracy; + + // Initialize when DOM is ready + if (document.readyState === 'loading') { + document.addEventListener('DOMContentLoaded', function() { Accuracy.init(); }); + } else { + Accuracy.init(); + } +})(); diff --git a/dashboard/js/feedback.js b/dashboard/js/feedback.js new file mode 100644 index 0000000..aed91da --- /dev/null +++ b/dashboard/js/feedback.js @@ -0,0 +1,646 @@ +/** + * Feedback UI Components for Detection Accuracy + * Provides thumbs-up/down buttons, feedback forms, and missed detection reporting + */ +(function() { + 'use strict'; + + var Feedback = { + // State + pendingFeedback: null, + feedbackPanelVisible: false, + + // Event types + EventTypes: { + BLOB_DETECTION: 'blob_detection', + ZONE_TRANSITION: 'zone_transition', + FALL_ALERT: 'fall_alert', + ANOMALY: 'anomaly' + }, + + // Feedback types + FeedbackTypes: { + TRUE_POSITIVE: 'TRUE_POSITIVE', + FALSE_POSITIVE: 'FALSE_POSITIVE', + FALSE_NEGATIVE: 'FALSE_NEGATIVE', + WRONG_IDENTITY: 'WRONG_IDENTITY', + WRONG_ZONE: 'WRONG_ZONE' + }, + + /** + * Initialize the feedback module + */ + init: function() { + this.createFeedbackPanel(); + this.createMissedDetectionButton(); + console.log('[Feedback] Module initialized'); + }, + + /** + * Create the feedback panel (hidden by default) + */ + createFeedbackPanel: function() { + var panel = document.createElement('div'); + panel.id = 'feedback-panel'; + panel.className = 'feedback-panel'; + panel.style.display = 'none'; + panel.innerHTML = '\ + \ + '; + document.body.appendChild(panel); + + // Add styles + this.addStyles(); + }, + + /** + * Create the "Report missed detection" button + */ + createMissedDetectionButton: function() { + var btn = document.createElement('button'); + btn.id = 'missed-detection-btn'; + btn.className = 'missed-detection-btn'; + btn.innerHTML = '⚠ Report missed detection'; + btn.onclick = function() { Feedback.showMissedDetectionForm(); }; + document.body.appendChild(btn); + }, + + /** + * Show feedback panel for a specific event (thumbs-down clicked) + */ + showFeedbackPanel: function(eventID, eventType, eventTime, details) { + this.pendingFeedback = { + eventID: eventID, + eventType: eventType, + eventTime: eventTime, + details: details || {} + }; + + var panel = document.getElementById('feedback-panel'); + if (!panel) return; + + // Update event info + panel.querySelector('.feedback-event-type').textContent = this.formatEventType(eventType); + panel.querySelector('.feedback-event-time').textContent = eventTime ? new Date(eventTime).toLocaleString() : 'Now'; + + // Reset form + var radios = panel.querySelectorAll('input[name="feedback-type"]'); + radios.forEach(function(r) { r.checked = false; }); + document.getElementById('feedback-notes').value = ''; + + // Show panel + panel.style.display = 'block'; + this.feedbackPanelVisible = true; + }, + + /** + * Hide the feedback panel + */ + hideFeedbackPanel: function() { + var panel = document.getElementById('feedback-panel'); + if (panel) { + panel.style.display = 'none'; + } + this.pendingFeedback = null; + this.feedbackPanelVisible = false; + }, + + /** + * Submit feedback to the server + */ + submitFeedback: function() { + if (!this.pendingFeedback) return; + + var panel = document.getElementById('feedback-panel'); + var selected = panel.querySelector('input[name="feedback-type"]:checked'); + + if (!selected) { + alert('Please select what was wrong with this detection.'); + return; + } + + var feedbackType = selected.value; + var notes = document.getElementById('feedback-notes').value; + + var details = Object.assign({}, this.pendingFeedback.details); + if (notes) { + details.notes = notes; + } + + this.sendFeedback( + this.pendingFeedback.eventID, + this.pendingFeedback.eventType, + feedbackType, + details + ); + + this.hideFeedbackPanel(); + }, + + /** + * Submit thumbs-up (true positive) feedback + */ + submitThumbsUp: function(eventID, eventType, details) { + this.sendFeedback(eventID, eventType, this.FeedbackTypes.TRUE_POSITIVE, details || {}); + }, + + /** + * Send feedback to the API + */ + sendFeedback: function(eventID, eventType, feedbackType, details) { + var data = { + event_id: eventID || '', + event_type: eventType, + feedback_type: feedbackType, + details: details + }; + + fetch('/api/learning/feedback', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(data) + }) + .then(function(res) { return res.json(); }) + .then(function(result) { + if (result.success) { + console.log('[Feedback] Submitted:', feedbackType, 'for event', eventID); + if (window.SpaxelApp && window.SpaxelApp.showToast) { + window.SpaxelApp.showToast('Thank you for your feedback!', 'success'); + } + } + }) + .catch(function(err) { + console.error('[Feedback] Failed to submit:', err); + }); + }, + + /** + * Show missed detection form + */ + showMissedDetectionForm: function() { + var modal = document.createElement('div'); + modal.id = 'missed-detection-modal'; + modal.className = 'missed-detection-modal'; + modal.innerHTML = '\ +
\ +
\ +

Report Missed Detection

\ + \ +
\ +
\ +
\ + \ + \ +
\ +
\ + \ + \ +
\ +
\ + \ +
\ + \ + \ + \ +
\ +
\ +
\ + \ + \ +
\ +
\ + \ + \ +
\ +
\ +
'; + + document.body.appendChild(modal); + + // Set default time to now + var timeInput = document.getElementById('missed-time'); + var now = new Date(); + timeInput.value = now.toISOString().slice(0, 16); + + // Populate zones + this.populateZoneSelector(document.getElementById('missed-zone')); + }, + + /** + * Close the missed detection modal + */ + closeMissedDetectionModal: function() { + var modal = document.getElementById('missed-detection-modal'); + if (modal) { + modal.remove(); + } + }, + + /** + * Submit missed detection report + */ + submitMissedDetection: function() { + var timeStr = document.getElementById('missed-time').value; + var zoneID = document.getElementById('missed-zone').value; + var posX = parseFloat(document.getElementById('missed-x').value) || 0; + var posY = parseFloat(document.getElementById('missed-y').value) || 0; + var posZ = parseFloat(document.getElementById('missed-z').value) || 0; + var notes = document.getElementById('missed-notes').value; + + var details = { + zone_id: zoneID, + position_x: posX, + position_y: posY, + position_z: posZ, + user_reported: true + }; + + if (notes) { + details.notes = notes; + } + + this.sendFeedback( + '', // No event ID for missed detections + this.EventTypes.BLOB_DETECTION, + this.FeedbackTypes.FALSE_NEGATIVE, + details + ); + + this.closeMissedDetectionModal(); + }, + + /** + * Populate zone selector from API + */ + populateZoneSelector: function(select) { + if (!select) return; + + fetch('/api/zones') + .then(function(res) { return res.json(); }) + .then(function(zones) { + zones = zones || []; + zones.forEach(function(zone) { + var opt = document.createElement('option'); + opt.value = zone.id; + opt.textContent = zone.name || zone.id; + select.appendChild(opt); + }); + }) + .catch(function(err) { + console.error('[Feedback] Failed to load zones:', err); + }); + }, + + /** + * Create thumbs up/down buttons for an event + */ + createFeedbackButtons: function(eventID, eventType, eventTime, details) { + var container = document.createElement('div'); + container.className = 'feedback-buttons'; + + var thumbsUp = document.createElement('button'); + thumbsUp.className = 'feedback-btn-icon feedback-thumbs-up'; + thumbsUp.innerHTML = '👍'; + thumbsUp.title = 'Correct detection'; + thumbsUp.onclick = function(e) { + e.stopPropagation(); + Feedback.submitThumbsUp(eventID, eventType, details); + }; + + var thumbsDown = document.createElement('button'); + thumbsDown.className = 'feedback-btn-icon feedback-thumbs-down'; + thumbsDown.innerHTML = '👎'; + thumbsDown.title = 'Incorrect detection'; + thumbsDown.onclick = function(e) { + e.stopPropagation(); + Feedback.showFeedbackPanel(eventID, eventType, eventTime, details); + }; + + container.appendChild(thumbsUp); + container.appendChild(thumbsDown); + + return container; + }, + + /** + * Format event type for display + */ + formatEventType: function(eventType) { + var types = { + 'blob_detection': 'Detection', + 'zone_transition': 'Zone Change', + 'fall_alert': 'Fall Alert', + 'anomaly': 'Anomaly' + }; + return types[eventType] || eventType; + }, + + /** + * Add CSS styles for feedback UI + */ + addStyles: function() { + if (document.getElementById('feedback-styles')) return; + + var style = document.createElement('style'); + style.id = 'feedback-styles'; + style.textContent = '\ + .feedback-panel {\ + position: fixed;\ + bottom: 100px;\ + right: 20px;\ + width: 320px;\ + background: rgba(0, 0, 0, 0.95);\ + border-radius: 8px;\ + box-shadow: 0 4px 20px rgba(0, 0, 0, 0.5);\ + z-index: 200;\ + font-size: 13px;\ + }\ + .feedback-header {\ + display: flex;\ + justify-content: space-between;\ + align-items: center;\ + padding: 12px 16px;\ + border-bottom: 1px solid rgba(255, 255, 255, 0.1);\ + }\ + .feedback-title {\ + font-weight: 600;\ + color: #eee;\ + }\ + .feedback-close {\ + background: none;\ + border: none;\ + color: #888;\ + font-size: 20px;\ + cursor: pointer;\ + }\ + .feedback-close:hover { color: #fff; }\ + .feedback-content {\ + padding: 16px;\ + }\ + .feedback-event-info {\ + display: flex;\ + justify-content: space-between;\ + margin-bottom: 12px;\ + font-size: 11px;\ + color: #888;\ + }\ + .feedback-question {\ + font-size: 13px;\ + color: #ccc;\ + margin-bottom: 10px;\ + }\ + .feedback-options {\ + display: flex;\ + flex-direction: column;\ + gap: 8px;\ + margin-bottom: 16px;\ + }\ + .feedback-option {\ + display: flex;\ + align-items: center;\ + gap: 8px;\ + cursor: pointer;\ + padding: 6px 8px;\ + border-radius: 4px;\ + transition: background 0.2s;\ + }\ + .feedback-option:hover {\ + background: rgba(255, 255, 255, 0.05);\ + }\ + .feedback-option input {\ + margin: 0;\ + }\ + .feedback-option span {\ + color: #bbb;\ + font-size: 12px;\ + }\ + .feedback-notes {\ + margin-bottom: 16px;\ + }\ + .feedback-notes label {\ + display: block;\ + font-size: 11px;\ + color: #888;\ + margin-bottom: 4px;\ + }\ + .feedback-notes textarea {\ + width: 100%;\ + height: 60px;\ + background: rgba(255, 255, 255, 0.08);\ + border: 1px solid rgba(255, 255, 255, 0.15);\ + border-radius: 4px;\ + color: #eee;\ + font-size: 12px;\ + padding: 8px;\ + resize: none;\ + box-sizing: border-box;\ + }\ + .feedback-actions {\ + display: flex;\ + justify-content: flex-end;\ + gap: 8px;\ + }\ + .feedback-btn {\ + padding: 6px 14px;\ + border-radius: 4px;\ + font-size: 12px;\ + cursor: pointer;\ + border: none;\ + }\ + .feedback-btn-cancel {\ + background: rgba(255, 255, 255, 0.1);\ + color: #ccc;\ + }\ + .feedback-btn-submit {\ + background: #4fc3f7;\ + color: #1a1a2e;\ + font-weight: 500;\ + }\ + .feedback-btn-icon {\ + background: rgba(255, 255, 255, 0.1);\ + border: none;\ + width: 28px;\ + height: 28px;\ + border-radius: 4px;\ + cursor: pointer;\ + font-size: 14px;\ + display: flex;\ + align-items: center;\ + justify-content: center;\ + transition: background 0.2s;\ + }\ + .feedback-btn-icon:hover {\ + background: rgba(255, 255, 255, 0.2);\ + }\ + .feedback-thumbs-up:hover {\ + background: rgba(76, 175, 80, 0.3);\ + }\ + .feedback-thumbs-down:hover {\ + background: rgba(244, 67, 54, 0.3);\ + }\ + .feedback-buttons {\ + display: inline-flex;\ + gap: 4px;\ + }\ + .missed-detection-btn {\ + position: fixed;\ + bottom: 20px;\ + right: 440px;\ + background: rgba(255, 167, 38, 0.2);\ + border: 1px solid rgba(255, 167, 38, 0.5);\ + color: #ffa726;\ + padding: 6px 12px;\ + border-radius: 4px;\ + font-size: 11px;\ + cursor: pointer;\ + z-index: 100;\ + transition: background 0.2s;\ + }\ + .missed-detection-btn:hover {\ + background: rgba(255, 167, 38, 0.3);\ + }\ + .missed-detection-modal {\ + position: fixed;\ + top: 0;\ + left: 0;\ + right: 0;\ + bottom: 0;\ + background: rgba(0, 0, 0, 0.8);\ + display: flex;\ + align-items: center;\ + justify-content: center;\ + z-index: 300;\ + }\ + .missed-detection-card {\ + background: #1e1e3a;\ + border-radius: 12px;\ + padding: 24px;\ + width: 400px;\ + max-width: 90%;\ + }\ + .missed-detection-header {\ + display: flex;\ + justify-content: space-between;\ + align-items: center;\ + margin-bottom: 16px;\ + }\ + .missed-detection-header h2 {\ + font-size: 16px;\ + color: #eee;\ + margin: 0;\ + }\ + .modal-close {\ + background: none;\ + border: none;\ + color: #888;\ + font-size: 24px;\ + cursor: pointer;\ + }\ + .modal-close:hover { color: #fff; }\ + .missed-detection-form .form-group {\ + margin-bottom: 14px;\ + }\ + .missed-detection-form label {\ + display: block;\ + font-size: 12px;\ + color: #888;\ + margin-bottom: 4px;\ + }\ + .missed-detection-form input,\ + .missed-detection-form select,\ + .missed-detection-form textarea {\ + width: 100%;\ + padding: 8px 10px;\ + background: rgba(255, 255, 255, 0.08);\ + border: 1px solid rgba(255, 255, 255, 0.15);\ + border-radius: 4px;\ + color: #eee;\ + font-size: 13px;\ + box-sizing: border-box;\ + }\ + .position-inputs {\ + display: flex;\ + gap: 8px;\ + }\ + .position-inputs input {\ + flex: 1;\ + }\ + .missed-detection-form textarea {\ + height: 60px;\ + resize: none;\ + }\ + .form-actions {\ + display: flex;\ + justify-content: flex-end;\ + gap: 10px;\ + margin-top: 20px;\ + }\ + .btn {\ + padding: 8px 16px;\ + border-radius: 4px;\ + font-size: 13px;\ + cursor: pointer;\ + border: none;\ + }\ + .btn-secondary {\ + background: rgba(255, 255, 255, 0.1);\ + color: #ccc;\ + }\ + .btn-primary {\ + background: #4fc3f7;\ + color: #1a1a2e;\ + font-weight: 500;\ + }'; + document.head.appendChild(style); + } + }; + + // Expose globally + window.Feedback = Feedback; + + // Initialize when DOM is ready + if (document.readyState === 'loading') { + document.addEventListener('DOMContentLoaded', function() { Feedback.init(); }); + } else { + Feedback.init(); + } +})(); diff --git a/mothership/cmd/mothership/main_phase6.go b/mothership/cmd/mothership/main_phase6.go new file mode 100644 index 0000000..a5eb8a9 --- /dev/null +++ b/mothership/cmd/mothership/main_phase6.go @@ -0,0 +1,1570 @@ +// Package main provides the mothership entry point +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + "math" + "net" + "net/http" + "os" + "os/signal" + "path/filepath" + "strconv" + "syscall" + "time" + + "github.com/go-chi/chi" + "github.com/go-chi/chi/middleware" + "github.com/hashicorp/mdns" + "github.com/spaxel/mothership/internal/analytics" + "github.com/spaxel/mothership/internal/automation" + "github.com/spaxel/mothership/internal/ble" + "github.com/spaxel/mothership/internal/dashboard" + "github.com/spaxel/mothership/internal/diagnostics" + "github.com/spaxel/mothership/internal/falldetect" + "github.com/spaxel/mothership/internal/fleet" + "github.com/spaxel/mothership/internal/ingestion" + "github.com/spaxel/mothership/internal/learning" + "github.com/spaxel/mothership/internal/mqtt" + "github.com/spaxel/mothership/internal/notify" + "github.com/spaxel/mothership/internal/ota" + "github.com/spaxel/mothership/internal/provisioning" + "github.com/spaxel/mothership/internal/recorder" + "github.com/spaxel/mothership/internal/replay" + "github.com/spaxel/mothership/internal/zones" + sigproc "github.com/spaxel/mothership/internal/signal" +) + +// Phase 5: Configuration constants +const ( + baselineSaveInterval = 30 * time.Second + healthComputeInterval = 5 * time.Second + weatherRecordInterval = 60 * time.Second +) + +// Build-time version injection +var version = "dev" + +// Config holds application configuration +type Config struct { + BindAddr string + DataDir string + StaticDir string + MDNSName string + MDNSEnabled bool + LogLevel string + ReplayMaxMB int + + // MQTT configuration + MQTTBroker string + MQTTClientID string + MQTTUsername string + MQTTPassword string +} + +func main() { + cfg := parseConfig() + log.Printf("[INFO] Spaxel mothership v%s starting", version) + log.Printf("[DEBUG] Config: bind=%s data=%s static=%s mdns=%s", cfg.BindAddr, cfg.DataDir, cfg.StaticDir, cfg.MDNSName) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT) + + r := chi.NewRouter() + r.Use(middleware.Logger) + r.Use(middleware.Recoverer) + + r.Get("/healthz", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, `{"status":"ok","version":"%s"}`, version) + }) + + // Create ingestion server + ingestSrv := ingestion.NewServer() + r.HandleFunc("/ws/node", ingestSrv.HandleNodeWS) + + // Signal processing pipeline + pm := sigproc.NewProcessorManager(sigproc.ProcessorManagerConfig{ + NSub: 64, + FusionRate: 10.0, + Tau: 30.0, + }) + ingestSrv.SetProcessorManager(pm) + + // Replay recording store + if err := os.MkdirAll(cfg.DataDir, 0755); err != nil { + log.Printf("[WARN] Failed to create data dir %s: %v", cfg.DataDir, err) + } else { + store, err := replay.NewRecordingStore(filepath.Join(cfg.DataDir, "csi_replay.bin"), cfg.ReplayMaxMB) + if err != nil { + log.Printf("[WARN] Failed to open replay store: %v (CSI recording disabled)", err) + } else { + ingestSrv.SetReplayStore(store) + defer store.Close() + log.Printf("[INFO] CSI replay store at %s (%d MB max)", filepath.Join(cfg.DataDir, "csi_replay.bin"), cfg.ReplayMaxMB) + } + } + + // Per-link CSI recorder + recorderDir := filepath.Join(cfg.DataDir, "csi") + recMgr, err := recorder.NewManager(recorder.DefaultConfig(recorderDir)) + if err != nil { + log.Printf("[WARN] Failed to create recorder: %v (per-link recording disabled)", err) + } else { + ingestSrv.SetRecorder(recMgr) + defer recMgr.Close() + log.Printf("[INFO] Per-link CSI recorder at %s (retention=%dh, max=%dMB/link)", + recorderDir, recorder.DefaultConfig(recorderDir).RetentionHours, + recorder.DefaultConfig(recorderDir).MaxBytesPerLink/1<<20) + } + + // Fleet node registry + fleetReg, err := fleet.NewRegistry(filepath.Join(cfg.DataDir, "fleet.db")) + if err != nil { + log.Fatalf("[FATAL] Failed to open fleet registry: %v", err) + } + defer fleetReg.Close() + log.Printf("[INFO] Fleet registry at %s", filepath.Join(cfg.DataDir, "fleet.db")) + + // Phase 6: BLE device registry + bleRegistry, err := ble.NewRegistry(filepath.Join(cfg.DataDir, "ble.db")) + if err != nil { + log.Printf("[WARN] Failed to open BLE registry: %v", err) + } else { + defer bleRegistry.Close() + log.Printf("[INFO] BLE registry at %s", filepath.Join(cfg.DataDir, "ble.db")) + } + + // Phase 6: RSSI cache for BLE triangulation + rssiCache := ble.NewRSSICache(10 * time.Second) + + // Phase 6: BLE identity matcher + var identityMatcher *ble.IdentityMatcher + if bleRegistry != nil { + identityMatcher = ble.NewIdentityMatcher(bleRegistry, rssiCache, fleetReg) + } + + // Phase 6: Zones manager + zonesMgr, err := zones.NewManager(filepath.Join(cfg.DataDir, "zones.db")) + if err != nil { + log.Printf("[WARN] Failed to open zones database: %v", err) + } else { + defer zonesMgr.Close() + log.Printf("[INFO] Zones manager at %s", filepath.Join(cfg.DataDir, "zones.db")) + } + + // Phase 6: Flow analytics accumulator + flowAccumulator, err := analytics.NewFlowAccumulator(filepath.Join(cfg.DataDir, "analytics.db")) + if err != nil { + log.Printf("[WARN] Failed to open analytics database: %v", err) + } else { + defer flowAccumulator.Close() + log.Printf("[INFO] Flow analytics at %s", filepath.Join(cfg.DataDir, "analytics.db")) + } + + // Phase 6: Automation engine + automationEngine, err := automation.NewEngine(filepath.Join(cfg.DataDir, "automation.db")) + if err != nil { + log.Printf("[WARN] Failed to open automation database: %v", err) + } else { + defer automationEngine.Close() + log.Printf("[INFO] Automation engine at %s", filepath.Join(cfg.DataDir, "automation.db")) + } + + // Phase 6: Fall detector + fallDetector := falldetect.NewDetector() + log.Printf("[INFO] Fall detector initialized") + + // Phase 6: Notification service + notifyService, err := notify.NewService(filepath.Join(cfg.DataDir, "notify.db")) + if err != nil { + log.Printf("[WARN] Failed to open notification database: %v", err) + } else { + defer notifyService.Close() + log.Printf("[INFO] Notification service at %s", filepath.Join(cfg.DataDir, "notify.db")) + + // Set room config provider for floor plan thumbnails + notifyService.SetRoomConfig(fleetReg) + } + + // Phase 6: Learning feedback store for detection accuracy + var feedbackStore *learning.FeedbackStore + var feedbackProcessor *learning.Processor + var accuracyComputer *learning.AccuracyComputer + feedbackStore, err = learning.NewFeedbackStore(filepath.Join(cfg.DataDir, "learning.db")) + if err != nil { + log.Printf("[WARN] Failed to open learning database: %v", err) + } else { + defer feedbackStore.Close() + log.Printf("[INFO] Learning feedback store at %s", filepath.Join(cfg.DataDir, "learning.db")) + + // Create feedback processor + feedbackProcessor = learning.NewProcessor(feedbackStore, learning.DefaultProcessorConfig()) + + // Create accuracy computer + accuracyComputer = learning.NewAccuracyComputer(feedbackStore, learning.DefaultAccuracyComputerConfig()) + + // Start background processing + go feedbackProcessor.Run(ctx) + go accuracyComputer.Run(ctx) + log.Printf("[INFO] Learning feedback processor started (interval: %v)", learning.DefaultProcessorConfig().ProcessInterval) + } + + // Phase 6: MQTT client (optional) + var mqttClient *mqtt.Client + if cfg.MQTTBroker != "" { + mqttClient, err = mqtt.NewClient(mqtt.Config{ + Broker: cfg.MQTTBroker, + ClientID: cfg.MQTTClientID, + Username: cfg.MQTTUsername, + Password: cfg.MQTTPassword, + DiscoveryEnabled: true, + DiscoveryPrefix: "homeassistant", + AutoReconnect: true, + }) + if err != nil { + log.Printf("[WARN] Failed to create MQTT client: %v", err) + } else { + if err := mqttClient.Connect(ctx); err != nil { + log.Printf("[WARN] MQTT connection failed: %v", err) + } else { + defer mqttClient.Disconnect() + log.Printf("[INFO] MQTT client connected to %s", cfg.MQTTBroker) + + // Wire MQTT to automation engine + automationEngine.SetMQTTClient(mqttClient) + } + } + } + + // Phase 5: Self-healing fleet manager with GDOP optimization + fleetHealer := fleet.NewFleetHealer(fleetReg, fleet.FleetHealerConfig{ + HealInterval: 60 * time.Second, + MinOnlineNodes: 2, + MaxHistorySize: 100, + }) + + // Phase 5: Link weather diagnostics + weatherDiagnostics := fleet.NewLinkWeatherDiagnostics() + + // Legacy fleet manager (kept for basic operations) + fleetMgr := fleet.NewManager(fleetReg) + + // Phase 5: Multi-notifier broadcasts node events to both legacy manager and healer + multiNotify := newMultiNotifier(fleetMgr, fleetHealer) + ingestSrv.SetFleetNotifier(multiNotify) + + // Adaptive rate controller + rateCtrl := ingestion.NewRateController(func(mac string, rateHz int, varianceThreshold float64) { + ingestSrv.SendConfigToMAC(mac, rateHz, varianceThreshold) + }) + ingestSrv.SetRateController(rateCtrl) + go rateCtrl.Run(ctx) + + // Dashboard hub and server + dashboardHub := dashboard.NewHub() + dashboardSrv := dashboard.NewServer(dashboardHub) + + dashboardHub.SetIngestionState(ingestSrv) + + // Wire ingestion → dashboard for CSI and motion broadcasts + ingestSrv.SetDashboardBroadcaster(dashboardHub) + ingestSrv.SetMotionBroadcaster(dashboardHub) + + // Phase 6: Wire BLE messages to registry and identity matcher + ingestSrv.SetBLEHandler(func(nodeMAC string, devices []ingestion.BLEDevice) { + // Convert ingestion.BLEDevice to ble.BLEObservation and process + observations := make([]ble.BLEObservation, len(devices)) + for i, dev := range devices { + observations[i] = ble.BLEObservation{ + Addr: dev.Addr, + Name: dev.Name, + MfrID: dev.MfrID, + MfrDataHex: dev.MfrDataHex, + RSSIdBm: dev.RSSIdBm, + } + // Update RSSI cache for real-time triangulation + rssiCache.AddWithTime(dev.Addr, nodeMAC, dev.RSSIdBm, time.Now()) + } + // Store in persistent registry + if bleRegistry != nil { + bleRegistry.ProcessRelayMessage(nodeMAC, observations) + } + }) + + // Start RSSI cache cleanup goroutine + go func() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + rssiCache.CleanOlder(5 * time.Minute) + } + } + }() + + // Wire fleet notifier/broadcaster and start self-healing loop + fleetMgr.SetNotifier(ingestSrv) + fleetMgr.SetBroadcaster(dashboardHub) + go fleetMgr.Run(ctx) + + // Phase 5: Wire advanced fleet healer + fleetHealer.SetNotifier(ingestSrv) + fleetHealer.SetBroadcaster(dashboardHub) + go fleetHealer.Run(ctx) + + // Phase 5: Wire weather diagnostics with node position accessor + weatherDiagnostics.SetNodePositionAccessor(func(mac string) (x, z float64, ok bool) { + node, err := fleetReg.GetNode(mac) + if err != nil { + return 0, 0, false + } + return node.PosX, node.PosZ, true + }) + weatherDiagnostics.SetPositionSuggester(func() (x, z, improvement float64) { + return fleetHealer.SuggestNodePosition() + }) + + // Phase 5: Advanced diagnostic engine with root-cause analysis + diagnosticEngine := diagnostics.NewDiagnosticEngine(diagnostics.DiagnosticConfig{ + DiagnosticInterval: 15 * time.Minute, + HistoryWindow: 1 * time.Hour, + MinSamples: 10, + }) + + // Wire health history accessor for diagnostic engine + diagnosticEngine.SetHealthHistoryAccessor(func(linkID string, window time.Duration) []diagnostics.LinkHealthSnapshot { + // Get history from weather diagnostics + snapshots := weatherDiagnostics.GetHistory(linkID, window) + result := make([]diagnostics.LinkHealthSnapshot, len(snapshots)) + for i, s := range snapshots { + result[i] = diagnostics.LinkHealthSnapshot{ + Timestamp: s.Timestamp, + SNR: s.SNR, + PhaseStability: s.PhaseStability, + PacketRate: s.PacketRate, + DriftRate: s.DriftRate, + CompositeScore: s.CompositeScore, + DeltaRMSVariance: s.DeltaRMSVariance, + IsQuietPeriod: s.IsQuietPeriod, + } + } + return result + }) + + // Wire link ID accessor + diagnosticEngine.SetAllLinkIDsAccessor(func() []string { + return pm.GetAllLinkIDs() + }) + + // Wire node position accessor for diagnostics + diagnosticEngine.SetNodePositionAccessor(func(mac string) (diagnostics.Vec3, bool) { + node, err := fleetReg.GetNode(mac) + if err != nil { + return diagnostics.Vec3{}, false + } + return diagnostics.Vec3{X: node.PosX, Y: node.PosY, Z: node.PosZ}, true + }) + + // Wire GDOP improvement accessor + diagnosticEngine.SetGDOPImprovementAccessor(func(nodeMAC string, targetPos diagnostics.Vec3) float64) { + // Calculate current worst GDOP vs new worst GDOP with node at target position + currentWorstX, currentWorstZ, currentWorstGDOP := fleetHealer.GetWorstCoverageZone() + _ = currentWorstX + _ = currentWorstZ + // Estimate improvement - this is a simplified calculation + return currentWorstGDOP * 0.2 // Assume 20% improvement as placeholder + }) + + // Wire repositioning computer for Rule 4 + diagnosticEngine.SetRepositioningComputer(func(linkID string, blockedZone diagnostics.Vec3) (diagnostics.Vec3, float64, error) { + // Use fleet healer's position suggestion + sugX, sugZ, improvement := fleetHealer.SuggestNodePosition() + return diagnostics.Vec3{X: sugX, Z: sugZ}, improvement, nil + }) + + // Wire occupancy accessor for quiet period detection + diagnosticEngine.SetOccupancyAccessor(func() int { + return pm.GetStationaryPersonCount() + }) + + // Start diagnostic engine + go diagnosticEngine.Run(ctx) + log.Printf("[INFO] Phase 5 diagnostic engine started (interval: 15m)") + + // Phase 5: Baseline persistence store + baselineStore, err := sigproc.NewBaselineStore(filepath.Join(cfg.DataDir, "baselines.db")) + if err != nil { + log.Printf("[WARN] Failed to open baseline store: %v (persistence disabled)", err) + } else { + defer baselineStore.Close() + // Restore saved baselines + if err := baselineStore.RestoreAll(pm, 64); err != nil { + log.Printf("[WARN] Failed to restore baselines: %v", err) + } + // Start periodic saves + baselineStore.StartPeriodicSave(ctx, pm, baselineSaveInterval) + log.Printf("[INFO] Baseline persistence enabled (save interval: %v)", baselineSaveInterval) + } + + // Phase 6: Health persistence store for diagnostics and weekly trends + healthStore, err := sigproc.NewHealthStore(filepath.Join(cfg.DataDir, "health.db")) + if err != nil { + log.Printf("[WARN] Failed to open health store: %v (health persistence disabled)", err) + } else { + defer healthStore.Close() + healthStore.StartPeriodicTasks(ctx) + log.Printf("[INFO] Health persistence enabled at %s", filepath.Join(cfg.DataDir, "health.db")) + + // Wire feedback accessor for diagnostic engine Rule 4 (Fresnel blockage) + diagnosticEngine.SetFeedbackAccessor(func(linkID string, window time.Duration) []diagnostics.FeedbackEvent { + events, err := healthStore.GetFeedbackEvents(linkID, window) + if err != nil { + return nil + } + result := make([]diagnostics.FeedbackEvent, len(events)) + for i, e := range events { + result[i] = diagnostics.FeedbackEvent{ + LinkID: e.LinkID, + EventType: e.EventType, + Position: diagnostics.Vec3{X: e.PosX, Y: e.PosY, Z: e.PosZ}, + Timestamp: e.Timestamp, + } + } + return result + }) + } + + // Phase 5: Periodic health computation for all links + go func() { + ticker := time.NewTicker(healthComputeInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + pm.ComputeAllHealth() + } + } + }() + + // Phase 6: Periodic tracking + identity matching + fall detection + go func() { + ticker := time.NewTicker(100 * time.Millisecond) // 10 Hz + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + // Get tracked blobs from fusion/tracker + blobs := pm.GetTrackedBlobs() + if len(blobs) == 0 { + continue + } + + // Update identity matcher + if identityMatcher != nil { + identityMatcher.UpdateBlobs(blobs) + } + + // Update zones occupancy + if zonesMgr != nil { + for _, blob := range blobs { + zonesMgr.UpdateBlobPosition(blob.ID, blob.X, blob.Y, blob.Z) + } + } + + // Update flow analytics + if flowAccumulator != nil { + for _, blob := range blobs { + // Get person ID from identity matcher + var personID string + if identityMatcher != nil { + if match := identityMatcher.GetMatch(blob.ID); match != nil { + personID = match.PersonID + } + } + flowAccumulator.UpdateTrack(analytics.TrackUpdate{ + ID: blob.ID, + X: blob.X, + Y: blob.Y, + Z: blob.Z, + VX: blob.VX, + VY: blob.VY, + VZ: blob.VZ, + PersonID: personID, + }) + } + } + + // Run fall detection + for _, blob := range blobs { + fallDetector.Update([]struct { + ID int + X, Y, Z float64 + VX, VY, VZ float64 + Posture string + }{blob}, time.Now()) + } + + // Evaluate automations + if automationEngine != nil { + automationEngine.Evaluate(blobs, func(blobID int) string { + if zonesMgr != nil { + return zonesMgr.GetBlobZone(blobID) + } + return "" + }) + } + } + } + }() + + // Phase 6: Fall detection callback + fallDetector.SetOnFall(func(event falldetect.FallEvent) { + log.Printf("[WARN] Fall detected: blob=%d confidence=%.2f", event.BlobID, event.Confidence) + + // Get identity + var personID, personName, personColor string + if identityMatcher != nil { + if match := identityMatcher.GetMatch(event.BlobID); match != nil { + event.Identity = match.DeviceName + personID = match.PersonID + personName = match.DeviceName + } + } + + // Get zone + var zoneID string + if zonesMgr != nil { + zoneID = zonesMgr.GetBlobZone(event.BlobID) + } + + // Send notification + if notifyService != nil { + notif := notify.Notification{ + Title: "Fall Detected", + Body: fmt.Sprintf("Fall detected for %s at (%.1f, %.1f, %.1f)", event.Identity, event.Position.X, event.Position.Y, event.Position.Z), + Priority: 5, + Tags: []string{"warning", "fall"}, + Data: map[string]interface{}{ + "blob_id": event.BlobID, + "confidence": event.Confidence, + }, + Timestamp: time.Now(), + } + notifyService.Send(notif) + } + + // Publish to MQTT + if mqttClient != nil && mqttClient.IsConnected() { + mqttClient.UpdateBinarySensorState("fall_detected", true) + } + + // Trigger automation event + if automationEngine != nil { + automationEngine.ProcessEvent(automation.Event{ + Type: automation.TriggerFallDetected, + Timestamp: time.Now(), + PersonID: personID, + PersonName: personName, + PersonColor: personColor, + ZoneID: zoneID, + Confidence: event.Confidence, + Extra: map[string]interface{}{ + "blob_id": event.BlobID, + "position": []float64{event.Position.X, event.Position.Y, event.Position.Z}, + }, + }) + } + }) + + // Set identity function for fall detector + fallDetector.SetIdentityFunc(func(blobID int) string { + if identityMatcher == nil { + if match := identityMatcher.GetMatch(blobID); match != nil { + return match.DeviceName + } + } + return "" + }) + + // Phase 6: Zone crossing callback + if zonesMgr != nil { + zonesMgr.SetOnCrossing(func(event zones.CrossingEvent) { + log.Printf("[INFO] Zone crossing: blob %d via %s", event.BlobID, event.PortalID) + + // Get identity + var personID, personName, personColor string + if identityMatcher != nil { + if match := identityMatcher.GetMatch(event.BlobID); match != nil { + event.Identity = match.DeviceName + personID = match.PersonID + personName = match.DeviceName + } + } + + // Send notification + if notifyService != nil { + notif := notify.Notification{ + Title: "Zone Change", + Body: fmt.Sprintf("%s moved from %s to %s", event.Identity, event.FromZone, event.ToZone), + Priority: 1, + Tags: []string{"zone", "movement"}, + Data: map[string]interface{}{ + "portal_id": event.PortalID, + "direction": event.Direction, + }, + Timestamp: time.Now(), + } + notifyService.Send(notif) + } + + // Update MQTT zone occupancy + if mqttClient != nil && mqttClient.IsConnected() { + mqttClient.UpdateZoneOccupancy(event.ToZone, zonesMgr.GetZoneOccupancy(event.ToZone).Count) + } + + // Trigger automation events + if automationEngine != nil { + // zone_leave event + if event.FromZone != "" { + automationEngine.ProcessEvent(automation.Event{ + Type: automation.TriggerZoneLeave, + Timestamp: time.Now(), + PersonID: personID, + PersonName: personName, + PersonColor: personColor, + ZoneID: event.FromZone, + ZoneName: event.FromZone, + }) + } + + // zone_enter event + if event.ToZone != "" { + automationEngine.ProcessEvent(automation.Event{ + Type: automation.TriggerZoneEnter, + Timestamp: time.Now(), + PersonID: personID, + PersonName: personName, + PersonColor: personColor, + ZoneID: event.ToZone, + ZoneName: event.ToZone, + FromZone: event.FromZone, + ToZone: event.ToZone, + }) + + // Update dwell tracking + automationEngine.UpdateZoneDwellTracking(event.BlobID, event.ToZone, time.Now()) + } + } + }) + } + + // Phase 6: Diurnal patterns learned notification + // Track which links have already broadcast their "patterns learned" notification + diurnalNotified := make(map[string]bool) + go func() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + statuses := pm.GetDiurnalLearningStatus() + for _, status := range statuses { + // If link is ready and we haven't notified yet + if status.IsReady && !diurnalNotified[status.LinkID] { + diurnalNotified[status.LinkID] = true + log.Printf("[INFO] Diurnal patterns learned for link %s after 7 days", status.LinkID) + // Broadcast notification to dashboard + msg := map[string]interface{}{ + "type": "diurnal_ready", + "link_id": status.LinkID, + "message": "Your system has learned your daily patterns. Accuracy should improve this week.", + } + data, _ := json.Marshal(msg) + dashboardHub.Broadcast(data) + } + } + } + } + }() + + // Phase 5: Periodic weather snapshot recording + go func() { + ticker := time.NewTicker(weatherRecordInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + // Record health snapshots for all active links + states := pm.GetAllMotionStates() + var healthEntries []sigproc.HealthLogEntry + for _, state := range states { + processor := pm.GetProcessor(state.LinkID) + if processor == nil { + continue + } + health := processor.GetHealth() + if health == nil { + continue + } + snr, phaseStability, packetRate, driftRate, deltaRMSVar := health.GetHealthMetrics() + isQuiet := !state.MotionDetected + weatherDiagnostics.RecordSnapshot(state.LinkID, snr, phaseStability, packetRate, driftRate) + + // Also persist to HealthStore for long-term diagnostics + if healthStore != nil { + composite := 0.3*snr + 0.25*(1-phaseStability) + 0.25*math.Min(packetRate/20.0, 1.0) + 0.2*(1-driftRate) + if composite < 0 { + composite = 0 + } + if composite > 1 { + composite = 1 + } + healthEntries = append(healthEntries, sigproc.HealthLogEntry{ + LinkID: state.LinkID, + Timestamp: time.Now(), + SNR: snr, + PhaseStability: phaseStability, + PacketRate: packetRate, + DriftRate: driftRate, + CompositeScore: composite, + DeltaRMSVariance: deltaRMSVar, + IsQuietPeriod: isQuiet, + }) + } + } + // Batch persist to health store + if healthStore != nil && len(healthEntries) > 0 { + if err := healthStore.LogHealthBatch(healthEntries); err != nil { + log.Printf("[WARN] Failed to persist health entries: %v", err) + } + } + } + } + }() + log.Printf("[INFO] Phase 5 health monitoring enabled (health: %v, weather: %v)", healthComputeInterval, weatherRecordInterval) + + // Phase 6: Flow analytics background tasks + if flowAccumulator != nil { + // Daily pruning of old trajectory segments + go func() { + ticker := time.NewTicker(24 * time.Hour) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := flowAccumulator.PruneOldSegments(); err != nil { + log.Printf("[WARN] Failed to prune old trajectory segments: %v", err) + } + } + } + }() + + // Weekly corridor detection + go func() { + ticker := time.NewTicker(7 * 24 * time.Hour) + defer ticker.Stop() + // Run once at startup + if err := flowAccumulator.ComputeCorridors(); err != nil { + log.Printf("[WARN] Failed to compute corridors: %v", err) + } + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := flowAccumulator.ComputeCorridors(); err != nil { + log.Printf("[WARN] Failed to compute corridors: %v", err) + } + } + } + }() + log.Printf("[INFO] Flow analytics background tasks started (prune: 24h, corridors: 7d)") + } + + // Fleet REST API + fleetHandler := fleet.NewHandler(fleetMgr) + fleetHandler.RegisterRoutes(r) + + // Phase 6: BLE REST API + if bleRegistry != nil { + r.Get("/api/ble/devices", func(w http.ResponseWriter, r *http.Request) { + devices := bleRegistry.GetAllDevices() + writeJSON(w, devices) + }) + r.Get("/api/ble/devices/{addr}", func(w http.ResponseWriter, r *http.Request) { + addr := chi.URLParam(r, "addr") + device, err := bleRegistry.GetDevice(addr) + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + writeJSON(w, device) + }) + r.Post("/api/ble/devices", func(w http.ResponseWriter, r *http.Request) { + var device ble.DeviceRecord + if err := json.NewDecoder(r.Body).Decode(&device); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + if device.Addr == "" { + http.Error(w, "addr required", http.StatusBadRequest) + return + } + if err := bleRegistry.UpsertDevice(&device); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + writeJSON(w, device) + }) + r.Put("/api/ble/devices/{addr}", func(w http.ResponseWriter, r *http.Request) { + addr := chi.URLParam(r, "addr") + var device ble.DeviceRecord + if err := json.NewDecoder(r.Body).Decode(&device); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + device.Addr = addr + if err := bleRegistry.UpsertDevice(&device); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + writeJSON(w, device) + }) + r.Delete("/api/ble/devices/{addr}", func(w http.ResponseWriter, r *http.Request) { + addr := chi.URLParam(r, "addr") + if err := bleRegistry.DeleteDevice(addr); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) + }) + r.Get("/api/ble/matches", func(w http.ResponseWriter, r *http.Request) { + if identityMatcher == nil { + matches := identityMatcher.GetMatches() + writeJSON(w, matches) + return + } + writeJSON(w, map[int]*ble.IdentityMatch{}) + }) + } + + // Phase 6: Zones REST API + if zonesMgr != nil { + r.Get("/api/zones", func(w http.ResponseWriter, r *http.Request) { + zones := zonesMgr.GetAllZones() + writeJSON(w, zones) + }) + r.Post("/api/zones", func(w http.ResponseWriter, r *http.Request) { + var zone zones.Zone + if err := json.NewDecoder(r.Body).Decode(&zone); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + if zone.ID == "" { + zone.ID = fmt.Sprintf("zone_%d", time.Now().UnixNano()) + } + if err := zonesMgr.CreateZone(&zone); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + writeJSON(w, zone) + }) + r.Put("/api/zones/{id}", func(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + var zone zones.Zone + if err := json.NewDecoder(r.Body).Decode(&zone); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + zone.ID = id + if err := zonesMgr.UpdateZone(&zone); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + writeJSON(w, zone) + }) + r.Delete("/api/zones/{id}", func(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + if err := zonesMgr.DeleteZone(id); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) + }) + r.Get("/api/zones/occupancy", func(w http.ResponseWriter, r *http.Request) { + occupancy := zonesMgr.GetOccupancy() + writeJSON(w, occupancy) + }) + r.Get("/api/zones/crossings", func(w http.ResponseWriter, r *http.Request) { + crossings := zonesMgr.GetRecentCrossings(20) + writeJSON(w, crossings) + }) + } + + // Phase 6: Portals REST API + r.Get("/api/portals", func(w http.ResponseWriter, r *http.Request) { + if zonesMgr == nil { + writeJSON(w, zonesMgr.GetAllPortals()) + return + } + writeJSON(w, []*zones.Portal{}) + }) + r.Post("/api/portals", func(w http.ResponseWriter, r *http.Request) { + if zonesMgr == nil { + http.Error(w, "zones manager not available", http.StatusServiceUnavailable) + return + } + var portal zones.Portal + if err := json.NewDecoder(r.Body).Decode(&portal); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + if portal.ID == "" { + portal.ID = fmt.Sprintf("portal_%d", time.Now().UnixNano()) + } + if err := zonesMgr.CreatePortal(&portal); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + writeJSON(w, portal) + }) + r.Put("/api/portals/{id}", func(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + if zonesMgr == nil { + http.Error(w, "zones manager not available", http.StatusServiceUnavailable) + return + } + var portal zones.Portal + if err := json.NewDecoder(r.Body).Decode(&portal); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + portal.ID = id + if err := zonesMgr.UpdatePortal(&portal); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + writeJSON(w, portal) + }) + r.Delete("/api/portals/{id}", func(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + if zonesMgr == nil { + http.Error(w, "zones manager not available", http.StatusServiceUnavailable) + return + } + if err := zonesMgr.DeletePortal(id); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) + }) + } + + // Phase 6: Automation REST API + if automationEngine != nil { + r.Get("/api/automations", func(w http.ResponseWriter, r *http.Request) { + automations := automationEngine.GetAllAutomations() + writeJSON(w, automations) + }) + r.Post("/api/automations", func(w http.ResponseWriter, r *http.Request) { + var auto automation.Automation + if err := json.NewDecoder(r.Body).Decode(&auto); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + if auto.ID == "" { + auto.ID = fmt.Sprintf("auto_%d", time.Now().UnixNano()) + } + if err := automationEngine.CreateAutomation(&auto); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + writeJSON(w, auto) + }) + r.Get("/api/automations/{id}", func(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + auto := automationEngine.GetAutomation(id) + if auto == nil { + http.Error(w, "automation not found", http.StatusNotFound) + return + } + writeJSON(w, auto) + }) + r.Put("/api/automations/{id}", func(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + var auto automation.Automation + if err := json.NewDecoder(r.Body).Decode(&auto); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + auto.ID = id + if err := automationEngine.UpdateAutomation(&auto); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + writeJSON(w, auto) + }) + r.Delete("/api/automations/{id}", func(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + if err := automationEngine.DeleteAutomation(id); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) + }) + r.Post("/api/automations/{id}/test", func(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + if err := automationEngine.TestFire(id); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) + }) + r.Get("/api/automations/events", func(w http.ResponseWriter, r *http.Request) { + events := automationEngine.GetRecentActionLog(50) + writeJSON(w, events) + }) + + // Trigger volumes API + r.Get("/api/automations/volumes", func(w http.ResponseWriter, r *http.Request) { + volumes := automationEngine.GetAllTriggerVolumes() + writeJSON(w, volumes) + }) + r.Post("/api/automations/volumes", func(w http.ResponseWriter, r *http.Request) { + var volume automation.TriggerVolume + if err := json.NewDecoder(r.Body).Decode(&volume); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + if volume.ID == "" { + volume.ID = fmt.Sprintf("volume_%d", time.Now().UnixNano()) + } + if err := automationEngine.CreateTriggerVolume(&volume); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + writeJSON(w, volume) + }) + r.Delete("/api/automations/volumes/{id}", func(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + if err := automationEngine.DeleteTriggerVolume(id); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) + }) + + // System mode API + r.Get("/api/mode", func(w http.ResponseWriter, r *http.Request) { + mode := automationEngine.GetSystemMode() + writeJSON(w, map[string]string{"mode": string(mode)}) + }) + r.Post("/api/mode", func(w http.ResponseWriter, r *http.Request) { + var req struct { + Mode string `json:"mode"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + mode := automation.SystemMode(req.Mode) + if mode != automation.ModeHome && mode != automation.ModeAway && mode != automation.ModeSleep { + http.Error(w, "invalid mode, must be home, away, or sleep", http.StatusBadRequest) + return + } + if err := automationEngine.SetSystemMode(mode); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + writeJSON(w, map[string]string{"mode": string(mode)}) + }) + + // Wire providers to automation engine + if zonesMgr != nil { + automationEngine.SetZoneProvider(&zoneProviderAdapter{mgr: zonesMgr}) + } + if bleRegistry != nil { + automationEngine.SetPersonProvider(&personProviderAdapter{registry: bleRegistry}) + automationEngine.SetDeviceProvider(&deviceProviderAdapter{registry: bleRegistry}) + } + if mqttClient != nil { + automationEngine.SetMQTTClient(mqttClient) + } + if notifyService != nil { + automationEngine.SetNotificationSender(¬ifySenderAdapter{service: notifyService}) + } + } + + // Phase 6: Notification channels REST API + if notifyService != nil { + r.Get("/api/notifications/channels", func(w http.ResponseWriter, r *http.Request) { + // Return configured channels (without sensitive data) + writeJSON(w, map[string]interface{}{ + "channels": []string{}, + }) + }) + r.Post("/api/notifications/channels", func(w http.ResponseWriter, r *http.Request) { + var req struct { + ID string + Type string + URL string + Token string + User string + Username string + Password string + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + cc := notify.ChannelConfig{ + Type: notify.NotificationChannel(req.Type), + Enabled: true, + URL: req.URL, + Token: req.Token, + User: req.User, + Username: req.Username, + Password: req.Password, + } + if err := notifyService.AddChannel(req.ID, cc); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + writeJSON(w, map[string]string{"status": "created"}) + }) + r.Delete("/api/notifications/channels/{id}", func(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + if err := notifyService.RemoveChannel(id); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) + }) + r.Post("/api/notifications/test", func(w http.ResponseWriter, r *http.Request) { + if notifyService == nil { + http.Error(w, "notification service not available", http.StatusServiceUnavailable) + return + } + if err := notifyService.Send(notify.Notification{ + Title: "Test Notification", + Body: "This is a test notification from Spaxel", + }); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) + }) + r.Get("/api/notifications/history", func(w http.ResponseWriter, r *http.Request) { + if notifyService == nil { + writeJSON(w, []struct{}{}) + return + } + history := notifyService.GetHistory(50) + writeJSON(w, history) + }) + r.Post("/api/notifications/quiet-hours", func(w http.ResponseWriter, r *http.Request) { + var qh notify.QuietHoursConfig + if err := json.NewDecoder(r.Body).Decode(&qh); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + if err := notifyService.SetQuietHours(qh); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + writeJSON(w, qh) + }) + } + + // Phase 5: Weather diagnostics REST API + r.Get("/api/weather", func(w http.ResponseWriter, r *http.Request) { + reports := weatherDiagnostics.GetAllLinkReports() + writeJSON(w, reports) + }) + r.Get("/api/weather/{linkID}", func(w http.ResponseWriter, r *http.Request) { + linkID := chi.URLParam(r, "linkID") + report := weatherDiagnostics.GetReport(linkID) + writeJSON(w, report) + }) + r.Get("/api/weather/summary", func(w http.ResponseWriter, r *http.Request) { + condition, avgConfidence, issueCount := weatherDiagnostics.GetSystemWeatherSummary() + writeJSON(w, map[string]interface{}{ + "condition": condition, + "avg_confidence": avgConfidence, + "issue_count": issueCount, + }) + }) + r.Get("/api/weather/{linkID}/weekly", func(w http.ResponseWriter, r *http.Request) { + linkID := chi.URLParam(r, "linkID") + trend := weatherDiagnostics.GetWeeklyTrend(linkID) + writeJSON(w, trend) + }) + + // Phase 5: Coverage and healing status API + r.Get("/api/coverage", func(w http.ResponseWriter, r *http.Request) { + coverage := fleetHealer.GetCoverage() + writeJSON(w, coverage) + }) + r.Get("/api/coverage/history", func(w http.ResponseWriter, r *http.Request) { + limitStr := r.URL.Query().Get("limit") + limit := 10 + if limitStr != "" { + if n, err := strconv.Atoi(limitStr); err == nil && n > 0 { + limit = n + } + } + history := fleetHealer.GetCoverageHistory(limit) + writeJSON(w, history) + }) + r.Get("/api/healing/status", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]interface{}{ + "degraded": fleetHealer.IsDegraded(), + "online_nodes": fleetHealer.GetOnlineNodes(), + "optimal_roles": fleetHealer.GetOptimalRoles(), + }) + }) + r.Get("/api/healing/suggest", func(w http.ResponseWriter, r *http.Request) { + x, z, improvement := fleetHealer.SuggestNodePosition() + worstX, worstZ, worstGDOP := fleetHealer.GetWorstCoverageZone() + writeJSON(w, map[string]interface{}{ + "suggested_position": map[string]float64{"x": x, "z": z}, + "expected_improvement": improvement, + "worst_coverage_zone": map[string]float64{"x": worstX, "z": worstZ, "gdop": worstGDOP}, + }) + }) + + // Phase 5: System health API + r.Get("/api/health/system", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]interface{}{ + "system_health": pm.GetSystemHealth(), + "link_count": pm.LinkCount(), + "active_links": pm.ActiveLinks(), + "stationary_count": pm.GetStationaryPersonCount(), + "worst_link": func() string { id, _ := pm.GetWorstLink(); return id }(), + }) + }) + + // Phase 6: Diurnal learning status API + r.Get("/api/diurnal/status", func(w http.ResponseWriter, r *http.Request) { + statuses := pm.GetDiurnalLearningStatus() + writeJSON(w, statuses) + }) + r.Get("/api/diurnal/status/{linkID}", func(w http.ResponseWriter, r *http.Request) { + linkID := chi.URLParam(r, "linkID") + allStatuses := pm.GetDiurnalLearningStatus() + for _, status := range allStatuses { + if status.LinkID == linkID { + writeJSON(w, status) + return + } + } + http.Error(w, "link not found", http.StatusNotFound) + }) + + // Link health API - returns all links with health scores and details + r.Get("/api/links", func(w http.ResponseWriter, r *http.Request) { + links := ingestSrv.GetAllLinksWithHealth() + writeJSON(w, links) + }) + + // Phase 6: Link diagnostics API + r.Get("/api/links/{linkID}/diagnostics", func(w http.ResponseWriter, r *http.Request) { + linkID := chi.URLParam(r, "linkID") + diagnoses := diagnosticEngine.GetDiagnoses(linkID) + writeJSON(w, diagnoses) + }) + + r.Get("/api/links/{linkID}/health-history", func(w http.ResponseWriter, r *http.Request) { + linkID := chi.URLParam(r, "linkID") + windowStr := r.URL.Query().Get("window") + window := 24 * time.Hour // default 24h + if windowStr != "" { + if hours, err := strconv.Atoi(windowStr); err == nil && hours > 0 { + window = time.Duration(hours) * time.Hour + } + } + if healthStore == nil { + http.Error(w, "health store not available", http.StatusServiceUnavailable) + return + } + history, err := healthStore.GetHealthHistory(linkID, window) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + writeJSON(w, history) + }) + + r.Get("/api/diagnostics", func(w http.ResponseWriter, r *http.Request) { + allDiagnoses := diagnosticEngine.GetAllDiagnoses() + writeJSON(w, allDiagnoses) + }) + + // Phase 6: Analytics REST API + if flowAccumulator != nil { + analyticsHandler := analytics.NewHandler(flowAccumulator) + analyticsHandler.RegisterRoutes(r) + } + + // Phase 6: Learning feedback REST API + if feedbackStore != nil { + learningHandler := learning.NewHandler(feedbackStore, feedbackProcessor, accuracyComputer) + learningHandler.RegisterRoutes(r) + } + + // OTA firmware server and manager + firmwareDir := filepath.Join(cfg.DataDir, "firmware") + otaSrv := ota.NewServer(firmwareDir) + otaMgr := ota.NewManager(otaSrv, "http://"+cfg.BindAddr) + otaMgr.SetSender(ingestSrv) + ingestSrv.SetOTAManager(otaMgr) + log.Printf("[INFO] OTA firmware server at %s", firmwareDir) + + // OTA REST API + r.Get("/api/firmware", otaSrv.HandleList) + r.Post("/api/firmware/upload", otaSrv.HandleUpload) + r.Get("/firmware/{filename}", otaSrv.HandleServe) + r.Get("/api/firmware/progress", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(otaMgr.GetProgress()) + }) + r.Post("/api/firmware/ota-all", func(w http.ResponseWriter, r *http.Request) { + // Rolling update of all connected nodes + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + if err := otaMgr.SendOTAAll(ctx, 60*time.Second); err != nil { + log.Printf("[ERROR] Rolling OTA failed: %v", err) + } + }() + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusAccepted) + json.NewEncoder(w).Encode(map[string]string{"status": "started"}) + }) + + // Provisioning API (used by onboarding wizard) + _, msPortStr, _ := net.SplitHostPort(cfg.BindAddr) + msPort, _ := strconv.Atoi(msPortStr) + if msPort == 0 { + msPort = 8080 + } + provSrv := provisioning.NewServer(cfg.DataDir, cfg.MDNSName, msPort) + r.Post("/api/provision", provSrv.HandleProvision) + + // Firmware manifest for esp-web-tools (onboarding wizard flashing) + r.Get("/api/firmware/manifest", func(w http.ResponseWriter, r *http.Request) { + latest := otaSrv.GetLatest() + manifest := map[string]interface{}{ + "name": "Spaxel Node", + "version": version, + "new_install_prompt_erase": true, + "builds": []map[string]interface{}{}, + } + + if latest != nil { + manifest["builds"] = []map[string]interface{}{ + { + "chipFamily": "ESP32-S3", + "parts": []map[string]interface{}{ + { + "path": "/firmware/" + latest.Filename, + "address": 0x0, + }, + }, + }, + } + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(manifest) //nolint:errcheck + }) + + go dashboardHub.Run() + + r.HandleFunc("/ws/dashboard", dashboardSrv.HandleDashboardWS) + + // Serve dashboard static files + staticDir := cfg.StaticDir + if staticDir == "" { + staticDir = findDashboardDir() + } + + if staticDir != "" { + if _, err := os.Stat(staticDir); err == nil { + log.Printf("[INFO] Serving dashboard from %s", staticDir) + r.Get("/*", func(w http.ResponseWriter, r *http.Request) { + path := filepath.Join(staticDir, r.URL.Path) + + if info, err := os.Stat(path); err == nil && info.IsDir() { + path = filepath.Join(path, "index.html") + } + + if _, err := os.Stat(path); err == nil { + http.ServeFile(w, r, path) + return + } + + if filepath.Ext(r.URL.Path) == "" { + http.ServeFile(w, r, filepath.Join(staticDir, "index.html")) + return + } + + http.NotFound(w, r) + }) + } else { + log.Printf("[WARN] Dashboard directory not found: %s", staticDir) + } + } else { + log.Printf("[WARN] No dashboard directory found, static files not served") + } + + // mDNS advertisement + var mdnsServer *mdns.Server + if cfg.MDNSEnabled { + service, err := mdns.NewMDNSService( + cfg.MDNSName, + "_spaxel._tcp", + "local.", + "", + 8080, + nil, + []string{"version=1", "ws=/ws/node", "dashboard=/ws/dashboard"}, + ) + if err != nil { + log.Printf("[ERROR] Failed to create mDNS service: %v", err) + } else { + mdnsServer, err = mdns.NewServer(&mdns.Config{Zone: service}) + if err != nil { + log.Printf("[ERROR] Failed to start mDNS server: %v", err) + } else { + log.Printf("[INFO] mDNS advertising %s._spaxel._tcp.local:8080", cfg.MDNSName) + } + } + } + + srv := &http.Server{ + Addr: cfg.BindAddr, + Handler: r, + ReadTimeout: 10 * time.Second, + WriteTimeout: 30 * time.Second, + } + + go func() { + log.Printf("[INFO] HTTP server listening on %s", cfg.BindAddr) + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("[FATAL] HTTP server error: %v", err) + } + }() + + sig := <-sigChan + log.Printf("[INFO] Received signal %v, initiating graceful shutdown", sig) + + cancel() + + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer shutdownCancel() + + if err := srv.Shutdown(shutdownCtx); err != nil { + log.Printf("[ERROR] HTTP server shutdown error: %v", err) + } + + ingestSrv.Shutdown(shutdownCtx) + + if mdnsServer != nil { + mdnsServer.Shutdown() + } + + log.Printf("[INFO] Shutdown complete") +} + +// Provider adapters for automation engine + +type zoneProviderAdapter struct { + mgr *zones.Manager +} + +func (z *zoneProviderAdapter) GetZone(id string) (string, bool) { + zone := z.mgr.GetZone(id) + if zone == nil { + return "", false + } + return zone.Name, true +} + +func (z *zoneProviderAdapter) GetZoneOccupancy(zoneID string) (int, []int) { + occ := z.mgr.GetZoneOccupancy(zoneID) + if occ == nil { + return 0, nil + } + return occ.Count, occ.BlobIDs +} + +type personProviderAdapter struct { + registry *ble.Registry +} + +func (p *personProviderAdapter) GetPerson(id string) (string, string, bool) { + person, err := p.registry.GetPerson(id) + if err != nil { + return "", "", false + } + return person.Name, person.Color, true +} + +type deviceProviderAdapter struct { + registry *ble.Registry +} + +func (d *deviceProviderAdapter) GetDevice(mac string) (string, bool) { + device, err := d.registry.GetDevice(mac) + if err != nil { + return "", false + } + if device.Label != "" { + return device.Label, true + } + if device.Name != "" { + return device.Name, true + } + if device.DeviceName != "" { + return device.DeviceName, true + } + return mac, true +} + +type notifySenderAdapter struct { + service *notify.Service +} + +func (n *notifySenderAdapter) SendViaChannel(channelType string, title, body string, data map[string]interface{}) error { + // The notify service sends to all channels, so we use it directly + notif := notify.Notification{ + Title: title, + Body: body, + Data: data, + Timestamp: time.Now(), + } + return n.service.Send(notif) +} + diff --git a/mothership/internal/learning/accuracy.go b/mothership/internal/learning/accuracy.go new file mode 100644 index 0000000..4b48f9c --- /dev/null +++ b/mothership/internal/learning/accuracy.go @@ -0,0 +1,358 @@ +// Package learning provides accuracy metric computation for detection +package learning + +import ( + "context" + "fmt" + "log" + "math" + "sync" + "time" +) + +// AccuracyComputerConfig holds configuration for accuracy computation +type AccuracyComputerConfig struct { + ComputeInterval time.Duration // How often to compute accuracy metrics + HistoryWeeks int // Number of weeks to keep in history +} + +// DefaultAccuracyComputerConfig returns default configuration +func DefaultAccuracyComputerConfig() AccuracyComputerConfig { + return AccuracyComputerConfig{ + ComputeInterval: 24 * time.Hour, // Daily computation + HistoryWeeks: 8, // Keep 8 weeks of history + } +} + +// AccuracyComputer computes precision, recall, and F1 metrics +type AccuracyComputer struct { + store *FeedbackStore + config AccuracyComputerConfig + mu sync.RWMutex +} + +// NewAccuracyComputer creates a new accuracy computer +func NewAccuracyComputer(store *FeedbackStore, config AccuracyComputerConfig) *AccuracyComputer { + return &AccuracyComputer{ + store: store, + config: config, + } +} + +// Run starts the background accuracy computation loop +func (a *AccuracyComputer) Run(ctx context.Context) { + ticker := time.NewTicker(a.config.ComputeInterval) + defer ticker.Stop() + + // Compute once at startup + a.ComputeAll() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + a.ComputeAll() + } + } +} + +// ComputeNow triggers an immediate accuracy computation +func (a *AccuracyComputer) ComputeNow() error { + return a.ComputeAll() +} + +// ComputeAll computes accuracy metrics for all scopes +func (a *AccuracyComputer) ComputeAll() error { + // Get current week + currentWeek := GetWeekString(time.Now()) + + // Compute system-wide metrics + if err := a.computeForScope(ScopeTypeSystem, ScopeIDSystem, currentWeek); err != nil { + log.Printf("[WARN] Failed to compute system accuracy: %v", err) + } + + // Compute per-link metrics + if err := a.computePerLink(currentWeek); err != nil { + log.Printf("[WARN] Failed to compute per-link accuracy: %v", err) + } + + // Compute per-zone metrics + if err := a.computePerZone(currentWeek); err != nil { + log.Printf("[WARN] Failed to compute per-zone accuracy: %v", err) + } + + return nil +} + +// Scope types and IDs +const ( + ScopeTypeSystem = "system" + ScopeTypeLink = "link" + ScopeTypeZone = "zone" + ScopeTypePerson = "person" + + ScopeIDSystem = "all" +) + +// computeForScope computes accuracy metrics for a specific scope +func (a *AccuracyComputer) computeForScope(scopeType, scopeID, week string) error { + tp, fp, fn, err := a.getCounts(scopeType, scopeID, week) + if err != nil { + return err + } + + // Compute metrics + precision := 0.0 + if tp+fp > 0 { + precision = float64(tp) / float64(tp+fp) + } + + recall := 0.0 + if tp+fn > 0 { + recall = float64(tp) / float64(tp+fn) + } + + f1 := 0.0 + if precision+recall > 0 { + f1 = 2 * precision * recall / (precision + recall) + } + + // Round to 4 decimal places + precision = math.Round(precision*10000) / 10000 + recall = math.Round(recall*10000) / 10000 + f1 = math.Round(f1*10000) / 10000 + + record := AccuracyRecord{ + Week: week, + ScopeType: scopeType, + ScopeID: scopeID, + Precision: precision, + Recall: recall, + F1: f1, + TPCount: tp, + FPCount: fp, + FNCount: fn, + ComputedAt: time.Now(), + } + + return a.store.SaveAccuracyRecord(record) +} + +// getCounts retrieves TP, FP, FN counts for a scope in a given week +func (a *AccuracyComputer) getCounts(scopeType, scopeID, week string) (tp, fp, fn int, err error) { + // Get week start/end times + weekStart, err := parseWeekString(week) + if err != nil { + return 0, 0, 0, err + } + weekEnd := weekStart.Add(7 * 24 * time.Hour) + + // Get all feedback in the week + feedbacks, err := a.getFeedbackInTimeRange(weekStart, weekEnd) + if err != nil { + return 0, 0, 0, err + } + + // Filter by scope and count + for _, f := range feedbacks { + // Check if feedback belongs to this scope + if !a.matchesScope(f, scopeType, scopeID) { + continue + } + + switch f.FeedbackType { + case TruePositive: + tp++ + case FalsePositive: + fp++ + case FalseNegative: + fn++ + } + } + + return tp, fp, fn, nil +} + +// getFeedbackInTimeRange retrieves all feedback in a time range +func (a *AccuracyComputer) getFeedbackInTimeRange(start, end time.Time) ([]FeedbackRecord, error) { + // This is a simplified implementation - in production you'd have a more efficient query + stats, err := a.store.GetFeedbackStats() + if err != nil { + return nil, err + } + + // For now, use the stats to get counts + // A full implementation would query feedback by timestamp + _ = stats + _ = start + _ = end + + // Return empty for now - actual implementation would query the database + return nil, nil +} + +// matchesScope checks if a feedback record matches the given scope +func (a *AccuracyComputer) matchesScope(f FeedbackRecord, scopeType, scopeID string) bool { + if scopeType == ScopeTypeSystem && scopeID == ScopeIDSystem { + return true // System scope matches everything + } + + if f.Details == nil { + return false + } + + switch scopeType { + case ScopeTypeLink: + if linkID, ok := f.Details["link_id"].(string); ok { + return linkID == scopeID + } + case ScopeTypeZone: + if zoneID, ok := f.Details["zone_id"].(string); ok { + return zoneID == scopeID + } + case ScopeTypePerson: + if personID, ok := f.Details["person_id"].(string); ok { + return personID == scopeID + } + } + + return false +} + +// computePerLink computes accuracy for each link +func (a *AccuracyComputer) computePerLink(week string) error { + // Get all unique link IDs from feedback + linkIDs := a.getUniqueScopeIDs(ScopeTypeLink) + + for _, linkID := range linkIDs { + if err := a.computeForScope(ScopeTypeLink, linkID, week); err != nil { + log.Printf("[WARN] Failed to compute accuracy for link %s: %v", linkID, err) + } + } + + return nil +} + +// computePerZone computes accuracy for each zone +func (a *AccuracyComputer) computePerZone(week string) error { + zoneIDs := a.getUniqueScopeIDs(ScopeTypeZone) + + for _, zoneID := range zoneIDs { + if err := a.computeForScope(ScopeTypeZone, zoneID, week); err != nil { + log.Printf("[WARN] Failed to compute accuracy for zone %s: %v", zoneID, err) + } + } + + return nil +} + +// getUniqueScopeIDs extracts unique scope IDs from feedback +func (a *AccuracyComputer) getUniqueScopeIDs(scopeType string) []string { + // This would query distinct scope IDs from feedback + // Simplified implementation for now + return nil +} + +// parseWeekString parses a week string (e.g., "2026-W13") into a time +func parseWeekString(week string) (time.Time, error) { + var year, weekNum int + _, err := fmt.Sscanf(week, "%d-W%d", &year, &weekNum) + if err != nil { + return time.Time{}, err + } + + // Get the first day of the year + t := time.Date(year, 1, 1, 0, 0, 0, 0, time.UTC) + + // Add weeks (ISO weeks start on Monday) + for t.Weekday() != time.Monday { + t = t.AddDate(0, 0, -1) + } + + // Add the week offset + t = t.AddDate(0, 0, (weekNum-1)*7) + + return t, nil +} + +// GetAccuracyHistory retrieves accuracy history for a scope +func (a *AccuracyComputer) GetAccuracyHistory(scopeType, scopeID string, weeks int) ([]AccuracyRecord, error) { + return a.store.GetAccuracyHistory(scopeType, scopeID, weeks) +} + +// GetCurrentAccuracy retrieves current week's accuracy for a scope +func (a *AccuracyComputer) GetCurrentAccuracy(scopeType, scopeID string) (*AccuracyRecord, error) { + currentWeek := GetWeekString(time.Now()) + records, err := a.store.GetAccuracyHistory(scopeType, scopeID, 1) + if err != nil || len(records) == 0 { + return nil, err + } + + // Find the current week's record + for _, r := range records { + if r.Week == currentWeek { + return &r, nil + } + } + + return nil, nil +} + +// GetImprovementStats calculates improvement statistics +func (a *AccuracyComputer) GetImprovementStats() (map[string]interface{}, error) { + currentWeek := GetWeekString(time.Now()) + lastWeek := GetWeekString(time.Now().AddDate(0, 0, -7)) + + currentRecords, err := a.store.GetAllAccuracyRecords(currentWeek) + if err != nil { + return nil, err + } + + lastWeekRecords, err := a.store.GetAllAccuracyRecords(lastWeek) + if err != nil { + return nil, err + } + + // Calculate average F1 for each week + currentAvg := 0.0 + currentCount := 0 + for _, r := range currentRecords { + if r.ScopeType == ScopeTypeSystem { + currentAvg = r.F1 + currentCount = 1 + break + } + } + + lastAvg := 0.0 + lastCount := 0 + for _, r := range lastWeekRecords { + if r.ScopeType == ScopeTypeSystem { + lastAvg = r.F1 + lastCount = 1 + break + } + } + + // Calculate improvement percentage + improvement := 0.0 + if lastCount > 0 && currentCount > 0 && lastAvg > 0 { + improvement = ((currentAvg - lastAvg) / lastAvg) * 100 + } + + // Get feedback stats + stats, err := a.store.GetFeedbackStats() + if err != nil { + return nil, err + } + + return map[string]interface{}{ + "current_f1": currentAvg, + "last_week_f1": lastAvg, + "improvement_pct": improvement, + "total_feedback": stats["total_count"], + "this_week_feedback": stats["this_week_count"], + "unprocessed_count": stats["unprocessed_count"], + }, nil +} diff --git a/mothership/internal/learning/feedback_processor.go b/mothership/internal/learning/feedback_processor.go new file mode 100644 index 0000000..6384660 --- /dev/null +++ b/mothership/internal/learning/feedback_processor.go @@ -0,0 +1,228 @@ +// Package learning provides feedback processing for detection accuracy +package learning + +import ( + "context" + "log" + "sync" + "time" +) + +// ProcessorConfig holds configuration for the feedback processor +type ProcessorConfig struct { + ProcessInterval time.Duration // How often to process unprocessed feedback + RetentionWindow time.Duration // How long to keep false positive/negative frames +} + +// DefaultProcessorConfig returns default configuration +func DefaultProcessorConfig() ProcessorConfig { + return ProcessorConfig{ + ProcessInterval: 6 * time.Hour, + RetentionWindow: 30 * 24 * time.Hour, // 30 days + } +} + +// Processor handles background processing of detection feedback +type Processor struct { + store *FeedbackStore + config ProcessorConfig + mu sync.RWMutex + running bool + + // Callbacks for extending processor behavior + onFalsePositive func(feedback FeedbackRecord, details map[string]interface{}) + onFalseNegative func(feedback FeedbackRecord, details map[string]interface{}) +} + +// NewProcessor creates a new feedback processor +func NewProcessor(store *FeedbackStore, config ProcessorConfig) *Processor { + return &Processor{ + store: store, + config: config, + } +} + +// SetOnFalsePositive sets a callback for false positive processing +func (p *Processor) SetOnFalsePositive(fn func(feedback FeedbackRecord, details map[string]interface{})) { + p.mu.Lock() + defer p.mu.Unlock() + p.onFalsePositive = fn +} + +// SetOnFalseNegative sets a callback for false negative processing +func (p *Processor) SetOnFalseNegative(fn func(feedback FeedbackRecord, details map[string]interface{})) { + p.mu.Lock() + defer p.mu.Unlock() + p.onFalseNegative = fn +} + +// Run starts the background processing loop +func (p *Processor) Run(ctx context.Context) { + p.mu.Lock() + p.running = true + p.mu.Unlock() + + ticker := time.NewTicker(p.config.ProcessInterval) + defer ticker.Stop() + + // Process once at startup + p.processBatch() + + for { + select { + case <-ctx.Done(): + p.mu.Lock() + p.running = false + p.mu.Unlock() + return + case <-ticker.C: + p.processBatch() + } + } +} + +// ProcessNow triggers an immediate processing cycle +func (p *Processor) ProcessNow() error { + return p.processBatch() +} + +// processBatch processes all unprocessed feedback +func (p *Processor) processBatch() error { + feedbacks, err := p.store.GetUnprocessedFeedback() + if err != nil { + log.Printf("[WARN] Failed to get unprocessed feedback: %v", err) + return err + } + + if len(feedbacks) == 0 { + return nil + } + + log.Printf("[INFO] Processing %d unprocessed feedback entries", len(feedbacks)) + + var processedIDs []string + + for _, feedback := range feedbacks { + if err := p.processFeedback(feedback); err != nil { + log.Printf("[WARN] Failed to process feedback %s: %v", feedback.ID, err) + continue + } + processedIDs = append(processedIDs, feedback.ID) + } + + // Mark as processed + if len(processedIDs) > 0 { + if err := p.store.MarkFeedbackProcessed(processedIDs); err != nil { + log.Printf("[WARN] Failed to mark feedback as processed: %v", err) + return err + } + log.Printf("[INFO] Marked %d feedback entries as processed", len(processedIDs)) + } + + return nil +} + +// processFeedback handles a single feedback entry +func (p *Processor) processFeedback(feedback FeedbackRecord) error { + switch feedback.FeedbackType { + case FalsePositive: + return p.processFalsePositive(feedback) + case FalseNegative: + return p.processFalseNegative(feedback) + case TruePositive: + // True positives don't need special processing, just mark as processed + return nil + case WrongIdentity, WrongZone: + // These feedback types are informational for now + // Future: could be used to adjust identity/zone thresholds + return nil + default: + log.Printf("[WARN] Unknown feedback type: %s", feedback.FeedbackType) + return nil + } +} + +// processFalsePositive handles false positive feedback +func (p *Processor) processFalsePositive(feedback FeedbackRecord) error { + // Extract CSI-related details if available + details := feedback.Details + if details == nil { + details = make(map[string]interface{}) + } + + // Call extension callback if set + p.mu.RLock() + callback := p.onFalsePositive + p.mu.RUnlock() + + if callback != nil { + callback(feedback, details) + } + + // If we have link_id and delta_rms, store as a false positive frame + if linkID, ok := details["link_id"].(string); ok { + deltaRMS := 0.0 + if d, ok := details["delta_rms"].(float64); ok { + deltaRMS = d + } + + frame := FalsePositiveFrame{ + LinkID: linkID, + Timestamp: feedback.Timestamp, + DeltaRMS: deltaRMS, + Context: details, + } + + if err := p.store.AddFalsePositiveFrame(frame); err != nil { + return err + } + } + + return nil +} + +// processFalseNegative handles false negative feedback +func (p *Processor) processFalseNegative(feedback FeedbackRecord) error { + details := feedback.Details + if details == nil { + details = make(map[string]interface{}) + } + + // Call extension callback if set + p.mu.RLock() + callback := p.onFalseNegative + p.mu.RUnlock() + + if callback != nil { + callback(feedback, details) + } + + // If we have position and link_id, store as a false negative frame + if linkID, ok := details["link_id"].(string); ok { + posX, _ := details["position_x"].(float64) + posY, _ := details["position_y"].(float64) + posZ, _ := details["position_z"].(float64) + + frame := FalseNegativeFrame{ + LinkID: linkID, + Timestamp: feedback.Timestamp, + ExpectedPositionX: posX, + ExpectedPositionY: posY, + ExpectedPositionZ: posZ, + Context: details, + } + + if err := p.store.AddFalseNegativeFrame(frame); err != nil { + return err + } + } + + return nil +} + +// IsRunning returns whether the processor is running +func (p *Processor) IsRunning() bool { + p.mu.RLock() + defer p.mu.RUnlock() + return p.running +} diff --git a/mothership/internal/learning/feedback_store.go b/mothership/internal/learning/feedback_store.go new file mode 100644 index 0000000..2261b7b --- /dev/null +++ b/mothership/internal/learning/feedback_store.go @@ -0,0 +1,626 @@ +// Package learning provides feedback storage and processing for detection accuracy +package learning + +import ( + "database/sql" + "encoding/json" + "fmt" + "log" + "os" + "path/filepath" + "sync" + "time" + + _ "modernc.org/sqlite" +) + +// FeedbackType represents the type of feedback +type FeedbackType string + +const ( + TruePositive FeedbackType = "TRUE_POSITIVE" + FalsePositive FeedbackType = "FALSE_POSITIVE" + FalseNegative FeedbackType = "FALSE_NEGATIVE" + WrongIdentity FeedbackType = "WRONG_IDENTITY" + WrongZone FeedbackType = "WRONG_ZONE" +) + +// EventType represents the type of detection event +type EventType string + +const ( + BlobDetection EventType = "blob_detection" + ZoneTransition EventType = "zone_transition" + FallAlert EventType = "fall_alert" + Anomaly EventType = "anomaly" +) + +// FeedbackRecord represents a single feedback entry +type FeedbackRecord struct { + ID string `json:"id"` + EventID string `json:"event_id"` + EventType EventType `json:"event_type"` + FeedbackType FeedbackType `json:"feedback_type"` + Details map[string]interface{} `json:"details"` + Timestamp time.Time `json:"timestamp"` + Applied bool `json:"applied"` + ProcessedAt *time.Time `json:"processed_at,omitempty"` +} + +// FalsePositiveFrame represents CSI data for a known false positive +type FalsePositiveFrame struct { + LinkID string `json:"link_id"` + Timestamp time.Time `json:"timestamp"` + DeltaRMS float64 `json:"delta_rms"` + Context map[string]interface{} `json:"context"` +} + +// FalseNegativeFrame represents CSI data for a known false negative +type FalseNegativeFrame struct { + LinkID string `json:"link_id"` + Timestamp time.Time `json:"timestamp"` + ExpectedPositionX float64 `json:"expected_position_x"` + ExpectedPositionY float64 `json:"expected_position_y"` + ExpectedPositionZ float64 `json:"expected_position_z"` + Context map[string]interface{} `json:"context"` +} + +// FeedbackStore persists detection feedback to SQLite +type FeedbackStore struct { + mu sync.RWMutex + db *sql.DB + path string +} + +// NewFeedbackStore creates a new feedback persistence store +func NewFeedbackStore(dbPath string) (*FeedbackStore, error) { + if err := os.MkdirAll(filepath.Dir(dbPath), 0755); err != nil { + return nil, fmt.Errorf("create data dir: %w", err) + } + + db, err := sql.Open("sqlite", dbPath) + if err != nil { + return nil, err + } + + db.SetMaxOpenConns(1) + + store := &FeedbackStore{ + db: db, + path: dbPath, + } + + if err := store.initSchema(); err != nil { + db.Close() + return nil, err + } + + return store, nil +} + +// initSchema creates the necessary tables +func (s *FeedbackStore) initSchema() error { + schema := ` + -- Detection feedback from users + CREATE TABLE IF NOT EXISTS detection_feedback ( + id TEXT PRIMARY KEY, + event_id TEXT, + event_type TEXT NOT NULL, + feedback_type TEXT NOT NULL, + details_json TEXT, + timestamp INTEGER NOT NULL, + applied INTEGER DEFAULT 0, + processed_at INTEGER + ); + + -- Known false positive CSI frames for weight learner + CREATE TABLE IF NOT EXISTS false_positive_frames ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + link_id TEXT NOT NULL, + timestamp INTEGER NOT NULL, + delta_rms REAL NOT NULL, + context_json TEXT + ); + + -- Known false negative CSI frames for weight learner + CREATE TABLE IF NOT EXISTS false_negative_frames ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + link_id TEXT NOT NULL, + timestamp INTEGER NOT NULL, + expected_position_x REAL NOT NULL, + expected_position_y REAL NOT NULL, + expected_position_z REAL NOT NULL, + context_json TEXT + ); + + -- Detection accuracy metrics (weekly rollups) + CREATE TABLE IF NOT EXISTS detection_accuracy ( + week TEXT NOT NULL, + scope_type TEXT NOT NULL, + scope_id TEXT NOT NULL, + precision REAL NOT NULL, + recall REAL NOT NULL, + f1 REAL NOT NULL, + tp_count INTEGER NOT NULL, + fp_count INTEGER NOT NULL, + fn_count INTEGER NOT NULL, + computed_at INTEGER NOT NULL, + PRIMARY KEY (week, scope_type, scope_id) + ); + + -- Indexes for common queries + CREATE INDEX IF NOT EXISTS idx_feedback_applied ON detection_feedback(applied); + CREATE INDEX IF NOT EXISTS idx_feedback_time ON detection_feedback(timestamp); + CREATE INDEX IF NOT EXISTS idx_feedback_event ON detection_feedback(event_id); + CREATE INDEX IF NOT EXISTS idx_fp_link_time ON false_positive_frames(link_id, timestamp); + CREATE INDEX IF NOT EXISTS idx_fn_link_time ON false_negative_frames(link_id, timestamp); + CREATE INDEX IF NOT EXISTS idx_accuracy_week ON detection_accuracy(week); + ` + + _, err := s.db.Exec(schema) + return err +} + +// RecordFeedback stores a new feedback entry +func (s *FeedbackStore) RecordFeedback(feedback FeedbackRecord) error { + s.mu.Lock() + defer s.mu.Unlock() + + detailsJSON, err := json.Marshal(feedback.Details) + if err != nil { + return fmt.Errorf("marshal details: %w", err) + } + + var processedAt interface{} + if feedback.ProcessedAt != nil { + processedAt = feedback.ProcessedAt.Unix() + } + + _, err = s.db.Exec(` + INSERT INTO detection_feedback (id, event_id, event_type, feedback_type, details_json, timestamp, applied, processed_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + `, feedback.ID, feedback.EventID, feedback.EventType, feedback.FeedbackType, + string(detailsJSON), feedback.Timestamp.Unix(), boolToInt(feedback.Applied), processedAt) + + return err +} + +// GetUnprocessedFeedback returns all feedback entries where applied = false +func (s *FeedbackStore) GetUnprocessedFeedback() ([]FeedbackRecord, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + rows, err := s.db.Query(` + SELECT id, event_id, event_type, feedback_type, details_json, timestamp, applied, processed_at + FROM detection_feedback + WHERE applied = 0 + ORDER BY timestamp ASC + `) + if err != nil { + return nil, err + } + defer rows.Close() + + var records []FeedbackRecord + for rows.Next() { + var r FeedbackRecord + var timestamp int64 + var processedAt sql.NullInt64 + var detailsJSON string + + if err := rows.Scan(&r.ID, &r.EventID, &r.EventType, &r.FeedbackType, + &detailsJSON, ×tamp, &r.Applied, &processedAt); err != nil { + continue + } + + r.Timestamp = time.Unix(timestamp, 0) + if processedAt.Valid { + t := time.Unix(processedAt.Int64, 0) + r.ProcessedAt = &t + } + + if err := json.Unmarshal([]byte(detailsJSON), &r.Details); err != nil { + r.Details = make(map[string]interface{}) + } + + records = append(records, r) + } + + return records, nil +} + +// MarkFeedbackProcessed marks feedback as processed after the learner has applied it +func (s *FeedbackStore) MarkFeedbackProcessed(ids []string) error { + if len(ids) == 0 { + return nil + } + + s.mu.Lock() + defer s.mu.Unlock() + + tx, err := s.db.Begin() + if err != nil { + return err + } + defer tx.Rollback() + + now := time.Now().Unix() + + stmt, err := tx.Prepare(` + UPDATE detection_feedback + SET applied = 1, processed_at = ? + WHERE id = ? + `) + if err != nil { + return err + } + defer stmt.Close() + + for _, id := range ids { + if _, err := stmt.Exec(now, id); err != nil { + log.Printf("[WARN] Failed to mark feedback %s as processed: %v", id, err) + } + } + + return tx.Commit() +} + +// AddFalsePositiveFrame adds CSI frame data for a known false positive +func (s *FeedbackStore) AddFalsePositiveFrame(frame FalsePositiveFrame) error { + s.mu.Lock() + defer s.mu.Unlock() + + contextJSON, err := json.Marshal(frame.Context) + if err != nil { + return fmt.Errorf("marshal context: %w", err) + } + + _, err = s.db.Exec(` + INSERT INTO false_positive_frames (link_id, timestamp, delta_rms, context_json) + VALUES (?, ?, ?, ?) + `, frame.LinkID, frame.Timestamp.Unix(), frame.DeltaRMS, string(contextJSON)) + + return err +} + +// AddFalseNegativeFrame adds CSI frame data for a known false negative +func (s *FeedbackStore) AddFalseNegativeFrame(frame FalseNegativeFrame) error { + s.mu.Lock() + defer s.mu.Unlock() + + contextJSON, err := json.Marshal(frame.Context) + if err != nil { + return fmt.Errorf("marshal context: %w", err) + } + + _, err = s.db.Exec(` + INSERT INTO false_negative_frames (link_id, timestamp, expected_position_x, expected_position_y, expected_position_z, context_json) + VALUES (?, ?, ?, ?, ?, ?) + `, frame.LinkID, frame.Timestamp.Unix(), + frame.ExpectedPositionX, frame.ExpectedPositionY, frame.ExpectedPositionZ, + string(contextJSON)) + + return err +} + +// GetFalsePositiveFrames returns all false positive frames for a link within a window +func (s *FeedbackStore) GetFalsePositiveFrames(linkID string, window time.Duration) ([]FalsePositiveFrame, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + cutoff := time.Now().Add(-window).Unix() + + rows, err := s.db.Query(` + SELECT link_id, timestamp, delta_rms, context_json + FROM false_positive_frames + WHERE link_id = ? AND timestamp >= ? + ORDER BY timestamp ASC + `, linkID, cutoff) + if err != nil { + return nil, err + } + defer rows.Close() + + var frames []FalsePositiveFrame + for rows.Next() { + var f FalsePositiveFrame + var timestamp int64 + var contextJSON string + + if err := rows.Scan(&f.LinkID, ×tamp, &f.DeltaRMS, &contextJSON); err != nil { + continue + } + + f.Timestamp = time.Unix(timestamp, 0) + if err := json.Unmarshal([]byte(contextJSON), &f.Context); err != nil { + f.Context = make(map[string]interface{}) + } + + frames = append(frames, f) + } + + return frames, nil +} + +// GetFalseNegativeFrames returns all false negative frames for a link within a window +func (s *FeedbackStore) GetFalseNegativeFrames(linkID string, window time.Duration) ([]FalseNegativeFrame, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + cutoff := time.Now().Add(-window).Unix() + + rows, err := s.db.Query(` + SELECT link_id, timestamp, expected_position_x, expected_position_y, expected_position_z, context_json + FROM false_negative_frames + WHERE link_id = ? AND timestamp >= ? + ORDER BY timestamp ASC + `, linkID, cutoff) + if err != nil { + return nil, err + } + defer rows.Close() + + var frames []FalseNegativeFrame + for rows.Next() { + var f FalseNegativeFrame + var timestamp int64 + var contextJSON string + + if err := rows.Scan(&f.LinkID, ×tamp, &f.ExpectedPositionX, &f.ExpectedPositionY, + &f.ExpectedPositionZ, &contextJSON); err != nil { + continue + } + + f.Timestamp = time.Unix(timestamp, 0) + if err := json.Unmarshal([]byte(contextJSON), &f.Context); err != nil { + f.Context = make(map[string]interface{}) + } + + frames = append(frames, f) + } + + return frames, nil +} + +// AccuracyRecord represents weekly accuracy metrics for a scope +type AccuracyRecord struct { + Week string `json:"week"` + ScopeType string `json:"scope_type"` + ScopeID string `json:"scope_id"` + Precision float64 `json:"precision"` + Recall float64 `json:"recall"` + F1 float64 `json:"f1"` + TPCount int `json:"tp_count"` + FPCount int `json:"fp_count"` + FNCount int `json:"fn_count"` + ComputedAt time.Time `json:"computed_at"` +} + +// SaveAccuracyRecord saves a weekly accuracy record +func (s *FeedbackStore) SaveAccuracyRecord(record AccuracyRecord) error { + s.mu.Lock() + defer s.mu.Unlock() + + _, err := s.db.Exec(` + INSERT OR REPLACE INTO detection_accuracy + (week, scope_type, scope_id, precision, recall, f1, tp_count, fp_count, fn_count, computed_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `, record.Week, record.ScopeType, record.ScopeID, record.Precision, record.Recall, + record.F1, record.TPCount, record.FPCount, record.FNCount, record.ComputedAt.Unix()) + + return err +} + +// GetAccuracyHistory returns accuracy records for a scope over time +func (s *FeedbackStore) GetAccuracyHistory(scopeType, scopeID string, weeks int) ([]AccuracyRecord, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + rows, err := s.db.Query(` + SELECT week, scope_type, scope_id, precision, recall, f1, tp_count, fp_count, fn_count, computed_at + FROM detection_accuracy + WHERE scope_type = ? AND scope_id = ? + ORDER BY week DESC + LIMIT ? + `, scopeType, scopeID, weeks) + if err != nil { + return nil, err + } + defer rows.Close() + + var records []AccuracyRecord + for rows.Next() { + var r AccuracyRecord + var computedAt int64 + + if err := rows.Scan(&r.Week, &r.ScopeType, &r.ScopeID, &r.Precision, &r.Recall, + &r.F1, &r.TPCount, &r.FPCount, &r.FNCount, &computedAt); err != nil { + continue + } + + r.ComputedAt = time.Unix(computedAt, 0) + records = append(records, r) + } + + // Reverse to get chronological order + for i, j := 0, len(records)-1; i < j; i, j = i+1, j-1 { + records[i], records[j] = records[j], records[i] + } + + return records, nil +} + +// GetAllAccuracyRecords returns all accuracy records for a week +func (s *FeedbackStore) GetAllAccuracyRecords(week string) ([]AccuracyRecord, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + rows, err := s.db.Query(` + SELECT week, scope_type, scope_id, precision, recall, f1, tp_count, fp_count, fn_count, computed_at + FROM detection_accuracy + WHERE week = ? + ORDER BY scope_type, scope_id + `, week) + if err != nil { + return nil, err + } + defer rows.Close() + + var records []AccuracyRecord + for rows.Next() { + var r AccuracyRecord + var computedAt int64 + + if err := rows.Scan(&r.Week, &r.ScopeType, &r.ScopeID, &r.Precision, &r.Recall, + &r.F1, &r.TPCount, &r.FPCount, &r.FNCount, &computedAt); err != nil { + continue + } + + r.ComputedAt = time.Unix(computedAt, 0) + records = append(records, r) + } + + return records, nil +} + +// GetFeedbackStats returns overall feedback statistics +func (s *FeedbackStore) GetFeedbackStats() (map[string]interface{}, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + stats := make(map[string]interface{}) + + // Total feedback count + var totalCount int + row := s.db.QueryRow(`SELECT COUNT(*) FROM detection_feedback`) + if err := row.Scan(&totalCount); err == nil { + stats["total_count"] = totalCount + } + + // Unprocessed count + var unprocessedCount int + row = s.db.QueryRow(`SELECT COUNT(*) FROM detection_feedback WHERE applied = 0`) + if err := row.Scan(&unprocessedCount); err == nil { + stats["unprocessed_count"] = unprocessedCount + } + + // Processed count + var processedCount int + row = s.db.QueryRow(`SELECT COUNT(*) FROM detection_feedback WHERE applied = 1`) + if err := row.Scan(&processedCount); err == nil { + stats["processed_count"] = processedCount + } + + // Count by feedback type + typeRows, err := s.db.Query(` + SELECT feedback_type, COUNT(*) as count + FROM detection_feedback + GROUP BY feedback_type + `) + if err == nil { + defer typeRows.Close() + byType := make(map[string]int) + for typeRows.Next() { + var ft string + var count int + if err := typeRows.Scan(&ft, &count); err == nil { + byType[ft] = count + } + } + stats["by_type"] = byType + } + + // This week's feedback count + weekStart := getWeekStart(time.Now()).Unix() + var weekCount int + row = s.db.QueryRow(`SELECT COUNT(*) FROM detection_feedback WHERE timestamp >= ?`, weekStart) + if err := row.Scan(&weekCount); err == nil { + stats["this_week_count"] = weekCount + } + + return stats, nil +} + +// GetFeedbackByEvent returns feedback for a specific event +func (s *FeedbackStore) GetFeedbackByEvent(eventID string) ([]FeedbackRecord, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + rows, err := s.db.Query(` + SELECT id, event_id, event_type, feedback_type, details_json, timestamp, applied, processed_at + FROM detection_feedback + WHERE event_id = ? + ORDER BY timestamp DESC + `, eventID) + if err != nil { + return nil, err + } + defer rows.Close() + + var records []FeedbackRecord + for rows.Next() { + var r FeedbackRecord + var timestamp int64 + var processedAt sql.NullInt64 + var detailsJSON string + + if err := rows.Scan(&r.ID, &r.EventID, &r.EventType, &r.FeedbackType, + &detailsJSON, ×tamp, &r.Applied, &processedAt); err != nil { + continue + } + + r.Timestamp = time.Unix(timestamp, 0) + if processedAt.Valid { + t := time.Unix(processedAt.Int64, 0) + r.ProcessedAt = &t + } + + if err := json.Unmarshal([]byte(detailsJSON), &r.Details); err != nil { + r.Details = make(map[string]interface{}) + } + + records = append(records, r) + } + + return records, nil +} + +// GetFeedbackCount returns the total number of feedback entries +func (s *FeedbackStore) GetFeedbackCount() (int, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + var count int + row := s.db.QueryRow(`SELECT COUNT(*) FROM detection_feedback`) + err := row.Scan(&count) + return count, err +} + +// Close closes the database connection +func (s *FeedbackStore) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + return s.db.Close() +} + +// getWeekStart returns the Monday of the week containing t +func getWeekStart(t time.Time) time.Time { + weekday := int(t.Weekday()) + if weekday == 0 { + weekday = 7 + } + return t.AddDate(0, 0, -weekday+1).Truncate(24 * time.Hour) +} + +// GetWeekString returns the ISO week string for a time (e.g., "2026-W13") +func GetWeekString(t time.Time) string { + year, week := t.ISOWeek() + return fmt.Sprintf("%d-W%02d", year, week) +} + +func boolToInt(b bool) int { + if b { + return 1 + } + return 0 +} diff --git a/mothership/internal/learning/feedback_test.go b/mothership/internal/learning/feedback_test.go new file mode 100644 index 0000000..4ee3ec0 --- /dev/null +++ b/mothership/internal/learning/feedback_test.go @@ -0,0 +1,422 @@ +package learning + +import ( + "os" + "path/filepath" + "testing" + "time" +) + +func TestNewFeedbackStore(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "learning-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + dbPath := filepath.Join(tmpDir, "learning.db") + store, err := NewFeedbackStore(dbPath) + if err != nil { + t.Fatalf("Failed to create feedback store: %v", err) + } + defer store.Close() + + // Verify database file was created + if _, err := os.Stat(dbPath); os.IsNotExist(err) { + t.Error("Database file was not created") + } +} + +func TestRecordFeedback(t *testing.T) { + store := setupTestFeedbackStore(t) + defer store.Close() + + // Record a true positive + feedback := FeedbackRecord{ + ID: "test-1", + EventID: "event-1", + EventType: BlobDetection, + FeedbackType: TruePositive, + Details: map[string]interface{}{ + "zone_id": "zone-kitchen", + "notes": "Correct detection", + }, + Timestamp: time.Now(), + Applied: false, + } + + err := store.RecordFeedback(feedback) + if err != nil { + t.Fatalf("RecordFeedback failed: %v", err) + } + + // Verify feedback count + count, err := store.GetFeedbackCount() + if err != nil { + t.Fatalf("GetFeedbackCount failed: %v", err) + } + if count != 1 { + t.Errorf("Expected count 1, got %d", count) + } +} + +func TestGetUnprocessedFeedback(t *testing.T) { + store := setupTestFeedbackStore(t) + defer store.Close() + + // Record multiple feedback entries + for i := 0; i < 3; i++ { + feedback := FeedbackRecord{ + ID: "test-" + string(rune('a'+i)), + EventID: "event-" + string(rune('a'+i)), + EventType: BlobDetection, + FeedbackType: TruePositive, + Timestamp: time.Now(), + Applied: false, + } + store.RecordFeedback(feedback) + } + + // Get unprocessed feedback + feedbacks, err := store.GetUnprocessedFeedback() + if err != nil { + t.Fatalf("GetUnprocessedFeedback failed: %v", err) + } + + if len(feedbacks) != 3 { + t.Errorf("Expected 3 unprocessed feedback entries, got %d", len(feedbacks)) + } +} + +func TestMarkFeedbackProcessed(t *testing.T) { + store := setupTestFeedbackStore(t) + defer store.Close() + + // Record feedback + feedback := FeedbackRecord{ + ID: "test-1", + EventID: "event-1", + EventType: BlobDetection, + FeedbackType: FalsePositive, + Timestamp: time.Now(), + Applied: false, + } + store.RecordFeedback(feedback) + + // Verify unprocessed + feedbacks, _ := store.GetUnprocessedFeedback() + if len(feedbacks) != 1 { + t.Fatalf("Expected 1 unprocessed, got %d", len(feedbacks)) + } + + // Mark as processed + err := store.MarkFeedbackProcessed([]string{"test-1"}) + if err != nil { + t.Fatalf("MarkFeedbackProcessed failed: %v", err) + } + + // Verify no unprocessed remain + feedbacks, _ = store.GetUnprocessedFeedback() + if len(feedbacks) != 0 { + t.Errorf("Expected 0 unprocessed, got %d", len(feedbacks)) + } + + // Verify stats show processed count + stats, err := store.GetFeedbackStats() + if err != nil { + t.Fatalf("GetFeedbackStats failed: %v", err) + } + if stats["processed_count"].(int) != 1 { + t.Errorf("Expected processed_count 1, got %d", stats["processed_count"]) + } +} + +func TestFalsePositiveFrameStorage(t *testing.T) { + store := setupTestFeedbackStore(t) + defer store.Close() + + // Add false positive frame + frame := FalsePositiveFrame{ + LinkID: "link-1-2", + Timestamp: time.Now(), + DeltaRMS: 0.15, + Context: map[string]interface{}{ + "zone_id": "zone-living", + }, + } + + err := store.AddFalsePositiveFrame(frame) + if err != nil { + t.Fatalf("AddFalsePositiveFrame failed: %v", err) + } + + // Retrieve frames + frames, err := store.GetFalsePositiveFrames("link-1-2", 24*time.Hour) + if err != nil { + t.Fatalf("GetFalsePositiveFrames failed: %v", err) + } + + if len(frames) != 1 { + t.Errorf("Expected 1 frame, got %d", len(frames)) + } + + if frames[0].DeltaRMS != 0.15 { + t.Errorf("Expected DeltaRMS 0.15, got %f", frames[0].DeltaRMS) + } +} + +func TestFalseNegativeFrameStorage(t *testing.T) { + store := setupTestFeedbackStore(t) + defer store.Close() + + // Add false negative frame + frame := FalseNegativeFrame{ + LinkID: "link-2-3", + Timestamp: time.Now(), + ExpectedPositionX: 1.5, + ExpectedPositionY: 0.0, + ExpectedPositionZ: 2.0, + Context: map[string]interface{}{ + "user_reported": true, + }, + } + + err := store.AddFalseNegativeFrame(frame) + if err != nil { + t.Fatalf("AddFalseNegativeFrame failed: %v", err) + } + + // Retrieve frames + frames, err := store.GetFalseNegativeFrames("link-2-3", 24*time.Hour) + if err != nil { + t.Fatalf("GetFalseNegativeFrames failed: %v", err) + } + + if len(frames) != 1 { + t.Errorf("Expected 1 frame, got %d", len(frames)) + } + + if frames[0].ExpectedPositionX != 1.5 { + t.Errorf("Expected X position 1.5, got %f", frames[0].ExpectedPositionX) + } +} + +func TestSaveAccuracyRecord(t *testing.T) { + store := setupTestFeedbackStore(t) + defer store.Close() + + record := AccuracyRecord{ + Week: GetWeekString(time.Now()), + ScopeType: ScopeTypeSystem, + ScopeID: ScopeIDSystem, + Precision: 0.8, + Recall: 0.888, + F1: 0.841, + TPCount: 8, + FPCount: 2, + FNCount: 1, + ComputedAt: time.Now(), + } + + err := store.SaveAccuracyRecord(record) + if err != nil { + t.Fatalf("SaveAccuracyRecord failed: %v", err) + } + + // Retrieve history + records, err := store.GetAccuracyHistory(ScopeTypeSystem, ScopeIDSystem, 1) + if err != nil { + t.Fatalf("GetAccuracyHistory failed: %v", err) + } + + if len(records) != 1 { + t.Errorf("Expected 1 record, got %d", len(records)) + } + + // Verify values + if records[0].Precision != 0.8 { + t.Errorf("Expected precision 0.8, got %f", records[0].Precision) + } + if records[0].TPCount != 8 { + t.Errorf("Expected TP count 8, got %d", records[0].TPCount) + } +} + +func TestAccuracyMetrics(t *testing.T) { + // Test precision/recall/F1 calculation + // precision = TP / (TP + FP) + // recall = TP / (TP + FN) + // F1 = 2 * precision * recall / (precision + recall) + + tp := 8 + fp := 2 + fn := 1 + + precision := float64(tp) / float64(tp+fp) // 0.8 + recall := float64(tp) / float64(tp+fn) // 0.888... + f1 := 2 * precision * recall / (precision + recall) + + if precision != 0.8 { + t.Errorf("Expected precision 0.8, got %f", precision) + } + + expectedRecall := 8.0 / 9.0 + if recall < expectedRecall-0.001 || recall > expectedRecall+0.001 { + t.Errorf("Expected recall ~0.888, got %f", recall) + } + + // F1 should be around 0.842 + if f1 < 0.84 || f1 > 0.85 { + t.Errorf("Expected F1 ~0.842, got %f", f1) + } +} + +func TestGetFeedbackStats(t *testing.T) { + store := setupTestFeedbackStore(t) + defer store.Close() + + // Record various feedback types + types := []FeedbackType{TruePositive, FalsePositive, FalseNegative, TruePositive} + for i, ft := range types { + store.RecordFeedback(FeedbackRecord{ + ID: "test-" + string(rune('a'+i)), + EventType: BlobDetection, + FeedbackType: ft, + Timestamp: time.Now(), + Applied: false, + }) + } + + stats, err := store.GetFeedbackStats() + if err != nil { + t.Fatalf("GetFeedbackStats failed: %v", err) + } + + if stats["total_count"].(int) != 4 { + t.Errorf("Expected total_count 4, got %d", stats["total_count"]) + } + + byType := stats["by_type"].(map[string]int) + if byType[string(TruePositive)] != 2 { + t.Errorf("Expected 2 TRUE_POSITIVE, got %d", byType[string(TruePositive)]) + } + if byType[string(FalsePositive)] != 1 { + t.Errorf("Expected 1 FALSE_POSITIVE, got %d", byType[string(FalsePositive)]) + } +} + +func TestGetFeedbackByEvent(t *testing.T) { + store := setupTestFeedbackStore(t) + defer store.Close() + + // Record feedback for specific event + store.RecordFeedback(FeedbackRecord{ + ID: "test-1", + EventID: "event-123", + EventType: BlobDetection, + FeedbackType: TruePositive, + Timestamp: time.Now(), + }) + store.RecordFeedback(FeedbackRecord{ + ID: "test-2", + EventID: "event-456", + EventType: FallAlert, + FeedbackType: FalsePositive, + Timestamp: time.Now(), + }) + + // Get feedback for event-123 + feedbacks, err := store.GetFeedbackByEvent("event-123") + if err != nil { + t.Fatalf("GetFeedbackByEvent failed: %v", err) + } + + if len(feedbacks) != 1 { + t.Errorf("Expected 1 feedback for event-123, got %d", len(feedbacks)) + } + + if feedbacks[0].FeedbackType != TruePositive { + t.Errorf("Expected TRUE_POSITIVE, got %s", feedbacks[0].FeedbackType) + } +} + +func TestGetWeekString(t *testing.T) { + // Test that GetWeekString produces ISO week format + testTime := time.Date(2026, 3, 29, 12, 0, 0, 0, time.UTC) + weekStr := GetWeekString(testTime) + + // Should be in format "2026-W14" (March 29, 2026 is in week 14) + if len(weekStr) != 8 { + t.Errorf("Expected week string length 8, got %d", len(weekStr)) + } + + // Should start with year + if weekStr[:5] != "2026-" { + t.Errorf("Expected week string to start with '2026-', got %s", weekStr[:5]) + } +} + +func TestFeedbackProcessor(t *testing.T) { + store := setupTestFeedbackStore(t) + defer store.Close() + + // Create processor + config := DefaultProcessorConfig() + processor := NewProcessor(store, config) + + // Record false positive with link details + store.RecordFeedback(FeedbackRecord{ + ID: "test-1", + EventID: "event-1", + EventType: BlobDetection, + FeedbackType: FalsePositive, + Details: map[string]interface{}{ + "link_id": "link-1-2", + "delta_rms": 0.15, + }, + Timestamp: time.Now(), + Applied: false, + }) + + // Process feedback + err := processor.ProcessNow() + if err != nil { + t.Fatalf("ProcessNow failed: %v", err) + } + + // Verify feedback was marked as processed + feedbacks, _ := store.GetUnprocessedFeedback() + if len(feedbacks) != 0 { + t.Errorf("Expected 0 unprocessed, got %d", len(feedbacks)) + } + + // Verify false positive frame was stored + frames, err := store.GetFalsePositiveFrames("link-1-2", 24*time.Hour) + if err != nil { + t.Fatalf("GetFalsePositiveFrames failed: %v", err) + } + if len(frames) != 1 { + t.Errorf("Expected 1 false positive frame, got %d", len(frames)) + } +} + +// Helper function to set up a test feedback store +func setupTestFeedbackStore(t *testing.T) *FeedbackStore { + tmpDir, err := os.MkdirTemp("", "learning-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + + dbPath := filepath.Join(tmpDir, "learning.db") + store, err := NewFeedbackStore(dbPath) + if err != nil { + os.RemoveAll(tmpDir) + t.Fatalf("Failed to create feedback store: %v", err) + } + + t.Cleanup(func() { + os.RemoveAll(tmpDir) + }) + + return store +} diff --git a/mothership/internal/learning/handler.go b/mothership/internal/learning/handler.go new file mode 100644 index 0000000..b6a5bb1 --- /dev/null +++ b/mothership/internal/learning/handler.go @@ -0,0 +1,311 @@ +// Package learning provides REST API handlers for feedback and accuracy +package learning + +import ( + "encoding/json" + "net/http" + "strconv" + "time" + + "github.com/go-chi/chi" + "github.com/google/uuid" +) + +// Handler provides REST API handlers for the learning package +type Handler struct { + store *FeedbackStore + processor *Processor + accuracyComp *AccuracyComputer +} + +// NewHandler creates a new learning handler +func NewHandler(store *FeedbackStore, processor *Processor, accuracyComp *AccuracyComputer) *Handler { + return &Handler{ + store: store, + processor: processor, + accuracyComp: accuracyComp, + } +} + +// RegisterRoutes registers learning API routes on the given router +func (h *Handler) RegisterRoutes(r chi.Router) { + // Feedback submission and retrieval + r.Post("/api/learning/feedback", h.handleSubmitFeedback) + r.Get("/api/learning/feedback", h.handleGetFeedback) + r.Get("/api/learning/feedback/{eventID}", h.handleGetFeedbackByEvent) + r.Get("/api/learning/stats", h.handleGetStats) + + // Accuracy metrics + r.Get("/api/learning/accuracy", h.handleGetAccuracy) + r.Get("/api/learning/accuracy/history", h.handleGetAccuracyHistory) + r.Get("/api/learning/accuracy/improvement", h.handleGetImprovement) + + // Manual processing trigger (for testing/admin) + r.Post("/api/learning/process", h.handleTriggerProcess) +} + +// Feedback submission request +type submitFeedbackRequest struct { + EventID string `json:"event_id"` + EventType string `json:"event_type"` + FeedbackType string `json:"feedback_type"` + Details map[string]interface{} `json:"details"` +} + +// handleSubmitFeedback handles POST /api/learning/feedback +func (h *Handler) handleSubmitFeedback(w http.ResponseWriter, r *http.Request) { + var req submitFeedbackRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "invalid request body", http.StatusBadRequest) + return + } + + // Validate feedback type + validTypes := map[string]bool{ + string(TruePositive): true, + string(FalsePositive): true, + string(FalseNegative): true, + string(WrongIdentity): true, + string(WrongZone): true, + } + if !validTypes[req.FeedbackType] { + http.Error(w, "invalid feedback_type", http.StatusBadRequest) + return + } + + // Validate event type + validEventTypes := map[string]bool{ + string(BlobDetection): true, + string(ZoneTransition): true, + string(FallAlert): true, + string(Anomaly): true, + } + if !validEventTypes[req.EventType] { + http.Error(w, "invalid event_type", http.StatusBadRequest) + return + } + + // Create feedback record + feedback := FeedbackRecord{ + ID: uuid.New().String(), + EventID: req.EventID, + EventType: EventType(req.EventType), + FeedbackType: FeedbackType(req.FeedbackType), + Details: req.Details, + Timestamp: time.Now(), + Applied: false, + } + + if feedback.Details == nil { + feedback.Details = make(map[string]interface{}) + } + + // Store feedback + if err := h.store.RecordFeedback(feedback); err != nil { + http.Error(w, "failed to record feedback", http.StatusInternalServerError) + return + } + + writeJSON(w, map[string]interface{}{ + "id": feedback.ID, + "success": true, + }) +} + +// handleGetFeedback handles GET /api/learning/feedback +func (h *Handler) handleGetFeedback(w http.ResponseWriter, r *http.Request) { + // Get pagination params + limitStr := r.URL.Query().Get("limit") + limit := 100 + if limitStr != "" { + if n, err := strconv.Atoi(limitStr); err == nil && n > 0 && n <= 1000 { + limit = n + } + } + + // Get unprocessed param + unprocessedOnly := r.URL.Query().Get("unprocessed") == "true" + + var feedbacks []FeedbackRecord + var err error + + if unprocessedOnly { + feedbacks, err = h.store.GetUnprocessedFeedback() + if limit > 0 && len(feedbacks) > limit { + feedbacks = feedbacks[:limit] + } + } else { + // Would need to implement a general query method + // For now, return stats instead + stats, err := h.store.GetFeedbackStats() + if err != nil { + http.Error(w, "failed to get feedback", http.StatusInternalServerError) + return + } + writeJSON(w, stats) + return + } + + if err != nil { + http.Error(w, "failed to get feedback", http.StatusInternalServerError) + return + } + + writeJSON(w, feedbacks) +} + +// handleGetFeedbackByEvent handles GET /api/learning/feedback/{eventID} +func (h *Handler) handleGetFeedbackByEvent(w http.ResponseWriter, r *http.Request) { + eventID := chi.URLParam(r, "eventID") + if eventID == "" { + http.Error(w, "event_id required", http.StatusBadRequest) + return + } + + feedbacks, err := h.store.GetFeedbackByEvent(eventID) + if err != nil { + http.Error(w, "failed to get feedback", http.StatusInternalServerError) + return + } + + writeJSON(w, feedbacks) +} + +// handleGetStats handles GET /api/learning/stats +func (h *Handler) handleGetStats(w http.ResponseWriter, r *http.Request) { + stats, err := h.store.GetFeedbackStats() + if err != nil { + http.Error(w, "failed to get stats", http.StatusInternalServerError) + return + } + + // Get total feedback count + count, err := h.store.GetFeedbackCount() + if err != nil { + http.Error(w, "failed to get feedback count", http.StatusInternalServerError) + return + } + stats["total_count"] = count + + writeJSON(w, stats) +} + +// handleGetAccuracy handles GET /api/learning/accuracy +func (h *Handler) handleGetAccuracy(w http.ResponseWriter, r *http.Request) { + scopeType := r.URL.Query().Get("scope_type") + scopeID := r.URL.Query().Get("scope_id") + + // Default to system-wide accuracy + if scopeType == "" { + scopeType = ScopeTypeSystem + } + if scopeID == "" { + scopeID = ScopeIDSystem + } + + // Get current week's accuracy + if h.accuracyComp != nil { + record, err := h.accuracyComp.GetCurrentAccuracy(scopeType, scopeID) + if err != nil { + http.Error(w, "failed to get accuracy", http.StatusInternalServerError) + return + } + if record != nil { + writeJSON(w, record) + return + } + } + + // Fallback: return from store + records, err := h.store.GetAccuracyHistory(scopeType, scopeID, 1) + if err != nil { + http.Error(w, "failed to get accuracy", http.StatusInternalServerError) + return + } + + if len(records) == 0 { + writeJSON(w, map[string]interface{}{ + "scope_type": scopeType, + "scope_id": scopeID, + "f1": nil, + "precision": nil, + "recall": nil, + "message": "no accuracy data available yet", + }) + return + } + + writeJSON(w, records[0]) +} + +// handleGetAccuracyHistory handles GET /api/learning/accuracy/history +func (h *Handler) handleGetAccuracyHistory(w http.ResponseWriter, r *http.Request) { + scopeType := r.URL.Query().Get("scope_type") + scopeID := r.URL.Query().Get("scope_id") + weeksStr := r.URL.Query().Get("weeks") + + weeks := 8 + if weeksStr != "" { + if n, err := strconv.Atoi(weeksStr); err == nil && n > 0 && n <= 52 { + weeks = n + } + } + + // Default to system-wide + if scopeType == "" { + scopeType = ScopeTypeSystem + } + if scopeID == "" { + scopeID = ScopeIDSystem + } + + records, err := h.store.GetAccuracyHistory(scopeType, scopeID, weeks) + if err != nil { + http.Error(w, "failed to get accuracy history", http.StatusInternalServerError) + return + } + + writeJSON(w, records) +} + +// handleGetImprovement handles GET /api/learning/accuracy/improvement +func (h *Handler) handleGetImprovement(w http.ResponseWriter, r *http.Request) { + if h.accuracyComp == nil { + http.Error(w, "accuracy computer not available", http.StatusServiceUnavailable) + return + } + + stats, err := h.accuracyComp.GetImprovementStats() + if err != nil { + http.Error(w, "failed to get improvement stats", http.StatusInternalServerError) + return + } + + writeJSON(w, stats) +} + +// handleTriggerProcess handles POST /api/learning/process +func (h *Handler) handleTriggerProcess(w http.ResponseWriter, r *http.Request) { + if h.processor == nil { + http.Error(w, "processor not available", http.StatusServiceUnavailable) + return + } + + // Trigger immediate processing + if err := h.processor.ProcessNow(); err != nil { + http.Error(w, "processing failed", http.StatusInternalServerError) + return + } + + writeJSON(w, map[string]string{ + "status": "processed", + }) +} + +// writeJSON writes a JSON response +func writeJSON(w http.ResponseWriter, v interface{}) { + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(v); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } +}