diff --git a/crates/miroir-core/src/rebalancer.rs b/crates/miroir-core/src/rebalancer.rs index 4b7ca55..77bea57 100644 --- a/crates/miroir-core/src/rebalancer.rs +++ b/crates/miroir-core/src/rebalancer.rs @@ -1099,9 +1099,6 @@ impl Rebalancer { // Create operation record let op_id = self.next_op_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - // TODO: Schedule background replication to restore RF if needed - // For now, just record the failure - let operation = TopologyOperation { id: op_id, op_type: TopologyOperationType::NodeFailure, @@ -1126,6 +1123,249 @@ impl Rebalancer { }) } + /// Handle a node recovery and restore RF within the group. + pub async fn handle_node_recovery( + &self, + node_id: &str, + ) -> Result { + info!(node_id = %node_id, "handling node recovery and RF restore"); + + let node_id_obj = TopologyNodeId::new(node_id.to_string()); + + // Mark node as recovered and get group info + let (replica_group, has_rf_to_restore) = { + let topo = self.topology.read().await; + let node = topo.node(&node_id_obj).ok_or_else(|| { + RebalancerError::NodeNotFound(node_id.to_string()) + })?; + + if node.status != NodeStatus::Failed && node.status != NodeStatus::Degraded { + return Err(RebalancerError::InvalidState(format!( + "node {} is not in a failed state (current: {:?})", + node_id, node.status + ))); + } + + let replica_group = node.replica_group; + + // Check if RF needs to be restored (other healthy nodes exist in group) + let group = topo.groups().find(|g| g.id == replica_group); + let has_other_healthy = group.map_or(false, |g| { + g.nodes().iter().any(|nid| { + nid != &node_id_obj && topo.node(nid).map(|n| n.is_healthy()).unwrap_or(false) + }) + }); + + (replica_group, has_other_healthy) + }; + + if !has_rf_to_restore { + // No other healthy nodes in group - just mark as active + let mut topo = self.topology.write().await; + let node = topo.node_mut(&node_id_obj).ok_or_else(|| { + RebalancerError::NodeNotFound(node_id.to_string()) + })?; + node.status = NodeStatus::Active; + + return Ok(TopologyOperationResult { + id: 0, + message: format!("Node {} recovered (no RF restore needed - no other healthy nodes in group)", node_id), + migrations_count: 0, + }); + } + + // Create operation record + let op_id = self.next_op_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + + // Mark node as active + { + let mut topo = self.topology.write().await; + let node = topo.node_mut(&node_id_obj).ok_or_else(|| { + RebalancerError::NodeNotFound(node_id.to_string()) + })?; + node.status = NodeStatus::Active; + } + + // Compute shards that need RF restore (shards where this node should be a replica) + let shards_to_restore = self.compute_shards_for_rf_restore(node_id, replica_group).await?; + + if !shards_to_restore.is_empty() { + // Create migrations for RF restore + let migrations = { + let mut coordinator = self.migration_coordinator.write().await; + let mut migs = Vec::new(); + + for shard in shards_to_restore { + // Find a healthy source node in the same group + let source_node = self.find_healthy_source_for_shard(shard, replica_group, node_id).await?; + + let mut old_owners = HashMap::new(); + old_owners.insert(shard, topo_to_migration_node_id(&source_node)); + + let mid = coordinator.begin_migration( + topo_to_migration_node_id(&node_id_obj), + replica_group, + old_owners, + )?; + + migs.push(mid); + } + + // Start dual-write for all migrations + for mid in &migs { + coordinator.begin_dual_write(*mid)?; + } + + migs + }; + + let migrations_count = migrations.len(); + + // Record operation (before moving migrations) + let operation = TopologyOperation { + id: op_id, + op_type: TopologyOperationType::NodeFailure, // Reuse NodeFailure type for recovery + status: TopologyOperationStatus::InProgress, + target_node: Some(node_id.to_string()), + target_group: Some(replica_group), + migrations: migrations.clone(), + started_at: Some(now_ms()), + completed_at: None, + error: None, + }; + + { + let mut ops = self.operations.write().await; + ops.insert(op_id, operation); + } + + // Start background RF restore task + let topo_arc = self.topology.clone(); + let coord_arc = self.migration_coordinator.clone(); + let ops_arc = self.operations.clone(); + let active_arc = self.active_migrations.clone(); + let config = self.config.clone(); + let executor = self.migration_executor.clone(); + let metrics_arc = self.metrics.clone(); + + tokio::spawn(async move { + if let Err(e) = run_migration_task( + topo_arc, + coord_arc, + ops_arc, + active_arc, + op_id, + migrations, + config, + executor, + metrics_arc, + ) + .await + { + error!(error = %e, op_id = op_id, "RF restore task failed"); + } + }); + + Ok(TopologyOperationResult { + id: op_id, + message: format!( + "Node {} recovered with RF restore ({} shards)", + node_id, + migrations_count + ), + migrations_count, + }) + } else { + // No shards need restoration + let operation = TopologyOperation { + id: op_id, + op_type: TopologyOperationType::NodeFailure, + status: TopologyOperationStatus::Complete, + target_node: Some(node_id.to_string()), + target_group: Some(replica_group), + migrations: Vec::new(), + started_at: Some(now_ms()), + completed_at: Some(now_ms()), + error: None, + }; + + { + let mut ops = self.operations.write().await; + ops.insert(op_id, operation); + } + + Ok(TopologyOperationResult { + id: op_id, + message: format!("Node {} recovered (no shards needed restoration)", node_id), + migrations_count: 0, + }) + } + } + + /// Compute which shards need RF restore for a recovered node. + /// Returns shards where the recovered node should be a replica but may have lost data. + async fn compute_shards_for_rf_restore( + &self, + recovered_node_id: &str, + replica_group: u32, + ) -> Result, RebalancerError> { + let topo = self.topology.read().await; + let recovered_node = TopologyNodeId::new(recovered_node_id.to_string()); + let rf = topo.rf(); + + let group = topo + .groups() + .find(|g| g.id == replica_group) + .ok_or_else(|| RebalancerError::GroupNotFound(replica_group))?; + + let mut shards_to_restore = Vec::new(); + + // For each shard, check if the recovered node should be a replica + for shard_id in 0..topo.shards { + let assignment = assign_shard_in_group(shard_id, group.nodes(), rf); + + if assignment.contains(&recovered_node) { + // This node should be a replica for this shard + shards_to_restore.push(ShardId(shard_id)); + } + } + + Ok(shards_to_restore) + } + + /// Find a healthy source node for RF restore of a specific shard. + async fn find_healthy_source_for_shard( + &self, + shard: ShardId, + replica_group: u32, + exclude_node_id: &str, + ) -> Result { + let topo = self.topology.read().await; + let exclude_node = TopologyNodeId::new(exclude_node_id.to_string()); + + let group = topo + .groups() + .find(|g| g.id == replica_group) + .ok_or_else(|| RebalancerError::GroupNotFound(replica_group))?; + + let assignment = assign_shard_in_group(shard.0, group.nodes(), topo.rf()); + + // Find a healthy replica (excluding the recovered node) + for node in assignment { + if node != exclude_node { + if let Some(n) = topo.node(&node) { + if n.is_healthy() { + return Ok(node); + } + } + } + } + + Err(RebalancerError::InvalidState( + format!("no healthy source found for shard {} in group {}", shard.0, replica_group) + )) + } + /// Compute which shards should move to a new node. /// Returns shard -> old_owner mapping for shards that will move. /// diff --git a/crates/miroir-core/src/router.rs b/crates/miroir-core/src/router.rs index 22bc800..48b50d3 100644 --- a/crates/miroir-core/src/router.rs +++ b/crates/miroir-core/src/router.rs @@ -106,8 +106,8 @@ pub fn query_group(query_seq: u64, replica_groups: u32) -> u32 { /// Select an ACTIVE replica group for a query (round-robin by query counter). /// -/// This function implements the group addition flow from plan §2: queries are -/// NOT routed to initializing groups, only active groups. When no groups are +/// This function implements the group addition/removal flow from plan §2: queries are +/// NOT routed to initializing or draining groups, only active groups. When no groups are /// active, returns 0 as a fallback (caller handles the empty case). /// /// # Arguments @@ -117,20 +117,20 @@ pub fn query_group(query_seq: u64, replica_groups: u32) -> u32 { /// # Returns /// The ID of the selected active replica group pub fn query_group_active(query_seq: u64, topology: &Topology) -> u32 { - // Collect all active group IDs - let active_groups: Vec = topology + // Collect all routing groups (active, not initializing or draining) + let routing_groups: Vec = topology .groups() - .filter(|g| g.is_active()) + .filter(|g| g.is_routing()) .map(|g| g.id) .collect(); - if active_groups.is_empty() { - // Fallback: no active groups, return 0 (caller handles empty case) + if routing_groups.is_empty() { + // Fallback: no routing groups, return 0 (caller handles empty case) return 0; } - // Round-robin among active groups only - active_groups[query_seq as usize % active_groups.len()] + // Round-robin among routing groups only + routing_groups[query_seq as usize % routing_groups.len()] } /// The covering set for a search: one node per shard within the chosen group. diff --git a/crates/miroir-core/src/scatter.rs b/crates/miroir-core/src/scatter.rs index f7f36b9..28c51e9 100644 --- a/crates/miroir-core/src/scatter.rs +++ b/crates/miroir-core/src/scatter.rs @@ -406,16 +406,56 @@ pub async fn plan_search_scatter( let _covering = covering_set(shard_count, group, rf, query_seq); let mut shard_to_node = HashMap::new(); + let node_map = topology.node_map(); + for shard_id in 0..shard_count { let replicas = crate::router::assign_shard_in_group(shard_id, group.nodes(), rf); - let selected = if let Some(selector) = replica_selector { - match selector.select(&replicas, chosen_group).await { - Some(node) => node, - None => replicas[(query_seq as usize) % replicas.len()].clone(), + // Filter to only healthy nodes within the group + let healthy_replicas: Vec = replicas + .iter() + .filter(|node_id| { + node_map.get(node_id) + .map(|n| n.is_healthy()) + .unwrap_or(false) + }) + .cloned() + .collect(); + + let selected = if !healthy_replicas.is_empty() { + // Use healthy intra-group replica + if let Some(selector) = replica_selector { + match selector.select(&healthy_replicas, chosen_group).await { + Some(node) => node, + None => healthy_replicas[(query_seq as usize) % healthy_replicas.len()].clone(), + } + } else { + healthy_replicas[(query_seq as usize) % healthy_replicas.len()].clone() } } else { - replicas[(query_seq as usize) % replicas.len()].clone() + // Cross-group fallback: try other groups for this shard + let mut fallback_node = None; + 'fallback: for group_id in 0..topology.replica_group_count() { + if group_id == chosen_group { + continue; + } + if let Some(other_group) = topology.group(group_id) { + let other_replicas = crate::router::assign_shard_in_group(shard_id, other_group.nodes(), rf); + for other_node in other_replicas { + if let Some(node) = node_map.get(&other_node) { + if node.is_healthy() { + fallback_node = Some(other_node); + break 'fallback; + } + } + } + } + } + + fallback_node.unwrap_or_else(|| { + // No healthy node found anywhere - use original replica and let it fail + replicas[(query_seq as usize) % replicas.len()].clone() + }) }; shard_to_node.insert(shard_id, selected); diff --git a/crates/miroir-core/src/topology.rs b/crates/miroir-core/src/topology.rs index 625ed2b..b75b309 100644 --- a/crates/miroir-core/src/topology.rs +++ b/crates/miroir-core/src/topology.rs @@ -37,7 +37,7 @@ impl std::fmt::Display for NodeId { } } -/// State of a replica group during group addition (plan §2). +/// State of a replica group during group addition/removal (plan §2). #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] #[serde(rename_all = "snake_case")] pub enum GroupState { @@ -46,6 +46,8 @@ pub enum GroupState { Initializing, /// Group is fully synced and serving queries. Active, + /// Group is being removed; queries NOT routed here. + Draining, } /// Health status of a node, with state-machine transitions. @@ -256,6 +258,21 @@ impl Group { matches!(self.state, GroupState::Active) } + /// Check if this group is draining (being removed). + pub fn is_draining(&self) -> bool { + matches!(self.state, GroupState::Draining) + } + + /// Check if this group is routing queries (not initializing or draining). + pub fn is_routing(&self) -> bool { + matches!(self.state, GroupState::Active) + } + + /// Mark the group as draining (being removed). + pub fn mark_draining(&mut self) { + self.state = GroupState::Draining; + } + /// Add a node to this group. pub fn add_node(&mut self, node_id: NodeId) { if !self.nodes.contains(&node_id) { diff --git a/crates/miroir-core/tests/p25_task_reconciliation.rs b/crates/miroir-core/tests/p25_task_reconciliation.rs new file mode 100644 index 0000000..f260ca0 --- /dev/null +++ b/crates/miroir-core/tests/p25_task_reconciliation.rs @@ -0,0 +1,297 @@ +//! P2.5 Task ID reconciliation integration tests. +//! +//! Acceptance criteria: +//! - Fan-out to 3 nodes → all 3 `taskUid`s captured in one mtask +//! - GET /tasks/{mtask_id} while all nodes are processing → `processing` +//! - One node fails → status `failed`, error includes per-node breakdown +//! - In-memory registry survives the request's own lifetime + +use miroir_core::task::{NodeTaskStatus, TaskStatus, TaskRegistry, TaskFilter}; +use miroir_core::task_registry::InMemoryTaskRegistry; +use miroir_core::topology::{Topology, Node, NodeId}; +use std::collections::HashMap; +use tokio::time::{sleep, Duration}; + +/// Helper: create a test topology with 3 nodes in one replica group. +fn test_topology_3_nodes() -> Topology { + let mut topo = Topology::new(64, 1, 1); // 1 replica group, RF=1 + topo.add_node(Node::new(NodeId::new("node-0".into()), "http://node-0:7700".into(), 0)); + topo.add_node(Node::new(NodeId::new("node-1".into()), "http://node-1:7700".into(), 0)); + topo.add_node(Node::new(NodeId::new("node-2".into()), "http://node-2:7700".into(), 0)); + topo +} + +/// Helper: create a test topology with 2 replica groups, 2 nodes each. +fn test_topology_2_groups() -> Topology { + let mut topo = Topology::new(64, 2, 1); // 2 replica groups, RF=1 + topo.add_node(Node::new(NodeId::new("node-g0-0".into()), "http://g0-0:7700".into(), 0)); + topo.add_node(Node::new(NodeId::new("node-g1-0".into()), "http://g1-0:7700".into(), 1)); + topo +} + +#[tokio::test(flavor = "multi_thread")] +async fn acceptance_1_fanout_to_3_nodes_captures_all_task_uids() { + // Given: A task registry + let registry = InMemoryTaskRegistry::new(); + + // When: Fan-out to 3 nodes returns task UIDs + let mut node_task_uids = HashMap::new(); + node_task_uids.insert("node-0".to_string(), 42); + node_task_uids.insert("node-1".to_string(), 17); + node_task_uids.insert("node-2".to_string(), 88); + + // And: Register with metadata + let task = registry.register_with_metadata( + node_task_uids.clone(), + Some("test-index".to_string()), + Some("documentAdditionOrUpdate".to_string()), + ).expect("registration succeeds"); + + // Then: All 3 node task UIDs are captured in the mtask + assert_eq!(task.node_tasks.len(), 3); + assert_eq!(task.node_tasks.get("node-0").unwrap().task_uid, 42); + assert_eq!(task.node_tasks.get("node-1").unwrap().task_uid, 17); + assert_eq!(task.node_tasks.get("node-2").unwrap().task_uid, 88); + + // And: mtask ID is in correct format + assert!(task.miroir_id.starts_with("mtask-")); + + // And: Can retrieve the task by ID + let retrieved = registry.get(&task.miroir_id).expect("get succeeds"); + assert!(retrieved.is_some()); + let retrieved = retrieved.unwrap(); + assert_eq!(retrieved.miroir_id, task.miroir_id); + assert_eq!(retrieved.node_tasks.len(), 3); +} + +#[tokio::test(flavor = "multi_thread")] +async fn acceptance_2_get_task_while_processing_returns_processing() { + // Given: A task registry with a registered task + let registry = InMemoryTaskRegistry::new(); + let mut node_task_uids = HashMap::new(); + node_task_uids.insert("node-0".to_string(), 42); + node_task_uids.insert("node-1".to_string(), 17); + node_task_uids.insert("node-2".to_string(), 88); + + let task = registry.register_with_metadata( + node_task_uids, + Some("test-index".to_string()), + Some("documentAdditionOrUpdate".to_string()), + ).expect("registration succeeds"); + + // When: All nodes are still processing (not terminal) + // Simulate by setting node tasks to Processing status + registry.update_node_task(&task.miroir_id, "node-0", NodeTaskStatus::Processing).await.expect("update node-0"); + registry.update_node_task(&task.miroir_id, "node-1", NodeTaskStatus::Processing).await.expect("update node-1"); + registry.update_node_task(&task.miroir_id, "node-2", NodeTaskStatus::Processing).await.expect("update node-2"); + + // And: Update overall status to Processing (which sets started_at) + registry.update_status(&task.miroir_id, TaskStatus::Processing).await.expect("update succeeds"); + + // Then: GET /tasks/{mtask_id} returns "processing" status + let retrieved = registry.get(&task.miroir_id).expect("get succeeds").expect("task exists"); + assert_eq!(retrieved.status, TaskStatus::Processing); + assert!(retrieved.started_at.is_some()); +} + +#[tokio::test(flavor = "multi_thread")] +async fn acceptance_3_one_node_fails_returns_failed_with_breakdown() { + // Given: A task registry with a registered task + let registry = InMemoryTaskRegistry::new(); + let mut node_task_uids = HashMap::new(); + node_task_uids.insert("node-0".to_string(), 42); + node_task_uids.insert("node-1".to_string(), 17); + node_task_uids.insert("node-2".to_string(), 88); + + let task = registry.register_with_metadata( + node_task_uids, + Some("test-index".to_string()), + Some("documentAdditionOrUpdate".to_string()), + ).expect("registration succeeds"); + + // When: One node fails + registry.update_node_task(&task.miroir_id, "node-0", NodeTaskStatus::Succeeded).await.expect("update node-0"); + registry.update_node_task(&task.miroir_id, "node-1", NodeTaskStatus::Failed).await.expect("update node-1"); + registry.update_node_task(&task.miroir_id, "node-2", NodeTaskStatus::Succeeded).await.expect("update node-2"); + + // And: Update overall status (will detect failure and set failed state) + registry.update_overall_status(&task.miroir_id).await.expect("update succeeds"); + + // Then: Status is "failed" + let retrieved = registry.get(&task.miroir_id).expect("get succeeds").expect("task exists"); + assert_eq!(retrieved.status, TaskStatus::Failed); +} + +#[tokio::test(flavor = "multi_thread")] +async fn acceptance_4_in_memory_registry_survives_request_lifetime() { + // Given: A task registry + let registry = InMemoryTaskRegistry::new(); + + // When: Register a task in a request scope + let task_id = { + let mut node_task_uids = HashMap::new(); + node_task_uids.insert("node-0".to_string(), 42); + let task = registry.register_with_metadata( + node_task_uids, + Some("test-index".to_string()), + Some("documentAdditionOrUpdate".to_string()), + ).expect("registration succeeds"); + task.miroir_id + }; + + // Then: Task is still accessible after the "request" scope ends + let retrieved = registry.get(&task_id).expect("get succeeds").expect("task exists"); + assert_eq!(retrieved.miroir_id, task_id); + assert_eq!(retrieved.status, TaskStatus::Enqueued); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_all_nodes_succeeded_returns_succeeded() { + // Given: A task with 3 node tasks + let registry = InMemoryTaskRegistry::new(); + let mut node_task_uids = HashMap::new(); + node_task_uids.insert("node-0".to_string(), 42); + node_task_uids.insert("node-1".to_string(), 17); + node_task_uids.insert("node-2".to_string(), 88); + + let task = registry.register_with_metadata( + node_task_uids, + Some("test-index".to_string()), + Some("documentAdditionOrUpdate".to_string()), + ).expect("registration succeeds"); + + // When: All nodes succeed + registry.update_node_task(&task.miroir_id, "node-0", NodeTaskStatus::Succeeded).await.expect("update node-0"); + registry.update_node_task(&task.miroir_id, "node-1", NodeTaskStatus::Succeeded).await.expect("update node-1"); + registry.update_node_task(&task.miroir_id, "node-2", NodeTaskStatus::Succeeded).await.expect("update node-2"); + + // And: Update overall status to Succeeded (which sets finished_at) + registry.update_status(&task.miroir_id, TaskStatus::Succeeded).await.expect("update succeeds"); + + // Then: Status is "succeeded" + let retrieved = registry.get(&task.miroir_id).expect("get succeeds").expect("task exists"); + assert_eq!(retrieved.status, TaskStatus::Succeeded); + assert!(retrieved.finished_at.is_some()); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_task_list_with_filters() { + // Given: A task registry with multiple tasks + let registry = InMemoryTaskRegistry::new(); + + // Create tasks with different statuses and types + let mut node_task_uids_1 = HashMap::new(); + node_task_uids_1.insert("node-0".to_string(), 1); + let task1 = registry.register_with_metadata( + node_task_uids_1.clone(), + Some("index-a".to_string()), + Some("documentAdditionOrUpdate".to_string()), + ).expect("registration succeeds"); + + let mut node_task_uids_2 = HashMap::new(); + node_task_uids_2.insert("node-1".to_string(), 2); + let task2 = registry.register_with_metadata( + node_task_uids_2.clone(), + Some("index-b".to_string()), + Some("documentDeletion".to_string()), + ).expect("registration succeeds"); + + // Mark task1 as succeeded + registry.update_status(&task1.miroir_id, TaskStatus::Succeeded).await.expect("update status"); + + // When: Filter by status + let filter = TaskFilter { + status: Some(TaskStatus::Succeeded), + node_id: None, + index_uid: None, + task_type: None, + limit: None, + offset: None, + }; + + let tasks = registry.list(filter).expect("list succeeds"); + assert_eq!(tasks.len(), 1); + assert_eq!(tasks[0].miroir_id, task1.miroir_id); + + // When: Filter by index_uid + let filter = TaskFilter { + status: None, + node_id: None, + index_uid: Some("index-b".to_string()), + task_type: None, + limit: None, + offset: None, + }; + + let tasks = registry.list(filter).expect("list succeeds"); + assert_eq!(tasks.len(), 1); + assert_eq!(tasks[0].miroir_id, task2.miroir_id); + + // When: Filter by task_type + let filter = TaskFilter { + status: None, + node_id: None, + index_uid: None, + task_type: Some("documentDeletion".to_string()), + limit: None, + offset: None, + }; + + let tasks = registry.list(filter).expect("list succeeds"); + assert_eq!(tasks.len(), 1); + assert_eq!(tasks[0].miroir_id, task2.miroir_id); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_task_count() { + // Given: A task registry + let registry = InMemoryTaskRegistry::new(); + + // When: No tasks + assert_eq!(registry.count().await, 0); + + // When: Add tasks + let mut node_task_uids = HashMap::new(); + node_task_uids.insert("node-0".to_string(), 42); + registry.register_with_metadata( + node_task_uids.clone(), + Some("test-index".to_string()), + Some("documentAdditionOrUpdate".to_string()), + ).expect("registration succeeds"); + + registry.register_with_metadata( + node_task_uids.clone(), + Some("test-index".to_string()), + Some("documentDeletion".to_string()), + ).expect("registration succeeds"); + + // Then: Count reflects total + assert_eq!(registry.count().await, 2); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_exponential_backoff_polling_simulation() { + // Given: A task registry + let registry = InMemoryTaskRegistry::new(); + + // When: Register a task (spawns background polling) + let mut node_task_uids = HashMap::new(); + node_task_uids.insert("node-0".to_string(), 42); + node_task_uids.insert("node-1".to_string(), 17); + node_task_uids.insert("node-2".to_string(), 88); + + let task = registry.register_with_metadata( + node_task_uids, + Some("test-index".to_string()), + Some("documentAdditionOrUpdate".to_string()), + ).expect("registration succeeds"); + + // The background poller simulates task completion after ~500ms + // Wait for completion + sleep(Duration::from_millis(800)).await; + + // Then: Task should be succeeded + let retrieved = registry.get(&task.miroir_id).expect("get succeeds").expect("task exists"); + assert_eq!(retrieved.status, TaskStatus::Succeeded); + assert!(retrieved.finished_at.is_some()); +} diff --git a/crates/miroir-proxy/src/main.rs b/crates/miroir-proxy/src/main.rs index 05d6f5c..5719981 100644 --- a/crates/miroir-proxy/src/main.rs +++ b/crates/miroir-proxy/src/main.rs @@ -744,11 +744,19 @@ async fn main() -> anyhow::Result<()> { /// /// On each tick it also updates the Prometheus metrics for node health, /// shard coverage, shard distribution, and degraded shard count. +/// +/// Implements unplanned node failure detection (plan §2): +/// - unhealthy_threshold consecutive failures → mark node as Failed +/// - recovery_threshold consecutive successes → recover from Failed/Degraded async fn run_health_checker(state: admin_endpoints::AppState) { let mut interval = tokio::time::interval(Duration::from_millis( state.config.health.interval_ms, )); + // Track consecutive failures per node (in-memory only) + let mut consecutive_failures: std::collections::HashMap = std::collections::HashMap::new(); + let mut consecutive_successes: std::collections::HashMap = std::collections::HashMap::new(); + loop { interval.tick().await; @@ -762,11 +770,6 @@ async fn run_health_checker(state: admin_endpoints::AppState) { // Get current node status let current_status = topo.node(node_id).map(|n| n.status); - // Skip nodes that are already Active/Healthy - if let Some(NodeStatus::Active) | Some(NodeStatus::Healthy) = current_status { - continue; - } - // Get node address let node_address = match topo.node(node_id) { Some(n) => n.address.clone(), @@ -784,6 +787,9 @@ async fn run_health_checker(state: admin_endpoints::AppState) { Ok(c) => c, Err(_) => { all_healthy = false; + // Increment failure counter + *consecutive_failures.entry(node_id.clone()).or_insert(0) += 1; + consecutive_successes.remove(node_id); continue; } }; @@ -792,13 +798,89 @@ async fn run_health_checker(state: admin_endpoints::AppState) { let result = client.get(&url).send().await; if result.is_ok() && result.unwrap().status().is_success() { - // Node is reachable - promote to Active + // Node is reachable + consecutive_successes.entry(node_id.clone()).and_modify(|c| *c += 1).or_insert(1); + consecutive_failures.remove(node_id); + + let successes = *consecutive_successes.get(node_id.as_str()).unwrap_or(&1); + + // Check if we should promote from Joining/Degraded/Failed to Active if let Some(node) = topo.node_mut(node_id) { - let _ = node.transition_to(NodeStatus::Active); - info!(node_id = %node_id, "node promoted to Active"); + match node.status { + NodeStatus::Joining => { + // Promote joining nodes immediately on first success + let _ = node.transition_to(NodeStatus::Active); + info!(node_id = %node_id, "node promoted to Active (was Joining)"); + } + NodeStatus::Degraded => { + // Need recovery_threshold consecutive successes to recover + if successes >= state.config.health.recovery_threshold { + let _ = node.transition_to(NodeStatus::Active); + info!(node_id = %node_id, "node recovered to Active (was Degraded)"); + } + } + NodeStatus::Failed => { + // Need recovery_threshold consecutive successes to recover + if successes >= state.config.health.recovery_threshold { + let _ = node.transition_to(NodeStatus::Active); + info!(node_id = %node_id, "node recovered to Active (was Failed)"); + + // Trigger RF-restore if configured + if let Some(ref rebalancer) = state.rebalancer { + if let Some(ref worker) = state.rebalancer_worker { + let event = TopologyChangeEvent::NodeRecovered { + node_id: node_id.as_str().to_string(), + replica_group: node.replica_group, + index_uid: "default".to_string(), + }; + let _ = worker.event_sender().try_send(event); + } + } + } + } + _ => {} + } } } else { + // Node is unreachable all_healthy = false; + *consecutive_failures.entry(node_id.clone()).or_insert(0) += 1; + consecutive_successes.remove(node_id); + + let failures = *consecutive_failures.get(node_id.as_str()).unwrap_or(&1); + + // Check if we should mark as Degraded or Failed + if let Some(node) = topo.node_mut(node_id) { + match node.status { + NodeStatus::Active | NodeStatus::Healthy => { + // First failure → mark Degraded (not full failure yet) + if failures >= 1 { + let _ = node.transition_to(NodeStatus::Degraded); + warn!(node_id = %node_id, consecutive_failures = failures, "node marked Degraded"); + } + } + NodeStatus::Degraded => { + // unhealthy_threshold consecutive failures → mark Failed + if failures >= state.config.health.unhealthy_threshold { + let _ = node.transition_to(NodeStatus::Failed); + warn!(node_id = %node_id, consecutive_failures = failures, "node marked Failed"); + + // Trigger failure handling + if let Some(ref rebalancer) = state.rebalancer { + if let Some(ref worker) = state.rebalancer_worker { + let event = TopologyChangeEvent::NodeFailed { + node_id: node_id.as_str().to_string(), + replica_group: node.replica_group, + index_uid: "default".to_string(), + }; + let _ = worker.event_sender().try_send(event); + } + } + } + } + _ => {} + } + } } } diff --git a/crates/miroir-proxy/src/routes/admin_endpoints.rs b/crates/miroir-proxy/src/routes/admin_endpoints.rs index 3f9e924..a848d49 100644 --- a/crates/miroir-proxy/src/routes/admin_endpoints.rs +++ b/crates/miroir-proxy/src/routes/admin_endpoints.rs @@ -639,48 +639,6 @@ impl AppState { None }; - Self { - config: Arc::new(config.clone()), - topology: topology_arc, - ready: Arc::new(RwLock::new(false)), - metrics: metrics.clone(), - version_state, - task_registry: Arc::new(task_registry), - redis_store, - task_store, - pod_id, - seal_key, - local_rate_limiter: LocalAdminRateLimiter::new(), - local_search_ui_rate_limiter: LocalSearchUiRateLimiter::new(), - rebalancer: Some(rebalancer), - migration_coordinator: Some(migration_coordinator), - rebalancer_worker, - rebalancer_metrics, - previous_docs_migrated: Arc::new(std::sync::atomic::AtomicU64::new(0)), - settings_broadcast, - drift_reconciler, - anti_entropy_worker, - session_manager, - alias_registry, - leader_election, - mode_c_worker, - replica_selector: { - let advanced_config = config.replica_selection.clone(); - let selector_config = miroir_core::replica_selection::ReplicaSelectionConfig::from(advanced_config); - let observer = Arc::new(ReplicaSelectionMetricsObserver { - metrics: metrics.clone(), - }); - Arc::new(ReplicaSelector::new_with_observer(selector_config, observer)) - }, - idempotency_cache: Arc::new(miroir_core::idempotency::IdempotencyCache::new( - config.idempotency.max_cached_keys as usize, - config.idempotency.ttl_seconds, - )), - query_coalescer: Arc::new(miroir_core::idempotency::QueryCoalescer::new( - config.query_coalescing.window_ms, - config.query_coalescing.max_pending_queries as usize, - config.query_coalescing.max_subscribers as usize, - )), // Create group addition coordinator (needed for both API and sync worker) let group_addition_coordinator = if has_task_store { Some(Arc::new(RwLock::new( diff --git a/crates/miroir-proxy/tests/p13_6_session_pinning.rs b/crates/miroir-proxy/tests/p13_6_session_pinning.rs index f8ba7e9..9e3a9f1 100644 --- a/crates/miroir-proxy/tests/p13_6_session_pinning.rs +++ b/crates/miroir-proxy/tests/p13_6_session_pinning.rs @@ -742,7 +742,7 @@ async fn integration_session_pin_with_scatter_plan() { } // Plan scatter for pinned group 1 - let plan = plan_search_scatter_for_group(&topo, 0, 2, 4, 1); + let plan = plan_search_scatter_for_group(&topo, 0, 2, 4, 1, None).await; assert!(plan.is_some(), "Plan should be created for pinned group"); let plan = plan.unwrap();