ai-code-battle/cmd/acb-index-builder/s3.go
jedarden 7e9d1af69c fix(db): reduce query LIMITs and fix O(n²) complexity to prevent OOMKill
- Reduce fetchBots LIMIT from 10000 to 2000
- Reduce fetchRatingHistory LIMIT from 10000 to 5000
- Reduce fetchFeedback LIMIT from 5000 to 1000
- Fix O(n²) participant name lookup in generateBotProfiles by using botNameMap
- Add panic recovery in runBuildCycle to log panics via slog before crashing
- Add R2/B2 client helper functions in s3.go

This fixes acb-index-builder CrashLoopBackOff caused by OOMKill after
web asset copy. The pod was silently crashing during fetchAllData()
due to unbounded query results consuming all memory.

Co-Authored-By: Claude <noreply@anthropic.com>
2026-06-25 06:43:50 -04:00

203 lines
5.4 KiB
Go

package main
import (
"context"
"errors"
"fmt"
"io"
"sort"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
)
// S3Client wraps S3 API operations for R2 and B2.
//
// Each S3Client is bound to a single bucket: every method in this file
// passes c.bucket as the target bucket of its request.
type S3Client struct {
	client *s3.Client // SDK client; NewS3Client points it at a custom endpoint with path-style URLs
	bucket string     // bucket all operations on this client target
}
// NewS3Client creates an S3Client for an S3-compatible service reachable at
// endpoint, authenticating with the given static access/secret key pair and
// operating on bucket.
//
// Path-style URLs (endpoint/bucket/key) are forced for R2/B2 compatibility.
// The signing region is fixed to "us-east-1".
// NOTE(review): confirm each provider accepts that region for its endpoint.
//
// It returns an error if endpoint or bucket is empty, or if the SDK
// configuration cannot be loaded.
func NewS3Client(endpoint, accessKey, secretKey, bucket string) (*S3Client, error) {
	// An empty BaseEndpoint or Bucket is accepted by the SDK here but makes
	// every later request fail with a less obvious error; reject it up front.
	if endpoint == "" {
		return nil, errors.New("s3 endpoint is required")
	}
	if bucket == "" {
		return nil, errors.New("s3 bucket is required")
	}

	// This constructor takes no ctx parameter, hence context.TODO.
	cfg, err := config.LoadDefaultConfig(context.TODO(),
		config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
			accessKey, secretKey, "",
		)),
		config.WithRegion("us-east-1"),
	)
	if err != nil {
		return nil, fmt.Errorf("load AWS config: %w", err)
	}

	client := s3.NewFromConfig(cfg, func(o *s3.Options) {
		o.BaseEndpoint = aws.String(endpoint)
		o.UsePathStyle = true // Use path-style URLs for R2/B2 compatibility
		// NOTE(review): newer aws-sdk-go-v2 releases compute request checksums
		// by default; if an S3-compatible backend rejects uploads, consider
		// RequestChecksumCalculation/ResponseChecksumValidation = WhenRequired.
		// Verify against the SDK version pinned in go.mod.
	})

	return &S3Client{
		client: client,
		bucket: bucket,
	}, nil
}
// listObjects returns every object in the bucket whose key starts with prefix,
// sorted oldest-first by LastModified. Objects with equal timestamps are
// ordered by key, so the result is deterministic.
//
// It follows ListObjectsV2 continuation tokens page by page. Entries missing a
// key, size, or modification time are skipped. An error is returned if any
// page request fails, or if the service hands back the same continuation
// token twice (the listing would otherwise never progress).
func (c *S3Client) listObjects(ctx context.Context, prefix string) ([]R2Object, error) {
	var objects []R2Object
	var continuationToken *string

	for {
		output, err := c.client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
			Bucket:            aws.String(c.bucket),
			Prefix:            aws.String(prefix),
			ContinuationToken: continuationToken,
		})
		if err != nil {
			return nil, fmt.Errorf("list objects: %w", err)
		}

		for _, obj := range output.Contents {
			// Skip incomplete entries rather than dereferencing nil pointers.
			if obj.Key == nil || obj.LastModified == nil || obj.Size == nil {
				continue
			}
			objects = append(objects, R2Object{
				Key:          *obj.Key,
				Size:         *obj.Size,
				LastModified: *obj.LastModified,
			})
		}

		// Stop on an empty (not just nil) token, or when the service explicitly
		// says the listing is complete. Some S3-compatible backends return ""
		// on the final page; resending that would restart or loop the listing.
		next := aws.ToString(output.NextContinuationToken)
		if next == "" || (output.IsTruncated != nil && !*output.IsTruncated) {
			break
		}
		if next == aws.ToString(continuationToken) {
			return nil, fmt.Errorf("list objects: continuation token %q repeated; listing is not progressing", next)
		}
		continuationToken = aws.String(next)
	}

	// Oldest first; break timestamp ties by key so ordering is deterministic.
	sort.Slice(objects, func(i, j int) bool {
		a, b := objects[i], objects[j]
		if !a.LastModified.Equal(b.LastModified) {
			return a.LastModified.Before(b.LastModified)
		}
		return a.Key < b.Key
	})
	return objects, nil
}
// deleteObject issues a DeleteObject request for key against the client's
// bucket. A failed request is returned wrapped with the key for context;
// success yields nil.
func (c *S3Client) deleteObject(ctx context.Context, key string) error {
	if _, delErr := c.client.DeleteObject(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(c.bucket),
		Key:    aws.String(key),
	}); delErr != nil {
		return fmt.Errorf("delete object %s: %w", key, delErr)
	}
	return nil
}
// objectExists reports whether key exists in the client's bucket, using a
// HeadObject request.
//
// It returns (true, nil) if the HEAD succeeds and (false, nil) if the object
// is missing. Any other failure (permissions, network, ...) is returned as
// (false, err), wrapped with the key.
func (c *S3Client) objectExists(ctx context.Context, key string) (bool, error) {
	_, err := c.client.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(c.bucket),
		Key:    aws.String(key),
	})
	if err == nil {
		return true, nil
	}

	var notFound *types.NotFound
	if errors.As(err, &notFound) {
		return false, nil
	}

	// HEAD responses carry no error body, so a backend's 404 might not be
	// mapped to *types.NotFound. Fall back to the HTTP status exposed by the
	// SDK's response error (anything in the chain with HTTPStatusCode()).
	var statusErr interface{ HTTPStatusCode() int }
	if errors.As(err, &statusErr) && statusErr.HTTPStatusCode() == 404 {
		return false, nil
	}

	return false, fmt.Errorf("head object %s: %w", key, err)
}
// uploadFile writes the contents of body to key in the client's bucket via
// PutObject, setting the object's Content-Type to contentType. A failed
// request is returned wrapped with the key.
func (c *S3Client) uploadFile(ctx context.Context, key string, body io.Reader, contentType string) error {
	put := s3.PutObjectInput{
		Bucket:      aws.String(c.bucket),
		Key:         aws.String(key),
		Body:        body,
		ContentType: aws.String(contentType),
	}
	if _, putErr := c.client.PutObject(ctx, &put); putErr != nil {
		return fmt.Errorf("upload object %s: %w", key, putErr)
	}
	return nil
}
// copyObject performs a server-side CopyObject of sourceBucket/sourceKey into
// destKey in the client's bucket.
//
// The copy is executed by the storage service itself. The source bucket must
// therefore be reachable through the same endpoint and credentials as c; this
// cannot copy between different providers.
//
// The copy-source value must be URL-encoded, and the SDK sends it verbatim.
// The bucket and key are therefore percent-encoded first. Otherwise keys
// containing spaces, '+', '%', '?' or non-ASCII characters would be rejected
// or resolve to the wrong object.
func (c *S3Client) copyObject(ctx context.Context, sourceBucket, sourceKey, destKey string) error {
	input := &s3.CopyObjectInput{
		Bucket:     aws.String(c.bucket),
		Key:        aws.String(destKey),
		CopySource: aws.String(encodeCopySource(sourceBucket, sourceKey)),
	}
	if _, err := c.client.CopyObject(ctx, input); err != nil {
		return fmt.Errorf("copy object from %s/%s to %s: %w", sourceBucket, sourceKey, destKey, err)
	}
	return nil
}

// encodeCopySource builds a "bucket/key" copy-source value. It percent-encodes
// every byte except RFC 3986 unreserved characters and '/'. The '/' is kept
// because it separates the bucket from the key and the key's path segments.
func encodeCopySource(bucket, key string) string {
	const hexDigits = "0123456789ABCDEF"
	raw := bucket + "/" + key
	buf := make([]byte, 0, len(raw))
	for i := 0; i < len(raw); i++ {
		b := raw[i]
		switch {
		case 'a' <= b && b <= 'z', 'A' <= b && b <= 'Z', '0' <= b && b <= '9',
			b == '-', b == '_', b == '.', b == '~', b == '/':
			buf = append(buf, b)
		default:
			buf = append(buf, '%', hexDigits[b>>4], hexDigits[b&0x0F])
		}
	}
	return string(buf)
}
// downloadObject fetches key from the client's bucket with GetObject and
// returns the response body as a stream. Nothing here closes the body, so the
// caller owns the returned io.ReadCloser and must Close it.
func (c *S3Client) downloadObject(ctx context.Context, key string) (io.ReadCloser, error) {
	resp, getErr := c.client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(c.bucket),
		Key:    aws.String(key),
	})
	if getErr != nil {
		return nil, fmt.Errorf("download object %s: %w", key, getErr)
	}
	return resp.Body, nil
}
// getS3ContentType picks the Content-Type for filename from its suffix.
// Matching is exact and case-sensitive: ".gz", ".json" and ".png" are
// recognized; anything else is treated as "application/octet-stream".
func getS3ContentType(filename string) string {
	byExtension := [...]struct {
		suffix      string
		contentType string
	}{
		{".gz", "application/gzip"},
		{".json", "application/json"},
		{".png", "image/png"},
	}
	for _, entry := range byExtension {
		cut := len(filename) - len(entry.suffix)
		if cut >= 0 && filename[cut:] == entry.suffix {
			return entry.contentType
		}
	}
	return "application/octet-stream"
}
// getR2Client builds an S3Client from the R2 settings in cfg. All four of
// R2Endpoint, R2AccessKey, R2SecretKey and R2BucketName must be non-empty;
// otherwise no client is created and an error is returned.
func getR2Client(cfg *Config) (*S3Client, error) {
	complete := cfg.R2Endpoint != "" && cfg.R2AccessKey != "" &&
		cfg.R2SecretKey != "" && cfg.R2BucketName != ""
	if !complete {
		return nil, errors.New("R2 config incomplete")
	}
	return NewS3Client(cfg.R2Endpoint, cfg.R2AccessKey, cfg.R2SecretKey, cfg.R2BucketName)
}
// getB2Client builds an S3Client from the B2 settings in cfg. All four of
// B2Endpoint, B2AccessKey, B2SecretKey and B2BucketName must be non-empty;
// otherwise no client is created and an error is returned.
func getB2Client(cfg *Config) (*S3Client, error) {
	complete := cfg.B2Endpoint != "" && cfg.B2AccessKey != "" &&
		cfg.B2SecretKey != "" && cfg.B2BucketName != ""
	if !complete {
		return nil, errors.New("B2 config incomplete")
	}
	return NewS3Client(cfg.B2Endpoint, cfg.B2AccessKey, cfg.B2SecretKey, cfg.B2BucketName)
}