P5.13.f Event suppression by _miroir_origin tag (internal writes)

- Add CdcSuppressedMetricCallback type for suppression metric tracking
- Add with_metrics() constructor to CdcManager for optional callback
- Update publish() to call callback when suppressing events by origin
- Clean up duplicate TTL delete filtering logic
- Add tests: suppression metric callback, all origins, emit_internal_writes mode, client writes

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-05-06 07:19:38 -04:00
parent 64b436f085
commit 330991f0b3

View file

@ -8,6 +8,10 @@ use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tracing::{debug, error, info};
/// Callback type for incrementing the CDC events suppressed metric.
/// Called with the origin tag when an event is suppressed.
pub type CdcSuppressedMetricCallback = Arc<dyn Fn(&str) + Send + Sync>;
/// CDC event published on every successful write (after quorum ACK).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CdcEvent {
@ -91,6 +95,8 @@ pub struct CdcManager {
event_tx: mpsc::UnboundedSender<CdcEvent>,
/// Per-sink state (shared with background task).
state: Arc<RwLock<CdcPublisherState>>,
/// Optional callback to increment suppression metric.
suppressed_metric_callback: Option<CdcSuppressedMetricCallback>,
}
/// CDC manager configuration.
@ -150,6 +156,11 @@ impl Default for CdcConfig {
impl CdcManager {
/// Create a new CDC manager.
pub fn new(config: CdcConfig) -> Self {
Self::with_metrics(config, None)
}
/// Create a new CDC manager with an optional suppression metric callback.
pub fn with_metrics(config: CdcConfig, suppressed_metric_callback: Option<CdcSuppressedMetricCallback>) -> Self {
let (event_tx, event_rx) = mpsc::unbounded_channel();
let state = Arc::new(RwLock::new(CdcPublisherState {
cursors: HashMap::new(),
@ -171,6 +182,7 @@ impl CdcManager {
config,
event_tx,
state,
suppressed_metric_callback,
}
}
@ -180,31 +192,27 @@ impl CdcManager {
return Ok(());
}
// Filter based on origin tag
// Filter based on origin tag (plan §13.13: CDC event suppression)
if let Some(ref origin) = event.origin {
match origin.as_str() {
let should_suppress = match origin.as_str() {
"antientropy" | "reshard_backfill" | "rollover" => {
if !self.config.emit_internal_writes {
debug!("CDC: suppressing internal write with origin {}", origin);
return Ok(());
}
// Internal writes: suppressed unless emit_internal_writes is true
!self.config.emit_internal_writes
}
"ttl_expire" => {
if !self.config.emit_ttl_deletes {
debug!("CDC: suppressing TTL delete");
return Ok(());
}
// TTL deletes: suppressed unless emit_ttl_deletes is true
!self.config.emit_ttl_deletes
}
_ => {}
}
}
_ => false,
};
// TTL deletes are filtered by emit_ttl_deletes flag
if event.operation == CdcOperation::Delete {
if let Some(ref origin) = event.origin {
if origin == "ttl_expire" && !self.config.emit_ttl_deletes {
return Ok(());
if should_suppress {
debug!("CDC: suppressing event with origin {}", origin);
// Increment suppression metric if callback is provided
if let Some(ref callback) = self.suppressed_metric_callback {
callback(origin);
}
return Ok(());
}
}
@ -417,4 +425,162 @@ mod tests {
let json = serde_json::to_string(&sink_type).unwrap();
assert_eq!(json, "\"Webhook\"");
}
#[tokio::test]
async fn test_cdc_suppression_metric_callback() {
use std::sync::atomic::{AtomicUsize, Ordering};
let callback_called = Arc::new(AtomicUsize::new(0));
let callback_clone = callback_called.clone();
let callback: CdcSuppressedMetricCallback = Arc::new(move |origin| {
assert_eq!(origin, "antientropy");
callback_clone.fetch_add(1, Ordering::SeqCst);
});
let config = CdcConfig {
enabled: true,
emit_internal_writes: false,
..Default::default()
};
let manager = CdcManager::with_metrics(config, Some(callback));
let event = CdcEvent {
mtask_id: "mtask-123".into(),
index: "products".into(),
operation: CdcOperation::Add,
primary_keys: vec!["sku-123".into()],
shard_ids: vec![5],
settings_version: 1,
timestamp: 1234567890,
document: None,
origin: Some("antientropy".into()),
event_id: uuid::Uuid::new_v4().to_string(),
};
assert!(manager.publish(event).is_ok());
assert_eq!(callback_called.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_cdc_suppression_metric_all_origins() {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::collections::HashSet;
let suppressed_origins = Arc::new(std::sync::Mutex::new(HashSet::new()));
let origins_clone = suppressed_origins.clone();
let callback: CdcSuppressedMetricCallback = Arc::new(move |origin| {
origins_clone.lock().unwrap().insert(origin.to_string());
});
let config = CdcConfig {
enabled: true,
emit_internal_writes: false,
emit_ttl_deletes: false,
..Default::default()
};
let manager = CdcManager::with_metrics(config, Some(callback));
// Test all suppressible origins
let origins = vec!["antientropy", "reshard_backfill", "rollover", "ttl_expire"];
for origin in origins {
let event = CdcEvent {
mtask_id: "mtask-123".into(),
index: "products".into(),
operation: CdcOperation::Delete,
primary_keys: vec!["sku-123".into()],
shard_ids: vec![5],
settings_version: 1,
timestamp: 1234567890,
document: None,
origin: Some(origin.into()),
event_id: uuid::Uuid::new_v4().to_string(),
};
assert!(manager.publish(event).is_ok());
}
let suppressed = suppressed_origins.lock().unwrap();
assert_eq!(suppressed.len(), 4);
assert!(suppressed.contains("antientropy"));
assert!(suppressed.contains("reshard_backfill"));
assert!(suppressed.contains("rollover"));
assert!(suppressed.contains("ttl_expire"));
}
#[tokio::test]
async fn test_cdc_no_suppression_with_emit_internal_writes() {
use std::sync::atomic::{AtomicUsize, Ordering};
let callback_called = Arc::new(AtomicUsize::new(0));
let callback_clone = callback_called.clone();
let callback: CdcSuppressedMetricCallback = Arc::new(move |_origin| {
callback_clone.fetch_add(1, Ordering::SeqCst);
});
let config = CdcConfig {
enabled: true,
emit_internal_writes: true, // Enable internal writes
..Default::default()
};
let manager = CdcManager::with_metrics(config, Some(callback));
let event = CdcEvent {
mtask_id: "mtask-123".into(),
index: "products".into(),
operation: CdcOperation::Add,
primary_keys: vec!["sku-123".into()],
shard_ids: vec![5],
settings_version: 1,
timestamp: 1234567890,
document: None,
origin: Some("antientropy".into()),
event_id: uuid::Uuid::new_v4().to_string(),
};
// Should NOT be suppressed because emit_internal_writes is true
assert!(manager.publish(event).is_ok());
// Callback should NOT have been called
assert_eq!(callback_called.load(Ordering::SeqCst), 0);
}
#[tokio::test]
async fn test_cdc_client_write_never_suppressed() {
use std::sync::atomic::{AtomicUsize, Ordering};
let callback_called = Arc::new(AtomicUsize::new(0));
let callback_clone = callback_called.clone();
let callback: CdcSuppressedMetricCallback = Arc::new(move |_origin| {
callback_clone.fetch_add(1, Ordering::SeqCst);
});
let config = CdcConfig {
enabled: true,
emit_internal_writes: false,
emit_ttl_deletes: false,
..Default::default()
};
let manager = CdcManager::with_metrics(config, Some(callback));
// Client write has no origin tag
let event = CdcEvent {
mtask_id: "mtask-123".into(),
index: "products".into(),
operation: CdcOperation::Add,
primary_keys: vec!["sku-123".into()],
shard_ids: vec![5],
settings_version: 1,
timestamp: 1234567890,
document: None,
origin: None, // No origin = client write
event_id: uuid::Uuid::new_v4().to_string(),
};
// Should NOT be suppressed (client writes are always emitted)
assert!(manager.publish(event).is_ok());
// Callback should NOT have been called
assert_eq!(callback_called.load(Ordering::SeqCst), 0);
}
}