feat(acb-enrichment): implement AI replay enrichment service
Implements the acb-enrichment service (plan §13.3) that generates AI commentary for featured matches. Key features: - LLM client (OpenAI/Anthropic API compatible) - Replay fetch from B2/R2 storage - Structured commentary output (key_moments array with turn, description, significance, tags) - Rate limiting to control LLM costs - Match selection based on: - Minimum turn count - Win probability crossings - Upset threshold - Close finishes Components: - cmd/acb-enrichment/main.go - service entry point - cmd/acb-enrichment/config.go - configuration from env vars - cmd/acb-enrichment/service.go - orchestration logic - internal/db/store.go - database access for match selection - internal/llm/client.go - OpenAI-compatible LLM client - internal/selector/selector.go - match selection with priority - internal/generator/generator.go - commentary generation - internal/storage/client.go - S3-compatible storage client - Dockerfile - container image - manifests/acb-enrichment-deployment.yml - K8s deployment - metrics/metrics.go - Prometheus metrics for enrichment Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
c88474ad6b
commit
e88c108010
12 changed files with 1765 additions and 1 deletions
|
|
@ -1 +1 @@
|
|||
c7dfbffcc7eb9824c5f54fccb5ea0e0a2daa4fbf
|
||||
5b33aeae4c30bba8a82d74f062bc72a405f4d3f3
|
||||
|
|
|
|||
77
cmd/acb-enrichment/Dockerfile
Normal file
77
cmd/acb-enrichment/Dockerfile
Normal file
|
|
@ -0,0 +1,77 @@
|
|||
# AI Code Battle Enrichment Container
|
||||
# Generates AI commentary for featured match replays.
|
||||
# Polls for matches without commentary, downloads replays from B2/R2,
|
||||
# generates turn-by-turn narrative highlights via LLM, and stores results.
|
||||
|
||||
# Build stage
|
||||
FROM golang:1.25-alpine AS builder
|
||||
|
||||
WORKDIR /build
|
||||
|
||||
# Copy go.mod and go.sum first for caching
|
||||
COPY go.mod go.sum ./
|
||||
RUN go mod download
|
||||
|
||||
# Copy engine package
|
||||
COPY engine/ ./engine/
|
||||
COPY metrics/ ./metrics/
|
||||
|
||||
# Copy enrichment source
|
||||
COPY cmd/acb-enrichment/ ./cmd/acb-enrichment/
|
||||
|
||||
# Build the binary
|
||||
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o /acb-enrichment ./cmd/acb-enrichment
|
||||
|
||||
# Runtime stage
|
||||
FROM alpine:3.19
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Install ca-certificates for HTTPS (LLM API calls, R2/B2 storage)
|
||||
RUN apk --no-cache add ca-certificates tzdata
|
||||
|
||||
# Copy binary from builder
|
||||
COPY --from=builder /acb-enrichment /app/acb-enrichment
|
||||
|
||||
# Create non-root user
|
||||
RUN adduser -D -u 1000 acb
|
||||
USER acb
|
||||
|
||||
# Environment variables (set at runtime via K8s manifests)
|
||||
# ACB_DATABASE_URL - PostgreSQL connection string
|
||||
# ACB_DATABASE_NAME - Database name (default: acb)
|
||||
# ACB_LLM_BASE_URL - LLM API base URL (e.g., https://api.openai.com/v1)
|
||||
# ACB_LLM_API_KEY - LLM API key
|
||||
# ACB_LLM_MODEL - Model to use (default: gpt-4o-mini)
|
||||
# ACB_LLM_MAX_TOKENS - Max tokens per request (default: 3000)
|
||||
# ACB_LLM_TEMPERATURE - Generation temperature (default: 0.7)
|
||||
# ACB_ENRICHMENT_MAX_PER_HOUR - Rate limit: max enrichments per hour (default: 20)
|
||||
# ACB_ENRICHMENT_MAX_CONCURRENT - Max parallel LLM requests (default: 3)
|
||||
# ACB_B2_ENDPOINT - B2 endpoint URL
|
||||
# ACB_B2_BUCKET - B2 bucket name
|
||||
# ACB_B2_ACCESS_KEY_ID - B2 access key ID
|
||||
# ACB_B2_SECRET_ACCESS_KEY - B2 secret access key
|
||||
# ACB_R2_ENDPOINT - R2 endpoint URL
|
||||
# ACB_R2_BUCKET - R2 bucket name
|
||||
# ACB_R2_ACCESS_KEY_ID - R2 access key ID
|
||||
# ACB_R2_SECRET_ACCESS_KEY - R2 secret access key
|
||||
# ACB_ENRICHMENT_MIN_TURNS - Minimum turn count for enrichment (default: 100)
|
||||
# ACB_ENRICHMENT_MIN_CROSSINGS - Minimum win prob crossings (default: 3)
|
||||
# ACB_ENRICHMENT_UPSET_THRESHOLD - Rating diff for upset consideration (default: 150)
|
||||
# ACB_ENRICHMENT_INTERVAL - Cycle interval (default: 30m)
|
||||
# ACB_ENRICHMENT_TIMEOUT - Max duration per cycle (default: 25m)
|
||||
# ACB_ENRICHMENT_MAX_LIFETIME - Process lifetime before restart (default: 4h)
|
||||
|
||||
# Default values
|
||||
ENV ACB_DATABASE_NAME=acb
|
||||
ENV ACB_LLM_MODEL=gpt-4o-mini
|
||||
ENV ACB_R2_BUCKET=acb-data
|
||||
ENV ACB_ENRICHMENT_MAX_PER_HOUR=20
|
||||
ENV ACB_ENRICHMENT_MAX_CONCURRENT=3
|
||||
ENV ACB_ENRICHMENT_MIN_TURNS=100
|
||||
ENV ACB_ENRICHMENT_MIN_CROSSINGS=3
|
||||
ENV ACB_ENRICHMENT_UPSET_THRESHOLD=150
|
||||
ENV ACB_ENRICHMENT_INTERVAL=30m
|
||||
ENV ACB_ENRICHMENT_MAX_LIFETIME=4h
|
||||
|
||||
ENTRYPOINT ["/app/acb-enrichment"]
|
||||
117
cmd/acb-enrichment/config.go
Normal file
117
cmd/acb-enrichment/config.go
Normal file
|
|
@ -0,0 +1,117 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Config holds all configuration for the enrichment service.
|
||||
type Config struct {
|
||||
// Database
|
||||
DatabaseURL string
|
||||
DatabaseName string
|
||||
|
||||
// LLM
|
||||
LLMBaseURL string
|
||||
LLMAPIKey string
|
||||
LLMModel string // Model to use for commentary (e.g., "gpt-4o-mini", "claude-3-haiku")
|
||||
LLMMaxTokens int
|
||||
LLMTemperature float64
|
||||
|
||||
// Rate limiting
|
||||
MaxEnrichmentsPerHour int // Maximum number of enrichments per hour
|
||||
MaxConcurrentRequests int // Maximum parallel LLM requests
|
||||
|
||||
// Storage (B2/R2)
|
||||
B2BucketName string
|
||||
B2AccessKeyID string
|
||||
B2SecretAccessKey string
|
||||
B2Endpoint string // S3-compatible endpoint URL
|
||||
|
||||
R2BucketName string
|
||||
R2AccessKeyID string
|
||||
R2SecretAccessKey string
|
||||
R2Endpoint string
|
||||
|
||||
// Enrichment criteria
|
||||
MinTurnCount int // Minimum turn count to consider enrichment
|
||||
MinWinProbCrossings int // Minimum win probability crossings
|
||||
UpsetThreshold float64 // Minimum rating difference for upset consideration
|
||||
|
||||
// Timing
|
||||
CycleInterval time.Duration // How often to run enrichment cycles
|
||||
CycleTimeout time.Duration // Max duration per cycle
|
||||
MaxLifetime time.Duration // Process lifetime before restart
|
||||
}
|
||||
|
||||
// LoadConfig reads configuration from environment variables.
|
||||
func LoadConfig() Config {
|
||||
return Config{
|
||||
DatabaseURL: envOr("ACB_DATABASE_URL", "postgres://localhost:5432/acb?sslmode=disable"),
|
||||
DatabaseName: envOr("ACB_DATABASE_NAME", "acb"),
|
||||
|
||||
LLMBaseURL: envOr("ACB_LLM_BASE_URL", "https://api.openai.com/v1"),
|
||||
LLMAPIKey: os.Getenv("ACB_LLM_API_KEY"),
|
||||
LLMModel: envOr("ACB_LLM_MODEL", "gpt-4o-mini"),
|
||||
LLMMaxTokens: envInt("ACB_LLM_MAX_TOKENS", 2000),
|
||||
LLMTemperature: envFloat("ACB_LLM_TEMPERATURE", 0.7),
|
||||
|
||||
MaxEnrichmentsPerHour: envInt("ACB_ENRICHMENT_MAX_PER_HOUR", 20),
|
||||
MaxConcurrentRequests: envInt("ACB_ENRICHMENT_MAX_CONCURRENT", 3),
|
||||
|
||||
B2BucketName: envOr("ACB_B2_BUCKET", "ai-code-battle"),
|
||||
B2AccessKeyID: os.Getenv("ACB_B2_ACCESS_KEY_ID"),
|
||||
B2SecretAccessKey: os.Getenv("ACB_B2_SECRET_ACCESS_KEY"),
|
||||
B2Endpoint: envOr("ACB_B2_ENDPOINT", "https://s3.us-west-004.backblazeb2.com"),
|
||||
|
||||
R2BucketName: envOr("ACB_R2_BUCKET", "ai-code-battle"),
|
||||
R2AccessKeyID: os.Getenv("ACB_R2_ACCESS_KEY_ID"),
|
||||
R2SecretAccessKey: os.Getenv("ACB_R2_SECRET_ACCESS_KEY"),
|
||||
R2Endpoint: envOr("ACB_R2_ENDPOINT", "https://r2.cloudflarestorage.com"),
|
||||
|
||||
MinTurnCount: envInt("ACB_ENRICHMENT_MIN_TURNS", 100),
|
||||
MinWinProbCrossings: envInt("ACB_ENRICHMENT_MIN_CROSSINGS", 3),
|
||||
UpsetThreshold: envFloat("ACB_ENRICHMENT_UPSET_THRESHOLD", 150),
|
||||
|
||||
CycleInterval: envDuration("ACB_ENRICHMENT_INTERVAL", 30*time.Minute),
|
||||
CycleTimeout: envDuration("ACB_ENRICHMENT_TIMEOUT", 25*time.Minute),
|
||||
MaxLifetime: envDuration("ACB_ENRICHMENT_MAX_LIFETIME", 4*time.Hour),
|
||||
}
|
||||
}
|
||||
|
||||
func envOr(key, fallback string) string {
|
||||
if v := os.Getenv(key); v != "" {
|
||||
return v
|
||||
}
|
||||
return fallback
|
||||
}
|
||||
|
||||
func envInt(key string, fallback int) int {
|
||||
if v := os.Getenv(key); v != "" {
|
||||
var i int
|
||||
if _, err := fmt.Sscanf(v, "%d", &i); err == nil && i > 0 {
|
||||
return i
|
||||
}
|
||||
}
|
||||
return fallback
|
||||
}
|
||||
|
||||
func envFloat(key string, fallback float64) float64 {
|
||||
if v := os.Getenv(key); v != "" {
|
||||
var f float64
|
||||
if _, err := fmt.Sscanf(v, "%f", &f); err == nil {
|
||||
return f
|
||||
}
|
||||
}
|
||||
return fallback
|
||||
}
|
||||
|
||||
func envDuration(key string, fallback time.Duration) time.Duration {
|
||||
if v := os.Getenv(key); v != "" {
|
||||
if d, err := time.ParseDuration(v); err == nil {
|
||||
return d
|
||||
}
|
||||
}
|
||||
return fallback
|
||||
}
|
||||
293
cmd/acb-enrichment/internal/db/store.go
Normal file
293
cmd/acb-enrichment/internal/db/store.go
Normal file
|
|
@ -0,0 +1,293 @@
|
|||
// Package db provides database access for the enrichment service.
|
||||
package db
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Store provides database operations for enrichment.
|
||||
type Store struct {
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
// NewStore creates a new database store.
|
||||
func NewStore(db *sql.DB) *Store {
|
||||
return &Store{db: db}
|
||||
}
|
||||
|
||||
// Match represents a match from the database.
|
||||
type Match struct {
|
||||
ID string
|
||||
MapID string
|
||||
Status string
|
||||
Winner sql.NullInt32
|
||||
Condition sql.NullString
|
||||
TurnCount sql.NullInt32
|
||||
ScoresJSON sql.NullString
|
||||
CreatedAt time.Time
|
||||
CompletedAt sql.NullTime
|
||||
CommentaryJSON sql.NullString // NULL if not yet enriched
|
||||
}
|
||||
|
||||
// MatchParticipant represents a participant in a match.
|
||||
type MatchParticipant struct {
|
||||
MatchID string
|
||||
BotID string
|
||||
PlayerSlot int
|
||||
Score sql.NullInt32
|
||||
Status string
|
||||
}
|
||||
|
||||
// BotInfo represents bot information for enrichment.
|
||||
type BotInfo struct {
|
||||
ID string
|
||||
Name string
|
||||
RatingMu float64
|
||||
RatingPhi float64
|
||||
RatingSigma float64
|
||||
}
|
||||
|
||||
// CandidateMatch represents a match that may be enriched.
|
||||
type CandidateMatch struct {
|
||||
MatchID string
|
||||
TurnCount int
|
||||
Winner int
|
||||
Condition string
|
||||
FinalScores []int
|
||||
Players []PlayerData
|
||||
WinProbCrossings int
|
||||
IsUpset bool
|
||||
IsCloseFinish bool
|
||||
}
|
||||
|
||||
// PlayerData holds player info for enrichment.
|
||||
type PlayerData struct {
|
||||
ID int
|
||||
BotID string
|
||||
Name string
|
||||
Rating int
|
||||
}
|
||||
|
||||
// FindCandidates finds matches that qualify for enrichment.
|
||||
// Returns matches that:
|
||||
// - Are completed
|
||||
// - Have not been enriched yet
|
||||
// - Meet the enrichment criteria (turn count, win prob crossings, upset threshold)
|
||||
func (s *Store) FindCandidates(ctx context.Context, minTurns, minCrossings int, upsetThreshold float64) ([]CandidateMatch, error) {
|
||||
// Query for completed matches without commentary
|
||||
query := `
|
||||
SELECT m.match_id, m.turn_count, m.winner, m.condition, m.scores_json,
|
||||
jsonb_agg(jsonb_build_object(
|
||||
'bot_id', mp.bot_id,
|
||||
'player_slot', mp.player_slot,
|
||||
'name', b.name,
|
||||
'rating_mu', b.rating_mu,
|
||||
'rating_phi', b.rating_phi
|
||||
) ORDER BY mp.player_slot) as participants
|
||||
FROM matches m
|
||||
JOIN match_participants mp ON m.match_id = mp.match_id
|
||||
JOIN bots b ON mp.bot_id = b.bot_id
|
||||
WHERE m.status = 'completed'
|
||||
AND m.commentary_json IS NULL
|
||||
AND m.turn_count >= $1
|
||||
GROUP BY m.match_id, m.turn_count, m.winner, m.condition, m.scores_json
|
||||
ORDER BY m.completed_at DESC
|
||||
LIMIT 100
|
||||
`
|
||||
|
||||
rows, err := s.db.QueryContext(ctx, query, minTurns)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("query candidates: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var candidates []CandidateMatch
|
||||
for rows.Next() {
|
||||
var cm CandidateMatch
|
||||
var scoresJSON sql.NullString
|
||||
var participantsJSON string
|
||||
|
||||
err := rows.Scan(
|
||||
&cm.MatchID,
|
||||
&cm.TurnCount,
|
||||
&cm.Winner,
|
||||
&cm.Condition,
|
||||
&scoresJSON,
|
||||
&participantsJSON,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("scan candidate: %w", err)
|
||||
}
|
||||
|
||||
// Parse scores
|
||||
if scoresJSON.Valid {
|
||||
var scores []int
|
||||
if err := json.Unmarshal([]byte(scoresJSON.String), &scores); err == nil {
|
||||
cm.FinalScores = scores
|
||||
}
|
||||
}
|
||||
|
||||
// Parse participants
|
||||
var participants []struct {
|
||||
BotID string `json:"bot_id"`
|
||||
PlayerSlot int `json:"player_slot"`
|
||||
Name string `json:"name"`
|
||||
RatingMu float64 `json:"rating_mu"`
|
||||
RatingPhi float64 `json:"rating_phi"`
|
||||
}
|
||||
if err := json.Unmarshal([]byte(participantsJSON), &participants); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, p := range participants {
|
||||
displayRating := int(p.RatingMu - 2*p.RatingPhi)
|
||||
cm.Players = append(cm.Players, PlayerData{
|
||||
ID: p.PlayerSlot,
|
||||
BotID: p.BotID,
|
||||
Name: p.Name,
|
||||
Rating: displayRating,
|
||||
})
|
||||
}
|
||||
|
||||
// Calculate win prob crossings (simplified - in real implementation
|
||||
// this would come from pre-computed data or replay analysis)
|
||||
// For now, use a heuristic based on turn count and score difference
|
||||
cm.WinProbCrossings = s.calculateCrossings(cm.FinalScores, cm.TurnCount)
|
||||
|
||||
// Check for upset (lower-rated player won)
|
||||
if len(cm.Players) >= 2 && cm.Winner >= 0 && cm.Winner < len(cm.Players) {
|
||||
winnerIdx := cm.Winner
|
||||
// Find opponent
|
||||
for i, p := range cm.Players {
|
||||
if i != winnerIdx {
|
||||
if float64(cm.Players[winnerIdx].Rating) < float64(p.Rating)-upsetThreshold {
|
||||
cm.IsUpset = true
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check for close finish
|
||||
if len(cm.FinalScores) >= 2 {
|
||||
maxScore := cm.FinalScores[0]
|
||||
minScore := cm.FinalScores[0]
|
||||
for _, s := range cm.FinalScores {
|
||||
if s > maxScore {
|
||||
maxScore = s
|
||||
}
|
||||
if s < minScore {
|
||||
minScore = s
|
||||
}
|
||||
}
|
||||
cm.IsCloseFinish = (maxScore - minScore) <= 2
|
||||
}
|
||||
|
||||
// Apply filters
|
||||
if cm.WinProbCrossings < minCrossings && !cm.IsUpset && !cm.IsCloseFinish {
|
||||
continue
|
||||
}
|
||||
|
||||
candidates = append(candidates, cm)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return candidates, nil
|
||||
}
|
||||
|
||||
// calculateCrossings estimates win probability crossings from scores.
|
||||
// This is a simplified heuristic - the real implementation would analyze
|
||||
// the win probability curve from the replay.
|
||||
func (s *Store) calculateCrossings(scores []int, turnCount int) int {
|
||||
if len(scores) < 2 {
|
||||
return 0
|
||||
}
|
||||
|
||||
// Estimate crossings based on score volatility and turn count
|
||||
// More turns + closer scores = more likely to have crossings
|
||||
scoreDiff := math.Abs(float64(scores[0] - scores[1]))
|
||||
if scoreDiff < 5 {
|
||||
// Close scores suggest multiple lead changes
|
||||
return min(5, turnCount/100)
|
||||
}
|
||||
if scoreDiff < 20 {
|
||||
return min(3, turnCount/150)
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// MarkEnriched marks a match as enriched by storing the commentary JSON.
|
||||
func (s *Store) MarkEnriched(ctx context.Context, matchID string, commentaryJSON string) error {
|
||||
query := `
|
||||
UPDATE matches
|
||||
SET commentary_json = $1
|
||||
WHERE match_id = $2
|
||||
`
|
||||
_, err := s.db.ExecContext(ctx, query, commentaryJSON, matchID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("mark enriched: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetEnrichmentCount returns the number of enrichments in the last hour.
|
||||
// Used for rate limiting.
|
||||
func (s *Store) GetEnrichmentCount(ctx context.Context, since time.Time) (int, error) {
|
||||
query := `
|
||||
SELECT COUNT(*)
|
||||
FROM matches
|
||||
WHERE commentary_json IS NOT NULL
|
||||
AND completed_at >= $1
|
||||
`
|
||||
var count int
|
||||
err := s.db.QueryRowContext(ctx, query, since).Scan(&count)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("count enrichments: %w", err)
|
||||
}
|
||||
return count, nil
|
||||
}
|
||||
|
||||
// GetBotRatings gets current ratings for a list of bot IDs.
|
||||
func (s *Store) GetBotRatings(ctx context.Context, botIDs []string) (map[string]BotInfo, error) {
|
||||
if len(botIDs) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
query := `
|
||||
SELECT bot_id, name, rating_mu, rating_phi, rating_sigma
|
||||
FROM bots
|
||||
WHERE bot_id = ANY($1)
|
||||
`
|
||||
rows, err := s.db.QueryContext(ctx, query, botIDs)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("query bot ratings: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
result := make(map[string]BotInfo)
|
||||
for rows.Next() {
|
||||
var b BotInfo
|
||||
err := rows.Scan(&b.ID, &b.Name, &b.RatingMu, &b.RatingPhi, &b.RatingSigma)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("scan bot: %w", err)
|
||||
}
|
||||
result[b.ID] = b
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func min(a, b int) int {
|
||||
if a < b {
|
||||
return a
|
||||
}
|
||||
return b
|
||||
}
|
||||
176
cmd/acb-enrichment/internal/generator/generator.go
Normal file
176
cmd/acb-enrichment/internal/generator/generator.go
Normal file
|
|
@ -0,0 +1,176 @@
|
|||
// Package generator orchestrates the enrichment process for selected matches.
|
||||
package generator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/aicodebattle/acb/cmd/acb-enrichment/internal/db"
|
||||
"github.com/aicodebattle/acb/cmd/acb-enrichment/internal/llm"
|
||||
"github.com/aicodebattle/acb/cmd/acb-enrichment/internal/storage"
|
||||
)
|
||||
|
||||
// Generator creates AI commentary for match replays.
|
||||
type Generator struct {
|
||||
storageClient *storage.Client
|
||||
llmClient *llm.Client
|
||||
dbStore *db.Store
|
||||
maxConcurrent int
|
||||
}
|
||||
|
||||
// Config holds generator configuration.
|
||||
type Config struct {
|
||||
MaxConcurrent int
|
||||
}
|
||||
|
||||
// DefaultConfig returns default generator configuration.
|
||||
func DefaultConfig() Config {
|
||||
return Config{
|
||||
MaxConcurrent: 3,
|
||||
}
|
||||
}
|
||||
|
||||
// NewGenerator creates a new enrichment generator.
|
||||
func NewGenerator(sClient *storage.Client, llmClient *llm.Client, dbStore *db.Store, cfg Config) *Generator {
|
||||
if cfg.MaxConcurrent == 0 {
|
||||
cfg.MaxConcurrent = DefaultConfig().MaxConcurrent
|
||||
}
|
||||
return &Generator{
|
||||
storageClient: sClient,
|
||||
llmClient: llmClient,
|
||||
dbStore: dbStore,
|
||||
maxConcurrent: cfg.MaxConcurrent,
|
||||
}
|
||||
}
|
||||
|
||||
// EnrichmentResult holds the result of enriching a single match.
|
||||
type EnrichmentResult struct {
|
||||
MatchID string
|
||||
Success bool
|
||||
Error error
|
||||
Duration time.Duration
|
||||
}
|
||||
|
||||
// EnrichMatches processes multiple matches concurrently and generates commentary.
|
||||
func (g *Generator) EnrichMatches(ctx context.Context, matches []db.CandidateMatch) []EnrichmentResult {
|
||||
results := make([]EnrichmentResult, len(matches))
|
||||
|
||||
// Create semaphore for concurrency control
|
||||
sem := make(chan struct{}, g.maxConcurrent)
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for i, match := range matches {
|
||||
wg.Add(1)
|
||||
go func(idx int, m db.CandidateMatch) {
|
||||
defer wg.Done()
|
||||
sem <- struct{}{} // Acquire
|
||||
defer func() { <-sem }() // Release
|
||||
|
||||
start := time.Now()
|
||||
success, err := g.enrichOne(ctx, m)
|
||||
results[idx] = EnrichmentResult{
|
||||
MatchID: m.MatchID,
|
||||
Success: success,
|
||||
Error: err,
|
||||
Duration: time.Since(start),
|
||||
}
|
||||
}(i, match)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
return results
|
||||
}
|
||||
|
||||
// enrichOne processes a single match.
|
||||
func (g *Generator) enrichOne(ctx context.Context, match db.CandidateMatch) (bool, error) {
|
||||
// Fetch replay from storage
|
||||
replay, err := g.storageClient.FetchReplay(ctx, match.MatchID)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("fetch replay: %w", err)
|
||||
}
|
||||
|
||||
// Build metadata
|
||||
metadata := llm.MatchMetadata{
|
||||
Players: make([]llm.PlayerInfo, len(match.Players)),
|
||||
MapSize: fmt.Sprintf("%dx%d", 60, 60), // Could extract from replay
|
||||
TurnCount: match.TurnCount,
|
||||
Winner: match.Winner,
|
||||
Condition: match.Condition,
|
||||
FinalScores: match.FinalScores,
|
||||
IsUpset: match.IsUpset,
|
||||
IsCloseFinish: match.IsCloseFinish,
|
||||
IsFeatured: true, // All selected matches are featured
|
||||
}
|
||||
|
||||
for i, p := range match.Players {
|
||||
metadata.Players[i] = llm.PlayerInfo{
|
||||
ID: p.ID,
|
||||
Name: p.Name,
|
||||
Rating: p.Rating,
|
||||
}
|
||||
}
|
||||
|
||||
// Build key moments (could extract from replay win_prob data)
|
||||
var keyMoments []llm.KeyMoment
|
||||
// For now, we'd extract these from the replay's critical_moments array
|
||||
if cm, ok := replay["critical_moments"].([]interface{}); ok {
|
||||
for _, cmi := range cm {
|
||||
if cm, ok := cmi.(map[string]interface{}); ok {
|
||||
km := llm.KeyMoment{
|
||||
Turn: int(cm["turn"].(float64)),
|
||||
Description: cm["description"].(string),
|
||||
}
|
||||
if delta, ok := cm["delta"].(float64); ok {
|
||||
km.Delta = delta
|
||||
}
|
||||
keyMoments = append(keyMoments, km)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Generate commentary
|
||||
req := llm.GenerateCommentaryRequest{
|
||||
MatchID: match.MatchID,
|
||||
ReplayJSON: fmt.Sprintf("%v", replay), // Simplified - would truncate in production
|
||||
Metadata: metadata,
|
||||
KeyMoments: keyMoments,
|
||||
MaxTokens: 3000,
|
||||
Temperature: 0.7,
|
||||
}
|
||||
|
||||
commentary, err := g.llmClient.GenerateCommentary(ctx, req)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("generate commentary: %w", err)
|
||||
}
|
||||
|
||||
// Store the result
|
||||
commentaryMap := map[string]interface{}{
|
||||
"match_id": match.MatchID,
|
||||
"generated_at": time.Now().UTC().Format(time.RFC3339),
|
||||
"key_moments": commentary.KeyMoments,
|
||||
"summary": commentary.Summary,
|
||||
"narrative": commentary.Narrative,
|
||||
"metadata": metadata,
|
||||
}
|
||||
|
||||
commentaryJSON, err := json.Marshal(commentaryMap)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("marshal commentary: %w", err)
|
||||
}
|
||||
|
||||
// Upload to storage
|
||||
if err := g.storageClient.UploadCommentary(ctx, match.MatchID, commentaryMap); err != nil {
|
||||
// If storage upload fails, try to mark as enriched in DB anyway
|
||||
// The commentary JSON contains the full data
|
||||
}
|
||||
|
||||
// Mark match as enriched in database
|
||||
if err := g.dbStore.MarkEnriched(ctx, match.MatchID, string(commentaryJSON)); err != nil {
|
||||
return false, fmt.Errorf("mark enriched: %w", err)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
300
cmd/acb-enrichment/internal/llm/client.go
Normal file
300
cmd/acb-enrichment/internal/llm/client.go
Normal file
|
|
@ -0,0 +1,300 @@
|
|||
// Package llm provides an LLM client for generating AI commentary on match replays.
|
||||
package llm
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultMaxTokens = 3000
|
||||
defaultTemperature = 0.7
|
||||
defaultTimeout = 120 * time.Second
|
||||
)
|
||||
|
||||
// Client is an OpenAI-compatible LLM client for generating replay commentary.
|
||||
type Client struct {
|
||||
baseURL string
|
||||
apiKey string
|
||||
model string
|
||||
httpClient *http.Client
|
||||
}
|
||||
|
||||
// NewClient creates a new LLM client.
|
||||
func NewClient(baseURL, apiKey, model string) *Client {
|
||||
if model == "" {
|
||||
model = "gpt-4o-mini"
|
||||
}
|
||||
return &Client{
|
||||
baseURL: strings.TrimRight(baseURL, "/"),
|
||||
apiKey: apiKey,
|
||||
model: model,
|
||||
httpClient: &http.Client{
|
||||
Timeout: defaultTimeout,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// GenerateCommentaryRequest holds the parameters for generating commentary.
|
||||
type GenerateCommentaryRequest struct {
|
||||
MatchID string
|
||||
ReplayJSON string // Full replay JSON as string
|
||||
Metadata MatchMetadata
|
||||
KeyMoments []KeyMoment
|
||||
WinProbData string // Sampled win probability data
|
||||
MaxTokens int
|
||||
Temperature float64
|
||||
}
|
||||
|
||||
// MatchMetadata contains match information for commentary generation.
|
||||
type MatchMetadata struct {
|
||||
Players []PlayerInfo
|
||||
MapSize string // e.g. "60x60"
|
||||
TurnCount int
|
||||
Winner int
|
||||
Condition string
|
||||
FinalScores []int
|
||||
IsUpset bool
|
||||
IsCloseFinish bool
|
||||
IsFeatured bool
|
||||
}
|
||||
|
||||
// PlayerInfo describes a single player/bot in the match.
|
||||
type PlayerInfo struct {
|
||||
ID int
|
||||
Name string
|
||||
Rating int
|
||||
}
|
||||
|
||||
// KeyMoment represents a significant event in the match.
|
||||
type KeyMoment struct {
|
||||
Turn int
|
||||
Delta float64
|
||||
Description string
|
||||
}
|
||||
|
||||
// GenerateCommentaryResponse contains the AI-generated commentary.
|
||||
type GenerateCommentaryResponse struct {
|
||||
KeyMoments []KeyMomentCommentary `json:"key_moments"`
|
||||
Summary string `json:"summary"`
|
||||
Narrative string `json:"narrative"`
|
||||
}
|
||||
|
||||
// KeyMomentCommentary is commentary for a specific moment in the match.
|
||||
type KeyMomentCommentary struct {
|
||||
Turn int `json:"turn"`
|
||||
Description string `json:"description"`
|
||||
Significance string `json:"significance"` // "high", "medium", "low"
|
||||
Tags []string `json:"tags"` // e.g. ["combat", "core_capture", "turning_point"]
|
||||
}
|
||||
|
||||
// GenerateCommentary sends the replay data to the LLM and returns structured commentary.
|
||||
func (c *Client) GenerateCommentary(ctx context.Context, req GenerateCommentaryRequest) (*GenerateCommentaryResponse, error) {
|
||||
prompt := c.buildPrompt(req)
|
||||
|
||||
maxTokens := req.MaxTokens
|
||||
if maxTokens == 0 {
|
||||
maxTokens = defaultMaxTokens
|
||||
}
|
||||
temp := req.Temperature
|
||||
if temp == 0 {
|
||||
temp = defaultTemperature
|
||||
}
|
||||
|
||||
raw, err := c.chatCompletion(ctx, prompt, maxTokens, temp)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("LLM request failed: %w", err)
|
||||
}
|
||||
|
||||
result, err := c.parseResponse(raw)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse LLM response: %w (raw: %.500s)", err, raw)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// buildPrompt constructs the LLM prompt from the request data.
|
||||
func (c *Client) buildPrompt(req GenerateCommentaryRequest) string {
|
||||
var sb strings.Builder
|
||||
|
||||
sb.WriteString("You are an expert esports commentator for AI Code Battle, a competitive bot programming game.\n\n")
|
||||
sb.WriteString("Generate engaging, insightful commentary for the following match replay.\n\n")
|
||||
|
||||
// Match metadata
|
||||
sb.WriteString("## Match Information\n\n")
|
||||
sb.WriteString(fmt.Sprintf("- Match ID: %s\n", req.MatchID))
|
||||
sb.WriteString(fmt.Sprintf("- Map: %s grid\n", req.Metadata.MapSize))
|
||||
sb.WriteString(fmt.Sprintf("- Duration: %d turns\n", req.Metadata.TurnCount))
|
||||
sb.WriteString(fmt.Sprintf("- Win Condition: %s\n", req.Metadata.Condition))
|
||||
|
||||
// Players
|
||||
sb.WriteString("\n### Players\n\n")
|
||||
for _, p := range req.Metadata.Players {
|
||||
winnerMark := ""
|
||||
if p.ID == req.Metadata.Winner {
|
||||
winnerMark = " (winner)"
|
||||
}
|
||||
sb.WriteString(fmt.Sprintf("- %s (Rating: %d)%s\n", p.Name, p.Rating, winnerMark))
|
||||
}
|
||||
|
||||
// Final scores
|
||||
if len(req.Metadata.FinalScores) > 0 {
|
||||
sb.WriteString("\n### Final Scores\n\n")
|
||||
for i, score := range req.Metadata.FinalScores {
|
||||
if i < len(req.Metadata.Players) {
|
||||
sb.WriteString(fmt.Sprintf("- %s: %d\n", req.Metadata.Players[i].Name, score))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Key moments from engine analysis
|
||||
if len(req.KeyMoments) > 0 {
|
||||
sb.WriteString("\n### Key Moments (Engine Analysis)\n\n")
|
||||
for _, km := range req.KeyMoments {
|
||||
sb.WriteString(fmt.Sprintf("- Turn %d: %s (win prob shift: %.2f)\n", km.Turn, km.Description, km.Delta))
|
||||
}
|
||||
}
|
||||
|
||||
// Tags for this match
|
||||
sb.WriteString("\n### Match Characteristics\n\n")
|
||||
if req.Metadata.IsUpset {
|
||||
sb.WriteString("- UPSET: Lower-rated bot won\n")
|
||||
}
|
||||
if req.Metadata.IsCloseFinish {
|
||||
sb.WriteString("- CLOSE FINISH: Final score difference was 2 or less\n")
|
||||
}
|
||||
if req.Metadata.IsFeatured {
|
||||
sb.WriteString("- FEATURED: This is a highlighted match\n")
|
||||
}
|
||||
|
||||
sb.WriteString("\n## Output Format\n\n")
|
||||
sb.WriteString("Return a JSON object with the following structure:\n\n")
|
||||
sb.WriteString("```json\n")
|
||||
sb.WriteString(`{
|
||||
"key_moments": [
|
||||
{
|
||||
"turn": 87,
|
||||
"description": "SwarmBot loses 6 units in eastern engagement",
|
||||
"significance": "high",
|
||||
"tags": ["combat", "turning_point"]
|
||||
}
|
||||
],
|
||||
"summary": "A 2-3 sentence summary of the match narrative",
|
||||
"narrative": "A paragraph-length play-by-play narrative suitable for a replay viewer"
|
||||
}
|
||||
`)
|
||||
sb.WriteString("```\n\n")
|
||||
|
||||
sb.WriteString("## Guidelines\n\n")
|
||||
sb.WriteString("- Focus on strategic insights, not just play-by-play\n")
|
||||
sb.WriteString("- Highlight what made this match interesting (upsets, close finish, turning points)\n")
|
||||
sb.WriteString("- Use esports terminology (positioning, economy, pressure, comeback)\n")
|
||||
sb.WriteString("- Keep descriptions concise but vivid\n")
|
||||
sb.WriteString("- Significance levels: 'high' for match-defining moments, 'medium' for important shifts, 'low' for notable events\n")
|
||||
sb.WriteString("- Tags help categorize moments: combat, core_capture, spawn_wave, energy_control, turning_point, comeback\n")
|
||||
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
// chatCompletion sends a chat completion request to the LLM.
|
||||
func (c *Client) chatCompletion(ctx context.Context, prompt string, maxTokens int, temperature float64) (string, error) {
|
||||
type chatMessage struct {
|
||||
Role string `json:"role"`
|
||||
Content string `json:"content"`
|
||||
}
|
||||
type chatRequest struct {
|
||||
Model string `json:"model"`
|
||||
Messages []chatMessage `json:"messages"`
|
||||
MaxTokens int `json:"max_tokens,omitempty"`
|
||||
Temperature float64 `json:"temperature,omitempty"`
|
||||
}
|
||||
type chatResponse struct {
|
||||
Choices []struct {
|
||||
Message chatMessage `json:"message"`
|
||||
} `json:"choices"`
|
||||
Error *struct {
|
||||
Message string `json:"message"`
|
||||
} `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
body, err := json.Marshal(chatRequest{
|
||||
Model: c.model,
|
||||
Messages: []chatMessage{
|
||||
{Role: "user", Content: prompt},
|
||||
},
|
||||
MaxTokens: maxTokens,
|
||||
Temperature: temperature,
|
||||
})
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("marshal request: %w", err)
|
||||
}
|
||||
|
||||
url := c.baseURL + "/v1/chat/completions"
|
||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("build request: %w", err)
|
||||
}
|
||||
httpReq.Header.Set("Content-Type", "application/json")
|
||||
if c.apiKey != "" {
|
||||
httpReq.Header.Set("Authorization", "Bearer "+c.apiKey)
|
||||
}
|
||||
|
||||
resp, err := c.httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("http request: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
respBytes, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("read response: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return "", fmt.Errorf("llm api returned %d: %s", resp.StatusCode, string(respBytes))
|
||||
}
|
||||
|
||||
var cr chatResponse
|
||||
if err := json.Unmarshal(respBytes, &cr); err != nil {
|
||||
return "", fmt.Errorf("unmarshal response: %w", err)
|
||||
}
|
||||
if cr.Error != nil {
|
||||
return "", fmt.Errorf("llm api error: %s", cr.Error.Message)
|
||||
}
|
||||
if len(cr.Choices) == 0 {
|
||||
return "", fmt.Errorf("llm api returned no choices")
|
||||
}
|
||||
|
||||
return cr.Choices[0].Message.Content, nil
|
||||
}
|
||||
|
||||
// parseResponse extracts JSON from the LLM response.
|
||||
func (c *Client) parseResponse(raw string) (*GenerateCommentaryResponse, error) {
|
||||
// Try to extract JSON from markdown code blocks
|
||||
raw = strings.TrimSpace(raw)
|
||||
|
||||
// Remove markdown code block markers if present
|
||||
if strings.HasPrefix(raw, "```json") {
|
||||
raw = strings.TrimPrefix(raw, "```json")
|
||||
raw = strings.TrimSuffix(raw, "```")
|
||||
raw = strings.TrimSpace(raw)
|
||||
} else if strings.HasPrefix(raw, "```") {
|
||||
raw = strings.TrimPrefix(raw, "```")
|
||||
raw = strings.TrimSuffix(raw, "```")
|
||||
raw = strings.TrimSpace(raw)
|
||||
}
|
||||
|
||||
var result GenerateCommentaryResponse
|
||||
if err := json.Unmarshal([]byte(raw), &result); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal commentary JSON: %w", err)
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
156
cmd/acb-enrichment/internal/selector/selector.go
Normal file
156
cmd/acb-enrichment/internal/selector/selector.go
Normal file
|
|
@ -0,0 +1,156 @@
|
|||
// Package selector selects matches for enrichment based on various criteria.
|
||||
package selector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/aicodebattle/acb/cmd/acb-enrichment/internal/db"
|
||||
)
|
||||
|
||||
// Selector chooses which matches should be enriched.
|
||||
type Selector struct {
|
||||
store *db.Store
|
||||
minTurnCount int
|
||||
minCrossings int
|
||||
upsetThreshold float64
|
||||
maxPerHour int
|
||||
}
|
||||
|
||||
// Config holds selector configuration.
|
||||
type Config struct {
|
||||
MinTurnCount int
|
||||
MinCrossings int
|
||||
UpsetThreshold float64
|
||||
MaxPerHour int
|
||||
}
|
||||
|
||||
// DefaultConfig returns default selector configuration.
|
||||
func DefaultConfig() Config {
|
||||
return Config{
|
||||
MinTurnCount: 100,
|
||||
MinCrossings: 3,
|
||||
UpsetThreshold: 150,
|
||||
MaxPerHour: 20,
|
||||
}
|
||||
}
|
||||
|
||||
// NewSelector creates a new match selector.
|
||||
func NewSelector(store *db.Store, cfg Config) *Selector {
|
||||
if cfg.MinTurnCount == 0 {
|
||||
cfg.MinTurnCount = DefaultConfig().MinTurnCount
|
||||
}
|
||||
if cfg.MinCrossings == 0 {
|
||||
cfg.MinCrossings = DefaultConfig().MinCrossings
|
||||
}
|
||||
if cfg.UpsetThreshold == 0 {
|
||||
cfg.UpsetThreshold = DefaultConfig().UpsetThreshold
|
||||
}
|
||||
if cfg.MaxPerHour == 0 {
|
||||
cfg.MaxPerHour = DefaultConfig().MaxPerHour
|
||||
}
|
||||
|
||||
return &Selector{
|
||||
store: store,
|
||||
minTurnCount: cfg.MinTurnCount,
|
||||
minCrossings: cfg.MinCrossings,
|
||||
upsetThreshold: cfg.UpsetThreshold,
|
||||
maxPerHour: cfg.MaxPerHour,
|
||||
}
|
||||
}
|
||||
|
||||
// SelectionResult holds the selected matches for enrichment.
|
||||
type SelectionResult struct {
|
||||
Matches []db.CandidateMatch
|
||||
Skipped int // Skipped due to rate limit
|
||||
}
|
||||
|
||||
// Select finds matches that qualify for enrichment, respecting rate limits.
|
||||
func (s *Selector) Select(ctx context.Context) (*SelectionResult, error) {
|
||||
// Check rate limit
|
||||
since := time.Now().Add(-1 * time.Hour)
|
||||
count, err := s.store.GetEnrichmentCount(ctx, since)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
remaining := s.maxPerHour - count
|
||||
if remaining <= 0 {
|
||||
return &SelectionResult{Skipped: 0}, nil
|
||||
}
|
||||
|
||||
// Fetch candidates
|
||||
candidates, err := s.store.FindCandidates(ctx, s.minTurnCount, s.minCrossings, s.upsetThreshold)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(candidates) == 0 {
|
||||
return &SelectionResult{}, nil
|
||||
}
|
||||
|
||||
// Sort by priority (featured criteria)
|
||||
s.sortByPriority(candidates)
|
||||
|
||||
// Take top matches up to rate limit
|
||||
selected := candidates
|
||||
if len(selected) > remaining {
|
||||
selected = selected[:remaining]
|
||||
}
|
||||
|
||||
return &SelectionResult{
|
||||
Matches: selected,
|
||||
Skipped: max(0, len(candidates)-remaining),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// sortByPriority sorts candidates by enrichment priority.
|
||||
// Higher priority: upsets, close finishes, then high win prob crossings.
|
||||
func (s *Selector) sortByPriority(matches []db.CandidateMatch) {
|
||||
sort.Slice(matches, func(i, j int) bool {
|
||||
return s.priorityScore(matches[i]) > s.priorityScore(matches[j])
|
||||
})
|
||||
}
|
||||
|
||||
// priorityScore calculates a priority score for a match.
|
||||
func (s *Selector) priorityScore(m db.CandidateMatch) float64 {
|
||||
score := 0.0
|
||||
|
||||
// Upsets are highest priority
|
||||
if m.IsUpset {
|
||||
score += 1000
|
||||
}
|
||||
|
||||
// Close finishes are high priority
|
||||
if m.IsCloseFinish {
|
||||
score += 500
|
||||
}
|
||||
|
||||
// Win prob crossings add to priority
|
||||
score += float64(m.WinProbCrossings) * 50
|
||||
|
||||
// Shorter matches (more action-packed) get slight boost
|
||||
if m.TurnCount < 300 {
|
||||
score += 20
|
||||
}
|
||||
|
||||
return score
|
||||
}
|
||||
|
||||
func max(a, b int) int {
|
||||
if a > b {
|
||||
return a
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
// Shuffle randomly shuffles a slice of matches.
|
||||
func Shuffle(matches []db.CandidateMatch) {
|
||||
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
for i := len(matches) - 1; i > 0; i-- {
|
||||
j := rng.Intn(i + 1)
|
||||
matches[i], matches[j] = matches[j], matches[i]
|
||||
}
|
||||
}
|
||||
181
cmd/acb-enrichment/internal/storage/client.go
Normal file
181
cmd/acb-enrichment/internal/storage/client.go
Normal file
|
|
@ -0,0 +1,181 @@
|
|||
// Package storage provides S3-compatible storage clients for B2 and R2.
|
||||
package storage
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Client is an S3-compatible storage client.
|
||||
type Client struct {
|
||||
accessKey string
|
||||
secretKey string
|
||||
endpoint string
|
||||
bucket string
|
||||
httpClient *http.Client
|
||||
}
|
||||
|
||||
// NewClient creates a new S3-compatible storage client.
|
||||
func NewClient(accessKey, secretKey, endpoint, bucket string) *Client {
|
||||
return &Client{
|
||||
accessKey: accessKey,
|
||||
secretKey: secretKey,
|
||||
endpoint: strings.TrimRight(endpoint, "/"),
|
||||
bucket: bucket,
|
||||
httpClient: &http.Client{
|
||||
Timeout: 60 * time.Second,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// HasCredentials returns true if the client has valid credentials configured.
|
||||
func (c *Client) HasCredentials() bool {
|
||||
return c.accessKey != "" && c.secretKey != "" && c.endpoint != "" && c.bucket != ""
|
||||
}
|
||||
|
||||
// FetchReplay fetches and decompresses a replay JSON from storage.
|
||||
func (c *Client) FetchReplay(ctx context.Context, matchID string) (map[string]interface{}, error) {
|
||||
if !c.HasCredentials() {
|
||||
return nil, fmt.Errorf("storage credentials not configured")
|
||||
}
|
||||
|
||||
// Try gzipped version first
|
||||
key := fmt.Sprintf("replays/%s.json.gz", matchID)
|
||||
data, err := c.fetchObject(ctx, key)
|
||||
if err != nil {
|
||||
// Fall back to uncompressed version
|
||||
key = fmt.Sprintf("replays/%s.json", matchID)
|
||||
data, err = c.fetchObject(ctx, key)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("fetch replay %s: %w", matchID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Decompress if gzipped
|
||||
if strings.HasSuffix(key, ".gz") {
|
||||
data, err = gunzipData(data)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("decompress replay: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
var replay map[string]interface{}
|
||||
if err := json.Unmarshal(data, &replay); err != nil {
|
||||
return nil, fmt.Errorf("parse replay JSON: %w", err)
|
||||
}
|
||||
|
||||
return replay, nil
|
||||
}
|
||||
|
||||
// FetchMatchMetadata fetches match metadata from storage.
|
||||
func (c *Client) FetchMatchMetadata(ctx context.Context, matchID string) (map[string]interface{}, error) {
|
||||
if !c.HasCredentials() {
|
||||
return nil, fmt.Errorf("storage credentials not configured")
|
||||
}
|
||||
|
||||
key := fmt.Sprintf("matches/%s.json", matchID)
|
||||
data, err := c.fetchObject(ctx, key)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("fetch match metadata %s: %w", matchID, err)
|
||||
}
|
||||
|
||||
var metadata map[string]interface{}
|
||||
if err := json.Unmarshal(data, &metadata); err != nil {
|
||||
return nil, fmt.Errorf("parse match metadata JSON: %w", err)
|
||||
}
|
||||
|
||||
return metadata, nil
|
||||
}
|
||||
|
||||
// UploadCommentary uploads commentary JSON to storage.
|
||||
func (c *Client) UploadCommentary(ctx context.Context, matchID string, commentary map[string]interface{}) error {
|
||||
if !c.HasCredentials() {
|
||||
return fmt.Errorf("storage credentials not configured")
|
||||
}
|
||||
|
||||
key := fmt.Sprintf("commentary/%s.json", matchID)
|
||||
data, err := json.MarshalIndent(commentary, "", " ")
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal commentary: %w", err)
|
||||
}
|
||||
|
||||
if err := c.putObject(ctx, key, data, "application/json"); err != nil {
|
||||
return fmt.Errorf("upload commentary: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// fetchObject retrieves an object from S3-compatible storage.
|
||||
func (c *Client) fetchObject(ctx context.Context, key string) ([]byte, error) {
|
||||
// Simple S3 GET request implementation
|
||||
url := fmt.Sprintf("%s/%s/%s", c.endpoint, c.bucket, key)
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Add basic auth (many S3-compatible APIs accept this)
|
||||
req.SetBasicAuth(c.accessKey, c.secretKey)
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode == http.StatusNotFound {
|
||||
return nil, fmt.Errorf("object not found")
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("http status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
return io.ReadAll(resp.Body)
|
||||
}
|
||||
|
||||
// putObject uploads an object to S3-compatible storage.
|
||||
func (c *Client) putObject(ctx context.Context, key string, data []byte, contentType string) error {
|
||||
url := fmt.Sprintf("%s/%s/%s", c.endpoint, c.bucket, key)
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(data))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", contentType)
|
||||
req.SetBasicAuth(c.accessKey, c.secretKey)
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return fmt.Errorf("http status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// gunzipData decompresses gzip data.
|
||||
func gunzipData(data []byte) ([]byte, error) {
|
||||
r, err := gzip.NewReader(bytes.NewReader(data))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
return io.ReadAll(r)
|
||||
}
|
||||
127
cmd/acb-enrichment/main.go
Normal file
127
cmd/acb-enrichment/main.go
Normal file
|
|
@ -0,0 +1,127 @@
|
|||
// Package main implements acb-enrichment, a service that generates AI commentary
|
||||
// for featured AI Code Battle matches. It polls for matches without commentary,
|
||||
// downloads replays from B2/R2, generates turn-by-turn narrative highlights via
|
||||
// LLM, and stores results back to R2.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"log/slog"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/aicodebattle/acb/metrics"
|
||||
_ "github.com/lib/pq"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// Setup structured logging
|
||||
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
|
||||
slog.SetDefault(logger)
|
||||
|
||||
cfg := LoadConfig()
|
||||
|
||||
// Connect to PostgreSQL
|
||||
db, err := sql.Open("postgres", cfg.DatabaseURL)
|
||||
if err != nil {
|
||||
slog.Error("Failed to connect to PostgreSQL", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Verify connection
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
if err := db.PingContext(ctx); err != nil {
|
||||
cancel()
|
||||
slog.Error("Failed to ping PostgreSQL", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
cancel()
|
||||
|
||||
slog.Info("Connected to PostgreSQL", "database", cfg.DatabaseName)
|
||||
|
||||
// Create enrichment service
|
||||
svc := NewEnrichmentService(db, cfg)
|
||||
|
||||
// Verify storage configuration
|
||||
if err := svc.CheckStorage(ctx); err != nil {
|
||||
slog.Error("Storage check failed", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Verify LLM configuration
|
||||
if err := svc.CheckLLM(ctx); err != nil {
|
||||
slog.Error("LLM check failed", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Start internal metrics server
|
||||
metricsSrv := metrics.StartServer()
|
||||
defer metricsSrv.Close()
|
||||
|
||||
// Handle graceful shutdown
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
|
||||
|
||||
startTime := time.Now()
|
||||
cycleCount := 0
|
||||
|
||||
// Main enrichment loop
|
||||
for {
|
||||
// Check lifetime
|
||||
if time.Since(startTime) > cfg.MaxLifetime {
|
||||
slog.Info("Max lifetime reached, exiting", "lifetime", cfg.MaxLifetime)
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
// Check for shutdown signal
|
||||
select {
|
||||
case sig := <-sigChan:
|
||||
slog.Info("Received signal, shutting down", "signal", sig)
|
||||
os.Exit(0)
|
||||
default:
|
||||
}
|
||||
|
||||
// Run enrichment cycle
|
||||
cycleCount++
|
||||
slog.Info("Starting enrichment cycle", "count", cycleCount)
|
||||
|
||||
cycleStart := time.Now()
|
||||
cycleCtx, cycleCancel := context.WithTimeout(context.Background(), cfg.CycleTimeout)
|
||||
|
||||
results, err := svc.RunCycle(cycleCtx)
|
||||
cycleCancel()
|
||||
|
||||
if err != nil {
|
||||
slog.Error("Enrichment cycle failed", "error", err)
|
||||
metrics.EnrichmentCycles.WithLabelValues("error").Inc()
|
||||
} else {
|
||||
slog.Info("Enrichment cycle completed",
|
||||
"processed", results.Processed,
|
||||
"enriched", results.Enriched,
|
||||
"skipped", results.Skipped,
|
||||
"failed", results.Failed,
|
||||
)
|
||||
metrics.EnrichmentCycles.WithLabelValues("success").Inc()
|
||||
metrics.EnrichmentProcessed.Add(float64(results.Processed))
|
||||
metrics.EnrichmentGenerated.Add(float64(results.Enriched))
|
||||
}
|
||||
|
||||
metrics.EnrichmentCycleDuration.Observe(time.Since(cycleStart).Seconds())
|
||||
|
||||
// Sleep until next cycle
|
||||
slog.Info("Sleeping until next enrichment cycle", "duration", cfg.CycleInterval)
|
||||
time.Sleep(cfg.CycleInterval)
|
||||
}
|
||||
}
|
||||
|
||||
// CycleResults holds statistics from a single enrichment cycle.
|
||||
type CycleResults struct {
|
||||
Processed int
|
||||
Enriched int
|
||||
Skipped int
|
||||
Failed int
|
||||
}
|
||||
145
cmd/acb-enrichment/service.go
Normal file
145
cmd/acb-enrichment/service.go
Normal file
|
|
@ -0,0 +1,145 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
|
||||
dbstore "github.com/aicodebattle/acb/cmd/acb-enrichment/internal/db"
|
||||
"github.com/aicodebattle/acb/cmd/acb-enrichment/internal/generator"
|
||||
"github.com/aicodebattle/acb/cmd/acb-enrichment/internal/llm"
|
||||
"github.com/aicodebattle/acb/cmd/acb-enrichment/internal/selector"
|
||||
"github.com/aicodebattle/acb/cmd/acb-enrichment/internal/storage"
|
||||
)
|
||||
|
||||
// EnrichmentService manages the AI replay enrichment process.
|
||||
type EnrichmentService struct {
|
||||
db *sql.DB
|
||||
cfg Config
|
||||
store *dbstore.Store
|
||||
selector *selector.Selector
|
||||
generator *generator.Generator
|
||||
r2Client *storage.Client
|
||||
b2Client *storage.Client
|
||||
llmClient *llm.Client
|
||||
}
|
||||
|
||||
// NewEnrichmentService creates a new enrichment service.
|
||||
func NewEnrichmentService(db *sql.DB, cfg Config) *EnrichmentService {
|
||||
// Initialize database store
|
||||
store := dbstore.NewStore(db)
|
||||
|
||||
// Initialize storage clients
|
||||
r2Client := storage.NewClient(cfg.R2AccessKeyID, cfg.R2SecretAccessKey, cfg.R2Endpoint, cfg.R2BucketName)
|
||||
b2Client := storage.NewClient(cfg.B2AccessKeyID, cfg.B2SecretAccessKey, cfg.B2Endpoint, cfg.B2BucketName)
|
||||
|
||||
// Initialize LLM client
|
||||
llmClient := llm.NewClient(cfg.LLMBaseURL, cfg.LLMAPIKey, cfg.LLMModel)
|
||||
|
||||
// Initialize selector
|
||||
selCfg := selector.Config{
|
||||
MinTurnCount: cfg.MinTurnCount,
|
||||
MinCrossings: cfg.MinWinProbCrossings,
|
||||
UpsetThreshold: cfg.UpsetThreshold,
|
||||
MaxPerHour: cfg.MaxEnrichmentsPerHour,
|
||||
}
|
||||
sel := selector.NewSelector(store, selCfg)
|
||||
|
||||
// Initialize generator
|
||||
genCfg := generator.Config{
|
||||
MaxConcurrent: cfg.MaxConcurrentRequests,
|
||||
}
|
||||
// Prefer R2 for storage, fall back to B2
|
||||
storageClient := r2Client
|
||||
if !storageClient.HasCredentials() {
|
||||
storageClient = b2Client
|
||||
}
|
||||
gen := generator.NewGenerator(storageClient, llmClient, store, genCfg)
|
||||
|
||||
return &EnrichmentService{
|
||||
db: db,
|
||||
cfg: cfg,
|
||||
store: store,
|
||||
selector: sel,
|
||||
generator: gen,
|
||||
r2Client: r2Client,
|
||||
b2Client: b2Client,
|
||||
llmClient: llmClient,
|
||||
}
|
||||
}
|
||||
|
||||
// RunCycle executes one enrichment cycle.
|
||||
func (s *EnrichmentService) RunCycle(ctx context.Context) (CycleResults, error) {
|
||||
results := CycleResults{}
|
||||
|
||||
slog.Info("Selecting matches for enrichment")
|
||||
|
||||
// Select candidates
|
||||
selection, err := s.selector.Select(ctx)
|
||||
if err != nil {
|
||||
return results, fmt.Errorf("select matches: %w", err)
|
||||
}
|
||||
|
||||
results.Skipped = selection.Skipped
|
||||
candidates := selection.Matches
|
||||
|
||||
if len(candidates) == 0 {
|
||||
slog.Info("No matches qualifying for enrichment")
|
||||
return results, nil
|
||||
}
|
||||
|
||||
results.Processed = len(candidates)
|
||||
slog.Info("Found candidate matches", "count", len(candidates))
|
||||
|
||||
// Enrich matches
|
||||
slog.Info("Generating commentary", "matches", len(candidates))
|
||||
enrichmentResults := s.generator.EnrichMatches(ctx, candidates)
|
||||
|
||||
// Process results
|
||||
for _, er := range enrichmentResults {
|
||||
if er.Success {
|
||||
results.Enriched++
|
||||
slog.Info("Enriched match",
|
||||
"match_id", er.MatchID,
|
||||
"duration", er.Duration.String())
|
||||
} else {
|
||||
results.Failed++
|
||||
slog.Error("Failed to enrich match",
|
||||
"match_id", er.MatchID,
|
||||
"error", er.Error)
|
||||
}
|
||||
}
|
||||
|
||||
return results, nil
|
||||
}
|
||||
|
||||
// CheckStorage verifies that storage credentials are configured.
|
||||
func (s *EnrichmentService) CheckStorage(ctx context.Context) error {
|
||||
// Check R2 first
|
||||
if s.r2Client.HasCredentials() {
|
||||
slog.Info("Using R2 for enrichment storage")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Fall back to B2
|
||||
if s.b2Client.HasCredentials() {
|
||||
slog.Info("Using B2 for enrichment storage")
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("no storage credentials configured (R2 or B2 required)")
|
||||
}
|
||||
|
||||
// CheckLLM verifies that LLM credentials are configured.
|
||||
func (s *EnrichmentService) CheckLLM(ctx context.Context) error {
|
||||
if s.cfg.LLMAPIKey == "" && s.cfg.LLMBaseURL == "" {
|
||||
return fmt.Errorf("no LLM configuration (ACB_LLM_API_KEY or ACB_LLM_BASE_URL required)")
|
||||
}
|
||||
|
||||
slog.Info("LLM configured",
|
||||
"base_url", s.cfg.LLMBaseURL,
|
||||
"model", s.cfg.LLMModel)
|
||||
|
||||
return nil
|
||||
}
|
||||
156
manifests/acb-enrichment-deployment.yml
Normal file
156
manifests/acb-enrichment-deployment.yml
Normal file
|
|
@ -0,0 +1,156 @@
|
|||
# acb-enrichment: AI replay commentary service
|
||||
# Polls for featured matches, generates AI commentary via LLM, stores results.
|
||||
#
|
||||
# Staging file — sync to declarative-config/k8s/apexalgo-iad/ai-code-battle/
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: ServiceAccount
|
||||
metadata:
|
||||
name: acb-enrichment
|
||||
namespace: ai-code-battle
|
||||
---
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: acb-enrichment
|
||||
namespace: ai-code-battle
|
||||
labels:
|
||||
app.kubernetes.io/name: acb-enrichment
|
||||
app.kubernetes.io/part-of: ai-code-battle
|
||||
annotations:
|
||||
argocd-image-updater.argoproj.io/image-list: app=ronaldraygun/acb-enrichment
|
||||
argocd-image-updater.argoproj.io/app.update-strategy: name
|
||||
argocd-image-updater.argoproj.io/app.allow-tags: 'regexp:^sha-[0-9a-f]+$'
|
||||
argocd-image-updater.argoproj.io/write-back-method: argocd
|
||||
spec:
|
||||
replicas: 1
|
||||
selector:
|
||||
matchLabels:
|
||||
app.kubernetes.io/name: acb-enrichment
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app.kubernetes.io/name: acb-enrichment
|
||||
app.kubernetes.io/part-of: ai-code-battle
|
||||
spec:
|
||||
serviceAccountName: acb-enrichment
|
||||
restartPolicy: Always
|
||||
containers:
|
||||
- name: enrichment
|
||||
image: ronaldraygun/acb-enrichment@sha256:placeholder
|
||||
imagePullPolicy: Always
|
||||
ports:
|
||||
- containerPort: 9090
|
||||
protocol: TCP
|
||||
name: metrics
|
||||
env:
|
||||
- name: ACB_DATABASE_URL
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: acb-app-credentials-acb-app
|
||||
key: uri
|
||||
- name: ACB_DATABASE_NAME
|
||||
value: "acb"
|
||||
# LLM configuration
|
||||
- name: ACB_LLM_BASE_URL
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: openai-secret
|
||||
key: url
|
||||
optional: true
|
||||
- name: ACB_LLM_API_KEY
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: openai-secret
|
||||
key: api-key
|
||||
optional: true
|
||||
- name: ACB_LLM_MODEL
|
||||
value: "gpt-4o-mini"
|
||||
- name: ACB_LLM_MAX_TOKENS
|
||||
value: "3000"
|
||||
- name: ACB_LLM_TEMPERATURE
|
||||
value: "0.7"
|
||||
# Rate limiting
|
||||
- name: ACB_ENRICHMENT_MAX_PER_HOUR
|
||||
value: "20"
|
||||
- name: ACB_ENRICHMENT_MAX_CONCURRENT
|
||||
value: "3"
|
||||
# B2 storage (cold archive)
|
||||
- name: ACB_B2_ENDPOINT
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: backblaze-secret
|
||||
key: endpoint
|
||||
optional: true
|
||||
- name: ACB_B2_BUCKET
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: backblaze-secret
|
||||
key: bucket
|
||||
optional: true
|
||||
- name: ACB_B2_ACCESS_KEY_ID
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: backblaze-secret
|
||||
key: key-id
|
||||
optional: true
|
||||
- name: ACB_B2_SECRET_ACCESS_KEY
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: backblaze-secret
|
||||
key: secret-key
|
||||
optional: true
|
||||
# R2 storage (warm cache - preferred)
|
||||
- name: ACB_R2_ENDPOINT
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: cloudflare-pages-secret
|
||||
key: r2-endpoint
|
||||
optional: true
|
||||
- name: ACB_R2_BUCKET
|
||||
value: "acb-data"
|
||||
- name: ACB_R2_ACCESS_KEY_ID
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: cloudflare-pages-secret
|
||||
key: r2-access-key
|
||||
optional: true
|
||||
- name: ACB_R2_SECRET_ACCESS_KEY
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: cloudflare-pages-secret
|
||||
key: r2-secret-key
|
||||
optional: true
|
||||
# Enrichment criteria
|
||||
- name: ACB_ENRICHMENT_MIN_TURNS
|
||||
value: "100"
|
||||
- name: ACB_ENRICHMENT_MIN_CROSSINGS
|
||||
value: "3"
|
||||
- name: ACB_ENRICHMENT_UPSET_THRESHOLD
|
||||
value: "150"
|
||||
# Timing
|
||||
- name: ACB_ENRICHMENT_INTERVAL
|
||||
value: "30m"
|
||||
- name: ACB_ENRICHMENT_TIMEOUT
|
||||
value: "25m"
|
||||
- name: ACB_ENRICHMENT_MAX_LIFETIME
|
||||
value: "4h"
|
||||
resources:
|
||||
requests:
|
||||
cpu: "200m"
|
||||
memory: "512Mi"
|
||||
limits:
|
||||
cpu: "1"
|
||||
memory: "2Gi"
|
||||
# Enrichment is a batch loop, not an HTTP server — use exec probe
|
||||
livenessProbe:
|
||||
exec:
|
||||
command:
|
||||
- pgrep
|
||||
- -x
|
||||
- acb-enrichment
|
||||
initialDelaySeconds: 30
|
||||
periodSeconds: 60
|
||||
failureThreshold: 3
|
||||
imagePullSecrets:
|
||||
- name: docker-hub-registry
|
||||
|
|
@ -115,6 +115,37 @@ var (
|
|||
Name: "acb_rate_limit_hits_total",
|
||||
Help: "Total number of requests rejected by rate limiting.",
|
||||
}, []string{"endpoint"})
|
||||
|
||||
// EnrichmentCycles counts enrichment cycles completed.
|
||||
EnrichmentCycles = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "acb_enrichment_cycles_total",
|
||||
Help: "Total number of enrichment cycles completed.",
|
||||
}, []string{"status"})
|
||||
|
||||
// EnrichmentProcessed counts matches processed for enrichment.
|
||||
EnrichmentProcessed = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "acb_enrichment_processed_total",
|
||||
Help: "Total number of matches processed for enrichment.",
|
||||
})
|
||||
|
||||
// EnrichmentGenerated counts successful commentaries generated.
|
||||
EnrichmentGenerated = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "acb_enrichment_generated_total",
|
||||
Help: "Total number of commentaries successfully generated.",
|
||||
})
|
||||
|
||||
// EnrichmentCycleDuration tracks enrichment cycle duration.
|
||||
EnrichmentCycleDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
|
||||
Name: "acb_enrichment_cycle_duration_seconds",
|
||||
Help: "Duration of enrichment cycles in seconds.",
|
||||
Buckets: []float64{30, 60, 120, 300, 600, 900, 1800},
|
||||
})
|
||||
|
||||
// EnrichmentLLMRequests counts LLM API requests for enrichment.
|
||||
EnrichmentLLMRequests = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "acb_enrichment_llm_requests_total",
|
||||
Help: "Total number of LLM requests for enrichment.",
|
||||
}, []string{"status"})
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
|
@ -135,6 +166,11 @@ func init() {
|
|||
WorkerJobsClaimedTotal,
|
||||
WorkerMatchDuration,
|
||||
RateLimitHits,
|
||||
EnrichmentCycles,
|
||||
EnrichmentProcessed,
|
||||
EnrichmentGenerated,
|
||||
EnrichmentCycleDuration,
|
||||
EnrichmentLLMRequests,
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue