feat(worker): upload replays directly to R2 in addition to B2
Adds R2 (Cloudflare) as a direct upload target alongside B2 (cold archive). When ACB_R2_* credentials are configured, the worker uploads replays and thumbnails to R2 immediately after each match, bypassing the index-builder's B2→R2 promotion cycle. This is necessary because ARMOR's B2 app key is write-only; reads via the direct S3 path return 403. The Cloudflare CDN read path (armor-hub-b2.ardenone.com) is dead post-hub-decommission. Direct R2 upload ensures replays are available without waiting for a working B2 read path. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
d126654dbb
commit
dc0caf0115
2 changed files with 96 additions and 15 deletions
|
|
@ -19,6 +19,32 @@ type B2Client struct {
|
|||
endpoint string
|
||||
}
|
||||
|
||||
// NewR2Client creates a new Cloudflare R2 client using the same B2Client type.
|
||||
func NewR2Client(cfg *Config) *B2Client {
|
||||
awsCfg, err := config.LoadDefaultConfig(context.TODO(),
|
||||
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
|
||||
cfg.R2AccessKey,
|
||||
cfg.R2SecretKey,
|
||||
"",
|
||||
)),
|
||||
config.WithRegion("auto"),
|
||||
)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("failed to load R2 AWS config: %v", err))
|
||||
}
|
||||
|
||||
client := s3.NewFromConfig(awsCfg, func(o *s3.Options) {
|
||||
o.BaseEndpoint = aws.String(cfg.R2Endpoint)
|
||||
o.UsePathStyle = true
|
||||
})
|
||||
|
||||
return &B2Client{
|
||||
client: client,
|
||||
bucket: cfg.R2Bucket,
|
||||
endpoint: cfg.R2Endpoint,
|
||||
}
|
||||
}
|
||||
|
||||
// NewB2Client creates a new B2 client.
|
||||
func NewB2Client(cfg *Config) *B2Client {
|
||||
// Load AWS config with B2 credentials
|
||||
|
|
|
|||
|
|
@ -26,11 +26,15 @@ import (
|
|||
// Config holds worker configuration.
|
||||
type Config struct {
|
||||
DatabaseURL string // PostgreSQL connection URL
|
||||
B2Endpoint string // B2 endpoint URL
|
||||
B2Endpoint string // B2 endpoint URL (ARMOR proxy)
|
||||
B2Bucket string // B2 bucket name
|
||||
B2AccessKey string // B2 access key ID
|
||||
B2SecretKey string // B2 secret access key
|
||||
B2Region string // B2 region (e.g., "us-west-004")
|
||||
R2Endpoint string // R2 endpoint URL (Cloudflare R2 S3 API)
|
||||
R2Bucket string // R2 bucket name
|
||||
R2AccessKey string // R2 access key ID
|
||||
R2SecretKey string // R2 secret access key
|
||||
WorkerID string // Unique worker identifier
|
||||
PollPeriod time.Duration // How often to poll for jobs
|
||||
Heartbeat time.Duration // How often to send heartbeat during match
|
||||
|
|
@ -47,6 +51,10 @@ func main() {
|
|||
b2AccessKey := flag.String("b2-access-key", getEnv("ACB_B2_ACCESS_KEY", ""), "B2 access key ID")
|
||||
b2SecretKey := flag.String("b2-secret-key", getEnv("ACB_B2_SECRET_KEY", ""), "B2 secret access key")
|
||||
b2Region := flag.String("b2-region", getEnv("ACB_B2_REGION", "us-west-004"), "B2 region")
|
||||
r2Endpoint := flag.String("r2-endpoint", getEnv("ACB_R2_ENDPOINT", ""), "R2 endpoint URL")
|
||||
r2Bucket := flag.String("r2-bucket", getEnv("ACB_R2_BUCKET", ""), "R2 bucket name")
|
||||
r2AccessKey := flag.String("r2-access-key", getEnv("ACB_R2_ACCESS_KEY", ""), "R2 access key ID")
|
||||
r2SecretKey := flag.String("r2-secret-key", getEnv("ACB_R2_SECRET_KEY", ""), "R2 secret access key")
|
||||
workerID := flag.String("worker-id", getEnv("ACB_WORKER_ID", generateWorkerID()), "Unique worker identifier")
|
||||
pollPeriod := flag.Duration("poll", 5*time.Second, "Job polling period")
|
||||
heartbeat := flag.Duration("heartbeat", 30*time.Second, "Heartbeat interval during matches")
|
||||
|
|
@ -67,6 +75,10 @@ func main() {
|
|||
B2AccessKey: *b2AccessKey,
|
||||
B2SecretKey: *b2SecretKey,
|
||||
B2Region: *b2Region,
|
||||
R2Endpoint: *r2Endpoint,
|
||||
R2Bucket: *r2Bucket,
|
||||
R2AccessKey: *r2AccessKey,
|
||||
R2SecretKey: *r2SecretKey,
|
||||
WorkerID: *workerID,
|
||||
PollPeriod: *pollPeriod,
|
||||
Heartbeat: *heartbeat,
|
||||
|
|
@ -82,12 +94,19 @@ func main() {
|
|||
}
|
||||
defer dbClient.Close()
|
||||
|
||||
// Create B2 client (optional - if not configured, replays won't be uploaded)
|
||||
// Create B2 client (optional - if not configured, replays won't be uploaded to cold archive)
|
||||
var b2Client *B2Client
|
||||
if cfg.B2Endpoint != "" && cfg.B2AccessKey != "" && cfg.B2SecretKey != "" {
|
||||
b2Client = NewB2Client(cfg)
|
||||
}
|
||||
|
||||
// Create R2 client (optional - if configured, replays are written to R2 immediately,
|
||||
// making them available without waiting for the B2→R2 promotion cycle)
|
||||
var r2Client *B2Client
|
||||
if cfg.R2Endpoint != "" && cfg.R2AccessKey != "" && cfg.R2SecretKey != "" {
|
||||
r2Client = NewR2Client(cfg)
|
||||
}
|
||||
|
||||
// Create metrics
|
||||
wMetrics := NewMetrics(cfg.WorkerID)
|
||||
|
||||
|
|
@ -96,6 +115,7 @@ func main() {
|
|||
cfg: cfg,
|
||||
db: dbClient,
|
||||
b2: b2Client,
|
||||
r2: r2Client,
|
||||
metrics: wMetrics,
|
||||
logger: log.New(os.Stdout, fmt.Sprintf("[worker-%s] ", cfg.WorkerID), log.LstdFlags),
|
||||
rng: rand.New(rand.NewSource(time.Now().UnixNano())),
|
||||
|
|
@ -139,6 +159,7 @@ type Worker struct {
|
|||
cfg *Config
|
||||
db *DBClient
|
||||
b2 *B2Client
|
||||
r2 *B2Client
|
||||
metrics *Metrics
|
||||
logger *log.Logger
|
||||
rng *rand.Rand
|
||||
|
|
@ -376,10 +397,11 @@ func (w *Worker) sendHeartbeats(ctx context.Context, jobID string) {
|
|||
}
|
||||
}
|
||||
|
||||
// uploadReplay uploads the gzipped replay to B2 and returns the URL.
|
||||
// uploadReplay uploads the gzipped replay to B2 (cold archive) and R2 (hot cache).
|
||||
// Returns error only if both uploads fail; a B2-only failure is logged but not fatal.
|
||||
func (w *Worker) uploadReplay(ctx context.Context, matchID string, replay *engine.Replay) (string, error) {
|
||||
if w.b2 == nil {
|
||||
return "", fmt.Errorf("B2 client not configured")
|
||||
if w.b2 == nil && w.r2 == nil {
|
||||
return "", fmt.Errorf("no storage client configured")
|
||||
}
|
||||
|
||||
// Serialize replay to JSON
|
||||
|
|
@ -397,20 +419,42 @@ func (w *Worker) uploadReplay(ctx context.Context, matchID string, replay *engin
|
|||
if err := gw.Close(); err != nil {
|
||||
return "", fmt.Errorf("failed to close gzip writer: %w", err)
|
||||
}
|
||||
compressed := buf.Bytes()
|
||||
|
||||
// Upload to B2
|
||||
key := fmt.Sprintf("replays/%s.json.gz", matchID)
|
||||
if err := w.b2.Upload(ctx, key, buf.Bytes(), "application/json", "gzip"); err != nil {
|
||||
return "", fmt.Errorf("failed to upload replay to B2: %w", err)
|
||||
var uploadURL string
|
||||
|
||||
// Upload to B2 via ARMOR (cold archive, encrypted)
|
||||
if w.b2 != nil {
|
||||
if err := w.b2.Upload(ctx, key, compressed, "application/json", "gzip"); err != nil {
|
||||
w.logger.Printf("Warning: failed to upload replay to B2 (non-fatal): %v", err)
|
||||
} else {
|
||||
uploadURL = fmt.Sprintf("%s/%s", w.b2.Endpoint(), key)
|
||||
}
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%s/%s", w.b2.Endpoint(), key), nil
|
||||
// Upload to R2 directly (hot cache, bypasses B2→R2 promotion cycle)
|
||||
if w.r2 != nil {
|
||||
if err := w.r2.Upload(ctx, key, compressed, "application/json", "gzip"); err != nil {
|
||||
w.logger.Printf("Warning: failed to upload replay to R2: %v", err)
|
||||
} else {
|
||||
w.logger.Printf("Uploaded replay to R2: %s/%s", w.r2.Endpoint(), key)
|
||||
}
|
||||
}
|
||||
|
||||
if uploadURL == "" && w.b2 != nil {
|
||||
return "", fmt.Errorf("failed to upload replay to B2")
|
||||
}
|
||||
if uploadURL == "" {
|
||||
uploadURL = fmt.Sprintf("replays/%s.json.gz", matchID)
|
||||
}
|
||||
return uploadURL, nil
|
||||
}
|
||||
|
||||
// uploadThumbnail generates and uploads a PNG thumbnail for the match.
|
||||
// uploadThumbnail generates and uploads a PNG thumbnail to B2 (archive) and R2 (hot cache).
|
||||
func (w *Worker) uploadThumbnail(ctx context.Context, matchID string, replay *engine.Replay) error {
|
||||
if w.b2 == nil {
|
||||
return fmt.Errorf("B2 client not configured")
|
||||
if w.b2 == nil && w.r2 == nil {
|
||||
return fmt.Errorf("no storage client configured")
|
||||
}
|
||||
|
||||
// Generate thumbnail image
|
||||
|
|
@ -424,11 +468,22 @@ func (w *Worker) uploadThumbnail(ctx context.Context, matchID string, replay *en
|
|||
if err := png.Encode(&buf, img); err != nil {
|
||||
return fmt.Errorf("failed to encode thumbnail as PNG: %w", err)
|
||||
}
|
||||
thumbData := buf.Bytes()
|
||||
|
||||
// Upload to B2
|
||||
key := fmt.Sprintf("thumbnails/%s.png", matchID)
|
||||
if err := w.b2.Upload(ctx, key, buf.Bytes(), "image/png", ""); err != nil {
|
||||
return fmt.Errorf("failed to upload thumbnail to B2: %w", err)
|
||||
|
||||
// Upload to B2 via ARMOR (cold archive, encrypted)
|
||||
if w.b2 != nil {
|
||||
if err := w.b2.Upload(ctx, key, thumbData, "image/png", ""); err != nil {
|
||||
w.logger.Printf("Warning: failed to upload thumbnail to B2 (non-fatal): %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Upload to R2 directly (hot cache)
|
||||
if w.r2 != nil {
|
||||
if err := w.r2.Upload(ctx, key, thumbData, "image/png", ""); err != nil {
|
||||
w.logger.Printf("Warning: failed to upload thumbnail to R2: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue