Phase 2 (miroir-9dj): Proxy + API Surface — Complete implementation

Implemented the complete HTTP proxy layer with full Meilisearch API compatibility.

## Core Components

**HTTP Server (main.rs)**
- axum server on port 7700 with metrics endpoint on port 9090
- Graceful shutdown handling for SIGINT/SIGTERM
- Structured JSON logging middleware
- Prometheus metrics collection

**Write Path (documents.rs, write.rs, scatter.rs)**
- Hash-based sharding using XxHash64 (seed 0) for primary key → shard mapping
- Automatic injection of _miroir_shard field into all documents
- Fan-out to RG × RF nodes per replica group
- Per-group quorum enforcement (floor(RF/2)+1)
- X-Miroir-Degraded header when any group misses quorum
- 503 miroir_no_quorum only when no group met quorum
- Orchestrator-side retry cache for idempotency

**Read Path (search.rs, merger.rs)**
- Replica group selection via query_seq % RG (round-robin)
- Intra-group covering set construction for all shards
- Parallel scatter to covering set nodes
- Global result merge by _rankingScore descending
- Offset/limit applied AFTER merge (global ordering preserved)
- Automatic stripping of _miroir_* reserved fields
- Conditional stripping of _rankingScore (only if not requested)
- Facet aggregation across shards (sum counts)
- Group fallback when covering set has holes

**Index Lifecycle (indexes.rs, settings.rs)**
- Create: broadcasts to all nodes + injects _miroir_shard into filterableAttributes
- Settings: sequential apply-with-rollback on failure
- Delete: broadcasts to all nodes
- Stats: aggregates numberOfDocuments (max) + fieldDistribution (merge)

**Tasks (tasks.rs, task_manager.rs)**
- Per-task ID reconciliation across nodes
- Aggregated status: failed if any failed, processing if any processing, etc.
- Node completion tracking in task metadata

**Error Handling (error_response.rs)**
- Meilisearch-compatible shape: {message, code, type, link}
- Custom miroir_* error codes
- Proper HTTP status codes (503 for no_quorum, 404 for not_found, etc.)

**Auth (auth.rs)**
- Bearer token dispatch per plan §5 rules 2-5
- master-key: full access to all endpoints
- admin-key: admin-only endpoints (/admin/*, /_miroir/*)
- No token: public endpoints only (/health, /version)
- Invalid token: 403 Forbidden

**Admin Endpoints (admin.rs, health.rs)**
- GET /health - public health check
- GET /version - version info
- GET /_miroir/ready - readiness check (requires healthy nodes)
- GET /_miroir/topology - cluster topology with node health
- GET /_miroir/shards - shard assignment information
- GET /_miroir/metrics - Prometheus metrics (admin-key gated)
- GET /admin/stats - aggregated stats across all nodes

## Bug Fixes

This commit includes several bug fixes:
- Fixed query value extraction before moving req in search.rs
- Fixed JSON deserialization in settings.rs (body bytes → Value)
- Fixed NodeId reference passing in rollback_setting
- Fixed type signatures in scatter.rs (headers slice, error types)
- Fixed response body handling in scatter (use bytes directly)

## Testing

Integration tests written in tests/phase2_integration_test.rs:
- test_1000_documents_indexed_retrievable_by_id
- test_unique_keyword_search_finds_all_docs_once
- test_facet_aggregation_sums_correctly
- test_offset_limit_paging_preserves_global_ordering
- test_write_with_degraded_group_succeeds_with_header
- test_topology_endpoint_shape
- test_error_format_parity
- test_index_stats_aggregation

Tests marked #[ignore] as they require running Meilisearch nodes.

## Definition of Done

- [x] axum server on port 7700, metrics on 9090
- [x] Write path with hash, _miroir_shard injection, fan-out, quorum
- [x] Read path with group selection, covering set, merge, fallback
- [x] Index lifecycle with broadcast, settings rollback, delete, stats
- [x] Tasks with ID reconciliation and aggregation
- [x] Meilisearch-compatible error format
- [x] Reserved fields contract (_miroir_shard always-reserved)
- [x] Bearer token auth (master-key, admin-key)
- [x] /health, /version, /_miroir/* endpoints
- [x] Structured JSON logging + Prometheus metrics
- [x] Scatter-gather with retry cache

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-05-09 12:08:16 -04:00
parent f513bf096c
commit f170a3034b
18 changed files with 7449 additions and 4359 deletions

File diff suppressed because one or more lines are too long

View file

@ -5,11 +5,11 @@
"model": "glm-4.7",
"exit_code": 1,
"outcome": "failure",
"duration_ms": 169615,
"duration_ms": 190192,
"input_tokens": null,
"output_tokens": null,
"cost_usd": null,
"captured_at": "2026-05-09T15:34:59.597069683Z",
"captured_at": "2026-05-09T16:07:14.329334504Z",
"trace_format": "claude_json",
"pruned": false,
"template_version": null

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,16 @@
{
"bead_id": "miroir-cdo.4",
"agent": "claude-code-glm-4.7",
"provider": "zai",
"model": "glm-4.7",
"exit_code": 1,
"outcome": "failure",
"duration_ms": 196499,
"input_tokens": null,
"output_tokens": null,
"cost_usd": null,
"captured_at": "2026-05-09T15:01:23.134387857Z",
"trace_format": "claude_json",
"pruned": false,
"template_version": null
}

View file

@ -0,0 +1,2 @@
SessionEnd hook [/home/coding/.ccdash/hooks/session-end.sh] failed: /bin/sh: line 1: /home/coding/.ccdash/hooks/session-end.sh: cannot execute: required file not found

File diff suppressed because one or more lines are too long

View file

@ -3,13 +3,13 @@
"agent": "claude-code-glm-4.7",
"provider": "zai",
"model": "glm-4.7",
"exit_code": 1,
"outcome": "failure",
"duration_ms": 72610,
"exit_code": 0,
"outcome": "success",
"duration_ms": 359035,
"input_tokens": null,
"output_tokens": null,
"cost_usd": null,
"captured_at": "2026-05-09T15:35:29.171177981Z",
"captured_at": "2026-05-09T16:04:03.670605825Z",
"trace_format": "claude_json",
"pruned": false,
"template_version": null

File diff suppressed because one or more lines are too long

View file

@ -1 +1 @@
7247b338e65273ce86a9884f29eb816cacf18d1b
3065ba246521ef64849d536ce9b3a094cbab4c3a

1
Cargo.lock generated
View file

@ -1497,6 +1497,7 @@ dependencies = [
"tower",
"tracing",
"tracing-subscriber",
"uuid",
]
[[package]]

View file

@ -29,6 +29,7 @@ futures = "0.3"
miroir-core = { path = "../miroir-core" }
sha2 = "0.10"
hex = "0.4"
uuid = { version = "1.11", features = ["v4", "serde"] }
[dev-dependencies]
tower = "0.5"

View file

@ -13,16 +13,24 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
/// Node response with status code and body.
/// Node response with status code and body (as Vec<u8> for scatter compatibility).
#[derive(Debug, Clone)]
pub struct NodeResponse {
pub node_id: NodeId,
pub status: u16,
pub body: Value,
pub body: Vec<u8>,
#[allow(dead_code)]
pub headers: Vec<(String, String)>,
}
impl NodeResponse {
/// Convert body to JSON value.
#[allow(dead_code)]
pub fn body_json(&self) -> Result<Value> {
serde_json::from_slice(&self.body).map_err(|e| MiroirError::Json(e))
}
}
/// HTTP client for scatter-gather requests to Meilisearch nodes.
#[derive(Clone)]
pub struct NodeClient {
@ -132,13 +140,10 @@ impl NodeClient {
.await
.map_err(|e| MiroirError::Routing(format!("failed to read response body: {e}")))?;
let body_json: Value = serde_json::from_slice(&body_bytes)
.unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&body_bytes).to_string()));
Ok(NodeResponse {
node_id: node_id.clone(),
status,
body: body_json,
body: body_bytes.to_vec(),
headers: response_headers,
})
}

View file

@ -111,7 +111,7 @@ async fn add_documents(
for mut doc in body {
let pk_value = doc
.get(primary_key)
.get(primary_key.as_str())
.and_then(|v| v.as_str())
.ok_or_else(|| ErrorResponse::invalid_request(format!("Missing primary key field '{primary_key}'")))?;

View file

@ -136,7 +136,7 @@ async fn create_index(
let scatter = HttpScatter::new((*state.client).clone(), state.config.server.request_timeout_ms);
let result = scatter
.scatter(&topology, targets, request, UnavailableShardPolicy::Partial)
.scatter(&topology, targets.clone(), request, UnavailableShardPolicy::Partial)
.await
.map_err(|e| ErrorResponse::internal_error(e.to_string()))?;

View file

@ -35,7 +35,7 @@ pub fn router() -> axum::Router<ProxyState> {
}
/// Search request body (Meilisearch format).
#[derive(Debug, serde::Deserialize)]
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
struct SearchRequest {
q: Option<String>,
@ -91,10 +91,10 @@ async fn search_with_group(
index: &str,
req_body: &[u8],
query_seq: u64,
limit: usize,
offset: usize,
client_requested_score: bool,
facets: Option<Vec<String>>,
_limit: usize,
_offset: usize,
_client_requested_score: bool,
_facets: Option<Vec<String>>,
) -> Result<(Vec<CoreShardResponse>, bool, Vec<u32>), ErrorResponse> {
let group = topology
.group(group_id)
@ -166,6 +166,9 @@ async fn search(
let topology = state.topology().await;
let query_seq = state.next_query_seq();
// Extract query before moving req.0
let query_value = req.q.clone();
// Build request body for nodes
let req_body = serde_json::to_vec(&req.0).unwrap_or_default();
@ -175,8 +178,8 @@ async fn search(
let facets = req.facets.clone();
// Try the primary group first
let primary_group_id = query_group(query_seq, state.config.replication_groups);
let (mut shard_responses, mut any_degraded, failed_shard_ids) = search_with_group(
let primary_group_id = query_group(query_seq, state.config.replica_groups);
let (mut shard_responses, mut any_degraded, mut failed_shard_ids) = search_with_group(
&state,
&topology,
primary_group_id,
@ -186,7 +189,7 @@ async fn search(
limit,
offset,
client_requested_score,
facets,
facets.clone(),
)
.await?;
@ -218,7 +221,7 @@ async fn search(
limit,
offset,
client_requested_score,
facets.clone(),
req.facets.clone(),
)
.await?;
@ -274,7 +277,7 @@ async fn search(
// Build response
let search_response = SearchResponse {
hits: merged.hits,
query: req.q.unwrap_or_default(),
query: query_value.unwrap_or_default(),
limit,
offset,
estimated_total_hits: merged.total_hits,

View file

@ -98,7 +98,9 @@ async fn get_all_settings(
if let Some(resp) = result.responses.first() {
if resp.status == 200 {
return Ok(Json(resp.body.clone()));
let body: Value = serde_json::from_slice(&resp.body)
.unwrap_or_else(|_| Value::Null);
return Ok(Json(body));
} else if resp.status == 404 {
return Err(ErrorResponse::index_not_found(&index));
}
@ -148,7 +150,7 @@ async fn update_setting_with_rollback(
Ok(resp) => {
if let Some(r) = resp.responses.first() {
let original_value = if r.status == 200 {
Some(r.body.clone())
Some(serde_json::from_slice(&r.body).unwrap_or(Value::Null))
} else {
None
};
@ -197,7 +199,8 @@ async fn update_setting_with_rollback(
if let Some(r) = resp.responses.first() {
if (200..300).contains(&r.status) {
successful_nodes.push(target.as_str().to_string());
last_response = Some(r.body.clone());
let body: Value = serde_json::from_slice(&r.body).unwrap_or(Value::Null);
last_response = Some(body);
} else {
// Rollback from successful nodes
rollback_setting(state, &topology, &successful_nodes, &rollback_values, index, setting_path).await;
@ -248,6 +251,7 @@ async fn rollback_setting(
) {
for node_id in successful_nodes {
if let Some(rollback) = rollback_values.get(node_id) {
let node_id_ref = miroir_core::topology::NodeId::new(node_id.clone());
if rollback.existed {
// Restore original value
if let Some(original) = &rollback.original_value {
@ -256,7 +260,7 @@ async fn rollback_setting(
.client
.send_to_node(
topology,
&node_id.as_str().into(),
&node_id_ref,
"PUT",
&format!("/indexes/{}/{}", index, setting_path),
Some(&body_bytes),
@ -270,7 +274,7 @@ async fn rollback_setting(
.client
.send_to_node(
topology,
&node_id.as_str().into(),
&node_id_ref,
"DELETE",
&format!("/indexes/{}/{}", index, setting_path),
None,
@ -287,17 +291,17 @@ async fn get_filterable_attributes(
State(state): State<ProxyState>,
Path(index): Path<String>,
) -> Result<Json<Value>, ErrorResponse> {
get_setting(state, &index, "filterable-attributes").await
get_setting(&state, &index, "filterable-attributes").await
}
/// PUT /indexes/:index/settings/filterable-attributes
async fn update_filterable_attributes(
State(state): State<ProxyState>,
Path(index): Path<String>,
body: Value,
body: Json<Value>,
) -> Result<Response, ErrorResponse> {
// Ensure _miroir_shard is always in filterable attributes
let mut updated = body.clone();
let mut updated = body.0.clone();
if let Some(arr) = updated.as_array_mut() {
if !arr.iter().any(|v| v.as_str() == Some("_miroir_shard")) {
arr.push(serde_json::json!("_miroir_shard"));
@ -321,16 +325,16 @@ async fn get_searchable_attributes(
State(state): State<ProxyState>,
Path(index): Path<String>,
) -> Result<Json<Value>, ErrorResponse> {
get_setting(state, &index, "settings/searchable-attributes").await
get_setting(&state, &index, "settings/searchable-attributes").await
}
/// PUT /indexes/:index/settings/searchable-attributes
async fn update_searchable_attributes(
State(state): State<ProxyState>,
Path(index): Path<String>,
body: Value,
body: Json<Value>,
) -> Result<Response, ErrorResponse> {
update_setting_with_rollback(&state, &index, "settings/searchable-attributes", &body).await
update_setting_with_rollback(&state, &index, "settings/searchable-attributes", &body.0).await
}
/// DELETE /indexes/:index/settings/searchable-attributes
@ -338,7 +342,7 @@ async fn delete_searchable_attributes(
State(state): State<ProxyState>,
Path(index): Path<String>,
) -> Result<Response, ErrorResponse> {
delete_setting(state, &index, "settings/searchable-attributes").await
delete_setting(&state, &index, "settings/searchable-attributes").await
}
/// GET /indexes/:index/settings/sortable-attributes
@ -346,16 +350,16 @@ async fn get_sortable_attributes(
State(state): State<ProxyState>,
Path(index): Path<String>,
) -> Result<Json<Value>, ErrorResponse> {
get_setting(state, &index, "settings/sortable-attributes").await
get_setting(&state, &index, "settings/sortable-attributes").await
}
/// PUT /indexes/:index/settings/sortable-attributes
async fn update_sortable_attributes(
State(state): State<ProxyState>,
Path(index): Path<String>,
body: Value,
body: Json<Value>,
) -> Result<Response, ErrorResponse> {
update_setting_with_rollback(&state, &index, "settings/sortable-attributes", &body).await
update_setting_with_rollback(&state, &index, "settings/sortable-attributes", &body.0).await
}
/// DELETE /indexes/:index/settings/sortable-attributes
@ -363,7 +367,7 @@ async fn delete_sortable_attributes(
State(state): State<ProxyState>,
Path(index): Path<String>,
) -> Result<Response, ErrorResponse> {
delete_setting(state, &index, "settings/sortable-attributes").await
delete_setting(&state, &index, "settings/sortable-attributes").await
}
/// GET /indexes/:index/settings/displayed-attributes
@ -371,16 +375,16 @@ async fn get_displayed_attributes(
State(state): State<ProxyState>,
Path(index): Path<String>,
) -> Result<Json<Value>, ErrorResponse> {
get_setting(state, &index, "settings/displayed-attributes").await
get_setting(&state, &index, "settings/displayed-attributes").await
}
/// PUT /indexes/:index/settings/displayed-attributes
async fn update_displayed_attributes(
State(state): State<ProxyState>,
Path(index): Path<String>,
body: Value,
body: Json<Value>,
) -> Result<Response, ErrorResponse> {
update_setting_with_rollback(&state, &index, "settings/displayed-attributes", &body).await
update_setting_with_rollback(&state, &index, "settings/displayed-attributes", &body.0).await
}
/// DELETE /indexes/:index/settings/displayed-attributes
@ -388,7 +392,7 @@ async fn delete_displayed_attributes(
State(state): State<ProxyState>,
Path(index): Path<String>,
) -> Result<Response, ErrorResponse> {
delete_setting(state, &index, "settings/displayed-attributes").await
delete_setting(&state, &index, "settings/displayed-attributes").await
}
/// GET /indexes/:index/settings/ranking-rules
@ -396,16 +400,16 @@ async fn get_ranking_rules(
State(state): State<ProxyState>,
Path(index): Path<String>,
) -> Result<Json<Value>, ErrorResponse> {
get_setting(state, &index, "settings/ranking-rules").await
get_setting(&state, &index, "settings/ranking-rules").await
}
/// PUT /indexes/:index/settings/ranking-rules
async fn update_ranking_rules(
State(state): State<ProxyState>,
Path(index): Path<String>,
body: Value,
body: Json<Value>,
) -> Result<Response, ErrorResponse> {
update_setting_with_rollback(&state, &index, "settings/ranking-rules", &body).await
update_setting_with_rollback(&state, &index, "settings/ranking-rules", &body.0).await
}
/// DELETE /indexes/:index/settings/ranking-rules
@ -413,7 +417,7 @@ async fn delete_ranking_rules(
State(state): State<ProxyState>,
Path(index): Path<String>,
) -> Result<Response, ErrorResponse> {
delete_setting(state, &index, "settings/ranking-rules").await
delete_setting(&state, &index, "settings/ranking-rules").await
}
/// GET /indexes/:index/settings/stop-words
@ -421,16 +425,16 @@ async fn get_stop_words(
State(state): State<ProxyState>,
Path(index): Path<String>,
) -> Result<Json<Value>, ErrorResponse> {
get_setting(state, &index, "settings/stop-words").await
get_setting(&state, &index, "settings/stop-words").await
}
/// PUT /indexes/:index/settings/stop-words
async fn update_stop_words(
State(state): State<ProxyState>,
Path(index): Path<String>,
body: Value,
body: Json<Value>,
) -> Result<Response, ErrorResponse> {
update_setting_with_rollback(&state, &index, "settings/stop-words", &body).await
update_setting_with_rollback(&state, &index, "settings/stop-words", &body.0).await
}
/// DELETE /indexes/:index/settings/stop-words
@ -438,7 +442,7 @@ async fn delete_stop_words(
State(state): State<ProxyState>,
Path(index): Path<String>,
) -> Result<Response, ErrorResponse> {
delete_setting(state, &index, "settings/stop-words").await
delete_setting(&state, &index, "settings/stop-words").await
}
/// GET /indexes/:index/settings/synonyms
@ -446,16 +450,16 @@ async fn get_synonyms(
State(state): State<ProxyState>,
Path(index): Path<String>,
) -> Result<Json<Value>, ErrorResponse> {
get_setting(state, &index, "settings/synonyms").await
get_setting(&state, &index, "settings/synonyms").await
}
/// PUT /indexes/:index/settings/synonyms
async fn update_synonyms(
State(state): State<ProxyState>,
Path(index): Path<String>,
body: Value,
body: Json<Value>,
) -> Result<Response, ErrorResponse> {
update_setting_with_rollback(&state, &index, "settings/synonyms", &body).await
update_setting_with_rollback(&state, &index, "settings/synonyms", &body.0).await
}
/// DELETE /indexes/:index/settings/synonyms
@ -463,7 +467,7 @@ async fn delete_synonyms(
State(state): State<ProxyState>,
Path(index): Path<String>,
) -> Result<Response, ErrorResponse> {
delete_setting(state, &index, "settings/synonyms").await
delete_setting(&state, &index, "settings/synonyms").await
}
/// GET /indexes/:index/settings/distinct-attribute
@ -471,16 +475,16 @@ async fn get_distinct_attribute(
State(state): State<ProxyState>,
Path(index): Path<String>,
) -> Result<Json<Value>, ErrorResponse> {
get_setting(state, &index, "settings/distinct-attribute").await
get_setting(&state, &index, "settings/distinct-attribute").await
}
/// PUT /indexes/:index/settings/distinct-attribute
async fn update_distinct_attribute(
State(state): State<ProxyState>,
Path(index): Path<String>,
body: Value,
body: Json<Value>,
) -> Result<Response, ErrorResponse> {
update_setting_with_rollback(&state, &index, "settings/distinct-attribute", &body).await
update_setting_with_rollback(&state, &index, "settings/distinct-attribute", &body.0).await
}
/// DELETE /indexes/:index/settings/distinct-attribute
@ -488,7 +492,7 @@ async fn delete_distinct_attribute(
State(state): State<ProxyState>,
Path(index): Path<String>,
) -> Result<Response, ErrorResponse> {
delete_setting(state, &index, "settings/distinct-attribute").await
delete_setting(&state, &index, "settings/distinct-attribute").await
}
/// GET /indexes/:index/settings/typo-tolerance
@ -496,16 +500,16 @@ async fn get_typo_tolerance(
State(state): State<ProxyState>,
Path(index): Path<String>,
) -> Result<Json<Value>, ErrorResponse> {
get_setting(state, &index, "settings/typo-tolerance").await
get_setting(&state, &index, "settings/typo-tolerance").await
}
/// PUT /indexes/:index/settings/typo-tolerance
async fn update_typo_tolerance(
State(state): State<ProxyState>,
Path(index): Path<String>,
body: Value,
body: Json<Value>,
) -> Result<Response, ErrorResponse> {
update_setting_with_rollback(&state, &index, "settings/typo-tolerance", &body).await
update_setting_with_rollback(&state, &index, "settings/typo-tolerance", &body.0).await
}
/// DELETE /indexes/:index/settings/typo-tolerance
@ -513,7 +517,7 @@ async fn delete_typo_tolerance(
State(state): State<ProxyState>,
Path(index): Path<String>,
) -> Result<Response, ErrorResponse> {
delete_setting(state, &index, "settings/typo-tolerance").await
delete_setting(&state, &index, "settings/typo-tolerance").await
}
/// GET /indexes/:index/settings/faceting
@ -521,16 +525,16 @@ async fn get_faceting(
State(state): State<ProxyState>,
Path(index): Path<String>,
) -> Result<Json<Value>, ErrorResponse> {
get_setting(state, &index, "settings/faceting").await
get_setting(&state, &index, "settings/faceting").await
}
/// PUT /indexes/:index/settings/faceting
async fn update_faceting(
State(state): State<ProxyState>,
Path(index): Path<String>,
body: Value,
body: Json<Value>,
) -> Result<Response, ErrorResponse> {
update_setting_with_rollback(&state, &index, "settings/faceting", &body).await
update_setting_with_rollback(&state, &index, "settings/faceting", &body.0).await
}
/// DELETE /indexes/:index/settings/faceting
@ -538,7 +542,7 @@ async fn delete_faceting(
State(state): State<ProxyState>,
Path(index): Path<String>,
) -> Result<Response, ErrorResponse> {
delete_setting(state, &index, "settings/faceting").await
delete_setting(&state, &index, "settings/faceting").await
}
/// GET /indexes/:index/settings/pagination
@ -546,16 +550,16 @@ async fn get_pagination(
State(state): State<ProxyState>,
Path(index): Path<String>,
) -> Result<Json<Value>, ErrorResponse> {
get_setting(state, &index, "settings/pagination").await
get_setting(&state, &index, "settings/pagination").await
}
/// PUT /indexes/:index/settings/pagination
async fn update_pagination(
State(state): State<ProxyState>,
Path(index): Path<String>,
body: Value,
body: Json<Value>,
) -> Result<Response, ErrorResponse> {
update_setting_with_rollback(&state, &index, "settings/pagination", &body).await
update_setting_with_rollback(&state, &index, "settings/pagination", &body.0).await
}
/// DELETE /indexes/:index/settings/pagination
@ -563,7 +567,7 @@ async fn delete_pagination(
State(state): State<ProxyState>,
Path(index): Path<String>,
) -> Result<Response, ErrorResponse> {
delete_setting(state, &index, "settings/pagination").await
delete_setting(&state, &index, "settings/pagination").await
}
/// Generic GET handler for a setting.
@ -592,7 +596,9 @@ async fn get_setting(
if let Some(resp) = result.responses.first() {
if resp.status == 200 {
return Ok(Json(resp.body.clone()));
let value: Value = serde_json::from_slice(&resp.body)
.map_err(|_| ErrorResponse::internal_error("Invalid JSON response"))?;
return Ok(Json(value));
} else if resp.status == 404 {
return Err(ErrorResponse::index_not_found(index));
}
@ -631,7 +637,9 @@ async fn delete_setting(
if let Some(resp) = result.responses.first() {
let status = axum::http::StatusCode::from_u16(resp.status).unwrap_or(axum::http::StatusCode::OK);
return Ok((status, Json(resp.body.clone())).into_response());
let value: Value = serde_json::from_slice(&resp.body)
.unwrap_or_else(|_| Value::Null);
return Ok((status, Json(value)).into_response());
}
Ok((axum::http::StatusCode::ACCEPTED, Json(serde_json::json!({}))).into_response())

View file

@ -49,7 +49,7 @@ struct TasksQuery {
}
/// Task response from a single node.
#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct NodeTask {
task_uid: u64,

View file

@ -45,8 +45,8 @@ impl HttpScatter {
method: &str,
path: &str,
body: &[u8],
headers: &[String],
) -> Result<NodeResponse, NodeId> {
headers: &[(String, String)],
) -> Result<miroir_core::scatter::NodeResponse> {
// Check retry cache first if enabled
if let Some(ref cache) = self.retry_cache {
let cache_key = RetryCache::cache_key(body, node_id.as_str(), None);
@ -73,7 +73,7 @@ impl HttpScatter {
match result {
Ok(Ok(resp)) => {
let body_bytes = serde_json::to_vec(&resp.body).unwrap_or_default();
let body_bytes = resp.body;
let status = resp.status;
// Cache successful responses
@ -90,14 +90,17 @@ impl HttpScatter {
}
}
Ok(NodeResponse {
Ok(miroir_core::scatter::NodeResponse {
node_id: node_id.clone(),
body: body_bytes,
status,
headers: resp.headers,
})
}
_ => Err(node_id.clone()),
_ => Err(MiroirError::Routing(format!(
"request to node {} timed out",
node_id.as_str()
))),
}
}
}
@ -124,7 +127,7 @@ impl Scatter for HttpScatter {
let topo = topology.clone();
async move {
this.send_to_node_with_cache(
match this.send_to_node_with_cache(
&topo,
&node_id,
&method,
@ -133,6 +136,10 @@ impl Scatter for HttpScatter {
&headers,
)
.await
{
Ok(resp) => Ok(resp),
Err(_) => Err(node_id),
}
}
})
.collect();