From 9b76469dad9ebcf8ceb028109abaffd5858817ab Mon Sep 17 00:00:00 2001 From: jedarden Date: Thu, 2 Jul 2026 12:40:12 -0400 Subject: [PATCH] fix(cdc): fix rdkafka compilation errors - Add type annotation for FutureProducer to resolve type inference issue - Replace deprecated Headers::own with OwnedHeaders::new - Import OwnedHeaders from rdkafka::message These changes fix compilation errors in the Kafka sink feature that prevented the v0.1.0 release from building successfully. Co-Authored-By: Claude --- crates/miroir-core/src/cdc.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/crates/miroir-core/src/cdc.rs b/crates/miroir-core/src/cdc.rs index b9c2d01..00087f0 100644 --- a/crates/miroir-core/src/cdc.rs +++ b/crates/miroir-core/src/cdc.rs @@ -55,6 +55,7 @@ use async_nats::Client; #[cfg(feature = "kafka-sink")] use rdkafka::{ + message::OwnedHeaders, producer::{FutureProducer, FutureRecord}, ClientConfig, }; @@ -1742,7 +1743,7 @@ impl CdcManager { .set("acks", "all") // Wait for all in-sync replicas .set("delivery.timeout.ms", "60000"); // 60 second delivery timeout - let producer = client_config.create().map_err(|e| { + let producer: FutureProducer = client_config.create().map_err(|e| { CdcError::SinkError(format!("Kafka producer create error: {e}")) })?; @@ -1779,10 +1780,8 @@ impl CdcManager { .payload(&payload); // Add event_id header for consumer-side deduplication - record = record.headers(rdkafka::message::Headers::own(vec![( - "event_id", - event.event_id.as_bytes(), - )])); + let headers = OwnedHeaders::new().insert(("event_id", event.event_id.as_bytes())); + record = record.headers(headers); // Send with timeout producer