diff --git a/crates/miroir-core/src/cdc.rs b/crates/miroir-core/src/cdc.rs index 0c5e4fa..9338865 100644 --- a/crates/miroir-core/src/cdc.rs +++ b/crates/miroir-core/src/cdc.rs @@ -1017,42 +1017,41 @@ impl CdcBuffer { ) -> Result { let primary = Arc::new(CdcMemoryBuffer::new(config.memory_bytes)); - let overflow: Arc = match CdcBufferType::parse_from_str( - config.overflow.as_str(), - ) { - Some(CdcBufferType::Redis) => { - #[cfg(feature = "redis-store")] - { - // Redis pool will be created lazily on first use - let redis_url = std::env::var("MIROIR_REDIS_URL") - .unwrap_or_else(|_| "redis://localhost:6379".to_string()); - let backend = - CdcRedisOverflow::lazy_new(sink_name, config.redis_bytes, redis_url); - Arc::new(backend) + let overflow: Arc = + match CdcBufferType::parse_from_str(config.overflow.as_str()) { + Some(CdcBufferType::Redis) => { + #[cfg(feature = "redis-store")] + { + // Redis pool will be created lazily on first use + let redis_url = std::env::var("MIROIR_REDIS_URL") + .unwrap_or_else(|_| "redis://localhost:6379".to_string()); + let backend = + CdcRedisOverflow::lazy_new(sink_name, config.redis_bytes, redis_url); + Arc::new(backend) + } + #[cfg(not(feature = "redis-store"))] + { + warn!("CDC: redis overflow requested but redis-store feature not enabled, using drop backend"); + Arc::new(CdcDropOverflow::new(sink_name, dropped_callback.clone())) + } } - #[cfg(not(feature = "redis-store"))] - { - warn!("CDC: redis overflow requested but redis-store feature not enabled, using drop backend"); + Some(CdcBufferType::Pvc) => { + let data_dir = + std::env::var("MIROIR_DATA_DIR").unwrap_or_else(|_| "/data".to_string()); + Arc::new(CdcPvcOverflow::new( + sink_name, + std::path::PathBuf::from(data_dir), + config.redis_bytes, // Use same budget + )) + } + Some(CdcBufferType::Drop) => { Arc::new(CdcDropOverflow::new(sink_name, dropped_callback.clone())) } - } - Some(CdcBufferType::Pvc) => { - let data_dir = - std::env::var("MIROIR_DATA_DIR").unwrap_or_else(|_| "/data".to_string()); - Arc::new(CdcPvcOverflow::new( - sink_name, - std::path::PathBuf::from(data_dir), - config.redis_bytes, // Use same budget - )) - } - Some(CdcBufferType::Drop) => { - Arc::new(CdcDropOverflow::new(sink_name, dropped_callback.clone())) - } - Some(CdcBufferType::Memory) | None => { - // Memory overflow = drop (no secondary buffer) - Arc::new(CdcDropOverflow::new(sink_name, dropped_callback.clone())) - } - }; + Some(CdcBufferType::Memory) | None => { + // Memory overflow = drop (no secondary buffer) + Arc::new(CdcDropOverflow::new(sink_name, dropped_callback.clone())) + } + }; Ok(Self { primary, @@ -2123,9 +2122,18 @@ mod tests { CdcBufferType::parse_from_str("MEMORY"), Some(CdcBufferType::Memory) ); - assert_eq!(CdcBufferType::parse_from_str("redis"), Some(CdcBufferType::Redis)); - assert_eq!(CdcBufferType::parse_from_str("pvc"), Some(CdcBufferType::Pvc)); - assert_eq!(CdcBufferType::parse_from_str("drop"), Some(CdcBufferType::Drop)); + assert_eq!( + CdcBufferType::parse_from_str("redis"), + Some(CdcBufferType::Redis) + ); + assert_eq!( + CdcBufferType::parse_from_str("pvc"), + Some(CdcBufferType::Pvc) + ); + assert_eq!( + CdcBufferType::parse_from_str("drop"), + Some(CdcBufferType::Drop) + ); assert_eq!(CdcBufferType::parse_from_str("unknown"), None); } diff --git a/crates/miroir-core/src/vector.rs b/crates/miroir-core/src/vector.rs index f0fb55a..7099f2d 100644 --- a/crates/miroir-core/src/vector.rs +++ b/crates/miroir-core/src/vector.rs @@ -335,8 +335,10 @@ mod tests { // Should deduplicate doc1, keeping the highest combined score assert_eq!(result.len(), 3); assert_eq!(result[0].pk, "doc2"); // (0.7 + 0.9) / 2 = 0.8 (highest) - assert_eq!(result[1].pk, "doc1"); // (0.8 + 0.6) / 2 = 0.7, kept over shard 1's 0.7 - assert_eq!(result[2].pk, "doc3"); // (0.9 + 0.5) / 2 = 0.7 + // doc1 and doc3 both have score 0.7; order between them is unstable + let pks: Vec<_> = result.iter().map(|h| h.pk.as_str()).collect(); + assert!(pks[1..].contains(&"doc1")); + assert!(pks[1..].contains(&"doc3")); } #[test]