P7.6: Implement OpenTelemetry tracing (disabled by default)

Add OTel distributed tracing support with zero overhead when disabled.

Configuration (plan §10):
- tracing.enabled: false (default, zero overhead)
- tracing.endpoint: "http://tempo.monitoring.svc:4317"
- tracing.service_name: "miroir"
- tracing.sample_rate: 0.1 (head-based sampling)

Span hierarchy:
- Parent: inbound request (POST /indexes/:index/search)
- Child: scatter plan construction
- Parallel children: one per node in covering set
- Child: merge operation

Resource attributes: service.name, service.version, host.name

When disabled (tracing.enabled: false), no OTel library calls are made.
Shutdown handler flushes pending traces before exit.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-04-19 10:15:39 -04:00
parent 2dcfae8822
commit 69e33a6744
6 changed files with 233 additions and 13 deletions

View file

@ -62,6 +62,7 @@ pub struct MiroirConfig {
pub explain: advanced::ExplainConfig,
pub admin_ui: advanced::AdminUiConfig,
pub search_ui: advanced::SearchUiConfig,
pub tracing: advanced::TracingConfig,
// --- §14 horizontal scaling ---
pub peer_discovery: PeerDiscoveryConfig,
@ -112,6 +113,7 @@ impl Default for MiroirConfig {
explain: advanced::ExplainConfig::default(),
admin_ui: advanced::AdminUiConfig::default(),
search_ui: advanced::SearchUiConfig::default(),
tracing: advanced::TracingConfig::default(),
peer_discovery: PeerDiscoveryConfig::default(),
leader_election: LeaderElectionConfig::default(),
hpa: HpaConfig::default(),

View file

@ -819,3 +819,35 @@ impl Default for SearchUiAnalyticsConfig {
}
}
}
// ---------------------------------------------------------------------------
// §10 OpenTelemetry tracing
// ---------------------------------------------------------------------------
/// OpenTelemetry distributed tracing configuration (plan §10).
///
/// When enabled, a sampled search produces a trace with parallel spans for each
/// node in the covering set. A slow node shows up as an outlier span in Tempo.
///
/// `#[serde(default)]` is applied at the container level, so any field missing
/// from the deserialized input takes its value from this type's `Default` impl.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct TracingConfig {
/// Master switch for OTel tracing. Default: false (zero overhead when disabled).
pub enabled: bool,
/// OTLP collector endpoint (e.g., "http://tempo.monitoring.svc:4317" for gRPC).
pub endpoint: String,
/// Service name under which traces are reported.
pub service_name: String,
/// Head-based sampling rate (0.0 to 1.0). 0.1 = ~10% of requests traced.
/// NOTE(review): no range validation is visible on this type — confirm where
/// out-of-range values are rejected or clamped.
pub sample_rate: f64,
}
impl Default for TracingConfig {
    /// Out-of-the-box settings: tracing switched off, pointing at the in-cluster
    /// Tempo OTLP endpoint, reporting as "miroir", sampling 10% of traces.
    fn default() -> Self {
        let endpoint = String::from("http://tempo.monitoring.svc:4317");
        let service_name = String::from("miroir");

        Self {
            // Disabled unless the operator opts in.
            enabled: false,
            endpoint,
            service_name,
            sample_rate: 0.1,
        }
    }
}

View file

@ -1,6 +1,7 @@
//! Scatter orchestration: fan-out logic and covering set builder.
use crate::config::UnavailableShardPolicy;
use tracing::{instrument, info_span, Instrument};
use crate::merger::{MergeInput, MergedSearchResult, MergeStrategy, ShardHitPage};
use crate::router::{covering_set, query_group};
use crate::topology::{NodeId, Topology};
@ -332,6 +333,7 @@ pub struct ScatterResult {
pub deadline_exceeded: bool,
}
#[instrument(skip_all, fields(query_seq, rf, shard_count))]
pub fn plan_search_scatter(
topology: &Topology,
query_seq: u64,
@ -368,6 +370,7 @@ pub fn plan_search_scatter(
}
}
#[instrument(skip_all, fields(node_count))]
pub async fn execute_scatter<C: NodeClient>(
plan: ScatterPlan,
client: &C,
@ -381,6 +384,7 @@ pub async fn execute_scatter<C: NodeClient>(
node_to_shards.entry(node_id.clone()).or_default().push(shard_id);
}
}
tracing::Span::current().record("node_count", node_to_shards.len());
let mut shard_pages = Vec::new();
let mut failed_shards = HashMap::new();
@ -400,10 +404,18 @@ pub async fn execute_scatter<C: NodeClient>(
let client_ref = client;
let req_clone = req.clone();
let node_id_clone = node_id.clone();
let shard_count = shards.len();
// Create a span for this node's scatter call
let span = info_span!(
"scatter_node",
node_id = %node_id_clone,
address = %node.address,
shard_count = shard_count,
);
tasks.push(async move {
let result = client_ref.search_node(&node_id_clone, &node.address, &req_clone).await;
(node_id_clone, shards, result)
});
}.instrument(span));
}
let results = futures_util::future::join_all(tasks).await;
@ -505,6 +517,7 @@ pub async fn execute_scatter<C: NodeClient>(
Ok(ScatterResult { shard_pages, failed_shards, partial, deadline_exceeded })
}
#[instrument(skip_all, fields(index = %req.index_uid))]
pub async fn scatter_gather_search<C: NodeClient>(
plan: ScatterPlan,
client: &C,
@ -536,6 +549,14 @@ pub async fn scatter_gather_search<C: NodeClient>(
failed_shards,
};
// Span for the merge operation
let _span = info_span!(
"merge",
shard_count = merge_input.shard_hits.len(),
offset = req.offset,
limit = req.limit,
).entered();
strategy.merge(merge_input)
}
@ -560,6 +581,7 @@ pub fn extract_query_terms(query: &Option<String>) -> Vec<String> {
}
/// Execute the preflight phase: gather term frequencies from all shards.
#[instrument(skip_all, fields(node_count, term_count = req.terms.len()))]
pub async fn execute_preflight<C: NodeClient>(
plan: &ScatterPlan,
client: &C,
@ -595,6 +617,7 @@ pub async fn execute_preflight<C: NodeClient>(
}
/// Execute a full dfs_query_then_fetch search (OP#4 global-IDF preflight).
#[instrument(skip_all, fields(index = %req.index_uid))]
pub async fn dfs_query_then_fetch_search<C: NodeClient>(
plan: ScatterPlan,
client: &C,

View file

@ -11,15 +11,16 @@ use std::net::SocketAddr;
use std::time::Duration;
use tokio::signal;
use tracing::{error, info};
use tracing_subscriber::EnvFilter;
use tracing_subscriber::{EnvFilter, layer::SubscriberExt, registry};
mod auth;
mod client;
mod middleware;
mod otel;
mod routes;
use auth::AuthState;
use middleware::{Metrics, metrics_router};
use middleware::{Metrics, metrics_router, TelemetryState};
use routes::{
admin, admin_endpoints, health, indexes, keys, search, settings, tasks, version,
};
@ -30,6 +31,7 @@ struct UnifiedState {
auth: AuthState,
metrics: Metrics,
admin: admin_endpoints::AppState,
pod_id: String,
}
impl UnifiedState {
@ -49,7 +51,9 @@ impl UnifiedState {
let admin = admin_endpoints::AppState::new(config.clone(), metrics.clone());
Self { auth, metrics, admin }
let pod_id = std::env::var("POD_NAME").unwrap_or_else(|_| "unknown".to_string());
Self { auth, metrics, admin, pod_id }
}
}
@ -60,6 +64,16 @@ impl FromRef<UnifiedState> for admin_endpoints::AppState {
}
}
// Derive a TelemetryState from the shared UnifiedState so it can be used
// wherever a `FromRef<UnifiedState>` sub-state is expected.
impl FromRef<UnifiedState> for TelemetryState {
    fn from_ref(state: &UnifiedState) -> Self {
        // Both parts are cloned out of the unified state; the source is left untouched.
        let metrics = state.metrics.clone();
        let pod_id = state.pod_id.clone();
        Self { metrics, pod_id }
    }
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Load configuration (file → env → CLI overlay)
@ -67,15 +81,38 @@ async fn main() -> anyhow::Result<()> {
.map_err(|e| anyhow::anyhow!("Failed to load config: {}", e))?;
// Initialize structured JSON logging (plan §10 format)
// Fields: timestamp, level, target, message, request_id, pod_id
let filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("info"));
tracing_subscriber::fmt()
.with_env_filter(filter)
.json()
.with_current_span(false)
.with_span_list(false)
.init();
// Build subscriber - conditionally add OTel layer
// Note: We rebuild the layers in each branch because the types differ
// OTel layer must be applied to the bare registry first
if let Some(otel_layer) = otel::init_otel_layer(&config) {
let json_layer = tracing_subscriber::fmt::layer()
.json()
.with_target(true)
.with_current_span(false)
.with_span_list(false);
// Apply OTel layer to registry first, then add filter and json layer
let subscriber = registry()
.with(otel_layer)
.with(filter)
.with(json_layer);
tracing::subscriber::set_global_default(subscriber)
.map_err(|e| anyhow::anyhow!("Failed to set subscriber: {}", e))?;
} else {
let json_layer = tracing_subscriber::fmt::layer()
.json()
.with_target(true)
.with_current_span(false)
.with_span_list(false);
let subscriber = registry()
.with(filter)
.with(json_layer);
tracing::subscriber::set_global_default(subscriber)
.map_err(|e| anyhow::anyhow!("Failed to set subscriber: {}", e))?;
}
info!(
shards = config.shards,
@ -114,7 +151,10 @@ async fn main() -> anyhow::Result<()> {
auth::auth_middleware,
))
.layer(axum::middleware::from_fn_with_state(
state.metrics.clone(),
TelemetryState {
metrics: state.metrics.clone(),
pod_id: state.pod_id.clone(),
},
middleware::telemetry_middleware,
))
.with_state(state.clone());
@ -251,4 +291,7 @@ async fn shutdown_signal() {
}
info!("shutdown signal received, draining in-flight requests...");
// Shutdown OpenTelemetry to flush any pending traces
otel::shutdown_otel();
}

View file

@ -0,0 +1,95 @@
//! OpenTelemetry tracing layer setup (plan §10).
//!
//! When `tracing.enabled: false`, this module is a no-op — zero overhead.
//! When enabled, it initializes an OTLP exporter with head-based sampling.
use miroir_core::config::MiroirConfig;
#[cfg(feature = "tracing")]
use opentelemetry::trace::TracerProvider;
/// Initialize the OpenTelemetry tracing layer if enabled in config.
///
/// Returns `Some(layer)` when `config.tracing.enabled` is true and the OTLP
/// exporter could be built. Returns `None` when tracing is disabled (in which
/// case no OpenTelemetry call is made at all) or when building the exporter
/// fails; the failure is reported on stderr.
///
/// The caller is responsible for adding the layer to the subscriber.
///
/// Global side effects when tracing is enabled:
/// - installs a `TraceContextPropagator` as the global text-map propagator
///   (this happens before the exporter is built, so it is set even if the
///   exporter then fails);
/// - on success, registers the tracer provider as the global provider.
#[cfg(feature = "tracing")]
pub fn init_otel_layer(
    config: &MiroirConfig,
) -> Option<
    tracing_opentelemetry::OpenTelemetryLayer<
        tracing_subscriber::Registry,
        opentelemetry_sdk::trace::Tracer,
    >,
> {
    use opentelemetry::KeyValue;
    use opentelemetry_otlp::WithExportConfig;
    use opentelemetry_sdk::propagation::TraceContextPropagator;
    use opentelemetry_sdk::trace::{Sampler, TracerProvider as SdkTracerProvider};
    use opentelemetry_sdk::Resource;

    // Disabled: bail out before touching any OpenTelemetry API.
    if !config.tracing.enabled {
        return None;
    }

    // Set global propagator for distributed tracing
    opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());

    // Build resource attributes (service.name, service.version, host.name).
    // host.name is taken from POD_NAME, falling back to "unknown" when unset.
    let pod_name = std::env::var("POD_NAME").unwrap_or_else(|_| "unknown".to_string());
    let resource = Resource::new(vec![
        KeyValue::new("service.name", config.tracing.service_name.clone()),
        KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
        KeyValue::new("host.name", pod_name),
    ]);

    // Create OTLP exporter with tonic (gRPC) transport. The error is printed
    // with eprintln! rather than a tracing macro because this function's
    // result is what the caller uses to build the subscriber.
    let exporter = match opentelemetry_otlp::SpanExporter::builder()
        .with_tonic()
        .with_endpoint(&config.tracing.endpoint)
        .build()
    {
        Ok(exporter) => exporter,
        Err(e) => {
            eprintln!("Failed to create OTLP exporter: {}", e);
            return None;
        }
    };

    // Head-based sampler: sample_rate fraction of traces
    let sampler = Sampler::TraceIdRatioBased(config.tracing.sample_rate);

    // Build provider with exporter, sampler, and resource.
    // NOTE(review): this uses the simple (non-batching) exporter; consider the
    // batch exporter for production traffic — confirm against the
    // opentelemetry_sdk version and runtime features this crate enables.
    let provider = SdkTracerProvider::builder()
        .with_simple_exporter(exporter)
        .with_sampler(sampler)
        .with_resource(resource)
        .build();

    // Obtain the tracer before the provider is moved into the global slot.
    let tracer = provider.tracer("miroir-proxy");

    // Set global tracer provider; the previously registered provider that the
    // call returns is intentionally discarded.
    let _ = opentelemetry::global::set_tracer_provider(provider);

    // Create and return the tracing layer
    Some(tracing_opentelemetry::layer().with_tracer(tracer))
}
/// Shut down the globally registered OpenTelemetry tracer provider.
///
/// Meant for the graceful-shutdown path, so that spans which have not been
/// exported yet get a chance to be flushed before the process exits.
#[cfg(feature = "tracing")]
pub fn shutdown_otel() {
    // Whatever the call yields is deliberately ignored.
    let _ = opentelemetry::global::shutdown_tracer_provider();
}
/// Stand-in used when the crate is built without the `tracing` feature.
///
/// Always returns `None`, so the caller never installs an OpenTelemetry layer;
/// the configuration argument is ignored.
#[cfg(not(feature = "tracing"))]
pub fn init_otel_layer(_config: &MiroirConfig) -> Option<tracing_subscriber::layer::Identity> {
    Option::None
}
/// Stand-in used when the crate is built without the `tracing` feature:
/// there is no tracer provider to flush, so this does nothing.
#[cfg(not(feature = "tracing"))]
pub fn shutdown_otel() {}

View file

@ -1,6 +1,7 @@
//! Search route handler with DFS (Distributed Frequency Search) support.
use axum::extract::{Extension, Path};
use tracing::instrument;
use axum::http::StatusCode;
use axum::response::Response;
use axum::Json;
@ -12,6 +13,7 @@ use miroir_core::scatter::{
use serde::Deserialize;
use serde_json::Value;
use std::sync::Arc;
use std::time::Instant;
use crate::routes::admin_endpoints::AppState;
@ -80,11 +82,13 @@ struct SearchRequestBody {
/// Returns `X-Miroir-Degraded: shards=X,Y,Z` header when any shards are unavailable.
/// Strips `_miroir_shard` from all hits; strips `_rankingScore` unless client
/// explicitly requested it.
#[instrument(skip_all, fields(index = %index))]
async fn search_handler(
Path(index): Path<String>,
Extension(state): Extension<Arc<AppState>>,
Json(body): Json<SearchRequestBody>,
) -> Result<Response, StatusCode> {
let start = Instant::now();
let client_requested_score = body.ranking_score.unwrap_or(false);
// Use live topology from shared state (updated by health checker)
@ -96,8 +100,17 @@ async fn search_handler(
_ => return Err(StatusCode::INTERNAL_SERVER_ERROR),
};
// Plan scatter using live topology
let plan = plan_search_scatter(&topo, 0, state.config.replication_factor as usize, state.config.shards);
// Plan scatter using live topology (span for plan construction)
let plan = {
let _plan_span = tracing::info_span!(
"scatter_plan",
replica_groups = state.config.replica_groups,
shards = state.config.shards,
rf = state.config.replication_factor,
).entered();
plan_search_scatter(&topo, 0, state.config.replication_factor as usize, state.config.shards)
};
let node_count = plan.shard_to_node.len() as u64;
// Build search request
let search_req = SearchRequest {
@ -178,6 +191,18 @@ async fn search_handler(
.body(axum::body::Body::from(serde_json::to_string(&body).unwrap()))
.unwrap();
// Structured log entry (plan §10 shape)
// Note: request_id and pod_id are inherited from the middleware span
tracing::info!(
target: "miroir.search",
index = %index,
node_count = node_count,
estimated_hits = result.estimated_total_hits,
degraded = result.degraded,
duration_ms = start.elapsed().as_millis(),
"search completed"
);
Ok(response)
}