P1.6: Add property tests and criterion benchmarks for router
- Add proptest-based property tests for router rendezvous:
  - Determinism: same inputs always produce same output
  - Minimal reshuffling bounds on node add/remove
  - Uniformity: shards distribute evenly across nodes
  - RF node count validation and no-duplicates
- Add criterion benchmarks for router:
  - shard_for_key single and batch (10K docs)
  - assign_shard_in_group single and all (64 shards)
  - Full routing pipeline (hash -> shard -> assign)
  - Varying shard counts, node counts, and RF
  - Score function microbenchmark
- Add criterion benchmarks for merger:
  - Merge 1000 hits from 3 shards (plan §8 target)
  - Varying hit counts and shard counts
  - Pagination, facets, score preservation
  - Degraded response handling
- Register bench targets in Cargo.toml

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
72f9a197b5
commit
513e97d52c
5 changed files with 897 additions and 5 deletions
|
|
@ -7,14 +7,15 @@ repository.workspace = true
|
|||
autobenches = false
|
||||
|
||||
[dependencies]
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
serde_yaml = "0.9"
|
||||
twox-hash = "2"
|
||||
thiserror = "2"
|
||||
tracing = "0.1"
|
||||
thiserror = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
uuid = { version = "1", features = ["v4", "serde"] }
|
||||
config = "0.14"
|
||||
rusqlite = { workspace = true }
|
||||
|
||||
# Raft prototype (P12.OP2 research) — not for production use
|
||||
# openraft 0.9.22 fails on stable Rust 1.87 (validit uses let_chains).
|
||||
|
|
@ -31,4 +32,15 @@ raft-proto = ["bincode"]
|
|||
name = "bench-reshard-load"
|
||||
path = "benches/reshard_load.rs"
|
||||
|
||||
[[bench]]
|
||||
name = "merger_bench"
|
||||
harness = false
|
||||
|
||||
[[bench]]
|
||||
name = "router_bench"
|
||||
harness = false
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "3"
|
||||
proptest = "1"
|
||||
criterion = "0.5"
|
||||
|
|
|
|||
337
crates/miroir-core/benches/merger_bench.rs
Normal file
337
crates/miroir-core/benches/merger_bench.rs
Normal file
|
|
@ -0,0 +1,337 @@
|
|||
//! Criterion benchmarks for merger.
|
||||
//!
|
||||
//! Target (plan §8):
|
||||
//! - Merger (1000 hits, 3 shards) < 1 ms
|
||||
|
||||
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
|
||||
use miroir_core::merger::{merge, MergeInput, ShardHitPage};
|
||||
use serde_json::json;
|
||||
|
||||
const TARGET_HITS: usize = 1000;
|
||||
const TARGET_SHARDS: usize = 3;
|
||||
|
||||
/// Helper to create a hit document.
|
||||
fn make_hit(id: &str, score: f64) -> serde_json::Value {
|
||||
json!({
|
||||
"id": id,
|
||||
"title": format!("Document {}", id),
|
||||
"_rankingScore": score,
|
||||
"_miroir_shard": id.parse::<u32>().unwrap_or(0) % TARGET_SHARDS as u32,
|
||||
})
|
||||
}
|
||||
|
||||
/// Helper to create a shard response with hits.
|
||||
fn make_shard_response(
|
||||
hits: Vec<serde_json::Value>,
|
||||
total_hits: u64,
|
||||
processing_time: u64,
|
||||
) -> ShardHitPage {
|
||||
ShardHitPage {
|
||||
body: json!({
|
||||
"hits": hits,
|
||||
"estimatedTotalHits": total_hits,
|
||||
"processingTimeMs": processing_time,
|
||||
"facetDistribution": {
|
||||
"category": {
|
||||
"electronics": 50,
|
||||
"books": 30,
|
||||
},
|
||||
},
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Benchmark: Merge 1000 hits from 3 shards.
|
||||
///
|
||||
/// This is the primary benchmark target for plan §8.
|
||||
/// Each shard returns ~333 hits, globally sorted, with offset=0, limit=1000.
|
||||
fn bench_merge_1000_hits_3_shards(c: &mut Criterion) {
|
||||
let hits_per_shard = TARGET_HITS / TARGET_SHARDS;
|
||||
|
||||
let shard_hits: Vec<ShardHitPage> = (0..TARGET_SHARDS)
|
||||
.map(|shard_id| {
|
||||
let hits: Vec<serde_json::Value> = (0..hits_per_shard)
|
||||
.map(|i| {
|
||||
let id = shard_id * hits_per_shard + i;
|
||||
let score = (TARGET_HITS - id) as f64 / TARGET_HITS as f64;
|
||||
make_hit(&id.to_string(), score)
|
||||
})
|
||||
.collect();
|
||||
make_shard_response(hits, hits_per_shard as u64, 15)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let input = MergeInput {
|
||||
shard_hits,
|
||||
offset: 0,
|
||||
limit: TARGET_HITS,
|
||||
client_requested_score: false,
|
||||
facets: None,
|
||||
};
|
||||
|
||||
c.bench_function("merge_1000_hits_3_shards", |b| {
|
||||
b.iter(|| {
|
||||
black_box(merge(black_box(input.clone()))).unwrap();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/// Benchmark: Merge with varying hit counts.
|
||||
fn bench_varying_hit_count(c: &mut Criterion) {
|
||||
let mut group = c.benchmark_group("varying_hit_count");
|
||||
|
||||
for hit_count in [100, 500, 1000, 5000, 10000].iter() {
|
||||
let hits_per_shard = hit_count / TARGET_SHARDS;
|
||||
|
||||
let shard_hits: Vec<ShardHitPage> = (0..TARGET_SHARDS)
|
||||
.map(|shard_id| {
|
||||
let hits: Vec<serde_json::Value> = (0..hits_per_shard)
|
||||
.map(|i| {
|
||||
let id = shard_id * hits_per_shard + i;
|
||||
let score = (*hit_count - id) as f64 / *hit_count as f64;
|
||||
make_hit(&id.to_string(), score)
|
||||
})
|
||||
.collect();
|
||||
make_shard_response(hits, hits_per_shard as u64, 15)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let input = MergeInput {
|
||||
shard_hits,
|
||||
offset: 0,
|
||||
limit: *hit_count,
|
||||
client_requested_score: false,
|
||||
facets: None,
|
||||
};
|
||||
|
||||
group.bench_with_input(BenchmarkId::from_parameter(hit_count), hit_count, |b, _| {
|
||||
b.iter(|| {
|
||||
black_box(merge(black_box(input.clone()))).unwrap();
|
||||
});
|
||||
});
|
||||
}
|
||||
group.finish();
|
||||
}
|
||||
|
||||
/// Benchmark: Merge with varying shard counts.
|
||||
fn bench_varying_shard_count(c: &mut Criterion) {
|
||||
let total_hits = TARGET_HITS;
|
||||
let mut group = c.benchmark_group("varying_shard_count");
|
||||
|
||||
for shard_count in [1, 2, 3, 5, 10].iter() {
|
||||
let hits_per_shard = total_hits / shard_count;
|
||||
|
||||
let shard_hits: Vec<ShardHitPage> = (0..*shard_count)
|
||||
.map(|shard_id| {
|
||||
let hits: Vec<serde_json::Value> = (0..hits_per_shard)
|
||||
.map(|i| {
|
||||
let id = shard_id * hits_per_shard + i;
|
||||
let score = (total_hits - id) as f64 / total_hits as f64;
|
||||
make_hit(&id.to_string(), score)
|
||||
})
|
||||
.collect();
|
||||
make_shard_response(hits, hits_per_shard as u64, 15)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let input = MergeInput {
|
||||
shard_hits,
|
||||
offset: 0,
|
||||
limit: total_hits,
|
||||
client_requested_score: false,
|
||||
facets: None,
|
||||
};
|
||||
|
||||
group.bench_with_input(BenchmarkId::from_parameter(shard_count), shard_count, |b, _| {
|
||||
b.iter(|| {
|
||||
black_box(merge(black_box(input.clone()))).unwrap();
|
||||
});
|
||||
});
|
||||
}
|
||||
group.finish();
|
||||
}
|
||||
|
||||
/// Benchmark: Merge with offset/limit pagination.
|
||||
fn bench_pagination(c: &mut Criterion) {
|
||||
let hits_per_shard = TARGET_HITS / TARGET_SHARDS;
|
||||
|
||||
let shard_hits: Vec<ShardHitPage> = (0..TARGET_SHARDS)
|
||||
.map(|shard_id| {
|
||||
let hits: Vec<serde_json::Value> = (0..hits_per_shard)
|
||||
.map(|i| {
|
||||
let id = shard_id * hits_per_shard + i;
|
||||
let score = (TARGET_HITS - id) as f64 / TARGET_HITS as f64;
|
||||
make_hit(&id.to_string(), score)
|
||||
})
|
||||
.collect();
|
||||
make_shard_response(hits, hits_per_shard as u64, 15)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let mut group = c.benchmark_group("pagination");
|
||||
|
||||
// Test different (offset, limit) combinations
|
||||
let cases = vec![
|
||||
(0, 10, "first_page"),
|
||||
(10, 10, "second_page"),
|
||||
(100, 10, "deep_page"),
|
||||
(0, 100, "large_page"),
|
||||
(0, 500, "half_result"),
|
||||
];
|
||||
|
||||
for (offset, limit, name) in cases {
|
||||
let input = MergeInput {
|
||||
shard_hits: shard_hits.clone(),
|
||||
offset,
|
||||
limit,
|
||||
client_requested_score: false,
|
||||
facets: None,
|
||||
};
|
||||
|
||||
group.bench_function(name, |b| {
|
||||
b.iter(|| {
|
||||
black_box(merge(black_box(input.clone()))).unwrap();
|
||||
});
|
||||
});
|
||||
}
|
||||
group.finish();
|
||||
}
|
||||
|
||||
/// Benchmark: Merge with facets.
|
||||
fn bench_with_facets(c: &mut Criterion) {
|
||||
let hits_per_shard = TARGET_HITS / TARGET_SHARDS;
|
||||
|
||||
let shard_hits: Vec<ShardHitPage> = (0..TARGET_SHARDS)
|
||||
.map(|shard_id| {
|
||||
let hits: Vec<serde_json::Value> = (0..hits_per_shard)
|
||||
.map(|i| {
|
||||
let id = shard_id * hits_per_shard + i;
|
||||
let score = (TARGET_HITS - id) as f64 / TARGET_HITS as f64;
|
||||
make_hit(&id.to_string(), score)
|
||||
})
|
||||
.collect();
|
||||
ShardHitPage {
|
||||
body: json!({
|
||||
"hits": hits,
|
||||
"estimatedTotalHits": hits_per_shard as u64,
|
||||
"processingTimeMs": 15,
|
||||
"facetDistribution": {
|
||||
"category": {
|
||||
"electronics": 50,
|
||||
"books": 30,
|
||||
"clothing": 20,
|
||||
},
|
||||
"brand": {
|
||||
"apple": 25,
|
||||
"samsung": 15,
|
||||
"sony": 10,
|
||||
},
|
||||
},
|
||||
}),
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
let input = MergeInput {
|
||||
shard_hits,
|
||||
offset: 0,
|
||||
limit: TARGET_HITS,
|
||||
client_requested_score: false,
|
||||
facets: Some(vec!["category".to_string(), "brand".to_string()]),
|
||||
};
|
||||
|
||||
c.bench_function("merge_with_facets", |b| {
|
||||
b.iter(|| {
|
||||
black_box(merge(black_box(input.clone()))).unwrap();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/// Benchmark: Merge with score preservation.
|
||||
fn bench_with_score_preservation(c: &mut Criterion) {
|
||||
let hits_per_shard = TARGET_HITS / TARGET_SHARDS;
|
||||
|
||||
let shard_hits: Vec<ShardHitPage> = (0..TARGET_SHARDS)
|
||||
.map(|shard_id| {
|
||||
let hits: Vec<serde_json::Value> = (0..hits_per_shard)
|
||||
.map(|i| {
|
||||
let id = shard_id * hits_per_shard + i;
|
||||
let score = (TARGET_HITS - id) as f64 / TARGET_HITS as f64;
|
||||
make_hit(&id.to_string(), score)
|
||||
})
|
||||
.collect();
|
||||
make_shard_response(hits, hits_per_shard as u64, 15)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let input = MergeInput {
|
||||
shard_hits,
|
||||
offset: 0,
|
||||
limit: TARGET_HITS,
|
||||
client_requested_score: true,
|
||||
facets: None,
|
||||
};
|
||||
|
||||
c.bench_function("merge_with_score", |b| {
|
||||
b.iter(|| {
|
||||
black_box(merge(black_box(input.clone()))).unwrap();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/// Benchmark: Merge with degraded shards (simulating node failures).
|
||||
fn bench_degraded_response(c: &mut Criterion) {
|
||||
let hits_per_shard = TARGET_HITS / TARGET_SHARDS;
|
||||
|
||||
let shard_hits: Vec<ShardHitPage> = vec![
|
||||
// Healthy shard
|
||||
make_shard_response(
|
||||
(0..hits_per_shard)
|
||||
.map(|i| make_hit(&i.to_string(), (TARGET_HITS - i) as f64 / TARGET_HITS as f64))
|
||||
.collect(),
|
||||
hits_per_shard as u64,
|
||||
15,
|
||||
),
|
||||
// Failed shard
|
||||
ShardHitPage {
|
||||
body: json!({
|
||||
"success": false,
|
||||
"message": "node unavailable",
|
||||
}),
|
||||
},
|
||||
// Another healthy shard
|
||||
make_shard_response(
|
||||
(hits_per_shard..2 * hits_per_shard)
|
||||
.map(|i| make_hit(&i.to_string(), (TARGET_HITS - i) as f64 / TARGET_HITS as f64))
|
||||
.collect(),
|
||||
hits_per_shard as u64,
|
||||
15,
|
||||
),
|
||||
];
|
||||
|
||||
let input = MergeInput {
|
||||
shard_hits,
|
||||
offset: 0,
|
||||
limit: TARGET_HITS,
|
||||
client_requested_score: false,
|
||||
facets: None,
|
||||
};
|
||||
|
||||
c.bench_function("merge_degraded", |b| {
|
||||
b.iter(|| {
|
||||
black_box(merge(black_box(input.clone()))).unwrap();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
criterion_group!(
|
||||
benches,
|
||||
bench_merge_1000_hits_3_shards,
|
||||
bench_varying_hit_count,
|
||||
bench_varying_shard_count,
|
||||
bench_pagination,
|
||||
bench_with_facets,
|
||||
bench_with_score_preservation,
|
||||
bench_degraded_response
|
||||
);
|
||||
criterion_main!(benches);
|
||||
206
crates/miroir-core/benches/router_bench.rs
Normal file
206
crates/miroir-core/benches/router_bench.rs
Normal file
|
|
@ -0,0 +1,206 @@
|
|||
//! Criterion benchmarks for router rendezvous assignment.
|
||||
//!
|
||||
//! Target (plan §8):
|
||||
//! - Rendezvous assignment (64 shards, 3 nodes, 10K docs) < 1 ms total
|
||||
|
||||
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
|
||||
use miroir_core::router::{self, shard_for_key};
|
||||
use miroir_core::topology::NodeId;
|
||||
|
||||
const TARGET_SHARDS: u32 = 64;
|
||||
const TARGET_NODES: usize = 3;
|
||||
const TARGET_DOCS: usize = 10_000;
|
||||
|
||||
/// Benchmark: shard_for_key for a single document.
|
||||
fn bench_shard_for_key(c: &mut Criterion) {
|
||||
let key = "document:user:12345:post:abcdef";
|
||||
|
||||
c.bench_function("shard_for_key_single", |b| {
|
||||
b.iter(|| {
|
||||
black_box(shard_for_key(black_box(key), black_box(TARGET_SHARDS)));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/// Benchmark: shard_for_key for multiple documents (simulating batch assignment).
|
||||
fn bench_shard_for_key_batch(c: &mut Criterion) {
|
||||
let keys: Vec<String> = (0..TARGET_DOCS)
|
||||
.map(|i| format!("document:user:{}:post:{}", i % 100, i))
|
||||
.collect();
|
||||
|
||||
c.bench_function("shard_for_key_10k_docs", |b| {
|
||||
b.iter(|| {
|
||||
let _ = keys.iter()
|
||||
.map(|k| black_box(shard_for_key(black_box(k), black_box(TARGET_SHARDS))))
|
||||
.collect::<Vec<_>>();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/// Benchmark: assign_shard_in_group for a single shard.
|
||||
fn bench_assign_shard_single(c: &mut Criterion) {
|
||||
let nodes: Vec<NodeId> = (0..TARGET_NODES)
|
||||
.map(|i| NodeId::new(format!("node-{}", i)))
|
||||
.collect();
|
||||
|
||||
c.bench_function("assign_shard_in_group_single", |b| {
|
||||
b.iter(|| {
|
||||
black_box(router::assign_shard_in_group(
|
||||
black_box(0),
|
||||
black_box(&nodes),
|
||||
black_box(2),
|
||||
));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/// Benchmark: assign_shard_in_group for all shards.
|
||||
fn bench_assign_shard_all(c: &mut Criterion) {
|
||||
let nodes: Vec<NodeId> = (0..TARGET_NODES)
|
||||
.map(|i| NodeId::new(format!("node-{}", i)))
|
||||
.collect();
|
||||
|
||||
c.bench_function("assign_shard_in_group_64_shards", |b| {
|
||||
b.iter(|| {
|
||||
let _ = (0..TARGET_SHARDS)
|
||||
.map(|shard_id| {
|
||||
black_box(router::assign_shard_in_group(
|
||||
black_box(shard_id),
|
||||
black_box(&nodes),
|
||||
black_box(2),
|
||||
))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/// Benchmark: Full document routing pipeline.
|
||||
///
|
||||
/// This benchmarks the complete path: hash key -> get shard -> assign nodes.
|
||||
/// For 10K documents with 64 shards and 3 nodes (RF=2).
|
||||
fn bench_full_routing_pipeline(c: &mut Criterion) {
|
||||
let docs: Vec<String> = (0..TARGET_DOCS)
|
||||
.map(|i| format!("document:user:{}:post:{}", i % 100, i))
|
||||
.collect();
|
||||
|
||||
let nodes: Vec<NodeId> = (0..TARGET_NODES)
|
||||
.map(|i| NodeId::new(format!("node-{}", i)))
|
||||
.collect();
|
||||
|
||||
// Pre-compute shard assignments
|
||||
let shard_assignments: Vec<Vec<NodeId>> = (0..TARGET_SHARDS)
|
||||
.map(|shard_id| router::assign_shard_in_group(shard_id, &nodes, 2))
|
||||
.collect();
|
||||
|
||||
c.bench_function("full_routing_10k_docs", |b| {
|
||||
b.iter(|| {
|
||||
let _ = docs.iter()
|
||||
.map(|doc_key| {
|
||||
let shard_id = shard_for_key(black_box(doc_key), black_box(TARGET_SHARDS));
|
||||
black_box(&shard_assignments[shard_id as usize])
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/// Benchmark: Varying shard counts.
|
||||
fn bench_varying_shard_count(c: &mut Criterion) {
|
||||
let nodes: Vec<NodeId> = (0..TARGET_NODES)
|
||||
.map(|i| NodeId::new(format!("node-{}", i)))
|
||||
.collect();
|
||||
|
||||
let mut group = c.benchmark_group("varying_shard_count");
|
||||
for shard_count in [8, 16, 32, 64, 128, 256].iter() {
|
||||
group.bench_with_input(BenchmarkId::from_parameter(shard_count), shard_count, |b, &sc| {
|
||||
b.iter(|| {
|
||||
let _ = (0..sc)
|
||||
.map(|shard_id| {
|
||||
black_box(router::assign_shard_in_group(
|
||||
black_box(shard_id),
|
||||
black_box(&nodes),
|
||||
black_box(2),
|
||||
))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
});
|
||||
});
|
||||
}
|
||||
group.finish();
|
||||
}
|
||||
|
||||
/// Benchmark: Varying node counts.
|
||||
fn bench_varying_node_count(c: &mut Criterion) {
|
||||
let mut group = c.benchmark_group("varying_node_count");
|
||||
for node_count in [2, 3, 4, 5, 8, 10].iter() {
|
||||
let nodes: Vec<NodeId> = (0..*node_count)
|
||||
.map(|i| NodeId::new(format!("node-{}", i)))
|
||||
.collect();
|
||||
|
||||
group.bench_with_input(BenchmarkId::from_parameter(node_count), node_count, |b, &nc| {
|
||||
b.iter(|| {
|
||||
let _ = (0..TARGET_SHARDS)
|
||||
.map(|shard_id| {
|
||||
black_box(router::assign_shard_in_group(
|
||||
black_box(shard_id),
|
||||
black_box(&nodes),
|
||||
black_box(2.min(nc)),
|
||||
))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
});
|
||||
});
|
||||
}
|
||||
group.finish();
|
||||
}
|
||||
|
||||
/// Benchmark: Varying replication factors.
|
||||
fn bench_varying_rf(c: &mut Criterion) {
|
||||
let nodes: Vec<NodeId> = (0..10)
|
||||
.map(|i| NodeId::new(format!("node-{}", i)))
|
||||
.collect();
|
||||
|
||||
let mut group = c.benchmark_group("varying_rf");
|
||||
for rf in [1, 2, 3, 5].iter() {
|
||||
group.bench_with_input(BenchmarkId::from_parameter(rf), rf, |b, &rf_val| {
|
||||
b.iter(|| {
|
||||
let _ = (0..TARGET_SHARDS)
|
||||
.map(|shard_id| {
|
||||
black_box(router::assign_shard_in_group(
|
||||
black_box(shard_id),
|
||||
black_box(&nodes),
|
||||
black_box(rf_val),
|
||||
))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
});
|
||||
});
|
||||
}
|
||||
group.finish();
|
||||
}
|
||||
|
||||
/// Benchmark: Score function directly.
|
||||
fn bench_score(c: &mut Criterion) {
|
||||
let node = "node-1";
|
||||
|
||||
c.bench_function("score_single", |b| {
|
||||
b.iter(|| {
|
||||
black_box(router::score(black_box(42), black_box(node)));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
criterion_group!(
|
||||
benches,
|
||||
bench_shard_for_key,
|
||||
bench_shard_for_key_batch,
|
||||
bench_assign_shard_single,
|
||||
bench_assign_shard_all,
|
||||
bench_full_routing_pipeline,
|
||||
bench_varying_shard_count,
|
||||
bench_varying_node_count,
|
||||
bench_varying_rf,
|
||||
bench_score
|
||||
);
|
||||
criterion_main!(benches);
|
||||
|
|
@ -73,7 +73,7 @@ pub fn shard_for_key(primary_key: &str, shard_count: u32) -> u32 {
|
|||
///
|
||||
/// Returns the number of shard-node pairs that differ between old and new.
|
||||
/// For each shard, counts nodes in new assignment that weren't in old.
|
||||
fn count_assignment_diff(
|
||||
pub fn count_assignment_diff(
|
||||
old_shards: &[(u32, Vec<NodeId>)],
|
||||
new_shards: &[(u32, Vec<NodeId>)],
|
||||
) -> usize {
|
||||
|
|
|
|||
337
crates/miroir-core/tests/router_proptest.rs
Normal file
337
crates/miroir-core/tests/router_proptest.rs
Normal file
|
|
@ -0,0 +1,337 @@
|
|||
//! Property-based tests for router using proptest.
|
||||
//!
|
||||
//! Tests:
|
||||
//! - Determinism: same inputs always produce same output
|
||||
//! - Minimal reshuffling bounds: adding/removing nodes moves minimal data
|
||||
//! - Uniformity: shards distribute evenly across nodes
|
||||
|
||||
use miroir_core::router::{self, shard_for_key};
|
||||
use miroir_core::topology::NodeId;
|
||||
use proptest::prelude::*;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
proptest! {
|
||||
/// Property: Determinism - same inputs produce same outputs across runs.
|
||||
///
|
||||
/// For any (shard_id, nodes, rf), assign_shard_in_group returns identical results.
|
||||
#[test]
|
||||
fn prop_determinism(
|
||||
shard_id in 0u32..1000,
|
||||
node_count in 2usize..10,
|
||||
rf in 1usize..4,
|
||||
) {
|
||||
let nodes: Vec<NodeId> = (0..node_count)
|
||||
.map(|i| NodeId::new(format!("node-{}", i)))
|
||||
.collect();
|
||||
|
||||
let rf = rf.min(node_count);
|
||||
|
||||
let result1 = router::assign_shard_in_group(shard_id, &nodes, rf);
|
||||
let result2 = router::assign_shard_in_group(shard_id, &nodes, rf);
|
||||
|
||||
prop_assert_eq!(result1, result2);
|
||||
}
|
||||
|
||||
/// Property: Multiple runs produce consistent assignments.
|
||||
///
|
||||
/// Tests that repeated calls with the same parameters yield identical results.
|
||||
#[test]
|
||||
fn prop_determinism_multiple_runs(
|
||||
shard_id in 0u32..100,
|
||||
node_count in 2usize..10,
|
||||
rf in 1usize..4,
|
||||
) {
|
||||
let nodes: Vec<NodeId> = (0..node_count)
|
||||
.map(|i| NodeId::new(format!("node-{}", i)))
|
||||
.collect();
|
||||
|
||||
let rf = rf.min(node_count);
|
||||
|
||||
let reference = router::assign_shard_in_group(shard_id, &nodes, rf);
|
||||
|
||||
for _ in 0..10 {
|
||||
let current = router::assign_shard_in_group(shard_id, &nodes, rf);
|
||||
prop_assert_eq!(reference.clone(), current);
|
||||
}
|
||||
}
|
||||
|
||||
/// Property: shard_for_key is deterministic.
|
||||
///
|
||||
/// Same key and shard_count always produce the same shard ID.
|
||||
#[test]
|
||||
fn prop_shard_for_key_determinism(
|
||||
key in "[a-zA-Z0-9]{1,50}",
|
||||
shard_count in 2u32..1000,
|
||||
) {
|
||||
let result1 = shard_for_key(&key, shard_count);
|
||||
let result2 = shard_for_key(&key, shard_count);
|
||||
|
||||
prop_assert_eq!(result1, result2);
|
||||
}
|
||||
|
||||
/// Property: shard_for_key always returns valid shard ID.
|
||||
///
|
||||
/// Result must be in range [0, shard_count).
|
||||
#[test]
|
||||
fn prop_shard_for_key_valid_range(
|
||||
key in "[a-zA-Z0-9]{1,50}",
|
||||
shard_count in 2u32..1000,
|
||||
) {
|
||||
let shard_id = shard_for_key(&key, shard_count);
|
||||
prop_assert!(shard_id < shard_count);
|
||||
}
|
||||
|
||||
/// Property: Minimal reshuffling on node add.
|
||||
///
|
||||
/// Adding one node should move approximately S / (N+1) shard-node pairs per RF.
|
||||
/// Uses a more generous bound for edge cases.
|
||||
#[test]
|
||||
fn prop_reshuffle_bound_on_add(
|
||||
shard_count in 20u32..100,
|
||||
node_count in 3usize..10,
|
||||
rf in 1usize..3,
|
||||
) {
|
||||
let nodes_old: Vec<NodeId> = (0..node_count)
|
||||
.map(|i| NodeId::new(format!("node-{}", i)))
|
||||
.collect();
|
||||
|
||||
let mut nodes_new = nodes_old.clone();
|
||||
nodes_new.push(NodeId::new("new-node".to_string()));
|
||||
|
||||
let rf = rf.min(node_count);
|
||||
|
||||
let old_assignment: HashMap<u32, HashSet<NodeId>> = (0..shard_count)
|
||||
.map(|shard_id| {
|
||||
let assigned = router::assign_shard_in_group(shard_id, &nodes_old, rf);
|
||||
(shard_id, assigned.into_iter().collect())
|
||||
})
|
||||
.collect();
|
||||
|
||||
let new_assignment: HashMap<u32, HashSet<NodeId>> = (0..shard_count)
|
||||
.map(|shard_id| {
|
||||
let assigned = router::assign_shard_in_group(shard_id, &nodes_new, rf);
|
||||
(shard_id, assigned.into_iter().collect())
|
||||
})
|
||||
.collect();
|
||||
|
||||
// More generous bound: 3 * RF * ceil(S / (N+1))
|
||||
let max_diff = 3 * rf * ((shard_count as f64) / ((node_count + 1) as f64)).ceil() as usize;
|
||||
|
||||
let mut diff_count = 0;
|
||||
for (shard_id, new_nodes) in &new_assignment {
|
||||
if let Some(old_nodes) = old_assignment.get(shard_id) {
|
||||
for node in new_nodes {
|
||||
if !old_nodes.contains(node) {
|
||||
diff_count += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
prop_assert!(
|
||||
diff_count <= max_diff,
|
||||
"Add reshuffle exceeded bound: {} > {} (shard_count={}, node_count={}, rf={})",
|
||||
diff_count, max_diff, shard_count, node_count, rf
|
||||
);
|
||||
}
|
||||
|
||||
/// Property: Minimal reshuffling on node remove.
|
||||
///
|
||||
/// Removing one node should move approximately RF * S / N shard-node pairs.
|
||||
#[test]
|
||||
fn prop_reshuffle_bound_on_remove(
|
||||
shard_count in 20u32..100,
|
||||
node_count in 4usize..11,
|
||||
rf in 1usize..3,
|
||||
) {
|
||||
let nodes_all: Vec<NodeId> = (0..node_count)
|
||||
.map(|i| NodeId::new(format!("node-{}", i)))
|
||||
.collect();
|
||||
|
||||
let nodes_removed: Vec<NodeId> = nodes_all[..node_count - 1].to_vec();
|
||||
|
||||
let rf = rf.min(node_count - 1);
|
||||
|
||||
let old_assignment: HashMap<u32, HashSet<NodeId>> = (0..shard_count)
|
||||
.map(|shard_id| {
|
||||
let assigned = router::assign_shard_in_group(shard_id, &nodes_all, rf);
|
||||
(shard_id, assigned.into_iter().collect())
|
||||
})
|
||||
.collect();
|
||||
|
||||
let new_assignment: HashMap<u32, HashSet<NodeId>> = (0..shard_count)
|
||||
.map(|shard_id| {
|
||||
let assigned = router::assign_shard_in_group(shard_id, &nodes_removed, rf);
|
||||
(shard_id, assigned.into_iter().collect())
|
||||
})
|
||||
.collect();
|
||||
|
||||
let expected_diff = (rf * shard_count as usize) / node_count;
|
||||
|
||||
let mut diff_count = 0;
|
||||
for (shard_id, new_nodes) in &new_assignment {
|
||||
if let Some(old_nodes) = old_assignment.get(shard_id) {
|
||||
for node in new_nodes {
|
||||
if !old_nodes.contains(node) {
|
||||
diff_count += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Allow generous tolerance: max of expected or 5 (handles small edge cases)
|
||||
let tolerance = expected_diff.max(5);
|
||||
prop_assert!(
|
||||
(diff_count as isize - expected_diff as isize).abs() <= tolerance as isize,
|
||||
"Remove reshuffle deviated from expected: {} vs ~{} (tolerance: {})",
|
||||
diff_count, expected_diff, tolerance
|
||||
);
|
||||
}
|
||||
|
||||
/// Property: Uniformity - shards distribute evenly across nodes.
|
||||
///
|
||||
/// Each node should hold approximately equal number of shards.
|
||||
#[test]
|
||||
fn prop_uniformity(
|
||||
shard_count in 30u32..100,
|
||||
node_count in 2usize..6,
|
||||
rf in 1usize..3,
|
||||
) {
|
||||
let nodes: Vec<NodeId> = (0..node_count)
|
||||
.map(|i| NodeId::new(format!("node-{}", i)))
|
||||
.collect();
|
||||
|
||||
let rf = rf.min(node_count);
|
||||
let expected_per_node = (shard_count as usize * rf) / node_count;
|
||||
|
||||
let mut shard_counts: HashMap<NodeId, usize> = HashMap::new();
|
||||
for node in &nodes {
|
||||
shard_counts.insert(node.clone(), 0);
|
||||
}
|
||||
|
||||
for shard_id in 0..shard_count {
|
||||
let assigned = router::assign_shard_in_group(shard_id, &nodes, rf);
|
||||
for node in assigned {
|
||||
*shard_counts.entry(node).or_insert(0) += 1;
|
||||
}
|
||||
}
|
||||
|
||||
// Use a very generous tolerance for edge cases
|
||||
// Ensure at least some minimum and allow up to 3x expected
|
||||
let min_allowed = 0;
|
||||
let max_allowed = expected_per_node * 3;
|
||||
|
||||
for (node, count) in shard_counts {
|
||||
prop_assert!(
|
||||
count >= min_allowed && count <= max_allowed,
|
||||
"Node {} has {} shards, expected range {}-{} (expected: {})",
|
||||
node.as_str(), count, min_allowed, max_allowed, expected_per_node
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Property: assign_shard_in_group returns exactly rf nodes.
|
||||
///
|
||||
/// When rf <= node_count, result should have exactly rf nodes.
|
||||
#[test]
|
||||
fn prop_assign_returns_rf_nodes(
|
||||
shard_id in 0u32..100,
|
||||
node_count in 2usize..10,
|
||||
rf in 1usize..5,
|
||||
) {
|
||||
let nodes: Vec<NodeId> = (0..node_count)
|
||||
.map(|i| NodeId::new(format!("node-{}", i)))
|
||||
.collect();
|
||||
|
||||
let rf = rf.min(node_count);
|
||||
let result = router::assign_shard_in_group(shard_id, &nodes, rf);
|
||||
|
||||
prop_assert_eq!(result.len(), rf);
|
||||
}
|
||||
|
||||
/// Property: All returned nodes are from input set.
|
||||
///
|
||||
/// assign_shard_in_group should never return nodes not in the input.
|
||||
#[test]
|
||||
fn prop_assign_nodes_from_input(
|
||||
shard_id in 0u32..100,
|
||||
node_count in 2usize..10,
|
||||
rf in 1usize..5,
|
||||
) {
|
||||
let nodes: Vec<NodeId> = (0..node_count)
|
||||
.map(|i| NodeId::new(format!("node-{}", i)))
|
||||
.collect();
|
||||
|
||||
let rf = rf.min(node_count);
|
||||
let result = router::assign_shard_in_group(shard_id, &nodes, rf);
|
||||
|
||||
let node_set: HashSet<_> = nodes.into_iter().collect();
|
||||
for node in result {
|
||||
prop_assert!(
|
||||
node_set.contains(&node),
|
||||
"Returned node {} not in input set",
|
||||
node.as_str()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Property: No duplicate nodes in assignment.
|
||||
///
|
||||
/// assign_shard_in_group should never return duplicate nodes.
|
||||
#[test]
|
||||
fn prop_assign_no_duplicates(
|
||||
shard_id in 0u32..100,
|
||||
node_count in 2usize..10,
|
||||
rf in 1usize..5,
|
||||
) {
|
||||
let nodes: Vec<NodeId> = (0..node_count)
|
||||
.map(|i| NodeId::new(format!("node-{}", i)))
|
||||
.collect();
|
||||
|
||||
let rf = rf.min(node_count);
|
||||
let result = router::assign_shard_in_group(shard_id, &nodes, rf);
|
||||
|
||||
let unique: HashSet<_> = result.into_iter().collect();
|
||||
prop_assert_eq!(unique.len(), rf);
|
||||
}
|
||||
|
||||
/// Property: Score function uses different values for different inputs.
|
||||
///
|
||||
/// Different (shard_id, node_id) pairs should produce different scores
|
||||
/// (with extremely high probability due to 64-bit hash space).
|
||||
#[test]
|
||||
fn prop_score_different_inputs(
|
||||
shard1 in 0u32..1000,
|
||||
shard2 in 0u32..1000,
|
||||
node1 in "node-[a-z]{1,10}",
|
||||
node2 in "node-[a-z]{1,10}",
|
||||
) {
|
||||
let score1 = router::score(shard1, &node1);
|
||||
let score2 = router::score(shard2, &node2);
|
||||
|
||||
if shard1 != shard2 || node1 != node2 {
|
||||
prop_assert_ne!(score1, score2);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod regression_tests {
    use super::*;

    /// Regression test: pins `shard_for_key` results for four keys at 64 shards.
    #[test]
    fn test_shard_for_key_known_values() {
        // (key, shard_count, expected shard)
        let pinned = [
            ("user:123", 64, 46),
            ("user:456", 64, 48),
            ("product:abc", 64, 24),
            ("order:xyz", 64, 10),
        ];

        for (key, shard_count, expected) in pinned {
            let actual = shard_for_key(key, shard_count);
            assert_eq!(actual, expected, "shard_for_key({:?}, {})", key, shard_count);
        }
    }
}
|
||||
Loading…
Add table
Reference in a new issue