diff --git a/crates/miroir-core/src/reshard.rs b/crates/miroir-core/src/reshard.rs index a9d32fa..bdb1a97 100644 --- a/crates/miroir-core/src/reshard.rs +++ b/crates/miroir-core/src/reshard.rs @@ -878,6 +878,10 @@ pub struct ReshardOperationState { pub phase: ReshardPhase, /// When the operation started (UNIX ms) pub started_at: u64, + /// Documents backfilled so far (updated during backfill phase) + pub documents_backfilled: u64, + /// Total documents to backfill (estimated at start) + pub total_documents: u64, } impl ReshardingRegistry { @@ -932,6 +936,28 @@ impl ReshardingRegistry { Ok(()) } + /// Update backfill progress for an active resharding operation. + pub fn update_progress( + &mut self, + index_uid: &str, + documents_backfilled: u64, + total_documents: u64, + ) -> Result<(), String> { + let op = self + .active_operations + .get_mut(index_uid) + .ok_or_else(|| format!("No resharding operation for index '{index_uid}'"))?; + op.documents_backfilled = documents_backfilled; + op.total_documents = total_documents; + tracing::debug!( + index_uid = %index_uid, + documents_backfilled, + total_documents, + "updated resharding backfill progress" + ); + Ok(()) + } + /// Remove a completed resharding operation. pub fn remove(&mut self, index_uid: &str) -> Result<(), String> { if self.active_operations.remove(index_uid).is_none() { @@ -1802,6 +1828,8 @@ mod tests_dual_write { target_shards: 128, phase: ReshardPhase::ShadowCreated, started_at: 1000, + documents_backfilled: 0, + total_documents: 0, }; let prep = prepare_dual_write_documents(&documents, "id", &reshard_state); @@ -1842,6 +1870,8 @@ mod tests_dual_write { target_shards: 256, phase: ReshardPhase::DualWriteActive, started_at: 2000, + documents_backfilled: 0, + total_documents: 0, }; let prep = prepare_dual_write_documents(&documents, "id", &reshard_state); @@ -1872,6 +1902,8 @@ mod tests_dual_write { target_shards: 32, phase: ReshardPhase::BackfillInProgress, started_at: 3000, + documents_backfilled: 0, + total_documents: 0, }; // Run multiple times - should be deterministic @@ -1898,6 +1930,8 @@ mod tests_dual_write { target_shards: 64, phase: ReshardPhase::ShadowCreated, started_at: 1000, + documents_backfilled: 0, + total_documents: 0, }; let prep = prepare_dual_write_documents(&documents, "id", &reshard_state); @@ -1916,6 +1950,8 @@ mod tests_dual_write { target_shards: 128, phase: ReshardPhase::DualWriteActive, started_at: 1000, + documents_backfilled: 0, + total_documents: 0, }; let prep = prepare_dual_write_documents(&documents, "id", &reshard_state); @@ -2175,6 +2211,8 @@ mod tests_resharding_registry { target_shards: 128, phase: ReshardPhase::ShadowCreated, started_at: 1000, + documents_backfilled: 0, + total_documents: 0, }; reg.register("products".to_string(), state).unwrap(); @@ -2194,6 +2232,8 @@ mod tests_resharding_registry { target_shards: 128, phase: ReshardPhase::ShadowCreated, started_at: 1000, + documents_backfilled: 0, + total_documents: 0, }; reg.register("products".to_string(), state).unwrap(); @@ -2203,6 +2243,8 @@ mod tests_resharding_registry { target_shards: 256, phase: ReshardPhase::ShadowCreated, started_at: 2000, + documents_backfilled: 0, + total_documents: 0, }; assert!(reg.register("products".to_string(), state2).is_err()); } @@ -2216,6 +2258,8 @@ mod tests_resharding_registry { target_shards: 128, phase: ReshardPhase::ShadowCreated, started_at: 1000, + documents_backfilled: 0, + total_documents: 0, }; reg.register("products".to_string(), state).unwrap(); @@ -2243,6 +2287,8 @@ mod tests_resharding_registry { target_shards: 128, phase: ReshardPhase::ShadowCreated, started_at: 1000, + documents_backfilled: 0, + total_documents: 0, }; reg.register("products".to_string(), state).unwrap(); assert!(reg.get("products").is_some()); @@ -2266,6 +2312,8 @@ mod tests_resharding_registry { target_shards: 128, phase: ReshardPhase::ShadowCreated, started_at: 1000, + documents_backfilled: 0, + total_documents: 0, }; reg.register("products".to_string(), state).unwrap(); assert!(reg.is_dual_write_active("products")); @@ -2280,6 +2328,8 @@ mod tests_resharding_registry { target_shards: 128, phase: ReshardPhase::DualWriteActive, started_at: 1000, + documents_backfilled: 0, + total_documents: 0, }; reg.register("products".to_string(), state).unwrap(); assert!(reg.is_dual_write_active("products")); @@ -2294,6 +2344,8 @@ mod tests_resharding_registry { target_shards: 128, phase: ReshardPhase::BackfillInProgress, started_at: 1000, + documents_backfilled: 0, + total_documents: 0, }; reg.register("products".to_string(), state).unwrap(); assert!(reg.is_dual_write_active("products")); @@ -2308,6 +2360,8 @@ mod tests_resharding_registry { target_shards: 128, phase: ReshardPhase::Verifying, started_at: 1000, + documents_backfilled: 0, + total_documents: 0, }; reg.register("products".to_string(), state).unwrap(); assert!(reg.is_dual_write_active("products")); @@ -2322,6 +2376,8 @@ mod tests_resharding_registry { target_shards: 128, phase: ReshardPhase::Swapped, started_at: 1000, + documents_backfilled: 0, + total_documents: 0, }; reg.register("products".to_string(), state).unwrap(); // After swap, dual-write stops (writes go only to new index) @@ -2344,6 +2400,8 @@ mod tests_resharding_registry { target_shards: 128, phase: ReshardPhase::ShadowCreated, started_at: 1000, + documents_backfilled: 0, + total_documents: 0, }; reg.register("products".to_string(), state1).unwrap(); @@ -2353,6 +2411,8 @@ mod tests_resharding_registry { target_shards: 256, phase: ReshardPhase::DualWriteActive, started_at: 2000, + documents_backfilled: 0, + total_documents: 0, }; reg.register("orders".to_string(), state2).unwrap(); @@ -2382,6 +2442,8 @@ mod tests_resharding_registry { target_shards: 128, phase: ReshardPhase::DualWriteActive, started_at: 1000, + documents_backfilled: 0, + total_documents: 0, }; reg.register("products".to_string(), products_state) .unwrap(); @@ -2392,6 +2454,8 @@ mod tests_resharding_registry { target_shards: 256, phase: ReshardPhase::ShadowCreated, started_at: 2000, + documents_backfilled: 0, + total_documents: 0, }; reg.register("orders".to_string(), orders_state).unwrap(); @@ -3818,11 +3882,16 @@ pub struct ReshardOrchestratorConfig { pub task_store: Option>, /// Metrics callback for phase transitions. pub metrics_callback: Option, + /// Progress callback for registry updates (documents_backfilled, total_documents). + pub progress_callback: Option, } /// Callback for metrics emission during resharding. pub type ReshardMetricsCallback = Arc; +/// Callback for progress updates during resharding backfill. +pub type ReshardProgressCallback = Arc; + /// Result of the full reshard operation. #[derive(Debug, Clone)] pub struct ReshardOrchestratorResult { @@ -3937,6 +4006,13 @@ pub async fn execute_reshard( ReshardPhase::BackfillInProgress, backfill_result.documents_backfilled, ); + // Update registry with final backfill progress + if let Some(ref cb) = config.progress_callback { + cb( + backfill_result.documents_backfilled, + backfill_result.total_estimated, + ); + } tracing::info!( documents_backfilled = backfill_result.documents_backfilled, "Phase 3 complete: backfill finished" @@ -4143,6 +4219,7 @@ mod tests_orchestrator { alias_history_retention: 10, task_store: None, metrics_callback: None, + progress_callback: None, }; assert_eq!(config.index_uid, "products"); diff --git a/crates/miroir-proxy/src/routes/admin_endpoints.rs b/crates/miroir-proxy/src/routes/admin_endpoints.rs index 471f264..0ee6f01 100644 --- a/crates/miroir-proxy/src/routes/admin_endpoints.rs +++ b/crates/miroir-proxy/src/routes/admin_endpoints.rs @@ -3355,6 +3355,8 @@ where target_shards: req.new_shards, phase: miroir_core::reshard::ReshardPhase::ShadowCreated, started_at: now, + documents_backfilled: 0, + total_documents: 0, }; let mut registry = app_state.resharding_registry.write().await; @@ -3438,6 +3440,27 @@ where }, ); + // Create progress callback to update registry during backfill + let index_uid_for_cb = index_uid_clone.clone(); + let registry_for_cb = registry.clone(); + let progress_callback = + std::sync::Arc::new(move |docs_backfilled: u64, total_docs: u64| { + let index_uid = index_uid_for_cb.clone(); + let registry = registry_for_cb.clone(); + tokio::spawn(async move { + let mut reg = registry.write().await; + if let Err(e) = + reg.update_progress(&index_uid, docs_backfilled, total_docs) + { + tracing::error!( + index_uid = %index_uid, + error = %e, + "failed to update resharding progress" + ); + } + }); + }); + let config = miroir_core::reshard::ReshardOrchestratorConfig { index_uid: index_uid_clone.clone(), target_shards: req.new_shards, @@ -3451,6 +3474,7 @@ where alias_history_retention: 10, task_store: task_store.clone(), metrics_callback: Some(metrics_callback), + progress_callback: Some(progress_callback), }; // Run the full orchestrator @@ -3536,6 +3560,11 @@ where let operation = registry.get(&index_uid); if let Some(op) = operation { + let backfill_progress = if op.total_documents > 0 { + op.documents_backfilled as f64 / op.total_documents as f64 + } else { + 0.0 + }; Ok(Json(ReshardStatusResponse { active: true, operation: Some(ReshardOperationDetails { @@ -3544,9 +3573,9 @@ where old_shards: op.old_shards, new_shards: op.target_shards, phase: op.phase.name().to_string(), - documents_backfilled: 0, // TODO: track progress - total_documents: 0, - backfill_progress: 0.0, + documents_backfilled: op.documents_backfilled, + total_documents: op.total_documents, + backfill_progress, shadow_index: op.shadow_index.clone(), started_at: op.started_at, last_error: None,