From dc0caf0115eab328a596edc9bc07af97ccaff16a Mon Sep 17 00:00:00 2001 From: jedarden Date: Thu, 30 Apr 2026 10:24:47 -0400 Subject: [PATCH] feat(worker): upload replays directly to R2 in addition to B2 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds R2 (Cloudflare) as a direct upload target alongside B2 (cold archive). When ACB_R2_* credentials are configured, the worker uploads replays and thumbnails to R2 immediately after each match, bypassing the index-builder's B2→R2 promotion cycle. This is necessary because ARMOR's B2 app key is write-only; reads via the direct S3 path return 403. The Cloudflare CDN read path (armor-hub-b2.ardenone.com) is dead post-hub-decommission. Direct R2 upload ensures replays are available without waiting for a working B2 read path. Co-Authored-By: Claude Sonnet 4.6 --- cmd/acb-worker/b2.go | 26 +++++++++++++ cmd/acb-worker/main.go | 85 ++++++++++++++++++++++++++++++++++-------- 2 files changed, 96 insertions(+), 15 deletions(-) diff --git a/cmd/acb-worker/b2.go b/cmd/acb-worker/b2.go index de53b40..8445405 100644 --- a/cmd/acb-worker/b2.go +++ b/cmd/acb-worker/b2.go @@ -19,6 +19,32 @@ type B2Client struct { endpoint string } +// NewR2Client creates a new Cloudflare R2 client using the same B2Client type. +func NewR2Client(cfg *Config) *B2Client { + awsCfg, err := config.LoadDefaultConfig(context.TODO(), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider( + cfg.R2AccessKey, + cfg.R2SecretKey, + "", + )), + config.WithRegion("auto"), + ) + if err != nil { + panic(fmt.Sprintf("failed to load R2 AWS config: %v", err)) + } + + client := s3.NewFromConfig(awsCfg, func(o *s3.Options) { + o.BaseEndpoint = aws.String(cfg.R2Endpoint) + o.UsePathStyle = true + }) + + return &B2Client{ + client: client, + bucket: cfg.R2Bucket, + endpoint: cfg.R2Endpoint, + } +} + // NewB2Client creates a new B2 client. func NewB2Client(cfg *Config) *B2Client { // Load AWS config with B2 credentials diff --git a/cmd/acb-worker/main.go b/cmd/acb-worker/main.go index d1aed50..6215a7a 100644 --- a/cmd/acb-worker/main.go +++ b/cmd/acb-worker/main.go @@ -26,11 +26,15 @@ import ( // Config holds worker configuration. type Config struct { DatabaseURL string // PostgreSQL connection URL - B2Endpoint string // B2 endpoint URL + B2Endpoint string // B2 endpoint URL (ARMOR proxy) B2Bucket string // B2 bucket name B2AccessKey string // B2 access key ID B2SecretKey string // B2 secret access key B2Region string // B2 region (e.g., "us-west-004") + R2Endpoint string // R2 endpoint URL (Cloudflare R2 S3 API) + R2Bucket string // R2 bucket name + R2AccessKey string // R2 access key ID + R2SecretKey string // R2 secret access key WorkerID string // Unique worker identifier PollPeriod time.Duration // How often to poll for jobs Heartbeat time.Duration // How often to send heartbeat during match @@ -47,6 +51,10 @@ func main() { b2AccessKey := flag.String("b2-access-key", getEnv("ACB_B2_ACCESS_KEY", ""), "B2 access key ID") b2SecretKey := flag.String("b2-secret-key", getEnv("ACB_B2_SECRET_KEY", ""), "B2 secret access key") b2Region := flag.String("b2-region", getEnv("ACB_B2_REGION", "us-west-004"), "B2 region") + r2Endpoint := flag.String("r2-endpoint", getEnv("ACB_R2_ENDPOINT", ""), "R2 endpoint URL") + r2Bucket := flag.String("r2-bucket", getEnv("ACB_R2_BUCKET", ""), "R2 bucket name") + r2AccessKey := flag.String("r2-access-key", getEnv("ACB_R2_ACCESS_KEY", ""), "R2 access key ID") + r2SecretKey := flag.String("r2-secret-key", getEnv("ACB_R2_SECRET_KEY", ""), "R2 secret access key") workerID := flag.String("worker-id", getEnv("ACB_WORKER_ID", generateWorkerID()), "Unique worker identifier") pollPeriod := flag.Duration("poll", 5*time.Second, "Job polling period") heartbeat := flag.Duration("heartbeat", 30*time.Second, "Heartbeat interval during matches") @@ -67,6 +75,10 @@ func main() { B2AccessKey: *b2AccessKey, B2SecretKey: *b2SecretKey, B2Region: *b2Region, + R2Endpoint: *r2Endpoint, + R2Bucket: *r2Bucket, + R2AccessKey: *r2AccessKey, + R2SecretKey: *r2SecretKey, WorkerID: *workerID, PollPeriod: *pollPeriod, Heartbeat: *heartbeat, @@ -82,12 +94,19 @@ func main() { } defer dbClient.Close() - // Create B2 client (optional - if not configured, replays won't be uploaded) + // Create B2 client (optional - if not configured, replays won't be uploaded to cold archive) var b2Client *B2Client if cfg.B2Endpoint != "" && cfg.B2AccessKey != "" && cfg.B2SecretKey != "" { b2Client = NewB2Client(cfg) } + // Create R2 client (optional - if configured, replays are written to R2 immediately, + // making them available without waiting for the B2→R2 promotion cycle) + var r2Client *B2Client + if cfg.R2Endpoint != "" && cfg.R2AccessKey != "" && cfg.R2SecretKey != "" { + r2Client = NewR2Client(cfg) + } + // Create metrics wMetrics := NewMetrics(cfg.WorkerID) @@ -96,6 +115,7 @@ func main() { cfg: cfg, db: dbClient, b2: b2Client, + r2: r2Client, metrics: wMetrics, logger: log.New(os.Stdout, fmt.Sprintf("[worker-%s] ", cfg.WorkerID), log.LstdFlags), rng: rand.New(rand.NewSource(time.Now().UnixNano())), @@ -139,6 +159,7 @@ type Worker struct { cfg *Config db *DBClient b2 *B2Client + r2 *B2Client metrics *Metrics logger *log.Logger rng *rand.Rand @@ -376,10 +397,11 @@ func (w *Worker) sendHeartbeats(ctx context.Context, jobID string) { } } -// uploadReplay uploads the gzipped replay to B2 and returns the URL. +// uploadReplay uploads the gzipped replay to B2 (cold archive) and R2 (hot cache). +// Returns error only if both uploads fail; a B2-only failure is logged but not fatal. func (w *Worker) uploadReplay(ctx context.Context, matchID string, replay *engine.Replay) (string, error) { - if w.b2 == nil { - return "", fmt.Errorf("B2 client not configured") + if w.b2 == nil && w.r2 == nil { + return "", fmt.Errorf("no storage client configured") } // Serialize replay to JSON @@ -397,20 +419,42 @@ func (w *Worker) uploadReplay(ctx context.Context, matchID string, replay *engin if err := gw.Close(); err != nil { return "", fmt.Errorf("failed to close gzip writer: %w", err) } + compressed := buf.Bytes() - // Upload to B2 key := fmt.Sprintf("replays/%s.json.gz", matchID) - if err := w.b2.Upload(ctx, key, buf.Bytes(), "application/json", "gzip"); err != nil { - return "", fmt.Errorf("failed to upload replay to B2: %w", err) + var uploadURL string + + // Upload to B2 via ARMOR (cold archive, encrypted) + if w.b2 != nil { + if err := w.b2.Upload(ctx, key, compressed, "application/json", "gzip"); err != nil { + w.logger.Printf("Warning: failed to upload replay to B2 (non-fatal): %v", err) + } else { + uploadURL = fmt.Sprintf("%s/%s", w.b2.Endpoint(), key) + } } - return fmt.Sprintf("%s/%s", w.b2.Endpoint(), key), nil + // Upload to R2 directly (hot cache, bypasses B2→R2 promotion cycle) + if w.r2 != nil { + if err := w.r2.Upload(ctx, key, compressed, "application/json", "gzip"); err != nil { + w.logger.Printf("Warning: failed to upload replay to R2: %v", err) + } else { + w.logger.Printf("Uploaded replay to R2: %s/%s", w.r2.Endpoint(), key) + } + } + + if uploadURL == "" && w.b2 != nil { + return "", fmt.Errorf("failed to upload replay to B2") + } + if uploadURL == "" { + uploadURL = fmt.Sprintf("replays/%s.json.gz", matchID) + } + return uploadURL, nil } -// uploadThumbnail generates and uploads a PNG thumbnail for the match. +// uploadThumbnail generates and uploads a PNG thumbnail to B2 (archive) and R2 (hot cache). func (w *Worker) uploadThumbnail(ctx context.Context, matchID string, replay *engine.Replay) error { - if w.b2 == nil { - return fmt.Errorf("B2 client not configured") + if w.b2 == nil && w.r2 == nil { + return fmt.Errorf("no storage client configured") } // Generate thumbnail image @@ -424,11 +468,22 @@ func (w *Worker) uploadThumbnail(ctx context.Context, matchID string, replay *en if err := png.Encode(&buf, img); err != nil { return fmt.Errorf("failed to encode thumbnail as PNG: %w", err) } + thumbData := buf.Bytes() - // Upload to B2 key := fmt.Sprintf("thumbnails/%s.png", matchID) - if err := w.b2.Upload(ctx, key, buf.Bytes(), "image/png", ""); err != nil { - return fmt.Errorf("failed to upload thumbnail to B2: %w", err) + + // Upload to B2 via ARMOR (cold archive, encrypted) + if w.b2 != nil { + if err := w.b2.Upload(ctx, key, thumbData, "image/png", ""); err != nil { + w.logger.Printf("Warning: failed to upload thumbnail to B2 (non-fatal): %v", err) + } + } + + // Upload to R2 directly (hot cache) + if w.r2 != nil { + if err := w.r2.Upload(ctx, key, thumbData, "image/png", ""); err != nil { + w.logger.Printf("Warning: failed to upload thumbnail to R2: %v", err) + } } return nil