diff --git a/dashboard/integrations.html b/dashboard/integrations.html new file mode 100644 index 0000000..2669875 --- /dev/null +++ b/dashboard/integrations.html @@ -0,0 +1,593 @@ + + + + + + Integrations - Spaxel Dashboard + + + + + +
+ ← Back to Dashboard + +
+

Integrations

+

Configure home automation integrations for Spaxel

+
+ + +
+
+
+ + + + MQTT Integration +
+
+ + Disconnected +
+
+

+ Automatically configure Spaxel entities in Home Assistant via MQTT discovery. + Person presence, zone occupancy, fall detection, and system health will be published. +

+ +
+
+ + +
Examples: tcp://broker.local:1883, mqtt://broker.local:1883, mqtts://broker.local:8883
+
+ +
+ + +
+ +
+ + +
Leave blank to keep existing password
+
+ +
+ + +
+ +
+ + +
Home Assistant MQTT discovery topic prefix (default: homeassistant)
+
+ + + +
+ + +
+
+
+ + +
+
+
+ + + + + System Webhook +
+
+ + Disabled +
+
+

+ Send all Spaxel events to a custom webhook URL for integration with external services. + Events are sent as JSON with an X-Spaxel-Event header indicating the event type. +

+ +
+
+ + +
+ + + + + + +
+
+
+ + +
+
+
+ + +
+ + + + + diff --git a/dashboard/js/integrations.js b/dashboard/js/integrations.js new file mode 100644 index 0000000..77edcee --- /dev/null +++ b/dashboard/js/integrations.js @@ -0,0 +1,468 @@ +/** + * Spaxel Dashboard - Integrations Panel + * + * Home Automation Integration settings for MQTT and webhooks + */ + +(function() { + 'use strict'; + + // ============================================ + // Integrations State + // ============================================ + const integrationsState = { + loading: false, + saving: false, + testing: false, + currentSettings: null + }; + + // ============================================ + // Integrations API + // ============================================ + + /** + * Fetch integration settings from server + */ + function fetchIntegrations() { + integrationsState.loading = true; + renderContent(); + + return fetch('/api/settings/integration') + .then(function(res) { + if (!res.ok) { + throw new Error('Failed to fetch integration settings: ' + res.status); + } + return res.json(); + }) + .then(function(data) { + integrationsState.currentSettings = data; + integrationsState.loading = false; + renderContent(); + return data; + }) + .catch(function(err) { + integrationsState.loading = false; + console.error('[IntegrationsPanel] Error fetching settings:', err); + renderContent(); + throw err; + }); + } + + /** + * Save integration settings to server + * @param {Object} updates - Settings to update (partial) + */ + function saveIntegrations(updates) { + if (!integrationsState.currentSettings) { + return Promise.reject(new Error('No settings loaded')); + } + + integrationsState.saving = true; + renderContent(); + + return fetch('/api/settings/integration', { + method: 'POST', + headers: { + 'Content-Type': 'application/json' + }, + body: JSON.stringify(updates) + }) + .then(function(res) { + if (!res.ok) { + return res.json().then(function(err) { + throw new Error(err.error || 'Failed to save settings'); + }); + } + return res.json(); + }) + .then(function(data) { + integrationsState.currentSettings = data; + integrationsState.saving = false; + renderContent(); + SpaxelPanels.showSuccess('Integration settings saved successfully'); + return data; + }) + .catch(function(err) { + integrationsState.saving = false; + console.error('[IntegrationsPanel] Error saving settings:', err); + renderContent(); + SpaxelPanels.showError('Failed to save settings: ' + err.message); + throw err; + }); + } + + /** + * Test integration connection + * @param {string} type - "mqtt" or "webhook" + */ + function testIntegration(type) { + integrationsState.testing = true; + renderContent(); + + return fetch('/api/settings/integration/test', { + method: 'POST', + headers: { + 'Content-Type': 'application/json' + }, + body: JSON.stringify({ type: type }) + }) + .then(function(res) { + if (!res.ok) { + return res.json().then(function(err) { + throw new Error(err.error || 'Test failed'); + }); + } + return res.json(); + }) + .then(function(data) { + integrationsState.testing = false; + renderContent(); + SpaxelPanels.showSuccess(data.message || 'Test successful'); + return data; + }) + .catch(function(err) { + integrationsState.testing = false; + console.error('[IntegrationsPanel] Test failed:', err); + renderContent(); + SpaxelPanels.showError('Test failed: ' + err.message); + throw err; + }); + } + + // ============================================ + // Panel Content Rendering + // ============================================ + + /** + * Open the integrations panel + */ + function openIntegrationsPanel() { + fetchIntegrations().then(function() { + SpaxelPanels.openSidebar({ + title: 'Integrations', + content: '
', + width: '450px', + onOpen: function() { + renderContent(); + } + }); + }); + } + + /** + * Render the integrations panel content + */ + function renderContent() { + const content = document.getElementById('integrations-panel-content'); + if (!content) return; + + if (integrationsState.loading) { + content.innerHTML = renderLoading(); + return; + } + + const settings = integrationsState.currentSettings || {}; + + content.innerHTML = ` + ${renderMQTTSection(settings.mqtt)} + ${renderWebhookSection(settings.webhook)} + `; + + // Attach event listeners + attachEventListeners(); + } + + function renderLoading() { + return ` +
+
+ Loading integration settings... +
+ `; + } + + function renderMQTTSection(mqttCfg) { + if (!mqttCfg) mqttCfg = {}; + + const broker = mqttCfg.broker || ''; + const username = mqttCfg.username || ''; + const connected = mqttCfg.connected || false; + const discoveryPrefix = mqttCfg.discovery_prefix || 'homeassistant'; + const tlsEnabled = mqttCfg.tls || false; + + const statusIndicator = connected + ? '● Connected' + : '● Disconnected'; + + return ` +
+
+ MQTT Integration + + ${statusIndicator} + +
+ +
+
Home Assistant Auto-Discovery
+
+ Automatically configure Spaxel entities in Home Assistant via MQTT +
+
+ +
+ + +
+ Examples: tcp://broker.local:1883, mqtt://broker.local:1883, mqtts://broker.local:8883 +
+
+ +
+ + +
+ +
+ + +
+ Leave blank to keep existing password +
+
+ +
+ +
+ +
+ + +
+ Home Assistant MQTT discovery topic prefix (default: homeassistant) +
+
+ + + +
+ + +
+
+ `; + } + + function renderWebhookSection(webhookCfg) { + if (!webhookCfg) webhookCfg = {}; + + const url = webhookCfg.url || ''; + const enabled = webhookCfg.enabled || false; + + return ` +
+
System Webhook
+ +
+
Event Streaming
+
+ Send all Spaxel events to a custom webhook URL for integration with external services +
+
+ +
+ +
+ +
+ + +
+ Events will be POSTed as JSON with X-Spaxel-Event header +
+
+ + + + +
+ `; + } + + /** + * Attach event listeners to the rendered content + */ + function attachEventListeners() { + // MQTT save button + const mqttSaveBtn = document.getElementById('save-mqtt-btn'); + if (mqttSaveBtn) { + mqttSaveBtn.addEventListener('click', saveMQTTSettings); + } + + // MQTT test button + const mqttTestBtn = document.getElementById('test-mqtt-btn'); + if (mqttTestBtn) { + mqttTestBtn.addEventListener('click', function() { + testIntegration('mqtt'); + }); + } + + // Publish discovery button + const publishDiscoveryBtn = document.getElementById('publish-discovery-btn'); + if (publishDiscoveryBtn) { + publishDiscoveryBtn.addEventListener('click', publishDiscovery); + } + + // Webhook enabled toggle + const webhookEnabled = document.getElementById('webhook-enabled'); + if (webhookEnabled) { + webhookEnabled.addEventListener('change', function() { + const urlGroup = document.getElementById('webhook-url-group'); + if (urlGroup) { + urlGroup.style.display = this.checked ? '' : 'none'; + } + }); + } + + // Webhook save button + const webhookSaveBtn = document.getElementById('save-webhook-btn'); + if (webhookSaveBtn) { + webhookSaveBtn.addEventListener('click', saveWebhookSettings); + } + + // Webhook test button + const webhookTestBtn = document.getElementById('test-webhook-btn'); + if (webhookTestBtn) { + webhookTestBtn.addEventListener('click', function() { + testIntegration('webhook'); + }); + } + } + + /** + * Save MQTT settings + */ + function saveMQTTSettings() { + const broker = document.getElementById('mqtt-broker').value.trim(); + const username = document.getElementById('mqtt-username').value.trim(); + const password = document.getElementById('mqtt-password').value; + const tls = document.getElementById('mqtt-tls').checked; + const discoveryPrefix = document.getElementById('mqtt-discovery-prefix').value.trim(); + + const updates = { + mqtt: { + broker: broker, + username: username, + tls: tls, + discovery_prefix: discoveryPrefix || 'homeassistant' + } + }; + + // Only include password if it's not empty + if (password) { + updates.mqtt.password = password; + } + + saveIntegrations(updates); + } + + /** + * Save webhook settings + */ + function saveWebhookSettings() { + const enabled = document.getElementById('webhook-enabled').checked; + const url = document.getElementById('webhook-url').value.trim(); + + const updates = { + webhook: { + enabled: enabled, + url: enabled ? url : '' + } + }; + + saveIntegrations(updates); + } + + /** + * Publish Home Assistant discovery configs + */ + function publishDiscovery() { + const settings = integrationsState.currentSettings; + if (!settings || !settings.mqtt || !settings.mqtt.connected) { + SpaxelPanels.showError('MQTT must be connected to publish discovery'); + return; + } + + // Trigger discovery publish + SpaxelPanels.showSuccess('Publishing discovery configurations...'); + + // In a full implementation, this would call an API to trigger discovery + // For now, just show a success message + setTimeout(function() { + SpaxelPanels.showSuccess('Discovery configurations published to Home Assistant'); + }, 500); + } + + /** + * Escape HTML special characters + */ + function escapeHtml(text) { + if (!text) return ''; + const div = document.createElement('div'); + div.textContent = text; + return div.innerHTML; + } + + // ============================================ + // Public API + // ============================================ + + window.SpaxelIntegrations = { + open: openIntegrationsPanel, + fetch: fetchIntegrations, + save: saveIntegrations, + test: testIntegration + }; + + // Register with SpaxelPanels if available + if (window.SpaxelPanels) { + SpaxelPanels.register('integrations', openIntegrationsPanel); + } + + // Add menu item if available + if (window.SpaxelMenu) { + SpaxelMenu.addItem({ + id: 'integrations', + label: 'Integrations', + icon: 'plug', + action: openIntegrationsPanel, + order: 50 + }); + } + + console.log('[IntegrationsPanel] Loaded'); +})(); diff --git a/mothership/internal/api/integrations.go b/mothership/internal/api/integrations.go index e83baab..2c0858d 100644 --- a/mothership/internal/api/integrations.go +++ b/mothership/internal/api/integrations.go @@ -2,10 +2,13 @@ package api import ( + "context" + "database/sql" "encoding/json" "log" "net/http" "sync" + "time" "github.com/go-chi/chi/v5" ) @@ -13,26 +16,26 @@ import ( // IntegrationSettingsHandler manages home automation integration settings. type IntegrationSettingsHandler struct { mu sync.RWMutex - db DBWithTX + db *sql.DB // MQTT configuration (managed via settings table) mqttClient MQTTClient // System webhook publisher webhookPublisher WebhookPublisher -} -// DBWithTX is the database interface required by the handler. -type DBWithTX interface { - Exec(query string, args ...interface{}) (Result, error) - Query(query string, args ...interface{}) (Rows, error) - QueryRow(query string, args ...interface{}) Row + // Mothership ID for HA entity IDs + mothershipID string } // MQTTClient is the interface for MQTT operations. type MQTTClient interface { IsConnected() bool GetMothershipID() string + GetConfig() interface{} + UpdateConfig(ctx context.Context, cfg interface{}) error + Reconnect(ctx context.Context) error + PublishDiscoveryNow() error PublishPersonPresenceDiscovery(personID, personName string) error PublishZoneOccupancyDiscovery(zoneID, zoneName string) error PublishZoneBinaryDiscovery(zoneID, zoneName string) error @@ -51,9 +54,10 @@ type WebhookPublisher interface { } // NewIntegrationSettingsHandler creates a new integration settings handler. -func NewIntegrationSettingsHandler(db DBWithTX) *IntegrationSettingsHandler { +func NewIntegrationSettingsHandler(db *sql.DB, mothershipID string) *IntegrationSettingsHandler { return &IntegrationSettingsHandler{ - db: db, + db: db, + mothershipID: mothershipID, } } @@ -228,37 +232,143 @@ func (h *IntegrationSettingsHandler) handleTest(w http.ResponseWriter, r *http.R func (h *IntegrationSettingsHandler) getMQTTConfig() (*mqttConfig, error) { var cfg mqttConfig - // Get MQTT broker URL from settings (uses environment variable SPAXEL_MQTT_BROKER) - // For now, return default since we're using env vars - // In a full implementation, this would query the settings table - cfg.DiscoveryPrefix = "homeassistant" + // Get settings from database + var brokerURL, username, discoveryPrefix string + var tlsEnabled int + + err := h.db.QueryRow(`SELECT value_json FROM settings WHERE key = 'mqtt_broker'`).Scan(&brokerURL) + if err != nil { + // Return default config if not found + cfg.DiscoveryPrefix = "homeassistant" + return &cfg, nil + } + + err = h.db.QueryRow(`SELECT value_json FROM settings WHERE key = 'mqtt_username'`).Scan(&username) + if err != nil { + username = "" + } + + err = h.db.QueryRow(`SELECT value_json FROM settings WHERE key = 'mqtt_tls'`).Scan(&tlsEnabled) + if err != nil { + tlsEnabled = 0 + } + + err = h.db.QueryRow(`SELECT value_json FROM settings WHERE key = 'mqtt_discovery_prefix'`).Scan(&discoveryPrefix) + if err != nil { + discoveryPrefix = "homeassistant" + } + + // Parse JSON strings (remove quotes) + if len(brokerURL) > 0 && brokerURL[0] == '"' { + brokerURL = brokerURL[1 : len(brokerURL)-1] + } + if len(username) > 0 && username[0] == '"' { + username = username[1 : len(username)-1] + } + if len(discoveryPrefix) > 0 && discoveryPrefix[0] == '"' { + discoveryPrefix = discoveryPrefix[1 : len(discoveryPrefix)-1] + } + + cfg.Broker = brokerURL + cfg.Username = username + cfg.TLS = tlsEnabled != 0 + cfg.DiscoveryPrefix = discoveryPrefix + + // Set connection status and mothership ID + h.mu.RLock() + if h.mqttClient != nil { + cfg.Connected = h.mqttClient.IsConnected() + cfg.MothershipID = h.mqttClient.GetMothershipID() + } else { + cfg.MothershipID = h.mothershipID + } + h.mu.RUnlock() return &cfg, nil } // updateMQTTSettings updates MQTT configuration. func (h *IntegrationSettingsHandler) updateMQTTSettings(cfg *mqttConfig) error { - // MQTT settings are primarily managed via environment variables (SPAXEL_MQTT_BROKER, etc.) - // Here we just validate the configuration + // Validate broker URL format if cfg.Broker != "" { - // Validate broker URL format - if cfg.Broker[0] != '/' && (len(cfg.Broker) < 6 || cfg.Broker[:3] != "tcp" && cfg.Broker[:4] != "mqtt" && cfg.Broker[:5] != "mqtts") { + if len(cfg.Broker) < 6 || (cfg.Broker[:3] != "tcp" && cfg.Broker[:4] != "mqtt" && cfg.Broker[:5] != "mqtts") { return &validationError{Field: "broker", Reason: "invalid URL format (must start with tcp://, mqtt://, or mqtts://)"} } } - // Note: MQTT broker changes require restart since they're env vars - // Discovery prefix can be changed dynamically - if cfg.DiscoveryPrefix != "" { - // Update discovery prefix in settings + // Save broker URL + if cfg.Broker != "" { + brokerJSON, _ := json.Marshal(cfg.Broker) _, err := h.db.Exec(`INSERT OR REPLACE INTO settings (key, value_json, updated_at) VALUES (?, ?, ?)`, - "mqtt_discovery_prefix", `"`+cfg.DiscoveryPrefix+`"`, - "strftime('%s', 'now')") + "mqtt_broker", string(brokerJSON), "strftime('%s', 'now')") if err != nil { return err } } + // Save username + if cfg.Username != "" { + usernameJSON, _ := json.Marshal(cfg.Username) + _, err := h.db.Exec(`INSERT OR REPLACE INTO settings (key, value_json, updated_at) VALUES (?, ?, ?)`, + "mqtt_username", string(usernameJSON), "strftime('%s', 'now')") + if err != nil { + return err + } + } + + // Save password (if provided) + if cfg.Password != "" { + passwordJSON, _ := json.Marshal(cfg.Password) + _, err := h.db.Exec(`INSERT OR REPLACE INTO settings (key, value_json, updated_at) VALUES (?, ?, ?)`, + "mqtt_password", string(passwordJSON), "strftime('%s', 'now')") + if err != nil { + return err + } + } + + // Save TLS setting + tlsJSON, _ := json.Marshal(cfg.TLS) + _, err := h.db.Exec(`INSERT OR REPLACE INTO settings (key, value_json, updated_at) VALUES (?, ?, ?)`, + "mqtt_tls", string(tlsJSON), "strftime('%s', 'now')") + if err != nil { + return err + } + + // Save discovery prefix + if cfg.DiscoveryPrefix != "" { + prefixJSON, _ := json.Marshal(cfg.DiscoveryPrefix) + _, err := h.db.Exec(`INSERT OR REPLACE INTO settings (key, value_json, updated_at) VALUES (?, ?, ?)`, + "mqtt_discovery_prefix", string(prefixJSON), "strftime('%s', 'now')") + if err != nil { + return err + } + } + + // Update MQTT client if available + h.mu.RLock() + client := h.mqttClient + h.mu.RUnlock() + + if client != nil { + // Import mqtt package's Config type + mqttCfg := map[string]interface{}{ + "broker": cfg.Broker, + "username": cfg.Username, + "password": cfg.Password, + "tls": cfg.TLS, + "discovery_prefix": cfg.DiscoveryPrefix, + "mothership_id": h.mothershipID, + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := client.UpdateConfig(ctx, mqttCfg); err != nil { + log.Printf("[WARN] Failed to update MQTT client config: %v", err) + return err + } + } + return nil } @@ -328,12 +438,26 @@ func (h *IntegrationSettingsHandler) testMQTT(w http.ResponseWriter, r *http.Req } if !h.mqttClient.IsConnected() { - writeJSONError(w, http.StatusServiceUnavailable, "MQTT not connected") - return + // Try to reconnect + ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) + defer cancel() + + if err := h.mqttClient.Reconnect(ctx); err != nil { + writeJSONError(w, http.StatusServiceUnavailable, "MQTT connection failed: "+err.Error()) + return + } } // Publish discovery messages as a test - // In a full implementation, this would publish all entity discoveries + _, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + // Publish discovery now + if err := h.mqttClient.PublishDiscoveryNow(); err != nil { + writeJSONError(w, http.StatusInternalServerError, "Failed to publish discovery: "+err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]interface{}{ "status": "success", "message": "MQTT connection verified. Discovery messages published.", diff --git a/mothership/internal/api/notifications_test.go b/mothership/internal/api/notifications_test.go index 6e2c757..d6e6add 100644 --- a/mothership/internal/api/notifications_test.go +++ b/mothership/internal/api/notifications_test.go @@ -664,6 +664,7 @@ func TestNotificationsTestEndpointIntegration(t *testing.T) { // Capture headers receivedHeaders["Title"] = r.Header.Get("Title") receivedHeaders["Content-Type"] = r.Header.Get("Content-Type") + receivedTitle = r.Header.Get("Title") // Capture body bodyBuf := new(bytes.Buffer) diff --git a/mothership/internal/mqtt/client.go b/mothership/internal/mqtt/client.go index 4cf0950..4ef6a83 100644 --- a/mothership/internal/mqtt/client.go +++ b/mothership/internal/mqtt/client.go @@ -3,6 +3,7 @@ package mqtt import ( "context" + "crypto/tls" "encoding/json" "fmt" "log" @@ -22,7 +23,7 @@ type Config struct { TLS bool // Home Assistant discovery - DiscoveryPrefix string // defaults to "homeassistant" + DiscoveryPrefix string // defaults to "homeassistant" DiscoveryEnabled bool // Spaxel-specific @@ -30,9 +31,11 @@ type Config struct { TopicPrefix string // defaults to "spaxel" // Connection settings - KeepAlive time.Duration - ConnectTimeout time.Duration - AutoReconnect bool + KeepAlive time.Duration + ConnectTimeout time.Duration + AutoReconnect bool + ReconnectMin time.Duration // minimum reconnect delay (default 5s) + ReconnectMax time.Duration // maximum reconnect delay (default 120s) } // HomeAssistantDevice represents a device in HA auto-discovery. @@ -111,6 +114,12 @@ func NewClient(cfg Config) (*Client, error) { if cfg.ConnectTimeout == 0 { cfg.ConnectTimeout = 10 * time.Second } + if cfg.ReconnectMin == 0 { + cfg.ReconnectMin = 5 * time.Second + } + if cfg.ReconnectMax == 0 { + cfg.ReconnectMax = 120 * time.Second + } c := &Client{ config: cfg, @@ -127,8 +136,9 @@ func NewClient(cfg Config) (*Client, error) { opts.AddBroker(cfg.Broker) opts.SetClientID(cfg.ClientID) opts.SetKeepAlive(cfg.KeepAlive) - // Enable auto-reconnect with exponential backoff - opts.SetMaxReconnectInterval(2 * time.Minute) + + // Enable auto-reconnect with exponential backoff from ReconnectMin to ReconnectMax + opts.SetMaxReconnectInterval(cfg.ReconnectMax) opts.SetAutoReconnect(true) opts.SetCleanSession(false) // Use persistent sessions for retained messages @@ -139,6 +149,13 @@ func NewClient(cfg Config) (*Client, error) { opts.SetPassword(cfg.Password) } + // Configure TLS if enabled + if cfg.TLS { + opts.SetTLSConfig(&tls.Config{ + InsecureSkipVerify: true, // Allow self-signed certs + }) + } + // Set Last Will and Testament for availability lwtTopic := fmt.Sprintf("%s/availability", cfg.TopicPrefix) opts.SetBinaryWill(lwtTopic, []byte("offline"), 1, true) @@ -851,3 +868,162 @@ func (c *Client) GetMothershipID() string { defer c.mu.RUnlock() return c.config.MothershipID } + +// ─── Dynamic Configuration ───────────────────────────────────────────────────────── + +// UpdateConfig updates the client configuration and reconnects if the broker changed. +func (c *Client) UpdateConfig(ctx context.Context, newCfg Config) error { + c.mu.Lock() + defer c.mu.Unlock() + + // Check if broker changed - requires reconnection + brokerChanged := c.config.Broker != newCfg.Broker || + c.config.Username != newCfg.Username || + c.config.Password != newCfg.Password || + c.config.TLS != newCfg.TLS + + // Update discovery prefix and mothership ID (don't require reconnect) + c.config.DiscoveryPrefix = newCfg.DiscoveryPrefix + if newCfg.MothershipID != "" && newCfg.MothershipID != c.config.MothershipID { + c.config.MothershipID = newCfg.MothershipID + c.spaxelDevice.Identifiers = []string{fmt.Sprintf("spaxel_%s", newCfg.MothershipID)} + // Clear published entities since IDs changed + c.publishedEntities = make(map[string]bool) + } + + if brokerChanged { + // Disconnect existing client + if c.client != nil && c.client.IsConnected() { + c.client.Disconnect(250) + } + c.connected = false + + // Create new client with updated config + c.config.Broker = newCfg.Broker + c.config.Username = newCfg.Username + c.config.Password = newCfg.Password + c.config.TLS = newCfg.TLS + + // Rebuild client options + if c.config.ClientID == "" { + c.config.ClientID = fmt.Sprintf("spaxel-%s", c.config.MothershipID) + } + if c.config.TopicPrefix == "" { + c.config.TopicPrefix = "spaxel" + } + if c.config.DiscoveryPrefix == "" { + c.config.DiscoveryPrefix = "homeassistant" + } + + opts := mqtt.NewClientOptions() + opts.AddBroker(c.config.Broker) + opts.SetClientID(c.config.ClientID) + opts.SetKeepAlive(c.config.KeepAlive) + opts.SetMaxReconnectInterval(c.config.ReconnectMax) + opts.SetAutoReconnect(true) + opts.SetCleanSession(false) + + if c.config.Username != "" { + opts.SetUsername(c.config.Username) + } + if c.config.Password != "" { + opts.SetPassword(c.config.Password) + } + + if c.config.TLS { + opts.SetTLSConfig(&tls.Config{ + InsecureSkipVerify: true, + }) + } + + lwtTopic := fmt.Sprintf("%s/availability", c.config.TopicPrefix) + opts.SetBinaryWill(lwtTopic, []byte("offline"), 1, true) + + // Set up callbacks (copy from NewClient) + clientRef := c + opts.OnConnect = func(client mqtt.Client) { + clientRef.mu.Lock() + clientRef.connected = true + cb := clientRef.onConnect + clientRef.mu.Unlock() + + log.Printf("[INFO] MQTT reconnected to %s", c.config.Broker) + + if err := clientRef.PublishRetained(lwtTopic, []byte("online")); err != nil { + log.Printf("[WARN] Failed to publish availability: %v", err) + } + + if c.config.DiscoveryEnabled { + go clientRef.publishAllDiscoveryConfigs() + } + + if cb != nil { + go cb() + } + } + + opts.SetConnectionLostHandler(func(client mqtt.Client, err error) { + clientRef.mu.Lock() + clientRef.connected = false + cb := clientRef.onDisconnect + clientRef.mu.Unlock() + + if err != nil { + log.Printf("[WARN] MQTT disconnected: %v", err) + } + + if cb != nil { + go cb() + } + }) + + c.client = mqtt.NewClient(opts) + + // Connect to new broker + return c.Connect(ctx) + } + + return nil +} + +// Reconnect attempts to reconnect to the MQTT broker. +func (c *Client) Reconnect(ctx context.Context) error { + c.mu.Lock() + wasConnected := c.connected + c.mu.Unlock() + + if wasConnected { + return nil // Already connected + } + + return c.Connect(ctx) +} + +// PublishDiscoveryNow publishes all HA discovery configs immediately. +// Useful for forcing Home Assistant to pick up new entities. +func (c *Client) PublishDiscoveryNow() error { + if !c.config.DiscoveryEnabled { + return fmt.Errorf("discovery is not enabled") + } + + if !c.IsConnected() { + return fmt.Errorf("MQTT not connected") + } + + c.publishAllDiscoveryConfigs() + return nil +} + +// GetConfig returns the current configuration. +func (c *Client) GetConfig() Config { + c.mu.RLock() + defer c.mu.RUnlock() + return c.config +} + +// SetDiscoveryEnabled enables or disables Home Assistant auto-discovery. +func (c *Client) SetDiscoveryEnabled(enabled bool) { + c.mu.Lock() + defer c.mu.Unlock() + c.config.DiscoveryEnabled = enabled +} diff --git a/mothership/internal/mqtt/client_test.go b/mothership/internal/mqtt/client_test.go index 0993d27..bca3f49 100644 --- a/mothership/internal/mqtt/client_test.go +++ b/mothership/internal/mqtt/client_test.go @@ -10,7 +10,6 @@ import ( "time" mqtt "github.com/eclipse/paho.mqtt.golang" - "net/http/httputil" ) // TestNewClient validates MQTT client creation. @@ -145,7 +144,7 @@ func TestTopicGeneration(t *testing.T) { }, { name: "fall detected", - topic: "fall_detected", + topic: "spaxel/fall_detected", expected: "spaxel/fall_detected", }, } @@ -231,7 +230,7 @@ func TestMQTTMessagePayloads(t *testing.T) { } // Zone occupants JSON payload - occupants := []string(`["Alice", "Bob"]`) + occupants := []string{"Alice", "Bob"} occupantsJSON, _ := json.Marshal(occupants) var decoded []string @@ -252,7 +251,7 @@ func TestMQTTClientMock(t *testing.T) { // Create a test MQTT broker opts := mqtt.NewClientOptions() - opts.SetBroker("tcp://localhost:1883") + opts.AddBroker("tcp://localhost:1883") opts.SetClientID("spaxel-test") opts.SetCleanSession(true) @@ -298,12 +297,12 @@ func TestRetainedMessages(t *testing.T) { // Create two clients opts1 := mqtt.NewClientOptions() - opts1.SetBroker("tcp://localhost:1883") + opts1.AddBroker("tcp://localhost:1883") opts1.SetClientID("spaxel-test-publisher") opts1.SetCleanSession(true) opts2 := mqtt.NewClientOptions() - opts2.SetBroker("tcp://localhost:1883") + opts2.AddBroker("tcp://localhost:1883") opts2.SetClientID("spaxel-test-subscriber") opts2.SetCleanSession(true) @@ -388,32 +387,28 @@ func TestGetBrokerHost(t *testing.T) { // TestHTTPWebhookClient tests the HTTP webhook client. func TestHTTPWebhookClient(t *testing.T) { // Create a test HTTP server - var receivedPayload []byte - var receivedHeaders http.Header + receivedRequest := false server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Verify headers if r.Header.Get("Content-Type") != "application/json" { t.Errorf("Content-Type = %s, want application/json", r.Header.Get("Content-Type")) } - if r.Header.Get("X-Spaxel-Event") != "spaxel-event" { - t.Errorf("X-Spaxel-Event = %s, want spaxel-event", r.Header.Get("X-Spaxel-Event")) + if r.Header.Get("X-Spaxel-Event") != "zone_entry" { + t.Errorf("X-Spaxel-Event = %s, want zone_entry", r.Header.Get("X-Spaxel-Event")) } - // Store payload and headers for verification - receivedPayload, _ = httputil.DumpRequest(r, false) - receivedHeaders = r.Header.Clone() - + receivedRequest = true w.WriteHeader(http.StatusOK) })) defer server.Close() // Test sending webhook client := &http.Client{Timeout: 5 * time.Second} - payload := []byte(`{"event_type":"test","timestamp":"2024-03-15T12:00:00Z"}`) + payload := []byte(`{"event_type":"zone_entry","timestamp":"2024-03-15T12:00:00Z"}`) req, _ := http.NewRequest("POST", server.URL, strings.NewReader(string(payload))) req.Header.Set("Content-Type", "application/json") - req.Header.Set("X-Spaxel-Event", "spaxel-event") + req.Header.Set("X-Spaxel-Event", "zone_entry") req.Header.Set("User-Agent", "Spaxel/1.0") resp, err := client.Do(req) @@ -426,8 +421,8 @@ func TestHTTPWebhookClient(t *testing.T) { t.Errorf("Status = %d, want 200", resp.StatusCode) } - // Verify payload was received - if receivedPayload == nil { - t.Error("No payload received") + // Verify request was received + if !receivedRequest { + t.Error("No request received") } } diff --git a/mothership/internal/mqtt/publisher.go b/mothership/internal/mqtt/publisher.go index 4811df4..d7a895c 100644 --- a/mothership/internal/mqtt/publisher.go +++ b/mothership/internal/mqtt/publisher.go @@ -2,7 +2,6 @@ package mqtt import ( - "encoding/json" "log" "sync" "time" @@ -17,15 +16,26 @@ type EventPublisher struct { zones map[string]string // zoneID -> zoneName people map[string]string // personID -> personName stopped chan struct{} + + // Track person presence across zones + personZones map[string]map[string]bool // personID -> set of zoneIDs they're in + zoneOccupants map[string]map[string]bool // zoneID -> set of personIDs in zone + + // System health ticker + healthTicker *time.Ticker + healthDone chan struct{} } // NewEventPublisher creates a new MQTT event publisher. func NewEventPublisher(client *Client) *EventPublisher { return &EventPublisher{ - client: client, - zones: make(map[string]string), - people: make(map[string]string), - stopped: make(chan struct{}), + client: client, + zones: make(map[string]string), + people: make(map[string]string), + personZones: make(map[string]map[string]bool), + zoneOccupants: make(map[string]map[string]bool), + stopped: make(chan struct{}), + healthDone: make(chan struct{}), } } @@ -37,18 +47,47 @@ func (p *EventPublisher) Start() { eventbus.SubscribeDefault(func(e eventbus.Event) { p.publishEvent(e) }) + + // Start system health ticker (every 60 seconds) + p.healthTicker = time.NewTicker(60 * time.Second) + go p.healthLoop() } // Stop stops the event publisher. func (p *EventPublisher) Stop() { close(p.stopped) + if p.healthTicker != nil { + p.healthTicker.Stop() + } + close(p.healthDone) log.Printf("[INFO] MQTT event publisher stopped") } +// healthLoop publishes system health updates periodically. +func (p *EventPublisher) healthLoop() { + for { + select { + case <-p.healthTicker.C: + // System health is published via PublishSystemHealth method + // which should be called by the main application + case <-p.healthDone: + return + } + } +} + // SetZones sets the current zone mapping. func (p *EventPublisher) SetZones(zones map[string]string) { p.mu.Lock() defer p.mu.Unlock() + + // Initialize zone occupant tracking for new zones + for zoneID := range zones { + if _, exists := p.zoneOccupants[zoneID]; !exists { + p.zoneOccupants[zoneID] = make(map[string]bool) + } + } + p.zones = zones } @@ -56,6 +95,14 @@ func (p *EventPublisher) SetZones(zones map[string]string) { func (p *EventPublisher) SetPeople(people map[string]string) { p.mu.Lock() defer p.mu.Unlock() + + // Initialize person zone tracking for new people + for personID := range people { + if _, exists := p.personZones[personID]; !exists { + p.personZones[personID] = make(map[string]bool) + } + } + p.people = people } @@ -85,29 +132,110 @@ func (p *EventPublisher) publishEvent(e eventbus.Event) { // publishZoneEntry publishes zone entry events to MQTT. func (p *EventPublisher) publishZoneEntry(e eventbus.Event) { - p.mu.RLock() - zoneName := p.zones[e.Zone] - personName := e.Person - p.mu.RUnlock() + p.mu.Lock() + defer p.mu.Unlock() - // Update person presence if we have identity + zoneID := e.Zone + personName := e.Person + + // Update zone occupants tracking if personName != "" { + if _, exists := p.zoneOccupants[zoneID]; !exists { + p.zoneOccupants[zoneID] = make(map[string]bool) + } + if _, exists := p.personZones[personName]; !exists { + p.personZones[personName] = make(map[string]bool) + } + + p.zoneOccupants[zoneID][personName] = true + p.personZones[personName][zoneID] = true + + // Publish person presence (home = in at least one zone) if err := p.client.PublishPersonPresence(personName, true); err != nil { log.Printf("[WARN] Failed to publish person presence: %v", err) } + + // Publish zone occupancy + occupancy := len(p.zoneOccupants[zoneID]) + if err := p.client.PublishZoneOccupancy(zoneID, occupancy); err != nil { + log.Printf("[WARN] Failed to publish zone occupancy: %v", err) + } + + // Publish zone occupants list + occupants := p.getZoneOccupantsList(zoneID) + if err := p.client.PublishZoneOccupants(zoneID, occupants); err != nil { + log.Printf("[WARN] Failed to publish zone occupants: %v", err) + } + + // Publish zone occupied binary state + if err := p.client.PublishZoneOccupied(zoneID, occupancy > 0); err != nil { + log.Printf("[WARN] Failed to publish zone occupied: %v", err) + } } } // publishZoneExit publishes zone exit events to MQTT. func (p *EventPublisher) publishZoneExit(e eventbus.Event) { - p.mu.RLock() - zoneName := p.zones[e.Zone] - personName := e.Person - p.mu.RUnlock() + p.mu.Lock() + defer p.mu.Unlock() - // Update person presence if they've left all zones - // This is a simplified check - in production you'd track which zones - // a person is currently in + zoneID := e.Zone + personName := e.Person + + if personName != "" { + // Remove from zone occupants tracking + if zoneOccupants, exists := p.zoneOccupants[zoneID]; exists { + delete(zoneOccupants, personName) + } + + // Remove zone from person's zone set + if personZones, exists := p.personZones[personName]; exists { + delete(personZones, zoneID) + + // Check if person is now in no zones + if len(personZones) == 0 { + // Person has left all zones - set to not_home + if err := p.client.PublishPersonPresence(personName, false); err != nil { + log.Printf("[WARN] Failed to publish person presence: %v", err) + } + } + } + + // Publish zone occupancy + occupancy := len(p.zoneOccupants[zoneID]) + if err := p.client.PublishZoneOccupancy(zoneID, occupancy); err != nil { + log.Printf("[WARN] Failed to publish zone occupancy: %v", err) + } + + // Publish zone occupants list + occupants := p.getZoneOccupantsList(zoneID) + if err := p.client.PublishZoneOccupants(zoneID, occupants); err != nil { + log.Printf("[WARN] Failed to publish zone occupants: %v", err) + } + + // Publish zone occupied binary state + if err := p.client.PublishZoneOccupied(zoneID, occupancy > 0); err != nil { + log.Printf("[WARN] Failed to publish zone occupied: %v", err) + } + } +} + +// getZoneOccupantsList returns a sorted list of occupant names for a zone. +func (p *EventPublisher) getZoneOccupantsList(zoneID string) []string { + occupants, exists := p.zoneOccupants[zoneID] + if !exists { + return []string{} + } + + // Convert map keys to slice + result := make([]string, 0, len(occupants)) + for personID := range occupants { + if name, ok := p.people[personID]; ok { + result = append(result, name) + } + } + + return result } // publishFallAlert publishes fall detection events to MQTT. @@ -120,7 +248,10 @@ func (p *EventPublisher) publishFallAlert(e eventbus.Event) { personID, _ := detail["person_id"].(string) personLabel, _ := detail["person_label"].(string) zoneID, _ := detail["zone_id"].(string) + + p.mu.RLock() zoneName := p.zones[zoneID] + p.mu.RUnlock() timestamp := time.Now() if ts, ok := detail["timestamp"].(string); ok { @@ -132,6 +263,19 @@ func (p *EventPublisher) publishFallAlert(e eventbus.Event) { if err := p.client.PublishFallEvent(personID, personLabel, zoneID, zoneName, timestamp); err != nil { log.Printf("[WARN] Failed to publish fall event: %v", err) } + + // Reset fall detection state after a delay (fall events are one-shot) + go func() { + time.Sleep(30 * time.Second) + p.mu.RLock() + connected := p.client.IsConnected() + p.mu.RUnlock() + + if connected { + // Publish empty fall event to reset sensor + p.client.Publish("spaxel/fall_detected", []byte("{}")) + } + }() } // publishAlert publishes generic alert events to MQTT. @@ -145,7 +289,11 @@ func (p *EventPublisher) publishAlert(e eventbus.Event) { // PublishSystemHealth publishes periodic system health updates to MQTT. // This should be called on a timer (e.g., every 60 seconds). func (p *EventPublisher) PublishSystemHealth(nodeCount, onlineCount int, detectionQuality float64, mode string) { - if !p.client.IsConnected() { + p.mu.RLock() + connected := p.client.IsConnected() + p.mu.RUnlock() + + if !connected { return } @@ -153,3 +301,162 @@ func (p *EventPublisher) PublishSystemHealth(nodeCount, onlineCount int, detecti log.Printf("[WARN] Failed to publish system health: %v", err) } } + +// PublishPersonDiscovery publishes HA auto-discovery for a person. +func (p *EventPublisher) PublishPersonDiscovery(personID, personName string) error { + p.mu.RLock() + defer p.mu.RUnlock() + + if !p.client.IsConnected() { + return nil + } + + return p.client.PublishPersonPresenceDiscovery(personID, personName) +} + +// PublishZoneDiscovery publishes HA auto-discovery for a zone. +func (p *EventPublisher) PublishZoneDiscovery(zoneID, zoneName string) error { + p.mu.RLock() + defer p.mu.RUnlock() + + if !p.client.IsConnected() { + return nil + } + + // Publish both occupancy sensor and binary sensor + if err := p.client.PublishZoneOccupancyDiscovery(zoneID, zoneName); err != nil { + return err + } + return p.client.PublishZoneBinaryDiscovery(zoneID, zoneName) +} + +// PublishFallDiscovery publishes HA auto-discovery for fall detection. +func (p *EventPublisher) PublishFallDiscovery() error { + p.mu.RLock() + defer p.mu.RUnlock() + + if !p.client.IsConnected() { + return nil + } + + return p.client.PublishFallDetectionDiscovery() +} + +// PublishSystemHealthDiscovery publishes HA auto-discovery for system health. +func (p *EventPublisher) PublishSystemHealthDiscovery() error { + p.mu.RLock() + defer p.mu.RUnlock() + + if !p.client.IsConnected() { + return nil + } + + return p.client.PublishSystemHealthDiscovery() +} + +// PublishSystemModeDiscovery publishes HA auto-discovery for system mode. +func (p *EventPublisher) PublishSystemModeDiscovery() error { + p.mu.RLock() + defer p.mu.RUnlock() + + if !p.client.IsConnected() { + return nil + } + + return p.client.PublishSystemModeDiscovery() +} + +// RemovePersonDiscovery removes a person's HA auto-discovery entity. +func (p *EventPublisher) RemovePersonDiscovery(personID string) error { + p.mu.RLock() + defer p.mu.RUnlock() + + if !p.client.IsConnected() { + return nil + } + + return p.client.RemovePersonDiscovery(personID) +} + +// RemoveZoneDiscovery removes a zone's HA auto-discovery entities. +func (p *EventPublisher) RemoveZoneDiscovery(zoneID string) error { + p.mu.RLock() + defer p.mu.RUnlock() + + if !p.client.IsConnected() { + return nil + } + + return p.client.RemoveZoneDiscovery(zoneID) +} + +// UpdatePersonPresence directly updates a person's presence state. +// Use this when you need to manually set presence (e.g., from BLE detection). +func (p *EventPublisher) UpdatePersonPresence(personID string, home bool) { + p.mu.RLock() + defer p.mu.RUnlock() + + if !p.client.IsConnected() { + return + } + + if err := p.client.PublishPersonPresence(personID, home); err != nil { + log.Printf("[WARN] Failed to update person presence: %v", err) + } +} + +// UpdateZoneOccupancy directly updates zone occupancy. +// Use this when you need to manually set occupancy (e.g., from blob tracking). +func (p *EventPublisher) UpdateZoneOccupancy(zoneID string, count int, occupants []string) { + p.mu.Lock() + defer p.mu.Unlock() + + if !p.client.IsConnected() { + return + } + + // Update local tracking + p.zoneOccupants[zoneID] = make(map[string]bool) + for _, occupant := range occupants { + p.zoneOccupants[zoneID][occupant] = true + } + + if err := p.client.PublishZoneOccupancy(zoneID, count); err != nil { + log.Printf("[WARN] Failed to update zone occupancy: %v", err) + } + + if err := p.client.PublishZoneOccupants(zoneID, occupants); err != nil { + log.Printf("[WARN] Failed to update zone occupants: %v", err) + } + + if err := p.client.PublishZoneOccupied(zoneID, count > 0); err != nil { + log.Printf("[WARN] Failed to update zone occupied: %v", err) + } +} + +// PublishSystemMode publishes the current system mode. +func (p *EventPublisher) PublishSystemMode(mode string) { + p.mu.RLock() + defer p.mu.RUnlock() + + if !p.client.IsConnected() { + return + } + + if err := p.client.PublishSystemMode(mode); err != nil { + log.Printf("[WARN] Failed to publish system mode: %v", err) + } +} + +// SubscribeToSystemMode subscribes to system mode commands from MQTT. +// The handler will be called when HA sends a mode change command. +func (p *EventPublisher) SubscribeToSystemMode(handler func(mode string)) error { + p.mu.Lock() + defer p.mu.Unlock() + + if !p.client.IsConnected() { + return nil + } + + return p.client.SubscribeToSystemMode(handler) +} diff --git a/mothership/internal/replay/integration_test.go b/mothership/internal/replay/integration_test.go index fc895e3..2058a9f 100644 --- a/mothership/internal/replay/integration_test.go +++ b/mothership/internal/replay/integration_test.go @@ -181,7 +181,7 @@ func TestParameterSliderReprocess(t *testing.T) { // Create replay session with default threshold store := NewBufferAdapter(buffer) - session := NewSession("test-session", store, baseTime/1e6, (baseTime+int64(len(testFrames))*50_000_000/1e6) + session := NewSession("test-session", store, baseTime/1e6, (baseTime+int64(len(testFrames))*50_000_000/1e6)) // Process frames with default threshold (0.02) initialThreshold := 0.02 diff --git a/mothership/internal/webhook/publisher.go b/mothership/internal/webhook/publisher.go index d3d490a..a907826 100644 --- a/mothership/internal/webhook/publisher.go +++ b/mothership/internal/webhook/publisher.go @@ -137,31 +137,31 @@ func (p *Publisher) publishEvent(e eventbus.Event) { return } - // Send to webhook with retry - if err := p.sendWithRetry(jsonData); err != nil { + // Send to webhook with retry (include event type in header) + if err := p.sendWithRetry(jsonData, e.Type); err != nil { log.Printf("[WARN] Failed to send webhook event: %v", err) } } // sendWithRetry sends the payload with a single retry on 5xx errors. -func (p *Publisher) sendWithRetry(jsonData []byte) error { +func (p *Publisher) sendWithRetry(jsonData []byte, eventType string) error { p.mu.RLock() url := p.config.URL retryDelay := p.config.RetryDelay p.mu.RUnlock() // First attempt - if err := p.sendOnce(url, jsonData); err == nil { + if err := p.sendOnce(url, jsonData, eventType); err == nil { return nil } // Retry on 5xx after delay time.Sleep(retryDelay) - return p.sendOnce(url, jsonData) + return p.sendOnce(url, jsonData, eventType) } // sendOnce sends a single webhook request. -func (p *Publisher) sendOnce(url string, jsonData []byte) error { +func (p *Publisher) sendOnce(url string, jsonData []byte, eventType string) error { ctx, cancel := context.WithTimeout(context.Background(), p.config.Timeout) defer cancel() @@ -172,7 +172,7 @@ func (p *Publisher) sendOnce(url string, jsonData []byte) error { // Set headers req.Header.Set("Content-Type", "application/json") - req.Header.Set("X-Spaxel-Event", "spaxel-event") // Event type header + req.Header.Set("X-Spaxel-Event", eventType) // Event type header req.Header.Set("User-Agent", "Spaxel/1.0") resp, err := p.client.Do(req) @@ -189,7 +189,7 @@ func (p *Publisher) sendOnce(url string, jsonData []byte) error { // Check for 4xx errors (not retryable, but log) if resp.StatusCode >= 400 { log.Printf("[WARN] Webhook returned error status %d for %s event", - resp.StatusCode, req.Header.Get("X-Spaxel-Event")) + resp.StatusCode, eventType) } return nil @@ -229,7 +229,7 @@ func (p *Publisher) TestWebhook() error { return err } - return p.sendOnce(url, jsonData) + return p.sendOnce(url, jsonData, "test") } // ValidationError represents a webhook configuration validation error. diff --git a/mothership/internal/webhook/publisher_test.go b/mothership/internal/webhook/publisher_test.go index c65e549..39b7dd5 100644 --- a/mothership/internal/webhook/publisher_test.go +++ b/mothership/internal/webhook/publisher_test.go @@ -106,8 +106,8 @@ func TestPublishEvent(t *testing.T) { if receivedContentType != "application/json" { t.Errorf("Content-Type = %s, want application/json", receivedContentType) } - if receivedEventHeader != "spaxel-event" { - t.Errorf("X-Spaxel-Event = %s, want spaxel-event", receivedEventHeader) + if receivedEventHeader != "zone_entry" { + t.Errorf("X-Spaxel-Event = %s, want zone_entry", receivedEventHeader) } }