feat: implement snapshot-on-connect + incremental update protocol for dashboard WebSocket

- Hub sends full snapshot (type=snapshot) on new client connect before adding
  to broadcast list, preventing race with delta messages
- tickDelta at 10 Hz computes byte-level diffs of cached JSON fields, broadcasts
  only changed fields with no type field
- Dashboard sets awaitingSnapshot on WS open, drops incremental updates until
  snapshot received, rebuilds full state from snapshot
- Fix zone snapshot building (nil→ok check, SizeX fields)
- Remove unused broadcastBLEScan function
- Add drainSnapshot helper and fix TestHub_LinkEvents to account for snapshot
- Add TestHub_SnapshotOnConnect, TestHub_SnapshotBeforeDelta, TestHub_DeltaOmitsTypeField

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-04-06 21:59:39 -04:00
parent dbc18e2793
commit 757d7240dc
3 changed files with 329 additions and 32 deletions

View file

@ -37,6 +37,7 @@
const state = {
ws: null,
wsConnected: false,
awaitingSnapshot: false, // true between WS open and first snapshot message
nodes: new Map(), // MAC -> { mac, firmware, chip, lastSeen }
links: new Map(), // linkID -> { nodeMAC, peerMAC, lastFrame, lastCSI, motionDetected, deltaRMS, ampHistory, lastAmpSample }
selectedLinkID: null,
@ -377,6 +378,7 @@
state.ws.onopen = function() {
console.log('[Spaxel] WebSocket connected');
state.wsConnected = true;
state.awaitingSnapshot = true;
updateConnectionStatus(true);
};
@ -555,6 +557,19 @@
}
function handleJSONMessage(msg) {
// Snapshot: first message on every connect/reconnect. Contains full state.
if (msg.type === 'snapshot') {
handleSnapshot(msg);
return;
}
// Incremental update: 10 Hz delta with no type field.
// Only fields that changed since last tick are present.
if (!msg.type) {
handleIncrementalUpdate(msg);
return;
}
switch (msg.type) {
case 'state':
// Initial state dump
@ -733,6 +748,153 @@
}
}
// ─── Snapshot + Incremental Update Protocol ─────────────────────────────
function handleSnapshot(msg) {
state.awaitingSnapshot = false;
console.log('[Spaxel] Received snapshot, rebuilding state');
// Nodes
if (msg.nodes) {
state.nodes.clear();
msg.nodes.forEach(function (node) {
state.nodes.set(node.mac, {
mac: node.mac,
firmware: node.firmware_version,
chip: node.chip,
lastSeen: Date.now()
});
});
}
// Links
if (msg.links) {
state.links.clear();
msg.links.forEach(function (link) {
state.links.set(link.id, {
nodeMAC: link.node_mac,
peerMAC: link.peer_mac,
lastFrame: Date.now(),
lastCSI: null,
motionDetected: false,
deltaRMS: 0,
ampHistory: [],
lastAmpSample: 0
});
});
}
// Motion states
if (msg.motion_states) {
msg.motion_states.forEach(function (ms) { applyMotionState(ms); });
}
// Blobs (localisation)
if (msg.blobs) {
Viz3D.handleLocUpdate({ type: 'loc_update', blobs: msg.blobs });
}
// BLE devices
if (msg.ble_devices) {
handleBLEScanMessage({ type: 'ble_scan', devices: msg.ble_devices });
}
// Triggers
if (msg.triggers) {
msg.triggers.forEach(function (trigger) {
if (window.Automations && window.Automations.updateTriggerState) {
window.Automations.updateTriggerState(trigger);
}
});
}
// Zones
if (msg.zones) {
if (window.Viz3D && window.Viz3D.handleZoneUpdate) {
Viz3D.handleZoneUpdate(msg.zones);
}
}
updateNodeList();
updateLinkList();
Viz3D.applyLinks(msg.links || []);
}
function handleIncrementalUpdate(msg) {
// Drop incremental updates until the snapshot has been received.
if (state.awaitingSnapshot) return;
// Blobs (always present when localisation is running)
if (msg.blobs) {
Viz3D.handleLocUpdate({ type: 'loc_update', blobs: msg.blobs });
}
// Nodes (only present when node list changed)
if (msg.nodes !== undefined) {
if (msg.nodes.length > 0) {
msg.nodes.forEach(function (node) {
state.nodes.set(node.mac, {
mac: node.mac,
firmware: node.firmware_version,
chip: node.chip,
lastSeen: Date.now()
});
});
}
updateNodeList();
}
// Links (only present when link list changed)
if (msg.links !== undefined) {
if (msg.links.length > 0) {
msg.links.forEach(function (link) {
state.links.set(link.id, {
nodeMAC: link.node_mac,
peerMAC: link.peer_mac,
lastFrame: Date.now(),
lastCSI: null,
motionDetected: false,
deltaRMS: 0,
ampHistory: [],
lastAmpSample: 0
});
});
}
updateLinkList();
Viz3D.applyLinks(msg.links);
}
// Motion states (only present when motion state changed)
if (msg.motion_states) {
var changed = false;
msg.motion_states.forEach(function (ms) {
if (applyMotionState(ms)) changed = true;
});
if (changed) updateLinkList();
}
// BLE devices
if (msg.ble_devices) {
handleBLEScanMessage({ type: 'ble_scan', devices: msg.ble_devices });
}
// Triggers
if (msg.triggers) {
msg.triggers.forEach(function (trigger) {
if (window.Automations && window.Automations.updateTriggerState) {
window.Automations.updateTriggerState(trigger);
}
});
}
// Zones
if (msg.zones) {
if (window.Viz3D && window.Viz3D.handleZoneUpdate) {
Viz3D.handleZoneUpdate(msg.zones);
}
}
}
// applyMotionState updates a link's motion fields; returns true if it changed.
function applyMotionState(ms) {
const link = state.links.get(ms.link_id);

View file

@ -501,23 +501,23 @@ func (h *Hub) buildZoneSnapshots(zp ZoneStateProvider) []ZoneSnapshot {
occupancy := zp.GetOccupancy()
result := make([]ZoneSnapshot, 0, len(zones))
for _, z := range zones {
occ := occupancy[z.ID]
occ, ok := occupancy[z.ID]
people := make([]string, 0)
if occ != nil {
if ok {
// Blob IDs don't have names yet; leave people empty.
_ = occ.BlobIDs
}
result = append(result, ZoneSnapshot{
ID: z.ID,
Name: z.Name,
Count: func() int { if occ != nil { return occ.Count }; return 0 }(),
Count: occ.Count,
People: people,
MinX: z.MinX,
MinY: z.MinY,
MinZ: z.MinZ,
SizeX: z.MaxX - z.MinX,
SizeY: z.MaxY - z.MinY,
SizeZ: z.MaxZ - z.MinZ,
SizeX: z.SizeX,
SizeY: z.SizeY,
SizeZ: z.SizeZ,
})
}
return result
@ -663,25 +663,6 @@ func (h *Hub) ClientCount() int {
return len(h.clients)
}
// broadcastBLEScan broadcasts the current BLE device list to all dashboard clients.
func (h *Hub) broadcastBLEScan() {
h.mu.RLock()
state := h.bleState
clientCount := len(h.clients)
h.mu.RUnlock()
if state == nil || clientCount == 0 {
return
}
devices := state.GetCurrentDevices()
if len(devices) == 0 {
return
}
h.BroadcastBLEScan(devices)
}
// broadcastSystemHealth broadcasts system health stats to all dashboard clients.
func (h *Hub) broadcastSystemHealth() {
h.mu.RLock()

View file

@ -1,6 +1,8 @@
package dashboard
import (
"encoding/json"
"strings"
"testing"
"time"
@ -36,6 +38,16 @@ func TestHub_RegisterUnregister(t *testing.T) {
}
}
// drainSnapshot reads and discards the initial snapshot message sent on connect.
func drainSnapshot(t *testing.T, ch chan []byte) {
t.Helper()
select {
case <-ch:
case <-time.After(100 * time.Millisecond):
t.Fatal("expected snapshot message")
}
}
func TestHub_Broadcast(t *testing.T) {
hub := NewHub()
go hub.Run()
@ -47,6 +59,7 @@ func TestHub_Broadcast(t *testing.T) {
hub.Register(client)
time.Sleep(10 * time.Millisecond)
drainSnapshot(t, client.send)
// Broadcast a message
testMsg := []byte(`{"type":"test"}`)
@ -74,6 +87,7 @@ func TestHub_BroadcastCSI(t *testing.T) {
hub.Register(client)
time.Sleep(10 * time.Millisecond)
drainSnapshot(t, client.send)
// Broadcast CSI (raw binary data)
testData := []byte{0x01, 0x02, 0x03, 0x04}
@ -100,6 +114,7 @@ func TestHub_NodeEvents(t *testing.T) {
hub.Register(client)
time.Sleep(10 * time.Millisecond)
drainSnapshot(t, client.send)
// Test node connected event
hub.BroadcastNodeConnected("AA:BB:CC:DD:EE:FF", "1.0.0", "ESP32-S3")
@ -131,6 +146,7 @@ func TestHub_LinkEvents(t *testing.T) {
hub.Register(client)
time.Sleep(10 * time.Millisecond)
drainSnapshot(t, client.send)
// Test link active event
hub.BroadcastLinkActive("AA:BB:CC:DD:EE:FF:11:22:33:44:55:66", "AA:BB:CC:DD:EE:FF", "11:22:33:44:55:66")
@ -160,11 +176,11 @@ func (m *MockIngestionState) GetAllMotionStates() []ingestion.MotionStateItem {
return nil
}
func TestHub_InitialState(t *testing.T) {
func TestHub_SnapshotOnConnect(t *testing.T) {
hub := NewHub()
go hub.Run()
// Set mock ingestion state
// Set mock ingestion state so the snapshot has content
mock := &MockIngestionState{
nodes: []ingestion.NodeInfo{
{MAC: "AA:BB:CC:DD:EE:FF", FirmwareVersion: "1.0.0", Chip: "ESP32-S3"},
@ -183,10 +199,148 @@ func TestHub_InitialState(t *testing.T) {
hub.Register(client)
time.Sleep(10 * time.Millisecond)
// Should receive initial state
msg := <-client.send
// Just check it's a valid JSON with type "state"
if len(msg) == 0 || msg[0] != '{' {
t.Errorf("expected JSON state message, got %v", msg)
// First message must be the snapshot
select {
case msg := <-client.send:
if len(msg) == 0 || msg[0] != '{' {
t.Fatalf("expected JSON snapshot message, got %v", msg)
}
var parsed map[string]json.RawMessage
if err := json.Unmarshal(msg, &parsed); err != nil {
t.Fatalf("failed to parse snapshot JSON: %v", err)
}
// Must have type="snapshot"
if typ, ok := parsed["type"]; !ok {
t.Fatal("snapshot missing 'type' field")
} else if strings.Trim(string(typ), `"`) != "snapshot" {
t.Fatalf("expected type=snapshot, got %s", string(typ))
}
// Must have timestamp_ms
if _, ok := parsed["timestamp_ms"]; !ok {
t.Fatal("snapshot missing 'timestamp_ms' field")
}
// Must contain nodes from the mock
if _, ok := parsed["nodes"]; !ok {
t.Fatal("snapshot missing 'nodes' field")
}
case <-time.After(100 * time.Millisecond):
t.Fatal("expected snapshot message within 100 ms")
}
}
func TestHub_SnapshotBeforeDelta(t *testing.T) {
hub := NewHub()
go hub.Run()
mock := &MockIngestionState{
nodes: []ingestion.NodeInfo{
{MAC: "AA:BB:CC:DD:EE:FF", FirmwareVersion: "1.0.0"},
},
}
hub.SetIngestionState(mock)
client := &Client{
hub: hub,
send: make(chan []byte, 256),
}
hub.Register(client)
time.Sleep(10 * time.Millisecond)
// Drain all messages; first must be snapshot, rest must be deltas or events.
timeout := time.After(200 * time.Millisecond)
first := true
for {
select {
case msg := <-client.send:
var parsed map[string]json.RawMessage
if err := json.Unmarshal(msg, &parsed); err != nil {
continue // skip non-JSON (binary CSI)
}
if first {
first = false
typ := strings.Trim(string(parsed["type"]), `"`)
if typ != "snapshot" {
t.Fatalf("first message should be snapshot, got type=%s", typ)
}
} else {
// Subsequent messages from tickDelta must NOT have a type field.
// Event-driven messages (BroadcastNodeConnected etc.) do have type.
if _, hasType := parsed["type"]; hasType {
typ := strings.Trim(string(parsed["type"]), `"`)
// These are acceptable event types
switch typ {
case "node_connected", "node_disconnected", "link_active",
"link_inactive", "motion_state", "loc_update",
"registry_state", "fleet_change", "system_health",
"ble_scan", "trigger_state", "event", "alert",
"anomaly_detected", "system_mode_change",
"fleet_health", "fleet_history":
// OK — event-driven broadcast
default:
t.Errorf("unexpected type in non-snapshot message: %s", typ)
}
}
// No type field → delta update (expected from tickDelta)
}
case <-timeout:
return // test passed
}
}
}
func TestHub_DeltaOmitsTypeField(t *testing.T) {
hub := NewHub()
go hub.Run()
mock := &MockIngestionState{
nodes: []ingestion.NodeInfo{
{MAC: "AA:BB:CC:DD:EE:FF", FirmwareVersion: "1.0.0"},
},
}
hub.SetIngestionState(mock)
client := &Client{
hub: hub,
send: make(chan []byte, 256),
}
hub.Register(client)
time.Sleep(10 * time.Millisecond)
// Read the first message (snapshot) and discard it
select {
case <-client.send:
case <-time.After(100 * time.Millisecond):
t.Fatal("expected snapshot")
}
// Wait for a delta tick (100ms) and check it has no type field
found := false
for i := 0; i < 5; i++ {
select {
case msg := <-client.send:
var parsed map[string]json.RawMessage
if err := json.Unmarshal(msg, &parsed); err != nil {
continue
}
if _, hasType := parsed["type"]; !hasType {
// This is a delta message — must have timestamp_ms
if _, ok := parsed["timestamp_ms"]; !ok {
t.Error("delta message missing timestamp_ms")
}
found = true
}
case <-time.After(150 * time.Millisecond):
// Try next tick
}
}
if !found {
t.Error("expected at least one delta message (no type field)")
}
}