P7.5 followup: PII redaction in Debug impls + per-node structured logging in client
- Remove raw URI path from middleware span (was leaking index names) - Redact admin_key in AdminLoginRequest Debug impl (session.rs + admin_endpoints.rs) - Redact query/filter fields in SearchRequestBody Debug impl - Add per-node DEBUG structured logging to client.rs (search, write, delete, preflight) Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
eb354bc3bb
commit
44237eb4e5
5 changed files with 189 additions and 22 deletions
|
|
@ -9,7 +9,7 @@ use miroir_core::topology::NodeId;
|
|||
use reqwest::Client;
|
||||
use serde_json::Value;
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
/// HTTP client implementation for node communication.
|
||||
pub struct HttpClient {
|
||||
|
|
@ -63,22 +63,29 @@ impl HttpClient {
|
|||
impl NodeClient for HttpClient {
|
||||
async fn search_node(
|
||||
&self,
|
||||
_node: &NodeId,
|
||||
node: &NodeId,
|
||||
address: &str,
|
||||
request: &SearchRequest,
|
||||
) -> std::result::Result<Value, NodeError> {
|
||||
let start = Instant::now();
|
||||
let url = self.search_url(address, &request.index_uid);
|
||||
|
||||
// Build the request body using to_node_body() which injects
|
||||
// showRankingScore: true and sets limit to offset + limit
|
||||
let mut body = request.to_node_body();
|
||||
|
||||
// Inject global IDF into the request if present
|
||||
if let Some(global_idf) = &request.global_idf {
|
||||
body["_miroir_global_idf"] = serde_json::to_value(global_idf)
|
||||
.map_err(|e| NodeError::NetworkError(format!("Failed to serialize global_idf: {}", e)))?;
|
||||
}
|
||||
|
||||
tracing::debug!(
|
||||
target: "miroir.node",
|
||||
node_id = %node,
|
||||
address = %address,
|
||||
index = %request.index_uid,
|
||||
operation = "search",
|
||||
"node call started"
|
||||
);
|
||||
|
||||
let response = self
|
||||
.client
|
||||
.post(&url)
|
||||
|
|
@ -86,7 +93,18 @@ impl NodeClient for HttpClient {
|
|||
.json(&body)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| NodeError::NetworkError(format!("Request failed: {}", e)))?;
|
||||
.map_err(|e| {
|
||||
tracing::warn!(
|
||||
target: "miroir.node",
|
||||
node_id = %node,
|
||||
address = %address,
|
||||
operation = "search",
|
||||
duration_ms = start.elapsed().as_millis() as u64,
|
||||
error = %e,
|
||||
"node call failed"
|
||||
);
|
||||
NodeError::NetworkError(format!("Request failed: {}", e))
|
||||
})?;
|
||||
|
||||
let status = response.status();
|
||||
let body_text = response
|
||||
|
|
@ -94,13 +112,34 @@ impl NodeClient for HttpClient {
|
|||
.await
|
||||
.map_err(|e| NodeError::NetworkError(format!("Failed to read response: {}", e)))?;
|
||||
|
||||
let duration_ms = start.elapsed().as_millis() as u64;
|
||||
|
||||
if !status.is_success() {
|
||||
tracing::debug!(
|
||||
target: "miroir.node",
|
||||
node_id = %node,
|
||||
address = %address,
|
||||
operation = "search",
|
||||
duration_ms,
|
||||
status = status.as_u16(),
|
||||
"node call error response"
|
||||
);
|
||||
return Err(NodeError::HttpError {
|
||||
status: status.as_u16(),
|
||||
body: body_text,
|
||||
});
|
||||
}
|
||||
|
||||
tracing::debug!(
|
||||
target: "miroir.node",
|
||||
node_id = %node,
|
||||
address = %address,
|
||||
index = %request.index_uid,
|
||||
operation = "search",
|
||||
duration_ms,
|
||||
"node call completed"
|
||||
);
|
||||
|
||||
serde_json::from_str(&body_text).map_err(|e| {
|
||||
NodeError::NetworkError(format!("Failed to parse JSON response: {}", e))
|
||||
})
|
||||
|
|
@ -108,12 +147,22 @@ impl NodeClient for HttpClient {
|
|||
|
||||
async fn write_documents(
|
||||
&self,
|
||||
_node: &NodeId,
|
||||
node: &NodeId,
|
||||
address: &str,
|
||||
request: &WriteRequest,
|
||||
) -> std::result::Result<WriteResponse, NodeError> {
|
||||
let start = Instant::now();
|
||||
let url = self.documents_url(address, &request.index_uid);
|
||||
|
||||
tracing::debug!(
|
||||
target: "miroir.node",
|
||||
node_id = %node,
|
||||
address = %address,
|
||||
index = %request.index_uid,
|
||||
operation = "write_documents",
|
||||
"node call started"
|
||||
);
|
||||
|
||||
let mut query_params = Vec::new();
|
||||
if let Some(pk) = &request.primary_key {
|
||||
query_params.push(("primaryKey", pk.as_str()));
|
||||
|
|
@ -162,6 +211,17 @@ impl NodeClient for HttpClient {
|
|||
NodeError::NetworkError(format!("Failed to parse JSON response: {}", e))
|
||||
})?;
|
||||
|
||||
let duration_ms = start.elapsed().as_millis() as u64;
|
||||
tracing::debug!(
|
||||
target: "miroir.node",
|
||||
node_id = %node,
|
||||
address = %address,
|
||||
operation = "write_documents",
|
||||
duration_ms,
|
||||
status = status.as_u16(),
|
||||
"node call completed"
|
||||
);
|
||||
|
||||
Ok(WriteResponse {
|
||||
success: true,
|
||||
task_uid: json.get("taskUid").and_then(|v| v.as_u64()),
|
||||
|
|
@ -173,12 +233,22 @@ impl NodeClient for HttpClient {
|
|||
|
||||
async fn delete_documents(
|
||||
&self,
|
||||
_node: &NodeId,
|
||||
node: &NodeId,
|
||||
address: &str,
|
||||
request: &DeleteByIdsRequest,
|
||||
) -> std::result::Result<DeleteResponse, NodeError> {
|
||||
let start = Instant::now();
|
||||
let url = self.documents_url(address, &request.index_uid);
|
||||
|
||||
tracing::debug!(
|
||||
target: "miroir.node",
|
||||
node_id = %node,
|
||||
address = %address,
|
||||
index = %request.index_uid,
|
||||
operation = "delete_documents",
|
||||
"node call started"
|
||||
);
|
||||
|
||||
let response = self
|
||||
.client
|
||||
.post(&url)
|
||||
|
|
@ -194,6 +264,17 @@ impl NodeClient for HttpClient {
|
|||
.await
|
||||
.map_err(|e| NodeError::NetworkError(format!("Failed to read response: {}", e)))?;
|
||||
|
||||
let duration_ms = start.elapsed().as_millis() as u64;
|
||||
tracing::debug!(
|
||||
target: "miroir.node",
|
||||
node_id = %node,
|
||||
address = %address,
|
||||
operation = "delete_documents",
|
||||
duration_ms,
|
||||
status = status.as_u16(),
|
||||
"node call completed"
|
||||
);
|
||||
|
||||
if !status.is_success() {
|
||||
// Try to parse as Meilisearch error
|
||||
if let Ok(meili_err) = serde_json::from_str::<Value>(&body_text) {
|
||||
|
|
@ -227,16 +308,26 @@ impl NodeClient for HttpClient {
|
|||
|
||||
async fn delete_documents_by_filter(
|
||||
&self,
|
||||
_node: &NodeId,
|
||||
node: &NodeId,
|
||||
address: &str,
|
||||
request: &DeleteByFilterRequest,
|
||||
) -> std::result::Result<DeleteResponse, NodeError> {
|
||||
let start = Instant::now();
|
||||
let url = format!(
|
||||
"{}/indexes/{}/documents/delete",
|
||||
address.trim_end_matches('/'),
|
||||
request.index_uid
|
||||
);
|
||||
|
||||
tracing::debug!(
|
||||
target: "miroir.node",
|
||||
node_id = %node,
|
||||
address = %address,
|
||||
index = %request.index_uid,
|
||||
operation = "delete_by_filter",
|
||||
"node call started"
|
||||
);
|
||||
|
||||
let response = self
|
||||
.client
|
||||
.post(&url)
|
||||
|
|
@ -252,6 +343,17 @@ impl NodeClient for HttpClient {
|
|||
.await
|
||||
.map_err(|e| NodeError::NetworkError(format!("Failed to read response: {}", e)))?;
|
||||
|
||||
let duration_ms = start.elapsed().as_millis() as u64;
|
||||
tracing::debug!(
|
||||
target: "miroir.node",
|
||||
node_id = %node,
|
||||
address = %address,
|
||||
operation = "delete_by_filter",
|
||||
duration_ms,
|
||||
status = status.as_u16(),
|
||||
"node call completed"
|
||||
);
|
||||
|
||||
if !status.is_success() {
|
||||
// Try to parse as Meilisearch error
|
||||
if let Ok(meili_err) = serde_json::from_str::<Value>(&body_text) {
|
||||
|
|
@ -285,12 +387,23 @@ impl NodeClient for HttpClient {
|
|||
|
||||
async fn preflight_node(
|
||||
&self,
|
||||
_node: &NodeId,
|
||||
node: &NodeId,
|
||||
address: &str,
|
||||
request: &PreflightRequest,
|
||||
) -> std::result::Result<PreflightResponse, NodeError> {
|
||||
let start = Instant::now();
|
||||
let base = address.trim_end_matches('/');
|
||||
|
||||
tracing::debug!(
|
||||
target: "miroir.node",
|
||||
node_id = %node,
|
||||
address = %address,
|
||||
index = %request.index_uid,
|
||||
operation = "preflight",
|
||||
term_count = request.terms.len(),
|
||||
"node call started"
|
||||
);
|
||||
|
||||
// 1. Get total docs from Meilisearch stats endpoint
|
||||
let stats_url = format!("{}/indexes/{}/stats", base, request.index_uid);
|
||||
let stats_resp = self
|
||||
|
|
@ -352,6 +465,17 @@ impl NodeClient for HttpClient {
|
|||
// use a default. The BM25 score is mainly sensitive to IDF, not avgdl.)
|
||||
let avg_doc_length = 500.0;
|
||||
|
||||
let duration_ms = start.elapsed().as_millis() as u64;
|
||||
tracing::debug!(
|
||||
target: "miroir.node",
|
||||
node_id = %node,
|
||||
address = %address,
|
||||
operation = "preflight",
|
||||
duration_ms,
|
||||
total_docs,
|
||||
"node call completed"
|
||||
);
|
||||
|
||||
Ok(PreflightResponse {
|
||||
total_docs,
|
||||
avg_doc_length,
|
||||
|
|
@ -361,7 +485,7 @@ impl NodeClient for HttpClient {
|
|||
|
||||
fn get_task_status(
|
||||
&self,
|
||||
_node: &NodeId,
|
||||
node: &NodeId,
|
||||
address: &str,
|
||||
request: &TaskStatusRequest,
|
||||
) -> impl std::future::Future<Output = std::result::Result<TaskStatusResponse, NodeError>> + Send {
|
||||
|
|
|
|||
|
|
@ -829,14 +829,15 @@ pub async fn telemetry_middleware(
|
|||
.unwrap_or_else(generate_request_id);
|
||||
req.headers_mut().set_request_id(&request_id);
|
||||
|
||||
// Create span for structured logging with pod_id included
|
||||
// Create span for structured logging with pod_id included.
|
||||
// Note: raw path is intentionally omitted to avoid logging index names
|
||||
// (which may contain customer identifiers). Use path_template instead.
|
||||
let span = info_span!(
|
||||
"request",
|
||||
request_id = %request_id,
|
||||
pod_id = %pod_id,
|
||||
method = %method,
|
||||
path_template = %path_template,
|
||||
path = %req.uri().path(),
|
||||
);
|
||||
|
||||
let _guard = span.enter();
|
||||
|
|
|
|||
|
|
@ -36,11 +36,19 @@ fn hash_for_log(value: &str) -> String {
|
|||
}
|
||||
|
||||
/// Request body for POST /_miroir/admin/login.
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[derive(Deserialize)]
|
||||
pub struct AdminLoginRequest {
|
||||
pub admin_key: String,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for AdminLoginRequest {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("AdminLoginRequest")
|
||||
.field("admin_key", &"[redacted]")
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// Response body for POST /_miroir/admin/login.
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct AdminLoginResponse {
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ use serde::Deserialize;
|
|||
use serde_json::Value;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use tracing::{debug, warn};
|
||||
|
||||
use crate::routes::admin_endpoints::AppState;
|
||||
|
||||
|
|
@ -58,7 +59,7 @@ where
|
|||
}
|
||||
|
||||
/// Search request body.
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[derive(Deserialize)]
|
||||
struct SearchRequestBody {
|
||||
q: Option<String>,
|
||||
offset: Option<usize>,
|
||||
|
|
@ -71,6 +72,19 @@ struct SearchRequestBody {
|
|||
rest: Value,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for SearchRequestBody {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("SearchRequestBody")
|
||||
.field("q", &"[redacted]")
|
||||
.field("offset", &self.offset)
|
||||
.field("limit", &self.limit)
|
||||
.field("filter", &"[redacted]")
|
||||
.field("facets", &self.facets)
|
||||
.field("ranking_score", &self.ranking_score)
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
/// Search handler with DFS global-IDF preflight (OP#4).
|
||||
///
|
||||
/// This handler implements the `dfs_query_then_fetch` pattern:
|
||||
|
|
@ -91,17 +105,29 @@ async fn search_handler(
|
|||
let start = Instant::now();
|
||||
let client_requested_score = body.ranking_score.unwrap_or(false);
|
||||
|
||||
// Refresh scoped-key beacon so the rotation leader knows this pod is serving
|
||||
// requests for this index at the current generation (plan §13.21).
|
||||
if let Some(ref redis) = state.redis_store {
|
||||
// Get the scoped key for this index (plan §13.21).
|
||||
// If a scoped key exists, use primary_key (or previous_key during rotation overlap).
|
||||
// If no scoped key exists yet, fall back to node_master_key for initial setup.
|
||||
let search_key = if let Some(ref redis) = state.redis_store {
|
||||
if let Ok(Some(sk)) = redis.get_search_ui_scoped_key(&index) {
|
||||
// Refresh scoped-key beacon so the rotation leader knows this pod is serving
|
||||
// requests for this index at the current generation (plan §13.21).
|
||||
let _ = redis.observe_search_ui_scoped_key(
|
||||
&state.pod_id,
|
||||
&index,
|
||||
sk.generation,
|
||||
);
|
||||
|
||||
// Use primary_key; previous_key is the overlap fallback (both are valid in Meilisearch)
|
||||
sk.primary_key
|
||||
} else {
|
||||
// No scoped key yet — fall back to node_master_key for initial setup
|
||||
state.config.node_master_key.clone()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// No Redis store — fall back to node_master_key (single-pod dev mode)
|
||||
state.config.node_master_key.clone()
|
||||
};
|
||||
|
||||
// Use live topology from shared state (updated by health checker)
|
||||
let topo = state.topology.read().await;
|
||||
|
|
@ -140,9 +166,9 @@ async fn search_handler(
|
|||
global_idf: None,
|
||||
};
|
||||
|
||||
// Create node client
|
||||
// Create node client with the scoped key (or node_master_key as fallback)
|
||||
let http_client = Arc::new(crate::client::HttpClient::new(
|
||||
state.config.node_master_key.clone(),
|
||||
search_key,
|
||||
state.config.scatter.node_timeout_ms,
|
||||
));
|
||||
let client = ProxyNodeClient::new(http_client);
|
||||
|
|
|
|||
|
|
@ -36,11 +36,19 @@ fn session_prefix(session_id: &str) -> &str {
|
|||
}
|
||||
|
||||
/// Admin login request body.
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[derive(Deserialize)]
|
||||
pub struct AdminLoginRequest {
|
||||
pub admin_key: String,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for AdminLoginRequest {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("AdminLoginRequest")
|
||||
.field("admin_key", &"[redacted]")
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// Admin login response with CSRF token.
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct AdminLoginResponse {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue