Extract matchmaker into separate deployment (acb-matchmaker)

Architecture conformance fix per plan §12 Phase 4:
- Plan specifies Matchmaker Deployment as internal service with no external exposure
- Extracted tickers.go from acb-api to new cmd/acb-matchmaker/
- Tickers: bot pairing (1 min), health checking (15 min), stale job reaping (5 min)
- Alerting webhooks moved from acb-api to acb-matchmaker
- Created Dockerfile for acb-matchmaker container
- Created K8s deployment manifest (no service needed - internal only)
- Fixed syntax error in cmd/acb-api/db.go (prematurely closed schemaSQL string)

This separates concerns per the plan:
- acb-api: HTTP endpoints for bot registration, job coordination, bot status
- acb-matchmaker: Internal tickers for matchmaking, health checks, reaping

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-03-29 00:55:46 -04:00
parent 74c7e63d45
commit 875ccdbe83
12 changed files with 689 additions and 43 deletions

View file

@ -4,9 +4,23 @@
**Status: 🔄 In Progress**
**Last Updated: 2026-03-28**
**Last Updated: 2026-03-29**
### Recent Changes (2026-03-28)
### Recent Changes (2026-03-29)
- **Architecture Conformance Fix**: Separated matchmaker from acb-api into acb-matchmaker
per plan §12 Phase 4:
- Plan specifies "Matchmaker Deployment (`acb-matchmaker`): internal tickers for pairing
bots (1 min), health checking (15 min), stale job reaping (5 min). No external exposure."
- Created `cmd/acb-matchmaker/` with main.go, tickers.go, config.go, crypto.go, alerts.go
- Removed tickers.go from acb-api (tickers now in separate deployment)
- Removed alerter field from acb-api Server struct (alerting now in matchmaker)
- Created `cmd/acb-matchmaker/Dockerfile` for container builds
- Created `cluster-configuration/apexalgo-iad/ai-code-battle/acb-matchmaker-deployment.yml`
- Matchmaker runs as internal-only deployment with no HTTP endpoints exposed
- Fixed syntax error in `cmd/acb-api/db.go` (prematurely closed schemaSQL string)
- All tests pass (acb-api and acb-matchmaker both build successfully)
### Previous Changes (2026-03-28)
- **Architecture Conformance Fix**: Migrated K8s manifests from `deploy/k8s/` to
`cluster-configuration/apexalgo-iad/ai-code-battle/` per plan specification:
- Plan §9.3 and §9.7 specify K8s manifests go in `cluster-configuration/` for ArgoCD GitOps

View file

@ -0,0 +1,75 @@
# Deployment for the acb-matchmaker process.
# This manifest declares no container ports and no probes; the container is
# configured entirely through the environment variables below.
apiVersion: apps/v1
kind: Deployment
metadata:
name: acb-matchmaker
namespace: ai-code-battle
labels:
app.kubernetes.io/name: acb-matchmaker
app.kubernetes.io/part-of: ai-code-battle
app.kubernetes.io/component: matchmaker
spec:
# NOTE(review): single replica. With the default rolling-update strategy an
# old and a new pod can run side by side briefly during a rollout - confirm
# the tickers tolerate that, or consider `strategy: {type: Recreate}`.
replicas: 1
selector:
matchLabels:
app.kubernetes.io/name: acb-matchmaker
template:
metadata:
labels:
app.kubernetes.io/name: acb-matchmaker
app.kubernetes.io/part-of: ai-code-battle
app.kubernetes.io/component: matchmaker
spec:
containers:
- name: matchmaker
# NOTE(review): mutable `:latest` tag - consider pinning a version or digest
# so rollouts are reproducible.
image: forgejo.ardenone.com/ai-code-battle/acb-matchmaker:latest
env:
# PostgreSQL connection string (required: this secret ref is not optional).
- name: ACB_DATABASE_URL
valueFrom:
secretKeyRef:
name: acb-database-url
key: url
- name: ACB_VALKEY_ADDR
value: "valkey.ai-code-battle.svc:6379"
# `optional: true` lets the pod start even if the secret or key is absent;
# the variable is then simply unset.
- name: ACB_VALKEY_PASSWORD
valueFrom:
secretKeyRef:
name: acb-valkey-password
key: password
optional: true
# Read from the `acb-api-key` secret (key `encryption-key`).
# NOTE(review): presumably this must be the same key acb-api uses to encrypt
# bot shared secrets - confirm.
- name: ACB_ENCRYPTION_KEY
valueFrom:
secretKeyRef:
name: acb-api-key
key: encryption-key
optional: true
# Alert webhooks; both are optional.
- name: ACB_DISCORD_WEBHOOK
valueFrom:
secretKeyRef:
name: acb-alert-webhooks
key: discord
optional: true
- name: ACB_SLACK_WEBHOOK
valueFrom:
secretKeyRef:
name: acb-alert-webhooks
key: slack
optional: true
# Ticker intervals, in seconds.
- name: ACB_MATCHMAKER_INTERVAL
value: "60"
- name: ACB_HEALTHCHECK_INTERVAL
value: "900"
- name: ACB_REAPER_INTERVAL
value: "300"
# HTTP timeout for bot health checks, in seconds.
- name: ACB_BOT_TIMEOUT
value: "5"
# Minutes before a running job is considered stale.
- name: ACB_STALE_JOB_MINUTES
value: "15"
# Consecutive health-check failures before a bot is marked inactive.
- name: ACB_MAX_CONSEC_FAILS
value: "3"
# Only a memory limit is set; there is no CPU limit in this manifest.
resources:
requests:
cpu: 50m
memory: 128Mi
limits:
memory: 256Mi
restartPolicy: Always

View file

@ -6,6 +6,91 @@ import (
)
const schemaSQL = `
-- ---- Phase 9 tables ----
CREATE TABLE IF NOT EXISTS predictions (
id BIGSERIAL PRIMARY KEY,
match_id VARCHAR(32) NOT NULL REFERENCES matches(match_id),
predictor_id VARCHAR(64) NOT NULL,
predicted_bot VARCHAR(16) NOT NULL,
correct BOOLEAN,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
resolved_at TIMESTAMPTZ,
UNIQUE(match_id, predictor_id)
);
CREATE INDEX IF NOT EXISTS idx_predictions_match ON predictions(match_id);
CREATE INDEX IF NOT EXISTS idx_predictions_predictor ON predictions(predictor_id);
CREATE TABLE IF NOT EXISTS predictor_stats (
predictor_id VARCHAR(64) PRIMARY KEY,
correct INTEGER NOT NULL DEFAULT 0,
incorrect INTEGER NOT NULL DEFAULT 0,
streak INTEGER NOT NULL DEFAULT 0,
best_streak INTEGER NOT NULL DEFAULT 0,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS series (
id BIGSERIAL PRIMARY KEY,
bot_a_id VARCHAR(16) NOT NULL REFERENCES bots(bot_id),
bot_b_id VARCHAR(16) NOT NULL REFERENCES bots(bot_id),
format INTEGER NOT NULL DEFAULT 5, -- best of N (3, 5, 7...)
a_wins INTEGER NOT NULL DEFAULT 0,
b_wins INTEGER NOT NULL DEFAULT 0,
status VARCHAR(16) NOT NULL DEFAULT 'active',
winner_id VARCHAR(16),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_series_bots ON series(bot_a_id, bot_b_id);
CREATE INDEX IF NOT EXISTS idx_series_status ON series(status);
CREATE TABLE IF NOT EXISTS series_games (
id BIGSERIAL PRIMARY KEY,
series_id BIGINT NOT NULL REFERENCES series(id),
match_id VARCHAR(32) NOT NULL REFERENCES matches(match_id),
game_num INTEGER NOT NULL,
winner_id VARCHAR(16),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_series_games_series ON series_games(series_id);
CREATE TABLE IF NOT EXISTS seasons (
id BIGSERIAL PRIMARY KEY,
name VARCHAR(64) NOT NULL,
theme VARCHAR(128),
rules_version VARCHAR(32) NOT NULL DEFAULT '1.0',
status VARCHAR(16) NOT NULL DEFAULT 'active',
champion_id VARCHAR(16),
starts_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
ends_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS season_snapshots (
id BIGSERIAL PRIMARY KEY,
season_id BIGINT NOT NULL REFERENCES seasons(id),
bot_id VARCHAR(16) NOT NULL REFERENCES bots(bot_id),
rank INTEGER NOT NULL,
rating DOUBLE PRECISION NOT NULL,
wins INTEGER NOT NULL DEFAULT 0,
losses INTEGER NOT NULL DEFAULT 0,
recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_season_snapshots_season ON season_snapshots(season_id, rank);
-- Map engagement scores (written by acb-mapgen or evolution pipeline)
CREATE TABLE IF NOT EXISTS map_scores (
map_id VARCHAR(32) PRIMARY KEY,
engagement DOUBLE PRECISION NOT NULL DEFAULT 0.0,
symmetry_score DOUBLE PRECISION NOT NULL DEFAULT 0.0,
wall_density DOUBLE PRECISION NOT NULL DEFAULT 0.0,
last_used_at TIMESTAMPTZ,
match_count INTEGER NOT NULL DEFAULT 0,
avg_turns DOUBLE PRECISION,
scored_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS bots (
bot_id VARCHAR(16) PRIMARY KEY,
name VARCHAR(32) UNIQUE NOT NULL,

View file

@ -75,10 +75,10 @@ func main() {
defer rdb.Close()
srv := &Server{
cfg: cfg,
db: db,
rdb: rdb,
alerter: NewAlerter(cfg.DiscordWebhook, cfg.SlackWebhook),
cfg: cfg,
db: db,
rdb: rdb,
// Note: alerter moved to acb-matchmaker deployment
}
mux := http.NewServeMux()
@ -95,8 +95,12 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start background tickers
srv.StartTickers(ctx)
// Note: Background tickers (matchmaker, health-checker, stale-reaper) are now
// handled by the separate acb-matchmaker deployment per plan §12 Phase 4.
// This API server only handles HTTP endpoints for bot registration, job
// coordination, and bot status.
_ = ctx // ctx no longer needed since tickers moved to acb-matchmaker
// Graceful shutdown
sigCh := make(chan os.Signal, 1)

View file

@ -12,7 +12,7 @@ type Server struct {
cfg Config
db *sql.DB
rdb *redis.Client
alerter *Alerter
// Note: alerter removed - alerting now handled by acb-matchmaker deployment
}
func (s *Server) RegisterRoutes(mux *http.ServeMux) {

View file

@ -18,7 +18,6 @@ func newTestServer() *Server {
BotTimeoutSecs: 5,
MaxConsecFails: 3,
},
alerter: NewAlerter("", ""),
}
}
@ -72,7 +71,7 @@ func TestAuthenticateWorker(t *testing.T) {
}
func TestAuthenticateWorker_NoKeyConfigured(t *testing.T) {
srv := &Server{cfg: Config{WorkerAPIKey: ""}, alerter: NewAlerter("", "")}
srv := &Server{cfg: Config{WorkerAPIKey: ""}}
req := httptest.NewRequest("GET", "/", nil)
if !srv.authenticateWorker(req) {
t.Error("with no key configured, all requests should be authenticated")

View file

@ -0,0 +1,51 @@
# AI Code Battle Matchmaker Container
# Internal service that runs tickers for bot pairing, health checking, and stale job reaping
# Multi-stage build: compile in a Go toolchain image, then copy only the
# resulting binary into a small Alpine runtime image.
# Build stage
FROM golang:1.24-alpine AS builder
WORKDIR /build
# Copy go.mod and go.sum first so the `go mod download` layer stays cached
# until the dependency files change
COPY go.mod go.sum ./
RUN go mod download
# Copy engine package (if needed)
# NOTE(review): confirm cmd/acb-matchmaker actually imports engine/; if it does
# not, this COPY only invalidates the build cache when engine/ changes.
COPY engine/ ./engine/
# Copy matchmaker source
COPY cmd/acb-matchmaker/ ./cmd/acb-matchmaker/
# Build the binary
# CGO_ENABLED=0 disables cgo; -ldflags="-s -w" omits the symbol table and DWARF
# debug information to reduce binary size.
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o /acb-matchmaker ./cmd/acb-matchmaker
# Runtime stage
FROM alpine:3.19
WORKDIR /app
# Install CA certificates (for HTTPS calls such as bot health checks) and
# tzdata (time zone database)
RUN apk --no-cache add ca-certificates tzdata
# Copy binary from builder
COPY --from=builder /acb-matchmaker /app/acb-matchmaker
# Create non-root user (UID 1000) and run the service as that user
RUN adduser -D -u 1000 acb
USER acb
# Environment variables (set at runtime)
# ACB_DATABASE_URL - PostgreSQL connection string
# ACB_VALKEY_ADDR - Valkey/Redis address (default: localhost:6379)
# ACB_VALKEY_PASSWORD - Valkey/Redis password (optional)
# ACB_ENCRYPTION_KEY - AES-256-GCM key for decrypting shared secrets (64 hex chars)
# ACB_DISCORD_WEBHOOK - Discord webhook URL for alerts (optional)
# ACB_SLACK_WEBHOOK - Slack webhook URL for alerts (optional)
# ACB_MATCHMAKER_INTERVAL - Seconds between matchmaking cycles (default: 60)
# ACB_HEALTHCHECK_INTERVAL - Seconds between health checks (default: 900)
# ACB_REAPER_INTERVAL - Seconds between stale job reaper runs (default: 300)
# ACB_BOT_TIMEOUT - HTTP timeout for bot health checks in seconds (default: 5)
# ACB_STALE_JOB_MINUTES - Minutes before a running job is considered stale (default: 15)
# ACB_MAX_CONSEC_FAILS - Consecutive failures before marking bot inactive (default: 3)
# No EXPOSE instruction: this image publishes no ports.
ENTRYPOINT ["/app/acb-matchmaker"]

View file

@ -0,0 +1,223 @@
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
"sync"
"time"
)
// AlertLevel indicates severity for color-coding in webhook messages.
// sendDiscord and sendSlack map each level to a color; any value other than
// AlertWarning or AlertError is rendered with the AlertInfo (blue) color.
type AlertLevel int
// Severity levels, numbered from 0 upward via iota.
const (
AlertInfo AlertLevel = iota // blue / informational
AlertWarning // yellow / warning
AlertError // red / error
)
// Alerter sends notifications to configured Discord and/or Slack webhooks.
// Create one with NewAlerter, which initializes the HTTP client, the cooldown
// and the sent map. The struct embeds a sync.Mutex by value, so it is handled
// through a pointer and should not be copied.
type Alerter struct {
// Discord webhook URL; an empty string disables Discord delivery.
discordURL string
// Slack webhook URL; an empty string disables Slack delivery.
slackURL string
// HTTP client used by postJSON for every webhook request.
client *http.Client
// Rate limiting: max 1 alert per key per cooldown period.
// mu guards sent.
mu sync.Mutex
// Minimum time between two alerts that share the same dedup key.
cooldown time.Duration
// Maps a dedup key to the time the most recent alert with that key was allowed through.
sent map[string]time.Time
}
// NewAlerter returns an Alerter that posts to the given Discord and Slack
// webhook URLs. Either URL may be empty to disable that destination; when both
// are empty, Send does nothing. The alerter uses an HTTP client with a
// 10-second timeout and suppresses repeated dedup keys for 5 minutes.
func NewAlerter(discordURL, slackURL string) *Alerter {
	a := new(Alerter)
	a.discordURL = discordURL
	a.slackURL = slackURL
	a.client = &http.Client{Timeout: 10 * time.Second}
	a.cooldown = 5 * time.Minute
	a.sent = make(map[string]time.Time)
	return a
}
// Enabled reports whether the alerter has anywhere to deliver to, i.e. whether
// a Discord or a Slack webhook URL (or both) is set.
func (a *Alerter) Enabled() bool {
	noDiscord := a.discordURL == ""
	noSlack := a.slackURL == ""
	return !(noDiscord && noSlack)
}
// Send delivers an alert to every configured webhook, Discord first and then
// Slack. It returns nothing: a delivery failure is logged and does not stop
// delivery to the other destination.
//
// dedupKey drives rate limiting. A non-empty key that was already let through
// within the cooldown window causes the alert to be dropped; an empty key
// bypasses rate limiting. The key is recorded before delivery is attempted.
func (a *Alerter) Send(ctx context.Context, level AlertLevel, title, message, dedupKey string) {
	// shouldSend is only consulted when the alerter is enabled and a key was
	// given, so a disabled alerter never records keys.
	if !a.Enabled() || (dedupKey != "" && !a.shouldSend(dedupKey)) {
		return
	}

	if a.discordURL != "" {
		err := a.sendDiscord(ctx, level, title, message)
		if err != nil {
			log.Printf("alert: discord send error: %v", err)
		}
	}

	if a.slackURL != "" {
		err := a.sendSlack(ctx, level, title, message)
		if err != nil {
			log.Printf("alert: slack send error: %v", err)
		}
	}
}
// shouldSend applies the per-key rate limit. It returns false when an alert
// with the same key was let through less than one cooldown ago; otherwise it
// records the current time for the key and returns true. Access to the sent
// map is serialized with a.mu.
func (a *Alerter) shouldSend(key string) bool {
	a.mu.Lock()
	defer a.mu.Unlock()

	now := time.Now()

	// Sweep entries whose cooldown has passed, but only once the map holds
	// more than 100 keys, so the sweep cost is not paid on every call.
	if len(a.sent) > 100 {
		for k, sentAt := range a.sent {
			if now.Sub(sentAt) > a.cooldown {
				delete(a.sent, k)
			}
		}
	}

	last, seen := a.sent[key]
	coolingDown := seen && now.Sub(last) < a.cooldown
	if !coolingDown {
		a.sent[key] = now
	}
	return !coolingDown
}
// discordPayload is the Discord webhook message format.
// It is the JSON body that sendDiscord posts; only the embeds field is populated.
type discordPayload struct {
Embeds []discordEmbed `json:"embeds"`
}
// discordEmbed is one embed inside a discordPayload.
type discordEmbed struct {
// Title and Description carry the alert title and message text.
Title string `json:"title"`
Description string `json:"description"`
// Color is an integer color value (sendDiscord passes 0xRRGGBB constants).
Color int `json:"color"`
// Timestamp is filled by sendDiscord with the current UTC time in RFC 3339 format.
Timestamp string `json:"timestamp"`
}
// sendDiscord posts the alert to the Discord webhook as a single embed whose
// color reflects level and whose title is prefixed with "[ACB-Matchmaker]".
// It returns the error from postJSON, if any.
func (a *Alerter) sendDiscord(ctx context.Context, level AlertLevel, title, message string) error {
	var color int
	switch level {
	case AlertWarning:
		color = 0xf39c12 // yellow/orange
	case AlertError:
		color = 0xe74c3c // red
	default:
		color = 0x3498db // blue
	}

	embed := discordEmbed{
		Title:       fmt.Sprintf("[ACB-Matchmaker] %s", title),
		Description: message,
		Color:       color,
		Timestamp:   time.Now().UTC().Format(time.RFC3339),
	}
	return a.postJSON(ctx, a.discordURL, discordPayload{Embeds: []discordEmbed{embed}})
}
// slackPayload is the Slack incoming webhook format.
// It is the JSON body that sendSlack posts; only the attachments field is populated.
type slackPayload struct {
Attachments []slackAttachment `json:"attachments"`
}
// slackAttachment is one attachment inside a slackPayload.
type slackAttachment struct {
// Color is a "#rrggbb" string chosen by sendSlack from the alert level.
Color string `json:"color"`
// Title and Text carry the alert title and message text.
Title string `json:"title"`
Text string `json:"text"`
// Footer is set by sendSlack to the fixed string "AI Code Battle".
Footer string `json:"footer"`
// Ts is set by sendSlack to the current Unix time in seconds.
Ts int64 `json:"ts"`
}
// sendSlack posts the alert to the Slack webhook as a single attachment whose
// color reflects level and whose title is prefixed with "[ACB-Matchmaker]".
// It returns the error from postJSON, if any.
func (a *Alerter) sendSlack(ctx context.Context, level AlertLevel, title, message string) error {
	var color string
	switch level {
	case AlertWarning:
		color = "#f39c12"
	case AlertError:
		color = "#e74c3c"
	default:
		color = "#3498db"
	}

	attachment := slackAttachment{
		Color:  color,
		Title:  fmt.Sprintf("[ACB-Matchmaker] %s", title),
		Text:   message,
		Footer: "AI Code Battle",
		Ts:     time.Now().Unix(),
	}
	return a.postJSON(ctx, a.slackURL, slackPayload{Attachments: []slackAttachment{attachment}})
}
// postJSON marshals payload to JSON and POSTs it to url with the alerter's
// HTTP client, honoring ctx for cancellation. It returns an error when the
// payload cannot be marshaled, the request cannot be built or sent, or the
// response status code is 300 or higher. The response body is closed but not
// read.
func (a *Alerter) postJSON(ctx context.Context, url string, payload any) error {
	encoded, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("marshal payload: %w", err)
	}

	request, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(encoded))
	if err != nil {
		return fmt.Errorf("create request: %w", err)
	}
	request.Header.Set("Content-Type", "application/json")

	response, err := a.client.Do(request)
	if err != nil {
		return fmt.Errorf("send webhook: %w", err)
	}
	defer response.Body.Close()

	if code := response.StatusCode; code >= 300 {
		return fmt.Errorf("webhook returned status %d", code)
	}
	return nil
}
// Alert helper methods for common events.

// BotMarkedInactive sends a warning-level alert saying that botID was marked
// inactive after failCount consecutive health check failures. Alerts are
// rate-limited per bot via the dedup key "bot-inactive:<botID>".
func (a *Alerter) BotMarkedInactive(ctx context.Context, botID string, failCount int) {
	const title = "Bot Marked Inactive"
	body := fmt.Sprintf("Bot `%s` marked inactive after %d consecutive health check failures.", botID, failCount)
	a.Send(ctx, AlertWarning, title, body, "bot-inactive:"+botID)
}
// BotRecovered sends an info-level alert saying that botID is back online and
// marked active. Alerts are rate-limited per bot via the dedup key
// "bot-recovered:<botID>".
func (a *Alerter) BotRecovered(ctx context.Context, botID string) {
	const title = "Bot Recovered"
	body := fmt.Sprintf("Bot `%s` is back online and marked active.", botID)
	a.Send(ctx, AlertInfo, title, body, "bot-recovered:"+botID)
}
// StaleJobsReaped sends a warning-level alert listing the stale jobs that were
// re-enqueued. All such alerts share the single dedup key "stale-jobs", so at
// most one is delivered per cooldown window regardless of which jobs it names.
func (a *Alerter) StaleJobsReaped(ctx context.Context, jobIDs []string) {
	const title = "Stale Jobs Re-enqueued"
	joined := strings.Join(jobIDs, ", ")
	body := fmt.Sprintf("%d stale job(s) re-enqueued: %s", len(jobIDs), joined)
	a.Send(ctx, AlertWarning, title, body, "stale-jobs")
}
// MatchError sends an error-level alert saying that matchID failed for the
// given reason. Alerts are rate-limited per match via the dedup key
// "match-error:<matchID>".
func (a *Alerter) MatchError(ctx context.Context, matchID, reason string) {
	const title = "Match Error"
	body := fmt.Sprintf("Match `%s` failed: %s", matchID, reason)
	a.Send(ctx, AlertError, title, body, "match-error:"+matchID)
}

View file

@ -0,0 +1,25 @@
package main
import (
	"log"
	"os"
	"strconv"
)
// envOr returns the value of the environment variable key, or fallback when
// the variable is unset or set to the empty string.
func envOr(key, fallback string) string {
	value := os.Getenv(key)
	if value == "" {
		return fallback
	}
	return value
}
// envInt returns the environment variable key parsed as a base-10 integer.
//
// When the variable is unset or empty, fallback is returned silently. When it
// is set but is not a valid integer (for example "60s"), fallback is returned
// and a warning is logged, so a mistyped setting does not go unnoticed.
// Negative and zero values parse normally and are returned as-is.
func envInt(key string, fallback int) int {
	raw := os.Getenv(key)
	if raw == "" {
		return fallback
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		// Keep running on the default rather than failing startup, but make
		// the misconfiguration visible in the logs.
		log.Printf("config: ignoring invalid integer %s=%q, using default %d", key, raw, fallback)
		return fallback
	}
	return n
}

View file

@ -0,0 +1,54 @@
package main
import (
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"encoding/hex"
"fmt"
"io"
)
// generateID returns prefix followed by the lowercase hex encoding of nBytes
// bytes read from crypto/rand, so the random part is 2*nBytes characters long.
// It returns an error if the random source cannot be read.
func generateID(prefix string, nBytes int) (string, error) {
	entropy := make([]byte, nBytes)
	_, err := io.ReadFull(rand.Reader, entropy)
	if err != nil {
		return "", err
	}
	encoded := make([]byte, hex.EncodedLen(len(entropy)))
	hex.Encode(encoded, entropy)
	return prefix + string(encoded), nil
}
// decryptSecret decrypts an AES-256-GCM protected value.
//
// keyHex is the hex-encoded 32-byte key (64 hex characters). ciphertextHex is
// the hex encoding of the GCM nonce immediately followed by the sealed data
// (ciphertext plus authentication tag); no additional authenticated data is
// used. On success the plaintext is returned as a string.
//
// An error is returned when either input is not valid hex, the key is not 32
// bytes, the input is shorter than one nonce, or authentication fails. Every
// underlying error is wrapped with the step that produced it, so it can still
// be inspected with errors.Is / errors.As.
func decryptSecret(ciphertextHex, keyHex string) (string, error) {
	key, err := hex.DecodeString(keyHex)
	if err != nil {
		return "", fmt.Errorf("decode key: %w", err)
	}
	if len(key) != 32 {
		return "", fmt.Errorf("encryption key must be 32 bytes (64 hex chars)")
	}

	sealed, err := hex.DecodeString(ciphertextHex)
	if err != nil {
		return "", fmt.Errorf("decode ciphertext: %w", err)
	}

	block, err := aes.NewCipher(key)
	if err != nil {
		return "", fmt.Errorf("init cipher: %w", err)
	}
	aead, err := cipher.NewGCM(block)
	if err != nil {
		return "", fmt.Errorf("init gcm: %w", err)
	}

	// The nonce is stored as a prefix of the sealed blob.
	nonceSize := aead.NonceSize()
	if len(sealed) < nonceSize {
		return "", fmt.Errorf("ciphertext too short")
	}
	nonce, body := sealed[:nonceSize], sealed[nonceSize:]

	plaintext, err := aead.Open(nil, nonce, body, nil)
	if err != nil {
		return "", fmt.Errorf("decrypt: %w", err)
	}
	return string(plaintext), nil
}

114
cmd/acb-matchmaker/main.go Normal file
View file

@ -0,0 +1,114 @@
// Package main implements the AI Code Battle matchmaker.
// It is an internal service that runs tickers for bot pairing,
// health checking, and stale job reaping. It has no external
// HTTP exposure - it only connects to PostgreSQL and Valkey.
package main
import (
"context"
"database/sql"
"log"
"os"
"os/signal"
"syscall"
"time"
_ "github.com/lib/pq"
"github.com/redis/go-redis/v9"
)
// Config holds the matchmaker's runtime settings. loadConfig fills every field
// from an ACB_* environment variable; the variable for each field is noted below.
type Config struct {
// ACB_DATABASE_URL: connection string passed to sql.Open with the "postgres" driver.
DatabaseURL string
// ACB_VALKEY_ADDR: address of the Valkey/Redis server.
ValkeyAddr string
// ACB_VALKEY_PASSWORD: Valkey/Redis password; may be empty.
ValkeyPassword string
// ACB_ENCRYPTION_KEY: hex-encoded key; empty disables secret decryption in the matchmaker tick.
EncryptionKey string // AES-256-GCM key for shared secret decryption
// ACB_DISCORD_WEBHOOK / ACB_SLACK_WEBHOOK: alert webhook URLs; may be empty.
DiscordWebhook string
SlackWebhook string
// ACB_MATCHMAKER_INTERVAL, ACB_HEALTHCHECK_INTERVAL, ACB_REAPER_INTERVAL: ticker periods in seconds.
MatchmakerSecs int
HealthCheckSecs int
ReaperSecs int
// ACB_BOT_TIMEOUT: HTTP timeout for bot health checks, in seconds.
BotTimeoutSecs int
// ACB_STALE_JOB_MINUTES: age in minutes after which a running job is treated as stale.
StaleJobMinutes int
// ACB_MAX_CONSEC_FAILS: consecutive health-check failures at which a bot is marked inactive.
MaxConsecFails int
}
// Matchmaker bundles the dependencies shared by the background tickers
// (matchmaker, health-checker, stale-reaper). It is assembled once in main.
type Matchmaker struct {
// Settings loaded by loadConfig.
cfg Config
// PostgreSQL handle opened in main.
db *sql.DB
// Valkey/Redis client; the tickers push job IDs onto the pending-jobs list with it.
rdb *redis.Client
// Sends Discord/Slack alerts for bot status changes and reaped jobs.
alerter *Alerter
}
// loadConfig builds the matchmaker configuration from ACB_* environment
// variables, applying the defaults shown below for anything unset.
//
// The three ticker intervals, the bot HTTP timeout and the stale-job age must
// be positive. A zero or negative value for any of them is replaced by its
// default and a warning is logged, because:
//   - the intervals are passed to time.NewTicker, which panics on a
//     non-positive duration;
//   - an http.Client timeout of zero means "no timeout";
//   - a non-positive stale-job age would make every running job look stale.
//
// ACB_MAX_CONSEC_FAILS is taken as given.
func loadConfig() Config {
	// positiveInt reads an integer setting and falls back to def when the
	// configured value is zero or negative.
	positiveInt := func(key string, def int) int {
		n := envInt(key, def)
		if n <= 0 {
			log.Printf("config: %s=%d must be positive, using default %d", key, n, def)
			return def
		}
		return n
	}

	return Config{
		DatabaseURL:     envOr("ACB_DATABASE_URL", "postgres://localhost:5432/acb?sslmode=disable"),
		ValkeyAddr:      envOr("ACB_VALKEY_ADDR", "localhost:6379"),
		ValkeyPassword:  os.Getenv("ACB_VALKEY_PASSWORD"),
		EncryptionKey:   os.Getenv("ACB_ENCRYPTION_KEY"),
		DiscordWebhook:  os.Getenv("ACB_DISCORD_WEBHOOK"),
		SlackWebhook:    os.Getenv("ACB_SLACK_WEBHOOK"),
		MatchmakerSecs:  positiveInt("ACB_MATCHMAKER_INTERVAL", 60),
		HealthCheckSecs: positiveInt("ACB_HEALTHCHECK_INTERVAL", 900),
		ReaperSecs:      positiveInt("ACB_REAPER_INTERVAL", 300),
		BotTimeoutSecs:  positiveInt("ACB_BOT_TIMEOUT", 5),
		StaleJobMinutes: positiveInt("ACB_STALE_JOB_MINUTES", 15),
		MaxConsecFails:  envInt("ACB_MAX_CONSEC_FAILS", 3),
	}
}
// main loads the configuration, connects to PostgreSQL and Valkey, verifies
// both connections with a 5-second ping, starts the background tickers, and
// then blocks until SIGINT or SIGTERM arrives. On a signal it cancels the
// ticker context and returns; it does not wait for the ticker goroutines to
// finish their current tick.
func main() {
	cfg := loadConfig()

	db, err := sql.Open("postgres", cfg.DatabaseURL)
	if err != nil {
		log.Fatalf("failed to open database: %v", err)
	}
	defer db.Close()
	db.SetMaxOpenConns(10)
	db.SetMaxIdleConns(2)
	db.SetConnMaxLifetime(5 * time.Minute)

	rdb := redis.NewClient(&redis.Options{
		Addr:     cfg.ValkeyAddr,
		Password: cfg.ValkeyPassword,
	})
	defer rdb.Close()

	// Fail fast if either backing store is unreachable at startup.
	pingCtx, stopPing := context.WithTimeout(context.Background(), 5*time.Second)
	defer stopPing()
	if err := db.PingContext(pingCtx); err != nil {
		log.Fatalf("database ping failed: %v", err)
	}
	if err := rdb.Ping(pingCtx).Err(); err != nil {
		log.Fatalf("valkey ping failed: %v", err)
	}

	m := &Matchmaker{
		cfg:     cfg,
		db:      db,
		rdb:     rdb,
		alerter: NewAlerter(cfg.DiscordWebhook, cfg.SlackWebhook),
	}

	// The tickers run until this context is cancelled.
	runCtx, stopTickers := context.WithCancel(context.Background())
	defer stopTickers()
	m.StartTickers(runCtx)
	log.Println("acb-matchmaker started - running internal tickers")

	// Block until asked to stop.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
	<-signals

	log.Println("shutting down...")
	stopTickers()
	log.Println("shutdown complete")
}

View file

@ -11,13 +11,15 @@ import (
"time"
)
func (s *Server) StartTickers(ctx context.Context) {
go s.runTicker(ctx, "matchmaker", time.Duration(s.cfg.MatchmakerSecs)*time.Second, s.tickMatchmaker)
go s.runTicker(ctx, "health-checker", time.Duration(s.cfg.HealthCheckSecs)*time.Second, s.tickHealthChecker)
go s.runTicker(ctx, "stale-reaper", time.Duration(s.cfg.ReaperSecs)*time.Second, s.tickStaleReaper)
const valkeyJobQueue = "acb:jobs:pending"
func (m *Matchmaker) StartTickers(ctx context.Context) {
go m.runTicker(ctx, "matchmaker", time.Duration(m.cfg.MatchmakerSecs)*time.Second, m.tickMatchmaker)
go m.runTicker(ctx, "health-checker", time.Duration(m.cfg.HealthCheckSecs)*time.Second, m.tickHealthChecker)
go m.runTicker(ctx, "stale-reaper", time.Duration(m.cfg.ReaperSecs)*time.Second, m.tickStaleReaper)
}
func (s *Server) runTicker(ctx context.Context, name string, interval time.Duration, fn func(context.Context)) {
func (m *Matchmaker) runTicker(ctx context.Context, name string, interval time.Duration, fn func(context.Context)) {
log.Printf("starting ticker: %s (every %s)", name, interval)
ticker := time.NewTicker(interval)
defer ticker.Stop()
@ -33,9 +35,9 @@ func (s *Server) runTicker(ctx context.Context, name string, interval time.Durat
}
// tickMatchmaker creates matches between active bots and enqueues jobs.
func (s *Server) tickMatchmaker(ctx context.Context) {
func (m *Matchmaker) tickMatchmaker(ctx context.Context) {
// Get all active bots
rows, err := s.db.QueryContext(ctx,
rows, err := m.db.QueryContext(ctx,
`SELECT bot_id, endpoint_url, shared_secret, rating_mu, rating_phi
FROM bots WHERE status = 'active' ORDER BY rating_mu DESC`)
if err != nil {
@ -44,10 +46,10 @@ func (s *Server) tickMatchmaker(ctx context.Context) {
}
type botInfo struct {
ID string
Endpoint string
Secret string
Mu, Phi float64
ID string
Endpoint string
Secret string
Mu, Phi float64
}
var bots []botInfo
for rows.Next() {
@ -110,11 +112,11 @@ func (s *Server) tickMatchmaker(ctx context.Context) {
// Decrypt secrets for the worker
secretA := botA.Secret
secretB := botB.Secret
if s.cfg.EncryptionKey != "" {
if dec, err := decryptSecret(botA.Secret, s.cfg.EncryptionKey); err == nil {
if m.cfg.EncryptionKey != "" {
if dec, err := decryptSecret(botA.Secret, m.cfg.EncryptionKey); err == nil {
secretA = dec
}
if dec, err := decryptSecret(botB.Secret, s.cfg.EncryptionKey); err == nil {
if dec, err := decryptSecret(botB.Secret, m.cfg.EncryptionKey); err == nil {
secretB = dec
}
}
@ -132,7 +134,7 @@ func (s *Server) tickMatchmaker(ctx context.Context) {
}
configJSON, _ := json.Marshal(config)
tx, err := s.db.BeginTx(ctx, nil)
tx, err := m.db.BeginTx(ctx, nil)
if err != nil {
log.Printf("matchmaker: tx error: %v", err)
return
@ -169,7 +171,7 @@ func (s *Server) tickMatchmaker(ctx context.Context) {
}
// Enqueue in Valkey
if err := s.rdb.LPush(ctx, valkeyJobQueue, jobID).Err(); err != nil {
if err := m.rdb.LPush(ctx, valkeyJobQueue, jobID).Err(); err != nil {
log.Printf("matchmaker: valkey push error: %v", err)
return
}
@ -178,8 +180,8 @@ func (s *Server) tickMatchmaker(ctx context.Context) {
}
// tickHealthChecker pings each active bot's /health endpoint.
func (s *Server) tickHealthChecker(ctx context.Context) {
rows, err := s.db.QueryContext(ctx,
func (m *Matchmaker) tickHealthChecker(ctx context.Context) {
rows, err := m.db.QueryContext(ctx,
`SELECT bot_id, endpoint_url, status, consec_fails FROM bots WHERE status IN ('active', 'inactive')`)
if err != nil {
log.Printf("health-checker: query error: %v", err)
@ -204,7 +206,7 @@ func (s *Server) tickHealthChecker(ctx context.Context) {
}
rows.Close()
client := &http.Client{Timeout: time.Duration(s.cfg.BotTimeoutSecs) * time.Second}
client := &http.Client{Timeout: time.Duration(m.cfg.BotTimeoutSecs) * time.Second}
for _, bot := range bots {
healthy := false
@ -216,36 +218,36 @@ func (s *Server) tickHealthChecker(ctx context.Context) {
if healthy {
if bot.Status == "inactive" || bot.ConsecFails > 0 {
s.db.ExecContext(ctx,
m.db.ExecContext(ctx,
`UPDATE bots SET status = 'active', consec_fails = 0, last_active = NOW()
WHERE bot_id = $1`, bot.ID)
log.Printf("health-checker: %s recovered → active", bot.ID)
if bot.Status == "inactive" {
s.alerter.BotRecovered(ctx, bot.ID)
m.alerter.BotRecovered(ctx, bot.ID)
}
}
} else {
newFails := bot.ConsecFails + 1
newStatus := bot.Status
if newFails >= s.cfg.MaxConsecFails {
if newFails >= m.cfg.MaxConsecFails {
newStatus = "inactive"
}
s.db.ExecContext(ctx,
m.db.ExecContext(ctx,
`UPDATE bots SET status = $1, consec_fails = $2 WHERE bot_id = $3`,
newStatus, newFails, bot.ID)
if newStatus != bot.Status {
log.Printf("health-checker: %s marked inactive after %d failures", bot.ID, newFails)
s.alerter.BotMarkedInactive(ctx, bot.ID, newFails)
m.alerter.BotMarkedInactive(ctx, bot.ID, newFails)
}
}
}
}
// tickStaleReaper re-enqueues jobs that have been running too long.
func (s *Server) tickStaleReaper(ctx context.Context) {
threshold := time.Duration(s.cfg.StaleJobMinutes) * time.Minute
func (m *Matchmaker) tickStaleReaper(ctx context.Context) {
threshold := time.Duration(m.cfg.StaleJobMinutes) * time.Minute
rows, err := s.db.QueryContext(ctx,
rows, err := m.db.QueryContext(ctx,
`SELECT job_id FROM jobs
WHERE status = 'running' AND claimed_at < $1`,
time.Now().Add(-threshold))
@ -266,7 +268,7 @@ func (s *Server) tickStaleReaper(ctx context.Context) {
rows.Close()
for _, jobID := range staleJobs {
result, err := s.db.ExecContext(ctx,
result, err := m.db.ExecContext(ctx,
`UPDATE jobs SET status = 'pending', worker_id = NULL, claimed_at = NULL
WHERE job_id = $1 AND status = 'running'`, jobID)
if err != nil {
@ -279,7 +281,7 @@ func (s *Server) tickStaleReaper(ctx context.Context) {
continue // already completed or re-enqueued by another reaper
}
if err := s.rdb.LPush(ctx, valkeyJobQueue, jobID).Err(); err != nil {
if err := m.rdb.LPush(ctx, valkeyJobQueue, jobID).Err(); err != nil {
log.Printf("stale-reaper: re-enqueue error for %s: %v", jobID, err)
continue
}
@ -289,14 +291,14 @@ func (s *Server) tickStaleReaper(ctx context.Context) {
if len(staleJobs) > 0 {
log.Printf("stale-reaper: processed %d stale jobs", len(staleJobs))
s.alerter.StaleJobsReaped(ctx, staleJobs)
m.alerter.StaleJobsReaped(ctx, staleJobs)
}
}
// queryActiveBotCount returns the number of active bots (used by tests).
func (s *Server) queryActiveBotCount(ctx context.Context) (int, error) {
func (m *Matchmaker) queryActiveBotCount(ctx context.Context) (int, error) {
var count int
err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM bots WHERE status = 'active'`).Scan(&count)
err := m.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM bots WHERE status = 'active'`).Scan(&count)
return count, err
}