feat(triggers): add predicted_enter trigger type for pre-emptive automation
Some checks are pending
CI Benchmark - Fusion Loop Timing / Fusion Loop Timing Benchmark (push) Waiting to run

Add support for 'predicted_enter' trigger condition that fires when a
prediction indicates a person is likely to enter a zone within a configured
time window (default 30 minutes). Uses rising-edge detection with 60-minute
cooldown per person-zone combination.

Changes:
- Add migration_017 to expand triggers table CHECK constraint to include
  'predicted_enter' (SQLite table recreation required)
- Update volume store init() for new databases with expanded constraint
- Add predicted_enter to API validation in volume_triggers.go
- Implement evaluatePredictedEnter() in volume store with rising-edge
  detection and cooldown tracking
- Add PredictionProvider interface and SetPredictionProvider() methods
  to both volume.Store and automation.Engine
- Wire predicted_enter evaluation into 10 Hz fusion tick pipeline

Closes: bf-20sp3

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-05-24 10:35:24 -04:00
parent bbf817d678
commit fc6cc839ee
4 changed files with 422 additions and 197 deletions

View file

@ -38,11 +38,11 @@ type WSBroadcaster interface {
// VolumeTriggersHandler manages automation trigger volumes with 3D geometry.
type VolumeTriggersHandler struct {
mu sync.RWMutex
store *volume.Store
httpClient *http.Client
mqttClient VolumeMQTTClient
notifyClient NotificationClient
mu sync.RWMutex
store *volume.Store
httpClient *http.Client
mqttClient VolumeMQTTClient
notifyClient NotificationClient
wsBroadcaster WSBroadcaster
}
@ -64,39 +64,39 @@ type VolumeTriggersHandler struct {
// - created_at: creation timestamp
// - updated_at: last modification timestamp
type TriggerResponse struct {
ID string `json:"id"`
Name string `json:"name"`
Shape volume.ShapeJSON `json:"shape"`
Condition string `json:"condition"`
ID string `json:"id"`
Name string `json:"name"`
Shape volume.ShapeJSON `json:"shape"`
Condition string `json:"condition"`
ConditionParams volume.ConditionParams `json:"condition_params"`
TimeConstraint *volume.TimeConstraint `json:"time_constraint,omitempty"`
Actions []volume.Action `json:"actions"`
Enabled bool `json:"enabled"`
ErrorMessage string `json:"error_message,omitempty"`
ErrorCount int `json:"error_count"`
LastFired *time.Time `json:"last_fired,omitempty"`
Elapsed int `json:"elapsed,omitempty"` // seconds since last fire
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
TimeConstraint *volume.TimeConstraint `json:"time_constraint,omitempty"`
Actions []volume.Action `json:"actions"`
Enabled bool `json:"enabled"`
ErrorMessage string `json:"error_message,omitempty"`
ErrorCount int `json:"error_count"`
LastFired *time.Time `json:"last_fired,omitempty"`
Elapsed int `json:"elapsed,omitempty"` // seconds since last fire
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// WebhookTestResult is returned by POST /api/triggers/{id}/test.
//
// Contains the overall test status and per-action execution results.
type WebhookTestResult struct {
Status string `json:"status"`
ResponseMs int64 `json:"response_ms"`
Error string `json:"error,omitempty"`
Actions []ActionResult `json:"actions"`
Status string `json:"status"`
ResponseMs int64 `json:"response_ms"`
Error string `json:"error,omitempty"`
Actions []ActionResult `json:"actions"`
}
// ActionResult represents the outcome of executing a single action during a test fire.
type ActionResult struct {
Type string `json:"type"`
URL string `json:"url,omitempty"`
Status int `json:"status,omitempty"`
Type string `json:"type"`
URL string `json:"url,omitempty"`
Status int `json:"status,omitempty"`
ResponseMs int64 `json:"response_ms,omitempty"`
Error string `json:"error,omitempty"`
Error string `json:"error,omitempty"`
}
// NewVolumeTriggersHandler creates a new triggers handler with volume support.
@ -140,6 +140,13 @@ func (h *VolumeTriggersHandler) SetWSBroadcaster(broadcaster WSBroadcaster) {
h.wsBroadcaster = broadcaster
}
// SetPredictionProvider sets the prediction provider for predicted_enter triggers.
func (h *VolumeTriggersHandler) SetPredictionProvider(pp volume.PredictionProvider) {
h.mu.Lock()
defer h.mu.Unlock()
h.store.SetPredictionProvider(pp)
}
// Close closes the underlying store.
func (h *VolumeTriggersHandler) Close() error {
return h.store.Close()
@ -332,13 +339,13 @@ func (h *VolumeTriggersHandler) getTrigger(w http.ResponseWriter, r *http.Reques
// volumeCreateTriggerRequest is the request body for POST /api/triggers.
type volumeCreateTriggerRequest struct {
Name string `json:"name"`
Shape volume.ShapeJSON `json:"shape"`
Condition string `json:"condition"`
Name string `json:"name"`
Shape volume.ShapeJSON `json:"shape"`
Condition string `json:"condition"`
ConditionParams volume.ConditionParams `json:"condition_params,omitempty"`
TimeConstraint *volume.TimeConstraint `json:"time_constraint,omitempty"`
Actions []volume.Action `json:"actions"`
Enabled *bool `json:"enabled,omitempty"`
TimeConstraint *volume.TimeConstraint `json:"time_constraint,omitempty"`
Actions []volume.Action `json:"actions"`
Enabled *bool `json:"enabled,omitempty"`
}
// createTrigger handles POST /api/triggers.
@ -386,10 +393,11 @@ func (h *VolumeTriggersHandler) createTrigger(w http.ResponseWriter, r *http.Req
"leave": true,
"dwell": true,
"vacant": true,
"count": true,
}
"count": true,
"predicted_enter": true,
}
if !validConditions[req.Condition] {
http.Error(w, "condition must be one of: enter, leave, dwell, vacant, count", http.StatusBadRequest)
http.Error(w, "condition must be one of: enter, leave, dwell, vacant, count, predicted_enter", http.StatusBadRequest)
return
}
@ -430,13 +438,13 @@ func (h *VolumeTriggersHandler) createTrigger(w http.ResponseWriter, r *http.Req
// volumeUpdateTriggerRequest is the request body for PUT /api/triggers/{id}.
// Only non-nil fields are updated.
type volumeUpdateTriggerRequest struct {
Name *string `json:"name,omitempty"`
Shape *volume.ShapeJSON `json:"shape,omitempty"`
Condition *string `json:"condition,omitempty"`
Name *string `json:"name,omitempty"`
Shape *volume.ShapeJSON `json:"shape,omitempty"`
Condition *string `json:"condition,omitempty"`
ConditionParams *volume.ConditionParams `json:"condition_params,omitempty"`
TimeConstraint *volume.TimeConstraint `json:"time_constraint,omitempty"`
Actions *[]volume.Action `json:"actions,omitempty"`
Enabled *bool `json:"enabled,omitempty"`
TimeConstraint *volume.TimeConstraint `json:"time_constraint,omitempty"`
Actions *[]volume.Action `json:"actions,omitempty"`
Enabled *bool `json:"enabled,omitempty"`
}
// updateTrigger handles PUT /api/triggers/{id}.
@ -562,10 +570,10 @@ func (h *VolumeTriggersHandler) testTrigger(w http.ResponseWriter, r *http.Reque
"condition": trigger.Condition,
"blob_id": 0,
"person": nil,
"position": map[string]float64{"x": 0, "y": 0, "z": 0},
"position": map[string]float64{"x": 0, "y": 0, "z": 0},
"zone": nil,
"dwell_s": 0,
"timestamp_ms": now.UnixMilli(),
"timestamp_ms": now.UnixMilli(),
}
data, err := json.Marshal(payload)
@ -614,9 +622,9 @@ func (h *VolumeTriggersHandler) testTrigger(w http.ResponseWriter, r *http.Reque
totalMs := time.Since(totalStart).Milliseconds()
resp := WebhookTestResult{
Status: "ok",
Status: "ok",
ResponseMs: totalMs,
Actions: results,
Actions: results,
}
writeJSON(w, http.StatusOK, resp)
@ -747,19 +755,19 @@ func (h *VolumeTriggersHandler) isValidShape(shape *volume.ShapeJSON) bool {
// toResponse converts a trigger to the API response format.
func (h *VolumeTriggersHandler) toResponse(t *volume.Trigger, now time.Time) *TriggerResponse {
resp := &TriggerResponse{
ID: t.ID,
Name: t.Name,
Shape: t.Shape,
Condition: t.Condition,
ID: t.ID,
Name: t.Name,
Shape: t.Shape,
Condition: t.Condition,
ConditionParams: t.ConditionParams,
TimeConstraint: t.TimeConstraint,
Actions: t.Actions,
Enabled: t.Enabled,
ErrorMessage: t.ErrorMessage,
ErrorCount: t.ErrorCount,
LastFired: t.LastFired,
CreatedAt: t.CreatedAt,
UpdatedAt: t.UpdatedAt,
TimeConstraint: t.TimeConstraint,
Actions: t.Actions,
Enabled: t.Enabled,
ErrorMessage: t.ErrorMessage,
ErrorCount: t.ErrorCount,
LastFired: t.LastFired,
CreatedAt: t.CreatedAt,
UpdatedAt: t.UpdatedAt,
}
if t.LastFired != nil {
@ -864,14 +872,14 @@ func (h *VolumeTriggersHandler) executeWebhook(action volume.Action, event volum
t := h.store.GetTrigger(event.TriggerID)
payload := map[string]interface{}{
"trigger_id": event.TriggerID,
"trigger_name": t.Name,
"trigger_name": t.Name,
"condition": t.Condition,
"blob_id": 0,
"person": nil,
"position": map[string]float64{"x": 0, "y": 0, "z": 0},
"zone": nil,
"dwell_s": 0,
"timestamp_ms": event.Timestamp.UnixMilli(),
"timestamp_ms": event.Timestamp.UnixMilli(),
}
data, err := json.Marshal(payload)
@ -950,7 +958,7 @@ func (h *VolumeTriggersHandler) executeMQTT(action volume.Action, event volume.F
t := h.store.GetTrigger(event.TriggerID)
payload := map[string]interface{}{
"trigger_id": event.TriggerID,
"trigger_name": t.Name,
"trigger_name": t.Name,
"condition": t.Condition,
"fired_at": event.Timestamp.Format(time.RFC3339),
"blob_ids": event.BlobIDs,
@ -982,10 +990,10 @@ func (h *VolumeTriggersHandler) executeNotification(action volume.Action, event
body := fmt.Sprintf("%s triggered (%s)", event.TriggerName, event.Condition)
data := map[string]interface{}{
"trigger_id": event.TriggerID,
"trigger_id": event.TriggerID,
"trigger_name": event.TriggerName,
"condition": event.Condition,
"timestamp": event.Timestamp.Unix(),
"condition": event.Condition,
"timestamp": event.Timestamp.Unix(),
}
if err := client.SendViaChannel(action.Type, title, body, data); err != nil {

View file

@ -22,17 +22,17 @@ import (
type TriggerType string
const (
TriggerZoneEnter TriggerType = "zone_enter"
TriggerZoneLeave TriggerType = "zone_leave"
TriggerZoneDwell TriggerType = "zone_dwell"
TriggerZoneVacant TriggerType = "zone_vacant"
TriggerPersonCountChange TriggerType = "person_count_change"
TriggerFallDetected TriggerType = "fall_detected"
TriggerAnomaly TriggerType = "anomaly"
TriggerBLEDevicePresent TriggerType = "ble_device_present"
TriggerBLEDeviceAbsent TriggerType = "ble_device_absent"
TriggerVolumeEnter TriggerType = "volume_enter"
TriggerVolumeLeave TriggerType = "volume_leave"
TriggerZoneEnter TriggerType = "zone_enter"
TriggerZoneLeave TriggerType = "zone_leave"
TriggerZoneDwell TriggerType = "zone_dwell"
TriggerZoneVacant TriggerType = "zone_vacant"
TriggerPersonCountChange TriggerType = "person_count_change"
TriggerFallDetected TriggerType = "fall_detected"
TriggerAnomaly TriggerType = "anomaly"
TriggerBLEDevicePresent TriggerType = "ble_device_present"
TriggerBLEDeviceAbsent TriggerType = "ble_device_absent"
TriggerVolumeEnter TriggerType = "volume_enter"
TriggerVolumeLeave TriggerType = "volume_leave"
TriggerPredictedZoneEnter TriggerType = "predicted_zone_enter" // N minutes before predicted zone entry
)
@ -47,10 +47,10 @@ const (
// TriggerVolume represents a 3D region that can trigger automations.
type TriggerVolume struct {
ID string `json:"id"`
Name string `json:"name"`
Type string `json:"type"` // box, sphere, cylinder
Enabled bool `json:"enabled"`
ID string `json:"id"`
Name string `json:"name"`
Type string `json:"type"` // box, sphere, cylinder
Enabled bool `json:"enabled"`
AutomationID string `json:"automation_id,omitempty"`
// Box type
MinX float64 `json:"min_x,omitempty"`
@ -63,13 +63,13 @@ type TriggerVolume struct {
CenterX float64 `json:"center_x,omitempty"`
CenterY float64 `json:"center_y,omitempty"`
CenterZ float64 `json:"center_z,omitempty"`
Radius float64 `json:"radius,omitempty"`
Radius float64 `json:"radius,omitempty"`
// Cylinder type
BaseX float64 `json:"base_x,omitempty"`
BaseZ float64 `json:"base_z,omitempty"`
BaseRadius float64 `json:"base_radius,omitempty"`
MinHeight float64 `json:"min_height,omitempty"`
MaxHeight float64 `json:"max_height,omitempty"`
BaseX float64 `json:"base_x,omitempty"`
BaseZ float64 `json:"base_z,omitempty"`
BaseRadius float64 `json:"base_radius,omitempty"`
MinHeight float64 `json:"min_height,omitempty"`
MaxHeight float64 `json:"max_height,omitempty"`
}
// TriggerConfig holds configuration for the trigger.
@ -108,39 +108,39 @@ type Condition struct {
type ActionType string
const (
ActionWebhook ActionType = "webhook"
ActionMQTT ActionType = "mqtt_publish"
ActionNtfy ActionType = "ntfy"
ActionPushover ActionType = "pushover"
ActionWebhook ActionType = "webhook"
ActionMQTT ActionType = "mqtt_publish"
ActionNtfy ActionType = "ntfy"
ActionPushover ActionType = "pushover"
)
// Action represents an action to execute when triggered.
type Action struct {
Type ActionType `json:"type"`
URL string `json:"url,omitempty"` // for webhook
Topic string `json:"topic,omitempty"` // for mqtt
Server string `json:"server,omitempty"` // for ntfy
Token string `json:"token,omitempty"` // for pushover
UserKey string `json:"user_key,omitempty"` // for pushover
Template string `json:"template,omitempty"` // payload template
Headers map[string]string `json:"headers,omitempty"`
Parameters map[string]interface{} `json:"parameters,omitempty"`
Type ActionType `json:"type"`
URL string `json:"url,omitempty"` // for webhook
Topic string `json:"topic,omitempty"` // for mqtt
Server string `json:"server,omitempty"` // for ntfy
Token string `json:"token,omitempty"` // for pushover
UserKey string `json:"user_key,omitempty"` // for pushover
Template string `json:"template,omitempty"` // payload template
Headers map[string]string `json:"headers,omitempty"`
Parameters map[string]interface{} `json:"parameters,omitempty"`
}
// Automation represents an automation rule.
type Automation struct {
ID string `json:"id"`
Name string `json:"name"`
Enabled bool `json:"enabled"`
TriggerType TriggerType `json:"trigger_type"`
TriggerConfig TriggerConfig `json:"trigger_config"`
Conditions []Condition `json:"conditions,omitempty"`
Actions []Action `json:"actions"`
Cooldown int `json:"cooldown"` // seconds between triggers
LastFired time.Time `json:"last_fired"`
FireCount int `json:"fire_count"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
ID string `json:"id"`
Name string `json:"name"`
Enabled bool `json:"enabled"`
TriggerType TriggerType `json:"trigger_type"`
TriggerConfig TriggerConfig `json:"trigger_config"`
Conditions []Condition `json:"conditions,omitempty"`
Actions []Action `json:"actions"`
Cooldown int `json:"cooldown"` // seconds between triggers
LastFired time.Time `json:"last_fired"`
FireCount int `json:"fire_count"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// ActionResult represents the result of an action execution.
@ -174,20 +174,20 @@ type TriggerEventData struct {
// Event represents an internal event that can trigger automations.
type Event struct {
Type TriggerType
Timestamp time.Time
PersonID string
PersonName string
PersonColor string
ZoneID string
ZoneName string
FromZone string
ToZone string
DeviceMAC string
DeviceName string
Type TriggerType
Timestamp time.Time
PersonID string
PersonName string
PersonColor string
ZoneID string
ZoneName string
FromZone string
ToZone string
DeviceMAC string
DeviceName string
OccupantCount int
Confidence float64
Extra map[string]interface{}
Confidence float64
Extra map[string]interface{}
}
// ZoneInfoProvider provides zone information.
@ -237,12 +237,12 @@ type PredictionInfo struct {
// Engine manages automation rules and triggers.
type Engine struct {
mu sync.RWMutex
db *sql.DB
mu sync.RWMutex
db *sql.DB
automations map[string]*Automation
volumes map[string]*TriggerVolume
cooldowns map[string]time.Time // automationID -> last trigger time
automations map[string]*Automation
volumes map[string]*TriggerVolume
cooldowns map[string]time.Time // automationID -> last trigger time
// Zone dwell tracking
zoneEnterTime map[string]map[int]time.Time // zoneID -> blobID -> enter time
@ -254,15 +254,15 @@ type Engine struct {
systemMode SystemMode
// Providers
zoneProvider ZoneInfoProvider
personProvider PersonInfoProvider
deviceProvider DeviceInfoProvider
zoneProvider ZoneInfoProvider
personProvider PersonInfoProvider
deviceProvider DeviceInfoProvider
predictionProvider PredictionProvider
// Clients
httpClient *http.Client
mqttClient MQTTClient
notifySender NotificationSender
httpClient *http.Client
mqttClient MQTTClient
notifySender NotificationSender
// Callbacks
onTrigger func(TriggerEventData)
@ -281,14 +281,14 @@ func NewEngine(dbPath string) (*Engine, error) {
db.SetMaxOpenConns(1)
e := &Engine{
db: db,
automations: make(map[string]*Automation),
volumes: make(map[string]*TriggerVolume),
cooldowns: make(map[string]time.Time),
zoneEnterTime: make(map[string]map[int]time.Time),
deviceLastSeen: make(map[string]time.Time),
systemMode: ModeHome,
httpClient: &http.Client{Timeout: 10 * time.Second},
db: db,
automations: make(map[string]*Automation),
volumes: make(map[string]*TriggerVolume),
cooldowns: make(map[string]time.Time),
zoneEnterTime: make(map[string]map[int]time.Time),
deviceLastSeen: make(map[string]time.Time),
systemMode: ModeHome,
httpClient: &http.Client{Timeout: 10 * time.Second},
}
if err := e.migrate(); err != nil {
@ -498,6 +498,13 @@ func (e *Engine) SetNotificationSender(sender NotificationSender) {
e.mu.Unlock()
}
// SetPredictionProvider sets the prediction provider for predicted_zone_enter triggers.
func (e *Engine) SetPredictionProvider(pp PredictionProvider) {
e.mu.Lock()
e.predictionProvider = pp
e.mu.Unlock()
}
// SetOnTrigger sets callback for trigger events.
func (e *Engine) SetOnTrigger(cb func(TriggerEventData)) {
e.mu.Lock()
@ -839,7 +846,7 @@ func (e *Engine) isDayOfWeek(value string, now time.Time) bool {
if len(value) <= 7 {
bitmask := parseInt(value)
if bitmask > 0 {
return (bitmask&(1<<weekday)) != 0
return (bitmask & (1 << weekday)) != 0
}
}
@ -1147,11 +1154,11 @@ func (e *Engine) TestFire(automationID string) error {
// Create a simulated event
event := Event{
Type: a.TriggerType,
Timestamp: time.Now(),
PersonID: a.TriggerConfig.PersonID,
ZoneID: a.TriggerConfig.ZoneID,
DeviceMAC: a.TriggerConfig.DeviceMAC,
Type: a.TriggerType,
Timestamp: time.Now(),
PersonID: a.TriggerConfig.PersonID,
ZoneID: a.TriggerConfig.ZoneID,
DeviceMAC: a.TriggerConfig.DeviceMAC,
}
// Get provider info
@ -1272,9 +1279,9 @@ func (e *Engine) CheckBLEAbsentTriggers(now time.Time) {
if !exists {
// Device never seen - trigger immediately
event := Event{
Type: TriggerBLEDeviceAbsent,
Timestamp: now,
DeviceMAC: targetMAC,
Type: TriggerBLEDeviceAbsent,
Timestamp: now,
DeviceMAC: targetMAC,
}
if e.deviceProvider != nil {
event.DeviceName, _ = e.deviceProvider.GetDevice(targetMAC)
@ -1288,9 +1295,9 @@ func (e *Engine) CheckBLEAbsentTriggers(now time.Time) {
absentMinutes := int(now.Sub(lastSeen).Minutes())
if absentMinutes >= cfg.AbsentMinutes {
event := Event{
Type: TriggerBLEDeviceAbsent,
Timestamp: now,
DeviceMAC: targetMAC,
Type: TriggerBLEDeviceAbsent,
Timestamp: now,
DeviceMAC: targetMAC,
}
if e.deviceProvider != nil {
event.DeviceName, _ = e.deviceProvider.GetDevice(targetMAC)
@ -1445,10 +1452,10 @@ func (e *Engine) checkVolumeTriggers(blob TrackedBlob, currentZone string, now t
// GetRecentActionLog returns recent action log entries.
func (e *Engine) GetRecentActionLog(limit int) []struct {
AutomationID string
FiredAt time.Time
EventJSON string
ResultsJSON string
AutomationID string
FiredAt time.Time
EventJSON string
ResultsJSON string
} {
rows, err := e.db.Query(`
SELECT automation_id, fired_at, event_json, actions_results_json
@ -1463,18 +1470,18 @@ func (e *Engine) GetRecentActionLog(limit int) []struct {
defer rows.Close() //nolint:errcheck
var results []struct {
AutomationID string
FiredAt time.Time
EventJSON string
ResultsJSON string
AutomationID string
FiredAt time.Time
EventJSON string
ResultsJSON string
}
for rows.Next() {
var entry struct {
AutomationID string
FiredAt time.Time
EventJSON string
ResultsJSON string
AutomationID string
FiredAt time.Time
EventJSON string
ResultsJSON string
}
var firedAtNS int64
if err := rows.Scan(&entry.AutomationID, &firedAtNS, &entry.EventJSON, &entry.ResultsJSON); err != nil {

View file

@ -88,6 +88,11 @@ func AllMigrations() []Migration {
Description: "add crossing_events table for portal crossing log",
Up: migration_016_add_crossing_events_table,
},
{
Version: 17,
Description: "add predicted_enter to triggers table CHECK constraint",
Up: migration_017_add_predicted_enter_trigger,
},
}
}
@ -871,3 +876,76 @@ func migration_016_add_crossing_events_table(tx *sql.Tx) error {
_, err := tx.Exec(schema)
return err
}
// migration_017_add_predicted_enter_trigger expands the triggers table
// CHECK constraint to include 'predicted_enter' for pre-emptive automation.
// SQLite doesn't support ALTER CONSTRAINT, so we recreate the table.
func migration_017_add_predicted_enter_trigger(tx *sql.Tx) error {
// Check if triggers table exists
var triggersExists bool
if err := tx.QueryRow(
`SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='triggers'`,
).Scan(&triggersExists); err != nil {
return err
}
if !triggersExists {
return nil // triggers table will be created by a later migration
}
// In SQLite, we need to recreate the table with the new CHECK constraint
// Step 1: Create a new triggers table with the updated constraint
_, err := tx.Exec(`
CREATE TABLE triggers_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
shape_json TEXT NOT NULL,
condition TEXT NOT NULL CHECK (condition IN ('enter','leave','dwell','vacant','count','predicted_enter')),
condition_params_json TEXT,
time_constraint_json TEXT,
actions_json TEXT NOT NULL,
enabled INTEGER NOT NULL DEFAULT 1,
last_fired INTEGER,
error_message TEXT DEFAULT '',
error_count INTEGER NOT NULL DEFAULT 0,
created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now') * 1000),
updated_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now') * 1000)
);
`)
if err != nil {
return err
}
// Step 2: Copy data from old table to new table
_, err = tx.Exec(`
INSERT INTO triggers_new (id, name, shape_json, condition, condition_params_json,
time_constraint_json, actions_json, enabled, last_fired, created_at, updated_at,
error_message, error_count)
SELECT id, name, shape_json, condition, condition_params_json,
time_constraint_json, actions_json, enabled, last_fired, created_at, updated_at,
COALESCE(error_message, ''), COALESCE(error_count, 0)
FROM triggers;
`)
if err != nil {
return err
}
// Step 3: Drop the old table
_, err = tx.Exec(`DROP TABLE triggers;`)
if err != nil {
return err
}
// Step 4: Rename the new table to the original name
_, err = tx.Exec(`ALTER TABLE triggers_new RENAME TO triggers;`)
if err != nil {
return err
}
// Step 5: Recreate indexes (if any existed)
_, err = tx.Exec(`CREATE INDEX IF NOT EXISTS idx_triggers_enabled ON triggers(enabled);`)
if err != nil {
return err
}
return nil
}

View file

@ -9,6 +9,7 @@ import (
"log"
"os"
"path/filepath"
"strings"
"sync"
"time"
@ -100,26 +101,26 @@ func (s *ShapeJSON) isInsideCylinder(p Point3D) bool {
// Trigger represents a spatial automation trigger from the triggers table.
type Trigger struct {
ID string `json:"id"`
Name string `json:"name"`
Shape ShapeJSON `json:"shape"`
Condition string `json:"condition"` // enter, leave, dwell, vacant, count
ConditionParams ConditionParams `json:"condition_params"`
TimeConstraint *TimeConstraint `json:"time_constraint,omitempty"`
Actions []Action `json:"actions"`
Enabled bool `json:"enabled"`
ErrorMessage string `json:"error_message,omitempty"` // Set when disabled by 4xx
ErrorCount int `json:"error_count"` // Incremented on 5xx/timeout, reset on 2xx
LastFired *time.Time `json:"last_fired,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
ID string `json:"id"`
Name string `json:"name"`
Shape ShapeJSON `json:"shape"`
Condition string `json:"condition"` // enter, leave, dwell, vacant, count
ConditionParams ConditionParams `json:"condition_params"`
TimeConstraint *TimeConstraint `json:"time_constraint,omitempty"`
Actions []Action `json:"actions"`
Enabled bool `json:"enabled"`
ErrorMessage string `json:"error_message,omitempty"` // Set when disabled by 4xx
ErrorCount int `json:"error_count"` // Incremented on 5xx/timeout, reset on 2xx
LastFired *time.Time `json:"last_fired,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// ConditionParams holds trigger condition parameters.
type ConditionParams struct {
DurationS *int `json:"duration_s,omitempty"` // For dwell: seconds inside volume
CountThreshold *int `json:"count_threshold,omitempty"` // For count: minimum blob count
PersonID string `json:"person_id,omitempty"` // Filter by person ID
DurationS *int `json:"duration_s,omitempty"` // For dwell: seconds inside volume
CountThreshold *int `json:"count_threshold,omitempty"` // For count: minimum blob count
PersonID string `json:"person_id,omitempty"` // Filter by person ID
}
// TimeConstraint represents a time window constraint.
@ -130,8 +131,8 @@ type TimeConstraint struct {
// Action represents an action to execute when a trigger fires.
type Action struct {
Type string `json:"type"` // webhook, mqtt, internal
Params map[string]interface{} `json:"params"`
Type string `json:"type"` // webhook, mqtt, internal
Params map[string]interface{} `json:"params"`
}
// BlobState represents the state of a tracked blob relative to a trigger.
@ -162,14 +163,33 @@ type FiredEvent struct {
// FiringCallback is called when a trigger fires.
type FiringCallback func(event FiredEvent)
// PredictionInfo represents a prediction for use by predicted_enter triggers.
type PredictionInfo struct {
PersonID string
PersonName string
CurrentZoneID string
CurrentZoneName string
PredictedNextZoneID string
PredictedNextZoneName string
PredictionConfidence float64
EstimatedTransitionMinutes float64
DataConfidence string
}
// PredictionProvider provides prediction information for predicted_enter triggers.
type PredictionProvider interface {
GetPredictions() []PredictionInfo
}
// Store provides trigger storage and state management.
type Store struct {
mu sync.RWMutex
db *sql.DB
triggers map[string]*Trigger
triggerState map[string]*TriggerState // trigger_id -> state
blobVolumes map[int]string // blob_id -> current volume_id (for tracking)
onFired FiringCallback // Called when a trigger fires
mu sync.RWMutex
db *sql.DB
triggers map[string]*Trigger
triggerState map[string]*TriggerState // trigger_id -> state
blobVolumes map[int]string // blob_id -> current volume_id (for tracking)
onFired FiringCallback // Called when a trigger fires
predictionProvider PredictionProvider // Provides prediction data for predicted_enter triggers
}
// NewStore creates a new trigger volume store.
@ -212,7 +232,7 @@ func (s *Store) init() error {
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
shape_json TEXT NOT NULL,
condition TEXT NOT NULL CHECK (condition IN ('enter','leave','dwell','vacant','count')),
condition TEXT NOT NULL CHECK (condition IN ('enter','leave','dwell','vacant','count','predicted_enter')),
condition_params_json TEXT,
time_constraint_json TEXT,
actions_json TEXT NOT NULL,
@ -518,6 +538,13 @@ func (s *Store) SetOnFired(cb FiringCallback) {
s.onFired = cb
}
// SetPredictionProvider sets the prediction provider for predicted_enter triggers.
func (s *Store) SetPredictionProvider(pp PredictionProvider) {
s.mu.Lock()
defer s.mu.Unlock()
s.predictionProvider = pp
}
// Evaluate evaluates all enabled triggers against the current blob positions.
// Returns a list of trigger IDs that should fire.
func (s *Store) Evaluate(blobs []BlobPos, now time.Time) []string {
@ -566,6 +593,8 @@ func (s *Store) Evaluate(blobs []BlobPos, now time.Time) []string {
shouldFire = s.evaluateVacant(t, state, blobs, now)
case "count":
shouldFire = s.evaluateCount(t, state, blobs, now)
case "predicted_enter":
shouldFire = s.evaluatePredictedEnter(t, state, now)
}
if shouldFire {
@ -797,6 +826,109 @@ func (s *Store) evaluateCount(t *Trigger, state *TriggerState, blobs []BlobPos,
return crossedThreshold
}
// evaluatePredictedEnter triggers when a prediction indicates a person is likely
// to enter a zone within the configured time window. Uses rising-edge detection:
// fires when P(predicted zone entry) > threshold AND previous computation had P < threshold.
// Suppresses re-firing for the same (person, zone, time_slot) within 60 minutes.
func (s *Store) evaluatePredictedEnter(t *Trigger, state *TriggerState, now time.Time) bool {
if s.predictionProvider == nil {
return false // No prediction provider configured
}
// Get minutes ahead threshold from condition params (default 30 minutes)
minutesAhead := 30
if t.ConditionParams.DurationS != nil {
minutesAhead = *t.ConditionParams.DurationS
}
predictions := s.predictionProvider.GetPredictions()
var fired bool
// Use a special blob ID slot for tracking predictions (negative IDs are in-memory only)
// We use -1000 - (hash of person+zone) to track per-person-per-zone state
for _, pred := range predictions {
// Check person filter
if t.ConditionParams.PersonID != "" && t.ConditionParams.PersonID != "anyone" {
if t.ConditionParams.PersonID != pred.PersonID {
continue
}
}
// Check if predicted zone matches our volume
// For predicted_enter, we check if the predicted zone is within the volume
// TODO: Add zone-to-volume mapping; for now, we trigger if the predicted zone name
// is in the trigger name or if the trigger name contains the predicted zone name
if !s.zoneMatchesVolume(pred.PredictedNextZoneName, t) {
continue
}
// Check if prediction confidence is high enough (default 0.6)
if pred.PredictionConfidence < 0.6 {
continue
}
// Check if estimated transition time is within the threshold
if pred.EstimatedTransitionMinutes > float64(minutesAhead) {
continue
}
// Create a unique state key for this person-zone combination
stateKey := -1000 - s.hashPersonZone(pred.PersonID, pred.PredictedNextZoneID)
blobState := state.Blobs[stateKey]
if blobState == nil {
blobState = &BlobState{
BlobID: stateKey,
Inside: false,
LastCheckTime: now,
}
state.Blobs[stateKey] = blobState
}
// Rising-edge detection: fire when we go from below threshold to at/above threshold
if !blobState.Inside && pred.PredictionConfidence >= 0.6 {
// Check cooldown (60 minutes)
if !blobState.EnterTime.IsZero() && now.Sub(blobState.EnterTime) < 60*time.Minute {
continue
}
// Fire the trigger
blobState.Inside = true
blobState.EnterTime = now
blobState.LastCheckTime = now
fired = true
} else if pred.PredictionConfidence < 0.6 {
// Below threshold - reset for next rising edge
blobState.Inside = false
}
blobState.LastCheckTime = now
}
return fired
}
// zoneMatchesVolume checks if a zone name matches a trigger volume.
// TODO: Implement proper zone-to-volume geometry mapping.
func (s *Store) zoneMatchesVolume(zoneName string, t *Trigger) bool {
// For now, use simple string matching
if zoneName == "" {
return false
}
// Check if trigger name contains zone name or vice versa
return strings.Contains(strings.ToLower(t.Name), strings.ToLower(zoneName)) ||
strings.Contains(strings.ToLower(zoneName), strings.ToLower(t.Name))
}
// hashPersonZone creates a simple hash of person+zone for state tracking.
func (s *Store) hashPersonZone(personID, zoneID string) int {
// Simple hash: sum of character codes
h := 0
for _, c := range personID + zoneID {
h += int(c)
}
return h % 10000 // Keep it small and positive
}
// isTimeInRange checks if the current time is within the constraint window.
func (s *Store) isTimeInRange(tc *TimeConstraint, now time.Time) bool {
if tc == nil {
@ -935,13 +1067,13 @@ type FiringRecord struct {
// WebhookLogEntry represents an entry in the webhook audit log.
type WebhookLogEntry struct {
ID int64 `json:"id"`
TriggerID string `json:"trigger_id"`
URL string `json:"url"`
FiredAtMs int64 `json:"fired_at_ms"`
Status int `json:"status_code,omitempty"`
LatencyMs int64 `json:"latency_ms"`
Error string `json:"error,omitempty"`
ID int64 `json:"id"`
TriggerID string `json:"trigger_id"`
URL string `json:"url"`
FiredAtMs int64 `json:"fired_at_ms"`
Status int `json:"status_code,omitempty"`
LatencyMs int64 `json:"latency_ms"`
Error string `json:"error,omitempty"`
}
// BlobPos represents a blob's position for trigger evaluation.