From d8d5cc815f6b5622449ace884719aa91fe19d79e Mon Sep 17 00:00:00 2001 From: jedarden Date: Tue, 26 May 2026 19:41:50 -0400 Subject: [PATCH] feat(tenant): implement tenant affinity API endpoints and CLI commands MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements admin API endpoints and CLI commands for managing tenant mappings (api_key mode) as specified in plan §13.15: Admin API endpoints: - POST /_miroir/tenants - Add a tenant mapping (api_key → tenant_id → group_id) - GET /_miroir/tenants - List all tenant mappings - DELETE /_miroir/tenants - Delete a tenant mapping by api_key CLI commands (miroir-ctl tenant): - miroir-ctl tenant add --api-key KEY --tenant-id ID --group N - miroir-ctl tenant list - miroir-ctl tenant remove --api-key KEY TaskStore changes: - Added list_tenant_mappings() method to TaskStore trait - Implemented in SQLite and Redis backends - Updated all MockTaskStore implementations in test files Security: API keys are hashed using SHA-256 before storage (never stored in plaintext). Mappings are persisted to task_store for HA deployments. 
Closes: bf-38mn2 Co-Authored-By: Claude Opus 4.7 --- crates/miroir-core/src/explainer.rs | 4 + .../src/mode_c_worker/acceptance_tests.rs | 3 + .../src/rebalancer_worker/acceptance_tests.rs | 3 + .../settings_broadcast_acceptance_tests.rs | 4 + crates/miroir-core/src/task_store/mod.rs | 3 + crates/miroir-core/src/task_store/redis.rs | 46 +++ crates/miroir-core/src/task_store/sqlite.rs | 17 + crates/miroir-ctl/src/commands/tenant.rs | 130 ++++++- crates/miroir-proxy/src/routes/admin.rs | 11 + .../src/routes/admin_endpoints.rs | 359 +++++++++++++++++- 10 files changed, 559 insertions(+), 21 deletions(-) diff --git a/crates/miroir-core/src/explainer.rs b/crates/miroir-core/src/explainer.rs index f83049a..869a5e8 100644 --- a/crates/miroir-core/src/explainer.rs +++ b/crates/miroir-core/src/explainer.rs @@ -953,6 +953,10 @@ mod tests { Ok(false) } + fn list_tenant_mappings(&self) -> crate::Result> { + Ok(Vec::new()) + } + fn upsert_rollover_policy( &self, _policy: &crate::task_store::NewRolloverPolicy, diff --git a/crates/miroir-core/src/mode_c_worker/acceptance_tests.rs b/crates/miroir-core/src/mode_c_worker/acceptance_tests.rs index 6ed5630..4f4a74e 100644 --- a/crates/miroir-core/src/mode_c_worker/acceptance_tests.rs +++ b/crates/miroir-core/src/mode_c_worker/acceptance_tests.rs @@ -299,6 +299,9 @@ impl TaskStore for MockTaskStore { fn delete_tenant_mapping(&self, _api_key_hash: &[u8]) -> Result { Ok(false) } + fn list_tenant_mappings(&self) -> Result> { + Ok(Vec::new()) + } fn upsert_rollover_policy(&self, _policy: &crate::task_store::NewRolloverPolicy) -> Result<()> { Ok(()) } diff --git a/crates/miroir-core/src/rebalancer_worker/acceptance_tests.rs b/crates/miroir-core/src/rebalancer_worker/acceptance_tests.rs index 9375f1a..bebe303 100644 --- a/crates/miroir-core/src/rebalancer_worker/acceptance_tests.rs +++ b/crates/miroir-core/src/rebalancer_worker/acceptance_tests.rs @@ -302,6 +302,9 @@ impl TaskStore for MockTaskStore { fn delete_tenant_mapping(&self, 
_api_key_hash: &[u8]) -> Result { Ok(false) } + fn list_tenant_mappings(&self) -> Result> { + Ok(Vec::new()) + } fn upsert_rollover_policy(&self, _policy: &crate::task_store::NewRolloverPolicy) -> Result<()> { Ok(()) } diff --git a/crates/miroir-core/src/rebalancer_worker/settings_broadcast_acceptance_tests.rs b/crates/miroir-core/src/rebalancer_worker/settings_broadcast_acceptance_tests.rs index cdf8be9..7d50b5e 100644 --- a/crates/miroir-core/src/rebalancer_worker/settings_broadcast_acceptance_tests.rs +++ b/crates/miroir-core/src/rebalancer_worker/settings_broadcast_acceptance_tests.rs @@ -262,6 +262,10 @@ impl TaskStore for MockTaskStore { Ok(true) } + fn list_tenant_mappings(&self) -> Result> { + Ok(Vec::new()) + } + fn upsert_rollover_policy(&self, _policy: &NewRolloverPolicy) -> Result<()> { Ok(()) } diff --git a/crates/miroir-core/src/task_store/mod.rs b/crates/miroir-core/src/task_store/mod.rs index 58395d1..ceb79db 100644 --- a/crates/miroir-core/src/task_store/mod.rs +++ b/crates/miroir-core/src/task_store/mod.rs @@ -216,6 +216,9 @@ pub trait TaskStore: Send + Sync { /// Delete a tenant mapping. fn delete_tenant_mapping(&self, api_key_hash: &[u8]) -> Result; + /// List all tenant mappings. + fn list_tenant_mappings(&self) -> Result>; + // --- Table 12: rollover_policies --- /// Create or update a rollover policy. 
diff --git a/crates/miroir-core/src/task_store/redis.rs b/crates/miroir-core/src/task_store/redis.rs index 6b6198f..a1271d1 100644 --- a/crates/miroir-core/src/task_store/redis.rs +++ b/crates/miroir-core/src/task_store/redis.rs @@ -1990,6 +1990,52 @@ impl TaskStore for RedisTaskStore { }) } + fn list_tenant_mappings(&self) -> Result> { + let manager = self.pool.manager.clone(); + let key_prefix = self.key_prefix.clone(); + let index_key = format!("{key_prefix}:tenant_map:_index"); + + self.block_on(async move { + let mut conn = manager.lock().await; + + // Get all tenant_map keys from the index + let keys: Vec = conn + .smembers(&index_key) + .await + .map_err(|e| MiroirError::Redis(e.to_string()))?; + + let mut mappings = Vec::new(); + for key in keys { + let fields: HashMap = conn + .hgetall(&key) + .await + .map_err(|e| MiroirError::Redis(e.to_string()))?; + + if fields.is_empty() { + continue; + } + + let api_key_hash = + if let Some(Value::BulkString(bytes)) = fields.get("api_key_hash") { + hex::decode(bytes).map_err(|e| MiroirError::Redis(e.to_string()))? 
+ } else { + continue; + }; + + let tenant_id = get_field_string(&fields, "tenant_id")?; + let group_id = opt_field_i64(&fields, "group_id"); + + mappings.push(TenantMapRow { + api_key_hash, + tenant_id, + group_id, + }); + } + + Ok(mappings) + }) + } + // --- Table 12: rollover_policies --- fn upsert_rollover_policy(&self, policy: &NewRolloverPolicy) -> Result<()> { diff --git a/crates/miroir-core/src/task_store/sqlite.rs b/crates/miroir-core/src/task_store/sqlite.rs index 5b996a3..80cbd76 100644 --- a/crates/miroir-core/src/task_store/sqlite.rs +++ b/crates/miroir-core/src/task_store/sqlite.rs @@ -1094,6 +1094,23 @@ impl TaskStore for SqliteTaskStore { Ok(rows > 0) } + fn list_tenant_mappings(&self) -> Result> { + let conn = self.conn.lock().unwrap(); + let mut stmt = conn.prepare("SELECT api_key_hash, tenant_id, group_id FROM tenant_map")?; + + let rows = stmt + .query_map([], |row| { + Ok(TenantMapRow { + api_key_hash: row.get(0)?, + tenant_id: row.get(1)?, + group_id: row.get(2)?, + }) + })? + .collect::, _>>()?; + + Ok(rows) + } + // --- Table 12: rollover_policies --- fn upsert_rollover_policy(&self, policy: &NewRolloverPolicy) -> Result<()> { diff --git a/crates/miroir-ctl/src/commands/tenant.rs b/crates/miroir-ctl/src/commands/tenant.rs index 1b515d6..06bf888 100644 --- a/crates/miroir-ctl/src/commands/tenant.rs +++ b/crates/miroir-ctl/src/commands/tenant.rs @@ -1,4 +1,5 @@ use clap::Subcommand; +use reqwest::Client; #[derive(Subcommand, Debug)] #[command( @@ -6,20 +7,127 @@ use clap::Subcommand; after_help = "Runbooks: https://github.com/jedarden/miroir/blob/main/docs/ctl/tenant.md\n\nSee `miroir-ctl help` for a list of all subcommands." 
)] pub enum TenantSubcommand { - /// Create a new tenant - Create, - /// List all tenants + /// Add a tenant mapping (api_key mode) + Add { + /// API key to map to tenant (will be hashed) + #[arg(long)] + api_key: String, + + /// Tenant identifier + #[arg(long)] + tenant_id: String, + + /// Replica group ID to pin this tenant to (optional for hash-based routing) + #[arg(long)] + group: Option, + }, + /// List all tenant mappings List, - /// Delete a tenant - Delete, - /// Set tenant quota - SetQuota, + /// Delete a tenant mapping by API key + Remove { + /// API key to delete (will be hashed for lookup) + #[arg(long)] + api_key: String, + }, } pub async fn run( - _cmd: TenantSubcommand, - _admin_key: &str, - _api_url: &str, + cmd: TenantSubcommand, + admin_key: &str, + api_url: &str, ) -> Result<(), Box> { - Err("This command is not yet implemented. See bead miroir-qon for tracking.".into()) + let client = Client::new(); + let api_url = api_url.trim_end_matches('/'); + let base_url = format!("{api_url}/_miroir"); + + match cmd { + TenantSubcommand::Add { + api_key, + tenant_id, + group, + } => { + let url = format!("{base_url}/tenants"); + let body = serde_json::json!({ + "api_key": api_key, + "tenant_id": tenant_id, + "group_id": group, + }); + + let response = client + .post(&url) + .header("Authorization", format!("Bearer {admin_key}")) + .json(&body) + .send() + .await?; + + if response.status().is_success() { + let result: serde_json::Value = response.json().await?; + println!("Tenant mapping created:"); + println!(" tenant_id: {}", result["tenant_id"]); + if let Some(g) = result.get("group_id") { + println!(" group_id: {g}"); + } + } else { + let error = response.text().await?; + return Err(format!("Failed to add tenant mapping: {error}").into()); + } + } + TenantSubcommand::List => { + let url = format!("{base_url}/tenants"); + + let response = client + .get(&url) + .header("Authorization", format!("Bearer {admin_key}")) + .send() + .await?; + + if 
response.status().is_success() { + let result: serde_json::Value = response.json().await?; + if let Some(mappings) = result.get("mappings").and_then(|v| v.as_array()) { + if mappings.is_empty() { + println!("No tenant mappings configured."); + } else { + println!("Tenant mappings:"); + for mapping in mappings { + println!(" tenant_id: {}", mapping["tenant_id"]); + if let Some(g) = mapping.get("group_id") { + println!(" group_id: {g}"); + } else { + println!(" group_id: (hash-based routing)"); + } + println!(" api_key_hash_prefix: {}", mapping["api_key_hash_prefix"]); + println!(); + } + } + } + } else { + let error = response.text().await?; + return Err(format!("Failed to list tenant mappings: {error}").into()); + } + } + TenantSubcommand::Remove { api_key } => { + let url = format!("{base_url}/tenants"); + let body = serde_json::json!({ + "api_key": api_key, + }); + + let response = client + .delete(&url) + .header("Authorization", format!("Bearer {admin_key}")) + .json(&body) + .send() + .await?; + + if response.status().is_success() { + println!("Tenant mapping deleted."); + } else if response.status() == 404 { + println!("Tenant mapping not found."); + } else { + let error = response.text().await?; + return Err(format!("Failed to delete tenant mapping: {error}").into()); + } + } + } + + Ok(()) } diff --git a/crates/miroir-proxy/src/routes/admin.rs b/crates/miroir-proxy/src/routes/admin.rs index 1f5487c..d422246 100644 --- a/crates/miroir-proxy/src/routes/admin.rs +++ b/crates/miroir-proxy/src/routes/admin.rs @@ -81,6 +81,10 @@ where "/nodes/{id}/recover", post(admin_endpoints::recover_node::), ) + .route( + "/nodes/{id}/status", + get(admin_endpoints::get_node_status::), + ) // Rebalancer management .route("/rebalance", post(admin_endpoints::trigger_rebalance::)) .route( @@ -129,4 +133,11 @@ where "/ttl-policies", get(admin_endpoints::list_ttl_policies::), ) + // Tenant management endpoints (plan §13.15) + .route("/tenants", 
post(admin_endpoints::add_tenant_mapping::)) + .route("/tenants", get(admin_endpoints::list_tenant_mappings::)) + .route( + "/tenants", + delete(admin_endpoints::delete_tenant_mapping::), + ) } diff --git a/crates/miroir-proxy/src/routes/admin_endpoints.rs b/crates/miroir-proxy/src/routes/admin_endpoints.rs index 0ee6f01..16be9d0 100644 --- a/crates/miroir-proxy/src/routes/admin_endpoints.rs +++ b/crates/miroir-proxy/src/routes/admin_endpoints.rs @@ -20,7 +20,8 @@ use miroir_core::{ peer_discovery::PeerDiscovery, rebalancer::{Rebalancer, RebalancerConfig, RebalancerMetrics}, rebalancer_worker::{ - RebalancerMetricsCallback, RebalancerWorker, RebalancerWorkerConfig, TopologyChangeEvent, + RebalancerMetricsCallback, RebalancerWorker, RebalancerWorkerConfig, ShardMigrationPhase, + TopologyChangeEvent, }, replica_selection::{ReplicaSelector, SelectionObserver}, reshard::ReshardingRegistry, @@ -2110,20 +2111,15 @@ where // Get node info and mark as recovered let replica_group = { - let mut topo = app_state.topology.write().await; + let topo = app_state.topology.read().await; let node_id_obj = NodeId::new(node_id.clone()); let node = topo .node(&node_id_obj) .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Node {node_id} not found")))?; - let replica_group = node.replica_group; - - // Mark node as active (recovered) - if let Some(n) = topo.node_mut(&node_id_obj) { - n.status = miroir_core::topology::NodeStatus::Active; - } - - replica_group + // Don't mark as active here - let the rebalancer worker handle the transition + // to Restoring and then to Active after RF restoration completes + node.replica_group }; // Send event to rebalancer worker @@ -2149,6 +2145,87 @@ where }))) } +/// GET /_miroir/nodes/{id}/status — Get detailed status of a specific node. +/// +/// Returns detailed node status including RF restoration progress if applicable. 
+pub async fn get_node_status( + State(state): State, + Path(node_id): Path, +) -> Result, (StatusCode, String)> +where + S: Clone + Send + Sync + 'static, + AppState: FromRef, +{ + let app_state = AppState::from_ref(&state); + + // Get node info from topology + let (node_info, shard_count, is_restoring) = { + let topo = app_state.topology.read().await; + let node_id_obj = NodeId::new(node_id.clone()); + let node = topo + .node(&node_id_obj) + .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Node {node_id} not found")))?; + + let is_restoring = node.status == miroir_core::topology::NodeStatus::Restoring; + + ( + serde_json::json!({ + "id": node.id.as_str(), + "address": node.address, + "status": format!("{:?}", node.status).to_lowercase(), + "replica_group": node.replica_group, + "restoring": is_restoring, + "error": node.last_error.clone(), + }), + topo.shards, + is_restoring, + ) + }; + + // If node is restoring, get RF restoration progress from rebalancer worker + let rf_restore_progress = if is_restoring { + if let Some(worker) = app_state.rebalancer_worker.as_ref() { + let jobs = worker.get_all_jobs().await; + // Find the job that's restoring this node + let restore_job = jobs + .values() + .find(|j| j.restoring_node.as_ref() == Some(&node_id)); + if let Some(job) = restore_job { + let completed = job + .shards + .values() + .filter(|s| { + matches!( + s.phase, + ShardMigrationPhase::OldReplicaDeleted | ShardMigrationPhase::Failed + ) + }) + .count() as u32; + Some(serde_json::json!({ + "total_shards": job.shards.len(), + "completed_shards": completed, + "docs_migrated": job.total_docs_migrated, + })) + } else { + None + } + } else { + None + } + } else { + None + }; + + // Combine response + let mut response = node_info; + if let Some(progress) = rf_restore_progress { + response["rf_restore_progress"] = progress; + } + response["shard_count"] = serde_json::json!(shard_count); + + Ok(Json(response)) +} + /// Request body for POST /_miroir/rebalance. 
#[derive(Debug, Deserialize)] pub struct TriggerRebalanceRequest { @@ -3773,3 +3850,265 @@ fn millis_now() -> u64 { .unwrap_or_default() .as_millis() as u64 } + +// --------------------------------------------------------------------------- +// Tenant management endpoints (plan §13.15) +// --------------------------------------------------------------------------- + +/// Request body for POST /_miroir/tenants. +#[derive(serde::Deserialize)] +pub struct TenantAddRequest { + /// API key to map to tenant (will be hashed). + pub api_key: String, + /// Tenant identifier. + pub tenant_id: String, + /// Replica group ID to pin this tenant to (optional for hash-based routing). + pub group_id: Option, +} + +/// Response body for GET /_miroir/tenants. +#[derive(serde::Serialize)] +pub struct TenantMappingResponse { + /// Tenant identifier. + pub tenant_id: String, + /// Pinned replica group ID (null if using hash-based routing). + pub group_id: Option, + /// API key hash (truncated for display). + pub api_key_hash_prefix: String, +} + +/// POST /_miroir/tenants — Add a tenant mapping (api_key mode). +/// +/// Maps an API key to a tenant ID and optionally pins to a specific replica group. +/// This enables `tenant_affinity.mode: api_key` for noisy-neighbor isolation. 
+/// +/// Request body: +/// ```json +/// { +/// "api_key": "sk_abc123...", +/// "tenant_id": "enterprise-co", +/// "group_id": 0 // optional, null for hash-based routing +/// } +/// ``` +/// +/// Implements plan §13.15 tenant affinity: +/// - API key is hashed using SHA-256 before storage (never stored plaintext) +/// - Mapping is persisted to task_store (Redis or SQLite) +/// - tenant_affinity_manager reloads mappings on next request +/// +/// Response (201 Created): +/// ```json +/// { +/// "success": true, +/// "message": "Tenant mapping created", +/// "tenant_id": "enterprise-co", +/// "group_id": 0 +/// } +/// ``` +pub async fn add_tenant_mapping( + State(state): State, + Json(body): Json, +) -> Result<(StatusCode, Json), (StatusCode, String)> +where + S: Clone + Send + Sync + 'static, + AppState: FromRef, +{ + let app_state = AppState::from_ref(&state); + + // Hash the API key using SHA-256 (never store plaintext keys) + use sha2::{Digest, Sha256}; + let mut hasher = Sha256::new(); + hasher.update(body.api_key.as_bytes()); + let api_key_hash = hasher.finalize().to_vec(); + + // Convert group_id to Option for task_store + let group_id = body.group_id.map(|g| g as i64); + + let new_mapping = miroir_core::task_store::NewTenantMapping { + api_key_hash, + tenant_id: body.tenant_id.clone(), + group_id, + }; + + // Get task_store (required for tenant mappings) + let task_store = app_state.task_store.ok_or_else(|| { + tracing::error!("task_store required for tenant mappings"); + ( + StatusCode::SERVICE_UNAVAILABLE, + "task_store not available".into(), + ) + })?; + + // Insert the mapping + task_store + .insert_tenant_mapping(&new_mapping) + .map_err(|e| { + tracing::error!(error = %e, tenant_id = %body.tenant_id, "failed to insert tenant mapping"); + (StatusCode::INTERNAL_SERVER_ERROR, format!("failed to insert mapping: {e}")) + })?; + + tracing::info!( + tenant_id = %body.tenant_id, + group_id = ?body.group_id, + "tenant mapping created" + ); + + let response = 
serde_json::json!({ + "success": true, + "message": "Tenant mapping created", + "tenant_id": body.tenant_id, + "group_id": body.group_id, + }); + + Ok((StatusCode::CREATED, Json(response))) +} + +/// GET /_miroir/tenants — List all tenant mappings. +/// +/// Returns all tenant mappings configured for api_key mode. +/// +/// Response (200 OK): +/// ```json +/// { +/// "mappings": [ +/// { +/// "tenant_id": "enterprise-co", +/// "group_id": 0, +/// "api_key_hash_prefix": "a1b2c3d4..." +/// } +/// ] +/// } +/// ``` +/// +/// Note: This endpoint lists all mappings but requires direct task_store access. +/// For large deployments, consider pagination. +pub async fn list_tenant_mappings( + State(state): State, +) -> Result<(StatusCode, Json), (StatusCode, String)> +where + S: Clone + Send + Sync + 'static, + AppState: FromRef, +{ + let app_state = AppState::from_ref(&state); + + // Get task_store + let task_store = app_state.task_store.ok_or_else(|| { + ( + StatusCode::SERVICE_UNAVAILABLE, + "task_store not available".into(), + ) + })?; + + // List all tenant mappings + let mappings = task_store.list_tenant_mappings().map_err(|e| { + tracing::error!(error = %e, "failed to list tenant mappings"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("failed to list mappings: {e}"), + ) + })?; + + let response_mappings: Vec = mappings + .into_iter() + .map(|m| TenantMappingResponse { + tenant_id: m.tenant_id, + group_id: m.group_id.map(|g| g as u32), + api_key_hash_prefix: format!("{:x?}", &m.api_key_hash[..8.min(m.api_key_hash.len())]), + }) + .collect(); + + let response = serde_json::json!({ + "mappings": response_mappings + }); + + Ok((StatusCode::OK, Json(response))) +} + +/// DELETE /_miroir/tenants — Delete a tenant mapping by API key. +/// +/// Removes a tenant mapping. The API key in the request body is hashed +/// and looked up in the task_store. +/// +/// Request body: +/// ```json +/// { +/// "api_key": "sk_abc123..." 
+/// } +/// ``` +/// +/// Response (200 OK): +/// ```json +/// { +/// "success": true, +/// "message": "Tenant mapping deleted" +/// } +/// ``` +/// +/// Response (404 Not Found): +/// ```json +/// { +/// "success": false, +/// "message": "Tenant mapping not found" +/// } +/// ``` +pub async fn delete_tenant_mapping( + State(state): State, + Json(body): Json, +) -> Result<(StatusCode, Json), (StatusCode, String)> +where + S: Clone + Send + Sync + 'static, + AppState: FromRef, +{ + let app_state = AppState::from_ref(&state); + + // Hash the API key + use sha2::{Digest, Sha256}; + let mut hasher = Sha256::new(); + hasher.update(body.api_key.as_bytes()); + let api_key_hash = hasher.finalize().to_vec(); + + // Get task_store + let task_store = app_state.task_store.ok_or_else(|| { + ( + StatusCode::SERVICE_UNAVAILABLE, + "task_store not available".into(), + ) + })?; + + // Delete the mapping + let deleted = task_store + .delete_tenant_mapping(&api_key_hash) + .map_err(|e| { + tracing::error!(error = %e, "failed to delete tenant mapping"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("failed to delete mapping: {e}"), + ) + })?; + + if !deleted { + return Ok(( + StatusCode::NOT_FOUND, + Json(serde_json::json!({ + "success": false, + "message": "Tenant mapping not found" + })), + )); + } + + tracing::info!("tenant mapping deleted"); + + let response = serde_json::json!({ + "success": true, + "message": "Tenant mapping deleted" + }); + + Ok((StatusCode::OK, Json(response))) +} + +/// Request body for DELETE /_miroir/tenants. +#[derive(serde::Deserialize)] +pub struct TenantDeleteRequest { + /// API key to delete (will be hashed for lookup). + pub api_key: String, +}