feat(reshard): implement dual-hash dual-write phase (P5.1.b)

Implements plan §13.1 step 2: dual-hash dual-write during resharding.
When an index is in resharding dual-write phase (shadow exists),
every write routes to BOTH live (hash %S_old) AND shadow (hash %S_new)
indexes, each with its own _miroir_shard tag. Shadow writes are tagged
with origin="reshard_backfill" for CDC suppression (plan §13.13).

Changes:
- Add ReshardingRegistry to track active resharding operations
- Add ReshardOperationState for dual-write detection
- Add prepare_dual_write_documents() to separate live/shard batches
- Modify write_documents_impl to check resharding registry
- Add shadow index write path with origin tagging
- Add ReshardingRegistry to AppState for write path access

Tests:
- 15 ReshardingRegistry tests covering register, get, update, remove
- 4 dual_write tests for document preparation logic

Closes: miroir-uhj.1.2
This commit is contained in:
jedarden 2026-05-24 03:02:36 -04:00
parent 8d5c12787e
commit 83c03d0909
3 changed files with 728 additions and 14 deletions

View file

@ -849,6 +849,125 @@ pub struct ReshardRegistry {
index_ops: HashMap<String, String>,
}
/// In-memory registry tracking active resharding operations for dual-write detection.
///
/// This is used by the write path to determine if an index is in dual-write phase
/// (shadow exists) and needs dual-hash routing.
#[derive(Debug, Default)]
pub struct ReshardingRegistry {
/// Map of index_uid -> active resharding state
/// When an index is in this registry with phase >= ShadowCreated,
/// writes must be dual-hashed to both live and shadow indexes.
active_operations: HashMap<String, ReshardOperationState>,
}
/// Active resharding state for an index.
#[derive(Debug, Clone)]
pub struct ReshardOperationState {
/// Shadow index UID (e.g., "products__reshard_128")
pub shadow_index: String,
/// Old shard count
pub old_shards: u32,
/// New shard count
pub target_shards: u32,
/// Current phase
pub phase: ReshardPhase,
/// When the operation started (UNIX ms)
pub started_at: u64,
}
impl ReshardingRegistry {
/// Create a new empty registry.
pub fn new() -> Self {
Self::default()
}
/// Register a resharding operation for dual-write detection.
///
/// Once registered, writes to the index will be dual-hashed to both
/// live and shadow indexes when phase >= ShadowCreated.
pub fn register(
&mut self,
index_uid: String,
state: ReshardOperationState,
) -> Result<(), String> {
if self.active_operations.contains_key(&index_uid) {
return Err(format!(
"Resharding already in progress for index '{}'",
index_uid
));
}
tracing::info!(
index_uid = %index_uid,
shadow_index = %state.shadow_index,
old_shards = state.old_shards,
target_shards = state.target_shards,
phase = ?state.phase,
"registered resharding operation for dual-write"
);
self.active_operations.insert(index_uid, state);
Ok(())
}
/// Get the active resharding state for an index (if any).
pub fn get(&self, index_uid: &str) -> Option<&ReshardOperationState> {
self.active_operations.get(index_uid)
}
/// Update the phase of an active resharding operation.
pub fn update_phase(&mut self, index_uid: &str, new_phase: ReshardPhase) -> Result<(), String> {
let op = self
.active_operations
.get_mut(index_uid)
.ok_or_else(|| format!("No resharding operation for index '{}'", index_uid))?;
op.phase = new_phase;
tracing::info!(
index_uid = %index_uid,
phase = ?new_phase,
"updated resharding phase"
);
Ok(())
}
/// Remove a completed resharding operation.
pub fn remove(&mut self, index_uid: &str) -> Result<(), String> {
if self.active_operations.remove(index_uid).is_none() {
return Err(format!("No resharding operation for index '{}'", index_uid));
}
tracing::info!(
index_uid = %index_uid,
"removed resharding operation from registry"
);
Ok(())
}
/// Check if an index is in dual-write phase.
///
/// Returns true if the index has an active resharding operation with
/// phase >= ShadowCreated and phase <= Swapped.
pub fn is_dual_write_active(&self, index_uid: &str) -> bool {
if let Some(op) = self.get(index_uid) {
matches!(
op.phase,
ReshardPhase::ShadowCreated
| ReshardPhase::DualWriteActive
| ReshardPhase::BackfillInProgress
| ReshardPhase::Verifying
)
} else {
false
}
}
/// List all active resharding operations.
pub fn list(&self) -> Vec<(String, &ReshardOperationState)> {
self.active_operations
.iter()
.map(|(k, v)| (k.clone(), v))
.collect()
}
}
/// Leader-coordinated reshard coordinator (plan §14.5 Mode B).
///
/// Acquires a per-index leader lease (scope: "reshard:<index>") and persists
@ -1452,7 +1571,7 @@ async fn two_phase_broadcast_settings(
}
Ok(resp) => {
let status = resp.status();
let text = resp.text().await.unwrap_or_default();
let _text = resp.text().await.unwrap_or_default();
Err(format!("{}: HTTP {}", address, status.as_u16()))
}
Err(e) => Err(format!("{}: {}", address, e)),
@ -1587,6 +1706,208 @@ async fn rollback_shadow_index(
}
}
// ---------------------------------------------------------------------------
// Phase 2: Dual-hash dual-write (plan §13.1 step 2)
// ---------------------------------------------------------------------------
/// Result of preparing documents for dual-hash dual-write.
#[derive(Debug, Clone)]
pub struct DualWritePreparation {
/// Documents to write to live index (with old shard tags).
pub live_documents: Vec<serde_json::Value>,
/// Documents to write to shadow index (with new shard tags).
pub shadow_documents: Vec<serde_json::Value>,
/// Shadow index UID.
pub shadow_index: String,
/// Old shard count.
pub old_shards: u32,
/// New shard count.
pub target_shards: u32,
}
/// Prepare documents for dual-hash dual-write during resharding.
///
/// When an index is in dual-write phase (shadow exists), every write must be
/// routed to BOTH live and shadow indexes with different shard tags:
/// - Live index: `_miroir_shard = hash(pk) % S_old`
/// - Shadow index: `_miroir_shard = hash(pk) % S_new`
///
/// Shadow writes are tagged with `_miroir_origin: "reshard_backfill"` so
/// CDC suppresses them by default (plan §13.13).
///
/// # Arguments
/// * `documents` - Original documents from client (without _miroir_shard)
/// * `primary_key` - Primary key field name
/// * `reshard_state` - Active resharding state for the index
///
/// # Returns
/// `Ok(DualWritePreparation)` with separate document batches for live and shadow.
///
/// # Panics
/// Panics if any document is missing the primary key field (caller should validate first).
pub fn prepare_dual_write_documents(
documents: &[serde_json::Value],
primary_key: &str,
reshard_state: &ReshardOperationState,
) -> DualWritePreparation {
let mut live_documents = Vec::with_capacity(documents.len());
let mut shadow_documents = Vec::with_capacity(documents.len());
for doc in documents {
let pk_value = doc
.get(primary_key)
.and_then(|v| v.as_str())
.expect("primary key validation should have happened before this call");
// Compute old shard assignment for live index
let old_shard_id = crate::router::shard_for_key(pk_value, reshard_state.old_shards);
// Compute new shard assignment for shadow index
let new_shard_id = crate::router::shard_for_key(pk_value, reshard_state.target_shards);
// Clone document for live index
let mut live_doc = doc.clone();
live_doc["_miroir_shard"] = serde_json::json!(old_shard_id);
live_documents.push(live_doc);
// Clone document for shadow index with new shard tag
let mut shadow_doc = doc.clone();
shadow_doc["_miroir_shard"] = serde_json::json!(new_shard_id);
shadow_documents.push(shadow_doc);
}
DualWritePreparation {
live_documents,
shadow_documents,
shadow_index: reshard_state.shadow_index.clone(),
old_shards: reshard_state.old_shards,
target_shards: reshard_state.target_shards,
}
}
#[cfg(test)]
mod tests_dual_write {
use super::*;
use serde_json::json;
#[test]
fn prepare_dual_write_separates_shards() {
let documents = vec![
json!({"id": "user:123", "name": "Alice"}),
json!({"id": "user:456", "name": "Bob"}),
];
let reshard_state = ReshardOperationState {
shadow_index: "users__reshard_128".to_string(),
old_shards: 64,
target_shards: 128,
phase: ReshardPhase::ShadowCreated,
started_at: 1000,
};
let prep = prepare_dual_write_documents(&documents, "id", &reshard_state);
assert_eq!(prep.live_documents.len(), 2);
assert_eq!(prep.shadow_documents.len(), 2);
assert_eq!(prep.shadow_index, "users__reshard_128");
assert_eq!(prep.old_shards, 64);
assert_eq!(prep.target_shards, 128);
// Verify live documents have old shard tags
for doc in &prep.live_documents {
assert!(doc.get("_miroir_shard").is_some());
let shard = doc["_miroir_shard"].as_u64().unwrap();
assert!(shard < 64, "live shard should be < 64");
}
// Verify shadow documents have new shard tags
for doc in &prep.shadow_documents {
assert!(doc.get("_miroir_shard").is_some());
let shard = doc["_miroir_shard"].as_u64().unwrap();
assert!(shard < 128, "shadow shard should be < 128");
}
}
#[test]
fn prepare_dual_write_preserves_other_fields() {
let documents = vec![json!({
"id": "product:abc",
"name": "Widget",
"price": 19.99,
"tags": ["widget", "sale"]
})];
let reshard_state = ReshardOperationState {
shadow_index: "products__reshard_256".to_string(),
old_shards: 128,
target_shards: 256,
phase: ReshardPhase::DualWriteActive,
started_at: 2000,
};
let prep = prepare_dual_write_documents(&documents, "id", &reshard_state);
let live_doc = &prep.live_documents[0];
let shadow_doc = &prep.shadow_documents[0];
// Check that all fields are preserved
assert_eq!(live_doc["id"], "product:abc");
assert_eq!(live_doc["name"], "Widget");
assert_eq!(live_doc["price"], 19.99);
assert_eq!(live_doc["tags"], json!(["widget", "sale"]));
// Shadow should have same fields except shard tag
assert_eq!(shadow_doc["id"], "product:abc");
assert_eq!(shadow_doc["name"], "Widget");
assert_eq!(shadow_doc["price"], 19.99);
assert_eq!(shadow_doc["tags"], json!(["widget", "sale"]));
}
#[test]
fn prepare_dual_write_deterministic_shard_assignment() {
let documents = vec![json!({"id": "test:key"})];
let reshard_state = ReshardOperationState {
shadow_index: "test__reshard_32".to_string(),
old_shards: 16,
target_shards: 32,
phase: ReshardPhase::BackfillInProgress,
started_at: 3000,
};
// Run multiple times - should be deterministic
let prep1 = prepare_dual_write_documents(&documents, "id", &reshard_state);
let prep2 = prepare_dual_write_documents(&documents, "id", &reshard_state);
assert_eq!(
prep1.live_documents[0]["_miroir_shard"], prep2.live_documents[0]["_miroir_shard"],
"live shard assignment should be deterministic"
);
assert_eq!(
prep1.shadow_documents[0]["_miroir_shard"], prep2.shadow_documents[0]["_miroir_shard"],
"shadow shard assignment should be deterministic"
);
}
#[test]
fn prepare_dual_write_handles_empty_batch() {
let documents: Vec<serde_json::Value> = vec![];
let reshard_state = ReshardOperationState {
shadow_index: "empty__reshard_64".to_string(),
old_shards: 32,
target_shards: 64,
phase: ReshardPhase::ShadowCreated,
started_at: 1000,
};
let prep = prepare_dual_write_documents(&documents, "id", &reshard_state);
assert_eq!(prep.live_documents.len(), 0);
assert_eq!(prep.shadow_documents.len(), 0);
}
}
#[cfg(test)]
mod tests_reshard_execution {
use super::*;
@ -1800,3 +2121,267 @@ mod tests_shadow_create {
assert!(err.to_string().contains("rollback"));
}
}
// ---------------------------------------------------------------------------
// ReshardingRegistry tests (P5.1.b dual-write detection)
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests_resharding_registry {
use super::*;
#[test]
fn registry_new_is_empty() {
let reg = ReshardingRegistry::new();
assert!(reg.get("products").is_none());
assert!(!reg.is_dual_write_active("products"));
}
#[test]
fn registry_register_and_get() {
let mut reg = ReshardingRegistry::new();
let state = ReshardOperationState {
shadow_index: "products__reshard_128".to_string(),
old_shards: 64,
target_shards: 128,
phase: ReshardPhase::ShadowCreated,
started_at: 1000,
};
reg.register("products".to_string(), state).unwrap();
let retrieved = reg.get("products").unwrap();
assert_eq!(retrieved.shadow_index, "products__reshard_128");
assert_eq!(retrieved.old_shards, 64);
assert_eq!(retrieved.target_shards, 128);
assert_eq!(retrieved.phase, ReshardPhase::ShadowCreated);
}
#[test]
fn registry_register_duplicate_rejected() {
let mut reg = ReshardingRegistry::new();
let state = ReshardOperationState {
shadow_index: "products__reshard_128".to_string(),
old_shards: 64,
target_shards: 128,
phase: ReshardPhase::ShadowCreated,
started_at: 1000,
};
reg.register("products".to_string(), state).unwrap();
let state2 = ReshardOperationState {
shadow_index: "products__reshard_256".to_string(),
old_shards: 128,
target_shards: 256,
phase: ReshardPhase::ShadowCreated,
started_at: 2000,
};
assert!(reg.register("products".to_string(), state2).is_err());
}
#[test]
fn registry_update_phase() {
let mut reg = ReshardingRegistry::new();
let state = ReshardOperationState {
shadow_index: "products__reshard_128".to_string(),
old_shards: 64,
target_shards: 128,
phase: ReshardPhase::ShadowCreated,
started_at: 1000,
};
reg.register("products".to_string(), state).unwrap();
reg.update_phase("products", ReshardPhase::DualWriteActive)
.unwrap();
let retrieved = reg.get("products").unwrap();
assert_eq!(retrieved.phase, ReshardPhase::DualWriteActive);
}
#[test]
fn registry_update_phase_nonexistent_errors() {
let mut reg = ReshardingRegistry::new();
assert!(reg
.update_phase("products", ReshardPhase::DualWriteActive)
.is_err());
}
#[test]
fn registry_remove() {
let mut reg = ReshardingRegistry::new();
let state = ReshardOperationState {
shadow_index: "products__reshard_128".to_string(),
old_shards: 64,
target_shards: 128,
phase: ReshardPhase::ShadowCreated,
started_at: 1000,
};
reg.register("products".to_string(), state).unwrap();
assert!(reg.get("products").is_some());
reg.remove("products").unwrap();
assert!(reg.get("products").is_none());
}
#[test]
fn registry_remove_nonexistent_errors() {
let mut reg = ReshardingRegistry::new();
assert!(reg.remove("products").is_err());
}
#[test]
fn registry_is_dual_write_active_shadow_created() {
let mut reg = ReshardingRegistry::new();
let state = ReshardOperationState {
shadow_index: "products__reshard_128".to_string(),
old_shards: 64,
target_shards: 128,
phase: ReshardPhase::ShadowCreated,
started_at: 1000,
};
reg.register("products".to_string(), state).unwrap();
assert!(reg.is_dual_write_active("products"));
}
#[test]
fn registry_is_dual_write_active_dual_write_phase() {
let mut reg = ReshardingRegistry::new();
let state = ReshardOperationState {
shadow_index: "products__reshard_128".to_string(),
old_shards: 64,
target_shards: 128,
phase: ReshardPhase::DualWriteActive,
started_at: 1000,
};
reg.register("products".to_string(), state).unwrap();
assert!(reg.is_dual_write_active("products"));
}
#[test]
fn registry_is_dual_write_active_backfill_phase() {
let mut reg = ReshardingRegistry::new();
let state = ReshardOperationState {
shadow_index: "products__reshard_128".to_string(),
old_shards: 64,
target_shards: 128,
phase: ReshardPhase::BackfillInProgress,
started_at: 1000,
};
reg.register("products".to_string(), state).unwrap();
assert!(reg.is_dual_write_active("products"));
}
#[test]
fn registry_is_dual_write_active_verifying_phase() {
let mut reg = ReshardingRegistry::new();
let state = ReshardOperationState {
shadow_index: "products__reshard_128".to_string(),
old_shards: 64,
target_shards: 128,
phase: ReshardPhase::Verifying,
started_at: 1000,
};
reg.register("products".to_string(), state).unwrap();
assert!(reg.is_dual_write_active("products"));
}
#[test]
fn registry_is_dual_write_active_swapped_phase_false() {
let mut reg = ReshardingRegistry::new();
let state = ReshardOperationState {
shadow_index: "products__reshard_128".to_string(),
old_shards: 64,
target_shards: 128,
phase: ReshardPhase::Swapped,
started_at: 1000,
};
reg.register("products".to_string(), state).unwrap();
// After swap, dual-write stops (writes go only to new index)
assert!(!reg.is_dual_write_active("products"));
}
#[test]
fn registry_is_dual_write_active_no_operation() {
let reg = ReshardingRegistry::new();
assert!(!reg.is_dual_write_active("products"));
}
#[test]
fn registry_list() {
let mut reg = ReshardingRegistry::new();
let state1 = ReshardOperationState {
shadow_index: "products__reshard_128".to_string(),
old_shards: 64,
target_shards: 128,
phase: ReshardPhase::ShadowCreated,
started_at: 1000,
};
reg.register("products".to_string(), state1).unwrap();
let state2 = ReshardOperationState {
shadow_index: "orders__reshard_256".to_string(),
old_shards: 128,
target_shards: 256,
phase: ReshardPhase::DualWriteActive,
started_at: 2000,
};
reg.register("orders".to_string(), state2).unwrap();
let list = reg.list();
assert_eq!(list.len(), 2);
let list_map: std::collections::HashMap<_, _> = list.into_iter().collect();
assert!(list_map.contains_key("products"));
assert!(list_map.contains_key("orders"));
assert_eq!(
list_map.get("products").unwrap().shadow_index,
"products__reshard_128"
);
assert_eq!(
list_map.get("orders").unwrap().shadow_index,
"orders__reshard_256"
);
}
#[test]
fn registry_multiple_indexes_independent() {
let mut reg = ReshardingRegistry::new();
let products_state = ReshardOperationState {
shadow_index: "products__reshard_128".to_string(),
old_shards: 64,
target_shards: 128,
phase: ReshardPhase::DualWriteActive,
started_at: 1000,
};
reg.register("products".to_string(), products_state)
.unwrap();
let orders_state = ReshardOperationState {
shadow_index: "orders__reshard_256".to_string(),
old_shards: 128,
target_shards: 256,
phase: ReshardPhase::ShadowCreated,
started_at: 2000,
};
reg.register("orders".to_string(), orders_state).unwrap();
// Both should be in dual-write
assert!(reg.is_dual_write_active("products"));
assert!(reg.is_dual_write_active("orders"));
// Update products to swapped
reg.update_phase("products", ReshardPhase::Swapped).unwrap();
// Now only orders should be in dual-write
assert!(!reg.is_dual_write_active("products"));
assert!(reg.is_dual_write_active("orders"));
// Remove orders
reg.remove("orders").unwrap();
// Neither should be in dual-write
assert!(!reg.is_dual_write_active("products"));
assert!(!reg.is_dual_write_active("orders"));
}
}

View file

@ -20,6 +20,7 @@ use miroir_core::{
RebalancerMetricsCallback, RebalancerWorker, RebalancerWorkerConfig, TopologyChangeEvent,
},
replica_selection::{ReplicaSelector, SelectionObserver},
reshard::ReshardingRegistry,
router,
scatter::{DeleteByFilterRequest, FetchDocumentsRequest, FetchDocumentsResponse, WriteRequest},
task_registry::TaskRegistryImpl,
@ -381,6 +382,9 @@ pub struct AppState {
pub group_sync_worker: Option<Arc<GroupSyncWorker<HttpClient>>>,
/// Mode A coordinator for shard-partitioned ownership (plan §14.5 Mode A).
pub mode_a_coordinator: Option<Arc<ModeACoordinator>>,
/// Resharding registry for tracking active resharding operations (plan §13.1).
/// Used by the write path to detect dual-write phase and route to both live and shadow indexes.
pub resharding_registry: Arc<tokio::sync::RwLock<ReshardingRegistry>>,
}
impl AppState {
@ -665,10 +669,15 @@ impl AppState {
// Create Mode A coordinator for shard-partitioned ownership (plan §14.5 Mode A)
let mode_a_coordinator = if cfg!(feature = "peer-discovery") {
let pod_name = std::env::var("POD_NAME").unwrap_or_else(|_| "unknown".to_string());
let namespace = std::env::var("POD_NAMESPACE").unwrap_or_else(|_| "default".to_string());
let namespace =
std::env::var("POD_NAMESPACE").unwrap_or_else(|_| "default".to_string());
let service_name = std::env::var("MIROR_SERVICE_NAME")
.unwrap_or_else(|_| "miroir-headless".to_string());
let peer_discovery = Arc::new(PeerDiscovery::new(pod_name.clone(), namespace, service_name));
let peer_discovery = Arc::new(PeerDiscovery::new(
pod_name.clone(),
namespace,
service_name,
));
Some(Arc::new(ModeACoordinator::new(pod_name, peer_discovery)))
} else {
None
@ -756,6 +765,9 @@ impl AppState {
group_addition_coordinator,
group_sync_worker,
mode_a_coordinator,
resharding_registry: Arc::new(tokio::sync::RwLock::new(
miroir_core::reshard::ReshardingRegistry::new(),
)),
}
}

View file

@ -461,6 +461,28 @@ async fn write_documents_impl(
}
}
// 2.5. Check if index is in resharding dual-write phase (plan §13.1 step 2)
// If yes, prepare separate document batches for live and shadow indexes
let resharding_state = state.resharding_registry.read().await;
let dual_write_prep = if resharding_state.is_dual_write_active(&index_uid) {
let state = resharding_state.get(&index_uid).unwrap();
tracing::debug!(
index_uid = %index_uid,
shadow_index = %state.shadow_index,
old_shards = state.old_shards,
target_shards = state.target_shards,
"index in resharding dual-write phase, preparing dual-hash writes"
);
Some(miroir_core::reshard::prepare_dual_write_documents(
&documents,
&primary_key,
state,
))
} else {
None
};
drop(resharding_state); // Release lock before async operations
// 3. Inject _miroir_shard and _miroir_updated_at into each document
let topology = state.topology.read().await;
let shard_count = topology.shards;
@ -479,21 +501,49 @@ async fn write_documents_impl(
None
};
for doc in &mut documents {
if let Some(pk_value) = doc.get(&primary_key).and_then(|v| v.as_str()) {
let shard_id = shard_for_key(pk_value, shard_count);
doc["_miroir_shard"] = serde_json::json!(shard_id);
}
// Handle dual-write resharding: prepare separate document batches
// If dual_write_prep is Some, use pre-computed batches with different shard tags
// Otherwise, inject shard tags inline as before
let (mut live_docs, shadow_write_info) = if let Some(prep) = dual_write_prep {
// Use pre-computed batches from prepare_dual_write_documents
// Shadow documents already have new shard tags; live documents have old shard tags
(
prep.live_documents,
Some((
prep.shadow_documents,
prep.shadow_index,
prep.old_shards,
prep.target_shards,
)),
)
} else {
// Normal path: inject shard tags inline
for doc in &mut documents {
if let Some(pk_value) = doc.get(&primary_key).and_then(|v| v.as_str()) {
let shard_id = shard_for_key(pk_value, shard_count);
doc["_miroir_shard"] = serde_json::json!(shard_id);
}
// Stamp _miroir_updated_at when anti_entropy is enabled (plan §13.8)
// This happens AFTER reserved field validation, so orchestrator-controlled injection is allowed
if let Some(timestamp) = now_ms {
doc[updated_at_field] = serde_json::json!(timestamp);
// Stamp _miroir_updated_at when anti_entropy is enabled (plan §13.8)
// This happens AFTER reserved field validation, so orchestrator-controlled injection is allowed
if let Some(timestamp) = now_ms {
doc[updated_at_field] = serde_json::json!(timestamp);
}
}
(documents, None)
};
// Stamp _miroir_updated_at on live documents if anti_entropy is enabled
if let Some(timestamp) = now_ms {
for doc in &mut live_docs {
if doc.get(updated_at_field).is_none() {
doc[updated_at_field] = serde_json::json!(timestamp);
}
}
}
// 4. Group documents by target nodes (per-batch grouping for efficient fan-out)
let node_documents = group_documents_by_shard(&documents, &primary_key, &topology)?;
let node_documents = group_documents_by_shard(&live_docs, &primary_key, &topology)?;
// 5. Fan out to nodes and track quorum
let client = HttpClient::new(
@ -560,6 +610,73 @@ async fn write_documents_impl(
}
}
// 5.5. Dual-write to shadow index during resharding (plan §13.1 step 2)
// Shadow writes are tagged with origin="reshard_backfill" for CDC suppression (plan §13.13)
if let Some((shadow_docs, shadow_index, old_shards, target_shards)) = shadow_write_info {
tracing::debug!(
shadow_index = %shadow_index,
docs_count = shadow_docs.len(),
"writing to shadow index during resharding dual-write phase"
);
// Group shadow documents by their new shard assignment for efficient fan-out
let mut shadow_node_documents: HashMap<u32, Vec<Value>> = HashMap::new();
for doc in &shadow_docs {
let pk_value = doc
.get(&primary_key)
.and_then(|v| v.as_str())
.expect("primary key validation should have happened");
// Shadow documents already have new shard tags from prepare_dual_write_documents
let shard_id = shard_for_key(pk_value, target_shards);
shadow_node_documents
.entry(shard_id)
.or_default()
.push(doc.clone());
}
// Write shadow documents to all nodes (shadow index exists on all nodes)
for (_shard_id, docs) in shadow_node_documents {
for node in topology.nodes() {
let group_id = node.replica_group;
quorum_state.record_attempt(group_id, &node.id);
let req = WriteRequest {
index_uid: shadow_index.clone(),
documents: docs.clone(),
primary_key: Some(primary_key.clone()),
// Tag shadow writes with origin for CDC suppression (plan §13.13)
origin: Some(miroir_core::cdc::ORIGIN_RESHARD_BACKFILL.to_string()),
};
match client.write_documents(&node.id, &node.address, &req).await {
Ok(resp) if resp.success => {
quorum_state.record_success(group_id, &node.id);
if let Some(task_uid) = resp.task_uid {
node_task_uids.insert(node.id.as_str().to_string(), task_uid);
}
}
Ok(resp) => {
// Non-success response - log but don't fail the live write
tracing::warn!(
node = %node.id,
error = ?resp.message,
"shadow index write returned non-success"
);
}
Err(e) => {
// Log shadow write failure but don't fail the live write
tracing::warn!(
node = %node.id,
error = ?e,
"shadow index write failed"
);
}
}
}
}
}
// 6. Apply two-rule quorum logic
let degraded_groups = quorum_state.count_degraded_groups(replica_group_count, rf);
let quorum_groups = quorum_state.count_quorum_groups();
@ -619,7 +736,7 @@ async fn write_documents_impl(
use sha2::{Digest, Sha256};
let body_hash = format!(
"{:x}",
Sha256::digest(serde_json::to_string(&documents).unwrap_or_default())
Sha256::digest(serde_json::to_string(&live_docs).unwrap_or_default())
);
state
.idempotency_cache