diff --git a/mothership/internal/api/replay.go b/mothership/internal/api/replay.go index f00fc69..359768f 100644 --- a/mothership/internal/api/replay.go +++ b/mothership/internal/api/replay.go @@ -65,11 +65,71 @@ func (h *ReplayHandler) Close() error { // RegisterRoutes registers replay endpoints. // -// GET /api/replay/sessions — list available recording sessions -// POST /api/replay/start — start replay at given timestamp -// POST /api/replay/stop — stop replay, return to live -// POST /api/replay/seek — seek to timestamp within session -// POST /api/replay/tune — update pipeline parameters mid-replay +// Replay/Time-Travel Endpoints: +// +// GET /api/replay/sessions — list recording sessions and replay store info +// +// @Summary List replay sessions +// @Description Returns information about available recorded data and active replay sessions. +// @Description Includes file size, timestamp range, and all active sessions. +// @Tags replay +// @Produce json +// @Success 200 {object} replayInfo "Replay store info and active sessions" +// @Router /api/replay/sessions [get] +// +// POST /api/replay/start — start replay at given timestamp +// +// @Summary Start replay session +// @Description Creates a new replay session for the specified time range. The session +// @Description starts in paused state. Use speed to control playback rate (1, 2, or 5). +// @Tags replay +// @Accept json +// @Produce json +// @Param request body startSessionRequest true "Replay start parameters" +// @Success 200 {object} map[string]interface{} "Session created with ID and state" +// @Failure 400 {object} map[string]string "Invalid request parameters" +// @Router /api/replay/start [post] +// +// POST /api/replay/stop — stop replay, return to live +// +// @Summary Stop replay session +// @Description Stops the specified replay session and returns to live mode. +// @Tags replay +// @Accept json +// @Produce json +// @Param request body stopSessionRequest true "Session to stop" +// @Success 200 {object} map[string]string "Session stopped" +// @Failure 404 {object} map[string]string "Session not found" +// @Router /api/replay/stop [post] +// +// POST /api/replay/seek — seek to timestamp within session +// +// @Summary Seek within replay session +// @Description Moves the replay cursor to the specified timestamp within the session range. +// @Description Pauses playback and reads one frame at the target position. +// @Tags replay +// @Accept json +// @Produce json +// @Param request body seekRequest true "Seek parameters" +// @Success 200 {object} map[string]interface{} "Seek complete with current position" +// @Failure 400 {object} map[string]string "Invalid timestamp or out of range" +// @Failure 404 {object} map[string]string "Session not found" +// @Router /api/replay/seek [post] +// +// POST /api/replay/tune — update pipeline parameters mid-replay +// +// @Summary Tune replay pipeline parameters +// @Description Updates detection pipeline parameters for the replay session without +// @Description affecting live processing. Useful for exploring how parameter changes +// @Description affect detection on historical data. +// @Tags replay +// @Accept json +// @Produce json +// @Param request body tuneRequest true "Parameter updates" +// @Success 200 {object} map[string]interface{} "Parameters updated" +// @Failure 400 {object} map[string]string "Invalid request" +// @Failure 404 {object} map[string]string "Session not found" +// @Router /api/replay/tune [post] func (h *ReplayHandler) RegisterRoutes(r chi.Router) { r.Get("/api/replay/sessions", h.listSessions) r.Post("/api/replay/start", h.startSession) @@ -78,16 +138,19 @@ func (h *ReplayHandler) RegisterRoutes(r chi.Router) { r.Post("/api/replay/tune", h.tune) } +// replayInfo represents the response from GET /api/replay/sessions. type replayInfo struct { - HasData bool `json:"has_data"` - FileSize int64 `json:"file_size_mb"` - WritePos int64 `json:"write_pos"` - OldestPos int64 `json:"oldest_pos"` - OldestTS int64 `json:"oldest_timestamp_ms"` - NewestTS int64 `json:"newest_timestamp_ms"` + HasData bool `json:"has_data"` + FileSize int64 `json:"file_size_mb"` + WritePos int64 `json:"write_pos"` + OldestPos int64 `json:"oldest_pos"` + OldestTS int64 `json:"oldest_timestamp_ms"` + NewestTS int64 `json:"newest_timestamp_ms"` Sessions []*_replaySession `json:"sessions"` } +// listSessions handles GET /api/replay/sessions. +// Returns replay store statistics and all active sessions. func (h *ReplayHandler) listSessions(w http.ResponseWriter, r *http.Request) { stats := h.store.Stats() @@ -115,25 +178,31 @@ func (h *ReplayHandler) listSessions(w http.ResponseWriter, r *http.Request) { Sessions: sessions, } - writeJSON(w, info) + writeJSON(w, http.StatusOK, info) } +// startSessionRequest represents the request body for POST /api/replay/start. type startSessionRequest struct { + // FromISO8601 is the start timestamp in ISO8601 format (e.g., "2024-03-15T14:30:00Z") FromISO8601 string `json:"from_iso8601"` - ToISO8601 string `json:"to_iso8601"` - Speed int `json:"speed,omitempty"` // 1, 2, 5 + // ToISO8601 is the end timestamp in ISO8601 format. If empty, defaults to now. + ToISO8601 string `json:"to_iso8601"` + // Speed is the playback speed multiplier: 1, 2, or 5. Defaults to 1. + Speed int `json:"speed,omitempty"` } +// startSession handles POST /api/replay/start. +// Creates a new replay session for the specified time range. func (h *ReplayHandler) startSession(w http.ResponseWriter, r *http.Request) { var req startSessionRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - http.Error(w, "invalid request body", http.StatusBadRequest) + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid request body: " + err.Error()}) return } fromMS, err := parseISO8601(req.FromISO8601) if err != nil { - http.Error(w, "invalid from_iso8601: "+err.Error(), http.StatusBadRequest) + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid from_iso8601: " + err.Error()}) return } @@ -141,13 +210,13 @@ func (h *ReplayHandler) startSession(w http.ResponseWriter, r *http.Request) { if req.ToISO8601 != "" { toMS, err = parseISO8601(req.ToISO8601) if err != nil { - http.Error(w, "invalid to_iso8601: "+err.Error(), http.StatusBadRequest) + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid to_iso8601: " + err.Error()}) return } } if toMS < fromMS { - http.Error(w, "to_iso8601 must be after from_iso8601", http.StatusBadRequest) + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "to_iso8601 must be after from_iso8601"}) return } @@ -156,7 +225,7 @@ func (h *ReplayHandler) startSession(w http.ResponseWriter, r *http.Request) { speed = 1 } if speed != 1 && speed != 2 && speed != 5 { - http.Error(w, "speed must be 1, 2, or 5", http.StatusBadRequest) + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "speed must be 1, 2, or 5"}) return } @@ -179,7 +248,7 @@ func (h *ReplayHandler) startSession(w http.ResponseWriter, r *http.Request) { log.Printf("[INFO] Replay session started: %s (from %d to %d, speed %dx)", session.ID, fromMS, toMS, speed) - writeJSON(w, map[string]interface{}{ + writeJSON(w, http.StatusOK, map[string]interface{}{ "session_id": session.ID, "from_ms": fromMS, "to_ms": toMS, @@ -188,14 +257,18 @@ func (h *ReplayHandler) startSession(w http.ResponseWriter, r *http.Request) { }) } +// stopSessionRequest represents the request body for POST /api/replay/stop. type stopSessionRequest struct { + // SessionID is the ID of the session to stop. SessionID string `json:"session_id"` } +// stopSession handles POST /api/replay/stop. +// Stops the specified replay session and deletes it. func (h *ReplayHandler) stopSession(w http.ResponseWriter, r *http.Request) { var req stopSessionRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - http.Error(w, "invalid request body", http.StatusBadRequest) + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid request body: " + err.Error()}) return } @@ -204,28 +277,35 @@ func (h *ReplayHandler) stopSession(w http.ResponseWriter, r *http.Request) { session, exists := h.sessions[req.SessionID] if !exists { - http.Error(w, "session not found", http.StatusNotFound) + writeJSON(w, http.StatusNotFound, map[string]string{"error": "session not found"}) return } session.State = "stopped" delete(h.sessions, req.SessionID) - writeJSON(w, map[string]interface{}{ - "status": "stopped", + log.Printf("[INFO] Replay session stopped: %s", req.SessionID) + + writeJSON(w, http.StatusOK, map[string]interface{}{ + "status": "stopped", "session": req.SessionID, }) } +// seekRequest represents the request body for POST /api/replay/seek. type seekRequest struct { - SessionID string `json:"session_id"` + // SessionID is the ID of the session to seek within. + SessionID string `json:"session_id"` + // TimestampISO8601 is the target timestamp in ISO8601 format. TimestampISO8601 string `json:"timestamp_iso8601"` } +// seek handles POST /api/replay/seek. +// Seeks to the specified timestamp within the session. func (h *ReplayHandler) seek(w http.ResponseWriter, r *http.Request) { var req seekRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - http.Error(w, "invalid request body", http.StatusBadRequest) + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid request body: " + err.Error()}) return } @@ -234,18 +314,18 @@ func (h *ReplayHandler) seek(w http.ResponseWriter, r *http.Request) { session, exists := h.sessions[req.SessionID] if !exists { - http.Error(w, "session not found", http.StatusNotFound) + writeJSON(w, http.StatusNotFound, map[string]string{"error": "session not found"}) return } targetMS, err := parseISO8601(req.TimestampISO8601) if err != nil { - http.Error(w, "invalid timestamp: "+err.Error(), http.StatusBadRequest) + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid timestamp: " + err.Error()}) return } if targetMS < session.FromMS || targetMS > session.ToMS { - http.Error(w, "timestamp outside session range", http.StatusBadRequest) + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "timestamp outside session range"}) return } @@ -263,26 +343,37 @@ func (h *ReplayHandler) seek(w http.ResponseWriter, r *http.Request) { return true }) - writeJSON(w, map[string]interface{}{ + log.Printf("[INFO] Replay session seeked: %s to %d", req.SessionID, targetMS) + + writeJSON(w, http.StatusOK, map[string]interface{}{ "status": "seeked", "current_ms": targetMS, "frame_found": len(frameData) > 0, }) } +// tuneRequest represents the request body for POST /api/replay/tune. type tuneRequest struct { - SessionID string `json:"session_id"` - DeltaRMSThreshold *float64 `json:"delta_rms_threshold,omitempty"` - TauS *float64 `json:"tau_s,omitempty"` - FresnelDecay *float64 `json:"fresnel_decay,omitempty"` - Subcarriers *int `json:"n_subcarriers,omitempty"` - BreathingSensitivity *float64 `json:"breathing_sensitivity,omitempty"` + // SessionID is the ID of the session to tune. + SessionID string `json:"session_id"` + // DeltaRMSThreshold is the motion detection threshold (0.001-1.0). + DeltaRMSThreshold *float64 `json:"delta_rms_threshold,omitempty"` + // TauS is the EMA baseline time constant in seconds (1-600). + TauS *float64 `json:"tau_s,omitempty"` + // FresnelDecay is the Fresnel zone weight decay rate (1.0-4.0). + FresnelDecay *float64 `json:"fresnel_decay,omitempty"` + // Subcarriers is the number of subcarriers for NBVI selection (8-47). + Subcarriers *int `json:"n_subcarriers,omitempty"` + // BreathingSensitivity is the breathing detection threshold in radians RMS (0.001-0.1). + BreathingSensitivity *float64 `json:"breathing_sensitivity,omitempty"` } +// tune handles POST /api/replay/tune. +// Updates pipeline parameters for the replay session. func (h *ReplayHandler) tune(w http.ResponseWriter, r *http.Request) { var req tuneRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - http.Error(w, "invalid request body", http.StatusBadRequest) + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid request body: " + err.Error()}) return } @@ -291,7 +382,7 @@ func (h *ReplayHandler) tune(w http.ResponseWriter, r *http.Request) { session, exists := h.sessions[req.SessionID] if !exists { - http.Error(w, "session not found", http.StatusNotFound) + writeJSON(w, http.StatusNotFound, map[string]string{"error": "session not found"}) return } @@ -315,7 +406,7 @@ func (h *ReplayHandler) tune(w http.ResponseWriter, r *http.Request) { log.Printf("[INFO] Replay session tuned: %s params=%+v", req.SessionID, params) - writeJSON(w, map[string]interface{}{ + writeJSON(w, http.StatusOK, map[string]interface{}{ "status": "tuned", "params": params, "session": req.SessionID, diff --git a/mothership/internal/api/replay_test.go b/mothership/internal/api/replay_test.go new file mode 100644 index 0000000..ed57d8a --- /dev/null +++ b/mothership/internal/api/replay_test.go @@ -0,0 +1,1137 @@ +package api + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/go-chi/chi" + "github.com/spaxel/mothership/internal/replay" +) + +// mockRecordingStore is a mock implementation of RecordingStore for testing. +type mockRecordingStore struct { + stats replay.Stats + scanFunc func(fn func(recvTimeNS int64, frame []byte) bool) bool + closed bool + closeErr error +} + +func (m *mockRecordingStore) Stats() replay.Stats { + return m.stats +} + +func (m *mockRecordingStore) Scan(fn func(recvTimeNS int64, frame []byte) bool) bool { + if m.scanFunc != nil { + return m.scanFunc(fn) + } + return true +} + +func (m *mockRecordingStore) Close() error { + m.closed = true + if m.closeErr != nil { + return m.closeErr + } + return nil +} + +// newTestReplayHandler creates a ReplayHandler with a mock store. +func newTestReplayHandler(t *testing.T) *ReplayHandler { + t.Helper() + + store := &mockRecordingStore{ + stats: replay.Stats{ + HasData: true, + WritePos: 5000, + OldestPos: 32, + FileSize: 360 * 1024 * 1024, + }, + scanFunc: func(fn func(recvTimeNS int64, frame []byte) bool) bool { + // Simulate some frames at different timestamps + timestamps := []int64{ + 1710450000000000000, // 2024-03-14 12:00:00 + 1710450030000000000, // 2024-03-14 12:00:30 + 1710450060000000000, // 2024-03-14 12:01:00 + } + frames := [][]byte{ + []byte("frame1"), + []byte("frame2"), + []byte("frame3"), + } + for i, ts := range timestamps { + if !fn(ts, frames[i]) { + break + } + } + return true + }, + } + + handler, err := NewReplayHandler("/data/csi_replay.bin", store) + if err != nil { + t.Fatalf("NewReplayHandler: %v", err) + } + return handler +} + +// setupReplayRouter creates a chi.Router with replay routes registered. +func setupReplayRouter(h *ReplayHandler) *chi.Mux { + r := chi.NewRouter() + h.RegisterRoutes(r) + return r +} + +// TestListSessions tests GET /api/replay/sessions. +func TestListSessions(t *testing.T) { + tests := []struct { + name string + hasData bool + wantStatus int + check func(*testing.T, map[string]interface{}) + }{ + { + name: "list sessions with data", + hasData: true, + wantStatus: http.StatusOK, + check: func(t *testing.T, resp map[string]interface{}) { + if resp["has_data"] != true { + t.Errorf("Expected has_data=true, got %v", resp["has_data"]) + } + if fileSize, ok := resp["file_size_mb"].(int64); !ok || fileSize == 0 { + t.Errorf("Expected non-zero file_size_mb, got %v", resp["file_size_mb"]) + } + if oldestTS, ok := resp["oldest_timestamp_ms"].(int64); !ok || oldestTS == 0 { + t.Errorf("Expected non-zero oldest_timestamp_ms, got %v", resp["oldest_timestamp_ms"]) + } + if newestTS, ok := resp["newest_timestamp_ms"].(int64); !ok || newestTS == 0 { + t.Errorf("Expected non-zero newest_timestamp_ms, got %v", resp["newest_timestamp_ms"]) + } + sessions, ok := resp["sessions"].([]interface{}) + if !ok { + t.Errorf("Expected sessions array, got %T", resp["sessions"]) + } + // Empty sessions list initially + if len(sessions) != 0 { + t.Errorf("Expected 0 sessions, got %d", len(sessions)) + } + }, + }, + { + name: "list sessions with no data", + hasData: false, + wantStatus: http.StatusOK, + check: func(t *testing.T, resp map[string]interface{}) { + if resp["has_data"] != false { + t.Errorf("Expected has_data=false, got %v", resp["has_data"]) + } + if oldestTS, ok := resp["oldest_timestamp_ms"].(int64); ok && oldestTS != 0 { + t.Errorf("Expected zero oldest_timestamp_ms when no data, got %d", oldestTS) + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := newTestReplayHandler(t) + handler.store.(*mockRecordingStore).stats.HasData = tt.hasData + + r := setupReplayRouter(handler) + req := httptest.NewRequest("GET", "/api/replay/sessions", nil) + rr := httptest.NewRecorder() + r.ServeHTTP(rr, req) + + if rr.Code != tt.wantStatus { + t.Fatalf("Expected status %d, got %d: %s", tt.wantStatus, rr.Code, rr.Body.String()) + } + + var resp map[string]interface{} + if err := json.NewDecoder(rr.Body).Decode(&resp); err != nil { + t.Fatalf("Failed to decode response: %v", err) + } + + if tt.check != nil { + tt.check(t, resp) + } + }) + } +} + +// TestStartSession tests POST /api/replay/start. +func TestStartSession(t *testing.T) { + now := time.Now().UnixNano() / 1e6 + pastTime := time.Now().Add(-1 * time.Hour).Format(time.RFC3339Nano) + + tests := []struct { + name string + body startSessionRequest + wantStatus int + check func(*testing.T, map[string]interface{}) + }{ + { + name: "start session with valid range", + body: startSessionRequest{ + FromISO8601: pastTime, + ToISO8601: "", + Speed: 1, + }, + wantStatus: http.StatusOK, + check: func(t *testing.T, resp map[string]interface{}) { + sessionID, ok := resp["session_id"].(string) + if !ok || sessionID == "" { + t.Errorf("Expected non-empty session_id, got %v", resp["session_id"]) + } + if fromMS, ok := resp["from_ms"].(int64); !ok || fromMS == 0 { + t.Errorf("Expected non-zero from_ms, got %v", resp["from_ms"]) + } + if toMS, ok := resp["to_ms"].(int64); !ok || toMS == 0 { + t.Errorf("Expected non-zero to_ms, got %v", resp["to_ms"]) + } + if state, ok := resp["state"].(string); !ok || state != "paused" { + t.Errorf("Expected state=paused, got %v", resp["state"]) + } + }, + }, + { + name: "start session with explicit to time", + body: startSessionRequest{ + FromISO8601: time.Now().Add(-2 * time.Hour).Format(time.RFC3339Nano), + ToISO8601: time.Now().Add(-1 * time.Hour).Format(time.RFC3339Nano), + Speed: 2, + }, + wantStatus: http.StatusOK, + check: func(t *testing.T, resp map[string]interface{}) { + if speed, ok := resp["speed"].(int); !ok || speed != 2 { + t.Errorf("Expected speed=2, got %v", resp["speed"]) + } + }, + }, + { + name: "start session with speed 5", + body: startSessionRequest{ + FromISO8601: pastTime, + Speed: 5, + }, + wantStatus: http.StatusOK, + check: func(t *testing.T, resp map[string]interface{}) { + if speed, ok := resp["speed"].(int); !ok || speed != 5 { + t.Errorf("Expected speed=5, got %v", resp["speed"]) + } + }, + }, + { + name: "default speed when not specified", + body: startSessionRequest{ + FromISO8601: pastTime, + }, + wantStatus: http.StatusOK, + check: func(t *testing.T, resp map[string]interface{}) { + if speed, ok := resp["speed"].(int); !ok || speed != 1 { + t.Errorf("Expected default speed=1, got %v", resp["speed"]) + } + }, + }, + { + name: "invalid from timestamp", + body: startSessionRequest{ + FromISO8601: "invalid-timestamp", + }, + wantStatus: http.StatusBadRequest, + check: func(t *testing.T, resp map[string]interface{}) { + if _, ok := resp["error"]; !ok { + t.Error("Expected error in response") + } + }, + }, + { + name: "invalid to timestamp", + body: startSessionRequest{ + FromISO8601: pastTime, + ToISO8601: "not-a-timestamp", + }, + wantStatus: http.StatusBadRequest, + check: func(t *testing.T, resp map[string]interface{}) { + if _, ok := resp["error"]; !ok { + t.Error("Expected error in response") + } + }, + }, + { + name: "to before from", + body: startSessionRequest{ + FromISO8601: time.Now().Format(time.RFC3339Nano), + ToISO8601: time.Now().Add(-1 * time.Hour).Format(time.RFC3339Nano), + }, + wantStatus: http.StatusBadRequest, + check: func(t *testing.T, resp map[string]interface{}) { + if _, ok := resp["error"]; !ok { + t.Error("Expected error in response") + } + }, + }, + { + name: "invalid speed", + body: startSessionRequest{ + FromISO8601: pastTime, + Speed: 3, + }, + wantStatus: http.StatusBadRequest, + check: func(t *testing.T, resp map[string]interface{}) { + if _, ok := resp["error"]; !ok { + t.Error("Expected error in response") + } + }, + }, + { + name: "empty body", + body: startSessionRequest{}, + wantStatus: http.StatusBadRequest, + check: func(t *testing.T, resp map[string]interface{}) { + if _, ok := resp["error"]; !ok { + t.Error("Expected error in response") + } + }, + }, + { + name: "malformed JSON", + body: startSessionRequest{}, + wantStatus: http.StatusBadRequest, + check: func(t *testing.T, resp map[string]interface{}) { + if _, ok := resp["error"]; !ok { + t.Error("Expected error in response") + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := newTestReplayHandler(t) + r := setupReplayRouter(handler) + + var body []byte + var err error + if tt.name == "malformed JSON" { + body = []byte(`{invalid json}`) + } else if tt.name == "empty body" { + body = []byte(``) + } else { + body, err = json.Marshal(tt.body) + if err != nil { + t.Fatalf("Failed to marshal request: %v", err) + } + } + + req := httptest.NewRequest("POST", "/api/replay/start", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + rr := httptest.NewRecorder() + r.ServeHTTP(rr, req) + + if rr.Code != tt.wantStatus { + t.Fatalf("Expected status %d, got %d: %s", tt.wantStatus, rr.Code, rr.Body.String()) + } + + var resp map[string]interface{} + if err := json.NewDecoder(rr.Body).Decode(&resp); err != nil { + t.Fatalf("Failed to decode response: %v", err) + } + + if tt.check != nil { + tt.check(t, resp) + } + }) + } +} + +// TestStopSession tests POST /api/replay/stop. +func TestStopSession(t *testing.T) { + pastTime := time.Now().Add(-1 * time.Hour).Format(time.RFC3339Nano) + + tests := []struct { + name string + setup func(*ReplayHandler) string + body stopSessionRequest + wantStatus int + check func(*testing.T, *ReplayHandler, map[string]interface{}) + }{ + { + name: "stop existing session", + setup: func(h *ReplayHandler) string { + // Create a session first + body, _ := json.Marshal(startSessionRequest{ + FromISO8601: pastTime, + Speed: 1, + }) + r := setupReplayRouter(h) + req := httptest.NewRequest("POST", "/api/replay/start", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + rr := httptest.NewRecorder() + r.ServeHTTP(rr, req) + + var resp map[string]interface{} + json.NewDecoder(rr.Body).Decode(&resp) + return resp["session_id"].(string) + }, + body: stopSessionRequest{SessionID: "replay-1"}, + wantStatus: http.StatusOK, + check: func(t *testing.T, h *ReplayHandler, resp map[string]interface{}) { + if resp["status"] != "stopped" { + t.Errorf("Expected status=stopped, got %v", resp["status"]) + } + // Verify session is removed + h.mu.RLock() + _, exists := h.sessions["replay-1"] + h.mu.RUnlock() + if exists { + t.Error("Session should be removed after stop") + } + }, + }, + { + name: "stop nonexistent session", + setup: func(h *ReplayHandler) string { return "" }, + body: stopSessionRequest{ + SessionID: "does-not-exist", + }, + wantStatus: http.StatusNotFound, + check: func(t *testing.T, h *ReplayHandler, resp map[string]interface{}) { + if _, ok := resp["error"]; !ok { + t.Error("Expected error in response") + } + }, + }, + { + name: "empty session_id", + setup: func(h *ReplayHandler) string { return "" }, + body: stopSessionRequest{}, + wantStatus: http.StatusNotFound, + check: func(t *testing.T, h *ReplayHandler, resp map[string]interface{}) { + if _, ok := resp["error"]; !ok { + t.Error("Expected error in response") + } + }, + }, + { + name: "malformed JSON", + setup: func(h *ReplayHandler) string { return "" }, + body: stopSessionRequest{}, + wantStatus: http.StatusBadRequest, + check: func(t *testing.T, h *ReplayHandler, resp map[string]interface{}) { + if _, ok := resp["error"]; !ok { + t.Error("Expected error in response") + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := newTestReplayHandler(t) + + // For the "malformed JSON" test, we need special handling + if tt.name == "malformed JSON" { + r := setupReplayRouter(handler) + req := httptest.NewRequest("POST", "/api/replay/stop", bytes.NewReader([]byte(`{invalid}`))) + req.Header.Set("Content-Type", "application/json") + rr := httptest.NewRecorder() + r.ServeHTTP(rr, req) + + if rr.Code != tt.wantStatus { + t.Errorf("Expected status %d, got %d", tt.wantStatus, rr.Code) + } + return + } + + sessionID := tt.setup(handler) + if sessionID != "" { + tt.body.SessionID = sessionID + } + + r := setupReplayRouter(handler) + body, _ := json.Marshal(tt.body) + req := httptest.NewRequest("POST", "/api/replay/stop", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + rr := httptest.NewRecorder() + r.ServeHTTP(rr, req) + + if rr.Code != tt.wantStatus { + t.Fatalf("Expected status %d, got %d: %s", tt.wantStatus, rr.Code, rr.Body.String()) + } + + var resp map[string]interface{} + if err := json.NewDecoder(rr.Body).Decode(&resp); err != nil { + t.Fatalf("Failed to decode response: %v", err) + } + + if tt.check != nil { + tt.check(t, handler, resp) + } + }) + } +} + +// TestSeek tests POST /api/replay/seek. +func TestSeek(t *testing.T) { + pastTime := time.Now().Add(-1 * time.Hour).Format(time.RFC3339Nano) + midTime := time.Now().Add(-30 * time.Minute).Format(time.RFC3339Nano) + + tests := []struct { + name string + setup func(*ReplayHandler) string + body seekRequest + wantStatus int + check func(*testing.T, map[string]interface{}) + }{ + { + name: "seek to valid timestamp within range", + setup: func(h *ReplayHandler) string { + body, _ := json.Marshal(startSessionRequest{ + FromISO8601: time.Now().Add(-1 * time.Hour).Format(time.RFC3339Nano), + Speed: 1, + }) + r := setupReplayRouter(h) + req := httptest.NewRequest("POST", "/api/replay/start", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + rr := httptest.NewRecorder() + r.ServeHTTP(rr, req) + + var resp map[string]interface{} + json.NewDecoder(rr.Body).Decode(&resp) + return resp["session_id"].(string) + }, + body: seekRequest{ + TimestampISO8601: midTime, + }, + wantStatus: http.StatusOK, + check: func(t *testing.T, resp map[string]interface{}) { + if resp["status"] != "seeked" { + t.Errorf("Expected status=seeked, got %v", resp["status"]) + } + if currentMS, ok := resp["current_ms"].(int64); !ok || currentMS == 0 { + t.Errorf("Expected non-zero current_ms, got %v", resp["current_ms"]) + } + // Mock store should find a frame + if frameFound, ok := resp["frame_found"].(bool); !ok || !frameFound { + t.Error("Expected frame_found=true with mock store") + } + }, + }, + { + name: "seek before session range", + setup: func(h *ReplayHandler) string { + fromTime := time.Now().Add(-1 * time.Hour).Format(time.RFC3339Nano) + body, _ := json.Marshal(startSessionRequest{ + FromISO8601: fromTime, + Speed: 1, + }) + r := setupReplayRouter(h) + req := httptest.NewRequest("POST", "/api/replay/start", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + rr := httptest.NewRecorder() + r.ServeHTTP(rr, req) + + var resp map[string]interface{} + json.NewDecoder(rr.Body).Decode(&resp) + return resp["session_id"].(string) + }, + body: seekRequest{ + TimestampISO8601: time.Now().Add(-2 * time.Hour).Format(time.RFC3339Nano), + }, + wantStatus: http.StatusBadRequest, + check: func(t *testing.T, resp map[string]interface{}) { + if _, ok := resp["error"]; !ok { + t.Error("Expected error in response") + } + }, + }, + { + name: "invalid timestamp format", + setup: func(h *ReplayHandler) string { + body, _ := json.Marshal(startSessionRequest{ + FromISO8601: pastTime, + Speed: 1, + }) + r := setupReplayRouter(h) + req := httptest.NewRequest("POST", "/api/replay/start", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + rr := httptest.NewRecorder() + r.ServeHTTP(rr, req) + + var resp map[string]interface{} + json.NewDecoder(rr.Body).Decode(&resp) + return resp["session_id"].(string) + }, + body: seekRequest{ + TimestampISO8601: "not-a-timestamp", + }, + wantStatus: http.StatusBadRequest, + check: func(t *testing.T, resp map[string]interface{}) { + if _, ok := resp["error"]; !ok { + t.Error("Expected error in response") + } + }, + }, + { + name: "session not found", + setup: func(h *ReplayHandler) string { return "" }, + body: seekRequest{ + SessionID: "does-not-exist", + TimestampISO8601: midTime, + }, + wantStatus: http.StatusNotFound, + check: func(t *testing.T, resp map[string]interface{}) { + if _, ok := resp["error"]; !ok { + t.Error("Expected error in response") + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := newTestReplayHandler(t) + sessionID := tt.setup(handler) + if sessionID != "" { + tt.body.SessionID = sessionID + } + + r := setupReplayRouter(handler) + body, _ := json.Marshal(tt.body) + req := httptest.NewRequest("POST", "/api/replay/seek", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + rr := httptest.NewRecorder() + r.ServeHTTP(rr, req) + + if rr.Code != tt.wantStatus { + t.Fatalf("Expected status %d, got %d: %s", tt.wantStatus, rr.Code, rr.Body.String()) + } + + var resp map[string]interface{} + if err := json.NewDecoder(rr.Body).Decode(&resp); err != nil { + t.Fatalf("Failed to decode response: %v", err) + } + + if tt.check != nil { + tt.check(t, resp) + } + }) + } +} + +// TestTune tests POST /api/replay/tune. +func TestTune(t *testing.T) { + pastTime := time.Now().Add(-1 * time.Hour).Format(time.RFC3339Nano) + + deltaThreshold := 0.05 + tauS := 45.0 + fresnelDecay := 2.5 + subcarriers := 24 + breathingSens := 0.008 + + tests := []struct { + name string + setup func(*ReplayHandler) string + body tuneRequest + wantStatus int + check func(*testing.T, map[string]interface{}) + }{ + { + name: "tune all parameters", + setup: func(h *ReplayHandler) string { + body, _ := json.Marshal(startSessionRequest{ + FromISO8601: pastTime, + Speed: 1, + }) + r := setupReplayRouter(h) + req := httptest.NewRequest("POST", "/api/replay/start", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + rr := httptest.NewRecorder() + r.ServeHTTP(rr, req) + + var resp map[string]interface{} + json.NewDecoder(rr.Body).Decode(&resp) + return resp["session_id"].(string) + }, + body: tuneRequest{ + DeltaRMSThreshold: &deltaThreshold, + TauS: &tauS, + FresnelDecay: &fresnelDecay, + Subcarriers: &subcarriers, + BreathingSensitivity: &breathingSens, + }, + wantStatus: http.StatusOK, + check: func(t *testing.T, resp map[string]interface{}) { + if resp["status"] != "tuned" { + t.Errorf("Expected status=tuned, got %v", resp["status"]) + } + params, ok := resp["params"].(map[string]interface{}) + if !ok { + t.Fatalf("Expected params map, got %T", resp["params"]) + } + if params["delta_rms_threshold"] != 0.05 { + t.Errorf("Expected delta_rms_threshold=0.05, got %v", params["delta_rms_threshold"]) + } + if params["tau_s"] != 45.0 { + t.Errorf("Expected tau_s=45.0, got %v", params["tau_s"]) + } + if params["fresnel_decay"] != 2.5 { + t.Errorf("Expected fresnel_decay=2.5, got %v", params["fresnel_decay"]) + } + if params["n_subcarriers"] != 24 { + t.Errorf("Expected n_subcarriers=24, got %v", params["n_subcarriers"]) + } + if params["breathing_sensitivity"] != 0.008 { + t.Errorf("Expected breathing_sensitivity=0.008, got %v", params["breathing_sensitivity"]) + } + }, + }, + { + name: "tune single parameter", + setup: func(h *ReplayHandler) string { + body, _ := json.Marshal(startSessionRequest{ + FromISO8601: pastTime, + Speed: 1, + }) + r := setupReplayRouter(h) + req := httptest.NewRequest("POST", "/api/replay/start", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + rr := httptest.NewRecorder() + r.ServeHTTP(rr, req) + + var resp map[string]interface{} + json.NewDecoder(rr.Body).Decode(&resp) + return resp["session_id"].(string) + }, + body: tuneRequest{ + DeltaRMSThreshold: &deltaThreshold, + }, + wantStatus: http.StatusOK, + check: func(t *testing.T, resp map[string]interface{}) { + params, ok := resp["params"].(map[string]interface{}) + if !ok { + t.Fatalf("Expected params map, got %T", resp["params"]) + } + if params["delta_rms_threshold"] != 0.05 { + t.Errorf("Expected delta_rms_threshold=0.05, got %v", params["delta_rms_threshold"]) + } + }, + }, + { + name: "session not found", + setup: func(h *ReplayHandler) string { return "" }, + body: tuneRequest{ + DeltaRMSThreshold: &deltaThreshold, + }, + wantStatus: http.StatusNotFound, + check: func(t *testing.T, resp map[string]interface{}) { + if _, ok := resp["error"]; !ok { + t.Error("Expected error in response") + } + }, + }, + { + name: "malformed JSON", + setup: func(h *ReplayHandler) string { return "" }, + body: tuneRequest{}, + wantStatus: http.StatusBadRequest, + check: func(t *testing.T, resp map[string]interface{}) { + if _, ok := resp["error"]; !ok { + t.Error("Expected error in response") + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := newTestReplayHandler(t) + + // Special handling for malformed JSON test + if tt.name == "malformed JSON" { + r := setupReplayRouter(handler) + req := httptest.NewRequest("POST", "/api/replay/tune", bytes.NewReader([]byte(`{invalid}`))) + req.Header.Set("Content-Type", "application/json") + rr := httptest.NewRecorder() + r.ServeHTTP(rr, req) + + if rr.Code != tt.wantStatus { + t.Errorf("Expected status %d, got %d", tt.wantStatus, rr.Code) + } + return + } + + sessionID := tt.setup(handler) + if sessionID != "" { + tt.body.SessionID = sessionID + } + + r := setupReplayRouter(handler) + body, _ := json.Marshal(tt.body) + req := httptest.NewRequest("POST", "/api/replay/tune", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + rr := httptest.NewRecorder() + r.ServeHTTP(rr, req) + + if rr.Code != tt.wantStatus { + t.Fatalf("Expected status %d, got %d: %s", tt.wantStatus, rr.Code, rr.Body.String()) + } + + var resp map[string]interface{} + if err := json.NewDecoder(rr.Body).Decode(&resp); err != nil { + t.Fatalf("Failed to decode response: %v", err) + } + + if tt.check != nil { + tt.check(t, resp) + } + }) + } +} + +// TestReplaySessionLifecycle tests the full lifecycle: start -> tune -> seek -> stop. +func TestReplaySessionLifecycle(t *testing.T) { + handler := newTestReplayHandler(t) + r := setupReplayRouter(handler) + + pastTime := time.Now().Add(-1 * time.Hour).Format(time.RFC3339Nano) + midTime := time.Now().Add(-30 * time.Minute).Format(time.RFC3339Nano) + + // 1. Start a session + startBody, _ := json.Marshal(startSessionRequest{ + FromISO8601: pastTime, + Speed: 2, + }) + req := httptest.NewRequest("POST", "/api/replay/start", bytes.NewReader(startBody)) + req.Header.Set("Content-Type", "application/json") + rr := httptest.NewRecorder() + r.ServeHTTP(rr, req) + + if rr.Code != http.StatusOK { + t.Fatalf("Start: expected 200, got %d: %s", rr.Code, rr.Body.String()) + } + + var startResp map[string]interface{} + if err := json.NewDecoder(rr.Body).Decode(&startResp); err != nil { + t.Fatalf("Failed to decode start response: %v", err) + } + + sessionID, ok := startResp["session_id"].(string) + if !ok || sessionID == "" { + t.Fatal("Expected non-empty session_id") + } + + // 2. Tune the session + threshold := 0.03 + tuneBody, _ := json.Marshal(tuneRequest{ + SessionID: sessionID, + DeltaRMSThreshold: &threshold, + }) + req = httptest.NewRequest("POST", "/api/replay/tune", bytes.NewReader(tuneBody)) + req.Header.Set("Content-Type", "application/json") + rr = httptest.NewRecorder() + r.ServeHTTP(rr, req) + + if rr.Code != http.StatusOK { + t.Fatalf("Tune: expected 200, got %d: %s", rr.Code, rr.Body.String()) + } + + // 3. Seek within the session + seekBody, _ := json.Marshal(seekRequest{ + SessionID: sessionID, + TimestampISO8601: midTime, + }) + req = httptest.NewRequest("POST", "/api/replay/seek", bytes.NewReader(seekBody)) + req.Header.Set("Content-Type", "application/json") + rr = httptest.NewRecorder() + r.ServeHTTP(rr, req) + + if rr.Code != http.StatusOK { + t.Fatalf("Seek: expected 200, got %d: %s", rr.Code, rr.Body.String()) + } + + // 4. Verify session appears in list + req = httptest.NewRequest("GET", "/api/replay/sessions", nil) + rr = httptest.NewRecorder() + r.ServeHTTP(rr, req) + + if rr.Code != http.StatusOK { + t.Fatalf("List: expected 200, got %d", rr.Code) + } + + var listResp map[string]interface{} + if err := json.NewDecoder(rr.Body).Decode(&listResp); err != nil { + t.Fatalf("Failed to decode list response: %v", err) + } + + sessions, ok := listResp["sessions"].([]interface{}) + if !ok { + t.Fatalf("Expected sessions array, got %T", listResp["sessions"]) + } + if len(sessions) != 1 { + t.Errorf("Expected 1 session, got %d", len(sessions)) + } + + // 5. Stop the session + stopBody, _ := json.Marshal(stopSessionRequest{ + SessionID: sessionID, + }) + req = httptest.NewRequest("POST", "/api/replay/stop", bytes.NewReader(stopBody)) + req.Header.Set("Content-Type", "application/json") + rr = httptest.NewRecorder() + r.ServeHTTP(rr, req) + + if rr.Code != http.StatusOK { + t.Fatalf("Stop: expected 200, got %d: %s", rr.Code, rr.Body.String()) + } + + // 6. Verify session is removed from list + req = httptest.NewRequest("GET", "/api/replay/sessions", nil) + rr = httptest.NewRecorder() + r.ServeHTTP(rr, req) + + if rr.Code != http.StatusOK { + t.Fatalf("List after stop: expected 200, got %d", rr.Code) + } + + json.NewDecoder(rr.Body).Decode(&listResp) + sessions, _ = listResp["sessions"].([]interface{}) + if len(sessions) != 0 { + t.Errorf("Expected 0 sessions after stop, got %d", len(sessions)) + } +} + +// TestMultipleSessions tests managing multiple concurrent replay sessions. +func TestMultipleSessions(t *testing.T) { + handler := newTestReplayHandler(t) + r := setupReplayRouter(handler) + + pastTime1 := time.Now().Add(-2 * time.Hour).Format(time.RFC3339Nano) + pastTime2 := time.Now().Add(-1 * time.Hour).Format(time.RFC3339Nano) + + // Start two sessions + startBody, _ := json.Marshal(startSessionRequest{ + FromISO8601: pastTime1, + Speed: 1, + }) + req := httptest.NewRequest("POST", "/api/replay/start", bytes.NewReader(startBody)) + req.Header.Set("Content-Type", "application/json") + rr := httptest.NewRecorder() + r.ServeHTTP(rr, req) + + var resp1 map[string]interface{} + json.NewDecoder(rr.Body).Decode(&resp1) + sessionID1 := resp1["session_id"].(string) + + startBody, _ = json.Marshal(startSessionRequest{ + FromISO8601: pastTime2, + Speed: 5, + }) + req = httptest.NewRequest("POST", "/api/replay/start", bytes.NewReader(startBody)) + req.Header.Set("Content-Type", "application/json") + rr = httptest.NewRecorder() + r.ServeHTTP(rr, req) + + var resp2 map[string]interface{} + json.NewDecoder(rr.Body).Decode(&resp2) + sessionID2 := resp2["session_id"].(string) + + // Verify both sessions exist + req = httptest.NewRequest("GET", "/api/replay/sessions", nil) + rr = httptest.NewRecorder() + r.ServeHTTP(rr, req) + + var listResp map[string]interface{} + json.NewDecoder(rr.Body).Decode(&listResp) + sessions, _ := listResp["sessions"].([]interface{}) + + if len(sessions) != 2 { + t.Fatalf("Expected 2 sessions, got %d", len(sessions)) + } + + // Stop first session + stopBody, _ := json.Marshal(stopSessionRequest{ + SessionID: sessionID1, + }) + req = httptest.NewRequest("POST", "/api/replay/stop", bytes.NewReader(stopBody)) + req.Header.Set("Content-Type", "application/json") + rr = httptest.NewRecorder() + r.ServeHTTP(rr, req) + + // Verify one session remains + req = httptest.NewRequest("GET", "/api/replay/sessions", nil) + rr = httptest.NewRecorder() + r.ServeHTTP(rr, req) + + json.NewDecoder(rr.Body).Decode(&listResp) + sessions, _ = listResp["sessions"].([]interface{}) + + if len(sessions) != 1 { + t.Errorf("Expected 1 session after stopping one, got %d", len(sessions)) + } + + // Stop second session + stopBody, _ = json.Marshal(stopSessionRequest{ + SessionID: sessionID2, + }) + req = httptest.NewRequest("POST", "/api/replay/stop", bytes.NewReader(stopBody)) + req.Header.Set("Content-Type", "application/json") + rr = httptest.NewRecorder() + r.ServeHTTP(rr, req) + + // Verify no sessions remain + req = httptest.NewRequest("GET", "/api/replay/sessions", nil) + rr = httptest.NewRecorder() + r.ServeHTTP(rr, req) + + json.NewDecoder(rr.Body).Decode(&listResp) + sessions, _ = listResp["sessions"].([]interface{}) + + if len(sessions) != 0 { + t.Errorf("Expected 0 sessions after stopping both, got %d", len(sessions)) + } +} + +// TestGetSessions tests the GetSessions method. +func TestGetSessions(t *testing.T) { + handler := newTestReplayHandler(t) + + // Initially empty + sessions := handler.GetSessions() + if len(sessions) != 0 { + t.Errorf("Expected 0 sessions initially, got %d", len(sessions)) + } + + // Create a session + handler.mu.Lock() + handler.sessions["test-session"] = &_replaySession{ + ID: "test-session", + FromMS: 1000, + ToMS: 2000, + State: "paused", + } + handler.mu.Unlock() + + sessions = handler.GetSessions() + if len(sessions) != 1 { + t.Errorf("Expected 1 session, got %d", len(sessions)) + } + if sessions[0].ID != "test-session" { + t.Errorf("Expected session ID 'test-session', got %s", sessions[0].ID) + } +} + +// TestGetReplayPath tests the GetReplayPath method. +func TestGetReplayPath(t *testing.T) { + handler := newTestReplayHandler(t) + + path := handler.GetReplayPath() + if path != "/data/csi_replay.bin" { + t.Errorf("Expected path '/data/csi_replay.bin', got %s", path) + } +} + +// TestClose tests the Close method. +func TestClose(t *testing.T) { + store := &mockRecordingStore{} + handler, err := NewReplayHandler("/data/test.bin", store) + if err != nil { + t.Fatalf("NewReplayHandler: %v", err) + } + + if err := handler.Close(); err != nil { + t.Errorf("Close returned error: %v", err) + } + + if !store.closed { + t.Error("Expected store to be closed") + } +} + +// TestCloseWithError tests Close when store returns an error. +func TestCloseWithError(t *testing.T) { + expectedErr := fmt.Errorf("close failed") + store := &mockRecordingStore{closeErr: expectedErr} + handler, err := NewReplayHandler("/data/test.bin", store) + if err != nil { + t.Fatalf("NewReplayHandler: %v", err) + } + + if err := handler.Close(); err != expectedErr { + t.Errorf("Expected error %v, got %v", expectedErr, err) + } +} + +// TestParseISO8601 tests the parseISO8601 helper function. +func TestParseISO8601(t *testing.T) { + tests := []struct { + name string + input string + wantErr bool + check func(int64) bool + }{ + { + name: "valid RFC3339 timestamp", + input: "2024-03-15T14:30:00Z", + wantErr: false, + check: func(ms int64) bool { + expected := int64(1710519800000) // 2024-03-15 14:30:00 UTC in ms + return ms == expected + }, + }, + { + name: "valid RFC3339Nano timestamp", + input: "2024-03-15T14:30:00.123456789Z", + wantErr: false, + check: func(ms int64) bool { + return ms > 1710519800000 && ms < 1710519800200 + }, + }, + { + name: "invalid timestamp", + input: "not-a-timestamp", + wantErr: true, + check: nil, + }, + { + name: "empty string", + input: "", + wantErr: true, + check: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ms, err := parseISO8601(tt.input) + if (err != nil) != tt.wantErr { + t.Errorf("parseISO8601(%q) error = %v, wantErr %v", tt.input, err, tt.wantErr) + return + } + if !tt.wantErr && tt.check != nil { + if !tt.check(ms) { + t.Errorf("parseISO8601(%q) = %d, check failed", tt.input, ms) + } + } + }) + } +} + +// TestFormatTimestamp tests the formatTimestamp helper function. +func TestFormatTimestamp(t *testing.T) { + ms := int64(1710519800000) // 2024-03-15 14:30:00 UTC + formatted := formatTimestamp(ms) + + if formatted == "" { + t.Error("formatTimestamp returned empty string") + } + + // Verify it can be parsed back + _, err := time.Parse(time.RFC3339Nano, formatted) + if err != nil { + t.Errorf("formatTimestamp(%d) returned invalid format: %v", ms, err) + } +}