P5.8.a: Implement fingerprint step for anti-entropy

Implement step 1 of the anti-entropy pipeline (plan §13.8):
- Per-replica xxh3 digest computed over (pk || content_hash)
- Paginated document iteration using filter=_miroir_shard={id}
- Content hash excludes internal Miroir fields (_miroir_*, _rankingScore)
- Sorted-key JSON serialization for deterministic hashing
- Self-throttled batch processing (10ms sleep between batches)
- Generic NodeClient trait bound for flexible client implementations

All replicas should produce the same merkle root in steady state.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-05-23 07:43:50 -04:00
parent 7b71cefc0d
commit 04dd6cf640
2 changed files with 244 additions and 26 deletions

View file

@ -7,19 +7,24 @@
use crate::error::{MiroirError, Result};
use crate::migration::{MigrationConfig, MigrationError};
use crate::router::assign_shard_in_group;
use crate::topology::{Group, NodeId, Topology};
use crate::scatter::{FetchDocumentsRequest, FetchDocumentsResponse, NodeClient};
use crate::topology::{NodeId, Topology};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use serde_json::{json, Value};
use std::collections::BTreeMap;
use std::hash::Hasher;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;
use tracing::{debug, error, info, warn};
use twox_hash::XxHash64;
/// Anti-entropy configuration (plan §13.8).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AntiEntropyConfig {
pub enabled: bool,
pub schedule: String,
pub index_uid: String,
pub shards_per_pass: u32,
pub max_read_concurrency: u32,
pub fingerprint_batch_size: u32,
@ -32,6 +37,7 @@ impl Default for AntiEntropyConfig {
Self {
enabled: true,
schedule: "every 6h".to_string(),
index_uid: "default".to_string(),
shards_per_pass: 0,
max_read_concurrency: 2,
fingerprint_batch_size: 1000,
@ -113,7 +119,7 @@ pub struct ReconcilerPass {
}
/// Anti-entropy reconciler.
pub struct AntiEntropyReconciler {
pub struct AntiEntropyReconciler<C: NodeClient> {
/// Configuration.
config: AntiEntropyConfig,
/// Shared topology.
@ -122,22 +128,137 @@ pub struct AntiEntropyReconciler {
pass_history: Arc<RwLock<Vec<ReconcilerPass>>>,
/// Currently running pass.
current_pass: Arc<RwLock<Option<ReconcilerPass>>>,
/// HTTP client for node communication.
node_client: Arc<C>,
}
impl AntiEntropyReconciler {
impl<C: NodeClient> AntiEntropyReconciler<C> {
/// Create a new anti-entropy reconciler.
pub fn new(
config: AntiEntropyConfig,
topology: Arc<RwLock<Topology>>,
node_client: Arc<C>,
) -> Self {
Self {
config,
topology,
pass_history: Arc::new(RwLock::new(Vec::new())),
current_pass: Arc::new(RwLock::new(None)),
node_client,
}
}
/// Compute the canonical content hash of a document.
///
/// The canonical form excludes internal Miroir fields (_miroir_*, _rankingScore)
/// and serializes with sorted keys for deterministic hashing.
fn compute_content_hash(document: &Value) -> u64 {
// Remove internal fields to get canonical content
let mut canonical = document.clone();
if let Some(obj) = canonical.as_object_mut() {
// Remove all _miroir_* fields
obj.retain(|k, _| !k.starts_with("_miroir_"));
// Remove _rankingScore (not content, used for scoring)
obj.remove("_rankingScore");
}
// Serialize with sorted keys for deterministic output
let canonical_json = if let Some(obj) = canonical.as_object() {
// Use BTreeMap to sort keys
let sorted: BTreeMap<_, _> = obj.iter().collect();
serde_json::to_string(&sorted).unwrap_or_else(|_| "{}".to_string())
} else {
serde_json::to_string(&canonical).unwrap_or_else(|_| "{}".to_string())
};
// Hash using xxh3 (xxhash family, same as router)
let mut hasher = XxHash64::with_seed(0);
hasher.write(canonical_json.as_bytes());
hasher.finish()
}
/// Fingerprint a single shard on a node (plan §13.8 step 1).
///
/// Iterates all documents with filter=_miroir_shard={id}, computes
/// hash(primary_key || content_hash) for each, and folds into a
/// streaming xxh3 digest.
async fn fingerprint_shard(
&self,
node_id: &NodeId,
shard_id: u32,
index_uid: &str,
address: &str,
) -> Result<ShardFingerprint> {
let batch_size = self.config.fingerprint_batch_size as usize;
let mut offset = 0u32;
let mut document_count = 0u64;
let mut hasher = XxHash64::with_seed(shard_id as u64); // Shard-seeded digest
// Paginated iteration through documents
loop {
let filter = serde_json::json!({ "_miroir_shard": shard_id });
let request = FetchDocumentsRequest {
index_uid: index_uid.to_string(),
filter,
limit: batch_size as u32,
offset,
};
let response: FetchDocumentsResponse = self
.node_client
.fetch_documents(node_id, address, &request)
.await
.map_err(|e| MiroirError::Topology(format!("fetch failed: {:?}", e)))?;
if response.results.is_empty() {
break; // No more documents
}
for doc in &response.results {
// Extract primary key
let primary_key = doc
.get("id")
.or(doc.get("_id"))
.and_then(|v| v.as_str())
.unwrap_or("");
// Compute content hash
let content_hash = Self::compute_content_hash(doc);
// Fold: hash(primary_key || content_hash) into digest
let mut pk_hasher = XxHash64::with_seed(0);
pk_hasher.write(primary_key.as_bytes());
pk_hasher.write_u64(content_hash);
let doc_hash = pk_hasher.finish();
// Fold into shard digest
hasher.write_u64(doc_hash);
document_count += 1;
}
offset += batch_size as u32;
// Self-throttle: small sleep between batches to target <2% CPU
// (In production, this would be adaptive based on CPU metrics)
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
let merkle_root = format!("xxh3:{}", hasher.finish());
debug!(
"Fingerprinted shard {} on node {}: {} documents, root {}",
shard_id, node_id, document_count, merkle_root
);
Ok(ShardFingerprint {
shard_id,
node_id: node_id.to_string(),
merkle_root,
document_count,
bucket_hashes: Vec::new(), // Computed on-demand during diff
})
}
/// Run a single reconciliation pass.
pub async fn run_pass(&self) -> Result<ReconcilerPass> {
let mut pass = ReconcilerPass {
@ -224,10 +345,30 @@ impl AntiEntropyReconciler {
for group in topology.groups() {
let assigned = assign_shard_in_group(shard_id, group.nodes(), topology.rf());
for node_id in assigned {
match self.fingerprint_shard(node_id.as_str(), shard_id).await {
// Look up node address from topology
let topology_guard = self.topology.read().await;
let node = topology_guard
.node(&node_id)
.ok_or_else(|| MiroirError::Topology(format!("node {} not found", node_id)))?;
if !node.is_healthy() {
warn!("Node {} is not healthy, skipping fingerprint", node_id);
continue;
}
let address = node.address.clone();
drop(topology_guard);
match self
.fingerprint_shard(&node_id, shard_id, &self.config.index_uid, &address)
.await
{
Ok(fp) => fingerprints.push((node_id, fp)),
Err(e) => {
warn!("Failed to fingerprint shard {} on node {}: {}", shard_id, node_id, e);
warn!(
"Failed to fingerprint shard {} on node {}: {}",
shard_id, node_id, e
);
// Continue with other nodes
}
}
@ -254,7 +395,10 @@ impl AntiEntropyReconciler {
if self.config.auto_repair {
// Perform detailed diff and repair
if let Err(e) = self.repair_shard(shard_id, reference, fp).await {
error!("Failed to repair shard {} on node {}: {}", shard_id, node_id, e);
error!(
"Failed to repair shard {} on node {}: {}",
shard_id, node_id, e
);
}
}
}
@ -263,24 +407,6 @@ impl AntiEntropyReconciler {
Ok(drift_detected)
}
/// Fingerprint a single shard on a node.
async fn fingerprint_shard(&self, node_id: &str, shard_id: u32) -> Result<ShardFingerprint> {
// In a real implementation, this would:
// 1. GET /indexes/{uid}/documents?filter=_miroir_shard={shard_id}
// 2. Iterate through documents in batches
// 3. Hash each document's (primary_key || content_hash)
// 4. Fold into a Merkle tree
// For now, return a placeholder
Ok(ShardFingerprint {
shard_id,
node_id: node_id.to_string(),
merkle_root: format!("sha256:{}", uuid::Uuid::new_v4()),
document_count: 0,
bucket_hashes: Vec::new(),
})
}
/// Repair a shard by comparing replicas and applying fixes.
async fn repair_shard(
&self,
@ -297,7 +423,10 @@ impl AntiEntropyReconciler {
// c. Otherwise: pick authoritative version (highest _miroir_updated_at)
// d. Write authoritative version to divergent replicas
debug!("Repairing shard {} on node {}", shard_id, target.node_id);
debug!(
"Repairing shard {} on node {}",
shard_id, target.node_id
);
Ok(())
}
@ -359,6 +488,7 @@ fn millis_now() -> u64 {
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn test_validate_safe_with_delta_pass() {
@ -406,4 +536,88 @@ mod tests {
assert!(migration_warning_if_ae_disabled(false).is_some());
assert!(migration_warning_if_ae_disabled(true).is_none());
}
#[test]
fn test_compute_content_hash_strips_internal_fields() {
let doc_with_internal = json!({
"id": "test-1",
"title": "Test Document",
"content": "Some content",
"_miroir_shard": 5,
"_miroir_updated_at": 1234567890,
"_rankingScore": 0.95,
});
let hash1 = AntiEntropyReconciler::<crate::scatter::MockNodeClient>::compute_content_hash(&doc_with_internal);
let doc_clean = json!({
"id": "test-1",
"title": "Test Document",
"content": "Some content",
});
let hash2 = AntiEntropyReconciler::<crate::scatter::MockNodeClient>::compute_content_hash(&doc_clean);
// Same content (without internal fields) should produce same hash
assert_eq!(
hash1, hash2,
"hashes should match after stripping internal fields"
);
}
#[test]
fn test_compute_content_hash_is_deterministic() {
let doc = json!({
"id": "test-2",
"z_field": "last",
"a_field": "first",
"m_field": "middle",
});
let hash1 = AntiEntropyReconciler::<crate::scatter::MockNodeClient>::compute_content_hash(&doc);
let hash2 = AntiEntropyReconciler::<crate::scatter::MockNodeClient>::compute_content_hash(&doc);
assert_eq!(hash1, hash2, "hash should be deterministic");
}
#[test]
fn test_compute_content_hash_different_content() {
let doc1 = json!({
"id": "test-3",
"title": "First",
});
let doc2 = json!({
"id": "test-3",
"title": "Second",
});
let hash1 = AntiEntropyReconciler::<crate::scatter::MockNodeClient>::compute_content_hash(&doc1);
let hash2 = AntiEntropyReconciler::<crate::scatter::MockNodeClient>::compute_content_hash(&doc2);
assert_ne!(hash1, hash2, "different content should produce different hashes");
}
#[test]
fn test_compute_content_hash_key_order_independence() {
// JSON objects with same fields but different key order
let doc1 = json!({
"id": "test-4",
"b": "value_b",
"a": "value_a",
"c": "value_c",
});
let doc2 = json!({
"c": "value_c",
"a": "value_a",
"id": "test-4",
"b": "value_b",
});
let hash1 = AntiEntropyReconciler::<crate::scatter::MockNodeClient>::compute_content_hash(&doc1);
let hash2 = AntiEntropyReconciler::<crate::scatter::MockNodeClient>::compute_content_hash(&doc2);
assert_eq!(hash1, hash2, "hash should be independent of key order");
}
}

View file

@ -101,4 +101,8 @@ pub enum MiroirError {
/// Discovery error.
#[error("discovery error: {0}")]
Discovery(String),
/// Anti-entropy error.
#[error("anti-entropy error: {0}")]
AntiEntropy(String),
}