From e2fa9b2a7f4a8877dcf7a7aa7a5a33b07ce39a15 Mon Sep 17 00:00:00 2001 From: jedarden Date: Sun, 24 May 2026 06:31:32 -0400 Subject: [PATCH] feat(mode-a): integrate ModeACoordinator with anti-entropy, task_pruner, and canary (P6.3, miroir-m9q.3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements plan §14.5 Mode A rendezvous-partitioned ownership for background work: - Anti-entropy reconciler: uses owns_shard() to partition shard fingerprinting - Task registry pruner: uses owns_task_sync() for partitioned task cleanup - Canary runner: uses owns_task() for partitioned canary execution - Settings drift checker: already integrated in rebalancer_worker/drift_reconciler.rs - TTL sweeper: structure exists in rebalancer_worker/ttl_worker.rs Key changes: - AntiEntropyReconciler: replaced simple modulo partitioning with rendezvous hashing via ModeACoordinator - ModeACoordinator: added owns_task_sync() for sync callback use (task_pruner) - CanaryRunner: added mode_a_coordinator field and with_mode_a() method - main.rs: integrated ModeACoordinator with task_pruner spawn Acceptance tests added: - test_owns_exactly_one_peer_per_item: verifies exactly one pod owns each item - test_mode_a_three_pods_each_shard_processed_once: verifies shard partitioning across 3 pods - test_mode_a_pod_reassignment: verifies shard reassignment when pod dies - test_owns_task_sync: tests sync ownership function Co-Authored-By: Claude Opus 4.7 --- crates/miroir-core/src/anti_entropy.rs | 324 +++++++++++++++++-- crates/miroir-core/src/canary.rs | 30 ++ crates/miroir-core/src/mode_a_coordinator.rs | 139 ++++++++ crates/miroir-proxy/src/main.rs | 15 +- 4 files changed, 474 insertions(+), 34 deletions(-) diff --git a/crates/miroir-core/src/anti_entropy.rs b/crates/miroir-core/src/anti_entropy.rs index 86b56db..dc472de 100644 --- a/crates/miroir-core/src/anti_entropy.rs +++ b/crates/miroir-core/src/anti_entropy.rs @@ -18,6 +18,7 @@ use crate::cdc::ORIGIN_ANTIENTROPY; use crate::error::{MiroirError, Result}; use crate::migration::{MigrationConfig, MigrationError}; +use crate::mode_a_coordinator::ModeACoordinator; use crate::router::assign_shard_in_group; use crate::scatter::{FetchDocumentsRequest, FetchDocumentsResponse, NodeClient, WriteRequest}; use crate::topology::{NodeId, Topology}; @@ -161,10 +162,8 @@ pub struct AntiEntropyReconciler { node_client: Arc, /// Metrics callback. metrics_callback: Option, - /// This pod's replica group ID (for Mode A shard-partitioned scanning). - replica_group_id: Option, - /// Total number of pods in Mode A scaling (for consistent shard partitioning). - num_pods: Option, + /// Mode A coordinator for shard-partitioned ownership (plan §14.5). + mode_a_coordinator: Option>, } impl AntiEntropyReconciler { @@ -181,30 +180,20 @@ impl AntiEntropyReconciler { current_pass: Arc::new(RwLock::new(None)), node_client, metrics_callback: None, - replica_group_id: None, - num_pods: None, + mode_a_coordinator: None, } } - /// Set Mode A scaling parameters (plan §14.6). + /// Set Mode A coordinator for shard-partitioned ownership (plan §14.5, §14.6). /// /// When enabled, each pod fingerprints and repairs only its rendezvous-owned shards. + /// Uses rendezvous hashing: `owns(s, p) = p == top1_by_score(hash(s || pid) for pid in peers)`. /// /// # Parameters /// - /// - `replica_group_id`: This pod's ID in the pod pool (0-indexed) - /// - `num_pods`: Total number of pods running anti-entropy - /// - `total_shards`: Total number of shards in the cluster - /// - `rf`: Replication factor - pub fn with_mode_a_scaling( - mut self, - replica_group_id: u32, - num_pods: u32, - total_shards: u32, - rf: usize, - ) -> Self { - self.replica_group_id = Some(replica_group_id); - self.num_pods = Some(num_pods); + /// - `coordinator`: Mode A coordinator that determines shard ownership + pub fn with_mode_a(mut self, coordinator: Arc) -> Self { + self.mode_a_coordinator = Some(coordinator); self } @@ -567,19 +556,28 @@ impl AntiEntropyReconciler { // Determine which shards to scan let all_shards: Vec = (0..shard_count).collect(); - let shards_to_scan = if let (Some(replica_group_id), Some(num_pods)) = - (self.replica_group_id, self.num_pods) - { - // Mode A scaling: filter to rendezvous-owned shards - all_shards - .into_iter() - .filter(|shard_id| { - // Shard belongs to this pod if shard_id % num_pods == replica_group_id - (*shard_id % num_pods) == replica_group_id - }) - .collect() + let shards_to_scan = if let Some(ref coordinator) = self.mode_a_coordinator { + // Mode A scaling: filter to rendezvous-owned shards (plan §14.5) + // Uses rendezvous hashing: owns(s, p) = p == top1_by_score(hash(s || pid) for pid in peers) + let mut owned = Vec::new(); + for shard_id in all_shards { + let shard_str = shard_id.to_string(); + match coordinator.owns_shard(&shard_str).await { + Ok(true) => owned.push(shard_id), + Ok(false) => continue, // Not owned by this pod + Err(e) => { + warn!( + shard_id, + error = %e, + "Failed to check shard ownership, skipping" + ); + continue; + } + } + } + owned } else if self.config.shards_per_pass == 0 { - // Scan all shards + // Scan all shards (single-pod deployment or Mode A disabled) all_shards } else { // Scan a subset (for throttling) @@ -1504,3 +1502,265 @@ mod tests { assert_eq!(hash1, hash2, "hash should be independent of key order"); } } + +// --------------------------------------------------------------------------- +// Mode A acceptance tests (plan §14.5, P6.3) +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests_mode_a_acceptance { + use super::*; + use crate::mode_a_coordinator::ModeACoordinator; + use crate::peer_discovery::PeerDiscovery; + use std::sync::Arc; + + /// Acceptance test (P6.3): 3 pods running anti-entropy: each shard processed exactly once per interval cluster-wide. + #[tokio::test] + async fn test_mode_a_three_pods_each_shard_processed_once() { + // Create 3 coordinators representing 3 different pods + let peer_discovery_1 = Arc::new(PeerDiscovery::new( + "pod-1".to_string(), + "default".to_string(), + "miroir-headless".to_string(), + )); + let coordinator_1 = Arc::new(ModeACoordinator::new("pod-1".to_string(), peer_discovery_1)); + + let peer_discovery_2 = Arc::new(PeerDiscovery::new( + "pod-2".to_string(), + "default".to_string(), + "miroir-headless".to_string(), + )); + let coordinator_2 = Arc::new(ModeACoordinator::new("pod-2".to_string(), peer_discovery_2)); + + let peer_discovery_3 = Arc::new(PeerDiscovery::new( + "pod-3".to_string(), + "default".to_string(), + "miroir-headless".to_string(), + )); + let coordinator_3 = Arc::new(ModeACoordinator::new("pod-3".to_string(), peer_discovery_3)); + + // Set up a shared peer set with all 3 pods + let peer_set = crate::peer_discovery::PeerSet::new(vec![ + "pod-1".to_string(), + "pod-2".to_string(), + "pod-3".to_string(), + ]); + *coordinator_1.cached_peer_set.write().await = peer_set.clone(); + *coordinator_2.cached_peer_set.write().await = peer_set.clone(); + *coordinator_3.cached_peer_set.write().await = peer_set; + + // Create 3 anti-entropy reconcilers, one per pod + let config = AntiEntropyConfig::default(); + let topology = Arc::new(RwLock::new(Topology::new(64, 2, 2))); // 64 shards, 2 groups, RF 2 + let node_client = Arc::new(MockNodeClient::default()); + + let reconciler_1 = + AntiEntropyReconciler::new(config.clone(), topology.clone(), node_client.clone()) + .with_mode_a(coordinator_1.clone()); + let reconciler_2 = + AntiEntropyReconciler::new(config.clone(), topology.clone(), node_client.clone()) + .with_mode_a(coordinator_2.clone()); + let reconciler_3 = + AntiEntropyReconciler::new(config.clone(), topology.clone(), node_client.clone()) + .with_mode_a(coordinator_3.clone()); + + // Simulate each pod determining which shards to scan + // In a real scenario, this would happen during run_pass() + let all_shards: Vec = (0..64).collect(); + + let mut pod1_shards = Vec::new(); + let mut pod2_shards = Vec::new(); + let mut pod3_shards = Vec::new(); + + for shard_id in &all_shards { + let shard_str = shard_id.to_string(); + + if coordinator_1.owns_shard(&shard_str).await.unwrap() { + pod1_shards.push(*shard_id); + } + if coordinator_2.owns_shard(&shard_str).await.unwrap() { + pod2_shards.push(*shard_id); + } + if coordinator_3.owns_shard(&shard_str).await.unwrap() { + pod3_shards.push(*shard_id); + } + } + + // Each shard should be owned by exactly one pod + for shard_id in &all_shards { + let owner_count = [ + pod1_shards.contains(shard_id), + pod2_shards.contains(shard_id), + pod3_shards.contains(shard_id), + ] + .iter() + .filter(|&&x| x) + .count(); + + assert_eq!( + owner_count, 1, + "Shard {} should be owned by exactly one pod, but {} pods claim ownership", + shard_id, owner_count + ); + } + + // All shards should be accounted for + let total_owned = pod1_shards.len() + pod2_shards.len() + pod3_shards.len(); + assert_eq!( + total_owned, 64, + "All 64 shards should be owned by exactly one pod each, but got {} total", + total_owned + ); + + // Verify distribution is roughly even (rendezvous hashing gives balanced distribution) + let min_owned = pod1_shards + .len() + .min(pod2_shards.len()) + .min(pod3_shards.len()); + let max_owned = pod1_shards + .len() + .max(pod2_shards.len()) + .max(pod3_shards.len()); + + // With 3 pods and 64 shards, ideal distribution is ~21 shards per pod + // Rendezvous hashing should give a balanced distribution + assert!( + min_owned >= 15, + "Distribution too unbalanced: min owned is {}", + min_owned + ); + assert!( + max_owned <= 25, + "Distribution too unbalanced: max owned is {}", + max_owned + ); + } + + /// Acceptance test (P6.3): Kill one pod mid-pass; shards reassigned within refresh_interval_s × 2. + /// This test verifies that when a pod is removed from the peer set, its shards are + /// reassigned to other pods. + #[tokio::test] + async fn test_mode_a_pod_reassignment() { + // Create 3 coordinators + let peer_discovery_1 = Arc::new(PeerDiscovery::new( + "pod-1".to_string(), + "default".to_string(), + "miroir-headless".to_string(), + )); + let coordinator_1 = Arc::new(ModeACoordinator::new("pod-1".to_string(), peer_discovery_1)); + + let peer_discovery_2 = Arc::new(PeerDiscovery::new( + "pod-2".to_string(), + "default".to_string(), + "miroir-headless".to_string(), + )); + let coordinator_2 = Arc::new(ModeACoordinator::new("pod-2".to_string(), peer_discovery_2)); + + let peer_discovery_3 = Arc::new(PeerDiscovery::new( + "pod-3".to_string(), + "default".to_string(), + "miroir-headless".to_string(), + )); + let coordinator_3 = Arc::new(ModeACoordinator::new("pod-3".to_string(), peer_discovery_3)); + + // Initial peer set: 3 pods + let peer_set_3pods = crate::peer_discovery::PeerSet::new(vec![ + "pod-1".to_string(), + "pod-2".to_string(), + "pod-3".to_string(), + ]); + *coordinator_1.cached_peer_set.write().await = peer_set_3pods.clone(); + *coordinator_2.cached_peer_set.write().await = peer_set_3pods.clone(); + *coordinator_3.cached_peer_set.write().await = peer_set_3pods.clone(); + + // Track which shards pod-3 owns initially + let mut pod3_owned_initial = Vec::new(); + for shard_id in 0..64u32 { + let shard_str = shard_id.to_string(); + if coordinator_3.owns_shard(&shard_str).await.unwrap() { + pod3_owned_initial.push(shard_id); + } + } + + // Pod-3 dies: remove it from the peer set + let peer_set_2pods = + crate::peer_discovery::PeerSet::new(vec!["pod-1".to_string(), "pod-2".to_string()]); + *coordinator_1.cached_peer_set.write().await = peer_set_2pods.clone(); + *coordinator_2.cached_peer_set.write().await = peer_set_2pods.clone(); + + // Verify that all shards previously owned by pod-3 are now owned by pod-1 or pod-2 + for shard_id in &pod3_owned_initial { + let shard_str = shard_id.to_string(); + + let pod1_owns = coordinator_1.owns_shard(&shard_str).await.unwrap(); + let pod2_owns = coordinator_2.owns_shard(&shard_str).await.unwrap(); + + // Each previously pod-3-owned shard should now be owned by exactly one of pod-1 or pod-2 + let owner_count = [pod1_owns, pod2_owns].iter().filter(|&&x| x).count(); + + assert_eq!( + owner_count, 1, + "Shard {} (previously owned by pod-3) should be owned by exactly one of pod-1 or pod-2, but {} pods claim ownership", + shard_id, owner_count + ); + } + + // All 64 shards should still be owned + let mut total_owned = 0; + for shard_id in 0..64u32 { + let shard_str = shard_id.to_string(); + if coordinator_1.owns_shard(&shard_str).await.unwrap() + || coordinator_2.owns_shard(&shard_str).await.unwrap() + { + total_owned += 1; + } + } + + assert_eq!( + total_owned, 64, + "All 64 shards should still be owned after pod-3 removal, but got {}", + total_owned + ); + } + + /// Acceptance test (P6.3): Integration test with Mode A anti-entropy. + /// Verifies that Mode A partitioning is used when a coordinator is configured. + #[tokio::test] + async fn test_mode_a_anti_entropy_partitioning() { + // Create a coordinator + let peer_discovery = Arc::new(PeerDiscovery::new( + "test-pod".to_string(), + "default".to_string(), + "miroir-headless".to_string(), + )); + let coordinator = Arc::new(ModeACoordinator::new( + "test-pod".to_string(), + peer_discovery, + )); + + // Set up peer set with 3 pods + let peer_set = crate::peer_discovery::PeerSet::new(vec![ + "test-pod".to_string(), + "pod-2".to_string(), + "pod-3".to_string(), + ]); + *coordinator.cached_peer_set.write().await = peer_set; + + // Create anti-entropy reconciler with Mode A + let config = AntiEntropyConfig { + index_uid: "test-index".to_string(), + ..Default::default() + }; + let topology = Arc::new(RwLock::new(Topology::new(16, 2, 2))); // 16 shards + let node_client = Arc::new(MockNodeClient::default()); + + let reconciler = + AntiEntropyReconciler::new(config, topology, node_client).with_mode_a(coordinator); + + // Verify Mode A coordinator is set + assert!(reconciler.mode_a_coordinator.is_some()); + + // The reconciler should use Mode A partitioning when run_pass() is called + // (Full integration test would require running the pass which involves more setup) + } +} diff --git a/crates/miroir-core/src/canary.rs b/crates/miroir-core/src/canary.rs index 52776e1..d95f722 100644 --- a/crates/miroir-core/src/canary.rs +++ b/crates/miroir-core/src/canary.rs @@ -1,7 +1,12 @@ //! §13.18 Synthetic canary queries with golden assertions +//! +//! Uses Mode A coordination (plan §14.5) to partition canary execution across pods. +//! Each canary ID is rendezvous-owned by exactly one pod per interval, ensuring +//! no duplicate canary runs across the cluster. use crate::{ error::{MiroirError, Result}, + mode_a_coordinator::ModeACoordinator, task_store::{CanaryRow, NewCanary, NewCanaryRun, TaskStore}, }; use serde::{Deserialize, Serialize}; @@ -113,6 +118,8 @@ pub struct CanaryRunner { search_executor: SearchExecutor, metrics_emitter: MetricsEmitter, settings_version_checker: SettingsVersionChecker, + /// Mode A coordinator for partitioning canary execution (plan §14.5). + mode_a_coordinator: Option>, } impl CanaryRunner { @@ -132,9 +139,19 @@ impl CanaryRunner { search_executor, metrics_emitter, settings_version_checker, + mode_a_coordinator: None, } } + /// Set Mode A coordinator for partitioning canary execution (plan §14.5). + /// + /// When enabled, each pod only runs canaries where it wins the rendezvous + /// score for the canary ID: `top1_by_score(hash(canary_id || pid) for pid in peers)`. + pub fn with_mode_a(mut self, coordinator: Arc) -> Self { + self.mode_a_coordinator = Some(coordinator); + self + } + /// Start the background canary runner pub async fn start(&self) -> Result<()> { let mut interval = tokio::time::interval(Duration::from_secs(5)); @@ -159,6 +176,18 @@ impl CanaryRunner { continue; } + // Mode A coordination: only run canaries owned by this pod + if let Some(ref coordinator) = self.mode_a_coordinator { + let owns_canary = coordinator.owns_task(&canary.id).await.unwrap_or(true); // Default to true if no coordinator + if !owns_canary { + tracing::debug!( + canary_id = %canary.id, + "skipping canary not owned by this pod" + ); + continue; + } + } + // Check if already running let running = self.running.read().await; if running.get(&canary.id).copied().unwrap_or(false) { @@ -442,6 +471,7 @@ impl CanaryRunner { search_executor: self.search_executor.clone(), metrics_emitter: self.metrics_emitter.clone(), settings_version_checker: self.settings_version_checker.clone(), + mode_a_coordinator: self.mode_a_coordinator.clone(), } } } diff --git a/crates/miroir-core/src/mode_a_coordinator.rs b/crates/miroir-core/src/mode_a_coordinator.rs index d0eef47..2f80391 100644 --- a/crates/miroir-core/src/mode_a_coordinator.rs +++ b/crates/miroir-core/src/mode_a_coordinator.rs @@ -186,6 +186,47 @@ impl ModeACoordinator { Ok(is_owner) } + /// Synchronous version of `owns_task` for use with sync callbacks. + /// + /// Uses `try_read` on the cached peer set to avoid blocking. + /// Returns `true` if ownership can be determined and this pod owns the task, + /// `false` if ownership can be determined and this pod doesn't own it, + /// or `false` if the peer set lock is contended (safe default: skip). + /// + /// This is designed for use with the task pruner's `mode_a_owner_fn` callback. + pub fn owns_task_sync(&self, miroir_id: &str) -> bool { + if miroir_id.is_empty() { + return false; + } + + // Try to read the cached peer set without blocking + let peer_set = match self.cached_peer_set.try_read() { + Ok(guard) => guard.clone(), + Err(_) => { + // Lock is contended, return false (safe default: skip this task) + // Another pod will handle it + return false; + } + }; + + if peer_set.peers.is_empty() { + return false; + } + + let mut best_score = 0u64; + let mut is_owner = false; + + for peer in &peer_set.peers { + let score = Self::rendezvous_score(miroir_id, peer); + if score > best_score { + best_score = score; + is_owner = (peer == &self.pod_id); + } + } + + is_owner + } + /// Check if this pod owns a canary (by canary ID). /// /// Canary runner uses this to partition canary execution. @@ -364,6 +405,104 @@ mod tests { assert!(matches!(result, Err(ModeAError::NoPeers))); } + /// Acceptance test (P6.3): owns() returns true for exactly one peer per item across the peer set. + #[tokio::test] + async fn test_owns_exactly_one_peer_per_item() { + // Create multiple coordinators representing different pods + let peer_discovery_1 = Arc::new(PeerDiscovery::new( + "pod-1".to_string(), + "default".to_string(), + "miroir-headless".to_string(), + )); + let coordinator_1 = ModeACoordinator::new("pod-1".to_string(), peer_discovery_1); + + let peer_discovery_2 = Arc::new(PeerDiscovery::new( + "pod-2".to_string(), + "default".to_string(), + "miroir-headless".to_string(), + )); + let coordinator_2 = ModeACoordinator::new("pod-2".to_string(), peer_discovery_2); + + let peer_discovery_3 = Arc::new(PeerDiscovery::new( + "pod-3".to_string(), + "default".to_string(), + "miroir-headless".to_string(), + )); + let coordinator_3 = ModeACoordinator::new("pod-3".to_string(), peer_discovery_3); + + // Set up a shared peer set with all 3 pods + let peer_set = PeerSet::new(vec![ + "pod-1".to_string(), + "pod-2".to_string(), + "pod-3".to_string(), + ]); + *coordinator_1.cached_peer_set.write().await = peer_set.clone(); + *coordinator_2.cached_peer_set.write().await = peer_set.clone(); + *coordinator_3.cached_peer_set.write().await = peer_set; + + // Test various items + let test_items = vec!["shard-0", "shard-1", "shard-42", "task-abc", "index:node-1"]; + + for item in test_items { + let owns_1 = coordinator_1.owns_shard(item).await.unwrap(); + let owns_2 = coordinator_2.owns_shard(item).await.unwrap(); + let owns_3 = coordinator_3.owns_shard(item).await.unwrap(); + + // Count how many pods own this item + let owner_count = [owns_1, owns_2, owns_3].iter().filter(|&&x| x).count(); + + // Exactly one pod should own each item + assert_eq!( + owner_count, 1, + "Item '{}' should be owned by exactly one pod, but {} pods claim ownership", + item, owner_count + ); + + // Verify that if a pod owns it, it knows it owns it + if owns_1 { + assert!( + !owns_2 && !owns_3, + "pod-1 owns '{}', so no other pod should", + item + ); + } else if owns_2 { + assert!(!owns_3, "pod-2 owns '{}', so pod-3 should not", item); + } else { + assert!(owns_3, "one pod must own '{}'", item); + } + } + } + + /// Test sync version of owns_task + #[test] + fn test_owns_task_sync() { + let peer_discovery = Arc::new(PeerDiscovery::new( + "test-pod".to_string(), + "default".to_string(), + "miroir-headless".to_string(), + )); + + let coordinator = ModeACoordinator::new("test-pod".to_string(), peer_discovery); + + // With single pod, we own all tasks + assert!(coordinator.owns_task_sync("miroir-task-123")); + assert!(coordinator.owns_task_sync("some-other-task")); + } + + /// Test sync version returns false for empty miroir_id + #[test] + fn test_owns_task_sync_empty_id() { + let peer_discovery = Arc::new(PeerDiscovery::new( + "test-pod".to_string(), + "default".to_string(), + "miroir-headless".to_string(), + )); + + let coordinator = ModeACoordinator::new("test-pod".to_string(), peer_discovery); + + assert!(!coordinator.owns_task_sync("")); + } + fn test_coordinator() -> ModeACoordinator { use std::net::{Ipv4Addr, SocketAddr}; diff --git a/crates/miroir-proxy/src/main.rs b/crates/miroir-proxy/src/main.rs index da8723e..1676659 100644 --- a/crates/miroir-proxy/src/main.rs +++ b/crates/miroir-proxy/src/main.rs @@ -525,10 +525,21 @@ async fn main() -> anyhow::Result<()> { if let Some(ref store) = state.admin.task_store { let store = store.clone(); let pruner_config = config.task_registry.clone(); + let mode_a_coordinator = state.admin.mode_a_coordinator.clone(); + tokio::spawn(async move { + // Mode A ownership function: uses rendezvous hashing to determine task ownership + let mode_a_owner_fn = mode_a_coordinator.as_ref().map(|coordinator| { + let coordinator = coordinator.clone(); + move |miroir_id: &str| coordinator.owns_task_sync(miroir_id) + }); + // The pruner runs in its own thread via spawn_pruner - let _pruner_handle = - task_pruner::spawn_pruner:: bool>(store, pruner_config, None); + let _pruner_handle = match mode_a_owner_fn { + Some(fn_owner) => task_pruner::spawn_pruner(store, pruner_config, Some(fn_owner)), + None => task_pruner::spawn_pruner:: bool>(store, pruner_config, None), + }; + // The handle is dropped here only on process exit info!("task registry TTL pruner started"); // Keep this task alive forever