fix(admin-api): return 202 Accepted with miroir_task_id for topology ops
Update add_node and drain_node endpoints to return 202 Accepted with miroir_task_id in the response, matching the P4.6 spec. Changes: - add_node now returns 202 with miroir_task_id (rebalance:default) - drain_node now returns 202 with miroir_task_id (rebalance:default) - Both endpoints include task ID in logging for observability - Added response shape documentation to both endpoints Closes: miroir-mkk.6
This commit is contained in:
parent
bb6a1216ff
commit
86925436e4
1 changed files with 58 additions and 16 deletions
|
|
@ -1466,6 +1466,16 @@ where
|
|||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// Response (202 Accepted):
|
||||
/// ```json
|
||||
/// {
|
||||
/// "miroir_task_id": "rebalance:default",
|
||||
/// "node_id": "node-new",
|
||||
/// "replica_group": 0,
|
||||
/// "status": "accepted"
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// Implements plan §2 "Adding a node to an existing group":
|
||||
/// 1. Add node to topology in `Joining` state
|
||||
/// 2. Send `NodeAdded` event to rebalancer worker
|
||||
|
|
@ -1473,7 +1483,7 @@ where
|
|||
pub async fn add_node<S>(
|
||||
State(state): State<S>,
|
||||
Json(body): Json<serde_json::Value>,
|
||||
) -> Result<Json<serde_json::Value>, (StatusCode, String)>
|
||||
) -> Result<(StatusCode, Json<serde_json::Value>), (StatusCode, String)>
|
||||
where
|
||||
S: Clone + Send + Sync + 'static,
|
||||
AppState: FromRef<S>,
|
||||
|
|
@ -1526,11 +1536,12 @@ where
|
|||
}
|
||||
|
||||
// Send event to rebalancer worker (if available)
|
||||
let index_uid = "default";
|
||||
if let Some(ref worker) = app_state.rebalancer_worker {
|
||||
let event = TopologyChangeEvent::NodeAdded {
|
||||
node_id: id.clone(),
|
||||
replica_group,
|
||||
index_uid: "default".to_string(),
|
||||
index_uid: index_uid.to_string(),
|
||||
};
|
||||
if let Err(e) = worker.event_sender().try_send(event) {
|
||||
error!(error = %e, node_id = %id, "failed to send NodeAdded event to rebalancer worker");
|
||||
|
|
@ -1541,12 +1552,22 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
info!(node_id = %id, replica_group, "Node addition queued for rebalancing");
|
||||
Ok(Json(serde_json::json!({
|
||||
"node_id": id,
|
||||
"replica_group": replica_group,
|
||||
"message": format!("Node {} added to replica group {}, rebalancing will start shortly", id, replica_group),
|
||||
})))
|
||||
let job_id = miroir_core::rebalancer_worker::RebalanceJobId::new(index_uid);
|
||||
info!(
|
||||
node_id = %id,
|
||||
replica_group,
|
||||
miroir_task_id = %job_id.0,
|
||||
"Node addition queued for rebalancing"
|
||||
);
|
||||
Ok((
|
||||
StatusCode::ACCEPTED,
|
||||
Json(serde_json::json!({
|
||||
"miroir_task_id": job_id.0,
|
||||
"node_id": id,
|
||||
"replica_group": replica_group,
|
||||
"status": "accepted",
|
||||
})),
|
||||
))
|
||||
}
|
||||
|
||||
/// DELETE /_miroir/nodes/{id} — Remove a node from the cluster.
|
||||
|
|
@ -1629,6 +1650,16 @@ where
|
|||
|
||||
/// POST /_miroir/nodes/{id}/drain — Drain a node (prepare for removal).
|
||||
///
|
||||
/// Response (202 Accepted):
|
||||
/// ```json
|
||||
/// {
|
||||
/// "miroir_task_id": "rebalance:default",
|
||||
/// "node_id": "node-0",
|
||||
/// "replica_group": 0,
|
||||
/// "status": "draining"
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// Implements plan §2 node drain flow:
|
||||
/// 1. Mark node as `draining`
|
||||
/// 2. Send `NodeDraining` event to rebalancer worker
|
||||
|
|
@ -1636,7 +1667,7 @@ where
|
|||
pub async fn drain_node<S>(
|
||||
State(state): State<S>,
|
||||
Path(node_id): Path<String>,
|
||||
) -> Result<Json<serde_json::Value>, (StatusCode, String)>
|
||||
) -> Result<(StatusCode, Json<serde_json::Value>), (StatusCode, String)>
|
||||
where
|
||||
S: Clone + Send + Sync + 'static,
|
||||
AppState: FromRef<S>,
|
||||
|
|
@ -1688,10 +1719,11 @@ where
|
|||
};
|
||||
|
||||
// Send event to rebalancer worker
|
||||
let index_uid = "default";
|
||||
let event = TopologyChangeEvent::NodeDraining {
|
||||
node_id: node_id.clone(),
|
||||
replica_group,
|
||||
index_uid: "default".to_string(),
|
||||
index_uid: index_uid.to_string(),
|
||||
};
|
||||
|
||||
if let Err(e) = worker.event_sender().try_send(event) {
|
||||
|
|
@ -1702,12 +1734,22 @@ where
|
|||
));
|
||||
}
|
||||
|
||||
info!(node_id = %node_id, replica_group, "Node drain queued for rebalancing");
|
||||
Ok(Json(serde_json::json!({
|
||||
"node_id": node_id,
|
||||
"replica_group": replica_group,
|
||||
"message": format!("Node {} is draining, migrations will start shortly", node_id),
|
||||
})))
|
||||
let job_id = miroir_core::rebalancer_worker::RebalanceJobId::new(index_uid);
|
||||
info!(
|
||||
node_id = %node_id,
|
||||
replica_group,
|
||||
miroir_task_id = %job_id.0,
|
||||
"Node drain queued for rebalancing"
|
||||
);
|
||||
Ok((
|
||||
StatusCode::ACCEPTED,
|
||||
Json(serde_json::json!({
|
||||
"miroir_task_id": job_id.0,
|
||||
"node_id": node_id,
|
||||
"replica_group": replica_group,
|
||||
"status": "draining",
|
||||
})),
|
||||
))
|
||||
}
|
||||
|
||||
/// POST /_miroir/nodes/{id}/fail — Mark a node as failed.
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue