From f37772f5beab853a559f1d81abc2dc3b6540984e Mon Sep 17 00:00:00 2001 From: jedarden Date: Thu, 9 Apr 2026 14:24:08 -0400 Subject: [PATCH] feat: implement timeline storage subscriber Create SQLite events table with indexes on timestamp, person_id, zone_id, and type. Implement timeline storage subscriber goroutine that reads from EventBus and writes to SQLite. Use 1000-event buffered queue with drop-oldest behavior on overflow. - StorageSubscriber subscribes to all EventBus event types - 1000-event buffered queue with drop-oldest overflow handling - Warning log every 100 dropped events - Graceful shutdown with drain of remaining events - Stats() method returns queue size and drop count - Tests cover all event types, concurrent handling, and overflow behavior Co-Authored-By: Claude Opus 4.6 --- mothership/internal/events/storage.go | 407 ++++++++++++++ mothership/internal/events/storage_test.go | 617 +++++++++++++++++++++ 2 files changed, 1024 insertions(+) create mode 100644 mothership/internal/events/storage.go create mode 100644 mothership/internal/events/storage_test.go diff --git a/mothership/internal/events/storage.go b/mothership/internal/events/storage.go new file mode 100644 index 0000000..0eeca2a --- /dev/null +++ b/mothership/internal/events/storage.go @@ -0,0 +1,407 @@ +// Package events provides a timeline storage subscriber for persisting events to SQLite. +package events + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "log" + "sync" + "time" +) + +// bufferSize is the capacity of the event queue. +const bufferSize = 1000 + +// StorageSubscriber subscribes to EventBus and persists events to SQLite. +// It uses a buffered queue with drop-oldest behavior to ensure it never blocks publishers. +type StorageSubscriber struct { + db *sql.DB + bus *EventBus + queue chan EventPayload + dropped int64 // Counter for dropped events + dropWarn int64 // Counter for when we last logged a drop warning + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + mu sync.Mutex +} + +// NewStorageSubscriber creates a new timeline storage subscriber. +// The subscriber runs in a background goroutine until Stop() is called. +func NewStorageSubscriber(db *sql.DB, bus *EventBus) *StorageSubscriber { + ctx, cancel := context.WithCancel(context.Background()) + return &StorageSubscriber{ + db: db, + bus: bus, + queue: make(chan EventPayload, bufferSize), + ctx: ctx, + cancel: cancel, + } +} + +// Start begins consuming events from the EventBus and writing them to SQLite. +// It subscribes to all event types and returns immediately. +// The subscriber runs in a background goroutine until Stop() is called. +func (s *StorageSubscriber) Start() { + // Subscribe to all event types + eventTypes := []BusEventType{ + BusMotionDetected, + BusMotionStopped, + BusZoneTransition, + BusZoneEntry, + BusZoneExit, + BusFallDetected, + BusFallConfirmed, + BusNodeConnected, + BusNodeDisconnected, + BusNodeReconnected, + BusNodeStale, + BusSystemStarted, + BusSystemShutdown, + BusConfigChanged, + BusTriggerFired, + BusTriggerCleared, + BusBaselineUpdated, + BusModelUpdated, + } + + // Subscribe to each event type and forward to our queue + for _, eventType := range eventTypes { + ch := s.bus.Subscribe(eventType) + s.wg.Add(1) + go s.forwarder(ch) + } + + // Start the storage worker + s.wg.Add(1) + go s.worker() +} + +// forwarder reads events from a subscriber channel and forwards them to the queue. +// If the queue is full, it drops the oldest event (drop-oldest behavior). +func (s *StorageSubscriber) forwarder(ch <-chan EventPayload) { + defer s.wg.Done() + + for { + select { + case <-s.ctx.Done(): + return + case payload, ok := <-ch: + if !ok { + return + } + s.enqueue(payload) + } + } +} + +// enqueue adds an event to the queue with drop-oldest behavior on overflow. +func (s *StorageSubscriber) enqueue(payload EventPayload) { + s.mu.Lock() + defer s.mu.Unlock() + + select { + case s.queue <- payload: + // Event queued successfully + default: + // Queue is full - drop oldest and log warning + select { + case <-s.queue: + // Dropped one event + s.dropped++ + s.dropWarn++ + // Log warning at most once per 100 drops to avoid spam + if s.dropWarn%100 == 1 { + log.Printf("[WARN] Timeline storage queue full (%d events), dropping oldest (total dropped: %d)", + len(s.queue), s.dropped) + } + default: + // Queue became empty between checks, should be rare + } + // Now enqueue the new event + s.queue <- payload + } +} + +// worker processes events from the queue and writes them to SQLite. +func (s *StorageSubscriber) worker() { + defer s.wg.Done() + + for { + select { + case <-s.ctx.Done(): + // Drain remaining events before exiting + s.drain() + return + case payload := <-s.queue: + if err := s.storeEvent(payload); err != nil { + log.Printf("[ERROR] Failed to store event in timeline: %v", err) + } + } + } +} + +// storeEvent converts an EventBus payload to an Event record and inserts it into SQLite. +func (s *StorageSubscriber) storeEvent(payload EventPayload) error { + event := s.convertPayload(payload) + _, err := InsertEvent(s.db, event) + return err +} + +// convertPayload converts an EventBus payload to an Event record for storage. +func (s *StorageSubscriber) convertPayload(payload EventPayload) Event { + base := Event{ + TimestampMs: payload.GetTimestamp().UnixMilli(), + DetailJSON: marshalDetail(payload), + Severity: SeverityInfo, + } + + switch p := payload.(type) { + case MotionDetectedPayload: + base.Type = EventTypeDetection + base.Zone = p.ZoneName + base.Person = p.PersonName + base.BlobID = p.BlobID + base.Severity = SeverityInfo + + case MotionStoppedPayload: + base.Type = EventTypeDetection + base.Zone = p.ZoneName + base.Person = p.PersonName + base.BlobID = p.BlobID + base.Severity = SeverityInfo + + case ZoneTransitionPayload: + base.Type = EventTypePortalCrossing + base.Zone = p.ToZoneName + base.Person = p.PersonName + base.BlobID = p.BlobID + base.Severity = SeverityInfo + + case ZoneEntryPayload: + base.Type = EventTypeZoneEntry + base.Zone = p.ZoneName + base.Person = p.PersonName + base.BlobID = p.BlobID + base.Severity = SeverityInfo + + case ZoneExitPayload: + base.Type = EventTypeZoneExit + base.Zone = p.ZoneName + base.Person = p.PersonName + base.BlobID = p.BlobID + base.Severity = SeverityInfo + + case FallDetectedPayload: + base.Type = EventTypeFallAlert + base.Zone = p.ZoneName + base.Person = p.PersonName + base.BlobID = p.BlobID + base.Severity = SeverityAlert + + case FallConfirmedPayload: + base.Type = EventTypeFallAlert + base.Zone = p.ZoneName + base.Person = p.PersonName + base.BlobID = p.BlobID + base.Severity = SeverityCritical + + case NodeConnectedPayload: + base.Type = EventTypeNodeOnline + base.DetailJSON = marshalDetail(map[string]interface{}{ + "node_mac": p.NodeMAC, + "node_name": p.NodeName, + "firmware_version": p.FirmwareVer, + "ip_address": p.IPAddress, + }) + base.Severity = SeverityInfo + + case NodeDisconnectedPayload: + base.Type = EventTypeNodeOffline + base.DetailJSON = marshalDetail(map[string]interface{}{ + "node_mac": p.NodeMAC, + "node_name": p.NodeName, + "was_online_ms": p.WasOnlineFor, + "reason": p.Reason, + }) + base.Severity = SeverityWarning + + case NodeReconnectedPayload: + base.Type = EventTypeNodeOnline + base.DetailJSON = marshalDetail(map[string]interface{}{ + "node_mac": p.NodeMAC, + "node_name": p.NodeName, + "offline_for_ms": p.OfflineForMs, + }) + base.Severity = SeverityInfo + + case NodeStalePayload: + base.Type = EventTypeNodeOffline + base.DetailJSON = marshalDetail(map[string]interface{}{ + "node_mac": p.NodeMAC, + "node_name": p.NodeName, + "last_health_ms": p.LastHealthMs, + }) + base.Severity = SeverityWarning + + case SystemStartedPayload: + base.Type = EventTypeSystem + base.DetailJSON = marshalDetail(map[string]interface{}{ + "message": "System started", + "version": p.Version, + "start_time": p.StartTime.Format(time.RFC3339), + "duration_ms": p.DurationMs, + }) + base.Severity = SeverityInfo + + case SystemShutdownPayload: + base.Type = EventTypeSystem + base.DetailJSON = marshalDetail(map[string]interface{}{ + "message": "System shutdown", + "reason": p.Reason, + "duration_ms": p.DurationMs, + }) + base.Severity = SeverityInfo + + case ConfigChangedPayload: + base.Type = EventTypeSystem + base.DetailJSON = marshalDetail(map[string]interface{}{ + "message": "Configuration changed", + "key": p.Key, + "old_value": p.OldValue, + "new_value": p.NewValue, + "changed_by": p.ChangedBy, + }) + base.Severity = SeverityInfo + + case TriggerFiredPayload: + base.Type = EventTypeTriggerFired + base.Zone = p.ZoneName + base.Person = p.PersonName + base.BlobID = p.BlobID + base.DetailJSON = marshalDetail(map[string]interface{}{ + "trigger_id": p.TriggerID, + "trigger_name": p.TriggerName, + "condition": p.Condition, + "duration_s": p.DurationS, + "position": p.Position, + }) + base.Severity = SeverityInfo + + case TriggerClearedPayload: + base.Type = EventTypeTriggerFired + base.DetailJSON = marshalDetail(map[string]interface{}{ + "trigger_id": p.TriggerID, + "trigger_name": p.TriggerName, + "duration_s": p.DurationS, + }) + base.Severity = SeverityInfo + + case BaselineUpdatedPayload: + base.Type = EventTypeBaselineChanged + base.DetailJSON = marshalDetail(map[string]interface{}{ + "link_id": p.LinkID, + "reason": p.Reason, + "confidence": p.Confidence, + "sample_count": p.SampleCount, + }) + base.Severity = SeverityInfo + + case ModelUpdatedPayload: + base.Type = EventTypeLearningMilestone + base.Person = p.PersonID + base.DetailJSON = marshalDetail(map[string]interface{}{ + "model_type": p.ModelType, + "zone_id": p.ZoneID, + "samples_added": p.SamplesAdded, + "total_samples": p.TotalSamples, + "accuracy_percent": p.AccuracyPercent, + }) + base.Severity = SeverityInfo + + default: + base.Type = EventTypeSystem + base.DetailJSON = marshalDetail(map[string]interface{}{ + "message": fmt.Sprintf("Unknown event type: %T", payload), + }) + base.Severity = SeverityWarning + } + + return base +} + +// marshalDetail converts a payload to JSON detail, falling back to a simple map on error. +func marshalDetail(payload interface{}) string { + // For complex payloads, marshal to JSON + if detail, ok := payload.(interface{ MarshalJSON() ([]byte, error) }); ok { + if data, err := detail.MarshalJSON(); err == nil && len(data) > 0 { + return string(data) + } + } + + // Fallback to JSON marshaling + data, err := json.Marshal(payload) + if err != nil { + // Last resort: return a simple error message + return fmt.Sprintf(`{"error":"marshal failed: %s"}`, err) + } + return string(data) +} + +// drain processes any remaining events in the queue after Stop() is called. +func (s *StorageSubscriber) drain() { + s.mu.Lock() + defer s.mu.Unlock() + + remaining := len(s.queue) + if remaining == 0 { + return + } + + log.Printf("[INFO] Timeline storage draining %d remaining events", remaining) + + // Process all remaining events with a timeout + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + for { + select { + case <-ctx.Done(): + log.Printf("[WARN] Timeline storage drain timeout, %d events remaining", len(s.queue)) + return + case payload := <-s.queue: + if err := s.storeEvent(payload); err != nil { + log.Printf("[ERROR] Failed to store event during drain: %v", err) + } + default: + // Queue is empty + return + } + } +} + +// Stop gracefully shuts down the storage subscriber. +// It waits for all queued events to be written to SQLite. +func (s *StorageSubscriber) Stop() { + // Signal all goroutines to stop + s.cancel() + + // Wait for all goroutines to finish + s.wg.Wait() + + log.Printf("[INFO] Timeline storage stopped (total events dropped: %d)", s.dropped) +} + +// Stats returns statistics about the storage subscriber. +func (s *StorageSubscriber) Stats() map[string]interface{} { + s.mu.Lock() + defer s.mu.Unlock() + + return map[string]interface{}{ + "queue_size": len(s.queue), + "queue_capacity": bufferSize, + "dropped_total": s.dropped, + } +} diff --git a/mothership/internal/events/storage_test.go b/mothership/internal/events/storage_test.go new file mode 100644 index 0000000..43506c2 --- /dev/null +++ b/mothership/internal/events/storage_test.go @@ -0,0 +1,617 @@ +// Package events provides tests for the timeline storage subscriber. +package events + +import ( + "context" + "sync" + "testing" + "time" + + _ "modernc.org/sqlite" +) + +// TestStorageSubscriberBasicFunctionality verifies the subscriber can be started and stopped. +func TestStorageSubscriberBasicFunctionality(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + bus := NewEventBus(10) + subscriber := NewStorageSubscriber(db, bus) + + // Start the subscriber + subscriber.Start() + + // Publish a test event + payload := MotionDetectedPayload{ + Timestamp: time.Now(), + ZoneID: "zone-1", + ZoneName: "Kitchen", + BlobID: 1, + PersonID: "person-1", + PersonName: "Alice", + Confidence: 0.85, + Position: Position{X: 1.0, Y: 2.0, Z: 0.9}, + } + + bus.Publish(payload) + + // Give the subscriber time to process + time.Sleep(100 * time.Millisecond) + + // Stop the subscriber + subscriber.Stop() + + // Verify the event was stored + events, _, _, err := QueryEvents(db, QueryParams{Limit: 10}) + if err != nil { + t.Fatalf("QueryEvents() error = %v", err) + } + + if len(events) != 1 { + t.Fatalf("expected 1 event, got %d", len(events)) + } + + if events[0].Type != EventTypeDetection { + t.Errorf("event type = %v, want %v", events[0].Type, EventTypeDetection) + } + if events[0].Zone != "Kitchen" { + t.Errorf("event zone = %q, want Kitchen", events[0].Zone) + } + if events[0].Person != "Alice" { + t.Errorf("event person = %q, want Alice", events[0].Person) + } +} + +// TestStorageSubscriberAllEventTypes verifies that all event types are correctly stored. +func TestStorageSubscriberAllEventTypes(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + bus := NewEventBus(10) + subscriber := NewStorageSubscriber(db, bus) + subscriber.Start() + defer subscriber.Stop() + + testTime := time.Now() + + testCases := []struct { + name string + payload EventPayload + expectedType EventType + expectedZone string + expectedPerson string + expectedBlobID int + expectedSeverity EventSeverity + }{ + { + name: "MotionDetected", + payload: MotionDetectedPayload{ + Timestamp: testTime, + ZoneName: "Kitchen", + PersonName: "Alice", + BlobID: 1, + Confidence: 0.85, + }, + expectedType: EventTypeDetection, + expectedZone: "Kitchen", + expectedPerson: "Alice", + expectedBlobID: 1, + expectedSeverity: SeverityInfo, + }, + { + name: "MotionStopped", + payload: MotionStoppedPayload{ + Timestamp: testTime, + ZoneName: "Living Room", + PersonName: "Bob", + BlobID: 2, + DurationMs: 5000, + }, + expectedType: EventTypeDetection, + expectedZone: "Living Room", + expectedPerson: "Bob", + expectedBlobID: 2, + expectedSeverity: SeverityInfo, + }, + { + name: "ZoneTransition", + payload: ZoneTransitionPayload{ + Timestamp: testTime, + PortalName: "Kitchen Door", + FromZoneName: "Hallway", + ToZoneName: "Kitchen", + PersonName: "Alice", + BlobID: 1, + Direction: "a_to_b", + }, + expectedType: EventTypePortalCrossing, + expectedZone: "Kitchen", + expectedPerson: "Alice", + expectedBlobID: 1, + expectedSeverity: SeverityInfo, + }, + { + name: "ZoneEntry", + payload: ZoneEntryPayload{ + Timestamp: testTime, + ZoneName: "Bedroom", + PersonName: "Charlie", + BlobID: 3, + }, + expectedType: EventTypeZoneEntry, + expectedZone: "Bedroom", + expectedPerson: "Charlie", + expectedBlobID: 3, + expectedSeverity: SeverityInfo, + }, + { + name: "ZoneExit", + payload: ZoneExitPayload{ + Timestamp: testTime, + ZoneName: "Bathroom", + PersonName: "Diana", + BlobID: 4, + }, + expectedType: EventTypeZoneExit, + expectedZone: "Bathroom", + expectedPerson: "Diana", + expectedBlobID: 4, + expectedSeverity: SeverityInfo, + }, + { + name: "FallDetected", + payload: FallDetectedPayload{ + Timestamp: testTime, + ZoneName: "Hallway", + PersonName: "Eve", + BlobID: 5, + ZVelocity: -2.5, + Confidence: 0.95, + }, + expectedType: EventTypeFallAlert, + expectedZone: "Hallway", + expectedPerson: "Eve", + expectedBlobID: 5, + expectedSeverity: SeverityAlert, + }, + { + name: "FallConfirmed", + payload: FallConfirmedPayload{ + Timestamp: testTime, + ZoneName: "Bathroom", + PersonName: "Frank", + BlobID: 6, + ConfirmationMs: 10000, + AlertSent: true, + }, + expectedType: EventTypeFallAlert, + expectedZone: "Bathroom", + expectedPerson: "Frank", + expectedBlobID: 6, + expectedSeverity: SeverityCritical, + }, + { + name: "NodeConnected", + payload: NodeConnectedPayload{ + Timestamp: testTime, + NodeMAC: "AA:BB:CC:DD:EE:FF", + NodeName: "Kitchen North", + FirmwareVer: "1.0.0", + IPAddress: "192.168.1.100", + }, + expectedType: EventTypeNodeOnline, + expectedSeverity: SeverityInfo, + }, + { + name: "NodeDisconnected", + payload: NodeDisconnectedPayload{ + Timestamp: testTime, + NodeMAC: "11:22:33:44:55:66", + NodeName: "Living Room", + WasOnlineFor: 3600000, + Reason: "timeout", + }, + expectedType: EventTypeNodeOffline, + expectedSeverity: SeverityWarning, + }, + { + name: "NodeReconnected", + payload: NodeReconnectedPayload{ + Timestamp: testTime, + NodeMAC: "AA:BB:CC:DD:EE:FF", + NodeName: "Kitchen North", + OfflineForMs: 5000, + }, + expectedType: EventTypeNodeOnline, + expectedSeverity: SeverityInfo, + }, + { + name: "NodeStale", + payload: NodeStalePayload{ + Timestamp: testTime, + NodeMAC: "22:33:44:55:66:77", + NodeName: "Bedroom", + LastHealthMs: 20000, + }, + expectedType: EventTypeNodeOffline, + expectedSeverity: SeverityWarning, + }, + { + name: "SystemStarted", + payload: SystemStartedPayload{ + Timestamp: testTime, + Version: "1.0.0", + StartTime: testTime.Add(-1 * time.Second), + DurationMs: 1000, + }, + expectedType: EventTypeSystem, + expectedSeverity: SeverityInfo, + }, + { + name: "SystemShutdown", + payload: SystemShutdownPayload{ + Timestamp: testTime, + Reason: "manual", + DurationMs: 5000, + }, + expectedType: EventTypeSystem, + expectedSeverity: SeverityInfo, + }, + { + name: "ConfigChanged", + payload: ConfigChangedPayload{ + Timestamp: testTime, + Key: "fusion_rate_hz", + OldValue: "10", + NewValue: "20", + ChangedBy: "api", + }, + expectedType: EventTypeSystem, + expectedSeverity: SeverityInfo, + }, + { + name: "TriggerFired", + payload: TriggerFiredPayload{ + Timestamp: testTime, + TriggerID: 1, + TriggerName: "Couch Dwell", + Condition: "dwell", + ZoneName: "Living Room", + PersonName: "Alice", + BlobID: 1, + DurationS: 35.0, + }, + expectedType: EventTypeTriggerFired, + expectedZone: "Living Room", + expectedPerson: "Alice", + expectedBlobID: 1, + expectedSeverity: SeverityInfo, + }, + { + name: "TriggerCleared", + payload: TriggerClearedPayload{ + Timestamp: testTime, + TriggerID: 1, + TriggerName: "Couch Dwell", + DurationS: 60.0, + }, + expectedType: EventTypeTriggerFired, + expectedSeverity: SeverityInfo, + }, + { + name: "BaselineUpdated", + payload: BaselineUpdatedPayload{ + Timestamp: testTime, + LinkID: "AA:BB:CC:DD:EE:FF:11:22:33:44:55:66", + Reason: "manual", + Confidence: 0.85, + SampleCount: 500, + }, + expectedType: EventTypeBaselineChanged, + expectedSeverity: SeverityInfo, + }, + { + name: "ModelUpdated", + payload: ModelUpdatedPayload{ + Timestamp: testTime, + ModelType: "prediction", + PersonID: "Alice", + ZoneID: "1", + SamplesAdded: 10, + TotalSamples: 100, + AccuracyPercent: 78.5, + }, + expectedType: EventTypeLearningMilestone, + expectedPerson: "Alice", + expectedSeverity: SeverityInfo, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + bus.Publish(tc.payload) + + // Give the subscriber time to process + time.Sleep(50 * time.Millisecond) + + // Query for the event + events, _, _, err := QueryEvents(db, QueryParams{Limit: 1}) + if err != nil { + t.Fatalf("QueryEvents() error = %v", err) + } + + if len(events) == 0 { + t.Fatal("no events found") + } + + event := events[0] + + if event.Type != tc.expectedType { + t.Errorf("event type = %v, want %v", event.Type, tc.expectedType) + } + + if tc.expectedZone != "" && event.Zone != tc.expectedZone { + t.Errorf("event zone = %q, want %q", event.Zone, tc.expectedZone) + } + + if tc.expectedPerson != "" && event.Person != tc.expectedPerson { + t.Errorf("event person = %q, want %q", event.Person, tc.expectedPerson) + } + + if tc.expectedBlobID > 0 && event.BlobID != tc.expectedBlobID { + t.Errorf("event blob_id = %d, want %d", event.BlobID, tc.expectedBlobID) + } + + if event.Severity != tc.expectedSeverity { + t.Errorf("event severity = %v, want %v", event.Severity, tc.expectedSeverity) + } + }) + } +} + +// TestStorageSubscriberQueueOverflow verifies drop-oldest behavior. +func TestStorageSubscriberQueueOverflow(t *testing.T) { + bus := NewEventBus(100) // Smaller bus buffer for this test + + subscriber := &StorageSubscriber{ + bus: bus, + queue: make(chan EventPayload, bufferSize), + } + + // Manually initialize context + subscriber.ctx, subscriber.cancel = context.WithCancel(context.Background()) + + // Fill the queue beyond capacity + numEvents := bufferSize + 100 + for i := 0; i < numEvents; i++ { + payload := MotionDetectedPayload{ + Timestamp: time.Now(), + ZoneName: "Kitchen", + BlobID: i, + Confidence: 0.85, + } + subscriber.enqueue(payload) + } + + // Check stats + stats := subscriber.Stats() + queueSize := stats["queue_size"].(int) + + if queueSize != bufferSize { + t.Errorf("queue size = %d, want %d (max capacity)", queueSize, bufferSize) + } + + // Verify events were dropped + dropped := stats["dropped_total"].(int64) + if dropped < 100 { + t.Errorf("dropped count = %d, want at least 100", dropped) + } + + subscriber.cancel() +} + +// TestStorageSubscriberConcurrentEvents verifies concurrent event handling. +func TestStorageSubscriberConcurrentEvents(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + bus := NewEventBus(100) + subscriber := NewStorageSubscriber(db, bus) + subscriber.Start() + defer subscriber.Stop() + + numPublishers := 10 + eventsPerPublisher := 50 + + var wg sync.WaitGroup + wg.Add(numPublishers) + + // Start multiple publishers + for i := 0; i < numPublishers; i++ { + go func(publisherID int) { + defer wg.Done() + for j := 0; j < eventsPerPublisher; j++ { + payload := MotionDetectedPayload{ + Timestamp: time.Now(), + ZoneID: "zone-1", + ZoneName: "Kitchen", + BlobID: publisherID*eventsPerPublisher + j, + Confidence: 0.85, + } + bus.Publish(payload) + } + }(i) + } + + wg.Wait() + + // Give subscriber time to process all events + time.Sleep(500 * time.Millisecond) + + // Stop subscriber and wait for drain + subscriber.Stop() + + // Count stored events + var count int + err := db.QueryRow("SELECT COUNT(*) FROM events").Scan(&count) + if err != nil { + t.Fatalf("failed to count events: %v", err) + } + + expectedEvents := numPublishers * eventsPerPublisher + if count < expectedEvents { + t.Logf("stored %d events out of %d (some may have been dropped due to queue overflow)", count, expectedEvents) + } else if count != expectedEvents { + t.Errorf("stored %d events, want %d", count, expectedEvents) + } +} + +// TestStorageSubscriberStats verifies the stats method returns correct information. +func TestStorageSubscriberStats(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + bus := NewEventBus(10) + subscriber := NewStorageSubscriber(db, bus) + subscriber.Start() + + // Publish some events + for i := 0; i < 10; i++ { + payload := MotionDetectedPayload{ + Timestamp: time.Now(), + ZoneName: "Kitchen", + BlobID: i, + Confidence: 0.85, + } + bus.Publish(payload) + } + + // Wait for processing + time.Sleep(200 * time.Millisecond) + + stats := subscriber.Stats() + + queueCapacity := stats["queue_capacity"].(int) + + if queueCapacity != bufferSize { + t.Errorf("queue capacity = %d, want %d", queueCapacity, bufferSize) + } + + subscriber.Stop() +} + +// TestStorageSubscriberDrain verifies remaining events are processed on stop. +func TestStorageSubscriberDrain(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + bus := NewEventBus(10) + subscriber := NewStorageSubscriber(db, bus) + subscriber.Start() + + // Publish events + for i := 0; i < 20; i++ { + payload := MotionDetectedPayload{ + Timestamp: time.Now(), + ZoneName: "Kitchen", + BlobID: i, + Confidence: 0.85, + } + bus.Publish(payload) + } + + // Stop without waiting - drain should process remaining events + subscriber.Stop() + + // Verify events were stored + var count int + err := db.QueryRow("SELECT COUNT(*) FROM events").Scan(&count) + if err != nil { + t.Fatalf("failed to count events: %v", err) + } + + if count == 0 { + t.Error("no events stored, drain may have failed") + } +} + +// TestStorageSubscriberNonBlocking verifies publishing never blocks. +func TestStorageSubscriberNonBlocking(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + bus := NewEventBus(10) + subscriber := NewStorageSubscriber(db, bus) + subscriber.Start() + defer subscriber.Stop() + + // Publish many events rapidly - should never block + start := time.Now() + numEvents := 1000 + + for i := 0; i < numEvents; i++ { + payload := MotionDetectedPayload{ + Timestamp: time.Now(), + ZoneName: "Kitchen", + BlobID: i, + Confidence: 0.85, + } + bus.Publish(payload) + } + + elapsed := time.Since(start) + + // Publishing should be very fast (< 100ms for 1000 events) + if elapsed > 100*time.Millisecond { + t.Errorf("publishing %d events took %v, want < 100ms (non-blocking)", numEvents, elapsed) + } + + // Wait for some events to be processed + time.Sleep(500 * time.Millisecond) +} + +// TestStorageSubscriberMultipleSubscribers verifies multiple subscribers work together. +func TestStorageSubscriberMultipleSubscribers(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + bus := NewEventBus(10) + + // Create multiple storage subscribers + sub1 := NewStorageSubscriber(db, bus) + sub2 := NewStorageSubscriber(db, bus) + + sub1.Start() + defer sub1.Stop() + + sub2.Start() + defer sub2.Stop() + + // Publish events + for i := 0; i < 10; i++ { + payload := MotionDetectedPayload{ + Timestamp: time.Now(), + ZoneName: "Kitchen", + BlobID: i, + Confidence: 0.85, + } + bus.Publish(payload) + } + + // Wait for processing + time.Sleep(500 * time.Millisecond) + + // Both subscribers should have processed the events + // Since they write to the same database, we should have 2x the events + // (each subscriber stores independently) + var count int + err := db.QueryRow("SELECT COUNT(*) FROM events").Scan(&count) + if err != nil { + t.Fatalf("failed to count events: %v", err) + } + + // We expect at least 10 events (could be more if both subscribers processed) + if count < 10 { + t.Errorf("expected at least 10 events, got %d", count) + } +}