feat(analytics): implement beacon idempotency and CDC integration (P5.21.f §13.21)

Implement analytics beacon endpoint with idempotency and CDC integration:

- Add `check_and_mark_beacon_event` to TaskStore trait for idempotency
- Implement for both Redis (HSET with 24h TTL) and SQLite (table with cleanup)
- Add JWT session extraction for session_id in beacon events
- Add server-side event_id generation fallback for old browsers (SHA256 hash)
- Integrate with CDC manager to publish AnalyticsEvents (click_through, latency)
- Respect cdc.emit_internal_writes for latency events
- Add Display impl for JwtValidationError for proper error logging
- Add jwt_decode_with_fallback helper for JWT rotation support
- Add unit tests for beacon idempotency (SQLite and Redis)

Closes: miroir-uhj.21.6
This commit is contained in:
jedarden 2026-05-25 02:48:55 -04:00
parent 451771382e
commit 17b25e4cf1
9 changed files with 399 additions and 19 deletions

View file

@ -3,7 +3,6 @@
//! Issues duplicate requests to alternate replicas when a primary request
//! exceeds the p95 latency threshold.
use crate::error::{MiroirError, Result};
use crate::router::assign_shard_in_group;
use crate::topology::{NodeId, Topology};
use serde::{Deserialize, Serialize};
@ -11,7 +10,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::{sleep, Instant};
/// Hedging configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]

View file

@ -23,14 +23,14 @@
//! committed phase.
use crate::config::LeaderElectionConfig;
use crate::task_store::{LeaderLeaseRow, TaskStore};
use crate::task_store::TaskStore;
use crate::Result;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::runtime::Handle;
use tokio::sync::RwLock;
use tracing::{debug, error, info, warn};
use tracing::{debug, info, warn};
/// Callback type for recording leader election metrics.
///

View file

@ -8,7 +8,6 @@ use crate::error::{MiroirError, Result};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::future::Future;
use std::time::Duration;
/// Multi-search configuration (re-export of advanced config).

View file

@ -3,8 +3,7 @@
//! Replaces round-robin with latency-aware selection using EWMA-smoothed
//! metrics: latency p95, in-flight request count, and error rate.
use crate::error::{MiroirError, Result};
use crate::topology::{Group, NodeId};
use crate::topology::NodeId;
use rand::prelude::*;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

View file

@ -264,6 +264,13 @@ pub trait TaskStore: Send + Sync {
/// Delete old completed Mode B operations.
fn prune_mode_b_operations(&self, cutoff_ms: i64, batch_size: u32) -> Result<usize>;
// --- Table 15: search_ui_beacon (plan §13.21) ---
/// Check if a beacon event_id has already been processed (idempotency).
/// Returns true if the event_id is new (not yet processed), false if duplicate.
/// If new, marks it as processed with a 24-hour TTL.
fn check_and_mark_beacon_event(&self, index_uid: &str, event_id: &str) -> Result<bool>;
}
// --- Row types ---

View file

@ -2639,6 +2639,50 @@ impl TaskStore for RedisTaskStore {
Ok(deleted)
})
}
// --- Table 15: search_ui_beacon (plan §13.21) ---
/// Check if a beacon event_id has already been processed (idempotency).
/// Returns true if the event_id is new (not yet processed), false if duplicate.
/// If new, marks it as processed with a 24-hour TTL.
fn check_and_mark_beacon_event(&self, index_uid: &str, event_id: &str) -> Result<bool> {
let manager = self.pool.manager.clone();
let key_prefix = self.key_prefix.clone();
let index_uid = index_uid.to_string();
let event_id = event_id.to_string();
let key = format!("{}:search_ui_beacon:{}", key_prefix, index_uid);
let field = event_id.clone();
self.block_on(async move {
let mut conn = manager.lock().await;
// Check if event_id exists in the hash set
let exists: bool = conn
.hexists(&key, &field)
.await
.map_err(|e| MiroirError::Redis(e.to_string()))?;
if exists {
// Duplicate event - return false
Ok(false)
} else {
// New event - mark it with a 24-hour TTL
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let _: () = conn
.hset(&key, &field, now)
.await
.map_err(|e| MiroirError::Redis(e.to_string()))?;
let _: () = conn
.expire(&key, 24 * 3600)
.await
.map_err(|e| MiroirError::Redis(e.to_string()))?;
Ok(true)
}
})
}
}
// ---------------------------------------------------------------------------
@ -4915,4 +4959,66 @@ mod tests {
assert_eq!(canary.interval_s, 60);
assert!(canary.enabled);
}
// --- Table 15: search_ui_beacon (plan §13.21) ---
#[tokio::test]
async fn redis_beacon_idempotency_check() {
let store = test_store().await;
// First call should return true (new event)
let is_new = store
.check_and_mark_beacon_event("test-index", "event-123")
.await
.unwrap();
assert!(is_new, "First call should return true for new event");
// Second call should return false (duplicate)
let is_new = store
.check_and_mark_beacon_event("test-index", "event-123")
.await
.unwrap();
assert!(
!is_new,
"Second call should return false for duplicate event"
);
// Different event_id should return true
let is_new = store
.check_and_mark_beacon_event("test-index", "event-456")
.await
.unwrap();
assert!(is_new, "Different event_id should return true");
// Different index with same event_id should return true
let is_new = store
.check_and_mark_beacon_event("other-index", "event-123")
.await
.unwrap();
assert!(
is_new,
"Same event_id for different index should return true"
);
}
#[tokio::test]
async fn redis_beacon_ttl_cleanup() {
let store = test_store().await;
// Insert an event
store
.check_and_mark_beacon_event("test-index", "event-ttl")
.await
.unwrap();
// Verify duplicate is rejected immediately
let is_new = store
.check_and_mark_beacon_event("test-index", "event-ttl")
.await
.unwrap();
assert!(!is_new, "Duplicate should be rejected");
// Note: We can't easily test TTL expiration in unit tests without
// waiting 24 hours. The integration tests verify Redis TTL behavior.
}
}

View file

@ -1,6 +1,6 @@
use crate::schema_migrations::{build_registry, MigrationRegistry};
use crate::task_store::*;
use crate::Result;
use crate::{MiroirError, Result};
use rusqlite::{params, Connection, OptionalExtension};
use std::path::Path;
use std::sync::Mutex;
@ -1454,6 +1454,56 @@ impl TaskStore for SqliteTaskStore {
)?;
Ok(rows)
}
// --- Table 15: search_ui_beacon (plan §13.21) ---
/// Check if a beacon event_id has already been processed (idempotency).
/// Returns true if the event_id is new (not yet processed), false if duplicate.
/// If new, marks it as processed with a 24-hour TTL.
fn check_and_mark_beacon_event(&self, index_uid: &str, event_id: &str) -> Result<bool> {
let conn = self.conn.lock().unwrap();
// Create table if not exists (lazy migration for feature flag)
conn.execute(
"CREATE TABLE IF NOT EXISTS search_ui_beacon (
index_uid TEXT NOT NULL,
event_id TEXT NOT NULL,
created_at INTEGER NOT NULL,
PRIMARY KEY (index_uid, event_id)
)",
[],
)
.map_err(|e| {
MiroirError::TaskStore(format!("failed to create search_ui_beacon table: {e}"))
})?;
// Check if event_id exists
let mut stmt = conn.prepare_cached(
"SELECT 1 FROM search_ui_beacon WHERE index_uid = ?1 AND event_id = ?2",
)?;
let exists = stmt.exists(params![index_uid, event_id])?;
if exists {
Ok(false)
} else {
// Insert new event with current timestamp
let now = now_ms();
conn.execute(
"INSERT INTO search_ui_beacon (index_uid, event_id, created_at) VALUES (?1, ?2, ?3)",
params![index_uid, event_id, now],
)?;
// Cleanup old entries (older than 24 hours)
let cutoff = now - (24 * 3600 * 1000);
conn.execute(
"DELETE FROM search_ui_beacon WHERE created_at < ?1",
params![cutoff],
)?;
Ok(true)
}
}
}
fn now_ms() -> i64 {
@ -3057,4 +3107,59 @@ mod tests {
overhead_per_table / 1024
);
}
// --- Table 15: search_ui_beacon (plan §13.21) ---
#[test]
fn beacon_idempotency_check() {
let store = test_store();
// First call should return true (new event)
let is_new = store
.check_and_mark_beacon_event("test-index", "event-123")
.unwrap();
assert!(is_new, "First call should return true for new event");
// Second call should return false (duplicate)
let is_new = store
.check_and_mark_beacon_event("test-index", "event-123")
.unwrap();
assert!(
!is_new,
"Second call should return false for duplicate event"
);
// Different event_id should return true
let is_new = store
.check_and_mark_beacon_event("test-index", "event-456")
.unwrap();
assert!(is_new, "Different event_id should return true");
// Different index with same event_id should return true
let is_new = store
.check_and_mark_beacon_event("other-index", "event-123")
.unwrap();
assert!(
is_new,
"Same event_id for different index should return true"
);
}
#[test]
fn beacon_old_entries_cleanup() {
let store = test_store();
// Insert an event
store
.check_and_mark_beacon_event("test-index", "event-old")
.unwrap();
// The implementation should clean up old entries (> 24 hours)
// Since we can't easily mock time, we just verify the table exists
// and duplicates are rejected
let is_new = store
.check_and_mark_beacon_event("test-index", "event-old")
.unwrap();
assert!(!is_new, "Duplicate should be rejected");
}
}

View file

@ -110,7 +110,10 @@ pub fn jwt_encode(header: &JwtHeader, claims: &JwtClaims, secret: &[u8]) -> Resu
}
/// Decode and verify a JWT with the given secret. Returns (header, claims).
fn jwt_decode(token: &str, secret: &[u8]) -> Result<(JwtHeader, JwtClaims), JwtValidationError> {
pub fn jwt_decode(
token: &str,
secret: &[u8],
) -> Result<(JwtHeader, JwtClaims), JwtValidationError> {
let parts: Vec<&str> = token.split('.').collect();
if parts.len() != 3 {
return Err(JwtValidationError::Malformed);
@ -158,6 +161,35 @@ fn jwt_decode(token: &str, secret: &[u8]) -> Result<(JwtHeader, JwtClaims), JwtV
Ok((header, claims))
}
/// Decode and verify a JWT, trying both primary and previous secrets.
///
/// First tries the primary secret from the given environment variable name.
/// If that fails, tries the previous secret from the previous_env name.
/// Returns the claims on success, or the first error if both fail.
pub fn jwt_decode_with_fallback(
token: &str,
primary_env: &str,
previous_env: &str,
) -> Result<JwtClaims, JwtValidationError> {
// Try primary secret
if let Ok(primary_secret) = std::env::var(primary_env) {
if let Ok((_, claims)) = jwt_decode(token, primary_secret.as_bytes()) {
return Ok(claims);
}
}
// Try previous secret
if let Ok(previous_secret) = std::env::var(previous_env) {
if let Ok((_, claims)) = jwt_decode(token, previous_secret.as_bytes()) {
return Ok(claims);
}
}
// Both failed - return error from primary attempt
let primary_secret = std::env::var(primary_env).unwrap_or_default();
jwt_decode(token, primary_secret.as_bytes()).map(|(_, claims)| claims)
}
// ---------------------------------------------------------------------------
// Auth state (shared via axum State)
// ---------------------------------------------------------------------------
@ -274,6 +306,20 @@ pub enum JwtValidationError {
ScopeDenied,
}
impl std::fmt::Display for JwtValidationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
JwtValidationError::Malformed => write!(f, "malformed JWT token"),
JwtValidationError::InvalidSignature => write!(f, "invalid JWT signature"),
JwtValidationError::Expired => write!(f, "JWT token expired"),
JwtValidationError::PreviousSecretEmpty => write!(f, "previous JWT secret is empty"),
JwtValidationError::ScopeDenied => write!(f, "JWT scope denied"),
}
}
}
impl std::error::Error for JwtValidationError {}
// ---------------------------------------------------------------------------
// CSRF token generation (plan §9)
// ---------------------------------------------------------------------------

View file

@ -13,17 +13,21 @@ use axum::{
Router,
};
use miroir_core::{
cdc::AnalyticsEvent,
config::advanced::{SearchUiAuthConfig, SearchUiConfig},
task_store::{SearchUiScopedKey, TaskStore},
};
use sha2::{Digest, Sha256};
use crate::auth::{
build_csp_header, jwt_decode_with_fallback, jwt_encode, JwtClaims, JwtHeader, KID_PRIMARY,
};
use crate::error_response::ErrorResponse;
use rust_embed::RustEmbed as Embed;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tracing::{debug, info, warn};
use crate::auth::{build_csp_header, jwt_encode, JwtClaims, JwtHeader, KID_PRIMARY};
use crate::routes::indexes::MeilisearchClient;
use crate::scoped_key_rotation::mint_scoped_key;
@ -458,25 +462,96 @@ pub async fn update_config(
/// Analytics beacon endpoint (plan §13.21).
///
/// Idempotent via client-generated event_id. Duplicate events are ignored.
/// Falls back to server-side event_id generation for old browsers.
pub async fn beacon(
Path(index_uid): Path<String>,
State(state): State<AppState>,
headers: HeaderMap,
Json(mut beacon): Json<BeaconRequest>,
) -> Result<impl IntoResponse, ErrorResponse> {
let _ = &state.config;
let config = &state.config;
// Validate event_id is present
// Extract JWT to get session_id (plan §13.21)
let session_id = if let Some(auth_header) = headers.get("authorization") {
let auth_str = auth_header.to_str().unwrap_or("");
if let Some(token) = auth_str.strip_prefix("Bearer ") {
// Decode JWT to extract session_id (sub claim)
match jwt_decode_with_fallback(
token,
config.search_ui.auth.jwt_secret_env.as_str(),
config.search_ui.auth.jwt_secret_previous_env.as_str(),
) {
Ok(claims) => claims.sub.clone(),
Err(e) => {
debug!(
error = %e,
"failed to decode JWT for beacon, using fallback session_id"
);
// Fallback: generate a session_id from the token itself
let hash = Sha256::digest(token.as_bytes());
let hash_hex = hex::encode(&hash[..16]);
format!("anon:{}", hash_hex)
}
}
} else {
"anonymous".to_string()
}
} else {
"anonymous".to_string()
};
// Server-side event_id generation fallback for old browsers (plan §13.21)
// If client didn't provide event_id, generate deterministic hash
if beacon.event_id.is_empty() {
return Err(ErrorResponse::invalid_request(
"event_id is required".to_string(),
));
let mut hasher = Sha256::new();
hasher.update(session_id.as_bytes());
if let Some(ref query) = beacon.query {
hasher.update(query.as_bytes());
}
if let Some(ref result_id) = beacon.document_id {
hasher.update(result_id.as_bytes());
}
if let Some(ref position) = beacon.position {
hasher.update(position.to_be_bytes());
}
// Add minute bucket for latency events
if beacon.event_type == "latency" {
if let Some(ref latency_ms) = beacon.latency_ms {
let minute_bucket = latency_ms / 60000; // 60 second buckets
hasher.update(minute_bucket.to_be_bytes());
}
}
let hash = hasher.finalize();
beacon.event_id = hex::encode(&hash[..16]);
debug!(
index = %index_uid,
event_type = %beacon.event_type,
generated_event_id = %beacon.event_id,
"generated server-side event_id for old browser"
);
} else {
// Normalize event_id
beacon.event_id = beacon.event_id.trim().to_string();
}
// Normalize event_id
beacon.event_id = beacon.event_id.trim().to_string();
// Idempotency check: skip if event_id was already processed (plan §13.21)
if let Some(redis_store) = &state.redis_store {
let is_new = redis_store
.check_and_mark_beacon_event(&index_uid, &beacon.event_id)
.map_err(|e| {
ErrorResponse::internal_error(format!("beacon idempotency check failed: {e}"))
})?;
// TODO: Implement idempotency check via Redis
// TODO: Publish to CDC sink if analytics enabled
if !is_new {
debug!(
index = %index_uid,
event_type = %beacon.event_type,
event_id = %beacon.event_id,
"duplicate beacon event ignored"
);
return Ok(StatusCode::ACCEPTED);
}
}
debug!(
index = %index_uid,
@ -485,6 +560,51 @@ pub async fn beacon(
"received analytics beacon"
);
// Publish to CDC if analytics is enabled (plan §13.21)
if config.search_ui.analytics.enabled {
if let Some(cdc_manager) = &state.cdc_manager {
let event = AnalyticsEvent {
event_type: beacon.event_type.clone(),
event_id: beacon.event_id.clone(),
session_id: session_id.clone(),
index: index_uid.clone(),
query: beacon.query.clone(),
result_id: beacon.document_id.clone(),
result_position: beacon.position,
latency_ms: beacon.latency_ms,
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
};
// Latency events are subject to cdc.emit_internal_writes (plan §13.21)
let is_latency = beacon.event_type == "latency" || beacon.event_type == "search";
let should_emit = if is_latency {
config.cdc.emit_internal_writes
} else {
true // Click-through events are always emitted
};
if should_emit {
cdc_manager.publish_analytics(event).await;
debug!(
index = %index_uid,
event_type = %beacon.event_type,
event_id = %beacon.event_id,
"published analytics event to CDC"
);
} else {
debug!(
index = %index_uid,
event_type = %beacon.event_type,
event_id = %beacon.event_id,
"skipped latency event (emit_internal_writes=false)"
);
}
}
}
Ok(StatusCode::ACCEPTED)
}