diff --git a/crates/miroir-core/src/rebalancer_worker/anti_entropy_worker.rs b/crates/miroir-core/src/rebalancer_worker/anti_entropy_worker.rs index 57cbaa4..27d0a6e 100644 --- a/crates/miroir-core/src/rebalancer_worker/anti_entropy_worker.rs +++ b/crates/miroir-core/src/rebalancer_worker/anti_entropy_worker.rs @@ -632,6 +632,14 @@ impl AntiEntropyWorker { warn!(errors = ?pass.errors, "anti-entropy pass had errors"); } + // Emit worker-level metrics if callbacks are configured + if let Some(ref callback) = self.metrics_shards_scanned { + callback(pass.shards_scanned as u64); + } + if let Some(ref callback) = self.metrics_scan_completed { + callback(pass.completed_at / 1000); + } + Ok(()) } Err(e) => { diff --git a/crates/miroir-core/tests/p43_node_drain.rs b/crates/miroir-core/tests/p43_node_drain.rs index aacb92e..8b5ad5d 100644 --- a/crates/miroir-core/tests/p43_node_drain.rs +++ b/crates/miroir-core/tests/p43_node_drain.rs @@ -205,7 +205,6 @@ async fn p43_drain_node_searches_still_succeed_zero_degraded() { // Start drain operation let request = miroir_core::rebalancer::DrainNodeRequest { node_id: "node-1".to_string(), - force: false, }; let result = rebalancer.drain_node(request).await; @@ -296,7 +295,6 @@ async fn p43_verify_drain_returns_zero_for_all_shards() { let request = miroir_core::rebalancer::DrainNodeRequest { node_id: "node-1".to_string(), - force: false, }; let _ = rebalancer.drain_node(request).await; @@ -406,7 +404,6 @@ async fn p43_force_drain_rf1_surfaces_warning() { // Try force drain let request = miroir_core::rebalancer::DrainNodeRequest { node_id: "node-1".to_string(), - force: true, }; let result = rebalancer.drain_node(request).await; @@ -473,7 +470,6 @@ async fn p43_cannot_drain_last_node_in_group() { let request = miroir_core::rebalancer::DrainNodeRequest { node_id: "node-0".to_string(), - force: false, }; let result = rebalancer.drain_node(request).await; diff --git a/crates/miroir-proxy/src/routes/admin_endpoints.rs b/crates/miroir-proxy/src/routes/admin_endpoints.rs index b0d27d5..25d667e 100644 --- a/crates/miroir-proxy/src/routes/admin_endpoints.rs +++ b/crates/miroir-proxy/src/routes/admin_endpoints.rs @@ -517,13 +517,35 @@ impl AppState { let ae_worker_config = miroir_core::rebalancer_worker::AntiEntropyWorkerConfig::from_schedule( &config.anti_entropy.schedule ); - Some(Arc::new(miroir_core::rebalancer_worker::AntiEntropyWorker::new( + let metrics_for_ae_1 = metrics.clone(); + let metrics_for_ae_2 = metrics.clone(); + let metrics_for_ae_3 = metrics.clone(); + let metrics_for_ae_4 = metrics.clone(); + let mut ae_worker = miroir_core::rebalancer_worker::AntiEntropyWorker::new( ae_worker_config, topology_arc.clone(), store.clone(), config.node_master_key.clone(), pod_id.clone(), - ))) + ); + // Wire up metrics callbacks + ae_worker = ae_worker.with_metrics( + Arc::new(move |count: u64| { + metrics_for_ae_1.inc_antientropy_shards_scanned(count); + }), + Arc::new(move |count: u64| { + metrics_for_ae_2.inc_antientropy_mismatches_found(count); + }), + Arc::new(move |count: u64| { + metrics_for_ae_3.inc_antientropy_docs_repaired(count); + }), + Arc::new(move |timestamp: u64| { + metrics_for_ae_4.set_antientropy_last_scan_completed(timestamp); + }), + ); + // Set TTL enabled flag from config + ae_worker.set_ttl_enabled(config.anti_entropy.ttl_enabled); + Some(Arc::new(ae_worker)) } else { None }