P4.2 Node addition: migration-aware dual-write routing + admin routes

- Add write_targets_with_migration() to router: includes new node in write
  targets when a shard is in dual-write phase during node addition
- Wire migration-aware routing into write_documents_impl (documents.rs)
- Expose get_all_migrations() accessor on MigrationCoordinator for router use
- Add node management API routes: POST /nodes, DELETE /nodes/{id},
  POST /nodes/{id}/drain, GET /rebalance/status, replica_group CRUD
- Improve compute_shard_moves_for_new_node: prefer displaced node as
  migration source; fall back to lowest-scored old owner

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-05-11 21:43:40 -04:00
parent 2c09312964
commit ce3c0cb73c
5 changed files with 114 additions and 15 deletions

View file

@ -650,6 +650,11 @@ impl MigrationCoordinator {
})
}
/// Get all migration states for inspection (e.g., by router for dual-write).
pub fn get_all_migrations(&self) -> &HashMap<MigrationId, MigrationState> {
&self.migrations
}
/// Get the migration config.
pub fn config(&self) -> &MigrationConfig {
&self.config

View file

@ -912,6 +912,10 @@ impl Rebalancer {
/// Compute which shards should move to a new node.
/// Returns shard -> old_owner mapping for shards that will move.
///
/// For each shard where the new node enters the assignment, we select one
/// of the old owners as the migration source. If the new node displaced
/// an old owner, we use that node; otherwise we use the lowest-scored old owner.
async fn compute_shard_moves_for_new_node(
&self,
new_node_id: &str,
@ -931,7 +935,7 @@ impl Rebalancer {
let existing_nodes: Vec<_> = group.nodes().iter().cloned().collect();
let mut affected_shards = Vec::new();
// For each shard, check if adding the new node would change the assignment
// For each shard, check if the new node is in the new assignment
for shard_id in 0..topo.shards {
let old_assignment: Vec<_> = assign_shard_in_group(shard_id, &existing_nodes, rf)
.into_iter()
@ -947,17 +951,36 @@ impl Rebalancer {
.into_iter()
.collect();
// Check if the new node is in the new assignment
if new_assignment.contains(&new_node_id) {
// This shard moves to the new node
// Find which old node previously held this slot (the one being displaced)
// We need to pick an old node to migrate from
if let Some(old_owner) = old_assignment.first().cloned() {
// Only add if this shard wasn't already assigned to all existing nodes
// (i.e., the new node is actually taking a slot from someone)
affected_shards.push((ShardId(shard_id), old_owner));
}
// Check if new node is in the new assignment
if !new_assignment.contains(&new_node_id) {
continue;
}
// Find the source node for migration
// Priority 1: Use the displaced node (if any)
// Priority 2: Use the lowest-scored old owner (load balancing)
let source_node = if let Some(displaced) = old_assignment.iter()
.find(|n| !new_assignment.contains(n)) {
// An old node was displaced - use it as source
displaced.clone()
} else {
// No displacement - pick lowest-scored old owner
// Find the old owner with the minimum rendezvous score
let mut min_score = u64::MAX;
let mut min_node = old_assignment.first().cloned()
.unwrap_or_else(|| existing_nodes.first().unwrap().clone());
for old_node in &old_assignment {
let s = score(shard_id, old_node.as_str());
if s < min_score {
min_score = s;
min_node = old_node.clone();
}
}
min_node
};
affected_shards.push((ShardId(shard_id), source_node));
}
Ok(affected_shards)

View file

@ -1,5 +1,6 @@
//! Rendezvous hash-based routing and shard assignment.
use crate::migration::{MigrationCoordinator, ShardId};
use crate::topology::{Group, NodeId, Topology};
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
@ -45,6 +46,52 @@ pub fn write_targets(shard_id: u32, topology: &Topology) -> Vec<NodeId> {
.collect()
}
/// All write targets for a document, considering dual-write state during migration.
///
/// This is the migration-aware version of `write_targets`. When a shard is in
/// dual-write phase (node addition in progress), it includes both the old owner
/// AND the new node in the target list to ensure no writes are lost during migration.
///
/// # Arguments
/// * `shard_id` - The shard ID being written to
/// * `topology` - The cluster topology
/// * `migration_coordinator` - Optional migration coordinator for dual-write detection
///
/// # Returns
/// A vector of node IDs that should receive the write. During dual-write for a shard,
/// this includes both the standard RF nodes AND the new node.
pub fn write_targets_with_migration(
shard_id: u32,
topology: &Topology,
migration_coordinator: Option<&MigrationCoordinator>,
) -> Vec<NodeId> {
let shard = ShardId(shard_id);
// Start with standard write targets
let mut targets: Vec<NodeId> = write_targets(shard_id, topology);
// Check if this shard is in dual-write phase
if let Some(coordinator) = migration_coordinator {
if coordinator.is_dual_write_active(shard) {
// Find migrations affecting this shard
for (_mid, state) in coordinator.get_all_migrations() {
if state.affected_shards.contains_key(&shard) {
// This shard is being migrated - include the new node
// Convert migration NodeId to topology NodeId
let new_node_id = crate::topology::NodeId::new(state.new_node.0.clone());
// Only add if not already in targets
if !targets.contains(&new_node_id) {
targets.push(new_node_id);
}
}
}
}
}
targets
}
/// Select the replica group for a query (round-robin by query counter).
///
/// Returns 0 when there are no replica groups (caller handles the empty case).

View file

@ -57,4 +57,13 @@ where
.route("/canaries/from-capture/{index}", post(canary::create_from_capture::<S>))
// Explain endpoint (plan §13.20)
.route("/indexes/{index}/explain", post(explain::explain_search::<S>))
// Node management (plan §2 node addition flow)
.route("/nodes", post(admin_endpoints::add_node::<S>))
.route("/nodes/{id}", delete(admin_endpoints::remove_node::<S>))
.route("/nodes/{id}/drain", post(admin_endpoints::drain_node::<S>))
// Rebalancer status
.route("/rebalance/status", get(admin_endpoints::get_rebalance_status::<S>))
// Replica group management
.route("/replica_groups", post(admin_endpoints::add_replica_group::<S>))
.route("/replica_groups/{id}", delete(admin_endpoints::remove_replica_group::<S>))
}

View file

@ -16,7 +16,7 @@ use axum::response::{IntoResponse, Response};
use axum::http::{StatusCode, header};
use axum::{Json, Router};
use miroir_core::api_error::{MiroirCode, MeilisearchError};
use miroir_core::router::{shard_for_key, write_targets};
use miroir_core::router::{shard_for_key, write_targets_with_migration};
use miroir_core::scatter::{DeleteByIdsRequest, DeleteByFilterRequest, NodeClient, WriteRequest, WriteResponse};
use miroir_core::task::TaskRegistry;
use miroir_core::topology::{Topology, NodeId};
@ -284,9 +284,24 @@ async fn write_documents_impl(
let mut quorum_state = QuorumState::default();
let mut node_task_uids: HashMap<String, u64> = HashMap::new();
// For each shard, write to all RF nodes in each replica group
// For each shard, write to all RF nodes in each replica group (with dual-write support)
for (shard_id, docs) in node_documents {
let targets = write_targets(shard_id, &topology);
// Get migration coordinator reference for dual-write detection
let migration_coordinator = state.migration_coordinator.as_ref().map(|c| {
// We need a read lock on the coordinator
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
c.read().await
})
})
});
// Use migration-aware routing
let targets = write_targets_with_migration(
shard_id,
&topology,
migration_coordinator.as_deref(),
);
if targets.is_empty() {
return Err(MeilisearchError::new(
@ -403,7 +418,7 @@ async fn delete_by_ids_impl(
// For each shard, write to all RF nodes in each replica group
for (shard_id, ids) in shard_ids {
let targets = write_targets(shard_id, &topology);
let targets = miroir_core::router::write_targets(shard_id, &topology);
if targets.is_empty() {
return Err(MeilisearchError::new(