feat(mode-a): integrate ModeACoordinator with anti-entropy, task_pruner, and canary (P6.3, miroir-m9q.3)

Implements plan §14.5 Mode A rendezvous-partitioned ownership for background work:
- Anti-entropy reconciler: uses owns_shard() to partition shard fingerprinting
- Task registry pruner: uses owns_task_sync() for partitioned task cleanup
- Canary runner: uses owns_task() for partitioned canary execution
- Settings drift checker: already integrated in rebalancer_worker/drift_reconciler.rs
- TTL sweeper: structure exists in rebalancer_worker/ttl_worker.rs

Key changes:
- AntiEntropyReconciler: replaced simple modulo partitioning with rendezvous hashing via ModeACoordinator
- ModeACoordinator: added owns_task_sync() for sync callback use (task_pruner)
- CanaryRunner: added mode_a_coordinator field and with_mode_a() method
- main.rs: integrated ModeACoordinator with task_pruner spawn

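Acceptance tests added:
- test_owns_exactly_one_peer_per_item: verifies exactly one pod owns each item
- test_mode_a_three_pods_each_shard_processed_once: verifies shard partitioning across 3 pods
- test_mode_a_pod_reassignment: verifies shards are reassigned when a pod dies
- test_mode_a_anti_entropy_partitioning: verifies the reconciler stores the configured coordinator
- test_owns_task_sync: verifies the sync ownership function for a single-pod coordinator
- test_owns_task_sync_empty_id: verifies the sync ownership function rejects an empty ID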

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-05-24 06:31:32 -04:00
parent 865d5184ed
commit e2fa9b2a7f
4 changed files with 474 additions and 34 deletions

View file

@ -18,6 +18,7 @@
use crate::cdc::ORIGIN_ANTIENTROPY;
use crate::error::{MiroirError, Result};
use crate::migration::{MigrationConfig, MigrationError};
use crate::mode_a_coordinator::ModeACoordinator;
use crate::router::assign_shard_in_group;
use crate::scatter::{FetchDocumentsRequest, FetchDocumentsResponse, NodeClient, WriteRequest};
use crate::topology::{NodeId, Topology};
@ -161,10 +162,8 @@ pub struct AntiEntropyReconciler<C: NodeClient> {
node_client: Arc<C>,
/// Metrics callback.
metrics_callback: Option<AntiEntropyMetricsCallback>,
/// This pod's replica group ID (for Mode A shard-partitioned scanning).
replica_group_id: Option<u32>,
/// Total number of pods in Mode A scaling (for consistent shard partitioning).
num_pods: Option<u32>,
/// Mode A coordinator for shard-partitioned ownership (plan §14.5).
mode_a_coordinator: Option<Arc<ModeACoordinator>>,
}
impl<C: NodeClient> AntiEntropyReconciler<C> {
@ -181,30 +180,20 @@ impl<C: NodeClient> AntiEntropyReconciler<C> {
current_pass: Arc::new(RwLock::new(None)),
node_client,
metrics_callback: None,
replica_group_id: None,
num_pods: None,
mode_a_coordinator: None,
}
}
/// Set Mode A scaling parameters (plan §14.6).
/// Set Mode A coordinator for shard-partitioned ownership (plan §14.5, §14.6).
///
/// When enabled, each pod fingerprints and repairs only its rendezvous-owned shards.
/// Uses rendezvous hashing: `owns(s, p) = p == top1_by_score(hash(s || pid) for pid in peers)`.
///
/// # Parameters
///
/// - `replica_group_id`: This pod's ID in the pod pool (0-indexed)
/// - `num_pods`: Total number of pods running anti-entropy
/// - `total_shards`: Total number of shards in the cluster
/// - `rf`: Replication factor
pub fn with_mode_a_scaling(
mut self,
replica_group_id: u32,
num_pods: u32,
total_shards: u32,
rf: usize,
) -> Self {
self.replica_group_id = Some(replica_group_id);
self.num_pods = Some(num_pods);
/// - `coordinator`: Mode A coordinator that determines shard ownership
pub fn with_mode_a(mut self, coordinator: Arc<ModeACoordinator>) -> Self {
    // Install the coordinator; a previously configured one (if any) is dropped.
    let _ = self.mode_a_coordinator.replace(coordinator);
    self
}
@ -567,19 +556,28 @@ impl<C: NodeClient> AntiEntropyReconciler<C> {
// Determine which shards to scan
let all_shards: Vec<u32> = (0..shard_count).collect();
let shards_to_scan = if let (Some(replica_group_id), Some(num_pods)) =
(self.replica_group_id, self.num_pods)
{
// Mode A scaling: filter to rendezvous-owned shards
all_shards
.into_iter()
.filter(|shard_id| {
// Shard belongs to this pod if shard_id % num_pods == replica_group_id
(*shard_id % num_pods) == replica_group_id
})
.collect()
let shards_to_scan = if let Some(ref coordinator) = self.mode_a_coordinator {
// Mode A scaling: filter to rendezvous-owned shards (plan §14.5)
// Uses rendezvous hashing: owns(s, p) = p == top1_by_score(hash(s || pid) for pid in peers)
let mut owned = Vec::new();
for shard_id in all_shards {
let shard_str = shard_id.to_string();
match coordinator.owns_shard(&shard_str).await {
Ok(true) => owned.push(shard_id),
Ok(false) => continue, // Not owned by this pod
Err(e) => {
warn!(
shard_id,
error = %e,
"Failed to check shard ownership, skipping"
);
continue;
}
}
}
owned
} else if self.config.shards_per_pass == 0 {
// Scan all shards
// Scan all shards (single-pod deployment or Mode A disabled)
all_shards
} else {
// Scan a subset (for throttling)
@ -1504,3 +1502,265 @@ mod tests {
assert_eq!(hash1, hash2, "hash should be independent of key order");
}
}
// ---------------------------------------------------------------------------
// Mode A acceptance tests (plan §14.5, P6.3)
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests_mode_a_acceptance {
    use super::*;
    use crate::mode_a_coordinator::ModeACoordinator;
    use crate::peer_discovery::{PeerDiscovery, PeerSet};
    use std::sync::Arc;

    /// Number of shard IDs (`0..SHARD_COUNT`) exercised by the partitioning tests.
    const SHARD_COUNT: u32 = 64;

    /// Pod IDs that make up the full three-pod peer set.
    const ALL_PODS: [&str; 3] = ["pod-1", "pod-2", "pod-3"];

    /// Builds a coordinator for `pod_id` backed by a fresh `PeerDiscovery`.
    fn new_coordinator(pod_id: &str) -> Arc<ModeACoordinator> {
        let discovery = Arc::new(PeerDiscovery::new(
            pod_id.to_string(),
            "default".to_string(),
            "miroir-headless".to_string(),
        ));
        Arc::new(ModeACoordinator::new(pod_id.to_string(), discovery))
    }

    /// Overwrites the coordinator's cached peer set with `peers`.
    async fn set_peers(coordinator: &ModeACoordinator, peers: &[&str]) {
        let peer_ids: Vec<String> = peers.iter().map(|peer| (*peer).to_string()).collect();
        *coordinator.cached_peer_set.write().await = PeerSet::new(peer_ids);
    }

    /// Returns, in ascending order, every shard in `0..shard_count` that `coordinator` owns.
    async fn owned_shards(coordinator: &ModeACoordinator, shard_count: u32) -> Vec<u32> {
        let mut owned = Vec::new();
        for shard_id in 0..shard_count {
            let is_owner = coordinator
                .owns_shard(&shard_id.to_string())
                .await
                .expect("ownership check should succeed with a populated peer set");
            if is_owner {
                owned.push(shard_id);
            }
        }
        owned
    }

    /// Acceptance test (P6.3): 3 pods running anti-entropy: each shard processed exactly once
    /// per interval cluster-wide.
    #[tokio::test]
    async fn test_mode_a_three_pods_each_shard_processed_once() {
        // One coordinator per pod, all sharing the same three-pod peer set.
        let mut coordinators = Vec::with_capacity(ALL_PODS.len());
        for &pod_id in ALL_PODS.iter() {
            let coordinator = new_coordinator(pod_id);
            set_peers(&coordinator, &ALL_PODS).await;
            coordinators.push(coordinator);
        }

        // One anti-entropy reconciler per pod; each must carry its coordinator.
        let config = AntiEntropyConfig::default();
        let topology = Arc::new(RwLock::new(Topology::new(64, 2, 2))); // 64 shards, 2 groups, RF 2
        let node_client = Arc::new(MockNodeClient::default());
        for coordinator in &coordinators {
            let reconciler =
                AntiEntropyReconciler::new(config.clone(), topology.clone(), node_client.clone())
                    .with_mode_a(coordinator.clone());
            assert!(
                reconciler.mode_a_coordinator.is_some(),
                "with_mode_a should attach the coordinator to the reconciler"
            );
        }

        // Simulate each pod determining which shards to scan
        // (in a real scenario this happens during run_pass()).
        let mut ownership = Vec::with_capacity(coordinators.len());
        for coordinator in &coordinators {
            ownership.push(owned_shards(coordinator, SHARD_COUNT).await);
        }

        // Each shard should be owned by exactly one pod. The per-pod lists are
        // sorted ascending, so a binary search is sufficient.
        for shard_id in 0..SHARD_COUNT {
            let owner_count = ownership
                .iter()
                .filter(|owned| owned.binary_search(&shard_id).is_ok())
                .count();
            assert_eq!(
                owner_count, 1,
                "Shard {} should be owned by exactly one pod, but {} pods claim ownership",
                shard_id, owner_count
            );
        }

        // All shards should be accounted for.
        let total_owned: usize = ownership.iter().map(Vec::len).sum();
        assert_eq!(
            total_owned, SHARD_COUNT as usize,
            "All {} shards should be owned by exactly one pod each, but got {} total",
            SHARD_COUNT, total_owned
        );

        // With 3 pods and 64 shards the ideal distribution is ~21 shards per pod;
        // the bounds below allow for hashing variance around that.
        let min_owned = ownership.iter().map(Vec::len).min().unwrap_or(0);
        let max_owned = ownership.iter().map(Vec::len).max().unwrap_or(0);
        assert!(
            min_owned >= 15,
            "Distribution too unbalanced: min owned is {}",
            min_owned
        );
        assert!(
            max_owned <= 25,
            "Distribution too unbalanced: max owned is {}",
            max_owned
        );
    }

    /// Acceptance test (P6.3): Kill one pod mid-pass; shards reassigned within
    /// refresh_interval_s × 2.
    ///
    /// Verifies that when a pod is removed from the peer set, the shards it owned are
    /// picked up by the remaining pods.
    #[tokio::test]
    async fn test_mode_a_pod_reassignment() {
        let coordinator_1 = new_coordinator("pod-1");
        let coordinator_2 = new_coordinator("pod-2");
        let coordinator_3 = new_coordinator("pod-3");

        // Initial peer set: 3 pods.
        for coordinator in [&coordinator_1, &coordinator_2, &coordinator_3] {
            set_peers(coordinator, &ALL_PODS).await;
        }

        // Track which shards pod-3 owns initially.
        let pod3_owned_initial = owned_shards(&coordinator_3, SHARD_COUNT).await;
        assert!(
            !pod3_owned_initial.is_empty(),
            "pod-3 should own at least one shard initially, otherwise the reassignment check is vacuous"
        );

        // Pod-3 dies: the survivors' peer sets no longer contain it.
        let survivors = ["pod-1", "pod-2"];
        for coordinator in [&coordinator_1, &coordinator_2] {
            set_peers(coordinator, &survivors).await;
        }

        let pod1_owned = owned_shards(&coordinator_1, SHARD_COUNT).await;
        let pod2_owned = owned_shards(&coordinator_2, SHARD_COUNT).await;

        // Every shard previously owned by pod-3 must now have exactly one owner
        // among pod-1 and pod-2.
        for shard_id in &pod3_owned_initial {
            let owner_count = [&pod1_owned, &pod2_owned]
                .iter()
                .filter(|owned| owned.binary_search(shard_id).is_ok())
                .count();
            assert_eq!(
                owner_count, 1,
                "Shard {} (previously owned by pod-3) should be owned by exactly one of pod-1 or pod-2, but {} pods claim ownership",
                shard_id, owner_count
            );
        }

        // All shards should still be owned by someone.
        let total_owned = (0..SHARD_COUNT)
            .filter(|shard_id| {
                pod1_owned.binary_search(shard_id).is_ok()
                    || pod2_owned.binary_search(shard_id).is_ok()
            })
            .count();
        assert_eq!(
            total_owned, SHARD_COUNT as usize,
            "All {} shards should still be owned after pod-3 removal, but got {}",
            SHARD_COUNT, total_owned
        );
    }

    /// Acceptance test (P6.3): Integration test with Mode A anti-entropy.
    ///
    /// Only verifies that `with_mode_a` stores the coordinator on the reconciler;
    /// exercising `run_pass()` would need considerably more setup.
    #[tokio::test]
    async fn test_mode_a_anti_entropy_partitioning() {
        let coordinator = new_coordinator("test-pod");
        set_peers(&coordinator, &["test-pod", "pod-2", "pod-3"]).await;

        let config = AntiEntropyConfig {
            index_uid: "test-index".to_string(),
            ..Default::default()
        };
        let topology = Arc::new(RwLock::new(Topology::new(16, 2, 2))); // 16 shards
        let node_client = Arc::new(MockNodeClient::default());

        let reconciler =
            AntiEntropyReconciler::new(config, topology, node_client).with_mode_a(coordinator);

        assert!(reconciler.mode_a_coordinator.is_some());
    }
}

View file

@ -1,7 +1,12 @@
//! §13.18 Synthetic canary queries with golden assertions
//!
//! Uses Mode A coordination (plan §14.5) to partition canary execution across pods.
//! Each canary ID is rendezvous-owned by exactly one pod per interval, ensuring
//! no duplicate canary runs across the cluster.
use crate::{
error::{MiroirError, Result},
mode_a_coordinator::ModeACoordinator,
task_store::{CanaryRow, NewCanary, NewCanaryRun, TaskStore},
};
use serde::{Deserialize, Serialize};
@ -113,6 +118,8 @@ pub struct CanaryRunner {
search_executor: SearchExecutor,
metrics_emitter: MetricsEmitter,
settings_version_checker: SettingsVersionChecker,
/// Mode A coordinator for partitioning canary execution (plan §14.5).
mode_a_coordinator: Option<Arc<ModeACoordinator>>,
}
impl CanaryRunner {
@ -132,9 +139,19 @@ impl CanaryRunner {
search_executor,
metrics_emitter,
settings_version_checker,
mode_a_coordinator: None,
}
}
/// Attach a Mode A coordinator used to partition canary execution (plan §14.5).
///
/// Once set, the runner consults the coordinator per canary ID, following the
/// rendezvous rule `top1_by_score(hash(canary_id || pid) for pid in peers)`,
/// and skips canaries this pod does not own.
pub fn with_mode_a(mut self, coordinator: Arc<ModeACoordinator>) -> Self {
    // Install the coordinator; a previously configured one (if any) is dropped.
    let _ = self.mode_a_coordinator.replace(coordinator);
    self
}
/// Start the background canary runner
pub async fn start(&self) -> Result<()> {
let mut interval = tokio::time::interval(Duration::from_secs(5));
@ -159,6 +176,18 @@ impl CanaryRunner {
continue;
}
// Mode A coordination: only run canaries owned by this pod.
// When no coordinator is configured this whole check is bypassed.
if let Some(ref coordinator) = self.mode_a_coordinator {
// Fail open: if the ownership check returns an error, treat the canary as
// owned so it still runs on this pod instead of being skipped.
let owns_canary = coordinator.owns_task(&canary.id).await.unwrap_or(true);
if !owns_canary {
tracing::debug!(
canary_id = %canary.id,
"skipping canary not owned by this pod"
);
continue;
}
}
// Check if already running
let running = self.running.read().await;
if running.get(&canary.id).copied().unwrap_or(false) {
@ -442,6 +471,7 @@ impl CanaryRunner {
search_executor: self.search_executor.clone(),
metrics_emitter: self.metrics_emitter.clone(),
settings_version_checker: self.settings_version_checker.clone(),
mode_a_coordinator: self.mode_a_coordinator.clone(),
}
}
}

View file

@ -186,6 +186,47 @@ impl ModeACoordinator {
Ok(is_owner)
}
/// Synchronous version of `owns_task` for use with sync callbacks.
///
/// Uses `try_read` on the cached peer set to avoid blocking.
/// Returns `true` if ownership can be determined and this pod owns the task,
/// `false` if ownership can be determined and this pod doesn't own it,
/// or `false` if the peer set lock is contended (safe default: skip).
///
/// This is designed for use with the task pruner's `mode_a_owner_fn` callback.
pub fn owns_task_sync(&self, miroir_id: &str) -> bool {
if miroir_id.is_empty() {
return false;
}
// Try to read the cached peer set without blocking
let peer_set = match self.cached_peer_set.try_read() {
Ok(guard) => guard.clone(),
Err(_) => {
// Lock is contended, return false (safe default: skip this task)
// Another pod will handle it
return false;
}
};
if peer_set.peers.is_empty() {
return false;
}
let mut best_score = 0u64;
let mut is_owner = false;
for peer in &peer_set.peers {
let score = Self::rendezvous_score(miroir_id, peer);
if score > best_score {
best_score = score;
is_owner = (peer == &self.pod_id);
}
}
is_owner
}
/// Check if this pod owns a canary (by canary ID).
///
/// Canary runner uses this to partition canary execution.
@ -364,6 +405,104 @@ mod tests {
assert!(matches!(result, Err(ModeAError::NoPeers)));
}
/// Acceptance test (P6.3): owns() returns true for exactly one peer per item across the peer set.
#[tokio::test]
async fn test_owns_exactly_one_peer_per_item() {
    let pod_ids = ["pod-1", "pod-2", "pod-3"];

    // Every coordinator sees the same three-pod peer set.
    let peer_set = PeerSet::new(
        pod_ids
            .iter()
            .map(|id| (*id).to_string())
            .collect::<Vec<String>>(),
    );

    // One coordinator per pod, each representing a different member of the set.
    let mut coordinators = Vec::with_capacity(pod_ids.len());
    for &pod_id in pod_ids.iter() {
        let peer_discovery = Arc::new(PeerDiscovery::new(
            pod_id.to_string(),
            "default".to_string(),
            "miroir-headless".to_string(),
        ));
        let coordinator = ModeACoordinator::new(pod_id.to_string(), peer_discovery);
        *coordinator.cached_peer_set.write().await = peer_set.clone();
        coordinators.push(coordinator);
    }

    // Test various items.
    let test_items = ["shard-0", "shard-1", "shard-42", "task-abc", "index:node-1"];
    for &item in test_items.iter() {
        // Collect the IDs of every pod that claims the item, so a failure
        // names the offenders.
        let mut owners = Vec::new();
        for (&pod_id, coordinator) in pod_ids.iter().zip(&coordinators) {
            if coordinator.owns_shard(item).await.unwrap() {
                owners.push(pod_id);
            }
        }

        // Exactly one pod should own each item.
        assert_eq!(
            owners.len(),
            1,
            "Item '{}' should be owned by exactly one pod, but these pods claim ownership: {:?}",
            item,
            owners
        );
    }
}
/// Sync ownership check on a freshly constructed coordinator claims every task.
#[test]
fn test_owns_task_sync() {
    let discovery = PeerDiscovery::new(
        String::from("test-pod"),
        String::from("default"),
        String::from("miroir-headless"),
    );
    let coordinator = ModeACoordinator::new(String::from("test-pod"), Arc::new(discovery));

    // NOTE(review): presumably a new coordinator's cached peer set holds only
    // this pod, which is why it is expected to own all tasks — confirm in `new`.
    for task_id in ["miroir-task-123", "some-other-task"] {
        assert!(coordinator.owns_task_sync(task_id));
    }
}
/// Sync ownership check never claims a task whose miroir_id is empty.
#[test]
fn test_owns_task_sync_empty_id() {
    let discovery = PeerDiscovery::new(
        String::from("test-pod"),
        String::from("default"),
        String::from("miroir-headless"),
    );
    let coordinator = ModeACoordinator::new(String::from("test-pod"), Arc::new(discovery));

    let owns_empty = coordinator.owns_task_sync("");
    assert!(!owns_empty);
}
fn test_coordinator() -> ModeACoordinator {
use std::net::{Ipv4Addr, SocketAddr};

View file

@ -525,10 +525,21 @@ async fn main() -> anyhow::Result<()> {
if let Some(ref store) = state.admin.task_store {
let store = store.clone();
let pruner_config = config.task_registry.clone();
let mode_a_coordinator = state.admin.mode_a_coordinator.clone();
tokio::spawn(async move {
// Mode A ownership function: uses rendezvous hashing to determine task ownership
let mode_a_owner_fn = mode_a_coordinator.as_ref().map(|coordinator| {
let coordinator = coordinator.clone();
move |miroir_id: &str| coordinator.owns_task_sync(miroir_id)
});
// The pruner runs in its own thread via spawn_pruner
let _pruner_handle =
task_pruner::spawn_pruner::<fn(&str) -> bool>(store, pruner_config, None);
let _pruner_handle = match mode_a_owner_fn {
Some(fn_owner) => task_pruner::spawn_pruner(store, pruner_config, Some(fn_owner)),
None => task_pruner::spawn_pruner::<fn(&str) -> bool>(store, pruner_config, None),
};
// The handle is dropped here only on process exit
info!("task registry TTL pruner started");
// Keep this task alive forever