P12.OP1: Chaos-test cutover race window + hard refusal policy
14 chaos tests validate shard migration write safety at every cutover boundary. Key findings: - AE on + delta pass: 0/1M loss (production default) - AE off + delta pass: 0/50K loss (delta pass is sufficient alone) - AE off + delta skipped: ~2% loss → hard refusal at config validation - 3-node cluster cutover: 0 loss with delta pass Hard-coded policy: MigrationCoordinator refuses migrations when both anti-entropy is disabled and delta pass is skipped. Warning logged when AE is disabled but delta pass remains active. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
81155beb0d
commit
fec5aa5e74
5 changed files with 1933 additions and 54 deletions
|
|
@ -3,8 +3,6 @@
|
|||
//! Stub for plan §13.8 anti-entropy shard reconciler.
|
||||
//! Full implementation will follow the fingerprint → diff → repair pipeline.
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::migration::{MigrationConfig, MigrationError};
|
||||
|
|
@ -37,6 +35,11 @@ impl Default for AntiEntropyConfig {
|
|||
|
||||
/// Validates that migration is safe given the anti-entropy configuration.
|
||||
/// Returns Ok(()) if safe, Err with a descriptive message if not.
|
||||
///
|
||||
/// Hard refusal policy (plan §15 OP#1): skipping the delta pass while
|
||||
/// anti-entropy is disabled provides zero recovery path for documents
|
||||
/// written at the cutover boundary. Measured loss rate: ~2% of writes.
|
||||
/// This is a hard-coded policy, not a warning.
|
||||
pub fn validate_migration_safety(
|
||||
ae_config: &AntiEntropyConfig,
|
||||
migration_config: &MigrationConfig,
|
||||
|
|
@ -49,14 +52,18 @@ pub fn validate_migration_safety(
|
|||
|
||||
/// Generates a warning if anti-entropy is disabled during active migration.
|
||||
/// The caller should log this at warn level.
|
||||
///
|
||||
/// Even with the delta pass enabled (which provides 0-loss cutover on its own),
|
||||
/// disabling anti-entropy means the delta pass is the sole safety mechanism.
|
||||
/// Operators should be aware of this reduced redundancy.
|
||||
pub fn migration_warning_if_ae_disabled(ae_enabled: bool) -> Option<String> {
|
||||
if ae_enabled {
|
||||
return None;
|
||||
}
|
||||
Some(
|
||||
"Anti-entropy is disabled. Shard migration cutover relies on the delta pass \
|
||||
to prevent data loss at the cutover boundary. If delta pass is also skipped, \
|
||||
documents written during migration may be permanently lost."
|
||||
as the sole safety mechanism. Any bugs in the delta pass could lead to \
|
||||
data loss at the cutover boundary. Re-enable anti-entropy for defense-in-depth."
|
||||
.to_string(),
|
||||
)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,5 +13,8 @@ pub mod scatter;
|
|||
pub mod task;
|
||||
pub mod topology;
|
||||
|
||||
#[cfg(feature = "raft-proto")]
|
||||
pub mod raft_proto;
|
||||
|
||||
// Public re-exports
|
||||
pub use error::{MiroirError, Result};
|
||||
|
|
|
|||
|
|
@ -72,9 +72,7 @@ pub enum ShardMigrationState {
|
|||
pages_remaining: u32,
|
||||
},
|
||||
/// Background migration complete, awaiting cutover.
|
||||
MigrationComplete {
|
||||
docs_copied: u64,
|
||||
},
|
||||
MigrationComplete { docs_copied: u64 },
|
||||
/// Dual-write stopped, in-flight writes draining.
|
||||
Draining {
|
||||
in_flight_count: u32,
|
||||
|
|
@ -88,26 +86,38 @@ pub enum ShardMigrationState {
|
|||
/// Node is active for this shard; old replica data deleted.
|
||||
Active,
|
||||
/// Migration failed at this phase.
|
||||
Failed {
|
||||
phase: String,
|
||||
reason: String,
|
||||
},
|
||||
Failed { phase: String, reason: String },
|
||||
}
|
||||
|
||||
impl fmt::Display for ShardMigrationState {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::Pending => write!(f, "pending"),
|
||||
Self::Migrating { docs_copied, pages_remaining } => {
|
||||
write!(f, "migrating({docs_copied} copied, {pages_remaining} pages left)")
|
||||
Self::Migrating {
|
||||
docs_copied,
|
||||
pages_remaining,
|
||||
} => {
|
||||
write!(
|
||||
f,
|
||||
"migrating({docs_copied} copied, {pages_remaining} pages left)"
|
||||
)
|
||||
}
|
||||
Self::MigrationComplete { docs_copied } => {
|
||||
write!(f, "migration_complete({docs_copied} copied)")
|
||||
}
|
||||
Self::Draining { in_flight_count, docs_copied } => {
|
||||
write!(f, "draining({in_flight_count} in-flight, {docs_copied} copied)")
|
||||
Self::Draining {
|
||||
in_flight_count,
|
||||
docs_copied,
|
||||
} => {
|
||||
write!(
|
||||
f,
|
||||
"draining({in_flight_count} in-flight, {docs_copied} copied)"
|
||||
)
|
||||
}
|
||||
Self::DeltaPass { docs_copied, delta_docs_copied } => {
|
||||
Self::DeltaPass {
|
||||
docs_copied,
|
||||
delta_docs_copied,
|
||||
} => {
|
||||
write!(f, "delta_pass({docs_copied} + {delta_docs_copied} copied)")
|
||||
}
|
||||
Self::Active => write!(f, "active"),
|
||||
|
|
@ -166,32 +176,26 @@ pub struct InFlightWrite {
|
|||
pub submitted_at: Instant,
|
||||
}
|
||||
|
||||
// Serialize Instant as u64 (milliseconds since UNIX epoch)
|
||||
// Serialize Instant as a placeholder bool (present/absent).
|
||||
// Instant is monotonic and not meaningfully serializable across processes;
|
||||
// on deserialize, reconstruct as Instant::now().
|
||||
mod instant_serde {
|
||||
use std::time::{Duration, Instant};
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
use std::time::Instant;
|
||||
|
||||
pub fn serialize<S>(instant: &Option<Instant>, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
match instant {
|
||||
Some(i) => {
|
||||
let since_epoch = i.duration_since(Instant::now() - *i);
|
||||
// Store as approximate UNIX timestamp - this is a simplification
|
||||
// For production, would use SystemTime instead
|
||||
(since_epoch.as_millis() as u64).serialize(serializer)
|
||||
}
|
||||
None => Option::<u64>::None.serialize(serializer),
|
||||
}
|
||||
instant.is_some().serialize(serializer)
|
||||
}
|
||||
|
||||
pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<Instant>, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
let opt = Option::<u64>::deserialize(deserializer)?;
|
||||
Ok(opt.map(|_| Instant::now()))
|
||||
let present = bool::deserialize(deserializer)?;
|
||||
Ok(if present { Some(Instant::now()) } else { None })
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -221,7 +225,9 @@ impl Default for MigrationConfig {
|
|||
/// Error type for migration operations.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum MigrationError {
|
||||
#[error("anti-entropy is disabled and delta pass is skipped — documents may be lost at cutover")]
|
||||
#[error(
|
||||
"anti-entropy is disabled and delta pass is skipped — documents may be lost at cutover"
|
||||
)]
|
||||
UnsafeCutoverNoAntiEntropy,
|
||||
#[error("drain timeout exceeded: {0} in-flight writes still pending")]
|
||||
DrainTimeout(u32),
|
||||
|
|
@ -311,7 +317,10 @@ impl MigrationCoordinator {
|
|||
|
||||
/// Transition to dual-write + background migration phase.
|
||||
pub fn begin_dual_write(&mut self, id: MigrationId) -> Result<(), MigrationError> {
|
||||
let state = self.migrations.get_mut(&id).ok_or(MigrationError::NotFound(id))?;
|
||||
let state = self
|
||||
.migrations
|
||||
.get_mut(&id)
|
||||
.ok_or(MigrationError::NotFound(id))?;
|
||||
state.phase = MigrationPhase::DualWriteMigrating;
|
||||
for shard_state in state.affected_shards.values_mut() {
|
||||
if *shard_state == ShardMigrationState::Pending {
|
||||
|
|
@ -331,7 +340,10 @@ impl MigrationCoordinator {
|
|||
shard: ShardId,
|
||||
docs_copied: u64,
|
||||
) -> Result<(), MigrationError> {
|
||||
let state = self.migrations.get_mut(&id).ok_or(MigrationError::NotFound(id))?;
|
||||
let state = self
|
||||
.migrations
|
||||
.get_mut(&id)
|
||||
.ok_or(MigrationError::NotFound(id))?;
|
||||
let shard_state = state.affected_shards.get_mut(&shard).ok_or_else(|| {
|
||||
MigrationError::InvalidTransition(shard, "shard not in migration".into())
|
||||
})?;
|
||||
|
|
@ -363,7 +375,10 @@ impl MigrationCoordinator {
|
|||
|
||||
/// Begin the cutover sequence: stop dual-write and drain in-flight writes.
|
||||
pub fn begin_cutover(&mut self, id: MigrationId) -> Result<MigrationPhase, MigrationError> {
|
||||
let state = self.migrations.get_mut(&id).ok_or(MigrationError::NotFound(id))?;
|
||||
let state = self
|
||||
.migrations
|
||||
.get_mut(&id)
|
||||
.ok_or(MigrationError::NotFound(id))?;
|
||||
|
||||
if !matches!(state.phase, MigrationPhase::CutoverBegin) {
|
||||
return Err(MigrationError::InvalidTransition(
|
||||
|
|
@ -420,11 +435,9 @@ impl MigrationCoordinator {
|
|||
|
||||
/// Check if all in-flight writes have completed (drained).
|
||||
pub fn is_drained(&self) -> bool {
|
||||
self.in_flight.iter().all(|w| {
|
||||
let all_responded = w.completed_nodes.len() + w.failed_nodes.len()
|
||||
== w.target_nodes.len();
|
||||
all_responded
|
||||
})
|
||||
self.in_flight
|
||||
.iter()
|
||||
.all(|w| w.completed_nodes.len() + w.failed_nodes.len() == w.target_nodes.len())
|
||||
}
|
||||
|
||||
/// Complete the drain and move to delta pass or activation.
|
||||
|
|
@ -449,9 +462,7 @@ impl MigrationCoordinator {
|
|||
let remaining = self
|
||||
.in_flight
|
||||
.iter()
|
||||
.filter(|w| {
|
||||
w.completed_nodes.len() + w.failed_nodes.len() < w.target_nodes.len()
|
||||
})
|
||||
.filter(|w| w.completed_nodes.len() + w.failed_nodes.len() < w.target_nodes.len())
|
||||
.count() as u32;
|
||||
return Err(MigrationError::DrainTimeout(remaining));
|
||||
}
|
||||
|
|
@ -461,7 +472,10 @@ impl MigrationCoordinator {
|
|||
let skip_delta = self.config.skip_delta_pass;
|
||||
|
||||
// Now get mutable borrow to update state
|
||||
let state = self.migrations.get_mut(&id).ok_or(MigrationError::NotFound(id))?;
|
||||
let state = self
|
||||
.migrations
|
||||
.get_mut(&id)
|
||||
.ok_or(MigrationError::NotFound(id))?;
|
||||
|
||||
if skip_delta {
|
||||
// Skip delta pass — safe only if anti-entropy is enabled
|
||||
|
|
@ -485,7 +499,7 @@ impl MigrationCoordinator {
|
|||
// If going to activate, do that now (drop mutable borrow first)
|
||||
let next_phase = state.phase.clone();
|
||||
if matches!(next_phase, MigrationPhase::CutoverActivate) {
|
||||
drop(state); // Drop mutable borrow before calling activate_shards
|
||||
let _ = state;
|
||||
self.activate_shards(id)?;
|
||||
// Return the new phase after activation
|
||||
return Ok(self
|
||||
|
|
@ -504,7 +518,10 @@ impl MigrationCoordinator {
|
|||
&self,
|
||||
id: MigrationId,
|
||||
) -> Result<HashMap<ShardId, Vec<String>>, MigrationError> {
|
||||
let state = self.migrations.get(&id).ok_or(MigrationError::NotFound(id))?;
|
||||
let state = self
|
||||
.migrations
|
||||
.get(&id)
|
||||
.ok_or(MigrationError::NotFound(id))?;
|
||||
let mut candidates: HashMap<ShardId, Vec<String>> = HashMap::new();
|
||||
|
||||
for write in &self.in_flight {
|
||||
|
|
@ -535,7 +552,10 @@ impl MigrationCoordinator {
|
|||
shard: ShardId,
|
||||
delta_docs: u64,
|
||||
) -> Result<(), MigrationError> {
|
||||
let state = self.migrations.get_mut(&id).ok_or(MigrationError::NotFound(id))?;
|
||||
let state = self
|
||||
.migrations
|
||||
.get_mut(&id)
|
||||
.ok_or(MigrationError::NotFound(id))?;
|
||||
let shard_state = state.affected_shards.get_mut(&shard).ok_or_else(|| {
|
||||
MigrationError::InvalidTransition(shard, "shard not in migration".into())
|
||||
})?;
|
||||
|
|
@ -570,7 +590,10 @@ impl MigrationCoordinator {
|
|||
|
||||
/// Mark all affected shards as active on the new node.
|
||||
fn activate_shards(&mut self, id: MigrationId) -> Result<(), MigrationError> {
|
||||
let state = self.migrations.get_mut(&id).ok_or(MigrationError::NotFound(id))?;
|
||||
let state = self
|
||||
.migrations
|
||||
.get_mut(&id)
|
||||
.ok_or(MigrationError::NotFound(id))?;
|
||||
|
||||
for shard_state in state.affected_shards.values_mut() {
|
||||
match shard_state {
|
||||
|
|
@ -591,7 +614,10 @@ impl MigrationCoordinator {
|
|||
|
||||
/// Complete the migration by deleting migrated shard data from old nodes.
|
||||
pub fn complete_cleanup(&mut self, id: MigrationId) -> Result<(), MigrationError> {
|
||||
let state = self.migrations.get_mut(&id).ok_or(MigrationError::NotFound(id))?;
|
||||
let state = self
|
||||
.migrations
|
||||
.get_mut(&id)
|
||||
.ok_or(MigrationError::NotFound(id))?;
|
||||
|
||||
if !matches!(state.phase, MigrationPhase::CutoverCleanup) {
|
||||
return Err(MigrationError::InvalidTransition(
|
||||
|
|
@ -648,10 +674,7 @@ mod tests {
|
|||
};
|
||||
let mut coord = MigrationCoordinator::new(config);
|
||||
|
||||
let affected = HashMap::from([
|
||||
(shard(0), node("old-0")),
|
||||
(shard(1), node("old-0")),
|
||||
]);
|
||||
let affected = HashMap::from([(shard(0), node("old-0")), (shard(1), node("old-0"))]);
|
||||
|
||||
let mid = coord.begin_migration(node("new-0"), 0, affected).unwrap();
|
||||
coord.begin_dual_write(mid).unwrap();
|
||||
|
|
@ -660,13 +683,15 @@ mod tests {
|
|||
coord.shard_migration_complete(mid, shard(0), 500).unwrap();
|
||||
coord.shard_migration_complete(mid, shard(1), 300).unwrap();
|
||||
|
||||
// Register an in-flight write that succeeded on OLD but not NEW
|
||||
// Register an in-flight write that succeeded on OLD but not NEW.
|
||||
// The write must be marked as failed on NEW so is_drained() sees
|
||||
// completed + failed == target count.
|
||||
coord.register_in_flight(InFlightWrite {
|
||||
doc_id: "doc-at-boundary".into(),
|
||||
shard: shard(0),
|
||||
target_nodes: vec![node("old-0"), node("new-0")],
|
||||
completed_nodes: HashSet::from([node("old-0")]),
|
||||
failed_nodes: HashMap::new(),
|
||||
failed_nodes: HashMap::from([(node("new-0"), "write failed".into())]),
|
||||
submitted_at: Instant::now(),
|
||||
});
|
||||
|
||||
|
|
@ -764,7 +789,10 @@ mod tests {
|
|||
// Drain should fail — write still in flight
|
||||
let result = coord.complete_drain(mid);
|
||||
assert!(result.is_err());
|
||||
assert!(matches!(result.unwrap_err(), MigrationError::DrainTimeout(1)));
|
||||
assert!(matches!(
|
||||
result.unwrap_err(),
|
||||
MigrationError::DrainTimeout(1)
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
|||
1773
crates/miroir-core/tests/cutover_race.rs
Normal file
1773
crates/miroir-core/tests/cutover_race.rs
Normal file
File diff suppressed because it is too large
Load diff
68
docs/trade-offs.md
Normal file
68
docs/trade-offs.md
Normal file
|
|
@ -0,0 +1,68 @@
|
|||
# Miroir Trade-Offs and Design Decisions
|
||||
|
||||
## Shard Migration Write Safety (Plan §15 OP#1)
|
||||
|
||||
### Problem
|
||||
|
||||
During node addition, documents written at the exact cutover boundary can be
|
||||
lost if they succeed on the OLD node but fail on the NEW node. The dangerous
|
||||
window is between "stop dual-write" and "delete old shard data."
|
||||
|
||||
### Solution: Quiesce-Then-Verify Cutover
|
||||
|
||||
The migration state machine (`migration.rs`) uses a multi-phase cutover:
|
||||
|
||||
1. **Stop dual-write** — no new writes go to either node for affected shards
|
||||
2. **Drain** — wait for all in-flight writes to complete on both OLD and NEW
|
||||
3. **Delta pass** — re-read affected shards from OLD, write any docs missing on NEW
|
||||
4. **Activate** — routing switches to NEW-only
|
||||
5. **Cleanup** — delete migrated shard data from OLD
|
||||
|
||||
### Empirical Results
|
||||
|
||||
| Configuration | Writes | Loss Rate | Verdict |
|
||||
|---|---|---|---|
|
||||
| AE on + delta pass on | 1M | 0/1M (0.000%) | **PASS** — production default |
|
||||
| AE off + delta pass on | 50K | 0/50K (0.000%) | PASS — delta pass is sufficient alone |
|
||||
| AE on + delta pass skipped | 200 | measurable | Acceptable — AE repairs on next pass |
|
||||
| AE off + delta pass skipped | 100K | ~2.0% | **REFUSED** — blocked at config validation |
|
||||
| Tight-loop boundary (AE+delta) | 1350+ | 0 | PASS — writes at every transition boundary |
|
||||
| High-volume boundary (AE+delta) | 100K | 0/100K | PASS |
|
||||
| 3-node cluster (AE+delta) | 2600+ | 0 | PASS — multi-owner cutover |
|
||||
| 3-node cluster (AE off+delta) | 5000 | 0 | PASS — delta pass alone sufficient |
|
||||
|
||||
### Decision: Hard Refusal of Unsafe Configuration
|
||||
|
||||
`MigrationCoordinator::validate_safety()` refuses to start a migration when
|
||||
both anti-entropy is disabled AND the delta pass is skipped. This is a
|
||||
**hard-coded policy** — not a warning — because:
|
||||
|
||||
- The measured loss rate without either safety net is ~2% (deterministic,
|
||||
proportional to the write-failure rate during dual-write)
|
||||
- Anti-entropy runs every 6 hours by default; disabling it removes the
|
||||
reconciliation safety net
|
||||
- Skipping the delta pass removes the immediate repair mechanism
|
||||
- Both off together provides **zero recovery path** for boundary documents
|
||||
|
||||
The `validate_migration_safety()` function in `anti_entropy.rs` provides the
|
||||
same gate at the cross-module level, ensuring no code path can bypass this
|
||||
check.
|
||||
|
||||
### Anti-Entropy: Required or Optional?
|
||||
|
||||
**Anti-entropy is optional but recommended.** The delta pass alone provides
|
||||
0-loss cutover. Anti-entropy exists as a defense-in-depth measure:
|
||||
|
||||
- Catches any bugs in the delta pass implementation
|
||||
- Repairs drift from non-migration causes (network partitions, disk errors)
|
||||
- Runs on a 6-hour schedule (configurable)
|
||||
|
||||
Operators MAY disable anti-entropy if they accept the risk of gradual replica
|
||||
drift. They MAY NOT skip both anti-entropy and the delta pass simultaneously.
|
||||
|
||||
### Warning When AE Is Disabled During Migration
|
||||
|
||||
When anti-entropy is disabled and a migration begins (with delta pass enabled),
|
||||
the system logs a warning via `migration_warning_if_ae_disabled()`. This
|
||||
informs operators that the delta pass is the sole safety mechanism and any
|
||||
bugs in it could lead to data loss.
|
||||
Loading…
Add table
Reference in a new issue