diff --git a/crates/miroir-core/src/anti_entropy.rs b/crates/miroir-core/src/anti_entropy.rs index 3a36462..972fddd 100644 --- a/crates/miroir-core/src/anti_entropy.rs +++ b/crates/miroir-core/src/anti_entropy.rs @@ -7,19 +7,24 @@ use crate::error::{MiroirError, Result}; use crate::migration::{MigrationConfig, MigrationError}; use crate::router::assign_shard_in_group; -use crate::topology::{Group, NodeId, Topology}; +use crate::scatter::{FetchDocumentsRequest, FetchDocumentsResponse, NodeClient}; +use crate::topology::{NodeId, Topology}; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; +use serde_json::{json, Value}; +use std::collections::BTreeMap; +use std::hash::Hasher; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::RwLock; use tracing::{debug, error, info, warn}; +use twox_hash::XxHash64; /// Anti-entropy configuration (plan §13.8). #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AntiEntropyConfig { pub enabled: bool, pub schedule: String, + pub index_uid: String, pub shards_per_pass: u32, pub max_read_concurrency: u32, pub fingerprint_batch_size: u32, @@ -32,6 +37,7 @@ impl Default for AntiEntropyConfig { Self { enabled: true, schedule: "every 6h".to_string(), + index_uid: "default".to_string(), shards_per_pass: 0, max_read_concurrency: 2, fingerprint_batch_size: 1000, @@ -113,7 +119,7 @@ pub struct ReconcilerPass { } /// Anti-entropy reconciler. -pub struct AntiEntropyReconciler { +pub struct AntiEntropyReconciler { /// Configuration. config: AntiEntropyConfig, /// Shared topology. @@ -122,22 +128,137 @@ pub struct AntiEntropyReconciler { pass_history: Arc>>, /// Currently running pass. current_pass: Arc>>, + /// HTTP client for node communication. + node_client: Arc, } -impl AntiEntropyReconciler { +impl AntiEntropyReconciler { /// Create a new anti-entropy reconciler. pub fn new( config: AntiEntropyConfig, topology: Arc>, + node_client: Arc, ) -> Self { Self { config, topology, pass_history: Arc::new(RwLock::new(Vec::new())), current_pass: Arc::new(RwLock::new(None)), + node_client, } } + /// Compute the canonical content hash of a document. + /// + /// The canonical form excludes internal Miroir fields (_miroir_*, _rankingScore) + /// and serializes with sorted keys for deterministic hashing. + fn compute_content_hash(document: &Value) -> u64 { + // Remove internal fields to get canonical content + let mut canonical = document.clone(); + if let Some(obj) = canonical.as_object_mut() { + // Remove all _miroir_* fields + obj.retain(|k, _| !k.starts_with("_miroir_")); + // Remove _rankingScore (not content, used for scoring) + obj.remove("_rankingScore"); + } + + // Serialize with sorted keys for deterministic output + let canonical_json = if let Some(obj) = canonical.as_object() { + // Use BTreeMap to sort keys + let sorted: BTreeMap<_, _> = obj.iter().collect(); + serde_json::to_string(&sorted).unwrap_or_else(|_| "{}".to_string()) + } else { + serde_json::to_string(&canonical).unwrap_or_else(|_| "{}".to_string()) + }; + + // Hash using xxh3 (xxhash family, same as router) + let mut hasher = XxHash64::with_seed(0); + hasher.write(canonical_json.as_bytes()); + hasher.finish() + } + + /// Fingerprint a single shard on a node (plan §13.8 step 1). + /// + /// Iterates all documents with filter=_miroir_shard={id}, computes + /// hash(primary_key || content_hash) for each, and folds into a + /// streaming xxh3 digest. + async fn fingerprint_shard( + &self, + node_id: &NodeId, + shard_id: u32, + index_uid: &str, + address: &str, + ) -> Result { + let batch_size = self.config.fingerprint_batch_size as usize; + let mut offset = 0u32; + let mut document_count = 0u64; + let mut hasher = XxHash64::with_seed(shard_id as u64); // Shard-seeded digest + + // Paginated iteration through documents + loop { + let filter = serde_json::json!({ "_miroir_shard": shard_id }); + let request = FetchDocumentsRequest { + index_uid: index_uid.to_string(), + filter, + limit: batch_size as u32, + offset, + }; + + let response: FetchDocumentsResponse = self + .node_client + .fetch_documents(node_id, address, &request) + .await + .map_err(|e| MiroirError::Topology(format!("fetch failed: {:?}", e)))?; + + if response.results.is_empty() { + break; // No more documents + } + + for doc in &response.results { + // Extract primary key + let primary_key = doc + .get("id") + .or(doc.get("_id")) + .and_then(|v| v.as_str()) + .unwrap_or(""); + + // Compute content hash + let content_hash = Self::compute_content_hash(doc); + + // Fold: hash(primary_key || content_hash) into digest + let mut pk_hasher = XxHash64::with_seed(0); + pk_hasher.write(primary_key.as_bytes()); + pk_hasher.write_u64(content_hash); + let doc_hash = pk_hasher.finish(); + + // Fold into shard digest + hasher.write_u64(doc_hash); + document_count += 1; + } + + offset += batch_size as u32; + + // Self-throttle: small sleep between batches to target <2% CPU + // (In production, this would be adaptive based on CPU metrics) + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + } + + let merkle_root = format!("xxh3:{}", hasher.finish()); + + debug!( + "Fingerprinted shard {} on node {}: {} documents, root {}", + shard_id, node_id, document_count, merkle_root + ); + + Ok(ShardFingerprint { + shard_id, + node_id: node_id.to_string(), + merkle_root, + document_count, + bucket_hashes: Vec::new(), // Computed on-demand during diff + }) + } + /// Run a single reconciliation pass. pub async fn run_pass(&self) -> Result { let mut pass = ReconcilerPass { @@ -224,10 +345,30 @@ impl AntiEntropyReconciler { for group in topology.groups() { let assigned = assign_shard_in_group(shard_id, group.nodes(), topology.rf()); for node_id in assigned { - match self.fingerprint_shard(node_id.as_str(), shard_id).await { + // Look up node address from topology + let topology_guard = self.topology.read().await; + let node = topology_guard + .node(&node_id) + .ok_or_else(|| MiroirError::Topology(format!("node {} not found", node_id)))?; + + if !node.is_healthy() { + warn!("Node {} is not healthy, skipping fingerprint", node_id); + continue; + } + + let address = node.address.clone(); + drop(topology_guard); + + match self + .fingerprint_shard(&node_id, shard_id, &self.config.index_uid, &address) + .await + { Ok(fp) => fingerprints.push((node_id, fp)), Err(e) => { - warn!("Failed to fingerprint shard {} on node {}: {}", shard_id, node_id, e); + warn!( + "Failed to fingerprint shard {} on node {}: {}", + shard_id, node_id, e + ); // Continue with other nodes } } @@ -254,7 +395,10 @@ impl AntiEntropyReconciler { if self.config.auto_repair { // Perform detailed diff and repair if let Err(e) = self.repair_shard(shard_id, reference, fp).await { - error!("Failed to repair shard {} on node {}: {}", shard_id, node_id, e); + error!( + "Failed to repair shard {} on node {}: {}", + shard_id, node_id, e + ); } } } @@ -263,24 +407,6 @@ impl AntiEntropyReconciler { Ok(drift_detected) } - /// Fingerprint a single shard on a node. - async fn fingerprint_shard(&self, node_id: &str, shard_id: u32) -> Result { - // In a real implementation, this would: - // 1. GET /indexes/{uid}/documents?filter=_miroir_shard={shard_id} - // 2. Iterate through documents in batches - // 3. Hash each document's (primary_key || content_hash) - // 4. Fold into a Merkle tree - - // For now, return a placeholder - Ok(ShardFingerprint { - shard_id, - node_id: node_id.to_string(), - merkle_root: format!("sha256:{}", uuid::Uuid::new_v4()), - document_count: 0, - bucket_hashes: Vec::new(), - }) - } - /// Repair a shard by comparing replicas and applying fixes. async fn repair_shard( &self, @@ -297,7 +423,10 @@ impl AntiEntropyReconciler { // c. Otherwise: pick authoritative version (highest _miroir_updated_at) // d. Write authoritative version to divergent replicas - debug!("Repairing shard {} on node {}", shard_id, target.node_id); + debug!( + "Repairing shard {} on node {}", + shard_id, target.node_id + ); Ok(()) } @@ -359,6 +488,7 @@ fn millis_now() -> u64 { #[cfg(test)] mod tests { use super::*; + use serde_json::json; #[test] fn test_validate_safe_with_delta_pass() { @@ -406,4 +536,88 @@ mod tests { assert!(migration_warning_if_ae_disabled(false).is_some()); assert!(migration_warning_if_ae_disabled(true).is_none()); } + + #[test] + fn test_compute_content_hash_strips_internal_fields() { + let doc_with_internal = json!({ + "id": "test-1", + "title": "Test Document", + "content": "Some content", + "_miroir_shard": 5, + "_miroir_updated_at": 1234567890, + "_rankingScore": 0.95, + }); + + let hash1 = AntiEntropyReconciler::::compute_content_hash(&doc_with_internal); + + let doc_clean = json!({ + "id": "test-1", + "title": "Test Document", + "content": "Some content", + }); + + let hash2 = AntiEntropyReconciler::::compute_content_hash(&doc_clean); + + // Same content (without internal fields) should produce same hash + assert_eq!( + hash1, hash2, + "hashes should match after stripping internal fields" + ); + } + + #[test] + fn test_compute_content_hash_is_deterministic() { + let doc = json!({ + "id": "test-2", + "z_field": "last", + "a_field": "first", + "m_field": "middle", + }); + + let hash1 = AntiEntropyReconciler::::compute_content_hash(&doc); + let hash2 = AntiEntropyReconciler::::compute_content_hash(&doc); + + assert_eq!(hash1, hash2, "hash should be deterministic"); + } + + #[test] + fn test_compute_content_hash_different_content() { + let doc1 = json!({ + "id": "test-3", + "title": "First", + }); + + let doc2 = json!({ + "id": "test-3", + "title": "Second", + }); + + let hash1 = AntiEntropyReconciler::::compute_content_hash(&doc1); + let hash2 = AntiEntropyReconciler::::compute_content_hash(&doc2); + + assert_ne!(hash1, hash2, "different content should produce different hashes"); + } + + #[test] + fn test_compute_content_hash_key_order_independence() { + // JSON objects with same fields but different key order + let doc1 = json!({ + "id": "test-4", + "b": "value_b", + "a": "value_a", + "c": "value_c", + }); + + let doc2 = json!({ + "c": "value_c", + "a": "value_a", + "id": "test-4", + "b": "value_b", + }); + + let hash1 = AntiEntropyReconciler::::compute_content_hash(&doc1); + let hash2 = AntiEntropyReconciler::::compute_content_hash(&doc2); + + assert_eq!(hash1, hash2, "hash should be independent of key order"); + } } diff --git a/crates/miroir-core/src/error.rs b/crates/miroir-core/src/error.rs index 80840e5..9fdae84 100644 --- a/crates/miroir-core/src/error.rs +++ b/crates/miroir-core/src/error.rs @@ -101,4 +101,8 @@ pub enum MiroirError { /// Discovery error. #[error("discovery error: {0}")] Discovery(String), + + /// Anti-entropy error. + #[error("anti-entropy error: {0}")] + AntiEntropy(String), }