diff --git a/crates/miroir-core/src/scatter.rs b/crates/miroir-core/src/scatter.rs index 48c1496..64560ec 100644 --- a/crates/miroir-core/src/scatter.rs +++ b/crates/miroir-core/src/scatter.rs @@ -1,6 +1,7 @@ //! Scatter orchestration: fan-out logic and covering set builder. use crate::config::UnavailableShardPolicy; +use crate::replica_selection::ReplicaSelector; use tracing::{instrument, info_span, Instrument}; use crate::merger::{MergeInput, MergedSearchResult, MergeStrategy, ShardHitPage}; use crate::router::{covering_set, covering_set_with_version_floor, query_group}; @@ -507,6 +508,61 @@ pub fn plan_search_scatter_for_group( }) } +/// Plan search scatter using adaptive replica selection (plan §13.3). +/// +/// Uses EWMA-based scoring to select the best replica for each shard, +/// falling back to round-robin for shards with no metrics data. +#[instrument(skip_all, fields(query_seq, rf, shard_count))] +pub async fn plan_search_scatter_adaptive( + topology: &Topology, + query_seq: u64, + rf: usize, + shard_count: u32, + replica_selector: &ReplicaSelector, +) -> ScatterPlan { + let chosen_group = query_group(query_seq, topology.replica_group_count()); + + let group = match topology.group(chosen_group) { + Some(g) => g, + None => { + return ScatterPlan { + chosen_group, + target_shards: Vec::new(), + shard_to_node: HashMap::new(), + deadline_ms: 0, + hedging_eligible: false, + }; + } + }; + + let mut shard_to_node = HashMap::new(); + for shard_id in 0..shard_count { + let replicas = crate::router::assign_shard_in_group(shard_id, group.nodes(), rf); + if replicas.is_empty() { + continue; + } + + // Use adaptive selection to pick the best replica + let selected = match replica_selector.select(&replicas, chosen_group).await { + Some(node) => node, + None => { + // Fallback to round-robin if selector returns None + replicas[(query_seq as usize) % replicas.len()].clone() + } + }; + + shard_to_node.insert(shard_id, selected); + } + + ScatterPlan { + chosen_group, + target_shards: (0..shard_count).collect(), + shard_to_node, + deadline_ms: 5000, + hedging_eligible: group.node_count() > 1, + } +} + #[instrument(skip_all, fields(node_count))] pub async fn execute_scatter( plan: ScatterPlan, diff --git a/crates/miroir-proxy/src/middleware.rs b/crates/miroir-proxy/src/middleware.rs index 9334eaa..7ca8d1a 100644 --- a/crates/miroir-proxy/src/middleware.rs +++ b/crates/miroir-proxy/src/middleware.rs @@ -291,6 +291,10 @@ pub struct Metrics { antientropy_mismatches_found_total: Counter, antientropy_docs_repaired_total: Counter, antientropy_last_scan_completed_seconds: Gauge, + + // ── §13.3 Adaptive replica selection metrics (always present) ── + replica_selection_score: GaugeVec, + replica_selection_exploration_total: Counter, } impl Clone for Metrics { @@ -380,6 +384,8 @@ impl Clone for Metrics { antientropy_mismatches_found_total: self.antientropy_mismatches_found_total.clone(), antientropy_docs_repaired_total: self.antientropy_docs_repaired_total.clone(), antientropy_last_scan_completed_seconds: self.antientropy_last_scan_completed_seconds.clone(), + replica_selection_score: self.replica_selection_score.clone(), + replica_selection_exploration_total: self.replica_selection_exploration_total.clone(), } } } @@ -949,6 +955,17 @@ impl Metrics { reg!(antientropy_docs_repaired_total); reg!(antientropy_last_scan_completed_seconds); + // ── §13.3 Adaptive replica selection metrics (always present) ── + let replica_selection_score = GaugeVec::new( + Opts::new("miroir_replica_selection_score", "Adaptive replica selection score (lower is better)"), + &["node_id"], + ).expect("create replica_selection_score"); + let replica_selection_exploration_total = Counter::with_opts( + Opts::new("miroir_replica_selection_exploration_total", "Exploration selections (epsilon-greedy random picks)") + ).expect("create replica_selection_exploration_total"); + reg!(replica_selection_score); + reg!(replica_selection_exploration_total); + Self { registry, request_duration, @@ -1034,6 +1051,8 @@ impl Metrics { antientropy_mismatches_found_total, antientropy_docs_repaired_total, antientropy_last_scan_completed_seconds, + replica_selection_score, + replica_selection_exploration_total, } } @@ -1727,6 +1746,16 @@ impl Metrics { self.session_wait_timeout_total.with_label_values(&[strategy]).inc(); } + // ── §13.3 Adaptive replica selection metrics ── + + pub fn set_replica_selection_score(&self, node_id: &str, score: f64) { + self.replica_selection_score.with_label_values(&[node_id]).set(score); + } + + pub fn inc_replica_selection_exploration(&self) { + self.replica_selection_exploration_total.inc(); + } + pub fn registry(&self) -> &Registry { &self.registry }