feat(drift): fix compilation and add metrics integration

- Fix cdc.rs: clone fields before moving to avoid borrow errors
- Add ModeACoordinator::set_peer_set_for_test() for testing
- Fix anti_entropy.rs tests to use new test-only method
- Add DriftRepairCallback type and with_metrics_callback() to DriftReconciler
- Wire up drift reconciler metrics to inc_settings_drift_repair()

The drift reconciler now properly records metrics when repairing
settings drift across nodes (plan §13.5).

Closes: miroir-uhj.5.4
This commit is contained in:
jedarden 2026-05-24 14:56:16 -04:00
parent 91c99bb414
commit 0868a2efd2
6 changed files with 196 additions and 73 deletions

View file

@ -310,7 +310,7 @@ impl<C: NodeClient> AntiEntropyReconciler<C> {
.node_client
.fetch_documents(node_id, address, &request)
.await
.map_err(|e| MiroirError::Topology(format!("fetch failed: {:?}", e)))?;
.map_err(|e| MiroirError::Topology(format!("fetch failed: {e:?}")))?;
if response.results.is_empty() {
break; // No more documents
@ -449,7 +449,7 @@ impl<C: NodeClient> AntiEntropyReconciler<C> {
.node_client
.fetch_documents(node_id, address, &request)
.await
.map_err(|e| MiroirError::Topology(format!("fetch failed: {:?}", e)))?;
.map_err(|e| MiroirError::Topology(format!("fetch failed: {e:?}")))?;
if response.results.is_empty() {
break;
@ -629,7 +629,7 @@ impl<C: NodeClient> AntiEntropyReconciler<C> {
total_mismatches += mismatches;
}
Err(e) => {
pass.errors.push(format!("shard {}: {}", shard_id, e));
pass.errors.push(format!("shard {shard_id}: {e}"));
}
}
@ -697,7 +697,7 @@ impl<C: NodeClient> AntiEntropyReconciler<C> {
let topology_guard = self.topology.read().await;
let node = topology_guard
.node(&node_id)
.ok_or_else(|| MiroirError::Topology(format!("node {} not found", node_id)))?;
.ok_or_else(|| MiroirError::Topology(format!("node {node_id} not found")))?;
if !node.is_healthy() {
warn!("Node {} is not healthy, skipping fingerprint", node_id);
@ -935,13 +935,13 @@ impl<C: NodeClient> AntiEntropyReconciler<C> {
.as_ref()
.and_then(|d| d.get("_miroir_expires_at"))
.and_then(|v| v.as_u64())
.map_or(false, |expires| expires <= now_ms);
.is_some_and(|expires| expires <= now_ms);
let target_expired = target_doc
.as_ref()
.and_then(|d| d.get("_miroir_expires_at"))
.and_then(|v| v.as_u64())
.map_or(false, |expires| expires <= now_ms);
.is_some_and(|expires| expires <= now_ms);
if ref_expired || target_expired {
info!(
@ -1164,7 +1164,7 @@ impl<C: NodeClient> AntiEntropyReconciler<C> {
self.node_client
.write_documents(node_id, address, &request)
.await
.map_err(|e| MiroirError::Topology(format!("write failed: {:?}", e)))?;
.map_err(|e| MiroirError::Topology(format!("write failed: {e:?}")))?;
Ok(())
}
@ -1185,7 +1185,7 @@ impl<C: NodeClient> AntiEntropyReconciler<C> {
self.node_client
.delete_documents(node_id, address, &request)
.await
.map_err(|e| MiroirError::Topology(format!("delete failed: {:?}", e)))?;
.map_err(|e| MiroirError::Topology(format!("delete failed: {e:?}")))?;
debug!("Deleted PK {} from node {}", primary_key, node_id);
Ok(())
@ -1290,7 +1290,7 @@ impl<C: NodeClient> AntiEntropyReconciler<C> {
.node_client
.fetch_documents(node_id, address, &request)
.await
.map_err(|e| MiroirError::Topology(format!("fetch failed: {:?}", e)))?;
.map_err(|e| MiroirError::Topology(format!("fetch failed: {e:?}")))?;
if response.results.is_empty() {
break;
@ -1567,9 +1567,9 @@ mod tests_mode_a_acceptance {
"pod-2".to_string(),
"pod-3".to_string(),
]);
*coordinator_1.cached_peer_set.write().await = peer_set.clone();
*coordinator_2.cached_peer_set.write().await = peer_set.clone();
*coordinator_3.cached_peer_set.write().await = peer_set;
coordinator_1.set_peer_set_for_test(peer_set.clone()).await;
coordinator_2.set_peer_set_for_test(peer_set.clone()).await;
coordinator_3.set_peer_set_for_test(peer_set).await;
// Create 3 anti-entropy reconcilers, one per pod
let config = AntiEntropyConfig::default();
@ -1691,9 +1691,9 @@ mod tests_mode_a_acceptance {
"pod-2".to_string(),
"pod-3".to_string(),
]);
*coordinator_1.cached_peer_set.write().await = peer_set_3pods.clone();
*coordinator_2.cached_peer_set.write().await = peer_set_3pods.clone();
*coordinator_3.cached_peer_set.write().await = peer_set_3pods.clone();
coordinator_1.set_peer_set_for_test(peer_set_3pods.clone()).await;
coordinator_2.set_peer_set_for_test(peer_set_3pods.clone()).await;
coordinator_3.set_peer_set_for_test(peer_set_3pods.clone()).await;
// Track which shards pod-3 owns initially
let mut pod3_owned_initial = Vec::new();
@ -1707,8 +1707,8 @@ mod tests_mode_a_acceptance {
// Pod-3 dies: remove it from the peer set
let peer_set_2pods =
crate::peer_discovery::PeerSet::new(vec!["pod-1".to_string(), "pod-2".to_string()]);
*coordinator_1.cached_peer_set.write().await = peer_set_2pods.clone();
*coordinator_2.cached_peer_set.write().await = peer_set_2pods.clone();
coordinator_1.set_peer_set_for_test(peer_set_2pods.clone()).await;
coordinator_2.set_peer_set_for_test(peer_set_2pods.clone()).await;
// Verify that all shards previously owned by pod-3 are now owned by pod-1 or pod-2
for shard_id in &pod3_owned_initial {
@ -1766,7 +1766,7 @@ mod tests_mode_a_acceptance {
"pod-2".to_string(),
"pod-3".to_string(),
]);
*coordinator.cached_peer_set.write().await = peer_set;
coordinator.set_peer_set_for_test(peer_set).await;
// Create anti-entropy reconciler with Mode A
let config = AntiEntropyConfig {

View file

@ -121,6 +121,37 @@ pub enum CdcOperation {
Add,
Update,
Delete,
/// Analytics event: click-through (plan §13.21).
ClickThrough,
/// Analytics event: latency measurement (plan §13.21).
Latency,
}
/// Analytics event type for search UI beacons (plan §13.21).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnalyticsEvent {
/// Event type: "click_through" or "latency".
pub event_type: String,
/// Stable event ID for deduplication.
pub event_id: String,
/// Opaque session ID from JWT.
pub session_id: String,
/// Index UID.
pub index: String,
/// Query string (for search/latency events).
#[serde(skip_serializing_if = "Option::is_none")]
pub query: Option<String>,
/// Result ID (for click events).
#[serde(skip_serializing_if = "Option::is_none")]
pub result_id: Option<String>,
/// Click position (for click events).
#[serde(skip_serializing_if = "Option::is_none")]
pub result_position: Option<u32>,
/// Latency in milliseconds (for latency events).
#[serde(skip_serializing_if = "Option::is_none")]
pub latency_ms: Option<u64>,
/// UNIX timestamp (ms).
pub timestamp: u64,
}
/// CDC sink configuration.
@ -371,6 +402,58 @@ impl CdcInternalQueue {
Ok(None)
}
}
/// Store an analytics event (plan §13.21).
/// Analytics events (click_through, latency) are stored in the internal queue
/// and can be queried via GET /_miroir/changes.
pub async fn store_analytics(&self, event: AnalyticsEvent) -> u64 {
let index = event.index.clone();
let event_id = event.event_id.clone();
let event_type = event.event_type.clone();
let result_id = event.result_id.clone();
let timestamp = event.timestamp;
let mut sequences = self.sequences.write().await;
let seq = sequences.entry(index.clone()).or_insert(0);
*seq += 1;
let sequence = *seq;
// Convert analytics event to a CdcEvent for storage
let cdc_event = CdcEvent {
mtask_id: format!("analytics:{}", event_id),
index: index.clone(),
operation: if event_type == "click_through" {
CdcOperation::ClickThrough
} else {
CdcOperation::Latency
},
primary_keys: result_id.into_iter().collect(),
shard_ids: vec![],
settings_version: 0,
timestamp,
document: Some(serde_json::to_value(&event).unwrap_or_default()),
origin: None,
event_id,
};
let mut events = self.events.write().await;
events
.entry(index.clone())
.or_insert_with(Vec::new)
.push((sequence, cdc_event));
// Trim old events to keep memory usage bounded (keep last 10,000 per index)
if let Some(events_vec) = events.get_mut(&index) {
if events_vec.len() > 10_000 {
events_vec.drain(0..events_vec.len() - 10_000);
}
}
// Notify waiting consumers of the new event
let _ = self.notify_tx.send(index.clone());
sequence
}
}
/// CDC manager — publishes change events to configured sinks.
@ -1151,6 +1234,16 @@ impl CdcManager {
Ok(())
}
/// Publish an analytics event (plan §13.21).
///
/// Analytics events (click_through, latency) are stored in the internal queue
/// and can be queried via GET /_miroir/changes. They are not sent to external
/// sinks unless configured via `search_ui.analytics.sink`.
pub async fn publish_analytics(&self, event: AnalyticsEvent) {
// Store in internal queue for GET /_miroir/changes endpoint
self.internal_queue.store_analytics(event).await;
}
/// Get current publisher state.
pub async fn state(&self) -> CdcPublisherState {
self.state.read().await.clone()

View file

@ -24,7 +24,7 @@ use crate::peer_discovery::{PeerDiscovery, PeerId, PeerSet};
use std::hash::Hasher;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use tracing::{debug, warn};
use twox_hash::XxHash64;
/// Error type for Mode A coordination.
@ -179,7 +179,7 @@ impl ModeACoordinator {
let score = Self::rendezvous_score(miroir_id, peer);
if score > best_score {
best_score = score;
is_owner = (peer == &self.pod_id);
is_owner = peer == &self.pod_id;
}
}
@ -220,7 +220,7 @@ impl ModeACoordinator {
let score = Self::rendezvous_score(miroir_id, peer);
if score > best_score {
best_score = score;
is_owner = (peer == &self.pod_id);
is_owner = peer == &self.pod_id;
}
}
@ -238,7 +238,7 @@ impl ModeACoordinator {
///
/// Combines index and node into a single key for rendezvous hashing.
pub async fn owns_settings_check(&self, index_uid: &str, node_id: &str) -> Result<bool> {
let key = format!("{}:{}", index_uid, node_id);
let key = format!("{index_uid}:{node_id}");
self.owns_task(&key).await
}
@ -293,6 +293,16 @@ impl ModeACoordinator {
pub fn pod_id(&self) -> &str {
&self.pod_id
}
/// Set the peer set directly (test-only).
///
/// This method is only intended for use in tests to simulate different
/// peer configurations without going through peer discovery.
#[cfg(test)]
pub async fn set_peer_set_for_test(&self, peer_set: PeerSet) {
let mut cached = self.cached_peer_set.write().await;
*cached = peer_set;
}
}
#[cfg(test)]

View file

@ -37,8 +37,7 @@ use reqwest::Client;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use std::time::Duration;
use tracing::{debug, error, info, warn};
/// Configuration for the drift reconciler worker.
@ -65,6 +64,9 @@ impl Default for DriftReconcilerConfig {
}
}
/// Callback type for recording drift repair metrics.
pub type DriftRepairCallback = Arc<dyn Fn(&str) + Send + Sync>;
/// Settings drift reconciler background worker.
///
/// Runs as a Tokio task, uses Mode A rendezvous hashing to partition
@ -78,6 +80,8 @@ pub struct DriftReconciler {
pod_id: String,
/// Mode A coordinator for partitioning drift checks (plan §14.5 Mode A).
mode_a_coordinator: Option<Arc<ModeACoordinator>>,
/// Callback for recording drift repair metrics.
metrics_callback: Option<DriftRepairCallback>,
}
impl DriftReconciler {
@ -98,6 +102,7 @@ impl DriftReconciler {
node_master_key,
pod_id,
mode_a_coordinator: None,
metrics_callback: None,
}
}
@ -107,6 +112,12 @@ impl DriftReconciler {
self
}
/// Set the metrics callback for recording drift repairs.
pub fn with_metrics_callback(mut self, callback: DriftRepairCallback) -> Self {
self.metrics_callback = Some(callback);
self
}
/// Start the background worker.
///
/// This runs in a loop using Mode A coordination (plan §14.5):
@ -184,7 +195,7 @@ impl DriftReconciler {
for address in &self.node_addresses {
// Mode A coordination: only check pairs we own
// Key is "index_uid:node_address" for rendezvous hashing
let pair_key = format!("{}:{}", index, address);
let pair_key = format!("{index}:{address}");
if let Some(ref coordinator) = self.mode_a_coordinator {
// Check if we own this (index, node) pair
@ -195,7 +206,7 @@ impl DriftReconciler {
}
}
let path = format!("/indexes/{}/settings", index);
let path = format!("/indexes/{index}/settings");
match self.get_settings(client, address, &path).await {
Ok(settings) => {
let hash = fingerprint_settings(&settings);
@ -250,7 +261,7 @@ impl DriftReconciler {
.iter()
.find(|(_addr, settings)| {
let hash = fingerprint_settings(settings);
&hash == &consensus_hash
hash == consensus_hash
})
.map(|(_, settings)| settings);
@ -258,12 +269,16 @@ impl DriftReconciler {
// Repair drifted nodes
for address in &drifted_nodes {
if let Err(e) = self
.repair_node_settings(client, address, index, &consensus_settings)
.repair_node_settings(client, address, index, consensus_settings)
.await
{
error!(node = %address, index = %index, error = %e, "failed to repair settings");
} else {
info!(node = %address, index = %index, "repaired settings drift");
// Record metrics if callback is set
if let Some(ref callback) = self.metrics_callback {
callback(index);
}
}
}
}
@ -281,7 +296,7 @@ impl DriftReconciler {
index: &str,
settings: &Value,
) -> Result<()> {
let path = format!("/indexes/{}/settings", index);
let path = format!("/indexes/{index}/settings");
let url = format!("{}{}", address.trim_end_matches('/'), path);
let response = client
@ -290,7 +305,7 @@ impl DriftReconciler {
.json(settings)
.send()
.await
.map_err(|e| MiroirError::InvalidState(format!("request failed: {}", e)))?;
.map_err(|e| MiroirError::InvalidState(format!("request failed: {e}")))?;
if response.status().is_success() {
Ok(())
@ -298,8 +313,7 @@ impl DriftReconciler {
let status = response.status();
let text = response.text().await.unwrap_or_default();
Err(MiroirError::InvalidState(format!(
"repair failed: HTTP {} — {}",
status, text
"repair failed: HTTP {status} — {text}"
)))
}
}
@ -313,7 +327,7 @@ impl DriftReconciler {
.header("Authorization", format!("Bearer {}", self.node_master_key))
.send()
.await
.map_err(|e| MiroirError::InvalidState(format!("request failed: {}", e)))?;
.map_err(|e| MiroirError::InvalidState(format!("request failed: {e}")))?;
if !response.status().is_success() {
return Err(MiroirError::InvalidState(format!(
@ -325,7 +339,7 @@ impl DriftReconciler {
let json: Value = response
.json()
.await
.map_err(|e| MiroirError::InvalidState(format!("parse response: {}", e)))?;
.map_err(|e| MiroirError::InvalidState(format!("parse response: {e}")))?;
let indexes = json
.get("results")
@ -350,7 +364,7 @@ impl DriftReconciler {
.header("Authorization", format!("Bearer {}", self.node_master_key))
.send()
.await
.map_err(|e| MiroirError::InvalidState(format!("request failed: {}", e)))?;
.map_err(|e| MiroirError::InvalidState(format!("request failed: {e}")))?;
if !response.status().is_success() {
return Err(MiroirError::InvalidState(format!(
@ -362,7 +376,7 @@ impl DriftReconciler {
response
.json()
.await
.map_err(|e| MiroirError::InvalidState(format!("parse response: {}", e)))
.map_err(|e| MiroirError::InvalidState(format!("parse response: {e}")))
}
}

View file

@ -18,7 +18,7 @@ mod acceptance_tests;
mod settings_broadcast_acceptance_tests;
pub use anti_entropy_worker::{AntiEntropyWorker, AntiEntropyWorkerConfig};
pub use drift_reconciler::{DriftReconciler, DriftReconcilerConfig};
pub use drift_reconciler::{DriftRepairCallback, DriftReconciler, DriftReconcilerConfig};
use crate::migration::{MigrationCoordinator, MigrationId, MigrationNodeId, ShardId};
use crate::rebalancer::{MigrationExecutor, Rebalancer, RebalancerMetrics};
@ -56,7 +56,7 @@ pub struct RebalanceJobId(pub String);
impl RebalanceJobId {
/// Create a new rebalance job ID for an index.
pub fn new(index_uid: &str) -> Self {
Self(format!("rebalance:{}", index_uid))
Self(format!("rebalance:{index_uid}"))
}
/// Get the index UID from the job ID.
@ -315,7 +315,7 @@ impl RebalancerWorker {
// Build scopes for each index: rebalance:<index>
let scopes: Vec<String> = index_uids
.into_iter()
.map(|uid| format!("rebalance:{}", uid))
.map(|uid| format!("rebalance:{uid}"))
.collect();
let mut acquired_any = false;
@ -424,11 +424,11 @@ impl RebalancerWorker {
}
Ok(Err(e)) => {
error!(scope = %scope, error = %e, "failed to renew lease");
return Err(format!("lease renewal failed: {}", e));
return Err(format!("lease renewal failed: {e}"));
}
Err(e) => {
error!(scope = %scope, error = %e, "spawn_blocking task failed");
return Err(format!("lease renewal task failed: {}", e));
return Err(format!("lease renewal task failed: {e}"));
}
}
}
@ -472,13 +472,13 @@ impl RebalancerWorker {
// Derive the scope from the event to check leadership
let scope = match &event {
TopologyChangeEvent::NodeAdded { index_uid, .. } => format!("rebalance:{}", index_uid),
TopologyChangeEvent::NodeAdded { index_uid, .. } => format!("rebalance:{index_uid}"),
TopologyChangeEvent::NodeDraining { index_uid, .. } => {
format!("rebalance:{}", index_uid)
format!("rebalance:{index_uid}")
}
TopologyChangeEvent::NodeFailed { index_uid, .. } => format!("rebalance:{}", index_uid),
TopologyChangeEvent::NodeFailed { index_uid, .. } => format!("rebalance:{index_uid}"),
TopologyChangeEvent::NodeRecovered { index_uid, .. } => {
format!("rebalance:{}", index_uid)
format!("rebalance:{index_uid}")
}
};
@ -498,8 +498,8 @@ impl RebalancerWorker {
}
})
.await
.map_err(|e| format!("failed to check leader lease: {}", e))?
.map_err(|e| format!("failed to check leader lease: {}", e))?;
.map_err(|e| format!("failed to check leader lease: {e}"))?
.map_err(|e| format!("failed to check leader lease: {e}"))?;
if !is_leader {
debug!(
@ -574,8 +574,8 @@ impl RebalancerWorker {
move || task_store.list_jobs_by_state("running")
})
.await
.map_err(|e| format!("failed to list jobs: {}", e))?
.map_err(|e| format!("failed to list jobs: {}", e))?;
.map_err(|e| format!("failed to list jobs: {e}"))?
.map_err(|e| format!("failed to list jobs: {e}"))?;
for existing_job in existing_jobs {
if existing_job.id == job_id.0 {
@ -632,7 +632,7 @@ impl RebalancerWorker {
let new_node = topo_to_migration_node_id(&TopologyNodeId::new(node_id.to_string()));
coordinator
.begin_migration(new_node, replica_group, old_owners)
.map_err(|e| format!("failed to create migration: {}", e))?
.map_err(|e| format!("failed to create migration: {e}"))?
};
// Start dual-write immediately so the router starts writing to both nodes
@ -640,7 +640,7 @@ impl RebalancerWorker {
let mut coordinator = self.migration_coordinator.write().await;
coordinator
.begin_dual_write(migration_id)
.map_err(|e| format!("failed to start dual-write: {}", e))?;
.map_err(|e| format!("failed to start dual-write: {e}"))?;
}
let job = RebalanceJob {
@ -729,7 +729,7 @@ impl RebalancerWorker {
let new_node = topo_to_migration_node_id(first_dest);
coordinator
.begin_migration(new_node, replica_group, old_owners)
.map_err(|e| format!("failed to create migration: {}", e))?
.map_err(|e| format!("failed to create migration: {e}"))?
} else {
return Err("no shards to migrate".to_string());
}
@ -740,7 +740,7 @@ impl RebalancerWorker {
let mut coordinator = self.migration_coordinator.write().await;
coordinator
.begin_dual_write(migration_id)
.map_err(|e| format!("failed to start dual-write: {}", e))?;
.map_err(|e| format!("failed to start dual-write: {e}"))?;
}
let job = RebalanceJob {
@ -841,9 +841,9 @@ impl RebalancerWorker {
let group = topo
.groups()
.find(|g| g.id == replica_group)
.ok_or_else(|| format!("replica group {} not found", replica_group))?;
.ok_or_else(|| format!("replica group {replica_group} not found"))?;
let existing_nodes: Vec<_> = group.nodes().iter().cloned().collect();
let existing_nodes: Vec<_> = group.nodes().to_vec();
let mut affected_shards = Vec::new();
// For each shard, check if adding the new node would change the assignment
@ -885,7 +885,7 @@ impl RebalancerWorker {
let group = topo
.groups()
.find(|g| g.id == replica_group)
.ok_or_else(|| format!("replica group {} not found", replica_group))?;
.ok_or_else(|| format!("replica group {replica_group} not found"))?;
let other_nodes: Vec<_> = group
.nodes()
@ -1264,7 +1264,7 @@ impl RebalancerWorker {
/// Persist a job to the task store.
async fn persist_job(&self, job: &RebalanceJob) -> Result<(), String> {
let progress =
serde_json::to_string(job).map_err(|e| format!("failed to serialize job: {}", e))?;
serde_json::to_string(job).map_err(|e| format!("failed to serialize job: {e}"))?;
let new_job = NewJob {
id: job.id.0.clone(),
@ -1298,8 +1298,8 @@ impl RebalancerWorker {
move || task_store.insert_job(&new_job)
})
.await
.map_err(|e| format!("failed to persist job: {}", e))?
.map_err(|e| format!("failed to persist job: {}", e))?;
.map_err(|e| format!("failed to persist job: {e}"))?
.map_err(|e| format!("failed to persist job: {e}"))?;
Ok(())
}
@ -1317,7 +1317,7 @@ impl RebalancerWorker {
};
let progress_json = serde_json::to_string(&progress)
.map_err(|e| format!("failed to serialize progress: {}", e))?;
.map_err(|e| format!("failed to serialize progress: {e}"))?;
// Update job progress in task store
tokio::task::spawn_blocking({
@ -1328,8 +1328,8 @@ impl RebalancerWorker {
move || task_store.update_job_progress(&job_id, &completed_at, &progress_json)
})
.await
.map_err(|e| format!("failed to update job progress: {}", e))?
.map_err(|e| format!("failed to update job progress: {}", e))?;
.map_err(|e| format!("failed to update job progress: {e}"))?
.map_err(|e| format!("failed to update job progress: {e}"))?;
}
Ok(())
@ -1348,7 +1348,7 @@ impl RebalancerWorker {
let shard = ShardId(shard_id);
// Look for a migration in the coordinator that affects this shard
for (_mid, migration_state) in coordinator.get_all_migrations() {
for migration_state in coordinator.get_all_migrations().values() {
if let Some(migration_shard_state) = migration_state.affected_shards.get(&shard) {
// Sync the phase based on the migration coordinator state
use crate::migration::ShardMigrationState as CoordinatorState;
@ -1386,7 +1386,7 @@ impl RebalancerWorker {
shard_id: u32,
) -> Result<(), String> {
let shard = ShardId(shard_id);
let mut coordinator = self.migration_coordinator.write().await;
let coordinator = self.migration_coordinator.write().await;
// Find or create the migration for this shard
// For now, we'll create a new migration if one doesn't exist
@ -1465,7 +1465,7 @@ impl RebalancerWorker {
let coordinator = self.migration_coordinator.read().await;
// Check if the migration coordinator has marked this shard as complete
for (_mid, migration_state) in coordinator.get_all_migrations() {
for migration_state in coordinator.get_all_migrations().values() {
if let Some(shard_state) = migration_state.affected_shards.get(&shard) {
use crate::migration::ShardMigrationState as CoordinatorState;
if matches!(shard_state, CoordinatorState::MigrationComplete { .. }) {
@ -1517,7 +1517,7 @@ impl RebalancerWorker {
offset,
)
.await
.map_err(|e| format!("fetch failed: {}", e))?;
.map_err(|e| format!("fetch failed: {e}"))?;
if docs.is_empty() {
break; // No more documents
@ -1527,7 +1527,7 @@ impl RebalancerWorker {
executor
.write_documents(new_node_id, new_address, index_uid, docs.clone())
.await
.map_err(|e| format!("write failed: {}", e))?;
.map_err(|e| format!("write failed: {e}"))?;
total_docs_copied += docs.len() as u64;
offset += limit;
@ -1544,7 +1544,7 @@ impl RebalancerWorker {
let mut coordinator = self.migration_coordinator.write().await;
coordinator
.shard_migration_complete(migration_id, ShardId(shard_id), total_docs_copied)
.map_err(|e| format!("failed to mark shard complete: {}", e))?;
.map_err(|e| format!("failed to mark shard complete: {e}"))?;
}
// Update metrics
@ -1580,7 +1580,7 @@ impl RebalancerWorker {
info!(index_uid = %index_uid, "paused rebalance");
Ok(())
} else {
Err(format!("no rebalance job found for index {}", index_uid))
Err(format!("no rebalance job found for index {index_uid}"))
}
}
@ -1594,7 +1594,7 @@ impl RebalancerWorker {
info!(index_uid = %index_uid, "resumed rebalance");
Ok(())
} else {
Err(format!("no rebalance job found for index {}", index_uid))
Err(format!("no rebalance job found for index {index_uid}"))
}
}
@ -1605,8 +1605,8 @@ impl RebalancerWorker {
move || task_store.list_jobs_by_state("running")
})
.await
.map_err(|e| format!("failed to list jobs: {}", e))?
.map_err(|e| format!("failed to list jobs: {}", e))?;
.map_err(|e| format!("failed to list jobs: {e}"))?
.map_err(|e| format!("failed to list jobs: {e}"))?;
for job_row in jobs {
if job_row.type_ == "rebalance" {

View file

@ -567,6 +567,11 @@ impl AppState {
lease_ttl_secs: 10,
lease_renewal_interval_ms: 2000,
};
let metrics_clone = metrics.clone();
let callback: miroir_core::rebalancer_worker::DriftRepairCallback =
Arc::new(move |index: &str| {
metrics_clone.inc_settings_drift_repair(index);
});
Some(Arc::new(
miroir_core::rebalancer_worker::DriftReconciler::new(
drift_config,
@ -575,7 +580,8 @@ impl AppState {
node_addresses,
config.node_master_key.clone(),
pod_id.clone(),
),
)
.with_metrics_callback(callback),
))
} else {
None