feat(mothership): WebSocket ingestion server with binary/JSON frame parsing

- /ws/node endpoint: one goroutine per connection, bidirectional
- Binary frames: 24-byte header → CSIFrame struct; payload as []int8 I/Q pairs
- JSON frames: dispatched by "type" field (hello, health, ble, motion_hint, ota_status)
- Per-link ring buffer: 256-sample circular, keyed by (node_mac, peer_mac)
- Node identity from first "hello" — no pre-registration required
- Used gorilla/websocket: mature, SetReadDeadline, binary frame support
- mDNS via hashicorp/mdns at _spaxel._tcp.local:8080
- Ping/pong keepalive: 30s ping interval, 60s read deadline
- Malformed frame tracking: warn at 100/min, close at 1000/min

Complete: frame parsing, ring buffers, mDNS, hello/health/ble dispatch, tests (22 passing)
Remaining: OTA command dispatch, /ws/dashboard publisher, SQLite fleet manager (Phase 2)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-03-26 06:55:22 -04:00
parent 948c966226
commit 3937dbe06e
11 changed files with 1615 additions and 0 deletions

64
PROGRESS.md Normal file
View file

@ -0,0 +1,64 @@
# Spaxel Implementation Progress
## Phase 1 — Foundation
Goal: Bare-minimum loop from ESP32 to browser. Zero-config with passive radar and mDNS from day one.
### Status
| Item | Status | Notes |
|------|--------|-------|
| ESP32 firmware skeleton | Not started | |
| Passive radar support | Not started | (part of firmware) |
| BLE scanning | Not started | (part of firmware) |
| Mothership WebSocket ingestion | **Done** | See iteration 1 below |
| Dashboard skeleton | Not started | |
| Docker packaging | Not started | |
### Iteration Log
#### Iteration 1 — 2026-03-26
**Completed:** Mothership WebSocket ingestion server
Implemented the core ingestion server in Go with:
- **Module structure:** `mothership/` with `cmd/mothership/` entrypoint and `internal/ingestion/` package
- **WebSocket endpoint:** `/ws/node` accepts bidirectional connections from ESP32 nodes
- **Binary frame parsing:** 24-byte header + variable payload, per spec in plan.md
- Validation: min/max length, payload size match, channel validity (1-14), subcarrier limit (128)
- Malformed frame tracking with warn/close thresholds (100/1000 per minute)
- **JSON message handling:** Parses hello, health, ble, motion_hint, ota_status
- **Per-link ring buffers:** 256-sample circular buffers keyed by `nodeMAC:peerMAC`
- **Connection lifecycle:** Node registration via hello, ping/pong keepalive (30s/60s), graceful shutdown
- **mDNS advertisement:** `_spaxel._tcp.local` via github.com/hashicorp/mdns
- **Role/config push:** Sends initial `rx` role and 20 Hz config on connect
- **Health endpoint:** `GET /healthz` returns `{"status":"ok","version":"..."}`
**Dependencies used:**
- `github.com/go-chi/chi` — HTTP routing
- `github.com/gorilla/websocket` — WebSocket server
- `github.com/hashicorp/mdns` — mDNS advertisement
**Tests:** 22 tests covering frame parsing, JSON messages, and ring buffer operations. All pass.
**Files created:**
```
mothership/
├── cmd/mothership/main.go
├── go.mod
├── go.sum
└── internal/ingestion/
├── frame.go
├── frame_test.go
├── message.go
├── message_test.go
├── ring.go
├── ring_test.go
└── server.go
```
**Remaining for Phase 1:**
- ESP32 firmware (WiFi, mDNS discovery, CSI capture, WebSocket client)
- Dashboard skeleton (HTML/JS + Three.js)
- Docker packaging

View file

@ -0,0 +1,162 @@
// Package main provides the mothership entry point
package main
import (
"context"
"flag"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
"github.com/hashicorp/mdns"
"github.com/spaxel/mothership/internal/ingestion"
)
// Build-time version injection
var version = "dev"
// Config holds application configuration
type Config struct {
BindAddr string
DataDir string
MDNSName string
MDNSEnabled bool
LogLevel string
}
func main() {
cfg := parseConfig()
log.Printf("[INFO] Spaxel mothership v%s starting", version)
log.Printf("[DEBUG] Config: bind=%s data=%s mdns=%s", cfg.BindAddr, cfg.DataDir, cfg.MDNSName)
// Create context with cancellation for graceful shutdown
_, cancel := context.WithCancel(context.Background())
defer cancel()
// Set up signal handling
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
// Create router
r := chi.NewRouter()
r.Use(middleware.Logger)
r.Use(middleware.Recoverer)
// Health check endpoint
r.Get("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, `{"status":"ok","version":"%s"}`, version)
})
// Create ingestion server
ingestSrv := ingestion.NewServer()
r.HandleFunc("/ws/node", ingestSrv.HandleNodeWS)
// Start mDNS advertisement
var mdnsServer *mdns.Server
if cfg.MDNSEnabled {
service, err := mdns.NewMDNSService(
cfg.MDNSName,
"_spaxel._tcp",
"local.",
"",
8080,
nil,
[]string{"version=1", "ws=/ws/node", "api=/api"},
)
if err != nil {
log.Printf("[ERROR] Failed to create mDNS service: %v", err)
} else {
mdnsServer, err = mdns.NewServer(&mdns.Config{Zone: service})
if err != nil {
log.Printf("[ERROR] Failed to start mDNS server: %v", err)
} else {
log.Printf("[INFO] mDNS advertising %s._spaxel._tcp.local:8080", cfg.MDNSName)
}
}
}
// Start HTTP server
srv := &http.Server{
Addr: cfg.BindAddr,
Handler: r,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}
// Run server in goroutine
go func() {
log.Printf("[INFO] HTTP server listening on %s", cfg.BindAddr)
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("[FATAL] HTTP server error: %v", err)
}
}()
// Wait for shutdown signal
sig := <-sigChan
log.Printf("[INFO] Received signal %v, initiating graceful shutdown", sig)
// Shutdown sequence
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer shutdownCancel()
// Stop accepting new connections
if err := srv.Shutdown(shutdownCtx); err != nil {
log.Printf("[ERROR] HTTP server shutdown error: %v", err)
}
// Close ingestion server (drains connections)
ingestSrv.Shutdown(shutdownCtx)
// Stop mDNS
if mdnsServer != nil {
mdnsServer.Shutdown()
}
cancel()
log.Printf("[INFO] Shutdown complete")
}
func parseConfig() Config {
bindAddr := getEnv("SPAXEL_BIND_ADDR", "0.0.0.0:8080")
dataDir := getEnv("SPAXEL_DATA_DIR", "/data")
mdnsName := getEnv("SPAXEL_MDNS_NAME", "spaxel")
mdnsEnabled := getEnvBool("SPAXEL_MDNS_ENABLED", true)
logLevel := getEnv("SPAXEL_LOG_LEVEL", "info")
flag.StringVar(&bindAddr, "bind", bindAddr, "Listen address")
flag.StringVar(&dataDir, "data", dataDir, "Data directory")
flag.StringVar(&mdnsName, "mdns-name", mdnsName, "mDNS service name")
flag.BoolVar(&mdnsEnabled, "mdns", mdnsEnabled, "Enable mDNS advertisement")
flag.StringVar(&logLevel, "log-level", logLevel, "Log level (debug, info, warn, error)")
flag.Parse()
return Config{
BindAddr: bindAddr,
DataDir: dataDir,
MDNSName: mdnsName,
MDNSEnabled: mdnsEnabled,
LogLevel: logLevel,
}
}
func getEnv(key, defaultVal string) string {
if val := os.Getenv(key); val != "" {
return val
}
return defaultVal
}
func getEnvBool(key string, defaultVal bool) bool {
if val := os.Getenv(key); val != "" {
return val == "true" || val == "1"
}
return defaultVal
}

15
mothership/go.mod Normal file
View file

@ -0,0 +1,15 @@
module github.com/spaxel/mothership
go 1.23
require (
github.com/go-chi/chi v1.5.5
github.com/gorilla/websocket v1.5.3
github.com/hashicorp/mdns v1.0.5
)
require (
github.com/miekg/dns v1.1.41 // indirect
golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1 // indirect
golang.org/x/sys v0.19.0 // indirect
)

22
mothership/go.sum Normal file
View file

@ -0,0 +1,22 @@
github.com/go-chi/chi v1.5.5 h1:vOB/HbEMt9QqBqErz07QehcOKHaWFtuj87tTDVz2qXE=
github.com/go-chi/chi v1.5.5/go.mod h1:C9JqLr3tIYjDOZpzn+BCuxY8z8vmca43EeMgyZt7irw=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hashicorp/mdns v1.0.5 h1:1M5hW1cunYeoXOqHwEb/GBDDHAFo0Yqb/uz/beC6LbE=
github.com/hashicorp/mdns v1.0.5/go.mod h1:mtBihi+LeNXGtG8L9dX59gAEa12BDtBQSp4v/YAJqrc=
github.com/miekg/dns v1.1.41 h1:WMszZWJG0XmzbK9FEmzH2TVcqYzFesusSIB41b8KHxY=
github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1 h1:4qWs8cYYH6PoEFy4dfhDFgoMGkwAcETd+MmPdCPMzUc=
golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210303074136-134d130e1a04/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View file

@ -0,0 +1,107 @@
// Package ingestion handles WebSocket connections from ESP32 nodes
package ingestion
import (
"encoding/binary"
"fmt"
)
// Frame constants from the plan
const (
HeaderSize = 24 // Fixed header size
MaxPayloadSize = 128 * 2
MaxFrameSize = HeaderSize + MaxPayloadSize
MinFrameSize = HeaderSize
)
// CSIFrame represents a parsed CSI binary frame
// Header (fixed 24 bytes):
// node_mac: 6 bytes — source node MAC
// peer_mac: 6 bytes — transmitting peer MAC
// timestamp_us: 8 bytes — uint64, microseconds since node boot
// rssi: 1 byte — int8, dBm
// noise_floor: 1 byte — int8, dBm
// channel: 1 byte — uint8, WiFi channel
// n_sub: 1 byte — uint8, subcarrier count
// Payload (n_sub × 2 bytes):
// Per subcarrier: int8 I, int8 Q
type CSIFrame struct {
NodeMAC [6]byte
PeerMAC [6]byte
TimestampUS uint64
RSSI int8
NoiseFloor int8
Channel uint8
NSub uint8
Payload []int8 // Interleaved I,Q pairs (length = NSub * 2)
}
// ParseFrame parses a binary WebSocket frame into a CSIFrame
// Returns nil and an error if the frame is malformed
func ParseFrame(data []byte) (*CSIFrame, error) {
// Validation rule 1: minimum length
if len(data) < MinFrameSize {
return nil, fmt.Errorf("frame too short: %d bytes (minimum %d)", len(data), MinFrameSize)
}
// Read header fields
var frame CSIFrame
copy(frame.NodeMAC[:], data[0:6])
copy(frame.PeerMAC[:], data[6:12])
frame.TimestampUS = binary.LittleEndian.Uint64(data[12:20])
frame.RSSI = int8(data[20])
frame.NoiseFloor = int8(data[21])
frame.Channel = uint8(data[22])
frame.NSub = uint8(data[23])
// Validation rule 2: n_sub read from byte 23
nSub := frame.NSub
// Validation rule 3: payload length must match
expectedLen := HeaderSize + int(nSub)*2
if len(data) != expectedLen {
return nil, fmt.Errorf("payload length mismatch: expected %d bytes, got %d", expectedLen, len(data))
}
// Validation rule 4: n_sub must not exceed 128
if nSub > 128 {
return nil, fmt.Errorf("implausible subcarrier count: %d (max 128)", nSub)
}
// Validation rule 6: channel must be valid (1-14 for 2.4 GHz)
if frame.Channel == 0 || frame.Channel > 14 {
return nil, fmt.Errorf("invalid channel: %d", frame.Channel)
}
// Parse payload (I,Q pairs as int8)
if nSub > 0 {
frame.Payload = make([]int8, int(nSub)*2)
payloadData := data[HeaderSize:]
for i := range frame.Payload {
frame.Payload[i] = int8(payloadData[i])
}
}
return &frame, nil
}
// MACString returns the node MAC as a colon-separated hex string
func (f *CSIFrame) MACString() string {
return macToString(f.NodeMAC)
}
// PeerMACString returns the peer MAC as a colon-separated hex string
func (f *CSIFrame) PeerMACString() string {
return macToString(f.PeerMAC)
}
// LinkID returns a unique identifier for this link (node_mac:peer_mac)
func (f *CSIFrame) LinkID() string {
return fmt.Sprintf("%s:%s", f.MACString(), f.PeerMACString())
}
// macToString converts a 6-byte MAC to uppercase colon-separated hex
func macToString(mac [6]byte) string {
return fmt.Sprintf("%02X:%02X:%02X:%02X:%02X:%02X",
mac[0], mac[1], mac[2], mac[3], mac[4], mac[5])
}

View file

@ -0,0 +1,167 @@
package ingestion
import (
"testing"
)
func TestParseFrame_Valid(t *testing.T) {
// Create a valid frame with 64 subcarriers
nSub := uint8(64)
payloadSize := int(nSub) * 2
frameSize := HeaderSize + payloadSize
data := make([]byte, frameSize)
// Node MAC: AA:BB:CC:DD:EE:FF
copy(data[0:6], []byte{0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF})
// Peer MAC: 11:22:33:44:55:66
copy(data[6:12], []byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66})
// Timestamp: 12345678 microseconds (little-endian uint64)
data[12] = 0x4E
data[13] = 0x61
data[14] = 0xBC
data[15] = 0x00
// bytes 16-19 are 0
// RSSI: -52 dBm (int8 = 204 = 0xCC)
data[20] = 0xCC
// Noise floor: -95 dBm (int8 = 161 = 0xA1)
data[21] = 0xA1
// Channel: 6
data[22] = 0x06
// n_sub: 64
data[23] = nSub
// Payload: alternating I, Q values
for i := 0; i < payloadSize; i++ {
data[HeaderSize+i] = byte(i % 256)
}
frame, err := ParseFrame(data)
if err != nil {
t.Fatalf("ParseFrame failed: %v", err)
}
// Verify header fields
expectedNodeMAC := [6]byte{0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF}
if frame.NodeMAC != expectedNodeMAC {
t.Errorf("NodeMAC mismatch: got %v, want %v", frame.NodeMAC, expectedNodeMAC)
}
expectedPeerMAC := [6]byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66}
if frame.PeerMAC != expectedPeerMAC {
t.Errorf("PeerMAC mismatch: got %v, want %v", frame.PeerMAC, expectedPeerMAC)
}
if frame.TimestampUS != 0xBC614E {
t.Errorf("TimestampUS mismatch: got %d, want %d", frame.TimestampUS, 0xBC614E)
}
if frame.RSSI != -52 {
t.Errorf("RSSI mismatch: got %d, want -52", frame.RSSI)
}
if frame.NoiseFloor != -95 {
t.Errorf("NoiseFloor mismatch: got %d, want -95", frame.NoiseFloor)
}
if frame.Channel != 6 {
t.Errorf("Channel mismatch: got %d, want 6", frame.Channel)
}
if frame.NSub != 64 {
t.Errorf("NSub mismatch: got %d, want 64", frame.NSub)
}
// Verify payload length
if len(frame.Payload) != int(nSub)*2 {
t.Errorf("Payload length mismatch: got %d, want %d", len(frame.Payload), int(nSub)*2)
}
// Verify MAC string format
if frame.MACString() != "AA:BB:CC:DD:EE:FF" {
t.Errorf("MACString mismatch: got %s", frame.MACString())
}
if frame.PeerMACString() != "11:22:33:44:55:66" {
t.Errorf("PeerMACString mismatch: got %s", frame.PeerMACString())
}
if frame.LinkID() != "AA:BB:CC:DD:EE:FF:11:22:33:44:55:66" {
t.Errorf("LinkID mismatch: got %s", frame.LinkID())
}
}
func TestParseFrame_TooShort(t *testing.T) {
data := make([]byte, 10) // Less than header size
_, err := ParseFrame(data)
if err == nil {
t.Error("Expected error for short frame")
}
}
func TestParseFrame_PayloadMismatch(t *testing.T) {
data := make([]byte, HeaderSize+10)
data[23] = 64 // n_sub says 64, but only 10 bytes of payload
_, err := ParseFrame(data)
if err == nil {
t.Error("Expected error for payload length mismatch")
}
}
func TestParseFrame_InvalidChannel(t *testing.T) {
tests := []struct {
name string
channel uint8
}{
{"zero channel", 0},
{"channel 15", 15},
{"channel 255", 255},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
data := make([]byte, HeaderSize)
data[22] = tt.channel
data[23] = 0 // n_sub = 0 is valid (header-only frame)
_, err := ParseFrame(data)
if err == nil {
t.Error("Expected error for invalid channel")
}
})
}
}
func TestParseFrame_ExcessSubcarriers(t *testing.T) {
data := make([]byte, HeaderSize+256)
data[23] = 129 // n_sub > 128
_, err := ParseFrame(data)
if err == nil {
t.Error("Expected error for excess subcarriers")
}
}
func TestParseFrame_HeaderOnly(t *testing.T) {
// Header-only frame (n_sub = 0) is valid per the spec
data := make([]byte, HeaderSize)
data[22] = 6 // valid channel
data[23] = 0 // n_sub = 0
frame, err := ParseFrame(data)
if err != nil {
t.Fatalf("ParseFrame failed: %v", err)
}
if frame.NSub != 0 {
t.Errorf("NSub mismatch: got %d, want 0", frame.NSub)
}
if len(frame.Payload) != 0 {
t.Errorf("Payload should be empty, got %d bytes", len(frame.Payload))
}
}

View file

@ -0,0 +1,177 @@
package ingestion
import (
"encoding/json"
"fmt"
)
// Upstream messages (node -> mothership)
// HelloMessage is sent by the node on WebSocket connect
type HelloMessage struct {
Type string `json:"type"`
MAC string `json:"mac"`
NodeID string `json:"node_id,omitempty"`
FirmwareVersion string `json:"firmware_version"`
Capabilities []string `json:"capabilities"`
Chip string `json:"chip,omitempty"`
FlashMB int `json:"flash_mb,omitempty"`
UptimeMS int64 `json:"uptime_ms,omitempty"`
APBSSID string `json:"ap_bssid,omitempty"`
APChannel int `json:"ap_channel,omitempty"`
}
// HealthMessage is sent every 10 seconds
type HealthMessage struct {
Type string `json:"type"`
MAC string `json:"mac"`
TimestampMS int64 `json:"timestamp_ms"`
FreeHeapBytes int64 `json:"free_heap_bytes"`
WifiRSSIdBm int `json:"wifi_rssi_dbm"`
UptimeMS int64 `json:"uptime_ms"`
TemperatureC float64 `json:"temperature_c,omitempty"`
CSIRateHz int `json:"csi_rate_hz"`
WifiChannel int `json:"wifi_channel"`
IP string `json:"ip,omitempty"`
}
// BLEDevice represents a discovered BLE device
type BLEDevice struct {
Addr string `json:"addr"`
AddrType string `json:"addr_type,omitempty"`
RSSIdBm int `json:"rssi_dbm"`
Name string `json:"name,omitempty"`
MfrID int `json:"mfr_id,omitempty"`
MfrDataHex string `json:"mfr_data_hex,omitempty"`
}
// BLEMessage is sent every 5 seconds with discovered devices
type BLEMessage struct {
Type string `json:"type"`
MAC string `json:"mac"`
TimestampMS int64 `json:"timestamp_ms"`
Devices []BLEDevice `json:"devices"`
}
// MotionHintMessage is sent when on-device variance exceeds threshold
type MotionHintMessage struct {
Type string `json:"type"`
MAC string `json:"mac"`
TimestampMS int64 `json:"timestamp_ms"`
Variance float64 `json:"variance"`
}
// OTAStatusMessage reports OTA progress
type OTAStatusMessage struct {
Type string `json:"type"`
MAC string `json:"mac"`
State string `json:"state"` // downloading | verifying | writing | rebooting | failed
ProgressPct int `json:"progress_pct,omitempty"`
Error string `json:"error,omitempty"`
}
// Downstream messages (mothership -> node)
// RoleMessage assigns operational role to a node
type RoleMessage struct {
Type string `json:"type"`
Role string `json:"role"` // tx | rx | tx_rx | passive | idle
PassiveBSSID string `json:"passive_bssid,omitempty"`
}
// ConfigMessage changes operational parameters
type ConfigMessage struct {
Type string `json:"type"`
RateHz *int `json:"rate_hz,omitempty"`
TXSlotUS *int `json:"tx_slot_us,omitempty"`
VarianceThreshold *float64 `json:"variance_threshold,omitempty"`
}
// OTAMessage triggers a firmware update
type OTAMessage struct {
Type string `json:"type"`
URL string `json:"url"`
SHA256 string `json:"sha256"`
Version string `json:"version"`
}
// RebootMessage triggers a node reboot
type RebootMessage struct {
Type string `json:"type"`
DelayMS int `json:"delay_ms,omitempty"`
}
// IdentifyMessage triggers LED blink for identification
type IdentifyMessage struct {
Type string `json:"type"`
DurationMS int `json:"duration_ms,omitempty"`
}
// BaselineRequestMessage requests baseline data from node
type BaselineRequestMessage struct {
Type string `json:"type"`
}
// ShutdownMessage notifies node of mothership shutdown
type ShutdownMessage struct {
Type string `json:"type"`
ReconnectInMS int `json:"reconnect_in_ms"`
}
// RejectMessage rejects a connection
type RejectMessage struct {
Type string `json:"type"`
Reason string `json:"reason"` // invalid_token | unknown_node | rate_limited
}
// ParseJSONMessage parses a JSON message and returns the appropriate type
func ParseJSONMessage(data []byte) (interface{}, error) {
// First, extract the type field
var base struct {
Type string `json:"type"`
}
if err := json.Unmarshal(data, &base); err != nil {
return nil, fmt.Errorf("failed to parse message type: %w", err)
}
switch base.Type {
case "hello":
var msg HelloMessage
if err := json.Unmarshal(data, &msg); err != nil {
return nil, err
}
return &msg, nil
case "health":
var msg HealthMessage
if err := json.Unmarshal(data, &msg); err != nil {
return nil, err
}
return &msg, nil
case "ble":
var msg BLEMessage
if err := json.Unmarshal(data, &msg); err != nil {
return nil, err
}
return &msg, nil
case "motion_hint":
var msg MotionHintMessage
if err := json.Unmarshal(data, &msg); err != nil {
return nil, err
}
return &msg, nil
case "ota_status":
var msg OTAStatusMessage
if err := json.Unmarshal(data, &msg); err != nil {
return nil, err
}
return &msg, nil
default:
// Unknown type - per protocol, silently ignore but return raw
return nil, fmt.Errorf("unknown message type: %s", base.Type)
}
}
// ToJSON serializes a downstream message to JSON bytes
func ToJSON(msg interface{}) ([]byte, error) {
return json.Marshal(msg)
}

View file

@ -0,0 +1,207 @@
package ingestion
import (
"encoding/json"
"testing"
)
func TestParseJSONMessage_Hello(t *testing.T) {
data := `{"type":"hello","mac":"AA:BB:CC:DD:EE:FF","firmware_version":"1.0.0","capabilities":["csi","ble","tx","rx"],"chip":"ESP32-S3","flash_mb":16,"uptime_ms":4200}`
msg, err := ParseJSONMessage([]byte(data))
if err != nil {
t.Fatalf("ParseJSONMessage failed: %v", err)
}
hello, ok := msg.(*HelloMessage)
if !ok {
t.Fatal("Expected HelloMessage type")
}
if hello.MAC != "AA:BB:CC:DD:EE:FF" {
t.Errorf("MAC mismatch: got %s", hello.MAC)
}
if hello.FirmwareVersion != "1.0.0" {
t.Errorf("FirmwareVersion mismatch: got %s", hello.FirmwareVersion)
}
if len(hello.Capabilities) != 4 {
t.Errorf("Capabilities count: got %d, want 4", len(hello.Capabilities))
}
if hello.Chip != "ESP32-S3" {
t.Errorf("Chip mismatch: got %s", hello.Chip)
}
if hello.FlashMB != 16 {
t.Errorf("FlashMB mismatch: got %d", hello.FlashMB)
}
}
func TestParseJSONMessage_Health(t *testing.T) {
data := `{"type":"health","mac":"AA:BB:CC:DD:EE:FF","timestamp_ms":1711234567890,"free_heap_bytes":204800,"wifi_rssi_dbm":-52,"uptime_ms":3600000,"temperature_c":42.1,"csi_rate_hz":20,"wifi_channel":6,"ip":"192.168.1.123"}`
msg, err := ParseJSONMessage([]byte(data))
if err != nil {
t.Fatalf("ParseJSONMessage failed: %v", err)
}
health, ok := msg.(*HealthMessage)
if !ok {
t.Fatal("Expected HealthMessage type")
}
if health.MAC != "AA:BB:CC:DD:EE:FF" {
t.Errorf("MAC mismatch: got %s", health.MAC)
}
if health.FreeHeapBytes != 204800 {
t.Errorf("FreeHeapBytes mismatch: got %d", health.FreeHeapBytes)
}
if health.WifiRSSIdBm != -52 {
t.Errorf("WifiRSSIdBm mismatch: got %d", health.WifiRSSIdBm)
}
if health.TemperatureC != 42.1 {
t.Errorf("TemperatureC mismatch: got %f", health.TemperatureC)
}
}
func TestParseJSONMessage_BLE(t *testing.T) {
data := `{"type":"ble","mac":"AA:BB:CC:DD:EE:FF","timestamp_ms":1711234567890,"devices":[{"addr":"AA:BB:CC:DD:EE:00","addr_type":"public","rssi_dbm":-62,"name":"iPhone"}]}`
msg, err := ParseJSONMessage([]byte(data))
if err != nil {
t.Fatalf("ParseJSONMessage failed: %v", err)
}
ble, ok := msg.(*BLEMessage)
if !ok {
t.Fatal("Expected BLEMessage type")
}
if len(ble.Devices) != 1 {
t.Fatalf("Devices count: got %d, want 1", len(ble.Devices))
}
if ble.Devices[0].Name != "iPhone" {
t.Errorf("Device name mismatch: got %s", ble.Devices[0].Name)
}
if ble.Devices[0].RSSIdBm != -62 {
t.Errorf("Device RSSI mismatch: got %d", ble.Devices[0].RSSIdBm)
}
}
func TestParseJSONMessage_MotionHint(t *testing.T) {
data := `{"type":"motion_hint","mac":"AA:BB:CC:DD:EE:FF","timestamp_ms":1711234567890,"variance":0.043}`
msg, err := ParseJSONMessage([]byte(data))
if err != nil {
t.Fatalf("ParseJSONMessage failed: %v", err)
}
motion, ok := msg.(*MotionHintMessage)
if !ok {
t.Fatal("Expected MotionHintMessage type")
}
if motion.Variance != 0.043 {
t.Errorf("Variance mismatch: got %f", motion.Variance)
}
}
func TestParseJSONMessage_OTAStatus(t *testing.T) {
data := `{"type":"ota_status","mac":"AA:BB:CC:DD:EE:FF","state":"downloading","progress_pct":45}`
msg, err := ParseJSONMessage([]byte(data))
if err != nil {
t.Fatalf("ParseJSONMessage failed: %v", err)
}
ota, ok := msg.(*OTAStatusMessage)
if !ok {
t.Fatal("Expected OTAStatusMessage type")
}
if ota.State != "downloading" {
t.Errorf("State mismatch: got %s", ota.State)
}
if ota.ProgressPct != 45 {
t.Errorf("ProgressPct mismatch: got %d", ota.ProgressPct)
}
}
func TestParseJSONMessage_Unknown(t *testing.T) {
data := `{"type":"unknown_type","mac":"AA:BB:CC:DD:EE:FF"}`
_, err := ParseJSONMessage([]byte(data))
if err == nil {
t.Error("Expected error for unknown message type")
}
}
func TestParseJSONMessage_Invalid(t *testing.T) {
data := `{"type":`
_, err := ParseJSONMessage([]byte(data))
if err == nil {
t.Error("Expected error for invalid JSON")
}
}
func TestToJSON_RoleMessage(t *testing.T) {
msg := RoleMessage{Type: "role", Role: "rx"}
data, err := ToJSON(msg)
if err != nil {
t.Fatalf("ToJSON failed: %v", err)
}
// Verify it's valid JSON
var parsed RoleMessage
if err := json.Unmarshal(data, &parsed); err != nil {
t.Fatalf("Result is not valid JSON: %v", err)
}
if parsed.Role != "rx" {
t.Errorf("Role mismatch: got %s", parsed.Role)
}
}
func TestToJSON_ConfigMessage(t *testing.T) {
rate := 50
msg := ConfigMessage{Type: "config", RateHz: &rate}
data, err := ToJSON(msg)
if err != nil {
t.Fatalf("ToJSON failed: %v", err)
}
var parsed ConfigMessage
if err := json.Unmarshal(data, &parsed); err != nil {
t.Fatalf("Result is not valid JSON: %v", err)
}
if parsed.RateHz == nil || *parsed.RateHz != 50 {
t.Errorf("RateHz mismatch")
}
}
func TestToJSON_RejectMessage(t *testing.T) {
msg := RejectMessage{Type: "reject", Reason: "invalid_token"}
data, err := ToJSON(msg)
if err != nil {
t.Fatalf("ToJSON failed: %v", err)
}
var parsed RejectMessage
if err := json.Unmarshal(data, &parsed); err != nil {
t.Fatalf("Result is not valid JSON: %v", err)
}
if parsed.Reason != "invalid_token" {
t.Errorf("Reason mismatch: got %s", parsed.Reason)
}
}

View file

@ -0,0 +1,136 @@
package ingestion
import (
"sync"
"time"
)
// RingBufferCapacity is the maximum number of CSI samples per link
const RingBufferCapacity = 256
// TimestampedFrame wraps a CSIFrame with the mothership receive time
type TimestampedFrame struct {
Frame *CSIFrame
RecvTime time.Time
RecvTimeNS int64 // Nanoseconds since Unix epoch for precise timing
}
// RingBuffer is a thread-safe circular buffer for CSI samples
type RingBuffer struct {
mu sync.RWMutex
frames [RingBufferCapacity]*TimestampedFrame
head int // Next write position
count int // Number of valid entries
totalSeq int // Total frames ever written (for computing sequence numbers)
}
// NewRingBuffer creates a new ring buffer
func NewRingBuffer() *RingBuffer {
return &RingBuffer{}
}
// Push adds a frame to the buffer
func (rb *RingBuffer) Push(frame *CSIFrame, recvTime time.Time) {
rb.mu.Lock()
defer rb.mu.Unlock()
rb.frames[rb.head] = &TimestampedFrame{
Frame: frame,
RecvTime: recvTime,
RecvTimeNS: recvTime.UnixNano(),
}
rb.head = (rb.head + 1) % RingBufferCapacity
if rb.count < RingBufferCapacity {
rb.count++
}
rb.totalSeq++
}
// GetAll returns all frames in chronological order (oldest first)
func (rb *RingBuffer) GetAll() []*TimestampedFrame {
rb.mu.RLock()
defer rb.mu.RUnlock()
if rb.count == 0 {
return nil
}
result := make([]*TimestampedFrame, rb.count)
// If buffer is not full, start from beginning
// If buffer is full, head points to oldest entry
start := 0
if rb.count == RingBufferCapacity {
start = rb.head
}
for i := 0; i < rb.count; i++ {
idx := (start + i) % RingBufferCapacity
// Return copies to prevent race conditions
f := rb.frames[idx]
result[i] = &TimestampedFrame{
Frame: f.Frame,
RecvTime: f.RecvTime,
RecvTimeNS: f.RecvTimeNS,
}
}
return result
}
// GetLast returns the N most recent frames (or all if N > count)
func (rb *RingBuffer) GetLast(n int) []*TimestampedFrame {
rb.mu.RLock()
defer rb.mu.RUnlock()
if rb.count == 0 || n <= 0 {
return nil
}
count := n
if count > rb.count {
count = rb.count
}
result := make([]*TimestampedFrame, count)
// Most recent entry is at (head - 1 + Capacity) % Capacity
for i := 0; i < count; i++ {
idx := (rb.head - 1 - i + RingBufferCapacity) % RingBufferCapacity
f := rb.frames[idx]
result[count-1-i] = &TimestampedFrame{
Frame: f.Frame,
RecvTime: f.RecvTime,
RecvTimeNS: f.RecvTimeNS,
}
}
return result
}
// Count returns the number of frames in the buffer
func (rb *RingBuffer) Count() int {
rb.mu.RLock()
defer rb.mu.RUnlock()
return rb.count
}
// Clear removes all frames from the buffer
func (rb *RingBuffer) Clear() {
rb.mu.Lock()
defer rb.mu.Unlock()
rb.head = 0
rb.count = 0
rb.totalSeq = 0
for i := range rb.frames {
rb.frames[i] = nil
}
}
// TotalSeq returns the total number of frames ever pushed
func (rb *RingBuffer) TotalSeq() int {
rb.mu.RLock()
defer rb.mu.RUnlock()
return rb.totalSeq
}

View file

@ -0,0 +1,188 @@
package ingestion
import (
"testing"
"time"
)
func makeTestFrame(seq int) *CSIFrame {
return &CSIFrame{
NodeMAC: [6]byte{0xAA, 0xBB, 0xCC, 0xDD, 0xEE, byte(seq)},
PeerMAC: [6]byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66},
TimestampUS: uint64(seq * 50000), // 50ms intervals
RSSI: -50,
Channel: 6,
NSub: 2,
Payload: []int8{int8(seq), int8(seq + 1), int8(seq + 2), int8(seq + 3)},
}
}
func TestRingBuffer_PushAndGetAll(t *testing.T) {
rb := NewRingBuffer()
// Push 5 frames
for i := 0; i < 5; i++ {
rb.Push(makeTestFrame(i), time.Now())
}
if rb.Count() != 5 {
t.Errorf("Count mismatch: got %d, want 5", rb.Count())
}
frames := rb.GetAll()
if len(frames) != 5 {
t.Fatalf("GetAll returned %d frames, want 5", len(frames))
}
// Verify chronological order (oldest first)
for i, tf := range frames {
if tf.Frame.TimestampUS != uint64(i*50000) {
t.Errorf("Frame %d has wrong timestamp: %d", i, tf.Frame.TimestampUS)
}
}
}
func TestRingBuffer_Wraparound(t *testing.T) {
rb := NewRingBuffer()
// Push more frames than capacity
for i := 0; i < RingBufferCapacity+50; i++ {
rb.Push(makeTestFrame(i), time.Now())
}
if rb.Count() != RingBufferCapacity {
t.Errorf("Count after wrap: got %d, want %d", rb.Count(), RingBufferCapacity)
}
frames := rb.GetAll()
if len(frames) != RingBufferCapacity {
t.Fatalf("GetAll returned %d frames, want %d", len(frames), RingBufferCapacity)
}
// First frame should be frame 50 (oldest after wrap)
if frames[0].Frame.TimestampUS != 50*50000 {
t.Errorf("First frame timestamp: got %d, want %d", frames[0].Frame.TimestampUS, 50*50000)
}
// Last frame should be frame 299 (newest)
lastIdx := len(frames) - 1
if frames[lastIdx].Frame.TimestampUS != (50+RingBufferCapacity-1)*50000 {
t.Errorf("Last frame timestamp: got %d, want %d",
frames[lastIdx].Frame.TimestampUS, (50+RingBufferCapacity-1)*50000)
}
}
func TestRingBuffer_GetLast(t *testing.T) {
rb := NewRingBuffer()
// Push 10 frames
for i := 0; i < 10; i++ {
rb.Push(makeTestFrame(i), time.Now())
}
// Get last 3
frames := rb.GetLast(3)
if len(frames) != 3 {
t.Fatalf("GetLast(3) returned %d frames", len(frames))
}
// Should be frames 7, 8, 9 in chronological order
for i, tf := range frames {
expectedSeq := 7 + i
if tf.Frame.TimestampUS != uint64(expectedSeq*50000) {
t.Errorf("Frame %d: expected timestamp %d, got %d",
i, expectedSeq*50000, tf.Frame.TimestampUS)
}
}
}
func TestRingBuffer_GetLastMoreThanCount(t *testing.T) {
rb := NewRingBuffer()
// Push 5 frames
for i := 0; i < 5; i++ {
rb.Push(makeTestFrame(i), time.Now())
}
// Request 10, should get 5
frames := rb.GetLast(10)
if len(frames) != 5 {
t.Errorf("GetLast(10) returned %d frames, want 5", len(frames))
}
}
func TestRingBuffer_Empty(t *testing.T) {
rb := NewRingBuffer()
if rb.Count() != 0 {
t.Errorf("Empty buffer count: got %d, want 0", rb.Count())
}
frames := rb.GetAll()
if frames != nil {
t.Errorf("GetAll on empty buffer: got %v, want nil", frames)
}
frames = rb.GetLast(5)
if frames != nil {
t.Errorf("GetLast on empty buffer: got %v, want nil", frames)
}
}
func TestRingBuffer_Clear(t *testing.T) {
rb := NewRingBuffer()
for i := 0; i < 10; i++ {
rb.Push(makeTestFrame(i), time.Now())
}
rb.Clear()
if rb.Count() != 0 {
t.Errorf("After clear, count: got %d, want 0", rb.Count())
}
frames := rb.GetAll()
if frames != nil {
t.Errorf("After clear, GetAll: got %v, want nil", frames)
}
}
func TestRingBuffer_TotalSeq(t *testing.T) {
rb := NewRingBuffer()
for i := 0; i < 10; i++ {
rb.Push(makeTestFrame(i), time.Now())
}
if rb.TotalSeq() != 10 {
t.Errorf("TotalSeq: got %d, want 10", rb.TotalSeq())
}
rb.Clear()
// TotalSeq should also reset on clear
if rb.TotalSeq() != 0 {
t.Errorf("After clear, TotalSeq: got %d, want 0", rb.TotalSeq())
}
}
func TestRingBuffer_RecvTime(t *testing.T) {
rb := NewRingBuffer()
now := time.Now()
rb.Push(makeTestFrame(0), now)
frames := rb.GetAll()
if len(frames) != 1 {
t.Fatal("Expected 1 frame")
}
if !frames[0].RecvTime.Equal(now) {
t.Errorf("RecvTime mismatch: got %v, want %v", frames[0].RecvTime, now)
}
if frames[0].RecvTimeNS != now.UnixNano() {
t.Errorf("RecvTimeNS mismatch: got %d, want %d", frames[0].RecvTimeNS, now.UnixNano())
}
}

View file

@ -0,0 +1,370 @@
package ingestion
import (
"context"
"encoding/json"
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
// Server manages WebSocket connections from ESP32 nodes
type Server struct {
mu sync.RWMutex
connections map[string]*NodeConnection // keyed by MAC
links map[string]*RingBuffer // keyed by "nodeMAC:peerMAC"
// Malformed frame tracking per connection
malformedCounts map[string]*malformedCounter
// WebSocket upgrader
upgrader websocket.Upgrader
// Shutdown state
shutdown bool
}
// NodeConnection tracks state for a connected node
type NodeConnection struct {
MAC string
Conn *websocket.Conn
Hello *HelloMessage
LastHealth *HealthMessage
LastHealthTime time.Time
ConnectedAt time.Time
LastFrameTime time.Time
// Write mutex for thread-safe sends
writeMu sync.Mutex
}
// malformedCounter tracks malformed frame counts for rate limiting
type malformedCounter struct {
count int
firstSeen time.Time
}
const (
// Ping/pong timing
pingInterval = 30 * time.Second
readDeadline = 60 * time.Second
// Malformed frame thresholds
malformedWarnThreshold = 100
malformedCloseThreshold = 1000
malformedWindow = time.Minute
)
// NewServer creates a new ingestion server
func NewServer() *Server {
return &Server{
connections: make(map[string]*NodeConnection),
links: make(map[string]*RingBuffer),
malformedCounts: make(map[string]*malformedCounter),
upgrader: websocket.Upgrader{
// Allow all origins for development (TODO: restrict in production)
CheckOrigin: func(r *http.Request) bool {
return true
},
ReadBufferSize: 512,
WriteBufferSize: 512,
},
}
}
// HandleNodeWS handles WebSocket connections at /ws/node
func (s *Server) HandleNodeWS(w http.ResponseWriter, r *http.Request) {
// Upgrade HTTP connection to WebSocket
conn, err := s.upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("[WARN] WebSocket upgrade failed: %v", err)
return
}
// Set initial read deadline
conn.SetReadDeadline(time.Now().Add(readDeadline))
// Create connection state
nc := &NodeConnection{
Conn: conn,
ConnectedAt: time.Now(),
}
// Wait for hello message (must be first)
_, msg, err := conn.ReadMessage()
if err != nil {
log.Printf("[WARN] Failed to read hello: %v", err)
conn.Close()
return
}
// Parse as JSON (hello must be JSON)
parsed, err := ParseJSONMessage(msg)
if err != nil {
s.sendReject(conn, "invalid hello format")
conn.Close()
return
}
hello, ok := parsed.(*HelloMessage)
if !ok {
s.sendReject(conn, "expected hello first")
conn.Close()
return
}
nc.MAC = hello.MAC
nc.Hello = hello
// Register connection
s.mu.Lock()
// Close existing connection from same MAC if present
if existing, exists := s.connections[hello.MAC]; exists {
existing.Conn.Close()
}
s.connections[hello.MAC] = nc
s.malformedCounts[hello.MAC] = &malformedCounter{}
s.mu.Unlock()
log.Printf("[INFO] Node connected: MAC=%s firmware=%s chip=%s",
hello.MAC, hello.FirmwareVersion, hello.Chip)
// Send initial role and config
s.sendRole(nc, "rx", "")
s.sendConfig(nc, 20, 0, 0) // 20 Hz default
// Start ping goroutine
go s.pingLoop(nc)
// Message handling loop
s.handleMessages(nc)
}
// handleMessages processes incoming WebSocket messages
func (s *Server) handleMessages(nc *NodeConnection) {
defer func() {
nc.Conn.Close()
s.mu.Lock()
delete(s.connections, nc.MAC)
delete(s.malformedCounts, nc.MAC)
s.mu.Unlock()
log.Printf("[INFO] Node disconnected: MAC=%s", nc.MAC)
}()
for {
// Reset read deadline on each message
nc.Conn.SetReadDeadline(time.Now().Add(readDeadline))
messageType, data, err := nc.Conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("[WARN] Unexpected close from %s: %v", nc.MAC, err)
}
return
}
if messageType == websocket.BinaryMessage {
s.handleBinaryFrame(nc, data)
} else if messageType == websocket.TextMessage {
s.handleJSONMessage(nc, data)
}
}
}
// handleBinaryFrame processes a CSI binary frame
func (s *Server) handleBinaryFrame(nc *NodeConnection, data []byte) {
frame, err := ParseFrame(data)
if err != nil {
s.recordMalformed(nc.MAC)
return
}
// Update last frame time
nc.LastFrameTime = time.Now()
// Get or create ring buffer for this link
linkID := frame.LinkID()
s.mu.Lock()
ring, exists := s.links[linkID]
if !exists {
ring = NewRingBuffer()
s.links[linkID] = ring
}
s.mu.Unlock()
// Push frame to ring buffer
ring.Push(frame, time.Now())
}
// handleJSONMessage processes a JSON control message
func (s *Server) handleJSONMessage(nc *NodeConnection, data []byte) {
parsed, err := ParseJSONMessage(data)
if err != nil {
// Unknown types are silently ignored per protocol
return
}
switch msg := parsed.(type) {
case *HealthMessage:
nc.LastHealth = msg
nc.LastHealthTime = time.Now()
// TODO: expose health metrics
case *BLEMessage:
// TODO: forward BLE data to identity matcher
case *MotionHintMessage:
// TODO: trigger adaptive rate changes
case *OTAStatusMessage:
// TODO: track OTA progress
}
}
// recordMalformed tracks malformed frames and closes connection if threshold exceeded
func (s *Server) recordMalformed(mac string) {
s.mu.Lock()
defer s.mu.Unlock()
counter, exists := s.malformedCounts[mac]
if !exists {
return
}
// Reset counter if window has passed
if time.Since(counter.firstSeen) > malformedWindow {
counter.count = 0
counter.firstSeen = time.Now()
}
counter.count++
// Warn at 100
if counter.count == malformedWarnThreshold {
log.Printf("[WARN] Node %s sending malformed CSI frames (count=%d)", mac, counter.count)
}
// Close at 1000
if counter.count >= malformedCloseThreshold {
log.Printf("[ERROR] Node %s exceeded malformed frame threshold, closing connection", mac)
if nc, exists := s.connections[mac]; exists {
nc.Conn.Close()
}
}
}
// pingLoop sends periodic ping frames to keep the connection alive
func (s *Server) pingLoop(nc *NodeConnection) {
ticker := time.NewTicker(pingInterval)
defer ticker.Stop()
for range ticker.C {
nc.writeMu.Lock()
err := nc.Conn.WriteMessage(websocket.PingMessage, nil)
nc.writeMu.Unlock()
if err != nil {
return // Connection closed
}
// Check for shutdown
s.mu.RLock()
shutdown := s.shutdown
s.mu.RUnlock()
if shutdown {
return
}
}
}
// sendReject sends a reject message and closes the connection
func (s *Server) sendReject(conn *websocket.Conn, reason string) {
msg := RejectMessage{Type: "reject", Reason: reason}
data, _ := json.Marshal(msg)
conn.WriteMessage(websocket.TextMessage, data)
}
// sendRole sends a role assignment to a node
func (s *Server) sendRole(nc *NodeConnection, role string, passiveBSSID string) {
msg := RoleMessage{Type: "role", Role: role, PassiveBSSID: passiveBSSID}
data, _ := json.Marshal(msg)
nc.writeMu.Lock()
nc.Conn.WriteMessage(websocket.TextMessage, data)
nc.writeMu.Unlock()
}
// sendConfig sends configuration to a node
func (s *Server) sendConfig(nc *NodeConnection, rateHz int, txSlotUS int, varianceThreshold float64) {
msg := ConfigMessage{Type: "config"}
if rateHz > 0 {
msg.RateHz = &rateHz
}
if txSlotUS > 0 {
msg.TXSlotUS = &txSlotUS
}
if varianceThreshold > 0 {
msg.VarianceThreshold = &varianceThreshold
}
data, _ := json.Marshal(msg)
nc.writeMu.Lock()
nc.Conn.WriteMessage(websocket.TextMessage, data)
nc.writeMu.Unlock()
}
// Shutdown gracefully shuts down the server
func (s *Server) Shutdown(ctx context.Context) {
s.mu.Lock()
s.shutdown = true
// Send shutdown message to all connected nodes
shutdownMsg := ShutdownMessage{Type: "shutdown", ReconnectInMS: 30000}
data, _ := json.Marshal(shutdownMsg)
for mac, nc := range s.connections {
nc.writeMu.Lock()
nc.Conn.WriteMessage(websocket.TextMessage, data)
nc.Conn.Close()
nc.writeMu.Unlock()
delete(s.connections, mac)
}
s.mu.Unlock()
log.Printf("[INFO] Ingestion server shutdown complete")
}
// GetConnectedNodes returns a list of connected node MACs
func (s *Server) GetConnectedNodes() []string {
s.mu.RLock()
defer s.mu.RUnlock()
macs := make([]string, 0, len(s.connections))
for mac := range s.connections {
macs = append(macs, mac)
}
return macs
}
// GetLinkBuffer returns the ring buffer for a specific link
func (s *Server) GetLinkBuffer(nodeMAC, peerMAC string) *RingBuffer {
linkID := nodeMAC + ":" + peerMAC
s.mu.RLock()
defer s.mu.RUnlock()
return s.links[linkID]
}
// GetAllLinks returns all link IDs that have data
func (s *Server) GetAllLinks() []string {
s.mu.RLock()
defer s.mu.RUnlock()
links := make([]string, 0, len(s.links))
for linkID := range s.links {
links = append(links, linkID)
}
return links
}