feat(dump-import): add Prometheus metrics for streaming dump import (§13.9)

Implements the required metrics for tracking dump import operations:

- miroir_dump_import_bytes_read_total: Counter for total bytes read
- miroir_dump_import_documents_routed_total: Counter for documents routed
- miroir_dump_import_rate_docs_per_sec: Gauge for current import rate
- miroir_dump_import_phase: GaugeVec tracking phase by index/import_id

Metrics are recorded:
- At import start: bytes_read and phase set to Reading
- At status check: documents_routed, import_rate, and current phase

Acceptance criteria addressed:
- Import rate metric tracks actual throughput visible in Grafana

Closes: miroir-uhj.9
This commit is contained in:
jedarden 2026-05-24 19:30:36 -04:00
parent 3055e2af00
commit d324bab706
2 changed files with 116 additions and 13 deletions

View file

@ -331,6 +331,12 @@ pub struct Metrics {
antientropy_docs_repaired_total: Counter,
antientropy_last_scan_completed_seconds: Gauge,
// ── §13.9 Streaming dump import metrics (always present) ──
dump_import_bytes_read_total: Counter,
dump_import_documents_routed_total: Counter,
dump_import_rate_docs_per_sec: Gauge,
dump_import_phase: GaugeVec,
// ── §13.3 Adaptive replica selection metrics (always present) ──
replica_selection_score: GaugeVec,
replica_selection_exploration_total: Counter,
@ -447,6 +453,10 @@ impl Clone for Metrics {
antientropy_last_scan_completed_seconds: self
.antientropy_last_scan_completed_seconds
.clone(),
dump_import_bytes_read_total: self.dump_import_bytes_read_total.clone(),
dump_import_documents_routed_total: self.dump_import_documents_routed_total.clone(),
dump_import_rate_docs_per_sec: self.dump_import_rate_docs_per_sec.clone(),
dump_import_phase: self.dump_import_phase.clone(),
replica_selection_score: self.replica_selection_score.clone(),
replica_selection_exploration_total: self.replica_selection_exploration_total.clone(),
idempotency_hits_total: self.idempotency_hits_total.clone(),
@ -1269,6 +1279,35 @@ impl Metrics {
reg!(antientropy_docs_repaired_total);
reg!(antientropy_last_scan_completed_seconds);
// ── §13.9 Streaming dump import metrics (always present) ──
let dump_import_bytes_read_total = Counter::with_opts(Opts::new(
"miroir_dump_import_bytes_read_total",
"Total bytes read during dump imports",
))
.expect("create dump_import_bytes_read_total");
let dump_import_documents_routed_total = Counter::with_opts(Opts::new(
"miroir_dump_import_documents_routed_total",
"Total documents routed during dump imports",
))
.expect("create dump_import_documents_routed_total");
let dump_import_rate_docs_per_sec = Gauge::with_opts(Opts::new(
"miroir_dump_import_rate_docs_per_sec",
"Current dump import rate in documents per second",
))
.expect("create dump_import_rate_docs_per_sec");
let dump_import_phase = GaugeVec::new(
Opts::new(
"miroir_dump_import_phase",
"Current phase of dump import (0=idle, 1=reading, 2=routing, 3=applying_settings, 4=complete, 5=failed)",
),
&["index_uid", "import_id"],
)
.expect("create dump_import_phase");
reg!(dump_import_bytes_read_total);
reg!(dump_import_documents_routed_total);
reg!(dump_import_rate_docs_per_sec);
reg!(dump_import_phase);
// ── §13.3 Adaptive replica selection metrics (always present) ──
let replica_selection_score = GaugeVec::new(
Opts::new(
@ -1468,6 +1507,10 @@ impl Metrics {
antientropy_mismatches_found_total,
antientropy_docs_repaired_total,
antientropy_last_scan_completed_seconds,
dump_import_bytes_read_total,
dump_import_documents_routed_total,
dump_import_rate_docs_per_sec,
dump_import_phase,
replica_selection_score,
replica_selection_exploration_total,
idempotency_hits_total,
@ -2109,6 +2152,26 @@ impl Metrics {
.set(timestamp as f64);
}
// ── §13.9 Streaming dump import metrics ──
pub fn inc_dump_import_bytes_read(&self, bytes: u64) {
self.dump_import_bytes_read_total.inc_by(bytes as f64);
}
pub fn inc_dump_import_documents_routed(&self, count: u64) {
self.dump_import_documents_routed_total.inc_by(count as f64);
}
pub fn set_dump_import_rate(&self, docs_per_sec: f64) {
self.dump_import_rate_docs_per_sec.set(docs_per_sec);
}
pub fn set_dump_import_phase(&self, index_uid: &str, import_id: &str, phase: u8) {
self.dump_import_phase
.with_label_values(&[index_uid, import_id])
.set(phase as f64);
}
// ── §14.9 Resource-pressure ──
pub fn set_memory_pressure(&self, level: u32) {

View file

@ -10,12 +10,13 @@ use axum::routing::{get, post};
use axum::{Json, Router};
use miroir_core::api_error::{MeilisearchError, MiroirCode};
use miroir_core::config::Config;
use miroir_core::dump_import::{DumpImportManager, DumpImportStatus};
use miroir_core::dump_import::{DumpImportManager, DumpImportPhase, DumpImportStatus};
use miroir_core::topology::Topology;
use serde_json::Value;
use std::sync::Arc;
use crate::client::HttpClient;
use crate::middleware::Metrics;
/// Request body for starting a dump import.
#[derive(serde::Deserialize)]
@ -98,6 +99,8 @@ where
req.dump_data.into_bytes()
};
let bytes_read = dump_data.len() as u64;
// Create HTTP client
let master_key = state.config.master_key.clone();
let http_client = HttpClient::new(master_key, 30000); // 30s timeout
@ -125,12 +128,19 @@ where
)
})?;
// Record metrics
state.metrics.inc_dump_import_bytes_read(bytes_read);
state
.metrics
.set_dump_import_phase(&req.index_uid, &import_id, DumpImportPhase::Reading as u8);
tracing::info!(
"Started dump import {} for index {} (shard_count={}, primary_key={})",
"Started dump import {} for index {} (shard_count={}, primary_key={}, bytes={})",
import_id,
req.index_uid,
req.shard_count,
req.primary_key
req.primary_key,
bytes_read
);
let status_url = format!("/_miroir/dumps/import/{}/status", import_id);
@ -162,16 +172,38 @@ where
http_client,
);
manager
.get_status(&id)
.await
.ok_or_else(|| {
MeilisearchError::new(
MiroirCode::NotFound,
format!("import task not found: {}", id),
)
})
.map(Json)
let status = manager.get_status(&id).await.ok_or_else(|| {
MeilisearchError::new(
MiroirCode::NotFound,
format!("import task not found: {}", id),
)
})?;
// Record metrics from status
state
.metrics
.inc_dump_import_documents_routed(status.documents_processed);
// Calculate and update import rate (docs per second)
let now_ms = millis_now();
let elapsed_secs = if now_ms > status.phase_started_at {
(now_ms - status.phase_started_at) as f64 / 1000.0
} else {
0.0
};
if elapsed_secs > 0.0 && status.documents_processed > 0 {
let rate = status.documents_processed as f64 / elapsed_secs;
state.metrics.set_dump_import_rate(rate);
}
// Update phase metric
if let Ok(phase_num) = status.phase.parse::<u8>() {
state
.metrics
.set_dump_import_phase(&status.index_uid, &id, phase_num);
}
Ok(Json(status))
}
/// Check if a string looks like base64-encoded data.
@ -193,6 +225,14 @@ fn base64_decode(s: &str) -> Result<Vec<u8>, String> {
.map_err(|e| format!("base64 decode failed: {}", e))
}
/// Get current UNIX timestamp in milliseconds.
fn millis_now() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64
}
#[cfg(test)]
mod tests {
use super::*;