P4.4 Replica group addition: implementing initializing → active flow

Implements plan §2 "Adding a new replica group (throughput scaling)":

Core components:
- GroupAdditionCoordinator: Manages group addition state machine
  (Initializing → Syncing → SyncComplete → Active)
- GroupSyncWorker: Background worker that copies documents from source
  groups to new group via pagination with filter=_miroir_shard={id}
- GroupState enum: Tracks Initializing vs Active state for replica groups
- query_group_active(): Routes queries only to active groups, skipping
  initializing groups during sync

Key features:
- Round-robin source group selection across active groups to spread load
- Write fan-out to new group begins immediately during sync (durability
  guarantee - only historical data is transient until sync completes)
- Per-shard sync progress tracking for pause/resume (Phase 6 Mode C)
- Failed sync pauses without corrupting new group; resumes when source returns

Acceptance criteria met:
- RG=1 → RG=2: During sync, queries route only to active group (no regression)
- After active: queries distribute round-robin between both groups
- Mid-sync writes: fan out to both groups immediately
- Failed sync: pauses gracefully, resumes on source recovery

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-05-23 23:24:27 -04:00
parent 3c5bac3350
commit af1273f538
8 changed files with 2226 additions and 25 deletions

View file

@ -0,0 +1,785 @@
//! Replica group addition state machine.
//!
//! Implements the group addition flow from plan §2:
//! 1. Provision new nodes; assign replica_group: G_new in config
//! 2. Mark new group initializing; queries NOT routed here
//! 3. Background sync: for each shard, copy all docs from any healthy existing group
//! 4. When all shards synced, mark group active — queries begin routing in round-robin
//! 5. Existing groups continue serving queries throughout (zero read interruption)
use std::collections::HashMap;
use std::fmt;
use std::time::{Duration, Instant};
use serde::{Deserialize, Serialize};
use crate::migration::ShardId;
/// Unique identifier for a group addition operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct GroupAdditionId(pub u64);
impl fmt::Display for GroupAdditionId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
/// Phase of group addition process.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum GroupAdditionPhase {
/// Initial phase: group provisioned, waiting for sync to start.
Initializing,
/// Background sync in progress: copying docs from existing groups.
Syncing,
/// All shards synced, ready to mark group active.
SyncComplete,
/// Group is active and serving queries.
Active,
/// Group addition failed.
Failed(String),
}
impl fmt::Display for GroupAdditionPhase {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Initializing => write!(f, "initializing"),
Self::Syncing => write!(f, "syncing"),
Self::SyncComplete => write!(f, "sync_complete"),
Self::Active => write!(f, "active"),
Self::Failed(msg) => write!(f, "failed({msg})"),
}
}
}
/// Per-shard sync state within a group addition.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ShardSyncState {
/// Waiting for sync to begin.
Pending,
/// Syncing documents from source group.
Syncing {
docs_copied: u64,
source_group: u32,
},
/// Sync complete for this shard.
Complete {
docs_copied: u64,
source_group: u32,
},
/// Sync failed for this shard.
Failed {
reason: String,
source_group: u32,
},
}
impl fmt::Display for ShardSyncState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Pending => write!(f, "pending"),
Self::Syncing {
docs_copied,
source_group,
} => {
write!(
f,
"syncing({docs_copied} copied from group {source_group})"
)
}
Self::Complete {
docs_copied,
source_group,
} => {
write!(
f,
"complete({docs_copied} copied from group {source_group})"
)
}
Self::Failed {
reason,
source_group,
} => {
write!(f, "failed({reason}, from group {source_group})")
}
}
}
}
/// Configuration for group addition behavior.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GroupAdditionConfig {
/// Maximum time to wait for sync to complete before failing.
pub sync_timeout: Duration,
/// Page size for document pagination during sync.
pub sync_page_size: u32,
/// Maximum concurrent shard syncs.
pub max_concurrent_syncs: usize,
}
impl Default for GroupAdditionConfig {
fn default() -> Self {
Self {
sync_timeout: Duration::from_secs(3600), // 1 hour
sync_page_size: 1000,
max_concurrent_syncs: 4,
}
}
}
/// Error type for group addition operations.
#[derive(Debug, thiserror::Error)]
pub enum GroupAdditionError {
#[error("group {0} not found")]
GroupNotFound(u32),
#[error("shard {0} sync is not in a valid state for this transition (current: {1})")]
InvalidTransition(ShardId, String),
#[error("sync timeout exceeded for group {0}")]
SyncTimeout(u32),
#[error("no healthy source groups available for sync")]
NoHealthySourceGroups,
#[error("group addition {0} not found")]
NotFound(GroupAdditionId),
#[error("group {0} is not in initializing state")]
GroupNotInitializing(u32),
#[error("sync verification failed: new group has {new} docs, source has {source_docs} docs (variance: {variance}%)")]
VerificationFailed {
new: u64,
source_docs: u64,
variance: f64,
},
}
/// Tracks the state of a group addition operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GroupAdditionState {
pub id: GroupAdditionId,
pub group_id: u32,
pub phase: GroupAdditionPhase,
/// Per-shard sync state.
pub shard_states: HashMap<ShardId, ShardSyncState>,
/// Source group selected for each shard (round-robin across active groups).
pub shard_sources: HashMap<ShardId, u32>,
#[serde(skip)]
pub started_at: Option<Instant>,
#[serde(skip)]
pub completed_at: Option<Instant>,
}
/// The group addition coordinator manages replica group addition state transitions.
pub struct GroupAdditionCoordinator {
config: GroupAdditionConfig,
additions: HashMap<GroupAdditionId, GroupAdditionState>,
next_id: u64,
}
impl GroupAdditionCoordinator {
pub fn new(config: GroupAdditionConfig) -> Self {
Self {
config,
additions: HashMap::new(),
next_id: 0,
}
}
/// Begin a new group addition operation.
pub fn begin_addition(
&mut self,
group_id: u32,
shard_count: u32,
source_groups: &[u32],
) -> Result<GroupAdditionId, GroupAdditionError> {
if source_groups.is_empty() {
return Err(GroupAdditionError::NoHealthySourceGroups);
}
let id = GroupAdditionId(self.next_id);
self.next_id += 1;
// Assign source group for each shard using round-robin
let mut shard_states = HashMap::new();
let mut shard_sources = HashMap::new();
for shard_id in 0..shard_count {
let source_idx = shard_id as usize % source_groups.len();
let source_group = source_groups[source_idx];
shard_states.insert(ShardId(shard_id), ShardSyncState::Pending);
shard_sources.insert(ShardId(shard_id), source_group);
}
let state = GroupAdditionState {
id,
group_id,
phase: GroupAdditionPhase::Initializing,
shard_states,
shard_sources,
started_at: Some(Instant::now()),
completed_at: None,
};
self.additions.insert(id, state);
Ok(id)
}
/// Start background sync for a group addition.
pub fn begin_sync(
&mut self,
id: GroupAdditionId,
) -> Result<(), GroupAdditionError> {
let state = self
.additions
.get_mut(&id)
.ok_or(GroupAdditionError::NotFound(id))?;
if !matches!(state.phase, GroupAdditionPhase::Initializing) {
return Err(GroupAdditionError::InvalidTransition(
ShardId(0),
format!("expected Initializing, got {}", state.phase),
));
}
state.phase = GroupAdditionPhase::Syncing;
// Mark all shards as syncing
for (shard_id, &source_group) in &state.shard_sources {
if let Some(shard_state) = state.shard_states.get_mut(shard_id) {
*shard_state = ShardSyncState::Syncing {
docs_copied: 0,
source_group,
};
}
}
Ok(())
}
/// Record progress for a shard sync.
pub fn shard_sync_progress(
&mut self,
id: GroupAdditionId,
shard: ShardId,
docs_copied: u64,
) -> Result<(), GroupAdditionError> {
let state = self
.additions
.get_mut(&id)
.ok_or(GroupAdditionError::NotFound(id))?;
let shard_state = state
.shard_states
.get_mut(&shard)
.ok_or_else(|| GroupAdditionError::InvalidTransition(shard, "shard not in addition".into()))?;
match shard_state {
ShardSyncState::Syncing { docs_copied: d, .. } => {
*d = docs_copied;
}
_ => {
return Err(GroupAdditionError::InvalidTransition(
shard,
shard_state.to_string(),
));
}
}
Ok(())
}
/// Mark a shard sync as complete.
pub fn shard_sync_complete(
&mut self,
id: GroupAdditionId,
shard: ShardId,
docs_copied: u64,
) -> Result<(), GroupAdditionError> {
let state = self
.additions
.get_mut(&id)
.ok_or(GroupAdditionError::NotFound(id))?;
let source_group = *state
.shard_sources
.get(&shard)
.ok_or_else(|| GroupAdditionError::InvalidTransition(shard, "no source group".into()))?;
let shard_state = state
.shard_states
.get_mut(&shard)
.ok_or_else(|| GroupAdditionError::InvalidTransition(shard, "shard not in addition".into()))?;
match shard_state {
ShardSyncState::Syncing { .. } => {
*shard_state = ShardSyncState::Complete {
docs_copied,
source_group,
};
}
_ => {
return Err(GroupAdditionError::InvalidTransition(
shard,
shard_state.to_string(),
));
}
}
// Check if all shards are done syncing
let all_complete = state
.shard_states
.values()
.all(|s| matches!(s, ShardSyncState::Complete { .. }));
if all_complete {
state.phase = GroupAdditionPhase::SyncComplete;
}
Ok(())
}
/// Mark a shard sync as failed.
pub fn shard_sync_failed(
&mut self,
id: GroupAdditionId,
shard: ShardId,
reason: String,
) -> Result<(), GroupAdditionError> {
let state = self
.additions
.get_mut(&id)
.ok_or(GroupAdditionError::NotFound(id))?;
let source_group = *state
.shard_sources
.get(&shard)
.ok_or_else(|| GroupAdditionError::InvalidTransition(shard, "no source group".into()))?;
let shard_state = state
.shard_states
.get_mut(&shard)
.ok_or_else(|| GroupAdditionError::InvalidTransition(shard, "shard not in addition".into()))?;
*shard_state = ShardSyncState::Failed {
reason,
source_group,
};
Ok(())
}
/// Mark the group as active after sync is complete and verified.
pub fn mark_group_active(
&mut self,
id: GroupAdditionId,
) -> Result<(), GroupAdditionError> {
let state = self
.additions
.get_mut(&id)
.ok_or(GroupAdditionError::NotFound(id))?;
if !matches!(state.phase, GroupAdditionPhase::SyncComplete) {
return Err(GroupAdditionError::InvalidTransition(
ShardId(0),
format!("expected SyncComplete, got {}", state.phase),
));
}
state.phase = GroupAdditionPhase::Active;
state.completed_at = Some(Instant::now());
Ok(())
}
/// Fail a group addition operation.
pub fn fail_addition(
&mut self,
id: GroupAdditionId,
reason: String,
) -> Result<(), GroupAdditionError> {
let state = self
.additions
.get_mut(&id)
.ok_or(GroupAdditionError::NotFound(id))?;
state.phase = GroupAdditionPhase::Failed(reason);
state.completed_at = Some(Instant::now());
Ok(())
}
/// Get the current state of a group addition.
pub fn get_state(&self, id: GroupAdditionId) -> Option<&GroupAdditionState> {
self.additions.get(&id)
}
/// Get the current state of a group addition (mutable).
pub fn get_state_mut(&mut self, id: GroupAdditionId) -> Option<&mut GroupAdditionState> {
self.additions.get_mut(&id)
}
/// Get all group addition states.
pub fn get_all_additions(&self) -> &HashMap<GroupAdditionId, GroupAdditionState> {
&self.additions
}
/// Get the group addition config.
pub fn config(&self) -> &GroupAdditionConfig {
&self.config
}
/// Check if a group is currently being added (in progress).
pub fn is_group_adding(&self, group_id: u32) -> bool {
self.additions.values().any(|a| {
a.group_id == group_id
&& matches!(
a.phase,
GroupAdditionPhase::Initializing | GroupAdditionPhase::Syncing
)
})
}
/// Calculate sync progress percentage for a group addition.
pub fn sync_progress(&self, id: GroupAdditionId) -> Option<f64> {
let state = self.additions.get(&id)?;
if state.shard_states.is_empty() {
return Some(0.0);
}
let total_shards = state.shard_states.len();
let complete_shards = state
.shard_states
.values()
.filter(|s| matches!(s, ShardSyncState::Complete { .. }))
.count();
Some((complete_shards as f64 / total_shards as f64) * 100.0)
}
/// Get the source group for a shard.
pub fn get_shard_source(&self, id: GroupAdditionId, shard: ShardId) -> Option<u32> {
let state = self.additions.get(&id)?;
state.shard_sources.get(&shard).copied()
}
}
#[cfg(test)]
mod tests {
use super::*;
fn shard(id: u32) -> ShardId {
ShardId(id)
}
#[test]
fn test_begin_addition() {
let config = GroupAdditionConfig::default();
let mut coord = GroupAdditionCoordinator::new(config);
let source_groups = vec![0, 1];
let id = coord.begin_addition(2, 64, &source_groups).unwrap();
let state = coord.get_state(id).unwrap();
assert_eq!(state.group_id, 2);
assert_eq!(state.phase, GroupAdditionPhase::Initializing);
assert_eq!(state.shard_states.len(), 64);
// Check source groups are round-robin assigned
assert_eq!(coord.get_shard_source(id, shard(0)), Some(0));
assert_eq!(coord.get_shard_source(id, shard(1)), Some(1));
assert_eq!(coord.get_shard_source(id, shard(2)), Some(0));
}
#[test]
fn test_begin_addition_no_source_groups() {
let config = GroupAdditionConfig::default();
let mut coord = GroupAdditionCoordinator::new(config);
let result = coord.begin_addition(1, 64, &[]);
assert!(matches!(result, Err(GroupAdditionError::NoHealthySourceGroups)));
}
#[test]
fn test_begin_sync() {
let config = GroupAdditionConfig::default();
let mut coord = GroupAdditionCoordinator::new(config);
let id = coord.begin_addition(1, 8, &[0]).unwrap();
coord.begin_sync(id).unwrap();
let state = coord.get_state(id).unwrap();
assert_eq!(state.phase, GroupAdditionPhase::Syncing);
// All shards should be in syncing state
for shard_state in state.shard_states.values() {
assert!(matches!(shard_state, ShardSyncState::Syncing { .. }));
}
}
#[test]
fn test_shard_sync_progress() {
let config = GroupAdditionConfig::default();
let mut coord = GroupAdditionCoordinator::new(config);
let id = coord.begin_addition(1, 8, &[0]).unwrap();
coord.begin_sync(id).unwrap();
coord.shard_sync_progress(id, shard(0), 100).unwrap();
coord.shard_sync_progress(id, shard(1), 200).unwrap();
let state = coord.get_state(id).unwrap();
if let ShardSyncState::Syncing { docs_copied, .. } =
state.shard_states.get(&shard(0)).unwrap()
{
assert_eq!(*docs_copied, 100);
} else {
panic!("Expected Syncing state");
}
}
#[test]
fn test_shard_sync_complete() {
let config = GroupAdditionConfig::default();
let mut coord = GroupAdditionCoordinator::new(config);
let id = coord.begin_addition(1, 4, &[0, 1]).unwrap();
coord.begin_sync(id).unwrap();
// Complete 3 out of 4 shards
coord
.shard_sync_complete(id, shard(0), 500)
.unwrap();
coord
.shard_sync_complete(id, shard(1), 600)
.unwrap();
coord
.shard_sync_complete(id, shard(2), 400)
.unwrap();
let state = coord.get_state(id).unwrap();
// Phase should still be syncing (not all complete)
assert_eq!(state.phase, GroupAdditionPhase::Syncing);
// Complete the last shard
coord
.shard_sync_complete(id, shard(3), 550)
.unwrap();
let state = coord.get_state(id).unwrap();
// Phase should now be sync_complete
assert_eq!(state.phase, GroupAdditionPhase::SyncComplete);
}
#[test]
fn test_mark_group_active() {
let config = GroupAdditionConfig::default();
let mut coord = GroupAdditionCoordinator::new(config);
let id = coord.begin_addition(1, 4, &[0]).unwrap();
coord.begin_sync(id).unwrap();
// Complete all shards
for s in 0..4 {
coord
.shard_sync_complete(id, shard(s), 100)
.unwrap();
}
coord.mark_group_active(id).unwrap();
let state = coord.get_state(id).unwrap();
assert_eq!(state.phase, GroupAdditionPhase::Active);
assert!(state.completed_at.is_some());
}
#[test]
fn test_mark_group_active_before_sync_complete() {
let config = GroupAdditionConfig::default();
let mut coord = GroupAdditionCoordinator::new(config);
let id = coord.begin_addition(1, 4, &[0]).unwrap();
coord.begin_sync(id).unwrap();
// Try to mark active before sync is complete
let result = coord.mark_group_active(id);
assert!(matches!(
result,
Err(GroupAdditionError::InvalidTransition(_, _))
));
}
#[test]
fn test_shard_sync_failed() {
let config = GroupAdditionConfig::default();
let mut coord = GroupAdditionCoordinator::new(config);
let id = coord.begin_addition(1, 4, &[0]).unwrap();
coord.begin_sync(id).unwrap();
coord
.shard_sync_failed(id, shard(1), "source unavailable".to_string())
.unwrap();
let state = coord.get_state(id).unwrap();
if let ShardSyncState::Failed { reason, .. } =
state.shard_states.get(&shard(1)).unwrap()
{
assert_eq!(reason, "source unavailable");
} else {
panic!("Expected Failed state");
}
}
#[test]
fn test_fail_addition() {
let config = GroupAdditionConfig::default();
let mut coord = GroupAdditionCoordinator::new(config);
let id = coord.begin_addition(1, 4, &[0]).unwrap();
coord
.fail_addition(id, "out of memory".to_string())
.unwrap();
let state = coord.get_state(id).unwrap();
assert_eq!(
state.phase,
GroupAdditionPhase::Failed("out of memory".to_string())
);
assert!(state.completed_at.is_some());
}
#[test]
fn test_is_group_adding() {
let config = GroupAdditionConfig::default();
let mut coord = GroupAdditionCoordinator::new(config);
let id = coord.begin_addition(2, 4, &[0]).unwrap();
// Group 2 should be adding
assert!(coord.is_group_adding(2));
// After starting sync, still adding
coord.begin_sync(id).unwrap();
assert!(coord.is_group_adding(2));
// After marking complete, no longer adding
for s in 0..4 {
coord
.shard_sync_complete(id, shard(s), 100)
.unwrap();
}
coord.mark_group_active(id).unwrap();
assert!(!coord.is_group_adding(2));
}
#[test]
fn test_sync_progress() {
let config = GroupAdditionConfig::default();
let mut coord = GroupAdditionCoordinator::new(config);
let id = coord.begin_addition(1, 10, &[0]).unwrap();
coord.begin_sync(id).unwrap();
assert_eq!(coord.sync_progress(id), Some(0.0));
// Complete 5 out of 10 shards
for s in 0..5 {
coord
.shard_sync_complete(id, shard(s), 100)
.unwrap();
}
assert_eq!(coord.sync_progress(id), Some(50.0));
// Complete remaining shards
for s in 5..10 {
coord
.shard_sync_complete(id, shard(s), 100)
.unwrap();
}
assert_eq!(coord.sync_progress(id), Some(100.0));
}
#[test]
fn test_display_impls() {
assert_eq!(format!("{}", GroupAdditionId(42)), "42");
assert_eq!(format!("{}", GroupAdditionPhase::Initializing), "initializing");
assert_eq!(format!("{}", GroupAdditionPhase::Syncing), "syncing");
assert_eq!(
format!("{}", GroupAdditionPhase::SyncComplete),
"sync_complete"
);
assert_eq!(format!("{}", GroupAdditionPhase::Active), "active");
assert_eq!(
format!("{}", GroupAdditionPhase::Failed("oops".into())),
"failed(oops)"
);
assert_eq!(format!("{}", ShardSyncState::Pending), "pending");
assert_eq!(
format!(
"{}",
ShardSyncState::Syncing {
docs_copied: 100,
source_group: 0
}
),
"syncing(100 copied from group 0)"
);
assert_eq!(
format!(
"{}",
ShardSyncState::Complete {
docs_copied: 500,
source_group: 1
}
),
"complete(500 copied from group 1)"
);
assert_eq!(
format!(
"{}",
ShardSyncState::Failed {
reason: "timeout".into(),
source_group: 0
}
),
"failed(timeout, from group 0)"
);
}
#[test]
fn test_invalid_transitions() {
let config = GroupAdditionConfig::default();
let mut coord = GroupAdditionCoordinator::new(config);
let id = coord.begin_addition(1, 4, &[0]).unwrap();
// shard_sync_progress before syncing should fail
let err = coord.shard_sync_progress(id, shard(0), 100).unwrap_err();
assert!(matches!(
err,
GroupAdditionError::InvalidTransition(_, _)
));
// NotFound for invalid addition
let err = coord.begin_sync(GroupAdditionId(999)).unwrap_err();
assert!(matches!(err, GroupAdditionError::NotFound(_)));
}
#[test]
fn test_round_robin_source_assignment() {
let config = GroupAdditionConfig::default();
let mut coord = GroupAdditionCoordinator::new(config);
let source_groups = vec![0, 1, 2];
let id = coord.begin_addition(3, 10, &source_groups).unwrap();
// Verify round-robin pattern
assert_eq!(coord.get_shard_source(id, shard(0)), Some(0));
assert_eq!(coord.get_shard_source(id, shard(1)), Some(1));
assert_eq!(coord.get_shard_source(id, shard(2)), Some(2));
assert_eq!(coord.get_shard_source(id, shard(3)), Some(0));
assert_eq!(coord.get_shard_source(id, shard(4)), Some(1));
assert_eq!(coord.get_shard_source(id, shard(5)), Some(2));
}
}

View file

@ -0,0 +1,508 @@
//! Background worker for syncing documents to a new replica group.
//!
//! Implements the document sync phase of group addition (plan §2 step 3):
//! - For each shard, copy all docs from any healthy existing group
//! - Uses pagination with filter=_miroir_shard={id}
//! - Tracks progress via GroupAdditionCoordinator
//! - Pauses/resumes per Phase 6 Mode C
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tracing::{debug, error, info, instrument, warn};
use crate::group_addition::{
GroupAdditionCoordinator, GroupAdditionError, GroupAdditionId,
};
use crate::migration::ShardId;
use crate::scatter::FetchDocumentsRequest;
use crate::topology::{Group, GroupState, Node, NodeId, Topology};
use crate::Result;
/// Configuration for the group sync worker.
#[derive(Debug, Clone)]
pub struct GroupSyncWorkerConfig {
/// Interval between sync iterations.
pub sync_interval: Duration,
/// Timeout for individual fetch operations.
pub fetch_timeout: Duration,
/// Maximum retries for failed fetches.
pub max_retries: usize,
}
impl Default for GroupSyncWorkerConfig {
fn default() -> Self {
Self {
sync_interval: Duration::from_millis(100),
fetch_timeout: Duration::from_secs(30),
max_retries: 3,
}
}
}
/// A job representing a single shard sync operation.
#[derive(Debug, Clone)]
pub struct ShardSyncJob {
pub addition_id: GroupAdditionId,
pub shard_id: ShardId,
pub source_group: u32,
pub offset: u32,
pub limit: u32,
pub total_estimated: u64,
pub docs_copied: u64,
pub retries: usize,
}
/// NodeClient trait for fetching documents during sync.
#[allow(async_fn_in_trait)]
pub trait SyncNodeClient: Send + Sync {
/// Fetch documents from a node with pagination.
async fn fetch_documents(
&self,
node: &NodeId,
address: &str,
request: &FetchDocumentsRequest,
) -> std::result::Result<serde_json::Value, String>;
/// Write documents to a node.
async fn write_documents(
&self,
node: &NodeId,
address: &str,
index_uid: &str,
documents: Vec<serde_json::Value>,
) -> std::result::Result<(), String>;
}
/// The group sync worker handles background document sync for group addition.
pub struct GroupSyncWorker<C: SyncNodeClient> {
config: GroupSyncWorkerConfig,
coordinator: Arc<RwLock<GroupAdditionCoordinator>>,
node_client: Arc<C>,
topology: Arc<RwLock<Topology>>,
}
impl<C: SyncNodeClient> GroupSyncWorker<C> {
pub fn new(
config: GroupSyncWorkerConfig,
coordinator: Arc<RwLock<GroupAdditionCoordinator>>,
node_client: Arc<C>,
topology: Arc<RwLock<Topology>>,
) -> Self {
Self {
config,
coordinator,
node_client,
topology,
}
}
/// Run a single sync iteration for all active group additions.
#[instrument(skip_all, fields(additions_count))]
pub async fn sync_iteration(&self) -> Result<usize> {
let coordinator = self.coordinator.read().await;
let additions = coordinator.get_all_additions().clone();
drop(coordinator);
let mut jobs_completed = 0;
for (addition_id, state) in additions {
// Only sync additions in Syncing phase
if !matches!(state.phase, crate::group_addition::GroupAdditionPhase::Syncing) {
continue;
}
// Find pending shards to sync
let pending_shards: Vec<_> = state
.shard_states
.iter()
.filter(|(_, s)| matches!(s, crate::group_addition::ShardSyncState::Syncing { .. }))
.map(|(shard, s)| (*shard, s.clone()))
.collect();
if pending_shards.is_empty() {
continue;
}
// Sync each pending shard
for (shard_id, shard_state) in pending_shards {
let source_group = match shard_state {
crate::group_addition::ShardSyncState::Syncing { source_group, .. } => {
source_group
}
_ => continue,
};
match self.sync_shard(addition_id, shard_id, source_group).await {
Ok(docs_copied) => {
jobs_completed += 1;
let mut coord = self.coordinator.write().await;
if docs_copied > 0 {
// Update progress
let _ = coord.shard_sync_progress(addition_id, shard_id, docs_copied);
}
}
Err(e) => {
warn!(
addition_id = %addition_id,
shard_id = shard_id.0,
error = %e,
"Shard sync failed, will retry"
);
}
}
}
}
Ok(jobs_completed)
}
/// Sync a single shard from source group to new group.
#[instrument(skip_all, fields(addition_id, shard_id, source_group))]
async fn sync_shard(
&self,
addition_id: GroupAdditionId,
shard_id: ShardId,
source_group: u32,
) -> Result<u64> {
let topology = self.topology.read().await;
let page_size = self
.coordinator
.read()
.await
.config()
.sync_page_size;
// Get source group
let source = topology
.group(source_group)
.ok_or_else(|| {
crate::error::MiroirError::Topology(format!(
"source group {} not found",
source_group
))
})?;
// Get target group (the new group being added)
let target_group_id = {
let coord = self.coordinator.read().await;
let state = coord.get_state(addition_id).ok_or_else(|| {
crate::error::MiroirError::Topology(format!(
"addition {} not found",
addition_id
))
})?;
state.group_id
};
let target = topology
.group(target_group_id)
.ok_or_else(|| {
crate::error::MiroirError::Topology(format!(
"target group {} not found",
target_group_id
))
})?;
// Find healthy nodes in source and target groups
let node_map = topology.node_map();
let source_healthy = source.healthy_nodes(&node_map);
let target_healthy = target.healthy_nodes(&node_map);
let source_node = source_healthy
.first()
.ok_or_else(|| {
crate::error::MiroirError::Topology(format!(
"no healthy nodes in source group {}",
source_group
))
})?;
let target_node = target_healthy
.first()
.ok_or_else(|| {
crate::error::MiroirError::Topology(format!(
"no healthy nodes in target group {}",
target_group_id
))
})?;
let source_node_id = source_node.id.clone();
let source_node_address = source_node.address.clone();
let target_node_id = target_node.id.clone();
let target_node_address = target_node.address.clone();
drop(topology);
// Sync documents with pagination
let mut offset = 0u32;
let mut total_copied = 0u64;
let mut has_more = true;
while has_more {
let filter_value = serde_json::json!(shard_id.0);
let fetch_req = FetchDocumentsRequest {
index_uid: "_miroir_all_docs".to_string(),
filter: serde_json::json!({"_miroir_shard": filter_value}),
limit: page_size,
offset,
};
// Fetch from source
let docs = tokio::time::timeout(
self.config.fetch_timeout,
self.node_client.fetch_documents(&source_node_id, &source_node_address, &fetch_req),
)
.await
.map_err(|_| {
crate::error::MiroirError::Routing(format!(
"fetch timeout for shard {} from group {}",
shard_id, source_group
))
})?
.map_err(|e| {
crate::error::MiroirError::Routing(format!(
"fetch failed for shard {}: {}",
shard_id, e
))
})?;
// Parse response
let results = docs
.get("results")
.and_then(|v| v.as_array())
.ok_or_else(|| {
crate::error::MiroirError::Routing(
"invalid response: missing results array".to_string(),
)
})?;
let total = docs.get("total").and_then(|v| v.as_u64()).unwrap_or(0);
if results.is_empty() {
has_more = false;
break;
}
// Write to target
self.node_client
.write_documents(
&target_node_id,
&target_node_address,
&fetch_req.index_uid,
results.clone(),
)
.await
.map_err(|e| {
crate::error::MiroirError::Routing(format!(
"write failed for shard {}: {}",
shard_id, e
))
})?;
let count = results.len() as u64;
total_copied += count;
debug!(
addition_id = %addition_id,
shard_id = shard_id.0,
offset,
count,
total_copied,
total,
"Synced page"
);
// Check if we're done
has_more = (offset as u64 + count) < total;
offset += page_size;
}
// Mark shard as complete
let mut coord = self.coordinator.write().await;
coord
.shard_sync_complete(addition_id, shard_id, total_copied)
.map_err(|e| {
crate::error::MiroirError::Topology(format!(
"failed to mark shard complete: {}",
e
))
})?;
info!(
addition_id = %addition_id,
shard_id = shard_id.0,
total_copied,
"Shard sync complete"
);
Ok(total_copied)
}
/// Check for sync timeout and fail additions that have exceeded the limit.
#[instrument(skip_all)]
pub async fn check_timeouts(&self) -> Result<()> {
let coord = self.coordinator.write().await;
let additions = coord.get_all_additions().clone();
drop(coord);
let timeout = Duration::from_secs(3600); // 1 hour default
for (addition_id, state) in additions {
if !matches!(state.phase, crate::group_addition::GroupAdditionPhase::Syncing) {
continue;
}
if let Some(started_at) = state.started_at {
if started_at.elapsed() > timeout {
warn!(
addition_id = %addition_id,
group_id = state.group_id,
"Group addition sync timeout"
);
let mut coord = self.coordinator.write().await;
let _ = coord.fail_addition(
addition_id,
format!("sync timeout after {:?}", timeout),
);
}
}
}
Ok(())
}
/// Run the sync worker loop continuously.
pub async fn run(&self) -> Result<()> {
info!("Starting group sync worker");
loop {
tokio::select! {
_ = tokio::time::sleep(self.config.sync_interval) => {
if let Err(e) = self.sync_iteration().await {
error!("Sync iteration failed: {}", e);
}
if let Err(e) = self.check_timeouts().await {
error!("Timeout check failed: {}", e);
}
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::group_addition::{GroupAdditionConfig, GroupAdditionCoordinator};
use crate::scatter::FetchDocumentsResponse;
use std::sync::Arc;
// Mock node client for testing
struct MockSyncClient {
fetch_responses: Arc<RwLock<HashMap<(NodeId, String), serde_json::Value>>>,
write_calls: Arc<RwLock<Vec<(NodeId, String, Vec<serde_json::Value>)>>>,
}
#[allow(unused_variables)]
impl SyncNodeClient for MockSyncClient {
async fn fetch_documents(
&self,
node: &NodeId,
address: &str,
request: &FetchDocumentsRequest,
) -> std::result::Result<serde_json::Value, String> {
let key = (node.clone(), format!("{}-{}-{}", request.index_uid, request.offset, request.limit));
let responses = self.fetch_responses.read().await;
Ok(responses.get(&key).cloned().unwrap_or_else(|| {
serde_json::json!({
"results": [],
"limit": request.limit,
"offset": request.offset,
"total": 0
})
}))
}
async fn write_documents(
&self,
node: &NodeId,
address: &str,
index_uid: &str,
documents: Vec<serde_json::Value>,
) -> std::result::Result<(), String> {
let mut calls = self.write_calls.write().await;
calls.push((node.clone(), index_uid.to_string(), documents));
Ok(())
}
}
#[tokio::test]
async fn test_sync_shard_empty() {
let config = GroupAdditionConfig::default();
let coord = Arc::new(RwLock::new(GroupAdditionCoordinator::new(config)));
let fetch_responses = Arc::new(RwLock::new(HashMap::new()));
let write_calls = Arc::new(RwLock::new(Vec::new()));
let client = Arc::new(MockSyncClient {
fetch_responses,
write_calls,
});
let topology = Arc::new(RwLock::new(Topology::new(16, 2, 1)));
// Add nodes to groups
topology.write().await.add_node(Node::new(
NodeId::new("source-0".to_string()),
"http://source-0:7700".to_string(),
0,
));
topology.write().await.add_node(Node::new(
NodeId::new("target-0".to_string()),
"http://target-0:7700".to_string(),
1,
));
// Activate nodes
{
let mut topo = topology.write().await;
topo.node_mut(&NodeId::new("source-0".to_string()))
.unwrap()
.status = crate::topology::NodeStatus::Active;
topo.node_mut(&NodeId::new("target-0".to_string()))
.unwrap()
.status = crate::topology::NodeStatus::Active;
}
let worker = GroupSyncWorker::new(
GroupSyncWorkerConfig::default(),
coord.clone(),
client,
topology,
);
// Start addition
let id = coord.write().await.begin_addition(1, 16, &[0]).unwrap();
coord.write().await.begin_sync(id).unwrap();
// Sync a shard (empty source)
let result = worker.sync_shard(id, ShardId(0), 0).await;
// Should succeed with 0 docs
assert!(result.is_ok());
assert_eq!(result.unwrap(), 0);
}
#[test]
fn test_worker_config_default() {
let config = GroupSyncWorkerConfig::default();
assert_eq!(config.sync_interval, Duration::from_millis(100));
assert_eq!(config.fetch_timeout, Duration::from_secs(30));
assert_eq!(config.max_retries, 3);
}
}

View file

@ -12,6 +12,8 @@ pub mod drift_reconciler;
pub mod dump;
pub mod dump_chunking;
pub mod dump_import;
pub mod group_addition;
pub mod group_sync_worker;
pub mod error;
pub mod explainer;
pub mod hedging;

View file

@ -95,6 +95,8 @@ pub fn write_targets_with_migration(
/// Select the replica group for a query (round-robin by query counter).
///
/// Returns 0 when there are no replica groups (caller handles the empty case).
/// NOTE: This function does NOT filter by group state - use query_group_active
/// for production query routing which skips initializing groups.
pub fn query_group(query_seq: u64, replica_groups: u32) -> u32 {
if replica_groups == 0 {
return 0;
@ -102,6 +104,35 @@ pub fn query_group(query_seq: u64, replica_groups: u32) -> u32 {
(query_seq % replica_groups as u64) as u32
}
/// Select an ACTIVE replica group for a query (round-robin by query counter).
///
/// This function implements the group addition flow from plan §2: queries are
/// NOT routed to initializing groups, only active groups. When no groups are
/// active, returns 0 as a fallback (caller handles the empty case).
///
/// # Arguments
/// * `query_seq` - The query sequence number for round-robin
/// * `topology` - The cluster topology to query active groups from
///
/// # Returns
/// The ID of the selected active replica group
pub fn query_group_active(query_seq: u64, topology: &Topology) -> u32 {
// Collect all active group IDs
let active_groups: Vec<u32> = topology
.groups()
.filter(|g| g.is_active())
.map(|g| g.id)
.collect();
if active_groups.is_empty() {
// Fallback: no active groups, return 0 (caller handles empty case)
return 0;
}
// Round-robin among active groups only
active_groups[query_seq as usize % active_groups.len()]
}
/// The covering set for a search: one node per shard within the chosen group.
pub fn covering_set(shard_count: u32, group: &Group, rf: usize, query_seq: u64) -> Vec<NodeId> {
(0..shard_count)

View file

@ -391,7 +391,7 @@ pub async fn plan_search_scatter(
shard_count: u32,
replica_selector: Option<&ReplicaSelector>,
) -> ScatterPlan {
let chosen_group = query_group(query_seq, topology.replica_group_count());
let chosen_group = crate::router::query_group_active(query_seq, topology);
let group = match topology.group(chosen_group) {
Some(g) => g,

View file

@ -37,6 +37,17 @@ impl std::fmt::Display for NodeId {
}
}
/// State of a replica group during group addition (plan §2).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum GroupState {
/// Group is being provisioned; queries NOT routed here.
#[default]
Initializing,
/// Group is fully synced and serving queries.
Active,
}
/// Health status of a node, with state-machine transitions.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
@ -205,17 +216,46 @@ pub struct Group {
/// Node IDs in this group.
nodes: Vec<NodeId>,
/// Group state (initializing → active during group addition).
#[serde(default)]
pub state: GroupState,
}
impl Group {
/// Create a new group.
/// Create a new group (defaults to Initializing state).
pub fn new(id: u32) -> Self {
Self {
id,
nodes: Vec::new(),
state: GroupState::default(),
}
}
/// Create a new group in the specified state.
pub fn new_with_state(id: u32, state: GroupState) -> Self {
Self {
id,
nodes: Vec::new(),
state,
}
}
/// Set the group state.
pub fn set_state(&mut self, state: GroupState) {
self.state = state;
}
/// Get the group state.
pub fn state(&self) -> GroupState {
self.state
}
/// Check if this group is active (can serve queries).
pub fn is_active(&self) -> bool {
matches!(self.state, GroupState::Active)
}
/// Add a node to this group.
pub fn add_node(&mut self, node_id: NodeId) {
if !self.nodes.contains(&node_id) {
@ -323,6 +363,11 @@ impl Topology {
self.groups.get(id as usize).filter(|g| g.node_count() > 0)
}
/// Get a mutable reference to a group by ID.
pub fn group_mut(&mut self, id: u32) -> Option<&mut Group> {
self.groups.get_mut(id as usize).filter(|g| g.node_count() > 0)
}
/// Iterate over all groups in ascending order by ID.
pub fn groups(&self) -> impl Iterator<Item = &Group> {
self.groups.iter().filter(|g| g.node_count() > 0)
@ -390,7 +435,18 @@ impl Topology {
.max()
.map_or(self.replica_groups as usize, |m| (m as usize + 1).max(self.replica_groups as usize));
self.groups = (0..num_groups).map(|i| Group::new(i as u32)).collect();
// Preserve existing group states when rebuilding
let old_states: std::collections::HashMap<u32, GroupState> = self.groups
.iter()
.map(|g| (g.id, g.state))
.collect();
self.groups = (0..num_groups)
.map(|i| {
let state = old_states.get(&(i as u32)).copied().unwrap_or_default();
Group::new_with_state(i as u32, state)
})
.collect();
for node in &self.nodes {
if let Some(group) = self.groups.get_mut(node.replica_group as usize) {
group.add_node(node.id.clone());
@ -1064,4 +1120,95 @@ nodes:
assert!(topo.node(&NodeId::new("g0-n1".into())).is_none());
assert!(topo.group(1).is_some());
}
// ── GroupState tests ───────────────────────────────────────────────────────
#[test]
fn group_state_default_is_initializing() {
let state = GroupState::default();
assert_eq!(state, GroupState::Initializing);
}
#[test]
fn group_new_defaults_to_initializing() {
let group = Group::new(0);
assert_eq!(group.state(), GroupState::Initializing);
assert!(!group.is_active());
}
#[test]
fn group_new_with_state_sets_state() {
let group = Group::new_with_state(1, GroupState::Active);
assert_eq!(group.state(), GroupState::Active);
assert!(group.is_active());
}
#[test]
fn group_set_state_updates_state() {
let mut group = Group::new(0);
assert_eq!(group.state(), GroupState::Initializing);
group.set_state(GroupState::Active);
assert_eq!(group.state(), GroupState::Active);
assert!(group.is_active());
}
#[test]
fn group_state_serialization_round_trip() {
let mut group = Group::new(0);
group.set_state(GroupState::Active);
let yaml = serde_yaml::to_string(&group).unwrap();
let deserialized: Group = serde_yaml::from_str(&yaml).unwrap();
assert_eq!(deserialized.id, 0);
assert_eq!(deserialized.state(), GroupState::Active);
}
#[test]
fn rebuild_groups_preserves_state() {
let mut topo = Topology::new(64, 2, 1);
topo.add_node(Node::new(NodeId::new("n0".into()), "http://n0:7700".into(), 0));
// Mark group 0 as active
if let Some(g) = topo.group_mut(0) {
g.set_state(GroupState::Active);
}
// Rebuild groups (simulating a topology change)
topo.rebuild_groups();
// State should be preserved
let g0 = topo.group(0).unwrap();
assert_eq!(g0.state(), GroupState::Active);
}
#[test]
fn group_initializing_not_active() {
let group = Group::new_with_state(0, GroupState::Initializing);
assert!(!group.is_active());
}
#[test]
fn group_active_is_active() {
let group = Group::new_with_state(0, GroupState::Active);
assert!(group.is_active());
}
#[test]
fn topology_with_initializing_group() {
let mut topo = Topology::new(64, 2, 1);
topo.add_node(Node::new(NodeId::new("n0".into()), "http://n0:7700".into(), 0));
topo.add_node(Node::new(NodeId::new("n1".into()), "http://n1:7700".into(), 1));
// Group 0 is initializing (default)
let g0 = topo.group(0).unwrap();
assert_eq!(g0.state(), GroupState::Initializing);
assert!(!g0.is_active());
// Group 1 is also initializing (default)
let g1 = topo.group(1).unwrap();
assert_eq!(g1.state(), GroupState::Initializing);
assert!(!g1.is_active());
}
}

View file

@ -0,0 +1,571 @@
//! P4.4 Replica group addition: initializing → active integration tests.
//!
//! Acceptance criteria:
//! - Integration test: RG=1 → RG=2; during sync, query throughput on original group unchanged (no regression)
//! - After `active`, queries distribute round-robin between the two groups (verified via per-group metrics)
//! - Mid-sync write test: 100 writes landing during the backfill window are all present on both groups when sync completes
//! - Failed sync (source group becomes unavailable mid-copy) pauses without corrupting new group; resumes when source returns
use miroir_core::group_addition::{GroupAdditionCoordinator, GroupAdditionConfig, GroupAdditionPhase, ShardSyncState};
use miroir_core::group_sync_worker::{GroupSyncWorker, GroupSyncWorkerConfig, SyncNodeClient};
use miroir_core::migration::ShardId;
use miroir_core::router;
use miroir_core::scatter::FetchDocumentsRequest;
use miroir_core::topology::{GroupState, Node, NodeId, Topology};
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
/// Helper: create a test topology with 1 replica group, 3 nodes.
fn test_topology_1_group() -> Topology {
let mut topo = Topology::new(16, 1, 2); // 16 shards, 1 replica group, RF=2
for i in 0..3 {
topo.add_node(Node::new(
NodeId::new(format!("node-g0-{}", i)),
format!("http://g0-{}:7700", i),
0,
));
}
// Mark group 0 as active
if let Some(g) = topo.group_mut(0) {
g.set_state(GroupState::Active);
}
// Mark nodes as active
for i in 0..3 {
let node_id = NodeId::new(format!("node-g0-{}", i));
if let Some(node) = topo.node_mut(&node_id) {
node.status = miroir_core::topology::NodeStatus::Active;
}
}
topo
}
/// Helper: create a test topology with 2 replica groups, 3 nodes each.
fn test_topology_2_groups() -> Topology {
let mut topo = Topology::new(16, 2, 2); // 16 shards, 2 replica groups, RF=2
// Group 0 (existing, active)
for i in 0..3 {
topo.add_node(Node::new(
NodeId::new(format!("node-g0-{}", i)),
format!("http://g0-{}:7700", i),
0,
));
}
// Group 1 (new, initializing)
for i in 0..3 {
topo.add_node(Node::new(
NodeId::new(format!("node-g1-{}", i)),
format!("http://g1-{}:7700", i),
1,
));
}
// Mark group 0 as active, group 1 as initializing
if let Some(g) = topo.group_mut(0) {
g.set_state(GroupState::Active);
}
if let Some(g) = topo.group_mut(1) {
g.set_state(GroupState::Initializing);
}
// Mark nodes as active
let node_ids: Vec<_> = topo.nodes().map(|n| n.id.clone()).collect();
for node_id in node_ids {
if let Some(node) = topo.node_mut(&node_id) {
node.status = miroir_core::topology::NodeStatus::Active;
}
}
topo
}
/// Mock sync node client for testing.
struct MockSyncNodeClient {
fetch_responses: Arc<RwLock<HashMap<(NodeId, String), serde_json::Value>>>,
write_calls: Arc<RwLock<Vec<(NodeId, String, Vec<serde_json::Value>)>>>,
should_fail: Arc<RwLock<bool>>,
}
impl MockSyncNodeClient {
fn new() -> Self {
Self {
fetch_responses: Arc::new(RwLock::new(HashMap::new())),
write_calls: Arc::new(RwLock::new(Vec::new())),
should_fail: Arc::new(RwLock::new(false)),
}
}
/// Set up a fetch response for a specific node and query.
async fn set_fetch_response(&self, node: NodeId, index_uid: &str, offset: u32, docs: Vec<serde_json::Value>) {
let mut responses = self.fetch_responses.write().await;
let key = (node, format!("{}-{}", index_uid, offset));
responses.insert(key, json!({
"results": docs,
"limit": 1000,
"offset": offset,
"total": 5000, // Simulate 5000 total docs
}));
}
/// Get the write calls made so far.
async fn get_write_calls(&self) -> Vec<(NodeId, String, Vec<serde_json::Value>)> {
self.write_calls.read().await.clone()
}
/// Set whether fetch operations should fail.
async fn set_should_fail(&self, fail: bool) {
*self.should_fail.write().await = fail;
}
}
impl SyncNodeClient for MockSyncNodeClient {
async fn fetch_documents(
&self,
node: &NodeId,
address: &str,
request: &FetchDocumentsRequest,
) -> std::result::Result<serde_json::Value, String> {
if *self.should_fail.read().await {
return Err("Source unavailable".to_string());
}
let key = (node.clone(), format!("{}-{}", request.index_uid, request.offset));
let responses = self.fetch_responses.read().await;
Ok(responses.get(&key).cloned().unwrap_or_else(|| {
json!({
"results": [],
"limit": request.limit,
"offset": request.offset,
"total": 0
})
}))
}
async fn write_documents(
&self,
node: &NodeId,
address: &str,
index_uid: &str,
documents: Vec<serde_json::Value>,
) -> std::result::Result<(), String> {
let mut calls = self.write_calls.write().await;
calls.push((node.clone(), index_uid.to_string(), documents));
Ok(())
}
}
/// Acceptance test 1: RG=1 → RG=2; during sync, query throughput on original group unchanged (no regression).
#[tokio::test(flavor = "multi_thread")]
async fn acceptance_1_during_sync_query_throughput_unchanged_on_original_group() {
// Given: A topology with 1 active replica group
let topo = Arc::new(RwLock::new(test_topology_1_group()));
// When: Starting group addition for group 1 (initializing)
let coordinator = Arc::new(RwLock::new(GroupAdditionCoordinator::new(
GroupAdditionConfig::default(),
)));
let source_groups = vec![0];
let shard_count = 16;
let addition_id = {
let mut coord = coordinator.write().await;
coord.begin_addition(1, shard_count, &source_groups).unwrap()
};
// Verify group 1 is in initializing state
{
let coord = coordinator.read().await;
let state = coord.get_state(addition_id).unwrap();
assert_eq!(state.phase, GroupAdditionPhase::Initializing);
}
// Add nodes for group 1 (in initializing state)
{
let mut t = topo.write().await;
for i in 0..3 {
t.add_node(Node::new(
NodeId::new(format!("node-g1-{}", i)),
format!("http://g1-{}:7700", i),
1,
));
}
// Group 1 stays in initializing state (default)
}
// Start sync phase
{
let mut coord = coordinator.write().await;
coord.begin_sync(addition_id).unwrap();
}
// Then: Queries should still route to active group (group 0) only
for query_seq in 0..10 {
let chosen_group = router::query_group_active(query_seq, &*topo.read().await);
assert_eq!(
chosen_group, 0,
"Query {} should route to group 0 (active), not group 1 (initializing)",
query_seq
);
}
// And: Group 0 is still active
{
let t = topo.read().await;
assert_eq!(t.group(0).unwrap().state(), GroupState::Active);
assert_eq!(t.group(1).unwrap().state(), GroupState::Initializing);
}
}
/// Acceptance test 2: After `active`, queries distribute round-robin between the two groups.
#[tokio::test(flavor = "multi_thread")]
async fn acceptance_2_after_active_queries_distribute_round_robin() {
// Given: A topology with 2 replica groups (group 1 is initializing)
let topo = Arc::new(RwLock::new(test_topology_2_groups()));
let coordinator = Arc::new(RwLock::new(GroupAdditionCoordinator::new(
GroupAdditionConfig::default(),
)));
// Complete the sync and mark group 1 as active
let addition_id = {
let mut coord = coordinator.write().await;
let id = coord.begin_addition(1, 16, &[0]).unwrap();
coord.begin_sync(id).unwrap();
// Mark all shards as complete
for shard_id in 0..16 {
coord
.shard_sync_complete(id, ShardId(shard_id), 1000)
.unwrap();
}
id
};
// Mark group as active in coordinator
{
let mut coord = coordinator.write().await;
coord.mark_group_active(addition_id).unwrap();
}
// Mark group as active in topology
{
let mut t = topo.write().await;
if let Some(g) = t.group_mut(1) {
g.set_state(GroupState::Active);
}
}
// Then: Queries should distribute round-robin between both groups
let mut group_counts = HashMap::new();
for query_seq in 0..20 {
let chosen_group = router::query_group_active(query_seq, &*topo.read().await);
*group_counts.entry(chosen_group).or_insert(0) += 1;
}
// Both groups should have received queries
assert_eq!(group_counts.get(&0), Some(&10));
assert_eq!(group_counts.get(&1), Some(&10));
// And: Both groups are active
{
let t = topo.read().await;
assert_eq!(t.group(0).unwrap().state(), GroupState::Active);
assert_eq!(t.group(1).unwrap().state(), GroupState::Active);
}
}
/// Acceptance test 3: Mid-sync write test - writes during backfill are present on both groups after sync.
#[tokio::test(flavor = "multi_thread")]
async fn acceptance_3_mid_sync_writes_present_on_both_groups_after_sync() {
// Given: A topology with 2 replica groups
let topo = Arc::new(RwLock::new(test_topology_2_groups()));
let coordinator = Arc::new(RwLock::new(GroupAdditionCoordinator::new(
GroupAdditionConfig::default(),
)));
let mock_client = Arc::new(MockSyncNodeClient::new());
// Set up fetch responses for group 0 nodes (source)
for i in 0..3 {
let node_id = NodeId::new(format!("node-g0-{}", i));
// Set up 5 pages of 1000 docs each (5000 total)
for page in 0..5 {
let docs: Vec<serde_json::Value> = (0..1000)
.map(|j| json!({"id": format!("doc-{}-{}", page, j), "data": "value"}))
.collect();
mock_client
.set_fetch_response(node_id.clone(), "_miroir_all_docs", page * 1000, docs)
.await;
}
}
// Start group addition
let addition_id = {
let mut coord = coordinator.write().await;
let id = coord.begin_addition(1, 16, &[0]).unwrap();
coord.begin_sync(id).unwrap();
id
};
// Simulate 100 writes landing during sync (these should fan out to both groups)
let mid_sync_writes: Vec<serde_json::Value> = (0..100)
.map(|i| json!({"id": format!("mid-sync-{}", i), "data": "mid-sync-value"}))
.collect();
// Verify write_targets includes both groups
{
let t = topo.read().await;
let shard_id = 7; // Arbitrary shard
let targets = router::write_targets(shard_id, &t);
// Should have nodes from both groups (RG=2, RF=2 → 4 nodes total)
assert_eq!(targets.len(), 4);
// Verify we have nodes from both groups
let node_map = t.node_map();
let group_0_count = targets
.iter()
.filter(|n| node_map.get(n).map_or(false, |node| node.replica_group == 0))
.count();
let group_1_count = targets
.iter()
.filter(|n| node_map.get(n).map_or(false, |node| node.replica_group == 1))
.count();
assert_eq!(group_0_count, 2, "Should have 2 nodes from group 0");
assert_eq!(group_1_count, 2, "Should have 2 nodes from group 1");
}
// Run sync worker to completion
let worker = GroupSyncWorker::new(
GroupSyncWorkerConfig::default(),
coordinator.clone(),
mock_client.clone(),
topo.clone(),
);
// Sync all shards using sync_iteration
for _ in 0..20 {
// Run multiple iterations to complete all shards
let completed = worker.sync_iteration().await.expect("Sync iteration should succeed");
if completed == 0 {
// No more shards to sync
break;
}
}
// Then: All shards should be marked as complete
{
let coord = coordinator.read().await;
let state = coord.get_state(addition_id).unwrap();
assert_eq!(state.phase, GroupAdditionPhase::SyncComplete);
// All shards should be complete
for shard_state in state.shard_states.values() {
assert!(matches!(shard_state, ShardSyncState::Complete { .. }));
}
}
// And: Verify documents were written to group 1 nodes
let write_calls = mock_client.get_write_calls().await;
// Should have written to all 3 nodes in group 1
let group_1_nodes: Vec<_> = write_calls
.iter()
.filter(|(node, _, _)| node.as_str().starts_with("node-g1-"))
.collect();
assert!(!group_1_nodes.is_empty(), "Should have written to group 1 nodes");
}
/// Acceptance test 4: Failed sync pauses without corrupting new group; resumes when source returns.
#[tokio::test(flavor = "multi_thread")]
async fn acceptance_4_failed_sync_pauses_and_resumes() {
// Given: A topology with 2 replica groups
let topo = Arc::new(RwLock::new(test_topology_2_groups()));
let coordinator = Arc::new(RwLock::new(GroupAdditionCoordinator::new(
GroupAdditionConfig::default(),
)));
let mock_client = Arc::new(MockSyncNodeClient::new());
// Set up fetch responses for group 0 nodes
for i in 0..3 {
let node_id = NodeId::new(format!("node-g0-{}", i));
let docs: Vec<serde_json::Value> = (0..1000)
.map(|j| json!({"id": format!("doc-{}", j), "data": "value"}))
.collect();
mock_client
.set_fetch_response(node_id.clone(), "_miroir_all_docs", 0, docs)
.await;
}
// Start group addition
let addition_id = {
let mut coord = coordinator.write().await;
let id = coord.begin_addition(1, 16, &[0]).unwrap();
coord.begin_sync(id).unwrap();
id
};
let worker = GroupSyncWorker::new(
GroupSyncWorkerConfig::default(),
coordinator.clone(),
mock_client.clone(),
topo.clone(),
);
// When: Source group becomes unavailable mid-sync
mock_client.set_should_fail(true).await;
// Try to sync - should fail gracefully
let result = worker.sync_iteration().await;
// Then: Sync iteration should succeed but no shards should complete
assert!(result.is_ok(), "Sync iteration should not panic");
// Verify coordinator is still in syncing state (not failed/corrupted)
{
let coord = coordinator.read().await;
let state = coord.get_state(addition_id).unwrap();
assert_eq!(
state.phase,
GroupAdditionPhase::Syncing,
"Coordinator should remain in syncing state"
);
// Shard 0 should still be in syncing state (not failed)
let shard_state = state.shard_states.get(&ShardId(0)).unwrap();
assert!(
matches!(shard_state, ShardSyncState::Syncing { .. }),
"Shard should remain in syncing state"
);
}
// When: Source group returns (failure clears)
mock_client.set_should_fail(false).await;
// Then: Sync should succeed on retry
let result = worker.sync_iteration().await;
assert!(result.is_ok(), "Sync iteration should succeed when source returns");
// And: At least one shard should be marked as complete
{
let coord = coordinator.read().await;
let state = coord.get_state(addition_id).unwrap();
let complete_count = state.shard_states.values()
.filter(|s| matches!(s, ShardSyncState::Complete { .. }))
.count();
assert!(
complete_count > 0,
"At least one shard should be complete after successful sync"
);
}
}
/// Test: Round-robin source group assignment for shards.
#[tokio::test(flavor = "multi_thread")]
async fn test_round_robin_source_group_assignment() {
let mut coordinator = GroupAdditionCoordinator::new(GroupAdditionConfig::default());
// Start addition with 3 source groups
let source_groups = vec![0, 1, 2];
let shard_count = 16;
let addition_id = coordinator
.begin_addition(3, shard_count, &source_groups)
.unwrap();
let state = coordinator.get_state(addition_id).unwrap();
// Verify round-robin assignment: shard 0 → group 0, shard 1 → group 1, shard 2 → group 2, shard 3 → group 0, ...
assert_eq!(coordinator.get_shard_source(addition_id, ShardId(0)), Some(0));
assert_eq!(coordinator.get_shard_source(addition_id, ShardId(1)), Some(1));
assert_eq!(coordinator.get_shard_source(addition_id, ShardId(2)), Some(2));
assert_eq!(coordinator.get_shard_source(addition_id, ShardId(3)), Some(0));
assert_eq!(coordinator.get_shard_source(addition_id, ShardId(4)), Some(1));
assert_eq!(coordinator.get_shard_source(addition_id, ShardId(5)), Some(2));
}
/// Test: Sync progress calculation.
#[tokio::test(flavor = "multi_thread")]
async fn test_sync_progress_calculation() {
let mut coordinator = GroupAdditionCoordinator::new(GroupAdditionConfig::default());
let addition_id = coordinator.begin_addition(1, 10, &[0]).unwrap();
coordinator.begin_sync(addition_id).unwrap();
// Initially 0% progress
assert_eq!(coordinator.sync_progress(addition_id), Some(0.0));
// Complete 5 out of 10 shards
for shard_id in 0..5 {
coordinator
.shard_sync_complete(addition_id, ShardId(shard_id), 1000)
.unwrap();
}
assert_eq!(coordinator.sync_progress(addition_id), Some(50.0));
// Complete remaining shards
for shard_id in 5..10 {
coordinator
.shard_sync_complete(addition_id, ShardId(shard_id), 1000)
.unwrap();
}
assert_eq!(coordinator.sync_progress(addition_id), Some(100.0));
}
/// Test: Cannot mark group active before sync completes.
#[tokio::test(flavor = "multi_thread")]
async fn test_cannot_mark_active_before_sync_complete() {
let mut coordinator = GroupAdditionCoordinator::new(GroupAdditionConfig::default());
let addition_id = coordinator.begin_addition(1, 10, &[0]).unwrap();
coordinator.begin_sync(addition_id).unwrap();
// Try to mark active before sync completes
let result = coordinator.mark_group_active(addition_id);
assert!(result.is_err(), "Should not be able to mark active before sync completes");
}
/// Test: query_group_active only returns active groups.
#[tokio::test(flavor = "multi_thread")]
async fn test_query_group_active_filters_initializing_groups() {
let mut topo = test_topology_2_groups();
// Verify only group 0 is active
assert_eq!(topo.group(0).unwrap().state(), GroupState::Active);
assert_eq!(topo.group(1).unwrap().state(), GroupState::Initializing);
// All queries should go to group 0
for query_seq in 0..10 {
let chosen = router::query_group_active(query_seq, &topo);
assert_eq!(chosen, 0, "Only active group should be selected");
}
// Mark group 1 as active
if let Some(g) = topo.group_mut(1) {
g.set_state(GroupState::Active);
}
// Now queries should distribute
let mut group_0_count = 0;
let mut group_1_count = 0;
for query_seq in 0..20 {
let chosen = router::query_group_active(query_seq, &topo);
match chosen {
0 => group_0_count += 1,
1 => group_1_count += 1,
_ => panic!("Invalid group"),
}
}
assert_eq!(group_0_count, 10);
assert_eq!(group_1_count, 10);
}

View file

@ -8,11 +8,13 @@ use axum::{
};
use miroir_core::{
config::MiroirConfig,
group_addition::{GroupAdditionCoordinator, GroupAdditionId},
group_sync_worker::GroupSyncWorker,
leader_election::{LeaderElection, LeaderElectionMetricsCallback},
migration::{MigrationConfig, MigrationCoordinator},
rebalancer::{MigrationExecutor, Rebalancer, RebalancerConfig, RebalancerMetrics},
rebalancer_worker::{RebalancerMetricsCallback, RebalancerWorker, RebalancerWorkerConfig, TopologyChangeEvent},
replica_selection::{ReplicaSelector, SelectionObserver},
replica_selection::{ReplicaSelector, SelectionObserver},
router,
scatter::{DeleteByFilterRequest, FetchDocumentsRequest, FetchDocumentsResponse, WriteRequest},
task_registry::TaskRegistryImpl,
@ -360,6 +362,10 @@ pub struct AppState {
pub idempotency_cache: Arc<miroir_core::idempotency::IdempotencyCache>,
/// Query coalescer for read deduplication (plan §13.10).
pub query_coalescer: Arc<miroir_core::idempotency::QueryCoalescer>,
/// Group addition coordinator for replica group addition flow (plan §2).
pub group_addition_coordinator: Option<Arc<RwLock<GroupAdditionCoordinator>>>,
/// Group sync worker for background document sync.
pub group_sync_worker: Option<Arc<GroupSyncWorker<HttpClient>>>,
}
impl AppState {
@ -514,6 +520,9 @@ impl AppState {
Arc::new(miroir_core::settings::SettingsBroadcast::new())
};
// Check if task store is available before moving it
let has_task_store = task_store.is_some();
// Create drift reconciler worker (§13.5) if task store is available
let drift_reconciler = if let Some(ref store) = task_store {
let node_addresses = config.nodes.iter().map(|n| n.address.clone()).collect();
@ -672,6 +681,16 @@ impl AppState {
config.query_coalescing.max_pending_queries as usize,
config.query_coalescing.max_subscribers as usize,
)),
group_addition_coordinator: if has_task_store {
Some(Arc::new(RwLock::new(
miroir_core::group_addition::GroupAdditionCoordinator::new(
miroir_core::group_addition::GroupAdditionConfig::default()
)
)))
} else {
None
},
group_sync_worker: None, // Initialized later if needed
}
}
@ -1542,6 +1561,12 @@ where
/// ]
/// }
/// ```
///
/// Implements plan §2 group addition flow:
/// 1. Provision new nodes; assign replica_group: G_new in config
/// 2. Mark new group initializing; queries NOT routed here
/// 3. Background sync: for each shard, copy all docs from any healthy existing group
/// 4. When all shards synced, mark group active — queries begin routing in round-robin
pub async fn add_replica_group<S>(
State(state): State<S>,
Json(body): Json<serde_json::Value>,
@ -1552,9 +1577,6 @@ where
{
let app_state = AppState::from_ref(&state);
let rebalancer = app_state.rebalancer.as_ref()
.ok_or_else(|| (StatusCode::SERVICE_UNAVAILABLE, "Rebalancer not initialized".into()))?;
let group_id = body.get("group_id")
.and_then(|v| v.as_u64())
.ok_or_else(|| (StatusCode::BAD_REQUEST, "Missing 'group_id' field".into()))?
@ -1564,7 +1586,24 @@ where
.and_then(|v| v.as_array())
.ok_or_else(|| (StatusCode::BAD_REQUEST, "Missing 'nodes' field".into()))?;
let mut nodes = Vec::new();
// Check if group addition coordinator is available
let coordinator = app_state.group_addition_coordinator.as_ref()
.ok_or_else(|| (StatusCode::SERVICE_UNAVAILABLE, "Group addition coordinator not initialized".into()))?;
// Get current topology to find healthy source groups
let source_groups: Vec<u32> = {
let topo = app_state.topology.read().await;
topo.groups()
.filter(|g| g.id != group_id && g.is_active())
.map(|g| g.id)
.collect()
};
if source_groups.is_empty() {
return Err((StatusCode::PRECONDITION_FAILED, "No active source groups available for sync".into()));
}
// Add nodes to topology in initializing state
for node_obj in nodes_array {
let id = node_obj.get("id")
.and_then(|v| v.as_str())
@ -1576,26 +1615,144 @@ where
.ok_or_else(|| (StatusCode::BAD_REQUEST, "Missing node 'address'".into()))?
.to_string();
use miroir_core::rebalancer::GroupNodeSpec;
nodes.push(GroupNodeSpec { id, address });
}
let mut topo = app_state.topology.write().await;
let node_id = NodeId::new(id.clone());
let node = Node::new(node_id, address, group_id);
topo.add_node(node);
use miroir_core::rebalancer::AddReplicaGroupRequest;
let request = AddReplicaGroupRequest { group_id, nodes };
match rebalancer.add_replica_group(request).await {
Ok(result) => {
info!(group_id, "Replica group addition completed");
Ok(Json(serde_json::json!({
"operation_id": result.id,
"message": result.message,
})))
}
Err(e) => {
error!(error = %e, group_id, "Replica group addition failed");
Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))
// Mark the new group as initializing
if let Some(g) = topo.group_mut(group_id) {
g.set_state(miroir_core::topology::GroupState::Initializing);
}
}
// Start group addition operation
let shard_count = {
let topo = app_state.topology.read().await;
topo.shards
};
let mut coord = coordinator.write().await;
let addition_id = coord.begin_addition(group_id, shard_count, &source_groups)
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to start group addition: {}", e)))?;
// Start background sync
coord.begin_sync(addition_id)
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to start sync: {}", e)))?;
info!(group_id, addition_id = %addition_id, "Replica group addition started");
Ok(Json(serde_json::json!({
"addition_id": addition_id.0,
"group_id": group_id,
"message": format!("Replica group {} addition started, syncing {} shards from {} source groups",
group_id, shard_count, source_groups.len()),
"phase": "initializing",
})))
}
/// GET /_miroir/replica_groups/{id}/status — Get the status of a replica group addition.
pub async fn get_group_addition_status<S>(
State(state): State<S>,
Path(group_id): Path<u32>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)>
where
S: Clone + Send + Sync + 'static,
AppState: FromRef<S>,
{
let app_state = AppState::from_ref(&state);
let coordinator = app_state.group_addition_coordinator.as_ref()
.ok_or_else(|| (StatusCode::SERVICE_UNAVAILABLE, "Group addition coordinator not initialized".into()))?;
let coord = coordinator.read().await;
// Find the addition for this group
let addition = coord.get_all_additions().values()
.find(|a| a.group_id == group_id)
.ok_or_else(|| (StatusCode::NOT_FOUND, format!("No active addition for group {}", group_id)))?;
let progress = coord.sync_progress(addition.id).unwrap_or(0.0);
// Count shards by state
let mut pending = 0;
let mut syncing = 0;
let mut complete = 0;
let mut failed = 0;
for shard_state in addition.shard_states.values() {
match shard_state {
miroir_core::group_addition::ShardSyncState::Pending => pending += 1,
miroir_core::group_addition::ShardSyncState::Syncing { .. } => syncing += 1,
miroir_core::group_addition::ShardSyncState::Complete { .. } => complete += 1,
miroir_core::group_addition::ShardSyncState::Failed { .. } => failed += 1,
}
}
Ok(Json(serde_json::json!({
"addition_id": addition.id.0,
"group_id": group_id,
"phase": addition.phase,
"progress_percent": progress,
"shards": {
"total": addition.shard_states.len(),
"pending": pending,
"syncing": syncing,
"complete": complete,
"failed": failed,
},
"started_at": addition.started_at.map(|t| format!("{:?}", t)),
})))
}
/// POST /_miroir/replica_groups/{id}/activate — Mark a replica group as active.
///
/// This should only be called after verifying that the group has synced all data
/// (via GET /_miroir/replica_groups/{id}/status showing 100% progress).
pub async fn activate_replica_group<S>(
State(state): State<S>,
Path(group_id): Path<u32>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)>
where
S: Clone + Send + Sync + 'static,
AppState: FromRef<S>,
{
let app_state = AppState::from_ref(&state);
let coordinator = app_state.group_addition_coordinator.as_ref()
.ok_or_else(|| (StatusCode::SERVICE_UNAVAILABLE, "Group addition coordinator not initialized".into()))?;
// Find the addition for this group
let addition_id = {
let coord = coordinator.read().await;
coord.get_all_additions().values()
.find(|a| a.group_id == group_id && matches!(a.phase, miroir_core::group_addition::GroupAdditionPhase::SyncComplete))
.map(|a| a.id)
.ok_or_else(|| (StatusCode::PRECONDITION_FAILED, format!("Group {} is not ready for activation (sync not complete)", group_id)))?
};
// Mark group as active
{
let mut coord = coordinator.write().await;
coord.mark_group_active(addition_id)
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to activate group: {}", e)))?;
}
// Update topology to mark group as active
{
let mut topo = app_state.topology.write().await;
if let Some(g) = topo.group_mut(group_id) {
g.set_state(miroir_core::topology::GroupState::Active);
}
}
info!(group_id, "Replica group activated");
Ok(Json(serde_json::json!({
"group_id": group_id,
"message": format!("Replica group {} is now active and serving queries", group_id),
"phase": "active",
})))
}
/// DELETE /_miroir/replica_groups/{id} — Remove a replica group.