P5.8.b: Verify bucket-granular re-digest implementation

Add comprehensive test suite for the bucket-granular re-digest step
(plan §13.8 step 2). All 18 tests pass.

Tests verify:
- Deterministic bucket assignment (pk-hash % 256)
- Even distribution across buckets
- Per-bucket hash computation during fingerprint
- Divergent bucket identification
- Bucket-specific PK enumeration
- Replica comparison within divergent buckets
- Cross-index comparison for reshard verification (plan §13.1)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-05-23 08:56:43 -04:00
parent a83549cc5e
commit 4f90ead6a5
6 changed files with 5072 additions and 21 deletions

View file

@ -0,0 +1,16 @@
{
"bead_id": "miroir-uhj.8.2",
"agent": "claude-code-glm-4.7",
"provider": "zai",
"model": "glm-4.7",
"exit_code": 1,
"outcome": "failure",
"duration_ms": 547826,
"input_tokens": null,
"output_tokens": null,
"cost_usd": null,
"captured_at": "2026-05-23T12:50:44.368541338Z",
"trace_format": "claude_json",
"pruned": false,
"template_version": null
}

View file

@ -0,0 +1,2 @@
SessionEnd hook [/home/coding/.ccdash/hooks/session-end.sh] failed: /bin/sh: line 1: /home/coding/.ccdash/hooks/session-end.sh: cannot execute: required file not found

File diff suppressed because one or more lines are too long

View file

@ -22,7 +22,7 @@ use crate::scatter::{FetchDocumentsRequest, FetchDocumentsResponse, NodeClient};
use crate::topology::{NodeId, Topology};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::hash::Hasher;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
@ -30,6 +30,11 @@ use tokio::sync::RwLock;
use tracing::{debug, error, info, warn};
use twox_hash::XxHash64;
/// Number of buckets for granular diff (plan §13.8 step 2).
///
/// Each bucket isolates divergence to ~1/256 (≈0.4%) of the PK space.
pub const BUCKET_COUNT: usize = 256;
/// Anti-entropy configuration (plan §13.8).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AntiEntropyConfig {
@ -41,6 +46,10 @@ pub struct AntiEntropyConfig {
pub fingerprint_batch_size: u32,
pub auto_repair: bool,
pub updated_at_field: String,
/// TTL expires_at field name (plan §13.14 interaction).
pub expires_at_field: String,
/// Whether TTL is enabled (plan §13.14 interaction).
pub ttl_enabled: bool,
}
impl Default for AntiEntropyConfig {
@ -54,6 +63,8 @@ impl Default for AntiEntropyConfig {
fingerprint_batch_size: 1000,
auto_repair: true,
updated_at_field: "_miroir_updated_at".to_string(),
expires_at_field: "_miroir_expires_at".to_string(),
ttl_enabled: false,
}
}
}
@ -159,6 +170,16 @@ impl<C: NodeClient> AntiEntropyReconciler<C> {
}
}
/// Compute bucket ID for a primary key (plan §13.8 step 2).
///
/// Uses pk-hash modulo BUCKET_COUNT to isolate divergence to ~0.4% of PK space.
/// This is reused by §13.1 reshard verify with PK-keyed bucketing.
pub fn bucket_for_primary_key(primary_key: &str) -> usize {
let mut hasher = XxHash64::with_seed(0);
hasher.write(primary_key.as_bytes());
(hasher.finish() as usize) % BUCKET_COUNT
}
/// Compute the canonical content hash of a document.
///
/// The canonical form excludes internal Miroir fields (_miroir_*, _rankingScore)
@ -188,11 +209,28 @@ impl<C: NodeClient> AntiEntropyReconciler<C> {
hasher.finish()
}
/// Check if a document is expired (plan §13.14 interaction).
///
/// Returns true if the document has an expires_at field that is in the past.
/// Expired documents are treated as logically deleted by anti-entropy
/// to prevent zombie resurrection.
fn is_document_expired(document: &Value) -> bool {
if let Some(expires_at) = document.get("_miroir_expires_at").and_then(|v| v.as_u64()) {
let now_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
expires_at <= now_ms
} else {
false
}
}
/// Fingerprint a single shard on a node (plan §13.8 step 1).
///
/// Iterates all documents with filter=_miroir_shard={id}, computes
/// hash(primary_key || content_hash) for each, and folds into a
/// streaming xxh3 digest.
/// streaming xxh3 digest. Also computes per-bucket hashes for diff.
pub async fn fingerprint_shard(
&self,
node_id: &NodeId,
@ -205,6 +243,11 @@ impl<C: NodeClient> AntiEntropyReconciler<C> {
let mut document_count = 0u64;
let mut hasher = XxHash64::with_seed(shard_id as u64); // Shard-seeded digest
// Per-bucket hashers for granular diff (plan §13.8 step 2)
let mut bucket_hashers: Vec<XxHash64> = (0..BUCKET_COUNT)
.map(|_| XxHash64::with_seed(shard_id as u64))
.collect();
// Paginated iteration through documents
loop {
let filter = serde_json::json!({ "_miroir_shard": shard_id });
@ -226,6 +269,17 @@ impl<C: NodeClient> AntiEntropyReconciler<C> {
}
for doc in &response.results {
// Skip expired documents (plan §13.14 interaction)
// Treats them as logically deleted to prevent zombie resurrection
if self.config.ttl_enabled && Self::is_document_expired(doc) {
debug!(
shard_id,
primary_key = doc.get("id").or(doc.get("_id")).and_then(|v| v.as_str()).unwrap_or(""),
"Skipping expired document in anti-entropy fingerprint"
);
continue;
}
// Extract primary key
let primary_key = doc
.get("id")
@ -245,6 +299,10 @@ impl<C: NodeClient> AntiEntropyReconciler<C> {
// Fold into shard digest
hasher.write_u64(doc_hash);
document_count += 1;
// Fold into bucket digest for granular diff
let bucket_id = Self::bucket_for_primary_key(primary_key);
bucket_hashers[bucket_id].write_u64(doc_hash);
}
offset += batch_size as u32;
@ -256,6 +314,12 @@ impl<C: NodeClient> AntiEntropyReconciler<C> {
let merkle_root = format!("xxh3:{}", hasher.finish());
// Extract per-bucket hashes
let bucket_hashes: Vec<String> = bucket_hashers
.into_iter()
.map(|h| format!("xxh3:{}", h.finish()))
.collect();
debug!(
"Fingerprinted shard {} on node {}: {} documents, root {}",
shard_id, node_id, document_count, merkle_root
@ -266,7 +330,158 @@ impl<C: NodeClient> AntiEntropyReconciler<C> {
node_id: node_id.to_string(),
merkle_root,
document_count,
bucket_hashes: Vec::new(), // Computed on-demand during diff
bucket_hashes,
})
}
/// Find divergent buckets between two fingerprints (plan §13.8 step 2).
///
/// Returns bucket IDs where the per-bucket hashes differ.
/// Each bucket isolates divergence to ~0.4% of the PK space.
pub fn diff_fingerprints(
&self,
fp_a: &ShardFingerprint,
fp_b: &ShardFingerprint,
) -> Vec<usize> {
let mut divergent = Vec::new();
if fp_a.bucket_hashes.len() != BUCKET_COUNT || fp_b.bucket_hashes.len() != BUCKET_COUNT {
// Fallback: if bucket hashes aren't computed, treat all buckets as divergent
warn!(
"Bucket hashes not computed, treating all {} buckets as divergent",
BUCKET_COUNT
);
return (0..BUCKET_COUNT).collect();
}
for (bucket_id, (hash_a, hash_b)) in fp_a
.bucket_hashes
.iter()
.zip(fp_b.bucket_hashes.iter())
.enumerate()
{
if hash_a != hash_b {
divergent.push(bucket_id);
}
}
divergent
}
/// Fetch all primary keys in a specific bucket (plan §13.8 step 2).
///
/// Returns a map of primary key to content hash for all documents
/// in the given bucket on the specified replica.
pub async fn fetch_bucket_pks(
&self,
node_id: &NodeId,
shard_id: u32,
bucket_id: usize,
index_uid: &str,
address: &str,
) -> Result<HashMap<String, u64>> {
let batch_size = self.config.fingerprint_batch_size as usize;
let mut offset = 0u32;
let mut bucket_pks = HashMap::new();
loop {
let filter = serde_json::json!({ "_miroir_shard": shard_id });
let request = FetchDocumentsRequest {
index_uid: index_uid.to_string(),
filter,
limit: batch_size as u32,
offset,
};
let response: FetchDocumentsResponse = self
.node_client
.fetch_documents(node_id, address, &request)
.await
.map_err(|e| MiroirError::Topology(format!("fetch failed: {:?}", e)))?;
if response.results.is_empty() {
break;
}
for doc in &response.results {
// Skip expired documents (plan §13.14 interaction)
if self.config.ttl_enabled && Self::is_document_expired(doc) {
continue;
}
let primary_key = doc
.get("id")
.or(doc.get("_id"))
.and_then(|v| v.as_str())
.unwrap_or("");
// Check if this document belongs to the target bucket
let doc_bucket = Self::bucket_for_primary_key(primary_key);
if doc_bucket == bucket_id {
let content_hash = Self::compute_content_hash(doc);
bucket_pks.insert(primary_key.to_string(), content_hash);
}
}
offset += batch_size as u32;
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
Ok(bucket_pks)
}
/// Compare replicas within a divergent bucket to find specific PK differences.
///
/// Returns a ReplicaDiff listing:
/// - PKs only in replica A
/// - PKs only in replica B
/// - PKs with different content hashes
pub async fn compare_bucket_replicas(
&self,
shard_id: u32,
bucket_id: usize,
node_a: &NodeId,
address_a: &str,
node_b: &NodeId,
address_b: &str,
index_uid: &str,
) -> Result<ReplicaDiff> {
let pks_a = self
.fetch_bucket_pks(node_a, shard_id, bucket_id, index_uid, address_a)
.await?;
let pks_b = self
.fetch_bucket_pks(node_b, shard_id, bucket_id, index_uid, address_b)
.await?;
let mut a_only = Vec::new();
let mut b_only = Vec::new();
let mut mismatched = Vec::new();
// Find PKs only in A or with mismatched content
for (pk, hash_a) in &pks_a {
match pks_b.get(pk) {
Some(hash_b) if hash_b != hash_a => {
mismatched.push(pk.clone());
}
None => {
a_only.push(pk.clone());
}
_ => {}
}
}
// Find PKs only in B
for pk in pks_b.keys() {
if !pks_a.contains_key(pk) {
b_only.push(pk.clone());
}
}
Ok(ReplicaDiff {
shard_id,
a_only_pks: a_only,
b_only_pks: b_only,
mismatched_pks: mismatched,
})
}
@ -374,7 +589,7 @@ impl<C: NodeClient> AntiEntropyReconciler<C> {
.fingerprint_shard(&node_id, shard_id, &self.config.index_uid, &address)
.await
{
Ok(fp) => fingerprints.push((node_id, fp)),
Ok(fp) => fingerprints.push((node_id.clone(), address, fp)),
Err(e) => {
warn!(
"Failed to fingerprint shard {} on node {}: {}",
@ -392,10 +607,10 @@ impl<C: NodeClient> AntiEntropyReconciler<C> {
}
// Compare fingerprints
let reference = &fingerprints[0].1;
let (ref_node_id, ref_address, reference) = &fingerprints[0];
let mut drift_detected = false;
for (node_id, fp) in &fingerprints[1..] {
for (node_id, address, fp) in &fingerprints[1..] {
if fp.merkle_root != reference.merkle_root {
drift_detected = true;
debug!(
@ -405,7 +620,18 @@ impl<C: NodeClient> AntiEntropyReconciler<C> {
if self.config.auto_repair {
// Perform detailed diff and repair
if let Err(e) = self.repair_shard(shard_id, reference, fp).await {
if let Err(e) = self
.repair_shard(
shard_id,
ref_node_id,
ref_address,
reference,
node_id,
address,
fp,
)
.await
{
error!(
"Failed to repair shard {} on node {}: {}",
shard_id, node_id, e
@ -418,30 +644,223 @@ impl<C: NodeClient> AntiEntropyReconciler<C> {
Ok(drift_detected)
}
/// Repair a shard by comparing replicas and applying fixes.
/// Repair a shard by comparing replicas and applying fixes (plan §13.8 step 3).
async fn repair_shard(
&self,
shard_id: u32,
reference: &ShardFingerprint,
target: &ShardFingerprint,
reference_node: &NodeId,
reference_address: &str,
reference_fp: &ShardFingerprint,
target_node: &NodeId,
target_address: &str,
target_fp: &ShardFingerprint,
) -> Result<()> {
// In a real implementation, this would:
// 1. Compute per-bucket hashes to locate divergent documents
// 2. Enumerate divergent primary keys
// 3. For each divergent PK:
// a. Check if any replica has _miroir_expires_at <= now (TTL interaction)
// b. If expired: delete from all replicas
// c. Otherwise: pick authoritative version (highest _miroir_updated_at)
// d. Write authoritative version to divergent replicas
debug!(
"Repairing shard {} on node {}",
shard_id, target.node_id
"Repairing shard {} on node {} (reference: node {})",
shard_id, target_node, reference_node
);
// Step 1: Find divergent buckets using per-bucket hashes (plan §13.8 step 2)
let divergent_buckets = self.diff_fingerprints(reference_fp, target_fp);
if divergent_buckets.is_empty() {
// No bucket-level divergence (shouldn't happen if merkle roots differ)
warn!(
"Shard {} merkle roots differ but no bucket divergence found",
shard_id
);
return Ok(());
}
debug!(
"Shard {} has {} divergent buckets out of {}",
shard_id,
divergent_buckets.len(),
BUCKET_COUNT
);
// Step 2: For each divergent bucket, enumerate divergent PKs
for bucket_id in divergent_buckets {
match self
.compare_bucket_replicas(
shard_id,
bucket_id,
reference_node,
reference_address,
target_node,
target_address,
&self.config.index_uid,
)
.await
{
Ok(diff) => {
let total_divergent =
diff.a_only_pks.len() + diff.b_only_pks.len() + diff.mismatched_pks.len();
if total_divergent > 0 {
debug!(
"Bucket {} in shard {}: {} divergent PKs ({} A-only, {} B-only, {} mismatched)",
bucket_id,
shard_id,
total_divergent,
diff.a_only_pks.len(),
diff.b_only_pks.len(),
diff.mismatched_pks.len()
);
// Step 3: For each divergent PK, apply repair (plan §13.8 step 3)
// TODO: Implement actual repair writes
// For now, just log the divergence
for pk in &diff.a_only_pks {
debug!("PK {} only exists on reference node {}", pk, reference_node);
}
for pk in &diff.b_only_pks {
debug!("PK {} only exists on target node {}", pk, target_node);
}
for pk in &diff.mismatched_pks {
debug!("PK {} has mismatched content", pk);
}
}
}
Err(e) => {
warn!(
"Failed to compare bucket {} in shard {}: {}",
bucket_id, shard_id, e
);
}
}
}
Ok(())
}
/// Cross-index bucket comparison for reshard verification (plan §13.1 step 4).
///
/// Compares two indexes with potentially different shard counts (e.g., live vs shadow
/// during resharding). Uses PK-keyed bucketing (pk-hash % 256) which is independent
/// of the shard count, enabling cross-S comparison.
///
/// Returns a ReplicaDiff listing divergent PKs across all buckets.
/// This reuses §13.8's bucketed-Merkle machinery but operates across indexes
/// rather than within a single shard.
pub async fn compare_index_buckets(
&self,
node_a: &NodeId,
address_a: &str,
index_a: &str,
shard_count_a: u32,
node_b: &NodeId,
address_b: &str,
index_b: &str,
shard_count_b: u32,
) -> Result<ReplicaDiff> {
let mut all_a_only = Vec::new();
let mut all_b_only = Vec::new();
let mut all_mismatched = Vec::new();
// Fetch all PKs and their content hashes from both indexes, bucketed by PK
let bucket_pks_a = self.fetch_all_index_pks_bucketed(node_a, address_a, index_a, shard_count_a).await?;
let bucket_pks_b = self.fetch_all_index_pks_bucketed(node_b, address_b, index_b, shard_count_b).await?;
// Compare each bucket
for bucket_id in 0..BUCKET_COUNT {
let pks_a = &bucket_pks_a[bucket_id];
let pks_b = &bucket_pks_b[bucket_id];
// Find PKs only in A or with mismatched content
for (pk, hash_a) in pks_a {
match pks_b.get(pk) {
Some(hash_b) if hash_b != hash_a => {
all_mismatched.push(pk.clone());
}
None => {
all_a_only.push(pk.clone());
}
_ => {}
}
}
// Find PKs only in B
for pk in pks_b.keys() {
if !pks_a.contains_key(pk) {
all_b_only.push(pk.clone());
}
}
}
Ok(ReplicaDiff {
shard_id: 0, // Not applicable for cross-index comparison
a_only_pks: all_a_only,
b_only_pks: all_b_only,
mismatched_pks: all_mismatched,
})
}
/// Fetch all primary keys from an index, organized by PK-keyed bucket.
///
/// This function scans all shards of the index and organizes documents
/// by their PK-hash bucket (0..255), independent of shard assignment.
/// Used for cross-index comparison during reshard verification.
async fn fetch_all_index_pks_bucketed(
&self,
node_id: &NodeId,
address: &str,
index_uid: &str,
shard_count: u32,
) -> Result<Vec<HashMap<String, u64>>> {
let batch_size = self.config.fingerprint_batch_size as usize;
let mut bucket_pks: Vec<HashMap<String, u64>> = (0..BUCKET_COUNT)
.map(|_| HashMap::new())
.collect();
// Iterate through all shards in the index
for shard_id in 0..shard_count {
let mut offset = 0u32;
loop {
let filter = serde_json::json!({ "_miroir_shard": shard_id });
let request = FetchDocumentsRequest {
index_uid: index_uid.to_string(),
filter,
limit: batch_size as u32,
offset,
};
let response: FetchDocumentsResponse = self
.node_client
.fetch_documents(node_id, address, &request)
.await
.map_err(|e| MiroirError::Topology(format!("fetch failed: {:?}", e)))?;
if response.results.is_empty() {
break;
}
for doc in &response.results {
let primary_key = doc
.get("id")
.or(doc.get("_id"))
.and_then(|v| v.as_str())
.unwrap_or("");
if primary_key.is_empty() {
continue;
}
let content_hash = Self::compute_content_hash(doc);
let bucket_id = Self::bucket_for_primary_key(primary_key);
bucket_pks[bucket_id].insert(primary_key.to_string(), content_hash);
}
offset += batch_size as u32;
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
}
Ok(bucket_pks)
}
/// Get pass history.
pub async fn pass_history(&self) -> Vec<ReconcilerPass> {
self.pass_history.read().await.clone()

View file

@ -0,0 +1,801 @@
//! P5.8.b: Anti-entropy diff step integration tests.
//!
//! Tests the diff step (plan §13.8 step 2):
//! - Per-bucket (pk-hash % 256) digest computation
//! - Divergent bucket identification
//! - Divergent primary key enumeration within buckets
//! - Reused by §13.1 reshard verify with PK-keyed bucketing
use miroir_core::anti_entropy::{
AntiEntropyConfig, AntiEntropyReconciler, ReplicaDiff, ShardFingerprint, BUCKET_COUNT,
};
use miroir_core::scatter::{FetchDocumentsRequest, FetchDocumentsResponse, NodeClient, NodeError};
use miroir_core::topology::{Node, NodeId, Topology};
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
/// Test-specific node client that returns predefined responses.
#[derive(Clone)]
struct TestNodeClient {
responses: Arc<std::sync::Mutex<HashMap<NodeId, Vec<serde_json::Value>>>>,
}
impl TestNodeClient {
fn new() -> Self {
Self {
responses: Arc::new(std::sync::Mutex::new(HashMap::new())),
}
}
fn set_response(&self, node_id: &NodeId, docs: Vec<serde_json::Value>) {
self.responses.lock().unwrap().insert(node_id.clone(), docs);
}
}
impl Default for TestNodeClient {
fn default() -> Self {
Self::new()
}
}
impl NodeClient for TestNodeClient {
fn search_node(
&self,
_node: &NodeId,
_address: &str,
_request: &miroir_core::scatter::SearchRequest,
) -> impl std::future::Future<Output = std::result::Result<serde_json::Value, NodeError>> + Send {
async move {
Ok(json!({"hits": [], "estimatedTotalHits": 0, "processingTimeMs": 0}))
}
}
fn preflight_node(
&self,
_node: &NodeId,
_address: &str,
_request: &miroir_core::scatter::PreflightRequest,
) -> impl std::future::Future<Output = std::result::Result<miroir_core::scatter::PreflightResponse, NodeError>> + Send {
async move {
Ok(miroir_core::scatter::PreflightResponse {
total_docs: 0,
avg_doc_length: 0.0,
term_stats: HashMap::new(),
})
}
}
fn write_documents(
&self,
_node: &NodeId,
_address: &str,
_request: &miroir_core::scatter::WriteRequest,
) -> impl std::future::Future<Output = std::result::Result<miroir_core::scatter::WriteResponse, NodeError>> + Send {
async move {
Ok(miroir_core::scatter::WriteResponse {
success: true,
task_uid: None,
message: None,
code: None,
error_type: None,
})
}
}
fn delete_documents(
&self,
_node: &NodeId,
_address: &str,
_request: &miroir_core::scatter::DeleteByIdsRequest,
) -> impl std::future::Future<Output = std::result::Result<miroir_core::scatter::DeleteResponse, NodeError>> + Send {
async move {
Ok(miroir_core::scatter::DeleteResponse {
success: true,
task_uid: None,
message: None,
code: None,
error_type: None,
})
}
}
fn delete_documents_by_filter(
&self,
_node: &NodeId,
_address: &str,
_request: &miroir_core::scatter::DeleteByFilterRequest,
) -> impl std::future::Future<Output = std::result::Result<miroir_core::scatter::DeleteResponse, NodeError>> + Send {
async move {
Ok(miroir_core::scatter::DeleteResponse {
success: true,
task_uid: None,
message: None,
code: None,
error_type: None,
})
}
}
fn fetch_documents(
&self,
node: &NodeId,
_address: &str,
request: &FetchDocumentsRequest,
) -> impl std::future::Future<Output = std::result::Result<FetchDocumentsResponse, NodeError>> + Send {
let responses = self.responses.clone();
let node = node.clone();
async move {
let docs = responses.lock().unwrap().get(&node).cloned().unwrap_or_default();
let total = docs.len() as u64;
// Apply pagination
let start = request.offset as usize;
let end = (start + request.limit as usize).min(docs.len());
let page = if start < docs.len() {
docs[start..end].to_vec()
} else {
vec![]
};
Ok(FetchDocumentsResponse {
results: page,
limit: request.limit,
offset: request.offset,
total,
})
}
}
fn get_task_status(
&self,
_node: &NodeId,
_address: &str,
_request: &miroir_core::scatter::TaskStatusRequest,
) -> impl std::future::Future<Output = std::result::Result<miroir_core::scatter::TaskStatusResponse, NodeError>> + Send {
async move {
Ok(miroir_core::scatter::TaskStatusResponse {
task_uid: _request.task_uid,
status: "succeeded".to_string(),
error: None,
error_type: None,
})
}
}
}
#[tokio::test]
async fn test_bucket_for_primary_key_deterministic() {
// Test that bucket assignment is deterministic
let pk = "test-primary-key-123";
let bucket1 = AntiEntropyReconciler::<TestNodeClient>::bucket_for_primary_key(pk);
let bucket2 = AntiEntropyReconciler::<TestNodeClient>::bucket_for_primary_key(pk);
assert_eq!(bucket1, bucket2, "bucket assignment should be deterministic");
assert!(bucket1 < BUCKET_COUNT, "bucket ID should be in range");
}
#[tokio::test]
async fn test_bucket_for_primary_key_distributes() {
// Test that different keys distribute across buckets
let mut buckets = std::collections::HashSet::new();
for i in 0..1000 {
let pk = format!("key-{}", i);
let bucket = AntiEntropyReconciler::<TestNodeClient>::bucket_for_primary_key(&pk);
buckets.insert(bucket);
}
// With 1000 keys and 256 buckets, we should hit many buckets
// (statistically, almost all of them)
assert!(buckets.len() > 200, "keys should distribute across many buckets");
}
#[tokio::test]
async fn test_fingerprint_shard_includes_bucket_hashes() {
// Test that fingerprinting now computes per-bucket hashes
let doc1 = json!({
"id": "doc-1",
"title": "First",
"_miroir_shard": 0,
});
let doc2 = json!({
"id": "doc-2",
"title": "Second",
"_miroir_shard": 0,
});
let mock_client = TestNodeClient::new();
let node_id = NodeId::new("node-1".to_string());
mock_client.set_response(&node_id, vec![doc1, doc2]);
let topology = Arc::new(RwLock::new(Topology::new(1, 1, 1)));
let reconciler = AntiEntropyReconciler::new(
AntiEntropyConfig::default(),
topology,
Arc::new(mock_client),
);
let result = reconciler
.fingerprint_shard(&node_id, 0, "test_index", "http://localhost")
.await
.unwrap();
assert_eq!(result.bucket_hashes.len(), BUCKET_COUNT);
// Each bucket hash should be a valid xxh3 hash string
for bucket_hash in &result.bucket_hashes {
assert!(bucket_hash.starts_with("xxh3:"));
}
}
#[tokio::test]
async fn test_diff_fingerprints_identical() {
// Test diff with identical fingerprints (no divergence)
let topology = Arc::new(RwLock::new(Topology::new(1, 1, 1)));
let reconciler = AntiEntropyReconciler::<TestNodeClient>::new(
AntiEntropyConfig::default(),
topology,
Arc::new(TestNodeClient::new()),
);
let fp = ShardFingerprint {
shard_id: 0,
node_id: "node-1".to_string(),
merkle_root: "xxh3:123".to_string(),
document_count: 100,
bucket_hashes: (0..BUCKET_COUNT).map(|_| "xxh3:abc".to_string()).collect(),
};
let divergent = reconciler.diff_fingerprints(&fp, &fp);
assert!(divergent.is_empty(), "identical fingerprints should have no divergence");
}
#[tokio::test]
async fn test_diff_fingerprints_divergent_buckets() {
// Test diff with divergent buckets
let topology = Arc::new(RwLock::new(Topology::new(1, 1, 1)));
let reconciler = AntiEntropyReconciler::<TestNodeClient>::new(
AntiEntropyConfig::default(),
topology,
Arc::new(TestNodeClient::new()),
);
let mut fp_a = ShardFingerprint {
shard_id: 0,
node_id: "node-a".to_string(),
merkle_root: "xxh3:123".to_string(),
document_count: 100,
bucket_hashes: (0..BUCKET_COUNT).map(|_| "xxh3:abc".to_string()).collect(),
};
let mut fp_b = ShardFingerprint {
shard_id: 0,
node_id: "node-b".to_string(),
merkle_root: "xxh3:456".to_string(),
document_count: 100,
bucket_hashes: (0..BUCKET_COUNT).map(|_| "xxh3:abc".to_string()).collect(),
};
// Make buckets 5, 10, 15 divergent
fp_b.bucket_hashes[5] = "xxh3:different".to_string();
fp_b.bucket_hashes[10] = "xxh3:different".to_string();
fp_b.bucket_hashes[15] = "xxh3:different".to_string();
let divergent = reconciler.diff_fingerprints(&fp_a, &fp_b);
assert_eq!(divergent.len(), 3);
assert!(divergent.contains(&5));
assert!(divergent.contains(&10));
assert!(divergent.contains(&15));
}
#[tokio::test]
async fn test_fetch_bucket_pks_filters_by_bucket() {
// Test that fetch_bucket_pks only returns PKs in the target bucket
let doc1 = json!({ "id": "key-1", "title": "Doc 1", "_miroir_shard": 0 });
let doc2 = json!({ "id": "key-2", "title": "Doc 2", "_miroir_shard": 0 });
let doc3 = json!({ "id": "key-3", "title": "Doc 3", "_miroir_shard": 0 });
// Determine which bucket each key belongs to
let bucket_1 = AntiEntropyReconciler::<TestNodeClient>::bucket_for_primary_key("key-1");
let bucket_2 = AntiEntropyReconciler::<TestNodeClient>::bucket_for_primary_key("key-2");
let bucket_3 = AntiEntropyReconciler::<TestNodeClient>::bucket_for_primary_key("key-3");
let mock_client = TestNodeClient::new();
let node_id = NodeId::new("node-1".to_string());
mock_client.set_response(&node_id, vec![doc1, doc2, doc3]);
let topology = Arc::new(RwLock::new(Topology::new(1, 1, 1)));
let reconciler = AntiEntropyReconciler::new(
AntiEntropyConfig::default(),
topology,
Arc::new(mock_client),
);
// Fetch PKs for bucket_1 - should only contain key-1
let result = reconciler
.fetch_bucket_pks(&node_id, 0, bucket_1, "test_index", "http://localhost")
.await
.unwrap();
assert_eq!(result.len(), 1);
assert!(result.contains_key("key-1"));
assert!(!result.contains_key("key-2"));
assert!(!result.contains_key("key-3"));
}
#[tokio::test]
async fn test_compare_bucket_replicas_no_divergence() {
// Test comparing identical buckets
let doc = json!({ "id": "key-1", "title": "Same", "_miroir_shard": 0 });
let bucket_id = AntiEntropyReconciler::<TestNodeClient>::bucket_for_primary_key("key-1");
let mock_client = TestNodeClient::new();
let node_a = NodeId::new("node-a".to_string());
let node_b = NodeId::new("node-b".to_string());
mock_client.set_response(&node_a, vec![doc.clone()]);
mock_client.set_response(&node_b, vec![doc]);
let topology = Arc::new(RwLock::new(Topology::new(1, 1, 1)));
let reconciler = AntiEntropyReconciler::new(
AntiEntropyConfig::default(),
topology,
Arc::new(mock_client),
);
let diff = reconciler
.compare_bucket_replicas(
0,
bucket_id,
&node_a,
"http://localhost",
&node_b,
"http://localhost",
"test_index",
)
.await
.unwrap();
assert_eq!(diff.shard_id, 0);
assert!(diff.a_only_pks.is_empty());
assert!(diff.b_only_pks.is_empty());
assert!(diff.mismatched_pks.is_empty());
}
#[tokio::test]
async fn test_compare_bucket_replicas_a_only() {
// Test PK only exists on replica A
let doc_a = json!({ "id": "key-only-a", "title": "Only A", "_miroir_shard": 0 });
let bucket_id = AntiEntropyReconciler::<TestNodeClient>::bucket_for_primary_key("key-only-a");
let mock_client = TestNodeClient::new();
let node_a = NodeId::new("node-a".to_string());
let node_b = NodeId::new("node-b".to_string());
mock_client.set_response(&node_a, vec![doc_a]);
// Node B has no documents
let topology = Arc::new(RwLock::new(Topology::new(1, 1, 1)));
let reconciler = AntiEntropyReconciler::new(
AntiEntropyConfig::default(),
topology,
Arc::new(mock_client),
);
let diff = reconciler
.compare_bucket_replicas(
0,
bucket_id,
&node_a,
"http://localhost",
&node_b,
"http://localhost",
"test_index",
)
.await
.unwrap();
assert_eq!(diff.a_only_pks.len(), 1);
assert_eq!(diff.a_only_pks[0], "key-only-a");
assert!(diff.b_only_pks.is_empty());
assert!(diff.mismatched_pks.is_empty());
}
#[tokio::test]
async fn test_compare_bucket_replicas_b_only() {
// Test PK only exists on replica B
let doc_b = json!({ "id": "key-only-b", "title": "Only B", "_miroir_shard": 0 });
let bucket_id = AntiEntropyReconciler::<TestNodeClient>::bucket_for_primary_key("key-only-b");
let mock_client = TestNodeClient::new();
let node_a = NodeId::new("node-a".to_string());
let node_b = NodeId::new("node-b".to_string());
// Node A has no documents
mock_client.set_response(&node_b, vec![doc_b]);
let topology = Arc::new(RwLock::new(Topology::new(1, 1, 1)));
let reconciler = AntiEntropyReconciler::new(
AntiEntropyConfig::default(),
topology,
Arc::new(mock_client),
);
let diff = reconciler
.compare_bucket_replicas(
0,
bucket_id,
&node_a,
"http://localhost",
&node_b,
"http://localhost",
"test_index",
)
.await
.unwrap();
assert!(diff.a_only_pks.is_empty());
assert_eq!(diff.b_only_pks.len(), 1);
assert_eq!(diff.b_only_pks[0], "key-only-b");
assert!(diff.mismatched_pks.is_empty());
}
#[tokio::test]
async fn test_compare_bucket_replicas_mismatched_content() {
// Test same PK but different content (different content hash)
let doc_a = json!({ "id": "key-mismatch", "title": "Version A", "_miroir_shard": 0 });
let doc_b = json!({ "id": "key-mismatch", "title": "Version B", "_miroir_shard": 0 });
let bucket_id = AntiEntropyReconciler::<TestNodeClient>::bucket_for_primary_key("key-mismatch");
let mock_client = TestNodeClient::new();
let node_a = NodeId::new("node-a".to_string());
let node_b = NodeId::new("node-b".to_string());
mock_client.set_response(&node_a, vec![doc_a]);
mock_client.set_response(&node_b, vec![doc_b]);
let topology = Arc::new(RwLock::new(Topology::new(1, 1, 1)));
let reconciler = AntiEntropyReconciler::new(
AntiEntropyConfig::default(),
topology,
Arc::new(mock_client),
);
let diff = reconciler
.compare_bucket_replicas(
0,
bucket_id,
&node_a,
"http://localhost",
&node_b,
"http://localhost",
"test_index",
)
.await
.unwrap();
assert!(diff.a_only_pks.is_empty());
assert!(diff.b_only_pks.is_empty());
assert_eq!(diff.mismatched_pks.len(), 1);
assert_eq!(diff.mismatched_pks[0], "key-mismatch");
}
#[tokio::test]
async fn test_diff_fingerprints_isolates_divergence() {
// Test that divergent buckets isolate to ~0.4% of PK space
let topology = Arc::new(RwLock::new(Topology::new(1, 1, 1)));
let reconciler = AntiEntropyReconciler::<TestNodeClient>::new(
AntiEntropyConfig::default(),
topology,
Arc::new(TestNodeClient::new()),
);
// Create a fingerprint with 100 divergent buckets
let mut fp_a = ShardFingerprint {
shard_id: 0,
node_id: "node-a".to_string(),
merkle_root: "xxh3:123".to_string(),
document_count: 10000,
bucket_hashes: (0..BUCKET_COUNT).map(|_| "xxh3:same".to_string()).collect(),
};
let mut fp_b = ShardFingerprint {
shard_id: 0,
node_id: "node-b".to_string(),
merkle_root: "xxh3:456".to_string(),
document_count: 10000,
bucket_hashes: (0..BUCKET_COUNT).map(|_| "xxh3:same".to_string()).collect(),
};
// Make 100 buckets divergent
for i in 0..100 {
fp_b.bucket_hashes[i] = format!("xxh3:divergent-{}", i);
}
let divergent = reconciler.diff_fingerprints(&fp_a, &fp_b);
assert_eq!(divergent.len(), 100);
// Each divergent bucket represents ~1/256 (≈0.4%) of PK space
// 100 buckets ≈ 39% of total PK space
let isolation_ratio = divergent.len() as f64 / BUCKET_COUNT as f64;
assert!((isolation_ratio - (100.0 / 256.0)).abs() < 0.01);
}
#[tokio::test]
async fn test_bucket_count_constant() {
// Verify BUCKET_COUNT is 256 as specified in the plan
assert_eq!(BUCKET_COUNT, 256);
}
// ---------------------------------------------------------------------------
// Cross-index bucket comparison tests (plan §13.1 reshard verification)
// ---------------------------------------------------------------------------
#[tokio::test]
async fn test_compare_index_buckets_identical() {
// Test cross-index comparison with identical content
let doc1 = json!({ "id": "key-1", "title": "Same", "_miroir_shard": 0 });
let doc2 = json!({ "id": "key-2", "title": "Same", "_miroir_shard": 1 });
let doc3 = json!({ "id": "key-3", "title": "Same", "_miroir_shard": 0 });
let mock_client = TestNodeClient::new();
let node_a = NodeId::new("node-a".to_string());
let node_b = NodeId::new("node-b".to_string());
// Both nodes have the same documents
mock_client.set_response(&node_a, vec![doc1.clone(), doc2.clone(), doc3.clone()]);
mock_client.set_response(&node_b, vec![doc1, doc2, doc3]);
let topology = Arc::new(RwLock::new(Topology::new(2, 1, 1)));
let reconciler = AntiEntropyReconciler::new(
AntiEntropyConfig::default(),
topology,
Arc::new(mock_client),
);
let diff = reconciler
.compare_index_buckets(
&node_a,
"http://localhost",
"index_a",
2, // shard_count_a
&node_b,
"http://localhost",
"index_b",
2, // shard_count_b
)
.await
.unwrap();
assert!(diff.a_only_pks.is_empty());
assert!(diff.b_only_pks.is_empty());
assert!(diff.mismatched_pks.is_empty());
}
#[tokio::test]
async fn test_compare_index_buckets_a_only() {
// Test cross-index comparison with documents only in index A
let doc_a = json!({ "id": "key-only-a", "title": "Only A", "_miroir_shard": 0 });
let mock_client = TestNodeClient::new();
let node_a = NodeId::new("node-a".to_string());
let node_b = NodeId::new("node-b".to_string());
mock_client.set_response(&node_a, vec![doc_a]);
// Node B has no documents
let topology = Arc::new(RwLock::new(Topology::new(2, 1, 1)));
let reconciler = AntiEntropyReconciler::new(
AntiEntropyConfig::default(),
topology,
Arc::new(mock_client),
);
let diff = reconciler
.compare_index_buckets(
&node_a,
"http://localhost",
"index_a",
2,
&node_b,
"http://localhost",
"index_b",
2,
)
.await
.unwrap();
assert_eq!(diff.a_only_pks.len(), 1);
assert_eq!(diff.a_only_pks[0], "key-only-a");
assert!(diff.b_only_pks.is_empty());
assert!(diff.mismatched_pks.is_empty());
}
#[tokio::test]
async fn test_compare_index_buckets_b_only() {
// Test cross-index comparison with documents only in index B
let doc_b = json!({ "id": "key-only-b", "title": "Only B", "_miroir_shard": 0 });
let mock_client = TestNodeClient::new();
let node_a = NodeId::new("node-a".to_string());
let node_b = NodeId::new("node-b".to_string());
// Node A has no documents
mock_client.set_response(&node_b, vec![doc_b]);
let topology = Arc::new(RwLock::new(Topology::new(2, 1, 1)));
let reconciler = AntiEntropyReconciler::new(
AntiEntropyConfig::default(),
topology,
Arc::new(mock_client),
);
let diff = reconciler
.compare_index_buckets(
&node_a,
"http://localhost",
"index_a",
2,
&node_b,
"http://localhost",
"index_b",
2,
)
.await
.unwrap();
assert!(diff.a_only_pks.is_empty());
assert_eq!(diff.b_only_pks.len(), 1);
assert_eq!(diff.b_only_pks[0], "key-only-b");
assert!(diff.mismatched_pks.is_empty());
}
#[tokio::test]
async fn test_compare_index_buckets_mismatched_content() {
// Test cross-index comparison with same PK but different content
let doc_a = json!({ "id": "key-mismatch", "title": "Version A", "_miroir_shard": 0 });
let doc_b = json!({ "id": "key-mismatch", "title": "Version B", "_miroir_shard": 0 });
let mock_client = TestNodeClient::new();
let node_a = NodeId::new("node-a".to_string());
let node_b = NodeId::new("node-b".to_string());
mock_client.set_response(&node_a, vec![doc_a]);
mock_client.set_response(&node_b, vec![doc_b]);
let topology = Arc::new(RwLock::new(Topology::new(2, 1, 1)));
let reconciler = AntiEntropyReconciler::new(
AntiEntropyConfig::default(),
topology,
Arc::new(mock_client),
);
let diff = reconciler
.compare_index_buckets(
&node_a,
"http://localhost",
"index_a",
2,
&node_b,
"http://localhost",
"index_b",
2,
)
.await
.unwrap();
assert!(diff.a_only_pks.is_empty());
assert!(diff.b_only_pks.is_empty());
assert_eq!(diff.mismatched_pks.len(), 1);
assert_eq!(diff.mismatched_pks[0], "key-mismatch");
}
#[tokio::test]
async fn test_compare_index_buckets_across_different_shard_counts() {
// Test that PK-keyed bucketing works across different shard counts
// This is the key requirement for reshard verification (plan §13.1 step 4)
// Same PK but different shard assignments due to different S values
// With S=16: hash("key-1") % 16 = some_shard_old
// With S=32: hash("key-1") % 32 = some_shard_new
// But PK-keyed bucket (hash("key-1") % 256) should be the same
let doc_old_shard = json!({ "id": "key-reshard", "title": "Same", "_miroir_shard": 5 });
let doc_new_shard = json!({ "id": "key-reshard", "title": "Same", "_miroir_shard": 21 });
let mock_client = TestNodeClient::new();
let node_a = NodeId::new("node-a".to_string());
let node_b = NodeId::new("node-b".to_string());
// Simulate live index (S=16) and shadow index (S=32)
mock_client.set_response(&node_a, vec![doc_old_shard]);
mock_client.set_response(&node_b, vec![doc_new_shard]);
let topology = Arc::new(RwLock::new(Topology::new(32, 1, 1)));
let reconciler = AntiEntropyReconciler::new(
AntiEntropyConfig::default(),
topology,
Arc::new(mock_client),
);
let diff = reconciler
.compare_index_buckets(
&node_a,
"http://localhost",
"live_index", // S=16
16,
&node_b,
"http://localhost",
"shadow_index", // S=32
32,
)
.await
.unwrap();
// Even though the documents are in different shards, they should match
// because PK-keyed bucketing is independent of shard count
assert!(diff.a_only_pks.is_empty(), "PK should exist in both indexes");
assert!(diff.b_only_pks.is_empty(), "PK should exist in both indexes");
assert!(diff.mismatched_pks.is_empty(), "Content should be identical");
}
#[tokio::test]
async fn test_compare_index_buckets_multiple_divergent_buckets() {
// Test that divergence is isolated to specific buckets
let doc1_a = json!({ "id": "bucket-0-key-a", "title": "In A", "_miroir_shard": 0 });
let doc2_a = json!({ "id": "bucket-5-key-a", "title": "In A", "_miroir_shard": 0 });
let doc1_b = json!({ "id": "bucket-0-key-b", "title": "In B", "_miroir_shard": 0 });
let doc2_b = json!({ "id": "bucket-5-key-b", "title": "In B", "_miroir_shard": 0 });
// Determine which buckets these keys belong to
let bucket_0_key_a = AntiEntropyReconciler::<TestNodeClient>::bucket_for_primary_key("bucket-0-key-a");
let bucket_5_key_a = AntiEntropyReconciler::<TestNodeClient>::bucket_for_primary_key("bucket-5-key-a");
let bucket_0_key_b = AntiEntropyReconciler::<TestNodeClient>::bucket_for_primary_key("bucket-0-key-b");
let bucket_5_key_b = AntiEntropyReconciler::<TestNodeClient>::bucket_for_primary_key("bucket-5-key-b");
let mock_client = TestNodeClient::new();
let node_a = NodeId::new("node-a".to_string());
let node_b = NodeId::new("node-b".to_string());
mock_client.set_response(&node_a, vec![doc1_a, doc2_a]);
mock_client.set_response(&node_b, vec![doc1_b, doc2_b]);
let topology = Arc::new(RwLock::new(Topology::new(2, 1, 1)));
let reconciler = AntiEntropyReconciler::new(
AntiEntropyConfig::default(),
topology,
Arc::new(mock_client),
);
let diff = reconciler
.compare_index_buckets(
&node_a,
"http://localhost",
"index_a",
2,
&node_b,
"http://localhost",
"index_b",
2,
)
.await
.unwrap();
// Each key should only exist in one index
assert_eq!(diff.a_only_pks.len(), 2);
assert_eq!(diff.b_only_pks.len(), 2);
assert!(diff.mismatched_pks.is_empty());
// Verify the divergent keys are in different buckets
let divergent_buckets: std::collections::HashSet<_> = diff
.a_only_pks
.iter()
.chain(diff.b_only_pks.iter())
.map(|pk| AntiEntropyReconciler::<TestNodeClient>::bucket_for_primary_key(pk))
.collect();
assert!(divergent_buckets.contains(&bucket_0_key_a));
assert!(divergent_buckets.contains(&bucket_5_key_a));
assert!(divergent_buckets.contains(&bucket_0_key_b));
assert!(divergent_buckets.contains(&bucket_5_key_b));
}

View file

@ -58,3 +58,27 @@ Comprehensive tests in `/home/coding/miroir/crates/miroir-proxy/tests/p5_8_b_ant
## Reusability for §13.1 Reshard Verify
The `bucket_for_primary_key()` function is public and documented for reuse in reshard verification (plan §13.1), where PK-keyed (not shard-keyed) bucketing is needed for cross-shard comparison.
## Verification (2026-05-23)
All 18 tests in `p5_8_b_anti_entropy_diff.rs` passed:
- `test_bucket_count_constant` - Verifies BUCKET_COUNT = 256
- `test_bucket_for_primary_key_deterministic` - Verifies deterministic bucket assignment
- `test_bucket_for_primary_key_distributes` - Verifies even distribution across buckets
- `test_fingerprint_shard_includes_bucket_hashes` - Verifies per-bucket hash computation
- `test_diff_fingerprints_identical` - Tests no divergence case
- `test_diff_fingerprints_divergent_buckets` - Tests divergent bucket detection
- `test_diff_fingerprints_isolates_divergence` - Verifies ~0.4% isolation per bucket
- `test_fetch_bucket_pks_filters_by_bucket` - Tests bucket filtering
- `test_compare_bucket_replicas_no_divergence` - Tests identical buckets
- `test_compare_bucket_replicas_a_only` - Tests PK only on replica A
- `test_compare_bucket_replicas_b_only` - Tests PK only on replica B
- `test_compare_bucket_replicas_mismatched_content` - Tests content hash mismatch
- `test_compare_index_buckets_identical` - Cross-index comparison with identical content
- `test_compare_index_buckets_a_only` - Cross-index comparison with documents only in A
- `test_compare_index_buckets_b_only` - Cross-index comparison with documents only in B
- `test_compare_index_buckets_mismatched_content` - Cross-index comparison with mismatched content
- `test_compare_index_buckets_across_different_shard_counts` - PK-keyed bucketing works across different shard counts (reshard verification)
- `test_compare_index_buckets_multiple_divergent_buckets` - Divergence isolation to specific buckets
The bucket-granular re-digest implementation for P5.8.b is verified complete.