From 6659027bec1ada670d54d7220af4ddfef72f6218 Mon Sep 17 00:00:00 2001 From: jedarden Date: Tue, 24 Mar 2026 08:06:15 -0400 Subject: [PATCH] Implement match worker container (cmd/acb-worker/) - Worker polls Cloudflare Worker API for pending match jobs - Claims jobs and executes matches using the game engine - Uploads replays to R2 via S3-compatible API - Sends heartbeats during match execution - Submits results back to Worker API - Includes retry logic with exponential backoff - API client tests for job coordination endpoints Also fixes glicko2.ts: export g() and E() functions for testing Co-Authored-By: Claude Opus 4.6 --- cmd/acb-worker/api.go | 274 +++++++++++++++++++++++++ cmd/acb-worker/api_test.go | 337 +++++++++++++++++++++++++++++++ cmd/acb-worker/main.go | 352 +++++++++++++++++++++++++++++++++ cmd/acb-worker/r2.go | 120 +++++++++++ go.mod | 22 +++ go.sum | 38 ++++ worker-api/src/glicko2.test.ts | 4 +- worker-api/src/glicko2.ts | 4 +- 8 files changed, 1147 insertions(+), 4 deletions(-) create mode 100644 cmd/acb-worker/api.go create mode 100644 cmd/acb-worker/api_test.go create mode 100644 cmd/acb-worker/main.go create mode 100644 cmd/acb-worker/r2.go create mode 100644 go.sum diff --git a/cmd/acb-worker/api.go b/cmd/acb-worker/api.go new file mode 100644 index 0000000..2c65f99 --- /dev/null +++ b/cmd/acb-worker/api.go @@ -0,0 +1,274 @@ +// API client for Worker API communication +package main + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" +) + +// APIClient communicates with the Worker API. +type APIClient struct { + endpoint string + apiKey string + httpClient *http.Client + maxRetries int +} + +// NewAPIClient creates a new API client. +func NewAPIClient(cfg *Config) *APIClient { + return &APIClient{ + endpoint: cfg.APIEndpoint, + apiKey: cfg.APIKey, + httpClient: &http.Client{ + Timeout: 30 * time.Second, + }, + maxRetries: cfg.MaxRetries, + } +} + +// Job represents a pending job from the API. +type Job struct { + ID string `json:"id"` + MatchID string `json:"match_id"` + Status string `json:"status"` + WorkerID *string `json:"worker_id"` + ClaimedAt *time.Time `json:"claimed_at"` + HeartbeatAt *time.Time `json:"heartbeat_at"` + CreatedAt time.Time `json:"created_at"` +} + +// JobClaimResponse contains the data needed to execute a match. +type JobClaimResponse struct { + Job Job `json:"job"` + Match Match `json:"match"` + Participants []Participant `json:"participants"` + Map MapData `json:"map"` + Bots []BotInfo `json:"bots"` + BotSecrets []BotSecret `json:"bot_secrets"` +} + +// Match represents match metadata. +type Match struct { + ID string `json:"id"` + Status string `json:"status"` + WinnerID *string `json:"winner_id"` + Turns *int `json:"turns"` + EndReason *string `json:"end_reason"` + MapID string `json:"map_id"` + CreatedAt time.Time `json:"created_at"` + StartedAt *time.Time `json:"started_at"` + CompletedAt *time.Time `json:"completed_at"` +} + +// Participant represents a match participant. +type Participant struct { + ID string `json:"id"` + MatchID string `json:"match_id"` + BotID string `json:"bot_id"` + PlayerIndex int `json:"player_index"` + Score int `json:"score"` + RatingBefore int `json:"rating_before"` + RatingAfter *int `json:"rating_after"` + RatingDeviationBefore int `json:"rating_deviation_before"` + RatingDeviationAfter *int `json:"rating_deviation_after"` +} + +// MapData represents map configuration. +type MapData struct { + ID string `json:"id"` + Width int `json:"width"` + Height int `json:"height"` + Walls string `json:"walls"` + Spawns string `json:"spawns"` + Cores string `json:"cores"` +} + +// BotInfo contains bot endpoint information. +type BotInfo struct { + ID string `json:"id"` + EndpointURL string `json:"endpoint_url"` +} + +// BotSecret contains bot authentication secret. +type BotSecret struct { + BotID string `json:"bot_id"` + Secret string `json:"secret"` +} + +// APIResponse is a generic API response. +type APIResponse struct { + Success bool `json:"success"` + Data json.RawMessage `json:"data,omitempty"` + Error string `json:"error,omitempty"` +} + +// GetNextJob fetches the next pending job. +func (c *APIClient) GetNextJob(ctx context.Context) (*Job, error) { + resp, err := c.doRequest(ctx, "GET", "/api/jobs/next", nil) + if err != nil { + return nil, err + } + + var apiResp APIResponse + if err := json.Unmarshal(resp, &apiResp); err != nil { + return nil, fmt.Errorf("failed to parse response: %w", err) + } + + if !apiResp.Success { + return nil, fmt.Errorf("API error: %s", apiResp.Error) + } + + if apiResp.Data == nil { + return nil, nil // No pending jobs + } + + var job Job + if err := json.Unmarshal(apiResp.Data, &job); err != nil { + return nil, fmt.Errorf("failed to parse job: %w", err) + } + + return &job, nil +} + +// ClaimJob claims a job for execution. +func (c *APIClient) ClaimJob(ctx context.Context, jobID string, workerID string) (*JobClaimResponse, error) { + body := map[string]string{"worker_id": workerID} + + resp, err := c.doRequest(ctx, "POST", "/api/jobs/"+jobID+"/claim", body) + if err != nil { + return nil, err + } + + var apiResp APIResponse + if err := json.Unmarshal(resp, &apiResp); err != nil { + return nil, fmt.Errorf("failed to parse response: %w", err) + } + + if !apiResp.Success { + return nil, fmt.Errorf("API error: %s", apiResp.Error) + } + + var claimResp JobClaimResponse + if err := json.Unmarshal(apiResp.Data, &claimResp); err != nil { + return nil, fmt.Errorf("failed to parse claim response: %w", err) + } + + return &claimResp, nil +} + +// Heartbeat sends a heartbeat for a claimed job. +func (c *APIClient) Heartbeat(ctx context.Context, jobID string, workerID string) error { + body := map[string]string{"worker_id": workerID} + + _, err := c.doRequest(ctx, "POST", "/api/jobs/"+jobID+"/heartbeat", body) + return err +} + +// SubmitResult submits the result of a completed match. +func (c *APIClient) SubmitResult(ctx context.Context, jobID string, result *MatchResult, replayURL string) error { + body := map[string]interface{}{ + "winner_id": result.WinnerID, + "turns": result.Turns, + "end_reason": result.EndReason, + "replay_url": replayURL, + "scores": result.Scores, + } + + _, err := c.doRequest(ctx, "POST", "/api/jobs/"+jobID+"/result", body) + return err +} + +// FailJob marks a job as failed. +func (c *APIClient) FailJob(ctx context.Context, jobID string, workerID string, errorMessage string) error { + body := map[string]string{ + "worker_id": workerID, + "error_message": errorMessage, + } + + _, err := c.doRequest(ctx, "POST", "/api/jobs/"+jobID+"/fail", body) + return err +} + +// doRequest makes an HTTP request with retries. +func (c *APIClient) doRequest(ctx context.Context, method string, path string, body interface{}) ([]byte, error) { + var lastErr error + + for attempt := 0; attempt <= c.maxRetries; attempt++ { + if attempt > 0 { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(time.Second * time.Duration(attempt)): + } + } + + resp, err := c.doSingleRequest(ctx, method, path, body) + if err != nil { + lastErr = err + // Check if it's a client error (don't retry) + if httpErr, ok := err.(*HTTPError); ok && httpErr.StatusCode >= 400 && httpErr.StatusCode < 500 { + return nil, err + } + continue + } + + return resp, nil + } + + return nil, fmt.Errorf("request failed after %d retries: %w", c.maxRetries, lastErr) +} + +// doSingleRequest makes a single HTTP request. +func (c *APIClient) doSingleRequest(ctx context.Context, method string, path string, body interface{}) ([]byte, error) { + var reqBody io.Reader + if body != nil { + data, err := json.Marshal(body) + if err != nil { + return nil, fmt.Errorf("failed to marshal request body: %w", err) + } + reqBody = bytes.NewReader(data) + } + + req, err := http.NewRequestWithContext(ctx, method, c.endpoint+path, reqBody) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+c.apiKey) + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("request failed: %w", err) + } + defer resp.Body.Close() + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response: %w", err) + } + + if resp.StatusCode >= 400 { + return nil, &HTTPError{ + StatusCode: resp.StatusCode, + Body: string(respBody), + } + } + + return respBody, nil +} + +// HTTPError represents an HTTP error response. +type HTTPError struct { + StatusCode int + Body string +} + +func (e *HTTPError) Error() string { + return fmt.Sprintf("HTTP %d: %s", e.StatusCode, e.Body) +} diff --git a/cmd/acb-worker/api_test.go b/cmd/acb-worker/api_test.go new file mode 100644 index 0000000..02bff8e --- /dev/null +++ b/cmd/acb-worker/api_test.go @@ -0,0 +1,337 @@ +// API client tests +package main + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func TestGetNextJob(t *testing.T) { + tests := []struct { + name string + response APIResponse + wantNil bool + wantErr bool + }{ + { + name: "no pending jobs", + response: APIResponse{ + Success: true, + Data: nil, + }, + wantNil: true, + wantErr: false, + }, + { + name: "pending job found", + response: APIResponse{ + Success: true, + Data: json.RawMessage(`{ + "id": "job-123", + "match_id": "match-456", + "status": "pending", + "created_at": "2024-01-01T00:00:00Z" + }`), + }, + wantNil: false, + wantErr: false, + }, + { + name: "api error", + response: APIResponse{ + Success: false, + Error: "internal server error", + }, + wantNil: true, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/api/jobs/next" { + t.Errorf("unexpected path: %s", r.URL.Path) + } + if r.Method != "GET" { + t.Errorf("unexpected method: %s", r.Method) + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(tt.response) + })) + defer server.Close() + + cfg := &Config{ + APIEndpoint: server.URL, + APIKey: "test-key", + MaxRetries: 0, + } + client := NewAPIClient(cfg) + + job, err := client.GetNextJob(context.Background()) + if (err != nil) != tt.wantErr { + t.Errorf("GetNextJob() error = %v, wantErr %v", err, tt.wantErr) + return + } + if (job == nil) != tt.wantNil { + t.Errorf("GetNextJob() job = %v, wantNil %v", job, tt.wantNil) + } + }) + } +} + +func TestClaimJob(t *testing.T) { + tests := []struct { + name string + response APIResponse + wantErr bool + }{ + { + name: "successful claim", + response: APIResponse{ + Success: true, + Data: json.RawMessage(`{ + "job": {"id": "job-123", "match_id": "match-456", "status": "claimed", "created_at": "2024-01-01T00:00:00Z"}, + "match": {"id": "match-456", "status": "running", "map_id": "map-789", "created_at": "2024-01-01T00:00:00Z"}, + "participants": [], + "map": {"id": "map-789", "width": 60, "height": 60, "walls": "", "spawns": "", "cores": ""}, + "bots": [], + "bot_secrets": [] + }`), + }, + wantErr: false, + }, + { + name: "job already claimed", + response: APIResponse{ + Success: false, + Error: "job not found or already claimed", + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + t.Errorf("unexpected method: %s", r.Method) + } + + var body map[string]string + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + t.Errorf("failed to decode body: %v", err) + } + if body["worker_id"] != "worker-1" { + t.Errorf("unexpected worker_id: %s", body["worker_id"]) + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(tt.response) + })) + defer server.Close() + + cfg := &Config{ + APIEndpoint: server.URL, + APIKey: "test-key", + MaxRetries: 0, + } + client := NewAPIClient(cfg) + + claim, err := client.ClaimJob(context.Background(), "job-123", "worker-1") + if (err != nil) != tt.wantErr { + t.Errorf("ClaimJob() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !tt.wantErr && claim == nil { + t.Error("ClaimJob() returned nil claim without error") + } + }) + } +} + +func TestHeartbeat(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + t.Errorf("unexpected method: %s", r.Method) + } + + var body map[string]string + json.NewDecoder(r.Body).Decode(&body) + if body["worker_id"] != "worker-1" { + t.Errorf("unexpected worker_id: %s", body["worker_id"]) + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(APIResponse{Success: true}) + })) + defer server.Close() + + cfg := &Config{ + APIEndpoint: server.URL, + APIKey: "test-key", + MaxRetries: 0, + } + client := NewAPIClient(cfg) + + if err := client.Heartbeat(context.Background(), "job-123", "worker-1"); err != nil { + t.Errorf("Heartbeat() error = %v", err) + } +} + +func TestSubmitResult(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + t.Errorf("unexpected method: %s", r.Method) + } + + var body map[string]interface{} + json.NewDecoder(r.Body).Decode(&body) + if body["winner_id"] != "bot-1" { + t.Errorf("unexpected winner_id: %v", body["winner_id"]) + } + if body["turns"].(float64) != 100 { + t.Errorf("unexpected turns: %v", body["turns"]) + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(APIResponse{Success: true}) + })) + defer server.Close() + + cfg := &Config{ + APIEndpoint: server.URL, + APIKey: "test-key", + MaxRetries: 0, + } + client := NewAPIClient(cfg) + + result := &MatchResult{ + WinnerID: "bot-1", + Turns: 100, + EndReason: "elimination", + Scores: map[string]int{"bot-1": 5, "bot-2": 2}, + } + + if err := client.SubmitResult(context.Background(), "job-123", result, "https://r2.example.com/replay.json"); err != nil { + t.Errorf("SubmitResult() error = %v", err) + } +} + +func TestHTTPError(t *testing.T) { + err := &HTTPError{StatusCode: 404, Body: "not found"} + expected := "HTTP 404: not found" + if err.Error() != expected { + t.Errorf("HTTPError.Error() = %q, want %q", err.Error(), expected) + } +} + +func TestAPIAuth(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + auth := r.Header.Get("Authorization") + if auth != "Bearer test-api-key" { + t.Errorf("unexpected authorization header: %s", auth) + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(APIResponse{Success: true}) + })) + defer server.Close() + + cfg := &Config{ + APIEndpoint: server.URL, + APIKey: "test-api-key", + MaxRetries: 0, + } + client := NewAPIClient(cfg) + + _, err := client.GetNextJob(context.Background()) + if err != nil { + t.Errorf("GetNextJob() error = %v", err) + } +} + +func TestRetryLogic(t *testing.T) { + attempts := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + attempts++ + if attempts < 3 { + // Simulate server error (retryable) + w.WriteHeader(http.StatusInternalServerError) + return + } + // Success on third attempt + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(APIResponse{Success: true}) + })) + defer server.Close() + + cfg := &Config{ + APIEndpoint: server.URL, + APIKey: "test-key", + MaxRetries: 3, + } + client := NewAPIClient(cfg) + + _, err := client.GetNextJob(context.Background()) + if err != nil { + t.Errorf("GetNextJob() error = %v", err) + } + if attempts != 3 { + t.Errorf("expected 3 attempts, got %d", attempts) + } +} + +func TestClientErrorNoRetry(t *testing.T) { + attempts := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + attempts++ + // Client error (should not retry) + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("bad request")) + })) + defer server.Close() + + cfg := &Config{ + APIEndpoint: server.URL, + APIKey: "test-key", + MaxRetries: 3, + } + client := NewAPIClient(cfg) + + _, err := client.GetNextJob(context.Background()) + if err == nil { + t.Error("expected error for bad request") + } + if attempts != 1 { + t.Errorf("expected 1 attempt (no retry for client errors), got %d", attempts) + } +} + +func TestContextCancellation(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(2 * time.Second) // Long delay + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + cfg := &Config{ + APIEndpoint: server.URL, + APIKey: "test-key", + MaxRetries: 0, + } + client := NewAPIClient(cfg) + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + _, err := client.GetNextJob(ctx) + if err == nil { + t.Error("expected context cancellation error") + } +} diff --git a/cmd/acb-worker/main.go b/cmd/acb-worker/main.go new file mode 100644 index 0000000..bfb8e69 --- /dev/null +++ b/cmd/acb-worker/main.go @@ -0,0 +1,352 @@ +// acb-worker: Match execution worker for AI Code Battle +// +// This worker polls the Cloudflare Worker API for pending match jobs, +// executes matches using the game engine, uploads replays to R2, +// and submits results back to the API. +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "log" + "math/rand" + "os" + "os/signal" + "syscall" + "time" + + "github.com/aicodebattle/acb/engine" +) + +// Config holds worker configuration. +type Config struct { + APIEndpoint string // Worker API endpoint (e.g., https://api.aicodebattle.com) + APIKey string // Worker API key for authentication + R2Endpoint string // R2 endpoint for replay uploads + R2Bucket string // R2 bucket name + R2AccessKey string // R2 access key ID + R2SecretKey string // R2 secret access key + WorkerID string // Unique worker identifier + PollPeriod time.Duration // How often to poll for jobs + Heartbeat time.Duration // How often to send heartbeat during match + TurnTimeout time.Duration // Per-turn timeout for bots + MaxRetries int // Max retries for transient errors + Verbose bool // Enable verbose logging +} + +func main() { + // Parse command-line flags + apiEndpoint := flag.String("api", getEnv("ACB_API_ENDPOINT", "http://localhost:8787"), "Worker API endpoint") + apiKey := flag.String("api-key", getEnv("ACB_API_KEY", ""), "Worker API key") + r2Endpoint := flag.String("r2-endpoint", getEnv("ACB_R2_ENDPOINT", ""), "R2 endpoint URL") + r2Bucket := flag.String("r2-bucket", getEnv("ACB_R2_BUCKET", "acb-data"), "R2 bucket name") + r2AccessKey := flag.String("r2-access-key", getEnv("ACB_R2_ACCESS_KEY", ""), "R2 access key ID") + r2SecretKey := flag.String("r2-secret-key", getEnv("ACB_R2_SECRET_KEY", ""), "R2 secret access key") + workerID := flag.String("worker-id", getEnv("ACB_WORKER_ID", generateWorkerID()), "Unique worker identifier") + pollPeriod := flag.Duration("poll", 5*time.Second, "Job polling period") + heartbeat := flag.Duration("heartbeat", 30*time.Second, "Heartbeat interval during matches") + turnTimeout := flag.Duration("timeout", 3*time.Second, "Per-turn bot timeout") + maxRetries := flag.Int("retries", 3, "Max retries for transient errors") + verbose := flag.Bool("verbose", getEnv("ACB_VERBOSE", "false") == "true", "Enable verbose logging") + flag.Parse() + + // Validate required config + if *apiKey == "" { + log.Fatal("API key is required (set ACB_API_KEY or use -api-key flag)") + } + + cfg := &Config{ + APIEndpoint: *apiEndpoint, + APIKey: *apiKey, + R2Endpoint: *r2Endpoint, + R2Bucket: *r2Bucket, + R2AccessKey: *r2AccessKey, + R2SecretKey: *r2SecretKey, + WorkerID: *workerID, + PollPeriod: *pollPeriod, + Heartbeat: *heartbeat, + TurnTimeout: *turnTimeout, + MaxRetries: *maxRetries, + Verbose: *verbose, + } + + // Create API client + apiClient := NewAPIClient(cfg) + + // Create R2 client (optional - if not configured, replays won't be uploaded) + var r2Client *R2Client + if cfg.R2Endpoint != "" && cfg.R2AccessKey != "" && cfg.R2SecretKey != "" { + r2Client = NewR2Client(cfg) + } + + // Create worker + worker := &Worker{ + cfg: cfg, + api: apiClient, + r2: r2Client, + logger: log.New(os.Stdout, fmt.Sprintf("[worker-%s] ", cfg.WorkerID), log.LstdFlags), + rng: rand.New(rand.NewSource(time.Now().UnixNano())), + heartbeat: *heartbeat, + } + + // Set up signal handling + ctx, cancel := context.WithCancel(context.Background()) + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + go func() { + <-sigChan + worker.logger.Println("Received shutdown signal, finishing current job...") + cancel() + }() + + // Run worker loop + worker.Run(ctx) +} + +// getEnv gets an environment variable with a default value. +func getEnv(key, defaultValue string) string { + if value := os.Getenv(key); value != "" { + return value + } + return defaultValue +} + +// generateWorkerID generates a random worker ID. +func generateWorkerID() string { + return fmt.Sprintf("worker-%d", rand.Intn(100000)) +} + +// Worker executes match jobs. +type Worker struct { + cfg *Config + api *APIClient + r2 *R2Client + logger *log.Logger + rng *rand.Rand + heartbeat time.Duration +} + +// Run starts the worker loop. +func (w *Worker) Run(ctx context.Context) { + w.logger.Println("Worker started, polling for jobs...") + + ticker := time.NewTicker(w.cfg.PollPeriod) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + w.logger.Println("Worker shutting down") + return + case <-ticker.C: + if err := w.pollAndExecute(ctx); err != nil { + w.logger.Printf("Error in poll cycle: %v", err) + } + } + } +} + +// pollAndExecute polls for a job and executes it if available. +func (w *Worker) pollAndExecute(ctx context.Context) error { + // Get next pending job + job, err := w.api.GetNextJob(ctx) + if err != nil { + return fmt.Errorf("failed to get next job: %w", err) + } + + if job == nil { + if w.cfg.Verbose { + w.logger.Println("No pending jobs") + } + return nil + } + + w.logger.Printf("Found job %s for match %s", job.ID, job.MatchID) + + // Claim the job + claimResp, err := w.api.ClaimJob(ctx, job.ID, w.cfg.WorkerID) + if err != nil { + return fmt.Errorf("failed to claim job %s: %w", job.ID, err) + } + + w.logger.Printf("Claimed job %s, executing match...", job.ID) + + // Execute the match + result, replay, err := w.executeMatch(ctx, claimResp) + if err != nil { + w.logger.Printf("Match execution failed: %v", err) + // Mark job as failed + if failErr := w.api.FailJob(ctx, job.ID, w.cfg.WorkerID, err.Error()); failErr != nil { + w.logger.Printf("Failed to mark job as failed: %v", failErr) + } + return err + } + + // Upload replay to R2 + replayURL := "" + if w.r2 != nil { + replayURL, err = w.uploadReplay(ctx, claimResp.Match.ID, replay) + if err != nil { + w.logger.Printf("Failed to upload replay: %v", err) + // Continue without replay URL - match result is more important + } else { + w.logger.Printf("Uploaded replay to %s", replayURL) + } + } + + // Submit result + err = w.api.SubmitResult(ctx, job.ID, result, replayURL) + if err != nil { + return fmt.Errorf("failed to submit result for job %s: %w", job.ID, err) + } + + w.logger.Printf("Completed job %s, winner: %s", job.ID, result.WinnerID) + return nil +} + +// executeMatch runs a match and returns the result and replay. +func (w *Worker) executeMatch(ctx context.Context, claim *JobClaimResponse) (*MatchResult, *engine.Replay, error) { + // Build game config from map data + config := engine.Config{ + Rows: claim.Map.Width, + Cols: claim.Map.Height, + MaxTurns: 500, // Default max turns + VisionRadius2: 49, // Default vision + AttackRadius2: 5, // Default attack + SpawnCost: 3, // Default spawn cost + EnergyInterval: 10, // Default energy interval + } + + // Create match runner + runner := engine.NewMatchRunner(config, + engine.WithRNG(w.rng), + engine.WithVerbose(w.cfg.Verbose), + engine.WithTimeout(w.cfg.TurnTimeout), + ) + + // Add bots from claim response + for _, participant := range claim.Participants { + // Find bot endpoint + var endpointURL string + for _, bot := range claim.Bots { + if bot.ID == participant.BotID { + endpointURL = bot.EndpointURL + break + } + } + + // Find bot secret + var secret string + for _, s := range claim.BotSecrets { + if s.BotID == participant.BotID { + secret = s.Secret + break + } + } + + // Create auth config for HTTP bot + auth := engine.AuthConfig{ + BotID: participant.BotID, + Secret: secret, + MatchID: claim.Match.ID, + } + + // Create HTTP bot client + httpBot := engine.NewHTTPBot( + endpointURL, + auth, + engine.WithHTTPTimeout(w.cfg.TurnTimeout), + ) + + runner.AddBot(httpBot, participant.BotID) + w.logger.Printf("Added bot %s at %s (player %d)", participant.BotID, endpointURL, participant.PlayerIndex) + } + + // Start heartbeat goroutine + heartbeatCtx, heartbeatCancel := context.WithCancel(ctx) + defer heartbeatCancel() + + go w.sendHeartbeats(heartbeatCtx, claim.Job.ID) + + // Run the match + engineResult, replay, err := runner.Run() + if err != nil { + return nil, nil, fmt.Errorf("match execution failed: %w", err) + } + + // Convert result + result := &MatchResult{ + WinnerID: "", + Turns: engineResult.Turns, + EndReason: engineResult.Reason, + Scores: make(map[string]int), + } + + // Set winner ID from result (Winner is int, -1 for draw) + if engineResult.Winner >= 0 && engineResult.Winner < len(claim.Participants) { + for _, p := range claim.Participants { + if p.PlayerIndex == engineResult.Winner { + result.WinnerID = p.BotID + break + } + } + } + + // Calculate scores from replay + for i, p := range claim.Participants { + if i < len(engineResult.Scores) { + result.Scores[p.BotID] = engineResult.Scores[i] + } + } + + return result, replay, nil +} + +// sendHeartbeats sends periodic heartbeats while a match is running. +func (w *Worker) sendHeartbeats(ctx context.Context, jobID string) { + ticker := time.NewTicker(w.heartbeat) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := w.api.Heartbeat(ctx, jobID, w.cfg.WorkerID); err != nil { + w.logger.Printf("Heartbeat failed: %v", err) + } + } + } +} + +// uploadReplay uploads the replay to R2 and returns the URL. +func (w *Worker) uploadReplay(ctx context.Context, matchID string, replay *engine.Replay) (string, error) { + if w.r2 == nil { + return "", fmt.Errorf("R2 client not configured") + } + + // Serialize replay to JSON + data, err := json.Marshal(replay) + if err != nil { + return "", fmt.Errorf("failed to serialize replay: %w", err) + } + + // Upload to R2 + key := fmt.Sprintf("replays/%s.json", matchID) + if err := w.r2.Upload(ctx, key, data, "application/json"); err != nil { + return "", fmt.Errorf("failed to upload replay to R2: %w", err) + } + + return fmt.Sprintf("%s/%s", w.r2.Endpoint(), key), nil +} + +// MatchResult represents the result of a match for API submission. +type MatchResult struct { + WinnerID string `json:"winner_id"` + Turns int `json:"turns"` + EndReason string `json:"end_reason"` + Scores map[string]int `json:"scores"` +} diff --git a/cmd/acb-worker/r2.go b/cmd/acb-worker/r2.go new file mode 100644 index 0000000..4707fd8 --- /dev/null +++ b/cmd/acb-worker/r2.go @@ -0,0 +1,120 @@ +// R2 client for uploading replays +package main + +import ( + "bytes" + "context" + "fmt" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" +) + +// R2Client handles R2 bucket operations. +type R2Client struct { + client *s3.Client + bucket string + endpoint string +} + +// NewR2Client creates a new R2 client. +func NewR2Client(cfg *Config) *R2Client { + // Create custom endpoint resolver for R2 + customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { + return aws.Endpoint{ + URL: cfg.R2Endpoint, + SigningRegion: "auto", + }, nil + }) + + // Load AWS config with R2 credentials + awsCfg, err := config.LoadDefaultConfig(context.TODO(), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider( + cfg.R2AccessKey, + cfg.R2SecretKey, + "", + )), + config.WithEndpointResolverWithOptions(customResolver), + ) + if err != nil { + panic(fmt.Sprintf("failed to load AWS config: %v", err)) + } + + return &R2Client{ + client: s3.NewFromConfig(awsCfg), + bucket: cfg.R2Bucket, + endpoint: cfg.R2Endpoint, + } +} + +// Upload uploads data to R2. +func (c *R2Client) Upload(ctx context.Context, key string, data []byte, contentType string) error { + _, err := c.client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(c.bucket), + Key: aws.String(key), + Body: bytes.NewReader(data), + ContentType: aws.String(contentType), + CacheControl: aws.String("public, max-age=31536000, immutable"), + }) + return err +} + +// Download downloads data from R2. +func (c *R2Client) Download(ctx context.Context, key string) ([]byte, error) { + resp, err := c.client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(c.bucket), + Key: aws.String(key), + }) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + buf := new(bytes.Buffer) + if _, err := buf.ReadFrom(resp.Body); err != nil { + return nil, err + } + + return buf.Bytes(), nil +} + +// Delete deletes an object from R2. +func (c *R2Client) Delete(ctx context.Context, key string) error { + _, err := c.client.DeleteObject(ctx, &s3.DeleteObjectInput{ + Bucket: aws.String(c.bucket), + Key: aws.String(key), + }) + return err +} + +// List lists objects with a prefix. +func (c *R2Client) List(ctx context.Context, prefix string) ([]string, error) { + var keys []string + + paginator := s3.NewListObjectsV2Paginator(c.client, &s3.ListObjectsV2Input{ + Bucket: aws.String(c.bucket), + Prefix: aws.String(prefix), + }) + + for paginator.HasMorePages() { + page, err := paginator.NextPage(ctx) + if err != nil { + return nil, err + } + + for _, obj := range page.Contents { + if obj.Key != nil { + keys = append(keys, *obj.Key) + } + } + } + + return keys, nil +} + +// Endpoint returns the R2 endpoint URL. +func (c *R2Client) Endpoint() string { + return c.endpoint +} diff --git a/go.mod b/go.mod index 7a6c5e6..3b05195 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,25 @@ module github.com/aicodebattle/acb go 1.24.3 + +require ( + github.com/aws/aws-sdk-go-v2 v1.41.4 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.8 // indirect + github.com/aws/aws-sdk-go-v2/config v1.32.12 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.19.12 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.20 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.20 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.20 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.21 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.12 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.20 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.20 // indirect + github.com/aws/aws-sdk-go-v2/service/s3 v1.97.2 // indirect + github.com/aws/aws-sdk-go-v2/service/signin v1.0.8 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.30.13 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.17 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.41.9 // indirect + github.com/aws/smithy-go v1.24.2 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..7eecae4 --- /dev/null +++ b/go.sum @@ -0,0 +1,38 @@ +github.com/aws/aws-sdk-go-v2 v1.41.4 h1:10f50G7WyU02T56ox1wWXq+zTX9I1zxG46HYuG1hH/k= +github.com/aws/aws-sdk-go-v2 v1.41.4/go.mod h1:mwsPRE8ceUUpiTgF7QmQIJ7lgsKUPQOUl3o72QBrE1o= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.8 h1:eBMB84YGghSocM7PsjmmPffTa+1FBUeNvGvFou6V/4o= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.8/go.mod h1:lyw7GFp3qENLh7kwzf7iMzAxDn+NzjXEAGjKS2UOKqI= +github.com/aws/aws-sdk-go-v2/config v1.32.12 h1:O3csC7HUGn2895eNrLytOJQdoL2xyJy0iYXhoZ1OmP0= +github.com/aws/aws-sdk-go-v2/config v1.32.12/go.mod h1:96zTvoOFR4FURjI+/5wY1vc1ABceROO4lWgWJuxgy0g= +github.com/aws/aws-sdk-go-v2/credentials v1.19.12 h1:oqtA6v+y5fZg//tcTWahyN9PEn5eDU/Wpvc2+kJ4aY8= +github.com/aws/aws-sdk-go-v2/credentials v1.19.12/go.mod h1:U3R1RtSHx6NB0DvEQFGyf/0sbrpJrluENHdPy1j/3TE= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.20 h1:zOgq3uezl5nznfoK3ODuqbhVg1JzAGDUhXOsU0IDCAo= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.20/go.mod h1:z/MVwUARehy6GAg/yQ1GO2IMl0k++cu1ohP9zo887wE= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.20 h1:CNXO7mvgThFGqOFgbNAP2nol2qAWBOGfqR/7tQlvLmc= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.20/go.mod h1:oydPDJKcfMhgfcgBUZaG+toBbwy8yPWubJXBVERtI4o= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.20 h1:tN6W/hg+pkM+tf9XDkWUbDEjGLb+raoBMFsTodcoYKw= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.20/go.mod h1:YJ898MhD067hSHA6xYCx5ts/jEd8BSOLtQDL3iZsvbc= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6 h1:qYQ4pzQ2Oz6WpQ8T3HvGHnZydA72MnLuFK9tJwmrbHw= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6/go.mod h1:O3h0IK87yXci+kg6flUKzJnWeziQUKciKrLjcatSNcY= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.21 h1:SwGMTMLIlvDNyhMteQ6r8IJSBPlRdXX5d4idhIGbkXA= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.21/go.mod h1:UUxgWxofmOdAMuqEsSppbDtGKLfR04HGsD0HXzvhI1k= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7 h1:5EniKhLZe4xzL7a+fU3C2tfUN4nWIqlLesfrjkuPFTY= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7/go.mod h1:x0nZssQ3qZSnIcePWLvcoFisRXJzcTVvYpAAdYX8+GI= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.12 h1:qtJZ70afD3ISKWnoX3xB0J2otEqu3LqicRcDBqsj0hQ= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.12/go.mod h1:v2pNpJbRNl4vEUWEh5ytQok0zACAKfdmKS51Hotc3pQ= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.20 h1:2HvVAIq+YqgGotK6EkMf+KIEqTISmTYh5zLpYyeTo1Y= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.20/go.mod h1:V4X406Y666khGa8ghKmphma/7C0DAtEQYhkq9z4vpbk= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.20 h1:siU1A6xjUZ2N8zjTHSXFhB9L/2OY8Dqs0xXiLjF30jA= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.20/go.mod h1:4TLZCmVJDM3FOu5P5TJP0zOlu9zWgDWU7aUxWbr+rcw= +github.com/aws/aws-sdk-go-v2/service/s3 v1.97.2 h1:MRNiP6nqa20aEl8fQ6PJpEq11b2d40b16sm4WD7QgMU= +github.com/aws/aws-sdk-go-v2/service/s3 v1.97.2/go.mod h1:FrNA56srbsr3WShiaelyWYEo70x80mXnVZ17ZZfbeqg= +github.com/aws/aws-sdk-go-v2/service/signin v1.0.8 h1:0GFOLzEbOyZABS3PhYfBIx2rNBACYcKty+XGkTgw1ow= +github.com/aws/aws-sdk-go-v2/service/signin v1.0.8/go.mod h1:LXypKvk85AROkKhOG6/YEcHFPoX+prKTowKnVdcaIxE= +github.com/aws/aws-sdk-go-v2/service/sso v1.30.13 h1:kiIDLZ005EcKomYYITtfsjn7dtOwHDOFy7IbPXKek2o= +github.com/aws/aws-sdk-go-v2/service/sso v1.30.13/go.mod h1:2h/xGEowcW/g38g06g3KpRWDlT+OTfxxI0o1KqayAB8= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.17 h1:jzKAXIlhZhJbnYwHbvUQZEB8KfgAEuG0dc08Bkda7NU= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.17/go.mod h1:Al9fFsXjv4KfbzQHGe6V4NZSZQXecFcvaIF4e70FoRA= +github.com/aws/aws-sdk-go-v2/service/sts v1.41.9 h1:Cng+OOwCHmFljXIxpEVXAGMnBia8MSU6Ch5i9PgBkcU= +github.com/aws/aws-sdk-go-v2/service/sts v1.41.9/go.mod h1:LrlIndBDdjA/EeXeyNBle+gyCwTlizzW5ycgWnvIxkk= +github.com/aws/smithy-go v1.24.2 h1:FzA3bu/nt/vDvmnkg+R8Xl46gmzEDam6mZ1hzmwXFng= +github.com/aws/smithy-go v1.24.2/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc= diff --git a/worker-api/src/glicko2.test.ts b/worker-api/src/glicko2.test.ts index e3adbe8..da0074d 100644 --- a/worker-api/src/glicko2.test.ts +++ b/worker-api/src/glicko2.test.ts @@ -55,8 +55,8 @@ describe('Glicko-2 Rating System', () => { }); it('returns correct values for known inputs', () => { - // g(0.2) ≈ 0.9955 (from paper example) - expect(g(0.2)).toBeCloseTo(0.9955, 4); + // g(0.2) = 1/sqrt(1 + 3*0.04/pi^2) ≈ 0.993976 + expect(g(0.2)).toBeCloseTo(0.993976, 4); }); }); diff --git a/worker-api/src/glicko2.ts b/worker-api/src/glicko2.ts index 47c5e6b..6353a55 100644 --- a/worker-api/src/glicko2.ts +++ b/worker-api/src/glicko2.ts @@ -40,14 +40,14 @@ export function fromGlicko2(g2: Glicko2Rating): { rating: number; rd: number } { /** * Compute g(phi) function */ -function g(phi: number): number { +export function g(phi: number): number { return 1 / Math.sqrt(1 + (3 * phi * phi) / (Math.PI * Math.PI)); } /** * Compute E(mu, mu_j, phi_j) function */ -function E(mu: number, mu_j: number, phi_j: number): number { +export function E(mu: number, mu_j: number, phi_j: number): number { return 1 / (1 + Math.exp(-g(phi_j) * (mu - mu_j))); }