ai-code-battle/cmd/acb-worker/db.go
jedarden c56cc8bae6 fix(matchmaker): multi-match crash cooldown (3 strikes / 30 min) per §4.5 + §6.1
Add crash_strikes and cooldown_until columns to bots table. Worker
increments strikes on crash (cooldown at 3), resets on success.
Matchmaker excludes cooldown bots from pairing, series scheduling,
and championship brackets. Fix erroneous cooldown filter on series
table in finalizeCompletedSeries (column only exists on bots).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-22 15:22:12 -04:00

637 lines
18 KiB
Go

// PostgreSQL database client for match results and job coordination
package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log"
"time"
_ "github.com/lib/pq"
)
// DBClient handles PostgreSQL operations.
type DBClient struct {
db *sql.DB
}
// NewDBClient creates a new database client.
func NewDBClient(databaseURL string) (*DBClient, error) {
db, err := sql.Open("postgres", databaseURL)
if err != nil {
return nil, fmt.Errorf("failed to open database: %w", err)
}
// Configure connection pool
db.SetMaxOpenConns(10)
db.SetMaxIdleConns(5)
db.SetConnMaxLifetime(5 * time.Minute)
// Verify connection
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := db.PingContext(ctx); err != nil {
return nil, fmt.Errorf("failed to connect to database: %w", err)
}
return &DBClient{db: db}, nil
}
// Close closes the database connection.
func (c *DBClient) Close() error {
return c.db.Close()
}
// DBJob represents a pending job from the database.
type DBJob struct {
ID string `json:"id"`
MatchID string `json:"match_id"`
Status string `json:"status"`
WorkerID *string `json:"worker_id"`
ClaimedAt *time.Time `json:"claimed_at"`
HeartbeatAt *time.Time `json:"heartbeat_at"`
CreatedAt time.Time `json:"created_at"`
}
// DBMatch represents match metadata from the database.
type DBMatch struct {
ID string `json:"id"`
Status string `json:"status"`
Winner *int `json:"winner"` // player index
MapID string `json:"map_id"`
CreatedAt time.Time `json:"created_at"`
CompletedAt *time.Time `json:"completed_at"`
}
// DBParticipant represents a match participant.
type DBParticipant struct {
MatchID string `json:"match_id"`
BotID string `json:"bot_id"`
PlayerSlot int `json:"player_slot"`
Score int `json:"score"`
RatingMuBefore float64
RatingPhiBefore float64
RatingSigmaBefore float64
RatingMuAfter *float64
RatingPhiAfter *float64
RatingSigmaAfter *float64
}
// DBBotInfo contains bot endpoint and secret information.
type DBBotInfo struct {
ID string
EndpointURL string
Secret string
}
// DBMapData represents map configuration.
type DBMapData struct {
ID string `json:"id"`
Width int `json:"width"`
Height int `json:"height"`
Walls string `json:"walls"`
Spawns string `json:"spawns"`
Cores string `json:"cores"`
}
// JobClaimData contains all data needed to execute a match.
type JobClaimData struct {
Job DBJob
Match DBMatch
Participants []DBParticipant
Map DBMapData
Bots []DBBotInfo
}
// GetNextJob fetches the next pending job from the database.
func (c *DBClient) GetNextJob(ctx context.Context) (*DBJob, error) {
query := `
SELECT job_id, match_id, status, worker_id, claimed_at, heartbeat_at, created_at
FROM jobs
WHERE status = 'pending'
ORDER BY created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
`
var job DBJob
err := c.db.QueryRowContext(ctx, query).Scan(
&job.ID, &job.MatchID, &job.Status, &job.WorkerID,
&job.ClaimedAt, &job.HeartbeatAt, &job.CreatedAt,
)
if err == sql.ErrNoRows {
return nil, nil // No pending jobs
}
if err != nil {
return nil, fmt.Errorf("failed to get next job: %w", err)
}
return &job, nil
}
// ClaimJob claims a job and returns all data needed to execute the match.
func (c *DBClient) ClaimJob(ctx context.Context, jobID string, workerID string) (*JobClaimData, error) {
tx, err := c.db.BeginTx(ctx, nil)
if err != nil {
return nil, fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback()
// Update job status
now := time.Now().UTC()
_, err = tx.ExecContext(ctx, `
UPDATE jobs
SET status = 'claimed', worker_id = $1, claimed_at = $2
WHERE job_id = $3 AND status = 'pending'
`, workerID, now, jobID)
if err != nil {
return nil, fmt.Errorf("failed to claim job: %w", err)
}
// Get job details
var job DBJob
err = tx.QueryRowContext(ctx, `
SELECT job_id, match_id, status, worker_id, claimed_at, heartbeat_at, created_at
FROM jobs WHERE job_id = $1
`, jobID).Scan(
&job.ID, &job.MatchID, &job.Status, &job.WorkerID,
&job.ClaimedAt, &job.HeartbeatAt, &job.CreatedAt,
)
if err != nil {
return nil, fmt.Errorf("failed to get job: %w", err)
}
// Get match details
var match DBMatch
err = tx.QueryRowContext(ctx, `
SELECT match_id, status, winner, map_id, created_at, completed_at
FROM matches WHERE match_id = $1
`, job.MatchID).Scan(
&match.ID, &match.Status, &match.Winner, &match.MapID,
&match.CreatedAt, &match.CompletedAt,
)
if err != nil {
return nil, fmt.Errorf("failed to get match: %w", err)
}
// Get map data
var mapData DBMapData
err = tx.QueryRowContext(ctx, `
SELECT map_id, grid_width, grid_height, map_json->>'walls' as walls,
map_json->>'spawns' as spawns, map_json->>'cores' as cores
FROM maps WHERE map_id = $1
`, match.MapID).Scan(
&mapData.ID, &mapData.Width, &mapData.Height,
&mapData.Walls, &mapData.Spawns, &mapData.Cores,
)
if err != nil {
return nil, fmt.Errorf("failed to get map: %w", err)
}
// Get participants
participantRows, err := tx.QueryContext(ctx, `
SELECT mp.match_id, mp.bot_id, mp.player_slot, mp.score,
b.rating_mu, b.rating_phi, b.rating_sigma
FROM match_participants mp
JOIN bots b ON mp.bot_id = b.bot_id
WHERE mp.match_id = $1
ORDER BY mp.player_slot
`, job.MatchID)
if err != nil {
return nil, fmt.Errorf("failed to get participants: %w", err)
}
defer participantRows.Close()
var participants []DBParticipant
var botIDs []string
for participantRows.Next() {
var p DBParticipant
err := participantRows.Scan(
&p.MatchID, &p.BotID, &p.PlayerSlot, &p.Score,
&p.RatingMuBefore, &p.RatingPhiBefore, &p.RatingSigmaBefore,
)
if err != nil {
return nil, fmt.Errorf("failed to scan participant: %w", err)
}
participants = append(participants, p)
botIDs = append(botIDs, p.BotID)
}
// Get bot endpoints and secrets
botRows, err := tx.QueryContext(ctx, `
SELECT bot_id, endpoint_url, shared_secret
FROM bots WHERE bot_id = ANY($1)
`, botIDs)
if err != nil {
return nil, fmt.Errorf("failed to get bots: %w", err)
}
defer botRows.Close()
var bots []DBBotInfo
for botRows.Next() {
var b DBBotInfo
if err := botRows.Scan(&b.ID, &b.EndpointURL, &b.Secret); err != nil {
return nil, fmt.Errorf("failed to scan bot: %w", err)
}
bots = append(bots, b)
}
// Update match status to running
_, err = tx.ExecContext(ctx, `
UPDATE matches SET status = 'running' WHERE match_id = $1
`, job.MatchID)
if err != nil {
return nil, fmt.Errorf("failed to update match status: %w", err)
}
if err := tx.Commit(); err != nil {
return nil, fmt.Errorf("failed to commit transaction: %w", err)
}
return &JobClaimData{
Job: job,
Match: match,
Participants: participants,
Map: mapData,
Bots: bots,
}, nil
}
// Heartbeat updates the heartbeat timestamp for a job.
func (c *DBClient) Heartbeat(ctx context.Context, jobID string, workerID string) error {
result, err := c.db.ExecContext(ctx, `
UPDATE jobs
SET heartbeat_at = NOW()
WHERE job_id = $1 AND worker_id = $2 AND status = 'claimed'
`, jobID, workerID)
if err != nil {
return fmt.Errorf("failed to send heartbeat: %w", err)
}
rows, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to check rows affected: %w", err)
}
if rows == 0 {
return fmt.Errorf("job not found or not claimed by this worker")
}
return nil
}
// SubmitMatchResult writes the match result to the database and updates ratings.
func (c *DBClient) SubmitMatchResult(ctx context.Context, jobID string, result *MatchResult, replayURL string, ratingUpdates []RatingUpdate) error {
tx, err := c.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback()
now := time.Now().UTC()
// Determine winner player index from result
var winnerIndex *int
if result.WinnerID != "" {
// Look up player slot for winner
var idx int
err := tx.QueryRowContext(ctx, `
SELECT player_slot FROM match_participants WHERE match_id = (
SELECT match_id FROM jobs WHERE job_id = $1
) AND bot_id = $2
`, jobID, result.WinnerID).Scan(&idx)
if err == nil {
winnerIndex = &idx
}
}
// Update job status
_, err = tx.ExecContext(ctx, `
UPDATE jobs
SET status = 'completed', completed_at = $1
WHERE job_id = $2
`, now, jobID)
if err != nil {
return fmt.Errorf("failed to update job: %w", err)
}
// Get match ID
var matchID string
err = tx.QueryRowContext(ctx, `
SELECT match_id FROM jobs WHERE job_id = $1
`, jobID).Scan(&matchID)
if err != nil {
return fmt.Errorf("failed to get match ID: %w", err)
}
// Update match status and result
scoresJSON, _ := json.Marshal(result.Scores)
_, err = tx.ExecContext(ctx, `
UPDATE matches
SET status = 'completed', winner = $1, condition = $2,
turn_count = $3, scores_json = $4, completed_at = $5
WHERE match_id = $6
`, winnerIndex, result.EndReason, result.Turns, scoresJSON, now, matchID)
if err != nil {
return fmt.Errorf("failed to update match: %w", err)
}
// Update participant scores
for botID, score := range result.Scores {
_, err = tx.ExecContext(ctx, `
UPDATE match_participants
SET score = $1
WHERE match_id = $2 AND bot_id = $3
`, score, matchID, botID)
if err != nil {
return fmt.Errorf("failed to update participant score: %w", err)
}
}
// Apply rating updates (Glicko-2)
for _, update := range ratingUpdates {
// Update bot rating
_, err = tx.ExecContext(ctx, `
UPDATE bots
SET rating_mu = $1, rating_phi = $2, rating_sigma = $3, last_active = $4
WHERE bot_id = $5
`, update.Mu, update.Phi, update.Sigma, now, update.BotID)
if err != nil {
return fmt.Errorf("failed to update bot rating: %w", err)
}
// Record rating history
_, err = tx.ExecContext(ctx, `
INSERT INTO rating_history (bot_id, match_id, rating, recorded_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (bot_id, match_id) DO UPDATE SET rating = $3, recorded_at = $4
`, update.BotID, matchID, update.DisplayRating, now)
if err != nil {
return fmt.Errorf("failed to record rating history: %w", err)
}
// Update participant with rating after
_, err = tx.ExecContext(ctx, `
UPDATE match_participants
SET rating_mu_after = $1, rating_phi_after = $2, rating_sigma_after = $3
WHERE match_id = $4 AND bot_id = $5
`, update.Mu, update.Phi, update.Sigma, matchID, update.BotID)
if err != nil {
return fmt.Errorf("failed to update participant rating after: %w", err)
}
}
// Resolve predictions for this match
if err := resolvePredictions(ctx, tx, matchID, result.WinnerID); err != nil {
// Log but don't fail the match result — predictions are non-critical
log.Printf("failed to resolve predictions for match %s: %v", matchID, err)
}
// Update series tables if this match is part of a series
if err := updateSeriesResult(ctx, tx, matchID, result.WinnerID); err != nil {
log.Printf("failed to update series result for match %s: %v", matchID, err)
}
// Update crash strikes and cooldown for each participant
if err := updateCrashStrikes(ctx, tx, result.CrashedBots); err != nil {
log.Printf("failed to update crash strikes for match %s: %v", matchID, err)
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return nil
}
// resolvePredictions marks open predictions as correct/incorrect and updates predictor_stats.
// Uses RETURNING to only process predictions that were just resolved, preventing double-counting.
func resolvePredictions(ctx context.Context, tx *sql.Tx, matchID string, winnerBotID string) error {
var rows *sql.Rows
var err error
if winnerBotID == "" {
rows, err = tx.QueryContext(ctx, `
UPDATE predictions
SET correct = false, resolved_at = NOW()
WHERE match_id = $1 AND correct IS NULL
RETURNING predictor_id, correct
`, matchID)
} else {
rows, err = tx.QueryContext(ctx, `
UPDATE predictions
SET correct = (predicted_bot = $1), resolved_at = NOW()
WHERE match_id = $2 AND correct IS NULL
RETURNING predictor_id, correct
`, winnerBotID, matchID)
}
if err != nil {
return fmt.Errorf("failed to resolve predictions: %w", err)
}
defer rows.Close()
for rows.Next() {
var predictorID string
var correct bool
if err := rows.Scan(&predictorID, &correct); err != nil {
return fmt.Errorf("failed to scan resolved prediction: %w", err)
}
if err := upsertPredictorStats(ctx, tx, predictorID, correct); err != nil {
return fmt.Errorf("failed to update predictor_stats for %s: %w", predictorID, err)
}
}
return nil
}
// upsertPredictorStats updates the predictor_stats row for a single resolution.
func upsertPredictorStats(ctx context.Context, tx *sql.Tx, predictorID string, correct bool) error {
if correct {
_, err := tx.ExecContext(ctx, `
INSERT INTO predictor_stats (predictor_id, correct, streak, best_streak, updated_at)
VALUES ($1, 1, 1, 1, NOW())
ON CONFLICT (predictor_id) DO UPDATE SET
correct = predictor_stats.correct + 1,
streak = predictor_stats.streak + 1,
best_streak = GREATEST(predictor_stats.best_streak, predictor_stats.streak + 1),
updated_at = NOW()
`, predictorID)
return err
}
_, err := tx.ExecContext(ctx, `
INSERT INTO predictor_stats (predictor_id, incorrect, streak, best_streak, updated_at)
VALUES ($1, 1, 0, 0, NOW())
ON CONFLICT (predictor_id) DO UPDATE SET
incorrect = predictor_stats.incorrect + 1,
streak = 0,
updated_at = NOW()
`, predictorID)
return err
}
// FailJob marks a job as failed.
func (c *DBClient) FailJob(ctx context.Context, jobID string, workerID string, errorMessage string) error {
result, err := c.db.ExecContext(ctx, `
UPDATE jobs
SET status = 'failed', completed_at = NOW()
WHERE job_id = $1 AND worker_id = $2 AND status = 'claimed'
`, jobID, workerID)
if err != nil {
return fmt.Errorf("failed to fail job: %w", err)
}
rows, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to check rows affected: %w", err)
}
if rows == 0 {
return fmt.Errorf("job not found or not claimed by this worker")
}
// Also update match status
_, err = c.db.ExecContext(ctx, `
UPDATE matches
SET status = 'failed', completed_at = NOW()
WHERE match_id = (SELECT match_id FROM jobs WHERE job_id = $1)
`, jobID)
if err != nil {
return fmt.Errorf("failed to update match status: %w", err)
}
return nil
}
// CrashCooldownDuration is the 30-minute cooldown when 3 consecutive crashes are detected.
const CrashCooldownDuration = 30 * time.Minute
// MaxCrashStrikes is the number of consecutive crashes that triggers cooldown.
const MaxCrashStrikes = 3
// updateCrashStrikes updates the crash_strikes and cooldown_until columns for
// match participants. A crashed bot gets its strikes incremented; a non-crashed
// bot has its strikes reset to 0. When strikes reach MaxCrashStrikes, cooldown
// is set to now + 30 min.
func updateCrashStrikes(ctx context.Context, tx *sql.Tx, crashedBots map[string]bool) error {
if len(crashedBots) == 0 {
return nil
}
for botID, crashed := range crashedBots {
if crashed {
// Increment strikes; if threshold reached, set cooldown
_, err := tx.ExecContext(ctx, `
UPDATE bots
SET crash_strikes = crash_strikes + 1,
cooldown_until = CASE
WHEN crash_strikes + 1 >= $1 THEN NOW() + $2
ELSE cooldown_until
END
WHERE bot_id = $3
`, MaxCrashStrikes, CrashCooldownDuration, botID)
if err != nil {
return fmt.Errorf("failed to increment crash strikes for %s: %w", botID, err)
}
} else {
// Reset strikes on successful match
_, err := tx.ExecContext(ctx, `
UPDATE bots SET crash_strikes = 0 WHERE bot_id = $1
`, botID)
if err != nil {
return fmt.Errorf("failed to reset crash strikes for %s: %w", botID, err)
}
}
}
return nil
}
// RatingUpdate represents a Glicko-2 rating update for a bot.
type RatingUpdate struct {
BotID string
Mu float64
Phi float64
Sigma float64
DisplayRating float64
RatingMuBefore float64
RatingPhiBefore float64
RatingDeviationChange float64
}
// GetBotRatings retrieves current ratings for a list of bots.
func (c *DBClient) GetBotRatings(ctx context.Context, botIDs []string) (map[string]Glicko2Rating, error) {
rows, err := c.db.QueryContext(ctx, `
SELECT bot_id, rating_mu, rating_phi, rating_sigma
FROM bots WHERE bot_id = ANY($1)
`, botIDs)
if err != nil {
return nil, fmt.Errorf("failed to get bot ratings: %w", err)
}
defer rows.Close()
ratings := make(map[string]Glicko2Rating)
for rows.Next() {
var botID string
var r Glicko2Rating
if err := rows.Scan(&botID, &r.Mu, &r.Phi, &r.Sigma); err != nil {
return nil, fmt.Errorf("failed to scan rating: %w", err)
}
ratings[botID] = r
}
return ratings, nil
}
// updateSeriesResult updates series_games.winner_id and series.a_wins/b_wins
// when a match that belongs to a series completes.
func updateSeriesResult(ctx context.Context, tx *sql.Tx, matchID string, winnerBotID string) error {
// Find the series_game for this match
var seriesID int64
var gameNum int
err := tx.QueryRowContext(ctx, `
SELECT series_id, game_num FROM series_games WHERE match_id = $1
`, matchID).Scan(&seriesID, &gameNum)
if err == sql.ErrNoRows {
return nil // not a series game
}
if err != nil {
return fmt.Errorf("find series game: %w", err)
}
// Update the series_games row with the winner
if _, err := tx.ExecContext(ctx, `
UPDATE series_games SET winner_id = $1 WHERE match_id = $2
`, winnerBotID, matchID); err != nil {
return fmt.Errorf("update series game winner: %w", err)
}
// Increment a_wins or b_wins on the series
if winnerBotID == "" {
return nil // draw — no increment
}
// Determine if the winner is bot_a or bot_b
var botAID string
err = tx.QueryRowContext(ctx, `
SELECT bot_a_id FROM series WHERE id = $1
`, seriesID).Scan(&botAID)
if err != nil {
return fmt.Errorf("find series bot_a: %w", err)
}
if winnerBotID == botAID {
_, err = tx.ExecContext(ctx, `
UPDATE series SET a_wins = a_wins + 1, updated_at = NOW() WHERE id = $1
`, seriesID)
} else {
_, err = tx.ExecContext(ctx, `
UPDATE series SET b_wins = b_wins + 1, updated_at = NOW() WHERE id = $1
`, seriesID)
}
if err != nil {
return fmt.Errorf("increment series wins: %w", err)
}
log.Printf("series: game %d result recorded — series %d, winner=%s", gameNum, seriesID, winnerBotID)
return nil
}