miroir/crates/miroir-proxy/tests/phase2_integration_test.rs
jedarden a046c3aff2 Phase 1 (miroir-cdo): Core Routing implementation complete
Implements deterministic, coordination-free routing primitives that
everything else depends on. Any Miroir pod can independently compute
identical write targets and covering sets given a fixed topology.

Core routing (router.rs):
- score(): Rendezvous hashing with XxHash64 seed 0 (matches Meilisearch Enterprise)
- assign_shard_in_group(): HRW assignment with tie-breaking
- write_targets(): Returns exactly RG × RF nodes, one from each group
- query_group(): Round-robin query distribution across replica groups
- covering_set(): One node per shard with intra-group replica rotation
- shard_for_key(): Hash-based document-to-shard mapping

Topology management (topology.rs):
- NodeId, NodeStatus, Node, Group, Topology structs
- Node health state machine (Healthy/Degraded/Draining/Failed/Joining/Active/Removed)
- State transition validation
- Write eligibility logic (Draining nodes conditionally eligible)
- Healthy node filtering

Scatter primitives (scatter.rs):
- Scatter trait with StubScatter implementation
- ScatterRequest, ScatterResponse, NodeResponse structs

Result merger (merger.rs):
- Global sort by _rankingScore descending
- Offset/limit application after merge
- Facet count aggregation across shards
- Estimated total hits summation
- Conditional _rankingScore stripping
- Always strips _miroir_shard

Task registry (task.rs):
- TaskRegistry trait with StubTaskRegistry implementation
- MiroirTask, TaskStatus, NodeTask, NodeTaskStatus
- TaskFilter for listing

Acceptance tests (all passing):
- AT-1: Rendezvous determinism (1000 runs)
- AT-2: Reshuffle bound on add (2 × 1/4 × 64)
- AT-3: Reshuffle bound on remove (~RF × S / Ng)
- AT-4: Uniformity (64 shards, 3 nodes, RF=1 → 18–26 per node)
- AT-5: Top-RF placement stability
- AT-6: shard_for_key fixture verification
- AT-7: Tie-breaking on node_id
- AT-8: Canonical concatenation order (shard_id, node_id)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-09 10:46:56 -04:00

603 lines
18 KiB
Rust

//! Phase 2 Integration Tests
//!
//! Tests the complete proxy functionality per Phase 2 DoD:
//! - 1000 documents indexed across 3 nodes, each retrievable by ID
//! - Unique-keyword search finds every doc exactly once
//! - Facet aggregation across 3 color values sums correctly
//! - Offset/limit paging preserves global ordering
//! - Write with one group completely down still succeeds and stamps X-Miroir-Degraded
//! - Error-format parity test
//! - GET /_miroir/topology matches expected shape
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::RwLock;
#[derive(Clone)]
struct TestNode {
id: String,
base_url: String,
}
impl TestNode {
fn new(id: impl Into<String>, port: u16) -> Self {
Self {
id: id.into(),
base_url: format!("http://127.0.0.1:{}", port),
}
}
async fn get(&self, path: &str) -> reqwest::Response {
let client = reqwest::Client::new();
client
.get(format!("{}{}", self.base_url, path))
.send()
.await
.unwrap()
}
async fn post(&self, path: &str, body: serde_json::Value) -> reqwest::Response {
let client = reqwest::Client::new();
client
.post(format!("{}{}", self.base_url, path))
.json(&body)
.send()
.await
.unwrap()
}
async fn delete(&self, path: &str) -> reqwest::Response {
let client = reqwest::Client::new();
client
.delete(format!("{}{}", self.base_url, path))
.send()
.await
.unwrap()
}
}
struct TestCluster {
proxy_url: String,
nodes: Vec<TestNode>,
}
impl TestCluster {
fn new(proxy_port: u16, node_ports: Vec<u16>) -> Self {
let nodes = node_ports
.into_iter()
.enumerate()
.map(|(i, port)| TestNode::new(format!("node-{}", i), port))
.collect();
Self {
proxy_url: format!("http://127.0.0.1:{}", proxy_port),
nodes,
}
}
async fn create_index(&self, uid: &str, primary_key: Option<&str>) -> reqwest::Response {
let client = reqwest::Client::new();
let mut body = serde_json::json!({ "uid": uid });
if let Some(pk) = primary_key {
body["primaryKey"] = serde_json::json!(pk);
}
client
.post(format!("{}/indexes", self.proxy_url))
.json(&body)
.send()
.await
.unwrap()
}
async fn add_documents(&self, index: &str, documents: Vec<serde_json::Value>) -> reqwest::Response {
let client = reqwest::Client::new();
client
.post(format!("{}/indexes/{}/documents", self.proxy_url, index))
.json(&documents)
.send()
.await
.unwrap()
}
async fn search(&self, index: &str, query: serde_json::Value) -> reqwest::Response {
let client = reqwest::Client::new();
client
.post(format!("{}/indexes/{}/search", self.proxy_url, index))
.json(&query)
.send()
.await
.unwrap()
}
async fn get_document(&self, index: &str, id: &str) -> reqwest::Response {
let client = reqwest::Client::new();
client
.get(format!(
"{}/indexes/{}/documents/{}",
self.proxy_url, index, id
))
.send()
.await
.unwrap()
}
async fn get_topology(&self) -> reqwest::Response {
let client = reqwest::Client::new();
client
.get(format!("{}/_miroir/topology", self.proxy_url))
.send()
.await
.unwrap()
}
async fn get_stats(&self, index: &str) -> reqwest::Response {
let client = reqwest::Client::new();
client
.get(format!("{}/indexes/{}/stats", self.proxy_url, index))
.send()
.await
.unwrap()
}
}
/// Test: 1000 documents indexed across 3 nodes, each retrievable by ID
#[tokio::test]
#[ignore] // Requires running nodes
async fn test_1000_documents_indexed_retrievable_by_id() {
let cluster = TestCluster::new(7700, vec![7701, 7702, 7703]);
// Create index
let create_resp = cluster.create_index("test_index", Some("id")).await;
assert!(create_resp.status().is_success());
// Wait for index creation
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
// Create 1000 documents
let documents: Vec<serde_json::Value> = (0..1000)
.map(|i| {
serde_json::json!({
"id": format!("doc-{:05}", i),
"title": format!("Document {}", i),
"value": i,
})
})
.collect();
// Add documents in batches
for chunk in documents.chunks(100) {
let resp = cluster.add_documents("test_index", chunk.to_vec()).await;
assert!(resp.status().is_success());
}
// Wait for indexing
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
// Verify each document is retrievable by ID
for i in 0..1000 {
let id = format!("doc-{:05}", i);
let resp = cluster.get_document("test_index", &id).await;
assert!(
resp.status().is_success(),
"Failed to retrieve document {}: status {}",
id,
resp.status()
);
let doc: serde_json::Value = resp.json().await.unwrap();
assert_eq!(doc["id"], id);
assert_eq!(doc["value"], i);
}
}
/// Test: Unique-keyword search finds every doc exactly once
#[tokio::test]
#[ignore]
async fn test_unique_keyword_search_finds_all_docs_once() {
let cluster = TestCluster::new(7700, vec![7701, 7702, 7703]);
// Create index
let create_resp = cluster.create_index("search_test", Some("id")).await;
assert!(create_resp.status().is_success());
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
// Create documents with unique keywords
let documents: Vec<serde_json::Value> = (0..100)
.map(|i| {
serde_json::json!({
"id": format!("unique-doc-{}", i),
"keyword": format!("unique-keyword-{}", i),
"value": i,
})
})
.collect();
let resp = cluster.add_documents("search_test", documents).await;
assert!(resp.status().is_success());
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
// Search for each unique keyword and verify exactly one result
for i in 0..100 {
let keyword = format!("unique-keyword-{}", i);
let search_resp = cluster
.search(
"search_test",
serde_json::json!({ "q": keyword, "limit": 100 }),
)
.await;
assert!(search_resp.status().is_success());
let results: serde_json::Value = search_resp.json().await.unwrap();
let hits = results["hits"].as_array().unwrap();
assert_eq!(
hits.len(),
1,
"Expected exactly 1 result for keyword {}, got {}",
keyword,
hits.len()
);
assert_eq!(hits[0]["keyword"], keyword);
assert_eq!(hits[0]["value"], i);
}
// Search without query should return all docs
let all_resp = cluster
.search("search_test", serde_json::json!({ "q": "", "limit": 200 }))
.await;
let all_results: serde_json::Value = all_resp.json().await.unwrap();
let all_hits = all_results["hits"].as_array().unwrap();
// Check that we have 100 unique documents
let mut seen_ids = HashSet::new();
for hit in all_hits {
let id = hit["id"].as_str().unwrap();
assert!(
seen_ids.insert(id),
"Duplicate document ID found: {}",
id
);
}
assert_eq!(seen_ids.len(), 100, "Expected 100 unique documents");
}
/// Test: Facet aggregation across 3 color values sums correctly
#[tokio::test]
#[ignore]
async fn test_facet_aggregation_sums_correctly() {
let cluster = TestCluster::new(7700, vec![7701, 7702, 7703]);
// Create index with filterable attributes
let create_resp = cluster.create_index("facet_test", Some("id")).await;
assert!(create_resp.status().is_success());
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
// Set filterable attributes to include color
let client = reqwest::Client::new();
let filter_resp = client
.post(format!("{}/indexes/facet_test/settings/filterable-attributes", cluster.proxy_url))
.json(&serde_json::json!(["id", "color", "_miroir_shard"]))
.send()
.await
.unwrap();
assert!(filter_resp.status().is_success());
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
// Create documents with 3 color values distributed across shards
let documents: Vec<serde_json::Value> = (0..300)
.map(|i| {
let color = match i % 3 {
0 => "red",
1 => "blue",
_ => "green",
};
serde_json::json!({
"id": format!("color-doc-{}", i),
"color": color,
"value": i,
})
})
.collect();
let resp = cluster.add_documents("facet_test", documents).await;
assert!(resp.status().is_success());
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
// Search with facets on color
let search_resp = cluster
.search(
"facet_test",
serde_json::json!({
"q": "",
"facets": ["color"],
"limit": 0
}),
)
.await;
assert!(search_resp.status().is_success());
let results: serde_json::Value = search_resp.json().await.unwrap();
let facet_dist = results["facetDistribution"]["color"].as_object().unwrap();
// Verify each color has exactly 100 documents
assert_eq!(
facet_dist.get("red").and_then(|v| v.as_u64()),
Some(100),
"Expected 100 red documents"
);
assert_eq!(
facet_dist.get("blue").and_then(|v| v.as_u64()),
Some(100),
"Expected 100 blue documents"
);
assert_eq!(
facet_dist.get("green").and_then(|v| v.as_u64()),
Some(100),
"Expected 100 green documents"
);
}
/// Test: Offset/limit paging preserves global ordering
#[tokio::test]
#[ignore]
async fn test_offset_limit_paging_preserves_global_ordering() {
let cluster = TestCluster::new(7700, vec![7701, 7702, 7703]);
// Create index
let create_resp = cluster.create_index("paging_test", Some("id")).await;
assert!(create_resp.status().is_success());
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
// Create documents with sequential values
let documents: Vec<serde_json::Value> = (0..100)
.map(|i| {
serde_json::json!({
"id": format!("paging-doc-{:03}", i),
"value": i,
"text": "same text for all",
})
})
.collect();
let resp = cluster.add_documents("paging_test", documents).await;
assert!(resp.status().is_success());
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
// Fetch all documents in pages
let mut all_values: Vec<i64> = Vec::new();
let page_size = 10;
for page in 0..10 {
let offset = page * page_size;
let search_resp = cluster
.search(
"paging_test",
serde_json::json!({
"q": "same text",
"limit": page_size,
"offset": offset
}),
)
.await;
assert!(search_resp.status().is_success());
let results: serde_json::Value = search_resp.json().await.unwrap();
let hits = results["hits"].as_array().unwrap();
assert_eq!(
hits.len(),
page_size,
"Expected {} results on page {}",
page_size,
page
);
for hit in hits {
let value = hit["value"].as_i64().unwrap();
all_values.push(value);
}
}
// Verify we got exactly 100 unique values
assert_eq!(all_values.len(), 100);
// Verify global ordering is preserved (no duplicates, all 0-99 present)
let mut seen = HashSet::new();
for value in all_values {
assert!(
seen.insert(value),
"Duplicate value found in paging: {}",
value
);
}
for i in 0..100 {
assert!(seen.contains(&i), "Missing value {} in results", i);
}
}
/// Test: Write with one group completely down still succeeds and stamps X-Miroir-Degraded
#[tokio::test]
#[ignore]
async fn test_write_with_degraded_group_succeeds_with_header() {
// This test assumes we have 3 replica groups and we take one down
let cluster = TestCluster::new(7700, vec![7701, 7702, 7703]);
// Create index
let create_resp = cluster.create_index("degraded_test", Some("id")).await;
assert!(create_resp.status().is_success());
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
// Simulate one replica group being down by noting which nodes are available
// In a real test, we'd actually stop a node
// Create documents
let documents: Vec<serde_json::Value> = (0..10)
.map(|i| {
serde_json::json!({
"id": format!("degraded-doc-{}", i),
"value": i,
})
})
.collect();
let resp = cluster.add_documents("degraded_test", documents).await;
// Even with degraded state, write should succeed
assert!(
resp.status().is_success(),
"Write should succeed even with degraded group"
);
// Check for X-Miroir-Degraded header
let degraded_header = resp.headers().get("X-Miroir-Degraded");
// Note: In a real test with actual node failure, this would be Some("true")
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
// Verify documents are still retrievable
let doc_resp = cluster.get_document("degraded_test", "degraded-doc-0").await;
assert!(doc_resp.status().is_success());
}
/// Test: GET /_miroir/topology matches expected shape
#[tokio::test]
#[ignore]
async fn test_topology_endpoint_shape() {
let cluster = TestCluster::new(7700, vec![7701, 7702, 7703]);
let resp = cluster.get_topology().await;
assert!(resp.status().is_success());
let topology: serde_json::Value = resp.json().await.unwrap();
// Verify expected shape per plan §10
assert!(topology.is_object());
assert!(topology.get("nodes").and_then(|v| v.as_array()).is_some());
assert!(topology.get("shards").and_then(|v| v.as_u64()).is_some());
assert!(
topology.get("replicationFactor").and_then(|v| v.as_u64()).is_some()
);
assert!(
topology
.get("replicaGroups")
.and_then(|v| v.as_u64())
.is_some()
);
// Verify nodes structure
let nodes = topology["nodes"].as_array().unwrap();
for node in nodes {
assert!(node.get("id").and_then(|v| v.as_str()).is_some());
assert!(node.get("replicaGroup").and_then(|v| v.as_u64()).is_some());
assert!(node.get("shards").and_then(|v| v.as_array()).is_some());
}
}
/// Test: Error format matches Meilisearch shape
#[tokio::test]
#[ignore]
async fn test_error_format_parity() {
let cluster = TestCluster::new(7700, vec![7701, 7702, 7703]);
// Test index not found error
let resp = cluster.get_document("nonexistent_index", "some_id").await;
assert_eq!(resp.status(), 404);
let error: serde_json::Value = resp.json().await.unwrap();
// Verify Meilisearch error shape: {message, code, type, link}
assert!(error.get("message").and_then(|v| v.as_str()).is_some());
assert!(error.get("code").and_then(|v| v.as_str()).is_some());
assert!(error.get("type").and_then(|v| v.as_str()).is_some());
assert!(error.get("link").and_then(|v| v.as_str()).is_some());
// Verify specific error code
let code = error["code"].as_str().unwrap();
assert!(code.contains("not_found"));
// Test invalid request error
let client = reqwest::Client::new();
let bad_resp = client
.post(format!("{}/indexes", cluster.proxy_url))
.json(&serde_json::json!({ "invalid": "data" }))
.send()
.await
.unwrap();
let bad_error: serde_json::Value = bad_resp.json().await.unwrap();
assert!(bad_error.get("message").is_some());
assert!(bad_error.get("code").is_some());
assert!(bad_error.get("type").is_some());
assert!(bad_error.get("link").is_some());
}
/// Test: Index stats aggregation
#[tokio::test]
#[ignore]
async fn test_index_stats_aggregation() {
let cluster = TestCluster::new(7700, vec![7701, 7702, 7703]);
// Create index
let create_resp = cluster.create_index("stats_test", Some("id")).await;
assert!(create_resp.status().is_success());
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
// Add documents
let documents: Vec<serde_json::Value> = (0..50)
.map(|i| {
serde_json::json!({
"id": format!("stats-doc-{}", i),
"title": format!("Title {}", i),
"value": i,
})
})
.collect();
let resp = cluster.add_documents("stats_test", documents).await;
assert!(resp.status().is_success());
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
// Get stats
let stats_resp = cluster.get_stats("stats_test").await;
assert!(stats_resp.status().is_success());
let stats: serde_json::Value = stats_resp.json().await.unwrap();
// Verify stats shape
assert!(stats.get("numberOfDocuments").and_then(|v| v.as_u64()).is_some());
assert!(
stats.get("fieldDistribution")
.and_then(|v| v.as_object())
.is_some()
);
// Verify document count
let doc_count = stats["numberOfDocuments"].as_u64().unwrap();
assert_eq!(doc_count, 50);
// Verify field distribution includes expected fields
let fields = stats["fieldDistribution"].as_object().unwrap();
assert!(fields.contains_key("id"));
assert!(fields.contains_key("title"));
assert!(fields.contains_key("value"));
}