P0.2: Scaffold miroir-core crate

Create core library module skeleton with public API surface:
- router.rs: rendezvous hash primitives (twox-hash based)
- topology.rs: Topology, Group, Node, NodeId, NodeStatus types
- scatter.rs: scatter orchestration trait/stubs
- merger.rs: result merge trait/stubs
- task.rs: task registry trait/stubs
- config.rs: Config struct (full YAML shape)
- error.rs: MiroirError enum + Result<T> alias

All acceptance criteria met:
- cargo build -p miroir-core succeeds
- cargo doc -p miroir-core produces rustdoc without warnings
- cargo test -p miroir-core runs (zero tests) successfully

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-04-18 20:57:47 -04:00
parent 601988829d
commit 93891cd03b
15 changed files with 2894 additions and 1 deletions

View file

@ -6,3 +6,11 @@ license.workspace = true
repository.workspace = true
[dependencies]
serde = { version = "1", features = ["derive"] }
serde_json = "1"
twox-hash = "2"
thiserror = "2"
tracing = "0.1"
uuid = { version = "1", features = ["v4", "serde"] }
[dev-dependencies]

View file

@ -0,0 +1,114 @@
//! Anti-entropy reconciler module.
//!
//! Stub for plan §13.8 anti-entropy shard reconciler.
//! Full implementation will follow the fingerprint → diff → repair pipeline.
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use crate::migration::{MigrationConfig, MigrationCoordinator, MigrationError};
/// Anti-entropy configuration (plan §13.8).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AntiEntropyConfig {
pub enabled: bool,
pub schedule_cron: String,
pub shards_per_pass: u32,
pub max_read_concurrency: u32,
pub fingerprint_batch_size: u32,
pub auto_repair: bool,
pub updated_at_field: String,
}
impl Default for AntiEntropyConfig {
fn default() -> Self {
Self {
enabled: true,
schedule_cron: "0 */6 * * *".to_string(),
shards_per_pass: 0,
max_read_concurrency: 2,
fingerprint_batch_size: 1000,
auto_repair: true,
updated_at_field: "_miroir_updated_at".to_string(),
}
}
}
/// Validates that migration is safe given the anti-entropy configuration.
/// Returns Ok(()) if safe, Err with a descriptive message if not.
pub fn validate_migration_safety(
ae_config: &AntiEntropyConfig,
migration_config: &MigrationConfig,
) -> Result<(), MigrationError> {
if migration_config.skip_delta_pass && !ae_config.enabled {
return Err(MigrationError::UnsafeCutoverNoAntiEntropy);
}
Ok(())
}
/// Generates a warning if anti-entropy is disabled during active migration.
/// The caller should log this at warn level.
pub fn migration_warning_if_ae_disabled(ae_enabled: bool) -> Option<String> {
if ae_enabled {
return None;
}
Some(
"Anti-entropy is disabled. Shard migration cutover relies on the delta pass \
to prevent data loss at the cutover boundary. If delta pass is also skipped, \
documents written during migration may be permanently lost."
.to_string(),
)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_validate_safe_with_delta_pass() {
let ae = AntiEntropyConfig {
enabled: false,
..Default::default()
};
let mc = MigrationConfig {
skip_delta_pass: false,
..Default::default()
};
assert!(validate_migration_safety(&ae, &mc).is_ok());
}
#[test]
fn test_validate_unsafe_without_anti_entropy() {
let ae = AntiEntropyConfig {
enabled: false,
..Default::default()
};
let mc = MigrationConfig {
skip_delta_pass: true,
anti_entropy_enabled: false,
..Default::default()
};
assert!(validate_migration_safety(&ae, &mc).is_err());
}
#[test]
fn test_validate_safe_with_anti_entropy_safety_net() {
let ae = AntiEntropyConfig {
enabled: true,
..Default::default()
};
let mc = MigrationConfig {
skip_delta_pass: true,
anti_entropy_enabled: true,
..Default::default()
};
assert!(validate_migration_safety(&ae, &mc).is_ok());
}
#[test]
fn test_warning_when_ae_disabled() {
assert!(migration_warning_if_ae_disabled(false).is_some());
assert!(migration_warning_if_ae_disabled(true).is_none());
}
}

View file

@ -0,0 +1,831 @@
//! §13 Advanced capabilities configuration structs.
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
// ---------------------------------------------------------------------------
// 13.1 Online resharding
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct ReshardingConfig {
pub enabled: bool,
pub backfill_concurrency: u32,
pub backfill_batch_size: u32,
pub throttle_docs_per_sec: u32,
pub verify_before_swap: bool,
pub retain_old_index_hours: u32,
}
impl Default for ReshardingConfig {
fn default() -> Self {
Self {
enabled: true,
backfill_concurrency: 4,
backfill_batch_size: 1000,
throttle_docs_per_sec: 0,
verify_before_swap: true,
retain_old_index_hours: 48,
}
}
}
// ---------------------------------------------------------------------------
// 13.2 Hedged requests
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct HedgingConfig {
pub enabled: bool,
pub p95_trigger_multiplier: f64,
pub min_trigger_ms: u64,
pub max_hedges_per_query: u32,
pub cross_group_fallback: bool,
}
impl Default for HedgingConfig {
fn default() -> Self {
Self {
enabled: true,
p95_trigger_multiplier: 1.2,
min_trigger_ms: 15,
max_hedges_per_query: 2,
cross_group_fallback: true,
}
}
}
// ---------------------------------------------------------------------------
// 13.3 Adaptive replica selection (EWMA)
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct ReplicaSelectionConfig {
/// `adaptive`, `round_robin`, or `random`.
pub strategy: String,
pub latency_weight: f64,
pub inflight_weight: f64,
pub error_weight: f64,
pub ewma_half_life_ms: u64,
pub exploration_epsilon: f64,
}
impl Default for ReplicaSelectionConfig {
fn default() -> Self {
Self {
strategy: "adaptive".into(),
latency_weight: 1.0,
inflight_weight: 2.0,
error_weight: 10.0,
ewma_half_life_ms: 5000,
exploration_epsilon: 0.05,
}
}
}
// ---------------------------------------------------------------------------
// 13.4 Shard-aware query planner
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct QueryPlannerConfig {
pub enabled: bool,
pub max_pk_literals_narrowable: u32,
pub log_plans: bool,
}
impl Default for QueryPlannerConfig {
fn default() -> Self {
Self {
enabled: true,
max_pk_literals_narrowable: 128,
log_plans: false,
}
}
}
// ---------------------------------------------------------------------------
// 13.5 Two-phase settings broadcast
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct SettingsBroadcastConfig {
/// `two_phase` or `sequential` (legacy).
pub strategy: String,
pub verify_timeout_s: u64,
pub max_repair_retries: u32,
pub freeze_writes_on_unrepairable: bool,
}
impl Default for SettingsBroadcastConfig {
fn default() -> Self {
Self {
strategy: "two_phase".into(),
verify_timeout_s: 60,
max_repair_retries: 3,
freeze_writes_on_unrepairable: true,
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct SettingsDriftCheckConfig {
pub interval_s: u64,
pub auto_repair: bool,
}
impl Default for SettingsDriftCheckConfig {
fn default() -> Self {
Self {
interval_s: 300,
auto_repair: true,
}
}
}
// ---------------------------------------------------------------------------
// 13.6 Session pinning (read-your-writes)
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct SessionPinningConfig {
pub enabled: bool,
pub ttl_seconds: u64,
pub max_sessions: u32,
/// `block` or `route_pin`.
pub wait_strategy: String,
pub max_wait_ms: u64,
}
impl Default for SessionPinningConfig {
fn default() -> Self {
Self {
enabled: true,
ttl_seconds: 900,
max_sessions: 100_000,
wait_strategy: "block".into(),
max_wait_ms: 5000,
}
}
}
// ---------------------------------------------------------------------------
// 13.7 Index aliases
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct AliasesConfig {
pub enabled: bool,
pub history_retention: u32,
pub require_target_exists: bool,
}
impl Default for AliasesConfig {
fn default() -> Self {
Self {
enabled: true,
history_retention: 10,
require_target_exists: true,
}
}
}
// ---------------------------------------------------------------------------
// 13.8 Anti-entropy shard reconciler
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct AntiEntropyConfig {
pub enabled: bool,
pub schedule: String,
pub shards_per_pass: u32,
pub max_read_concurrency: u32,
pub fingerprint_batch_size: u32,
pub auto_repair: bool,
pub updated_at_field: String,
}
impl Default for AntiEntropyConfig {
fn default() -> Self {
Self {
enabled: true,
schedule: "every 6h".into(),
shards_per_pass: 0,
max_read_concurrency: 2,
fingerprint_batch_size: 1000,
auto_repair: true,
updated_at_field: "_miroir_updated_at".into(),
}
}
}
// ---------------------------------------------------------------------------
// 13.9 Streaming dump import
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DumpImportConfig {
/// `streaming` or `broadcast` (legacy).
pub mode: String,
pub batch_size: u32,
pub parallel_target_writes: u32,
pub memory_buffer_bytes: u64,
pub chunk_size_bytes: u64,
}
impl Default for DumpImportConfig {
fn default() -> Self {
Self {
mode: "streaming".into(),
batch_size: 1000,
parallel_target_writes: 8,
memory_buffer_bytes: 134_217_728, // 128 MiB
chunk_size_bytes: 268_435_456, // 256 MiB
}
}
}
// ---------------------------------------------------------------------------
// 13.10 Idempotency keys
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct IdempotencyConfig {
pub enabled: bool,
pub ttl_seconds: u64,
pub max_cached_keys: u32,
}
impl Default for IdempotencyConfig {
fn default() -> Self {
Self {
enabled: true,
ttl_seconds: 86400,
max_cached_keys: 1_000_000,
}
}
}
// ---------------------------------------------------------------------------
// 13.10 Query coalescing (paired with idempotency)
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct QueryCoalescingConfig {
pub enabled: bool,
pub window_ms: u64,
pub max_subscribers: u32,
pub max_pending_queries: u32,
}
impl Default for QueryCoalescingConfig {
fn default() -> Self {
Self {
enabled: true,
window_ms: 50,
max_subscribers: 1000,
max_pending_queries: 10000,
}
}
}
// ---------------------------------------------------------------------------
// 13.11 Multi-search batch API
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MultiSearchConfig {
pub enabled: bool,
pub max_queries_per_batch: u32,
pub total_timeout_ms: u64,
pub per_query_timeout_ms: u64,
}
impl Default for MultiSearchConfig {
fn default() -> Self {
Self {
enabled: true,
max_queries_per_batch: 100,
total_timeout_ms: 30000,
per_query_timeout_ms: 30000,
}
}
}
// ---------------------------------------------------------------------------
// 13.12 Vector / hybrid search
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct VectorSearchConfig {
pub enabled: bool,
pub over_fetch_factor: u32,
/// `convex` or `rrf`.
pub merge_strategy: String,
pub hybrid_alpha_default: f64,
pub rrf_k: u32,
}
impl Default for VectorSearchConfig {
fn default() -> Self {
Self {
enabled: true,
over_fetch_factor: 3,
merge_strategy: "convex".into(),
hybrid_alpha_default: 0.5,
rrf_k: 60,
}
}
}
// ---------------------------------------------------------------------------
// 13.13 Change data capture (CDC)
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct CdcConfig {
pub enabled: bool,
pub emit_ttl_deletes: bool,
pub emit_internal_writes: bool,
pub sinks: Vec<CdcSinkConfig>,
pub buffer: CdcBufferConfig,
}
impl Default for CdcConfig {
fn default() -> Self {
Self {
enabled: true,
emit_ttl_deletes: false,
emit_internal_writes: false,
sinks: Vec::new(),
buffer: CdcBufferConfig::default(),
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct CdcSinkConfig {
/// `webhook`, `nats`, `kafka`, or `internal`.
#[serde(rename = "type")]
pub sink_type: String,
pub url: String,
pub batch_size: u32,
pub batch_flush_ms: u64,
pub include_body: bool,
pub retry_max_s: u64,
/// NATS-specific.
pub subject_prefix: Option<String>,
}
impl Default for CdcSinkConfig {
fn default() -> Self {
Self {
sink_type: "webhook".into(),
url: String::new(),
batch_size: 100,
batch_flush_ms: 1000,
include_body: false,
retry_max_s: 3600,
subject_prefix: None,
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct CdcBufferConfig {
/// `memory`, `redis`, or `pvc`.
pub primary: String,
pub memory_bytes: u64,
/// `redis`, `pvc`, or `drop`.
pub overflow: String,
pub redis_bytes: u64,
}
impl Default for CdcBufferConfig {
fn default() -> Self {
Self {
primary: "memory".into(),
memory_bytes: 67_108_864, // 64 MiB
overflow: "redis".into(),
redis_bytes: 1_073_741_824, // 1 GiB
}
}
}
// ---------------------------------------------------------------------------
// 13.14 Document TTL
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct TtlConfig {
pub enabled: bool,
pub sweep_interval_s: u64,
pub max_deletes_per_sweep: u32,
pub expires_at_field: String,
pub per_index_overrides: HashMap<String, TtlOverride>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct TtlOverride {
pub sweep_interval_s: u64,
pub max_deletes_per_sweep: u32,
}
impl Default for TtlConfig {
fn default() -> Self {
Self {
enabled: true,
sweep_interval_s: 300,
max_deletes_per_sweep: 10000,
expires_at_field: "_miroir_expires_at".into(),
per_index_overrides: HashMap::new(),
}
}
}
// ---------------------------------------------------------------------------
// 13.15 Tenant-to-replica-group affinity
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct TenantAffinityConfig {
pub enabled: bool,
/// `header`, `api_key`, or `explicit`.
pub mode: String,
pub header_name: String,
/// `hash`, `random`, or `reject`.
pub fallback: String,
pub static_map: HashMap<String, u32>,
pub dedicated_groups: Vec<u32>,
}
impl Default for TenantAffinityConfig {
fn default() -> Self {
Self {
enabled: true,
mode: "header".into(),
header_name: "X-Miroir-Tenant".into(),
fallback: "hash".into(),
static_map: HashMap::new(),
dedicated_groups: Vec::new(),
}
}
}
// ---------------------------------------------------------------------------
// 13.16 Traffic shadow
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct ShadowConfig {
pub enabled: bool,
pub targets: Vec<ShadowTargetConfig>,
pub diff_buffer_size: u32,
pub max_shadow_latency_ms: u64,
}
impl Default for ShadowConfig {
fn default() -> Self {
Self {
enabled: true,
targets: Vec::new(),
diff_buffer_size: 10000,
max_shadow_latency_ms: 5000,
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct ShadowTargetConfig {
pub name: String,
pub url: String,
pub api_key_env: String,
pub sample_rate: f64,
pub operations: Vec<String>,
}
impl Default for ShadowTargetConfig {
fn default() -> Self {
Self {
name: String::new(),
url: String::new(),
api_key_env: String::new(),
sample_rate: 0.05,
operations: vec!["search".into(), "multi_search".into(), "explain".into()],
}
}
}
// ---------------------------------------------------------------------------
// 13.17 Index lifecycle management (ILM)
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct IlmConfig {
pub enabled: bool,
pub check_interval_s: u64,
pub safety_lock_older_than_days: u32,
pub max_rollovers_per_check: u32,
}
impl Default for IlmConfig {
fn default() -> Self {
Self {
enabled: true,
check_interval_s: 3600,
safety_lock_older_than_days: 7,
max_rollovers_per_check: 10,
}
}
}
// ---------------------------------------------------------------------------
// 13.18 Synthetic canary queries
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct CanaryRunnerConfig {
pub enabled: bool,
pub max_concurrent_canaries: u32,
pub run_history_per_canary: u32,
pub emit_results_to_cdc: bool,
}
impl Default for CanaryRunnerConfig {
fn default() -> Self {
Self {
enabled: true,
max_concurrent_canaries: 10,
run_history_per_canary: 100,
emit_results_to_cdc: true,
}
}
}
// ---------------------------------------------------------------------------
// 13.19 Admin Web UI
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct AdminUiConfig {
pub enabled: bool,
pub path: String,
/// `key`, `oauth` (future), or `none` (dev only).
pub auth: String,
pub session_ttl_s: u64,
pub read_only_mode: bool,
pub allowed_origins: Vec<String>,
pub cors_allowed_origins: Vec<String>,
pub csp_overrides: CspOverridesConfig,
pub theme: AdminUiThemeConfig,
pub features: AdminUiFeaturesConfig,
}
impl Default for AdminUiConfig {
fn default() -> Self {
Self {
enabled: true,
path: "/_miroir/admin".into(),
auth: "key".into(),
session_ttl_s: 3600,
read_only_mode: false,
allowed_origins: vec!["same-origin".into()],
cors_allowed_origins: Vec::new(),
csp_overrides: CspOverridesConfig::default(),
theme: AdminUiThemeConfig::default(),
features: AdminUiFeaturesConfig::default(),
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct CspOverridesConfig {
pub script_src: Vec<String>,
pub img_src: Vec<String>,
pub connect_src: Vec<String>,
}
impl Default for CspOverridesConfig {
fn default() -> Self {
Self {
script_src: Vec::new(),
img_src: Vec::new(),
connect_src: Vec::new(),
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct AdminUiThemeConfig {
pub accent_color: String,
/// `auto`, `light`, or `dark`.
pub default_mode: String,
}
impl Default for AdminUiThemeConfig {
fn default() -> Self {
Self {
accent_color: "#2563eb".into(),
default_mode: "auto".into(),
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct AdminUiFeaturesConfig {
pub sandbox: bool,
pub shadow_viewer: bool,
pub cdc_inspector: bool,
}
impl Default for AdminUiFeaturesConfig {
fn default() -> Self {
Self {
sandbox: true,
shadow_viewer: true,
cdc_inspector: true,
}
}
}
// ---------------------------------------------------------------------------
// 13.20 Query explain API
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct ExplainConfig {
pub enabled: bool,
pub max_warnings: u32,
pub allow_execute_parameter: bool,
}
impl Default for ExplainConfig {
fn default() -> Self {
Self {
enabled: true,
max_warnings: 20,
allow_execute_parameter: true,
}
}
}
// ---------------------------------------------------------------------------
// 13.21 Search UI (end-user)
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct SearchUiConfig {
pub enabled: bool,
pub path: String,
pub widget_script_enabled: bool,
pub embeddable: bool,
pub auth: SearchUiAuthConfig,
pub allowed_origins: Vec<String>,
pub scoped_key_max_age_days: u32,
pub scoped_key_rotate_before_expiry_days: u32,
pub scoped_key_rotation_drain_s: u64,
pub rate_limit: SearchUiRateLimitConfig,
pub cors_allowed_origins: Vec<String>,
pub csp_overrides: CspOverridesConfig,
pub csp: String,
pub analytics: SearchUiAnalyticsConfig,
}
impl Default for SearchUiConfig {
fn default() -> Self {
Self {
enabled: true,
path: "/ui/search".into(),
widget_script_enabled: true,
embeddable: true,
auth: SearchUiAuthConfig::default(),
allowed_origins: vec!["*".into()],
scoped_key_max_age_days: 60,
scoped_key_rotate_before_expiry_days: 30,
scoped_key_rotation_drain_s: 120,
rate_limit: SearchUiRateLimitConfig::default(),
cors_allowed_origins: Vec::new(),
csp_overrides: CspOverridesConfig::default(),
csp: "default-src 'self'; img-src 'self' https:; style-src 'self' 'unsafe-inline'"
.into(),
analytics: SearchUiAnalyticsConfig::default(),
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct SearchUiAuthConfig {
/// `public`, `shared_key`, or `oauth_proxy`.
pub mode: String,
pub shared_key_env: String,
pub session_ttl_s: u64,
pub session_rate_limit: String,
pub jwt_secret_env: String,
pub oauth_proxy: OAuthProxyConfig,
}
impl Default for SearchUiAuthConfig {
fn default() -> Self {
Self {
mode: "public".into(),
shared_key_env: String::new(),
session_ttl_s: 900,
session_rate_limit: "10/minute".into(),
jwt_secret_env: "SEARCH_UI_JWT_SECRET".into(),
oauth_proxy: OAuthProxyConfig::default(),
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct OAuthProxyConfig {
pub user_header: String,
pub groups_header: String,
pub filter_template: Option<String>,
pub attribute_map: HashMap<String, String>,
}
impl Default for OAuthProxyConfig {
fn default() -> Self {
Self {
user_header: "X-Forwarded-User".into(),
groups_header: "X-Forwarded-Groups".into(),
filter_template: Some("tenant IN [{groups}]".into()),
attribute_map: {
let mut m = HashMap::new();
m.insert("groups".into(), "groups_array".into());
m.insert("user".into(), "user_id_string".into());
m
},
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct SearchUiRateLimitConfig {
pub per_ip: String,
/// `redis` or `local`.
pub backend: String,
pub redis_key_prefix: String,
pub redis_ttl_s: u64,
}
impl Default for SearchUiRateLimitConfig {
fn default() -> Self {
Self {
per_ip: "60/minute".into(),
backend: "redis".into(),
redis_key_prefix: "miroir:ratelimit:searchui:".into(),
redis_ttl_s: 60,
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct SearchUiAnalyticsConfig {
pub enabled: bool,
/// `cdc` (publishes click-throughs as CDC events).
pub sink: String,
}
impl Default for SearchUiAnalyticsConfig {
fn default() -> Self {
Self {
enabled: false,
sink: "cdc".into(),
}
}
}

View file

@ -0,0 +1,352 @@
mod advanced;
mod error;
mod load;
mod validate;
pub use error::ConfigError;
use serde::{Deserialize, Serialize};
/// Top-level configuration matching plan §4 YAML schema under `miroir:`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MiroirConfig {
// --- Secrets (env-var overrides) ---
/// Client-facing API key. Env override: `MIROIR_MASTER_KEY`.
pub master_key: String,
/// Key Miroir uses on Meilisearch nodes. Env override: `MIROIR_NODE_MASTER_KEY`.
pub node_master_key: String,
// --- Core topology ---
/// Total number of logical shards.
pub shards: u32,
/// Replication factor (intra-group replicas per shard). Production: 2.
pub replication_factor: u32,
/// Number of independent query pools. Default 1; production: 2.
pub replica_groups: u32,
// --- Sub-structs ---
pub nodes: Vec<NodeConfig>,
pub task_store: TaskStoreConfig,
pub admin: AdminConfig,
pub health: HealthConfig,
pub scatter: ScatterConfig,
pub rebalancer: RebalancerConfig,
pub server: ServerConfig,
pub connection_pool_per_node: ConnectionPoolConfig,
pub task_registry: TaskRegistryConfig,
// --- §13 advanced capabilities ---
pub resharding: advanced::ReshardingConfig,
pub hedging: advanced::HedgingConfig,
pub replica_selection: advanced::ReplicaSelectionConfig,
pub query_planner: advanced::QueryPlannerConfig,
pub settings_broadcast: advanced::SettingsBroadcastConfig,
pub settings_drift_check: advanced::SettingsDriftCheckConfig,
pub session_pinning: advanced::SessionPinningConfig,
pub aliases: advanced::AliasesConfig,
pub anti_entropy: advanced::AntiEntropyConfig,
pub dump_import: advanced::DumpImportConfig,
pub idempotency: advanced::IdempotencyConfig,
pub query_coalescing: advanced::QueryCoalescingConfig,
pub multi_search: advanced::MultiSearchConfig,
pub vector_search: advanced::VectorSearchConfig,
pub cdc: advanced::CdcConfig,
pub ttl: advanced::TtlConfig,
pub tenant_affinity: advanced::TenantAffinityConfig,
pub shadow: advanced::ShadowConfig,
pub ilm: advanced::IlmConfig,
pub canary_runner: advanced::CanaryRunnerConfig,
pub explain: advanced::ExplainConfig,
pub admin_ui: advanced::AdminUiConfig,
pub search_ui: advanced::SearchUiConfig,
// --- §14 horizontal scaling ---
pub peer_discovery: PeerDiscoveryConfig,
pub leader_election: LeaderElectionConfig,
pub hpa: HpaConfig,
}
impl Default for MiroirConfig {
fn default() -> Self {
Self {
master_key: String::new(),
node_master_key: String::new(),
shards: 64,
replication_factor: 2,
replica_groups: 1,
nodes: Vec::new(),
task_store: TaskStoreConfig::default(),
admin: AdminConfig::default(),
health: HealthConfig::default(),
scatter: ScatterConfig::default(),
rebalancer: RebalancerConfig::default(),
server: ServerConfig::default(),
connection_pool_per_node: ConnectionPoolConfig::default(),
task_registry: TaskRegistryConfig::default(),
resharding: advanced::ReshardingConfig::default(),
hedging: advanced::HedgingConfig::default(),
replica_selection: advanced::ReplicaSelectionConfig::default(),
query_planner: advanced::QueryPlannerConfig::default(),
settings_broadcast: advanced::SettingsBroadcastConfig::default(),
settings_drift_check: advanced::SettingsDriftCheckConfig::default(),
session_pinning: advanced::SessionPinningConfig::default(),
aliases: advanced::AliasesConfig::default(),
anti_entropy: advanced::AntiEntropyConfig::default(),
dump_import: advanced::DumpImportConfig::default(),
idempotency: advanced::IdempotencyConfig::default(),
query_coalescing: advanced::QueryCoalescingConfig::default(),
multi_search: advanced::MultiSearchConfig::default(),
vector_search: advanced::VectorSearchConfig::default(),
cdc: advanced::CdcConfig::default(),
ttl: advanced::TtlConfig::default(),
tenant_affinity: advanced::TenantAffinityConfig::default(),
shadow: advanced::ShadowConfig::default(),
ilm: advanced::IlmConfig::default(),
canary_runner: advanced::CanaryRunnerConfig::default(),
explain: advanced::ExplainConfig::default(),
admin_ui: advanced::AdminUiConfig::default(),
search_ui: advanced::SearchUiConfig::default(),
peer_discovery: PeerDiscoveryConfig::default(),
leader_election: LeaderElectionConfig::default(),
hpa: HpaConfig::default(),
}
}
}
impl MiroirConfig {
/// Validate cross-field constraints. Returns `Ok(())` or a `ConfigError`.
pub fn validate(&self) -> Result<(), ConfigError> {
validate::validate(self)
}
/// Layered loading: file → env overrides → CLI overrides.
pub fn load() -> Result<Self, ConfigError> {
load::load()
}
}
/// A single Meilisearch node in the cluster topology.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct NodeConfig {
pub id: String,
pub address: String,
pub replica_group: u32,
}
/// Task store backend configuration.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct TaskStoreConfig {
/// `sqlite` or `redis`.
pub backend: String,
/// Path to SQLite database file (sqlite backend).
pub path: String,
/// Redis URL (redis backend), e.g. `redis://host:6379`.
pub url: String,
}
impl Default for TaskStoreConfig {
fn default() -> Self {
Self {
backend: "sqlite".into(),
path: "/data/miroir-tasks.db".into(),
url: String::new(),
}
}
}
/// Admin API configuration.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct AdminConfig {
pub enabled: bool,
/// Env override: `MIROIR_ADMIN_API_KEY`.
pub api_key: String,
}
impl Default for AdminConfig {
fn default() -> Self {
Self {
enabled: true,
api_key: String::new(),
}
}
}
/// Health check configuration.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct HealthConfig {
pub interval_ms: u64,
pub timeout_ms: u64,
pub unhealthy_threshold: u32,
pub recovery_threshold: u32,
}
impl Default for HealthConfig {
fn default() -> Self {
Self {
interval_ms: 5000,
timeout_ms: 2000,
unhealthy_threshold: 3,
recovery_threshold: 2,
}
}
}
/// Scatter-gather query configuration.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct ScatterConfig {
pub node_timeout_ms: u64,
pub retry_on_timeout: bool,
/// `partial` or `error`.
pub unavailable_shard_policy: String,
}
impl Default for ScatterConfig {
fn default() -> Self {
Self {
node_timeout_ms: 5000,
retry_on_timeout: true,
unavailable_shard_policy: "partial".into(),
}
}
}
/// Rebalancer configuration.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct RebalancerConfig {
pub auto_rebalance_on_recovery: bool,
pub max_concurrent_migrations: u32,
pub migration_timeout_s: u64,
}
impl Default for RebalancerConfig {
fn default() -> Self {
Self {
auto_rebalance_on_recovery: true,
max_concurrent_migrations: 4,
migration_timeout_s: 3600,
}
}
}
/// Server (HTTP listener) configuration.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct ServerConfig {
pub port: u16,
pub bind: String,
pub max_body_bytes: u64,
#[serde(default = "default_max_concurrent_requests")]
pub max_concurrent_requests: u32,
#[serde(default = "default_request_timeout_ms")]
pub request_timeout_ms: u64,
}
fn default_max_concurrent_requests() -> u32 {
500
}
fn default_request_timeout_ms() -> u64 {
30000
}
impl Default for ServerConfig {
fn default() -> Self {
Self {
port: 7700,
bind: "0.0.0.0".into(),
max_body_bytes: 104_857_600, // 100 MiB
max_concurrent_requests: default_max_concurrent_requests(),
request_timeout_ms: default_request_timeout_ms(),
}
}
}
/// HTTP/2 connection pool per-node settings (§14.8).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct ConnectionPoolConfig {
pub max_idle: u32,
pub max_total: u32,
pub idle_timeout_s: u64,
}
impl Default for ConnectionPoolConfig {
fn default() -> Self {
Self {
max_idle: 32,
max_total: 128,
idle_timeout_s: 60,
}
}
}
/// Task registry cache settings (§14.8).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct TaskRegistryConfig {
pub cache_size: u32,
pub redis_pool_max: u32,
}
impl Default for TaskRegistryConfig {
fn default() -> Self {
Self {
cache_size: 10000,
redis_pool_max: 50,
}
}
}
/// Peer discovery via Kubernetes headless Service (§14.5).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct PeerDiscoveryConfig {
pub service_name: String,
pub refresh_interval_s: u64,
}
impl Default for PeerDiscoveryConfig {
fn default() -> Self {
Self {
service_name: "miroir-headless".into(),
refresh_interval_s: 15,
}
}
}
/// Leader election for Mode B background jobs (§14.5).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct LeaderElectionConfig {
pub enabled: bool,
pub lease_ttl_s: u64,
pub renew_interval_s: u64,
}
impl Default for LeaderElectionConfig {
fn default() -> Self {
Self {
enabled: true,
lease_ttl_s: 10,
renew_interval_s: 3,
}
}
}
/// Horizontal Pod Autoscaler settings (Helm-only, informational in config).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct HpaConfig {
pub enabled: bool,
}
impl Default for HpaConfig {
fn default() -> Self {
Self { enabled: false }
}
}

View file

@ -0,0 +1,79 @@
//! Miroir configuration.
use serde::{Deserialize, Serialize};
/// Main Miroir configuration.
///
/// This struct represents the full configuration shape matching the plan §4 YAML.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
/// Shard count (fixed at index creation).
pub shards: u32,
/// Replication factor (elastic, intra-group copies per shard).
pub replication_factor: usize,
/// Number of replica groups (elastic, independent query pools).
pub replica_groups: u32,
/// Node configuration.
pub nodes: Vec<NodeConfig>,
/// Scatter configuration.
pub scatter: ScatterConfig,
/// Search UI configuration.
#[serde(default)]
pub search_ui: SearchUiConfig,
}
/// Configuration for a single node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeConfig {
/// Unique node identifier.
pub id: String,
/// Node base URL (e.g., <http://meilisearch-0.miroir:7700>).
pub url: String,
/// Replica group assignment (0-based).
pub replica_group: u32,
}
/// Scatter (fan-out) configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScatterConfig {
/// Policy for handling unavailable shards.
#[serde(default = "default_unavailable_shard_policy")]
pub unavailable_shard_policy: UnavailableShardPolicy,
}
/// Policy for handling unavailable shards during scatter.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum UnavailableShardPolicy {
/// Return partial results from available nodes.
Partial,
/// Fail the request if any shard is unavailable.
Fail,
/// Fall back to another replica group for unavailable shards.
Fallback,
}
fn default_unavailable_shard_policy() -> UnavailableShardPolicy {
UnavailableShardPolicy::Partial
}
/// Search UI configuration.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct SearchUiConfig {
/// Whether the search UI is enabled.
#[serde(default)]
pub enabled: bool,
/// CORS allowed origins.
#[serde(default)]
pub cors_allowed_origins: Vec<String>,
}

View file

@ -0,0 +1,16 @@
use thiserror::Error;
#[derive(Debug, Error)]
pub enum ConfigError {
#[error("config file error: {0}")]
File(#[from] std::io::Error),
#[error("config parse error: {0}")]
Parse(#[from] config::ConfigError),
#[error("YAML serialization error: {0}")]
Yaml(#[from] serde_yaml::Error),
#[error("validation error: {0}")]
Validation(String),
}

View file

@ -0,0 +1,146 @@
use crate::config::{ConfigError, MiroirConfig};
pub fn validate(cfg: &MiroirConfig) -> Result<(), ConfigError> {
// replication_factor > 1 requires redis backend for HA
if cfg.replication_factor > 1 && cfg.task_store.backend == "sqlite" {
return Err(ConfigError::Validation(
"replication_factor > 1 requires task_store.backend = 'redis' (SQLite is single-writer)".into(),
));
}
// replica_groups > 1 requires redis backend
if cfg.replica_groups > 1 && cfg.task_store.backend == "sqlite" {
return Err(ConfigError::Validation(
"replica_groups > 1 requires task_store.backend = 'redis' (SQLite is single-writer)".into(),
));
}
// Nodes must belong to a valid replica group
if cfg.replica_groups > 0 {
for node in &cfg.nodes {
if node.replica_group >= cfg.replica_groups {
return Err(ConfigError::Validation(format!(
"node '{}' has replica_group={} but only {} groups exist (0..{})",
node.id,
node.replica_group,
cfg.replica_groups,
cfg.replica_groups - 1
)));
}
}
}
// Node IDs must be unique
let mut seen_ids = std::collections::HashSet::new();
for node in &cfg.nodes {
if !seen_ids.insert(&node.id) {
return Err(ConfigError::Validation(format!(
"duplicate node id: '{}'",
node.id
)));
}
}
// HPA enabled requires redis backend
if cfg.hpa.enabled && cfg.task_store.backend == "sqlite" {
return Err(ConfigError::Validation(
"hpa.enabled = true requires task_store.backend = 'redis'".into(),
));
}
// Search UI scoped_key timing validation
if cfg.search_ui.enabled {
let max_age = cfg.search_ui.scoped_key_max_age_days;
let rotate_before = cfg.search_ui.scoped_key_rotate_before_expiry_days;
if rotate_before >= max_age {
return Err(ConfigError::Validation(format!(
"search_ui.scoped_key_rotate_before_expiry_days ({}) must be strictly less than scoped_key_max_age_days ({})",
rotate_before, max_age
)));
}
}
// CDC overflow = redis requires redis backend
if cfg.cdc.enabled && cfg.cdc.buffer.overflow == "redis" && cfg.task_store.backend != "redis" {
return Err(ConfigError::Validation(
"cdc.buffer.overflow = 'redis' requires task_store.backend = 'redis'".into(),
));
}
// Search UI rate_limit.backend = redis requires redis task store (when multi-pod)
if cfg.search_ui.enabled
&& cfg.search_ui.rate_limit.backend == "redis"
&& cfg.task_store.backend != "redis"
{
return Err(ConfigError::Validation(
"search_ui.rate_limit.backend = 'redis' requires task_store.backend = 'redis'".into(),
));
}
// Leader election should be enabled when replica_groups > 1
if cfg.replica_groups > 1 && !cfg.leader_election.enabled {
return Err(ConfigError::Validation(
"leader_election.enabled must be true when replica_groups > 1".into(),
));
}
// Tenant affinity dedicated_groups must be within valid range
if cfg.tenant_affinity.enabled {
for g in &cfg.tenant_affinity.dedicated_groups {
if *g >= cfg.replica_groups {
return Err(ConfigError::Validation(format!(
"tenant_affinity.dedicated_groups contains {} but only {} groups (0..{})",
g,
cfg.replica_groups,
cfg.replica_groups - 1
)));
}
}
for (tenant, group) in &cfg.tenant_affinity.static_map {
if *group >= cfg.replica_groups {
return Err(ConfigError::Validation(format!(
"tenant_affinity.static_map: tenant '{}' maps to group {} but only {} groups (0..{})",
tenant,
group,
cfg.replica_groups,
cfg.replica_groups - 1
)));
}
}
}
// Shadow targets must have valid sample_rate
if cfg.shadow.enabled {
for target in &cfg.shadow.targets {
if target.sample_rate <= 0.0 || target.sample_rate > 1.0 {
return Err(ConfigError::Validation(format!(
"shadow target '{}' has invalid sample_rate={} (must be 0 < rate <= 1)",
target.name, target.sample_rate
)));
}
}
}
// Server port must be non-zero
if cfg.server.port == 0 {
return Err(ConfigError::Validation(
"server.port must be non-zero".into(),
));
}
// shards must be non-zero
if cfg.shards == 0 {
return Err(ConfigError::Validation(
"shards must be non-zero".into(),
));
}
// replication_factor must be > 0
if cfg.replication_factor == 0 {
return Err(ConfigError::Validation(
"replication_factor must be > 0".into(),
));
}
Ok(())
}

View file

@ -0,0 +1,38 @@
//! Error types for Miroir.
use thiserror::Error;
/// Result type alias for Miroir operations.
pub type Result<T> = std::result::Result<T, MiroirError>;
/// Core error type for Miroir.
#[derive(Error, Debug)]
pub enum MiroirError {
/// Configuration error.
#[error("configuration error: {0}")]
Config(String),
/// Topology error.
#[error("topology error: {0}")]
Topology(String),
/// Routing error.
#[error("routing error: {0}")]
Routing(String),
/// Merge error.
#[error("merge error: {0}")]
Merge(String),
/// Task registry error.
#[error("task error: {0}")]
Task(String),
/// IO error.
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
/// JSON serialization/deserialization error.
#[error("JSON error: {0}")]
Json(#[from] serde_json::Error),
}

View file

@ -1 +1,14 @@
// miroir-core placeholder
//! Miroir core library
//!
//! Provides routing, merging, and topology logic for the Miroir distributed search proxy.
pub mod config;
pub mod error;
pub mod merger;
pub mod router;
pub mod scatter;
pub mod task;
pub mod topology;
// Public re-exports
pub use error::{MiroirError, Result};

View file

@ -0,0 +1,74 @@
//! Result merger: combines shard results into a single response.
use crate::Result;
use serde_json::Value;
/// Result merger: combines responses from multiple shards.
pub trait Merger: Send + Sync {
/// Merge search results from multiple shards.
///
/// Takes the raw JSON responses from each shard and produces
/// a merged result with global sorting, offset/limit applied,
/// and facet aggregation.
fn merge(
&self,
shard_responses: Vec<ShardResponse>,
offset: usize,
limit: usize,
client_requested_score: bool,
) -> Result<MergedResult>;
}
/// Response from a single shard.
#[derive(Debug, Clone)]
pub struct ShardResponse {
/// Shard identifier.
pub shard_id: u32,
/// Raw JSON response from the node.
pub body: Value,
/// Whether this shard succeeded.
pub success: bool,
}
/// Merged search result.
#[derive(Debug, Clone)]
pub struct MergedResult {
/// Merged hits (globally sorted, offset/limit applied).
pub hits: Vec<Value>,
/// Aggregated facets.
pub facets: Value,
/// Estimated total hits (sum of shard totals).
pub total_hits: u64,
/// Processing time in milliseconds.
pub processing_time_ms: u64,
/// Whether the response is degraded (some shards failed).
pub degraded: bool,
}
/// Default stub implementation of Merger.
#[derive(Debug, Clone, Default)]
pub struct StubMerger;
impl Merger for StubMerger {
fn merge(
&self,
_shard_responses: Vec<ShardResponse>,
_offset: usize,
_limit: usize,
_client_requested_score: bool,
) -> Result<MergedResult> {
Ok(MergedResult {
hits: Vec::new(),
facets: serde_json::json!({}),
total_hits: 0,
processing_time_ms: 0,
degraded: false,
})
}
}

View file

@ -0,0 +1,765 @@
//! Shard migration cutover state machine.
//!
//! Implements the node-addition migration flow from plan §4 with explicit state
//! transitions and a race-window-safe cutover sequence.
//!
//! ## Race window analysis (plan §15 OP#1)
//!
//! The dangerous window is between "mark node active" (routing changes to new-node-only)
//! and "delete migrated shard from old node." A document written during dual-write that
//! succeeded on OLD but failed on NEW — and arrived after the last migration page —
//! would be deleted from OLD without ever reaching NEW.
//!
//! ## Solution: quiesce-then-verify cutover
//!
//! Instead of the naïve sequence (mark active → stop dual-write → delete old), we use:
//!
//! 1. Stop dual-write (no new writes go to either node for affected shards)
//! 2. Drain: wait for all in-flight writes to both OLD and NEW to complete
//! 3. Delta migration: re-read affected shards from OLD (catches anything written since
//! the last migration page) and write deltas to NEW
//! 4. Mark node active (routing switches to NEW-only)
//! 5. Delete migrated shard from OLD
//!
//! Step 3 is the key: it closes the race window by ensuring NEW has a complete picture
//! before we commit the routing change. The cost is one extra pagination pass over each
//! migrated shard — bounded by the number of docs written during the migration window.
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::time::{Duration, Instant};
use serde::{Deserialize, Serialize};
/// Unique identifier for a shard migration operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct MigrationId(pub u64);
impl fmt::Display for MigrationId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
/// Identifier for a physical node in the cluster.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct NodeId(pub String);
impl fmt::Display for NodeId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
/// Identifier for a logical shard.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ShardId(pub u32);
impl fmt::Display for ShardId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "s{}", self.0)
}
}
/// Per-shard migration state within a node-addition migration.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ShardMigrationState {
/// Waiting for background migration to begin.
Pending,
/// Background pagination is reading docs from source and writing to target.
Migrating {
docs_copied: u64,
pages_remaining: u32,
},
/// Background migration complete, awaiting cutover.
MigrationComplete {
docs_copied: u64,
},
/// Dual-write stopped, in-flight writes draining.
Draining {
in_flight_count: u32,
docs_copied: u64,
},
/// Delta pass: re-reading source to catch stragglers written during migration.
DeltaPass {
docs_copied: u64,
delta_docs_copied: u64,
},
/// Node is active for this shard; old replica data deleted.
Active,
/// Migration failed at this phase.
Failed {
phase: String,
reason: String,
},
}
impl fmt::Display for ShardMigrationState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Pending => write!(f, "pending"),
Self::Migrating { docs_copied, pages_remaining } => {
write!(f, "migrating({docs_copied} copied, {pages_remaining} pages left)")
}
Self::MigrationComplete { docs_copied } => {
write!(f, "migration_complete({docs_copied} copied)")
}
Self::Draining { in_flight_count, docs_copied } => {
write!(f, "draining({in_flight_count} in-flight, {docs_copied} copied)")
}
Self::DeltaPass { docs_copied, delta_docs_copied } => {
write!(f, "delta_pass({docs_copied} + {delta_docs_copied} copied)")
}
Self::Active => write!(f, "active"),
Self::Failed { phase, reason } => write!(f, "failed({phase}: {reason})"),
}
}
}
/// Overall migration phase for a node addition.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum MigrationPhase {
/// Computing which shards move to the new node.
ComputingAssignments,
/// Dual-write active; background migration in progress.
DualWriteMigrating,
/// Background migration done; beginning cutover.
CutoverBegin,
/// Stopping dual-write; waiting for in-flight writes to settle.
CutoverDraining,
/// Re-reading source to catch docs written during migration.
CutoverDeltaPass,
/// Marking new node active; switching routing.
CutoverActivate,
/// Deleting migrated shard data from old nodes.
CutoverCleanup,
/// All shards migrated; migration complete.
Complete,
/// Migration failed.
Failed(String),
}
impl fmt::Display for MigrationPhase {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::ComputingAssignments => write!(f, "computing_assignments"),
Self::DualWriteMigrating => write!(f, "dual_write_migrating"),
Self::CutoverBegin => write!(f, "cutover_begin"),
Self::CutoverDraining => write!(f, "cutover_draining"),
Self::CutoverDeltaPass => write!(f, "cutover_delta_pass"),
Self::CutoverActivate => write!(f, "cutover_activate"),
Self::CutoverCleanup => write!(f, "cutover_cleanup"),
Self::Complete => write!(f, "complete"),
Self::Failed(msg) => write!(f, "failed({msg})"),
}
}
}
/// A single document write targeting a shard during migration.
#[derive(Debug, Clone)]
pub struct InFlightWrite {
pub doc_id: String,
pub shard: ShardId,
pub target_nodes: Vec<NodeId>,
pub completed_nodes: HashSet<NodeId>,
pub failed_nodes: HashMap<NodeId, String>,
pub submitted_at: Instant,
}
// Serialize Instant as u64 (milliseconds since UNIX epoch)
mod instant_serde {
use std::time::{Duration, Instant};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
pub fn serialize<S>(instant: &Option<Instant>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match instant {
Some(i) => {
let since_epoch = i.duration_since(Instant::now() - *i);
// Store as approximate UNIX timestamp - this is a simplification
// For production, would use SystemTime instead
(since_epoch.as_millis() as u64).serialize(serializer)
}
None => Option::<u64>::None.serialize(serializer),
}
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<Instant>, D::Error>
where
D: Deserializer<'de>,
{
let opt = Option::<u64>::deserialize(deserializer)?;
Ok(opt.map(|_| Instant::now()))
}
}
/// Configuration for migration cutover behavior.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationConfig {
/// Maximum time to wait for in-flight writes to drain during cutover.
pub drain_timeout: Duration,
/// Whether to perform the delta pass (re-read source after stopping dual-write).
/// Disabling this saves a pagination pass but opens the race window — only safe
/// when anti-entropy is enabled as a safety net.
pub skip_delta_pass: bool,
/// Whether anti-entropy is enabled — used to determine if skip_delta_pass is safe.
pub anti_entropy_enabled: bool,
}
impl Default for MigrationConfig {
fn default() -> Self {
Self {
drain_timeout: Duration::from_secs(30),
skip_delta_pass: false,
anti_entropy_enabled: true,
}
}
}
/// Error type for migration operations.
#[derive(Debug, thiserror::Error)]
pub enum MigrationError {
#[error("anti-entropy is disabled and delta pass is skipped — documents may be lost at cutover")]
UnsafeCutoverNoAntiEntropy,
#[error("drain timeout exceeded: {0} in-flight writes still pending")]
DrainTimeout(u32),
#[error("shard {0} is not in a valid state for this transition (current: {1})")]
InvalidTransition(ShardId, String),
#[error("migration {0} not found")]
NotFound(MigrationId),
#[error("delta pass failed for shard {0}: {1}")]
DeltaPassFailed(ShardId, String),
}
/// Tracks the state of a node-addition migration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationState {
pub id: MigrationId,
pub new_node: NodeId,
pub replica_group: u32,
pub phase: MigrationPhase,
pub affected_shards: HashMap<ShardId, ShardMigrationState>,
/// Maps shard → old node that currently owns it.
pub old_owners: HashMap<ShardId, NodeId>,
#[serde(with = "instant_serde")]
pub started_at: Option<Instant>,
#[serde(with = "instant_serde")]
pub completed_at: Option<Instant>,
}
/// The migration coordinator manages shard migration state transitions.
pub struct MigrationCoordinator {
config: MigrationConfig,
migrations: HashMap<MigrationId, MigrationState>,
next_id: u64,
/// In-flight writes being tracked for drain during cutover.
in_flight: Vec<InFlightWrite>,
}
impl MigrationCoordinator {
pub fn new(config: MigrationConfig) -> Self {
Self {
config,
migrations: HashMap::new(),
next_id: 0,
in_flight: Vec::new(),
}
}
/// Validate migration safety before starting. Returns an error if the configuration
/// would allow data loss at the cutover boundary.
pub fn validate_safety(&self) -> Result<(), MigrationError> {
if self.config.skip_delta_pass && !self.config.anti_entropy_enabled {
return Err(MigrationError::UnsafeCutoverNoAntiEntropy);
}
Ok(())
}
/// Begin a new node-addition migration.
pub fn begin_migration(
&mut self,
new_node: NodeId,
replica_group: u32,
affected_shards: HashMap<ShardId, NodeId>,
) -> Result<MigrationId, MigrationError> {
self.validate_safety()?;
let id = MigrationId(self.next_id);
self.next_id += 1;
let shard_states: HashMap<ShardId, ShardMigrationState> = affected_shards
.keys()
.map(|&shard| (shard, ShardMigrationState::Pending))
.collect();
let state = MigrationState {
id,
new_node,
replica_group,
phase: MigrationPhase::ComputingAssignments,
affected_shards: shard_states,
old_owners: affected_shards,
started_at: Some(Instant::now()),
completed_at: None,
};
self.migrations.insert(id, state);
Ok(id)
}
/// Transition to dual-write + background migration phase.
pub fn begin_dual_write(&mut self, id: MigrationId) -> Result<(), MigrationError> {
let state = self.migrations.get_mut(&id).ok_or(MigrationError::NotFound(id))?;
state.phase = MigrationPhase::DualWriteMigrating;
for shard_state in state.affected_shards.values_mut() {
if *shard_state == ShardMigrationState::Pending {
*shard_state = ShardMigrationState::Migrating {
docs_copied: 0,
pages_remaining: 0,
};
}
}
Ok(())
}
/// Record that a shard's background migration completed.
pub fn shard_migration_complete(
&mut self,
id: MigrationId,
shard: ShardId,
docs_copied: u64,
) -> Result<(), MigrationError> {
let state = self.migrations.get_mut(&id).ok_or(MigrationError::NotFound(id))?;
let shard_state = state.affected_shards.get_mut(&shard).ok_or_else(|| {
MigrationError::InvalidTransition(shard, "shard not in migration".into())
})?;
match shard_state {
ShardMigrationState::Migrating { .. } => {
*shard_state = ShardMigrationState::MigrationComplete { docs_copied };
}
_ => {
return Err(MigrationError::InvalidTransition(
shard,
shard_state.to_string(),
));
}
}
// Check if all shards are done migrating
let all_complete = state
.affected_shards
.values()
.all(|s| matches!(s, ShardMigrationState::MigrationComplete { .. }));
if all_complete {
state.phase = MigrationPhase::CutoverBegin;
}
Ok(())
}
/// Begin the cutover sequence: stop dual-write and drain in-flight writes.
pub fn begin_cutover(&mut self, id: MigrationId) -> Result<MigrationPhase, MigrationError> {
let state = self.migrations.get_mut(&id).ok_or(MigrationError::NotFound(id))?;
if !matches!(state.phase, MigrationPhase::CutoverBegin) {
return Err(MigrationError::InvalidTransition(
ShardId(0),
format!("expected CutoverBegin, got {}", state.phase),
));
}
// Transition all shards to Draining
let total_in_flight = self.in_flight.len() as u32;
for (shard, shard_state) in state.affected_shards.iter_mut() {
match shard_state {
ShardMigrationState::MigrationComplete { docs_copied } => {
*shard_state = ShardMigrationState::Draining {
in_flight_count: total_in_flight,
docs_copied: *docs_copied,
};
}
_ => {
return Err(MigrationError::InvalidTransition(
*shard,
shard_state.to_string(),
));
}
}
}
state.phase = MigrationPhase::CutoverDraining;
Ok(state.phase.clone())
}
/// Register an in-flight write for tracking during drain.
pub fn register_in_flight(&mut self, write: InFlightWrite) {
self.in_flight.push(write);
}
/// Acknowledge completion of a write to a specific node.
pub fn ack_write(&mut self, doc_id: &str, node: &NodeId) {
for write in &mut self.in_flight {
if write.doc_id == doc_id {
write.completed_nodes.insert(node.clone());
}
}
}
/// Mark a write as failed on a specific node.
pub fn fail_write(&mut self, doc_id: &str, node: &NodeId, reason: String) {
for write in &mut self.in_flight {
if write.doc_id == doc_id {
write.failed_nodes.insert(node.clone(), reason.clone());
}
}
}
/// Check if all in-flight writes have completed (drained).
pub fn is_drained(&self) -> bool {
self.in_flight.iter().all(|w| {
let all_responded = w.completed_nodes.len() + w.failed_nodes.len()
== w.target_nodes.len();
all_responded
})
}
/// Complete the drain and move to delta pass or activation.
pub fn complete_drain(&mut self, id: MigrationId) -> Result<MigrationPhase, MigrationError> {
let state = self.migrations.get_mut(&id).ok_or(MigrationError::NotFound(id))?;
if !matches!(state.phase, MigrationPhase::CutoverDraining) {
return Err(MigrationError::InvalidTransition(
ShardId(0),
format!("expected CutoverDraining, got {}", state.phase),
));
}
if !self.is_drained() {
let remaining = self
.in_flight
.iter()
.filter(|w| {
w.completed_nodes.len() + w.failed_nodes.len() < w.target_nodes.len()
})
.count() as u32;
return Err(MigrationError::DrainTimeout(remaining));
}
// Collect docs that need delta pass (written to OLD but may not be on NEW)
let needs_delta = self.collect_delta_candidates(id)?;
if self.config.skip_delta_pass {
// Skip delta pass — safe only if anti-entropy is enabled
state.phase = MigrationPhase::CutoverActivate;
self.activate_shards(id)?;
} else if needs_delta.is_empty() {
state.phase = MigrationPhase::CutoverActivate;
self.activate_shards(id)?;
} else {
state.phase = MigrationPhase::CutoverDeltaPass;
for (shard, shard_state) in state.affected_shards.iter_mut() {
if let ShardMigrationState::Draining { docs_copied, .. } = shard_state {
*shard_state = ShardMigrationState::DeltaPass {
docs_copied: *docs_copied,
delta_docs_copied: 0,
};
}
}
}
self.in_flight.clear();
Ok(state.phase.clone())
}
/// Identify writes that need the delta pass — those that succeeded on OLD but
/// failed (or never reached) NEW.
fn collect_delta_candidates(
&self,
id: MigrationId,
) -> Result<HashMap<ShardId, Vec<String>>, MigrationError> {
let state = self.migrations.get(&id).ok_or(MigrationError::NotFound(id))?;
let mut candidates: HashMap<ShardId, Vec<String>> = HashMap::new();
for write in &self.in_flight {
let old_owner = match state.old_owners.get(&write.shard) {
Some(owner) => owner,
None => continue,
};
let succeeded_on_old = write.completed_nodes.contains(old_owner);
let succeeded_on_new = write.completed_nodes.contains(&state.new_node);
// Doc is on OLD but not on NEW — delta pass must catch it
if succeeded_on_old && !succeeded_on_new {
candidates
.entry(write.shard)
.or_default()
.push(write.doc_id.clone());
}
}
Ok(candidates)
}
/// Record that the delta pass completed for a shard.
pub fn shard_delta_complete(
&mut self,
id: MigrationId,
shard: ShardId,
delta_docs: u64,
) -> Result<(), MigrationError> {
let state = self.migrations.get_mut(&id).ok_or(MigrationError::NotFound(id))?;
let shard_state = state.affected_shards.get_mut(&shard).ok_or_else(|| {
MigrationError::InvalidTransition(shard, "shard not in migration".into())
})?;
match shard_state {
ShardMigrationState::DeltaPass { docs_copied, .. } => {
*shard_state = ShardMigrationState::MigrationComplete {
docs_copied: *docs_copied + delta_docs,
};
}
_ => {
return Err(MigrationError::InvalidTransition(
shard,
shard_state.to_string(),
));
}
}
// Check if all shards done with delta
let all_complete = state
.affected_shards
.values()
.all(|s| matches!(s, ShardMigrationState::MigrationComplete { .. }));
if all_complete {
state.phase = MigrationPhase::CutoverActivate;
self.activate_shards(id)?;
}
Ok(())
}
/// Mark all affected shards as active on the new node.
fn activate_shards(&mut self, id: MigrationId) -> Result<(), MigrationError> {
let state = self.migrations.get_mut(&id).ok_or(MigrationError::NotFound(id))?;
for shard_state in state.affected_shards.values_mut() {
match shard_state {
ShardMigrationState::MigrationComplete { .. }
| ShardMigrationState::Draining { .. } => {
*shard_state = ShardMigrationState::Active;
}
_ => {}
}
}
if matches!(state.phase, MigrationPhase::CutoverActivate) {
state.phase = MigrationPhase::CutoverCleanup;
}
Ok(())
}
/// Complete the migration by deleting migrated shard data from old nodes.
pub fn complete_cleanup(&mut self, id: MigrationId) -> Result<(), MigrationError> {
let state = self.migrations.get_mut(&id).ok_or(MigrationError::NotFound(id))?;
if !matches!(state.phase, MigrationPhase::CutoverCleanup) {
return Err(MigrationError::InvalidTransition(
ShardId(0),
format!("expected CutoverCleanup, got {}", state.phase),
));
}
state.phase = MigrationPhase::Complete;
state.completed_at = Some(Instant::now());
Ok(())
}
/// Get the current state of a migration.
pub fn get_state(&self, id: MigrationId) -> Option<&MigrationState> {
self.migrations.get(&id)
}
/// Check if a write should go to both old and new node (dual-write phase).
pub fn is_dual_write_active(&self, shard: ShardId) -> bool {
self.migrations.values().any(|m| {
matches!(m.phase, MigrationPhase::DualWriteMigrating)
&& matches!(
m.affected_shards.get(&shard),
Some(ShardMigrationState::Migrating { .. })
)
})
}
/// Get the migration config.
pub fn config(&self) -> &MigrationConfig {
&self.config
}
}
#[cfg(test)]
mod tests {
use super::*;
fn node(s: &str) -> NodeId {
NodeId(s.to_string())
}
fn shard(id: u32) -> ShardId {
ShardId(id)
}
#[test]
fn test_safe_cutover_with_delta_pass() {
let config = MigrationConfig {
anti_entropy_enabled: false,
skip_delta_pass: false,
..Default::default()
};
let mut coord = MigrationCoordinator::new(config);
let affected = HashMap::from([
(shard(0), node("old-0")),
(shard(1), node("old-0")),
]);
let mid = coord.begin_migration(node("new-0"), 0, affected).unwrap();
coord.begin_dual_write(mid).unwrap();
// Simulate background migration completing
coord.shard_migration_complete(mid, shard(0), 500).unwrap();
coord.shard_migration_complete(mid, shard(1), 300).unwrap();
// Register an in-flight write that succeeded on OLD but not NEW
coord.register_in_flight(InFlightWrite {
doc_id: "doc-at-boundary".into(),
shard: shard(0),
target_nodes: vec![node("old-0"), node("new-0")],
completed_nodes: HashSet::from([node("old-0")]),
failed_nodes: HashMap::new(),
submitted_at: Instant::now(),
});
// Cutover
coord.begin_cutover(mid).unwrap();
// The drain sees the in-flight write completed (on old, not on new)
// Delta pass should be triggered
let phase = coord.complete_drain(mid).unwrap();
assert_eq!(phase, MigrationPhase::CutoverDeltaPass);
// Delta pass catches the straggler
coord.shard_delta_complete(mid, shard(0), 1).unwrap();
// Shard 1 had no stragglers, but needs delta complete too
coord.shard_delta_complete(mid, shard(1), 0).unwrap();
// Now activation and cleanup
let state = coord.get_state(mid).unwrap();
assert_eq!(state.phase, MigrationPhase::CutoverCleanup);
coord.complete_cleanup(mid).unwrap();
let state = coord.get_state(mid).unwrap();
assert_eq!(state.phase, MigrationPhase::Complete);
}
#[test]
fn test_unsafe_cutover_refused_without_anti_entropy() {
let config = MigrationConfig {
anti_entropy_enabled: false,
skip_delta_pass: true,
..Default::default()
};
let mut coord = MigrationCoordinator::new(config);
let affected = HashMap::from([(shard(0), node("old-0"))]);
let result = coord.begin_migration(node("new-0"), 0, affected);
assert!(result.is_err());
let err = result.unwrap_err();
assert!(matches!(err, MigrationError::UnsafeCutoverNoAntiEntropy));
}
#[test]
fn test_skip_delta_pass_allowed_with_anti_entropy() {
let config = MigrationConfig {
anti_entropy_enabled: true,
skip_delta_pass: true,
..Default::default()
};
let mut coord = MigrationCoordinator::new(config);
let affected = HashMap::from([(shard(0), node("old-0"))]);
let mid = coord.begin_migration(node("new-0"), 0, affected).unwrap();
coord.begin_dual_write(mid).unwrap();
coord.shard_migration_complete(mid, shard(0), 100).unwrap();
coord.begin_cutover(mid).unwrap();
// With skip_delta_pass=true and AE enabled, drain goes straight to activate
let phase = coord.complete_drain(mid).unwrap();
assert_eq!(phase, MigrationPhase::CutoverCleanup);
coord.complete_cleanup(mid).unwrap();
assert_eq!(
coord.get_state(mid).unwrap().phase,
MigrationPhase::Complete
);
}
#[test]
fn test_drain_timeout_blocks_cutover() {
let config = MigrationConfig {
anti_entropy_enabled: true,
skip_delta_pass: true,
..Default::default()
};
let mut coord = MigrationCoordinator::new(config);
let affected = HashMap::from([(shard(0), node("old-0"))]);
let mid = coord.begin_migration(node("new-0"), 0, affected).unwrap();
coord.begin_dual_write(mid).unwrap();
coord.shard_migration_complete(mid, shard(0), 100).unwrap();
coord.begin_cutover(mid).unwrap();
// Register an in-flight write that hasn't completed on either node
coord.register_in_flight(InFlightWrite {
doc_id: "stuck-doc".into(),
shard: shard(0),
target_nodes: vec![node("old-0"), node("new-0")],
completed_nodes: HashSet::new(),
failed_nodes: HashMap::new(),
submitted_at: Instant::now(),
});
// Drain should fail — write still in flight
let result = coord.complete_drain(mid);
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), MigrationError::DrainTimeout(1)));
}
#[test]
fn test_dual_write_tracking() {
let config = MigrationConfig::default();
let mut coord = MigrationCoordinator::new(config);
let affected = HashMap::from([(shard(5), node("old-0"))]);
let mid = coord.begin_migration(node("new-0"), 0, affected).unwrap();
coord.begin_dual_write(mid).unwrap();
// Shard 5 is in dual-write
assert!(coord.is_dual_write_active(shard(5)));
// Shard 99 is not being migrated
assert!(!coord.is_dual_write_active(shard(99)));
// After migration completes, shard 5 is no longer dual-write
coord.shard_migration_complete(mid, shard(5), 100).unwrap();
assert!(!coord.is_dual_write_active(shard(5)));
}
}

View file

@ -0,0 +1,55 @@
//! Rendezvous hash-based routing and shard assignment.
use crate::topology::{Group, NodeId, Topology};
use twox_hash::XxHash64;
use std::hash::{Hash, Hasher};
/// Compute a rendezvous score for a shard+node pair.
///
/// Higher scores win; used for deterministic shard assignment.
pub fn score(shard_id: u32, node_id: &str) -> u64 {
let mut h = XxHash64::with_seed(0);
shard_id.hash(&mut h);
node_id.hash(&mut h);
h.finish()
}
/// Assign a shard to `rf` nodes within a single replica group.
///
/// `group_nodes` is the subset of nodes belonging to that group.
pub fn assign_shard_in_group(shard_id: u32, group_nodes: &[NodeId], rf: usize) -> Vec<NodeId> {
let mut scored: Vec<(u64, &NodeId)> = group_nodes
.iter()
.map(|n| (score(shard_id, n.as_str()), n))
.collect();
scored.sort_unstable_by(|a, b| b.0.cmp(&a.0));
scored.into_iter().take(rf).map(|(_, n)| n.clone()).collect()
}
/// All write targets for a document: the RF nodes in EACH replica group.
pub fn write_targets(shard_id: u32, topology: &Topology) -> Vec<NodeId> {
topology.groups().flat_map(|group| {
assign_shard_in_group(shard_id, group.nodes(), topology.rf())
}).collect()
}
/// Select the replica group for a query (round-robin by query counter).
pub fn query_group(query_seq: u64, replica_groups: u32) -> u32 {
(query_seq % replica_groups as u64) as u32
}
/// The covering set for a search: one node per shard within the chosen group.
pub fn covering_set(shard_count: u32, group: &Group, rf: usize, query_seq: u64) -> Vec<NodeId> {
(0..shard_count).map(|shard_id| {
let replicas = assign_shard_in_group(shard_id, group.nodes(), rf);
// rotate through replicas for intra-group load balancing
replicas[(query_seq as usize) % replicas.len()].clone()
}).collect::<std::collections::HashSet<_>>().into_iter().collect()
}
/// Compute the shard ID for a document's primary key.
pub fn shard_for_key(primary_key: &str, shard_count: u32) -> u32 {
let mut h = XxHash64::with_seed(0);
primary_key.hash(&mut h);
(h.finish() % shard_count as u64) as u32
}

View file

@ -0,0 +1,81 @@
//! Scatter orchestration: fan-out logic and covering set builder.
use crate::config::UnavailableShardPolicy;
use crate::topology::{NodeId, Topology};
use crate::Result;
/// Scatter orchestrator: fans out requests to the covering set.
pub trait Scatter: Send + Sync {
/// Execute a scatter request to multiple nodes.
///
/// Returns a map of node ID to response. Failed nodes are omitted
/// based on the unavailable shard policy.
fn scatter(
&self,
topology: &Topology,
nodes: Vec<NodeId>,
request: ScatterRequest,
policy: UnavailableShardPolicy,
) -> Result<ScatterResponse>;
}
/// A scatter request to be sent to each node.
#[derive(Debug, Clone)]
pub struct ScatterRequest {
/// Request body (JSON or raw bytes).
pub body: Vec<u8>,
/// Request headers.
pub headers: Vec<(String, String)>,
/// HTTP method.
pub method: String,
/// Request path.
pub path: String,
}
/// Response from a scatter operation.
#[derive(Debug, Clone)]
pub struct ScatterResponse {
/// Responses from successful nodes.
pub responses: Vec<NodeResponse>,
/// Nodes that failed or timed out.
pub failed: Vec<NodeId>,
}
/// Response from a single node.
#[derive(Debug, Clone)]
pub struct NodeResponse {
/// Node that responded.
pub node_id: NodeId,
/// Response body.
pub body: Vec<u8>,
/// HTTP status code.
pub status: u16,
/// Response headers.
pub headers: Vec<(String, String)>,
}
/// Default stub implementation of Scatter.
#[derive(Debug, Clone, Default)]
pub struct StubScatter;
impl Scatter for StubScatter {
fn scatter(
&self,
_topology: &Topology,
_nodes: Vec<NodeId>,
_request: ScatterRequest,
_policy: UnavailableShardPolicy,
) -> Result<ScatterResponse> {
Ok(ScatterResponse {
responses: Vec::new(),
failed: Vec::new(),
})
}
}

View file

@ -0,0 +1,136 @@
//! Task registry: unified task namespace across all Meilisearch nodes.
use crate::Result;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use uuid::Uuid;
/// Task registry: manages the unified task namespace.
pub trait TaskRegistry: Send + Sync {
/// Register a new Miroir task that fans out to multiple nodes.
fn register(&self, node_tasks: HashMap<String, u64>) -> Result<MiroirTask>;
/// Get a task by its Miroir ID.
fn get(&self, miroir_id: &str) -> Result<Option<MiroirTask>>;
/// Update the status of a Miroir task.
fn update_status(&self, miroir_id: &str, status: TaskStatus) -> Result<()>;
/// Update node task status.
fn update_node_task(&self, miroir_id: &str, node_id: &str, node_status: NodeTaskStatus) -> Result<()>;
/// List tasks with optional filtering.
fn list(&self, filter: TaskFilter) -> Result<Vec<MiroirTask>>;
}
/// A Miroir task: unified view of a fan-out write operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MiroirTask {
/// Unique Miroir task ID (UUID).
pub miroir_id: String,
/// Creation timestamp (Unix millis).
pub created_at: u64,
/// Current task status.
pub status: TaskStatus,
/// Map of node ID to local Meilisearch task UID.
pub node_tasks: HashMap<String, NodeTask>,
/// Error message if the task failed.
pub error: Option<String>,
}
/// Status of a Miroir task.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum TaskStatus {
/// Task is enqueued.
Enqueued,
/// Task is being processed.
Processing,
/// Task completed successfully.
Succeeded,
/// Task failed.
Failed,
/// Task was canceled.
Canceled,
}
/// A node task: local Meilisearch task reference.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeTask {
/// Local Meilisearch task UID.
pub task_uid: u64,
/// Current status of this node task.
pub status: NodeTaskStatus,
}
/// Status of a node task.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum NodeTaskStatus {
/// Task is enqueued on the node.
Enqueued,
/// Task is processing on the node.
Processing,
/// Task succeeded on the node.
Succeeded,
/// Task failed on the node.
Failed,
}
/// Filter for listing tasks.
#[derive(Debug, Clone, Default)]
pub struct TaskFilter {
/// Filter by status.
pub status: Option<TaskStatus>,
/// Filter by node ID.
pub node_id: Option<String>,
/// Maximum number of results.
pub limit: Option<usize>,
/// Offset for pagination.
pub offset: Option<usize>,
}
/// Default stub implementation of TaskRegistry.
#[derive(Debug, Clone, Default)]
pub struct StubTaskRegistry;
impl TaskRegistry for StubTaskRegistry {
fn register(&self, _node_tasks: HashMap<String, u64>) -> Result<MiroirTask> {
Ok(MiroirTask {
miroir_id: Uuid::new_v4().to_string(),
created_at: 0,
status: TaskStatus::Enqueued,
node_tasks: HashMap::new(),
error: None,
})
}
fn get(&self, _miroir_id: &str) -> Result<Option<MiroirTask>> {
Ok(None)
}
fn update_status(&self, _miroir_id: &str, _status: TaskStatus) -> Result<()> {
Ok(())
}
fn update_node_task(&self, _miroir_id: &str, _node_id: &str, _node_status: NodeTaskStatus) -> Result<()> {
Ok(())
}
fn list(&self, _filter: TaskFilter) -> Result<Vec<MiroirTask>> {
Ok(Vec::new())
}
}

View file

@ -0,0 +1,185 @@
//! Topology management: node registry, groups, and health state.
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Unique identifier for a node.
#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
pub struct NodeId(String);
impl NodeId {
/// Create a new NodeId.
pub fn new(id: String) -> Self {
Self(id)
}
/// Get the node ID as a string slice.
pub fn as_str(&self) -> &str {
&self.0
}
}
impl From<String> for NodeId {
fn from(s: String) -> Self {
Self(s)
}
}
impl AsRef<str> for NodeId {
fn as_ref(&self) -> &str {
&self.0
}
}
/// Health status of a node.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeStatus {
/// Node is healthy and serving traffic.
Healthy,
/// Node is joining the cluster (being provisioned).
Joining,
/// Node is draining (graceful shutdown, not accepting new writes).
Draining,
/// Node has failed (unplanned outage).
Failed,
}
/// A single Meilisearch node in the topology.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Node {
/// Unique node identifier.
pub id: NodeId,
/// Node base URL.
pub url: String,
/// Current health status.
pub status: NodeStatus,
/// Replica group assignment (0-based).
pub replica_group: u32,
}
impl Node {
/// Create a new node.
pub fn new(id: NodeId, url: String, replica_group: u32) -> Self {
Self {
id,
url,
status: NodeStatus::Joining,
replica_group,
}
}
/// Check if the node is healthy (can serve traffic).
pub fn is_healthy(&self) -> bool {
matches!(self.status, NodeStatus::Healthy)
}
}
/// A replica group: an independent query pool.
///
/// Each group holds all S shards, distributed across its nodes.
/// Reads are routed to a single group per query.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Group {
/// Group identifier (0-based).
pub id: u32,
/// Nodes in this group.
nodes: Vec<NodeId>,
}
impl Group {
/// Create a new group.
pub fn new(id: u32) -> Self {
Self {
id,
nodes: Vec::new(),
}
}
/// Add a node to this group.
pub fn add_node(&mut self, node_id: NodeId) {
if !self.nodes.contains(&node_id) {
self.nodes.push(node_id);
}
}
/// Get the nodes in this group.
pub fn nodes(&self) -> &[NodeId] {
&self.nodes
}
/// Get the number of nodes in this group.
pub fn node_count(&self) -> usize {
self.nodes.len()
}
}
/// Cluster topology: groups, nodes, and health state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Topology {
/// All nodes in the cluster.
nodes: HashMap<NodeId, Node>,
/// Replica groups.
groups: Vec<Group>,
/// Replication factor (intra-group).
rf: usize,
}
impl Topology {
/// Create a new empty topology.
pub fn new(rf: usize) -> Self {
Self {
nodes: HashMap::new(),
groups: Vec::new(),
rf,
}
}
/// Add a node to the topology.
pub fn add_node(&mut self, node: Node) {
let group_id = node.replica_group as usize;
// Ensure group exists
while self.groups.len() <= group_id {
self.groups.push(Group::new(self.groups.len() as u32));
}
self.groups[group_id].add_node(node.id.clone());
self.nodes.insert(node.id.clone(), node);
}
/// Get a node by ID.
pub fn node(&self, id: &NodeId) -> Option<&Node> {
self.nodes.get(id)
}
/// Get all nodes.
pub fn nodes(&self) -> impl Iterator<Item = &Node> {
self.nodes.values()
}
/// Get a group by ID.
pub fn group(&self, id: u32) -> Option<&Group> {
self.groups.get(id as usize)
}
/// Iterate over all groups.
pub fn groups(&self) -> impl Iterator<Item = &Group> {
self.groups.iter()
}
/// Get the replication factor.
pub fn rf(&self) -> usize {
self.rf
}
/// Get the number of replica groups.
pub fn replica_group_count(&self) -> u32 {
self.groups.len() as u32
}
}