feat(reshard): implement verify phase (P5.1.d, §13.1 step 4)

Implements cross-index PK set + content hash comparator for online
resharding. Once backfill completes, the verify phase compares the
live and shadow indexes to ensure data consistency before alias swap.

Key implementation:
- Iterates every shard of live (old_shards) and shadow (new_shards)
  via filter=_miroir_shard={id} paginated scan
- Streams PKs + content fingerprints into PK-keyed XxHash64 (XXH64) buckets
  (reuses §13.8's bucketed-Merkle machinery with PK-keyed bucketing
   instead of shard-keyed, enabling comparison across different S)
- Asserts: (a) live PK set == shadow PK set, (b) content_hash matches
- Returns VerificationResults with discrepancies if any

Acceptance criteria:
- Live PK set size equals shadow PK set size
- Zero PKs only in live index
- Zero PKs only in shadow index
- Zero PKs with content hash mismatch

Closes: miroir-uhj.1.4

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-05-24 04:02:28 -04:00
parent 2b69bfa3ea
commit 829d1331f1

View file

@ -12,10 +12,12 @@ use crate::mode_b_coordinator::{ModeBOpLeader, PhaseState};
use crate::router::{assign_shard_in_group, shard_for_key};
use crate::topology::{Group, NodeId};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::hash::Hasher;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::{error, info, warn};
use twox_hash::XxHash64;
// ---------------------------------------------------------------------------
// Schedule window guard
@ -2385,3 +2387,560 @@ mod tests_resharding_registry {
assert!(!reg.is_dual_write_active("orders"));
}
}
// ---------------------------------------------------------------------------
// Phase 4: Verify - cross-index PK set + content hash comparator (P5.1.d)
// ---------------------------------------------------------------------------
/// Outcome of the verify phase: a comparison of the live and shadow indexes.
///
/// Carries the per-index counts plus the three discrepancy lists (PKs present
/// on only one side, and PKs whose content hashes differ). All fields are
/// public so callers can inspect or log them directly.
#[derive(Debug, Clone)]
pub struct VerifyPhaseResult {
/// Primary-key count reported for the live index.
pub live_pk_count: u64,
/// Primary-key count reported for the shadow index.
pub shadow_pk_count: u64,
/// Primary keys found in the live index but not in the shadow index.
pub live_only_pks: Vec<String>,
/// Primary keys found in the shadow index but not in the live index.
pub shadow_only_pks: Vec<String>,
/// Primary keys present in both indexes whose content hashes differ.
pub mismatched_pks: Vec<String>,
/// Whether verification passed.
pub passed: bool,
/// Total documents scanned from the live index.
/// The `failed`/`success` constructors set this equal to `live_pk_count`.
pub live_docs_scanned: u64,
/// Total documents scanned from the shadow index.
/// The `failed`/`success` constructors set this equal to `shadow_pk_count`.
pub shadow_docs_scanned: u64,
}
impl VerifyPhaseResult {
    /// Build a result that records a failed verification.
    ///
    /// The three lists carry the discrepancies that were found; `passed` is
    /// always `false`, and the docs-scanned counters mirror the PK counts.
    pub fn failed(
        live_pk_count: u64,
        shadow_pk_count: u64,
        live_only_pks: Vec<String>,
        shadow_only_pks: Vec<String>,
        mismatched_pks: Vec<String>,
    ) -> Self {
        Self {
            passed: false,
            live_docs_scanned: live_pk_count,
            shadow_docs_scanned: shadow_pk_count,
            live_pk_count,
            shadow_pk_count,
            live_only_pks,
            shadow_only_pks,
            mismatched_pks,
        }
    }

    /// Build a result that records a successful verification.
    ///
    /// All discrepancy lists are empty, `passed` is `true`, and the
    /// docs-scanned counters mirror the PK counts.
    pub fn success(live_pk_count: u64, shadow_pk_count: u64) -> Self {
        // Same shape as a failure with no discrepancies, except for the flag.
        Self {
            passed: true,
            ..Self::failed(
                live_pk_count,
                shadow_pk_count,
                Vec::new(),
                Vec::new(),
                Vec::new(),
            )
        }
    }

    /// Convert into the `VerificationResults` stored in the operation state.
    ///
    /// The docs-scanned counters are not part of `VerificationResults` and are
    /// dropped; the discrepancy lists are cloned.
    pub fn to_verification_results(&self) -> VerificationResults {
        let Self {
            live_pk_count,
            shadow_pk_count,
            live_only_pks,
            shadow_only_pks,
            mismatched_pks,
            passed,
            ..
        } = self;

        VerificationResults {
            live_pk_count: *live_pk_count,
            shadow_pk_count: *shadow_pk_count,
            live_only_pks: live_only_pks.clone(),
            shadow_only_pks: shadow_only_pks.clone(),
            mismatched_pks: mismatched_pks.clone(),
            passed: *passed,
        }
    }
}
/// Error raised while running the verification phase.
///
/// These describe failures to *perform* the comparison. Discrepancies found
/// by a comparison that did run are reported through `VerifyPhaseResult`
/// rather than through this type.
#[derive(Debug, thiserror::Error)]
pub enum VerifyPhaseError {
/// The HTTP client could not be built, or no node address was supplied.
#[error("node fetch failed: {0}")]
NodeFetchFailed(String),
/// Scanning one shard failed; `error` holds the underlying message.
#[error("shard scan failed on shard {shard_id}: {error}")]
ShardScanFailed { shard_id: u32, error: String },
/// Bucket allocation failed.
/// NOTE(review): not constructed anywhere in the visible code; confirm usage.
#[error("bucket allocation failed: {0}")]
BucketAllocationFailed(String),
/// Verification was aborted before completing.
/// NOTE(review): not constructed anywhere in the visible code; confirm usage.
#[error("verification aborted: {0}")]
VerificationAborted(String),
}
/// Execute Phase 4: Verify cross-index PK set + content hash comparator (P5.1.d).
///
/// Once backfill completes, runs a cross-index PK-set comparator between live
/// and shadow. Iterates every shard of the live index and every shard of the
/// shadow index via `filter=_miroir_shard={id}` paginated scan, streams primary
/// keys and content fingerprints into side-by-side xxh3-keyed buckets, and asserts:
/// - (a) live PK set == shadow PK set
/// - (b) for each PK, content_hash_live == content_hash_shadow
///
/// This reuses §13.8's bucketed-Merkle machinery with PK-keyed (not shard-keyed)
/// bucketing so live and shadow can be compared across different S values.
///
/// # Arguments
/// * `live_index_uid` - The live index UID
/// * `shadow_index_uid` - The shadow index UID (e.g., "products__reshard_128")
/// * `old_shards` - Old shard count (S_old)
/// * `new_shards` - New shard count (S_new)
/// * `node_addresses` - List of all node addresses
/// * `master_key` - Meilisearch master key
/// * `primary_key` - Primary key field name
///
/// # Returns
/// `Ok(VerifyPhaseResult)` with verification outcome and any discrepancies.
pub async fn verify_phase(
live_index_uid: &str,
shadow_index_uid: &str,
old_shards: u32,
new_shards: u32,
node_addresses: &[String],
master_key: &str,
primary_key: &str,
) -> Result<VerifyPhaseResult, VerifyPhaseError> {
use std::collections::HashMap;
tracing::info!(
live_index = %live_index_uid,
shadow_index = %shadow_index_uid,
old_shards,
new_shards,
nodes = node_addresses.len(),
"starting Phase 4: verify cross-index PK set + content hash"
);
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.build()
.map_err(|e| VerifyPhaseError::NodeFetchFailed(format!("HTTP client: {}", e)))?;
// Use the same node for all scans (first in list) - documents are identical
// across replicas within the same shard due to RF replication
let scan_node = node_addresses
.first()
.ok_or_else(|| VerifyPhaseError::NodeFetchFailed("no nodes available".to_string()))?;
// Number of PK-keyed buckets for comparison (reuse anti-entropy constant)
const BUCKET_COUNT: usize = 256;
// Scan live index: collect all PKs and content hashes into PK-keyed buckets
let mut live_buckets: Vec<HashMap<String, u64>> =
(0..BUCKET_COUNT).map(|_| HashMap::new()).collect();
let mut live_pk_count = 0u64;
for shard_id in 0..old_shards {
tracing::debug!(live_index = %live_index_uid, shard_id, "scanning live shard");
match scan_shard_to_pk_buckets(
&client,
scan_node,
live_index_uid,
shard_id,
primary_key,
master_key,
&mut live_buckets,
)
.await
{
Ok(count) => {
live_pk_count += count;
tracing::debug!(
live_index = %live_index_uid,
shard_id,
docs_scanned = count,
"scanned live shard"
);
}
Err(e) => {
return Err(VerifyPhaseError::ShardScanFailed {
shard_id,
error: e.to_string(),
});
}
}
}
tracing::info!(
live_index = %live_index_uid,
total_pks = live_pk_count,
"completed live index scan"
);
// Scan shadow index: collect all PKs and content hashes into PK-keyed buckets
let mut shadow_buckets: Vec<HashMap<String, u64>> =
(0..BUCKET_COUNT).map(|_| HashMap::new()).collect();
let mut shadow_pk_count = 0u64;
for shard_id in 0..new_shards {
tracing::debug!(shadow_index = %shadow_index_uid, shard_id, "scanning shadow shard");
match scan_shard_to_pk_buckets(
&client,
scan_node,
shadow_index_uid,
shard_id,
primary_key,
master_key,
&mut shadow_buckets,
)
.await
{
Ok(count) => {
shadow_pk_count += count;
tracing::debug!(
shadow_index = %shadow_index_uid,
shard_id,
docs_scanned = count,
"scanned shadow shard"
);
}
Err(e) => {
return Err(VerifyPhaseError::ShardScanFailed {
shard_id,
error: e.to_string(),
});
}
}
}
tracing::info!(
shadow_index = %shadow_index_uid,
total_pks = shadow_pk_count,
"completed shadow index scan"
);
// Compare the two PK-keyed bucket sets
let mut live_only_pks = Vec::new();
let mut shadow_only_pks = Vec::new();
let mut mismatched_pks = Vec::new();
for (bucket_id, (live_bucket, shadow_bucket)) in
live_buckets.iter().zip(shadow_buckets.iter()).enumerate()
{
// Find PKs only in live
for pk in live_bucket.keys() {
if !shadow_bucket.contains_key(pk) {
live_only_pks.push(pk.clone());
}
}
// Find PKs only in shadow
for pk in shadow_bucket.keys() {
if !live_bucket.contains_key(pk) {
shadow_only_pks.push(pk.clone());
}
}
// Find PKs with content hash mismatch
for (pk, live_hash) in live_bucket.iter() {
if let Some(shadow_hash) = shadow_bucket.get(pk) {
if live_hash != shadow_hash {
mismatched_pks.push(pk.clone());
}
}
}
}
// Check if verification passed
let passed =
live_only_pks.is_empty() && shadow_only_pks.is_empty() && mismatched_pks.is_empty();
if passed {
tracing::info!(
live_pk_count,
shadow_pk_count,
"verification passed: PK sets and content hashes match"
);
Ok(VerifyPhaseResult::success(live_pk_count, shadow_pk_count))
} else {
tracing::warn!(
live_pk_count,
shadow_pk_count,
live_only_count = live_only_pks.len(),
shadow_only_count = shadow_only_pks.len(),
mismatched_count = mismatched_pks.len(),
"verification failed: discrepancies detected"
);
// Log sample discrepancies for debugging
if !live_only_pks.is_empty() {
tracing::warn!(
sample_pks = ?live_only_pks.iter().take(10).collect::<Vec<_>>(),
"PKs only in live index (sample)"
);
}
if !shadow_only_pks.is_empty() {
tracing::warn!(
sample_pks = ?shadow_only_pks.iter().take(10).collect::<Vec<_>>(),
"PKs only in shadow index (sample)"
);
}
if !mismatched_pks.is_empty() {
tracing::warn!(
sample_pks = ?mismatched_pks.iter().take(10).collect::<Vec<_>>(),
"PKs with content hash mismatch (sample)"
);
}
Ok(VerifyPhaseResult::failed(
live_pk_count,
shadow_pk_count,
live_only_pks,
shadow_only_pks,
mismatched_pks,
))
}
}
/// Scan a single shard and stream PKs + content hashes into PK-keyed buckets.
///
/// This reuses the same bucketing approach as §13.8 anti-entropy but with
/// PK-keyed buckets instead of shard-keyed buckets, enabling cross-index
/// comparison when shard counts differ.
///
/// Documents are fetched page by page from
/// `GET {node_address}/indexes/{index_uid}/documents` with the filter
/// expression `_miroir_shard = {shard_id}`. For every document the primary key
/// is read from `primary_key` (falling back to `id`, then `_id`), the content
/// hash is computed, and the pair is inserted into
/// `buckets[XxHash64(pk) % buckets.len()]`. A PK seen more than once
/// overwrites the earlier entry.
///
/// # Arguments
/// * `client` - HTTP client used for the requests
/// * `node_address` - Base URL of the node to scan (trailing `/` is ignored)
/// * `index_uid` - Index to scan
/// * `shard_id` - Shard whose documents are requested
/// * `primary_key` - Primary key field name
/// * `master_key` - Sent as the `Authorization: Bearer` token
/// * `buckets` - Destination buckets; the bucket count is `buckets.len()`
///
/// # Returns
/// The number of documents returned by the scan (duplicates included).
///
/// # Errors
/// Returns a message when `buckets` is empty, a request or body read fails,
/// the node answers with a non-success status, the body is not JSON with a
/// `results` array, a document has no string or numeric primary key, or the
/// content hash cannot be computed.
async fn scan_shard_to_pk_buckets(
    client: &reqwest::Client,
    node_address: &str,
    index_uid: &str,
    shard_id: u32,
    primary_key: &str,
    master_key: &str,
    buckets: &mut [std::collections::HashMap<String, u64>],
) -> Result<u64, String> {
    const BATCH_SIZE: usize = 1000;

    // Bucket by the caller's actual bucket count so the two can never drift
    // apart (and so indexing below cannot go out of bounds).
    let bucket_count = buckets.len();
    if bucket_count == 0 {
        return Err("no PK buckets provided".to_string());
    }

    // The documents endpoint takes a filter *expression*, not a JSON object.
    let filter = format!("_miroir_shard = {}", shard_id);
    let encoded_filter = urlencoding::encode(&filter);
    let base_url = node_address.trim_end_matches('/');

    let mut offset = 0usize;
    let mut docs_scanned = 0u64;

    loop {
        let url = format!(
            "{}/indexes/{}/documents?filter={}&limit={}&offset={}",
            base_url, index_uid, encoded_filter, BATCH_SIZE, offset
        );
        let response = client
            .get(&url)
            .header("Authorization", format!("Bearer {}", master_key))
            .send()
            .await
            .map_err(|e| format!("request failed: {}", e))?;

        // Read the body before checking the status so error responses can be
        // included in the returned message.
        let status = response.status();
        let body_text = response
            .text()
            .await
            .map_err(|e| format!("failed to read response: {}", e))?;
        if !status.is_success() {
            return Err(format!("HTTP {}: {}", status.as_u16(), body_text));
        }

        let docs_json: serde_json::Value =
            serde_json::from_str(&body_text).map_err(|e| format!("JSON parse: {}", e))?;
        let results = docs_json
            .get("results")
            .and_then(|v| v.as_array())
            .ok_or_else(|| "missing results array".to_string())?;
        if results.is_empty() {
            break; // No more documents
        }

        for doc in results {
            // Primary keys may be strings or integers; both are keyed by their
            // textual form.
            let pk_value = doc
                .get(primary_key)
                .or_else(|| doc.get("id"))
                .or_else(|| doc.get("_id"));
            let pk = match pk_value {
                Some(serde_json::Value::String(text)) => text.clone(),
                Some(serde_json::Value::Number(number)) => number.to_string(),
                _ => return Err("document missing primary key field".to_string()),
            };

            // Compute content hash (reuse anti-entropy logic)
            let content_hash = compute_content_hash_for_verify(doc)?;

            // Compute PK-keyed bucket ID
            let mut pk_hasher = XxHash64::with_seed(0);
            pk_hasher.write(pk.as_bytes());
            let bucket_id = (pk_hasher.finish() % bucket_count as u64) as usize;

            buckets[bucket_id].insert(pk, content_hash);
            docs_scanned += 1;
        }

        // Advance by what was actually returned, so a page shorter than the
        // requested limit never causes documents to be skipped.
        offset += results.len();
    }

    Ok(docs_scanned)
}
/// Compute content hash for verification (reuses anti-entropy canonical form).
///
/// The document is reduced to a canonical JSON string and hashed with
/// `XxHash64` (seed 0):
/// - top-level fields whose name starts with `_miroir_`, and the top-level
///   `_rankingScore` field, are dropped (they are not document content);
/// - object keys are sorted at every nesting level, so the hash does not
///   depend on field order anywhere in the document;
/// - array element order is preserved, since it is part of the content.
///
/// # Errors
/// Returns a message if the canonical form cannot be serialized to JSON.
fn compute_content_hash_for_verify(document: &serde_json::Value) -> Result<u64, String> {
    let canonical = match document {
        serde_json::Value::Object(fields) => {
            // Internal/scoring fields are stripped at the top level only.
            let content_fields = fields.iter().filter(|(key, _)| {
                !key.starts_with("_miroir_") && key.as_str() != "_rankingScore"
            });
            canonical_object_for_verify(content_fields)
        }
        other => canonicalize_value_for_verify(other),
    };

    let canonical_json =
        serde_json::to_string(&canonical).map_err(|e| format!("JSON serialize: {}", e))?;

    let mut hasher = XxHash64::with_seed(0);
    hasher.write(canonical_json.as_bytes());
    Ok(hasher.finish())
}

/// Return a copy of `value` in which every nested object has its keys sorted.
fn canonicalize_value_for_verify(value: &serde_json::Value) -> serde_json::Value {
    match value {
        serde_json::Value::Object(fields) => canonical_object_for_verify(fields.iter()),
        serde_json::Value::Array(items) => {
            serde_json::Value::Array(items.iter().map(canonicalize_value_for_verify).collect())
        }
        scalar => scalar.clone(),
    }
}

/// Build a JSON object from `fields` with keys inserted in sorted order and
/// every value canonicalized recursively.
fn canonical_object_for_verify<'a>(
    fields: impl Iterator<Item = (&'a String, &'a serde_json::Value)>,
) -> serde_json::Value {
    // Sorting first and inserting in that order yields sorted serialization
    // whether or not the JSON map type preserves insertion order.
    let sorted: BTreeMap<&String, &serde_json::Value> = fields.collect();
    let mut object = serde_json::Map::with_capacity(sorted.len());
    for (key, value) in sorted {
        object.insert(key.clone(), canonicalize_value_for_verify(value));
    }
    serde_json::Value::Object(object)
}
// ---------------------------------------------------------------------------
// Verify phase tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests_verify_phase {
    use super::*;
    use serde_json::json;

    /// Hash a document, panicking if hashing reports an error.
    fn hash_of(doc: &serde_json::Value) -> u64 {
        compute_content_hash_for_verify(doc).unwrap()
    }

    /// A successful result carries the counts, no discrepancies, and `passed`.
    #[test]
    fn verify_result_success_creation() {
        let ok = VerifyPhaseResult::success(1000, 1000);

        assert!(ok.passed);
        assert_eq!(ok.live_pk_count, 1000);
        assert_eq!(ok.shadow_pk_count, 1000);
        assert!(ok.live_only_pks.is_empty());
        assert!(ok.shadow_only_pks.is_empty());
        assert!(ok.mismatched_pks.is_empty());
    }

    /// A failed result keeps every discrepancy list it was given.
    #[test]
    fn verify_result_failed_creation() {
        let live_only = vec!["pk1".to_string()];
        let shadow_only = vec!["pk2".to_string()];
        let mismatched = vec!["pk3".to_string()];

        let bad = VerifyPhaseResult::failed(1000, 999, live_only, shadow_only, mismatched);

        assert!(!bad.passed);
        assert_eq!(bad.live_pk_count, 1000);
        assert_eq!(bad.shadow_pk_count, 999);
        assert_eq!(bad.live_only_pks, vec!["pk1"]);
        assert_eq!(bad.shadow_only_pks, vec!["pk2"]);
        assert_eq!(bad.mismatched_pks, vec!["pk3"]);
    }

    /// Conversion to `VerificationResults` carries over the flag and counts.
    #[test]
    fn verify_result_to_verification_results() {
        let converted = VerifyPhaseResult::success(500, 500).to_verification_results();

        assert!(converted.passed);
        assert_eq!(converted.live_pk_count, 500);
        assert_eq!(converted.shadow_pk_count, 500);
    }

    /// `_miroir_*` fields and `_rankingScore` must not influence the hash.
    #[test]
    fn compute_content_hash_removes_internal_fields() {
        let with_internal = json!({
            "id": "test:1",
            "name": "Test",
            "_miroir_shard": 42,
            "_miroir_updated_at": 123456,
            "_rankingScore": 0.95
        });
        // Same document without internal fields should hash identically
        let without_internal = json!({
            "id": "test:1",
            "name": "Test"
        });

        assert_eq!(
            hash_of(&with_internal),
            hash_of(&without_internal),
            "internal fields should not affect hash"
        );
    }

    /// Hashing the same document twice yields the same value.
    #[test]
    fn compute_content_hash_deterministic() {
        let doc = json!({
            "id": "test:2",
            "z": "last",
            "a": "first",
            "m": "middle"
        });

        let first = hash_of(&doc);
        let second = hash_of(&doc);
        assert_eq!(first, second, "hash should be deterministic");
    }

    /// Field order in the source JSON must not influence the hash.
    #[test]
    fn compute_content_hash_order_independent() {
        // JSON objects with same fields in different orders
        let a_then_b = json!({"id": "x", "a": 1, "b": 2});
        let b_then_a = json!({"id": "x", "b": 2, "a": 1});

        assert_eq!(
            hash_of(&a_then_b),
            hash_of(&b_then_a),
            "hash should be order-independent"
        );
    }

    /// Documents that differ in content must hash differently.
    #[test]
    fn compute_content_hash_content_sensitive() {
        let foo = json!({"id": "test", "value": "foo"});
        let bar = json!({"id": "test", "value": "bar"});

        assert_ne!(
            hash_of(&foo),
            hash_of(&bar),
            "different content should produce different hashes"
        );
    }

    /// Error messages include the shard id and the underlying cause.
    #[test]
    fn verify_error_display() {
        let scan_message = VerifyPhaseError::ShardScanFailed {
            shard_id: 5,
            error: "connection refused".to_string(),
        }
        .to_string();
        assert!(scan_message.contains("shard 5"));
        assert!(scan_message.contains("connection refused"));

        let fetch_message = VerifyPhaseError::NodeFetchFailed("no nodes".to_string()).to_string();
        assert!(fetch_message.contains("no nodes"));
    }

    /// The `success` constructor mirrors the PK counts into docs-scanned.
    #[test]
    fn verify_phase_result_docs_scanned() {
        let VerifyPhaseResult {
            live_docs_scanned,
            shadow_docs_scanned,
            ..
        } = VerifyPhaseResult::success(1000, 1000);

        assert_eq!(live_docs_scanned, 1000);
        assert_eq!(shadow_docs_scanned, 1000);
    }
}