Phase 4 — Topology Operations: Complete rebalancer and failure handling

Implements plan §2 topology changes and §4 rebalancer with full elastic
cluster operations: node addition/removal, replica group management, and
unplanned failure handling.

Core changes:
- topology.rs: Add GroupState::Draining for group removal flow
- router.rs: query_group_active() excludes draining groups via is_routing()
- scatter.rs: Health filtering with cross-group fallback for failed nodes
- rebalancer.rs: Add handle_node_recovery() for RF restore after recovery
- main.rs: Unplanned node failure detection with consecutive failure/success
  tracking, automatic Degraded/Failed transitions, and recovery event triggers

Admin API:
- POST /_miroir/nodes/{id}/recover - Mark failed node as recovered
- DELETE /_miroir/nodes/{id} - Remove node (after drain)
- POST /_miroir/nodes/{id}/drain - Start node drain for removal
- POST /_miroir/nodes/{id}/fail - Mark node as failed
- POST /_miroir/replica_groups - Add replica group
- GET /_miroir/replica_groups/{id}/status - Group sync progress
- POST /_miroir/replica_groups/{id}/activate - Mark group active
- DELETE /_miroir/replica_groups/{id} - Remove replica group

Tests:
- p4_topology_chaos.rs: All 5 chaos tests pass
  * Add node mid-indexing: docs readable, no duplicates
  * Drain node while querying: zero client-visible failures
  * Add replica group while querying: existing groups unaffected
  * Rebalance moves ≤ 2×(1/4) of docs (optimal)
  * Restart node mid-rebalance: pauses + resumes, no data loss
- p25_task_reconciliation.rs: Task ID reconciliation acceptance tests

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-05-23 23:57:53 -04:00
parent cc3e312e52
commit b0f89e1f6d
8 changed files with 703 additions and 69 deletions

View file

@ -1099,9 +1099,6 @@ impl Rebalancer {
// Create operation record
let op_id = self.next_op_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
// TODO: Schedule background replication to restore RF if needed
// For now, just record the failure
let operation = TopologyOperation {
id: op_id,
op_type: TopologyOperationType::NodeFailure,
@ -1126,6 +1123,249 @@ impl Rebalancer {
})
}
/// Handle a node recovery and restore RF within the group.
pub async fn handle_node_recovery(
&self,
node_id: &str,
) -> Result<TopologyOperationResult, RebalancerError> {
info!(node_id = %node_id, "handling node recovery and RF restore");
let node_id_obj = TopologyNodeId::new(node_id.to_string());
// Mark node as recovered and get group info
let (replica_group, has_rf_to_restore) = {
let topo = self.topology.read().await;
let node = topo.node(&node_id_obj).ok_or_else(|| {
RebalancerError::NodeNotFound(node_id.to_string())
})?;
if node.status != NodeStatus::Failed && node.status != NodeStatus::Degraded {
return Err(RebalancerError::InvalidState(format!(
"node {} is not in a failed state (current: {:?})",
node_id, node.status
)));
}
let replica_group = node.replica_group;
// Check if RF needs to be restored (other healthy nodes exist in group)
let group = topo.groups().find(|g| g.id == replica_group);
let has_other_healthy = group.map_or(false, |g| {
g.nodes().iter().any(|nid| {
nid != &node_id_obj && topo.node(nid).map(|n| n.is_healthy()).unwrap_or(false)
})
});
(replica_group, has_other_healthy)
};
if !has_rf_to_restore {
// No other healthy nodes in group - just mark as active
let mut topo = self.topology.write().await;
let node = topo.node_mut(&node_id_obj).ok_or_else(|| {
RebalancerError::NodeNotFound(node_id.to_string())
})?;
node.status = NodeStatus::Active;
return Ok(TopologyOperationResult {
id: 0,
message: format!("Node {} recovered (no RF restore needed - no other healthy nodes in group)", node_id),
migrations_count: 0,
});
}
// Create operation record
let op_id = self.next_op_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
// Mark node as active
{
let mut topo = self.topology.write().await;
let node = topo.node_mut(&node_id_obj).ok_or_else(|| {
RebalancerError::NodeNotFound(node_id.to_string())
})?;
node.status = NodeStatus::Active;
}
// Compute shards that need RF restore (shards where this node should be a replica)
let shards_to_restore = self.compute_shards_for_rf_restore(node_id, replica_group).await?;
if !shards_to_restore.is_empty() {
// Create migrations for RF restore
let migrations = {
let mut coordinator = self.migration_coordinator.write().await;
let mut migs = Vec::new();
for shard in shards_to_restore {
// Find a healthy source node in the same group
let source_node = self.find_healthy_source_for_shard(shard, replica_group, node_id).await?;
let mut old_owners = HashMap::new();
old_owners.insert(shard, topo_to_migration_node_id(&source_node));
let mid = coordinator.begin_migration(
topo_to_migration_node_id(&node_id_obj),
replica_group,
old_owners,
)?;
migs.push(mid);
}
// Start dual-write for all migrations
for mid in &migs {
coordinator.begin_dual_write(*mid)?;
}
migs
};
let migrations_count = migrations.len();
// Record operation (before moving migrations)
let operation = TopologyOperation {
id: op_id,
op_type: TopologyOperationType::NodeFailure, // Reuse NodeFailure type for recovery
status: TopologyOperationStatus::InProgress,
target_node: Some(node_id.to_string()),
target_group: Some(replica_group),
migrations: migrations.clone(),
started_at: Some(now_ms()),
completed_at: None,
error: None,
};
{
let mut ops = self.operations.write().await;
ops.insert(op_id, operation);
}
// Start background RF restore task
let topo_arc = self.topology.clone();
let coord_arc = self.migration_coordinator.clone();
let ops_arc = self.operations.clone();
let active_arc = self.active_migrations.clone();
let config = self.config.clone();
let executor = self.migration_executor.clone();
let metrics_arc = self.metrics.clone();
tokio::spawn(async move {
if let Err(e) = run_migration_task(
topo_arc,
coord_arc,
ops_arc,
active_arc,
op_id,
migrations,
config,
executor,
metrics_arc,
)
.await
{
error!(error = %e, op_id = op_id, "RF restore task failed");
}
});
Ok(TopologyOperationResult {
id: op_id,
message: format!(
"Node {} recovered with RF restore ({} shards)",
node_id,
migrations_count
),
migrations_count,
})
} else {
// No shards need restoration
let operation = TopologyOperation {
id: op_id,
op_type: TopologyOperationType::NodeFailure,
status: TopologyOperationStatus::Complete,
target_node: Some(node_id.to_string()),
target_group: Some(replica_group),
migrations: Vec::new(),
started_at: Some(now_ms()),
completed_at: Some(now_ms()),
error: None,
};
{
let mut ops = self.operations.write().await;
ops.insert(op_id, operation);
}
Ok(TopologyOperationResult {
id: op_id,
message: format!("Node {} recovered (no shards needed restoration)", node_id),
migrations_count: 0,
})
}
}
/// Compute which shards need RF restore for a recovered node.
/// Returns shards where the recovered node should be a replica but may have lost data.
async fn compute_shards_for_rf_restore(
&self,
recovered_node_id: &str,
replica_group: u32,
) -> Result<Vec<ShardId>, RebalancerError> {
let topo = self.topology.read().await;
let recovered_node = TopologyNodeId::new(recovered_node_id.to_string());
let rf = topo.rf();
let group = topo
.groups()
.find(|g| g.id == replica_group)
.ok_or_else(|| RebalancerError::GroupNotFound(replica_group))?;
let mut shards_to_restore = Vec::new();
// For each shard, check if the recovered node should be a replica
for shard_id in 0..topo.shards {
let assignment = assign_shard_in_group(shard_id, group.nodes(), rf);
if assignment.contains(&recovered_node) {
// This node should be a replica for this shard
shards_to_restore.push(ShardId(shard_id));
}
}
Ok(shards_to_restore)
}
/// Pick a healthy replica to copy a shard from during RF restore.
///
/// Walks the shard's replica assignment within `replica_group` in order and
/// returns the first node that is not `exclude_node_id` and reports
/// `is_healthy()`.
///
/// # Errors
/// * `RebalancerError::GroupNotFound` when the group is not in the topology.
/// * `RebalancerError::InvalidState` when no assigned replica other than the
///   excluded node is healthy.
async fn find_healthy_source_for_shard(
    &self,
    shard: ShardId,
    replica_group: u32,
    exclude_node_id: &str,
) -> Result<TopologyNodeId, RebalancerError> {
    let topo = self.topology.read().await;
    let excluded = TopologyNodeId::new(exclude_node_id.to_string());

    let group = match topo.groups().find(|g| g.id == replica_group) {
        Some(found) => found,
        None => return Err(RebalancerError::GroupNotFound(replica_group)),
    };

    // A candidate qualifies when it is not the recovered node and is healthy;
    // nodes missing from the topology are treated as unusable.
    let is_usable = |candidate: &TopologyNodeId| {
        *candidate != excluded && topo.node(candidate).map_or(false, |n| n.is_healthy())
    };

    let source = assign_shard_in_group(shard.0, group.nodes(), topo.rf())
        .into_iter()
        .find(|candidate| is_usable(candidate));

    match source {
        Some(node) => Ok(node),
        None => Err(RebalancerError::InvalidState(
            format!("no healthy source found for shard {} in group {}", shard.0, replica_group)
        )),
    }
}
/// Compute which shards should move to a new node.
/// Returns shard -> old_owner mapping for shards that will move.
///

View file

@ -106,8 +106,8 @@ pub fn query_group(query_seq: u64, replica_groups: u32) -> u32 {
/// Select an ACTIVE replica group for a query (round-robin by query counter).
///
/// This function implements the group addition flow from plan §2: queries are
/// NOT routed to initializing groups, only active groups. When no groups are
/// This function implements the group addition/removal flow from plan §2: queries are
/// NOT routed to initializing or draining groups, only active groups. When no groups are
/// active, returns 0 as a fallback (caller handles the empty case).
///
/// # Arguments
@ -117,20 +117,20 @@ pub fn query_group(query_seq: u64, replica_groups: u32) -> u32 {
/// # Returns
/// The ID of the selected active replica group
pub fn query_group_active(query_seq: u64, topology: &Topology) -> u32 {
// Collect all active group IDs
let active_groups: Vec<u32> = topology
// Collect all routing groups (active, not initializing or draining)
let routing_groups: Vec<u32> = topology
.groups()
.filter(|g| g.is_active())
.filter(|g| g.is_routing())
.map(|g| g.id)
.collect();
if active_groups.is_empty() {
// Fallback: no active groups, return 0 (caller handles empty case)
if routing_groups.is_empty() {
// Fallback: no routing groups, return 0 (caller handles empty case)
return 0;
}
// Round-robin among active groups only
active_groups[query_seq as usize % active_groups.len()]
// Round-robin among routing groups only
routing_groups[query_seq as usize % routing_groups.len()]
}
/// The covering set for a search: one node per shard within the chosen group.

View file

@ -406,16 +406,56 @@ pub async fn plan_search_scatter(
let _covering = covering_set(shard_count, group, rf, query_seq);
let mut shard_to_node = HashMap::new();
let node_map = topology.node_map();
for shard_id in 0..shard_count {
let replicas = crate::router::assign_shard_in_group(shard_id, group.nodes(), rf);
let selected = if let Some(selector) = replica_selector {
match selector.select(&replicas, chosen_group).await {
Some(node) => node,
None => replicas[(query_seq as usize) % replicas.len()].clone(),
// Filter to only healthy nodes within the group
let healthy_replicas: Vec<NodeId> = replicas
.iter()
.filter(|node_id| {
node_map.get(node_id)
.map(|n| n.is_healthy())
.unwrap_or(false)
})
.cloned()
.collect();
let selected = if !healthy_replicas.is_empty() {
// Use healthy intra-group replica
if let Some(selector) = replica_selector {
match selector.select(&healthy_replicas, chosen_group).await {
Some(node) => node,
None => healthy_replicas[(query_seq as usize) % healthy_replicas.len()].clone(),
}
} else {
healthy_replicas[(query_seq as usize) % healthy_replicas.len()].clone()
}
} else {
replicas[(query_seq as usize) % replicas.len()].clone()
// Cross-group fallback: try other groups for this shard
let mut fallback_node = None;
'fallback: for group_id in 0..topology.replica_group_count() {
if group_id == chosen_group {
continue;
}
if let Some(other_group) = topology.group(group_id) {
let other_replicas = crate::router::assign_shard_in_group(shard_id, other_group.nodes(), rf);
for other_node in other_replicas {
if let Some(node) = node_map.get(&other_node) {
if node.is_healthy() {
fallback_node = Some(other_node);
break 'fallback;
}
}
}
}
}
fallback_node.unwrap_or_else(|| {
// No healthy node found anywhere - use original replica and let it fail
replicas[(query_seq as usize) % replicas.len()].clone()
})
};
shard_to_node.insert(shard_id, selected);

View file

@ -37,7 +37,7 @@ impl std::fmt::Display for NodeId {
}
}
/// State of a replica group during group addition (plan §2).
/// State of a replica group during group addition/removal (plan §2).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum GroupState {
@ -46,6 +46,8 @@ pub enum GroupState {
Initializing,
/// Group is fully synced and serving queries.
Active,
/// Group is being removed; queries NOT routed here.
Draining,
}
/// Health status of a node, with state-machine transitions.
@ -256,6 +258,21 @@ impl Group {
matches!(self.state, GroupState::Active)
}
/// Returns `true` while the group is in the `Draining` state, i.e. it is
/// being removed.
pub fn is_draining(&self) -> bool {
    self.state == GroupState::Draining
}
/// Returns `true` when queries may be routed to this group.
///
/// Only the `Active` state qualifies, so both `Initializing` and `Draining`
/// groups are excluded.
pub fn is_routing(&self) -> bool {
    self.state == GroupState::Active
}
/// Mark the group as draining (being removed).
///
/// Sets the state to `GroupState::Draining` unconditionally; the previous
/// state is not inspected, so calling this on an already draining group is a
/// no-op.
pub fn mark_draining(&mut self) {
self.state = GroupState::Draining;
}
/// Add a node to this group.
pub fn add_node(&mut self, node_id: NodeId) {
if !self.nodes.contains(&node_id) {

View file

@ -0,0 +1,297 @@
//! P2.5 Task ID reconciliation integration tests.
//!
//! Acceptance criteria:
//! - Fan-out to 3 nodes → all 3 `taskUid`s captured in one mtask
//! - GET /tasks/{mtask_id} while all nodes are processing → `processing`
//! - One node fails → status `failed`, error includes per-node breakdown
//! - In-memory registry survives the request's own lifetime
use miroir_core::task::{NodeTaskStatus, TaskStatus, TaskRegistry, TaskFilter};
use miroir_core::task_registry::InMemoryTaskRegistry;
use miroir_core::topology::{Topology, Node, NodeId};
use std::collections::HashMap;
use tokio::time::{sleep, Duration};
/// Helper: build a topology with a single replica group (RF=1) that holds
/// three nodes, `node-0` through `node-2`.
fn test_topology_3_nodes() -> Topology {
    // 64 shards, 1 replica group, RF=1
    let mut topo = Topology::new(64, 1, 1);
    for idx in 0..3 {
        let node_name = format!("node-{}", idx);
        let node_address = format!("http://node-{}:7700", idx);
        topo.add_node(Node::new(NodeId::new(node_name.into()), node_address.into(), 0));
    }
    topo
}
/// Helper: create a test topology with 2 replica groups, one node each.
///
/// NOTE(review): this helper was previously documented as "2 nodes each", but
/// only a single node is added per group (`node-g0-0` in group 0 and
/// `node-g1-0` in group 1) — confirm which of the two was intended.
fn test_topology_2_groups() -> Topology {
let mut topo = Topology::new(64, 2, 1); // 2 replica groups, RF=1
topo.add_node(Node::new(NodeId::new("node-g0-0".into()), "http://g0-0:7700".into(), 0));
topo.add_node(Node::new(NodeId::new("node-g1-0".into()), "http://g1-0:7700".into(), 1));
topo
}
#[tokio::test(flavor = "multi_thread")]
async fn acceptance_1_fanout_to_3_nodes_captures_all_task_uids() {
    // Given: an empty registry and the task UIDs returned by a 3-node fan-out
    let registry = InMemoryTaskRegistry::new();
    let fanout_uids = HashMap::from([
        ("node-0".to_string(), 42),
        ("node-1".to_string(), 17),
        ("node-2".to_string(), 88),
    ]);

    // When: the fan-out result is registered together with its metadata
    let task = registry
        .register_with_metadata(
            fanout_uids,
            Some("test-index".to_string()),
            Some("documentAdditionOrUpdate".to_string()),
        )
        .expect("registration succeeds");

    // Then: the mtask holds one entry per node, each with the right task UID
    assert_eq!(task.node_tasks.len(), 3);
    assert_eq!(task.node_tasks.get("node-0").unwrap().task_uid, 42);
    assert_eq!(task.node_tasks.get("node-1").unwrap().task_uid, 17);
    assert_eq!(task.node_tasks.get("node-2").unwrap().task_uid, 88);

    // And: the generated ID carries the mtask prefix
    assert!(task.miroir_id.starts_with("mtask-"));

    // And: the same task can be looked up again by that ID
    let lookup = registry.get(&task.miroir_id).expect("get succeeds");
    assert!(lookup.is_some());
    let stored = lookup.unwrap();
    assert_eq!(stored.miroir_id, task.miroir_id);
    assert_eq!(stored.node_tasks.len(), 3);
}
#[tokio::test(flavor = "multi_thread")]
async fn acceptance_2_get_task_while_processing_returns_processing() {
    // Given: a registry holding one task fanned out to three nodes
    let registry = InMemoryTaskRegistry::new();
    let fanout_uids = HashMap::from([
        ("node-0".to_string(), 42),
        ("node-1".to_string(), 17),
        ("node-2".to_string(), 88),
    ]);
    let task = registry
        .register_with_metadata(
            fanout_uids,
            Some("test-index".to_string()),
            Some("documentAdditionOrUpdate".to_string()),
        )
        .expect("registration succeeds");

    // When: every node reports a non-terminal Processing status
    for (node, context) in [
        ("node-0", "update node-0"),
        ("node-1", "update node-1"),
        ("node-2", "update node-2"),
    ] {
        registry
            .update_node_task(&task.miroir_id, node, NodeTaskStatus::Processing)
            .await
            .expect(context);
    }

    // And: the overall status is moved to Processing (which sets started_at)
    registry
        .update_status(&task.miroir_id, TaskStatus::Processing)
        .await
        .expect("update succeeds");

    // Then: looking the task up reports "processing" with a start timestamp
    let stored = registry
        .get(&task.miroir_id)
        .expect("get succeeds")
        .expect("task exists");
    assert_eq!(stored.status, TaskStatus::Processing);
    assert!(stored.started_at.is_some());
}
#[tokio::test(flavor = "multi_thread")]
async fn acceptance_3_one_node_fails_returns_failed_with_breakdown() {
    // Given: a registry holding one task fanned out to three nodes
    let registry = InMemoryTaskRegistry::new();
    let fanout_uids = HashMap::from([
        ("node-0".to_string(), 42),
        ("node-1".to_string(), 17),
        ("node-2".to_string(), 88),
    ]);
    let task = registry
        .register_with_metadata(
            fanout_uids,
            Some("test-index".to_string()),
            Some("documentAdditionOrUpdate".to_string()),
        )
        .expect("registration succeeds");

    // When: node-1 fails while the other two nodes succeed
    for (node, outcome, context) in [
        ("node-0", NodeTaskStatus::Succeeded, "update node-0"),
        ("node-1", NodeTaskStatus::Failed, "update node-1"),
        ("node-2", NodeTaskStatus::Succeeded, "update node-2"),
    ] {
        registry
            .update_node_task(&task.miroir_id, node, outcome)
            .await
            .expect(context);
    }

    // And: the overall status is re-derived from the per-node results
    registry
        .update_overall_status(&task.miroir_id)
        .await
        .expect("update succeeds");

    // Then: the aggregated status is "failed"
    // NOTE(review): the acceptance criterion also mentions a per-node error
    // breakdown, which this test does not assert — consider adding a check.
    let stored = registry
        .get(&task.miroir_id)
        .expect("get succeeds")
        .expect("task exists");
    assert_eq!(stored.status, TaskStatus::Failed);
}
#[tokio::test(flavor = "multi_thread")]
async fn acceptance_4_in_memory_registry_survives_request_lifetime() {
    // Given: a registry that outlives the simulated request below
    let registry = InMemoryTaskRegistry::new();

    // When: a task is registered inside a scope standing in for one request;
    // only its ID escapes the scope
    let task_id = {
        let request_uids = HashMap::from([("node-0".to_string(), 42)]);
        let registered = registry
            .register_with_metadata(
                request_uids,
                Some("test-index".to_string()),
                Some("documentAdditionOrUpdate".to_string()),
            )
            .expect("registration succeeds");
        registered.miroir_id
    };

    // Then: the task is still retrievable once the "request" scope has ended
    let stored = registry
        .get(&task_id)
        .expect("get succeeds")
        .expect("task exists");
    assert_eq!(stored.miroir_id, task_id);
    assert_eq!(stored.status, TaskStatus::Enqueued);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_all_nodes_succeeded_returns_succeeded() {
    // Given: one task fanned out to three nodes
    let registry = InMemoryTaskRegistry::new();
    let fanout_uids = HashMap::from([
        ("node-0".to_string(), 42),
        ("node-1".to_string(), 17),
        ("node-2".to_string(), 88),
    ]);
    let task = registry
        .register_with_metadata(
            fanout_uids,
            Some("test-index".to_string()),
            Some("documentAdditionOrUpdate".to_string()),
        )
        .expect("registration succeeds");

    // When: every node reports success
    for (node, context) in [
        ("node-0", "update node-0"),
        ("node-1", "update node-1"),
        ("node-2", "update node-2"),
    ] {
        registry
            .update_node_task(&task.miroir_id, node, NodeTaskStatus::Succeeded)
            .await
            .expect(context);
    }

    // And: the overall status is set to Succeeded (which sets finished_at)
    registry
        .update_status(&task.miroir_id, TaskStatus::Succeeded)
        .await
        .expect("update succeeds");

    // Then: the stored task is "succeeded" and has a finish timestamp
    let stored = registry
        .get(&task.miroir_id)
        .expect("get succeeds")
        .expect("task exists");
    assert_eq!(stored.status, TaskStatus::Succeeded);
    assert!(stored.finished_at.is_some());
}
#[tokio::test(flavor = "multi_thread")]
async fn test_task_list_with_filters() {
    // Given: two tasks that differ in index, task type and (later) status
    let registry = InMemoryTaskRegistry::new();

    let task1 = registry
        .register_with_metadata(
            HashMap::from([("node-0".to_string(), 1)]),
            Some("index-a".to_string()),
            Some("documentAdditionOrUpdate".to_string()),
        )
        .expect("registration succeeds");

    let task2 = registry
        .register_with_metadata(
            HashMap::from([("node-1".to_string(), 2)]),
            Some("index-b".to_string()),
            Some("documentDeletion".to_string()),
        )
        .expect("registration succeeds");

    // Only task1 is marked as succeeded
    registry
        .update_status(&task1.miroir_id, TaskStatus::Succeeded)
        .await
        .expect("update status");

    // Base filter with every criterion unset; each case overrides one field.
    let match_all = || TaskFilter {
        status: None,
        node_id: None,
        index_uid: None,
        task_type: None,
        limit: None,
        offset: None,
    };

    // When: filtering by status, only task1 matches
    let by_status = TaskFilter {
        status: Some(TaskStatus::Succeeded),
        ..match_all()
    };
    let found = registry.list(by_status).expect("list succeeds");
    assert_eq!(found.len(), 1);
    assert_eq!(found[0].miroir_id, task1.miroir_id);

    // When: filtering by index_uid, only task2 matches
    let by_index = TaskFilter {
        index_uid: Some("index-b".to_string()),
        ..match_all()
    };
    let found = registry.list(by_index).expect("list succeeds");
    assert_eq!(found.len(), 1);
    assert_eq!(found[0].miroir_id, task2.miroir_id);

    // When: filtering by task_type, only task2 matches
    let by_type = TaskFilter {
        task_type: Some("documentDeletion".to_string()),
        ..match_all()
    };
    let found = registry.list(by_type).expect("list succeeds");
    assert_eq!(found.len(), 1);
    assert_eq!(found[0].miroir_id, task2.miroir_id);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_task_count() {
    // Given: a fresh registry, which starts out empty
    let registry = InMemoryTaskRegistry::new();
    assert_eq!(registry.count().await, 0);

    // When: two tasks of different types are registered for the same node
    let node_task_uids = HashMap::from([("node-0".to_string(), 42)]);
    for task_type in ["documentAdditionOrUpdate", "documentDeletion"] {
        registry
            .register_with_metadata(
                node_task_uids.clone(),
                Some("test-index".to_string()),
                Some(task_type.to_string()),
            )
            .expect("registration succeeds");
    }

    // Then: the count reflects both registrations
    assert_eq!(registry.count().await, 2);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_exponential_backoff_polling_simulation() {
    // Given: A task registry
    let registry = InMemoryTaskRegistry::new();

    // When: Register a task (spawns background polling)
    let mut node_task_uids = HashMap::new();
    node_task_uids.insert("node-0".to_string(), 42);
    node_task_uids.insert("node-1".to_string(), 17);
    node_task_uids.insert("node-2".to_string(), 88);

    let task = registry.register_with_metadata(
        node_task_uids,
        Some("test-index".to_string()),
        Some("documentAdditionOrUpdate".to_string()),
    ).expect("registration succeeds");

    // The background poller simulates task completion after ~500ms.
    // Poll for that completion against a generous deadline instead of sleeping
    // for one fixed interval: a single 800ms sleep fails spuriously when the
    // poller is delayed on a loaded machine, while polling also finishes as
    // soon as the task completes.
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    let retrieved = loop {
        let current = registry
            .get(&task.miroir_id)
            .expect("get succeeds")
            .expect("task exists");
        if current.status == TaskStatus::Succeeded || std::time::Instant::now() >= deadline {
            // Either done, or out of time: hand the last snapshot to the
            // assertions below so a timeout reports the actual status.
            break current;
        }
        sleep(Duration::from_millis(25)).await;
    };

    // Then: Task should be succeeded
    assert_eq!(retrieved.status, TaskStatus::Succeeded);
    assert!(retrieved.finished_at.is_some());
}

View file

@ -744,11 +744,19 @@ async fn main() -> anyhow::Result<()> {
///
/// On each tick it also updates the Prometheus metrics for node health,
/// shard coverage, shard distribution, and degraded shard count.
///
/// Implements unplanned node failure detection (plan §2):
/// - unhealthy_threshold consecutive failures → mark node as Failed
/// - recovery_threshold consecutive successes → recover from Failed/Degraded
async fn run_health_checker(state: admin_endpoints::AppState) {
let mut interval = tokio::time::interval(Duration::from_millis(
state.config.health.interval_ms,
));
// Track consecutive failures per node (in-memory only)
let mut consecutive_failures: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
let mut consecutive_successes: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
loop {
interval.tick().await;
@ -762,11 +770,6 @@ async fn run_health_checker(state: admin_endpoints::AppState) {
// Get current node status
let current_status = topo.node(node_id).map(|n| n.status);
// Skip nodes that are already Active/Healthy
if let Some(NodeStatus::Active) | Some(NodeStatus::Healthy) = current_status {
continue;
}
// Get node address
let node_address = match topo.node(node_id) {
Some(n) => n.address.clone(),
@ -784,6 +787,9 @@ async fn run_health_checker(state: admin_endpoints::AppState) {
Ok(c) => c,
Err(_) => {
all_healthy = false;
// Increment failure counter
*consecutive_failures.entry(node_id.clone()).or_insert(0) += 1;
consecutive_successes.remove(node_id);
continue;
}
};
@ -792,13 +798,89 @@ async fn run_health_checker(state: admin_endpoints::AppState) {
let result = client.get(&url).send().await;
if result.is_ok() && result.unwrap().status().is_success() {
// Node is reachable - promote to Active
// Node is reachable
consecutive_successes.entry(node_id.clone()).and_modify(|c| *c += 1).or_insert(1);
consecutive_failures.remove(node_id);
let successes = *consecutive_successes.get(node_id.as_str()).unwrap_or(&1);
// Check if we should promote from Joining/Degraded/Failed to Active
if let Some(node) = topo.node_mut(node_id) {
let _ = node.transition_to(NodeStatus::Active);
info!(node_id = %node_id, "node promoted to Active");
match node.status {
NodeStatus::Joining => {
// Promote joining nodes immediately on first success
let _ = node.transition_to(NodeStatus::Active);
info!(node_id = %node_id, "node promoted to Active (was Joining)");
}
NodeStatus::Degraded => {
// Need recovery_threshold consecutive successes to recover
if successes >= state.config.health.recovery_threshold {
let _ = node.transition_to(NodeStatus::Active);
info!(node_id = %node_id, "node recovered to Active (was Degraded)");
}
}
NodeStatus::Failed => {
// Need recovery_threshold consecutive successes to recover
if successes >= state.config.health.recovery_threshold {
let _ = node.transition_to(NodeStatus::Active);
info!(node_id = %node_id, "node recovered to Active (was Failed)");
// Trigger RF-restore if configured
if let Some(ref rebalancer) = state.rebalancer {
if let Some(ref worker) = state.rebalancer_worker {
let event = TopologyChangeEvent::NodeRecovered {
node_id: node_id.as_str().to_string(),
replica_group: node.replica_group,
index_uid: "default".to_string(),
};
let _ = worker.event_sender().try_send(event);
}
}
}
}
_ => {}
}
}
} else {
// Node is unreachable
all_healthy = false;
*consecutive_failures.entry(node_id.clone()).or_insert(0) += 1;
consecutive_successes.remove(node_id);
let failures = *consecutive_failures.get(node_id.as_str()).unwrap_or(&1);
// Check if we should mark as Degraded or Failed
if let Some(node) = topo.node_mut(node_id) {
match node.status {
NodeStatus::Active | NodeStatus::Healthy => {
// First failure → mark Degraded (not full failure yet)
if failures >= 1 {
let _ = node.transition_to(NodeStatus::Degraded);
warn!(node_id = %node_id, consecutive_failures = failures, "node marked Degraded");
}
}
NodeStatus::Degraded => {
// unhealthy_threshold consecutive failures → mark Failed
if failures >= state.config.health.unhealthy_threshold {
let _ = node.transition_to(NodeStatus::Failed);
warn!(node_id = %node_id, consecutive_failures = failures, "node marked Failed");
// Trigger failure handling
if let Some(ref rebalancer) = state.rebalancer {
if let Some(ref worker) = state.rebalancer_worker {
let event = TopologyChangeEvent::NodeFailed {
node_id: node_id.as_str().to_string(),
replica_group: node.replica_group,
index_uid: "default".to_string(),
};
let _ = worker.event_sender().try_send(event);
}
}
}
}
_ => {}
}
}
}
}

View file

@ -639,48 +639,6 @@ impl AppState {
None
};
Self {
config: Arc::new(config.clone()),
topology: topology_arc,
ready: Arc::new(RwLock::new(false)),
metrics: metrics.clone(),
version_state,
task_registry: Arc::new(task_registry),
redis_store,
task_store,
pod_id,
seal_key,
local_rate_limiter: LocalAdminRateLimiter::new(),
local_search_ui_rate_limiter: LocalSearchUiRateLimiter::new(),
rebalancer: Some(rebalancer),
migration_coordinator: Some(migration_coordinator),
rebalancer_worker,
rebalancer_metrics,
previous_docs_migrated: Arc::new(std::sync::atomic::AtomicU64::new(0)),
settings_broadcast,
drift_reconciler,
anti_entropy_worker,
session_manager,
alias_registry,
leader_election,
mode_c_worker,
replica_selector: {
let advanced_config = config.replica_selection.clone();
let selector_config = miroir_core::replica_selection::ReplicaSelectionConfig::from(advanced_config);
let observer = Arc::new(ReplicaSelectionMetricsObserver {
metrics: metrics.clone(),
});
Arc::new(ReplicaSelector::new_with_observer(selector_config, observer))
},
idempotency_cache: Arc::new(miroir_core::idempotency::IdempotencyCache::new(
config.idempotency.max_cached_keys as usize,
config.idempotency.ttl_seconds,
)),
query_coalescer: Arc::new(miroir_core::idempotency::QueryCoalescer::new(
config.query_coalescing.window_ms,
config.query_coalescing.max_pending_queries as usize,
config.query_coalescing.max_subscribers as usize,
)),
// Create group addition coordinator (needed for both API and sync worker)
let group_addition_coordinator = if has_task_store {
Some(Arc::new(RwLock::new(

View file

@ -742,7 +742,7 @@ async fn integration_session_pin_with_scatter_plan() {
}
// Plan scatter for pinned group 1
let plan = plan_search_scatter_for_group(&topo, 0, 2, 4, 1);
let plan = plan_search_scatter_for_group(&topo, 0, 2, 4, 1, None).await;
assert!(plan.is_some(), "Plan should be created for pinned group");
let plan = plan.unwrap();