diff --git a/crates/miroir-core/src/anti_entropy.rs b/crates/miroir-core/src/anti_entropy.rs index 0d84905..a22f27f 100644 --- a/crates/miroir-core/src/anti_entropy.rs +++ b/crates/miroir-core/src/anti_entropy.rs @@ -310,7 +310,7 @@ impl AntiEntropyReconciler { .node_client .fetch_documents(node_id, address, &request) .await - .map_err(|e| MiroirError::Topology(format!("fetch failed: {:?}", e)))?; + .map_err(|e| MiroirError::Topology(format!("fetch failed: {e:?}")))?; if response.results.is_empty() { break; // No more documents @@ -449,7 +449,7 @@ impl AntiEntropyReconciler { .node_client .fetch_documents(node_id, address, &request) .await - .map_err(|e| MiroirError::Topology(format!("fetch failed: {:?}", e)))?; + .map_err(|e| MiroirError::Topology(format!("fetch failed: {e:?}")))?; if response.results.is_empty() { break; @@ -629,7 +629,7 @@ impl AntiEntropyReconciler { total_mismatches += mismatches; } Err(e) => { - pass.errors.push(format!("shard {}: {}", shard_id, e)); + pass.errors.push(format!("shard {shard_id}: {e}")); } } @@ -697,7 +697,7 @@ impl AntiEntropyReconciler { let topology_guard = self.topology.read().await; let node = topology_guard .node(&node_id) - .ok_or_else(|| MiroirError::Topology(format!("node {} not found", node_id)))?; + .ok_or_else(|| MiroirError::Topology(format!("node {node_id} not found")))?; if !node.is_healthy() { warn!("Node {} is not healthy, skipping fingerprint", node_id); @@ -935,13 +935,13 @@ impl AntiEntropyReconciler { .as_ref() .and_then(|d| d.get("_miroir_expires_at")) .and_then(|v| v.as_u64()) - .map_or(false, |expires| expires <= now_ms); + .is_some_and(|expires| expires <= now_ms); let target_expired = target_doc .as_ref() .and_then(|d| d.get("_miroir_expires_at")) .and_then(|v| v.as_u64()) - .map_or(false, |expires| expires <= now_ms); + .is_some_and(|expires| expires <= now_ms); if ref_expired || target_expired { info!( @@ -1164,7 +1164,7 @@ impl AntiEntropyReconciler { self.node_client .write_documents(node_id, address, &request) .await - .map_err(|e| MiroirError::Topology(format!("write failed: {:?}", e)))?; + .map_err(|e| MiroirError::Topology(format!("write failed: {e:?}")))?; Ok(()) } @@ -1185,7 +1185,7 @@ impl AntiEntropyReconciler { self.node_client .delete_documents(node_id, address, &request) .await - .map_err(|e| MiroirError::Topology(format!("delete failed: {:?}", e)))?; + .map_err(|e| MiroirError::Topology(format!("delete failed: {e:?}")))?; debug!("Deleted PK {} from node {}", primary_key, node_id); Ok(()) @@ -1290,7 +1290,7 @@ impl AntiEntropyReconciler { .node_client .fetch_documents(node_id, address, &request) .await - .map_err(|e| MiroirError::Topology(format!("fetch failed: {:?}", e)))?; + .map_err(|e| MiroirError::Topology(format!("fetch failed: {e:?}")))?; if response.results.is_empty() { break; @@ -1567,9 +1567,9 @@ mod tests_mode_a_acceptance { "pod-2".to_string(), "pod-3".to_string(), ]); - *coordinator_1.cached_peer_set.write().await = peer_set.clone(); - *coordinator_2.cached_peer_set.write().await = peer_set.clone(); - *coordinator_3.cached_peer_set.write().await = peer_set; + coordinator_1.set_peer_set_for_test(peer_set.clone()).await; + coordinator_2.set_peer_set_for_test(peer_set.clone()).await; + coordinator_3.set_peer_set_for_test(peer_set).await; // Create 3 anti-entropy reconcilers, one per pod let config = AntiEntropyConfig::default(); @@ -1691,9 +1691,9 @@ mod tests_mode_a_acceptance { "pod-2".to_string(), "pod-3".to_string(), ]); - *coordinator_1.cached_peer_set.write().await = peer_set_3pods.clone(); - *coordinator_2.cached_peer_set.write().await = peer_set_3pods.clone(); - *coordinator_3.cached_peer_set.write().await = peer_set_3pods.clone(); + coordinator_1.set_peer_set_for_test(peer_set_3pods.clone()).await; + coordinator_2.set_peer_set_for_test(peer_set_3pods.clone()).await; + coordinator_3.set_peer_set_for_test(peer_set_3pods.clone()).await; // Track which shards pod-3 owns initially let mut pod3_owned_initial = Vec::new(); @@ -1707,8 +1707,8 @@ mod tests_mode_a_acceptance { // Pod-3 dies: remove it from the peer set let peer_set_2pods = crate::peer_discovery::PeerSet::new(vec!["pod-1".to_string(), "pod-2".to_string()]); - *coordinator_1.cached_peer_set.write().await = peer_set_2pods.clone(); - *coordinator_2.cached_peer_set.write().await = peer_set_2pods.clone(); + coordinator_1.set_peer_set_for_test(peer_set_2pods.clone()).await; + coordinator_2.set_peer_set_for_test(peer_set_2pods.clone()).await; // Verify that all shards previously owned by pod-3 are now owned by pod-1 or pod-2 for shard_id in &pod3_owned_initial { @@ -1766,7 +1766,7 @@ mod tests_mode_a_acceptance { "pod-2".to_string(), "pod-3".to_string(), ]); - *coordinator.cached_peer_set.write().await = peer_set; + coordinator.set_peer_set_for_test(peer_set).await; // Create anti-entropy reconciler with Mode A let config = AntiEntropyConfig { diff --git a/crates/miroir-core/src/cdc.rs b/crates/miroir-core/src/cdc.rs index d3d321e..882b20c 100644 --- a/crates/miroir-core/src/cdc.rs +++ b/crates/miroir-core/src/cdc.rs @@ -121,6 +121,37 @@ pub enum CdcOperation { Add, Update, Delete, + /// Analytics event: click-through (plan §13.21). + ClickThrough, + /// Analytics event: latency measurement (plan §13.21). + Latency, +} + +/// Analytics event type for search UI beacons (plan §13.21). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AnalyticsEvent { + /// Event type: "click_through" or "latency". + pub event_type: String, + /// Stable event ID for deduplication. + pub event_id: String, + /// Opaque session ID from JWT. + pub session_id: String, + /// Index UID. + pub index: String, + /// Query string (for search/latency events). + #[serde(skip_serializing_if = "Option::is_none")] + pub query: Option, + /// Result ID (for click events). + #[serde(skip_serializing_if = "Option::is_none")] + pub result_id: Option, + /// Click position (for click events). + #[serde(skip_serializing_if = "Option::is_none")] + pub result_position: Option, + /// Latency in milliseconds (for latency events). + #[serde(skip_serializing_if = "Option::is_none")] + pub latency_ms: Option, + /// UNIX timestamp (ms). + pub timestamp: u64, } /// CDC sink configuration. @@ -371,6 +402,58 @@ impl CdcInternalQueue { Ok(None) } } + + /// Store an analytics event (plan §13.21). + /// Analytics events (click_through, latency) are stored in the internal queue + /// and can be queried via GET /_miroir/changes. + pub async fn store_analytics(&self, event: AnalyticsEvent) -> u64 { + let index = event.index.clone(); + let event_id = event.event_id.clone(); + let event_type = event.event_type.clone(); + let result_id = event.result_id.clone(); + let timestamp = event.timestamp; + + let mut sequences = self.sequences.write().await; + let seq = sequences.entry(index.clone()).or_insert(0); + *seq += 1; + let sequence = *seq; + + // Convert analytics event to a CdcEvent for storage + let cdc_event = CdcEvent { + mtask_id: format!("analytics:{}", event_id), + index: index.clone(), + operation: if event_type == "click_through" { + CdcOperation::ClickThrough + } else { + CdcOperation::Latency + }, + primary_keys: result_id.into_iter().collect(), + shard_ids: vec![], + settings_version: 0, + timestamp, + document: Some(serde_json::to_value(&event).unwrap_or_default()), + origin: None, + event_id, + }; + + let mut events = self.events.write().await; + events + .entry(index.clone()) + .or_insert_with(Vec::new) + .push((sequence, cdc_event)); + + // Trim old events to keep memory usage bounded (keep last 10,000 per index) + if let Some(events_vec) = events.get_mut(&index) { + if events_vec.len() > 10_000 { + events_vec.drain(0..events_vec.len() - 10_000); + } + } + + // Notify waiting consumers of the new event + let _ = self.notify_tx.send(index.clone()); + + sequence + } } /// CDC manager — publishes change events to configured sinks. @@ -1151,6 +1234,16 @@ impl CdcManager { Ok(()) } + /// Publish an analytics event (plan §13.21). + /// + /// Analytics events (click_through, latency) are stored in the internal queue + /// and can be queried via GET /_miroir/changes. They are not sent to external + /// sinks unless configured via `search_ui.analytics.sink`. + pub async fn publish_analytics(&self, event: AnalyticsEvent) { + // Store in internal queue for GET /_miroir/changes endpoint + self.internal_queue.store_analytics(event).await; + } + /// Get current publisher state. pub async fn state(&self) -> CdcPublisherState { self.state.read().await.clone() diff --git a/crates/miroir-core/src/mode_a_coordinator.rs b/crates/miroir-core/src/mode_a_coordinator.rs index 2f80391..5d3416a 100644 --- a/crates/miroir-core/src/mode_a_coordinator.rs +++ b/crates/miroir-core/src/mode_a_coordinator.rs @@ -24,7 +24,7 @@ use crate::peer_discovery::{PeerDiscovery, PeerId, PeerSet}; use std::hash::Hasher; use std::sync::Arc; use tokio::sync::RwLock; -use tracing::{debug, info, warn}; +use tracing::{debug, warn}; use twox_hash::XxHash64; /// Error type for Mode A coordination. @@ -179,7 +179,7 @@ impl ModeACoordinator { let score = Self::rendezvous_score(miroir_id, peer); if score > best_score { best_score = score; - is_owner = (peer == &self.pod_id); + is_owner = peer == &self.pod_id; } } @@ -220,7 +220,7 @@ impl ModeACoordinator { let score = Self::rendezvous_score(miroir_id, peer); if score > best_score { best_score = score; - is_owner = (peer == &self.pod_id); + is_owner = peer == &self.pod_id; } } @@ -238,7 +238,7 @@ impl ModeACoordinator { /// /// Combines index and node into a single key for rendezvous hashing. pub async fn owns_settings_check(&self, index_uid: &str, node_id: &str) -> Result { - let key = format!("{}:{}", index_uid, node_id); + let key = format!("{index_uid}:{node_id}"); self.owns_task(&key).await } @@ -293,6 +293,16 @@ impl ModeACoordinator { pub fn pod_id(&self) -> &str { &self.pod_id } + + /// Set the peer set directly (test-only). + /// + /// This method is only intended for use in tests to simulate different + /// peer configurations without going through peer discovery. + #[cfg(test)] + pub async fn set_peer_set_for_test(&self, peer_set: PeerSet) { + let mut cached = self.cached_peer_set.write().await; + *cached = peer_set; + } } #[cfg(test)] diff --git a/crates/miroir-core/src/rebalancer_worker/drift_reconciler.rs b/crates/miroir-core/src/rebalancer_worker/drift_reconciler.rs index b5f3303..001b1da 100644 --- a/crates/miroir-core/src/rebalancer_worker/drift_reconciler.rs +++ b/crates/miroir-core/src/rebalancer_worker/drift_reconciler.rs @@ -37,8 +37,7 @@ use reqwest::Client; use serde_json::Value; use std::collections::HashMap; use std::sync::Arc; -use std::time::{Duration, Instant}; -use tokio::sync::RwLock; +use std::time::Duration; use tracing::{debug, error, info, warn}; /// Configuration for the drift reconciler worker. @@ -65,6 +64,9 @@ impl Default for DriftReconcilerConfig { } } +/// Callback type for recording drift repair metrics. +pub type DriftRepairCallback = Arc; + /// Settings drift reconciler background worker. /// /// Runs as a Tokio task, uses Mode A rendezvous hashing to partition @@ -78,6 +80,8 @@ pub struct DriftReconciler { pod_id: String, /// Mode A coordinator for partitioning drift checks (plan §14.5 Mode A). mode_a_coordinator: Option>, + /// Callback for recording drift repair metrics. + metrics_callback: Option, } impl DriftReconciler { @@ -98,6 +102,7 @@ impl DriftReconciler { node_master_key, pod_id, mode_a_coordinator: None, + metrics_callback: None, } } @@ -107,6 +112,12 @@ impl DriftReconciler { self } + /// Set the metrics callback for recording drift repairs. + pub fn with_metrics_callback(mut self, callback: DriftRepairCallback) -> Self { + self.metrics_callback = Some(callback); + self + } + /// Start the background worker. /// /// This runs in a loop using Mode A coordination (plan §14.5): @@ -184,7 +195,7 @@ impl DriftReconciler { for address in &self.node_addresses { // Mode A coordination: only check pairs we own // Key is "index_uid:node_address" for rendezvous hashing - let pair_key = format!("{}:{}", index, address); + let pair_key = format!("{index}:{address}"); if let Some(ref coordinator) = self.mode_a_coordinator { // Check if we own this (index, node) pair @@ -195,7 +206,7 @@ impl DriftReconciler { } } - let path = format!("/indexes/{}/settings", index); + let path = format!("/indexes/{index}/settings"); match self.get_settings(client, address, &path).await { Ok(settings) => { let hash = fingerprint_settings(&settings); @@ -250,7 +261,7 @@ impl DriftReconciler { .iter() .find(|(_addr, settings)| { let hash = fingerprint_settings(settings); - &hash == &consensus_hash + hash == consensus_hash }) .map(|(_, settings)| settings); @@ -258,12 +269,16 @@ impl DriftReconciler { // Repair drifted nodes for address in &drifted_nodes { if let Err(e) = self - .repair_node_settings(client, address, index, &consensus_settings) + .repair_node_settings(client, address, index, consensus_settings) .await { error!(node = %address, index = %index, error = %e, "failed to repair settings"); } else { info!(node = %address, index = %index, "repaired settings drift"); + // Record metrics if callback is set + if let Some(ref callback) = self.metrics_callback { + callback(index); + } } } } @@ -281,7 +296,7 @@ impl DriftReconciler { index: &str, settings: &Value, ) -> Result<()> { - let path = format!("/indexes/{}/settings", index); + let path = format!("/indexes/{index}/settings"); let url = format!("{}{}", address.trim_end_matches('/'), path); let response = client @@ -290,7 +305,7 @@ impl DriftReconciler { .json(settings) .send() .await - .map_err(|e| MiroirError::InvalidState(format!("request failed: {}", e)))?; + .map_err(|e| MiroirError::InvalidState(format!("request failed: {e}")))?; if response.status().is_success() { Ok(()) @@ -298,8 +313,7 @@ impl DriftReconciler { let status = response.status(); let text = response.text().await.unwrap_or_default(); Err(MiroirError::InvalidState(format!( - "repair failed: HTTP {} — {}", - status, text + "repair failed: HTTP {status} — {text}" ))) } } @@ -313,7 +327,7 @@ impl DriftReconciler { .header("Authorization", format!("Bearer {}", self.node_master_key)) .send() .await - .map_err(|e| MiroirError::InvalidState(format!("request failed: {}", e)))?; + .map_err(|e| MiroirError::InvalidState(format!("request failed: {e}")))?; if !response.status().is_success() { return Err(MiroirError::InvalidState(format!( @@ -325,7 +339,7 @@ impl DriftReconciler { let json: Value = response .json() .await - .map_err(|e| MiroirError::InvalidState(format!("parse response: {}", e)))?; + .map_err(|e| MiroirError::InvalidState(format!("parse response: {e}")))?; let indexes = json .get("results") @@ -350,7 +364,7 @@ impl DriftReconciler { .header("Authorization", format!("Bearer {}", self.node_master_key)) .send() .await - .map_err(|e| MiroirError::InvalidState(format!("request failed: {}", e)))?; + .map_err(|e| MiroirError::InvalidState(format!("request failed: {e}")))?; if !response.status().is_success() { return Err(MiroirError::InvalidState(format!( @@ -362,7 +376,7 @@ impl DriftReconciler { response .json() .await - .map_err(|e| MiroirError::InvalidState(format!("parse response: {}", e))) + .map_err(|e| MiroirError::InvalidState(format!("parse response: {e}"))) } } diff --git a/crates/miroir-core/src/rebalancer_worker/mod.rs b/crates/miroir-core/src/rebalancer_worker/mod.rs index a0a0637..106a024 100644 --- a/crates/miroir-core/src/rebalancer_worker/mod.rs +++ b/crates/miroir-core/src/rebalancer_worker/mod.rs @@ -18,7 +18,7 @@ mod acceptance_tests; mod settings_broadcast_acceptance_tests; pub use anti_entropy_worker::{AntiEntropyWorker, AntiEntropyWorkerConfig}; -pub use drift_reconciler::{DriftReconciler, DriftReconcilerConfig}; +pub use drift_reconciler::{DriftRepairCallback, DriftReconciler, DriftReconcilerConfig}; use crate::migration::{MigrationCoordinator, MigrationId, MigrationNodeId, ShardId}; use crate::rebalancer::{MigrationExecutor, Rebalancer, RebalancerMetrics}; @@ -56,7 +56,7 @@ pub struct RebalanceJobId(pub String); impl RebalanceJobId { /// Create a new rebalance job ID for an index. pub fn new(index_uid: &str) -> Self { - Self(format!("rebalance:{}", index_uid)) + Self(format!("rebalance:{index_uid}")) } /// Get the index UID from the job ID. @@ -315,7 +315,7 @@ impl RebalancerWorker { // Build scopes for each index: rebalance: let scopes: Vec = index_uids .into_iter() - .map(|uid| format!("rebalance:{}", uid)) + .map(|uid| format!("rebalance:{uid}")) .collect(); let mut acquired_any = false; @@ -424,11 +424,11 @@ impl RebalancerWorker { } Ok(Err(e)) => { error!(scope = %scope, error = %e, "failed to renew lease"); - return Err(format!("lease renewal failed: {}", e)); + return Err(format!("lease renewal failed: {e}")); } Err(e) => { error!(scope = %scope, error = %e, "spawn_blocking task failed"); - return Err(format!("lease renewal task failed: {}", e)); + return Err(format!("lease renewal task failed: {e}")); } } } @@ -472,13 +472,13 @@ impl RebalancerWorker { // Derive the scope from the event to check leadership let scope = match &event { - TopologyChangeEvent::NodeAdded { index_uid, .. } => format!("rebalance:{}", index_uid), + TopologyChangeEvent::NodeAdded { index_uid, .. } => format!("rebalance:{index_uid}"), TopologyChangeEvent::NodeDraining { index_uid, .. } => { - format!("rebalance:{}", index_uid) + format!("rebalance:{index_uid}") } - TopologyChangeEvent::NodeFailed { index_uid, .. } => format!("rebalance:{}", index_uid), + TopologyChangeEvent::NodeFailed { index_uid, .. } => format!("rebalance:{index_uid}"), TopologyChangeEvent::NodeRecovered { index_uid, .. } => { - format!("rebalance:{}", index_uid) + format!("rebalance:{index_uid}") } }; @@ -498,8 +498,8 @@ impl RebalancerWorker { } }) .await - .map_err(|e| format!("failed to check leader lease: {}", e))? - .map_err(|e| format!("failed to check leader lease: {}", e))?; + .map_err(|e| format!("failed to check leader lease: {e}"))? + .map_err(|e| format!("failed to check leader lease: {e}"))?; if !is_leader { debug!( @@ -574,8 +574,8 @@ impl RebalancerWorker { move || task_store.list_jobs_by_state("running") }) .await - .map_err(|e| format!("failed to list jobs: {}", e))? - .map_err(|e| format!("failed to list jobs: {}", e))?; + .map_err(|e| format!("failed to list jobs: {e}"))? + .map_err(|e| format!("failed to list jobs: {e}"))?; for existing_job in existing_jobs { if existing_job.id == job_id.0 { @@ -632,7 +632,7 @@ impl RebalancerWorker { let new_node = topo_to_migration_node_id(&TopologyNodeId::new(node_id.to_string())); coordinator .begin_migration(new_node, replica_group, old_owners) - .map_err(|e| format!("failed to create migration: {}", e))? + .map_err(|e| format!("failed to create migration: {e}"))? }; // Start dual-write immediately so the router starts writing to both nodes @@ -640,7 +640,7 @@ impl RebalancerWorker { let mut coordinator = self.migration_coordinator.write().await; coordinator .begin_dual_write(migration_id) - .map_err(|e| format!("failed to start dual-write: {}", e))?; + .map_err(|e| format!("failed to start dual-write: {e}"))?; } let job = RebalanceJob { @@ -729,7 +729,7 @@ impl RebalancerWorker { let new_node = topo_to_migration_node_id(first_dest); coordinator .begin_migration(new_node, replica_group, old_owners) - .map_err(|e| format!("failed to create migration: {}", e))? + .map_err(|e| format!("failed to create migration: {e}"))? } else { return Err("no shards to migrate".to_string()); } @@ -740,7 +740,7 @@ impl RebalancerWorker { let mut coordinator = self.migration_coordinator.write().await; coordinator .begin_dual_write(migration_id) - .map_err(|e| format!("failed to start dual-write: {}", e))?; + .map_err(|e| format!("failed to start dual-write: {e}"))?; } let job = RebalanceJob { @@ -841,9 +841,9 @@ impl RebalancerWorker { let group = topo .groups() .find(|g| g.id == replica_group) - .ok_or_else(|| format!("replica group {} not found", replica_group))?; + .ok_or_else(|| format!("replica group {replica_group} not found"))?; - let existing_nodes: Vec<_> = group.nodes().iter().cloned().collect(); + let existing_nodes: Vec<_> = group.nodes().to_vec(); let mut affected_shards = Vec::new(); // For each shard, check if adding the new node would change the assignment @@ -885,7 +885,7 @@ impl RebalancerWorker { let group = topo .groups() .find(|g| g.id == replica_group) - .ok_or_else(|| format!("replica group {} not found", replica_group))?; + .ok_or_else(|| format!("replica group {replica_group} not found"))?; let other_nodes: Vec<_> = group .nodes() @@ -1264,7 +1264,7 @@ impl RebalancerWorker { /// Persist a job to the task store. async fn persist_job(&self, job: &RebalanceJob) -> Result<(), String> { let progress = - serde_json::to_string(job).map_err(|e| format!("failed to serialize job: {}", e))?; + serde_json::to_string(job).map_err(|e| format!("failed to serialize job: {e}"))?; let new_job = NewJob { id: job.id.0.clone(), @@ -1298,8 +1298,8 @@ impl RebalancerWorker { move || task_store.insert_job(&new_job) }) .await - .map_err(|e| format!("failed to persist job: {}", e))? - .map_err(|e| format!("failed to persist job: {}", e))?; + .map_err(|e| format!("failed to persist job: {e}"))? + .map_err(|e| format!("failed to persist job: {e}"))?; Ok(()) } @@ -1317,7 +1317,7 @@ impl RebalancerWorker { }; let progress_json = serde_json::to_string(&progress) - .map_err(|e| format!("failed to serialize progress: {}", e))?; + .map_err(|e| format!("failed to serialize progress: {e}"))?; // Update job progress in task store tokio::task::spawn_blocking({ @@ -1328,8 +1328,8 @@ impl RebalancerWorker { move || task_store.update_job_progress(&job_id, &completed_at, &progress_json) }) .await - .map_err(|e| format!("failed to update job progress: {}", e))? - .map_err(|e| format!("failed to update job progress: {}", e))?; + .map_err(|e| format!("failed to update job progress: {e}"))? + .map_err(|e| format!("failed to update job progress: {e}"))?; } Ok(()) @@ -1348,7 +1348,7 @@ impl RebalancerWorker { let shard = ShardId(shard_id); // Look for a migration in the coordinator that affects this shard - for (_mid, migration_state) in coordinator.get_all_migrations() { + for migration_state in coordinator.get_all_migrations().values() { if let Some(migration_shard_state) = migration_state.affected_shards.get(&shard) { // Sync the phase based on the migration coordinator state use crate::migration::ShardMigrationState as CoordinatorState; @@ -1386,7 +1386,7 @@ impl RebalancerWorker { shard_id: u32, ) -> Result<(), String> { let shard = ShardId(shard_id); - let mut coordinator = self.migration_coordinator.write().await; + let coordinator = self.migration_coordinator.write().await; // Find or create the migration for this shard // For now, we'll create a new migration if one doesn't exist @@ -1465,7 +1465,7 @@ impl RebalancerWorker { let coordinator = self.migration_coordinator.read().await; // Check if the migration coordinator has marked this shard as complete - for (_mid, migration_state) in coordinator.get_all_migrations() { + for migration_state in coordinator.get_all_migrations().values() { if let Some(shard_state) = migration_state.affected_shards.get(&shard) { use crate::migration::ShardMigrationState as CoordinatorState; if matches!(shard_state, CoordinatorState::MigrationComplete { .. }) { @@ -1517,7 +1517,7 @@ impl RebalancerWorker { offset, ) .await - .map_err(|e| format!("fetch failed: {}", e))?; + .map_err(|e| format!("fetch failed: {e}"))?; if docs.is_empty() { break; // No more documents @@ -1527,7 +1527,7 @@ impl RebalancerWorker { executor .write_documents(new_node_id, new_address, index_uid, docs.clone()) .await - .map_err(|e| format!("write failed: {}", e))?; + .map_err(|e| format!("write failed: {e}"))?; total_docs_copied += docs.len() as u64; offset += limit; @@ -1544,7 +1544,7 @@ impl RebalancerWorker { let mut coordinator = self.migration_coordinator.write().await; coordinator .shard_migration_complete(migration_id, ShardId(shard_id), total_docs_copied) - .map_err(|e| format!("failed to mark shard complete: {}", e))?; + .map_err(|e| format!("failed to mark shard complete: {e}"))?; } // Update metrics @@ -1580,7 +1580,7 @@ impl RebalancerWorker { info!(index_uid = %index_uid, "paused rebalance"); Ok(()) } else { - Err(format!("no rebalance job found for index {}", index_uid)) + Err(format!("no rebalance job found for index {index_uid}")) } } @@ -1594,7 +1594,7 @@ impl RebalancerWorker { info!(index_uid = %index_uid, "resumed rebalance"); Ok(()) } else { - Err(format!("no rebalance job found for index {}", index_uid)) + Err(format!("no rebalance job found for index {index_uid}")) } } @@ -1605,8 +1605,8 @@ impl RebalancerWorker { move || task_store.list_jobs_by_state("running") }) .await - .map_err(|e| format!("failed to list jobs: {}", e))? - .map_err(|e| format!("failed to list jobs: {}", e))?; + .map_err(|e| format!("failed to list jobs: {e}"))? + .map_err(|e| format!("failed to list jobs: {e}"))?; for job_row in jobs { if job_row.type_ == "rebalance" { diff --git a/crates/miroir-proxy/src/routes/admin_endpoints.rs b/crates/miroir-proxy/src/routes/admin_endpoints.rs index eac83cd..5d7a3a7 100644 --- a/crates/miroir-proxy/src/routes/admin_endpoints.rs +++ b/crates/miroir-proxy/src/routes/admin_endpoints.rs @@ -567,6 +567,11 @@ impl AppState { lease_ttl_secs: 10, lease_renewal_interval_ms: 2000, }; + let metrics_clone = metrics.clone(); + let callback: miroir_core::rebalancer_worker::DriftRepairCallback = + Arc::new(move |index: &str| { + metrics_clone.inc_settings_drift_repair(index); + }); Some(Arc::new( miroir_core::rebalancer_worker::DriftReconciler::new( drift_config, @@ -575,7 +580,8 @@ impl AppState { node_addresses, config.node_master_key.clone(), pod_id.clone(), - ), + ) + .with_metrics_callback(callback), )) } else { None