fix(prediction): add version tracking to prediction subsystem schema migrations
Some checks are pending
CI Benchmark - Fusion Loop Timing / Fusion Loop Timing Benchmark (push) Waiting to run

The prediction subsystem previously created 8 tables at runtime without
version tracking in separate SQLite databases (prediction.db,
prediction_accuracy.db). This created schema drift issues where changes
were unversioned and difficult to track.

Changes:
- Add prediction_schema_version table to prediction.db (model.go)
- Add prediction_accuracy_schema_version table to prediction_accuracy.db (accuracy.go)
- Convert migrate() functions to use versioned migrations (version 1)
- All 8 tables now created through versioned migration system:
  - zone_transitions_history, transition_probabilities, dwell_times, person_zone_entry
  - recorded_predictions, accuracy_stats, zone_occupancy_patterns, zone_occupancy_history

Closes: bf-38wcp
This commit is contained in:
jedarden 2026-05-24 09:55:42 -04:00
parent 2c8bbcf646
commit 3dd52861b3
3 changed files with 181 additions and 131 deletions

View file

@ -487,7 +487,6 @@ func migration_006_add_virtual_node_columns(tx *sql.Tx) error {
return nil
}
// migration_007_add_webhook_tables adds webhook_log, trigger_state tables
// and error_message/error_count columns to the triggers table.
func migration_007_add_webhook_tables(tx *sql.Tx) error {

View file

@ -24,39 +24,39 @@ const MinPredictionsForAccuracy = 10
// RecordedPrediction represents a prediction made at a specific time.
type RecordedPrediction struct {
ID string `json:"id"`
PersonID string `json:"person_id"`
PredictedAt time.Time `json:"predicted_at"`
TargetTime time.Time `json:"target_time"` // When the prediction targets
CurrentZoneID string `json:"current_zone_id"`
PredictedZoneID string `json:"predicted_zone_id"` // Zone predicted at target time
ActualZoneID string `json:"actual_zone_id,omitempty"` // Actual zone at target time (filled later)
PredictionConfidence float64 `json:"prediction_confidence"`
HorizonMinutes int `json:"horizon_minutes"`
Evaluated bool `json:"evaluated"`
Correct bool `json:"correct,omitempty"`
EvaluatedAt time.Time `json:"evaluated_at,omitempty"`
ID string `json:"id"`
PersonID string `json:"person_id"`
PredictedAt time.Time `json:"predicted_at"`
TargetTime time.Time `json:"target_time"` // When the prediction targets
CurrentZoneID string `json:"current_zone_id"`
PredictedZoneID string `json:"predicted_zone_id"` // Zone predicted at target time
ActualZoneID string `json:"actual_zone_id,omitempty"` // Actual zone at target time (filled later)
PredictionConfidence float64 `json:"prediction_confidence"`
HorizonMinutes int `json:"horizon_minutes"`
Evaluated bool `json:"evaluated"`
Correct bool `json:"correct,omitempty"`
EvaluatedAt time.Time `json:"evaluated_at,omitempty"`
}
// AccuracyStats represents accuracy statistics for a person.
type AccuracyStats struct {
PersonID string `json:"person_id"`
HorizonMinutes int `json:"horizon_minutes"`
TotalPredictions int `json:"total_predictions"`
CorrectPredictions int `json:"correct_predictions"`
Accuracy float64 `json:"accuracy"`
WindowStart time.Time `json:"window_start"`
WindowEnd time.Time `json:"window_end"`
LastUpdated time.Time `json:"last_updated"`
MeetsTarget bool `json:"meets_target"` // true if accuracy >= 75%
ConfusionMatrix map[string]map[string]int `json:"confusion_matrix,omitempty"` // actual -> predicted -> count
PersonID string `json:"person_id"`
HorizonMinutes int `json:"horizon_minutes"`
TotalPredictions int `json:"total_predictions"`
CorrectPredictions int `json:"correct_predictions"`
Accuracy float64 `json:"accuracy"`
WindowStart time.Time `json:"window_start"`
WindowEnd time.Time `json:"window_end"`
LastUpdated time.Time `json:"last_updated"`
MeetsTarget bool `json:"meets_target"` // true if accuracy >= 75%
ConfusionMatrix map[string]map[string]int `json:"confusion_matrix,omitempty"` // actual -> predicted -> count
}
// ZoneOccupancyPattern represents typical occupancy patterns for a zone.
type ZoneOccupancyPattern struct {
ZoneID string `json:"zone_id"`
HourOfWeek int `json:"hour_of_week"`
OccupancyProb float64 `json:"occupancy_probability"` // P(occupied | hour)
OccupancyProb float64 `json:"occupancy_probability"` // P(occupied | hour)
MeanDwellMinutes float64 `json:"mean_dwell_minutes"`
StddevDwell float64 `json:"stddev_dwell_minutes"`
SampleCount int `json:"sample_count"`
@ -65,9 +65,9 @@ type ZoneOccupancyPattern struct {
// AccuracyTracker tracks prediction accuracy over time.
type AccuracyTracker struct {
mu sync.RWMutex
db *sql.DB
path string
mu sync.RWMutex
db *sql.DB
path string
// Pending predictions awaiting evaluation
pendingPredictions map[string]RecordedPrediction // id -> prediction
@ -113,63 +113,88 @@ func NewAccuracyTracker(dbPath string) (*AccuracyTracker, error) {
}
func (t *AccuracyTracker) migrate() error {
// Create prediction accuracy schema version tracking table
_, err := t.db.Exec(`
CREATE TABLE IF NOT EXISTS recorded_predictions (
id TEXT PRIMARY KEY,
person_id TEXT NOT NULL,
predicted_at INTEGER NOT NULL,
target_time INTEGER NOT NULL,
current_zone_id TEXT NOT NULL,
predicted_zone_id TEXT NOT NULL,
actual_zone_id TEXT,
prediction_confidence REAL NOT NULL,
horizon_minutes INTEGER NOT NULL,
evaluated INTEGER NOT NULL DEFAULT 0,
correct INTEGER DEFAULT 0,
evaluated_at INTEGER
CREATE TABLE IF NOT EXISTS prediction_accuracy_schema_version (
version INTEGER PRIMARY KEY,
applied_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now') * 1000)
);
CREATE INDEX IF NOT EXISTS idx_predictions_person ON recorded_predictions(person_id);
CREATE INDEX IF NOT EXISTS idx_predictions_target ON recorded_predictions(target_time);
CREATE INDEX IF NOT EXISTS idx_predictions_evaluated ON recorded_predictions(evaluated);
CREATE INDEX IF NOT EXISTS idx_predictions_person_target ON recorded_predictions(person_id, target_time);
CREATE TABLE IF NOT EXISTS accuracy_stats (
person_id TEXT NOT NULL,
horizon_minutes INTEGER NOT NULL,
total_predictions INTEGER NOT NULL,
correct_predictions INTEGER NOT NULL,
accuracy REAL NOT NULL,
window_start INTEGER NOT NULL,
window_end INTEGER NOT NULL,
last_updated INTEGER NOT NULL,
PRIMARY KEY (person_id, horizon_minutes)
);
CREATE TABLE IF NOT EXISTS zone_occupancy_patterns (
zone_id TEXT NOT NULL,
hour_of_week INTEGER NOT NULL,
occupancy_prob REAL NOT NULL,
mean_dwell_minutes REAL NOT NULL,
stddev_dwell REAL NOT NULL,
sample_count INTEGER NOT NULL,
last_computed INTEGER NOT NULL,
PRIMARY KEY (zone_id, hour_of_week)
);
CREATE TABLE IF NOT EXISTS zone_occupancy_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
zone_id TEXT NOT NULL,
person_id TEXT,
enter_time INTEGER NOT NULL,
exit_time INTEGER,
duration_minutes REAL
);
CREATE INDEX IF NOT EXISTS idx_occupancy_zone ON zone_occupancy_history(zone_id);
CREATE INDEX IF NOT EXISTS idx_occupancy_enter ON zone_occupancy_history(enter_time);
`)
return err
if err != nil {
return err
}
// Check current version
var currentVersion int
err = t.db.QueryRow(`SELECT COALESCE(MAX(version), 0) FROM prediction_accuracy_schema_version`).Scan(&currentVersion)
if err != nil {
return err
}
// Version 1: initial accuracy tracking tables
if currentVersion < 1 {
_, err = t.db.Exec(`
CREATE TABLE IF NOT EXISTS recorded_predictions (
id TEXT PRIMARY KEY,
person_id TEXT NOT NULL,
predicted_at INTEGER NOT NULL,
target_time INTEGER NOT NULL,
current_zone_id TEXT NOT NULL,
predicted_zone_id TEXT NOT NULL,
actual_zone_id TEXT,
prediction_confidence REAL NOT NULL,
horizon_minutes INTEGER NOT NULL,
evaluated INTEGER NOT NULL DEFAULT 0,
correct INTEGER DEFAULT 0,
evaluated_at INTEGER
);
CREATE INDEX IF NOT EXISTS idx_predictions_person ON recorded_predictions(person_id);
CREATE INDEX IF NOT EXISTS idx_predictions_target ON recorded_predictions(target_time);
CREATE INDEX IF NOT EXISTS idx_predictions_evaluated ON recorded_predictions(evaluated);
CREATE INDEX IF NOT EXISTS idx_predictions_person_target ON recorded_predictions(person_id, target_time);
CREATE TABLE IF NOT EXISTS accuracy_stats (
person_id TEXT NOT NULL,
horizon_minutes INTEGER NOT NULL,
total_predictions INTEGER NOT NULL,
correct_predictions INTEGER NOT NULL,
accuracy REAL NOT NULL,
window_start INTEGER NOT NULL,
window_end INTEGER NOT NULL,
last_updated INTEGER NOT NULL,
PRIMARY KEY (person_id, horizon_minutes)
);
CREATE TABLE IF NOT EXISTS zone_occupancy_patterns (
zone_id TEXT NOT NULL,
hour_of_week INTEGER NOT NULL,
occupancy_prob REAL NOT NULL,
mean_dwell_minutes REAL NOT NULL,
stddev_dwell REAL NOT NULL,
sample_count INTEGER NOT NULL,
last_computed INTEGER NOT NULL,
PRIMARY KEY (zone_id, hour_of_week)
);
CREATE TABLE IF NOT EXISTS zone_occupancy_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
zone_id TEXT NOT NULL,
person_id TEXT,
enter_time INTEGER NOT NULL,
exit_time INTEGER,
duration_minutes REAL
);
CREATE INDEX IF NOT EXISTS idx_occupancy_zone ON zone_occupancy_history(zone_id);
CREATE INDEX IF NOT EXISTS idx_occupancy_enter ON zone_occupancy_history(enter_time);
INSERT INTO prediction_accuracy_schema_version (version) VALUES (1);
`)
if err != nil {
return err
}
}
return nil
}
func (t *AccuracyTracker) loadPendingPredictions() error {

View file

@ -15,13 +15,13 @@ import (
// ZoneTransition represents a recorded zone transition event.
type ZoneTransition struct {
ID string `json:"id"`
PersonID string `json:"person_id"`
FromZoneID string `json:"from_zone_id"`
ToZoneID string `json:"to_zone_id"`
HourOfWeek int `json:"hour_of_week"` // 0-167: day_of_week * 24 + hour_of_day
DwellDurationMinutes float64 `json:"dwell_duration_minutes"`
Timestamp time.Time `json:"timestamp"`
ID string `json:"id"`
PersonID string `json:"person_id"`
FromZoneID string `json:"from_zone_id"`
ToZoneID string `json:"to_zone_id"`
HourOfWeek int `json:"hour_of_week"` // 0-167: day_of_week * 24 + hour_of_day
DwellDurationMinutes float64 `json:"dwell_duration_minutes"`
Timestamp time.Time `json:"timestamp"`
}
// TransitionProbability represents the probability of transitioning from one zone to another.
@ -100,52 +100,78 @@ func NewModelStore(dbPath string) (*ModelStore, error) {
}
func (s *ModelStore) migrate() error {
// Create prediction schema version tracking table
_, err := s.db.Exec(`
CREATE TABLE IF NOT EXISTS zone_transitions_history (
id TEXT PRIMARY KEY,
person_id TEXT NOT NULL,
from_zone_id TEXT NOT NULL,
to_zone_id TEXT NOT NULL,
hour_of_week INTEGER NOT NULL,
dwell_duration_minutes REAL NOT NULL DEFAULT 0,
timestamp INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_transitions_person_time ON zone_transitions_history(person_id, hour_of_week);
CREATE INDEX IF NOT EXISTS idx_transitions_from ON zone_transitions_history(from_zone_id);
CREATE INDEX IF NOT EXISTS idx_transitions_timestamp ON zone_transitions_history(timestamp);
CREATE TABLE IF NOT EXISTS transition_probabilities (
person_id TEXT NOT NULL,
hour_of_week INTEGER NOT NULL,
from_zone_id TEXT NOT NULL,
to_zone_id TEXT NOT NULL,
probability REAL NOT NULL,
count INTEGER NOT NULL,
last_computed INTEGER NOT NULL,
PRIMARY KEY (person_id, hour_of_week, from_zone_id, to_zone_id)
);
CREATE TABLE IF NOT EXISTS dwell_times (
person_id TEXT NOT NULL,
zone_id TEXT NOT NULL,
hour_of_week INTEGER NOT NULL,
mean_minutes REAL NOT NULL,
stddev_minutes REAL NOT NULL,
count INTEGER NOT NULL,
last_computed INTEGER NOT NULL,
PRIMARY KEY (person_id, zone_id, hour_of_week)
);
CREATE TABLE IF NOT EXISTS person_zone_entry (
person_id TEXT NOT NULL,
zone_id TEXT NOT NULL,
entry_time INTEGER NOT NULL,
blob_id INTEGER NOT NULL,
PRIMARY KEY (person_id, zone_id)
CREATE TABLE IF NOT EXISTS prediction_schema_version (
version INTEGER PRIMARY KEY,
applied_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now') * 1000)
);
`)
return err
if err != nil {
return err
}
// Check current version
var currentVersion int
err = s.db.QueryRow(`SELECT COALESCE(MAX(version), 0) FROM prediction_schema_version`).Scan(&currentVersion)
if err != nil {
return err
}
// Version 1: initial prediction tables
if currentVersion < 1 {
_, err = s.db.Exec(`
CREATE TABLE IF NOT EXISTS zone_transitions_history (
id TEXT PRIMARY KEY,
person_id TEXT NOT NULL,
from_zone_id TEXT NOT NULL,
to_zone_id TEXT NOT NULL,
hour_of_week INTEGER NOT NULL,
dwell_duration_minutes REAL NOT NULL DEFAULT 0,
timestamp INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_transitions_person_time ON zone_transitions_history(person_id, hour_of_week);
CREATE INDEX IF NOT EXISTS idx_transitions_from ON zone_transitions_history(from_zone_id);
CREATE INDEX IF NOT EXISTS idx_transitions_timestamp ON zone_transitions_history(timestamp);
CREATE TABLE IF NOT EXISTS transition_probabilities (
person_id TEXT NOT NULL,
hour_of_week INTEGER NOT NULL,
from_zone_id TEXT NOT NULL,
to_zone_id TEXT NOT NULL,
probability REAL NOT NULL,
count INTEGER NOT NULL,
last_computed INTEGER NOT NULL,
PRIMARY KEY (person_id, hour_of_week, from_zone_id, to_zone_id)
);
CREATE TABLE IF NOT EXISTS dwell_times (
person_id TEXT NOT NULL,
zone_id TEXT NOT NULL,
hour_of_week INTEGER NOT NULL,
mean_minutes REAL NOT NULL,
stddev_minutes REAL NOT NULL,
count INTEGER NOT NULL,
last_computed INTEGER NOT NULL,
PRIMARY KEY (person_id, zone_id, hour_of_week)
);
CREATE TABLE IF NOT EXISTS person_zone_entry (
person_id TEXT NOT NULL,
zone_id TEXT NOT NULL,
entry_time INTEGER NOT NULL,
blob_id INTEGER NOT NULL,
PRIMARY KEY (person_id, zone_id)
);
INSERT INTO prediction_schema_version (version) VALUES (1);
`)
if err != nil {
return err
}
}
return nil
}
func (s *ModelStore) loadFirstTransitionTime() {