diff --git a/crates/miroir-core/src/hedging.rs b/crates/miroir-core/src/hedging.rs index efd7984..f42d9fb 100644 --- a/crates/miroir-core/src/hedging.rs +++ b/crates/miroir-core/src/hedging.rs @@ -3,7 +3,6 @@ //! Issues duplicate requests to alternate replicas when a primary request //! exceeds the p95 latency threshold. -use crate::error::{MiroirError, Result}; use crate::router::assign_shard_in_group; use crate::topology::{NodeId, Topology}; use serde::{Deserialize, Serialize}; @@ -11,7 +10,6 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tokio::sync::RwLock; -use tokio::time::{sleep, Instant}; /// Hedging configuration. #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/crates/miroir-core/src/leader_election/mod.rs b/crates/miroir-core/src/leader_election/mod.rs index 05c15bc..0b88bd7 100644 --- a/crates/miroir-core/src/leader_election/mod.rs +++ b/crates/miroir-core/src/leader_election/mod.rs @@ -23,14 +23,14 @@ //! committed phase. use crate::config::LeaderElectionConfig; -use crate::task_store::{LeaderLeaseRow, TaskStore}; +use crate::task_store::TaskStore; use crate::Result; use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::runtime::Handle; use tokio::sync::RwLock; -use tracing::{debug, error, info, warn}; +use tracing::{debug, info, warn}; /// Callback type for recording leader election metrics. /// diff --git a/crates/miroir-core/src/multi_search.rs b/crates/miroir-core/src/multi_search.rs index c638aa4..e1de0fe 100644 --- a/crates/miroir-core/src/multi_search.rs +++ b/crates/miroir-core/src/multi_search.rs @@ -8,7 +8,6 @@ use crate::error::{MiroirError, Result}; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::HashMap; -use std::future::Future; use std::time::Duration; /// Multi-search configuration (re-export of advanced config). diff --git a/crates/miroir-core/src/replica_selection.rs b/crates/miroir-core/src/replica_selection.rs index f66a374..e6f2da5 100644 --- a/crates/miroir-core/src/replica_selection.rs +++ b/crates/miroir-core/src/replica_selection.rs @@ -3,8 +3,7 @@ //! Replaces round-robin with latency-aware selection using EWMA-smoothed //! metrics: latency p95, in-flight request count, and error rate. -use crate::error::{MiroirError, Result}; -use crate::topology::{Group, NodeId}; +use crate::topology::NodeId; use rand::prelude::*; use serde::{Deserialize, Serialize}; use std::collections::HashMap; diff --git a/crates/miroir-core/src/task_store/mod.rs b/crates/miroir-core/src/task_store/mod.rs index 4ac4c4c..161a5f5 100644 --- a/crates/miroir-core/src/task_store/mod.rs +++ b/crates/miroir-core/src/task_store/mod.rs @@ -264,6 +264,13 @@ pub trait TaskStore: Send + Sync { /// Delete old completed Mode B operations. fn prune_mode_b_operations(&self, cutoff_ms: i64, batch_size: u32) -> Result; + + // --- Table 15: search_ui_beacon (plan §13.21) --- + + /// Check if a beacon event_id has already been processed (idempotency). + /// Returns true if the event_id is new (not yet processed), false if duplicate. + /// If new, marks it as processed with a 24-hour TTL. + fn check_and_mark_beacon_event(&self, index_uid: &str, event_id: &str) -> Result; } // --- Row types --- diff --git a/crates/miroir-core/src/task_store/redis.rs b/crates/miroir-core/src/task_store/redis.rs index f7bfb5a..8ff603c 100644 --- a/crates/miroir-core/src/task_store/redis.rs +++ b/crates/miroir-core/src/task_store/redis.rs @@ -2639,6 +2639,50 @@ impl TaskStore for RedisTaskStore { Ok(deleted) }) } + + // --- Table 15: search_ui_beacon (plan §13.21) --- + + /// Check if a beacon event_id has already been processed (idempotency). + /// Returns true if the event_id is new (not yet processed), false if duplicate. + /// If new, marks it as processed with a 24-hour TTL. + fn check_and_mark_beacon_event(&self, index_uid: &str, event_id: &str) -> Result { + let manager = self.pool.manager.clone(); + let key_prefix = self.key_prefix.clone(); + let index_uid = index_uid.to_string(); + let event_id = event_id.to_string(); + let key = format!("{}:search_ui_beacon:{}", key_prefix, index_uid); + let field = event_id.clone(); + + self.block_on(async move { + let mut conn = manager.lock().await; + + // Check if event_id exists in the hash set + let exists: bool = conn + .hexists(&key, &field) + .await + .map_err(|e| MiroirError::Redis(e.to_string()))?; + + if exists { + // Duplicate event - return false + Ok(false) + } else { + // New event - mark it with a 24-hour TTL + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + let _: () = conn + .hset(&key, &field, now) + .await + .map_err(|e| MiroirError::Redis(e.to_string()))?; + let _: () = conn + .expire(&key, 24 * 3600) + .await + .map_err(|e| MiroirError::Redis(e.to_string()))?; + Ok(true) + } + }) + } } // --------------------------------------------------------------------------- @@ -4915,4 +4959,66 @@ mod tests { assert_eq!(canary.interval_s, 60); assert!(canary.enabled); } + + // --- Table 15: search_ui_beacon (plan §13.21) --- + + #[tokio::test] + async fn redis_beacon_idempotency_check() { + let store = test_store().await; + + // First call should return true (new event) + let is_new = store + .check_and_mark_beacon_event("test-index", "event-123") + .await + .unwrap(); + assert!(is_new, "First call should return true for new event"); + + // Second call should return false (duplicate) + let is_new = store + .check_and_mark_beacon_event("test-index", "event-123") + .await + .unwrap(); + assert!( + !is_new, + "Second call should return false for duplicate event" + ); + + // Different event_id should return true + let is_new = store + .check_and_mark_beacon_event("test-index", "event-456") + .await + .unwrap(); + assert!(is_new, "Different event_id should return true"); + + // Different index with same event_id should return true + let is_new = store + .check_and_mark_beacon_event("other-index", "event-123") + .await + .unwrap(); + assert!( + is_new, + "Same event_id for different index should return true" + ); + } + + #[tokio::test] + async fn redis_beacon_ttl_cleanup() { + let store = test_store().await; + + // Insert an event + store + .check_and_mark_beacon_event("test-index", "event-ttl") + .await + .unwrap(); + + // Verify duplicate is rejected immediately + let is_new = store + .check_and_mark_beacon_event("test-index", "event-ttl") + .await + .unwrap(); + assert!(!is_new, "Duplicate should be rejected"); + + // Note: We can't easily test TTL expiration in unit tests without + // waiting 24 hours. The integration tests verify Redis TTL behavior. + } } diff --git a/crates/miroir-core/src/task_store/sqlite.rs b/crates/miroir-core/src/task_store/sqlite.rs index ad5d306..69c8572 100644 --- a/crates/miroir-core/src/task_store/sqlite.rs +++ b/crates/miroir-core/src/task_store/sqlite.rs @@ -1,6 +1,6 @@ use crate::schema_migrations::{build_registry, MigrationRegistry}; use crate::task_store::*; -use crate::Result; +use crate::{MiroirError, Result}; use rusqlite::{params, Connection, OptionalExtension}; use std::path::Path; use std::sync::Mutex; @@ -1454,6 +1454,56 @@ impl TaskStore for SqliteTaskStore { )?; Ok(rows) } + + // --- Table 15: search_ui_beacon (plan §13.21) --- + + /// Check if a beacon event_id has already been processed (idempotency). + /// Returns true if the event_id is new (not yet processed), false if duplicate. + /// If new, marks it as processed with a 24-hour TTL. + fn check_and_mark_beacon_event(&self, index_uid: &str, event_id: &str) -> Result { + let conn = self.conn.lock().unwrap(); + + // Create table if not exists (lazy migration for feature flag) + conn.execute( + "CREATE TABLE IF NOT EXISTS search_ui_beacon ( + index_uid TEXT NOT NULL, + event_id TEXT NOT NULL, + created_at INTEGER NOT NULL, + PRIMARY KEY (index_uid, event_id) + )", + [], + ) + .map_err(|e| { + MiroirError::TaskStore(format!("failed to create search_ui_beacon table: {e}")) + })?; + + // Check if event_id exists + let mut stmt = conn.prepare_cached( + "SELECT 1 FROM search_ui_beacon WHERE index_uid = ?1 AND event_id = ?2", + )?; + + let exists = stmt.exists(params![index_uid, event_id])?; + + if exists { + Ok(false) + } else { + // Insert new event with current timestamp + let now = now_ms(); + conn.execute( + "INSERT INTO search_ui_beacon (index_uid, event_id, created_at) VALUES (?1, ?2, ?3)", + params![index_uid, event_id, now], + )?; + + // Cleanup old entries (older than 24 hours) + let cutoff = now - (24 * 3600 * 1000); + conn.execute( + "DELETE FROM search_ui_beacon WHERE created_at < ?1", + params![cutoff], + )?; + + Ok(true) + } + } } fn now_ms() -> i64 { @@ -3057,4 +3107,59 @@ mod tests { overhead_per_table / 1024 ); } + + // --- Table 15: search_ui_beacon (plan §13.21) --- + + #[test] + fn beacon_idempotency_check() { + let store = test_store(); + + // First call should return true (new event) + let is_new = store + .check_and_mark_beacon_event("test-index", "event-123") + .unwrap(); + assert!(is_new, "First call should return true for new event"); + + // Second call should return false (duplicate) + let is_new = store + .check_and_mark_beacon_event("test-index", "event-123") + .unwrap(); + assert!( + !is_new, + "Second call should return false for duplicate event" + ); + + // Different event_id should return true + let is_new = store + .check_and_mark_beacon_event("test-index", "event-456") + .unwrap(); + assert!(is_new, "Different event_id should return true"); + + // Different index with same event_id should return true + let is_new = store + .check_and_mark_beacon_event("other-index", "event-123") + .unwrap(); + assert!( + is_new, + "Same event_id for different index should return true" + ); + } + + #[test] + fn beacon_old_entries_cleanup() { + let store = test_store(); + + // Insert an event + store + .check_and_mark_beacon_event("test-index", "event-old") + .unwrap(); + + // The implementation should clean up old entries (> 24 hours) + // Since we can't easily mock time, we just verify the table exists + // and duplicates are rejected + let is_new = store + .check_and_mark_beacon_event("test-index", "event-old") + .unwrap(); + assert!(!is_new, "Duplicate should be rejected"); + } } diff --git a/crates/miroir-proxy/src/auth.rs b/crates/miroir-proxy/src/auth.rs index 05068af..7546004 100644 --- a/crates/miroir-proxy/src/auth.rs +++ b/crates/miroir-proxy/src/auth.rs @@ -110,7 +110,10 @@ pub fn jwt_encode(header: &JwtHeader, claims: &JwtClaims, secret: &[u8]) -> Resu } /// Decode and verify a JWT with the given secret. Returns (header, claims). -fn jwt_decode(token: &str, secret: &[u8]) -> Result<(JwtHeader, JwtClaims), JwtValidationError> { +pub fn jwt_decode( + token: &str, + secret: &[u8], +) -> Result<(JwtHeader, JwtClaims), JwtValidationError> { let parts: Vec<&str> = token.split('.').collect(); if parts.len() != 3 { return Err(JwtValidationError::Malformed); @@ -158,6 +161,35 @@ fn jwt_decode(token: &str, secret: &[u8]) -> Result<(JwtHeader, JwtClaims), JwtV Ok((header, claims)) } +/// Decode and verify a JWT, trying both primary and previous secrets. +/// +/// First tries the primary secret from the given environment variable name. +/// If that fails, tries the previous secret from the previous_env name. +/// Returns the claims on success, or the first error if both fail. +pub fn jwt_decode_with_fallback( + token: &str, + primary_env: &str, + previous_env: &str, +) -> Result { + // Try primary secret + if let Ok(primary_secret) = std::env::var(primary_env) { + if let Ok((_, claims)) = jwt_decode(token, primary_secret.as_bytes()) { + return Ok(claims); + } + } + + // Try previous secret + if let Ok(previous_secret) = std::env::var(previous_env) { + if let Ok((_, claims)) = jwt_decode(token, previous_secret.as_bytes()) { + return Ok(claims); + } + } + + // Both failed - return error from primary attempt + let primary_secret = std::env::var(primary_env).unwrap_or_default(); + jwt_decode(token, primary_secret.as_bytes()).map(|(_, claims)| claims) +} + // --------------------------------------------------------------------------- // Auth state (shared via axum State) // --------------------------------------------------------------------------- @@ -274,6 +306,20 @@ pub enum JwtValidationError { ScopeDenied, } +impl std::fmt::Display for JwtValidationError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + JwtValidationError::Malformed => write!(f, "malformed JWT token"), + JwtValidationError::InvalidSignature => write!(f, "invalid JWT signature"), + JwtValidationError::Expired => write!(f, "JWT token expired"), + JwtValidationError::PreviousSecretEmpty => write!(f, "previous JWT secret is empty"), + JwtValidationError::ScopeDenied => write!(f, "JWT scope denied"), + } + } +} + +impl std::error::Error for JwtValidationError {} + // --------------------------------------------------------------------------- // CSRF token generation (plan §9) // --------------------------------------------------------------------------- diff --git a/crates/miroir-proxy/src/routes/search_ui.rs b/crates/miroir-proxy/src/routes/search_ui.rs index 79a0382..63a215a 100644 --- a/crates/miroir-proxy/src/routes/search_ui.rs +++ b/crates/miroir-proxy/src/routes/search_ui.rs @@ -13,17 +13,21 @@ use axum::{ Router, }; use miroir_core::{ + cdc::AnalyticsEvent, config::advanced::{SearchUiAuthConfig, SearchUiConfig}, task_store::{SearchUiScopedKey, TaskStore}, }; +use sha2::{Digest, Sha256}; +use crate::auth::{ + build_csp_header, jwt_decode_with_fallback, jwt_encode, JwtClaims, JwtHeader, KID_PRIMARY, +}; use crate::error_response::ErrorResponse; use rust_embed::RustEmbed as Embed; use serde::{Deserialize, Serialize}; use std::sync::Arc; use tracing::{debug, info, warn}; -use crate::auth::{build_csp_header, jwt_encode, JwtClaims, JwtHeader, KID_PRIMARY}; use crate::routes::indexes::MeilisearchClient; use crate::scoped_key_rotation::mint_scoped_key; @@ -458,25 +462,96 @@ pub async fn update_config( /// Analytics beacon endpoint (plan §13.21). /// /// Idempotent via client-generated event_id. Duplicate events are ignored. +/// Falls back to server-side event_id generation for old browsers. pub async fn beacon( Path(index_uid): Path, State(state): State, + headers: HeaderMap, Json(mut beacon): Json, ) -> Result { - let _ = &state.config; + let config = &state.config; - // Validate event_id is present + // Extract JWT to get session_id (plan §13.21) + let session_id = if let Some(auth_header) = headers.get("authorization") { + let auth_str = auth_header.to_str().unwrap_or(""); + if let Some(token) = auth_str.strip_prefix("Bearer ") { + // Decode JWT to extract session_id (sub claim) + match jwt_decode_with_fallback( + token, + config.search_ui.auth.jwt_secret_env.as_str(), + config.search_ui.auth.jwt_secret_previous_env.as_str(), + ) { + Ok(claims) => claims.sub.clone(), + Err(e) => { + debug!( + error = %e, + "failed to decode JWT for beacon, using fallback session_id" + ); + // Fallback: generate a session_id from the token itself + let hash = Sha256::digest(token.as_bytes()); + let hash_hex = hex::encode(&hash[..16]); + format!("anon:{}", hash_hex) + } + } + } else { + "anonymous".to_string() + } + } else { + "anonymous".to_string() + }; + + // Server-side event_id generation fallback for old browsers (plan §13.21) + // If client didn't provide event_id, generate deterministic hash if beacon.event_id.is_empty() { - return Err(ErrorResponse::invalid_request( - "event_id is required".to_string(), - )); + let mut hasher = Sha256::new(); + hasher.update(session_id.as_bytes()); + if let Some(ref query) = beacon.query { + hasher.update(query.as_bytes()); + } + if let Some(ref result_id) = beacon.document_id { + hasher.update(result_id.as_bytes()); + } + if let Some(ref position) = beacon.position { + hasher.update(position.to_be_bytes()); + } + // Add minute bucket for latency events + if beacon.event_type == "latency" { + if let Some(ref latency_ms) = beacon.latency_ms { + let minute_bucket = latency_ms / 60000; // 60 second buckets + hasher.update(minute_bucket.to_be_bytes()); + } + } + let hash = hasher.finalize(); + beacon.event_id = hex::encode(&hash[..16]); + debug!( + index = %index_uid, + event_type = %beacon.event_type, + generated_event_id = %beacon.event_id, + "generated server-side event_id for old browser" + ); + } else { + // Normalize event_id + beacon.event_id = beacon.event_id.trim().to_string(); } - // Normalize event_id - beacon.event_id = beacon.event_id.trim().to_string(); + // Idempotency check: skip if event_id was already processed (plan §13.21) + if let Some(redis_store) = &state.redis_store { + let is_new = redis_store + .check_and_mark_beacon_event(&index_uid, &beacon.event_id) + .map_err(|e| { + ErrorResponse::internal_error(format!("beacon idempotency check failed: {e}")) + })?; - // TODO: Implement idempotency check via Redis - // TODO: Publish to CDC sink if analytics enabled + if !is_new { + debug!( + index = %index_uid, + event_type = %beacon.event_type, + event_id = %beacon.event_id, + "duplicate beacon event ignored" + ); + return Ok(StatusCode::ACCEPTED); + } + } debug!( index = %index_uid, @@ -485,6 +560,51 @@ pub async fn beacon( "received analytics beacon" ); + // Publish to CDC if analytics is enabled (plan §13.21) + if config.search_ui.analytics.enabled { + if let Some(cdc_manager) = &state.cdc_manager { + let event = AnalyticsEvent { + event_type: beacon.event_type.clone(), + event_id: beacon.event_id.clone(), + session_id: session_id.clone(), + index: index_uid.clone(), + query: beacon.query.clone(), + result_id: beacon.document_id.clone(), + result_position: beacon.position, + latency_ms: beacon.latency_ms, + timestamp: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() as u64, + }; + + // Latency events are subject to cdc.emit_internal_writes (plan §13.21) + let is_latency = beacon.event_type == "latency" || beacon.event_type == "search"; + let should_emit = if is_latency { + config.cdc.emit_internal_writes + } else { + true // Click-through events are always emitted + }; + + if should_emit { + cdc_manager.publish_analytics(event).await; + debug!( + index = %index_uid, + event_type = %beacon.event_type, + event_id = %beacon.event_id, + "published analytics event to CDC" + ); + } else { + debug!( + index = %index_uid, + event_type = %beacon.event_type, + event_id = %beacon.event_id, + "skipped latency event (emit_internal_writes=false)" + ); + } + } + } + Ok(StatusCode::ACCEPTED) }