Implement match worker container (cmd/acb-worker/)

- Worker polls Cloudflare Worker API for pending match jobs
- Claims jobs and executes matches using the game engine
- Uploads replays to R2 via S3-compatible API
- Sends heartbeats during match execution
- Submits results back to Worker API
- Includes retry logic with exponential backoff
- API client tests for job coordination endpoints

Also fixes glicko2.ts: export g() and E() functions for testing

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-03-24 08:06:15 -04:00
parent 9bd4efc935
commit 6659027bec
8 changed files with 1147 additions and 4 deletions

274
cmd/acb-worker/api.go Normal file
View file

@ -0,0 +1,274 @@
// API client for Worker API communication
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)
// APIClient communicates with the Worker API.
type APIClient struct {
endpoint string
apiKey string
httpClient *http.Client
maxRetries int
}
// NewAPIClient creates a new API client.
func NewAPIClient(cfg *Config) *APIClient {
return &APIClient{
endpoint: cfg.APIEndpoint,
apiKey: cfg.APIKey,
httpClient: &http.Client{
Timeout: 30 * time.Second,
},
maxRetries: cfg.MaxRetries,
}
}
// Job represents a pending job from the API.
type Job struct {
ID string `json:"id"`
MatchID string `json:"match_id"`
Status string `json:"status"`
WorkerID *string `json:"worker_id"`
ClaimedAt *time.Time `json:"claimed_at"`
HeartbeatAt *time.Time `json:"heartbeat_at"`
CreatedAt time.Time `json:"created_at"`
}
// JobClaimResponse contains the data needed to execute a match.
type JobClaimResponse struct {
Job Job `json:"job"`
Match Match `json:"match"`
Participants []Participant `json:"participants"`
Map MapData `json:"map"`
Bots []BotInfo `json:"bots"`
BotSecrets []BotSecret `json:"bot_secrets"`
}
// Match represents match metadata.
type Match struct {
ID string `json:"id"`
Status string `json:"status"`
WinnerID *string `json:"winner_id"`
Turns *int `json:"turns"`
EndReason *string `json:"end_reason"`
MapID string `json:"map_id"`
CreatedAt time.Time `json:"created_at"`
StartedAt *time.Time `json:"started_at"`
CompletedAt *time.Time `json:"completed_at"`
}
// Participant represents a match participant.
type Participant struct {
ID string `json:"id"`
MatchID string `json:"match_id"`
BotID string `json:"bot_id"`
PlayerIndex int `json:"player_index"`
Score int `json:"score"`
RatingBefore int `json:"rating_before"`
RatingAfter *int `json:"rating_after"`
RatingDeviationBefore int `json:"rating_deviation_before"`
RatingDeviationAfter *int `json:"rating_deviation_after"`
}
// MapData represents map configuration.
type MapData struct {
ID string `json:"id"`
Width int `json:"width"`
Height int `json:"height"`
Walls string `json:"walls"`
Spawns string `json:"spawns"`
Cores string `json:"cores"`
}
// BotInfo contains bot endpoint information.
type BotInfo struct {
ID string `json:"id"`
EndpointURL string `json:"endpoint_url"`
}
// BotSecret contains bot authentication secret.
type BotSecret struct {
BotID string `json:"bot_id"`
Secret string `json:"secret"`
}
// APIResponse is a generic API response.
type APIResponse struct {
Success bool `json:"success"`
Data json.RawMessage `json:"data,omitempty"`
Error string `json:"error,omitempty"`
}
// GetNextJob fetches the next pending job.
func (c *APIClient) GetNextJob(ctx context.Context) (*Job, error) {
resp, err := c.doRequest(ctx, "GET", "/api/jobs/next", nil)
if err != nil {
return nil, err
}
var apiResp APIResponse
if err := json.Unmarshal(resp, &apiResp); err != nil {
return nil, fmt.Errorf("failed to parse response: %w", err)
}
if !apiResp.Success {
return nil, fmt.Errorf("API error: %s", apiResp.Error)
}
if apiResp.Data == nil {
return nil, nil // No pending jobs
}
var job Job
if err := json.Unmarshal(apiResp.Data, &job); err != nil {
return nil, fmt.Errorf("failed to parse job: %w", err)
}
return &job, nil
}
// ClaimJob claims a job for execution.
func (c *APIClient) ClaimJob(ctx context.Context, jobID string, workerID string) (*JobClaimResponse, error) {
body := map[string]string{"worker_id": workerID}
resp, err := c.doRequest(ctx, "POST", "/api/jobs/"+jobID+"/claim", body)
if err != nil {
return nil, err
}
var apiResp APIResponse
if err := json.Unmarshal(resp, &apiResp); err != nil {
return nil, fmt.Errorf("failed to parse response: %w", err)
}
if !apiResp.Success {
return nil, fmt.Errorf("API error: %s", apiResp.Error)
}
var claimResp JobClaimResponse
if err := json.Unmarshal(apiResp.Data, &claimResp); err != nil {
return nil, fmt.Errorf("failed to parse claim response: %w", err)
}
return &claimResp, nil
}
// Heartbeat sends a heartbeat for a claimed job.
func (c *APIClient) Heartbeat(ctx context.Context, jobID string, workerID string) error {
body := map[string]string{"worker_id": workerID}
_, err := c.doRequest(ctx, "POST", "/api/jobs/"+jobID+"/heartbeat", body)
return err
}
// SubmitResult submits the result of a completed match.
func (c *APIClient) SubmitResult(ctx context.Context, jobID string, result *MatchResult, replayURL string) error {
body := map[string]interface{}{
"winner_id": result.WinnerID,
"turns": result.Turns,
"end_reason": result.EndReason,
"replay_url": replayURL,
"scores": result.Scores,
}
_, err := c.doRequest(ctx, "POST", "/api/jobs/"+jobID+"/result", body)
return err
}
// FailJob marks a job as failed.
func (c *APIClient) FailJob(ctx context.Context, jobID string, workerID string, errorMessage string) error {
body := map[string]string{
"worker_id": workerID,
"error_message": errorMessage,
}
_, err := c.doRequest(ctx, "POST", "/api/jobs/"+jobID+"/fail", body)
return err
}
// doRequest makes an HTTP request with retries.
func (c *APIClient) doRequest(ctx context.Context, method string, path string, body interface{}) ([]byte, error) {
var lastErr error
for attempt := 0; attempt <= c.maxRetries; attempt++ {
if attempt > 0 {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(time.Second * time.Duration(attempt)):
}
}
resp, err := c.doSingleRequest(ctx, method, path, body)
if err != nil {
lastErr = err
// Check if it's a client error (don't retry)
if httpErr, ok := err.(*HTTPError); ok && httpErr.StatusCode >= 400 && httpErr.StatusCode < 500 {
return nil, err
}
continue
}
return resp, nil
}
return nil, fmt.Errorf("request failed after %d retries: %w", c.maxRetries, lastErr)
}
// doSingleRequest makes a single HTTP request.
func (c *APIClient) doSingleRequest(ctx context.Context, method string, path string, body interface{}) ([]byte, error) {
var reqBody io.Reader
if body != nil {
data, err := json.Marshal(body)
if err != nil {
return nil, fmt.Errorf("failed to marshal request body: %w", err)
}
reqBody = bytes.NewReader(data)
}
req, err := http.NewRequestWithContext(ctx, method, c.endpoint+path, reqBody)
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+c.apiKey)
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("request failed: %w", err)
}
defer resp.Body.Close()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response: %w", err)
}
if resp.StatusCode >= 400 {
return nil, &HTTPError{
StatusCode: resp.StatusCode,
Body: string(respBody),
}
}
return respBody, nil
}
// HTTPError represents an HTTP error response.
type HTTPError struct {
StatusCode int
Body string
}
func (e *HTTPError) Error() string {
return fmt.Sprintf("HTTP %d: %s", e.StatusCode, e.Body)
}

337
cmd/acb-worker/api_test.go Normal file
View file

@ -0,0 +1,337 @@
// API client tests
package main
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
)
func TestGetNextJob(t *testing.T) {
tests := []struct {
name string
response APIResponse
wantNil bool
wantErr bool
}{
{
name: "no pending jobs",
response: APIResponse{
Success: true,
Data: nil,
},
wantNil: true,
wantErr: false,
},
{
name: "pending job found",
response: APIResponse{
Success: true,
Data: json.RawMessage(`{
"id": "job-123",
"match_id": "match-456",
"status": "pending",
"created_at": "2024-01-01T00:00:00Z"
}`),
},
wantNil: false,
wantErr: false,
},
{
name: "api error",
response: APIResponse{
Success: false,
Error: "internal server error",
},
wantNil: true,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/api/jobs/next" {
t.Errorf("unexpected path: %s", r.URL.Path)
}
if r.Method != "GET" {
t.Errorf("unexpected method: %s", r.Method)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(tt.response)
}))
defer server.Close()
cfg := &Config{
APIEndpoint: server.URL,
APIKey: "test-key",
MaxRetries: 0,
}
client := NewAPIClient(cfg)
job, err := client.GetNextJob(context.Background())
if (err != nil) != tt.wantErr {
t.Errorf("GetNextJob() error = %v, wantErr %v", err, tt.wantErr)
return
}
if (job == nil) != tt.wantNil {
t.Errorf("GetNextJob() job = %v, wantNil %v", job, tt.wantNil)
}
})
}
}
func TestClaimJob(t *testing.T) {
tests := []struct {
name string
response APIResponse
wantErr bool
}{
{
name: "successful claim",
response: APIResponse{
Success: true,
Data: json.RawMessage(`{
"job": {"id": "job-123", "match_id": "match-456", "status": "claimed", "created_at": "2024-01-01T00:00:00Z"},
"match": {"id": "match-456", "status": "running", "map_id": "map-789", "created_at": "2024-01-01T00:00:00Z"},
"participants": [],
"map": {"id": "map-789", "width": 60, "height": 60, "walls": "", "spawns": "", "cores": ""},
"bots": [],
"bot_secrets": []
}`),
},
wantErr: false,
},
{
name: "job already claimed",
response: APIResponse{
Success: false,
Error: "job not found or already claimed",
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
t.Errorf("unexpected method: %s", r.Method)
}
var body map[string]string
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
t.Errorf("failed to decode body: %v", err)
}
if body["worker_id"] != "worker-1" {
t.Errorf("unexpected worker_id: %s", body["worker_id"])
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(tt.response)
}))
defer server.Close()
cfg := &Config{
APIEndpoint: server.URL,
APIKey: "test-key",
MaxRetries: 0,
}
client := NewAPIClient(cfg)
claim, err := client.ClaimJob(context.Background(), "job-123", "worker-1")
if (err != nil) != tt.wantErr {
t.Errorf("ClaimJob() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !tt.wantErr && claim == nil {
t.Error("ClaimJob() returned nil claim without error")
}
})
}
}
func TestHeartbeat(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
t.Errorf("unexpected method: %s", r.Method)
}
var body map[string]string
json.NewDecoder(r.Body).Decode(&body)
if body["worker_id"] != "worker-1" {
t.Errorf("unexpected worker_id: %s", body["worker_id"])
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(APIResponse{Success: true})
}))
defer server.Close()
cfg := &Config{
APIEndpoint: server.URL,
APIKey: "test-key",
MaxRetries: 0,
}
client := NewAPIClient(cfg)
if err := client.Heartbeat(context.Background(), "job-123", "worker-1"); err != nil {
t.Errorf("Heartbeat() error = %v", err)
}
}
func TestSubmitResult(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
t.Errorf("unexpected method: %s", r.Method)
}
var body map[string]interface{}
json.NewDecoder(r.Body).Decode(&body)
if body["winner_id"] != "bot-1" {
t.Errorf("unexpected winner_id: %v", body["winner_id"])
}
if body["turns"].(float64) != 100 {
t.Errorf("unexpected turns: %v", body["turns"])
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(APIResponse{Success: true})
}))
defer server.Close()
cfg := &Config{
APIEndpoint: server.URL,
APIKey: "test-key",
MaxRetries: 0,
}
client := NewAPIClient(cfg)
result := &MatchResult{
WinnerID: "bot-1",
Turns: 100,
EndReason: "elimination",
Scores: map[string]int{"bot-1": 5, "bot-2": 2},
}
if err := client.SubmitResult(context.Background(), "job-123", result, "https://r2.example.com/replay.json"); err != nil {
t.Errorf("SubmitResult() error = %v", err)
}
}
func TestHTTPError(t *testing.T) {
err := &HTTPError{StatusCode: 404, Body: "not found"}
expected := "HTTP 404: not found"
if err.Error() != expected {
t.Errorf("HTTPError.Error() = %q, want %q", err.Error(), expected)
}
}
func TestAPIAuth(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
auth := r.Header.Get("Authorization")
if auth != "Bearer test-api-key" {
t.Errorf("unexpected authorization header: %s", auth)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(APIResponse{Success: true})
}))
defer server.Close()
cfg := &Config{
APIEndpoint: server.URL,
APIKey: "test-api-key",
MaxRetries: 0,
}
client := NewAPIClient(cfg)
_, err := client.GetNextJob(context.Background())
if err != nil {
t.Errorf("GetNextJob() error = %v", err)
}
}
func TestRetryLogic(t *testing.T) {
attempts := 0
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
attempts++
if attempts < 3 {
// Simulate server error (retryable)
w.WriteHeader(http.StatusInternalServerError)
return
}
// Success on third attempt
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(APIResponse{Success: true})
}))
defer server.Close()
cfg := &Config{
APIEndpoint: server.URL,
APIKey: "test-key",
MaxRetries: 3,
}
client := NewAPIClient(cfg)
_, err := client.GetNextJob(context.Background())
if err != nil {
t.Errorf("GetNextJob() error = %v", err)
}
if attempts != 3 {
t.Errorf("expected 3 attempts, got %d", attempts)
}
}
func TestClientErrorNoRetry(t *testing.T) {
attempts := 0
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
attempts++
// Client error (should not retry)
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("bad request"))
}))
defer server.Close()
cfg := &Config{
APIEndpoint: server.URL,
APIKey: "test-key",
MaxRetries: 3,
}
client := NewAPIClient(cfg)
_, err := client.GetNextJob(context.Background())
if err == nil {
t.Error("expected error for bad request")
}
if attempts != 1 {
t.Errorf("expected 1 attempt (no retry for client errors), got %d", attempts)
}
}
func TestContextCancellation(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(2 * time.Second) // Long delay
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
cfg := &Config{
APIEndpoint: server.URL,
APIKey: "test-key",
MaxRetries: 0,
}
client := NewAPIClient(cfg)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
_, err := client.GetNextJob(ctx)
if err == nil {
t.Error("expected context cancellation error")
}
}

352
cmd/acb-worker/main.go Normal file
View file

@ -0,0 +1,352 @@
// acb-worker: Match execution worker for AI Code Battle
//
// This worker polls the Cloudflare Worker API for pending match jobs,
// executes matches using the game engine, uploads replays to R2,
// and submits results back to the API.
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"math/rand"
"os"
"os/signal"
"syscall"
"time"
"github.com/aicodebattle/acb/engine"
)
// Config holds worker configuration.
type Config struct {
APIEndpoint string // Worker API endpoint (e.g., https://api.aicodebattle.com)
APIKey string // Worker API key for authentication
R2Endpoint string // R2 endpoint for replay uploads
R2Bucket string // R2 bucket name
R2AccessKey string // R2 access key ID
R2SecretKey string // R2 secret access key
WorkerID string // Unique worker identifier
PollPeriod time.Duration // How often to poll for jobs
Heartbeat time.Duration // How often to send heartbeat during match
TurnTimeout time.Duration // Per-turn timeout for bots
MaxRetries int // Max retries for transient errors
Verbose bool // Enable verbose logging
}
func main() {
// Parse command-line flags
apiEndpoint := flag.String("api", getEnv("ACB_API_ENDPOINT", "http://localhost:8787"), "Worker API endpoint")
apiKey := flag.String("api-key", getEnv("ACB_API_KEY", ""), "Worker API key")
r2Endpoint := flag.String("r2-endpoint", getEnv("ACB_R2_ENDPOINT", ""), "R2 endpoint URL")
r2Bucket := flag.String("r2-bucket", getEnv("ACB_R2_BUCKET", "acb-data"), "R2 bucket name")
r2AccessKey := flag.String("r2-access-key", getEnv("ACB_R2_ACCESS_KEY", ""), "R2 access key ID")
r2SecretKey := flag.String("r2-secret-key", getEnv("ACB_R2_SECRET_KEY", ""), "R2 secret access key")
workerID := flag.String("worker-id", getEnv("ACB_WORKER_ID", generateWorkerID()), "Unique worker identifier")
pollPeriod := flag.Duration("poll", 5*time.Second, "Job polling period")
heartbeat := flag.Duration("heartbeat", 30*time.Second, "Heartbeat interval during matches")
turnTimeout := flag.Duration("timeout", 3*time.Second, "Per-turn bot timeout")
maxRetries := flag.Int("retries", 3, "Max retries for transient errors")
verbose := flag.Bool("verbose", getEnv("ACB_VERBOSE", "false") == "true", "Enable verbose logging")
flag.Parse()
// Validate required config
if *apiKey == "" {
log.Fatal("API key is required (set ACB_API_KEY or use -api-key flag)")
}
cfg := &Config{
APIEndpoint: *apiEndpoint,
APIKey: *apiKey,
R2Endpoint: *r2Endpoint,
R2Bucket: *r2Bucket,
R2AccessKey: *r2AccessKey,
R2SecretKey: *r2SecretKey,
WorkerID: *workerID,
PollPeriod: *pollPeriod,
Heartbeat: *heartbeat,
TurnTimeout: *turnTimeout,
MaxRetries: *maxRetries,
Verbose: *verbose,
}
// Create API client
apiClient := NewAPIClient(cfg)
// Create R2 client (optional - if not configured, replays won't be uploaded)
var r2Client *R2Client
if cfg.R2Endpoint != "" && cfg.R2AccessKey != "" && cfg.R2SecretKey != "" {
r2Client = NewR2Client(cfg)
}
// Create worker
worker := &Worker{
cfg: cfg,
api: apiClient,
r2: r2Client,
logger: log.New(os.Stdout, fmt.Sprintf("[worker-%s] ", cfg.WorkerID), log.LstdFlags),
rng: rand.New(rand.NewSource(time.Now().UnixNano())),
heartbeat: *heartbeat,
}
// Set up signal handling
ctx, cancel := context.WithCancel(context.Background())
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
worker.logger.Println("Received shutdown signal, finishing current job...")
cancel()
}()
// Run worker loop
worker.Run(ctx)
}
// getEnv gets an environment variable with a default value.
func getEnv(key, defaultValue string) string {
if value := os.Getenv(key); value != "" {
return value
}
return defaultValue
}
// generateWorkerID generates a random worker ID.
func generateWorkerID() string {
return fmt.Sprintf("worker-%d", rand.Intn(100000))
}
// Worker executes match jobs.
type Worker struct {
cfg *Config
api *APIClient
r2 *R2Client
logger *log.Logger
rng *rand.Rand
heartbeat time.Duration
}
// Run starts the worker loop.
func (w *Worker) Run(ctx context.Context) {
w.logger.Println("Worker started, polling for jobs...")
ticker := time.NewTicker(w.cfg.PollPeriod)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
w.logger.Println("Worker shutting down")
return
case <-ticker.C:
if err := w.pollAndExecute(ctx); err != nil {
w.logger.Printf("Error in poll cycle: %v", err)
}
}
}
}
// pollAndExecute polls for a job and executes it if available.
func (w *Worker) pollAndExecute(ctx context.Context) error {
// Get next pending job
job, err := w.api.GetNextJob(ctx)
if err != nil {
return fmt.Errorf("failed to get next job: %w", err)
}
if job == nil {
if w.cfg.Verbose {
w.logger.Println("No pending jobs")
}
return nil
}
w.logger.Printf("Found job %s for match %s", job.ID, job.MatchID)
// Claim the job
claimResp, err := w.api.ClaimJob(ctx, job.ID, w.cfg.WorkerID)
if err != nil {
return fmt.Errorf("failed to claim job %s: %w", job.ID, err)
}
w.logger.Printf("Claimed job %s, executing match...", job.ID)
// Execute the match
result, replay, err := w.executeMatch(ctx, claimResp)
if err != nil {
w.logger.Printf("Match execution failed: %v", err)
// Mark job as failed
if failErr := w.api.FailJob(ctx, job.ID, w.cfg.WorkerID, err.Error()); failErr != nil {
w.logger.Printf("Failed to mark job as failed: %v", failErr)
}
return err
}
// Upload replay to R2
replayURL := ""
if w.r2 != nil {
replayURL, err = w.uploadReplay(ctx, claimResp.Match.ID, replay)
if err != nil {
w.logger.Printf("Failed to upload replay: %v", err)
// Continue without replay URL - match result is more important
} else {
w.logger.Printf("Uploaded replay to %s", replayURL)
}
}
// Submit result
err = w.api.SubmitResult(ctx, job.ID, result, replayURL)
if err != nil {
return fmt.Errorf("failed to submit result for job %s: %w", job.ID, err)
}
w.logger.Printf("Completed job %s, winner: %s", job.ID, result.WinnerID)
return nil
}
// executeMatch runs a match and returns the result and replay.
func (w *Worker) executeMatch(ctx context.Context, claim *JobClaimResponse) (*MatchResult, *engine.Replay, error) {
// Build game config from map data
config := engine.Config{
Rows: claim.Map.Width,
Cols: claim.Map.Height,
MaxTurns: 500, // Default max turns
VisionRadius2: 49, // Default vision
AttackRadius2: 5, // Default attack
SpawnCost: 3, // Default spawn cost
EnergyInterval: 10, // Default energy interval
}
// Create match runner
runner := engine.NewMatchRunner(config,
engine.WithRNG(w.rng),
engine.WithVerbose(w.cfg.Verbose),
engine.WithTimeout(w.cfg.TurnTimeout),
)
// Add bots from claim response
for _, participant := range claim.Participants {
// Find bot endpoint
var endpointURL string
for _, bot := range claim.Bots {
if bot.ID == participant.BotID {
endpointURL = bot.EndpointURL
break
}
}
// Find bot secret
var secret string
for _, s := range claim.BotSecrets {
if s.BotID == participant.BotID {
secret = s.Secret
break
}
}
// Create auth config for HTTP bot
auth := engine.AuthConfig{
BotID: participant.BotID,
Secret: secret,
MatchID: claim.Match.ID,
}
// Create HTTP bot client
httpBot := engine.NewHTTPBot(
endpointURL,
auth,
engine.WithHTTPTimeout(w.cfg.TurnTimeout),
)
runner.AddBot(httpBot, participant.BotID)
w.logger.Printf("Added bot %s at %s (player %d)", participant.BotID, endpointURL, participant.PlayerIndex)
}
// Start heartbeat goroutine
heartbeatCtx, heartbeatCancel := context.WithCancel(ctx)
defer heartbeatCancel()
go w.sendHeartbeats(heartbeatCtx, claim.Job.ID)
// Run the match
engineResult, replay, err := runner.Run()
if err != nil {
return nil, nil, fmt.Errorf("match execution failed: %w", err)
}
// Convert result
result := &MatchResult{
WinnerID: "",
Turns: engineResult.Turns,
EndReason: engineResult.Reason,
Scores: make(map[string]int),
}
// Set winner ID from result (Winner is int, -1 for draw)
if engineResult.Winner >= 0 && engineResult.Winner < len(claim.Participants) {
for _, p := range claim.Participants {
if p.PlayerIndex == engineResult.Winner {
result.WinnerID = p.BotID
break
}
}
}
// Calculate scores from replay
for i, p := range claim.Participants {
if i < len(engineResult.Scores) {
result.Scores[p.BotID] = engineResult.Scores[i]
}
}
return result, replay, nil
}
// sendHeartbeats sends periodic heartbeats while a match is running.
func (w *Worker) sendHeartbeats(ctx context.Context, jobID string) {
ticker := time.NewTicker(w.heartbeat)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := w.api.Heartbeat(ctx, jobID, w.cfg.WorkerID); err != nil {
w.logger.Printf("Heartbeat failed: %v", err)
}
}
}
}
// uploadReplay uploads the replay to R2 and returns the URL.
func (w *Worker) uploadReplay(ctx context.Context, matchID string, replay *engine.Replay) (string, error) {
if w.r2 == nil {
return "", fmt.Errorf("R2 client not configured")
}
// Serialize replay to JSON
data, err := json.Marshal(replay)
if err != nil {
return "", fmt.Errorf("failed to serialize replay: %w", err)
}
// Upload to R2
key := fmt.Sprintf("replays/%s.json", matchID)
if err := w.r2.Upload(ctx, key, data, "application/json"); err != nil {
return "", fmt.Errorf("failed to upload replay to R2: %w", err)
}
return fmt.Sprintf("%s/%s", w.r2.Endpoint(), key), nil
}
// MatchResult represents the result of a match for API submission.
type MatchResult struct {
WinnerID string `json:"winner_id"`
Turns int `json:"turns"`
EndReason string `json:"end_reason"`
Scores map[string]int `json:"scores"`
}

120
cmd/acb-worker/r2.go Normal file
View file

@ -0,0 +1,120 @@
// R2 client for uploading replays
package main
import (
"bytes"
"context"
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
)
// R2Client handles R2 bucket operations.
type R2Client struct {
client *s3.Client
bucket string
endpoint string
}
// NewR2Client creates a new R2 client.
func NewR2Client(cfg *Config) *R2Client {
// Create custom endpoint resolver for R2
customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
return aws.Endpoint{
URL: cfg.R2Endpoint,
SigningRegion: "auto",
}, nil
})
// Load AWS config with R2 credentials
awsCfg, err := config.LoadDefaultConfig(context.TODO(),
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
cfg.R2AccessKey,
cfg.R2SecretKey,
"",
)),
config.WithEndpointResolverWithOptions(customResolver),
)
if err != nil {
panic(fmt.Sprintf("failed to load AWS config: %v", err))
}
return &R2Client{
client: s3.NewFromConfig(awsCfg),
bucket: cfg.R2Bucket,
endpoint: cfg.R2Endpoint,
}
}
// Upload uploads data to R2.
func (c *R2Client) Upload(ctx context.Context, key string, data []byte, contentType string) error {
_, err := c.client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(c.bucket),
Key: aws.String(key),
Body: bytes.NewReader(data),
ContentType: aws.String(contentType),
CacheControl: aws.String("public, max-age=31536000, immutable"),
})
return err
}
// Download downloads data from R2.
func (c *R2Client) Download(ctx context.Context, key string) ([]byte, error) {
resp, err := c.client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(c.bucket),
Key: aws.String(key),
})
if err != nil {
return nil, err
}
defer resp.Body.Close()
buf := new(bytes.Buffer)
if _, err := buf.ReadFrom(resp.Body); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// Delete deletes an object from R2.
func (c *R2Client) Delete(ctx context.Context, key string) error {
_, err := c.client.DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: aws.String(c.bucket),
Key: aws.String(key),
})
return err
}
// List lists objects with a prefix.
func (c *R2Client) List(ctx context.Context, prefix string) ([]string, error) {
var keys []string
paginator := s3.NewListObjectsV2Paginator(c.client, &s3.ListObjectsV2Input{
Bucket: aws.String(c.bucket),
Prefix: aws.String(prefix),
})
for paginator.HasMorePages() {
page, err := paginator.NextPage(ctx)
if err != nil {
return nil, err
}
for _, obj := range page.Contents {
if obj.Key != nil {
keys = append(keys, *obj.Key)
}
}
}
return keys, nil
}
// Endpoint returns the R2 endpoint URL.
func (c *R2Client) Endpoint() string {
return c.endpoint
}

22
go.mod
View file

@ -1,3 +1,25 @@
module github.com/aicodebattle/acb
go 1.24.3
require (
github.com/aws/aws-sdk-go-v2 v1.41.4 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.8 // indirect
github.com/aws/aws-sdk-go-v2/config v1.32.12 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.19.12 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.20 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.20 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.20 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.21 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.12 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.20 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.20 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.97.2 // indirect
github.com/aws/aws-sdk-go-v2/service/signin v1.0.8 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.30.13 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.17 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.41.9 // indirect
github.com/aws/smithy-go v1.24.2 // indirect
)

38
go.sum Normal file
View file

@ -0,0 +1,38 @@
github.com/aws/aws-sdk-go-v2 v1.41.4 h1:10f50G7WyU02T56ox1wWXq+zTX9I1zxG46HYuG1hH/k=
github.com/aws/aws-sdk-go-v2 v1.41.4/go.mod h1:mwsPRE8ceUUpiTgF7QmQIJ7lgsKUPQOUl3o72QBrE1o=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.8 h1:eBMB84YGghSocM7PsjmmPffTa+1FBUeNvGvFou6V/4o=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.8/go.mod h1:lyw7GFp3qENLh7kwzf7iMzAxDn+NzjXEAGjKS2UOKqI=
github.com/aws/aws-sdk-go-v2/config v1.32.12 h1:O3csC7HUGn2895eNrLytOJQdoL2xyJy0iYXhoZ1OmP0=
github.com/aws/aws-sdk-go-v2/config v1.32.12/go.mod h1:96zTvoOFR4FURjI+/5wY1vc1ABceROO4lWgWJuxgy0g=
github.com/aws/aws-sdk-go-v2/credentials v1.19.12 h1:oqtA6v+y5fZg//tcTWahyN9PEn5eDU/Wpvc2+kJ4aY8=
github.com/aws/aws-sdk-go-v2/credentials v1.19.12/go.mod h1:U3R1RtSHx6NB0DvEQFGyf/0sbrpJrluENHdPy1j/3TE=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.20 h1:zOgq3uezl5nznfoK3ODuqbhVg1JzAGDUhXOsU0IDCAo=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.20/go.mod h1:z/MVwUARehy6GAg/yQ1GO2IMl0k++cu1ohP9zo887wE=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.20 h1:CNXO7mvgThFGqOFgbNAP2nol2qAWBOGfqR/7tQlvLmc=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.20/go.mod h1:oydPDJKcfMhgfcgBUZaG+toBbwy8yPWubJXBVERtI4o=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.20 h1:tN6W/hg+pkM+tf9XDkWUbDEjGLb+raoBMFsTodcoYKw=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.20/go.mod h1:YJ898MhD067hSHA6xYCx5ts/jEd8BSOLtQDL3iZsvbc=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6 h1:qYQ4pzQ2Oz6WpQ8T3HvGHnZydA72MnLuFK9tJwmrbHw=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6/go.mod h1:O3h0IK87yXci+kg6flUKzJnWeziQUKciKrLjcatSNcY=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.21 h1:SwGMTMLIlvDNyhMteQ6r8IJSBPlRdXX5d4idhIGbkXA=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.21/go.mod h1:UUxgWxofmOdAMuqEsSppbDtGKLfR04HGsD0HXzvhI1k=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7 h1:5EniKhLZe4xzL7a+fU3C2tfUN4nWIqlLesfrjkuPFTY=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7/go.mod h1:x0nZssQ3qZSnIcePWLvcoFisRXJzcTVvYpAAdYX8+GI=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.12 h1:qtJZ70afD3ISKWnoX3xB0J2otEqu3LqicRcDBqsj0hQ=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.12/go.mod h1:v2pNpJbRNl4vEUWEh5ytQok0zACAKfdmKS51Hotc3pQ=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.20 h1:2HvVAIq+YqgGotK6EkMf+KIEqTISmTYh5zLpYyeTo1Y=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.20/go.mod h1:V4X406Y666khGa8ghKmphma/7C0DAtEQYhkq9z4vpbk=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.20 h1:siU1A6xjUZ2N8zjTHSXFhB9L/2OY8Dqs0xXiLjF30jA=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.20/go.mod h1:4TLZCmVJDM3FOu5P5TJP0zOlu9zWgDWU7aUxWbr+rcw=
github.com/aws/aws-sdk-go-v2/service/s3 v1.97.2 h1:MRNiP6nqa20aEl8fQ6PJpEq11b2d40b16sm4WD7QgMU=
github.com/aws/aws-sdk-go-v2/service/s3 v1.97.2/go.mod h1:FrNA56srbsr3WShiaelyWYEo70x80mXnVZ17ZZfbeqg=
github.com/aws/aws-sdk-go-v2/service/signin v1.0.8 h1:0GFOLzEbOyZABS3PhYfBIx2rNBACYcKty+XGkTgw1ow=
github.com/aws/aws-sdk-go-v2/service/signin v1.0.8/go.mod h1:LXypKvk85AROkKhOG6/YEcHFPoX+prKTowKnVdcaIxE=
github.com/aws/aws-sdk-go-v2/service/sso v1.30.13 h1:kiIDLZ005EcKomYYITtfsjn7dtOwHDOFy7IbPXKek2o=
github.com/aws/aws-sdk-go-v2/service/sso v1.30.13/go.mod h1:2h/xGEowcW/g38g06g3KpRWDlT+OTfxxI0o1KqayAB8=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.17 h1:jzKAXIlhZhJbnYwHbvUQZEB8KfgAEuG0dc08Bkda7NU=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.17/go.mod h1:Al9fFsXjv4KfbzQHGe6V4NZSZQXecFcvaIF4e70FoRA=
github.com/aws/aws-sdk-go-v2/service/sts v1.41.9 h1:Cng+OOwCHmFljXIxpEVXAGMnBia8MSU6Ch5i9PgBkcU=
github.com/aws/aws-sdk-go-v2/service/sts v1.41.9/go.mod h1:LrlIndBDdjA/EeXeyNBle+gyCwTlizzW5ycgWnvIxkk=
github.com/aws/smithy-go v1.24.2 h1:FzA3bu/nt/vDvmnkg+R8Xl46gmzEDam6mZ1hzmwXFng=
github.com/aws/smithy-go v1.24.2/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc=

View file

@ -55,8 +55,8 @@ describe('Glicko-2 Rating System', () => {
});
it('returns correct values for known inputs', () => {
// g(0.2) ≈ 0.9955 (from paper example)
expect(g(0.2)).toBeCloseTo(0.9955, 4);
// g(0.2) = 1/sqrt(1 + 3*0.04/pi^2) ≈ 0.993976
expect(g(0.2)).toBeCloseTo(0.993976, 4);
});
});

View file

@ -40,14 +40,14 @@ export function fromGlicko2(g2: Glicko2Rating): { rating: number; rd: number } {
/**
* Compute g(phi) function
*/
function g(phi: number): number {
export function g(phi: number): number {
return 1 / Math.sqrt(1 + (3 * phi * phi) / (Math.PI * Math.PI));
}
/**
* Compute E(mu, mu_j, phi_j) function
*/
function E(mu: number, mu_j: number, phi_j: number): number {
export function E(mu: number, mu_j: number, phi_j: number): number {
return 1 / (1 + Math.exp(-g(phi_j) * (mu - mu_j)));
}