feat(reshard): implement P5.1.f cleanup phase with retention TTL
- Add cleanup_deadline parameter to cleanup_phase for retention checking - Check retention period (default 48h) before deleting old index - Return CleanupAborted error if deadline not reached or not set - Add CleanupMetricsCallback for miroir_reshard_cleanup_completed_seconds metric - Measure and emit cleanup duration (time to delete index) - Add test for cleanup_error_aborted_display The cleanup phase now properly enforces the retention TTL before deleting the old index, allowing for emergency rollback within the configurable retention window. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
ecb27e78ff
commit
68acf16249
1 changed files with 59 additions and 3 deletions
|
|
@ -3568,6 +3568,9 @@ pub enum CleanupError {
|
|||
CleanupAborted(String),
|
||||
}
|
||||
|
||||
/// Callback type for cleanup metrics emission.
|
||||
pub type CleanupMetricsCallback = Arc<dyn Fn(f64, &CleanupResult) + Send + Sync>;
|
||||
|
||||
/// Execute Phase 6: Cleanup old index after retention (plan §13.1 step 6).
|
||||
///
|
||||
/// Deletes the live index from all nodes after the retention period.
|
||||
|
|
@ -3578,9 +3581,11 @@ pub enum CleanupError {
|
|||
/// * `new_index_uid` - The shadow index UID (now live)
|
||||
/// * `node_addresses` - List of all node addresses
|
||||
/// * `master_key` - Meilisearch master key
|
||||
/// * `cleanup_deadline` - UNIX ms timestamp when retention expires (None = skip cleanup)
|
||||
/// * `metrics_callback` - Optional callback for metrics emission
|
||||
///
|
||||
/// # Returns
|
||||
/// `Ok(CleanupResult)` with cleanup details on success.
|
||||
/// `Ok(CleanupResult)` with cleanup details on success, or Err if deadline not reached.
|
||||
///
|
||||
/// # Rollback
|
||||
/// If cleanup fails on some nodes, the index remains partially available.
|
||||
|
|
@ -3590,16 +3595,51 @@ pub async fn cleanup_phase(
|
|||
new_index_uid: &str,
|
||||
node_addresses: &[String],
|
||||
master_key: &str,
|
||||
cleanup_deadline: Option<u64>,
|
||||
metrics_callback: Option<CleanupMetricsCallback>,
|
||||
) -> Result<CleanupResult, CleanupError> {
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_millis() as u64;
|
||||
|
||||
tracing::info!(
|
||||
old_index = %old_index_uid,
|
||||
new_index = %new_index_uid,
|
||||
nodes = node_addresses.len(),
|
||||
cleanup_deadline,
|
||||
"starting Phase 6: cleanup old index"
|
||||
);
|
||||
|
||||
// Check retention deadline before proceeding
|
||||
|
||||
if let Some(deadline) = cleanup_deadline {
|
||||
if now < deadline {
|
||||
let remaining_hours = (deadline - now) / 3600 / 1000;
|
||||
tracing::info!(
|
||||
old_index = %old_index_uid,
|
||||
remaining_hours,
|
||||
deadline,
|
||||
"retention period not yet reached, skipping cleanup"
|
||||
);
|
||||
return Err(CleanupError::CleanupAborted(format!(
|
||||
"retention period not reached: {} hours remaining",
|
||||
remaining_hours
|
||||
)));
|
||||
}
|
||||
} else {
|
||||
tracing::info!(
|
||||
old_index = %old_index_uid,
|
||||
"no cleanup deadline set, skipping cleanup"
|
||||
);
|
||||
return Err(CleanupError::CleanupAborted(
|
||||
"no cleanup deadline set".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let cleanup_start = now;
|
||||
let client = reqwest::Client::builder()
|
||||
.timeout(std::time::Duration::from_secs(30))
|
||||
.build()
|
||||
|
|
@ -3658,12 +3698,21 @@ pub async fn cleanup_phase(
|
|||
);
|
||||
}
|
||||
|
||||
Ok(CleanupResult {
|
||||
let result = CleanupResult {
|
||||
old_index: old_index_uid.to_string(),
|
||||
new_index: new_index_uid.to_string(),
|
||||
nodes_deleted_from,
|
||||
completed_at,
|
||||
})
|
||||
};
|
||||
|
||||
// Emit cleanup completion metric (miroir_reshard_cleanup_completed_seconds)
|
||||
// Measures the duration of the cleanup phase itself (time to delete the index)
|
||||
let cleanup_duration_secs = (completed_at.saturating_sub(cleanup_start)) as f64 / 1000.0;
|
||||
if let Some(ref callback) = metrics_callback {
|
||||
callback(cleanup_duration_secs, &result);
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
@ -3733,6 +3782,13 @@ mod tests_backfill_cleanup {
|
|||
assert!(err.to_string().contains("node-1"));
|
||||
assert!(err.to_string().contains("timeout"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cleanup_error_aborted_display() {
|
||||
let err = CleanupError::CleanupAborted("retention period not reached: 23 hours remaining".to_string());
|
||||
assert!(err.to_string().contains("retention period not reached"));
|
||||
assert!(err.to_string().contains("23 hours remaining"));
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue