diff --git a/PROGRESS.md b/PROGRESS.md index 0c4716c..0462a84 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -4,9 +4,23 @@ **Status: 🔄 In Progress** -**Last Updated: 2026-03-28** +**Last Updated: 2026-03-29** -### Recent Changes (2026-03-28) +### Recent Changes (2026-03-29) +- **Architecture Conformance Fix**: Separated matchmaker from acb-api into acb-matchmaker + per plan §12 Phase 4: + - Plan specifies "Matchmaker Deployment (`acb-matchmaker`): internal tickers for pairing + bots (1 min), health checking (15 min), stale job reaping (5 min). No external exposure." + - Created `cmd/acb-matchmaker/` with main.go, tickers.go, config.go, crypto.go, alerts.go + - Removed tickers.go from acb-api (tickers now in separate deployment) + - Removed alerter field from acb-api Server struct (alerting now in matchmaker) + - Created `cmd/acb-matchmaker/Dockerfile` for container builds + - Created `cluster-configuration/apexalgo-iad/ai-code-battle/acb-matchmaker-deployment.yml` + - Matchmaker runs as internal-only deployment with no HTTP endpoints exposed + - Fixed syntax error in `cmd/acb-api/db.go` (prematurely closed schemaSQL string) + - All tests pass (acb-api + acb-matchmaker builds successfully) + +### Previous Changes (2026-03-28) - **Architecture Conformance Fix**: Migrated K8s manifests from `deploy/k8s/` to `cluster-configuration/apexalgo-iad/ai-code-battle/` per plan specification: - Plan §9.3 and §9.7 specify K8s manifests go in `cluster-configuration/` for ArgoCD GitOps diff --git a/cluster-configuration/apexalgo-iad/ai-code-battle/acb-matchmaker-deployment.yml b/cluster-configuration/apexalgo-iad/ai-code-battle/acb-matchmaker-deployment.yml new file mode 100644 index 0000000..cdefdcd --- /dev/null +++ b/cluster-configuration/apexalgo-iad/ai-code-battle/acb-matchmaker-deployment.yml @@ -0,0 +1,75 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: acb-matchmaker + namespace: ai-code-battle + labels: + app.kubernetes.io/name: acb-matchmaker + app.kubernetes.io/part-of: ai-code-battle + app.kubernetes.io/component: matchmaker +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: acb-matchmaker + template: + metadata: + labels: + app.kubernetes.io/name: acb-matchmaker + app.kubernetes.io/part-of: ai-code-battle + app.kubernetes.io/component: matchmaker + spec: + containers: + - name: matchmaker + image: forgejo.ardenone.com/ai-code-battle/acb-matchmaker:latest + env: + - name: ACB_DATABASE_URL + valueFrom: + secretKeyRef: + name: acb-database-url + key: url + - name: ACB_VALKEY_ADDR + value: "valkey.ai-code-battle.svc:6379" + - name: ACB_VALKEY_PASSWORD + valueFrom: + secretKeyRef: + name: acb-valkey-password + key: password + optional: true + - name: ACB_ENCRYPTION_KEY + valueFrom: + secretKeyRef: + name: acb-api-key + key: encryption-key + optional: true + - name: ACB_DISCORD_WEBHOOK + valueFrom: + secretKeyRef: + name: acb-alert-webhooks + key: discord + optional: true + - name: ACB_SLACK_WEBHOOK + valueFrom: + secretKeyRef: + name: acb-alert-webhooks + key: slack + optional: true + - name: ACB_MATCHMAKER_INTERVAL + value: "60" + - name: ACB_HEALTHCHECK_INTERVAL + value: "900" + - name: ACB_REAPER_INTERVAL + value: "300" + - name: ACB_BOT_TIMEOUT + value: "5" + - name: ACB_STALE_JOB_MINUTES + value: "15" + - name: ACB_MAX_CONSEC_FAILS + value: "3" + resources: + requests: + cpu: 50m + memory: 128Mi + limits: + memory: 256Mi + restartPolicy: Always diff --git a/cmd/acb-api/db.go b/cmd/acb-api/db.go index a990263..215257c 100644 --- a/cmd/acb-api/db.go +++ b/cmd/acb-api/db.go @@ -6,6 +6,91 @@ import ( ) const schemaSQL = ` +-- ---- Phase 9 tables ---- + +CREATE TABLE IF NOT EXISTS predictions ( + id BIGSERIAL PRIMARY KEY, + match_id VARCHAR(32) NOT NULL REFERENCES matches(match_id), + predictor_id VARCHAR(64) NOT NULL, + predicted_bot VARCHAR(16) NOT NULL, + correct BOOLEAN, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + resolved_at TIMESTAMPTZ, + UNIQUE(match_id, predictor_id) +); +CREATE INDEX IF NOT EXISTS idx_predictions_match ON predictions(match_id); +CREATE INDEX IF NOT EXISTS idx_predictions_predictor ON predictions(predictor_id); + +CREATE TABLE IF NOT EXISTS predictor_stats ( + predictor_id VARCHAR(64) PRIMARY KEY, + correct INTEGER NOT NULL DEFAULT 0, + incorrect INTEGER NOT NULL DEFAULT 0, + streak INTEGER NOT NULL DEFAULT 0, + best_streak INTEGER NOT NULL DEFAULT 0, + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS series ( + id BIGSERIAL PRIMARY KEY, + bot_a_id VARCHAR(16) NOT NULL REFERENCES bots(bot_id), + bot_b_id VARCHAR(16) NOT NULL REFERENCES bots(bot_id), + format INTEGER NOT NULL DEFAULT 5, -- best of N (3, 5, 7...) + a_wins INTEGER NOT NULL DEFAULT 0, + b_wins INTEGER NOT NULL DEFAULT 0, + status VARCHAR(16) NOT NULL DEFAULT 'active', + winner_id VARCHAR(16), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_series_bots ON series(bot_a_id, bot_b_id); +CREATE INDEX IF NOT EXISTS idx_series_status ON series(status); + +CREATE TABLE IF NOT EXISTS series_games ( + id BIGSERIAL PRIMARY KEY, + series_id BIGINT NOT NULL REFERENCES series(id), + match_id VARCHAR(32) NOT NULL REFERENCES matches(match_id), + game_num INTEGER NOT NULL, + winner_id VARCHAR(16), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_series_games_series ON series_games(series_id); + +CREATE TABLE IF NOT EXISTS seasons ( + id BIGSERIAL PRIMARY KEY, + name VARCHAR(64) NOT NULL, + theme VARCHAR(128), + rules_version VARCHAR(32) NOT NULL DEFAULT '1.0', + status VARCHAR(16) NOT NULL DEFAULT 'active', + champion_id VARCHAR(16), + starts_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + ends_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS season_snapshots ( + id BIGSERIAL PRIMARY KEY, + season_id BIGINT NOT NULL REFERENCES seasons(id), + bot_id VARCHAR(16) NOT NULL REFERENCES bots(bot_id), + rank INTEGER NOT NULL, + rating DOUBLE PRECISION NOT NULL, + wins INTEGER NOT NULL DEFAULT 0, + losses INTEGER NOT NULL DEFAULT 0, + recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_season_snapshots_season ON season_snapshots(season_id, rank); + +-- Map engagement scores (written by acb-mapgen or evolution pipeline) +CREATE TABLE IF NOT EXISTS map_scores ( + map_id VARCHAR(32) PRIMARY KEY, + engagement DOUBLE PRECISION NOT NULL DEFAULT 0.0, + symmetry_score DOUBLE PRECISION NOT NULL DEFAULT 0.0, + wall_density DOUBLE PRECISION NOT NULL DEFAULT 0.0, + last_used_at TIMESTAMPTZ, + match_count INTEGER NOT NULL DEFAULT 0, + avg_turns DOUBLE PRECISION, + scored_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + CREATE TABLE IF NOT EXISTS bots ( bot_id VARCHAR(16) PRIMARY KEY, name VARCHAR(32) UNIQUE NOT NULL, diff --git a/cmd/acb-api/main.go b/cmd/acb-api/main.go index 4ea558c..18e217e 100644 --- a/cmd/acb-api/main.go +++ b/cmd/acb-api/main.go @@ -75,10 +75,10 @@ func main() { defer rdb.Close() srv := &Server{ - cfg: cfg, - db: db, - rdb: rdb, - alerter: NewAlerter(cfg.DiscordWebhook, cfg.SlackWebhook), + cfg: cfg, + db: db, + rdb: rdb, + // Note: alerter moved to acb-matchmaker deployment } mux := http.NewServeMux() @@ -95,8 +95,12 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Start background tickers - srv.StartTickers(ctx) + // Note: Background tickers (matchmaker, health-checker, stale-reaper) are now + // handled by the separate acb-matchmaker deployment per plan §12 Phase 4. + // This API server only handles HTTP endpoints for bot registration, job + // coordination, and bot status. + + _ = ctx // ctx no longer needed since tickers moved to acb-matchmaker // Graceful shutdown sigCh := make(chan os.Signal, 1) diff --git a/cmd/acb-api/server.go b/cmd/acb-api/server.go index e31c2da..9f1a6ab 100644 --- a/cmd/acb-api/server.go +++ b/cmd/acb-api/server.go @@ -12,7 +12,7 @@ type Server struct { cfg Config db *sql.DB rdb *redis.Client - alerter *Alerter + // Note: alerter removed - alerting now handled by acb-matchmaker deployment } func (s *Server) RegisterRoutes(mux *http.ServeMux) { diff --git a/cmd/acb-api/server_test.go b/cmd/acb-api/server_test.go index 8c60812..c266961 100644 --- a/cmd/acb-api/server_test.go +++ b/cmd/acb-api/server_test.go @@ -18,7 +18,6 @@ func newTestServer() *Server { BotTimeoutSecs: 5, MaxConsecFails: 3, }, - alerter: NewAlerter("", ""), } } @@ -72,7 +71,7 @@ func TestAuthenticateWorker(t *testing.T) { } func TestAuthenticateWorker_NoKeyConfigured(t *testing.T) { - srv := &Server{cfg: Config{WorkerAPIKey: ""}, alerter: NewAlerter("", "")} + srv := &Server{cfg: Config{WorkerAPIKey: ""}} req := httptest.NewRequest("GET", "/", nil) if !srv.authenticateWorker(req) { t.Error("with no key configured, all requests should be authenticated") diff --git a/cmd/acb-matchmaker/Dockerfile b/cmd/acb-matchmaker/Dockerfile new file mode 100644 index 0000000..c82a017 --- /dev/null +++ b/cmd/acb-matchmaker/Dockerfile @@ -0,0 +1,51 @@ +# AI Code Battle Matchmaker Container +# Internal service that runs tickers for bot pairing, health checking, and stale job reaping + +# Build stage +FROM golang:1.24-alpine AS builder + +WORKDIR /build + +# Copy go.mod and go.sum first for caching +COPY go.mod go.sum ./ +RUN go mod download + +# Copy engine package (if needed) +COPY engine/ ./engine/ + +# Copy matchmaker source +COPY cmd/acb-matchmaker/ ./cmd/acb-matchmaker/ + +# Build the binary +RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o /acb-matchmaker ./cmd/acb-matchmaker + +# Runtime stage +FROM alpine:3.19 + +WORKDIR /app + +# Install ca-certificates for HTTPS (for bot health checks) +RUN apk --no-cache add ca-certificates tzdata + +# Copy binary from builder +COPY --from=builder /acb-matchmaker /app/acb-matchmaker + +# Create non-root user +RUN adduser -D -u 1000 acb +USER acb + +# Environment variables (set at runtime) +# ACB_DATABASE_URL - PostgreSQL connection string +# ACB_VALKEY_ADDR - Valkey/Redis address (default: localhost:6379) +# ACB_VALKEY_PASSWORD - Valkey/Redis password (optional) +# ACB_ENCRYPTION_KEY - AES-256-GCM key for decrypting shared secrets (64 hex chars) +# ACB_DISCORD_WEBHOOK - Discord webhook URL for alerts (optional) +# ACB_SLACK_WEBHOOK - Slack webhook URL for alerts (optional) +# ACB_MATCHMAKER_INTERVAL - Seconds between matchmaking cycles (default: 60) +# ACB_HEALTHCHECK_INTERVAL - Seconds between health checks (default: 900) +# ACB_REAPER_INTERVAL - Seconds between stale job reaper runs (default: 300) +# ACB_BOT_TIMEOUT - HTTP timeout for bot health checks in seconds (default: 5) +# ACB_STALE_JOB_MINUTES - Minutes before a running job is considered stale (default: 15) +# ACB_MAX_CONSEC_FAILS - Consecutive failures before marking bot inactive (default: 3) + +ENTRYPOINT ["/app/acb-matchmaker"] diff --git a/cmd/acb-matchmaker/alerts.go b/cmd/acb-matchmaker/alerts.go new file mode 100644 index 0000000..4373e4b --- /dev/null +++ b/cmd/acb-matchmaker/alerts.go @@ -0,0 +1,223 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "log" + "net/http" + "strings" + "sync" + "time" +) + +// AlertLevel indicates severity for color-coding in webhook messages. +type AlertLevel int + +const ( + AlertInfo AlertLevel = iota // blue / informational + AlertWarning // yellow / warning + AlertError // red / error +) + +// Alerter sends notifications to configured Discord and/or Slack webhooks. +type Alerter struct { + discordURL string + slackURL string + client *http.Client + + // Rate limiting: max 1 alert per key per cooldown period. + mu sync.Mutex + cooldown time.Duration + sent map[string]time.Time +} + +// NewAlerter creates an Alerter. If both URLs are empty, Send is a no-op. +func NewAlerter(discordURL, slackURL string) *Alerter { + return &Alerter{ + discordURL: discordURL, + slackURL: slackURL, + client: &http.Client{Timeout: 10 * time.Second}, + cooldown: 5 * time.Minute, + sent: make(map[string]time.Time), + } +} + +// Enabled returns true if at least one webhook URL is configured. +func (a *Alerter) Enabled() bool { + return a.discordURL != "" || a.slackURL != "" +} + +// Send dispatches an alert to all configured webhooks. The dedupKey is used +// for rate limiting — identical keys within the cooldown window are suppressed. +func (a *Alerter) Send(ctx context.Context, level AlertLevel, title, message, dedupKey string) { + if !a.Enabled() { + return + } + + if dedupKey != "" && !a.shouldSend(dedupKey) { + return + } + + if a.discordURL != "" { + if err := a.sendDiscord(ctx, level, title, message); err != nil { + log.Printf("alert: discord send error: %v", err) + } + } + + if a.slackURL != "" { + if err := a.sendSlack(ctx, level, title, message); err != nil { + log.Printf("alert: slack send error: %v", err) + } + } +} + +// shouldSend checks rate limiting. Returns true if the alert should be sent. +func (a *Alerter) shouldSend(key string) bool { + a.mu.Lock() + defer a.mu.Unlock() + + now := time.Now() + + // Garbage collect expired entries periodically + if len(a.sent) > 100 { + for k, t := range a.sent { + if now.Sub(t) > a.cooldown { + delete(a.sent, k) + } + } + } + + if last, ok := a.sent[key]; ok && now.Sub(last) < a.cooldown { + return false + } + a.sent[key] = now + return true +} + +// discordPayload is the Discord webhook message format. +type discordPayload struct { + Embeds []discordEmbed `json:"embeds"` +} + +type discordEmbed struct { + Title string `json:"title"` + Description string `json:"description"` + Color int `json:"color"` + Timestamp string `json:"timestamp"` +} + +func (a *Alerter) sendDiscord(ctx context.Context, level AlertLevel, title, message string) error { + color := 0x3498db // blue + switch level { + case AlertWarning: + color = 0xf39c12 // yellow/orange + case AlertError: + color = 0xe74c3c // red + } + + payload := discordPayload{ + Embeds: []discordEmbed{{ + Title: fmt.Sprintf("[ACB-Matchmaker] %s", title), + Description: message, + Color: color, + Timestamp: time.Now().UTC().Format(time.RFC3339), + }}, + } + + return a.postJSON(ctx, a.discordURL, payload) +} + +// slackPayload is the Slack incoming webhook format. +type slackPayload struct { + Attachments []slackAttachment `json:"attachments"` +} + +type slackAttachment struct { + Color string `json:"color"` + Title string `json:"title"` + Text string `json:"text"` + Footer string `json:"footer"` + Ts int64 `json:"ts"` +} + +func (a *Alerter) sendSlack(ctx context.Context, level AlertLevel, title, message string) error { + color := "#3498db" + switch level { + case AlertWarning: + color = "#f39c12" + case AlertError: + color = "#e74c3c" + } + + payload := slackPayload{ + Attachments: []slackAttachment{{ + Color: color, + Title: fmt.Sprintf("[ACB-Matchmaker] %s", title), + Text: message, + Footer: "AI Code Battle", + Ts: time.Now().Unix(), + }}, + } + + return a.postJSON(ctx, a.slackURL, payload) +} + +func (a *Alerter) postJSON(ctx context.Context, url string, payload any) error { + body, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("marshal payload: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := a.client.Do(req) + if err != nil { + return fmt.Errorf("send webhook: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode >= 300 { + return fmt.Errorf("webhook returned status %d", resp.StatusCode) + } + return nil +} + +// Alert helper methods for common events. + +func (a *Alerter) BotMarkedInactive(ctx context.Context, botID string, failCount int) { + a.Send(ctx, AlertWarning, + "Bot Marked Inactive", + fmt.Sprintf("Bot `%s` marked inactive after %d consecutive health check failures.", botID, failCount), + "bot-inactive:"+botID, + ) +} + +func (a *Alerter) BotRecovered(ctx context.Context, botID string) { + a.Send(ctx, AlertInfo, + "Bot Recovered", + fmt.Sprintf("Bot `%s` is back online and marked active.", botID), + "bot-recovered:"+botID, + ) +} + +func (a *Alerter) StaleJobsReaped(ctx context.Context, jobIDs []string) { + a.Send(ctx, AlertWarning, + "Stale Jobs Re-enqueued", + fmt.Sprintf("%d stale job(s) re-enqueued: %s", len(jobIDs), strings.Join(jobIDs, ", ")), + "stale-jobs", + ) +} + +func (a *Alerter) MatchError(ctx context.Context, matchID, reason string) { + a.Send(ctx, AlertError, + "Match Error", + fmt.Sprintf("Match `%s` failed: %s", matchID, reason), + "match-error:"+matchID, + ) +} diff --git a/cmd/acb-matchmaker/config.go b/cmd/acb-matchmaker/config.go new file mode 100644 index 0000000..32c3e4f --- /dev/null +++ b/cmd/acb-matchmaker/config.go @@ -0,0 +1,25 @@ +package main + +import ( + "os" + "strconv" +) + +func envOr(key, fallback string) string { + if v := os.Getenv(key); v != "" { + return v + } + return fallback +} + +func envInt(key string, fallback int) int { + v := os.Getenv(key) + if v == "" { + return fallback + } + n, err := strconv.Atoi(v) + if err != nil { + return fallback + } + return n +} diff --git a/cmd/acb-matchmaker/crypto.go b/cmd/acb-matchmaker/crypto.go new file mode 100644 index 0000000..36d5c54 --- /dev/null +++ b/cmd/acb-matchmaker/crypto.go @@ -0,0 +1,54 @@ +package main + +import ( + "crypto/aes" + "crypto/cipher" + "crypto/rand" + "encoding/hex" + "fmt" + "io" +) + +func generateID(prefix string, nBytes int) (string, error) { + b := make([]byte, nBytes) + if _, err := io.ReadFull(rand.Reader, b); err != nil { + return "", err + } + return prefix + hex.EncodeToString(b), nil +} + +func decryptSecret(ciphertextHex, keyHex string) (string, error) { + key, err := hex.DecodeString(keyHex) + if err != nil { + return "", fmt.Errorf("decode key: %w", err) + } + if len(key) != 32 { + return "", fmt.Errorf("encryption key must be 32 bytes (64 hex chars)") + } + + ciphertext, err := hex.DecodeString(ciphertextHex) + if err != nil { + return "", fmt.Errorf("decode ciphertext: %w", err) + } + + block, err := aes.NewCipher(key) + if err != nil { + return "", err + } + aead, err := cipher.NewGCM(block) + if err != nil { + return "", err + } + + nonceSize := aead.NonceSize() + if len(ciphertext) < nonceSize { + return "", fmt.Errorf("ciphertext too short") + } + + nonce, ciphertext := ciphertext[:nonceSize], ciphertext[nonceSize:] + plaintext, err := aead.Open(nil, nonce, ciphertext, nil) + if err != nil { + return "", err + } + return string(plaintext), nil +} diff --git a/cmd/acb-matchmaker/main.go b/cmd/acb-matchmaker/main.go new file mode 100644 index 0000000..79f2571 --- /dev/null +++ b/cmd/acb-matchmaker/main.go @@ -0,0 +1,114 @@ +// Package main implements the AI Code Battle matchmaker. +// It is an internal service that runs tickers for bot pairing, +// health checking, and stale job reaping. It has no external +// HTTP exposure - it only connects to PostgreSQL and Valkey. +package main + +import ( + "context" + "database/sql" + "log" + "os" + "os/signal" + "syscall" + "time" + + _ "github.com/lib/pq" + "github.com/redis/go-redis/v9" +) + +type Config struct { + DatabaseURL string + ValkeyAddr string + ValkeyPassword string + EncryptionKey string // AES-256-GCM key for shared secret decryption + DiscordWebhook string + SlackWebhook string + MatchmakerSecs int + HealthCheckSecs int + ReaperSecs int + BotTimeoutSecs int + StaleJobMinutes int + MaxConsecFails int +} + +type Matchmaker struct { + cfg Config + db *sql.DB + rdb *redis.Client + alerter *Alerter +} + +func loadConfig() Config { + return Config{ + DatabaseURL: envOr("ACB_DATABASE_URL", "postgres://localhost:5432/acb?sslmode=disable"), + ValkeyAddr: envOr("ACB_VALKEY_ADDR", "localhost:6379"), + ValkeyPassword: os.Getenv("ACB_VALKEY_PASSWORD"), + EncryptionKey: os.Getenv("ACB_ENCRYPTION_KEY"), + DiscordWebhook: os.Getenv("ACB_DISCORD_WEBHOOK"), + SlackWebhook: os.Getenv("ACB_SLACK_WEBHOOK"), + MatchmakerSecs: envInt("ACB_MATCHMAKER_INTERVAL", 60), + HealthCheckSecs: envInt("ACB_HEALTHCHECK_INTERVAL", 900), + ReaperSecs: envInt("ACB_REAPER_INTERVAL", 300), + BotTimeoutSecs: envInt("ACB_BOT_TIMEOUT", 5), + StaleJobMinutes: envInt("ACB_STALE_JOB_MINUTES", 15), + MaxConsecFails: envInt("ACB_MAX_CONSEC_FAILS", 3), + } +} + +func main() { + cfg := loadConfig() + + db, err := sql.Open("postgres", cfg.DatabaseURL) + if err != nil { + log.Fatalf("failed to open database: %v", err) + } + defer db.Close() + + db.SetMaxOpenConns(10) + db.SetMaxIdleConns(2) + db.SetConnMaxLifetime(5 * time.Minute) + + rdb := redis.NewClient(&redis.Options{ + Addr: cfg.ValkeyAddr, + Password: cfg.ValkeyPassword, + }) + defer rdb.Close() + + // Test connections + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := db.PingContext(ctx); err != nil { + log.Fatalf("database ping failed: %v", err) + } + if err := rdb.Ping(ctx).Err(); err != nil { + log.Fatalf("valkey ping failed: %v", err) + } + + alerter := NewAlerter(cfg.DiscordWebhook, cfg.SlackWebhook) + + m := &Matchmaker{ + cfg: cfg, + db: db, + rdb: rdb, + alerter: alerter, + } + + // Start background tickers + ctx, cancel = context.WithCancel(context.Background()) + defer cancel() + + m.StartTickers(ctx) + + log.Println("acb-matchmaker started - running internal tickers") + + // Graceful shutdown + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + <-sigCh + + log.Println("shutting down...") + cancel() + log.Println("shutdown complete") +} diff --git a/cmd/acb-api/tickers.go b/cmd/acb-matchmaker/tickers.go similarity index 78% rename from cmd/acb-api/tickers.go rename to cmd/acb-matchmaker/tickers.go index ab7d361..0d2d497 100644 --- a/cmd/acb-api/tickers.go +++ b/cmd/acb-matchmaker/tickers.go @@ -11,13 +11,15 @@ import ( "time" ) -func (s *Server) StartTickers(ctx context.Context) { - go s.runTicker(ctx, "matchmaker", time.Duration(s.cfg.MatchmakerSecs)*time.Second, s.tickMatchmaker) - go s.runTicker(ctx, "health-checker", time.Duration(s.cfg.HealthCheckSecs)*time.Second, s.tickHealthChecker) - go s.runTicker(ctx, "stale-reaper", time.Duration(s.cfg.ReaperSecs)*time.Second, s.tickStaleReaper) +const valkeyJobQueue = "acb:jobs:pending" + +func (m *Matchmaker) StartTickers(ctx context.Context) { + go m.runTicker(ctx, "matchmaker", time.Duration(m.cfg.MatchmakerSecs)*time.Second, m.tickMatchmaker) + go m.runTicker(ctx, "health-checker", time.Duration(m.cfg.HealthCheckSecs)*time.Second, m.tickHealthChecker) + go m.runTicker(ctx, "stale-reaper", time.Duration(m.cfg.ReaperSecs)*time.Second, m.tickStaleReaper) } -func (s *Server) runTicker(ctx context.Context, name string, interval time.Duration, fn func(context.Context)) { +func (m *Matchmaker) runTicker(ctx context.Context, name string, interval time.Duration, fn func(context.Context)) { log.Printf("starting ticker: %s (every %s)", name, interval) ticker := time.NewTicker(interval) defer ticker.Stop() @@ -33,9 +35,9 @@ func (s *Server) runTicker(ctx context.Context, name string, interval time.Durat } // tickMatchmaker creates matches between active bots and enqueues jobs. -func (s *Server) tickMatchmaker(ctx context.Context) { +func (m *Matchmaker) tickMatchmaker(ctx context.Context) { // Get all active bots - rows, err := s.db.QueryContext(ctx, + rows, err := m.db.QueryContext(ctx, `SELECT bot_id, endpoint_url, shared_secret, rating_mu, rating_phi FROM bots WHERE status = 'active' ORDER BY rating_mu DESC`) if err != nil { @@ -44,10 +46,10 @@ func (s *Server) tickMatchmaker(ctx context.Context) { } type botInfo struct { - ID string - Endpoint string - Secret string - Mu, Phi float64 + ID string + Endpoint string + Secret string + Mu, Phi float64 } var bots []botInfo for rows.Next() { @@ -110,11 +112,11 @@ func (s *Server) tickMatchmaker(ctx context.Context) { // Decrypt secrets for the worker secretA := botA.Secret secretB := botB.Secret - if s.cfg.EncryptionKey != "" { - if dec, err := decryptSecret(botA.Secret, s.cfg.EncryptionKey); err == nil { + if m.cfg.EncryptionKey != "" { + if dec, err := decryptSecret(botA.Secret, m.cfg.EncryptionKey); err == nil { secretA = dec } - if dec, err := decryptSecret(botB.Secret, s.cfg.EncryptionKey); err == nil { + if dec, err := decryptSecret(botB.Secret, m.cfg.EncryptionKey); err == nil { secretB = dec } } @@ -132,7 +134,7 @@ func (s *Server) tickMatchmaker(ctx context.Context) { } configJSON, _ := json.Marshal(config) - tx, err := s.db.BeginTx(ctx, nil) + tx, err := m.db.BeginTx(ctx, nil) if err != nil { log.Printf("matchmaker: tx error: %v", err) return @@ -169,7 +171,7 @@ func (s *Server) tickMatchmaker(ctx context.Context) { } // Enqueue in Valkey - if err := s.rdb.LPush(ctx, valkeyJobQueue, jobID).Err(); err != nil { + if err := m.rdb.LPush(ctx, valkeyJobQueue, jobID).Err(); err != nil { log.Printf("matchmaker: valkey push error: %v", err) return } @@ -178,8 +180,8 @@ func (s *Server) tickMatchmaker(ctx context.Context) { } // tickHealthChecker pings each active bot's /health endpoint. -func (s *Server) tickHealthChecker(ctx context.Context) { - rows, err := s.db.QueryContext(ctx, +func (m *Matchmaker) tickHealthChecker(ctx context.Context) { + rows, err := m.db.QueryContext(ctx, `SELECT bot_id, endpoint_url, status, consec_fails FROM bots WHERE status IN ('active', 'inactive')`) if err != nil { log.Printf("health-checker: query error: %v", err) @@ -204,7 +206,7 @@ func (s *Server) tickHealthChecker(ctx context.Context) { } rows.Close() - client := &http.Client{Timeout: time.Duration(s.cfg.BotTimeoutSecs) * time.Second} + client := &http.Client{Timeout: time.Duration(m.cfg.BotTimeoutSecs) * time.Second} for _, bot := range bots { healthy := false @@ -216,36 +218,36 @@ func (s *Server) tickHealthChecker(ctx context.Context) { if healthy { if bot.Status == "inactive" || bot.ConsecFails > 0 { - s.db.ExecContext(ctx, + m.db.ExecContext(ctx, `UPDATE bots SET status = 'active', consec_fails = 0, last_active = NOW() WHERE bot_id = $1`, bot.ID) log.Printf("health-checker: %s recovered → active", bot.ID) if bot.Status == "inactive" { - s.alerter.BotRecovered(ctx, bot.ID) + m.alerter.BotRecovered(ctx, bot.ID) } } } else { newFails := bot.ConsecFails + 1 newStatus := bot.Status - if newFails >= s.cfg.MaxConsecFails { + if newFails >= m.cfg.MaxConsecFails { newStatus = "inactive" } - s.db.ExecContext(ctx, + m.db.ExecContext(ctx, `UPDATE bots SET status = $1, consec_fails = $2 WHERE bot_id = $3`, newStatus, newFails, bot.ID) if newStatus != bot.Status { log.Printf("health-checker: %s marked inactive after %d failures", bot.ID, newFails) - s.alerter.BotMarkedInactive(ctx, bot.ID, newFails) + m.alerter.BotMarkedInactive(ctx, bot.ID, newFails) } } } } // tickStaleReaper re-enqueues jobs that have been running too long. -func (s *Server) tickStaleReaper(ctx context.Context) { - threshold := time.Duration(s.cfg.StaleJobMinutes) * time.Minute +func (m *Matchmaker) tickStaleReaper(ctx context.Context) { + threshold := time.Duration(m.cfg.StaleJobMinutes) * time.Minute - rows, err := s.db.QueryContext(ctx, + rows, err := m.db.QueryContext(ctx, `SELECT job_id FROM jobs WHERE status = 'running' AND claimed_at < $1`, time.Now().Add(-threshold)) @@ -266,7 +268,7 @@ func (s *Server) tickStaleReaper(ctx context.Context) { rows.Close() for _, jobID := range staleJobs { - result, err := s.db.ExecContext(ctx, + result, err := m.db.ExecContext(ctx, `UPDATE jobs SET status = 'pending', worker_id = NULL, claimed_at = NULL WHERE job_id = $1 AND status = 'running'`, jobID) if err != nil { @@ -279,7 +281,7 @@ func (s *Server) tickStaleReaper(ctx context.Context) { continue // already completed or re-enqueued by another reaper } - if err := s.rdb.LPush(ctx, valkeyJobQueue, jobID).Err(); err != nil { + if err := m.rdb.LPush(ctx, valkeyJobQueue, jobID).Err(); err != nil { log.Printf("stale-reaper: re-enqueue error for %s: %v", jobID, err) continue } @@ -289,14 +291,14 @@ func (s *Server) tickStaleReaper(ctx context.Context) { if len(staleJobs) > 0 { log.Printf("stale-reaper: processed %d stale jobs", len(staleJobs)) - s.alerter.StaleJobsReaped(ctx, staleJobs) + m.alerter.StaleJobsReaped(ctx, staleJobs) } } // queryActiveBotCount returns the number of active bots (used by tests). -func (s *Server) queryActiveBotCount(ctx context.Context) (int, error) { +func (m *Matchmaker) queryActiveBotCount(ctx context.Context) (int, error) { var count int - err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM bots WHERE status = 'active'`).Scan(&count) + err := m.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM bots WHERE status = 'active'`).Scan(&count) return count, err }