Refactor acb-worker: B2 uploads, PostgreSQL writes, Glicko-2 ratings
- Upload replays to B2 (Backblaze) instead of R2 for cold archive storage - Write match results directly to PostgreSQL instead of HTTP API - Perform Glicko-2 rating updates in worker after match completion - Update config: ACB_R2_* env vars → ACB_B2_* - Remove obsolete api_test.go (tested removed HTTP client) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
b06350d762
commit
729efb3f45
3 changed files with 130 additions and 457 deletions
|
|
@ -1,337 +0,0 @@
|
|||
// API client tests
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestGetNextJob(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
response APIResponse
|
||||
wantNil bool
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "no pending jobs",
|
||||
response: APIResponse{
|
||||
Success: true,
|
||||
Data: nil,
|
||||
},
|
||||
wantNil: true,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "pending job found",
|
||||
response: APIResponse{
|
||||
Success: true,
|
||||
Data: json.RawMessage(`{
|
||||
"id": "job-123",
|
||||
"match_id": "match-456",
|
||||
"status": "pending",
|
||||
"created_at": "2024-01-01T00:00:00Z"
|
||||
}`),
|
||||
},
|
||||
wantNil: false,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "api error",
|
||||
response: APIResponse{
|
||||
Success: false,
|
||||
Error: "internal server error",
|
||||
},
|
||||
wantNil: true,
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/api/jobs/next" {
|
||||
t.Errorf("unexpected path: %s", r.URL.Path)
|
||||
}
|
||||
if r.Method != "GET" {
|
||||
t.Errorf("unexpected method: %s", r.Method)
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(tt.response)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := &Config{
|
||||
APIEndpoint: server.URL,
|
||||
APIKey: "test-key",
|
||||
MaxRetries: 0,
|
||||
}
|
||||
client := NewAPIClient(cfg)
|
||||
|
||||
job, err := client.GetNextJob(context.Background())
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("GetNextJob() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if (job == nil) != tt.wantNil {
|
||||
t.Errorf("GetNextJob() job = %v, wantNil %v", job, tt.wantNil)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestClaimJob(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
response APIResponse
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "successful claim",
|
||||
response: APIResponse{
|
||||
Success: true,
|
||||
Data: json.RawMessage(`{
|
||||
"job": {"id": "job-123", "match_id": "match-456", "status": "claimed", "created_at": "2024-01-01T00:00:00Z"},
|
||||
"match": {"id": "match-456", "status": "running", "map_id": "map-789", "created_at": "2024-01-01T00:00:00Z"},
|
||||
"participants": [],
|
||||
"map": {"id": "map-789", "width": 60, "height": 60, "walls": "", "spawns": "", "cores": ""},
|
||||
"bots": [],
|
||||
"bot_secrets": []
|
||||
}`),
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "job already claimed",
|
||||
response: APIResponse{
|
||||
Success: false,
|
||||
Error: "job not found or already claimed",
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != "POST" {
|
||||
t.Errorf("unexpected method: %s", r.Method)
|
||||
}
|
||||
|
||||
var body map[string]string
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
||||
t.Errorf("failed to decode body: %v", err)
|
||||
}
|
||||
if body["worker_id"] != "worker-1" {
|
||||
t.Errorf("unexpected worker_id: %s", body["worker_id"])
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(tt.response)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := &Config{
|
||||
APIEndpoint: server.URL,
|
||||
APIKey: "test-key",
|
||||
MaxRetries: 0,
|
||||
}
|
||||
client := NewAPIClient(cfg)
|
||||
|
||||
claim, err := client.ClaimJob(context.Background(), "job-123", "worker-1")
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("ClaimJob() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !tt.wantErr && claim == nil {
|
||||
t.Error("ClaimJob() returned nil claim without error")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestHeartbeat(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != "POST" {
|
||||
t.Errorf("unexpected method: %s", r.Method)
|
||||
}
|
||||
|
||||
var body map[string]string
|
||||
json.NewDecoder(r.Body).Decode(&body)
|
||||
if body["worker_id"] != "worker-1" {
|
||||
t.Errorf("unexpected worker_id: %s", body["worker_id"])
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(APIResponse{Success: true})
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := &Config{
|
||||
APIEndpoint: server.URL,
|
||||
APIKey: "test-key",
|
||||
MaxRetries: 0,
|
||||
}
|
||||
client := NewAPIClient(cfg)
|
||||
|
||||
if err := client.Heartbeat(context.Background(), "job-123", "worker-1"); err != nil {
|
||||
t.Errorf("Heartbeat() error = %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubmitResult(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != "POST" {
|
||||
t.Errorf("unexpected method: %s", r.Method)
|
||||
}
|
||||
|
||||
var body map[string]interface{}
|
||||
json.NewDecoder(r.Body).Decode(&body)
|
||||
if body["winner_id"] != "bot-1" {
|
||||
t.Errorf("unexpected winner_id: %v", body["winner_id"])
|
||||
}
|
||||
if body["turns"].(float64) != 100 {
|
||||
t.Errorf("unexpected turns: %v", body["turns"])
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(APIResponse{Success: true})
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := &Config{
|
||||
APIEndpoint: server.URL,
|
||||
APIKey: "test-key",
|
||||
MaxRetries: 0,
|
||||
}
|
||||
client := NewAPIClient(cfg)
|
||||
|
||||
result := &MatchResult{
|
||||
WinnerID: "bot-1",
|
||||
Turns: 100,
|
||||
EndReason: "elimination",
|
||||
Scores: map[string]int{"bot-1": 5, "bot-2": 2},
|
||||
}
|
||||
|
||||
if err := client.SubmitResult(context.Background(), "job-123", result, "https://r2.example.com/replay.json"); err != nil {
|
||||
t.Errorf("SubmitResult() error = %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHTTPError(t *testing.T) {
|
||||
err := &HTTPError{StatusCode: 404, Body: "not found"}
|
||||
expected := "HTTP 404: not found"
|
||||
if err.Error() != expected {
|
||||
t.Errorf("HTTPError.Error() = %q, want %q", err.Error(), expected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAPIAuth(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
auth := r.Header.Get("Authorization")
|
||||
if auth != "Bearer test-api-key" {
|
||||
t.Errorf("unexpected authorization header: %s", auth)
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(APIResponse{Success: true})
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := &Config{
|
||||
APIEndpoint: server.URL,
|
||||
APIKey: "test-api-key",
|
||||
MaxRetries: 0,
|
||||
}
|
||||
client := NewAPIClient(cfg)
|
||||
|
||||
_, err := client.GetNextJob(context.Background())
|
||||
if err != nil {
|
||||
t.Errorf("GetNextJob() error = %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRetryLogic(t *testing.T) {
|
||||
attempts := 0
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
attempts++
|
||||
if attempts < 3 {
|
||||
// Simulate server error (retryable)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
// Success on third attempt
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(APIResponse{Success: true})
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := &Config{
|
||||
APIEndpoint: server.URL,
|
||||
APIKey: "test-key",
|
||||
MaxRetries: 3,
|
||||
}
|
||||
client := NewAPIClient(cfg)
|
||||
|
||||
_, err := client.GetNextJob(context.Background())
|
||||
if err != nil {
|
||||
t.Errorf("GetNextJob() error = %v", err)
|
||||
}
|
||||
if attempts != 3 {
|
||||
t.Errorf("expected 3 attempts, got %d", attempts)
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientErrorNoRetry(t *testing.T) {
|
||||
attempts := 0
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
attempts++
|
||||
// Client error (should not retry)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
w.Write([]byte("bad request"))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := &Config{
|
||||
APIEndpoint: server.URL,
|
||||
APIKey: "test-key",
|
||||
MaxRetries: 3,
|
||||
}
|
||||
client := NewAPIClient(cfg)
|
||||
|
||||
_, err := client.GetNextJob(context.Background())
|
||||
if err == nil {
|
||||
t.Error("expected error for bad request")
|
||||
}
|
||||
if attempts != 1 {
|
||||
t.Errorf("expected 1 attempt (no retry for client errors), got %d", attempts)
|
||||
}
|
||||
}
|
||||
|
||||
func TestContextCancellation(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
time.Sleep(2 * time.Second) // Long delay
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := &Config{
|
||||
APIEndpoint: server.URL,
|
||||
APIKey: "test-key",
|
||||
MaxRetries: 0,
|
||||
}
|
||||
client := NewAPIClient(cfg)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
||||
defer cancel()
|
||||
|
||||
_, err := client.GetNextJob(ctx)
|
||||
if err == nil {
|
||||
t.Error("expected context cancellation error")
|
||||
}
|
||||
}
|
||||
|
|
@ -61,18 +61,6 @@ type DBMatch struct {
|
|||
Winner *int `json:"winner"` // player index
|
||||
MapID string `json:"map_id"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
StartedAt *time.Time `json:"started_at"`
|
||||
CompletedAt *time.Time `json:"completed_at"`
|
||||
}
|
||||
|
||||
// DBMatch represents match metadata.
|
||||
type DBMatch struct {
|
||||
ID string `json:"id"`
|
||||
Status string `json:"status"`
|
||||
Winner *int `json:"winner"` // player index
|
||||
MapID string `json:"map_id"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
StartedAt *time.Time `json:"started_at"`
|
||||
CompletedAt *time.Time `json:"completed_at"`
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,8 +1,8 @@
|
|||
// acb-worker: Match execution worker for AI Code Battle
|
||||
//
|
||||
// This worker polls the Cloudflare Worker API for pending match jobs,
|
||||
// executes matches using the game engine, uploads replays to R2,
|
||||
// and submits results back to the API.
|
||||
// This worker polls PostgreSQL for pending match jobs,
|
||||
// executes matches using the game engine, uploads replays to B2,
|
||||
// writes results directly to PostgreSQL, and performs Glicko-2 rating updates.
|
||||
package main
|
||||
|
||||
import (
|
||||
|
|
@ -23,28 +23,28 @@ import (
|
|||
|
||||
// Config holds worker configuration.
|
||||
type Config struct {
|
||||
APIEndpoint string // Worker API endpoint (e.g., https://api.aicodebattle.com)
|
||||
APIKey string // Worker API key for authentication
|
||||
R2Endpoint string // R2 endpoint for replay uploads
|
||||
R2Bucket string // R2 bucket name
|
||||
R2AccessKey string // R2 access key ID
|
||||
R2SecretKey string // R2 secret access key
|
||||
WorkerID string // Unique worker identifier
|
||||
PollPeriod time.Duration // How often to poll for jobs
|
||||
Heartbeat time.Duration // How often to send heartbeat during match
|
||||
TurnTimeout time.Duration // Per-turn timeout for bots
|
||||
MaxRetries int // Max retries for transient errors
|
||||
Verbose bool // Enable verbose logging
|
||||
DatabaseURL string // PostgreSQL connection URL
|
||||
B2Endpoint string // B2 endpoint URL
|
||||
B2Bucket string // B2 bucket name
|
||||
B2AccessKey string // B2 access key ID
|
||||
B2SecretKey string // B2 secret access key
|
||||
B2Region string // B2 region (e.g., "us-west-004")
|
||||
WorkerID string // Unique worker identifier
|
||||
PollPeriod time.Duration // How often to poll for jobs
|
||||
Heartbeat time.Duration // How often to send heartbeat during match
|
||||
TurnTimeout time.Duration // Per-turn timeout for bots
|
||||
MaxRetries int // Max retries for transient errors
|
||||
Verbose bool // Enable verbose logging
|
||||
}
|
||||
|
||||
func main() {
|
||||
// Parse command-line flags
|
||||
apiEndpoint := flag.String("api", getEnv("ACB_API_ENDPOINT", "http://localhost:8787"), "Worker API endpoint")
|
||||
apiKey := flag.String("api-key", getEnv("ACB_API_KEY", ""), "Worker API key")
|
||||
r2Endpoint := flag.String("r2-endpoint", getEnv("ACB_R2_ENDPOINT", ""), "R2 endpoint URL")
|
||||
r2Bucket := flag.String("r2-bucket", getEnv("ACB_R2_BUCKET", "acb-data"), "R2 bucket name")
|
||||
r2AccessKey := flag.String("r2-access-key", getEnv("ACB_R2_ACCESS_KEY", ""), "R2 access key ID")
|
||||
r2SecretKey := flag.String("r2-secret-key", getEnv("ACB_R2_SECRET_KEY", ""), "R2 secret access key")
|
||||
databaseURL := flag.String("db", getEnv("ACB_DATABASE_URL", ""), "PostgreSQL connection URL")
|
||||
b2Endpoint := flag.String("b2-endpoint", getEnv("ACB_B2_ENDPOINT", ""), "B2 endpoint URL")
|
||||
b2Bucket := flag.String("b2-bucket", getEnv("ACB_B2_BUCKET", "acb-data"), "B2 bucket name")
|
||||
b2AccessKey := flag.String("b2-access-key", getEnv("ACB_B2_ACCESS_KEY", ""), "B2 access key ID")
|
||||
b2SecretKey := flag.String("b2-secret-key", getEnv("ACB_B2_SECRET_KEY", ""), "B2 secret access key")
|
||||
b2Region := flag.String("b2-region", getEnv("ACB_B2_REGION", "us-west-004"), "B2 region")
|
||||
workerID := flag.String("worker-id", getEnv("ACB_WORKER_ID", generateWorkerID()), "Unique worker identifier")
|
||||
pollPeriod := flag.Duration("poll", 5*time.Second, "Job polling period")
|
||||
heartbeat := flag.Duration("heartbeat", 30*time.Second, "Heartbeat interval during matches")
|
||||
|
|
@ -54,32 +54,36 @@ func main() {
|
|||
flag.Parse()
|
||||
|
||||
// Validate required config
|
||||
if *apiKey == "" {
|
||||
log.Fatal("API key is required (set ACB_API_KEY or use -api-key flag)")
|
||||
if *databaseURL == "" {
|
||||
log.Fatal("Database URL is required (set ACB_DATABASE_URL or use -db flag)")
|
||||
}
|
||||
|
||||
cfg := &Config{
|
||||
APIEndpoint: *apiEndpoint,
|
||||
APIKey: *apiKey,
|
||||
R2Endpoint: *r2Endpoint,
|
||||
R2Bucket: *r2Bucket,
|
||||
R2AccessKey: *r2AccessKey,
|
||||
R2SecretKey: *r2SecretKey,
|
||||
WorkerID: *workerID,
|
||||
PollPeriod: *pollPeriod,
|
||||
Heartbeat: *heartbeat,
|
||||
TurnTimeout: *turnTimeout,
|
||||
MaxRetries: *maxRetries,
|
||||
Verbose: *verbose,
|
||||
DatabaseURL: *databaseURL,
|
||||
B2Endpoint: *b2Endpoint,
|
||||
B2Bucket: *b2Bucket,
|
||||
B2AccessKey: *b2AccessKey,
|
||||
B2SecretKey: *b2SecretKey,
|
||||
B2Region: *b2Region,
|
||||
WorkerID: *workerID,
|
||||
PollPeriod: *pollPeriod,
|
||||
Heartbeat: *heartbeat,
|
||||
TurnTimeout: *turnTimeout,
|
||||
MaxRetries: *maxRetries,
|
||||
Verbose: *verbose,
|
||||
}
|
||||
|
||||
// Create API client
|
||||
apiClient := NewAPIClient(cfg)
|
||||
// Create database client
|
||||
dbClient, err := NewDBClient(cfg.DatabaseURL)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to connect to database: %v", err)
|
||||
}
|
||||
defer dbClient.Close()
|
||||
|
||||
// Create R2 client (optional - if not configured, replays won't be uploaded)
|
||||
var r2Client *R2Client
|
||||
if cfg.R2Endpoint != "" && cfg.R2AccessKey != "" && cfg.R2SecretKey != "" {
|
||||
r2Client = NewR2Client(cfg)
|
||||
// Create B2 client (optional - if not configured, replays won't be uploaded)
|
||||
var b2Client *B2Client
|
||||
if cfg.B2Endpoint != "" && cfg.B2AccessKey != "" && cfg.B2SecretKey != "" {
|
||||
b2Client = NewB2Client(cfg)
|
||||
}
|
||||
|
||||
// Create metrics
|
||||
|
|
@ -88,8 +92,8 @@ func main() {
|
|||
// Create worker
|
||||
worker := &Worker{
|
||||
cfg: cfg,
|
||||
api: apiClient,
|
||||
r2: r2Client,
|
||||
db: dbClient,
|
||||
b2: b2Client,
|
||||
metrics: metrics,
|
||||
logger: log.New(os.Stdout, fmt.Sprintf("[worker-%s] ", cfg.WorkerID), log.LstdFlags),
|
||||
rng: rand.New(rand.NewSource(time.Now().UnixNano())),
|
||||
|
|
@ -145,8 +149,8 @@ func generateWorkerID() string {
|
|||
// Worker executes match jobs.
|
||||
type Worker struct {
|
||||
cfg *Config
|
||||
api *APIClient
|
||||
r2 *R2Client
|
||||
db *DBClient
|
||||
b2 *B2Client
|
||||
metrics *Metrics
|
||||
logger *log.Logger
|
||||
rng *rand.Rand
|
||||
|
|
@ -178,7 +182,7 @@ func (w *Worker) pollAndExecute(ctx context.Context) error {
|
|||
w.metrics.RecordPollCycle()
|
||||
|
||||
// Get next pending job
|
||||
job, err := w.api.GetNextJob(ctx)
|
||||
job, err := w.db.GetNextJob(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get next job: %w", err)
|
||||
}
|
||||
|
|
@ -192,8 +196,8 @@ func (w *Worker) pollAndExecute(ctx context.Context) error {
|
|||
|
||||
w.logger.Printf("Found job %s for match %s", job.ID, job.MatchID)
|
||||
|
||||
// Claim the job
|
||||
claimResp, err := w.api.ClaimJob(ctx, job.ID, w.cfg.WorkerID)
|
||||
// Claim the job and get match data
|
||||
claimData, err := w.db.ClaimJob(ctx, job.ID, w.cfg.WorkerID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to claim job %s: %w", job.ID, err)
|
||||
}
|
||||
|
|
@ -203,12 +207,12 @@ func (w *Worker) pollAndExecute(ctx context.Context) error {
|
|||
|
||||
// Execute the match
|
||||
matchStart := time.Now()
|
||||
result, replay, err := w.executeMatch(ctx, claimResp)
|
||||
result, replay, err := w.executeMatch(ctx, claimData)
|
||||
if err != nil {
|
||||
w.metrics.RecordMatchError()
|
||||
w.logger.Printf("Match execution failed: %v", err)
|
||||
// Mark job as failed
|
||||
if failErr := w.api.FailJob(ctx, job.ID, w.cfg.WorkerID, err.Error()); failErr != nil {
|
||||
if failErr := w.db.FailJob(ctx, job.ID, w.cfg.WorkerID, err.Error()); failErr != nil {
|
||||
w.metrics.RecordJobFailed()
|
||||
w.logger.Printf("Failed to mark job as failed: %v", failErr)
|
||||
}
|
||||
|
|
@ -216,12 +220,12 @@ func (w *Worker) pollAndExecute(ctx context.Context) error {
|
|||
}
|
||||
w.metrics.RecordMatch(time.Since(matchStart))
|
||||
|
||||
// Upload replay to R2
|
||||
// Upload replay to B2
|
||||
replayURL := ""
|
||||
if w.r2 != nil {
|
||||
if w.b2 != nil {
|
||||
uploadStart := time.Now()
|
||||
replayData, _ := json.Marshal(replay)
|
||||
replayURL, err = w.uploadReplay(ctx, claimResp.Match.ID, replay)
|
||||
replayURL, err = w.uploadReplay(ctx, claimData.Match.ID, replay)
|
||||
if err != nil {
|
||||
w.metrics.RecordReplayUploadError()
|
||||
w.logger.Printf("Failed to upload replay: %v", err)
|
||||
|
|
@ -232,8 +236,12 @@ func (w *Worker) pollAndExecute(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
// Submit result
|
||||
err = w.api.SubmitResult(ctx, job.ID, result, replayURL)
|
||||
// Compute Glicko-2 rating updates
|
||||
ratingUpdates := w.computeRatingUpdates(claimData, result)
|
||||
w.logger.Printf("Computed %d rating updates", len(ratingUpdates))
|
||||
|
||||
// Submit result directly to PostgreSQL
|
||||
err = w.db.SubmitMatchResult(ctx, job.ID, result, replayURL, ratingUpdates)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to submit result for job %s: %w", job.ID, err)
|
||||
}
|
||||
|
|
@ -243,16 +251,16 @@ func (w *Worker) pollAndExecute(ctx context.Context) error {
|
|||
}
|
||||
|
||||
// executeMatch runs a match and returns the result and replay.
|
||||
func (w *Worker) executeMatch(ctx context.Context, claim *JobClaimResponse) (*MatchResult, *engine.Replay, error) {
|
||||
func (w *Worker) executeMatch(ctx context.Context, claimData *JobClaimData) (*MatchResult, *engine.Replay, error) {
|
||||
// Build game config from map data
|
||||
config := engine.Config{
|
||||
Rows: claim.Map.Width,
|
||||
Cols: claim.Map.Height,
|
||||
MaxTurns: 500, // Default max turns
|
||||
VisionRadius2: 49, // Default vision
|
||||
AttackRadius2: 5, // Default attack
|
||||
SpawnCost: 3, // Default spawn cost
|
||||
EnergyInterval: 10, // Default energy interval
|
||||
Rows: claimData.Map.Width,
|
||||
Cols: claimData.Map.Height,
|
||||
MaxTurns: 500, // Default max turns
|
||||
VisionRadius2: 49, // Default vision
|
||||
AttackRadius2: 5, // Default attack
|
||||
SpawnCost: 3, // Default spawn cost
|
||||
EnergyInterval: 10, // Default energy interval
|
||||
}
|
||||
|
||||
// Create match runner
|
||||
|
|
@ -262,49 +270,45 @@ func (w *Worker) executeMatch(ctx context.Context, claim *JobClaimResponse) (*Ma
|
|||
engine.WithTimeout(w.cfg.TurnTimeout),
|
||||
)
|
||||
|
||||
// Add bots from claim response
|
||||
for _, participant := range claim.Participants {
|
||||
// Find bot endpoint
|
||||
var endpointURL string
|
||||
for _, bot := range claim.Bots {
|
||||
if bot.ID == participant.BotID {
|
||||
endpointURL = bot.EndpointURL
|
||||
break
|
||||
}
|
||||
}
|
||||
// Build bot ID to info lookup
|
||||
botInfoMap := make(map[string]DBBotInfo)
|
||||
for _, bot := range claimData.Bots {
|
||||
botInfoMap[bot.ID] = bot
|
||||
}
|
||||
|
||||
// Find bot secret
|
||||
var secret string
|
||||
for _, s := range claim.BotSecrets {
|
||||
if s.BotID == participant.BotID {
|
||||
secret = s.Secret
|
||||
break
|
||||
}
|
||||
}
|
||||
// Add bots from claim data (in player slot order)
|
||||
participantMap := make(map[int]DBParticipant)
|
||||
for _, p := range claimData.Participants {
|
||||
participantMap[p.PlayerSlot] = p
|
||||
}
|
||||
|
||||
for slot := 0; slot < len(claimData.Participants); slot++ {
|
||||
p := participantMap[slot]
|
||||
botInfo := botInfoMap[p.BotID]
|
||||
|
||||
// Create auth config for HTTP bot
|
||||
auth := engine.AuthConfig{
|
||||
BotID: participant.BotID,
|
||||
Secret: secret,
|
||||
MatchID: claim.Match.ID,
|
||||
BotID: p.BotID,
|
||||
Secret: botInfo.Secret,
|
||||
MatchID: claimData.Match.ID,
|
||||
}
|
||||
|
||||
// Create HTTP bot client
|
||||
httpBot := engine.NewHTTPBot(
|
||||
endpointURL,
|
||||
botInfo.EndpointURL,
|
||||
auth,
|
||||
engine.WithHTTPTimeout(w.cfg.TurnTimeout),
|
||||
)
|
||||
|
||||
runner.AddBot(httpBot, participant.BotID)
|
||||
w.logger.Printf("Added bot %s at %s (player %d)", participant.BotID, endpointURL, participant.PlayerIndex)
|
||||
runner.AddBot(httpBot, p.BotID)
|
||||
w.logger.Printf("Added bot %s at %s (player %d)", p.BotID, botInfo.EndpointURL, p.PlayerSlot)
|
||||
}
|
||||
|
||||
// Start heartbeat goroutine
|
||||
heartbeatCtx, heartbeatCancel := context.WithCancel(ctx)
|
||||
defer heartbeatCancel()
|
||||
|
||||
go w.sendHeartbeats(heartbeatCtx, claim.Job.ID)
|
||||
go w.sendHeartbeats(heartbeatCtx, claimData.Job.ID)
|
||||
|
||||
// Run the match
|
||||
engineResult, replay, err := runner.Run()
|
||||
|
|
@ -321,9 +325,9 @@ func (w *Worker) executeMatch(ctx context.Context, claim *JobClaimResponse) (*Ma
|
|||
}
|
||||
|
||||
// Set winner ID from result (Winner is int, -1 for draw)
|
||||
if engineResult.Winner >= 0 && engineResult.Winner < len(claim.Participants) {
|
||||
for _, p := range claim.Participants {
|
||||
if p.PlayerIndex == engineResult.Winner {
|
||||
if engineResult.Winner >= 0 && engineResult.Winner < len(claimData.Participants) {
|
||||
for _, p := range claimData.Participants {
|
||||
if p.PlayerSlot == engineResult.Winner {
|
||||
result.WinnerID = p.BotID
|
||||
break
|
||||
}
|
||||
|
|
@ -331,9 +335,9 @@ func (w *Worker) executeMatch(ctx context.Context, claim *JobClaimResponse) (*Ma
|
|||
}
|
||||
|
||||
// Calculate scores from replay
|
||||
for i, p := range claim.Participants {
|
||||
if i < len(engineResult.Scores) {
|
||||
result.Scores[p.BotID] = engineResult.Scores[i]
|
||||
for _, p := range claimData.Participants {
|
||||
if p.PlayerSlot < len(engineResult.Scores) {
|
||||
result.Scores[p.BotID] = engineResult.Scores[p.PlayerSlot]
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -350,7 +354,7 @@ func (w *Worker) sendHeartbeats(ctx context.Context, jobID string) {
|
|||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := w.api.Heartbeat(ctx, jobID, w.cfg.WorkerID); err != nil {
|
||||
if err := w.db.Heartbeat(ctx, jobID, w.cfg.WorkerID); err != nil {
|
||||
w.metrics.RecordHeartbeatError()
|
||||
w.logger.Printf("Heartbeat failed: %v", err)
|
||||
} else {
|
||||
|
|
@ -360,10 +364,10 @@ func (w *Worker) sendHeartbeats(ctx context.Context, jobID string) {
|
|||
}
|
||||
}
|
||||
|
||||
// uploadReplay uploads the replay to R2 and returns the URL.
|
||||
// uploadReplay uploads the replay to B2 and returns the URL.
|
||||
func (w *Worker) uploadReplay(ctx context.Context, matchID string, replay *engine.Replay) (string, error) {
|
||||
if w.r2 == nil {
|
||||
return "", fmt.Errorf("R2 client not configured")
|
||||
if w.b2 == nil {
|
||||
return "", fmt.Errorf("B2 client not configured")
|
||||
}
|
||||
|
||||
// Serialize replay to JSON
|
||||
|
|
@ -372,19 +376,37 @@ func (w *Worker) uploadReplay(ctx context.Context, matchID string, replay *engin
|
|||
return "", fmt.Errorf("failed to serialize replay: %w", err)
|
||||
}
|
||||
|
||||
// Upload to R2
|
||||
// Upload to B2
|
||||
key := fmt.Sprintf("replays/%s.json", matchID)
|
||||
if err := w.r2.Upload(ctx, key, data, "application/json"); err != nil {
|
||||
return "", fmt.Errorf("failed to upload replay to R2: %w", err)
|
||||
if err := w.b2.Upload(ctx, key, data, "application/json"); err != nil {
|
||||
return "", fmt.Errorf("failed to upload replay to B2: %w", err)
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%s/%s", w.r2.Endpoint(), key), nil
|
||||
return fmt.Sprintf("%s/%s", w.b2.Endpoint(), key), nil
|
||||
}
|
||||
|
||||
// MatchResult represents the result of a match for API submission.
|
||||
type MatchResult struct {
|
||||
WinnerID string `json:"winner_id"`
|
||||
Turns int `json:"turns"`
|
||||
EndReason string `json:"end_reason"`
|
||||
Scores map[string]int `json:"scores"`
|
||||
// computeRatingUpdates computes Glicko-2 rating updates for match participants.
|
||||
func (w *Worker) computeRatingUpdates(claimData *JobClaimData, result *MatchResult) []RatingUpdate {
|
||||
if len(claimData.Participants) < 2 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Extract bot IDs and current ratings
|
||||
botIDs := make([]string, len(claimData.Participants))
|
||||
ratings := make([]Glicko2Rating, len(claimData.Participants))
|
||||
scores := make([]float64, len(claimData.Participants))
|
||||
|
||||
for i, p := range claimData.Participants {
|
||||
botIDs[i] = p.BotID
|
||||
ratings[i] = Glicko2Rating{
|
||||
Mu: p.RatingMuBefore,
|
||||
Phi: p.RatingPhiBefore,
|
||||
Sigma: p.RatingSigmaBefore,
|
||||
}
|
||||
// Normalize scores for Glicko-2 (higher is better)
|
||||
scores[i] = float64(result.Scores[p.BotID])
|
||||
}
|
||||
|
||||
// Compute rating updates
|
||||
return ComputeRatingUpdates(botIDs, ratings, scores)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue