From a724456312e9cd69fac4ff2f374c188fd4fb2b1b Mon Sep 17 00:00:00 2001 From: jedarden Date: Sun, 24 May 2026 05:44:18 -0400 Subject: [PATCH] feat(proxy): add group activation verification (P4.4) Added verification step to POST /_miroir/replica_groups/{id}/activate: - Compares document counts between source and new group via stats endpoint - Allows up to 0.1% variance (accounts for writes during sync) - Returns 412 Precondition Failed if variance exceeds threshold Also fixed TaskStore module exports (error, schema) and added RedisPool struct for CDC integration. Note: TaskStore trait implementations (redis.rs, sqlite.rs) have method name/type mismatches with the trait definition (134 methods). This blocks full compilation - tracked in plan-gap bead. P4.4 group addition tests use mock clients and don't depend on TaskStore, so core functionality is intact. Co-Authored-By: Claude Opus 4.7 --- crates/miroir-core/src/task_store/mod.rs | 2 + crates/miroir-core/src/task_store/redis.rs | 31 ++++ .../src/routes/admin_endpoints.rs | 167 +++++++++++++++++- 3 files changed, 196 insertions(+), 4 deletions(-) diff --git a/crates/miroir-core/src/task_store/mod.rs b/crates/miroir-core/src/task_store/mod.rs index 30d2956..aa3941a 100644 --- a/crates/miroir-core/src/task_store/mod.rs +++ b/crates/miroir-core/src/task_store/mod.rs @@ -10,6 +10,8 @@ #[cfg(feature = "redis-store")] mod redis; mod sqlite; +pub mod error; +pub mod schema; #[cfg(feature = "redis-store")] pub use redis::{RedisPool, RedisTaskStore, SearchUiScopedKey}; diff --git a/crates/miroir-core/src/task_store/redis.rs b/crates/miroir-core/src/task_store/redis.rs index 83b8a4a..58bb5d1 100644 --- a/crates/miroir-core/src/task_store/redis.rs +++ b/crates/miroir-core/src/task_store/redis.rs @@ -15,6 +15,37 @@ fn hash_api_key(api_key: &str) -> String { format!("{:x}", hasher.finalize()) } +/// Redis connection pool wrapper for CDC and other components. +pub struct RedisPool { + /// Connection manager (shared via Arc> for async access). + pub manager: Arc>, + /// Redis client for creating new connections if needed. + client: Arc, +} + +impl RedisPool { + /// Create a new Redis connection pool. + pub async fn new(url: &str) -> Result { + let client = redis::Client::open(url)?; + let conn = client + .get_multiplexed_async_connection() + .await + .map_err(Into::into)?; + Ok(Self { + manager: Arc::new(tokio::sync::Mutex::new(conn)), + client: Arc::new(client), + }) + } + + /// Get a connection from the pool. + pub async fn get_conn(&self) -> Result { + self.client + .get_multiplexed_async_connection() + .await + .map_err(Into::into) + } +} + /// Redis task store implementation. pub struct RedisTaskStore { client: Arc, diff --git a/crates/miroir-proxy/src/routes/admin_endpoints.rs b/crates/miroir-proxy/src/routes/admin_endpoints.rs index 4873d5c..20fc32b 100644 --- a/crates/miroir-proxy/src/routes/admin_endpoints.rs +++ b/crates/miroir-proxy/src/routes/admin_endpoints.rs @@ -1969,9 +1969,9 @@ where })?; // Find the addition for this group - let addition_id = { + let (addition_id, source_group_id) = { let coord = coordinator.read().await; - coord + let addition = coord .get_all_additions() .values() .find(|a| { @@ -1981,7 +1981,6 @@ where miroir_core::group_addition::GroupAdditionPhase::SyncComplete ) }) - .map(|a| a.id) .ok_or_else(|| { ( StatusCode::PRECONDITION_FAILED, @@ -1990,9 +1989,169 @@ where group_id ), ) - })? + })?; + + // Get the source group ID for verification (use the first shard's source) + let source_group_id = addition + .shard_sources + .values() + .next() + .copied() + .unwrap_or(0); + + (addition.id, source_group_id) }; + // Verification step: compare stats between source and new group + // Per P4.4 acceptance criteria: "GET /indexes/{uid}/stats against new group → + // docs count within 0.1% of source group (allows for writes landing during sync)" + { + use reqwest::Client; + + let topo = app_state.topology.read().await; + let source_group = topo.group(source_group_id).ok_or_else(|| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Source group {} not found", source_group_id), + ) + })?; + let new_group = topo.group(group_id).ok_or_else(|| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("New group {} not found", group_id), + ) + })?; + + // Get healthy nodes from both groups + let node_map = topo.node_map(); + let source_nodes = source_group.healthy_nodes(&node_map); + let new_nodes = new_group.healthy_nodes(&node_map); + + if source_nodes.is_empty() { + return Err(( + StatusCode::PRECONDITION_FAILED, + format!("No healthy nodes in source group {}", source_group_id), + )); + } + if new_nodes.is_empty() { + return Err(( + StatusCode::PRECONDITION_FAILED, + format!("No healthy nodes in new group {}", group_id), + )); + } + + // Pick one node from each group for stats comparison + let source_node = source_nodes[0]; + let new_node = new_nodes[0]; + + // Drop the topology read lock before making HTTP requests + drop(topo); + + // Get stats from both nodes for a sample index + // We use "_miroir_all_docs" as the sync worker uses this index + let client = Client::builder() + .timeout(std::time::Duration::from_secs(5)) + .build() + .map_err(|e| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Failed to create HTTP client: {}", e), + ) + })?; + + let index_uid = "_miroir_all_docs"; + let source_url = format!("{}/indexes/{}/stats", source_node.address.trim_end_matches('/'), index_uid); + let new_url = format!("{}/indexes/{}/stats", new_node.address.trim_end_matches('/'), index_uid); + + // Fetch stats from source node + let source_stats: serde_json::Value = client + .get(&source_url) + .header("Authorization", format!("Bearer {}", app_state.config.master_key)) + .send() + .await + .map_err(|e| { + tracing::error!(error = %e, "Failed to fetch stats from source node"); + ( + StatusCode::SERVICE_UNAVAILABLE, + format!("Failed to fetch stats from source node: {}", e), + ) + })? + .json() + .await + .map_err(|e| { + tracing::error!(error = %e, "Failed to parse stats from source node"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Failed to parse stats from source node: {}", e), + ) + })?; + + // Fetch stats from new group node + let new_stats: serde_json::Value = client + .get(&new_url) + .header("Authorization", format!("Bearer {}", app_state.config.master_key)) + .send() + .await + .map_err(|e| { + tracing::error!(error = %e, "Failed to fetch stats from new group node"); + ( + StatusCode::SERVICE_UNAVAILABLE, + format!("Failed to fetch stats from new group node: {}", e), + ) + })? + .json() + .await + .map_err(|e| { + tracing::error!(error = %e, "Failed to parse stats from new group node"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Failed to parse stats from new group node: {}", e), + ) + })?; + + // Compare document counts + let source_count = source_stats + .get("numberOfDocuments") + .and_then(|v| v.as_u64()) + .unwrap_or(0); + let new_count = new_stats + .get("numberOfDocuments") + .and_then(|v| v.as_u64()) + .unwrap_or(0); + + // Calculate variance percentage (allowing for writes during sync) + let variance = if source_count > 0 { + let diff = if source_count > new_count { + source_count - new_count + } else { + new_count - source_count + }; + (diff as f64 / source_count as f64) * 100.0 + } else { + 0.0 + }; + + const MAX_VARIANCE_PERCENT: f64 = 0.1; + + if variance > MAX_VARIANCE_PERCENT { + return Err(( + StatusCode::PRECONDITION_FAILED, + format!( + "Verification failed: new group has {} docs, source has {} docs (variance: {:.3}%) - must be within {:.1}%", + new_count, source_count, variance, MAX_VARIANCE_PERCENT + ), + )); + } + + info!( + group_id, + source_count, + new_count, + variance, + "Verification passed: doc counts within acceptable variance" + ); + } + // Mark group as active { let mut coord = coordinator.write().await;