diff --git a/dashboard/js/app.js b/dashboard/js/app.js index 3b66e3d..38f8538 100644 --- a/dashboard/js/app.js +++ b/dashboard/js/app.js @@ -37,6 +37,7 @@ const state = { ws: null, wsConnected: false, + awaitingSnapshot: false, // true between WS open and first snapshot message nodes: new Map(), // MAC -> { mac, firmware, chip, lastSeen } links: new Map(), // linkID -> { nodeMAC, peerMAC, lastFrame, lastCSI, motionDetected, deltaRMS, ampHistory, lastAmpSample } selectedLinkID: null, @@ -377,6 +378,7 @@ state.ws.onopen = function() { console.log('[Spaxel] WebSocket connected'); state.wsConnected = true; + state.awaitingSnapshot = true; updateConnectionStatus(true); }; @@ -555,6 +557,19 @@ } function handleJSONMessage(msg) { + // Snapshot: first message on every connect/reconnect. Contains full state. + if (msg.type === 'snapshot') { + handleSnapshot(msg); + return; + } + + // Incremental update: 10 Hz delta with no type field. + // Only fields that changed since last tick are present. + if (!msg.type) { + handleIncrementalUpdate(msg); + return; + } + switch (msg.type) { case 'state': // Initial state dump @@ -733,6 +748,153 @@ } } + // ─── Snapshot + Incremental Update Protocol ───────────────────────────── + + function handleSnapshot(msg) { + state.awaitingSnapshot = false; + console.log('[Spaxel] Received snapshot, rebuilding state'); + + // Nodes + if (msg.nodes) { + state.nodes.clear(); + msg.nodes.forEach(function (node) { + state.nodes.set(node.mac, { + mac: node.mac, + firmware: node.firmware_version, + chip: node.chip, + lastSeen: Date.now() + }); + }); + } + + // Links + if (msg.links) { + state.links.clear(); + msg.links.forEach(function (link) { + state.links.set(link.id, { + nodeMAC: link.node_mac, + peerMAC: link.peer_mac, + lastFrame: Date.now(), + lastCSI: null, + motionDetected: false, + deltaRMS: 0, + ampHistory: [], + lastAmpSample: 0 + }); + }); + } + + // Motion states + if (msg.motion_states) { + msg.motion_states.forEach(function (ms) { applyMotionState(ms); }); + } + + // Blobs (localisation) + if (msg.blobs) { + Viz3D.handleLocUpdate({ type: 'loc_update', blobs: msg.blobs }); + } + + // BLE devices + if (msg.ble_devices) { + handleBLEScanMessage({ type: 'ble_scan', devices: msg.ble_devices }); + } + + // Triggers + if (msg.triggers) { + msg.triggers.forEach(function (trigger) { + if (window.Automations && window.Automations.updateTriggerState) { + window.Automations.updateTriggerState(trigger); + } + }); + } + + // Zones + if (msg.zones) { + if (window.Viz3D && window.Viz3D.handleZoneUpdate) { + Viz3D.handleZoneUpdate(msg.zones); + } + } + + updateNodeList(); + updateLinkList(); + Viz3D.applyLinks(msg.links || []); + } + + function handleIncrementalUpdate(msg) { + // Drop incremental updates until the snapshot has been received. + if (state.awaitingSnapshot) return; + + // Blobs (always present when localisation is running) + if (msg.blobs) { + Viz3D.handleLocUpdate({ type: 'loc_update', blobs: msg.blobs }); + } + + // Nodes (only present when node list changed) + if (msg.nodes !== undefined) { + if (msg.nodes.length > 0) { + msg.nodes.forEach(function (node) { + state.nodes.set(node.mac, { + mac: node.mac, + firmware: node.firmware_version, + chip: node.chip, + lastSeen: Date.now() + }); + }); + } + updateNodeList(); + } + + // Links (only present when link list changed) + if (msg.links !== undefined) { + if (msg.links.length > 0) { + msg.links.forEach(function (link) { + state.links.set(link.id, { + nodeMAC: link.node_mac, + peerMAC: link.peer_mac, + lastFrame: Date.now(), + lastCSI: null, + motionDetected: false, + deltaRMS: 0, + ampHistory: [], + lastAmpSample: 0 + }); + }); + } + updateLinkList(); + Viz3D.applyLinks(msg.links); + } + + // Motion states (only present when motion state changed) + if (msg.motion_states) { + var changed = false; + msg.motion_states.forEach(function (ms) { + if (applyMotionState(ms)) changed = true; + }); + if (changed) updateLinkList(); + } + + // BLE devices + if (msg.ble_devices) { + handleBLEScanMessage({ type: 'ble_scan', devices: msg.ble_devices }); + } + + // Triggers + if (msg.triggers) { + msg.triggers.forEach(function (trigger) { + if (window.Automations && window.Automations.updateTriggerState) { + window.Automations.updateTriggerState(trigger); + } + }); + } + + // Zones + if (msg.zones) { + if (window.Viz3D && window.Viz3D.handleZoneUpdate) { + Viz3D.handleZoneUpdate(msg.zones); + } + } + } + // applyMotionState updates a link's motion fields; returns true if it changed. function applyMotionState(ms) { const link = state.links.get(ms.link_id); diff --git a/mothership/internal/dashboard/hub.go b/mothership/internal/dashboard/hub.go index 712508f..6feac18 100644 --- a/mothership/internal/dashboard/hub.go +++ b/mothership/internal/dashboard/hub.go @@ -501,23 +501,23 @@ func (h *Hub) buildZoneSnapshots(zp ZoneStateProvider) []ZoneSnapshot { occupancy := zp.GetOccupancy() result := make([]ZoneSnapshot, 0, len(zones)) for _, z := range zones { - occ := occupancy[z.ID] + occ, ok := occupancy[z.ID] people := make([]string, 0) - if occ != nil { + if ok { // Blob IDs don't have names yet; leave people empty. _ = occ.BlobIDs } result = append(result, ZoneSnapshot{ ID: z.ID, Name: z.Name, - Count: func() int { if occ != nil { return occ.Count }; return 0 }(), + Count: occ.Count, People: people, MinX: z.MinX, MinY: z.MinY, MinZ: z.MinZ, - SizeX: z.MaxX - z.MinX, - SizeY: z.MaxY - z.MinY, - SizeZ: z.MaxZ - z.MinZ, + SizeX: z.SizeX, + SizeY: z.SizeY, + SizeZ: z.SizeZ, }) } return result @@ -663,25 +663,6 @@ func (h *Hub) ClientCount() int { return len(h.clients) } -// broadcastBLEScan broadcasts the current BLE device list to all dashboard clients. -func (h *Hub) broadcastBLEScan() { - h.mu.RLock() - state := h.bleState - clientCount := len(h.clients) - h.mu.RUnlock() - - if state == nil || clientCount == 0 { - return - } - - devices := state.GetCurrentDevices() - if len(devices) == 0 { - return - } - - h.BroadcastBLEScan(devices) -} - // broadcastSystemHealth broadcasts system health stats to all dashboard clients. func (h *Hub) broadcastSystemHealth() { h.mu.RLock() diff --git a/mothership/internal/dashboard/hub_test.go b/mothership/internal/dashboard/hub_test.go index 575ac8b..ae88310 100644 --- a/mothership/internal/dashboard/hub_test.go +++ b/mothership/internal/dashboard/hub_test.go @@ -1,6 +1,8 @@ package dashboard import ( + "encoding/json" + "strings" "testing" "time" @@ -36,6 +38,16 @@ func TestHub_RegisterUnregister(t *testing.T) { } } +// drainSnapshot reads and discards the initial snapshot message sent on connect. +func drainSnapshot(t *testing.T, ch chan []byte) { + t.Helper() + select { + case <-ch: + case <-time.After(100 * time.Millisecond): + t.Fatal("expected snapshot message") + } +} + func TestHub_Broadcast(t *testing.T) { hub := NewHub() go hub.Run() @@ -47,6 +59,7 @@ func TestHub_Broadcast(t *testing.T) { hub.Register(client) time.Sleep(10 * time.Millisecond) + drainSnapshot(t, client.send) // Broadcast a message testMsg := []byte(`{"type":"test"}`) @@ -74,6 +87,7 @@ func TestHub_BroadcastCSI(t *testing.T) { hub.Register(client) time.Sleep(10 * time.Millisecond) + drainSnapshot(t, client.send) // Broadcast CSI (raw binary data) testData := []byte{0x01, 0x02, 0x03, 0x04} @@ -100,6 +114,7 @@ func TestHub_NodeEvents(t *testing.T) { hub.Register(client) time.Sleep(10 * time.Millisecond) + drainSnapshot(t, client.send) // Test node connected event hub.BroadcastNodeConnected("AA:BB:CC:DD:EE:FF", "1.0.0", "ESP32-S3") @@ -131,6 +146,7 @@ func TestHub_LinkEvents(t *testing.T) { hub.Register(client) time.Sleep(10 * time.Millisecond) + drainSnapshot(t, client.send) // Test link active event hub.BroadcastLinkActive("AA:BB:CC:DD:EE:FF:11:22:33:44:55:66", "AA:BB:CC:DD:EE:FF", "11:22:33:44:55:66") @@ -160,11 +176,11 @@ func (m *MockIngestionState) GetAllMotionStates() []ingestion.MotionStateItem { return nil } -func TestHub_InitialState(t *testing.T) { +func TestHub_SnapshotOnConnect(t *testing.T) { hub := NewHub() go hub.Run() - // Set mock ingestion state + // Set mock ingestion state so the snapshot has content mock := &MockIngestionState{ nodes: []ingestion.NodeInfo{ {MAC: "AA:BB:CC:DD:EE:FF", FirmwareVersion: "1.0.0", Chip: "ESP32-S3"}, @@ -183,10 +199,148 @@ func TestHub_InitialState(t *testing.T) { hub.Register(client) time.Sleep(10 * time.Millisecond) - // Should receive initial state - msg := <-client.send - // Just check it's a valid JSON with type "state" - if len(msg) == 0 || msg[0] != '{' { - t.Errorf("expected JSON state message, got %v", msg) + // First message must be the snapshot + select { + case msg := <-client.send: + if len(msg) == 0 || msg[0] != '{' { + t.Fatalf("expected JSON snapshot message, got %v", msg) + } + + var parsed map[string]json.RawMessage + if err := json.Unmarshal(msg, &parsed); err != nil { + t.Fatalf("failed to parse snapshot JSON: %v", err) + } + + // Must have type="snapshot" + if typ, ok := parsed["type"]; !ok { + t.Fatal("snapshot missing 'type' field") + } else if strings.Trim(string(typ), `"`) != "snapshot" { + t.Fatalf("expected type=snapshot, got %s", string(typ)) + } + + // Must have timestamp_ms + if _, ok := parsed["timestamp_ms"]; !ok { + t.Fatal("snapshot missing 'timestamp_ms' field") + } + + // Must contain nodes from the mock + if _, ok := parsed["nodes"]; !ok { + t.Fatal("snapshot missing 'nodes' field") + } + + case <-time.After(100 * time.Millisecond): + t.Fatal("expected snapshot message within 100 ms") + } +} + +func TestHub_SnapshotBeforeDelta(t *testing.T) { + hub := NewHub() + go hub.Run() + + mock := &MockIngestionState{ + nodes: []ingestion.NodeInfo{ + {MAC: "AA:BB:CC:DD:EE:FF", FirmwareVersion: "1.0.0"}, + }, + } + hub.SetIngestionState(mock) + + client := &Client{ + hub: hub, + send: make(chan []byte, 256), + } + + hub.Register(client) + time.Sleep(10 * time.Millisecond) + + // Drain all messages; first must be snapshot, rest must be deltas or events. + timeout := time.After(200 * time.Millisecond) + first := true + for { + select { + case msg := <-client.send: + var parsed map[string]json.RawMessage + if err := json.Unmarshal(msg, &parsed); err != nil { + continue // skip non-JSON (binary CSI) + } + if first { + first = false + typ := strings.Trim(string(parsed["type"]), `"`) + if typ != "snapshot" { + t.Fatalf("first message should be snapshot, got type=%s", typ) + } + } else { + // Subsequent messages from tickDelta must NOT have a type field. + // Event-driven messages (BroadcastNodeConnected etc.) do have type. + if _, hasType := parsed["type"]; hasType { + typ := strings.Trim(string(parsed["type"]), `"`) + // These are acceptable event types + switch typ { + case "node_connected", "node_disconnected", "link_active", + "link_inactive", "motion_state", "loc_update", + "registry_state", "fleet_change", "system_health", + "ble_scan", "trigger_state", "event", "alert", + "anomaly_detected", "system_mode_change", + "fleet_health", "fleet_history": + // OK — event-driven broadcast + default: + t.Errorf("unexpected type in non-snapshot message: %s", typ) + } + } + // No type field → delta update (expected from tickDelta) + } + case <-timeout: + return // test passed + } + } +} + +func TestHub_DeltaOmitsTypeField(t *testing.T) { + hub := NewHub() + go hub.Run() + + mock := &MockIngestionState{ + nodes: []ingestion.NodeInfo{ + {MAC: "AA:BB:CC:DD:EE:FF", FirmwareVersion: "1.0.0"}, + }, + } + hub.SetIngestionState(mock) + + client := &Client{ + hub: hub, + send: make(chan []byte, 256), + } + + hub.Register(client) + time.Sleep(10 * time.Millisecond) + + // Read the first message (snapshot) and discard it + select { + case <-client.send: + case <-time.After(100 * time.Millisecond): + t.Fatal("expected snapshot") + } + + // Wait for a delta tick (100ms) and check it has no type field + found := false + for i := 0; i < 5; i++ { + select { + case msg := <-client.send: + var parsed map[string]json.RawMessage + if err := json.Unmarshal(msg, &parsed); err != nil { + continue + } + if _, hasType := parsed["type"]; !hasType { + // This is a delta message — must have timestamp_ms + if _, ok := parsed["timestamp_ms"]; !ok { + t.Error("delta message missing timestamp_ms") + } + found = true + } + case <-time.After(150 * time.Millisecond): + // Try next tick + } + } + if !found { + t.Error("expected at least one delta message (no type field)") } }