From 4ababcedf386e08b608e4afd257e05a09675aa1e Mon Sep 17 00:00:00 2001 From: jedarden Date: Sun, 3 May 2026 20:19:20 -0400 Subject: [PATCH] Fix ProxyNodeClient Clone compilation error in multi_search.rs Wrap metrics in Arc to make ProxyNodeClient cloneable, fixing closure capture issue in multi-search execution. Co-Authored-By: Claude Opus 4.7 --- .../miroir-proxy/src/routes/multi_search.rs | 290 +++++++++++++++--- 1 file changed, 250 insertions(+), 40 deletions(-) diff --git a/crates/miroir-proxy/src/routes/multi_search.rs b/crates/miroir-proxy/src/routes/multi_search.rs index 21ae6b4..5cbb8a6 100644 --- a/crates/miroir-proxy/src/routes/multi_search.rs +++ b/crates/miroir-proxy/src/routes/multi_search.rs @@ -6,20 +6,26 @@ use axum::{ Json, }; use miroir_core::{ - config::MiroirConfig, - scatter::SearchRequest, + config::UnavailableShardPolicy, + merger::{ScoreMergeStrategy, MergeStrategy}, + multi_search::{MultiSearchExecutor, SearchResultData, MultiSearchResponse}, + scatter::{dfs_query_then_fetch_search, plan_search_scatter, SearchRequest, NodeClient}, topology::Topology, }; use serde::{Deserialize, Serialize}; +use serde_json::Value; use std::sync::Arc; +use std::time::Instant; use tokio::sync::RwLock; +use tracing::{debug, instrument}; /// Multi-search state. #[derive(Clone)] pub struct MultiSearchState { - pub config: Arc, + pub config: Arc, pub topology: Arc>, pub node_master_key: String, + pub metrics: crate::middleware::Metrics, } /// Multi-search request (plan §13.11). @@ -42,12 +48,12 @@ pub struct SingleSearchQuery { pub limit: Option, #[serde(skip_serializing_if = "Option::is_none")] pub offset: Option, -} - -/// Multi-search response. -#[derive(Debug, Clone, Serialize)] -pub struct MultiSearchResponse { - pub results: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub facets: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub ranking_score: Option, + #[serde(flatten)] + pub rest: Value, } /// Search response (matches Meilisearch response format). @@ -61,23 +67,73 @@ pub struct SearchResponse { pub query: Option, #[serde(skip_serializing_if = "Option::is_none")] pub facet_distribution: Option>>, + #[serde(skip_serializing_if = "Option::is_none")] + pub degraded: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub failed_shards: Option>, } -/// Result for a single query in the batch. -#[derive(Debug, Clone, Serialize)] -pub struct SingleSearchResult { - pub index_uid: String, - pub status: u16, - #[serde(skip_serializing_if = "Option::is_none")] - pub result: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub error: Option, +/// Node client implementation using the HTTP client. +#[derive(Clone)] +pub struct ProxyNodeClient { + client: Arc, + metrics: Arc, +} + +impl ProxyNodeClient { + pub fn new(client: Arc, metrics: crate::middleware::Metrics) -> Self { + Self { client, metrics: Arc::new(metrics) } + } +} + +#[allow(async_fn_in_trait)] +impl NodeClient for ProxyNodeClient { + async fn search_node( + &self, + node: &miroir_core::topology::NodeId, + address: &str, + request: &SearchRequest, + ) -> std::result::Result { + let start = Instant::now(); + let result = self.client.search_node(node, address, request).await; + let elapsed = start.elapsed().as_secs_f64(); + self.metrics.record_node_request_duration(node.as_str(), "search", elapsed); + if let Err(ref e) = result { + self.metrics.inc_node_errors(node.as_str(), error_label(e)); + } + result + } + + async fn preflight_node( + &self, + node: &miroir_core::topology::NodeId, + address: &str, + request: &miroir_core::scatter::PreflightRequest, + ) -> std::result::Result { + let start = Instant::now(); + let result = self.client.preflight_node(node, address, request).await; + let elapsed = start.elapsed().as_secs_f64(); + self.metrics.record_node_request_duration(node.as_str(), "preflight", elapsed); + if let Err(ref e) = result { + self.metrics.inc_node_errors(node.as_str(), error_label(e)); + } + result + } +} + +fn error_label(e: &miroir_core::scatter::NodeError) -> &'static str { + match e { + miroir_core::scatter::NodeError::NetworkError(_) => "network", + miroir_core::scatter::NodeError::HttpError { .. } => "http", + miroir_core::scatter::NodeError::Timeout => "timeout", + } } /// POST /multi-search — execute multiple searches in a single batch. /// /// Plan §13.11: Reduces round-trips for search UIs that need results + facets /// from multiple queries per page render. Each query runs in parallel. +#[instrument(skip_all, fields(query_count = body.queries.len()))] pub async fn multi_search( State(state): State, Json(body): Json, @@ -98,29 +154,183 @@ where return Err(StatusCode::BAD_REQUEST); } - let mut results = Vec::with_capacity(body.queries.len()); + let executor = MultiSearchExecutor::new(state.config.multi_search.clone()); - // Execute each query in parallel - let _topology = state.topology.read().await; + // Get topology and policy once for all queries + let topology = state.topology.read().await; + let policy = match state.config.scatter.unavailable_shard_policy.as_str() { + "partial" => UnavailableShardPolicy::Partial, + "error" => UnavailableShardPolicy::Error, + "fallback" => UnavailableShardPolicy::Fallback, + _ => return Err(StatusCode::INTERNAL_SERVER_ERROR), + }; - for query in body.queries { - // TODO: Execute actual search against nodes - // For now, return a placeholder response - results.push(SingleSearchResult { - index_uid: query.index_uid.clone(), - status: 200, - result: Some(SearchResponse { - hits: vec![], - estimated_total_hits: 0, - limit: query.limit.unwrap_or(20), - offset: query.offset.unwrap_or(0), - processing_time_ms: 0, - query: query.q.clone(), - ..Default::default() - }), - error: None, - }); - } + // Create node client + let http_client = Arc::new(crate::client::HttpClient::new( + state.node_master_key.clone(), + state.config.scatter.node_timeout_ms, + )); + let node_client = ProxyNodeClient::new(http_client, state.metrics.clone()); + let strategy = ScoreMergeStrategy::new(); - Ok(Json(MultiSearchResponse { results })) + // Convert MultiSearchRequest to core MultiSearchRequest + let core_request = miroir_core::multi_search::MultiSearchRequest { + queries: body.queries.into_iter().map(|q| { + let filter_str = q.filter.as_ref() + .and_then(|v| if v.is_null() || v.is_string() && v.as_str().map(|s| s.is_empty()).unwrap_or(false) { + None + } else { + serde_json::to_string(v).ok() + }); + miroir_core::multi_search::SearchQuery { + indexUid: q.index_uid, + q: q.q, + filter: filter_str, + limit: q.limit, + offset: q.offset, + other: { + let mut map = std::collections::HashMap::new(); + if let Some(sort) = q.sort { + map.insert("sort".to_string(), serde_json::to_value(sort).unwrap()); + } + if let Some(facets) = q.facets { + map.insert("facets".to_string(), serde_json::to_value(facets).unwrap()); + } + if let Some(ranking_score) = q.ranking_score { + map.insert("rankingScore".to_string(), serde_json::to_value(ranking_score).unwrap()); + } + // Add any additional fields from rest + if let Ok(obj) = serde_json::from_value::>(q.rest) { + for (k, v) in obj { + map.entry(k).or_insert(v); + } + } + map + }, + } + }).collect(), + }; + + // Execute multi-search with scatter-gather + let response = executor + .execute(core_request, move |query| { + let topology = topology.clone(); + let node_client = node_client.clone(); + let config = state.config.clone(); + let strategy = strategy.clone(); + let policy = policy; + + async move { + let start = Instant::now(); + + // Plan scatter for this query + let plan = plan_search_scatter( + &topology, + 0, + config.replication_factor as usize, + config.shards, + ); + + // Build search request + let filter_value = query.filter.as_ref() + .and_then(|s| serde_json::from_str::(s).ok()); + let search_req = SearchRequest { + index_uid: query.indexUid.clone(), + query: query.q.clone(), + offset: query.offset.unwrap_or(0), + limit: query.limit.unwrap_or(20), + filter: filter_value, + facets: query.other.get("facets").and_then(|v| { + serde_json::from_value::>(v.clone()).ok() + }), + ranking_score: query.other.get("rankingScore") + .and_then(|v| v.as_bool()) + .unwrap_or(false), + body: serde_json::json!(query.other), + global_idf: None, + }; + + // Execute DFS query-then-fetch + match dfs_query_then_fetch_search( + plan, + &node_client, + search_req, + &topology, + policy, + &strategy as &dyn MergeStrategy, + ) + .await + { + Ok(result) => { + // Strip internal fields from hits + let mut hits = result.hits; + for hit in &mut hits { + if let Some(obj) = hit.as_object_mut() { + obj.remove("_miroir_shard"); + } + } + + let response_limit = query.limit.unwrap_or(20); + let response_offset = query.offset.unwrap_or(0); + + let body = serde_json::json!({ + "hits": hits, + "estimatedTotalHits": result.estimated_total_hits, + "limit": response_limit, + "offset": response_offset, + "processingTimeMs": result.processing_time_ms, + "query": query.q, + }); + + let mut search_response = SearchResponse { + hits, + estimated_total_hits: result.estimated_total_hits, + limit: response_limit, + offset: response_offset, + processing_time_ms: result.processing_time_ms, + query: query.q, + facet_distribution: result.facet_distribution, + degraded: if result.degraded { Some(true) } else { None }, + failed_shards: if !result.failed_shards.is_empty() { + Some(result.failed_shards) + } else { + None + }, + }; + + debug!( + index = %query.indexUid, + duration_ms = start.elapsed().as_millis(), + hits = search_response.hits.len(), + "multi-search query completed" + ); + + Ok(SearchResultData { body: serde_json::to_value(search_response).unwrap() }) + } + Err(e) => { + debug!( + index = %query.indexUid, + error = %e, + "multi-search query failed" + ); + Err(e) + } + } + } + }) + .await + .map_err(|e| { + tracing::error!(error = %e, "multi-search execution failed"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + Ok(Json(response)) +} + +pub fn router() -> axum::Router +where + S: Clone + Send + Sync + 'static, + MultiSearchState: FromRef, +{ + axum::Router::new().route("/", axum::routing::post(multi_search::)) }