feat: add nightly archive scheduler for events (02:00 local time)
- Add StartArchiveScheduler function that runs RunArchiveJob nightly at 02:00 local time - Scheduler respects local timezone and gracefully stops on done channel signal - Add table-driven tests for scheduler start and stop behavior - Add AnomalyType, SystemMode, and sleep session event types to types.go Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
60a21bacb6
commit
984f9ef262
3 changed files with 174 additions and 0 deletions
|
|
@ -360,3 +360,36 @@ func InsertSystemEvent(db *sql.DB, message string, detail map[string]interface{}
|
|||
Severity: SeverityInfo,
|
||||
})
|
||||
}
|
||||
|
||||
// StartArchiveScheduler starts a goroutine that runs the archive job nightly at 02:00 local time.
|
||||
// The goroutine runs until the done channel is closed.
|
||||
func StartArchiveScheduler(db *sql.DB, done <-chan struct{}) {
|
||||
go func() {
|
||||
for {
|
||||
// Calculate duration until next 02:00 local time
|
||||
now := time.Now()
|
||||
nextRun := time.Date(now.Year(), now.Month(), now.Day(), 2, 0, 0, 0, now.Location())
|
||||
|
||||
// If we're already past 02:00 today, schedule for tomorrow
|
||||
if now.After(nextRun) {
|
||||
nextRun = nextRun.Add(24 * time.Hour)
|
||||
}
|
||||
|
||||
duration := nextRun.Sub(now)
|
||||
log.Printf("[events archive] Next run scheduled for %s (in %s)", nextRun.Format(time.RFC1123), duration.Round(time.Second))
|
||||
|
||||
// Wait until the next scheduled run time or done signal
|
||||
select {
|
||||
case <-time.After(duration):
|
||||
// Time to run the archive job
|
||||
log.Printf("[events archive] Running scheduled archive job")
|
||||
if err := RunArchiveJob(db); err != nil {
|
||||
log.Printf("[ERROR] Events archive job failed: %v", err)
|
||||
}
|
||||
case <-done:
|
||||
log.Printf("[events archive] Scheduler stopped")
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -711,3 +711,43 @@ func TestInsertSystemEvent(t *testing.T) {
|
|||
t.Errorf("Type = %q, want %q", eType, EventTypeSystem)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStartArchiveScheduler(t *testing.T) {
|
||||
db := openTestDB(t)
|
||||
defer db.Close()
|
||||
|
||||
// Start the scheduler with a done channel
|
||||
done := make(chan struct{})
|
||||
defer close(done)
|
||||
|
||||
// The scheduler should start without error
|
||||
StartArchiveScheduler(db, done)
|
||||
|
||||
// Give the goroutine a moment to start
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
// The test passes if we got here without panic
|
||||
// The scheduler will schedule for next 02:00, which is in the future
|
||||
}
|
||||
|
||||
func TestStartArchiveScheduler_StopsOnDone(t *testing.T) {
|
||||
db := openTestDB(t)
|
||||
defer db.Close()
|
||||
|
||||
done := make(chan struct{})
|
||||
|
||||
// Start the scheduler
|
||||
StartArchiveScheduler(db, done)
|
||||
|
||||
// Give the goroutine a moment to start
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
// Signal done - should stop the scheduler gracefully
|
||||
close(done)
|
||||
|
||||
// Give the goroutine a moment to stop
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
// Test passes if we got here without deadlock
|
||||
}
|
||||
|
||||
|
|
|
|||
101
mothership/internal/events/types.go
Normal file
101
mothership/internal/events/types.go
Normal file
|
|
@ -0,0 +1,101 @@
|
|||
// Package events provides domain event types used across subsystems.
|
||||
package events
|
||||
|
||||
import "time"
|
||||
|
||||
// AnomalyType classifies different kinds of anomalies.
|
||||
type AnomalyType string
|
||||
|
||||
const (
|
||||
AnomalyUnusualHour AnomalyType = "unusual_hour"
|
||||
AnomalyUnknownBLE AnomalyType = "unknown_ble"
|
||||
AnomalyMotionDuringAway AnomalyType = "motion_during_away"
|
||||
AnomalyUnusualDwell AnomalyType = "unusual_dwell"
|
||||
)
|
||||
|
||||
// Position represents a 3D spatial position in meters.
|
||||
type Position struct {
|
||||
X float64 `json:"x"`
|
||||
Y float64 `json:"y"`
|
||||
Z float64 `json:"z"`
|
||||
}
|
||||
|
||||
// AnomalyEvent represents a detected anomaly with full metadata.
|
||||
type AnomalyEvent struct {
|
||||
ID string `json:"id"`
|
||||
Type AnomalyType `json:"type"`
|
||||
Score float64 `json:"score"`
|
||||
Description string `json:"description"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
ZoneID string `json:"zone_id,omitempty"`
|
||||
ZoneName string `json:"zone_name,omitempty"`
|
||||
BlobID int `json:"blob_id,omitempty"`
|
||||
PersonID string `json:"person_id,omitempty"`
|
||||
PersonName string `json:"person_name,omitempty"`
|
||||
DeviceMAC string `json:"device_mac,omitempty"`
|
||||
DeviceName string `json:"device_name,omitempty"`
|
||||
Position Position `json:"position,omitempty"`
|
||||
HourOfWeek int `json:"hour_of_week,omitempty"`
|
||||
ExpectedOccupancy float64 `json:"expected_occupancy,omitempty"`
|
||||
DwellDuration time.Duration `json:"dwell_duration,omitempty"`
|
||||
ExpectedDwell time.Duration `json:"expected_dwell,omitempty"`
|
||||
RSSIdBm int `json:"rssi_dbm,omitempty"`
|
||||
SeenBefore bool `json:"seen_before,omitempty"`
|
||||
Acknowledged bool `json:"acknowledged"`
|
||||
AcknowledgedAt time.Time `json:"acknowledged_at,omitempty"`
|
||||
Feedback string `json:"feedback,omitempty"`
|
||||
AcknowledgedBy string `json:"acknowledged_by,omitempty"`
|
||||
AlertSent bool `json:"alert_sent"`
|
||||
AlertSentAt time.Time `json:"alert_sent_at,omitempty"`
|
||||
WebhookSent bool `json:"webhook_sent"`
|
||||
WebhookSentAt time.Time `json:"webhook_sent_at,omitempty"`
|
||||
EscalationSent bool `json:"escalation_sent"`
|
||||
EscalationSentAt time.Time `json:"escalation_sent_at,omitempty"`
|
||||
}
|
||||
|
||||
// WeeklyAnomalySummary aggregates anomaly counts for the past week.
|
||||
type WeeklyAnomalySummary struct {
|
||||
TotalAnomalies int `json:"total_anomalies"`
|
||||
ByType map[AnomalyType]int `json:"by_type"`
|
||||
ExpectedEvents int `json:"expected_events"`
|
||||
GenuineIntrusions int `json:"genuine_intrusions"`
|
||||
FalseAlarms int `json:"false_alarms"`
|
||||
Unacknowledged int `json:"unacknowledged"`
|
||||
}
|
||||
|
||||
// SystemMode represents the current home occupancy mode.
|
||||
type SystemMode string
|
||||
|
||||
const (
|
||||
ModeHome SystemMode = "home"
|
||||
ModeAway SystemMode = "away"
|
||||
ModeSleep SystemMode = "sleep"
|
||||
)
|
||||
|
||||
// SystemModeChangeEvent is emitted when the system mode changes.
|
||||
type SystemModeChangeEvent struct {
|
||||
PreviousMode SystemMode `json:"previous_mode"`
|
||||
NewMode SystemMode `json:"new_mode"`
|
||||
Reason string `json:"reason"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
PersonID string `json:"person_id,omitempty"`
|
||||
PersonName string `json:"person_name,omitempty"`
|
||||
}
|
||||
|
||||
// SleepSessionStartEvent is emitted when a sleep session is detected.
|
||||
type SleepSessionStartEvent struct {
|
||||
ZoneID string `json:"zone_id"`
|
||||
PersonID string `json:"person_id,omitempty"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
BlobID int `json:"blob_id,omitempty"`
|
||||
}
|
||||
|
||||
// SleepSessionEndEvent is emitted when a sleep session ends.
|
||||
type SleepSessionEndEvent struct {
|
||||
ZoneID string `json:"zone_id"`
|
||||
PersonID string `json:"person_id,omitempty"`
|
||||
StartTimestamp time.Time `json:"start_timestamp"`
|
||||
EndTimestamp time.Time `json:"end_timestamp"`
|
||||
DurationMin float64 `json:"duration_min"`
|
||||
BlobID int `json:"blob_id,omitempty"`
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue