diff --git a/mothership/cmd/mothership/main.go b/mothership/cmd/mothership/main.go index 9f19647..d286895 100644 --- a/mothership/cmd/mothership/main.go +++ b/mothership/cmd/mothership/main.go @@ -13,6 +13,7 @@ import ( "os/signal" "path/filepath" "strconv" + "strings" "syscall" "time" @@ -55,6 +56,28 @@ const ( // Build-time version injection var version = "dev" +// gdopAdapter wraps a localization.Engine to implement fleet.GDOPCalculator. +type gdopAdapter struct { + eng *localization.Engine +} + +func (a *gdopAdapter) GDOPMap(positions []fleet.NodePosition) ([]float32, int, int) { + loc := make([]localization.NodePosition, len(positions)) + for i, p := range positions { + loc[i] = localization.NodePosition{MAC: p.MAC, X: p.X, Y: 0, Z: p.Z} + } + return a.eng.GDOPMap(loc) +} + +// parseLinkID splits a link ID "node_mac:peer_mac" into its two components. +func parseLinkID(linkID string) []string { + i := strings.IndexByte(linkID, ':') + if i < 0 { + return nil + } + return []string{linkID[:i], linkID[i+1:]} +} + // Config holds application configuration type Config struct { BindAddr string @@ -72,6 +95,109 @@ type Config struct { MQTTPassword string } +func parseConfig() Config { + return Config{ + BindAddr: envOr("SPAXEL_BIND_ADDR", "0.0.0.0:8080"), + DataDir: envOr("SPAXEL_DATA_DIR", "/data"), + StaticDir: envOr("SPAXEL_STATIC_DIR", ""), + MDNSName: envOr("SPAXEL_MDNS_NAME", "spaxel"), + MDNSEnabled: envOr("SPAXEL_MDNS_ENABLED", "true") == "true", + LogLevel: envOr("SPAXEL_LOG_LEVEL", "info"), + ReplayMaxMB: envInt("SPAXEL_REPLAY_MAX_MB", 360), + MQTTBroker: envOr("SPAXEL_MQTT_BROKER", ""), + MQTTClientID: envOr("SPAXEL_MQTT_CLIENT_ID", ""), + MQTTUsername: envOr("SPAXEL_MQTT_USERNAME", ""), + MQTTPassword: envOr("SPAXEL_MQTT_PASSWORD", ""), + } +} + +func envOr(key, fallback string) string { + if v := os.Getenv(key); v != "" { + return v + } + return fallback +} + +func envInt(key string, fallback int) int { + if v := os.Getenv(key); v != "" { + if n, err := strconv.Atoi(v); err == nil { + return n + } + } + return fallback +} + +func writeJSON(w http.ResponseWriter, v interface{}) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(v) //nolint:errcheck +} + +func findDashboardDir() string { + for _, dir := range []string{"./dashboard", "./../dashboard", "/app/dashboard"} { + if _, err := os.Stat(dir); err == nil { + return dir + } + } + return "" +} + +// fleetRoomConfigAdapter adapts fleet.Registry to notify.RoomConfigProvider. +type fleetRoomConfigAdapter struct { + reg *fleet.Registry +} + +func (a *fleetRoomConfigAdapter) GetRoom() (width, height, depth float64) { + room, err := a.reg.GetRoom() + if err != nil { + return 10, 2.5, 10 + } + return room.Width, room.Height, room.Depth +} + +// multiFleetNotifier fans out ingestion.FleetNotifier events to multiple fleet components. +type multiFleetNotifier struct { + notifiers []interface { + OnNodeConnected(mac, firmware, chip string) + OnNodeDisconnected(mac string) + } +} + +func newMultiNotifier(notifiers ...interface { + OnNodeConnected(mac, firmware, chip string) + OnNodeDisconnected(mac string) +}) *multiFleetNotifier { + return &multiFleetNotifier{notifiers: notifiers} +} + +func (m *multiFleetNotifier) OnNodeConnected(mac, firmware, chip string) { + for _, n := range m.notifiers { + n.OnNodeConnected(mac, firmware, chip) + } +} + +func (m *multiFleetNotifier) OnNodeDisconnected(mac string) { + for _, n := range m.notifiers { + n.OnNodeDisconnected(mac) + } +} + +// gdopCalculatorAdapter adapts localization.Engine to fleet.GDOPCalculator. +type gdopCalculatorAdapter struct { + engine *localization.Engine +} + +func (a *gdopCalculatorAdapter) GDOPMap(positions []fleet.NodePosition) ([]float32, int, int) { + locPositions := make([]localization.NodePosition, len(positions)) + for i, p := range positions { + locPositions[i] = localization.NodePosition{ + MAC: p.MAC, + X: p.X, + Z: p.Z, + } + } + return a.engine.GDOPMap(locPositions) +} + func main() { cfg := parseConfig() log.Printf("[INFO] Spaxel mothership v%s starting", version) @@ -80,6 +206,8 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + var explainabilityHandler *explainability.Handler + sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT) @@ -212,6 +340,11 @@ func main() { fallDetector := falldetect.NewDetector() log.Printf("[INFO] Fall detector initialized") + // Declare dashboard hub and notify service early so closures can reference them. + // They are assigned later in this function. + var dashboardHub *dashboard.Hub + var notifyService *notify.Service + // Phase 6: Sleep quality monitor sleepMonitor := sleep.NewMonitor(sleep.MonitorConfig{ SampleInterval: 30 * time.Second, @@ -231,7 +364,9 @@ func main() { "generated_at": report.GeneratedAt.Unix(), } data, _ := json.Marshal(msg) - dashboardHub.Broadcast(data) + if dashboardHub != nil { + dashboardHub.Broadcast(data) + } // Send notification for morning report if notifyService != nil { @@ -242,7 +377,7 @@ func main() { Tags: []string{"sleep", "morning"}, Data: report.ToJSONMap(), } - notifyService.Send(notif) + notifyService.Send(notif) //nolint:errcheck } log.Printf("[INFO] Sleep report for %s: score=%.1f rating=%s", linkID, report.Metrics.OverallScore, report.Metrics.QualityRating) @@ -296,7 +431,7 @@ func main() { } // Phase 6: Notification service - notifyService, err := notify.NewService(filepath.Join(cfg.DataDir, "notify.db")) + notifyService, err = notify.NewService(filepath.Join(cfg.DataDir, "notify.db")) if err != nil { log.Printf("[WARN] Failed to open notification database: %v", err) } else { @@ -304,7 +439,7 @@ func main() { log.Printf("[INFO] Notification service at %s", filepath.Join(cfg.DataDir, "notify.db")) // Set room config provider for floor plan thumbnails - notifyService.SetRoomConfig(fleetReg) + notifyService.SetRoomConfig(&fleetRoomConfigAdapter{reg: fleetReg}) } // Phase 6: Self-improving localization system @@ -317,11 +452,12 @@ func main() { originX := 0.0 originZ := 0.0 if fleetReg != nil { - if w, d, ox, oz, ok := fleetReg.GetRoomConfig(); ok { - roomWidth = w - roomDepth = d - originX = ox - originZ = oz + room, roomErr := fleetReg.GetRoom() + if roomErr == nil && room != nil { + roomWidth = room.Width + roomDepth = room.Depth + originX = room.OriginX + originZ = room.OriginZ } } @@ -353,8 +489,8 @@ func main() { // Set node positions from fleet registry if fleetReg != nil { nodes, _ := fleetReg.GetAllNodes() - for mac, node := range nodes { - selfImprovingLocalizer.SetNodePosition(mac, node.PosX, node.PosZ) + for _, node := range nodes { + selfImprovingLocalizer.SetNodePosition(node.MAC, node.PosX, node.PosZ) } } @@ -488,7 +624,7 @@ func main() { go rateCtrl.Run(ctx) // Dashboard hub and server - dashboardHub := dashboard.NewHub() + dashboardHub = dashboard.NewHub() dashboardSrv := dashboard.NewServer(dashboardHub) dashboardHub.SetIngestionState(ingestSrv) @@ -617,11 +753,11 @@ func main() { // Load registered devices from BLE registry if bleRegistry != nil { - devices, err := bleRegistry.GetAllRegisteredDevices() - if err == nil { + deviceRecords, devErr := bleRegistry.GetRegisteredDevices(false) + if devErr == nil { var macs []string - for mac := range devices { - macs = append(macs, mac) + for _, dev := range deviceRecords { + macs = append(macs, dev.Addr) } anomalyDetector.SetRegisteredDevices(macs) } @@ -642,8 +778,9 @@ func main() { selfHealManager.SetNotifier(ingestSrv) selfHealManager.SetBroadcaster(dashboardHub) if selfImprovingLocalizer != nil { - selfHealManager.SetGDOPCalculator(selfImprovingLocalizer.GetEngine()) - roleOptimiser.SetGDOPCalculator(selfImprovingLocalizer.GetEngine()) + gdopCalc := &gdopAdapter{eng: selfImprovingLocalizer.GetEngine()} + selfHealManager.SetGDOPCalculator(gdopCalc) + roleOptimiser.SetGDOPCalculator(gdopCalc) } go selfHealManager.Run(ctx) @@ -800,7 +937,20 @@ func main() { // Update identity matcher if identityMatcher != nil { - identityMatcher.UpdateBlobs(blobs) + // Convert TrackedBlob to the anonymous struct expected by IdentityMatcher + matcherBlobs := make([]struct { + ID int + X, Y, Z float64 + Weight float64 + }, len(blobs)) + for i, b := range blobs { + matcherBlobs[i] = struct { + ID int + X, Y, Z float64 + Weight float64 + }{ID: b.ID, X: b.X, Y: b.Y, Z: b.Z, Weight: b.Weight} + } + identityMatcher.UpdateBlobs(matcherBlobs) // Collect ground truth samples for self-improving localization if groundTruthCollector != nil { @@ -902,15 +1052,16 @@ func main() { for _, blob := range blobs { match := identityMatcher.GetMatch(blob.ID) if match != nil && match.PersonID != "" { + triPos := [3]float64{match.TriangulationPos.X, match.TriangulationPos.Y, match.TriangulationPos.Z} identityMap[blob.ID] = &explainability.BLEMatch{ PersonID: match.PersonID, - PersonLabel: match.PersonLabel, + PersonLabel: match.PersonName, PersonColor: match.PersonColor, DeviceAddr: match.DeviceAddr, Confidence: match.Confidence, - MatchMethod: match.MatchMethod, - ReportedByNodes: match.ReportedByNodes, - TriangulationPos: &[3]float64{&match.TriangulationPos.X, &match.TriangulationPos.Y, &match.TriangulationPos.Z}, + MatchMethod: "ble_rssi", + ReportedByNodes: nil, + TriangulationPos: &triPos, } } } @@ -956,13 +1107,26 @@ func main() { ID int X, Y, Z float64 VX, VY, VZ float64 - Posture string - }{blob}, time.Now()) + Posture string + }{{ID: blob.ID, X: blob.X, Y: blob.Y, Z: blob.Z, VX: blob.VX, VY: blob.VY, VZ: blob.VZ}}, time.Now()) } // Evaluate automations if automationEngine != nil { - automationEngine.Evaluate(blobs, func(blobID int) string { + autoBlobs := make([]automation.TrackedBlob, len(blobs)) + for i, b := range blobs { + autoBlobs[i] = automation.TrackedBlob{ + ID: b.ID, + X: b.X, + Y: b.Y, + Z: b.Z, + VX: b.VX, + VY: b.VY, + VZ: b.VZ, + Confidence: b.Weight, + } + } + automationEngine.Evaluate(autoBlobs, func(blobID int) string { if zonesMgr != nil { return zonesMgr.GetBlobZone(blobID) } @@ -1004,8 +1168,8 @@ func main() { var bleDevices []string if identityMatcher != nil { for _, blobID := range occ.BlobIDs { - if match := identityMatcher.GetMatch(blobID); match != nil && match.DeviceMAC != "" { - bleDevices = append(bleDevices, match.DeviceMAC) + if match := identityMatcher.GetMatch(blobID); match != nil && match.DeviceAddr != "" { + bleDevices = append(bleDevices, match.DeviceAddr) } } } @@ -1033,7 +1197,7 @@ func main() { } if personID != "" { // Check for unusual dwell (fall detection takes priority) - fallDetected := fallDetector.IsFallDetected(blobID) + fallDetected := fallDetector.GetTrackState(blobID) == falldetect.StateFallConfirmed anomalyDetector.ProcessDwellDuration(zone.ID, personID, dwellTime, isSecurityMode, fallDetected) } } @@ -1577,7 +1741,11 @@ func main() { // Phase 6: BLE REST API if bleRegistry != nil { r.Get("/api/ble/devices", func(w http.ResponseWriter, r *http.Request) { - devices := bleRegistry.GetAllDevices() + devices, err := bleRegistry.GetDevices(false) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } writeJSON(w, devices) }) r.Get("/api/ble/devices/{addr}", func(w http.ResponseWriter, r *http.Request) { @@ -1599,11 +1767,12 @@ func main() { http.Error(w, "addr required", http.StatusBadRequest) return } - if err := bleRegistry.UpsertDevice(&device); err != nil { + result, err := bleRegistry.PreregisterDevice(device.Addr, device.Name) + if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - writeJSON(w, device) + writeJSON(w, result) }) r.Put("/api/ble/devices/{addr}", func(w http.ResponseWriter, r *http.Request) { addr := chi.URLParam(r, "addr") @@ -1612,12 +1781,33 @@ func main() { http.Error(w, err.Error(), http.StatusBadRequest) return } - device.Addr = addr - if err := bleRegistry.UpsertDevice(&device); err != nil { + updates := map[string]interface{}{} + if device.Name != "" { + updates["name"] = device.Name + } + if device.Label != "" { + updates["label"] = device.Label + } + if device.DeviceType != "" { + updates["device_type"] = string(device.DeviceType) + } + if device.PersonID != "" { + updates["person_id"] = device.PersonID + } + if len(updates) == 0 { + writeJSON(w, device) + return + } + if err := bleRegistry.UpdateDevice(addr, updates); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - writeJSON(w, device) + result, err := bleRegistry.GetDevice(addr) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + writeJSON(w, result) }) r.Delete("/api/ble/devices/{addr}", func(w http.ResponseWriter, r *http.Request) { addr := chi.URLParam(r, "addr") @@ -1640,8 +1830,31 @@ func main() { // Phase 6: Zones REST API if zonesMgr != nil { r.Get("/api/zones", func(w http.ResponseWriter, r *http.Request) { - zones := zonesMgr.GetAllZones() - writeJSON(w, zones) + allZones := zonesMgr.GetAllZones() + occupancy := zonesMgr.GetOccupancy() + statusMap := zonesMgr.GetOccupancyStatus() + type zoneWithOcc struct { + zones.Zone + Occupancy int `json:"occupancy"` + People []int `json:"people"` + OccStatus string `json:"occ_status,omitempty"` + } + result := make([]zoneWithOcc, 0, len(allZones)) + for _, z := range allZones { + entry := zoneWithOcc{Zone: *z} + if occ, ok := occupancy[z.ID]; ok { + entry.Occupancy = occ.Count + entry.People = occ.BlobIDs + } + if s, ok := statusMap[z.ID]; ok && s == zones.OccupancyUncertain { + entry.OccStatus = "uncertain" + } + if entry.People == nil { + entry.People = []int{} + } + result = append(result, entry) + } + writeJSON(w, result) }) r.Post("/api/zones", func(w http.ResponseWriter, r *http.Request) { var zone zones.Zone @@ -1671,6 +1884,17 @@ func main() { return } writeJSON(w, zone) + }) + r.Delete("/api/zones/{id}", func(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + if err := zonesMgr.DeleteZone(id); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) + }) + } + // Phase 6: Portals REST API if zonesMgr != nil { r.Get("/api/portals", func(w http.ResponseWriter, r *http.Request) { @@ -1715,25 +1939,6 @@ func main() { w.WriteHeader(http.StatusNoContent) }) } - if err := zonesMgr.UpdatePortal(&portal); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - writeJSON(w, portal) - }) - r.Delete("/api/portals/{id}", func(w http.ResponseWriter, r *http.Request) { - id := chi.URLParam(r, "id") - if zonesMgr == nil { - http.Error(w, "zones manager not available", http.StatusServiceUnavailable) - return - } - if err := zonesMgr.DeletePortal(id); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - w.WriteHeader(http.StatusNoContent) - }) - } // Phase 6: Automation REST API if automationEngine != nil { @@ -2280,7 +2485,10 @@ func main() { if err == nil { transitions := make([]map[string]interface{}, len(probs)) for i, p := range probs { - zoneName, _ := zonesMgr.GetZoneName(p.ToZoneID) + zoneName := p.ToZoneID + if z := zonesMgr.GetZone(p.ToZoneID); z != nil { + zoneName = z.Name + } transitions[i] = map[string]interface{}{ "from_zone_id": p.FromZoneID, "to_zone_id": p.ToZoneID, @@ -2322,7 +2530,9 @@ func main() { for i, p := range probs { zoneName := p.ToZoneID if zonesMgr != nil { - zoneName, _ = zonesMgr.GetZoneName(p.ToZoneID) + if z := zonesMgr.GetZone(p.ToZoneID); z != nil { + zoneName = z.Name + } } transitions[i] = map[string]interface{}{ "from_zone_id": p.FromZoneID, @@ -2368,7 +2578,9 @@ func main() { for i, p := range probs { zoneName := p.ToZoneID if zonesMgr != nil { - zoneName, _ = zonesMgr.GetZoneName(p.ToZoneID) + if z := zonesMgr.GetZone(p.ToZoneID); z != nil { + zoneName = z.Name + } } transitions[i] = map[string]interface{}{ "from_zone_id": p.FromZoneID, @@ -2449,7 +2661,7 @@ func main() { } // Phase 6: Detection explainability API - explainabilityHandler := explainability.NewHandler() + explainabilityHandler = explainability.NewHandler() explainabilityHandler.RegisterRoutes(r) log.Printf("[INFO] Detection explainability API enabled")