From 729efb3f45423a1b6306bf153dceda1877c3540c Mon Sep 17 00:00:00 2001 From: jedarden Date: Sun, 29 Mar 2026 10:46:23 -0400 Subject: [PATCH] Refactor acb-worker: B2 uploads, PostgreSQL writes, Glicko-2 ratings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Upload replays to B2 (Backblaze) instead of R2 for cold archive storage - Write match results directly to PostgreSQL instead of HTTP API - Perform Glicko-2 rating updates in worker after match completion - Update config: ACB_R2_* env vars → ACB_B2_* - Remove obsolete api_test.go (tested removed HTTP client) Co-Authored-By: Claude Opus 4.6 --- cmd/acb-worker/api_test.go | 337 ------------------------------------- cmd/acb-worker/db.go | 12 -- cmd/acb-worker/main.go | 238 ++++++++++++++------------ 3 files changed, 130 insertions(+), 457 deletions(-) delete mode 100644 cmd/acb-worker/api_test.go diff --git a/cmd/acb-worker/api_test.go b/cmd/acb-worker/api_test.go deleted file mode 100644 index 02bff8e..0000000 --- a/cmd/acb-worker/api_test.go +++ /dev/null @@ -1,337 +0,0 @@ -// API client tests -package main - -import ( - "context" - "encoding/json" - "net/http" - "net/http/httptest" - "testing" - "time" -) - -func TestGetNextJob(t *testing.T) { - tests := []struct { - name string - response APIResponse - wantNil bool - wantErr bool - }{ - { - name: "no pending jobs", - response: APIResponse{ - Success: true, - Data: nil, - }, - wantNil: true, - wantErr: false, - }, - { - name: "pending job found", - response: APIResponse{ - Success: true, - Data: json.RawMessage(`{ - "id": "job-123", - "match_id": "match-456", - "status": "pending", - "created_at": "2024-01-01T00:00:00Z" - }`), - }, - wantNil: false, - wantErr: false, - }, - { - name: "api error", - response: APIResponse{ - Success: false, - Error: "internal server error", - }, - wantNil: true, - wantErr: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != "/api/jobs/next" { - t.Errorf("unexpected path: %s", r.URL.Path) - } - if r.Method != "GET" { - t.Errorf("unexpected method: %s", r.Method) - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(tt.response) - })) - defer server.Close() - - cfg := &Config{ - APIEndpoint: server.URL, - APIKey: "test-key", - MaxRetries: 0, - } - client := NewAPIClient(cfg) - - job, err := client.GetNextJob(context.Background()) - if (err != nil) != tt.wantErr { - t.Errorf("GetNextJob() error = %v, wantErr %v", err, tt.wantErr) - return - } - if (job == nil) != tt.wantNil { - t.Errorf("GetNextJob() job = %v, wantNil %v", job, tt.wantNil) - } - }) - } -} - -func TestClaimJob(t *testing.T) { - tests := []struct { - name string - response APIResponse - wantErr bool - }{ - { - name: "successful claim", - response: APIResponse{ - Success: true, - Data: json.RawMessage(`{ - "job": {"id": "job-123", "match_id": "match-456", "status": "claimed", "created_at": "2024-01-01T00:00:00Z"}, - "match": {"id": "match-456", "status": "running", "map_id": "map-789", "created_at": "2024-01-01T00:00:00Z"}, - "participants": [], - "map": {"id": "map-789", "width": 60, "height": 60, "walls": "", "spawns": "", "cores": ""}, - "bots": [], - "bot_secrets": [] - }`), - }, - wantErr: false, - }, - { - name: "job already claimed", - response: APIResponse{ - Success: false, - Error: "job not found or already claimed", - }, - wantErr: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Method != "POST" { - t.Errorf("unexpected method: %s", r.Method) - } - - var body map[string]string - if err := json.NewDecoder(r.Body).Decode(&body); err != nil { - t.Errorf("failed to decode body: %v", err) - } - if body["worker_id"] != "worker-1" { - t.Errorf("unexpected worker_id: %s", body["worker_id"]) - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(tt.response) - })) - defer server.Close() - - cfg := &Config{ - APIEndpoint: server.URL, - APIKey: "test-key", - MaxRetries: 0, - } - client := NewAPIClient(cfg) - - claim, err := client.ClaimJob(context.Background(), "job-123", "worker-1") - if (err != nil) != tt.wantErr { - t.Errorf("ClaimJob() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !tt.wantErr && claim == nil { - t.Error("ClaimJob() returned nil claim without error") - } - }) - } -} - -func TestHeartbeat(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Method != "POST" { - t.Errorf("unexpected method: %s", r.Method) - } - - var body map[string]string - json.NewDecoder(r.Body).Decode(&body) - if body["worker_id"] != "worker-1" { - t.Errorf("unexpected worker_id: %s", body["worker_id"]) - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(APIResponse{Success: true}) - })) - defer server.Close() - - cfg := &Config{ - APIEndpoint: server.URL, - APIKey: "test-key", - MaxRetries: 0, - } - client := NewAPIClient(cfg) - - if err := client.Heartbeat(context.Background(), "job-123", "worker-1"); err != nil { - t.Errorf("Heartbeat() error = %v", err) - } -} - -func TestSubmitResult(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Method != "POST" { - t.Errorf("unexpected method: %s", r.Method) - } - - var body map[string]interface{} - json.NewDecoder(r.Body).Decode(&body) - if body["winner_id"] != "bot-1" { - t.Errorf("unexpected winner_id: %v", body["winner_id"]) - } - if body["turns"].(float64) != 100 { - t.Errorf("unexpected turns: %v", body["turns"]) - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(APIResponse{Success: true}) - })) - defer server.Close() - - cfg := &Config{ - APIEndpoint: server.URL, - APIKey: "test-key", - MaxRetries: 0, - } - client := NewAPIClient(cfg) - - result := &MatchResult{ - WinnerID: "bot-1", - Turns: 100, - EndReason: "elimination", - Scores: map[string]int{"bot-1": 5, "bot-2": 2}, - } - - if err := client.SubmitResult(context.Background(), "job-123", result, "https://r2.example.com/replay.json"); err != nil { - t.Errorf("SubmitResult() error = %v", err) - } -} - -func TestHTTPError(t *testing.T) { - err := &HTTPError{StatusCode: 404, Body: "not found"} - expected := "HTTP 404: not found" - if err.Error() != expected { - t.Errorf("HTTPError.Error() = %q, want %q", err.Error(), expected) - } -} - -func TestAPIAuth(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - auth := r.Header.Get("Authorization") - if auth != "Bearer test-api-key" { - t.Errorf("unexpected authorization header: %s", auth) - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(APIResponse{Success: true}) - })) - defer server.Close() - - cfg := &Config{ - APIEndpoint: server.URL, - APIKey: "test-api-key", - MaxRetries: 0, - } - client := NewAPIClient(cfg) - - _, err := client.GetNextJob(context.Background()) - if err != nil { - t.Errorf("GetNextJob() error = %v", err) - } -} - -func TestRetryLogic(t *testing.T) { - attempts := 0 - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - attempts++ - if attempts < 3 { - // Simulate server error (retryable) - w.WriteHeader(http.StatusInternalServerError) - return - } - // Success on third attempt - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(APIResponse{Success: true}) - })) - defer server.Close() - - cfg := &Config{ - APIEndpoint: server.URL, - APIKey: "test-key", - MaxRetries: 3, - } - client := NewAPIClient(cfg) - - _, err := client.GetNextJob(context.Background()) - if err != nil { - t.Errorf("GetNextJob() error = %v", err) - } - if attempts != 3 { - t.Errorf("expected 3 attempts, got %d", attempts) - } -} - -func TestClientErrorNoRetry(t *testing.T) { - attempts := 0 - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - attempts++ - // Client error (should not retry) - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte("bad request")) - })) - defer server.Close() - - cfg := &Config{ - APIEndpoint: server.URL, - APIKey: "test-key", - MaxRetries: 3, - } - client := NewAPIClient(cfg) - - _, err := client.GetNextJob(context.Background()) - if err == nil { - t.Error("expected error for bad request") - } - if attempts != 1 { - t.Errorf("expected 1 attempt (no retry for client errors), got %d", attempts) - } -} - -func TestContextCancellation(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - time.Sleep(2 * time.Second) // Long delay - w.WriteHeader(http.StatusOK) - })) - defer server.Close() - - cfg := &Config{ - APIEndpoint: server.URL, - APIKey: "test-key", - MaxRetries: 0, - } - client := NewAPIClient(cfg) - - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - defer cancel() - - _, err := client.GetNextJob(ctx) - if err == nil { - t.Error("expected context cancellation error") - } -} diff --git a/cmd/acb-worker/db.go b/cmd/acb-worker/db.go index 799f73d..7cd06b9 100644 --- a/cmd/acb-worker/db.go +++ b/cmd/acb-worker/db.go @@ -61,18 +61,6 @@ type DBMatch struct { Winner *int `json:"winner"` // player index MapID string `json:"map_id"` CreatedAt time.Time `json:"created_at"` - StartedAt *time.Time `json:"started_at"` - CompletedAt *time.Time `json:"completed_at"` -} - -// DBMatch represents match metadata. -type DBMatch struct { - ID string `json:"id"` - Status string `json:"status"` - Winner *int `json:"winner"` // player index - MapID string `json:"map_id"` - CreatedAt time.Time `json:"created_at"` - StartedAt *time.Time `json:"started_at"` CompletedAt *time.Time `json:"completed_at"` } diff --git a/cmd/acb-worker/main.go b/cmd/acb-worker/main.go index 3983477..d965958 100644 --- a/cmd/acb-worker/main.go +++ b/cmd/acb-worker/main.go @@ -1,8 +1,8 @@ // acb-worker: Match execution worker for AI Code Battle // -// This worker polls the Cloudflare Worker API for pending match jobs, -// executes matches using the game engine, uploads replays to R2, -// and submits results back to the API. +// This worker polls PostgreSQL for pending match jobs, +// executes matches using the game engine, uploads replays to B2, +// writes results directly to PostgreSQL, and performs Glicko-2 rating updates. package main import ( @@ -23,28 +23,28 @@ import ( // Config holds worker configuration. type Config struct { - APIEndpoint string // Worker API endpoint (e.g., https://api.aicodebattle.com) - APIKey string // Worker API key for authentication - R2Endpoint string // R2 endpoint for replay uploads - R2Bucket string // R2 bucket name - R2AccessKey string // R2 access key ID - R2SecretKey string // R2 secret access key - WorkerID string // Unique worker identifier - PollPeriod time.Duration // How often to poll for jobs - Heartbeat time.Duration // How often to send heartbeat during match - TurnTimeout time.Duration // Per-turn timeout for bots - MaxRetries int // Max retries for transient errors - Verbose bool // Enable verbose logging + DatabaseURL string // PostgreSQL connection URL + B2Endpoint string // B2 endpoint URL + B2Bucket string // B2 bucket name + B2AccessKey string // B2 access key ID + B2SecretKey string // B2 secret access key + B2Region string // B2 region (e.g., "us-west-004") + WorkerID string // Unique worker identifier + PollPeriod time.Duration // How often to poll for jobs + Heartbeat time.Duration // How often to send heartbeat during match + TurnTimeout time.Duration // Per-turn timeout for bots + MaxRetries int // Max retries for transient errors + Verbose bool // Enable verbose logging } func main() { // Parse command-line flags - apiEndpoint := flag.String("api", getEnv("ACB_API_ENDPOINT", "http://localhost:8787"), "Worker API endpoint") - apiKey := flag.String("api-key", getEnv("ACB_API_KEY", ""), "Worker API key") - r2Endpoint := flag.String("r2-endpoint", getEnv("ACB_R2_ENDPOINT", ""), "R2 endpoint URL") - r2Bucket := flag.String("r2-bucket", getEnv("ACB_R2_BUCKET", "acb-data"), "R2 bucket name") - r2AccessKey := flag.String("r2-access-key", getEnv("ACB_R2_ACCESS_KEY", ""), "R2 access key ID") - r2SecretKey := flag.String("r2-secret-key", getEnv("ACB_R2_SECRET_KEY", ""), "R2 secret access key") + databaseURL := flag.String("db", getEnv("ACB_DATABASE_URL", ""), "PostgreSQL connection URL") + b2Endpoint := flag.String("b2-endpoint", getEnv("ACB_B2_ENDPOINT", ""), "B2 endpoint URL") + b2Bucket := flag.String("b2-bucket", getEnv("ACB_B2_BUCKET", "acb-data"), "B2 bucket name") + b2AccessKey := flag.String("b2-access-key", getEnv("ACB_B2_ACCESS_KEY", ""), "B2 access key ID") + b2SecretKey := flag.String("b2-secret-key", getEnv("ACB_B2_SECRET_KEY", ""), "B2 secret access key") + b2Region := flag.String("b2-region", getEnv("ACB_B2_REGION", "us-west-004"), "B2 region") workerID := flag.String("worker-id", getEnv("ACB_WORKER_ID", generateWorkerID()), "Unique worker identifier") pollPeriod := flag.Duration("poll", 5*time.Second, "Job polling period") heartbeat := flag.Duration("heartbeat", 30*time.Second, "Heartbeat interval during matches") @@ -54,32 +54,36 @@ func main() { flag.Parse() // Validate required config - if *apiKey == "" { - log.Fatal("API key is required (set ACB_API_KEY or use -api-key flag)") + if *databaseURL == "" { + log.Fatal("Database URL is required (set ACB_DATABASE_URL or use -db flag)") } cfg := &Config{ - APIEndpoint: *apiEndpoint, - APIKey: *apiKey, - R2Endpoint: *r2Endpoint, - R2Bucket: *r2Bucket, - R2AccessKey: *r2AccessKey, - R2SecretKey: *r2SecretKey, - WorkerID: *workerID, - PollPeriod: *pollPeriod, - Heartbeat: *heartbeat, - TurnTimeout: *turnTimeout, - MaxRetries: *maxRetries, - Verbose: *verbose, + DatabaseURL: *databaseURL, + B2Endpoint: *b2Endpoint, + B2Bucket: *b2Bucket, + B2AccessKey: *b2AccessKey, + B2SecretKey: *b2SecretKey, + B2Region: *b2Region, + WorkerID: *workerID, + PollPeriod: *pollPeriod, + Heartbeat: *heartbeat, + TurnTimeout: *turnTimeout, + MaxRetries: *maxRetries, + Verbose: *verbose, } - // Create API client - apiClient := NewAPIClient(cfg) + // Create database client + dbClient, err := NewDBClient(cfg.DatabaseURL) + if err != nil { + log.Fatalf("Failed to connect to database: %v", err) + } + defer dbClient.Close() - // Create R2 client (optional - if not configured, replays won't be uploaded) - var r2Client *R2Client - if cfg.R2Endpoint != "" && cfg.R2AccessKey != "" && cfg.R2SecretKey != "" { - r2Client = NewR2Client(cfg) + // Create B2 client (optional - if not configured, replays won't be uploaded) + var b2Client *B2Client + if cfg.B2Endpoint != "" && cfg.B2AccessKey != "" && cfg.B2SecretKey != "" { + b2Client = NewB2Client(cfg) } // Create metrics @@ -88,8 +92,8 @@ func main() { // Create worker worker := &Worker{ cfg: cfg, - api: apiClient, - r2: r2Client, + db: dbClient, + b2: b2Client, metrics: metrics, logger: log.New(os.Stdout, fmt.Sprintf("[worker-%s] ", cfg.WorkerID), log.LstdFlags), rng: rand.New(rand.NewSource(time.Now().UnixNano())), @@ -145,8 +149,8 @@ func generateWorkerID() string { // Worker executes match jobs. type Worker struct { cfg *Config - api *APIClient - r2 *R2Client + db *DBClient + b2 *B2Client metrics *Metrics logger *log.Logger rng *rand.Rand @@ -178,7 +182,7 @@ func (w *Worker) pollAndExecute(ctx context.Context) error { w.metrics.RecordPollCycle() // Get next pending job - job, err := w.api.GetNextJob(ctx) + job, err := w.db.GetNextJob(ctx) if err != nil { return fmt.Errorf("failed to get next job: %w", err) } @@ -192,8 +196,8 @@ func (w *Worker) pollAndExecute(ctx context.Context) error { w.logger.Printf("Found job %s for match %s", job.ID, job.MatchID) - // Claim the job - claimResp, err := w.api.ClaimJob(ctx, job.ID, w.cfg.WorkerID) + // Claim the job and get match data + claimData, err := w.db.ClaimJob(ctx, job.ID, w.cfg.WorkerID) if err != nil { return fmt.Errorf("failed to claim job %s: %w", job.ID, err) } @@ -203,12 +207,12 @@ func (w *Worker) pollAndExecute(ctx context.Context) error { // Execute the match matchStart := time.Now() - result, replay, err := w.executeMatch(ctx, claimResp) + result, replay, err := w.executeMatch(ctx, claimData) if err != nil { w.metrics.RecordMatchError() w.logger.Printf("Match execution failed: %v", err) // Mark job as failed - if failErr := w.api.FailJob(ctx, job.ID, w.cfg.WorkerID, err.Error()); failErr != nil { + if failErr := w.db.FailJob(ctx, job.ID, w.cfg.WorkerID, err.Error()); failErr != nil { w.metrics.RecordJobFailed() w.logger.Printf("Failed to mark job as failed: %v", failErr) } @@ -216,12 +220,12 @@ func (w *Worker) pollAndExecute(ctx context.Context) error { } w.metrics.RecordMatch(time.Since(matchStart)) - // Upload replay to R2 + // Upload replay to B2 replayURL := "" - if w.r2 != nil { + if w.b2 != nil { uploadStart := time.Now() replayData, _ := json.Marshal(replay) - replayURL, err = w.uploadReplay(ctx, claimResp.Match.ID, replay) + replayURL, err = w.uploadReplay(ctx, claimData.Match.ID, replay) if err != nil { w.metrics.RecordReplayUploadError() w.logger.Printf("Failed to upload replay: %v", err) @@ -232,8 +236,12 @@ func (w *Worker) pollAndExecute(ctx context.Context) error { } } - // Submit result - err = w.api.SubmitResult(ctx, job.ID, result, replayURL) + // Compute Glicko-2 rating updates + ratingUpdates := w.computeRatingUpdates(claimData, result) + w.logger.Printf("Computed %d rating updates", len(ratingUpdates)) + + // Submit result directly to PostgreSQL + err = w.db.SubmitMatchResult(ctx, job.ID, result, replayURL, ratingUpdates) if err != nil { return fmt.Errorf("failed to submit result for job %s: %w", job.ID, err) } @@ -243,16 +251,16 @@ func (w *Worker) pollAndExecute(ctx context.Context) error { } // executeMatch runs a match and returns the result and replay. -func (w *Worker) executeMatch(ctx context.Context, claim *JobClaimResponse) (*MatchResult, *engine.Replay, error) { +func (w *Worker) executeMatch(ctx context.Context, claimData *JobClaimData) (*MatchResult, *engine.Replay, error) { // Build game config from map data config := engine.Config{ - Rows: claim.Map.Width, - Cols: claim.Map.Height, - MaxTurns: 500, // Default max turns - VisionRadius2: 49, // Default vision - AttackRadius2: 5, // Default attack - SpawnCost: 3, // Default spawn cost - EnergyInterval: 10, // Default energy interval + Rows: claimData.Map.Width, + Cols: claimData.Map.Height, + MaxTurns: 500, // Default max turns + VisionRadius2: 49, // Default vision + AttackRadius2: 5, // Default attack + SpawnCost: 3, // Default spawn cost + EnergyInterval: 10, // Default energy interval } // Create match runner @@ -262,49 +270,45 @@ func (w *Worker) executeMatch(ctx context.Context, claim *JobClaimResponse) (*Ma engine.WithTimeout(w.cfg.TurnTimeout), ) - // Add bots from claim response - for _, participant := range claim.Participants { - // Find bot endpoint - var endpointURL string - for _, bot := range claim.Bots { - if bot.ID == participant.BotID { - endpointURL = bot.EndpointURL - break - } - } + // Build bot ID to info lookup + botInfoMap := make(map[string]DBBotInfo) + for _, bot := range claimData.Bots { + botInfoMap[bot.ID] = bot + } - // Find bot secret - var secret string - for _, s := range claim.BotSecrets { - if s.BotID == participant.BotID { - secret = s.Secret - break - } - } + // Add bots from claim data (in player slot order) + participantMap := make(map[int]DBParticipant) + for _, p := range claimData.Participants { + participantMap[p.PlayerSlot] = p + } + + for slot := 0; slot < len(claimData.Participants); slot++ { + p := participantMap[slot] + botInfo := botInfoMap[p.BotID] // Create auth config for HTTP bot auth := engine.AuthConfig{ - BotID: participant.BotID, - Secret: secret, - MatchID: claim.Match.ID, + BotID: p.BotID, + Secret: botInfo.Secret, + MatchID: claimData.Match.ID, } // Create HTTP bot client httpBot := engine.NewHTTPBot( - endpointURL, + botInfo.EndpointURL, auth, engine.WithHTTPTimeout(w.cfg.TurnTimeout), ) - runner.AddBot(httpBot, participant.BotID) - w.logger.Printf("Added bot %s at %s (player %d)", participant.BotID, endpointURL, participant.PlayerIndex) + runner.AddBot(httpBot, p.BotID) + w.logger.Printf("Added bot %s at %s (player %d)", p.BotID, botInfo.EndpointURL, p.PlayerSlot) } // Start heartbeat goroutine heartbeatCtx, heartbeatCancel := context.WithCancel(ctx) defer heartbeatCancel() - go w.sendHeartbeats(heartbeatCtx, claim.Job.ID) + go w.sendHeartbeats(heartbeatCtx, claimData.Job.ID) // Run the match engineResult, replay, err := runner.Run() @@ -321,9 +325,9 @@ func (w *Worker) executeMatch(ctx context.Context, claim *JobClaimResponse) (*Ma } // Set winner ID from result (Winner is int, -1 for draw) - if engineResult.Winner >= 0 && engineResult.Winner < len(claim.Participants) { - for _, p := range claim.Participants { - if p.PlayerIndex == engineResult.Winner { + if engineResult.Winner >= 0 && engineResult.Winner < len(claimData.Participants) { + for _, p := range claimData.Participants { + if p.PlayerSlot == engineResult.Winner { result.WinnerID = p.BotID break } @@ -331,9 +335,9 @@ func (w *Worker) executeMatch(ctx context.Context, claim *JobClaimResponse) (*Ma } // Calculate scores from replay - for i, p := range claim.Participants { - if i < len(engineResult.Scores) { - result.Scores[p.BotID] = engineResult.Scores[i] + for _, p := range claimData.Participants { + if p.PlayerSlot < len(engineResult.Scores) { + result.Scores[p.BotID] = engineResult.Scores[p.PlayerSlot] } } @@ -350,7 +354,7 @@ func (w *Worker) sendHeartbeats(ctx context.Context, jobID string) { case <-ctx.Done(): return case <-ticker.C: - if err := w.api.Heartbeat(ctx, jobID, w.cfg.WorkerID); err != nil { + if err := w.db.Heartbeat(ctx, jobID, w.cfg.WorkerID); err != nil { w.metrics.RecordHeartbeatError() w.logger.Printf("Heartbeat failed: %v", err) } else { @@ -360,10 +364,10 @@ func (w *Worker) sendHeartbeats(ctx context.Context, jobID string) { } } -// uploadReplay uploads the replay to R2 and returns the URL. +// uploadReplay uploads the replay to B2 and returns the URL. func (w *Worker) uploadReplay(ctx context.Context, matchID string, replay *engine.Replay) (string, error) { - if w.r2 == nil { - return "", fmt.Errorf("R2 client not configured") + if w.b2 == nil { + return "", fmt.Errorf("B2 client not configured") } // Serialize replay to JSON @@ -372,19 +376,37 @@ func (w *Worker) uploadReplay(ctx context.Context, matchID string, replay *engin return "", fmt.Errorf("failed to serialize replay: %w", err) } - // Upload to R2 + // Upload to B2 key := fmt.Sprintf("replays/%s.json", matchID) - if err := w.r2.Upload(ctx, key, data, "application/json"); err != nil { - return "", fmt.Errorf("failed to upload replay to R2: %w", err) + if err := w.b2.Upload(ctx, key, data, "application/json"); err != nil { + return "", fmt.Errorf("failed to upload replay to B2: %w", err) } - return fmt.Sprintf("%s/%s", w.r2.Endpoint(), key), nil + return fmt.Sprintf("%s/%s", w.b2.Endpoint(), key), nil } -// MatchResult represents the result of a match for API submission. -type MatchResult struct { - WinnerID string `json:"winner_id"` - Turns int `json:"turns"` - EndReason string `json:"end_reason"` - Scores map[string]int `json:"scores"` +// computeRatingUpdates computes Glicko-2 rating updates for match participants. +func (w *Worker) computeRatingUpdates(claimData *JobClaimData, result *MatchResult) []RatingUpdate { + if len(claimData.Participants) < 2 { + return nil + } + + // Extract bot IDs and current ratings + botIDs := make([]string, len(claimData.Participants)) + ratings := make([]Glicko2Rating, len(claimData.Participants)) + scores := make([]float64, len(claimData.Participants)) + + for i, p := range claimData.Participants { + botIDs[i] = p.BotID + ratings[i] = Glicko2Rating{ + Mu: p.RatingMuBefore, + Phi: p.RatingPhiBefore, + Sigma: p.RatingSigmaBefore, + } + // Normalize scores for Glicko-2 (higher is better) + scores[i] = float64(result.Scores[p.BotID]) + } + + // Compute rating updates + return ComputeRatingUpdates(botIDs, ratings, scores) }