diff --git a/crates/miroir-core/src/reshard.rs b/crates/miroir-core/src/reshard.rs index 36bc2ae..a55febb 100644 --- a/crates/miroir-core/src/reshard.rs +++ b/crates/miroir-core/src/reshard.rs @@ -3785,7 +3785,9 @@ mod tests_backfill_cleanup { #[test] fn cleanup_error_aborted_display() { - let err = CleanupError::CleanupAborted("retention period not reached: 23 hours remaining".to_string()); + let err = CleanupError::CleanupAborted( + "retention period not reached: 23 hours remaining".to_string(), + ); assert!(err.to_string().contains("retention period not reached")); assert!(err.to_string().contains("23 hours remaining")); } diff --git a/crates/miroir-ctl/src/commands/reshard.rs b/crates/miroir-ctl/src/commands/reshard.rs index b01981d..ab1ff6c 100644 --- a/crates/miroir-ctl/src/commands/reshard.rs +++ b/crates/miroir-ctl/src/commands/reshard.rs @@ -49,10 +49,9 @@ pub enum ReshardSubcommand { pub async fn run( cmd: ReshardSubcommand, - _admin_key: &str, - _api_url: &str, + admin_key: &str, + api_url: &str, ) -> Result<(), Box> { - let _ = (_admin_key, _api_url); match cmd { ReshardSubcommand::Start { index, @@ -61,8 +60,20 @@ pub async fn run( schedule_window, force, dry_run, - } => run_start(index, new_shards, throttle, schedule_window, force, dry_run).await, - ReshardSubcommand::Status { index } => run_status(index).await, + } => { + run_start( + index, + new_shards, + throttle, + schedule_window, + force, + dry_run, + admin_key, + api_url, + ) + .await + } + ReshardSubcommand::Status { index } => run_status(index, admin_key, api_url).await, } } @@ -73,6 +84,8 @@ async fn run_start( schedule_window: Option, force: bool, dry_run: bool, + admin_key: &str, + api_url: &str, ) -> Result<(), Box> { let config = load_reshard_config()?; @@ -154,14 +167,133 @@ async fn run_start( return Ok(()); } - // TODO: Submit reshard job via admin API when proxy is implemented. - Err("Reshard start requires admin API (not yet connected). Use --dry-run to validate.".into()) + // Submit reshard job via admin API + let client = reqwest::Client::new(); + let url = format!( + "{}/indexes/{}/reshard", + api_url.trim_end_matches('/'), + index + ); + + let request_body = serde_json::json!({ + "new_shards": new_shards, + "throttle_docs_per_sec": throttle + }); + + let response = client + .post(&url) + .header("Authorization", format!("Bearer {}", admin_key)) + .json(&request_body) + .send() + .await + .map_err(|e| format!("Failed to connect to {}: {}", url, e))?; + + let status = response.status(); + let body_text = response + .text() + .await + .map_err(|e| format!("Failed to read response: {}", e))?; + + if status.as_u16() == 409 { + return Err(format!("Resharding already in progress for index '{}'", index).into()); + } + + if !status.is_success() { + return Err(format!( + "Reshard request failed: HTTP {} - {}", + status.as_u16(), + body_text + ) + .into()); + } + + let result: serde_json::Value = + serde_json::from_str(&body_text).map_err(|e| format!("Failed to parse response: {}", e))?; + + println!("Resharding started successfully!"); + println!(" Operation ID: {}", result["operation_id"]); + println!(" Index: {}", result["index_uid"]); + println!(" Old shards: {}", result["old_shards"]); + println!(" New shards: {}", result["new_shards"]); + println!(" Shadow index: {}", result["shadow_index"]); + println!(" Current phase: {}", result["phase"]); + println!(); + println!("Monitor progress with:"); + println!(" miroir-ctl reshard --status --index {}", index); + + Ok(()) } -async fn run_status(index: String) -> Result<(), Box> { - // TODO: Query reshard status via admin API when proxy is implemented. - let _ = index; - Err("Reshard status requires admin API (not yet connected).".into()) +async fn run_status( + index: String, + admin_key: &str, + api_url: &str, +) -> Result<(), Box> { + let client = reqwest::Client::new(); + let url = format!( + "{}/indexes/{}/reshard/status", + api_url.trim_end_matches('/'), + index + ); + + let response = client + .get(&url) + .header("Authorization", format!("Bearer {}", admin_key)) + .send() + .await + .map_err(|e| format!("Failed to connect to {}: {}", url, e))?; + + let status = response.status(); + let body_text = response + .text() + .await + .map_err(|e| format!("Failed to read response: {}", e))?; + + if !status.is_success() { + return Err(format!( + "Status request failed: HTTP {} - {}", + status.as_u16(), + body_text + ) + .into()); + } + + let result: serde_json::Value = + serde_json::from_str(&body_text).map_err(|e| format!("Failed to parse response: {}", e))?; + + if !result["active"].as_bool().unwrap_or(false) { + println!("No active resharding operation for index '{}'", index); + return Ok(()); + } + + let op = &result["operation"]; + println!("Resharding status for index '{}':", index); + println!(" Operation ID: {}", op["id"]); + println!(" Current phase: {}", op["phase"]); + println!(" Old shards: {}", op["old_shards"]); + println!(" New shards: {}", op["new_shards"]); + println!(" Shadow index: {}", op["shadow_index"]); + + if let Some(docs) = op["documents_backfilled"].as_u64() { + let total = op["total_documents"].as_u64().unwrap_or(0); + let progress = op["backfill_progress"].as_f64().unwrap_or(0.0); + println!( + " Backfill progress: {} / {} ({:.1}%)", + docs, + total, + progress * 100.0 + ); + } + + if let Some(error) = op["last_error"].as_str() { + println!(" Last error: {}", error); + } + + if let Some(_verify) = op["verification_results"].as_object() { + println!(" Verification: completed"); + } + + Ok(()) } /// Load resharding config from the standard config path. diff --git a/crates/miroir-proxy/src/routes/admin_endpoints.rs b/crates/miroir-proxy/src/routes/admin_endpoints.rs index f3e2f17..c4c5416 100644 --- a/crates/miroir-proxy/src/routes/admin_endpoints.rs +++ b/crates/miroir-proxy/src/routes/admin_endpoints.rs @@ -2819,10 +2819,92 @@ where return Err(StatusCode::INTERNAL_SERVER_ERROR); } - // In a full implementation, we would also: - // - Persist the operation to the task store for recovery - // - Start a background task for phases 2-6 - // - Emit metrics + // Spawn a background task to run the full orchestrator (phases 2-6) + let index_uid_clone = index_uid.clone(); + let shadow_index_clone = shadow_index.clone(); + let registry = app_state.resharding_registry.clone(); + let topology = app_state.topology.clone(); + let master_key_clone = master_key.clone(); + let task_store = app_state.task_store.clone(); + + tokio::spawn(async move { + info!( + index_uid = %index_uid_clone, + "Starting background reshard orchestrator for phases 2-6" + ); + + // Get primary key from topology + let topo = topology.read().await; + let primary_key = "id"; // Default - should be fetched from index schema + drop(topo); + + // Get node addresses again + let topo = topology.read().await; + let node_addresses: Vec = topo.nodes().map(|n| n.address.clone()).collect(); + drop(topo); + + // Configure the orchestrator + let config = miroir_core::reshard::ReshardOrchestratorConfig { + index_uid: index_uid_clone.clone(), + target_shards: req.new_shards, + node_addresses, + master_key: master_key_clone, + primary_key: primary_key.to_string(), + throttle_docs_per_sec: req.throttle_docs_per_sec, + backfill_batch_size: 1000, + retain_old_index_hours: 48, + verify_before_swap: true, + alias_history_retention: 10, + task_store: task_store.clone(), + metrics_callback: None, // TODO: wire up metrics + }; + + // Run the full orchestrator + match miroir_core::reshard::execute_reshard(config).await { + Ok(result) => { + info!( + index_uid = %index_uid_clone, + documents_backfilled = result.documents_backfilled, + duration_secs = result.total_duration_secs, + final_phase = ?result.final_phase, + "Reshard orchestrator completed successfully" + ); + + // Update registry to final phase + let mut reg = registry.write().await; + if let Err(e) = reg.update_phase( + &index_uid_clone, + miroir_core::reshard::ReshardPhase::Complete, + ) { + error!( + index_uid = %index_uid_clone, + error = %e, + "failed to update resharding phase to Complete" + ); + } + } + Err(e) => { + error!( + index_uid = %index_uid_clone, + error = %e, + "Reshard orchestrator failed" + ); + + // Update registry to failed state + let mut reg = registry.write().await; + if let Err(err) = reg.update_phase( + &index_uid_clone, + miroir_core::reshard::ReshardPhase::Failed, + ) { + error!( + index_uid = %index_uid_clone, + error = %err, + "failed to update resharding phase to Failed" + ); + } + } + } + }); Ok(Json(ReshardResponse { operation_id: format!("reshard-{}-{}", index_uid, now),