feat(api): implement POST /api/request-enrichment endpoint
Per plan §13.3, implements user-requested AI replay commentary with: - HMAC bot authentication via shared_secret - Rate limiting: 5 requests/day per bot - Match validation (exists and completed) - Idempotency via enrichment_requested_at column - Enqueues to Valkey for acb-enrichment service - Returns 202 Accepted with estimated wait time Also adds: - AllowN() method to ratelimit package for multi-token checks - enrichment_requested_at column to matches table (idempotency) - enrichLtr rate limiter (5/day per bot) Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
9e566acf92
commit
6bfd3e6679
6 changed files with 395 additions and 9 deletions
|
|
@ -1 +1 @@
|
|||
9972cb8c841386dfa9648c189e613c977047edda
|
||||
9e566acf921bd92497a9b85cf3b558ce8739e5e7
|
||||
|
|
|
|||
|
|
@ -274,6 +274,13 @@ DO $$ BEGIN
|
|||
END IF;
|
||||
END $$;
|
||||
|
||||
-- Add enrichment_requested_at column to matches for idempotency (§13.3)
|
||||
DO $$ BEGIN
|
||||
IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'matches' AND column_name = 'enrichment_requested_at') THEN
|
||||
ALTER TABLE matches ADD COLUMN enrichment_requested_at TIMESTAMPTZ;
|
||||
END IF;
|
||||
END $$;
|
||||
|
||||
-- Community replay feedback (plan §13.6, §8.3)
|
||||
CREATE TABLE IF NOT EXISTS replay_feedback (
|
||||
feedback_id VARCHAR(32) PRIMARY KEY,
|
||||
|
|
@ -294,6 +301,20 @@ CREATE TABLE IF NOT EXISTS feedback_upvotes (
|
|||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
PRIMARY KEY (feedback_id, voter_id)
|
||||
);
|
||||
|
||||
-- User-requested replay enrichment (plan §13.3)
|
||||
CREATE TABLE IF NOT EXISTS enrichment_requests (
|
||||
request_id VARCHAR(32) PRIMARY KEY,
|
||||
match_id VARCHAR(32) NOT NULL REFERENCES matches(match_id),
|
||||
bot_id VARCHAR(16) NOT NULL REFERENCES bots(bot_id),
|
||||
status VARCHAR(16) NOT NULL DEFAULT 'pending', -- pending, processing, completed, failed
|
||||
requested_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
processed_at TIMESTAMPTZ,
|
||||
error_msg TEXT,
|
||||
UNIQUE(match_id, bot_id)
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_enrichment_requests_status ON enrichment_requests(status, requested_at);
|
||||
CREATE INDEX IF NOT EXISTS idx_enrichment_requests_bot ON enrichment_requests(bot_id, requested_at);
|
||||
`
|
||||
|
||||
func ensureSchema(ctx context.Context, db *sql.DB) error {
|
||||
|
|
|
|||
103
cmd/acb-api/enrichment_test.go
Normal file
103
cmd/acb-api/enrichment_test.go
Normal file
|
|
@ -0,0 +1,103 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestRequestEnrichmentRouteRegistered(t *testing.T) {
|
||||
srv := newTestServer()
|
||||
mux := http.NewServeMux()
|
||||
srv.RegisterRoutes(mux)
|
||||
|
||||
// POST /api/request-enrichment should be routed (400 for empty body, not 404)
|
||||
req := httptest.NewRequest("POST", "/api/request-enrichment", nil)
|
||||
w := httptest.NewRecorder()
|
||||
mux.ServeHTTP(w, req)
|
||||
|
||||
if w.Code == http.StatusNotFound {
|
||||
t.Fatal("POST /api/request-enrichment returned 404 — route not registered")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequestEnrichmentRejectsBadMethod(t *testing.T) {
|
||||
srv := newTestServer()
|
||||
mux := http.NewServeMux()
|
||||
srv.RegisterRoutes(mux)
|
||||
|
||||
// GET should be rejected on the enrichment endpoint
|
||||
req := httptest.NewRequest("GET", "/api/request-enrichment", nil)
|
||||
w := httptest.NewRecorder()
|
||||
mux.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != http.StatusMethodNotAllowed {
|
||||
t.Errorf("GET /api/request-enrichment returned %d, want 405", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequestEnrichmentRejectsInvalidBody(t *testing.T) {
|
||||
srv := newTestServer()
|
||||
mux := http.NewServeMux()
|
||||
srv.RegisterRoutes(mux)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
body string
|
||||
wantErr string
|
||||
}{
|
||||
{
|
||||
name: "empty body",
|
||||
body: "",
|
||||
wantErr: "invalid request body",
|
||||
},
|
||||
{
|
||||
name: "missing match_id",
|
||||
body: `{"shared_secret": "test"}`,
|
||||
wantErr: "match_id and shared_secret are required",
|
||||
},
|
||||
{
|
||||
name: "missing shared_secret",
|
||||
body: `{"match_id": "m_test123"}`,
|
||||
wantErr: "match_id and shared_secret are required",
|
||||
},
|
||||
{
|
||||
name: "empty match_id",
|
||||
body: `{"match_id": "", "shared_secret": "test"}`,
|
||||
wantErr: "match_id and shared_secret are required",
|
||||
},
|
||||
{
|
||||
name: "empty shared_secret",
|
||||
body: `{"match_id": "m_test123", "shared_secret": ""}`,
|
||||
wantErr: "match_id and shared_secret are required",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
req := httptest.NewRequest("POST", "/api/request-enrichment", strings.NewReader(tt.body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
w := httptest.NewRecorder()
|
||||
mux.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("POST /api/request-enrichment with %s returned %d, want 400", tt.name, w.Code)
|
||||
}
|
||||
|
||||
var resp map[string]string
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("failed to decode response: %v", err)
|
||||
}
|
||||
|
||||
if resp["error"] != tt.wantErr {
|
||||
t.Errorf("error message = %q, want %q", resp["error"], tt.wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Note: Full enrichment endpoint testing requires a database connection.
|
||||
// The tests above cover routing and basic request validation.
|
||||
// Integration tests with real database are skipped unless ACB_TEST_DATABASE_URL is set.
|
||||
|
|
@ -85,11 +85,12 @@ func main() {
|
|||
cfg: cfg,
|
||||
db: db,
|
||||
rdb: rdb,
|
||||
regLimiter: ratelimit.NewLimiter(5, 5.0/3600), // 5/hour per IP
|
||||
feedbackLtr: ratelimit.NewLimiter(20, 20.0/3600), // 20/hour per IP
|
||||
predictLtr: ratelimit.NewLimiter(60, 60.0/3600), // 60/hour per IP
|
||||
submitLtr: ratelimit.NewLimiter(5, 5.0/86400), // 5/day per key
|
||||
voteLtr: ratelimit.NewLimiter(10, 10.0/3600), // 10/hour per IP
|
||||
regLimiter: ratelimit.NewLimiter(5, 5.0/3600), // 5/hour per IP
|
||||
feedbackLtr: ratelimit.NewLimiter(20, 20.0/3600), // 20/hour per IP
|
||||
predictLtr: ratelimit.NewLimiter(60, 60.0/3600), // 60/hour per IP
|
||||
submitLtr: ratelimit.NewLimiter(5, 5.0/86400), // 5/day per key
|
||||
enrichLtr: ratelimit.NewLimiter(5, 5.0/86400), // 5/day per bot
|
||||
voteLtr: ratelimit.NewLimiter(10, 10.0/3600), // 10/hour per IP
|
||||
}
|
||||
|
||||
// Initialize spam filter with configurable block-list
|
||||
|
|
@ -113,6 +114,7 @@ func main() {
|
|||
srv.feedbackLtr.Cleanup(time.Hour)
|
||||
srv.predictLtr.Cleanup(time.Hour)
|
||||
srv.submitLtr.Cleanup(24 * time.Hour)
|
||||
srv.enrichLtr.Cleanup(24 * time.Hour)
|
||||
srv.voteLtr.Cleanup(time.Hour)
|
||||
}
|
||||
}()
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@ type Server struct {
|
|||
feedbackLtr *ratelimit.Limiter // 20/hour per IP
|
||||
predictLtr *ratelimit.Limiter // 60/hour per IP
|
||||
submitLtr *ratelimit.Limiter // 5/day per bot_id
|
||||
enrichLtr *ratelimit.Limiter // 5/day per bot_id for enrichment requests
|
||||
voteLtr *ratelimit.Limiter // 10/hour per IP
|
||||
spamFilter *SpamFilter // word/spam filter for feedback
|
||||
}
|
||||
|
|
@ -94,6 +95,10 @@ func (s *Server) RegisterRoutes(mux *http.ServeMux) {
|
|||
})
|
||||
mux.HandleFunc("POST /api/vote/map", mapVoteMW(http.HandlerFunc(s.handleMapVote)).ServeHTTP)
|
||||
mux.HandleFunc("GET /api/vote/map/", s.handleGetMapVotes)
|
||||
|
||||
// User-requested replay enrichment — 5/day per bot (§13.3)
|
||||
// Rate limiting is handled inside the handler after bot authentication
|
||||
mux.HandleFunc("POST /api/request-enrichment", s.handleRequestEnrichment)
|
||||
}
|
||||
|
||||
// ipKey extracts the client IP from the request for rate limiting.
|
||||
|
|
@ -1841,6 +1846,251 @@ func (s *Server) handleGetMapVotes(w http.ResponseWriter, r *http.Request) {
|
|||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
// handleRequestEnrichment handles POST /api/request-enrichment
|
||||
// Allows bot owners to request AI commentary for a specific match replay.
|
||||
// Requires bot authentication via shared_secret.
|
||||
// Rate limited to 5 requests/day per bot.
|
||||
// Returns 202 Accepted with estimated wait time.
|
||||
func (s *Server) handleRequestEnrichment(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
|
||||
return
|
||||
}
|
||||
|
||||
var req struct {
|
||||
MatchID string `json:"match_id"`
|
||||
Secret string `json:"shared_secret"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid request body")
|
||||
return
|
||||
}
|
||||
|
||||
if req.MatchID == "" || req.Secret == "" {
|
||||
writeError(w, http.StatusBadRequest, "match_id and shared_secret are required")
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// First, find the bot by matching the shared_secret
|
||||
// We need to iterate since secrets are encrypted and we can't query by them
|
||||
var botID string
|
||||
var found bool
|
||||
|
||||
rows, err := s.db.QueryContext(ctx, `SELECT bot_id, shared_secret FROM bots WHERE shared_secret IS NOT NULL`)
|
||||
if err != nil {
|
||||
log.Printf("[ENRICHMENT] database error querying bots: %v", err)
|
||||
writeError(w, http.StatusInternalServerError, "database error")
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var id, encrypted string
|
||||
if err := rows.Scan(&id, &encrypted); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Decrypt and compare
|
||||
var storedSecret string
|
||||
if s.cfg.EncryptionKey != "" {
|
||||
storedSecret, err = decryptSecret(encrypted, s.cfg.EncryptionKey)
|
||||
if err != nil {
|
||||
// If decryption fails, try treating it as plaintext
|
||||
storedSecret = encrypted
|
||||
}
|
||||
} else {
|
||||
storedSecret = encrypted
|
||||
}
|
||||
|
||||
if storedSecret == req.Secret {
|
||||
botID = id
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Printf("[ENRICHMENT] error iterating bots: %v", err)
|
||||
}
|
||||
|
||||
if !found {
|
||||
writeError(w, http.StatusUnauthorized, "invalid shared_secret")
|
||||
return
|
||||
}
|
||||
|
||||
// Check rate limit: 5 requests per day per bot
|
||||
_, allowed := s.enrichLtr.AllowN(botID, 1)
|
||||
if !allowed {
|
||||
metrics.RateLimitHits.WithLabelValues("enrichment").Inc()
|
||||
writeError(w, http.StatusTooManyRequests, "rate limit exceeded: 5 enrichment requests per day per bot")
|
||||
return
|
||||
}
|
||||
|
||||
// Check if match exists and is completed
|
||||
var matchStatus string
|
||||
var enrichmentRequestedAt sql.NullTime
|
||||
err = s.db.QueryRowContext(ctx, `
|
||||
SELECT status, enrichment_requested_at FROM matches WHERE match_id = $1
|
||||
`, req.MatchID).Scan(&matchStatus, &enrichmentRequestedAt)
|
||||
if err == sql.ErrNoRows {
|
||||
writeError(w, http.StatusNotFound, "match not found")
|
||||
return
|
||||
} else if err != nil {
|
||||
log.Printf("[ENRICHMENT] database error checking match: %v", err)
|
||||
writeError(w, http.StatusInternalServerError, "database error")
|
||||
return
|
||||
}
|
||||
|
||||
if matchStatus != "completed" {
|
||||
writeError(w, http.StatusConflict, "match must be completed before requesting enrichment")
|
||||
return
|
||||
}
|
||||
|
||||
// Idempotency check: if enrichment was already requested for this match, return success
|
||||
if enrichmentRequestedAt.Valid {
|
||||
// Check if there's an existing enrichment request record
|
||||
var existingRequestID sql.NullString
|
||||
var existingStatus sql.NullString
|
||||
err := s.db.QueryRowContext(ctx, `
|
||||
SELECT request_id, status FROM enrichment_requests
|
||||
WHERE match_id = $1 ORDER BY requested_at DESC LIMIT 1
|
||||
`, req.MatchID).Scan(&existingRequestID, &existingStatus)
|
||||
|
||||
if err == nil {
|
||||
status := existingStatus.String
|
||||
waitSeconds := estimateWaitTime(status)
|
||||
|
||||
respStatus := http.StatusAccepted
|
||||
if status == "completed" {
|
||||
respStatus = http.StatusOK
|
||||
}
|
||||
|
||||
writeJSON(w, respStatus, map[string]interface{}{
|
||||
"status": status,
|
||||
"request_id": existingRequestID.String,
|
||||
"match_id": req.MatchID,
|
||||
"estimated_wait_s": waitSeconds,
|
||||
})
|
||||
return
|
||||
} else if err != sql.ErrNoRows {
|
||||
log.Printf("[ENRICHMENT] database error checking existing request: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Check if there's already a pending request for this match/bot combination
|
||||
var existingRequestID sql.NullString
|
||||
var existingStatus sql.NullString
|
||||
err = s.db.QueryRowContext(ctx, `
|
||||
SELECT request_id, status FROM enrichment_requests
|
||||
WHERE match_id = $1 AND bot_id = $2
|
||||
`, req.MatchID, botID).Scan(&existingRequestID, &existingStatus)
|
||||
if err == nil {
|
||||
// Request already exists from this bot
|
||||
status := existingStatus.String
|
||||
waitSeconds := estimateWaitTime(status)
|
||||
writeJSON(w, http.StatusAccepted, map[string]interface{}{
|
||||
"status": status,
|
||||
"request_id": existingRequestID.String,
|
||||
"match_id": req.MatchID,
|
||||
"estimated_wait_s": waitSeconds,
|
||||
})
|
||||
return
|
||||
} else if err != sql.ErrNoRows {
|
||||
log.Printf("[ENRICHMENT] database error checking existing request: %v", err)
|
||||
writeError(w, http.StatusInternalServerError, "database error")
|
||||
return
|
||||
}
|
||||
|
||||
// Create new enrichment request
|
||||
requestID, err := generateID("req_", 6)
|
||||
if err != nil {
|
||||
log.Printf("[ENRICHMENT] failed to generate request ID: %v", err)
|
||||
writeError(w, http.StatusInternalServerError, "failed to generate request ID")
|
||||
return
|
||||
}
|
||||
|
||||
// Insert the request and update the match's enrichment_requested_at timestamp
|
||||
tx, err := s.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
log.Printf("[ENRICHMENT] failed to begin transaction: %v", err)
|
||||
writeError(w, http.StatusInternalServerError, "database error")
|
||||
return
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
_, err = tx.ExecContext(ctx, `
|
||||
INSERT INTO enrichment_requests (request_id, match_id, bot_id, status, requested_at)
|
||||
VALUES ($1, $2, $3, 'pending', NOW())
|
||||
`, requestID, req.MatchID, botID)
|
||||
if err != nil {
|
||||
log.Printf("[ENRICHMENT] failed to insert enrichment request: %v", err)
|
||||
writeError(w, http.StatusInternalServerError, "failed to create enrichment request")
|
||||
return
|
||||
}
|
||||
|
||||
// Mark the match as having enrichment requested (for idempotency)
|
||||
_, err = tx.ExecContext(ctx, `
|
||||
UPDATE matches SET enrichment_requested_at = NOW() WHERE match_id = $1
|
||||
`, req.MatchID)
|
||||
if err != nil {
|
||||
log.Printf("[ENRICHMENT] failed to update match enrichment_requested_at: %v", err)
|
||||
writeError(w, http.StatusInternalServerError, "failed to update match")
|
||||
return
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
log.Printf("[ENRICHMENT] failed to commit transaction: %v", err)
|
||||
writeError(w, http.StatusInternalServerError, "failed to create enrichment request")
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("[ENRICHMENT] request created: id=%s match=%s bot=%s", requestID, req.MatchID, botID)
|
||||
|
||||
// Enqueue the match for acb-enrichment service by pushing to Valkey
|
||||
if s.rdb != nil {
|
||||
if err := s.enqueueForEnrichment(ctx, req.MatchID); err != nil {
|
||||
log.Printf("[ENRICHMENT] failed to enqueue match for enrichment: %v", err)
|
||||
// Don't fail the request - the enrichment service polls the database anyway
|
||||
}
|
||||
} else {
|
||||
log.Printf("[ENRICHMENT] no Valkey connection, skipping queue enqueue")
|
||||
}
|
||||
|
||||
waitSeconds := estimateWaitTime("pending")
|
||||
|
||||
writeJSON(w, http.StatusAccepted, map[string]interface{}{
|
||||
"status": "pending",
|
||||
"request_id": requestID,
|
||||
"match_id": req.MatchID,
|
||||
"estimated_wait_s": waitSeconds,
|
||||
})
|
||||
}
|
||||
|
||||
// enqueueForEnrichment pushes the match ID to a Valkey queue for the enrichment service.
|
||||
func (s *Server) enqueueForEnrichment(ctx context.Context, matchID string) error {
|
||||
// Push to the enrichment queue in Valkey
|
||||
// The enrichment service will BRPOP from this queue
|
||||
return s.rdb.LPush(ctx, "acb:enrichment:queue", matchID).Err()
|
||||
}
|
||||
|
||||
// estimateWaitTime returns an estimated wait time in seconds based on request status.
|
||||
func estimateWaitTime(status string) int {
|
||||
switch status {
|
||||
case "pending":
|
||||
return 300 // 5 minutes for new requests
|
||||
case "processing":
|
||||
return 60 // 1 minute if already being processed
|
||||
case "completed":
|
||||
return 0 // Already done
|
||||
case "failed":
|
||||
return -1 // Failed - will retry
|
||||
default:
|
||||
return 300 // Default to 5 minutes
|
||||
}
|
||||
}
|
||||
|
||||
func getEnv(key, defaultValue string) string {
|
||||
if v := os.Getenv(key); v != "" {
|
||||
return v
|
||||
|
|
|
|||
|
|
@ -30,6 +30,11 @@ func NewBucket(max, refillPerSec float64) *Bucket {
|
|||
|
||||
// Allow consumes one token. Returns true if a token was available.
|
||||
func (b *Bucket) Allow() bool {
|
||||
return b.AllowN(1)
|
||||
}
|
||||
|
||||
// AllowN consumes n tokens. Returns true if n tokens were available.
|
||||
func (b *Bucket) AllowN(n int) bool {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
|
|
@ -40,10 +45,10 @@ func (b *Bucket) Allow() bool {
|
|||
if b.tokens > b.max {
|
||||
b.tokens = b.max
|
||||
}
|
||||
if b.tokens < 1 {
|
||||
if b.tokens < float64(n) {
|
||||
return false
|
||||
}
|
||||
b.tokens--
|
||||
b.tokens -= float64(n)
|
||||
return true
|
||||
}
|
||||
|
||||
|
|
@ -84,6 +89,11 @@ func NewLimiter(max, refillPerSec float64) *Limiter {
|
|||
|
||||
// Allow checks the bucket for the given key. Creates one if needed.
|
||||
func (l *Limiter) Allow(key string) (*Bucket, bool) {
|
||||
return l.AllowN(key, 1)
|
||||
}
|
||||
|
||||
// AllowN checks if n tokens are available for the given key. Creates a bucket if needed.
|
||||
func (l *Limiter) AllowN(key string, n int) (*Bucket, bool) {
|
||||
l.mu.Lock()
|
||||
b, ok := l.buckets[key]
|
||||
if !ok {
|
||||
|
|
@ -92,7 +102,7 @@ func (l *Limiter) Allow(key string) (*Bucket, bool) {
|
|||
}
|
||||
l.mu.Unlock()
|
||||
|
||||
return b, b.Allow()
|
||||
return b, b.AllowN(n)
|
||||
}
|
||||
|
||||
// Cleanup removes buckets that haven't been used in the given duration.
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue