ai-code-battle/cmd/acb-worker/metrics.go
jedarden ea04f4debb style: apply gofmt alignment fixes across codebase
Tab/space alignment consistency from running gofmt on all packages.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-24 10:40:33 -04:00

255 lines
7.8 KiB
Go

// Metrics collection and HTTP server for monitoring.
//
// Exposes Prometheus text format metrics at /metrics, plus
// /health and /ready endpoints for K8s probes.
package main
import (
"fmt"
"net/http"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
)
// Metrics collects operational metrics for the match worker.
type Metrics struct {
// Counters
matchesTotal atomic.Int64
matchErrorsTotal atomic.Int64
jobsClaimedTotal atomic.Int64
jobsFailedTotal atomic.Int64
replaysUploaded atomic.Int64
replayUploadErrs atomic.Int64
pollCycles atomic.Int64
heartbeatsSent atomic.Int64
heartbeatErrors atomic.Int64
// Histograms (stored as individual observations)
mu sync.Mutex
matchDurations []float64 // seconds
replayUploadDurations []float64 // seconds
replaySizes []float64 // bytes
// State
startTime time.Time
workerID string
ready atomic.Bool
}
// NewMetrics creates a new Metrics instance.
func NewMetrics(workerID string) *Metrics {
m := &Metrics{
startTime: time.Now(),
workerID: workerID,
matchDurations: make([]float64, 0, 1024),
replayUploadDurations: make([]float64, 0, 1024),
replaySizes: make([]float64, 0, 1024),
}
m.ready.Store(true)
return m
}
// RecordMatch records a completed match.
func (m *Metrics) RecordMatch(duration time.Duration) {
m.matchesTotal.Add(1)
m.mu.Lock()
m.matchDurations = append(m.matchDurations, duration.Seconds())
m.mu.Unlock()
}
// RecordMatchError records a match execution error.
func (m *Metrics) RecordMatchError() {
m.matchErrorsTotal.Add(1)
}
// RecordJobClaimed records a job claim.
func (m *Metrics) RecordJobClaimed() {
m.jobsClaimedTotal.Add(1)
}
// RecordJobFailed records a job failure report.
func (m *Metrics) RecordJobFailed() {
m.jobsFailedTotal.Add(1)
}
// RecordReplayUpload records a successful replay upload.
func (m *Metrics) RecordReplayUpload(duration time.Duration, sizeBytes int) {
m.replaysUploaded.Add(1)
m.mu.Lock()
m.replayUploadDurations = append(m.replayUploadDurations, duration.Seconds())
m.replaySizes = append(m.replaySizes, float64(sizeBytes))
m.mu.Unlock()
}
// RecordReplayUploadError records a replay upload error.
func (m *Metrics) RecordReplayUploadError() {
m.replayUploadErrs.Add(1)
}
// RecordPollCycle records a poll cycle.
func (m *Metrics) RecordPollCycle() {
m.pollCycles.Add(1)
}
// RecordHeartbeat records a heartbeat sent.
func (m *Metrics) RecordHeartbeat() {
m.heartbeatsSent.Add(1)
}
// RecordHeartbeatError records a heartbeat error.
func (m *Metrics) RecordHeartbeatError() {
m.heartbeatErrors.Add(1)
}
// SetReady sets the worker readiness state.
func (m *Metrics) SetReady(ready bool) {
m.ready.Store(ready)
}
// Handler returns an http.Handler serving metrics and health endpoints.
func (m *Metrics) Handler() http.Handler {
mux := http.NewServeMux()
mux.HandleFunc("/metrics", m.handleMetrics)
mux.HandleFunc("/health", m.handleHealth)
mux.HandleFunc("/ready", m.handleReady)
return mux
}
func (m *Metrics) handleHealth(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, `{"status":"ok","worker_id":%q,"uptime_seconds":%.0f}`,
m.workerID, time.Since(m.startTime).Seconds())
}
func (m *Metrics) handleReady(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
if m.ready.Load() {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, `{"status":"ready","worker_id":%q}`, m.workerID)
} else {
w.WriteHeader(http.StatusServiceUnavailable)
fmt.Fprintf(w, `{"status":"not_ready","worker_id":%q}`, m.workerID)
}
}
func (m *Metrics) handleMetrics(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
var b strings.Builder
// Worker info
writeGauge(&b, "acb_worker_info", "Worker metadata", 1,
"worker_id", m.workerID)
writeGauge(&b, "acb_worker_uptime_seconds", "Time since worker started",
time.Since(m.startTime).Seconds())
// Counters
writeCounter(&b, "acb_matches_total", "Total matches executed",
float64(m.matchesTotal.Load()))
writeCounter(&b, "acb_match_errors_total", "Total match execution errors",
float64(m.matchErrorsTotal.Load()))
writeCounter(&b, "acb_jobs_claimed_total", "Total jobs claimed",
float64(m.jobsClaimedTotal.Load()))
writeCounter(&b, "acb_jobs_failed_total", "Total jobs reported as failed",
float64(m.jobsFailedTotal.Load()))
writeCounter(&b, "acb_replays_uploaded_total", "Total replays uploaded to R2",
float64(m.replaysUploaded.Load()))
writeCounter(&b, "acb_replay_upload_errors_total", "Total replay upload errors",
float64(m.replayUploadErrs.Load()))
writeCounter(&b, "acb_poll_cycles_total", "Total job poll cycles",
float64(m.pollCycles.Load()))
writeCounter(&b, "acb_heartbeats_sent_total", "Total heartbeats sent",
float64(m.heartbeatsSent.Load()))
writeCounter(&b, "acb_heartbeat_errors_total", "Total heartbeat errors",
float64(m.heartbeatErrors.Load()))
// Histograms (snapshot under lock)
m.mu.Lock()
matchDurs := make([]float64, len(m.matchDurations))
copy(matchDurs, m.matchDurations)
uploadDurs := make([]float64, len(m.replayUploadDurations))
copy(uploadDurs, m.replayUploadDurations)
replaySizes := make([]float64, len(m.replaySizes))
copy(replaySizes, m.replaySizes)
m.mu.Unlock()
matchBuckets := []float64{1, 5, 10, 30, 60, 120, 300, 600}
writeHistogram(&b, "acb_match_duration_seconds",
"Match execution duration in seconds", matchDurs, matchBuckets)
uploadBuckets := []float64{0.1, 0.5, 1, 2, 5, 10, 30}
writeHistogram(&b, "acb_replay_upload_duration_seconds",
"Replay upload duration in seconds", uploadDurs, uploadBuckets)
sizeBuckets := []float64{1024, 10240, 102400, 1048576, 10485760}
writeHistogram(&b, "acb_replay_size_bytes",
"Replay file size in bytes", replaySizes, sizeBuckets)
w.Write([]byte(b.String()))
}
// writeCounter writes a counter metric in Prometheus text format.
func writeCounter(b *strings.Builder, name, help string, value float64) {
fmt.Fprintf(b, "# HELP %s %s\n", name, help)
fmt.Fprintf(b, "# TYPE %s counter\n", name)
fmt.Fprintf(b, "%s %g\n\n", name, value)
}
// writeGauge writes a gauge metric in Prometheus text format.
func writeGauge(b *strings.Builder, name, help string, value float64, labels ...string) {
fmt.Fprintf(b, "# HELP %s %s\n", name, help)
fmt.Fprintf(b, "# TYPE %s gauge\n", name)
if len(labels) > 0 {
labelStr := formatLabels(labels)
fmt.Fprintf(b, "%s{%s} %g\n\n", name, labelStr, value)
} else {
fmt.Fprintf(b, "%s %g\n\n", name, value)
}
}
// writeHistogram writes a histogram metric in Prometheus text format.
func writeHistogram(b *strings.Builder, name, help string, observations []float64, buckets []float64) {
fmt.Fprintf(b, "# HELP %s %s\n", name, help)
fmt.Fprintf(b, "# TYPE %s histogram\n", name)
sort.Float64s(buckets)
var sum float64
for _, v := range observations {
sum += v
}
sorted := make([]float64, len(observations))
copy(sorted, observations)
sort.Float64s(sorted)
count := len(sorted)
for _, boundary := range buckets {
n := countLE(sorted, boundary)
fmt.Fprintf(b, "%s_bucket{le=\"%g\"} %d\n", name, boundary, n)
}
fmt.Fprintf(b, "%s_bucket{le=\"+Inf\"} %d\n", name, count)
fmt.Fprintf(b, "%s_sum %g\n", name, sum)
fmt.Fprintf(b, "%s_count %d\n\n", name, count)
}
// countLE counts values <= boundary in a sorted slice.
func countLE(sorted []float64, boundary float64) int {
i := sort.Search(len(sorted), func(i int) bool {
return sorted[i] > boundary
})
return i
}
// formatLabels formats label key-value pairs for Prometheus output.
func formatLabels(labels []string) string {
var parts []string
for i := 0; i+1 < len(labels); i += 2 {
parts = append(parts, fmt.Sprintf(`%s=%q`, labels[i], labels[i+1]))
}
return strings.Join(parts, ",")
}