diff --git a/dashboard/index.html b/dashboard/index.html index aeee685..13abc1d 100644 --- a/dashboard/index.html +++ b/dashboard/index.html @@ -186,6 +186,44 @@ .link-item.selected { background: rgba(79, 195, 247, 0.2); } + + /* Presence badge */ + .presence-badge { + font-size: 10px; + font-weight: 600; + padding: 2px 6px; + border-radius: 3px; + letter-spacing: 0.5px; + } + + .presence-badge.motion { + background: rgba(244, 67, 54, 0.25); + color: #ef5350; + } + + .presence-badge.clear { + background: rgba(76, 175, 80, 0.2); + color: #66bb6a; + } + + /* Overall presence indicator in status bar */ + #presence-indicator { + font-size: 12px; + font-weight: 600; + padding: 3px 8px; + border-radius: 4px; + transition: background 0.3s, color 0.3s; + } + + #presence-indicator.motion { + background: rgba(244, 67, 54, 0.3); + color: #ef5350; + } + + #presence-indicator.clear { + background: rgba(76, 175, 80, 0.15); + color: #66bb6a; + } @@ -201,6 +239,9 @@
Links: 0
+
+ CLEAR +
FPS: 0
diff --git a/dashboard/js/app.js b/dashboard/js/app.js index 7a6854b..bf35944 100644 --- a/dashboard/js/app.js +++ b/dashboard/js/app.js @@ -31,7 +31,7 @@ ws: null, wsConnected: false, nodes: new Map(), // MAC -> { mac, firmware, chip, lastSeen } - links: new Map(), // linkID -> { nodeMAC, peerMAC, lastFrame, lastCSI } + links: new Map(), // linkID -> { nodeMAC, peerMAC, lastFrame, lastCSI, motionDetected, deltaRMS } selectedLinkID: null, lastChartUpdate: 0, frameCount: 0, @@ -220,14 +220,20 @@ } if (msg.links) { msg.links.forEach(link => { + const existing = state.links.get(link.id) || {}; state.links.set(link.id, { nodeMAC: link.node_mac, peerMAC: link.peer_mac, lastFrame: Date.now(), - lastCSI: null + lastCSI: existing.lastCSI || null, + motionDetected: existing.motionDetected || false, + deltaRMS: existing.deltaRMS || 0 }); }); } + if (msg.motion_states) { + msg.motion_states.forEach(ms => applyMotionState(ms)); + } updateNodeList(); updateLinkList(); break; @@ -253,17 +259,40 @@ nodeMAC: msg.node_mac, peerMAC: msg.peer_mac, lastFrame: Date.now(), - lastCSI: null + lastCSI: null, + motionDetected: false, + deltaRMS: 0 }); updateLinkList(); } break; + case 'motion_state': + // Targeted broadcast on state change + if (msg.links) { + let changed = false; + msg.links.forEach(ms => { + if (applyMotionState(ms)) changed = true; + }); + if (changed) updateLinkList(); + } + break; + default: // Ignore unknown types (forward-compatible) } } + // applyMotionState updates a link's motion fields; returns true if it changed. + function applyMotionState(ms) { + const link = state.links.get(ms.link_id); + if (!link) return false; + const prev = link.motionDetected; + link.motionDetected = ms.motion_detected; + link.deltaRMS = ms.delta_rms || 0; + return prev !== ms.motion_detected; + } + function handleBinaryFrame(buffer) { const frame = parseCSIFrame(buffer); if (!frame) return; @@ -277,7 +306,9 @@ nodeMAC: frame.nodeMAC, peerMAC: frame.peerMAC, lastFrame: Date.now(), - lastCSI: null + lastCSI: null, + motionDetected: false, + deltaRMS: 0 }; state.links.set(linkID, link); updateLinkList(); @@ -387,6 +418,21 @@ container.innerHTML = html; } + function updatePresenceIndicator() { + let anyMotion = false; + state.links.forEach(link => { + if (link.motionDetected) anyMotion = true; + }); + const el = document.getElementById('presence-indicator'); + if (anyMotion) { + el.className = 'motion'; + el.textContent = 'MOTION'; + } else { + el.className = 'clear'; + el.textContent = 'CLEAR'; + } + } + function updateLinkList() { const container = document.getElementById('link-list'); document.getElementById('link-count').textContent = state.links.size; @@ -400,10 +446,12 @@ state.links.forEach((link, id) => { const selected = state.selectedLinkID === id ? 'selected' : ''; const shortID = id.split(':').map(p => p.split(':').slice(-1)[0]).join('→'); + const motionClass = link.motionDetected ? 'motion' : 'clear'; + const motionLabel = link.motionDetected ? 'MOTION' : 'CLEAR'; html += ` `; }); @@ -413,6 +461,8 @@ container.querySelectorAll('.link-item').forEach(el => { el.addEventListener('click', () => selectLink(el.dataset.linkId)); }); + + updatePresenceIndicator(); } function selectLink(linkID) { diff --git a/mothership/cmd/mothership/main.go b/mothership/cmd/mothership/main.go index fcbafb7..fc59f7c 100644 --- a/mothership/cmd/mothership/main.go +++ b/mothership/cmd/mothership/main.go @@ -10,6 +10,7 @@ import ( "os" "os/signal" "path/filepath" + "strconv" "syscall" "time" @@ -18,6 +19,8 @@ import ( "github.com/hashicorp/mdns" "github.com/spaxel/mothership/internal/dashboard" "github.com/spaxel/mothership/internal/ingestion" + "github.com/spaxel/mothership/internal/replay" + sigproc "github.com/spaxel/mothership/internal/signal" ) // Build-time version injection @@ -31,6 +34,7 @@ type Config struct { MDNSName string MDNSEnabled bool LogLevel string + ReplayMaxMB int } func main() { @@ -38,20 +42,16 @@ func main() { log.Printf("[INFO] Spaxel mothership v%s starting", version) log.Printf("[DEBUG] Config: bind=%s data=%s static=%s mdns=%s", cfg.BindAddr, cfg.DataDir, cfg.StaticDir, cfg.MDNSName) - // Create context with cancellation for graceful shutdown - _, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Set up signal handling sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT) - // Create router r := chi.NewRouter() r.Use(middleware.Logger) r.Use(middleware.Recoverer) - // Health check endpoint r.Get("/healthz", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -62,52 +62,75 @@ func main() { ingestSrv := ingestion.NewServer() r.HandleFunc("/ws/node", ingestSrv.HandleNodeWS) - // Create dashboard hub and server + // Signal processing pipeline + pm := sigproc.NewProcessorManager(sigproc.ProcessorManagerConfig{ + NSub: 64, + FusionRate: 10.0, + Tau: 30.0, + }) + ingestSrv.SetProcessorManager(pm) + + // Replay recording store + if err := os.MkdirAll(cfg.DataDir, 0755); err != nil { + log.Printf("[WARN] Failed to create data dir %s: %v", cfg.DataDir, err) + } else { + store, err := replay.NewRecordingStore(filepath.Join(cfg.DataDir, "csi_replay.bin"), cfg.ReplayMaxMB) + if err != nil { + log.Printf("[WARN] Failed to open replay store: %v (CSI recording disabled)", err) + } else { + ingestSrv.SetReplayStore(store) + defer store.Close() + log.Printf("[INFO] CSI replay store at %s (%d MB max)", filepath.Join(cfg.DataDir, "csi_replay.bin"), cfg.ReplayMaxMB) + } + } + + // Adaptive rate controller + rateCtrl := ingestion.NewRateController(func(mac string, rateHz int) { + ingestSrv.SendConfigToMAC(mac, rateHz) + }) + ingestSrv.SetRateController(rateCtrl) + go rateCtrl.Run(ctx) + + // Dashboard hub and server dashboardHub := dashboard.NewHub() dashboardSrv := dashboard.NewServer(dashboardHub) - // Connect ingestion to dashboard (for state queries) dashboardHub.SetIngestionState(ingestSrv) - // Start dashboard hub in background + // Wire ingestion → dashboard for CSI and motion broadcasts + ingestSrv.SetDashboardBroadcaster(dashboardHub) + ingestSrv.SetMotionBroadcaster(dashboardHub) + go dashboardHub.Run() - // Dashboard WebSocket endpoint r.HandleFunc("/ws/dashboard", dashboardSrv.HandleDashboardWS) // Serve dashboard static files staticDir := cfg.StaticDir if staticDir == "" { - // Default: look for dashboard directory relative to binary or cwd staticDir = findDashboardDir() } if staticDir != "" { - // Check if directory exists if _, err := os.Stat(staticDir); err == nil { log.Printf("[INFO] Serving dashboard from %s", staticDir) r.Get("/*", func(w http.ResponseWriter, r *http.Request) { - // Try to serve static file, fall back to index.html for SPA routing path := filepath.Join(staticDir, r.URL.Path) - // If path is a directory, serve index.html if info, err := os.Stat(path); err == nil && info.IsDir() { path = filepath.Join(path, "index.html") } - // If file exists, serve it if _, err := os.Stat(path); err == nil { http.ServeFile(w, r, path) return } - // Fall back to index.html for SPA routing (except for /js/* paths) if filepath.Ext(r.URL.Path) == "" { http.ServeFile(w, r, filepath.Join(staticDir, "index.html")) return } - // File not found http.NotFound(w, r) }) } else { @@ -117,7 +140,7 @@ func main() { log.Printf("[WARN] No dashboard directory found, static files not served") } - // Start mDNS advertisement + // mDNS advertisement var mdnsServer *mdns.Server if cfg.MDNSEnabled { service, err := mdns.NewMDNSService( @@ -141,15 +164,13 @@ func main() { } } - // Start HTTP server srv := &http.Server{ Addr: cfg.BindAddr, Handler: r, ReadTimeout: 10 * time.Second, - WriteTimeout: 30 * time.Second, // Longer for WebSocket + WriteTimeout: 30 * time.Second, } - // Run server in goroutine go func() { log.Printf("[INFO] HTTP server listening on %s", cfg.BindAddr) if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { @@ -157,28 +178,24 @@ func main() { } }() - // Wait for shutdown signal sig := <-sigChan log.Printf("[INFO] Received signal %v, initiating graceful shutdown", sig) - // Shutdown sequence + cancel() + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second) defer shutdownCancel() - // Stop accepting new connections if err := srv.Shutdown(shutdownCtx); err != nil { log.Printf("[ERROR] HTTP server shutdown error: %v", err) } - // Close ingestion server (drains connections) ingestSrv.Shutdown(shutdownCtx) - // Stop mDNS if mdnsServer != nil { mdnsServer.Shutdown() } - cancel() log.Printf("[INFO] Shutdown complete") } @@ -189,6 +206,7 @@ func parseConfig() Config { mdnsName := getEnv("SPAXEL_MDNS_NAME", "spaxel") mdnsEnabled := getEnvBool("SPAXEL_MDNS_ENABLED", true) logLevel := getEnv("SPAXEL_LOG_LEVEL", "info") + replayMaxMB := getEnvInt("SPAXEL_REPLAY_MAX_MB", replay.DefaultMaxMB) flag.StringVar(&bindAddr, "bind", bindAddr, "Listen address") flag.StringVar(&dataDir, "data", dataDir, "Data directory") @@ -196,6 +214,7 @@ func parseConfig() Config { flag.StringVar(&mdnsName, "mdns-name", mdnsName, "mDNS service name") flag.BoolVar(&mdnsEnabled, "mdns", mdnsEnabled, "Enable mDNS advertisement") flag.StringVar(&logLevel, "log-level", logLevel, "Log level (debug, info, warn, error)") + flag.IntVar(&replayMaxMB, "replay-max-mb", replayMaxMB, "CSI replay buffer size in MB") flag.Parse() return Config{ @@ -205,6 +224,7 @@ func parseConfig() Config { MDNSName: mdnsName, MDNSEnabled: mdnsEnabled, LogLevel: logLevel, + ReplayMaxMB: replayMaxMB, } } @@ -222,14 +242,21 @@ func getEnvBool(key string, defaultVal bool) bool { return defaultVal } -// findDashboardDir attempts to locate the dashboard directory +func getEnvInt(key string, defaultVal int) int { + if val := os.Getenv(key); val != "" { + if n, err := strconv.Atoi(val); err == nil { + return n + } + } + return defaultVal +} + func findDashboardDir() string { - // Try common locations candidates := []string{ - "dashboard", // When running from repo root - "../dashboard", // When running from mothership/ - "../../dashboard", // When running from mothership/cmd/mothership/ - "/app/dashboard", // Docker container location + "dashboard", + "../dashboard", + "../../dashboard", + "/app/dashboard", } for _, dir := range candidates { diff --git a/mothership/internal/dashboard/hub.go b/mothership/internal/dashboard/hub.go index 6c0c121..1668747 100644 --- a/mothership/internal/dashboard/hub.go +++ b/mothership/internal/dashboard/hub.go @@ -13,20 +13,21 @@ import ( // Hub manages all dashboard client connections and broadcasts type Hub struct { - mu sync.RWMutex - clients map[*Client]struct{} - broadcast chan []byte - register chan *Client - unregister chan *Client + mu sync.RWMutex + clients map[*Client]struct{} + broadcast chan []byte + register chan *Client + unregister chan *Client // Reference to ingestion server for state queries ingestionState IngestionState } -// IngestionState is an interface to query node/link state from ingestion +// IngestionState is an interface to query node/link/motion state from ingestion type IngestionState interface { GetConnectedNodesInfo() []ingestion.NodeInfo GetAllLinksInfo() []ingestion.LinkInfo + GetAllMotionStates() []ingestion.MotionStateItem } // Client represents a dashboard WebSocket client @@ -64,8 +65,6 @@ func (h *Hub) Run() { h.clients[client] = struct{}{} h.mu.Unlock() log.Printf("[INFO] Dashboard client connected (total: %d)", len(h.clients)) - - // Send initial state h.sendInitialState(client) case client := <-h.unregister: @@ -89,7 +88,6 @@ func (h *Hub) Run() { h.mu.RUnlock() case <-ticker.C: - // Periodic state broadcast h.broadcastState() } } @@ -116,8 +114,6 @@ func (h *Hub) Broadcast(message []byte) { // BroadcastCSI broadcasts a CSI frame to all dashboard clients func (h *Hub) BroadcastCSI(nodeMAC, peerMAC string, data []byte) { - // For now, just forward the raw binary frame - // Dashboard clients will parse it h.Broadcast(data) } @@ -165,6 +161,17 @@ func (h *Hub) BroadcastLinkInactive(linkID string) { h.Broadcast(data) } +// BroadcastMotionState sends motion state for one or more links to all dashboard clients. +// Called on state changes (idle↔motion) so the dashboard updates immediately. +func (h *Hub) BroadcastMotionState(states []ingestion.MotionStateItem) { + msg := map[string]interface{}{ + "type": "motion_state", + "links": states, + } + data, _ := json.Marshal(msg) + h.Broadcast(data) +} + func (h *Hub) sendInitialState(client *Client) { h.mu.RLock() state := h.ingestionState @@ -174,27 +181,12 @@ func (h *Hub) sendInitialState(client *Client) { return } - // Build state message - msg := map[string]interface{}{ - "type": "state", - } - - nodes := state.GetConnectedNodesInfo() - if nodes != nil { - msg["nodes"] = nodes - } - - links := state.GetAllLinksInfo() - if links != nil { - msg["links"] = links - } - + msg := h.buildStateMsg(state) data, _ := json.Marshal(msg) select { case client.send <- data: default: - // Buffer full, skip } } @@ -208,23 +200,27 @@ func (h *Hub) broadcastState() { return } - // Build state message + msg := h.buildStateMsg(state) + data, _ := json.Marshal(msg) + h.Broadcast(data) +} + +func (h *Hub) buildStateMsg(state IngestionState) map[string]interface{} { msg := map[string]interface{}{ "type": "state", } - nodes := state.GetConnectedNodesInfo() - if nodes != nil { + if nodes := state.GetConnectedNodesInfo(); nodes != nil { msg["nodes"] = nodes } - - links := state.GetAllLinksInfo() - if links != nil { + if links := state.GetAllLinksInfo(); links != nil { msg["links"] = links } + if motionStates := state.GetAllMotionStates(); len(motionStates) > 0 { + msg["motion_states"] = motionStates + } - data, _ := json.Marshal(msg) - h.Broadcast(data) + return msg } // ClientCount returns the number of connected dashboard clients diff --git a/mothership/internal/dashboard/hub_test.go b/mothership/internal/dashboard/hub_test.go index ab002e8..575ac8b 100644 --- a/mothership/internal/dashboard/hub_test.go +++ b/mothership/internal/dashboard/hub_test.go @@ -156,6 +156,10 @@ func (m *MockIngestionState) GetAllLinksInfo() []ingestion.LinkInfo { return m.links } +func (m *MockIngestionState) GetAllMotionStates() []ingestion.MotionStateItem { + return nil +} + func TestHub_InitialState(t *testing.T) { hub := NewHub() go hub.Run() diff --git a/mothership/internal/ingestion/ratecontrol.go b/mothership/internal/ingestion/ratecontrol.go new file mode 100644 index 0000000..fa75314 --- /dev/null +++ b/mothership/internal/ingestion/ratecontrol.go @@ -0,0 +1,112 @@ +package ingestion + +import ( + "context" + "sync" + "time" +) + +const ( + // RateIdle is the CSI sampling rate (Hz) when no motion is detected. + RateIdle = 2 + // RateActive is the CSI sampling rate (Hz) when motion is detected. + RateActive = 20 + // idleTimeout is how long after the last motion event before dropping back to idle. + idleTimeout = 10 * time.Second +) + +// nodeRateState tracks the adaptive rate state for a single node. +type nodeRateState struct { + active bool + lastMotionAt time.Time +} + +// RateController manages per-node adaptive sensing rates. When motion is detected +// on a node's link, it ramps that node to RateActive (20 Hz). When no motion has +// been seen for idleTimeout, it drops back to RateIdle (2 Hz). The caller provides +// a configSender callback that sends the rate command to the node over WebSocket. +type RateController struct { + mu sync.Mutex + nodes map[string]*nodeRateState // keyed by node MAC + configSender func(nodeMAC string, rateHz int) +} + +// NewRateController creates a RateController. configSender is called whenever a +// node's rate should change; it must be goroutine-safe. +func NewRateController(configSender func(nodeMAC string, rateHz int)) *RateController { + return &RateController{ + nodes: make(map[string]*nodeRateState), + configSender: configSender, + } +} + +// OnMotionState is called after each CSI frame is processed. If the node was idle +// and motion is now detected, it ramps up immediately. +func (rc *RateController) OnMotionState(nodeMAC string, motionDetected bool) { + if !motionDetected { + return + } + + rc.mu.Lock() + defer rc.mu.Unlock() + + ns := rc.getOrCreate(nodeMAC) + ns.lastMotionAt = time.Now() + + if !ns.active { + ns.active = true + rc.configSender(nodeMAC, RateActive) + } +} + +// OnMotionHint is called when the ESP32 sends a motion_hint message (on-device +// variance exceeded threshold). Treated identically to a detected motion event. +func (rc *RateController) OnMotionHint(nodeMAC string) { + rc.OnMotionState(nodeMAC, true) +} + +// OnNodeDisconnected removes rate state for a disconnected node. +func (rc *RateController) OnNodeDisconnected(nodeMAC string) { + rc.mu.Lock() + delete(rc.nodes, nodeMAC) + rc.mu.Unlock() +} + +// Run starts the background goroutine that enforces idle timeouts. +// It returns when ctx is cancelled. +func (rc *RateController) Run(ctx context.Context) { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + rc.checkIdleTimeouts() + } + } +} + +func (rc *RateController) checkIdleTimeouts() { + now := time.Now() + + rc.mu.Lock() + defer rc.mu.Unlock() + + for mac, ns := range rc.nodes { + if ns.active && now.Sub(ns.lastMotionAt) >= idleTimeout { + ns.active = false + rc.configSender(mac, RateIdle) + } + } +} + +func (rc *RateController) getOrCreate(nodeMAC string) *nodeRateState { + if ns, ok := rc.nodes[nodeMAC]; ok { + return ns + } + ns := &nodeRateState{} + rc.nodes[nodeMAC] = ns + return ns +} diff --git a/mothership/internal/ingestion/ratecontrol_test.go b/mothership/internal/ingestion/ratecontrol_test.go new file mode 100644 index 0000000..3357722 --- /dev/null +++ b/mothership/internal/ingestion/ratecontrol_test.go @@ -0,0 +1,143 @@ +package ingestion + +import ( + "testing" + "time" +) + +type rateSend struct { + mac string + rate int +} + +func newTestRC() (*RateController, *[]rateSend) { + sent := &[]rateSend{} + rc := NewRateController(func(mac string, rateHz int) { + *sent = append(*sent, rateSend{mac, rateHz}) + }) + return rc, sent +} + +func TestIdleToActive(t *testing.T) { + rc, sent := newTestRC() + + rc.OnMotionState("AA:BB:CC:DD:EE:FF", true) + + if len(*sent) != 1 { + t.Fatalf("expected 1 config send, got %d", len(*sent)) + } + if (*sent)[0].rate != RateActive { + t.Errorf("expected RateActive (%d), got %d", RateActive, (*sent)[0].rate) + } + if (*sent)[0].mac != "AA:BB:CC:DD:EE:FF" { + t.Errorf("unexpected mac %q", (*sent)[0].mac) + } +} + +func TestActiveNoRedundantSend(t *testing.T) { + rc, sent := newTestRC() + + // First motion event: ramp up + rc.OnMotionState("AA:BB:CC:DD:EE:FF", true) + // Subsequent motion events while active: no additional sends + rc.OnMotionState("AA:BB:CC:DD:EE:FF", true) + rc.OnMotionState("AA:BB:CC:DD:EE:FF", true) + + if len(*sent) != 1 { + t.Errorf("expected 1 send, got %d (redundant sends should be suppressed)", len(*sent)) + } +} + +func TestNoSendOnNoMotion(t *testing.T) { + rc, sent := newTestRC() + + rc.OnMotionState("AA:BB:CC:DD:EE:FF", false) + + if len(*sent) != 0 { + t.Errorf("expected 0 sends for no-motion event, got %d", len(*sent)) + } +} + +func TestIdleTimeoutDropsRate(t *testing.T) { + rc, sent := newTestRC() + + // Ramp up + rc.OnMotionState("AA:BB:CC:DD:EE:FF", true) + + // Manually set lastMotionAt far in the past to simulate timeout + rc.mu.Lock() + rc.nodes["AA:BB:CC:DD:EE:FF"].lastMotionAt = time.Now().Add(-idleTimeout - time.Second) + rc.mu.Unlock() + + rc.checkIdleTimeouts() + + if len(*sent) != 2 { + t.Fatalf("expected 2 sends (active + idle), got %d", len(*sent)) + } + if (*sent)[1].rate != RateIdle { + t.Errorf("expected RateIdle (%d) after timeout, got %d", RateIdle, (*sent)[1].rate) + } +} + +func TestIdleTimeoutNotTriggeredEarly(t *testing.T) { + rc, sent := newTestRC() + + rc.OnMotionState("AA:BB:CC:DD:EE:FF", true) + initialSends := len(*sent) + + // Timeout has not elapsed + rc.checkIdleTimeouts() + + if len(*sent) != initialSends { + t.Errorf("expected no new sends before timeout, got %d new sends", len(*sent)-initialSends) + } +} + +func TestMotionHint(t *testing.T) { + rc, sent := newTestRC() + + rc.OnMotionHint("AA:BB:CC:DD:EE:FF") + + if len(*sent) != 1 || (*sent)[0].rate != RateActive { + t.Errorf("OnMotionHint should ramp to active; sends=%v", *sent) + } +} + +func TestActiveToIdleAndBackToActive(t *testing.T) { + rc, sent := newTestRC() + + // Ramp up + rc.OnMotionState("AA:BB:CC:DD:EE:FF", true) + + // Force timeout + rc.mu.Lock() + rc.nodes["AA:BB:CC:DD:EE:FF"].lastMotionAt = time.Now().Add(-idleTimeout - time.Second) + rc.mu.Unlock() + rc.checkIdleTimeouts() + + // Motion detected again + rc.OnMotionState("AA:BB:CC:DD:EE:FF", true) + + // Should have: active, idle, active + if len(*sent) != 3 { + t.Fatalf("expected 3 sends, got %d: %v", len(*sent), *sent) + } + if (*sent)[2].rate != RateActive { + t.Errorf("expected third send to be RateActive, got %d", (*sent)[2].rate) + } +} + +func TestNodeDisconnectClearsState(t *testing.T) { + rc, _ := newTestRC() + + rc.OnMotionState("AA:BB:CC:DD:EE:FF", true) + rc.OnNodeDisconnected("AA:BB:CC:DD:EE:FF") + + rc.mu.Lock() + _, exists := rc.nodes["AA:BB:CC:DD:EE:FF"] + rc.mu.Unlock() + + if exists { + t.Error("node state should be removed after disconnect") + } +} diff --git a/mothership/internal/ingestion/server.go b/mothership/internal/ingestion/server.go index 2c315ee..dd83e1b 100644 --- a/mothership/internal/ingestion/server.go +++ b/mothership/internal/ingestion/server.go @@ -9,6 +9,7 @@ import ( "time" "github.com/gorilla/websocket" + "github.com/spaxel/mothership/internal/signal" ) // CSIBroadcaster is a callback for broadcasting CSI frames to dashboard @@ -19,11 +20,32 @@ type CSIBroadcaster interface { BroadcastLinkActive(linkID, nodeMAC, peerMAC string) } +// MotionBroadcaster broadcasts motion state changes to dashboard clients. +type MotionBroadcaster interface { + BroadcastMotionState(states []MotionStateItem) +} + +// MotionStateItem represents a single link's current motion state. +type MotionStateItem struct { + LinkID string `json:"link_id"` + MotionDetected bool `json:"motion_detected"` + DeltaRMS float64 `json:"delta_rms"` +} + +// ReplayAppender appends raw CSI frames to a persistent store. +type ReplayAppender interface { + Append(recvTimeNS int64, rawFrame []byte) error +} + // Server manages WebSocket connections from ESP32 nodes type Server struct { - mu sync.RWMutex + mu sync.RWMutex connections map[string]*NodeConnection // keyed by MAC - links map[string]*RingBuffer // keyed by "nodeMAC:peerMAC" + links map[string]*RingBuffer // keyed by "nodeMAC:peerMAC" + + // Motion state per link (for change detection and state queries) + linkMotionState map[string]bool // linkID -> motionDetected + linkDeltaRMS map[string]float64 // linkID -> smoothDeltaRMS // Malformed frame tracking per connection malformedCounts map[string]*malformedCounter @@ -34,19 +56,23 @@ type Server struct { // Shutdown state shutdown bool - // Dashboard broadcaster (optional) + // Optional pipeline components (set via setters) dashboardBroadcaster CSIBroadcaster + motionBroadcaster MotionBroadcaster + processorMgr *signal.ProcessorManager + replayStore ReplayAppender + rateCtrl *RateController } // NodeConnection tracks state for a connected node type NodeConnection struct { - MAC string - Conn *websocket.Conn - Hello *HelloMessage - LastHealth *HealthMessage - LastHealthTime time.Time - ConnectedAt time.Time - LastFrameTime time.Time + MAC string + Conn *websocket.Conn + Hello *HelloMessage + LastHealth *HealthMessage + LastHealthTime time.Time + ConnectedAt time.Time + LastFrameTime time.Time // Write mutex for thread-safe sends writeMu sync.Mutex @@ -74,9 +100,10 @@ func NewServer() *Server { return &Server{ connections: make(map[string]*NodeConnection), links: make(map[string]*RingBuffer), + linkMotionState: make(map[string]bool), + linkDeltaRMS: make(map[string]float64), malformedCounts: make(map[string]*malformedCounter), upgrader: websocket.Upgrader{ - // Allow all origins for development (TODO: restrict in production) CheckOrigin: func(r *http.Request) bool { return true }, @@ -93,25 +120,60 @@ func (s *Server) SetDashboardBroadcaster(broadcaster CSIBroadcaster) { s.mu.Unlock() } +// SetMotionBroadcaster sets the callback for broadcasting motion state changes. +func (s *Server) SetMotionBroadcaster(mb MotionBroadcaster) { + s.mu.Lock() + s.motionBroadcaster = mb + s.mu.Unlock() +} + +// SetProcessorManager sets the signal processing pipeline. +func (s *Server) SetProcessorManager(pm *signal.ProcessorManager) { + s.mu.Lock() + s.processorMgr = pm + s.mu.Unlock() +} + +// SetReplayStore sets the disk-backed recording store. +func (s *Server) SetReplayStore(store ReplayAppender) { + s.mu.Lock() + s.replayStore = store + s.mu.Unlock() +} + +// SetRateController sets the adaptive rate controller. +func (s *Server) SetRateController(rc *RateController) { + s.mu.Lock() + s.rateCtrl = rc + s.mu.Unlock() +} + +// SendConfigToMAC sends a rate config command to a connected node by MAC. +func (s *Server) SendConfigToMAC(mac string, rateHz int) { + s.mu.RLock() + nc, ok := s.connections[mac] + s.mu.RUnlock() + if !ok { + return + } + s.sendConfig(nc, rateHz, 0, 0) +} + // HandleNodeWS handles WebSocket connections at /ws/node func (s *Server) HandleNodeWS(w http.ResponseWriter, r *http.Request) { - // Upgrade HTTP connection to WebSocket conn, err := s.upgrader.Upgrade(w, r, nil) if err != nil { log.Printf("[WARN] WebSocket upgrade failed: %v", err) return } - // Set initial read deadline conn.SetReadDeadline(time.Now().Add(readDeadline)) - // Create connection state nc := &NodeConnection{ Conn: conn, ConnectedAt: time.Now(), } - // Wait for hello message (must be first) _, msg, err := conn.ReadMessage() if err != nil { log.Printf("[WARN] Failed to read hello: %v", err) @@ -119,7 +181,6 @@ func (s *Server) HandleNodeWS(w http.ResponseWriter, r *http.Request) { return } - // Parse as JSON (hello must be JSON) parsed, err := ParseJSONMessage(msg) if err != nil { s.sendReject(conn, "invalid hello format") @@ -137,9 +198,7 @@ func (s *Server) HandleNodeWS(w http.ResponseWriter, r *http.Request) { nc.MAC = hello.MAC nc.Hello = hello - // Register connection s.mu.Lock() - // Close existing connection from same MAC if present if existing, exists := s.connections[hello.MAC]; exists { existing.Conn.Close() } @@ -151,19 +210,14 @@ func (s *Server) HandleNodeWS(w http.ResponseWriter, r *http.Request) { log.Printf("[INFO] Node connected: MAC=%s firmware=%s chip=%s", hello.MAC, hello.FirmwareVersion, hello.Chip) - // Broadcast node connected to dashboard if broadcaster != nil { broadcaster.BroadcastNodeConnected(hello.MAC, hello.FirmwareVersion, hello.Chip) } - // Send initial role and config s.sendRole(nc, "rx", "") - s.sendConfig(nc, 20, 0, 0) // 20 Hz default + s.sendConfig(nc, 20, 0, 0) - // Start ping goroutine go s.pingLoop(nc) - - // Message handling loop s.handleMessages(nc) } @@ -175,18 +229,20 @@ func (s *Server) handleMessages(nc *NodeConnection) { delete(s.connections, nc.MAC) delete(s.malformedCounts, nc.MAC) broadcaster := s.dashboardBroadcaster + rateCtrl := s.rateCtrl s.mu.Unlock() log.Printf("[INFO] Node disconnected: MAC=%s", nc.MAC) - // Broadcast node disconnected to dashboard if broadcaster != nil { broadcaster.BroadcastNodeDisconnected(nc.MAC) } + if rateCtrl != nil { + rateCtrl.OnNodeDisconnected(nc.MAC) + } }() for { - // Reset read deadline on each message nc.Conn.SetReadDeadline(time.Now().Add(readDeadline)) messageType, data, err := nc.Conn.ReadMessage() @@ -213,10 +269,22 @@ func (s *Server) handleBinaryFrame(nc *NodeConnection, data []byte) { return } - // Update last frame time nc.LastFrameTime = time.Now() + recvTime := nc.LastFrameTime - // Get or create ring buffer for this link + s.mu.RLock() + replay := s.replayStore + pm := s.processorMgr + s.mu.RUnlock() + + // 1. Record raw frame to disk before any processing. + if replay != nil { + if err := replay.Append(recvTime.UnixNano(), data); err != nil { + log.Printf("[WARN] Replay append error: %v", err) + } + } + + // 2. Get or create ring buffer. linkID := frame.LinkID() s.mu.Lock() ring, exists := s.links[linkID] @@ -228,26 +296,54 @@ func (s *Server) handleBinaryFrame(nc *NodeConnection, data []byte) { broadcaster := s.dashboardBroadcaster s.mu.Unlock() - // Push frame to ring buffer - ring.Push(frame, time.Now()) + ring.Push(frame, recvTime) - // Broadcast to dashboard clients if broadcaster != nil { - // Forward raw binary frame to dashboard broadcaster.BroadcastCSI(frame.MACString(), frame.PeerMACString(), data) - - // Notify of new link if isNewLink { broadcaster.BroadcastLinkActive(linkID, frame.MACString(), frame.PeerMACString()) } } + + // 3. Signal processing pipeline. + if pm != nil && int(frame.NSub) > 0 { + result, err := pm.Process(linkID, frame.Payload, frame.RSSI, int(frame.NSub), recvTime) + if err != nil { + log.Printf("[DEBUG] Signal processing error for %s: %v", linkID, err) + return + } + + motionDetected := result.Features.MotionDetected + deltaRMS := result.Features.SmoothDeltaRMS + + // Check if motion state changed. + s.mu.Lock() + prev := s.linkMotionState[linkID] + stateChanged := prev != motionDetected + s.linkMotionState[linkID] = motionDetected + s.linkDeltaRMS[linkID] = deltaRMS + mb := s.motionBroadcaster + rateCtrl := s.rateCtrl + s.mu.Unlock() + + if stateChanged && mb != nil { + mb.BroadcastMotionState([]MotionStateItem{{ + LinkID: linkID, + MotionDetected: motionDetected, + DeltaRMS: deltaRMS, + }}) + } + + if rateCtrl != nil { + rateCtrl.OnMotionState(nc.MAC, motionDetected) + } + } } // handleJSONMessage processes a JSON control message func (s *Server) handleJSONMessage(nc *NodeConnection, data []byte) { parsed, err := ParseJSONMessage(data) if err != nil { - // Unknown types are silently ignored per protocol return } @@ -255,13 +351,17 @@ func (s *Server) handleJSONMessage(nc *NodeConnection, data []byte) { case *HealthMessage: nc.LastHealth = msg nc.LastHealthTime = time.Now() - // TODO: expose health metrics case *BLEMessage: // TODO: forward BLE data to identity matcher case *MotionHintMessage: - // TODO: trigger adaptive rate changes + s.mu.RLock() + rateCtrl := s.rateCtrl + s.mu.RUnlock() + if rateCtrl != nil { + rateCtrl.OnMotionHint(nc.MAC) + } case *OTAStatusMessage: // TODO: track OTA progress @@ -278,7 +378,6 @@ func (s *Server) recordMalformed(mac string) { return } - // Reset counter if window has passed if time.Since(counter.firstSeen) > malformedWindow { counter.count = 0 counter.firstSeen = time.Now() @@ -286,12 +385,10 @@ func (s *Server) recordMalformed(mac string) { counter.count++ - // Warn at 100 if counter.count == malformedWarnThreshold { log.Printf("[WARN] Node %s sending malformed CSI frames (count=%d)", mac, counter.count) } - // Close at 1000 if counter.count >= malformedCloseThreshold { log.Printf("[ERROR] Node %s exceeded malformed frame threshold, closing connection", mac) if nc, exists := s.connections[mac]; exists { @@ -311,10 +408,9 @@ func (s *Server) pingLoop(nc *NodeConnection) { nc.writeMu.Unlock() if err != nil { - return // Connection closed + return } - // Check for shutdown s.mu.RLock() shutdown := s.shutdown s.mu.RUnlock() @@ -365,7 +461,6 @@ func (s *Server) Shutdown(ctx context.Context) { s.mu.Lock() s.shutdown = true - // Send shutdown message to all connected nodes shutdownMsg := ShutdownMessage{Type: "shutdown", ReconnectInMS: 30000} data, _ := json.Marshal(shutdownMsg) @@ -431,7 +526,6 @@ func (s *Server) GetAllLinksInfo() []LinkInfo { links := make([]LinkInfo, 0, len(s.links)) for linkID := range s.links { - // Parse linkID format "nodeMAC:peerMAC" (17 chars + 1 colon + 17 chars) if len(linkID) >= 35 { links = append(links, LinkInfo{ ID: linkID, @@ -443,6 +537,22 @@ func (s *Server) GetAllLinksInfo() []LinkInfo { return links } +// GetAllMotionStates returns current motion state for all known links. +func (s *Server) GetAllMotionStates() []MotionStateItem { + s.mu.RLock() + defer s.mu.RUnlock() + + states := make([]MotionStateItem, 0, len(s.linkMotionState)) + for linkID, detected := range s.linkMotionState { + states = append(states, MotionStateItem{ + LinkID: linkID, + MotionDetected: detected, + DeltaRMS: s.linkDeltaRMS[linkID], + }) + } + return states +} + // GetLinkBuffer returns the ring buffer for a specific link func (s *Server) GetLinkBuffer(nodeMAC, peerMAC string) *RingBuffer { linkID := nodeMAC + ":" + peerMAC diff --git a/mothership/internal/replay/store.go b/mothership/internal/replay/store.go new file mode 100644 index 0000000..1c41e16 --- /dev/null +++ b/mothership/internal/replay/store.go @@ -0,0 +1,232 @@ +// Package replay provides a disk-backed circular buffer for raw CSI frames. +// +// File layout: +// +// Header (32 bytes): +// magic[8] "SPAXLREP" +// writePos[8] uint64 LE — absolute file offset of next write +// oldestPos[8] uint64 LE — absolute file offset of oldest valid record (0 = empty) +// wrapPos[8] uint64 LE — writePos at last wrap point (0 = no pending wrap) +// +// Record (10 + frameLen bytes): +// recvTimeNS[8] int64 LE — Unix nanosecond receive timestamp +// frameLen[2] uint16 LE — length of following frame bytes +// frameData[N] raw CSI frame bytes +// +// Eviction: when the oldest record's successor would reach wrapPos, oldest wraps +// to headerSize. New writes evict oldest records as needed to make room. +package replay + +import ( + "encoding/binary" + "errors" + "os" + "sync" +) + +const ( + fileMagic = "SPAXLREP" + headerSize = int64(32) // magic(8) + writePos(8) + oldestPos(8) + wrapPos(8) + recordOverhead = int64(10) // recvTimeNS(8) + frameLen(2) + maxFrameBytes = int64(280) // per plan: max CSI frame = 24 + 128*2 + + // DefaultMaxMB is the default recording buffer capacity in megabytes (~48h at 20 Hz, 20 links). + DefaultMaxMB = 360 +) + +// RecordingStore is a disk-backed circular buffer for raw CSI frames. +// It is safe for concurrent use. +type RecordingStore struct { + mu sync.Mutex + f *os.File + fileSize int64 // total file size including header + writePos int64 // absolute file offset of next write + oldestPos int64 // absolute file offset of oldest valid record (0 = empty) + wrapPos int64 // writePos at time of last wrap (0 = no pending wrap) +} + +// NewRecordingStore opens or creates a recording store at path. +// maxMB is the data capacity; pass 0 to use DefaultMaxMB. +func NewRecordingStore(path string, maxMB int) (*RecordingStore, error) { + if maxMB <= 0 { + maxMB = DefaultMaxMB + } + fileSize := headerSize + int64(maxMB)*1024*1024 + + if fileSize-headerSize < maxFrameBytes+recordOverhead { + return nil, errors.New("replay: maxMB too small for a single record") + } + + f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + return nil, err + } + + s := &RecordingStore{ + f: f, + fileSize: fileSize, + } + + info, err := f.Stat() + if err != nil { + f.Close() + return nil, err + } + + if info.Size() >= headerSize { + if herr := s.readHeader(); herr == nil && s.headerValid() { + // Grow file to new size if needed + if info.Size() < fileSize { + if terr := f.Truncate(fileSize); terr != nil { + f.Close() + return nil, terr + } + } + return s, nil + } + } + + // Fresh store + s.writePos = headerSize + s.oldestPos = 0 + s.wrapPos = 0 + if err := f.Truncate(fileSize); err != nil { + f.Close() + return nil, err + } + if err := s.syncHeader(); err != nil { + f.Close() + return nil, err + } + return s, nil +} + +// Append writes a raw CSI frame to the store. +func (s *RecordingStore) Append(recvTimeNS int64, rawFrame []byte) error { + frameLen := int64(len(rawFrame)) + if frameLen > maxFrameBytes { + return errors.New("replay: frame exceeds maximum size") + } + recordSize := recordOverhead + frameLen + if recordSize > s.fileSize-headerSize { + return errors.New("replay: buffer too small for record") + } + + s.mu.Lock() + defer s.mu.Unlock() + + // Wrap writePos if record won't fit before end of file. + if s.writePos+recordSize > s.fileSize { + s.wrapPos = s.writePos + s.writePos = headerSize + } + + // Evict oldest records that fall within [writePos, writePos+recordSize). + for s.hasData() && s.oldestPos >= s.writePos && s.oldestPos < s.writePos+recordSize { + if err := s.evictOne(); err != nil { + return err + } + } + + wasEmpty := !s.hasData() + + // Build record. + buf := make([]byte, recordSize) + binary.LittleEndian.PutUint64(buf[0:8], uint64(recvTimeNS)) + binary.LittleEndian.PutUint16(buf[8:10], uint16(frameLen)) + copy(buf[10:], rawFrame) + + if _, err := s.f.WriteAt(buf, s.writePos); err != nil { + return err + } + + if wasEmpty { + s.oldestPos = s.writePos + } + s.writePos += recordSize + + return s.syncHeader() +} + +// WritePos returns the current write position (for diagnostics). +func (s *RecordingStore) WritePos() int64 { + s.mu.Lock() + defer s.mu.Unlock() + return s.writePos +} + +// Close closes the underlying file. +func (s *RecordingStore) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + return s.f.Close() +} + +// hasData reports whether there are any valid records. +func (s *RecordingStore) hasData() bool { + return s.oldestPos != 0 +} + +// evictOne advances oldestPos past the oldest record. +func (s *RecordingStore) evictOne() error { + if !s.hasData() { + return nil + } + + var lenBuf [2]byte + if _, err := s.f.ReadAt(lenBuf[:], s.oldestPos+8); err != nil { + return err + } + frameLen := int64(binary.LittleEndian.Uint16(lenBuf[:])) + if frameLen > maxFrameBytes { + // Corrupted record; reset state to recover. + s.oldestPos = 0 + s.wrapPos = 0 + return nil + } + + nextPos := s.oldestPos + recordOverhead + frameLen + + // When oldest has consumed the right arc, wrap it to the data start. + if s.wrapPos != 0 && nextPos >= s.wrapPos { + nextPos = headerSize + s.wrapPos = 0 + } + + if nextPos == s.writePos { + s.oldestPos = 0 // buffer is now empty + } else { + s.oldestPos = nextPos + } + return nil +} + +func (s *RecordingStore) headerValid() bool { + return s.writePos >= headerSize && s.writePos <= s.fileSize && + (s.oldestPos == 0 || (s.oldestPos >= headerSize && s.oldestPos <= s.fileSize)) && + (s.wrapPos == 0 || (s.wrapPos >= headerSize && s.wrapPos <= s.fileSize)) +} + +func (s *RecordingStore) readHeader() error { + var buf [32]byte + if _, err := s.f.ReadAt(buf[:], 0); err != nil { + return err + } + if string(buf[0:8]) != fileMagic { + return errors.New("replay: invalid magic") + } + s.writePos = int64(binary.LittleEndian.Uint64(buf[8:16])) + s.oldestPos = int64(binary.LittleEndian.Uint64(buf[16:24])) + s.wrapPos = int64(binary.LittleEndian.Uint64(buf[24:32])) + return nil +} + +func (s *RecordingStore) syncHeader() error { + var buf [32]byte + copy(buf[0:8], fileMagic) + binary.LittleEndian.PutUint64(buf[8:16], uint64(s.writePos)) + binary.LittleEndian.PutUint64(buf[16:24], uint64(s.oldestPos)) + binary.LittleEndian.PutUint64(buf[24:32], uint64(s.wrapPos)) + _, err := s.f.WriteAt(buf[:], 0) + return err +} diff --git a/mothership/internal/replay/store_test.go b/mothership/internal/replay/store_test.go new file mode 100644 index 0000000..de264a0 --- /dev/null +++ b/mothership/internal/replay/store_test.go @@ -0,0 +1,205 @@ +package replay + +import ( + "os" + "path/filepath" + "testing" +) + +func tempStore(t *testing.T, maxMB int) (*RecordingStore, string) { + t.Helper() + path := filepath.Join(t.TempDir(), "test_replay.bin") + s, err := NewRecordingStore(path, maxMB) + if err != nil { + t.Fatalf("NewRecordingStore: %v", err) + } + return s, path +} + +func makeFrame(size int) []byte { + b := make([]byte, size) + for i := range b { + b[i] = byte(i % 251) + } + return b +} + +func TestNewStore(t *testing.T) { + s, _ := tempStore(t, 1) + defer s.Close() + + if s.writePos != headerSize { + t.Errorf("writePos = %d, want %d", s.writePos, headerSize) + } + if s.hasData() { + t.Error("new store should be empty") + } +} + +func TestBasicAppend(t *testing.T) { + s, _ := tempStore(t, 1) + defer s.Close() + + frame := makeFrame(152) // typical 64-subcarrier frame + if err := s.Append(1000, frame); err != nil { + t.Fatalf("Append: %v", err) + } + + expected := headerSize + recordOverhead + int64(len(frame)) + if s.writePos != expected { + t.Errorf("writePos = %d, want %d", s.writePos, expected) + } + if !s.hasData() { + t.Error("store should have data after append") + } + if s.oldestPos != headerSize { + t.Errorf("oldestPos = %d, want %d", s.oldestPos, headerSize) + } +} + +func TestMultipleAppends(t *testing.T) { + s, _ := tempStore(t, 1) + defer s.Close() + + frame := makeFrame(152) + for i := 0; i < 5; i++ { + if err := s.Append(int64(i*1000), frame); err != nil { + t.Fatalf("Append %d: %v", i, err) + } + } + + expectedPos := headerSize + 5*(recordOverhead+int64(len(frame))) + if s.writePos != expectedPos { + t.Errorf("writePos = %d, want %d", s.writePos, expectedPos) + } +} + +func TestWrapAround(t *testing.T) { + // Use a tiny 1-byte "MB" which gives ~1 MB buffer. Use a custom small size via + // direct manipulation for testing. Instead, use a small-but-valid maxMB. + // 1 MB data area fits floor(1MB / (10+152)) = ~6501 frames. + s, _ := tempStore(t, 1) + defer s.Close() + + frame := makeFrame(152) + recordSize := recordOverhead + int64(len(frame)) + + // Calculate how many records fit before a wrap + dataArea := s.fileSize - headerSize + recsBeforeWrap := int(dataArea / recordSize) // integer division + + // Fill to just before wrap + for i := 0; i < recsBeforeWrap; i++ { + if err := s.Append(int64(i), frame); err != nil { + t.Fatalf("Append %d before wrap: %v", i, err) + } + } + + beforeWrapPos := s.writePos + firstOldest := s.oldestPos + + // One more append should trigger wrap + eviction + if err := s.Append(int64(recsBeforeWrap), frame); err != nil { + t.Fatalf("Append triggering wrap: %v", err) + } + + // writePos should have wrapped and advanced from headerSize + if s.writePos >= beforeWrapPos { + t.Errorf("writePos %d should have wrapped (was %d before wrap)", s.writePos, beforeWrapPos) + } + // oldest should have advanced (eviction happened) + if s.oldestPos == firstOldest && s.oldestPos != 0 { + t.Errorf("oldestPos %d should have advanced after wrap (first was %d)", s.oldestPos, firstOldest) + } +} + +func TestCrashRecovery(t *testing.T) { + frame := makeFrame(152) + dir := t.TempDir() + path := filepath.Join(dir, "replay.bin") + + // Write some frames + s1, err := NewRecordingStore(path, 1) + if err != nil { + t.Fatal(err) + } + for i := 0; i < 3; i++ { + if err := s1.Append(int64(i*1000), frame); err != nil { + t.Fatalf("s1.Append %d: %v", i, err) + } + } + savedWrite := s1.writePos + savedOldest := s1.oldestPos + s1.Close() + + // Reopen should restore state + s2, err := NewRecordingStore(path, 1) + if err != nil { + t.Fatal(err) + } + defer s2.Close() + + if s2.writePos != savedWrite { + t.Errorf("writePos after reopen = %d, want %d", s2.writePos, savedWrite) + } + if s2.oldestPos != savedOldest { + t.Errorf("oldestPos after reopen = %d, want %d", s2.oldestPos, savedOldest) + } +} + +func TestEvictionMaintainsData(t *testing.T) { + // Use a very small buffer: headerSize + 3 records fit + // We do this by checking eviction logic directly, using + // a tiny 1MB store (already tested above). Simulate eviction + // explicitly by calling evictOne after appending. + s, _ := tempStore(t, 1) + defer s.Close() + + frame := makeFrame(100) + + // Fill buffer just past the point where eviction must occur + dataArea := s.fileSize - headerSize + recordSize := recordOverhead + int64(len(frame)) + count := int(dataArea/recordSize) + 5 // force wraps + + for i := 0; i < count; i++ { + if err := s.Append(int64(i), frame); err != nil { + t.Fatalf("Append %d: %v", i, err) + } + } + + // Store should still be valid (not returning errors) + if s.writePos < headerSize || s.writePos > s.fileSize { + t.Errorf("writePos %d out of range [%d, %d]", s.writePos, headerSize, s.fileSize) + } +} + +func TestInvalidMagicStartsFresh(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "bad.bin") + + // Write garbage + if err := os.WriteFile(path, []byte("GARBAGE_DATA_123456789012345678901234567890"), 0644); err != nil { + t.Fatal(err) + } + + s, err := NewRecordingStore(path, 1) + if err != nil { + t.Fatalf("should recover from bad magic: %v", err) + } + defer s.Close() + + if s.writePos != headerSize { + t.Errorf("writePos = %d after bad magic, want %d", s.writePos, headerSize) + } +} + +func TestFrameTooLarge(t *testing.T) { + s, _ := tempStore(t, 1) + defer s.Close() + + oversized := make([]byte, maxFrameBytes+1) + if err := s.Append(0, oversized); err == nil { + t.Error("expected error for oversized frame") + } +}