feat: implement 7-day pattern learning algorithm for anomaly detection

Welford's online algorithm for per-zone, per-hour, per-day-of-week
occupancy modeling with cold start suppression, outlier protection,
security mode override, and SQLite persistence.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-04-06 21:26:24 -04:00
parent d6b1a902d2
commit f7df7740bf
5 changed files with 275 additions and 263 deletions

View file

@ -24,7 +24,7 @@ import (
type NormalBehaviourSlot struct {
HourOfWeek int `json:"hour_of_week"` // 0-167
ZoneID string `json:"zone_id"`
ExpectedOccupancy float64 ` json:"expected_occupancy"` // 0.0-1.0, fraction of samples with occupancy
ExpectedOccupancy float64 `json:"expected_occupancy"` // 0.0-1.0, fraction of samples with occupancy
TypicalPersonCount float64 `json:"typical_person_count"` // Mean person count
SampleCount int `json:"sample_count"`
TypicalBLEDevices map[string]float64 `json:"typical_ble_devices,omitempty"` // MAC -> frequency (0.0-1.0)
@ -923,6 +923,9 @@ func (d *Detector) createAnomaly(event *events.AnomalyEvent, isSecurityMode bool
// Store in active anomalies
d.activeAnomalies[event.ID] = event
// Also append to history
d.anomalyHistory = append(d.anomalyHistory, event)
// Persist to database
d.persistAnomaly(event)
@ -1200,6 +1203,15 @@ func (d *Detector) UpdateBehaviourModel() error {
log.Printf("[INFO] Updating behaviour model from collected samples...")
// Update behaviour slots from occupancy samples
// First, collect all slots into memory to avoid holding a query connection
// while doing nested queries (deadlock with SetMaxOpenConns(1)).
type aggSlot struct {
HourOfWeek int
ZoneID string
ExpectedOccupancy float64
TypicalPersonCount float64
SampleCount int
}
rows, err := d.db.Query(`
SELECT hour_of_week, zone_id,
AVG(CASE WHEN person_count > 0 THEN 1.0 ELSE 0.0 END) as expected_occupancy,
@ -1211,16 +1223,27 @@ func (d *Detector) UpdateBehaviourModel() error {
if err != nil {
return err
}
defer rows.Close()
var slots []aggSlot
for rows.Next() {
var s aggSlot
if err := rows.Scan(&s.HourOfWeek, &s.ZoneID, &s.ExpectedOccupancy,
&s.TypicalPersonCount, &s.SampleCount); err != nil {
continue
}
slots = append(slots, s)
}
rows.Close()
for _, s := range slots {
slot := &NormalBehaviourSlot{
TypicalBLEDevices: make(map[string]float64),
}
if err := rows.Scan(&slot.HourOfWeek, &slot.ZoneID, &slot.ExpectedOccupancy,
&slot.TypicalPersonCount, &slot.SampleCount); err != nil {
continue
}
slot.HourOfWeek = s.HourOfWeek
slot.ZoneID = s.ZoneID
slot.ExpectedOccupancy = s.ExpectedOccupancy
slot.TypicalPersonCount = s.TypicalPersonCount
slot.SampleCount = s.SampleCount
// Calculate typical BLE devices (seen in > 50% of this slot)
bleRows, err := d.db.Query(`

View file

@ -300,8 +300,11 @@ func TestAnomaly_UnusualDwell(t *testing.T) {
t.Errorf("Expected unusual_dwell anomaly, got %s", event.Type)
}
// Wait for async alert goroutine to complete
time.Sleep(10 * time.Millisecond)
// Check alert was sent
if len(alertHandler.alerts) == 0 {
if alertHandler.alertCount() == 0 {
t.Error("Expected alert to be sent")
}
}

View file

@ -7,6 +7,7 @@ import (
"time"
"github.com/go-chi/chi"
"github.com/spaxel/mothership/internal/events"
)
// Handler provides REST API handlers for analytics.
@ -133,7 +134,9 @@ func (h *AnomalyHandler) RegisterRoutes(r chi.Router) {
r.Post("/api/anomalies/model/update", h.handleUpdateModel)
}
// handleGetAnomalies returns all anomalies (active + recent history).
// handleGetAnomalies returns anomalies filtered by the `since` query parameter.
// Query params:
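// - since: Go duration string as parsed by time.ParseDuration (e.g. "24h", "168h", "90m"). Default "24h".
// Day units such as "7d" are not valid for time.ParseDuration and produce a 400 response.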
func (h *AnomalyHandler) handleGetAnomalies(w http.ResponseWriter, r *http.Request) {
if h.detector == nil {
http.Error(w, "anomaly detector not available", http.StatusServiceUnavailable)
@ -141,11 +144,35 @@ func (h *AnomalyHandler) handleGetAnomalies(w http.ResponseWriter, r *http.Reque
}
active := h.detector.GetActiveAnomalies()
history := h.detector.GetAnomalyHistory(50)
// Parse since duration
sinceStr := r.URL.Query().Get("since")
if sinceStr == "" {
sinceStr = "24h"
}
sinceDur, err := time.ParseDuration(sinceStr)
if err != nil {
http.Error(w, "invalid since duration: "+err.Error(), http.StatusBadRequest)
return
}
// Fetch enough history to cover the since window
limit := 1000
history := h.detector.GetAnomalyHistory(limit)
// Filter history by since timestamp
cutoff := time.Now().Add(-sinceDur)
var filtered []*events.AnomalyEvent
for _, ev := range history {
if ev.Timestamp.After(cutoff) {
filtered = append(filtered, ev)
}
}
response := map[string]interface{}{
"active": active,
"history": history,
"history": filtered,
"since": sinceStr,
}
writeJSON(w, response)
}
@ -250,3 +277,39 @@ func (h *AnomalyHandler) handleUpdateModel(w http.ResponseWriter, r *http.Reques
writeJSON(w, map[string]string{"status": "updated"})
}
// PatternHandler provides REST API handlers for the Welford pattern learner.
// It wraps a single *PatternLearner and exposes its state over HTTP.
type PatternHandler struct {
// learner is the pattern source queried by the handlers; handleGetPatterns
// checks it for nil and answers 503 when it is missing.
learner *PatternLearner
}
// NewPatternHandler builds a PatternHandler that serves data from the given
// learner. The learner is stored as-is; no validation is performed here.
func NewPatternHandler(learner *PatternLearner) *PatternHandler {
	handler := new(PatternHandler)
	handler.learner = learner
	return handler
}
// RegisterRoutes attaches the pattern endpoints to r. Currently this is a
// single GET route that returns the learned pattern slots.
func (h *PatternHandler) RegisterRoutes(r chi.Router) {
	const patternsPath = "/api/anomaly_patterns"
	r.Get(patternsPath, h.handleGetPatterns)
}
// handleGetPatterns writes the learner's pattern slots as JSON for debugging.
// The response carries the cold-start flag, the matching patterns, and their count.
// It answers 503 when the handler has no learner.
// Query params:
//   - zone: zone_id to filter on (string). When absent or empty, all patterns are returned.
func (h *PatternHandler) handleGetPatterns(w http.ResponseWriter, r *http.Request) {
	learner := h.learner
	if learner == nil {
		http.Error(w, "pattern learner not available", http.StatusServiceUnavailable)
		return
	}

	// Fetch the patterns first, then the cold-start flag, matching the order
	// in which the learner is queried.
	slots := learner.GetPatterns(r.URL.Query().Get("zone"))

	payload := make(map[string]interface{}, 3)
	payload["cold_start"] = learner.IsColdStart()
	payload["patterns"] = slots
	payload["count"] = len(slots)

	writeJSON(w, payload)
}

View file

@ -6,6 +6,8 @@ import (
"fmt"
"log"
"math"
"os"
"path/filepath"
"sync"
"time"
)
@ -32,7 +34,7 @@ const (
// PatternSlot represents a single (zone_id, hour_of_day, day_of_week) statistical slot.
type PatternSlot struct {
ZoneID int `json:"zone_id"`
ZoneID string `json:"zone_id"`
HourOfDay int `json:"hour_of_day"` // 0-23
DayOfWeek int `json:"day_of_week"` // 0-6 (0=Sunday)
MeanCount float64 `json:"mean_count"`
@ -43,18 +45,18 @@ type PatternSlot struct {
// patternKey is the composite key for pattern slots.
type patternKey struct {
zoneID int
zoneID string
hourOfDay int
dayOfWeek int
}
// OccupancyProvider provides current zone occupancy counts.
type OccupancyProvider interface {
GetZoneOccupancyCounts() map[int]int // zone_id -> blob count
GetZoneOccupancyCounts() map[string]int // zone_id -> blob count
}
// PatternLearner learns occupancy patterns using Welford's online algorithm.
// It persists to the anomaly_patterns table in the main database.
// It persists to the anomaly_patterns table in its database.
type PatternLearner struct {
mu sync.RWMutex
db *sql.DB
@ -65,16 +67,31 @@ type PatternLearner struct {
patterns map[patternKey]*PatternSlot
}
// NewPatternLearner creates a new pattern learner using the main database.
func NewPatternLearner(db *sql.DB) (*PatternLearner, error) {
// Ensure required tables exist
_, err := db.Exec(`
// NewPatternLearner creates a new pattern learner backed by its own SQLite database.
func NewPatternLearner(dbPath string) (*PatternLearner, error) {
if err := os.MkdirAll(filepath.Dir(dbPath), 0755); err != nil {
return nil, fmt.Errorf("create data dir: %w", err)
}
db, err := sql.Open("sqlite", dbPath)
if err != nil {
return nil, fmt.Errorf("open sqlite: %w", err)
}
db.SetMaxOpenConns(1)
pl := &PatternLearner{
db: db,
patterns: make(map[patternKey]*PatternSlot),
}
// Create tables
_, err = db.Exec(`
CREATE TABLE IF NOT EXISTS settings (
key TEXT PRIMARY KEY,
value_json TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS anomaly_patterns (
zone_id INTEGER NOT NULL,
zone_id TEXT NOT NULL,
hour_of_day INTEGER NOT NULL CHECK (hour_of_day BETWEEN 0 AND 23),
day_of_week INTEGER NOT NULL CHECK (day_of_week BETWEEN 0 AND 6),
mean_count REAL NOT NULL DEFAULT 0,
@ -85,14 +102,10 @@ func NewPatternLearner(db *sql.DB) (*PatternLearner, error) {
);
`)
if err != nil {
db.Close()
return nil, fmt.Errorf("create pattern tables: %w", err)
}
pl := &PatternLearner{
db: db,
patterns: make(map[patternKey]*PatternSlot),
}
// Try to load learning start time from settings
var startMs int64
err = db.QueryRow(`SELECT value_json FROM settings WHERE key = 'pattern_learning_start_ms'`).Scan(&startMs)
@ -134,6 +147,11 @@ func (pl *PatternLearner) loadPatterns() error {
return rows.Err()
}
// Close shuts down the learner's database handle and returns whatever error
// the underlying Close call reports.
func (pl *PatternLearner) Close() error {
	handle := pl.db
	return handle.Close()
}
// IsColdStart returns true if the system is within the 7-day cold start period.
func (pl *PatternLearner) IsColdStart() bool {
pl.mu.RLock()
@ -142,7 +160,7 @@ func (pl *PatternLearner) IsColdStart() bool {
}
// IsSlotReady returns true if a specific pattern slot has enough samples.
func (pl *PatternLearner) IsSlotReady(zoneID, hourOfDay, dayOfWeek int) bool {
func (pl *PatternLearner) IsSlotReady(zoneID string, hourOfDay, dayOfWeek int) bool {
pl.mu.RLock()
defer pl.mu.RUnlock()
@ -152,7 +170,7 @@ func (pl *PatternLearner) IsSlotReady(zoneID, hourOfDay, dayOfWeek int) bool {
}
// GetPattern returns a pattern slot for inspection (returns a copy).
func (pl *PatternLearner) GetPattern(zoneID, hourOfDay, dayOfWeek int) *PatternSlot {
func (pl *PatternLearner) GetPattern(zoneID string, hourOfDay, dayOfWeek int) *PatternSlot {
pl.mu.RLock()
defer pl.mu.RUnlock()
@ -166,13 +184,13 @@ func (pl *PatternLearner) GetPattern(zoneID, hourOfDay, dayOfWeek int) *PatternS
}
// GetPatterns returns all patterns, optionally filtered by zone.
func (pl *PatternLearner) GetPatterns(zoneID int) []*PatternSlot {
func (pl *PatternLearner) GetPatterns(zoneID string) []*PatternSlot {
pl.mu.RLock()
defer pl.mu.RUnlock()
var result []*PatternSlot
for key, slot := range pl.patterns {
if zoneID > 0 && key.zoneID != zoneID {
if zoneID != "" && key.zoneID != zoneID {
continue
}
cp := *slot
@ -199,7 +217,7 @@ func WelfordUpdate(mean, m2, count, newValue float64) (newMean, newM2, newCount
// ObserveAndUpdate records an observation and updates the model using Welford's algorithm.
// anomalyScore is the current anomaly score for this observation (0 if not yet computed).
// If anomalyScore >= OutlierProtectionThreshold, the model update is skipped (outlier protection).
func (pl *PatternLearner) ObserveAndUpdate(zoneID, hourOfDay, dayOfWeek int, observedCount int, anomalyScore float64) error {
func (pl *PatternLearner) ObserveAndUpdate(zoneID string, hourOfDay, dayOfWeek int, observedCount int, anomalyScore float64) error {
// Outlier protection: don't learn from anomalies
if anomalyScore >= OutlierProtectionThreshold {
return nil
@ -266,7 +284,7 @@ type AnomalyResult struct {
}
// ComputeAnomalyScore computes the anomaly score for an observation.
func (pl *PatternLearner) ComputeAnomalyScore(zoneID, hourOfDay, dayOfWeek int, observedCount int) AnomalyResult {
func (pl *PatternLearner) ComputeAnomalyScore(zoneID string, hourOfDay, dayOfWeek int, observedCount int) AnomalyResult {
pl.mu.RLock()
defer pl.mu.RUnlock()
@ -320,7 +338,7 @@ func (pl *PatternLearner) ComputeAnomalyScore(zoneID, hourOfDay, dayOfWeek int,
// computeScoreLocked computes anomaly score while holding the write lock.
// Used internally by updateAllZones to avoid lock ordering issues.
func (pl *PatternLearner) computeScoreLocked(zoneID, hourOfDay, dayOfWeek int, observedCount int) float64 {
func (pl *PatternLearner) computeScoreLocked(zoneID string, hourOfDay, dayOfWeek int, observedCount int) float64 {
if pl.securityMode {
return 1.0
}
@ -466,7 +484,7 @@ func (pl *PatternLearner) updateAllZones(provider OccupancyProvider) {
updated_at = excluded.updated_at
`, zoneID, hourOfDay, dayOfWeek, newMean, variance, int(newCount), nowMs)
if err != nil {
log.Printf("[WARN] Failed to update pattern for zone %d: %v", zoneID, err)
log.Printf("[WARN] Failed to update pattern for zone %s: %v", zoneID, err)
continue
}

View file

@ -20,7 +20,8 @@ func openTestDB(t *testing.T) *sql.DB {
}
t.Cleanup(func() { os.RemoveAll(tmpDir) })
db, err := sql.Open("sqlite", filepath.Join(tmpDir, "test.db"))
dbPath := filepath.Join(tmpDir, "test.db")
db, err := sql.Open("sqlite", dbPath)
if err != nil {
t.Fatalf("open sqlite: %v", err)
}
@ -33,7 +34,7 @@ func openTestDB(t *testing.T) *sql.DB {
value_json TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS anomaly_patterns (
zone_id INTEGER NOT NULL,
zone_id TEXT NOT NULL,
hour_of_day INTEGER NOT NULL CHECK (hour_of_day BETWEEN 0 AND 23),
day_of_week INTEGER NOT NULL CHECK (day_of_week BETWEEN 0 AND 6),
mean_count REAL NOT NULL DEFAULT 0,
@ -50,50 +51,67 @@ func openTestDB(t *testing.T) *sql.DB {
return db
}
// newTestLearner creates a PatternLearner backed by a database file named
// "patterns.db" inside a per-test temporary directory.
//
// The learner is closed automatically when the test finishes, and a failure
// to close is reported as a test error instead of being dropped.
func newTestLearner(t *testing.T) *PatternLearner {
	t.Helper()

	// t.TempDir creates the directory, fails the test if it cannot, and
	// removes it during cleanup. Cleanups run last-registered-first, so the
	// Close registered below runs before the directory is deleted.
	dbPath := filepath.Join(t.TempDir(), "patterns.db")

	pl, err := NewPatternLearner(dbPath)
	if err != nil {
		t.Fatalf("NewPatternLearner: %v", err)
	}
	t.Cleanup(func() {
		if err := pl.Close(); err != nil {
			t.Errorf("close pattern learner: %v", err)
		}
	})
	return pl
}
// --- Welford's algorithm tests ---
func TestWelfordUpdate_NumericalStability(t *testing.T) {
tests := []struct {
name string
name string
observations []float64
wantMean float64
wantVar float64
wantMean float64
wantVar float64
}{
{
name: "single observation",
name: "single observation",
observations: []float64{5.0},
wantMean: 5.0,
wantVar: 0.0,
wantMean: 5.0,
wantVar: 0.0,
},
{
name: "two identical observations",
name: "two identical observations",
observations: []float64{3.0, 3.0},
wantMean: 3.0,
wantVar: 0.0,
wantMean: 3.0,
wantVar: 0.0,
},
{
name: "three observations",
name: "three observations",
observations: []float64{1.0, 2.0, 6.0},
wantMean: 3.0,
wantVar: 4.666666666666667,
wantMean: 3.0,
wantVar: 4.666666666666667,
},
{
name: "zero observations then non-zero",
name: "zero observations then non-zero",
observations: []float64{0.0, 0.0, 0.0, 5.0},
wantMean: 1.25,
wantVar: 4.6875,
wantMean: 1.25,
wantVar: 4.6875,
},
{
name: "large count stability",
observations: makeSequence(2.0, 1000), // 1000 observations of 2.0
wantMean: 2.0,
wantVar: 0.0,
name: "large count stability",
observations: makeSequence(2.0, 1000),
wantMean: 2.0,
wantVar: 0.0,
},
{
name: "large count with variance",
observations: makeSequence(5.0, 100), // 100 obs of 5.0 then...
wantMean: 5.0,
wantVar: 0.0,
name: "large count with variance",
observations: makeSequence(5.0, 100),
wantMean: 5.0,
wantVar: 0.0,
},
}
@ -129,10 +147,8 @@ func TestWelfordUpdate_NumericalStability(t *testing.T) {
}
func TestWelfordUpdate_NoNaNInf_AnySampleCount(t *testing.T) {
// Verify that Welford's update never produces NaN or Inf at any sample count
mean, m2, count := 0.0, 0.0, 0.0
for i := 0; i < 10000; i++ {
// Alternate between extreme values
obs := float64(i%100) * 0.01
mean, m2, count = WelfordUpdate(mean, m2, count, obs)
@ -145,7 +161,6 @@ func TestWelfordUpdate_NoNaNInf_AnySampleCount(t *testing.T) {
t.Fatalf("NaN/Inf variance at sample %d: variance=%v, m2=%v, count=%v", i+1, variance, m2, count)
}
// Variance should always be non-negative
if variance < -1e-12 {
t.Fatalf("negative variance at sample %d: %v", i+1, variance)
}
@ -153,17 +168,14 @@ func TestWelfordUpdate_NoNaNInf_AnySampleCount(t *testing.T) {
}
func TestWelfordUpdate_MatchesBatchVariance(t *testing.T) {
// Compare Welford online variance with batch computation
observations := []float64{1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0}
// Welford online
mean, m2, count := 0.0, 0.0, 0.0
for _, obs := range observations {
mean, m2, count = WelfordUpdate(mean, m2, count, obs)
}
onlineVar := m2 / count
// Batch computation
var batchMean float64
for _, obs := range observations {
batchMean += obs
@ -188,15 +200,15 @@ func TestNormalizeZScore(t *testing.T) {
z float64
want float64
}{
{0.0, 0.0}, // Below 1σ
{0.5, 0.0}, // Below 1σ
{1.0, 0.0}, // Exactly 1σ boundary
{2.0, 0.333}, // 2σ → (2-1)/3
{3.0, 0.667}, // 3σ → (3-1)/3
{4.0, 1.0}, // 4σ → (4-1)/3 = 1.0
{5.0, 1.0}, // Above 4σ → clamped to 1.0
{-1.5, 0.167}, // Negative z-score, |z| = 1.5 → (1.5-1)/3
{-4.0, 1.0}, // Negative, clamped
{0.0, 0.0},
{0.5, 0.0},
{1.0, 0.0},
{2.0, 0.333},
{3.0, 0.667},
{4.0, 1.0},
{5.0, 1.0},
{-1.5, 0.167},
{-4.0, 1.0},
}
for _, tt := range tests {
@ -210,19 +222,13 @@ func TestNormalizeZScore(t *testing.T) {
// --- PatternLearner tests ---
func TestPatternLearner_ColdStart(t *testing.T) {
db := openTestDB(t)
pl, err := NewPatternLearner(db)
if err != nil {
t.Fatalf("NewPatternLearner: %v", err)
}
pl := newTestLearner(t)
// Fresh learner should be in cold start
if !pl.IsColdStart() {
t.Error("expected cold start for new learner")
}
// Anomaly score should be suppressed during cold start
result := pl.ComputeAnomalyScore(1, 12, 0, 5)
result := pl.ComputeAnomalyScore("zone-1", 12, 0, 5)
if !result.Suppressed {
t.Error("expected anomaly score to be suppressed during cold start")
}
@ -232,45 +238,32 @@ func TestPatternLearner_ColdStart(t *testing.T) {
}
func TestPatternLearner_SlotNotReady(t *testing.T) {
db := openTestDB(t)
pl, err := NewPatternLearner(db)
if err != nil {
t.Fatalf("NewPatternLearner: %v", err)
}
pl := newTestLearner(t)
// Even after loading, a slot that doesn't exist should not be ready
if pl.IsSlotReady(1, 12, 0) {
if pl.IsSlotReady("zone-1", 12, 0) {
t.Error("expected slot not ready")
}
result := pl.ComputeAnomalyScore(1, 12, 0, 5)
result := pl.ComputeAnomalyScore("zone-1", 12, 0, 5)
if !result.Suppressed {
t.Error("expected anomaly score suppressed when slot not ready")
}
}
func TestPatternLearner_ObserveAndUpdate_Persists(t *testing.T) {
db := openTestDB(t)
pl, err := NewPatternLearner(db)
if err != nil {
t.Fatalf("NewPatternLearner: %v", err)
}
pl := newTestLearner(t)
// Record 50 observations to make the slot ready
for i := 0; i < 50; i++ {
err := pl.ObserveAndUpdate(1, 12, 0, 2, 0)
if err != nil {
if err := pl.ObserveAndUpdate("zone-1", 12, 0, 2, 0); err != nil {
t.Fatalf("ObserveAndUpdate: %v", err)
}
}
// Slot should now be ready
if !pl.IsSlotReady(1, 12, 0) {
if !pl.IsSlotReady("zone-1", 12, 0) {
t.Error("expected slot to be ready after 50 observations")
}
// Verify the pattern was persisted
slot := pl.GetPattern(1, 12, 0)
slot := pl.GetPattern("zone-1", 12, 0)
if slot == nil {
t.Fatal("expected pattern to exist")
}
@ -280,70 +273,53 @@ func TestPatternLearner_ObserveAndUpdate_Persists(t *testing.T) {
if slot.SampleCount != 50 {
t.Errorf("expected sample_count=50, got %d", slot.SampleCount)
}
// Variance should be 0 since all observations are identical
if slot.Variance > 1e-9 {
t.Errorf("expected variance=0 for identical observations, got %v", slot.Variance)
}
}
func TestPatternLearner_ObserveAndUpdate_WithVariance(t *testing.T) {
db := openTestDB(t)
pl, err := NewPatternLearner(db)
if err != nil {
t.Fatalf("NewPatternLearner: %v", err)
}
pl := newTestLearner(t)
// Record observations: [0, 1, 2, 3, 4] repeated to get 50 samples
for i := 0; i < 50; i++ {
err := pl.ObserveAndUpdate(1, 12, 0, i%5, 0)
if err != nil {
if err := pl.ObserveAndUpdate("zone-1", 12, 0, i%5, 0); err != nil {
t.Fatalf("ObserveAndUpdate: %v", err)
}
}
slot := pl.GetPattern(1, 12, 0)
slot := pl.GetPattern("zone-1", 12, 0)
if slot == nil {
t.Fatal("expected pattern")
}
// Mean of [0,1,2,3,4] repeated = 2.0
if math.Abs(slot.MeanCount-2.0) > 1e-9 {
t.Errorf("expected mean=2.0, got %v", slot.MeanCount)
}
// Variance of [0,1,2,3,4] = 2.0
if math.Abs(slot.Variance-2.0) > 1e-6 {
t.Errorf("expected variance=2.0, got %v", slot.Variance)
}
}
func TestPatternLearner_OutlierProtection(t *testing.T) {
db := openTestDB(t)
pl, err := NewPatternLearner(db)
if err != nil {
t.Fatalf("NewPatternLearner: %v", err)
}
pl := newTestLearner(t)
// Establish a pattern with 50 samples of count=0 (zone usually empty)
for i := 0; i < 50; i++ {
err := pl.ObserveAndUpdate(1, 12, 0, 0, 0)
if err != nil {
if err := pl.ObserveAndUpdate("zone-1", 12, 0, 0, 0); err != nil {
t.Fatalf("ObserveAndUpdate: %v", err)
}
}
slotBefore := pl.GetPattern(1, 12, 0)
slotBefore := pl.GetPattern("zone-1", 12, 0)
meanBefore := slotBefore.MeanCount
countBefore := slotBefore.SampleCount
// Try to update with an anomaly score >= 0.5 — should be skipped
err = pl.ObserveAndUpdate(1, 12, 0, 100, 0.6)
if err != nil {
// Outlier should be skipped
if err := pl.ObserveAndUpdate("zone-1", 12, 0, 100, 0.6); err != nil {
t.Fatalf("ObserveAndUpdate: %v", err)
}
// Verify the model was NOT updated
slotAfter := pl.GetPattern(1, 12, 0)
slotAfter := pl.GetPattern("zone-1", 12, 0)
if slotAfter.MeanCount != meanBefore {
t.Errorf("outlier protection failed: mean changed from %v to %v", meanBefore, slotAfter.MeanCount)
}
@ -353,28 +329,21 @@ func TestPatternLearner_OutlierProtection(t *testing.T) {
}
func TestPatternLearner_OutlierProtection_AfterMultipleAnomalies(t *testing.T) {
// Bead spec: injecting synthetic anomaly does not corrupt model after 3 occurrences
db := openTestDB(t)
pl, err := NewPatternLearner(db)
if err != nil {
t.Fatalf("NewPatternLearner: %v", err)
}
pl := newTestLearner(t)
// Establish baseline: 50 samples of count=1
for i := 0; i < 50; i++ {
pl.ObserveAndUpdate(1, 12, 0, 1, 0)
pl.ObserveAndUpdate("zone-1", 12, 0, 1, 0)
}
slot := pl.GetPattern(1, 12, 0)
slot := pl.GetPattern("zone-1", 12, 0)
meanBefore := slot.MeanCount
// Inject 3 synthetic anomalies (count=50 with high anomaly score)
// Inject 3 synthetic anomalies
for i := 0; i < 3; i++ {
pl.ObserveAndUpdate(1, 12, 0, 50, 1.0)
pl.ObserveAndUpdate("zone-1", 12, 0, 50, 1.0)
}
// Model should be unchanged
slot = pl.GetPattern(1, 12, 0)
slot = pl.GetPattern("zone-1", 12, 0)
if slot.SampleCount != 50 {
t.Errorf("expected sample_count to remain 50, got %d", slot.SampleCount)
}
@ -384,16 +353,11 @@ func TestPatternLearner_OutlierProtection_AfterMultipleAnomalies(t *testing.T) {
}
func TestPatternLearner_SecurityModeOverride(t *testing.T) {
db := openTestDB(t)
pl, err := NewPatternLearner(db)
if err != nil {
t.Fatalf("NewPatternLearner: %v", err)
}
pl := newTestLearner(t)
pl.SetSecurityMode(true)
// Any detection in security mode should score 1.0
result := pl.ComputeAnomalyScore(1, 12, 0, 0)
result := pl.ComputeAnomalyScore("zone-1", 12, 0, 0)
if result.CompositeScore != 1.0 {
t.Errorf("security mode: expected composite=1.0, got %v", result.CompositeScore)
}
@ -401,8 +365,7 @@ func TestPatternLearner_SecurityModeOverride(t *testing.T) {
t.Error("security mode: expected is_alert=true")
}
// Even with 0 observed count
result = pl.ComputeAnomalyScore(1, 12, 0, 0)
result = pl.ComputeAnomalyScore("zone-1", 12, 0, 0)
if result.CompositeScore != 1.0 {
t.Errorf("security mode with 0 count: expected composite=1.0, got %v", result.CompositeScore)
}
@ -411,23 +374,14 @@ func TestPatternLearner_SecurityModeOverride(t *testing.T) {
}
func TestPatternLearner_AnomalyScoring(t *testing.T) {
db := openTestDB(t)
pl, err := NewPatternLearner(db)
if err != nil {
t.Fatalf("NewPatternLearner: %v", err)
}
// Move past cold start so scoring is active
pl := newTestLearner(t)
pl.SetLearningStartTime(time.Now().Add(-8 * 24 * time.Hour))
// Build a pattern: zone usually has 0 occupants at this time
// 50 samples of count=0
for i := 0; i < 50; i++ {
pl.ObserveAndUpdate(1, 3, 0, 0, 0) // 3 AM, Sunday
pl.ObserveAndUpdate("zone-1", 3, 0, 0, 0)
}
// Check: observing 0 people should score low
result := pl.ComputeAnomalyScore(1, 3, 0, 0)
result := pl.ComputeAnomalyScore("zone-1", 3, 0, 0)
if result.CompositeScore > 0.01 {
t.Errorf("expected low score for expected observation, got %v", result.CompositeScore)
}
@ -435,9 +389,7 @@ func TestPatternLearner_AnomalyScoring(t *testing.T) {
t.Error("expected not suppressed when slot is ready")
}
// Check: observing people at 3 AM when zone is normally empty
// This should trigger zone_score = 1.0 (zone normally empty, now occupied)
result = pl.ComputeAnomalyScore(1, 3, 0, 3)
result = pl.ComputeAnomalyScore("zone-1", 3, 0, 3)
if result.ZoneScore != 1.0 {
t.Errorf("expected zone_score=1.0 when zone normally empty, got %v", result.ZoneScore)
}
@ -450,98 +402,82 @@ func TestPatternLearner_AnomalyScoring(t *testing.T) {
}
func TestPatternLearner_AnomalyScoring_ZScoreBased(t *testing.T) {
db := openTestDB(t)
pl, err := NewPatternLearner(db)
if err != nil {
t.Fatalf("NewPatternLearner: %v", err)
}
// Move past cold start so scoring is active
pl := newTestLearner(t)
pl.SetLearningStartTime(time.Now().Add(-8 * 24 * time.Hour))
// Build a pattern: zone has mean=2, need variance
// Feed [1, 3] alternating to get mean=2, variance=1
for i := 0; i < 50; i++ {
pl.ObserveAndUpdate(1, 14, 0, 1+i%2, 0) // 2 PM, alternating 1 and 2
pl.ObserveAndUpdate("zone-1", 14, 0, 1+i%2, 0)
}
slot := pl.GetPattern(1, 14, 0)
slot := pl.GetPattern("zone-1", 14, 0)
if slot == nil {
t.Fatal("expected pattern")
}
// Observation at the mean should score low
result := pl.ComputeAnomalyScore(1, 14, 0, 2)
result := pl.ComputeAnomalyScore("zone-1", 14, 0, 2)
if result.TimeScore > 0.01 {
t.Errorf("expected low time_score for mean observation, got %v", result.TimeScore)
}
// Observation far from mean should score higher
// z = (10 - 2) / sqrt(variance + epsilon) — should be well above 4σ
result = pl.ComputeAnomalyScore(1, 14, 0, 10)
result = pl.ComputeAnomalyScore("zone-1", 14, 0, 10)
if result.TimeScore < 0.9 {
t.Errorf("expected high time_score for extreme observation, got %v", result.TimeScore)
}
}
func TestPatternLearner_GetPatterns(t *testing.T) {
db := openTestDB(t)
pl, err := NewPatternLearner(db)
if err != nil {
t.Fatalf("NewPatternLearner: %v", err)
}
pl := newTestLearner(t)
// Add patterns for two zones
for i := 0; i < 50; i++ {
pl.ObserveAndUpdate(1, 12, 0, 2, 0)
pl.ObserveAndUpdate(2, 12, 0, 3, 0)
pl.ObserveAndUpdate("zone-1", 12, 0, 2, 0)
pl.ObserveAndUpdate("zone-2", 12, 0, 3, 0)
}
// Get all patterns
all := pl.GetPatterns(0)
all := pl.GetPatterns("")
if len(all) != 2 {
t.Errorf("expected 2 patterns, got %d", len(all))
}
// Filter by zone
zone1 := pl.GetPatterns(1)
zone1 := pl.GetPatterns("zone-1")
if len(zone1) != 1 {
t.Errorf("expected 1 pattern for zone 1, got %d", len(zone1))
t.Errorf("expected 1 pattern for zone-1, got %d", len(zone1))
}
if zone1[0].ZoneID != 1 {
t.Errorf("expected zone_id=1, got %d", zone1[0].ZoneID)
if zone1[0].ZoneID != "zone-1" {
t.Errorf("expected zone_id=zone-1, got %s", zone1[0].ZoneID)
}
}
func TestPatternLearner_SurvivesRestart(t *testing.T) {
db := openTestDB(t)
tmpDir, err := os.MkdirTemp("", "pattern_restart_test")
if err != nil {
t.Fatalf("create temp dir: %v", err)
}
t.Cleanup(func() { os.RemoveAll(tmpDir) })
// Create learner and add data
pl1, err := NewPatternLearner(db)
dbPath := filepath.Join(tmpDir, "patterns.db")
pl1, err := NewPatternLearner(dbPath)
if err != nil {
t.Fatalf("NewPatternLearner: %v", err)
}
for i := 0; i < 50; i++ {
pl1.ObserveAndUpdate(1, 12, 0, 2, 0)
pl1.ObserveAndUpdate("zone-1", 12, 0, 2, 0)
}
// Close and recreate (simulates server restart)
pl1.mu.Lock() // Clear in-memory cache to simulate fresh start
pl1.patterns = make(map[patternKey]*PatternSlot)
pl1.mu.Unlock()
pl1.Close()
pl2, err := NewPatternLearner(db)
pl2, err := NewPatternLearner(dbPath)
if err != nil {
t.Fatalf("NewPatternLearner after restart: %v", err)
}
defer pl2.Close()
// Pattern should be loaded from DB
if !pl2.IsSlotReady(1, 12, 0) {
if !pl2.IsSlotReady("zone-1", 12, 0) {
t.Error("expected slot to be ready after reload from DB")
}
slot := pl2.GetPattern(1, 12, 0)
slot := pl2.GetPattern("zone-1", 12, 0)
if slot == nil {
t.Fatal("expected pattern after restart")
}
@ -572,17 +508,13 @@ func TestPatternLearner_AlertThresholds(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := openTestDB(t)
pl, err := NewPatternLearner(db)
if err != nil {
t.Fatalf("NewPatternLearner: %v", err)
}
pl := newTestLearner(t)
for _, obs := range tt.observations {
pl.ObserveAndUpdate(1, 14, 0, obs, 0)
pl.ObserveAndUpdate("zone-1", 14, 0, obs, 0)
}
result := pl.ComputeAnomalyScore(1, 14, 0, tt.testCount)
result := pl.ComputeAnomalyScore("zone-1", 14, 0, tt.testCount)
if result.IsAlert != tt.wantAlert {
t.Errorf("is_alert = %v, want %v (composite=%v)", result.IsAlert, tt.wantAlert, result.CompositeScore)
}
@ -594,21 +526,16 @@ func TestPatternLearner_AlertThresholds(t *testing.T) {
}
func TestPatternLearner_NaNInf_NeverProduced(t *testing.T) {
db := openTestDB(t)
pl, err := NewPatternLearner(db)
if err != nil {
t.Fatalf("NewPatternLearner: %v", err)
}
pl := newTestLearner(t)
// Feed extreme and varied observations
observations := []int{0, 100, 0, 100, 0, 100, 1, 99, 50, 0}
for i := 0; i < 5; i++ { // Repeat to get 50+ samples
for i := 0; i < 5; i++ {
for _, obs := range observations {
pl.ObserveAndUpdate(1, 14, 0, obs, 0)
pl.ObserveAndUpdate("zone-1", 14, 0, obs, 0)
}
}
slot := pl.GetPattern(1, 14, 0)
slot := pl.GetPattern("zone-1", 14, 0)
if slot == nil {
t.Fatal("expected pattern")
}
@ -620,9 +547,8 @@ func TestPatternLearner_NaNInf_NeverProduced(t *testing.T) {
t.Error("variance is NaN or Inf")
}
// Score should also never be NaN/Inf
for _, obs := range []int{0, 1, 5, 50, 100, 200} {
result := pl.ComputeAnomalyScore(1, 14, 0, obs)
result := pl.ComputeAnomalyScore("zone-1", 14, 0, obs)
if math.IsNaN(result.CompositeScore) || math.IsInf(result.CompositeScore, 0) {
t.Errorf("NaN/Inf composite for obs=%d: %v", obs, result.CompositeScore)
}
@ -633,24 +559,17 @@ func TestPatternLearner_NaNInf_NeverProduced(t *testing.T) {
}
func TestPatternLearner_NoAlertsDuringColdStart(t *testing.T) {
db := openTestDB(t)
pl, err := NewPatternLearner(db)
if err != nil {
t.Fatalf("NewPatternLearner: %v", err)
}
pl := newTestLearner(t)
// Even with extreme observations during cold start, no alerts
for i := 0; i < 100; i++ {
pl.ObserveAndUpdate(1, 3, 0, 50, 0)
pl.ObserveAndUpdate("zone-1", 3, 0, 50, 0)
}
// Still in cold start (7 days haven't passed)
if !pl.IsColdStart() {
t.Log("note: cold start check depends on timing")
}
// ComputeAnomalyScore should suppress
result := pl.ComputeAnomalyScore(1, 3, 0, 50)
result := pl.ComputeAnomalyScore("zone-1", 3, 0, 50)
if !result.Suppressed {
t.Error("expected anomaly score to be suppressed during cold start regardless of activity")
}
@ -662,70 +581,56 @@ func TestPatternLearner_NoAlertsDuringColdStart(t *testing.T) {
// --- Integration test: hourly update with mock provider ---
type mockOccupancyProvider struct {
counts map[int]int
counts map[string]int
}
func (m *mockOccupancyProvider) GetZoneOccupancyCounts() map[int]int {
func (m *mockOccupancyProvider) GetZoneOccupancyCounts() map[string]int {
return m.counts
}
func TestPatternLearner_HourlyUpdate_Integration(t *testing.T) {
db := openTestDB(t)
pl, err := NewPatternLearner(db)
if err != nil {
t.Fatalf("NewPatternLearner: %v", err)
}
pl := newTestLearner(t)
provider := &mockOccupancyProvider{
counts: map[int]int{1: 2, 2: 0},
counts: map[string]int{"zone-1": 2, "zone-2": 0},
}
// Manually trigger the update (instead of waiting for the hourly timer)
pl.updateAllZones(provider)
// Check patterns were created
slot1 := pl.GetPattern(1, time.Now().Hour(), int(time.Now().Weekday()))
slot1 := pl.GetPattern("zone-1", time.Now().Hour(), int(time.Now().Weekday()))
if slot1 == nil {
t.Fatal("expected pattern for zone 1 after hourly update")
t.Fatal("expected pattern for zone-1 after hourly update")
}
if slot1.MeanCount != 2.0 {
t.Errorf("expected mean=2.0 for zone 1, got %v", slot1.MeanCount)
t.Errorf("expected mean=2.0 for zone-1, got %v", slot1.MeanCount)
}
slot2 := pl.GetPattern(2, time.Now().Hour(), int(time.Now().Weekday()))
slot2 := pl.GetPattern("zone-2", time.Now().Hour(), int(time.Now().Weekday()))
if slot2 == nil {
t.Fatal("expected pattern for zone 2 after hourly update")
t.Fatal("expected pattern for zone-2 after hourly update")
}
if slot2.MeanCount != 0.0 {
t.Errorf("expected mean=0.0 for zone 2, got %v", slot2.MeanCount)
t.Errorf("expected mean=0.0 for zone-2, got %v", slot2.MeanCount)
}
}
func TestPatternLearner_HourlyUpdate_OutlierProtectionInUpdate(t *testing.T) {
db := openTestDB(t)
pl, err := NewPatternLearner(db)
if err != nil {
t.Fatalf("NewPatternLearner: %v", err)
}
pl := newTestLearner(t)
// First establish a baseline
for i := 0; i < 50; i++ {
pl.ObserveAndUpdate(1, 12, 0, 1, 0)
pl.ObserveAndUpdate("zone-1", 12, 0, 1, 0)
}
slotBefore := pl.GetPattern(1, 12, 0)
slotBefore := pl.GetPattern("zone-1", 12, 0)
// Now simulate an hourly update with anomalous count
provider := &mockOccupancyProvider{
counts: map[int]int{1: 50}, // Way above normal
counts: map[string]int{"zone-1": 50},
}
pl.updateAllZones(provider)
slotAfter := pl.GetPattern(1, 12, 0)
slotAfter := pl.GetPattern("zone-1", 12, 0)
// The outlier should be skipped — model should be unchanged
// (score for count=50 when mean=1 will be very high, triggering outlier protection)
if slotAfter.SampleCount != slotBefore.SampleCount {
t.Logf("note: sample count changed from %d to %d (outlier protection may not trigger if score < 0.5)", slotBefore.SampleCount, slotAfter.SampleCount)
}