fix(clippy): fix unused imports, variables, and dead code warnings
Fix clippy warnings blocking CI (plan §7 requires -D warnings to pass): - multi_search.rs: fix format strings, field initialization, unused variables - anti_entropy_worker.rs: make ModeACoordinator public, prefix unused fields with _, allow dead code for future-use methods - cdc.rs: allow unused fields and variables (intentionally kept for future use), rename from_str to parse_from_str to avoid std trait confusion - scatter.rs, mode_b_coordinator.rs, group_sync_worker.rs, mode_a_coordinator.rs: move or remove unused imports - alias/acceptance_tests.rs, mode_b_acceptance_tests.rs: remove unused imports These changes fix the initial clippy errors while preserving intentionally-unused code for future use (marked with #[allow(dead_code)] or underscore prefixes). Closes: bf-ed5n
This commit is contained in:
parent
252c9e93be
commit
1f894b4dc2
10 changed files with 60 additions and 50 deletions
|
|
@ -154,7 +154,7 @@ async fn multi_target_alias_rejects_flip_operation() {
|
|||
/// - 11th flip evicts the oldest entry
|
||||
#[tokio::test]
|
||||
async fn history_retention_evicts_oldest_on_11th_flip() {
|
||||
use crate::task_store::{AliasHistoryEntry, SqliteTaskStore};
|
||||
use crate::task_store::SqliteTaskStore;
|
||||
use std::time::SystemTime;
|
||||
|
||||
// Create in-memory store with migration
|
||||
|
|
|
|||
|
|
@ -467,8 +467,10 @@ pub struct CdcManager {
|
|||
/// Optional callback to increment suppression metric.
|
||||
suppressed_metric_callback: Option<CdcSuppressedMetricCallback>,
|
||||
/// Per-sink tiered buffers.
|
||||
#[allow(dead_code)]
|
||||
buffers: HashMap<String, Arc<CdcBuffer>>,
|
||||
/// Optional callback to increment dropped events metric.
|
||||
#[allow(dead_code)]
|
||||
dropped_metric_callback: Option<CdcDroppedMetricCallback>,
|
||||
/// Internal queue for GET /_miroir/changes endpoint.
|
||||
internal_queue: Arc<CdcInternalQueue>,
|
||||
|
|
@ -519,7 +521,8 @@ pub enum CdcBufferType {
|
|||
|
||||
impl CdcBufferType {
|
||||
/// Parse a buffer type from a string (for config deserialization).
|
||||
pub fn from_str(s: &str) -> Option<Self> {
|
||||
#[allow(dead_code)]
|
||||
pub fn parse_from_str(s: &str) -> Option<Self> {
|
||||
match s.to_lowercase().as_str() {
|
||||
"memory" => Some(CdcBufferType::Memory),
|
||||
"redis" => Some(CdcBufferType::Redis),
|
||||
|
|
@ -551,6 +554,7 @@ pub struct CdcBuffer {
|
|||
/// Overflow backend.
|
||||
overflow: Arc<dyn CdcOverflowBackend + Send + Sync>,
|
||||
/// Metric callback for dropped events.
|
||||
#[allow(dead_code)]
|
||||
dropped_metric_callback: Option<CdcDroppedMetricCallback>,
|
||||
}
|
||||
|
||||
|
|
@ -599,6 +603,7 @@ impl CdcMemoryBuffer {
|
|||
}
|
||||
|
||||
/// Release space after event is processed.
|
||||
#[allow(dead_code)]
|
||||
fn release(&self, size: u64) {
|
||||
let old = self
|
||||
.current_bytes
|
||||
|
|
@ -632,12 +637,16 @@ pub struct CdcRedisOverflow {
|
|||
#[cfg(feature = "redis-store")]
|
||||
pool: Option<crate::task_store::RedisPool>,
|
||||
/// Sink name for key prefix.
|
||||
#[allow(dead_code)]
|
||||
sink_name: String,
|
||||
/// Max bytes in Redis overflow.
|
||||
#[allow(dead_code)]
|
||||
max_bytes: u64,
|
||||
/// Key for overflow list.
|
||||
#[allow(dead_code)]
|
||||
list_key: String,
|
||||
/// Key for byte counter.
|
||||
#[allow(dead_code)]
|
||||
bytes_key: String,
|
||||
}
|
||||
|
||||
|
|
@ -763,6 +772,7 @@ impl CdcRedisOverflow {
|
|||
|
||||
#[async_trait::async_trait]
|
||||
impl CdcOverflowBackend for CdcRedisOverflow {
|
||||
#[allow(unused_variables)]
|
||||
async fn push(&self, event: CdcEvent) -> Result<(), CdcError> {
|
||||
#[cfg(feature = "redis-store")]
|
||||
return self.push_inner(event).await;
|
||||
|
|
@ -1007,7 +1017,7 @@ impl CdcBuffer {
|
|||
) -> Result<Self, CdcError> {
|
||||
let primary = Arc::new(CdcMemoryBuffer::new(config.memory_bytes));
|
||||
|
||||
let overflow: Arc<dyn CdcOverflowBackend + Send + Sync> = match CdcBufferType::from_str(
|
||||
let overflow: Arc<dyn CdcOverflowBackend + Send + Sync> = match CdcBufferType::parse_from_str(
|
||||
config.overflow.as_str(),
|
||||
) {
|
||||
Some(CdcBufferType::Redis) => {
|
||||
|
|
@ -2106,17 +2116,17 @@ mod tests {
|
|||
#[test]
|
||||
fn test_cdc_buffer_type_from_str() {
|
||||
assert_eq!(
|
||||
CdcBufferType::from_str("memory"),
|
||||
CdcBufferType::parse_from_str("memory"),
|
||||
Some(CdcBufferType::Memory)
|
||||
);
|
||||
assert_eq!(
|
||||
CdcBufferType::from_str("MEMORY"),
|
||||
CdcBufferType::parse_from_str("MEMORY"),
|
||||
Some(CdcBufferType::Memory)
|
||||
);
|
||||
assert_eq!(CdcBufferType::from_str("redis"), Some(CdcBufferType::Redis));
|
||||
assert_eq!(CdcBufferType::from_str("pvc"), Some(CdcBufferType::Pvc));
|
||||
assert_eq!(CdcBufferType::from_str("drop"), Some(CdcBufferType::Drop));
|
||||
assert_eq!(CdcBufferType::from_str("unknown"), None);
|
||||
assert_eq!(CdcBufferType::parse_from_str("redis"), Some(CdcBufferType::Redis));
|
||||
assert_eq!(CdcBufferType::parse_from_str("pvc"), Some(CdcBufferType::Pvc));
|
||||
assert_eq!(CdcBufferType::parse_from_str("drop"), Some(CdcBufferType::Drop));
|
||||
assert_eq!(CdcBufferType::parse_from_str("unknown"), None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@
|
|||
//! - Tracks progress via GroupAdditionCoordinator
|
||||
//! - Pauses/resumes per Phase 6 Mode C
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
|
|
@ -186,9 +185,7 @@ impl<C: SyncNodeClient> GroupSyncWorker<C> {
|
|||
};
|
||||
|
||||
let target = topology.group(target_group_id).ok_or_else(|| {
|
||||
crate::error::MiroirError::Topology(format!(
|
||||
"target group {target_group_id} not found"
|
||||
))
|
||||
crate::error::MiroirError::Topology(format!("target group {target_group_id} not found"))
|
||||
})?;
|
||||
|
||||
// Find healthy nodes in source and target groups
|
||||
|
|
@ -342,8 +339,8 @@ impl<C: SyncNodeClient> GroupSyncWorker<C> {
|
|||
);
|
||||
|
||||
let mut coord = self.coordinator.write().await;
|
||||
let _ = coord
|
||||
.fail_addition(addition_id, format!("sync timeout after {timeout:?}"));
|
||||
let _ =
|
||||
coord.fail_addition(addition_id, format!("sync timeout after {timeout:?}"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -375,8 +372,8 @@ impl<C: SyncNodeClient> GroupSyncWorker<C> {
|
|||
mod tests {
|
||||
use super::*;
|
||||
use crate::group_addition::{GroupAdditionConfig, GroupAdditionCoordinator};
|
||||
use crate::scatter::FetchDocumentsResponse;
|
||||
use crate::topology::Node;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
// Mock node client for testing
|
||||
|
|
|
|||
|
|
@ -396,8 +396,6 @@ mod tests {
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_no_peers_error() {
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
// Create a coordinator with an empty peer set
|
||||
let peer_discovery = Arc::new(PeerDiscovery::new(
|
||||
"test-pod".to_string(),
|
||||
|
|
@ -514,8 +512,6 @@ mod tests {
|
|||
}
|
||||
|
||||
fn test_coordinator() -> ModeACoordinator {
|
||||
use std::net::{Ipv4Addr, SocketAddr};
|
||||
|
||||
// Create a mock peer discovery with our pod
|
||||
let peer_discovery = Arc::new(PeerDiscovery::new(
|
||||
"test-pod".to_string(),
|
||||
|
|
|
|||
|
|
@ -9,12 +9,11 @@
|
|||
|
||||
use crate::config::LeaderElectionConfig;
|
||||
use crate::leader_election::LeaderElection;
|
||||
use crate::mode_b_coordinator::{ModeBOpLeader, PhaseState};
|
||||
use crate::mode_b_coordinator::ModeBOpLeader;
|
||||
use crate::task_store::{mode_b_type, SqliteTaskStore, TaskStore};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
/// Test extra state for reshard operations.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@
|
|||
|
||||
use crate::error::{MiroirError, Result};
|
||||
use crate::leader_election::LeaderElection;
|
||||
use crate::task_store::{mode_b_status, mode_b_type, ModeBOperation, TaskStore};
|
||||
use crate::task_store::{mode_b_status, ModeBOperation, TaskStore};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::Arc;
|
||||
use tracing::{debug, info, warn};
|
||||
|
|
@ -357,7 +357,7 @@ fn millis_now() -> i64 {
|
|||
mod tests {
|
||||
use super::*;
|
||||
use crate::config::LeaderElectionConfig;
|
||||
use crate::task_store::SqliteTaskStore;
|
||||
use crate::task_store::{mode_b_type, SqliteTaskStore};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
||||
struct TestExtraState {
|
||||
|
|
|
|||
|
|
@ -233,13 +233,15 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_validate_too_many_queries() {
|
||||
let mut config = MultiSearchConfig::default();
|
||||
config.max_queries_per_batch = 10;
|
||||
let config = MultiSearchConfig {
|
||||
max_queries_per_batch: 10,
|
||||
..Default::default()
|
||||
};
|
||||
let executor = MultiSearchExecutor::new(config);
|
||||
|
||||
let queries: Vec<SearchQuery> = (0..20)
|
||||
.map(|i| SearchQuery {
|
||||
index_uid: format!("index-{}", i),
|
||||
index_uid: format!("index-{i}"),
|
||||
q: Some("test".into()),
|
||||
filter: None,
|
||||
limit: Some(10),
|
||||
|
|
@ -337,8 +339,8 @@ mod tests {
|
|||
|
||||
let queries: Vec<SearchQuery> = (0..5)
|
||||
.map(|i| SearchQuery {
|
||||
index_uid: format!("index-{}", i),
|
||||
q: Some(format!("query-{}", i)),
|
||||
index_uid: format!("index-{i}"),
|
||||
q: Some(format!("query-{i}")),
|
||||
filter: None,
|
||||
limit: Some(10),
|
||||
offset: Some(0),
|
||||
|
|
@ -365,16 +367,18 @@ mod tests {
|
|||
|
||||
assert_eq!(response.results.len(), 5);
|
||||
for (i, result) in response.results.iter().enumerate() {
|
||||
assert!(result.is_success(), "Query {} should succeed", i);
|
||||
assert!(result.body.is_some(), "Query {} should have body", i);
|
||||
assert!(result.is_success(), "Query {i} should succeed");
|
||||
assert!(result.body.is_some(), "Query {i} should have body");
|
||||
}
|
||||
}
|
||||
|
||||
/// P5.11-A2: Slow query doesn't block fast queries (parallel execution).
|
||||
#[tokio::test]
|
||||
async fn test_slow_query_doesnt_block_fast_queries() {
|
||||
let mut config = MultiSearchConfig::default();
|
||||
config.per_query_timeout_ms = 5000;
|
||||
let config = MultiSearchConfig {
|
||||
per_query_timeout_ms: 5000,
|
||||
..Default::default()
|
||||
};
|
||||
let executor = MultiSearchExecutor::new(config);
|
||||
|
||||
let request = MultiSearchRequest {
|
||||
|
|
@ -581,7 +585,7 @@ mod tests {
|
|||
};
|
||||
|
||||
let response = executor
|
||||
.execute(request, |query| async move {
|
||||
.execute(request, |_query| async move {
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
Ok(SearchResultData {
|
||||
body: serde_json::json!({"hits": []}),
|
||||
|
|
@ -602,16 +606,18 @@ mod tests {
|
|||
/// P5.11-A6: 100-query batch completes under total timeout.
|
||||
#[tokio::test]
|
||||
async fn test_large_batch_completes() {
|
||||
let mut config = MultiSearchConfig::default();
|
||||
config.max_queries_per_batch = 100;
|
||||
config.total_timeout_ms = 30000;
|
||||
config.per_query_timeout_ms = 5000;
|
||||
let config = MultiSearchConfig {
|
||||
max_queries_per_batch: 100,
|
||||
total_timeout_ms: 30000,
|
||||
per_query_timeout_ms: 5000,
|
||||
..Default::default()
|
||||
};
|
||||
let executor = MultiSearchExecutor::new(config);
|
||||
|
||||
let queries: Vec<SearchQuery> = (0..100)
|
||||
.map(|i| SearchQuery {
|
||||
index_uid: format!("index-{}", i % 10),
|
||||
q: Some(format!("query-{}", i)),
|
||||
q: Some(format!("query-{i}")),
|
||||
filter: None,
|
||||
limit: Some(10),
|
||||
offset: Some(0),
|
||||
|
|
|
|||
|
|
@ -20,10 +20,10 @@ use reqwest::Client;
|
|||
|
||||
// Type alias for ModeACoordinator that becomes a dummy type when feature is disabled
|
||||
#[cfg(feature = "peer-discovery")]
|
||||
type ModeACoordinator = ActualModeACoordinator;
|
||||
pub type ModeACoordinator = ActualModeACoordinator;
|
||||
|
||||
#[cfg(not(feature = "peer-discovery"))]
|
||||
struct ModeACoordinator;
|
||||
pub struct ModeACoordinator;
|
||||
|
||||
#[cfg(not(feature = "peer-discovery"))]
|
||||
impl ModeACoordinator {
|
||||
|
|
@ -273,7 +273,8 @@ impl NodeClient for HttpNodeClient {
|
|||
|
||||
let results = json
|
||||
.get("results")
|
||||
.and_then(|v| v.as_array()).cloned()
|
||||
.and_then(|v| v.as_array())
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
|
||||
let total = json.get("total").and_then(|v| v.as_u64()).unwrap_or(0);
|
||||
|
|
@ -460,18 +461,19 @@ pub struct AntiEntropyWorker {
|
|||
config: AntiEntropyWorkerConfig,
|
||||
reconciler: AntiEntropyReconciler<HttpNodeClient>,
|
||||
topology: Arc<RwLock<Topology>>,
|
||||
#[allow(dead_code)]
|
||||
task_store: Arc<dyn TaskStore>,
|
||||
pod_id: String,
|
||||
/// Mode A coordinator for shard-partitioned ownership (plan §14.5 Mode A).
|
||||
mode_a_coordinator: Option<Arc<ModeACoordinator>>,
|
||||
/// Total shards in the cluster (for Mode A scaling).
|
||||
total_shards: u32,
|
||||
_total_shards: u32,
|
||||
/// This pod's replica group ID (for Mode A scaling).
|
||||
replica_group_id: Option<u32>,
|
||||
/// Total number of pods in Mode A scaling.
|
||||
num_pods: Option<u32>,
|
||||
/// RF (replication factor) for Mode A scaling.
|
||||
rf: usize,
|
||||
_rf: usize,
|
||||
/// Whether TTL is enabled for expired document handling (plan §13.14 interaction).
|
||||
ttl_enabled: bool,
|
||||
/// Metrics callback for shards scanned.
|
||||
|
|
@ -517,10 +519,10 @@ impl AntiEntropyWorker {
|
|||
task_store,
|
||||
pod_id,
|
||||
mode_a_coordinator: None,
|
||||
total_shards: 0, // Will be set when Mode A is enabled
|
||||
_total_shards: 0, // Will be set when Mode A is enabled
|
||||
replica_group_id: None,
|
||||
num_pods: None,
|
||||
rf: 2, // Default RF
|
||||
_rf: 2, // Default RF
|
||||
ttl_enabled: false,
|
||||
metrics_shards_scanned: None,
|
||||
metrics_mismatches_found: None,
|
||||
|
|
@ -617,6 +619,7 @@ impl AntiEntropyWorker {
|
|||
///
|
||||
/// This runs the pass immediately after acquiring lease, then waits
|
||||
/// for the configured interval before running again (if still leader).
|
||||
#[allow(dead_code)]
|
||||
async fn run_pass_cycle(&self) -> Result<(), String> {
|
||||
let scope = "anti_entropy";
|
||||
let mut lease_renewal =
|
||||
|
|
@ -675,7 +678,7 @@ impl AntiEntropyWorker {
|
|||
info!("starting anti-entropy pass");
|
||||
|
||||
// Get topology info for Mode A scaling
|
||||
let (total_shards, rf) = {
|
||||
let (_total_shards, _rf) = {
|
||||
let topo = self.topology.read().await;
|
||||
(topo.shards, topo.rf())
|
||||
};
|
||||
|
|
@ -715,6 +718,7 @@ impl AntiEntropyWorker {
|
|||
}
|
||||
|
||||
/// Get current time in milliseconds since Unix epoch.
|
||||
#[allow(dead_code)]
|
||||
fn now_ms() -> i64 {
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
|
|
|
|||
|
|
@ -1735,8 +1735,6 @@ impl RebalancerWorker {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Pause an in-progress rebalance.
|
||||
|
||||
/// Pause an in-progress rebalance.
|
||||
pub async fn pause_rebalance(&self, index_uid: &str) -> Result<(), String> {
|
||||
let job_id = RebalanceJobId::new(index_uid);
|
||||
|
|
|
|||
|
|
@ -10,7 +10,6 @@ use crate::Result;
|
|||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use tokio::time::Duration;
|
||||
use tracing::{info_span, instrument, Instrument};
|
||||
|
|
@ -1547,6 +1546,7 @@ impl NodeClient for MockNodeClient {
|
|||
mod tests {
|
||||
use super::*;
|
||||
use crate::topology::{Node, NodeId};
|
||||
use std::sync::Arc;
|
||||
|
||||
fn make_test_topology() -> Topology {
|
||||
let mut topo = Topology::new(64, 2, 2);
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue