diff --git a/crates/miroir-core/src/reshard.rs b/crates/miroir-core/src/reshard.rs index 3263fc4..1cbc807 100644 --- a/crates/miroir-core/src/reshard.rs +++ b/crates/miroir-core/src/reshard.rs @@ -849,6 +849,125 @@ pub struct ReshardRegistry { index_ops: HashMap, } +/// In-memory registry tracking active resharding operations for dual-write detection. +/// +/// This is used by the write path to determine if an index is in dual-write phase +/// (shadow exists) and needs dual-hash routing. +#[derive(Debug, Default)] +pub struct ReshardingRegistry { + /// Map of index_uid -> active resharding state + /// When an index is in this registry with phase >= ShadowCreated, + /// writes must be dual-hashed to both live and shadow indexes. + active_operations: HashMap, +} + +/// Active resharding state for an index. +#[derive(Debug, Clone)] +pub struct ReshardOperationState { + /// Shadow index UID (e.g., "products__reshard_128") + pub shadow_index: String, + /// Old shard count + pub old_shards: u32, + /// New shard count + pub target_shards: u32, + /// Current phase + pub phase: ReshardPhase, + /// When the operation started (UNIX ms) + pub started_at: u64, +} + +impl ReshardingRegistry { + /// Create a new empty registry. + pub fn new() -> Self { + Self::default() + } + + /// Register a resharding operation for dual-write detection. + /// + /// Once registered, writes to the index will be dual-hashed to both + /// live and shadow indexes when phase >= ShadowCreated. + pub fn register( + &mut self, + index_uid: String, + state: ReshardOperationState, + ) -> Result<(), String> { + if self.active_operations.contains_key(&index_uid) { + return Err(format!( + "Resharding already in progress for index '{}'", + index_uid + )); + } + tracing::info!( + index_uid = %index_uid, + shadow_index = %state.shadow_index, + old_shards = state.old_shards, + target_shards = state.target_shards, + phase = ?state.phase, + "registered resharding operation for dual-write" + ); + self.active_operations.insert(index_uid, state); + Ok(()) + } + + /// Get the active resharding state for an index (if any). + pub fn get(&self, index_uid: &str) -> Option<&ReshardOperationState> { + self.active_operations.get(index_uid) + } + + /// Update the phase of an active resharding operation. + pub fn update_phase(&mut self, index_uid: &str, new_phase: ReshardPhase) -> Result<(), String> { + let op = self + .active_operations + .get_mut(index_uid) + .ok_or_else(|| format!("No resharding operation for index '{}'", index_uid))?; + op.phase = new_phase; + tracing::info!( + index_uid = %index_uid, + phase = ?new_phase, + "updated resharding phase" + ); + Ok(()) + } + + /// Remove a completed resharding operation. + pub fn remove(&mut self, index_uid: &str) -> Result<(), String> { + if self.active_operations.remove(index_uid).is_none() { + return Err(format!("No resharding operation for index '{}'", index_uid)); + } + tracing::info!( + index_uid = %index_uid, + "removed resharding operation from registry" + ); + Ok(()) + } + + /// Check if an index is in dual-write phase. + /// + /// Returns true if the index has an active resharding operation with + /// phase >= ShadowCreated and phase <= Swapped. + pub fn is_dual_write_active(&self, index_uid: &str) -> bool { + if let Some(op) = self.get(index_uid) { + matches!( + op.phase, + ReshardPhase::ShadowCreated + | ReshardPhase::DualWriteActive + | ReshardPhase::BackfillInProgress + | ReshardPhase::Verifying + ) + } else { + false + } + } + + /// List all active resharding operations. + pub fn list(&self) -> Vec<(String, &ReshardOperationState)> { + self.active_operations + .iter() + .map(|(k, v)| (k.clone(), v)) + .collect() + } +} + /// Leader-coordinated reshard coordinator (plan §14.5 Mode B). /// /// Acquires a per-index leader lease (scope: "reshard:") and persists @@ -1452,7 +1571,7 @@ async fn two_phase_broadcast_settings( } Ok(resp) => { let status = resp.status(); - let text = resp.text().await.unwrap_or_default(); + let _text = resp.text().await.unwrap_or_default(); Err(format!("{}: HTTP {}", address, status.as_u16())) } Err(e) => Err(format!("{}: {}", address, e)), @@ -1587,6 +1706,208 @@ async fn rollback_shadow_index( } } +// --------------------------------------------------------------------------- +// Phase 2: Dual-hash dual-write (plan §13.1 step 2) +// --------------------------------------------------------------------------- + +/// Result of preparing documents for dual-hash dual-write. +#[derive(Debug, Clone)] +pub struct DualWritePreparation { + /// Documents to write to live index (with old shard tags). + pub live_documents: Vec, + /// Documents to write to shadow index (with new shard tags). + pub shadow_documents: Vec, + /// Shadow index UID. + pub shadow_index: String, + /// Old shard count. + pub old_shards: u32, + /// New shard count. + pub target_shards: u32, +} + +/// Prepare documents for dual-hash dual-write during resharding. +/// +/// When an index is in dual-write phase (shadow exists), every write must be +/// routed to BOTH live and shadow indexes with different shard tags: +/// - Live index: `_miroir_shard = hash(pk) % S_old` +/// - Shadow index: `_miroir_shard = hash(pk) % S_new` +/// +/// Shadow writes are tagged with `_miroir_origin: "reshard_backfill"` so +/// CDC suppresses them by default (plan §13.13). +/// +/// # Arguments +/// * `documents` - Original documents from client (without _miroir_shard) +/// * `primary_key` - Primary key field name +/// * `reshard_state` - Active resharding state for the index +/// +/// # Returns +/// `Ok(DualWritePreparation)` with separate document batches for live and shadow. +/// +/// # Panics +/// Panics if any document is missing the primary key field (caller should validate first). +pub fn prepare_dual_write_documents( + documents: &[serde_json::Value], + primary_key: &str, + reshard_state: &ReshardOperationState, +) -> DualWritePreparation { + let mut live_documents = Vec::with_capacity(documents.len()); + let mut shadow_documents = Vec::with_capacity(documents.len()); + + for doc in documents { + let pk_value = doc + .get(primary_key) + .and_then(|v| v.as_str()) + .expect("primary key validation should have happened before this call"); + + // Compute old shard assignment for live index + let old_shard_id = crate::router::shard_for_key(pk_value, reshard_state.old_shards); + + // Compute new shard assignment for shadow index + let new_shard_id = crate::router::shard_for_key(pk_value, reshard_state.target_shards); + + // Clone document for live index + let mut live_doc = doc.clone(); + live_doc["_miroir_shard"] = serde_json::json!(old_shard_id); + live_documents.push(live_doc); + + // Clone document for shadow index with new shard tag + let mut shadow_doc = doc.clone(); + shadow_doc["_miroir_shard"] = serde_json::json!(new_shard_id); + shadow_documents.push(shadow_doc); + } + + DualWritePreparation { + live_documents, + shadow_documents, + shadow_index: reshard_state.shadow_index.clone(), + old_shards: reshard_state.old_shards, + target_shards: reshard_state.target_shards, + } +} + +#[cfg(test)] +mod tests_dual_write { + use super::*; + use serde_json::json; + + #[test] + fn prepare_dual_write_separates_shards() { + let documents = vec![ + json!({"id": "user:123", "name": "Alice"}), + json!({"id": "user:456", "name": "Bob"}), + ]; + + let reshard_state = ReshardOperationState { + shadow_index: "users__reshard_128".to_string(), + old_shards: 64, + target_shards: 128, + phase: ReshardPhase::ShadowCreated, + started_at: 1000, + }; + + let prep = prepare_dual_write_documents(&documents, "id", &reshard_state); + + assert_eq!(prep.live_documents.len(), 2); + assert_eq!(prep.shadow_documents.len(), 2); + assert_eq!(prep.shadow_index, "users__reshard_128"); + assert_eq!(prep.old_shards, 64); + assert_eq!(prep.target_shards, 128); + + // Verify live documents have old shard tags + for doc in &prep.live_documents { + assert!(doc.get("_miroir_shard").is_some()); + let shard = doc["_miroir_shard"].as_u64().unwrap(); + assert!(shard < 64, "live shard should be < 64"); + } + + // Verify shadow documents have new shard tags + for doc in &prep.shadow_documents { + assert!(doc.get("_miroir_shard").is_some()); + let shard = doc["_miroir_shard"].as_u64().unwrap(); + assert!(shard < 128, "shadow shard should be < 128"); + } + } + + #[test] + fn prepare_dual_write_preserves_other_fields() { + let documents = vec![json!({ + "id": "product:abc", + "name": "Widget", + "price": 19.99, + "tags": ["widget", "sale"] + })]; + + let reshard_state = ReshardOperationState { + shadow_index: "products__reshard_256".to_string(), + old_shards: 128, + target_shards: 256, + phase: ReshardPhase::DualWriteActive, + started_at: 2000, + }; + + let prep = prepare_dual_write_documents(&documents, "id", &reshard_state); + + let live_doc = &prep.live_documents[0]; + let shadow_doc = &prep.shadow_documents[0]; + + // Check that all fields are preserved + assert_eq!(live_doc["id"], "product:abc"); + assert_eq!(live_doc["name"], "Widget"); + assert_eq!(live_doc["price"], 19.99); + assert_eq!(live_doc["tags"], json!(["widget", "sale"])); + + // Shadow should have same fields except shard tag + assert_eq!(shadow_doc["id"], "product:abc"); + assert_eq!(shadow_doc["name"], "Widget"); + assert_eq!(shadow_doc["price"], 19.99); + assert_eq!(shadow_doc["tags"], json!(["widget", "sale"])); + } + + #[test] + fn prepare_dual_write_deterministic_shard_assignment() { + let documents = vec![json!({"id": "test:key"})]; + + let reshard_state = ReshardOperationState { + shadow_index: "test__reshard_32".to_string(), + old_shards: 16, + target_shards: 32, + phase: ReshardPhase::BackfillInProgress, + started_at: 3000, + }; + + // Run multiple times - should be deterministic + let prep1 = prepare_dual_write_documents(&documents, "id", &reshard_state); + let prep2 = prepare_dual_write_documents(&documents, "id", &reshard_state); + + assert_eq!( + prep1.live_documents[0]["_miroir_shard"], prep2.live_documents[0]["_miroir_shard"], + "live shard assignment should be deterministic" + ); + assert_eq!( + prep1.shadow_documents[0]["_miroir_shard"], prep2.shadow_documents[0]["_miroir_shard"], + "shadow shard assignment should be deterministic" + ); + } + + #[test] + fn prepare_dual_write_handles_empty_batch() { + let documents: Vec = vec![]; + + let reshard_state = ReshardOperationState { + shadow_index: "empty__reshard_64".to_string(), + old_shards: 32, + target_shards: 64, + phase: ReshardPhase::ShadowCreated, + started_at: 1000, + }; + + let prep = prepare_dual_write_documents(&documents, "id", &reshard_state); + + assert_eq!(prep.live_documents.len(), 0); + assert_eq!(prep.shadow_documents.len(), 0); + } +} + #[cfg(test)] mod tests_reshard_execution { use super::*; @@ -1800,3 +2121,267 @@ mod tests_shadow_create { assert!(err.to_string().contains("rollback")); } } + +// --------------------------------------------------------------------------- +// ReshardingRegistry tests (P5.1.b dual-write detection) +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests_resharding_registry { + use super::*; + + #[test] + fn registry_new_is_empty() { + let reg = ReshardingRegistry::new(); + assert!(reg.get("products").is_none()); + assert!(!reg.is_dual_write_active("products")); + } + + #[test] + fn registry_register_and_get() { + let mut reg = ReshardingRegistry::new(); + let state = ReshardOperationState { + shadow_index: "products__reshard_128".to_string(), + old_shards: 64, + target_shards: 128, + phase: ReshardPhase::ShadowCreated, + started_at: 1000, + }; + reg.register("products".to_string(), state).unwrap(); + + let retrieved = reg.get("products").unwrap(); + assert_eq!(retrieved.shadow_index, "products__reshard_128"); + assert_eq!(retrieved.old_shards, 64); + assert_eq!(retrieved.target_shards, 128); + assert_eq!(retrieved.phase, ReshardPhase::ShadowCreated); + } + + #[test] + fn registry_register_duplicate_rejected() { + let mut reg = ReshardingRegistry::new(); + let state = ReshardOperationState { + shadow_index: "products__reshard_128".to_string(), + old_shards: 64, + target_shards: 128, + phase: ReshardPhase::ShadowCreated, + started_at: 1000, + }; + reg.register("products".to_string(), state).unwrap(); + + let state2 = ReshardOperationState { + shadow_index: "products__reshard_256".to_string(), + old_shards: 128, + target_shards: 256, + phase: ReshardPhase::ShadowCreated, + started_at: 2000, + }; + assert!(reg.register("products".to_string(), state2).is_err()); + } + + #[test] + fn registry_update_phase() { + let mut reg = ReshardingRegistry::new(); + let state = ReshardOperationState { + shadow_index: "products__reshard_128".to_string(), + old_shards: 64, + target_shards: 128, + phase: ReshardPhase::ShadowCreated, + started_at: 1000, + }; + reg.register("products".to_string(), state).unwrap(); + + reg.update_phase("products", ReshardPhase::DualWriteActive) + .unwrap(); + + let retrieved = reg.get("products").unwrap(); + assert_eq!(retrieved.phase, ReshardPhase::DualWriteActive); + } + + #[test] + fn registry_update_phase_nonexistent_errors() { + let mut reg = ReshardingRegistry::new(); + assert!(reg + .update_phase("products", ReshardPhase::DualWriteActive) + .is_err()); + } + + #[test] + fn registry_remove() { + let mut reg = ReshardingRegistry::new(); + let state = ReshardOperationState { + shadow_index: "products__reshard_128".to_string(), + old_shards: 64, + target_shards: 128, + phase: ReshardPhase::ShadowCreated, + started_at: 1000, + }; + reg.register("products".to_string(), state).unwrap(); + assert!(reg.get("products").is_some()); + + reg.remove("products").unwrap(); + assert!(reg.get("products").is_none()); + } + + #[test] + fn registry_remove_nonexistent_errors() { + let mut reg = ReshardingRegistry::new(); + assert!(reg.remove("products").is_err()); + } + + #[test] + fn registry_is_dual_write_active_shadow_created() { + let mut reg = ReshardingRegistry::new(); + let state = ReshardOperationState { + shadow_index: "products__reshard_128".to_string(), + old_shards: 64, + target_shards: 128, + phase: ReshardPhase::ShadowCreated, + started_at: 1000, + }; + reg.register("products".to_string(), state).unwrap(); + assert!(reg.is_dual_write_active("products")); + } + + #[test] + fn registry_is_dual_write_active_dual_write_phase() { + let mut reg = ReshardingRegistry::new(); + let state = ReshardOperationState { + shadow_index: "products__reshard_128".to_string(), + old_shards: 64, + target_shards: 128, + phase: ReshardPhase::DualWriteActive, + started_at: 1000, + }; + reg.register("products".to_string(), state).unwrap(); + assert!(reg.is_dual_write_active("products")); + } + + #[test] + fn registry_is_dual_write_active_backfill_phase() { + let mut reg = ReshardingRegistry::new(); + let state = ReshardOperationState { + shadow_index: "products__reshard_128".to_string(), + old_shards: 64, + target_shards: 128, + phase: ReshardPhase::BackfillInProgress, + started_at: 1000, + }; + reg.register("products".to_string(), state).unwrap(); + assert!(reg.is_dual_write_active("products")); + } + + #[test] + fn registry_is_dual_write_active_verifying_phase() { + let mut reg = ReshardingRegistry::new(); + let state = ReshardOperationState { + shadow_index: "products__reshard_128".to_string(), + old_shards: 64, + target_shards: 128, + phase: ReshardPhase::Verifying, + started_at: 1000, + }; + reg.register("products".to_string(), state).unwrap(); + assert!(reg.is_dual_write_active("products")); + } + + #[test] + fn registry_is_dual_write_active_swapped_phase_false() { + let mut reg = ReshardingRegistry::new(); + let state = ReshardOperationState { + shadow_index: "products__reshard_128".to_string(), + old_shards: 64, + target_shards: 128, + phase: ReshardPhase::Swapped, + started_at: 1000, + }; + reg.register("products".to_string(), state).unwrap(); + // After swap, dual-write stops (writes go only to new index) + assert!(!reg.is_dual_write_active("products")); + } + + #[test] + fn registry_is_dual_write_active_no_operation() { + let reg = ReshardingRegistry::new(); + assert!(!reg.is_dual_write_active("products")); + } + + #[test] + fn registry_list() { + let mut reg = ReshardingRegistry::new(); + + let state1 = ReshardOperationState { + shadow_index: "products__reshard_128".to_string(), + old_shards: 64, + target_shards: 128, + phase: ReshardPhase::ShadowCreated, + started_at: 1000, + }; + reg.register("products".to_string(), state1).unwrap(); + + let state2 = ReshardOperationState { + shadow_index: "orders__reshard_256".to_string(), + old_shards: 128, + target_shards: 256, + phase: ReshardPhase::DualWriteActive, + started_at: 2000, + }; + reg.register("orders".to_string(), state2).unwrap(); + + let list = reg.list(); + assert_eq!(list.len(), 2); + + let list_map: std::collections::HashMap<_, _> = list.into_iter().collect(); + assert!(list_map.contains_key("products")); + assert!(list_map.contains_key("orders")); + assert_eq!( + list_map.get("products").unwrap().shadow_index, + "products__reshard_128" + ); + assert_eq!( + list_map.get("orders").unwrap().shadow_index, + "orders__reshard_256" + ); + } + + #[test] + fn registry_multiple_indexes_independent() { + let mut reg = ReshardingRegistry::new(); + + let products_state = ReshardOperationState { + shadow_index: "products__reshard_128".to_string(), + old_shards: 64, + target_shards: 128, + phase: ReshardPhase::DualWriteActive, + started_at: 1000, + }; + reg.register("products".to_string(), products_state) + .unwrap(); + + let orders_state = ReshardOperationState { + shadow_index: "orders__reshard_256".to_string(), + old_shards: 128, + target_shards: 256, + phase: ReshardPhase::ShadowCreated, + started_at: 2000, + }; + reg.register("orders".to_string(), orders_state).unwrap(); + + // Both should be in dual-write + assert!(reg.is_dual_write_active("products")); + assert!(reg.is_dual_write_active("orders")); + + // Update products to swapped + reg.update_phase("products", ReshardPhase::Swapped).unwrap(); + + // Now only orders should be in dual-write + assert!(!reg.is_dual_write_active("products")); + assert!(reg.is_dual_write_active("orders")); + + // Remove orders + reg.remove("orders").unwrap(); + + // Neither should be in dual-write + assert!(!reg.is_dual_write_active("products")); + assert!(!reg.is_dual_write_active("orders")); + } +} diff --git a/crates/miroir-proxy/src/routes/admin_endpoints.rs b/crates/miroir-proxy/src/routes/admin_endpoints.rs index bbf0ab0..3aca6b3 100644 --- a/crates/miroir-proxy/src/routes/admin_endpoints.rs +++ b/crates/miroir-proxy/src/routes/admin_endpoints.rs @@ -20,6 +20,7 @@ use miroir_core::{ RebalancerMetricsCallback, RebalancerWorker, RebalancerWorkerConfig, TopologyChangeEvent, }, replica_selection::{ReplicaSelector, SelectionObserver}, + reshard::ReshardingRegistry, router, scatter::{DeleteByFilterRequest, FetchDocumentsRequest, FetchDocumentsResponse, WriteRequest}, task_registry::TaskRegistryImpl, @@ -381,6 +382,9 @@ pub struct AppState { pub group_sync_worker: Option>>, /// Mode A coordinator for shard-partitioned ownership (plan §14.5 Mode A). pub mode_a_coordinator: Option>, + /// Resharding registry for tracking active resharding operations (plan §13.1). + /// Used by the write path to detect dual-write phase and route to both live and shadow indexes. + pub resharding_registry: Arc>, } impl AppState { @@ -665,10 +669,15 @@ impl AppState { // Create Mode A coordinator for shard-partitioned ownership (plan §14.5 Mode A) let mode_a_coordinator = if cfg!(feature = "peer-discovery") { let pod_name = std::env::var("POD_NAME").unwrap_or_else(|_| "unknown".to_string()); - let namespace = std::env::var("POD_NAMESPACE").unwrap_or_else(|_| "default".to_string()); + let namespace = + std::env::var("POD_NAMESPACE").unwrap_or_else(|_| "default".to_string()); let service_name = std::env::var("MIROR_SERVICE_NAME") .unwrap_or_else(|_| "miroir-headless".to_string()); - let peer_discovery = Arc::new(PeerDiscovery::new(pod_name.clone(), namespace, service_name)); + let peer_discovery = Arc::new(PeerDiscovery::new( + pod_name.clone(), + namespace, + service_name, + )); Some(Arc::new(ModeACoordinator::new(pod_name, peer_discovery))) } else { None @@ -756,6 +765,9 @@ impl AppState { group_addition_coordinator, group_sync_worker, mode_a_coordinator, + resharding_registry: Arc::new(tokio::sync::RwLock::new( + miroir_core::reshard::ReshardingRegistry::new(), + )), } } diff --git a/crates/miroir-proxy/src/routes/documents.rs b/crates/miroir-proxy/src/routes/documents.rs index 978d770..5fa301c 100644 --- a/crates/miroir-proxy/src/routes/documents.rs +++ b/crates/miroir-proxy/src/routes/documents.rs @@ -461,6 +461,28 @@ async fn write_documents_impl( } } + // 2.5. Check if index is in resharding dual-write phase (plan §13.1 step 2) + // If yes, prepare separate document batches for live and shadow indexes + let resharding_state = state.resharding_registry.read().await; + let dual_write_prep = if resharding_state.is_dual_write_active(&index_uid) { + let state = resharding_state.get(&index_uid).unwrap(); + tracing::debug!( + index_uid = %index_uid, + shadow_index = %state.shadow_index, + old_shards = state.old_shards, + target_shards = state.target_shards, + "index in resharding dual-write phase, preparing dual-hash writes" + ); + Some(miroir_core::reshard::prepare_dual_write_documents( + &documents, + &primary_key, + state, + )) + } else { + None + }; + drop(resharding_state); // Release lock before async operations + // 3. Inject _miroir_shard and _miroir_updated_at into each document let topology = state.topology.read().await; let shard_count = topology.shards; @@ -479,21 +501,49 @@ async fn write_documents_impl( None }; - for doc in &mut documents { - if let Some(pk_value) = doc.get(&primary_key).and_then(|v| v.as_str()) { - let shard_id = shard_for_key(pk_value, shard_count); - doc["_miroir_shard"] = serde_json::json!(shard_id); - } + // Handle dual-write resharding: prepare separate document batches + // If dual_write_prep is Some, use pre-computed batches with different shard tags + // Otherwise, inject shard tags inline as before + let (mut live_docs, shadow_write_info) = if let Some(prep) = dual_write_prep { + // Use pre-computed batches from prepare_dual_write_documents + // Shadow documents already have new shard tags; live documents have old shard tags + ( + prep.live_documents, + Some(( + prep.shadow_documents, + prep.shadow_index, + prep.old_shards, + prep.target_shards, + )), + ) + } else { + // Normal path: inject shard tags inline + for doc in &mut documents { + if let Some(pk_value) = doc.get(&primary_key).and_then(|v| v.as_str()) { + let shard_id = shard_for_key(pk_value, shard_count); + doc["_miroir_shard"] = serde_json::json!(shard_id); + } - // Stamp _miroir_updated_at when anti_entropy is enabled (plan §13.8) - // This happens AFTER reserved field validation, so orchestrator-controlled injection is allowed - if let Some(timestamp) = now_ms { - doc[updated_at_field] = serde_json::json!(timestamp); + // Stamp _miroir_updated_at when anti_entropy is enabled (plan §13.8) + // This happens AFTER reserved field validation, so orchestrator-controlled injection is allowed + if let Some(timestamp) = now_ms { + doc[updated_at_field] = serde_json::json!(timestamp); + } + } + (documents, None) + }; + + // Stamp _miroir_updated_at on live documents if anti_entropy is enabled + if let Some(timestamp) = now_ms { + for doc in &mut live_docs { + if doc.get(updated_at_field).is_none() { + doc[updated_at_field] = serde_json::json!(timestamp); + } } } // 4. Group documents by target nodes (per-batch grouping for efficient fan-out) - let node_documents = group_documents_by_shard(&documents, &primary_key, &topology)?; + let node_documents = group_documents_by_shard(&live_docs, &primary_key, &topology)?; // 5. Fan out to nodes and track quorum let client = HttpClient::new( @@ -560,6 +610,73 @@ async fn write_documents_impl( } } + // 5.5. Dual-write to shadow index during resharding (plan §13.1 step 2) + // Shadow writes are tagged with origin="reshard_backfill" for CDC suppression (plan §13.13) + if let Some((shadow_docs, shadow_index, old_shards, target_shards)) = shadow_write_info { + tracing::debug!( + shadow_index = %shadow_index, + docs_count = shadow_docs.len(), + "writing to shadow index during resharding dual-write phase" + ); + + // Group shadow documents by their new shard assignment for efficient fan-out + let mut shadow_node_documents: HashMap> = HashMap::new(); + for doc in &shadow_docs { + let pk_value = doc + .get(&primary_key) + .and_then(|v| v.as_str()) + .expect("primary key validation should have happened"); + + // Shadow documents already have new shard tags from prepare_dual_write_documents + let shard_id = shard_for_key(pk_value, target_shards); + shadow_node_documents + .entry(shard_id) + .or_default() + .push(doc.clone()); + } + + // Write shadow documents to all nodes (shadow index exists on all nodes) + for (_shard_id, docs) in shadow_node_documents { + for node in topology.nodes() { + let group_id = node.replica_group; + quorum_state.record_attempt(group_id, &node.id); + + let req = WriteRequest { + index_uid: shadow_index.clone(), + documents: docs.clone(), + primary_key: Some(primary_key.clone()), + // Tag shadow writes with origin for CDC suppression (plan §13.13) + origin: Some(miroir_core::cdc::ORIGIN_RESHARD_BACKFILL.to_string()), + }; + + match client.write_documents(&node.id, &node.address, &req).await { + Ok(resp) if resp.success => { + quorum_state.record_success(group_id, &node.id); + if let Some(task_uid) = resp.task_uid { + node_task_uids.insert(node.id.as_str().to_string(), task_uid); + } + } + Ok(resp) => { + // Non-success response - log but don't fail the live write + tracing::warn!( + node = %node.id, + error = ?resp.message, + "shadow index write returned non-success" + ); + } + Err(e) => { + // Log shadow write failure but don't fail the live write + tracing::warn!( + node = %node.id, + error = ?e, + "shadow index write failed" + ); + } + } + } + } + } + // 6. Apply two-rule quorum logic let degraded_groups = quorum_state.count_degraded_groups(replica_group_count, rf); let quorum_groups = quorum_state.count_quorum_groups(); @@ -619,7 +736,7 @@ async fn write_documents_impl( use sha2::{Digest, Sha256}; let body_hash = format!( "{:x}", - Sha256::digest(serde_json::to_string(&documents).unwrap_or_default()) + Sha256::digest(serde_json::to_string(&live_docs).unwrap_or_default()) ); state .idempotency_cache