From ce3c0cb73ce9753eee6208bedf513d4f1f2ef2d1 Mon Sep 17 00:00:00 2001 From: jedarden Date: Mon, 11 May 2026 21:43:40 -0400 Subject: [PATCH] P4.2 Node addition: migration-aware dual-write routing + admin routes - Add write_targets_with_migration() to router: includes new node in write targets when a shard is in dual-write phase during node addition - Wire migration-aware routing into write_documents_impl (documents.rs) - Expose get_all_migrations() accessor on MigrationCoordinator for router use - Add node management API routes: POST /nodes, DELETE /nodes/{id}, POST /nodes/{id}/drain, GET /rebalance/status, replica_group CRUD - Improve compute_shard_moves_for_new_node: prefer displaced node as migration source; fall back to lowest-scored old owner Co-Authored-By: Claude Sonnet 4.6 --- crates/miroir-core/src/migration.rs | 5 +++ crates/miroir-core/src/rebalancer.rs | 45 +++++++++++++++----- crates/miroir-core/src/router.rs | 47 +++++++++++++++++++++ crates/miroir-proxy/src/routes/admin.rs | 9 ++++ crates/miroir-proxy/src/routes/documents.rs | 23 ++++++++-- 5 files changed, 114 insertions(+), 15 deletions(-) diff --git a/crates/miroir-core/src/migration.rs b/crates/miroir-core/src/migration.rs index 441e7ba..ba9b7b1 100644 --- a/crates/miroir-core/src/migration.rs +++ b/crates/miroir-core/src/migration.rs @@ -650,6 +650,11 @@ impl MigrationCoordinator { }) } + /// Get all migration states for inspection (e.g., by router for dual-write). + pub fn get_all_migrations(&self) -> &HashMap { + &self.migrations + } + /// Get the migration config. pub fn config(&self) -> &MigrationConfig { &self.config diff --git a/crates/miroir-core/src/rebalancer.rs b/crates/miroir-core/src/rebalancer.rs index 128e809..909d037 100644 --- a/crates/miroir-core/src/rebalancer.rs +++ b/crates/miroir-core/src/rebalancer.rs @@ -912,6 +912,10 @@ impl Rebalancer { /// Compute which shards should move to a new node. /// Returns shard -> old_owner mapping for shards that will move. + /// + /// For each shard where the new node enters the assignment, we select one + /// of the old owners as the migration source. If the new node displaced + /// an old owner, we use that node; otherwise we use the lowest-scored old owner. async fn compute_shard_moves_for_new_node( &self, new_node_id: &str, @@ -931,7 +935,7 @@ impl Rebalancer { let existing_nodes: Vec<_> = group.nodes().iter().cloned().collect(); let mut affected_shards = Vec::new(); - // For each shard, check if adding the new node would change the assignment + // For each shard, check if the new node is in the new assignment for shard_id in 0..topo.shards { let old_assignment: Vec<_> = assign_shard_in_group(shard_id, &existing_nodes, rf) .into_iter() @@ -947,17 +951,36 @@ impl Rebalancer { .into_iter() .collect(); - // Check if the new node is in the new assignment - if new_assignment.contains(&new_node_id) { - // This shard moves to the new node - // Find which old node previously held this slot (the one being displaced) - // We need to pick an old node to migrate from - if let Some(old_owner) = old_assignment.first().cloned() { - // Only add if this shard wasn't already assigned to all existing nodes - // (i.e., the new node is actually taking a slot from someone) - affected_shards.push((ShardId(shard_id), old_owner)); - } + // Check if new node is in the new assignment + if !new_assignment.contains(&new_node_id) { + continue; } + + // Find the source node for migration + // Priority 1: Use the displaced node (if any) + // Priority 2: Use the lowest-scored old owner (load balancing) + let source_node = if let Some(displaced) = old_assignment.iter() + .find(|n| !new_assignment.contains(n)) { + // An old node was displaced - use it as source + displaced.clone() + } else { + // No displacement - pick lowest-scored old owner + // Find the old owner with the minimum rendezvous score + let mut min_score = u64::MAX; + let mut min_node = old_assignment.first().cloned() + .unwrap_or_else(|| existing_nodes.first().unwrap().clone()); + + for old_node in &old_assignment { + let s = score(shard_id, old_node.as_str()); + if s < min_score { + min_score = s; + min_node = old_node.clone(); + } + } + min_node + }; + + affected_shards.push((ShardId(shard_id), source_node)); } Ok(affected_shards) diff --git a/crates/miroir-core/src/router.rs b/crates/miroir-core/src/router.rs index 058ab6d..bb5fd22 100644 --- a/crates/miroir-core/src/router.rs +++ b/crates/miroir-core/src/router.rs @@ -1,5 +1,6 @@ //! Rendezvous hash-based routing and shard assignment. +use crate::migration::{MigrationCoordinator, ShardId}; use crate::topology::{Group, NodeId, Topology}; use std::collections::HashSet; use std::hash::{Hash, Hasher}; @@ -45,6 +46,52 @@ pub fn write_targets(shard_id: u32, topology: &Topology) -> Vec { .collect() } +/// All write targets for a document, considering dual-write state during migration. +/// +/// This is the migration-aware version of `write_targets`. When a shard is in +/// dual-write phase (node addition in progress), it includes both the old owner +/// AND the new node in the target list to ensure no writes are lost during migration. +/// +/// # Arguments +/// * `shard_id` - The shard ID being written to +/// * `topology` - The cluster topology +/// * `migration_coordinator` - Optional migration coordinator for dual-write detection +/// +/// # Returns +/// A vector of node IDs that should receive the write. During dual-write for a shard, +/// this includes both the standard RF nodes AND the new node. +pub fn write_targets_with_migration( + shard_id: u32, + topology: &Topology, + migration_coordinator: Option<&MigrationCoordinator>, +) -> Vec { + let shard = ShardId(shard_id); + + // Start with standard write targets + let mut targets: Vec = write_targets(shard_id, topology); + + // Check if this shard is in dual-write phase + if let Some(coordinator) = migration_coordinator { + if coordinator.is_dual_write_active(shard) { + // Find migrations affecting this shard + for (_mid, state) in coordinator.get_all_migrations() { + if state.affected_shards.contains_key(&shard) { + // This shard is being migrated - include the new node + // Convert migration NodeId to topology NodeId + let new_node_id = crate::topology::NodeId::new(state.new_node.0.clone()); + + // Only add if not already in targets + if !targets.contains(&new_node_id) { + targets.push(new_node_id); + } + } + } + } + } + + targets +} + /// Select the replica group for a query (round-robin by query counter). /// /// Returns 0 when there are no replica groups (caller handles the empty case). diff --git a/crates/miroir-proxy/src/routes/admin.rs b/crates/miroir-proxy/src/routes/admin.rs index 81d98d4..bea8fdc 100644 --- a/crates/miroir-proxy/src/routes/admin.rs +++ b/crates/miroir-proxy/src/routes/admin.rs @@ -57,4 +57,13 @@ where .route("/canaries/from-capture/{index}", post(canary::create_from_capture::)) // Explain endpoint (plan §13.20) .route("/indexes/{index}/explain", post(explain::explain_search::)) + // Node management (plan §2 node addition flow) + .route("/nodes", post(admin_endpoints::add_node::)) + .route("/nodes/{id}", delete(admin_endpoints::remove_node::)) + .route("/nodes/{id}/drain", post(admin_endpoints::drain_node::)) + // Rebalancer status + .route("/rebalance/status", get(admin_endpoints::get_rebalance_status::)) + // Replica group management + .route("/replica_groups", post(admin_endpoints::add_replica_group::)) + .route("/replica_groups/{id}", delete(admin_endpoints::remove_replica_group::)) } diff --git a/crates/miroir-proxy/src/routes/documents.rs b/crates/miroir-proxy/src/routes/documents.rs index 635d689..8e03607 100644 --- a/crates/miroir-proxy/src/routes/documents.rs +++ b/crates/miroir-proxy/src/routes/documents.rs @@ -16,7 +16,7 @@ use axum::response::{IntoResponse, Response}; use axum::http::{StatusCode, header}; use axum::{Json, Router}; use miroir_core::api_error::{MiroirCode, MeilisearchError}; -use miroir_core::router::{shard_for_key, write_targets}; +use miroir_core::router::{shard_for_key, write_targets_with_migration}; use miroir_core::scatter::{DeleteByIdsRequest, DeleteByFilterRequest, NodeClient, WriteRequest, WriteResponse}; use miroir_core::task::TaskRegistry; use miroir_core::topology::{Topology, NodeId}; @@ -284,9 +284,24 @@ async fn write_documents_impl( let mut quorum_state = QuorumState::default(); let mut node_task_uids: HashMap = HashMap::new(); - // For each shard, write to all RF nodes in each replica group + // For each shard, write to all RF nodes in each replica group (with dual-write support) for (shard_id, docs) in node_documents { - let targets = write_targets(shard_id, &topology); + // Get migration coordinator reference for dual-write detection + let migration_coordinator = state.migration_coordinator.as_ref().map(|c| { + // We need a read lock on the coordinator + tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(async { + c.read().await + }) + }) + }); + + // Use migration-aware routing + let targets = write_targets_with_migration( + shard_id, + &topology, + migration_coordinator.as_deref(), + ); if targets.is_empty() { return Err(MeilisearchError::new( @@ -403,7 +418,7 @@ async fn delete_by_ids_impl( // For each shard, write to all RF nodes in each replica group for (shard_id, ids) in shard_ids { - let targets = write_targets(shard_id, &topology); + let targets = miroir_core::router::write_targets(shard_id, &topology); if targets.is_empty() { return Err(MeilisearchError::new(