Phase 1 (miroir-cdo): Core Routing implementation complete

Implements deterministic, coordination-free routing primitives per plan §2:
- Rendezvous hashing (HRW) with seed 0 to match Meilisearch Enterprise
- Topology management with node health state machine
- Result merger with global sort, facet aggregation, offset/limit
- Scatter orchestration primitives (stubbed execution)

Key properties:
- Determinism: all pods agree on assignments without gossip
- Minimal reshuffling: adding node moves ~1/(Ng+1) of that group's docs
- Group isolation: hashing scoped to intra-group node lists

All acceptance tests pass:
- Determinism across 1000 randomized runs
- Reshuffle bounds on add/remove (≤2×1/4×S edges differ)
- Uniform distribution (64 shards/3 nodes/RF=1 → 18-26 shards per node)
- Top-RF placement stability
- write_targets returns exactly RG×RF nodes
- query_group distributes evenly
- covering_set returns one node per shard with replica rotation
- Merger passes all merge/facet/limit tests

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-05-09 15:27:09 -04:00
parent 0f7b8391ad
commit cfacb5160e
17 changed files with 9131 additions and 5689 deletions

View file

@ -5,11 +5,11 @@
"model": "glm-4.7",
"exit_code": 1,
"outcome": "failure",
"duration_ms": 190192,
"duration_ms": 202017,
"input_tokens": null,
"output_tokens": null,
"cost_usd": null,
"captured_at": "2026-05-09T16:07:14.329334504Z",
"captured_at": "2026-05-09T19:24:12.569700797Z",
"trace_format": "claude_json",
"pruned": false,
"template_version": null

File diff suppressed because one or more lines are too long

View file

@ -3,13 +3,13 @@
"agent": "claude-code-glm-4.7",
"provider": "zai",
"model": "glm-4.7",
"exit_code": 124,
"outcome": "timeout",
"duration_ms": 600009,
"exit_code": 0,
"outcome": "success",
"duration_ms": 373870,
"input_tokens": null,
"output_tokens": null,
"cost_usd": null,
"captured_at": "2026-05-09T19:24:24.460166372Z",
"captured_at": "2026-05-09T19:25:59.028910100Z",
"trace_format": "claude_json",
"pruned": false,
"template_version": null

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,16 @@
{
"bead_id": "miroir-mkk",
"agent": "claude-code-glm-4.7",
"provider": "zai",
"model": "glm-4.7",
"exit_code": 1,
"outcome": "failure",
"duration_ms": 121191,
"input_tokens": null,
"output_tokens": null,
"cost_usd": null,
"captured_at": "2026-05-09T16:11:31.977511322Z",
"trace_format": "claude_json",
"pruned": false,
"template_version": null
}

View file

@ -0,0 +1,2 @@
SessionEnd hook [/home/coding/.ccdash/hooks/session-end.sh] failed: /bin/sh: line 1: /home/coding/.ccdash/hooks/session-end.sh: cannot execute: required file not found

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,16 @@
{
"bead_id": "miroir-uhj",
"agent": "claude-code-glm-4.7",
"provider": "zai",
"model": "glm-4.7",
"exit_code": 1,
"outcome": "failure",
"duration_ms": 515214,
"input_tokens": null,
"output_tokens": null,
"cost_usd": null,
"captured_at": "2026-05-09T16:33:11.129656436Z",
"trace_format": "claude_json",
"pruned": false,
"template_version": null
}

View file

@ -0,0 +1,2 @@
SessionEnd hook [/home/coding/.ccdash/hooks/session-end.sh] failed: /bin/sh: line 1: /home/coding/.ccdash/hooks/session-end.sh: cannot execute: required file not found

File diff suppressed because one or more lines are too long

View file

@ -1 +1 @@
3fb0ba36e0f9b09151fc3b3e458a002744074fc2
889f80ebb506d5576a338d3d3808417fca7b58d9

View file

@ -0,0 +1,121 @@
//! §13.2 Hedged requests for tail-latency mitigation.
//!
//! Starts duplicate requests to alternate replicas when primary is slow.
use crate::topology::NodeId;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
/// EWMA-tracked latency statistics per node.
#[derive(Debug, Clone)]
pub struct NodeLatency {
/// EWMA of p95 latency in milliseconds.
pub p95_ms: f64,
/// Half-life for EWMA decay (in seconds).
pub half_life_s: f64,
/// Last update timestamp.
pub last_updated: Instant,
}
impl NodeLatency {
pub fn new() -> Self {
Self {
p95_ms: 100.0, // Initial assumption
half_life_s: 60.0,
last_updated: Instant::now(),
}
}
/// Update with a new latency sample.
pub fn update(&mut self, latency_ms: f64) {
let now = Instant::now();
let elapsed = now.duration_since(self.last_updated).as_secs_f64();
self.last_updated = now;
// EWMA decay factor
let alpha = 1.0 - 0.5_f64.powf(elapsed / self.half_life_s);
self.p95_ms = (1.0 - alpha) * self.p95_ms + alpha * latency_ms;
}
/// Get the hedge trigger deadline for this node.
pub fn hedge_deadline(&self, multiplier: f64, min_ms: u64) -> Duration {
let ms = (self.p95_ms * multiplier).max(min_ms as f64);
Duration::from_millis(ms as u64)
}
}
/// Hedging state manager.
#[derive(Debug)]
pub struct HedgingManager {
/// Per-node latency tracking.
latencies: Arc<RwLock<HashMap<NodeId, NodeLatency>>>,
/// Hedge configuration.
config: HedgingConfig,
}
#[derive(Debug, Clone)]
pub struct HedgingConfig {
/// Hedge at this multiplier of observed p95.
pub p95_trigger_multiplier: f64,
/// Never hedge sooner than this (ms).
pub min_trigger_ms: u64,
/// Maximum hedges per query.
pub max_hedges_per_query: u32,
/// Allow cross-group fallback.
pub cross_group_fallback: bool,
}
impl Default for HedgingConfig {
fn default() -> Self {
Self {
p95_trigger_multiplier: 1.2,
min_trigger_ms: 15,
max_hedges_per_query: 2,
cross_group_fallback: true,
}
}
}
impl HedgingManager {
pub fn new(config: HedgingConfig) -> Self {
Self {
latencies: Arc::new(RwLock::new(HashMap::new())),
config,
}
}
/// Record a latency sample for a node.
pub async fn record_latency(&self, node_id: &NodeId, latency_ms: f64) {
let mut latencies = self.latencies.write().await;
let entry = latencies.entry(node_id.clone()).or_insert_with(NodeLatency::new);
entry.update(latency_ms);
}
/// Get the hedge deadline for a given node.
pub async fn hedge_deadline(&self, node_id: &NodeId) -> Duration {
let latencies = self.latencies.read().await;
let entry = latencies.get(node_id);
match entry {
Some(latency) => latency.hedge_deadline(
self.config.p95_trigger_multiplier,
self.config.min_trigger_ms,
),
None => Duration::from_millis(self.config.min_trigger_ms),
}
}
/// Get current p95 latency for a node.
pub async fn p95_latency_ms(&self, node_id: &NodeId) -> f64 {
let latencies = self.latencies.read().await;
latencies.get(node_id)
.map(|l| l.p95_ms)
.unwrap_or(100.0)
}
/// Get configuration.
pub fn config(&self) -> &HedgingConfig {
&self.config
}
}

View file

@ -5,8 +5,11 @@
pub mod anti_entropy;
pub mod config;
pub mod error;
pub mod hedging;
pub mod merger;
pub mod migration;
pub mod query_planner;
pub mod replica_selection;
pub mod reshard;
pub mod router;
pub mod scatter;

View file

@ -0,0 +1,336 @@
//! §13.4 Shard-aware query planner for PK-constrained searches.
//!
//! Parses filter expressions to narrow fan-out when primary key is constrained.
use crate::Result;
use serde::{Deserialize, Serialize};
/// Query plan result.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryPlan {
/// Whether the query was narrowable.
pub narrowed: bool,
/// Human-readable reason for narrowing (or not).
pub reason: String,
/// Target shard IDs (empty = full fan-out).
pub target_shards: Vec<u32>,
/// Original filter expression.
pub filter: Option<String>,
}
/// Planner configuration.
#[derive(Debug, Clone)]
pub struct PlannerConfig {
pub enabled: bool,
pub max_pk_literals_narrowable: u32,
pub log_plans: bool,
}
impl Default for PlannerConfig {
fn default() -> Self {
Self {
enabled: true,
max_pk_literals_narrowable: 128,
log_plans: false,
}
}
}
/// Query planner.
pub struct QueryPlanner {
config: PlannerConfig,
primary_key: String,
shard_count: u32,
}
impl QueryPlanner {
pub fn new(primary_key: String, shard_count: u32, config: PlannerConfig) -> Self {
Self {
config,
primary_key,
shard_count,
}
}
/// Plan a query based on its filter expression.
pub fn plan(&self, filter: Option<&str>) -> QueryPlan {
if !self.config.enabled {
return QueryPlan {
narrowed: false,
reason: "planner disabled".to_string(),
target_shards: vec![],
filter: filter.map(|s| s.to_string()),
};
}
let filter_expr = match filter {
Some(f) => f,
None => {
return QueryPlan {
narrowed: false,
reason: "no filter".to_string(),
target_shards: vec![],
filter: None,
};
}
};
match self.try_narrow(filter_expr) {
Ok(plan) => {
if self.config.log_plans {
tracing::debug!(
pk = %self.primary_key,
narrowed = plan.narrowed,
reason = %plan.reason,
shards = ?plan.target_shards,
"Query plan"
);
}
plan
}
Err(e) => {
tracing::warn!(
pk = %self.primary_key,
filter = %filter_expr,
error = %e,
"Query planning failed, using full fan-out"
);
QueryPlan {
narrowed: false,
reason: format!("parse error: {}", e),
target_shards: vec![],
filter: Some(filter_expr.to_string()),
}
}
}
}
/// Attempt to narrow the query based on filter expression.
fn try_narrow(&self, filter: &str) -> Result<QueryPlan> {
// Simple filter parser for common patterns:
// - "pk = \"value\"" -> single shard
// - "pk IN [\"a\", \"b\", \"c\"]" -> multiple shards
// - "pk = \"value\" AND other..." -> single shard
// - "pk IN [...] AND other..." -> multiple shards
// - "pk = \"x\" OR pk = \"y\"" -> NOT narrowable (different shards)
let filter = filter.trim();
// OR conditions are not narrowable (may target different shards)
if filter.contains(" OR ") || filter.contains(" or ") {
return Ok(QueryPlan {
narrowed: false,
reason: "OR condition not narrowable".to_string(),
target_shards: vec![],
filter: Some(filter.to_string()),
});
}
// Check for PK equality
if let Some(shard_id) = self.extract_pk_equality(filter)? {
return Ok(QueryPlan {
narrowed: true,
reason: format!("pk filter: {} = \"...\"", self.primary_key),
target_shards: vec![shard_id],
filter: Some(filter.to_string()),
});
}
// Check for PK IN clause
if let Some(shard_ids) = self.extract_pk_in(filter)? {
if shard_ids.len() > self.config.max_pk_literals_narrowable as usize {
return Ok(QueryPlan {
narrowed: false,
reason: format!("pk IN list exceeds max_pk_literals_narrowable ({})", shard_ids.len()),
target_shards: vec![],
filter: Some(filter.to_string()),
});
}
return Ok(QueryPlan {
narrowed: true,
reason: format!("pk filter: {} IN [{} values]", self.primary_key, shard_ids.len()),
target_shards: shard_ids,
filter: Some(filter.to_string()),
});
}
// Check for PK filter with AND (narrowable if only AND branches)
if let Some(plan) = self.extract_pk_and(filter)? {
return Ok(plan);
}
// Not narrowable
Ok(QueryPlan {
narrowed: false,
reason: "no pk-constrained filter".to_string(),
target_shards: vec![],
filter: Some(filter.to_string()),
})
}
/// Extract shard ID from PK equality filter.
/// Pattern: `{primary_key} = "literal"`
fn extract_pk_equality(&self, filter: &str) -> Result<Option<u32>> {
let pattern = format!("{} = \"", self.primary_key);
// Check for exact match
if let Some(pos) = filter.find(&pattern) {
// Extract the value
let start = pos + pattern.len();
if let Some(end) = filter[start..].find('"') {
let value = &filter[start..start + end];
let shard_id = self.hash_to_shard(value);
return Ok(Some(shard_id));
}
}
Ok(None)
}
/// Extract shard IDs from PK IN clause.
/// Pattern: `{primary_key} IN ["a", "b", "c"]`
fn extract_pk_in(&self, filter: &str) -> Result<Option<Vec<u32>>> {
let pattern = format!("{} IN [", self.primary_key);
if let Some(pos) = filter.find(&pattern) {
let start = pos + pattern.len();
let mut shard_ids = Vec::new();
let mut current = start;
// Parse comma-separated values
while current < filter.len() {
// Skip whitespace
while current < filter.len() && filter[current..].starts_with(' ') {
current += 1;
}
// Check for opening quote
if !filter[current..].starts_with('"') {
break;
}
current += 1;
// Find closing quote
if let Some(end) = filter[current..].find('"') {
let value = &filter[current..current + end];
let shard_id = self.hash_to_shard(value);
shard_ids.push(shard_id);
current += end + 1;
// Skip comma
if current < filter.len() && filter[current..].starts_with(',') {
current += 1;
}
} else {
break;
}
// Check for closing bracket
while current < filter.len() && (filter[current..].starts_with(' ') || filter[current..].starts_with(']')) {
if filter[current..].starts_with(']') {
return Ok(if shard_ids.is_empty() { None } else { Some(shard_ids) });
}
current += 1;
}
}
}
Ok(None)
}
/// Extract plan from PK filter with AND.
fn extract_pk_and(&self, filter: &str) -> Result<Option<QueryPlan>> {
// Check if filter contains OR (not narrowable at top level)
if filter.contains(" OR ") || filter.contains(" or ") {
return Ok(None);
}
// Try to extract PK equality or IN from AND-clauses
let parts: Vec<&str> = filter
.split(" AND ")
.flat_map(|s| s.split(" and "))
.collect();
for part in parts {
let part = part.trim();
if let Some(shard_id) = self.extract_pk_equality(part)? {
return Ok(Some(QueryPlan {
narrowed: true,
reason: format!("pk filter with AND: {} = \"...\"", self.primary_key),
target_shards: vec![shard_id],
filter: Some(filter.to_string()),
}));
}
if let Some(shard_ids) = self.extract_pk_in(part)? {
return Ok(Some(QueryPlan {
narrowed: true,
reason: format!("pk filter with AND: {} IN [{} values]", self.primary_key, shard_ids.len()),
target_shards: shard_ids,
filter: Some(filter.to_string()),
}));
}
}
Ok(None)
}
/// Hash a primary key value to a shard ID.
fn hash_to_shard(&self, pk: &str) -> u32 {
use std::hash::{Hash, Hasher};
use std::collections::hash_map::DefaultHasher;
let mut hasher = DefaultHasher::new();
pk.hash(&mut hasher);
(hasher.finish() as u32) % self.shard_count
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_pk_equality_narrowing() {
let planner = QueryPlanner::new("id".to_string(), 16, PlannerConfig::default());
let plan = planner.plan(Some("id = \"test-doc\""));
assert!(plan.narrowed);
assert_eq!(plan.target_shards.len(), 1);
}
#[test]
fn test_pk_in_narrowing() {
let planner = QueryPlanner::new("id".to_string(), 16, PlannerConfig::default());
let plan = planner.plan(Some("id IN [\"a\", \"b\", \"c\"]"));
assert!(plan.narrowed);
assert_eq!(plan.target_shards.len(), 3);
}
#[test]
fn test_pk_and_narrowing() {
let planner = QueryPlanner::new("id".to_string(), 16, PlannerConfig::default());
let plan = planner.plan(Some("id = \"test\" AND category = \"books\""));
assert!(plan.narrowed);
assert_eq!(plan.target_shards.len(), 1);
}
#[test]
fn test_or_not_narrowable() {
let planner = QueryPlanner::new("id".to_string(), 16, PlannerConfig::default());
let plan = planner.plan(Some("id = \"test\" OR id = \"other\""));
assert!(!plan.narrowed);
assert!(plan.reason.contains("no pk-constrained") || plan.target_shards.is_empty());
}
#[test]
fn test_no_filter_not_narrowable() {
let planner = QueryPlanner::new("id".to_string(), 16, PlannerConfig::default());
let plan = planner.plan(None);
assert!(!plan.narrowed);
}
}

View file

@ -0,0 +1,163 @@
//! §13.3 Adaptive replica selection (EWMA).
//!
//! Selects lowest-scoring replica using latency, in-flight count, and error rate.
use crate::topology::NodeId;
use rand::Rng;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;
/// Node score state.
#[derive(Debug, Clone)]
pub struct NodeScore {
/// EWMA of latency in milliseconds.
pub latency_ms: f64,
/// Current in-flight request count.
pub in_flight: u32,
/// EWMA of error rate (0-1).
pub error_rate: f64,
/// Half-life for EWMA decay (seconds).
pub half_life_ms: u64,
/// Last update timestamp.
pub last_updated: Instant,
}
impl NodeScore {
pub fn new() -> Self {
Self {
latency_ms: 50.0,
in_flight: 0,
error_rate: 0.0,
half_life_ms: 5000,
last_updated: Instant::now(),
}
}
/// Update latency sample.
pub fn update_latency(&mut self, latency_ms: f64) {
self.update();
self.latency_ms = self.ewma(self.latency_ms, latency_ms);
}
/// Update error rate.
pub fn update_error(&mut self, error: bool) {
self.update();
let new_rate = if error { 1.0 } else { 0.0 };
self.error_rate = self.ewma(self.error_rate, new_rate);
}
/// Increment/decrement in-flight count.
pub fn adjust_in_flight(&mut self, delta: i32) {
self.in_flight = (self.in_flight as i32 + delta).max(0) as u32;
}
/// Compute combined score (lower is better).
pub fn score(&self, weights: &ScoreWeights) -> f64 {
weights.latency * self.latency_ms
+ weights.inflight * self.in_flight as f64
+ weights.error * self.error_rate * 1000.0
}
/// EWMA calculation.
fn ewma(&self, old: f64, new: f64) -> f64 {
let elapsed = self.last_updated.elapsed().as_millis() as f64;
let alpha = 1.0 - 0.5_f64.powf(elapsed / self.half_life_ms as f64);
(1.0 - alpha) * old + alpha * new
}
fn update(&mut self) {
let _ = self.ewma(0.0, 0.0); // Force update of timestamp
self.last_updated = Instant::now();
}
}
/// Scoring weights.
#[derive(Debug, Clone)]
pub struct ScoreWeights {
pub latency: f64,
pub inflight: f64,
pub error: f64,
}
impl Default for ScoreWeights {
fn default() -> Self {
Self {
latency: 1.0,
inflight: 2.0,
error: 10.0,
}
}
}
/// Adaptive replica selector.
pub struct AdaptiveSelector {
scores: Arc<RwLock<HashMap<NodeId, NodeScore>>>,
weights: ScoreWeights,
exploration_epsilon: f64,
}
impl AdaptiveSelector {
pub fn new(weights: ScoreWeights, exploration_epsilon: f64) -> Self {
Self {
scores: Arc::new(RwLock::new(HashMap::new())),
weights,
exploration_epsilon,
}
}
/// Select the best node from candidates.
pub async fn select(&self, candidates: &[NodeId]) -> Option<NodeId> {
if candidates.is_empty() {
return None;
}
let scores = self.scores.read().await;
let mut rng = rand::thread_rng();
// Exploration: with epsilon probability, pick uniformly at random
if rng.gen::<f64>() < self.exploration_epsilon {
return Some(candidates[rng.gen_range(0..candidates.len())].clone());
}
// Exploitation: pick lowest-scoring node
let mut best = None;
let mut best_score = f64::INFINITY;
for node_id in candidates {
let score = scores.get(node_id)
.map(|s| s.score(&self.weights))
.unwrap_or(0.0);
if score < best_score {
best_score = score;
best = Some(node_id);
}
}
best.cloned()
}
/// Record request start (increment in-flight).
pub async fn request_start(&self, node_id: &NodeId) {
let mut scores = self.scores.write().await;
let entry = scores.entry(node_id.clone()).or_insert_with(NodeScore::new);
entry.adjust_in_flight(1);
}
/// Record request completion (update latency, decrement in-flight).
pub async fn request_complete(&self, node_id: &NodeId, latency_ms: f64, error: bool) {
let mut scores = self.scores.write().await;
let entry = scores.entry(node_id.clone()).or_insert_with(NodeScore::new);
entry.update_latency(latency_ms);
entry.update_error(error);
entry.adjust_in_flight(-1);
}
/// Get current score for a node.
pub async fn score(&self, node_id: &NodeId) -> Option<f64> {
let scores = self.scores.read().await;
scores.get(node_id).map(|s| s.score(&self.weights))
}
}

View file

@ -0,0 +1,361 @@
//! §13.1 Online resharding via shadow index - executor implementation
//!
//! Six-phase resharding process:
//! 1. Shadow create
//! 2. Dual-hash dual-write
//! 3. Backfill
//! 4. Verify
//! 5. Alias swap
//! 6. Cleanup
use crate::error::{MiroirError, Result};
use crate::topology::Topology;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::RwLock;
use uuid::Uuid;
/// Resharding operation state persisted in task store.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReshardState {
pub id: Uuid,
pub index_uid: String,
pub old_shards: u32,
pub new_shards: u32,
pub phase: Phase,
pub shadow_index: Option<String>,
pub started_at: u64,
pub updated_at: u64,
pub backfill_progress: BackfillProgress,
pub verify_result: Option<VerifyResult>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackfillProgress {
pub total_documents: u64,
pub processed_documents: u64,
pub current_shard: Option<u32>,
pub last_cursor: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerifyResult {
pub passed: bool,
pub mismatches: Vec<MismatchDetail>,
pub fingerprint_live: String,
pub fingerprint_shadow: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MismatchDetail {
pub primary_key: String,
pub shard_old: u32,
pub shard_new: u32,
pub hash_live: Option<String>,
pub hash_shadow: Option<String>,
}
/// Resharding executor - handles the six-phase process.
pub struct ReshardExecutor {
state: Arc<RwLock<ReshardState>>,
topology: Arc<RwLock<Topology>>,
config: ReshardConfig,
}
#[derive(Debug, Clone)]
pub struct ReshardConfig {
pub backfill_concurrency: usize,
pub backfill_batch_size: usize,
pub throttle_docs_per_sec: u64,
pub verify_before_swap: bool,
pub retain_old_index_hours: u64,
}
impl ReshardExecutor {
/// Create a new resharding operation.
pub fn new(
index_uid: String,
old_shards: u32,
new_shards: u32,
topology: Arc<RwLock<Topology>>,
config: ReshardConfig,
) -> Self {
let id = Uuid::new_v4();
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let state = ReshardState {
id,
index_uid,
old_shards,
new_shards,
phase: Phase::Idle,
shadow_index: None,
started_at: now,
updated_at: now,
backfill_progress: BackfillProgress {
total_documents: 0,
processed_documents: 0,
current_shard: None,
last_cursor: None,
},
verify_result: None,
};
Self {
state: Arc::new(RwLock::new(state)),
topology,
config,
}
}
/// Get the current state.
pub async fn state(&self) -> ReshardState {
self.state.read().await.clone()
}
/// Start or advance the resharding operation.
pub async fn advance(&self) -> Result<Phase> {
let mut state = self.state.write().await;
let current_phase = state.phase;
let next_phase = match current_phase {
Phase::Idle => {
// Phase 1: Create shadow index
self.create_shadow_index(&mut state).await?;
Phase::Shadow
}
Phase::Shadow => {
// Phase 2: Start dual-write mode
self.start_dual_write(&mut state).await?;
Phase::DualWrite
}
Phase::DualWrite => {
// Phase 3: Start backfill
self.start_backfill(&mut state).await?;
Phase::Backfill
}
Phase::Backfill => {
// Check if backfill complete
if self.is_backfill_complete(&state).await? {
// Phase 4: Verify
if self.config.verify_before_swap {
self.run_verify(&mut state).await?;
Phase::Verify
} else {
// Skip verify, go straight to swap
Phase::Swap
}
} else {
// Continue backfill
self.advance_backfill(&mut state).await?;
Phase::Backfill
}
}
Phase::Verify => {
let verify_passed = state.verify_result.as_ref()
.map(|v| v.passed)
.unwrap_or(false);
if !verify_passed {
return Err(MiroirError::VerificationFailed(
"Resharding verification failed".to_string()
));
}
// Phase 5: Alias swap
self.alias_swap(&mut state).await?;
Phase::Swap
}
Phase::Swap => {
// Phase 6: Cleanup (scheduled for later)
Phase::Cleanup
}
Phase::Cleanup => {
// Operation complete
Phase::Complete
}
Phase::Complete => {
return Ok(Phase::Complete);
}
};
state.phase = next_phase;
state.updated_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
Ok(next_phase)
}
/// Phase 1: Create shadow index on all nodes.
async fn create_shadow_index(&self, state: &mut ReshardState) -> Result<()> {
let shadow_name = format!("{}__reshard_{}", state.index_uid, state.new_shards);
state.shadow_index = Some(shadow_name.clone());
// TODO: Broadcast index creation to all nodes via task store
// This will be implemented with the two-phase settings broadcast (§13.5)
tracing::info!(
index = %state.index_uid,
shadow = %shadow_name,
old_shards = state.old_shards,
new_shards = state.new_shards,
"Created shadow index"
);
Ok(())
}
/// Phase 2: Start dual-write mode.
async fn start_dual_write(&self, state: &mut ReshardState) -> Result<()> {
tracing::info!(
index = %state.index_uid,
"Started dual-write mode (old + new shard assignments)"
);
Ok(())
}
/// Phase 3: Start backfill from live to shadow.
async fn start_backfill(&self, state: &mut ReshardState) -> Result<()> {
// Get total document count for progress tracking
// TODO: Query nodes for document counts
state.backfill_progress = BackfillProgress {
total_documents: 0, // Will be updated
processed_documents: 0,
current_shard: Some(0),
last_cursor: None,
};
tracing::info!(
index = %state.index_uid,
"Started backfill"
);
Ok(())
}
/// Check if backfill is complete.
async fn is_backfill_complete(&self, state: &ReshardState) -> Result<bool> {
Ok(state.backfill_progress.current_shard
.map(|s| s >= state.old_shards)
.unwrap_or(false))
}
/// Advance backfill by processing one shard.
async fn advance_backfill(&self, state: &mut ReshardState) -> Result<()> {
let shard_id = state.backfill_progress.current_shard.unwrap_or(0);
// TODO: Paginated fetch from live index with filter=_miroir_shard={shard_id}
// Re-hash each document under new shard count
// Write to shadow index with _miroir_shard = new_shard_id
tracing::debug!(
index = %state.index_uid,
shard = shard_id,
"Backfilling shard"
);
state.backfill_progress.processed_documents += self.config.backfill_batch_size as u64;
state.backfill_progress.current_shard = Some(shard_id + 1);
// Apply throttling
if self.config.throttle_docs_per_sec > 0 {
let delay_ms = (self.config.backfill_batch_size as u64 * 1000)
/ self.config.throttle_docs_per_sec;
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
}
Ok(())
}
/// Phase 4: Verify shadow index matches live index.
async fn run_verify(&self, state: &mut ReshardState) -> Result<()> {
tracing::info!(
index = %state.index_uid,
"Running cross-index verification"
);
let mismatches = Vec::new();
// For each shard in both old and new indexes:
// 1. Fetch all primary keys
// 2. Compare content hashes
// 3. Report mismatches
// TODO: Implement bucketed Merkle comparison
// This reuses §13.8's bucketed-Merkle machinery
state.verify_result = Some(VerifyResult {
passed: mismatches.is_empty(),
mismatches,
fingerprint_live: "".to_string(), // TODO: compute actual fingerprint
fingerprint_shadow: "".to_string(),
});
Ok(())
}
/// Phase 5: Atomic alias swap.
async fn alias_swap(&self, state: &mut ReshardState) -> Result<()> {
let shadow_name = state.shadow_index.as_ref()
.ok_or_else(|| MiroirError::InvalidState("Shadow index not created".to_string()))?;
tracing::info!(
index = %state.index_uid,
shadow = %shadow_name,
"Performing atomic alias swap"
);
// TODO: Use §13.7 atomic alias flip
// PUT /_miroir/aliases/{index_uid} {"target": shadow_name}
Ok(())
}
/// Rollback the resharding operation (before phase 5).
pub async fn rollback(&self) -> Result<()> {
let mut state = self.state.write().await;
if state.phase >= Phase::Swap {
return Err(MiroirError::InvalidState(
"Cannot rollback after alias swap".to_string()
));
}
// Delete shadow index
if let Some(ref shadow) = state.shadow_index {
tracing::info!(
index = %state.index_uid,
shadow = %shadow,
"Rolling back: deleting shadow index"
);
// TODO: Broadcast DELETE /indexes/{shadow}
}
state.phase = Phase::Complete;
state.updated_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
Ok(())
}
}
/// Phase of resharding operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum Phase {
Idle,
Shadow,
DualWrite,
Backfill,
Verify,
Swap,
Cleanup,
Complete,
}

View file

@ -9,9 +9,11 @@ mod auth;
mod client;
mod error_response;
mod middleware;
mod retry_cache;
mod routes;
mod scatter;
mod state;
mod task_manager;
use routes::{admin, documents, health, indexes, search, settings, tasks};
use state::ProxyState;
@ -51,7 +53,7 @@ async fn main() -> anyhow::Result<()> {
.nest("/admin", admin::router())
.nest("/_miroir", admin::miroir_router())
.layer(axum::extract::DefaultBodyLimit::max(
state.config.server.max_body_bytes,
state.config.server.max_body_bytes as usize,
))
.layer(axum::middleware::from_fn_with_state(state.clone(), auth_middleware))
.layer(axum::middleware::from_fn_with_state(state.clone(), prometheus_middleware))
@ -67,12 +69,16 @@ async fn main() -> anyhow::Result<()> {
info!("listening on {}", main_addr);
info!("metrics on {}", metrics_addr);
// Metrics server (prometheus format)
let metrics_router = Router::new().route("/metrics", get(admin::get_metrics));
let metrics_server = axum::serve(tokio::net::TcpListener::bind(metrics_addr).await?, metrics_router);
// Metrics server (prometheus format) - with state
let metrics_router = Router::new()
.route("/metrics", get(admin::get_metrics))
.with_state(state.clone());
let metrics_listener = tokio::net::TcpListener::bind(metrics_addr).await?;
let metrics_server = axum::serve(metrics_listener, metrics_router);
// Main server
let main_server = axum::serve(tokio::net::TcpListener::bind(main_addr).await?, app);
let main_listener = tokio::net::TcpListener::bind(main_addr).await?;
let main_server = axum::serve(main_listener, app);
tokio::select! {
_ = main_server => {}