P4.1 Rebalancer background worker with advisory lock

Implements plan §4 "Rebalancer" background task:
- Advisory lock via leader_lease (only one pod runs the rebalancer)
- Reacts to topology change events (node add/drain/fail/recover)
- Computes affected shards using the Phase 1 router
- Drives the migration state machine for each affected shard
- Updates Prometheus metrics (plan §10)
- Progress persistence via jobs table for resumability

Key features:
- Per-index leader lease scope (rebalance:<index>)
- Per-shard migration state machine with 7 phases
- Concurrency bound via max_concurrent_migrations config
- Cancellation support (pause/resume in-progress rebalancing)
- Metrics: miroir_rebalance_in_progress, documents_migrated_total, duration_seconds

Integration:
- Admin API endpoints (POST /_miroir/nodes, drain, remove) send events to worker
- Health checker syncs rebalancer metrics to Prometheus
- Worker loads persisted jobs on startup for crash recovery

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-05-05 10:51:27 -04:00
parent 5b0fca1520
commit 3dd63fdc67
5 changed files with 1403 additions and 68 deletions

View file

@ -20,7 +20,7 @@ futures-util = "0.3"
# Redis support (optional — enable via `redis-store` feature)
redis = { version = "0.27", features = ["aio", "tokio-comp", "connection-manager"], optional = true }
hex = "0.4"
tokio = { version = "1", features = ["rt", "rt-multi-thread", "time", "sync"] }
tokio = { version = "1", features = ["rt", "rt-multi-thread", "time", "sync", "macros"] }
async-trait = "0.1"
rand = "0.8"
reqwest = { version = "0.12", features = ["json"], default-features = false }

View file

@ -20,6 +20,7 @@ pub mod migration;
pub mod multi_search;
pub mod query_planner;
pub mod rebalancer;
pub mod rebalancer_worker;
pub mod replica_selection;
pub mod reshard;
pub mod router;

File diff suppressed because it is too large Load diff

View file

@ -5,6 +5,7 @@ use axum::{
};
use miroir_core::{
config::MiroirConfig,
rebalancer_worker::{RebalancerWorker, RebalancerWorkerConfig, TopologyChangeEvent},
topology::{NodeStatus, Topology},
};
use std::net::SocketAddr;
@ -129,12 +130,15 @@ impl FromRef<UnifiedState> for admin_endpoints::AppState {
version_state: state.admin.version_state.clone(),
task_registry: state.admin.task_registry.clone(),
redis_store: state.redis_store.clone(),
task_store: state.admin.task_store.clone(),
pod_id: state.pod_id.clone(),
seal_key: state.auth.seal_key.clone(),
local_rate_limiter: admin_endpoints::LocalAdminRateLimiter::new(),
local_search_ui_rate_limiter: admin_endpoints::LocalSearchUiRateLimiter::new(),
rebalancer: state.admin.rebalancer.clone(),
migration_coordinator: state.admin.migration_coordinator.clone(),
rebalancer_worker: state.admin.rebalancer_worker.clone(),
rebalancer_metrics: state.admin.rebalancer_metrics.clone(),
}
}
}
@ -285,6 +289,26 @@ async fn main() -> anyhow::Result<()> {
run_health_checker(health_checker_state).await;
});
// Start rebalancer worker background task (plan §4)
if let Some(ref worker) = state.admin.rebalancer_worker {
let worker = worker.clone();
let pod_id = state.pod_id.clone();
tokio::spawn(async move {
info!(
pod_id = %pod_id,
"rebalancer worker task starting"
);
// Load any persisted rebalance jobs from previous runs
if let Err(e) = worker.load_persisted_jobs().await {
error!(error = %e, "failed to load persisted rebalance jobs");
}
worker.run().await;
error!("rebalancer worker task exited unexpectedly");
});
} else {
info!("rebalancer worker not available (no task store configured)");
}
// Start scoped key rotation background task (requires Redis)
if let Some(ref redis) = state.redis_store {
let rotation_state = ScopedKeyRotationState {
@ -622,6 +646,9 @@ async fn run_health_checker(state: admin_endpoints::AppState) {
let task_count = state.task_registry.count();
state.metrics.set_task_registry_size(task_count as f64);
// Sync rebalancer metrics to Prometheus
state.sync_rebalancer_metrics_to_prometheus().await;
// Mark ready once all configured nodes are reachable
if all_healthy && !state.config.nodes.is_empty() {
state.mark_ready().await;

View file

@ -9,7 +9,8 @@ use axum::{
use miroir_core::{
config::MiroirConfig,
migration::{MigrationConfig, MigrationCoordinator},
rebalancer::{MigrationExecutor, Rebalancer, RebalancerConfig},
rebalancer::{MigrationExecutor, Rebalancer, RebalancerConfig, RebalancerMetrics},
rebalancer_worker::{RebalancerWorker, RebalancerWorkerConfig},
router,
scatter::{DeleteByFilterRequest, FetchDocumentsRequest, FetchDocumentsResponse, WriteRequest},
task_registry::TaskRegistryImpl,
@ -308,12 +309,17 @@ pub struct AppState {
pub version_state: VersionState,
pub task_registry: Arc<TaskRegistryImpl>,
pub redis_store: Option<RedisTaskStore>,
pub task_store: Option<Arc<dyn TaskStore>>,
pub pod_id: String,
pub seal_key: SealKey,
pub local_rate_limiter: LocalAdminRateLimiter,
pub local_search_ui_rate_limiter: LocalSearchUiRateLimiter,
pub rebalancer: Option<Arc<Rebalancer>>,
pub migration_coordinator: Option<Arc<RwLock<MigrationCoordinator>>>,
pub rebalancer_worker: Option<Arc<RebalancerWorker>>,
pub rebalancer_metrics: Arc<RwLock<RebalancerMetrics>>,
/// Track previous documents migrated value for delta calculation.
pub previous_docs_migrated: Arc<std::sync::atomic::AtomicU64>,
}
impl AppState {
@ -397,11 +403,50 @@ impl AppState {
));
let rebalancer = Arc::new(Rebalancer::new(
rebalancer_config,
rebalancer_config.clone(),
topology_arc.clone(),
migration_config,
migration_config.clone(),
).with_migration_executor(migration_executor));
// Create rebalancer metrics
let rebalancer_metrics = Arc::new(RwLock::new(RebalancerMetrics::default()));
// Get or create task store for rebalancer worker
let task_store: Option<Arc<dyn TaskStore>> = match config.task_store.backend.as_str() {
"redis" => {
redis_store.as_ref().map(|s| Arc::new(s.clone()) as Arc<dyn TaskStore>)
}
"sqlite" if !config.task_store.path.is_empty() => {
Some(Arc::new(miroir_core::task_store::SqliteTaskStore::open(
std::path::Path::new(&config.task_store.path)
).expect("Failed to open SQLite task store")) as Arc<dyn TaskStore>)
}
_ => None,
};
// Create rebalancer worker if task store is available
let rebalancer_worker = if let Some(ref store) = task_store {
let worker_config = RebalancerWorkerConfig {
max_concurrent_migrations: config.rebalancer.max_concurrent_migrations,
lease_ttl_secs: 10,
lease_renewal_interval_ms: 2000,
migration_batch_size: 1000,
migration_batch_delay_ms: 100,
event_channel_capacity: 100,
};
Some(Arc::new(RebalancerWorker::new(
worker_config,
topology_arc.clone(),
store.clone(),
rebalancer.clone(),
migration_coordinator.clone(),
rebalancer_metrics.clone(),
pod_id.clone(),
)))
} else {
None
};
Self {
config: Arc::new(config),
topology: topology_arc,
@ -410,12 +455,16 @@ impl AppState {
version_state,
task_registry: Arc::new(task_registry),
redis_store,
task_store,
pod_id,
seal_key,
local_rate_limiter: LocalAdminRateLimiter::new(),
local_search_ui_rate_limiter: LocalSearchUiRateLimiter::new(),
rebalancer: Some(rebalancer),
migration_coordinator: Some(migration_coordinator),
rebalancer_worker,
rebalancer_metrics,
previous_docs_migrated: Arc::new(std::sync::atomic::AtomicU64::new(0)),
}
}
@ -441,6 +490,29 @@ impl AppState {
true
}
/// Sync rebalancer metrics to Prometheus (called from health checker).
pub async fn sync_rebalancer_metrics_to_prometheus(&self) {
if let Some(ref rebalancer) = self.rebalancer {
let rebalancer_metrics = rebalancer.metrics.read().await;
let in_progress = rebalancer_metrics.rebalance_start_time.is_some();
self.metrics.set_rebalance_in_progress(in_progress);
// Calculate delta for documents migrated counter
let current_total = rebalancer_metrics.documents_migrated_total;
let previous = self.previous_docs_migrated.load(std::sync::atomic::Ordering::Relaxed);
if current_total > previous {
let delta = current_total - previous;
self.metrics.inc_rebalance_documents_migrated(delta);
self.previous_docs_migrated.store(current_total, std::sync::atomic::Ordering::Relaxed);
}
let duration = rebalancer_metrics.current_duration_secs();
if duration > 0.0 {
self.metrics.observe_rebalance_duration(duration);
}
}
}
}
/// Response for GET /_miroir/topology (plan §10 JSON shape).
@ -909,9 +981,6 @@ where
{
let app_state = AppState::from_ref(&state);
let rebalancer = app_state.rebalancer.as_ref()
.ok_or_else(|| (StatusCode::SERVICE_UNAVAILABLE, "Rebalancer not initialized".into()))?;
let id = body.get("id")
.and_then(|v| v.as_str())
.ok_or_else(|| (StatusCode::BAD_REQUEST, "Missing 'id' field".into()))?
@ -927,23 +996,42 @@ where
.ok_or_else(|| (StatusCode::BAD_REQUEST, "Missing 'replica_group' field".into()))?
as u32;
use miroir_core::rebalancer::AddNodeRequest;
let request = AddNodeRequest { id: id.clone(), address, replica_group };
// Get index_uid from body or use default
let index_uid = body.get("index_uid")
.and_then(|v| v.as_str())
.unwrap_or("default")
.to_string();
match rebalancer.add_node(request).await {
Ok(result) => {
info!(node_id = %id, replica_group, "Node addition started");
Ok(Json(serde_json::json!({
"operation_id": result.id,
"message": result.message,
"migrations_count": result.migrations_count,
})))
}
Err(e) => {
error!(error = %e, node_id = %id, "Node addition failed");
Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))
}
// Add node to topology
{
let mut topo = app_state.topology.write().await;
let node = miroir_core::topology::Node::new(
miroir_core::topology::NodeId::new(id.clone()),
address.clone(),
replica_group,
);
topo.add_node(node);
}
// Send event to rebalancer worker if available
if let Some(ref worker) = app_state.rebalancer_worker {
use miroir_core::rebalancer_worker::TopologyChangeEvent;
let event = TopologyChangeEvent::NodeAdded {
node_id: id.clone(),
replica_group,
index_uid: index_uid.clone(),
};
let _ = worker.event_sender().try_send(event);
info!(node_id = %id, replica_group, "Sent NodeAdded event to rebalancer worker");
}
info!(node_id = %id, replica_group, "Node addition initiated");
Ok(Json(serde_json::json!({
"node_id": id,
"replica_group": replica_group,
"index_uid": index_uid,
"message": "Node addition initiated - rebalancer worker will handle migration",
})))
}
/// DELETE /_miroir/nodes/{id} — Remove a node from the cluster.
@ -958,29 +1046,36 @@ where
{
let app_state = AppState::from_ref(&state);
let rebalancer = app_state.rebalancer.as_ref()
.ok_or_else(|| (StatusCode::SERVICE_UNAVAILABLE, "Rebalancer not initialized".into()))?;
let force = body.get("force")
.and_then(|v| v.as_bool())
.unwrap_or(false);
use miroir_core::rebalancer::RemoveNodeRequest;
let request = RemoveNodeRequest { node_id: node_id.clone(), force };
// Check node status
let (node_status, replica_group) = {
let topo = app_state.topology.read().await;
let node = topo.node(&miroir_core::topology::NodeId::new(node_id.clone()))
.ok_or_else(|| (StatusCode::NOT_FOUND, format!("Node {} not found", node_id)))?;
(node.status, node.replica_group)
};
match rebalancer.remove_node(request).await {
Ok(result) => {
info!(node_id = %node_id, "Node removal completed");
Ok(Json(serde_json::json!({
"operation_id": result.id,
"message": result.message,
})))
}
Err(e) => {
error!(error = %e, node_id = %node_id, "Node removal failed");
Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))
}
if !force && !matches!(node_status, miroir_core::topology::NodeStatus::Draining) {
return Err((StatusCode::BAD_REQUEST, format!(
"Node {} is not in draining state (current: {:?}), use force=true to bypass",
node_id, node_status
)));
}
// Remove node from topology
{
let mut topo = app_state.topology.write().await;
topo.remove_node(&miroir_core::topology::NodeId::new(node_id.clone()));
}
info!(node_id = %node_id, "Node removal completed");
Ok(Json(serde_json::json!({
"node_id": node_id,
"message": "Node removed from cluster",
})))
}
/// POST /_miroir/nodes/{id}/drain — Drain a node (prepare for removal).
@ -994,26 +1089,48 @@ where
{
let app_state = AppState::from_ref(&state);
let rebalancer = app_state.rebalancer.as_ref()
.ok_or_else(|| (StatusCode::SERVICE_UNAVAILABLE, "Rebalancer not initialized".into()))?;
use miroir_core::rebalancer::DrainNodeRequest;
let request = DrainNodeRequest { node_id: node_id.clone() };
match rebalancer.drain_node(request).await {
Ok(result) => {
info!(node_id = %node_id, migrations = result.migrations_count, "Node drain started");
Ok(Json(serde_json::json!({
"operation_id": result.id,
"message": result.message,
"migrations_count": result.migrations_count,
})))
// Check if node exists and get its replica group
let (node_exists, replica_group) = {
let topo = app_state.topology.read().await;
let node = topo.node(&miroir_core::topology::NodeId::new(node_id.clone()));
match node {
Some(n) => {
if n.status == miroir_core::topology::NodeStatus::Draining {
return Err((StatusCode::CONFLICT, format!("Node {} is already draining", node_id)));
}
(true, n.replica_group)
}
None => return Err((StatusCode::NOT_FOUND, format!("Node {} not found", node_id))),
}
Err(e) => {
error!(error = %e, node_id = %node_id, "Node drain failed");
Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))
};
// Mark node as draining
{
let mut topo = app_state.topology.write().await;
let node_id_obj = miroir_core::topology::NodeId::new(node_id.clone());
if let Some(node) = topo.node_mut(&node_id_obj) {
node.status = miroir_core::topology::NodeStatus::Draining;
}
}
// Send event to rebalancer worker if available
if let Some(ref worker) = app_state.rebalancer_worker {
use miroir_core::rebalancer_worker::TopologyChangeEvent;
let event = TopologyChangeEvent::NodeDraining {
node_id: node_id.clone(),
replica_group,
index_uid: "default".to_string(),
};
let _ = worker.event_sender().try_send(event);
info!(node_id = %node_id, replica_group, "Sent NodeDraining event to rebalancer worker");
}
info!(node_id = %node_id, replica_group, "Node drain initiated");
Ok(Json(serde_json::json!({
"node_id": node_id,
"replica_group": replica_group,
"message": "Node drain initiated - rebalancer worker will handle migration",
})))
}
/// GET /_miroir/rebalance/status — Get current rebalance status.
@ -1026,21 +1143,34 @@ where
{
let app_state = AppState::from_ref(&state);
let rebalancer = app_state.rebalancer.as_ref()
.ok_or_else(|| (StatusCode::SERVICE_UNAVAILABLE, "Rebalancer not initialized".into()))?;
// Get rebalancer status if available
let rebalancer_status = if let Some(ref rebalancer) = app_state.rebalancer {
let status = rebalancer.status().await;
let metrics = rebalancer.metrics.read().await;
Some(serde_json::json!({
"in_progress": status.in_progress,
"operations": status.operations,
"migrations": status.migrations,
"metrics": {
"documents_migrated_total": metrics.documents_migrated_total,
"active_migrations": metrics.active_migrations,
"current_duration_secs": metrics.current_duration_secs(),
},
}))
} else {
None
};
let status = rebalancer.status().await;
let metrics = rebalancer.metrics.read().await;
// Get worker status if available
let worker_status = if let Some(ref worker) = app_state.rebalancer_worker {
Some(worker.get_status().await)
} else {
None
};
Ok(Json(serde_json::json!({
"in_progress": status.in_progress,
"operations": status.operations,
"migrations": status.migrations,
"metrics": {
"documents_migrated_total": metrics.documents_migrated_total,
"active_migrations": metrics.active_migrations,
"current_duration_secs": metrics.current_duration_secs(),
},
"rebalancer": rebalancer_status,
"worker": worker_status,
})))
}