diff --git a/mothership/internal/analytics/anomaly.go b/mothership/internal/analytics/anomaly.go index 4375794..9f3f0c9 100644 --- a/mothership/internal/analytics/anomaly.go +++ b/mothership/internal/analytics/anomaly.go @@ -24,7 +24,7 @@ import ( type NormalBehaviourSlot struct { HourOfWeek int `json:"hour_of_week"` // 0-167 ZoneID string `json:"zone_id"` - ExpectedOccupancy float64 ` json:"expected_occupancy"` // 0.0-1.0, fraction of samples with occupancy + ExpectedOccupancy float64 `json:"expected_occupancy"` // 0.0-1.0, fraction of samples with occupancy TypicalPersonCount float64 `json:"typical_person_count"` // Mean person count SampleCount int `json:"sample_count"` TypicalBLEDevices map[string]float64 `json:"typical_ble_devices,omitempty"` // MAC -> frequency (0.0-1.0) @@ -923,6 +923,9 @@ func (d *Detector) createAnomaly(event *events.AnomalyEvent, isSecurityMode bool // Store in active anomalies d.activeAnomalies[event.ID] = event + // Also append to history + d.anomalyHistory = append(d.anomalyHistory, event) + // Persist to database d.persistAnomaly(event) @@ -1200,6 +1203,15 @@ func (d *Detector) UpdateBehaviourModel() error { log.Printf("[INFO] Updating behaviour model from collected samples...") // Update behaviour slots from occupancy samples + // First, collect all slots into memory to avoid holding a query connection + // while doing nested queries (deadlock with SetMaxOpenConns(1)). + type aggSlot struct { + HourOfWeek int + ZoneID string + ExpectedOccupancy float64 + TypicalPersonCount float64 + SampleCount int + } rows, err := d.db.Query(` SELECT hour_of_week, zone_id, AVG(CASE WHEN person_count > 0 THEN 1.0 ELSE 0.0 END) as expected_occupancy, @@ -1211,16 +1223,27 @@ func (d *Detector) UpdateBehaviourModel() error { if err != nil { return err } - defer rows.Close() + var slots []aggSlot for rows.Next() { + var s aggSlot + if err := rows.Scan(&s.HourOfWeek, &s.ZoneID, &s.ExpectedOccupancy, + &s.TypicalPersonCount, &s.SampleCount); err != nil { + continue + } + slots = append(slots, s) + } + rows.Close() + + for _, s := range slots { slot := &NormalBehaviourSlot{ TypicalBLEDevices: make(map[string]float64), } - if err := rows.Scan(&slot.HourOfWeek, &slot.ZoneID, &slot.ExpectedOccupancy, - &slot.TypicalPersonCount, &slot.SampleCount); err != nil { - continue - } + slot.HourOfWeek = s.HourOfWeek + slot.ZoneID = s.ZoneID + slot.ExpectedOccupancy = s.ExpectedOccupancy + slot.TypicalPersonCount = s.TypicalPersonCount + slot.SampleCount = s.SampleCount // Calculate typical BLE devices (seen in > 50% of this slot) bleRows, err := d.db.Query(` diff --git a/mothership/internal/analytics/anomaly_test.go b/mothership/internal/analytics/anomaly_test.go index a8b58d3..fcab1a1 100644 --- a/mothership/internal/analytics/anomaly_test.go +++ b/mothership/internal/analytics/anomaly_test.go @@ -300,8 +300,11 @@ func TestAnomaly_UnusualDwell(t *testing.T) { t.Errorf("Expected unusual_dwell anomaly, got %s", event.Type) } + // Wait for async alert goroutine to complete + time.Sleep(10 * time.Millisecond) + // Check alert was sent - if len(alertHandler.alerts) == 0 { + if alertHandler.alertCount() == 0 { t.Error("Expected alert to be sent") } } diff --git a/mothership/internal/analytics/handler.go b/mothership/internal/analytics/handler.go index 314e086..ab90bf7 100644 --- a/mothership/internal/analytics/handler.go +++ b/mothership/internal/analytics/handler.go @@ -7,6 +7,7 @@ import ( "time" "github.com/go-chi/chi" + "github.com/spaxel/mothership/internal/events" ) // Handler provides REST API handlers for analytics. @@ -133,7 +134,9 @@ func (h *AnomalyHandler) RegisterRoutes(r chi.Router) { r.Post("/api/anomalies/model/update", h.handleUpdateModel) } -// handleGetAnomalies returns all anomalies (active + recent history). +// handleGetAnomalies returns anomalies filtered by the `since` query parameter. +// Query params: +// - since: duration string (e.g. "24h", "7d", "1h"). Default "24h". func (h *AnomalyHandler) handleGetAnomalies(w http.ResponseWriter, r *http.Request) { if h.detector == nil { http.Error(w, "anomaly detector not available", http.StatusServiceUnavailable) @@ -141,11 +144,35 @@ func (h *AnomalyHandler) handleGetAnomalies(w http.ResponseWriter, r *http.Reque } active := h.detector.GetActiveAnomalies() - history := h.detector.GetAnomalyHistory(50) + + // Parse since duration + sinceStr := r.URL.Query().Get("since") + if sinceStr == "" { + sinceStr = "24h" + } + sinceDur, err := time.ParseDuration(sinceStr) + if err != nil { + http.Error(w, "invalid since duration: "+err.Error(), http.StatusBadRequest) + return + } + + // Fetch enough history to cover the since window + limit := 1000 + history := h.detector.GetAnomalyHistory(limit) + + // Filter history by since timestamp + cutoff := time.Now().Add(-sinceDur) + var filtered []*events.AnomalyEvent + for _, ev := range history { + if ev.Timestamp.After(cutoff) { + filtered = append(filtered, ev) + } + } response := map[string]interface{}{ "active": active, - "history": history, + "history": filtered, + "since": sinceStr, } writeJSON(w, response) } @@ -250,3 +277,39 @@ func (h *AnomalyHandler) handleUpdateModel(w http.ResponseWriter, r *http.Reques writeJSON(w, map[string]string{"status": "updated"}) } + +// PatternHandler provides REST API handlers for the Welford pattern learner. +type PatternHandler struct { + learner *PatternLearner +} + +// NewPatternHandler creates a new pattern handler. +func NewPatternHandler(learner *PatternLearner) *PatternHandler { + return &PatternHandler{learner: learner} +} + +// RegisterRoutes registers pattern API routes on the given router. +func (h *PatternHandler) RegisterRoutes(r chi.Router) { + r.Get("/api/anomaly_patterns", h.handleGetPatterns) +} + +// handleGetPatterns returns pattern model data for debugging. +// Query params: +// - zone: filter by zone_id (string). If omitted, returns all patterns. +func (h *PatternHandler) handleGetPatterns(w http.ResponseWriter, r *http.Request) { + if h.learner == nil { + http.Error(w, "pattern learner not available", http.StatusServiceUnavailable) + return + } + + zoneID := r.URL.Query().Get("zone") + + patterns := h.learner.GetPatterns(zoneID) + + response := map[string]interface{}{ + "cold_start": h.learner.IsColdStart(), + "patterns": patterns, + "count": len(patterns), + } + writeJSON(w, response) +} diff --git a/mothership/internal/analytics/patterns.go b/mothership/internal/analytics/patterns.go index 987198c..06c5e32 100644 --- a/mothership/internal/analytics/patterns.go +++ b/mothership/internal/analytics/patterns.go @@ -6,6 +6,8 @@ import ( "fmt" "log" "math" + "os" + "path/filepath" "sync" "time" ) @@ -32,7 +34,7 @@ const ( // PatternSlot represents a single (zone_id, hour_of_day, day_of_week) statistical slot. type PatternSlot struct { - ZoneID int `json:"zone_id"` + ZoneID string `json:"zone_id"` HourOfDay int `json:"hour_of_day"` // 0-23 DayOfWeek int `json:"day_of_week"` // 0-6 (0=Sunday) MeanCount float64 `json:"mean_count"` @@ -43,18 +45,18 @@ type PatternSlot struct { // patternKey is the composite key for pattern slots. type patternKey struct { - zoneID int + zoneID string hourOfDay int dayOfWeek int } // OccupancyProvider provides current zone occupancy counts. type OccupancyProvider interface { - GetZoneOccupancyCounts() map[int]int // zone_id -> blob count + GetZoneOccupancyCounts() map[string]int // zone_id -> blob count } // PatternLearner learns occupancy patterns using Welford's online algorithm. -// It persists to the anomaly_patterns table in the main database. +// It persists to the anomaly_patterns table in its database. type PatternLearner struct { mu sync.RWMutex db *sql.DB @@ -65,16 +67,31 @@ type PatternLearner struct { patterns map[patternKey]*PatternSlot } -// NewPatternLearner creates a new pattern learner using the main database. -func NewPatternLearner(db *sql.DB) (*PatternLearner, error) { - // Ensure required tables exist - _, err := db.Exec(` +// NewPatternLearner creates a new pattern learner backed by its own SQLite database. +func NewPatternLearner(dbPath string) (*PatternLearner, error) { + if err := os.MkdirAll(filepath.Dir(dbPath), 0755); err != nil { + return nil, fmt.Errorf("create data dir: %w", err) + } + + db, err := sql.Open("sqlite", dbPath) + if err != nil { + return nil, fmt.Errorf("open sqlite: %w", err) + } + db.SetMaxOpenConns(1) + + pl := &PatternLearner{ + db: db, + patterns: make(map[patternKey]*PatternSlot), + } + + // Create tables + _, err = db.Exec(` CREATE TABLE IF NOT EXISTS settings ( key TEXT PRIMARY KEY, value_json TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS anomaly_patterns ( - zone_id INTEGER NOT NULL, + zone_id TEXT NOT NULL, hour_of_day INTEGER NOT NULL CHECK (hour_of_day BETWEEN 0 AND 23), day_of_week INTEGER NOT NULL CHECK (day_of_week BETWEEN 0 AND 6), mean_count REAL NOT NULL DEFAULT 0, @@ -85,14 +102,10 @@ func NewPatternLearner(db *sql.DB) (*PatternLearner, error) { ); `) if err != nil { + db.Close() return nil, fmt.Errorf("create pattern tables: %w", err) } - pl := &PatternLearner{ - db: db, - patterns: make(map[patternKey]*PatternSlot), - } - // Try to load learning start time from settings var startMs int64 err = db.QueryRow(`SELECT value_json FROM settings WHERE key = 'pattern_learning_start_ms'`).Scan(&startMs) @@ -134,6 +147,11 @@ func (pl *PatternLearner) loadPatterns() error { return rows.Err() } +// Close closes the database. +func (pl *PatternLearner) Close() error { + return pl.db.Close() +} + // IsColdStart returns true if the system is within the 7-day cold start period. func (pl *PatternLearner) IsColdStart() bool { pl.mu.RLock() @@ -142,7 +160,7 @@ func (pl *PatternLearner) IsColdStart() bool { } // IsSlotReady returns true if a specific pattern slot has enough samples. -func (pl *PatternLearner) IsSlotReady(zoneID, hourOfDay, dayOfWeek int) bool { +func (pl *PatternLearner) IsSlotReady(zoneID string, hourOfDay, dayOfWeek int) bool { pl.mu.RLock() defer pl.mu.RUnlock() @@ -152,7 +170,7 @@ func (pl *PatternLearner) IsSlotReady(zoneID, hourOfDay, dayOfWeek int) bool { } // GetPattern returns a pattern slot for inspection (returns a copy). -func (pl *PatternLearner) GetPattern(zoneID, hourOfDay, dayOfWeek int) *PatternSlot { +func (pl *PatternLearner) GetPattern(zoneID string, hourOfDay, dayOfWeek int) *PatternSlot { pl.mu.RLock() defer pl.mu.RUnlock() @@ -166,13 +184,13 @@ func (pl *PatternLearner) GetPattern(zoneID, hourOfDay, dayOfWeek int) *PatternS } // GetPatterns returns all patterns, optionally filtered by zone. -func (pl *PatternLearner) GetPatterns(zoneID int) []*PatternSlot { +func (pl *PatternLearner) GetPatterns(zoneID string) []*PatternSlot { pl.mu.RLock() defer pl.mu.RUnlock() var result []*PatternSlot for key, slot := range pl.patterns { - if zoneID > 0 && key.zoneID != zoneID { + if zoneID != "" && key.zoneID != zoneID { continue } cp := *slot @@ -199,7 +217,7 @@ func WelfordUpdate(mean, m2, count, newValue float64) (newMean, newM2, newCount // ObserveAndUpdate records an observation and updates the model using Welford's algorithm. // anomalyScore is the current anomaly score for this observation (0 if not yet computed). // If anomalyScore >= OutlierProtectionThreshold, the model update is skipped (outlier protection). -func (pl *PatternLearner) ObserveAndUpdate(zoneID, hourOfDay, dayOfWeek int, observedCount int, anomalyScore float64) error { +func (pl *PatternLearner) ObserveAndUpdate(zoneID string, hourOfDay, dayOfWeek int, observedCount int, anomalyScore float64) error { // Outlier protection: don't learn from anomalies if anomalyScore >= OutlierProtectionThreshold { return nil @@ -266,7 +284,7 @@ type AnomalyResult struct { } // ComputeAnomalyScore computes the anomaly score for an observation. -func (pl *PatternLearner) ComputeAnomalyScore(zoneID, hourOfDay, dayOfWeek int, observedCount int) AnomalyResult { +func (pl *PatternLearner) ComputeAnomalyScore(zoneID string, hourOfDay, dayOfWeek int, observedCount int) AnomalyResult { pl.mu.RLock() defer pl.mu.RUnlock() @@ -320,7 +338,7 @@ func (pl *PatternLearner) ComputeAnomalyScore(zoneID, hourOfDay, dayOfWeek int, // computeScoreLocked computes anomaly score while holding the write lock. // Used internally by updateAllZones to avoid lock ordering issues. -func (pl *PatternLearner) computeScoreLocked(zoneID, hourOfDay, dayOfWeek int, observedCount int) float64 { +func (pl *PatternLearner) computeScoreLocked(zoneID string, hourOfDay, dayOfWeek int, observedCount int) float64 { if pl.securityMode { return 1.0 } @@ -466,7 +484,7 @@ func (pl *PatternLearner) updateAllZones(provider OccupancyProvider) { updated_at = excluded.updated_at `, zoneID, hourOfDay, dayOfWeek, newMean, variance, int(newCount), nowMs) if err != nil { - log.Printf("[WARN] Failed to update pattern for zone %d: %v", zoneID, err) + log.Printf("[WARN] Failed to update pattern for zone %s: %v", zoneID, err) continue } diff --git a/mothership/internal/analytics/patterns_test.go b/mothership/internal/analytics/patterns_test.go index 8b58b7a..4604766 100644 --- a/mothership/internal/analytics/patterns_test.go +++ b/mothership/internal/analytics/patterns_test.go @@ -20,7 +20,8 @@ func openTestDB(t *testing.T) *sql.DB { } t.Cleanup(func() { os.RemoveAll(tmpDir) }) - db, err := sql.Open("sqlite", filepath.Join(tmpDir, "test.db")) + dbPath := filepath.Join(tmpDir, "test.db") + db, err := sql.Open("sqlite", dbPath) if err != nil { t.Fatalf("open sqlite: %v", err) } @@ -33,7 +34,7 @@ func openTestDB(t *testing.T) *sql.DB { value_json TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS anomaly_patterns ( - zone_id INTEGER NOT NULL, + zone_id TEXT NOT NULL, hour_of_day INTEGER NOT NULL CHECK (hour_of_day BETWEEN 0 AND 23), day_of_week INTEGER NOT NULL CHECK (day_of_week BETWEEN 0 AND 6), mean_count REAL NOT NULL DEFAULT 0, @@ -50,50 +51,67 @@ func openTestDB(t *testing.T) *sql.DB { return db } +// newTestLearner creates a PatternLearner backed by a temp database. +func newTestLearner(t *testing.T) *PatternLearner { + t.Helper() + tmpDir, err := os.MkdirTemp("", "pattern_learner_test") + if err != nil { + t.Fatalf("create temp dir: %v", err) + } + t.Cleanup(func() { os.RemoveAll(tmpDir) }) + + pl, err := NewPatternLearner(filepath.Join(tmpDir, "patterns.db")) + if err != nil { + t.Fatalf("NewPatternLearner: %v", err) + } + t.Cleanup(func() { pl.Close() }) + return pl +} + // --- Welford's algorithm tests --- func TestWelfordUpdate_NumericalStability(t *testing.T) { tests := []struct { - name string + name string observations []float64 - wantMean float64 - wantVar float64 + wantMean float64 + wantVar float64 }{ { - name: "single observation", + name: "single observation", observations: []float64{5.0}, - wantMean: 5.0, - wantVar: 0.0, + wantMean: 5.0, + wantVar: 0.0, }, { - name: "two identical observations", + name: "two identical observations", observations: []float64{3.0, 3.0}, - wantMean: 3.0, - wantVar: 0.0, + wantMean: 3.0, + wantVar: 0.0, }, { - name: "three observations", + name: "three observations", observations: []float64{1.0, 2.0, 6.0}, - wantMean: 3.0, - wantVar: 4.666666666666667, + wantMean: 3.0, + wantVar: 4.666666666666667, }, { - name: "zero observations then non-zero", + name: "zero observations then non-zero", observations: []float64{0.0, 0.0, 0.0, 5.0}, - wantMean: 1.25, - wantVar: 4.6875, + wantMean: 1.25, + wantVar: 4.6875, }, { - name: "large count stability", - observations: makeSequence(2.0, 1000), // 1000 observations of 2.0 - wantMean: 2.0, - wantVar: 0.0, + name: "large count stability", + observations: makeSequence(2.0, 1000), + wantMean: 2.0, + wantVar: 0.0, }, { - name: "large count with variance", - observations: makeSequence(5.0, 100), // 100 obs of 5.0 then... - wantMean: 5.0, - wantVar: 0.0, + name: "large count with variance", + observations: makeSequence(5.0, 100), + wantMean: 5.0, + wantVar: 0.0, }, } @@ -129,10 +147,8 @@ func TestWelfordUpdate_NumericalStability(t *testing.T) { } func TestWelfordUpdate_NoNaNInf_AnySampleCount(t *testing.T) { - // Verify that Welford's update never produces NaN or Inf at any sample count mean, m2, count := 0.0, 0.0, 0.0 for i := 0; i < 10000; i++ { - // Alternate between extreme values obs := float64(i%100) * 0.01 mean, m2, count = WelfordUpdate(mean, m2, count, obs) @@ -145,7 +161,6 @@ func TestWelfordUpdate_NoNaNInf_AnySampleCount(t *testing.T) { t.Fatalf("NaN/Inf variance at sample %d: variance=%v, m2=%v, count=%v", i+1, variance, m2, count) } - // Variance should always be non-negative if variance < -1e-12 { t.Fatalf("negative variance at sample %d: %v", i+1, variance) } @@ -153,17 +168,14 @@ func TestWelfordUpdate_NoNaNInf_AnySampleCount(t *testing.T) { } func TestWelfordUpdate_MatchesBatchVariance(t *testing.T) { - // Compare Welford online variance with batch computation observations := []float64{1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0} - // Welford online mean, m2, count := 0.0, 0.0, 0.0 for _, obs := range observations { mean, m2, count = WelfordUpdate(mean, m2, count, obs) } onlineVar := m2 / count - // Batch computation var batchMean float64 for _, obs := range observations { batchMean += obs @@ -188,15 +200,15 @@ func TestNormalizeZScore(t *testing.T) { z float64 want float64 }{ - {0.0, 0.0}, // Below 1σ - {0.5, 0.0}, // Below 1σ - {1.0, 0.0}, // Exactly 1σ boundary - {2.0, 0.333}, // 2σ → (2-1)/3 - {3.0, 0.667}, // 3σ → (3-1)/3 - {4.0, 1.0}, // 4σ → (4-1)/3 = 1.0 - {5.0, 1.0}, // Above 4σ → clamped to 1.0 - {-1.5, 0.167}, // Negative z-score, |z| = 1.5 → (1.5-1)/3 - {-4.0, 1.0}, // Negative, clamped + {0.0, 0.0}, + {0.5, 0.0}, + {1.0, 0.0}, + {2.0, 0.333}, + {3.0, 0.667}, + {4.0, 1.0}, + {5.0, 1.0}, + {-1.5, 0.167}, + {-4.0, 1.0}, } for _, tt := range tests { @@ -210,19 +222,13 @@ func TestNormalizeZScore(t *testing.T) { // --- PatternLearner tests --- func TestPatternLearner_ColdStart(t *testing.T) { - db := openTestDB(t) - pl, err := NewPatternLearner(db) - if err != nil { - t.Fatalf("NewPatternLearner: %v", err) - } + pl := newTestLearner(t) - // Fresh learner should be in cold start if !pl.IsColdStart() { t.Error("expected cold start for new learner") } - // Anomaly score should be suppressed during cold start - result := pl.ComputeAnomalyScore(1, 12, 0, 5) + result := pl.ComputeAnomalyScore("zone-1", 12, 0, 5) if !result.Suppressed { t.Error("expected anomaly score to be suppressed during cold start") } @@ -232,45 +238,32 @@ func TestPatternLearner_ColdStart(t *testing.T) { } func TestPatternLearner_SlotNotReady(t *testing.T) { - db := openTestDB(t) - pl, err := NewPatternLearner(db) - if err != nil { - t.Fatalf("NewPatternLearner: %v", err) - } + pl := newTestLearner(t) - // Even after loading, a slot that doesn't exist should not be ready - if pl.IsSlotReady(1, 12, 0) { + if pl.IsSlotReady("zone-1", 12, 0) { t.Error("expected slot not ready") } - result := pl.ComputeAnomalyScore(1, 12, 0, 5) + result := pl.ComputeAnomalyScore("zone-1", 12, 0, 5) if !result.Suppressed { t.Error("expected anomaly score suppressed when slot not ready") } } func TestPatternLearner_ObserveAndUpdate_Persists(t *testing.T) { - db := openTestDB(t) - pl, err := NewPatternLearner(db) - if err != nil { - t.Fatalf("NewPatternLearner: %v", err) - } + pl := newTestLearner(t) - // Record 50 observations to make the slot ready for i := 0; i < 50; i++ { - err := pl.ObserveAndUpdate(1, 12, 0, 2, 0) - if err != nil { + if err := pl.ObserveAndUpdate("zone-1", 12, 0, 2, 0); err != nil { t.Fatalf("ObserveAndUpdate: %v", err) } } - // Slot should now be ready - if !pl.IsSlotReady(1, 12, 0) { + if !pl.IsSlotReady("zone-1", 12, 0) { t.Error("expected slot to be ready after 50 observations") } - // Verify the pattern was persisted - slot := pl.GetPattern(1, 12, 0) + slot := pl.GetPattern("zone-1", 12, 0) if slot == nil { t.Fatal("expected pattern to exist") } @@ -280,70 +273,53 @@ func TestPatternLearner_ObserveAndUpdate_Persists(t *testing.T) { if slot.SampleCount != 50 { t.Errorf("expected sample_count=50, got %d", slot.SampleCount) } - // Variance should be 0 since all observations are identical if slot.Variance > 1e-9 { t.Errorf("expected variance=0 for identical observations, got %v", slot.Variance) } } func TestPatternLearner_ObserveAndUpdate_WithVariance(t *testing.T) { - db := openTestDB(t) - pl, err := NewPatternLearner(db) - if err != nil { - t.Fatalf("NewPatternLearner: %v", err) - } + pl := newTestLearner(t) - // Record observations: [0, 1, 2, 3, 4] repeated to get 50 samples for i := 0; i < 50; i++ { - err := pl.ObserveAndUpdate(1, 12, 0, i%5, 0) - if err != nil { + if err := pl.ObserveAndUpdate("zone-1", 12, 0, i%5, 0); err != nil { t.Fatalf("ObserveAndUpdate: %v", err) } } - slot := pl.GetPattern(1, 12, 0) + slot := pl.GetPattern("zone-1", 12, 0) if slot == nil { t.Fatal("expected pattern") } - // Mean of [0,1,2,3,4] repeated = 2.0 if math.Abs(slot.MeanCount-2.0) > 1e-9 { t.Errorf("expected mean=2.0, got %v", slot.MeanCount) } - // Variance of [0,1,2,3,4] = 2.0 if math.Abs(slot.Variance-2.0) > 1e-6 { t.Errorf("expected variance=2.0, got %v", slot.Variance) } } func TestPatternLearner_OutlierProtection(t *testing.T) { - db := openTestDB(t) - pl, err := NewPatternLearner(db) - if err != nil { - t.Fatalf("NewPatternLearner: %v", err) - } + pl := newTestLearner(t) - // Establish a pattern with 50 samples of count=0 (zone usually empty) for i := 0; i < 50; i++ { - err := pl.ObserveAndUpdate(1, 12, 0, 0, 0) - if err != nil { + if err := pl.ObserveAndUpdate("zone-1", 12, 0, 0, 0); err != nil { t.Fatalf("ObserveAndUpdate: %v", err) } } - slotBefore := pl.GetPattern(1, 12, 0) + slotBefore := pl.GetPattern("zone-1", 12, 0) meanBefore := slotBefore.MeanCount countBefore := slotBefore.SampleCount - // Try to update with an anomaly score >= 0.5 — should be skipped - err = pl.ObserveAndUpdate(1, 12, 0, 100, 0.6) - if err != nil { + // Outlier should be skipped + if err := pl.ObserveAndUpdate("zone-1", 12, 0, 100, 0.6); err != nil { t.Fatalf("ObserveAndUpdate: %v", err) } - // Verify the model was NOT updated - slotAfter := pl.GetPattern(1, 12, 0) + slotAfter := pl.GetPattern("zone-1", 12, 0) if slotAfter.MeanCount != meanBefore { t.Errorf("outlier protection failed: mean changed from %v to %v", meanBefore, slotAfter.MeanCount) } @@ -353,28 +329,21 @@ func TestPatternLearner_OutlierProtection(t *testing.T) { } func TestPatternLearner_OutlierProtection_AfterMultipleAnomalies(t *testing.T) { - // Bead spec: injecting synthetic anomaly does not corrupt model after 3 occurrences - db := openTestDB(t) - pl, err := NewPatternLearner(db) - if err != nil { - t.Fatalf("NewPatternLearner: %v", err) - } + pl := newTestLearner(t) - // Establish baseline: 50 samples of count=1 for i := 0; i < 50; i++ { - pl.ObserveAndUpdate(1, 12, 0, 1, 0) + pl.ObserveAndUpdate("zone-1", 12, 0, 1, 0) } - slot := pl.GetPattern(1, 12, 0) + slot := pl.GetPattern("zone-1", 12, 0) meanBefore := slot.MeanCount - // Inject 3 synthetic anomalies (count=50 with high anomaly score) + // Inject 3 synthetic anomalies for i := 0; i < 3; i++ { - pl.ObserveAndUpdate(1, 12, 0, 50, 1.0) + pl.ObserveAndUpdate("zone-1", 12, 0, 50, 1.0) } - // Model should be unchanged - slot = pl.GetPattern(1, 12, 0) + slot = pl.GetPattern("zone-1", 12, 0) if slot.SampleCount != 50 { t.Errorf("expected sample_count to remain 50, got %d", slot.SampleCount) } @@ -384,16 +353,11 @@ func TestPatternLearner_OutlierProtection_AfterMultipleAnomalies(t *testing.T) { } func TestPatternLearner_SecurityModeOverride(t *testing.T) { - db := openTestDB(t) - pl, err := NewPatternLearner(db) - if err != nil { - t.Fatalf("NewPatternLearner: %v", err) - } + pl := newTestLearner(t) pl.SetSecurityMode(true) - // Any detection in security mode should score 1.0 - result := pl.ComputeAnomalyScore(1, 12, 0, 0) + result := pl.ComputeAnomalyScore("zone-1", 12, 0, 0) if result.CompositeScore != 1.0 { t.Errorf("security mode: expected composite=1.0, got %v", result.CompositeScore) } @@ -401,8 +365,7 @@ func TestPatternLearner_SecurityModeOverride(t *testing.T) { t.Error("security mode: expected is_alert=true") } - // Even with 0 observed count - result = pl.ComputeAnomalyScore(1, 12, 0, 0) + result = pl.ComputeAnomalyScore("zone-1", 12, 0, 0) if result.CompositeScore != 1.0 { t.Errorf("security mode with 0 count: expected composite=1.0, got %v", result.CompositeScore) } @@ -411,23 +374,14 @@ func TestPatternLearner_SecurityModeOverride(t *testing.T) { } func TestPatternLearner_AnomalyScoring(t *testing.T) { - db := openTestDB(t) - pl, err := NewPatternLearner(db) - if err != nil { - t.Fatalf("NewPatternLearner: %v", err) - } - - // Move past cold start so scoring is active + pl := newTestLearner(t) pl.SetLearningStartTime(time.Now().Add(-8 * 24 * time.Hour)) - // Build a pattern: zone usually has 0 occupants at this time - // 50 samples of count=0 for i := 0; i < 50; i++ { - pl.ObserveAndUpdate(1, 3, 0, 0, 0) // 3 AM, Sunday + pl.ObserveAndUpdate("zone-1", 3, 0, 0, 0) } - // Check: observing 0 people should score low - result := pl.ComputeAnomalyScore(1, 3, 0, 0) + result := pl.ComputeAnomalyScore("zone-1", 3, 0, 0) if result.CompositeScore > 0.01 { t.Errorf("expected low score for expected observation, got %v", result.CompositeScore) } @@ -435,9 +389,7 @@ func TestPatternLearner_AnomalyScoring(t *testing.T) { t.Error("expected not suppressed when slot is ready") } - // Check: observing people at 3 AM when zone is normally empty - // This should trigger zone_score = 1.0 (zone normally empty, now occupied) - result = pl.ComputeAnomalyScore(1, 3, 0, 3) + result = pl.ComputeAnomalyScore("zone-1", 3, 0, 3) if result.ZoneScore != 1.0 { t.Errorf("expected zone_score=1.0 when zone normally empty, got %v", result.ZoneScore) } @@ -450,98 +402,82 @@ func TestPatternLearner_AnomalyScoring(t *testing.T) { } func TestPatternLearner_AnomalyScoring_ZScoreBased(t *testing.T) { - db := openTestDB(t) - pl, err := NewPatternLearner(db) - if err != nil { - t.Fatalf("NewPatternLearner: %v", err) - } - - // Move past cold start so scoring is active + pl := newTestLearner(t) pl.SetLearningStartTime(time.Now().Add(-8 * 24 * time.Hour)) - // Build a pattern: zone has mean=2, need variance - // Feed [1, 3] alternating to get mean=2, variance=1 for i := 0; i < 50; i++ { - pl.ObserveAndUpdate(1, 14, 0, 1+i%2, 0) // 2 PM, alternating 1 and 2 + pl.ObserveAndUpdate("zone-1", 14, 0, 1+i%2, 0) } - slot := pl.GetPattern(1, 14, 0) + slot := pl.GetPattern("zone-1", 14, 0) if slot == nil { t.Fatal("expected pattern") } - // Observation at the mean should score low - result := pl.ComputeAnomalyScore(1, 14, 0, 2) + result := pl.ComputeAnomalyScore("zone-1", 14, 0, 2) if result.TimeScore > 0.01 { t.Errorf("expected low time_score for mean observation, got %v", result.TimeScore) } - // Observation far from mean should score higher - // z = (10 - 2) / sqrt(variance + epsilon) — should be well above 4σ - result = pl.ComputeAnomalyScore(1, 14, 0, 10) + result = pl.ComputeAnomalyScore("zone-1", 14, 0, 10) if result.TimeScore < 0.9 { t.Errorf("expected high time_score for extreme observation, got %v", result.TimeScore) } } func TestPatternLearner_GetPatterns(t *testing.T) { - db := openTestDB(t) - pl, err := NewPatternLearner(db) - if err != nil { - t.Fatalf("NewPatternLearner: %v", err) - } + pl := newTestLearner(t) - // Add patterns for two zones for i := 0; i < 50; i++ { - pl.ObserveAndUpdate(1, 12, 0, 2, 0) - pl.ObserveAndUpdate(2, 12, 0, 3, 0) + pl.ObserveAndUpdate("zone-1", 12, 0, 2, 0) + pl.ObserveAndUpdate("zone-2", 12, 0, 3, 0) } - // Get all patterns - all := pl.GetPatterns(0) + all := pl.GetPatterns("") if len(all) != 2 { t.Errorf("expected 2 patterns, got %d", len(all)) } - // Filter by zone - zone1 := pl.GetPatterns(1) + zone1 := pl.GetPatterns("zone-1") if len(zone1) != 1 { - t.Errorf("expected 1 pattern for zone 1, got %d", len(zone1)) + t.Errorf("expected 1 pattern for zone-1, got %d", len(zone1)) } - if zone1[0].ZoneID != 1 { - t.Errorf("expected zone_id=1, got %d", zone1[0].ZoneID) + if zone1[0].ZoneID != "zone-1" { + t.Errorf("expected zone_id=zone-1, got %s", zone1[0].ZoneID) } } func TestPatternLearner_SurvivesRestart(t *testing.T) { - db := openTestDB(t) + tmpDir, err := os.MkdirTemp("", "pattern_restart_test") + if err != nil { + t.Fatalf("create temp dir: %v", err) + } + t.Cleanup(func() { os.RemoveAll(tmpDir) }) - // Create learner and add data - pl1, err := NewPatternLearner(db) + dbPath := filepath.Join(tmpDir, "patterns.db") + + pl1, err := NewPatternLearner(dbPath) if err != nil { t.Fatalf("NewPatternLearner: %v", err) } for i := 0; i < 50; i++ { - pl1.ObserveAndUpdate(1, 12, 0, 2, 0) + pl1.ObserveAndUpdate("zone-1", 12, 0, 2, 0) } - // Close and recreate (simulates server restart) - pl1.mu.Lock() // Clear in-memory cache to simulate fresh start - pl1.patterns = make(map[patternKey]*PatternSlot) - pl1.mu.Unlock() + pl1.Close() - pl2, err := NewPatternLearner(db) + pl2, err := NewPatternLearner(dbPath) if err != nil { t.Fatalf("NewPatternLearner after restart: %v", err) } + defer pl2.Close() - // Pattern should be loaded from DB - if !pl2.IsSlotReady(1, 12, 0) { + if !pl2.IsSlotReady("zone-1", 12, 0) { t.Error("expected slot to be ready after reload from DB") } - slot := pl2.GetPattern(1, 12, 0) + slot := pl2.GetPattern("zone-1", 12, 0) if slot == nil { t.Fatal("expected pattern after restart") } @@ -572,17 +508,13 @@ func TestPatternLearner_AlertThresholds(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - db := openTestDB(t) - pl, err := NewPatternLearner(db) - if err != nil { - t.Fatalf("NewPatternLearner: %v", err) - } + pl := newTestLearner(t) for _, obs := range tt.observations { - pl.ObserveAndUpdate(1, 14, 0, obs, 0) + pl.ObserveAndUpdate("zone-1", 14, 0, obs, 0) } - result := pl.ComputeAnomalyScore(1, 14, 0, tt.testCount) + result := pl.ComputeAnomalyScore("zone-1", 14, 0, tt.testCount) if result.IsAlert != tt.wantAlert { t.Errorf("is_alert = %v, want %v (composite=%v)", result.IsAlert, tt.wantAlert, result.CompositeScore) } @@ -594,21 +526,16 @@ func TestPatternLearner_AlertThresholds(t *testing.T) { } func TestPatternLearner_NaNInf_NeverProduced(t *testing.T) { - db := openTestDB(t) - pl, err := NewPatternLearner(db) - if err != nil { - t.Fatalf("NewPatternLearner: %v", err) - } + pl := newTestLearner(t) - // Feed extreme and varied observations observations := []int{0, 100, 0, 100, 0, 100, 1, 99, 50, 0} - for i := 0; i < 5; i++ { // Repeat to get 50+ samples + for i := 0; i < 5; i++ { for _, obs := range observations { - pl.ObserveAndUpdate(1, 14, 0, obs, 0) + pl.ObserveAndUpdate("zone-1", 14, 0, obs, 0) } } - slot := pl.GetPattern(1, 14, 0) + slot := pl.GetPattern("zone-1", 14, 0) if slot == nil { t.Fatal("expected pattern") } @@ -620,9 +547,8 @@ func TestPatternLearner_NaNInf_NeverProduced(t *testing.T) { t.Error("variance is NaN or Inf") } - // Score should also never be NaN/Inf for _, obs := range []int{0, 1, 5, 50, 100, 200} { - result := pl.ComputeAnomalyScore(1, 14, 0, obs) + result := pl.ComputeAnomalyScore("zone-1", 14, 0, obs) if math.IsNaN(result.CompositeScore) || math.IsInf(result.CompositeScore, 0) { t.Errorf("NaN/Inf composite for obs=%d: %v", obs, result.CompositeScore) } @@ -633,24 +559,17 @@ func TestPatternLearner_NaNInf_NeverProduced(t *testing.T) { } func TestPatternLearner_NoAlertsDuringColdStart(t *testing.T) { - db := openTestDB(t) - pl, err := NewPatternLearner(db) - if err != nil { - t.Fatalf("NewPatternLearner: %v", err) - } + pl := newTestLearner(t) - // Even with extreme observations during cold start, no alerts for i := 0; i < 100; i++ { - pl.ObserveAndUpdate(1, 3, 0, 50, 0) + pl.ObserveAndUpdate("zone-1", 3, 0, 50, 0) } - // Still in cold start (7 days haven't passed) if !pl.IsColdStart() { t.Log("note: cold start check depends on timing") } - // ComputeAnomalyScore should suppress - result := pl.ComputeAnomalyScore(1, 3, 0, 50) + result := pl.ComputeAnomalyScore("zone-1", 3, 0, 50) if !result.Suppressed { t.Error("expected anomaly score to be suppressed during cold start regardless of activity") } @@ -662,70 +581,56 @@ func TestPatternLearner_NoAlertsDuringColdStart(t *testing.T) { // --- Integration test: hourly update with mock provider --- type mockOccupancyProvider struct { - counts map[int]int + counts map[string]int } -func (m *mockOccupancyProvider) GetZoneOccupancyCounts() map[int]int { +func (m *mockOccupancyProvider) GetZoneOccupancyCounts() map[string]int { return m.counts } func TestPatternLearner_HourlyUpdate_Integration(t *testing.T) { - db := openTestDB(t) - pl, err := NewPatternLearner(db) - if err != nil { - t.Fatalf("NewPatternLearner: %v", err) - } + pl := newTestLearner(t) provider := &mockOccupancyProvider{ - counts: map[int]int{1: 2, 2: 0}, + counts: map[string]int{"zone-1": 2, "zone-2": 0}, } - // Manually trigger the update (instead of waiting for the hourly timer) pl.updateAllZones(provider) - // Check patterns were created - slot1 := pl.GetPattern(1, time.Now().Hour(), int(time.Now().Weekday())) + slot1 := pl.GetPattern("zone-1", time.Now().Hour(), int(time.Now().Weekday())) if slot1 == nil { - t.Fatal("expected pattern for zone 1 after hourly update") + t.Fatal("expected pattern for zone-1 after hourly update") } if slot1.MeanCount != 2.0 { - t.Errorf("expected mean=2.0 for zone 1, got %v", slot1.MeanCount) + t.Errorf("expected mean=2.0 for zone-1, got %v", slot1.MeanCount) } - slot2 := pl.GetPattern(2, time.Now().Hour(), int(time.Now().Weekday())) + slot2 := pl.GetPattern("zone-2", time.Now().Hour(), int(time.Now().Weekday())) if slot2 == nil { - t.Fatal("expected pattern for zone 2 after hourly update") + t.Fatal("expected pattern for zone-2 after hourly update") } if slot2.MeanCount != 0.0 { - t.Errorf("expected mean=0.0 for zone 2, got %v", slot2.MeanCount) + t.Errorf("expected mean=0.0 for zone-2, got %v", slot2.MeanCount) } } func TestPatternLearner_HourlyUpdate_OutlierProtectionInUpdate(t *testing.T) { - db := openTestDB(t) - pl, err := NewPatternLearner(db) - if err != nil { - t.Fatalf("NewPatternLearner: %v", err) - } + pl := newTestLearner(t) - // First establish a baseline for i := 0; i < 50; i++ { - pl.ObserveAndUpdate(1, 12, 0, 1, 0) + pl.ObserveAndUpdate("zone-1", 12, 0, 1, 0) } - slotBefore := pl.GetPattern(1, 12, 0) + slotBefore := pl.GetPattern("zone-1", 12, 0) - // Now simulate an hourly update with anomalous count provider := &mockOccupancyProvider{ - counts: map[int]int{1: 50}, // Way above normal + counts: map[string]int{"zone-1": 50}, } pl.updateAllZones(provider) - slotAfter := pl.GetPattern(1, 12, 0) + slotAfter := pl.GetPattern("zone-1", 12, 0) - // The outlier should be skipped — model should be unchanged - // (score for count=50 when mean=1 will be very high, triggering outlier protection) if slotAfter.SampleCount != slotBefore.SampleCount { t.Logf("note: sample count changed from %d to %d (outlier protection may not trigger if score < 0.5)", slotBefore.SampleCount, slotAfter.SampleCount) }