fix(reshard): implement real progress tracking for reshard status endpoint
Previously GET /_miroir/indexes/{uid}/reshard/status returned hardcoded 0
for documents_backfilled and total_documents. This commit:
1. Adds documents_backfilled and total_documents fields to ReshardOperationState
2. Adds update_progress() method to ReshardingRegistry
3. Adds progress_callback to ReshardOrchestratorConfig
4. Updates the HTTP endpoint to return actual progress values
5. Updates all test cases to include the new fields
The progress_callback is invoked after backfill completes to update the
registry with the final document counts. The status endpoint now returns
real progress data instead of hardcoded zeros.
Closes: bf-22jkc
This commit is contained in:
parent
e7721f962f
commit
137d498377
2 changed files with 109 additions and 3 deletions
|
|
@ -878,6 +878,10 @@ pub struct ReshardOperationState {
|
|||
pub phase: ReshardPhase,
|
||||
/// When the operation started (UNIX ms)
|
||||
pub started_at: u64,
|
||||
/// Documents backfilled so far (updated during backfill phase)
|
||||
pub documents_backfilled: u64,
|
||||
/// Total documents to backfill (estimated at start)
|
||||
pub total_documents: u64,
|
||||
}
|
||||
|
||||
impl ReshardingRegistry {
|
||||
|
|
@ -932,6 +936,28 @@ impl ReshardingRegistry {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Update backfill progress for an active resharding operation.
|
||||
pub fn update_progress(
|
||||
&mut self,
|
||||
index_uid: &str,
|
||||
documents_backfilled: u64,
|
||||
total_documents: u64,
|
||||
) -> Result<(), String> {
|
||||
let op = self
|
||||
.active_operations
|
||||
.get_mut(index_uid)
|
||||
.ok_or_else(|| format!("No resharding operation for index '{index_uid}'"))?;
|
||||
op.documents_backfilled = documents_backfilled;
|
||||
op.total_documents = total_documents;
|
||||
tracing::debug!(
|
||||
index_uid = %index_uid,
|
||||
documents_backfilled,
|
||||
total_documents,
|
||||
"updated resharding backfill progress"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Remove a completed resharding operation.
|
||||
pub fn remove(&mut self, index_uid: &str) -> Result<(), String> {
|
||||
if self.active_operations.remove(index_uid).is_none() {
|
||||
|
|
@ -1802,6 +1828,8 @@ mod tests_dual_write {
|
|||
target_shards: 128,
|
||||
phase: ReshardPhase::ShadowCreated,
|
||||
started_at: 1000,
|
||||
documents_backfilled: 0,
|
||||
total_documents: 0,
|
||||
};
|
||||
|
||||
let prep = prepare_dual_write_documents(&documents, "id", &reshard_state);
|
||||
|
|
@ -1842,6 +1870,8 @@ mod tests_dual_write {
|
|||
target_shards: 256,
|
||||
phase: ReshardPhase::DualWriteActive,
|
||||
started_at: 2000,
|
||||
documents_backfilled: 0,
|
||||
total_documents: 0,
|
||||
};
|
||||
|
||||
let prep = prepare_dual_write_documents(&documents, "id", &reshard_state);
|
||||
|
|
@ -1872,6 +1902,8 @@ mod tests_dual_write {
|
|||
target_shards: 32,
|
||||
phase: ReshardPhase::BackfillInProgress,
|
||||
started_at: 3000,
|
||||
documents_backfilled: 0,
|
||||
total_documents: 0,
|
||||
};
|
||||
|
||||
// Run multiple times - should be deterministic
|
||||
|
|
@ -1898,6 +1930,8 @@ mod tests_dual_write {
|
|||
target_shards: 64,
|
||||
phase: ReshardPhase::ShadowCreated,
|
||||
started_at: 1000,
|
||||
documents_backfilled: 0,
|
||||
total_documents: 0,
|
||||
};
|
||||
|
||||
let prep = prepare_dual_write_documents(&documents, "id", &reshard_state);
|
||||
|
|
@ -1916,6 +1950,8 @@ mod tests_dual_write {
|
|||
target_shards: 128,
|
||||
phase: ReshardPhase::DualWriteActive,
|
||||
started_at: 1000,
|
||||
documents_backfilled: 0,
|
||||
total_documents: 0,
|
||||
};
|
||||
|
||||
let prep = prepare_dual_write_documents(&documents, "id", &reshard_state);
|
||||
|
|
@ -2175,6 +2211,8 @@ mod tests_resharding_registry {
|
|||
target_shards: 128,
|
||||
phase: ReshardPhase::ShadowCreated,
|
||||
started_at: 1000,
|
||||
documents_backfilled: 0,
|
||||
total_documents: 0,
|
||||
};
|
||||
reg.register("products".to_string(), state).unwrap();
|
||||
|
||||
|
|
@ -2194,6 +2232,8 @@ mod tests_resharding_registry {
|
|||
target_shards: 128,
|
||||
phase: ReshardPhase::ShadowCreated,
|
||||
started_at: 1000,
|
||||
documents_backfilled: 0,
|
||||
total_documents: 0,
|
||||
};
|
||||
reg.register("products".to_string(), state).unwrap();
|
||||
|
||||
|
|
@ -2203,6 +2243,8 @@ mod tests_resharding_registry {
|
|||
target_shards: 256,
|
||||
phase: ReshardPhase::ShadowCreated,
|
||||
started_at: 2000,
|
||||
documents_backfilled: 0,
|
||||
total_documents: 0,
|
||||
};
|
||||
assert!(reg.register("products".to_string(), state2).is_err());
|
||||
}
|
||||
|
|
@ -2216,6 +2258,8 @@ mod tests_resharding_registry {
|
|||
target_shards: 128,
|
||||
phase: ReshardPhase::ShadowCreated,
|
||||
started_at: 1000,
|
||||
documents_backfilled: 0,
|
||||
total_documents: 0,
|
||||
};
|
||||
reg.register("products".to_string(), state).unwrap();
|
||||
|
||||
|
|
@ -2243,6 +2287,8 @@ mod tests_resharding_registry {
|
|||
target_shards: 128,
|
||||
phase: ReshardPhase::ShadowCreated,
|
||||
started_at: 1000,
|
||||
documents_backfilled: 0,
|
||||
total_documents: 0,
|
||||
};
|
||||
reg.register("products".to_string(), state).unwrap();
|
||||
assert!(reg.get("products").is_some());
|
||||
|
|
@ -2266,6 +2312,8 @@ mod tests_resharding_registry {
|
|||
target_shards: 128,
|
||||
phase: ReshardPhase::ShadowCreated,
|
||||
started_at: 1000,
|
||||
documents_backfilled: 0,
|
||||
total_documents: 0,
|
||||
};
|
||||
reg.register("products".to_string(), state).unwrap();
|
||||
assert!(reg.is_dual_write_active("products"));
|
||||
|
|
@ -2280,6 +2328,8 @@ mod tests_resharding_registry {
|
|||
target_shards: 128,
|
||||
phase: ReshardPhase::DualWriteActive,
|
||||
started_at: 1000,
|
||||
documents_backfilled: 0,
|
||||
total_documents: 0,
|
||||
};
|
||||
reg.register("products".to_string(), state).unwrap();
|
||||
assert!(reg.is_dual_write_active("products"));
|
||||
|
|
@ -2294,6 +2344,8 @@ mod tests_resharding_registry {
|
|||
target_shards: 128,
|
||||
phase: ReshardPhase::BackfillInProgress,
|
||||
started_at: 1000,
|
||||
documents_backfilled: 0,
|
||||
total_documents: 0,
|
||||
};
|
||||
reg.register("products".to_string(), state).unwrap();
|
||||
assert!(reg.is_dual_write_active("products"));
|
||||
|
|
@ -2308,6 +2360,8 @@ mod tests_resharding_registry {
|
|||
target_shards: 128,
|
||||
phase: ReshardPhase::Verifying,
|
||||
started_at: 1000,
|
||||
documents_backfilled: 0,
|
||||
total_documents: 0,
|
||||
};
|
||||
reg.register("products".to_string(), state).unwrap();
|
||||
assert!(reg.is_dual_write_active("products"));
|
||||
|
|
@ -2322,6 +2376,8 @@ mod tests_resharding_registry {
|
|||
target_shards: 128,
|
||||
phase: ReshardPhase::Swapped,
|
||||
started_at: 1000,
|
||||
documents_backfilled: 0,
|
||||
total_documents: 0,
|
||||
};
|
||||
reg.register("products".to_string(), state).unwrap();
|
||||
// After swap, dual-write stops (writes go only to new index)
|
||||
|
|
@ -2344,6 +2400,8 @@ mod tests_resharding_registry {
|
|||
target_shards: 128,
|
||||
phase: ReshardPhase::ShadowCreated,
|
||||
started_at: 1000,
|
||||
documents_backfilled: 0,
|
||||
total_documents: 0,
|
||||
};
|
||||
reg.register("products".to_string(), state1).unwrap();
|
||||
|
||||
|
|
@ -2353,6 +2411,8 @@ mod tests_resharding_registry {
|
|||
target_shards: 256,
|
||||
phase: ReshardPhase::DualWriteActive,
|
||||
started_at: 2000,
|
||||
documents_backfilled: 0,
|
||||
total_documents: 0,
|
||||
};
|
||||
reg.register("orders".to_string(), state2).unwrap();
|
||||
|
||||
|
|
@ -2382,6 +2442,8 @@ mod tests_resharding_registry {
|
|||
target_shards: 128,
|
||||
phase: ReshardPhase::DualWriteActive,
|
||||
started_at: 1000,
|
||||
documents_backfilled: 0,
|
||||
total_documents: 0,
|
||||
};
|
||||
reg.register("products".to_string(), products_state)
|
||||
.unwrap();
|
||||
|
|
@ -2392,6 +2454,8 @@ mod tests_resharding_registry {
|
|||
target_shards: 256,
|
||||
phase: ReshardPhase::ShadowCreated,
|
||||
started_at: 2000,
|
||||
documents_backfilled: 0,
|
||||
total_documents: 0,
|
||||
};
|
||||
reg.register("orders".to_string(), orders_state).unwrap();
|
||||
|
||||
|
|
@ -3818,11 +3882,16 @@ pub struct ReshardOrchestratorConfig {
|
|||
pub task_store: Option<Arc<dyn crate::task_store::TaskStore>>,
|
||||
/// Metrics callback for phase transitions.
|
||||
pub metrics_callback: Option<ReshardMetricsCallback>,
|
||||
/// Progress callback for registry updates (documents_backfilled, total_documents).
|
||||
pub progress_callback: Option<ReshardProgressCallback>,
|
||||
}
|
||||
|
||||
/// Callback for metrics emission during resharding.
|
||||
pub type ReshardMetricsCallback = Arc<dyn Fn(ReshardPhase, u64) + Send + Sync>;
|
||||
|
||||
/// Callback for progress updates during resharding backfill.
|
||||
pub type ReshardProgressCallback = Arc<dyn Fn(u64, u64) + Send + Sync>;
|
||||
|
||||
/// Result of the full reshard operation.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ReshardOrchestratorResult {
|
||||
|
|
@ -3937,6 +4006,13 @@ pub async fn execute_reshard(
|
|||
ReshardPhase::BackfillInProgress,
|
||||
backfill_result.documents_backfilled,
|
||||
);
|
||||
// Update registry with final backfill progress
|
||||
if let Some(ref cb) = config.progress_callback {
|
||||
cb(
|
||||
backfill_result.documents_backfilled,
|
||||
backfill_result.total_estimated,
|
||||
);
|
||||
}
|
||||
tracing::info!(
|
||||
documents_backfilled = backfill_result.documents_backfilled,
|
||||
"Phase 3 complete: backfill finished"
|
||||
|
|
@ -4143,6 +4219,7 @@ mod tests_orchestrator {
|
|||
alias_history_retention: 10,
|
||||
task_store: None,
|
||||
metrics_callback: None,
|
||||
progress_callback: None,
|
||||
};
|
||||
|
||||
assert_eq!(config.index_uid, "products");
|
||||
|
|
|
|||
|
|
@ -3355,6 +3355,8 @@ where
|
|||
target_shards: req.new_shards,
|
||||
phase: miroir_core::reshard::ReshardPhase::ShadowCreated,
|
||||
started_at: now,
|
||||
documents_backfilled: 0,
|
||||
total_documents: 0,
|
||||
};
|
||||
|
||||
let mut registry = app_state.resharding_registry.write().await;
|
||||
|
|
@ -3438,6 +3440,27 @@ where
|
|||
},
|
||||
);
|
||||
|
||||
// Create progress callback to update registry during backfill
|
||||
let index_uid_for_cb = index_uid_clone.clone();
|
||||
let registry_for_cb = registry.clone();
|
||||
let progress_callback =
|
||||
std::sync::Arc::new(move |docs_backfilled: u64, total_docs: u64| {
|
||||
let index_uid = index_uid_for_cb.clone();
|
||||
let registry = registry_for_cb.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut reg = registry.write().await;
|
||||
if let Err(e) =
|
||||
reg.update_progress(&index_uid, docs_backfilled, total_docs)
|
||||
{
|
||||
tracing::error!(
|
||||
index_uid = %index_uid,
|
||||
error = %e,
|
||||
"failed to update resharding progress"
|
||||
);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
let config = miroir_core::reshard::ReshardOrchestratorConfig {
|
||||
index_uid: index_uid_clone.clone(),
|
||||
target_shards: req.new_shards,
|
||||
|
|
@ -3451,6 +3474,7 @@ where
|
|||
alias_history_retention: 10,
|
||||
task_store: task_store.clone(),
|
||||
metrics_callback: Some(metrics_callback),
|
||||
progress_callback: Some(progress_callback),
|
||||
};
|
||||
|
||||
// Run the full orchestrator
|
||||
|
|
@ -3536,6 +3560,11 @@ where
|
|||
let operation = registry.get(&index_uid);
|
||||
|
||||
if let Some(op) = operation {
|
||||
let backfill_progress = if op.total_documents > 0 {
|
||||
op.documents_backfilled as f64 / op.total_documents as f64
|
||||
} else {
|
||||
0.0
|
||||
};
|
||||
Ok(Json(ReshardStatusResponse {
|
||||
active: true,
|
||||
operation: Some(ReshardOperationDetails {
|
||||
|
|
@ -3544,9 +3573,9 @@ where
|
|||
old_shards: op.old_shards,
|
||||
new_shards: op.target_shards,
|
||||
phase: op.phase.name().to_string(),
|
||||
documents_backfilled: 0, // TODO: track progress
|
||||
total_documents: 0,
|
||||
backfill_progress: 0.0,
|
||||
documents_backfilled: op.documents_backfilled,
|
||||
total_documents: op.total_documents,
|
||||
backfill_progress,
|
||||
shadow_index: op.shadow_index.clone(),
|
||||
started_at: op.started_at,
|
||||
last_error: None,
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue