diff --git a/crates/miroir-core/src/cdc.rs b/crates/miroir-core/src/cdc.rs index b17caca..f7b7eb7 100644 --- a/crates/miroir-core/src/cdc.rs +++ b/crates/miroir-core/src/cdc.rs @@ -34,6 +34,7 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; +use std::time::{Duration, Instant}; use tokio::io::AsyncWriteExt; use tokio::sync::{mpsc, RwLock, Semaphore}; use tracing::{debug, error, info, warn}; @@ -43,6 +44,17 @@ use crate::task_store::{NewCdcCursor, TaskStore}; #[cfg(feature = "redis-store")] use ::redis::AsyncCommands; +/// Add random jitter to a duration. +/// +/// Jitter is ±`fraction` of the base duration. For example, with fraction=0.25, +/// the result is uniformly distributed in [0.75 * base, 1.25 * base]. +fn duration_jitter(base: Duration, fraction: f64) -> Duration { + let millis = base.as_millis() as f64; + let jitter_range = millis * fraction; + let random_jitter = (rand::random::() * 2.0 - 1.0) * jitter_range; + Duration::from_millis((millis + random_jitter).max(0.0) as u64) +} + /// Origin tag for anti-entropy repair writes (plan §13.8). /// These writes are suppressed from CDC unless `emit_internal_writes` is true. pub const ORIGIN_ANTIENTROPY: &str = "antientropy"; @@ -1044,6 +1056,9 @@ impl CdcManager { } /// Background task that buffers and publishes events to sinks. + /// + /// Implements time-based flushing (batch_flush_ms) and per-sink batching. + /// Plan §13.13: "batched POST to configured URL; default batch_size: 100 events or batch_flush_ms: 1000" async fn background_publisher( mut event_rx: mpsc::UnboundedReceiver, state: Arc>, @@ -1055,47 +1070,93 @@ impl CdcManager { // Per-sink event buffers let mut sink_buffers: HashMap> = HashMap::new(); + // Per-sink last flush time + let mut last_flush: HashMap = HashMap::new(); - while let Some(event) = event_rx.recv().await { - // Store event in internal queue for GET /_miroir/changes endpoint - let _sequence = internal_queue.store(event.clone()).await; + // Create a timer that ticks at the minimum batch_flush_ms interval + let min_flush_ms = config + .sinks + .iter() + .map(|s| s.batch_flush_ms) + .min() + .unwrap_or(1000); + let mut flush_timer = tokio::time::interval(Duration::from_millis(min_flush_ms)); + flush_timer.tick().await; // Skip first immediate tick - // Buffer event for each sink (use the original event for sinks) - for sink in &config.sinks { - // Push to tiered buffer (memory → overflow) - if let Some(buffer) = buffers.get(&sink.url) { - if let Err(e) = buffer.push(event.clone()).await { - match e { - CdcError::BufferOverflow => { - // Event was dropped (overflow: drop) - // Metric already incremented by CdcDropOverflow - let mut st = state.write().await; - st.dropped_count += 1; + loop { + tokio::select! { + // Handle incoming events + event = event_rx.recv() => { + match event { + Some(event) => { + // Store event in internal queue for GET /_miroir/changes endpoint + let _sequence = internal_queue.store(event.clone()).await; + + // Buffer event for each sink + for sink in &config.sinks { + // Push to tiered buffer (memory → overflow) + if let Some(buffer) = buffers.get(&sink.url) { + if let Err(e) = buffer.push(event.clone()).await { + match e { + CdcError::BufferOverflow => { + // Event was dropped (overflow: drop) + let mut st = state.write().await; + st.dropped_count += 1; + } + _ => { + error!("CDC: buffer error for sink {}: {}", sink.url, e); + } + } + } + } + + let buffer = sink_buffers.entry(sink.url.clone()).or_default(); + buffer.push(event.clone()); + + // Flush if buffer size reached (batch_size trigger) + if buffer.len() >= sink.batch_size as usize { + if let Err(e) = Self::flush_sink(sink, buffer, &state, &internal_queue).await { + error!("CDC: failed to flush sink {}: {}", sink.url, e); + } + sink_buffers.insert(sink.url.clone(), Vec::new()); + last_flush.insert(sink.url.clone(), Instant::now()); + } } - _ => { - error!("CDC: buffer error for sink {}: {}", sink.url, e); + + // Update buffer bytes metrics + let mut st = state.write().await; + for (sink_url, buffer) in &buffers { + let bytes = buffer.size_bytes().await; + st.buffer_bytes.insert(sink_url.clone(), bytes); + } + } + None => { + // Channel closed - flush remaining and exit + break; + } + } + } + // Handle time-based flushing (batch_flush_ms trigger) + _ = flush_timer.tick() => { + let now = Instant::now(); + for sink in &config.sinks { + if let Some(buffer) = sink_buffers.get_mut(&sink.url) { + if !buffer.is_empty() { + let flush_deadline = last_flush.get(&sink.url) + .map(|t| *t + Duration::from_millis(sink.batch_flush_ms)) + .unwrap_or(now); + + if now >= flush_deadline { + if let Err(e) = Self::flush_sink(sink, buffer, &state, &internal_queue).await { + error!("CDC: failed to flush sink {} on timer: {}", sink.url, e); + } + buffer.clear(); + last_flush.insert(sink.url.clone(), now); + } } } } } - - let buffer = sink_buffers.entry(sink.url.clone()).or_default(); - buffer.push(event.clone()); - - // Flush if buffer size reached - if buffer.len() >= sink.batch_size as usize { - if let Err(e) = Self::flush_sink(sink, buffer, &state).await { - error!("CDC: failed to flush sink {}: {}", sink.url, e); - } - sink_buffers.insert(sink.url.clone(), Vec::new()); - } - } - - // Update buffer bytes metrics - let mut st = state.write().await; - for (sink_url, buffer) in &buffers { - let bytes = buffer.size_bytes().await; - st.buffer_bytes.insert(sink_url.clone(), bytes); } } @@ -1104,7 +1165,7 @@ impl CdcManager { if !buffer.is_empty() { let sink = config.sinks.iter().find(|s| s.url == sink_url); if let Some(sink) = sink { - if let Err(e) = Self::flush_sink(sink, &buffer, &state).await { + if let Err(e) = Self::flush_sink(sink, &buffer, &state, &internal_queue).await { error!("CDC: failed to flush sink {} on shutdown: {}", sink_url, e); } } @@ -1115,51 +1176,154 @@ impl CdcManager { } /// Flush buffered events to a single sink. + /// + /// On success, advances the per-sink cursor in cdc_cursors table (plan §13.13). + /// Cursor is only advanced on sink ACK, ensuring at-least-once delivery. async fn flush_sink( sink: &CdcSinkConfig, events: &[CdcEvent], - _state: &Arc>, + state: &Arc>, + internal_queue: &Arc, ) -> Result<(), CdcError> { match sink.sink_type { - CdcSinkType::Webhook => Self::flush_webhook(sink, events).await, + CdcSinkType::Webhook => { + Self::flush_webhook(sink, events, internal_queue).await?; + // Increment published count on success + let mut st = state.write().await; + st.published_count += events.len() as u64; + Ok(()) + } CdcSinkType::Nats => Self::flush_nats(sink, events).await, CdcSinkType::Kafka => Self::flush_kafka(sink, events).await, CdcSinkType::Internal => { // Internal queue: events are stored in memory for polling - // (implementation depends on internal queue design) Ok(()) } } } /// Flush events to a webhook sink. - async fn flush_webhook(sink: &CdcSinkConfig, events: &[CdcEvent]) -> Result<(), CdcError> { + /// + /// Implements exponential backoff retries capped by retry_max_s (plan §13.13). + /// Advances cursor only on successful ACK (2xx response). + /// + /// Retry strategy: + /// - Initial delay: 100ms + /// - Exponential backoff multiplier: 2 + /// - Max retry time: retry_max_s (default 3600s = 1 hour) + /// - Retries on 5xx errors and network errors + /// - No retry on 4xx client errors (except 429 Too Many Requests) + async fn flush_webhook( + sink: &CdcSinkConfig, + events: &[CdcEvent], + internal_queue: &Arc, + ) -> Result<(), CdcError> { let client = reqwest::Client::new(); - let response = client - .post(&sink.url) - .json(events) - .send() - .await - .map_err(|e| CdcError::SinkError(e.to_string()))?; + let retry_max_s = sink.retry_max_s; + let retry_start = Instant::now(); + let mut delay = Duration::from_millis(100); // Initial retry delay - if response.status().is_success() { - Ok(()) - } else { - let status = response.status(); - Err(CdcError::SinkError(format!("webhook returned {status}"))) + loop { + // Clone events for potential retry + let events_to_send: Vec = events + .iter() + .map(|ev| { + let mut ev_clone = ev.clone(); + // Respect include_body setting + if !sink.include_body { + ev_clone.document = None; + } + ev_clone + }) + .collect(); + + let result = client.post(&sink.url).json(&events_to_send).send().await; + + match result { + Ok(response) => { + let status = response.status(); + + if status.is_success() { + // Success - advance cursor for each index in the batch + // Cursor is per-sink per-index (plan §13.13) + for event in events { + if let Err(e) = internal_queue + .persist_cursor(&sink.url, &event.index, event.timestamp) + .await + { + warn!( + "CDC: failed to persist cursor for sink {} index {}: {}", + sink.url, event.index, e + ); + } + } + return Ok(()); + } + + // Determine if we should retry + let should_retry = status.is_server_error() + || status == reqwest::StatusCode::TOO_MANY_REQUESTS; + + if should_retry { + // Check if we've exceeded retry_max_s + let elapsed = retry_start.elapsed().as_secs(); + if elapsed >= retry_max_s { + return Err(CdcError::SinkError(format!( + "webhook retry timeout after {elapsed}s (max {retry_max_s}s), last status: {status}" + ))); + } + + warn!( + "CDC: webhook returned {}, retrying in {:?} (elapsed: {}s)", + status, delay, elapsed + ); + tokio::time::sleep(delay).await; + + // Exponential backoff with jitter (±25%) + delay = (delay * 2).min(Duration::from_secs(60)); + let jitter = duration_jitter(delay, 0.25); + delay = jitter; + } else { + // Client error (4xx) - don't retry + return Err(CdcError::SinkError(format!( + "webhook returned non-retryable status {status}" + ))); + } + } + Err(e) => { + // Network error - check if we should retry + let elapsed = retry_start.elapsed().as_secs(); + if elapsed >= retry_max_s { + return Err(CdcError::SinkError(format!( + "webhook network error after {elapsed}s (max {retry_max_s}s): {e}" + ))); + } + + warn!( + "CDC: webhook network error: {}, retrying in {:?} (elapsed: {}s)", + e, delay, elapsed + ); + tokio::time::sleep(delay).await; + + // Exponential backoff with jitter + delay = (delay * 2).min(Duration::from_secs(60)); + let jitter = duration_jitter(delay, 0.25); + delay = jitter; + } + } } } - /// Flush events to a NATS sink. + /// NATS flush (placeholder for P5.13.b). async fn flush_nats(_sink: &CdcSinkConfig, _events: &[CdcEvent]) -> Result<(), CdcError> { - // NATS publishing implementation + // NATS publishing implementation (P5.13.b) // (requires async-nats crate) Ok(()) } - /// Flush events to a Kafka sink. + /// Kafka flush (placeholder for P5.13.c). async fn flush_kafka(_sink: &CdcSinkConfig, _events: &[CdcEvent]) -> Result<(), CdcError> { - // Kafka publishing implementation + // Kafka publishing implementation (P5.13.c) // (requires rustafka or rdkafka crate) Ok(()) } @@ -1541,4 +1705,53 @@ mod tests { // Clear should succeed assert!(drop_backend.clear().await.is_ok()); } + + #[test] + fn test_duration_jitter_returns_positive_duration() { + let base = Duration::from_millis(1000); + let jittered = duration_jitter(base, 0.25); + + // Result should be positive + assert!(jittered.as_millis() > 0); + + // Result should be in reasonable range [750, 1250] ms + assert!(jittered.as_millis() >= 750); + assert!(jittered.as_millis() <= 1250); + } + + #[test] + fn test_duration_jitter_with_zero_fraction() { + let base = Duration::from_millis(1000); + let jittered = duration_jitter(base, 0.0); + + // With zero jitter, should return base duration + assert_eq!(jittered, base); + } + + #[test] + fn test_duration_jitter_small_duration() { + let base = Duration::from_millis(10); + let jittered = duration_jitter(base, 0.5); + + // Result should be positive + assert!(jittered.as_millis() > 0); + + // Result should be in range [5, 15] ms + assert!(jittered.as_millis() >= 5); + assert!(jittered.as_millis() <= 15); + } + + #[test] + fn test_duration_jitter_multiple_calls_produce_variance() { + let base = Duration::from_millis(1000); + let mut results = Vec::new(); + + for _ in 0..100 { + results.push(duration_jitter(base, 0.25)); + } + + // Check that we get different results (variance) + let unique: std::collections::HashSet<_> = results.iter().collect(); + assert!(unique.len() > 10, "Jitter should produce variance"); + } }