zai-proxy/dashboard/storage/storage_test.go
jedarden dee82a76a3 chore: update module paths and add evaluation package
- proxy/go.mod: github.com/ardenone/zai-proxy → git.ardenone.com/jedarden/zai-proxy
- dashboard/go.mod: github.com/ardenone/ardenone-cluster/containers/zai-proxy-dashboard → git.ardenone.com/jedarden/zai-proxy/dashboard
- Update all Go import paths in proxy/ and dashboard/ to match new module paths
- Add proxy/evaluation/ package (was missing from initial commit)
- Add docs/plan/plan.md with architecture, security model, telemetry design, and migration checklist

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-16 16:03:50 -04:00

400 lines
9 KiB
Go

package storage
import (
"database/sql"
"path/filepath"
"testing"
"time"
_ "modernc.org/sqlite"
"git.ardenone.com/jedarden/zai-proxy/dashboard/model"
)
func TestStorage_WriteAndRead(t *testing.T) {
// Create temporary database
tmpDir := t.TempDir()
dbPath := filepath.Join(tmpDir, "test.db")
config := Config{
DBPath: dbPath,
Retention5s: 24 * time.Hour,
Retention1m: 7 * 24 * time.Hour,
}
store, err := NewStorage(config)
if err != nil {
t.Fatalf("failed to create storage: %v", err)
}
defer store.Close()
// Write a snapshot
snapshot := &model.MetricSnapshot{
Timestamp: time.Now().UnixMilli(),
Variant: "production",
Requests2xx: 100,
Requests4xx: 10,
Requests5xx: 5,
ReqRate: 2.5,
LatencyP50: 150.0,
WorkerUtilization: 0.75,
}
store.Write(snapshot)
// Wait for write to complete
time.Sleep(100 * time.Millisecond)
// Read it back
snapshots, err := store.QueryRange(1*time.Hour, "production")
if err != nil {
t.Fatalf("failed to query: %v", err)
}
if len(snapshots) != 1 {
t.Fatalf("expected 1 snapshot, got %d", len(snapshots))
}
read := snapshots[0]
if read.Variant != "production" {
t.Errorf("variant mismatch: got %s", read.Variant)
}
if read.Requests2xx != 100 {
t.Errorf("requests_2xx mismatch: got %f", read.Requests2xx)
}
if read.ReqRate != 2.5 {
t.Errorf("req_rate mismatch: got %f", read.ReqRate)
}
}
func TestStorage_RangeQuery(t *testing.T) {
tmpDir := t.TempDir()
dbPath := filepath.Join(tmpDir, "test.db")
config := Config{
DBPath: dbPath,
Retention5s: 24 * time.Hour,
Retention1m: 7 * 24 * time.Hour,
}
store, err := NewStorage(config)
if err != nil {
t.Fatalf("failed to create storage: %v", err)
}
defer store.Close()
now := time.Now()
// Write multiple snapshots at different times
for i := 0; i < 10; i++ {
snapshot := &model.MetricSnapshot{
Timestamp: now.Add(-time.Duration(i) * time.Minute).UnixMilli(),
Variant: "production",
ReqRate: float64(10 - i),
}
store.Write(snapshot)
}
time.Sleep(100 * time.Millisecond)
// Query for last 5 minutes
snapshots, err := store.QueryRange(5*time.Minute, "production")
if err != nil {
t.Fatalf("failed to query: %v", err)
}
// Should get snapshots from last 5 minutes (0-4 inclusive)
if len(snapshots) < 5 {
t.Errorf("expected at least 5 snapshots, got %d", len(snapshots))
}
}
func TestStorage_VariantFilter(t *testing.T) {
tmpDir := t.TempDir()
dbPath := filepath.Join(tmpDir, "test.db")
config := Config{
DBPath: dbPath,
Retention5s: 24 * time.Hour,
Retention1m: 7 * 24 * time.Hour,
}
store, err := NewStorage(config)
if err != nil {
t.Fatalf("failed to create storage: %v", err)
}
defer store.Close()
now := time.Now()
// Write snapshots for both variants
store.Write(&model.MetricSnapshot{
Timestamp: now.UnixMilli(),
Variant: "production",
ReqRate: 100,
})
store.Write(&model.MetricSnapshot{
Timestamp: now.UnixMilli(),
Variant: "canary",
ReqRate: 50,
})
time.Sleep(100 * time.Millisecond)
// Query only production
prodSnapshots, err := store.QueryRange(1*time.Hour, "production")
if err != nil {
t.Fatalf("failed to query: %v", err)
}
for _, s := range prodSnapshots {
if s.Variant != "production" {
t.Errorf("expected only production variant, got %s", s.Variant)
}
}
// Query all variants
allSnapshots, err := store.QueryRange(1*time.Hour, "all")
if err != nil {
t.Fatalf("failed to query: %v", err)
}
if len(allSnapshots) < 2 {
t.Errorf("expected at least 2 snapshots for all variants, got %d", len(allSnapshots))
}
}
func TestStorage_GetLatest(t *testing.T) {
tmpDir := t.TempDir()
dbPath := filepath.Join(tmpDir, "test.db")
config := Config{
DBPath: dbPath,
Retention5s: 24 * time.Hour,
Retention1m: 7 * 24 * time.Hour,
}
store, err := NewStorage(config)
if err != nil {
t.Fatalf("failed to create storage: %v", err)
}
defer store.Close()
now := time.Now()
// Write multiple snapshots with different timestamps
store.Write(&model.MetricSnapshot{
Timestamp: now.Add(-2 * time.Minute).UnixMilli(),
Variant: "production",
ReqRate: 100,
})
store.Write(&model.MetricSnapshot{
Timestamp: now.Add(-1 * time.Minute).UnixMilli(),
Variant: "production",
ReqRate: 200,
})
store.Write(&model.MetricSnapshot{
Timestamp: now.UnixMilli(),
Variant: "production",
ReqRate: 300,
})
time.Sleep(100 * time.Millisecond)
latest, err := store.GetLatest()
if err != nil {
t.Fatalf("failed to get latest: %v", err)
}
prod, ok := latest["production"]
if !ok {
t.Fatal("production variant not found")
}
if prod.ReqRate != 300 {
t.Errorf("expected latest req_rate 300, got %f", prod.ReqRate)
}
}
func TestStorage_Downsampling(t *testing.T) {
tmpDir := t.TempDir()
dbPath := filepath.Join(tmpDir, "test.db")
config := Config{
DBPath: dbPath,
Retention5s: 24 * time.Hour,
Retention1m: 7 * 24 * time.Hour,
}
store, err := NewStorage(config)
if err != nil {
t.Fatalf("failed to create storage: %v", err)
}
defer store.Close()
// Write snapshots that span multiple minutes
now := time.Now()
for i := 0; i < 120; i++ {
snapshot := &model.MetricSnapshot{
Timestamp: now.Add(-time.Duration(i) * time.Second).UnixMilli(),
Variant: "production",
ReqRate: float64(i),
}
store.Write(snapshot)
}
// Trigger downsampling manually
time.Sleep(100 * time.Millisecond)
if err := store.Downsample(); err != nil {
t.Fatalf("downsampling failed: %v", err)
}
// Query from 1m table
end := time.Now()
start := end.Add(-2 * time.Minute)
snapshots, err := store.Query(start, end, "production", true)
if err != nil {
t.Fatalf("failed to query 1m table: %v", err)
}
// Should have downsampled data
if len(snapshots) < 1 {
t.Error("expected at least 1 downsampled snapshot")
}
}
func TestStorage_Retention(t *testing.T) {
tmpDir := t.TempDir()
dbPath := filepath.Join(tmpDir, "test.db")
config := Config{
DBPath: dbPath,
Retention5s: 1 * time.Hour,
Retention1m: 24 * time.Hour,
}
store, err := NewStorage(config)
if err != nil {
t.Fatalf("failed to create storage: %v", err)
}
defer store.Close()
now := time.Now()
// Write old snapshot (should be deleted)
oldSnapshot := &model.MetricSnapshot{
Timestamp: now.Add(-2 * time.Hour).UnixMilli(),
Variant: "production",
ReqRate: 50,
}
store.Write(oldSnapshot)
// Write recent snapshot (should be kept)
recentSnapshot := &model.MetricSnapshot{
Timestamp: now.Add(-30 * time.Minute).UnixMilli(),
Variant: "production",
ReqRate: 100,
}
store.Write(recentSnapshot)
time.Sleep(100 * time.Millisecond)
// Trigger cleanup
if err := store.Cleanup(); err != nil {
t.Fatalf("cleanup failed: %v", err)
}
// Query all - should only have recent snapshot
snapshots, err := store.QueryRange(3*time.Hour, "production")
if err != nil {
t.Fatalf("failed to query: %v", err)
}
if len(snapshots) != 1 {
t.Errorf("expected 1 snapshot after cleanup, got %d", len(snapshots))
}
}
func TestStorage_ConcurrentWrites(t *testing.T) {
tmpDir := t.TempDir()
dbPath := filepath.Join(tmpDir, "test.db")
config := Config{
DBPath: dbPath,
Retention5s: 24 * time.Hour,
Retention1m: 7 * 24 * time.Hour,
}
store, err := NewStorage(config)
if err != nil {
t.Fatalf("failed to create storage: %v", err)
}
defer store.Close()
// Write many snapshots concurrently
done := make(chan bool)
for i := 0; i < 100; i++ {
go func(idx int) {
snapshot := &model.MetricSnapshot{
Timestamp: time.Now().Add(time.Duration(idx) * time.Second).UnixMilli(),
Variant: "production",
ReqRate: float64(idx),
}
store.Write(snapshot)
done <- true
}(i)
}
// Wait for all writes
for i := 0; i < 100; i++ {
<-done
}
time.Sleep(500 * time.Millisecond)
// Should have all snapshots
snapshots, err := store.QueryRange(1*time.Hour, "production")
if err != nil {
t.Fatalf("failed to query: %v", err)
}
if len(snapshots) < 100 {
t.Errorf("expected at least 100 snapshots, got %d", len(snapshots))
}
}
func TestSchema_Initialize(t *testing.T) {
tmpDir := t.TempDir()
dbPath := filepath.Join(tmpDir, "test.db")
// Create database manually
db, err := sql.Open("sqlite", dbPath)
if err != nil {
t.Fatalf("failed to open database: %v", err)
}
defer db.Close()
schema := NewSchema(db)
if err := schema.Initialize(); err != nil {
t.Fatalf("failed to initialize schema: %v", err)
}
// Verify tables exist
var count int
err = db.QueryRow(`SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name IN ('metrics_5s', 'metrics_1m')`).Scan(&count)
if err != nil {
t.Fatalf("failed to query tables: %v", err)
}
if count != 2 {
t.Errorf("expected 2 tables, got %d", count)
}
// Verify indexes exist
err = db.QueryRow(`SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name IN ('idx_5s_ts', 'idx_1m_ts')`).Scan(&count)
if err != nil {
t.Fatalf("failed to query indexes: %v", err)
}
if count != 2 {
t.Errorf("expected 2 indexes, got %d", count)
}
}