From 330991f0b34dbdf6bd24f6c106cd97ce67d58971 Mon Sep 17 00:00:00 2001 From: jedarden Date: Wed, 6 May 2026 07:19:38 -0400 Subject: [PATCH] P5.13.f Event suppression by _miroir_origin tag (internal writes) - Add CdcSuppressedMetricCallback type for suppression metric tracking - Add with_metrics() constructor to CdcManager for optional callback - Update publish() to call callback when suppressing events by origin - Clean up duplicate TTL delete filtering logic - Add tests: suppression metric callback, all origins, emit_internal_writes mode, client writes Co-Authored-By: Claude Opus 4.7 --- crates/miroir-core/src/cdc.rs | 202 +++++++++++++++++++++++++++++++--- 1 file changed, 184 insertions(+), 18 deletions(-) diff --git a/crates/miroir-core/src/cdc.rs b/crates/miroir-core/src/cdc.rs index 2bce150..83f6ba1 100644 --- a/crates/miroir-core/src/cdc.rs +++ b/crates/miroir-core/src/cdc.rs @@ -8,6 +8,10 @@ use std::sync::Arc; use tokio::sync::{mpsc, RwLock}; use tracing::{debug, error, info}; +/// Callback type for incrementing the CDC events suppressed metric. +/// Called with the origin tag when an event is suppressed. +pub type CdcSuppressedMetricCallback = Arc; + /// CDC event published on every successful write (after quorum ACK). #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CdcEvent { @@ -91,6 +95,8 @@ pub struct CdcManager { event_tx: mpsc::UnboundedSender, /// Per-sink state (shared with background task). state: Arc>, + /// Optional callback to increment suppression metric. + suppressed_metric_callback: Option, } /// CDC manager configuration. @@ -150,6 +156,11 @@ impl Default for CdcConfig { impl CdcManager { /// Create a new CDC manager. pub fn new(config: CdcConfig) -> Self { + Self::with_metrics(config, None) + } + + /// Create a new CDC manager with an optional suppression metric callback. 
+ pub fn with_metrics(config: CdcConfig, suppressed_metric_callback: Option) -> Self { let (event_tx, event_rx) = mpsc::unbounded_channel(); let state = Arc::new(RwLock::new(CdcPublisherState { cursors: HashMap::new(), @@ -171,6 +182,7 @@ impl CdcManager { config, event_tx, state, + suppressed_metric_callback, } } @@ -180,31 +192,27 @@ impl CdcManager { return Ok(()); } - // Filter based on origin tag + // Filter based on origin tag (plan §13.13: CDC event suppression) if let Some(ref origin) = event.origin { - match origin.as_str() { + let should_suppress = match origin.as_str() { "antientropy" | "reshard_backfill" | "rollover" => { - if !self.config.emit_internal_writes { - debug!("CDC: suppressing internal write with origin {}", origin); - return Ok(()); - } + // Internal writes: suppressed unless emit_internal_writes is true + !self.config.emit_internal_writes } "ttl_expire" => { - if !self.config.emit_ttl_deletes { - debug!("CDC: suppressing TTL delete"); - return Ok(()); - } + // TTL deletes: suppressed unless emit_ttl_deletes is true + !self.config.emit_ttl_deletes } - _ => {} - } - } + _ => false, + }; - // TTL deletes are filtered by emit_ttl_deletes flag - if event.operation == CdcOperation::Delete { - if let Some(ref origin) = event.origin { - if origin == "ttl_expire" && !self.config.emit_ttl_deletes { - return Ok(()); + if should_suppress { + debug!("CDC: suppressing event with origin {}", origin); + // Increment suppression metric if callback is provided + if let Some(ref callback) = self.suppressed_metric_callback { + callback(origin); } + return Ok(()); } } @@ -417,4 +425,162 @@ mod tests { let json = serde_json::to_string(&sink_type).unwrap(); assert_eq!(json, "\"Webhook\""); } + + #[tokio::test] + async fn test_cdc_suppression_metric_callback() { + use std::sync::atomic::{AtomicUsize, Ordering}; + + let callback_called = Arc::new(AtomicUsize::new(0)); + let callback_clone = callback_called.clone(); + + let callback: CdcSuppressedMetricCallback = 
Arc::new(move |origin| { + assert_eq!(origin, "antientropy"); + callback_clone.fetch_add(1, Ordering::SeqCst); + }); + + let config = CdcConfig { + enabled: true, + emit_internal_writes: false, + ..Default::default() + }; + let manager = CdcManager::with_metrics(config, Some(callback)); + + let event = CdcEvent { + mtask_id: "mtask-123".into(), + index: "products".into(), + operation: CdcOperation::Add, + primary_keys: vec!["sku-123".into()], + shard_ids: vec![5], + settings_version: 1, + timestamp: 1234567890, + document: None, + origin: Some("antientropy".into()), + event_id: uuid::Uuid::new_v4().to_string(), + }; + + assert!(manager.publish(event).is_ok()); + assert_eq!(callback_called.load(Ordering::SeqCst), 1); + } + + #[tokio::test] + async fn test_cdc_suppression_metric_all_origins() { + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::collections::HashSet; + + let suppressed_origins = Arc::new(std::sync::Mutex::new(HashSet::new())); + let origins_clone = suppressed_origins.clone(); + + let callback: CdcSuppressedMetricCallback = Arc::new(move |origin| { + origins_clone.lock().unwrap().insert(origin.to_string()); + }); + + let config = CdcConfig { + enabled: true, + emit_internal_writes: false, + emit_ttl_deletes: false, + ..Default::default() + }; + let manager = CdcManager::with_metrics(config, Some(callback)); + + // Test all suppressible origins + let origins = vec!["antientropy", "reshard_backfill", "rollover", "ttl_expire"]; + for origin in origins { + let event = CdcEvent { + mtask_id: "mtask-123".into(), + index: "products".into(), + operation: CdcOperation::Delete, + primary_keys: vec!["sku-123".into()], + shard_ids: vec![5], + settings_version: 1, + timestamp: 1234567890, + document: None, + origin: Some(origin.into()), + event_id: uuid::Uuid::new_v4().to_string(), + }; + assert!(manager.publish(event).is_ok()); + } + + let suppressed = suppressed_origins.lock().unwrap(); + assert_eq!(suppressed.len(), 4); + 
assert!(suppressed.contains("antientropy")); + assert!(suppressed.contains("reshard_backfill")); + assert!(suppressed.contains("rollover")); + assert!(suppressed.contains("ttl_expire")); + } + + #[tokio::test] + async fn test_cdc_no_suppression_with_emit_internal_writes() { + use std::sync::atomic::{AtomicUsize, Ordering}; + + let callback_called = Arc::new(AtomicUsize::new(0)); + let callback_clone = callback_called.clone(); + + let callback: CdcSuppressedMetricCallback = Arc::new(move |_origin| { + callback_clone.fetch_add(1, Ordering::SeqCst); + }); + + let config = CdcConfig { + enabled: true, + emit_internal_writes: true, // Enable internal writes + ..Default::default() + }; + let manager = CdcManager::with_metrics(config, Some(callback)); + + let event = CdcEvent { + mtask_id: "mtask-123".into(), + index: "products".into(), + operation: CdcOperation::Add, + primary_keys: vec!["sku-123".into()], + shard_ids: vec![5], + settings_version: 1, + timestamp: 1234567890, + document: None, + origin: Some("antientropy".into()), + event_id: uuid::Uuid::new_v4().to_string(), + }; + + // Should NOT be suppressed because emit_internal_writes is true + assert!(manager.publish(event).is_ok()); + // Callback should NOT have been called + assert_eq!(callback_called.load(Ordering::SeqCst), 0); + } + + #[tokio::test] + async fn test_cdc_client_write_never_suppressed() { + use std::sync::atomic::{AtomicUsize, Ordering}; + + let callback_called = Arc::new(AtomicUsize::new(0)); + let callback_clone = callback_called.clone(); + + let callback: CdcSuppressedMetricCallback = Arc::new(move |_origin| { + callback_clone.fetch_add(1, Ordering::SeqCst); + }); + + let config = CdcConfig { + enabled: true, + emit_internal_writes: false, + emit_ttl_deletes: false, + ..Default::default() + }; + let manager = CdcManager::with_metrics(config, Some(callback)); + + // Client write has no origin tag + let event = CdcEvent { + mtask_id: "mtask-123".into(), + index: "products".into(), + operation: 
CdcOperation::Add, + primary_keys: vec!["sku-123".into()], + shard_ids: vec![5], + settings_version: 1, + timestamp: 1234567890, + document: None, + origin: None, // No origin = client write + event_id: uuid::Uuid::new_v4().to_string(), + }; + + // Should NOT be suppressed (client writes are always emitted) + assert!(manager.publish(event).is_ok()); + // Callback should NOT have been called + assert_eq!(callback_called.load(Ordering::SeqCst), 0); + } }