diff --git a/crates/miroir-proxy/src/routes/admin.rs b/crates/miroir-proxy/src/routes/admin.rs
index 55897eb..719cb3e 100644
--- a/crates/miroir-proxy/src/routes/admin.rs
+++ b/crates/miroir-proxy/src/routes/admin.rs
@@ -96,4 +96,13 @@ where
         .route("/changes", get(cdc::get_changes::))
         // Dump import routes (plan §13.9)
         .nest("/dumps", dumps::routes())
+        // Resharding endpoints (plan §13.1)
+        // NOTE(review): the turbofish arguments after `post_reshard::` and
+        // `get_reshard_status::` are empty in this copy of the patch; they look
+        // like they were lost in extraction — confirm against the original commit.
+        .route(
+            "/indexes/{uid}/reshard",
+            post(admin_endpoints::post_reshard::),
+        )
+        .route(
+            "/indexes/{uid}/reshard/status",
+            get(admin_endpoints::get_reshard_status::),
+        )
 }
diff --git a/crates/miroir-proxy/src/routes/admin_endpoints.rs b/crates/miroir-proxy/src/routes/admin_endpoints.rs
index 6e3abf1..eac83cd 100644
--- a/crates/miroir-proxy/src/routes/admin_endpoints.rs
+++ b/crates/miroir-proxy/src/routes/admin_endpoints.rs
@@ -2604,3 +2604,268 @@ where
         "recent_diffs_count": stats.recent_diffs_count,
     })))
 }
+
+// ---------------------------------------------------------------------------
+// Resharding endpoints (plan §13.1)
+// ---------------------------------------------------------------------------
+
+/// Request body for POST /_miroir/indexes/{uid}/reshard.
+#[derive(Debug, Deserialize)]
+pub struct ReshardRequest {
+    /// New shard count (S_new).
+    pub new_shards: u32,
+    /// Backfill throttle in documents per second (0 = unlimited).
+    ///
+    /// Falls back to [`default_throttle`] when the field is absent from the
+    /// request body.
+    #[serde(default = "default_throttle")]
+    pub throttle_docs_per_sec: u64,
+}
+
+/// Serde default for `ReshardRequest::throttle_docs_per_sec`: 10000 documents
+/// per second.
+fn default_throttle() -> u64 {
+    10000
+}
+
+/// Response for POST /_miroir/indexes/{uid}/reshard.
+#[derive(Debug, Serialize)]
+pub struct ReshardResponse {
+    /// Reshard operation ID.
+    pub operation_id: String,
+    /// Index being resharded.
+    pub index_uid: String,
+    /// Old shard count.
+    pub old_shards: u32,
+    /// New shard count.
+    pub new_shards: u32,
+    /// Shadow index UID.
+    pub shadow_index: String,
+    /// Current phase.
+    pub phase: String,
+    /// Started at (UNIX ms).
+    pub started_at: u64,
+}
+
+/// Response for GET /_miroir/indexes/{uid}/reshard/status.
+#[derive(Debug, Serialize)]
+pub struct ReshardStatusResponse {
+    /// Whether an operation is active for this index.
+    pub active: bool,
+    /// Reshard operation details (if active).
+    // NOTE(review): the type argument of `Option` is missing in this copy of
+    // the patch; the status handler stores a `ReshardOperationDetails` here,
+    // so presumably `Option<ReshardOperationDetails>` — confirm.
+    pub operation: Option,
+}
+
+/// Details of a single reshard operation, embedded in
+/// [`ReshardStatusResponse`].
+#[derive(Debug, Serialize)]
+pub struct ReshardOperationDetails {
+    /// Operation ID.
+    pub id: String,
+    /// Index being resharded.
+    pub index_uid: String,
+    /// Old shard count.
+    pub old_shards: u32,
+    /// New shard count.
+    pub new_shards: u32,
+    /// Current phase.
+    pub phase: String,
+    /// Documents backfilled so far.
+    pub documents_backfilled: u64,
+    /// Total documents to backfill.
+    pub total_documents: u64,
+    /// Backfill progress ratio (0.0 to 1.0).
+    pub backfill_progress: f64,
+    /// Shadow index UID.
+    pub shadow_index: String,
+    /// Started at (UNIX ms).
+    pub started_at: u64,
+    /// Last error (if any).
+    // NOTE(review): the type argument of `Option` is missing here; the
+    // intended payload type cannot be determined from this patch — confirm.
+    pub last_error: Option,
+    /// Verification results (if verified).
+    // NOTE(review): type argument missing; presumably
+    // `Option<VerificationResultDetails>` — confirm.
+    pub verification_results: Option,
+}
+
+/// Outcome of comparing the live index against the shadow index.
+// NOTE(review): the element type of the three `Vec` fields below is missing in
+// this copy of the patch; confirm how primary keys are represented.
+#[derive(Debug, Serialize)]
+pub struct VerificationResultDetails {
+    /// Whether verification passed.
+    pub passed: bool,
+    /// Live index PK count.
+    pub live_pk_count: u64,
+    /// Shadow index PK count.
+    pub shadow_pk_count: u64,
+    /// PKs only in live index.
+    pub live_only_pks: Vec,
+    /// PKs only in shadow index.
+    pub shadow_only_pks: Vec,
+    /// PKs with content hash mismatch.
+    pub mismatched_pks: Vec,
+}
+
+/// POST /_miroir/indexes/{uid}/reshard — Begin online resharding (plan §13.1).
+///
+/// Request body:
+/// ```json
+/// {
+///   "new_shards": 256,
+///   "throttle_docs_per_sec": 10000
+/// }
+/// ```
+///
+/// Returns the operation ID and initial state.
+///
+/// # Errors
+///
+/// - `400 Bad Request` — `new_shards` is zero, or is not strictly greater
+///   than the current shard count (only scaling up is supported).
+/// - `409 Conflict` — a reshard operation is already registered for this
+///   index, including one registered concurrently while this request was
+///   creating its shadow index.
+/// - `503 Service Unavailable` — the topology reports no nodes.
+/// - `500 Internal Server Error` — shadow creation or registration failed.
+pub async fn post_reshard<S>(
+    State(state): State<S>,
+    Path(index_uid): Path<String>,
+    Json(req): Json<ReshardRequest>,
+) -> Result<Json<ReshardResponse>, StatusCode>
+where
+    S: Clone + Send + Sync + 'static,
+    AppState: FromRef<S>,
+{
+    let app_state = AppState::from_ref(&state);
+
+    // A zero shard count can never be a valid target.
+    if req.new_shards == 0 {
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    // Cheap early rejection under the read lock. This check alone is not
+    // authoritative: the lock is released before the shadow index is created,
+    // so the check is repeated under the write lock right before registration.
+    let already_active = {
+        let registry = app_state.resharding_registry.read().await;
+        registry.get(&index_uid).is_some()
+    };
+    if already_active {
+        return Err(StatusCode::CONFLICT);
+    }
+
+    // Read the shard count and the node addresses from a single topology
+    // snapshot so both values describe the same cluster state.
+    let (old_shards, node_addresses) = {
+        let topology = app_state.topology.read().await;
+        let addresses: Vec<String> = topology
+            .all_nodes()
+            .iter()
+            .map(|n| n.address().to_string())
+            .collect();
+        (topology.shard_count(), addresses)
+    };
+
+    // Validate new_shards > old_shards (only scaling up is supported)
+    if req.new_shards <= old_shards {
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    if node_addresses.is_empty() {
+        error!("no nodes available for resharding");
+        return Err(StatusCode::SERVICE_UNAVAILABLE);
+    }
+
+    // NOTE(review): `req.throttle_docs_per_sec` is accepted but not consumed
+    // anywhere in this handler yet.
+
+    // Create shadow index (phase 1)
+    // NOTE(review): the shadow name is computed here and is not passed to
+    // `shadow_create_phase`; this assumes that function derives the same
+    // name — confirm.
+    let shadow_index = format!("{}__reshard_{}", index_uid, req.new_shards);
+    let master_key = &app_state.config.master_key;
+
+    let result = match miroir_core::reshard::shadow_create_phase(
+        &index_uid,
+        req.new_shards,
+        &node_addresses,
+        master_key,
+        None, // primary_key will be copied from live index
+    )
+    .await
+    {
+        Ok(result) => result,
+        Err(e) => {
+            error!(
+                index_uid = %index_uid,
+                error = %e,
+                "shadow create phase failed"
+            );
+            return Err(StatusCode::INTERNAL_SERVER_ERROR);
+        }
+    };
+
+    info!(
+        index_uid = %index_uid,
+        shadow_index = %shadow_index,
+        nodes_created = result.nodes_created,
+        "Phase 1 complete: shadow index created"
+    );
+
+    let now = millis_now();
+
+    // Register the resharding operation for dual-write detection
+    let op_state = miroir_core::reshard::ReshardOperationState {
+        shadow_index: shadow_index.clone(),
+        old_shards,
+        target_shards: req.new_shards,
+        phase: miroir_core::reshard::ReshardPhase::ShadowCreated,
+        started_at: now,
+    };
+
+    {
+        let mut registry = app_state.resharding_registry.write().await;
+
+        // Authoritative duplicate check: performed under the same write lock
+        // as the registration, so a request that raced with this one is
+        // reported as a conflict rather than an internal error.
+        if registry.get(&index_uid).is_some() {
+            info!(
+                index_uid = %index_uid,
+                "resharding operation was registered concurrently"
+            );
+            return Err(StatusCode::CONFLICT);
+        }
+
+        if let Err(e) = registry.register(index_uid.clone(), op_state) {
+            error!(
+                index_uid = %index_uid,
+                error = %e,
+                "failed to register resharding operation"
+            );
+            return Err(StatusCode::INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    // In a full implementation, we would also:
+    // - Persist the operation to the task store for recovery
+    // - Start a background task for phases 2-6
+    // - Emit metrics
+
+    // NOTE(review): the phase is a hard-coded string here, while the status
+    // endpoint reports `phase.name()`; confirm the two agree.
+    Ok(Json(ReshardResponse {
+        operation_id: format!("reshard-{}-{}", index_uid, now),
+        index_uid,
+        old_shards,
+        new_shards: req.new_shards,
+        shadow_index,
+        phase: "Shadow Created".to_string(),
+        started_at: now,
+    }))
+}
+
+/// GET /_miroir/indexes/{uid}/reshard/status — Get resharding status.
+///
+/// Never returns an error status: `active` is `true` and `operation` is
+/// populated when the resharding registry holds an entry for `index_uid`;
+/// otherwise `active` is `false` and `operation` is `None`.
+///
+/// Backfill progress, last error and verification results are not tracked
+/// yet and are reported as zero / `None`.
+pub async fn get_reshard_status<S>(
+    State(state): State<S>,
+    Path(index_uid): Path<String>,
+) -> Result<Json<ReshardStatusResponse>, StatusCode>
+where
+    S: Clone + Send + Sync + 'static,
+    AppState: FromRef<S>,
+{
+    let app_state = AppState::from_ref(&state);
+    let registry = app_state.resharding_registry.read().await;
+
+    // Build the details only when an operation is registered for this index.
+    let operation = registry.get(&index_uid).map(|op| ReshardOperationDetails {
+        // Same ID scheme as the POST handler: index UID plus start timestamp.
+        id: format!("reshard-{}-{}", index_uid, op.started_at),
+        index_uid: index_uid.clone(),
+        old_shards: op.old_shards,
+        new_shards: op.target_shards,
+        phase: op.phase.name().to_string(),
+        documents_backfilled: 0, // TODO: track progress
+        total_documents: 0,
+        backfill_progress: 0.0,
+        shadow_index: op.shadow_index.clone(),
+        started_at: op.started_at,
+        last_error: None,
+        verification_results: None,
+    });
+
+    Ok(Json(ReshardStatusResponse {
+        active: operation.is_some(),
+        operation,
+    }))
+}
+
+/// Current wall-clock time in milliseconds since the UNIX epoch.
+///
+/// Returns 0 when the system clock reports a time before the epoch, and
+/// saturates at `u64::MAX` rather than silently truncating the `u128`
+/// millisecond count.
+fn millis_now() -> u64 {
+    let since_epoch = std::time::SystemTime::now()
+        .duration_since(std::time::UNIX_EPOCH)
+        .unwrap_or_default();
+    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
+}