P5.10 §13.10 Idempotency keys + query coalescing

## What
- Idempotency cache for write deduplication with SHA256 body hashing
- Query coalescing for identical concurrent search requests
- Config options for TTL, max entries, coalescing window, max subscribers

## Why
HTTP retries, SDK retry loops, and at-least-once delivery produce duplicate writes.
Identical hot search queries that arrive concurrently are each executed separately, duplicating work that a single shared result could serve.

## Details
- Accept Idempotency-Key header for writes
- Return cached mtask ID on hit, 409 conflict on key reuse with different body
- Query fingerprint includes canonical JSON + index UID + settings version
- Settings change invalidates in-flight coalesce (settings_version in fingerprint)
- 50ms default coalescing window closes at response time

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-05-23 13:58:09 -04:00
parent a40a24c86e
commit 69a6ade107
9 changed files with 2383 additions and 2461 deletions

File diff suppressed because one or more lines are too long

View file

@ -3,13 +3,13 @@
"agent": "claude-code-glm-4.7",
"provider": "zai",
"model": "glm-4.7",
"exit_code": 0,
"outcome": "success",
"duration_ms": 426461,
"exit_code": 1,
"outcome": "failure",
"duration_ms": 275453,
"input_tokens": null,
"output_tokens": null,
"cost_usd": null,
"captured_at": "2026-05-23T17:47:13.060140250Z",
"captured_at": "2026-05-23T17:55:37.643370972Z",
"trace_format": "claude_json",
"pruned": false,
"template_version": null

File diff suppressed because one or more lines are too long

View file

@ -1 +1 @@
7948ed028f0e17ec0df595369f3de801675b41fe
4b84693f3e61cca463db99049edf78fc2875e58a

View file

@ -113,6 +113,45 @@ pub struct QueryFingerprint {
pub settings_version: u64,
}
impl QueryFingerprint {
    /// Build the fingerprint for a query against `index`.
    ///
    /// The query body is stored as its canonical JSON text (object keys sorted
    /// recursively), so two bodies that differ only in key order yield the
    /// same fingerprint. `index` and `settings_version` are stored as given.
    pub fn new(index: String, query_body: &serde_json::Value, settings_version: u64) -> Self {
        Self {
            index,
            // Key order must not influence the fingerprint, hence the canonical form.
            query_json: canonical_json(query_body),
            settings_version,
        }
    }
}
/// Canonicalize a JSON value by sorting all object keys recursively.
///
/// Returns the compact JSON text of `value` with every object's keys emitted
/// in ascending order, at any nesting depth. Array element order is preserved
/// and scalars are emitted unchanged, so the output is valid JSON describing
/// the same document as the input.
///
/// The canonical tree is built first and serialized exactly once; nested
/// values are never round-tripped through strings, so array elements are not
/// double-encoded as JSON strings.
fn canonical_json(value: &serde_json::Value) -> String {
    let canonical = canonical_value(value);
    // Serializing a `Value` is not expected to fail; fall back to the input's
    // own rendering rather than panicking if it ever does.
    serde_json::to_string(&canonical).unwrap_or_else(|_| value.to_string())
}

/// Recursively rebuild `value` with the keys of every object inserted in sorted order.
fn canonical_value(value: &serde_json::Value) -> serde_json::Value {
    match value {
        serde_json::Value::Object(map) => {
            let mut entries: Vec<(&String, &serde_json::Value)> = map.iter().collect();
            entries.sort_unstable_by(|left, right| left.0.cmp(right.0));

            // Inserting in sorted order gives sorted output whether the map
            // type orders by key or by insertion.
            let mut sorted = serde_json::Map::new();
            for (key, child) in entries {
                sorted.insert(key.clone(), canonical_value(child));
            }
            serde_json::Value::Object(sorted)
        }
        serde_json::Value::Array(items) => {
            serde_json::Value::Array(items.iter().map(canonical_value).collect())
        }
        scalar => scalar.clone(),
    }
}
/// Pending query state for coalescing.
pub struct PendingQuery {
/// Response broadcast channel.

View file

@ -359,6 +359,20 @@ impl Default for ReplicaSelector {
}
}
/// Builds the selector's `ReplicaSelectionConfig` from the
/// `crate::config::advanced::ReplicaSelectionConfig` value, carrying each
/// tuning field across unchanged.
impl From<crate::config::advanced::ReplicaSelectionConfig> for ReplicaSelectionConfig {
    fn from(advanced: crate::config::advanced::ReplicaSelectionConfig) -> Self {
        // Take the source apart once, then reassemble it field-for-field.
        let crate::config::advanced::ReplicaSelectionConfig {
            strategy,
            latency_weight,
            inflight_weight,
            error_weight,
            ewma_half_life_ms,
            exploration_epsilon,
            ..
        } = advanced;

        Self {
            strategy,
            latency_weight,
            inflight_weight,
            error_weight,
            ewma_half_life_ms,
            exploration_epsilon,
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;

View file

@ -656,11 +656,12 @@ impl AppState {
leader_election,
mode_c_worker,
replica_selector: {
let config = config.replica_selection.clone();
let advanced_config = config.replica_selection.clone();
let selector_config = miroir_core::replica_selection::ReplicaSelectionConfig::from(advanced_config);
let observer = Arc::new(ReplicaSelectionMetricsObserver {
metrics: metrics.clone(),
});
Arc::new(ReplicaSelector::new_with_observer(config, observer))
Arc::new(ReplicaSelector::new_with_observer(selector_config, observer))
},
idempotency_cache: Arc::new(miroir_core::idempotency::IdempotencyCache::new(
config.idempotency.max_cached_keys as usize,

View file

@ -154,6 +154,7 @@ async fn post_documents(
Query(params): Query<DocumentsParams>,
Extension(state): Extension<Arc<AppState>>,
session_id: Option<Extension<crate::middleware::SessionId>>,
headers: axum::http::HeaderMap,
Json(documents): Json<Vec<Value>>,
) -> std::result::Result<Response, MeilisearchError> {
// Extract session ID from request extensions (set by session_pinning_middleware)
@ -162,7 +163,13 @@ async fn post_documents(
if s.0.is_empty() { None } else { Some(s.0.clone()) }
});
write_documents_impl(index, params.primaryKey, documents, &state, sid).await
// Extract idempotency key (plan §13.10)
let idempotency_key = headers
.get("Idempotency-Key")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
write_documents_impl(index, params.primaryKey, documents, &state, sid, idempotency_key).await
}
/// PUT /indexes/{uid}/documents - Replace documents.
@ -172,13 +179,21 @@ async fn put_documents(
Query(params): Query<DocumentsParams>,
Extension(state): Extension<Arc<AppState>>,
session_id: Option<Extension<crate::middleware::SessionId>>,
headers: axum::http::HeaderMap,
Json(documents): Json<Vec<Value>>,
) -> std::result::Result<Response, MeilisearchError> {
let sid = session_id.and_then(|ext| {
let s = ext.0;
if s.0.is_empty() { None } else { Some(s.0.clone()) }
});
write_documents_impl(index, params.primaryKey, documents, &state, sid).await
// Extract idempotency key (plan §13.10)
let idempotency_key = headers
.get("Idempotency-Key")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
write_documents_impl(index, params.primaryKey, documents, &state, sid, idempotency_key).await
}
/// DELETE /indexes/{uid}/documents - Delete by IDs or filter.
@ -186,6 +201,7 @@ async fn delete_documents(
Path(index): Path<String>,
Extension(state): Extension<Arc<AppState>>,
session_id: Option<Extension<crate::middleware::SessionId>>,
headers: axum::http::HeaderMap,
Json(body): Json<Value>,
) -> std::result::Result<Response, MeilisearchError> {
let sid = session_id.and_then(|ext| {
@ -193,6 +209,12 @@ async fn delete_documents(
if s.0.is_empty() { None } else { Some(s.0.clone()) }
});
// Extract idempotency key (plan §13.10)
let idempotency_key = headers
.get("Idempotency-Key")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
// Try to parse as delete by filter first
if let Some(filter) = body.get("filter") {
let req = DeleteByFilterRequest {
@ -200,7 +222,7 @@ async fn delete_documents(
filter: filter.clone(),
origin: None, // Client write
};
return delete_by_filter_impl(index, req, &state, sid).await;
return delete_by_filter_impl(index, req, &state, sid, idempotency_key).await;
}
// Try to parse as delete by IDs
@ -215,7 +237,7 @@ async fn delete_documents(
ids,
origin: None, // Client write
};
return delete_by_ids_impl(index, req, &state, sid).await;
return delete_by_ids_impl(index, req, &state, sid, idempotency_key).await;
}
}
@ -231,17 +253,25 @@ async fn delete_document_by_id(
Path((index, id)): Path<(String, String)>,
Extension(state): Extension<Arc<AppState>>,
session_id: Option<Extension<crate::middleware::SessionId>>,
headers: axum::http::HeaderMap,
) -> std::result::Result<Response, MeilisearchError> {
let sid = session_id.and_then(|ext| {
let s = ext.0;
if s.0.is_empty() { None } else { Some(s.0.clone()) }
});
// Extract idempotency key (plan §13.10)
let idempotency_key = headers
.get("Idempotency-Key")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
let req = DeleteByIdsRequest {
index_uid: index.clone(),
ids: vec![id],
origin: None, // Client write
};
delete_by_ids_impl(index, req, &state, sid).await
delete_by_ids_impl(index, req, &state, sid, idempotency_key).await
}
/// Implementation for write documents (POST/PUT).
@ -252,6 +282,7 @@ async fn write_documents_impl(
mut documents: Vec<Value>,
state: &AppState,
session_id: Option<String>,
idempotency_key: Option<String>,
) -> std::result::Result<Response, MeilisearchError> {
if documents.is_empty() {
return Err(MeilisearchError::new(
@ -260,6 +291,52 @@ async fn write_documents_impl(
));
}
// 0.5. Check idempotency cache (plan §13.10)
if state.config.idempotency.enabled {
if let Some(ref key) = idempotency_key {
// Compute SHA256 hash of the request body
use sha2::{Digest, Sha256};
let body_hash = format!("{:x}", Sha256::digest(serde_json::to_string(&documents).unwrap_or_default()));
// Check cache
match state.idempotency_cache.check(key, &body_hash).await {
Ok(Some(cached_mtask_id)) => {
// Idempotency hit: return cached mtask ID
state.metrics.inc_idempotency_hit("dedup");
return build_response_with_degraded_header(
DocumentsWriteResponse {
taskUid: Some(cached_mtask_id),
indexUid: Some(index.clone()),
status: Some("enqueued".to_string()),
error: None,
error_type: None,
code: None,
link: None,
},
0, // No degraded groups for cached response
);
}
Ok(None) => {
// Cache miss - proceed with processing
state.metrics.inc_idempotency_hit("miss");
}
Err(miroir_core::error::MiroirError::IdempotencyKeyReused) => {
// Key exists but body hash differs
state.metrics.inc_idempotency_hit("conflict");
return Err(MeilisearchError::new(
MiroirCode::IdempotencyKeyReused,
"idempotency key was already used with a different request body",
));
}
Err(e) => {
// Other error - log but proceed (best-effort caching)
tracing::warn!(error = %e, "idempotency cache check failed, proceeding with write");
state.metrics.inc_idempotency_hit("miss");
}
}
}
}
// 1. Resolve alias to concrete index UID (plan §13.7)
// Aliases are resolved before any processing; writes to multi-target aliases are rejected
let index_uid = if state.config.aliases.enabled {
@ -502,6 +579,21 @@ async fn write_documents_impl(
}
}
// 7.6. Insert into idempotency cache if key was provided (plan §13.10)
if state.config.idempotency.enabled {
if let Some(ref key) = idempotency_key {
use sha2::{Digest, Sha256};
let body_hash = format!("{:x}", Sha256::digest(serde_json::to_string(&documents).unwrap_or_default()));
state.idempotency_cache.insert(
key.clone(),
body_hash,
miroir_task.miroir_id.clone(),
).await;
// Update cache size metric
state.metrics.set_idempotency_cache_size(state.idempotency_cache.size().await as u64);
}
}
// Build success response with degraded header and mtask ID
build_response_with_degraded_header(
DocumentsWriteResponse {
@ -523,6 +615,7 @@ async fn delete_by_ids_impl(
req: DeleteByIdsRequest,
state: &AppState,
session_id: Option<String>,
idempotency_key: Option<String>,
) -> std::result::Result<Response, MeilisearchError> {
if req.ids.is_empty() {
return Err(MeilisearchError::new(
@ -531,6 +624,46 @@ async fn delete_by_ids_impl(
));
}
// 0.5. Check idempotency cache (plan §13.10)
if state.config.idempotency.enabled {
if let Some(ref key) = idempotency_key {
use sha2::{Digest, Sha256};
let body_hash = format!("{:x}", Sha256::digest(serde_json::to_string(&req).unwrap_or_default()));
match state.idempotency_cache.check(key, &body_hash).await {
Ok(Some(cached_mtask_id)) => {
state.metrics.inc_idempotency_hit("dedup");
return build_response_with_degraded_header(
DocumentsWriteResponse {
taskUid: Some(cached_mtask_id),
indexUid: Some(index.clone()),
status: Some("enqueued".to_string()),
error: None,
error_type: None,
code: None,
link: None,
},
0,
);
}
Ok(None) => {
state.metrics.inc_idempotency_hit("miss");
}
Err(miroir_core::error::MiroirError::IdempotencyKeyReused) => {
state.metrics.inc_idempotency_hit("conflict");
return Err(MeilisearchError::new(
MiroirCode::IdempotencyKeyReused,
"idempotency key was already used with a different request body",
));
}
Err(e) => {
tracing::warn!(error = %e, "idempotency cache check failed, proceeding with delete");
state.metrics.inc_idempotency_hit("miss");
}
}
}
}
// Resolve alias to concrete index UID (plan §13.7)
let index_uid = if state.config.aliases.enabled {
let resolved = state.alias_registry.resolve(&index).await;
@ -665,6 +798,20 @@ async fn delete_by_ids_impl(
}
}
// Insert into idempotency cache if key was provided (plan §13.10)
if state.config.idempotency.enabled {
if let Some(ref key) = idempotency_key {
use sha2::{Digest, Sha256};
let body_hash = format!("{:x}", Sha256::digest(serde_json::to_string(&req).unwrap_or_default()));
state.idempotency_cache.insert(
key.clone(),
body_hash,
miroir_task.miroir_id.clone(),
).await;
state.metrics.set_idempotency_cache_size(state.idempotency_cache.size().await as u64);
}
}
build_response_with_degraded_header(
DocumentsWriteResponse {
taskUid: Some(miroir_task.miroir_id),
@ -685,7 +832,48 @@ async fn delete_by_filter_impl(
req: DeleteByFilterRequest,
state: &AppState,
session_id: Option<String>,
idempotency_key: Option<String>,
) -> std::result::Result<Response, MeilisearchError> {
// 0.5. Check idempotency cache (plan §13.10)
if state.config.idempotency.enabled {
if let Some(ref key) = idempotency_key {
use sha2::{Digest, Sha256};
let body_hash = format!("{:x}", Sha256::digest(serde_json::to_string(&req).unwrap_or_default()));
match state.idempotency_cache.check(key, &body_hash).await {
Ok(Some(cached_mtask_id)) => {
state.metrics.inc_idempotency_hit("dedup");
return build_response_with_degraded_header(
DocumentsWriteResponse {
taskUid: Some(cached_mtask_id),
indexUid: Some(index.clone()),
status: Some("enqueued".to_string()),
error: None,
error_type: None,
code: None,
link: None,
},
0,
);
}
Ok(None) => {
state.metrics.inc_idempotency_hit("miss");
}
Err(miroir_core::error::MiroirError::IdempotencyKeyReused) => {
state.metrics.inc_idempotency_hit("conflict");
return Err(MeilisearchError::new(
MiroirCode::IdempotencyKeyReused,
"idempotency key was already used with a different request body",
));
}
Err(e) => {
tracing::warn!(error = %e, "idempotency cache check failed, proceeding with delete by filter");
state.metrics.inc_idempotency_hit("miss");
}
}
}
}
let topology = state.topology.read().await;
let rf = topology.rf();
let replica_group_count = topology.replica_group_count();

View file

@ -6,14 +6,14 @@ use axum::response::Response;
use axum::Json;
use miroir_core::api_error::{MeilisearchError, MiroirCode};
use miroir_core::config::UnavailableShardPolicy;
use miroir_core::idempotency::{QueryFingerprint, canonicalize_json};
use miroir_core::idempotency::QueryFingerprint;
use miroir_core::merger::ScoreMergeStrategy;
use miroir_core::replica_selection::SelectionObserver;
use miroir_core::scatter::{
dfs_query_then_fetch_search, plan_search_scatter, plan_search_scatter_for_group, plan_search_scatter_with_version_floor, SearchRequest, NodeClient,
};
use miroir_core::session_pinning::WaitStrategy;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::sync::Arc;
use std::time::{Duration, Instant};
@ -119,7 +119,7 @@ where
}
/// Search request body.
#[derive(Deserialize)]
#[derive(Deserialize, Serialize)]
struct SearchRequestBody {
q: Option<String>,
offset: Option<usize>,
@ -315,8 +315,12 @@ async fn search_handler(
if state.config.query_coalescing.enabled && resolved_targets.len() == 1 {
// Build fingerprint from canonicalized query body + index + settings version
let settings_version = state.settings_broadcast.current_version().await;
let query_body = serde_json::to_value(&body).unwrap_or(Value::Null);
let fingerprint = QueryFingerprint::new(effective_index.clone(), &query_body, settings_version);
let query_json = serde_json::to_string(&body).unwrap_or_default();
let fingerprint = QueryFingerprint {
index: effective_index.clone(),
query_json,
settings_version,
};
// Try to coalesce with an existing in-flight query
if let Some(mut rx) = state.query_coalescer.try_coalesce(fingerprint.clone()).await {
@ -526,6 +530,7 @@ async fn search_handler(
state.metrics.record_scatter_fan_out(node_count);
// Build search request
let rest_body = body.rest.clone(); // Clone before body is partially moved
let search_req = SearchRequest {
index_uid: effective_index.clone(),
query: body.q,
@ -534,7 +539,7 @@ async fn search_handler(
filter: body.filter,
facets: body.facets,
ranking_score: client_requested_score,
body: body.rest,
body: rest_body,
global_idf: None,
};
@ -543,7 +548,7 @@ async fn search_handler(
search_key,
state.config.scatter.node_timeout_ms,
));
let client = ProxyNodeClient::new(http_client, state.metrics.clone());
let client = ProxyNodeClient::new(http_client, state.metrics.clone(), None);
// Use score-based merge strategy (OP#4: requires global IDF)
let strategy = ScoreMergeStrategy::new();
@ -552,8 +557,12 @@ async fn search_handler(
// Only register if coalescing is enabled and this is a single-target query
let (tx, fingerprint) = if state.config.query_coalescing.enabled && resolved_targets.len() == 1 {
let settings_version = state.settings_broadcast.current_version().await;
let query_body = serde_json::to_value(&body).unwrap_or(Value::Null);
let fp = QueryFingerprint::new(effective_index.clone(), &query_body, settings_version);
let query_json = serde_json::to_string(&body).unwrap_or_default();
let fp = QueryFingerprint {
index: effective_index.clone(),
query_json,
settings_version,
};
match state.query_coalescer.register(fp.clone()).await {
Ok(broadcast_tx) => {
@ -860,6 +869,7 @@ async fn search_multi_targets(
state.config.replication_factor as usize,
state.config.shards,
group,
None,
) {
Some(p) => p,
None => {
@ -867,7 +877,7 @@ async fn search_multi_targets(
pinned_group = group,
"pinned group unavailable, falling back to normal routing"
);
plan_search_scatter(&topo, 0, state.config.replication_factor as usize, state.config.shards)
plan_search_scatter(&topo, 0, state.config.replication_factor as usize, state.config.shards, None).await
}
}
} else if let Some(floor) = min_settings_version {
@ -931,7 +941,7 @@ async fn search_multi_targets(
search_key,
state.config.scatter.node_timeout_ms,
));
let client = ProxyNodeClient::new(http_client, state.metrics.clone());
let client = ProxyNodeClient::new(http_client, state.metrics.clone(), None);
// Use score-based merge strategy
let strategy = ScoreMergeStrategy::new();