From a9fa6f6f25faceee2b0b26d0ebcdbbc2e3697eeb Mon Sep 17 00:00:00 2001 From: jedarden Date: Tue, 7 Apr 2026 12:52:27 -0400 Subject: [PATCH] feat: add per-iteration timing and load-shedding to ProcessorManager Add a 5-iteration rolling average timer to Process() with automatic load-shedding levels (0-3) based on pipeline duration thresholds (80ms/90ms/95ms). Recovery steps down when avg drops below 60ms for 10 consecutive iterations. Includes GetShedLevel() getter. Co-Authored-By: Claude Opus 4.6 --- mothership/internal/signal/processor.go | 56 ++++++++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/mothership/internal/signal/processor.go b/mothership/internal/signal/processor.go index eca9420..7fb4b70 100644 --- a/mothership/internal/signal/processor.go +++ b/mothership/internal/signal/processor.go @@ -225,6 +225,11 @@ type ProcessorManager struct { alpha float64 fusionRate float64 // Hz trackedBlobs []TrackedBlob + iterDurations [5]time.Duration // ring buffer for last 5 iteration times + iterIdx int // next write index (mod 5) + iterCount int // how many values filled (0-5) + shedLevel int // current load shedding level (0-3) + steadyCount int // consecutive iters below recovery threshold } // ProcessorManagerConfig holds configuration for ProcessorManager @@ -250,15 +255,64 @@ func NewProcessorManager(cfg ProcessorManagerConfig) *ProcessorManager { // Process processes a CSI frame for a link func (pm *ProcessorManager) Process(linkID string, payload []int8, rssiDBm int8, nSub int, recvTime time.Time) (*ProcessResult, error) { + t0 := time.Now() pm.mu.Lock() processor, exists := pm.processors[linkID] if !exists { processor = NewLinkProcessor(linkID, pm.nSub, pm.alpha) pm.processors[linkID] = processor } + result, err := processor.Process(payload, rssiDBm, nSub, recvTime) + pm.updateShedding(time.Since(t0)) pm.mu.Unlock() + return result, err +} - return processor.Process(payload, rssiDBm, nSub, recvTime) +// updateShedding updates the load-shedding level based on a rolling 5-iteration average. +// Caller must hold pm.mu (write lock). +func (pm *ProcessorManager) updateShedding(elapsed time.Duration) { + pm.iterDurations[pm.iterIdx%5] = elapsed + pm.iterIdx++ + if pm.iterCount < 5 { + pm.iterCount++ + } + + // compute rolling avg + var sum time.Duration + for i := 0; i < pm.iterCount; i++ { + sum += pm.iterDurations[i] + } + avg := sum / time.Duration(pm.iterCount) + + // level up + if avg >= 95*time.Millisecond && pm.shedLevel < 3 { + pm.shedLevel = 3 + pm.steadyCount = 0 + } else if avg >= 90*time.Millisecond && pm.shedLevel < 2 { + pm.shedLevel = 2 + pm.steadyCount = 0 + } else if avg >= 80*time.Millisecond && pm.shedLevel < 1 { + pm.shedLevel = 1 + pm.steadyCount = 0 + } + + // recovery: step down one level when avg < 60ms for 10 consecutive iters + if avg < 60*time.Millisecond { + pm.steadyCount++ + if pm.steadyCount >= 10 && pm.shedLevel > 0 { + pm.shedLevel-- + pm.steadyCount = 0 + } + } else { + pm.steadyCount = 0 + } +} + +// GetShedLevel returns the current load-shedding level (0-3). +func (pm *ProcessorManager) GetShedLevel() int { + pm.mu.RLock() + defer pm.mu.RUnlock() + return pm.shedLevel } // GetProcessor returns the processor for a link, or nil if not exists