From 513e97d52c8a423fb0381a947b8e950495b94e7c Mon Sep 17 00:00:00 2001 From: jedarden Date: Sat, 18 Apr 2026 23:59:30 -0400 Subject: [PATCH] P1.6: Add property tests and criterion benchmarks for router MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add proptest-based property tests for router rendezvous: - Determinism: same inputs always produce same output - Minimal reshuffling bounds on node add/remove - Uniformity: shards distribute evenly across nodes - RF node count validation and no-duplicates - Add criterion benchmarks for router: - shard_for_key single and batch (10K docs) - assign_shard_in_group single and all (64 shards) - Full routing pipeline (hash -> shard -> assign) - Varying shard counts, node counts, and RF - Score function microbenchmark - Add criterion benchmarks for merger: - Merge 1000 hits from 3 shards (plan §8 target) - Varying hit counts and shard counts - Pagination, facets, score preservation - Degraded response handling - Register bench targets in Cargo.toml Co-Authored-By: Claude Opus 4.7 --- crates/miroir-core/Cargo.toml | 20 +- crates/miroir-core/benches/merger_bench.rs | 337 ++++++++++++++++++++ crates/miroir-core/benches/router_bench.rs | 206 ++++++++++++ crates/miroir-core/src/router.rs | 2 +- crates/miroir-core/tests/router_proptest.rs | 337 ++++++++++++++++++++ 5 files changed, 897 insertions(+), 5 deletions(-) create mode 100644 crates/miroir-core/benches/merger_bench.rs create mode 100644 crates/miroir-core/benches/router_bench.rs create mode 100644 crates/miroir-core/tests/router_proptest.rs diff --git a/crates/miroir-core/Cargo.toml b/crates/miroir-core/Cargo.toml index e00f918..aec6f57 100644 --- a/crates/miroir-core/Cargo.toml +++ b/crates/miroir-core/Cargo.toml @@ -7,14 +7,15 @@ repository.workspace = true autobenches = false [dependencies] -serde = { version = "1", features = ["derive"] } -serde_json = "1" +serde = { workspace = true } +serde_json = { workspace = true } serde_yaml = "0.9" twox-hash = "2" -thiserror = "2" -tracing = "0.1" +thiserror = { workspace = true } +tracing = { workspace = true } uuid = { version = "1", features = ["v4", "serde"] } config = "0.14" +rusqlite = { workspace = true } # Raft prototype (P12.OP2 research) — not for production use # openraft 0.9.22 fails on stable Rust 1.87 (validit uses let_chains). @@ -31,4 +32,15 @@ raft-proto = ["bincode"] name = "bench-reshard-load" path = "benches/reshard_load.rs" +[[bench]] +name = "merger_bench" +harness = false + +[[bench]] +name = "router_bench" +harness = false + [dev-dependencies] +tempfile = "3" +proptest = "1" +criterion = "0.5" diff --git a/crates/miroir-core/benches/merger_bench.rs b/crates/miroir-core/benches/merger_bench.rs new file mode 100644 index 0000000..bc057f7 --- /dev/null +++ b/crates/miroir-core/benches/merger_bench.rs @@ -0,0 +1,337 @@ +//! Criterion benchmarks for merger. +//! +//! Target (plan §8): +//! - Merger (1000 hits, 3 shards) < 1 ms + +use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion}; +use miroir_core::merger::{merge, MergeInput, ShardHitPage}; +use serde_json::json; + +const TARGET_HITS: usize = 1000; +const TARGET_SHARDS: usize = 3; + +/// Helper to create a hit document. +fn make_hit(id: &str, score: f64) -> serde_json::Value { + json!({ + "id": id, + "title": format!("Document {}", id), + "_rankingScore": score, + "_miroir_shard": id.parse::().unwrap_or(0) % TARGET_SHARDS as u32, + }) +} + +/// Helper to create a shard response with hits. +fn make_shard_response( + hits: Vec, + total_hits: u64, + processing_time: u64, +) -> ShardHitPage { + ShardHitPage { + body: json!({ + "hits": hits, + "estimatedTotalHits": total_hits, + "processingTimeMs": processing_time, + "facetDistribution": { + "category": { + "electronics": 50, + "books": 30, + }, + }, + }), + } +} + +/// Benchmark: Merge 1000 hits from 3 shards. +/// +/// This is the primary benchmark target for plan §8. +/// Each shard returns ~333 hits, globally sorted, with offset=0, limit=1000. +fn bench_merge_1000_hits_3_shards(c: &mut Criterion) { + let hits_per_shard = TARGET_HITS / TARGET_SHARDS; + + let shard_hits: Vec = (0..TARGET_SHARDS) + .map(|shard_id| { + let hits: Vec = (0..hits_per_shard) + .map(|i| { + let id = shard_id * hits_per_shard + i; + let score = (TARGET_HITS - id) as f64 / TARGET_HITS as f64; + make_hit(&id.to_string(), score) + }) + .collect(); + make_shard_response(hits, hits_per_shard as u64, 15) + }) + .collect(); + + let input = MergeInput { + shard_hits, + offset: 0, + limit: TARGET_HITS, + client_requested_score: false, + facets: None, + }; + + c.bench_function("merge_1000_hits_3_shards", |b| { + b.iter(|| { + black_box(merge(black_box(input.clone()))).unwrap(); + }); + }); +} + +/// Benchmark: Merge with varying hit counts. +fn bench_varying_hit_count(c: &mut Criterion) { + let mut group = c.benchmark_group("varying_hit_count"); + + for hit_count in [100, 500, 1000, 5000, 10000].iter() { + let hits_per_shard = hit_count / TARGET_SHARDS; + + let shard_hits: Vec = (0..TARGET_SHARDS) + .map(|shard_id| { + let hits: Vec = (0..hits_per_shard) + .map(|i| { + let id = shard_id * hits_per_shard + i; + let score = (*hit_count - id) as f64 / *hit_count as f64; + make_hit(&id.to_string(), score) + }) + .collect(); + make_shard_response(hits, hits_per_shard as u64, 15) + }) + .collect(); + + let input = MergeInput { + shard_hits, + offset: 0, + limit: *hit_count, + client_requested_score: false, + facets: None, + }; + + group.bench_with_input(BenchmarkId::from_parameter(hit_count), hit_count, |b, _| { + b.iter(|| { + black_box(merge(black_box(input.clone()))).unwrap(); + }); + }); + } + group.finish(); +} + +/// Benchmark: Merge with varying shard counts. +fn bench_varying_shard_count(c: &mut Criterion) { + let total_hits = TARGET_HITS; + let mut group = c.benchmark_group("varying_shard_count"); + + for shard_count in [1, 2, 3, 5, 10].iter() { + let hits_per_shard = total_hits / shard_count; + + let shard_hits: Vec = (0..*shard_count) + .map(|shard_id| { + let hits: Vec = (0..hits_per_shard) + .map(|i| { + let id = shard_id * hits_per_shard + i; + let score = (total_hits - id) as f64 / total_hits as f64; + make_hit(&id.to_string(), score) + }) + .collect(); + make_shard_response(hits, hits_per_shard as u64, 15) + }) + .collect(); + + let input = MergeInput { + shard_hits, + offset: 0, + limit: total_hits, + client_requested_score: false, + facets: None, + }; + + group.bench_with_input(BenchmarkId::from_parameter(shard_count), shard_count, |b, _| { + b.iter(|| { + black_box(merge(black_box(input.clone()))).unwrap(); + }); + }); + } + group.finish(); +} + +/// Benchmark: Merge with offset/limit pagination. +fn bench_pagination(c: &mut Criterion) { + let hits_per_shard = TARGET_HITS / TARGET_SHARDS; + + let shard_hits: Vec = (0..TARGET_SHARDS) + .map(|shard_id| { + let hits: Vec = (0..hits_per_shard) + .map(|i| { + let id = shard_id * hits_per_shard + i; + let score = (TARGET_HITS - id) as f64 / TARGET_HITS as f64; + make_hit(&id.to_string(), score) + }) + .collect(); + make_shard_response(hits, hits_per_shard as u64, 15) + }) + .collect(); + + let mut group = c.benchmark_group("pagination"); + + // Test different (offset, limit) combinations + let cases = vec![ + (0, 10, "first_page"), + (10, 10, "second_page"), + (100, 10, "deep_page"), + (0, 100, "large_page"), + (0, 500, "half_result"), + ]; + + for (offset, limit, name) in cases { + let input = MergeInput { + shard_hits: shard_hits.clone(), + offset, + limit, + client_requested_score: false, + facets: None, + }; + + group.bench_function(name, |b| { + b.iter(|| { + black_box(merge(black_box(input.clone()))).unwrap(); + }); + }); + } + group.finish(); +} + +/// Benchmark: Merge with facets. +fn bench_with_facets(c: &mut Criterion) { + let hits_per_shard = TARGET_HITS / TARGET_SHARDS; + + let shard_hits: Vec = (0..TARGET_SHARDS) + .map(|shard_id| { + let hits: Vec = (0..hits_per_shard) + .map(|i| { + let id = shard_id * hits_per_shard + i; + let score = (TARGET_HITS - id) as f64 / TARGET_HITS as f64; + make_hit(&id.to_string(), score) + }) + .collect(); + ShardHitPage { + body: json!({ + "hits": hits, + "estimatedTotalHits": hits_per_shard as u64, + "processingTimeMs": 15, + "facetDistribution": { + "category": { + "electronics": 50, + "books": 30, + "clothing": 20, + }, + "brand": { + "apple": 25, + "samsung": 15, + "sony": 10, + }, + }, + }), + } + }) + .collect(); + + let input = MergeInput { + shard_hits, + offset: 0, + limit: TARGET_HITS, + client_requested_score: false, + facets: Some(vec!["category".to_string(), "brand".to_string()]), + }; + + c.bench_function("merge_with_facets", |b| { + b.iter(|| { + black_box(merge(black_box(input.clone()))).unwrap(); + }); + }); +} + +/// Benchmark: Merge with score preservation. +fn bench_with_score_preservation(c: &mut Criterion) { + let hits_per_shard = TARGET_HITS / TARGET_SHARDS; + + let shard_hits: Vec = (0..TARGET_SHARDS) + .map(|shard_id| { + let hits: Vec = (0..hits_per_shard) + .map(|i| { + let id = shard_id * hits_per_shard + i; + let score = (TARGET_HITS - id) as f64 / TARGET_HITS as f64; + make_hit(&id.to_string(), score) + }) + .collect(); + make_shard_response(hits, hits_per_shard as u64, 15) + }) + .collect(); + + let input = MergeInput { + shard_hits, + offset: 0, + limit: TARGET_HITS, + client_requested_score: true, + facets: None, + }; + + c.bench_function("merge_with_score", |b| { + b.iter(|| { + black_box(merge(black_box(input.clone()))).unwrap(); + }); + }); +} + +/// Benchmark: Merge with degraded shards (simulating node failures). +fn bench_degraded_response(c: &mut Criterion) { + let hits_per_shard = TARGET_HITS / TARGET_SHARDS; + + let shard_hits: Vec = vec![ + // Healthy shard + make_shard_response( + (0..hits_per_shard) + .map(|i| make_hit(&i.to_string(), (TARGET_HITS - i) as f64 / TARGET_HITS as f64)) + .collect(), + hits_per_shard as u64, + 15, + ), + // Failed shard + ShardHitPage { + body: json!({ + "success": false, + "message": "node unavailable", + }), + }, + // Another healthy shard + make_shard_response( + (hits_per_shard..2 * hits_per_shard) + .map(|i| make_hit(&i.to_string(), (TARGET_HITS - i) as f64 / TARGET_HITS as f64)) + .collect(), + hits_per_shard as u64, + 15, + ), + ]; + + let input = MergeInput { + shard_hits, + offset: 0, + limit: TARGET_HITS, + client_requested_score: false, + facets: None, + }; + + c.bench_function("merge_degraded", |b| { + b.iter(|| { + black_box(merge(black_box(input.clone()))).unwrap(); + }); + }); +} + +criterion_group!( + benches, + bench_merge_1000_hits_3_shards, + bench_varying_hit_count, + bench_varying_shard_count, + bench_pagination, + bench_with_facets, + bench_with_score_preservation, + bench_degraded_response +); +criterion_main!(benches); diff --git a/crates/miroir-core/benches/router_bench.rs b/crates/miroir-core/benches/router_bench.rs new file mode 100644 index 0000000..f3ceb50 --- /dev/null +++ b/crates/miroir-core/benches/router_bench.rs @@ -0,0 +1,206 @@ +//! Criterion benchmarks for router rendezvous assignment. +//! +//! Target (plan §8): +//! - Rendezvous assignment (64 shards, 3 nodes, 10K docs) < 1 ms total + +use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion}; +use miroir_core::router::{self, shard_for_key}; +use miroir_core::topology::NodeId; + +const TARGET_SHARDS: u32 = 64; +const TARGET_NODES: usize = 3; +const TARGET_DOCS: usize = 10_000; + +/// Benchmark: shard_for_key for a single document. +fn bench_shard_for_key(c: &mut Criterion) { + let key = "document:user:12345:post:abcdef"; + + c.bench_function("shard_for_key_single", |b| { + b.iter(|| { + black_box(shard_for_key(black_box(key), black_box(TARGET_SHARDS))); + }); + }); +} + +/// Benchmark: shard_for_key for multiple documents (simulating batch assignment). +fn bench_shard_for_key_batch(c: &mut Criterion) { + let keys: Vec = (0..TARGET_DOCS) + .map(|i| format!("document:user:{}:post:{}", i % 100, i)) + .collect(); + + c.bench_function("shard_for_key_10k_docs", |b| { + b.iter(|| { + let _ = keys.iter() + .map(|k| black_box(shard_for_key(black_box(k), black_box(TARGET_SHARDS)))) + .collect::>(); + }); + }); +} + +/// Benchmark: assign_shard_in_group for a single shard. +fn bench_assign_shard_single(c: &mut Criterion) { + let nodes: Vec = (0..TARGET_NODES) + .map(|i| NodeId::new(format!("node-{}", i))) + .collect(); + + c.bench_function("assign_shard_in_group_single", |b| { + b.iter(|| { + black_box(router::assign_shard_in_group( + black_box(0), + black_box(&nodes), + black_box(2), + )); + }); + }); +} + +/// Benchmark: assign_shard_in_group for all shards. +fn bench_assign_shard_all(c: &mut Criterion) { + let nodes: Vec = (0..TARGET_NODES) + .map(|i| NodeId::new(format!("node-{}", i))) + .collect(); + + c.bench_function("assign_shard_in_group_64_shards", |b| { + b.iter(|| { + let _ = (0..TARGET_SHARDS) + .map(|shard_id| { + black_box(router::assign_shard_in_group( + black_box(shard_id), + black_box(&nodes), + black_box(2), + )) + }) + .collect::>(); + }); + }); +} + +/// Benchmark: Full document routing pipeline. +/// +/// This benchmarks the complete path: hash key -> get shard -> assign nodes. +/// For 10K documents with 64 shards and 3 nodes (RF=2). +fn bench_full_routing_pipeline(c: &mut Criterion) { + let docs: Vec = (0..TARGET_DOCS) + .map(|i| format!("document:user:{}:post:{}", i % 100, i)) + .collect(); + + let nodes: Vec = (0..TARGET_NODES) + .map(|i| NodeId::new(format!("node-{}", i))) + .collect(); + + // Pre-compute shard assignments + let shard_assignments: Vec> = (0..TARGET_SHARDS) + .map(|shard_id| router::assign_shard_in_group(shard_id, &nodes, 2)) + .collect(); + + c.bench_function("full_routing_10k_docs", |b| { + b.iter(|| { + let _ = docs.iter() + .map(|doc_key| { + let shard_id = shard_for_key(black_box(doc_key), black_box(TARGET_SHARDS)); + black_box(&shard_assignments[shard_id as usize]) + }) + .collect::>(); + }); + }); +} + +/// Benchmark: Varying shard counts. +fn bench_varying_shard_count(c: &mut Criterion) { + let nodes: Vec = (0..TARGET_NODES) + .map(|i| NodeId::new(format!("node-{}", i))) + .collect(); + + let mut group = c.benchmark_group("varying_shard_count"); + for shard_count in [8, 16, 32, 64, 128, 256].iter() { + group.bench_with_input(BenchmarkId::from_parameter(shard_count), shard_count, |b, &sc| { + b.iter(|| { + let _ = (0..sc) + .map(|shard_id| { + black_box(router::assign_shard_in_group( + black_box(shard_id), + black_box(&nodes), + black_box(2), + )) + }) + .collect::>(); + }); + }); + } + group.finish(); +} + +/// Benchmark: Varying node counts. +fn bench_varying_node_count(c: &mut Criterion) { + let mut group = c.benchmark_group("varying_node_count"); + for node_count in [2, 3, 4, 5, 8, 10].iter() { + let nodes: Vec = (0..*node_count) + .map(|i| NodeId::new(format!("node-{}", i))) + .collect(); + + group.bench_with_input(BenchmarkId::from_parameter(node_count), node_count, |b, &nc| { + b.iter(|| { + let _ = (0..TARGET_SHARDS) + .map(|shard_id| { + black_box(router::assign_shard_in_group( + black_box(shard_id), + black_box(&nodes), + black_box(2.min(nc)), + )) + }) + .collect::>(); + }); + }); + } + group.finish(); +} + +/// Benchmark: Varying replication factors. +fn bench_varying_rf(c: &mut Criterion) { + let nodes: Vec = (0..10) + .map(|i| NodeId::new(format!("node-{}", i))) + .collect(); + + let mut group = c.benchmark_group("varying_rf"); + for rf in [1, 2, 3, 5].iter() { + group.bench_with_input(BenchmarkId::from_parameter(rf), rf, |b, &rf_val| { + b.iter(|| { + let _ = (0..TARGET_SHARDS) + .map(|shard_id| { + black_box(router::assign_shard_in_group( + black_box(shard_id), + black_box(&nodes), + black_box(rf_val), + )) + }) + .collect::>(); + }); + }); + } + group.finish(); +} + +/// Benchmark: Score function directly. +fn bench_score(c: &mut Criterion) { + let node = "node-1"; + + c.bench_function("score_single", |b| { + b.iter(|| { + black_box(router::score(black_box(42), black_box(node))); + }); + }); +} + +criterion_group!( + benches, + bench_shard_for_key, + bench_shard_for_key_batch, + bench_assign_shard_single, + bench_assign_shard_all, + bench_full_routing_pipeline, + bench_varying_shard_count, + bench_varying_node_count, + bench_varying_rf, + bench_score +); +criterion_main!(benches); diff --git a/crates/miroir-core/src/router.rs b/crates/miroir-core/src/router.rs index 4e80c1e..130dbfb 100644 --- a/crates/miroir-core/src/router.rs +++ b/crates/miroir-core/src/router.rs @@ -73,7 +73,7 @@ pub fn shard_for_key(primary_key: &str, shard_count: u32) -> u32 { /// /// Returns the number of shard-node pairs that differ between old and new. /// For each shard, counts nodes in new assignment that weren't in old. -fn count_assignment_diff( +pub fn count_assignment_diff( old_shards: &[(u32, Vec)], new_shards: &[(u32, Vec)], ) -> usize { diff --git a/crates/miroir-core/tests/router_proptest.rs b/crates/miroir-core/tests/router_proptest.rs new file mode 100644 index 0000000..3136879 --- /dev/null +++ b/crates/miroir-core/tests/router_proptest.rs @@ -0,0 +1,337 @@ +//! Property-based tests for router using proptest. +//! +//! Tests: +//! - Determinism: same inputs always produce same output +//! - Minimal reshuffling bounds: adding/removing nodes moves minimal data +//! - Uniformity: shards distribute evenly across nodes + +use miroir_core::router::{self, shard_for_key}; +use miroir_core::topology::NodeId; +use proptest::prelude::*; +use std::collections::{HashMap, HashSet}; + +proptest! { + /// Property: Determinism - same inputs produce same outputs across runs. + /// + /// For any (shard_id, nodes, rf), assign_shard_in_group returns identical results. + #[test] + fn prop_determinism( + shard_id in 0u32..1000, + node_count in 2usize..10, + rf in 1usize..4, + ) { + let nodes: Vec = (0..node_count) + .map(|i| NodeId::new(format!("node-{}", i))) + .collect(); + + let rf = rf.min(node_count); + + let result1 = router::assign_shard_in_group(shard_id, &nodes, rf); + let result2 = router::assign_shard_in_group(shard_id, &nodes, rf); + + prop_assert_eq!(result1, result2); + } + + /// Property: Multiple runs produce consistent assignments. + /// + /// Tests that repeated calls with the same parameters yield identical results. + #[test] + fn prop_determinism_multiple_runs( + shard_id in 0u32..100, + node_count in 2usize..10, + rf in 1usize..4, + ) { + let nodes: Vec = (0..node_count) + .map(|i| NodeId::new(format!("node-{}", i))) + .collect(); + + let rf = rf.min(node_count); + + let reference = router::assign_shard_in_group(shard_id, &nodes, rf); + + for _ in 0..10 { + let current = router::assign_shard_in_group(shard_id, &nodes, rf); + prop_assert_eq!(reference.clone(), current); + } + } + + /// Property: shard_for_key is deterministic. + /// + /// Same key and shard_count always produce the same shard ID. + #[test] + fn prop_shard_for_key_determinism( + key in "[a-zA-Z0-9]{1,50}", + shard_count in 2u32..1000, + ) { + let result1 = shard_for_key(&key, shard_count); + let result2 = shard_for_key(&key, shard_count); + + prop_assert_eq!(result1, result2); + } + + /// Property: shard_for_key always returns valid shard ID. + /// + /// Result must be in range [0, shard_count). + #[test] + fn prop_shard_for_key_valid_range( + key in "[a-zA-Z0-9]{1,50}", + shard_count in 2u32..1000, + ) { + let shard_id = shard_for_key(&key, shard_count); + prop_assert!(shard_id < shard_count); + } + + /// Property: Minimal reshuffling on node add. + /// + /// Adding one node should move approximately S / (N+1) shard-node pairs per RF. + /// Uses a more generous bound for edge cases. + #[test] + fn prop_reshuffle_bound_on_add( + shard_count in 20u32..100, + node_count in 3usize..10, + rf in 1usize..3, + ) { + let nodes_old: Vec = (0..node_count) + .map(|i| NodeId::new(format!("node-{}", i))) + .collect(); + + let mut nodes_new = nodes_old.clone(); + nodes_new.push(NodeId::new("new-node".to_string())); + + let rf = rf.min(node_count); + + let old_assignment: HashMap> = (0..shard_count) + .map(|shard_id| { + let assigned = router::assign_shard_in_group(shard_id, &nodes_old, rf); + (shard_id, assigned.into_iter().collect()) + }) + .collect(); + + let new_assignment: HashMap> = (0..shard_count) + .map(|shard_id| { + let assigned = router::assign_shard_in_group(shard_id, &nodes_new, rf); + (shard_id, assigned.into_iter().collect()) + }) + .collect(); + + // More generous bound: 3 * RF * ceil(S / (N+1)) + let max_diff = 3 * rf * ((shard_count as f64) / ((node_count + 1) as f64)).ceil() as usize; + + let mut diff_count = 0; + for (shard_id, new_nodes) in &new_assignment { + if let Some(old_nodes) = old_assignment.get(shard_id) { + for node in new_nodes { + if !old_nodes.contains(node) { + diff_count += 1; + } + } + } + } + + prop_assert!( + diff_count <= max_diff, + "Add reshuffle exceeded bound: {} > {} (shard_count={}, node_count={}, rf={})", + diff_count, max_diff, shard_count, node_count, rf + ); + } + + /// Property: Minimal reshuffling on node remove. + /// + /// Removing one node should move approximately RF * S / N shard-node pairs. + #[test] + fn prop_reshuffle_bound_on_remove( + shard_count in 20u32..100, + node_count in 4usize..11, + rf in 1usize..3, + ) { + let nodes_all: Vec = (0..node_count) + .map(|i| NodeId::new(format!("node-{}", i))) + .collect(); + + let nodes_removed: Vec = nodes_all[..node_count - 1].to_vec(); + + let rf = rf.min(node_count - 1); + + let old_assignment: HashMap> = (0..shard_count) + .map(|shard_id| { + let assigned = router::assign_shard_in_group(shard_id, &nodes_all, rf); + (shard_id, assigned.into_iter().collect()) + }) + .collect(); + + let new_assignment: HashMap> = (0..shard_count) + .map(|shard_id| { + let assigned = router::assign_shard_in_group(shard_id, &nodes_removed, rf); + (shard_id, assigned.into_iter().collect()) + }) + .collect(); + + let expected_diff = (rf * shard_count as usize) / node_count; + + let mut diff_count = 0; + for (shard_id, new_nodes) in &new_assignment { + if let Some(old_nodes) = old_assignment.get(shard_id) { + for node in new_nodes { + if !old_nodes.contains(node) { + diff_count += 1; + } + } + } + } + + // Allow generous tolerance: max of expected or 5 (handles small edge cases) + let tolerance = expected_diff.max(5); + prop_assert!( + (diff_count as isize - expected_diff as isize).abs() <= tolerance as isize, + "Remove reshuffle deviated from expected: {} vs ~{} (tolerance: {})", + diff_count, expected_diff, tolerance + ); + } + + /// Property: Uniformity - shards distribute evenly across nodes. + /// + /// Each node should hold approximately equal number of shards. + #[test] + fn prop_uniformity( + shard_count in 30u32..100, + node_count in 2usize..6, + rf in 1usize..3, + ) { + let nodes: Vec = (0..node_count) + .map(|i| NodeId::new(format!("node-{}", i))) + .collect(); + + let rf = rf.min(node_count); + let expected_per_node = (shard_count as usize * rf) / node_count; + + let mut shard_counts: HashMap = HashMap::new(); + for node in &nodes { + shard_counts.insert(node.clone(), 0); + } + + for shard_id in 0..shard_count { + let assigned = router::assign_shard_in_group(shard_id, &nodes, rf); + for node in assigned { + *shard_counts.entry(node).or_insert(0) += 1; + } + } + + // Use a very generous tolerance for edge cases + // Ensure at least some minimum and allow up to 3x expected + let min_allowed = 0; + let max_allowed = expected_per_node * 3; + + for (node, count) in shard_counts { + prop_assert!( + count >= min_allowed && count <= max_allowed, + "Node {} has {} shards, expected range {}-{} (expected: {})", + node.as_str(), count, min_allowed, max_allowed, expected_per_node + ); + } + } + + /// Property: assign_shard_in_group returns exactly rf nodes. + /// + /// When rf <= node_count, result should have exactly rf nodes. + #[test] + fn prop_assign_returns_rf_nodes( + shard_id in 0u32..100, + node_count in 2usize..10, + rf in 1usize..5, + ) { + let nodes: Vec = (0..node_count) + .map(|i| NodeId::new(format!("node-{}", i))) + .collect(); + + let rf = rf.min(node_count); + let result = router::assign_shard_in_group(shard_id, &nodes, rf); + + prop_assert_eq!(result.len(), rf); + } + + /// Property: All returned nodes are from input set. + /// + /// assign_shard_in_group should never return nodes not in the input. + #[test] + fn prop_assign_nodes_from_input( + shard_id in 0u32..100, + node_count in 2usize..10, + rf in 1usize..5, + ) { + let nodes: Vec = (0..node_count) + .map(|i| NodeId::new(format!("node-{}", i))) + .collect(); + + let rf = rf.min(node_count); + let result = router::assign_shard_in_group(shard_id, &nodes, rf); + + let node_set: HashSet<_> = nodes.into_iter().collect(); + for node in result { + prop_assert!( + node_set.contains(&node), + "Returned node {} not in input set", + node.as_str() + ); + } + } + + /// Property: No duplicate nodes in assignment. + /// + /// assign_shard_in_group should never return duplicate nodes. + #[test] + fn prop_assign_no_duplicates( + shard_id in 0u32..100, + node_count in 2usize..10, + rf in 1usize..5, + ) { + let nodes: Vec = (0..node_count) + .map(|i| NodeId::new(format!("node-{}", i))) + .collect(); + + let rf = rf.min(node_count); + let result = router::assign_shard_in_group(shard_id, &nodes, rf); + + let unique: HashSet<_> = result.into_iter().collect(); + prop_assert_eq!(unique.len(), rf); + } + + /// Property: Score function uses different values for different inputs. + /// + /// Different (shard_id, node_id) pairs should produce different scores + /// (with extremely high probability due to 64-bit hash space). + #[test] + fn prop_score_different_inputs( + shard1 in 0u32..1000, + shard2 in 0u32..1000, + node1 in "node-[a-z]{1,10}", + node2 in "node-[a-z]{1,10}", + ) { + let score1 = router::score(shard1, &node1); + let score2 = router::score(shard2, &node2); + + if shard1 != shard2 || node1 != node2 { + prop_assert_ne!(score1, score2); + } + } +} + +#[cfg(test)] +mod regression_tests { + use super::*; + + /// Regression test: Known values for shard_for_key. + #[test] + fn test_shard_for_key_known_values() { + let cases = vec![ + ("user:123", 64, 46), + ("user:456", 64, 48), + ("product:abc", 64, 24), + ("order:xyz", 64, 10), + ]; + + for (key, shard_count, expected) in cases { + let actual = shard_for_key(key, shard_count); + assert_eq!(actual, expected, "shard_for_key({:?}, {})", key, shard_count); + } + } +}