diff --git a/crates/miroir-core/src/alias/acceptance_tests.rs b/crates/miroir-core/src/alias/acceptance_tests.rs index 7694529..37418c4 100644 --- a/crates/miroir-core/src/alias/acceptance_tests.rs +++ b/crates/miroir-core/src/alias/acceptance_tests.rs @@ -154,7 +154,7 @@ async fn multi_target_alias_rejects_flip_operation() { /// - 11th flip evicts the oldest entry #[tokio::test] async fn history_retention_evicts_oldest_on_11th_flip() { - use crate::task_store::{AliasHistoryEntry, SqliteTaskStore}; + use crate::task_store::SqliteTaskStore; use std::time::SystemTime; // Create in-memory store with migration diff --git a/crates/miroir-core/src/cdc.rs b/crates/miroir-core/src/cdc.rs index f2ff3be..0c5e4fa 100644 --- a/crates/miroir-core/src/cdc.rs +++ b/crates/miroir-core/src/cdc.rs @@ -467,8 +467,10 @@ pub struct CdcManager { /// Optional callback to increment suppression metric. suppressed_metric_callback: Option, /// Per-sink tiered buffers. + #[allow(dead_code)] buffers: HashMap>, /// Optional callback to increment dropped events metric. + #[allow(dead_code)] dropped_metric_callback: Option, /// Internal queue for GET /_miroir/changes endpoint. internal_queue: Arc, @@ -519,7 +521,8 @@ pub enum CdcBufferType { impl CdcBufferType { /// Parse a buffer type from a string (for config deserialization). - pub fn from_str(s: &str) -> Option { + #[allow(dead_code)] + pub fn parse_from_str(s: &str) -> Option { match s.to_lowercase().as_str() { "memory" => Some(CdcBufferType::Memory), "redis" => Some(CdcBufferType::Redis), @@ -551,6 +554,7 @@ pub struct CdcBuffer { /// Overflow backend. overflow: Arc, /// Metric callback for dropped events. + #[allow(dead_code)] dropped_metric_callback: Option, } @@ -599,6 +603,7 @@ impl CdcMemoryBuffer { } /// Release space after event is processed. + #[allow(dead_code)] fn release(&self, size: u64) { let old = self .current_bytes @@ -632,12 +637,16 @@ pub struct CdcRedisOverflow { #[cfg(feature = "redis-store")] pool: Option, /// Sink name for key prefix. + #[allow(dead_code)] sink_name: String, /// Max bytes in Redis overflow. + #[allow(dead_code)] max_bytes: u64, /// Key for overflow list. + #[allow(dead_code)] list_key: String, /// Key for byte counter. + #[allow(dead_code)] bytes_key: String, } @@ -763,6 +772,7 @@ impl CdcRedisOverflow { #[async_trait::async_trait] impl CdcOverflowBackend for CdcRedisOverflow { + #[allow(unused_variables)] async fn push(&self, event: CdcEvent) -> Result<(), CdcError> { #[cfg(feature = "redis-store")] return self.push_inner(event).await; @@ -1007,7 +1017,7 @@ impl CdcBuffer { ) -> Result { let primary = Arc::new(CdcMemoryBuffer::new(config.memory_bytes)); - let overflow: Arc = match CdcBufferType::from_str( + let overflow: Arc = match CdcBufferType::parse_from_str( config.overflow.as_str(), ) { Some(CdcBufferType::Redis) => { @@ -2106,17 +2116,17 @@ mod tests { #[test] fn test_cdc_buffer_type_from_str() { assert_eq!( - CdcBufferType::from_str("memory"), + CdcBufferType::parse_from_str("memory"), Some(CdcBufferType::Memory) ); assert_eq!( - CdcBufferType::from_str("MEMORY"), + CdcBufferType::parse_from_str("MEMORY"), Some(CdcBufferType::Memory) ); - assert_eq!(CdcBufferType::from_str("redis"), Some(CdcBufferType::Redis)); - assert_eq!(CdcBufferType::from_str("pvc"), Some(CdcBufferType::Pvc)); - assert_eq!(CdcBufferType::from_str("drop"), Some(CdcBufferType::Drop)); - assert_eq!(CdcBufferType::from_str("unknown"), None); + assert_eq!(CdcBufferType::parse_from_str("redis"), Some(CdcBufferType::Redis)); + assert_eq!(CdcBufferType::parse_from_str("pvc"), Some(CdcBufferType::Pvc)); + assert_eq!(CdcBufferType::parse_from_str("drop"), Some(CdcBufferType::Drop)); + assert_eq!(CdcBufferType::parse_from_str("unknown"), None); } #[tokio::test] diff --git a/crates/miroir-core/src/group_sync_worker.rs b/crates/miroir-core/src/group_sync_worker.rs index ea4a98f..55335b7 100644 --- a/crates/miroir-core/src/group_sync_worker.rs +++ b/crates/miroir-core/src/group_sync_worker.rs @@ -6,7 +6,6 @@ //! - Tracks progress via GroupAdditionCoordinator //! - Pauses/resumes per Phase 6 Mode C -use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; @@ -186,9 +185,7 @@ impl GroupSyncWorker { }; let target = topology.group(target_group_id).ok_or_else(|| { - crate::error::MiroirError::Topology(format!( - "target group {target_group_id} not found" - )) + crate::error::MiroirError::Topology(format!("target group {target_group_id} not found")) })?; // Find healthy nodes in source and target groups @@ -342,8 +339,8 @@ impl GroupSyncWorker { ); let mut coord = self.coordinator.write().await; - let _ = coord - .fail_addition(addition_id, format!("sync timeout after {timeout:?}")); + let _ = + coord.fail_addition(addition_id, format!("sync timeout after {timeout:?}")); } } } @@ -375,8 +372,8 @@ impl GroupSyncWorker { mod tests { use super::*; use crate::group_addition::{GroupAdditionConfig, GroupAdditionCoordinator}; - use crate::scatter::FetchDocumentsResponse; use crate::topology::Node; + use std::collections::HashMap; use std::sync::Arc; // Mock node client for testing diff --git a/crates/miroir-core/src/mode_a_coordinator.rs b/crates/miroir-core/src/mode_a_coordinator.rs index 5d3416a..b6f2ad2 100644 --- a/crates/miroir-core/src/mode_a_coordinator.rs +++ b/crates/miroir-core/src/mode_a_coordinator.rs @@ -396,8 +396,6 @@ mod tests { #[tokio::test] async fn test_no_peers_error() { - use tokio::sync::RwLock; - // Create a coordinator with an empty peer set let peer_discovery = Arc::new(PeerDiscovery::new( "test-pod".to_string(), @@ -514,8 +512,6 @@ mod tests { } fn test_coordinator() -> ModeACoordinator { - use std::net::{Ipv4Addr, SocketAddr}; - // Create a mock peer discovery with our pod let peer_discovery = Arc::new(PeerDiscovery::new( "test-pod".to_string(), diff --git a/crates/miroir-core/src/mode_b_acceptance_tests.rs b/crates/miroir-core/src/mode_b_acceptance_tests.rs index ad41b7a..cde8917 100644 --- a/crates/miroir-core/src/mode_b_acceptance_tests.rs +++ b/crates/miroir-core/src/mode_b_acceptance_tests.rs @@ -9,12 +9,11 @@ use crate::config::LeaderElectionConfig; use crate::leader_election::LeaderElection; -use crate::mode_b_coordinator::{ModeBOpLeader, PhaseState}; +use crate::mode_b_coordinator::ModeBOpLeader; use crate::task_store::{mode_b_type, SqliteTaskStore, TaskStore}; use serde::{Deserialize, Serialize}; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use tokio::sync::RwLock; /// Test extra state for reshard operations. #[derive(Debug, Clone, Serialize, Deserialize, Default)] diff --git a/crates/miroir-core/src/mode_b_coordinator.rs b/crates/miroir-core/src/mode_b_coordinator.rs index b3f0bf6..950e17f 100644 --- a/crates/miroir-core/src/mode_b_coordinator.rs +++ b/crates/miroir-core/src/mode_b_coordinator.rs @@ -14,7 +14,7 @@ use crate::error::{MiroirError, Result}; use crate::leader_election::LeaderElection; -use crate::task_store::{mode_b_status, mode_b_type, ModeBOperation, TaskStore}; +use crate::task_store::{mode_b_status, ModeBOperation, TaskStore}; use serde::{Deserialize, Serialize}; use std::sync::Arc; use tracing::{debug, info, warn}; @@ -357,7 +357,7 @@ fn millis_now() -> i64 { mod tests { use super::*; use crate::config::LeaderElectionConfig; - use crate::task_store::SqliteTaskStore; + use crate::task_store::{mode_b_type, SqliteTaskStore}; #[derive(Debug, Clone, Serialize, Deserialize, Default)] struct TestExtraState { diff --git a/crates/miroir-core/src/multi_search.rs b/crates/miroir-core/src/multi_search.rs index e1de0fe..b5bfbf3 100644 --- a/crates/miroir-core/src/multi_search.rs +++ b/crates/miroir-core/src/multi_search.rs @@ -233,13 +233,15 @@ mod tests { #[test] fn test_validate_too_many_queries() { - let mut config = MultiSearchConfig::default(); - config.max_queries_per_batch = 10; + let config = MultiSearchConfig { + max_queries_per_batch: 10, + ..Default::default() + }; let executor = MultiSearchExecutor::new(config); let queries: Vec = (0..20) .map(|i| SearchQuery { - index_uid: format!("index-{}", i), + index_uid: format!("index-{i}"), q: Some("test".into()), filter: None, limit: Some(10), @@ -337,8 +339,8 @@ mod tests { let queries: Vec = (0..5) .map(|i| SearchQuery { - index_uid: format!("index-{}", i), - q: Some(format!("query-{}", i)), + index_uid: format!("index-{i}"), + q: Some(format!("query-{i}")), filter: None, limit: Some(10), offset: Some(0), @@ -365,16 +367,18 @@ mod tests { assert_eq!(response.results.len(), 5); for (i, result) in response.results.iter().enumerate() { - assert!(result.is_success(), "Query {} should succeed", i); - assert!(result.body.is_some(), "Query {} should have body", i); + assert!(result.is_success(), "Query {i} should succeed"); + assert!(result.body.is_some(), "Query {i} should have body"); } } /// P5.11-A2: Slow query doesn't block fast queries (parallel execution). #[tokio::test] async fn test_slow_query_doesnt_block_fast_queries() { - let mut config = MultiSearchConfig::default(); - config.per_query_timeout_ms = 5000; + let config = MultiSearchConfig { + per_query_timeout_ms: 5000, + ..Default::default() + }; let executor = MultiSearchExecutor::new(config); let request = MultiSearchRequest { @@ -581,7 +585,7 @@ mod tests { }; let response = executor - .execute(request, |query| async move { + .execute(request, |_query| async move { tokio::time::sleep(Duration::from_millis(200)).await; Ok(SearchResultData { body: serde_json::json!({"hits": []}), @@ -602,16 +606,18 @@ mod tests { /// P5.11-A6: 100-query batch completes under total timeout. #[tokio::test] async fn test_large_batch_completes() { - let mut config = MultiSearchConfig::default(); - config.max_queries_per_batch = 100; - config.total_timeout_ms = 30000; - config.per_query_timeout_ms = 5000; + let config = MultiSearchConfig { + max_queries_per_batch: 100, + total_timeout_ms: 30000, + per_query_timeout_ms: 5000, + ..Default::default() + }; let executor = MultiSearchExecutor::new(config); let queries: Vec = (0..100) .map(|i| SearchQuery { index_uid: format!("index-{}", i % 10), - q: Some(format!("query-{}", i)), + q: Some(format!("query-{i}")), filter: None, limit: Some(10), offset: Some(0), diff --git a/crates/miroir-core/src/rebalancer_worker/anti_entropy_worker.rs b/crates/miroir-core/src/rebalancer_worker/anti_entropy_worker.rs index 845d376..8bfc334 100644 --- a/crates/miroir-core/src/rebalancer_worker/anti_entropy_worker.rs +++ b/crates/miroir-core/src/rebalancer_worker/anti_entropy_worker.rs @@ -20,10 +20,10 @@ use reqwest::Client; // Type alias for ModeACoordinator that becomes a dummy type when feature is disabled #[cfg(feature = "peer-discovery")] -type ModeACoordinator = ActualModeACoordinator; +pub type ModeACoordinator = ActualModeACoordinator; #[cfg(not(feature = "peer-discovery"))] -struct ModeACoordinator; +pub struct ModeACoordinator; #[cfg(not(feature = "peer-discovery"))] impl ModeACoordinator { @@ -273,7 +273,8 @@ impl NodeClient for HttpNodeClient { let results = json .get("results") - .and_then(|v| v.as_array()).cloned() + .and_then(|v| v.as_array()) + .cloned() .unwrap_or_default(); let total = json.get("total").and_then(|v| v.as_u64()).unwrap_or(0); @@ -460,18 +461,19 @@ pub struct AntiEntropyWorker { config: AntiEntropyWorkerConfig, reconciler: AntiEntropyReconciler, topology: Arc>, + #[allow(dead_code)] task_store: Arc, pod_id: String, /// Mode A coordinator for shard-partitioned ownership (plan §14.5 Mode A). mode_a_coordinator: Option>, /// Total shards in the cluster (for Mode A scaling). - total_shards: u32, + _total_shards: u32, /// This pod's replica group ID (for Mode A scaling). replica_group_id: Option, /// Total number of pods in Mode A scaling. num_pods: Option, /// RF (replication factor) for Mode A scaling. - rf: usize, + _rf: usize, /// Whether TTL is enabled for expired document handling (plan §13.14 interaction). ttl_enabled: bool, /// Metrics callback for shards scanned. @@ -517,10 +519,10 @@ impl AntiEntropyWorker { task_store, pod_id, mode_a_coordinator: None, - total_shards: 0, // Will be set when Mode A is enabled + _total_shards: 0, // Will be set when Mode A is enabled replica_group_id: None, num_pods: None, - rf: 2, // Default RF + _rf: 2, // Default RF ttl_enabled: false, metrics_shards_scanned: None, metrics_mismatches_found: None, @@ -617,6 +619,7 @@ impl AntiEntropyWorker { /// /// This runs the pass immediately after acquiring lease, then waits /// for the configured interval before running again (if still leader). + #[allow(dead_code)] async fn run_pass_cycle(&self) -> Result<(), String> { let scope = "anti_entropy"; let mut lease_renewal = @@ -675,7 +678,7 @@ impl AntiEntropyWorker { info!("starting anti-entropy pass"); // Get topology info for Mode A scaling - let (total_shards, rf) = { + let (_total_shards, _rf) = { let topo = self.topology.read().await; (topo.shards, topo.rf()) }; @@ -715,6 +718,7 @@ impl AntiEntropyWorker { } /// Get current time in milliseconds since Unix epoch. +#[allow(dead_code)] fn now_ms() -> i64 { SystemTime::now() .duration_since(UNIX_EPOCH) diff --git a/crates/miroir-core/src/rebalancer_worker/mod.rs b/crates/miroir-core/src/rebalancer_worker/mod.rs index 9ee1ce3..0124ccd 100644 --- a/crates/miroir-core/src/rebalancer_worker/mod.rs +++ b/crates/miroir-core/src/rebalancer_worker/mod.rs @@ -1735,8 +1735,6 @@ impl RebalancerWorker { Ok(()) } - /// Pause an in-progress rebalance. - /// Pause an in-progress rebalance. pub async fn pause_rebalance(&self, index_uid: &str) -> Result<(), String> { let job_id = RebalanceJobId::new(index_uid); diff --git a/crates/miroir-core/src/scatter.rs b/crates/miroir-core/src/scatter.rs index ba7165b..dfc77f9 100644 --- a/crates/miroir-core/src/scatter.rs +++ b/crates/miroir-core/src/scatter.rs @@ -10,7 +10,6 @@ use crate::Result; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::HashMap; -use std::sync::Arc; use std::time::Instant; use tokio::time::Duration; use tracing::{info_span, instrument, Instrument}; @@ -1547,6 +1546,7 @@ impl NodeClient for MockNodeClient { mod tests { use super::*; use crate::topology::{Node, NodeId}; + use std::sync::Arc; fn make_test_topology() -> Topology { let mut topo = Topology::new(64, 2, 2);