// PostgreSQL database client for match results and job coordination package main import ( "context" "database/sql" "encoding/json" "fmt" "log" "time" _ "github.com/lib/pq" ) // DBClient handles PostgreSQL operations. type DBClient struct { db *sql.DB } // NewDBClient creates a new database client. func NewDBClient(databaseURL string) (*DBClient, error) { db, err := sql.Open("postgres", databaseURL) if err != nil { return nil, fmt.Errorf("failed to open database: %w", err) } // Configure connection pool db.SetMaxOpenConns(10) db.SetMaxIdleConns(5) db.SetConnMaxLifetime(5 * time.Minute) // Verify connection ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := db.PingContext(ctx); err != nil { return nil, fmt.Errorf("failed to connect to database: %w", err) } return &DBClient{db: db}, nil } // Close closes the database connection. func (c *DBClient) Close() error { return c.db.Close() } // DBJob represents a pending job from the database. type DBJob struct { ID string `json:"id"` MatchID string `json:"match_id"` Status string `json:"status"` WorkerID *string `json:"worker_id"` ClaimedAt *time.Time `json:"claimed_at"` HeartbeatAt *time.Time `json:"heartbeat_at"` CreatedAt time.Time `json:"created_at"` } // DBMatch represents match metadata from the database. type DBMatch struct { ID string `json:"id"` Status string `json:"status"` Winner *int `json:"winner"` // player index MapID string `json:"map_id"` CreatedAt time.Time `json:"created_at"` CompletedAt *time.Time `json:"completed_at"` SeasonID string `json:"season_id"` RulesVersion string `json:"rules_version"` } // DBParticipant represents a match participant. type DBParticipant struct { MatchID string `json:"match_id"` BotID string `json:"bot_id"` PlayerSlot int `json:"player_slot"` Score int `json:"score"` RatingMuBefore float64 RatingPhiBefore float64 RatingSigmaBefore float64 RatingMuAfter *float64 RatingPhiAfter *float64 RatingSigmaAfter *float64 } // DBBotInfo contains bot endpoint and secret information. type DBBotInfo struct { ID string EndpointURL string Secret string } // DBMapData represents map configuration. type DBMapData struct { ID string `json:"id"` Width int `json:"width"` Height int `json:"height"` Walls string `json:"walls"` Spawns string `json:"spawns"` Cores string `json:"cores"` } // JobClaimData contains all data needed to execute a match. type JobClaimData struct { Job DBJob Match DBMatch Participants []DBParticipant Map DBMapData Bots []DBBotInfo } // GetNextJob fetches the next pending job from the database. func (c *DBClient) GetNextJob(ctx context.Context) (*DBJob, error) { query := ` SELECT job_id, match_id, status, worker_id, claimed_at, heartbeat_at, created_at FROM jobs WHERE status = 'pending' ORDER BY created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED ` var job DBJob err := c.db.QueryRowContext(ctx, query).Scan( &job.ID, &job.MatchID, &job.Status, &job.WorkerID, &job.ClaimedAt, &job.HeartbeatAt, &job.CreatedAt, ) if err == sql.ErrNoRows { return nil, nil // No pending jobs } if err != nil { return nil, fmt.Errorf("failed to get next job: %w", err) } return &job, nil } // ClaimJob claims a job and returns all data needed to execute the match. func (c *DBClient) ClaimJob(ctx context.Context, jobID string, workerID string) (*JobClaimData, error) { tx, err := c.db.BeginTx(ctx, nil) if err != nil { return nil, fmt.Errorf("failed to begin transaction: %w", err) } defer tx.Rollback() // Update job status now := time.Now().UTC() _, err = tx.ExecContext(ctx, ` UPDATE jobs SET status = 'claimed', worker_id = $1, claimed_at = $2 WHERE job_id = $3 AND status = 'pending' `, workerID, now, jobID) if err != nil { return nil, fmt.Errorf("failed to claim job: %w", err) } // Get job details var job DBJob err = tx.QueryRowContext(ctx, ` SELECT job_id, match_id, status, worker_id, claimed_at, heartbeat_at, created_at FROM jobs WHERE job_id = $1 `, jobID).Scan( &job.ID, &job.MatchID, &job.Status, &job.WorkerID, &job.ClaimedAt, &job.HeartbeatAt, &job.CreatedAt, ) if err != nil { return nil, fmt.Errorf("failed to get job: %w", err) } // Get match details + active season info var match DBMatch err = tx.QueryRowContext(ctx, ` SELECT m.match_id, m.status, m.winner, m.map_id, m.created_at, m.completed_at, COALESCE(s.id::text, ''), COALESCE(s.rules_version::text, '') FROM matches m LEFT JOIN seasons s ON s.status = 'active' WHERE m.match_id = $1 `, job.MatchID).Scan( &match.ID, &match.Status, &match.Winner, &match.MapID, &match.CreatedAt, &match.CompletedAt, &match.SeasonID, &match.RulesVersion, ) if err != nil { return nil, fmt.Errorf("failed to get match: %w", err) } // Get map data var mapData DBMapData err = tx.QueryRowContext(ctx, ` SELECT map_id, grid_width, grid_height, COALESCE(map_json->>'walls', '') as walls, COALESCE(map_json->>'spawns', '') as spawns, COALESCE(map_json->>'cores', '') as cores FROM maps WHERE map_id = $1 `, match.MapID).Scan( &mapData.ID, &mapData.Width, &mapData.Height, &mapData.Walls, &mapData.Spawns, &mapData.Cores, ) if err != nil { return nil, fmt.Errorf("failed to get map: %w", err) } // Get participants participantRows, err := tx.QueryContext(ctx, ` SELECT mp.match_id, mp.bot_id, mp.player_slot, mp.score, b.rating_mu, b.rating_phi, b.rating_sigma FROM match_participants mp JOIN bots b ON mp.bot_id = b.bot_id WHERE mp.match_id = $1 ORDER BY mp.player_slot `, job.MatchID) if err != nil { return nil, fmt.Errorf("failed to get participants: %w", err) } defer participantRows.Close() var participants []DBParticipant var botIDs []string for participantRows.Next() { var p DBParticipant err := participantRows.Scan( &p.MatchID, &p.BotID, &p.PlayerSlot, &p.Score, &p.RatingMuBefore, &p.RatingPhiBefore, &p.RatingSigmaBefore, ) if err != nil { return nil, fmt.Errorf("failed to scan participant: %w", err) } participants = append(participants, p) botIDs = append(botIDs, p.BotID) } // Get bot endpoints and secrets botRows, err := tx.QueryContext(ctx, ` SELECT bot_id, endpoint_url, shared_secret FROM bots WHERE bot_id = ANY($1) `, botIDs) if err != nil { return nil, fmt.Errorf("failed to get bots: %w", err) } defer botRows.Close() var bots []DBBotInfo for botRows.Next() { var b DBBotInfo if err := botRows.Scan(&b.ID, &b.EndpointURL, &b.Secret); err != nil { return nil, fmt.Errorf("failed to scan bot: %w", err) } bots = append(bots, b) } // Update match status to running _, err = tx.ExecContext(ctx, ` UPDATE matches SET status = 'running' WHERE match_id = $1 `, job.MatchID) if err != nil { return nil, fmt.Errorf("failed to update match status: %w", err) } if err := tx.Commit(); err != nil { return nil, fmt.Errorf("failed to commit transaction: %w", err) } return &JobClaimData{ Job: job, Match: match, Participants: participants, Map: mapData, Bots: bots, }, nil } // Heartbeat updates the heartbeat timestamp for a job. func (c *DBClient) Heartbeat(ctx context.Context, jobID string, workerID string) error { result, err := c.db.ExecContext(ctx, ` UPDATE jobs SET heartbeat_at = NOW() WHERE job_id = $1 AND worker_id = $2 AND status = 'claimed' `, jobID, workerID) if err != nil { return fmt.Errorf("failed to send heartbeat: %w", err) } rows, err := result.RowsAffected() if err != nil { return fmt.Errorf("failed to check rows affected: %w", err) } if rows == 0 { return fmt.Errorf("job not found or not claimed by this worker") } return nil } // SubmitMatchResult writes the match result to the database and updates ratings. func (c *DBClient) SubmitMatchResult(ctx context.Context, jobID string, result *MatchResult, replayURL string, ratingUpdates []RatingUpdate) error { tx, err := c.db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("failed to begin transaction: %w", err) } defer tx.Rollback() now := time.Now().UTC() // Determine winner player index from result var winnerIndex *int if result.WinnerID != "" { // Look up player slot for winner var idx int err := tx.QueryRowContext(ctx, ` SELECT player_slot FROM match_participants WHERE match_id = ( SELECT match_id FROM jobs WHERE job_id = $1 ) AND bot_id = $2 `, jobID, result.WinnerID).Scan(&idx) if err == nil { winnerIndex = &idx } } // Update job status _, err = tx.ExecContext(ctx, ` UPDATE jobs SET status = 'completed', completed_at = $1 WHERE job_id = $2 `, now, jobID) if err != nil { return fmt.Errorf("failed to update job: %w", err) } // Get match ID var matchID string err = tx.QueryRowContext(ctx, ` SELECT match_id FROM jobs WHERE job_id = $1 `, jobID).Scan(&matchID) if err != nil { return fmt.Errorf("failed to get match ID: %w", err) } // Update match status and result scoresJSON, _ := json.Marshal(result.Scores) _, err = tx.ExecContext(ctx, ` UPDATE matches SET status = 'completed', winner = $1, condition = $2, turn_count = $3, scores_json = $4, completed_at = $5, combat_turns = $7 WHERE match_id = $6 `, winnerIndex, result.EndReason, result.Turns, scoresJSON, now, matchID, result.CombatTurns) if err != nil { return fmt.Errorf("failed to update match: %w", err) } // Update participant scores for botID, score := range result.Scores { _, err = tx.ExecContext(ctx, ` UPDATE match_participants SET score = $1 WHERE match_id = $2 AND bot_id = $3 `, score, matchID, botID) if err != nil { return fmt.Errorf("failed to update participant score: %w", err) } } // Apply rating updates (Glicko-2) for _, update := range ratingUpdates { // Update bot rating _, err = tx.ExecContext(ctx, ` UPDATE bots SET rating_mu = $1, rating_phi = $2, rating_sigma = $3, last_active = $4 WHERE bot_id = $5 `, update.Mu, update.Phi, update.Sigma, now, update.BotID) if err != nil { return fmt.Errorf("failed to update bot rating: %w", err) } // Record rating history _, err = tx.ExecContext(ctx, ` INSERT INTO rating_history (bot_id, match_id, rating, recorded_at) VALUES ($1, $2, $3, $4) ON CONFLICT (bot_id, match_id) DO UPDATE SET rating = $3, recorded_at = $4 `, update.BotID, matchID, update.DisplayRating, now) if err != nil { return fmt.Errorf("failed to record rating history: %w", err) } // Update participant with rating after _, err = tx.ExecContext(ctx, ` UPDATE match_participants SET rating_mu_after = $1, rating_phi_after = $2, rating_sigma_after = $3 WHERE match_id = $4 AND bot_id = $5 `, update.Mu, update.Phi, update.Sigma, matchID, update.BotID) if err != nil { return fmt.Errorf("failed to update participant rating after: %w", err) } } // Resolve predictions for this match if err := resolvePredictions(ctx, tx, matchID, result.WinnerID); err != nil { // Log but don't fail the match result — predictions are non-critical log.Printf("failed to resolve predictions for match %s: %v", matchID, err) } // Update series tables if this match is part of a series if err := updateSeriesResult(ctx, tx, matchID, result.WinnerID); err != nil { log.Printf("failed to update series result for match %s: %v", matchID, err) } // Update crash strikes only for technical failures, not for normal game // endings. In snake-style games every bot eventually "crashes" (hits a wall // or another snake) — that is the normal end condition, not an error. // "stalemate" and "turns" are the expected EndReason values for normal play. if result.EndReason != "stalemate" && result.EndReason != "turns" { if err := updateCrashStrikes(ctx, tx, result.CrashedBots); err != nil { log.Printf("failed to update crash strikes for match %s: %v", matchID, err) } } if err := tx.Commit(); err != nil { return fmt.Errorf("failed to commit transaction: %w", err) } return nil } // resolvePredictions marks open predictions as correct/incorrect and updates predictor_stats. // Uses RETURNING to only process predictions that were just resolved, preventing double-counting. func resolvePredictions(ctx context.Context, tx *sql.Tx, matchID string, winnerBotID string) error { var rows *sql.Rows var err error if winnerBotID == "" { rows, err = tx.QueryContext(ctx, ` UPDATE predictions SET correct = false, resolved_at = NOW() WHERE match_id = $1 AND correct IS NULL RETURNING predictor_id, correct `, matchID) } else { rows, err = tx.QueryContext(ctx, ` UPDATE predictions SET correct = (predicted_bot = $1), resolved_at = NOW() WHERE match_id = $2 AND correct IS NULL RETURNING predictor_id, correct `, winnerBotID, matchID) } if err != nil { return fmt.Errorf("failed to resolve predictions: %w", err) } defer rows.Close() for rows.Next() { var predictorID string var correct bool if err := rows.Scan(&predictorID, &correct); err != nil { return fmt.Errorf("failed to scan resolved prediction: %w", err) } if err := upsertPredictorStats(ctx, tx, predictorID, correct); err != nil { return fmt.Errorf("failed to update predictor_stats for %s: %w", predictorID, err) } } return nil } // upsertPredictorStats updates the predictor_stats row for a single resolution. func upsertPredictorStats(ctx context.Context, tx *sql.Tx, predictorID string, correct bool) error { if correct { _, err := tx.ExecContext(ctx, ` INSERT INTO predictor_stats (predictor_id, correct, streak, best_streak, updated_at) VALUES ($1, 1, 1, 1, NOW()) ON CONFLICT (predictor_id) DO UPDATE SET correct = predictor_stats.correct + 1, streak = predictor_stats.streak + 1, best_streak = GREATEST(predictor_stats.best_streak, predictor_stats.streak + 1), updated_at = NOW() `, predictorID) return err } _, err := tx.ExecContext(ctx, ` INSERT INTO predictor_stats (predictor_id, incorrect, streak, best_streak, updated_at) VALUES ($1, 1, 0, 0, NOW()) ON CONFLICT (predictor_id) DO UPDATE SET incorrect = predictor_stats.incorrect + 1, streak = 0, updated_at = NOW() `, predictorID) return err } // FailJob marks a job as failed. func (c *DBClient) FailJob(ctx context.Context, jobID string, workerID string, errorMessage string) error { result, err := c.db.ExecContext(ctx, ` UPDATE jobs SET status = 'failed', completed_at = NOW() WHERE job_id = $1 AND worker_id = $2 AND status = 'claimed' `, jobID, workerID) if err != nil { return fmt.Errorf("failed to fail job: %w", err) } rows, err := result.RowsAffected() if err != nil { return fmt.Errorf("failed to check rows affected: %w", err) } if rows == 0 { return fmt.Errorf("job not found or not claimed by this worker") } // Also update match status _, err = c.db.ExecContext(ctx, ` UPDATE matches SET status = 'failed', completed_at = NOW() WHERE match_id = (SELECT match_id FROM jobs WHERE job_id = $1) `, jobID) if err != nil { return fmt.Errorf("failed to update match status: %w", err) } return nil } // CrashCooldownDuration is the 30-minute cooldown when 3 consecutive crashes are detected. const CrashCooldownDuration = 30 * time.Minute // MaxCrashStrikes is the number of consecutive crashes that triggers cooldown. const MaxCrashStrikes = 3 // updateCrashStrikes updates the crash_strikes and cooldown_until columns for // match participants. A crashed bot gets its strikes incremented; a non-crashed // bot has its strikes reset to 0. When strikes reach MaxCrashStrikes, cooldown // is set to now + 30 min. func updateCrashStrikes(ctx context.Context, tx *sql.Tx, crashedBots map[string]bool) error { if len(crashedBots) == 0 { return nil } cooldownUntil := time.Now().Add(CrashCooldownDuration) for botID, crashed := range crashedBots { if crashed { // Increment strikes; if threshold reached, set cooldown. // Pass cooldownUntil as time.Time so pq encodes it as a proper // timestamptz — passing time.Duration directly sends raw nanoseconds // which PostgreSQL interprets as seconds, resulting in a ~57000-year cooldown. _, err := tx.ExecContext(ctx, ` UPDATE bots SET crash_strikes = crash_strikes + 1, cooldown_until = CASE WHEN crash_strikes + 1 >= $1 THEN $2 ELSE cooldown_until END WHERE bot_id = $3 `, MaxCrashStrikes, cooldownUntil, botID) if err != nil { return fmt.Errorf("failed to increment crash strikes for %s: %w", botID, err) } } else { // Reset strikes on successful match _, err := tx.ExecContext(ctx, ` UPDATE bots SET crash_strikes = 0 WHERE bot_id = $1 `, botID) if err != nil { return fmt.Errorf("failed to reset crash strikes for %s: %w", botID, err) } } } return nil } // RatingUpdate represents a Glicko-2 rating update for a bot. type RatingUpdate struct { BotID string Mu float64 Phi float64 Sigma float64 DisplayRating float64 RatingMuBefore float64 RatingPhiBefore float64 RatingDeviationChange float64 } // GetBotRatings retrieves current ratings for a list of bots. func (c *DBClient) GetBotRatings(ctx context.Context, botIDs []string) (map[string]Glicko2Rating, error) { rows, err := c.db.QueryContext(ctx, ` SELECT bot_id, rating_mu, rating_phi, rating_sigma FROM bots WHERE bot_id = ANY($1) `, botIDs) if err != nil { return nil, fmt.Errorf("failed to get bot ratings: %w", err) } defer rows.Close() ratings := make(map[string]Glicko2Rating) for rows.Next() { var botID string var r Glicko2Rating if err := rows.Scan(&botID, &r.Mu, &r.Phi, &r.Sigma); err != nil { return nil, fmt.Errorf("failed to scan rating: %w", err) } ratings[botID] = r } return ratings, nil } // updateSeriesResult updates series_games.winner_id and series.a_wins/b_wins // when a match that belongs to a series completes. func updateSeriesResult(ctx context.Context, tx *sql.Tx, matchID string, winnerBotID string) error { // Find the series_game for this match var seriesID int64 var gameNum int err := tx.QueryRowContext(ctx, ` SELECT series_id, game_num FROM series_games WHERE match_id = $1 `, matchID).Scan(&seriesID, &gameNum) if err == sql.ErrNoRows { return nil // not a series game } if err != nil { return fmt.Errorf("find series game: %w", err) } // Update the series_games row with the winner if _, err := tx.ExecContext(ctx, ` UPDATE series_games SET winner_id = $1 WHERE match_id = $2 `, winnerBotID, matchID); err != nil { return fmt.Errorf("update series game winner: %w", err) } // Increment a_wins or b_wins on the series if winnerBotID == "" { return nil // draw — no increment } // Determine if the winner is bot_a or bot_b var botAID string err = tx.QueryRowContext(ctx, ` SELECT bot_a_id FROM series WHERE id = $1 `, seriesID).Scan(&botAID) if err != nil { return fmt.Errorf("find series bot_a: %w", err) } if winnerBotID == botAID { _, err = tx.ExecContext(ctx, ` UPDATE series SET a_wins = a_wins + 1, updated_at = NOW() WHERE id = $1 `, seriesID) } else { _, err = tx.ExecContext(ctx, ` UPDATE series SET b_wins = b_wins + 1, updated_at = NOW() WHERE id = $1 `, seriesID) } if err != nil { return fmt.Errorf("increment series wins: %w", err) } log.Printf("series: game %d result recorded — series %d, winner=%s", gameNum, seriesID, winnerBotID) return nil } // UpdateMapEngagement updates the engagement score for a map using a rolling average. // The new engagement score is computed as: (old_engagement * match_count + new_engagement) / (match_count + 1) func (c *DBClient) UpdateMapEngagement(ctx context.Context, mapID string, engagementScore float64, turns int) error { // Use a transaction to safely read and update the engagement score tx, err := c.db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("failed to begin transaction: %w", err) } defer tx.Rollback() // Get current engagement and match count var currentEngagement float64 var matchCount int err = tx.QueryRowContext(ctx, ` SELECT COALESCE(engagement, 0.0), COALESCE(match_count, 0) FROM maps WHERE map_id = $1 `, mapID).Scan(¤tEngagement, &matchCount) if err != nil { return fmt.Errorf("failed to get current map stats: %w", err) } // Compute rolling average newEngagement := (currentEngagement*float64(matchCount) + engagementScore) / float64(matchCount+1) // Update engagement and match count _, err = tx.ExecContext(ctx, ` UPDATE maps SET engagement = $1, match_count = match_count + 1, last_used_at = NOW() WHERE map_id = $2 `, newEngagement, mapID) if err != nil { return fmt.Errorf("failed to update map engagement: %w", err) } return tx.Commit() } // CompletedMatchForRecalc represents a completed match with participants for rating recalculation. type CompletedMatchForRecalc struct { ID string CompletedAt time.Time Winner *int // player_slot of winner, nil for draw WinnerBotID *string // bot_id of winner (derived from winner player_slot) Participants []MatchParticipantForRecalc } // MatchParticipantForRecalc represents a match participant for rating recalculation. type MatchParticipantForRecalc struct { BotID string PlayerSlot int } // ResetAllRatings resets all bot ratings to Glicko-2 default values. func (c *DBClient) ResetAllRatings(ctx context.Context) error { _, err := c.db.ExecContext(ctx, ` UPDATE bots SET rating_mu = $1, rating_phi = $2, rating_sigma = $3 `, glicko2DefaultMu, glicko2DefaultRD, glicko2DefaultSigma) if err != nil { return fmt.Errorf("failed to reset ratings: %w", err) } return nil } // GetAllCompletedMatches fetches all completed matches with their participants // in chronological order (by completed_at). Used for rating recovery. func (c *DBClient) GetAllCompletedMatches(ctx context.Context) ([]CompletedMatchForRecalc, error) { // First, get all completed matches in order rows, err := c.db.QueryContext(ctx, ` SELECT match_id, winner, completed_at FROM matches WHERE status = 'completed' AND completed_at IS NOT NULL ORDER BY completed_at ASC `) if err != nil { return nil, fmt.Errorf("failed to query completed matches: %w", err) } defer rows.Close() var matches []CompletedMatchForRecalc for rows.Next() { var m CompletedMatchForRecalc err := rows.Scan(&m.ID, &m.Winner, &m.CompletedAt) if err != nil { return nil, fmt.Errorf("failed to scan match: %w", err) } matches = append(matches, m) } if rows.Err() != nil { return nil, fmt.Errorf("error iterating matches: %w", rows.Err()) } // For each match, get participants for i := range matches { partRows, err := c.db.QueryContext(ctx, ` SELECT bot_id, player_slot FROM match_participants WHERE match_id = $1 ORDER BY player_slot `, matches[i].ID) if err != nil { return nil, fmt.Errorf("failed to query participants for match %s: %w", matches[i].ID, err) } var participants []MatchParticipantForRecalc for partRows.Next() { var p MatchParticipantForRecalc err := partRows.Scan(&p.BotID, &p.PlayerSlot) if err != nil { partRows.Close() return nil, fmt.Errorf("failed to scan participant: %w", err) } participants = append(participants, p) } partRows.Close() if partRows.Err() != nil { return nil, fmt.Errorf("error iterating participants for match %s: %w", matches[i].ID, partRows.Err()) } matches[i].Participants = participants // Derive WinnerBotID from Winner (player_slot) if matches[i].Winner != nil { for _, p := range participants { if p.PlayerSlot == *matches[i].Winner { winnerID := p.BotID matches[i].WinnerBotID = &winnerID break } } } } return matches, nil } // UpdateAllRatings updates all bot ratings in a single transaction. func (c *DBClient) UpdateAllRatings(ctx context.Context, ratings map[string]Glicko2Rating) error { tx, err := c.db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("failed to begin transaction: %w", err) } defer tx.Rollback() now := time.Now().UTC() for botID, rating := range ratings { _, err := tx.ExecContext(ctx, ` UPDATE bots SET rating_mu = $1, rating_phi = $2, rating_sigma = $3, last_active = $4 WHERE bot_id = $5 `, rating.Mu, rating.Phi, rating.Sigma, now, botID) if err != nil { return fmt.Errorf("failed to update rating for bot %s: %w", botID, err) } } if err := tx.Commit(); err != nil { return fmt.Errorf("failed to commit transaction: %w", err) } return nil }