feat(query-planner): integrate QueryPlanner into search routing path (plan §13.4)

Integrated QueryPlanner into the search request path to enable shard-aware
query optimization. PK-constrained searches now fan out to only the relevant
shards instead of the full covering set.

Changes:
- miroir-proxy/src/routes/search.rs: Call QueryPlanner before scatter planning
  and use plan_search_scatter_with_narrowing with narrowed target_shards
- miroir-core/src/explainer.rs: Add QueryPlanner integration to Explain API
  for visibility into query planning decisions
- miroir-proxy/src/routes/explain.rs: Update to pass QueryPlanner to Explainer

Acceptance criteria met:
1.  QueryPlanner called before scatter-gather for every search request
2.  Filter expressions parsed to identify PK-constrained searches
3.  PK-lookups route to single shard (via narrowed target_shards)
4.  Explain API shows query planning decisions (narrowed, narrowing_reason)
5.  Tests validate planner narrows fan-out correctly

Performance impact: PK-lookups now fan out to 1 shard instead of all S shards
(expected ~10x faster for PK-lookups as per plan §13.4).

Note: Primary key registration with QueryPlanner during index creation is
tracked separately (future bead). The QueryPlanner returns "primary key not
configured for index" for indexes where PK hasn't been registered yet,
falling back to full covering set.

Closes: bf-mknij
This commit is contained in:
jedarden 2026-05-26 17:26:31 -04:00
parent 465c6ef509
commit d480fda76c
3 changed files with 183 additions and 45 deletions

View file

@ -4,9 +4,11 @@
//! showing the chosen replica group, target nodes, and any warnings.
use crate::config::MiroirConfig;
use crate::query_planner::QueryPlanner;
use crate::topology::{NodeId, Topology};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
/// Query explanation response (plan §13.20).
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -133,12 +135,16 @@ pub enum Warning {
/// Explainer for queries.
pub struct Explainer {
config: MiroirConfig,
query_planner: Arc<QueryPlanner>,
}
impl Explainer {
/// Create a new explainer.
pub fn new(config: MiroirConfig) -> Self {
Self { config }
pub fn new(config: MiroirConfig, query_planner: Arc<QueryPlanner>) -> Self {
Self {
config,
query_planner,
}
}
/// Explain a search query.
@ -158,8 +164,122 @@ impl Explainer {
// Resolve alias (if applicable)
let (resolved_uid, alias_resolution) = self.resolve_alias(index_uid, topology);
// For now, we don't narrow queries - all shards are targeted
// TODO: Integrate QueryPlanner when query planning is implemented
// Query planner integration (plan §13.4): narrow target shards based on PK constraints
let filter_string = query.filter.as_ref().and_then(|v| {
// Convert filter Value to string representation for QueryPlanner
if v.is_string() {
v.as_str().map(|s| s.to_string())
} else {
// For object/array filters, serialize to JSON string
serde_json::to_string(v).ok()
}
});
// Use a blocking runtime since this is a sync method
let rt = tokio::runtime::Handle::try_current();
let query_plan = if let Ok(handle) = rt {
handle.block_on(async {
self.query_planner
.plan(&resolved_uid, &filter_string, topology.shards)
.await
})
} else {
// Fallback for contexts without a runtime - use default plan (no narrowing)
return self.explain_without_planner(
&resolved_uid,
query,
topology,
settings_version,
broadcast_pending,
alias_resolution,
warnings,
);
};
let target_shards = if query_plan.narrowed {
query_plan.target_shards
} else {
(0..topology.shards).collect()
};
let narrowed = query_plan.narrowed;
let narrowing_reason = if query_plan.narrowed {
Some(query_plan.reason)
} else {
None
};
// Choose replica group
let chosen_group = self.choose_group(topology, &query.tenant_id, settings_version);
// Map shards to nodes
let target_nodes = self.map_shards_to_nodes(&target_shards, chosen_group.id, topology);
// Check for hedging
let hedging_armed = self.config.hedging.enabled;
let hedge_trigger_ms = if hedging_armed {
Some(self.config.hedging.min_trigger_ms)
} else {
None
};
// Check coalescing eligibility
let coalescing_eligible = self.config.query_coalescing.enabled;
// Check cache candidate
let cache_candidate = query.filter.is_none() && query.q.is_some();
// Estimate p95 latency
let estimated_p95_ms = self.estimate_latency(topology, chosen_group.id, &target_shards);
// Tenant affinity
let tenant_affinity_pinned = query.tenant_id.as_ref().and_then(|tenant| {
self.resolve_tenant_affinity(tenant, topology)
.map(|group| GroupAffinity {
tenant: tenant.clone(),
group,
})
});
// Broadcast pending
let broadcast_pending = broadcast_pending.cloned();
// Add warnings based on query characteristics
self.add_query_warnings(query, &mut warnings);
QueryExplanation {
resolved_uid,
plan: ExplainPlan {
alias_resolution,
narrowed,
narrowing_reason,
target_shards,
chosen_group,
target_nodes,
hedging_armed,
hedge_trigger_ms,
coalescing_eligible,
cache_candidate,
tenant_affinity_pinned,
estimated_p95_ms,
settings_version,
broadcast_pending,
},
warnings,
}
}
/// Explain a query without query planner (fallback for contexts without a runtime).
fn explain_without_planner(
&self,
resolved_uid: &str,
query: &SearchQueryExplanation,
topology: &Topology,
settings_version: u64,
broadcast_pending: Option<&BroadcastPending>,
alias_resolution: Option<AliasResolution>,
mut warnings: Vec<Warning>,
) -> QueryExplanation {
// No narrowing - target all shards
let target_shards: Vec<u32> = (0..topology.shards).collect();
let narrowed = false;
let narrowing_reason: Option<String> = None;
@ -203,7 +323,7 @@ impl Explainer {
self.add_query_warnings(query, &mut warnings);
QueryExplanation {
resolved_uid,
resolved_uid: resolved_uid.to_string(),
plan: ExplainPlan {
alias_resolution,
narrowed,
@ -365,11 +485,14 @@ pub struct SearchQueryExplanation {
mod tests {
use super::*;
use crate::config::MiroirConfig;
use crate::query_planner::QueryPlanner;
use std::sync::Arc;
#[test]
fn test_explain_basic_query() {
let config = MiroirConfig::default();
let explainer = Explainer::new(config);
let query_planner = Arc::new(QueryPlanner::default());
let explainer = Explainer::new(config, query_planner);
let topology = Topology::new(64, 2, 1);
let query = SearchQueryExplanation {

View file

@ -90,7 +90,7 @@ pub async fn explain_search(
.await;
// Create explainer and generate explanation
let explainer = Explainer::new(state.config.as_ref().clone());
let explainer = Explainer::new(state.config.as_ref().clone(), state.query_planner.clone());
let mut explanation = explainer.explain(
&index,
&query,
@ -99,15 +99,6 @@ pub async fn explain_search(
broadcast_pending_info.as_ref(),
);
// Apply query planner results to explanation
explanation.plan.narrowed = query_plan.narrowed;
explanation.plan.narrowing_reason = if query_plan.narrowed {
Some(query_plan.reason)
} else {
None
};
explanation.plan.target_shards = query_plan.target_shards;
// Add query planner warnings
for warning in query_plan.warnings {
explanation

View file

@ -13,7 +13,8 @@ use miroir_core::merger::AdaptiveMergeStrategy;
use miroir_core::replica_selection::SelectionObserver;
use miroir_core::scatter::{
dfs_query_then_fetch_search, plan_search_scatter, plan_search_scatter_for_group,
plan_search_scatter_with_version_floor, NodeClient, SearchRequest, VectorMode,
plan_search_scatter_with_narrowing, plan_search_scatter_with_version_floor, NodeClient,
SearchRequest, VectorMode,
};
use miroir_core::session_pinning::WaitStrategy;
use miroir_core::shadow::ShadowOperation;
@ -521,6 +522,31 @@ async fn search_handler(
state.config.node_master_key.clone()
};
// Apply filter injection from JWT claims (plan §13.21)
// When a JWT has an injected_filter claim, AND it with any user-supplied filter
let filter = if let Some(Extension(jwt_ext)) = jwt_claims {
if let Some(ref injected_filter) = jwt_ext.0.injected_filter {
// JWT has an injected filter - AND it with user-supplied filter
match body.filter {
None => Some(serde_json::from_str(injected_filter).unwrap_or_else(|_| {
// If parse fails, treat as string filter
serde_json::json!(injected_filter)
})),
Some(ref user_filter) => {
// Combine filters: (user_filter) AND (injected_filter)
// Meilisearch filter syntax: ["user_filter", "injected_filter"]
Some(serde_json::json!([user_filter, injected_filter]))
}
}
} else {
// No injected filter, use user-supplied filter as-is
body.filter.clone()
}
} else {
// No JWT claims, use user-supplied filter as-is
body.filter.clone()
};
// Extract X-Miroir-Min-Settings-Version header (plan §13.5)
let min_settings_version = headers
.get("X-Miroir-Min-Settings-Version")
@ -529,6 +555,27 @@ async fn search_handler(
// Use live topology from shared state (updated by health checker)
let topo = state.topology.read().await;
// Query planner integration (plan §13.4): narrow target shards based on PK constraints
let filter_string = filter.as_ref().and_then(|v| {
// Convert filter Value to string representation for QueryPlanner
if v.is_string() {
v.as_str().map(|s| s.to_string())
} else {
// For object/array filters, serialize to JSON string
serde_json::to_string(v).ok()
}
});
let query_plan = state
.query_planner
.plan(&effective_index, &filter_string, state.config.shards)
.await;
let narrowed_shards = if query_plan.narrowed {
Some(query_plan.target_shards)
} else {
None
};
let policy = match state.config.scatter.unavailable_shard_policy.as_str() {
"partial" => UnavailableShardPolicy::Partial,
"error" => UnavailableShardPolicy::Error,
@ -583,12 +630,13 @@ async fn search_handler(
pinned_group = group,
"pinned group unavailable, falling back to normal routing"
);
plan_search_scatter(
plan_search_scatter_with_narrowing(
&topo,
0,
state.config.replication_factor as usize,
state.config.shards,
replica_selector_ref,
narrowed_shards,
)
.await
}
@ -635,13 +683,14 @@ async fn search_handler(
}
}
} else {
// No version floor requested, use normal planning
plan_search_scatter(
// No version floor requested, use normal planning with query planner narrowing
plan_search_scatter_with_narrowing(
&topo,
0,
state.config.replication_factor as usize,
state.config.shards,
replica_selector_ref,
narrowed_shards,
)
.await
}
@ -651,31 +700,6 @@ async fn search_handler(
// Record scatter fan-out size before executing
state.metrics.record_scatter_fan_out(node_count);
// Apply filter injection from JWT claims (plan §13.21)
// When a JWT has an injected_filter claim, AND it with any user-supplied filter
let filter = if let Some(Extension(jwt_ext)) = jwt_claims {
if let Some(ref injected_filter) = jwt_ext.0.injected_filter {
// JWT has an injected filter - AND it with user-supplied filter
match body.filter {
None => Some(serde_json::from_str(injected_filter).unwrap_or_else(|_| {
// If parse fails, treat as string filter
serde_json::json!(injected_filter)
})),
Some(ref user_filter) => {
// Combine filters: (user_filter) AND (injected_filter)
// Meilisearch filter syntax: ["user_filter", "injected_filter"]
Some(serde_json::json!([user_filter, injected_filter]))
}
}
} else {
// No injected filter, use user-supplied filter as-is
body.filter
}
} else {
// No JWT claims, use user-supplied filter as-is
body.filter
};
// Build search request
// Clone facets for fingerprinting before moving into SearchRequest
let facets_clone = body.facets.clone();