diff --git a/crates/miroir-core/src/explainer.rs b/crates/miroir-core/src/explainer.rs index 3d0cf18..59754b1 100644 --- a/crates/miroir-core/src/explainer.rs +++ b/crates/miroir-core/src/explainer.rs @@ -4,9 +4,11 @@ //! showing the chosen replica group, target nodes, and any warnings. use crate::config::MiroirConfig; +use crate::query_planner::QueryPlanner; use crate::topology::{NodeId, Topology}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use std::sync::Arc; /// Query explanation response (plan §13.20). #[derive(Debug, Clone, Serialize, Deserialize)] @@ -133,12 +135,16 @@ pub enum Warning { /// Explainer for queries. pub struct Explainer { config: MiroirConfig, + query_planner: Arc, } impl Explainer { /// Create a new explainer. - pub fn new(config: MiroirConfig) -> Self { - Self { config } + pub fn new(config: MiroirConfig, query_planner: Arc) -> Self { + Self { + config, + query_planner, + } } /// Explain a search query. @@ -158,8 +164,122 @@ impl Explainer { // Resolve alias (if applicable) let (resolved_uid, alias_resolution) = self.resolve_alias(index_uid, topology); - // For now, we don't narrow queries - all shards are targeted - // TODO: Integrate QueryPlanner when query planning is implemented + // Query planner integration (plan §13.4): narrow target shards based on PK constraints + let filter_string = query.filter.as_ref().and_then(|v| { + // Convert filter Value to string representation for QueryPlanner + if v.is_string() { + v.as_str().map(|s| s.to_string()) + } else { + // For object/array filters, serialize to JSON string + serde_json::to_string(v).ok() + } + }); + + // Use a blocking runtime since this is a sync method + let rt = tokio::runtime::Handle::try_current(); + let query_plan = if let Ok(handle) = rt { + handle.block_on(async { + self.query_planner + .plan(&resolved_uid, &filter_string, topology.shards) + .await + }) + } else { + // Fallback for contexts without a runtime - use default plan (no narrowing) + return self.explain_without_planner( + &resolved_uid, + query, + topology, + settings_version, + broadcast_pending, + alias_resolution, + warnings, + ); + }; + + let target_shards = if query_plan.narrowed { + query_plan.target_shards + } else { + (0..topology.shards).collect() + }; + let narrowed = query_plan.narrowed; + let narrowing_reason = if query_plan.narrowed { + Some(query_plan.reason) + } else { + None + }; + + // Choose replica group + let chosen_group = self.choose_group(topology, &query.tenant_id, settings_version); + + // Map shards to nodes + let target_nodes = self.map_shards_to_nodes(&target_shards, chosen_group.id, topology); + + // Check for hedging + let hedging_armed = self.config.hedging.enabled; + let hedge_trigger_ms = if hedging_armed { + Some(self.config.hedging.min_trigger_ms) + } else { + None + }; + + // Check coalescing eligibility + let coalescing_eligible = self.config.query_coalescing.enabled; + + // Check cache candidate + let cache_candidate = query.filter.is_none() && query.q.is_some(); + + // Estimate p95 latency + let estimated_p95_ms = self.estimate_latency(topology, chosen_group.id, &target_shards); + + // Tenant affinity + let tenant_affinity_pinned = query.tenant_id.as_ref().and_then(|tenant| { + self.resolve_tenant_affinity(tenant, topology) + .map(|group| GroupAffinity { + tenant: tenant.clone(), + group, + }) + }); + + // Broadcast pending + let broadcast_pending = broadcast_pending.cloned(); + + // Add warnings based on query characteristics + self.add_query_warnings(query, &mut warnings); + + QueryExplanation { + resolved_uid, + plan: ExplainPlan { + alias_resolution, + narrowed, + narrowing_reason, + target_shards, + chosen_group, + target_nodes, + hedging_armed, + hedge_trigger_ms, + coalescing_eligible, + cache_candidate, + tenant_affinity_pinned, + estimated_p95_ms, + settings_version, + broadcast_pending, + }, + warnings, + } + } + + /// Explain a query without query planner (fallback for contexts without a runtime). + fn explain_without_planner( + &self, + resolved_uid: &str, + query: &SearchQueryExplanation, + topology: &Topology, + settings_version: u64, + broadcast_pending: Option<&BroadcastPending>, + alias_resolution: Option, + mut warnings: Vec, + ) -> QueryExplanation { + // No narrowing - target all shards let target_shards: Vec = (0..topology.shards).collect(); let narrowed = false; let narrowing_reason: Option = None; @@ -203,7 +323,7 @@ impl Explainer { self.add_query_warnings(query, &mut warnings); QueryExplanation { - resolved_uid, + resolved_uid: resolved_uid.to_string(), plan: ExplainPlan { alias_resolution, narrowed, @@ -365,11 +485,14 @@ pub struct SearchQueryExplanation { mod tests { use super::*; use crate::config::MiroirConfig; + use crate::query_planner::QueryPlanner; + use std::sync::Arc; #[test] fn test_explain_basic_query() { let config = MiroirConfig::default(); - let explainer = Explainer::new(config); + let query_planner = Arc::new(QueryPlanner::default()); + let explainer = Explainer::new(config, query_planner); let topology = Topology::new(64, 2, 1); let query = SearchQueryExplanation { diff --git a/crates/miroir-proxy/src/routes/explain.rs b/crates/miroir-proxy/src/routes/explain.rs index a5b2867..f5e429b 100644 --- a/crates/miroir-proxy/src/routes/explain.rs +++ b/crates/miroir-proxy/src/routes/explain.rs @@ -90,7 +90,7 @@ pub async fn explain_search( .await; // Create explainer and generate explanation - let explainer = Explainer::new(state.config.as_ref().clone()); + let explainer = Explainer::new(state.config.as_ref().clone(), state.query_planner.clone()); let mut explanation = explainer.explain( &index, &query, @@ -99,15 +99,6 @@ pub async fn explain_search( broadcast_pending_info.as_ref(), ); - // Apply query planner results to explanation - explanation.plan.narrowed = query_plan.narrowed; - explanation.plan.narrowing_reason = if query_plan.narrowed { - Some(query_plan.reason) - } else { - None - }; - explanation.plan.target_shards = query_plan.target_shards; - // Add query planner warnings for warning in query_plan.warnings { explanation diff --git a/crates/miroir-proxy/src/routes/search.rs b/crates/miroir-proxy/src/routes/search.rs index 3816b40..838e626 100644 --- a/crates/miroir-proxy/src/routes/search.rs +++ b/crates/miroir-proxy/src/routes/search.rs @@ -13,7 +13,8 @@ use miroir_core::merger::AdaptiveMergeStrategy; use miroir_core::replica_selection::SelectionObserver; use miroir_core::scatter::{ dfs_query_then_fetch_search, plan_search_scatter, plan_search_scatter_for_group, - plan_search_scatter_with_version_floor, NodeClient, SearchRequest, VectorMode, + plan_search_scatter_with_narrowing, plan_search_scatter_with_version_floor, NodeClient, + SearchRequest, VectorMode, }; use miroir_core::session_pinning::WaitStrategy; use miroir_core::shadow::ShadowOperation; @@ -521,6 +522,31 @@ async fn search_handler( state.config.node_master_key.clone() }; + // Apply filter injection from JWT claims (plan §13.21) + // When a JWT has an injected_filter claim, AND it with any user-supplied filter + let filter = if let Some(Extension(jwt_ext)) = jwt_claims { + if let Some(ref injected_filter) = jwt_ext.0.injected_filter { + // JWT has an injected filter - AND it with user-supplied filter + match body.filter { + None => Some(serde_json::from_str(injected_filter).unwrap_or_else(|_| { + // If parse fails, treat as string filter + serde_json::json!(injected_filter) + })), + Some(ref user_filter) => { + // Combine filters: (user_filter) AND (injected_filter) + // Meilisearch filter syntax: ["user_filter", "injected_filter"] + Some(serde_json::json!([user_filter, injected_filter])) + } + } + } else { + // No injected filter, use user-supplied filter as-is + body.filter.clone() + } + } else { + // No JWT claims, use user-supplied filter as-is + body.filter.clone() + }; + // Extract X-Miroir-Min-Settings-Version header (plan §13.5) let min_settings_version = headers .get("X-Miroir-Min-Settings-Version") @@ -529,6 +555,27 @@ async fn search_handler( // Use live topology from shared state (updated by health checker) let topo = state.topology.read().await; + + // Query planner integration (plan §13.4): narrow target shards based on PK constraints + let filter_string = filter.as_ref().and_then(|v| { + // Convert filter Value to string representation for QueryPlanner + if v.is_string() { + v.as_str().map(|s| s.to_string()) + } else { + // For object/array filters, serialize to JSON string + serde_json::to_string(v).ok() + } + }); + let query_plan = state + .query_planner + .plan(&effective_index, &filter_string, state.config.shards) + .await; + let narrowed_shards = if query_plan.narrowed { + Some(query_plan.target_shards) + } else { + None + }; + let policy = match state.config.scatter.unavailable_shard_policy.as_str() { "partial" => UnavailableShardPolicy::Partial, "error" => UnavailableShardPolicy::Error, @@ -583,12 +630,13 @@ async fn search_handler( pinned_group = group, "pinned group unavailable, falling back to normal routing" ); - plan_search_scatter( + plan_search_scatter_with_narrowing( &topo, 0, state.config.replication_factor as usize, state.config.shards, replica_selector_ref, + narrowed_shards, ) .await } @@ -635,13 +683,14 @@ async fn search_handler( } } } else { - // No version floor requested, use normal planning - plan_search_scatter( + // No version floor requested, use normal planning with query planner narrowing + plan_search_scatter_with_narrowing( &topo, 0, state.config.replication_factor as usize, state.config.shards, replica_selector_ref, + narrowed_shards, ) .await } @@ -651,31 +700,6 @@ async fn search_handler( // Record scatter fan-out size before executing state.metrics.record_scatter_fan_out(node_count); - // Apply filter injection from JWT claims (plan §13.21) - // When a JWT has an injected_filter claim, AND it with any user-supplied filter - let filter = if let Some(Extension(jwt_ext)) = jwt_claims { - if let Some(ref injected_filter) = jwt_ext.0.injected_filter { - // JWT has an injected filter - AND it with user-supplied filter - match body.filter { - None => Some(serde_json::from_str(injected_filter).unwrap_or_else(|_| { - // If parse fails, treat as string filter - serde_json::json!(injected_filter) - })), - Some(ref user_filter) => { - // Combine filters: (user_filter) AND (injected_filter) - // Meilisearch filter syntax: ["user_filter", "injected_filter"] - Some(serde_json::json!([user_filter, injected_filter])) - } - } - } else { - // No injected filter, use user-supplied filter as-is - body.filter - } - } else { - // No JWT claims, use user-supplied filter as-is - body.filter - }; - // Build search request // Clone facets for fingerprinting before moving into SearchRequest let facets_clone = body.facets.clone();