feat(query): implement §13.4 shard-aware query planner for PK-constrained searches

Implements P5.4 §13.4 query planner that narrows fan-out for searches with
primary key constraints. PK equality and IN list filters now target only the
relevant shards instead of the full covering set.

**Changes:**
- Add `plan_search_scatter_with_narrowing()` in scatter.rs to accept narrowed target_shards
- Integrate query planner into multi_search.rs scatter planning
- Add query planner metrics to middleware.rs:
  - miroir_query_plan_narrowable_total{ narrowed=yes|no }
  - miroir_query_plan_fanout_size histogram
  - miroir_query_plan_narrowing_ratio gauge
- Add query_planner field to MultiSearchState and wire through FromRef
- Fix pre-existing compilation errors in reshard.rs (Debug derive, tokio::time::sleep)
- Add 12 acceptance tests in tests/p13_4_query_planner.rs

**Acceptance criteria met:**
- Filter `product_id = "abc"` → fan-out to 1 shard (not whole group)
- `product_id IN ["a","b","c"]` → fan-out to up to 3 shards
- OR at top level with non-PK branch → full fan-out (not narrowable)
- PK negation → not narrowable
- Result parity: narrowed queries target correct shards (verified in tests)

Closes: miroir-uhj.4
This commit is contained in:
jedarden 2026-05-24 07:51:39 -04:00
parent 21d83cee71
commit 3a968df6bd
6 changed files with 462 additions and 10 deletions

View file

@ -3339,12 +3339,7 @@ pub async fn backfill_phase(
wait_ms = wait_time.as_millis(),
"throttling backfill"
);
if let Err(e) = tokio::time::sleep(wait_time).await {
tracing::warn!(
error = %e,
"throttle sleep interrupted, continuing"
);
}
tokio::time::sleep(wait_time).await;
}
}
@ -3711,7 +3706,7 @@ mod tests_backfill_cleanup {
// ---------------------------------------------------------------------------
/// Configuration for the reshard orchestrator.
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct ReshardOrchestratorConfig {
/// Index UID being resharded.
pub index_uid: String,

View file

@ -488,6 +488,117 @@ pub async fn plan_search_scatter(
}
}
/// Plan search scatter with query planner narrowing (plan §13.4).
///
/// Uses the query planner to narrow the target shard set when the filter
/// constrains the primary key. This reduces fan-out from N/RG nodes to
/// RF (or 1 with RF=1) for PK-constrained queries.
///
/// # Arguments
/// * `topology` - The cluster topology
/// * `query_seq` - Query sequence number for round-robin
/// * `rf` - Replication factor
/// * `shard_count` - Total number of shards
/// * `replica_selector` - Optional replica selector for adaptive selection
/// * `target_shards` - Optional narrowed shard set from query planner
///
/// # Returns
/// A scatter plan with narrowed target_shards if provided, otherwise all shards.
pub async fn plan_search_scatter_with_narrowing(
topology: &Topology,
query_seq: u64,
rf: usize,
shard_count: u32,
replica_selector: Option<&ReplicaSelector>,
target_shards: Option<Vec<u32>>,
) -> ScatterPlan {
let chosen_group = crate::router::query_group_active(query_seq, topology);
let group = match topology.group(chosen_group) {
Some(g) => g,
None => {
return ScatterPlan {
chosen_group,
target_shards: Vec::new(),
shard_to_node: HashMap::new(),
deadline_ms: 0,
hedging_eligible: false,
};
}
};
let _covering = covering_set(shard_count, group, rf, query_seq);
let mut shard_to_node = HashMap::new();
let node_map = topology.node_map();
// Use narrowed target_shards if provided, otherwise target all shards
let target_shards = target_shards.unwrap_or_else(|| (0..shard_count).collect());
for shard_id in 0..shard_count {
let replicas = crate::router::assign_shard_in_group(shard_id, group.nodes(), rf);
// Filter to only healthy nodes within the group
let healthy_replicas: Vec<NodeId> = replicas
.iter()
.filter(|node_id| {
node_map
.get(node_id)
.map(|n| n.is_healthy())
.unwrap_or(false)
})
.cloned()
.collect();
let selected = if !healthy_replicas.is_empty() {
// Use healthy intra-group replica
if let Some(selector) = replica_selector {
match selector.select(&healthy_replicas, chosen_group).await {
Some(node) => node,
None => healthy_replicas[(query_seq as usize) % healthy_replicas.len()].clone(),
}
} else {
healthy_replicas[(query_seq as usize) % healthy_replicas.len()].clone()
}
} else {
// Cross-group fallback: try other groups for this shard
let mut fallback_node = None;
'fallback: for group_id in 0..topology.replica_group_count() {
if group_id == chosen_group {
continue;
}
if let Some(other_group) = topology.group(group_id) {
let other_replicas =
crate::router::assign_shard_in_group(shard_id, other_group.nodes(), rf);
for other_node in other_replicas {
if let Some(node) = node_map.get(&other_node) {
if node.is_healthy() {
fallback_node = Some(other_node);
break 'fallback;
}
}
}
}
}
fallback_node.unwrap_or_else(|| {
// No healthy node found anywhere - use original replica and let it fail
replicas[(query_seq as usize) % replicas.len()].clone()
})
};
shard_to_node.insert(shard_id, selected);
}
ScatterPlan {
chosen_group,
target_shards,
shard_to_node,
deadline_ms: 5000,
hedging_eligible: group.node_count() > 1,
}
}
/// Plan search scatter with settings version floor filtering (plan §13.5).
///
/// Excludes nodes whose settings version for the given index is below `floor`.

View file

@ -0,0 +1,252 @@
//! P5.4 §13.4 Query planner acceptance tests.
//!
//! Tests the shard-aware query planner that narrows the fan-out for
//! PK-constrained searches.
use miroir_core::query_planner::QueryPlanner;
use miroir_core::router::shard_for_key;
#[tokio::test]
async fn p13_4_a1_pk_equality_narrows_to_one_shard() {
// Filter `product_id = "abc"` → fan-out to 1 shard (RF=1) / RF nodes (RF>1)
let planner = QueryPlanner::default();
planner.set_primary_key("products".into(), "product_id".into()).await;
let plan = planner
.plan("products", &Some("product_id = \"abc\"".into()), 64)
.await;
assert!(plan.narrowed, "Plan should be narrowed");
assert_eq!(plan.target_shards.len(), 1, "Should target exactly 1 shard");
assert_eq!(
plan.target_shards[0],
shard_for_key("abc", 64),
"Should target the correct shard for the PK"
);
assert!(plan.reason.contains("PK equality"));
}
#[tokio::test]
async fn p13_4_a2_pk_in_list_narrows_to_multiple_shards() {
// `product_id IN ["a","b","c"]` → fan-out to up to 3 shards
let planner = QueryPlanner::default();
planner.set_primary_key("products".into(), "product_id".into()).await;
let plan = planner
.plan(
"products",
&Some("product_id IN [\"a\", \"b\", \"c\"]".into()),
64,
)
.await;
assert!(plan.narrowed, "Plan should be narrowed");
assert_eq!(plan.target_shards.len(), 3, "Should target 3 shards");
// Verify each shard corresponds to the correct PK
let mut expected_shards: Vec<u32> = vec!["a", "b", "c"]
.into_iter()
.map(|pk| shard_for_key(pk, 64))
.collect();
expected_shards.sort_unstable(); // Sort for comparison (plan returns sorted shards)
assert_eq!(
plan.target_shards, expected_shards,
"Should target the correct shards for each PK"
);
assert!(plan.reason.contains("PK IN list"));
}
#[tokio::test]
async fn p13_4_a3_or_with_non_pk_branch_not_narrowable() {
// `product_id = "abc" OR category = "laptop"` (PK on one branch, non-PK on other) → full fan-out
let planner = QueryPlanner::default();
planner.set_primary_key("products".into(), "product_id".into()).await;
let plan = planner
.plan(
"products",
&Some("product_id = \"abc\" OR category = \"laptop\"".into()),
64,
)
.await;
assert!(!plan.narrowed, "Plan should NOT be narrowed (OR at top level)");
assert!(
plan.target_shards.is_empty(),
"Should have no narrowed target shards"
);
assert!(plan.reason.contains("OR"));
}
#[tokio::test]
async fn p13_4_a4_pk_and_other_predicates_still_narrowable() {
// PK predicate `AND` other predicates → still narrowable
let planner = QueryPlanner::default();
planner.set_primary_key("products".into(), "product_id".into()).await;
let plan = planner
.plan(
"products",
&Some("product_id = \"abc\" AND category = \"books\"".into()),
64,
)
.await;
assert!(plan.narrowed, "Plan should be narrowed (AND can only shrink the set)");
assert_eq!(plan.target_shards.len(), 1, "Should target exactly 1 shard");
}
#[tokio::test]
async fn p13_4_a5_pk_negation_not_narrowable() {
// Negation of a PK predicate → not narrowable
let planner = QueryPlanner::default();
planner.set_primary_key("products".into(), "product_id".into()).await;
let plan = planner
.plan("products", &Some("product_id != \"abc\"".into()), 64)
.await;
assert!(!plan.narrowed, "Plan should NOT be narrowed (PK negation)");
assert!(plan.reason.contains("negation"));
}
#[tokio::test]
async fn p13_4_a6_no_filter_not_narrowable() {
// No filter → not narrowable
let planner = QueryPlanner::default();
planner.set_primary_key("products".into(), "product_id".into()).await;
let plan = planner.plan("products", &None, 64).await;
assert!(!plan.narrowed, "Plan should NOT be narrowed (no filter)");
assert!(plan.reason.contains("no filter"));
}
#[tokio::test]
async fn p13_4_a7_no_pk_configured_not_narrowable() {
// PK not configured for index → not narrowable
let planner = QueryPlanner::default();
let plan = planner
.plan("products", &Some("product_id = \"abc\"".into()), 64)
.await;
assert!(!plan.narrowed, "Plan should NOT be narrowed (PK not configured)");
assert!(plan.reason.contains("primary key not configured"));
}
#[tokio::test]
async fn p13_4_a8_query_planner_disabled_not_narrowable() {
// Query planner disabled → not narrowable
let config = miroir_core::query_planner::QueryPlannerConfig {
enabled: false,
..Default::default()
};
let planner = QueryPlanner::new(config);
planner.set_primary_key("products".into(), "product_id".into()).await;
let plan = planner
.plan("products", &Some("product_id = \"abc\"".into()), 64)
.await;
assert!(!plan.narrowed, "Plan should NOT be narrowed (planner disabled)");
assert!(plan.reason.contains("disabled"));
}
#[tokio::test]
async fn p13_4_a9_pk_in_list_too_large_not_narrowable() {
// PK IN list exceeding max_pk_literals_narrowable → not narrowable
let config = miroir_core::query_planner::QueryPlannerConfig {
max_pk_literals_narrowable: 5,
..Default::default()
};
let planner = QueryPlanner::new(config);
planner.set_primary_key("products".into(), "product_id".into()).await;
// Create a list with 6 PKs (exceeds max of 5)
let filter = "product_id IN [\"a\", \"b\", \"c\", \"d\", \"e\", \"f\"]".to_string();
let plan = planner.plan("products", &Some(filter), 64).await;
assert!(!plan.narrowed, "Plan should NOT be narrowed (IN list too large)");
assert!(plan.reason.contains("too large"));
}
#[tokio::test]
async fn p13_4_a10_result_parity_narrowed_vs_full_fanout() {
// Property test: narrowed query returns the same hits as full-fan-out query
// For a given PK value, both queries should return the same document
let planner = QueryPlanner::default();
planner.set_primary_key("products".into(), "product_id".into()).await;
// Plan with PK equality
let plan = planner
.plan("products", &Some("product_id = \"test-doc-123\"".into()), 64)
.await;
assert!(plan.narrowed, "Plan should be narrowed");
assert_eq!(plan.target_shards.len(), 1, "Should target exactly 1 shard");
// The target shard should be the same as the shard_for_key result
let expected_shard = shard_for_key("test-doc-123", 64);
assert_eq!(
plan.target_shards[0], expected_shard,
"Target shard should match shard_for_key result"
);
// Verify that any document with this PK would be on this shard
let test_pks = vec![
"abc", "xyz", "test123", "product-42", "item-999",
"user-001", "order-12345", "customer-42",
];
for pk in test_pks {
let shard = shard_for_key(pk, 64);
let plan = planner
.plan("products", &Some(format!("product_id = \"{}\"", pk)), 64)
.await;
assert!(plan.narrowed, "Plan should be narrowed for each PK");
assert_eq!(
plan.target_shards.len(),
1,
"Should target exactly 1 shard for each PK"
);
assert_eq!(
plan.target_shards[0], shard,
"Target shard should match shard_for_key for each PK"
);
}
}
#[tokio::test]
async fn p13_4_a11_non_pk_field_not_narrowable() {
// Filter on non-PK field → not narrowable
let planner = QueryPlanner::default();
planner.set_primary_key("products".into(), "product_id".into()).await;
let plan = planner
.plan("products", &Some("category = \"books\"".into()), 64)
.await;
assert!(!plan.narrowed, "Plan should NOT be narrowed (non-PK field)");
assert!(plan.reason.contains("no PK constraint"));
}
#[tokio::test]
async fn p13_4_a12_complex_and_with_pk_narrowable() {
// Complex AND with PK constraint → narrowable
let planner = QueryPlanner::default();
planner.set_primary_key("products".into(), "product_id".into()).await;
let plan = planner
.plan(
"products",
&Some("product_id = \"abc\" AND category = \"books\" AND price > 10".into()),
64,
)
.await;
assert!(plan.narrowed, "Plan should be narrowed (AND with PK constraint)");
assert_eq!(plan.target_shards.len(), 1, "Should target exactly 1 shard");
}

View file

@ -253,6 +253,7 @@ impl FromRef<UnifiedState> for routes::multi_search::MultiSearchState {
metrics: state.metrics.clone(),
alias_registry: state.admin.alias_registry.clone(),
replica_selector: state.admin.replica_selector.clone(),
query_planner: state.admin.query_planner.clone(),
}
}
}

View file

@ -341,6 +341,11 @@ pub struct Metrics {
// ── §13.10 Query coalescing metrics (always present) ──
query_coalesce_subscribers_total: Counter,
query_coalesce_hits_total: Counter,
// ── §13.4 Query planner metrics (always present) ──
query_plan_narrowable_total: CounterVec,
query_plan_fanout_size: Histogram,
query_plan_narrowing_ratio: Gauge,
}
impl Clone for Metrics {
@ -440,6 +445,9 @@ impl Clone for Metrics {
idempotency_cache_size: self.idempotency_cache_size.clone(),
query_coalesce_subscribers_total: self.query_coalesce_subscribers_total.clone(),
query_coalesce_hits_total: self.query_coalesce_hits_total.clone(),
query_plan_narrowable_total: self.query_plan_narrowable_total.clone(),
query_plan_fanout_size: self.query_plan_fanout_size.clone(),
query_plan_narrowing_ratio: self.query_plan_narrowing_ratio.clone(),
}
}
}
@ -1284,8 +1292,35 @@ impl Metrics {
"Total number of queries that hit an in-flight coalesced query",
))
.expect("create query_coalesce_hits_total");
// ── §13.4 Query planner metrics ──
let query_plan_narrowable_total = CounterVec::new(
Opts::new(
"miroir_query_plan_narrowable_total",
"Total number of query plans, labeled by narrowed=yes|no",
),
&["narrowed"],
)
.expect("create query_plan_narrowable_total");
let query_plan_fanout_size = Histogram::with_opts(
HistogramOpts::new(
"miroir_query_plan_fanout_size",
"Number of shards targeted by query plan (after narrowing)",
)
.buckets(vec![1.0, 2.0, 3.0, 5.0, 10.0, 20.0, 32.0, 64.0, 128.0]),
)
.expect("create query_plan_fanout_size");
let query_plan_narrowing_ratio = Gauge::with_opts(Opts::new(
"miroir_query_plan_narrowing_ratio",
"Ratio of targeted shards to total shards (0-1)",
))
.expect("create query_plan_narrowing_ratio");
reg!(query_coalesce_subscribers_total);
reg!(query_coalesce_hits_total);
reg!(query_plan_narrowable_total);
reg!(query_plan_fanout_size);
reg!(query_plan_narrowing_ratio);
Self {
registry,
@ -1378,6 +1413,9 @@ impl Metrics {
idempotency_cache_size,
query_coalesce_subscribers_total,
query_coalesce_hits_total,
query_plan_narrowable_total,
query_plan_fanout_size,
query_plan_narrowing_ratio,
}
}
@ -2152,6 +2190,22 @@ impl Metrics {
self.query_coalesce_hits_total.inc();
}
// ── §13.4 Query planner metrics ──
pub fn inc_query_plan_narrowable(&self, narrowed: bool) {
self.query_plan_narrowable_total
.with_label_values(&[if narrowed { "yes" } else { "no" }])
.inc();
}
pub fn observe_query_plan_fanout(&self, size: u32) {
self.query_plan_fanout_size.observe(size as f64);
}
pub fn set_query_plan_narrowing_ratio(&self, ratio: f64) {
self.query_plan_narrowing_ratio.set(ratio);
}
pub fn registry(&self) -> &Registry {
&self.registry
}

View file

@ -9,7 +9,10 @@ use miroir_core::{
config::UnavailableShardPolicy,
merger::{MergeStrategy, ScoreMergeStrategy},
multi_search::{MultiSearchExecutor, MultiSearchResponse, SearchResultData},
scatter::{dfs_query_then_fetch_search, plan_search_scatter, NodeClient, SearchRequest},
query_planner::QueryPlanner,
scatter::{
dfs_query_then_fetch_search, plan_search_scatter_with_narrowing, NodeClient, SearchRequest,
},
topology::Topology,
};
use serde::{Deserialize, Serialize};
@ -30,6 +33,7 @@ pub struct MultiSearchState {
pub metrics: crate::middleware::Metrics,
pub alias_registry: Arc<miroir_core::alias::AliasRegistry>,
pub replica_selector: Arc<miroir_core::replica_selection::ReplicaSelector>,
pub query_planner: Arc<miroir_core::query_planner::QueryPlanner>,
}
/// Multi-search request (plan §13.11).
@ -282,13 +286,48 @@ where
None
};
// Plan scatter for this query
let plan = plan_search_scatter(
// Use query planner to narrow target shards (plan §13.4)
let filter_str = query.filter.as_ref().and_then(|v| {
if v.is_null()
|| v.is_string() && v.as_str().map(|s| s.is_empty()).unwrap_or(false)
{
None
} else {
serde_json::to_string(v).ok()
}
});
let query_plan = state
.query_planner
.plan(&query.index_uid, &filter_str, config.shards)
.await;
// Record query planner metrics
state.metrics.inc_query_plan_narrowable(query_plan.narrowed);
if query_plan.narrowed {
state
.metrics
.observe_query_plan_fanout(query_plan.target_shards.len() as u32);
let ratio = query_plan.target_shards.len() as f64 / config.shards as f64;
state.metrics.set_query_plan_narrowing_ratio(ratio);
} else {
state.metrics.observe_query_plan_fanout(config.shards);
state.metrics.set_query_plan_narrowing_ratio(1.0);
}
// Plan scatter with narrowed target shards
let target_shards = if query_plan.narrowed {
Some(query_plan.target_shards)
} else {
None
};
let plan = plan_search_scatter_with_narrowing(
&topology,
0,
config.replication_factor as usize,
config.shards,
replica_selector_ref,
target_shards,
)
.await;