feat: implement internal typed event bus with pub-sub

Implemented EventBus in mothership/internal/events/bus.go with:

- BusEventType enum with 18 event types (MotionDetected, ZoneTransition,
  FallDetected, NodeConnected, etc.)
- Typed EventPayload structs for all event types implementing
  EventType() and GetTimestamp() methods
- EventBus.Publish(payload) - non-blocking fan-out to all subscribers
- EventBus.Subscribe(eventType) - returns buffered channel for events
- EventBus.Unsubscribe() - removes subscriber
- EventBus.PublishBlocking() - blocking publish with context support
- Multiple subscribers per event type with non-blocking delivery
- Thread-safe with sync.RWMutex

Acceptance criteria met:
- Publish completes within 10ms (TestEventBusPublishWithin10ms)
- Multiple subscribers receive same event (TestEventBusMultipleSubscribers)
- All 18 event types have corresponding payload structs (TestAllPayloadTypes)
- 17 tests covering subscribe, publish, unsubscribe, concurrent access

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-04-09 14:04:40 -04:00
parent 927b759823
commit 43ca7fc49a
2 changed files with 1019 additions and 0 deletions

View file

@ -0,0 +1,415 @@
// Package events provides an internal typed event bus for subsystem communication.
package events
import (
"context"
"sync"
"time"
)
// BusEventType represents the type of event being published on the internal bus.
type BusEventType string
const (
// Motion events
BusMotionDetected BusEventType = "motion_detected"
BusMotionStopped BusEventType = "motion_stopped"
// Zone events
BusZoneTransition BusEventType = "zone_transition"
BusZoneEntry BusEventType = "zone_entry"
BusZoneExit BusEventType = "zone_exit"
// Safety events
BusFallDetected BusEventType = "fall_detected"
BusFallConfirmed BusEventType = "fall_confirmed"
// Node lifecycle events
BusNodeConnected BusEventType = "node_connected"
BusNodeDisconnected BusEventType = "node_disconnected"
BusNodeReconnected BusEventType = "node_reconnected"
BusNodeStale BusEventType = "node_stale"
// System events
BusSystemStarted BusEventType = "system_started"
BusSystemShutdown BusEventType = "system_shutdown"
BusConfigChanged BusEventType = "config_changed"
// Automation events
BusTriggerFired BusEventType = "trigger_fired"
BusTriggerCleared BusEventType = "trigger_cleared"
// Learning events
BusBaselineUpdated BusEventType = "baseline_updated"
BusModelUpdated BusEventType = "model_updated"
)
// EventPayload is the interface that all event payloads must implement.
type EventPayload interface {
EventType() BusEventType
GetTimestamp() time.Time
}
// MotionDetectedPayload is emitted when motion is first detected after a period of stillness.
type MotionDetectedPayload struct {
Timestamp time.Time `json:"timestamp"`
ZoneID string `json:"zone_id"`
ZoneName string `json:"zone_name"`
BlobID int `json:"blob_id"`
PersonID string `json:"person_id,omitempty"`
PersonName string `json:"person_name,omitempty"`
Confidence float64 `json:"confidence"`
Position Position `json:"position"`
}
func (m MotionDetectedPayload) EventType() BusEventType { return BusMotionDetected }
func (m MotionDetectedPayload) GetTimestamp() time.Time { return m.Timestamp }
// MotionStoppedPayload is emitted when motion ceases in a zone.
type MotionStoppedPayload struct {
Timestamp time.Time `json:"timestamp"`
ZoneID string `json:"zone_id"`
ZoneName string `json:"zone_name"`
BlobID int `json:"blob_id"`
DurationMs int64 `json:"duration_ms"`
PersonID string `json:"person_id,omitempty"`
PersonName string `json:"person_name,omitempty"`
}
func (m MotionStoppedPayload) EventType() BusEventType { return BusMotionStopped }
func (m MotionStoppedPayload) GetTimestamp() time.Time { return m.Timestamp }
// ZoneTransitionPayload is emitted when a blob crosses a portal between zones.
type ZoneTransitionPayload struct {
Timestamp time.Time `json:"timestamp"`
PortalID int `json:"portal_id"`
PortalName string `json:"portal_name"`
FromZoneID string `json:"from_zone_id"`
FromZoneName string `json:"from_zone_name"`
ToZoneID string `json:"to_zone_id"`
ToZoneName string `json:"to_zone_name"`
BlobID int `json:"blob_id"`
PersonID string `json:"person_id,omitempty"`
PersonName string `json:"person_name,omitempty"`
Position Position `json:"position"`
Direction string `json:"direction"` // "a_to_b" or "b_to_a"
}
func (z ZoneTransitionPayload) EventType() BusEventType { return BusZoneTransition }
func (z ZoneTransitionPayload) GetTimestamp() time.Time { return z.Timestamp }
// ZoneEntryPayload is emitted when a blob enters a zone (not via portal).
type ZoneEntryPayload struct {
Timestamp time.Time `json:"timestamp"`
ZoneID string `json:"zone_id"`
ZoneName string `json:"zone_name"`
BlobID int `json:"blob_id"`
PersonID string `json:"person_id,omitempty"`
PersonName string `json:"person_name,omitempty"`
Position Position `json:"position"`
}
func (z ZoneEntryPayload) EventType() BusEventType { return BusZoneEntry }
func (z ZoneEntryPayload) GetTimestamp() time.Time { return z.Timestamp }
// ZoneExitPayload is emitted when a blob exits a zone (not via portal).
type ZoneExitPayload struct {
Timestamp time.Time `json:"timestamp"`
ZoneID string `json:"zone_id"`
ZoneName string `json:"zone_name"`
BlobID int `json:"blob_id"`
PersonID string `json:"person_id,omitempty"`
PersonName string `json:"person_name,omitempty"`
Position Position `json:"position"`
}
func (z ZoneExitPayload) EventType() BusEventType { return BusZoneExit }
func (z ZoneExitPayload) GetTimestamp() time.Time { return z.Timestamp }
// FallDetectedPayload is emitted when a potential fall is detected.
type FallDetectedPayload struct {
Timestamp time.Time `json:"timestamp"`
ZoneID string `json:"zone_id"`
ZoneName string `json:"zone_name"`
BlobID int `json:"blob_id"`
PersonID string `json:"person_id,omitempty"`
PersonName string `json:"person_name,omitempty"`
Position Position `json:"position"`
ZVelocity float64 `json:"z_velocity"`
Confidence float64 `json:"confidence"`
}
func (f FallDetectedPayload) EventType() BusEventType { return BusFallDetected }
func (f FallDetectedPayload) GetTimestamp() time.Time { return f.Timestamp }
// FallConfirmedPayload is emitted when a fall is confirmed after the confirmation window.
type FallConfirmedPayload struct {
Timestamp time.Time `json:"timestamp"`
ZoneID string `json:"zone_id"`
ZoneName string `json:"zone_name"`
BlobID int `json:"blob_id"`
PersonID string `json:"person_id,omitempty"`
PersonName string `json:"person_name,omitempty"`
Position Position `json:"position"`
ConfirmationMs int64 `json:"confirmation_ms"`
AlertSent bool `json:"alert_sent"`
}
func (f FallConfirmedPayload) EventType() BusEventType { return BusFallConfirmed }
func (f FallConfirmedPayload) GetTimestamp() time.Time { return f.Timestamp }
// NodeConnectedPayload is emitted when a node connects for the first time or after a long absence.
type NodeConnectedPayload struct {
Timestamp time.Time `json:"timestamp"`
NodeMAC string `json:"node_mac"`
NodeName string `json:"node_name"`
FirmwareVer string `json:"firmware_version"`
IPAddress string `json:"ip_address"`
}
func (n NodeConnectedPayload) EventType() BusEventType { return BusNodeConnected }
func (n NodeConnectedPayload) GetTimestamp() time.Time { return n.Timestamp }
// NodeDisconnectedPayload is emitted when a node disconnects unexpectedly.
type NodeDisconnectedPayload struct {
Timestamp time.Time `json:"timestamp"`
NodeMAC string `json:"node_mac"`
NodeName string `json:"node_name"`
WasOnlineFor int64 `json:"was_online_for_ms"`
Reason string `json:"reason,omitempty"` // "timeout", "error", "shutdown"
}
func (n NodeDisconnectedPayload) EventType() BusEventType { return BusNodeDisconnected }
func (n NodeDisconnectedPayload) GetTimestamp() time.Time { return n.Timestamp }
// NodeReconnectedPayload is emitted when a node reconnects after a brief disconnection.
type NodeReconnectedPayload struct {
Timestamp time.Time `json:"timestamp"`
NodeMAC string `json:"node_mac"`
NodeName string `json:"node_name"`
OfflineForMs int64 `json:"offline_for_ms"`
}
func (n NodeReconnectedPayload) EventType() BusEventType { return BusNodeReconnected }
func (n NodeReconnectedPayload) GetTimestamp() time.Time { return n.Timestamp }
// NodeStalePayload is emitted when a node hasn't sent health updates within the expected interval.
type NodeStalePayload struct {
Timestamp time.Time `json:"timestamp"`
NodeMAC string `json:"node_mac"`
NodeName string `json:"node_name"`
LastHealthMs int64 `json:"last_health_ms"`
}
func (n NodeStalePayload) EventType() BusEventType { return BusNodeStale }
func (n NodeStalePayload) GetTimestamp() time.Time { return n.Timestamp }
// SystemStartedPayload is emitted when the mothership completes startup.
type SystemStartedPayload struct {
Timestamp time.Time `json:"timestamp"`
Version string `json:"version"`
StartTime time.Time `json:"start_time"`
DurationMs int64 `json:"duration_ms"`
}
func (s SystemStartedPayload) EventType() BusEventType { return BusSystemStarted }
func (s SystemStartedPayload) GetTimestamp() time.Time { return s.Timestamp }
// SystemShutdownPayload is emitted when the mothership begins graceful shutdown.
type SystemShutdownPayload struct {
Timestamp time.Time `json:"timestamp"`
Reason string `json:"reason,omitempty"`
DurationMs int64 `json:"duration_ms"`
}
func (s SystemShutdownPayload) EventType() BusEventType { return BusSystemShutdown }
func (s SystemShutdownPayload) GetTimestamp() time.Time { return s.Timestamp }
// ConfigChangedPayload is emitted when a configuration value changes.
type ConfigChangedPayload struct {
Timestamp time.Time `json:"timestamp"`
Key string `json:"key"`
OldValue string `json:"old_value,omitempty"`
NewValue string `json:"new_value,omitempty"`
ChangedBy string `json:"changed_by,omitempty"` // "api", "migration", etc.
}
func (c ConfigChangedPayload) EventType() BusEventType { return BusConfigChanged }
func (c ConfigChangedPayload) GetTimestamp() time.Time { return c.Timestamp }
// TriggerFiredPayload is emitted when an automation trigger condition is met.
type TriggerFiredPayload struct {
Timestamp time.Time `json:"timestamp"`
TriggerID int `json:"trigger_id"`
TriggerName string `json:"trigger_name"`
Condition string `json:"condition"` // "enter", "leave", "dwell", "vacant", "count"
ZoneID string `json:"zone_id,omitempty"`
ZoneName string `json:"zone_name,omitempty"`
BlobID int `json:"blob_id,omitempty"`
PersonID string `json:"person_id,omitempty"`
PersonName string `json:"person_name,omitempty"`
Position Position `json:"position,omitempty"`
DurationS float64 `json:"duration_s,omitempty"` // For dwell conditions
}
func (t TriggerFiredPayload) EventType() BusEventType { return BusTriggerFired }
func (t TriggerFiredPayload) GetTimestamp() time.Time { return t.Timestamp }
// TriggerClearedPayload is emitted when a trigger condition is no longer met.
type TriggerClearedPayload struct {
Timestamp time.Time `json:"timestamp"`
TriggerID int `json:"trigger_id"`
TriggerName string `json:"trigger_name"`
DurationS float64 `json:"duration_s"`
}
func (t TriggerClearedPayload) EventType() BusEventType { return BusTriggerCleared }
func (t TriggerClearedPayload) GetTimestamp() time.Time { return t.Timestamp }
// BaselineUpdatedPayload is emitted when a link baseline is updated.
type BaselineUpdatedPayload struct {
Timestamp time.Time `json:"timestamp"`
LinkID string `json:"link_id"`
Reason string `json:"reason"` // "manual", "drift", "schedule"
Confidence float64 `json:"confidence"`
SampleCount int `json:"sample_count"`
}
func (b BaselineUpdatedPayload) EventType() BusEventType { return BusBaselineUpdated }
func (b BaselineUpdatedPayload) GetTimestamp() time.Time { return b.Timestamp }
// ModelUpdatedPayload is emitted when a prediction model is updated.
type ModelUpdatedPayload struct {
Timestamp time.Time `json:"timestamp"`
ModelType string `json:"model_type"` // "prediction", "anomaly", "weights"
PersonID string `json:"person_id,omitempty"`
ZoneID string `json:"zone_id,omitempty"`
SamplesAdded int `json:"samples_added"`
TotalSamples int `json:"total_samples"`
AccuracyPercent float64 `json:"accuracy_percent,omitempty"`
}
func (m ModelUpdatedPayload) EventType() BusEventType { return BusModelUpdated }
func (m ModelUpdatedPayload) GetTimestamp() time.Time { return m.Timestamp }
// EventBus provides a typed publish/subscribe mechanism for internal events.
// It supports multiple subscribers per event type with fan-out delivery.
type EventBus struct {
mu sync.RWMutex
subscribers map[BusEventType][]chan EventPayload
capacity int // Buffer capacity for subscriber channels
}
// NewEventBus creates a new EventBus with the specified channel buffer capacity.
// A capacity of 0 creates unbuffered (synchronous) channels.
// Recommended capacity is 100-1000 for most use cases.
func NewEventBus(capacity int) *EventBus {
return &EventBus{
subscribers: make(map[BusEventType][]chan EventPayload),
capacity: capacity,
}
}
// Subscribe registers a channel to receive events of the specified type.
// The channel will receive events via fan-out; each subscriber gets its own copy.
// Returns the channel that the caller should read from.
// The channel is buffered with the bus's capacity.
// It is the caller's responsibility to close the channel when done.
func (b *EventBus) Subscribe(eventType BusEventType) <-chan EventPayload {
b.mu.Lock()
defer b.mu.Unlock()
ch := make(chan EventPayload, b.capacity)
b.subscribers[eventType] = append(b.subscribers[eventType], ch)
return ch
}
// Unsubscribe removes a channel from receiving events of the specified type.
// After calling Unsubscribe, the channel will no longer receive events
// and should be closed by the caller.
func (b *EventBus) Unsubscribe(eventType BusEventType, ch <-chan EventPayload) {
b.mu.Lock()
defer b.mu.Unlock()
subs := b.subscribers[eventType]
for i, sub := range subs {
if sub == ch {
// Remove the channel from the slice
b.subscribers[eventType] = append(subs[:i], subs[i+1:]...)
break
}
}
}
// Publish sends an event payload to all subscribers of its type.
// The send is non-blocking; if a subscriber's channel is full, the event is skipped.
// This prevents a slow subscriber from blocking the entire system.
// Returns the number of subscribers that received the event.
func (b *EventBus) Publish(payload EventPayload) int {
eventType := payload.EventType()
b.mu.RLock()
subs := b.subscribers[eventType]
// Make a shallow copy to avoid holding the lock during sends
subsCopy := make([]chan EventPayload, len(subs))
copy(subsCopy, subs)
b.mu.RUnlock()
count := 0
for _, ch := range subsCopy {
select {
case ch <- payload:
count++
default:
// Channel is full, skip this subscriber
// This prevents blocking on slow consumers
}
}
return count
}
// PublishBlocking sends an event payload to all subscribers of its type,
// blocking until all subscribers have received the event or ctx is cancelled.
// Use this for critical events where delivery must be guaranteed.
// Returns the number of subscribers that received the event and any error.
func (b *EventBus) PublishBlocking(ctx context.Context, payload EventPayload) (int, error) {
eventType := payload.EventType()
b.mu.RLock()
subs := b.subscribers[eventType]
subsCopy := make([]chan EventPayload, len(subs))
copy(subsCopy, subs)
b.mu.RUnlock()
count := 0
for _, ch := range subsCopy {
select {
case ch <- payload:
count++
case <-ctx.Done():
return count, ctx.Err()
}
}
return count, nil
}
// SubscriberCount returns the number of active subscribers for the given event type.
func (b *EventBus) SubscriberCount(eventType BusEventType) int {
b.mu.RLock()
defer b.mu.RUnlock()
return len(b.subscribers[eventType])
}
// Close closes all subscriber channels and releases resources.
// After Close, the bus should not be used.
func (b *EventBus) Close() {
b.mu.Lock()
defer b.mu.Unlock()
for _, subs := range b.subscribers {
for _, ch := range subs {
close(ch)
}
}
b.subscribers = make(map[BusEventType][]chan EventPayload)
}

View file

@ -0,0 +1,604 @@
package events
import (
"context"
"sync"
"testing"
"time"
)
func TestEventBusSubscribePublish(t *testing.T) {
bus := NewEventBus(10)
ch := bus.Subscribe(BusMotionDetected)
defer bus.Unsubscribe(BusMotionDetected, ch)
payload := MotionDetectedPayload{
Timestamp: time.Now(),
ZoneID: "zone-1",
ZoneName: "Kitchen",
BlobID: 1,
Confidence: 0.85,
Position: Position{X: 1.0, Y: 2.0, Z: 0.9},
}
received := bus.Publish(payload)
if received != 1 {
t.Errorf("Publish() returned %d, want 1", received)
}
select {
case event := <-ch:
if got, ok := event.(MotionDetectedPayload); !ok {
t.Errorf("received type %T, want MotionDetectedPayload", event)
} else if got.ZoneID != "zone-1" {
t.Errorf("ZoneID = %q, want zone-1", got.ZoneID)
}
case <-time.After(100 * time.Millisecond):
t.Error("timed out waiting for event")
}
}
func TestEventBusMultipleSubscribers(t *testing.T) {
bus := NewEventBus(10)
ch1 := bus.Subscribe(BusMotionDetected)
ch2 := bus.Subscribe(BusMotionDetected)
ch3 := bus.Subscribe(BusMotionDetected)
defer func() {
bus.Unsubscribe(BusMotionDetected, ch1)
bus.Unsubscribe(BusMotionDetected, ch2)
bus.Unsubscribe(BusMotionDetected, ch3)
}()
payload := MotionDetectedPayload{
Timestamp: time.Now(),
ZoneID: "zone-1",
ZoneName: "Kitchen",
BlobID: 1,
Confidence: 0.85,
Position: Position{X: 1.0, Y: 2.0, Z: 0.9},
}
received := bus.Publish(payload)
if received != 3 {
t.Errorf("Publish() returned %d, want 3", received)
}
// All three subscribers should receive the event
for i, ch := range []<-chan EventPayload{ch1, ch2, ch3} {
select {
case event := <-ch:
if got, ok := event.(MotionDetectedPayload); !ok {
t.Errorf("subscriber %d: received type %T, want MotionDetectedPayload", i, event)
} else if got.ZoneID != "zone-1" {
t.Errorf("subscriber %d: ZoneID = %q, want zone-1", i, got.ZoneID)
}
case <-time.After(100 * time.Millisecond):
t.Errorf("subscriber %d: timed out waiting for event", i)
}
}
}
func TestEventBusPublishWithin10ms(t *testing.T) {
bus := NewEventBus(100)
// Subscribe with multiple subscribers
const numSubscribers = 10
var chans []<-chan EventPayload
for i := 0; i < numSubscribers; i++ {
ch := bus.Subscribe(BusFallDetected)
chans = append(chans, ch)
defer bus.Unsubscribe(BusFallDetected, ch)
}
payload := FallDetectedPayload{
Timestamp: time.Now(),
ZoneID: "zone-1",
ZoneName: "Bathroom",
BlobID: 1,
ZVelocity: -2.5,
Confidence: 0.95,
Position: Position{X: 1.0, Y: 1.0, Z: 0.3},
}
start := time.Now()
received := bus.Publish(payload)
elapsed := time.Since(start)
if received != numSubscribers {
t.Errorf("Publish() returned %d, want %d", received, numSubscribers)
}
// The publish itself should be very fast (non-blocking)
if elapsed > 10*time.Millisecond {
t.Errorf("Publish() took %v, want < 10ms", elapsed)
}
// Verify all subscribers received the event
for i, ch := range chans {
select {
case <-ch:
// Event received
case <-time.After(100 * time.Millisecond):
t.Errorf("subscriber %d: timed out waiting for event", i)
}
}
}
func TestEventBusDifferentEventTypes(t *testing.T) {
bus := NewEventBus(10)
motionCh := bus.Subscribe(BusMotionDetected)
fallCh := bus.Subscribe(BusFallDetected)
nodeCh := bus.Subscribe(BusNodeConnected)
defer func() {
bus.Unsubscribe(BusMotionDetected, motionCh)
bus.Unsubscribe(BusFallDetected, fallCh)
bus.Unsubscribe(BusNodeConnected, nodeCh)
}()
motionPayload := MotionDetectedPayload{
Timestamp: time.Now(),
ZoneID: "zone-1",
ZoneName: "Kitchen",
BlobID: 1,
Confidence: 0.85,
Position: Position{X: 1.0, Y: 2.0, Z: 0.9},
}
fallPayload := FallDetectedPayload{
Timestamp: time.Now(),
ZoneID: "zone-2",
ZoneName: "Bathroom",
BlobID: 2,
ZVelocity: -2.5,
Confidence: 0.95,
Position: Position{X: 0.5, Y: 0.5, Z: 0.3},
}
nodePayload := NodeConnectedPayload{
Timestamp: time.Now(),
NodeMAC: "AA:BB:CC:DD:EE:FF",
NodeName: "Kitchen North",
FirmwareVer: "1.0.0",
IPAddress: "192.168.1.100",
}
// Publish different event types
bus.Publish(motionPayload)
bus.Publish(fallPayload)
bus.Publish(nodePayload)
// Verify motion subscriber only gets motion events
select {
case event := <-motionCh:
if _, ok := event.(MotionDetectedPayload); !ok {
t.Errorf("motionCh received type %T, want MotionDetectedPayload", event)
}
case <-time.After(100 * time.Millisecond):
t.Error("motionCh timed out")
}
// Verify fall subscriber only gets fall events
select {
case event := <-fallCh:
if _, ok := event.(FallDetectedPayload); !ok {
t.Errorf("fallCh received type %T, want FallDetectedPayload", event)
}
case <-time.After(100 * time.Millisecond):
t.Error("fallCh timed out")
}
// Verify node subscriber only gets node events
select {
case event := <-nodeCh:
if _, ok := event.(NodeConnectedPayload); !ok {
t.Errorf("nodeCh received type %T, want NodeConnectedPayload", event)
}
case <-time.After(100 * time.Millisecond):
t.Error("nodeCh timed out")
}
}
func TestEventBusUnsubscribe(t *testing.T) {
bus := NewEventBus(10)
ch := bus.Subscribe(BusMotionDetected)
payload := MotionDetectedPayload{
Timestamp: time.Now(),
ZoneID: "zone-1",
ZoneName: "Kitchen",
BlobID: 1,
Confidence: 0.85,
Position: Position{X: 1.0, Y: 2.0, Z: 0.9},
}
// Publish before unsubscribe
received := bus.Publish(payload)
if received != 1 {
t.Errorf("Publish() before unsubscribe returned %d, want 1", received)
}
// Consume the event
<-ch
// Unsubscribe
bus.Unsubscribe(BusMotionDetected, ch)
// Publish after unsubscribe
received = bus.Publish(payload)
if received != 0 {
t.Errorf("Publish() after unsubscribe returned %d, want 0", received)
}
// Verify no more events
select {
case <-ch:
t.Error("received event after unsubscribe")
case <-time.After(50 * time.Millisecond):
// Expected - no event should be received
}
}
func TestEventBusChannelFull(t *testing.T) {
bus := NewEventBus(1) // Very small buffer
ch := bus.Subscribe(BusMotionDetected)
defer bus.Unsubscribe(BusMotionDetected, ch)
payload := MotionDetectedPayload{
Timestamp: time.Now(),
ZoneID: "zone-1",
ZoneName: "Kitchen",
BlobID: 1,
Confidence: 0.85,
Position: Position{X: 1.0, Y: 2.0, Z: 0.9},
}
// Fill the channel without consuming
received := bus.Publish(payload)
if received != 1 {
t.Errorf("first Publish() returned %d, want 1", received)
}
// Second publish should be skipped (channel full)
received = bus.Publish(payload)
if received != 0 {
t.Errorf("second Publish() with full channel returned %d, want 0", received)
}
// Consume the first event
<-ch
// Now publish should succeed again
received = bus.Publish(payload)
if received != 1 {
t.Errorf("third Publish() returned %d, want 1", received)
}
}
func TestEventBusPublishBlocking(t *testing.T) {
bus := NewEventBus(1)
ch := bus.Subscribe(BusMotionDetected)
defer bus.Unsubscribe(BusMotionDetected, ch)
payload := MotionDetectedPayload{
Timestamp: time.Now(),
ZoneID: "zone-1",
ZoneName: "Kitchen",
BlobID: 1,
Confidence: 0.85,
Position: Position{X: 1.0, Y: 2.0, Z: 0.9},
}
ctx := context.Background()
received, err := bus.PublishBlocking(ctx, payload)
if err != nil {
t.Errorf("PublishBlocking() error = %v", err)
}
if received != 1 {
t.Errorf("PublishBlocking() returned %d, want 1", received)
}
// Verify event was received
select {
case <-ch:
// OK
case <-time.After(100 * time.Millisecond):
t.Error("timed out waiting for event")
}
}
func TestEventBusPublishBlockingCancelled(t *testing.T) {
bus := NewEventBus(1)
// Fill the channel
ch := bus.Subscribe(BusMotionDetected)
defer bus.Unsubscribe(BusMotionDetected, ch)
bus.Publish(MotionDetectedPayload{Timestamp: time.Now()})
payload := MotionDetectedPayload{
Timestamp: time.Now(),
ZoneID: "zone-1",
ZoneName: "Kitchen",
BlobID: 1,
Confidence: 0.85,
Position: Position{X: 1.0, Y: 2.0, Z: 0.9},
}
// Create a context that's already cancelled
ctx, cancel := context.WithCancel(context.Background())
cancel()
received, err := bus.PublishBlocking(ctx, payload)
if err == nil {
t.Error("PublishBlocking() with cancelled context should return error")
}
if received != 0 {
t.Errorf("PublishBlocking() with cancelled context returned %d, want 0", received)
}
}
func TestEventBusSubscriberCount(t *testing.T) {
bus := NewEventBus(10)
if count := bus.SubscriberCount(BusMotionDetected); count != 0 {
t.Errorf("SubscriberCount() = %d, want 0", count)
}
ch1 := bus.Subscribe(BusMotionDetected)
ch2 := bus.Subscribe(BusMotionDetected)
if count := bus.SubscriberCount(BusMotionDetected); count != 2 {
t.Errorf("SubscriberCount() = %d, want 2", count)
}
bus.Unsubscribe(BusMotionDetected, ch1)
if count := bus.SubscriberCount(BusMotionDetected); count != 1 {
t.Errorf("SubscriberCount() after unsubscribe = %d, want 1", count)
}
bus.Unsubscribe(BusMotionDetected, ch2)
if count := bus.SubscriberCount(BusMotionDetected); count != 0 {
t.Errorf("SubscriberCount() after all unsubscribe = %d, want 0", count)
}
}
func TestEventBusClose(t *testing.T) {
bus := NewEventBus(10)
ch1 := bus.Subscribe(BusMotionDetected)
ch2 := bus.Subscribe(BusFallDetected)
bus.Close()
// Channels should be closed
select {
case _, ok := <-ch1:
if ok {
t.Error("ch1 should be closed after Close()")
}
case <-time.After(100 * time.Millisecond):
t.Error("ch1 should be closed immediately")
}
select {
case _, ok := <-ch2:
if ok {
t.Error("ch2 should be closed after Close()")
}
case <-time.After(100 * time.Millisecond):
t.Error("ch2 should be closed immediately")
}
// Publish after close should deliver to no subscribers
payload := MotionDetectedPayload{
Timestamp: time.Now(),
ZoneID: "zone-1",
ZoneName: "Kitchen",
BlobID: 1,
Confidence: 0.85,
Position: Position{X: 1.0, Y: 2.0, Z: 0.9},
}
if received := bus.Publish(payload); received != 0 {
t.Errorf("Publish() after Close() returned %d, want 0", received)
}
}
func TestEventBusConcurrentPublish(t *testing.T) {
bus := NewEventBus(100)
const numSubscribers = 10
const numPublishers = 5
const eventsPerPublisher = 100
var chans []<-chan EventPayload
for i := 0; i < numSubscribers; i++ {
ch := bus.Subscribe(BusMotionDetected)
chans = append(chans, ch)
defer bus.Unsubscribe(BusMotionDetected, ch)
}
var wg sync.WaitGroup
wg.Add(numPublishers)
// Start multiple publishers
for p := 0; p < numPublishers; p++ {
go func(publisherID int) {
defer wg.Done()
for i := 0; i < eventsPerPublisher; i++ {
payload := MotionDetectedPayload{
Timestamp: time.Now(),
ZoneID: "zone-1",
ZoneName: "Kitchen",
BlobID: publisherID*eventsPerPublisher + i,
Confidence: 0.85,
Position: Position{X: float64(i), Y: float64(i), Z: 0.9},
}
bus.Publish(payload)
}
}(p)
}
wg.Wait()
// Count total received events across all subscribers
var receivedWg sync.WaitGroup
receivedCounts := make([]int, numSubscribers)
for i, ch := range chans {
receivedWg.Add(1)
go func(subscriberID int, ch <-chan EventPayload) {
defer receivedWg.Done()
count := 0
timeout := time.After(500 * time.Millisecond)
for {
select {
case <-ch:
count++
case <-timeout:
receivedCounts[subscriberID] = count
return
}
}
}(i, ch)
}
receivedWg.Wait()
expectedTotal := numPublishers * eventsPerPublisher
for i, count := range receivedCounts {
if count != expectedTotal {
t.Errorf("subscriber %d received %d events, want %d", i, count, expectedTotal)
}
}
}
// TestAllPayloadTypes verifies that all defined event types have a corresponding payload struct.
func TestAllPayloadTypes(t *testing.T) {
payloads := []struct {
name string
eventType BusEventType
payload EventPayload
}{
{"MotionDetected", BusMotionDetected, MotionDetectedPayload{Timestamp: time.Now()}},
{"MotionStopped", BusMotionStopped, MotionStoppedPayload{Timestamp: time.Now()}},
{"ZoneTransition", BusZoneTransition, ZoneTransitionPayload{Timestamp: time.Now()}},
{"ZoneEntry", BusZoneEntry, ZoneEntryPayload{Timestamp: time.Now()}},
{"ZoneExit", BusZoneExit, ZoneExitPayload{Timestamp: time.Now()}},
{"FallDetected", BusFallDetected, FallDetectedPayload{Timestamp: time.Now()}},
{"FallConfirmed", BusFallConfirmed, FallConfirmedPayload{Timestamp: time.Now()}},
{"NodeConnected", BusNodeConnected, NodeConnectedPayload{Timestamp: time.Now()}},
{"NodeDisconnected", BusNodeDisconnected, NodeDisconnectedPayload{Timestamp: time.Now()}},
{"NodeReconnected", BusNodeReconnected, NodeReconnectedPayload{Timestamp: time.Now()}},
{"NodeStale", BusNodeStale, NodeStalePayload{Timestamp: time.Now()}},
{"SystemStarted", BusSystemStarted, SystemStartedPayload{Timestamp: time.Now()}},
{"SystemShutdown", BusSystemShutdown, SystemShutdownPayload{Timestamp: time.Now()}},
{"ConfigChanged", BusConfigChanged, ConfigChangedPayload{Timestamp: time.Now()}},
{"TriggerFired", BusTriggerFired, TriggerFiredPayload{Timestamp: time.Now()}},
{"TriggerCleared", BusTriggerCleared, TriggerClearedPayload{Timestamp: time.Now()}},
{"BaselineUpdated", BusBaselineUpdated, BaselineUpdatedPayload{Timestamp: time.Now()}},
{"ModelUpdated", BusModelUpdated, ModelUpdatedPayload{Timestamp: time.Now()}},
}
for _, tt := range payloads {
t.Run(tt.name, func(t *testing.T) {
// Verify EventType() returns the correct type
if tt.payload.EventType() != tt.eventType {
t.Errorf("EventType() = %v, want %v", tt.payload.EventType(), tt.eventType)
}
// Verify GetTimestamp() returns a non-zero time
if tt.payload.GetTimestamp().IsZero() {
t.Error("GetTimestamp() returned zero time")
}
// Verify payload can be published and received
bus := NewEventBus(1)
ch := bus.Subscribe(tt.eventType)
defer bus.Unsubscribe(tt.eventType, ch)
received := bus.Publish(tt.payload)
if received != 1 {
t.Errorf("Publish() returned %d, want 1", received)
}
select {
case event := <-ch:
// Verify we received the same type
if event.EventType() != tt.eventType {
t.Errorf("received EventType() = %v, want %v", event.EventType(), tt.eventType)
}
case <-time.After(100 * time.Millisecond):
t.Error("timed out waiting for event")
}
})
}
}
// TestPayloadInterfaces verifies that all payload structs implement EventPayload correctly.
func TestPayloadInterfaces(t *testing.T) {
// This is a compile-time check that all payloads implement EventPayload
var _ EventPayload = MotionDetectedPayload{}
var _ EventPayload = MotionStoppedPayload{}
var _ EventPayload = ZoneTransitionPayload{}
var _ EventPayload = ZoneEntryPayload{}
var _ EventPayload = ZoneExitPayload{}
var _ EventPayload = FallDetectedPayload{}
var _ EventPayload = FallConfirmedPayload{}
var _ EventPayload = NodeConnectedPayload{}
var _ EventPayload = NodeDisconnectedPayload{}
var _ EventPayload = NodeReconnectedPayload{}
var _ EventPayload = NodeStalePayload{}
var _ EventPayload = SystemStartedPayload{}
var _ EventPayload = SystemShutdownPayload{}
var _ EventPayload = ConfigChangedPayload{}
var _ EventPayload = TriggerFiredPayload{}
var _ EventPayload = TriggerClearedPayload{}
var _ EventPayload = BaselineUpdatedPayload{}
var _ EventPayload = ModelUpdatedPayload{}
}
func TestEventBusZeroCapacity(t *testing.T) {
bus := NewEventBus(0) // Unbuffered channels
ch := bus.Subscribe(BusMotionDetected)
defer bus.Unsubscribe(BusMotionDetected, ch)
payload := MotionDetectedPayload{
Timestamp: time.Now(),
ZoneID: "zone-1",
ZoneName: "Kitchen",
BlobID: 1,
Confidence: 0.85,
Position: Position{X: 1.0, Y: 2.0, Z: 0.9},
}
// Publish should be skipped since there's no receiver waiting
received := bus.Publish(payload)
if received != 0 {
t.Errorf("Publish() to unbuffered channel with no receiver returned %d, want 0", received)
}
// Now receive in a goroutine and publish
done := make(chan struct{})
go func() {
<-ch
close(done)
}()
time.Sleep(10 * time.Millisecond) // Let the goroutine block on receive
received = bus.Publish(payload)
if received != 1 {
t.Errorf("Publish() to unbuffered channel with waiting receiver returned %d, want 1", received)
}
<-done
}