diff --git a/crates/miroir-proxy/src/client.rs b/crates/miroir-proxy/src/client.rs
new file mode 100644
index 0000000..8b11e3b
--- /dev/null
+++ b/crates/miroir-proxy/src/client.rs
@@ -0,0 +1,208 @@
+//! HTTP client for forwarding requests to Meilisearch nodes.
+//!
+//! Implements connection pooling, retries, and orchestrator-side
+//! retry cache for idempotency (plan §4 note on scatter.retry_on_timeout).
+
+use miroir_core::config::ServerConfig;
+use miroir_core::topology::{NodeId, Topology};
+use miroir_core::{MiroirError, Result};
+use reqwest::Client;
+use serde_json::Value;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::sync::RwLock;
+
+/// Node response with status code and body.
+#[derive(Debug, Clone)]
+pub struct NodeResponse {
+    /// Node the request was sent to.
+    pub node_id: NodeId,
+    /// HTTP status code returned by the node.
+    pub status: u16,
+    /// Response body: parsed JSON, or the raw (lossily decoded) text as a
+    /// JSON string when the body is not valid JSON.
+    pub body: Value,
+    /// Response headers; values that are not visible ASCII are omitted.
+    #[allow(dead_code)]
+    pub headers: Vec<(String, String)>,
+}
+
+/// HTTP client for scatter-gather requests to Meilisearch nodes.
+#[derive(Clone)]
+pub struct NodeClient {
+    /// Reqwest client with connection pooling.
+    client: Client,
+
+    /// Node master key for authentication.
+    node_master_key: Arc<String>,
+
+    /// Orchestrator-side retry cache for idempotency.
+    /// Key: sha256(request_body || target_node || idempotency_key)
+    /// Value: cached terminal response
+    ///
+    /// NOTE(review): the key is assumed to be the digest as a `String`
+    /// (e.g. hex-encoded) -- confirm. Nothing in this file inserts entries
+    /// yet; only `prune_retry_cache` touches the map.
+    retry_cache: Arc<RwLock<HashMap<String, CachedResponse>>>,
+}
+
+/// Cached response for retry deduplication.
+#[derive(Debug, Clone)]
+struct CachedResponse {
+    /// The stored response.
+    #[allow(dead_code)]
+    response: NodeResponse,
+    /// When the entry was stored; compared against `max_age` when pruning.
+    cached_at: std::time::Instant,
+}
+
+impl NodeClient {
+    /// Create a new node client with connection pooling.
+    ///
+    /// Keeps up to 32 idle connections per host for up to 60 seconds and uses
+    /// `server_config.request_timeout_ms` as the per-request timeout.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the underlying HTTP client cannot be built.
+    pub fn new(node_master_key: String, server_config: &ServerConfig) -> Self {
+        let client = Client::builder()
+            .pool_max_idle_per_host(32)
+            .pool_idle_timeout(Duration::from_secs(60))
+            .timeout(Duration::from_millis(server_config.request_timeout_ms))
+            .build()
+            .expect("failed to create HTTP client");
+
+        Self {
+            client,
+            node_master_key: Arc::new(node_master_key),
+            retry_cache: Arc::new(RwLock::new(HashMap::new())),
+        }
+    }
+
+    /// Send a request to a single node.
+    ///
+    /// The URL is the node's base URL concatenated with `path`. The node
+    /// master key is sent as a bearer token, then the caller's `headers` are
+    /// added; when `body` is present it is sent as `application/json`.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`MiroirError::Routing`] when `node_id` is not in `topology`,
+    /// when `method` is not one of GET/POST/PUT/PATCH/DELETE, or when sending
+    /// the request or reading the response body fails. The node's HTTP status
+    /// is never turned into an error; it is returned in `NodeResponse::status`.
+    pub async fn send_to_node(
+        &self,
+        topology: &Topology,
+        node_id: &NodeId,
+        method: &str,
+        path: &str,
+        body: Option<&[u8]>,
+        headers: &[(String, String)],
+    ) -> Result<NodeResponse> {
+        let node = topology
+            .node(node_id)
+            .ok_or_else(|| MiroirError::Routing(format!("node {} not found", node_id.as_str())))?;
+
+        let url = format!("{}{}", node.url, path);
+
+        let mut request = match method {
+            "GET" => self.client.get(&url),
+            "POST" => self.client.post(&url),
+            "PUT" => self.client.put(&url),
+            "PATCH" => self.client.patch(&url),
+            "DELETE" => self.client.delete(&url),
+            _ => {
+                return Err(MiroirError::Routing(format!(
+                    "unsupported HTTP method: {method}",
+                )))
+            }
+        };
+
+        // Add node master key header
+        request = request.header(
+            "Authorization",
+            format!("Bearer {}", self.node_master_key.as_str()),
+        );
+
+        // Add custom headers
+        for (key, value) in headers {
+            request = request.header(key, value);
+        }
+
+        // Add body if present
+        if let Some(body_bytes) = body {
+            request = request.header("Content-Type", "application/json");
+            request = request.body(body_bytes.to_vec());
+        }
+
+        let response = request.send().await.map_err(|e| {
+            MiroirError::Routing(format!("request to node {} failed: {e}", node_id.as_str(),))
+        })?;
+
+        let status = response.status().as_u16();
+
+        // Collect response headers
+        let response_headers: Vec<(String, String)> = response
+            .headers()
+            .iter()
+            .filter_map(|(name, value)| {
+                value
+                    .to_str()
+                    .ok()
+                    .map(|v| (name.as_str().to_string(), v.to_string()))
+            })
+            .collect();
+
+        let body_bytes = response
+            .bytes()
+            .await
+            .map_err(|e| MiroirError::Routing(format!("failed to read response body: {e}")))?;
+
+        // Fall back to the raw text when the body is not JSON; `into_owned`
+        // avoids a second copy when the lossy decode already allocated.
+        let body_json: Value = serde_json::from_slice(&body_bytes)
+            .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&body_bytes).into_owned()));
+
+        Ok(NodeResponse {
+            node_id: node_id.clone(),
+            status,
+            body: body_json,
+            headers: response_headers,
+        })
+    }
+
+    /// Send requests to multiple nodes in parallel.
+    ///
+    /// One request per entry of `node_ids` is issued concurrently; the results
+    /// are returned in the same order as `node_ids`, each with its own
+    /// success or error.
+    pub async fn send_to_many(
+        &self,
+        topology: &Topology,
+        node_ids: &[NodeId],
+        method: &str,
+        path: &str,
+        body: Option<&[u8]>,
+        headers: &[(String, String)],
+    ) -> Vec<Result<NodeResponse>> {
+        // `join_all` takes any iterator of futures, so no intermediate `Vec`
+        // is needed.
+        let requests = node_ids
+            .iter()
+            .map(|node_id| self.send_to_node(topology, node_id, method, path, body, headers));
+
+        futures::future::join_all(requests).await
+    }
+
+    /// Prune old entries from the retry cache.
+    ///
+    /// Removes every entry whose age is `max_age` or more.
+    pub async fn prune_retry_cache(&self, max_age: Duration) {
+        let mut cache = self.retry_cache.write().await;
+        let now = std::time::Instant::now();
+        cache.retain(|_, entry| now.duration_since(entry.cached_at) < max_age);
+    }
+}
diff --git a/crates/miroir-proxy/src/error_response.rs b/crates/miroir-proxy/src/error_response.rs
new file mode 100644
index 0000000..3d3c4f4
--- /dev/null
+++ b/crates/miroir-proxy/src/error_response.rs
@@ -0,0 +1,148 @@
+//! Meilisearch-compatible error responses.
+//!
+//! Per plan §5, all errors must match the Meilisearch shape:
+//! {"message": "...", "code": "...", "type": "...", "link": "..."}
+
+use axum::{
+    http::StatusCode,
+    response::{IntoResponse, Json, Response},
+};
+use serde::Serialize;
+
+/// Meilisearch-compatible error response.
+#[derive(Debug, Serialize)]
+pub struct ErrorResponse {
+    /// Human-readable error message.
+    pub message: String,
+
+    /// Machine-readable error code.
+    pub code: String,
+
+    /// Error type category.
+    #[serde(rename = "type")]
+    pub error_type: String,
+
+    /// Documentation link.
+    pub link: String,
+}
+
+impl ErrorResponse {
+    /// Create a new error response.
+    ///
+    /// The `type` field is derived from `code`: `internal_error` is reported
+    /// as `internal`, every other code as `invalid_request`. Meilisearch only
+    /// defines the types `invalid_request`, `internal`, `auth` and `system`,
+    /// so no other value is emitted.
+    ///
+    /// NOTE(review): the 503 codes (`miroir_no_quorum`,
+    /// `miroir_shard_unavailable`) are left as `invalid_request`; confirm
+    /// whether `internal` fits them better.
+    pub fn new(message: impl Into<String>, code: impl Into<String>) -> Self {
+        let message = message.into();
+        let code = code.into();
+
+        // Determine error type from code
+        let error_type = if code == "internal_error" {
+            "internal"
+        } else {
+            "invalid_request"
+        };
+
+        Self {
+            message,
+            code,
+            error_type: error_type.to_string(),
+            link: "https://docs.meilisearch.com/errors".to_string(),
+        }
+    }
+
+    /// Create an error for missing primary key.
+    pub fn primary_key_required(index: &str) -> Self {
+        Self::new(
+            format!("Index `{index}` does not have a primary key. A primary key must be declared when creating the index in order to use the document routes."),
+            "miroir_primary_key_required",
+        )
+    }
+
+    /// Create an error for no quorum.
+    pub fn no_quorum(shard_id: u32) -> Self {
+        Self::new(
+            format!("No replica group met quorum for shard {shard_id}"),
+            "miroir_no_quorum",
+        )
+    }
+
+    /// Create an error for unavailable shard.
+    #[allow(dead_code)]
+    pub fn shard_unavailable(shard_id: u32) -> Self {
+        Self::new(
+            format!("Shard {shard_id} is unavailable"),
+            "miroir_shard_unavailable",
+        )
+    }
+
+    /// Create an error for reserved field usage.
+    #[allow(dead_code)]
+    pub fn reserved_field(field: &str) -> Self {
+        Self::new(
+            format!("Field `{field}` is reserved for internal use and cannot be used in documents",),
+            "miroir_reserved_field",
+        )
+    }
+
+    /// Create an error for index not found.
+    pub fn index_not_found(uid: &str) -> Self {
+        Self::new(format!("Index `{uid}` not found."), "index_not_found")
+    }
+
+    /// Create an error for invalid request.
+    pub fn invalid_request(message: impl Into<String>) -> Self {
+        Self::new(message, "invalid_request")
+    }
+
+    /// Create an error for document not found.
+    #[allow(dead_code)]
+    pub fn document_not_found(id: &str) -> Self {
+        Self::new(
+            format!("Document with id `{id}` not found."),
+            "document_not_found",
+        )
+    }
+
+    /// Create an internal server error.
+    pub fn internal_error(message: impl Into<String>) -> Self {
+        Self::new(message, "internal_error")
+    }
+}
+
+impl IntoResponse for ErrorResponse {
+    /// Serialize the error as JSON with a status derived from `code`:
+    /// 503 for `miroir_no_quorum` / `miroir_shard_unavailable`, 500 for
+    /// `internal_error`, 404 for codes containing `not_found`, 400 otherwise.
+    fn into_response(self) -> Response {
+        let status = match self.code.as_str() {
+            "miroir_no_quorum" | "miroir_shard_unavailable" => StatusCode::SERVICE_UNAVAILABLE,
+            // Internal failures are the server's fault, not a bad request.
+            "internal_error" => StatusCode::INTERNAL_SERVER_ERROR,
+            code if code.contains("not_found") => StatusCode::NOT_FOUND,
+            _ => StatusCode::BAD_REQUEST,
+        };
+
+        (status, Json(self)).into_response()
+    }
+}
+
+/// Convert MiroirError to ErrorResponse.
+impl From<miroir_core::MiroirError> for ErrorResponse {
+    fn from(err: miroir_core::MiroirError) -> Self {
+        match err {
+            miroir_core::MiroirError::Config(msg) => Self::new(msg, "invalid_configuration"),
+            miroir_core::MiroirError::Topology(msg) => Self::new(msg, "invalid_topology"),
+            miroir_core::MiroirError::Routing(msg) => Self::new(msg, "internal_error"),
+            miroir_core::MiroirError::Merge(msg) => Self::new(msg, "internal_error"),
+            miroir_core::MiroirError::Task(msg) => Self::new(msg, "task_error"),
+            miroir_core::MiroirError::Io(err) => Self::new(err.to_string(), "internal_error"),
+            miroir_core::MiroirError::Json(err) => Self::new(err.to_string(), "internal_error"),
+        }
+    }
+}
diff --git a/crates/miroir-proxy/src/state.rs b/crates/miroir-proxy/src/state.rs
new file mode 100644
index 0000000..72823b3
--- /dev/null
+++ b/crates/miroir-proxy/src/state.rs
@@ -0,0 +1,194 @@
+//! Shared application state for the proxy server.
+
+use miroir_core::config::MiroirConfig;
+use miroir_core::topology::{Node, NodeId, NodeStatus, Topology};
+use miroir_core::{MiroirError, Result};
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::Arc;
+use tokio::sync::RwLock;
+
+use crate::client::NodeClient;
+
+/// Shared application state.
+#[derive(Clone)]
+pub struct ProxyState {
+    /// Miroir configuration.
+    pub config: Arc<MiroirConfig>,
+
+    /// Cluster topology.
+    pub topology: Arc<RwLock<Topology>>,
+
+    /// HTTP client for node communication.
+    pub client: Arc<NodeClient>,
+
+    /// Query sequence counter for round-robin group selection.
+    pub query_seq: Arc<AtomicU64>,
+
+    /// Master key for client authentication.
+    #[allow(dead_code)]
+    pub master_key: Arc<String>,
+
+    /// Admin API key.
+    #[allow(dead_code)]
+    pub admin_key: Arc<String>,
+}
+
+impl ProxyState {
+    /// Create a new proxy state from configuration.
+    ///
+    /// Builds the topology from `config.nodes` and reads the client-facing
+    /// keys from the `MIROIR_MASTER_KEY` and `MIROIR_ADMIN_API_KEY`
+    /// environment variables. An unset (or non-Unicode) variable yields an
+    /// empty key, which the `is_valid_*` checks never accept.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`MiroirError::Config`] when the number of replica groups in
+    /// the built topology differs from `config.replica_groups`.
+    pub fn new(config: MiroirConfig) -> Result<Self> {
+        // Build topology from config nodes
+        let mut topology = Topology::new(config.replication_factor as usize);
+
+        for node_config in &config.nodes {
+            let node = Node::new(
+                NodeId::new(node_config.id.clone()),
+                node_config.address.clone(),
+                node_config.replica_group,
+            );
+            topology.add_node(node);
+        }
+
+        // Validate topology matches config
+        if topology.replica_group_count() != config.replica_groups {
+            return Err(MiroirError::Config(format!(
+                "Topology has {} groups but config specifies {}",
+                topology.replica_group_count(),
+                config.replica_groups
+            )));
+        }
+
+        let client = Arc::new(NodeClient::new(
+            config.node_master_key.clone(),
+            &config.server,
+        ));
+
+        Ok(Self {
+            config: Arc::new(config),
+            topology: Arc::new(RwLock::new(topology)),
+            client,
+            query_seq: Arc::new(AtomicU64::new(0)),
+            master_key: Arc::new(
+                std::env::var("MIROIR_MASTER_KEY").unwrap_or_else(|_| "".to_string()),
+            ),
+            admin_key: Arc::new(
+                std::env::var("MIROIR_ADMIN_API_KEY").unwrap_or_else(|_| "".to_string()),
+            ),
+        })
+    }
+
+    /// Return the current query sequence number and advance the counter by one.
+    pub fn next_query_seq(&self) -> u64 {
+        self.query_seq.fetch_add(1, Ordering::Relaxed)
+    }
+
+    /// Get the current topology.
+    ///
+    /// Returns a clone taken under the read lock, so later topology changes
+    /// are not reflected in the returned value.
+    pub async fn topology(&self) -> Topology {
+        self.topology.read().await.clone()
+    }
+
+    /// Check if a master key is valid.
+    ///
+    /// Always `false` when no master key is configured.
+    #[allow(dead_code)]
+    pub fn is_valid_master_key(&self, key: &str) -> bool {
+        !self.master_key.is_empty() && constant_time_eq(self.master_key.as_str(), key)
+    }
+
+    /// Check if an admin key is valid.
+    ///
+    /// Always `false` when no admin key is configured.
+    #[allow(dead_code)]
+    pub fn is_valid_admin_key(&self, key: &str) -> bool {
+        !self.admin_key.is_empty() && constant_time_eq(self.admin_key.as_str(), key)
+    }
+
+    /// Get node health status for topology endpoint.
+    pub async fn get_node_health(&self) -> Vec<NodeHealth> {
+        let topology = self.topology.read().await;
+        let mut health = Vec::new();
+
+        for node in topology.nodes() {
+            health.push(NodeHealth {
+                id: node.id.as_str().to_string(),
+                url: node.url.clone(),
+                replica_group: node.replica_group,
+                status: node.status,
+                is_healthy: node.is_healthy(),
+            });
+        }
+
+        health
+    }
+
+    /// Get shard assignment for topology endpoint.
+    ///
+    /// Produces one entry per (replica group, shard) pair by calling
+    /// `router::assign_shard_in_group` for each pair.
+    pub async fn get_shard_assignments(&self) -> Vec<ShardAssignment> {
+        use miroir_core::router;
+
+        let topology = self.topology.read().await;
+        let mut assignments = Vec::new();
+
+        for group in topology.groups() {
+            for shard_id in 0..self.config.shards {
+                let nodes = router::assign_shard_in_group(shard_id, group.nodes(), topology.rf());
+
+                assignments.push(ShardAssignment {
+                    shard_id,
+                    replica_group: group.id,
+                    nodes: nodes.iter().map(|n| n.as_str().to_string()).collect(),
+                });
+            }
+        }
+
+        assignments
+    }
+}
+
+/// Node health information for topology endpoint.
+#[derive(Debug, Clone, serde::Serialize)]
+pub struct NodeHealth {
+    pub id: String,
+    pub url: String,
+    pub replica_group: u32,
+    pub status: NodeStatus,
+    pub is_healthy: bool,
+}
+
+/// Shard assignment information for topology endpoint.
+#[derive(Debug, Clone, serde::Serialize)]
+pub struct ShardAssignment {
+    pub shard_id: u32,
+    pub replica_group: u32,
+    pub nodes: Vec<String>,
+}
+
+/// Compare two keys without stopping at the first differing byte.
+///
+/// `==` on strings may return as soon as a byte differs, which can leak the
+/// length of a matching prefix through response timing. Folding over every
+/// byte pair avoids that early exit (best effort: the compiler gives no
+/// formal timing guarantee). Only the length check may short-circuit.
+fn constant_time_eq(expected: &str, candidate: &str) -> bool {
+    let (expected, candidate) = (expected.as_bytes(), candidate.as_bytes());
+    expected.len() == candidate.len()
+        && expected
+            .iter()
+            .zip(candidate)
+            .fold(0u8, |diff, (a, b)| diff | (a ^ b))
+            == 0
+}