diff --git a/PROGRESS.md b/PROGRESS.md new file mode 100644 index 0000000..8a62fec --- /dev/null +++ b/PROGRESS.md @@ -0,0 +1,64 @@ +# Spaxel Implementation Progress + +## Phase 1 — Foundation + +Goal: Bare-minimum loop from ESP32 to browser. Zero-config with passive radar and mDNS from day one. + +### Status + +| Item | Status | Notes | +|------|--------|-------| +| ESP32 firmware skeleton | Not started | | +| Passive radar support | Not started | (part of firmware) | +| BLE scanning | Not started | (part of firmware) | +| Mothership WebSocket ingestion | **Done** | See iteration 1 below | +| Dashboard skeleton | Not started | | +| Docker packaging | Not started | | + +### Iteration Log + +#### Iteration 1 — 2026-03-26 + +**Completed:** Mothership WebSocket ingestion server + +Implemented the core ingestion server in Go with: + +- **Module structure:** `mothership/` with `cmd/mothership/` entrypoint and `internal/ingestion/` package +- **WebSocket endpoint:** `/ws/node` accepts bidirectional connections from ESP32 nodes +- **Binary frame parsing:** 24-byte header + variable payload, per spec in plan.md + - Validation: min/max length, payload size match, channel validity (1-14), subcarrier limit (128) + - Malformed frame tracking with warn/close thresholds (100/1000 per minute) +- **JSON message handling:** Parses hello, health, ble, motion_hint, ota_status +- **Per-link ring buffers:** 256-sample circular buffers keyed by `nodeMAC:peerMAC` +- **Connection lifecycle:** Node registration via hello, ping/pong keepalive (30s/60s), graceful shutdown +- **mDNS advertisement:** `_spaxel._tcp.local` via github.com/hashicorp/mdns +- **Role/config push:** Sends initial `rx` role and 20 Hz config on connect +- **Health endpoint:** `GET /healthz` returns `{"status":"ok","version":"..."}` + +**Dependencies used:** +- `github.com/go-chi/chi` — HTTP routing +- `github.com/gorilla/websocket` — WebSocket server +- `github.com/hashicorp/mdns` — mDNS advertisement + +**Tests:** 22 tests covering frame parsing, JSON messages, and ring buffer operations. All pass. + +**Files created:** +``` +mothership/ +├── cmd/mothership/main.go +├── go.mod +├── go.sum +└── internal/ingestion/ + ├── frame.go + ├── frame_test.go + ├── message.go + ├── message_test.go + ├── ring.go + ├── ring_test.go + └── server.go +``` + +**Remaining for Phase 1:** +- ESP32 firmware (WiFi, mDNS discovery, CSI capture, WebSocket client) +- Dashboard skeleton (HTML/JS + Three.js) +- Docker packaging diff --git a/mothership/cmd/mothership/main.go b/mothership/cmd/mothership/main.go new file mode 100644 index 0000000..159f2d5 --- /dev/null +++ b/mothership/cmd/mothership/main.go @@ -0,0 +1,162 @@ +// Package main provides the mothership entry point +package main + +import ( + "context" + "flag" + "fmt" + "log" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "github.com/go-chi/chi" + "github.com/go-chi/chi/middleware" + "github.com/hashicorp/mdns" + "github.com/spaxel/mothership/internal/ingestion" +) + +// Build-time version injection +var version = "dev" + +// Config holds application configuration +type Config struct { + BindAddr string + DataDir string + MDNSName string + MDNSEnabled bool + LogLevel string +} + +func main() { + cfg := parseConfig() + log.Printf("[INFO] Spaxel mothership v%s starting", version) + log.Printf("[DEBUG] Config: bind=%s data=%s mdns=%s", cfg.BindAddr, cfg.DataDir, cfg.MDNSName) + + // Create context with cancellation for graceful shutdown + _, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Set up signal handling + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT) + + // Create router + r := chi.NewRouter() + r.Use(middleware.Logger) + r.Use(middleware.Recoverer) + + // Health check endpoint + r.Get("/healthz", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, `{"status":"ok","version":"%s"}`, version) + }) + + // Create ingestion server + ingestSrv := ingestion.NewServer() + r.HandleFunc("/ws/node", ingestSrv.HandleNodeWS) + + // Start mDNS advertisement + var mdnsServer *mdns.Server + if cfg.MDNSEnabled { + service, err := mdns.NewMDNSService( + cfg.MDNSName, + "_spaxel._tcp", + "local.", + "", + 8080, + nil, + []string{"version=1", "ws=/ws/node", "api=/api"}, + ) + if err != nil { + log.Printf("[ERROR] Failed to create mDNS service: %v", err) + } else { + mdnsServer, err = mdns.NewServer(&mdns.Config{Zone: service}) + if err != nil { + log.Printf("[ERROR] Failed to start mDNS server: %v", err) + } else { + log.Printf("[INFO] mDNS advertising %s._spaxel._tcp.local:8080", cfg.MDNSName) + } + } + } + + // Start HTTP server + srv := &http.Server{ + Addr: cfg.BindAddr, + Handler: r, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + } + + // Run server in goroutine + go func() { + log.Printf("[INFO] HTTP server listening on %s", cfg.BindAddr) + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("[FATAL] HTTP server error: %v", err) + } + }() + + // Wait for shutdown signal + sig := <-sigChan + log.Printf("[INFO] Received signal %v, initiating graceful shutdown", sig) + + // Shutdown sequence + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer shutdownCancel() + + // Stop accepting new connections + if err := srv.Shutdown(shutdownCtx); err != nil { + log.Printf("[ERROR] HTTP server shutdown error: %v", err) + } + + // Close ingestion server (drains connections) + ingestSrv.Shutdown(shutdownCtx) + + // Stop mDNS + if mdnsServer != nil { + mdnsServer.Shutdown() + } + + cancel() + log.Printf("[INFO] Shutdown complete") +} + +func parseConfig() Config { + bindAddr := getEnv("SPAXEL_BIND_ADDR", "0.0.0.0:8080") + dataDir := getEnv("SPAXEL_DATA_DIR", "/data") + mdnsName := getEnv("SPAXEL_MDNS_NAME", "spaxel") + mdnsEnabled := getEnvBool("SPAXEL_MDNS_ENABLED", true) + logLevel := getEnv("SPAXEL_LOG_LEVEL", "info") + + flag.StringVar(&bindAddr, "bind", bindAddr, "Listen address") + flag.StringVar(&dataDir, "data", dataDir, "Data directory") + flag.StringVar(&mdnsName, "mdns-name", mdnsName, "mDNS service name") + flag.BoolVar(&mdnsEnabled, "mdns", mdnsEnabled, "Enable mDNS advertisement") + flag.StringVar(&logLevel, "log-level", logLevel, "Log level (debug, info, warn, error)") + flag.Parse() + + return Config{ + BindAddr: bindAddr, + DataDir: dataDir, + MDNSName: mdnsName, + MDNSEnabled: mdnsEnabled, + LogLevel: logLevel, + } +} + +func getEnv(key, defaultVal string) string { + if val := os.Getenv(key); val != "" { + return val + } + return defaultVal +} + +func getEnvBool(key string, defaultVal bool) bool { + if val := os.Getenv(key); val != "" { + return val == "true" || val == "1" + } + return defaultVal +} diff --git a/mothership/go.mod b/mothership/go.mod new file mode 100644 index 0000000..e954193 --- /dev/null +++ b/mothership/go.mod @@ -0,0 +1,15 @@ +module github.com/spaxel/mothership + +go 1.23 + +require ( + github.com/go-chi/chi v1.5.5 + github.com/gorilla/websocket v1.5.3 + github.com/hashicorp/mdns v1.0.5 +) + +require ( + github.com/miekg/dns v1.1.41 // indirect + golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1 // indirect + golang.org/x/sys v0.19.0 // indirect +) diff --git a/mothership/go.sum b/mothership/go.sum new file mode 100644 index 0000000..a45d683 --- /dev/null +++ b/mothership/go.sum @@ -0,0 +1,22 @@ +github.com/go-chi/chi v1.5.5 h1:vOB/HbEMt9QqBqErz07QehcOKHaWFtuj87tTDVz2qXE= +github.com/go-chi/chi v1.5.5/go.mod h1:C9JqLr3tIYjDOZpzn+BCuxY8z8vmca43EeMgyZt7irw= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/hashicorp/mdns v1.0.5 h1:1M5hW1cunYeoXOqHwEb/GBDDHAFo0Yqb/uz/beC6LbE= +github.com/hashicorp/mdns v1.0.5/go.mod h1:mtBihi+LeNXGtG8L9dX59gAEa12BDtBQSp4v/YAJqrc= +github.com/miekg/dns v1.1.41 h1:WMszZWJG0XmzbK9FEmzH2TVcqYzFesusSIB41b8KHxY= +github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1 h1:4qWs8cYYH6PoEFy4dfhDFgoMGkwAcETd+MmPdCPMzUc= +golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210303074136-134d130e1a04/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/mothership/internal/ingestion/frame.go b/mothership/internal/ingestion/frame.go new file mode 100644 index 0000000..3fb1511 --- /dev/null +++ b/mothership/internal/ingestion/frame.go @@ -0,0 +1,107 @@ +// Package ingestion handles WebSocket connections from ESP32 nodes +package ingestion + +import ( + "encoding/binary" + "fmt" +) + +// Frame constants from the plan +const ( + HeaderSize = 24 // Fixed header size + MaxPayloadSize = 128 * 2 + MaxFrameSize = HeaderSize + MaxPayloadSize + MinFrameSize = HeaderSize +) + +// CSIFrame represents a parsed CSI binary frame +// Header (fixed 24 bytes): +// node_mac: 6 bytes — source node MAC +// peer_mac: 6 bytes — transmitting peer MAC +// timestamp_us: 8 bytes — uint64, microseconds since node boot +// rssi: 1 byte — int8, dBm +// noise_floor: 1 byte — int8, dBm +// channel: 1 byte — uint8, WiFi channel +// n_sub: 1 byte — uint8, subcarrier count +// Payload (n_sub × 2 bytes): +// Per subcarrier: int8 I, int8 Q +type CSIFrame struct { + NodeMAC [6]byte + PeerMAC [6]byte + TimestampUS uint64 + RSSI int8 + NoiseFloor int8 + Channel uint8 + NSub uint8 + Payload []int8 // Interleaved I,Q pairs (length = NSub * 2) +} + +// ParseFrame parses a binary WebSocket frame into a CSIFrame +// Returns nil and an error if the frame is malformed +func ParseFrame(data []byte) (*CSIFrame, error) { + // Validation rule 1: minimum length + if len(data) < MinFrameSize { + return nil, fmt.Errorf("frame too short: %d bytes (minimum %d)", len(data), MinFrameSize) + } + + // Read header fields + var frame CSIFrame + copy(frame.NodeMAC[:], data[0:6]) + copy(frame.PeerMAC[:], data[6:12]) + frame.TimestampUS = binary.LittleEndian.Uint64(data[12:20]) + frame.RSSI = int8(data[20]) + frame.NoiseFloor = int8(data[21]) + frame.Channel = uint8(data[22]) + frame.NSub = uint8(data[23]) + + // Validation rule 2: n_sub read from byte 23 + nSub := frame.NSub + + // Validation rule 3: payload length must match + expectedLen := HeaderSize + int(nSub)*2 + if len(data) != expectedLen { + return nil, fmt.Errorf("payload length mismatch: expected %d bytes, got %d", expectedLen, len(data)) + } + + // Validation rule 4: n_sub must not exceed 128 + if nSub > 128 { + return nil, fmt.Errorf("implausible subcarrier count: %d (max 128)", nSub) + } + + // Validation rule 6: channel must be valid (1-14 for 2.4 GHz) + if frame.Channel == 0 || frame.Channel > 14 { + return nil, fmt.Errorf("invalid channel: %d", frame.Channel) + } + + // Parse payload (I,Q pairs as int8) + if nSub > 0 { + frame.Payload = make([]int8, int(nSub)*2) + payloadData := data[HeaderSize:] + for i := range frame.Payload { + frame.Payload[i] = int8(payloadData[i]) + } + } + + return &frame, nil +} + +// MACString returns the node MAC as a colon-separated hex string +func (f *CSIFrame) MACString() string { + return macToString(f.NodeMAC) +} + +// PeerMACString returns the peer MAC as a colon-separated hex string +func (f *CSIFrame) PeerMACString() string { + return macToString(f.PeerMAC) +} + +// LinkID returns a unique identifier for this link (node_mac:peer_mac) +func (f *CSIFrame) LinkID() string { + return fmt.Sprintf("%s:%s", f.MACString(), f.PeerMACString()) +} + +// macToString converts a 6-byte MAC to uppercase colon-separated hex +func macToString(mac [6]byte) string { + return fmt.Sprintf("%02X:%02X:%02X:%02X:%02X:%02X", + mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]) +} diff --git a/mothership/internal/ingestion/frame_test.go b/mothership/internal/ingestion/frame_test.go new file mode 100644 index 0000000..e7dd668 --- /dev/null +++ b/mothership/internal/ingestion/frame_test.go @@ -0,0 +1,167 @@ +package ingestion + +import ( + "testing" +) + +func TestParseFrame_Valid(t *testing.T) { + // Create a valid frame with 64 subcarriers + nSub := uint8(64) + payloadSize := int(nSub) * 2 + frameSize := HeaderSize + payloadSize + + data := make([]byte, frameSize) + + // Node MAC: AA:BB:CC:DD:EE:FF + copy(data[0:6], []byte{0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF}) + + // Peer MAC: 11:22:33:44:55:66 + copy(data[6:12], []byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66}) + + // Timestamp: 12345678 microseconds (little-endian uint64) + data[12] = 0x4E + data[13] = 0x61 + data[14] = 0xBC + data[15] = 0x00 + // bytes 16-19 are 0 + + // RSSI: -52 dBm (int8 = 204 = 0xCC) + data[20] = 0xCC + + // Noise floor: -95 dBm (int8 = 161 = 0xA1) + data[21] = 0xA1 + + // Channel: 6 + data[22] = 0x06 + + // n_sub: 64 + data[23] = nSub + + // Payload: alternating I, Q values + for i := 0; i < payloadSize; i++ { + data[HeaderSize+i] = byte(i % 256) + } + + frame, err := ParseFrame(data) + if err != nil { + t.Fatalf("ParseFrame failed: %v", err) + } + + // Verify header fields + expectedNodeMAC := [6]byte{0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF} + if frame.NodeMAC != expectedNodeMAC { + t.Errorf("NodeMAC mismatch: got %v, want %v", frame.NodeMAC, expectedNodeMAC) + } + + expectedPeerMAC := [6]byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66} + if frame.PeerMAC != expectedPeerMAC { + t.Errorf("PeerMAC mismatch: got %v, want %v", frame.PeerMAC, expectedPeerMAC) + } + + if frame.TimestampUS != 0xBC614E { + t.Errorf("TimestampUS mismatch: got %d, want %d", frame.TimestampUS, 0xBC614E) + } + + if frame.RSSI != -52 { + t.Errorf("RSSI mismatch: got %d, want -52", frame.RSSI) + } + + if frame.NoiseFloor != -95 { + t.Errorf("NoiseFloor mismatch: got %d, want -95", frame.NoiseFloor) + } + + if frame.Channel != 6 { + t.Errorf("Channel mismatch: got %d, want 6", frame.Channel) + } + + if frame.NSub != 64 { + t.Errorf("NSub mismatch: got %d, want 64", frame.NSub) + } + + // Verify payload length + if len(frame.Payload) != int(nSub)*2 { + t.Errorf("Payload length mismatch: got %d, want %d", len(frame.Payload), int(nSub)*2) + } + + // Verify MAC string format + if frame.MACString() != "AA:BB:CC:DD:EE:FF" { + t.Errorf("MACString mismatch: got %s", frame.MACString()) + } + + if frame.PeerMACString() != "11:22:33:44:55:66" { + t.Errorf("PeerMACString mismatch: got %s", frame.PeerMACString()) + } + + if frame.LinkID() != "AA:BB:CC:DD:EE:FF:11:22:33:44:55:66" { + t.Errorf("LinkID mismatch: got %s", frame.LinkID()) + } +} + +func TestParseFrame_TooShort(t *testing.T) { + data := make([]byte, 10) // Less than header size + _, err := ParseFrame(data) + if err == nil { + t.Error("Expected error for short frame") + } +} + +func TestParseFrame_PayloadMismatch(t *testing.T) { + data := make([]byte, HeaderSize+10) + data[23] = 64 // n_sub says 64, but only 10 bytes of payload + _, err := ParseFrame(data) + if err == nil { + t.Error("Expected error for payload length mismatch") + } +} + +func TestParseFrame_InvalidChannel(t *testing.T) { + tests := []struct { + name string + channel uint8 + }{ + {"zero channel", 0}, + {"channel 15", 15}, + {"channel 255", 255}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + data := make([]byte, HeaderSize) + data[22] = tt.channel + data[23] = 0 // n_sub = 0 is valid (header-only frame) + _, err := ParseFrame(data) + if err == nil { + t.Error("Expected error for invalid channel") + } + }) + } +} + +func TestParseFrame_ExcessSubcarriers(t *testing.T) { + data := make([]byte, HeaderSize+256) + data[23] = 129 // n_sub > 128 + _, err := ParseFrame(data) + if err == nil { + t.Error("Expected error for excess subcarriers") + } +} + +func TestParseFrame_HeaderOnly(t *testing.T) { + // Header-only frame (n_sub = 0) is valid per the spec + data := make([]byte, HeaderSize) + data[22] = 6 // valid channel + data[23] = 0 // n_sub = 0 + + frame, err := ParseFrame(data) + if err != nil { + t.Fatalf("ParseFrame failed: %v", err) + } + + if frame.NSub != 0 { + t.Errorf("NSub mismatch: got %d, want 0", frame.NSub) + } + + if len(frame.Payload) != 0 { + t.Errorf("Payload should be empty, got %d bytes", len(frame.Payload)) + } +} diff --git a/mothership/internal/ingestion/message.go b/mothership/internal/ingestion/message.go new file mode 100644 index 0000000..8460ca2 --- /dev/null +++ b/mothership/internal/ingestion/message.go @@ -0,0 +1,177 @@ +package ingestion + +import ( + "encoding/json" + "fmt" +) + +// Upstream messages (node -> mothership) + +// HelloMessage is sent by the node on WebSocket connect +type HelloMessage struct { + Type string `json:"type"` + MAC string `json:"mac"` + NodeID string `json:"node_id,omitempty"` + FirmwareVersion string `json:"firmware_version"` + Capabilities []string `json:"capabilities"` + Chip string `json:"chip,omitempty"` + FlashMB int `json:"flash_mb,omitempty"` + UptimeMS int64 `json:"uptime_ms,omitempty"` + APBSSID string `json:"ap_bssid,omitempty"` + APChannel int `json:"ap_channel,omitempty"` +} + +// HealthMessage is sent every 10 seconds +type HealthMessage struct { + Type string `json:"type"` + MAC string `json:"mac"` + TimestampMS int64 `json:"timestamp_ms"` + FreeHeapBytes int64 `json:"free_heap_bytes"` + WifiRSSIdBm int `json:"wifi_rssi_dbm"` + UptimeMS int64 `json:"uptime_ms"` + TemperatureC float64 `json:"temperature_c,omitempty"` + CSIRateHz int `json:"csi_rate_hz"` + WifiChannel int `json:"wifi_channel"` + IP string `json:"ip,omitempty"` +} + +// BLEDevice represents a discovered BLE device +type BLEDevice struct { + Addr string `json:"addr"` + AddrType string `json:"addr_type,omitempty"` + RSSIdBm int `json:"rssi_dbm"` + Name string `json:"name,omitempty"` + MfrID int `json:"mfr_id,omitempty"` + MfrDataHex string `json:"mfr_data_hex,omitempty"` +} + +// BLEMessage is sent every 5 seconds with discovered devices +type BLEMessage struct { + Type string `json:"type"` + MAC string `json:"mac"` + TimestampMS int64 `json:"timestamp_ms"` + Devices []BLEDevice `json:"devices"` +} + +// MotionHintMessage is sent when on-device variance exceeds threshold +type MotionHintMessage struct { + Type string `json:"type"` + MAC string `json:"mac"` + TimestampMS int64 `json:"timestamp_ms"` + Variance float64 `json:"variance"` +} + +// OTAStatusMessage reports OTA progress +type OTAStatusMessage struct { + Type string `json:"type"` + MAC string `json:"mac"` + State string `json:"state"` // downloading | verifying | writing | rebooting | failed + ProgressPct int `json:"progress_pct,omitempty"` + Error string `json:"error,omitempty"` +} + +// Downstream messages (mothership -> node) + +// RoleMessage assigns operational role to a node +type RoleMessage struct { + Type string `json:"type"` + Role string `json:"role"` // tx | rx | tx_rx | passive | idle + PassiveBSSID string `json:"passive_bssid,omitempty"` +} + +// ConfigMessage changes operational parameters +type ConfigMessage struct { + Type string `json:"type"` + RateHz *int `json:"rate_hz,omitempty"` + TXSlotUS *int `json:"tx_slot_us,omitempty"` + VarianceThreshold *float64 `json:"variance_threshold,omitempty"` +} + +// OTAMessage triggers a firmware update +type OTAMessage struct { + Type string `json:"type"` + URL string `json:"url"` + SHA256 string `json:"sha256"` + Version string `json:"version"` +} + +// RebootMessage triggers a node reboot +type RebootMessage struct { + Type string `json:"type"` + DelayMS int `json:"delay_ms,omitempty"` +} + +// IdentifyMessage triggers LED blink for identification +type IdentifyMessage struct { + Type string `json:"type"` + DurationMS int `json:"duration_ms,omitempty"` +} + +// BaselineRequestMessage requests baseline data from node +type BaselineRequestMessage struct { + Type string `json:"type"` +} + +// ShutdownMessage notifies node of mothership shutdown +type ShutdownMessage struct { + Type string `json:"type"` + ReconnectInMS int `json:"reconnect_in_ms"` +} + +// RejectMessage rejects a connection +type RejectMessage struct { + Type string `json:"type"` + Reason string `json:"reason"` // invalid_token | unknown_node | rate_limited +} + +// ParseJSONMessage parses a JSON message and returns the appropriate type +func ParseJSONMessage(data []byte) (interface{}, error) { + // First, extract the type field + var base struct { + Type string `json:"type"` + } + if err := json.Unmarshal(data, &base); err != nil { + return nil, fmt.Errorf("failed to parse message type: %w", err) + } + + switch base.Type { + case "hello": + var msg HelloMessage + if err := json.Unmarshal(data, &msg); err != nil { + return nil, err + } + return &msg, nil + case "health": + var msg HealthMessage + if err := json.Unmarshal(data, &msg); err != nil { + return nil, err + } + return &msg, nil + case "ble": + var msg BLEMessage + if err := json.Unmarshal(data, &msg); err != nil { + return nil, err + } + return &msg, nil + case "motion_hint": + var msg MotionHintMessage + if err := json.Unmarshal(data, &msg); err != nil { + return nil, err + } + return &msg, nil + case "ota_status": + var msg OTAStatusMessage + if err := json.Unmarshal(data, &msg); err != nil { + return nil, err + } + return &msg, nil + default: + // Unknown type - per protocol, silently ignore but return raw + return nil, fmt.Errorf("unknown message type: %s", base.Type) + } +} + +// ToJSON serializes a downstream message to JSON bytes +func ToJSON(msg interface{}) ([]byte, error) { + return json.Marshal(msg) +} diff --git a/mothership/internal/ingestion/message_test.go b/mothership/internal/ingestion/message_test.go new file mode 100644 index 0000000..020a420 --- /dev/null +++ b/mothership/internal/ingestion/message_test.go @@ -0,0 +1,207 @@ +package ingestion + +import ( + "encoding/json" + "testing" +) + +func TestParseJSONMessage_Hello(t *testing.T) { + data := `{"type":"hello","mac":"AA:BB:CC:DD:EE:FF","firmware_version":"1.0.0","capabilities":["csi","ble","tx","rx"],"chip":"ESP32-S3","flash_mb":16,"uptime_ms":4200}` + + msg, err := ParseJSONMessage([]byte(data)) + if err != nil { + t.Fatalf("ParseJSONMessage failed: %v", err) + } + + hello, ok := msg.(*HelloMessage) + if !ok { + t.Fatal("Expected HelloMessage type") + } + + if hello.MAC != "AA:BB:CC:DD:EE:FF" { + t.Errorf("MAC mismatch: got %s", hello.MAC) + } + + if hello.FirmwareVersion != "1.0.0" { + t.Errorf("FirmwareVersion mismatch: got %s", hello.FirmwareVersion) + } + + if len(hello.Capabilities) != 4 { + t.Errorf("Capabilities count: got %d, want 4", len(hello.Capabilities)) + } + + if hello.Chip != "ESP32-S3" { + t.Errorf("Chip mismatch: got %s", hello.Chip) + } + + if hello.FlashMB != 16 { + t.Errorf("FlashMB mismatch: got %d", hello.FlashMB) + } +} + +func TestParseJSONMessage_Health(t *testing.T) { + data := `{"type":"health","mac":"AA:BB:CC:DD:EE:FF","timestamp_ms":1711234567890,"free_heap_bytes":204800,"wifi_rssi_dbm":-52,"uptime_ms":3600000,"temperature_c":42.1,"csi_rate_hz":20,"wifi_channel":6,"ip":"192.168.1.123"}` + + msg, err := ParseJSONMessage([]byte(data)) + if err != nil { + t.Fatalf("ParseJSONMessage failed: %v", err) + } + + health, ok := msg.(*HealthMessage) + if !ok { + t.Fatal("Expected HealthMessage type") + } + + if health.MAC != "AA:BB:CC:DD:EE:FF" { + t.Errorf("MAC mismatch: got %s", health.MAC) + } + + if health.FreeHeapBytes != 204800 { + t.Errorf("FreeHeapBytes mismatch: got %d", health.FreeHeapBytes) + } + + if health.WifiRSSIdBm != -52 { + t.Errorf("WifiRSSIdBm mismatch: got %d", health.WifiRSSIdBm) + } + + if health.TemperatureC != 42.1 { + t.Errorf("TemperatureC mismatch: got %f", health.TemperatureC) + } +} + +func TestParseJSONMessage_BLE(t *testing.T) { + data := `{"type":"ble","mac":"AA:BB:CC:DD:EE:FF","timestamp_ms":1711234567890,"devices":[{"addr":"AA:BB:CC:DD:EE:00","addr_type":"public","rssi_dbm":-62,"name":"iPhone"}]}` + + msg, err := ParseJSONMessage([]byte(data)) + if err != nil { + t.Fatalf("ParseJSONMessage failed: %v", err) + } + + ble, ok := msg.(*BLEMessage) + if !ok { + t.Fatal("Expected BLEMessage type") + } + + if len(ble.Devices) != 1 { + t.Fatalf("Devices count: got %d, want 1", len(ble.Devices)) + } + + if ble.Devices[0].Name != "iPhone" { + t.Errorf("Device name mismatch: got %s", ble.Devices[0].Name) + } + + if ble.Devices[0].RSSIdBm != -62 { + t.Errorf("Device RSSI mismatch: got %d", ble.Devices[0].RSSIdBm) + } +} + +func TestParseJSONMessage_MotionHint(t *testing.T) { + data := `{"type":"motion_hint","mac":"AA:BB:CC:DD:EE:FF","timestamp_ms":1711234567890,"variance":0.043}` + + msg, err := ParseJSONMessage([]byte(data)) + if err != nil { + t.Fatalf("ParseJSONMessage failed: %v", err) + } + + motion, ok := msg.(*MotionHintMessage) + if !ok { + t.Fatal("Expected MotionHintMessage type") + } + + if motion.Variance != 0.043 { + t.Errorf("Variance mismatch: got %f", motion.Variance) + } +} + +func TestParseJSONMessage_OTAStatus(t *testing.T) { + data := `{"type":"ota_status","mac":"AA:BB:CC:DD:EE:FF","state":"downloading","progress_pct":45}` + + msg, err := ParseJSONMessage([]byte(data)) + if err != nil { + t.Fatalf("ParseJSONMessage failed: %v", err) + } + + ota, ok := msg.(*OTAStatusMessage) + if !ok { + t.Fatal("Expected OTAStatusMessage type") + } + + if ota.State != "downloading" { + t.Errorf("State mismatch: got %s", ota.State) + } + + if ota.ProgressPct != 45 { + t.Errorf("ProgressPct mismatch: got %d", ota.ProgressPct) + } +} + +func TestParseJSONMessage_Unknown(t *testing.T) { + data := `{"type":"unknown_type","mac":"AA:BB:CC:DD:EE:FF"}` + + _, err := ParseJSONMessage([]byte(data)) + if err == nil { + t.Error("Expected error for unknown message type") + } +} + +func TestParseJSONMessage_Invalid(t *testing.T) { + data := `{"type":` + + _, err := ParseJSONMessage([]byte(data)) + if err == nil { + t.Error("Expected error for invalid JSON") + } +} + +func TestToJSON_RoleMessage(t *testing.T) { + msg := RoleMessage{Type: "role", Role: "rx"} + data, err := ToJSON(msg) + if err != nil { + t.Fatalf("ToJSON failed: %v", err) + } + + // Verify it's valid JSON + var parsed RoleMessage + if err := json.Unmarshal(data, &parsed); err != nil { + t.Fatalf("Result is not valid JSON: %v", err) + } + + if parsed.Role != "rx" { + t.Errorf("Role mismatch: got %s", parsed.Role) + } +} + +func TestToJSON_ConfigMessage(t *testing.T) { + rate := 50 + msg := ConfigMessage{Type: "config", RateHz: &rate} + data, err := ToJSON(msg) + if err != nil { + t.Fatalf("ToJSON failed: %v", err) + } + + var parsed ConfigMessage + if err := json.Unmarshal(data, &parsed); err != nil { + t.Fatalf("Result is not valid JSON: %v", err) + } + + if parsed.RateHz == nil || *parsed.RateHz != 50 { + t.Errorf("RateHz mismatch") + } +} + +func TestToJSON_RejectMessage(t *testing.T) { + msg := RejectMessage{Type: "reject", Reason: "invalid_token"} + data, err := ToJSON(msg) + if err != nil { + t.Fatalf("ToJSON failed: %v", err) + } + + var parsed RejectMessage + if err := json.Unmarshal(data, &parsed); err != nil { + t.Fatalf("Result is not valid JSON: %v", err) + } + + if parsed.Reason != "invalid_token" { + t.Errorf("Reason mismatch: got %s", parsed.Reason) + } +} diff --git a/mothership/internal/ingestion/ring.go b/mothership/internal/ingestion/ring.go new file mode 100644 index 0000000..d983522 --- /dev/null +++ b/mothership/internal/ingestion/ring.go @@ -0,0 +1,136 @@ +package ingestion + +import ( + "sync" + "time" +) + +// RingBufferCapacity is the maximum number of CSI samples per link +const RingBufferCapacity = 256 + +// TimestampedFrame wraps a CSIFrame with the mothership receive time +type TimestampedFrame struct { + Frame *CSIFrame + RecvTime time.Time + RecvTimeNS int64 // Nanoseconds since Unix epoch for precise timing +} + +// RingBuffer is a thread-safe circular buffer for CSI samples +type RingBuffer struct { + mu sync.RWMutex + frames [RingBufferCapacity]*TimestampedFrame + head int // Next write position + count int // Number of valid entries + totalSeq int // Total frames ever written (for computing sequence numbers) +} + +// NewRingBuffer creates a new ring buffer +func NewRingBuffer() *RingBuffer { + return &RingBuffer{} +} + +// Push adds a frame to the buffer +func (rb *RingBuffer) Push(frame *CSIFrame, recvTime time.Time) { + rb.mu.Lock() + defer rb.mu.Unlock() + + rb.frames[rb.head] = &TimestampedFrame{ + Frame: frame, + RecvTime: recvTime, + RecvTimeNS: recvTime.UnixNano(), + } + rb.head = (rb.head + 1) % RingBufferCapacity + if rb.count < RingBufferCapacity { + rb.count++ + } + rb.totalSeq++ +} + +// GetAll returns all frames in chronological order (oldest first) +func (rb *RingBuffer) GetAll() []*TimestampedFrame { + rb.mu.RLock() + defer rb.mu.RUnlock() + + if rb.count == 0 { + return nil + } + + result := make([]*TimestampedFrame, rb.count) + + // If buffer is not full, start from beginning + // If buffer is full, head points to oldest entry + start := 0 + if rb.count == RingBufferCapacity { + start = rb.head + } + + for i := 0; i < rb.count; i++ { + idx := (start + i) % RingBufferCapacity + // Return copies to prevent race conditions + f := rb.frames[idx] + result[i] = &TimestampedFrame{ + Frame: f.Frame, + RecvTime: f.RecvTime, + RecvTimeNS: f.RecvTimeNS, + } + } + + return result +} + +// GetLast returns the N most recent frames (or all if N > count) +func (rb *RingBuffer) GetLast(n int) []*TimestampedFrame { + rb.mu.RLock() + defer rb.mu.RUnlock() + + if rb.count == 0 || n <= 0 { + return nil + } + + count := n + if count > rb.count { + count = rb.count + } + + result := make([]*TimestampedFrame, count) + + // Most recent entry is at (head - 1 + Capacity) % Capacity + for i := 0; i < count; i++ { + idx := (rb.head - 1 - i + RingBufferCapacity) % RingBufferCapacity + f := rb.frames[idx] + result[count-1-i] = &TimestampedFrame{ + Frame: f.Frame, + RecvTime: f.RecvTime, + RecvTimeNS: f.RecvTimeNS, + } + } + + return result +} + +// Count returns the number of frames in the buffer +func (rb *RingBuffer) Count() int { + rb.mu.RLock() + defer rb.mu.RUnlock() + return rb.count +} + +// Clear removes all frames from the buffer +func (rb *RingBuffer) Clear() { + rb.mu.Lock() + defer rb.mu.Unlock() + + rb.head = 0 + rb.count = 0 + rb.totalSeq = 0 + for i := range rb.frames { + rb.frames[i] = nil + } +} + +// TotalSeq returns the total number of frames ever pushed +func (rb *RingBuffer) TotalSeq() int { + rb.mu.RLock() + defer rb.mu.RUnlock() + return rb.totalSeq +} diff --git a/mothership/internal/ingestion/ring_test.go b/mothership/internal/ingestion/ring_test.go new file mode 100644 index 0000000..7fa865e --- /dev/null +++ b/mothership/internal/ingestion/ring_test.go @@ -0,0 +1,188 @@ +package ingestion + +import ( + "testing" + "time" +) + +func makeTestFrame(seq int) *CSIFrame { + return &CSIFrame{ + NodeMAC: [6]byte{0xAA, 0xBB, 0xCC, 0xDD, 0xEE, byte(seq)}, + PeerMAC: [6]byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66}, + TimestampUS: uint64(seq * 50000), // 50ms intervals + RSSI: -50, + Channel: 6, + NSub: 2, + Payload: []int8{int8(seq), int8(seq + 1), int8(seq + 2), int8(seq + 3)}, + } +} + +func TestRingBuffer_PushAndGetAll(t *testing.T) { + rb := NewRingBuffer() + + // Push 5 frames + for i := 0; i < 5; i++ { + rb.Push(makeTestFrame(i), time.Now()) + } + + if rb.Count() != 5 { + t.Errorf("Count mismatch: got %d, want 5", rb.Count()) + } + + frames := rb.GetAll() + if len(frames) != 5 { + t.Fatalf("GetAll returned %d frames, want 5", len(frames)) + } + + // Verify chronological order (oldest first) + for i, tf := range frames { + if tf.Frame.TimestampUS != uint64(i*50000) { + t.Errorf("Frame %d has wrong timestamp: %d", i, tf.Frame.TimestampUS) + } + } +} + +func TestRingBuffer_Wraparound(t *testing.T) { + rb := NewRingBuffer() + + // Push more frames than capacity + for i := 0; i < RingBufferCapacity+50; i++ { + rb.Push(makeTestFrame(i), time.Now()) + } + + if rb.Count() != RingBufferCapacity { + t.Errorf("Count after wrap: got %d, want %d", rb.Count(), RingBufferCapacity) + } + + frames := rb.GetAll() + if len(frames) != RingBufferCapacity { + t.Fatalf("GetAll returned %d frames, want %d", len(frames), RingBufferCapacity) + } + + // First frame should be frame 50 (oldest after wrap) + if frames[0].Frame.TimestampUS != 50*50000 { + t.Errorf("First frame timestamp: got %d, want %d", frames[0].Frame.TimestampUS, 50*50000) + } + + // Last frame should be frame 299 (newest) + lastIdx := len(frames) - 1 + if frames[lastIdx].Frame.TimestampUS != (50+RingBufferCapacity-1)*50000 { + t.Errorf("Last frame timestamp: got %d, want %d", + frames[lastIdx].Frame.TimestampUS, (50+RingBufferCapacity-1)*50000) + } +} + +func TestRingBuffer_GetLast(t *testing.T) { + rb := NewRingBuffer() + + // Push 10 frames + for i := 0; i < 10; i++ { + rb.Push(makeTestFrame(i), time.Now()) + } + + // Get last 3 + frames := rb.GetLast(3) + if len(frames) != 3 { + t.Fatalf("GetLast(3) returned %d frames", len(frames)) + } + + // Should be frames 7, 8, 9 in chronological order + for i, tf := range frames { + expectedSeq := 7 + i + if tf.Frame.TimestampUS != uint64(expectedSeq*50000) { + t.Errorf("Frame %d: expected timestamp %d, got %d", + i, expectedSeq*50000, tf.Frame.TimestampUS) + } + } +} + +func TestRingBuffer_GetLastMoreThanCount(t *testing.T) { + rb := NewRingBuffer() + + // Push 5 frames + for i := 0; i < 5; i++ { + rb.Push(makeTestFrame(i), time.Now()) + } + + // Request 10, should get 5 + frames := rb.GetLast(10) + if len(frames) != 5 { + t.Errorf("GetLast(10) returned %d frames, want 5", len(frames)) + } +} + +func TestRingBuffer_Empty(t *testing.T) { + rb := NewRingBuffer() + + if rb.Count() != 0 { + t.Errorf("Empty buffer count: got %d, want 0", rb.Count()) + } + + frames := rb.GetAll() + if frames != nil { + t.Errorf("GetAll on empty buffer: got %v, want nil", frames) + } + + frames = rb.GetLast(5) + if frames != nil { + t.Errorf("GetLast on empty buffer: got %v, want nil", frames) + } +} + +func TestRingBuffer_Clear(t *testing.T) { + rb := NewRingBuffer() + + for i := 0; i < 10; i++ { + rb.Push(makeTestFrame(i), time.Now()) + } + + rb.Clear() + + if rb.Count() != 0 { + t.Errorf("After clear, count: got %d, want 0", rb.Count()) + } + + frames := rb.GetAll() + if frames != nil { + t.Errorf("After clear, GetAll: got %v, want nil", frames) + } +} + +func TestRingBuffer_TotalSeq(t *testing.T) { + rb := NewRingBuffer() + + for i := 0; i < 10; i++ { + rb.Push(makeTestFrame(i), time.Now()) + } + + if rb.TotalSeq() != 10 { + t.Errorf("TotalSeq: got %d, want 10", rb.TotalSeq()) + } + + rb.Clear() + + // TotalSeq should also reset on clear + if rb.TotalSeq() != 0 { + t.Errorf("After clear, TotalSeq: got %d, want 0", rb.TotalSeq()) + } +} + +func TestRingBuffer_RecvTime(t *testing.T) { + rb := NewRingBuffer() + + now := time.Now() + rb.Push(makeTestFrame(0), now) + + frames := rb.GetAll() + if len(frames) != 1 { + t.Fatal("Expected 1 frame") + } + + if !frames[0].RecvTime.Equal(now) { + t.Errorf("RecvTime mismatch: got %v, want %v", frames[0].RecvTime, now) + } + + if frames[0].RecvTimeNS != now.UnixNano() { + t.Errorf("RecvTimeNS mismatch: got %d, want %d", frames[0].RecvTimeNS, now.UnixNano()) + } +} diff --git a/mothership/internal/ingestion/server.go b/mothership/internal/ingestion/server.go new file mode 100644 index 0000000..88367cc --- /dev/null +++ b/mothership/internal/ingestion/server.go @@ -0,0 +1,370 @@ +package ingestion + +import ( + "context" + "encoding/json" + "log" + "net/http" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +// Server manages WebSocket connections from ESP32 nodes +type Server struct { + mu sync.RWMutex + connections map[string]*NodeConnection // keyed by MAC + links map[string]*RingBuffer // keyed by "nodeMAC:peerMAC" + + // Malformed frame tracking per connection + malformedCounts map[string]*malformedCounter + + // WebSocket upgrader + upgrader websocket.Upgrader + + // Shutdown state + shutdown bool +} + +// NodeConnection tracks state for a connected node +type NodeConnection struct { + MAC string + Conn *websocket.Conn + Hello *HelloMessage + LastHealth *HealthMessage + LastHealthTime time.Time + ConnectedAt time.Time + LastFrameTime time.Time + + // Write mutex for thread-safe sends + writeMu sync.Mutex +} + +// malformedCounter tracks malformed frame counts for rate limiting +type malformedCounter struct { + count int + firstSeen time.Time +} + +const ( + // Ping/pong timing + pingInterval = 30 * time.Second + readDeadline = 60 * time.Second + + // Malformed frame thresholds + malformedWarnThreshold = 100 + malformedCloseThreshold = 1000 + malformedWindow = time.Minute +) + +// NewServer creates a new ingestion server +func NewServer() *Server { + return &Server{ + connections: make(map[string]*NodeConnection), + links: make(map[string]*RingBuffer), + malformedCounts: make(map[string]*malformedCounter), + upgrader: websocket.Upgrader{ + // Allow all origins for development (TODO: restrict in production) + CheckOrigin: func(r *http.Request) bool { + return true + }, + ReadBufferSize: 512, + WriteBufferSize: 512, + }, + } +} + +// HandleNodeWS handles WebSocket connections at /ws/node +func (s *Server) HandleNodeWS(w http.ResponseWriter, r *http.Request) { + // Upgrade HTTP connection to WebSocket + conn, err := s.upgrader.Upgrade(w, r, nil) + if err != nil { + log.Printf("[WARN] WebSocket upgrade failed: %v", err) + return + } + + // Set initial read deadline + conn.SetReadDeadline(time.Now().Add(readDeadline)) + + // Create connection state + nc := &NodeConnection{ + Conn: conn, + ConnectedAt: time.Now(), + } + + // Wait for hello message (must be first) + _, msg, err := conn.ReadMessage() + if err != nil { + log.Printf("[WARN] Failed to read hello: %v", err) + conn.Close() + return + } + + // Parse as JSON (hello must be JSON) + parsed, err := ParseJSONMessage(msg) + if err != nil { + s.sendReject(conn, "invalid hello format") + conn.Close() + return + } + + hello, ok := parsed.(*HelloMessage) + if !ok { + s.sendReject(conn, "expected hello first") + conn.Close() + return + } + + nc.MAC = hello.MAC + nc.Hello = hello + + // Register connection + s.mu.Lock() + // Close existing connection from same MAC if present + if existing, exists := s.connections[hello.MAC]; exists { + existing.Conn.Close() + } + s.connections[hello.MAC] = nc + s.malformedCounts[hello.MAC] = &malformedCounter{} + s.mu.Unlock() + + log.Printf("[INFO] Node connected: MAC=%s firmware=%s chip=%s", + hello.MAC, hello.FirmwareVersion, hello.Chip) + + // Send initial role and config + s.sendRole(nc, "rx", "") + s.sendConfig(nc, 20, 0, 0) // 20 Hz default + + // Start ping goroutine + go s.pingLoop(nc) + + // Message handling loop + s.handleMessages(nc) +} + +// handleMessages processes incoming WebSocket messages +func (s *Server) handleMessages(nc *NodeConnection) { + defer func() { + nc.Conn.Close() + s.mu.Lock() + delete(s.connections, nc.MAC) + delete(s.malformedCounts, nc.MAC) + s.mu.Unlock() + log.Printf("[INFO] Node disconnected: MAC=%s", nc.MAC) + }() + + for { + // Reset read deadline on each message + nc.Conn.SetReadDeadline(time.Now().Add(readDeadline)) + + messageType, data, err := nc.Conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + log.Printf("[WARN] Unexpected close from %s: %v", nc.MAC, err) + } + return + } + + if messageType == websocket.BinaryMessage { + s.handleBinaryFrame(nc, data) + } else if messageType == websocket.TextMessage { + s.handleJSONMessage(nc, data) + } + } +} + +// handleBinaryFrame processes a CSI binary frame +func (s *Server) handleBinaryFrame(nc *NodeConnection, data []byte) { + frame, err := ParseFrame(data) + if err != nil { + s.recordMalformed(nc.MAC) + return + } + + // Update last frame time + nc.LastFrameTime = time.Now() + + // Get or create ring buffer for this link + linkID := frame.LinkID() + s.mu.Lock() + ring, exists := s.links[linkID] + if !exists { + ring = NewRingBuffer() + s.links[linkID] = ring + } + s.mu.Unlock() + + // Push frame to ring buffer + ring.Push(frame, time.Now()) +} + +// handleJSONMessage processes a JSON control message +func (s *Server) handleJSONMessage(nc *NodeConnection, data []byte) { + parsed, err := ParseJSONMessage(data) + if err != nil { + // Unknown types are silently ignored per protocol + return + } + + switch msg := parsed.(type) { + case *HealthMessage: + nc.LastHealth = msg + nc.LastHealthTime = time.Now() + // TODO: expose health metrics + + case *BLEMessage: + // TODO: forward BLE data to identity matcher + + case *MotionHintMessage: + // TODO: trigger adaptive rate changes + + case *OTAStatusMessage: + // TODO: track OTA progress + } +} + +// recordMalformed tracks malformed frames and closes connection if threshold exceeded +func (s *Server) recordMalformed(mac string) { + s.mu.Lock() + defer s.mu.Unlock() + + counter, exists := s.malformedCounts[mac] + if !exists { + return + } + + // Reset counter if window has passed + if time.Since(counter.firstSeen) > malformedWindow { + counter.count = 0 + counter.firstSeen = time.Now() + } + + counter.count++ + + // Warn at 100 + if counter.count == malformedWarnThreshold { + log.Printf("[WARN] Node %s sending malformed CSI frames (count=%d)", mac, counter.count) + } + + // Close at 1000 + if counter.count >= malformedCloseThreshold { + log.Printf("[ERROR] Node %s exceeded malformed frame threshold, closing connection", mac) + if nc, exists := s.connections[mac]; exists { + nc.Conn.Close() + } + } +} + +// pingLoop sends periodic ping frames to keep the connection alive +func (s *Server) pingLoop(nc *NodeConnection) { + ticker := time.NewTicker(pingInterval) + defer ticker.Stop() + + for range ticker.C { + nc.writeMu.Lock() + err := nc.Conn.WriteMessage(websocket.PingMessage, nil) + nc.writeMu.Unlock() + + if err != nil { + return // Connection closed + } + + // Check for shutdown + s.mu.RLock() + shutdown := s.shutdown + s.mu.RUnlock() + if shutdown { + return + } + } +} + +// sendReject sends a reject message and closes the connection +func (s *Server) sendReject(conn *websocket.Conn, reason string) { + msg := RejectMessage{Type: "reject", Reason: reason} + data, _ := json.Marshal(msg) + conn.WriteMessage(websocket.TextMessage, data) +} + +// sendRole sends a role assignment to a node +func (s *Server) sendRole(nc *NodeConnection, role string, passiveBSSID string) { + msg := RoleMessage{Type: "role", Role: role, PassiveBSSID: passiveBSSID} + data, _ := json.Marshal(msg) + + nc.writeMu.Lock() + nc.Conn.WriteMessage(websocket.TextMessage, data) + nc.writeMu.Unlock() +} + +// sendConfig sends configuration to a node +func (s *Server) sendConfig(nc *NodeConnection, rateHz int, txSlotUS int, varianceThreshold float64) { + msg := ConfigMessage{Type: "config"} + if rateHz > 0 { + msg.RateHz = &rateHz + } + if txSlotUS > 0 { + msg.TXSlotUS = &txSlotUS + } + if varianceThreshold > 0 { + msg.VarianceThreshold = &varianceThreshold + } + data, _ := json.Marshal(msg) + + nc.writeMu.Lock() + nc.Conn.WriteMessage(websocket.TextMessage, data) + nc.writeMu.Unlock() +} + +// Shutdown gracefully shuts down the server +func (s *Server) Shutdown(ctx context.Context) { + s.mu.Lock() + s.shutdown = true + + // Send shutdown message to all connected nodes + shutdownMsg := ShutdownMessage{Type: "shutdown", ReconnectInMS: 30000} + data, _ := json.Marshal(shutdownMsg) + + for mac, nc := range s.connections { + nc.writeMu.Lock() + nc.Conn.WriteMessage(websocket.TextMessage, data) + nc.Conn.Close() + nc.writeMu.Unlock() + delete(s.connections, mac) + } + s.mu.Unlock() + + log.Printf("[INFO] Ingestion server shutdown complete") +} + +// GetConnectedNodes returns a list of connected node MACs +func (s *Server) GetConnectedNodes() []string { + s.mu.RLock() + defer s.mu.RUnlock() + + macs := make([]string, 0, len(s.connections)) + for mac := range s.connections { + macs = append(macs, mac) + } + return macs +} + +// GetLinkBuffer returns the ring buffer for a specific link +func (s *Server) GetLinkBuffer(nodeMAC, peerMAC string) *RingBuffer { + linkID := nodeMAC + ":" + peerMAC + s.mu.RLock() + defer s.mu.RUnlock() + return s.links[linkID] +} + +// GetAllLinks returns all link IDs that have data +func (s *Server) GetAllLinks() []string { + s.mu.RLock() + defer s.mu.RUnlock() + + links := make([]string, 0, len(s.links)) + for linkID := range s.links { + links = append(links, linkID) + } + return links +}