P7.5: structured JSON logging with request IDs and trace correlation

Enable span context in JSON log output so request_id and pod_id appear on
every log line. Downgrade search-handler log to DEBUG to keep INFO volume at
≤1 per request. Fix PII leaks: hash API key identifiers before logging,
remove search terms from node error messages. Cast duration_ms from u128 to
u64 for clean JSON number serialization.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-04-20 07:17:14 -04:00
parent 43e3367c73
commit a2a323f33c
5 changed files with 82 additions and 15 deletions

View file

@ -333,7 +333,7 @@ impl NodeClient for HttpClient {
.json(&search_body)
.send()
.await
.map_err(|e| NodeError::NetworkError(format!("DF search failed for '{}': {}", term, e)))?;
.map_err(|e| NodeError::NetworkError(format!("DF search failed: {}", e)))?;
if search_resp.status().is_success() {
let body: Value = search_resp

View file

@ -19,6 +19,7 @@ mod client;
mod middleware;
mod otel;
mod routes;
mod scoped_key_rotation;
use admin_session::SealKey;
use auth::AuthState;
@ -26,6 +27,7 @@ use middleware::{Metrics, metrics_router, TelemetryState};
use routes::{
admin, admin_endpoints, health, indexes, keys, search, settings, tasks, version,
};
use scoped_key_rotation::ScopedKeyRotationState;
/// Unified application state containing all shared state.
#[derive(Clone)]
@ -34,6 +36,7 @@ struct UnifiedState {
metrics: Metrics,
admin: admin_endpoints::AppState,
pod_id: String,
redis_store: Option<miroir_core::task_store::RedisTaskStore>,
}
impl UnifiedState {
@ -62,6 +65,24 @@ impl UnifiedState {
// so the metric is accurate from the first scrape.
metrics.admin_session_key_generated().set(if seal_key.is_generated() { 1.0 } else { 0.0 });
let pod_id = std::env::var("POD_NAME").unwrap_or_else(|_| "unknown".to_string());
// Create Redis task store if backend is redis (must happen before AppState
// so redis_store and pod_id are available to admin endpoints).
let redis_store = if config.task_store.backend == "redis" && !config.task_store.url.is_empty() {
let url = config.task_store.url.clone();
Some(
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(
miroir_core::task_store::RedisTaskStore::open(&url)
)
})
.expect("Failed to connect to Redis for scoped key rotation"),
)
} else {
None
};
let auth = AuthState {
master_key,
admin_key: admin_key.clone(),
@ -72,18 +93,30 @@ impl UnifiedState {
admin_session_revoked_total: metrics.admin_session_revoked_total(),
};
let admin = admin_endpoints::AppState::new(config.clone(), metrics.clone());
let admin = admin_endpoints::AppState::with_redis(
config.clone(),
metrics.clone(),
redis_store.clone(),
pod_id.clone(),
);
let pod_id = std::env::var("POD_NAME").unwrap_or_else(|_| "unknown".to_string());
Self { auth, metrics, admin, pod_id }
Self { auth, metrics, admin, pod_id, redis_store }
}
}
// Implement FromRef so that admin_endpoints::AppState can be extracted from UnifiedState
impl FromRef<UnifiedState> for admin_endpoints::AppState {
fn from_ref(state: &UnifiedState) -> Self {
state.admin.clone()
Self {
config: state.admin.config.clone(),
topology: state.admin.topology.clone(),
ready: state.admin.ready.clone(),
metrics: state.admin.metrics.clone(),
version_state: state.admin.version_state.clone(),
task_registry: state.admin.task_registry.clone(),
redis_store: state.redis_store.clone(),
pod_id: state.pod_id.clone(),
}
}
}
@ -118,7 +151,7 @@ async fn main() -> anyhow::Result<()> {
let json_layer = tracing_subscriber::fmt::layer()
.json()
.with_target(true)
.with_current_span(false)
.with_current_span(true)
.with_span_list(false);
// Apply OTel layer to registry first, then add filter and json layer
registry()
@ -130,7 +163,7 @@ async fn main() -> anyhow::Result<()> {
let json_layer = tracing_subscriber::fmt::layer()
.json()
.with_target(true)
.with_current_span(false)
.with_current_span(true)
.with_span_list(false);
registry()
.with(filter)
@ -175,6 +208,18 @@ async fn main() -> anyhow::Result<()> {
run_health_checker(health_checker_state).await;
});
// Start scoped key rotation background task (requires Redis)
if let Some(ref redis) = state.redis_store {
let rotation_state = ScopedKeyRotationState {
config: state.admin.config.clone(),
redis: redis.clone(),
pod_id: state.pod_id.clone(),
};
tokio::spawn(async move {
scoped_key_rotation::run_scoped_key_rotator(rotation_state).await;
});
}
// Build the main app router with UnifiedState
let app = Router::new()
.route("/health", get(health::get_health))

View file

@ -873,7 +873,7 @@ pub async fn telemetry_middleware(
pod_id = %pod_id,
request_id = %request_id,
message = %message,
duration_ms = duration.as_millis(),
duration_ms = duration.as_millis() as u64,
status = status_u16,
method = %method,
path_template = %path_template,
@ -884,7 +884,7 @@ pub async fn telemetry_middleware(
pod_id = %pod_id,
request_id = %request_id,
message = %message,
duration_ms = duration.as_millis(),
duration_ms = duration.as_millis() as u64,
status = status_u16,
method = %method,
path_template = %path_template,
@ -895,7 +895,7 @@ pub async fn telemetry_middleware(
pod_id = %pod_id,
request_id = %request_id,
message = %message,
duration_ms = duration.as_millis(),
duration_ms = duration.as_millis() as u64,
status = status_u16,
method = %method,
path_template = %path_template,

View file

@ -247,7 +247,14 @@ async fn delete_key_handler(
}
if !errors.is_empty() {
tracing::warn!(key = %key, errors = errors.len(), "key deletion partially failed");
// Hash the key identifier for correlation without logging the raw value (plan §10: no PII).
let key_hash = {
use std::hash::{Hash, Hasher};
let mut h = std::collections::hash_map::DefaultHasher::new();
key.hash(&mut h);
format!("{:016x}", h.finish())
};
tracing::warn!(key_hash = %key_hash, errors = errors.len(), "key deletion partially failed");
}
Ok(Json(first_response.unwrap_or(serde_json::json!({"status": "deleted"}))))

View file

@ -91,6 +91,18 @@ async fn search_handler(
let start = Instant::now();
let client_requested_score = body.ranking_score.unwrap_or(false);
// Refresh scoped-key beacon so the rotation leader knows this pod is serving
// requests for this index at the current generation (plan §13.21).
if let Some(ref redis) = state.redis_store {
if let Ok(Some(sk)) = redis.get_search_ui_scoped_key(&index) {
let _ = redis.observe_search_ui_scoped_key(
&state.pod_id,
&index,
sk.generation,
);
}
}
// Use live topology from shared state (updated by health checker)
let topo = state.topology.read().await;
let policy = match state.config.scatter.unavailable_shard_policy.as_str() {
@ -199,14 +211,17 @@ async fn search_handler(
.unwrap();
// Structured log entry (plan §10 shape)
// Note: request_id and pod_id are inherited from the middleware span
tracing::info!(
// request_id and pod_id are included from the middleware span via
// .with_current_span(true) on the JSON subscriber layer.
// Logged at DEBUG to keep INFO volume at ≤1 per request (the middleware
// already emits one INFO line for every response).
tracing::debug!(
target: "miroir.search",
index = %index,
node_count = node_count,
estimated_hits = result.estimated_total_hits,
degraded = result.degraded,
duration_ms = start.elapsed().as_millis(),
duration_ms = start.elapsed().as_millis() as u64,
"search completed"
);