diff --git a/mothership/internal/api/volume_triggers.go b/mothership/internal/api/volume_triggers.go index 18a8b72..18002c8 100644 --- a/mothership/internal/api/volume_triggers.go +++ b/mothership/internal/api/volume_triggers.go @@ -38,11 +38,11 @@ type WSBroadcaster interface { // VolumeTriggersHandler manages automation trigger volumes with 3D geometry. type VolumeTriggersHandler struct { - mu sync.RWMutex - store *volume.Store - httpClient *http.Client - mqttClient VolumeMQTTClient - notifyClient NotificationClient + mu sync.RWMutex + store *volume.Store + httpClient *http.Client + mqttClient VolumeMQTTClient + notifyClient NotificationClient wsBroadcaster WSBroadcaster } @@ -64,39 +64,39 @@ type VolumeTriggersHandler struct { // - created_at: creation timestamp // - updated_at: last modification timestamp type TriggerResponse struct { - ID string `json:"id"` - Name string `json:"name"` - Shape volume.ShapeJSON `json:"shape"` - Condition string `json:"condition"` + ID string `json:"id"` + Name string `json:"name"` + Shape volume.ShapeJSON `json:"shape"` + Condition string `json:"condition"` ConditionParams volume.ConditionParams `json:"condition_params"` - TimeConstraint *volume.TimeConstraint `json:"time_constraint,omitempty"` - Actions []volume.Action `json:"actions"` - Enabled bool `json:"enabled"` - ErrorMessage string `json:"error_message,omitempty"` - ErrorCount int `json:"error_count"` - LastFired *time.Time `json:"last_fired,omitempty"` - Elapsed int `json:"elapsed,omitempty"` // seconds since last fire - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` + TimeConstraint *volume.TimeConstraint `json:"time_constraint,omitempty"` + Actions []volume.Action `json:"actions"` + Enabled bool `json:"enabled"` + ErrorMessage string `json:"error_message,omitempty"` + ErrorCount int `json:"error_count"` + LastFired *time.Time `json:"last_fired,omitempty"` + Elapsed int `json:"elapsed,omitempty"` // seconds since last fire + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` } // WebhookTestResult is returned by POST /api/triggers/{id}/test. // // Contains the overall test status and per-action execution results. type WebhookTestResult struct { - Status string `json:"status"` - ResponseMs int64 `json:"response_ms"` - Error string `json:"error,omitempty"` - Actions []ActionResult `json:"actions"` + Status string `json:"status"` + ResponseMs int64 `json:"response_ms"` + Error string `json:"error,omitempty"` + Actions []ActionResult `json:"actions"` } // ActionResult represents the outcome of executing a single action during a test fire. type ActionResult struct { - Type string `json:"type"` - URL string `json:"url,omitempty"` - Status int `json:"status,omitempty"` + Type string `json:"type"` + URL string `json:"url,omitempty"` + Status int `json:"status,omitempty"` ResponseMs int64 `json:"response_ms,omitempty"` - Error string `json:"error,omitempty"` + Error string `json:"error,omitempty"` } // NewVolumeTriggersHandler creates a new triggers handler with volume support. @@ -140,6 +140,13 @@ func (h *VolumeTriggersHandler) SetWSBroadcaster(broadcaster WSBroadcaster) { h.wsBroadcaster = broadcaster } +// SetPredictionProvider sets the prediction provider for predicted_enter triggers. +func (h *VolumeTriggersHandler) SetPredictionProvider(pp volume.PredictionProvider) { + h.mu.Lock() + defer h.mu.Unlock() + h.store.SetPredictionProvider(pp) +} + // Close closes the underlying store. func (h *VolumeTriggersHandler) Close() error { return h.store.Close() @@ -332,13 +339,13 @@ func (h *VolumeTriggersHandler) getTrigger(w http.ResponseWriter, r *http.Reques // volumeCreateTriggerRequest is the request body for POST /api/triggers. type volumeCreateTriggerRequest struct { - Name string `json:"name"` - Shape volume.ShapeJSON `json:"shape"` - Condition string `json:"condition"` + Name string `json:"name"` + Shape volume.ShapeJSON `json:"shape"` + Condition string `json:"condition"` ConditionParams volume.ConditionParams `json:"condition_params,omitempty"` - TimeConstraint *volume.TimeConstraint `json:"time_constraint,omitempty"` - Actions []volume.Action `json:"actions"` - Enabled *bool `json:"enabled,omitempty"` + TimeConstraint *volume.TimeConstraint `json:"time_constraint,omitempty"` + Actions []volume.Action `json:"actions"` + Enabled *bool `json:"enabled,omitempty"` } // createTrigger handles POST /api/triggers. @@ -386,10 +393,11 @@ func (h *VolumeTriggersHandler) createTrigger(w http.ResponseWriter, r *http.Req "leave": true, "dwell": true, "vacant": true, - "count": true, - } + "count": true, + "predicted_enter": true, + } if !validConditions[req.Condition] { - http.Error(w, "condition must be one of: enter, leave, dwell, vacant, count", http.StatusBadRequest) + http.Error(w, "condition must be one of: enter, leave, dwell, vacant, count, predicted_enter", http.StatusBadRequest) return } @@ -430,13 +438,13 @@ func (h *VolumeTriggersHandler) createTrigger(w http.ResponseWriter, r *http.Req // volumeUpdateTriggerRequest is the request body for PUT /api/triggers/{id}. // Only non-nil fields are updated. type volumeUpdateTriggerRequest struct { - Name *string `json:"name,omitempty"` - Shape *volume.ShapeJSON `json:"shape,omitempty"` - Condition *string `json:"condition,omitempty"` + Name *string `json:"name,omitempty"` + Shape *volume.ShapeJSON `json:"shape,omitempty"` + Condition *string `json:"condition,omitempty"` ConditionParams *volume.ConditionParams `json:"condition_params,omitempty"` - TimeConstraint *volume.TimeConstraint `json:"time_constraint,omitempty"` - Actions *[]volume.Action `json:"actions,omitempty"` - Enabled *bool `json:"enabled,omitempty"` + TimeConstraint *volume.TimeConstraint `json:"time_constraint,omitempty"` + Actions *[]volume.Action `json:"actions,omitempty"` + Enabled *bool `json:"enabled,omitempty"` } // updateTrigger handles PUT /api/triggers/{id}. @@ -562,10 +570,10 @@ func (h *VolumeTriggersHandler) testTrigger(w http.ResponseWriter, r *http.Reque "condition": trigger.Condition, "blob_id": 0, "person": nil, - "position": map[string]float64{"x": 0, "y": 0, "z": 0}, + "position": map[string]float64{"x": 0, "y": 0, "z": 0}, "zone": nil, "dwell_s": 0, - "timestamp_ms": now.UnixMilli(), + "timestamp_ms": now.UnixMilli(), } data, err := json.Marshal(payload) @@ -614,9 +622,9 @@ func (h *VolumeTriggersHandler) testTrigger(w http.ResponseWriter, r *http.Reque totalMs := time.Since(totalStart).Milliseconds() resp := WebhookTestResult{ - Status: "ok", + Status: "ok", ResponseMs: totalMs, - Actions: results, + Actions: results, } writeJSON(w, http.StatusOK, resp) @@ -747,19 +755,19 @@ func (h *VolumeTriggersHandler) isValidShape(shape *volume.ShapeJSON) bool { // toResponse converts a trigger to the API response format. func (h *VolumeTriggersHandler) toResponse(t *volume.Trigger, now time.Time) *TriggerResponse { resp := &TriggerResponse{ - ID: t.ID, - Name: t.Name, - Shape: t.Shape, - Condition: t.Condition, + ID: t.ID, + Name: t.Name, + Shape: t.Shape, + Condition: t.Condition, ConditionParams: t.ConditionParams, - TimeConstraint: t.TimeConstraint, - Actions: t.Actions, - Enabled: t.Enabled, - ErrorMessage: t.ErrorMessage, - ErrorCount: t.ErrorCount, - LastFired: t.LastFired, - CreatedAt: t.CreatedAt, - UpdatedAt: t.UpdatedAt, + TimeConstraint: t.TimeConstraint, + Actions: t.Actions, + Enabled: t.Enabled, + ErrorMessage: t.ErrorMessage, + ErrorCount: t.ErrorCount, + LastFired: t.LastFired, + CreatedAt: t.CreatedAt, + UpdatedAt: t.UpdatedAt, } if t.LastFired != nil { @@ -864,14 +872,14 @@ func (h *VolumeTriggersHandler) executeWebhook(action volume.Action, event volum t := h.store.GetTrigger(event.TriggerID) payload := map[string]interface{}{ "trigger_id": event.TriggerID, - "trigger_name": t.Name, + "trigger_name": t.Name, "condition": t.Condition, "blob_id": 0, "person": nil, "position": map[string]float64{"x": 0, "y": 0, "z": 0}, "zone": nil, "dwell_s": 0, - "timestamp_ms": event.Timestamp.UnixMilli(), + "timestamp_ms": event.Timestamp.UnixMilli(), } data, err := json.Marshal(payload) @@ -950,7 +958,7 @@ func (h *VolumeTriggersHandler) executeMQTT(action volume.Action, event volume.F t := h.store.GetTrigger(event.TriggerID) payload := map[string]interface{}{ "trigger_id": event.TriggerID, - "trigger_name": t.Name, + "trigger_name": t.Name, "condition": t.Condition, "fired_at": event.Timestamp.Format(time.RFC3339), "blob_ids": event.BlobIDs, @@ -982,10 +990,10 @@ func (h *VolumeTriggersHandler) executeNotification(action volume.Action, event body := fmt.Sprintf("%s triggered (%s)", event.TriggerName, event.Condition) data := map[string]interface{}{ - "trigger_id": event.TriggerID, + "trigger_id": event.TriggerID, "trigger_name": event.TriggerName, - "condition": event.Condition, - "timestamp": event.Timestamp.Unix(), + "condition": event.Condition, + "timestamp": event.Timestamp.Unix(), } if err := client.SendViaChannel(action.Type, title, body, data); err != nil { diff --git a/mothership/internal/automation/engine.go b/mothership/internal/automation/engine.go index aa0c802..db76136 100644 --- a/mothership/internal/automation/engine.go +++ b/mothership/internal/automation/engine.go @@ -22,17 +22,17 @@ import ( type TriggerType string const ( - TriggerZoneEnter TriggerType = "zone_enter" - TriggerZoneLeave TriggerType = "zone_leave" - TriggerZoneDwell TriggerType = "zone_dwell" - TriggerZoneVacant TriggerType = "zone_vacant" - TriggerPersonCountChange TriggerType = "person_count_change" - TriggerFallDetected TriggerType = "fall_detected" - TriggerAnomaly TriggerType = "anomaly" - TriggerBLEDevicePresent TriggerType = "ble_device_present" - TriggerBLEDeviceAbsent TriggerType = "ble_device_absent" - TriggerVolumeEnter TriggerType = "volume_enter" - TriggerVolumeLeave TriggerType = "volume_leave" + TriggerZoneEnter TriggerType = "zone_enter" + TriggerZoneLeave TriggerType = "zone_leave" + TriggerZoneDwell TriggerType = "zone_dwell" + TriggerZoneVacant TriggerType = "zone_vacant" + TriggerPersonCountChange TriggerType = "person_count_change" + TriggerFallDetected TriggerType = "fall_detected" + TriggerAnomaly TriggerType = "anomaly" + TriggerBLEDevicePresent TriggerType = "ble_device_present" + TriggerBLEDeviceAbsent TriggerType = "ble_device_absent" + TriggerVolumeEnter TriggerType = "volume_enter" + TriggerVolumeLeave TriggerType = "volume_leave" TriggerPredictedZoneEnter TriggerType = "predicted_zone_enter" // N minutes before predicted zone entry ) @@ -47,10 +47,10 @@ const ( // TriggerVolume represents a 3D region that can trigger automations. type TriggerVolume struct { - ID string `json:"id"` - Name string `json:"name"` - Type string `json:"type"` // box, sphere, cylinder - Enabled bool `json:"enabled"` + ID string `json:"id"` + Name string `json:"name"` + Type string `json:"type"` // box, sphere, cylinder + Enabled bool `json:"enabled"` AutomationID string `json:"automation_id,omitempty"` // Box type MinX float64 `json:"min_x,omitempty"` @@ -63,13 +63,13 @@ type TriggerVolume struct { CenterX float64 `json:"center_x,omitempty"` CenterY float64 `json:"center_y,omitempty"` CenterZ float64 `json:"center_z,omitempty"` - Radius float64 `json:"radius,omitempty"` + Radius float64 `json:"radius,omitempty"` // Cylinder type - BaseX float64 `json:"base_x,omitempty"` - BaseZ float64 `json:"base_z,omitempty"` - BaseRadius float64 `json:"base_radius,omitempty"` - MinHeight float64 `json:"min_height,omitempty"` - MaxHeight float64 `json:"max_height,omitempty"` + BaseX float64 `json:"base_x,omitempty"` + BaseZ float64 `json:"base_z,omitempty"` + BaseRadius float64 `json:"base_radius,omitempty"` + MinHeight float64 `json:"min_height,omitempty"` + MaxHeight float64 `json:"max_height,omitempty"` } // TriggerConfig holds configuration for the trigger. @@ -108,39 +108,39 @@ type Condition struct { type ActionType string const ( - ActionWebhook ActionType = "webhook" - ActionMQTT ActionType = "mqtt_publish" - ActionNtfy ActionType = "ntfy" - ActionPushover ActionType = "pushover" + ActionWebhook ActionType = "webhook" + ActionMQTT ActionType = "mqtt_publish" + ActionNtfy ActionType = "ntfy" + ActionPushover ActionType = "pushover" ) // Action represents an action to execute when triggered. type Action struct { - Type ActionType `json:"type"` - URL string `json:"url,omitempty"` // for webhook - Topic string `json:"topic,omitempty"` // for mqtt - Server string `json:"server,omitempty"` // for ntfy - Token string `json:"token,omitempty"` // for pushover - UserKey string `json:"user_key,omitempty"` // for pushover - Template string `json:"template,omitempty"` // payload template - Headers map[string]string `json:"headers,omitempty"` - Parameters map[string]interface{} `json:"parameters,omitempty"` + Type ActionType `json:"type"` + URL string `json:"url,omitempty"` // for webhook + Topic string `json:"topic,omitempty"` // for mqtt + Server string `json:"server,omitempty"` // for ntfy + Token string `json:"token,omitempty"` // for pushover + UserKey string `json:"user_key,omitempty"` // for pushover + Template string `json:"template,omitempty"` // payload template + Headers map[string]string `json:"headers,omitempty"` + Parameters map[string]interface{} `json:"parameters,omitempty"` } // Automation represents an automation rule. type Automation struct { - ID string `json:"id"` - Name string `json:"name"` - Enabled bool `json:"enabled"` - TriggerType TriggerType `json:"trigger_type"` - TriggerConfig TriggerConfig `json:"trigger_config"` - Conditions []Condition `json:"conditions,omitempty"` - Actions []Action `json:"actions"` - Cooldown int `json:"cooldown"` // seconds between triggers - LastFired time.Time `json:"last_fired"` - FireCount int `json:"fire_count"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` + ID string `json:"id"` + Name string `json:"name"` + Enabled bool `json:"enabled"` + TriggerType TriggerType `json:"trigger_type"` + TriggerConfig TriggerConfig `json:"trigger_config"` + Conditions []Condition `json:"conditions,omitempty"` + Actions []Action `json:"actions"` + Cooldown int `json:"cooldown"` // seconds between triggers + LastFired time.Time `json:"last_fired"` + FireCount int `json:"fire_count"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` } // ActionResult represents the result of an action execution. @@ -174,20 +174,20 @@ type TriggerEventData struct { // Event represents an internal event that can trigger automations. type Event struct { - Type TriggerType - Timestamp time.Time - PersonID string - PersonName string - PersonColor string - ZoneID string - ZoneName string - FromZone string - ToZone string - DeviceMAC string - DeviceName string + Type TriggerType + Timestamp time.Time + PersonID string + PersonName string + PersonColor string + ZoneID string + ZoneName string + FromZone string + ToZone string + DeviceMAC string + DeviceName string OccupantCount int - Confidence float64 - Extra map[string]interface{} + Confidence float64 + Extra map[string]interface{} } // ZoneInfoProvider provides zone information. @@ -237,12 +237,12 @@ type PredictionInfo struct { // Engine manages automation rules and triggers. type Engine struct { - mu sync.RWMutex - db *sql.DB + mu sync.RWMutex + db *sql.DB - automations map[string]*Automation - volumes map[string]*TriggerVolume - cooldowns map[string]time.Time // automationID -> last trigger time + automations map[string]*Automation + volumes map[string]*TriggerVolume + cooldowns map[string]time.Time // automationID -> last trigger time // Zone dwell tracking zoneEnterTime map[string]map[int]time.Time // zoneID -> blobID -> enter time @@ -254,15 +254,15 @@ type Engine struct { systemMode SystemMode // Providers - zoneProvider ZoneInfoProvider - personProvider PersonInfoProvider - deviceProvider DeviceInfoProvider + zoneProvider ZoneInfoProvider + personProvider PersonInfoProvider + deviceProvider DeviceInfoProvider predictionProvider PredictionProvider // Clients - httpClient *http.Client - mqttClient MQTTClient - notifySender NotificationSender + httpClient *http.Client + mqttClient MQTTClient + notifySender NotificationSender // Callbacks onTrigger func(TriggerEventData) @@ -281,14 +281,14 @@ func NewEngine(dbPath string) (*Engine, error) { db.SetMaxOpenConns(1) e := &Engine{ - db: db, - automations: make(map[string]*Automation), - volumes: make(map[string]*TriggerVolume), - cooldowns: make(map[string]time.Time), - zoneEnterTime: make(map[string]map[int]time.Time), - deviceLastSeen: make(map[string]time.Time), - systemMode: ModeHome, - httpClient: &http.Client{Timeout: 10 * time.Second}, + db: db, + automations: make(map[string]*Automation), + volumes: make(map[string]*TriggerVolume), + cooldowns: make(map[string]time.Time), + zoneEnterTime: make(map[string]map[int]time.Time), + deviceLastSeen: make(map[string]time.Time), + systemMode: ModeHome, + httpClient: &http.Client{Timeout: 10 * time.Second}, } if err := e.migrate(); err != nil { @@ -498,6 +498,13 @@ func (e *Engine) SetNotificationSender(sender NotificationSender) { e.mu.Unlock() } +// SetPredictionProvider sets the prediction provider for predicted_zone_enter triggers. +func (e *Engine) SetPredictionProvider(pp PredictionProvider) { + e.mu.Lock() + e.predictionProvider = pp + e.mu.Unlock() +} + // SetOnTrigger sets callback for trigger events. func (e *Engine) SetOnTrigger(cb func(TriggerEventData)) { e.mu.Lock() @@ -839,7 +846,7 @@ func (e *Engine) isDayOfWeek(value string, now time.Time) bool { if len(value) <= 7 { bitmask := parseInt(value) if bitmask > 0 { - return (bitmask&(1<= cfg.AbsentMinutes { event := Event{ - Type: TriggerBLEDeviceAbsent, - Timestamp: now, - DeviceMAC: targetMAC, + Type: TriggerBLEDeviceAbsent, + Timestamp: now, + DeviceMAC: targetMAC, } if e.deviceProvider != nil { event.DeviceName, _ = e.deviceProvider.GetDevice(targetMAC) @@ -1445,10 +1452,10 @@ func (e *Engine) checkVolumeTriggers(blob TrackedBlob, currentZone string, now t // GetRecentActionLog returns recent action log entries. func (e *Engine) GetRecentActionLog(limit int) []struct { - AutomationID string - FiredAt time.Time - EventJSON string - ResultsJSON string + AutomationID string + FiredAt time.Time + EventJSON string + ResultsJSON string } { rows, err := e.db.Query(` SELECT automation_id, fired_at, event_json, actions_results_json @@ -1463,18 +1470,18 @@ func (e *Engine) GetRecentActionLog(limit int) []struct { defer rows.Close() //nolint:errcheck var results []struct { - AutomationID string - FiredAt time.Time - EventJSON string - ResultsJSON string + AutomationID string + FiredAt time.Time + EventJSON string + ResultsJSON string } for rows.Next() { var entry struct { - AutomationID string - FiredAt time.Time - EventJSON string - ResultsJSON string + AutomationID string + FiredAt time.Time + EventJSON string + ResultsJSON string } var firedAtNS int64 if err := rows.Scan(&entry.AutomationID, &firedAtNS, &entry.EventJSON, &entry.ResultsJSON); err != nil { diff --git a/mothership/internal/db/migrations.go b/mothership/internal/db/migrations.go index 98717d7..875c60e 100644 --- a/mothership/internal/db/migrations.go +++ b/mothership/internal/db/migrations.go @@ -88,6 +88,11 @@ func AllMigrations() []Migration { Description: "add crossing_events table for portal crossing log", Up: migration_016_add_crossing_events_table, }, + { + Version: 17, + Description: "add predicted_enter to triggers table CHECK constraint", + Up: migration_017_add_predicted_enter_trigger, + }, } } @@ -871,3 +876,76 @@ func migration_016_add_crossing_events_table(tx *sql.Tx) error { _, err := tx.Exec(schema) return err } + +// migration_017_add_predicted_enter_trigger expands the triggers table +// CHECK constraint to include 'predicted_enter' for pre-emptive automation. +// SQLite doesn't support ALTER CONSTRAINT, so we recreate the table. +func migration_017_add_predicted_enter_trigger(tx *sql.Tx) error { + // Check if triggers table exists + var triggersExists bool + if err := tx.QueryRow( + `SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='triggers'`, + ).Scan(&triggersExists); err != nil { + return err + } + if !triggersExists { + return nil // triggers table will be created by a later migration + } + + // In SQLite, we need to recreate the table with the new CHECK constraint + // Step 1: Create a new triggers table with the updated constraint + _, err := tx.Exec(` + CREATE TABLE triggers_new ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + shape_json TEXT NOT NULL, + condition TEXT NOT NULL CHECK (condition IN ('enter','leave','dwell','vacant','count','predicted_enter')), + condition_params_json TEXT, + time_constraint_json TEXT, + actions_json TEXT NOT NULL, + enabled INTEGER NOT NULL DEFAULT 1, + last_fired INTEGER, + error_message TEXT DEFAULT '', + error_count INTEGER NOT NULL DEFAULT 0, + created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now') * 1000), + updated_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now') * 1000) + ); + `) + if err != nil { + return err + } + + // Step 2: Copy data from old table to new table + _, err = tx.Exec(` + INSERT INTO triggers_new (id, name, shape_json, condition, condition_params_json, + time_constraint_json, actions_json, enabled, last_fired, created_at, updated_at, + error_message, error_count) + SELECT id, name, shape_json, condition, condition_params_json, + time_constraint_json, actions_json, enabled, last_fired, created_at, updated_at, + COALESCE(error_message, ''), COALESCE(error_count, 0) + FROM triggers; + `) + if err != nil { + return err + } + + // Step 3: Drop the old table + _, err = tx.Exec(`DROP TABLE triggers;`) + if err != nil { + return err + } + + // Step 4: Rename the new table to the original name + _, err = tx.Exec(`ALTER TABLE triggers_new RENAME TO triggers;`) + if err != nil { + return err + } + + // Step 5: Recreate indexes (if any existed) + _, err = tx.Exec(`CREATE INDEX IF NOT EXISTS idx_triggers_enabled ON triggers(enabled);`) + if err != nil { + return err + } + + return nil +} diff --git a/mothership/internal/volume/shape.go b/mothership/internal/volume/shape.go index d934ab9..156aef5 100644 --- a/mothership/internal/volume/shape.go +++ b/mothership/internal/volume/shape.go @@ -9,6 +9,7 @@ import ( "log" "os" "path/filepath" + "strings" "sync" "time" @@ -100,26 +101,26 @@ func (s *ShapeJSON) isInsideCylinder(p Point3D) bool { // Trigger represents a spatial automation trigger from the triggers table. type Trigger struct { - ID string `json:"id"` - Name string `json:"name"` - Shape ShapeJSON `json:"shape"` - Condition string `json:"condition"` // enter, leave, dwell, vacant, count - ConditionParams ConditionParams `json:"condition_params"` - TimeConstraint *TimeConstraint `json:"time_constraint,omitempty"` - Actions []Action `json:"actions"` - Enabled bool `json:"enabled"` - ErrorMessage string `json:"error_message,omitempty"` // Set when disabled by 4xx - ErrorCount int `json:"error_count"` // Incremented on 5xx/timeout, reset on 2xx - LastFired *time.Time `json:"last_fired,omitempty"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` + ID string `json:"id"` + Name string `json:"name"` + Shape ShapeJSON `json:"shape"` + Condition string `json:"condition"` // enter, leave, dwell, vacant, count + ConditionParams ConditionParams `json:"condition_params"` + TimeConstraint *TimeConstraint `json:"time_constraint,omitempty"` + Actions []Action `json:"actions"` + Enabled bool `json:"enabled"` + ErrorMessage string `json:"error_message,omitempty"` // Set when disabled by 4xx + ErrorCount int `json:"error_count"` // Incremented on 5xx/timeout, reset on 2xx + LastFired *time.Time `json:"last_fired,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` } // ConditionParams holds trigger condition parameters. type ConditionParams struct { - DurationS *int `json:"duration_s,omitempty"` // For dwell: seconds inside volume - CountThreshold *int `json:"count_threshold,omitempty"` // For count: minimum blob count - PersonID string `json:"person_id,omitempty"` // Filter by person ID + DurationS *int `json:"duration_s,omitempty"` // For dwell: seconds inside volume + CountThreshold *int `json:"count_threshold,omitempty"` // For count: minimum blob count + PersonID string `json:"person_id,omitempty"` // Filter by person ID } // TimeConstraint represents a time window constraint. @@ -130,8 +131,8 @@ type TimeConstraint struct { // Action represents an action to execute when a trigger fires. type Action struct { - Type string `json:"type"` // webhook, mqtt, internal - Params map[string]interface{} `json:"params"` + Type string `json:"type"` // webhook, mqtt, internal + Params map[string]interface{} `json:"params"` } // BlobState represents the state of a tracked blob relative to a trigger. @@ -162,14 +163,33 @@ type FiredEvent struct { // FiringCallback is called when a trigger fires. type FiringCallback func(event FiredEvent) +// PredictionInfo represents a prediction for use by predicted_enter triggers. +type PredictionInfo struct { + PersonID string + PersonName string + CurrentZoneID string + CurrentZoneName string + PredictedNextZoneID string + PredictedNextZoneName string + PredictionConfidence float64 + EstimatedTransitionMinutes float64 + DataConfidence string +} + +// PredictionProvider provides prediction information for predicted_enter triggers. +type PredictionProvider interface { + GetPredictions() []PredictionInfo +} + // Store provides trigger storage and state management. type Store struct { - mu sync.RWMutex - db *sql.DB - triggers map[string]*Trigger - triggerState map[string]*TriggerState // trigger_id -> state - blobVolumes map[int]string // blob_id -> current volume_id (for tracking) - onFired FiringCallback // Called when a trigger fires + mu sync.RWMutex + db *sql.DB + triggers map[string]*Trigger + triggerState map[string]*TriggerState // trigger_id -> state + blobVolumes map[int]string // blob_id -> current volume_id (for tracking) + onFired FiringCallback // Called when a trigger fires + predictionProvider PredictionProvider // Provides prediction data for predicted_enter triggers } // NewStore creates a new trigger volume store. @@ -212,7 +232,7 @@ func (s *Store) init() error { id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, shape_json TEXT NOT NULL, - condition TEXT NOT NULL CHECK (condition IN ('enter','leave','dwell','vacant','count')), + condition TEXT NOT NULL CHECK (condition IN ('enter','leave','dwell','vacant','count','predicted_enter')), condition_params_json TEXT, time_constraint_json TEXT, actions_json TEXT NOT NULL, @@ -518,6 +538,13 @@ func (s *Store) SetOnFired(cb FiringCallback) { s.onFired = cb } +// SetPredictionProvider sets the prediction provider for predicted_enter triggers. +func (s *Store) SetPredictionProvider(pp PredictionProvider) { + s.mu.Lock() + defer s.mu.Unlock() + s.predictionProvider = pp +} + // Evaluate evaluates all enabled triggers against the current blob positions. // Returns a list of trigger IDs that should fire. func (s *Store) Evaluate(blobs []BlobPos, now time.Time) []string { @@ -566,6 +593,8 @@ func (s *Store) Evaluate(blobs []BlobPos, now time.Time) []string { shouldFire = s.evaluateVacant(t, state, blobs, now) case "count": shouldFire = s.evaluateCount(t, state, blobs, now) + case "predicted_enter": + shouldFire = s.evaluatePredictedEnter(t, state, now) } if shouldFire { @@ -797,6 +826,109 @@ func (s *Store) evaluateCount(t *Trigger, state *TriggerState, blobs []BlobPos, return crossedThreshold } +// evaluatePredictedEnter triggers when a prediction indicates a person is likely +// to enter a zone within the configured time window. Uses rising-edge detection: +// fires when P(predicted zone entry) > threshold AND previous computation had P < threshold. +// Suppresses re-firing for the same (person, zone, time_slot) within 60 minutes. +func (s *Store) evaluatePredictedEnter(t *Trigger, state *TriggerState, now time.Time) bool { + if s.predictionProvider == nil { + return false // No prediction provider configured + } + + // Get minutes ahead threshold from condition params (default 30 minutes) + minutesAhead := 30 + if t.ConditionParams.DurationS != nil { + minutesAhead = *t.ConditionParams.DurationS + } + + predictions := s.predictionProvider.GetPredictions() + var fired bool + + // Use a special blob ID slot for tracking predictions (negative IDs are in-memory only) + // We use -1000 - (hash of person+zone) to track per-person-per-zone state + for _, pred := range predictions { + // Check person filter + if t.ConditionParams.PersonID != "" && t.ConditionParams.PersonID != "anyone" { + if t.ConditionParams.PersonID != pred.PersonID { + continue + } + } + + // Check if predicted zone matches our volume + // For predicted_enter, we check if the predicted zone is within the volume + // TODO: Add zone-to-volume mapping; for now, we trigger if the predicted zone name + // is in the trigger name or if the trigger name contains the predicted zone name + if !s.zoneMatchesVolume(pred.PredictedNextZoneName, t) { + continue + } + + // Check if prediction confidence is high enough (default 0.6) + if pred.PredictionConfidence < 0.6 { + continue + } + + // Check if estimated transition time is within the threshold + if pred.EstimatedTransitionMinutes > float64(minutesAhead) { + continue + } + + // Create a unique state key for this person-zone combination + stateKey := -1000 - s.hashPersonZone(pred.PersonID, pred.PredictedNextZoneID) + blobState := state.Blobs[stateKey] + if blobState == nil { + blobState = &BlobState{ + BlobID: stateKey, + Inside: false, + LastCheckTime: now, + } + state.Blobs[stateKey] = blobState + } + + // Rising-edge detection: fire when we go from below threshold to at/above threshold + if !blobState.Inside && pred.PredictionConfidence >= 0.6 { + // Check cooldown (60 minutes) + if !blobState.EnterTime.IsZero() && now.Sub(blobState.EnterTime) < 60*time.Minute { + continue + } + + // Fire the trigger + blobState.Inside = true + blobState.EnterTime = now + blobState.LastCheckTime = now + fired = true + } else if pred.PredictionConfidence < 0.6 { + // Below threshold - reset for next rising edge + blobState.Inside = false + } + + blobState.LastCheckTime = now + } + + return fired +} + +// zoneMatchesVolume checks if a zone name matches a trigger volume. +// TODO: Implement proper zone-to-volume geometry mapping. +func (s *Store) zoneMatchesVolume(zoneName string, t *Trigger) bool { + // For now, use simple string matching + if zoneName == "" { + return false + } + // Check if trigger name contains zone name or vice versa + return strings.Contains(strings.ToLower(t.Name), strings.ToLower(zoneName)) || + strings.Contains(strings.ToLower(zoneName), strings.ToLower(t.Name)) +} + +// hashPersonZone creates a simple hash of person+zone for state tracking. +func (s *Store) hashPersonZone(personID, zoneID string) int { + // Simple hash: sum of character codes + h := 0 + for _, c := range personID + zoneID { + h += int(c) + } + return h % 10000 // Keep it small and positive +} + // isTimeInRange checks if the current time is within the constraint window. func (s *Store) isTimeInRange(tc *TimeConstraint, now time.Time) bool { if tc == nil { @@ -935,13 +1067,13 @@ type FiringRecord struct { // WebhookLogEntry represents an entry in the webhook audit log. type WebhookLogEntry struct { - ID int64 `json:"id"` - TriggerID string `json:"trigger_id"` - URL string `json:"url"` - FiredAtMs int64 `json:"fired_at_ms"` - Status int `json:"status_code,omitempty"` - LatencyMs int64 `json:"latency_ms"` - Error string `json:"error,omitempty"` + ID int64 `json:"id"` + TriggerID string `json:"trigger_id"` + URL string `json:"url"` + FiredAtMs int64 `json:"fired_at_ms"` + Status int `json:"status_code,omitempty"` + LatencyMs int64 `json:"latency_ms"` + Error string `json:"error,omitempty"` } // BlobPos represents a blob's position for trigger evaluation.