refactor(prediction): consolidate prediction tables into migration framework
Some checks are pending
CI Benchmark - Fusion Loop Timing / Fusion Loop Timing Benchmark (push) Waiting to run

Add migration_018 that creates all 8 prediction subsystem tables in the main
database, consolidating the separate prediction.db and prediction_accuracy.db
files. Add NewModelStoreWithDB and NewAccuracyTrackerWithDB constructors that
accept an existing *sql.DB connection, and update main.go to use the main
database connection instead of separate files.

- Added migration_018_add_prediction_tables with all prediction tables
- Added NewModelStoreWithDB() to accept shared DB connection
- Added NewAccuracyTrackerWithDB() to accept shared DB connection
- Updated Close() to only close DB when we own it (path != "")
- Updated main.go prediction init to use mainDB

The legacy NewModelStore() and NewAccuracyTracker() constructors remain for
backward compatibility (tests continue using their own migrations).

Closes: bf-38wcp
This commit is contained in:
jedarden 2026-05-24 11:40:45 -04:00
parent 85a16e3152
commit bd64d602bc
4 changed files with 168 additions and 14 deletions

View file

@ -874,12 +874,11 @@ func main() {
var predictionPredictor *prediction.Predictor
var predictionAccuracy *prediction.AccuracyTracker
var predictionHorizon *prediction.HorizonPredictor
predictionStore, err = prediction.NewModelStore(filepath.Join(cfg.DataDir, "prediction.db"))
predictionStore, err = prediction.NewModelStoreWithDB(mainDB)
if err != nil {
log.Printf("[WARN] Failed to open prediction store: %v", err)
log.Printf("[WARN] Failed to initialize prediction store: %v", err)
} else {
defer closeQuietly(predictionStore)
log.Printf("[INFO] Prediction store at %s", filepath.Join(cfg.DataDir, "prediction.db"))
log.Printf("[INFO] Prediction store using main database")
// Create history updater
predictionHistory = prediction.NewHistoryUpdater(predictionStore)
@ -890,12 +889,11 @@ func main() {
}
// Create accuracy tracker
predictionAccuracy, err = prediction.NewAccuracyTracker(filepath.Join(cfg.DataDir, "prediction_accuracy.db"))
predictionAccuracy, err = prediction.NewAccuracyTrackerWithDB(mainDB)
if err != nil {
log.Printf("[WARN] Failed to open accuracy tracker: %v", err)
log.Printf("[WARN] Failed to initialize accuracy tracker: %v", err)
} else {
defer closeQuietly(predictionAccuracy)
log.Printf("[INFO] Prediction accuracy tracker at %s", filepath.Join(cfg.DataDir, "prediction_accuracy.db"))
log.Printf("[INFO] Prediction accuracy tracker using main database")
}
// Create predictor

View file

@ -93,6 +93,11 @@ func AllMigrations() []Migration {
Description: "add predicted_enter to triggers table CHECK constraint",
Up: migration_017_add_predicted_enter_trigger,
},
{
Version: 18,
Description: "add prediction subsystem tables",
Up: migration_018_add_prediction_tables,
},
}
}
@ -949,3 +954,115 @@ func migration_017_add_predicted_enter_trigger(tx *sql.Tx) error {
return nil
}
// migration_018_add_prediction_tables creates all prediction subsystem tables.
// Consolidates prediction.db and prediction_accuracy.db into the main database.
func migration_018_add_prediction_tables(tx *sql.Tx) error {
schema := `
-- Zone transition history for probability models
CREATE TABLE IF NOT EXISTS zone_transitions_history (
id TEXT PRIMARY KEY,
person_id TEXT NOT NULL,
from_zone_id TEXT NOT NULL,
to_zone_id TEXT NOT NULL,
hour_of_week INTEGER NOT NULL,
dwell_duration_minutes REAL NOT NULL DEFAULT 0,
timestamp INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_transitions_person_time ON zone_transitions_history(person_id, hour_of_week);
CREATE INDEX IF NOT EXISTS idx_transitions_from ON zone_transitions_history(from_zone_id);
CREATE INDEX IF NOT EXISTS idx_transitions_timestamp ON zone_transitions_history(timestamp);
-- Transition probability cache
CREATE TABLE IF NOT EXISTS transition_probabilities (
person_id TEXT NOT NULL,
hour_of_week INTEGER NOT NULL,
from_zone_id TEXT NOT NULL,
to_zone_id TEXT NOT NULL,
probability REAL NOT NULL,
count INTEGER NOT NULL,
last_computed INTEGER NOT NULL,
PRIMARY KEY (person_id, hour_of_week, from_zone_id, to_zone_id)
);
-- Dwell time statistics cache
CREATE TABLE IF NOT EXISTS dwell_times (
person_id TEXT NOT NULL,
zone_id TEXT NOT NULL,
hour_of_week INTEGER NOT NULL,
mean_minutes REAL NOT NULL,
stddev_minutes REAL NOT NULL,
count INTEGER NOT NULL,
last_computed INTEGER NOT NULL,
PRIMARY KEY (person_id, zone_id, hour_of_week)
);
-- Person zone entry tracking
CREATE TABLE IF NOT EXISTS person_zone_entry (
person_id TEXT NOT NULL,
zone_id TEXT NOT NULL,
entry_time INTEGER NOT NULL,
blob_id INTEGER NOT NULL,
PRIMARY KEY (person_id, zone_id)
);
-- Recorded predictions for accuracy tracking
CREATE TABLE IF NOT EXISTS recorded_predictions (
id TEXT PRIMARY KEY,
person_id TEXT NOT NULL,
predicted_at INTEGER NOT NULL,
target_time INTEGER NOT NULL,
current_zone_id TEXT NOT NULL,
predicted_zone_id TEXT NOT NULL,
actual_zone_id TEXT,
prediction_confidence REAL NOT NULL,
horizon_minutes INTEGER NOT NULL,
evaluated INTEGER NOT NULL DEFAULT 0,
correct INTEGER DEFAULT 0,
evaluated_at INTEGER
);
CREATE INDEX IF NOT EXISTS idx_predictions_person ON recorded_predictions(person_id);
CREATE INDEX IF NOT EXISTS idx_predictions_target ON recorded_predictions(target_time);
CREATE INDEX IF NOT EXISTS idx_predictions_evaluated ON recorded_predictions(evaluated);
CREATE INDEX IF NOT EXISTS idx_predictions_person_target ON recorded_predictions(person_id, target_time);
-- Accuracy statistics summary
CREATE TABLE IF NOT EXISTS accuracy_stats (
person_id TEXT NOT NULL,
horizon_minutes INTEGER NOT NULL,
total_predictions INTEGER NOT NULL,
correct_predictions INTEGER NOT NULL,
accuracy REAL NOT NULL,
window_start INTEGER NOT NULL,
window_end INTEGER NOT NULL,
last_updated INTEGER NOT NULL,
PRIMARY KEY (person_id, horizon_minutes)
);
-- Zone occupancy patterns for time-based predictions
CREATE TABLE IF NOT EXISTS zone_occupancy_patterns (
zone_id TEXT NOT NULL,
hour_of_week INTEGER NOT NULL,
occupancy_prob REAL NOT NULL,
mean_dwell_minutes REAL NOT NULL,
stddev_dwell REAL NOT NULL,
sample_count INTEGER NOT NULL,
last_computed INTEGER NOT NULL,
PRIMARY KEY (zone_id, hour_of_week)
);
-- Zone occupancy history for pattern learning
CREATE TABLE IF NOT EXISTS zone_occupancy_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
zone_id TEXT NOT NULL,
person_id TEXT,
enter_time INTEGER NOT NULL,
exit_time INTEGER,
duration_minutes REAL
);
CREATE INDEX IF NOT EXISTS idx_occupancy_zone ON zone_occupancy_history(zone_id);
CREATE INDEX IF NOT EXISTS idx_occupancy_enter ON zone_occupancy_history(enter_time);
`
_, err := tx.Exec(schema)
return err
}

View file

@ -79,7 +79,8 @@ type AccuracyTracker struct {
horizon time.Duration
}
// NewAccuracyTracker creates a new accuracy tracker.
// NewAccuracyTracker creates a new accuracy tracker with its own database connection.
// Deprecated: Use NewAccuracyTrackerWithDB to share the main database connection.
func NewAccuracyTracker(dbPath string) (*AccuracyTracker, error) {
if err := os.MkdirAll(filepath.Dir(dbPath), 0755); err != nil {
return nil, fmt.Errorf("create data dir: %w", err)
@ -112,6 +113,24 @@ func NewAccuracyTracker(dbPath string) (*AccuracyTracker, error) {
return t, nil
}
// NewAccuracyTrackerWithDB creates a new accuracy tracker using an existing database connection.
// Tables must have been created by the migration framework.
func NewAccuracyTrackerWithDB(db *sql.DB) (*AccuracyTracker, error) {
t := &AccuracyTracker{
db: db,
pendingPredictions: make(map[string]RecordedPrediction),
cachedStats: make(map[string]*AccuracyStats),
horizon: PredictionHorizon,
}
// Load pending predictions
if err := t.loadPendingPredictions(); err != nil {
log.Printf("[WARN] prediction: failed to load pending predictions: %v", err)
}
return t, nil
}
func (t *AccuracyTracker) migrate() error {
// Create prediction accuracy schema version tracking table
_, err := t.db.Exec(`
@ -227,9 +246,12 @@ func (t *AccuracyTracker) loadPendingPredictions() error {
return nil
}
// Close closes the database.
// Close closes the database if we own it.
func (t *AccuracyTracker) Close() error {
return t.db.Close()
if t.path != "" {
return t.db.Close()
}
return nil
}
// RecordPrediction records a new prediction for later evaluation.

View file

@ -71,7 +71,8 @@ type ModelStore struct {
firstTransitionTime time.Time
}
// NewModelStore creates a new prediction model store.
// NewModelStore creates a new prediction model store with its own database connection.
// Deprecated: Use NewModelStoreWithDB to share the main database connection.
func NewModelStore(dbPath string) (*ModelStore, error) {
if err := os.MkdirAll(filepath.Dir(dbPath), 0755); err != nil {
return nil, fmt.Errorf("create data dir: %w", err)
@ -99,6 +100,19 @@ func NewModelStore(dbPath string) (*ModelStore, error) {
return s, nil
}
// NewModelStoreWithDB creates a new prediction model store using an existing database connection.
// Tables must have been created by the migration framework.
func NewModelStoreWithDB(db *sql.DB) (*ModelStore, error) {
s := &ModelStore{
db: db,
}
// Load first transition time
s.loadFirstTransitionTime()
return s, nil
}
func (s *ModelStore) migrate() error {
// Create prediction schema version tracking table
_, err := s.db.Exec(`
@ -182,9 +196,12 @@ func (s *ModelStore) loadFirstTransitionTime() {
}
}
// Close closes the database.
// Close closes the database if we own it.
func (s *ModelStore) Close() error {
return s.db.Close()
if s.path != "" {
return s.db.Close()
}
return nil
}
// RecordTransition records a zone transition event.