diff --git a/crates/miroir-core/src/reshard.rs b/crates/miroir-core/src/reshard.rs index 81e60ee..36bc2ae 100644 --- a/crates/miroir-core/src/reshard.rs +++ b/crates/miroir-core/src/reshard.rs @@ -3568,6 +3568,9 @@ pub enum CleanupError { CleanupAborted(String), } +/// Callback type for cleanup metrics emission. +pub type CleanupMetricsCallback = Arc; + /// Execute Phase 6: Cleanup old index after retention (plan ยง13.1 step 6). /// /// Deletes the live index from all nodes after the retention period. @@ -3578,9 +3581,11 @@ pub enum CleanupError { /// * `new_index_uid` - The shadow index UID (now live) /// * `node_addresses` - List of all node addresses /// * `master_key` - Meilisearch master key +/// * `cleanup_deadline` - UNIX ms timestamp when retention expires (None = skip cleanup) +/// * `metrics_callback` - Optional callback for metrics emission /// /// # Returns -/// `Ok(CleanupResult)` with cleanup details on success. +/// `Ok(CleanupResult)` with cleanup details on success, or Err if deadline not reached. /// /// # Rollback /// If cleanup fails on some nodes, the index remains partially available. @@ -3590,16 +3595,51 @@ pub async fn cleanup_phase( new_index_uid: &str, node_addresses: &[String], master_key: &str, + cleanup_deadline: Option, + metrics_callback: Option, ) -> Result { use std::time::{SystemTime, UNIX_EPOCH}; + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + tracing::info!( old_index = %old_index_uid, new_index = %new_index_uid, nodes = node_addresses.len(), + cleanup_deadline, "starting Phase 6: cleanup old index" ); + // Check retention deadline before proceeding + + if let Some(deadline) = cleanup_deadline { + if now < deadline { + let remaining_hours = (deadline - now) / 3600 / 1000; + tracing::info!( + old_index = %old_index_uid, + remaining_hours, + deadline, + "retention period not yet reached, skipping cleanup" + ); + return Err(CleanupError::CleanupAborted(format!( + "retention period not reached: {} hours remaining", + remaining_hours + ))); + } + } else { + tracing::info!( + old_index = %old_index_uid, + "no cleanup deadline set, skipping cleanup" + ); + return Err(CleanupError::CleanupAborted( + "no cleanup deadline set".to_string(), + )); + } + + let cleanup_start = now; let client = reqwest::Client::builder() .timeout(std::time::Duration::from_secs(30)) .build() @@ -3658,12 +3698,21 @@ pub async fn cleanup_phase( ); } - Ok(CleanupResult { + let result = CleanupResult { old_index: old_index_uid.to_string(), new_index: new_index_uid.to_string(), nodes_deleted_from, completed_at, - }) + }; + + // Emit cleanup completion metric (miroir_reshard_cleanup_completed_seconds) + // Measures the duration of the cleanup phase itself (time to delete the index) + let cleanup_duration_secs = (completed_at.saturating_sub(cleanup_start)) as f64 / 1000.0; + if let Some(ref callback) = metrics_callback { + callback(cleanup_duration_secs, &result); + } + + Ok(result) } // --------------------------------------------------------------------------- @@ -3733,6 +3782,13 @@ mod tests_backfill_cleanup { assert!(err.to_string().contains("node-1")); assert!(err.to_string().contains("timeout")); } + + #[test] + fn cleanup_error_aborted_display() { + let err = CleanupError::CleanupAborted("retention period not reached: 23 hours remaining".to_string()); + assert!(err.to_string().contains("retention period not reached")); + assert!(err.to_string().contains("23 hours remaining")); + } } // ---------------------------------------------------------------------------