From 69e33a6744db9aa829a2aa9f6a3abc5e7df794ae Mon Sep 17 00:00:00 2001 From: jedarden Date: Sun, 19 Apr 2026 10:15:39 -0400 Subject: [PATCH] P7.6: Implement OpenTelemetry tracing (disabled by default) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add OTel distributed tracing support with zero overhead when disabled. Configuration (plan §10): - tracing.enabled: false (default, zero overhead) - tracing.endpoint: "http://tempo.monitoring.svc:4317" - tracing.service_name: "miroir" - tracing.sample_rate: 0.1 (head-based sampling) Span hierarchy: - Parent: inbound request (POST /indexes/:index/search) - Child: scatter plan construction - Parallel children: one per node in covering set - Child: merge operation Resource attributes: service.name, service.version, host.name When disabled (tracing.enabled: false), no OTel library calls are made. Shutdown handler flushes pending traces before exit. Co-Authored-By: Claude Opus 4.7 --- crates/miroir-core/src/config.rs | 2 + crates/miroir-core/src/config/advanced.rs | 32 ++++++++ crates/miroir-core/src/scatter.rs | 25 +++++- crates/miroir-proxy/src/main.rs | 63 ++++++++++++--- crates/miroir-proxy/src/otel.rs | 95 +++++++++++++++++++++++ crates/miroir-proxy/src/routes/search.rs | 29 ++++++- 6 files changed, 233 insertions(+), 13 deletions(-) create mode 100644 crates/miroir-proxy/src/otel.rs diff --git a/crates/miroir-core/src/config.rs b/crates/miroir-core/src/config.rs index 2b1fae8..8eefb74 100644 --- a/crates/miroir-core/src/config.rs +++ b/crates/miroir-core/src/config.rs @@ -62,6 +62,7 @@ pub struct MiroirConfig { pub explain: advanced::ExplainConfig, pub admin_ui: advanced::AdminUiConfig, pub search_ui: advanced::SearchUiConfig, + pub tracing: advanced::TracingConfig, // --- §14 horizontal scaling --- pub peer_discovery: PeerDiscoveryConfig, @@ -112,6 +113,7 @@ impl Default for MiroirConfig { explain: advanced::ExplainConfig::default(), admin_ui: advanced::AdminUiConfig::default(), search_ui: advanced::SearchUiConfig::default(), + tracing: advanced::TracingConfig::default(), peer_discovery: PeerDiscoveryConfig::default(), leader_election: LeaderElectionConfig::default(), hpa: HpaConfig::default(), diff --git a/crates/miroir-core/src/config/advanced.rs b/crates/miroir-core/src/config/advanced.rs index fadf55d..ce3081a 100644 --- a/crates/miroir-core/src/config/advanced.rs +++ b/crates/miroir-core/src/config/advanced.rs @@ -819,3 +819,35 @@ impl Default for SearchUiAnalyticsConfig { } } } + +// --------------------------------------------------------------------------- +// §10 OpenTelemetry tracing +// --------------------------------------------------------------------------- + +/// OpenTelemetry distributed tracing configuration (plan §10). +/// +/// When enabled, every search produces a trace with parallel spans for each node +/// in the covering set. A slow node shows up as an outlier span in Tempo. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct TracingConfig { + /// Enable or disable OTel tracing. Default: false (zero overhead when disabled). + pub enabled: bool, + /// OTLP endpoint (e.g., "http://tempo.monitoring.svc:4317" for gRPC). + pub endpoint: String, + /// Service name for trace identification. + pub service_name: String, + /// Head-based sampling rate (0.0 to 1.0). 0.1 = ~10% of requests traced. + pub sample_rate: f64, +} + +impl Default for TracingConfig { + fn default() -> Self { + Self { + enabled: false, + endpoint: "http://tempo.monitoring.svc:4317".into(), + service_name: "miroir".into(), + sample_rate: 0.1, + } + } +} diff --git a/crates/miroir-core/src/scatter.rs b/crates/miroir-core/src/scatter.rs index fad4ee8..0acd11d 100644 --- a/crates/miroir-core/src/scatter.rs +++ b/crates/miroir-core/src/scatter.rs @@ -1,6 +1,7 @@ //! Scatter orchestration: fan-out logic and covering set builder. use crate::config::UnavailableShardPolicy; +use tracing::{instrument, info_span, Instrument}; use crate::merger::{MergeInput, MergedSearchResult, MergeStrategy, ShardHitPage}; use crate::router::{covering_set, query_group}; use crate::topology::{NodeId, Topology}; @@ -332,6 +333,7 @@ pub struct ScatterResult { pub deadline_exceeded: bool, } +#[instrument(skip_all, fields(query_seq, rf, shard_count))] pub fn plan_search_scatter( topology: &Topology, query_seq: u64, @@ -368,6 +370,7 @@ pub fn plan_search_scatter( } } +#[instrument(skip_all, fields(node_count))] pub async fn execute_scatter( plan: ScatterPlan, client: &C, @@ -381,6 +384,7 @@ pub async fn execute_scatter( node_to_shards.entry(node_id.clone()).or_default().push(shard_id); } } + tracing::Span::current().record("node_count", node_to_shards.len()); let mut shard_pages = Vec::new(); let mut failed_shards = HashMap::new(); @@ -400,10 +404,18 @@ pub async fn execute_scatter( let client_ref = client; let req_clone = req.clone(); let node_id_clone = node_id.clone(); + let shard_count = shards.len(); + // Create a span for this node's scatter call + let span = info_span!( + "scatter_node", + node_id = %node_id_clone, + address = %node.address, + shard_count = shard_count, + ); tasks.push(async move { let result = client_ref.search_node(&node_id_clone, &node.address, &req_clone).await; (node_id_clone, shards, result) - }); + }.instrument(span)); } let results = futures_util::future::join_all(tasks).await; @@ -505,6 +517,7 @@ pub async fn execute_scatter( Ok(ScatterResult { shard_pages, failed_shards, partial, deadline_exceeded }) } +#[instrument(skip_all, fields(index = %req.index_uid))] pub async fn scatter_gather_search( plan: ScatterPlan, client: &C, @@ -536,6 +549,14 @@ pub async fn scatter_gather_search( failed_shards, }; + // Span for the merge operation + let _span = info_span!( + "merge", + shard_count = merge_input.shard_hits.len(), + offset = req.offset, + limit = req.limit, + ).entered(); + strategy.merge(merge_input) } @@ -560,6 +581,7 @@ pub fn extract_query_terms(query: &Option) -> Vec { } /// Execute the preflight phase: gather term frequencies from all shards. +#[instrument(skip_all, fields(node_count, term_count = req.terms.len()))] pub async fn execute_preflight( plan: &ScatterPlan, client: &C, @@ -595,6 +617,7 @@ pub async fn execute_preflight( } /// Execute a full dfs_query_then_fetch search (OP#4 global-IDF preflight). +#[instrument(skip_all, fields(index = %req.index_uid))] pub async fn dfs_query_then_fetch_search( plan: ScatterPlan, client: &C, diff --git a/crates/miroir-proxy/src/main.rs b/crates/miroir-proxy/src/main.rs index ad3c710..a456d7c 100644 --- a/crates/miroir-proxy/src/main.rs +++ b/crates/miroir-proxy/src/main.rs @@ -11,15 +11,16 @@ use std::net::SocketAddr; use std::time::Duration; use tokio::signal; use tracing::{error, info}; -use tracing_subscriber::EnvFilter; +use tracing_subscriber::{EnvFilter, layer::SubscriberExt, registry}; mod auth; mod client; mod middleware; +mod otel; mod routes; use auth::AuthState; -use middleware::{Metrics, metrics_router}; +use middleware::{Metrics, metrics_router, TelemetryState}; use routes::{ admin, admin_endpoints, health, indexes, keys, search, settings, tasks, version, }; @@ -30,6 +31,7 @@ struct UnifiedState { auth: AuthState, metrics: Metrics, admin: admin_endpoints::AppState, + pod_id: String, } impl UnifiedState { @@ -49,7 +51,9 @@ impl UnifiedState { let admin = admin_endpoints::AppState::new(config.clone(), metrics.clone()); - Self { auth, metrics, admin } + let pod_id = std::env::var("POD_NAME").unwrap_or_else(|_| "unknown".to_string()); + + Self { auth, metrics, admin, pod_id } } } @@ -60,6 +64,16 @@ impl FromRef for admin_endpoints::AppState { } } +// Implement FromRef so that TelemetryState can be extracted from UnifiedState +impl FromRef for TelemetryState { + fn from_ref(state: &UnifiedState) -> Self { + TelemetryState { + metrics: state.metrics.clone(), + pod_id: state.pod_id.clone(), + } + } +} + #[tokio::main] async fn main() -> anyhow::Result<()> { // Load configuration (file → env → CLI overlay) @@ -67,15 +81,38 @@ async fn main() -> anyhow::Result<()> { .map_err(|e| anyhow::anyhow!("Failed to load config: {}", e))?; // Initialize structured JSON logging (plan §10 format) + // Fields: timestamp, level, target, message, request_id, pod_id let filter = EnvFilter::try_from_default_env() .unwrap_or_else(|_| EnvFilter::new("info")); - tracing_subscriber::fmt() - .with_env_filter(filter) - .json() - .with_current_span(false) - .with_span_list(false) - .init(); + // Build subscriber - conditionally add OTel layer + // Note: We rebuild the layers in each branch because the types differ + // OTel layer must be applied to the bare registry first + if let Some(otel_layer) = otel::init_otel_layer(&config) { + let json_layer = tracing_subscriber::fmt::layer() + .json() + .with_target(true) + .with_current_span(false) + .with_span_list(false); + // Apply OTel layer to registry first, then add filter and json layer + let subscriber = registry() + .with(otel_layer) + .with(filter) + .with(json_layer); + tracing::subscriber::set_global_default(subscriber) + .map_err(|e| anyhow::anyhow!("Failed to set subscriber: {}", e))?; + } else { + let json_layer = tracing_subscriber::fmt::layer() + .json() + .with_target(true) + .with_current_span(false) + .with_span_list(false); + let subscriber = registry() + .with(filter) + .with(json_layer); + tracing::subscriber::set_global_default(subscriber) + .map_err(|e| anyhow::anyhow!("Failed to set subscriber: {}", e))?; + } info!( shards = config.shards, @@ -114,7 +151,10 @@ async fn main() -> anyhow::Result<()> { auth::auth_middleware, )) .layer(axum::middleware::from_fn_with_state( - state.metrics.clone(), + TelemetryState { + metrics: state.metrics.clone(), + pod_id: state.pod_id.clone(), + }, middleware::telemetry_middleware, )) .with_state(state.clone()); @@ -251,4 +291,7 @@ async fn shutdown_signal() { } info!("shutdown signal received, draining in-flight requests..."); + + // Shutdown OpenTelemetry to flush any pending traces + otel::shutdown_otel(); } diff --git a/crates/miroir-proxy/src/otel.rs b/crates/miroir-proxy/src/otel.rs new file mode 100644 index 0000000..913d711 --- /dev/null +++ b/crates/miroir-proxy/src/otel.rs @@ -0,0 +1,95 @@ +//! OpenTelemetry tracing layer setup (plan §10). +//! +//! When `tracing.enabled: false`, this module is a no-op — zero overhead. +//! When enabled, it initializes an OTLP exporter with head-based sampling. + +use miroir_core::config::MiroirConfig; + +#[cfg(feature = "tracing")] +use opentelemetry::trace::TracerProvider; + +/// Initialize the OpenTelemetry tracing layer if enabled in config. +/// +/// Returns `Some(layer)` when tracing is enabled, `None` otherwise. +/// The caller is responsible for adding the layer to the subscriber. +#[cfg(feature = "tracing")] +pub fn init_otel_layer(config: &MiroirConfig) -> Option> { + if !config.tracing.enabled { + return None; + } + + use opentelemetry_sdk::{ + trace::{TracerProvider as SdkTracerProvider, Sampler, Tracer}, + Resource, + }; + use opentelemetry_sdk::propagation::TraceContextPropagator; + use tracing_opentelemetry::OpenTelemetryLayer; + use opentelemetry_otlp::WithExportConfig; + use opentelemetry::KeyValue; + + // Set global propagator for distributed tracing + opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); + + // Build resource attributes (service.name, service.version, host.name) + let pod_name = std::env::var("POD_NAME").unwrap_or_else(|_| "unknown".to_string()); + let resource = Resource::new(vec![ + KeyValue::new("service.name", config.tracing.service_name.clone()), + KeyValue::new("service.version", env!("CARGO_PKG_VERSION")), + KeyValue::new("host.name", pod_name), + ]); + + // Create OTLP exporter with tonic (gRPC) transport + let exporter = opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .with_endpoint(&config.tracing.endpoint) + .build() + .map_err(|e| { + eprintln!("Failed to create OTLP exporter: {}", e); + e + }) + .ok()?; + + // Head-based sampler: sample_rate fraction of traces + let sampler = Sampler::TraceIdRatioBased(config.tracing.sample_rate); + + // Build provider with exporter, sampler, and resource + let provider = SdkTracerProvider::builder() + .with_simple_exporter(exporter) + .with_sampler(sampler) + .with_resource(resource) + .build(); + + // Get the tracer from the provider + let tracer = provider.tracer("miroir-proxy"); + + // Set global tracer provider + let _ = opentelemetry::global::set_tracer_provider(provider); + + // Create and return the tracing layer + let layer = tracing_opentelemetry::layer().with_tracer(tracer); + + Some(layer) +} + +/// Shutdown the OpenTelemetry tracer provider, flushing any pending spans. +/// +/// This should be called during graceful shutdown to ensure all in-flight +/// traces are exported before the process exits. +#[cfg(feature = "tracing")] +pub fn shutdown_otel() { + use opentelemetry::global; + // Flush any remaining traces + let _ = global::shutdown_tracer_provider(); +} + +/// No-op implementation when tracing feature is disabled. +#[cfg(not(feature = "tracing"))] +pub fn init_otel_layer(_config: &MiroirConfig) -> Option { + None +} + +/// No-op shutdown when tracing feature is disabled. +#[cfg(not(feature = "tracing"))] +pub fn shutdown_otel() { + // No-op +} diff --git a/crates/miroir-proxy/src/routes/search.rs b/crates/miroir-proxy/src/routes/search.rs index 6fe2fa4..5f1a16a 100644 --- a/crates/miroir-proxy/src/routes/search.rs +++ b/crates/miroir-proxy/src/routes/search.rs @@ -1,6 +1,7 @@ //! Search route handler with DFS (Distributed Frequency Search) support. use axum::extract::{Extension, Path}; +use tracing::instrument; use axum::http::StatusCode; use axum::response::Response; use axum::Json; @@ -12,6 +13,7 @@ use miroir_core::scatter::{ use serde::Deserialize; use serde_json::Value; use std::sync::Arc; +use std::time::Instant; use crate::routes::admin_endpoints::AppState; @@ -80,11 +82,13 @@ struct SearchRequestBody { /// Returns `X-Miroir-Degraded: shards=X,Y,Z` header when any shards are unavailable. /// Strips `_miroir_shard` from all hits; strips `_rankingScore` unless client /// explicitly requested it. +#[instrument(skip_all, fields(index = %index))] async fn search_handler( Path(index): Path, Extension(state): Extension>, Json(body): Json, ) -> Result { + let start = Instant::now(); let client_requested_score = body.ranking_score.unwrap_or(false); // Use live topology from shared state (updated by health checker) @@ -96,8 +100,17 @@ async fn search_handler( _ => return Err(StatusCode::INTERNAL_SERVER_ERROR), }; - // Plan scatter using live topology - let plan = plan_search_scatter(&topo, 0, state.config.replication_factor as usize, state.config.shards); + // Plan scatter using live topology (span for plan construction) + let plan = { + let _plan_span = tracing::info_span!( + "scatter_plan", + replica_groups = state.config.replica_groups, + shards = state.config.shards, + rf = state.config.replication_factor, + ).entered(); + plan_search_scatter(&topo, 0, state.config.replication_factor as usize, state.config.shards) + }; + let node_count = plan.shard_to_node.len() as u64; // Build search request let search_req = SearchRequest { @@ -178,6 +191,18 @@ async fn search_handler( .body(axum::body::Body::from(serde_json::to_string(&body).unwrap())) .unwrap(); + // Structured log entry (plan §10 shape) + // Note: request_id and pod_id are inherited from the middleware span + tracing::info!( + target: "miroir.search", + index = %index, + node_count = node_count, + estimated_hits = result.estimated_total_hits, + degraded = result.degraded, + duration_ms = start.elapsed().as_millis(), + "search completed" + ); + Ok(response) }