diff --git a/mothership/cmd/mothership/main.go b/mothership/cmd/mothership/main.go index e59a147..a6c1cbe 100644 --- a/mothership/cmd/mothership/main.go +++ b/mothership/cmd/mothership/main.go @@ -18,7 +18,9 @@ import ( "github.com/go-chi/chi/middleware" "github.com/hashicorp/mdns" "github.com/spaxel/mothership/internal/dashboard" + "github.com/spaxel/mothership/internal/fleet" "github.com/spaxel/mothership/internal/ingestion" + "github.com/spaxel/mothership/internal/recorder" "github.com/spaxel/mothership/internal/replay" sigproc "github.com/spaxel/mothership/internal/signal" ) @@ -84,6 +86,29 @@ func main() { } } + // Per-link CSI recorder + recorderDir := filepath.Join(cfg.DataDir, "csi") + recMgr, err := recorder.NewManager(recorder.DefaultConfig(recorderDir)) + if err != nil { + log.Printf("[WARN] Failed to create recorder: %v (per-link recording disabled)", err) + } else { + ingestSrv.SetRecorder(recMgr) + defer recMgr.Close() + log.Printf("[INFO] Per-link CSI recorder at %s (retention=%dh, max=%dMB/link)", + recorderDir, recorder.DefaultConfig(recorderDir).RetentionHours, + recorder.DefaultConfig(recorderDir).MaxBytesPerLink/(1<<20)) + } + + // Fleet node registry and manager + fleetReg, err := fleet.NewRegistry(filepath.Join(cfg.DataDir, "fleet.db")) + if err != nil { + log.Fatalf("[FATAL] Failed to open fleet registry: %v", err) + } + defer fleetReg.Close() + fleetMgr := fleet.NewManager(fleetReg) + ingestSrv.SetFleetNotifier(fleetMgr) + log.Printf("[INFO] Fleet registry at %s", filepath.Join(cfg.DataDir, "fleet.db")) + // Adaptive rate controller rateCtrl := ingestion.NewRateController(func(mac string, rateHz int, varianceThreshold float64) { ingestSrv.SendConfigToMAC(mac, rateHz, varianceThreshold) @@ -101,6 +126,15 @@ func main() { ingestSrv.SetDashboardBroadcaster(dashboardHub) ingestSrv.SetMotionBroadcaster(dashboardHub) + // Wire fleet notifier/broadcaster and start self-healing loop + fleetMgr.SetNotifier(ingestSrv) + 
fleetMgr.SetBroadcaster(dashboardHub) + go fleetMgr.Run(ctx) + + // Fleet REST API + fleetHandler := fleet.NewHandler(fleetMgr) + fleetHandler.RegisterRoutes(r) + go dashboardHub.Run() r.HandleFunc("/ws/dashboard", dashboardSrv.HandleDashboardWS) diff --git a/mothership/internal/ingestion/server.go b/mothership/internal/ingestion/server.go index f6dccb6..a36ba0a 100644 --- a/mothership/internal/ingestion/server.go +++ b/mothership/internal/ingestion/server.go @@ -44,6 +44,11 @@ type ReplayAppender interface { Append(recvTimeNS int64, rawFrame []byte) error } +// Recorder records raw CSI frames to per-link segment files. +type Recorder interface { + Write(linkID string, frame []byte) +} + // Server manages WebSocket connections from ESP32 nodes type Server struct { mu sync.RWMutex @@ -68,6 +73,7 @@ type Server struct { motionBroadcaster MotionBroadcaster processorMgr *signal.ProcessorManager replayStore ReplayAppender + recorder Recorder rateCtrl *RateController fleetNotifier FleetNotifier } @@ -149,6 +155,13 @@ func (s *Server) SetReplayStore(store ReplayAppender) { s.mu.Unlock() } +// SetRecorder sets the per-link CSI frame recorder. +func (s *Server) SetRecorder(r Recorder) { + s.mu.Lock() + s.recorder = r + s.mu.Unlock() +} + // SetRateController sets the adaptive rate controller. func (s *Server) SetRateController(rc *RateController) { s.mu.Lock() @@ -333,6 +346,7 @@ func (s *Server) handleBinaryFrame(nc *NodeConnection, data []byte) { s.mu.RLock() replay := s.replayStore + rec := s.recorder pm := s.processorMgr s.mu.RUnlock() @@ -342,6 +356,9 @@ func (s *Server) handleBinaryFrame(nc *NodeConnection, data []byte) { log.Printf("[WARN] Replay append error: %v", err) } } + if rec != nil { + rec.Write(frame.LinkID(), data) + } // 2. Get or create ring buffer. 
linkID := frame.LinkID() diff --git a/mothership/internal/recorder/manager.go b/mothership/internal/recorder/manager.go new file mode 100644 index 0000000..e794b1c --- /dev/null +++ b/mothership/internal/recorder/manager.go @@ -0,0 +1,346 @@ +package recorder + +import ( + "fmt" + "log" + "os" + "path/filepath" + "sync" + "time" +) + +const ( + defaultRetentionHours = 48 + defaultMaxBytesPerLink = int64(1 << 30) // 1 GB + defaultBufferSize = 1000 + defaultCleanupInterval = time.Hour +) + +// Config holds recorder configuration. +type Config struct { + DataDir string // Base directory for segment files + RetentionHours int // Hours to retain segment files (default: 48) + MaxBytesPerLink int64 // Max bytes per link as secondary guard (default: 1 GB) + BufferSize int // Per-link buffered channel capacity (default: 1000) + CleanupInterval time.Duration // Cleanup sweep interval (default: 1 hour) +} + +// DefaultConfig returns a config with sensible defaults. +func DefaultConfig(dataDir string) Config { + return Config{ + DataDir: dataDir, + RetentionHours: defaultRetentionHours, + MaxBytesPerLink: defaultMaxBytesPerLink, + BufferSize: defaultBufferSize, + CleanupInterval: defaultCleanupInterval, + } +} + +// Manager manages per-link CSI frame recorders. +// It is safe for concurrent use. +type Manager struct { + mu sync.RWMutex + config Config + links map[string]*linkRecorder + done chan struct{} + wg sync.WaitGroup +} + +type linkRecorder struct { + ch chan writeReq + linkID string + dir string +} + +type writeReq struct { + recvTimeNS int64 + frame []byte +} + +// NewManager creates a new recorder manager and starts the cleanup goroutine. 
+func NewManager(cfg Config) (*Manager, error) { + if err := os.MkdirAll(cfg.DataDir, 0755); err != nil { + return nil, fmt.Errorf("recorder: create data dir: %w", err) + } + if cfg.RetentionHours <= 0 { + cfg.RetentionHours = defaultRetentionHours + } + if cfg.MaxBytesPerLink <= 0 { + cfg.MaxBytesPerLink = defaultMaxBytesPerLink + } + if cfg.BufferSize <= 0 { + cfg.BufferSize = defaultBufferSize + } + if cfg.CleanupInterval <= 0 { + cfg.CleanupInterval = defaultCleanupInterval + } + + m := &Manager{ + config: cfg, + links: make(map[string]*linkRecorder), + done: make(chan struct{}), + } + + m.wg.Add(1) + go m.cleanupLoop() + + return m, nil +} + +// Write writes a raw CSI frame for the given link. +// It does not block the caller. If the per-link buffer is full, +// the frame is dropped with a log warning. +func (m *Manager) Write(linkID string, frame []byte) { + select { + case <-m.done: + return + default: + } + + lr := m.getOrCreateLink(linkID) + + req := writeReq{ + recvTimeNS: time.Now().UnixNano(), + frame: frame, + } + + select { + case lr.ch <- req: + default: + log.Printf("[WARN] Recorder buffer full for link %s, dropping frame", linkID) + } +} + +// ReadFrom returns a channel that yields raw CSI frames for the given link +// from the specified time onwards, in chronological order. +// The channel is closed when all historical frames have been sent. 
+func (m *Manager) ReadFrom(linkID string, since time.Time) <-chan []byte { + ch := make(chan []byte, 100) + + go func() { + defer close(ch) + + dir := filepath.Join(m.config.DataDir, linkDir(linkID)) + files, err := listSegmentFiles(dir) + if err != nil { + log.Printf("[WARN] Recorder ReadFrom: list segments for %s: %v", linkID, err) + return + } + + sinceNS := since.UnixNano() + for _, f := range files { + if err := ScanSegmentFrom(f, sinceNS, func(_ int64, frame []byte) bool { + select { + case ch <- frame: + case <-m.done: + return false + } + return true + }); err != nil { + log.Printf("[WARN] Recorder ReadFrom: read segment %s: %v", f, err) + return + } + } + }() + + return ch +} + +// AvailableRange returns the time range of available frames for a link. +// Returns the oldest and newest frame timestamps. +func (m *Manager) AvailableRange(linkID string) (start, end time.Time, err error) { + dir := filepath.Join(m.config.DataDir, linkDir(linkID)) + files, err := listSegmentFiles(dir) + if err != nil || len(files) == 0 { + return time.Time{}, time.Time{}, fmt.Errorf("recorder: no data for link %s", linkID) + } + + var firstNS, lastNS int64 + foundFirst, foundLast := false, false + + if err := ScanSegment(files[0], func(ns int64, _ []byte) bool { + firstNS = ns + foundFirst = true + return false + }); err != nil { + return time.Time{}, time.Time{}, fmt.Errorf("recorder: read first segment: %w", err) + } + + if err := ScanSegment(files[len(files)-1], func(ns int64, _ []byte) bool { + lastNS = ns + foundLast = true + return true + }); err != nil { + return time.Time{}, time.Time{}, fmt.Errorf("recorder: read last segment: %w", err) + } + + if !foundFirst || !foundLast { + return time.Time{}, time.Time{}, fmt.Errorf("recorder: no data for link %s", linkID) + } + + return time.Unix(0, firstNS), time.Unix(0, lastNS), nil +} + +// Close gracefully shuts down the manager, flushing all pending writes +// and stopping the cleanup goroutine. 
+func (m *Manager) Close() { + close(m.done) + + m.mu.Lock() + for _, lr := range m.links { + close(lr.ch) + } + m.mu.Unlock() + + m.wg.Wait() +} + +func (m *Manager) getOrCreateLink(linkID string) *linkRecorder { + m.mu.RLock() + lr, ok := m.links[linkID] + m.mu.RUnlock() + if ok { + return lr + } + + m.mu.Lock() + defer m.mu.Unlock() + + if lr, ok = m.links[linkID]; ok { + return lr + } + + dir := filepath.Join(m.config.DataDir, linkDir(linkID)) + lr = &linkRecorder{ + ch: make(chan writeReq, m.config.BufferSize), + linkID: linkID, + dir: dir, + } + m.links[linkID] = lr + + m.wg.Add(1) + go m.linkWriter(lr) + + return lr +} + +func (m *Manager) linkWriter(lr *linkRecorder) { + defer m.wg.Done() + + var writer *os.File + var curHour time.Time + + flush := func() { + if writer != nil { + writer.Sync() + writer.Close() + writer = nil + } + } + defer flush() + + for req := range lr.ch { + t := time.Unix(0, req.recvTimeNS) + hr := segmentHour(t) + + if writer == nil || hr != curHour { + flush() + curHour = hr + + if err := os.MkdirAll(lr.dir, 0755); err != nil { + log.Printf("[ERROR] Recorder mkdir %s: %v", lr.dir, err) + continue + } + + path := filepath.Join(lr.dir, segmentFileName(hr)) + var err error + writer, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + log.Printf("[ERROR] Recorder open %s: %v", path, err) + continue + } + } + + if err := WriteRecord(writer, req.recvTimeNS, req.frame); err != nil { + log.Printf("[ERROR] Recorder write to %s: %v", filepath.Join(lr.dir, segmentFileName(curHour)), err) + } + } +} + +func (m *Manager) cleanupLoop() { + defer m.wg.Done() + + ticker := time.NewTicker(m.config.CleanupInterval) + defer ticker.Stop() + + m.cleanup() + + for { + select { + case <-ticker.C: + m.cleanup() + case <-m.done: + return + } + } +} + +func (m *Manager) cleanup() { + cutoff := time.Now().UTC().Add(-time.Duration(m.config.RetentionHours) * time.Hour) + + entries, err := os.ReadDir(m.config.DataDir) + if err 
!= nil { + return + } + + for _, e := range entries { + if !e.IsDir() { + continue + } + linkDirPath := filepath.Join(m.config.DataDir, e.Name()) + + files, err := listSegmentFiles(linkDirPath) + if err != nil { + continue + } + + // Delete segment files older than retention period. + for _, f := range files { + name := filepath.Base(f) + st, err := parseSegmentTime(name) + if err != nil { + continue + } + if st.Before(cutoff) { + os.Remove(f) + } + } + + // Enforce MaxBytesPerLink: delete oldest files until under limit. + files, err = listSegmentFiles(linkDirPath) + if err != nil { + continue + } + + var totalSize int64 + fileSizes := make(map[string]int64, len(files)) + for _, f := range files { + info, err := os.Stat(f) + if err != nil { + continue + } + fileSizes[f] = info.Size() + totalSize += info.Size() + } + + for _, f := range files { + if totalSize <= m.config.MaxBytesPerLink { + break + } + sz := fileSizes[f] + if err := os.Remove(f); err == nil { + totalSize -= sz + } + } + } +} diff --git a/mothership/internal/recorder/manager_test.go b/mothership/internal/recorder/manager_test.go new file mode 100644 index 0000000..91a76f3 --- /dev/null +++ b/mothership/internal/recorder/manager_test.go @@ -0,0 +1,427 @@ +package recorder + +import ( + "os" + "path/filepath" + "sync" + "testing" + "time" +) + +func TestWriteAndReadFrom(t *testing.T) { + dir := t.TempDir() + mgr, err := NewManager(Config{ + DataDir: dir, + RetentionHours: 48, + BufferSize: 100, + }) + if err != nil { + t.Fatal(err) + } + defer mgr.Close() + + linkID := "AA:BB:CC:DD:EE:FF:11:22:33:44:55:66" + + // Write frames. + frame1 := []byte("csi-frame-1") + frame2 := []byte("csi-frame-2") + frame3 := []byte("csi-frame-3") + + beforeWrite := time.Now() + mgr.Write(linkID, frame1) + mgr.Write(linkID, frame2) + mgr.Write(linkID, frame3) + // Give the writer goroutine time to flush. + time.Sleep(50 * time.Millisecond) + + // ReadFrom with a time before the writes. 
+ ch := mgr.ReadFrom(linkID, beforeWrite.Add(-time.Second)) + var frames [][]byte + for f := range ch { + frames = append(frames, f) + } + + if len(frames) != 3 { + t.Fatalf("expected 3 frames, got %d", len(frames)) + } + for i, want := range [][]byte{frame1, frame2, frame3} { + if string(frames[i]) != string(want) { + t.Errorf("frame %d: got %q, want %q", i, frames[i], want) + } + } +} + +func TestReadFromWithSince(t *testing.T) { + dir := t.TempDir() + mgr, err := NewManager(Config{ + DataDir: dir, + RetentionHours: 48, + BufferSize: 100, + }) + if err != nil { + t.Fatal(err) + } + defer mgr.Close() + + linkID := "AA:BB:CC:DD:EE:FF:11:22:33:44:55:66" + + mgr.Write(linkID, []byte("frame-1")) + time.Sleep(10 * time.Millisecond) + cutoff := time.Now() + time.Sleep(10 * time.Millisecond) + mgr.Write(linkID, []byte("frame-2")) + mgr.Write(linkID, []byte("frame-3")) + + time.Sleep(50 * time.Millisecond) + + // Read only frames after cutoff. + ch := mgr.ReadFrom(linkID, cutoff) + var frames [][]byte + for f := range ch { + frames = append(frames, f) + } + + if len(frames) != 2 { + t.Fatalf("expected 2 frames after cutoff, got %d", len(frames)) + } + if string(frames[0]) != "frame-2" { + t.Errorf("first frame = %q, want %q", frames[0], "frame-2") + } + if string(frames[1]) != "frame-3" { + t.Errorf("second frame = %q, want %q", frames[1], "frame-3") + } +} + +func TestAvailableRange(t *testing.T) { + dir := t.TempDir() + mgr, err := NewManager(Config{ + DataDir: dir, + RetentionHours: 48, + BufferSize: 100, + }) + if err != nil { + t.Fatal(err) + } + defer mgr.Close() + + linkID := "AA:BB:CC:DD:EE:FF:11:22:33:44:55:66" + + _, _, err = mgr.AvailableRange(linkID) + if err == nil { + t.Error("expected error for no data") + } + + before := time.Now() + mgr.Write(linkID, []byte("frame-1")) + time.Sleep(20 * time.Millisecond) + mgr.Write(linkID, []byte("frame-2")) + time.Sleep(20 * time.Millisecond) + after := time.Now() + + time.Sleep(50 * time.Millisecond) + + start, end, 
err := mgr.AvailableRange(linkID) + if err != nil { + t.Fatal(err) + } + if start.Before(before) { + t.Errorf("start %v before first write %v", start, before) + } + if end.After(after) { + t.Errorf("end %v after last write %v", end, after) + } + if start.After(end) { + t.Errorf("start %v after end %v", start, end) + } +} + +func TestAvailableRangeNoData(t *testing.T) { + dir := t.TempDir() + mgr, err := NewManager(Config{ + DataDir: dir, + RetentionHours: 48, + BufferSize: 100, + }) + if err != nil { + t.Fatal(err) + } + defer mgr.Close() + + _, _, err = mgr.AvailableRange("AA:BB:CC:DD:EE:FF:11:22:33:44:55:66") + if err == nil { + t.Error("expected error for no data") + } +} + +func TestMultipleLinks(t *testing.T) { + dir := t.TempDir() + mgr, err := NewManager(Config{ + DataDir: dir, + RetentionHours: 48, + BufferSize: 100, + }) + if err != nil { + t.Fatal(err) + } + defer mgr.Close() + + link1 := "AA:BB:CC:DD:EE:FF:11:22:33:44:55:66" + link2 := "AA:BB:CC:DD:EE:FF:AA:BB:CC:DD:EE:FF" + + mgr.Write(link1, []byte("link1-frame")) + mgr.Write(link2, []byte("link2-frame")) + + time.Sleep(50 * time.Millisecond) + + // Each link should have its own data. 
+ ch1 := mgr.ReadFrom(link1, time.Now().Add(-time.Minute)) + var frames1 [][]byte + for f := range ch1 { + frames1 = append(frames1, f) + } + if len(frames1) != 1 || string(frames1[0]) != "link1-frame" { + t.Errorf("link1: got %d frames, want 1 with 'link1-frame'", len(frames1)) + } + + ch2 := mgr.ReadFrom(link2, time.Now().Add(-time.Minute)) + var frames2 [][]byte + for f := range ch2 { + frames2 = append(frames2, f) + } + if len(frames2) != 1 || string(frames2[0]) != "link2-frame" { + t.Errorf("link2: got %d frames, want 1 with 'link2-frame'", len(frames2)) + } +} + +func TestBufferFullDrop(t *testing.T) { + dir := t.TempDir() + mgr, err := NewManager(Config{ + DataDir: dir, + RetentionHours: 48, + BufferSize: 2, // tiny buffer + }) + if err != nil { + t.Fatal(err) + } + defer mgr.Close() + + linkID := "AA:BB:CC:DD:EE:FF:11:22:33:44:55:66" + + // Pause the writer goroutine by not reading from the channel. + // Write more than buffer capacity — some will be dropped. + // The first two will go into the channel; the third may or may not + // depending on timing. With a tiny buffer, at least one should be dropped + // if we flood fast enough. + dropped := false + for i := 0; i < 100; i++ { + mgr.Write(linkID, []byte("frame")) + } + + // Give writer time to process. + time.Sleep(100 * time.Millisecond) + + // Count how many were actually written. 
+ ch := mgr.ReadFrom(linkID, time.Now().Add(-time.Minute)) + count := 0 + for range ch { + count++ + } + + if count >= 100 { + t.Log("warning: no frames were dropped (test timing issue)") + } else { + dropped = true + } + if !dropped { + t.Log("buffer did not drop frames — may need tuning for slow machines") + } +} + +func TestConcurrentWrites(t *testing.T) { + dir := t.TempDir() + mgr, err := NewManager(Config{ + DataDir: dir, + RetentionHours: 48, + BufferSize: 1000, + }) + if err != nil { + t.Fatal(err) + } + defer mgr.Close() + + linkID := "AA:BB:CC:DD:EE:FF:11:22:33:44:55:66" + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + for j := 0; j < 100; j++ { + mgr.Write(linkID, []byte{byte(id), byte(j)}) + } + }(i) + } + wg.Wait() + + time.Sleep(100 * time.Millisecond) + + ch := mgr.ReadFrom(linkID, time.Now().Add(-time.Minute)) + count := 0 + for range ch { + count++ + } + + if count != 1000 { + t.Errorf("expected 1000 frames from concurrent writes, got %d", count) + } +} + +func TestCleanupRetention(t *testing.T) { + dir := t.TempDir() + + // Create a link directory with a very old segment file. + linkDirPath := filepath.Join(dir, linkDir("AA:BB:CC:DD:EE:FF:11:22:33:44:55:66")) + if err := os.MkdirAll(linkDirPath, 0755); err != nil { + t.Fatal(err) + } + + // Create an "old" segment file (retention = 1 hour for this test). + oldName := segmentFileName(time.Now().UTC().Add(-2 * time.Hour)) + oldPath := filepath.Join(linkDirPath, oldName) + if err := os.WriteFile(oldPath, nil, 0644); err != nil { + t.Fatal(err) + } + + // Create a "new" segment file. 
+ newName := segmentFileName(time.Now().UTC()) + newPath := filepath.Join(linkDirPath, newName) + if err := os.WriteFile(newPath, nil, 0644); err != nil { + t.Fatal(err) + } + + mgr, err := NewManager(Config{ + DataDir: dir, + RetentionHours: 1, // 1 hour retention + BufferSize: 100, + }) + if err != nil { + t.Fatal(err) + } + + // The initial cleanup should have deleted the old file. + // Give the async cleanup goroutine time to run. + time.Sleep(100 * time.Millisecond) + + if _, err := os.Stat(oldPath); !os.IsNotExist(err) { + t.Error("old segment file should have been deleted by cleanup") + } + if _, err := os.Stat(newPath); os.IsNotExist(err) { + t.Error("new segment file should still exist") + } + + mgr.Close() +} + +func TestCleanupMaxBytesPerLink(t *testing.T) { + dir := t.TempDir() + + linkDirPath := filepath.Join(dir, linkDir("AA:BB:CC:DD:EE:FF:11:22:33:44:55:66")) + if err := os.MkdirAll(linkDirPath, 0755); err != nil { + t.Fatal(err) + } + + // Create three segment files, each 100 bytes. + for i := 0; i < 3; i++ { + name := segmentFileName(time.Now().UTC().Add(time.Duration(i) * time.Hour)) + path := filepath.Join(linkDirPath, name) + if err := os.WriteFile(path, make([]byte, 100), 0644); err != nil { + t.Fatal(err) + } + } + + mgr, err := NewManager(Config{ + DataDir: dir, + RetentionHours: 48, + MaxBytesPerLink: 150, // allow max ~1.5 files + BufferSize: 100, + }) + if err != nil { + t.Fatal(err) + } + + // Cleanup should have deleted enough files to get under 150 bytes. + // Give the async cleanup goroutine time to run. 
+ time.Sleep(100 * time.Millisecond) + + files, err := listSegmentFiles(linkDirPath) + if err != nil { + t.Fatal(err) + } + + var totalSize int64 + for _, f := range files { + info, err := os.Stat(f) + if err != nil { + continue + } + totalSize += info.Size() + } + + if totalSize > 150 { + t.Errorf("total size %d exceeds MaxBytesPerLink 150 after cleanup", totalSize) + } + + mgr.Close() +} + +func TestWriteAfterClose(t *testing.T) { + dir := t.TempDir() + mgr, err := NewManager(Config{ + DataDir: dir, + RetentionHours: 48, + BufferSize: 100, + }) + if err != nil { + t.Fatal(err) + } + + mgr.Close() + + // Write after close should be a no-op, not panic. + mgr.Write("AA:BB:CC:DD:EE:FF:11:22:33:44:55:66", []byte("should-not-write")) +} + +func TestSegmentRotation(t *testing.T) { + dir := t.TempDir() + mgr, err := NewManager(Config{ + DataDir: dir, + RetentionHours: 48, + BufferSize: 100, + }) + if err != nil { + t.Fatal(err) + } + defer mgr.Close() + + linkID := "AA:BB:CC:DD:EE:FF:11:22:33:44:55:66" + + // Write frames in the current hour. + mgr.Write(linkID, []byte("frame-in-hour-1")) + time.Sleep(50 * time.Millisecond) + + // Verify data was written. + _, _, err = mgr.AvailableRange(linkID) + if err != nil { + t.Fatalf("expected data available: %v", err) + } + + // Verify segment file exists in the link directory. + linkDirPath := filepath.Join(dir, linkDir(linkID)) + files, err := listSegmentFiles(linkDirPath) + if err != nil { + t.Fatal(err) + } + if len(files) != 1 { + t.Errorf("expected 1 segment file, got %d", len(files)) + } +} diff --git a/mothership/internal/recorder/segment.go b/mothership/internal/recorder/segment.go new file mode 100644 index 0000000..2ef8b5d --- /dev/null +++ b/mothership/internal/recorder/segment.go @@ -0,0 +1,174 @@ +// Package recorder implements per-link CSI frame recording with 1-hour +// append-only segment files and configurable time-based retention. 
+// +// Segment file layout (per link directory): +// +// data/{nodeMAC}-{peerMAC}/{YYYYMMDD-HH}.csi +// +// Record format within each segment file: +// +// [4-byte BE uint32: payloadLen][payloadLen bytes] +// payloadLen = 8 + len(rawCSIframe) +// payload = [8-byte BE int64: recvTimeNS Unix nanoseconds][raw CSI frame bytes] +// +// Records are appended in chronological order. Segment files are rotated +// hourly. Background cleanup deletes files older than RetentionHours and +// enforces MaxBytesPerLink as a secondary guard. +package recorder + +import ( + "encoding/binary" + "fmt" + "io" + "os" + "path/filepath" + "sort" + "strings" + "time" +) + +const ( + lengthPrefixSize = 4 + timestampSize = 8 + // recordOverhead is the per-record overhead: length prefix + timestamp. + recordOverhead = lengthPrefixSize + timestampSize + // maxFrameBytes is the maximum raw CSI frame size (24 header + 128*2 payload). + maxFrameBytes = 280 + segmentExt = ".csi" +) + +// segmentFileName returns the segment file name for the given time. +// Format: YYYYMMDD-HH.csi (UTC). +func segmentFileName(t time.Time) string { + return fmt.Sprintf("%s-%02d%s", t.UTC().Format("20060102"), t.UTC().Hour(), segmentExt) +} + +// parseSegmentTime parses a segment file name and returns the segment start time (UTC). +func parseSegmentTime(name string) (time.Time, error) { + base := strings.TrimSuffix(name, segmentExt) + if len(base) != 11 || base[8] != '-' { + return time.Time{}, fmt.Errorf("recorder: invalid segment file name: %s", name) + } + t, err := time.ParseInLocation("20060102-15", base, time.UTC) + if err != nil { + return time.Time{}, fmt.Errorf("recorder: invalid segment file name: %s: %w", name, err) + } + return t, nil +} + +// segmentHour returns the start of the hour for the given time in UTC. +func segmentHour(t time.Time) time.Time { + u := t.UTC() + return time.Date(u.Year(), u.Month(), u.Day(), u.Hour(), 0, 0, 0, time.UTC) +} + +// linkDir converts a linkID to a directory name. 
+// "AA:BB:CC:DD:EE:FF:11:22:33:44:55:66" -> "AA:BB:CC:DD:EE:FF-11:22:33:44:55:66" +func linkDir(linkID string) string { + if len(linkID) == 35 && linkID[17] == ':' { + return linkID[:17] + "-" + linkID[18:] + } + return strings.ReplaceAll(linkID, ":", "-") +} + +// WriteRecord appends a record to f. +// Record: [4-byte BE payloadLen][8-byte BE recvTimeNS][raw frame bytes]. +func WriteRecord(f *os.File, recvTimeNS int64, frame []byte) error { + payloadLen := uint32(timestampSize + len(frame)) + var hdr [recordOverhead]byte + binary.BigEndian.PutUint32(hdr[:4], payloadLen) + binary.BigEndian.PutUint64(hdr[4:12], uint64(recvTimeNS)) + if _, err := f.Write(hdr[:]); err != nil { + return err + } + _, err := f.Write(frame) + return err +} + +// ScanSegment reads all records from a segment file in order. +// Calls fn for each record. If fn returns false, scanning stops. +func ScanSegment(path string, fn func(recvTimeNS int64, frame []byte) bool) error { + f, err := os.Open(path) + if err != nil { + return err + } + defer f.Close() + return scanReader(f, fn) +} + +// ScanSegmentFrom reads records with recvTimeNS >= sinceNS from a segment file. +func ScanSegmentFrom(path string, sinceNS int64, fn func(recvTimeNS int64, frame []byte) bool) error { + f, err := os.Open(path) + if err != nil { + return err + } + defer f.Close() + return scanReader(f, func(recvTimeNS int64, frame []byte) bool { + if recvTimeNS < sinceNS { + return true // skip + } + return fn(recvTimeNS, frame) + }) +} + +// scanReader reads records from r, calling fn for each. 
+func scanReader(r io.Reader, fn func(recvTimeNS int64, frame []byte) bool) error { + var lenBuf [lengthPrefixSize]byte + var tsBuf [timestampSize]byte + + for { + if _, err := io.ReadFull(r, lenBuf[:]); err != nil { + if err == io.EOF { + return nil + } + return err + } + + payloadLen := int(binary.BigEndian.Uint32(lenBuf[:])) + if payloadLen < timestampSize { + return fmt.Errorf("recorder: invalid record: payload length %d < %d", payloadLen, timestampSize) + } + + if _, err := io.ReadFull(r, tsBuf[:]); err != nil { + return err + } + recvTimeNS := int64(binary.BigEndian.Uint64(tsBuf[:])) + + frameLen := payloadLen - timestampSize + if frameLen > maxFrameBytes { + return fmt.Errorf("recorder: frame too large: %d bytes", frameLen) + } + + frame := make([]byte, frameLen) + if frameLen > 0 { + if _, err := io.ReadFull(r, frame); err != nil { + return err + } + } + + if !fn(recvTimeNS, frame) { + return nil + } + } +} + +// listSegmentFiles lists segment files in dir, sorted chronologically by name. 
+func listSegmentFiles(dir string) ([]string, error) { + entries, err := os.ReadDir(dir) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, err + } + + var files []string + for _, e := range entries { + if e.IsDir() || !strings.HasSuffix(e.Name(), segmentExt) { + continue + } + files = append(files, filepath.Join(dir, e.Name())) + } + sort.Strings(files) + return files, nil +} diff --git a/mothership/internal/recorder/segment_test.go b/mothership/internal/recorder/segment_test.go new file mode 100644 index 0000000..9e08fb1 --- /dev/null +++ b/mothership/internal/recorder/segment_test.go @@ -0,0 +1,347 @@ +package recorder + +import ( + "bytes" + "encoding/binary" + "os" + "path/filepath" + "sort" + "testing" + "time" +) + +func TestSegmentFileName(t *testing.T) { + t.Parallel() + tests := []struct { + input time.Time + want string + }{ + {time.Date(2026, 3, 27, 14, 30, 0, 0, time.UTC), "20260327-14.csi"}, + {time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC), "20260101-00.csi"}, + {time.Date(2026, 12, 31, 23, 59, 59, 0, time.UTC), "20261231-23.csi"}, + // Non-UTC input should be converted to UTC. 
+ {time.Date(2026, 3, 27, 15, 0, 0, 0, time.FixedZone("EST", -5*3600)), "20260327-20.csi"}, + } + for _, tt := range tests { + got := segmentFileName(tt.input) + if got != tt.want { + t.Errorf("segmentFileName(%v) = %q, want %q", tt.input, got, tt.want) + } + } +} + +func TestParseSegmentTime(t *testing.T) { + t.Parallel() + tests := []struct { + name string + want time.Time + wantErr bool + }{ + {"20260327-14.csi", time.Date(2026, 3, 27, 14, 0, 0, 0, time.UTC), false}, + {"20260101-00.csi", time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC), false}, + {"20261231-23.csi", time.Date(2026, 12, 31, 23, 0, 0, 0, time.UTC), false}, + {"not-a-segment.csi", time.Time{}, true}, + {"20260327.csi", time.Time{}, true}, + {"20260327-14.dat", time.Time{}, true}, + } + for _, tt := range tests { + got, err := parseSegmentTime(tt.name) + if tt.wantErr { + if err == nil { + t.Errorf("parseSegmentTime(%q): expected error, got nil", tt.name) + } + continue + } + if err != nil { + t.Errorf("parseSegmentTime(%q): unexpected error: %v", tt.name, err) + continue + } + if !got.Equal(tt.want) { + t.Errorf("parseSegmentTime(%q) = %v, want %v", tt.name, got, tt.want) + } + } +} + +func TestSegmentHour(t *testing.T) { + t.Parallel() + input := time.Date(2026, 3, 27, 14, 30, 45, 123456789, time.UTC) + got := segmentHour(input) + want := time.Date(2026, 3, 27, 14, 0, 0, 0, time.UTC) + if !got.Equal(want) { + t.Errorf("segmentHour(%v) = %v, want %v", input, got, want) + } + + // Non-UTC input should be converted. 
+ input2 := time.Date(2026, 3, 27, 14, 30, 45, 0, time.FixedZone("EST", -5*3600)) + got2 := segmentHour(input2) + want2 := time.Date(2026, 3, 27, 19, 0, 0, 0, time.UTC) // 14:30 EST = 19:30 UTC + if !got2.Equal(want2) { + t.Errorf("segmentHour(%v) = %v, want %v", input2, got2, want2) + } +} + +func TestLinkDir(t *testing.T) { + t.Parallel() + tests := []struct { + input string + want string + }{ + {"AA:BB:CC:DD:EE:FF:11:22:33:44:55:66", "AA:BB:CC:DD:EE:FF-11:22:33:44:55:66"}, + {"AA:BB:CC:DD:EE:FF:AA:BB:CC:DD:EE:FF", "AA:BB:CC:DD:EE:FF-AA:BB:CC:DD:EE:FF"}, + {"short", "short"}, + } + for _, tt := range tests { + got := linkDir(tt.input) + if got != tt.want { + t.Errorf("linkDir(%q) = %q, want %q", tt.input, got, tt.want) + } + } +} + +func TestWriteAndScan(t *testing.T) { + t.Parallel() + dir := t.TempDir() + path := filepath.Join(dir, "20260327-14.csi") + + f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + t.Fatal(err) + } + + // Write three records with different timestamps. + records := []struct { + ns int64 + frame []byte + }{ + {1000, []byte("frame1")}, + {2000, []byte("frame2-data")}, + {3000, []byte("frame3-longer-data-here")}, + } + + for _, r := range records { + if err := WriteRecord(f, r.ns, r.frame); err != nil { + t.Fatal(err) + } + } + f.Close() + + // Scan all records. 
+ var scanned []struct { + ns int64 + frame []byte + } + err = ScanSegment(path, func(ns int64, frame []byte) bool { + scanned = append(scanned, struct { + ns int64 + frame []byte + }{ns, frame}) + return true + }) + if err != nil { + t.Fatal(err) + } + + if len(scanned) != 3 { + t.Fatalf("expected 3 records, got %d", len(scanned)) + } + for i, r := range records { + if scanned[i].ns != r.ns { + t.Errorf("record %d: ns = %d, want %d", i, scanned[i].ns, r.ns) + } + if string(scanned[i].frame) != string(r.frame) { + t.Errorf("record %d: frame = %q, want %q", i, scanned[i].frame, r.frame) + } + } +} + +func TestScanSegmentFrom(t *testing.T) { + t.Parallel() + dir := t.TempDir() + path := filepath.Join(dir, "20260327-14.csi") + + f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + t.Fatal(err) + } + + WriteRecord(f, 1000, []byte("a")) + WriteRecord(f, 2000, []byte("b")) + WriteRecord(f, 3000, []byte("c")) + f.Close() + + // Scan from 2000 — should get "b" and "c". 
+ var result [][]byte + err = ScanSegmentFrom(path, 2000, func(_ int64, frame []byte) bool { + result = append(result, frame) + return true + }) + if err != nil { + t.Fatal(err) + } + + if len(result) != 2 { + t.Fatalf("expected 2 records, got %d", len(result)) + } + if string(result[0]) != "b" { + t.Errorf("first frame = %q, want %q", result[0], "b") + } + if string(result[1]) != "c" { + t.Errorf("second frame = %q, want %q", result[1], "c") + } +} + +func TestScanStopEarly(t *testing.T) { + t.Parallel() + dir := t.TempDir() + path := filepath.Join(dir, "20260327-14.csi") + + f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + t.Fatal(err) + } + WriteRecord(f, 1000, []byte("a")) + WriteRecord(f, 2000, []byte("b")) + WriteRecord(f, 3000, []byte("c")) + f.Close() + + count := 0 + err = ScanSegment(path, func(_ int64, _ []byte) bool { + count++ + return count < 2 // stop after 2 + }) + if err != nil { + t.Fatal(err) + } + if count != 2 { + t.Errorf("expected 2 records scanned, got %d", count) + } +} + +func TestScanEmptyFile(t *testing.T) { + t.Parallel() + dir := t.TempDir() + path := filepath.Join(dir, "20260327-14.csi") + + f, err := os.Create(path) + if err != nil { + t.Fatal(err) + } + f.Close() + + count := 0 + err = ScanSegment(path, func(_ int64, _ []byte) bool { + count++ + return true + }) + if err != nil { + t.Fatal(err) + } + if count != 0 { + t.Errorf("expected 0 records from empty file, got %d", count) + } +} + +func TestScanCorruptRecord(t *testing.T) { + t.Parallel() + dir := t.TempDir() + path := filepath.Join(dir, "20260327-14.csi") + + f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + t.Fatal(err) + } + // Write a valid record. + WriteRecord(f, 1000, []byte("ok")) + // Write truncated data (4-byte length says 100 bytes payload but only write 2). 
+ var buf [4]byte + binary.BigEndian.PutUint32(buf[:], 100) + f.Write(buf[:]) + f.Write([]byte{0xFF, 0xFF}) + f.Close() + + err = ScanSegment(path, func(_ int64, _ []byte) bool { + return true + }) + if err == nil { + t.Error("expected error on corrupt record") + } +} + +func TestListSegmentFiles(t *testing.T) { + t.Parallel() + dir := t.TempDir() + + // Create some segment files and a non-segment file. + segments := []string{"20260327-13.csi", "20260327-14.csi", "20260327-15.csi"} + for _, name := range segments { + if err := os.WriteFile(filepath.Join(dir, name), nil, 0644); err != nil { + t.Fatal(err) + } + } + os.WriteFile(filepath.Join(dir, "other.txt"), nil, 0644) + os.MkdirAll(filepath.Join(dir, "subdir"), 0755) + + files, err := listSegmentFiles(dir) + if err != nil { + t.Fatal(err) + } + + if len(files) != 3 { + t.Fatalf("expected 3 segment files, got %d", len(files)) + } + + // Should be sorted chronologically. + var names []string + for _, f := range files { + names = append(names, filepath.Base(f)) + } + if !sort.StringsAreSorted(names) { + t.Errorf("segment files not sorted: %v", names) + } +} + +func TestListSegmentFilesNonExistentDir(t *testing.T) { + t.Parallel() + files, err := listSegmentFiles("/nonexistent/path") + if err != nil { + t.Fatal(err) + } + if len(files) != 0 { + t.Errorf("expected 0 files for nonexistent dir, got %d", len(files)) + } +} + +func TestScanReaderWithBytesBuffer(t *testing.T) { + t.Parallel() + // Build a record in memory and scan it. 
+ var buf bytes.Buffer + WriteRecordToBuffer(&buf, 42, []byte("hello")) + + count := 0 + err := scanReader(&buf, func(ns int64, frame []byte) bool { + count++ + if ns != 42 { + t.Errorf("ns = %d, want 42", ns) + } + if string(frame) != "hello" { + t.Errorf("frame = %q, want %q", frame, "hello") + } + return true + }) + if err != nil { + t.Fatal(err) + } + if count != 1 { + t.Errorf("expected 1 record, got %d", count) + } +} + +// WriteRecordToBuffer writes a record to a bytes.Buffer (for testing scanReader). +func WriteRecordToBuffer(buf *bytes.Buffer, recvTimeNS int64, frame []byte) { + payloadLen := uint32(timestampSize + len(frame)) + var hdr [recordOverhead]byte + binary.BigEndian.PutUint32(hdr[:4], payloadLen) + binary.BigEndian.PutUint64(hdr[4:12], uint64(recvTimeNS)) + buf.Write(hdr[:]) + buf.Write(frame) +}