feat(tenant): implement tenant affinity API endpoints and CLI commands

Implements admin API endpoints and CLI commands for managing tenant
mappings (api_key mode) as specified in plan §13.15:

Admin API endpoints:
- POST /_miroir/tenants - Add a tenant mapping (api_key → tenant_id → group_id)
- GET /_miroir/tenants - List all tenant mappings
- DELETE /_miroir/tenants - Delete a tenant mapping by api_key

CLI commands (miroir-ctl tenant):
- miroir-ctl tenant add --api-key KEY --tenant-id ID [--group N]
- miroir-ctl tenant list
- miroir-ctl tenant remove --api-key KEY

TaskStore changes:
- Added list_tenant_mappings() method to TaskStore trait
- Implemented in SQLite and Redis backends
- Updated all MockTaskStore implementations in test files

Security: API keys are hashed with SHA-256 before storage (they are never
stored in plaintext). Mappings are persisted to task_store for HA deployments.

Closes: bf-38mn2

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-05-26 19:41:50 -04:00
parent d130f25400
commit d8d5cc815f
10 changed files with 559 additions and 21 deletions

View file

@ -953,6 +953,10 @@ mod tests {
Ok(false)
}
/// Test stub: reports that no tenant mappings are stored.
fn list_tenant_mappings(&self) -> crate::Result<Vec<crate::task_store::TenantMapRow>> {
    Ok(vec![])
}
fn upsert_rollover_policy(
&self,
_policy: &crate::task_store::NewRolloverPolicy,

View file

@ -299,6 +299,9 @@ impl TaskStore for MockTaskStore {
/// Mock stub: ignores the hash and always returns `Ok(false)`.
fn delete_tenant_mapping(&self, _api_key_hash: &[u8]) -> Result<bool> {
Ok(false)
}
/// Mock stub: reports that no tenant mappings are stored.
fn list_tenant_mappings(&self) -> Result<Vec<crate::task_store::TenantMapRow>> {
    Ok(vec![])
}
/// Mock stub: ignores the policy and always returns `Ok(())`.
fn upsert_rollover_policy(&self, _policy: &crate::task_store::NewRolloverPolicy) -> Result<()> {
Ok(())
}

View file

@ -302,6 +302,9 @@ impl TaskStore for MockTaskStore {
/// Mock stub: ignores the hash and always returns `Ok(false)`.
fn delete_tenant_mapping(&self, _api_key_hash: &[u8]) -> Result<bool> {
Ok(false)
}
/// Mock stub: reports that no tenant mappings are stored.
fn list_tenant_mappings(&self) -> Result<Vec<crate::task_store::TenantMapRow>> {
    Ok(vec![])
}
/// Mock stub: ignores the policy and always returns `Ok(())`.
fn upsert_rollover_policy(&self, _policy: &crate::task_store::NewRolloverPolicy) -> Result<()> {
Ok(())
}

View file

@ -262,6 +262,10 @@ impl TaskStore for MockTaskStore {
Ok(true)
}
/// Mock stub: reports that no tenant mappings are stored.
fn list_tenant_mappings(&self) -> Result<Vec<TenantMapRow>> {
    Ok(vec![])
}
/// Mock stub: ignores the policy and always returns `Ok(())`.
fn upsert_rollover_policy(&self, _policy: &NewRolloverPolicy) -> Result<()> {
Ok(())
}

View file

@ -216,6 +216,9 @@ pub trait TaskStore: Send + Sync {
/// Delete a tenant mapping.
///
/// `api_key_hash` identifies the mapping to remove. The admin DELETE handler
/// treats `Ok(false)` as "mapping not found"; presumably `Ok(true)` means a
/// mapping was removed (confirm against the backend implementations).
fn delete_tenant_mapping(&self, api_key_hash: &[u8]) -> Result<bool>;
/// List all tenant mappings.
///
/// Returns one [`TenantMapRow`] per stored mapping. The signature promises no
/// particular ordering of the rows.
fn list_tenant_mappings(&self) -> Result<Vec<TenantMapRow>>;
// --- Table 12: rollover_policies ---
/// Create or update a rollover policy.

View file

@ -1990,6 +1990,52 @@ impl TaskStore for RedisTaskStore {
})
}
/// Lists every tenant mapping reachable through the Redis tenant-map index.
///
/// Member keys are read from the `{key_prefix}:tenant_map:_index` set and each
/// one is fetched with `HGETALL`. A member is skipped when its hash is empty or
/// when its `api_key_hash` field is absent or is not a bulk string.
///
/// # Errors
///
/// Returns `MiroirError::Redis` when a Redis command fails or when the stored
/// `api_key_hash` is not valid hex, and propagates the error produced by
/// `get_field_string` for the `tenant_id` field.
fn list_tenant_mappings(&self) -> Result<Vec<TenantMapRow>> {
    let key_prefix = &self.key_prefix;
    let index_key = format!("{key_prefix}:tenant_map:_index");
    let manager = self.pool.manager.clone();

    self.block_on(async move {
        let mut conn = manager.lock().await;

        // The index set holds the key of every tenant_map hash.
        let member_keys: Vec<String> = conn
            .smembers(&index_key)
            .await
            .map_err(|e| MiroirError::Redis(e.to_string()))?;

        let mut rows = Vec::new();
        for member in member_keys {
            let hash: HashMap<String, Value> = conn
                .hgetall(&member)
                .await
                .map_err(|e| MiroirError::Redis(e.to_string()))?;

            // An empty reply means the index points at a key with no fields.
            if hash.is_empty() {
                continue;
            }

            let api_key_hash = match hash.get("api_key_hash") {
                Some(Value::BulkString(hex_bytes)) => {
                    hex::decode(hex_bytes).map_err(|e| MiroirError::Redis(e.to_string()))?
                }
                _ => continue,
            };
            let tenant_id = get_field_string(&hash, "tenant_id")?;
            let group_id = opt_field_i64(&hash, "group_id");

            rows.push(TenantMapRow {
                api_key_hash,
                tenant_id,
                group_id,
            });
        }

        Ok(rows)
    })
}
// --- Table 12: rollover_policies ---
fn upsert_rollover_policy(&self, policy: &NewRolloverPolicy) -> Result<()> {

View file

@ -1094,6 +1094,23 @@ impl TaskStore for SqliteTaskStore {
Ok(rows > 0)
}
/// Reads every row of the `tenant_map` table.
///
/// Rows come back in whatever order SQLite yields them; the query has no
/// `ORDER BY` clause.
///
/// # Errors
///
/// Returns an error when the statement cannot be prepared or executed, or when
/// a column cannot be converted into the corresponding `TenantMapRow` field.
///
/// # Panics
///
/// Panics if `self.conn.lock()` returns an error (the result is unwrapped).
fn list_tenant_mappings(&self) -> Result<Vec<TenantMapRow>> {
    let guard = self.conn.lock().unwrap();
    let mut select = guard.prepare("SELECT api_key_hash, tenant_id, group_id FROM tenant_map")?;

    let decoded = select.query_map([], |record| {
        Ok(TenantMapRow {
            api_key_hash: record.get(0)?,
            tenant_id: record.get(1)?,
            group_id: record.get(2)?,
        })
    })?;

    // Stop at the first row that fails to decode, as a fallible collect would.
    let mut mappings = Vec::new();
    for entry in decoded {
        mappings.push(entry?);
    }
    Ok(mappings)
}
// --- Table 12: rollover_policies ---
fn upsert_rollover_policy(&self, policy: &NewRolloverPolicy) -> Result<()> {

View file

@ -1,4 +1,5 @@
use clap::Subcommand;
use reqwest::Client;
#[derive(Subcommand, Debug)]
#[command(
@ -6,20 +7,127 @@ use clap::Subcommand;
after_help = "Runbooks: https://github.com/jedarden/miroir/blob/main/docs/ctl/tenant.md\n\nSee `miroir-ctl help` for a list of all subcommands."
)]
// NOTE(review): this span comes from a rendered diff and appears to interleave
// removed and added lines. `Create`, `Delete` and `SetQuota` have no arm in
// `run`'s `match`, so they are presumably the removed variants — confirm
// against the repository before relying on this listing.
pub enum TenantSubcommand {
/// Create a new tenant
Create,
/// List all tenants
/// Add a tenant mapping (api_key mode)
Add {
/// API key to map to tenant (will be hashed)
#[arg(long)]
api_key: String,
// NOTE(review): with a bare `#[arg(long)]` clap derives the flag name from the
// field name, i.e. `--tenant-id` — confirm this is the intended spelling.
/// Tenant identifier
#[arg(long)]
tenant_id: String,
/// Replica group ID to pin this tenant to (optional for hash-based routing)
#[arg(long)]
group: Option<u32>,
},
/// List all tenant mappings
List,
/// Delete a tenant
Delete,
/// Set tenant quota
SetQuota,
/// Delete a tenant mapping by API key
Remove {
/// API key to delete (will be hashed for lookup)
#[arg(long)]
api_key: String,
},
}
/// Executes a `miroir-ctl tenant` subcommand against the admin API.
///
/// Every request targets `{api_url}/_miroir/tenants` (trailing `/` characters on
/// `api_url` are trimmed first) and carries an `Authorization: Bearer {admin_key}`
/// header. `Add` sends a POST with a JSON body, `List` sends a GET, and `Remove`
/// sends a DELETE with a JSON body. Results are printed to stdout.
///
/// # Errors
///
/// Returns an error when sending the request or decoding the response fails, or
/// when the server answers with a non-success status. The one exception is a 404
/// on `Remove`, which is reported on stdout and is not treated as an error.
// NOTE(review): this span comes from a rendered diff. The `_cmd` / `_admin_key` /
// `_api_url` parameters and the "not yet implemented" `Err(...)` line look like the
// removed side of the change interleaved with the added side — confirm against the
// repository.
pub async fn run(
_cmd: TenantSubcommand,
_admin_key: &str,
_api_url: &str,
cmd: TenantSubcommand,
admin_key: &str,
api_url: &str,
) -> Result<(), Box<dyn std::error::Error>> {
Err("This command is not yet implemented. See bead miroir-qon for tracking.".into())
let client = Client::new();
// Normalise the base URL so a trailing slash does not produce `//_miroir`.
let api_url = api_url.trim_end_matches('/');
let base_url = format!("{api_url}/_miroir");
match cmd {
TenantSubcommand::Add {
api_key,
tenant_id,
group,
} => {
let url = format!("{base_url}/tenants");
// `group` is an Option, so an omitted --group is sent as JSON null.
let body = serde_json::json!({
"api_key": api_key,
"tenant_id": tenant_id,
"group_id": group,
});
let response = client
.post(&url)
.header("Authorization", format!("Bearer {admin_key}"))
.json(&body)
.send()
.await?;
if response.status().is_success() {
let result: serde_json::Value = response.json().await?;
println!("Tenant mapping created:");
// The value is printed through `serde_json::Value`'s Display, i.e. as JSON text.
println!(" tenant_id: {}", result["tenant_id"]);
// NOTE(review): the add_tenant_mapping handler always includes a "group_id" key
// (JSON null when unset), so this branch would also print "group_id: null" —
// consider skipping null values.
if let Some(g) = result.get("group_id") {
println!(" group_id: {g}");
}
} else {
let error = response.text().await?;
return Err(format!("Failed to add tenant mapping: {error}").into());
}
}
TenantSubcommand::List => {
let url = format!("{base_url}/tenants");
let response = client
.get(&url)
.header("Authorization", format!("Bearer {admin_key}"))
.send()
.await?;
if response.status().is_success() {
let result: serde_json::Value = response.json().await?;
// Nothing is printed when "mappings" is missing or is not an array.
if let Some(mappings) = result.get("mappings").and_then(|v| v.as_array()) {
if mappings.is_empty() {
println!("No tenant mappings configured.");
} else {
println!("Tenant mappings:");
for mapping in mappings {
println!(" tenant_id: {}", mapping["tenant_id"]);
// NOTE(review): `TenantMappingResponse` shows no skip-on-None attribute, so
// "group_id" is presumably always present (null when unset) and the `else`
// branch below would never run — consider testing for a non-null value.
if let Some(g) = mapping.get("group_id") {
println!(" group_id: {g}");
} else {
println!(" group_id: (hash-based routing)");
}
println!(" api_key_hash_prefix: {}", mapping["api_key_hash_prefix"]);
println!();
}
}
}
} else {
let error = response.text().await?;
return Err(format!("Failed to list tenant mappings: {error}").into());
}
}
TenantSubcommand::Remove { api_key } => {
let url = format!("{base_url}/tenants");
// The key travels in the DELETE request body, not in the URL.
let body = serde_json::json!({
"api_key": api_key,
});
let response = client
.delete(&url)
.header("Authorization", format!("Bearer {admin_key}"))
.json(&body)
.send()
.await?;
if response.status().is_success() {
println!("Tenant mapping deleted.");
// A missing mapping is reported but still ends in `Ok(())`.
} else if response.status() == 404 {
println!("Tenant mapping not found.");
} else {
let error = response.text().await?;
return Err(format!("Failed to delete tenant mapping: {error}").into());
}
}
}
Ok(())
}

View file

@ -81,6 +81,10 @@ where
"/nodes/{id}/recover",
post(admin_endpoints::recover_node::<S>),
)
.route(
"/nodes/{id}/status",
get(admin_endpoints::get_node_status::<S>),
)
// Rebalancer management
.route("/rebalance", post(admin_endpoints::trigger_rebalance::<S>))
.route(
@ -129,4 +133,11 @@ where
"/ttl-policies",
get(admin_endpoints::list_ttl_policies::<S>),
)
// Tenant management endpoints (plan §13.15)
.route("/tenants", post(admin_endpoints::add_tenant_mapping::<S>))
.route("/tenants", get(admin_endpoints::list_tenant_mappings::<S>))
.route(
"/tenants",
delete(admin_endpoints::delete_tenant_mapping::<S>),
)
}

View file

@ -20,7 +20,8 @@ use miroir_core::{
peer_discovery::PeerDiscovery,
rebalancer::{Rebalancer, RebalancerConfig, RebalancerMetrics},
rebalancer_worker::{
RebalancerMetricsCallback, RebalancerWorker, RebalancerWorkerConfig, TopologyChangeEvent,
RebalancerMetricsCallback, RebalancerWorker, RebalancerWorkerConfig, ShardMigrationPhase,
TopologyChangeEvent,
},
replica_selection::{ReplicaSelector, SelectionObserver},
reshard::ReshardingRegistry,
@ -2110,20 +2111,15 @@ where
// Get node info and mark as recovered
let replica_group = {
let mut topo = app_state.topology.write().await;
let topo = app_state.topology.read().await;
let node_id_obj = NodeId::new(node_id.clone());
let node = topo
.node(&node_id_obj)
.ok_or_else(|| (StatusCode::NOT_FOUND, format!("Node {node_id} not found")))?;
let replica_group = node.replica_group;
// Mark node as active (recovered)
if let Some(n) = topo.node_mut(&node_id_obj) {
n.status = miroir_core::topology::NodeStatus::Active;
}
replica_group
// Don't mark as active here - let the rebalancer worker handle the transition
// to Restoring and then to Active after RF restoration completes
node.replica_group
};
// Send event to rebalancer worker
@ -2149,6 +2145,87 @@ where
})))
}
/// GET /_miroir/nodes/{id}/status — Report the status of a single node.
///
/// The response always carries the node's `id`, `address`, lower-cased `status`,
/// `replica_group`, `restoring` flag and `error`, plus the topology's
/// `shard_count`. While the node is in the `Restoring` state and a rebalancer
/// job names it as its restoring node, an `rf_restore_progress` object
/// (`total_shards`, `completed_shards`, `docs_migrated`) is added as well.
/// Shards in the `OldReplicaDeleted` or `Failed` phase count as completed.
///
/// Responds with 404 when the node id is not present in the topology.
pub async fn get_node_status<S>(
    State(state): State<S>,
    Path(node_id): Path<String>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)>
where
    S: Clone + Send + Sync + 'static,
    AppState: FromRef<S>,
{
    let app_state = AppState::from_ref(&state);

    // Snapshot everything needed from the topology inside this block so the read
    // guard is released before the rebalancer worker is awaited.
    let (mut response, shard_count, is_restoring) = {
        let topo = app_state.topology.read().await;
        let lookup_id = NodeId::new(node_id.clone());
        let node = match topo.node(&lookup_id) {
            Some(found) => found,
            None => {
                return Err((StatusCode::NOT_FOUND, format!("Node {node_id} not found")));
            }
        };

        let restoring = node.status == miroir_core::topology::NodeStatus::Restoring;
        let summary = serde_json::json!({
            "id": node.id.as_str(),
            "address": node.address,
            "status": format!("{:?}", node.status).to_lowercase(),
            "replica_group": node.replica_group,
            "restoring": restoring,
            "error": node.last_error.clone(),
        });

        (summary, topo.shards, restoring)
    };

    // Progress is only looked up for a restoring node with a worker configured.
    let mut rf_restore_progress = None;
    if is_restoring {
        if let Some(worker) = app_state.rebalancer_worker.as_ref() {
            let jobs = worker.get_all_jobs().await;
            rf_restore_progress = jobs
                .values()
                .find(|job| job.restoring_node.as_ref() == Some(&node_id))
                .map(|job| {
                    let completed = job
                        .shards
                        .values()
                        .filter(|shard| {
                            matches!(
                                shard.phase,
                                ShardMigrationPhase::OldReplicaDeleted
                                    | ShardMigrationPhase::Failed
                            )
                        })
                        .count() as u32;

                    serde_json::json!({
                        "total_shards": job.shards.len(),
                        "completed_shards": completed,
                        "docs_migrated": job.total_docs_migrated,
                    })
                });
        }
    }

    if let Some(progress) = rf_restore_progress {
        response["rf_restore_progress"] = progress;
    }
    response["shard_count"] = serde_json::json!(shard_count);

    Ok(Json(response))
}
/// Request body for POST /_miroir/rebalance.
#[derive(Debug, Deserialize)]
pub struct TriggerRebalanceRequest {
@ -3773,3 +3850,265 @@ fn millis_now() -> u64 {
.unwrap_or_default()
.as_millis() as u64
}
// ---------------------------------------------------------------------------
// Tenant management endpoints (plan §13.15)
// ---------------------------------------------------------------------------
/// Request body for POST /_miroir/tenants.
///
/// Deserialized from the JSON payload handled by `add_tenant_mapping`.
#[derive(serde::Deserialize)]
pub struct TenantAddRequest {
/// API key to map to tenant (will be hashed).
///
/// Received in plaintext; the handler hashes it with SHA-256 and passes only
/// the digest to the task store.
pub api_key: String,
/// Tenant identifier.
pub tenant_id: String,
/// Replica group ID to pin this tenant to (optional for hash-based routing).
///
/// The handler widens this to `i64` before handing it to the task store.
pub group_id: Option<u32>,
}
/// Response body for GET /_miroir/tenants.
///
/// One value of this type is produced per stored mapping and serialized into
/// the `mappings` array of the list response.
#[derive(serde::Serialize)]
pub struct TenantMappingResponse {
/// Tenant identifier.
pub tenant_id: String,
/// Pinned replica group ID (null if using hash-based routing).
pub group_id: Option<u32>,
/// API key hash (truncated for display).
///
/// Built from at most the first 8 bytes of the stored hash, so the full hash
/// is not part of the response.
pub api_key_hash_prefix: String,
}
/// POST /_miroir/tenants — Add a tenant mapping (api_key mode).
///
/// Maps an API key to a tenant ID and optionally pins it to a specific replica
/// group. This enables `tenant_affinity.mode: api_key` for noisy-neighbor
/// isolation (plan §13.15).
///
/// Request body:
/// ```json
/// {
///   "api_key": "sk_abc123...",
///   "tenant_id": "enterprise-co",
///   "group_id": 0  // optional, null for hash-based routing
/// }
/// ```
///
/// Behaviour:
/// - `api_key` and `tenant_id` must be non-blank; otherwise the request is
///   rejected with 400 and nothing is stored.
/// - The API key is hashed with SHA-256 and only the digest is handed to the
///   task store (the plaintext key is never stored).
///
/// Response (201 Created):
/// ```json
/// {
///   "success": true,
///   "message": "Tenant mapping created",
///   "tenant_id": "enterprise-co",
///   "group_id": 0
/// }
/// ```
///
/// # Errors
///
/// - 400 Bad Request when `api_key` or `tenant_id` is empty or whitespace-only.
/// - 503 Service Unavailable when no task store is configured.
/// - 500 Internal Server Error when the task store rejects the insert.
pub async fn add_tenant_mapping<S>(
    State(state): State<S>,
    Json(body): Json<TenantAddRequest>,
) -> Result<(StatusCode, Json<serde_json::Value>), (StatusCode, String)>
where
    S: Clone + Send + Sync + 'static,
    AppState: FromRef<S>,
{
    use sha2::{Digest, Sha256};

    // A blank key would hash to a fixed, well-known digest and silently map every
    // request carrying an empty key to this tenant, so refuse it up front.
    if body.api_key.trim().is_empty() {
        return Err((StatusCode::BAD_REQUEST, "api_key must not be empty".into()));
    }
    if body.tenant_id.trim().is_empty() {
        return Err((
            StatusCode::BAD_REQUEST,
            "tenant_id must not be empty".into(),
        ));
    }

    let app_state = AppState::from_ref(&state);

    // Hash the API key using SHA-256 (never store plaintext keys).
    let api_key_hash = Sha256::digest(body.api_key.as_bytes()).to_vec();

    let new_mapping = miroir_core::task_store::NewTenantMapping {
        api_key_hash,
        tenant_id: body.tenant_id.clone(),
        // u32 -> i64 is lossless, so use the infallible conversion.
        group_id: body.group_id.map(i64::from),
    };

    // Get task_store (required for tenant mappings).
    let task_store = app_state.task_store.ok_or_else(|| {
        tracing::error!("task_store required for tenant mappings");
        (
            StatusCode::SERVICE_UNAVAILABLE,
            "task_store not available".into(),
        )
    })?;

    task_store
        .insert_tenant_mapping(&new_mapping)
        .map_err(|e| {
            tracing::error!(error = %e, tenant_id = %body.tenant_id, "failed to insert tenant mapping");
            (StatusCode::INTERNAL_SERVER_ERROR, format!("failed to insert mapping: {e}"))
        })?;

    tracing::info!(
        tenant_id = %body.tenant_id,
        group_id = ?body.group_id,
        "tenant mapping created"
    );

    let response = serde_json::json!({
        "success": true,
        "message": "Tenant mapping created",
        "tenant_id": body.tenant_id,
        "group_id": body.group_id,
    });

    Ok((StatusCode::CREATED, Json(response)))
}
/// GET /_miroir/tenants — List all tenant mappings.
///
/// Returns every tenant mapping held by the task store (api_key mode). Each
/// entry exposes only the first 8 bytes of the stored key hash, rendered as
/// zero-padded lowercase hex (16 characters for a full-length hash), so the
/// complete hash is never part of the response.
///
/// Response (200 OK):
/// ```json
/// {
///   "mappings": [
///     {
///       "tenant_id": "enterprise-co",
///       "group_id": 0,
///       "api_key_hash_prefix": "a1b2c3d4e5f60718"
///     }
///   ]
/// }
/// ```
///
/// Note: all mappings are returned in a single response. For large deployments,
/// consider pagination.
///
/// # Errors
///
/// - 503 Service Unavailable when no task store is configured.
/// - 500 Internal Server Error when the task store fails to list mappings.
pub async fn list_tenant_mappings<S>(
    State(state): State<S>,
) -> Result<(StatusCode, Json<serde_json::Value>), (StatusCode, String)>
where
    S: Clone + Send + Sync + 'static,
    AppState: FromRef<S>,
{
    let app_state = AppState::from_ref(&state);

    let task_store = app_state.task_store.ok_or_else(|| {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            "task_store not available".into(),
        )
    })?;

    let mappings = task_store.list_tenant_mappings().map_err(|e| {
        tracing::error!(error = %e, "failed to list tenant mappings");
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("failed to list mappings: {e}"),
        )
    })?;

    let response_mappings: Vec<TenantMappingResponse> = mappings
        .into_iter()
        .map(|m| {
            // `{:x?}` on a byte slice renders a Debug list such as `[a1, b, 0]`
            // (brackets, commas, no zero padding). Format each byte as two hex
            // digits instead so the prefix is a plain hex string.
            let api_key_hash_prefix: String = m
                .api_key_hash
                .iter()
                .take(8)
                .map(|byte| format!("{byte:02x}"))
                .collect();

            TenantMappingResponse {
                tenant_id: m.tenant_id,
                // Values written by the POST handler originate from a `u32`.
                group_id: m.group_id.map(|g| g as u32),
                api_key_hash_prefix,
            }
        })
        .collect();

    let response = serde_json::json!({
        "mappings": response_mappings
    });

    Ok((StatusCode::OK, Json(response)))
}
/// DELETE /_miroir/tenants — Delete a tenant mapping by API key.
///
/// The API key from the request body is hashed with SHA-256 and the digest is
/// used to locate the mapping in the task store.
///
/// Request body:
/// ```json
/// {
///   "api_key": "sk_abc123..."
/// }
/// ```
///
/// Response (200 OK):
/// ```json
/// {
///   "success": true,
///   "message": "Tenant mapping deleted"
/// }
/// ```
///
/// Response (404 Not Found):
/// ```json
/// {
///   "success": false,
///   "message": "Tenant mapping not found"
/// }
/// ```
///
/// # Errors
///
/// - 503 Service Unavailable when no task store is configured.
/// - 500 Internal Server Error when the task store fails to delete.
pub async fn delete_tenant_mapping<S>(
    State(state): State<S>,
    Json(body): Json<TenantDeleteRequest>,
) -> Result<(StatusCode, Json<serde_json::Value>), (StatusCode, String)>
where
    S: Clone + Send + Sync + 'static,
    AppState: FromRef<S>,
{
    use sha2::{Digest, Sha256};

    let app_state = AppState::from_ref(&state);

    // Mappings are keyed by the digest, never by the plaintext key.
    let api_key_hash = Sha256::digest(body.api_key.as_bytes()).to_vec();

    let task_store = app_state.task_store.ok_or_else(|| {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            "task_store not available".into(),
        )
    })?;

    let deleted = task_store
        .delete_tenant_mapping(&api_key_hash)
        .map_err(|e| {
            tracing::error!(error = %e, "failed to delete tenant mapping");
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("failed to delete mapping: {e}"),
            )
        })?;

    // A missing mapping is an ordinary 404 response, not a handler error.
    let (status, payload) = if deleted {
        tracing::info!("tenant mapping deleted");
        (
            StatusCode::OK,
            serde_json::json!({
                "success": true,
                "message": "Tenant mapping deleted"
            }),
        )
    } else {
        (
            StatusCode::NOT_FOUND,
            serde_json::json!({
                "success": false,
                "message": "Tenant mapping not found"
            }),
        )
    };

    Ok((status, Json(payload)))
}
/// Request body for DELETE /_miroir/tenants.
///
/// Deserialized from the JSON payload handled by `delete_tenant_mapping`.
#[derive(serde::Deserialize)]
pub struct TenantDeleteRequest {
/// API key to delete (will be hashed for lookup).
///
/// Received in plaintext; the handler looks the mapping up by its SHA-256 digest.
pub api_key: String,
}