From d324bab70641621d1fee0cf99242547cf0396cb9 Mon Sep 17 00:00:00 2001 From: jedarden Date: Sun, 24 May 2026 19:30:36 -0400 Subject: [PATCH] =?UTF-8?q?feat(dump-import):=20add=20Prometheus=20metrics?= =?UTF-8?q?=20for=20streaming=20dump=20import=20(=C2=A713.9)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the required metrics for tracking dump import operations: - miroir_dump_import_bytes_read_total: Counter for total bytes read - miroir_dump_import_documents_routed_total: Counter for documents routed - miroir_dump_import_rate_docs_per_sec: Gauge for current import rate - miroir_dump_import_phase: GaugeVec tracking phase by index/import_id Metrics are recorded: - At import start: bytes_read and phase set to Reading - At status check: documents_routed, import_rate, and current phase Acceptance criteria addressed: - Import rate metric tracks actual throughput visible in Grafana Closes: miroir-uhj.9 --- crates/miroir-proxy/src/middleware.rs | 63 +++++++++++++++++++++++ crates/miroir-proxy/src/routes/dumps.rs | 66 ++++++++++++++++++++----- 2 files changed, 116 insertions(+), 13 deletions(-) diff --git a/crates/miroir-proxy/src/middleware.rs b/crates/miroir-proxy/src/middleware.rs index c171b69..ddc066a 100644 --- a/crates/miroir-proxy/src/middleware.rs +++ b/crates/miroir-proxy/src/middleware.rs @@ -331,6 +331,12 @@ pub struct Metrics { antientropy_docs_repaired_total: Counter, antientropy_last_scan_completed_seconds: Gauge, + // ── §13.9 Streaming dump import metrics (always present) ── + dump_import_bytes_read_total: Counter, + dump_import_documents_routed_total: Counter, + dump_import_rate_docs_per_sec: Gauge, + dump_import_phase: GaugeVec, + // ── §13.3 Adaptive replica selection metrics (always present) ── replica_selection_score: GaugeVec, replica_selection_exploration_total: Counter, @@ -447,6 +453,10 @@ impl Clone for Metrics { antientropy_last_scan_completed_seconds: self .antientropy_last_scan_completed_seconds .clone(), + dump_import_bytes_read_total: self.dump_import_bytes_read_total.clone(), + dump_import_documents_routed_total: self.dump_import_documents_routed_total.clone(), + dump_import_rate_docs_per_sec: self.dump_import_rate_docs_per_sec.clone(), + dump_import_phase: self.dump_import_phase.clone(), replica_selection_score: self.replica_selection_score.clone(), replica_selection_exploration_total: self.replica_selection_exploration_total.clone(), idempotency_hits_total: self.idempotency_hits_total.clone(), @@ -1269,6 +1279,35 @@ impl Metrics { reg!(antientropy_docs_repaired_total); reg!(antientropy_last_scan_completed_seconds); + // ── §13.9 Streaming dump import metrics (always present) ── + let dump_import_bytes_read_total = Counter::with_opts(Opts::new( + "miroir_dump_import_bytes_read_total", + "Total bytes read during dump imports", + )) + .expect("create dump_import_bytes_read_total"); + let dump_import_documents_routed_total = Counter::with_opts(Opts::new( + "miroir_dump_import_documents_routed_total", + "Total documents routed during dump imports", + )) + .expect("create dump_import_documents_routed_total"); + let dump_import_rate_docs_per_sec = Gauge::with_opts(Opts::new( + "miroir_dump_import_rate_docs_per_sec", + "Current dump import rate in documents per second", + )) + .expect("create dump_import_rate_docs_per_sec"); + let dump_import_phase = GaugeVec::new( + Opts::new( + "miroir_dump_import_phase", + "Current phase of dump import (0=idle, 1=reading, 2=routing, 3=applying_settings, 4=complete, 5=failed)", + ), + &["index_uid", "import_id"], + ) + .expect("create dump_import_phase"); + reg!(dump_import_bytes_read_total); + reg!(dump_import_documents_routed_total); + reg!(dump_import_rate_docs_per_sec); + reg!(dump_import_phase); + // ── §13.3 Adaptive replica selection metrics (always present) ── let replica_selection_score = GaugeVec::new( Opts::new( @@ -1468,6 +1507,10 @@ impl Metrics { antientropy_mismatches_found_total, antientropy_docs_repaired_total, antientropy_last_scan_completed_seconds, + dump_import_bytes_read_total, + dump_import_documents_routed_total, + dump_import_rate_docs_per_sec, + dump_import_phase, replica_selection_score, replica_selection_exploration_total, idempotency_hits_total, @@ -2109,6 +2152,26 @@ impl Metrics { .set(timestamp as f64); } + // ── §13.9 Streaming dump import metrics ── + + pub fn inc_dump_import_bytes_read(&self, bytes: u64) { + self.dump_import_bytes_read_total.inc_by(bytes as f64); + } + + pub fn inc_dump_import_documents_routed(&self, count: u64) { + self.dump_import_documents_routed_total.inc_by(count as f64); + } + + pub fn set_dump_import_rate(&self, docs_per_sec: f64) { + self.dump_import_rate_docs_per_sec.set(docs_per_sec); + } + + pub fn set_dump_import_phase(&self, index_uid: &str, import_id: &str, phase: u8) { + self.dump_import_phase + .with_label_values(&[index_uid, import_id]) + .set(phase as f64); + } + // ── §14.9 Resource-pressure ── pub fn set_memory_pressure(&self, level: u32) { diff --git a/crates/miroir-proxy/src/routes/dumps.rs b/crates/miroir-proxy/src/routes/dumps.rs index 479ebdb..826195d 100644 --- a/crates/miroir-proxy/src/routes/dumps.rs +++ b/crates/miroir-proxy/src/routes/dumps.rs @@ -10,12 +10,13 @@ use axum::routing::{get, post}; use axum::{Json, Router}; use miroir_core::api_error::{MeilisearchError, MiroirCode}; use miroir_core::config::Config; -use miroir_core::dump_import::{DumpImportManager, DumpImportStatus}; +use miroir_core::dump_import::{DumpImportManager, DumpImportPhase, DumpImportStatus}; use miroir_core::topology::Topology; use serde_json::Value; use std::sync::Arc; use crate::client::HttpClient; +use crate::middleware::Metrics; /// Request body for starting a dump import. #[derive(serde::Deserialize)] @@ -98,6 +99,8 @@ where req.dump_data.into_bytes() }; + let bytes_read = dump_data.len() as u64; + // Create HTTP client let master_key = state.config.master_key.clone(); let http_client = HttpClient::new(master_key, 30000); // 30s timeout @@ -125,12 +128,19 @@ where ) })?; + // Record metrics + state.metrics.inc_dump_import_bytes_read(bytes_read); + state + .metrics + .set_dump_import_phase(&req.index_uid, &import_id, DumpImportPhase::Reading as u8); + tracing::info!( - "Started dump import {} for index {} (shard_count={}, primary_key={})", + "Started dump import {} for index {} (shard_count={}, primary_key={}, bytes={})", import_id, req.index_uid, req.shard_count, - req.primary_key + req.primary_key, + bytes_read ); let status_url = format!("/_miroir/dumps/import/{}/status", import_id); @@ -162,16 +172,38 @@ where http_client, ); - manager - .get_status(&id) - .await - .ok_or_else(|| { - MeilisearchError::new( - MiroirCode::NotFound, - format!("import task not found: {}", id), - ) - }) - .map(Json) + let status = manager.get_status(&id).await.ok_or_else(|| { + MeilisearchError::new( + MiroirCode::NotFound, + format!("import task not found: {}", id), + ) + })?; + + // Record metrics from status + state + .metrics + .inc_dump_import_documents_routed(status.documents_processed); + + // Calculate and update import rate (docs per second) + let now_ms = millis_now(); + let elapsed_secs = if now_ms > status.phase_started_at { + (now_ms - status.phase_started_at) as f64 / 1000.0 + } else { + 0.0 + }; + if elapsed_secs > 0.0 && status.documents_processed > 0 { + let rate = status.documents_processed as f64 / elapsed_secs; + state.metrics.set_dump_import_rate(rate); + } + + // Update phase metric + if let Ok(phase_num) = status.phase.parse::() { + state + .metrics + .set_dump_import_phase(&status.index_uid, &id, phase_num); + } + + Ok(Json(status)) } /// Check if a string looks like base64-encoded data. @@ -193,6 +225,14 @@ fn base64_decode(s: &str) -> Result, String> { .map_err(|e| format!("base64 decode failed: {}", e)) } +/// Get current UNIX timestamp in milliseconds. +fn millis_now() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64 +} + #[cfg(test)] mod tests { use super::*;