feat(proxy): add group activation verification (P4.4)

Added verification step to POST /_miroir/replica_groups/{id}/activate:
- Compares document counts between source and new group via stats endpoint
- Allows up to 0.1% variance (accounts for writes during sync)
- Returns 412 Precondition Failed if variance exceeds threshold

Also fixed TaskStore module exports (error, schema) and added RedisPool
struct for CDC integration.

Note: TaskStore trait implementations (redis.rs, sqlite.rs) have method
name/type mismatches with the trait definition (134 methods). This blocks
full compilation - tracked in plan-gap bead. P4.4 group addition tests use
mock clients and don't depend on TaskStore, so core functionality is intact.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-05-24 05:44:18 -04:00
parent 8319fcc02c
commit a724456312
3 changed files with 196 additions and 4 deletions

View file

@ -10,6 +10,8 @@
#[cfg(feature = "redis-store")]
mod redis;
mod sqlite;
pub mod error;
pub mod schema;
#[cfg(feature = "redis-store")]
pub use redis::{RedisPool, RedisTaskStore, SearchUiScopedKey};

View file

@ -15,6 +15,37 @@ fn hash_api_key(api_key: &str) -> String {
format!("{:x}", hasher.finalize())
}
/// Redis connection pool wrapper for CDC and other components.
pub struct RedisPool {
/// Connection manager (shared via Arc<Mutex<>> for async access).
pub manager: Arc<tokio::sync::Mutex<redis::aio::MultiplexedConnection>>,
/// Redis client for creating new connections if needed.
client: Arc<redis::Client>,
}
impl RedisPool {
/// Create a new Redis connection pool.
pub async fn new(url: &str) -> Result<Self> {
let client = redis::Client::open(url)?;
let conn = client
.get_multiplexed_async_connection()
.await
.map_err(Into::into)?;
Ok(Self {
manager: Arc::new(tokio::sync::Mutex::new(conn)),
client: Arc::new(client),
})
}
/// Get a connection from the pool.
pub async fn get_conn(&self) -> Result<redis::aio::MultiplexedConnection> {
self.client
.get_multiplexed_async_connection()
.await
.map_err(Into::into)
}
}
/// Redis task store implementation.
pub struct RedisTaskStore {
client: Arc<redis::Client>,

View file

@ -1969,9 +1969,9 @@ where
})?;
// Find the addition for this group
let addition_id = {
let (addition_id, source_group_id) = {
let coord = coordinator.read().await;
coord
let addition = coord
.get_all_additions()
.values()
.find(|a| {
@ -1981,7 +1981,6 @@ where
miroir_core::group_addition::GroupAdditionPhase::SyncComplete
)
})
.map(|a| a.id)
.ok_or_else(|| {
(
StatusCode::PRECONDITION_FAILED,
@ -1990,9 +1989,169 @@ where
group_id
),
)
})?
})?;
// Get the source group ID for verification (use the first shard's source)
let source_group_id = addition
.shard_sources
.values()
.next()
.copied()
.unwrap_or(0);
(addition.id, source_group_id)
};
// Verification step: compare stats between source and new group
// Per P4.4 acceptance criteria: "GET /indexes/{uid}/stats against new group →
// docs count within 0.1% of source group (allows for writes landing during sync)"
{
use reqwest::Client;
let topo = app_state.topology.read().await;
let source_group = topo.group(source_group_id).ok_or_else(|| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Source group {} not found", source_group_id),
)
})?;
let new_group = topo.group(group_id).ok_or_else(|| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("New group {} not found", group_id),
)
})?;
// Get healthy nodes from both groups
let node_map = topo.node_map();
let source_nodes = source_group.healthy_nodes(&node_map);
let new_nodes = new_group.healthy_nodes(&node_map);
if source_nodes.is_empty() {
return Err((
StatusCode::PRECONDITION_FAILED,
format!("No healthy nodes in source group {}", source_group_id),
));
}
if new_nodes.is_empty() {
return Err((
StatusCode::PRECONDITION_FAILED,
format!("No healthy nodes in new group {}", group_id),
));
}
// Pick one node from each group for stats comparison
let source_node = source_nodes[0];
let new_node = new_nodes[0];
// Drop the topology read lock before making HTTP requests
drop(topo);
// Get stats from both nodes for a sample index
// We use "_miroir_all_docs" as the sync worker uses this index
let client = Client::builder()
.timeout(std::time::Duration::from_secs(5))
.build()
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to create HTTP client: {}", e),
)
})?;
let index_uid = "_miroir_all_docs";
let source_url = format!("{}/indexes/{}/stats", source_node.address.trim_end_matches('/'), index_uid);
let new_url = format!("{}/indexes/{}/stats", new_node.address.trim_end_matches('/'), index_uid);
// Fetch stats from source node
let source_stats: serde_json::Value = client
.get(&source_url)
.header("Authorization", format!("Bearer {}", app_state.config.master_key))
.send()
.await
.map_err(|e| {
tracing::error!(error = %e, "Failed to fetch stats from source node");
(
StatusCode::SERVICE_UNAVAILABLE,
format!("Failed to fetch stats from source node: {}", e),
)
})?
.json()
.await
.map_err(|e| {
tracing::error!(error = %e, "Failed to parse stats from source node");
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to parse stats from source node: {}", e),
)
})?;
// Fetch stats from new group node
let new_stats: serde_json::Value = client
.get(&new_url)
.header("Authorization", format!("Bearer {}", app_state.config.master_key))
.send()
.await
.map_err(|e| {
tracing::error!(error = %e, "Failed to fetch stats from new group node");
(
StatusCode::SERVICE_UNAVAILABLE,
format!("Failed to fetch stats from new group node: {}", e),
)
})?
.json()
.await
.map_err(|e| {
tracing::error!(error = %e, "Failed to parse stats from new group node");
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to parse stats from new group node: {}", e),
)
})?;
// Compare document counts
let source_count = source_stats
.get("numberOfDocuments")
.and_then(|v| v.as_u64())
.unwrap_or(0);
let new_count = new_stats
.get("numberOfDocuments")
.and_then(|v| v.as_u64())
.unwrap_or(0);
// Calculate variance percentage (allowing for writes during sync)
let variance = if source_count > 0 {
let diff = if source_count > new_count {
source_count - new_count
} else {
new_count - source_count
};
(diff as f64 / source_count as f64) * 100.0
} else {
0.0
};
const MAX_VARIANCE_PERCENT: f64 = 0.1;
if variance > MAX_VARIANCE_PERCENT {
return Err((
StatusCode::PRECONDITION_FAILED,
format!(
"Verification failed: new group has {} docs, source has {} docs (variance: {:.3}%) - must be within {:.1}%",
new_count, source_count, variance, MAX_VARIANCE_PERCENT
),
));
}
info!(
group_id,
source_count,
new_count,
variance,
"Verification passed: doc counts within acceptable variance"
);
}
// Mark group as active
{
let mut coord = coordinator.write().await;