feat: implement internal pub/sub event bus

- Add TimestampMs field to eventbus.Event
- Add event type constants (detection, zone_entry, zone_exit, etc.)
- Add severity level constants
- Add global Default() bus instance for shared access
- Add convenience functions: PublishDefault, PublishDefaultSync, SubscribeDefault
- Integrate with events.InsertEvent to publish to eventbus
- Add comprehensive table-driven tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-04-07 12:51:41 -04:00
parent 3db48cd61d
commit 41d6d09561
3 changed files with 276 additions and 11 deletions

View file

@ -6,14 +6,41 @@ import (
"sync"
)
// Event type constants. These match the EventType values from the events package.
const (
TypeDetection = "detection"
TypeZoneEntry = "zone_entry"
TypeZoneExit = "zone_exit"
TypePortalCrossing = "portal_crossing"
TypeTriggerFired = "trigger_fired"
TypeFallAlert = "fall_alert"
TypeAnomaly = "anomaly"
TypeSecurityAlert = "security_alert"
TypeNodeOnline = "node_online"
TypeNodeOffline = "node_offline"
TypeOTAUpdate = "ota_update"
TypeBaselineChanged = "baseline_changed"
TypeSystem = "system"
TypeLearningMilestone = "learning_milestone"
)
// Severity level constants.
const (
SeverityInfo = "info"
SeverityWarning = "warning"
SeverityAlert = "alert"
SeverityCritical = "critical"
)
// Event represents a timeline event published on the bus.
type Event struct {
Type string // detection, zone_entry, zone_exit, etc.
Zone string // optional zone name
Person string // optional person name (BLE-identified)
BlobID int // optional associated blob ID
Detail interface{} // optional detail payload (will be JSON-encoded by subscribers)
Severity string // info, warning, alert, critical
Type string // detection, zone_entry, zone_exit, etc.
TimestampMs int64 // Unix milliseconds timestamp
Zone string // optional zone name
Person string // optional person name (BLE-identified)
BlobID int // optional associated blob ID
Detail interface{} // optional detail payload (will be JSON-encoded by subscribers)
Severity string // info, warning, alert, critical
}
// Subscriber receives events published on the bus.
@ -21,6 +48,37 @@ type Event struct {
// persist to SQLite, broadcast to WebSocket, etc.
type Subscriber func(Event)
// defaultBus is the global default event bus.
// Packages can call Default() to get a shared bus instance.
var (
defaultBus *Bus
once sync.Once
)
// Default returns the global default event bus instance.
// It is safe to call from any goroutine.
func Default() *Bus {
once.Do(func() {
defaultBus = New()
})
return defaultBus
}
// PublishDefault is a convenience function that publishes to the default bus.
func PublishDefault(e Event) {
Default().Publish(e)
}
// PublishDefaultSync is a convenience function that publishes to the default bus synchronously.
func PublishDefaultSync(e Event) {
Default().PublishSync(e)
}
// SubscribeDefault is a convenience function that subscribes to the default bus.
func SubscribeDefault(fn Subscriber) {
Default().Subscribe(fn)
}
// Bus is an internal publish/subscribe mechanism for timeline events.
// It is safe for concurrent use.
type Bus struct {

View file

@ -4,6 +4,7 @@ import (
"sync"
"sync/atomic"
"testing"
"time"
)
func TestPublishSync(t *testing.T) {
@ -14,16 +15,16 @@ func TestPublishSync(t *testing.T) {
received = append(received, e)
})
bus.PublishSync(Event{Type: "detection", Zone: "Kitchen"})
bus.PublishSync(Event{Type: "zone_exit", Person: "Alice"})
bus.PublishSync(Event{Type: TypeDetection, Zone: "Kitchen"})
bus.PublishSync(Event{Type: TypeZoneExit, Person: "Alice"})
if len(received) != 2 {
t.Fatalf("expected 2 events, got %d", len(received))
}
if received[0].Type != "detection" || received[0].Zone != "Kitchen" {
if received[0].Type != TypeDetection || received[0].Zone != "Kitchen" {
t.Errorf("event 0 mismatch: %+v", received[0])
}
if received[1].Type != "zone_exit" || received[1].Person != "Alice" {
if received[1].Type != TypeZoneExit || received[1].Person != "Alice" {
t.Errorf("event 1 mismatch: %+v", received[1])
}
}
@ -71,3 +72,195 @@ func TestPublishNoSubscribers(t *testing.T) {
bus.PublishSync(Event{Type: "test"})
bus.Publish(Event{Type: "test"})
}
// TestEventTypes verifies all event type constants are defined.
func TestEventTypes(t *testing.T) {
tests := []struct {
name string
typ string
}{
{"Detection", TypeDetection},
{"ZoneEntry", TypeZoneEntry},
{"ZoneExit", TypeZoneExit},
{"PortalCrossing", TypePortalCrossing},
{"TriggerFired", TypeTriggerFired},
{"FallAlert", TypeFallAlert},
{"Anomaly", TypeAnomaly},
{"SecurityAlert", TypeSecurityAlert},
{"NodeOnline", TypeNodeOnline},
{"NodeOffline", TypeNodeOffline},
{"OTAUpdate", TypeOTAUpdate},
{"BaselineChanged", TypeBaselineChanged},
{"System", TypeSystem},
{"LearningMilestone", TypeLearningMilestone},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.typ == "" {
t.Errorf("event type %s is empty string", tt.name)
}
})
}
}
// TestSeverityConstants verifies all severity constants are defined.
func TestSeverityConstants(t *testing.T) {
tests := []struct {
name string
severity string
}{
{"Info", SeverityInfo},
{"Warning", SeverityWarning},
{"Alert", SeverityAlert},
{"Critical", SeverityCritical},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.severity == "" {
t.Errorf("severity %s is empty string", tt.name)
}
})
}
}
// TestEventFields verifies event struct fields work correctly.
func TestEventFields(t *testing.T) {
tests := []struct {
name string
event Event
check func(Event) error
}{
{
name: "full event",
event: Event{
Type: TypeDetection,
TimestampMs: 1234567890,
Zone: "Kitchen",
Person: "Alice",
BlobID: 42,
Detail: map[string]interface{}{"x": 1.0, "y": 2.0},
Severity: SeverityInfo,
},
check: func(e Event) error {
if e.Type != TypeDetection {
t.Errorf("Type = %v, want %v", e.Type, TypeDetection)
}
if e.TimestampMs != 1234567890 {
t.Errorf("TimestampMs = %v, want 1234567890", e.TimestampMs)
}
if e.Zone != "Kitchen" {
t.Errorf("Zone = %v, want Kitchen", e.Zone)
}
if e.Person != "Alice" {
t.Errorf("Person = %v, want Alice", e.Person)
}
if e.BlobID != 42 {
t.Errorf("BlobID = %v, want 42", e.BlobID)
}
if e.Severity != SeverityInfo {
t.Errorf("Severity = %v, want %v", e.Severity, SeverityInfo)
}
return nil
},
},
{
name: "minimal event",
event: Event{
Type: TypeSystem,
Severity: SeverityWarning,
},
check: func(e Event) error {
if e.Type != TypeSystem {
t.Errorf("Type = %v, want %v", e.Type, TypeSystem)
}
if e.Severity != SeverityWarning {
t.Errorf("Severity = %v, want %v", e.Severity, SeverityWarning)
}
return nil
},
},
{
name: "event with timestamp",
event: Event{
Type: TypeZoneEntry,
TimestampMs: time.Now().UnixMilli(),
Zone: "Hallway",
Person: "Bob",
},
check: func(e Event) error {
if e.Type != TypeZoneEntry {
t.Errorf("Type = %v, want %v", e.Type, TypeZoneEntry)
}
if e.TimestampMs == 0 {
t.Error("TimestampMs not set")
}
return nil
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.check != nil {
tt.check(tt.event)
}
// Verify event can be published and received intact
bus := New()
var received Event
bus.Subscribe(func(e Event) {
received = e
})
bus.PublishSync(tt.event)
if received.Type != tt.event.Type {
t.Errorf("received Type = %v, want %v", received.Type, tt.event.Type)
}
if received.Zone != tt.event.Zone {
t.Errorf("received Zone = %v, want %v", received.Zone, tt.event.Zone)
}
if received.Person != tt.event.Person {
t.Errorf("received Person = %v, want %v", received.Person, tt.event.Person)
}
if received.BlobID != tt.event.BlobID {
t.Errorf("received BlobID = %v, want %v", received.BlobID, tt.event.BlobID)
}
if received.Severity != tt.event.Severity {
t.Errorf("received Severity = %v, want %v", received.Severity, tt.event.Severity)
}
})
}
}
// TestSubscribeDuringPublish verifies subscriptions can be added safely.
func TestSubscribeDuringPublish(t *testing.T) {
bus := New()
var count int
var wg sync.WaitGroup
// Subscribe while publish is happening
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(10 * time.Millisecond)
bus.Subscribe(func(e Event) {
count++
})
}()
// Publish some events
for i := 0; i < 5; i++ {
bus.Publish(Event{Type: "test"})
time.Sleep(5 * time.Millisecond)
}
wg.Wait()
// Count depends on timing; just verify no panic/ deadlock
if count < 0 {
t.Errorf("count = %v, want >= 0", count)
}
}

View file

@ -9,6 +9,8 @@ import (
"time"
_ "modernc.org/sqlite"
"github.com/spaxel/mothership/internal/eventbus"
)
// EventType represents the type of an event.
@ -66,7 +68,7 @@ type QueryParams struct {
SearchQuery string // FTS5 search query
}
// InsertEvent inserts a new event into the database.
// InsertEvent inserts a new event into the database and publishes it to the event bus.
func InsertEvent(db *sql.DB, e Event) (int64, error) {
if e.TimestampMs == 0 {
e.TimestampMs = time.Now().UnixMilli()
@ -88,6 +90,18 @@ func InsertEvent(db *sql.DB, e Event) (int64, error) {
return 0, fmt.Errorf("get last insert id: %w", err)
}
// Publish to the internal event bus for WebSocket clients and other subscribers.
// This is non-blocking; subscribers run in separate goroutines.
eventbus.PublishDefault(eventbus.Event{
Type: string(e.Type),
TimestampMs: e.TimestampMs,
Zone: e.Zone,
Person: e.Person,
BlobID: e.BlobID,
Detail: e.DetailJSON, // Pass as string; subscribers can parse if needed
Severity: string(e.Severity),
})
return id, nil
}