style: apply cargo fmt formatting changes
This commit is contained in:
parent
83c03d0909
commit
3cee2fbbb7
5 changed files with 83 additions and 39 deletions
|
|
@ -254,7 +254,9 @@ impl CdcMemoryBuffer {
|
|||
/// Returns `None` if buffer is at capacity (should overflow).
|
||||
async fn acquire(&self, size: u64) -> Option<()> {
|
||||
// Check soft watermark (80% of max)
|
||||
let current = self.current_bytes.load(std::sync::atomic::Ordering::Relaxed);
|
||||
let current = self
|
||||
.current_bytes
|
||||
.load(std::sync::atomic::Ordering::Relaxed);
|
||||
if current + size > (self.max_bytes * 8 / 10) {
|
||||
return None;
|
||||
}
|
||||
|
|
@ -262,7 +264,8 @@ impl CdcMemoryBuffer {
|
|||
// Try to acquire semaphore permit (non-blocking)
|
||||
match self.semaphore.try_acquire() {
|
||||
Ok(_permit) => {
|
||||
self.current_bytes.fetch_add(size, std::sync::atomic::Ordering::Relaxed);
|
||||
self.current_bytes
|
||||
.fetch_add(size, std::sync::atomic::Ordering::Relaxed);
|
||||
Some(())
|
||||
}
|
||||
Err(_) => None,
|
||||
|
|
@ -271,7 +274,9 @@ impl CdcMemoryBuffer {
|
|||
|
||||
/// Release space after event is processed.
|
||||
fn release(&self, size: u64) {
|
||||
let old = self.current_bytes.fetch_sub(size, std::sync::atomic::Ordering::Relaxed);
|
||||
let old = self
|
||||
.current_bytes
|
||||
.fetch_sub(size, std::sync::atomic::Ordering::Relaxed);
|
||||
// Add semaphore permit back
|
||||
self.semaphore.add_permits(1);
|
||||
debug_assert!(old >= size, "buffer underflow: {old} < {size}");
|
||||
|
|
@ -345,7 +350,10 @@ impl CdcRedisOverflow {
|
|||
|
||||
/// Push event to Redis list (LPUSH).
|
||||
async fn push_inner(&self, event: CdcEvent) -> Result<(), CdcError> {
|
||||
let pool = self.pool.as_ref().ok_or_else(|| CdcError::SinkError("no pool".into()))?;
|
||||
let pool = self
|
||||
.pool
|
||||
.as_ref()
|
||||
.ok_or_else(|| CdcError::SinkError("no pool".into()))?;
|
||||
|
||||
// Serialize event
|
||||
let json = serde_json::to_vec(&event)
|
||||
|
|
@ -354,7 +362,8 @@ impl CdcRedisOverflow {
|
|||
|
||||
// Check size limit
|
||||
let mut conn = pool.manager.lock().await;
|
||||
let current_bytes: Option<u64> = conn.get(&self.bytes_key)
|
||||
let current_bytes: Option<u64> = conn
|
||||
.get(&self.bytes_key)
|
||||
.await
|
||||
.map_err(|e| CdcError::SinkError(format!("redis get error: {e}")))?;
|
||||
let current_bytes = current_bytes.unwrap_or(0);
|
||||
|
|
@ -394,7 +403,9 @@ impl CdcRedisOverflow {
|
|||
// Note: We can't modify self here since it's behind &self
|
||||
// This is a limitation of lazy initialization in this pattern
|
||||
// For now, we'll just return an error
|
||||
return Err(CdcError::SinkError("Redis pool not initialized - use lazy_new with explicit URL".into()));
|
||||
return Err(CdcError::SinkError(
|
||||
"Redis pool not initialized - use lazy_new with explicit URL".into(),
|
||||
));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -417,12 +428,10 @@ impl CdcRedisOverflow {
|
|||
#[cfg(not(feature = "redis-store"))]
|
||||
impl CdcRedisOverflow {
|
||||
/// Create a new Redis overflow backend (no-op without redis-store feature).
|
||||
pub async fn new(
|
||||
_pool: (),
|
||||
_sink_name: String,
|
||||
_max_bytes: u64,
|
||||
) -> Result<Self, CdcError> {
|
||||
Err(CdcError::SinkError("redis-store feature not enabled".into()))
|
||||
pub async fn new(_pool: (), _sink_name: String, _max_bytes: u64) -> Result<Self, CdcError> {
|
||||
Err(CdcError::SinkError(
|
||||
"redis-store feature not enabled".into(),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -433,7 +442,9 @@ impl CdcOverflowBackend for CdcRedisOverflow {
|
|||
return self.push_inner(event).await;
|
||||
|
||||
#[cfg(not(feature = "redis-store"))]
|
||||
Err(CdcError::SinkError("redis-store feature not enabled".into()))
|
||||
Err(CdcError::SinkError(
|
||||
"redis-store feature not enabled".into(),
|
||||
))
|
||||
}
|
||||
|
||||
async fn pop(&self) -> Option<CdcEvent> {
|
||||
|
|
@ -500,7 +511,8 @@ impl CdcPvcOverflow {
|
|||
}
|
||||
|
||||
fn file_path(&self) -> std::path::PathBuf {
|
||||
self.data_dir.join(format!("cdc-overflow-{}.log", self.sink_name))
|
||||
self.data_dir
|
||||
.join(format!("cdc-overflow-{}.log", self.sink_name))
|
||||
}
|
||||
|
||||
/// Push event to file (append, truncate if over limit).
|
||||
|
|
@ -534,15 +546,18 @@ impl CdcPvcOverflow {
|
|||
}
|
||||
|
||||
// Rewrite file
|
||||
let mut file = tokio::fs::File::create(&path).await
|
||||
let mut file = tokio::fs::File::create(&path)
|
||||
.await
|
||||
.map_err(|e| CdcError::SinkError(format!("create error: {e}")))?;
|
||||
for ev in events {
|
||||
let line = serde_json::to_string(&ev)
|
||||
.map_err(|e| CdcError::SinkError(format!("serialize error: {e}")))?;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
file.write_all(line.as_bytes()).await
|
||||
file.write_all(line.as_bytes())
|
||||
.await
|
||||
.map_err(|e| CdcError::SinkError(format!("write error: {e}")))?;
|
||||
file.write_all(b"\n").await
|
||||
file.write_all(b"\n")
|
||||
.await
|
||||
.map_err(|e| CdcError::SinkError(format!("write error: {e}")))?;
|
||||
}
|
||||
}
|
||||
|
|
@ -555,9 +570,11 @@ impl CdcPvcOverflow {
|
|||
.open(&path)
|
||||
.await
|
||||
.map_err(|e| CdcError::SinkError(format!("open error: {e}")))?;
|
||||
file.write_all(&json).await
|
||||
file.write_all(&json)
|
||||
.await
|
||||
.map_err(|e| CdcError::SinkError(format!("write error: {e}")))?;
|
||||
file.write_all(b"\n").await
|
||||
file.write_all(b"\n")
|
||||
.await
|
||||
.map_err(|e| CdcError::SinkError(format!("write error: {e}")))?;
|
||||
|
||||
Ok(())
|
||||
|
|
@ -634,7 +651,10 @@ impl CdcOverflowBackend for CdcDropOverflow {
|
|||
if let Some(ref callback) = self.metric_callback {
|
||||
callback(&self.sink_name);
|
||||
}
|
||||
debug!("CDC: dropped event for sink {} (overflow: drop)", self.sink_name);
|
||||
debug!(
|
||||
"CDC: dropped event for sink {} (overflow: drop)",
|
||||
self.sink_name
|
||||
);
|
||||
// Return error to signal event was dropped
|
||||
Err(CdcError::BufferOverflow)
|
||||
}
|
||||
|
|
@ -661,14 +681,17 @@ impl CdcBuffer {
|
|||
) -> Result<Self, CdcError> {
|
||||
let primary = Arc::new(CdcMemoryBuffer::new(config.memory_bytes));
|
||||
|
||||
let overflow: Arc<dyn CdcOverflowBackend + Send + Sync> = match CdcBufferType::from_str(config.overflow.as_str()) {
|
||||
let overflow: Arc<dyn CdcOverflowBackend + Send + Sync> = match CdcBufferType::from_str(
|
||||
config.overflow.as_str(),
|
||||
) {
|
||||
Some(CdcBufferType::Redis) => {
|
||||
#[cfg(feature = "redis-store")]
|
||||
{
|
||||
// Redis pool will be created lazily on first use
|
||||
let redis_url = std::env::var("MIROIR_REDIS_URL")
|
||||
.unwrap_or_else(|_| "redis://localhost:6379".to_string());
|
||||
let backend = CdcRedisOverflow::lazy_new(sink_name, config.redis_bytes, redis_url);
|
||||
let backend =
|
||||
CdcRedisOverflow::lazy_new(sink_name, config.redis_bytes, redis_url);
|
||||
Arc::new(backend)
|
||||
}
|
||||
#[cfg(not(feature = "redis-store"))]
|
||||
|
|
@ -678,8 +701,8 @@ impl CdcBuffer {
|
|||
}
|
||||
}
|
||||
Some(CdcBufferType::Pvc) => {
|
||||
let data_dir = std::env::var("MIROIR_DATA_DIR")
|
||||
.unwrap_or_else(|_| "/data".to_string());
|
||||
let data_dir =
|
||||
std::env::var("MIROIR_DATA_DIR").unwrap_or_else(|_| "/data".to_string());
|
||||
Arc::new(CdcPvcOverflow::new(
|
||||
sink_name,
|
||||
std::path::PathBuf::from(data_dir),
|
||||
|
|
@ -728,7 +751,10 @@ impl CdcBuffer {
|
|||
|
||||
/// Get total buffer size in bytes (primary + overflow).
|
||||
pub async fn size_bytes(&self) -> u64 {
|
||||
let primary = self.primary.current_bytes.load(std::sync::atomic::Ordering::Relaxed);
|
||||
let primary = self
|
||||
.primary
|
||||
.current_bytes
|
||||
.load(std::sync::atomic::Ordering::Relaxed);
|
||||
let overflow = self.overflow.size_bytes().await;
|
||||
primary + overflow
|
||||
}
|
||||
|
|
@ -781,7 +807,11 @@ impl CdcManager {
|
|||
let mut buffers = HashMap::new();
|
||||
for sink in &config.sinks {
|
||||
let sink_name = sink.url.clone();
|
||||
match CdcBuffer::new(&config.buffer, sink_name.clone(), dropped_metric_callback.clone()) {
|
||||
match CdcBuffer::new(
|
||||
&config.buffer,
|
||||
sink_name.clone(),
|
||||
dropped_metric_callback.clone(),
|
||||
) {
|
||||
Ok(buffer) => {
|
||||
buffers.insert(sink_name.clone(), Arc::new(buffer));
|
||||
}
|
||||
|
|
@ -797,7 +827,8 @@ impl CdcManager {
|
|||
let config_clone = config.clone();
|
||||
let buffers_clone = buffers.clone();
|
||||
tokio::spawn(async move {
|
||||
Self::background_publisher(event_rx, state_clone, config_clone, buffers_clone).await;
|
||||
Self::background_publisher(event_rx, state_clone, config_clone, buffers_clone)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -1231,8 +1262,14 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_cdc_buffer_type_from_str() {
|
||||
assert_eq!(CdcBufferType::from_str("memory"), Some(CdcBufferType::Memory));
|
||||
assert_eq!(CdcBufferType::from_str("MEMORY"), Some(CdcBufferType::Memory));
|
||||
assert_eq!(
|
||||
CdcBufferType::from_str("memory"),
|
||||
Some(CdcBufferType::Memory)
|
||||
);
|
||||
assert_eq!(
|
||||
CdcBufferType::from_str("MEMORY"),
|
||||
Some(CdcBufferType::Memory)
|
||||
);
|
||||
assert_eq!(CdcBufferType::from_str("redis"), Some(CdcBufferType::Redis));
|
||||
assert_eq!(CdcBufferType::from_str("pvc"), Some(CdcBufferType::Pvc));
|
||||
assert_eq!(CdcBufferType::from_str("drop"), Some(CdcBufferType::Drop));
|
||||
|
|
|
|||
|
|
@ -53,9 +53,9 @@ pub struct AntiEntropyWorkerConfig {
|
|||
impl Default for AntiEntropyWorkerConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
interval_s: 6 * 3600, // 6 hours
|
||||
interval_s: 6 * 3600, // 6 hours
|
||||
lease_renewal_interval_ms: 5000, // 5 seconds
|
||||
lease_ttl_secs: 30, // 30 seconds
|
||||
lease_ttl_secs: 30, // 30 seconds
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -60,7 +60,7 @@ impl Default for DriftReconcilerConfig {
|
|||
interval_s: 300, // 5 minutes
|
||||
auto_repair: true,
|
||||
lease_renewal_interval_ms: 5000, // 5 seconds
|
||||
lease_ttl_secs: 30, // 30 seconds
|
||||
lease_ttl_secs: 30, // 30 seconds
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -579,8 +579,10 @@ impl TaskStore for RedisTaskStore {
|
|||
let mut p = pipe();
|
||||
p.hget(&key, "created_at");
|
||||
p.hget(&key, "status");
|
||||
let result: (Option<String>, Option<String>) =
|
||||
pool.pipeline_query(&mut p).await.map_err(|e| MiroirError::Redis(e.to_string()))?;
|
||||
let result: (Option<String>, Option<String>) = pool
|
||||
.pipeline_query(&mut p)
|
||||
.await
|
||||
.map_err(|e| MiroirError::Redis(e.to_string()))?;
|
||||
|
||||
if let (Some(created_at_str), Some(status)) = result {
|
||||
if !terminal_statuses.contains(&status.as_str()) {
|
||||
|
|
@ -614,15 +616,20 @@ impl TaskStore for RedisTaskStore {
|
|||
let status = get_field_string(&fields, "status")?;
|
||||
let node_tasks_json = get_field_string(&fields, "node_tasks")?;
|
||||
let node_tasks: HashMap<String, u64> = serde_json::from_str(&node_tasks_json)
|
||||
.map_err(|e| MiroirError::TaskStore(format!("invalid node_tasks JSON: {e}")))?;
|
||||
.map_err(|e| {
|
||||
MiroirError::TaskStore(format!("invalid node_tasks JSON: {e}"))
|
||||
})?;
|
||||
let error = opt_field(&fields, "error");
|
||||
let started_at = opt_field_i64(&fields, "started_at");
|
||||
let finished_at = opt_field_i64(&fields, "finished_at");
|
||||
let index_uid = opt_field(&fields, "index_uid");
|
||||
let task_type = opt_field(&fields, "task_type");
|
||||
let node_errors_json = opt_field(&fields, "node_errors").unwrap_or_else(|| "{}".to_string());
|
||||
let node_errors: HashMap<String, String> = serde_json::from_str(&node_errors_json)
|
||||
.map_err(|e| MiroirError::TaskStore(format!("invalid node_errors JSON: {e}")))?;
|
||||
let node_errors_json =
|
||||
opt_field(&fields, "node_errors").unwrap_or_else(|| "{}".to_string());
|
||||
let node_errors: HashMap<String, String> =
|
||||
serde_json::from_str(&node_errors_json).map_err(|e| {
|
||||
MiroirError::TaskStore(format!("invalid node_errors JSON: {e}"))
|
||||
})?;
|
||||
|
||||
results.push(TaskRow {
|
||||
miroir_id,
|
||||
|
|
|
|||
|
|
@ -4,9 +4,9 @@
|
|||
//! 1. Error format parity with Meilisearch (plan §5)
|
||||
//! 2. GET /_miroir/topology matches plan §10 JSON shape
|
||||
|
||||
use axum::response::IntoResponse;
|
||||
use miroir_core::api_error::{ErrorType, MeilisearchError, MiroirCode};
|
||||
use serde_json::json;
|
||||
use axum::response::IntoResponse;
|
||||
|
||||
/// Test 1: All Miroir error codes produce the correct Meilisearch-compatible shape.
|
||||
///
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue