feat: add per-iteration timing and load-shedding to ProcessorManager

Add a 5-iteration rolling average timer to Process() with automatic
load-shedding levels (0-3) based on pipeline duration thresholds
(80ms/90ms/95ms). Recovery steps down when avg drops below 60ms for
10 consecutive iterations. Includes GetShedLevel() getter.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-04-07 12:52:27 -04:00
parent 57c27de729
commit a9fa6f6f25

View file

@ -225,6 +225,11 @@ type ProcessorManager struct {
alpha float64
fusionRate float64 // Hz
trackedBlobs []TrackedBlob
iterDurations [5]time.Duration // ring buffer for last 5 iteration times
iterIdx int // next write index (mod 5)
iterCount int // how many values filled (0-5)
shedLevel int // current load shedding level (0-3)
steadyCount int // consecutive iters below recovery threshold
}
// ProcessorManagerConfig holds configuration for ProcessorManager
@ -250,15 +255,64 @@ func NewProcessorManager(cfg ProcessorManagerConfig) *ProcessorManager {
// Process processes a CSI frame for a link
func (pm *ProcessorManager) Process(linkID string, payload []int8, rssiDBm int8, nSub int, recvTime time.Time) (*ProcessResult, error) {
t0 := time.Now()
pm.mu.Lock()
processor, exists := pm.processors[linkID]
if !exists {
processor = NewLinkProcessor(linkID, pm.nSub, pm.alpha)
pm.processors[linkID] = processor
}
result, err := processor.Process(payload, rssiDBm, nSub, recvTime)
pm.updateShedding(time.Since(t0))
pm.mu.Unlock()
return result, err
}
return processor.Process(payload, rssiDBm, nSub, recvTime)
// updateShedding updates the load-shedding level based on a rolling 5-iteration average.
// Caller must hold pm.mu (write lock).
func (pm *ProcessorManager) updateShedding(elapsed time.Duration) {
pm.iterDurations[pm.iterIdx%5] = elapsed
pm.iterIdx++
if pm.iterCount < 5 {
pm.iterCount++
}
// compute rolling avg
var sum time.Duration
for i := 0; i < pm.iterCount; i++ {
sum += pm.iterDurations[i]
}
avg := sum / time.Duration(pm.iterCount)
// level up
if avg >= 95*time.Millisecond && pm.shedLevel < 3 {
pm.shedLevel = 3
pm.steadyCount = 0
} else if avg >= 90*time.Millisecond && pm.shedLevel < 2 {
pm.shedLevel = 2
pm.steadyCount = 0
} else if avg >= 80*time.Millisecond && pm.shedLevel < 1 {
pm.shedLevel = 1
pm.steadyCount = 0
}
// recovery: step down one level when avg < 60ms for 10 consecutive iters
if avg < 60*time.Millisecond {
pm.steadyCount++
if pm.steadyCount >= 10 && pm.shedLevel > 0 {
pm.shedLevel--
pm.steadyCount = 0
}
} else {
pm.steadyCount = 0
}
}
// GetShedLevel returns the current load-shedding level (0-3).
func (pm *ProcessorManager) GetShedLevel() int {
pm.mu.RLock()
defer pm.mu.RUnlock()
return pm.shedLevel
}
// GetProcessor returns the processor for a link, or nil if not exists