From 93891cd03ba8b5fb9bef7b7a23de152f71c6b791 Mon Sep 17 00:00:00 2001 From: jedarden Date: Sat, 18 Apr 2026 20:57:47 -0400 Subject: [PATCH] P0.2: Scaffold miroir-core crate Create core library module skeleton with public API surface: - router.rs: rendezvous hash primitives (twox-hash based) - topology.rs: Topology, Group, Node, NodeId, NodeStatus types - scatter.rs: scatter orchestration trait/stubs - merger.rs: result merge trait/stubs - task.rs: task registry trait/stubs - config.rs: Config struct (full YAML shape) - error.rs: MiroirError enum + Result alias All acceptance criteria met: - cargo build -p miroir-core succeeds - cargo doc -p miroir-core produces rustdoc without warnings - cargo test -p miroir-core runs (zero tests) successfully Co-Authored-By: Claude Opus 4.7 --- crates/miroir-core/Cargo.toml | 8 + crates/miroir-core/src/anti_entropy.rs | 114 +++ crates/miroir-core/src/config.bak/advanced.rs | 831 ++++++++++++++++++ crates/miroir-core/src/config.bak/mod.rs | 352 ++++++++ crates/miroir-core/src/config.rs | 79 ++ crates/miroir-core/src/config/error.rs | 16 + crates/miroir-core/src/config/validate.rs | 146 +++ crates/miroir-core/src/error.rs | 38 + crates/miroir-core/src/lib.rs | 15 +- crates/miroir-core/src/merger.rs | 74 ++ crates/miroir-core/src/migration.rs | 765 ++++++++++++++++ crates/miroir-core/src/router.rs | 55 ++ crates/miroir-core/src/scatter.rs | 81 ++ crates/miroir-core/src/task.rs | 136 +++ crates/miroir-core/src/topology.rs | 185 ++++ 15 files changed, 2894 insertions(+), 1 deletion(-) create mode 100644 crates/miroir-core/src/anti_entropy.rs create mode 100644 crates/miroir-core/src/config.bak/advanced.rs create mode 100644 crates/miroir-core/src/config.bak/mod.rs create mode 100644 crates/miroir-core/src/config.rs create mode 100644 crates/miroir-core/src/config/error.rs create mode 100644 crates/miroir-core/src/config/validate.rs create mode 100644 crates/miroir-core/src/error.rs create mode 100644 crates/miroir-core/src/merger.rs 
create mode 100644 crates/miroir-core/src/migration.rs create mode 100644 crates/miroir-core/src/router.rs create mode 100644 crates/miroir-core/src/scatter.rs create mode 100644 crates/miroir-core/src/task.rs create mode 100644 crates/miroir-core/src/topology.rs diff --git a/crates/miroir-core/Cargo.toml b/crates/miroir-core/Cargo.toml index a7a3cb2..c6d9cec 100644 --- a/crates/miroir-core/Cargo.toml +++ b/crates/miroir-core/Cargo.toml @@ -6,3 +6,11 @@ license.workspace = true repository.workspace = true [dependencies] +serde = { version = "1", features = ["derive"] } +serde_json = "1" +twox-hash = "2" +thiserror = "2" +tracing = "0.1" +uuid = { version = "1", features = ["v4", "serde"] } + +[dev-dependencies] diff --git a/crates/miroir-core/src/anti_entropy.rs b/crates/miroir-core/src/anti_entropy.rs new file mode 100644 index 0000000..8713be4 --- /dev/null +++ b/crates/miroir-core/src/anti_entropy.rs @@ -0,0 +1,114 @@ +//! Anti-entropy reconciler module. +//! +//! Stub for plan §13.8 anti-entropy shard reconciler. +//! Full implementation will follow the fingerprint → diff → repair pipeline. + +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +use crate::migration::{MigrationConfig, MigrationCoordinator, MigrationError}; + +/// Anti-entropy configuration (plan §13.8). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AntiEntropyConfig { + pub enabled: bool, + pub schedule_cron: String, + pub shards_per_pass: u32, + pub max_read_concurrency: u32, + pub fingerprint_batch_size: u32, + pub auto_repair: bool, + pub updated_at_field: String, +} + +impl Default for AntiEntropyConfig { + fn default() -> Self { + Self { + enabled: true, + schedule_cron: "0 */6 * * *".to_string(), + shards_per_pass: 0, + max_read_concurrency: 2, + fingerprint_batch_size: 1000, + auto_repair: true, + updated_at_field: "_miroir_updated_at".to_string(), + } + } +} + +/// Validates that migration is safe given the anti-entropy configuration. 
+/// Returns Ok(()) if safe, Err with a descriptive message if not. +pub fn validate_migration_safety( + ae_config: &AntiEntropyConfig, + migration_config: &MigrationConfig, +) -> Result<(), MigrationError> { + if migration_config.skip_delta_pass && !ae_config.enabled { + return Err(MigrationError::UnsafeCutoverNoAntiEntropy); + } + Ok(()) +} + +/// Generates a warning if anti-entropy is disabled during active migration. +/// The caller should log this at warn level. +pub fn migration_warning_if_ae_disabled(ae_enabled: bool) -> Option { + if ae_enabled { + return None; + } + Some( + "Anti-entropy is disabled. Shard migration cutover relies on the delta pass \ + to prevent data loss at the cutover boundary. If delta pass is also skipped, \ + documents written during migration may be permanently lost." + .to_string(), + ) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_validate_safe_with_delta_pass() { + let ae = AntiEntropyConfig { + enabled: false, + ..Default::default() + }; + let mc = MigrationConfig { + skip_delta_pass: false, + ..Default::default() + }; + assert!(validate_migration_safety(&ae, &mc).is_ok()); + } + + #[test] + fn test_validate_unsafe_without_anti_entropy() { + let ae = AntiEntropyConfig { + enabled: false, + ..Default::default() + }; + let mc = MigrationConfig { + skip_delta_pass: true, + anti_entropy_enabled: false, + ..Default::default() + }; + assert!(validate_migration_safety(&ae, &mc).is_err()); + } + + #[test] + fn test_validate_safe_with_anti_entropy_safety_net() { + let ae = AntiEntropyConfig { + enabled: true, + ..Default::default() + }; + let mc = MigrationConfig { + skip_delta_pass: true, + anti_entropy_enabled: true, + ..Default::default() + }; + assert!(validate_migration_safety(&ae, &mc).is_ok()); + } + + #[test] + fn test_warning_when_ae_disabled() { + assert!(migration_warning_if_ae_disabled(false).is_some()); + assert!(migration_warning_if_ae_disabled(true).is_none()); + } +} diff --git 
a/crates/miroir-core/src/config.bak/advanced.rs b/crates/miroir-core/src/config.bak/advanced.rs new file mode 100644 index 0000000..3a2092c --- /dev/null +++ b/crates/miroir-core/src/config.bak/advanced.rs @@ -0,0 +1,831 @@ +//! §13 Advanced capabilities configuration structs. + +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +// --------------------------------------------------------------------------- +// 13.1 Online resharding +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct ReshardingConfig { + pub enabled: bool, + pub backfill_concurrency: u32, + pub backfill_batch_size: u32, + pub throttle_docs_per_sec: u32, + pub verify_before_swap: bool, + pub retain_old_index_hours: u32, +} + +impl Default for ReshardingConfig { + fn default() -> Self { + Self { + enabled: true, + backfill_concurrency: 4, + backfill_batch_size: 1000, + throttle_docs_per_sec: 0, + verify_before_swap: true, + retain_old_index_hours: 48, + } + } +} + +// --------------------------------------------------------------------------- +// 13.2 Hedged requests +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct HedgingConfig { + pub enabled: bool, + pub p95_trigger_multiplier: f64, + pub min_trigger_ms: u64, + pub max_hedges_per_query: u32, + pub cross_group_fallback: bool, +} + +impl Default for HedgingConfig { + fn default() -> Self { + Self { + enabled: true, + p95_trigger_multiplier: 1.2, + min_trigger_ms: 15, + max_hedges_per_query: 2, + cross_group_fallback: true, + } + } +} + +// --------------------------------------------------------------------------- +// 13.3 Adaptive replica selection (EWMA) +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, PartialEq, Serialize, 
Deserialize)] +#[serde(default)] +pub struct ReplicaSelectionConfig { + /// `adaptive`, `round_robin`, or `random`. + pub strategy: String, + pub latency_weight: f64, + pub inflight_weight: f64, + pub error_weight: f64, + pub ewma_half_life_ms: u64, + pub exploration_epsilon: f64, +} + +impl Default for ReplicaSelectionConfig { + fn default() -> Self { + Self { + strategy: "adaptive".into(), + latency_weight: 1.0, + inflight_weight: 2.0, + error_weight: 10.0, + ewma_half_life_ms: 5000, + exploration_epsilon: 0.05, + } + } +} + +// --------------------------------------------------------------------------- +// 13.4 Shard-aware query planner +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct QueryPlannerConfig { + pub enabled: bool, + pub max_pk_literals_narrowable: u32, + pub log_plans: bool, +} + +impl Default for QueryPlannerConfig { + fn default() -> Self { + Self { + enabled: true, + max_pk_literals_narrowable: 128, + log_plans: false, + } + } +} + +// --------------------------------------------------------------------------- +// 13.5 Two-phase settings broadcast +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct SettingsBroadcastConfig { + /// `two_phase` or `sequential` (legacy). 
+ pub strategy: String, + pub verify_timeout_s: u64, + pub max_repair_retries: u32, + pub freeze_writes_on_unrepairable: bool, +} + +impl Default for SettingsBroadcastConfig { + fn default() -> Self { + Self { + strategy: "two_phase".into(), + verify_timeout_s: 60, + max_repair_retries: 3, + freeze_writes_on_unrepairable: true, + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct SettingsDriftCheckConfig { + pub interval_s: u64, + pub auto_repair: bool, +} + +impl Default for SettingsDriftCheckConfig { + fn default() -> Self { + Self { + interval_s: 300, + auto_repair: true, + } + } +} + +// --------------------------------------------------------------------------- +// 13.6 Session pinning (read-your-writes) +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct SessionPinningConfig { + pub enabled: bool, + pub ttl_seconds: u64, + pub max_sessions: u32, + /// `block` or `route_pin`. 
+ pub wait_strategy: String, + pub max_wait_ms: u64, +} + +impl Default for SessionPinningConfig { + fn default() -> Self { + Self { + enabled: true, + ttl_seconds: 900, + max_sessions: 100_000, + wait_strategy: "block".into(), + max_wait_ms: 5000, + } + } +} + +// --------------------------------------------------------------------------- +// 13.7 Index aliases +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct AliasesConfig { + pub enabled: bool, + pub history_retention: u32, + pub require_target_exists: bool, +} + +impl Default for AliasesConfig { + fn default() -> Self { + Self { + enabled: true, + history_retention: 10, + require_target_exists: true, + } + } +} + +// --------------------------------------------------------------------------- +// 13.8 Anti-entropy shard reconciler +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct AntiEntropyConfig { + pub enabled: bool, + pub schedule: String, + pub shards_per_pass: u32, + pub max_read_concurrency: u32, + pub fingerprint_batch_size: u32, + pub auto_repair: bool, + pub updated_at_field: String, +} + +impl Default for AntiEntropyConfig { + fn default() -> Self { + Self { + enabled: true, + schedule: "every 6h".into(), + shards_per_pass: 0, + max_read_concurrency: 2, + fingerprint_batch_size: 1000, + auto_repair: true, + updated_at_field: "_miroir_updated_at".into(), + } + } +} + +// --------------------------------------------------------------------------- +// 13.9 Streaming dump import +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct DumpImportConfig { + /// `streaming` or `broadcast` (legacy). 
+ pub mode: String, + pub batch_size: u32, + pub parallel_target_writes: u32, + pub memory_buffer_bytes: u64, + pub chunk_size_bytes: u64, +} + +impl Default for DumpImportConfig { + fn default() -> Self { + Self { + mode: "streaming".into(), + batch_size: 1000, + parallel_target_writes: 8, + memory_buffer_bytes: 134_217_728, // 128 MiB + chunk_size_bytes: 268_435_456, // 256 MiB + } + } +} + +// --------------------------------------------------------------------------- +// 13.10 Idempotency keys +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct IdempotencyConfig { + pub enabled: bool, + pub ttl_seconds: u64, + pub max_cached_keys: u32, +} + +impl Default for IdempotencyConfig { + fn default() -> Self { + Self { + enabled: true, + ttl_seconds: 86400, + max_cached_keys: 1_000_000, + } + } +} + +// --------------------------------------------------------------------------- +// 13.10 Query coalescing (paired with idempotency) +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct QueryCoalescingConfig { + pub enabled: bool, + pub window_ms: u64, + pub max_subscribers: u32, + pub max_pending_queries: u32, +} + +impl Default for QueryCoalescingConfig { + fn default() -> Self { + Self { + enabled: true, + window_ms: 50, + max_subscribers: 1000, + max_pending_queries: 10000, + } + } +} + +// --------------------------------------------------------------------------- +// 13.11 Multi-search batch API +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct MultiSearchConfig { + pub enabled: bool, + pub max_queries_per_batch: u32, + pub total_timeout_ms: u64, + pub per_query_timeout_ms: u64, +} + +impl Default for 
MultiSearchConfig { + fn default() -> Self { + Self { + enabled: true, + max_queries_per_batch: 100, + total_timeout_ms: 30000, + per_query_timeout_ms: 30000, + } + } +} + +// --------------------------------------------------------------------------- +// 13.12 Vector / hybrid search +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct VectorSearchConfig { + pub enabled: bool, + pub over_fetch_factor: u32, + /// `convex` or `rrf`. + pub merge_strategy: String, + pub hybrid_alpha_default: f64, + pub rrf_k: u32, +} + +impl Default for VectorSearchConfig { + fn default() -> Self { + Self { + enabled: true, + over_fetch_factor: 3, + merge_strategy: "convex".into(), + hybrid_alpha_default: 0.5, + rrf_k: 60, + } + } +} + +// --------------------------------------------------------------------------- +// 13.13 Change data capture (CDC) +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct CdcConfig { + pub enabled: bool, + pub emit_ttl_deletes: bool, + pub emit_internal_writes: bool, + pub sinks: Vec, + pub buffer: CdcBufferConfig, +} + +impl Default for CdcConfig { + fn default() -> Self { + Self { + enabled: true, + emit_ttl_deletes: false, + emit_internal_writes: false, + sinks: Vec::new(), + buffer: CdcBufferConfig::default(), + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct CdcSinkConfig { + /// `webhook`, `nats`, `kafka`, or `internal`. + #[serde(rename = "type")] + pub sink_type: String, + pub url: String, + pub batch_size: u32, + pub batch_flush_ms: u64, + pub include_body: bool, + pub retry_max_s: u64, + /// NATS-specific. 
+ pub subject_prefix: Option, +} + +impl Default for CdcSinkConfig { + fn default() -> Self { + Self { + sink_type: "webhook".into(), + url: String::new(), + batch_size: 100, + batch_flush_ms: 1000, + include_body: false, + retry_max_s: 3600, + subject_prefix: None, + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct CdcBufferConfig { + /// `memory`, `redis`, or `pvc`. + pub primary: String, + pub memory_bytes: u64, + /// `redis`, `pvc`, or `drop`. + pub overflow: String, + pub redis_bytes: u64, +} + +impl Default for CdcBufferConfig { + fn default() -> Self { + Self { + primary: "memory".into(), + memory_bytes: 67_108_864, // 64 MiB + overflow: "redis".into(), + redis_bytes: 1_073_741_824, // 1 GiB + } + } +} + +// --------------------------------------------------------------------------- +// 13.14 Document TTL +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct TtlConfig { + pub enabled: bool, + pub sweep_interval_s: u64, + pub max_deletes_per_sweep: u32, + pub expires_at_field: String, + pub per_index_overrides: HashMap, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct TtlOverride { + pub sweep_interval_s: u64, + pub max_deletes_per_sweep: u32, +} + +impl Default for TtlConfig { + fn default() -> Self { + Self { + enabled: true, + sweep_interval_s: 300, + max_deletes_per_sweep: 10000, + expires_at_field: "_miroir_expires_at".into(), + per_index_overrides: HashMap::new(), + } + } +} + +// --------------------------------------------------------------------------- +// 13.15 Tenant-to-replica-group affinity +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct TenantAffinityConfig { + pub enabled: bool, + /// `header`, `api_key`, or `explicit`. 
+ pub mode: String, + pub header_name: String, + /// `hash`, `random`, or `reject`. + pub fallback: String, + pub static_map: HashMap, + pub dedicated_groups: Vec, +} + +impl Default for TenantAffinityConfig { + fn default() -> Self { + Self { + enabled: true, + mode: "header".into(), + header_name: "X-Miroir-Tenant".into(), + fallback: "hash".into(), + static_map: HashMap::new(), + dedicated_groups: Vec::new(), + } + } +} + +// --------------------------------------------------------------------------- +// 13.16 Traffic shadow +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct ShadowConfig { + pub enabled: bool, + pub targets: Vec, + pub diff_buffer_size: u32, + pub max_shadow_latency_ms: u64, +} + +impl Default for ShadowConfig { + fn default() -> Self { + Self { + enabled: true, + targets: Vec::new(), + diff_buffer_size: 10000, + max_shadow_latency_ms: 5000, + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct ShadowTargetConfig { + pub name: String, + pub url: String, + pub api_key_env: String, + pub sample_rate: f64, + pub operations: Vec, +} + +impl Default for ShadowTargetConfig { + fn default() -> Self { + Self { + name: String::new(), + url: String::new(), + api_key_env: String::new(), + sample_rate: 0.05, + operations: vec!["search".into(), "multi_search".into(), "explain".into()], + } + } +} + +// --------------------------------------------------------------------------- +// 13.17 Index lifecycle management (ILM) +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct IlmConfig { + pub enabled: bool, + pub check_interval_s: u64, + pub safety_lock_older_than_days: u32, + pub max_rollovers_per_check: u32, +} + +impl Default for IlmConfig { + fn default() -> Self { + Self 
{ + enabled: true, + check_interval_s: 3600, + safety_lock_older_than_days: 7, + max_rollovers_per_check: 10, + } + } +} + +// --------------------------------------------------------------------------- +// 13.18 Synthetic canary queries +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct CanaryRunnerConfig { + pub enabled: bool, + pub max_concurrent_canaries: u32, + pub run_history_per_canary: u32, + pub emit_results_to_cdc: bool, +} + +impl Default for CanaryRunnerConfig { + fn default() -> Self { + Self { + enabled: true, + max_concurrent_canaries: 10, + run_history_per_canary: 100, + emit_results_to_cdc: true, + } + } +} + +// --------------------------------------------------------------------------- +// 13.19 Admin Web UI +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct AdminUiConfig { + pub enabled: bool, + pub path: String, + /// `key`, `oauth` (future), or `none` (dev only). 
+ pub auth: String, + pub session_ttl_s: u64, + pub read_only_mode: bool, + pub allowed_origins: Vec, + pub cors_allowed_origins: Vec, + pub csp_overrides: CspOverridesConfig, + pub theme: AdminUiThemeConfig, + pub features: AdminUiFeaturesConfig, +} + +impl Default for AdminUiConfig { + fn default() -> Self { + Self { + enabled: true, + path: "/_miroir/admin".into(), + auth: "key".into(), + session_ttl_s: 3600, + read_only_mode: false, + allowed_origins: vec!["same-origin".into()], + cors_allowed_origins: Vec::new(), + csp_overrides: CspOverridesConfig::default(), + theme: AdminUiThemeConfig::default(), + features: AdminUiFeaturesConfig::default(), + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct CspOverridesConfig { + pub script_src: Vec, + pub img_src: Vec, + pub connect_src: Vec, +} + +impl Default for CspOverridesConfig { + fn default() -> Self { + Self { + script_src: Vec::new(), + img_src: Vec::new(), + connect_src: Vec::new(), + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct AdminUiThemeConfig { + pub accent_color: String, + /// `auto`, `light`, or `dark`. 
+ pub default_mode: String, +} + +impl Default for AdminUiThemeConfig { + fn default() -> Self { + Self { + accent_color: "#2563eb".into(), + default_mode: "auto".into(), + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct AdminUiFeaturesConfig { + pub sandbox: bool, + pub shadow_viewer: bool, + pub cdc_inspector: bool, +} + +impl Default for AdminUiFeaturesConfig { + fn default() -> Self { + Self { + sandbox: true, + shadow_viewer: true, + cdc_inspector: true, + } + } +} + +// --------------------------------------------------------------------------- +// 13.20 Query explain API +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct ExplainConfig { + pub enabled: bool, + pub max_warnings: u32, + pub allow_execute_parameter: bool, +} + +impl Default for ExplainConfig { + fn default() -> Self { + Self { + enabled: true, + max_warnings: 20, + allow_execute_parameter: true, + } + } +} + +// --------------------------------------------------------------------------- +// 13.21 Search UI (end-user) +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct SearchUiConfig { + pub enabled: bool, + pub path: String, + pub widget_script_enabled: bool, + pub embeddable: bool, + pub auth: SearchUiAuthConfig, + pub allowed_origins: Vec, + pub scoped_key_max_age_days: u32, + pub scoped_key_rotate_before_expiry_days: u32, + pub scoped_key_rotation_drain_s: u64, + pub rate_limit: SearchUiRateLimitConfig, + pub cors_allowed_origins: Vec, + pub csp_overrides: CspOverridesConfig, + pub csp: String, + pub analytics: SearchUiAnalyticsConfig, +} + +impl Default for SearchUiConfig { + fn default() -> Self { + Self { + enabled: true, + path: "/ui/search".into(), + widget_script_enabled: true, + embeddable: 
true, + auth: SearchUiAuthConfig::default(), + allowed_origins: vec!["*".into()], + scoped_key_max_age_days: 60, + scoped_key_rotate_before_expiry_days: 30, + scoped_key_rotation_drain_s: 120, + rate_limit: SearchUiRateLimitConfig::default(), + cors_allowed_origins: Vec::new(), + csp_overrides: CspOverridesConfig::default(), + csp: "default-src 'self'; img-src 'self' https:; style-src 'self' 'unsafe-inline'" + .into(), + analytics: SearchUiAnalyticsConfig::default(), + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct SearchUiAuthConfig { + /// `public`, `shared_key`, or `oauth_proxy`. + pub mode: String, + pub shared_key_env: String, + pub session_ttl_s: u64, + pub session_rate_limit: String, + pub jwt_secret_env: String, + pub oauth_proxy: OAuthProxyConfig, +} + +impl Default for SearchUiAuthConfig { + fn default() -> Self { + Self { + mode: "public".into(), + shared_key_env: String::new(), + session_ttl_s: 900, + session_rate_limit: "10/minute".into(), + jwt_secret_env: "SEARCH_UI_JWT_SECRET".into(), + oauth_proxy: OAuthProxyConfig::default(), + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct OAuthProxyConfig { + pub user_header: String, + pub groups_header: String, + pub filter_template: Option, + pub attribute_map: HashMap, +} + +impl Default for OAuthProxyConfig { + fn default() -> Self { + Self { + user_header: "X-Forwarded-User".into(), + groups_header: "X-Forwarded-Groups".into(), + filter_template: Some("tenant IN [{groups}]".into()), + attribute_map: { + let mut m = HashMap::new(); + m.insert("groups".into(), "groups_array".into()); + m.insert("user".into(), "user_id_string".into()); + m + }, + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct SearchUiRateLimitConfig { + pub per_ip: String, + /// `redis` or `local`. 
+ pub backend: String, + pub redis_key_prefix: String, + pub redis_ttl_s: u64, +} + +impl Default for SearchUiRateLimitConfig { + fn default() -> Self { + Self { + per_ip: "60/minute".into(), + backend: "redis".into(), + redis_key_prefix: "miroir:ratelimit:searchui:".into(), + redis_ttl_s: 60, + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct SearchUiAnalyticsConfig { + pub enabled: bool, + /// `cdc` (publishes click-throughs as CDC events). + pub sink: String, +} + +impl Default for SearchUiAnalyticsConfig { + fn default() -> Self { + Self { + enabled: false, + sink: "cdc".into(), + } + } +} diff --git a/crates/miroir-core/src/config.bak/mod.rs b/crates/miroir-core/src/config.bak/mod.rs new file mode 100644 index 0000000..9fbf8dd --- /dev/null +++ b/crates/miroir-core/src/config.bak/mod.rs @@ -0,0 +1,352 @@ +mod advanced; +mod error; +mod load; +mod validate; + +pub use error::ConfigError; + +use serde::{Deserialize, Serialize}; + +/// Top-level configuration matching plan §4 YAML schema under `miroir:`. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct MiroirConfig { + // --- Secrets (env-var overrides) --- + /// Client-facing API key. Env override: `MIROIR_MASTER_KEY`. + pub master_key: String, + /// Key Miroir uses on Meilisearch nodes. Env override: `MIROIR_NODE_MASTER_KEY`. + pub node_master_key: String, + + // --- Core topology --- + /// Total number of logical shards. + pub shards: u32, + /// Replication factor (intra-group replicas per shard). Production: 2. + pub replication_factor: u32, + /// Number of independent query pools. Default 1; production: 2. 
+ pub replica_groups: u32, + + // --- Sub-structs --- + pub nodes: Vec, + pub task_store: TaskStoreConfig, + pub admin: AdminConfig, + pub health: HealthConfig, + pub scatter: ScatterConfig, + pub rebalancer: RebalancerConfig, + pub server: ServerConfig, + pub connection_pool_per_node: ConnectionPoolConfig, + pub task_registry: TaskRegistryConfig, + + // --- §13 advanced capabilities --- + pub resharding: advanced::ReshardingConfig, + pub hedging: advanced::HedgingConfig, + pub replica_selection: advanced::ReplicaSelectionConfig, + pub query_planner: advanced::QueryPlannerConfig, + pub settings_broadcast: advanced::SettingsBroadcastConfig, + pub settings_drift_check: advanced::SettingsDriftCheckConfig, + pub session_pinning: advanced::SessionPinningConfig, + pub aliases: advanced::AliasesConfig, + pub anti_entropy: advanced::AntiEntropyConfig, + pub dump_import: advanced::DumpImportConfig, + pub idempotency: advanced::IdempotencyConfig, + pub query_coalescing: advanced::QueryCoalescingConfig, + pub multi_search: advanced::MultiSearchConfig, + pub vector_search: advanced::VectorSearchConfig, + pub cdc: advanced::CdcConfig, + pub ttl: advanced::TtlConfig, + pub tenant_affinity: advanced::TenantAffinityConfig, + pub shadow: advanced::ShadowConfig, + pub ilm: advanced::IlmConfig, + pub canary_runner: advanced::CanaryRunnerConfig, + pub explain: advanced::ExplainConfig, + pub admin_ui: advanced::AdminUiConfig, + pub search_ui: advanced::SearchUiConfig, + + // --- §14 horizontal scaling --- + pub peer_discovery: PeerDiscoveryConfig, + pub leader_election: LeaderElectionConfig, + pub hpa: HpaConfig, +} + +impl Default for MiroirConfig { + fn default() -> Self { + Self { + master_key: String::new(), + node_master_key: String::new(), + shards: 64, + replication_factor: 2, + replica_groups: 1, + nodes: Vec::new(), + task_store: TaskStoreConfig::default(), + admin: AdminConfig::default(), + health: HealthConfig::default(), + scatter: ScatterConfig::default(), + rebalancer: 
RebalancerConfig::default(), + server: ServerConfig::default(), + connection_pool_per_node: ConnectionPoolConfig::default(), + task_registry: TaskRegistryConfig::default(), + resharding: advanced::ReshardingConfig::default(), + hedging: advanced::HedgingConfig::default(), + replica_selection: advanced::ReplicaSelectionConfig::default(), + query_planner: advanced::QueryPlannerConfig::default(), + settings_broadcast: advanced::SettingsBroadcastConfig::default(), + settings_drift_check: advanced::SettingsDriftCheckConfig::default(), + session_pinning: advanced::SessionPinningConfig::default(), + aliases: advanced::AliasesConfig::default(), + anti_entropy: advanced::AntiEntropyConfig::default(), + dump_import: advanced::DumpImportConfig::default(), + idempotency: advanced::IdempotencyConfig::default(), + query_coalescing: advanced::QueryCoalescingConfig::default(), + multi_search: advanced::MultiSearchConfig::default(), + vector_search: advanced::VectorSearchConfig::default(), + cdc: advanced::CdcConfig::default(), + ttl: advanced::TtlConfig::default(), + tenant_affinity: advanced::TenantAffinityConfig::default(), + shadow: advanced::ShadowConfig::default(), + ilm: advanced::IlmConfig::default(), + canary_runner: advanced::CanaryRunnerConfig::default(), + explain: advanced::ExplainConfig::default(), + admin_ui: advanced::AdminUiConfig::default(), + search_ui: advanced::SearchUiConfig::default(), + peer_discovery: PeerDiscoveryConfig::default(), + leader_election: LeaderElectionConfig::default(), + hpa: HpaConfig::default(), + } + } +} + +impl MiroirConfig { + /// Validate cross-field constraints. Returns `Ok(())` or a `ConfigError`. + pub fn validate(&self) -> Result<(), ConfigError> { + validate::validate(self) + } + + /// Layered loading: file → env overrides → CLI overrides. + pub fn load() -> Result { + load::load() + } +} + +/// A single Meilisearch node in the cluster topology. 
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
+pub struct NodeConfig {
+    // This struct carries no `#[serde(default)]`, so every field below must be
+    // present in the input when deserializing.
+    /// Identifier of the node.
+    pub id: String,
+    /// Address of the node (exact format is not visible here — confirm with the consumer).
+    pub address: String,
+    /// Replica group the node is assigned to.
+    pub replica_group: u32,
+}
+
+/// Settings for the task store backend.
+///
+/// Fields missing from the input fall back to the values in the `Default` impl.
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
+#[serde(default)]
+pub struct TaskStoreConfig {
+    /// Backend selector: `sqlite` or `redis`.
+    pub backend: String,
+    /// SQLite database file path (used by the sqlite backend).
+    pub path: String,
+    /// Redis URL (used by the redis backend), e.g. `redis://host:6379`.
+    pub url: String,
+}
+
+impl Default for TaskStoreConfig {
+    /// Selects the `sqlite` backend at `/data/miroir-tasks.db`; the Redis URL is empty.
+    fn default() -> Self {
+        let backend = String::from("sqlite");
+        let path = String::from("/data/miroir-tasks.db");
+        let url = String::new();
+        Self { backend, path, url }
+    }
+}
+
+/// Settings for the admin API.
+///
+/// Fields missing from the input fall back to the values in the `Default` impl.
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
+#[serde(default)]
+pub struct AdminConfig {
+    /// Whether the admin API is enabled.
+    pub enabled: bool,
+    /// Admin API key. Env override: `MIROIR_ADMIN_API_KEY`.
+    pub api_key: String,
+}
+
+impl Default for AdminConfig {
+    /// Enabled, with an empty API key.
+    fn default() -> Self {
+        Self {
+            api_key: String::new(),
+            enabled: true,
+        }
+    }
+}
+
+/// Settings for health checking.
+///
+/// Fields missing from the input fall back to the values in the `Default` impl.
+/// The `_ms` fields are presumably milliseconds, going by their suffix.
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
+#[serde(default)]
+pub struct HealthConfig {
+    pub interval_ms: u64,
+    pub timeout_ms: u64,
+    pub unhealthy_threshold: u32,
+    pub recovery_threshold: u32,
+}
+
+impl Default for HealthConfig {
+    /// `interval_ms = 5000`, `timeout_ms = 2000`, `unhealthy_threshold = 3`,
+    /// `recovery_threshold = 2`.
+    fn default() -> Self {
+        Self {
+            recovery_threshold: 2,
+            unhealthy_threshold: 3,
+            timeout_ms: 2_000,
+            interval_ms: 5_000,
+        }
+    }
+}
+
+/// Settings for scatter-gather queries.
+///
+/// Fields missing from the input fall back to the values in the `Default` impl.
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
+#[serde(default)]
+pub struct ScatterConfig {
+    pub node_timeout_ms: u64,
+    pub retry_on_timeout: bool,
+    /// Either `partial` or `error`.
+    pub unavailable_shard_policy: String,
+}
+
+impl Default for ScatterConfig {
+    /// `node_timeout_ms = 5000`, retry on timeout, and the `partial` policy.
+    fn default() -> Self {
+        let unavailable_shard_policy = String::from("partial");
+        Self {
+            node_timeout_ms: 5_000,
+            retry_on_timeout: true,
+            unavailable_shard_policy,
+        }
+    }
+}
+
+/// Rebalancer configuration.
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
+#[serde(default)]
+pub struct RebalancerConfig {
+    pub auto_rebalance_on_recovery: bool,
+    pub max_concurrent_migrations: u32,
+    /// Presumably seconds, going by the `_s` suffix.
+    pub migration_timeout_s: u64,
+}
+
+impl Default for RebalancerConfig {
+    /// Auto-rebalance on recovery, `max_concurrent_migrations = 4`,
+    /// `migration_timeout_s = 3600`.
+    fn default() -> Self {
+        Self {
+            migration_timeout_s: 3_600,
+            max_concurrent_migrations: 4,
+            auto_rebalance_on_recovery: true,
+        }
+    }
+}
+
+/// Settings for the server (HTTP listener).
+///
+/// Fields missing from the input fall back to the values in the `Default` impl;
+/// the last two fields additionally name their own serde default functions.
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
+#[serde(default)]
+pub struct ServerConfig {
+    pub port: u16,
+    pub bind: String,
+    pub max_body_bytes: u64,
+    #[serde(default = "default_max_concurrent_requests")]
+    pub max_concurrent_requests: u32,
+    #[serde(default = "default_request_timeout_ms")]
+    pub request_timeout_ms: u64,
+}
+
+/// Serde default for `ServerConfig::max_concurrent_requests`.
+fn default_max_concurrent_requests() -> u32 {
+    500
+}
+/// Serde default for `ServerConfig::request_timeout_ms`.
+fn default_request_timeout_ms() -> u64 {
+    30_000
+}
+
+impl Default for ServerConfig {
+    /// Port 7700 bound to `0.0.0.0`, a 100 MiB body limit, and the two
+    /// helper-provided defaults above.
+    fn default() -> Self {
+        let max_concurrent_requests = default_max_concurrent_requests();
+        let request_timeout_ms = default_request_timeout_ms();
+        Self {
+            port: 7700,
+            bind: String::from("0.0.0.0"),
+            max_body_bytes: 100 * 1024 * 1024, // 100 MiB
+            max_concurrent_requests,
+            request_timeout_ms,
+        }
+    }
+}
+
+/// Per-node HTTP/2 connection pool settings (§14.8).
+///
+/// Fields missing from the input fall back to the values in the `Default` impl.
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
+#[serde(default)]
+pub struct ConnectionPoolConfig {
+    pub max_idle: u32,
+    pub max_total: u32,
+    /// Presumably seconds, going by the `_s` suffix.
+    pub idle_timeout_s: u64,
+}
+
+impl Default for ConnectionPoolConfig {
+    /// `max_idle = 32`, `max_total = 128`, `idle_timeout_s = 60`.
+    fn default() -> Self {
+        Self {
+            idle_timeout_s: 60,
+            max_total: 128,
+            max_idle: 32,
+        }
+    }
+}
+
+/// Cache settings for the task registry (§14.8).
+///
+/// Fields missing from the input fall back to the values in the `Default` impl.
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
+#[serde(default)]
+pub struct TaskRegistryConfig {
+    pub cache_size: u32,
+    pub redis_pool_max: u32,
+}
+
+impl Default for TaskRegistryConfig {
+    /// `cache_size = 10000`, `redis_pool_max = 50`.
+    fn default() -> Self {
+        Self {
+            redis_pool_max: 50,
+            cache_size: 10_000,
+        }
+    }
+}
+
+/// Peer discovery via Kubernetes headless Service (§14.5).
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(default)]
+pub struct PeerDiscoveryConfig {
+    pub service_name: String,
+    /// Presumably seconds, going by the `_s` suffix.
+    pub refresh_interval_s: u64,
+}
+
+impl Default for PeerDiscoveryConfig {
+    /// `service_name = "miroir-headless"`, `refresh_interval_s = 15`.
+    fn default() -> Self {
+        Self {
+            service_name: "miroir-headless".into(),
+            refresh_interval_s: 15,
+        }
+    }
+}
+
+/// Leader election for Mode B background jobs (§14.5).
+///
+/// Fields missing from the input fall back to the values in the `Default` impl.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(default)]
+pub struct LeaderElectionConfig {
+    pub enabled: bool,
+    /// Presumably seconds, going by the `_s` suffix.
+    pub lease_ttl_s: u64,
+    /// Presumably seconds, going by the `_s` suffix.
+    pub renew_interval_s: u64,
+}
+
+impl Default for LeaderElectionConfig {
+    /// Enabled, `lease_ttl_s = 10`, `renew_interval_s = 3`.
+    fn default() -> Self {
+        Self {
+            enabled: true,
+            lease_ttl_s: 10,
+            renew_interval_s: 3,
+        }
+    }
+}
+
+/// Horizontal Pod Autoscaler settings (Helm-only, informational in config).
+///
+/// `Default` is derived, so `enabled` defaults to `false` — the same value the
+/// hand-written impl produced.
+#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
+#[serde(default)]
+pub struct HpaConfig {
+    pub enabled: bool,
+}
diff --git a/crates/miroir-core/src/config.rs b/crates/miroir-core/src/config.rs
new file mode 100644
index 0000000..9fbf5c2
--- /dev/null
+++ b/crates/miroir-core/src/config.rs
@@ -0,0 +1,79 @@
+//! Miroir configuration.
+
+use serde::{Deserialize, Serialize};
+
+/// Main Miroir configuration.
+///
+/// This struct represents the full configuration shape matching the plan §4 YAML.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Config {
+    /// Shard count (fixed at index creation).
+    pub shards: u32,
+
+    /// Replication factor (elastic, intra-group copies per shard).
+    pub replication_factor: usize,
+
+    /// Number of replica groups (elastic, independent query pools).
+    pub replica_groups: u32,
+
+    /// Node configuration, one [`NodeConfig`] entry per node.
+    pub nodes: Vec<NodeConfig>,
+
+    /// Scatter configuration.
+    pub scatter: ScatterConfig,
+
+    /// Search UI configuration.
+    #[serde(default)]
+    pub search_ui: SearchUiConfig,
+}
+
+/// Configuration for a single node.
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NodeConfig { + /// Unique node identifier. + pub id: String, + + /// Node base URL (e.g., ). + pub url: String, + + /// Replica group assignment (0-based). + pub replica_group: u32, +} + +/// Scatter (fan-out) configuration. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ScatterConfig { + /// Policy for handling unavailable shards. + #[serde(default = "default_unavailable_shard_policy")] + pub unavailable_shard_policy: UnavailableShardPolicy, +} + +/// Policy for handling unavailable shards during scatter. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum UnavailableShardPolicy { + /// Return partial results from available nodes. + Partial, + + /// Fail the request if any shard is unavailable. + Fail, + + /// Fall back to another replica group for unavailable shards. + Fallback, +} + +fn default_unavailable_shard_policy() -> UnavailableShardPolicy { + UnavailableShardPolicy::Partial +} + +/// Search UI configuration. +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct SearchUiConfig { + /// Whether the search UI is enabled. + #[serde(default)] + pub enabled: bool, + + /// CORS allowed origins. 
+ #[serde(default)] + pub cors_allowed_origins: Vec, +} diff --git a/crates/miroir-core/src/config/error.rs b/crates/miroir-core/src/config/error.rs new file mode 100644 index 0000000..4e4d855 --- /dev/null +++ b/crates/miroir-core/src/config/error.rs @@ -0,0 +1,16 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum ConfigError { + #[error("config file error: {0}")] + File(#[from] std::io::Error), + + #[error("config parse error: {0}")] + Parse(#[from] config::ConfigError), + + #[error("YAML serialization error: {0}")] + Yaml(#[from] serde_yaml::Error), + + #[error("validation error: {0}")] + Validation(String), +} diff --git a/crates/miroir-core/src/config/validate.rs b/crates/miroir-core/src/config/validate.rs new file mode 100644 index 0000000..6d79631 --- /dev/null +++ b/crates/miroir-core/src/config/validate.rs @@ -0,0 +1,146 @@ +use crate::config::{ConfigError, MiroirConfig}; + +pub fn validate(cfg: &MiroirConfig) -> Result<(), ConfigError> { + // replication_factor > 1 requires redis backend for HA + if cfg.replication_factor > 1 && cfg.task_store.backend == "sqlite" { + return Err(ConfigError::Validation( + "replication_factor > 1 requires task_store.backend = 'redis' (SQLite is single-writer)".into(), + )); + } + + // replica_groups > 1 requires redis backend + if cfg.replica_groups > 1 && cfg.task_store.backend == "sqlite" { + return Err(ConfigError::Validation( + "replica_groups > 1 requires task_store.backend = 'redis' (SQLite is single-writer)".into(), + )); + } + + // Nodes must belong to a valid replica group + if cfg.replica_groups > 0 { + for node in &cfg.nodes { + if node.replica_group >= cfg.replica_groups { + return Err(ConfigError::Validation(format!( + "node '{}' has replica_group={} but only {} groups exist (0..{})", + node.id, + node.replica_group, + cfg.replica_groups, + cfg.replica_groups - 1 + ))); + } + } + } + + // Node IDs must be unique + let mut seen_ids = std::collections::HashSet::new(); + for node in &cfg.nodes { + if 
!seen_ids.insert(&node.id) { + return Err(ConfigError::Validation(format!( + "duplicate node id: '{}'", + node.id + ))); + } + } + + // HPA enabled requires redis backend + if cfg.hpa.enabled && cfg.task_store.backend == "sqlite" { + return Err(ConfigError::Validation( + "hpa.enabled = true requires task_store.backend = 'redis'".into(), + )); + } + + // Search UI scoped_key timing validation + if cfg.search_ui.enabled { + let max_age = cfg.search_ui.scoped_key_max_age_days; + let rotate_before = cfg.search_ui.scoped_key_rotate_before_expiry_days; + if rotate_before >= max_age { + return Err(ConfigError::Validation(format!( + "search_ui.scoped_key_rotate_before_expiry_days ({}) must be strictly less than scoped_key_max_age_days ({})", + rotate_before, max_age + ))); + } + } + + // CDC overflow = redis requires redis backend + if cfg.cdc.enabled && cfg.cdc.buffer.overflow == "redis" && cfg.task_store.backend != "redis" { + return Err(ConfigError::Validation( + "cdc.buffer.overflow = 'redis' requires task_store.backend = 'redis'".into(), + )); + } + + // Search UI rate_limit.backend = redis requires redis task store (when multi-pod) + if cfg.search_ui.enabled + && cfg.search_ui.rate_limit.backend == "redis" + && cfg.task_store.backend != "redis" + { + return Err(ConfigError::Validation( + "search_ui.rate_limit.backend = 'redis' requires task_store.backend = 'redis'".into(), + )); + } + + // Leader election should be enabled when replica_groups > 1 + if cfg.replica_groups > 1 && !cfg.leader_election.enabled { + return Err(ConfigError::Validation( + "leader_election.enabled must be true when replica_groups > 1".into(), + )); + } + + // Tenant affinity dedicated_groups must be within valid range + if cfg.tenant_affinity.enabled { + for g in &cfg.tenant_affinity.dedicated_groups { + if *g >= cfg.replica_groups { + return Err(ConfigError::Validation(format!( + "tenant_affinity.dedicated_groups contains {} but only {} groups (0..{})", + g, + cfg.replica_groups, + 
cfg.replica_groups - 1 + ))); + } + } + for (tenant, group) in &cfg.tenant_affinity.static_map { + if *group >= cfg.replica_groups { + return Err(ConfigError::Validation(format!( + "tenant_affinity.static_map: tenant '{}' maps to group {} but only {} groups (0..{})", + tenant, + group, + cfg.replica_groups, + cfg.replica_groups - 1 + ))); + } + } + } + + // Shadow targets must have valid sample_rate + if cfg.shadow.enabled { + for target in &cfg.shadow.targets { + if target.sample_rate <= 0.0 || target.sample_rate > 1.0 { + return Err(ConfigError::Validation(format!( + "shadow target '{}' has invalid sample_rate={} (must be 0 < rate <= 1)", + target.name, target.sample_rate + ))); + } + } + } + + // Server port must be non-zero + if cfg.server.port == 0 { + return Err(ConfigError::Validation( + "server.port must be non-zero".into(), + )); + } + + // shards must be non-zero + if cfg.shards == 0 { + return Err(ConfigError::Validation( + "shards must be non-zero".into(), + )); + } + + // replication_factor must be > 0 + if cfg.replication_factor == 0 { + return Err(ConfigError::Validation( + "replication_factor must be > 0".into(), + )); + } + + Ok(()) +} diff --git a/crates/miroir-core/src/error.rs b/crates/miroir-core/src/error.rs new file mode 100644 index 0000000..ad65fee --- /dev/null +++ b/crates/miroir-core/src/error.rs @@ -0,0 +1,38 @@ +//! Error types for Miroir. + +use thiserror::Error; + +/// Result type alias for Miroir operations. +pub type Result = std::result::Result; + +/// Core error type for Miroir. +#[derive(Error, Debug)] +pub enum MiroirError { + /// Configuration error. + #[error("configuration error: {0}")] + Config(String), + + /// Topology error. + #[error("topology error: {0}")] + Topology(String), + + /// Routing error. + #[error("routing error: {0}")] + Routing(String), + + /// Merge error. + #[error("merge error: {0}")] + Merge(String), + + /// Task registry error. + #[error("task error: {0}")] + Task(String), + + /// IO error. 
+ #[error("IO error: {0}")] + Io(#[from] std::io::Error), + + /// JSON serialization/deserialization error. + #[error("JSON error: {0}")] + Json(#[from] serde_json::Error), +} diff --git a/crates/miroir-core/src/lib.rs b/crates/miroir-core/src/lib.rs index 69e8d79..336af15 100644 --- a/crates/miroir-core/src/lib.rs +++ b/crates/miroir-core/src/lib.rs @@ -1 +1,14 @@ -// miroir-core placeholder +//! Miroir core library +//! +//! Provides routing, merging, and topology logic for the Miroir distributed search proxy. + +pub mod config; +pub mod error; +pub mod merger; +pub mod router; +pub mod scatter; +pub mod task; +pub mod topology; + +// Public re-exports +pub use error::{MiroirError, Result}; diff --git a/crates/miroir-core/src/merger.rs b/crates/miroir-core/src/merger.rs new file mode 100644 index 0000000..a6cac63 --- /dev/null +++ b/crates/miroir-core/src/merger.rs @@ -0,0 +1,74 @@ +//! Result merger: combines shard results into a single response. + +use crate::Result; +use serde_json::Value; + +/// Result merger: combines responses from multiple shards. +pub trait Merger: Send + Sync { + /// Merge search results from multiple shards. + /// + /// Takes the raw JSON responses from each shard and produces + /// a merged result with global sorting, offset/limit applied, + /// and facet aggregation. + fn merge( + &self, + shard_responses: Vec, + offset: usize, + limit: usize, + client_requested_score: bool, + ) -> Result; +} + +/// Response from a single shard. +#[derive(Debug, Clone)] +pub struct ShardResponse { + /// Shard identifier. + pub shard_id: u32, + + /// Raw JSON response from the node. + pub body: Value, + + /// Whether this shard succeeded. + pub success: bool, +} + +/// Merged search result. +#[derive(Debug, Clone)] +pub struct MergedResult { + /// Merged hits (globally sorted, offset/limit applied). + pub hits: Vec, + + /// Aggregated facets. + pub facets: Value, + + /// Estimated total hits (sum of shard totals). 
+ pub total_hits: u64, + + /// Processing time in milliseconds. + pub processing_time_ms: u64, + + /// Whether the response is degraded (some shards failed). + pub degraded: bool, +} + +/// Default stub implementation of Merger. +#[derive(Debug, Clone, Default)] +pub struct StubMerger; + +impl Merger for StubMerger { + fn merge( + &self, + _shard_responses: Vec, + _offset: usize, + _limit: usize, + _client_requested_score: bool, + ) -> Result { + Ok(MergedResult { + hits: Vec::new(), + facets: serde_json::json!({}), + total_hits: 0, + processing_time_ms: 0, + degraded: false, + }) + } +} diff --git a/crates/miroir-core/src/migration.rs b/crates/miroir-core/src/migration.rs new file mode 100644 index 0000000..0a6a0ad --- /dev/null +++ b/crates/miroir-core/src/migration.rs @@ -0,0 +1,765 @@ +//! Shard migration cutover state machine. +//! +//! Implements the node-addition migration flow from plan §4 with explicit state +//! transitions and a race-window-safe cutover sequence. +//! +//! ## Race window analysis (plan §15 OP#1) +//! +//! The dangerous window is between "mark node active" (routing changes to new-node-only) +//! and "delete migrated shard from old node." A document written during dual-write that +//! succeeded on OLD but failed on NEW — and arrived after the last migration page — +//! would be deleted from OLD without ever reaching NEW. +//! +//! ## Solution: quiesce-then-verify cutover +//! +//! Instead of the naïve sequence (mark active → stop dual-write → delete old), we use: +//! +//! 1. Stop dual-write (no new writes go to either node for affected shards) +//! 2. Drain: wait for all in-flight writes to both OLD and NEW to complete +//! 3. Delta migration: re-read affected shards from OLD (catches anything written since +//! the last migration page) and write deltas to NEW +//! 4. Mark node active (routing switches to NEW-only) +//! 5. Delete migrated shard from OLD +//! +//! 
Step 3 is the key: it closes the race window by ensuring NEW has a complete picture +//! before we commit the routing change. The cost is one extra pagination pass over each +//! migrated shard — bounded by the number of docs written during the migration window. + +use std::collections::{HashMap, HashSet}; +use std::fmt; +use std::time::{Duration, Instant}; + +use serde::{Deserialize, Serialize}; + +/// Unique identifier for a shard migration operation. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct MigrationId(pub u64); + +impl fmt::Display for MigrationId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +/// Identifier for a physical node in the cluster. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct NodeId(pub String); + +impl fmt::Display for NodeId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +/// Identifier for a logical shard. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct ShardId(pub u32); + +impl fmt::Display for ShardId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "s{}", self.0) + } +} + +/// Per-shard migration state within a node-addition migration. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum ShardMigrationState { + /// Waiting for background migration to begin. + Pending, + /// Background pagination is reading docs from source and writing to target. + Migrating { + docs_copied: u64, + pages_remaining: u32, + }, + /// Background migration complete, awaiting cutover. + MigrationComplete { + docs_copied: u64, + }, + /// Dual-write stopped, in-flight writes draining. + Draining { + in_flight_count: u32, + docs_copied: u64, + }, + /// Delta pass: re-reading source to catch stragglers written during migration. 
+ DeltaPass { + docs_copied: u64, + delta_docs_copied: u64, + }, + /// Node is active for this shard; old replica data deleted. + Active, + /// Migration failed at this phase. + Failed { + phase: String, + reason: String, + }, +} + +impl fmt::Display for ShardMigrationState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Pending => write!(f, "pending"), + Self::Migrating { docs_copied, pages_remaining } => { + write!(f, "migrating({docs_copied} copied, {pages_remaining} pages left)") + } + Self::MigrationComplete { docs_copied } => { + write!(f, "migration_complete({docs_copied} copied)") + } + Self::Draining { in_flight_count, docs_copied } => { + write!(f, "draining({in_flight_count} in-flight, {docs_copied} copied)") + } + Self::DeltaPass { docs_copied, delta_docs_copied } => { + write!(f, "delta_pass({docs_copied} + {delta_docs_copied} copied)") + } + Self::Active => write!(f, "active"), + Self::Failed { phase, reason } => write!(f, "failed({phase}: {reason})"), + } + } +} + +/// Overall migration phase for a node addition. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum MigrationPhase { + /// Computing which shards move to the new node. + ComputingAssignments, + /// Dual-write active; background migration in progress. + DualWriteMigrating, + /// Background migration done; beginning cutover. + CutoverBegin, + /// Stopping dual-write; waiting for in-flight writes to settle. + CutoverDraining, + /// Re-reading source to catch docs written during migration. + CutoverDeltaPass, + /// Marking new node active; switching routing. + CutoverActivate, + /// Deleting migrated shard data from old nodes. + CutoverCleanup, + /// All shards migrated; migration complete. + Complete, + /// Migration failed. 
+ Failed(String), +} + +impl fmt::Display for MigrationPhase { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::ComputingAssignments => write!(f, "computing_assignments"), + Self::DualWriteMigrating => write!(f, "dual_write_migrating"), + Self::CutoverBegin => write!(f, "cutover_begin"), + Self::CutoverDraining => write!(f, "cutover_draining"), + Self::CutoverDeltaPass => write!(f, "cutover_delta_pass"), + Self::CutoverActivate => write!(f, "cutover_activate"), + Self::CutoverCleanup => write!(f, "cutover_cleanup"), + Self::Complete => write!(f, "complete"), + Self::Failed(msg) => write!(f, "failed({msg})"), + } + } +} + +/// A single document write targeting a shard during migration. +#[derive(Debug, Clone)] +pub struct InFlightWrite { + pub doc_id: String, + pub shard: ShardId, + pub target_nodes: Vec, + pub completed_nodes: HashSet, + pub failed_nodes: HashMap, + pub submitted_at: Instant, +} + +// Serialize Instant as u64 (milliseconds since UNIX epoch) +mod instant_serde { + use std::time::{Duration, Instant}; + use serde::{Deserialize, Deserializer, Serialize, Serializer}; + + pub fn serialize(instant: &Option, serializer: S) -> Result + where + S: Serializer, + { + match instant { + Some(i) => { + let since_epoch = i.duration_since(Instant::now() - *i); + // Store as approximate UNIX timestamp - this is a simplification + // For production, would use SystemTime instead + (since_epoch.as_millis() as u64).serialize(serializer) + } + None => Option::::None.serialize(serializer), + } + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> + where + D: Deserializer<'de>, + { + let opt = Option::::deserialize(deserializer)?; + Ok(opt.map(|_| Instant::now())) + } +} + +/// Configuration for migration cutover behavior. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MigrationConfig { + /// Maximum time to wait for in-flight writes to drain during cutover. 
+ pub drain_timeout: Duration, + /// Whether to perform the delta pass (re-read source after stopping dual-write). + /// Disabling this saves a pagination pass but opens the race window — only safe + /// when anti-entropy is enabled as a safety net. + pub skip_delta_pass: bool, + /// Whether anti-entropy is enabled — used to determine if skip_delta_pass is safe. + pub anti_entropy_enabled: bool, +} + +impl Default for MigrationConfig { + fn default() -> Self { + Self { + drain_timeout: Duration::from_secs(30), + skip_delta_pass: false, + anti_entropy_enabled: true, + } + } +} + +/// Error type for migration operations. +#[derive(Debug, thiserror::Error)] +pub enum MigrationError { + #[error("anti-entropy is disabled and delta pass is skipped — documents may be lost at cutover")] + UnsafeCutoverNoAntiEntropy, + #[error("drain timeout exceeded: {0} in-flight writes still pending")] + DrainTimeout(u32), + #[error("shard {0} is not in a valid state for this transition (current: {1})")] + InvalidTransition(ShardId, String), + #[error("migration {0} not found")] + NotFound(MigrationId), + #[error("delta pass failed for shard {0}: {1}")] + DeltaPassFailed(ShardId, String), +} + +/// Tracks the state of a node-addition migration. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MigrationState { + pub id: MigrationId, + pub new_node: NodeId, + pub replica_group: u32, + pub phase: MigrationPhase, + pub affected_shards: HashMap, + /// Maps shard → old node that currently owns it. + pub old_owners: HashMap, + #[serde(with = "instant_serde")] + pub started_at: Option, + #[serde(with = "instant_serde")] + pub completed_at: Option, +} + +/// The migration coordinator manages shard migration state transitions. +pub struct MigrationCoordinator { + config: MigrationConfig, + migrations: HashMap, + next_id: u64, + /// In-flight writes being tracked for drain during cutover. 
+ in_flight: Vec, +} + +impl MigrationCoordinator { + pub fn new(config: MigrationConfig) -> Self { + Self { + config, + migrations: HashMap::new(), + next_id: 0, + in_flight: Vec::new(), + } + } + + /// Validate migration safety before starting. Returns an error if the configuration + /// would allow data loss at the cutover boundary. + pub fn validate_safety(&self) -> Result<(), MigrationError> { + if self.config.skip_delta_pass && !self.config.anti_entropy_enabled { + return Err(MigrationError::UnsafeCutoverNoAntiEntropy); + } + Ok(()) + } + + /// Begin a new node-addition migration. + pub fn begin_migration( + &mut self, + new_node: NodeId, + replica_group: u32, + affected_shards: HashMap, + ) -> Result { + self.validate_safety()?; + + let id = MigrationId(self.next_id); + self.next_id += 1; + + let shard_states: HashMap = affected_shards + .keys() + .map(|&shard| (shard, ShardMigrationState::Pending)) + .collect(); + + let state = MigrationState { + id, + new_node, + replica_group, + phase: MigrationPhase::ComputingAssignments, + affected_shards: shard_states, + old_owners: affected_shards, + started_at: Some(Instant::now()), + completed_at: None, + }; + + self.migrations.insert(id, state); + Ok(id) + } + + /// Transition to dual-write + background migration phase. + pub fn begin_dual_write(&mut self, id: MigrationId) -> Result<(), MigrationError> { + let state = self.migrations.get_mut(&id).ok_or(MigrationError::NotFound(id))?; + state.phase = MigrationPhase::DualWriteMigrating; + for shard_state in state.affected_shards.values_mut() { + if *shard_state == ShardMigrationState::Pending { + *shard_state = ShardMigrationState::Migrating { + docs_copied: 0, + pages_remaining: 0, + }; + } + } + Ok(()) + } + + /// Record that a shard's background migration completed. 
+ pub fn shard_migration_complete( + &mut self, + id: MigrationId, + shard: ShardId, + docs_copied: u64, + ) -> Result<(), MigrationError> { + let state = self.migrations.get_mut(&id).ok_or(MigrationError::NotFound(id))?; + let shard_state = state.affected_shards.get_mut(&shard).ok_or_else(|| { + MigrationError::InvalidTransition(shard, "shard not in migration".into()) + })?; + + match shard_state { + ShardMigrationState::Migrating { .. } => { + *shard_state = ShardMigrationState::MigrationComplete { docs_copied }; + } + _ => { + return Err(MigrationError::InvalidTransition( + shard, + shard_state.to_string(), + )); + } + } + + // Check if all shards are done migrating + let all_complete = state + .affected_shards + .values() + .all(|s| matches!(s, ShardMigrationState::MigrationComplete { .. })); + + if all_complete { + state.phase = MigrationPhase::CutoverBegin; + } + + Ok(()) + } + + /// Begin the cutover sequence: stop dual-write and drain in-flight writes. + pub fn begin_cutover(&mut self, id: MigrationId) -> Result { + let state = self.migrations.get_mut(&id).ok_or(MigrationError::NotFound(id))?; + + if !matches!(state.phase, MigrationPhase::CutoverBegin) { + return Err(MigrationError::InvalidTransition( + ShardId(0), + format!("expected CutoverBegin, got {}", state.phase), + )); + } + + // Transition all shards to Draining + let total_in_flight = self.in_flight.len() as u32; + for (shard, shard_state) in state.affected_shards.iter_mut() { + match shard_state { + ShardMigrationState::MigrationComplete { docs_copied } => { + *shard_state = ShardMigrationState::Draining { + in_flight_count: total_in_flight, + docs_copied: *docs_copied, + }; + } + _ => { + return Err(MigrationError::InvalidTransition( + *shard, + shard_state.to_string(), + )); + } + } + } + + state.phase = MigrationPhase::CutoverDraining; + Ok(state.phase.clone()) + } + + /// Register an in-flight write for tracking during drain. 
+    pub fn register_in_flight(&mut self, write: InFlightWrite) {
+        self.in_flight.push(write);
+    }
+
+    /// Acknowledge completion of a write to a specific node.
+    ///
+    /// Applies to every tracked write whose `doc_id` matches. A success supersedes a
+    /// failure recorded earlier for the same node, so each node is counted at most once
+    /// when deciding whether the write has settled.
+    pub fn ack_write(&mut self, doc_id: &str, node: &NodeId) {
+        for write in self.in_flight.iter_mut().filter(|w| w.doc_id == doc_id) {
+            write.failed_nodes.remove(node);
+            write.completed_nodes.insert(node.clone());
+        }
+    }
+
+    /// Mark a write as failed on a specific node.
+    ///
+    /// Applies to every tracked write whose `doc_id` matches. A failure reported after
+    /// the node already acknowledged the write is ignored, which keeps the completed and
+    /// failed sets disjoint.
+    pub fn fail_write(&mut self, doc_id: &str, node: &NodeId, reason: String) {
+        for write in self.in_flight.iter_mut().filter(|w| w.doc_id == doc_id) {
+            if !write.completed_nodes.contains(node) {
+                write.failed_nodes.insert(node.clone(), reason.clone());
+            }
+        }
+    }
+
+    /// Whether every target node of `write` has reported either success or failure.
+    fn write_settled(write: &InFlightWrite) -> bool {
+        write.target_nodes.iter().all(|node| {
+            write.completed_nodes.contains(node) || write.failed_nodes.contains_key(node)
+        })
+    }
+
+    /// Check if all in-flight writes have completed (drained).
+    ///
+    /// A write counts as drained once each of its target nodes has responded, whether
+    /// with a success or a failure. Note that the in-flight list is shared by all
+    /// migrations tracked by this coordinator.
+    pub fn is_drained(&self) -> bool {
+        self.in_flight.iter().all(Self::write_settled)
+    }
+
+    /// Complete the drain and move to delta pass or activation.
+    ///
+    /// Returns the phase the migration is in after the call.
+    ///
+    /// # Errors
+    ///
+    /// * `MigrationError::NotFound` if `id` is unknown.
+    /// * `MigrationError::InvalidTransition` if the migration is not in `CutoverDraining`.
+    /// * `MigrationError::DrainTimeout(n)` if `n` in-flight writes are still waiting for
+    ///   a response from at least one target node.
+    pub fn complete_drain(&mut self, id: MigrationId) -> Result<MigrationPhase, MigrationError> {
+        // Every read-only check happens before the migration state is borrowed mutably.
+        let phase = &self
+            .migrations
+            .get(&id)
+            .ok_or(MigrationError::NotFound(id))?
+            .phase;
+        if !matches!(phase, MigrationPhase::CutoverDraining) {
+            return Err(MigrationError::InvalidTransition(
+                ShardId(0),
+                format!("expected CutoverDraining, got {}", phase),
+            ));
+        }
+
+        let pending = self
+            .in_flight
+            .iter()
+            .filter(|w| !Self::write_settled(w))
+            .count();
+        if pending > 0 {
+            return Err(MigrationError::DrainTimeout(
+                u32::try_from(pending).unwrap_or(u32::MAX),
+            ));
+        }
+
+        // Collect docs that need delta pass (written to OLD but may not be on NEW).
+        // Skipping the delta pass is safe only if anti-entropy is enabled.
+        let run_delta_pass =
+            !self.config.skip_delta_pass && !self.collect_delta_candidates(id)?.is_empty();
+
+        let state = self.migrations.get_mut(&id).ok_or(MigrationError::NotFound(id))?;
+        if run_delta_pass {
+            state.phase = MigrationPhase::CutoverDeltaPass;
+            for shard_state in state.affected_shards.values_mut() {
+                if let ShardMigrationState::Draining { docs_copied, .. } = shard_state {
+                    let copied = *docs_copied;
+                    *shard_state = ShardMigrationState::DeltaPass {
+                        docs_copied: copied,
+                        delta_docs_copied: 0,
+                    };
+                }
+            }
+        } else {
+            state.phase = MigrationPhase::CutoverActivate;
+            self.activate_shards(id)?;
+        }
+
+        self.in_flight.clear();
+        self.migrations
+            .get(&id)
+            .map(|state| state.phase.clone())
+            .ok_or(MigrationError::NotFound(id))
+    }
+
+    /// Identify writes that need the delta pass — those that succeeded on OLD but
+    /// failed (or never reached) NEW.
+    ///
+    /// Returns the affected document IDs grouped by shard. Writes whose shard has no
+    /// recorded old owner in this migration are skipped.
+    fn collect_delta_candidates(
+        &self,
+        id: MigrationId,
+    ) -> Result<HashMap<ShardId, Vec<String>>, MigrationError> {
+        let state = self.migrations.get(&id).ok_or(MigrationError::NotFound(id))?;
+        let mut candidates: HashMap<ShardId, Vec<String>> = HashMap::new();
+
+        for write in &self.in_flight {
+            let old_owner = match state.old_owners.get(&write.shard) {
+                Some(owner) => owner,
+                None => continue,
+            };
+
+            let succeeded_on_old = write.completed_nodes.contains(old_owner);
+            let succeeded_on_new = write.completed_nodes.contains(&state.new_node);
+
+            // Doc is on OLD but not on NEW — delta pass must catch it
+            if succeeded_on_old && !succeeded_on_new {
+                candidates
+                    .entry(write.shard)
+                    .or_default()
+                    .push(write.doc_id.clone());
+            }
+        }
+
+        Ok(candidates)
+    }
+
+    /// Record that the delta pass completed for a shard.
+    ///
+    /// `delta_docs` is added to the shard's copied-document count. Once every affected
+    /// shard is `MigrationComplete`, the shards are activated and the migration moves on
+    /// to `CutoverCleanup`.
+    ///
+    /// # Errors
+    ///
+    /// * `MigrationError::NotFound` if `id` is unknown.
+    /// * `MigrationError::InvalidTransition` if `shard` is not part of the migration or
+    ///   is not currently in `DeltaPass`.
+    pub fn shard_delta_complete(
+        &mut self,
+        id: MigrationId,
+        shard: ShardId,
+        delta_docs: u64,
+    ) -> Result<(), MigrationError> {
+        let state = self.migrations.get_mut(&id).ok_or(MigrationError::NotFound(id))?;
+        let shard_state = state.affected_shards.get_mut(&shard).ok_or_else(|| {
+            MigrationError::InvalidTransition(shard, "shard not in migration".into())
+        })?;
+
+        let total_copied = match shard_state {
+            ShardMigrationState::DeltaPass { docs_copied, .. } => *docs_copied + delta_docs,
+            _ => {
+                return Err(MigrationError::InvalidTransition(
+                    shard,
+                    shard_state.to_string(),
+                ));
+            }
+        };
+        *shard_state = ShardMigrationState::MigrationComplete {
+            docs_copied: total_copied,
+        };
+
+        // Check if all shards done with delta
+        let all_complete = state
+            .affected_shards
+            .values()
+            .all(|s| matches!(s, ShardMigrationState::MigrationComplete { .. }));
+
+        if all_complete {
+            state.phase = MigrationPhase::CutoverActivate;
+            self.activate_shards(id)?;
+        }
+
+        Ok(())
+    }
+
+    /// Mark all affected shards as active on the new node.
+    ///
+    /// Shards in `MigrationComplete` or `Draining` become `Active`; shards in any other
+    /// state are left untouched. A migration in `CutoverActivate` advances to
+    /// `CutoverCleanup`.
+    fn activate_shards(&mut self, id: MigrationId) -> Result<(), MigrationError> {
+        let state = self.migrations.get_mut(&id).ok_or(MigrationError::NotFound(id))?;
+
+        for shard_state in state.affected_shards.values_mut() {
+            match shard_state {
+                ShardMigrationState::MigrationComplete { .. }
+                | ShardMigrationState::Draining { .. } => {
+                    *shard_state = ShardMigrationState::Active;
+                }
+                _ => {}
+            }
+        }
+
+        if matches!(state.phase, MigrationPhase::CutoverActivate) {
+            state.phase = MigrationPhase::CutoverCleanup;
+        }
+
+        Ok(())
+    }
+
+    /// Complete the migration by deleting migrated shard data from old nodes.
+    ///
+    /// This method only records the transition to `Complete` and stamps `completed_at`;
+    /// no deletion is issued from here.
+    ///
+    /// # Errors
+    ///
+    /// * `MigrationError::NotFound` if `id` is unknown.
+    /// * `MigrationError::InvalidTransition` if the migration is not in `CutoverCleanup`.
+    pub fn complete_cleanup(&mut self, id: MigrationId) -> Result<(), MigrationError> {
+        let state = self.migrations.get_mut(&id).ok_or(MigrationError::NotFound(id))?;
+
+        if !matches!(state.phase, MigrationPhase::CutoverCleanup) {
+            return Err(MigrationError::InvalidTransition(
+                ShardId(0),
+                format!("expected CutoverCleanup, got {}", state.phase),
+            ));
+        }
+
+        state.phase = MigrationPhase::Complete;
+        state.completed_at = Some(Instant::now());
+        Ok(())
+    }
+
+    /// Get the current state of a migration.
+    pub fn get_state(&self, id: MigrationId) -> Option<&MigrationState> {
+        self.migrations.get(&id)
+    }
+
+    /// Check if a write should go to both old and new node (dual-write phase).
+    pub fn is_dual_write_active(&self, shard: ShardId) -> bool {
+        self.migrations.values().any(|m| {
+            matches!(m.phase, MigrationPhase::DualWriteMigrating)
+                && matches!(
+                    m.affected_shards.get(&shard),
+                    Some(ShardMigrationState::Migrating { .. })
+                )
+        })
+    }
+
+    /// Get the migration config.
+    pub fn config(&self) -> &MigrationConfig {
+        &self.config
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn node(s: &str) -> NodeId {
+        NodeId(s.to_string())
+    }
+
+    fn shard(id: u32) -> ShardId {
+        ShardId(id)
+    }
+
+    #[test]
+    fn test_safe_cutover_with_delta_pass() {
+        let config = MigrationConfig {
+            anti_entropy_enabled: false,
+            skip_delta_pass: false,
+            ..Default::default()
+        };
+        let mut coord = MigrationCoordinator::new(config);
+
+        let affected = HashMap::from([
+            (shard(0), node("old-0")),
+            (shard(1), node("old-0")),
+        ]);
+
+        let mid = coord.begin_migration(node("new-0"), 0, affected).unwrap();
+        coord.begin_dual_write(mid).unwrap();
+
+        // Simulate background migration completing
+        coord.shard_migration_complete(mid, shard(0), 500).unwrap();
+        coord.shard_migration_complete(mid, shard(1), 300).unwrap();
+
+        // Register an in-flight write that succeeded on OLD and failed on NEW.
+        // Both targets must have responded, otherwise the drain below reports
+        // `DrainTimeout` instead of reaching the delta pass.
+        coord.register_in_flight(InFlightWrite {
+            doc_id: "doc-at-boundary".into(),
+            shard: shard(0),
+            target_nodes: vec![node("old-0"), node("new-0")],
+            completed_nodes: HashSet::from([node("old-0")]),
+            failed_nodes: HashMap::from([(node("new-0"), "timeout".to_string())]),
+            submitted_at: Instant::now(),
+        });
+
+        // Cutover
+        coord.begin_cutover(mid).unwrap();
+
+        // The drain sees the in-flight write settled (on old, not on new)
+        // Delta pass should be triggered
+        let phase = coord.complete_drain(mid).unwrap();
+        assert_eq!(phase, MigrationPhase::CutoverDeltaPass);
+
+        // Delta pass catches the straggler
+        coord.shard_delta_complete(mid, shard(0), 1).unwrap();
+        // Shard 1 had no stragglers, but needs delta complete too
+        coord.shard_delta_complete(mid, shard(1), 0).unwrap();
+
+        // Now activation and cleanup
+        let state = coord.get_state(mid).unwrap();
+        assert_eq!(state.phase, MigrationPhase::CutoverCleanup);
+
+        coord.complete_cleanup(mid).unwrap();
+        let state = coord.get_state(mid).unwrap();
+        assert_eq!(state.phase, MigrationPhase::Complete);
+    }
+
+    #[test]
+    fn test_unsafe_cutover_refused_without_anti_entropy() {
+        let config = MigrationConfig {
+            anti_entropy_enabled: false,
+            skip_delta_pass: true,
+            ..Default::default()
+        };
+        let mut coord = MigrationCoordinator::new(config);
+
+        let affected = HashMap::from([(shard(0), node("old-0"))]);
+        let result = coord.begin_migration(node("new-0"), 0, affected);
+
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert!(matches!(err, MigrationError::UnsafeCutoverNoAntiEntropy));
+    }
+
+    #[test]
+    fn test_skip_delta_pass_allowed_with_anti_entropy() {
+        let config = MigrationConfig {
+            anti_entropy_enabled: true,
+            skip_delta_pass: true,
+            ..Default::default()
+        };
+        let mut coord = MigrationCoordinator::new(config);
+
+        let affected = HashMap::from([(shard(0), node("old-0"))]);
+        let mid = coord.begin_migration(node("new-0"), 0, affected).unwrap();
+        coord.begin_dual_write(mid).unwrap();
+        coord.shard_migration_complete(mid, shard(0), 100).unwrap();
+
+        coord.begin_cutover(mid).unwrap();
+
+        // With skip_delta_pass=true and AE enabled, drain goes straight to activate
+        let phase = coord.complete_drain(mid).unwrap();
+        assert_eq!(phase, MigrationPhase::CutoverCleanup);
+
+        coord.complete_cleanup(mid).unwrap();
+        assert_eq!(
+            coord.get_state(mid).unwrap().phase,
+            MigrationPhase::Complete
+        );
+    }
+
+    #[test]
+    fn test_drain_timeout_blocks_cutover() {
+        let config = MigrationConfig {
+            anti_entropy_enabled: true,
+            skip_delta_pass: true,
+            ..Default::default()
+        };
+        let mut coord = MigrationCoordinator::new(config);
+
+        let affected = HashMap::from([(shard(0), node("old-0"))]);
+        let mid = coord.begin_migration(node("new-0"), 0, affected).unwrap();
+        coord.begin_dual_write(mid).unwrap();
+        coord.shard_migration_complete(mid, shard(0), 100).unwrap();
+        coord.begin_cutover(mid).unwrap();
+
+        // Register an in-flight write that hasn't completed on either node
+        coord.register_in_flight(InFlightWrite {
+            doc_id: "stuck-doc".into(),
+            shard: shard(0),
+            target_nodes: vec![node("old-0"), node("new-0")],
+            completed_nodes: HashSet::new(),
+            failed_nodes: HashMap::new(),
+            submitted_at: Instant::now(),
+        });
+
+        // Drain should fail — write still in flight
+        let result = coord.complete_drain(mid);
+        assert!(result.is_err());
+        assert!(matches!(result.unwrap_err(), MigrationError::DrainTimeout(1)));
+    }
+
+    #[test]
+    fn test_dual_write_tracking() {
+        let config = MigrationConfig::default();
+        let mut coord = MigrationCoordinator::new(config);
+
+        let affected = HashMap::from([(shard(5), node("old-0"))]);
+        let mid = coord.begin_migration(node("new-0"), 0, affected).unwrap();
+        coord.begin_dual_write(mid).unwrap();
+
+        // Shard 5 is in dual-write
+        assert!(coord.is_dual_write_active(shard(5)));
+        // Shard 99 is not being migrated
+        assert!(!coord.is_dual_write_active(shard(99)));
+
+        // After migration completes, shard 5 is no longer dual-write
+        coord.shard_migration_complete(mid, shard(5), 100).unwrap();
+        assert!(!coord.is_dual_write_active(shard(5)));
+    }
+}
diff --git a/crates/miroir-core/src/router.rs b/crates/miroir-core/src/router.rs
new file mode 100644
index 0000000..92b5459
--- /dev/null
+++ b/crates/miroir-core/src/router.rs
@@ -0,0 +1,55 @@
+//! Rendezvous hash-based routing and shard assignment.
+
+use crate::topology::{Group, NodeId, Topology};
+use twox_hash::XxHash64;
+use std::hash::{Hash, Hasher};
+
+/// Compute a rendezvous score for a shard+node pair.
+///
+/// Higher scores win; used for deterministic shard assignment.
+///
+/// The shard ID and then the node ID are fed into an `XxHash64` seeded with 0.
+///
+/// NOTE(review): both inputs go through `std::hash::Hash`, whose byte encoding the
+/// standard library documents as not portable across platforms or compiler versions.
+/// Confirm that every process computing scores is built for the same target and
+/// toolchain, or feed explicit little-endian bytes instead.
+pub fn score(shard_id: u32, node_id: &str) -> u64 {
+    let mut h = XxHash64::with_seed(0);
+    shard_id.hash(&mut h);
+    node_id.hash(&mut h);
+    h.finish()
+}
+
+/// Assign a shard to `rf` nodes within a single replica group.
+///
+/// `group_nodes` is the subset of nodes belonging to that group.
+///
+/// Nodes are ranked by descending [`score`]; equal scores are ordered by node ID so the
+/// result does not depend on the order of `group_nodes`. Fewer than `rf` nodes are
+/// returned when the group is smaller than `rf`.
+pub fn assign_shard_in_group(shard_id: u32, group_nodes: &[NodeId], rf: usize) -> Vec<NodeId> {
+    let mut scored: Vec<(u64, &NodeId)> = group_nodes
+        .iter()
+        .map(|n| (score(shard_id, n.as_str()), n))
+        .collect();
+    scored.sort_unstable_by(|a, b| {
+        b.0.cmp(&a.0)
+            .then_with(|| a.1.as_str().cmp(b.1.as_str()))
+    });
+    scored.into_iter().take(rf).map(|(_, n)| n.clone()).collect()
+}
+
+/// All write targets for a document: the RF nodes in EACH replica group.
+pub fn write_targets(shard_id: u32, topology: &Topology) -> Vec<NodeId> {
+    topology
+        .groups()
+        .flat_map(|group| assign_shard_in_group(shard_id, group.nodes(), topology.rf()))
+        .collect()
+}
+
+/// Select the replica group for a query (round-robin by query counter).
+///
+/// Returns 0 when `replica_groups` is 0 instead of panicking on a division by zero.
+pub fn query_group(query_seq: u64, replica_groups: u32) -> u32 {
+    // The remainder is always below `replica_groups`, so it fits in a `u32`.
+    (query_seq % u64::from(replica_groups.max(1))) as u32
+}
+
+/// The covering set for a search: one node per shard within the chosen group.
+///
+/// Each node appears once, in the order its first shard is visited. Shards with no
+/// replica (empty group or `rf == 0`) contribute nothing.
+pub fn covering_set(shard_count: u32, group: &Group, rf: usize, query_seq: u64) -> Vec<NodeId> {
+    let mut seen = std::collections::HashSet::new();
+    let mut covering = Vec::new();
+
+    for shard_id in 0..shard_count {
+        let replicas = assign_shard_in_group(shard_id, group.nodes(), rf);
+        if replicas.is_empty() {
+            continue;
+        }
+        // rotate through replicas for intra-group load balancing; the modulo is taken
+        // in `u64` so the choice is the same on 32-bit and 64-bit targets
+        let slot = (query_seq % replicas.len() as u64) as usize;
+        let chosen = &replicas[slot];
+        if seen.insert(chosen.clone()) {
+            covering.push(chosen.clone());
+        }
+    }
+
+    covering
+}
+
+/// Compute the shard ID for a document's primary key.
+///
+/// Returns 0 when `shard_count` is 0 instead of panicking on a division by zero.
+pub fn shard_for_key(primary_key: &str, shard_count: u32) -> u32 {
+    let mut h = XxHash64::with_seed(0);
+    primary_key.hash(&mut h);
+    // The remainder is always below `shard_count`, so it fits in a `u32`.
+    (h.finish() % u64::from(shard_count.max(1))) as u32
+}
diff --git a/crates/miroir-core/src/scatter.rs b/crates/miroir-core/src/scatter.rs
new file mode 100644
index 0000000..54f97b2
--- /dev/null
+++ b/crates/miroir-core/src/scatter.rs
@@ -0,0 +1,81 @@
+//! Scatter orchestration: fan-out logic and covering set builder.
+
+use crate::config::UnavailableShardPolicy;
+use crate::topology::{NodeId, Topology};
+use crate::Result;
+
+/// Scatter orchestrator: fans out requests to the covering set.
+pub trait Scatter: Send + Sync {
+    /// Execute a scatter request to multiple nodes.
+    ///
+    /// Returns a [`ScatterResponse`] carrying the responses of the nodes that answered
+    /// and the IDs of the nodes that failed or timed out. `policy` tells the
+    /// implementation how to treat shards whose node is unavailable.
+    fn scatter(
+        &self,
+        topology: &Topology,
+        nodes: Vec<NodeId>,
+        request: ScatterRequest,
+        policy: UnavailableShardPolicy,
+    ) -> Result<ScatterResponse>;
+}
+
+/// A scatter request to be sent to each node.
+#[derive(Debug, Clone)]
+pub struct ScatterRequest {
+    /// Request body (JSON or raw bytes).
+    pub body: Vec<u8>,
+
+    /// Request headers.
+    pub headers: Vec<(String, String)>,
+
+    /// HTTP method.
+    pub method: String,
+
+    /// Request path.
+    pub path: String,
+}
+
+/// Response from a scatter operation.
+#[derive(Debug, Clone)]
+pub struct ScatterResponse {
+    /// Responses from successful nodes.
+    pub responses: Vec<NodeResponse>,
+
+    /// Nodes that failed or timed out.
+    pub failed: Vec<NodeId>,
+}
+
+/// Response from a single node.
+#[derive(Debug, Clone)]
+pub struct NodeResponse {
+    /// Node that responded.
+    pub node_id: NodeId,
+
+    /// Response body.
+    pub body: Vec<u8>,
+
+    /// HTTP status code.
+    pub status: u16,
+
+    /// Response headers.
+    pub headers: Vec<(String, String)>,
+}
+
+/// Default stub implementation of Scatter.
+///
+/// Sends nothing: every call succeeds with an empty [`ScatterResponse`], regardless of
+/// the nodes, request, or policy passed in.
+#[derive(Debug, Clone, Default)]
+pub struct StubScatter;
+
+impl Scatter for StubScatter {
+    fn scatter(
+        &self,
+        _topology: &Topology,
+        _nodes: Vec<NodeId>,
+        _request: ScatterRequest,
+        _policy: UnavailableShardPolicy,
+    ) -> Result<ScatterResponse> {
+        Ok(ScatterResponse {
+            responses: Vec::new(),
+            failed: Vec::new(),
+        })
+    }
+}
diff --git a/crates/miroir-core/src/task.rs b/crates/miroir-core/src/task.rs
new file mode 100644
index 0000000..59011a7
--- /dev/null
+++ b/crates/miroir-core/src/task.rs
@@ -0,0 +1,136 @@
+//! Task registry: unified task namespace across all Meilisearch nodes.
+
+use crate::Result;
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use uuid::Uuid;
+
+/// Task registry: manages the unified task namespace.
+pub trait TaskRegistry: Send + Sync {
+    /// Register a new Miroir task that fans out to multiple nodes.
+    ///
+    /// NOTE(review): the type arguments of `node_tasks` were missing from this patch
+    /// text; `<String, u64>` follows the "node ID to local Meilisearch task UID" wording
+    /// on [`MiroirTask::node_tasks`] — confirm against the committed file.
+    fn register(&self, node_tasks: HashMap<String, u64>) -> Result<MiroirTask>;
+
+    /// Get a task by its Miroir ID.
+    fn get(&self, miroir_id: &str) -> Result<Option<MiroirTask>>;
+
+    /// Update the status of a Miroir task.
+    fn update_status(&self, miroir_id: &str, status: TaskStatus) -> Result<()>;
+
+    /// Update node task status.
+    fn update_node_task(&self, miroir_id: &str, node_id: &str, node_status: NodeTaskStatus) -> Result<()>;
+
+    /// List tasks with optional filtering.
+    fn list(&self, filter: TaskFilter) -> Result<Vec<MiroirTask>>;
+}
+
+/// A Miroir task: unified view of a fan-out write operation.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct MiroirTask {
+    /// Unique Miroir task ID (UUID).
+    pub miroir_id: String,
+
+    /// Creation timestamp (Unix millis).
+    pub created_at: u64,
+
+    /// Current task status.
+    pub status: TaskStatus,
+
+    /// Map of node ID to local Meilisearch task UID.
+    pub node_tasks: HashMap<String, u64>,
+
+    /// Error message if the task failed.
+    pub error: Option<String>,
+}
+
+/// Status of a Miroir task.
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
+pub enum TaskStatus {
+    /// Task is enqueued.
+    Enqueued,
+
+    /// Task is being processed.
+    Processing,
+
+    /// Task completed successfully.
+    Succeeded,
+
+    /// Task failed.
+    Failed,
+
+    /// Task was canceled.
+    Canceled,
+}
+
+/// A node task: local Meilisearch task reference.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct NodeTask {
+    /// Local Meilisearch task UID.
+    pub task_uid: u64,
+
+    /// Current status of this node task.
+    pub status: NodeTaskStatus,
+}
+
+/// Status of a node task.
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
+pub enum NodeTaskStatus {
+    /// Task is enqueued on the node.
+    Enqueued,
+
+    /// Task is processing on the node.
+    Processing,
+
+    /// Task succeeded on the node.
+    Succeeded,
+
+    /// Task failed on the node.
+    Failed,
+}
+
+/// Filter for listing tasks.
+///
+/// NOTE(review): the `Option` type arguments were missing from this patch text; the
+/// ones below are inferred from the field docs — confirm `limit`/`offset` in particular.
+#[derive(Debug, Clone, Default)]
+pub struct TaskFilter {
+    /// Filter by status.
+    pub status: Option<TaskStatus>,
+
+    /// Filter by node ID.
+    pub node_id: Option<String>,
+
+    /// Maximum number of results.
+    pub limit: Option<usize>,
+
+    /// Offset for pagination.
+    pub offset: Option<usize>,
+}
+
+/// Default stub implementation of TaskRegistry.
+///
+/// Keeps no state: `register` hands back a fresh task that is never stored, `get`
+/// always returns `None`, `list` is always empty, and the update methods do nothing.
+#[derive(Debug, Clone, Default)]
+pub struct StubTaskRegistry;
+
+impl TaskRegistry for StubTaskRegistry {
+    fn register(&self, node_tasks: HashMap<String, u64>) -> Result<MiroirTask> {
+        // Unix millis; falls back to 0 if the system clock is before the epoch.
+        let created_at = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .map_or(0, |elapsed| u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX));
+
+        Ok(MiroirTask {
+            miroir_id: Uuid::new_v4().to_string(),
+            created_at,
+            status: TaskStatus::Enqueued,
+            // Keep the caller's fan-out mapping instead of silently dropping it.
+            node_tasks,
+            error: None,
+        })
+    }
+
+    fn get(&self, _miroir_id: &str) -> Result<Option<MiroirTask>> {
+        Ok(None)
+    }
+
+    fn update_status(&self, _miroir_id: &str, _status: TaskStatus) -> Result<()> {
+        Ok(())
+    }
+
+    fn update_node_task(&self, _miroir_id: &str, _node_id: &str, _node_status: NodeTaskStatus) -> Result<()> {
+        Ok(())
+    }
+
+    fn list(&self, _filter: TaskFilter) -> Result<Vec<MiroirTask>> {
+        Ok(Vec::new())
+    }
+}
diff --git a/crates/miroir-core/src/topology.rs b/crates/miroir-core/src/topology.rs
new file mode 100644
index 0000000..4cc5cdd
--- /dev/null
+++ b/crates/miroir-core/src/topology.rs
@@ -0,0 +1,185 @@
+//! Topology management: node registry, groups, and health state.
+
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+
+/// Unique identifier for a node.
+#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
+pub struct NodeId(String);
+
+impl NodeId {
+    /// Create a new NodeId.
+    pub fn new(id: String) -> Self {
+        Self(id)
+    }
+
+    /// Get the node ID as a string slice.
+    pub fn as_str(&self) -> &str {
+        &self.0
+    }
+}
+
+impl From<String> for NodeId {
+    fn from(s: String) -> Self {
+        Self(s)
+    }
+}
+
+impl AsRef<str> for NodeId {
+    fn as_ref(&self) -> &str {
+        &self.0
+    }
+}
+
+/// Health status of a node.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+pub enum NodeStatus {
+    /// Node is healthy and serving traffic.
+    Healthy,
+    /// Node is joining the cluster (being provisioned).
+    Joining,
+    /// Node is draining (graceful shutdown, not accepting new writes).
+    Draining,
+    /// Node has failed (unplanned outage).
+    Failed,
+}
+
+/// A single Meilisearch node in the topology.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Node {
+    /// Unique node identifier.
+    pub id: NodeId,
+
+    /// Node base URL.
+    pub url: String,
+
+    /// Current health status.
+    pub status: NodeStatus,
+
+    /// Replica group assignment (0-based).
+    pub replica_group: u32,
+}
+
+impl Node {
+    /// Create a new node.
+    ///
+    /// The node starts in [`NodeStatus::Joining`].
+    pub fn new(id: NodeId, url: String, replica_group: u32) -> Self {
+        Self {
+            id,
+            url,
+            status: NodeStatus::Joining,
+            replica_group,
+        }
+    }
+
+    /// Check if the node is healthy (can serve traffic).
+    pub fn is_healthy(&self) -> bool {
+        matches!(self.status, NodeStatus::Healthy)
+    }
+}
+
+/// A replica group: an independent query pool.
+///
+/// Each group holds all S shards, distributed across its nodes.
+/// Reads are routed to a single group per query.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Group {
+    /// Group identifier (0-based).
+    pub id: u32,
+
+    /// Nodes in this group.
+    nodes: Vec<NodeId>,
+}
+
+impl Group {
+    /// Create a new group.
+    pub fn new(id: u32) -> Self {
+        Self {
+            id,
+            nodes: Vec::new(),
+        }
+    }
+
+    /// Add a node to this group.
+    ///
+    /// Adding a node that is already a member is a no-op.
+    pub fn add_node(&mut self, node_id: NodeId) {
+        if !self.nodes.contains(&node_id) {
+            self.nodes.push(node_id);
+        }
+    }
+
+    /// Get the nodes in this group.
+    pub fn nodes(&self) -> &[NodeId] {
+        &self.nodes
+    }
+
+    /// Get the number of nodes in this group.
+    pub fn node_count(&self) -> usize {
+        self.nodes.len()
+    }
+}
+
+/// Cluster topology: groups, nodes, and health state.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Topology {
+    /// All nodes in the cluster.
+    nodes: HashMap<NodeId, Node>,
+
+    /// Replica groups.
+    groups: Vec<Group>,
+
+    /// Replication factor (intra-group).
+    rf: usize,
+}
+
+impl Topology {
+    /// Create a new empty topology.
+    pub fn new(rf: usize) -> Self {
+        Self {
+            nodes: HashMap::new(),
+            groups: Vec::new(),
+            rf,
+        }
+    }
+
+    /// Add a node to the topology.
+    ///
+    /// Groups up to and including `node.replica_group` are created on demand. Adding a
+    /// node whose ID is already registered replaces the stored node; if its replica
+    /// group changed, it is removed from the previous group so it is never listed in two.
+    pub fn add_node(&mut self, node: Node) {
+        let group_id = node.replica_group as usize;
+
+        // Drop the stale membership of a node that moved to another group.
+        if let Some(previous) = self.nodes.get(&node.id) {
+            let previous_group = previous.replica_group as usize;
+            if previous_group != group_id {
+                if let Some(group) = self.groups.get_mut(previous_group) {
+                    group.nodes.retain(|id| id != &node.id);
+                }
+            }
+        }
+
+        // Ensure group exists
+        while self.groups.len() <= group_id {
+            self.groups.push(Group::new(self.groups.len() as u32));
+        }
+
+        self.groups[group_id].add_node(node.id.clone());
+        self.nodes.insert(node.id.clone(), node);
+    }
+
+    /// Get a node by ID.
+    pub fn node(&self, id: &NodeId) -> Option<&Node> {
+        self.nodes.get(id)
+    }
+
+    /// Get all nodes.
+    ///
+    /// Iteration order is that of the underlying `HashMap` and is not stable.
+    pub fn nodes(&self) -> impl Iterator<Item = &Node> {
+        self.nodes.values()
+    }
+
+    /// Get a group by ID.
+    pub fn group(&self, id: u32) -> Option<&Group> {
+        self.groups.get(id as usize)
+    }
+
+    /// Iterate over all groups.
+    pub fn groups(&self) -> impl Iterator<Item = &Group> {
+        self.groups.iter()
+    }
+
+    /// Get the replication factor.
+    pub fn rf(&self) -> usize {
+        self.rf
+    }
+
+    /// Get the number of replica groups.
+    pub fn replica_group_count(&self) -> u32 {
+        self.groups.len() as u32
+    }
+}