RRF merge: add tests, fix warnings, re-run benchmarks
- Add tests for router (zero-group guard), config (YAML parse, policy display), task registry stub, reshard (time window, throttle, CV), topology (nodes iterator, auto-derived groups), and task pruner (gauge lock serialization)
- Fix config validation: minimal YAML now passes CDC cross-field check
- Remove unused import and mut warning in merger/scatter tests
- Re-run score-comparability benchmarks with RRF strategy

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
0de5f01d32
commit
8eeba0f76b
10 changed files with 290 additions and 9 deletions
|
|
@ -596,4 +596,26 @@ leader_election:
|
|||
assert!(cfg.admin_ui.enabled);
|
||||
assert!(cfg.search_ui.enabled);
|
||||
}
|
||||
|
||||
/// A minimal YAML document with a Redis-backed task store must load through
/// `MiroirConfig::from_yaml` and keep the shard count it was given.
#[test]
fn config_from_yaml_valid() {
    // `backend` is a field of the `task_store` section, so it has to be nested
    // one level below `task_store:`; at column 0 it would be read as a
    // separate top-level key.
    let yaml = r#"
shards: 32
replication_factor: 1
nodes: []
task_store:
  backend: redis
"#;
    let cfg = MiroirConfig::from_yaml(yaml).unwrap();
    assert_eq!(cfg.shards, 32);
}
|
||||
|
||||
/// The default policy is `Partial`, and every variant renders through
/// `Display` as its lowercase name.
#[test]
fn unavailable_shard_policy_default_and_display() {
    let default_policy = UnavailableShardPolicy::default();
    assert!(matches!(default_policy, UnavailableShardPolicy::Partial));
    assert_eq!(default_policy.to_string(), "partial");

    let error_text = UnavailableShardPolicy::Error.to_string();
    assert_eq!(error_text, "error");

    let fallback_text = UnavailableShardPolicy::Fallback.to_string();
    assert_eq!(fallback_text, "fallback");
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -143,3 +143,129 @@ pub fn validate(cfg: &MiroirConfig) -> Result<(), ConfigError> {
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{advanced, MiroirConfig, TaskStoreConfig};

    /// Starting point for every test below: replication factor 1, a `sqlite`
    /// task store, CDC buffer overflow `drop`, search-UI rate limiting `local`,
    /// and defaults everywhere else. Each test mutates only the fields it is
    /// exercising.
    fn dev_config() -> MiroirConfig {
        let task_store = TaskStoreConfig {
            backend: "sqlite".into(),
            ..Default::default()
        };

        let cdc_buffer = advanced::CdcBufferConfig {
            overflow: "drop".into(),
            ..Default::default()
        };
        let cdc = advanced::CdcConfig {
            buffer: cdc_buffer,
            ..Default::default()
        };

        let rate_limit = advanced::SearchUiRateLimitConfig {
            backend: "local".into(),
            ..Default::default()
        };
        let search_ui = advanced::SearchUiConfig {
            rate_limit,
            ..Default::default()
        };

        MiroirConfig {
            replication_factor: 1,
            task_store,
            cdc,
            search_ui,
            ..Default::default()
        }
    }

    /// Runs `validate` on `cfg`, requires it to fail, and checks that the
    /// rendered error mentions `needle`.
    ///
    /// `#[track_caller]` keeps panic locations pointing at the calling test.
    #[track_caller]
    fn assert_rejected(cfg: &MiroirConfig, needle: &str) {
        let err = validate(cfg).unwrap_err();
        assert!(err.to_string().contains(needle));
    }

    /// More than one replica group on the `sqlite` task store is refused.
    #[test]
    fn rejects_replica_groups_gt1_with_sqlite() {
        let mut cfg = dev_config();
        cfg.replica_groups = 2;
        assert_rejected(&cfg, "replica_groups > 1");
    }

    /// Enabling HPA on the `sqlite` task store is refused.
    #[test]
    fn rejects_hpa_enabled_with_sqlite() {
        let mut cfg = dev_config();
        cfg.hpa.enabled = true;
        assert_rejected(&cfg, "hpa.enabled");
    }

    /// CDC overflow set to `redis` while the task store is still `sqlite`.
    #[test]
    fn rejects_cdc_overflow_redis_without_redis() {
        let mut cfg = dev_config();
        cfg.cdc.enabled = true;
        cfg.cdc.buffer.overflow = "redis".into();
        assert_rejected(&cfg, "cdc.buffer.overflow");
    }

    /// Search-UI rate limiting set to `redis` while the task store is still
    /// `sqlite`.
    #[test]
    fn rejects_search_ui_rate_limit_redis_without_redis() {
        let mut cfg = dev_config();
        cfg.search_ui.enabled = true;
        cfg.search_ui.rate_limit.backend = "redis".into();
        assert_rejected(&cfg, "search_ui.rate_limit");
    }

    /// Two replica groups on a `redis` task store with leader election
    /// switched off.
    #[test]
    fn rejects_replica_groups_gt1_without_leader_election() {
        let mut cfg = dev_config();
        cfg.task_store.backend = "redis".into();
        cfg.replica_groups = 2;
        cfg.leader_election.enabled = false;
        assert_rejected(&cfg, "leader_election");
    }

    /// A dedicated tenant-affinity group id (99) that the config does not have.
    #[test]
    fn rejects_tenant_affinity_group_out_of_range() {
        let mut cfg = dev_config();
        cfg.tenant_affinity.enabled = true;
        cfg.tenant_affinity.dedicated_groups = vec![99];
        assert_rejected(&cfg, "tenant_affinity");
    }

    /// A static tenant mapping that points at group 99.
    #[test]
    fn rejects_tenant_affinity_static_map_out_of_range() {
        let mut cfg = dev_config();
        cfg.tenant_affinity.enabled = true;
        cfg.tenant_affinity
            .static_map
            .insert("tenant-a".into(), 99);
        assert_rejected(&cfg, "tenant_affinity.static_map");
    }

    /// A shadow target whose `sample_rate` is `0.0`.
    #[test]
    fn rejects_shadow_invalid_sample_rate() {
        let target = advanced::ShadowTargetConfig {
            name: "t".into(),
            url: "http://t".into(),
            api_key_env: "MIROIR_SHADOW_KEY".into(),
            sample_rate: 0.0,
            operations: vec!["search".into()],
        };

        let mut cfg = dev_config();
        cfg.shadow.enabled = true;
        cfg.shadow.targets = vec![target];
        assert_rejected(&cfg, "sample_rate");
    }

    /// Port 0 is not an acceptable `server.port`.
    #[test]
    fn rejects_zero_server_port() {
        let mut cfg = dev_config();
        cfg.server.port = 0;
        assert_rejected(&cfg, "server.port");
    }

    /// A replication factor of 0 is refused.
    #[test]
    fn rejects_zero_replication_factor() {
        let mut cfg = dev_config();
        cfg.replication_factor = 0;
        assert_rejected(&cfg, "replication_factor");
    }
}
|
||||
|
|
|
|||
|
|
@ -1159,7 +1159,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_rrf_document_equality() {
|
||||
use super::{RRFDocument, Ordering};
|
||||
use super::Ordering;
|
||||
let a = super::RRFDocument {
|
||||
rrf_score: 1.0,
|
||||
primary_key: "doc1".into(),
|
||||
|
|
|
|||
|
|
@ -593,4 +593,58 @@ mod tests {
|
|||
result.new_shard_cv
|
||||
);
|
||||
}
|
||||
|
||||
/// Parsing a window and rendering it through `Display` gives back the
/// original text.
#[test]
fn time_window_display_roundtrip() {
    let text = "02:30-06:45";
    let window = TimeWindow::parse(text).unwrap();
    assert_eq!(window.to_string(), "02:30-06:45");
}
|
||||
|
||||
/// A single `HH:MM` without the `-` separator is rejected with the
/// expected-format hint.
#[test]
fn time_window_parse_missing_dash() {
    let outcome = TimeWindow::parse("02:00");
    let message = outcome.unwrap_err();
    assert!(message.contains("expected HH:MM-HH:MM"));
}
|
||||
|
||||
/// A non-numeric hour component is reported as an invalid hour.
#[test]
fn time_window_parse_invalid_hm() {
    let outcome = TimeWindow::parse("ab:00-06:00");
    let message = outcome.unwrap_err();
    assert!(message.contains("invalid hour"));
}
|
||||
|
||||
/// A non-numeric minute component is reported as an invalid minute.
#[test]
fn time_window_parse_invalid_minute() {
    let outcome = TimeWindow::parse("02:ab-06:00");
    let message = outcome.unwrap_err();
    assert!(message.contains("invalid minute"));
}
|
||||
|
||||
/// With a default `ReshardingConfig`, the window guard reports
/// `NoRestriction`.
#[test]
fn check_window_now_no_restriction() {
    let defaults = ReshardingConfig::default();
    let verdict = check_window_now(&defaults);
    assert!(matches!(verdict, WindowGuardResult::NoRestriction));
}
|
||||
|
||||
/// With `backfill_throttle_dps` set to 0 the simulation must report an
/// infinite backfill duration and zero bytes written.
#[test]
fn simulation_throttle_zero_gives_infinity_duration() {
    let params = SimParams {
        doc_size_bytes: 1024,
        corpus_size_bytes: 1024 * 1024,
        write_rate_dps: 100,
        replica_groups: 1,
        replication_factor: 1,
        old_shards: 4,
        new_shards: 8,
        nodes_per_group: 2,
        // The value under test: a zero backfill throttle.
        backfill_throttle_dps: 0,
    };

    let result = simulate(&params);

    assert!(result.backfill_duration_secs.is_infinite());
    assert_eq!(result.total_bytes_written, 0);
}
|
||||
|
||||
/// `cv` returns `0.0` both for an empty slice and for a slice of all zeros.
#[test]
fn cv_empty_and_zero() {
    let of_nothing = cv(&[]);
    assert_eq!(of_nothing, 0.0);

    let of_zeros = cv(&[0, 0, 0]);
    assert_eq!(of_zeros, 0.0);
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,7 +45,12 @@ pub fn write_targets(shard_id: u32, topology: &Topology) -> Vec<NodeId> {
|
|||
}
|
||||
|
||||
/// Pick the replica group that serves a query, cycling through the groups by
/// query sequence number.
///
/// When `replica_groups` is 0 there is nothing to cycle over, so 0 is returned
/// and the caller is expected to handle the empty case.
pub fn query_group(query_seq: u64, replica_groups: u32) -> u32 {
    // `checked_rem` yields `None` only for a zero divisor, which is exactly
    // the "no replica groups" situation.
    match query_seq.checked_rem(u64::from(replica_groups)) {
        // The remainder is strictly less than `replica_groups`, so narrowing
        // back to `u32` cannot lose information.
        Some(slot) => slot as u32,
        None => 0,
    }
}
|
||||
|
||||
|
|
|
|||
|
|
@ -643,7 +643,7 @@ mod tests {
|
|||
#[tokio::test]
|
||||
async fn test_execute_scatter_node_not_in_topology() {
|
||||
// Build a plan, then use a topology that doesn't have the plan's nodes
|
||||
let mut topo = make_test_topology();
|
||||
let topo = make_test_topology();
|
||||
let plan = plan_search_scatter(&topo, 0, 2, 64);
|
||||
|
||||
// Empty topology — none of the plan's nodes exist
|
||||
|
|
|
|||
|
|
@ -144,3 +144,43 @@ impl TaskRegistry for StubTaskRegistry {
|
|||
Ok(Vec::new())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// Registering through the stub yields an enqueued task that has a
    /// non-empty id, no per-node tasks and no error.
    #[test]
    fn stub_register_returns_enqueued_task() {
        let registered = StubTaskRegistry.register(HashMap::new()).unwrap();

        assert_eq!(registered.status, TaskStatus::Enqueued);
        assert!(!registered.miroir_id.is_empty());
        assert!(registered.node_tasks.is_empty());
        assert!(registered.error.is_none());
    }

    /// Looking up any id through the stub succeeds with `None`.
    #[test]
    fn stub_get_returns_none() {
        let found = StubTaskRegistry.get("any-id").unwrap();
        assert!(found.is_none());
    }

    /// Updating a task status through the stub returns `Ok`.
    #[test]
    fn stub_update_status_is_ok() {
        let outcome = StubTaskRegistry.update_status("any", TaskStatus::Succeeded);
        outcome.unwrap();
    }

    /// Updating a node task through the stub returns `Ok`.
    #[test]
    fn stub_update_node_task_is_ok() {
        let outcome =
            StubTaskRegistry.update_node_task("any", "node-0", NodeTaskStatus::Succeeded);
        outcome.unwrap();
    }

    /// Listing with the default filter returns an empty collection.
    #[test]
    fn stub_list_returns_empty() {
        let listed = StubTaskRegistry.list(TaskFilter::default()).unwrap();
        assert!(listed.is_empty());
    }
}
|
||||
|
|
|
|||
|
|
@ -191,6 +191,10 @@ mod tests {
|
|||
use crate::config::TaskRegistryConfig;
|
||||
use crate::task_store::{NewTask, SqliteTaskStore, TaskStore};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Mutex;
|
||||
|
||||
/// Serialize tests that read/write the global `TASK_REGISTRY_SIZE` gauge.
|
||||
static GAUGE_LOCK: Mutex<()> = Mutex::new(());
|
||||
|
||||
fn test_store() -> SqliteTaskStore {
|
||||
let store = SqliteTaskStore::open_in_memory().unwrap();
|
||||
|
|
@ -227,6 +231,7 @@ mod tests {
|
|||
/// next pruner cycle drops all 10k.
|
||||
#[test]
|
||||
fn pruner_deletes_10k_old_terminal_tasks() {
|
||||
let _lock = GAUGE_LOCK.lock().unwrap();
|
||||
let store = test_store();
|
||||
let eight_days_ms: i64 = 8 * 24 * 3600 * 1000;
|
||||
let old_time = now() - eight_days_ms;
|
||||
|
|
|
|||
|
|
@ -76,12 +76,17 @@ impl RedisPool {
|
|||
F::Output: Send + 'static,
|
||||
{
|
||||
// Check if we're already in a tokio runtime
|
||||
if let Ok(handle) = tokio::runtime::Handle::try_current() {
|
||||
// We're in a runtime - spawn a thread to avoid blocking
|
||||
let handle2 = handle.clone();
|
||||
std::thread::spawn(move || handle2.block_on(future))
|
||||
.join()
|
||||
.unwrap_or_else(|_| panic!("thread panicked"))
|
||||
if tokio::runtime::Handle::try_current().is_ok() {
|
||||
// We're in a runtime - spawn a thread with its own runtime to avoid blocking
|
||||
std::thread::spawn(move || {
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create runtime in thread");
|
||||
rt.block_on(future)
|
||||
})
|
||||
.join()
|
||||
.unwrap_or_else(|_| panic!("thread panicked"))
|
||||
} else {
|
||||
// Not in a runtime - use the dedicated runtime
|
||||
self.runtime.block_on(future)
|
||||
|
|
@ -2497,7 +2502,7 @@ mod tests {
|
|||
assert_eq!(alias.history.len(), 1);
|
||||
|
||||
// Flip with retention trim
|
||||
for (i, uid) in ["uid-v3", "uid-v4", "uid-v5"].iter().enumerate() {
|
||||
for uid in ["uid-v3", "uid-v4", "uid-v5"] {
|
||||
store
|
||||
.flip_alias("prod-logs", uid, 2)
|
||||
.expect("Flip should succeed");
|
||||
|
|
|
|||
|
|
@ -495,6 +495,30 @@ nodes:
|
|||
assert!(g1_ids.contains(&"meili-5"));
|
||||
}
|
||||
|
||||
/// `Topology::nodes` yields every node of the shared test topology, which
/// this test expects to be six.
#[test]
fn topology_nodes_iterator() {
    let topo = make_test_topology();
    let visited: Vec<&Node> = Vec::from_iter(topo.nodes());
    let node_total = visited.len();
    assert_eq!(node_total, 6);
}
|
||||
|
||||
/// When the YAML gives no explicit replica-group count, deserialization is
/// expected to derive it from the nodes: both nodes declare
/// `replica_group: 2` and the resulting `replica_groups` must be 3.
// NOTE(review): presumably derived as highest group index + 1; confirm
// against the `Deserialize` implementation.
#[test]
fn deserialize_auto_derives_replica_groups() {
    // Each list item's `address` and `replica_group` keys must be indented to
    // line up with `id`, otherwise they are not part of the node entry.
    let yaml = r#"
shards: 32
rf: 1
nodes:
  - id: "n0"
    address: "http://n0:7700"
    replica_group: 2
  - id: "n1"
    address: "http://n1:7700"
    replica_group: 2
"#;
    let topo: Topology = serde_yaml::from_str(yaml).unwrap();
    assert_eq!(topo.replica_groups, 3);
}
|
||||
|
||||
// ── State machine ─────────────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue