diff --git a/crates/miroir-core/src/alias.rs b/crates/miroir-core/src/alias.rs index 0c147e8..b3e29fe 100644 --- a/crates/miroir-core/src/alias.rs +++ b/crates/miroir-core/src/alias.rs @@ -6,10 +6,12 @@ //! (read-only, used by ILM) aliases. use crate::error::{MiroirError, Result}; +use crate::task_store::{AliasRow, AliasHistoryEntry, TaskStore}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; +use tracing::{info, warn, error}; /// Alias kind: single-target (writable) or multi-target (read-only, ILM-managed). #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] @@ -35,7 +37,8 @@ pub struct Alias { /// Created at timestamp. pub created_at: u64, /// Last updated timestamp. - pub updated_at: u64, + #[serde(skip_serializing_if = "Option::is_none")] + pub updated_at: Option, } impl Alias { @@ -52,7 +55,7 @@ impl Alias { target_uids: None, generation: 0, created_at: now, - updated_at: now, + updated_at: Some(now), } } @@ -69,10 +72,15 @@ impl Alias { target_uids: Some(target_uids), generation: 0, created_at: now, - updated_at: now, + updated_at: Some(now), } } + /// Check if this alias is multi-target (read-only, ILM-managed). + pub fn is_multi_target(&self) -> bool { + matches!(self.kind, AliasKind::Multi) + } + /// Get the effective target UIDs for this alias. pub fn targets(&self) -> Result> { match self.kind { @@ -96,10 +104,10 @@ impl Alias { } self.current_uid = Some(new_target); self.generation += 1; - self.updated_at = std::time::SystemTime::now() + self.updated_at = Some(std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() - .as_secs(); + .as_secs()); Ok(()) } @@ -110,10 +118,10 @@ impl Alias { } self.target_uids = Some(new_targets); self.generation += 1; - self.updated_at = std::time::SystemTime::now() + self.updated_at = Some(std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() - .as_secs(); + .as_secs()); Ok(()) } } @@ -132,6 +140,41 @@ impl AliasRegistry { } } + /// Create a new alias registry and load from task store. + pub async fn load_from_store(task_store: &dyn TaskStore) -> Result { + let registry = Self::new(); + registry.sync_from_store(task_store).await?; + Ok(registry) + } + + /// Sync aliases from the task store into memory. + pub async fn sync_from_store(&self, task_store: &dyn TaskStore) -> Result<()> { + let rows = task_store.list_aliases()?; + let mut aliases = self.aliases.write().await; + + // Clear and reload from store + aliases.clear(); + for row in rows { + let alias = Alias { + name: row.name.clone(), + kind: match row.kind.as_str() { + "single" => AliasKind::Single, + "multi" => AliasKind::Multi, + _ => return Err(MiroirError::InvalidState(format!("invalid alias kind: {}", row.kind))), + }, + current_uid: row.current_uid, + target_uids: row.target_uids, + generation: row.version as u64, + created_at: row.created_at as u64, + updated_at: None, // Task store doesn't track updated_at separately + }; + aliases.insert(row.name, alias); + } + + info!("loaded {} aliases from task store", aliases.len()); + Ok(()) + } + /// Resolve an index UID or alias name to concrete target UIDs. /// /// If `input` is not a known alias, returns it as-is (treat as concrete UID). @@ -148,6 +191,14 @@ impl AliasRegistry { self.aliases.read().await.contains_key(input) } + /// Check if an input is a multi-target alias (for write rejection). + pub async fn is_multi_target_alias(&self, input: &str) -> bool { + self.aliases.read().await + .get(input) + .map(|a| a.is_multi_target()) + .unwrap_or(false) + } + /// Get a single alias by name. pub async fn get(&self, name: &str) -> Option { self.aliases.read().await.get(name).cloned() diff --git a/crates/miroir-core/src/session_pinning.rs b/crates/miroir-core/src/session_pinning.rs index 9aa9c99..61c82fc 100644 --- a/crates/miroir-core/src/session_pinning.rs +++ b/crates/miroir-core/src/session_pinning.rs @@ -58,6 +58,18 @@ impl Default for SessionPinningConfig { } } +impl From for SessionPinningConfig { + fn from(config: crate::config::advanced::SessionPinningConfig) -> Self { + Self { + enabled: config.enabled, + ttl_seconds: config.ttl_seconds, + max_sessions: config.max_sessions, + wait_strategy: config.wait_strategy, + max_wait_ms: config.max_wait_ms, + } + } +} + /// Session state. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SessionState { diff --git a/crates/miroir-core/src/task_registry.rs b/crates/miroir-core/src/task_registry.rs index ec7adf1..4d5889c 100644 --- a/crates/miroir-core/src/task_registry.rs +++ b/crates/miroir-core/src/task_registry.rs @@ -936,7 +936,7 @@ impl TaskRegistryImpl { target_uids: row.target_uids, generation: row.version as u64, created_at: row.created_at as u64, - updated_at: row.created_at as u64, // Use created_at as fallback + updated_at: Some(row.created_at as u64), // Use created_at as fallback })) } #[cfg(feature = "redis-store")] @@ -952,7 +952,7 @@ impl TaskRegistryImpl { target_uids: row.target_uids, generation: row.version as u64, created_at: row.created_at as u64, - updated_at: row.created_at as u64, // Use created_at as fallback + updated_at: Some(row.created_at as u64), // Use created_at as fallback })) } } @@ -1025,7 +1025,7 @@ impl TaskRegistryImpl { target_uids: row.target_uids, generation: row.version as u64, created_at: row.created_at as u64, - updated_at: row.created_at as u64, + updated_at: Some(row.created_at as u64), }).collect()) } #[cfg(feature = "redis-store")] @@ -1042,7 +1042,7 @@ impl TaskRegistryImpl { target_uids: row.target_uids, generation: row.version as u64, created_at: row.created_at as u64, - updated_at: row.created_at as u64, + updated_at: Some(row.created_at as u64), }).collect()) } } diff --git a/crates/miroir-proxy/src/main.rs b/crates/miroir-proxy/src/main.rs index 17be369..97caa26 100644 --- a/crates/miroir-proxy/src/main.rs +++ b/crates/miroir-proxy/src/main.rs @@ -143,6 +143,8 @@ impl FromRef for admin_endpoints::AppState { previous_docs_migrated: state.admin.previous_docs_migrated.clone(), settings_broadcast: state.admin.settings_broadcast.clone(), drift_reconciler: state.admin.drift_reconciler.clone(), + session_manager: state.admin.session_manager.clone(), + alias_registry: state.admin.alias_registry.clone(), } } } diff --git a/crates/miroir-proxy/src/middleware.rs b/crates/miroir-proxy/src/middleware.rs index b2dd0d2..edbe1d0 100644 --- a/crates/miroir-proxy/src/middleware.rs +++ b/crates/miroir-proxy/src/middleware.rs @@ -59,6 +59,31 @@ impl RequestId { } } +/// Session ID wrapper type for read-your-writes session pinning (plan §13.6). +/// +/// Extracted from the `X-Miroir-Session` header and stored in request extensions. +/// Handlers can access it via `Request.extensions().get::()`. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct SessionId(pub String); + +impl SessionId { + /// Get the inner session ID string. + pub fn as_str(&self) -> &str { + &self.0 + } + + /// Parse a SessionId from a string. + /// + /// Accepts any non-empty string (client-provided UUID or identifier). + pub fn parse(s: String) -> Option { + if !s.is_empty() && s.len() <= 256 { + Some(Self(s)) + } else { + None + } + } +} + pub async fn request_id_middleware( mut req: Request, next: Next, @@ -90,6 +115,29 @@ pub async fn request_id_middleware( response } +/// Session pinning middleware (plan §13.6). +/// +/// Extracts the `X-Miroir-Session` header and stores it in request extensions +/// for handlers to access via `Request.extensions().get::()`. +pub async fn session_pinning_middleware( + mut req: Request, + next: Next, +) -> Response { + // Extract session ID from header if present + let session_id = req + .headers() + .get("x-miroir-session") + .and_then(|v| v.to_str().ok()) + .and_then(|s| SessionId::parse(s.to_string())); + + // Store in request extensions for handler access + if let Some(sid) = session_id { + req.extensions_mut().insert(sid); + } + + next.run(req).await +} + /// Telemetry state combining metrics and pod_id for middleware. #[derive(Clone)] @@ -223,6 +271,12 @@ pub struct Metrics { // ── §13.7 Alias metrics (always present) ── alias_resolutions_total: CounterVec, alias_flips_total: CounterVec, + + // ── §13.6 Session pinning metrics (always present) ── + session_active_count: Gauge, + session_pin_enforced_total: CounterVec, + session_wait_duration_seconds: Histogram, + session_wait_timeout_total: CounterVec, } impl Clone for Metrics { @@ -302,6 +356,10 @@ impl Clone for Metrics { settings_version: self.settings_version.clone(), alias_resolutions_total: self.alias_resolutions_total.clone(), alias_flips_total: self.alias_flips_total.clone(), + session_active_count: self.session_active_count.clone(), + session_pin_enforced_total: self.session_pin_enforced_total.clone(), + session_wait_duration_seconds: self.session_wait_duration_seconds.clone(), + session_wait_timeout_total: self.session_wait_timeout_total.clone(), } } } @@ -818,6 +876,31 @@ impl Metrics { reg!(alias_resolutions_total); reg!(alias_flips_total); + // ── §13.6 Session pinning metrics (always present) ── + let session_active_count = Gauge::new( + "miroir_session_active_count", + "Number of active sessions", + ).expect("create session_active_count"); + let session_pin_enforced_total = CounterVec::new( + Opts::new("miroir_session_pin_enforced_total", "Number of times session pin was enforced"), + &["strategy"], + ).expect("create session_pin_enforced_total"); + let session_wait_duration_seconds = Histogram::with_opts( + HistogramOpts::new( + "miroir_session_wait_duration_seconds", + "Duration of session pin wait operations", + ) + .buckets(vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]), + ).expect("create session_wait_duration_seconds"); + let session_wait_timeout_total = CounterVec::new( + Opts::new("miroir_session_wait_timeout_total", "Number of session pin wait timeouts"), + &["strategy"], + ).expect("create session_wait_timeout_total"); + reg!(session_active_count); + reg!(session_pin_enforced_total); + reg!(session_wait_duration_seconds); + reg!(session_wait_timeout_total); + Self { registry, request_duration, @@ -893,6 +976,10 @@ impl Metrics { settings_version, alias_resolutions_total, alias_flips_total, + session_active_count, + session_pin_enforced_total, + session_wait_duration_seconds, + session_wait_timeout_total, } } @@ -1524,6 +1611,24 @@ impl Metrics { self.alias_flips_total.with_label_values(&[alias]).inc(); } + // ── §13.6 Session pinning metrics ── + + pub fn set_session_active_count(&self, count: u64) { + self.session_active_count.set(count as f64); + } + + pub fn inc_session_pin_enforced(&self, strategy: &str) { + self.session_pin_enforced_total.with_label_values(&[strategy]).inc(); + } + + pub fn observe_session_wait_duration(&self, duration_seconds: f64) { + self.session_wait_duration_seconds.observe(duration_seconds); + } + + pub fn inc_session_wait_timeout(&self, strategy: &str) { + self.session_wait_timeout_total.with_label_values(&[strategy]).inc(); + } + pub fn registry(&self) -> &Registry { &self.registry } diff --git a/crates/miroir-proxy/src/routes/admin_endpoints.rs b/crates/miroir-proxy/src/routes/admin_endpoints.rs index db66c36..ba06f0b 100644 --- a/crates/miroir-proxy/src/routes/admin_endpoints.rs +++ b/crates/miroir-proxy/src/routes/admin_endpoints.rs @@ -324,6 +324,10 @@ pub struct AppState { pub settings_broadcast: Arc, /// Settings drift reconciler worker (§13.5). pub drift_reconciler: Option>, + /// Session pinning manager (§13.6). + pub session_manager: Arc, + /// Alias registry (§13.7). + pub alias_registry: Arc, } impl AppState { @@ -499,6 +503,17 @@ impl AppState { None }; + // Create session pinning manager (§13.6) + let session_manager = Arc::new(miroir_core::session_pinning::SessionManager::new( + miroir_core::session_pinning::SessionPinningConfig::from( + config.session_pinning.clone() + ), + )); + + // Create alias registry (§13.7) + // Note: Aliases are loaded asynchronously in background, not during initialization + let alias_registry = Arc::new(miroir_core::alias::AliasRegistry::new()); + Self { config: Arc::new(config), topology: topology_arc, @@ -519,6 +534,8 @@ impl AppState { previous_docs_migrated: Arc::new(std::sync::atomic::AtomicU64::new(0)), settings_broadcast, drift_reconciler, + session_manager, + alias_registry, } }