diff --git a/crates/miroir-proxy/src/routes/admin_endpoints.rs b/crates/miroir-proxy/src/routes/admin_endpoints.rs index 976ab7d..d1f0f16 100644 --- a/crates/miroir-proxy/src/routes/admin_endpoints.rs +++ b/crates/miroir-proxy/src/routes/admin_endpoints.rs @@ -1466,6 +1466,16 @@ where /// } /// ``` /// +/// Response (202 Accepted): +/// ```json +/// { +/// "miroir_task_id": "rebalance:default", +/// "node_id": "node-new", +/// "replica_group": 0, +/// "status": "accepted" +/// } +/// ``` +/// /// Implements plan §2 "Adding a node to an existing group": /// 1. Add node to topology in `Joining` state /// 2. Send `NodeAdded` event to rebalancer worker @@ -1473,7 +1483,7 @@ where pub async fn add_node( State(state): State, Json(body): Json, -) -> Result, (StatusCode, String)> +) -> Result<(StatusCode, Json), (StatusCode, String)> where S: Clone + Send + Sync + 'static, AppState: FromRef, @@ -1526,11 +1536,12 @@ where } // Send event to rebalancer worker (if available) + let index_uid = "default"; if let Some(ref worker) = app_state.rebalancer_worker { let event = TopologyChangeEvent::NodeAdded { node_id: id.clone(), replica_group, - index_uid: "default".to_string(), + index_uid: index_uid.to_string(), }; if let Err(e) = worker.event_sender().try_send(event) { error!(error = %e, node_id = %id, "failed to send NodeAdded event to rebalancer worker"); @@ -1541,12 +1552,22 @@ where } } - info!(node_id = %id, replica_group, "Node addition queued for rebalancing"); - Ok(Json(serde_json::json!({ - "node_id": id, - "replica_group": replica_group, - "message": format!("Node {} added to replica group {}, rebalancing will start shortly", id, replica_group), - }))) + let job_id = miroir_core::rebalancer_worker::RebalanceJobId::new(index_uid); + info!( + node_id = %id, + replica_group, + miroir_task_id = %job_id.0, + "Node addition queued for rebalancing" + ); + Ok(( + StatusCode::ACCEPTED, + Json(serde_json::json!({ + "miroir_task_id": job_id.0, + "node_id": id, + "replica_group": replica_group, + "status": "accepted", + })), + )) } /// DELETE /_miroir/nodes/{id} — Remove a node from the cluster. @@ -1629,6 +1650,16 @@ where /// POST /_miroir/nodes/{id}/drain — Drain a node (prepare for removal). /// +/// Response (202 Accepted): +/// ```json +/// { +/// "miroir_task_id": "rebalance:default", +/// "node_id": "node-0", +/// "replica_group": 0, +/// "status": "draining" +/// } +/// ``` +/// /// Implements plan §2 node drain flow: /// 1. Mark node as `draining` /// 2. Send `NodeDraining` event to rebalancer worker @@ -1636,7 +1667,7 @@ where pub async fn drain_node( State(state): State, Path(node_id): Path, -) -> Result, (StatusCode, String)> +) -> Result<(StatusCode, Json), (StatusCode, String)> where S: Clone + Send + Sync + 'static, AppState: FromRef, @@ -1688,10 +1719,11 @@ where }; // Send event to rebalancer worker + let index_uid = "default"; let event = TopologyChangeEvent::NodeDraining { node_id: node_id.clone(), replica_group, - index_uid: "default".to_string(), + index_uid: index_uid.to_string(), }; if let Err(e) = worker.event_sender().try_send(event) { @@ -1702,12 +1734,22 @@ where )); } - info!(node_id = %node_id, replica_group, "Node drain queued for rebalancing"); - Ok(Json(serde_json::json!({ - "node_id": node_id, - "replica_group": replica_group, - "message": format!("Node {} is draining, migrations will start shortly", node_id), - }))) + let job_id = miroir_core::rebalancer_worker::RebalanceJobId::new(index_uid); + info!( + node_id = %node_id, + replica_group, + miroir_task_id = %job_id.0, + "Node drain queued for rebalancing" + ); + Ok(( + StatusCode::ACCEPTED, + Json(serde_json::json!({ + "miroir_task_id": job_id.0, + "node_id": node_id, + "replica_group": replica_group, + "status": "draining", + })), + )) } /// POST /_miroir/nodes/{id}/fail — Mark a node as failed.