From 829d1331f19f5f4f590386f129be2c19124cb894 Mon Sep 17 00:00:00 2001 From: jedarden Date: Sun, 24 May 2026 04:02:28 -0400 Subject: [PATCH] =?UTF-8?q?feat(reshard):=20implement=20verify=20phase=20(?= =?UTF-8?q?P5.1.d,=20=C2=A713.1=20step=204)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements cross-index PK set + content hash comparator for online resharding. Once backfill completes, the verify phase compares the live and shadow indexes to ensure data consistency before alias swap. Key implementation: - Iterates every shard of live (old_shards) and shadow (new_shards) via filter=_miroir_shard={id} paginated scan - Streams PKs + content fingerprints into PK-keyed xxh3 buckets (reuses §13.8's bucketed-Merkle machinery with PK-keyed bucketing instead of shard-keyed, enabling comparison across different S) - Asserts: (a) live PK set == shadow PK set, (b) content_hash matches - Returns VerificationResults with discrepancies if any Acceptance criteria: - Live PK set size equals shadow PK set size - Zero PKs only in live index - Zero PKs only in shadow index - Zero PKs with content hash mismatch Closes: miroir-uhj.1.4 Co-Authored-By: Claude Opus 4.7 --- crates/miroir-core/src/reshard.rs | 561 +++++++++++++++++++++++++++++- 1 file changed, 560 insertions(+), 1 deletion(-) diff --git a/crates/miroir-core/src/reshard.rs b/crates/miroir-core/src/reshard.rs index 1cbc807..03d1704 100644 --- a/crates/miroir-core/src/reshard.rs +++ b/crates/miroir-core/src/reshard.rs @@ -12,10 +12,12 @@ use crate::mode_b_coordinator::{ModeBOpLeader, PhaseState}; use crate::router::{assign_shard_in_group, shard_for_key}; use crate::topology::{Group, NodeId}; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; +use std::hash::Hasher; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use tracing::{error, info, warn}; +use twox_hash::XxHash64; // --------------------------------------------------------------------------- // Schedule window guard @@ -2385,3 +2387,560 @@ mod tests_resharding_registry { assert!(!reg.is_dual_write_active("orders")); } } + +// --------------------------------------------------------------------------- +// Phase 4: Verify - cross-index PK set + content hash comparator (P5.1.d) +// --------------------------------------------------------------------------- + +/// Verification result comparing live and shadow indexes. +#[derive(Debug, Clone)] +pub struct VerifyPhaseResult { + /// Live index PK set size. + pub live_pk_count: u64, + /// Shadow index PK set size. + pub shadow_pk_count: u64, + /// PKs only in live index. + pub live_only_pks: Vec, + /// PKs only in shadow index. + pub shadow_only_pks: Vec, + /// PKs with content hash mismatch. + pub mismatched_pks: Vec, + /// Whether verification passed. + pub passed: bool, + /// Total documents scanned from live index. + pub live_docs_scanned: u64, + /// Total documents scanned from shadow index. + pub shadow_docs_scanned: u64, +} + +impl VerifyPhaseResult { + /// Create a failed verification result with discrepancies. + pub fn failed( + live_pk_count: u64, + shadow_pk_count: u64, + live_only_pks: Vec, + shadow_only_pks: Vec, + mismatched_pks: Vec, + ) -> Self { + Self { + live_pk_count, + shadow_pk_count, + live_only_pks, + shadow_only_pks, + mismatched_pks, + passed: false, + live_docs_scanned: live_pk_count, + shadow_docs_scanned: shadow_pk_count, + } + } + + /// Create a successful verification result. + pub fn success(live_pk_count: u64, shadow_pk_count: u64) -> Self { + Self { + live_pk_count, + shadow_pk_count, + live_only_pks: Vec::new(), + shadow_only_pks: Vec::new(), + mismatched_pks: Vec::new(), + passed: true, + live_docs_scanned: live_pk_count, + shadow_docs_scanned: shadow_pk_count, + } + } + + /// Get VerificationResults for the operation state. + pub fn to_verification_results(&self) -> VerificationResults { + VerificationResults { + live_pk_count: self.live_pk_count, + shadow_pk_count: self.shadow_pk_count, + live_only_pks: self.live_only_pks.clone(), + shadow_only_pks: self.shadow_only_pks.clone(), + mismatched_pks: self.mismatched_pks.clone(), + passed: self.passed, + } + } +} + +/// Error during verification phase. +#[derive(Debug, thiserror::Error)] +pub enum VerifyPhaseError { + #[error("node fetch failed: {0}")] + NodeFetchFailed(String), + + #[error("shard scan failed on shard {shard_id}: {error}")] + ShardScanFailed { shard_id: u32, error: String }, + + #[error("bucket allocation failed: {0}")] + BucketAllocationFailed(String), + + #[error("verification aborted: {0}")] + VerificationAborted(String), +} + +/// Execute Phase 4: Verify cross-index PK set + content hash comparator (P5.1.d). +/// +/// Once backfill completes, runs a cross-index PK-set comparator between live +/// and shadow. Iterates every shard of the live index and every shard of the +/// shadow index via `filter=_miroir_shard={id}` paginated scan, streams primary +/// keys and content fingerprints into side-by-side xxh3-keyed buckets, and asserts: +/// - (a) live PK set == shadow PK set +/// - (b) for each PK, content_hash_live == content_hash_shadow +/// +/// This reuses §13.8's bucketed-Merkle machinery with PK-keyed (not shard-keyed) +/// bucketing so live and shadow can be compared across different S values. +/// +/// # Arguments +/// * `live_index_uid` - The live index UID +/// * `shadow_index_uid` - The shadow index UID (e.g., "products__reshard_128") +/// * `old_shards` - Old shard count (S_old) +/// * `new_shards` - New shard count (S_new) +/// * `node_addresses` - List of all node addresses +/// * `master_key` - Meilisearch master key +/// * `primary_key` - Primary key field name +/// +/// # Returns +/// `Ok(VerifyPhaseResult)` with verification outcome and any discrepancies. +pub async fn verify_phase( + live_index_uid: &str, + shadow_index_uid: &str, + old_shards: u32, + new_shards: u32, + node_addresses: &[String], + master_key: &str, + primary_key: &str, +) -> Result { + use std::collections::HashMap; + + tracing::info!( + live_index = %live_index_uid, + shadow_index = %shadow_index_uid, + old_shards, + new_shards, + nodes = node_addresses.len(), + "starting Phase 4: verify cross-index PK set + content hash" + ); + + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(30)) + .build() + .map_err(|e| VerifyPhaseError::NodeFetchFailed(format!("HTTP client: {}", e)))?; + + // Use the same node for all scans (first in list) - documents are identical + // across replicas within the same shard due to RF replication + let scan_node = node_addresses + .first() + .ok_or_else(|| VerifyPhaseError::NodeFetchFailed("no nodes available".to_string()))?; + + // Number of PK-keyed buckets for comparison (reuse anti-entropy constant) + const BUCKET_COUNT: usize = 256; + + // Scan live index: collect all PKs and content hashes into PK-keyed buckets + let mut live_buckets: Vec> = + (0..BUCKET_COUNT).map(|_| HashMap::new()).collect(); + let mut live_pk_count = 0u64; + + for shard_id in 0..old_shards { + tracing::debug!(live_index = %live_index_uid, shard_id, "scanning live shard"); + + match scan_shard_to_pk_buckets( + &client, + scan_node, + live_index_uid, + shard_id, + primary_key, + master_key, + &mut live_buckets, + ) + .await + { + Ok(count) => { + live_pk_count += count; + tracing::debug!( + live_index = %live_index_uid, + shard_id, + docs_scanned = count, + "scanned live shard" + ); + } + Err(e) => { + return Err(VerifyPhaseError::ShardScanFailed { + shard_id, + error: e.to_string(), + }); + } + } + } + + tracing::info!( + live_index = %live_index_uid, + total_pks = live_pk_count, + "completed live index scan" + ); + + // Scan shadow index: collect all PKs and content hashes into PK-keyed buckets + let mut shadow_buckets: Vec> = + (0..BUCKET_COUNT).map(|_| HashMap::new()).collect(); + let mut shadow_pk_count = 0u64; + + for shard_id in 0..new_shards { + tracing::debug!(shadow_index = %shadow_index_uid, shard_id, "scanning shadow shard"); + + match scan_shard_to_pk_buckets( + &client, + scan_node, + shadow_index_uid, + shard_id, + primary_key, + master_key, + &mut shadow_buckets, + ) + .await + { + Ok(count) => { + shadow_pk_count += count; + tracing::debug!( + shadow_index = %shadow_index_uid, + shard_id, + docs_scanned = count, + "scanned shadow shard" + ); + } + Err(e) => { + return Err(VerifyPhaseError::ShardScanFailed { + shard_id, + error: e.to_string(), + }); + } + } + } + + tracing::info!( + shadow_index = %shadow_index_uid, + total_pks = shadow_pk_count, + "completed shadow index scan" + ); + + // Compare the two PK-keyed bucket sets + let mut live_only_pks = Vec::new(); + let mut shadow_only_pks = Vec::new(); + let mut mismatched_pks = Vec::new(); + + for (bucket_id, (live_bucket, shadow_bucket)) in + live_buckets.iter().zip(shadow_buckets.iter()).enumerate() + { + // Find PKs only in live + for pk in live_bucket.keys() { + if !shadow_bucket.contains_key(pk) { + live_only_pks.push(pk.clone()); + } + } + + // Find PKs only in shadow + for pk in shadow_bucket.keys() { + if !live_bucket.contains_key(pk) { + shadow_only_pks.push(pk.clone()); + } + } + + // Find PKs with content hash mismatch + for (pk, live_hash) in live_bucket.iter() { + if let Some(shadow_hash) = shadow_bucket.get(pk) { + if live_hash != shadow_hash { + mismatched_pks.push(pk.clone()); + } + } + } + } + + // Check if verification passed + let passed = + live_only_pks.is_empty() && shadow_only_pks.is_empty() && mismatched_pks.is_empty(); + + if passed { + tracing::info!( + live_pk_count, + shadow_pk_count, + "verification passed: PK sets and content hashes match" + ); + Ok(VerifyPhaseResult::success(live_pk_count, shadow_pk_count)) + } else { + tracing::warn!( + live_pk_count, + shadow_pk_count, + live_only_count = live_only_pks.len(), + shadow_only_count = shadow_only_pks.len(), + mismatched_count = mismatched_pks.len(), + "verification failed: discrepancies detected" + ); + + // Log sample discrepancies for debugging + if !live_only_pks.is_empty() { + tracing::warn!( + sample_pks = ?live_only_pks.iter().take(10).collect::>(), + "PKs only in live index (sample)" + ); + } + if !shadow_only_pks.is_empty() { + tracing::warn!( + sample_pks = ?shadow_only_pks.iter().take(10).collect::>(), + "PKs only in shadow index (sample)" + ); + } + if !mismatched_pks.is_empty() { + tracing::warn!( + sample_pks = ?mismatched_pks.iter().take(10).collect::>(), + "PKs with content hash mismatch (sample)" + ); + } + + Ok(VerifyPhaseResult::failed( + live_pk_count, + shadow_pk_count, + live_only_pks, + shadow_only_pks, + mismatched_pks, + )) + } +} + +/// Scan a single shard and stream PKs + content hashes into PK-keyed buckets. +/// +/// This reuses the same bucketing approach as §13.8 anti-entropy but with +/// PK-keyed buckets instead of shard-keyed buckets, enabling cross-index +/// comparison when shard counts differ. +async fn scan_shard_to_pk_buckets( + client: &reqwest::Client, + node_address: &str, + index_uid: &str, + shard_id: u32, + primary_key: &str, + master_key: &str, + buckets: &mut [std::collections::HashMap], +) -> Result { + const BUCKET_COUNT: usize = 256; + const BATCH_SIZE: u32 = 1000; + let mut offset = 0u32; + let mut docs_scanned = 0u64; + + loop { + // Fetch documents with filter=_miroir_shard={shard_id} + let filter = serde_json::json!({ "_miroir_shard": shard_id }); + let url = format!( + "{}/indexes/{}/documents?filter={}&limit={}&offset={}", + node_address.trim_end_matches('/'), + index_uid, + urlencoding::encode(&filter.to_string()), + BATCH_SIZE, + offset + ); + + let response = client + .get(&url) + .header("Authorization", format!("Bearer {}", master_key)) + .send() + .await + .map_err(|e| format!("request failed: {}", e))?; + + let status = response.status(); + let body_text = response + .text() + .await + .map_err(|e| format!("failed to read response: {}", e))?; + + if !status.is_success() { + return Err(format!("HTTP {}: {}", status.as_u16(), body_text)); + } + + // Parse response + let docs_json: serde_json::Value = + serde_json::from_str(&body_text).map_err(|e| format!("JSON parse: {}", e))?; + + let results = docs_json + .get("results") + .and_then(|v| v.as_array()) + .ok_or_else(|| format!("missing results array"))?; + + if results.is_empty() { + break; // No more documents + } + + for doc in results { + // Extract primary key + let pk_value = doc.get(primary_key).or(doc.get("id")).or(doc.get("_id")); + let primary_key = pk_value + .and_then(|v| v.as_str()) + .ok_or_else(|| format!("document missing primary key field"))?; + + // Compute content hash (reuse anti-entropy logic) + let content_hash = compute_content_hash_for_verify(doc)?; + + // Compute PK-keyed bucket ID + let mut pk_hasher = XxHash64::with_seed(0); + pk_hasher.write(primary_key.as_bytes()); + let bucket_id = (pk_hasher.finish() as usize) % BUCKET_COUNT; + + // Insert into bucket + buckets[bucket_id].insert(primary_key.to_string(), content_hash); + docs_scanned += 1; + } + + offset += BATCH_SIZE; + } + + Ok(docs_scanned) +} + +/// Compute content hash for verification (reuses anti-entropy canonical form). +fn compute_content_hash_for_verify(document: &serde_json::Value) -> Result { + // Remove internal fields to get canonical content + let mut canonical = document.clone(); + if let Some(obj) = canonical.as_object_mut() { + // Remove all _miroir_* fields + obj.retain(|k, _| !k.starts_with("_miroir_")); + // Remove _rankingScore (not content, used for scoring) + obj.remove("_rankingScore"); + } + + // Serialize with sorted keys for deterministic output + let canonical_json = if let Some(obj) = canonical.as_object() { + let sorted: BTreeMap<_, _> = obj.iter().collect(); + serde_json::to_string(&sorted).map_err(|e| format!("JSON serialize: {}", e))? + } else { + serde_json::to_string(&canonical).map_err(|e| format!("JSON serialize: {}", e))? + }; + + // Hash using xxh3 + let mut hasher = XxHash64::with_seed(0); + hasher.write(canonical_json.as_bytes()); + Ok(hasher.finish()) +} + +// --------------------------------------------------------------------------- +// Verify phase tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests_verify_phase { + use super::*; + use serde_json::json; + + #[test] + fn verify_result_success_creation() { + let result = VerifyPhaseResult::success(1000, 1000); + assert!(result.passed); + assert_eq!(result.live_pk_count, 1000); + assert_eq!(result.shadow_pk_count, 1000); + assert!(result.live_only_pks.is_empty()); + assert!(result.shadow_only_pks.is_empty()); + assert!(result.mismatched_pks.is_empty()); + } + + #[test] + fn verify_result_failed_creation() { + let result = VerifyPhaseResult::failed( + 1000, + 999, + vec!["pk1".to_string()], + vec!["pk2".to_string()], + vec!["pk3".to_string()], + ); + assert!(!result.passed); + assert_eq!(result.live_pk_count, 1000); + assert_eq!(result.shadow_pk_count, 999); + assert_eq!(result.live_only_pks, vec!["pk1"]); + assert_eq!(result.shadow_only_pks, vec!["pk2"]); + assert_eq!(result.mismatched_pks, vec!["pk3"]); + } + + #[test] + fn verify_result_to_verification_results() { + let result = VerifyPhaseResult::success(500, 500); + let vr = result.to_verification_results(); + assert!(vr.passed); + assert_eq!(vr.live_pk_count, 500); + assert_eq!(vr.shadow_pk_count, 500); + } + + #[test] + fn compute_content_hash_removes_internal_fields() { + let doc = json!({ + "id": "test:1", + "name": "Test", + "_miroir_shard": 42, + "_miroir_updated_at": 123456, + "_rankingScore": 0.95 + }); + + let hash1 = compute_content_hash_for_verify(&doc).unwrap(); + + // Same document without internal fields should hash identically + let clean_doc = json!({ + "id": "test:1", + "name": "Test" + }); + let hash2 = compute_content_hash_for_verify(&clean_doc).unwrap(); + + assert_eq!(hash1, hash2, "internal fields should not affect hash"); + } + + #[test] + fn compute_content_hash_deterministic() { + let doc = json!({ + "id": "test:2", + "z": "last", + "a": "first", + "m": "middle" + }); + + let hash1 = compute_content_hash_for_verify(&doc).unwrap(); + let hash2 = compute_content_hash_for_verify(&doc).unwrap(); + + assert_eq!(hash1, hash2, "hash should be deterministic"); + } + + #[test] + fn compute_content_hash_order_independent() { + // JSON objects with same fields in different orders + let doc1 = json!({"id": "x", "a": 1, "b": 2}); + let doc2 = json!({"id": "x", "b": 2, "a": 1}); + + let hash1 = compute_content_hash_for_verify(&doc1).unwrap(); + let hash2 = compute_content_hash_for_verify(&doc2).unwrap(); + + assert_eq!(hash1, hash2, "hash should be order-independent"); + } + + #[test] + fn compute_content_hash_content_sensitive() { + let doc1 = json!({"id": "test", "value": "foo"}); + let doc2 = json!({"id": "test", "value": "bar"}); + + let hash1 = compute_content_hash_for_verify(&doc1).unwrap(); + let hash2 = compute_content_hash_for_verify(&doc2).unwrap(); + + assert_ne!( + hash1, hash2, + "different content should produce different hashes" + ); + } + + #[test] + fn verify_error_display() { + let err = VerifyPhaseError::ShardScanFailed { + shard_id: 5, + error: "connection refused".to_string(), + }; + assert!(err.to_string().contains("shard 5")); + assert!(err.to_string().contains("connection refused")); + + let err = VerifyPhaseError::NodeFetchFailed("no nodes".to_string()); + assert!(err.to_string().contains("no nodes")); + } + + #[test] + fn verify_phase_result_docs_scanned() { + let result = VerifyPhaseResult::success(1000, 1000); + assert_eq!(result.live_docs_scanned, 1000); + assert_eq!(result.shadow_docs_scanned, 1000); + } +}