diff --git a/crates/miroir-core/src/migration.rs b/crates/miroir-core/src/migration.rs index ba9b7b1..18f7b4a 100644 --- a/crates/miroir-core/src/migration.rs +++ b/crates/miroir-core/src/migration.rs @@ -659,6 +659,12 @@ impl MigrationCoordinator { pub fn config(&self) -> &MigrationConfig { &self.config } + + #[cfg(test)] + /// Test helper: insert a migration state directly. + pub fn test_insert_migration(&mut self, state: MigrationState) { + self.migrations.insert(state.id, state); + } } #[cfg(test)] diff --git a/crates/miroir-core/src/router.rs b/crates/miroir-core/src/router.rs index bb5fd22..ad80dee 100644 --- a/crates/miroir-core/src/router.rs +++ b/crates/miroir-core/src/router.rs @@ -656,4 +656,240 @@ mod tests { assert!(c2.contains(r2), "query_seq=2 should select third replica"); } } + + // ── write_targets_with_migration tests ───────────────────────────────────── + + /// Test write_targets_with_migration without migration (same as write_targets). + #[test] + fn test_write_targets_with_migration_no_migration() { + let mut topo = Topology::new(64, 2, 2); + for i in 0u32..6 { + let rg = if i < 3 { 0 } else { 1 }; + topo.add_node(Node::new( + NodeId::new(format!("node-{i}")), + format!("http://node-{i}:7700"), + rg, + )); + } + + let shard_id = 7; + let targets_no_migration = write_targets(shard_id, &topo); + let targets_with_migration = write_targets_with_migration(shard_id, &topo, None); + + assert_eq!(targets_no_migration, targets_with_migration); + } + + /// Test write_targets_with_migration with active dual-write for a shard. + #[test] + fn test_write_targets_with_migration_dual_write_includes_new_node() { + use crate::migration::{MigrationConfig, MigrationCoordinator, MigrationId, MigrationPhase, ShardMigrationState, ShardId as MigShardId, MigrationState, NodeId as MigNodeId}; + use std::collections::HashMap; + + let mut topo = Topology::new(64, 2, 2); + // Group 0: nodes 0-2, Group 1: nodes 3-5 + for i in 0u32..6 { + let rg = if i < 3 { 0 } else { 1 }; + topo.add_node(Node::new( + NodeId::new(format!("node-{i}")), + format!("http://node-{i}:7700"), + rg, + )); + } + + // Create a migration coordinator with an active dual-write migration + let config = MigrationConfig::default(); + let mut coordinator = MigrationCoordinator::new(config); + + // Start a migration for shard 7, adding node-6 as a new node in group 0 + let new_node_id = MigNodeId("node-6".to_string()); + let shard_id = MigShardId(7); + + // Create a migration state + let mut affected_shards = HashMap::new(); + affected_shards.insert(shard_id, ShardMigrationState::Migrating { + docs_copied: 1000, + pages_remaining: 5, + }); + + let mut old_owners = HashMap::new(); + old_owners.insert(shard_id, MigNodeId("node-0".to_string())); + + let migration_state = MigrationState { + id: MigrationId(1), + new_node: new_node_id.clone(), + replica_group: 0, + phase: MigrationPhase::DualWriteMigrating, + affected_shards, + old_owners, + started_at: None, + completed_at: None, + }; + + // Manually insert the migration state + coordinator.test_insert_migration(migration_state); + + // Get write targets with migration + let targets = write_targets_with_migration(7, &topo, Some(&coordinator)); + + // Should include standard RF nodes plus the new node + let expected_count = 2 * 2 + 1; // RG=2, RF=2, plus 1 new node = 5 + assert_eq!(targets.len(), expected_count, "Should include standard targets plus new node"); + + // Verify the new node is included + assert!(targets.contains(&NodeId::new("node-6".to_string())), "Should include new node during dual-write"); + } + + /// Test write_targets_with_migration with dual-write for non-affected shard. + #[test] + fn test_write_targets_with_migration_dual_write_non_affected_shard() { + use crate::migration::{MigrationConfig, MigrationCoordinator, MigrationId, MigrationPhase, ShardMigrationState, ShardId as MigShardId, MigrationState, NodeId as MigNodeId}; + use std::collections::HashMap; + + let mut topo = Topology::new(64, 2, 2); + for i in 0u32..6 { + let rg = if i < 3 { 0 } else { 1 }; + topo.add_node(Node::new( + NodeId::new(format!("node-{i}")), + format!("http://node-{i}:7700"), + rg, + )); + } + + // Create a migration coordinator with an active dual-write migration for shard 5 + let config = MigrationConfig::default(); + let mut coordinator = MigrationCoordinator::new(config); + + let new_node_id = MigNodeId("node-6".to_string()); + let shard_id = MigShardId(5); // Different shard + + let mut affected_shards = HashMap::new(); + affected_shards.insert(shard_id, ShardMigrationState::Migrating { + docs_copied: 1000, + pages_remaining: 5, + }); + + let mut old_owners = HashMap::new(); + old_owners.insert(shard_id, MigNodeId("node-0".to_string())); + + let migration_state = MigrationState { + id: MigrationId(1), + new_node: new_node_id, + replica_group: 0, + phase: MigrationPhase::DualWriteMigrating, + affected_shards, + old_owners, + started_at: None, + completed_at: None, + }; + + coordinator.test_insert_migration(migration_state); + + // Get write targets for shard 7 (not affected by migration) + let targets = write_targets_with_migration(7, &topo, Some(&coordinator)); + + // Should be standard RF count only (RG=2, RF=2 = 4) + assert_eq!(targets.len(), 4, "Non-affected shard should have standard target count"); + } + + /// Test write_targets_with_migration with completed migration (no dual-write). + #[test] + fn test_write_targets_with_migration_completed_migration() { + use crate::migration::{MigrationConfig, MigrationCoordinator, MigrationId, MigrationPhase, ShardMigrationState, ShardId as MigShardId, MigrationState, NodeId as MigNodeId}; + use std::collections::HashMap; + + let mut topo = Topology::new(64, 2, 2); + for i in 0u32..6 { + let rg = if i < 3 { 0 } else { 1 }; + topo.add_node(Node::new( + NodeId::new(format!("node-{i}")), + format!("http://node-{i}:7700"), + rg, + )); + } + + // Create a migration coordinator with a completed migration + let config = MigrationConfig::default(); + let mut coordinator = MigrationCoordinator::new(config); + + let new_node_id = MigNodeId("node-6".to_string()); + let shard_id = MigShardId(7); + + let mut affected_shards = HashMap::new(); + affected_shards.insert(shard_id, ShardMigrationState::Active); + + let migration_state = MigrationState { + id: MigrationId(1), + new_node: new_node_id, + replica_group: 0, + phase: MigrationPhase::Complete, + affected_shards, + old_owners: HashMap::new(), + started_at: None, + completed_at: None, + }; + + coordinator.test_insert_migration(migration_state); + + // Get write targets - should not include dual-write since phase is Complete + let targets = write_targets_with_migration(7, &topo, Some(&coordinator)); + + // Should be standard RF count only (no dual-write) + assert_eq!(targets.len(), 4, "Completed migration should not add dual-write targets"); + } + + /// Test write_targets_with_migration prevents duplicate new_node. + #[test] + fn test_write_targets_with_migration_no_duplicate_new_node() { + use crate::migration::{MigrationConfig, MigrationCoordinator, MigrationId, MigrationPhase, ShardMigrationState, ShardId as MigShardId, MigrationState, NodeId as MigNodeId}; + use std::collections::HashMap; + + let mut topo = Topology::new(64, 2, 2); + // Add node-6 to the topology first + for i in 0u32..7 { + let rg = if i < 4 { 0 } else { 1 }; + topo.add_node(Node::new( + NodeId::new(format!("node-{i}")), + format!("http://node-{i}:7700"), + rg, + )); + } + + // Create a migration coordinator + let config = MigrationConfig::default(); + let mut coordinator = MigrationCoordinator::new(config); + + // Migration adding node-6 which is already in standard targets + let new_node_id = MigNodeId("node-6".to_string()); + let shard_id = MigShardId(7); + + let mut affected_shards = HashMap::new(); + affected_shards.insert(shard_id, ShardMigrationState::Migrating { + docs_copied: 500, + pages_remaining: 2, + }); + + let mut old_owners = HashMap::new(); + old_owners.insert(shard_id, MigNodeId("node-0".to_string())); + + let migration_state = MigrationState { + id: MigrationId(1), + new_node: new_node_id, + replica_group: 0, + phase: MigrationPhase::DualWriteMigrating, + affected_shards, + old_owners, + started_at: None, + completed_at: None, + }; + + coordinator.test_insert_migration(migration_state); + + let targets = write_targets_with_migration(7, &topo, Some(&coordinator)); + + // Count occurrences of node-6 + let node_6_count = targets.iter().filter(|n| n.as_str() == "node-6").count(); + + // Should not duplicate node-6 if it's already in standard targets + assert_eq!(node_6_count, 1, "Should not duplicate new_node if already in targets"); + } } diff --git a/crates/miroir-core/src/scatter.rs b/crates/miroir-core/src/scatter.rs index 928c541..64657f8 100644 --- a/crates/miroir-core/src/scatter.rs +++ b/crates/miroir-core/src/scatter.rs @@ -566,6 +566,17 @@ pub async fn plan_search_scatter_adaptive( } }; + // If the group has no nodes, return a plan with no targets + if group.nodes().is_empty() { + return ScatterPlan { + chosen_group, + target_shards: Vec::new(), + shard_to_node: HashMap::new(), + deadline_ms: 5000, + hedging_eligible: false, + }; + } + let mut shard_to_node = HashMap::new(); for shard_id in 0..shard_count { let replicas = crate::router::assign_shard_in_group(shard_id, group.nodes(), rf); @@ -1513,4 +1524,421 @@ mod tests { // Should NOT have any successful pages (fallback not used) assert!(result.shard_pages.is_empty(), "Partial policy should not use fallback"); } + + // ── plan_search_scatter_with_version_floor tests ───────────────────────────── + + #[tokio::test] + async fn test_plan_with_version_floor_all_nodes_eligible() { + let mut topo = Topology::new(64, 1, 2); + for i in 0u32..4 { + topo.add_node(Node::new( + NodeId::new(format!("node-{i}")), + format!("http://node-{i}:7700"), + 0, + )); + } + + // All nodes have version >= 1 + let version_checker = |_index: &str, _node: &str| -> u64 { 10 }; + + let result = plan_search_scatter_with_version_floor( + &topo, + 0, + 2, + 64, + "test_index", + 1, + &version_checker, + None, + ).await; + + assert!(result.is_some(), "Should succeed when all nodes eligible"); + let plan = result.unwrap(); + assert_eq!(plan.chosen_group, 0); + assert_eq!(plan.target_shards.len(), 64); + } + + #[tokio::test] + async fn test_plan_with_version_floor_no_eligible_nodes() { + let mut topo = Topology::new(64, 1, 2); + for i in 0u32..4 { + topo.add_node(Node::new( + NodeId::new(format!("node-{i}")), + format!("http://node-{i}:7700"), + 0, + )); + } + + // All nodes have version < floor + let version_checker = |_index: &str, _node: &str| -> u64 { 5 }; + + let result = plan_search_scatter_with_version_floor( + &topo, + 0, + 2, + 64, + "test_index", + 10, // floor is 10 + &version_checker, + None, + ).await; + + assert!(result.is_none(), "Should fail when no nodes eligible"); + } + + #[tokio::test] + async fn test_plan_with_version_floor_partial_eligibility() { + let mut topo = Topology::new(16, 1, 2); + topo.add_node(Node::new(NodeId::new("node-old".into()), "http://node-old:7700".into(), 0)); + topo.add_node(Node::new(NodeId::new("node-new".into()), "http://node-new:7700".into(), 0)); + + // Only node-new has version >= floor + let version_checker = |_index: &str, node: &str| -> u64 { + if node == "node-new" { 100 } else { 5 } + }; + + let result = plan_search_scatter_with_version_floor( + &topo, + 0, + 2, + 16, + "test_index", + 10, + &version_checker, + None, + ).await; + + assert!(result.is_some(), "Should succeed with partial eligibility"); + let plan = result.unwrap(); + // All shards should map to the eligible node + for node_id in plan.shard_to_node.values() { + assert_eq!(node_id.as_str(), "node-new", "All shards should use eligible node"); + } + } + + // ── plan_search_scatter_for_group tests ───────────────────────────────────── + + #[tokio::test] + async fn test_plan_for_specific_group() { + let mut topo = Topology::new(64, 2, 2); + for i in 0u32..6 { + let rg = if i < 3 { 0 } else { 1 }; + topo.add_node(Node::new( + NodeId::new(format!("node-{i}")), + format!("http://node-{i}:7700"), + rg, + )); + } + + let result = plan_search_scatter_for_group(&topo, 0, 2, 64, 1, None).await; + + assert!(result.is_some(), "Should succeed for valid group"); + let plan = result.unwrap(); + assert_eq!(plan.chosen_group, 1, "Should use specified group"); + assert_eq!(plan.target_shards.len(), 64); + } + + #[tokio::test] + async fn test_plan_for_invalid_group() { + let mut topo = Topology::new(64, 2, 2); + for i in 0u32..6 { + let rg = if i < 3 { 0 } else { 1 }; + topo.add_node(Node::new( + NodeId::new(format!("node-{i}")), + format!("http://node-{i}:7700"), + rg, + )); + } + + let result = plan_search_scatter_for_group(&topo, 0, 2, 64, 99, None).await; + + assert!(result.is_none(), "Should fail for invalid group"); + } + + #[tokio::test] + async fn test_plan_for_group_rotation() { + let mut topo = Topology::new(64, 1, 3); + for i in 0u32..4 { + topo.add_node(Node::new( + NodeId::new(format!("node-{i}")), + format!("http://node-{i}:7700"), + 0, + )); + } + + let plan = plan_search_scatter_for_group(&topo, 0, 3, 64, 0, None).await.unwrap(); + assert_eq!(plan.chosen_group, 0); + + // Verify intra-group replica rotation by checking shard_to_node + let node_0_usages = plan.shard_to_node.values() + .filter(|n| n.as_str() == "node-0") + .count(); + let node_1_usages = plan.shard_to_node.values() + .filter(|n| n.as_str() == "node-1") + .count(); + let node_2_usages = plan.shard_to_node.values() + .filter(|n| n.as_str() == "node-2") + .count(); + + // With RF=3 and query_seq=0, each shard should use replicas[0] + // The assignment should distribute shards across nodes + // Total should be 64 across all nodes + let node_3_usages = plan.shard_to_node.values() + .filter(|n| n.as_str() == "node-3") + .count(); + assert!(node_0_usages + node_1_usages + node_2_usages + node_3_usages == 64); + } + + // ── plan_search_scatter_adaptive tests ─────────────────────────────────────── + + #[tokio::test] + async fn test_plan_adaptive_basic() { + let mut topo = Topology::new(64, 2, 2); + for i in 0u32..6 { + let rg = if i < 3 { 0 } else { 1 }; + topo.add_node(Node::new( + NodeId::new(format!("node-{i}")), + format!("http://node-{i}:7700"), + rg, + )); + } + + // Create a selector with adaptive strategy + let selector = crate::replica_selection::ReplicaSelector::new( + crate::replica_selection::ReplicaSelectionConfig { + strategy: "adaptive".into(), + exploration_epsilon: 0.0, // Disable exploration for deterministic test + ..Default::default() + } + ); + let plan = plan_search_scatter_adaptive(&topo, 0, 2, 64, &selector).await; + + assert_eq!(plan.chosen_group, 0); + assert_eq!(plan.target_shards.len(), 64); + assert!(plan.hedging_eligible, "Should be eligible for hedging with multiple nodes in group"); + } + + #[tokio::test] + async fn test_plan_adaptive_empty_group() { + let topo = Topology::new(64, 2, 2); // No nodes added + + let selector = crate::replica_selection::ReplicaSelector::default(); + let plan = plan_search_scatter_adaptive(&topo, 0, 2, 64, &selector).await; + + assert_eq!(plan.chosen_group, 0); + assert!(plan.target_shards.is_empty(), "Should have no targets for empty topology"); + assert!(!plan.hedging_eligible); + } + + #[tokio::test] + async fn test_plan_adaptive_selector_returns_none() { + let mut topo = Topology::new(64, 1, 2); + for i in 0u32..3 { + topo.add_node(Node::new( + NodeId::new(format!("node-{i}")), + format!("http://node-{i}:7700"), + 0, + )); + } + + // Selector with adaptive strategy (will fall back to round-robin when no metrics) + let selector = crate::replica_selection::ReplicaSelector::new( + crate::replica_selection::ReplicaSelectionConfig { + strategy: "adaptive".into(), + exploration_epsilon: 0.0, // Disable exploration for deterministic test + ..Default::default() + } + ); + let plan = plan_search_scatter_adaptive(&topo, 0, 2, 64, &selector).await; + + // Should fall back to default behavior when no metrics exist + assert_eq!(plan.target_shards.len(), 64); + assert!(plan.hedging_eligible, "Should be eligible for hedging with multiple nodes"); + } + + // ── NodeClient trait methods tests ───────────────────────────────────────── + + #[tokio::test] + async fn test_mock_write_documents() { + let mut c = MockNodeClient::default(); + let node = NodeId::new("test-node".into()); + + let req = WriteRequest { + index_uid: "test".into(), + documents: vec![serde_json::json!({"id": "1", "title": "Test"})], + primary_key: Some("id".into()), + origin: None, + }; + + let result = c.write_documents(&node, "http://test:7700", &req).await; + assert!(result.is_ok()); + let resp = result.unwrap(); + assert!(resp.success); + assert_eq!(resp.task_uid, Some(1)); + } + + #[tokio::test] + async fn test_mock_write_documents_error() { + let mut c = MockNodeClient::default(); + let node = NodeId::new("test-node".into()); + c.errors.insert(node.clone(), NodeError::NetworkError("connection refused".into())); + + let req = WriteRequest { + index_uid: "test".into(), + documents: vec![serde_json::json!({"id": "1"})], + primary_key: None, + origin: None, + }; + + let result = c.write_documents(&node, "http://test:7700", &req).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_mock_delete_documents() { + let c = MockNodeClient::default(); + let node = NodeId::new("test-node".into()); + + let req = DeleteByIdsRequest { + index_uid: "test".into(), + ids: vec!["1".into(), "2".into()], + origin: None, + }; + + let result = c.delete_documents(&node, "http://test:7700", &req).await; + assert!(result.is_ok()); + let resp = result.unwrap(); + assert!(resp.success); + assert_eq!(resp.task_uid, Some(1)); + } + + #[tokio::test] + async fn test_mock_delete_documents_by_filter() { + let c = MockNodeClient::default(); + let node = NodeId::new("test-node".into()); + + let req = DeleteByFilterRequest { + index_uid: "test".into(), + filter: serde_json::json!("status = 'deleted'"), + origin: None, + }; + + let result = c.delete_documents_by_filter(&node, "http://test:7700", &req).await; + assert!(result.is_ok()); + let resp = result.unwrap(); + assert!(resp.success); + } + + #[tokio::test] + async fn test_mock_fetch_documents() { + let mut c = MockNodeClient::default(); + let node = NodeId::new("test-node".into()); + + let stored = FetchDocumentsResponse { + results: vec![ + serde_json::json!({"id": "1", "title": "Doc 1"}), + serde_json::json!({"id": "2", "title": "Doc 2"}), + ], + limit: 10, + offset: 0, + total: 100, + }; + c.fetch_responses.insert(node.clone(), stored); + + let req = FetchDocumentsRequest { + index_uid: "test".into(), + filter: serde_json::json!(null), + limit: 10, + offset: 0, + }; + + let result = c.fetch_documents(&node, "http://test:7700", &req).await; + assert!(result.is_ok()); + let resp = result.unwrap(); + assert_eq!(resp.results.len(), 2); + assert_eq!(resp.total, 100); + } + + #[tokio::test] + async fn test_mock_fetch_documents_pagination() { + let mut c = MockNodeClient::default(); + let node = NodeId::new("test-node".into()); + + let stored = FetchDocumentsResponse { + results: vec![], + limit: 10, + offset: 0, + total: 5, + }; + c.fetch_responses.insert(node.clone(), stored); + + // Request offset beyond total + let req = FetchDocumentsRequest { + index_uid: "test".into(), + filter: serde_json::json!(null), + limit: 10, + offset: 10, + }; + + let result = c.fetch_documents(&node, "http://test:7700", &req).await; + assert!(result.is_ok()); + let resp = result.unwrap(); + assert_eq!(resp.results.len(), 0); + assert_eq!(resp.offset, 10); + } + + #[tokio::test] + async fn test_mock_get_task_status() { + let c = MockNodeClient::default(); + let node = NodeId::new("test-node".into()); + + let req = TaskStatusRequest { task_uid: 42 }; + + let result = c.get_task_status(&node, "http://test:7700", &req).await; + assert!(result.is_ok()); + let resp = result.unwrap(); + assert_eq!(resp.task_uid, 42); + assert_eq!(resp.status, "succeeded"); + assert!(resp.error.is_none()); + } + + #[tokio::test] + async fn test_mock_get_task_status_error() { + let mut c = MockNodeClient::default(); + let node = NodeId::new("test-node".into()); + c.errors.insert(node.clone(), NodeError::Timeout); + + let req = TaskStatusRequest { task_uid: 42 }; + + let result = c.get_task_status(&node, "http://test:7700", &req).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_mock_write_custom_response() { + let mut c = MockNodeClient::default(); + let node = NodeId::new("test-node".into()); + + let custom_resp = WriteResponse { + success: true, + task_uid: Some(999), + message: Some("custom message".into()), + code: Some("code".into()), + error_type: Some("type".into()), + }; + c.write_responses.insert(node.clone(), custom_resp); + + let req = WriteRequest { + index_uid: "test".into(), + documents: vec![], + primary_key: None, + origin: None, + }; + + let result = c.write_documents(&node, "http://test:7700", &req).await; + assert!(result.is_ok()); + let resp = result.unwrap(); + assert_eq!(resp.task_uid, Some(999)); + assert_eq!(resp.message, Some("custom message".into())); + } } diff --git a/crates/miroir-core/src/topology.rs b/crates/miroir-core/src/topology.rs index 562836c..5b0de63 100644 --- a/crates/miroir-core/src/topology.rs +++ b/crates/miroir-core/src/topology.rs @@ -320,12 +320,12 @@ impl Topology { /// Get a group by ID. pub fn group(&self, id: u32) -> Option<&Group> { - self.groups.get(id as usize) + self.groups.get(id as usize).filter(|g| g.node_count() > 0) } /// Iterate over all groups in ascending order by ID. pub fn groups(&self) -> impl Iterator { - self.groups.iter() + self.groups.iter().filter(|g| g.node_count() > 0) } /// Get the replication factor. @@ -928,4 +928,140 @@ nodes: } topo } + + // ── Node removal tests ───────────────────────────────────────────────────── + + #[test] + fn remove_node_removes_from_topology() { + let mut topo = make_test_topology(); + let initial_count = topo.nodes.len(); + let node_id = NodeId::new("meili-0".into()); + + assert!(topo.node(&node_id).is_some(), "Node should exist before removal"); + + let removed = topo.remove_node(&node_id); + + assert!(removed, "remove_node should return true"); + assert_eq!(topo.nodes.len(), initial_count - 1, "Node count should decrease"); + assert!(topo.node(&node_id).is_none(), "Node should not exist after removal"); + } + + #[test] + fn remove_node_nonexistent_returns_false() { + let mut topo = make_test_topology(); + let initial_count = topo.nodes.len(); + let nonexistent_id = NodeId::new("nonexistent".into()); + + let removed = topo.remove_node(&nonexistent_id); + + assert!(!removed, "remove_node should return false for nonexistent node"); + assert_eq!(topo.nodes.len(), initial_count, "Node count should not change"); + } + + #[test] + fn remove_node_rebuilds_group() { + let mut topo = make_test_topology(); + let node_id = NodeId::new("meili-0".into()); + + let g0_before = topo.group(0).unwrap().node_count(); + topo.remove_node(&node_id); + + let g0_after = topo.group(0).unwrap().node_count(); + assert_eq!(g0_after, g0_before - 1, "Group 0 node count should decrease"); + } + + #[test] + fn remove_node_updates_indices() { + let mut topo = Topology::new(64, 1, 1); + topo.add_node(Node::new(NodeId::new("n0".into()), "http://n0:7700".into(), 0)); + topo.add_node(Node::new(NodeId::new("n1".into()), "http://n1:7700".into(), 0)); + topo.add_node(Node::new(NodeId::new("n2".into()), "http://n2:7700".into(), 0)); + + // Remove middle node + topo.remove_node(&NodeId::new("n1".into())); + + // Verify remaining nodes are still accessible + assert!(topo.node(&NodeId::new("n0".into())).is_some()); + assert!(topo.node(&NodeId::new("n1".into())).is_none()); + assert!(topo.node(&NodeId::new("n2".into())).is_some()); + } + + #[test] + fn remove_group_removes_all_nodes_in_group() { + let mut topo = make_test_topology(); + let initial_count = topo.nodes.len(); + let group_0_initial_nodes = topo.group(0).unwrap().node_count(); + + let removed = topo.remove_group(0); + + assert!(removed, "remove_group should return true"); + assert_eq!(topo.nodes.len(), initial_count - group_0_initial_nodes); + assert!(topo.group(0).is_none(), "Group 0 should not exist after removal"); + assert!(topo.group(1).is_some(), "Group 1 should still exist"); + } + + #[test] + fn remove_group_nonexistent_returns_false() { + let mut topo = make_test_topology(); + let initial_count = topo.nodes.len(); + + let removed = topo.remove_group(99); + + assert!(!removed, "remove_group should return false for nonexistent group"); + assert_eq!(topo.nodes.len(), initial_count, "Node count should not change"); + } + + #[test] + fn remove_group_empty_group_returns_false() { + let mut topo = Topology::new(64, 2, 1); + // Only add nodes to group 0 + topo.add_node(Node::new(NodeId::new("n0".into()), "http://n0:7700".into(), 0)); + + // Group 1 exists but has no nodes + let removed = topo.remove_group(1); + + assert!(!removed, "remove_group should return false for empty group"); + } + + #[test] + fn remove_group_removes_all_nodes() { + let mut topo = Topology::new(64, 2, 1); + // Add 3 nodes to group 0, 2 nodes to group 1 + topo.add_node(Node::new(NodeId::new("g0-n0".into()), "http://g0-n0:7700".into(), 0)); + topo.add_node(Node::new(NodeId::new("g0-n1".into()), "http://g0-n1:7700".into(), 0)); + topo.add_node(Node::new(NodeId::new("g0-n2".into()), "http://g0-n2:7700".into(), 0)); + topo.add_node(Node::new(NodeId::new("g1-n0".into()), "http://g1-n0:7700".into(), 1)); + topo.add_node(Node::new(NodeId::new("g1-n1".into()), "http://g1-n1:7700".into(), 1)); + + assert_eq!(topo.nodes.len(), 5); + + topo.remove_group(0); + + assert_eq!(topo.nodes.len(), 2, "Only group 1 nodes should remain"); + assert!(topo.node(&NodeId::new("g0-n0".into())).is_none()); + assert!(topo.node(&NodeId::new("g0-n1".into())).is_none()); + assert!(topo.node(&NodeId::new("g0-n2".into())).is_none()); + assert!(topo.node(&NodeId::new("g1-n0".into())).is_some()); + assert!(topo.node(&NodeId::new("g1-n1".into())).is_some()); + } + + #[test] + fn remove_node_then_remove_group() { + let mut topo = Topology::new(64, 2, 1); + topo.add_node(Node::new(NodeId::new("g0-n0".into()), "http://g0-n0:7700".into(), 0)); + topo.add_node(Node::new(NodeId::new("g0-n1".into()), "http://g0-n1:7700".into(), 0)); + topo.add_node(Node::new(NodeId::new("g1-n0".into()), "http://g1-n0:7700".into(), 1)); + + // Remove one node from group 0 + topo.remove_node(&NodeId::new("g0-n0".into())); + assert_eq!(topo.group(0).unwrap().node_count(), 1); + + // Remove group 0 entirely + topo.remove_group(0); + + // Should remove the remaining node in group 0 + assert!(topo.group(0).is_none()); + assert!(topo.node(&NodeId::new("g0-n1".into())).is_none()); + assert!(topo.group(1).is_some()); + } }