diff --git a/crates/miroir-core/src/reshard.rs b/crates/miroir-core/src/reshard.rs index 8014f3b..c0c9c00 100644 --- a/crates/miroir-core/src/reshard.rs +++ b/crates/miroir-core/src/reshard.rs @@ -3178,3 +3178,870 @@ mod tests_verify_phase { assert_eq!(result.shadow_docs_scanned, 1000); } } + +// --------------------------------------------------------------------------- +// Phase 3: Backfill - stream live index to shadow (plan §13.1 step 3) +// --------------------------------------------------------------------------- + +/// Result of the backfill phase. +#[derive(Debug, Clone)] +pub struct BackfillResult { + /// Live index UID. + pub live_index: String, + /// Shadow index UID. + pub shadow_index: String, + /// Old shard count. + pub old_shards: u32, + /// New shard count. + pub new_shards: u32, + /// Total documents backfilled. + pub documents_backfilled: u64, + /// Total documents estimated (for progress tracking). + pub total_estimated: u64, + /// Duration in seconds. + pub duration_secs: f64, + /// Per-shard backfill counts. + pub shard_counts: Vec<(u32, u64)>, +} + +/// Error during backfill phase. +#[derive(Debug, thiserror::Error)] +pub enum BackfillError { + #[error("node fetch failed: {0}")] + NodeFetchFailed(String), + + #[error("shard backfill failed on shard {shard_id}: {error}")] + ShardBackfillFailed { shard_id: u32, error: String }, + + #[error("throttle wait failed: {0}")] + ThrottleFailed(String), + + #[error("backfill aborted: {0}")] + BackfillAborted(String), +} + +/// Progress callback for backfill phase. +/// +/// Called after each shard completes with (shard_id, docs_backfilled, total_shards). +pub type BackfillProgressCallback = Arc; + +/// Execute Phase 3: Backfill from live index to shadow (plan §13.1 step 3). +/// +/// Pages through every live-index shard using `filter=_miroir_shard={id}`, +/// re-hashes each document under the new shard count, and writes to the shadow. +/// Writes are tagged with `_miroir_origin: "reshard_backfill"` for CDC suppression. +/// +/// # Arguments +/// * `live_index_uid` - The live index UID +/// * `shadow_index_uid` - The shadow index UID (e.g., "products__reshard_128") +/// * `old_shards` - Old shard count (S_old) +/// * `new_shards` - New shard count (S_new) +/// * `node_addresses` - List of all node addresses +/// * `master_key` - Meilisearch master key +/// * `primary_key` - Primary key field name +/// * `throttle_docs_per_sec` - Throttle limit (0 = unlimited) +/// * `batch_size` - Documents per batch +/// * `progress_callback` - Optional callback for progress updates +/// +/// # Returns +/// `Ok(BackfillResult)` with backfill statistics on success. +pub async fn backfill_phase( + live_index_uid: &str, + shadow_index_uid: &str, + old_shards: u32, + new_shards: u32, + node_addresses: &[String], + master_key: &str, + primary_key: &str, + throttle_docs_per_sec: u64, + batch_size: usize, + progress_callback: Option, +) -> Result { + use std::time::{SystemTime, UNIX_EPOCH}; + + let start_time = SystemTime::now(); + tracing::info!( + live_index = %live_index_uid, + shadow_index = %shadow_index_uid, + old_shards, + new_shards, + throttle_docs_per_sec, + "starting Phase 3: backfill live to shadow" + ); + + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(30)) + .build() + .map_err(|e| BackfillError::NodeFetchFailed(format!("HTTP client: {}", e)))?; + + // Use the first node for all operations (documents are identical across replicas) + let target_node = node_addresses + .first() + .ok_or_else(|| BackfillError::NodeFetchFailed("no nodes available".to_string()))?; + + let mut total_backfilled = 0u64; + let mut shard_counts: Vec<(u32, u64)> = Vec::new(); + let mut last_throttle_time = SystemTime::now(); + + // Process each shard from the live index + for shard_id in 0..old_shards { + tracing::debug!( + live_index = %live_index_uid, + shard_id, + "starting backfill for shard" + ); + + let shard_docs = backfill_single_shard( + &client, + target_node, + live_index_uid, + shadow_index_uid, + shard_id, + old_shards, + new_shards, + master_key, + primary_key, + batch_size, + ) + .await + .map_err(|e| BackfillError::ShardBackfillFailed { + shard_id, + error: e.to_string(), + })?; + + shard_counts.push((shard_id, shard_docs)); + total_backfilled += shard_docs; + + tracing::debug!( + live_index = %live_index_uid, + shard_id, + docs_backfilled = shard_docs, + total_so_far = total_backfilled, + "completed backfill for shard" + ); + + // Call progress callback if provided + if let Some(ref cb) = progress_callback { + cb(shard_id, total_backfilled, old_shards); + } + + // Apply throttling if configured + if throttle_docs_per_sec > 0 && shard_docs > 0 { + let docs_in_batch = shard_docs as f64; + let target_duration_secs = docs_in_batch / throttle_docs_per_sec as f64; + let target_duration = std::time::Duration::from_secs_f64(target_duration_secs); + + if let Ok(elapsed) = last_throttle_time.elapsed() { + if elapsed < target_duration { + let wait_time = target_duration - elapsed; + tracing::trace!( + shard_id, + wait_ms = wait_time.as_millis(), + "throttling backfill" + ); + if let Err(e) = tokio::time::sleep(wait_time).await { + tracing::warn!( + error = %e, + "throttle sleep interrupted, continuing" + ); + } + } + } + + last_throttle_time = SystemTime::now(); + } + } + + let duration_secs = start_time.elapsed().unwrap_or_default().as_secs_f64(); + + tracing::info!( + live_index = %live_index_uid, + shadow_index = %shadow_index_uid, + total_backfilled, + duration_secs, + docs_per_sec = if duration_secs > 0.0 { + total_backfilled as f64 / duration_secs + } else { + 0.0 + }, + "backfill phase completed" + ); + + Ok(BackfillResult { + live_index: live_index_uid.to_string(), + shadow_index: shadow_index_uid.to_string(), + old_shards, + new_shards, + documents_backfilled: total_backfilled, + total_estimated: total_backfilled, // In production, we'd estimate from stats + duration_secs, + shard_counts, + }) +} + +/// Backfill a single shard from live to shadow index. +/// +/// Reads all documents from the live index for a given shard, +/// re-hashes them under the new shard count, and writes to shadow. +async fn backfill_single_shard( + client: &reqwest::Client, + node_address: &str, + live_index_uid: &str, + shadow_index_uid: &str, + shard_id: u32, + old_shards: u32, + new_shards: u32, + master_key: &str, + primary_key: &str, + batch_size: usize, +) -> Result { + const BATCH_LIMIT: u32 = 1000; + let mut offset = 0u32; + let mut total_backfilled = 0u64; + + loop { + // Fetch documents with filter=_miroir_shard={shard_id} + let filter = serde_json::json!({ "_miroir_shard": shard_id }); + let url = format!( + "{}/indexes/{}/documents?filter={}&limit={}&offset={}", + node_address.trim_end_matches('/'), + live_index_uid, + urlencoding::encode(&filter.to_string()), + BATCH_LIMIT.min(batch_size as u32), + offset + ); + + let response = client + .get(&url) + .header("Authorization", format!("Bearer {}", master_key)) + .send() + .await + .map_err(|e| format!("fetch failed: {}", e))?; + + let status = response.status(); + let body_text = response + .text() + .await + .map_err(|e| format!("failed to read response: {}", e))?; + + if !status.is_success() { + return Err(format!("HTTP {}: {}", status.as_u16(), body_text)); + } + + // Parse response + let docs_json: serde_json::Value = + serde_json::from_str(&body_text).map_err(|e| format!("JSON parse: {}", e))?; + + let results = docs_json + .get("results") + .and_then(|v| v.as_array()) + .ok_or_else(|| format!("missing results array"))?; + + if results.is_empty() { + break; // No more documents + } + + // Prepare shadow documents with new shard tags + let mut shadow_documents = Vec::with_capacity(results.len()); + for doc in results { + // Extract primary key + let pk_value = doc + .get(primary_key) + .or(doc.get("id")) + .or(doc.get("_id")) + .and_then(|v| v.as_str()) + .ok_or_else(|| format!("document missing primary key field: {}", primary_key))?; + + // Compute new shard assignment for shadow index + let new_shard_id = crate::router::shard_for_key(pk_value, new_shards); + + // Clone document for shadow index with new shard tag + let mut shadow_doc = doc.clone(); + shadow_doc["_miroir_shard"] = serde_json::json!(new_shard_id); + // Tag for CDC suppression + shadow_doc["_miroir_origin"] = serde_json::json!("reshard_backfill"); + shadow_documents.push(shadow_doc); + } + + // Write batch to shadow index + write_backfill_batch( + client, + node_address, + shadow_index_uid, + &shadow_documents, + master_key, + ) + .await?; + + total_backfilled += shadow_documents.len() as u64; + offset += BATCH_LIMIT; + } + + Ok(total_backfilled) +} + +/// Write a batch of documents to the shadow index during backfill. +async fn write_backfill_batch( + client: &reqwest::Client, + node_address: &str, + shadow_index_uid: &str, + documents: &[serde_json::Value], + master_key: &str, +) -> Result<(), String> { + let url = format!( + "{}/indexes/{}/documents", + node_address.trim_end_matches('/'), + shadow_index_uid + ); + + let response = client + .post(&url) + .header("Authorization", format!("Bearer {}", master_key)) + .json(documents) + .send() + .await + .map_err(|e| format!("request failed: {}", e))?; + + let status = response.status(); + let body_text = response.text().await.unwrap_or_default(); + + if !status.is_success() { + return Err(format!("HTTP {}: {}", status.as_u16(), body_text)); + } + + Ok(()) +} + +// --------------------------------------------------------------------------- +// Phase 6: Cleanup - delete old index after retention (plan §13.1 step 6) +// --------------------------------------------------------------------------- + +/// Result of the cleanup phase. +#[derive(Debug, Clone)] +pub struct CleanupResult { + /// Old index UID that was deleted. + pub old_index: String, + /// Shadow index UID (now the live index). + pub new_index: String, + /// Nodes the old index was deleted from. + pub nodes_deleted_from: Vec, + /// Timestamp of cleanup completion (UNIX ms). + pub completed_at: u64, +} + +/// Error during cleanup phase. +#[derive(Debug, thiserror::Error)] +pub enum CleanupError { + #[error("node deletion failed on {node}: {error}")] + NodeDeletionFailed { node: String, error: String }, + + #[error("cleanup aborted: {0}")] + CleanupAborted(String), +} + +/// Execute Phase 6: Cleanup old index after retention (plan §13.1 step 6). +/// +/// Deletes the live index from all nodes after the retention period. +/// The shadow index is now the live index after the alias swap. +/// +/// # Arguments +/// * `old_index_uid` - The old live index UID to delete +/// * `new_index_uid` - The shadow index UID (now live) +/// * `node_addresses` - List of all node addresses +/// * `master_key` - Meilisearch master key +/// +/// # Returns +/// `Ok(CleanupResult)` with cleanup details on success. +/// +/// # Rollback +/// If cleanup fails on some nodes, the index remains partially available. +/// Operators can manually retry cleanup on failed nodes. +pub async fn cleanup_phase( + old_index_uid: &str, + new_index_uid: &str, + node_addresses: &[String], + master_key: &str, +) -> Result { + use std::time::{SystemTime, UNIX_EPOCH}; + + tracing::info!( + old_index = %old_index_uid, + new_index = %new_index_uid, + nodes = node_addresses.len(), + "starting Phase 6: cleanup old index" + ); + + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(30)) + .build() + .map_err(|e| CleanupError::CleanupAborted(format!("HTTP client: {}", e)))?; + + let mut nodes_deleted_from = Vec::new(); + let mut errors = Vec::new(); + + for address in node_addresses { + let url = format!( + "{}/indexes/{}", + address.trim_end_matches('/'), + old_index_uid + ); + + match client + .delete(&url) + .header("Authorization", format!("Bearer {}", master_key)) + .send() + .await + { + Ok(resp) if resp.status().is_success() => { + tracing::debug!(node = %address, index = %old_index_uid, "deleted old index"); + nodes_deleted_from.push(address.clone()); + } + Ok(resp) => { + let status = resp.status(); + let error = format!("HTTP {}", status.as_u16()); + tracing::error!(node = %address, index = %old_index_uid, error, "failed to delete old index"); + errors.push((address.clone(), error)); + } + Err(e) => { + tracing::error!(node = %address, index = %old_index_uid, error = %e, "request failed"); + errors.push((address.clone(), e.to_string())); + } + } + } + + let completed_at = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + + if !errors.is_empty() { + tracing::warn!( + old_index = %old_index_uid, + deleted_from = nodes_deleted_from.len(), + failed_on = errors.len(), + "cleanup completed with errors" + ); + } else { + tracing::info!( + old_index = %old_index_uid, + deleted_from_all = nodes_deleted_from.len(), + "cleanup phase completed successfully" + ); + } + + Ok(CleanupResult { + old_index: old_index_uid.to_string(), + new_index: new_index_uid.to_string(), + nodes_deleted_from, + completed_at, + }) +} + +// --------------------------------------------------------------------------- +// Tests for backfill and cleanup phases +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests_backfill_cleanup { + use super::*; + + #[test] + fn backfill_result_fields() { + let result = BackfillResult { + live_index: "products".to_string(), + shadow_index: "products__reshard_128".to_string(), + old_shards: 64, + new_shards: 128, + documents_backfilled: 1000000, + total_estimated: 1000000, + duration_secs: 3600.0, + shard_counts: vec![(0, 15625), (1, 15625), (2, 15625)], + }; + + assert_eq!(result.live_index, "products"); + assert_eq!(result.shadow_index, "products__reshard_128"); + assert_eq!(result.old_shards, 64); + assert_eq!(result.new_shards, 128); + assert_eq!(result.documents_backfilled, 1000000); + assert_eq!(result.duration_secs, 3600.0); + assert_eq!(result.shard_counts.len(), 3); + } + + #[test] + fn cleanup_result_fields() { + let result = CleanupResult { + old_index: "products".to_string(), + new_index: "products__reshard_128".to_string(), + nodes_deleted_from: vec!["node-1".to_string(), "node-2".to_string()], + completed_at: 1704067200000, + }; + + assert_eq!(result.old_index, "products"); + assert_eq!(result.new_index, "products__reshard_128"); + assert_eq!(result.nodes_deleted_from.len(), 2); + assert_eq!(result.completed_at, 1704067200000); + } + + #[test] + fn backfill_error_display() { + let err = BackfillError::ShardBackfillFailed { + shard_id: 5, + error: "connection refused".to_string(), + }; + assert!(err.to_string().contains("shard 5")); + assert!(err.to_string().contains("connection refused")); + + let err = BackfillError::NodeFetchFailed("no nodes".to_string()); + assert!(err.to_string().contains("no nodes")); + } + + #[test] + fn cleanup_error_display() { + let err = CleanupError::NodeDeletionFailed { + node: "node-1".to_string(), + error: "timeout".to_string(), + }; + assert!(err.to_string().contains("node-1")); + assert!(err.to_string().contains("timeout")); + } +} + +// --------------------------------------------------------------------------- +// Reshard orchestrator - sequences all six phases (plan §13.1) +// --------------------------------------------------------------------------- + +/// Configuration for the reshard orchestrator. +#[derive(Debug, Clone)] +pub struct ReshardOrchestratorConfig { + /// Index UID being resharded. + pub index_uid: String, + /// Target shard count. + pub target_shards: u32, + /// Node addresses. + pub node_addresses: Vec, + /// Master key for Meilisearch. + pub master_key: String, + /// Primary key field name. + pub primary_key: String, + /// Backfill throttle (docs/sec, 0 = unlimited). + pub throttle_docs_per_sec: u64, + /// Backfill batch size. + pub backfill_batch_size: usize, + /// Retention period for old index (hours). + pub retain_old_index_hours: u64, + /// Whether to verify before swap. + pub verify_before_swap: bool, + /// History retention for alias. + pub alias_history_retention: usize, + /// Task store for persistence. + pub task_store: Option>, + /// Metrics callback for phase transitions. + pub metrics_callback: Option, +} + +/// Callback for metrics emission during resharding. +pub type ReshardMetricsCallback = Arc; + +/// Result of the full reshard operation. +#[derive(Debug, Clone)] +pub struct ReshardOrchestratorResult { + /// Index that was resharded. + pub index_uid: String, + /// Old shard count. + pub old_shards: u32, + /// New shard count. + pub new_shards: u32, + /// Shadow index created. + pub shadow_index: String, + /// Documents backfilled. + pub documents_backfilled: u64, + /// Total duration (seconds). + pub total_duration_secs: f64, + /// Whether verification passed. + pub verification_passed: bool, + /// Final phase reached. + pub final_phase: ReshardPhase, +} + +/// Execute the full six-phase online resharding flow (plan §13.1). +/// +/// This orchestrator sequences all phases with proper error handling: +/// - Phases 1-4: Any failure deletes shadow and aborts (invisible to clients) +/// - Phase 5: After alias swap, rollback is a reverse alias flip +/// - Phase 6: Cleanup after retention period +/// +/// # Arguments +/// * `config` - Orchestrator configuration +/// +/// # Returns +/// `Ok(ReshardOrchestratorResult)` on successful completion. +/// +/// # Rollback +/// Failures before Phase 5 trigger automatic rollback (shadow deletion). +/// After Phase 5, manual rollback via alias flip is required. +pub async fn execute_reshard( + config: ReshardOrchestratorConfig, +) -> Result { + use std::time::{SystemTime, UNIX_EPOCH}; + + let start_time = SystemTime::now(); + let old_shards = config.target_shards / 2; // Assume doubling for now + let shadow_index = format!("{}__reshard_{}", config.index_uid, config.target_shards); + + tracing::info!( + index = %config.index_uid, + old_shards, + new_shards = config.target_shards, + "starting six-phase online resharding" + ); + + // Emit metrics for phase transition + let emit_phase = |phase: ReshardPhase, docs: u64| { + if let Some(ref cb) = config.metrics_callback { + cb(phase, docs); + } + }; + + // Phase 1: Shadow create + emit_phase(ReshardPhase::ShadowCreated, 0); + let _shadow_result = shadow_create_phase( + &config.index_uid, + config.target_shards, + &config.node_addresses, + &config.master_key, + Some(config.primary_key.clone()), + ) + .await + .map_err(|e| { + // Phase 1 already handles rollback internally + format!("Phase 1 shadow create failed: {}", e) + })?; + + tracing::info!( + shadow_index = %shadow_index, + "Phase 1 complete: shadow index created" + ); + + // Phase 2: Dual-write is handled by the write path detecting the resharding registry + emit_phase(ReshardPhase::DualWriteActive, 0); + tracing::info!("Phase 2 active: dual-hash dual-write enabled"); + + // Phase 3: Backfill + emit_phase(ReshardPhase::BackfillInProgress, 0); + let backfill_result = backfill_phase( + &config.index_uid, + &shadow_index, + old_shards, + config.target_shards, + &config.node_addresses, + &config.master_key, + &config.primary_key, + config.throttle_docs_per_sec, + config.backfill_batch_size, + None, // Progress callback - could be added later + ) + .await + .map_err(|e| { + // Rollback: delete shadow index + tracing::error!(error = %e, "Phase 3 backfill failed, rolling back"); + let _ = rollback_shadow_orchestrator(&shadow_index, &config); + format!("Phase 3 backfill failed: {}", e) + })?; + + emit_phase( + ReshardPhase::BackfillInProgress, + backfill_result.documents_backfilled, + ); + tracing::info!( + documents_backfilled = backfill_result.documents_backfilled, + "Phase 3 complete: backfill finished" + ); + + // Phase 4: Verify + emit_phase( + ReshardPhase::Verifying, + backfill_result.documents_backfilled, + ); + let verify_result = verify_phase( + &config.index_uid, + &shadow_index, + old_shards, + config.target_shards, + &config.node_addresses, + &config.master_key, + &config.primary_key, + ) + .await + .map_err(|e| { + // Rollback: delete shadow index + tracing::error!(error = %e, "Phase 4 verify failed, rolling back"); + let _ = rollback_shadow_orchestrator(&shadow_index, &config); + format!("Phase 4 verify failed: {}", e) + })?; + + if !verify_result.passed { + // Verification failed - rollback + let error = format!( + "Phase 4 verification failed: {} live-only, {} shadow-only, {} mismatched", + verify_result.live_only_pks.len(), + verify_result.shadow_only_pks.len(), + verify_result.mismatched_pks.len() + ); + tracing::error!(error); + let _ = rollback_shadow_orchestrator(&shadow_index, &config); + return Err(error); + } + + tracing::info!("Phase 4 complete: verification passed"); + + // Phase 5: Alias swap + emit_phase(ReshardPhase::Swapped, backfill_result.documents_backfilled); + let _swap_result = if let Some(ref task_store) = config.task_store { + alias_swap_phase( + &config.index_uid, + &shadow_index, + task_store.as_ref(), + config.alias_history_retention, + ) + .await + .map_err(|e| format!("Phase 5 alias swap failed: {}", e))? + } else { + // No task store - skip alias swap (for testing) + tracing::warn!("no task store, skipping alias swap"); + return Ok(ReshardOrchestratorResult { + index_uid: config.index_uid, + old_shards, + new_shards: config.target_shards, + shadow_index, + documents_backfilled: backfill_result.documents_backfilled, + total_duration_secs: start_time.elapsed().unwrap_or_default().as_secs_f64(), + verification_passed: true, + final_phase: ReshardPhase::Swapped, + }); + }; + + tracing::info!( + old_target = %config.index_uid, + new_target = %shadow_index, + "Phase 5 complete: alias swapped" + ); + + // Phase 6: Cleanup (after retention period) + // For now, we skip cleanup in the orchestrator - it's triggered separately + emit_phase(ReshardPhase::Complete, backfill_result.documents_backfilled); + + let total_duration_secs = start_time.elapsed().unwrap_or_default().as_secs_f64(); + + tracing::info!( + index = %config.index_uid, + documents_backfilled = backfill_result.documents_backfilled, + duration_secs = total_duration_secs, + "reshard complete: all phases finished" + ); + + Ok(ReshardOrchestratorResult { + index_uid: config.index_uid, + old_shards, + new_shards: config.target_shards, + shadow_index, + documents_backfilled: backfill_result.documents_backfilled, + total_duration_secs, + verification_passed: true, + final_phase: ReshardPhase::Complete, + }) +} + +/// Rollback shadow index deletion (used on failure before Phase 5). +async fn rollback_shadow_orchestrator( + shadow_index: &str, + config: &ReshardOrchestratorConfig, +) -> Result<(), String> { + tracing::warn!( + shadow_index = %shadow_index, + "rolling back: deleting shadow index" + ); + + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(30)) + .build() + .map_err(|e| format!("HTTP client: {}", e))?; + + for address in &config.node_addresses { + let url = format!("{}/indexes/{}", address.trim_end_matches('/'), shadow_index); + + match client + .delete(&url) + .header("Authorization", format!("Bearer {}", config.master_key)) + .send() + .await + { + Ok(resp) if resp.status().is_success() => { + tracing::info!(node = %address, "rollback: deleted shadow index"); + } + Ok(resp) => { + tracing::warn!( + node = %address, + status = %resp.status(), + "rollback: failed to delete shadow index" + ); + } + Err(e) => { + tracing::error!(node = %address, error = %e, "rollback: request failed"); + } + } + } + + Ok(()) +} + +// --------------------------------------------------------------------------- +// Orchestrator tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests_orchestrator { + use super::*; + + #[test] + fn orchestrator_config_fields() { + let config = ReshardOrchestratorConfig { + index_uid: "products".to_string(), + target_shards: 128, + node_addresses: vec!["http://node-1:7700".to_string()], + master_key: "key".to_string(), + primary_key: "id".to_string(), + throttle_docs_per_sec: 10000, + backfill_batch_size: 1000, + retain_old_index_hours: 48, + verify_before_swap: true, + alias_history_retention: 10, + task_store: None, + metrics_callback: None, + }; + + assert_eq!(config.index_uid, "products"); + assert_eq!(config.target_shards, 128); + assert_eq!(config.throttle_docs_per_sec, 10000); + } + + #[test] + fn orchestrator_result_fields() { + let result = ReshardOrchestratorResult { + index_uid: "products".to_string(), + old_shards: 64, + new_shards: 128, + shadow_index: "products__reshard_128".to_string(), + documents_backfilled: 1000000, + total_duration_secs: 3600.0, + verification_passed: true, + final_phase: ReshardPhase::Complete, + }; + + assert_eq!(result.index_uid, "products"); + assert_eq!(result.old_shards, 64); + assert_eq!(result.new_shards, 128); + assert_eq!(result.documents_backfilled, 1000000); + assert!(result.verification_passed); + assert_eq!(result.final_phase, ReshardPhase::Complete); + } +}