feat(mothership): complete Phase 2 signal processing & detection

- Dashboard presence indicator: per-link MOTION/CLEAR badges with global
  status bar indicator, motion_state WebSocket messages, amplitude chart
- CSI recording buffer: disk-backed circular buffer (replay/store.go) with
  magic-tagged binary format, wrap/eviction, 360 MB default (~48 h at 20 Hz)
- Adaptive sensing rate: RateController ramps nodes to 20 Hz on motion,
  drops to 2 Hz after 10 s idle; wires to SendConfigToMAC over WebSocket
- Fix: alias internal/signal as sigproc to avoid conflict with os/signal
- Fix: add GetAllMotionStates() to MockIngestionState in dashboard tests

All tests pass (signal, ingestion, replay, dashboard).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-03-26 22:27:24 -04:00
parent 973b0a0b62
commit de424a1f63
10 changed files with 1036 additions and 116 deletions

View file

@ -186,6 +186,44 @@
.link-item.selected {
background: rgba(79, 195, 247, 0.2);
}
/* Presence badge */
.presence-badge {
font-size: 10px;
font-weight: 600;
padding: 2px 6px;
border-radius: 3px;
letter-spacing: 0.5px;
}
.presence-badge.motion {
background: rgba(244, 67, 54, 0.25);
color: #ef5350;
}
.presence-badge.clear {
background: rgba(76, 175, 80, 0.2);
color: #66bb6a;
}
/* Overall presence indicator in status bar */
#presence-indicator {
font-size: 12px;
font-weight: 600;
padding: 3px 8px;
border-radius: 4px;
transition: background 0.3s, color 0.3s;
}
#presence-indicator.motion {
background: rgba(244, 67, 54, 0.3);
color: #ef5350;
}
#presence-indicator.clear {
background: rgba(76, 175, 80, 0.15);
color: #66bb6a;
}
</style>
</head>
<body>
@ -201,6 +239,9 @@
<div class="status-item">
<span>Links: <strong id="link-count">0</strong></span>
</div>
<div class="status-item">
<span id="presence-indicator" class="clear">CLEAR</span>
</div>
<div class="status-item" style="margin-left:auto;">
<span>FPS: <strong id="fps-counter">0</strong></span>
</div>

View file

@ -31,7 +31,7 @@
ws: null,
wsConnected: false,
nodes: new Map(), // MAC -> { mac, firmware, chip, lastSeen }
links: new Map(), // linkID -> { nodeMAC, peerMAC, lastFrame, lastCSI }
links: new Map(), // linkID -> { nodeMAC, peerMAC, lastFrame, lastCSI, motionDetected, deltaRMS }
selectedLinkID: null,
lastChartUpdate: 0,
frameCount: 0,
@ -220,14 +220,20 @@
}
if (msg.links) {
msg.links.forEach(link => {
const existing = state.links.get(link.id) || {};
state.links.set(link.id, {
nodeMAC: link.node_mac,
peerMAC: link.peer_mac,
lastFrame: Date.now(),
lastCSI: null
lastCSI: existing.lastCSI || null,
motionDetected: existing.motionDetected || false,
deltaRMS: existing.deltaRMS || 0
});
});
}
if (msg.motion_states) {
msg.motion_states.forEach(ms => applyMotionState(ms));
}
updateNodeList();
updateLinkList();
break;
@ -253,17 +259,40 @@
nodeMAC: msg.node_mac,
peerMAC: msg.peer_mac,
lastFrame: Date.now(),
lastCSI: null
lastCSI: null,
motionDetected: false,
deltaRMS: 0
});
updateLinkList();
}
break;
case 'motion_state':
// Targeted broadcast on state change
if (msg.links) {
let changed = false;
msg.links.forEach(ms => {
if (applyMotionState(ms)) changed = true;
});
if (changed) updateLinkList();
}
break;
default:
// Ignore unknown types (forward-compatible)
}
}
// applyMotionState updates a link's motion fields; returns true if it changed.
function applyMotionState(ms) {
const link = state.links.get(ms.link_id);
if (!link) return false;
const prev = link.motionDetected;
link.motionDetected = ms.motion_detected;
link.deltaRMS = ms.delta_rms || 0;
return prev !== ms.motion_detected;
}
function handleBinaryFrame(buffer) {
const frame = parseCSIFrame(buffer);
if (!frame) return;
@ -277,7 +306,9 @@
nodeMAC: frame.nodeMAC,
peerMAC: frame.peerMAC,
lastFrame: Date.now(),
lastCSI: null
lastCSI: null,
motionDetected: false,
deltaRMS: 0
};
state.links.set(linkID, link);
updateLinkList();
@ -387,6 +418,21 @@
container.innerHTML = html;
}
function updatePresenceIndicator() {
let anyMotion = false;
state.links.forEach(link => {
if (link.motionDetected) anyMotion = true;
});
const el = document.getElementById('presence-indicator');
if (anyMotion) {
el.className = 'motion';
el.textContent = 'MOTION';
} else {
el.className = 'clear';
el.textContent = 'CLEAR';
}
}
function updateLinkList() {
const container = document.getElementById('link-list');
document.getElementById('link-count').textContent = state.links.size;
@ -400,10 +446,12 @@
state.links.forEach((link, id) => {
const selected = state.selectedLinkID === id ? 'selected' : '';
const shortID = id.split(':').map(p => p.split(':').slice(-1)[0]).join('→');
const motionClass = link.motionDetected ? 'motion' : 'clear';
const motionLabel = link.motionDetected ? 'MOTION' : 'CLEAR';
html += `
<div class="link-item ${selected}" data-link-id="${id}">
<span>${shortID}</span>
<span style="color:#666">${link.nSub || 64} sub</span>
<span class="presence-badge ${motionClass}">${motionLabel}</span>
</div>
`;
});
@ -413,6 +461,8 @@
container.querySelectorAll('.link-item').forEach(el => {
el.addEventListener('click', () => selectLink(el.dataset.linkId));
});
updatePresenceIndicator();
}
function selectLink(linkID) {

View file

@ -10,6 +10,7 @@ import (
"os"
"os/signal"
"path/filepath"
"strconv"
"syscall"
"time"
@ -18,6 +19,8 @@ import (
"github.com/hashicorp/mdns"
"github.com/spaxel/mothership/internal/dashboard"
"github.com/spaxel/mothership/internal/ingestion"
"github.com/spaxel/mothership/internal/replay"
sigproc "github.com/spaxel/mothership/internal/signal"
)
// Build-time version injection
@ -31,6 +34,7 @@ type Config struct {
MDNSName string
MDNSEnabled bool
LogLevel string
ReplayMaxMB int
}
func main() {
@ -38,20 +42,16 @@ func main() {
log.Printf("[INFO] Spaxel mothership v%s starting", version)
log.Printf("[DEBUG] Config: bind=%s data=%s static=%s mdns=%s", cfg.BindAddr, cfg.DataDir, cfg.StaticDir, cfg.MDNSName)
// Create context with cancellation for graceful shutdown
_, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Set up signal handling
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
// Create router
r := chi.NewRouter()
r.Use(middleware.Logger)
r.Use(middleware.Recoverer)
// Health check endpoint
r.Get("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
@ -62,52 +62,75 @@ func main() {
ingestSrv := ingestion.NewServer()
r.HandleFunc("/ws/node", ingestSrv.HandleNodeWS)
// Create dashboard hub and server
// Signal processing pipeline
pm := sigproc.NewProcessorManager(sigproc.ProcessorManagerConfig{
NSub: 64,
FusionRate: 10.0,
Tau: 30.0,
})
ingestSrv.SetProcessorManager(pm)
// Replay recording store
if err := os.MkdirAll(cfg.DataDir, 0755); err != nil {
log.Printf("[WARN] Failed to create data dir %s: %v", cfg.DataDir, err)
} else {
store, err := replay.NewRecordingStore(filepath.Join(cfg.DataDir, "csi_replay.bin"), cfg.ReplayMaxMB)
if err != nil {
log.Printf("[WARN] Failed to open replay store: %v (CSI recording disabled)", err)
} else {
ingestSrv.SetReplayStore(store)
defer store.Close()
log.Printf("[INFO] CSI replay store at %s (%d MB max)", filepath.Join(cfg.DataDir, "csi_replay.bin"), cfg.ReplayMaxMB)
}
}
// Adaptive rate controller
rateCtrl := ingestion.NewRateController(func(mac string, rateHz int) {
ingestSrv.SendConfigToMAC(mac, rateHz)
})
ingestSrv.SetRateController(rateCtrl)
go rateCtrl.Run(ctx)
// Dashboard hub and server
dashboardHub := dashboard.NewHub()
dashboardSrv := dashboard.NewServer(dashboardHub)
// Connect ingestion to dashboard (for state queries)
dashboardHub.SetIngestionState(ingestSrv)
// Start dashboard hub in background
// Wire ingestion → dashboard for CSI and motion broadcasts
ingestSrv.SetDashboardBroadcaster(dashboardHub)
ingestSrv.SetMotionBroadcaster(dashboardHub)
go dashboardHub.Run()
// Dashboard WebSocket endpoint
r.HandleFunc("/ws/dashboard", dashboardSrv.HandleDashboardWS)
// Serve dashboard static files
staticDir := cfg.StaticDir
if staticDir == "" {
// Default: look for dashboard directory relative to binary or cwd
staticDir = findDashboardDir()
}
if staticDir != "" {
// Check if directory exists
if _, err := os.Stat(staticDir); err == nil {
log.Printf("[INFO] Serving dashboard from %s", staticDir)
r.Get("/*", func(w http.ResponseWriter, r *http.Request) {
// Try to serve static file, fall back to index.html for SPA routing
path := filepath.Join(staticDir, r.URL.Path)
// If path is a directory, serve index.html
if info, err := os.Stat(path); err == nil && info.IsDir() {
path = filepath.Join(path, "index.html")
}
// If file exists, serve it
if _, err := os.Stat(path); err == nil {
http.ServeFile(w, r, path)
return
}
// Fall back to index.html for SPA routing (except for /js/* paths)
if filepath.Ext(r.URL.Path) == "" {
http.ServeFile(w, r, filepath.Join(staticDir, "index.html"))
return
}
// File not found
http.NotFound(w, r)
})
} else {
@ -117,7 +140,7 @@ func main() {
log.Printf("[WARN] No dashboard directory found, static files not served")
}
// Start mDNS advertisement
// mDNS advertisement
var mdnsServer *mdns.Server
if cfg.MDNSEnabled {
service, err := mdns.NewMDNSService(
@ -141,15 +164,13 @@ func main() {
}
}
// Start HTTP server
srv := &http.Server{
Addr: cfg.BindAddr,
Handler: r,
ReadTimeout: 10 * time.Second,
WriteTimeout: 30 * time.Second, // Longer for WebSocket
WriteTimeout: 30 * time.Second,
}
// Run server in goroutine
go func() {
log.Printf("[INFO] HTTP server listening on %s", cfg.BindAddr)
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
@ -157,28 +178,24 @@ func main() {
}
}()
// Wait for shutdown signal
sig := <-sigChan
log.Printf("[INFO] Received signal %v, initiating graceful shutdown", sig)
// Shutdown sequence
cancel()
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer shutdownCancel()
// Stop accepting new connections
if err := srv.Shutdown(shutdownCtx); err != nil {
log.Printf("[ERROR] HTTP server shutdown error: %v", err)
}
// Close ingestion server (drains connections)
ingestSrv.Shutdown(shutdownCtx)
// Stop mDNS
if mdnsServer != nil {
mdnsServer.Shutdown()
}
cancel()
log.Printf("[INFO] Shutdown complete")
}
@ -189,6 +206,7 @@ func parseConfig() Config {
mdnsName := getEnv("SPAXEL_MDNS_NAME", "spaxel")
mdnsEnabled := getEnvBool("SPAXEL_MDNS_ENABLED", true)
logLevel := getEnv("SPAXEL_LOG_LEVEL", "info")
replayMaxMB := getEnvInt("SPAXEL_REPLAY_MAX_MB", replay.DefaultMaxMB)
flag.StringVar(&bindAddr, "bind", bindAddr, "Listen address")
flag.StringVar(&dataDir, "data", dataDir, "Data directory")
@ -196,6 +214,7 @@ func parseConfig() Config {
flag.StringVar(&mdnsName, "mdns-name", mdnsName, "mDNS service name")
flag.BoolVar(&mdnsEnabled, "mdns", mdnsEnabled, "Enable mDNS advertisement")
flag.StringVar(&logLevel, "log-level", logLevel, "Log level (debug, info, warn, error)")
flag.IntVar(&replayMaxMB, "replay-max-mb", replayMaxMB, "CSI replay buffer size in MB")
flag.Parse()
return Config{
@ -205,6 +224,7 @@ func parseConfig() Config {
MDNSName: mdnsName,
MDNSEnabled: mdnsEnabled,
LogLevel: logLevel,
ReplayMaxMB: replayMaxMB,
}
}
@ -222,14 +242,21 @@ func getEnvBool(key string, defaultVal bool) bool {
return defaultVal
}
// findDashboardDir attempts to locate the dashboard directory
func getEnvInt(key string, defaultVal int) int {
if val := os.Getenv(key); val != "" {
if n, err := strconv.Atoi(val); err == nil {
return n
}
}
return defaultVal
}
func findDashboardDir() string {
// Try common locations
candidates := []string{
"dashboard", // When running from repo root
"../dashboard", // When running from mothership/
"../../dashboard", // When running from mothership/cmd/mothership/
"/app/dashboard", // Docker container location
"dashboard",
"../dashboard",
"../../dashboard",
"/app/dashboard",
}
for _, dir := range candidates {

View file

@ -13,20 +13,21 @@ import (
// Hub manages all dashboard client connections and broadcasts
type Hub struct {
mu sync.RWMutex
clients map[*Client]struct{}
broadcast chan []byte
register chan *Client
unregister chan *Client
mu sync.RWMutex
clients map[*Client]struct{}
broadcast chan []byte
register chan *Client
unregister chan *Client
// Reference to ingestion server for state queries
ingestionState IngestionState
}
// IngestionState is an interface to query node/link state from ingestion
// IngestionState is an interface to query node/link/motion state from ingestion
type IngestionState interface {
GetConnectedNodesInfo() []ingestion.NodeInfo
GetAllLinksInfo() []ingestion.LinkInfo
GetAllMotionStates() []ingestion.MotionStateItem
}
// Client represents a dashboard WebSocket client
@ -64,8 +65,6 @@ func (h *Hub) Run() {
h.clients[client] = struct{}{}
h.mu.Unlock()
log.Printf("[INFO] Dashboard client connected (total: %d)", len(h.clients))
// Send initial state
h.sendInitialState(client)
case client := <-h.unregister:
@ -89,7 +88,6 @@ func (h *Hub) Run() {
h.mu.RUnlock()
case <-ticker.C:
// Periodic state broadcast
h.broadcastState()
}
}
@ -116,8 +114,6 @@ func (h *Hub) Broadcast(message []byte) {
// BroadcastCSI broadcasts a CSI frame to all dashboard clients
func (h *Hub) BroadcastCSI(nodeMAC, peerMAC string, data []byte) {
// For now, just forward the raw binary frame
// Dashboard clients will parse it
h.Broadcast(data)
}
@ -165,6 +161,17 @@ func (h *Hub) BroadcastLinkInactive(linkID string) {
h.Broadcast(data)
}
// BroadcastMotionState sends motion state for one or more links to all dashboard clients.
// Called on state changes (idle↔motion) so the dashboard updates immediately.
func (h *Hub) BroadcastMotionState(states []ingestion.MotionStateItem) {
msg := map[string]interface{}{
"type": "motion_state",
"links": states,
}
data, _ := json.Marshal(msg)
h.Broadcast(data)
}
func (h *Hub) sendInitialState(client *Client) {
h.mu.RLock()
state := h.ingestionState
@ -174,27 +181,12 @@ func (h *Hub) sendInitialState(client *Client) {
return
}
// Build state message
msg := map[string]interface{}{
"type": "state",
}
nodes := state.GetConnectedNodesInfo()
if nodes != nil {
msg["nodes"] = nodes
}
links := state.GetAllLinksInfo()
if links != nil {
msg["links"] = links
}
msg := h.buildStateMsg(state)
data, _ := json.Marshal(msg)
select {
case client.send <- data:
default:
// Buffer full, skip
}
}
@ -208,23 +200,27 @@ func (h *Hub) broadcastState() {
return
}
// Build state message
msg := h.buildStateMsg(state)
data, _ := json.Marshal(msg)
h.Broadcast(data)
}
func (h *Hub) buildStateMsg(state IngestionState) map[string]interface{} {
msg := map[string]interface{}{
"type": "state",
}
nodes := state.GetConnectedNodesInfo()
if nodes != nil {
if nodes := state.GetConnectedNodesInfo(); nodes != nil {
msg["nodes"] = nodes
}
links := state.GetAllLinksInfo()
if links != nil {
if links := state.GetAllLinksInfo(); links != nil {
msg["links"] = links
}
if motionStates := state.GetAllMotionStates(); len(motionStates) > 0 {
msg["motion_states"] = motionStates
}
data, _ := json.Marshal(msg)
h.Broadcast(data)
return msg
}
// ClientCount returns the number of connected dashboard clients

View file

@ -156,6 +156,10 @@ func (m *MockIngestionState) GetAllLinksInfo() []ingestion.LinkInfo {
return m.links
}
func (m *MockIngestionState) GetAllMotionStates() []ingestion.MotionStateItem {
return nil
}
func TestHub_InitialState(t *testing.T) {
hub := NewHub()
go hub.Run()

View file

@ -0,0 +1,112 @@
package ingestion
import (
"context"
"sync"
"time"
)
const (
// RateIdle is the CSI sampling rate (Hz) when no motion is detected.
RateIdle = 2
// RateActive is the CSI sampling rate (Hz) when motion is detected.
RateActive = 20
// idleTimeout is how long after the last motion event before dropping back to idle.
idleTimeout = 10 * time.Second
)
// nodeRateState tracks the adaptive rate state for a single node.
type nodeRateState struct {
active bool
lastMotionAt time.Time
}
// RateController manages per-node adaptive sensing rates. When motion is detected
// on a node's link, it ramps that node to RateActive (20 Hz). When no motion has
// been seen for idleTimeout, it drops back to RateIdle (2 Hz). The caller provides
// a configSender callback that sends the rate command to the node over WebSocket.
type RateController struct {
mu sync.Mutex
nodes map[string]*nodeRateState // keyed by node MAC
configSender func(nodeMAC string, rateHz int)
}
// NewRateController creates a RateController. configSender is called whenever a
// node's rate should change; it must be goroutine-safe.
func NewRateController(configSender func(nodeMAC string, rateHz int)) *RateController {
return &RateController{
nodes: make(map[string]*nodeRateState),
configSender: configSender,
}
}
// OnMotionState is called after each CSI frame is processed. If the node was idle
// and motion is now detected, it ramps up immediately.
func (rc *RateController) OnMotionState(nodeMAC string, motionDetected bool) {
if !motionDetected {
return
}
rc.mu.Lock()
defer rc.mu.Unlock()
ns := rc.getOrCreate(nodeMAC)
ns.lastMotionAt = time.Now()
if !ns.active {
ns.active = true
rc.configSender(nodeMAC, RateActive)
}
}
// OnMotionHint is called when the ESP32 sends a motion_hint message (on-device
// variance exceeded threshold). Treated identically to a detected motion event.
func (rc *RateController) OnMotionHint(nodeMAC string) {
rc.OnMotionState(nodeMAC, true)
}
// OnNodeDisconnected removes rate state for a disconnected node.
func (rc *RateController) OnNodeDisconnected(nodeMAC string) {
rc.mu.Lock()
delete(rc.nodes, nodeMAC)
rc.mu.Unlock()
}
// Run starts the background goroutine that enforces idle timeouts.
// It returns when ctx is cancelled.
func (rc *RateController) Run(ctx context.Context) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
rc.checkIdleTimeouts()
}
}
}
func (rc *RateController) checkIdleTimeouts() {
now := time.Now()
rc.mu.Lock()
defer rc.mu.Unlock()
for mac, ns := range rc.nodes {
if ns.active && now.Sub(ns.lastMotionAt) >= idleTimeout {
ns.active = false
rc.configSender(mac, RateIdle)
}
}
}
func (rc *RateController) getOrCreate(nodeMAC string) *nodeRateState {
if ns, ok := rc.nodes[nodeMAC]; ok {
return ns
}
ns := &nodeRateState{}
rc.nodes[nodeMAC] = ns
return ns
}

View file

@ -0,0 +1,143 @@
package ingestion
import (
"testing"
"time"
)
type rateSend struct {
mac string
rate int
}
func newTestRC() (*RateController, *[]rateSend) {
sent := &[]rateSend{}
rc := NewRateController(func(mac string, rateHz int) {
*sent = append(*sent, rateSend{mac, rateHz})
})
return rc, sent
}
func TestIdleToActive(t *testing.T) {
rc, sent := newTestRC()
rc.OnMotionState("AA:BB:CC:DD:EE:FF", true)
if len(*sent) != 1 {
t.Fatalf("expected 1 config send, got %d", len(*sent))
}
if (*sent)[0].rate != RateActive {
t.Errorf("expected RateActive (%d), got %d", RateActive, (*sent)[0].rate)
}
if (*sent)[0].mac != "AA:BB:CC:DD:EE:FF" {
t.Errorf("unexpected mac %q", (*sent)[0].mac)
}
}
func TestActiveNoRedundantSend(t *testing.T) {
rc, sent := newTestRC()
// First motion event: ramp up
rc.OnMotionState("AA:BB:CC:DD:EE:FF", true)
// Subsequent motion events while active: no additional sends
rc.OnMotionState("AA:BB:CC:DD:EE:FF", true)
rc.OnMotionState("AA:BB:CC:DD:EE:FF", true)
if len(*sent) != 1 {
t.Errorf("expected 1 send, got %d (redundant sends should be suppressed)", len(*sent))
}
}
func TestNoSendOnNoMotion(t *testing.T) {
rc, sent := newTestRC()
rc.OnMotionState("AA:BB:CC:DD:EE:FF", false)
if len(*sent) != 0 {
t.Errorf("expected 0 sends for no-motion event, got %d", len(*sent))
}
}
func TestIdleTimeoutDropsRate(t *testing.T) {
rc, sent := newTestRC()
// Ramp up
rc.OnMotionState("AA:BB:CC:DD:EE:FF", true)
// Manually set lastMotionAt far in the past to simulate timeout
rc.mu.Lock()
rc.nodes["AA:BB:CC:DD:EE:FF"].lastMotionAt = time.Now().Add(-idleTimeout - time.Second)
rc.mu.Unlock()
rc.checkIdleTimeouts()
if len(*sent) != 2 {
t.Fatalf("expected 2 sends (active + idle), got %d", len(*sent))
}
if (*sent)[1].rate != RateIdle {
t.Errorf("expected RateIdle (%d) after timeout, got %d", RateIdle, (*sent)[1].rate)
}
}
func TestIdleTimeoutNotTriggeredEarly(t *testing.T) {
rc, sent := newTestRC()
rc.OnMotionState("AA:BB:CC:DD:EE:FF", true)
initialSends := len(*sent)
// Timeout has not elapsed
rc.checkIdleTimeouts()
if len(*sent) != initialSends {
t.Errorf("expected no new sends before timeout, got %d new sends", len(*sent)-initialSends)
}
}
func TestMotionHint(t *testing.T) {
rc, sent := newTestRC()
rc.OnMotionHint("AA:BB:CC:DD:EE:FF")
if len(*sent) != 1 || (*sent)[0].rate != RateActive {
t.Errorf("OnMotionHint should ramp to active; sends=%v", *sent)
}
}
func TestActiveToIdleAndBackToActive(t *testing.T) {
rc, sent := newTestRC()
// Ramp up
rc.OnMotionState("AA:BB:CC:DD:EE:FF", true)
// Force timeout
rc.mu.Lock()
rc.nodes["AA:BB:CC:DD:EE:FF"].lastMotionAt = time.Now().Add(-idleTimeout - time.Second)
rc.mu.Unlock()
rc.checkIdleTimeouts()
// Motion detected again
rc.OnMotionState("AA:BB:CC:DD:EE:FF", true)
// Should have: active, idle, active
if len(*sent) != 3 {
t.Fatalf("expected 3 sends, got %d: %v", len(*sent), *sent)
}
if (*sent)[2].rate != RateActive {
t.Errorf("expected third send to be RateActive, got %d", (*sent)[2].rate)
}
}
func TestNodeDisconnectClearsState(t *testing.T) {
rc, _ := newTestRC()
rc.OnMotionState("AA:BB:CC:DD:EE:FF", true)
rc.OnNodeDisconnected("AA:BB:CC:DD:EE:FF")
rc.mu.Lock()
_, exists := rc.nodes["AA:BB:CC:DD:EE:FF"]
rc.mu.Unlock()
if exists {
t.Error("node state should be removed after disconnect")
}
}

View file

@ -9,6 +9,7 @@ import (
"time"
"github.com/gorilla/websocket"
"github.com/spaxel/mothership/internal/signal"
)
// CSIBroadcaster is a callback for broadcasting CSI frames to dashboard
@ -19,11 +20,32 @@ type CSIBroadcaster interface {
BroadcastLinkActive(linkID, nodeMAC, peerMAC string)
}
// MotionBroadcaster broadcasts motion state changes to dashboard clients.
type MotionBroadcaster interface {
BroadcastMotionState(states []MotionStateItem)
}
// MotionStateItem represents a single link's current motion state.
type MotionStateItem struct {
LinkID string `json:"link_id"`
MotionDetected bool `json:"motion_detected"`
DeltaRMS float64 `json:"delta_rms"`
}
// ReplayAppender appends raw CSI frames to a persistent store.
type ReplayAppender interface {
Append(recvTimeNS int64, rawFrame []byte) error
}
// Server manages WebSocket connections from ESP32 nodes
type Server struct {
mu sync.RWMutex
mu sync.RWMutex
connections map[string]*NodeConnection // keyed by MAC
links map[string]*RingBuffer // keyed by "nodeMAC:peerMAC"
links map[string]*RingBuffer // keyed by "nodeMAC:peerMAC"
// Motion state per link (for change detection and state queries)
linkMotionState map[string]bool // linkID -> motionDetected
linkDeltaRMS map[string]float64 // linkID -> smoothDeltaRMS
// Malformed frame tracking per connection
malformedCounts map[string]*malformedCounter
@ -34,19 +56,23 @@ type Server struct {
// Shutdown state
shutdown bool
// Dashboard broadcaster (optional)
// Optional pipeline components (set via setters)
dashboardBroadcaster CSIBroadcaster
motionBroadcaster MotionBroadcaster
processorMgr *signal.ProcessorManager
replayStore ReplayAppender
rateCtrl *RateController
}
// NodeConnection tracks state for a connected node
type NodeConnection struct {
MAC string
Conn *websocket.Conn
Hello *HelloMessage
LastHealth *HealthMessage
LastHealthTime time.Time
ConnectedAt time.Time
LastFrameTime time.Time
MAC string
Conn *websocket.Conn
Hello *HelloMessage
LastHealth *HealthMessage
LastHealthTime time.Time
ConnectedAt time.Time
LastFrameTime time.Time
// Write mutex for thread-safe sends
writeMu sync.Mutex
@ -74,9 +100,10 @@ func NewServer() *Server {
return &Server{
connections: make(map[string]*NodeConnection),
links: make(map[string]*RingBuffer),
linkMotionState: make(map[string]bool),
linkDeltaRMS: make(map[string]float64),
malformedCounts: make(map[string]*malformedCounter),
upgrader: websocket.Upgrader{
// Allow all origins for development (TODO: restrict in production)
CheckOrigin: func(r *http.Request) bool {
return true
},
@ -93,25 +120,60 @@ func (s *Server) SetDashboardBroadcaster(broadcaster CSIBroadcaster) {
s.mu.Unlock()
}
// SetMotionBroadcaster sets the callback for broadcasting motion state changes.
func (s *Server) SetMotionBroadcaster(mb MotionBroadcaster) {
s.mu.Lock()
s.motionBroadcaster = mb
s.mu.Unlock()
}
// SetProcessorManager sets the signal processing pipeline.
func (s *Server) SetProcessorManager(pm *signal.ProcessorManager) {
s.mu.Lock()
s.processorMgr = pm
s.mu.Unlock()
}
// SetReplayStore sets the disk-backed recording store.
func (s *Server) SetReplayStore(store ReplayAppender) {
s.mu.Lock()
s.replayStore = store
s.mu.Unlock()
}
// SetRateController sets the adaptive rate controller.
func (s *Server) SetRateController(rc *RateController) {
s.mu.Lock()
s.rateCtrl = rc
s.mu.Unlock()
}
// SendConfigToMAC sends a rate config command to a connected node by MAC.
func (s *Server) SendConfigToMAC(mac string, rateHz int) {
s.mu.RLock()
nc, ok := s.connections[mac]
s.mu.RUnlock()
if !ok {
return
}
s.sendConfig(nc, rateHz, 0, 0)
}
// HandleNodeWS handles WebSocket connections at /ws/node
func (s *Server) HandleNodeWS(w http.ResponseWriter, r *http.Request) {
// Upgrade HTTP connection to WebSocket
conn, err := s.upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("[WARN] WebSocket upgrade failed: %v", err)
return
}
// Set initial read deadline
conn.SetReadDeadline(time.Now().Add(readDeadline))
// Create connection state
nc := &NodeConnection{
Conn: conn,
ConnectedAt: time.Now(),
}
// Wait for hello message (must be first)
_, msg, err := conn.ReadMessage()
if err != nil {
log.Printf("[WARN] Failed to read hello: %v", err)
@ -119,7 +181,6 @@ func (s *Server) HandleNodeWS(w http.ResponseWriter, r *http.Request) {
return
}
// Parse as JSON (hello must be JSON)
parsed, err := ParseJSONMessage(msg)
if err != nil {
s.sendReject(conn, "invalid hello format")
@ -137,9 +198,7 @@ func (s *Server) HandleNodeWS(w http.ResponseWriter, r *http.Request) {
nc.MAC = hello.MAC
nc.Hello = hello
// Register connection
s.mu.Lock()
// Close existing connection from same MAC if present
if existing, exists := s.connections[hello.MAC]; exists {
existing.Conn.Close()
}
@ -151,19 +210,14 @@ func (s *Server) HandleNodeWS(w http.ResponseWriter, r *http.Request) {
log.Printf("[INFO] Node connected: MAC=%s firmware=%s chip=%s",
hello.MAC, hello.FirmwareVersion, hello.Chip)
// Broadcast node connected to dashboard
if broadcaster != nil {
broadcaster.BroadcastNodeConnected(hello.MAC, hello.FirmwareVersion, hello.Chip)
}
// Send initial role and config
s.sendRole(nc, "rx", "")
s.sendConfig(nc, 20, 0, 0) // 20 Hz default
s.sendConfig(nc, 20, 0, 0)
// Start ping goroutine
go s.pingLoop(nc)
// Message handling loop
s.handleMessages(nc)
}
@ -175,18 +229,20 @@ func (s *Server) handleMessages(nc *NodeConnection) {
delete(s.connections, nc.MAC)
delete(s.malformedCounts, nc.MAC)
broadcaster := s.dashboardBroadcaster
rateCtrl := s.rateCtrl
s.mu.Unlock()
log.Printf("[INFO] Node disconnected: MAC=%s", nc.MAC)
// Broadcast node disconnected to dashboard
if broadcaster != nil {
broadcaster.BroadcastNodeDisconnected(nc.MAC)
}
if rateCtrl != nil {
rateCtrl.OnNodeDisconnected(nc.MAC)
}
}()
for {
// Reset read deadline on each message
nc.Conn.SetReadDeadline(time.Now().Add(readDeadline))
messageType, data, err := nc.Conn.ReadMessage()
@ -213,10 +269,22 @@ func (s *Server) handleBinaryFrame(nc *NodeConnection, data []byte) {
return
}
// Update last frame time
nc.LastFrameTime = time.Now()
recvTime := nc.LastFrameTime
// Get or create ring buffer for this link
s.mu.RLock()
replay := s.replayStore
pm := s.processorMgr
s.mu.RUnlock()
// 1. Record raw frame to disk before any processing.
if replay != nil {
if err := replay.Append(recvTime.UnixNano(), data); err != nil {
log.Printf("[WARN] Replay append error: %v", err)
}
}
// 2. Get or create ring buffer.
linkID := frame.LinkID()
s.mu.Lock()
ring, exists := s.links[linkID]
@ -228,26 +296,54 @@ func (s *Server) handleBinaryFrame(nc *NodeConnection, data []byte) {
broadcaster := s.dashboardBroadcaster
s.mu.Unlock()
// Push frame to ring buffer
ring.Push(frame, time.Now())
ring.Push(frame, recvTime)
// Broadcast to dashboard clients
if broadcaster != nil {
// Forward raw binary frame to dashboard
broadcaster.BroadcastCSI(frame.MACString(), frame.PeerMACString(), data)
// Notify of new link
if isNewLink {
broadcaster.BroadcastLinkActive(linkID, frame.MACString(), frame.PeerMACString())
}
}
// 3. Signal processing pipeline.
if pm != nil && int(frame.NSub) > 0 {
result, err := pm.Process(linkID, frame.Payload, frame.RSSI, int(frame.NSub), recvTime)
if err != nil {
log.Printf("[DEBUG] Signal processing error for %s: %v", linkID, err)
return
}
motionDetected := result.Features.MotionDetected
deltaRMS := result.Features.SmoothDeltaRMS
// Check if motion state changed.
s.mu.Lock()
prev := s.linkMotionState[linkID]
stateChanged := prev != motionDetected
s.linkMotionState[linkID] = motionDetected
s.linkDeltaRMS[linkID] = deltaRMS
mb := s.motionBroadcaster
rateCtrl := s.rateCtrl
s.mu.Unlock()
if stateChanged && mb != nil {
mb.BroadcastMotionState([]MotionStateItem{{
LinkID: linkID,
MotionDetected: motionDetected,
DeltaRMS: deltaRMS,
}})
}
if rateCtrl != nil {
rateCtrl.OnMotionState(nc.MAC, motionDetected)
}
}
}
// handleJSONMessage processes a JSON control message
func (s *Server) handleJSONMessage(nc *NodeConnection, data []byte) {
parsed, err := ParseJSONMessage(data)
if err != nil {
// Unknown types are silently ignored per protocol
return
}
@ -255,13 +351,17 @@ func (s *Server) handleJSONMessage(nc *NodeConnection, data []byte) {
case *HealthMessage:
nc.LastHealth = msg
nc.LastHealthTime = time.Now()
// TODO: expose health metrics
case *BLEMessage:
// TODO: forward BLE data to identity matcher
case *MotionHintMessage:
// TODO: trigger adaptive rate changes
s.mu.RLock()
rateCtrl := s.rateCtrl
s.mu.RUnlock()
if rateCtrl != nil {
rateCtrl.OnMotionHint(nc.MAC)
}
case *OTAStatusMessage:
// TODO: track OTA progress
@ -278,7 +378,6 @@ func (s *Server) recordMalformed(mac string) {
return
}
// Reset counter if window has passed
if time.Since(counter.firstSeen) > malformedWindow {
counter.count = 0
counter.firstSeen = time.Now()
@ -286,12 +385,10 @@ func (s *Server) recordMalformed(mac string) {
counter.count++
// Warn at 100
if counter.count == malformedWarnThreshold {
log.Printf("[WARN] Node %s sending malformed CSI frames (count=%d)", mac, counter.count)
}
// Close at 1000
if counter.count >= malformedCloseThreshold {
log.Printf("[ERROR] Node %s exceeded malformed frame threshold, closing connection", mac)
if nc, exists := s.connections[mac]; exists {
@ -311,10 +408,9 @@ func (s *Server) pingLoop(nc *NodeConnection) {
nc.writeMu.Unlock()
if err != nil {
return // Connection closed
return
}
// Check for shutdown
s.mu.RLock()
shutdown := s.shutdown
s.mu.RUnlock()
@ -365,7 +461,6 @@ func (s *Server) Shutdown(ctx context.Context) {
s.mu.Lock()
s.shutdown = true
// Send shutdown message to all connected nodes
shutdownMsg := ShutdownMessage{Type: "shutdown", ReconnectInMS: 30000}
data, _ := json.Marshal(shutdownMsg)
@ -431,7 +526,6 @@ func (s *Server) GetAllLinksInfo() []LinkInfo {
links := make([]LinkInfo, 0, len(s.links))
for linkID := range s.links {
// Parse linkID format "nodeMAC:peerMAC" (17 chars + 1 colon + 17 chars)
if len(linkID) >= 35 {
links = append(links, LinkInfo{
ID: linkID,
@ -443,6 +537,22 @@ func (s *Server) GetAllLinksInfo() []LinkInfo {
return links
}
// GetAllMotionStates returns current motion state for all known links.
func (s *Server) GetAllMotionStates() []MotionStateItem {
s.mu.RLock()
defer s.mu.RUnlock()
states := make([]MotionStateItem, 0, len(s.linkMotionState))
for linkID, detected := range s.linkMotionState {
states = append(states, MotionStateItem{
LinkID: linkID,
MotionDetected: detected,
DeltaRMS: s.linkDeltaRMS[linkID],
})
}
return states
}
// GetLinkBuffer returns the ring buffer for a specific link
func (s *Server) GetLinkBuffer(nodeMAC, peerMAC string) *RingBuffer {
linkID := nodeMAC + ":" + peerMAC

View file

@ -0,0 +1,232 @@
// Package replay provides a disk-backed circular buffer for raw CSI frames.
//
// File layout:
//
// Header (32 bytes):
// magic[8] "SPAXLREP"
// writePos[8] uint64 LE — absolute file offset of next write
// oldestPos[8] uint64 LE — absolute file offset of oldest valid record (0 = empty)
// wrapPos[8] uint64 LE — writePos at last wrap point (0 = no pending wrap)
//
// Record (10 + frameLen bytes):
// recvTimeNS[8] int64 LE — Unix nanosecond receive timestamp
// frameLen[2] uint16 LE — length of following frame bytes
// frameData[N] raw CSI frame bytes
//
// Eviction: when the oldest record's successor would reach wrapPos, oldest wraps
// to headerSize. New writes evict oldest records as needed to make room.
package replay
import (
"encoding/binary"
"errors"
"os"
"sync"
)
const (
fileMagic = "SPAXLREP"
headerSize = int64(32) // magic(8) + writePos(8) + oldestPos(8) + wrapPos(8)
recordOverhead = int64(10) // recvTimeNS(8) + frameLen(2)
maxFrameBytes = int64(280) // per plan: max CSI frame = 24 + 128*2
// DefaultMaxMB is the default recording buffer capacity in megabytes (~48h at 20 Hz, 20 links).
DefaultMaxMB = 360
)
// RecordingStore is a disk-backed circular buffer for raw CSI frames.
// It is safe for concurrent use.
type RecordingStore struct {
mu sync.Mutex
f *os.File
fileSize int64 // total file size including header
writePos int64 // absolute file offset of next write
oldestPos int64 // absolute file offset of oldest valid record (0 = empty)
wrapPos int64 // writePos at time of last wrap (0 = no pending wrap)
}
// NewRecordingStore opens or creates a recording store at path.
// maxMB is the data capacity; pass 0 to use DefaultMaxMB.
func NewRecordingStore(path string, maxMB int) (*RecordingStore, error) {
if maxMB <= 0 {
maxMB = DefaultMaxMB
}
fileSize := headerSize + int64(maxMB)*1024*1024
if fileSize-headerSize < maxFrameBytes+recordOverhead {
return nil, errors.New("replay: maxMB too small for a single record")
}
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return nil, err
}
s := &RecordingStore{
f: f,
fileSize: fileSize,
}
info, err := f.Stat()
if err != nil {
f.Close()
return nil, err
}
if info.Size() >= headerSize {
if herr := s.readHeader(); herr == nil && s.headerValid() {
// Grow file to new size if needed
if info.Size() < fileSize {
if terr := f.Truncate(fileSize); terr != nil {
f.Close()
return nil, terr
}
}
return s, nil
}
}
// Fresh store
s.writePos = headerSize
s.oldestPos = 0
s.wrapPos = 0
if err := f.Truncate(fileSize); err != nil {
f.Close()
return nil, err
}
if err := s.syncHeader(); err != nil {
f.Close()
return nil, err
}
return s, nil
}
// Append writes a raw CSI frame to the store.
func (s *RecordingStore) Append(recvTimeNS int64, rawFrame []byte) error {
frameLen := int64(len(rawFrame))
if frameLen > maxFrameBytes {
return errors.New("replay: frame exceeds maximum size")
}
recordSize := recordOverhead + frameLen
if recordSize > s.fileSize-headerSize {
return errors.New("replay: buffer too small for record")
}
s.mu.Lock()
defer s.mu.Unlock()
// Wrap writePos if record won't fit before end of file.
if s.writePos+recordSize > s.fileSize {
s.wrapPos = s.writePos
s.writePos = headerSize
}
// Evict oldest records that fall within [writePos, writePos+recordSize).
for s.hasData() && s.oldestPos >= s.writePos && s.oldestPos < s.writePos+recordSize {
if err := s.evictOne(); err != nil {
return err
}
}
wasEmpty := !s.hasData()
// Build record.
buf := make([]byte, recordSize)
binary.LittleEndian.PutUint64(buf[0:8], uint64(recvTimeNS))
binary.LittleEndian.PutUint16(buf[8:10], uint16(frameLen))
copy(buf[10:], rawFrame)
if _, err := s.f.WriteAt(buf, s.writePos); err != nil {
return err
}
if wasEmpty {
s.oldestPos = s.writePos
}
s.writePos += recordSize
return s.syncHeader()
}
// WritePos returns the current write position (for diagnostics).
func (s *RecordingStore) WritePos() int64 {
s.mu.Lock()
defer s.mu.Unlock()
return s.writePos
}
// Close closes the underlying file.
func (s *RecordingStore) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
return s.f.Close()
}
// hasData reports whether there are any valid records.
func (s *RecordingStore) hasData() bool {
return s.oldestPos != 0
}
// evictOne advances oldestPos past the oldest record.
func (s *RecordingStore) evictOne() error {
if !s.hasData() {
return nil
}
var lenBuf [2]byte
if _, err := s.f.ReadAt(lenBuf[:], s.oldestPos+8); err != nil {
return err
}
frameLen := int64(binary.LittleEndian.Uint16(lenBuf[:]))
if frameLen > maxFrameBytes {
// Corrupted record; reset state to recover.
s.oldestPos = 0
s.wrapPos = 0
return nil
}
nextPos := s.oldestPos + recordOverhead + frameLen
// When oldest has consumed the right arc, wrap it to the data start.
if s.wrapPos != 0 && nextPos >= s.wrapPos {
nextPos = headerSize
s.wrapPos = 0
}
if nextPos == s.writePos {
s.oldestPos = 0 // buffer is now empty
} else {
s.oldestPos = nextPos
}
return nil
}
func (s *RecordingStore) headerValid() bool {
return s.writePos >= headerSize && s.writePos <= s.fileSize &&
(s.oldestPos == 0 || (s.oldestPos >= headerSize && s.oldestPos <= s.fileSize)) &&
(s.wrapPos == 0 || (s.wrapPos >= headerSize && s.wrapPos <= s.fileSize))
}
func (s *RecordingStore) readHeader() error {
var buf [32]byte
if _, err := s.f.ReadAt(buf[:], 0); err != nil {
return err
}
if string(buf[0:8]) != fileMagic {
return errors.New("replay: invalid magic")
}
s.writePos = int64(binary.LittleEndian.Uint64(buf[8:16]))
s.oldestPos = int64(binary.LittleEndian.Uint64(buf[16:24]))
s.wrapPos = int64(binary.LittleEndian.Uint64(buf[24:32]))
return nil
}
func (s *RecordingStore) syncHeader() error {
var buf [32]byte
copy(buf[0:8], fileMagic)
binary.LittleEndian.PutUint64(buf[8:16], uint64(s.writePos))
binary.LittleEndian.PutUint64(buf[16:24], uint64(s.oldestPos))
binary.LittleEndian.PutUint64(buf[24:32], uint64(s.wrapPos))
_, err := s.f.WriteAt(buf[:], 0)
return err
}

View file

@ -0,0 +1,205 @@
package replay
import (
"os"
"path/filepath"
"testing"
)
func tempStore(t *testing.T, maxMB int) (*RecordingStore, string) {
t.Helper()
path := filepath.Join(t.TempDir(), "test_replay.bin")
s, err := NewRecordingStore(path, maxMB)
if err != nil {
t.Fatalf("NewRecordingStore: %v", err)
}
return s, path
}
func makeFrame(size int) []byte {
b := make([]byte, size)
for i := range b {
b[i] = byte(i % 251)
}
return b
}
func TestNewStore(t *testing.T) {
s, _ := tempStore(t, 1)
defer s.Close()
if s.writePos != headerSize {
t.Errorf("writePos = %d, want %d", s.writePos, headerSize)
}
if s.hasData() {
t.Error("new store should be empty")
}
}
func TestBasicAppend(t *testing.T) {
s, _ := tempStore(t, 1)
defer s.Close()
frame := makeFrame(152) // typical 64-subcarrier frame
if err := s.Append(1000, frame); err != nil {
t.Fatalf("Append: %v", err)
}
expected := headerSize + recordOverhead + int64(len(frame))
if s.writePos != expected {
t.Errorf("writePos = %d, want %d", s.writePos, expected)
}
if !s.hasData() {
t.Error("store should have data after append")
}
if s.oldestPos != headerSize {
t.Errorf("oldestPos = %d, want %d", s.oldestPos, headerSize)
}
}
func TestMultipleAppends(t *testing.T) {
s, _ := tempStore(t, 1)
defer s.Close()
frame := makeFrame(152)
for i := 0; i < 5; i++ {
if err := s.Append(int64(i*1000), frame); err != nil {
t.Fatalf("Append %d: %v", i, err)
}
}
expectedPos := headerSize + 5*(recordOverhead+int64(len(frame)))
if s.writePos != expectedPos {
t.Errorf("writePos = %d, want %d", s.writePos, expectedPos)
}
}
func TestWrapAround(t *testing.T) {
// Use a tiny 1-byte "MB" which gives ~1 MB buffer. Use a custom small size via
// direct manipulation for testing. Instead, use a small-but-valid maxMB.
// 1 MB data area fits floor(1MB / (10+152)) = ~6501 frames.
s, _ := tempStore(t, 1)
defer s.Close()
frame := makeFrame(152)
recordSize := recordOverhead + int64(len(frame))
// Calculate how many records fit before a wrap
dataArea := s.fileSize - headerSize
recsBeforeWrap := int(dataArea / recordSize) // integer division
// Fill to just before wrap
for i := 0; i < recsBeforeWrap; i++ {
if err := s.Append(int64(i), frame); err != nil {
t.Fatalf("Append %d before wrap: %v", i, err)
}
}
beforeWrapPos := s.writePos
firstOldest := s.oldestPos
// One more append should trigger wrap + eviction
if err := s.Append(int64(recsBeforeWrap), frame); err != nil {
t.Fatalf("Append triggering wrap: %v", err)
}
// writePos should have wrapped and advanced from headerSize
if s.writePos >= beforeWrapPos {
t.Errorf("writePos %d should have wrapped (was %d before wrap)", s.writePos, beforeWrapPos)
}
// oldest should have advanced (eviction happened)
if s.oldestPos == firstOldest && s.oldestPos != 0 {
t.Errorf("oldestPos %d should have advanced after wrap (first was %d)", s.oldestPos, firstOldest)
}
}
func TestCrashRecovery(t *testing.T) {
frame := makeFrame(152)
dir := t.TempDir()
path := filepath.Join(dir, "replay.bin")
// Write some frames
s1, err := NewRecordingStore(path, 1)
if err != nil {
t.Fatal(err)
}
for i := 0; i < 3; i++ {
if err := s1.Append(int64(i*1000), frame); err != nil {
t.Fatalf("s1.Append %d: %v", i, err)
}
}
savedWrite := s1.writePos
savedOldest := s1.oldestPos
s1.Close()
// Reopen should restore state
s2, err := NewRecordingStore(path, 1)
if err != nil {
t.Fatal(err)
}
defer s2.Close()
if s2.writePos != savedWrite {
t.Errorf("writePos after reopen = %d, want %d", s2.writePos, savedWrite)
}
if s2.oldestPos != savedOldest {
t.Errorf("oldestPos after reopen = %d, want %d", s2.oldestPos, savedOldest)
}
}
func TestEvictionMaintainsData(t *testing.T) {
// Use a very small buffer: headerSize + 3 records fit
// We do this by checking eviction logic directly, using
// a tiny 1MB store (already tested above). Simulate eviction
// explicitly by calling evictOne after appending.
s, _ := tempStore(t, 1)
defer s.Close()
frame := makeFrame(100)
// Fill buffer just past the point where eviction must occur
dataArea := s.fileSize - headerSize
recordSize := recordOverhead + int64(len(frame))
count := int(dataArea/recordSize) + 5 // force wraps
for i := 0; i < count; i++ {
if err := s.Append(int64(i), frame); err != nil {
t.Fatalf("Append %d: %v", i, err)
}
}
// Store should still be valid (not returning errors)
if s.writePos < headerSize || s.writePos > s.fileSize {
t.Errorf("writePos %d out of range [%d, %d]", s.writePos, headerSize, s.fileSize)
}
}
func TestInvalidMagicStartsFresh(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "bad.bin")
// Write garbage
if err := os.WriteFile(path, []byte("GARBAGE_DATA_123456789012345678901234567890"), 0644); err != nil {
t.Fatal(err)
}
s, err := NewRecordingStore(path, 1)
if err != nil {
t.Fatalf("should recover from bad magic: %v", err)
}
defer s.Close()
if s.writePos != headerSize {
t.Errorf("writePos = %d after bad magic, want %d", s.writePos, headerSize)
}
}
func TestFrameTooLarge(t *testing.T) {
s, _ := tempStore(t, 1)
defer s.Close()
oversized := make([]byte, maxFrameBytes+1)
if err := s.Append(0, oversized); err == nil {
t.Error("expected error for oversized frame")
}
}