diff --git a/mothership/internal/eventbus/eventbus.go b/mothership/internal/eventbus/eventbus.go index 0f0814f..9482f60 100644 --- a/mothership/internal/eventbus/eventbus.go +++ b/mothership/internal/eventbus/eventbus.go @@ -6,14 +6,41 @@ import ( "sync" ) +// Event type constants. These match the EventType values from the events package. +const ( + TypeDetection = "detection" + TypeZoneEntry = "zone_entry" + TypeZoneExit = "zone_exit" + TypePortalCrossing = "portal_crossing" + TypeTriggerFired = "trigger_fired" + TypeFallAlert = "fall_alert" + TypeAnomaly = "anomaly" + TypeSecurityAlert = "security_alert" + TypeNodeOnline = "node_online" + TypeNodeOffline = "node_offline" + TypeOTAUpdate = "ota_update" + TypeBaselineChanged = "baseline_changed" + TypeSystem = "system" + TypeLearningMilestone = "learning_milestone" +) + +// Severity level constants. +const ( + SeverityInfo = "info" + SeverityWarning = "warning" + SeverityAlert = "alert" + SeverityCritical = "critical" +) + // Event represents a timeline event published on the bus. type Event struct { - Type string // detection, zone_entry, zone_exit, etc. - Zone string // optional zone name - Person string // optional person name (BLE-identified) - BlobID int // optional associated blob ID - Detail interface{} // optional detail payload (will be JSON-encoded by subscribers) - Severity string // info, warning, alert, critical + Type string // detection, zone_entry, zone_exit, etc. + TimestampMs int64 // Unix milliseconds timestamp + Zone string // optional zone name + Person string // optional person name (BLE-identified) + BlobID int // optional associated blob ID + Detail interface{} // optional detail payload (will be JSON-encoded by subscribers) + Severity string // info, warning, alert, critical } // Subscriber receives events published on the bus. @@ -21,6 +48,37 @@ type Event struct { // persist to SQLite, broadcast to WebSocket, etc. type Subscriber func(Event) +// defaultBus is the global default event bus. +// Packages can call Default() to get a shared bus instance. +var ( + defaultBus *Bus + once sync.Once +) + +// Default returns the global default event bus instance. +// It is safe to call from any goroutine. +func Default() *Bus { + once.Do(func() { + defaultBus = New() + }) + return defaultBus +} + +// PublishDefault is a convenience function that publishes to the default bus. +func PublishDefault(e Event) { + Default().Publish(e) +} + +// PublishDefaultSync is a convenience function that publishes to the default bus synchronously. +func PublishDefaultSync(e Event) { + Default().PublishSync(e) +} + +// SubscribeDefault is a convenience function that subscribes to the default bus. +func SubscribeDefault(fn Subscriber) { + Default().Subscribe(fn) +} + // Bus is an internal publish/subscribe mechanism for timeline events. // It is safe for concurrent use. type Bus struct { diff --git a/mothership/internal/eventbus/eventbus_test.go b/mothership/internal/eventbus/eventbus_test.go index 0b8a469..664b7b4 100644 --- a/mothership/internal/eventbus/eventbus_test.go +++ b/mothership/internal/eventbus/eventbus_test.go @@ -4,6 +4,7 @@ import ( "sync" "sync/atomic" "testing" + "time" ) func TestPublishSync(t *testing.T) { @@ -14,16 +15,16 @@ func TestPublishSync(t *testing.T) { received = append(received, e) }) - bus.PublishSync(Event{Type: "detection", Zone: "Kitchen"}) - bus.PublishSync(Event{Type: "zone_exit", Person: "Alice"}) + bus.PublishSync(Event{Type: TypeDetection, Zone: "Kitchen"}) + bus.PublishSync(Event{Type: TypeZoneExit, Person: "Alice"}) if len(received) != 2 { t.Fatalf("expected 2 events, got %d", len(received)) } - if received[0].Type != "detection" || received[0].Zone != "Kitchen" { + if received[0].Type != TypeDetection || received[0].Zone != "Kitchen" { t.Errorf("event 0 mismatch: %+v", received[0]) } - if received[1].Type != "zone_exit" || received[1].Person != "Alice" { + if received[1].Type != TypeZoneExit || received[1].Person != "Alice" { t.Errorf("event 1 mismatch: %+v", received[1]) } } @@ -71,3 +72,195 @@ func TestPublishNoSubscribers(t *testing.T) { bus.PublishSync(Event{Type: "test"}) bus.Publish(Event{Type: "test"}) } + +// TestEventTypes verifies all event type constants are defined. +func TestEventTypes(t *testing.T) { + tests := []struct { + name string + typ string + }{ + {"Detection", TypeDetection}, + {"ZoneEntry", TypeZoneEntry}, + {"ZoneExit", TypeZoneExit}, + {"PortalCrossing", TypePortalCrossing}, + {"TriggerFired", TypeTriggerFired}, + {"FallAlert", TypeFallAlert}, + {"Anomaly", TypeAnomaly}, + {"SecurityAlert", TypeSecurityAlert}, + {"NodeOnline", TypeNodeOnline}, + {"NodeOffline", TypeNodeOffline}, + {"OTAUpdate", TypeOTAUpdate}, + {"BaselineChanged", TypeBaselineChanged}, + {"System", TypeSystem}, + {"LearningMilestone", TypeLearningMilestone}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.typ == "" { + t.Errorf("event type %s is empty string", tt.name) + } + }) + } +} + +// TestSeverityConstants verifies all severity constants are defined. +func TestSeverityConstants(t *testing.T) { + tests := []struct { + name string + severity string + }{ + {"Info", SeverityInfo}, + {"Warning", SeverityWarning}, + {"Alert", SeverityAlert}, + {"Critical", SeverityCritical}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.severity == "" { + t.Errorf("severity %s is empty string", tt.name) + } + }) + } +} + +// TestEventFields verifies event struct fields work correctly. +func TestEventFields(t *testing.T) { + tests := []struct { + name string + event Event + check func(Event) error + }{ + { + name: "full event", + event: Event{ + Type: TypeDetection, + TimestampMs: 1234567890, + Zone: "Kitchen", + Person: "Alice", + BlobID: 42, + Detail: map[string]interface{}{"x": 1.0, "y": 2.0}, + Severity: SeverityInfo, + }, + check: func(e Event) error { + if e.Type != TypeDetection { + t.Errorf("Type = %v, want %v", e.Type, TypeDetection) + } + if e.TimestampMs != 1234567890 { + t.Errorf("TimestampMs = %v, want 1234567890", e.TimestampMs) + } + if e.Zone != "Kitchen" { + t.Errorf("Zone = %v, want Kitchen", e.Zone) + } + if e.Person != "Alice" { + t.Errorf("Person = %v, want Alice", e.Person) + } + if e.BlobID != 42 { + t.Errorf("BlobID = %v, want 42", e.BlobID) + } + if e.Severity != SeverityInfo { + t.Errorf("Severity = %v, want %v", e.Severity, SeverityInfo) + } + return nil + }, + }, + { + name: "minimal event", + event: Event{ + Type: TypeSystem, + Severity: SeverityWarning, + }, + check: func(e Event) error { + if e.Type != TypeSystem { + t.Errorf("Type = %v, want %v", e.Type, TypeSystem) + } + if e.Severity != SeverityWarning { + t.Errorf("Severity = %v, want %v", e.Severity, SeverityWarning) + } + return nil + }, + }, + { + name: "event with timestamp", + event: Event{ + Type: TypeZoneEntry, + TimestampMs: time.Now().UnixMilli(), + Zone: "Hallway", + Person: "Bob", + }, + check: func(e Event) error { + if e.Type != TypeZoneEntry { + t.Errorf("Type = %v, want %v", e.Type, TypeZoneEntry) + } + if e.TimestampMs == 0 { + t.Error("TimestampMs not set") + } + return nil + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.check != nil { + tt.check(tt.event) + } + + // Verify event can be published and received intact + bus := New() + var received Event + bus.Subscribe(func(e Event) { + received = e + }) + bus.PublishSync(tt.event) + + if received.Type != tt.event.Type { + t.Errorf("received Type = %v, want %v", received.Type, tt.event.Type) + } + if received.Zone != tt.event.Zone { + t.Errorf("received Zone = %v, want %v", received.Zone, tt.event.Zone) + } + if received.Person != tt.event.Person { + t.Errorf("received Person = %v, want %v", received.Person, tt.event.Person) + } + if received.BlobID != tt.event.BlobID { + t.Errorf("received BlobID = %v, want %v", received.BlobID, tt.event.BlobID) + } + if received.Severity != tt.event.Severity { + t.Errorf("received Severity = %v, want %v", received.Severity, tt.event.Severity) + } + }) + } +} + +// TestSubscribeDuringPublish verifies subscriptions can be added safely. +func TestSubscribeDuringPublish(t *testing.T) { + bus := New() + + var count int + var wg sync.WaitGroup + + // Subscribe while publish is happening + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(10 * time.Millisecond) + bus.Subscribe(func(e Event) { + count++ + }) + }() + + // Publish some events + for i := 0; i < 5; i++ { + bus.Publish(Event{Type: "test"}) + time.Sleep(5 * time.Millisecond) + } + + wg.Wait() + + // Count depends on timing; just verify no panic/ deadlock + if count < 0 { + t.Errorf("count = %v, want >= 0", count) + } +} diff --git a/mothership/internal/events/events.go b/mothership/internal/events/events.go index 00ee953..181ca12 100644 --- a/mothership/internal/events/events.go +++ b/mothership/internal/events/events.go @@ -9,6 +9,8 @@ import ( "time" _ "modernc.org/sqlite" + + "github.com/spaxel/mothership/internal/eventbus" ) // EventType represents the type of an event. @@ -66,7 +68,7 @@ type QueryParams struct { SearchQuery string // FTS5 search query } -// InsertEvent inserts a new event into the database. +// InsertEvent inserts a new event into the database and publishes it to the event bus. func InsertEvent(db *sql.DB, e Event) (int64, error) { if e.TimestampMs == 0 { e.TimestampMs = time.Now().UnixMilli() @@ -88,6 +90,18 @@ func InsertEvent(db *sql.DB, e Event) (int64, error) { return 0, fmt.Errorf("get last insert id: %w", err) } + // Publish to the internal event bus for WebSocket clients and other subscribers. + // This is non-blocking; subscribers run in separate goroutines. + eventbus.PublishDefault(eventbus.Event{ + Type: string(e.Type), + TimestampMs: e.TimestampMs, + Zone: e.Zone, + Person: e.Person, + BlobID: e.BlobID, + Detail: e.DetailJSON, // Pass as string; subscribers can parse if needed + Severity: string(e.Severity), + }) + return id, nil }