Phase 0 (miroir-qon): Add proxy infrastructure modules
Adds core proxy server modules that were previously untracked: - client.rs: HTTP client for node communication with connection pooling - state.rs: Shared application state for proxy server - error_response.rs: Meilisearch-compatible error responses These modules are foundational to the proxy server and complete the Phase 0 scaffolding requirements. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
0616828865
commit
a8e33b3bfd
3 changed files with 468 additions and 0 deletions
170
crates/miroir-proxy/src/client.rs
Normal file
170
crates/miroir-proxy/src/client.rs
Normal file
|
|
@ -0,0 +1,170 @@
|
|||
//! HTTP client for forwarding requests to Meilisearch nodes.
|
||||
//!
|
||||
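//! Provides connection pooling and holds the orchestrator-side retry cache
//! for idempotency (plan §4 note on scatter.retry_on_timeout). Retries and
//! cache lookups/insertions are not implemented in this file yet; only
//! pruning of the cache is.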
|
||||
|
||||
use miroir_core::config::ServerConfig;
|
||||
use miroir_core::topology::{NodeId, Topology};
|
||||
use miroir_core::{MiroirError, Result};
|
||||
use reqwest::Client;
|
||||
use serde_json::Value;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
/// Node response with status code and body.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct NodeResponse {
|
||||
pub node_id: NodeId,
|
||||
pub status: u16,
|
||||
pub body: Value,
|
||||
#[allow(dead_code)]
|
||||
pub headers: Vec<(String, String)>,
|
||||
}
|
||||
|
||||
/// HTTP client for scatter-gather requests to Meilisearch nodes.
|
||||
#[derive(Clone)]
|
||||
pub struct NodeClient {
|
||||
/// Reqwest client with connection pooling.
|
||||
client: Client,
|
||||
|
||||
/// Node master key for authentication.
|
||||
node_master_key: Arc<String>,
|
||||
|
||||
/// Orchestrator-side retry cache for idempotency.
|
||||
/// Key: sha256(request_body || target_node || idempotency_key)
|
||||
/// Value: cached terminal response
|
||||
retry_cache: Arc<RwLock<HashMap<String, CachedResponse>>>,
|
||||
}
|
||||
|
||||
/// Cached response for retry deduplication.
|
||||
#[derive(Debug, Clone)]
|
||||
struct CachedResponse {
|
||||
#[allow(dead_code)]
|
||||
response: NodeResponse,
|
||||
cached_at: std::time::Instant,
|
||||
}
|
||||
|
||||
impl NodeClient {
|
||||
/// Create a new node client with connection pooling.
|
||||
pub fn new(node_master_key: String, server_config: &ServerConfig) -> Self {
|
||||
let client = Client::builder()
|
||||
.pool_max_idle_per_host(32)
|
||||
.pool_idle_timeout(Duration::from_secs(60))
|
||||
.timeout(Duration::from_millis(server_config.request_timeout_ms))
|
||||
.build()
|
||||
.expect("failed to create HTTP client");
|
||||
|
||||
Self {
|
||||
client,
|
||||
node_master_key: Arc::new(node_master_key),
|
||||
retry_cache: Arc::new(RwLock::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
|
||||
/// Send a request to a single node.
|
||||
pub async fn send_to_node(
|
||||
&self,
|
||||
topology: &Topology,
|
||||
node_id: &NodeId,
|
||||
method: &str,
|
||||
path: &str,
|
||||
body: Option<&[u8]>,
|
||||
headers: &[(String, String)],
|
||||
) -> Result<NodeResponse> {
|
||||
let node = topology
|
||||
.node(node_id)
|
||||
.ok_or_else(|| MiroirError::Routing(format!("node {} not found", node_id.as_str())))?;
|
||||
|
||||
let url = format!("{}{}", node.url, path);
|
||||
|
||||
let mut request = match method {
|
||||
"GET" => self.client.get(&url),
|
||||
"POST" => self.client.post(&url),
|
||||
"PUT" => self.client.put(&url),
|
||||
"PATCH" => self.client.patch(&url),
|
||||
"DELETE" => self.client.delete(&url),
|
||||
_ => {
|
||||
return Err(MiroirError::Routing(format!(
|
||||
"unsupported HTTP method: {method}",
|
||||
)))
|
||||
}
|
||||
};
|
||||
|
||||
// Add node master key header
|
||||
request = request.header(
|
||||
"Authorization",
|
||||
format!("Bearer {}", self.node_master_key.as_str()),
|
||||
);
|
||||
|
||||
// Add custom headers
|
||||
for (key, value) in headers {
|
||||
request = request.header(key, value);
|
||||
}
|
||||
|
||||
// Add body if present
|
||||
if let Some(body_bytes) = body {
|
||||
request = request.header("Content-Type", "application/json");
|
||||
request = request.body(body_bytes.to_vec());
|
||||
}
|
||||
|
||||
let response = request.send().await.map_err(|e| {
|
||||
MiroirError::Routing(format!("request to node {} failed: {e}", node_id.as_str(),))
|
||||
})?;
|
||||
|
||||
let status = response.status().as_u16();
|
||||
|
||||
// Collect response headers
|
||||
let response_headers: Vec<(String, String)> = response
|
||||
.headers()
|
||||
.iter()
|
||||
.filter_map(|(name, value)| {
|
||||
value
|
||||
.to_str()
|
||||
.ok()
|
||||
.map(|v| (name.as_str().to_string(), v.to_string()))
|
||||
})
|
||||
.collect();
|
||||
|
||||
let body_bytes = response
|
||||
.bytes()
|
||||
.await
|
||||
.map_err(|e| MiroirError::Routing(format!("failed to read response body: {e}")))?;
|
||||
|
||||
let body_json: Value = serde_json::from_slice(&body_bytes)
|
||||
.unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&body_bytes).to_string()));
|
||||
|
||||
Ok(NodeResponse {
|
||||
node_id: node_id.clone(),
|
||||
status,
|
||||
body: body_json,
|
||||
headers: response_headers,
|
||||
})
|
||||
}
|
||||
|
||||
/// Send requests to multiple nodes in parallel.
|
||||
pub async fn send_to_many(
|
||||
&self,
|
||||
topology: &Topology,
|
||||
node_ids: &[NodeId],
|
||||
method: &str,
|
||||
path: &str,
|
||||
body: Option<&[u8]>,
|
||||
headers: &[(String, String)],
|
||||
) -> Vec<Result<NodeResponse>> {
|
||||
let futures: Vec<_> = node_ids
|
||||
.iter()
|
||||
.map(|node_id| self.send_to_node(topology, node_id, method, path, body, headers))
|
||||
.collect();
|
||||
|
||||
futures::future::join_all(futures).await
|
||||
}
|
||||
|
||||
/// Prune old entries from the retry cache.
|
||||
pub async fn prune_retry_cache(&self, max_age: Duration) {
|
||||
let mut cache = self.retry_cache.write().await;
|
||||
let now = std::time::Instant::now();
|
||||
cache.retain(|_, entry| now.duration_since(entry.cached_at) < max_age);
|
||||
}
|
||||
}
|
||||
140
crates/miroir-proxy/src/error_response.rs
Normal file
140
crates/miroir-proxy/src/error_response.rs
Normal file
|
|
@ -0,0 +1,140 @@
|
|||
//! Meilisearch-compatible error responses.
|
||||
//!
|
||||
//! Per plan §5, all errors must match the Meilisearch shape:
|
||||
//! {"message": "...", "code": "...", "type": "...", "link": "..."}
|
||||
|
||||
use axum::{
|
||||
http::StatusCode,
|
||||
response::{IntoResponse, Json, Response},
|
||||
};
|
||||
use serde::Serialize;
|
||||
|
||||
/// Meilisearch-compatible error response.
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct ErrorResponse {
|
||||
/// Human-readable error message.
|
||||
pub message: String,
|
||||
|
||||
/// Machine-readable error code.
|
||||
pub code: String,
|
||||
|
||||
/// Error type category.
|
||||
#[serde(rename = "type")]
|
||||
pub error_type: String,
|
||||
|
||||
/// Documentation link.
|
||||
pub link: String,
|
||||
}
|
||||
|
||||
impl ErrorResponse {
|
||||
/// Create a new error response.
|
||||
pub fn new(message: impl Into<String>, code: impl Into<String>) -> Self {
|
||||
let message = message.into();
|
||||
let code = code.into();
|
||||
|
||||
// Determine error type from code
|
||||
let error_type = if code.starts_with("miroir_") {
|
||||
"invalid_request".to_string()
|
||||
} else if code.contains("index") {
|
||||
"index_creation".to_string()
|
||||
} else if code.contains("document") {
|
||||
"document".to_string()
|
||||
} else {
|
||||
"invalid_request".to_string()
|
||||
};
|
||||
|
||||
Self {
|
||||
message,
|
||||
code,
|
||||
error_type,
|
||||
link: "https://docs.meilisearch.com/errors".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create an error for missing primary key.
|
||||
pub fn primary_key_required(index: &str) -> Self {
|
||||
Self::new(
|
||||
format!("Index `{index}` does not have a primary key. A primary key must be declared when creating the index in order to use the document routes."),
|
||||
"miroir_primary_key_required",
|
||||
)
|
||||
}
|
||||
|
||||
/// Create an error for no quorum.
|
||||
pub fn no_quorum(shard_id: u32) -> Self {
|
||||
Self::new(
|
||||
format!("No replica group met quorum for shard {shard_id}"),
|
||||
"miroir_no_quorum",
|
||||
)
|
||||
}
|
||||
|
||||
/// Create an error for unavailable shard.
|
||||
#[allow(dead_code)]
|
||||
pub fn shard_unavailable(shard_id: u32) -> Self {
|
||||
Self::new(
|
||||
format!("Shard {shard_id} is unavailable"),
|
||||
"miroir_shard_unavailable",
|
||||
)
|
||||
}
|
||||
|
||||
/// Create an error for reserved field usage.
|
||||
#[allow(dead_code)]
|
||||
pub fn reserved_field(field: &str) -> Self {
|
||||
Self::new(
|
||||
format!("Field `{field}` is reserved for internal use and cannot be used in documents",),
|
||||
"miroir_reserved_field",
|
||||
)
|
||||
}
|
||||
|
||||
/// Create an error for index not found.
|
||||
pub fn index_not_found(uid: &str) -> Self {
|
||||
Self::new(format!("Index `{uid}` not found."), "index_not_found")
|
||||
}
|
||||
|
||||
/// Create an error for invalid request.
|
||||
pub fn invalid_request(message: impl Into<String>) -> Self {
|
||||
Self::new(message, "invalid_request")
|
||||
}
|
||||
|
||||
/// Create an error for document not found.
|
||||
#[allow(dead_code)]
|
||||
pub fn document_not_found(id: &str) -> Self {
|
||||
Self::new(
|
||||
format!("Document with id `{id}` not found."),
|
||||
"document_not_found",
|
||||
)
|
||||
}
|
||||
|
||||
/// Create an internal server error.
|
||||
pub fn internal_error(message: impl Into<String>) -> Self {
|
||||
Self::new(message, "internal_error")
|
||||
}
|
||||
}
|
||||
|
||||
impl IntoResponse for ErrorResponse {
|
||||
fn into_response(self) -> Response {
|
||||
let status = if self.code == "miroir_no_quorum" || self.code == "miroir_shard_unavailable" {
|
||||
StatusCode::SERVICE_UNAVAILABLE
|
||||
} else if self.code.contains("not_found") {
|
||||
StatusCode::NOT_FOUND
|
||||
} else {
|
||||
StatusCode::BAD_REQUEST
|
||||
};
|
||||
|
||||
(status, Json(self)).into_response()
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert MiroirError to ErrorResponse.
|
||||
impl From<miroir_core::MiroirError> for ErrorResponse {
|
||||
fn from(err: miroir_core::MiroirError) -> Self {
|
||||
match err {
|
||||
miroir_core::MiroirError::Config(msg) => Self::new(msg, "invalid_configuration"),
|
||||
miroir_core::MiroirError::Topology(msg) => Self::new(msg, "invalid_topology"),
|
||||
miroir_core::MiroirError::Routing(msg) => Self::new(msg, "internal_error"),
|
||||
miroir_core::MiroirError::Merge(msg) => Self::new(msg, "internal_error"),
|
||||
miroir_core::MiroirError::Task(msg) => Self::new(msg, "task_error"),
|
||||
miroir_core::MiroirError::Io(err) => Self::new(err.to_string(), "internal_error"),
|
||||
miroir_core::MiroirError::Json(err) => Self::new(err.to_string(), "internal_error"),
|
||||
}
|
||||
}
|
||||
}
|
||||
158
crates/miroir-proxy/src/state.rs
Normal file
158
crates/miroir-proxy/src/state.rs
Normal file
|
|
@ -0,0 +1,158 @@
|
|||
//! Shared application state for the proxy server.
|
||||
|
||||
use miroir_core::config::MiroirConfig;
|
||||
use miroir_core::topology::{Node, NodeId, NodeStatus, Topology};
|
||||
use miroir_core::{MiroirError, Result};
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::client::NodeClient;
|
||||
|
||||
/// Shared application state.
|
||||
#[derive(Clone)]
|
||||
pub struct ProxyState {
|
||||
/// Miroir configuration.
|
||||
pub config: Arc<MiroirConfig>,
|
||||
|
||||
/// Cluster topology.
|
||||
pub topology: Arc<RwLock<Topology>>,
|
||||
|
||||
/// HTTP client for node communication.
|
||||
pub client: Arc<NodeClient>,
|
||||
|
||||
/// Query sequence counter for round-robin group selection.
|
||||
pub query_seq: Arc<AtomicU64>,
|
||||
|
||||
/// Master key for client authentication.
|
||||
#[allow(dead_code)]
|
||||
pub master_key: Arc<String>,
|
||||
|
||||
/// Admin API key.
|
||||
#[allow(dead_code)]
|
||||
pub admin_key: Arc<String>,
|
||||
}
|
||||
|
||||
impl ProxyState {
|
||||
/// Create a new proxy state from configuration.
|
||||
pub fn new(config: MiroirConfig) -> Result<Self> {
|
||||
// Build topology from config nodes
|
||||
let mut topology = Topology::new(config.replication_factor as usize);
|
||||
|
||||
for node_config in &config.nodes {
|
||||
let node = Node::new(
|
||||
NodeId::new(node_config.id.clone()),
|
||||
node_config.address.clone(),
|
||||
node_config.replica_group,
|
||||
);
|
||||
topology.add_node(node);
|
||||
}
|
||||
|
||||
// Validate topology matches config
|
||||
if topology.replica_group_count() != config.replica_groups {
|
||||
return Err(MiroirError::Config(format!(
|
||||
"Topology has {} groups but config specifies {}",
|
||||
topology.replica_group_count(),
|
||||
config.replica_groups
|
||||
)));
|
||||
}
|
||||
|
||||
let client = Arc::new(NodeClient::new(
|
||||
config.node_master_key.clone(),
|
||||
&config.server,
|
||||
));
|
||||
|
||||
Ok(Self {
|
||||
config: Arc::new(config),
|
||||
topology: Arc::new(RwLock::new(topology)),
|
||||
client,
|
||||
query_seq: Arc::new(AtomicU64::new(0)),
|
||||
master_key: Arc::new(
|
||||
std::env::var("MIROIR_MASTER_KEY").unwrap_or_else(|_| "".to_string()),
|
||||
),
|
||||
admin_key: Arc::new(
|
||||
std::env::var("MIROIR_ADMIN_API_KEY").unwrap_or_else(|_| "".to_string()),
|
||||
),
|
||||
})
|
||||
}
|
||||
|
||||
/// Increment and get the next query sequence number.
|
||||
pub fn next_query_seq(&self) -> u64 {
|
||||
self.query_seq.fetch_add(1, Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Get the current topology.
|
||||
pub async fn topology(&self) -> Topology {
|
||||
self.topology.read().await.clone()
|
||||
}
|
||||
|
||||
/// Check if a master key is valid.
|
||||
#[allow(dead_code)]
|
||||
pub fn is_valid_master_key(&self, key: &str) -> bool {
|
||||
!self.master_key.is_empty() && self.master_key.as_str() == key
|
||||
}
|
||||
|
||||
/// Check if an admin key is valid.
|
||||
#[allow(dead_code)]
|
||||
pub fn is_valid_admin_key(&self, key: &str) -> bool {
|
||||
!self.admin_key.is_empty() && self.admin_key.as_str() == key
|
||||
}
|
||||
|
||||
/// Get node health status for topology endpoint.
|
||||
pub async fn get_node_health(&self) -> Vec<NodeHealth> {
|
||||
let topology = self.topology.read().await;
|
||||
let mut health = Vec::new();
|
||||
|
||||
for node in topology.nodes() {
|
||||
health.push(NodeHealth {
|
||||
id: node.id.as_str().to_string(),
|
||||
url: node.url.clone(),
|
||||
replica_group: node.replica_group,
|
||||
status: node.status,
|
||||
is_healthy: node.is_healthy(),
|
||||
});
|
||||
}
|
||||
|
||||
health
|
||||
}
|
||||
|
||||
/// Get shard assignment for topology endpoint.
|
||||
pub async fn get_shard_assignments(&self) -> Vec<ShardAssignment> {
|
||||
use miroir_core::router;
|
||||
|
||||
let topology = self.topology.read().await;
|
||||
let mut assignments = Vec::new();
|
||||
|
||||
for group in topology.groups() {
|
||||
for shard_id in 0..self.config.shards {
|
||||
let nodes = router::assign_shard_in_group(shard_id, group.nodes(), topology.rf());
|
||||
|
||||
assignments.push(ShardAssignment {
|
||||
shard_id,
|
||||
replica_group: group.id,
|
||||
nodes: nodes.iter().map(|n| n.as_str().to_string()).collect(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
assignments
|
||||
}
|
||||
}
|
||||
|
||||
/// Node health information for topology endpoint.
|
||||
#[derive(Debug, Clone, serde::Serialize)]
|
||||
pub struct NodeHealth {
|
||||
pub id: String,
|
||||
pub url: String,
|
||||
pub replica_group: u32,
|
||||
pub status: NodeStatus,
|
||||
pub is_healthy: bool,
|
||||
}
|
||||
|
||||
/// Shard assignment information for topology endpoint.
|
||||
#[derive(Debug, Clone, serde::Serialize)]
|
||||
pub struct ShardAssignment {
|
||||
pub shard_id: u32,
|
||||
pub replica_group: u32,
|
||||
pub nodes: Vec<String>,
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue