diff --git a/mothership/cmd/mothership/main.go b/mothership/cmd/mothership/main.go index 07462ce..a333f3c 100644 --- a/mothership/cmd/mothership/main.go +++ b/mothership/cmd/mothership/main.go @@ -874,12 +874,11 @@ func main() { var predictionPredictor *prediction.Predictor var predictionAccuracy *prediction.AccuracyTracker var predictionHorizon *prediction.HorizonPredictor - predictionStore, err = prediction.NewModelStore(filepath.Join(cfg.DataDir, "prediction.db")) + predictionStore, err = prediction.NewModelStoreWithDB(mainDB) if err != nil { - log.Printf("[WARN] Failed to open prediction store: %v", err) + log.Printf("[WARN] Failed to initialize prediction store: %v", err) } else { - defer closeQuietly(predictionStore) - log.Printf("[INFO] Prediction store at %s", filepath.Join(cfg.DataDir, "prediction.db")) + log.Printf("[INFO] Prediction store using main database") // Create history updater predictionHistory = prediction.NewHistoryUpdater(predictionStore) @@ -890,12 +889,11 @@ func main() { } // Create accuracy tracker - predictionAccuracy, err = prediction.NewAccuracyTracker(filepath.Join(cfg.DataDir, "prediction_accuracy.db")) + predictionAccuracy, err = prediction.NewAccuracyTrackerWithDB(mainDB) if err != nil { - log.Printf("[WARN] Failed to open accuracy tracker: %v", err) + log.Printf("[WARN] Failed to initialize accuracy tracker: %v", err) } else { - defer closeQuietly(predictionAccuracy) - log.Printf("[INFO] Prediction accuracy tracker at %s", filepath.Join(cfg.DataDir, "prediction_accuracy.db")) + log.Printf("[INFO] Prediction accuracy tracker using main database") } // Create predictor diff --git a/mothership/internal/db/migrations.go b/mothership/internal/db/migrations.go index 875c60e..ce66d62 100644 --- a/mothership/internal/db/migrations.go +++ b/mothership/internal/db/migrations.go @@ -93,6 +93,11 @@ func AllMigrations() []Migration { Description: "add predicted_enter to triggers table CHECK constraint", Up: migration_017_add_predicted_enter_trigger, }, + { + Version: 18, + Description: "add prediction subsystem tables", + Up: migration_018_add_prediction_tables, + }, } } @@ -949,3 +954,115 @@ func migration_017_add_predicted_enter_trigger(tx *sql.Tx) error { return nil } + +// migration_018_add_prediction_tables creates all prediction subsystem tables. +// Consolidates prediction.db and prediction_accuracy.db into the main database. +func migration_018_add_prediction_tables(tx *sql.Tx) error { + schema := ` + -- Zone transition history for probability models + CREATE TABLE IF NOT EXISTS zone_transitions_history ( + id TEXT PRIMARY KEY, + person_id TEXT NOT NULL, + from_zone_id TEXT NOT NULL, + to_zone_id TEXT NOT NULL, + hour_of_week INTEGER NOT NULL, + dwell_duration_minutes REAL NOT NULL DEFAULT 0, + timestamp INTEGER NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_transitions_person_time ON zone_transitions_history(person_id, hour_of_week); + CREATE INDEX IF NOT EXISTS idx_transitions_from ON zone_transitions_history(from_zone_id); + CREATE INDEX IF NOT EXISTS idx_transitions_timestamp ON zone_transitions_history(timestamp); + + -- Transition probability cache + CREATE TABLE IF NOT EXISTS transition_probabilities ( + person_id TEXT NOT NULL, + hour_of_week INTEGER NOT NULL, + from_zone_id TEXT NOT NULL, + to_zone_id TEXT NOT NULL, + probability REAL NOT NULL, + count INTEGER NOT NULL, + last_computed INTEGER NOT NULL, + PRIMARY KEY (person_id, hour_of_week, from_zone_id, to_zone_id) + ); + + -- Dwell time statistics cache + CREATE TABLE IF NOT EXISTS dwell_times ( + person_id TEXT NOT NULL, + zone_id TEXT NOT NULL, + hour_of_week INTEGER NOT NULL, + mean_minutes REAL NOT NULL, + stddev_minutes REAL NOT NULL, + count INTEGER NOT NULL, + last_computed INTEGER NOT NULL, + PRIMARY KEY (person_id, zone_id, hour_of_week) + ); + + -- Person zone entry tracking + CREATE TABLE IF NOT EXISTS person_zone_entry ( + person_id TEXT NOT NULL, + zone_id TEXT NOT NULL, + entry_time INTEGER NOT NULL, + blob_id INTEGER NOT NULL, + PRIMARY KEY (person_id, zone_id) + ); + + -- Recorded predictions for accuracy tracking + CREATE TABLE IF NOT EXISTS recorded_predictions ( + id TEXT PRIMARY KEY, + person_id TEXT NOT NULL, + predicted_at INTEGER NOT NULL, + target_time INTEGER NOT NULL, + current_zone_id TEXT NOT NULL, + predicted_zone_id TEXT NOT NULL, + actual_zone_id TEXT, + prediction_confidence REAL NOT NULL, + horizon_minutes INTEGER NOT NULL, + evaluated INTEGER NOT NULL DEFAULT 0, + correct INTEGER DEFAULT 0, + evaluated_at INTEGER + ); + CREATE INDEX IF NOT EXISTS idx_predictions_person ON recorded_predictions(person_id); + CREATE INDEX IF NOT EXISTS idx_predictions_target ON recorded_predictions(target_time); + CREATE INDEX IF NOT EXISTS idx_predictions_evaluated ON recorded_predictions(evaluated); + CREATE INDEX IF NOT EXISTS idx_predictions_person_target ON recorded_predictions(person_id, target_time); + + -- Accuracy statistics summary + CREATE TABLE IF NOT EXISTS accuracy_stats ( + person_id TEXT NOT NULL, + horizon_minutes INTEGER NOT NULL, + total_predictions INTEGER NOT NULL, + correct_predictions INTEGER NOT NULL, + accuracy REAL NOT NULL, + window_start INTEGER NOT NULL, + window_end INTEGER NOT NULL, + last_updated INTEGER NOT NULL, + PRIMARY KEY (person_id, horizon_minutes) + ); + + -- Zone occupancy patterns for time-based predictions + CREATE TABLE IF NOT EXISTS zone_occupancy_patterns ( + zone_id TEXT NOT NULL, + hour_of_week INTEGER NOT NULL, + occupancy_prob REAL NOT NULL, + mean_dwell_minutes REAL NOT NULL, + stddev_dwell REAL NOT NULL, + sample_count INTEGER NOT NULL, + last_computed INTEGER NOT NULL, + PRIMARY KEY (zone_id, hour_of_week) + ); + + -- Zone occupancy history for pattern learning + CREATE TABLE IF NOT EXISTS zone_occupancy_history ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + zone_id TEXT NOT NULL, + person_id TEXT, + enter_time INTEGER NOT NULL, + exit_time INTEGER, + duration_minutes REAL + ); + CREATE INDEX IF NOT EXISTS idx_occupancy_zone ON zone_occupancy_history(zone_id); + CREATE INDEX IF NOT EXISTS idx_occupancy_enter ON zone_occupancy_history(enter_time); + ` + _, err := tx.Exec(schema) + return err +} diff --git a/mothership/internal/prediction/accuracy.go b/mothership/internal/prediction/accuracy.go index 8fc7266..3ba0217 100644 --- a/mothership/internal/prediction/accuracy.go +++ b/mothership/internal/prediction/accuracy.go @@ -79,7 +79,8 @@ type AccuracyTracker struct { horizon time.Duration } -// NewAccuracyTracker creates a new accuracy tracker. +// NewAccuracyTracker creates a new accuracy tracker with its own database connection. +// Deprecated: Use NewAccuracyTrackerWithDB to share the main database connection. func NewAccuracyTracker(dbPath string) (*AccuracyTracker, error) { if err := os.MkdirAll(filepath.Dir(dbPath), 0755); err != nil { return nil, fmt.Errorf("create data dir: %w", err) @@ -112,6 +113,24 @@ func NewAccuracyTracker(dbPath string) (*AccuracyTracker, error) { return t, nil } +// NewAccuracyTrackerWithDB creates a new accuracy tracker using an existing database connection. +// Tables must have been created by the migration framework. +func NewAccuracyTrackerWithDB(db *sql.DB) (*AccuracyTracker, error) { + t := &AccuracyTracker{ + db: db, + pendingPredictions: make(map[string]RecordedPrediction), + cachedStats: make(map[string]*AccuracyStats), + horizon: PredictionHorizon, + } + + // Load pending predictions + if err := t.loadPendingPredictions(); err != nil { + log.Printf("[WARN] prediction: failed to load pending predictions: %v", err) + } + + return t, nil +} + func (t *AccuracyTracker) migrate() error { // Create prediction accuracy schema version tracking table _, err := t.db.Exec(` @@ -227,9 +246,12 @@ func (t *AccuracyTracker) loadPendingPredictions() error { return nil } -// Close closes the database. +// Close closes the database if we own it. func (t *AccuracyTracker) Close() error { - return t.db.Close() + if t.path != "" { + return t.db.Close() + } + return nil } // RecordPrediction records a new prediction for later evaluation. diff --git a/mothership/internal/prediction/model.go b/mothership/internal/prediction/model.go index c977249..aed94ea 100644 --- a/mothership/internal/prediction/model.go +++ b/mothership/internal/prediction/model.go @@ -71,7 +71,8 @@ type ModelStore struct { firstTransitionTime time.Time } -// NewModelStore creates a new prediction model store. +// NewModelStore creates a new prediction model store with its own database connection. +// Deprecated: Use NewModelStoreWithDB to share the main database connection. func NewModelStore(dbPath string) (*ModelStore, error) { if err := os.MkdirAll(filepath.Dir(dbPath), 0755); err != nil { return nil, fmt.Errorf("create data dir: %w", err) @@ -99,6 +100,19 @@ func NewModelStore(dbPath string) (*ModelStore, error) { return s, nil } +// NewModelStoreWithDB creates a new prediction model store using an existing database connection. +// Tables must have been created by the migration framework. +func NewModelStoreWithDB(db *sql.DB) (*ModelStore, error) { + s := &ModelStore{ + db: db, + } + + // Load first transition time + s.loadFirstTransitionTime() + + return s, nil +} + func (s *ModelStore) migrate() error { // Create prediction schema version tracking table _, err := s.db.Exec(` @@ -182,9 +196,12 @@ func (s *ModelStore) loadFirstTransitionTime() { } } -// Close closes the database. +// Close closes the database if we own it. func (s *ModelStore) Close() error { - return s.db.Close() + if s.path != "" { + return s.db.Close() + } + return nil } // RecordTransition records a zone transition event.