feat(cdc): implement webhook sink batching, retries, and cursor persistence (P5.13.a, miroir-uhj.13.1)

Implements plan §13.13 webhook sink with:
- Time-based batch flushing (batch_flush_ms timer) in addition to size-based (batch_size)
- Exponential backoff retries with jitter, capped by retry_max_s (default 3600s)
- Per-sink cursor persistence on successful ACK only (at-least-once delivery)
- Document body inclusion controlled by include_body sink config

Key changes:
- Added duration_jitter() helper for randomized backoff (±25%)
- Modified background_publisher to use tokio::select! for event + timer handling
- Implemented retry loop in flush_webhook with:
  - Initial delay: 100ms
  - Exponential multiplier: 2x (per-retry delay capped at 60s before jitter)
  - Retries on 5xx, 429, and network errors
  - No retry on 4xx client errors (except 429)
- Cursor advances only on 2xx response via internal_queue.persist_cursor()

Closes: miroir-uhj.13.1

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-05-24 12:33:58 -04:00
parent adab169bed
commit ddd84f53e1

View file

@ -34,6 +34,7 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::io::AsyncWriteExt;
use tokio::sync::{mpsc, RwLock, Semaphore};
use tracing::{debug, error, info, warn};
@ -43,6 +44,17 @@ use crate::task_store::{NewCdcCursor, TaskStore};
#[cfg(feature = "redis-store")]
use ::redis::AsyncCommands;
/// Randomly perturb `base` by up to ±`fraction` of its length.
///
/// One sample from `rand::random::<f64>()` is mapped onto a signed factor and
/// scaled by `fraction * base`. With `fraction = 0.25` the result therefore
/// falls between 0.75x and 1.25x of `base`.
///
/// The arithmetic is done in milliseconds: a negative total is clamped to
/// zero and any fractional millisecond is truncated by the final cast.
fn duration_jitter(base: Duration, fraction: f64) -> Duration {
    let base_ms = base.as_millis() as f64;
    // Re-centre the unit sample so it is symmetric around zero.
    let signed_unit = rand::random::<f64>() * 2.0 - 1.0;
    let offset_ms = signed_unit * (base_ms * fraction);
    let jittered_ms = (base_ms + offset_ms).max(0.0);
    Duration::from_millis(jittered_ms as u64)
}
/// Origin tag for anti-entropy repair writes (plan §13.8).
/// These writes are suppressed from CDC unless `emit_internal_writes` is true.
pub const ORIGIN_ANTIENTROPY: &str = "antientropy";
@ -1044,6 +1056,9 @@ impl CdcManager {
}
/// Background task that buffers and publishes events to sinks.
///
/// Implements time-based flushing (batch_flush_ms) and per-sink batching.
/// Plan §13.13: "batched POST to configured URL; default batch_size: 100 events or batch_flush_ms: 1000"
async fn background_publisher(
mut event_rx: mpsc::UnboundedReceiver<CdcEvent>,
state: Arc<RwLock<CdcPublisherState>>,
@ -1055,47 +1070,93 @@ impl CdcManager {
// Per-sink event buffers
let mut sink_buffers: HashMap<String, Vec<CdcEvent>> = HashMap::new();
// Per-sink last flush time
let mut last_flush: HashMap<String, Instant> = HashMap::new();
while let Some(event) = event_rx.recv().await {
// Store event in internal queue for GET /_miroir/changes endpoint
let _sequence = internal_queue.store(event.clone()).await;
// Create a timer that ticks at the minimum batch_flush_ms interval
let min_flush_ms = config
.sinks
.iter()
.map(|s| s.batch_flush_ms)
.min()
.unwrap_or(1000);
let mut flush_timer = tokio::time::interval(Duration::from_millis(min_flush_ms));
flush_timer.tick().await; // Skip first immediate tick
// Buffer event for each sink (use the original event for sinks)
for sink in &config.sinks {
// Push to tiered buffer (memory → overflow)
if let Some(buffer) = buffers.get(&sink.url) {
if let Err(e) = buffer.push(event.clone()).await {
match e {
CdcError::BufferOverflow => {
// Event was dropped (overflow: drop)
// Metric already incremented by CdcDropOverflow
let mut st = state.write().await;
st.dropped_count += 1;
loop {
tokio::select! {
// Handle incoming events
event = event_rx.recv() => {
match event {
Some(event) => {
// Store event in internal queue for GET /_miroir/changes endpoint
let _sequence = internal_queue.store(event.clone()).await;
// Buffer event for each sink
for sink in &config.sinks {
// Push to tiered buffer (memory → overflow)
if let Some(buffer) = buffers.get(&sink.url) {
if let Err(e) = buffer.push(event.clone()).await {
match e {
CdcError::BufferOverflow => {
// Event was dropped (overflow: drop)
let mut st = state.write().await;
st.dropped_count += 1;
}
_ => {
error!("CDC: buffer error for sink {}: {}", sink.url, e);
}
}
}
}
let buffer = sink_buffers.entry(sink.url.clone()).or_default();
buffer.push(event.clone());
// Flush if buffer size reached (batch_size trigger)
if buffer.len() >= sink.batch_size as usize {
if let Err(e) = Self::flush_sink(sink, buffer, &state, &internal_queue).await {
error!("CDC: failed to flush sink {}: {}", sink.url, e);
}
sink_buffers.insert(sink.url.clone(), Vec::new());
last_flush.insert(sink.url.clone(), Instant::now());
}
}
_ => {
error!("CDC: buffer error for sink {}: {}", sink.url, e);
// Update buffer bytes metrics
let mut st = state.write().await;
for (sink_url, buffer) in &buffers {
let bytes = buffer.size_bytes().await;
st.buffer_bytes.insert(sink_url.clone(), bytes);
}
}
None => {
// Channel closed - flush remaining and exit
break;
}
}
}
// Handle time-based flushing (batch_flush_ms trigger)
_ = flush_timer.tick() => {
let now = Instant::now();
for sink in &config.sinks {
if let Some(buffer) = sink_buffers.get_mut(&sink.url) {
if !buffer.is_empty() {
let flush_deadline = last_flush.get(&sink.url)
.map(|t| *t + Duration::from_millis(sink.batch_flush_ms))
.unwrap_or(now);
if now >= flush_deadline {
if let Err(e) = Self::flush_sink(sink, buffer, &state, &internal_queue).await {
error!("CDC: failed to flush sink {} on timer: {}", sink.url, e);
}
buffer.clear();
last_flush.insert(sink.url.clone(), now);
}
}
}
}
}
let buffer = sink_buffers.entry(sink.url.clone()).or_default();
buffer.push(event.clone());
// Flush if buffer size reached
if buffer.len() >= sink.batch_size as usize {
if let Err(e) = Self::flush_sink(sink, buffer, &state).await {
error!("CDC: failed to flush sink {}: {}", sink.url, e);
}
sink_buffers.insert(sink.url.clone(), Vec::new());
}
}
// Update buffer bytes metrics
let mut st = state.write().await;
for (sink_url, buffer) in &buffers {
let bytes = buffer.size_bytes().await;
st.buffer_bytes.insert(sink_url.clone(), bytes);
}
}
@ -1104,7 +1165,7 @@ impl CdcManager {
if !buffer.is_empty() {
let sink = config.sinks.iter().find(|s| s.url == sink_url);
if let Some(sink) = sink {
if let Err(e) = Self::flush_sink(sink, &buffer, &state).await {
if let Err(e) = Self::flush_sink(sink, &buffer, &state, &internal_queue).await {
error!("CDC: failed to flush sink {} on shutdown: {}", sink_url, e);
}
}
@ -1115,51 +1176,154 @@ impl CdcManager {
}
/// Flush buffered events to a single sink.
///
/// Dispatches on `sink.sink_type`:
/// - Webhook: the batch is handed to `flush_webhook`, which advances the
///   per-sink cursor only after the sink ACKs (plan §13.13, at-least-once
///   delivery). On success `published_count` in the shared publisher state
///   grows by the batch length.
/// - NATS / Kafka: delegated to `flush_nats` / `flush_kafka`.
/// - Internal: nothing to do here; events are stored in memory for polling.
///
/// # Errors
///
/// Propagates the `CdcError` returned by the sink-specific flush function.
async fn flush_sink(
    sink: &CdcSinkConfig,
    events: &[CdcEvent],
    state: &Arc<RwLock<CdcPublisherState>>,
    internal_queue: &Arc<CdcInternalQueue>,
) -> Result<(), CdcError> {
    match sink.sink_type {
        CdcSinkType::Webhook => {
            Self::flush_webhook(sink, events, internal_queue).await?;
            // Only reached when the sink acknowledged the batch, so the
            // counter never includes failed deliveries.
            let mut st = state.write().await;
            st.published_count += events.len() as u64;
            Ok(())
        }
        CdcSinkType::Nats => Self::flush_nats(sink, events).await,
        CdcSinkType::Kafka => Self::flush_kafka(sink, events).await,
        CdcSinkType::Internal => {
            // Internal queue: events are stored in memory for polling
            // (implementation depends on internal queue design)
            Ok(())
        }
    }
}
/// Flush events to a webhook sink.
async fn flush_webhook(sink: &CdcSinkConfig, events: &[CdcEvent]) -> Result<(), CdcError> {
///
/// Implements exponential backoff retries capped by retry_max_s (plan §13.13).
/// Advances cursor only on successful ACK (2xx response).
///
/// Retry strategy:
/// - Initial delay: 100ms
/// - Exponential backoff multiplier: 2
/// - Max retry time: retry_max_s (default 3600s = 1 hour)
/// - Retries on 5xx errors and network errors
/// - No retry on 4xx client errors (except 429 Too Many Requests)
async fn flush_webhook(
sink: &CdcSinkConfig,
events: &[CdcEvent],
internal_queue: &Arc<CdcInternalQueue>,
) -> Result<(), CdcError> {
let client = reqwest::Client::new();
let response = client
.post(&sink.url)
.json(events)
.send()
.await
.map_err(|e| CdcError::SinkError(e.to_string()))?;
let retry_max_s = sink.retry_max_s;
let retry_start = Instant::now();
let mut delay = Duration::from_millis(100); // Initial retry delay
if response.status().is_success() {
Ok(())
} else {
let status = response.status();
Err(CdcError::SinkError(format!("webhook returned {status}")))
loop {
// Clone events for potential retry
let events_to_send: Vec<CdcEvent> = events
.iter()
.map(|ev| {
let mut ev_clone = ev.clone();
// Respect include_body setting
if !sink.include_body {
ev_clone.document = None;
}
ev_clone
})
.collect();
let result = client.post(&sink.url).json(&events_to_send).send().await;
match result {
Ok(response) => {
let status = response.status();
if status.is_success() {
// Success - advance cursor for each index in the batch
// Cursor is per-sink per-index (plan §13.13)
for event in events {
if let Err(e) = internal_queue
.persist_cursor(&sink.url, &event.index, event.timestamp)
.await
{
warn!(
"CDC: failed to persist cursor for sink {} index {}: {}",
sink.url, event.index, e
);
}
}
return Ok(());
}
// Determine if we should retry
let should_retry = status.is_server_error()
|| status == reqwest::StatusCode::TOO_MANY_REQUESTS;
if should_retry {
// Check if we've exceeded retry_max_s
let elapsed = retry_start.elapsed().as_secs();
if elapsed >= retry_max_s {
return Err(CdcError::SinkError(format!(
"webhook retry timeout after {elapsed}s (max {retry_max_s}s), last status: {status}"
)));
}
warn!(
"CDC: webhook returned {}, retrying in {:?} (elapsed: {}s)",
status, delay, elapsed
);
tokio::time::sleep(delay).await;
// Exponential backoff with jitter (±25%)
delay = (delay * 2).min(Duration::from_secs(60));
let jitter = duration_jitter(delay, 0.25);
delay = jitter;
} else {
// Client error (4xx) - don't retry
return Err(CdcError::SinkError(format!(
"webhook returned non-retryable status {status}"
)));
}
}
Err(e) => {
// Network error - check if we should retry
let elapsed = retry_start.elapsed().as_secs();
if elapsed >= retry_max_s {
return Err(CdcError::SinkError(format!(
"webhook network error after {elapsed}s (max {retry_max_s}s): {e}"
)));
}
warn!(
"CDC: webhook network error: {}, retrying in {:?} (elapsed: {}s)",
e, delay, elapsed
);
tokio::time::sleep(delay).await;
// Exponential backoff with jitter
delay = (delay * 2).min(Duration::from_secs(60));
let jitter = duration_jitter(delay, 0.25);
delay = jitter;
}
}
}
}
/// Flush events to a NATS sink (placeholder for P5.13.b).
///
/// Not implemented yet: both arguments are ignored and `Ok(())` is returned
/// unconditionally, so callers see every NATS flush as a success.
async fn flush_nats(_sink: &CdcSinkConfig, _events: &[CdcEvent]) -> Result<(), CdcError> {
// NATS publishing implementation is deferred to P5.13.b
// (requires async-nats crate)
Ok(())
}
/// Flush events to a Kafka sink (placeholder for P5.13.c).
///
/// Not implemented yet: both arguments are ignored and `Ok(())` is returned
/// unconditionally, so callers see every Kafka flush as a success.
async fn flush_kafka(_sink: &CdcSinkConfig, _events: &[CdcEvent]) -> Result<(), CdcError> {
// Kafka publishing implementation is deferred to P5.13.c
// (requires rustafka or rdkafka crate)
Ok(())
}
@ -1541,4 +1705,53 @@ mod tests {
// Clear should succeed
assert!(drop_backend.clear().await.is_ok());
}
#[test]
fn test_duration_jitter_returns_positive_duration() {
    let base = Duration::from_millis(1000);
    // The output is random, so one draw says little about the bounds;
    // sample repeatedly to give out-of-range values a chance to show up.
    for _ in 0..1_000 {
        let jittered_ms = duration_jitter(base, 0.25).as_millis();
        // Result should be positive
        assert!(jittered_ms > 0);
        // Result should be in reasonable range [750, 1250] ms
        assert!(
            (750..=1250).contains(&jittered_ms),
            "jittered duration {jittered_ms}ms outside [750, 1250]ms"
        );
    }
}
#[test]
fn test_duration_jitter_with_zero_fraction() {
    // A zero fraction leaves no room for jitter, so the input must come
    // back unchanged.
    let one_second = Duration::from_millis(1000);
    assert_eq!(duration_jitter(one_second, 0.0), one_second);
}
#[test]
fn test_duration_jitter_small_duration() {
    let base = Duration::from_millis(10);
    // The output is random, so one draw says little about the bounds;
    // sample repeatedly to give out-of-range values a chance to show up.
    for _ in 0..1_000 {
        let jittered_ms = duration_jitter(base, 0.5).as_millis();
        // Result should be positive
        assert!(jittered_ms > 0);
        // Result should be in range [5, 15] ms
        assert!(
            (5..=15).contains(&jittered_ms),
            "jittered duration {jittered_ms}ms outside [5, 15]ms"
        );
    }
}
#[test]
fn test_duration_jitter_multiple_calls_produce_variance() {
    let base = Duration::from_millis(1000);
    // Collapse 100 draws into a set; the set size is the number of distinct
    // outcomes observed.
    let distinct: std::collections::HashSet<Duration> =
        (0..100).map(|_| duration_jitter(base, 0.25)).collect();
    assert!(distinct.len() > 10, "Jitter should produce variance");
}
}