diff --git a/crates/miroir-core/src/cdc.rs b/crates/miroir-core/src/cdc.rs index 0b12bc6..c221beb 100644 --- a/crates/miroir-core/src/cdc.rs +++ b/crates/miroir-core/src/cdc.rs @@ -254,7 +254,9 @@ impl CdcMemoryBuffer { /// Returns `None` if buffer is at capacity (should overflow). async fn acquire(&self, size: u64) -> Option<()> { // Check soft watermark (80% of max) - let current = self.current_bytes.load(std::sync::atomic::Ordering::Relaxed); + let current = self + .current_bytes + .load(std::sync::atomic::Ordering::Relaxed); if current + size > (self.max_bytes * 8 / 10) { return None; } @@ -262,7 +264,8 @@ impl CdcMemoryBuffer { // Try to acquire semaphore permit (non-blocking) match self.semaphore.try_acquire() { Ok(_permit) => { - self.current_bytes.fetch_add(size, std::sync::atomic::Ordering::Relaxed); + self.current_bytes + .fetch_add(size, std::sync::atomic::Ordering::Relaxed); Some(()) } Err(_) => None, @@ -271,7 +274,9 @@ impl CdcMemoryBuffer { /// Release space after event is processed. fn release(&self, size: u64) { - let old = self.current_bytes.fetch_sub(size, std::sync::atomic::Ordering::Relaxed); + let old = self + .current_bytes + .fetch_sub(size, std::sync::atomic::Ordering::Relaxed); // Add semaphore permit back self.semaphore.add_permits(1); debug_assert!(old >= size, "buffer underflow: {old} < {size}"); @@ -345,7 +350,10 @@ impl CdcRedisOverflow { /// Push event to Redis list (LPUSH). async fn push_inner(&self, event: CdcEvent) -> Result<(), CdcError> { - let pool = self.pool.as_ref().ok_or_else(|| CdcError::SinkError("no pool".into()))?; + let pool = self + .pool + .as_ref() + .ok_or_else(|| CdcError::SinkError("no pool".into()))?; // Serialize event let json = serde_json::to_vec(&event) @@ -354,7 +362,8 @@ impl CdcRedisOverflow { // Check size limit let mut conn = pool.manager.lock().await; - let current_bytes: Option = conn.get(&self.bytes_key) + let current_bytes: Option = conn + .get(&self.bytes_key) .await .map_err(|e| CdcError::SinkError(format!("redis get error: {e}")))?; let current_bytes = current_bytes.unwrap_or(0); @@ -394,7 +403,9 @@ impl CdcRedisOverflow { // Note: We can't modify self here since it's behind &self // This is a limitation of lazy initialization in this pattern // For now, we'll just return an error - return Err(CdcError::SinkError("Redis pool not initialized - use lazy_new with explicit URL".into())); + return Err(CdcError::SinkError( + "Redis pool not initialized - use lazy_new with explicit URL".into(), + )); } Ok(()) } @@ -417,12 +428,10 @@ impl CdcRedisOverflow { #[cfg(not(feature = "redis-store"))] impl CdcRedisOverflow { /// Create a new Redis overflow backend (no-op without redis-store feature). - pub async fn new( - _pool: (), - _sink_name: String, - _max_bytes: u64, - ) -> Result { - Err(CdcError::SinkError("redis-store feature not enabled".into())) + pub async fn new(_pool: (), _sink_name: String, _max_bytes: u64) -> Result { + Err(CdcError::SinkError( + "redis-store feature not enabled".into(), + )) } } @@ -433,7 +442,9 @@ impl CdcOverflowBackend for CdcRedisOverflow { return self.push_inner(event).await; #[cfg(not(feature = "redis-store"))] - Err(CdcError::SinkError("redis-store feature not enabled".into())) + Err(CdcError::SinkError( + "redis-store feature not enabled".into(), + )) } async fn pop(&self) -> Option { @@ -500,7 +511,8 @@ impl CdcPvcOverflow { } fn file_path(&self) -> std::path::PathBuf { - self.data_dir.join(format!("cdc-overflow-{}.log", self.sink_name)) + self.data_dir + .join(format!("cdc-overflow-{}.log", self.sink_name)) } /// Push event to file (append, truncate if over limit). @@ -534,15 +546,18 @@ impl CdcPvcOverflow { } // Rewrite file - let mut file = tokio::fs::File::create(&path).await + let mut file = tokio::fs::File::create(&path) + .await .map_err(|e| CdcError::SinkError(format!("create error: {e}")))?; for ev in events { let line = serde_json::to_string(&ev) .map_err(|e| CdcError::SinkError(format!("serialize error: {e}")))?; use tokio::io::AsyncWriteExt; - file.write_all(line.as_bytes()).await + file.write_all(line.as_bytes()) + .await .map_err(|e| CdcError::SinkError(format!("write error: {e}")))?; - file.write_all(b"\n").await + file.write_all(b"\n") + .await .map_err(|e| CdcError::SinkError(format!("write error: {e}")))?; } } @@ -555,9 +570,11 @@ impl CdcPvcOverflow { .open(&path) .await .map_err(|e| CdcError::SinkError(format!("open error: {e}")))?; - file.write_all(&json).await + file.write_all(&json) + .await .map_err(|e| CdcError::SinkError(format!("write error: {e}")))?; - file.write_all(b"\n").await + file.write_all(b"\n") + .await .map_err(|e| CdcError::SinkError(format!("write error: {e}")))?; Ok(()) @@ -634,7 +651,10 @@ impl CdcOverflowBackend for CdcDropOverflow { if let Some(ref callback) = self.metric_callback { callback(&self.sink_name); } - debug!("CDC: dropped event for sink {} (overflow: drop)", self.sink_name); + debug!( + "CDC: dropped event for sink {} (overflow: drop)", + self.sink_name + ); // Return error to signal event was dropped Err(CdcError::BufferOverflow) } @@ -661,14 +681,17 @@ impl CdcBuffer { ) -> Result { let primary = Arc::new(CdcMemoryBuffer::new(config.memory_bytes)); - let overflow: Arc = match CdcBufferType::from_str(config.overflow.as_str()) { + let overflow: Arc = match CdcBufferType::from_str( + config.overflow.as_str(), + ) { Some(CdcBufferType::Redis) => { #[cfg(feature = "redis-store")] { // Redis pool will be created lazily on first use let redis_url = std::env::var("MIROIR_REDIS_URL") .unwrap_or_else(|_| "redis://localhost:6379".to_string()); - let backend = CdcRedisOverflow::lazy_new(sink_name, config.redis_bytes, redis_url); + let backend = + CdcRedisOverflow::lazy_new(sink_name, config.redis_bytes, redis_url); Arc::new(backend) } #[cfg(not(feature = "redis-store"))] @@ -678,8 +701,8 @@ impl CdcBuffer { } } Some(CdcBufferType::Pvc) => { - let data_dir = std::env::var("MIROIR_DATA_DIR") - .unwrap_or_else(|_| "/data".to_string()); + let data_dir = + std::env::var("MIROIR_DATA_DIR").unwrap_or_else(|_| "/data".to_string()); Arc::new(CdcPvcOverflow::new( sink_name, std::path::PathBuf::from(data_dir), @@ -728,7 +751,10 @@ impl CdcBuffer { /// Get total buffer size in bytes (primary + overflow). pub async fn size_bytes(&self) -> u64 { - let primary = self.primary.current_bytes.load(std::sync::atomic::Ordering::Relaxed); + let primary = self + .primary + .current_bytes + .load(std::sync::atomic::Ordering::Relaxed); let overflow = self.overflow.size_bytes().await; primary + overflow } @@ -781,7 +807,11 @@ impl CdcManager { let mut buffers = HashMap::new(); for sink in &config.sinks { let sink_name = sink.url.clone(); - match CdcBuffer::new(&config.buffer, sink_name.clone(), dropped_metric_callback.clone()) { + match CdcBuffer::new( + &config.buffer, + sink_name.clone(), + dropped_metric_callback.clone(), + ) { Ok(buffer) => { buffers.insert(sink_name.clone(), Arc::new(buffer)); } @@ -797,7 +827,8 @@ impl CdcManager { let config_clone = config.clone(); let buffers_clone = buffers.clone(); tokio::spawn(async move { - Self::background_publisher(event_rx, state_clone, config_clone, buffers_clone).await; + Self::background_publisher(event_rx, state_clone, config_clone, buffers_clone) + .await; }); } @@ -1231,8 +1262,14 @@ mod tests { #[test] fn test_cdc_buffer_type_from_str() { - assert_eq!(CdcBufferType::from_str("memory"), Some(CdcBufferType::Memory)); - assert_eq!(CdcBufferType::from_str("MEMORY"), Some(CdcBufferType::Memory)); + assert_eq!( + CdcBufferType::from_str("memory"), + Some(CdcBufferType::Memory) + ); + assert_eq!( + CdcBufferType::from_str("MEMORY"), + Some(CdcBufferType::Memory) + ); assert_eq!(CdcBufferType::from_str("redis"), Some(CdcBufferType::Redis)); assert_eq!(CdcBufferType::from_str("pvc"), Some(CdcBufferType::Pvc)); assert_eq!(CdcBufferType::from_str("drop"), Some(CdcBufferType::Drop)); diff --git a/crates/miroir-core/src/rebalancer_worker/anti_entropy_worker.rs b/crates/miroir-core/src/rebalancer_worker/anti_entropy_worker.rs index ce2715c..6a56308 100644 --- a/crates/miroir-core/src/rebalancer_worker/anti_entropy_worker.rs +++ b/crates/miroir-core/src/rebalancer_worker/anti_entropy_worker.rs @@ -53,9 +53,9 @@ pub struct AntiEntropyWorkerConfig { impl Default for AntiEntropyWorkerConfig { fn default() -> Self { Self { - interval_s: 6 * 3600, // 6 hours + interval_s: 6 * 3600, // 6 hours lease_renewal_interval_ms: 5000, // 5 seconds - lease_ttl_secs: 30, // 30 seconds + lease_ttl_secs: 30, // 30 seconds } } } diff --git a/crates/miroir-core/src/rebalancer_worker/drift_reconciler.rs b/crates/miroir-core/src/rebalancer_worker/drift_reconciler.rs index 9fd91b0..b5f3303 100644 --- a/crates/miroir-core/src/rebalancer_worker/drift_reconciler.rs +++ b/crates/miroir-core/src/rebalancer_worker/drift_reconciler.rs @@ -60,7 +60,7 @@ impl Default for DriftReconcilerConfig { interval_s: 300, // 5 minutes auto_repair: true, lease_renewal_interval_ms: 5000, // 5 seconds - lease_ttl_secs: 30, // 30 seconds + lease_ttl_secs: 30, // 30 seconds } } } diff --git a/crates/miroir-core/src/task_store/redis.rs b/crates/miroir-core/src/task_store/redis.rs index e289673..f7bfb5a 100644 --- a/crates/miroir-core/src/task_store/redis.rs +++ b/crates/miroir-core/src/task_store/redis.rs @@ -579,8 +579,10 @@ impl TaskStore for RedisTaskStore { let mut p = pipe(); p.hget(&key, "created_at"); p.hget(&key, "status"); - let result: (Option, Option) = - pool.pipeline_query(&mut p).await.map_err(|e| MiroirError::Redis(e.to_string()))?; + let result: (Option, Option) = pool + .pipeline_query(&mut p) + .await + .map_err(|e| MiroirError::Redis(e.to_string()))?; if let (Some(created_at_str), Some(status)) = result { if !terminal_statuses.contains(&status.as_str()) { @@ -614,15 +616,20 @@ impl TaskStore for RedisTaskStore { let status = get_field_string(&fields, "status")?; let node_tasks_json = get_field_string(&fields, "node_tasks")?; let node_tasks: HashMap = serde_json::from_str(&node_tasks_json) - .map_err(|e| MiroirError::TaskStore(format!("invalid node_tasks JSON: {e}")))?; + .map_err(|e| { + MiroirError::TaskStore(format!("invalid node_tasks JSON: {e}")) + })?; let error = opt_field(&fields, "error"); let started_at = opt_field_i64(&fields, "started_at"); let finished_at = opt_field_i64(&fields, "finished_at"); let index_uid = opt_field(&fields, "index_uid"); let task_type = opt_field(&fields, "task_type"); - let node_errors_json = opt_field(&fields, "node_errors").unwrap_or_else(|| "{}".to_string()); - let node_errors: HashMap = serde_json::from_str(&node_errors_json) - .map_err(|e| MiroirError::TaskStore(format!("invalid node_errors JSON: {e}")))?; + let node_errors_json = + opt_field(&fields, "node_errors").unwrap_or_else(|| "{}".to_string()); + let node_errors: HashMap = + serde_json::from_str(&node_errors_json).map_err(|e| { + MiroirError::TaskStore(format!("invalid node_errors JSON: {e}")) + })?; results.push(TaskRow { miroir_id, diff --git a/crates/miroir-core/tests/p28_api_compatibility.rs b/crates/miroir-core/tests/p28_api_compatibility.rs index b5f81dd..22d54a6 100644 --- a/crates/miroir-core/tests/p28_api_compatibility.rs +++ b/crates/miroir-core/tests/p28_api_compatibility.rs @@ -4,9 +4,9 @@ //! 1. Error format parity with Meilisearch (plan §5) //! 2. GET /_miroir/topology matches plan §10 JSON shape +use axum::response::IntoResponse; use miroir_core::api_error::{ErrorType, MeilisearchError, MiroirCode}; use serde_json::json; -use axum::response::IntoResponse; /// Test 1: All Miroir error codes produce the correct Meilisearch-compatible shape. ///