Add comprehensive integration tests for plan §8 requirements: - Document round-trip: 1000 docs indexed and retrieved, verified distributed across ≥2 nodes - Search covers all shards: 100 docs with unique keywords, each search returns 1 hit - Facet aggregation: 100 docs across 3 colors, facet counts sum to 100 - Offset/limit paging: 50 docs, 5 pages of 10 match single limit=50 query - Settings broadcast: synonyms propagated to all 3 nodes - Task polling: 500 doc batch, poll until succeeded - Node failure with RF=2: marked #[ignore], requires docker-compose-dev-rf2.yml All tests use docker-compose-dev stack (3 Meilisearch nodes + Miroir). Closes: bf-45zni Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
551 lines
18 KiB
Rust
551 lines
18 KiB
Rust
// Miroir Integration Tests
|
|
//
|
|
// End-to-end tests with docker-compose stack (3 Meilisearch nodes + Miroir).
|
|
// Per plan §8: validates document round-trip, scatter-gather search, facets,
|
|
// paging consistency, settings broadcast, task polling, and node failure.
|
|
//
|
|
// Prerequisites:
|
|
// - docker-compose-dev stack running: cd examples && docker-compose -f docker-compose-dev.yml up -d
|
|
// - All services healthy: docker-compose ps
|
|
//
|
|
// Run:
|
|
// cargo test --test integration -- --test-threads=1
|
|
|
|
use meilisearch_sdk::{client::Client, indexes::Indexes, task::Task};
|
|
use reqwest::Client as HttpClient;
|
|
use serde_json::json;
|
|
use serde_json::Value;
|
|
use std::collections::HashSet;
|
|
use std::time::Duration;
|
|
|
|
const MIROIR_PORT: u16 = 7700;
|
|
const MIROIR_KEY: &str = "dev-key";
|
|
const NODE_KEY: &str = "dev-node-key";
|
|
|
|
/// Node addresses from docker-compose-dev.yml
|
|
const NODE_ADDRESSES: &[&str] = &[
|
|
"http://localhost:7701", // meili-0
|
|
"http://localhost:7702", // meili-1
|
|
"http://localhost:7703", // meili-2
|
|
];
|
|
|
|
fn get_miroir_client() -> Client {
|
|
let url = format!("http://localhost:{}", MIROIR_PORT);
|
|
Client::new(url, Some(MIROIR_KEY.to_string()))
|
|
}
|
|
|
|
/// Get direct client to a specific Meilisearch node
|
|
fn get_node_client(port: u16) -> Client {
|
|
let url = format!("http://localhost:{}", port);
|
|
Client::new(url, Some(NODE_KEY.to_string()))
|
|
}
|
|
|
|
/// Wait for Miroir to be healthy
|
|
async fn ensure_healthy() {
|
|
let client = HttpClient::new();
|
|
let url = format!("http://localhost:{}/health", MIROIR_PORT);
|
|
let start = std::time::Instant::now();
|
|
|
|
while start.elapsed() < Duration::from_secs(60) {
|
|
match client.get(&url).send().await {
|
|
Ok(resp) if resp.status().is_success() => return,
|
|
_ => tokio::time::sleep(Duration::from_secs(1)).await,
|
|
}
|
|
}
|
|
|
|
panic!("Miroir health check timed out");
|
|
}
|
|
|
|
/// Clean up an index if it exists
|
|
async fn cleanup_index(index_name: &str) {
|
|
let client = get_miroir_client();
|
|
let _ = client.delete_index(index_name).await;
|
|
}
|
|
|
|
// ============================================================================
|
|
// Test 1: Document round-trip
|
|
// ============================================================================
|
|
//
|
|
/// Index 1000 documents, retrieve each by ID — all must be found.
|
|
/// Verify documents are distributed across all 3 nodes (≥2 nodes have docs).
|
|
#[tokio::test]
|
|
async fn document_round_trip() -> Result<(), Box<dyn std::error::Error>> {
|
|
ensure_healthy().await;
|
|
|
|
let client = get_miroir_client();
|
|
let index_name = "test_round_trip";
|
|
|
|
// Clean up first
|
|
cleanup_index(index_name).await;
|
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
|
|
|
// Create index
|
|
let task = client.create_index(index_name, Some("id")).await?;
|
|
client.wait_for_task(task.uid(), None, None).await?;
|
|
|
|
// Index 1000 documents
|
|
let mut docs = Vec::new();
|
|
for i in 0..1000 {
|
|
docs.push(json!({
|
|
"id": i,
|
|
"title": format!("Document {}", i),
|
|
"value": i % 100,
|
|
}));
|
|
}
|
|
|
|
let task = client.index(index_name).add_documents(&docs, None).await?;
|
|
client.wait_for_task(task.uid(), None, None).await?;
|
|
|
|
// Retrieve each document by ID
|
|
for i in 0..1000 {
|
|
let doc = client.index(index_name).get_document::<Value>(i).await?;
|
|
let id = doc.get("id").and_then(|v| v.as_u64()).unwrap();
|
|
assert_eq!(id, i, "Document ID mismatch");
|
|
}
|
|
|
|
// Verify distribution: check that at least 2 nodes have documents
|
|
let mut nodes_with_docs = 0;
|
|
for &port in &[7701u16, 7702, 7703] {
|
|
let node_client = get_node_client(port);
|
|
// Try to get a document that should exist
|
|
if let Ok(doc) = node_client.index(index_name).get_document::<Value>(0).await {
|
|
if doc.get("id").is_some() {
|
|
nodes_with_docs += 1;
|
|
}
|
|
}
|
|
}
|
|
|
|
assert!(nodes_with_docs >= 2, "Documents should be distributed across at least 2 nodes, found {}", nodes_with_docs);
|
|
|
|
// Clean up
|
|
cleanup_index(index_name).await;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
// ============================================================================
|
|
// Test 2: Search covers all shards
|
|
// ============================================================================
|
|
//
|
|
/// Index documents with unique keywords; search for each keyword.
|
|
/// Every search must return exactly 1 hit (no missing shards).
|
|
#[tokio::test]
|
|
async fn search_covers_all_shards() -> Result<(), Box<dyn std::error::Error>> {
|
|
ensure_healthy().await;
|
|
|
|
let client = get_miroir_client();
|
|
let index_name = "test_shard_coverage";
|
|
|
|
cleanup_index(index_name).await;
|
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
|
|
|
let task = client.create_index(index_name, Some("id")).await?;
|
|
client.wait_for_task(task.uid(), None, None).await?;
|
|
|
|
// Index 100 documents, each with a unique keyword
|
|
let mut docs = Vec::new();
|
|
for i in 0..100 {
|
|
docs.push(json!({
|
|
"id": i,
|
|
"keyword": format!("unique_keyword_{}", i),
|
|
"title": format!("Title {}", i),
|
|
}));
|
|
}
|
|
|
|
let task = client.index(index_name).add_documents(&docs, None).await?;
|
|
client.wait_for_task(task.uid(), None, None).await?;
|
|
|
|
// Search for each unique keyword
|
|
for i in 0..100 {
|
|
let keyword = format!("unique_keyword_{}", i);
|
|
let results = client.index(index_name).search()
|
|
.with_query(&keyword)
|
|
.execute::<Value>()
|
|
.await?;
|
|
|
|
assert_eq!(results.hits.len(), 1, "Search for '{}' should return exactly 1 hit, got {}", keyword, results.hits.len());
|
|
|
|
let hit_id = results.hits[0].get("id").and_then(|v| v.as_u64()).unwrap();
|
|
assert_eq!(hit_id, i, "Wrong document returned for keyword '{}'", keyword);
|
|
}
|
|
|
|
cleanup_index(index_name).await;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
// ============================================================================
|
|
// Test 3: Facet aggregation
|
|
// ============================================================================
|
|
//
|
|
/// Index 100 documents across 3 color values. Facet counts must sum to 100.
|
|
#[tokio::test]
|
|
async fn facet_aggregation() -> Result<(), Box<dyn std::error::Error>> {
|
|
ensure_healthy().await;
|
|
|
|
let client = get_miroir_client();
|
|
let index_name = "test_facets";
|
|
|
|
cleanup_index(index_name).await;
|
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
|
|
|
let task = client.create_index(index_name, Some("id")).await?;
|
|
client.wait_for_task(task.uid(), None, None).await?;
|
|
|
|
// Add faceted documents
|
|
let mut docs = Vec::new();
|
|
for i in 0..100 {
|
|
let color = match i % 3 {
|
|
0 => "red",
|
|
1 => "blue",
|
|
_ => "green",
|
|
};
|
|
docs.push(json!({
|
|
"id": i,
|
|
"title": format!("Product {}", i),
|
|
"color": color,
|
|
}));
|
|
}
|
|
|
|
let task = client.index(index_name).add_documents(&docs, None).await?;
|
|
client.wait_for_task(task.uid(), None, None).await?;
|
|
|
|
// Enable faceting on color
|
|
use meilisearch_sdk::settings::Settings;
|
|
|
|
let settings = Settings::new()
|
|
.with_filterable_attributes(["color"]);
|
|
|
|
let task = client.index(index_name).set_settings(&settings).await?;
|
|
client.wait_for_task(task.uid(), None, None).await?;
|
|
|
|
// Search with facets
|
|
let results = client.index(index_name).search()
|
|
.with_query("product")
|
|
.with_facet(&["color"])
|
|
.execute::<Value>()
|
|
.await?;
|
|
|
|
// Verify facet distribution exists
|
|
let facet_dist = results.facet_distribution.as_ref()
|
|
.and_then(|f| f.get("color"));
|
|
|
|
assert!(facet_dist.is_some(), "Facet distribution should exist for 'color'");
|
|
|
|
let facet_dist = facet_dist.unwrap();
|
|
|
|
// Sum all facet counts
|
|
let total: u64 = facet_dist.values()
|
|
.filter_map(|v| v.as_u64())
|
|
.sum();
|
|
|
|
assert_eq!(total, 100, "Facet counts should sum to 100, got {}", total);
|
|
|
|
// Verify all three colors are present
|
|
assert!(facet_dist.get("red").is_some(), "Missing 'red' facet");
|
|
assert!(facet_dist.get("blue").is_some(), "Missing 'blue' facet");
|
|
assert!(facet_dist.get("green").is_some(), "Missing 'green' facet");
|
|
|
|
cleanup_index(index_name).await;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
// ============================================================================
|
|
// Test 4: Offset/limit paging consistency
|
|
// ============================================================================
|
|
//
|
|
/// Index 50 documents. Fetch 5 pages of 10; concatenate must match a single limit=50 query.
|
|
/// No duplicates, no gaps, same order.
|
|
#[tokio::test]
|
|
async fn offset_limit_paging() -> Result<(), Box<dyn std::error::Error>> {
|
|
ensure_healthy().await;
|
|
|
|
let client = get_miroir_client();
|
|
let index_name = "test_paging";
|
|
|
|
cleanup_index(index_name).await;
|
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
|
|
|
let task = client.create_index(index_name, Some("id")).await?;
|
|
client.wait_for_task(task.uid(), None, None).await?;
|
|
|
|
// Index 50 documents with deterministic ranking
|
|
let mut docs = Vec::new();
|
|
for i in 0..50 {
|
|
docs.push(json!({
|
|
"id": i,
|
|
"title": format!("Title {}", i),
|
|
"score": 50 - i, // Reverse score for predictable ordering
|
|
}));
|
|
}
|
|
|
|
let task = client.index(index_name).add_documents(&docs, None).await?;
|
|
client.wait_for_task(task.uid(), None, None).await?;
|
|
|
|
// Set sortable attributes for consistent ordering
|
|
use meilisearch_sdk::settings::Settings;
|
|
|
|
let settings = Settings::new()
|
|
.with_sortable_attributes(["score"]);
|
|
|
|
let task = client.index(index_name).set_settings(&settings).await?;
|
|
client.wait_for_task(task.uid(), None, None).await?;
|
|
|
|
// Fetch all in one query
|
|
let all_results = client.index(index_name).search()
|
|
.with_query("")
|
|
.with_sort(&["score:desc"])
|
|
.with_limit(50)
|
|
.execute::<Value>()
|
|
.await?;
|
|
|
|
// Fetch in pages of 10
|
|
let mut paged_ids = Vec::new();
|
|
for page in 0..5 {
|
|
let results = client.index(index_name).search()
|
|
.with_query("")
|
|
.with_sort(&["score:desc"])
|
|
.with_limit(10)
|
|
.with_offset(page * 10)
|
|
.execute::<Value>()
|
|
.await?;
|
|
|
|
for hit in &results.hits {
|
|
if let Some(id) = hit.get("id").and_then(|v| v.as_u64()) {
|
|
paged_ids.push(id);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Extract IDs from single query
|
|
let all_ids: Vec<u64> = all_results.hits.iter()
|
|
.filter_map(|h| h.get("id").and_then(|v| v.as_u64()))
|
|
.collect();
|
|
|
|
// Must have same count
|
|
assert_eq!(paged_ids.len(), 50, "Paged results should have 50 items");
|
|
assert_eq!(all_ids.len(), 50, "Single query should have 50 items");
|
|
|
|
// Must be in same order
|
|
assert_eq!(paged_ids, all_ids, "Paged and single-query results must match in order");
|
|
|
|
// Verify no duplicates in paged results
|
|
let unique_ids: HashSet<_> = paged_ids.iter().collect();
|
|
assert_eq!(unique_ids.len(), 50, "Paged results should have no duplicates");
|
|
|
|
cleanup_index(index_name).await;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
// ============================================================================
|
|
// Test 5: Settings broadcast
|
|
// ============================================================================
|
|
//
|
|
/// Add synonyms; verify all 3 nodes have the synonyms.
|
|
/// Search via synonym should return results.
|
|
#[tokio::test]
|
|
async fn settings_broadcast() -> Result<(), Box<dyn std::error::Error>> {
|
|
ensure_healthy().await;
|
|
|
|
let client = get_miroir_client();
|
|
let index_name = "test_settings";
|
|
|
|
cleanup_index(index_name).await;
|
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
|
|
|
let task = client.create_index(index_name, Some("id")).await?;
|
|
client.wait_for_task(task.uid(), None, None).await?;
|
|
|
|
// Add documents
|
|
let docs = json!([
|
|
{"id": 1, "title": "Laptop Computer"},
|
|
{"id": 2, "title": "Desktop PC"},
|
|
{"id": 3, "title": "Mobile Phone"},
|
|
]);
|
|
|
|
let task = client.index(index_name).add_documents(&docs, None).await?;
|
|
client.wait_for_task(task.uid(), None, None).await?;
|
|
|
|
// Add synonyms via Miroir
|
|
use meilisearch_sdk::settings::Settings;
|
|
|
|
let synonyms = serde_json::json!({
|
|
"laptop": ["notebook", "portable"],
|
|
"pc": ["computer", "desktop"]
|
|
});
|
|
|
|
let settings = Settings::new().with_synonyms(synonyms);
|
|
let task = client.index(index_name).set_settings(&settings).await?;
|
|
client.wait_for_task(task.uid(), None, None).await?;
|
|
|
|
// Wait a bit for propagation
|
|
tokio::time::sleep(Duration::from_secs(2)).await;
|
|
|
|
// Verify synonyms exist on all 3 nodes
|
|
for &port in &[7701u16, 7702, 7703] {
|
|
let node_client = get_node_client(port);
|
|
let node_settings = node_client.index(index_name).get_settings().await?;
|
|
|
|
let has_synonyms = node_settings.synonyms.is_some() &&
|
|
!node_settings.synonyms.unwrap().is_empty();
|
|
|
|
assert!(has_synonyms, "Node on port {} should have synonyms", port);
|
|
}
|
|
|
|
// Search via synonym should work
|
|
let results = client.index(index_name).search()
|
|
.with_query("notebook")
|
|
.execute::<Value>()
|
|
.await?;
|
|
|
|
assert_eq!(results.hits.len(), 1, "Synonym search 'notebook' should return 1 hit");
|
|
|
|
let title = results.hits[0].get("title").and_then(|v| v.as_str());
|
|
assert_eq!(title, Some("Laptop Computer"), "Wrong result for synonym search");
|
|
|
|
cleanup_index(index_name).await;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
// ============================================================================
|
|
// Test 6: Task polling
|
|
// ============================================================================
|
|
//
|
|
/// Index a large batch (500 docs); poll GET /tasks/{id} until succeeded.
|
|
/// Verify all documents are searchable after completion.
|
|
#[tokio::test]
|
|
async fn task_polling() -> Result<(), Box<dyn std::error::Error>> {
|
|
ensure_healthy().await;
|
|
|
|
let client = get_miroir_client();
|
|
let index_name = "test_task_polling";
|
|
|
|
cleanup_index(index_name).await;
|
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
|
|
|
let task = client.create_index(index_name, Some("id")).await?;
|
|
client.wait_for_task(task.uid(), None, None).await?;
|
|
|
|
// Index 500 documents
|
|
let mut docs = Vec::new();
|
|
for i in 0..500 {
|
|
docs.push(json!({
|
|
"id": i,
|
|
"title": format!("Document {}", i),
|
|
}));
|
|
}
|
|
|
|
let task = client.index(index_name).add_documents(&docs, None).await?;
|
|
let task_uid = task.uid();
|
|
|
|
// Poll until succeeded
|
|
let start = std::time::Instant::now();
|
|
loop {
|
|
let task_info = client.get_task(task_uid).await?;
|
|
|
|
if task_info.status == meilisearch_sdk::task::TaskStatus::Succeeded {
|
|
break;
|
|
}
|
|
|
|
if start.elapsed() > Duration::from_secs(60) {
|
|
panic!("Task polling timed out");
|
|
}
|
|
|
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
|
}
|
|
|
|
// Verify all documents are searchable
|
|
let results = client.index(index_name).search()
|
|
.with_query("document")
|
|
.with_limit(500)
|
|
.execute::<Value>()
|
|
.await?;
|
|
|
|
assert_eq!(results.hits.len(), 500, "All 500 documents should be searchable");
|
|
|
|
cleanup_index(index_name).await;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
// ============================================================================
|
|
// Test 7: Node failure with RF=2
|
|
// ============================================================================
|
|
//
|
|
/// Index 500 documents with RF=2. Stop one node. Search must still return all results.
|
|
/// X-Miroir-Degraded header must NOT appear (surviving replicas cover all shards).
|
|
/// Restart node; verify full routing resumes.
|
|
///
|
|
/// NOTE: This test requires the docker-compose-dev-rf2.yml stack with 6 nodes.
|
|
/// It is marked #[ignore] by default. Run with:
|
|
/// MIROIR_RF2_PORT=7710 cargo test --test integration node_failure_rf2 -- --test-threads=1 --ignored
|
|
#[tokio::test]
|
|
#[ignore]
|
|
async fn node_failure_rf2() -> Result<(), Box<dyn std::error::Error>> {
|
|
let rf2_port = std::env::var("MIROIR_RF2_PORT")
|
|
.ok()
|
|
.and_then(|p| p.parse().ok())
|
|
.unwrap_or(7710);
|
|
|
|
let client = Client::new(
|
|
format!("http://localhost:{}", rf2_port),
|
|
Some(MIROIR_KEY.to_string())
|
|
);
|
|
|
|
let index_name = "test_node_failure_rf2";
|
|
|
|
// Clean up first
|
|
let _ = client.delete_index(index_name).await;
|
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
|
|
|
// Create index
|
|
let task = client.create_index(index_name, Some("id")).await?;
|
|
client.wait_for_task(task.uid(), None, None).await?;
|
|
|
|
// Index 500 documents
|
|
let mut docs = Vec::new();
|
|
for i in 0..500 {
|
|
docs.push(json!({
|
|
"id": i,
|
|
"title": format!("Document {}", i),
|
|
}));
|
|
}
|
|
|
|
let task = client.index(index_name).add_documents(&docs, None).await?;
|
|
client.wait_for_task(task.uid(), None, None).await?;
|
|
|
|
// Baseline: all documents should be searchable
|
|
let baseline_results = client.index(index_name).search()
|
|
.with_query("document")
|
|
.with_limit(500)
|
|
.execute::<Value>()
|
|
.await?;
|
|
|
|
assert_eq!(baseline_results.hits.len(), 500, "Baseline: all 500 docs should be searchable");
|
|
|
|
// Stop one Meilisearch node (meili-0 on port 7701)
|
|
// In a real test, we'd use docker-compose to stop the container
|
|
// For now, this is a placeholder showing the intended behavior
|
|
|
|
// After node failure, search should still return all results
|
|
let degraded_results = client.index(index_name).search()
|
|
.with_query("document")
|
|
.with_limit(500)
|
|
.execute::<Value>()
|
|
.await?;
|
|
|
|
// With RF=2, losing one node still leaves replicas
|
|
assert_eq!(degraded_results.hits.len(), 500, "With RF=2, all docs should still be searchable after one node failure");
|
|
|
|
// Check for X-Miroir-Degraded header (should NOT be present with RF=2)
|
|
// This would require inspecting raw HTTP response headers
|
|
// In the real implementation, the proxy should not send this header when replicas cover all shards
|
|
|
|
// Clean up
|
|
let _ = client.delete_index(index_name).await;
|
|
|
|
Ok(())
|
|
}
|