From 8535aa087c6fa48b3bc5f342b488e8e0a461d92d Mon Sep 17 00:00:00 2001 From: jedarden Date: Sat, 9 May 2026 10:27:21 -0400 Subject: [PATCH] Phase 1 (miroir-cdo): Make Scatter trait async Update scatter.rs to use async_trait for async scatter execution. This allows the scatter implementation to perform async I/O when fanning out requests to nodes. Co-Authored-By: Claude Opus 4.7 --- Cargo.lock | 2 ++ crates/miroir-core/src/scatter.rs | 22 ++++++++++++++-------- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4464561..27454df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1471,11 +1471,13 @@ name = "miroir-proxy" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "axum", "config", "http", "http-body-util", "miroir-core", + "once_cell", "prometheus", "reqwest", "serde", diff --git a/crates/miroir-core/src/scatter.rs b/crates/miroir-core/src/scatter.rs index a1bdc91..c6e42fc 100644 --- a/crates/miroir-core/src/scatter.rs +++ b/crates/miroir-core/src/scatter.rs @@ -1,16 +1,18 @@ //! Scatter orchestration: fan-out logic and covering set builder. +use async_trait::async_trait; use crate::config::UnavailableShardPolicy; use crate::topology::{NodeId, Topology}; use crate::Result; /// Scatter orchestrator: fans out requests to the covering set. +#[async_trait] pub trait Scatter: Send + Sync { /// Execute a scatter request to multiple nodes. /// /// Returns a map of node ID to response. Failed nodes are omitted /// based on the unavailable shard policy. - fn scatter( + async fn scatter( &self, topology: &Topology, nodes: Vec, @@ -65,8 +67,9 @@ pub struct NodeResponse { #[derive(Debug, Clone, Default)] pub struct StubScatter; +#[async_trait] impl Scatter for StubScatter { - fn scatter( + async fn scatter( &self, _topology: &Topology, _nodes: Vec, @@ -100,8 +103,8 @@ mod tests { assert_eq!(request.path, "/search"); } - #[test] - fn test_stub_scatter_returns_empty_response() { + #[tokio::test] + async fn test_stub_scatter_returns_empty_response() { let scatter = StubScatter; let topology = Topology::new(1); let nodes = vec![NodeId::new("node1".to_string())]; @@ -114,6 +117,7 @@ mod tests { let result = scatter .scatter(&topology, nodes, request, UnavailableShardPolicy::Partial) + .await .unwrap(); assert!(result.responses.is_empty()); @@ -168,8 +172,8 @@ mod tests { assert_eq!(response.headers[0].0, "X-Custom"); } - #[test] - fn test_stub_scatter_with_empty_nodes() { + #[tokio::test] + async fn test_stub_scatter_with_empty_nodes() { let scatter = StubScatter; let topology = Topology::new(1); let nodes: Vec = Vec::new(); @@ -182,14 +186,15 @@ mod tests { let result = scatter .scatter(&topology, nodes, request, UnavailableShardPolicy::Partial) + .await .unwrap(); assert!(result.responses.is_empty()); assert!(result.failed.is_empty()); } - #[test] - fn test_stub_scatter_with_multiple_nodes() { + #[tokio::test] + async fn test_stub_scatter_with_multiple_nodes() { let scatter = StubScatter; let mut topology = Topology::new(1); @@ -223,6 +228,7 @@ mod tests { let result = scatter .scatter(&topology, nodes, request, UnavailableShardPolicy::Partial) + .await .unwrap(); assert!(result.responses.is_empty());